Real-Time with act-sse

@rotorsoft/act-sse broadcasts incremental state updates over Server-Sent Events. Each event-sourced commit emits a domain patch (a partial state update); the broadcast layer forwards those patches keyed by event version, so subscribers can apply them to their cached state without refetching, unless they have missed a version and need to resync.

npm install @rotorsoft/act-sse

Architecture

app.do() → snapshots (each carries its domain patch)
    │
    ▼
deriveState(snap)               ← app-specific (overlay presence, etc.)
state._v = snap.event.version   ← event store version = single source of truth
    │
    ▼
broadcast.publish(streamId, state, patches)
    │
    ├── version-key each patch: { [baseV+1]: patch1, [baseV+2]: patch2, ... }
    └── push to all SSE subscribers
    │
    ▼
Client: applyPatchMessage(msg, cached)
    │
    ├── contiguous → deep-merge patches in version order
    ├── stale → skip (client already ahead)
    └── behind → resync (client missed versions)

The version contract

_v on every state object is always snap.event.version — the event store's monotonic stream version. There is no separate counter, no clock-based ordering. The event store is the single source of truth for ordering; the broadcast layer is just a fan-out.

Server-side

BroadcastChannel

Manages per-stream subscriber sets and an LRU state cache for reconnects:

import { BroadcastChannel } from "@rotorsoft/act-sse";
import type { BroadcastState, PatchMessage } from "@rotorsoft/act-sse";

type AppState = BroadcastState & {
  // your domain state fields
  name: string;
  status: string;
};

const broadcast = new BroadcastChannel<AppState>({
  cacheSize: 50, // LRU entries; default 50
});
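For readers unfamiliar with LRU eviction, here is a minimal sketch of the idea behind cacheSize. TinyLru is a hypothetical class written for illustration only; it is not how BroadcastChannel is implemented, and the library's eviction details may differ.

```typescript
// Hypothetical illustration of LRU eviction; not part of @rotorsoft/act-sse.
class TinyLru<V> {
  private entries = new Map<string, V>();
  private capacity: number;

  constructor(capacity: number) {
    this.capacity = capacity;
  }

  get(key: string): V | undefined {
    if (!this.entries.has(key)) return undefined;
    const value = this.entries.get(key) as V;
    // Re-insert so this key becomes the most recently used
    this.entries.delete(key);
    this.entries.set(key, value);
    return value;
  }

  set(key: string, value: V): void {
    this.entries.delete(key);
    this.entries.set(key, value);
    if (this.entries.size > this.capacity) {
      // A Map iterates in insertion order, so the first key is the least recently used
      const oldest = this.entries.keys().next().value as string;
      this.entries.delete(oldest);
    }
  }

  has(key: string): boolean {
    return this.entries.has(key);
  }
}
```

With a capacity of 50, the 51st distinct stream to be cached would push out whichever stream was read or written least recently; a client reconnecting to an evicted stream would then find no cached state.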

Broadcasting a commit

After every app.do(), forward each emitted snapshot's domain patch:

const snaps = await app.do("CreateItem", target, input);
const last = snaps.at(-1)!;

// 1. Derive the broadcast view (typically snap.state plus overlays)
const state: AppState = {
  ...last.state,
  _v: last.event!.version, // MUST come from event.version
};

// 2. Collect each emitted snapshot's patch, in commit order
const patches = snaps
  .map((s) => s.patch)
  .filter(Boolean) as Partial<AppState>[];

// 3. Publish — sends a version-keyed PatchMessage to all subscribers
broadcast.publish(streamId, state, patches);
// Reactions drain automatically if you've wired
// app.on("committed", () => app.settle()) at bootstrap.

publish() writes the new state to the LRU cache (so reconnects can read it) and pushes a PatchMessage<AppState> to subscribers. The keys of the message are absolute event versions (baseV + 1, baseV + 2, …), so subscribers can apply them directly to their cached state without computing offsets.
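To make the key scheme concrete, here is a minimal sketch of how a batch of patches could be keyed by absolute version. versionKey is a hypothetical helper, not a function exported by the library, and it assumes the batch is contiguous and ends at the published state's _v.

```typescript
// Hypothetical helper for illustration; not part of @rotorsoft/act-sse.
type DomainPatch = Record<string, unknown>;
type KeyedBatch = Record<number, DomainPatch>;

// Assumption: the patches are contiguous and the last one produced finalVersion,
// so the version before the batch is finalVersion - patches.length.
function versionKey(finalVersion: number, patches: DomainPatch[]): KeyedBatch {
  const baseV = finalVersion - patches.length;
  const keyed: KeyedBatch = {};
  patches.forEach((patch, i) => {
    keyed[baseV + i + 1] = patch;
  });
  return keyed;
}

// Two patches that brought the stream to version 7 are keyed 6 and 7
const keyedExample = versionKey(7, [{ name: "a" }, { status: "open" }]);
```

Because the keys are absolute, a subscriber only needs to compare them with its own cached _v to decide what to do with the message.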

Overlays (non-event state changes)

Some state changes don't have a corresponding event — typically presence ("alice is online") or computed-field refreshes. Use publishOverlay():

broadcast.publishOverlay(streamId, {
  onlineUsers: presence.getOnline(streamId),
});

This applies the overlay to the cached state, leaves _v unchanged, and emits a single-key patch message at the cached version.
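The read-modify-write described above can be sketched as follows. overlayOnto is a hypothetical stand-in, not the library's code, and it assumes a shallow merge of the overlay onto the cached state; the real merge may be deep.

```typescript
// Hypothetical illustration of overlay semantics; not part of @rotorsoft/act-sse.
type OverlayState = { _v: number; [key: string]: unknown };

function overlayOnto(cached: OverlayState, overlay: Record<string, unknown>) {
  // Assumption: shallow merge. _v is written last so an overlay can never change it.
  const state: OverlayState = { ...cached, ...overlay, _v: cached._v };
  // Single-key message at the cached version: no new event, so no new version
  const msg: Record<number, Record<string, unknown>> = { [cached._v]: overlay };
  return { state, msg };
}
```

The point of the sketch is that an overlay changes what subscribers see without advancing the version that the event store owns.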

Presence

PresenceTracker is a ref-counted online-status tracker designed for multi-tab clients: each tab opens its own SSE connection, and add / remove maintain a per-identity counter, so an identity stays online until its last tab disconnects.

import { PresenceTracker } from "@rotorsoft/act-sse";

const presence = new PresenceTracker();

// On SSE connect
presence.add(streamId, identityId);

// On SSE disconnect
presence.remove(streamId, identityId);

// Query
presence.getOnline(streamId); // Set<string>
presence.isOnline(streamId, identityId); // boolean
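The ref-counting idea can be sketched in a few lines. RefCountedPresence below is a hypothetical stand-in that mirrors the four method names shown above; it is not the library's implementation.

```typescript
// Hypothetical illustration of ref-counted presence; not the library's PresenceTracker.
class RefCountedPresence {
  // streamId -> identityId -> number of open connections
  private counts = new Map<string, Map<string, number>>();

  add(streamId: string, identityId: string): void {
    const perStream = this.counts.get(streamId) ?? new Map<string, number>();
    perStream.set(identityId, (perStream.get(identityId) ?? 0) + 1);
    this.counts.set(streamId, perStream);
  }

  remove(streamId: string, identityId: string): void {
    const perStream = this.counts.get(streamId);
    if (!perStream) return;
    const next = (perStream.get(identityId) ?? 0) - 1;
    if (next > 0) perStream.set(identityId, next);
    else perStream.delete(identityId); // last connection closed
    if (perStream.size === 0) this.counts.delete(streamId);
  }

  isOnline(streamId: string, identityId: string): boolean {
    return (this.counts.get(streamId)?.get(identityId) ?? 0) > 0;
  }

  getOnline(streamId: string): Set<string> {
    const online = new Set<string>();
    this.counts.get(streamId)?.forEach((_count, id) => online.add(id));
    return online;
  }
}
```

With a plain Set instead of a counter, closing one of two open tabs would mark the user offline while the other tab is still connected.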

tRPC subscription

act-sse doesn't dictate the wire format — your tRPC handler decides. A typical pattern yields the cached state on connect, then forwards each patch message. Wrap the two shapes in a small app-level envelope so the client can tell them apart:

import { z } from "zod";
import type { PatchMessage } from "@rotorsoft/act-sse";
// publicProcedure comes from your own tRPC setup; broadcast and presence
// are the instances created in the sections above.

type Envelope<S> =
  | { kind: "snap"; state: S }
  | { kind: "patch"; msg: PatchMessage<S> };

export const onStateChange = publicProcedure
  .input(z.object({ streamId: z.string(), identityId: z.string().optional() }))
  .subscription(async function* ({ input, signal }) {
    const { streamId, identityId } = input;
    let resolve: (() => void) | null = null;
    // A queue rather than a single slot: several messages can arrive while
    // the generator is suspended at a yield, and none of them may be dropped.
    const queue: PatchMessage<AppState>[] = [];

    const cleanup = broadcast.subscribe(streamId, (msg) => {
      queue.push(msg);
      resolve?.();
      resolve = null;
    });

    // Register the abort listener once, not once per loop iteration
    const onAbort = () => resolve?.();
    signal?.addEventListener("abort", onAbort, { once: true });

    if (identityId) presence.add(streamId, identityId);

    try {
      // Initial snapshot for first paint
      const cached = broadcast.getState(streamId);
      if (cached) yield { kind: "snap", state: cached } satisfies Envelope<AppState>;

      while (!signal?.aborted) {
        if (queue.length === 0) {
          // Sleep until the next message or an abort
          await new Promise<void>((r) => {
            resolve = r;
          });
        }
        // Drain in arrival order
        while (queue.length > 0 && !signal?.aborted) {
          yield { kind: "patch", msg: queue.shift()! } satisfies Envelope<AppState>;
        }
      }
    } finally {
      cleanup();
      signal?.removeEventListener("abort", onAbort);
      if (identityId) presence.remove(streamId, identityId);
    }
  });

Client-side

applyPatchMessage

import { applyPatchMessage } from "@rotorsoft/act-sse";

onData: (env) => {
  if (env.kind === "snap") {
    utils.getState.setData({ streamId }, env.state);
    return;
  }
  const cached = utils.getState.getData({ streamId });
  const result = applyPatchMessage(env.msg, cached);

  if (result.ok) {
    utils.getState.setData({ streamId }, result.state);
  } else if (result.reason === "behind") {
    utils.getState.invalidate({ streamId }); // missed versions: refetch
  }
  // "stale" → no-op; the cache is already past these versions
};

applyPatchMessage(msg, cached) returns { ok: true, state } | { ok: false, reason: "stale" | "behind" }:

  • Contiguous: min(msg.keys) is exactly cachedV + 1. Apply patches in version order via the deep-merge from @rotorsoft/act-patch; final _v = max(msg.keys).
  • Stale: max(msg.keys) <= cachedV. The client is already ahead (e.g., a mutation response landed before the SSE patch arrived). No-op.
  • Behind: min(msg.keys) > cachedV + 1. The client missed versions and must resync via a full refetch.
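The three cases above can be sketched as a small classifier. applySketch is a hypothetical stand-in for applyPatchMessage, and mergeDeep is a simplified recursive merge; the real deep-merge in @rotorsoft/act-patch may treat arrays, deletions, and other edge cases differently. Two behaviors are assumptions that the text above does not specify: an empty message is reported as stale, and a message that overlaps the cached version applies only the versions newer than it.

```typescript
// Hypothetical illustration of the contiguous / stale / behind rules.
type VersionedState = { _v: number; [key: string]: unknown };
type KeyedPatches = Record<number, Record<string, unknown>>;
type ApplyResult =
  | { ok: true; state: VersionedState }
  | { ok: false; reason: "stale" | "behind" };

function isPlainObject(v: unknown): v is Record<string, unknown> {
  return typeof v === "object" && v !== null && !Array.isArray(v);
}

// Simplified recursive merge: nested objects merge, everything else is replaced
function mergeDeep(
  target: Record<string, unknown>,
  patch: Record<string, unknown>
): Record<string, unknown> {
  const out: Record<string, unknown> = { ...target };
  for (const key of Object.keys(patch)) {
    const prev = out[key];
    const next = patch[key];
    out[key] = isPlainObject(prev) && isPlainObject(next) ? mergeDeep(prev, next) : next;
  }
  return out;
}

function applySketch(msg: KeyedPatches, cached: VersionedState): ApplyResult {
  const versions = Object.keys(msg).map(Number).sort((a, b) => a - b);
  if (versions.length === 0) return { ok: false, reason: "stale" }; // assumption
  const min = Math.min(...versions);
  const max = Math.max(...versions);

  if (max <= cached._v) return { ok: false, reason: "stale" };
  if (min > cached._v + 1) return { ok: false, reason: "behind" };

  let state: Record<string, unknown> = cached;
  for (const v of versions) {
    // Assumption: skip versions the cache already contains
    if (v > cached._v) state = mergeDeep(state, msg[v]);
  }
  return { ok: true, state: { ...state, _v: max } };
}
```

For example, with a cached _v of 3, a message keyed 4 and 5 applies cleanly and leaves _v at 5, a message keyed 2 is stale, and a message keyed 6 reports behind because version 4 was never seen.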

Key rules

  1. _v is snap.event.version — the event store's stream version is the single source of truth. Never invent a version.
  2. One broadcast function — every code path that calls app.do() should funnel through the same publish helper. Multiple publish sites with different state shapes is how double-apply bugs start.
  3. Broadcast from snapshots, not projections — projections are eventually consistent and may lag. Broadcast from the snapshots returned by app.do().
  4. Presence is an overlay, not an event — use publishOverlay() so connect/disconnect doesn't pollute the event log.
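Rules 1 and 2 can be combined into one small helper that every command path calls. The sketch below is an assumption about how such a helper might look: publishCommit, SnapLike, and PublisherLike are hypothetical names, and the local types are minimal stand-ins so the block compiles on its own; in an application they would be the framework's snapshot type and the BroadcastChannel instance.

```typescript
// Hypothetical single publish helper; the types are local stand-ins.
type SnapLike<S> = { state: S; patch?: Partial<S>; event?: { version: number } };
type PublisherLike<S> = {
  publish(streamId: string, state: S & { _v: number }, patches: Partial<S>[]): void;
};

function publishCommit<S extends object>(
  publisher: PublisherLike<S>,
  streamId: string,
  snaps: SnapLike<S>[]
): void {
  const last = snaps[snaps.length - 1];
  // Nothing committed means nothing to broadcast, and no version to invent
  if (!last || !last.event) return;

  // Rule 1: _v comes from the event store, never from a local counter
  const state = { ...last.state, _v: last.event.version };

  // Patches in commit order, skipping snapshots that carry none
  const patches = snaps
    .map((s) => s.patch)
    .filter((p): p is Partial<S> => p !== undefined);

  publisher.publish(streamId, state, patches);
}
```

Routing every app.do() call site through one function like this keeps the state shape and the version source identical across all publishers.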

The double-apply bug

If a projection falls back to the broadcast cache on a miss, it reads state that already has event patches applied. Re-applying those same patches corrupts counters and indices.

// BUG — broadcast cache holds post-event snapshots
let state = projCache.get(id) ?? broadcast.getState(id); // ← already patched!
mutator(state); // patches applied a second time

// FIX — fall back to durable storage only
let state = projCache.get(id) ?? (await db.select(id)) ?? defaultState();
mutator(state);

The broadcast cache exists for reconnect seeding and for publishOverlay()'s read-modify-write. Everything else should go through the database.
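A worked example of the failure mode, using a hypothetical counter projection. All names here are illustrative; the only point is the arithmetic.

```typescript
// Hypothetical counter projection to illustrate double-apply.
type CounterState = { count: number };
type CounterEvent = { delta: number };

const applyCounterEvent = (s: CounterState, e: CounterEvent): CounterState => ({
  count: s.count + e.delta,
});

const durableRow: CounterState = { count: 10 }; // last state the projection persisted
const committedEvent: CounterEvent = { delta: 1 };

// The broadcast cache was written at publish time, so it already includes the event
const broadcastCached = applyCounterEvent(durableRow, committedEvent); // count: 11

// BUG: projection seeds from the broadcast cache, then applies the same event again
const doubleApplied = applyCounterEvent(broadcastCached, committedEvent); // count: 12

// FIX: projection seeds from durable storage, so the event is applied exactly once
const appliedOnce = applyCounterEvent(durableRow, committedEvent); // count: 11
```

The bug is silent: nothing throws, and the count of 12 looks plausible until it is compared with the event log.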