Configuration
Act uses a fluent builder pattern for defining domain logic, a port/adapter pattern for infrastructure (store, cache, logger), and a small set of orchestrator options for tuning correlation and settle behavior. This page covers all three.
State Builder
Define state machines with actions, events, and validation:
import { state } from "@rotorsoft/act";
import { z } from "zod";
const Counter = state({ Counter: z.object({ count: z.number() }) })
  .init(() => ({ count: 0 }))
  .emits({ Incremented: z.object({ amount: z.number() }) })
  .patch({
    Incremented: ({ data }, state) => ({ count: state.count + data.amount }),
  })
  .on({ increment: z.object({ by: z.number() }) })
    .emit((action) => ["Incremented", { amount: action.by }])
  .build();
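To make the role of .patch() concrete, the sketch below replays events through a reducer of the same shape as the Counter example. It is plain TypeScript with no library imports, not Act's implementation: applyEvents, CounterState, and CounterEvent are hypothetical names, and schema validation is left out.

```typescript
// Hypothetical stand-ins for the Counter state and its single event.
type CounterState = { count: number };
type CounterEvent = { name: "Incremented"; data: { amount: number } };

// Same shape as the reducers passed to .patch(): (event, state) -> partial state.
const patches = {
  Incremented: (event: CounterEvent, state: CounterState): Partial<CounterState> => ({
    count: state.count + event.data.amount,
  }),
};

// Start from the .init() value and merge each patch in commit order.
function applyEvents(events: CounterEvent[]): CounterState {
  const initial: CounterState = { count: 0 };
  return events.reduce<CounterState>(
    (state, event) => ({ ...state, ...patches[event.name](event, state) }),
    initial
  );
}

const replayed = applyEvents([
  { name: "Incremented", data: { amount: 2 } },
  { name: "Incremented", data: { amount: 3 } },
]);
// replayed.count is 5
```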
Projection Builder
Read-model updaters that react to events:
import { projection } from "@rotorsoft/act";
import { z } from "zod";
const CounterProjection = projection("counters")
  .on({ Incremented: z.object({ amount: z.number() }) })
    .do(async ({ stream, data }) => { /* update read model */ })
  .build();
Batched replay
For high-throughput rebuilds (e.g. catching up after a long downtime, or projecting onto a fresh read model), define a .batch(handler) that processes every event for a stream in a single transaction. When defined, .batch() is always called instead of the per-event .do() handlers.
const TicketProjection = projection("tickets")
  .on({ TicketOpened: TicketOpenedSchema })
    .do(async ({ stream, data }) => { /* per-event fallback */ })
  .on({ TicketClosed: TicketClosedSchema })
    .do(async ({ stream, data }) => { /* per-event fallback */ })
  .batch(async (events, stream) => {
    await db.transaction(async (tx) => {
      for (const e of events) {
        switch (e.name) {
          case "TicketOpened": /* bulk insert */ break;
          case "TicketClosed": /* bulk update */ break;
        }
      }
    });
  })
  .build();
.batch() is only available on static-target projections (projection("target")). The events array is a discriminated union, so a switch (e.name) narrows both the name and data.
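The narrowing described above can be seen in isolation with a hand-written union. This is a sketch under assumptions: the event payloads (title, closedBy), the TicketRow shape, and the in-memory Map standing in for the read model are all hypothetical; in a real projection the types come from the schemas passed to .on().

```typescript
// Hypothetical event shapes; the real ones are inferred from the .on() schemas.
type TicketEvent =
  | { name: "TicketOpened"; stream: string; data: { title: string } }
  | { name: "TicketClosed"; stream: string; data: { closedBy: string } };

type TicketRow = { title: string; status: "open" | "closed"; closedBy?: string };

// In-memory stand-in for the read-model table.
const tickets = new Map<string, TicketRow>();

function applyBatch(events: TicketEvent[]): void {
  for (const e of events) {
    switch (e.name) {
      case "TicketOpened":
        // e.data is narrowed to { title: string } in this branch
        tickets.set(e.stream, { title: e.data.title, status: "open" });
        break;
      case "TicketClosed": {
        // e.data is narrowed to { closedBy: string } in this branch
        const row = tickets.get(e.stream);
        if (row) tickets.set(e.stream, { ...row, status: "closed", closedBy: e.data.closedBy });
        break;
      }
    }
  }
}

applyBatch([
  { name: "TicketOpened", stream: "ticket-1", data: { title: "Printer jam" } },
  { name: "TicketClosed", stream: "ticket-1", data: { closedBy: "alice" } },
]);
```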
Slice Builder
Vertical feature modules grouping states, projections, and reactions:
import { slice } from "@rotorsoft/act";
const CounterSlice = slice()
  .withState(Counter)
  .withProjection(CounterProjection)
  .on("Incremented")
    .do(async (event, stream, app) => { /* cross-state dispatch via app */ })
    .to((event) => ({ target: event.stream }))
  .build();
Act Orchestrator
Compose everything into an application:
import { act } from "@rotorsoft/act";
const app = act()
  .withSlice(CounterSlice)
  .withState(AnotherState)
  .withProjection(StandaloneProjection)
  .on("SomeEvent")
    .do(handler)
    .to(resolver)
  .build();
Act options
act().build(options?) accepts a small ActOptions object for tuning the orchestrator:
const app = act()
  .withState(Counter)
  .build({
    maxSubscribedStreams: 5_000, // default 1000
    settleDebounceMs: 25, // default 10
  });
maxSubscribedStreams (default 1000): cap for the LRU set tracking already-subscribed reaction targets. Apps that mint many dynamic targets (e.g. one stream per user activity) should raise this; the LRU is a memory bound, not a correctness mechanism, so eviction at most causes a redundant subscribe() call.
settleDebounceMs (default 10): debounce window used by settle() when no per-call debounceMs is given. Coalesces commits in the same tick into a single correlate→drain pass. Lower for tight tests; raise for bursty production traffic.
onlyLanes (default: every declared lane): restrict this process to a subset of declared drain lanes (ACT-1103). See Lanes below.
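Why eviction is harmless can be shown with a small bounded set. This is a sketch, assuming a Map-backed LRU; LruSet, ensureSubscribed, and the subscribeCalls counter are hypothetical and stand in for whatever Act uses internally.

```typescript
// Hypothetical bounded set; Act's internal structure may differ.
class LruSet {
  private readonly items = new Map<string, true>();
  private readonly maxSize: number;

  constructor(maxSize: number) {
    this.maxSize = maxSize;
  }

  // Returns true when the key was already tracked, and refreshes its recency.
  touch(key: string): boolean {
    const known = this.items.delete(key);
    this.items.set(key, true); // re-insert at the most-recent end
    if (this.items.size > this.maxSize) {
      // A Map iterates in insertion order, so the first key is the least recent.
      const oldest = this.items.keys().next().value as string;
      this.items.delete(oldest);
    }
    return known;
  }
}

let subscribeCalls = 0; // stand-in for calls to the store's subscribe()
const seen = new LruSet(2);

function ensureSubscribed(target: string): void {
  if (!seen.touch(target)) subscribeCalls++;
}

// "a" is evicted when "c" arrives, so its second appearance subscribes again.
for (const target of ["a", "b", "c", "a"]) ensureSubscribed(target);
// subscribeCalls is 4: one redundant call, no lost subscription
```

With a cap of 3 or more the same sequence would produce 3 calls, which is the trade-off the option controls: memory against occasional repeated subscribe() calls.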
Lanes
By default, every reaction lives in a single implicit "default" lane: one DrainController runs the whole pipeline with one timing budget. That works until reactions diverge: a webhook delivery wants leaseMillis measured in tens of seconds, a best-effort notification wants short retries, and a long projection replay needs a generous claim budget. Tuning any one of them globally penalises the others.
.withLane({...}) declares an independent drain lane with its own controller, lease budget, claim limit, and cycle cadence. Reactions opt in via .to({lane}); reactions without an explicit lane stay in "default".
const app = act()
  .withState(Ticket)
  .withLane({ name: "webhooks", leaseMillis: 30_000, streamLimit: 5, cycleMs: 500 })
  .withLane({ name: "best-effort", leaseMillis: 1_000, streamLimit: 20, cycleMs: 50 })
  .on("OrderConfirmed")
    .do(deliverWebhook)
    .to({ target: "webhooks-out", lane: "webhooks" })
  .on("OrderConfirmed")
    .do(emitMetric)
    .to({ target: "metrics-out", lane: "best-effort" })
  .build();
LaneConfig fields
name: the lane identifier. "default" is reserved for the implicit lane; declaring it explicitly throws.
leaseMillis: lease window for claim() calls in this lane. Sized to the longest expected handler invocation in the lane plus headroom.
streamLimit: max streams claimed per cycle. Bounds the parallel-handler dispatch budget for the lane.
cycleMs: when set, auto-starts a per-lane setTimeout chain that calls the controller's drain() at this cadence. The timer is unref()'d so it doesn't keep the process alive; app.shutdown() clears it. When omitted, the lane drains alongside the Act-level settle() loop.
Each declared lane field overrides caller-passed DrainOptions at drain time; withLane({leaseMillis: 30_000}) would be meaningless if drain({leaseMillis: 1_000}) could erase it. Caller options only apply when the lane is silent on the field.
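The precedence rule can be written as a per-field merge. This is a sketch, not Act's code: resolveDrainOptions and LaneTimings are hypothetical names, and treating streamLimit as a DrainOptions field is an assumption (only leaseMillis is shown on both sides in the text).

```typescript
// Assumed shapes for illustration only.
type DrainOptions = { leaseMillis?: number; streamLimit?: number };
type LaneTimings = { leaseMillis?: number; streamLimit?: number };

// The lane's declared value wins; the caller's value fills in only where the lane is silent.
function resolveDrainOptions(lane: LaneTimings, caller: DrainOptions): DrainOptions {
  return {
    leaseMillis: lane.leaseMillis ?? caller.leaseMillis,
    streamLimit: lane.streamLimit ?? caller.streamLimit,
  };
}

const webhooks: LaneTimings = { leaseMillis: 30_000 };
const resolved = resolveDrainOptions(webhooks, { leaseMillis: 1_000, streamLimit: 10 });
// resolved.leaseMillis is 30000 (declared by the lane)
// resolved.streamLimit is 10 (lane is silent, caller applies)
```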
Type-safe lane references
The builder threads declared lane names into its TLanes generic. .to({lane: "..."}) and ActOptions.onlyLanes are narrowed to that union at the call site, so typos fail to compile:
const app = act()
  .withState(Ticket)
  .withLane({ name: "webhooks" })
  .on("OrderConfirmed")
    .do(deliverWebhook)
    // @ts-expect-error "wbhooks" is not a declared lane
    .to({ target: "out", lane: "wbhooks" })
  .build({
    // @ts-expect-error same check, caught at the options site too
    onlyLanes: ["wbhooks"],
  });
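How a builder can accumulate lane names in a generic is sketched below. LaneBuilder is a hypothetical miniature, not Act's builder: it only shows the typing technique, where each withLane() call widens a string-literal union that .to() then accepts.

```typescript
// Hypothetical mini-builder; only the typing technique mirrors the text above.
class LaneBuilder<TLanes extends string = "default"> {
  private readonly lanes: string[];

  constructor(lanes: string[] = ["default"]) {
    this.lanes = lanes;
  }

  // Each call returns a builder whose lane union includes the new name.
  withLane<TName extends string>(name: TName): LaneBuilder<TLanes | TName> {
    return new LaneBuilder<TLanes | TName>([...this.lanes, name]);
  }

  // Only names in the accumulated union type-check as a lane.
  to(route: { target: string; lane?: TLanes }): { target: string; lane: string } {
    return { target: route.target, lane: route.lane ?? "default" };
  }
}

const routed = new LaneBuilder()
  .withLane("webhooks")
  .to({ target: "out", lane: "webhooks" });

// The line below would not compile: "wbhooks" is not in "default" | "webhooks".
// new LaneBuilder().withLane("webhooks").to({ target: "out", lane: "wbhooks" });
```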
Slices declare their own lanes via the same .withLane(...) method; act().withSlice(slice) merges the slice's lanes into the Act's set. Conflicting timing configs (same lane name, different leaseMillis/streamLimit/cycleMs between the slice and the Act) throw at composition time; pick one declaration.
Re-laning at restart
subscribe() UPSERTs each stream's lane on every call. If you change a target's lane in the builder and restart, the store rewrites the persisted lane on the next correlate(). Online re-laning (changing a stream's lane while workers hold leases) is not supported; the safe trigger is a process restart.
Conflicting lane assignments
Two reactions routing to the same (target, source) stream must declare the same lane. Lanes have no ordering, so there's no max() merge analogous to priority; the build-time scan throws on disagreement:
// throws at act().build()
act()
  .withState(Ticket)
  .withLane({ name: "slow" }).withLane({ name: "fast" })
  .on("OrderConfirmed").do(handlerA).to({ target: "shared", lane: "slow" })
  .on("OrderConfirmed").do(handlerB).to({ target: "shared", lane: "fast" })
  .build();
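A scan of this kind might look like the following. This is a sketch under assumptions: Route and scanLanes are hypothetical names, and keying by target plus optional source is one reading of "(target, source) stream" above.

```typescript
// Hypothetical route shape collected from every .to(...) declaration.
type Route = { target: string; source?: string; lane?: string };

// Records one lane per (target, source) pair and throws on the first disagreement.
function scanLanes(routes: Route[]): Map<string, string> {
  const laneByStream = new Map<string, string>();
  for (const route of routes) {
    const key = `${route.target}|${route.source ?? ""}`;
    const lane = route.lane ?? "default";
    const existing = laneByStream.get(key);
    if (existing !== undefined && existing !== lane) {
      throw new Error(`Conflicting lanes for "${route.target}": "${existing}" vs "${lane}"`);
    }
    laneByStream.set(key, lane);
  }
  return laneByStream;
}

let conflict = "";
try {
  scanLanes([
    { target: "shared", lane: "slow" },
    { target: "shared", lane: "fast" },
  ]);
} catch (error) {
  conflict = (error as Error).message;
}
// conflict now names both "slow" and "fast"
```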
onlyLanes: process-per-lane deployment
ActOptions.onlyLanes restricts which lanes' controllers boot in this process. With onlyLanes: ["webhooks"], only the webhook controller runs; other declared lanes are silent. Workers in different processes coordinate via the store's SKIP LOCKED semantics, so the same image can be deployed as one-process-per-lane without code changes.
This is an escape hatch, not the primary path. A single process with multiple declared lanes already gets fast-lane responsiveness: Act._drainAll runs every controller's drain in parallel, so a slow lane's in-flight handler doesn't block a fast lane's claim. onlyLanes is for the cases where you want hardware isolation (different CPU/memory per lane) on top of that.
Port/Adapter Pattern
Infrastructure concerns (logging, storage, caching) use singleton adapters injected via port functions. All three ports follow the same pattern (first call wins, with a sensible default):
import { log, store, cache } from "@rotorsoft/act";
const logger = log(); // ConsoleLogger (default)
const s = store(); // InMemoryStore (default)
const c = cache(); // InMemoryCache (default)
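The "first call wins" behaviour can be sketched as a small closure. This is an illustration, not the library's implementation: the port helper and the simplified Logger shape are hypothetical, and how the real ports react to a late injection (ignore, warn, or throw) is not covered by this page.

```typescript
// Hypothetical helper: a singleton accessor with a lazily created default.
function port<T>(createDefault: () => T): (adapter?: T) => T {
  let instance: T | undefined;
  return (adapter?: T): T => {
    // Only the first call can set the instance; later arguments are ignored here.
    if (instance === undefined) instance = adapter ?? createDefault();
    return instance;
  };
}

// Simplified adapter shape for the example.
type Logger = { kind: string };
const log = port<Logger>(() => ({ kind: "console" }));

const first = log({ kind: "pino" }); // first call injects the adapter
const second = log({ kind: "other" }); // already set: returns the pino adapter
const third = log(); // no argument: returns the same instance
```

The practical consequence is ordering: inject adapters during startup, before any code path calls the port without arguments and locks in the default.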
Logger
The default ConsoleLogger emits JSON lines in production (compatible with GCP, AWS CloudWatch, Datadog) and colorized output in development, with zero dependencies.
import { log } from "@rotorsoft/act";
const logger = log();
logger.info("Application started");
For pino, inject the adapter from @rotorsoft/act-pino:
import { log } from "@rotorsoft/act";
import { PinoLogger } from "@rotorsoft/act-pino";
log(new PinoLogger({ level: "debug", pretty: true }));
The Logger interface is minimal and compatible with pino, winston, bunyan, and other popular loggers:
interface Logger extends Disposable {
  level: string;
  fatal(obj: unknown, msg?: string): void;
  error(obj: unknown, msg?: string): void;
  warn(obj: unknown, msg?: string): void;
  info(obj: unknown, msg?: string): void;
  debug(obj: unknown, msg?: string): void;
  trace(obj: unknown, msg?: string): void;
  child(bindings: Record<string, unknown>): Logger;
}
Store
import { store } from "@rotorsoft/act";
import { PostgresStore } from "@rotorsoft/act-pg";
// Development: in-memory (default)
const s = store();
// Production: inject PostgreSQL
store(new PostgresStore({
  host: "localhost",
  database: "myapp",
  user: "postgres",
  password: "secret",
  schema: "public",
  table: "events",
}));
// Embedded / single-node: SQLite via libSQL
import { SqliteStore } from "@rotorsoft/act-sqlite";
store(new SqliteStore({ url: "file:myapp.db" }));
Cache
Cache is always-on with InMemoryCache (LRU, maxSize 1000) as the default:
import { cache } from "@rotorsoft/act";
// Default: InMemoryCache, no setup needed
// For distributed deployments:
cache(new RedisCache({ url: "redis://localhost:6379" }));
The Cache interface is async for forward-compatibility with external caches:
interface Cache extends Disposable {
  get<TState>(stream: string): Promise<CacheEntry<TState> | undefined>;
  set<TState>(stream: string, entry: CacheEntry<TState>): Promise<void>;
  invalidate(stream: string): Promise<void>;
  clear(): Promise<void>;
}
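A custom adapter only has to satisfy those async methods. The sketch below is a minimal Map-backed version, written under assumptions: the CacheEntry shape (state plus version) is a guess, the interface is copied locally as StreamCache to avoid clashing with the browser's global Cache type, and MapCache has no size bound, unlike the default LRU.

```typescript
// Assumed entry shape; the real CacheEntry type is defined by the library.
type CacheEntry<TState> = { state: TState; version: number };

// Local copy of the interface above, with dispose() standing in for Disposable.
interface StreamCache {
  get<TState>(stream: string): Promise<CacheEntry<TState> | undefined>;
  set<TState>(stream: string, entry: CacheEntry<TState>): Promise<void>;
  invalidate(stream: string): Promise<void>;
  clear(): Promise<void>;
  dispose(): Promise<void>;
}

// Unbounded in-memory adapter; a production adapter would add eviction or a TTL.
class MapCache implements StreamCache {
  private readonly entries = new Map<string, CacheEntry<unknown>>();

  async get<TState>(stream: string): Promise<CacheEntry<TState> | undefined> {
    return this.entries.get(stream) as CacheEntry<TState> | undefined;
  }
  async set<TState>(stream: string, entry: CacheEntry<TState>): Promise<void> {
    this.entries.set(stream, entry);
  }
  async invalidate(stream: string): Promise<void> {
    this.entries.delete(stream);
  }
  async clear(): Promise<void> {
    this.entries.clear();
  }
  async dispose(): Promise<void> {
    this.entries.clear();
  }
}
```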
Resource Disposal
All adapters (logger, store, cache, and custom disposers) are cleaned up via dispose()():
import { dispose } from "@rotorsoft/act";
// Register custom cleanup
dispose(async () => {
  await redis.quit();
});
// Trigger cleanup (graceful shutdown or test teardown)
await dispose()();
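The double call reads more naturally once the two roles are separated: dispose(fn) registers a disposer, and dispose() with no argument returns the function that runs them. The sketch below mimics that shape with a local registry; it is not the library's code, and both the execution order and the draining of the registry are assumptions.

```typescript
type Disposer = () => Promise<void>;

// Hypothetical local registry standing in for the library's internal list.
const disposers: Disposer[] = [];

// With an argument: register. Either way: return the function that runs cleanup.
function dispose(disposer?: Disposer): () => Promise<void> {
  if (disposer) disposers.push(disposer);
  return async () => {
    // Drain the registry so a second run does not repeat earlier disposers.
    const pending = disposers.splice(0, disposers.length);
    for (const run of pending) await run();
  };
}

const cleanedUp: string[] = [];
dispose(async () => {
  cleanedUp.push("redis");
});
dispose(async () => {
  cleanedUp.push("pool");
});
```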
Custom Store Implementation
Implement the Store interface for custom backends:
interface Store extends Disposable {
  seed(): Promise<void>;
  drop(): Promise<void>;
  commit(stream, msgs, meta, expectedVersion?): Promise<Committed[]>;
  query(callback, filter?): Promise<number>;
  claim(lagging, leading, by, millis, lane?): Promise<Lease[]>;
  subscribe(streams): Promise<{ subscribed: number; watermark: number }>;
  ack(leases): Promise<Lease[]>;
  block(leases): Promise<(Lease & { error })[]>;
  reset(streams): Promise<number>;
  truncate(targets): Promise<TruncateResult>;
  query_streams(callback, query?): Promise<{ maxEventId: number; count: number }>;
  dispose(): Promise<void>;
}
claim() atomically discovers and locks streams for processing using PostgreSQL's FOR UPDATE SKIP LOCKED pattern: zero-contention competing consumers where workers never block each other. subscribe() registers new streams for reaction processing and returns the count of newly registered streams. query_streams() is read-only introspection over subscription positions, used by operational dashboards (projection lag, blocked subscriptions) without opening a second connection or running raw SQL against the adapter-specific streams table. commit() must implement version-based optimistic concurrency correctly: when expectedVersion is supplied and does not match the stream's current version, the commit has to be rejected. See the PostgresStore source for a production-grade reference.
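The optimistic concurrency check in commit() can be sketched with an in-memory log. This is a simplified, synchronous illustration, not the Store contract: the real commit() is async and also takes meta, and the version numbering (first event is version 0, an empty stream is -1), the Message and Committed shapes, and the plain Error are all assumptions.

```typescript
// Assumed shapes for the example.
type Message = { name: string; data: unknown };
type Committed = Message & { id: number; stream: string; version: number };

const eventLog: Committed[] = [];
const streamVersions = new Map<string, number>(); // last committed version per stream

function commit(stream: string, msgs: Message[], expectedVersion?: number): Committed[] {
  const current = streamVersions.get(stream) ?? -1; // -1 means "no events yet"
  // Reject the write if the caller decided based on a stale version.
  if (expectedVersion !== undefined && expectedVersion !== current) {
    throw new Error(`Concurrency conflict on "${stream}": expected ${expectedVersion}, found ${current}`);
  }
  const committed = msgs.map((msg, i) => ({
    ...msg,
    id: eventLog.length + i,
    stream,
    version: current + 1 + i,
  }));
  eventLog.push(...committed);
  streamVersions.set(stream, current + committed.length);
  return committed;
}

// Two writers both loaded the empty stream (version -1); only the first may commit.
const firstWrite = commit("counter-1", [{ name: "Incremented", data: { amount: 1 } }], -1);
let staleRejected = false;
try {
  commit("counter-1", [{ name: "Incremented", data: { amount: 1 } }], -1);
} catch {
  staleRejected = true;
}
```

In a SQL-backed store the same guarantee usually comes from the database rather than application code, for example a unique constraint on (stream, version) inside the commit transaction.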