Concurrency model
Two distinct concurrency primitives, used at different layers for different problems. Conflating them is the most common source of confusion when reading the framework's source.
The two primitives
| | Optimistic concurrency | Stream leasing |
|---|---|---|
| Where | Store.commit (writes) | Store.claim (reads-for-reactions) |
| What it protects | Stream version integrity | Reaction processing exclusivity |
| Mechanism | expectedVersion parameter | FOR UPDATE SKIP LOCKED row lock |
| Caller's job on conflict | Reload + retry the action | Nothing; the loser is silently skipped |
| Detected by | ConcurrencyError thrown | Empty claim() return for that stream |
Same store, same DB, but they don't interact. A stream can have a held reaction lease and successful commits at the same time; those are different rows in different operations.
Optimistic concurrency: the writer's safety net
Action commits are append-only and version-checked. Each event in a stream has a version (0-indexed, monotonic per stream). A commit asserts "the current head version is X; append after that."
caller                   framework                     store
  │  app.do(...)            │                            │
  │ ──────────────────────► │                            │
  │                         │ load() → snapshot.event    │
  │                         │ expectedVersion = ev?      │
  │                         │ reduce → emit events       │
  │                         │ store.commit(              │
  │                         │   stream, msgs, meta,      │
  │                         │   expectedVersion ───────► │ tx BEGIN
  │                         │ )                          │ SELECT max(version)
  │                         │                            │ if version != expectedVersion:
  │                         │                            │   throw ConcurrencyError
  │                         │                            │ INSERT events
  │                         │                            │ tx COMMIT
  │                         │ ◄───────────────────────── │ return Committed[]
  │ ◄────────────────────── │                            │
If two callers race on the same stream, only one wins. The loser sees ConcurrencyError with expectedVersion and lastVersion (the actual head). Standard pattern is to reload state and retry.
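The reload-and-retry pattern can be sketched as a small wrapper. This is a minimal illustration, not the framework's API: the `ConcurrencyError` class below is a stand-in that only mirrors the two fields named above, and `withConcurrencyRetry`, `isConcurrencyError`, and `maxAttempts` are hypothetical names.

```typescript
// Stand-in for the framework's error (assumption: only the two fields named in the text).
class ConcurrencyError extends Error {
  expectedVersion: number;
  lastVersion: number;
  constructor(expectedVersion: number, lastVersion: number) {
    super(`expected version ${expectedVersion}, head is ${lastVersion}`);
    this.name = "ConcurrencyError";
    this.expectedVersion = expectedVersion;
    this.lastVersion = lastVersion;
  }
}

// Checking `name` rather than `instanceof ConcurrencyError` keeps the guard working
// even if the error class is loaded from a different module instance.
function isConcurrencyError(err: unknown): boolean {
  return err instanceof Error && err.name === "ConcurrencyError";
}

// Re-run `attempt` whenever it loses a version race, up to `maxAttempts` tries in total.
// `attempt` must reload state itself so that every try sees the current head.
async function withConcurrencyRetry<T>(
  attempt: () => Promise<T>,
  maxAttempts = 3,
): Promise<T> {
  for (let i = 1; ; i++) {
    try {
      return await attempt();
    } catch (err) {
      const retryable = isConcurrencyError(err) && i < maxAttempts;
      if (!retryable) throw err; // other errors, or budget exhausted: propagate
    }
  }
}
```

A caller would wrap the whole load-decide-commit step in `attempt`, so the retry never reuses a stale snapshot.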
Two failure modes the framework handles
Predictable: caller's expectedVersion doesn't match. Framework throws ConcurrencyError from the version check.
Subtle: both transactions read the same max version, both pass the expectedVersion check, both try to INSERT at the same (stream, version) pair. The unique index catches the second INSERT; without explicit handling, this surfaces as an adapter-specific error (PG SQLSTATE 23505), not ConcurrencyError. Callers that retry only on the framework signal would not retry, and the commit would be silently lost.
Resolution (in PostgresStore.commit): catch the SQLSTATE 23505 from INSERT and re-throw as ConcurrencyError. After the catch, both failure modes look the same to the caller, and the retry path is consistent. Documented in commit.error.spec.ts and exercised by the same-stream scenario in the Postgres stress harness.
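The translation step might look roughly like the sketch below. It is not the code in PostgresStore.commit: `translateUniqueViolation` and `isUniqueViolation` are hypothetical helpers, the `ConcurrencyError` class is a stand-in, and the only external fact relied on is that node-postgres exposes the SQLSTATE on the error's `code` property.

```typescript
const PG_UNIQUE_VIOLATION = "23505"; // SQLSTATE for unique_violation

// Stand-in for the framework's error (assumption: simplified shape).
class ConcurrencyError extends Error {
  expectedVersion: number;
  constructor(expectedVersion: number) {
    super(`stream head moved past expected version ${expectedVersion}`);
    this.name = "ConcurrencyError";
    this.expectedVersion = expectedVersion;
  }
}

// True when a driver error carries the unique-violation SQLSTATE.
function isUniqueViolation(err: unknown): boolean {
  return (
    typeof err === "object" &&
    err !== null &&
    (err as { code?: unknown }).code === PG_UNIQUE_VIOLATION
  );
}

// Run the INSERT; if the unique index rejects it, surface the same error
// the explicit version check would have thrown. Everything else passes through.
async function translateUniqueViolation<T>(
  expectedVersion: number,
  insert: () => Promise<T>,
): Promise<T> {
  try {
    return await insert();
  } catch (err) {
    if (isUniqueViolation(err)) throw new ConcurrencyError(expectedVersion);
    throw err;
  }
}
```

With this in place, a caller's retry logic only needs to recognize one error type for both failure modes.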
Reactions skip optimistic concurrency by design
Inside action(), when reactingTo is provided (i.e., the action was triggered by a reaction handler), expectedVersion is not enforced:
// internal/event-sourcing.ts, action()
reactingTo ? undefined : expected
The reasoning: reactions are inherently asynchronous catch-up. By the time a reaction processes event N, the stream has likely advanced past N. Forcing an expectedVersion check would convert ordinary catch-up into spurious retries. Stream leasing already serializes concurrent reactions on the same stream, so the version race doesn't matter.
Stream leasing: the reader's exclusivity primitive
The drain pipeline polls for streams that have new events past their last-processed watermark, claims them via FOR UPDATE SKIP LOCKED, processes their events, then acks (releases the lease and advances the watermark) or blocks (marks the stream failed after exceeding retry budget).
worker A                      store                      worker B
  │  claim(by="A")              │                           │
  │ ──────────────────────────► │                           │
  │                             │ tx BEGIN                  │
  │                             │ SELECT * FROM streams     │
  │                             │   WHERE leased_until<NOW  │
  │                             │   FOR UPDATE              │
  │                             │   SKIP LOCKED             │
  │                             │ UPDATE leased_by='A'      │
  │                             │ tx COMMIT                 │
  │ ◄────────────────────────── │                           │
  │  [streams 1, 3, 5]          │                           │
  │                             │ ◄──────────────────────── │ claim(by="B")
  │                             │ tx BEGIN                  │
  │                             │ SELECT ... SKIP LOCKED    │
  │                             │ → returns 2, 4 (1,3,5     │
  │                             │   locked by A; skipped)   │
  │                             │ ────────────────────────► │
  │                             │                           │ [streams 2, 4]
  │ process events for 1,3,5    │                           │ process events for 2,4
  │  ack(by="A")                │                           │ ack(by="B")
  │ ──────────────────────────► │ ◄──────────────────────── │
SKIP LOCKED is the key: workers never block each other waiting for a lock. If a stream is held by another worker, the polling worker just gets the next available stream. Zero contention, no thundering herd. The trade-off is no fairness guarantees (a worker can repeatedly pick up the "easier" streams and leave the leased ones to time out), but in practice this is desirable (active workers stay active).
Lease lifecycle
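The skip-don't-wait behaviour can be modelled in memory. The sketch below is an illustration only: `StreamRow` and this `claim` function are hypothetical, the model ignores the "has new events past the watermark" condition, and it represents a held stream purely by its unexpired `leasedUntil` rather than by a real row lock.

```typescript
// Hypothetical row shape; the text only names leased_by and leased_until.
type StreamRow = { stream: string; leasedBy: string | null; leasedUntil: number };

// Claim up to `limit` streams whose lease has expired. Held streams are
// passed over rather than waited on, which is the effect SKIP LOCKED gives.
function claim(
  rows: StreamRow[],
  by: string,
  now: number,
  leaseMs: number,
  limit: number,
): string[] {
  const claimed: string[] = [];
  for (const row of rows) {
    if (claimed.length >= limit) break;
    if (row.leasedUntil >= now) continue; // held by another worker: skip, never block
    row.leasedBy = by;
    row.leasedUntil = now + leaseMs;
    claimed.push(row.stream);
  }
  return claimed;
}
```

Two workers calling `claim` back to back receive disjoint sets, and a stream whose lease has lapsed becomes available to whichever worker polls next.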
                ┌───────────────────────┐
                │ leased_by=NULL        │
                │ at=last_acked_pos     │ ← steady state
                └──────────┬────────────┘
                           │ claim()
                           ▼
                ┌───────────────────────┐
                │ leased_by='worker-X'  │
                │ leased_until=NOW+lease│
                └──────────┬────────────┘
                           │
        ┌────── ack() ─────┼───── block() ─────┐
        │                  │                   │
        ▼                  ▼                   ▼
  leased_by=NULL     leased_by=NULL       blocked=true
  at=new position    at=last position     retry_count++
  retry_count=0      retry_count++
                           │
                           │ (if retry_count > maxRetries
                           │  AND blockOnError)
                           ▼
                     set blocked=true
                     (no further claims)
Three "exits" from a leased state:
- ack: handler succeeded; advance the watermark to the last processed event ID, clear the lease, reset retry count.
- block: handler failed past the retry budget (or threw NonRetryableError); set blocked=true. The stream stays out of claim() results until something explicitly unblocks it. Use app.unblock(input) to resume from where the stream stopped (the common case: operator fixed the underlying issue), or app.reset(input) to rebuild from event 0 (projection rebuild, rare). Both accept either a string[] of stream names or a StreamFilter ({ stream?, source?, blocked? }) for bulk operations, e.g., app.unblock({ stream: "^webhooks-out-" }) to clear a whole family at once, or app.unblock({}) for a post-incident "unblock everything blocked" sweep.
- Timeout: worker died or hung; leased_until passes; the next claim() from any worker can acquire the stream. Retry count is not incremented, because the timed-out worker may have processed the events successfully but failed to ack.
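The three exits can be written down as pure state transitions. This is a sketch under assumptions, not the framework's implementation: the `Lease` and `Policy` shapes and the function names are hypothetical, and the retry count is assumed to be compared against maxRetries after it is incremented, as the lifecycle diagram suggests.

```typescript
// Hypothetical in-memory shape for one row of the streams table.
type Lease = {
  at: number; // watermark: last acked position
  retryCount: number;
  blocked: boolean;
  leasedBy: string | null;
  leasedUntil: number; // epoch ms
};

type Policy = { maxRetries: number; blockOnError: boolean };

// ack: advance the watermark, clear the lease, reset the retry count.
function ack(lease: Lease, position: number): Lease {
  return { ...lease, at: position, retryCount: 0, leasedBy: null, leasedUntil: 0 };
}

// Handler failure: release without advancing the watermark, and block once the
// budget is exceeded (or immediately for a non-retryable error).
function fail(lease: Lease, policy: Policy, nonRetryable = false): Lease {
  const retryCount = lease.retryCount + 1;
  const overBudget = policy.blockOnError && retryCount > policy.maxRetries;
  const blocked = nonRetryable || overBudget;
  return { ...lease, retryCount, blocked, leasedBy: null, leasedUntil: 0 };
}

// Timeout needs no transition of its own: an expired lease simply becomes
// claimable again, with the retry count left untouched.
function claimable(lease: Lease, now: number): boolean {
  return !lease.blocked && lease.leasedUntil < now;
}
```

Modelling timeout as the absence of a transition matches the text: nothing writes to the row when a worker dies, so the retry count cannot change.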
Why a stream stays in claim() after a partial handler failure
If a reaction handler throws, the framework blocks the lease only if retry_count > maxRetries && blockOnError. Otherwise it just releases without advancing the watermark. The next claim() cycle picks the stream up again β same events, fresh handler invocation, retry count incremented. Bounded retries with backoff are configured per-reaction via ReactionOptions.
How the two interact
A common confusion: "If I commit while another worker holds a lease, does my commit fail?"
No. Stream leasing locks the row in the streams table (which tracks reaction watermarks). Commits write to the events table and check the (stream, version) index. Different rows, different locks. A commit and a reaction lease can be active on the same stream concurrently.
A real interaction does surface in the close-the-books flow (Close cycle), where the close operation must coordinate both: tombstone the stream (write a guard event via commit), then verify no leases are held (lease lifecycle).
Observability
Both primitives surface in the trace breadcrumb stream:
- Optimistic concurrency: ConcurrencyError thrown to caller; the framework logs nothing extra (caller decides what to log)
- Lease lifecycle: >> claimed, >> acked, >> blocked traces from internal/drain-cycle.ts decorators
For a stuck stream, query store.query_streams directly; it returns the per-stream at, retry, blocked, and leased_by/leased_until without taking a lease. The act-inspector tool is built on this primitive.
Why no framework-level request deduplication
Optimistic concurrency catches stream-version conflicts. It does not catch the case where a client retries a network-failed POST and the same intent commits twice. That's request-level idempotency, and the framework deliberately leaves it to the API edge (see Idempotency at the API edge).
A "use the action's correlation id as a dedup key" hook was evaluated and rejected. Five reasons:
- TOCTOU races. Two concurrent retries with the same key both pass the existence check before either commits. Either you add a distributed lock around the check (re-introducing the contention you were trying to avoid), or two events land. The API-edge cache sidesteps this by returning the previous response on duplicate keys without re-running the action.
- Semantic overloading. correlation is a trace id that propagates through reactions. Reusing it as an idempotency key conflates two unrelated concerns, and means a downstream reaction that emits its own action with the same correlation id (the default) would be silently deduped against the original.
- Cross-action collisions. A correlation id can drive multiple actions in a single workflow (OpenTicket → AssignTicket). If "saw this key before" gates the second action, the workflow stalls silently.
- State drift. The natural dedup behaviour is "return current state on duplicate." But current state may have advanced past the original commit's view, so clients consuming the response would see different state for the "same" request depending on retry timing.
- No TTL in an immutable log. Correlation ids written to events live forever. A dedup table inside the event log can't expire entries without rewriting history. An external cache with a TTL is the natural fit, and that's what the API-edge pattern uses.
Resolution: keep the event log purely about what happened, and put "have I seen this request before?" in middleware where it can be cached, TTL'd, and shared across instances via Redis without touching the durable record. The production checklist shows the recommended tRPC middleware shape.
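A minimal in-process sketch of the API-edge idea follows. It is not the tRPC middleware from the production checklist and does not use Redis; `IdempotencyCache`, its `run` method, and the injected clock are hypothetical. The point it illustrates is that storing the in-flight promise before awaiting it lets a concurrent retry share the first request's result instead of re-running the action.

```typescript
type Entry<T> = { expiresAt: number; response: Promise<T> };

// Hypothetical edge-side cache keyed by a client-supplied idempotency key.
class IdempotencyCache<T> {
  private entries = new Map<string, Entry<T>>();
  private ttlMs: number;
  private now: () => number;

  constructor(ttlMs: number, now: () => number = Date.now) {
    this.ttlMs = ttlMs;
    this.now = now;
  }

  // Return the cached response for `key` if it is still fresh; otherwise run the action once.
  run(key: string, action: () => Promise<T>): Promise<T> {
    const t = this.now();
    const hit = this.entries.get(key);
    if (hit && hit.expiresAt > t) return hit.response; // duplicate: replay, do not re-run

    const response = action();
    // Registered synchronously, so a concurrent retry finds the in-flight promise.
    this.entries.set(key, { expiresAt: t + this.ttlMs, response });

    // Do not cache failures: a later retry should be allowed to run the action again.
    response.catch(() => {
      if (this.entries.get(key)?.response === response) this.entries.delete(key);
    });
    return response;
  }
}
```

Sharing the cache across instances would need an external store with an atomic set-if-absent operation; this sketch covers only a single process.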
Pointers
- libs/act/src/internal/event-sourcing.ts: action() and the expectedVersion check
- libs/act-pg/src/PostgresStore.ts: commit() (with the PG_UNIQUE_VIOLATION translation), claim() (with FOR UPDATE SKIP LOCKED)
- libs/act/src/internal/drain-cycle.ts: runDrainCycle orchestration and DrainController lifecycle
- libs/act-pg/test/stress/: multi-process exercise of both primitives under contention