Skip to main content

Reaction priority lanes

How an operator biases the claim() lagging-frontier ordering when the worker is saturated. The short version: .priority(n) on the resolver target adds an ORDER BY priority DESC, at ASC clause to the lagging CTE so a high-priority replay wins lease slots before equal-watermark peers. Default 0. Only meaningful under contention.

:::note Priority is intra-lane

ACT-1103 introduced drain lanes โ€” separate DrainController instances with their own leaseMillis/streamLimit/cycleMs budgets. Priority (this page) and lane (ACT-1103) operate on different axes: a lane carves the drain pipeline along latency classes ("webhooks need 30 s leases, metrics need 1 s leases"), and priority biases which streams within a single lane win lease slots under saturation. A reaction sets both independently via .to({ target, lane, priority }). See Concepts โ†’ Lanes.

:::

The problemโ€‹

drain() uses a dual-frontier claim() strategy: a lagging budget (most-behind streams catch up) plus a leading budget (active streams stay current). Each cycle picks at most streamLimit total. Within the lagging budget, the SQL is ORDER BY at ASC โ€” most-behind first. Tie-breaking when many streams share a watermark โ€” the typical replay-after-reset shape โ€” falls to PostgreSQL's physical row order, which is undefined from the framework's perspective.

When streamLimit is binding (more candidate streams than the worker can claim per cycle), low-importance replays can claim leases ahead of customer-facing ones because the tie-break is essentially random. Until ACT-102 there was no way to express "this replay matters more."

The shapeโ€‹

A reaction's resolver gets an optional priority field:

.on("OrderConfirmed")
.do(sendCriticalNotification)
.to({ target: "notifications-out", priority: 10 })

Or for dynamic resolvers:

.on("UserActivity")
.do(updateLeaderboard)
.to((e) => ({
target: `leaderboard-${e.data.region}`,
source: e.stream,
priority: e.data.tier === "premium" ? 5 : 0,
}))

claim()'s lagging CTE becomes ORDER BY priority DESC, at ASC. With everyone at priority = 0 the ordering collapses to plain at ASC so existing workloads see no behavior change.

What stays inviolateโ€‹

Per-stream event ordering. Priority only biases which streams claim() picks first, never the order events within a stream are processed. Within a stream, events still drain by id ASC. That's a foundational ES guarantee โ€” ACT-102 explicitly does not break it.

If you need ordering changes inside a stream, the right tool is target filters at subscription time (different reactions on different target streams), not priority.

Build-time semanticโ€‹

When multiple reactions target the same stream with different priorities โ€” e.g., one slice registers target: "shared", priority: 3 and another registers target: "shared", priority: 7 โ€” the maximum wins:

.on("Inc").do(r1).to({ target: "shared", priority: 3 }) // ignored
.on("Inc").do(r2).to({ target: "shared", priority: 7 }) // sets the lane

The same max() invariant holds at runtime: subscribe() upserts priority via GREATEST(stored, new), so the highest-priority registered reaction sets the scheduling lane.

Runtime operator override โ€” app.prioritizeโ€‹

subscribe() can only raise priority (via the max invariant). For runtime adjustments โ€” including decreases โ€” use app.prioritize(filter, n):

// Boost a specific replay
await app.prioritize({ stream: "^proj-orders$", stream_exact: false }, 10);

// Drop background audit jobs to the back
await app.prioritize({ source: "^audit-" }, -5);

// Reset all to default
await app.prioritize({}, 0);

Filter shape mirrors query_streams: regex on stream/source by default, exact match with the _exact flags, blocked filter, empty {} matches everything. Returns the count of streams whose priority changed.

When it doesn't matterโ€‹

Priority only binds under saturation โ€” when streamLimit < number of candidate lagging streams. If the worker can claim every candidate every cycle, priority is irrelevant. Healthy systems with no backlog see no effect.

Concretely: with the default streamLimit = 10, priority starts mattering once you have ~15+ behind streams competing for the lagging slots simultaneously. Cold starts, projection rebuilds, and post-incident catch-up are the typical scenarios.

Performanceโ€‹

Benchmark in @rotorsoft/act-pg's PERFORMANCE.md. 50 cold-replay targets, 500 events each, streamLimit = 5 โ€” three back-to-back runs:

  • Priority target time-to-finish: ~11ร— faster (80 ms vs. 860 ms).
  • Total drain time (all 50 targets): ~6 % faster (priority arm reduces row-level contention on the streams table).
  • Final state: identical between arms โ€” priority reorders, doesn't reduce throughput.

Adapter supportโ€‹

Adapterclaim orderingprioritizeschema migration
PostgresStoreORDER BY priority DESC, at ASC in lag CTEUPDATE ... WHERE priority <> $1 AND ...ALTER TABLE ADD COLUMN IF NOT EXISTS priority
SqliteStoreserver-side SELECT ... ORDER BY priority DESC, at ASCparameterized UPDATE with LIKE-translated patternsALTER TABLE ADD COLUMN priority (try/swallow on duplicate)
InMemoryStoresort by priority DESC, at ASC in claim()iterate matching streams, set priority directlyn/a

All three keep the max invariant on subscribe() and treat prioritize() as an outright set.

See alsoโ€‹