Class: PostgresStore

Defined in: libs/act-pg/src/PostgresStore.ts:148

Production-ready PostgreSQL event store implementation.

PostgresStore provides persistent, scalable event storage using PostgreSQL. It implements the full Store interface with production-grade features:

Features:

  • Persistent event storage with ACID guarantees
  • Optimistic concurrency control via version numbers
  • Distributed stream processing with leasing
  • Snapshot support for performance optimization
  • Connection pooling for scalability
  • Automatic table and index creation

Database Schema:

  • Events table: Stores all committed events
  • Streams table: Tracks stream metadata and leases
  • Indexes on stream, version, and timestamps for fast queries

Examples

import { act, store } from "@rotorsoft/act";
import { PostgresStore } from "@rotorsoft/act-pg";

// Register PostgresStore as the event store
store(new PostgresStore({
  host: "localhost",
  port: 5432,
  database: "myapp",
  user: "postgres",
  password: "secret"
}));

// Counter is a state definition declared elsewhere in the application
const app = act()
  .withState(Counter)
  .build();

import { PostgresStore } from "@rotorsoft/act-pg";

// Read connection settings from the environment, with local defaults
const pgStore = new PostgresStore({
  host: process.env.DB_HOST || "localhost",
  port: parseInt(process.env.DB_PORT || "5432", 10),
  database: process.env.DB_NAME || "myapp",
  user: process.env.DB_USER || "postgres",
  password: process.env.DB_PASSWORD,
  schema: "events", // Custom schema
  table: "act_events" // Custom table name
});

// Initialize tables
await pgStore.seed();

// PostgresStore uses node-postgres (pg) connection pooling
// Pool is created automatically with default settings
// For custom pool config, use environment variables:
// PGHOST, PGPORT, PGDATABASE, PGUSER, PGPASSWORD
// PGMAXCONNECTIONS, PGIDLETIMEOUT, etc.

const pgStore = new PostgresStore({
  host: "db.example.com",
  port: 5432,
  database: "production",
  user: "app_user",
  password: process.env.DB_PASSWORD
});

// Use separate schemas per tenant
const tenants = ["tenant1", "tenant2", "tenant3"];

for (const tenant of tenants) {
  const tenantStore = new PostgresStore({
    host: "localhost",
    database: "multitenant",
    schema: tenant, // Each tenant gets own schema
    table: "events"
  });
  await tenantStore.seed();
}

// For advanced queries, you can access pg client
const pgStore = new PostgresStore(config);
await pgStore.seed();

// Use the store's query method for standard queries
await pgStore.query(
  (event) => console.log(event),
  { stream: "user-123", limit: 100 }
);

Implements

Store

Constructors

Constructor

new PostgresStore(config?): PostgresStore

Defined in: libs/act-pg/src/PostgresStore.ts:158

Create a new PostgresStore instance.

Parameters

config?

Partial<Config> = {}

Partial configuration (host, port, user, password, schema, table, etc.)

Returns

PostgresStore
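
As a rough sketch of how the partial configuration might be assembled, the helper below reads connection settings from an environment-like map and includes only the keys that are set. The `PgConfig` type, the `configFromEnv` helper, and the `DB_*` variable names are assumptions that mirror the fields used in the examples above; they are not part of the library's API.

```typescript
// Hypothetical shape mirroring the config fields used in the examples above.
type PgConfig = {
  host: string;
  port: number;
  database: string;
  user: string;
  password: string;
  schema: string;
  table: string;
};

type Env = Record<string, string | undefined>;

// Build a partial config from an environment-like map (for example process.env).
// Keys that are not set are left out of the result.
function configFromEnv(env: Env): Partial<PgConfig> {
  const config: Partial<PgConfig> = {};
  const host = env["DB_HOST"];
  const port = env["DB_PORT"];
  const database = env["DB_NAME"];
  const user = env["DB_USER"];
  const password = env["DB_PASSWORD"];

  if (host) config.host = host;
  if (port) {
    const parsed = Number.parseInt(port, 10);
    if (Number.isNaN(parsed)) throw new Error(`Invalid DB_PORT: ${port}`);
    config.port = parsed;
  }
  if (database) config.database = database;
  if (user) config.user = user;
  if (password) config.password = password;
  return config;
}
```

Assuming the field names line up with `Config`, the result could be passed straight to the constructor, for example `new PostgresStore(configFromEnv(process.env))`.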

Properties

config

readonly config: Config

Defined in: libs/act-pg/src/PostgresStore.ts:150

Methods

ack()

ack(leases): Promise<Lease[]>

Defined in: libs/act-pg/src/PostgresStore.ts:564

Acknowledge and release leases after processing, updating stream positions.

Parameters

leases

Lease[]

Leases to acknowledge, including last processed watermark and lease holder.

Returns

Promise<Lease[]>

Acked leases.

Implementation of

Store.ack


block()

block(leases): Promise<(Lease & object)[]>

Defined in: libs/act-pg/src/PostgresStore.ts:617

Block streams from further processing after they have failed and reached the maximum number of retries, when blocking is enabled.

Parameters

leases

(Lease & object)[]

Leases to block, including lease holder and last error message.

Returns

Promise<(Lease & object)[]>

Blocked leases.

Implementation of

Store.block
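
The decision that leads up to a `block()` call can be sketched as a small filter: only leases that have exhausted their retries are candidates, and only when blocking is enabled. The `FailedLease` shape, its `retry` and `by` fields, and the `maxRetries` and `blockOnError` parameters are hypothetical names chosen for illustration; the real `Lease` type may differ.

```typescript
// Hypothetical, simplified view of a lease whose processing failed.
type FailedLease = {
  stream: string; // stream being processed
  by: string;     // lease holder
  retry: number;  // number of attempts made so far
  error: string;  // last error message
};

// Select the failed leases that should be blocked: those that have used up
// their retries, and only when blocking is enabled.
function selectLeasesToBlock(
  failed: FailedLease[],
  maxRetries: number,
  blockOnError: boolean
): FailedLease[] {
  if (!blockOnError) return [];
  return failed.filter((lease) => lease.retry >= maxRetries);
}
```

In this sketch the caller would pass the returned leases to `block()` and leave the rest to be retried on a later lease.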


commit()

commit<E>(stream, msgs, meta, expectedVersion?): Promise<(Message<E, keyof E> & Readonly<{ created: Date; id: number; meta: Readonly<{ causation: { action?: Readonly<...> & object; event?: { id: number; name: string; stream: string; }; }; correlation: string; }>; stream: string; version: number; }>)[]>

Defined in: libs/act-pg/src/PostgresStore.ts:369

Commit new events to the store for a given stream, with concurrency control.

Type Parameters

E

E extends Schemas

Parameters

stream

string

The stream name

msgs

Message<E, keyof E>[]

Array of messages (event name and data)

meta

EventMeta

Event metadata (correlation, causation, etc.)

expectedVersion?

number

(Optional) Expected stream version for concurrency control

Returns

Promise<(Message<E, keyof E> & Readonly<{ created: Date; id: number; meta: Readonly<{ causation: { action?: Readonly<...> & object; event?: { id: number; name: string; stream: string; }; }; correlation: string; }>; stream: string; version: number; }>)[]>

Array of committed events

Throws

ConcurrencyError if the expected version does not match

Implementation of

Store.commit
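
The version check described above can be illustrated with a small in-memory model. This is a sketch of optimistic concurrency in general, not the PostgreSQL implementation: `InMemoryStreams` and `VersionConflict` are hypothetical names, the model is synchronous, and the convention that an empty stream has version -1 is an assumption.

```typescript
type NewMessage = { name: string; data: unknown };
type StoredEvent = { stream: string; version: number; name: string; data: unknown };

// Illustrative stand-in for the ConcurrencyError mentioned above.
class VersionConflict extends Error {}

class InMemoryStreams {
  private readonly events: StoredEvent[] = [];

  // Latest version of a stream, or -1 when the stream has no events (assumed convention).
  currentVersion(stream: string): number {
    let version = -1;
    for (const e of this.events) {
      if (e.stream === stream) version = e.version;
    }
    return version;
  }

  // Append messages, rejecting the write when the caller's expected version is stale.
  commit(stream: string, msgs: NewMessage[], expectedVersion?: number): StoredEvent[] {
    const current = this.currentVersion(stream);
    if (expectedVersion !== undefined && expectedVersion !== current) {
      throw new VersionConflict(
        `Stream ${stream}: expected version ${expectedVersion}, found ${current}`
      );
    }
    const committed = msgs.map((m, i) => ({
      stream,
      version: current + 1 + i,
      name: m.name,
      data: m.data,
    }));
    this.events.push(...committed);
    return committed;
  }
}
```

A writer that loaded the stream at version 0 and commits with `expectedVersion` 0 succeeds only if nobody else has appended in the meantime; otherwise it would reload the state and retry.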


dispose()

dispose(): Promise<void>

Defined in: libs/act-pg/src/PostgresStore.ts:169

Dispose of the store and close all database connections.

Returns

Promise<void>

Promise that resolves when all connections are closed

Implementation of

Store.dispose


drop()

drop(): Promise<void>

Defined in: libs/act-pg/src/PostgresStore.ts:258

Drop all tables and schema created by the store (for testing or cleanup).

Returns

Promise<void>

Promise that resolves when the schema is dropped

Implementation of

Store.drop


lease()

lease(leases, millis): Promise<Lease[]>

Defined in: libs/act-pg/src/PostgresStore.ts:482

Lease streams for reaction processing, marking them as in-progress.

Parameters

leases

Lease[]

Lease requests for streams, including end-of-lease watermark, lease holder, and source stream.

millis

number

Lease duration in milliseconds.

Returns

Promise<Lease[]>

Array of leased objects with updated lease info

Implementation of

Store.lease
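
To make the lease lifecycle concrete, here is a minimal in-memory model of leasing with expiry, followed by acknowledgement. It assumes a lease is granted when the stream is free, already held by the same worker, or held under an expired lease, and that `ack` records the watermark and releases the lease. `LeaseTable`, `LeaseRequest`, and the explicit `now` argument are hypothetical and exist only to keep the sketch deterministic.

```typescript
type LeaseRequest = { stream: string; by: string; at: number };
type StreamRow = { at: number; leasedBy: string | null; leasedUntil: number };

class LeaseTable {
  private readonly rows = new Map<string, StreamRow>();

  // Grant leases for streams that are free or whose previous lease has expired.
  lease(requests: LeaseRequest[], millis: number, now: number): LeaseRequest[] {
    const granted: LeaseRequest[] = [];
    for (const req of requests) {
      const row = this.rows.get(req.stream) ?? { at: -1, leasedBy: null, leasedUntil: 0 };
      const held = row.leasedBy !== null && row.leasedUntil > now;
      if (held && row.leasedBy !== req.by) continue; // another worker still holds it
      row.leasedBy = req.by;
      row.leasedUntil = now + millis;
      this.rows.set(req.stream, row);
      granted.push(req);
    }
    return granted;
  }

  // Record the processed watermark and release leases still held by the caller.
  ack(leases: LeaseRequest[]): LeaseRequest[] {
    const acked: LeaseRequest[] = [];
    for (const l of leases) {
      const row = this.rows.get(l.stream);
      if (!row || row.leasedBy !== l.by) continue; // lease was lost in the meantime
      row.at = l.at;
      row.leasedBy = null;
      row.leasedUntil = 0;
      acked.push(l);
    }
    return acked;
  }

  watermark(stream: string): number {
    return this.rows.get(stream)?.at ?? -1;
  }
}
```

In this model a worker that lets its lease expire loses the stream to the next requester, and its late `ack` is ignored.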


poll()

poll(lagging, leading): Promise<Poll[]>

Defined in: libs/act-pg/src/PostgresStore.ts:443

Poll the store for unblocked streams that need processing. Lagging streams are selected by lease watermark in ascending order, and leading streams in descending order.

Parameters

lagging

number

Max number of streams to poll in ascending order.

leading

number

Max number of streams to poll in descending order.

Returns

Promise<Poll[]>

The polled streams.

Implementation of

Store.poll
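
The selection described by the `lagging` and `leading` parameters can be sketched over an in-memory list. This is only a model of the idea, assuming each stream carries a numeric watermark and a blocked flag; `StreamState` and `pollStreams` are hypothetical names, and the actual SQL query may apply further criteria.

```typescript
type StreamState = { stream: string; at: number; blocked: boolean };

// Pick up to `lagging` streams with the lowest watermark and up to `leading`
// streams with the highest watermark, skipping blocked streams and duplicates.
function pollStreams(streams: StreamState[], lagging: number, leading: number): StreamState[] {
  const candidates = streams.filter((s) => !s.blocked);
  const ascending = candidates.slice().sort((a, b) => a.at - b.at);
  const descending = candidates.slice().sort((a, b) => b.at - a.at);

  const picked = new Map<string, StreamState>();
  for (const s of ascending.slice(0, lagging)) picked.set(s.stream, s);
  for (const s of descending.slice(0, leading)) picked.set(s.stream, s);
  return Array.from(picked.values());
}
```

Mixing both ends lets streams that are far behind catch up without starving the ones that are close to the head.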


query()

query<E>(callback, query?): Promise<number>

Defined in: libs/act-pg/src/PostgresStore.ts:288

Query events from the store, optionally filtered by stream, event name, time, etc.

Type Parameters

E

E extends Schemas

Parameters

callback

(event) => void

Function called for each event found

query?

Readonly<{ after?: number; backward?: boolean; before?: number; correlation?: string; created_after?: Date; created_before?: Date; limit?: number; names?: string[]; stream?: string; with_snaps?: boolean; }>

(Optional) Query filter (stream, names, before, after, etc.)

Returns

Promise<number>

The number of events found

Example

await store.query((event) => console.log(event), { stream: "A" });

Implementation of

Store.query
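
To show how the filter fields might combine, the sketch below applies a subset of them to an in-memory array and returns the number of events passed to the callback. It assumes `after` and `before` are exclusive bounds on the event id and that `backward` reverses the id ordering before `limit` is applied; these semantics, and the `queryEvents` helper itself, are assumptions made for illustration rather than documented behavior.

```typescript
type StoredEvt = { id: number; stream: string; name: string };
type EvtFilter = {
  stream?: string;
  names?: string[];
  after?: number;
  before?: number;
  limit?: number;
  backward?: boolean;
};

// Filter, order, and limit events, invoking the callback once per match.
function queryEvents(
  events: StoredEvt[],
  callback: (event: StoredEvt) => void,
  filter: EvtFilter = {}
): number {
  const { stream, names, after, before, limit, backward } = filter;
  const matches = events.filter(
    (e) =>
      (stream === undefined || e.stream === stream) &&
      (names === undefined || names.indexOf(e.name) >= 0) &&
      (after === undefined || e.id > after) &&
      (before === undefined || e.id < before)
  );
  matches.sort((a, b) => (backward ? b.id - a.id : a.id - b.id));
  const limited = limit === undefined ? matches : matches.slice(0, limit);
  for (const e of limited) callback(e);
  return limited.length;
}
```

Under these assumptions, `{ stream: "A", backward: true, limit: 1 }` would yield the most recent event of stream "A".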


seed()

seed(): Promise<void>

Defined in: libs/act-pg/src/PostgresStore.ts:178

Seed the database with required tables, indexes, and schema for event storage.

Returns

Promise<void>

Promise that resolves when seeding is complete

Throws

Error if seeding fails

Implementation of

Store.seed