Event Sourcing & Processing
Act persists all state changes as immutable events in an append-only log. The event store is the single source of truth — state is reconstructed by replaying events.
Event Store
Append-Only, Immutable Log
- All state changes are recorded as new events — past data is never modified
- Events are timestamped and versioned, enabling state reconstruction at any point
- Each entity instance has its own stream with strict ordering
Optimistic Concurrency
Each stream maintains a version number. When committing events, the system verifies the expected version matches. If another process wrote events in the meantime, a ConcurrencyError is thrown. The caller retries with the latest state.
Storage Backends
The event store uses a port/adapter pattern:
- InMemoryStore (default) — fast, ephemeral, for development and testing
- PostgresStore (
@rotorsoft/act-pg) — production-ready with ACID guarantees and connection pooling
import { store } from "@rotorsoft/act";
import { PostgresStore } from "@rotorsoft/act-pg";
store(new PostgresStore({
host: "localhost",
database: "myapp",
user: "postgres",
password: "secret",
}));
Cache
Cache is always-on with InMemoryCache (LRU, maxSize 1000) as the default. It eliminates full event replay on every load() by storing the latest state checkpoint in memory. Actions update the cache after each commit; concurrency errors invalidate stale entries.
For distributed deployments, implement the Cache interface backed by Redis or another external store:
import { cache } from "@rotorsoft/act";
cache(new RedisCache({ url: "redis://localhost:6379" }));
Resource Disposal
All adapters (store, cache, custom disposers) are cleaned up via dispose()():
import { dispose } from "@rotorsoft/act";
// Graceful shutdown
process.on("SIGTERM", async () => {
await dispose()();
});
// In tests
afterAll(async () => {
await dispose()();
});
Event-Driven Processing
Act handles event-driven workflows through atomic stream claiming and correlation. The event store itself acts as the message backbone — no external message queues needed.
Reactions
Reactions are asynchronous handlers triggered by events. They can update other state streams, trigger external integrations, or drive cross-aggregate workflows:
const app = act()
.withState(Account)
.on("Deposited")
.do(async (event, stream, app) => {
await app.do("LogEntry", target, { message: `Deposit: ${event.data.amount}` }, event);
})
.to((event) => ({ target: `audit-${event.stream}` }))
.build();
Void vs Routed Reactions
.to(resolver)— reaction is processed bydrain(). The resolver returns{ target: string, source?: string }..void()— reaction is never processed bydrain(). Use only for inline side effects (logging, metrics).
Stream Claiming
Act uses an atomic claim mechanism to coordinate distributed consumers. The claim() method discovers and locks streams in a single operation using PostgreSQL's FOR UPDATE SKIP LOCKED pattern — zero-contention competing consumers where workers never block each other:
- Per-stream ordering — events within a stream are processed sequentially
- Temporary ownership — claims expire after a configurable duration
- Zero-contention — locked rows are silently skipped, no blocking between workers
- Backpressure — only a limited number of claims active at a time
Event Correlation
Correlation enables dynamic stream discovery:
- Each action/event carries
correlation(request trace) andcausation(what triggered it) metadata app.correlate()scans events, discovers new target streams via reaction resolvers, and registers them withsubscribe(). Returns{ subscribed, last_id }wheresubscribedis the count of newly registered streams- Must be called before
drain()to register streams
Optimization: Resolvers are classified at build time as static or dynamic. Static targets (_this_, .to("target")) are subscribed once at init. An advancing checkpoint ensures correlate only scans new events. When no dynamic resolvers exist, correlate is skipped entirely — settle goes straight to drain.
The Drain Cycle
drain() processes pending reactions:
- Claim — atomically discover and lock streams with pending events (uses
FOR UPDATE SKIP LOCKEDfor zero-contention) - Fetch — load events for each claimed stream
- Match — find reactions whose resolvers target each stream
- Handle — execute reaction handlers
- Ack/Block — release successful claims or block failed streams
// In tests
await app.correlate();
await app.drain();
// In production — debounced, non-blocking
app.settle();
Dual-Frontier Strategy
Each drain cycle divides streams into two sets:
- Lagging frontier — streams behind or newly discovered, advanced quickly to catch up
- Leading frontier — streams near the latest event, continue processing without waiting
The ratio adapts dynamically based on event pressure (clamped between 20-80%).
settle()
The recommended production pattern. Debounces rapid commits, runs correlate→drain in a loop until the system is consistent, then emits a "settled" lifecycle event:
await app.do("CreateItem", target, input);
app.settle(); // non-blocking, returns immediately
app.on("settled", (drain) => {
// notify SSE clients, invalidate caches, etc.
});
Lifecycle Events
app.on("committed", ...)— observe all state changesapp.on("acked", ...)— observe acknowledged reactionsapp.on("blocked", ...)— catch reaction processing failuresapp.on("settled", ...)— react whensettle()completes
Testing
import { store, dispose } from "@rotorsoft/act";
beforeEach(async () => {
await store().seed();
});
afterAll(async () => {
await dispose()();
});
it("should process reactions", async () => {
await app.do("CreateItem", target, { name: "Test" });
await app.correlate();
await app.drain();
// assert...
});