@rotorsoft/act-root / act/src / Store

Interface: Store

Defined in: libs/act/src/types/ports.ts:189

Interface for event store implementations.

The Store interface defines the contract for persistence adapters in Act. Implementations must provide event storage, querying, and distributed processing capabilities through leasing and watermark tracking.

Act includes two built-in implementations:

  • InMemoryStore: For development and testing
  • PostgresStore: For production use with PostgreSQL

Custom stores can be implemented for other databases or event log systems.
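
A custom adapter must honor the same obligations the built-in stores do. As a rough, illustrative sketch (this is not the real Store contract: types are simplified, metadata is omitted, and only commit and query are shown), a toy in-memory adapter could look like:

```typescript
// Illustrative only: ToyStore is NOT the real Store contract. Types are
// simplified and only commit/query are sketched, to show global sequence
// ids, per-stream versions, and optimistic concurrency checks.
type ToyEvent = { name: string; data: unknown };
type ToyCommitted = ToyEvent & { id: number; stream: string; version: number };

class ToyStore {
  private events: ToyCommitted[] = [];
  private versions = new Map<string, number>(); // stream -> current version

  async commit(
    stream: string,
    msgs: ToyEvent[],
    expectedVersion?: number
  ): Promise<ToyCommitted[]> {
    // convention assumed here: a new stream is at version 0
    const current = this.versions.get(stream) ?? 0;
    if (expectedVersion !== undefined && expectedVersion !== current)
      throw new Error(`Expected version ${expectedVersion}, found ${current}`);
    const committed = msgs.map((m, i) => ({
      ...m,
      id: this.events.length + i, // global sequence id
      stream,
      version: current + i + 1, // per-stream version
    }));
    this.events.push(...committed); // single-threaded, so effectively atomic
    this.versions.set(stream, current + msgs.length);
    return committed;
  }

  async query(
    callback: (e: ToyCommitted) => void,
    filter?: { stream?: string }
  ): Promise<number> {
    let count = 0;
    for (const e of this.events)
      if (!filter?.stream || e.stream === filter.stream) {
        callback(e);
        count++;
      }
    return count;
  }
}
```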

Example

import { act, store } from "@rotorsoft/act";
import { PostgresStore } from "@rotorsoft/act-pg";

// Replace the default in-memory store
store(new PostgresStore({
  host: "localhost",
  port: 5432,
  database: "myapp",
  user: "postgres",
  password: "secret"
}));

const app = act()
  .withState(Counter) // Counter: an application-defined state
  .build();

See

  • InMemoryStore for the default implementation
  • PostgresStore for the PostgreSQL implementation

Extends

  • Disposable

Properties

ack

ack: (leases) => Promise<Lease[]>

Defined in: libs/act/src/types/ports.ts:370

Acknowledges successful processing of leased streams.

Updates the watermark to indicate events have been processed successfully. Releases the lease so other workers can process subsequent events.

Parameters

leases

Lease[]

Leases to acknowledge with updated watermarks

Returns

Promise<Lease[]>

Acknowledged leases

Example

import { randomUUID } from "node:crypto";

const leased = await store().claim(5, 5, randomUUID(), 10000);
// Process events up to ID 150
await store().ack(leased.map(l => ({ ...l, at: 150 })));

See

claim for acquiring leases


block

block: (leases) => Promise<BlockedLease[]>

Defined in: libs/act/src/types/ports.ts:403

Blocks streams after persistent processing failures.

Blocked streams won't be returned by claim until manually unblocked. This prevents poison messages from repeatedly failing and consuming resources.

Streams are typically blocked when:

  • Max retries reached
  • blockOnError option is true
  • Handler throws an error

Parameters

leases

BlockedLease[]

Leases to block with error messages

Returns

Promise<BlockedLease[]>

Blocked leases

Example

try {
  await processEvents(lease);
  await store().ack([lease]);
} catch (error) {
  if (lease.retry >= 3) {
    await store().block([{
      ...lease,
      error: error.message
    }]);
  }
}

See

claim for lease management


claim

claim: (lagging, leading, by, millis) => Promise<Lease[]>

Defined in: libs/act/src/types/ports.ts:319

Atomically discovers and leases streams for reaction processing.

Atomically discovers a stream and acquires a lease in one round-trip, eliminating the race that exists when discovery and locking are separate calls (a competing worker can grab the stream between the two).

PostgresStore uses FOR UPDATE SKIP LOCKED for zero-contention competing consumer semantics — workers never block each other, each grabbing different streams atomically. InMemoryStore fuses its poll+lease logic equivalently.

Used by Act.drain() as the primary stream acquisition method.

Parameters

lagging

number

Max streams from the lagging frontier (ascending watermark)

leading

number

Max streams from the leading frontier (descending watermark)

by

string

Unique lease holder identifier (UUID)

millis

number

Lease duration in milliseconds

Returns

Promise<Lease[]>

Array of successfully leased streams with metadata

Example

import { randomUUID } from "node:crypto";

const leased = await store().claim(5, 5, randomUUID(), 10000);
leased.forEach(({ stream, at, lagging }) => {
  console.log(`Leased ${stream} at ${at} (lagging: ${lagging})`);
});

See

  • subscribe for registering new streams (used by correlate)
  • ack for acknowledging completion
  • block for blocking failed streams

commit

commit: <E>(stream, msgs, meta, expectedVersion?) => Promise<Committed<E, keyof E>[]>

Defined in: libs/act/src/types/ports.ts:250

Commits one or more events to a stream atomically.

This is the core method for persisting events. It must:

  • Assign global sequence IDs to events
  • Increment the stream version
  • Check optimistic concurrency if expectedVersion is provided
  • Store events atomically (all or nothing)
  • Attach metadata (id, stream, version, created timestamp)

Type Parameters

E

E extends Schemas

Event schemas

Parameters

stream

string

The stream ID to commit to

msgs

Message<E, keyof E>[]

Array of messages (events) to commit

meta

EventMeta

Event metadata (correlation, causation)

expectedVersion?

number

Expected current version for optimistic concurrency

Returns

Promise<Committed<E, keyof E>[]>

Array of committed events with full metadata

Throws

If expectedVersion doesn't match current version

Example

const events = await store().commit(
  "user-123",
  [{ name: "UserCreated", data: { email: "user@example.com" } }],
  { correlation: "req-456", causation: { action: {...} } },
  0 // Expect version 0 (new stream)
);

dispose

dispose: Disposer

Defined in: libs/act/src/types/ports.ts:26

Inherited from

Disposable.dispose


drop

drop: () => Promise<void>

Defined in: libs/act/src/types/ports.ts:219

Drops all data from the store.

Dangerous operation that deletes all events and state. Use with extreme caution, primarily for testing or development environments.

Returns

Promise<void>

Example

// Clean up after tests
afterAll(async () => {
  await store().drop();
});

query

query: <E>(callback, query?) => Promise<number>

Defined in: libs/act/src/types/ports.ts:283

Queries events from the store with optional filtering.

Calls the callback for each matching event. The callback approach allows processing large result sets without loading everything into memory.

Type Parameters

E

E extends Schemas

Event schemas

Parameters

callback

(event) => void

Function invoked for each matching event

query?

Readonly<{ after?: number; backward?: boolean; before?: number; correlation?: string; created_after?: Date; created_before?: Date; limit?: number; names?: string[]; stream?: string; stream_exact?: boolean; with_snaps?: boolean; }>

Optional filter criteria; see Query for fields (stream, names, after, before, backward, correlation, created_after, created_before, limit, with_snaps, stream_exact).

Returns

Promise<number>

Total number of events processed

Example

let count = 0;
await store().query(
(event) => {
console.log(event.name, event.data);
count++;
},
{ stream: "user-123" }
);
console.log(`Found ${count} events`);

query_streams

query_streams: (callback, query?) => Promise<QueryStreamsResult>

Defined in: libs/act/src/types/ports.ts:498

Streams registered subscription positions to a callback, plus the highest event id in the store.

Read-only introspection for operational dashboards (Store / Subscriptions tab, projection lag, blocked subscriptions). Avoids forcing apps to open a second connection and run raw SQL against adapter-specific schemas.

Mirrors the Store.query callback pattern: the callback is invoked once per matching position, allowing large result sets to be processed without buffering. Results are ordered by stream name; use query.after (last seen stream name) for keyset pagination on big tables (dynamic reactions can produce one subscription per aggregate).

Parameters

callback

(position) => void

Invoked once per matching StreamPosition.

query?

QueryStreams

Optional QueryStreams filter (default limit: 100).

Returns

Promise<QueryStreamsResult>

maxEventId and the count of positions emitted.

Examples

// Collect positions first, then compute lag: the callback fires before
// `maxEventId` is assigned, so it must not be referenced inside it
const blocked: StreamPosition[] = [];
const { maxEventId } = await store().query_streams(
  (s) => blocked.push(s),
  { blocked: true, limit: 50 }
);
blocked.forEach((s) =>
  console.log(`${s.stream}: lag=${maxEventId - s.at} ${s.error}`)
);

let after: string | undefined;
for (;;) {
  const page: StreamPosition[] = [];
  const { count } = await store().query_streams(
    (s) => page.push(s),
    { after, limit: 100 }
  );
  if (!count) break;
  // ... use page ...
  after = page.at(-1)?.stream;
}

reset

reset: (streams) => Promise<number>

Defined in: libs/act/src/types/ports.ts:430

Resets watermarks for the given streams to -1, making them eligible for replay from the beginning. Also clears retry, blocked, error, and lease state so the streams can be claimed immediately.

Prefer Act.reset() over calling this directly. This primitive only resets the store; it does not raise the orchestrator's internal "needs drain" flag, so a settled Act instance will short-circuit and skip the replay. Act.reset() wraps this and arms the flag.

Parameters

streams

string[]

Stream names to reset

Returns

Promise<number>

Count of streams that were actually reset

Example

// Recommended
await app.reset(["my-projection"]);

// Low-level (does NOT trigger replay on settled apps)
await store().reset(["my-projection"]);

See

Act.reset for the high-level rebuild API that wraps this primitive and arms the orchestrator's drain flag


seed

seed: () => Promise<void>

Defined in: libs/act/src/types/ports.ts:204

Initializes or resets the store.

Used primarily for testing to ensure a clean state between tests. For production stores, this might create necessary tables or indexes.

Returns

Promise<void>

Example

// Reset store between tests
beforeEach(async () => {
  await store().seed();
});

subscribe

subscribe: (streams) => Promise<{ subscribed: number; watermark: number; }>

Defined in: libs/act/src/types/ports.ts:348

Registers streams for event processing.

Upserts stream entries so they become visible to claim. Used by correlate() to register dynamically discovered reaction target streams.

Also returns the current maximum watermark across all subscribed streams, used internally for correlation checkpoint initialization on cold start.

Parameters

streams

object[]

Streams to register with optional source hint

Returns

Promise<{ subscribed: number; watermark: number; }>

subscribed: the count of newly registered streams; watermark: the maximum watermark (at) across all subscribed streams

Example

const { subscribed, watermark } = await store().subscribe([
  { stream: "stats-user-1", source: "user-1" },
  { stream: "stats-user-2", source: "user-2" },
]);

See

claim for discovering and leasing registered streams


truncate

truncate: (targets) => Promise<TruncateResult>

Defined in: libs/act/src/types/ports.ts:448

Atomically truncates streams and seeds each with a final event.

For each target, in a single transaction:

  1. Deletes all events for the stream
  2. Removes the stream's entry from the streams table
  3. Inserts a __snapshot__ (when snapshot is provided) or __tombstone__ event as the sole event on the stream

Parameters

targets

object[]

Streams to truncate with optional snapshot state and meta

Returns

Promise<TruncateResult>

Map keyed by stream name, each entry with deleted count and committed event

See

Act.close for the high-level close-the-books API that orchestrates safety checks, archive callbacks, and atomic truncate+seed