@rotorsoft/act-root

Class: BroadcastChannel<S>

Defined in: libs/act-sse/src/broadcast.ts:39

Server-side broadcast channel for incremental state sync over SSE.

Manages per-stream subscriber sets and an LRU state cache. When state changes, the channel forwards domain patches (produced by event handlers) to all subscribers of that stream as version-keyed messages.
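The LRU part of that description can be pictured with a small Map-based cache. This is only a sketch: `LruCache` and its methods are hypothetical names, not the library's `StateCache` API, and the real cache's eviction details may differ.

```typescript
// Hypothetical stand-in for illustration; not the library's StateCache.
class LruCache<V> {
  private readonly entries = new Map<string, V>();
  private readonly maxSize: number;

  constructor(maxSize: number) {
    this.maxSize = maxSize;
  }

  get(key: string): V | undefined {
    if (!this.entries.has(key)) return undefined;
    const value = this.entries.get(key) as V;
    // Re-insert so this key becomes the most recently used.
    this.entries.delete(key);
    this.entries.set(key, value);
    return value;
  }

  set(key: string, value: V): void {
    this.entries.delete(key);
    this.entries.set(key, value);
    if (this.entries.size > this.maxSize) {
      // A Map iterates in insertion order, so its first key is the least recently used.
      const oldest = this.entries.keys().next().value as string;
      this.entries.delete(oldest);
    }
  }

  get size(): number {
    return this.entries.size;
  }
}

const cache = new LruCache<{ _v: number }>(2);
cache.set("stream-a", { _v: 1 });
cache.set("stream-b", { _v: 4 });
cache.get("stream-a"); // touch stream-a, so stream-b is now the oldest entry
cache.set("stream-c", { _v: 2 }); // exceeds the limit and evicts stream-b
```

A bounded cache like this keeps memory flat when many streams are touched once and then go quiet, at the cost of a cache miss when an evicted stream reconnects.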

Usage

const broadcast = new BroadcastChannel<MyState>();

// After every app.do():
const snaps = await app.do(...);
const patches = snaps.map(s => s.patch).filter(Boolean);
const state = deriveState(snaps.at(-1));
broadcast.publish(streamId, state, patches);

// In SSE subscription:
const cleanup = broadcast.subscribe(streamId, (msg) => {
  pending = msg;
  resolve?.();
});

// Initial state for reconnects:
const cached = broadcast.getState(streamId);
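The `pending = msg; resolve?.()` lines in the subscription callback suggest a latest-wins handoff between the callback and the SSE loop. One way that loop might look is sketched below. `Mailbox` and `streamMessages` are hypothetical names, and the `subscribe` parameter is a stand-in with the channel's stream ID already bound; none of this is taken from the library.

```typescript
// Latest-wins mailbox: a newer message overwrites one that was not yet delivered.
class Mailbox<M> {
  private pending: M | undefined;
  private resolve: (() => void) | undefined;

  // Called from the subscribe callback.
  push(msg: M): void {
    this.pending = msg;
    this.resolve?.();
    this.resolve = undefined;
  }

  // Returns the pending message (if any) and clears it.
  take(): M | undefined {
    const msg = this.pending;
    this.pending = undefined;
    return msg;
  }

  // Resolves immediately if a message is waiting, otherwise on the next push.
  wait(): Promise<void> {
    if (this.pending !== undefined) return Promise.resolve();
    return new Promise<void>((r) => {
      this.resolve = r;
    });
  }
}

// Hypothetical SSE loop: yields messages until the consumer stops iterating.
async function* streamMessages<M>(
  subscribe: (cb: (msg: M) => void) => () => void,
): AsyncGenerator<M> {
  const box = new Mailbox<M>();
  const cleanup = subscribe((msg) => box.push(msg));
  try {
    while (true) {
      await box.wait();
      const msg = box.take();
      if (msg !== undefined) yield msg;
    }
  } finally {
    cleanup(); // runs when the consumer returns or throws, e.g. on client disconnect
  }
}
```

Because the mailbox holds a single slot, a slow consumer sees only the most recent message. Whether that is acceptable depends on how clients handle gaps between versions.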

Version Contract

The _v field on state MUST be set from snap.event.version (the event store's monotonic stream version) BEFORE calling publish(). This is the single source of truth for ordering; there are no separate version counters.
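A helper along these lines could enforce the contract before publishing. It is a minimal sketch: the `Snap` shape (a `state` field plus an optional `event` with a `version`) and the name `withVersion` are assumptions made for illustration, not the library's types.

```typescript
// Assumed shapes, for illustration only.
interface BroadcastState {
  _v: number;
}

interface Snap<T> {
  state: T;
  event?: { version: number };
}

// Copies the event store's stream version onto the state as _v.
function withVersion<T extends object>(snap: Snap<T>): T & BroadcastState {
  if (!snap.event) {
    throw new Error("snapshot carries no committed event, so there is no version to publish");
  }
  return { ...snap.state, _v: snap.event.version };
}

const versioned = withVersion({ state: { count: 3 }, event: { version: 7 } });
// versioned._v is 7, taken directly from the event rather than from a local counter
```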

Type Parameters

S

S extends BroadcastState = BroadcastState

Constructors

Constructor

new BroadcastChannel<S>(options?): BroadcastChannel<S>

Defined in: libs/act-sse/src/broadcast.ts:43

Parameters

options?
cacheSize?

number

Returns

BroadcastChannel<S>

Accessors

cache

Get Signature

get cache(): StateCache<S>

Defined in: libs/act-sse/src/broadcast.ts:124

Direct access to the state cache (for app-specific reads like presence).

Returns

StateCache<S>

Methods

getState()

getState(streamId): S | undefined

Defined in: libs/act-sse/src/broadcast.ts:119

Get the cached state for a stream (for reconnects / initial SSE yield).

Parameters

streamId

string

Returns

S | undefined


getSubscriberCount()

getSubscriberCount(streamId): number

Defined in: libs/act-sse/src/broadcast.ts:114

Get the number of subscribers for a stream.

Parameters

streamId

string

Returns

number


publish()

publish(streamId, state, patches?): PatchMessage<S>

Defined in: libs/act-sse/src/broadcast.ts:55

Publish domain patches from a commit. patches[i] corresponds to version baseV + i + 1, so baseV is the version immediately preceding the first patch in the commit.

Parameters

streamId

string

The event store stream ID

state

S

Full state with _v set from snap.event.version

patches?

Partial<S>[] = []

Array of domain patches, one per emitted event

Returns

PatchMessage<S>
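The patches[i] to baseV + i + 1 mapping can be worked through with a small helper. This sketch assumes that state._v is the version of the last event in the commit, so that baseV equals state._v minus the number of patches. The name `keyPatchesByVersion` and the `Cart` type are hypothetical, and the real `PatchMessage<S>` layout is not shown here.

```typescript
// Keys each patch by the stream version it produced.
function keyPatchesByVersion<S>(
  finalVersion: number,
  patches: Partial<S>[],
): Map<number, Partial<S>> {
  // ASSUMPTION: finalVersion (state._v) belongs to the last patch in the commit.
  const baseV = finalVersion - patches.length;
  const keyed = new Map<number, Partial<S>>();
  patches.forEach((patch, i) => keyed.set(baseV + i + 1, patch));
  return keyed;
}

type Cart = { _v: number; items: number; total: number };

// A commit that emitted two events and left the stream at version 5:
// baseV = 5 - 2 = 3, so the patches land on versions 4 and 5.
const keyed = keyPatchesByVersion<Cart>(5, [{ items: 1 }, { total: 20 }]);
```

Keying by version lets a subscriber that already holds version 4 apply only the entry for version 5 and ignore the rest.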


publishOverlay()

publishOverlay(streamId, overlayPatch): PatchMessage<S> | undefined

Defined in: libs/act-sse/src/broadcast.ts:80

Publish a state update that doesn't change the event version (e.g. presence overlay, computed field refresh). The message reuses the cached state's version and carries a single entry.

Parameters

streamId

string

overlayPatch

Partial<S>

Returns

PatchMessage<S> | undefined
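The overlay behaviour can be illustrated as a merge that leaves _v untouched. The sketch below is an assumption about the semantics, not the library's code: `mergeOverlay` and `Room` are hypothetical names, and returning undefined when nothing is cached is inferred only from the `| undefined` in the signature.

```typescript
// Merges an overlay into the cached state without advancing the version.
function mergeOverlay<S extends { _v: number }>(
  cached: S | undefined,
  overlayPatch: Partial<S>,
): S | undefined {
  // ASSUMPTION: with no cached state there is no version to reuse.
  if (cached === undefined) return undefined;
  // Force _v back to the cached value even if the overlay tries to set it.
  return { ...cached, ...overlayPatch, _v: cached._v };
}

type Room = { _v: number; title: string; online: string[] };

const room: Room = { _v: 12, title: "standup", online: [] };
const withPresence = mergeOverlay(room, { online: ["ana"] });
// withPresence keeps _v at 12 while online becomes ["ana"]
```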


subscribe()

subscribe(streamId, cb): () => void

Defined in: libs/act-sse/src/broadcast.ts:102

Subscribe to broadcast messages for a stream. Returns a cleanup function that removes the subscription.

Parameters

streamId

string

cb

Subscriber<S>

Returns

() => void
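The subscribe-and-cleanup contract, together with the per-stream subscriber count, might be implemented along the following lines. `SubscriberRegistry`, `Listener`, `count`, and `emit` are hypothetical names used for this sketch only; the library's internals are not shown in this reference.

```typescript
type Listener<M> = (msg: M) => void;

// Hypothetical registry of per-stream subscriber sets.
class SubscriberRegistry<M> {
  private readonly byStream = new Map<string, Set<Listener<M>>>();

  // Registers cb and returns a function that removes it again.
  subscribe(streamId: string, cb: Listener<M>): () => void {
    const listeners = this.byStream.get(streamId) ?? new Set<Listener<M>>();
    this.byStream.set(streamId, listeners);
    listeners.add(cb);
    return () => {
      listeners.delete(cb);
      // Drop the empty set, but only if it is still the one registered for this stream.
      if (listeners.size === 0 && this.byStream.get(streamId) === listeners) {
        this.byStream.delete(streamId);
      }
    };
  }

  count(streamId: string): number {
    return this.byStream.get(streamId)?.size ?? 0;
  }

  emit(streamId: string, msg: M): void {
    for (const cb of this.byStream.get(streamId) ?? []) cb(msg);
  }
}

const registry = new SubscriberRegistry<string>();
const received: string[] = [];
const unsubscribe = registry.subscribe("order-1", (msg) => received.push(msg));
registry.emit("order-1", "first");
unsubscribe(); // safe to call more than once
registry.emit("order-1", "second"); // no longer delivered
```

Returning the cleanup function from subscribe keeps teardown next to setup, which fits an SSE handler that has to release its subscription when the client disconnects.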