Skip to main content

Error Handling

Act defines three primary error types. Each signals a different class of problem with a distinct resolution strategy.

ValidationError

Thrown when an action or event payload fails Zod schema validation.

import { ValidationError } from "@rotorsoft/act";

try {
await app.do("createUser", target, { email: 123 }); // wrong type
} catch (error) {
if (error instanceof ValidationError) {
console.error("Invalid payload:", error.details);
}
}

Resolution: Fix the payload to match the schema. This is always a caller error.

InvariantError

Thrown when a business rule defined via .given() is violated before events are emitted.

import { InvariantError } from "@rotorsoft/act";

try {
await app.do("CloseTicket", target, { reason: "Done" });
} catch (error) {
if (error instanceof InvariantError) {
console.error("Rule violated:", error.description);
console.error("Current state:", error.snapshot.state);
}
}

Resolution: Check preconditions before dispatching, or handle gracefully in the UI. The state was not modified.

ConcurrencyError

Thrown when optimistic concurrency control detects a conflict — another process committed events to the same stream between your load() and commit().

import { ConcurrencyError } from "@rotorsoft/act";

try {
await app.do("increment", target, { by: 1 });
} catch (error) {
if (error instanceof ConcurrencyError) {
console.error(`Stream ${error.stream}: expected v${error.expectedVersion}, found v${error.version}`);
}
}

Resolution: Retry with fresh state. The cache is invalidated automatically on concurrency errors.

Retry Pattern

async function withRetry(action, target, payload, maxRetries = 3) {
for (let attempt = 0; attempt <= maxRetries; attempt++) {
try {
return await app.do(action, target, payload);
} catch (error) {
if (error instanceof ConcurrencyError && attempt < maxRetries) {
continue; // cache was invalidated, next load() gets fresh state
}
throw error;
}
}
}

Error Constants

For string-based error matching (e.g., in tRPC error handlers):

import { Errors } from "@rotorsoft/act";

// Errors.ValidationError = "ERR_VALIDATION"
// Errors.InvariantError = "ERR_INVARIANT"
// Errors.ConcurrencyError = "ERR_CONCURRENCY"

Production Error Handling

import { Errors } from "@rotorsoft/act";

// tRPC mutation
CreateItem: authedProcedure
.input(z.object({ name: z.string() }))
.mutation(async ({ input, ctx }) => {
try {
const snaps = await app.do("CreateItem", { stream: id, actor: ctx.actor }, input);
app.settle();
return { success: true, id };
} catch (error) {
if (error.message === Errors.ValidationError) {
throw new TRPCError({ code: "BAD_REQUEST", message: "Invalid input" });
}
if (error.message === Errors.InvariantError) {
throw new TRPCError({ code: "PRECONDITION_FAILED", message: error.description });
}
if (error.message === Errors.ConcurrencyError) {
throw new TRPCError({ code: "CONFLICT", message: "Please retry" });
}
throw error;
}
}),

Blocked Streams

When a reaction handler fails repeatedly, the stream is blocked after exceeding maxRetries (default: 3). Blocked streams are excluded from poll() and won't be processed until manually unblocked.

Monitor blocked streams via the "blocked" lifecycle event:

app.on("blocked", (blocked) => {
blocked.forEach(({ stream, error, retry }) => {
console.error(`Stream ${stream} blocked after ${retry} retries: ${error}`);
// Alert, log to monitoring, etc.
});
});

Reaction options can be configured per handler:

.on("OrderPlaced")
.do(handler, { maxRetries: 5, blockOnError: true })
.to(resolver)