Act

@rotorsoft/act-root


Class: Act<S, E, A>

Defined in: libs/act/src/act.ts:48

See

Store

Main orchestrator for event-sourced state machines and workflows.

It manages the lifecycle of actions, reactions, and event streams, providing APIs for loading state, executing actions, querying events, and draining reactions.

Usage

const app = new Act(registry);
await app.do("increment", { stream: "counter1", actor }, { by: 1 });
const snapshot = await app.load(Counter, "counter1");
await app.drain();
  • Register event listeners with .on("committed", ...) and .on("acked", ...) to react to lifecycle events.
  • Use .query() to analyze event streams for analytics or debugging.

Type Parameters

S

S extends SchemaRegister<A>

SchemaRegister for state

E

E extends Schemas

Schemas for events

A

A extends Schemas

Schemas for actions

Constructors

Constructor

new Act<S, E, A>(registry): Act<S, E, A>

Defined in: libs/act/src/act.ts:113

Create a new Act orchestrator.

Parameters

registry

Registry<S, E, A>

The registry of state, event, and action schemas

Returns

Act<S, E, A>

Properties

registry

readonly registry: Registry<S, E, A>

Defined in: libs/act/src/act.ts:113

The registry of state, event, and action schemas

Methods

correlate()

correlate(query): Promise<{ last_id: number; leased: Lease[]; }>

Defined in: libs/act/src/act.ts:411

Correlates streams using reaction resolvers.

Parameters

query

Query = ...

The query filter (e.g., by stream, event name, or starting point).

Returns

Promise<{ last_id: number; leased: Lease[]; }>

The leases of newly correlated streams, and the last seen event ID.


do()

do<K>(action, target, payload, reactingTo?, skipValidation?): Promise<Snapshot<S[K], E>[]>

Defined in: libs/act/src/act.ts:135

Executes an action (command) against a state machine, emitting and committing the resulting event(s).

Type Parameters

K

K extends string | number | symbol

The type of action to execute

Parameters

action

K

The action name (key of the action schema)

target

Target

The target (stream and actor) for the action

payload

Readonly<A[K]>

The action payload (validated against the schema)

reactingTo?

Committed<E, keyof E>

(Optional) The event this action is reacting to

skipValidation?

boolean = false

(Optional) If true, skips schema validation (not recommended)

Returns

Promise<Snapshot<S[K], E>[]>

The snapshots of the committed event(s)

Example

await app.do("increment", { stream: "counter1", actor }, { by: 1 });

drain()

drain<E>(__namedParameters): Promise<Drain<E>>

Defined in: libs/act/src/act.ts:286

Drains and processes events from the store, triggering reactions and updating state.

This is typically called in a background loop or after committing new events.

Type Parameters

E

E extends Schemas

Parameters

__namedParameters

DrainOptions = {}

Returns

Promise<Drain<E>>

The number of events drained and processed

Example

await app.drain();
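
The background loop mentioned above could be sketched as follows. This is a minimal illustration, not part of the library: startDrainLoop, its pauseMs parameter, and the stop handle are hypothetical names, and the drain function is passed in so the sketch runs without an Act instance.

```typescript
// Hypothetical helper (not part of the library): calls `drain` repeatedly,
// pausing between rounds, until stop() is called.
function startDrainLoop(drain: () => Promise<unknown>, pauseMs: number) {
  let running = true;
  let rounds = 0;

  const finished = (async (): Promise<number> => {
    while (running) {
      try {
        await drain();
      } catch (error) {
        // A failed round is logged and the loop keeps going.
        console.error("drain failed", error);
      }
      rounds++;
      await new Promise<void>((resolve) => setTimeout(resolve, pauseMs));
    }
    return rounds;
  })();

  return {
    // Signals the loop to exit and resolves with the completed round count.
    stop(): Promise<number> {
      running = false;
      return finished;
    },
  };
}
```

With an Act instance, the loop might be started with startDrainLoop(() => app.drain(), 1000) and stopped during shutdown with await loop.stop().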

emit()

Call Signature

emit(event, args): boolean

Defined in: libs/act/src/act.ts:65

Emit a lifecycle event (internal use, but can be used for custom listeners).

Parameters

event

"committed"

The event name ("committed", "acked", or "blocked")

args

Snapshot<S, E>[]

The event payload

Returns

boolean

true if the event had listeners, false otherwise

Call Signature

emit(event, args): boolean

Defined in: libs/act/src/act.ts:66

Emit a lifecycle event (internal use, but can be used for custom listeners).

Parameters

event

"acked"

The event name ("committed", "acked", or "blocked")

args

Lease[]

The event payload

Returns

boolean

true if the event had listeners, false otherwise

Call Signature

emit(event, args): boolean

Defined in: libs/act/src/act.ts:67

Emit a lifecycle event (internal use, but can be used for custom listeners).

Parameters

event

"blocked"

The event name ("committed", "acked", or "blocked")

args

Lease & object[]

The event payload

Returns

boolean

true if the event had listeners, false otherwise


load()

load<SX, EX, AX>(state, stream, callback?): Promise<Snapshot<SX, EX>>

Defined in: libs/act/src/act.ts:169

Loads the current state snapshot for a given state machine and stream.

Type Parameters

SX

SX extends Schema

The type of state

EX

EX extends Schemas

The type of events

AX

AX extends Schemas

The type of actions

Parameters

state

State<SX, EX, AX>

The state machine definition

stream

string

The stream (instance) to load

callback?

(snapshot) => void

(Optional) Callback to receive the loaded snapshot

Returns

Promise<Snapshot<SX, EX>>

The snapshot of the loaded state

Example

const snapshot = await app.load(Counter, "counter1");

off()

Call Signature

off(event, listener): this

Defined in: libs/act/src/act.ts:97

Remove a listener for a lifecycle event.

Parameters

event

"committed"

The event name

listener

(args) => void

The callback function

Returns

this

this (for chaining)

Call Signature

off(event, listener): this

Defined in: libs/act/src/act.ts:98

Remove a listener for a lifecycle event.

Parameters

event

"acked"

The event name

listener

(args) => void

The callback function

Returns

this

this (for chaining)

Call Signature

off(event, listener): this

Defined in: libs/act/src/act.ts:99

Remove a listener for a lifecycle event.

Parameters

event

"blocked"

The event name

listener

(args) => void

The callback function

Returns

this

this (for chaining)


on()

Call Signature

on(event, listener): this

Defined in: libs/act/src/act.ts:79

Register a listener for a lifecycle event ("committed", "acked", or "blocked").

Parameters

event

"committed"

The event name

listener

(args) => void

The callback function

Returns

this

this (for chaining)

Call Signature

on(event, listener): this

Defined in: libs/act/src/act.ts:80

Register a listener for a lifecycle event ("committed", "acked", or "blocked").

Parameters

event

"acked"

The event name

listener

(args) => void

The callback function

Returns

this

this (for chaining)

Call Signature

on(event, listener): this

Defined in: libs/act/src/act.ts:81

Register a listener for a lifecycle event ("committed", "acked", or "blocked").

Parameters

event

"blocked"

The event name

listener

(args) => void

The callback function

Returns

this

this (for chaining)
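
The on(), off(), and emit() signatures above follow the familiar event-emitter pattern. The following is a minimal sketch of that pattern under stated assumptions, not the library's implementation: LifecycleSketch, LifecycleName, and LifecycleListener are hypothetical stand-ins, and payloads are left as unknown[] so the example runs on its own.

```typescript
// Hypothetical stand-in that mirrors the documented on/off/emit surface.
// It is not the Act class; payloads are left as unknown[] for brevity.
type LifecycleName = "committed" | "acked" | "blocked";
type LifecycleListener = (args: unknown[]) => void;

class LifecycleSketch {
  private readonly listeners = new Map<LifecycleName, Set<LifecycleListener>>();

  // Registers a listener and returns `this` so calls can be chained.
  on(event: LifecycleName, listener: LifecycleListener): this {
    const registered =
      this.listeners.get(event) ?? new Set<LifecycleListener>();
    registered.add(listener);
    this.listeners.set(event, registered);
    return this;
  }

  // Removes a previously registered listener; unknown listeners are ignored.
  off(event: LifecycleName, listener: LifecycleListener): this {
    this.listeners.get(event)?.delete(listener);
    return this;
  }

  // Returns true if at least one listener was called, false otherwise.
  emit(event: LifecycleName, args: unknown[]): boolean {
    const registered = this.listeners.get(event);
    if (!registered || registered.size === 0) return false;
    registered.forEach((listener) => listener(args));
    return true;
  }
}

// Keep a reference to the listener so it can be removed later with off().
const received: string[] = [];
const logCommitted: LifecycleListener = (args) => {
  received.push(`committed:${args.length}`);
};

const lifecycle = new LifecycleSketch();
lifecycle.on("committed", logCommitted).on("blocked", () => {});
const hadListeners = lifecycle.emit("committed", [{}, {}]);
lifecycle.off("committed", logCommitted);
const hadListenersAfterOff = lifecycle.emit("committed", [{}]);
```

The point to carry over is that off() needs the same function reference that was passed to on(), so inline arrow functions cannot be removed later.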


query()

query(query, callback?): Promise<{ count: number; first?: Committed<E, keyof E>; last?: Committed<E, keyof E>; }>

Defined in: libs/act/src/act.ts:187

Query the event store for events matching a filter.

Parameters

query

Query

The query filter (e.g., by stream, event name, or time range)

callback?

(event) => void

(Optional) Callback for each event found

Returns

Promise<{ count: number; first?: Committed<E, keyof E>; last?: Committed<E, keyof E>; }>

An object with the first and last event found, and the total count

Example

const { count } = await app.query({ stream: "counter1" }, (event) => console.log(event));
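
Because the callback runs once per matching event, it can aggregate while events stream by instead of holding them all. The sketch below tallies events by name; tallyByName and NamedEvent are hypothetical, and the assumption that committed events expose a name field is not confirmed by this page.

```typescript
// Assumed event shape: only a `name` field is used in this sketch.
interface NamedEvent {
  name: string;
}

// Hypothetical helper: returns a per-event callback plus the running counts.
function tallyByName() {
  const counts = new Map<string, number>();
  const callback = (event: NamedEvent): void => {
    counts.set(event.name, (counts.get(event.name) ?? 0) + 1);
  };
  return { counts, callback };
}
```

A possible use is const tally = tallyByName(); await app.query({ stream: "counter1" }, tally.callback); after which tally.counts holds one entry per event name.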

query_array()

query_array(query): Promise<Committed<E, keyof E>[]>

Defined in: libs/act/src/act.ts:215

Query the event store for events matching a filter. Use this version with caution, as it returns all matching events in memory.

Parameters

query

Query

The query filter (e.g., by stream, event name, or time range)

Returns

Promise<Committed<E, keyof E>[]>

The matching events

Example

const events = await app.query_array({ stream: "counter1" });
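
When the number of matching events is unknown, one way to respect the memory caution above is to collect through the query() callback with an upper bound. This is a sketch under stated assumptions: boundedCollector is a hypothetical helper, not a library function.

```typescript
// Hypothetical helper: keeps at most `max` events and counts the rest.
function boundedCollector<T>(max: number) {
  const items: T[] = [];
  let skipped = 0;
  return {
    items,
    // Number of events seen after the bound was reached.
    skipped: (): number => skipped,
    // Pass this function as the per-event callback.
    callback: (event: T): void => {
      if (items.length < max) items.push(event);
      else skipped++;
    },
  };
}
```

A collector created with boundedCollector(1000) could be passed to app.query(...) as the callback, keeping at most 1000 events in memory regardless of how many match.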

start_correlations()

start_correlations(query, frequency, callback?): boolean

Defined in: libs/act/src/act.ts:468

Starts a correlation worker that identifies and registers new streams using reaction resolvers.

Enables "dynamic reactions", allowing streams to be auto-discovered based on event content.

  • Uses a correlation sliding window over the event stream to identify new streams.
  • Once registered, these streams are picked up by the main drain loop.
  • Users should have full control over their correlation strategy.
  • The starting point keeps increasing with each new batch of events.
  • Users are responsible for storing the last seen event ID.

Parameters

query

Query = {}

The query filter (e.g., by stream, event name, or starting point).

frequency

number = 10_000

The frequency of correlation checks (in milliseconds).

callback?

(leased) => void

Callback to report stats (new streams, last seen event ID, etc.).

Returns

boolean

true if the correlation worker started, false otherwise (already started).
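
Since storing the last seen event ID is left to the caller, a checkpoint could be sketched as below. Everything here is illustrative: CheckpointStore, memoryCheckpoint, and correlateFromCheckpoint are hypothetical names, the correlate function is passed in so the sketch runs without an Act instance, and using an after field as the query's starting point is an assumption about Query.

```typescript
// Assumed shapes, reduced to what this sketch needs.
type CorrelationResult = { last_id: number; leased: unknown[] };
type CorrelateFn = (query: { after?: number }) => Promise<CorrelationResult>;

// Hypothetical checkpoint store; a real one would persist the value.
interface CheckpointStore {
  read(): number;
  write(lastId: number): void;
}

function memoryCheckpoint(initial = -1): CheckpointStore {
  let value = initial;
  return {
    read: () => value,
    write: (lastId) => {
      value = lastId;
    },
  };
}

// Runs one correlation pass from the stored position and moves the
// checkpoint forward only. Returns the number of newly leased streams.
async function correlateFromCheckpoint(
  correlate: CorrelateFn,
  checkpoint: CheckpointStore
): Promise<number> {
  const { last_id, leased } = await correlate({ after: checkpoint.read() });
  if (last_id > checkpoint.read()) checkpoint.write(last_id);
  return leased.length;
}
```

With an Act instance, the first argument might be (query) => app.correlate(query), and memoryCheckpoint would be swapped for a durable store so the position survives restarts.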


stop_correlations()

stop_correlations(): void

Defined in: libs/act/src/act.ts:490

Stops the correlation worker started by start_correlations().

Returns

void