Configuration

Act uses a fluent builder pattern for defining domain logic, a port/adapter pattern for infrastructure (store, cache, logger), and a small set of orchestrator options for tuning correlation and settle behavior. This page covers all three.

State Builder

Define state machines with actions, events, and validation:

import { state } from "@rotorsoft/act";
import { z } from "zod";

const Counter = state({ Counter: z.object({ count: z.number() }) })
  .init(() => ({ count: 0 }))
  .emits({ Incremented: z.object({ amount: z.number() }) })
  .patch({
    Incremented: ({ data }, state) => ({ count: state.count + data.amount }),
  })
  .on({ increment: z.object({ by: z.number() }) })
  .emit((action) => ["Incremented", { amount: action.by }])
  .build();
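
Conceptually, the patch map behaves like a set of reducers: the current state is what you get by folding committed events through them, starting from init(). The sketch below models that idea in plain TypeScript without importing the library. The names CounterState, CounterEvent, and fold are illustrative assumptions, not Act's API or internals.

```typescript
// Hypothetical, library-free model of how patch reducers rebuild state.
type CounterState = { count: number };
type CounterEvent = { name: "Incremented"; data: { amount: number } };

// Mirrors .init(): the state before any event has been applied.
const init = (): CounterState => ({ count: 0 });

// Mirrors .patch(): one reducer per event name.
const patch = {
  Incremented: (event: CounterEvent, state: CounterState): CounterState => ({
    count: state.count + event.data.amount,
  }),
};

// Fold a stream of events into the current state.
function fold(events: CounterEvent[]): CounterState {
  return events.reduce<CounterState>(
    (state, event) => ({ ...state, ...patch[event.name](event, state) }),
    init()
  );
}

const current = fold([
  { name: "Incremented", data: { amount: 2 } },
  { name: "Incremented", data: { amount: 3 } },
]);
console.log(current.count); // 5
```

Because state is derived only from events, replaying the same stream always yields the same result.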

Projection Builder

Read-model updaters that react to events:

import { projection } from "@rotorsoft/act";
import { z } from "zod";

const CounterProjection = projection("counters")
  .on({ Incremented: z.object({ amount: z.number() }) })
  .do(async ({ stream, data }) => { /* update read model */ })
  .build();

Batched replay

For high-throughput rebuilds (e.g. catching up after a long downtime, or projecting onto a fresh read model), define a .batch(handler) that processes every event for a stream in a single transaction. When defined, .batch() is always called instead of the per-event .do() handlers.

const TicketProjection = projection("tickets")
  .on({ TicketOpened: TicketOpenedSchema })
  .do(async ({ stream, data }) => { /* per-event fallback */ })
  .on({ TicketClosed: TicketClosedSchema })
  .do(async ({ stream, data }) => { /* per-event fallback */ })
  .batch(async (events, stream) => {
    await db.transaction(async (tx) => {
      for (const e of events) {
        switch (e.name) {
          case "TicketOpened": /* bulk insert */ break;
          case "TicketClosed": /* bulk update */ break;
        }
      }
    });
  })
  .build();

.batch() is only available on static-target projections (projection("target")). The events array is a discriminated union, so a switch (e.name) narrows both the name and data.
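
The narrowing described above can be seen in isolation with a hand-written union. This is a minimal sketch: the TicketEvent payload fields (title, reason) are made up for illustration, whereas in a real projection the shapes come from your zod schemas.

```typescript
// Hypothetical event shapes standing in for the schema-derived union.
type TicketEvent =
  | { name: "TicketOpened"; data: { title: string } }
  | { name: "TicketClosed"; data: { reason: string } };

// Inside each case, `e.data` is narrowed to that event's payload type.
function summarize(events: TicketEvent[]): string[] {
  const lines: string[] = [];
  for (const e of events) {
    switch (e.name) {
      case "TicketOpened":
        lines.push(`opened: ${e.data.title}`); // data is { title: string }
        break;
      case "TicketClosed":
        lines.push(`closed: ${e.data.reason}`); // data is { reason: string }
        break;
    }
  }
  return lines;
}

const summary = summarize([
  { name: "TicketOpened", data: { title: "Printer jam" } },
  { name: "TicketClosed", data: { reason: "fixed" } },
]);
console.log(summary.join("; ")); // opened: Printer jam; closed: fixed
```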

Slice Builder

Vertical feature modules grouping states, projections, and reactions:

import { slice } from "@rotorsoft/act";

const CounterSlice = slice()
  .withState(Counter)
  .withProjection(CounterProjection)
  .on("Incremented")
  .do(async (event, stream, app) => { /* cross-state dispatch via app */ })
  .to((event) => ({ target: event.stream }))
  .build();

Act Orchestrator

Compose everything into an application:

import { act } from "@rotorsoft/act";

const app = act()
  .withSlice(CounterSlice)
  .withState(AnotherState)
  .withProjection(StandaloneProjection)
  .on("SomeEvent")
  .do(handler)
  .to(resolver)
  .build();

Act options

act().build(options?) accepts a small ActOptions object for tuning the orchestrator:

const app = act()
  .withState(Counter)
  .build({
    maxSubscribedStreams: 5_000, // default 1000
    settleDebounceMs: 25, // default 10
  });
  • maxSubscribedStreams (default 1000): cap for the LRU set tracking already-subscribed reaction targets. Apps that mint many dynamic targets (e.g. one stream per user activity) should raise this. The LRU is a memory bound, not a correctness mechanism: eviction at most causes a redundant subscribe() call.
  • settleDebounceMs (default 10): debounce window used by settle() when no per-call debounceMs is given. Coalesces commits in the same tick into a single correlate→drain pass. Lower it for tight tests; raise it for bursty production traffic.
  • onlyLanes (default: every declared lane): restrict this process to a subset of declared drain lanes (ACT-1103). See Lanes below.
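
To see why eviction from the subscribed-streams LRU costs only a redundant subscribe() call, here is a minimal sketch of a bounded LRU set. LruSet and its touch() method are hypothetical; Act's internal data structure may differ.

```typescript
// Hypothetical bounded LRU set; not Act's actual implementation.
class LruSet {
  private readonly items = new Set<string>();
  private readonly maxSize: number;

  constructor(maxSize: number) {
    this.maxSize = maxSize;
  }

  // Returns true when the key was not tracked, meaning the caller should subscribe.
  touch(key: string): boolean {
    const isNew = !this.items.has(key);
    // Re-inserting moves the key to the most-recently-used end.
    this.items.delete(key);
    this.items.add(key);
    if (this.items.size > this.maxSize) {
      // A Set iterates in insertion order, so the first key is the least recently used.
      const oldest = this.items.values().next().value;
      if (oldest !== undefined) this.items.delete(oldest);
    }
    return isNew;
  }
}

let subscribeCalls = 0;
const seen = new LruSet(2); // tiny cap to force an eviction
for (const target of ["a", "b", "c", "a"]) {
  if (seen.touch(target)) subscribeCalls++; // stand-in for a subscribe() call
}
console.log(subscribeCalls); // 4: "a" was evicted by "c", so it is subscribed again
```

With a cap large enough to hold the working set, the fourth call would be skipped. That saved call is the only thing a larger maxSubscribedStreams buys.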

Lanes

By default, every reaction lives in a single implicit "default" lane: one DrainController runs the whole pipeline with one timing budget. That works until reactions diverge: a webhook delivery wants leaseMillis measured in tens of seconds, a best-effort notification wants short retries, and a long projection replay needs a generous claim budget. Tuning any one of them globally penalises the others.

.withLane({...}) declares an independent drain lane with its own controller, lease budget, claim limit, and cycle cadence. Reactions opt in via .to({lane}); reactions without an explicit lane stay in "default".

const app = act()
  .withState(Ticket)
  .withLane({ name: "webhooks", leaseMillis: 30_000, streamLimit: 5, cycleMs: 500 })
  .withLane({ name: "best-effort", leaseMillis: 1_000, streamLimit: 20, cycleMs: 50 })
  .on("OrderConfirmed")
  .do(deliverWebhook)
  .to({ target: "webhooks-out", lane: "webhooks" })
  .on("OrderConfirmed")
  .do(emitMetric)
  .to({ target: "metrics-out", lane: "best-effort" })
  .build();

LaneConfig fields

  • name: the lane identifier. "default" is reserved for the implicit lane; declaring it explicitly throws.
  • leaseMillis: lease window for claim() calls in this lane. Size it to the longest expected handler invocation in the lane, plus headroom.
  • streamLimit: max streams claimed per cycle. Bounds the parallel-handler dispatch budget for the lane.
  • cycleMs: when set, auto-starts a per-lane setTimeout chain that calls the controller's drain() at this cadence. The timer is unref()'d so it doesn't keep the process alive; app.shutdown() clears it. When omitted, the lane drains alongside the Act-level settle() loop.

Each declared lane field overrides caller-passed DrainOptions at drain time; withLane({leaseMillis: 30_000}) would be meaningless if drain({leaseMillis: 1_000}) could erase it. Caller options only apply when the lane is silent on the field.
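
The precedence rule can be sketched as a small merge function. The shapes below are modeled on the field names in this section. resolveDrainOptions is a hypothetical helper, and streamLimit being part of DrainOptions is an assumption.

```typescript
// Hypothetical shapes modeled on the fields named in this section.
type DrainOptions = { leaseMillis?: number; streamLimit?: number };
type LaneConfig = {
  name: string;
  leaseMillis?: number;
  streamLimit?: number;
  cycleMs?: number;
};

// Lane-declared fields win; caller options only fill the gaps.
function resolveDrainOptions(lane: LaneConfig, caller: DrainOptions): DrainOptions {
  return {
    leaseMillis: lane.leaseMillis ?? caller.leaseMillis,
    streamLimit: lane.streamLimit ?? caller.streamLimit,
  };
}

const resolved = resolveDrainOptions(
  { name: "webhooks", leaseMillis: 30_000 }, // lane declares only leaseMillis
  { leaseMillis: 1_000, streamLimit: 10 } // caller passes both
);
console.log(resolved.leaseMillis, resolved.streamLimit); // 30000 10
```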

Type-safe lane references

The builder threads declared lane names into its TLanes generic. .to({lane: "..."}) and ActOptions.onlyLanes are narrowed to that union at the call site, so typos fail to compile:

const app = act()
  .withState(Ticket)
  .withLane({ name: "webhooks" })
  .on("OrderConfirmed")
  .do(deliverWebhook)
  // @ts-expect-error "wbhooks" is not a declared lane
  .to({ target: "out", lane: "wbhooks" })
  .build({
    // @ts-expect-error same, caught at the options site too
    onlyLanes: ["wbhooks"],
  });
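
For readers curious how a builder can accumulate lane names in a type parameter, here is a stripped-down sketch of the technique. LaneBuilder is hypothetical and far simpler than Act's real builder, which carries more generics.

```typescript
// Hypothetical mini-builder illustrating the generic-accumulation technique.
class LaneBuilder<TLanes extends string = "default"> {
  readonly lanes: string[];

  constructor(lanes: string[] = ["default"]) {
    this.lanes = lanes;
  }

  // Each declaration widens the union of known lane names.
  withLane<TName extends string>(config: { name: TName }): LaneBuilder<TLanes | TName> {
    return new LaneBuilder<TLanes | TName>(this.lanes.concat(config.name));
  }

  // `lane` only accepts names declared earlier in the chain.
  to(route: { target: string; lane?: TLanes }): string {
    return route.lane ?? "default";
  }
}

const builder = new LaneBuilder().withLane({ name: "webhooks" });
const lane = builder.to({ target: "out", lane: "webhooks" }); // compiles
// builder.to({ target: "out", lane: "wbhooks" });            // would be a compile error
console.log(lane); // webhooks
```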

Slices declare their own lanes via the same .withLane(...) method; act().withSlice(slice) merges the slice's lanes into the Act's set. Conflicting timing configs (same lane name, different leaseMillis/streamLimit/cycleMs between the slice and the Act) throw at composition time, so pick one declaration.

Re-laning at restart

subscribe() UPSERTs each stream's lane on every call. If you change a target's lane in the builder and restart, the store rewrites the persisted lane on the next correlate(). Online re-laning (changing a stream's lane while workers hold leases) is not supported; the safe trigger is a process restart.

Conflicting lane assignments

Two reactions routing to the same (target, source) stream must declare the same lane. Lanes have no ordering, so there's no max() merge analogous to priority; the build-time scan throws on disagreement:

// throws at act().build()
act()
  .withState(Ticket)
  .withLane({ name: "slow" }).withLane({ name: "fast" })
  .on("OrderConfirmed").do(handlerA).to({ target: "shared", lane: "slow" })
  .on("OrderConfirmed").do(handlerB).to({ target: "shared", lane: "fast" })
  .build();
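
A build-time scan of this kind can be approximated as follows. This is a sketch under assumptions: Route and assertConsistentLanes are hypothetical names, and the real check may key streams differently.

```typescript
// Hypothetical route record: one entry per reaction's .to(...) result.
type Route = { target: string; source?: string; lane?: string };

// Throws when two routes to the same (target, source) stream disagree on the lane.
function assertConsistentLanes(routes: Route[]): void {
  const laneByStream = new Map<string, string>();
  for (const route of routes) {
    const key = `${route.target}|${route.source ?? ""}`;
    const lane = route.lane ?? "default";
    const previous = laneByStream.get(key);
    if (previous !== undefined && previous !== lane) {
      throw new Error(`"${route.target}" is routed to both "${previous}" and "${lane}"`);
    }
    laneByStream.set(key, lane);
  }
}

let conflict = "";
try {
  assertConsistentLanes([
    { target: "shared", lane: "slow" },
    { target: "shared", lane: "fast" },
  ]);
} catch (err) {
  conflict = err instanceof Error ? err.message : String(err);
}
console.log(conflict); // "shared" is routed to both "slow" and "fast"
```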

onlyLanes: process-per-lane deployment

ActOptions.onlyLanes restricts which lanes' controllers boot in this process. With onlyLanes: ["webhooks"], only the webhook controller runs; other declared lanes are silent. Workers in different processes coordinate via the store's SKIP LOCKED semantics, so the same image can be deployed as one-process-per-lane without code changes.

This is an escape hatch, not the primary path. A single process with multiple declared lanes already gets fast-lane responsiveness: Act._drainAll runs every controller's drain in parallel, so a slow lane's in-flight handler doesn't block a fast lane's claim. onlyLanes is for the cases where you want hardware isolation (different CPU/memory per lane) on top of that.
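
The selection rule itself is simple and can be sketched as a filter over the declared lanes. lanesToBoot is a hypothetical helper used only to illustrate the default-versus-subset behavior; it is not part of the library.

```typescript
// Hypothetical helper: which lane controllers would start in this process.
function lanesToBoot(declared: string[], onlyLanes?: string[]): string[] {
  // No restriction given: every declared lane runs here.
  if (onlyLanes === undefined) return declared;
  const allowed = new Set(onlyLanes);
  return declared.filter((lane) => allowed.has(lane));
}

const declaredLanes = ["default", "webhooks", "best-effort"];
console.log(lanesToBoot(declaredLanes).length); // 3
console.log(lanesToBoot(declaredLanes, ["webhooks"]).join(",")); // webhooks
```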

Port/Adapter Pattern

Infrastructure concerns (logging, storage, caching) use singleton adapters injected via port functions. All three ports follow the same pattern (first call wins, with a sensible default):

import { log, store, cache } from "@rotorsoft/act";

const logger = log(); // ConsoleLogger (default)
const s = store(); // InMemoryStore (default)
const c = cache(); // InMemoryCache (default)
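
The first-call-wins behavior can be modeled with a small factory. This is a sketch of the pattern only: port and SketchLogger are hypothetical names, and the library's own implementation may differ.

```typescript
// Hypothetical port factory: the first call fixes the adapter for the process.
function port<T>(makeDefault: () => T): (adapter?: T) => T {
  let instance: T | undefined;
  return (adapter?: T): T => {
    // Only the first call can choose the adapter; later arguments are ignored.
    if (instance === undefined) instance = adapter ?? makeDefault();
    return instance;
  };
}

type SketchLogger = { name: string };
const logPort = port<SketchLogger>(() => ({ name: "console" }));

const first = logPort({ name: "pino" }); // first call wins and injects the adapter
const second = logPort({ name: "other" }); // ignored: an adapter is already set
console.log(first.name, second.name); // pino pino
```

The practical consequence is ordering: inject a custom adapter during startup, before any code calls the port with no arguments.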

Logger

The default ConsoleLogger emits JSON lines in production (compatible with GCP, AWS CloudWatch, Datadog) and colorized output in development, with zero dependencies.

import { log } from "@rotorsoft/act";

const logger = log();
logger.info("Application started");

For pino, inject the adapter from @rotorsoft/act-pino:

import { log } from "@rotorsoft/act";
import { PinoLogger } from "@rotorsoft/act-pino";

log(new PinoLogger({ level: "debug", pretty: true }));

The Logger interface is minimal and compatible with pino, winston, bunyan, and other popular loggers:

interface Logger extends Disposable {
  level: string;
  fatal(obj: unknown, msg?: string): void;
  error(obj: unknown, msg?: string): void;
  warn(obj: unknown, msg?: string): void;
  info(obj: unknown, msg?: string): void;
  debug(obj: unknown, msg?: string): void;
  trace(obj: unknown, msg?: string): void;
  child(bindings: Record<string, unknown>): Logger;
}
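
As an illustration of how small a custom adapter can be, the sketch below implements the interface with an in-memory buffer, which can be handy in tests. MemoryLogger is hypothetical. The ActDisposable shape (a dispose() returning a promise) is an assumption about the library's Disposable type, and level filtering is omitted.

```typescript
// Assumed shape of the library's Disposable; renamed to avoid the TypeScript global.
interface ActDisposable {
  dispose(): Promise<void>;
}

interface Logger extends ActDisposable {
  level: string;
  fatal(obj: unknown, msg?: string): void;
  error(obj: unknown, msg?: string): void;
  warn(obj: unknown, msg?: string): void;
  info(obj: unknown, msg?: string): void;
  debug(obj: unknown, msg?: string): void;
  trace(obj: unknown, msg?: string): void;
  child(bindings: Record<string, unknown>): Logger;
}

// Hypothetical adapter that buffers JSON lines instead of printing them.
class MemoryLogger implements Logger {
  level = "info"; // stored but not used for filtering in this sketch
  readonly lines: string[];
  private readonly bindings: Record<string, unknown>;

  constructor(lines: string[] = [], bindings: Record<string, unknown> = {}) {
    this.lines = lines;
    this.bindings = bindings;
  }

  private write(level: string, obj: unknown, msg?: string): void {
    // Accept both logger.info("text") and logger.info({ fields }, "text").
    const fields = typeof obj === "string" ? { msg: obj } : { obj, msg };
    this.lines.push(JSON.stringify({ level, ...this.bindings, ...fields }));
  }

  fatal(obj: unknown, msg?: string): void { this.write("fatal", obj, msg); }
  error(obj: unknown, msg?: string): void { this.write("error", obj, msg); }
  warn(obj: unknown, msg?: string): void { this.write("warn", obj, msg); }
  info(obj: unknown, msg?: string): void { this.write("info", obj, msg); }
  debug(obj: unknown, msg?: string): void { this.write("debug", obj, msg); }
  trace(obj: unknown, msg?: string): void { this.write("trace", obj, msg); }

  child(bindings: Record<string, unknown>): Logger {
    // Children share the parent's buffer and add their own bindings.
    return new MemoryLogger(this.lines, { ...this.bindings, ...bindings });
  }

  async dispose(): Promise<void> {
    this.lines.length = 0;
  }
}

const memoryLogger = new MemoryLogger();
memoryLogger.info("Application started");
memoryLogger.child({ component: "db" }).warn({ ms: 120 }, "slow query");
console.log(memoryLogger.lines.length); // 2
```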

Store

import { store } from "@rotorsoft/act";
import { PostgresStore } from "@rotorsoft/act-pg";
import { SqliteStore } from "@rotorsoft/act-sqlite";

// The options below are alternatives, not a sequence:
// the first store() call wins, so use only one per process.

// Development: in-memory (default)
const s = store();

// Production: inject PostgreSQL
store(new PostgresStore({
  host: "localhost",
  database: "myapp",
  user: "postgres",
  password: "secret",
  schema: "public",
  table: "events",
}));

// Embedded / single-node: SQLite via libSQL
store(new SqliteStore({ url: "file:myapp.db" }));

Cache

Cache is always-on with InMemoryCache (LRU, maxSize 1000) as the default:

import { cache } from "@rotorsoft/act";

// Default: InMemoryCache, no setup needed
// For distributed deployments, inject your own Cache adapter
// (RedisCache is not imported in this snippet):
cache(new RedisCache({ url: "redis://localhost:6379" }));

The Cache interface is async for forward-compatibility with external caches:

interface Cache extends Disposable {
  get<TState>(stream: string): Promise<CacheEntry<TState> | undefined>;
  set<TState>(stream: string, entry: CacheEntry<TState>): Promise<void>;
  invalidate(stream: string): Promise<void>;
  clear(): Promise<void>;
}

Resource Disposal

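All adapters (logger, store, cache) and any custom disposers are cleaned up via dispose()(). Calling dispose(fn) registers a disposer; calling dispose() with no argument returns the cleanup function, which the second pair of parentheses invokes: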

import { dispose } from "@rotorsoft/act";

// Register custom cleanup
dispose(async () => {
  await redis.quit();
});

// Trigger cleanup (graceful shutdown or test teardown)
await dispose()();
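
The double-call shape is easier to follow with a toy registry. In this sketch, disposePort and Disposer are hypothetical names, and running disposers in registration order is an assumption. The library may order them differently and also disposes its own adapters.

```typescript
// Hypothetical disposer registry mimicking the dispose(fn) / dispose()() shape.
type Disposer = () => Promise<void>;
const disposers: Disposer[] = [];

// With an argument: register it. In both cases: return the run-everything function.
function disposePort(disposer?: Disposer): () => Promise<void> {
  if (disposer) disposers.push(disposer);
  return async () => {
    // Registration order is an assumption made for this sketch.
    for (const d of disposers) await d();
    disposers.length = 0;
  };
}

const closed: string[] = [];
disposePort(async () => { closed.push("redis"); });
disposePort(async () => { closed.push("metrics"); });

// disposePort() returns the cleanup function; the second () runs it.
const cleanup = disposePort()().then(() => closed.join(","));
cleanup.then((result) => console.log(result)); // redis,metrics
```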

Custom Store Implementation

Implement the Store interface for custom backends:

interface Store extends Disposable {
  seed(): Promise<void>;
  drop(): Promise<void>;
  commit(stream, msgs, meta, expectedVersion?): Promise<Committed[]>;
  query(callback, filter?): Promise<number>;
  claim(lagging, leading, by, millis, lane?): Promise<Lease[]>;
  subscribe(streams): Promise<{ subscribed: number; watermark: number }>;
  ack(leases): Promise<Lease[]>;
  block(leases): Promise<(Lease & { error })[]>;
  reset(streams): Promise<number>;
  truncate(targets): Promise<TruncateResult>;
  query_streams(callback, query?): Promise<{ maxEventId: number; count: number }>;
  dispose(): Promise<void>;
}

claim() atomically discovers and locks streams for processing using PostgreSQL's FOR UPDATE SKIP LOCKED pattern: zero-contention competing consumers where workers never block each other. subscribe() registers new streams for reaction processing and returns the count of newly registered streams (subscribed) along with a watermark.

query_streams() is read-only introspection over subscription positions. Operational dashboards (projection lag, blocked subscriptions) can use it without opening a second connection or running raw SQL against the adapter-specific streams table.

Version-based optimistic concurrency must be implemented correctly: a commit whose expectedVersion does not match the stream's current version must be rejected rather than appended. See the PostgresStore source for a production-grade reference.
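
The optimistic-concurrency check at the heart of commit() can be sketched with an in-memory map. This is a deliberately simplified, synchronous model. The 0-based versioning (with -1 for an empty stream), the SketchCommitted shape, and commitSketch are all assumptions, and a real Store must perform the check and the append atomically.

```typescript
// Hypothetical committed-event record; the real Committed type carries more fields.
type SketchCommitted = { stream: string; name: string; version: number };

const streams = new Map<string, SketchCommitted[]>();

// Appends events only if the caller's expected version matches the stream's current one.
function commitSketch(stream: string, names: string[], expectedVersion?: number): SketchCommitted[] {
  const existing = streams.get(stream) ?? [];
  const currentVersion = existing.length - 1; // -1 means the stream is empty (assumed convention)
  if (expectedVersion !== undefined && expectedVersion !== currentVersion) {
    throw new Error(`expected version ${expectedVersion}, found ${currentVersion}`);
  }
  const committed = names.map((name, i) => ({
    stream,
    name,
    version: currentVersion + 1 + i,
  }));
  streams.set(stream, existing.concat(committed));
  return committed;
}

commitSketch("counter-1", ["Incremented"]); // becomes version 0
commitSketch("counter-1", ["Incremented"], 0); // writer saw version 0, becomes version 1

let rejected = false;
try {
  commitSketch("counter-1", ["Incremented"], 0); // stale: the stream is now at version 1
} catch {
  rejected = true;
}
console.log(rejected); // true
```

The rejected writer is expected to reload the stream and retry, which is what keeps concurrent writers from silently overwriting each other.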