Skip to main content

BroadcastChannel

@rotorsoft/act-root


@rotorsoft/act-root / act-sse/src / BroadcastChannel

Class: BroadcastChannel<S>

Defined in: libs/act-sse/src/broadcast.ts:39

Server-side broadcast channel for incremental state sync over SSE.

Manages per-stream subscriber sets and an LRU state cache. When state changes, forwards domain patches (from event handlers) to all subscribers as version-keyed messages.

Usage

const broadcast = new BroadcastChannel<MyState>();

// After every app.do():
const snaps = await app.do(...);
const patches = snaps.map(s => s.patch).filter(Boolean);
const state = deriveState(snaps.at(-1));
broadcast.publish(streamId, state, patches);

// In SSE subscription:
const cleanup = broadcast.subscribe(streamId, (msg) => {
pending = msg;
resolve?.();
});

// Initial state for reconnects:
const cached = broadcast.getState(streamId);

Version Contract

The _v field on state MUST be set from snap.event.version (the event store's monotonic stream version) BEFORE calling publish(). This is the single source of truth for ordering — no separate version counters.

Type Parameters

S

S extends BroadcastState = BroadcastState

Constructors

Constructor

new BroadcastChannel<S>(options?): BroadcastChannel<S>

Defined in: libs/act-sse/src/broadcast.ts:43

Parameters

options?
cacheSize?

number

Returns

BroadcastChannel<S>

Accessors

cache

Get Signature

get cache(): StateCache<S>

Defined in: libs/act-sse/src/broadcast.ts:124

Direct access to the state cache (for app-specific reads like presence).

Returns

StateCache<S>

Methods

getState()

getState(streamId): S | undefined

Defined in: libs/act-sse/src/broadcast.ts:119

Get the cached state for a stream (for reconnects / initial SSE yield).

Parameters

streamId

string

Returns

S | undefined


getSubscriberCount()

getSubscriberCount(streamId): number

Defined in: libs/act-sse/src/broadcast.ts:114

Get the number of subscribers for a stream.

Parameters

streamId

string

Returns

number


publish()

publish(streamId, state, patches?): PatchMessage<S>

Defined in: libs/act-sse/src/broadcast.ts:55

Publish domain patches from a commit. patches[i] corresponds to version baseV + i + 1.

Parameters

streamId

string

The event store stream ID

state

S

Full state with _v set from snap.event.version

patches?

Partial<S>[] = []

Array of domain patches, one per emitted event

Returns

PatchMessage<S>


publishOverlay()

publishOverlay(streamId, overlayPatch): PatchMessage<S> | undefined

Defined in: libs/act-sse/src/broadcast.ts:80

Publish a state update that doesn't change the event version (e.g. presence overlay, computed field refresh). Uses the same version as the cached state, single entry.

Parameters

streamId

string

overlayPatch

Partial<S>

Returns

PatchMessage<S> | undefined


subscribe()

subscribe(streamId, cb): () => void

Defined in: libs/act-sse/src/broadcast.ts:102

Subscribe to broadcast messages for a stream. Returns a cleanup function that removes the subscription.

Parameters

streamId

string

cb

Subscriber<S>

Returns

() => void