Act
Class: Act<S, E, A>
Defined in: libs/act/src/act.ts:48
See
Store
Main orchestrator for event-sourced state machines and workflows.
It manages the lifecycle of actions, reactions, and event streams, providing APIs for loading state, executing actions, querying events, and draining reactions.
Usage
const app = new Act(registry);
await app.do("increment", { stream: "counter1", actor }, { by: 1 });
const snapshot = await app.load(Counter, "counter1");
await app.drain();
- Register event listeners with .on("committed", ...) and .on("acked", ...) to react to lifecycle events.
- Use .query() to analyze event streams for analytics or debugging.
Type Parameters
S
S extends SchemaRegister<A>
SchemaRegister for state
E
E extends Schemas
Schemas for events
A
A extends Schemas
Schemas for actions
Constructors
Constructor
new Act<S, E, A>(registry): Act<S, E, A>
Defined in: libs/act/src/act.ts:113
Create a new Act orchestrator.
Parameters
registry
Registry<S, E, A>
The registry of state, event, and action schemas
Returns
Act<S, E, A>
Properties
registry
readonly registry: Registry<S, E, A>
Defined in: libs/act/src/act.ts:113
The registry of state, event, and action schemas
Methods
correlate()
correlate(query): Promise<{ last_id: number; leased: Lease[]; }>
Defined in: libs/act/src/act.ts:411
Correlates streams using reaction resolvers.
Parameters
query
Query = ...
The query filter (e.g., by stream, event name, or starting point).
Returns
Promise<{ last_id: number; leased: Lease[]; }>
The leases of newly correlated streams, and the last seen event ID.
do()
do<K>(action, target, payload, reactingTo?, skipValidation?): Promise<Snapshot<S[K], E>[]>
Defined in: libs/act/src/act.ts:135
Executes an action (command) against a state machine, emitting and committing the resulting event(s).
Type Parameters
K
K extends string | number | symbol
The type of action to execute
Parameters
action
K
The action name (key of the action schema)
target
The target (stream and actor) for the action
payload
Readonly<A[K]>
The action payload (validated against the schema)
reactingTo?
Committed<E, keyof E>
(Optional) The event this action is reacting to
skipValidation?
boolean = false
(Optional) If true, skips schema validation (not recommended)
Returns
Promise<Snapshot<S[K], E>[]>
The snapshots resulting from the committed event(s)
Example
await app.do("increment", { stream: "counter1", actor }, { by: 1 });
drain()
drain<E>(__namedParameters): Promise<Drain<E>>
Defined in: libs/act/src/act.ts:286
Drains and processes events from the store, triggering reactions and updating state.
This is typically called in a background loop or after committing new events.
Type Parameters
E
E extends Schemas
Parameters
__namedParameters
DrainOptions = {}
Returns
Promise<Drain<E>>
The number of events drained and processed
Example
await app.drain();
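Since drain() is typically called in a loop, here is a minimal sketch of a "drain until idle" helper. It relies only on the documented drain() call. The Drainable interface, the drainUntilIdle helper, the isIdle predicate, and the stub are hypothetical names introduced for illustration; the actual shape of Drain<E> is not described on this page, so the caller supplies the idle check.

```typescript
// Structural stand-in: anything exposing drain(), such as an Act instance.
interface Drainable<R> {
  drain(): Promise<R>;
}

// Calls drain() repeatedly until the caller-supplied predicate reports
// that nothing is left, or until maxRounds is reached.
// Returns the number of drain() calls that were made.
async function drainUntilIdle<R>(
  app: Drainable<R>,
  isIdle: (result: R) => boolean, // hypothetical: depends on the shape of Drain<E>
  maxRounds = 10
): Promise<number> {
  let rounds = 0;
  while (rounds < maxRounds) {
    const result = await app.drain();
    rounds += 1;
    if (isIdle(result)) break;
  }
  return rounds;
}

// Stub for demonstration only: pretends `pending` batches are waiting
// and reports one processed batch per call until none remain.
function makeStub(pending: number): Drainable<{ processed: number }> {
  let left = pending;
  return {
    drain: async () => {
      const processed = left > 0 ? 1 : 0;
      left -= processed;
      return { processed };
    },
  };
}
```

With a real Act instance, the predicate would inspect whatever fields Drain<E> exposes. The maxRounds cap keeps a busy store from turning the helper into an unbounded loop.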
emit()
Call Signature
emit(event, args): boolean
Defined in: libs/act/src/act.ts:65
Emit a lifecycle event (internal use, but can be used for custom listeners).
Parameters
event
"committed"
The event name ("committed", "acked", or "blocked")
args
Snapshot<S, E>[]
The event payload
Returns
boolean
true if the event had listeners, false otherwise
Call Signature
emit(event, args): boolean
Defined in: libs/act/src/act.ts:66
Emit a lifecycle event (internal use, but can be used for custom listeners).
Parameters
event
"acked"
The event name ("committed", "acked", or "blocked")
args
Lease[]
The event payload
Returns
boolean
true if the event had listeners, false otherwise
Call Signature
emit(event, args): boolean
Defined in: libs/act/src/act.ts:67
Emit a lifecycle event (internal use, but can be used for custom listeners).
Parameters
event
"blocked"
The event name ("committed", "acked", or "blocked")
args
Lease & object[]
The event payload
Returns
boolean
true if the event had listeners, false otherwise
load()
load<SX, EX, AX>(state, stream, callback?): Promise<Snapshot<SX, EX>>
Defined in: libs/act/src/act.ts:169
Loads the current state snapshot for a given state machine and stream.
Type Parameters
SX
SX extends Schema
The type of state
EX
EX extends Schemas
The type of events
AX
AX extends Schemas
The type of actions
Parameters
state
State<SX, EX, AX>
The state machine definition
stream
string
The stream (instance) to load
callback?
(snapshot) => void
(Optional) Callback to receive the loaded snapshot
Returns
Promise<Snapshot<SX, EX>>
The snapshot of the loaded state
Example
const snapshot = await app.load(Counter, "counter1");
off()
Call Signature
off(event, listener): this
Defined in: libs/act/src/act.ts:97
Remove a listener for a lifecycle event.
Parameters
event
"committed"
The event name
listener
(args) => void
The callback function
Returns
this
this (for chaining)
Call Signature
off(event, listener): this
Defined in: libs/act/src/act.ts:98
Remove a listener for a lifecycle event.
Parameters
event
"acked"
The event name
listener
(args) => void
The callback function
Returns
this
this (for chaining)
Call Signature
off(event, listener): this
Defined in: libs/act/src/act.ts:99
Remove a listener for a lifecycle event.
Parameters
event
"blocked"
The event name
listener
(args) => void
The callback function
Returns
this
this (for chaining)
on()
Call Signature
on(event, listener): this
Defined in: libs/act/src/act.ts:79
Register a listener for a lifecycle event ("committed", "acked", or "blocked").
Parameters
event
"committed"
The event name
listener
(args) => void
The callback function
Returns
this
this (for chaining)
Call Signature
on(event, listener): this
Defined in: libs/act/src/act.ts:80
Register a listener for a lifecycle event ("committed", "acked", or "blocked").
Parameters
event
"acked"
The event name
listener
(args) => void
The callback function
Returns
this
this (for chaining)
Call Signature
on(event, listener): this
Defined in: libs/act/src/act.ts:81
Register a listener for a lifecycle event ("committed", "acked", or "blocked").
Parameters
event
"blocked"
The event name
listener
(args) => void
The callback function
Returns
this
this (for chaining)
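To illustrate how on(), off(), and emit() fit together, the sketch below uses a small stand-in class with the same method shapes as documented here: on and off return this for chaining, and emit returns whether any listener was registered. FakeLifecycle is a hypothetical stub, not the library's implementation, and it types payloads as unknown, whereas the real overloads use Snapshot and Lease arrays.

```typescript
type Lifecycle = "committed" | "acked" | "blocked";
type Listener = (args: unknown) => void;

// Hypothetical stand-in mirroring the documented on/off/emit signatures.
class FakeLifecycle {
  private readonly listeners: Record<Lifecycle, Listener[]> = {
    committed: [],
    acked: [],
    blocked: [],
  };

  on(event: Lifecycle, listener: Listener): this {
    this.listeners[event].push(listener);
    return this; // returned for chaining
  }

  off(event: Lifecycle, listener: Listener): this {
    this.listeners[event] = this.listeners[event].filter((l) => l !== listener);
    return this; // returned for chaining
  }

  // Returns true if the event had listeners, false otherwise.
  emit(event: Lifecycle, args: unknown): boolean {
    const current = this.listeners[event];
    current.forEach((listener) => listener(args));
    return current.length > 0;
  }
}

const app = new FakeLifecycle();
const seen: string[] = [];
const onCommitted: Listener = () => {
  seen.push("committed");
};
const onAcked: Listener = () => {
  seen.push("acked");
};

// Chain registrations, emit once, then remove the "committed" listener.
app.on("committed", onCommitted).on("acked", onAcked);
const hadListeners = app.emit("committed", []);
app.off("committed", onCommitted);
const afterOff = app.emit("committed", []);
```

Keeping a reference to the listener function (onCommitted above) is what makes the later off() call possible; an inline arrow function could not be removed this way.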
query()
query(query, callback?): Promise<{ count: number; first?: Committed<E, keyof E>; last?: Committed<E, keyof E>; }>
Defined in: libs/act/src/act.ts:187
Query the event store for events matching a filter.
Parameters
query
The query filter (e.g., by stream, event name, or time range)
callback?
(event) => void
(Optional) Callback for each event found
Returns
Promise<{ count: number; first?: Committed<E, keyof E>; last?: Committed<E, keyof E>; }>
An object with the first and last event found, and the total count
Example
const { count } = await app.query({ stream: "counter1" }, (event) => console.log(event));
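As an example of using the per-event callback for analytics, the sketch below tallies events by name. It assumes each event passed to the callback carries a name field, which this page does not confirm. The Queryable interface, the countByName helper, the stub, and the event names "Incremented" and "Reset" are hypothetical and exist only to make the example self-contained.

```typescript
// Assumed minimal event shape; the real Committed<E, keyof E> type may differ.
interface EventLike {
  name: string;
}

// Structural stand-in for the documented query(query, callback?) method.
interface Queryable<Q> {
  query(query: Q, callback?: (event: EventLike) => void): Promise<{ count: number }>;
}

// Streams matching events through the callback and counts them per name.
async function countByName<Q>(app: Queryable<Q>, query: Q): Promise<Record<string, number>> {
  const tally: Record<string, number> = {};
  await app.query(query, (event) => {
    tally[event.name] = (tally[event.name] || 0) + 1;
  });
  return tally;
}

// In-memory stub used only to exercise the helper.
const stub: Queryable<{ stream: string }> = {
  query: async (_query, callback) => {
    const events: EventLike[] = [
      { name: "Incremented" },
      { name: "Incremented" },
      { name: "Reset" },
    ];
    events.forEach((e) => callback?.(e));
    return { count: events.length };
  },
};
```

Because the tally is built inside the callback, the events themselves never need to be collected into an array.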
query_array()
query_array(query): Promise<Committed<E, keyof E>[]>
Defined in: libs/act/src/act.ts:215
Query the event store for events matching a filter. Use this version with caution, as it returns all matching events in memory.
Parameters
query
The query filter (e.g., by stream, event name, or time range)
Returns
Promise<Committed<E, keyof E>[]>
The matching events
Example
const events = await app.query_array({ stream: "counter1" });
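Given the caution about holding events in memory, one alternative is to stream events through the query() callback and keep only a bounded window. The sketch below keeps the last n events. It assumes the callback is invoked in stream order and that events carry an id field; neither is confirmed on this page. StreamingQuery, lastN, and the stub are hypothetical names.

```typescript
// Structural stand-in for the documented query(query, callback?) method.
interface StreamingQuery<Ev, Q> {
  query(query: Q, callback?: (event: Ev) => void): Promise<{ count: number }>;
}

// Keeps at most `n` events in memory by dropping the oldest as new ones arrive.
async function lastN<Ev, Q>(app: StreamingQuery<Ev, Q>, query: Q, n: number): Promise<Ev[]> {
  const buffer: Ev[] = [];
  await app.query(query, (event) => {
    buffer.push(event);
    if (buffer.length > n) buffer.shift(); // drop the oldest to stay bounded
  });
  return buffer;
}

// In-memory stub that emits five events with ids 1 through 5.
const stub: StreamingQuery<{ id: number }, { stream: string }> = {
  query: async (_query, callback) => {
    const events = [1, 2, 3, 4, 5].map((id) => ({ id }));
    events.forEach((e) => callback?.(e));
    return { count: events.length };
  },
};
```

Calling lastN(stub, { stream: "counter1" }, 2) resolves to the events with ids 4 and 5, while the buffer never grows beyond n + 1 entries.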
start_correlations()
start_correlations(query, frequency, callback?): boolean
Defined in: libs/act/src/act.ts:468
Starts a correlation worker that identifies and registers new streams using reaction resolvers.
Enables "dynamic reactions", allowing streams to be auto-discovered based on event content.
- Uses a correlation sliding window over the event stream to identify new streams.
- Once registered, these streams are picked up by the main drain loop.
- Users should have full control over their correlation strategy.
- The starting point keeps increasing with each new batch of events.
- Users are responsible for storing the last seen event ID.
Parameters
query
Query = {}
The query filter (e.g., by stream, event name, or starting point).
frequency
number = 10_000
The frequency of correlation checks (in milliseconds).
callback?
(leased) => void
Callback to report stats (new streams, last seen event ID, etc.).
Returns
boolean
true if the correlation worker started, false otherwise (already started).
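Because callers are responsible for storing the last seen event ID, the sketch below shows one way to carry a checkpoint between manual correlate() calls. It uses the documented return shape ({ last_id, leased }). The `after` field used as the starting point is an assumption about the Query type, and Correlator, Checkpoint, correlateOnce, and the stub are hypothetical names. A real application would persist the checkpoint somewhere durable instead of keeping it in memory.

```typescript
// Structural stand-in for the documented correlate(query) method.
// The `after` field name is an assumption about the Query type.
interface Correlator {
  correlate(query: { after?: number }): Promise<{ last_id: number; leased: unknown[] }>;
}

// Caller-owned checkpoint holding the last seen event ID.
class Checkpoint {
  value = -1;

  // Only moves forward, so a stale result cannot rewind the starting point.
  advance(id: number): void {
    if (id > this.value) this.value = id;
  }
}

// Runs one correlation pass from the checkpoint and advances it.
// Returns the number of newly leased streams.
async function correlateOnce(app: Correlator, checkpoint: Checkpoint): Promise<number> {
  const { last_id, leased } = await app.correlate({ after: checkpoint.value });
  checkpoint.advance(last_id);
  return leased.length;
}

// Stub for demonstration: each pass "sees" two more events and leases one stream.
const stub: Correlator = {
  correlate: async ({ after = -1 }) => ({ last_id: after + 2, leased: [{}] }),
};
```

Running correlateOnce twice against the stub moves the checkpoint from -1 to 1 and then to 3, so each pass starts where the previous one stopped.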
stop_correlations()
stop_correlations(): void
Defined in: libs/act/src/act.ts:490
Returns
void