Skip to main content

Correlation and drain

How reactions actually fire. Two cooperating subsystems with a shared goal: deliver every reactive event to its handler, exactly once, eventually. Different concerns:

  • Correlation โ€” discovery. Given an event, which streams should react to it?
  • Drain โ€” delivery. Given streams that need processing, fetch and run their reactions.

Both run lazily: nothing happens until a caller invokes correlate(), drain(), settle(), or one of the polling timers. The framework never spins up background workers without being told to.

The shape of a reactionโ€‹

A reaction is registered against an event name with a resolver and a handler:

.on("OrderPlaced")
.do(async (event, stream, app) => { /* handler */ })
.to((event) => ({ target: `order-${event.orderId}` })) // dynamic resolver
// or .to("inventory") // static resolver

The resolver answers "for this event, which target stream processes the reaction?" Two kinds:

  • Static: a constant target (string). Known at build time. Subscribed once during correlate.init(). Doesn't need event-by-event scanning.
  • Dynamic: a function (event) => ({ target, source? }). Target depends on event content. Discovered lazily by correlate().

Build-time classification (internal/build-classify.ts) walks the registry, partitions resolvers by kind, and stashes:

  • staticTargets[] โ€” subscribed once at init
  • hasDynamicResolvers: boolean โ€” short-circuit flag for correlate()
  • reactiveEvents: Set<string> โ€” events with at least one reaction; drives the drain skip-flag in do() and reset()

If hasDynamicResolvers is false, correlate() becomes effectively a no-op past init โ€” no event scan needed.

Correlation โ€” discovering dynamic targetsโ€‹

correlate(query) scans events past the correlation checkpoint, evaluates each registered dynamic resolver, and registers any new (target, source) pairs as subscribed streams via store.subscribe.

correlate({ after, limit })
โ”‚
has dynamic resolvers?
no โ”€โ”€โ”ฌโ”€โ”€ yes
โ”‚ โ”‚
โ–ผ โ–ผ
return as-is query events past checkpoint
โ”‚
โ–ผ
for each event:
for each registered dynamic resolver:
resolved = resolver(event)
if resolved.target not yet subscribed:
add to "to subscribe" map
โ”‚
โ–ผ
subscribe(map.entries())
โ”‚
โ–ผ
advance checkpoint to last scanned event id
โ”‚
โ–ผ
add new targets to subscribed-streams LRU

The checkpoint advances only after subscribe succeeds. If subscribe throws, the checkpoint stays where it was and the next correlate retries from the same point.

The subscribed-streams LRUโ€‹

CorrelateCycle holds an LruSet<string> cap (default 1000, configurable via ActOptions.maxSubscribedStreams). Apps that mint millions of dynamic targets โ€” e.g., one stream per user activity โ€” would otherwise grow this set unbounded.

Eviction cost: a redundant store.subscribe call when an evicted-but-still-active stream's event is correlated again. subscribe is idempotent at the store level (INSERT โ€ฆ ON CONFLICT DO NOTHING), so this is harmless. The LRU is a memory bound, not a correctness mechanism.

Drain โ€” claim, fetch, dispatchโ€‹

drain() runs one cycle of the pipeline:

drain({ streamLimit, eventLimit, leaseMillis })
โ”‚
armed? (do() / reset() flagged work)
no โ”€โ”€โ”ฌโ”€โ”€ yes
โ”‚
return empty result
โ”‚
โ–ผ
concurrent drain in flight?
yes โ”€โ”ฌโ”€โ”€ no
โ”‚
return empty result
โ”‚
โ–ผ
compute lagging/leading split via ratio
โ”‚
โ–ผ
ops.claim(lagging, leading, by, leaseMillis)
โ”‚
โ”Œโ”€โ”€โ”€โ”€โ”ดโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ–ผ โ–ผ
empty? leases
โ”‚ โ”‚
disarm; return โ”‚
โ–ผ
ops.fetch(leases, eventLimit)
โ”‚
โ–ผ
for each leased stream:
build payloads (filter events to ones
whose registered reaction targets us)
โ”‚
โ–ผ
dispatch via handle / handleBatch
โ”‚
โ–ผ
ops.ack(successes), ops.block(retries-exhausted)
โ”‚
โ–ผ
update lag/lead ratio per pressure
โ”‚
โ–ผ
emit "acked" / "blocked" lifecycle events
โ”‚
โ–ผ
disarm if no acks / blocks / errors this cycle

The dual-frontier splitโ€‹

claim() takes two budgets: lagging (streams with low watermarks) and leading (streams with high watermarks). The split is adaptive โ€” DrainController._ratio starts at 0.5 and adjusts each cycle based on which frontier produced more events:

// internal/drain-ratio.ts (paraphrased)
ratio = (laggingHandled - leadingHandled) / total
clamped to [0.2, 0.8]

If lagging streams produced more work this cycle, the next cycle leans toward lagging (fast-forward streams that have fallen behind). If leading streams produced more, lean toward leading (keep up with active streams). The clamp prevents starvation in either direction.

The _armed skip flagโ€‹

A naive drain would query the store on every call. For apps where most actions don't have reactions, that's wasted I/O. The framework keeps an _armed boolean on DrainController:

  • do() sets _armed = true if any committed event is in reactiveEvents
  • reset() sets _armed = true if there are any reactive events
  • correlate.init() sets _armed = true on cold start (might have historical reactive events to process)
  • drain() clears _armed in two cases: claim() returned no leases (fully caught up), or the cycle finished with no acks, no blocks, no errors

When _armed is false, drain() returns immediately without issuing claim. Three round trips saved per call (claim, query, ack). Cold start: armed by correlate.init() so historical events are picked up on first drain.

One controller per laneโ€‹

ACT-1103: the orchestrator builds one DrainController per active lane (implicit default + every .withLane(...)). Act._drainAll runs every controller's drain() in parallel via Promise.all and aggregates fetched/leased/acked/blocked. Each controller filters its claim() by its lane โ€” durable adapters serve the filter from streams_lane_ix so the four parallel claims add up to the same total work the single all-lanes claim was doing.

The _armed flag is per-controller. do(), reset(), unblock(), and the cold-start path arm every controller via Act._armAll. Per-lane LaneConfig.cycleMs auto-starts a setTimeout chain on the controller that drains at the lane's cadence independent of the Act-level settle loop โ€” useful for "always-on" lanes that need low commit-to-ack latency without callers explicitly driving settle(). Apps that never call .withLane(...) see one controller with lane: undefined, and the adapter SQL collapses to the pre-1103 shape. See Concepts โ†’ Lanes.

Settle โ€” the catch-up loopโ€‹

settle() is the production-friendly entry point. It debounces multiple rapid calls into one cycle, then runs correlate โ†’ drain in a loop until a pass produces no progress:

settle(options) โ”€โ”€ debounce timer โ”€โ”€ timer fires
โ”‚
โ–ผ
reentrancy guard
โ”‚
โ–ผ
await correlate.init()
โ”‚
โ–ผ
loop until no progress:
correlate({ after: checkpoint })
drain(options)
progress = subscribed > 0 ||
acked.length > 0 ||
blocked.length > 0
โ”‚
โ–ผ
emit "settled" with last drain

"Until no progress" handles paginated catch-up. After app.reset(...), a settled stream might have thousands of events. One drain cycle's streamLimit ร— eventLimit won't catch up; subsequent cycles will. settle() doesn't return until the work is done โ€” the caller gets the "settled" event when there's nothing left.

The debounce is ActOptions.settleDebounceMs ?? 10 by default. Coalesces commits in the same tick (typical pattern: tRPC mutation chain calling app.do many times) into one settle pass.

Why drain is one-cycle, settle is the loopโ€‹

drain() is one round-trip: claim, fetch, dispatch, ack/block, return. Predictable. Useful for tests and synchronous catch-up scripts where the caller wants control over each cycle.

settle() wraps drain in a debounced async loop with progress detection. Useful for production: fire and forget; the framework figures out when "done" means done. Listeners on "settled" get notified once per coalesced burst.

Mixing them is fine โ€” settle() doesn't acquire any global lock, just a per-controller reentrancy guard. Multiple settle calls on different Act instances proceed independently.

Pointersโ€‹

  • libs/act/src/internal/correlate-cycle.ts โ€” CorrelateCycle class, init, scan, polling
  • libs/act/src/internal/drain-cycle.ts โ€” runDrainCycle (pure cycle), DrainController (stateful driver)
  • libs/act/src/internal/drain-ratio.ts โ€” adaptive lag/lead ratio
  • libs/act/src/internal/settle.ts โ€” SettleLoop debounce + progress loop
  • libs/act/src/internal/build-classify.ts โ€” registry classification at construction
  • libs/act/src/internal/reactions.ts โ€” buildHandle / buildHandleBatch โ€” what runs inside a drain cycle for each leased stream