
Class: Act<TSchemaReg, TEvents, TActions, TStateMap, TActor>

Defined in: libs/act/src/act.ts:262

Type Parameters​

TSchemaReg​

TSchemaReg extends SchemaRegister<TActions>

TEvents​

TEvents extends Schemas

TActions​

TActions extends Schemas

TStateMap​

TStateMap extends Record<string, Schema> = Record<string, never>

TActor​

TActor extends Actor = Actor

Implements​

  • IAct<TEvents, TActions, TActor>

Constructors​

Constructor​

new Act<TSchemaReg, TEvents, TActions, TStateMap, TActor>(registry, states?, batch_handlers?, options?, lanes?): Act<TSchemaReg, TEvents, TActions, TStateMap, TActor>

Defined in: libs/act/src/act.ts:442

Create a new Act orchestrator. Prefer the act builder over direct construction: act()...build() wires the registry, merges partial states, and collects batch handlers from registered slices and projections in one pass.

Parameters​

registry​

Registry<TSchemaReg, TEvents, TActions, keyof TStateMap & string>

Schemas for every event and action across registered states

states?​

Map<string, State<any, any, any>> = ...

Merged map of state name → state definition

batch_handlers?​

Map<string, BatchHandler<any>> = ...

Static-target projection batch handlers (target → handler)

options?​

ActOptions = {}

Tuning knobs; see ActOptions

lanes?​

readonly LaneConfig[] = []

Declared drain lanes (ACT-1103). The builder collects these from .withLane(...) calls. Slice 1 records them on the instance; later slices fan out one DrainController per lane.

Returns​

Act<TSchemaReg, TEvents, TActions, TStateMap, TActor>

Properties​

registry​

readonly registry: Registry<TSchemaReg, TEvents, TActions, keyof TStateMap & string>

Defined in: libs/act/src/act.ts:303

Public registry, kept as-is per the no-prefix-on-public convention.

Accessors​

lanes​

Get Signature​

get lanes(): readonly LaneConfig[]

Defined in: libs/act/src/act.ts:424

Drain lanes declared via .withLane(...). The implicit default lane is not included.

Returns​

readonly LaneConfig[]

Methods​

audit()​

audit(categories?, options?): AsyncIterable<AuditFinding>

Defined in: libs/act/src/act.ts:1539

Operator-driven store audit (#723).

Walks the connected store and yields per-category findings, each tagged with the remediation it suggests. Same operator-driven category as app.close() / app.reset() / app.unblock() / app.blocked_streams(): never auto-invoked by the framework; the operator decides when to run it (CI gate, scheduled job, ad-hoc forensics) and what to do with the findings.

Categories are independent: pass a subset to scope the work, or omit to run everything:

// Targeted: schema drift + deprecated-event load only
for await (const f of app.audit(["schema", "deprecated-load"], {
  query: { created_after: lastScan },
  thresholds: { deprecatedLoadShareMin: 0.10 },
})) {
  await escalate(f);
}

// Full audit, default thresholds
for await (const f of app.audit()) console.log(f);

Returns an AsyncIterable so callers can break early: the underlying store paginations respect the iterator protocol and stop cleanly. Each finding is emitted independently, so pipelining into Slack / persistence / further analysis works without buffering the full report in memory.

Findings shape: see AuditFinding. The discriminated union carries enough context for the operator to act on each finding directly: stream id, event id, recommendation hints.

Parameters​

categories?​

AuditCategory[]

Subset of categories to run (default: all).

options?​

AuditOptions

Query window + per-category thresholds.

Returns​

AsyncIterable<AuditFinding>

Async iterable of AuditFinding.
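
Because audit() returns an AsyncIterable, a caller can stop consuming as soon as it has seen enough. The sketch below is a generic helper and not part of the library: takeFirst is a hypothetical name, and it accepts any async iterable, so the result of app.audit() can be passed to it directly.

```typescript
// Hypothetical helper (not part of @rotorsoft/act): collect at most `max`
// items from any AsyncIterable, then stop iterating.
async function takeFirst<T>(
  source: AsyncIterable<T>,
  max: number
): Promise<T[]> {
  const items: T[] = [];
  if (max <= 0) return items;
  for await (const item of source) {
    items.push(item);
    // Breaking out of `for await` calls the iterator's return(), which lets
    // the producer clean up instead of running to completion.
    if (items.length >= max) break;
  }
  return items;
}
```

A call such as `await takeFirst(app.audit(["schema"]), 10)` would then read at most ten findings and leave the rest of the store unscanned.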


blocked_streams()​

blocked_streams(options?): Promise<StreamPosition[]>

Defined in: libs/act/src/act.ts:1482

Return every currently-blocked stream position. Convenience wrapper around store().query_streams(cb, { blocked: true }) for the common "show me what's broken" operational query.

Results are ordered by stream name, paginated by limit (default 100). Pass after to fetch the next page (keyset cursor on the stream name). For richer queries (including blocked + source filters, or full unblocked introspection), drop to store().query_streams(...) directly.

Parameters​

options?​
after?​

string

limit?​

number

Returns​

Promise<StreamPosition[]>

Array of StreamPosition for currently-blocked streams.

Example​

Discover and recover

const blocked = await app.blocked_streams();
console.table(blocked.map(({ stream, retry, error }) => ({ stream, retry, error })));

// Operator investigates, then bulk-unblocks the family:
await app.unblock({ stream: "^webhooks-out-" });
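
When more streams are blocked than fit in one page, the after cursor can be used to walk every page. The following is a minimal sketch under two assumptions: the cursor is exclusive (the next page starts after the given stream name), and a page shorter than limit means the end was reached. BlockedSource and allBlockedStreams are hypothetical names; the interface only mirrors the blocked_streams(options?) signature documented above.

```typescript
// Structural stand-in for the part of the API used here (assumption: the
// real StreamPosition carries at least a `stream` name).
interface BlockedSource<T extends { stream: string }> {
  blocked_streams(options?: { after?: string; limit?: number }): Promise<T[]>;
}

// Hypothetical helper: fetch every blocked stream position, page by page.
async function allBlockedStreams<T extends { stream: string }>(
  app: BlockedSource<T>,
  pageSize = 100
): Promise<T[]> {
  const all: T[] = [];
  let after: string | undefined;
  for (;;) {
    const page = await app.blocked_streams({ after, limit: pageSize });
    all.push(...page);
    const last = page[page.length - 1];
    // A short (or empty) page means there is nothing left to fetch.
    if (page.length < pageSize || last === undefined) break;
    // Keyset cursor: continue after the last stream name seen.
    after = last.stream;
  }
  return all;
}
```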

close()​

close(targets): Promise<CloseResult>

Defined in: libs/act/src/act.ts:1621

Close the books: guard, archive, truncate, and optionally restart streams.

Safely removes historical events from the operational store:

  1. Correlate: discover pending reaction targets
  2. Safety check: skip streams with pending reactions (skipped when no reactive events)
  3. Guard: commit __tombstone__ with expectedVersion to block concurrent writes
  4. Load state: for streams in snapshots, load final state while guarded (no races)
  5. Archive: user callback per stream (abort-all on failure, streams are guarded)
  6. Truncate + seed: atomic; delete all events, insert __snapshot__ or __tombstone__
  7. Cache: invalidate (tombstoned) or warm (restarted)
  8. Emit "closed": lifecycle event with results

Parameters​

targets​

CloseTarget[]

Per-stream close options (stream, restart?, archive?)

Returns​

Promise<CloseResult>

{ truncated: TruncateResult, skipped: string[] }

Examples​

Archive and close

await app.close([
  { stream: "order-123", archive: async () => { await archiveToS3("order-123"); } },
  { stream: "order-456" },
]);

Close with restart (state loaded automatically after guard)

await app.close([
  { stream: "counter-1", restart: true },
  { stream: "counter-2" }, // tombstoned
]);

correlate()​

correlate(query?): Promise<{ last_id: number; subscribed: number; }>

Defined in: libs/act/src/act.ts:1197

Discovers and registers new streams dynamically based on reaction resolvers.

Correlation enables "dynamic reactions" where target streams are determined at runtime based on event content. For example, you might create a stats stream for each user when they perform certain actions.

This method scans events matching the query and identifies new target streams based on reaction resolvers. It then registers these streams so they'll be picked up by the next drain cycle.

Parameters​

query?​

Query = ...

Query filter to scan for new correlations

Returns​

Promise<{ last_id: number; subscribed: number; }>

Object with the number of newly subscribed streams (subscribed) and the last scanned event ID (last_id)

Examples​

Manual correlation

// Scan for new streams
const { subscribed, last_id } = await app.correlate({ after: 0, limit: 100 });
console.log(`Found ${subscribed} new streams`);

// Save last_id for next scan
await saveCheckpoint(last_id);

Dynamic stream creation

const app = act()
  .withState(User)
  .withState(UserStats)
  .on("UserLoggedIn")
  .do(async (event) => ["incrementLoginCount", {}])
  .to((event) => ({
    target: `stats-${event.stream}` // Dynamic target per user
  }))
  .build();

// Discover stats streams as users log in
await app.correlate();


do()​

do<TKey>(action, target, payload, options?): Promise<Snapshot<TSchemaReg[TKey], TEvents>[]>

Defined in: libs/act/src/act.ts:785

Executes an action on a state instance, committing resulting events.

This is the primary method for modifying state. It:

  1. Validates the action payload against the schema
  2. Loads the current state snapshot
  3. Checks invariants (business rules)
  4. Executes the action handler to generate events
  5. Applies events to create new state
  6. Commits events to the store with optimistic concurrency control
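
The six steps above can be illustrated with a toy, in-memory counter. This is only a sketch of the general pattern (validate, load, check, handle, apply, commit with an expected version); every name in it is hypothetical and it makes no claim about how the library implements do() internally.

```typescript
// Toy illustration only: all names are hypothetical, not @rotorsoft/act APIs.
type CounterEvent = { name: "Incremented"; data: { by: number } };
type CounterState = { count: number };

class ToyConcurrencyError extends Error {}

// One append-only event log per stream; its length acts as the version.
const streams = new Map<string, CounterEvent[]>();

function commit(stream: string, events: CounterEvent[], expectedVersion: number): void {
  const log = streams.get(stream) ?? [];
  // Optimistic concurrency: refuse the write if someone else appended first.
  if (log.length !== expectedVersion) throw new ToyConcurrencyError(stream);
  streams.set(stream, [...log, ...events]);
}

function replay(events: CounterEvent[]): CounterState {
  return events.reduce<CounterState>(
    (state, event) => ({ count: state.count + event.data.by }),
    { count: 0 }
  );
}

function increment(stream: string, payload: { by: number }): CounterState {
  // 1. Validate the payload.
  if (!Number.isInteger(payload.by)) throw new Error("by must be an integer");
  // 2. Load the current state by replaying committed events.
  const log = streams.get(stream) ?? [];
  const state = replay(log);
  // 3. Check invariants (business rules).
  if (state.count + payload.by < 0) throw new Error("count cannot go negative");
  // 4. Run the handler to produce events.
  const events: CounterEvent[] = [{ name: "Incremented", data: { by: payload.by } }];
  // 5. Apply the events to compute the new state.
  const next = replay([...log, ...events]);
  // 6. Commit, expecting the version that was loaded in step 2.
  commit(stream, events, log.length);
  return next;
}
```

The point of step 6 is that a writer holding a stale version fails loudly instead of silently overwriting another writer's events.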

Type Parameters​

TKey​

TKey extends string | number | symbol

Action name from registered actions

Parameters​

action​

TKey

The name of the action to execute

target​

Target<TActor>

Target specification with stream ID and actor context

payload​

Readonly<TActions[TKey]>

Action payload matching the action's schema

options?​

DoOptions<TEvents>

Per-call dispatch options (DoOptions): reactingTo to thread correlation, correlator to override the framework or orchestrator-level correlator for this call only.

Returns​

Promise<Snapshot<TSchemaReg[TKey], TEvents>[]>

Array of snapshots for all affected states (usually one)

Throws

ValidationError if the payload doesn't match the action schema

Throws

InvariantError if business rules are violated

Throws

ConcurrencyError if another process modified the stream

Examples​

Basic action execution

const snapshots = await app.do(
  "increment",
  {
    stream: "counter-1",
    actor: { id: "user1", name: "Alice" }
  },
  { by: 5 }
);

console.log(snapshots[0].state.count); // Current count after increment

With error handling

try {
  await app.do(
    "withdraw",
    { stream: "account-123", actor: { id: "user1", name: "Alice" } },
    { amount: 1000 }
  );
} catch (error) {
  if (error instanceof InvariantError) {
    console.error("Business rule violated:", error.description);
  } else if (error instanceof ConcurrencyError) {
    console.error("Concurrent modification detected, retry...");
  } else if (error instanceof ValidationError) {
    console.error("Invalid payload:", error.details);
  }
}
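
The "retry..." branch above can be turned into a small retry loop. The sketch below is a generic helper, not a library feature: withRetry and its parameters are hypothetical, and the caller decides which errors count as retryable (for example, `(e) => e instanceof ConcurrencyError`).

```typescript
// Hypothetical helper: re-run an async operation while `isRetryable`
// accepts the error, up to `maxAttempts` tries in total.
async function withRetry<T>(
  operation: () => Promise<T>,
  isRetryable: (error: unknown) => boolean,
  maxAttempts = 3
): Promise<T> {
  if (maxAttempts < 1) throw new RangeError("maxAttempts must be at least 1");
  let lastError: unknown;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await operation();
    } catch (error) {
      // Non-retryable errors (validation, invariants) surface immediately.
      if (!isRetryable(error)) throw error;
      lastError = error;
    }
  }
  // Every attempt failed with a retryable error: rethrow the last one.
  throw lastError;
}
```

Wrapping the call as `withRetry(() => app.do("withdraw", target, payload), isConcurrencyError)` means each attempt invokes do() again, so the state is reloaded before the action is retried.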

Reaction triggering another action (reactingTo auto-injected)

const app = act()
  .withState(Order)
  .withState(Inventory)
  .on("OrderPlaced")
  .do(async function reduceInventory(event, _stream, app) {
    // Inside reaction handlers, reactingTo is auto-injected when omitted.
    // The triggering event is used by default, maintaining the correlation chain.
    await app.do(
      "reduceStock",
      { stream: "inventory-1", actor: { id: "sys", name: "system" } },
      { amount: event.data.items.length }
    );
    // To use a different correlation, pass reactingTo explicitly:
    // await app.do("reduceStock", target, payload, { reactingTo: customEvent });
  })
  .to("inventory-1")
  .build();

Implementation of​

IAct.do


drain()​

drain(options?): Promise<Drain<TEvents>>

Defined in: libs/act/src/act.ts:1088

Processes pending reactions by draining uncommitted events from the event store.

Runs a single drain cycle:

  1. Polls the store for streams with uncommitted events
  2. Leases streams to prevent concurrent processing
  3. Fetches events for each leased stream
  4. Executes matching reaction handlers
  5. Acknowledges successful reactions or blocks failing ones

Drain uses a dual-frontier strategy to balance processing of new streams (lagging) vs active streams (leading). The ratio adapts based on event pressure.

Call correlate() before drain() to discover target streams. For a higher-level API that handles debouncing, correlation, and signaling automatically, use settle.

Parameters​

options?​

DrainOptions = {}

Drain configuration; see DrainOptions for fields (streamLimit, eventLimit, leaseMillis).

Returns​

Promise<Drain<TEvents>>

Drain statistics with fetched, leased, acked, and blocked counts

Examples​

In tests and scripts

await app.do("createUser", target, payload);
await app.correlate();
await app.drain();

In production, prefer settle()

await app.do("CreateItem", target, input);
app.settle(); // debounced correlate→drain, emits "settled"


emit()​

emit<E>(event, args): boolean

Defined in: libs/act/src/act.ts:332

Emit a lifecycle event. The payload type is inferred from the event name via ActLifecycleEvents.

Type Parameters​

E​

E extends keyof ActLifecycleEvents<TSchemaReg, TEvents, TActions>

Parameters​

event​

E

args​

ActLifecycleEvents<TSchemaReg, TEvents, TActions>[E]

Returns​

boolean


forget()​

forget(stream): Promise<{ eventCount: number; }>

Defined in: libs/act/src/act.ts:1033

Wipe the sensitive-data payload for every event on the stream; see IAct.forget. Application-level half of #566.

Throws on adapters without Store.forget_pii. On success it invalidates the cache entry for the stream and emits the forgotten lifecycle event with the row count. Idempotent: a second call returns {eventCount: 0} and does NOT re-emit.

Parameters​

stream​

string

Target stream.

Returns​

Promise<{ eventCount: number; }>

{eventCount}: number of events whose PII column was wiped.

Implementation of​

IAct.forget


load()​

Call Signature​

load<TNewState, TNewEvents, TNewActions>(state, stream, callback?, asOf?): Promise<Snapshot<TNewState, TNewEvents>>

Defined in: libs/act/src/act.ts:858

Loads the current state snapshot for a specific stream.

Reconstructs the current state by replaying events from the event store. Uses snapshots when available to optimize loading performance.

Accepts either a State definition object or a state name string. When using a string, the merged state (from partial states registered via .withState()) is resolved by name.

Type Parameters​
TNewState​

TNewState extends Schema

State schema type

TNewEvents​

TNewEvents extends Schemas

Event schemas type

TNewActions​

TNewActions extends Schemas

Action schemas type

Parameters​
state​

State<TNewState, TNewEvents, TNewActions>

The state definition or state name to load

stream​

string

The stream ID (state instance identifier)

callback?​

(snapshot) => void

Optional callback invoked with the loaded snapshot

asOf?​

AsOf

Returns​

Promise<Snapshot<TNewState, TNewEvents>>

The current state snapshot for the stream

Examples​

Load by state definition

const snapshot = await app.load(Counter, "counter-1");
console.log(snapshot.state.count); // Current count
console.log(snapshot.patches); // Events since last snapshot

Load by state name (useful with partial states)

const snapshot = await app.load("Ticket", "ticket-123");
console.log(snapshot.state.title); // Merged state from all partials

Load multiple states

const [user, account] = await Promise.all([
  app.load(User, "user-123"),
  app.load(BankAccount, "account-456")
]);
See​

Snapshot for snapshot structure

Implementation of​

IAct.load

Call Signature​

load<TKey>(name, stream, callback?, asOf?): Promise<Snapshot<TStateMap[TKey], TEvents>>

Defined in: libs/act/src/act.ts:868

Loads the current state snapshot for a specific stream.

Reconstructs the current state by replaying events from the event store. Uses snapshots when available to optimize loading performance.

Accepts either a State definition object or a state name string. When using a string, the merged state (from partial states registered via .withState()) is resolved by name.

Type Parameters​
TKey​

TKey extends string

Parameters​
name​

TKey

stream​

string

The stream ID (state instance identifier)

callback?​

(snapshot) => void

Optional callback invoked with the loaded snapshot

asOf?​

AsOf

Returns​

Promise<Snapshot<TStateMap[TKey], TEvents>>

The current state snapshot for the stream


Implementation of​

IAct.load

Call Signature​

load<TNewState, TNewEvents, TNewActions>(state, target, callback?): Promise<Snapshot<TNewState, TNewEvents>>

Defined in: libs/act/src/act.ts:875

Loads the current state snapshot for a specific stream.

Reconstructs the current state by replaying events from the event store. Uses snapshots when available to optimize loading performance.

Accepts either a State definition object or a state name string. When using a string, the merged state (from partial states registered via .withState()) is resolved by name.

Type Parameters​
TNewState​

TNewState extends Schema

State schema type

TNewEvents​

TNewEvents extends Schemas

Event schemas type

TNewActions​

TNewActions extends Schemas

Action schemas type

Parameters​
state​

State<TNewState, TNewEvents, TNewActions>

The state definition or state name to load

target​

LoadTarget<TActor>

callback?​

(snapshot) => void

Optional callback invoked with the loaded snapshot

Returns​

Promise<Snapshot<TNewState, TNewEvents>>

The current state snapshot for the stream


Implementation of​

IAct.load

Call Signature​

load<TKey>(name, target, callback?): Promise<Snapshot<TStateMap[TKey], TEvents>>

Defined in: libs/act/src/act.ts:884

Loads the current state snapshot for a specific stream.

Reconstructs the current state by replaying events from the event store. Uses snapshots when available to optimize loading performance.

Accepts either a State definition object or a state name string. When using a string, the merged state (from partial states registered via .withState()) is resolved by name.

Type Parameters​
TKey​

TKey extends string

Parameters​
name​

TKey

target​

LoadTarget<TActor>

callback?​

(snapshot) => void

Optional callback invoked with the loaded snapshot

Returns​

Promise<Snapshot<TStateMap[TKey], TEvents>>

The current state snapshot for the stream


Implementation of​

IAct.load


off()​

off<E>(event, listener): this

Defined in: libs/act/src/act.ts:356

Remove a previously registered lifecycle listener.

Type Parameters​

E​

E extends keyof ActLifecycleEvents<TSchemaReg, TEvents, TActions>

Parameters​

event​

E

listener​

(args) => void

Returns​

this


on()​

on<E>(event, listener): this

Defined in: libs/act/src/act.ts:343

Register a listener for a lifecycle event. The listener receives the event-specific payload.

Type Parameters​

E​

E extends keyof ActLifecycleEvents<TSchemaReg, TEvents, TActions>

Parameters​

event​

E

listener​

(args) => void

Returns​

this


prioritize()​

prioritize(filter, priority): Promise<number>

Defined in: libs/act/src/act.ts:1584

Bulk-update scheduling priority for streams matching filter.

Operator-grade override of the claim() lagging-frontier ordering (ACT-102). Useful when a long-running replay needs to jump ahead of other lagging streams, or when a no-longer-urgent job should yield slots back to the rest. Build-time priorities (set via the resolver's priority field) are subject to a max() invariant across reactions; this API ignores that and sets the priority outright on every matching row.

Filter shape mirrors query / Store.query_streams: stream / source are regex by default, exact with the *_exact flags; blocked restricts to blocked or unblocked rows. An empty filter ({}) updates every registered stream.

Parameters​

filter​

StreamFilter

Selection criteria (regex by default).

priority​

number

New priority value. Set as-is, with no clamp.

Returns​

Promise<number>

Count of streams whose priority changed.

Examples​

Boost a specific projection mid-replay

await app.prioritize({ stream: "^proj-orders$", stream_exact: false }, 10);

Drop all audit projections to background

await app.prioritize({ source: "^audit-" }, -5);

Reset everyone to default

await app.prioritize({}, 0);

See​

  • Store.prioritize for the underlying primitive
  • claim for how priority biases scheduling

query()​

query(query, callback?): Promise<{ count: number; first?: Committed<TEvents, keyof TEvents>; last?: Committed<TEvents, keyof TEvents>; }>

Defined in: libs/act/src/act.ts:965

Queries the event store for events matching a filter.

Use this for analyzing event streams, generating reports, or debugging. The callback is invoked for each matching event, and the method returns summary information (first event, last event, total count).

For small result sets, consider using query_array instead.

Parameters​

query​

Query

Filter criteria; see Query for available fields (stream, name, after, before, created_after, created_before, limit, with_snaps, stream_exact)

callback?​

(event) => void

Optional callback invoked for each matching event

Returns​

Promise<{ count: number; first?: Committed<TEvents, keyof TEvents>; last?: Committed<TEvents, keyof TEvents>; }>

Object with first event, last event, and total count

Examples​

Query all events for a stream

const { first, last, count } = await app.query(
  { stream: "counter-1" },
  (event) => console.log(event.name, event.data)
);
console.log(`Found ${count} events from ${first?.id} to ${last?.id}`);

Query specific event types

const { count } = await app.query(
  { name: "UserCreated", limit: 100 },
  (event) => {
    console.log("User created:", event.data.email);
  }
);

Query events in time range

const yesterday = new Date(Date.now() - 24 * 60 * 60 * 1000);
const { count } = await app.query({
  created_after: yesterday,
  stream: "user-123"
});
console.log(`User had ${count} events in last 24 hours`);
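
Since the callback sees one event at a time, simple reports can be built without holding the events in memory. The sketch below tallies events by name; EventQuerySource and countByName are hypothetical names, and the interface is a trimmed structural stand-in for the query(query, callback?) signature documented above.

```typescript
// Trimmed structural stand-in for app.query (assumption: events expose a
// string `name`, as in the examples above).
interface EventQuerySource {
  query(
    query: { stream?: string; name?: string; limit?: number },
    callback?: (event: { name: string }) => void
  ): Promise<{ count: number }>;
}

// Hypothetical helper: count matching events per event name, streaming
// through the callback instead of loading an array.
async function countByName(
  app: EventQuerySource,
  query: { stream?: string; name?: string; limit?: number }
): Promise<Map<string, number>> {
  const counts = new Map<string, number>();
  await app.query(query, (event) => {
    counts.set(event.name, (counts.get(event.name) ?? 0) + 1);
  });
  return counts;
}
```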

See​

query_array for loading events into memory

Implementation of​

IAct.query


query_array()​

query_array(query): Promise<Committed<TEvents, keyof TEvents>[]>

Defined in: libs/act/src/act.ts:1011

Queries the event store and returns all matching events in memory.

Use with caution - this loads all results into memory. For large result sets, use query with a callback instead to process events incrementally.

Parameters​

query​

Query

The query filter (same as query)

Returns​

Promise<Committed<TEvents, keyof TEvents>[]>

Array of all matching events

Examples​

Load all events for a stream

const events = await app.query_array({ stream: "counter-1" });
console.log(`Loaded ${events.length} events`);
events.forEach(event => console.log(event.name, event.data));

Get recent events

const recent = await app.query_array({
  stream: "user-123",
  limit: 10
});

See​

query for large result sets

Implementation of​

IAct.query_array


reset()​

reset(input): Promise<number>

Defined in: libs/act/src/act.ts:1338

Reset reaction stream watermarks and request a drain on the next drain() / settle() cycle.

Use this to replay events through projections (or other reaction targets) after changing handler logic. Equivalent to calling store().reset(streams) directly, but also raises the orchestrator's internal "needs drain" flag; store().reset(...) alone leaves the flag untouched, so a settled app would short-circuit and skip the replay.

Pair with app.settle() (or a single app.drain() for small streams). settle() loops correlate→drain until no progress is made, so one call fully catches up paginated streams without forcing callers to roll their own loop.

Parameters​

input​

string[] | StreamFilter

Reaction target streams (e.g., projection names) to reset, or a StreamFilter for bulk operations

Returns​

Promise<number>

Count of streams that were actually reset

Examples​

Rebuild a projection (production)

await app.reset(["my-projection"]);
app.settle({ eventLimit: 1000 }); // emits "settled" when fully replayed

Rebuild a projection (tests / scripts)

await app.reset(["my-projection"]);
await app.drain({ eventLimit: 1000 }); // small streams: one pass is enough

See​

  • Store.reset for the underlying store primitive
  • settle for the debounced full-catch-up loop

restore()​

restore(source, opts?, sink?): Promise<ScanResult>

Defined in: libs/act/src/act.ts:1423

Atomically wipe the store and rebuild it from an async stream of committed events. The framework owns iteration, validation, drop_snapshots filtering, on_progress, and the per-call old → new causation remap; the adapter's Store.restore driver supplies the transaction lifecycle and per-event insert.

Throws if the adapter has no restore capability. Throws on the first invalid event (negative version, malformed created) with the running index in the message; atomic transaction rollback in the adapter means a failing restore leaves the store byte-for-byte unchanged.

Parameters​

source​

EventSource

Async stream of events in target order. Streamed rather than buffered so multi-million-event backups don't OOM. Each event's original id is used as a causation lookup key but never written through; adapters renumber densely.

opts?​

ScanOptions = {}

ScanOptions. drop_snapshots skips __snapshot__ events (counted in the result); on_progress fires once per event.

sink?​

EventSink

Returns​

Promise<ScanResult>

ScanResult with kept, duration_ms, and dropped per-category counters.

Example​

Round-trip a CSV backup

async function* parseCsv(blob: string) {
  for (const line of blob.split("\n").slice(1)) {
    // parse() stands for a CSV line parser supplied by the caller
    const [id, name, data, stream, version, created, meta] = parse(line);
    yield {
      id: +id, name, data: JSON.parse(data), stream,
      version: +version, created: new Date(created),
      meta: JSON.parse(meta),
    };
  }
}
const result = await app.restore(parseCsv(csvBlob), {});
console.log(`Restored ${result.kept} events in ${result.duration_ms}ms`);
await cache().clear(); // operator's responsibility
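
The same pattern works for other backup formats. As a sketch, the generator below reads a JSON Lines backup. The file layout is an assumption (one event per line, created stored as an ISO string); BackupEvent and parseJsonl are hypothetical names, and the field list is copied from the CSV example above rather than from the library's types.

```typescript
// Field list taken from the CSV example above; not the library's own type.
type BackupEvent = {
  id: number;
  name: string;
  data: unknown;
  stream: string;
  version: number;
  created: Date;
  meta: unknown;
};

// Hypothetical parser: yields one event per non-empty line, lazily, so the
// consumer never needs the parsed backup in memory all at once.
async function* parseJsonl(text: string): AsyncGenerator<BackupEvent> {
  for (const line of text.split("\n")) {
    if (line.trim() === "") continue; // tolerate blank lines and a trailing newline
    const raw = JSON.parse(line);
    // JSON has no date type, so revive `created` from its string form.
    yield { ...raw, created: new Date(raw.created) };
  }
}
```

Assuming EventSource accepts an async generator as in the CSV example, the call would look like `await app.restore(parseJsonl(backupText))`.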

See​

Store.restore for the underlying driver-pattern primitive.


settle()​

settle(options?): void

Defined in: libs/act/src/act.ts:1676

Debounced, non-blocking correlate→drain cycle.

Call this after app.do() (or app.reset()) to schedule a background drain. Multiple rapid calls within the debounce window are coalesced into a single cycle. Runs correlate→drain in a loop until a pass makes no progress — no new subscriptions, no acks, no blocks — then emits the "settled" lifecycle event. This means a single settle() call fully catches up paginated streams (e.g. after reset() on a long projection) without forcing callers to loop.

Parameters​

options?​

SettleOptions = {}

Settle configuration; see SettleOptions for fields: debounceMs (default 10), correlate (default { after: -1, limit: 100 }), maxPasses (default Infinity, a kill-switch for runaway loops), streamLimit (default 10), eventLimit (default 10), leaseMillis (default 10000).

Returns​

void

Example​

API mutations

await app.do("CreateItem", target, input);
app.settle(); // non-blocking, returns immediately

app.on("settled", (drain) => {
  // notify SSE clients, invalidate caches, etc.
});
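
In tests or scripts it is sometimes convenient to await the "settled" event rather than register a long-lived listener. The sketch below wraps on/off/settle in a one-shot promise. Settleable and settled are hypothetical names; the interface is a structural stand-in for the methods documented on this page. It assumes no other settle cycle is already in flight, otherwise the promise may resolve with that cycle's result, and it never rejects, so callers may want their own timeout.

```typescript
// Structural stand-in for the three methods used here.
interface Settleable<TDrain> {
  on(event: "settled", listener: (drain: TDrain) => void): unknown;
  off(event: "settled", listener: (drain: TDrain) => void): unknown;
  settle(): void;
}

// Hypothetical helper: schedule a settle cycle and resolve on the next
// "settled" lifecycle event.
function settled<TDrain>(app: Settleable<TDrain>): Promise<TDrain> {
  return new Promise<TDrain>((resolve) => {
    const listener = (drain: TDrain) => {
      app.off("settled", listener); // one-shot: detach before resolving
      resolve(drain);
    };
    app.on("settled", listener); // subscribe first so the event cannot be missed
    app.settle();
  });
}
```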

See​

  • drain for single synchronous drain cycles
  • correlate for manual correlation

shutdown()​

shutdown(): Promise<void>

Defined in: libs/act/src/act.ts:639

Per-instance teardown: remove lifecycle listeners, stop the correlation worker, cancel any pending settle cycle, and tear down the cross-process notify subscription.

Idempotent: repeated calls return the same promise. Registered automatically with the global dispose() registry at construction, so process-wide dispose()() covers it; test helpers (or operators that mint short-lived Acts) call it explicitly for prompt cleanup.

Returns​

Promise<void>


start_correlations()​

start_correlations(query?, frequency?, callback?): boolean

Defined in: libs/act/src/act.ts:1261

Starts an automatic periodic correlation worker for discovering new streams.

The correlation worker runs in the background, scanning for new events and identifying new target streams based on reaction resolvers. It maintains a sliding window that advances with each scan, ensuring all events are eventually correlated.

This is useful for dynamic stream creation patterns where you don't know all streams upfront - they're discovered as events arrive.

Note: Only one correlation worker can run at a time per Act instance.

Parameters​

query?​

Query = {}

Query filter for correlation scans; see Query (typically { after: -1, limit: 100 })

frequency?​

number = 10_000

Correlation frequency in milliseconds (default: 10000)

callback?​

(subscribed) => void

Optional callback invoked with newly discovered streams

Returns​

boolean

true if worker started, false if already running

Examples​

Start automatic correlation

// Start correlation worker scanning every 5 seconds
app.start_correlations(
  { after: 0, limit: 100 },
  5000,
  (leased) => {
    console.log(`Discovered ${leased.length} new streams`);
  }
);

// Later, stop it
app.stop_correlations();

With checkpoint persistence

// Load last checkpoint
const lastId = await loadCheckpoint();

app.start_correlations(
  { after: lastId, limit: 100 },
  10000,
  async (leased) => {
    // Save checkpoint for next restart
    if (leased.length) {
      const maxId = Math.max(...leased.map(l => l.at));
      await saveCheckpoint(maxId);
    }
  }
);


stop_correlations()​

stop_correlations(): void

Defined in: libs/act/src/act.ts:1291

Stops the automatic correlation worker.

Call this to stop the background correlation worker started by start_correlations. This is automatically called when the Act instance is disposed.

Returns​

void

Example​

// Start correlation
app.start_correlations();

// Later, stop it
app.stop_correlations();

See​

start_correlations


stop_settling()​

stop_settling(): void

Defined in: libs/act/src/act.ts:1301

Cancels any pending or active settle cycle.

Returns​

void

See​

settle


unblock()​

unblock(input): Promise<number>

Defined in: libs/act/src/act.ts:1373

Clear the blocked flag on streams without replaying their history.

Use this to recover from a poison message after fixing the underlying issue: the stream resumes from the next event after the last successful ack, not from the beginning. Compare with reset, which rebuilds from event 0 (suitable for projection rebuilds, wrong for "I fixed the bug, please retry").

Wraps store().unblock(streams) and raises the orchestrator's internal "needs drain" flag so a settled app picks up the now-free streams on the next cycle. Equivalent to calling store().unblock(...) directly, but store().unblock(...) alone leaves the flag untouched.

Parameters​

input​

string[] | StreamFilter

Stream names to unblock, or a StreamFilter for bulk recovery

Returns​

Promise<number>

Count of streams that were actually flipped (were blocked)

Example​

Recover from a 4xx webhook after fixing the bug

await app.unblock(["webhooks-out-customer-42"]);
// The stream resumes from the next event, not from zero.
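
Discovery and recovery can be combined so that only streams whose failure matches a known, already-fixed cause are unblocked. The following is a minimal sketch: Recoverable and unblockWhere are hypothetical names, the error field is assumed to be an optional string (the blocked_streams example above only shows that the field exists), and only the first page of blocked streams is inspected.

```typescript
// Structural stand-in for the two methods used here.
type BlockedPosition = { stream: string; error?: string };
interface Recoverable {
  blocked_streams(options?: { after?: string; limit?: number }): Promise<BlockedPosition[]>;
  unblock(input: string[]): Promise<number>;
}

// Hypothetical helper: unblock only the blocked streams accepted by
// `shouldRetry`, and report how many were flipped.
async function unblockWhere(
  app: Recoverable,
  shouldRetry: (position: BlockedPosition) => boolean
): Promise<number> {
  const blocked = await app.blocked_streams();
  const names = blocked.filter(shouldRetry).map((position) => position.stream);
  // Nothing matched: skip the store call entirely.
  if (names.length === 0) return 0;
  return app.unblock(names);
}
```

For example, `unblockWhere(app, (p) => p.error?.includes("ECONNRESET") ?? false)` would leave streams that failed for other reasons blocked for further investigation.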

See​

  • Store.unblock for the underlying store primitive
  • reset for the rebuild-from-zero alternative