Skip to main content

PostgresStore

@rotorsoft/act-root


@rotorsoft/act-root / act-pg/src / PostgresStore

Class: PostgresStore

Defined in: libs/act-pg/src/PostgresStore.ts:58

See

  • Store

PostgresStore is a production-ready event store adapter for Act, using PostgreSQL as the backend.

  • Supports event sourcing, leasing, snapshots, and concurrency control.
  • Designed for high-throughput, scalable, and reliable event storage.
  • Implements the Act Store interface.
  • https://github.com/rotorsoft/act-root

Example

import { PostgresStore } from "@act/pg";
const store = new PostgresStore({ schema: "my_schema", table: "events" });
await store.seed();

Implements

Constructors

Constructor

new PostgresStore(config): PostgresStore

Defined in: libs/act-pg/src/PostgresStore.ts:68

Create a new PostgresStore instance.

Parameters

config

Partial<Config> = {}

Partial configuration (host, port, user, password, schema, table, etc.)

Returns

PostgresStore

Properties

config

readonly config: Config

Defined in: libs/act-pg/src/PostgresStore.ts:60

Methods

ack()

ack(leases): Promise<void>

Defined in: libs/act-pg/src/PostgresStore.ts:444

Acknowledge and release leases after processing, updating stream positions.

Parameters

leases

Lease[]

Array of lease objects to acknowledge

Returns

Promise<void>

Promise that resolves when leases are acknowledged

Implementation of

Store.ack


commit()

commit<E>(stream, msgs, meta, expectedVersion?): Promise<Message<E, keyof E> & Readonly<{ created: Date; id: number; meta: Readonly<{ causation: { action?: Readonly<...> & object; event?: { id: number; name: string; stream: string; }; }; correlation: string; }>; stream: string; version: number; }>[]>

Defined in: libs/act-pg/src/PostgresStore.ts:280

Commit new events to the store for a given stream, with concurrency control.

Type Parameters

E

E extends Schemas

Parameters

stream

string

The stream name

msgs

Message<E, keyof E>[]

Array of messages (event name and data)

meta

EventMeta

Event metadata (correlation, causation, etc.)

expectedVersion?

number

(Optional) Expected stream version for concurrency control

Returns

Promise<Message<E, keyof E> & Readonly<{ created: Date; id: number; meta: Readonly<{ causation: { action?: Readonly<...> & object; event?: { id: number; name: string; stream: string; }; }; correlation: string; }>; stream: string; version: number; }>[]>

Array of committed events

Throws

ConcurrencyError if the expected version does not match

Implementation of

Store.commit


dispose()

dispose(): Promise<void>

Defined in: libs/act-pg/src/PostgresStore.ts:79

Dispose of the store and close all database connections.

Returns

Promise<void>

Promise that resolves when all connections are closed

Implementation of

Store.dispose


drop()

drop(): Promise<void>

Defined in: libs/act-pg/src/PostgresStore.ts:166

Drop all tables and schema created by the store (for testing or cleanup).

Returns

Promise<void>

Promise that resolves when the schema is dropped

Implementation of

Store.drop


fetch()

fetch<E>(limit): Promise<{ events: Committed<E, keyof E>[]; streams: string[]; }>

Defined in: libs/act-pg/src/PostgresStore.ts:352

Fetch a batch of events and streams for processing (drain cycle).

Type Parameters

E

E extends Schemas

Parameters

limit

number

The maximum number of events to fetch

Returns

Promise<{ events: Committed<E, keyof E>[]; streams: string[]; }>

An object with arrays of streams and events

Implementation of

Store.fetch


lease()

lease(leases): Promise<object[]>

Defined in: libs/act-pg/src/PostgresStore.ts:382

Lease streams for reaction processing, marking them as in-progress.

Parameters

leases

Lease[]

Array of lease objects (stream, at, etc.)

Returns

Promise<object[]>

Array of leased objects with updated lease info

Implementation of

Store.lease


query()

query<E>(callback, query?, withSnaps?): Promise<number>

Defined in: libs/act-pg/src/PostgresStore.ts:197

Query events from the store, optionally filtered by stream, event name, time, etc.

Type Parameters

E

E extends Schemas

Parameters

callback

(event) => void

Function called for each event found

query?

(Optional) Query filter (stream, names, before, after, etc.)

after?

number = ...

backward?

boolean = ...

before?

number = ...

correlation?

string = ...

created_after?

Date = ...

created_before?

Date = ...

limit?

number = ...

names?

string[] = ...

stream?

string = ...

withSnaps?

boolean = false

(Optional) If true, includes only events after the last snapshot

Returns

Promise<number>

The number of events found

Example

await store.query((event) => console.log(event), { stream: "A" });

Implementation of

Store.query


seed()

seed(): Promise<void>

Defined in: libs/act-pg/src/PostgresStore.ts:88

Seed the database with required tables, indexes, and schema for event storage.

Returns

Promise<void>

Promise that resolves when seeding is complete

Throws

Error if seeding fails

Implementation of

Store.seed