Real-Time with act-sse
@rotorsoft/act-sse broadcasts incremental state updates over Server-Sent Events. Each event-sourced commit emits a domain patch (a partial state update); the broadcast layer forwards those patches keyed by event version, so subscribers can apply them directly to their cached state and only need to refetch when they have missed a version.
npm install @rotorsoft/act-sse
Architecture
app.do() → snapshots (each carries its domain patch)
│
▼
deriveState(snap) ← app-specific (overlay presence, etc.)
state._v = snap.event.version ← event store version = single source of truth
│
▼
broadcast.publish(streamId, state, patches)
│
├── version-key each patch: { [baseV+1]: patch1, [baseV+2]: patch2, ... }
└── push to all SSE subscribers
│
▼
Client: applyPatchMessage(msg, cached)
│
├── contiguous → deep-merge patches in version order
├── stale → skip (client already ahead)
└── behind → resync (client missed versions)
The version contract
_v on every state object is always snap.event.version — the event store's monotonic stream version. There is no separate counter, no clock-based ordering. The event store is the single source of truth for ordering; the broadcast layer is just a fan-out.
Server-side
BroadcastChannel
Manages per-stream subscriber sets and an LRU state cache for reconnects:
import { BroadcastChannel } from "@rotorsoft/act-sse";
import type { BroadcastState, PatchMessage } from "@rotorsoft/act-sse";
type AppState = BroadcastState & {
// your domain state fields
name: string;
status: string;
};
const broadcast = new BroadcastChannel<AppState>({
cacheSize: 50, // LRU entries; default 50
});
Broadcasting a commit
After every app.do(), forward each emitted snapshot's domain patch:
const snaps = await app.do("CreateItem", target, input);
const last = snaps.at(-1)!;
// 1. Derive the broadcast view (typically snap.state plus overlays)
const state: AppState = {
...last.state,
_v: last.event!.version, // MUST come from event.version
};
// 2. Collect each emitted snapshot's patch, in commit order
const patches = snaps
.map((s) => s.patch)
.filter(Boolean) as Partial<AppState>[];
// 3. Publish — sends a version-keyed PatchMessage to all subscribers
broadcast.publish(streamId, state, patches);
// Reactions drain automatically if you've wired
// app.on("committed", () => app.settle()) at bootstrap.
publish() writes the new state to the LRU cache (so reconnects can read it) and pushes a PatchMessage<AppState> to subscribers. The keys of the message are absolute event versions (baseV + 1, baseV + 2, …), so subscribers can apply them directly to their cached state without computing offsets.
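To make the keying concrete, here is a minimal sketch of how a version-keyed message could be assembled from the final version and the ordered patches. `versionKey` is a hypothetical helper, not part of the act-sse API, and it assumes each patch corresponds to exactly one committed event.

```typescript
// Hypothetical helper, not part of @rotorsoft/act-sse: illustrates the keying only.
type Patch = Record<string, unknown>;

function versionKey(finalV: number, patches: Patch[]): Record<number, Patch> {
  // Assumption: one patch per committed event, so the version before this
  // batch is the final version minus the number of patches.
  const baseV = finalV - patches.length;
  const msg: Record<number, Patch> = {};
  patches.forEach((patch, i) => {
    msg[baseV + i + 1] = patch;
  });
  return msg;
}

// Two events committed on a stream that was at version 3,
// so the patches are keyed 4 and 5.
const keyed = versionKey(5, [{ status: "open" }, { name: "Widget" }]);
```

Because the keys are absolute, a subscriber only has to compare them with its own `_v`; it never needs to know how many patches were in the batch.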
Overlays (non-event state changes)
Some state changes don't have a corresponding event — typically presence ("alice is online") or computed-field refreshes. Use publishOverlay():
broadcast.publishOverlay(streamId, {
onlineUsers: presence.getOnline(streamId),
});
This applies the overlay to the cached state, leaves _v unchanged, and emits a single-key patch message at the cached version.
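The read-modify-write described above can be pictured roughly as follows. This is a sketch under assumptions, not the library's implementation: `applyOverlay` is a made-up name, and the shallow merge is a guess at the simplest behaviour that matches the description.

```typescript
// Hypothetical sketch of the overlay step; the real publishOverlay() may
// merge differently and may send a different message shape.
type Versioned = { _v: number };

function applyOverlay<S extends Versioned>(
  cached: S,
  overlay: Partial<Omit<S, "_v">>
): { state: S; message: Record<number, Partial<Omit<S, "_v">>> } {
  // Shallow merge is an assumption; _v is deliberately carried over unchanged.
  const state = { ...cached, ...overlay, _v: cached._v } as S;
  // Single-key message, keyed at the version the cache already holds.
  return { state, message: { [cached._v]: overlay } };
}

const before = { _v: 7, name: "Widget", onlineUsers: [] as string[] };
const { state: after, message } = applyOverlay(before, { onlineUsers: ["alice"] });
```

The point of the sketch is that `after._v` is still 7: an overlay changes what subscribers see without claiming that a new event happened.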
Presence
PresenceTracker is a ref-counted online-status tracker designed for multi-tab clients (each tab opens its own SSE; add / remove maintain a per-identity counter):
import { PresenceTracker } from "@rotorsoft/act-sse";
const presence = new PresenceTracker();
// On SSE connect
presence.add(streamId, identityId);
// On SSE disconnect
presence.remove(streamId, identityId);
// Query
presence.getOnline(streamId); // Set<string>
presence.isOnline(streamId, identityId); // boolean
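For readers wondering why ref-counting matters here, the following is a minimal tracker written in the spirit of the description above. It is illustrative only and is not the PresenceTracker source; `RefCountedPresence` is a hypothetical name.

```typescript
// Illustrative only: a minimal ref-counted tracker. NOT the PresenceTracker source.
class RefCountedPresence {
  // streamId -> identityId -> number of open connections (tabs)
  private counts = new Map<string, Map<string, number>>();

  add(streamId: string, identityId: string): void {
    const stream = this.counts.get(streamId) ?? new Map<string, number>();
    stream.set(identityId, (stream.get(identityId) ?? 0) + 1);
    this.counts.set(streamId, stream);
  }

  remove(streamId: string, identityId: string): void {
    const stream = this.counts.get(streamId);
    const n = stream?.get(identityId);
    if (!stream || n === undefined) return; // unknown identity: nothing to do
    if (n > 1) stream.set(identityId, n - 1);
    else stream.delete(identityId); // last tab closed
    if (stream.size === 0) this.counts.delete(streamId);
  }

  isOnline(streamId: string, identityId: string): boolean {
    return this.counts.get(streamId)?.has(identityId) ?? false;
  }

  getOnline(streamId: string): Set<string> {
    return new Set<string>(this.counts.get(streamId)?.keys() ?? []);
  }
}

const tracker = new RefCountedPresence();
tracker.add("room-1", "alice"); // tab 1 connects
tracker.add("room-1", "alice"); // tab 2 connects
tracker.remove("room-1", "alice"); // tab 1 closes
const stillOnline = tracker.isOnline("room-1", "alice");
tracker.remove("room-1", "alice"); // tab 2 closes
const nowOnline = tracker.isOnline("room-1", "alice");
```

With a plain set instead of a counter, closing the first tab would mark alice offline while her second tab is still connected.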
tRPC subscription
act-sse doesn't dictate the wire format — your tRPC handler decides. A typical pattern yields the cached state on connect, then forwards each patch message. Wrap the two shapes in a small app-level envelope so the client can tell them apart:
import type { PatchMessage } from "@rotorsoft/act-sse";
type Envelope<S> =
| { kind: "snap"; state: S }
| { kind: "patch"; msg: PatchMessage<S> };
export const onStateChange = publicProcedure
.input(z.object({ streamId: z.string(), identityId: z.string().optional() }))
.subscription(async function* ({ input, signal }) {
const { streamId, identityId } = input;
let resolve: (() => void) | null = null;
let pending: PatchMessage<AppState> | null = null;
const cleanup = broadcast.subscribe(streamId, (msg) => {
pending = msg;
resolve?.();
resolve = null;
});
if (identityId) presence.add(streamId, identityId);
try {
// Initial snapshot for first paint
const cached = broadcast.getState(streamId);
if (cached) yield { kind: "snap", state: cached } satisfies Envelope<AppState>;
while (!signal?.aborted) {
if (!pending) {
await new Promise<void>((r) => {
resolve = r;
signal?.addEventListener("abort", () => r(), { once: true });
});
}
if (signal?.aborted) break;
if (pending) {
const msg = pending;
pending = null;
yield { kind: "patch", msg } satisfies Envelope<AppState>;
}
}
} finally {
cleanup();
if (identityId) presence.remove(streamId, identityId);
}
});
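One property of the handler above is worth noting: `pending` holds a single message, so if two patch messages arrive before the generator resumes, the earlier one is overwritten and the client would presumably take the "behind" path and refetch. If that matters for your app, a small FIFO buffer is one option. The sketch below is an alternative under that assumption, not part of act-sse; `MessageBuffer` is a hypothetical name.

```typescript
// Hypothetical helper: buffers every message instead of keeping only the latest.
class MessageBuffer<T> {
  private queue: T[] = [];
  private wake: (() => void) | null = null;

  // Intended to be called from the broadcast.subscribe callback.
  push(msg: T): void {
    this.queue.push(msg);
    this.wake?.();
    this.wake = null;
  }

  // Returns everything buffered so far, in arrival order, and empties the buffer.
  drain(): T[] {
    const out = this.queue;
    this.queue = [];
    return out;
  }

  // Resolves once at least one message is buffered or the signal aborts.
  wait(signal?: AbortSignal): Promise<void> {
    if (this.queue.length > 0 || signal?.aborted) return Promise.resolve();
    return new Promise<void>((resolve) => {
      this.wake = resolve;
      signal?.addEventListener("abort", () => resolve(), { once: true });
    });
  }
}

const buffer = new MessageBuffer<{ v: number }>();
buffer.push({ v: 4 });
buffer.push({ v: 5 }); // appended; does not overwrite the v4 message
const drained = buffer.drain();
```

Inside the generator, the loop body would then become `await buffer.wait(signal)` followed by yielding one `patch` envelope per entry returned by `buffer.drain()`.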
Client-side
applyPatchMessage
import { applyPatchMessage } from "@rotorsoft/act-sse";
onData: (env) => {
if (env.kind === "snap") {
utils.getState.setData({ streamId }, env.state);
return;
}
const cached = utils.getState.getData({ streamId });
const result = applyPatchMessage(env.msg, cached);
if (result.ok) {
utils.getState.setData({ streamId }, result.state);
} else if (result.reason === "behind") {
utils.getState.invalidate({ streamId }); // missed versions — refetch
}
// "stale" → no-op; the cache is already past these versions
};
applyPatchMessage(msg, cached) returns { ok: true, state } | { ok: false, reason: "stale" | "behind" }:
- Contiguous: min(msg.keys) is exactly cachedV + 1. Apply patches in version order via the deep-merge from @rotorsoft/act-patch; final _v = max(msg.keys).
- Stale: max(msg.keys) <= cachedV. The client is already ahead (e.g., a mutation response landed before the SSE patch arrived). No-op.
- Behind: min(msg.keys) > cachedV + 1. The client missed versions and must resync via a full refetch.
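The three outcomes can be traced with a small stand-alone re-implementation. This is a sketch, not the library's applyPatchMessage: `applySketch` and `deepMerge` are stand-ins, the merge semantics are assumed, and keys inside one message are assumed to be consecutive. A message that partly overlaps the cache (some keys at or below cachedV, some above) is not covered by the rules above; the sketch skips the already-applied versions, which is an assumption. Overlay messages are outside its scope.

```typescript
// Illustrative re-implementation of the rules above. NOT the library's code;
// deepMerge stands in for the merge provided by @rotorsoft/act-patch.
type State = { _v: number; [key: string]: unknown };
type Result = { ok: true; state: State } | { ok: false; reason: "stale" | "behind" };

const isObject = (x: unknown): x is Record<string, unknown> =>
  typeof x === "object" && x !== null && !Array.isArray(x);

// Assumption: plain objects merge recursively, everything else is replaced.
function deepMerge<T extends Record<string, unknown>>(base: T, patch: Record<string, unknown>): T {
  const out: Record<string, unknown> = { ...base };
  for (const [k, v] of Object.entries(patch)) {
    const prev = out[k];
    out[k] = isObject(prev) && isObject(v) ? deepMerge(prev, v) : v;
  }
  return out as T;
}

function applySketch(msg: Record<number, Record<string, unknown>>, cached: State): Result {
  const versions = Object.keys(msg).map(Number).sort((a, b) => a - b);
  // Stale: nothing in the message is newer than the cache (empty counts as stale here).
  if (versions.length === 0 || versions[versions.length - 1] <= cached._v) {
    return { ok: false, reason: "stale" };
  }
  // Behind: the first version in the message leaves a gap after the cache.
  if (versions[0] > cached._v + 1) return { ok: false, reason: "behind" };
  let state = cached;
  for (const v of versions) {
    if (v <= cached._v) continue; // assumption: skip versions the cache already has
    state = { ...deepMerge(state, msg[v]), _v: v };
  }
  return { ok: true, state };
}

const cachedState: State = { _v: 5, name: "Widget", meta: { views: 1 } };
const contiguous = applySketch({ 6: { status: "open" }, 7: { meta: { likes: 2 } } }, cachedState);
const stale = applySketch({ 5: { status: "old" } }, cachedState);
const behind = applySketch({ 8: { status: "late" } }, cachedState);
```

In this run `contiguous` succeeds with `_v` 7 and a merged `meta` object, while `stale` and `behind` fail with their respective reasons and leave the cache untouched.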
Key rules
- _v is snap.event.version: the event store's stream version is the single source of truth. Never invent a version.
- One broadcast function: every code path that calls app.do() should funnel through the same publish helper. Multiple publish sites with different state shapes is how double-apply bugs start.
- Broadcast from snapshots, not projections: projections are eventually consistent and may lag. Broadcast from the snapshots returned by app.do().
- Presence is an overlay, not an event: use publishOverlay() so connect/disconnect doesn't pollute the event log.
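The "one broadcast function" rule could look something like the helper below. It is a sketch with assumed shapes: `Snap` and `Publisher` are minimal structural types written so the example stands alone (a real app would use the types from its own act and act-sse imports), and `publishSnaps` is a hypothetical name.

```typescript
// Minimal structural types so the sketch stands alone; these are assumptions,
// not the library's exported types.
type Snap<S> = { state: S; patch?: Partial<S>; event?: { version: number } };
type Publisher<S> = {
  publish(streamId: string, state: S & { _v: number }, patches: Partial<S>[]): void;
};

// Hypothetical single entry point: every app.do() call site hands its
// snapshots to this function instead of calling publish() directly.
function publishSnaps<S extends object>(
  broadcast: Publisher<S>,
  streamId: string,
  snaps: Snap<S>[]
): void {
  const last = snaps[snaps.length - 1];
  if (!last?.event) return; // nothing committed, nothing to broadcast
  const state = { ...last.state, _v: last.event.version }; // _v from the event store
  const patches = snaps.flatMap((s) => (s.patch ? [s.patch] : []));
  broadcast.publish(streamId, state, patches);
}

// A fake publisher that records calls, standing in for a BroadcastChannel.
type Item = { name: string; status: string };
const sent: Array<{ streamId: string; v: number; patchCount: number }> = [];
const fake: Publisher<Item> = {
  publish: (streamId, state, patches) => {
    sent.push({ streamId, v: state._v, patchCount: patches.length });
  },
};

publishSnaps(fake, "item-1", [
  { state: { name: "Widget", status: "new" }, patch: { name: "Widget" }, event: { version: 1 } },
  { state: { name: "Widget", status: "open" }, patch: { status: "open" }, event: { version: 2 } },
]);
```

Keeping the derivation of `state` and `patches` in one place means every subscriber sees the same shape regardless of which command triggered the commit.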
The double-apply bug
If a projection falls back to the broadcast cache on a miss, it reads state that already has event patches applied. Re-applying those same patches corrupts counters and indices.
// BUG — broadcast cache holds post-event snapshots
let state = projCache.get(id) ?? broadcast.getState(id); // ← already patched!
mutator(state); // patches applied a second time
// FIX — fall back to durable storage only
let state = projCache.get(id) ?? (await db.select(id)) ?? defaultState();
mutator(state);
The broadcast cache exists for reconnect seeding and for publishOverlay()'s read-modify-write. Everything else should go through the database.
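A toy reproduction makes the corruption visible. Everything in it is made up for illustration: `ItemView`, the comment counter, and the in-memory stand-ins for the projection cache, the broadcast cache and the database.

```typescript
// Toy reproduction of the double-apply bug; all names are hypothetical.
type ItemView = { _v: number; commentCount: number };

// A projection handler for a hypothetical CommentAdded event: it increments.
const mutator = (s: ItemView): ItemView => ({ ...s, commentCount: s.commentCount + 1 });

const durable: ItemView = { _v: 1, commentCount: 0 }; // what the database holds
const broadcastCache: ItemView = { _v: 2, commentCount: 1 }; // already includes the event

const projCache = new Map<string, ItemView>(); // empty: simulates a cache miss

// BUG: falls back to the broadcast cache, so the increment lands twice.
const buggy = mutator(projCache.get("item-1") ?? broadcastCache);

// FIX: falls back to durable state, which has not seen the event yet.
const fixed = mutator(projCache.get("item-1") ?? durable);
```

After one comment, `buggy.commentCount` is 2 while `fixed.commentCount` is 1; the drift grows with every subsequent cache miss.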