Class: PostgresStore

Defined in: libs/act-pg/src/PostgresStore.ts:57

See

  • Store

PostgresStore is a production-ready event store adapter for Act, using PostgreSQL as the backend.

  • Supports event sourcing, leasing, snapshots, and concurrency control.
  • Designed for high-throughput, scalable, and reliable event storage.
  • Implements the Act Store interface.
  • https://github.com/rotorsoft/act-root

Example

import { PostgresStore } from "@act/pg";
const store = new PostgresStore({ schema: "my_schema", table: "events" });
await store.seed();

Implements

  • Store

Constructors

Constructor

new PostgresStore(config): PostgresStore

Defined in: libs/act-pg/src/PostgresStore.ts:67

Create a new PostgresStore instance.

Parameters

config

Partial<Config> = {}

Partial configuration (host, port, user, password, schema, table, etc.)

Returns

PostgresStore
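
Because config is typed as Partial<Config> and defaults to {}, omitted fields presumably fall back to defaults. The following is a minimal sketch of that merge pattern, not the library's own code. The Config shape covers only the fields named in the parameter description, and every default value in ASSUMED_DEFAULTS is a hypothetical placeholder.

```typescript
// Hypothetical Config shape: only the fields named in the parameter description.
type Config = {
  host: string;
  port: number;
  user: string;
  password: string;
  schema: string;
  table: string;
};

// Placeholder defaults, assumed for illustration; the real defaults may differ.
const ASSUMED_DEFAULTS: Config = {
  host: "localhost",
  port: 5432,
  user: "postgres",
  password: "postgres",
  schema: "public",
  table: "events",
};

// Fields supplied by the caller override the defaults; omitted fields keep them.
function resolveConfig(partial: Partial<Config> = {}): Config {
  return { ...ASSUMED_DEFAULTS, ...partial };
}

const resolved = resolveConfig({ schema: "my_schema", table: "events" });
console.log(resolved.schema, resolved.port);
```

Under this pattern, passing only schema and table (as in the class example) leaves the connection settings at whatever the defaults are.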

Properties

config

readonly config: Config

Defined in: libs/act-pg/src/PostgresStore.ts:59

Methods

ack()

ack(leases): Promise<Lease[]>

Defined in: libs/act-pg/src/PostgresStore.ts:473

Acknowledge and release leases after processing, updating stream positions.

Parameters

leases

Lease[]

Leases to acknowledge, including last processed watermark and lease holder.

Returns

Promise<Lease[]>

Acked leases.

Implementation of

Store.ack


block()

block(leases): Promise<(Lease & object)[]>

Defined in: libs/act-pg/src/PostgresStore.ts:526

Block streams from further processing after they have failed and reached the maximum number of retries, when blocking is enabled.

Parameters

leases

(Lease & object)[]

Leases to block, including lease holder and last error message.

Returns

Promise<(Lease & object)[]>

Blocked leases.

Implementation of

Store.block


commit()

commit<E>(stream, msgs, meta, expectedVersion?): Promise<(Message<E, keyof E> & Readonly<{ created: Date; id: number; meta: Readonly<{ causation: { action?: Readonly<...> & object; event?: { id: number; name: string; stream: string; }; }; correlation: string; }>; stream: string; version: number; }>)[]>

Defined in: libs/act-pg/src/PostgresStore.ts:278

Commit new events to the store for a given stream, with concurrency control.

Type Parameters

E

E extends Schemas

Parameters

stream

string

The stream name

msgs

Message<E, keyof E>[]

Array of messages (event name and data)

meta

EventMeta

Event metadata (correlation, causation, etc.)

expectedVersion?

number

(Optional) Expected stream version for concurrency control

Returns

Promise<(Message<E, keyof E> & Readonly<{ created: Date; id: number; meta: Readonly<{ causation: { action?: Readonly<...> & object; event?: { id: number; name: string; stream: string; }; }; correlation: string; }>; stream: string; version: number; }>)[]>

Array of committed events

Throws

ConcurrencyError if the expected version does not match

Implementation of

Store.commit
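
The expectedVersion check described above is a form of optimistic concurrency control. The sketch below illustrates the idea with a toy in-memory store. It is not the PostgresStore implementation: InMemoryStreams and StoredEvent are hypothetical, the local ConcurrencyError is a stand-in whose constructor may differ from the library's, the meta argument is omitted, and the zero-based version numbering is an assumption.

```typescript
// Local stand-in for the library's ConcurrencyError; its real constructor may differ.
class ConcurrencyError extends Error {
  constructor(stream: string, expected: number, actual: number) {
    super(`Stream "${stream}": expected version ${expected}, found ${actual}`);
    this.name = "ConcurrencyError";
  }
}

type StoredEvent = { stream: string; name: string; data: unknown; version: number };

// Toy in-memory store illustrating the version check only (no SQL, no metadata).
class InMemoryStreams {
  private readonly events: StoredEvent[] = [];

  // Assumption: versions are zero-based, so an empty stream has version -1.
  private currentVersion(stream: string): number {
    return this.events.filter((e) => e.stream === stream).length - 1;
  }

  async commit(
    stream: string,
    msgs: { name: string; data: unknown }[],
    expectedVersion?: number
  ): Promise<StoredEvent[]> {
    const actual = this.currentVersion(stream);
    // Reject the write if another writer advanced the stream first.
    if (expectedVersion !== undefined && expectedVersion !== actual)
      throw new ConcurrencyError(stream, expectedVersion, actual);
    const committed = msgs.map((m, i) => ({ stream, ...m, version: actual + 1 + i }));
    this.events.push(...committed);
    return committed;
  }
}
```

A caller that loses the race would typically reload the stream, reapply its decision against the newer state, and retry the commit with the updated version.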


dispose()

dispose(): Promise<void>

Defined in: libs/act-pg/src/PostgresStore.ts:78

Dispose of the store and close all database connections.

Returns

Promise<void>

Promise that resolves when all connections are closed

Implementation of

Store.dispose


drop()

drop(): Promise<void>

Defined in: libs/act-pg/src/PostgresStore.ts:167

Drop all tables and schema created by the store (for testing or cleanup).

Returns

Promise<void>

Promise that resolves when the schema is dropped

Implementation of

Store.drop


lease()

lease(leases, millis): Promise<Lease[]>

Defined in: libs/act-pg/src/PostgresStore.ts:391

Lease streams for reaction processing, marking them as in-progress.

Parameters

leases

Lease[]

Lease requests for streams, including end-of-lease watermark, lease holder, and source stream.

millis

number

Lease duration in milliseconds.

Returns

Promise<Lease[]>

Array of leased objects with updated lease info

Implementation of

Store.lease


poll()

poll(lagging, leading): Promise<Poll[]>

Defined in: libs/act-pg/src/PostgresStore.ts:352

Poll the store for unblocked streams that need processing, ordered by lease watermark ascending.

Parameters

lagging

number

Max number of streams to poll in ascending order.

leading

number

Max number of streams to poll in descending order.

Returns

Promise<Poll[]>

The polled streams.

Implementation of

Store.poll
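
Taken together, poll, lease, ack, and block suggest a processing loop: find candidate streams, lease them, handle their events, then acknowledge successes and block streams that have exhausted their retries. The sketch below shows one pass of such a loop under stated assumptions. The Poll, Lease, and BlockedLease field names (stream, by, at, retry, error) are hypothetical, as are drainOnce, the poll limits, the lease duration, and the retry threshold. Only the four method signatures come from this reference.

```typescript
// Hypothetical shapes: the real Lease and Poll types are defined by Act and may differ.
type Poll = { stream: string; at: number };
type Lease = { stream: string; by: string; at: number; retry: number };
type BlockedLease = Lease & { error: string };

// Structural subset of the store, using the method signatures listed in this reference.
interface LeasingStore {
  poll(lagging: number, leading: number): Promise<Poll[]>;
  lease(leases: Lease[], millis: number): Promise<Lease[]>;
  ack(leases: Lease[]): Promise<Lease[]>;
  block(leases: BlockedLease[]): Promise<BlockedLease[]>;
}

// One pass: poll candidates, lease them, run the handler, then ack or block.
async function drainOnce(
  store: LeasingStore,
  holder: string,
  handle: (lease: Lease) => Promise<number>, // resolves to the new watermark
  maxRetries = 3
): Promise<{ acked: Lease[]; blocked: BlockedLease[] }> {
  const polled = await store.poll(10, 10);
  const requests = polled.map((p) => ({ stream: p.stream, by: holder, at: p.at, retry: 0 }));
  const leased = await store.lease(requests, 10_000);

  const done: Lease[] = [];
  const failed: BlockedLease[] = [];
  for (const lease of leased) {
    try {
      done.push({ ...lease, at: await handle(lease) });
    } catch (err) {
      // Only give up once the retry budget is spent; otherwise let the lease expire.
      if (lease.retry >= maxRetries)
        failed.push({ ...lease, error: err instanceof Error ? err.message : String(err) });
    }
  }
  const acked = done.length ? await store.ack(done) : [];
  const blocked = failed.length ? await store.block(failed) : [];
  return { acked, blocked };
}
```

In this sketch a failed stream that still has retries left is neither acked nor blocked, on the assumption that its lease simply expires and the stream is picked up again by a later poll.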


query()

query<E>(callback, query?): Promise<number>

Defined in: libs/act-pg/src/PostgresStore.ts:197

Query events from the store, optionally filtered by stream, event name, time, etc.

Type Parameters

E

E extends Schemas

Parameters

callback

(event) => void

Function called for each event found

query?

Readonly<{ after?: number; backward?: boolean; before?: number; correlation?: string; created_after?: Date; created_before?: Date; limit?: number; names?: string[]; stream?: string; with_snaps?: boolean; }>

(Optional) Query filter (stream, names, before, after, etc.)

Returns

Promise<number>

The number of events found

Example

await store.query((event) => console.log(event), { stream: "A" });

Implementation of

Store.query
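
Since query delivers events through a callback and resolves to a count, collecting results into an array takes a small wrapper. The sketch below is one way to do that. EventLike, Filter, Queryable, and collect are hypothetical names, and Filter mirrors only a few of the documented filter fields. The code depends only on the query(callback, query?) signature shown above.

```typescript
// Hypothetical event and filter shapes; the filter mirrors a few documented query fields.
type EventLike = { id: number; stream: string; name: string };
type Filter = Readonly<{ stream?: string; names?: string[]; limit?: number }>;

// Structural subset of the store: the documented query(callback, query?) signature.
interface Queryable {
  query(callback: (event: EventLike) => void, query?: Filter): Promise<number>;
}

// Gather matching events into an array instead of handling them one by one.
async function collect(store: Queryable, filter?: Filter): Promise<EventLike[]> {
  const events: EventLike[] = [];
  const count = await store.query((event) => events.push(event), filter);
  // The resolved count should match the number of callback invocations.
  if (count !== events.length)
    throw new Error(`expected ${count} events, got ${events.length}`);
  return events;
}
```

Buffering every event in memory is only reasonable for small result sets; for large streams, handling each event inside the callback avoids holding the full result.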


seed()

seed(): Promise<void>

Defined in: libs/act-pg/src/PostgresStore.ts:87

Seed the database with required tables, indexes, and schema for event storage.

Returns

Promise<void>

Promise that resolves when seeding is complete

Throws

Error if seeding fails

Implementation of

Store.seed
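
The seed, drop, and dispose methods together cover the store's lifecycle. The sketch below shows one possible way to wrap them so that connections are always closed, and so that a test run can remove its schema afterwards. withSeededStore, StoreLifecycle, and the dropAfter flag are hypothetical helpers, not part of the library. Only the three method signatures are taken from this reference.

```typescript
// Structural subset of the store: the three lifecycle methods documented in this reference.
interface StoreLifecycle {
  seed(): Promise<void>;
  drop(): Promise<void>;
  dispose(): Promise<void>;
}

// Run `body` against a freshly seeded store, then clean up even if `body` throws.
async function withSeededStore<T>(
  store: StoreLifecycle,
  body: () => Promise<T>,
  dropAfter = false // set to true in tests to remove the schema afterwards
): Promise<T> {
  try {
    await store.seed();
    try {
      return await body();
    } finally {
      // Drop only runs once seeding has succeeded.
      if (dropAfter) await store.drop();
    }
  } finally {
    // Always close connections, whether seeding or the body failed.
    await store.dispose();
  }
}
```

With a real store this might be called as withSeededStore(new PostgresStore({ schema: "test" }), runTests, true), where runTests is whatever async function exercises the store.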