Skip to main content

Concurrency model

Two distinct concurrency primitives, used at different layers for different problems. Conflating them is the most common source of confusion when reading the framework's source.

The two primitives​

Optimistic concurrencyStream leasing
WhereStore.commit (writes)Store.claim (reads-for-reactions)
What it protectsStream version integrityReaction processing exclusivity
MechanismexpectedVersion parameterFOR UPDATE SKIP LOCKED row lock
Caller's job on conflictReload + retry the actionNothing β€” the loser is silently skipped
Detected byConcurrencyError thrownEmpty claim() return for that stream

Same store, same DB, but they don't interact. A stream can have a held reaction lease and successful commits at the same time β€” those are different rows in different operations.

Optimistic concurrency β€” the writer's safety net​

Action commits are append-only and version-checked. Each event in a stream has a version (0-indexed, monotonic per stream). A commit asserts "the current head version is X; append after that."

caller framework store
β”‚ app.do(...) β”‚ β”‚
β”‚ ──────────────────────► β”‚ β”‚
β”‚ β”‚ load() β†’ snapshot.event β”‚
β”‚ β”‚ expectedVersion = ev? β”‚
β”‚ β”‚ reduce β†’ emit events β”‚
β”‚ β”‚ store.commit( β”‚
β”‚ β”‚ stream, msgs, meta, β”‚
β”‚ β”‚ expectedVersion ──────────► tx BEGIN
β”‚ β”‚ ) β”‚ SELECT max(version)
β”‚ β”‚ β”‚ if version != expectedVersion:
β”‚ β”‚ β”‚ throw ConcurrencyError
β”‚ β”‚ β”‚ INSERT events
β”‚ β”‚ β”‚ tx COMMIT
β”‚ β”‚ ◄──────────────────────────── return Committed[]
β”‚ ◄───────────────────── β”‚

If two callers race on the same stream, only one wins. The loser sees ConcurrencyError with expectedVersion and lastVersion (the actual head). Standard pattern is to reload state and retry.

Two failure modes the framework handles​

Predictable: caller's expectedVersion doesn't match. Framework throws ConcurrencyError from the version check.

Subtle: both transactions read the same max version, both pass the expectedVersion check, both try to INSERT at the same (stream, version) pair. The unique index catches the second INSERT β€” without explicit handling, this surfaces as an adapter-specific error (PG SQLSTATE 23505), not ConcurrencyError. Callers retrying on the framework signal would silently lose the commit.

Resolution (in PostgresStore.commit): catch the SQLSTATE 23505 from INSERT and re-throw as ConcurrencyError. After the catch, both failure modes look the same to the caller, and the retry path is consistent. Documented in commit.error.spec.ts and exercised by the same-stream scenario in the Postgres stress harness.

Reactions skip optimistic concurrency by design​

Inside action(), when reactingTo is provided (i.e., the action was triggered by a reaction handler), expectedVersion is not enforced:

// internal/event-sourcing.ts, action()
reactingTo ? undefined : expected

The reasoning: reactions are inherently asynchronous catch-up. By the time a reaction processes event N, the stream has likely advanced past N. Forcing an expectedVersion check would convert ordinary catch-up into spurious retries. Stream leasing already serializes concurrent reactions on the same stream, so the version race doesn't matter.

Stream leasing β€” the reader's exclusivity primitive​

The drain pipeline polls for streams that have new events past their last-processed watermark, claims them via FOR UPDATE SKIP LOCKED, processes their events, then acks (releases the lease and advances the watermark) or blocks (marks the stream failed after exceeding retry budget).

worker A store worker B
β”‚ claim(by="A") β”‚ β”‚
β”‚ ─────────────────────────► β”‚ β”‚
β”‚ β”‚ tx BEGIN β”‚
β”‚ β”‚ SELECT * FROM streams β”‚
β”‚ β”‚ WHERE leased_until<NOW β”‚
β”‚ β”‚ FOR UPDATE β”‚
β”‚ β”‚ SKIP LOCKED β”‚
β”‚ β”‚ UPDATE leased_by='A' β”‚
β”‚ β”‚ tx COMMIT β”‚
β”‚ ◄──────────────────────── β”‚ β”‚
β”‚ [streams 1, 3, 5] β”‚ β”‚
β”‚ β”‚ ◄────────────────────── claim(by="B")
β”‚ β”‚ tx BEGIN β”‚
β”‚ β”‚ SELECT ... SKIP LOCKED β”‚
β”‚ β”‚ β†’ returns 2, 4 (1,3,5 β”‚
β”‚ β”‚ locked by A; skipped) β”‚
β”‚ β”‚ ────────────────────────►
β”‚ β”‚ β”‚ [streams 2, 4]
β”‚ process events for 1,3,5 β”‚ β”‚ process events for 2,4
β”‚ ack(by="A") β”‚ β”‚ ack(by="B")
β”‚ ─────────────────────────► β”‚ ◄────────────────────── β”‚

SKIP LOCKED is the key: workers never block each other waiting for a lock. If a stream is held by another worker, the polling worker just gets the next available stream. Zero contention, no thundering herd. The trade-off is no fairness guarantees β€” a worker can repeatedly pick up the "easier" streams and leave the leased ones to time out β€” but in practice this is desirable (active workers stay active).

Lease lifecycle​

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚ leased_by=NULL β”‚
β”‚ at=last_acked_pos β”‚ ← steady state
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
β”‚ claim()
β–Ό
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚ leased_by='worker-X' β”‚
β”‚ leased_until=NOW+leaseβ”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
β”‚
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€ ack() β”Ό block() ─────┐
β”‚ β”‚ β”‚
β–Ό β–Ό β–Ό
leased_by=NULL leased_by=NULL blocked=true
at=new position at=last position retry_count++
retry_count=0 retry_count++
β”‚
β”‚ (if retry_count > maxRetries
β”‚ AND blockOnError)
β–Ό
set blocked=true
(no further claims)

Three "exits" from a leased state:

  • ack β€” handler succeeded; advance the watermark to the last processed event ID, clear the lease, reset retry count.
  • block β€” handler failed past the retry budget (or threw NonRetryableError); set blocked=true. The stream stays out of claim() results until something explicitly unblocks it. Use app.unblock(input) to resume from where the stream stopped (the common case β€” operator fixed the underlying issue), or app.reset(input) to rebuild from event 0 (projection rebuild, rare). Both accept either a string[] of stream names or a StreamFilter ({ stream?, source?, blocked? }) for bulk operations β€” e.g., app.unblock({ stream: "^webhooks-out-" }) to clear a whole family at once, or app.unblock({}) for a post-incident "unblock everything blocked" sweep.
  • Timeout β€” worker died or hung; leased_until passes; the next claim() from any worker can acquire the stream. Retry count is not incremented β€” the timed-out worker may have processed the events successfully but failed to ack.

Why a stream stays in claim() after a partial handler failure​

If a reaction handler throws, the framework blocks the lease only if retry_count > maxRetries && blockOnError. Otherwise it just releases without advancing the watermark. The next claim() cycle picks the stream up again β€” same events, fresh handler invocation, retry count incremented. Bounded retries with backoff are configured per-reaction via ReactionOptions.

How the two interact​

A common confusion: "If I commit while another worker holds a lease, does my commit fail?"

No. Stream leasing locks the row in the streams table (which tracks reaction watermarks). Commits write to the events table and check the (stream, version) index. Different rows, different locks. A commit and a reaction lease can be active on the same stream concurrently.

Real interaction surfaces in the close-the-books flow (Close cycle), where the close operation must coordinate both: tombstone the stream (write a guard event via commit), then verify no leases are held (lease lifecycle).

Observability​

Both primitives surface in the trace breadcrumb stream:

  • Optimistic concurrency: ConcurrencyError thrown to caller; the framework logs nothing extra (caller decides what to log)
  • Lease lifecycle: >> claimed, >> acked, >> blocked traces from internal/drain-cycle.ts decorators

For a stuck stream, query store.query_streams directly β€” it returns the per-stream at, retry, blocked, and leased_by/leased_until without taking a lease. The act-inspector tool is built on this primitive.

Why no framework-level request deduplication​

Optimistic concurrency catches stream-version conflicts. It does not catch the case where a client retries a network-failed POST and the same intent commits twice. That's request-level idempotency, and the framework deliberately leaves it to the API edge (see Idempotency at the API edge).

A "use the action's correlation id as a dedup key" hook was evaluated and rejected. Five reasons:

  1. TOCTOU races. Two concurrent retries with the same key both pass the existence check before either commits. Either you add a distributed lock around the check (re-introducing the contention you were trying to avoid), or two events land. The API-edge cache sidesteps this by returning the previous response on duplicate keys without re-running the action.
  2. Semantic overloading. correlation is a trace id that propagates through reactions. Reusing it as an idempotency key conflates two unrelated concerns β€” and means a downstream reaction that emits its own action with the same correlation id (the default) would be silently deduped against the original.
  3. Cross-action collisions. A correlation id can drive multiple actions in a single workflow (OpenTicket β†’ AssignTicket). If "saw this key before" gates the second action, the workflow stalls silently.
  4. State drift. The natural dedup behaviour is "return current state on duplicate." But current state may have advanced past the original commit's view β€” clients consuming the response would see different state for the "same" request depending on retry timing.
  5. No TTL in an immutable log. Correlation ids written to events live forever. A dedup table inside the event log can't expire entries without rewriting history. An external cache with a TTL is the natural fit, and that's what the API-edge pattern uses.

Resolution: keep the event log purely about what happened, and put "have I seen this request before?" in middleware where it can be cached, TTL'd, and shared across instances via Redis without touching the durable record. The production checklist shows the recommended tRPC middleware shape.

Pointers​

  • libs/act/src/internal/event-sourcing.ts β€” action() and the expectedVersion check
  • libs/act-pg/src/PostgresStore.ts β€” commit() (with the PG_UNIQUE_VIOLATION translation), claim() (with FOR UPDATE SKIP LOCKED)
  • libs/act/src/internal/drain-cycle.ts β€” runDrainCycle orchestration and DrainController lifecycle
  • libs/act-pg/test/stress/ β€” multi-process exercise of both primitives under contention