External integration patterns
Act apps that reach beyond their own process โ webhooks, downstream services, message buses โ face the same two questions every time: what owns delivery and how do we keep at-least-once from becoming actual duplication. This guide answers both, with two integration shapes and one safety contract that makes either of them work.
TL;DRโ
| You need | Use |
|---|---|
| POST events to one receiver, fast and idempotent | Inline delivery โ webhook directly in a reaction |
| Multiple downstream consumers, high fan-out, slow consumers | Forwarded delivery โ reaction publishes to a bus, downstream owns delivery |
| At-least-once that doesn't double-charge customers | Receiver-side idempotency with Idempotency-Key โ both shapes need this |
| To recover a blocked stream after fixing the bug | app.blocked_streams() โ fix โ app.unblock(input) (the recovery loop) |
The rest of this page expands each shape.
1. Inline delivery โ drain is the pipelineโ
The simplest shape: a reaction calls webhook(...) (from @rotorsoft/act-http/webhook), drain owns ordering and retries, and the failure paths fall back onto the framework's existing primitives.
import { slice } from "@rotorsoft/act";
import { webhook } from "@rotorsoft/act-http/webhook";
export const OrderWebhooksSlice = slice()
.withState(OrderOperations)
.on("OrderConfirmed")
.do(
webhook({
url: "https://api.example.com/webhooks/orders",
headers: (event) => ({ Authorization: "Bearer " + token(event) }),
body: (event) => ({ orderId: event.stream, total: event.data.total }),
timeoutMs: 2_000,
}),
{
maxRetries: 5,
backoff: { strategy: "exponential", baseMs: 200, maxMs: 30_000, jitter: true },
}
)
.to({ target: "order-webhooks-out" })
.build();
This is the wolfdesk pattern verbatim โ see packages/wolfdesk/src/ticket-webhooks.ts for the running example.
What the framework gives you, for freeโ
Once you wire a reaction this way, drain provides:
- Per-stream ordering. Events for
order-42-webhooks-outalways dispatch in event-id order; concurrent claims on the same stream are prevented byFOR UPDATE SKIP LOCKED(or the in-memory equivalent). - At-least-once delivery. A handler that throws gets re-claimed on the next drain cycle. The watermark only advances on successful ack.
- Retry pacing.
backoff(since ACT-601) holds the lease and skips dispatch until the configured delay elapses, so a flaky receiver gets paced instead of hammered. - Permanent-failure detection. 4xx responses from
webhookthrowNonRetryableWebhookError(aNonRetryableErrorsubclass), which the drain finalizer recognizes and blocks the stream on the first failed attempt โ no wasted retries on a malformed payload. See Non-retryable errors. - Dead letter. Streams blocked by
block()(whether frommaxRetriesexhaustion orNonRetryableError) stay out ofclaim()untilapp.unblock(input)clears them. See Recovering a blocked stream. - Cross-process competing consumers. N workers running the same Act against the same store compete via
claim(); only one wins per stream. No coordination work for the developer. See cross-process reactions.
When inline delivery is the right toolโ
| Property | Inline |
|---|---|
| Number of receivers | One per reaction; small fan-out |
| Receiver latency | Sub-second; well under leaseMillis |
| Volume | Modest โ drain handles it cycle-by-cycle |
| Ownership | You control sender and receiver |
| Receiver behavior | Idempotent (handles Idempotency-Key) |
The constraint that catches teams: timeoutMs must stay below leaseMillis with a safety margin. The drain lease is what stops competing workers from re-dispatching while your handler is in flight. If timeoutMs approaches or exceeds the lease, a slow receiver can hold the lease through expiry, at which point another worker will claim the stream and POST the same event in parallel. The downstream Idempotency-Key then becomes load-bearing โ if your receiver doesn't dedup, you'll deliver twice. The webhook helper README covers this constraint in detail.
The default lease is around 5 seconds. A 2-second timeoutMs leaves headroom for retry. A 10-second timeoutMs does not.
When inline is not the right toolโ
Three signals that say "stop, this needs a different shape":
- One reaction, many receivers. Every new receiver means another reaction, another claim, another lease. Drain wasn't designed to coordinate ten parallel webhooks against the same event โ that's the bus's job.
- Slow consumers. A receiver that takes 30 seconds will exceed any reasonable lease. Bumping
leaseMillisglobally slows recovery for every reaction in the system. Wrong knob. - Bursty fan-out. A 10,000-receiver broadcast inside drain holds 10,000 leases at once. Drain is for ordered, paced delivery โ bursts belong on a bus.
Each of those is a signal to read the next section.
2. Forwarded delivery โ the bus is the pipelineโ
When inline doesn't fit, the pattern is to publish events to a real message bus (Kafka, SQS, Redpanda, NATS, RabbitMQ) and let downstream consumers own delivery semantics. Act keeps its drain semantics for the publish step; the bus takes over from there.
import { slice } from "@rotorsoft/act";
import { SQSClient, SendMessageCommand } from "@aws-sdk/client-sqs";
const sqs = new SQSClient({ region: "us-east-1" });
const QUEUE_URL = process.env.ORDERS_QUEUE_URL!;
export const OrderForwardingSlice = slice()
.withState(OrderOperations)
.on("OrderConfirmed")
.do(
async function forwardToSQS(event) {
// SQS auto-dedups within a 5-minute window when MessageDeduplicationId
// is supplied. event.id is the right key โ stable, monotonic, unique.
await sqs.send(
new SendMessageCommand({
QueueUrl: QUEUE_URL,
MessageBody: JSON.stringify({
orderId: event.stream,
data: event.data,
committedAt: event.created,
}),
MessageDeduplicationId: String(event.id),
MessageGroupId: event.stream, // FIFO ordering per order
})
);
},
{
maxRetries: 5,
backoff: { strategy: "exponential", baseMs: 200, maxMs: 30_000, jitter: true },
}
)
.to({ target: "order-forwarded" })
.build();
This is ~20 lines. It's deliberately thin โ Act keeps the lease only as long as the sqs.send() round-trip, which is bounded and fast. The actual delivery to consumers happens after Act's lease is released; the bus owns that timeline.
The same shape works for Kafka (producer.send({ topic, key: event.stream, value })), Redpanda, NATS (nc.publish(subject, payload)), or any RPC publisher. The framework doesn't care which โ it only cares that the publish step is fast and idempotent.
What you give up, and what you gainโ
| Property | Inline | Forwarded |
|---|---|---|
| Receivers per reaction | One | Many (bus distributes) |
| Receiver latency tolerance | Below leaseMillis | Unbounded (bus buffers) |
| Multi-consumer | Add reactions | Add subscribers โ free |
| Downstream delivery semantics | Drain's | The bus's |
| Operational dependency | Receiver | Receiver + bus |
The trade is real: forwarded delivery adds a piece of infrastructure (the bus) that has to be operated, monitored, and budgeted. For one fast receiver, that's overkill. For five receivers that each have their own SLO, it's the right architecture.
Drain's role after forwardingโ
After the publish step succeeds, drain acks the lease and the watermark advances. The bus now owns:
- Multi-consumer fan-out. Each subscriber reads from their own offset; one slow subscriber doesn't block others.
- Durable subscription state. A subscriber that crashes resumes from its last offset on restart.
- Retry semantics for downstream receivers. Kafka consumers handle their own retry budgets; SQS visibility timeouts give consumers room to fail and re-receive.
Drain stays in charge of one thing: getting the event to the bus exactly as the event store has it. Per-stream ordering is preserved on the publish side via the resolver target (order-forwarded in the example above โ one drain stream per logical order grouping). Beyond that, the bus owns the world.
Anti-patterns to avoidโ
- Don't
await consumer.process()inside the reaction. If the reaction calls SQS+wait-for-consumer-ack, you've reintroduced the slow-consumer problem with extra infrastructure. The publish must be fire-and-forget at Act's level. - Don't skip
Idempotency-Key/MessageDeduplicationId. Drain's at-least-once semantics mean any handler can retry; the publish step is no exception. Without dedup at the bus level, a retried publish doubles the message. - Don't carry the entire event payload if the bus stores it. Some buses cap message size aggressively (SQS at 256KB, Kafka by config). Either keep events small or pass an event-id and let consumers fetch the full event back from the Act store.
3. Receiver-side idempotency contractโ
At-least-once is the floor Act gives you. To make it safe, the receiver has to dedup. This section is the contract that makes "at-least-once + idempotency" equivalent to "exactly-once" from the caller's perspective.
Why this mattersโ
Two scenarios where the same event reaches the receiver twice without any bug in either Act or the receiver:
- Slow downstream. Sender's
timeoutMsexpires before the response. Drain treats the request as failed, retries it. Receiver successfully processed both attempts. - Lost ack. Sender processes the response fine, but the network drops before drain commits the ack. Next drain cycle re-dispatches.
Both are normal โ they're not bugs to fix in the sender. The fix is on the receiver, and it has exactly one shape: dedup by a stable key.
Idempotency-Key derivationโ
webhook auto-derives Idempotency-Key: <event.id> for every request. event.id is the framework's per-event monotonic integer:
- Stable. The same event has the same id from commit forever; reading the same row back returns the same id.
- Unique. Across the entire store, no two events share an id.
- Monotonic. Higher id always means later commit. Useful for keeping a sliding-window dedup cache bounded.
For custom callers โ non-webhook reactions, queue forwarders, anything โ pass String(event.id) (or your bus's dedup key field) as the idempotency token. Don't derive from event content or hash the payload; collisions and changes both bite you.
Cache shapesโ
Three implementations of the dedup cache, ordered by deployment complexity:
In-memory bounded LRUโ
class IdempotencyCache {
private seen = new Map<string, number>(); // key โ expiresAt epoch
constructor(
private readonly ttlMs = 24 * 60 * 60 * 1000,
private readonly maxEntries = 100_000
) {}
/** Returns true if the key was unseen and is now recorded. */
recordIfFresh(key: string): boolean {
const now = Date.now();
this.gc(now);
if (this.seen.has(key)) return false;
this.seen.set(key, now + this.ttlMs);
if (this.seen.size > this.maxEntries) {
// Map iteration is insertion-ordered; oldest entry is at the head.
this.seen.delete(this.seen.keys().next().value!);
}
return true;
}
private gc(now: number) {
for (const [key, expiresAt] of this.seen) {
if (expiresAt > now) break; // Insertion order โ first non-expired ends the sweep.
this.seen.delete(key);
}
}
}
Use when: receiver is single-process, dedup window is short (under a day), keys fit in RAM.
Redis SETNX with TTLโ
async function recordIfFresh(key: string, ttlSeconds: number): Promise<boolean> {
// SET ... NX EX is atomic: returns "OK" only when the key didn't exist.
const result = await redis.set(`idem:${key}`, "1", "EX", ttlSeconds, "NX");
return result === "OK";
}
Use when: receiver is multi-process, dedup window is hours-to-days, Redis is already in the stack.
Postgres unique indexโ
CREATE TABLE idempotency_keys (
key TEXT PRIMARY KEY,
seen_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX idx_idempotency_seen_at ON idempotency_keys (seen_at);
-- Background job, runs hourly or daily:
DELETE FROM idempotency_keys WHERE seen_at < NOW() - INTERVAL '7 days';
async function recordIfFresh(key: string): Promise<boolean> {
try {
await db.query(`INSERT INTO idempotency_keys(key) VALUES ($1)`, [key]);
return true;
} catch (err) {
if (isUniqueViolation(err)) return false;
throw err;
}
}
Use when: receiver already has Postgres in its stack, dedup needs are durable (survive process restarts), TTL can be relaxed in favor of audit trail.
TTL sizingโ
The dedup window has one hard floor: it must be longer than the longest possible retry+backoff window from the sender.
For a reaction configured with maxRetries: 5 and backoff: { strategy: "exponential", baseMs: 200, maxMs: 30_000 }:
| Attempt | Wait before |
|---|---|
| 0 | 0 |
| 1 | 200ms |
| 2 | 400ms |
| 3 | 800ms |
| 4 | 1.6s |
| 5 | 3.2s (then block) |
Total: ~6 seconds. Add the per-attempt timeoutMs (say 2s ร 6 = 12s). Round up generously: a 24-hour TTL handles any reasonable retry window. Most apps land at 24hโ7d depending on whether they want the cache to survive incident review.
Don't undersize. If a key expires before the sender finishes retrying, dedup fails silently โ you'll see "exactly once" turn into "actually twice" in the data, not in any error log.
End-to-end example โ tRPC receiverโ
A small idempotent receiver, mirroring the packages/server tRPC setup. The middleware checks Idempotency-Key and short-circuits duplicates:
// packages/server/src/idempotency.ts
import { initTRPC, TRPCError } from "@trpc/server";
const cache = new IdempotencyCache();
const t = initTRPC.context<{ headers: Record<string, string> }>().create();
export const idempotent = t.procedure.use(({ ctx, next }) => {
const key = ctx.headers["idempotency-key"];
if (!key) {
throw new TRPCError({
code: "BAD_REQUEST",
message: "Missing Idempotency-Key header",
});
}
const fresh = cache.recordIfFresh(key);
return next({ ctx: { ...ctx, key, deduped: !fresh } });
});
A handler using the middleware returns success on both first-attempt and dedup-hit, distinguishing them only for telemetry:
export const webhookRouter = t.router({
orderConfirmed: idempotent
.input(OrderConfirmedSchema)
.mutation(async ({ input, ctx }) => {
if (ctx.deduped) {
return { status: "dedup-skipped", key: ctx.key };
}
await processOrder(input);
return { status: "processed", key: ctx.key };
}),
});
A runnable version of this lives at packages/server/src/webhook-receiver.ts โ point the wolfdesk webhook sender at it (WOLFDESK_ESCALATION_WEBHOOK=http://localhost:4001/escalations) and watch dedup work end-to-end.
4. Operational checklistโ
The integration is in production. Here's the day-2 surface.
Monitor blocked streamsโ
Both inline and forwarded paths can end up at the same place โ a stream blocked by block(). Wire the "blocked" lifecycle event into your alerting:
app.on("blocked", (blocked) => {
blocked.forEach(({ stream, error, retry }) => {
metrics.counter("act.streams.blocked").increment({ stream });
logger.error({ stream, error, retry }, "stream blocked");
});
});
act.streams.blocked should be a zero-floor counter โ any non-zero is a paging condition.
Discover what's blockedโ
app.blocked_streams() returns every currently-blocked stream position. The 90% case for "show me what's broken right now":
const blocked = await app.blocked_streams();
console.table(
blocked.map(({ stream, retry, error, at }) => ({ stream, at, retry, error }))
);
For pagination or source filters, drop to store().query_streams(callback, { blocked: true, ... }) directly.
Recover after fixing the root causeโ
app.unblock(input) clears the blocked flag and resumes from where the stream stopped โ not from event 0. Two forms:
// Single targeted recovery.
await app.unblock(["webhooks-out-customer-42"]);
// Bulk recovery โ every blocked stream in a family.
await app.unblock({ stream: "^webhooks-out-" });
// Post-incident: unblock everything that's blocked.
await app.unblock({});
Don't use app.reset() to recover. reset rebuilds from event 0 and would re-fire every historical webhook. Use it only when you're rebuilding a projection from scratch. See Recovering a blocked stream for the comparison table.
Distinguish error classes operationallyโ
When webhook is in the picture, the "blocked" event carries an error string that includes the response status. Two distinct operational meanings:
| Error class | Status | Operator action |
|---|---|---|
WebhookError (retryable) | 5xx, network, timeout | Receiver outage โ usually self-resolving via backoff. If the same stream blocks repeatedly, escalate to receiver team. |
NonRetryableWebhookError | 4xx | Sender bug or stale receiver contract โ fix the request shape, then app.unblock(stream) |
Greppable distinguisher: NonRetryableWebhookError shows as name: "NonRetryableWebhookError" in logs; WebhookError shows as name: "WebhookError".
Idempotency cache hygieneโ
A cache that fills up indefinitely defeats its purpose. Three checks for the receiver-side cache:
- TTL exceeds the sender's retry+backoff window. Recompute when you change
maxRetriesorbackoff.maxMs. - Cache size has a ceiling. In-memory LRU caps entries; Redis is bounded by maxmemory policy; Postgres needs a periodic
DELETE WHERE seen_at < NOW() - INTERVAL '...'job. - Cache metrics surface hit rate. A 0% hit rate means the cache is doing nothing (either dedup isn't needed or the key derivation is broken); a 100% hit rate means every request is a duplicate (sender is misconfigured).
When to migrate from inline to forwardedโ
Three signals that say "this was inline; it shouldn't be anymore":
- "We keep adding receivers." Every new receiver becomes another reaction. The reaction count is growing faster than the event count. The bus already exists conceptually โ make it real.
- "Drain is always behind."
act.streams.laggingis consistently non-zero because reactions are slower than commits. Inline is the bottleneck; the bus would let downstream pace itself. - "
leaseMilliskeeps creeping up." A receiver that neededtimeoutMs: 1_000last quarter now needs5_000. The pressure to bump the global lease is the framework asking you to move off inline.
The migration itself is cheap: replace the inline reaction's body with a bus.publish(...) call. The downstream gets re-implemented as a bus consumer. Per-stream ordering survives (use the stream id as the partition key); idempotency survives (use the event id as the dedup key).
Relatedโ
- Webhook helper โ the
@rotorsoft/act-http/webhookpackage and itstimeoutMs/leaseMillisconstraint - Error handling โ backoff, non-retryable errors, blocked streams,
unblock - Cross-process reactions โ
Store.notify, competing consumers, topology shapes - Concurrency model โ lease lifecycle,
claim/ack/block/timeout transitions - Real-time โ SSE for state broadcast (the other HTTP-shaped integration)