PostgresStore
Class: PostgresStore
Defined in: libs/act-pg/src/PostgresStore.ts:139
Production-ready PostgreSQL event store implementation.
PostgresStore provides persistent, scalable event storage using PostgreSQL. It implements the full Store interface.
Features:
- Persistent event storage with ACID guarantees
- Optimistic concurrency control via version numbers
- Distributed stream processing with leasing
- Snapshot support for performance optimization
- Connection pooling for scalability
- Automatic table and index creation
Database Schema:
- Events table: Stores all committed events
- Streams table: Tracks stream metadata and leases
- Indexes on stream, version, and timestamps for fast queries
Examples
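import { act, store } from "@rotorsoft/act";
import { PostgresStore } from "@rotorsoft/act-pg";
// Register the PostgreSQL store before building the app
store(new PostgresStore({
  host: "localhost",
  port: 5432,
  database: "myapp",
  user: "postgres",
  password: "secret"
}));
// Counter is a state definition declared elsewhere in the application
const app = act()
  .withState(Counter)
  .build();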
import { PostgresStore } from "@rotorsoft/act-pg";
const pgStore = new PostgresStore({
  host: process.env.DB_HOST || "localhost",
  port: parseInt(process.env.DB_PORT || "5432", 10), // explicit radix
  database: process.env.DB_NAME || "myapp",
  user: process.env.DB_USER || "postgres",
  password: process.env.DB_PASSWORD,
  schema: "events", // Custom schema
  table: "act_events" // Custom table name
});
// Initialize tables
await pgStore.seed();
// PostgresStore uses node-postgres (pg) connection pooling
// Pool is created automatically with default settings
// For custom pool config, use environment variables:
// PGHOST, PGPORT, PGDATABASE, PGUSER, PGPASSWORD
// PGMAXCONNECTIONS, PGIDLETIMEOUT, etc.
const pgStore = new PostgresStore({
  host: "db.example.com",
  port: 5432,
  database: "production",
  user: "app_user",
  password: process.env.DB_PASSWORD
});
// Use separate schemas per tenant
const tenants = ["tenant1", "tenant2", "tenant3"];
for (const tenant of tenants) {
  const tenantStore = new PostgresStore({
    host: "localhost",
    database: "multitenant",
    schema: tenant, // Each tenant gets its own schema
    table: "events"
  });
  await tenantStore.seed();
}
// For advanced queries you can access the pg client directly (not shown here)
const pgStore = new PostgresStore(config);
await pgStore.seed();
// Use the store's query method for standard queries;
// the callback runs once per matching event
await pgStore.query(
  (event) => console.log(event),
  { stream: "user-123", limit: 100 }
);
See
- Store for the interface definition
- InMemoryStore for development/testing
- store for injecting stores
- node-postgres documentation
Implements
- Store
Constructors
Constructor
new PostgresStore(config?): PostgresStore
Defined in: libs/act-pg/src/PostgresStore.ts:149
Create a new PostgresStore instance.
Parameters
config?
Partial<Config> = {}
Partial configuration (host, port, user, password, schema, table, etc.)
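A minimal sketch of building the partial configuration from environment variables, with basic port validation. The `PgConfig` type and the `configFromEnv` helper are hypothetical names introduced for illustration; the field names and `DB_*` variables are taken from the examples above, and the real `Config` type may have more fields.

```typescript
// Hypothetical subset of the store configuration (field names from the examples above).
type PgConfig = {
  host: string;
  port: number;
  database: string;
  user: string;
  password: string;
  schema: string;
  table: string;
};

// Hypothetical helper: reads DB_* variables and falls back to local defaults.
function configFromEnv(
  env: Record<string, string | undefined>
): Partial<PgConfig> {
  const rawPort = env.DB_PORT || "5432";
  const port = Number.parseInt(rawPort, 10);
  if (!Number.isInteger(port) || port < 1 || port > 65535) {
    throw new Error(`Invalid DB_PORT: ${rawPort}`);
  }
  return {
    host: env.DB_HOST || "localhost",
    port,
    database: env.DB_NAME || "myapp",
    user: env.DB_USER || "postgres",
    password: env.DB_PASSWORD, // left undefined when not set
  };
}

// Usage (assuming the imports from the examples above):
// const pgStore = new PostgresStore(configFromEnv(process.env));
```

Failing fast on a malformed port keeps a configuration mistake from surfacing later as a connection error.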
Returns
PostgresStore
Properties
config
readonly config: Config
Defined in: libs/act-pg/src/PostgresStore.ts:141
Methods
ack()
ack(leases): Promise<Lease[]>
Defined in: libs/act-pg/src/PostgresStore.ts:567
Acknowledge and release leases after processing, updating stream positions.
Parameters
leases
Lease[]
Leases to acknowledge, including last processed watermark and lease holder.
Returns
Promise<Lease[]>
Acked leases.
Implementation of
Store.ack
block()
block(leases): Promise<Lease & object[]>
Defined in: libs/act-pg/src/PostgresStore.ts:620
Block streams from further processing after they have failed and reached the maximum number of retries with blocking enabled.
Parameters
leases
Lease & object[]
Leases to block, including lease holder and last error message.
Returns
Promise<Lease & object[]>
Blocked leases.
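One way a worker might split processed leases between `ack()` and `block()` is sketched below. The `retry` counter, the `Outcome` shape, and the `maxRetries` and `blockOnError` options are hypothetical. Only two ideas come from the descriptions above: acked leases carry the last processed watermark, and blocked leases carry the last error message.

```typescript
// Hypothetical lease shape; the real Lease type may differ.
type WorkerLease = { stream: string; at: number; by: string; retry: number };
type Outcome = { lease: WorkerLease; at?: number; error?: string };

function partitionOutcomes(
  outcomes: Outcome[],
  maxRetries: number,
  blockOnError: boolean
) {
  const toAck: WorkerLease[] = [];
  const toBlock: (WorkerLease & { error: string })[] = [];
  const toRetry: WorkerLease[] = [];
  for (const { lease, at, error } of outcomes) {
    if (error === undefined) {
      // Success: acknowledge with the last processed watermark.
      toAck.push({ ...lease, at: at ?? lease.at });
    } else if (blockOnError && lease.retry >= maxRetries) {
      // Out of retries with blocking enabled: block with the last error.
      toBlock.push({ ...lease, error });
    } else {
      // Otherwise the stream stays eligible for another attempt (assumed policy).
      toRetry.push(lease);
    }
  }
  return { toAck, toBlock, toRetry };
}

// Usage (assuming a store instance and compatible lease types):
// const { toAck, toBlock } = partitionOutcomes(outcomes, 3, true);
// await store.ack(toAck);
// await store.block(toBlock);
```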
Implementation of
Store.block
claim()
claim(lagging, leading, by, millis): Promise<Lease[]>
Defined in: libs/act-pg/src/PostgresStore.ts:442
Atomically discovers and leases streams for reaction processing.
Uses FOR UPDATE SKIP LOCKED to implement competing consumers without lock contention:
- Workers never block each other: locked rows are skipped instead of waited on
- Discovery and locking happen in a single atomic transaction
- No wasted polls: every returned stream is exclusively owned by the caller
Parameters
lagging
number
Max streams from lagging frontier (ascending watermark)
leading
number
Max streams from leading frontier (descending watermark)
by
string
Lease holder identifier (UUID)
millis
number
Lease duration in milliseconds
Returns
Promise<Lease[]>
Leased streams with metadata
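To make the two frontiers concrete, here is a small in-memory model of the selection that `claim()` is described as performing. It is not the SQL the store runs. `StreamRow`, `ClaimedLease`, and `claimModel` are hypothetical names, and skipping rows with an unexpired lease is a simplification that stands in for `FOR UPDATE SKIP LOCKED`. How the real query treats a stream that qualifies for both frontiers is an assumption.

```typescript
// Hypothetical row and lease shapes; the real table columns are not listed here.
type StreamRow = { stream: string; at: number; leasedBy?: string; leasedUntil?: number };
type ClaimedLease = { stream: string; at: number; by: string; until: number };

function claimModel(
  rows: StreamRow[],
  lagging: number,
  leading: number,
  by: string,
  millis: number,
  now: number
): ClaimedLease[] {
  // Rows holding an unexpired lease are skipped rather than waited on.
  const free = rows.filter(
    (r) => r.leasedUntil === undefined || r.leasedUntil <= now
  );

  // Lagging frontier: lowest watermarks first.
  const picked = free
    .slice()
    .sort((a, b) => a.at - b.at)
    .slice(0, lagging);

  // Leading frontier: highest watermarks first, never picking a stream twice.
  const descending = free.slice().sort((a, b) => b.at - a.at);
  let taken = 0;
  for (const row of descending) {
    if (taken >= leading) break;
    if (picked.indexOf(row) >= 0) continue;
    picked.push(row);
    taken += 1;
  }

  // Mark every picked row as leased by this worker until now + millis.
  const until = now + millis;
  return picked.map((row) => {
    row.leasedBy = by;
    row.leasedUntil = until;
    return { stream: row.stream, at: row.at, by, until };
  });
}
```

In this model, two workers calling `claimModel` on the same rows receive disjoint sets of streams, and a stream becomes claimable again once its lease duration has elapsed.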
Implementation of
Store.claim
commit()
commit<E>(stream, msgs, meta, expectedVersion?): Promise<Committed<E, keyof E>[]>
Defined in: libs/act-pg/src/PostgresStore.ts:361
Commit new events to the store for a given stream, with concurrency control.
Type Parameters
E
E extends Schemas
Parameters
stream
string
The stream name
msgs
Message<E, keyof E>[]
Array of messages (event name and data)
meta
Event metadata (correlation, causation, etc.)
expectedVersion?
number
(Optional) Expected stream version for concurrency control
Returns
Promise<Committed<E, keyof E>[]>
Array of committed events
Throws
ConcurrencyError if the expected version does not match
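The version check can be pictured with a small in-memory model. This is a sketch of optimistic concurrency in general, not the store's SQL. `commitModel` and `ConcurrencyErrorSketch` are hypothetical names, and the numbering convention (a stream's version is the index of its last event, -1 when empty) is an assumption.

```typescript
// Hypothetical stand-in for the library's ConcurrencyError.
class ConcurrencyErrorSketch extends Error {
  constructor(expected: number, actual: number) {
    super(`Expected version ${expected} but stream is at ${actual}`);
    this.name = "ConcurrencyErrorSketch";
  }
}

type MessageSketch = { name: string; data: unknown };
type CommittedSketch = MessageSketch & { stream: string; version: number };

// In-memory model: one array of committed events per stream.
const streams = new Map<string, CommittedSketch[]>();

function commitModel(
  stream: string,
  msgs: MessageSketch[],
  expectedVersion?: number
): CommittedSketch[] {
  const events = streams.get(stream) ?? [];
  const current = events.length - 1; // assumed convention: -1 for an empty stream
  // Reject the write when another writer has advanced the stream.
  if (expectedVersion !== undefined && expectedVersion !== current) {
    throw new ConcurrencyErrorSketch(expectedVersion, current);
  }
  const committed = msgs.map((msg, i) => ({
    ...msg,
    stream,
    version: current + 1 + i,
  }));
  streams.set(stream, events.concat(committed));
  return committed;
}
```

If two writers both load the stream at the same version, the first commit succeeds and the second is rejected; the second writer would then reload the stream and retry against the new version.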
Implementation of
Store.commit
dispose()
dispose(): Promise<void>
Defined in: libs/act-pg/src/PostgresStore.ts:161
Dispose of the store and close all database connections.
Returns
Promise<void>
Promise that resolves when all connections are closed
Implementation of
Store.dispose
drop()
drop(): Promise<void>
Defined in: libs/act-pg/src/PostgresStore.ts:250
Drop all tables and schema created by the store (for testing or cleanup).
Returns
Promise<void>
Promise that resolves when the schema is dropped
Implementation of
Store.drop
query()
query<E>(callback, query?): Promise<number>
Defined in: libs/act-pg/src/PostgresStore.ts:280
Query events from the store, optionally filtered by stream, event name, time, etc.
Type Parameters
E
E extends Schemas
Parameters
callback
(event) => void
Function called for each event found
query?
Readonly<{ after?: number; backward?: boolean; before?: number; correlation?: string; created_after?: Date; created_before?: Date; limit?: number; names?: string[]; stream?: string; with_snaps?: boolean; }>
(Optional) Query filter (stream, names, before, after, etc.)
Returns
Promise<number>
The number of events found
Example
await store.query((event) => console.log(event), { stream: "A" });
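Because `query()` delivers results through a callback, collecting them into an array takes a small wrapper. The sketch below types the store structurally so it can run without a database. `QueryableSketch`, `EventSketch`, `FilterSketch`, and `collectEvents` are hypothetical names, and only a few of the filter fields listed above are modeled.

```typescript
type EventSketch = { name: string; stream: string };
type FilterSketch = Readonly<{ stream?: string; names?: string[]; limit?: number }>;

// Structural view of the documented signature: query(callback, query?) => Promise<number>.
interface QueryableSketch {
  query(
    callback: (event: EventSketch) => void,
    query?: FilterSketch
  ): Promise<number>;
}

async function collectEvents(
  store: QueryableSketch,
  filter?: FilterSketch
): Promise<EventSketch[]> {
  const events: EventSketch[] = [];
  const count = await store.query((event) => {
    events.push(event);
  }, filter);
  // The resolved count should match the number of callback invocations.
  if (count !== events.length) {
    throw new Error(`Expected ${count} events but received ${events.length}`);
  }
  return events;
}
```

With the real store, the event type would be the library's committed event type rather than `EventSketch`. For large result sets, processing each event inside the callback avoids holding everything in memory.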
Implementation of
Store.query
seed()
seed(): Promise<void>
Defined in: libs/act-pg/src/PostgresStore.ts:170
Seed the database with required tables, indexes, and schema for event storage.
Returns
Promise<void>
Promise that resolves when seeding is complete
Throws
Error if seeding fails
Implementation of
Store.seed
subscribe()
subscribe(streams): Promise<{ subscribed: number; watermark: number; }>
Defined in: libs/act-pg/src/PostgresStore.ts:528
Registers streams for event processing. Upserts stream entries so they become visible to claim(). Also returns the current max watermark across all subscriptions.
Parameters
streams
object[]
Streams to register with optional source.
Returns
Promise<{ subscribed: number; watermark: number; }>
subscribed count and current max watermark.