Interface: Store
Defined in: libs/act/src/types/ports.ts:62
Interface for event store implementations.
The Store interface defines the contract for persistence adapters in Act. Implementations must provide event storage, querying, and distributed processing capabilities through leasing and watermark tracking.
Act includes two built-in implementations:
- InMemoryStore: For development and testing
- PostgresStore: For production use with PostgreSQL
Custom stores can be implemented for other databases or event log systems.
Example
import { act, store } from "@rotorsoft/act";
import { PostgresStore } from "@rotorsoft/act-pg";
// Replace the default in-memory store before building the app
store(new PostgresStore({
  host: "localhost",
  port: 5432,
  database: "myapp",
  user: "postgres",
  password: "secret"
}));
// Counter is a state definition declared elsewhere
const app = act()
  .withState(Counter)
  .build();
See
- InMemoryStore for the default implementation
- PostgresStore for the PostgreSQL implementation
Extends
- Disposable
Properties
ack()
ack: (leases) => Promise<Lease[]>
Defined in: libs/act/src/types/ports.ts:238
Acknowledges successful processing of leased streams.
Updates the watermark to indicate events have been processed successfully. Releases the lease so other workers can process subsequent events.
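The watermark update and lease release described above can be pictured with a small in-memory sketch. This is not the library's implementation: `AckLease`, `AckState`, and `ackSketch` are hypothetical names, and the rules that only the current holder may acknowledge, that the watermark never moves backward, and that the retry count resets are assumptions made for illustration.

```typescript
// Hypothetical sketch; names and rules here are assumptions, not the Act API.
type AckLease = { stream: string; by: string; at: number; retry: number; lagging: boolean };
type AckState = { at: number; leasedBy?: string; leasedUntil?: number; retry: number };

function ackSketch(state: Map<string, AckState>, leases: AckLease[]): AckLease[] {
  const acked: AckLease[] = [];
  for (const lease of leases) {
    const current = state.get(lease.stream);
    // Assumption: only the worker that holds the lease may acknowledge it.
    if (!current || current.leasedBy !== lease.by) continue;
    // Advance the watermark and drop the lease fields so the stream is free again.
    state.set(lease.stream, { at: Math.max(current.at, lease.at), retry: 0 });
    acked.push(lease);
  }
  return acked;
}
```

Returning only the leases that were actually acknowledged lets the caller detect leases that were lost, for example after expiring and being taken over by another worker.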
Parameters
leases
Lease[]
Leases to acknowledge with updated watermarks
Returns
Promise<Lease[]>
Acknowledged leases
Example
const leased = await store().lease([...], 10000);
// Process events up to ID 150
await store().ack(leased.map(l => ({ ...l, at: 150 })));
See
lease for acquiring leases
block()
block: (leases) => Promise<Lease & object[]>
Defined in: libs/act/src/types/ports.ts:271
Blocks streams after persistent processing failures.
Blocked streams won't be returned by poll until manually unblocked. This prevents poison messages from repeatedly failing and consuming resources.
Streams are typically blocked when:
- Max retries reached
- blockOnError option is true
- Handler throws an error
Parameters
leases
Lease & object[]
Leases to block with error messages
Returns
Promise<Lease & object[]>
Blocked leases
Example
try {
  await processEvents(lease);
  await store().ack([lease]);
} catch (error) {
  // Block only after the retry budget is exhausted
  if (lease.retry >= 3) {
    await store().block([{
      ...lease,
      error: error instanceof Error ? error.message : String(error)
    }]);
  }
}
See
lease for lease management
commit()
commit: <E>(stream, msgs, meta, expectedVersion?) => Promise<Committed<E, keyof E>[]>
Defined in: libs/act/src/types/ports.ts:123
Commits one or more events to a stream atomically.
This is the core method for persisting events. It must:
- Assign global sequence IDs to events
- Increment the stream version
- Check optimistic concurrency if expectedVersion is provided
- Store events atomically (all or nothing)
- Attach metadata (id, stream, version, created timestamp)
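The requirements above can be illustrated with a minimal in-memory sketch. It is not the InMemoryStore implementation: `SketchMessage`, `SketchEvent`, and `SketchLog` are hypothetical names, the method is synchronous, and correlation/causation metadata is omitted. The sketch assumes a stream's version equals the number of events it holds, so a new stream is at version 0.

```typescript
// Hypothetical sketch; not the library's InMemoryStore.
type SketchMessage = { name: string; data: unknown };
type SketchEvent = SketchMessage & {
  id: number; // global sequence ID across all streams
  stream: string;
  version: number; // stream version after this event is appended
  created: Date;
};

class SketchLog {
  private events: SketchEvent[] = [];

  commit(stream: string, msgs: SketchMessage[], expectedVersion?: number): SketchEvent[] {
    // Assumption: the stream version is the count of events already in the stream.
    const current = this.events.filter((e) => e.stream === stream).length;
    if (expectedVersion !== undefined && expectedVersion !== current) {
      throw new Error(`Concurrency error on ${stream}: expected ${expectedVersion}, found ${current}`);
    }
    // Build every event first, then append in one step so the commit is all or nothing.
    const committed: SketchEvent[] = msgs.map((msg, i) => ({
      ...msg,
      id: this.events.length + i,
      stream,
      version: current + 1 + i,
      created: new Date(),
    }));
    this.events.push(...committed);
    return committed;
  }
}
```

A database-backed store would typically get the same guarantees from a transaction plus a unique constraint on stream and version, rather than from a single-threaded array append.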
Type Parameters
E
E extends Schemas
Event schemas
Parameters
stream
string
The stream ID to commit to
msgs
Message<E, keyof E>[]
Array of messages (events) to commit
meta
Event metadata (correlation, causation)
expectedVersion?
number
Expected current version for optimistic concurrency
Returns
Promise<Committed<E, keyof E>[]>
Array of committed events with full metadata
Throws
If expectedVersion doesn't match current version
Example
const events = await store().commit(
  "user-123",
  [{ name: "UserCreated", data: { email: "user@example.com" } }],
  { correlation: "req-456", causation: { action: {...} } },
  0 // Expect version 0 (new stream)
);
dispose
dispose: Disposer
Defined in: libs/act/src/types/ports.ts:25
Inherited from
Disposable.dispose
drop()
drop: () => Promise<void>
Defined in: libs/act/src/types/ports.ts:92
Drops all data from the store.
Dangerous operation that deletes all events and state. Use with extreme caution, primarily for testing or development environments.
Returns
Promise<void>
Example
// Clean up after tests
afterAll(async () => {
  await store().drop();
});
lease()
lease: (leases, millis) => Promise<Lease[]>
Defined in: libs/act/src/types/ports.ts:218
Acquires leases for exclusive stream processing.
Leasing prevents multiple workers from processing the same stream concurrently. A lease is granted only if:
- The stream isn't currently leased by another worker, or that worker's lease has expired
- The stream isn't blocked due to errors
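The grant conditions can be sketched against an in-memory map of stream state. This is illustrative only: `SketchLease`, `SketchStreamState`, and `grantLeases` are hypothetical names, and the sketch assumes that a lease held by another worker stops counting once its expiry time has passed.

```typescript
// Hypothetical sketch of the grant conditions; not the library's implementation.
type SketchLease = { stream: string; by: string; at: number; retry: number; lagging: boolean };
type SketchStreamState = { leasedBy?: string; leasedUntil?: number; blocked: boolean };

function grantLeases(
  state: Map<string, SketchStreamState>,
  requests: SketchLease[],
  millis: number,
  now: number = Date.now()
): SketchLease[] {
  const granted: SketchLease[] = [];
  for (const request of requests) {
    const current = state.get(request.stream) ?? { blocked: false };
    // Assumption: an expired lease no longer excludes other workers.
    const heldByOther =
      current.leasedBy !== undefined &&
      current.leasedBy !== request.by &&
      (current.leasedUntil ?? 0) > now;
    if (current.blocked || heldByOther) continue;
    state.set(request.stream, { ...current, leasedBy: request.by, leasedUntil: now + millis });
    granted.push(request);
  }
  return granted;
}
```

In a shared database the check and the update would have to happen atomically, for example inside one transaction, so two workers cannot both pass the check for the same stream.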
Parameters
leases
Lease[]
Array of lease requests
millis
number
Lease duration in milliseconds
Returns
Promise<Lease[]>
Array of successfully granted leases
Example
const granted = await store().lease([
  { stream: "user-123", by: workerId, at: 0, retry: 0, lagging: false }
], 10000); // 10 second lease
if (granted.length > 0) {
  // Process events...
  await store().ack(granted);
}
See
poll()
poll: (lagging, leading) => Promise<Poll[]>
Defined in: libs/act/src/types/ports.ts:188
Polls for streams that need reaction processing.
Returns streams that have unprocessed events (events committed after the stream's watermark), ordered by their watermark. Uses a dual-frontier approach:
- Lagging: New or behind streams (ascending watermark)
- Leading: Active streams (descending watermark)
Only returns unblocked streams that aren't currently leased.
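A rough sketch of the dual-frontier selection follows. It is not the library's query: `SketchStream`, `SketchPoll`, and `pollSketch` are hypothetical names, and the sketch simplifies "has pending events" to "watermark is behind a single global last event ID". It also assumes a stream picked on the lagging frontier is not returned again on the leading one.

```typescript
// Hypothetical sketch of dual-frontier polling; not the library's implementation.
type SketchStream = { stream: string; at: number; blocked: boolean; leased: boolean };
type SketchPoll = { stream: string; at: number; lagging: boolean };

function pollSketch(
  streams: SketchStream[],
  lastEventId: number,
  lagging: number,
  leading: number
): SketchPoll[] {
  // Skip blocked or leased streams, and streams that are already caught up.
  const eligible = streams.filter((s) => !s.blocked && !s.leased && s.at < lastEventId);

  // Lagging frontier: lowest watermarks first.
  const lag: SketchPoll[] = [...eligible]
    .sort((a, b) => a.at - b.at)
    .slice(0, lagging)
    .map((s) => ({ stream: s.stream, at: s.at, lagging: true }));

  // Leading frontier: highest watermarks first, excluding streams already picked.
  const taken = new Set(lag.map((p) => p.stream));
  const lead: SketchPoll[] = [...eligible]
    .sort((a, b) => b.at - a.at)
    .filter((s) => !taken.has(s.stream))
    .slice(0, leading)
    .map((s) => ({ stream: s.stream, at: s.at, lagging: false }));

  return [...lag, ...lead];
}
```

Splitting the budget between the two frontiers keeps new or slow streams from starving while still letting active streams stay close to the head of the log.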
Parameters
lagging
number
Max streams to return from lagging frontier
leading
number
Max streams to return from leading frontier
Returns
Promise<Poll[]>
Array of poll results with stream, source, watermark, and lag status
Example
const polled = await store().poll(5, 5); // 5 lagging + 5 leading
polled.forEach(({ stream, at, lagging }) => {
  console.log(`${stream} at ${at} (lagging: ${lagging})`);
});
See
Lease for lease management
query()
query: <E>(callback, query?) => Promise<number>
Defined in: libs/act/src/types/ports.ts:159
Queries events from the store with optional filtering.
Calls the callback for each matching event. The callback approach allows processing large result sets without loading everything into memory.
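The callback style can be sketched over a plain array. This is a simplified illustration, not the store's query engine: `SketchRecord`, `SketchQuery`, and `querySketch` are hypothetical names, only a subset of the filters is handled, `stream` is matched by exact equality, and `after`/`before` are assumed to be exclusive bounds on the event ID.

```typescript
// Hypothetical sketch of callback-based querying; not the library's implementation.
type SketchRecord = { id: number; stream: string; name: string; data: unknown };
type SketchQuery = Readonly<{
  stream?: string;
  names?: string[];
  after?: number;
  before?: number;
  limit?: number;
  backward?: boolean;
}>;

function querySketch(
  events: readonly SketchRecord[],
  callback: (event: SketchRecord) => void,
  query: SketchQuery = {}
): number {
  const { stream, names, after, before, limit, backward } = query;
  const n = events.length;
  let count = 0;
  // Walk the log one event at a time and stop as soon as the limit is reached.
  for (let i = 0; i < n && (limit === undefined || count < limit); i++) {
    const event = events[backward ? n - 1 - i : i]!;
    if (stream !== undefined && event.stream !== stream) continue;
    if (names !== undefined && !names.includes(event.name)) continue;
    if (after !== undefined && event.id <= after) continue; // assumed exclusive
    if (before !== undefined && event.id >= before) continue; // assumed exclusive
    callback(event);
    count++;
  }
  return count;
}
```

Because each match is handed to the callback immediately, the caller decides what to keep; nothing forces the full result set to be collected first.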
Type Parameters
E
E extends Schemas
Event schemas
Parameters
callback
(event) => void
Function invoked for each matching event
query?
Readonly<{ after?: number; backward?: boolean; before?: number; correlation?: string; created_after?: Date; created_before?: Date; limit?: number; names?: string[]; stream?: string; with_snaps?: boolean; }>
Optional filter criteria
Returns
Promise<number>
Total number of events processed
Example
let count = 0;
await store().query(
  (event) => {
    console.log(event.name, event.data);
    count++;
  },
  { stream: "user-123" }
);
console.log(`Found ${count} events`);
seed()
seed: () => Promise<void>
Defined in: libs/act/src/types/ports.ts:77
Initializes or resets the store.
Used primarily for testing to ensure a clean state between tests. For production stores, this might create necessary tables or indexes.
Returns
Promise<void>
Example
// Reset store between tests
beforeEach(async () => {
  await store().seed();
});