Interface: Store

Defined in: libs/act/src/types/ports.ts:62

Interface for event store implementations.

The Store interface defines the contract for persistence adapters in Act. Implementations must provide event storage, querying, and distributed processing capabilities through leasing and watermark tracking.

Act includes two built-in implementations:

  • InMemoryStore: For development and testing
  • PostgresStore: For production use with PostgreSQL

Custom stores can be implemented for other databases or event log systems.

Example

import { act, store } from "@rotorsoft/act";
import { PostgresStore } from "@rotorsoft/act-pg";

// Replace the default in-memory store
store(new PostgresStore({
  host: "localhost",
  port: 5432,
  database: "myapp",
  user: "postgres",
  password: "secret"
}));

// Counter is a state definition declared elsewhere in the application
const app = act()
  .withState(Counter)
  .build();

See

  • InMemoryStore for the default implementation
  • PostgresStore for the PostgreSQL implementation

Extends

  • Disposable

Properties

ack()

ack: (leases) => Promise<Lease[]>

Defined in: libs/act/src/types/ports.ts:238

Acknowledges successful processing of leased streams.

Updates the watermark to indicate events have been processed successfully. Releases the lease so other workers can process subsequent events.
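
The effect of an acknowledgment on per-stream bookkeeping can be sketched as follows. This is a minimal illustration under stated assumptions, not the behavior of InMemoryStore or PostgresStore: `AckLease`, `AckState`, and `ackLeases` are hypothetical names, the function is synchronous for brevity (the real `ack` returns a Promise), and resetting the retry counter on success is an assumption of the sketch.

```typescript
// Hypothetical, simplified shapes; the real Lease type may carry more fields.
type AckLease = { stream: string; by: string; at: number; retry: number };
type AckState = { at: number; retry: number; leasedBy?: string; leasedUntil?: number };

function ackLeases(state: Map<string, AckState>, leases: AckLease[]): AckLease[] {
  const acked: AckLease[] = [];
  for (const lease of leases) {
    const s = state.get(lease.stream);
    // Only the worker currently holding the lease may acknowledge it.
    if (!s || s.leasedBy !== lease.by) continue;
    s.at = lease.at;      // advance the watermark to the last processed event
    s.retry = 0;          // assumption: a successful ack resets the retry counter
    delete s.leasedBy;    // release the lease so other workers can pick up the stream
    delete s.leasedUntil;
    acked.push(lease);
  }
  return acked;
}
```

Leases presented by a worker that does not hold the stream are skipped rather than rejected, so the returned array tells the caller which acknowledgments actually took effect.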

Parameters

leases

Lease[]

Leases to acknowledge with updated watermarks

Returns

Promise<Lease[]>

Acknowledged leases

Example

const leased = await store().lease([...], 10000);
// Process events up to ID 150
await store().ack(leased.map(l => ({ ...l, at: 150 })));

See

lease for acquiring leases


block()

block: (leases) => Promise<Lease & object[]>

Defined in: libs/act/src/types/ports.ts:271

Blocks streams after persistent processing failures.

Blocked streams won't be returned by poll until manually unblocked. This prevents poison messages from repeatedly failing and consuming resources.

Streams are typically blocked when:

  • Max retries reached
  • blockOnError option is true
  • Handler throws an error

Parameters

leases

Lease & object[]

Leases to block with error messages

Returns

Promise<Lease & object[]>

Blocked leases

Example

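try {
  await processEvents(lease);
  await store().ack([lease]);
} catch (error) {
  // Block the stream once the retry budget is exhausted
  if (lease.retry >= 3) {
    await store().block([{
      ...lease,
      // `error` is `unknown` in a TypeScript catch clause, so narrow it first
      error: error instanceof Error ? error.message : String(error)
    }]);
  }
}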

See

lease for lease management


commit()

commit: <E>(stream, msgs, meta, expectedVersion?) => Promise<Committed<E, keyof E>[]>

Defined in: libs/act/src/types/ports.ts:123

Commits one or more events to a stream atomically.

This is the core method for persisting events. It must:

  • Assign global sequence IDs to events
  • Increment the stream version
  • Check optimistic concurrency if expectedVersion is provided
  • Store events atomically (all or nothing)
  • Attach metadata (id, stream, version, created timestamp)
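
The requirements above can be sketched with a small in-memory model. This is an illustration only, not the InMemoryStore or PostgresStore implementation: `SketchStore`, `SketchMessage`, `SketchEvent`, and `ConcurrencyError` are hypothetical names, the `meta` parameter is omitted, the method is synchronous for brevity (the real `commit` returns a Promise), and the sketch assumes an empty stream has version 0.

```typescript
// Hypothetical, simplified shapes; the real Message and Committed types carry more fields.
type SketchMessage = { name: string; data: Record<string, unknown> };
type SketchEvent = SketchMessage & {
  id: number;      // global sequence ID
  stream: string;
  version: number; // per-stream version
  created: Date;
};

class ConcurrencyError extends Error {}

class SketchStore {
  private events: SketchEvent[] = [];

  // Assumption of this sketch: an empty stream has version 0.
  private versionOf(stream: string): number {
    return this.events.filter((e) => e.stream === stream).length;
  }

  commit(stream: string, msgs: SketchMessage[], expectedVersion?: number): SketchEvent[] {
    const current = this.versionOf(stream);
    // Optimistic concurrency: reject before anything is written.
    if (expectedVersion !== undefined && expectedVersion !== current) {
      throw new ConcurrencyError(
        `Expected version ${expectedVersion} but stream "${stream}" is at ${current}`
      );
    }
    // Build the whole batch first, then append it in one step (all or nothing).
    const committed: SketchEvent[] = msgs.map((msg, i) => ({
      ...msg,
      id: this.events.length + i + 1,
      stream,
      version: current + i + 1,
      created: new Date(),
    }));
    this.events.push(...committed);
    return committed;
  }
}
```

A database-backed store would typically get the same all-or-nothing behavior from a transaction rather than from building the batch in memory first.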

Type Parameters

E

E extends Schemas

Event schemas

Parameters

stream

string

The stream ID to commit to

msgs

Message<E, keyof E>[]

Array of messages (events) to commit

meta

EventMeta

Event metadata (correlation, causation)

expectedVersion?

number

Expected current version for optimistic concurrency

Returns

Promise<Committed<E, keyof E>[]>

Array of committed events with full metadata

Throws

If expectedVersion doesn't match current version

Example

const events = await store().commit(
  "user-123",
  [{ name: "UserCreated", data: { email: "user@example.com" } }],
  { correlation: "req-456", causation: { action: {...} } },
  0 // Expect version 0 (new stream)
);

dispose

dispose: Disposer

Defined in: libs/act/src/types/ports.ts:25

Inherited from

Disposable.dispose


drop()

drop: () => Promise<void>

Defined in: libs/act/src/types/ports.ts:92

Drops all data from the store.

Dangerous operation that deletes all events and state. Use with extreme caution, primarily for testing or development environments.

Returns

Promise<void>

Example

// Clean up after tests
afterAll(async () => {
  await store().drop();
});

lease()

lease: (leases, millis) => Promise<Lease[]>

Defined in: libs/act/src/types/ports.ts:218

Acquires leases for exclusive stream processing.

Leasing prevents multiple workers from processing the same stream concurrently. A lease is granted only if:

  • The stream isn't currently leased by another worker, or the existing lease has expired
  • The stream isn't blocked due to errors
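
The granting conditions above can be sketched as a pure function over per-stream state. This is a minimal illustration under stated assumptions: `LeaseRequest`, `LeaseState`, and `grantLeases` are hypothetical names, the function is synchronous (the real `lease` returns a Promise), and the sketch treats a lease whose expiry time has passed as no longer held.

```typescript
// Hypothetical, simplified shapes; the real Lease type may carry more fields.
type LeaseRequest = { stream: string; by: string; at: number; retry: number };
type LeaseState = { blocked: boolean; leasedBy?: string; leasedUntil?: number };

function grantLeases(
  state: Map<string, LeaseState>,
  requests: LeaseRequest[],
  millis: number,
  now: number = Date.now()
): LeaseRequest[] {
  const granted: LeaseRequest[] = [];
  for (const req of requests) {
    const s = state.get(req.stream) ?? { blocked: false };
    // A lease held by another worker only counts while it has not expired.
    const heldByOther =
      s.leasedBy !== undefined &&
      s.leasedBy !== req.by &&
      (s.leasedUntil ?? 0) > now;
    if (s.blocked || heldByOther) continue;
    state.set(req.stream, { ...s, leasedBy: req.by, leasedUntil: now + millis });
    granted.push(req);
  }
  return granted;
}
```

Because only granted requests are returned, a worker can request many streams at once and process whatever subset it actually received.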

Parameters

leases

Lease[]

Array of lease requests

millis

number

Lease duration in milliseconds

Returns

Promise<Lease[]>

Array of successfully granted leases

Example

const granted = await store().lease([
  { stream: "user-123", by: workerId, at: 0, retry: 0, lagging: false }
], 10000); // 10 second lease

if (granted.length > 0) {
  // Process events...
  await store().ack(granted);
}

See

  • poll for finding streams to lease
  • ack for acknowledging completion

poll()

poll: (lagging, leading) => Promise<Poll[]>

Defined in: libs/act/src/types/ports.ts:188

Polls for streams that need reaction processing.

Returns streams that have unprocessed events (events beyond the stream's watermark), ordered by their watermark. Uses a dual-frontier approach:

  • Lagging: New or behind streams (ascending watermark)
  • Leading: Active streams (descending watermark)

Only returns unblocked streams that aren't currently leased.
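
One way to picture the dual-frontier selection is the sketch below. It is an illustration under stated assumptions, not the built-in stores' query: `PollState`, `PollResult`, and `pollStreams` are hypothetical names, the function is synchronous (the real `poll` returns a Promise), it does not check whether a stream actually has pending events, and skipping streams already picked by the lagging frontier is an assumption of the sketch.

```typescript
// Hypothetical per-stream bookkeeping for this sketch.
type PollState = { stream: string; at: number; blocked: boolean; leasedUntil?: number };
type PollResult = { stream: string; at: number; lagging: boolean };

function pollStreams(
  states: PollState[],
  lagging: number,
  leading: number,
  now: number = Date.now()
): PollResult[] {
  // Exclude blocked streams and streams with a lease that has not expired.
  const available = states.filter((s) => !s.blocked && (s.leasedUntil ?? 0) <= now);

  // Lagging frontier: lowest watermarks first.
  const behind = [...available].sort((a, b) => a.at - b.at).slice(0, lagging);
  const picked = new Set(behind.map((s) => s.stream));

  // Leading frontier: highest watermarks first, without repeating a stream.
  const ahead = available
    .filter((s) => !picked.has(s.stream))
    .sort((a, b) => b.at - a.at)
    .slice(0, leading);

  return [
    ...behind.map((s) => ({ stream: s.stream, at: s.at, lagging: true })),
    ...ahead.map((s) => ({ stream: s.stream, at: s.at, lagging: false })),
  ];
}
```

Sampling from both ends keeps streams that have fallen behind from being starved by busy streams, while still letting active streams make progress.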

Parameters

lagging

number

Max streams to return from lagging frontier

leading

number

Max streams to return from leading frontier

Returns

Promise<Poll[]>

Array of poll results with stream, source, watermark, and lag status

Example

const polled = await store().poll(5, 5); // 5 lagging + 5 leading
polled.forEach(({ stream, at, lagging }) => {
  console.log(`${stream} at ${at} (lagging: ${lagging})`);
});

See

Lease for lease management


query()

query: <E>(callback, query?) => Promise<number>

Defined in: libs/act/src/types/ports.ts:159

Queries events from the store with optional filtering.

Calls the callback for each matching event. The callback approach allows processing large result sets without loading everything into memory.
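
The callback style can be sketched over a plain array as follows. This is a minimal illustration, not the built-in stores' implementation: `QueryEvent`, `QueryFilter`, and `queryEvents` are hypothetical names, the function is synchronous (the real `query` returns a Promise), only a subset of the filter fields is handled, and reading `after` as "event ID strictly greater than" is an assumption of the sketch.

```typescript
// Hypothetical, simplified event shape for this sketch.
type QueryEvent = { id: number; stream: string; name: string; data: unknown };
// A subset of the documented filter fields.
type QueryFilter = Readonly<{ stream?: string; names?: string[]; after?: number; limit?: number }>;

function queryEvents(
  log: readonly QueryEvent[],
  callback: (event: QueryEvent) => void,
  query: QueryFilter = {}
): number {
  let count = 0;
  for (const event of log) {
    if (query.limit !== undefined && count >= query.limit) break;
    if (query.stream !== undefined && event.stream !== query.stream) continue;
    if (query.names && !query.names.includes(event.name)) continue;
    // Assumption: `after` means "event ID strictly greater than".
    if (query.after !== undefined && event.id <= query.after) continue;
    callback(event); // events are handed over one at a time, never collected
    count++;
  }
  return count;
}
```

Since matching events are passed to the callback one by one and only the count is returned, the caller decides what, if anything, to keep in memory.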

Type Parameters

E

E extends Schemas

Event schemas

Parameters

callback

(event) => void

Function invoked for each matching event

query?

Readonly<{ after?: number; backward?: boolean; before?: number; correlation?: string; created_after?: Date; created_before?: Date; limit?: number; names?: string[]; stream?: string; with_snaps?: boolean; }>

Optional filter criteria

Returns

Promise<number>

Total number of events processed

Example

let count = 0;
await store().query(
  (event) => {
    console.log(event.name, event.data);
    count++;
  },
  { stream: "user-123" }
);
console.log(`Found ${count} events`);

seed()

seed: () => Promise<void>

Defined in: libs/act/src/types/ports.ts:77

Initializes or resets the store.

Used primarily for testing to ensure a clean state between tests. For production stores, this might create necessary tables or indexes.

Returns

Promise<void>

Example

// Reset store between tests
beforeEach(async () => {
  await store().seed();
});