Class: PostgresStore

Defined in: libs/act-pg/src/postgres-store.ts:212

Production-ready PostgreSQL event store implementation.

PostgresStore provides persistent, scalable event storage using PostgreSQL. It implements the full Store interface with production-grade features:

Features:

  • Persistent event storage with ACID guarantees
  • Optimistic concurrency control via version numbers
  • Distributed stream processing with leasing
  • Snapshot support for performance optimization
  • Connection pooling for scalability
  • Automatic table and index creation

Database Schema:

  • Events table: Stores all committed events
  • Streams table: Tracks stream metadata and leases
  • Indexes on stream, version, and timestamps for fast queries

Examples

// Basic setup: register PostgresStore as the application's event store
import { act, store } from "@rotorsoft/act";
import { PostgresStore } from "@rotorsoft/act-pg";

store(new PostgresStore({
  host: "localhost",
  port: 5432,
  database: "myapp",
  user: "postgres",
  password: "secret"
}));

// Counter is a state definition declared elsewhere
const app = act()
  .withState(Counter)
  .build();

// Custom schema and table, configured from environment variables
import { PostgresStore } from "@rotorsoft/act-pg";

const pgStore = new PostgresStore({
  host: process.env.DB_HOST || "localhost",
  port: parseInt(process.env.DB_PORT || "5432", 10),
  database: process.env.DB_NAME || "myapp",
  user: process.env.DB_USER || "postgres",
  password: process.env.DB_PASSWORD,
  schema: "events", // Custom schema
  table: "act_events" // Custom table name
});

// Initialize tables
await pgStore.seed();

// PostgresStore uses node-postgres (pg) connection pooling
// Pool is created automatically with default settings
// For custom pool config, use environment variables:
// PGHOST, PGPORT, PGDATABASE, PGUSER, PGPASSWORD
// PGMAXCONNECTIONS, PGIDLETIMEOUT, etc.

const pgStore = new PostgresStore({
  host: "db.example.com",
  port: 5432,
  database: "production",
  user: "app_user",
  password: process.env.DB_PASSWORD
});

// Use separate schemas per tenant
const tenants = ["tenant1", "tenant2", "tenant3"];

for (const tenant of tenants) {
  const tenantStore = new PostgresStore({
    host: "localhost",
    database: "multitenant",
    schema: tenant, // Each tenant gets its own schema
    table: "events"
  });
  await tenantStore.seed();
}

// For advanced queries, you can access the pg client
// (config is a Partial<Config>, as in the examples above)
const pgStore = new PostgresStore(config);
await pgStore.seed();

// Use the store's query method for standard queries
await pgStore.query(
  (event) => console.log(event),
  { stream: "user-123", limit: 100 }
);

See

Implements

Constructors

Constructor

new PostgresStore(config?): PostgresStore

Defined in: libs/act-pg/src/postgres-store.ts:257

Create a new PostgresStore instance.

Parameters

config?

Partial<Config> = {}

Partial configuration (host, port, user, password, schema, table, etc.)

Returns

PostgresStore

Properties

config

readonly config: Config

Defined in: libs/act-pg/src/postgres-store.ts:214


notify?

optional notify?: (handler) => Promise<NotifyDisposer>

Defined in: libs/act-pg/src/postgres-store.ts:249

Cross-process commit subscription. Present only when config.notify === true. The orchestrator's auto-wire path checks if (store.notify), so omitting the method keeps single-instance deployments free of any LISTEN/NOTIFY overhead (no dedicated client, no per-commit pg_notify).

Parameters

handler

(notification) => void

Returns

Promise<NotifyDisposer>

See

Config.notify for the rationale and the multi-process contract.

Implementation of

Store.notify
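The presence check described for notify can be sketched as follows. This is a minimal illustration, not the orchestrator's actual wiring: the types `SketchNotification`, `SketchDisposer`, and `NotifyingStore`, and the function `wireNotifications`, are hypothetical stand-ins, and the real payload and disposer shapes come from the library.

```typescript
// Hypothetical, simplified shapes for illustration only.
type SketchNotification = { stream: string };
type SketchDisposer = () => Promise<void>;

interface NotifyingStore {
  // Optional: only present when cross-process notifications are enabled.
  notify?: (
    handler: (notification: SketchNotification) => void
  ) => Promise<SketchDisposer>;
}

// Subscribes only when the store exposes notify; otherwise does nothing,
// so a single-instance deployment never opens a LISTEN client.
async function wireNotifications(
  store: NotifyingStore,
  onCommit: (notification: SketchNotification) => void
): Promise<SketchDisposer | undefined> {
  if (!store.notify) return undefined;
  return store.notify(onCommit);
}
```

The caller keeps the returned disposer (when there is one) and invokes it on shutdown.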

Methods

ack()

ack(leases): Promise<Lease[]>

Defined in: libs/act-pg/src/postgres-store.ts:806

Acknowledge and release leases after processing, updating stream positions.

Parameters

leases

Lease[]

Leases to acknowledge, including last processed watermark and lease holder.

Returns

Promise<Lease[]>

Acked leases.

Implementation of

Store.ack


block()

block(leases): Promise<BlockedLease[]>

Defined in: libs/act-pg/src/postgres-store.ts:861

Block a stream from further processing after it has failed and reached the maximum number of retries with blocking enabled.

Parameters

leases

BlockedLease[]

Leases to block, including lease holder and last error message.

Returns

Promise<BlockedLease[]>

Blocked leases.

Implementation of

Store.block


claim()

claim(lagging, leading, by, millis, lane?): Promise<Lease[]>

Defined in: libs/act-pg/src/postgres-store.ts:632

Atomically discovers and leases streams for reaction processing.

Uses FOR UPDATE SKIP LOCKED to implement zero-contention competing consumers:

  • Workers never block each other: locked rows are silently skipped
  • Discovery and locking happen in a single atomic transaction
  • No wasted polls: every returned stream is exclusively owned

Parameters

lagging

number

Max streams from lagging frontier (ascending watermark)

leading

number

Max streams from leading frontier (descending watermark)

by

string

Lease holder identifier (UUID)

millis

number

Lease duration in milliseconds

lane?

string

Returns

Promise<Lease[]>

Leased streams with metadata

Implementation of

Store.claim
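To make the lagging and leading parameters concrete, here is a rough in-memory model of the selection. It is only a sketch under stated assumptions: `LeaseRow` and `claimSketch` are hypothetical names, the explicit `now` argument exists only to keep the example deterministic, and the real store does this in SQL with FOR UPDATE SKIP LOCKED inside one transaction. The model also ignores lanes, priorities, and whether a stream has pending events.

```typescript
// Hypothetical in-memory model of a streams table row.
type LeaseRow = {
  stream: string;
  at: number; // watermark
  leasedBy?: string;
  leasedUntil?: number; // epoch milliseconds
};

function claimSketch(
  rows: LeaseRow[],
  lagging: number,
  leading: number,
  by: string,
  millis: number,
  now: number
): LeaseRow[] {
  // Rows under an unexpired lease are skipped, never waited on.
  const free = rows.filter(
    (r) => r.leasedUntil === undefined || r.leasedUntil <= now
  );

  // Lagging frontier: lowest watermarks first.
  const fromBottom = free
    .slice()
    .sort((a, b) => a.at - b.at)
    .slice(0, lagging);

  // Leading frontier: highest watermarks first, skipping rows already picked.
  const fromTop = free
    .slice()
    .sort((a, b) => b.at - a.at)
    .filter((r) => fromBottom.indexOf(r) === -1)
    .slice(0, leading);

  // Stamp every selected row with the lease holder and expiry.
  const leased = fromBottom.concat(fromTop);
  for (const r of leased) {
    r.leasedBy = by;
    r.leasedUntil = now + millis;
  }
  return leased;
}
```

Two workers calling this model at the same instant receive disjoint sets of streams, which is the property the SKIP LOCKED query provides across processes.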


commit()

commit<E>(stream, msgs, meta, expectedVersion?): Promise<Committed<E, keyof E>[]>

Defined in: libs/act-pg/src/postgres-store.ts:532

Commit new events to the store for a given stream, with concurrency control.

Type Parameters

E

E extends Schemas

Parameters

stream

string

The stream name

msgs

Message<E, keyof E>[]

Array of messages (event name and data)

meta

EventMeta

Event metadata (correlation, causation, etc.)

expectedVersion?

number

(Optional) Expected stream version for concurrency control

Returns

Promise<Committed<E, keyof E>[]>

Array of committed events

Throws

ConcurrencyError if the expected version does not match

Implementation of

Store.commit
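The expectedVersion check is a standard optimistic concurrency pattern. The sketch below models it in memory; `OptimisticLog`, `LoggedEvent`, and `VersionMismatch` are hypothetical names, and the numbering convention (first event at version 0, empty stream at version -1) is an assumption made for the illustration rather than something this page states.

```typescript
// Hypothetical stand-in for the library's ConcurrencyError.
class VersionMismatch extends Error {}

type LoggedEvent = {
  stream: string;
  version: number;
  name: string;
  data: unknown;
};

class OptimisticLog {
  private events: LoggedEvent[] = [];

  commit(
    stream: string,
    msgs: { name: string; data: unknown }[],
    expectedVersion?: number
  ): LoggedEvent[] {
    // Assumed convention: an empty stream is at version -1.
    const current =
      this.events.filter((e) => e.stream === stream).length - 1;

    // Reject the write if another writer advanced the stream first.
    if (expectedVersion !== undefined && expectedVersion !== current) {
      throw new VersionMismatch(
        `expected version ${expectedVersion}, found ${current}`
      );
    }

    // Assign consecutive versions to the new events.
    const committed = msgs.map((m, i) => ({
      stream,
      version: current + 1 + i,
      name: m.name,
      data: m.data,
    }));
    this.events.push(...committed);
    return committed;
  }
}
```

A writer that loads a stream at version n and commits with expectedVersion n succeeds only if nobody else committed in between; on a mismatch the usual response is to reload the state and retry.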


dispose()

dispose(): Promise<void>

Defined in: libs/act-pg/src/postgres-store.ts:279

Dispose of the store and close all database connections. Releases any active LISTEN client first so the pool can drain cleanly.

Returns

Promise<void>

Promise that resolves when all connections are closed

Implementation of

Store.dispose


drop()

drop(): Promise<void>

Defined in: libs/act-pg/src/postgres-store.ts:417

Drop all tables and schema created by the store (for testing or cleanup).

Returns

Promise<void>

Promise that resolves when the schema is dropped

Implementation of

Store.drop


prioritize()

prioritize(filter, priority): Promise<number>

Defined in: libs/act-pg/src/postgres-store.ts:1032

Bulk-update priority of streams matching filter (ACT-102).

Filter semantics mirror query_streams: regex on stream / source by default, exact match with the _exact flags, blocked restricts to blocked or unblocked rows. Empty filter ({}) updates every registered stream.

Unlike subscribe (which keeps max() of registered priorities), this sets the priority outright, as an operator override for the build-time scheduling policy.

Parameters

filter

StreamFilter

priority

number

Returns

Promise<number>

Count of streams whose priority changed.

Implementation of

Store.prioritize
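The filter semantics can be illustrated with a small in-memory model. This is a sketch, not the store's SQL: `PriorityRow`, `SketchFilter`, `matchesFilter`, and `prioritizeSketch` are hypothetical names, the `source_exact` field is inferred from the "_exact flags" wording, and counting only rows whose value actually changes is an assumption based on the documented return value.

```typescript
// Hypothetical in-memory model of a subscribed stream.
type PriorityRow = {
  stream: string;
  source?: string;
  blocked: boolean;
  priority: number;
};

// Assumed filter shape, following the semantics described above.
type SketchFilter = {
  stream?: string;
  stream_exact?: boolean;
  source?: string;
  source_exact?: boolean;
  blocked?: boolean;
};

function matchesFilter(row: PriorityRow, f: SketchFilter): boolean {
  // Regex match by default, exact string match when the flag is set.
  const test = (
    value: string | undefined,
    pattern: string | undefined,
    exact?: boolean
  ): boolean =>
    pattern === undefined ||
    (value !== undefined &&
      (exact ? value === pattern : new RegExp(pattern).test(value)));

  return (
    test(row.stream, f.stream, f.stream_exact) &&
    test(row.source, f.source, f.source_exact) &&
    (f.blocked === undefined || row.blocked === f.blocked)
  );
}

// Sets the priority outright on every matching row and
// returns how many rows ended up with a different value.
function prioritizeSketch(
  rows: PriorityRow[],
  f: SketchFilter,
  priority: number
): number {
  let changed = 0;
  for (const row of rows) {
    if (matchesFilter(row, f) && row.priority !== priority) {
      row.priority = priority;
      changed++;
    }
  }
  return changed;
}
```

Note that an empty filter matches every row in this model, mirroring the documented behavior of `{}`.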


query()

query<E>(callback, query?): Promise<number>

Defined in: libs/act-pg/src/postgres-store.ts:447

Query events from the store, optionally filtered by stream, event name, time, etc.

Type Parameters

E

E extends Schemas

Parameters

callback

(event) => void

Function called for each event found

query?

Readonly<{ after?: number; backward?: boolean; before?: number; correlation?: string; created_after?: Date; created_before?: Date; limit?: number; names?: string[]; stream?: string; stream_exact?: boolean; with_snaps?: boolean; }>

(Optional) Query filter (stream, names, before, after, etc.)

Returns

Promise<number>

The number of events found

Example

await store.query((event) => console.log(event), { stream: "A" });

Implementation of

Store.query


query_stats()

query_stats<E>(input, options?): Promise<Map<string, StreamStats<E>>>

Defined in: libs/act-pg/src/postgres-store.ts:1162

Per-stream aggregated stats; see Store.query_stats.

Two code paths chosen by the requested stats:

  • Heads-only path (no count, no names): one or two SELECT DISTINCT ON (stream) ... ORDER BY stream, version DESC|ASC queries, executed in parallel when tail: true. The (stream, version) unique index gives index-only access: K rows touched per query (K = matched streams), not N (events). Ordering by version (not id) is equivalent within a stream (versions are monotonic per stream and events are committed sequentially) and is the column actually indexed.

  • Full-scan path (count or names set): one CTE materializes the filtered events, then GROUP BY stream, name → jsonb_object_agg(name, n) for the names map plus per-stream COUNT(*) for count. Heads (and tails when requested) come from DISTINCT ON over the same CTE, so they ride free on the already-paid scan.

The stream universe is derived from the events table: filter form matches event-bearing streams (not subscription rows). When the filter sets source or blocked, the events table is joined against the streams subscription table since those concepts only exist for subscribed streams.

Type Parameters

E

E extends Schemas

Parameters

input

string[] | Pick<StreamFilter, "stream" | "stream_exact">

options?

QueryStatsOptions<E>

Returns

Promise<Map<string, StreamStats<E>>>

Implementation of

Store.query_stats
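As a rough picture of what the two paths compute, the following in-memory sketch derives the same kind of per-stream summary from a flat event list. Everything here is hypothetical: `StatEvent`, `StatOptions`, `StatSketch`, and `statsSketch` are illustrative names, the result fields are not the library's StreamStats shape, and treating the head as the highest version and the tail as the lowest is an assumption read from the DESC|ASC ordering described above.

```typescript
// Hypothetical shapes for illustration only.
type StatEvent = { stream: string; version: number; name: string };
type StatOptions = { tail?: boolean; count?: boolean; names?: boolean };
type StatSketch = {
  head: StatEvent; // assumed: event with the highest version
  tail?: StatEvent; // assumed: event with the lowest version
  count?: number;
  names?: Record<string, number>;
};

function statsSketch(
  events: StatEvent[],
  opts: StatOptions = {}
): Map<string, StatSketch> {
  const out = new Map<string, StatSketch>();
  for (const e of events) {
    let s = out.get(e.stream);
    if (!s) {
      // First event seen for this stream seeds head (and tail if requested).
      s = { head: e };
      if (opts.tail) s.tail = e;
      out.set(e.stream, s);
    } else {
      if (e.version > s.head.version) s.head = e;
      if (s.tail && e.version < s.tail.version) s.tail = e;
    }
    // Aggregates are only computed when requested (the "full-scan" case).
    if (opts.count) s.count = (s.count ?? 0) + 1;
    if (opts.names) {
      const names = s.names ?? (s.names = {});
      names[e.name] = (names[e.name] ?? 0) + 1;
    }
  }
  return out;
}
```

The in-memory version always walks every event; the point of the heads-only SQL path is that the index lets the database avoid doing so when no aggregates are requested.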


query_streams()

query_streams(callback, query?): Promise<QueryStreamsResult>

Defined in: libs/act-pg/src/postgres-store.ts:1050

Streams subscription positions to a callback, ordered by stream name, along with the highest event id in the store.

Filters (stream, source, blocked, after, limit) are applied server-side. stream/source are regex by default (~), or exact with *_exact: true, the same convention as Store.query.

Parameters

callback

(position) => void

query?

QueryStreams

Returns

Promise<QueryStreamsResult>

maxEventId and the count of positions emitted.

Implementation of

Store.query_streams


reset()

reset(input): Promise<number>

Defined in: libs/act-pg/src/postgres-store.ts:958

Resets watermarks for the given streams to -1, making them eligible for replay from the beginning. Also clears retry, blocked, error, and lease state so the streams can be claimed immediately.

Prefer Act.reset() over calling this directly. This primitive only resets the store; it does not raise the orchestrator's internal "needs drain" flag, so a settled Act instance will short-circuit and skip the replay. Act.reset() wraps this and arms the flag.

Accepts either an explicit list of stream names or a StreamFilter for bulk operations (e.g., "rebuild every blocked stream"). The filter form is the same shape used by prioritize and query_streams. An empty filter ({}) matches every registered stream, which is typically a footgun for reset; prefer narrower filters like { blocked: true }.

Parameters

input

string[] | StreamFilter

Stream names or a StreamFilter

Returns

Promise<number>

Count of streams that were actually reset

Example

// By name
await app.reset(["my-projection"]);

// By filter: rebuild every blocked stream in a projection family
await app.reset({ stream: "^proj-orders-", blocked: true });

// Low-level (does NOT trigger replay on settled apps)
await store().reset(["my-projection"]);

See

Act.reset for the high-level rebuild API that wraps this primitive and arms the orchestrator's drain flag

Implementation of

Store.reset


restore()

restore(driver): Promise<void>

Defined in: libs/act-pg/src/postgres-store.ts:1554

Atomically wipe-and-rebuild the store inside a single BEGIN/COMMIT transaction.

If the driver throws, the transaction rolls back and the store ends byte-for-byte unchanged. TRUNCATE ... RESTART IDENTITY CASCADE wipes events and resets the serial sequence to 1; the streams table is cleared in the same statement via CASCADE-like DELETE. Events are inserted one at a time with explicit columns (skipping id) so the serial assigns dense ids from 1. created is preserved verbatim from the source.

Parameters

driver

(callback) => Promise<void>

Returns

Promise<void>

Implementation of

Store.restore
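The all-or-nothing contract can be modeled in memory as below. This is a sketch under assumptions: `RestoreSketchStore`, `SourceEvent`, and `RestoredEvent` are hypothetical names, and the callback is assumed to accept one event at a time and return a promise, which this page does not spell out. A staging array plays the role of the open transaction.

```typescript
// Hypothetical shapes for illustration only.
type SourceEvent = { stream: string; name: string; created: Date };
type RestoredEvent = SourceEvent & { id: number };

class RestoreSketchStore {
  events: RestoredEvent[] = [];

  async restore(
    driver: (callback: (e: SourceEvent) => Promise<void>) => Promise<void>
  ): Promise<void> {
    // The staging array stands in for the open transaction.
    const staged: RestoredEvent[] = [];
    await driver(async (e) => {
      // Ids are reassigned densely from 1; created is kept as supplied.
      staged.push({ ...e, id: staged.length + 1 });
    });
    // "COMMIT": reached only if the driver did not throw, so a failed
    // restore leaves this.events exactly as it was.
    this.events = staged;
  }
}
```

A driver typically reads events from a backup source and passes each one to the callback in order; throwing at any point abandons the staged events.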


seed()

seed(): Promise<void>

Defined in: libs/act-pg/src/postgres-store.ts:311

Seed the database with required tables, indexes, and schema for event storage.

Returns

Promise<void>

Promise that resolves when seeding is complete

Throws

Error if seeding fails

Implementation of

Store.seed


subscribe()

subscribe(streams): Promise<{ subscribed: number; watermark: number; }>

Defined in: libs/act-pg/src/postgres-store.ts:731

Registers streams for event processing. Upserts stream entries so they become visible to claim(). Also returns the current max watermark across all subscriptions.

Parameters

streams

object[]

Streams to register with optional source.

Returns

Promise<{ subscribed: number; watermark: number; }>

subscribed count and current max watermark.

Implementation of

Store.subscribe


truncate()

truncate(targets): Promise<Map<string, { committed: Committed<Schemas, string>; deleted: number; }>>

Defined in: libs/act-pg/src/postgres-store.ts:1487

Atomically truncates streams and seeds each with a snapshot or tombstone.

Parameters

targets

object[]

Streams to truncate with optional snapshot state and meta.

Returns

Promise<Map<string, { committed: Committed<Schemas, string>; deleted: number; }>>

Map keyed by stream name, each entry with deleted count and committed event.

Implementation of

Store.truncate


unblock()

unblock(input): Promise<number>

Defined in: libs/act-pg/src/postgres-store.ts:992

Clears the blocked flag (and retry / error / lease state) on streams without touching the at watermark. A blocked = true predicate is always applied, so the return count reflects only streams that were actually flipped; already-unblocked rows, unknown streams, and filter matches that aren't blocked are silently skipped.

retry = -1 matches the InMemoryStore convention: claim() bumps retry on every acquisition, so storing -1 means the first claim after unblock returns retry=0 ("first attempt"). Storing 0 would mis-report the post-recovery attempt as a continuation of the failed sequence. See Store.unblock.

Parameters

input

string[] | StreamFilter

Returns

Promise<number>

Count of streams that were actually flipped (were blocked).

Implementation of

Store.unblock
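The retry = -1 convention is easiest to see in a tiny model. The sketch below is illustrative only: `BlockedRow`, `unblockSketch`, and `claimAttempt` are hypothetical names, and the model covers only the name-list form of the input, not the StreamFilter form.

```typescript
// Hypothetical in-memory model of a subscribed stream row.
type BlockedRow = {
  stream: string;
  at: number; // watermark, left untouched by unblock
  blocked: boolean;
  retry: number;
  error?: string;
};

// Clears blocked state on the named streams; counts only rows
// that were actually blocked before the call.
function unblockSketch(rows: BlockedRow[], names: string[]): number {
  let flipped = 0;
  for (const row of rows) {
    if (names.indexOf(row.stream) === -1 || !row.blocked) continue;
    row.blocked = false;
    row.retry = -1; // the next claim bumps this to 0 ("first attempt")
    delete row.error;
    flipped++;
  }
  return flipped;
}

// Models the bump that claim() applies on every acquisition.
function claimAttempt(row: BlockedRow): number {
  row.retry += 1;
  return row.retry;
}
```

Had unblock stored 0 instead, the first claim afterwards would report retry 1 and look like a continuation of the failed sequence.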