Act
Class: Act<TSchemaReg, TEvents, TActions, TStateMap, TActor>
Defined in: libs/act/src/act.ts:168
Type Parameters
TSchemaReg
TSchemaReg extends SchemaRegister<TActions>
TEvents
TEvents extends Schemas
TActions
TActions extends Schemas
TStateMap
TStateMap extends Record<string, Schema> = Record<string, never>
TActor
Implements
IAct<TEvents, TActions, TActor>
Constructors
Constructor
new Act<TSchemaReg, TEvents, TActions, TStateMap, TActor>(registry, _states?, batchHandlers?, options?, lanes?): Act<TSchemaReg, TEvents, TActions, TStateMap, TActor>
Defined in: libs/act/src/act.ts:318
Create a new Act orchestrator. Prefer the act builder over
direct construction: act()...build() wires the registry, merges
partial states, and collects batch handlers from registered slices
and projections in one pass.
Parameters
registry
Registry<TSchemaReg, TEvents, TActions>
Schemas for every event and action across registered states
_states?
Map<string, State<any, any, any>> = ...
Merged map of state name → state definition
batchHandlers?
Map<string, BatchHandler<any>> = ...
Static-target projection batch handlers (target → handler)
options?
ActOptions = {}
Tuning knobs; see ActOptions
lanes?
readonly LaneConfig[] = []
Declared drain lanes (ACT-1103). The builder collects
these from .withLane(...) calls. Slice 1 records them on the
instance; later slices fan out one DrainController per lane.
Returns
Act<TSchemaReg, TEvents, TActions, TStateMap, TActor>
Properties
registry
readonly registry: Registry<TSchemaReg, TEvents, TActions>
Defined in: libs/act/src/act.ts:319
Schemas for every event and action across registered states
Accessors
lanes
Get Signature
get lanes(): readonly LaneConfig[]
Defined in: libs/act/src/act.ts:300
Drain lanes declared via .withLane(...). The implicit default lane is not included.
Returns
readonly LaneConfig[]
Methods
audit()
audit(categories?, options?): AsyncIterable<AuditFinding>
Defined in: libs/act/src/act.ts:1278
Operator-driven store audit (#723).
Walks the connected store and yields per-category findings,
each tagged with the remediation it suggests. It belongs to the same
operator-driven category as app.close() / app.reset() /
app.unblock() / app.blocked_streams(): it is never auto-invoked by
the framework; the operator decides when to run it (CI gate,
scheduled job, ad-hoc forensics) and what to do with the
findings.
Categories are independent: pass a subset to scope the work, or omit the argument to run everything:
// Targeted: schema drift + deprecated-event load only
for await (const f of app.audit(["schema", "deprecated-load"], {
  query: { created_after: lastScan },
  thresholds: { deprecatedLoadShareMin: 0.10 },
})) {
  await escalate(f);
}
// Full audit, default thresholds
for await (const f of app.audit()) console.log(f);
Returns an AsyncIterable so callers can break early; the
underlying store pagination respects the iterator protocol and
stops cleanly. Each finding is emitted independently, so
pipelining into Slack / persistence / further analysis works
without buffering the full report in memory.
Findings shape: see AuditFinding. The discriminated union carries enough context for the operator to act on each finding directly: stream id, event id, recommendation hints.
Parameters
categories?
Subset of categories to run (default: all).
options?
Query window + per-category thresholds.
Returns
AsyncIterable<AuditFinding>
Async iterable of AuditFinding.
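As an illustration of the early-exit behaviour described above, the sketch below consumes an async iterable and stops at the first finding of a chosen category. The Finding type, fakeAudit generator, and firstOf helper are hypothetical stand-ins introduced for this example (they are not part of the library); only the iterator-protocol behaviour is the point.

```typescript
// Hypothetical stand-in for AuditFinding; the real union carries more context.
type Finding = { category: string; stream: string };

let cleanedUp = false;

// Hypothetical stand-in for app.audit(): yields findings lazily.
async function* fakeAudit(): AsyncGenerator<Finding> {
  try {
    yield { category: "schema", stream: "order-1" };
    yield { category: "deprecated-load", stream: "order-2" };
    yield { category: "schema", stream: "order-3" };
  } finally {
    // Runs when the consumer leaves the for-await loop early.
    cleanedUp = true;
  }
}

// Returns the first finding in the given category, or undefined.
async function firstOf(
  findings: AsyncIterable<Finding>,
  category: string
): Promise<Finding | undefined> {
  for await (const f of findings) {
    if (f.category === category) return f; // early exit closes the iterator
  }
  return undefined;
}
```

With a real instance the same helper would presumably be called as firstOf(app.audit(), "schema"), assuming the finding exposes a comparable category discriminator.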
blocked_streams()
blocked_streams(options?): Promise<StreamPosition[]>
Defined in: libs/act/src/act.ts:1221
Return every currently-blocked stream position. Convenience wrapper
around store().query_streams(cb, { blocked: true }) for the common
"show me what's broken" operational query.
Results are ordered by stream name and paginated by limit (default
100). Pass after to fetch the next page (keyset cursor on the
stream name). For richer queries, including blocked + source
filters or full unblocked introspection, drop to
store().query_streams(...) directly.
Parameters
options?
after?
string
limit?
number
Returns
Promise<StreamPosition[]>
Array of StreamPosition for currently-blocked streams.
Example
const blocked = await app.blocked_streams();
console.table(blocked.map(({ stream, retry, error }) => ({ stream, retry, error })));
// Operator investigates, then bulk-unblocks the family:
await app.unblock({ stream: "^webhooks-out-" });
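The keyset pagination described above can be followed with a small loop. This is a minimal sketch: Position, PageFn, allBlocked, and the in-memory fakeBlockedStreams are hypothetical names for illustration, and it assumes the after cursor is exclusive and that a short page marks the end.

```typescript
// Hypothetical minimal shape; the real StreamPosition has more fields.
type Position = { stream: string };
type PageFn = (opts: { after?: string; limit?: number }) => Promise<Position[]>;

// Follows the keyset cursor until a short page signals the end.
async function allBlocked(fetchPage: PageFn, limit = 100): Promise<Position[]> {
  const out: Position[] = [];
  let after: string | undefined;
  for (;;) {
    const page = await fetchPage({ after, limit });
    out.push(...page);
    const last = page[page.length - 1];
    if (page.length < limit || last === undefined) return out; // last page
    after = last.stream; // cursor = last stream name seen
  }
}

// In-memory fake standing in for app.blocked_streams (assumption: results
// are sorted by stream name and `after` is exclusive).
const rows: Position[] = Array.from({ length: 250 }, (_, i) => ({
  stream: `s-${String(i).padStart(4, "0")}`,
}));
const fakeBlockedStreams: PageFn = async ({ after, limit = 100 }) =>
  rows.filter((r) => after === undefined || r.stream > after).slice(0, limit);
```

Against a real instance the call would look like allBlocked((o) => app.blocked_streams(o)); collecting every page in memory is only sensible when the blocked set is known to be small.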
close()
close(targets): Promise<CloseResult>
Defined in: libs/act/src/act.ts:1360
Close the books: guard, archive, truncate, and optionally restart streams.
Safely removes historical events from the operational store:
- Correlate: discover pending reaction targets
- Safety check: skip streams with pending reactions (skipped when no reactive events)
- Guard: commit __tombstone__ with expectedVersion to block concurrent writes
- Load state: for streams in snapshots, load final state while guarded (no races)
- Archive: user callback per stream (abort-all on failure, streams are guarded)
- Truncate + seed: atomically delete all events and insert __snapshot__ or __tombstone__
- Cache: invalidate (tombstoned) or warm (restarted)
- Emit "closed": lifecycle event with results
Parameters
targets
Per-stream close options (stream, restart?, archive?)
Returns
Promise<CloseResult>
{ truncated: TruncateResult, skipped: string[] }
Examples
await app.close([
  { stream: "order-123", archive: async () => { await archiveToS3("order-123"); } },
  { stream: "order-456" },
]);
await app.close([
  { stream: "counter-1", restart: true },
  { stream: "counter-2" }, // tombstoned
]);
correlate()
correlate(query?): Promise<{ last_id: number; subscribed: number; }>
Defined in: libs/act/src/act.ts:957
Discovers and registers new streams dynamically based on reaction resolvers.
Correlation enables "dynamic reactions" where target streams are determined at runtime based on event content. For example, you might create a stats stream for each user when they perform certain actions.
This method scans events matching the query and identifies new target streams based on reaction resolvers. It then registers these streams so they'll be picked up by the next drain cycle.
Parameters
query?
Query = ...
Query filter to scan for new correlations
Returns
Promise<{ last_id: number; subscribed: number; }>
Object with the number of newly subscribed streams (subscribed) and the last scanned event ID (last_id)
Examples
// Scan for new streams
const { subscribed, last_id } = await app.correlate({ after: 0, limit: 100 });
console.log(`Found ${subscribed} new streams`);
// Save last_id for next scan
await saveCheckpoint(last_id);
const app = act()
  .withState(User)
  .withState(UserStats)
  .on("UserLoggedIn")
  .do(async (event) => ["incrementLoginCount", {}])
  .to((event) => ({
    target: `stats-${event.stream}` // Dynamic target per user
  }))
  .build();
// Discover stats streams as users log in
await app.correlate();
See
- start_correlations for automatic periodic correlation
- stop_correlations to stop automatic correlation
do()
do<TKey>(action, target, payload, reactingTo?, skipValidation?): Promise<Snapshot<TSchemaReg[TKey], TEvents>[]>
Defined in: libs/act/src/act.ts:605
Executes an action on a state instance, committing resulting events.
This is the primary method for modifying state. It:
- Validates the action payload against the schema
- Loads the current state snapshot
- Checks invariants (business rules)
- Executes the action handler to generate events
- Applies events to create new state
- Commits events to the store with optimistic concurrency control
Type Parameters
TKey
TKey extends string | number | symbol
Action name from registered actions
Parameters
action
TKey
The name of the action to execute
target
Target<TActor>
Target specification with stream ID and actor context
payload
Readonly<TActions[TKey]>
Action payload matching the action's schema
reactingTo?
Committed<TEvents, string & keyof TEvents>
Optional event that triggered this action (for correlation)
skipValidation?
boolean = false
Skip schema validation (use with care; intended for performance-sensitive paths)
Returns
Promise<Snapshot<TSchemaReg[TKey], TEvents>[]>
Array of snapshots for all affected states (usually one)
Throws
ValidationError if the payload doesn't match the action schema
Throws
InvariantError if business rules are violated
Throws
ConcurrencyError if another process modified the stream
Examples
const snapshots = await app.do(
  "increment",
  {
    stream: "counter-1",
    actor: { id: "user1", name: "Alice" }
  },
  { by: 5 }
);
console.log(snapshots[0].state.count); // Current count after increment
try {
  await app.do(
    "withdraw",
    { stream: "account-123", actor: { id: "user1", name: "Alice" } },
    { amount: 1000 }
  );
} catch (error) {
  if (error instanceof InvariantError) {
    console.error("Business rule violated:", error.description);
  } else if (error instanceof ConcurrencyError) {
    console.error("Concurrent modification detected, retry...");
  } else if (error instanceof ValidationError) {
    console.error("Invalid payload:", error.details);
  }
}
const app = act()
  .withState(Order)
  .withState(Inventory)
  .on("OrderPlaced")
  .do(async function reduceInventory(event, _stream, app) {
    // Inside reaction handlers, reactingTo is auto-injected when omitted.
    // The triggering event is used by default, maintaining the correlation chain.
    await app.do(
      "reduceStock",
      { stream: "inventory-1", actor: { id: "sys", name: "system" } },
      { amount: event.data.items.length }
    );
    // To use a different correlation, pass reactingTo explicitly:
    // await app.do("reduceStock", target, payload, customEvent);
  })
  .to("inventory-1")
  .build();
See
- Target for target structure
- Snapshot for return value structure
- ValidationError, InvariantError, ConcurrencyError
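Because a concurrency failure means another writer got there first, callers often retry the whole action. The following is a minimal sketch of a bounded retry wrapper; withRetry is a hypothetical helper, and the local ConcurrencyError class is a stand-in so the block runs on its own (an application would use the error class it already imports).

```typescript
// Local stand-in so the sketch runs on its own; in an application this
// would be the ConcurrencyError class referenced in the example above.
class ConcurrencyError extends Error {}

// Retries `attempt` only on ConcurrencyError, up to `maxAttempts` tries.
// Any other error (validation, invariant) is rethrown immediately.
async function withRetry<T>(
  attempt: () => Promise<T>,
  maxAttempts = 3
): Promise<T> {
  for (let i = 1; ; i++) {
    try {
      return await attempt();
    } catch (error) {
      const retryable = error instanceof ConcurrencyError;
      if (!retryable || i >= maxAttempts) throw error;
      // Each new attempt goes through the full action again, so state is
      // reloaded and invariants are re-checked before committing.
    }
  }
}
```

A call might then read await withRetry(() => app.do("withdraw", target, { amount: 1000 })). Invariant and validation failures are deliberately not retried, since repeating the same input would fail the same way.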
drain()
drain(options?): Promise<Drain<TEvents>>
Defined in: libs/act/src/act.ts:853
Processes pending reactions by draining uncommitted events from the event store.
Runs a single drain cycle:
- Polls the store for streams with uncommitted events
- Leases streams to prevent concurrent processing
- Fetches events for each leased stream
- Executes matching reaction handlers
- Acknowledges successful reactions or blocks failing ones
Drain uses a dual-frontier strategy to balance processing of new streams (lagging) vs active streams (leading). The ratio adapts based on event pressure.
Call correlate() before drain() to discover target streams. For a higher-level
API that handles debouncing, correlation, and signaling automatically, use settle.
Parameters
options?
DrainOptions = {}
Drain configuration; see DrainOptions for fields
(streamLimit, eventLimit, leaseMillis).
Returns
Promise<Drain<TEvents>>
Drain statistics with fetched, leased, acked, and blocked counts
Examples
await app.do("createUser", target, payload);
await app.correlate();
await app.drain();
await app.do("CreateItem", target, input);
app.settle(); // debounced correlate→drain, emits "settled"
See
- settle for debounced correlate→drain with lifecycle events
- correlate for dynamic stream discovery
- start_correlations for automatic correlation
emit()
emit<E>(event, args): boolean
Defined in: libs/act/src/act.ts:209
Emit a lifecycle event. The payload type is inferred from the event name via ActLifecycleEvents.
Type Parameters
E
E extends keyof ActLifecycleEvents<TSchemaReg, TEvents, TActions>
Parameters
event
E
args
ActLifecycleEvents<TSchemaReg, TEvents, TActions>[E]
Returns
boolean
load()
Call Signature
load<TNewState, TNewEvents, TNewActions>(state, stream, callback?, asOf?): Promise<Snapshot<TNewState, TNewEvents>>
Defined in: libs/act/src/act.ts:679
Loads the current state snapshot for a specific stream.
Reconstructs the current state by replaying events from the event store. Uses snapshots when available to optimize loading performance.
Accepts either a State definition object or a state name string. When
using a string, the merged state (from partial states registered via
.withState()) is resolved by name.
Type Parameters
TNewState
TNewState extends Schema
State schema type
TNewEvents
TNewEvents extends Schemas
Event schemas type
TNewActions
TNewActions extends Schemas
Action schemas type
Parameters
state
State<TNewState, TNewEvents, TNewActions>
The state definition or state name to load
stream
string
The stream ID (state instance identifier)
callback?
(snapshot) => void
Optional callback invoked with the loaded snapshot
asOf?
Returns
Promise<Snapshot<TNewState, TNewEvents>>
The current state snapshot for the stream
Examples
const snapshot = await app.load(Counter, "counter-1");
console.log(snapshot.state.count); // Current count
console.log(snapshot.patches); // Events since last snapshot
const snapshot = await app.load("Ticket", "ticket-123");
console.log(snapshot.state.title); // Merged state from all partials
const [user, account] = await Promise.all([
  app.load(User, "user-123"),
  app.load(BankAccount, "account-456")
]);
See
Snapshot for snapshot structure
Call Signature
load<TKey>(name, stream, callback?, asOf?): Promise<Snapshot<TStateMap[TKey], TEvents>>
Defined in: libs/act/src/act.ts:689
Loads the current state snapshot for a specific stream.
Reconstructs the current state by replaying events from the event store. Uses snapshots when available to optimize loading performance.
Accepts either a State definition object or a state name string. When
using a string, the merged state (from partial states registered via
.withState()) is resolved by name.
Type Parameters
TKey
TKey extends string
Parameters
name
TKey
stream
string
The stream ID (state instance identifier)
callback?
(snapshot) => void
Optional callback invoked with the loaded snapshot
asOf?
Returns
Promise<Snapshot<TStateMap[TKey], TEvents>>
The current state snapshot for the stream
Implementation of
IAct.load
off()
off<E>(event, listener): this
Defined in: libs/act/src/act.ts:233
Remove a previously registered lifecycle listener.
Type Parameters
E
E extends keyof ActLifecycleEvents<TSchemaReg, TEvents, TActions>
Parameters
event
E
listener
(args) => void
Returns
this
on()
on<E>(event, listener): this
Defined in: libs/act/src/act.ts:220
Register a listener for a lifecycle event. The listener receives the event-specific payload.
Type Parameters
E
E extends keyof ActLifecycleEvents<TSchemaReg, TEvents, TActions>
Parameters
event
E
listener
(args) => void
Returns
this
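Since on() and off() are paired, a one-shot wait can be built from them. The sketch below is an assumption-laden illustration: Listenable, next, and FakeEmitter are hypothetical names, and the structural type only approximates the generic on/off signatures shown above, so a cast may be needed with a real instance.

```typescript
// Minimal structural type covering only the two methods used here.
type Listenable<P> = {
  on(event: string, listener: (payload: P) => void): unknown;
  off(event: string, listener: (payload: P) => void): unknown;
};

// Resolves with the payload of the next `event`, then detaches the listener.
function next<P>(source: Listenable<P>, event: string): Promise<P> {
  return new Promise<P>((resolve) => {
    const listener = (payload: P) => {
      source.off(event, listener);
      resolve(payload);
    };
    source.on(event, listener);
  });
}

// Tiny emitter used only to exercise the helper (hypothetical, not Act).
class FakeEmitter implements Listenable<number> {
  private listeners = new Map<string, Set<(p: number) => void>>();
  on(event: string, l: (p: number) => void) {
    if (!this.listeners.has(event)) this.listeners.set(event, new Set());
    this.listeners.get(event)!.add(l);
    return this;
  }
  off(event: string, l: (p: number) => void) {
    this.listeners.get(event)?.delete(l);
    return this;
  }
  emit(event: string, payload: number) {
    const ls = Array.from(this.listeners.get(event) ?? []);
    ls.forEach((l) => l(payload));
    return ls.length > 0;
  }
  count(event: string) {
    return this.listeners.get(event)?.size ?? 0;
  }
}
```

Registering the listener before triggering the work avoids missing an event that fires in between; detaching inside the listener keeps repeated waits from accumulating handlers.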
prioritize()
prioritize(filter, priority): Promise<number>
Defined in: libs/act/src/act.ts:1323
Bulk-update scheduling priority for streams matching filter.
Operator-grade override of the claim() lagging-frontier
ordering (ACT-102). Useful when a long-running replay needs to
jump ahead of other lagging streams, or when a no-longer-urgent
job should yield slots back to the rest. Build-time priorities
(set via the resolver's priority field) are subject to a
max() invariant across reactions; this API ignores that and
sets the priority outright on every matching row.
Filter shape mirrors query / Store.query_streams:
stream / source are regex by default, exact with the
*_exact flags; blocked restricts to blocked or unblocked
rows. An empty filter ({}) updates every registered stream.
Parameters
filter
Selection criteria (regex by default).
priority
number
New priority value. Set as-is, with no clamping.
Returns
Promise<number>
Count of streams whose priority changed.
Examples
await app.prioritize({ stream: "^proj-orders$", stream_exact: false }, 10);
await app.prioritize({ source: "^audit-" }, -5);
await app.prioritize({}, 0);
See
- Store.prioritize for the underlying primitive
- claim for how priority biases scheduling
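The filter semantics described above (regex by default, exact with a flag, optional blocked restriction, empty filter matches everything) can be modelled in a few lines. This is only an in-memory sketch of the matching rules, not how any store adapter evaluates them; Row, Filter, matchField, and matches are hypothetical, and the source_exact name is inferred from the "*_exact flags" wording.

```typescript
// Hypothetical row and filter shapes, modelled on the description above.
type Row = { stream: string; source?: string; blocked: boolean };
type Filter = {
  stream?: string;
  stream_exact?: boolean;
  source?: string;
  source_exact?: boolean; // assumed name, by analogy with stream_exact
  blocked?: boolean;
};

// Regex match by default, string equality when `exact` is set.
function matchField(value: string | undefined, pattern?: string, exact?: boolean) {
  if (pattern === undefined) return true; // field not constrained
  if (value === undefined) return false;
  return exact ? value === pattern : new RegExp(pattern).test(value);
}

// A row matches when every constrained field matches.
function matches(row: Row, f: Filter): boolean {
  return (
    matchField(row.stream, f.stream, f.stream_exact) &&
    matchField(row.source, f.source, f.source_exact) &&
    (f.blocked === undefined || row.blocked === f.blocked)
  );
}
```

Note that an unanchored pattern such as "proj-orders" matches any stream containing that text, which is why the examples above anchor with ^ and $.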
query()
query(query, callback?): Promise<{ count: number; first?: Committed<TEvents, keyof TEvents>; last?: Committed<TEvents, keyof TEvents>; }>
Defined in: libs/act/src/act.ts:760
Queries the event store for events matching a filter.
Use this for analyzing event streams, generating reports, or debugging. The callback is invoked for each matching event, and the method returns summary information (first event, last event, total count).
For small result sets, consider using query_array instead.
Parameters
query
Filter criteria; see Query for available fields
(stream, name, after, before, created_after, created_before,
limit, with_snaps, stream_exact)
callback?
(event) => void
Optional callback invoked for each matching event
Returns
Promise<{ count: number; first?: Committed<TEvents, keyof TEvents>; last?: Committed<TEvents, keyof TEvents>; }>
Object with first event, last event, and total count
Examples
const { first, last, count } = await app.query(
  { stream: "counter-1" },
  (event) => console.log(event.name, event.data)
);
console.log(`Found ${count} events from ${first?.id} to ${last?.id}`);
const { count } = await app.query(
  { name: "UserCreated", limit: 100 },
  (event) => {
    console.log("User created:", event.data.email);
  }
);
const yesterday = new Date(Date.now() - 24 * 60 * 60 * 1000);
const { count } = await app.query({
  created_after: yesterday,
  stream: "user-123"
});
console.log(`User had ${count} events in last 24 hours`);
See
query_array for loading events into memory
query_array()
query_array(query): Promise<Committed<TEvents, keyof TEvents>[]>
Defined in: libs/act/src/act.ts:806
Queries the event store and returns all matching events in memory.
Use with caution: this loads all results into memory. For large result sets, use query with a callback instead to process events incrementally.
Parameters
query
The query filter (same as query)
Returns
Promise<Committed<TEvents, keyof TEvents>[]>
Array of all matching events
Examples
const events = await app.query_array({ stream: "counter-1" });
console.log(`Loaded ${events.length} events`);
events.forEach(event => console.log(event.name, event.data));
const recent = await app.query_array({
  stream: "user-123",
  limit: 10
});
See
query for large result sets
reset()
reset(input): Promise<number>
Defined in: libs/act/src/act.ts:1088
Reset reaction stream watermarks and request a drain on the next
drain() / settle() cycle.
Use this to replay events through projections (or other reaction targets)
after changing handler logic. Equivalent to calling store().reset(streams)
directly, but also raises the orchestrator's internal "needs drain" flag;
store().reset(...) alone leaves the flag untouched, so a settled app
would short-circuit and skip the replay.
Pair with app.settle() (or a single app.drain() for small streams).
settle() loops correlate→drain until no progress is made, so one call
fully catches up paginated streams without forcing callers to roll
their own loop.
Parameters
input
string[] | StreamFilter
Returns
Promise<number>
Count of streams that were actually reset
Examples
await app.reset(["my-projection"]);
app.settle({ eventLimit: 1000 }); // emits "settled" when fully replayed
await app.reset(["my-projection"]);
await app.drain({ eventLimit: 1000 }); // small streams: one pass is enough
See
- Store.reset for the underlying store primitive
- settle for the debounced full-catch-up loop
restore()
restore(source, opts?): Promise<ScanResult>
Defined in: libs/act/src/act.ts:1173
Atomically wipe the store and rebuild it from an async stream of
committed events. The framework owns iteration, validation,
drop_snapshots filtering, on_progress, and the per-call
old → new causation remap; the adapter's Store.restore
driver supplies the transaction lifecycle and per-event insert.
Throws if the adapter has no restore capability. Throws on the
first invalid event (negative version, malformed created) with
the running index in the message; atomic transaction rollback in
the adapter means a failing restore leaves the store byte-for-byte
unchanged.
Parameters
source
AsyncIterable<Committed<Schemas, string>>
Async stream of events in target order. Streamed
rather than buffered so multi-million-event backups don't OOM.
Each event's original id is used as a causation lookup key but
never written through; adapters renumber densely.
opts?
ScanOptions = {}
ScanOptions. drop_snapshots skips
__snapshot__ events (counted in the result); on_progress
fires once per event.
Returns
Promise<ScanResult>
ScanResult with kept, duration_ms, and
dropped per-category counters.
Example
async function* parseCsv(blob: string) {
  for (const line of blob.split("\n").slice(1)) {
    const [id, name, data, stream, version, created, meta] = parse(line);
    yield {
      id: +id, name, data: JSON.parse(data), stream,
      version: +version, created: new Date(created),
      meta: JSON.parse(meta),
    };
  }
}
const result = await app.restore(parseCsv(csvBlob), {});
console.log(`Restored ${result.kept} events in ${result.duration_ms}ms`);
await cache().clear(); // operator's responsibility
See
Store.restore for the underlying driver-pattern primitive.
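To make the "first invalid event, with the running index" behaviour concrete, here is a caller-side sketch of the same kind of check applied to a source before it is handed over. It is an illustration only, not the framework's validation code; EventLike, checked, fromArray, and countValid are hypothetical, and the integer test on version is an extra assumption beyond "negative version".

```typescript
// Hypothetical minimal event shape for the sketch.
type EventLike = { id: number; version: number; created: Date };

// Passes events through unchanged and throws on the first invalid one,
// naming its zero-based position in the stream.
async function* checked<E extends EventLike>(
  source: AsyncIterable<E>
): AsyncGenerator<E> {
  let index = 0;
  for await (const event of source) {
    if (!Number.isInteger(event.version) || event.version < 0)
      throw new Error(`event #${index}: invalid version ${event.version}`);
    if (!(event.created instanceof Date) || Number.isNaN(event.created.getTime()))
      throw new Error(`event #${index}: malformed created`);
    yield event;
    index++;
  }
}

// Small helper to build an async source from an array (for illustration).
async function* fromArray<E>(items: E[]): AsyncGenerator<E> {
  for (const item of items) yield item;
}

// Drains a source and reports how many events passed before any failure.
async function countValid(source: AsyncIterable<EventLike>) {
  let kept = 0;
  try {
    for await (const _ of checked(source)) kept++;
    return { kept, error: undefined as string | undefined };
  } catch (e) {
    return { kept, error: (e as Error).message };
  }
}
```

Running a backup through countValid as a dry run gives an early signal about a corrupt export without touching the store.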
settle()
settle(options?): void
Defined in: libs/act/src/act.ts:1415
Debounced, non-blocking correlate→drain cycle.
Call this after app.do() (or app.reset()) to schedule a background
drain. Multiple rapid calls within the debounce window are coalesced
into a single cycle. Runs correlate→drain in a loop until a pass makes
no progress (no new subscriptions, no acks, no blocks), then emits
the "settled" lifecycle event. This means a single settle() call
fully catches up paginated streams (e.g. after reset() on a long
projection) without forcing callers to loop.
Parameters
options?
SettleOptions = {}
Settle configuration; see SettleOptions for fields:
debounceMs (default 10), correlate (default { after: -1, limit: 100 }),
maxPasses (default Infinity; kill-switch for runaway loops),
streamLimit (default 10), eventLimit (default 10),
leaseMillis (default 10000).
Returns
void
Example
await app.do("CreateItem", target, input);
app.settle(); // non-blocking, returns immediately
app.on("settled", (drain) => {
  // notify SSE clients, invalidate caches, etc.
});
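The "loop until a pass makes no progress" rule can be sketched independently of the debounce. The block below is a simplified model under stated assumptions: settleLoop and the two result types are hypothetical, counters are treated as plain numbers, and debouncing and event emission are left out.

```typescript
// Hypothetical result shapes: only the counters the loop inspects.
type CorrelateResult = { subscribed: number };
type DrainResult = { acked: number; blocked: number };

// Repeats correlate -> drain until a pass makes no progress or the
// pass budget runs out. Returns the number of passes executed.
async function settleLoop(
  correlate: () => Promise<CorrelateResult>,
  drain: () => Promise<DrainResult>,
  maxPasses = Infinity
): Promise<number> {
  let passes = 0;
  while (passes < maxPasses) {
    const { subscribed } = await correlate();
    const { acked, blocked } = await drain();
    passes++;
    // No new subscriptions, no acks, no blocks: nothing left to do.
    if (subscribed === 0 && acked === 0 && blocked === 0) break;
  }
  return passes;
}
```

The final pass is always an idle one, which is the price of detecting quiescence; a finite maxPasses bounds the work if handlers keep producing progress indefinitely.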
shutdown()
shutdown(): Promise<void>
Defined in: libs/act/src/act.ts:469
Per-instance teardown: remove lifecycle listeners, stop the correlation worker, cancel any pending settle cycle, and tear down the cross-process notify subscription.
Idempotent: repeated calls return the same promise. Registered
automatically with the global dispose() registry at construction,
so process-wide dispose()() covers it; test helpers (or operators
that mint short-lived Acts) call it explicitly for prompt cleanup.
Returns
Promise<void>
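Returning the same promise on repeated calls is a common way to make teardown idempotent. The following sketch shows the general pattern with a hypothetical Resource class; it is not the Act implementation.

```typescript
// Caches the teardown promise so every caller awaits the same work.
class Resource {
  private closing?: Promise<void>;
  teardownRuns = 0;

  shutdown(): Promise<void> {
    // First call starts teardown; later calls reuse the stored promise.
    if (!this.closing) this.closing = this.teardown();
    return this.closing;
  }

  private async teardown(): Promise<void> {
    this.teardownRuns++;
    // ...remove listeners, stop timers, release subscriptions...
  }
}
```

Because the promise is stored before teardown finishes, concurrent callers all wait for the one in-flight teardown instead of starting their own.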
start_correlations()
start_correlations(query?, frequency?, callback?): boolean
Defined in: libs/act/src/act.ts:1017
Starts an automatic, periodic correlation worker for discovering new streams.
The correlation worker runs in the background, scanning for new events and identifying new target streams based on reaction resolvers. It maintains a sliding window that advances with each scan, ensuring all events are eventually correlated.
This is useful for dynamic stream creation patterns where you don't know all streams upfront; they're discovered as events arrive.
Note: Only one correlation worker can run at a time per Act instance.
Parameters
query?
Query = {}
Query filter for correlation scans; see Query
(typically { after: -1, limit: 100 })
frequency?
number = 10_000
Correlation frequency in milliseconds (default: 10000)
callback?
(subscribed) => void
Optional callback invoked with newly discovered streams
Returns
boolean
true if worker started, false if already running
Examples
// Start correlation worker scanning every 5 seconds
app.start_correlations(
  { after: 0, limit: 100 },
  5000,
  (leased) => {
    console.log(`Discovered ${leased.length} new streams`);
  }
);
// Later, stop it
app.stop_correlations();
// Load last checkpoint
const lastId = await loadCheckpoint();
app.start_correlations(
  { after: lastId, limit: 100 },
  10000,
  async (leased) => {
    // Save checkpoint for next restart
    if (leased.length) {
      const maxId = Math.max(...leased.map(l => l.at));
      await saveCheckpoint(maxId);
    }
  }
);
See
- correlate for manual one-time correlation
- stop_correlations to stop the worker
stop_correlations()
stop_correlations(): void
Defined in: libs/act/src/act.ts:1042
Stops the automatic correlation worker.
Call this to stop the background correlation worker started by start_correlations. This is automatically called when the Act instance is disposed.
Returns
void
Example
// Start correlation
app.start_correlations();
// Later, stop it
app.stop_correlations();
stop_settling()
stop_settling(): void
Defined in: libs/act/src/act.ts:1051
Cancels any pending or active settle cycle.
Returns
void
unblock()
unblock(input): Promise<number>
Defined in: libs/act/src/act.ts:1123
Clear the blocked flag on streams without replaying their history.
Use this to recover from a poison message after fixing the underlying issue: the stream resumes from the next event after the last successful ack, not from the beginning. Compare with reset, which rebuilds from event 0 (suitable for projection rebuilds, wrong for "I fixed the bug, please retry").
Wraps store().unblock(streams) and raises the orchestrator's
internal "needs drain" flag so a settled app picks up the now-free
streams on the next cycle. Equivalent to calling store().unblock(...)
directly, but store().unblock(...) alone leaves the flag
untouched.
Parameters
input
string[] | StreamFilter
Returns
Promise<number>
Count of streams that were actually flipped (were blocked)
Example
await app.unblock(["webhooks-out-customer-42"]);
// The stream resumes from the next event, not from zero.
See
- Store.unblock for the underlying store primitive
- reset for the rebuild-from-zero alternative