PostgresStore
Class: PostgresStore
Defined in: libs/act-pg/src/PostgresStore.ts:57
See
- Store
PostgresStore is a production-ready event store adapter for Act, using PostgreSQL as the backend.
- Supports event sourcing, leasing, snapshots, and concurrency control.
- Designed for high-throughput, scalable, and reliable event storage.
- Implements the Act Store interface.
- https://github.com/rotorsoft/act-root
Example
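import { PostgresStore } from "@rotorsoft/act-pg";
// Settings not listed here fall back to the store's defaults.
const store = new PostgresStore({ schema: "my_schema", table: "events" });
// Create the schema, tables, and indexes before first use.
await store.seed();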
Implements
- Store
Constructors
Constructor
new PostgresStore(config): PostgresStore
Defined in: libs/act-pg/src/PostgresStore.ts:67
Create a new PostgresStore instance.
Parameters
config
Partial<Config> = {}
Partial configuration (host, port, user, password, schema, table, etc.)
Returns
PostgresStore
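Because the constructor takes a Partial<Config> that defaults to {}, callers only pass the fields they want to override. A minimal sketch of that merge, assuming a hypothetical Config shape limited to the fields named above and made-up default values (the real defaults are not stated here):

```typescript
// Hypothetical shape: only the fields named in the parameter description.
interface Config {
  host: string;
  port: number;
  user: string;
  password: string;
  schema: string;
  table: string;
}

// Assumed defaults for illustration only; the real defaults may differ.
const DEFAULTS: Config = {
  host: "localhost",
  port: 5432,
  user: "postgres",
  password: "postgres",
  schema: "public",
  table: "events",
};

// Merge caller overrides over the defaults, as a Partial<Config> = {} parameter allows.
function resolveConfig(config: Partial<Config> = {}): Config {
  return { ...DEFAULTS, ...config };
}

const resolved = resolveConfig({ schema: "my_schema" });
console.log(resolved.schema, resolved.table);
```

resolveConfig is a stand-in name for this sketch and is not part of the PostgresStore API.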
Properties
config
readonly config: Config
Defined in: libs/act-pg/src/PostgresStore.ts:59
Methods
ack()
ack(leases): Promise<Lease[]>
Defined in: libs/act-pg/src/PostgresStore.ts:473
Acknowledge and release leases after processing, updating stream positions.
Parameters
leases
Lease[]
Leases to acknowledge, including last processed watermark and lease holder.
Returns
Promise<Lease[]>
Acked leases.
Implementation of
Store.ack
block()
block(leases): Promise<(Lease & object)[]>
Defined in: libs/act-pg/src/PostgresStore.ts:526
Block streams from further processing after a failure has exhausted the maximum number of retries and blocking is enabled.
Parameters
leases
(Lease & object)[]
Leases to block, including lease holder and last error message.
Returns
Promise<(Lease & object)[]>
Blocked leases.
Implementation of
Store.block
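The decision of when to call block() is made by the caller, not by the store. A rough sketch of that decision, assuming hypothetical lease fields (retry, error) and hypothetical option names (maxRetries, blockOnError) that are not confirmed by this page:

```typescript
// Hypothetical shapes; field and option names are assumptions, not the Act API.
type FailedLease = { stream: string; by: string; retry: number; error: string };
type RetryOptions = { maxRetries: number; blockOnError: boolean };

// Split failed leases into those to retry later and those to block.
function partitionFailures(
  failed: FailedLease[],
  options: RetryOptions
): { retry: FailedLease[]; block: FailedLease[] } {
  const retry: FailedLease[] = [];
  const block: FailedLease[] = [];
  for (const lease of failed) {
    // Assumed rule: a lease is exhausted once its retry count reaches the maximum.
    const exhausted = lease.retry >= options.maxRetries;
    if (exhausted && options.blockOnError) block.push(lease);
    else retry.push(lease);
  }
  return { retry, block };
}

const outcome = partitionFailures(
  [
    { stream: "A", by: "worker-1", retry: 1, error: "timeout" },
    { stream: "B", by: "worker-1", retry: 3, error: "invalid state" },
  ],
  { maxRetries: 3, blockOnError: true }
);
console.log(outcome.block.map((lease) => lease.stream));
```

In this sketch only the block half would be handed to block(); the retry half would be left for a later attempt.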
commit()
commit<E>(stream, msgs, meta, expectedVersion?): Promise<(Message<E, keyof E> & Readonly<{ created: Date; id: number; meta: Readonly<{ causation: { action?: Readonly<...> & object; event?: { id: number; name: string; stream: string; }; }; correlation: string; }>; stream: string; version: number; }>)[]>
Defined in: libs/act-pg/src/PostgresStore.ts:278
Commit new events to the store for a given stream, with concurrency control.
Type Parameters
E
E extends Schemas
Parameters
stream
string
The stream name
msgs
Message<E, keyof E>[]
Array of messages (event name and data)
meta
Event metadata (correlation, causation, etc.)
expectedVersion?
number
(Optional) Expected stream version for concurrency control
Returns
Promise<(Message<E, keyof E> & Readonly<{ created: Date; id: number; meta: Readonly<{ causation: { action?: Readonly<...> & object; event?: { id: number; name: string; stream: string; }; }; correlation: string; }>; stream: string; version: number; }>)[]>
Array of committed events
Throws
ConcurrencyError if expectedVersion does not match the stream's current version
Implementation of
Store.commit
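The expectedVersion check is a form of optimistic concurrency control: a writer states the version it last saw, and the commit is rejected if the stream has moved on. The sketch below illustrates the idea with an in-memory stand-in, not PostgresStore itself. MemoryLog, its simplified commit signature, and the zero-based version numbering are all assumptions made for this example.

```typescript
// Illustrative error type; the real ConcurrencyError may carry different fields.
class ConcurrencyError extends Error {
  constructor(stream: string, expected: number, actual: number) {
    super(`${stream}: expected version ${expected}, found ${actual}`);
    this.name = "ConcurrencyError";
  }
}

type Committed = { id: number; stream: string; version: number; name: string };

// In-memory stand-in for the events table (not PostgresStore).
class MemoryLog {
  private events: Committed[] = [];

  // Assumption: versions are zero-based and -1 means "no events yet".
  private lastVersion(stream: string): number {
    return this.events.filter((e) => e.stream === stream).length - 1;
  }

  commit(stream: string, names: string[], expectedVersion?: number): Committed[] {
    const last = this.lastVersion(stream);
    // Reject the write when the caller's view of the stream is stale.
    if (expectedVersion !== undefined && expectedVersion !== last)
      throw new ConcurrencyError(stream, expectedVersion, last);
    const committed = names.map((name, i) => ({
      id: this.events.length + i + 1,
      stream,
      version: last + i + 1,
      name,
    }));
    this.events.push(...committed);
    return committed;
  }
}

const log = new MemoryLog();
log.commit("A", ["Created"]); // no expectedVersion: the check is skipped
log.commit("A", ["Renamed"], 0); // matches the current version

let rejected = false;
try {
  log.commit("A", ["Renamed"], 0); // stale: the stream has advanced since version 0
} catch (error) {
  rejected = error instanceof Error && error.name === "ConcurrencyError";
}
console.log(rejected);
```

A caller that hits this error would typically reload the stream and retry with the fresh version.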
dispose()
dispose(): Promise<void>
Defined in: libs/act-pg/src/PostgresStore.ts:78
Dispose of the store and close all database connections.
Returns
Promise<void>
Promise that resolves when all connections are closed
Implementation of
Store.dispose
drop()
drop(): Promise<void>
Defined in: libs/act-pg/src/PostgresStore.ts:167
Drop all tables and schema created by the store (for testing or cleanup).
Returns
Promise<void>
Promise that resolves when the schema is dropped
Implementation of
Store.drop
lease()
lease(leases, millis): Promise<Lease[]>
Defined in: libs/act-pg/src/PostgresStore.ts:391
Lease streams for reaction processing, marking them as in-progress.
Parameters
leases
Lease[]
Lease requests for streams, including end-of-lease watermark, lease holder, and source stream.
millis
number
Lease duration in milliseconds.
Returns
Promise<Lease[]>
Array of leased objects with updated lease info
Implementation of
Store.lease
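Leasing and acknowledging form a pair: lease() marks a stream as in progress for one holder for a limited time, and ack() releases it and advances the stream position. The following is a rough in-memory illustration of that lifecycle, not the PostgreSQL implementation. The Lease field names (stream, by, at), the explicit now argument, and the rule that an unexpired lease excludes other holders are assumptions made for the sketch.

```typescript
// Hypothetical lease shape; field names are assumptions based on the descriptions above.
type Lease = { stream: string; by: string; at: number };

type StreamState = { at: number; leasedBy?: string; leasedUntil?: number };

// In-memory stand-in for the stream bookkeeping (not PostgresStore).
class MemoryLeases {
  private streams = new Map<string, StreamState>();

  // Grant each requested lease unless another holder's lease is still active.
  lease(requests: Lease[], millis: number, now: number): Lease[] {
    const granted: Lease[] = [];
    for (const request of requests) {
      const state = this.streams.get(request.stream) ?? { at: -1 };
      const busy =
        state.leasedBy !== undefined &&
        state.leasedBy !== request.by &&
        (state.leasedUntil ?? 0) > now;
      if (busy) continue;
      this.streams.set(request.stream, {
        at: state.at,
        leasedBy: request.by,
        leasedUntil: now + millis,
      });
      granted.push(request);
    }
    return granted;
  }

  // Release leases held by the caller and advance the stream watermark.
  ack(leases: Lease[]): Lease[] {
    const acked: Lease[] = [];
    for (const lease of leases) {
      const state = this.streams.get(lease.stream);
      if (!state || state.leasedBy !== lease.by) continue;
      this.streams.set(lease.stream, { at: lease.at });
      acked.push(lease);
    }
    return acked;
  }
}

const leases = new MemoryLeases();
const first = leases.lease([{ stream: "A", by: "worker-1", at: 10 }], 1000, 0);
const second = leases.lease([{ stream: "A", by: "worker-2", at: 10 }], 1000, 500);
const acked = leases.ack(first);
const third = leases.lease([{ stream: "A", by: "worker-2", at: 20 }], 1000, 600);
console.log(first.length, second.length, acked.length, third.length);
```

Here worker-2 is refused while worker-1 holds the lease and succeeds once worker-1 has acknowledged it.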
poll()
poll(lagging, leading): Promise<Poll[]>
Defined in: libs/act-pg/src/PostgresStore.ts:352
Polls the store for unblocked streams needing processing, ordered by lease watermark ascending.
Parameters
lagging
number
Max number of streams to poll in ascending order.
leading
number
Max number of streams to poll in descending order.
Returns
Promise<Poll[]>
The polled streams.
Implementation of
Store.poll
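One plausible reading of the lagging and leading parameters is that the poll picks up to lagging streams from the low end of the watermark ordering and up to leading streams from the high end, skipping blocked streams. The sketch below shows that reading over an in-memory array. StreamRow, pollSketch, and the de-duplication step are assumptions for illustration and may not match the actual query.

```typescript
// Hypothetical row shape for the sketch; not the store's actual table layout.
type StreamRow = { stream: string; at: number; blocked: boolean };

function pollSketch(rows: StreamRow[], lagging: number, leading: number): StreamRow[] {
  // Blocked streams are never returned.
  const open = rows.filter((row) => !row.blocked);
  const ascending = [...open].sort((a, b) => a.at - b.at);
  // Streams furthest behind first, then streams furthest ahead.
  const lag = ascending.slice(0, lagging);
  const lead = [...ascending].reverse().slice(0, leading);
  // De-duplicate by stream name in case both ends overlap.
  const picked = new Map<string, StreamRow>();
  for (const row of [...lag, ...lead]) picked.set(row.stream, row);
  return Array.from(picked.values());
}

const polled = pollSketch(
  [
    { stream: "A", at: 1, blocked: false },
    { stream: "B", at: 5, blocked: false },
    { stream: "C", at: 9, blocked: false },
    { stream: "D", at: 0, blocked: true },
  ],
  1,
  1
);
console.log(polled.map((row) => row.stream));
```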
query()
query<E>(callback, query?): Promise<number>
Defined in: libs/act-pg/src/PostgresStore.ts:197
Query events from the store, optionally filtered by stream, event name, time, etc.
Type Parameters
E
E extends Schemas
Parameters
callback
(event) => void
Function called for each event found
query?
Readonly<{ after?: number; backward?: boolean; before?: number; correlation?: string; created_after?: Date; created_before?: Date; limit?: number; names?: string[]; stream?: string; with_snaps?: boolean; }>
(Optional) Query filter (stream, names, before, after, etc.)
Returns
Promise<number>
The number of events found
Example
await store.query((event) => console.log(event), { stream: "A" });
Implementation of
Store.query
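To make the callback-and-count contract concrete, here is a small in-memory sketch that applies a subset of the filter fields and returns the number of events delivered. It is not the SQL that PostgresStore runs. querySketch and StoredEvent are hypothetical names, and the exact semantics of after and before (exclusive bounds on the event id) are assumptions.

```typescript
type StoredEvent = { id: number; stream: string; name: string };

// Subset of the filter fields listed above; the semantics here are assumptions.
type Query = Readonly<{
  stream?: string;
  names?: string[];
  after?: number; // assumed: only events with id > after
  before?: number; // assumed: only events with id < before
  limit?: number;
  backward?: boolean;
}>;

function querySketch(
  events: StoredEvent[],
  callback: (event: StoredEvent) => void,
  query: Query = {}
): number {
  const { stream, names, after, before, limit, backward } = query;
  let matched = events.filter(
    (e) =>
      (stream === undefined || e.stream === stream) &&
      (names === undefined || names.indexOf(e.name) >= 0) &&
      (after === undefined || e.id > after) &&
      (before === undefined || e.id < before)
  );
  if (backward) matched = [...matched].reverse();
  if (limit !== undefined) matched = matched.slice(0, limit);
  // Deliver each match to the callback, then report how many were found.
  matched.forEach((event) => callback(event));
  return matched.length;
}

const sample: StoredEvent[] = [
  { id: 1, stream: "A", name: "Created" },
  { id: 2, stream: "B", name: "Created" },
  { id: 3, stream: "A", name: "Renamed" },
  { id: 4, stream: "A", name: "Closed" },
];

const seen: number[] = [];
const count = querySketch(sample, (event) => { seen.push(event.id); }, {
  stream: "A",
  after: 1,
  backward: true,
  limit: 1,
});
console.log(count, seen);
```

With these filters the sketch visits only the most recent event of stream A that comes after id 1.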
seed()
seed(): Promise<void>
Defined in: libs/act-pg/src/PostgresStore.ts:87
Seed the database with required tables, indexes, and schema for event storage.
Returns
Promise<void>
Promise that resolves when seeding is complete
Throws
Error if seeding fails