Skip to main content

Real-Time with act-sse

@rotorsoft/act-sse broadcasts incremental state updates over Server-Sent Events. Each event-sourced commit emits a domain patch (a partial state update); the broadcast layer forwards those patches keyed by event version, so subscribers can apply them to their cached state without ever refetching.

SSE is one of two HTTP-shaped integration paths. For the outbound side โ€” webhooks, downstream services, message buses โ€” see External integration patterns. The two are independent; an app can run both.

npm install @rotorsoft/act-sse

Architectureโ€‹

app.do() โ†’ snapshots (each carries its domain patch)
โ”‚
โ–ผ
deriveState(snap) โ† app-specific (overlay presence, etc.)
state._v = snap.event.version โ† event store version = single source of truth
โ”‚
โ–ผ
broadcast.publish(streamId, state, patches)
โ”‚
โ”œโ”€โ”€ version-key each patch: { [baseV+1]: patch1, [baseV+2]: patch2, ... }
โ””โ”€โ”€ push to all SSE subscribers
โ”‚
โ–ผ
Client: applyPatchMessage(msg, cached)
โ”‚
โ”œโ”€โ”€ contiguous โ†’ deep-merge patches in version order
โ”œโ”€โ”€ stale โ†’ skip (client already ahead)
โ””โ”€โ”€ behind โ†’ resync (client missed versions)

The version contractโ€‹

_v on every state object is always snap.event.version โ€” the event store's monotonic stream version. There is no separate counter, no clock-based ordering. The event store is the single source of truth for ordering; the broadcast layer is just a fan-out.

Server-sideโ€‹

BroadcastChannelโ€‹

Manages per-stream subscriber sets and an LRU state cache for reconnects:

import { BroadcastChannel } from "@rotorsoft/act-sse";
import type { BroadcastState, PatchMessage } from "@rotorsoft/act-sse";

type AppState = BroadcastState & {
// your domain state fields
name: string;
status: string;
};

const broadcast = new BroadcastChannel<AppState>({
cacheSize: 50, // LRU entries; default 50
});

Broadcasting a commitโ€‹

After every app.do(), forward each emitted snapshot's domain patch:

const snaps = await app.do("CreateItem", target, input);
const last = snaps.at(-1)!;

// 1. Derive the broadcast view (typically snap.state plus overlays)
const state: AppState = {
...last.state,
_v: last.event!.version, // MUST come from event.version
};

// 2. Collect each emitted snapshot's patch, in commit order
const patches = snaps
.map((s) => s.patch)
.filter(Boolean) as Partial<AppState>[];

// 3. Publish โ€” sends a version-keyed PatchMessage to all subscribers
broadcast.publish(streamId, state, patches);
// Reactions drain automatically if you've wired
// app.on("committed", () => app.settle()) at bootstrap.

publish() writes the new state to the LRU cache (so reconnects can read it) and pushes a PatchMessage<AppState> to subscribers. The keys of the message are absolute event versions (baseV + 1, baseV + 2, โ€ฆ), so subscribers can apply them directly to their cached state without computing offsets.

Overlays (non-event state changes)โ€‹

Some state changes don't have a corresponding event โ€” typically presence ("alice is online") or computed-field refreshes. Use publishOverlay():

broadcast.publishOverlay(streamId, {
onlineUsers: presence.getOnline(streamId),
});

This applies the overlay to the cached state, leaves _v unchanged, and emits a single-key patch message at the cached version.

Presenceโ€‹

PresenceTracker is a ref-counted online-status tracker designed for multi-tab clients (each tab opens its own SSE; add / remove maintain a per-identity counter):

import { PresenceTracker } from "@rotorsoft/act-sse";

const presence = new PresenceTracker();

// On SSE connect
presence.add(streamId, identityId);

// On SSE disconnect
presence.remove(streamId, identityId);

// Query
presence.getOnline(streamId); // Set<string>
presence.isOnline(streamId, identityId); // boolean

tRPC subscriptionโ€‹

act-sse doesn't dictate the wire format โ€” your tRPC handler decides. A typical pattern yields the cached state on connect, then forwards each patch message. Wrap the two shapes in a small app-level envelope so the client can tell them apart:

import type { PatchMessage } from "@rotorsoft/act-sse";

type Envelope<S> =
| { kind: "snap"; state: S }
| { kind: "patch"; msg: PatchMessage<S> };

export const onStateChange = publicProcedure
.input(z.object({ streamId: z.string(), identityId: z.string().optional() }))
.subscription(async function* ({ input, signal }) {
const { streamId, identityId } = input;
let resolve: (() => void) | null = null;
let pending: PatchMessage<AppState> | null = null;

const cleanup = broadcast.subscribe(streamId, (msg) => {
pending = msg;
resolve?.();
resolve = null;
});

if (identityId) presence.add(streamId, identityId);

try {
// Initial snapshot for first paint
const cached = broadcast.getState(streamId);
if (cached) yield { kind: "snap", state: cached } satisfies Envelope<AppState>;

while (!signal?.aborted) {
if (!pending) {
await new Promise<void>((r) => {
resolve = r;
signal?.addEventListener("abort", () => r(), { once: true });
});
}
if (signal?.aborted) break;
if (pending) {
const msg = pending;
pending = null;
yield { kind: "patch", msg } satisfies Envelope<AppState>;
}
}
} finally {
cleanup();
if (identityId) presence.remove(streamId, identityId);
}
});

Client-sideโ€‹

applyPatchMessageโ€‹

import { applyPatchMessage } from "@rotorsoft/act-sse";

onData: (env) => {
if (env.kind === "snap") {
utils.getState.setData({ streamId }, env.state);
return;
}
const cached = utils.getState.getData({ streamId });
const result = applyPatchMessage(env.msg, cached);

if (result.ok) {
utils.getState.setData({ streamId }, result.state);
} else if (result.reason === "behind") {
utils.getState.invalidate({ streamId }); // missed versions โ€” refetch
}
// "stale" โ†’ no-op; the cache is already past these versions
};

applyPatchMessage(msg, cached) returns { ok: true, state } | { ok: false, reason: "stale" | "behind" }:

  • Contiguous โ€” min(msg.keys) is exactly cachedV + 1. Apply patches in version order via the deep-merge from @rotorsoft/act-patch; final _v = max(msg.keys).
  • Stale โ€” max(msg.keys) <= cachedV. The client is already ahead (e.g., a mutation response landed before the SSE patch arrived). No-op.
  • Behind โ€” min(msg.keys) > cachedV + 1. The client missed versions and must resync via a full refetch.

Key rulesโ€‹

  1. _v is snap.event.version โ€” the event store's stream version is the single source of truth. Never invent a version.
  2. One broadcast function โ€” every code path that calls app.do() should funnel through the same publish helper. Multiple publish sites with different state shapes is how double-apply bugs start.
  3. Broadcast from snapshots, not projections โ€” projections are eventually consistent and may lag. Broadcast from the snapshots returned by app.do().
  4. Presence is an overlay, not an event โ€” use publishOverlay() so connect/disconnect doesn't pollute the event log.

The double-apply bugโ€‹

If a projection falls back to the broadcast cache on a miss, it reads state that already has event patches applied. Re-applying those same patches corrupts counters and indices.

// BUG โ€” broadcast cache holds post-event snapshots
let state = projCache.get(id) ?? broadcast.getState(id); // โ† already patched!
mutator(state); // patches applied a second time

// FIX โ€” fall back to durable storage only
let state = projCache.get(id) ?? (await db.select(id)) ?? defaultState();
mutator(state);

The broadcast cache exists for reconnect seeding and for publishOverlay()'s read-modify-write. Everything else should go through the database.