Skip to main content

InMemoryStore

@rotorsoft/act-root


@rotorsoft/act-root / act/src / InMemoryStore

Class: InMemoryStore

Defined in: libs/act/src/adapters/in-memory-store.ts:287

In-memory event store implementation.

This is the default store used by Act when no other store is injected. It stores all events in memory and is suitable for:

  • Development and prototyping
  • Unit and integration testing
  • Demonstrations and examples

Not suitable for production - all data is lost when the process exits. Use PostgresStore for production deployments.

The in-memory store provides:

  • Full Store interface implementation
  • Optimistic concurrency control
  • Stream leasing for distributed processing simulation
  • Snapshot support
  • Fast performance (no I/O overhead)

Store.notify is intentionally not implemented. The notify hook is a cross-process wake-up signal โ€” local commits already arm the drain via do(). An in-memory store is single-process by definition, so there is no remote writer to be notified of. The Act orchestrator detects the absence and falls back to the existing debounce/poll path.

Examplesโ€‹

Using in tests

import { store } from "@rotorsoft/act";

describe("Counter", () => {
beforeEach(async () => {
// Reset store between tests
await store().seed();
});

it("increments", async () => {
await app.do("increment", target, { by: 5 });
const snapshot = await app.load(Counter, "counter-1");
expect(snapshot.state.count).toBe(5);
});
});

Explicit instantiation

import { InMemoryStore } from "@rotorsoft/act";

const testStore = new InMemoryStore();
await testStore.seed();

// Use for specific test scenarios
await testStore.commit("test-stream", events, meta);

Querying events

const events: any[] = [];
await store().query(
(event) => events.push(event),
{ stream: "test-stream" }
);
console.log(`Found ${events.length} events`);

Seeโ€‹

  • Store for the interface definition
  • PostgresStore for production use
  • store for injecting stores

Implementsโ€‹

Constructorsโ€‹

Constructorโ€‹

new InMemoryStore(): InMemoryStore

Returnsโ€‹

InMemoryStore

Methodsโ€‹

ack()โ€‹

ack(leases): Promise<object[]>

Defined in: libs/act/src/adapters/in-memory-store.ts:601

Acknowledge completion of processing for leased streams.

Parametersโ€‹

leasesโ€‹

Lease[]

Leases to acknowledge, including last processed watermark and lease holder.

Returnsโ€‹

Promise<object[]>

Implementation ofโ€‹

Store.ack


block()โ€‹

block(leases): Promise<object[]>

Defined in: libs/act/src/adapters/in-memory-store.ts:613

Block a stream for processing after failing to process and reaching max retries with blocking enabled.

Parametersโ€‹

leasesโ€‹

BlockedLease[]

Leases to block, including lease holder and last error message.

Returnsโ€‹

Promise<object[]>

Blocked leases.

Implementation ofโ€‹

Store.block


claim()โ€‹

claim(lagging, leading, by, millis, lane?): Promise<Lease[]>

Defined in: libs/act/src/adapters/in-memory-store.ts:484

Atomically discovers and leases streams for processing. Fuses poll + lease into a single operation.

Parametersโ€‹

laggingโ€‹

number

Max streams from lagging frontier.

leadingโ€‹

number

Max streams from leading frontier.

byโ€‹

string

Lease holder identifier.

millisโ€‹

number

Lease duration in milliseconds.

lane?โ€‹

string

Returnsโ€‹

Promise<Lease[]>

Granted leases.

Implementation ofโ€‹

Store.claim


commit()โ€‹

commit<E>(stream, msgs, meta, expectedVersion?): Promise<Committed<E, keyof E>[]>

Defined in: libs/act/src/adapters/in-memory-store.ts:417

Commit one or more events to a stream.

Type Parametersโ€‹

Eโ€‹

E extends Schemas

Parametersโ€‹

streamโ€‹

string

The stream name.

msgsโ€‹

Message<E, keyof E>[]

The events/messages to commit.

metaโ€‹

EventMeta

Event metadata.

expectedVersion?โ€‹

number

Optional optimistic concurrency check.

Returnsโ€‹

Promise<Committed<E, keyof E>[]>

The committed events with metadata.

Throwsโ€‹

ConcurrencyError if expectedVersion does not match.

Implementation ofโ€‹

Store.commit


dispose()โ€‹

dispose(): Promise<void>

Defined in: libs/act/src/adapters/in-memory-store.ts:327

Dispose of the store and clear all events.

Returnsโ€‹

Promise<void>

Promise that resolves when disposal is complete.

Implementation ofโ€‹

Store.dispose


drop()โ€‹

drop(): Promise<void>

Defined in: libs/act/src/adapters/in-memory-store.ts:344

Drop all data from the store.

Returnsโ€‹

Promise<void>

Promise that resolves when the store is cleared.

Implementation ofโ€‹

Store.drop


forget_pii()โ€‹

forget_pii(stream): Promise<number>

Defined in: libs/act/src/adapters/in-memory-store.ts:712

Wipe the sensitive-data payload for every event on the stream โ€” see Store.forget_pii. O(1) drop of the stream's inner Map; the size of that Map is the count of events that had PII. Idempotent: a second call finds no inner Map and returns 0.

Parametersโ€‹

streamโ€‹

string

Target stream.

Returnsโ€‹

Promise<number>

Count of events whose isolated PII payload was deleted.

Implementation ofโ€‹

Store.forget_pii


prioritize()โ€‹

prioritize(filter, priority): Promise<number>

Defined in: libs/act/src/adapters/in-memory-store.ts:748

Bulk-update priority of streams matching filter. Mirrors query_streams's filter semantics โ€” see Store.prioritize. Unlike subscribe (which keeps max() of registered priorities), this sets the priority outright โ€” operator override for the build-time scheduling policy.

Parametersโ€‹

filterโ€‹

StreamFilter

priorityโ€‹

number

Returnsโ€‹

Promise<number>

Count of streams whose priority changed.

Implementation ofโ€‹

Store.prioritize


query()โ€‹

query<E>(callback, query?): Promise<number>

Defined in: libs/act/src/adapters/in-memory-store.ts:369

Query events in the store, optionally filtered by query options.

Type Parametersโ€‹

Eโ€‹

E extends Schemas

Parametersโ€‹

callbackโ€‹

(event) => void

Function to call for each event.

query?โ€‹

Readonly<{ after?: number; backward?: boolean; before?: number; correlation?: string; created_after?: Date; created_before?: Date; limit?: number; names?: string[]; stream?: string; stream_exact?: boolean; with_snaps?: boolean; }>

Optional query options.

Returnsโ€‹

Promise<number>

The number of events processed.

Implementation ofโ€‹

Store.query


query_stats()โ€‹

query_stats<E>(input, options?): Promise<Map<string, StreamStats<E>>>

Defined in: libs/act/src/adapters/in-memory-store.ts:844

Per-stream aggregated stats โ€” see Store.query_stats.

Single forward scan over the in-memory event list, accumulating per stream. The "cheap heads" cost tier from durable adapters doesn't apply here (InMemory has no indexes); correctness is the goal, perf is a non-issue.

Scope rules:

  • Array input โ€” explicit stream names, regardless of subscription.
  • Filter input โ€” stream/stream_exact match against event-bearing stream names; source/source_exact/blocked require a corresponding subscription in _streams (those are subscription concepts, not event concepts). Empty filter {} matches every event-bearing stream.

Type Parametersโ€‹

Eโ€‹

E extends Schemas

Parametersโ€‹

inputโ€‹

string[] | Pick<StreamFilter, "stream" | "stream_exact">

options?โ€‹

QueryStatsOptions<E>

Returnsโ€‹

Promise<Map<string, StreamStats<E>>>

Implementation ofโ€‹

Store.query_stats


query_streams()โ€‹

query_streams(callback, query?): Promise<QueryStreamsResult>

Defined in: libs/act/src/adapters/in-memory-store.ts:767

Streams registered subscription positions to the callback, ordered by stream name. Returns the highest event id in the store and the count of positions emitted.

Parametersโ€‹

callbackโ€‹

(position) => void

query?โ€‹

QueryStreams

Returnsโ€‹

Promise<QueryStreamsResult>

Implementation ofโ€‹

Store.query_streams


reset()โ€‹

reset(input): Promise<number>

Defined in: libs/act/src/adapters/in-memory-store.ts:669

Reset watermarks to -1, clearing retry, blocked, error, and lease state so the matched streams can be replayed from the beginning. Accepts either an explicit list of names or a StreamFilter.

Parametersโ€‹

inputโ€‹

string[] | StreamFilter

Stream names or a filter selecting the streams to reset.

Returnsโ€‹

Promise<number>

Count of streams that were actually reset.

Implementation ofโ€‹

Store.reset


restore()โ€‹

restore(driver): Promise<void>

Defined in: libs/act/src/adapters/in-memory-store.ts:996

Atomically wipe-and-rebuild the store under an in-process snapshot.

Captures every index state up front, clears it, then hands the orchestrator a per-event insert callback via the driver. Any throw inside the driver restores the snapshot, leaving the store byte-for-byte unchanged from the operator's perspective.

ids are reassigned 0..N-1 as events arrive (matching the adapter's commit-id convention โ€” InMemory uses _events.length). created is preserved verbatim from the source.

Parametersโ€‹

driverโ€‹

(callback) => Promise<void>

Returnsโ€‹

Promise<void>

Implementation ofโ€‹

Store.restore


seed()โ€‹

seed(): Promise<void>

Defined in: libs/act/src/adapters/in-memory-store.ts:336

Seed the store with initial data (no-op for in-memory).

Returnsโ€‹

Promise<void>

Promise that resolves when seeding is complete.

Implementation ofโ€‹

Store.seed


subscribe()โ€‹

subscribe(streams): Promise<{ subscribed: number; watermark: number; }>

Defined in: libs/act/src/adapters/in-memory-store.ts:562

Registers streams for event processing. When the same stream is resubscribed with a different priority, the maximum wins โ€” so the highest-priority registered reaction sets the scheduling lane. Use prioritize for operator runtime overrides.

Parametersโ€‹

streamsโ€‹

object[]

Streams to register with optional source + priority.

Returnsโ€‹

Promise<{ subscribed: number; watermark: number; }>

subscribed count and current max watermark.

Implementation ofโ€‹

Store.subscribe


truncate()โ€‹

truncate(targets): Promise<Map<string, { committed: Committed<Schemas, string>; deleted: number; }>>

Defined in: libs/act/src/adapters/in-memory-store.ts:929

Atomically truncates streams and seeds each with a snapshot or tombstone.

Parametersโ€‹

targetsโ€‹

object[]

Streams to truncate with optional snapshot state and meta.

Returnsโ€‹

Promise<Map<string, { committed: Committed<Schemas, string>; deleted: number; }>>

Map keyed by stream name, each entry with deleted count and committed event.

Implementation ofโ€‹

Store.truncate


unblock()โ€‹

unblock(input): Promise<number>

Defined in: libs/act/src/adapters/in-memory-store.ts:719

Clears the blocked flag on streams without replaying their history. Sets blocked = false, retry_count = 0, error = null, and clears any lease bookkeeping. The at watermark stays where it was โ€” the stream resumes from the next event after the last successful ack, not from zero.

The distinction from reset matters: reset() is for projection rebuilds (replay from event 0); unblock() is for recovering from a poison message after the operator fixes the underlying issue. Use unblock() when you don't want to re-process history.

Prefer Act.unblock() over calling this directly. Like reset(), this primitive doesn't raise the orchestrator's internal "needs drain" flag โ€” a settled Act instance will short-circuit and skip the resume. Act.unblock() wraps this and arms the flag.

Only streams that were actually blocked at call time count toward the return value; already-unblocked streams and unknown stream names are silently skipped. The atomic single-statement update makes the call safe to issue concurrently with claim() โ€” workers holding a FOR UPDATE SKIP LOCKED lock won't see partial state.

Accepts either an explicit list of stream names or a StreamFilter for bulk recovery (e.g., "unblock every blocked order projection"). The blocked = true predicate is always applied โ€” passing blocked: false in the filter matches nothing. An empty filter ({}) means "unblock everything that's blocked," which is a sane post-incident bulk recovery.

Parametersโ€‹

inputโ€‹

string[] | StreamFilter

Stream names or a StreamFilter

Returnsโ€‹

Promise<number>

Count of streams that were actually flipped (were blocked)

Exampleโ€‹

// By name (single targeted recovery)
await app.unblock(["webhooks-out-customer-42"]);

// By filter โ€” unblock every blocked stream in a family
await app.unblock({ stream: "^webhooks-out-" });

// Post-incident: unblock everything that's blocked
await app.unblock({});

// Low-level (does NOT trigger resume on settled apps)
await store().unblock(["webhooks-out-customer-42"]);

Seeโ€‹

  • Act.unblock for the high-level recovery API
  • reset for the rebuild-from-zero alternative

Implementation ofโ€‹

Store.unblock