PostgresStore
Class: PostgresStore
Defined in: libs/act-pg/src/PostgresStore.ts:58
PostgresStore is a production-ready event store adapter for Act, using PostgreSQL as the backend.
- Supports event sourcing, leasing, snapshots, and concurrency control.
- Designed for high-throughput, scalable, and reliable event storage.
- Implements the Act Store interface.
See
- Store
- https://github.com/rotorsoft/act-root
Example
import { PostgresStore } from "@act/pg";
const store = new PostgresStore({ schema: "my_schema", table: "events" });
await store.seed();
Implements
- Store
Constructors
Constructor
new PostgresStore(config): PostgresStore
Defined in: libs/act-pg/src/PostgresStore.ts:68
Create a new PostgresStore instance.
Parameters
config
Partial<Config> = {}
Partial configuration (host, port, user, password, schema, table, etc.)
Returns
PostgresStore
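Because the constructor accepts a `Partial<Config>` that defaults to `{}`, callers supply only the settings they want to override. The sketch below shows one common way such a merge can work. The `Config` field names come from the parameter description above, while `DEFAULTS`, its values, and `resolveConfig` are hypothetical and are not taken from the library.

```typescript
// Hypothetical shape: field names follow the parameter description above;
// the default values are placeholders, not the library's actual defaults.
type Config = {
  host: string;
  port: number;
  user: string;
  password: string;
  schema: string;
  table: string;
};

const DEFAULTS: Config = {
  host: "localhost",
  port: 5432,
  user: "postgres",
  password: "postgres",
  schema: "public",
  table: "events",
};

// Later spreads win, so keys supplied by the caller replace the defaults.
function resolveConfig(partial: Partial<Config> = {}): Config {
  return { ...DEFAULTS, ...partial };
}

const resolved = resolveConfig({ schema: "my_schema", table: "events" });
```

With this approach, `resolved` keeps the placeholder host and port and takes `schema` and `table` from the caller.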
Properties
config
readonly config: Config
Defined in: libs/act-pg/src/PostgresStore.ts:60
Methods
ack()
ack(leases): Promise<void>
Defined in: libs/act-pg/src/PostgresStore.ts:444
Acknowledge and release leases after processing, updating stream positions.
Parameters
leases
Lease[]
Array of lease objects to acknowledge
Returns
Promise<void>
Promise that resolves when leases are acknowledged
Implementation of
Store.ack
commit()
commit<E>(stream, msgs, meta, expectedVersion?): Promise<Message<E, keyof E> & Readonly<{ created: Date; id: number; meta: Readonly<{ causation: { action?: Readonly<...> & object; event?: { id: number; name: string; stream: string; }; }; correlation: string; }>; stream: string; version: number; }>[]>
Defined in: libs/act-pg/src/PostgresStore.ts:280
Commit new events to the store for a given stream, with concurrency control.
Type Parameters
E
E extends Schemas
Parameters
stream
string
The stream name
msgs
Message<E, keyof E>[]
Array of messages (event name and data)
meta
Event metadata (correlation, causation, etc.)
expectedVersion?
number
(Optional) Expected stream version for concurrency control
Returns
Promise<Message<E, keyof E> & Readonly<{ created: Date; id: number; meta: Readonly<{ causation: { action?: Readonly<...> & object; event?: { id: number; name: string; stream: string; }; }; correlation: string; }>; stream: string; version: number; }>[]>
Array of committed events
Throws
ConcurrencyError if the expected version does not match
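The `expectedVersion` check is a form of optimistic concurrency control: a writer states which version it last saw, and the commit is rejected if the stream has moved on since. The following is a minimal in-memory sketch of that idea, not the PostgresStore implementation. The local `ConcurrencyError` class, the `Msg` and `Stored` types, and the assumption that versions start at 0 are all illustrative.

```typescript
// Hypothetical in-memory stand-in; not the PostgresStore implementation.
class ConcurrencyError extends Error {
  constructor(stream: string, expected: number, actual: number) {
    super(`${stream}: expected version ${expected}, found ${actual}`);
    this.name = "ConcurrencyError";
  }
}

type Msg = { name: string; data: unknown };
type Stored = Msg & { id: number; stream: string; version: number };

const log: Stored[] = [];

function commit(stream: string, msgs: Msg[], expectedVersion?: number): Stored[] {
  // Assumption: versions start at 0, so an empty stream is at version -1.
  const current = log.filter((e) => e.stream === stream).length - 1;

  // Reject the write when the caller's view of the stream is stale.
  if (expectedVersion !== undefined && expectedVersion !== current) {
    throw new ConcurrencyError(stream, expectedVersion, current);
  }

  // Assign consecutive versions within the stream and ids across the log.
  const committed = msgs.map((m, i) => ({
    ...m,
    id: log.length + i + 1,
    stream,
    version: current + i + 1,
  }));
  log.push(...committed);
  return committed;
}
```

In this sketch, two writers that both read version 0 and both commit with `expectedVersion` set to 0 cannot both succeed: the second call throws, and that writer is expected to reload the stream and retry.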
Implementation of
Store.commit
dispose()
dispose(): Promise<void>
Defined in: libs/act-pg/src/PostgresStore.ts:79
Dispose of the store and close all database connections.
Returns
Promise<void>
Promise that resolves when all connections are closed
Implementation of
Store.dispose
drop()
drop(): Promise<void>
Defined in: libs/act-pg/src/PostgresStore.ts:166
Drop all tables and schema created by the store (for testing or cleanup).
Returns
Promise<void>
Promise that resolves when the schema is dropped
Implementation of
Store.drop
fetch()
fetch<E>(limit): Promise<{ events: Committed<E, keyof E>[]; streams: string[]; }>
Defined in: libs/act-pg/src/PostgresStore.ts:352
Fetch a batch of events and streams for processing (drain cycle).
Type Parameters
E
E extends Schemas
Parameters
limit
number
The maximum number of events to fetch
Returns
Promise<{ events: Committed<E, keyof E>[]; streams: string[]; }>
An object with arrays of streams and events
Implementation of
Store.fetch
lease()
lease(leases): Promise<object[]>
Defined in: libs/act-pg/src/PostgresStore.ts:382
Lease streams for reaction processing, marking them as in-progress.
Parameters
leases
Lease[]
Array of lease objects (stream, at, etc.)
Returns
Promise<object[]>
Array of leased objects with updated lease info
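The `fetch`, `lease`, and `ack` methods are described as parts of one processing cycle: fetch a batch, lease the streams, process, then acknowledge. The sketch below shows how such a cycle might be wired together against a minimal structural interface. The `Lease` and `StoredEvent` shapes, the `DrainStore` interface, and `drainOnce` are hypothetical, and the routing is simplified by assuming each event is handled on its own stream, which may differ from how Act maps events to reaction streams.

```typescript
// Hypothetical minimal shapes; the real Lease and Committed types carry more fields.
type Lease = { stream: string; at: number };
type StoredEvent = { id: number; stream: string; name: string };

// Structural subset of the store, limited to the three methods used here.
interface DrainStore {
  fetch(limit: number): Promise<{ streams: string[]; events: StoredEvent[] }>;
  lease(leases: Lease[]): Promise<Lease[]>;
  ack(leases: Lease[]): Promise<void>;
}

async function drainOnce(
  store: DrainStore,
  handle: (event: StoredEvent) => Promise<void>,
  limit = 10
): Promise<number> {
  const { streams, events } = await store.fetch(limit);
  if (events.length === 0) return 0;

  // Request a lease per stream, positioned at the highest fetched event id.
  const at = Math.max(...events.map((e) => e.id));
  const granted = await store.lease(streams.map((stream) => ({ stream, at })));
  const leased = new Set(granted.map((l) => l.stream));

  let handled = 0;
  for (const event of events) {
    // Assumption: a stream missing from the result was not leased to this worker.
    if (!leased.has(event.stream)) continue;
    await handle(event);
    handled++;
  }

  // Release the granted leases so their stream positions can advance.
  await store.ack(granted);
  return handled;
}
```

A real worker would also need to decide what to do when `handle` rejects. This sketch lets the error propagate without acknowledging, which leaves the leases unreleased.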
Implementation of
Store.lease
query()
query<E>(callback, query?, withSnaps?): Promise<number>
Defined in: libs/act-pg/src/PostgresStore.ts:197
Query events from the store, optionally filtered by stream, event name, time, etc.
Type Parameters
E
E extends Schemas
Parameters
callback
(event) => void
Function called for each event found
query?
(Optional) Query filter (stream, names, before, after, etc.)
after?
number = ...
backward?
boolean = ...
before?
number = ...
correlation?
string = ...
created_after?
Date = ...
created_before?
Date = ...
limit?
number = ...
names?
string[] = ...
stream?
string = ...
withSnaps?
boolean = false
(Optional) If true, includes only events after the last snapshot
Returns
Promise<number>
The number of events found
Example
await store.query((event) => console.log(event), { stream: "A" });
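To illustrate how a callback-style query with an optional filter can behave, here is a small in-memory sketch. It covers only a subset of the filter fields listed above. `runQuery`, `Query`, and `StoredEvent` are hypothetical names, and the semantics are assumptions made for illustration: `stream` is matched exactly, `after` and `before` are treated as exclusive event-id bounds, and `limit` is applied after ordering.

```typescript
// Hypothetical subset of the filter fields listed above.
type Query = {
  stream?: string;
  names?: string[];
  after?: number;
  before?: number;
  limit?: number;
  backward?: boolean;
};
type StoredEvent = { id: number; stream: string; name: string };

function runQuery(
  events: StoredEvent[],
  callback: (event: StoredEvent) => void,
  query: Query = {}
): number {
  const { stream, names, after, before, limit, backward } = query;

  // An undefined filter field matches every event.
  let matches = events.filter(
    (e) =>
      (stream === undefined || e.stream === stream) &&
      (names === undefined || names.indexOf(e.name) >= 0) &&
      (after === undefined || e.id > after) &&
      (before === undefined || e.id < before)
  );

  // Assumption: `backward` reverses id order, and `limit` caps the ordered result.
  matches = [...matches].sort((a, b) => (backward ? b.id - a.id : a.id - b.id));
  if (limit !== undefined) matches = matches.slice(0, limit);

  // Invoke the callback once per match and report how many were found.
  matches.forEach((e) => callback(e));
  return matches.length;
}
```

As with `store.query`, the return value is the number of events passed to the callback, so callers can collect events in the callback and use the count as a quick check.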
Implementation of
Store.query
seed()
seed(): Promise<void>
Defined in: libs/act-pg/src/PostgresStore.ts:88
Seed the database with required tables, indexes, and schema for event storage.
Returns
Promise<void>
Promise that resolves when seeding is complete
Throws
Error if seeding fails
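Since `seed()` can reject and `dispose()` closes database connections, it can help to pair the two so that connections are released even when seeding or later work fails. The helper below is one possible pattern, written against a hypothetical structural type rather than the real class. `Lifecycle` and `withStore` are illustrative names, not part of the library.

```typescript
// Hypothetical structural type covering only the two lifecycle methods used here.
interface Lifecycle {
  seed(): Promise<void>;
  dispose(): Promise<void>;
}

async function withStore<S extends Lifecycle, T>(
  store: S,
  work: (store: S) => Promise<T>
): Promise<T> {
  try {
    // Create the schema, tables, and indexes; rejects if seeding fails.
    await store.seed();
    return await work(store);
  } finally {
    // Always close connections, whether seeding and work succeeded or not.
    await store.dispose();
  }
}
```

Any object with matching `seed` and `dispose` methods satisfies `Lifecycle`, so the same helper can wrap a test double in unit tests.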