Builders & Adapters

Tips for Builders & Adapters

  • Use StateBuilder to define state machines, actions, events, and validation logic in a fluent, type-safe way.
  • Use ActBuilder to compose applications from multiple state machines and reactions.
  • Adapters allow you to swap event stores (e.g., in-memory, Postgres, custom) without changing business logic.
  • Use reactions to decouple workflows and integrate with external systems.
  • Visualize your application architecture to clarify how builders and adapters interact.

Example 1: Composing State Machines with Builders

Scenario: You want to define two state machines (e.g., a Counter and a TodoList) and compose them into a single application using ActBuilder.

import { act, state, z } from "@rotorsoft/act";

// Counter: state shape, initial value, events, and one action
const Counter = state("Counter", z.object({ count: z.number() }))
  .init(() => ({ count: 0 }))
  .emits({ Incremented: z.object({ amount: z.number() }) })
  .patch({
    // how an Incremented event updates the state
    Incremented: (event, state) => ({ count: state.count + event.amount }),
  })
  .on("increment", z.object({ by: z.number() }))
  .emit((action) => ["Incremented", { amount: action.by }])
  .build();

// TodoList: same builder steps, different state and event
const TodoList = state("TodoList", z.object({ todos: z.array(z.string()) }))
  .init(() => ({ todos: [] }))
  .emits({ Added: z.object({ todo: z.string() }) })
  .patch({
    Added: (event, state) => ({ todos: [...state.todos, event.todo] }),
  })
  .on("add", z.object({ todo: z.string() }))
  .emit((action) => ["Added", { todo: action.todo }])
  .build();

// Compose both state machines into one application
const app = act().with(Counter).with(TodoList).build();
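
To make the role of the patch handlers more concrete, here is a library-free sketch of the fold an event-sourced state machine performs when it rebuilds state from its events. It does not use @rotorsoft/act; the names CounterState, applyIncremented, and replay are hypothetical, and the flat event shape simply mirrors the example above.

```typescript
// Library-free sketch: NOT the @rotorsoft/act API. All names are hypothetical.
type CounterState = { count: number };
type Incremented = { name: "Incremented"; amount: number };

// A patch handler returns only the fields that change.
const applyIncremented = (
  event: Incremented,
  state: CounterState
): Partial<CounterState> => ({ count: state.count + event.amount });

// Replaying a stream folds every event into the initial state, in order.
function replay(events: Incremented[], init: CounterState): CounterState {
  return events.reduce(
    (state, event) => ({ ...state, ...applyIncremented(event, state) }),
    init
  );
}

const finalState = replay(
  [
    { name: "Incremented", amount: 2 },
    { name: "Incremented", amount: 3 },
  ],
  { count: 0 }
);
```

Because state is derived only from the event sequence, the same events always produce the same state, which is what lets the storage layer be swapped without touching this logic.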

Example 2: Using a Postgres or Custom Store Adapter

Scenario: You want to use a Postgres event store in production and an in-memory store for testing, without changing your application logic. You inject the adapter using the store() function before any event processing.

import { store } from "@rotorsoft/act";
import { PostgresStore } from "@rotorsoft/act-pg";

store(
  new PostgresStore({
    host: process.env.PGHOST || "localhost",
    port: Number(process.env.PGPORT) || 5432,
    database: process.env.PGDATABASE || "postgres",
    user: process.env.PGUSER || "postgres",
    password: process.env.PGPASSWORD || "postgres",
    schema: "public", // or your custom schema
    table: "events", // or your custom table
  })
);
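
The environment fallbacks above can be pulled into a small helper so they are easy to unit test. The sketch below is one way to do that; PgConfig and pgConfigFromEnv are hypothetical names, and the field list simply copies the options shown in the example rather than any documented type.

```typescript
// Hypothetical helper; the option names are copied from the example above.
type PgConfig = {
  host: string;
  port: number;
  database: string;
  user: string;
  password: string;
  schema: string;
  table: string;
};

function pgConfigFromEnv(env: Record<string, string | undefined>): PgConfig {
  return {
    host: env["PGHOST"] || "localhost",
    // Number(undefined) and Number("abc") are NaN, which is falsy,
    // so a missing or malformed PGPORT falls back to 5432 (as does "0").
    port: Number(env["PGPORT"]) || 5432,
    database: env["PGDATABASE"] || "postgres",
    user: env["PGUSER"] || "postgres",
    password: env["PGPASSWORD"] || "postgres",
    schema: "public",
    table: "events",
  };
}

const defaults = pgConfigFromEnv({});
const custom = pgConfigFromEnv({ PGHOST: "db.internal", PGPORT: "6543" });
```

In an application, the result of pgConfigFromEnv(process.env) would presumably be passed to the PostgresStore constructor in place of the inline object.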

How to Build a Custom Store Adapter

Scenario: You want to connect your application to a custom event store (e.g., a cloud database). Implement the Store interface and inject your adapter using the store() function.

import { store } from "@rotorsoft/act";
import type {
  Store,
  EventMeta,
  Message,
  Committed,
  Schemas,
} from "@rotorsoft/act";

export class MyCustomStore implements Store {
  async seed() {
    /* ... */
  }
  async drop() {
    /* ... */
  }
  async dispose() {
    /* ... */
  }
  async commit<E extends Schemas>(
    stream: string,
    msgs: Message<E, keyof E>[],
    meta: EventMeta,
    expectedVersion?: number
  ): Promise<Committed<E, keyof E>[]> {
    // Your persistence logic here
    return [];
  }
  async query<E extends Schemas>(
    callback: (event: Committed<E, keyof E>) => void,
    query?: any
  ): Promise<number> {
    // Your query logic here
    return 0;
  }
  async fetch<E extends Schemas>(limit: number) {
    /* ... */ return { streams: [], events: [] };
  }
  async lease(leases: any[]) {
    /* ... */ return [];
  }
  async ack(leases: any[]) {
    /* ... */
  }
}

// Inject your custom store
store(new MyCustomStore());
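
As a rough illustration of what the commit and query bodies might do, here is a minimal in-memory sketch with an optimistic concurrency check on expectedVersion. It is deliberately synchronous and uses simplified, hypothetical types (Msg, CommittedEvent, TinyMemoryStore) instead of the real Store interface. Treating an empty stream as version -1 is an assumption made for this sketch.

```typescript
// Simplified, hypothetical shapes; the real Store interface is richer and async.
type Msg = { name: string; data: unknown };
type CommittedEvent = Msg & { id: number; stream: string; version: number };

class TinyMemoryStore {
  private events: CommittedEvent[] = [];

  commit(stream: string, msgs: Msg[], expectedVersion?: number): CommittedEvent[] {
    // Assumption: versions start at 0, so an empty stream is at version -1.
    const current = this.events.filter((e) => e.stream === stream).length - 1;
    if (expectedVersion !== undefined && expectedVersion !== current) {
      throw new Error(
        `Concurrency error on ${stream}: expected ${expectedVersion}, found ${current}`
      );
    }
    const base = this.events.length;
    const committed = msgs.map((m, i) => ({
      ...m,
      id: base + i,
      stream,
      version: current + 1 + i,
    }));
    this.events.push(...committed);
    return committed;
  }

  // Calls back for every matching event and returns how many matched.
  query(callback: (e: CommittedEvent) => void, stream?: string): number {
    const hits = this.events.filter((e) => stream === undefined || e.stream === stream);
    hits.forEach(callback);
    return hits.length;
  }
}
```

A writer that loaded the stream at version 1 would pass expectedVersion = 1; if another writer committed in the meantime, the check fails and the caller can reload and retry.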

Advanced: Lease Management & Reliable Event Stream Processing

Building a robust custom store adapter involves more than persisting events. The adapter must also support distributed, reliable, and efficient event stream processing, and lease management is at the core of that.

What is a Lease?

A lease is a temporary claim on a stream for processing. It ensures that only one process (or worker) is handling a given event stream at a time, supporting distributed, parallel, and fault-tolerant event processing.
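
The idea can be sketched in a few lines with an in-memory map. This is only a model of the concept: Lease, leases, and tryLease are hypothetical names, and the clock is passed in as a parameter so that expiry is easy to reason about.

```typescript
// Hypothetical in-memory model of a lease; not the library's data structure.
type Lease = { stream: string; by: string; until: number };

const leases = new Map<string, Lease>();

// Returns true if `worker` holds the lease on `stream` after the call.
function tryLease(stream: string, worker: string, now: number, ttlMs: number): boolean {
  const held = leases.get(stream);
  // A live lease held by someone else blocks the claim.
  if (held && held.until > now && held.by !== worker) return false;
  // Otherwise the stream is free, expired, or already ours: (re)claim it.
  leases.set(stream, { stream, by: worker, until: now + ttlMs });
  return true;
}
```

A second worker asking for the same stream is refused until the first lease expires, which is how a crashed worker's streams eventually become available again.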

Key Methods

  • fetch(limit): Returns a batch of streams and events that are ready for processing (not blocked or already leased).
  • lease(leases): Attempts to acquire leases on the given streams. Should atomically mark streams as leased (with a timeout/expiry).
  • ack(leases): Acknowledges processing is complete, updates stream positions, and releases the lease.
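
How the three methods fit together can be sketched as a fetch, lease, process, ack cycle over an in-memory table. Everything here is a simplified assumption: StreamRow, StoredEvent, and LeaseTable are hypothetical, the signatures are not those of the Store interface, and a real adapter would perform each step atomically in the database.

```typescript
// Hypothetical in-memory model of the cycle; not the Store interface.
type StreamRow = {
  stream: string;
  at: number; // id of the last acknowledged event, -1 if none
  blocked: boolean;
  leasedBy: string | null;
  leasedUntil: number; // epoch ms; 0 means "not leased"
};
type StoredEvent = { id: number; stream: string };

class LeaseTable {
  private rows: StreamRow[];
  private events: StoredEvent[];
  constructor(rows: StreamRow[], events: StoredEvent[]) {
    this.rows = rows;
    this.events = events;
  }

  private free(row: StreamRow, now: number): boolean {
    return !row.blocked && row.leasedUntil <= now;
  }

  // Streams that are free and have events past their position, plus those events.
  fetch(limit: number, now: number): { streams: string[]; events: StoredEvent[] } {
    const pending = (r: StreamRow) =>
      this.events.filter((e) => e.stream === r.stream && e.id > r.at);
    const ready = this.rows
      .filter((r) => this.free(r, now) && pending(r).length > 0)
      .slice(0, limit);
    const events = ready.map(pending).reduce((a, b) => a.concat(b), [] as StoredEvent[]);
    return { streams: ready.map((r) => r.stream), events };
  }

  // Grants a lease only on streams that are still free; returns the granted names.
  lease(streams: string[], by: string, now: number, ttlMs: number): string[] {
    const granted: string[] = [];
    for (const name of streams) {
      const row = this.rows.find((r) => r.stream === name);
      if (row && this.free(row, now)) {
        row.leasedBy = by;
        row.leasedUntil = now + ttlMs;
        granted.push(name);
      }
    }
    return granted;
  }

  // Advances the position and releases the lease, but only for the lease holder.
  ack(stream: string, by: string, at: number): boolean {
    const row = this.rows.find((r) => r.stream === stream);
    if (!row || row.leasedBy !== by) return false;
    row.at = at;
    row.leasedBy = null;
    row.leasedUntil = 0;
    return true;
  }
}
```

A worker would call fetch, try to lease what came back, process only the streams it was granted, and ack each one with the id of the last event it handled.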

Challenges

  • Atomicity: Leasing and updating stream positions must be atomic to avoid race conditions.
  • Timeouts: Leases must expire if a worker crashes or hangs, so other workers can take over.
  • Retries: Support for retrying failed streams, with backoff or retry counters.
  • Blocking: Ability to block streams that are in an error state or require manual intervention.
  • Scalability: Efficiently handle many streams and high event throughput.
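
The retry and blocking challenges can be illustrated with two small pure functions. This is a sketch under assumed policy choices (exponential backoff with a cap, blocking once a retry limit is exceeded); RetryRow, backoffMs, and recordFailure are hypothetical names, and the default values are placeholders.

```typescript
// Hypothetical retry policy; the numbers are placeholders, not recommendations.
type RetryRow = { stream: string; retry: number; blocked: boolean };

// Exponential backoff with a cap: baseMs * 2^retry, never more than capMs.
function backoffMs(retry: number, baseMs = 100, capMs = 30000): number {
  return Math.min(capMs, baseMs * Math.pow(2, retry));
}

// Counts a failure and blocks the stream once maxRetries is exceeded.
function recordFailure(row: RetryRow, maxRetries: number): RetryRow {
  const retry = row.retry + 1;
  return { ...row, retry, blocked: retry > maxRetries };
}

let row: RetryRow = { stream: "orders-1", retry: 0, blocked: false };
row = recordFailure(row, 2); // retry 1, still eligible
row = recordFailure(row, 2); // retry 2, still eligible
row = recordFailure(row, 2); // retry 3 exceeds the limit, so the stream is blocked
```

A blocked stream is skipped until someone intervenes, so a single poison event does not stall the rest of the system.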

Best Practices

  • Use transactions for all lease and commit operations.
  • Index your stream and lease tables for fast lookups and updates.
  • Handle lease expiry: If a lease is not acknowledged in time, it should become available for other workers.
  • Support retries and backoff for failed processing.
  • Log and monitor lease acquisition, expiry, and errors for observability.

Sample Lease Table Schema (for SQL-based stores)

CREATE TABLE streams (
  stream VARCHAR PRIMARY KEY,
  at INT NOT NULL DEFAULT -1,
  retry SMALLINT NOT NULL DEFAULT 0,
  blocked BOOLEAN NOT NULL DEFAULT FALSE,
  leased_by UUID,
  leased_at INT,
  leased_until TIMESTAMPTZ
);
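
Against a table like this, an atomic lease could be expressed as a single UPDATE whose subquery uses PostgreSQL's FOR UPDATE SKIP LOCKED, so concurrent workers skip rows that another transaction is already claiming. The sketch below only builds the statement and its parameters. buildLeaseQuery is a hypothetical helper, it is not taken from PostgresStore, and it leaves leased_at untouched because the source does not say what that column holds.

```typescript
// Sketch only: builds a parameterized statement for the sample `streams` table.
// The function name and the { text, values } return shape are assumptions.
type LeaseQuery = { text: string; values: [string, number, string[]] };

function buildLeaseQuery(workerId: string, ttlMs: number, streams: string[]): LeaseQuery {
  const text = `
    UPDATE streams
       SET leased_by = $1::uuid,
           leased_until = NOW() + ($2::int * INTERVAL '1 millisecond')
     WHERE stream IN (
             SELECT stream
               FROM streams
              WHERE stream = ANY($3::text[])
                AND blocked = FALSE
                AND (leased_until IS NULL OR leased_until < NOW())
                FOR UPDATE SKIP LOCKED
           )
    RETURNING stream`;
  return { text, values: [workerId, ttlMs, streams] };
}

const leaseQuery = buildLeaseQuery(
  "7f9c2b1e-4d3a-4c8e-9b1a-2f6d5e4c3b2a", // example worker id
  30000,
  ["orders-1", "orders-2"]
);
```

The RETURNING clause reports which streams were actually granted, so the worker can ignore any that another worker claimed first.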

Summary Table

| Method | Purpose | Complexity/Notes |
|--------|---------|------------------|
| fetch | Find streams/events ready for processing | Must filter out blocked/leased streams |
| lease | Atomically acquire leases on streams | Use transactions/locking, set expiry |
| ack | Release lease, update stream position | Must be atomic, handle errors and retries |

Note: Lease management is critical for correctness and reliability in distributed event-driven systems. Study the PostgresStore for a production-grade reference implementation.