PostgresStore
@rotorsoft/act-root / act-pg/src / PostgresStore
Class: PostgresStore
Defined in: libs/act-pg/src/postgres-store.ts:212
Production-ready PostgreSQL event store implementation.
PostgresStore provides persistent, scalable event storage using PostgreSQL. It implements the full Store interface with production-grade features:
Features:
- Persistent event storage with ACID guarantees
- Optimistic concurrency control via version numbers
- Distributed stream processing with leasing
- Snapshot support for performance optimization
- Connection pooling for scalability
- Automatic table and index creation
Database Schema:
- Events table: Stores all committed events
- Streams table: Tracks stream metadata and leases
- Indexes on stream, version, and timestamps for fast queries
Examplesโ
import { store } from "@rotorsoft/act";
import { PostgresStore } from "@rotorsoft/act-pg";
store(new PostgresStore({
host: "localhost",
port: 5432,
database: "myapp",
user: "postgres",
password: "secret"
}));
const app = act()
.withState(Counter)
.build();
import { PostgresStore } from "@rotorsoft/act-pg";
const pgStore = new PostgresStore({
host: process.env.DB_HOST || "localhost",
port: parseInt(process.env.DB_PORT || "5432"),
database: process.env.DB_NAME || "myapp",
user: process.env.DB_USER || "postgres",
password: process.env.DB_PASSWORD,
schema: "events", // Custom schema
table: "act_events" // Custom table name
});
// Initialize tables
await pgStore.seed();
// PostgresStore uses node-postgres (pg) connection pooling
// Pool is created automatically with default settings
// For custom pool config, use environment variables:
// PGHOST, PGPORT, PGDATABASE, PGUSER, PGPASSWORD
// PGMAXCONNECTIONS, PGIDLETIMEOUT, etc.
const pgStore = new PostgresStore({
host: "db.example.com",
port: 5432,
database: "production",
user: "app_user",
password: process.env.DB_PASSWORD
});
// Use separate schemas per tenant
const tenants = ["tenant1", "tenant2", "tenant3"];
for (const tenant of tenants) {
const tenantStore = new PostgresStore({
host: "localhost",
database: "multitenant",
schema: tenant, // Each tenant gets own schema
table: "events"
});
await tenantStore.seed();
}
// For advanced queries, you can access pg client
const pgStore = new PostgresStore(config);
await pgStore.seed();
// Use the store's query method for standard queries
await pgStore.query(
(event) => console.log(event),
{ stream: "user-123", limit: 100 }
);
Seeโ
- Store for the interface definition
- InMemoryStore for development/testing
- store for injecting stores
- node-postgres documentation
Implementsโ
Constructorsโ
Constructorโ
new PostgresStore(
config?):PostgresStore
Defined in: libs/act-pg/src/postgres-store.ts:257
Create a new PostgresStore instance.
Parametersโ
config?โ
Partial<Config> = {}
Partial configuration (host, port, user, password, schema, table, etc.)
Returnsโ
PostgresStore
Propertiesโ
configโ
readonlyconfig:Config
Defined in: libs/act-pg/src/postgres-store.ts:214
notify?โ
optionalnotify?: (handler) =>Promise<NotifyDisposer>
Defined in: libs/act-pg/src/postgres-store.ts:249
Cross-process commit subscription. Present only when
config.notify === true โ the orchestrator's auto-wire path
checks if (store.notify), so omitting the method keeps
single-instance deployments free of any LISTEN/NOTIFY overhead
(no dedicated client, no per-commit pg_notify).
Parametersโ
handlerโ
(notification) => void
Returnsโ
Promise<NotifyDisposer>
Seeโ
Config.notify for the rationale and the multi-process contract.
Implementation ofโ
Methodsโ
ack()โ
ack(
leases):Promise<Lease[]>
Defined in: libs/act-pg/src/postgres-store.ts:806
Acknowledge and release leases after processing, updating stream positions.
Parametersโ
leasesโ
Lease[]
Leases to acknowledge, including last processed watermark and lease holder.
Returnsโ
Promise<Lease[]>
Acked leases.
Implementation ofโ
block()โ
block(
leases):Promise<BlockedLease[]>
Defined in: libs/act-pg/src/postgres-store.ts:861
Block a stream for processing after failing to process and reaching max retries with blocking enabled.
Parametersโ
leasesโ
Leases to block, including lease holder and last error message.
Returnsโ
Promise<BlockedLease[]>
Blocked leases.
Implementation ofโ
claim()โ
claim(
lagging,leading,by,millis,lane?):Promise<Lease[]>
Defined in: libs/act-pg/src/postgres-store.ts:632
Atomically discovers and leases streams for reaction processing.
Uses FOR UPDATE SKIP LOCKED to implement zero-contention competing consumers:
- Workers never block each other โ locked rows are silently skipped
- Discovery and locking happen in a single atomic transaction
- No wasted polls โ every returned stream is exclusively owned
Parametersโ
laggingโ
number
Max streams from lagging frontier (ascending watermark)
leadingโ
number
Max streams from leading frontier (descending watermark)
byโ
string
Lease holder identifier (UUID)
millisโ
number
Lease duration in milliseconds
lane?โ
string
Returnsโ
Promise<Lease[]>
Leased streams with metadata
Implementation ofโ
commit()โ
commit<
E>(stream,msgs,meta,expectedVersion?):Promise<Committed<E, keyofE>[]>
Defined in: libs/act-pg/src/postgres-store.ts:532
Commit new events to the store for a given stream, with concurrency control.
Type Parametersโ
Eโ
E extends Schemas
Parametersโ
streamโ
string
The stream name
msgsโ
Message<E, keyof E>[]
Array of messages (event name and data)
metaโ
Event metadata (correlation, causation, etc.)
expectedVersion?โ
number
(Optional) Expected stream version for concurrency control
Returnsโ
Promise<Committed<E, keyof E>[]>
Array of committed events
Throwsโ
ConcurrencyError if the expected version does not match
Implementation ofโ
dispose()โ
dispose():
Promise<void>
Defined in: libs/act-pg/src/postgres-store.ts:279
Dispose of the store and close all database connections. Releases any active LISTEN client first so the pool can drain cleanly.
Returnsโ
Promise<void>
Promise that resolves when all connections are closed
Implementation ofโ
Store.dispose
drop()โ
drop():
Promise<void>
Defined in: libs/act-pg/src/postgres-store.ts:417
Drop all tables and schema created by the store (for testing or cleanup).
Returnsโ
Promise<void>
Promise that resolves when the schema is dropped
Implementation ofโ
prioritize()โ
prioritize(
filter,priority):Promise<number>
Defined in: libs/act-pg/src/postgres-store.ts:1032
Bulk-update priority of streams matching filter (ACT-102).
Filter semantics mirror query_streams: regex on stream /
source by default, exact match with the _exact flags,
blocked restricts to blocked or unblocked rows. Empty filter
({}) updates every registered stream.
Unlike subscribe (which keeps max() of registered
priorities), this sets the priority outright โ operator override
for the build-time scheduling policy.
Parametersโ
filterโ
priorityโ
number
Returnsโ
Promise<number>
Count of streams whose priority changed.
Implementation ofโ
query()โ
query<
E>(callback,query?):Promise<number>
Defined in: libs/act-pg/src/postgres-store.ts:447
Query events from the store, optionally filtered by stream, event name, time, etc.
Type Parametersโ
Eโ
E extends Schemas
Parametersโ
callbackโ
(event) => void
Function called for each event found
query?โ
Readonly<{ after?: number; backward?: boolean; before?: number; correlation?: string; created_after?: Date; created_before?: Date; limit?: number; names?: string[]; stream?: string; stream_exact?: boolean; with_snaps?: boolean; }>
(Optional) Query filter (stream, names, before, after, etc.)
Returnsโ
Promise<number>
The number of events found
Exampleโ
await store.query((event) => console.log(event), { stream: "A" });
Implementation ofโ
query_stats()โ
query_stats<
E>(input,options?):Promise<Map<string,StreamStats<E>>>
Defined in: libs/act-pg/src/postgres-store.ts:1162
Per-stream aggregated stats โ see Store.query_stats.
Two code paths chosen by the requested stats:
-
Heads-only path (no
count, nonames): one or twoSELECT DISTINCT ON (stream) ... ORDER BY stream, version DESC|ASCqueries, executed in parallel whentail: true. The(stream, version)unique index gives index-only access โ K rows touched per query (K = matched streams), not N (events). Ordering byversion(notid) is equivalent within a stream (versions are monotonic per stream and events are committed sequentially) and is the column actually indexed. -
Full-scan path (
countornamesset): one CTE materializes the filtered events, thenGROUP BY stream, nameโjsonb_object_agg(name, n)for thenamesmap plus per-streamCOUNT(*)forcount. Heads (andtailswhen requested) come fromDISTINCT ONover the same CTE โ they ride free on the already-paid scan.
The stream universe is derived from the events table: filter form
matches event-bearing streams (not subscription rows). When the
filter sets source or blocked, the events table is joined
against the streams subscription table since those concepts only
exist for subscribed streams.
Type Parametersโ
Eโ
E extends Schemas
Parametersโ
inputโ
string[] | Pick<StreamFilter, "stream" | "stream_exact">
options?โ
Returnsโ
Promise<Map<string, StreamStats<E>>>
Implementation ofโ
query_streams()โ
query_streams(
callback,query?):Promise<QueryStreamsResult>
Defined in: libs/act-pg/src/postgres-store.ts:1050
Streams subscription positions to a callback, ordered by stream name, along with the highest event id in the store.
Filters (stream, source, blocked, after, limit) are applied
server-side. stream/source are regex by default (~), or exact
with *_exact: true โ same convention as Store.query.
Parametersโ
callbackโ
(position) => void
query?โ
Returnsโ
Promise<QueryStreamsResult>
maxEventId and the count of positions emitted.
Implementation ofโ
reset()โ
reset(
input):Promise<number>
Defined in: libs/act-pg/src/postgres-store.ts:958
Resets watermarks for the given streams to -1, making them eligible for replay from the beginning. Also clears retry, blocked, error, and lease state so the streams can be claimed immediately.
Prefer Act.reset() over calling this directly. This primitive
only resets the store; it does not raise the orchestrator's internal
"needs drain" flag, so a settled Act instance will short-circuit and
skip the replay. Act.reset() wraps this and arms the flag.
Accepts either an explicit list of stream names or a
StreamFilter for bulk operations (e.g., "rebuild every
blocked stream"). The filter form is the same shape used by
prioritize and query_streams. An empty filter
({}) matches every registered stream โ typically a footgun for
reset; prefer narrower filters like { blocked: true }.
Parametersโ
inputโ
string[] | StreamFilter
Stream names or a StreamFilter
Returnsโ
Promise<number>
Count of streams that were actually reset
Exampleโ
// By name
await app.reset(["my-projection"]);
// By filter โ rebuild every blocked stream in a projection family
await app.reset({ stream: "^proj-orders-", blocked: true });
// Low-level (does NOT trigger replay on settled apps)
await store().reset(["my-projection"]);
Seeโ
Act.reset for the high-level rebuild API that wraps this primitive and arms the orchestrator's drain flag
Implementation ofโ
restore()โ
restore(
driver):Promise<void>
Defined in: libs/act-pg/src/postgres-store.ts:1554
Atomically wipe-and-rebuild the store inside a single
BEGIN/COMMIT transaction.
On any throw inside the driver the transaction rolls back and the
store ends byte-for-byte unchanged. TRUNCATE ... RESTART IDENTITY CASCADE wipes events + resets the serial sequence to 1;
the streams table is cleared in the same statement via
CASCADE-like DELETE. Events are inserted one at a time with
explicit columns (skipping id) so the serial assigns dense ids
from 1. created is preserved verbatim from the source.
Parametersโ
driverโ
(callback) => Promise<void>
Returnsโ
Promise<void>
Implementation ofโ
seed()โ
seed():
Promise<void>
Defined in: libs/act-pg/src/postgres-store.ts:311
Seed the database with required tables, indexes, and schema for event storage.
Returnsโ
Promise<void>
Promise that resolves when seeding is complete
Throwsโ
Error if seeding fails
Implementation ofโ
subscribe()โ
subscribe(
streams):Promise<{subscribed:number;watermark:number; }>
Defined in: libs/act-pg/src/postgres-store.ts:731
Registers streams for event processing. Upserts stream entries so they become visible to claim(). Also returns the current max watermark across all subscriptions.
Parametersโ
streamsโ
object[]
Streams to register with optional source.
Returnsโ
Promise<{ subscribed: number; watermark: number; }>
subscribed count and current max watermark.
Implementation ofโ
truncate()โ
truncate(
targets):Promise<Map<string, {committed:Committed<Schemas,string>;deleted:number; }>>
Defined in: libs/act-pg/src/postgres-store.ts:1487
Atomically truncates streams and seeds each with a snapshot or tombstone.
Parametersโ
targetsโ
object[]
Streams to truncate with optional snapshot state and meta.
Returnsโ
Promise<Map<string, { committed: Committed<Schemas, string>; deleted: number; }>>
Map keyed by stream name, each entry with deleted count and committed event.
Implementation ofโ
unblock()โ
unblock(
input):Promise<number>
Defined in: libs/act-pg/src/postgres-store.ts:992
Clear blocked flag (and retry / error / lease state) on streams
without touching the at watermark. blocked = true is always
applied, so the return count reflects only streams that were
actually flipped โ already-unblocked rows, unknown streams, and
filter matches that aren't blocked are silently skipped.
retry = -1 matches the InMemoryStore convention: claim() bumps
retry on every acquisition, so storing -1 means the first claim
after unblock returns retry=0 ("first attempt"). Storing 0 would
mis-report the post-recovery attempt as a continuation of the
failed sequence. See Store.unblock.
Parametersโ
inputโ
string[] | StreamFilter
Returnsโ
Promise<number>
Count of streams that were actually flipped (were blocked).