Act

@rotorsoft/act-root


Class: Act<TSchemaReg, TEvents, TActions, TStateMap, TActor>

Defined in: libs/act/src/act.ts:52

See

Store

Main orchestrator for event-sourced state machines and workflows.

It manages the lifecycle of actions, reactions, and event streams, providing APIs for loading state, executing actions, querying events, and draining reactions.

Usage

const app = new Act(registry);
await app.do("increment", { stream: "counter1", actor }, { by: 1 });
const snapshot = await app.load(Counter, "counter1");
await app.drain();
  • Register event listeners with .on("committed", ...) and .on("acked", ...) to react to lifecycle events.
  • Use .query() to analyze event streams for analytics or debugging.

Type Parameters

TSchemaReg

TSchemaReg extends SchemaRegister<TActions>

SchemaRegister for state

TEvents

TEvents extends Schemas

Schemas for events

TActions

TActions extends Schemas

Schemas for actions

TStateMap

TStateMap extends Record<string, Schema> = Record<string, never>

Map of state names to state schemas

TActor

TActor extends Actor = Actor

Actor type extending base Actor

Constructors

Constructor

new Act<TSchemaReg, TEvents, TActions, TStateMap, TActor>(registry, _states?): Act<TSchemaReg, TEvents, TActions, TStateMap, TActor>

Defined in: libs/act/src/act.ts:132

Create a new Act orchestrator.

Parameters

registry

Registry<TSchemaReg, TEvents, TActions>

The registry of state, event, and action schemas

_states?

Map<string, State<any, any, any>> = ...

Returns

Act<TSchemaReg, TEvents, TActions, TStateMap, TActor>

Properties

registry

readonly registry: Registry<TSchemaReg, TEvents, TActions>

Defined in: libs/act/src/act.ts:133

The registry of state, event, and action schemas

Methods

correlate()

correlate(query?): Promise<{ last_id: number; leased: Lease[]; }>

Defined in: libs/act/src/act.ts:678

Discovers and registers new streams dynamically based on reaction resolvers.

Correlation enables "dynamic reactions" where target streams are determined at runtime based on event content. For example, you might create a stats stream for each user when they perform certain actions.

This method scans events matching the query and identifies new target streams based on reaction resolvers. It then registers these streams so they'll be picked up by the next drain cycle.
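The scan-and-register behavior described above can be illustrated with a small in-memory sketch. This is not Act's implementation: `StoredEvent`, `Resolver`, and `correlateSketch` are hypothetical names, the event log is a plain array, and the function is synchronous where the real `correlate()` returns a Promise.

```typescript
// Hypothetical, simplified shapes; Act's real Query and Lease types are richer.
type StoredEvent = { id: number; name: string; stream: string };
type Resolver = (event: StoredEvent) => { target: string } | undefined;

function correlateSketch(
  events: StoredEvent[],
  resolvers: Record<string, Resolver>,
  known: Set<string>,
  query: { after?: number; limit?: number } = {}
): { leased: string[]; last_id: number } {
  const after = query.after ?? -1;
  const limit = query.limit ?? 10;
  // Scan only the window of events selected by the query
  const scanned = events.filter((e) => e.id > after).slice(0, limit);
  const leased: string[] = [];
  let last_id = after;
  for (const event of scanned) {
    last_id = event.id;
    // Resolve the target stream from the event content
    const resolved = resolvers[event.name]?.(event);
    if (resolved && !known.has(resolved.target)) {
      known.add(resolved.target); // register so a later drain can pick it up
      leased.push(resolved.target);
    }
  }
  return { leased, last_id };
}

const events: StoredEvent[] = [
  { id: 1, name: "UserLoggedIn", stream: "user-1" },
  { id: 2, name: "UserLoggedIn", stream: "user-2" },
  { id: 3, name: "UserLoggedIn", stream: "user-1" },
];
const resolvers: Record<string, Resolver> = {
  UserLoggedIn: (event) => ({ target: `stats-${event.stream}` }),
};
const known = new Set<string>();
const firstScan = correlateSketch(events, resolvers, known, { after: 0, limit: 2 });
const secondScan = correlateSketch(events, resolvers, known, { after: firstScan.last_id });
```

In this sketch the second scan resumes from the first scan's `last_id` and reports no new streams, because the only remaining event resolves to a target that is already registered.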

Parameters

query?

Query = ...

Query filter to scan for new correlations

Returns

Promise<{ last_id: number; leased: Lease[]; }>

Object with newly leased streams and last scanned event ID

Examples

// Scan for new streams
const { leased, last_id } = await app.correlate({ after: 0, limit: 100 });
console.log(`Found ${leased.length} new streams`);

// Save last_id for next scan
await saveCheckpoint(last_id);

// Dynamic reaction: the target stream is resolved per event
const app = act()
  .withState(User)
  .withState(UserStats)
  .on("UserLoggedIn")
  .do(async (event) => ["incrementLoginCount", {}])
  .to((event) => ({
    target: `stats-${event.stream}` // Dynamic target per user
  }))
  .build();

// Discover stats streams as users log in
await app.correlate();

do()

do<TKey>(action, target, payload, reactingTo?, skipValidation?): Promise<Snapshot<TSchemaReg[TKey], TEvents>[]>

Defined in: libs/act/src/act.ts:227

Executes an action on a state instance, committing resulting events.

This is the primary method for modifying state. It:

  1. Validates the action payload against the schema
  2. Loads the current state snapshot
  3. Checks invariants (business rules)
  4. Executes the action handler to generate events
  5. Applies events to create new state
  6. Commits events to the store with optimistic concurrency control
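The six steps above can be sketched with a toy in-memory counter. Everything here is an illustrative assumption rather than Act's code: `ToyStore`, `increment`, the "count may not exceed 10" invariant, and the plain `Error` messages are hypothetical, and the sketch is synchronous where `do()` is asynchronous.

```typescript
// Hypothetical toy types; not Act's State, Snapshot, or Store.
type CounterState = { count: number };
type Incremented = { name: "Incremented"; data: { by: number } };

class ToyStore {
  private streams = new Map<string, Incremented[]>();

  read(stream: string): Incremented[] {
    return this.streams.get(stream) ?? [];
  }

  // Optimistic concurrency: commit only if the stream is still at the loaded version
  commit(stream: string, events: Incremented[], expectedVersion: number): void {
    const current = this.read(stream);
    if (current.length !== expectedVersion) {
      throw new Error("ConcurrencyError: stream was modified by another writer");
    }
    this.streams.set(stream, current.concat(events));
  }
}

const apply = (state: CounterState, event: Incremented): CounterState => ({
  count: state.count + event.data.by,
});

function increment(store: ToyStore, stream: string, payload: { by: number }): CounterState {
  // 1. Validate the payload
  if (typeof payload.by !== "number" || payload.by <= 0) {
    throw new Error("ValidationError: by must be a positive number");
  }
  // 2. Load the current state by replaying the stream
  const history = store.read(stream);
  const state = history.reduce(apply, { count: 0 });
  // 3. Check invariants (assumed rule for this sketch)
  if (state.count + payload.by > 10) {
    throw new Error("InvariantError: count may not exceed 10");
  }
  // 4. Run the action handler to produce events
  const events: Incremented[] = [{ name: "Incremented", data: { by: payload.by } }];
  // 5. Apply the events to compute the new state
  const next = events.reduce(apply, state);
  // 6. Commit, expecting the version that was loaded in step 2
  store.commit(stream, events, history.length);
  return next;
}

const store = new ToyStore();
const afterFirst = increment(store, "counter-1", { by: 5 });
const afterSecond = increment(store, "counter-1", { by: 3 });
let rejected = "";
try {
  increment(store, "counter-1", { by: 5 }); // would push the count past 10
} catch (error) {
  rejected = (error as Error).message;
}
```

The rejected call commits nothing, so the stream still holds only the two earlier events.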

Type Parameters

TKey

TKey extends string | number | symbol

Action name from registered actions

Parameters

action

TKey

The name of the action to execute

target

Target<TActor>

Target specification with stream ID and actor context

payload

Readonly<TActions[TKey]>

Action payload matching the action's schema

reactingTo?

Committed<TEvents, string & keyof TEvents>

Optional event that triggered this action (for correlation)

skipValidation?

boolean = false

Skip schema validation (use carefully, for performance)

Returns

Promise<Snapshot<TSchemaReg[TKey], TEvents>[]>

Array of snapshots for all affected states (usually one)

Throws

ValidationError if the payload doesn't match the action schema

Throws

InvariantError if business rules are violated

Throws

ConcurrencyError if another process modified the stream

Examples

// Basic action execution
const snapshots = await app.do(
  "increment",
  {
    stream: "counter-1",
    actor: { id: "user1", name: "Alice" }
  },
  { by: 5 }
);

console.log(snapshots[0].state.count); // Current count after increment

// Error handling
try {
  await app.do(
    "withdraw",
    { stream: "account-123", actor: { id: "user1", name: "Alice" } },
    { amount: 1000 }
  );
} catch (error) {
  if (error instanceof InvariantError) {
    console.error("Business rule violated:", error.description);
  } else if (error instanceof ConcurrencyError) {
    console.error("Concurrent modification detected, retry...");
  } else if (error instanceof ValidationError) {
    console.error("Invalid payload:", error.details);
  }
}

// Executing an action from a reaction
const app = act()
  .withState(Order)
  .withState(Inventory)
  .on("OrderPlaced")
  .do(async (event, context) => {
    // This action is triggered by an event
    const result = await context.app.do(
      "reduceStock",
      {
        stream: "inventory-1",
        actor: event.meta.causation.action.actor
      },
      { amount: event.data.items.length },
      event // Pass event for correlation tracking
    );
    return result;
  })
  .to("inventory-1")
  .build();

drain()

drain(options?): Promise<Drain<TEvents>>

Defined in: libs/act/src/act.ts:514

Processes pending reactions by draining events in the store that reactions have not yet acknowledged.

Runs a single drain cycle:

  1. Polls the store for streams with unacknowledged events
  2. Leases streams to prevent concurrent processing
  3. Fetches events for each leased stream
  4. Executes matching reaction handlers
  5. Acknowledges successful reactions or blocks failing ones
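A single drain cycle along the lines of the steps above might look like the following sketch. It is a deliberate simplification and not Act's implementation: `ToyEvent`, `StreamCursor`, `drainOnce`, and the `maxRetries` threshold are hypothetical, each stream acts as its own subscription, and the dual-frontier balancing is not modeled.

```typescript
// Hypothetical toy types for illustration only.
type ToyEvent = { id: number; stream: string; name: string };
type StreamCursor = { at: number; leasedBy?: string; blocked: boolean; retries: number };

function drainOnce(
  events: ToyEvent[],
  cursors: Map<string, StreamCursor>,
  handler: (event: ToyEvent) => void,
  worker = "worker-1",
  maxRetries = 1
): { leased: string[]; acked: string[]; blocked: string[] } {
  const result = { leased: [] as string[], acked: [] as string[], blocked: [] as string[] };
  cursors.forEach((cursor, stream) => {
    // 1. Poll: find events past this stream's watermark
    const pending = events.filter((e) => e.stream === stream && e.id > cursor.at);
    if (cursor.blocked || cursor.leasedBy || pending.length === 0) return;
    // 2. Lease the stream so no other worker processes it concurrently
    cursor.leasedBy = worker;
    result.leased.push(stream);
    try {
      // 3-4. Fetch the pending events and run the reaction handler on each
      for (const event of pending) {
        handler(event);
        cursor.at = event.id;
      }
      // 5a. Acknowledge on success
      cursor.retries = 0;
      result.acked.push(stream);
    } catch (_error) {
      // 5b. Block the stream once it has failed too many times
      cursor.retries += 1;
      if (cursor.retries > maxRetries) {
        cursor.blocked = true;
        result.blocked.push(stream);
      }
    } finally {
      delete cursor.leasedBy; // release the lease
    }
  });
  return result;
}

const log: ToyEvent[] = [
  { id: 1, stream: "orders", name: "OrderPlaced" },
  { id: 2, stream: "payments", name: "PaymentFailed" },
];
const cursors = new Map<string, StreamCursor>([
  ["orders", { at: 0, blocked: false, retries: 0 }],
  ["payments", { at: 0, blocked: false, retries: 0 }],
]);
const handled: number[] = [];
const handler = (event: ToyEvent): void => {
  if (event.name === "PaymentFailed") throw new Error("handler failed");
  handled.push(event.id);
};
const firstCycle = drainOnce(log, cursors, handler);
const secondCycle = drainOnce(log, cursors, handler);
```

In this run the first cycle acknowledges "orders" and records one failure for "payments"; the second cycle retries "payments", fails again, and blocks it.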

Drain uses a dual-frontier strategy to balance processing of new streams (lagging) vs active streams (leading). The ratio adapts based on event pressure.

Call correlate() before drain() to discover target streams. For a higher-level API that handles debouncing, correlation, and signaling automatically, use settle.

Parameters

options?

DrainOptions = {}

Drain configuration options

Returns

Promise<Drain<TEvents>>

Drain statistics with fetched, leased, acked, and blocked counts

Examples

// Manual cycle: correlate, then drain
await app.do("createUser", target, payload);
await app.correlate();
await app.drain();

// Higher-level alternative using settle()
await app.do("CreateItem", target, input);
app.settle(); // debounced correlate→drain, emits "settled"

emit()

Call Signature

emit(event, args): boolean

Defined in: libs/act/src/act.ts:74

Emit a lifecycle event (internal use, but can be used for custom listeners).

Parameters
event

"committed"

The event name ("committed", "acked", "blocked", or "settled")

args

Snapshot<TSchemaReg, TEvents>[]

The event payload

Returns

boolean

true if the event had listeners, false otherwise

Call Signature

emit(event, args): boolean

Defined in: libs/act/src/act.ts:75

Emit a lifecycle event (internal use, but can be used for custom listeners).

Parameters
event

"acked"

The event name ("committed", "acked", "blocked", or "settled")

args

Lease[]

The event payload

Returns

boolean

true if the event had listeners, false otherwise

Call Signature

emit(event, args): boolean

Defined in: libs/act/src/act.ts:76

Emit a lifecycle event (internal use, but can be used for custom listeners).

Parameters
event

"blocked"

The event name ("committed", "acked", "blocked", or "settled")

args

Lease & object[]

The event payload

Returns

boolean

true if the event had listeners, false otherwise

Call Signature

emit(event, args): boolean

Defined in: libs/act/src/act.ts:77

Emit a lifecycle event (internal use, but can be used for custom listeners).

Parameters
event

"settled"

The event name ("committed", "acked", "blocked", or "settled")

args

Drain<TEvents>

The event payload

Returns

boolean

true if the event had listeners, false otherwise
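The on/off/emit contract documented here (chaining via `this`, and `emit` returning whether any listener ran) can be shown with a tiny typed emitter. This is a self-contained sketch under assumed names (`TinyLifecycle`, `DemoEvents`) with simplified payloads; it makes no claim about how Act implements its listeners.

```typescript
type Listener<T> = (args: T) => void;

// Hypothetical minimal emitter; payload types are simplified stand-ins.
class TinyLifecycle<TEventMap extends Record<string, unknown>> {
  private readonly listeners = new Map<keyof TEventMap, Set<Listener<any>>>();

  on<K extends keyof TEventMap>(event: K, listener: Listener<TEventMap[K]>): this {
    const set = this.listeners.get(event) ?? new Set<Listener<any>>();
    set.add(listener);
    this.listeners.set(event, set);
    return this; // allows chaining
  }

  off<K extends keyof TEventMap>(event: K, listener: Listener<TEventMap[K]>): this {
    this.listeners.get(event)?.delete(listener);
    return this;
  }

  emit<K extends keyof TEventMap>(event: K, args: TEventMap[K]): boolean {
    const set = this.listeners.get(event);
    if (!set || set.size === 0) return false; // nobody listening
    set.forEach((listener) => listener(args));
    return true;
  }
}

type DemoEvents = { committed: string[]; settled: { acked: number } };
const lifecycle = new TinyLifecycle<DemoEvents>();
const seen: string[] = [];
const onCommitted = (streams: string[]): void => {
  seen.push(...streams);
};

lifecycle.on("committed", onCommitted);
const delivered = lifecycle.emit("committed", ["counter-1"]);
const undelivered = lifecycle.emit("settled", { acked: 0 });
lifecycle.off("committed", onCommitted);
const afterOff = lifecycle.emit("committed", ["counter-2"]);
```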


load()

Call Signature

load<TNewState, TNewEvents, TNewActions>(state, stream, callback?): Promise<Snapshot<TNewState, TNewEvents>>

Defined in: libs/act/src/act.ts:287

Loads the current state snapshot for a specific stream.

Reconstructs the current state by replaying events from the event store. Uses snapshots when available to optimize loading performance.

Accepts either a State definition object or a state name string. When using a string, the merged state (from partial states registered via .withState()) is resolved by name.
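Replaying events on top of a saved snapshot can be sketched as follows. The names `CountEvent`, `SavedSnapshot`, and `loadSketch` are hypothetical, the reducer is a simple sum, and the way Act actually stores and selects snapshots is not shown here; `patches` follows the meaning used in the examples (events applied since the last snapshot).

```typescript
// Hypothetical toy types; not Act's Snapshot.
type CountEvent = { id: number; by: number };
type SavedSnapshot = { state: { count: number }; version: number };

function loadSketch(events: CountEvent[], saved?: SavedSnapshot) {
  // Start from the saved snapshot when one exists, otherwise from the initial state
  let state = saved ? { count: saved.state.count } : { count: 0 };
  // version = number of events already folded into the saved snapshot
  const from = saved ? saved.version : 0;
  const tail = events.slice(from);
  for (const event of tail) {
    state = { count: state.count + event.by };
  }
  return { state, version: events.length, patches: tail.length };
}

const history: CountEvent[] = [
  { id: 1, by: 1 },
  { id: 2, by: 2 },
  { id: 3, by: 3 },
  { id: 4, by: 4 },
];
const fullReplay = loadSketch(history);
const fromSnapshot = loadSketch(history, { state: { count: 3 }, version: 2 });
```

Both calls arrive at the same state; the second replays only the two events recorded after the snapshot.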

Type Parameters
TNewState

TNewState extends Schema

State schema type

TNewEvents

TNewEvents extends Schemas

Event schemas type

TNewActions

TNewActions extends Schemas

Action schemas type

Parameters
state

State<TNewState, TNewEvents, TNewActions>

The state definition or state name to load

stream

string

The stream ID (state instance identifier)

callback?

(snapshot) => void

Optional callback invoked with the loaded snapshot

Returns

Promise<Snapshot<TNewState, TNewEvents>>

The current state snapshot for the stream

Examples
// Load by State definition
const snapshot = await app.load(Counter, "counter-1");
console.log(snapshot.state.count); // Current count
console.log(snapshot.patches); // Events since last snapshot

// Load by state name (merged state)
const snapshot = await app.load("Ticket", "ticket-123");
console.log(snapshot.state.title); // Merged state from all partials

// Load multiple streams in parallel
const [user, account] = await Promise.all([
  app.load(User, "user-123"),
  app.load(BankAccount, "account-456")
]);
See

Snapshot for snapshot structure

Call Signature

load<TKey>(name, stream, callback?): Promise<Snapshot<TStateMap[TKey], TEvents>>

Defined in: libs/act/src/act.ts:296

Loads the current state snapshot for a specific stream.

Reconstructs the current state by replaying events from the event store. Uses snapshots when available to optimize loading performance.

Accepts either a State definition object or a state name string. When using a string, the merged state (from partial states registered via .withState()) is resolved by name.

Type Parameters
TKey

TKey extends string

Parameters
name

TKey

The registered state name to load

stream

string

The stream ID (state instance identifier)

callback?

(snapshot) => void

Optional callback invoked with the loaded snapshot

Returns

Promise<Snapshot<TStateMap[TKey], TEvents>>

The current state snapshot for the stream

Examples
// Load by State definition
const snapshot = await app.load(Counter, "counter-1");
console.log(snapshot.state.count); // Current count
console.log(snapshot.patches); // Events since last snapshot

// Load by state name (merged state)
const snapshot = await app.load("Ticket", "ticket-123");
console.log(snapshot.state.title); // Merged state from all partials

// Load multiple streams in parallel
const [user, account] = await Promise.all([
  app.load(User, "user-123"),
  app.load(BankAccount, "account-456")
]);
See

Snapshot for snapshot structure


off()

Call Signature

off(event, listener): this

Defined in: libs/act/src/act.ts:111

Remove a listener for a lifecycle event.

Parameters
event

"committed"

The event name

listener

(args) => void

The callback function

Returns

this

this (for chaining)

Call Signature

off(event, listener): this

Defined in: libs/act/src/act.ts:115

Remove a listener for a lifecycle event.

Parameters
event

"acked"

The event name

listener

(args) => void

The callback function

Returns

this

this (for chaining)

Call Signature

off(event, listener): this

Defined in: libs/act/src/act.ts:116

Remove a listener for a lifecycle event.

Parameters
event

"blocked"

The event name

listener

(args) => void

The callback function

Returns

this

this (for chaining)

Call Signature

off(event, listener): this

Defined in: libs/act/src/act.ts:120

Remove a listener for a lifecycle event.

Parameters
event

"settled"

The event name

listener

(args) => void

The callback function

Returns

this

this (for chaining)


on()

Call Signature

on(event, listener): this

Defined in: libs/act/src/act.ts:89

Register a listener for a lifecycle event ("committed", "acked", "blocked", or "settled").

Parameters
event

"committed"

The event name

listener

(args) => void

The callback function

Returns

this

this (for chaining)

Call Signature

on(event, listener): this

Defined in: libs/act/src/act.ts:93

Register a listener for a lifecycle event ("committed", "acked", "blocked", or "settled").

Parameters
event

"acked"

The event name

listener

(args) => void

The callback function

Returns

this

this (for chaining)

Call Signature

on(event, listener): this

Defined in: libs/act/src/act.ts:94

Register a listener for a lifecycle event ("committed", "acked", "blocked", or "settled").

Parameters
event

"blocked"

The event name

listener

(args) => void

The callback function

Returns

this

this (for chaining)

Call Signature

on(event, listener): this

Defined in: libs/act/src/act.ts:98

Register a listener for a lifecycle event ("committed", "acked", "blocked", or "settled").

Parameters
event

"settled"

The event name

listener

(args) => void

The callback function

Returns

this

this (for chaining)


query()

query(query, callback?): Promise<{ count: number; first?: Committed<TEvents, keyof TEvents>; last?: Committed<TEvents, keyof TEvents>; }>

Defined in: libs/act/src/act.ts:368

Queries the event store for events matching a filter.

Use this for analyzing event streams, generating reports, or debugging. The callback is invoked for each matching event, and the method returns summary information (first event, last event, total count).

For small result sets, consider using query_array instead.
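The difference between the streaming form and the array form can be sketched over an in-memory log. `LoggedEvent`, `QueryFilter`, `querySketch`, and `queryArraySketch` are hypothetical names and the filter supports only a few assumed fields; the sketch is synchronous, unlike the real methods.

```typescript
// Hypothetical toy types; Act's Query supports more filters than shown here.
type LoggedEvent = { id: number; stream: string; name: string };
type QueryFilter = { stream?: string; name?: string; limit?: number };

function querySketch(
  events: LoggedEvent[],
  filter: QueryFilter,
  callback?: (event: LoggedEvent) => void
): { first?: LoggedEvent; last?: LoggedEvent; count: number } {
  let first: LoggedEvent | undefined;
  let last: LoggedEvent | undefined;
  let count = 0;
  for (const event of events) {
    if (filter.limit !== undefined && count >= filter.limit) break;
    if (filter.stream !== undefined && event.stream !== filter.stream) continue;
    if (filter.name !== undefined && event.name !== filter.name) continue;
    first = first ?? event;
    last = event;
    count++;
    callback?.(event); // process incrementally; nothing is accumulated here
  }
  return { first, last, count };
}

// Array form: same scan, but every match is held in memory
function queryArraySketch(events: LoggedEvent[], filter: QueryFilter): LoggedEvent[] {
  const matches: LoggedEvent[] = [];
  querySketch(events, filter, (event) => matches.push(event));
  return matches;
}

const eventLog: LoggedEvent[] = [
  { id: 1, stream: "counter-1", name: "Incremented" },
  { id: 2, stream: "counter-2", name: "Incremented" },
  { id: 3, stream: "counter-1", name: "Decremented" },
  { id: 4, stream: "counter-1", name: "Incremented" },
];
const names: string[] = [];
const summary = querySketch(eventLog, { stream: "counter-1" }, (e) => names.push(e.name));
const firstTwo = queryArraySketch(eventLog, { name: "Incremented", limit: 2 });
```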

Parameters

query

Query

The query filter

callback?

(event) => void

Optional callback invoked for each matching event

Returns

Promise<{ count: number; first?: Committed<TEvents, keyof TEvents>; last?: Committed<TEvents, keyof TEvents>; }>

Object with first event, last event, and total count

Examples

// Query all events for a stream
const { first, last, count } = await app.query(
  { stream: "counter-1" },
  (event) => console.log(event.name, event.data)
);
console.log(`Found ${count} events from ${first?.id} to ${last?.id}`);

// Query specific event types
const { count } = await app.query(
  { name: "UserCreated", limit: 100 },
  (event) => {
    console.log("User created:", event.data.email);
  }
);

// Query events from the last 24 hours
const yesterday = new Date(Date.now() - 24 * 60 * 60 * 1000);
const { count } = await app.query({
  created_after: yesterday,
  stream: "user-123"
});
console.log(`User had ${count} events in last 24 hours`);

See

query_array for loading events into memory


query_array()

query_array(query): Promise<Committed<TEvents, keyof TEvents>[]>

Defined in: libs/act/src/act.ts:412

Queries the event store and returns all matching events in memory.

Use with caution - this loads all results into memory. For large result sets, use query with a callback instead to process events incrementally.

Parameters

query

Query

The query filter (same as query)

Returns

Promise<Committed<TEvents, keyof TEvents>[]>

Array of all matching events

Examples

// Load all events for a stream
const events = await app.query_array({ stream: "counter-1" });
console.log(`Loaded ${events.length} events`);
events.forEach(event => console.log(event.name, event.data));

// Limit the number of results
const recent = await app.query_array({
  stream: "user-123",
  limit: 10
});

See

query for large result sets


settle()

settle(options?): void

Defined in: libs/act/src/act.ts:862

Debounced, non-blocking correlate→drain cycle.

Call this after app.do() to schedule a background drain. Multiple rapid calls within the debounce window are coalesced into a single cycle. Runs correlate→drain in a loop until the system reaches a consistent state, then emits the "settled" lifecycle event.
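The coalescing behavior can be illustrated with a small debounce sketch. `SettleSketch`, its `runCycle` callback (returning true while work remains), the `maxPasses` cap, and the manual `flush()` are all assumptions made for this illustration; Act's actual `SettleOptions` and loop conditions are not shown.

```typescript
// Hypothetical sketch of a debounced "repeat until no progress" loop.
class SettleSketch {
  private timer: ReturnType<typeof setTimeout> | undefined = undefined;
  private readonly runCycle: () => boolean;
  private readonly onSettled: (passes: number) => void;
  private readonly debounceMs: number;
  private readonly maxPasses: number;

  constructor(
    runCycle: () => boolean,
    onSettled: (passes: number) => void,
    debounceMs = 10,
    maxPasses = 100
  ) {
    this.runCycle = runCycle;
    this.onSettled = onSettled;
    this.debounceMs = debounceMs;
    this.maxPasses = maxPasses;
  }

  // Non-blocking: restarts the debounce window, so rapid calls coalesce
  settle(): void {
    if (this.timer !== undefined) clearTimeout(this.timer);
    this.timer = setTimeout(() => this.flush(), this.debounceMs);
  }

  // Runs cycles until one reports no progress, then signals "settled"
  flush(): void {
    this.stop();
    let passes = 0;
    while (passes < this.maxPasses && this.runCycle()) passes++;
    this.onSettled(passes);
  }

  // Cancels a pending cycle
  stop(): void {
    if (this.timer !== undefined) {
      clearTimeout(this.timer);
      this.timer = undefined;
    }
  }
}

let pendingWork = 3;
const settledWith: number[] = [];
const settler = new SettleSketch(
  () => {
    if (pendingWork === 0) return false;
    pendingWork -= 1;
    return true;
  },
  (passes) => {
    settledWith.push(passes);
  }
);

settler.settle();
settler.settle();
settler.settle(); // three rapid calls, one scheduled cycle
settler.flush(); // stand-in for the debounce timer firing
```

Calling `flush()` directly keeps the example deterministic; in normal use the timer would trigger it after the debounce window.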

Parameters

options?

SettleOptions = {}

Settle configuration options

Returns

void

Example

await app.do("CreateItem", target, input);
app.settle(); // non-blocking, returns immediately

app.on("settled", (drain) => {
// notify SSE clients, invalidate caches, etc.
});

See

  • drain for a single awaited drain cycle
  • correlate for manual correlation

start_correlations()

start_correlations(query?, frequency?, callback?): boolean

Defined in: libs/act/src/act.ts:775

Starts an automatic periodic correlation worker for discovering new streams.

The correlation worker runs in the background, scanning for new events and identifying new target streams based on reaction resolvers. It maintains a sliding window that advances with each scan, ensuring all events are eventually correlated.

This is useful for dynamic stream creation patterns where you don't know all streams upfront - they're discovered as events arrive.

Note: Only one correlation worker can run at a time per Act instance.
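The single-worker rule and the advancing scan window can be sketched as below. `CorrelationWorkerSketch`, its `scan` callback, and the public `tick()` method are hypothetical and exist only to make the behavior observable without waiting for the interval; this is not Act's worker.

```typescript
type ScanResult = { leased: string[]; last_id: number };

// Hypothetical sketch: one interval per instance, window advances per scan.
class CorrelationWorkerSketch {
  private timer: ReturnType<typeof setInterval> | undefined = undefined;
  private after: number;
  private readonly scan: (after: number) => ScanResult;

  constructor(scan: (after: number) => ScanResult, after = 0) {
    this.scan = scan;
    this.after = after;
  }

  start(frequencyMs = 10_000, callback?: (leased: string[]) => void): boolean {
    if (this.timer !== undefined) return false; // already running
    this.timer = setInterval(() => this.tick(callback), frequencyMs);
    return true;
  }

  // One scan; the next scan resumes after the last scanned event id
  tick(callback?: (leased: string[]) => void): void {
    const { leased, last_id } = this.scan(this.after);
    this.after = last_id;
    if (leased.length > 0 && callback) callback(leased);
  }

  stop(): void {
    if (this.timer !== undefined) {
      clearInterval(this.timer);
      this.timer = undefined;
    }
  }
}

const scannedFrom: number[] = [];
const worker = new CorrelationWorkerSketch((after) => {
  scannedFrom.push(after);
  return { leased: [`stats-${after}`], last_id: after + 100 };
});

const started = worker.start(5000);
const startedAgain = worker.start(5000); // rejected: a worker is already running
worker.tick();
worker.tick();
worker.stop();
const restarted = worker.start(5000); // allowed again after stop()
worker.stop();
```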

Parameters

query?

Query = {}

Query filter for correlation scans

frequency?

number = 10_000

Correlation frequency in milliseconds (default: 10000)

callback?

(leased) => void

Optional callback invoked with newly discovered streams

Returns

boolean

true if worker started, false if already running

Examples

// Start correlation worker scanning every 5 seconds
app.start_correlations(
  { after: 0, limit: 100 },
  5000,
  (leased) => {
    console.log(`Discovered ${leased.length} new streams`);
  }
);

// Later, stop it
app.stop_correlations();

// Load last checkpoint
const lastId = await loadCheckpoint();

app.start_correlations(
  { after: lastId, limit: 100 },
  10000,
  async (leased) => {
    // Save checkpoint for next restart
    if (leased.length) {
      const maxId = Math.max(...leased.map(l => l.at));
      await saveCheckpoint(maxId);
    }
  }
);

stop_correlations()

stop_correlations(): void

Defined in: libs/act/src/act.ts:814

Stops the automatic correlation worker.

Call this to stop the background correlation worker started by start_correlations. This is automatically called when the Act instance is disposed.

Returns

void

Example

// Start correlation
app.start_correlations();

// Later, stop it
app.stop_correlations();

See

start_correlations


stop_settling()

stop_settling(): void

Defined in: libs/act/src/act.ts:826

Cancels any pending or active settle cycle.

Returns

void

See

settle