Correlation and drain
How reactions actually fire. Two cooperating subsystems with a shared goal: deliver every reactive event to its handler, exactly once, eventually. Different concerns:
- Correlation โ discovery. Given an event, which streams should react to it?
- Drain โ delivery. Given streams that need processing, fetch and run their reactions.
Both run lazily: nothing happens until a caller invokes correlate(), drain(), settle(), or one of the polling timers. The framework never spins up background workers without being told to.
The shape of a reactionโ
A reaction is registered against an event name with a resolver and a handler:
.on("OrderPlaced")
.do(async (event, stream, app) => { /* handler */ })
.to((event) => ({ target: `order-${event.orderId}` })) // dynamic resolver
// or .to("inventory") // static resolver
The resolver answers "for this event, which target stream processes the reaction?" Two kinds:
- Static: a constant target (string). Known at build time. Subscribed once during
correlate.init(). Doesn't need event-by-event scanning. - Dynamic: a function
(event) => ({ target, source? }). Target depends on event content. Discovered lazily bycorrelate().
Build-time classification (internal/build-classify.ts) walks the registry, partitions resolvers by kind, and stashes:
staticTargets[]โ subscribed once at inithasDynamicResolvers: booleanโ short-circuit flag forcorrelate()reactiveEvents: Set<string>โ events with at least one reaction; drives the drain skip-flag indo()andreset()
If hasDynamicResolvers is false, correlate() becomes effectively a no-op past init โ no event scan needed.
Correlation โ discovering dynamic targetsโ
correlate(query) scans events past the correlation checkpoint, evaluates each registered dynamic resolver, and registers any new (target, source) pairs as subscribed streams via store.subscribe.
correlate({ after, limit })
โ
has dynamic resolvers?
no โโโฌโโ yes
โ โ
โผ โผ
return as-is query events past checkpoint
โ
โผ
for each event:
for each registered dynamic resolver:
resolved = resolver(event)
if resolved.target not yet subscribed:
add to "to subscribe" map
โ
โผ
subscribe(map.entries())
โ
โผ
advance checkpoint to last scanned event id
โ
โผ
add new targets to subscribed-streams LRU
The checkpoint advances only after subscribe succeeds. If subscribe throws, the checkpoint stays where it was and the next correlate retries from the same point.
The subscribed-streams LRUโ
CorrelateCycle holds an LruSet<string> cap (default 1000, configurable via ActOptions.maxSubscribedStreams). Apps that mint millions of dynamic targets โ e.g., one stream per user activity โ would otherwise grow this set unbounded.
Eviction cost: a redundant store.subscribe call when an evicted-but-still-active stream's event is correlated again. subscribe is idempotent at the store level (INSERT โฆ ON CONFLICT DO NOTHING), so this is harmless. The LRU is a memory bound, not a correctness mechanism.
Drain โ claim, fetch, dispatchโ
drain() runs one cycle of the pipeline:
drain({ streamLimit, eventLimit, leaseMillis })
โ
armed? (do() / reset() flagged work)
no โโโฌโโ yes
โ
return empty result
โ
โผ
concurrent drain in flight?
yes โโฌโโ no
โ
return empty result
โ
โผ
compute lagging/leading split via ratio
โ
โผ
ops.claim(lagging, leading, by, leaseMillis)
โ
โโโโโโดโโโโโโโโ
โผ โผ
empty? leases
โ โ
disarm; return โ
โผ
ops.fetch(leases, eventLimit)
โ
โผ
for each leased stream:
build payloads (filter events to ones
whose registered reaction targets us)
โ
โผ
dispatch via handle / handleBatch
โ
โผ
ops.ack(successes), ops.block(retries-exhausted)
โ
โผ
update lag/lead ratio per pressure
โ
โผ
emit "acked" / "blocked" lifecycle events
โ
โผ
disarm if no acks / blocks / errors this cycle
The dual-frontier splitโ
claim() takes two budgets: lagging (streams with low watermarks) and leading (streams with high watermarks). The split is adaptive โ DrainController._ratio starts at 0.5 and adjusts each cycle based on which frontier produced more events:
// internal/drain-ratio.ts (paraphrased)
ratio = (laggingHandled - leadingHandled) / total
clamped to [0.2, 0.8]
If lagging streams produced more work this cycle, the next cycle leans toward lagging (fast-forward streams that have fallen behind). If leading streams produced more, lean toward leading (keep up with active streams). The clamp prevents starvation in either direction.
The _armed skip flagโ
A naive drain would query the store on every call. For apps where most actions don't have reactions, that's wasted I/O. The framework keeps an _armed boolean on DrainController:
do()sets_armed = trueif any committed event is inreactiveEventsreset()sets_armed = trueif there are any reactive eventscorrelate.init()sets_armed = trueon cold start (might have historical reactive events to process)drain()clears_armedin two cases:claim()returned no leases (fully caught up), or the cycle finished with no acks, no blocks, no errors
When _armed is false, drain() returns immediately without issuing claim. Three round trips saved per call (claim, query, ack). Cold start: armed by correlate.init() so historical events are picked up on first drain.
One controller per laneโ
ACT-1103: the orchestrator builds one DrainController per active lane (implicit default + every .withLane(...)). Act._drainAll runs every controller's drain() in parallel via Promise.all and aggregates fetched/leased/acked/blocked. Each controller filters its claim() by its lane โ durable adapters serve the filter from streams_lane_ix so the four parallel claims add up to the same total work the single all-lanes claim was doing.
The _armed flag is per-controller. do(), reset(), unblock(), and the cold-start path arm every controller via Act._armAll. Per-lane LaneConfig.cycleMs auto-starts a setTimeout chain on the controller that drains at the lane's cadence independent of the Act-level settle loop โ useful for "always-on" lanes that need low commit-to-ack latency without callers explicitly driving settle(). Apps that never call .withLane(...) see one controller with lane: undefined, and the adapter SQL collapses to the pre-1103 shape. See Concepts โ Lanes.
Settle โ the catch-up loopโ
settle() is the production-friendly entry point. It debounces multiple rapid calls into one cycle, then runs correlate โ drain in a loop until a pass produces no progress:
settle(options) โโ debounce timer โโ timer fires
โ
โผ
reentrancy guard
โ
โผ
await correlate.init()
โ
โผ
loop until no progress:
correlate({ after: checkpoint })
drain(options)
progress = subscribed > 0 ||
acked.length > 0 ||
blocked.length > 0
โ
โผ
emit "settled" with last drain
"Until no progress" handles paginated catch-up. After app.reset(...), a settled stream might have thousands of events. One drain cycle's streamLimit ร eventLimit won't catch up; subsequent cycles will. settle() doesn't return until the work is done โ the caller gets the "settled" event when there's nothing left.
The debounce is ActOptions.settleDebounceMs ?? 10 by default. Coalesces commits in the same tick (typical pattern: tRPC mutation chain calling app.do many times) into one settle pass.
Why drain is one-cycle, settle is the loopโ
drain() is one round-trip: claim, fetch, dispatch, ack/block, return. Predictable. Useful for tests and synchronous catch-up scripts where the caller wants control over each cycle.
settle() wraps drain in a debounced async loop with progress detection. Useful for production: fire and forget; the framework figures out when "done" means done. Listeners on "settled" get notified once per coalesced burst.
Mixing them is fine โ settle() doesn't acquire any global lock, just a per-controller reentrancy guard. Multiple settle calls on different Act instances proceed independently.
Pointersโ
libs/act/src/internal/correlate-cycle.tsโCorrelateCycleclass, init, scan, pollinglibs/act/src/internal/drain-cycle.tsโrunDrainCycle(pure cycle),DrainController(stateful driver)libs/act/src/internal/drain-ratio.tsโ adaptive lag/lead ratiolibs/act/src/internal/settle.tsโSettleLoopdebounce + progress looplibs/act/src/internal/build-classify.tsโ registry classification at constructionlibs/act/src/internal/reactions.tsโbuildHandle/buildHandleBatchโ what runs inside a drain cycle for each leased stream