PostgresStore
@rotorsoft/act-root / act-pg/src / PostgresStore
Class: PostgresStore
Defined in: libs/act-pg/src/PostgresStore.ts:148
Production-ready PostgreSQL event store implementation.
PostgresStore provides persistent, scalable event storage using PostgreSQL. It implements the full Store interface with production-grade features:
Features:
- Persistent event storage with ACID guarantees
- Optimistic concurrency control via version numbers
- Distributed stream processing with leasing
- Snapshot support for performance optimization
- Connection pooling for scalability
- Automatic table and index creation
Database Schema:
- Events table: Stores all committed events
- Streams table: Tracks stream metadata and leases
- Indexes on stream, version, and timestamps for fast queries
Examples
import { store } from "@rotorsoft/act";
import { PostgresStore } from "@rotorsoft/act-pg";
store(new PostgresStore({
host: "localhost",
port: 5432,
database: "myapp",
user: "postgres",
password: "secret"
}));
const app = act()
.withState(Counter)
.build();
import { PostgresStore } from "@rotorsoft/act-pg";
const pgStore = new PostgresStore({
host: process.env.DB_HOST || "localhost",
port: parseInt(process.env.DB_PORT || "5432"),
database: process.env.DB_NAME || "myapp",
user: process.env.DB_USER || "postgres",
password: process.env.DB_PASSWORD,
schema: "events", // Custom schema
table: "act_events" // Custom table name
});
// Initialize tables
await pgStore.seed();
// PostgresStore uses node-postgres (pg) connection pooling
// Pool is created automatically with default settings
// For custom pool config, use environment variables:
// PGHOST, PGPORT, PGDATABASE, PGUSER, PGPASSWORD
// PGMAXCONNECTIONS, PGIDLETIMEOUT, etc.
const pgStore = new PostgresStore({
host: "db.example.com",
port: 5432,
database: "production",
user: "app_user",
password: process.env.DB_PASSWORD
});
// Use separate schemas per tenant
const tenants = ["tenant1", "tenant2", "tenant3"];
for (const tenant of tenants) {
const tenantStore = new PostgresStore({
host: "localhost",
database: "multitenant",
schema: tenant, // Each tenant gets own schema
table: "events"
});
await tenantStore.seed();
}
// For advanced queries, you can access pg client
const pgStore = new PostgresStore(config);
await pgStore.seed();
// Use the store's query method for standard queries
await pgStore.query(
(event) => console.log(event),
{ stream: "user-123", limit: 100 }
);
See
- Store for the interface definition
- InMemoryStore for development/testing
- store for injecting stores
- node-postgres documentation
Implements
Constructors
Constructor
new PostgresStore(
config?):PostgresStore
Defined in: libs/act-pg/src/PostgresStore.ts:158
Create a new PostgresStore instance.
Parameters
config?
Partial<Config> = {}
Partial configuration (host, port, user, password, schema, table, etc.)
Returns
PostgresStore
Properties
config
readonlyconfig:Config
Defined in: libs/act-pg/src/PostgresStore.ts:150
Methods
ack()
ack(
leases):Promise<Lease[]>
Defined in: libs/act-pg/src/PostgresStore.ts:564
Acknowledge and release leases after processing, updating stream positions.
Parameters
leases
Lease[]
Leases to acknowledge, including last processed watermark and lease holder.
Returns
Promise<Lease[]>
Acked leases.
Implementation of
block()
block(
leases):Promise<Lease&object[]>
Defined in: libs/act-pg/src/PostgresStore.ts:617
Block a stream for processing after failing to process and reaching max retries with blocking enabled.
Parameters
leases
Lease & object[]
Leases to block, including lease holder and last error message.
Returns
Promise<Lease & object[]>
Blocked leases.
Implementation of
commit()
commit<
E>(stream,msgs,meta,expectedVersion?):Promise<Message<E, keyofE> &Readonly<{created:Date;id:number;meta:Readonly<{causation: {action?:Readonly<...> &object;event?: {id:number;name:string;stream:string; }; };correlation:string; }>;stream:string;version:number; }>[]>
Defined in: libs/act-pg/src/PostgresStore.ts:369
Commit new events to the store for a given stream, with concurrency control.
Type Parameters
E
E extends Schemas
Parameters
stream
string
The stream name
msgs
Message<E, keyof E>[]
Array of messages (event name and data)
meta
Event metadata (correlation, causation, etc.)
expectedVersion?
number
(Optional) Expected stream version for concurrency control
Returns
Promise<Message<E, keyof E> & Readonly<{ created: Date; id: number; meta: Readonly<{ causation: { action?: Readonly<...> & object; event?: { id: number; name: string; stream: string; }; }; correlation: string; }>; stream: string; version: number; }>[]>
Array of committed events
Throws
ConcurrencyError if the expected version does not match
Implementation of
dispose()
dispose():
Promise<void>
Defined in: libs/act-pg/src/PostgresStore.ts:169
Dispose of the store and close all database connections.
Returns
Promise<void>
Promise that resolves when all connections are closed
Implementation of
Store.dispose
drop()
drop():
Promise<void>
Defined in: libs/act-pg/src/PostgresStore.ts:258
Drop all tables and schema created by the store (for testing or cleanup).
Returns
Promise<void>
Promise that resolves when the schema is dropped
Implementation of
lease()
lease(
leases,millis):Promise<Lease[]>
Defined in: libs/act-pg/src/PostgresStore.ts:482
Lease streams for reaction processing, marking them as in-progress.
Parameters
leases
Lease[]
Lease requests for streams, including end-of-lease watermark, lease holder, and source stream.
millis
number
Lease duration in milliseconds.
Returns
Promise<Lease[]>
Array of leased objects with updated lease info
Implementation of
poll()
poll(
lagging,leading):Promise<Poll[]>
Defined in: libs/act-pg/src/PostgresStore.ts:443
Polls the store for unblocked streams needing processing, ordered by lease watermark ascending.
Parameters
lagging
number
Max number of streams to poll in ascending order.
leading
number
Max number of streams to poll in descending order.
Returns
Promise<Poll[]>
The polled streams.
Implementation of
query()
query<
E>(callback,query?):Promise<number>
Defined in: libs/act-pg/src/PostgresStore.ts:288
Query events from the store, optionally filtered by stream, event name, time, etc.
Type Parameters
E
E extends Schemas
Parameters
callback
(event) => void
Function called for each event found
query?
Readonly<{ after?: number; backward?: boolean; before?: number; correlation?: string; created_after?: Date; created_before?: Date; limit?: number; names?: string[]; stream?: string; with_snaps?: boolean; }>
(Optional) Query filter (stream, names, before, after, etc.)
Returns
Promise<number>
The number of events found
Example
await store.query((event) => console.log(event), { stream: "A" });
Implementation of
seed()
seed():
Promise<void>
Defined in: libs/act-pg/src/PostgresStore.ts:178
Seed the database with required tables, indexes, and schema for event storage.
Returns
Promise<void>
Promise that resolves when seeding is complete
Throws
Error if seeding fails