Class: PostgresStore

Defined in: libs/act-pg/src/PostgresStore.ts:139

Production-ready PostgreSQL event store implementation.

PostgresStore provides persistent, scalable event storage using PostgreSQL. It implements the full Store interface with production-grade features:

Features:

  • Persistent event storage with ACID guarantees
  • Optimistic concurrency control via version numbers
  • Distributed stream processing with leasing
  • Snapshot support for performance optimization
  • Connection pooling for scalability
  • Automatic table and index creation

Database Schema:

  • Events table: Stores all committed events
  • Streams table: Tracks stream metadata and leases
  • Indexes on stream, version, and timestamps for fast queries

Examples

import { act, store } from "@rotorsoft/act";
import { PostgresStore } from "@rotorsoft/act-pg";

// Register a PostgresStore as the active store
store(new PostgresStore({
  host: "localhost",
  port: 5432,
  database: "myapp",
  user: "postgres",
  password: "secret"
}));

// Counter is a state definition declared elsewhere
const app = act()
  .withState(Counter)
  .build();

import { PostgresStore } from "@rotorsoft/act-pg";

const pgStore = new PostgresStore({
  host: process.env.DB_HOST || "localhost",
  port: parseInt(process.env.DB_PORT || "5432", 10),
  database: process.env.DB_NAME || "myapp",
  user: process.env.DB_USER || "postgres",
  password: process.env.DB_PASSWORD,
  schema: "events", // Custom schema
  table: "act_events" // Custom table name
});

// Initialize tables
await pgStore.seed();

// PostgresStore uses node-postgres (pg) connection pooling
// Pool is created automatically with default settings
// For custom pool config, use environment variables:
// PGHOST, PGPORT, PGDATABASE, PGUSER, PGPASSWORD
// PGMAXCONNECTIONS, PGIDLETIMEOUT, etc.

const pgStore = new PostgresStore({
  host: "db.example.com",
  port: 5432,
  database: "production",
  user: "app_user",
  password: process.env.DB_PASSWORD
});

// Use separate schemas per tenant
const tenants = ["tenant1", "tenant2", "tenant3"];

for (const tenant of tenants) {
  const tenantStore = new PostgresStore({
    host: "localhost",
    database: "multitenant",
    schema: tenant, // Each tenant gets own schema
    table: "events"
  });
  await tenantStore.seed();
}

// For advanced queries, you can access pg client
// (config is a Partial<Config> defined elsewhere)
const pgStore = new PostgresStore(config);
await pgStore.seed();

// Use the store's query method for standard queries
await pgStore.query(
  (event) => console.log(event),
  { stream: "user-123", limit: 100 }
);

Implements

Store

Constructors

Constructor

new PostgresStore(config?): PostgresStore

Defined in: libs/act-pg/src/PostgresStore.ts:149

Create a new PostgresStore instance.

Parameters

config?

Partial<Config> = {}

Partial configuration (host, port, user, password, schema, table, etc.)

Returns

PostgresStore
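
The `Partial<Config> = {}` signature suggests that omitted fields fall back to defaults. The following is a minimal sketch of that kind of merge, not the library's code: `SketchConfig`, `SKETCH_DEFAULTS`, and `resolveConfig` are hypothetical names, the field list is taken from the examples on this page, and the default values are illustrative assumptions.

```typescript
// Hypothetical config shape, based on the fields used in the examples on this page.
interface SketchConfig {
  host: string;
  port: number;
  database: string;
  user: string;
  password: string;
  schema: string;
  table: string;
}

// Illustrative defaults only; the library's actual defaults may differ.
const SKETCH_DEFAULTS: SketchConfig = {
  host: "localhost",
  port: 5432,
  database: "postgres",
  user: "postgres",
  password: "postgres",
  schema: "public",
  table: "events",
};

// Merge a partial config over the defaults; fields passed by the caller win.
// Note: with object spread, a field explicitly set to undefined also overrides
// the default, so callers should omit unset fields rather than pass undefined.
function resolveConfig(partial: Partial<SketchConfig> = {}): SketchConfig {
  return { ...SKETCH_DEFAULTS, ...partial };
}
```

Under these assumptions, `resolveConfig({ schema: "events" })` keeps the default host and port and only changes the schema.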

Properties

config

readonly config: Config

Defined in: libs/act-pg/src/PostgresStore.ts:141

Methods

ack()

ack(leases): Promise<Lease[]>

Defined in: libs/act-pg/src/PostgresStore.ts:567

Acknowledge and release leases after processing, updating stream positions.

Parameters

leases

Lease[]

Leases to acknowledge, including last processed watermark and lease holder.

Returns

Promise<Lease[]>

Acked leases.

Implementation of

Store.ack
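
To illustrate what acknowledging a lease involves, here is a small in-memory sketch. It is not the PostgresStore implementation: `SketchRow`, `SketchLease`, and the field names `at` (watermark) and `by` (holder) are assumptions, as is the rule that an ack from a worker that does not hold the lease is ignored.

```typescript
// Hypothetical row in a streams table (in-memory stand-in).
interface SketchRow {
  stream: string;
  at: number; // last processed watermark
  leasedBy?: string; // current lease holder, if any
}

// Hypothetical lease shape: stream, last processed watermark, and holder.
interface SketchLease {
  stream: string;
  at: number;
  by: string;
}

// Advance the watermark and release the lease, but only for the current holder.
function ack(rows: SketchRow[], leases: SketchLease[]): SketchLease[] {
  const acked: SketchLease[] = [];
  for (const lease of leases) {
    const row = rows.filter((r) => r.stream === lease.stream)[0];
    // Assumption: acks for unknown streams or from non-holders are skipped.
    if (!row || row.leasedBy !== lease.by) continue;
    row.at = lease.at; // persist the new stream position
    row.leasedBy = undefined; // release the lease
    acked.push(lease);
  }
  return acked;
}
```

The returned array contains only the leases that were actually acknowledged, which mirrors the "Acked leases" return value described above.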


block()

block(leases): Promise<(Lease & object)[]>

Defined in: libs/act-pg/src/PostgresStore.ts:620

Block a stream from further processing after it has repeatedly failed and reached the maximum number of retries with blocking enabled.

Parameters

leases

(Lease & object)[]

Leases to block, including lease holder and last error message.

Returns

Promise<(Lease & object)[]>

Blocked leases.

Implementation of

Store.block
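
The decision that leads up to a block call can be sketched as follows. This is a hedged illustration rather than the library's logic: `SketchLease`, `onFailure`, `maxRetries`, and `blockOnError` are hypothetical names, and the choice to keep retrying when blocking is disabled is an assumption of this sketch.

```typescript
// Hypothetical lease shape with a retry counter.
interface SketchLease {
  stream: string;
  by: string;
  retry: number;
}

// Either retry later, or block the stream with the last error message attached.
type SketchOutcome =
  | { action: "retry"; lease: SketchLease }
  | { action: "block"; lease: SketchLease & { error: string } };

// Decide what to do after a handler failure.
function onFailure(
  lease: SketchLease,
  error: Error,
  maxRetries: number,
  blockOnError: boolean
): SketchOutcome {
  if (blockOnError && lease.retry >= maxRetries) {
    // Max retries reached with blocking enabled: carry the error on the lease.
    return { action: "block", lease: { ...lease, error: error.message } };
  }
  // Otherwise count the attempt and let the stream be retried.
  return { action: "retry", lease: { ...lease, retry: lease.retry + 1 } };
}
```

The `block` branch produces a value of the form `Lease & { error }`, which matches the description of the `leases` parameter: a lease plus the last error message.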


claim()

claim(lagging, leading, by, millis): Promise<Lease[]>

Defined in: libs/act-pg/src/PostgresStore.ts:442

Atomically discovers and leases streams for reaction processing.

Uses FOR UPDATE SKIP LOCKED to implement zero-contention competing consumers:

  • Workers never block each other — locked rows are silently skipped
  • Discovery and locking happen in a single atomic transaction
  • No wasted polls — every returned stream is exclusively owned

Parameters

lagging

number

Max streams from lagging frontier (ascending watermark)

leading

number

Max streams from leading frontier (descending watermark)

by

string

Lease holder identifier (UUID)

millis

number

Lease duration in milliseconds

Returns

Promise<Lease[]>

Leased streams with metadata

Implementation of

Store.claim
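
The dual-frontier selection described by the `lagging` and `leading` parameters can be modeled in memory. The sketch below is an assumption-laden illustration, not the SQL that PostgresStore runs: `SketchStream`, `SketchLease`, and the optional `now` parameter are hypothetical, and filtering out streams with an unexpired lease stands in for both the lease check and the row-level skipping that `FOR UPDATE SKIP LOCKED` provides.

```typescript
// Hypothetical in-memory stream row.
interface SketchStream {
  stream: string;
  at: number; // watermark: last processed position
  leasedBy?: string; // current holder, if any
  leasedUntil?: number; // lease expiry in epoch milliseconds
}

interface SketchLease {
  stream: string;
  at: number;
  by: string;
  until: number;
}

function claim(
  streams: SketchStream[],
  lagging: number,
  leading: number,
  by: string,
  millis: number,
  now: number = Date.now()
): SketchLease[] {
  // Streams with an unexpired lease are skipped instead of waited on.
  const free = streams.filter(
    (s) => s.leasedUntil === undefined || s.leasedUntil <= now
  );
  // Lagging frontier: lowest watermarks first.
  const picked = free
    .slice()
    .sort((a, b) => a.at - b.at)
    .slice(0, lagging);
  // Leading frontier: highest watermarks first, skipping streams already picked.
  const descending = free.slice().sort((a, b) => b.at - a.at);
  let taken = 0;
  for (const s of descending) {
    if (taken >= leading) break;
    if (picked.indexOf(s) === -1) {
      picked.push(s);
      taken++;
    }
  }
  // Record the lease on each picked stream and return it to the caller.
  return picked.map((s) => {
    s.leasedBy = by;
    s.leasedUntil = now + millis;
    return { stream: s.stream, at: s.at, by, until: now + millis };
  });
}
```

In the real store, discovery and locking happen inside one database transaction; this model is single-threaded and only shows which streams would be selected and how a lease expiry makes them claimable again.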


commit()

commit<E>(stream, msgs, meta, expectedVersion?): Promise<Committed<E, keyof E>[]>

Defined in: libs/act-pg/src/PostgresStore.ts:361

Commit new events to the store for a given stream, with concurrency control.

Type Parameters

E

E extends Schemas

Parameters

stream

string

The stream name

msgs

Message<E, keyof E>[]

Array of messages (event name and data)

meta

EventMeta

Event metadata (correlation, causation, etc.)

expectedVersion?

number

(Optional) Expected stream version for concurrency control

Returns

Promise<Committed<E, keyof E>[]>

Array of committed events

Throws

ConcurrencyError if the expected version does not match

Implementation of

Store.commit
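
The version check behind `expectedVersion` is an instance of optimistic concurrency control. The following in-memory sketch shows the idea under stated assumptions: `SketchStore`, `SketchEvent`, and `SketchConcurrencyError` are hypothetical stand-ins (the real error type is `ConcurrencyError`), versions are assumed to be zero-based, and the method is synchronous here for brevity while the real `commit` returns a promise.

```typescript
// Hypothetical error type standing in for the library's ConcurrencyError.
class SketchConcurrencyError extends Error {
  constructor(stream: string, expected: number, actual: number) {
    super(`Stream ${stream}: expected version ${expected}, found ${actual}`);
    this.name = "SketchConcurrencyError";
  }
}

interface SketchEvent {
  stream: string;
  version: number; // zero-based position within the stream (assumption)
  name: string;
  data: unknown;
}

class SketchStore {
  private readonly events: SketchEvent[] = [];

  // Version of the last event in the stream, or -1 when the stream is empty.
  currentVersion(stream: string): number {
    return this.events.filter((e) => e.stream === stream).length - 1;
  }

  commit(
    stream: string,
    msgs: { name: string; data: unknown }[],
    expectedVersion?: number
  ): SketchEvent[] {
    const actual = this.currentVersion(stream);
    // Enforce the check only when the caller supplies an expected version.
    if (expectedVersion !== undefined && expectedVersion !== actual) {
      throw new SketchConcurrencyError(stream, expectedVersion, actual);
    }
    // Assign consecutive versions to the new events and append them.
    const committed = msgs.map((msg, i) => ({
      stream,
      version: actual + 1 + i,
      name: msg.name,
      data: msg.data,
    }));
    this.events.push(...committed);
    return committed;
  }
}
```

Two writers that both read version 0 and both try to commit with `expectedVersion` 0 cannot both succeed: the second one sees version 1 and gets the error, at which point it can reload the stream and retry.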


dispose()

dispose(): Promise<void>

Defined in: libs/act-pg/src/PostgresStore.ts:161

Dispose of the store and close all database connections.

Returns

Promise<void>

Promise that resolves when all connections are closed

Implementation of

Store.dispose
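
Because `dispose()` closes all database connections, it is usually worth guaranteeing that it runs even when the surrounding work throws. A minimal sketch of that pattern follows; `SketchDisposable` and `withStore` are hypothetical helpers, not part of the library, and rely only on the `dispose(): Promise<void>` signature shown above.

```typescript
// Anything with a dispose() method of the shape documented above.
interface SketchDisposable {
  dispose(): Promise<void>;
}

// Run some work against a store and always dispose of it afterwards.
async function withStore<S extends SketchDisposable, R>(
  store: S,
  work: (store: S) => Promise<R>
): Promise<R> {
  try {
    return await work(store);
  } finally {
    // Runs on success and on failure, so connections are not leaked.
    await store.dispose();
  }
}
```

A script or test could then wrap its body as `await withStore(new PostgresStore(config), async (s) => { await s.seed(); })`, where `config` is defined elsewhere.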


drop()

drop(): Promise<void>

Defined in: libs/act-pg/src/PostgresStore.ts:250

Drop all tables and schema created by the store (for testing or cleanup).

Returns

Promise<void>

Promise that resolves when the schema is dropped

Implementation of

Store.drop


query()

query<E>(callback, query?): Promise<number>

Defined in: libs/act-pg/src/PostgresStore.ts:280

Query events from the store, optionally filtered by stream, event name, time, etc.

Type Parameters

E

E extends Schemas

Parameters

callback

(event) => void

Function called for each event found

query?

Readonly<{ after?: number; backward?: boolean; before?: number; correlation?: string; created_after?: Date; created_before?: Date; limit?: number; names?: string[]; stream?: string; with_snaps?: boolean; }>

(Optional) Query filter (stream, names, before, after, etc.)

Returns

Promise<number>

The number of events found

Example

await store.query((event) => console.log(event), { stream: "A" });

Implementation of

Store.query
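
Since `query` delivers events through a callback and resolves to a count, callers that want an array have to collect the events themselves. The helper below is a small sketch of that pattern; `SketchQuery` and `collect` are hypothetical names that only mirror the callback signature documented above.

```typescript
// Minimal shape of a callback-style query function, mirroring the signature above.
type SketchQuery<E, Q> = (
  callback: (event: E) => void,
  query?: Q
) => Promise<number>;

// Gather every event passed to the callback into an array.
async function collect<E, Q>(
  query: SketchQuery<E, Q>,
  filter?: Q
): Promise<E[]> {
  const events: E[] = [];
  await query((event) => {
    events.push(event);
  }, filter);
  return events;
}
```

With a store instance this might be called as `await collect((cb, q) => pgStore.query(cb, q), { stream: "user-123", limit: 100 })`; wrapping the method in an arrow function keeps its `this` binding intact.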


seed()

seed(): Promise<void>

Defined in: libs/act-pg/src/PostgresStore.ts:170

Seed the database with required tables, indexes, and schema for event storage.

Returns

Promise<void>

Promise that resolves when seeding is complete

Throws

Error if seeding fails

Implementation of

Store.seed


subscribe()

subscribe(streams): Promise<{ subscribed: number; watermark: number; }>

Defined in: libs/act-pg/src/PostgresStore.ts:528

Registers streams for event processing. Upserts stream entries so they become visible to claim(). Also returns the current max watermark across all subscriptions.

Parameters

streams

object[]

Streams to register with optional source.

Returns

Promise<{ subscribed: number; watermark: number; }>

The subscribed count and the current max watermark.

Implementation of

Store.subscribe
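
The upsert behavior of `subscribe()` can be pictured with an in-memory table. This is a sketch under assumptions, not the library's SQL: `SketchSubscription` is a hypothetical row shape, new streams are assumed to start at watermark -1, and `subscribed` is assumed to count only newly inserted streams.

```typescript
// Hypothetical row in the streams table.
interface SketchSubscription {
  stream: string;
  source?: string;
  at: number; // watermark
}

function subscribe(
  table: SketchSubscription[],
  streams: { stream: string; source?: string }[]
): { subscribed: number; watermark: number } {
  let subscribed = 0;
  for (const { stream, source } of streams) {
    const existing = table.filter((row) => row.stream === stream)[0];
    if (existing) {
      // Upsert: the row already exists, so only refresh its source if one was given.
      if (source !== undefined) existing.source = source;
    } else {
      // Insert: new streams start before the first event (assumption).
      table.push({ stream, source, at: -1 });
      subscribed++;
    }
  }
  // Highest watermark across all registered streams.
  const watermark = table.reduce((max, row) => Math.max(max, row.at), -1);
  return { subscribed, watermark };
}
```

Calling it twice with the same stream names leaves the table unchanged the second time, which is the property that makes registration safe to repeat at startup.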