Act
Class: Act<TSchemaReg, TEvents, TActions, TStateMap, TActor>
Defined in: libs/act/src/act.ts:115
Public interface for the Act orchestrator, passed to reaction handlers.
Provides typed access to action dispatch, state loading, and event querying. Construct with InferActions and InferEvents to avoid circular imports between slice files and the bootstrap module.
Example
import type { IAct, InferActions, InferEvents } from "@rotorsoft/act";
type App = IAct<
InferEvents<typeof StateA> & InferEvents<typeof StateB>,
InferActions<typeof StateA> & InferActions<typeof StateB>
>;
async function myReaction(event: ..., stream: string, app: App) {
await app.do("someAction", target, payload, event);
const snapshot = await app.load(MyState, "stream-1");
const events = await app.query_array({ stream: "stream-1" });
}
Type Parameters
TSchemaReg
TSchemaReg extends SchemaRegister<TActions>
TEvents
TEvents extends Schemas
Event schemas
TActions
TActions extends Schemas
Action schemas (maps action names to payload types)
TStateMap
TStateMap extends Record<string, Schema> = Record<string, never>
TActor
Actor type extending base Actor
Implements
IAct<TEvents, TActions, TActor>
Constructors
Constructor
new Act<TSchemaReg, TEvents, TActions, TStateMap, TActor>(registry, _states?, batchHandlers?, options?): Act<TSchemaReg, TEvents, TActions, TStateMap, TActor>
Defined in: libs/act/src/act.ts:207
Create a new Act orchestrator. Prefer the act builder over
direct construction — act()...build() wires the registry, merges
partial states, and collects batch handlers from registered slices
and projections in one pass.
Parameters
registry
Registry<TSchemaReg, TEvents, TActions>
Schemas for every event and action across registered states
_states?
Map<string, State<any, any, any>> = ...
Merged map of state name → state definition
batchHandlers?
Map<string, BatchHandler<any>> = ...
Static-target projection batch handlers (target → handler)
options?
ActOptions = {}
Tuning knobs — see ActOptions
Returns
Act<TSchemaReg, TEvents, TActions, TStateMap, TActor>
Properties
registry
readonly registry: Registry<TSchemaReg, TEvents, TActions>
Defined in: libs/act/src/act.ts:208
Schemas for every event and action across registered states
Methods
close()
close(targets): Promise<CloseResult>
Defined in: libs/act/src/act.ts:814
Close the books — guard, archive, truncate, and optionally restart streams.
Safely removes historical events from the operational store:
- Correlate: discover pending reaction targets
- Safety check: skip streams with pending reactions (skipped when no reactive events)
- Guard: commit __tombstone__ with expectedVersion to block concurrent writes
- Load state: for streams in snapshots, load final state while guarded (no races)
- Archive: user callback per stream (abort-all on failure, streams are guarded)
- Truncate + seed: atomic, delete all events, insert __snapshot__ or __tombstone__
- Cache: invalidate (tombstoned) or warm (restarted)
- Emit "closed": lifecycle event with results
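The ordering of the safety check, archive, and truncate steps can be sketched with a toy in-memory model. This is not the library's implementation: `toyClose`, `CloseTarget`, `ToyResult`, the `store` map, and the `pending` set are hypothetical stand-ins, and the guard, cache, and lifecycle steps are left out.

```typescript
// Hypothetical target shape, mirroring the documented fields (stream, restart?, archive?).
type CloseTarget = {
  stream: string;
  restart?: boolean;
  archive?: () => Promise<void>;
};

type ToyResult = {
  truncated: Map<string, "snapshot" | "tombstone">;
  skipped: string[];
};

// Toy model: `store` maps stream -> event names, `pending` holds streams
// that still have unprocessed reactions.
async function toyClose(
  store: Map<string, string[]>,
  pending: Set<string>,
  targets: CloseTarget[]
): Promise<ToyResult> {
  // Safety check: streams with pending reactions are skipped, not closed.
  const skipped = targets.filter((t) => pending.has(t.stream)).map((t) => t.stream);
  const closable = targets.filter((t) => !pending.has(t.stream));

  // Archive: run every callback before touching the store, so a single
  // rejection aborts the whole operation with nothing truncated.
  for (const t of closable) {
    if (t.archive) await t.archive();
  }

  // Truncate + seed: replace each stream's history with one marker event.
  const truncated = new Map<string, "snapshot" | "tombstone">();
  for (const t of closable) {
    const seed = t.restart ? "snapshot" : "tombstone";
    store.set(t.stream, [`__${seed}__`]);
    truncated.set(t.stream, seed);
  }
  return { truncated, skipped };
}
```

Running all archive callbacks before any truncation is what makes the abort-all behavior possible in this model: a failed archive leaves every stream's history intact.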
Parameters
targets
Per-stream close options (stream, restart?, archive?)
Returns
Promise<CloseResult>
{ truncated: TruncateResult, skipped: string[] }
Examples
await app.close([
{ stream: "order-123", archive: async () => { await archiveToS3("order-123"); } },
{ stream: "order-456" },
]);
await app.close([
{ stream: "counter-1", restart: true },
{ stream: "counter-2" }, // tombstoned
]);
correlate()
correlate(query?): Promise<{ last_id: number; subscribed: number; }>
Defined in: libs/act/src/act.ts:644
Discovers and registers new streams dynamically based on reaction resolvers.
Correlation enables "dynamic reactions" where target streams are determined at runtime based on event content. For example, you might create a stats stream for each user when they perform certain actions.
This method scans events matching the query and identifies new target streams based on reaction resolvers. It then registers these streams so they'll be picked up by the next drain cycle.
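A paged scan built on this method might look like the sketch below. `Correlator` is a hypothetical structural type that only covers the documented `correlate()` signature, and `correlateAll` is an illustrative helper, not part of the library. It assumes `last_id` stops advancing once no further events are scanned, which should be verified against the actual store behavior.

```typescript
// Hypothetical structural type covering only the documented correlate() signature.
type Correlator = {
  correlate(query?: {
    after?: number;
    limit?: number;
  }): Promise<{ last_id: number; subscribed: number }>;
};

// Scans page by page until the cursor stops moving, summing new subscriptions.
// Assumption: last_id equals the previous cursor when nothing new was scanned.
async function correlateAll(
  app: Correlator,
  after = -1,
  limit = 100
): Promise<{ last_id: number; subscribed: number }> {
  let cursor = after;
  let total = 0;
  for (;;) {
    const { last_id, subscribed } = await app.correlate({ after: cursor, limit });
    total += subscribed;
    if (last_id <= cursor) break; // nothing new was scanned
    cursor = last_id;
  }
  return { last_id: cursor, subscribed: total };
}
```

The returned `last_id` can be persisted as a checkpoint so that the next scan resumes where this one stopped.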
Parameters
query?
Query = ...
Query filter to scan for new correlations
Returns
Promise<{ last_id: number; subscribed: number; }>
Object with the number of newly subscribed streams and the last scanned event ID
Examples
// Scan for new streams
const { subscribed, last_id } = await app.correlate({ after: 0, limit: 100 });
console.log(`Found ${subscribed} new streams`);
// Save last_id for next scan
await saveCheckpoint(last_id);
const app = act()
.withState(User)
.withState(UserStats)
.on("UserLoggedIn")
.do(async (event) => ["incrementLoginCount", {}])
.to((event) => ({
target: `stats-${event.stream}` // Dynamic target per user
}))
.build();
// Discover stats streams as users log in
await app.correlate();
See
- start_correlations for automatic periodic correlation
- stop_correlations to stop automatic correlation
do()
do<TKey>(action, target, payload, reactingTo?, skipValidation?): Promise<Snapshot<TSchemaReg[TKey], TEvents>[]>
Defined in: libs/act/src/act.ts:353
Executes an action on a state instance, committing resulting events.
This is the primary method for modifying state. It:
- Validates the action payload against the schema
- Loads the current state snapshot
- Checks invariants (business rules)
- Executes the action handler to generate events
- Applies events to create new state
- Commits events to the store with optimistic concurrency control
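The six steps above can be illustrated with a small in-memory counter. This is a toy model under stated assumptions, not the library's code: `toyStreams`, `toyLoad`, `toyCommit`, `toyIncrement`, and `ToyConcurrencyError` are all hypothetical names, and the "count must stay at or below 10" rule is an invented invariant.

```typescript
type CounterEvent = { name: "Incremented"; data: { by: number } };
type CounterState = { count: number };

class ToyConcurrencyError extends Error {}

// Hypothetical in-memory event store: stream id -> committed events.
const toyStreams = new Map<string, CounterEvent[]>();

// Rebuilds state by replaying events; version is the index of the last event (-1 if none).
function toyLoad(stream: string): { state: CounterState; version: number } {
  const events = toyStreams.get(stream) ?? [];
  const state = events.reduce<CounterState>(
    (s, e) => ({ count: s.count + e.data.by }),
    { count: 0 }
  );
  return { state, version: events.length - 1 };
}

// Optimistic concurrency: append only if nobody committed since we loaded.
function toyCommit(stream: string, events: CounterEvent[], expectedVersion: number): void {
  const current = toyStreams.get(stream) ?? [];
  if (current.length - 1 !== expectedVersion) throw new ToyConcurrencyError(stream);
  toyStreams.set(stream, [...current, ...events]);
}

function toyIncrement(stream: string, payload: { by: number }): CounterState {
  // 1. Validate the payload.
  if (!Number.isInteger(payload.by)) throw new Error("invalid payload");
  // 2. Load the current snapshot.
  const { state, version } = toyLoad(stream);
  // 3. Check invariants (invented rule for illustration).
  if (state.count + payload.by > 10) throw new Error("invariant: count must stay <= 10");
  // 4. Run the handler to produce events.
  const events: CounterEvent[] = [{ name: "Incremented", data: { by: payload.by } }];
  // 5. Apply the events to derive the new state.
  const next = events.reduce<CounterState>(
    (s, e) => ({ count: s.count + e.data.by }),
    state
  );
  // 6. Commit with the version observed at load time.
  toyCommit(stream, events, version);
  return next;
}
```

Because the commit carries the version seen at load time, a writer holding a stale version is rejected instead of silently overwriting a concurrent change.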
Type Parameters
TKey
TKey extends string | number | symbol
Action name from registered actions
Parameters
action
TKey
The name of the action to execute
target
Target<TActor>
Target specification with stream ID and actor context
payload
Readonly<TActions[TKey]>
Action payload matching the action's schema
reactingTo?
Committed<TEvents, string & keyof TEvents>
Optional event that triggered this action (for correlation)
skipValidation?
boolean = false
Skip schema validation (use carefully, for performance)
Returns
Promise<Snapshot<TSchemaReg[TKey], TEvents>[]>
Array of snapshots for all affected states (usually one)
Throws
ValidationError if the payload doesn't match the action schema
Throws
InvariantError if business rules are violated
Throws
ConcurrencyError if another process modified the stream
Examples
const snapshots = await app.do(
"increment",
{
stream: "counter-1",
actor: { id: "user1", name: "Alice" }
},
{ by: 5 }
);
console.log(snapshots[0].state.count); // Current count after increment
try {
await app.do(
"withdraw",
{ stream: "account-123", actor: { id: "user1", name: "Alice" } },
{ amount: 1000 }
);
} catch (error) {
if (error instanceof InvariantError) {
console.error("Business rule violated:", error.description);
} else if (error instanceof ConcurrencyError) {
console.error("Concurrent modification detected, retry...");
} else if (error instanceof ValidationError) {
console.error("Invalid payload:", error.details);
}
}
const app = act()
.withState(Order)
.withState(Inventory)
.on("OrderPlaced")
.do(async function reduceInventory(event, _stream, app) {
// Inside reaction handlers, reactingTo is auto-injected when omitted.
// The triggering event is used by default, maintaining the correlation chain.
await app.do(
"reduceStock",
{ stream: "inventory-1", actor: { id: "sys", name: "system" } },
{ amount: event.data.items.length }
);
// To use a different correlation, pass reactingTo explicitly:
// await app.do("reduceStock", target, payload, customEvent);
})
.to("inventory-1")
.build();
See
- Target for target structure
- Snapshot for return value structure
- ValidationError, InvariantError, ConcurrencyError
Implementation of
IAct.do
drain()
drain(options?): Promise<Drain<TEvents>>
Defined in: libs/act/src/act.ts:595
Processes pending reactions by draining uncommitted events from the event store.
Runs a single drain cycle:
- Polls the store for streams with uncommitted events
- Leases streams to prevent concurrent processing
- Fetches events for each leased stream
- Executes matching reaction handlers
- Acknowledges successful reactions or blocks failing ones
Drain uses a dual-frontier strategy to balance processing of new streams (lagging) vs active streams (leading). The ratio adapts based on event pressure.
Call correlate() before drain() to discover target streams. For a higher-level
API that handles debouncing, correlation, and signaling automatically, use settle.
Parameters
options?
DrainOptions = {}
Drain configuration — see DrainOptions for fields
(streamLimit, eventLimit, leaseMillis).
Returns
Promise<Drain<TEvents>>
Drain statistics with fetched, leased, acked, and blocked counts
Examples
await app.do("createUser", target, payload);
await app.correlate();
await app.drain();
await app.do("CreateItem", target, input);
app.settle(); // debounced correlate→drain, emits "settled"
See
- settle for debounced correlate→drain with lifecycle events
- correlate for dynamic stream discovery
- start_correlations for automatic correlation
emit()
emit<E>(event, args): boolean
Defined in: libs/act/src/act.ts:137
Emit a lifecycle event. The payload type is inferred from the event name via ActLifecycleEvents.
Type Parameters
E
E extends keyof ActLifecycleEvents<TSchemaReg, TEvents, TActions>
Parameters
event
E
args
ActLifecycleEvents<TSchemaReg, TEvents, TActions>[E]
Returns
boolean
load()
Call Signature
load<TNewState, TNewEvents, TNewActions>(state, stream, callback?, asOf?): Promise<Snapshot<TNewState, TNewEvents>>
Defined in: libs/act/src/act.ts:427
Loads the current state snapshot for a specific stream.
Reconstructs the current state by replaying events from the event store. Uses snapshots when available to optimize loading performance.
Accepts either a State definition object or a state name string. When
using a string, the merged state (from partial states registered via
.withState()) is resolved by name.
Type Parameters
TNewState
TNewState extends Schema
State schema type
TNewEvents
TNewEvents extends Schemas
Event schemas type
TNewActions
TNewActions extends Schemas
Action schemas type
Parameters
state
State<TNewState, TNewEvents, TNewActions>
The state definition or state name to load
stream
string
The stream ID (state instance identifier)
callback?
(snapshot) => void
Optional callback invoked with the loaded snapshot
asOf?
Returns
Promise<Snapshot<TNewState, TNewEvents>>
The current state snapshot for the stream
Examples
const snapshot = await app.load(Counter, "counter-1");
console.log(snapshot.state.count); // Current count
console.log(snapshot.patches); // Events since last snapshot
const snapshot = await app.load("Ticket", "ticket-123");
console.log(snapshot.state.title); // Merged state from all partials
const [user, account] = await Promise.all([
app.load(User, "user-123"),
app.load(BankAccount, "account-456")
]);
See
Snapshot for snapshot structure
Implementation of
IAct.load
Call Signature
load<TKey>(name, stream, callback?, asOf?): Promise<Snapshot<TStateMap[TKey], TEvents>>
Defined in: libs/act/src/act.ts:437
Loads the current state snapshot for a specific stream.
Reconstructs the current state by replaying events from the event store. Uses snapshots when available to optimize loading performance.
Accepts either a State definition object or a state name string. When
using a string, the merged state (from partial states registered via
.withState()) is resolved by name.
Type Parameters
TKey
TKey extends string
Parameters
name
TKey
stream
string
The stream ID (state instance identifier)
callback?
(snapshot) => void
Optional callback invoked with the loaded snapshot
asOf?
Returns
Promise<Snapshot<TStateMap[TKey], TEvents>>
The current state snapshot for the stream
Implementation of
IAct.load
off()
off<E>(event, listener): this
Defined in: libs/act/src/act.ts:161
Remove a previously registered lifecycle listener.
Type Parameters
E
E extends keyof ActLifecycleEvents<TSchemaReg, TEvents, TActions>
Parameters
event
E
listener
(args) => void
Returns
this
on()
on<E>(event, listener): this
Defined in: libs/act/src/act.ts:148
Register a listener for a lifecycle event. The listener receives the event-specific payload.
Type Parameters
E
E extends keyof ActLifecycleEvents<TSchemaReg, TEvents, TActions>
Parameters
event
E
listener
(args) => void
Returns
this
query()
query(query, callback?): Promise<{ count: number; first?: Committed<TEvents, keyof TEvents>; last?: Committed<TEvents, keyof TEvents>; }>
Defined in: libs/act/src/act.ts:506
Queries the event store for events matching a filter.
Use this for analyzing event streams, generating reports, or debugging. The callback is invoked for each matching event, and the method returns summary information (first event, last event, total count).
For small result sets, consider using query_array instead.
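Because the callback sees one event at a time, aggregates can be built without holding the full result set in memory. The sketch below assumes a reduced, hypothetical view of the API: `EventLike` and `Queryable` only model the fields used here, and `countByName` is an illustrative helper, not a library function.

```typescript
// Hypothetical, reduced shapes: only the fields this sketch touches.
type EventLike = { id: number; name: string; stream: string };
type Queryable = {
  query(
    query: { stream?: string; limit?: number },
    callback?: (event: EventLike) => void
  ): Promise<{ count: number; first?: EventLike; last?: EventLike }>;
};

// Tallies events per name while streaming through the callback.
async function countByName(app: Queryable, stream: string): Promise<Map<string, number>> {
  const counts = new Map<string, number>();
  await app.query({ stream }, (event) => {
    counts.set(event.name, (counts.get(event.name) ?? 0) + 1);
  });
  return counts;
}
```

Memory use in this pattern grows with the number of distinct event names rather than with the number of events.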
Parameters
query
Filter criteria — see Query for available fields
(stream, name, after, before, created_after, created_before,
limit, with_snaps, stream_exact)
callback?
(event) => void
Optional callback invoked for each matching event
Returns
Promise<{ count: number; first?: Committed<TEvents, keyof TEvents>; last?: Committed<TEvents, keyof TEvents>; }>
Object with first event, last event, and total count
Examples
const { first, last, count } = await app.query(
{ stream: "counter-1" },
(event) => console.log(event.name, event.data)
);
console.log(`Found ${count} events from ${first?.id} to ${last?.id}`);
const { count } = await app.query(
{ name: "UserCreated", limit: 100 },
(event) => {
console.log("User created:", event.data.email);
}
);
const yesterday = new Date(Date.now() - 24 * 60 * 60 * 1000);
const { count } = await app.query({
created_after: yesterday,
stream: "user-123"
});
console.log(`User had ${count} events in last 24 hours`);
See
query_array for loading events into memory
Implementation of
IAct.query
query_array()
query_array(query): Promise<Committed<TEvents, keyof TEvents>[]>
Defined in: libs/act/src/act.ts:550
Queries the event store and returns all matching events in memory.
Use with caution - this loads all results into memory. For large result sets, use query with a callback instead to process events incrementally.
Parameters
query
The query filter (same as query)
Returns
Promise<Committed<TEvents, keyof TEvents>[]>
Array of all matching events
Examples
const events = await app.query_array({ stream: "counter-1" });
console.log(`Loaded ${events.length} events`);
events.forEach(event => console.log(event.name, event.data));
const recent = await app.query_array({
stream: "user-123",
limit: 10
});
See
query for large result sets
Implementation of
IAct.query_array
reset()
reset(streams): Promise<number>
Defined in: libs/act/src/act.ts:775
Reset reaction stream watermarks and request a drain on the next
drain() / settle() cycle.
Use this to replay events through projections (or other reaction targets)
after changing handler logic. Equivalent to calling store().reset(streams)
directly, but also raises the orchestrator's internal "needs drain" flag —
store().reset(...) alone leaves the flag untouched, so a settled app
would short-circuit and skip the replay.
Pair with app.settle() (or a single app.drain() for small streams).
settle() loops correlate→drain until no progress is made, so one call
fully catches up paginated streams without forcing callers to roll
their own loop.
Parameters
streams
string[]
Reaction target streams (e.g., projection names) to reset
Returns
Promise<number>
Count of streams that were actually reset
Examples
await app.reset(["my-projection"]);
app.settle({ eventLimit: 1000 }); // emits "settled" when fully replayed
await app.reset(["my-projection"]);
await app.drain({ eventLimit: 1000 }); // small streams: one pass is enough
See
- Store.reset for the underlying store primitive
- settle for the debounced full-catch-up loop
settle()
settle(options?): void
Defined in: libs/act/src/act.ts:863
Debounced, non-blocking correlate→drain cycle.
Call this after app.do() (or app.reset()) to schedule a background
drain. Multiple rapid calls within the debounce window are coalesced
into a single cycle. Runs correlate→drain in a loop until a pass makes
no progress — no new subscriptions, no acks, no blocks — then emits
the "settled" lifecycle event. This means a single settle() call
fully catches up paginated streams (e.g. after reset() on a long
projection) without forcing callers to loop.
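Since settle() returns void, code that needs to wait for the cycle to finish (tests, scripts) can wrap the "settled" event in a promise. The sketch below is one possible approach, not a library feature: `SettleLike` is a hypothetical structural type covering only `on`, `off`, and `settle`, and `settleAndWait` is an illustrative helper. It has no timeout, so it would wait forever if "settled" were never emitted.

```typescript
// Hypothetical structural type: just the three members this helper needs.
type SettleLike = {
  on(event: "settled", listener: (drain: unknown) => void): unknown;
  off(event: "settled", listener: (drain: unknown) => void): unknown;
  settle(options?: { debounceMs?: number }): void;
};

// Schedules a settle cycle and resolves with the payload of the next "settled" event.
function settleAndWait(
  app: SettleLike,
  options?: { debounceMs?: number }
): Promise<unknown> {
  return new Promise((resolve) => {
    const listener = (drain: unknown): void => {
      app.off("settled", listener); // one-shot: detach before resolving
      resolve(drain);
    };
    app.on("settled", listener); // subscribe first so the event cannot be missed
    app.settle(options);
  });
}
```

Subscribing before calling settle() avoids a race in which the event fires before the listener is attached.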
Parameters
options?
SettleOptions = {}
Settle configuration — see SettleOptions for fields:
debounceMs (default 10), correlate (default { after: -1, limit: 100 }),
maxPasses (default Infinity — kill-switch for runaway loops),
streamLimit (default 10), eventLimit (default 10),
leaseMillis (default 10000).
Returns
void
Example
await app.do("CreateItem", target, input);
app.settle(); // non-blocking, returns immediately
app.on("settled", (drain) => {
// notify SSE clients, invalidate caches, etc.
});
start_correlations()
start_correlations(query?, frequency?, callback?): boolean
Defined in: libs/act/src/act.ts:704
Starts automatic periodic correlation worker for discovering new streams.
The correlation worker runs in the background, scanning for new events and identifying new target streams based on reaction resolvers. It maintains a sliding window that advances with each scan, ensuring all events are eventually correlated.
This is useful for dynamic stream creation patterns where you don't know all streams upfront - they're discovered as events arrive.
Note: Only one correlation worker can run at a time per Act instance.
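The single-worker rule and the boolean return value can be modeled with a timer guard. This is a generic sketch of the pattern, not the library's internals: `ToyCorrelationWorker` and its members are hypothetical names.

```typescript
// Hypothetical guard: at most one interval timer is active at a time.
class ToyCorrelationWorker {
  private timer: ReturnType<typeof setInterval> | undefined;

  // Returns true if the worker was started, false if one is already running.
  start(task: () => void, frequency = 10_000): boolean {
    if (this.timer !== undefined) return false;
    this.timer = setInterval(task, frequency);
    return true;
  }

  // Safe to call repeatedly; clears the timer only when one is active.
  stop(): void {
    if (this.timer !== undefined) {
      clearInterval(this.timer);
      this.timer = undefined;
    }
  }
}
```

Returning false instead of throwing lets callers invoke start from several places without coordinating who goes first.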
Parameters
query?
Query = {}
Query filter for correlation scans — see Query
(typically { after: -1, limit: 100 })
frequency?
number = 10_000
Correlation frequency in milliseconds (default: 10000)
callback?
(subscribed) => void
Optional callback invoked with newly discovered streams
Returns
boolean
true if worker started, false if already running
Examples
// Start correlation worker scanning every 5 seconds
app.start_correlations(
{ after: 0, limit: 100 },
5000,
(leased) => {
console.log(`Discovered ${leased.length} new streams`);
}
);
// Later, stop it
app.stop_correlations();
// Load last checkpoint
const lastId = await loadCheckpoint();
app.start_correlations(
{ after: lastId, limit: 100 },
10000,
async (leased) => {
// Save checkpoint for next restart
if (leased.length) {
const maxId = Math.max(...leased.map(l => l.at));
await saveCheckpoint(maxId);
}
}
);
See
- correlate for manual one-time correlation
- stop_correlations to stop the worker
stop_correlations()
stop_correlations(): void
Defined in: libs/act/src/act.ts:729
Stops the automatic correlation worker.
Call this to stop the background correlation worker started by start_correlations. This is automatically called when the Act instance is disposed.
Returns
void
Example
// Start correlation
app.start_correlations();
// Later, stop it
app.stop_correlations();
stop_settling()
stop_settling(): void
Defined in: libs/act/src/act.ts:738
Cancels any pending or active settle cycle.
Returns
void