Skip to main content

InMemoryStore

@rotorsoft/act-root


@rotorsoft/act-root / act/src / InMemoryStore

Class: InMemoryStore

Defined in: libs/act/src/adapters/in-memory-store.ts:283

In-memory event store implementation.

This is the default store used by Act when no other store is injected. It stores all events in memory and is suitable for:

  • Development and prototyping
  • Unit and integration testing
  • Demonstrations and examples

Not suitable for production - all data is lost when the process exits. Use PostgresStore for production deployments.

The in-memory store provides:

  • Full Store interface implementation
  • Optimistic concurrency control
  • Stream leasing for distributed processing simulation
  • Snapshot support
  • Fast performance (no I/O overhead)

Store.notify is intentionally not implemented. The notify hook is a cross-process wake-up signal โ€” local commits already arm the drain via do(). An in-memory store is single-process by definition, so there is no remote writer to be notified of. The Act orchestrator detects the absence and falls back to the existing debounce/poll path.

Examplesโ€‹

import { store } from "@rotorsoft/act";

describe("Counter", () => {
beforeEach(async () => {
// Reset store between tests
await store().seed();
});

it("increments", async () => {
await app.do("increment", target, { by: 5 });
const snapshot = await app.load(Counter, "counter-1");
expect(snapshot.state.count).toBe(5);
});
});
import { InMemoryStore } from "@rotorsoft/act";

const testStore = new InMemoryStore();
await testStore.seed();

// Use for specific test scenarios
await testStore.commit("test-stream", events, meta);
const events: any[] = [];
await store().query(
(event) => events.push(event),
{ stream: "test-stream" }
);
console.log(`Found ${events.length} events`);

Seeโ€‹

  • Store for the interface definition
  • PostgresStore for production use
  • store for injecting stores

Implementsโ€‹

Constructorsโ€‹

Constructorโ€‹

new InMemoryStore(): InMemoryStore

Returnsโ€‹

InMemoryStore

Methodsโ€‹

ack()โ€‹

ack(leases): Promise<object[]>

Defined in: libs/act/src/adapters/in-memory-store.ts:566

Acknowledge completion of processing for leased streams.

Parametersโ€‹

leasesโ€‹

Lease[]

Leases to acknowledge, including last processed watermark and lease holder.

Returnsโ€‹

Promise<object[]>

Implementation ofโ€‹

Store.ack


block()โ€‹

block(leases): Promise<object[]>

Defined in: libs/act/src/adapters/in-memory-store.ts:578

Block a stream for processing after failing to process and reaching max retries with blocking enabled.

Parametersโ€‹

leasesโ€‹

BlockedLease[]

Leases to block, including lease holder and last error message.

Returnsโ€‹

Promise<object[]>

Blocked leases.

Implementation ofโ€‹

Store.block


claim()โ€‹

claim(lagging, leading, by, millis, lane?): Promise<Lease[]>

Defined in: libs/act/src/adapters/in-memory-store.ts:449

Atomically discovers and leases streams for processing. Fuses poll + lease into a single operation.

Parametersโ€‹

laggingโ€‹

number

Max streams from lagging frontier.

leadingโ€‹

number

Max streams from leading frontier.

byโ€‹

string

Lease holder identifier.

millisโ€‹

number

Lease duration in milliseconds.

lane?โ€‹

string

Returnsโ€‹

Promise<Lease[]>

Granted leases.

Implementation ofโ€‹

Store.claim


commit()โ€‹

commit<E>(stream, msgs, meta, expectedVersion?): Promise<Committed<E, keyof E>[]>

Defined in: libs/act/src/adapters/in-memory-store.ts:393

Commit one or more events to a stream.

Type Parametersโ€‹

Eโ€‹

E extends Schemas

Parametersโ€‹

streamโ€‹

string

The stream name.

msgsโ€‹

Message<E, keyof E>[]

The events/messages to commit.

metaโ€‹

EventMeta

Event metadata.

expectedVersion?โ€‹

number

Optional optimistic concurrency check.

Returnsโ€‹

Promise<Committed<E, keyof E>[]>

The committed events with metadata.

Throwsโ€‹

ConcurrencyError if expectedVersion does not match.

Implementation ofโ€‹

Store.commit


dispose()โ€‹

dispose(): Promise<void>

Defined in: libs/act/src/adapters/in-memory-store.ts:307

Dispose of the store and clear all events.

Returnsโ€‹

Promise<void>

Promise that resolves when disposal is complete.

Implementation ofโ€‹

Store.dispose


drop()โ€‹

drop(): Promise<void>

Defined in: libs/act/src/adapters/in-memory-store.ts:324

Drop all data from the store.

Returnsโ€‹

Promise<void>

Promise that resolves when the store is cleared.

Implementation ofโ€‹

Store.drop


prioritize()โ€‹

prioritize(filter, priority): Promise<number>

Defined in: libs/act/src/adapters/in-memory-store.ts:697

Bulk-update priority of streams matching filter. Mirrors query_streams's filter semantics โ€” see Store.prioritize. Unlike subscribe (which keeps max() of registered priorities), this sets the priority outright โ€” operator override for the build-time scheduling policy.

Parametersโ€‹

filterโ€‹

StreamFilter

priorityโ€‹

number

Returnsโ€‹

Promise<number>

Count of streams whose priority changed.

Implementation ofโ€‹

Store.prioritize


query()โ€‹

query<E>(callback, query?): Promise<number>

Defined in: libs/act/src/adapters/in-memory-store.ts:349

Query events in the store, optionally filtered by query options.

Type Parametersโ€‹

Eโ€‹

E extends Schemas

Parametersโ€‹

callbackโ€‹

(event) => void

Function to call for each event.

query?โ€‹

Readonly<{ after?: number; backward?: boolean; before?: number; correlation?: string; created_after?: Date; created_before?: Date; limit?: number; names?: string[]; stream?: string; stream_exact?: boolean; with_snaps?: boolean; }>

Optional query options.

Returnsโ€‹

Promise<number>

The number of events processed.

Implementation ofโ€‹

Store.query


query_stats()โ€‹

query_stats<E>(input, options?): Promise<Map<string, StreamStats<E>>>

Defined in: libs/act/src/adapters/in-memory-store.ts:793

Per-stream aggregated stats โ€” see Store.query_stats.

Single forward scan over the in-memory event list, accumulating per stream. The "cheap heads" cost tier from durable adapters doesn't apply here (InMemory has no indexes); correctness is the goal, perf is a non-issue.

Scope rules:

  • Array input โ€” explicit stream names, regardless of subscription.
  • Filter input โ€” stream/stream_exact match against event-bearing stream names; source/source_exact/blocked require a corresponding subscription in _streams (those are subscription concepts, not event concepts). Empty filter {} matches every event-bearing stream.

Type Parametersโ€‹

Eโ€‹

E extends Schemas

Parametersโ€‹

inputโ€‹

string[] | Pick<StreamFilter, "stream" | "stream_exact">

options?โ€‹

QueryStatsOptions<E>

Returnsโ€‹

Promise<Map<string, StreamStats<E>>>

Implementation ofโ€‹

Store.query_stats


query_streams()โ€‹

query_streams(callback, query?): Promise<QueryStreamsResult>

Defined in: libs/act/src/adapters/in-memory-store.ts:716

Streams registered subscription positions to the callback, ordered by stream name. Returns the highest event id in the store and the count of positions emitted.

Parametersโ€‹

callbackโ€‹

(position) => void

query?โ€‹

QueryStreams

Returnsโ€‹

Promise<QueryStreamsResult>

Implementation ofโ€‹

Store.query_streams


reset()โ€‹

reset(input): Promise<number>

Defined in: libs/act/src/adapters/in-memory-store.ts:634

Reset watermarks to -1, clearing retry, blocked, error, and lease state so the matched streams can be replayed from the beginning. Accepts either an explicit list of names or a StreamFilter.

Parametersโ€‹

inputโ€‹

string[] | StreamFilter

Stream names or a filter selecting the streams to reset.

Returnsโ€‹

Promise<number>

Count of streams that were actually reset.

Implementation ofโ€‹

Store.reset


restore()โ€‹

restore(driver): Promise<void>

Defined in: libs/act/src/adapters/in-memory-store.ts:944

Atomically wipe-and-rebuild the store under an in-process snapshot.

Captures every index state up front, clears it, then hands the orchestrator a per-event insert callback via the driver. Any throw inside the driver restores the snapshot, leaving the store byte-for-byte unchanged from the operator's perspective.

ids are reassigned 0..N-1 as events arrive (matching the adapter's commit-id convention โ€” InMemory uses _events.length). created is preserved verbatim from the source.

Parametersโ€‹

driverโ€‹

(callback) => Promise<void>

Returnsโ€‹

Promise<void>

Implementation ofโ€‹

Store.restore


seed()โ€‹

seed(): Promise<void>

Defined in: libs/act/src/adapters/in-memory-store.ts:316

Seed the store with initial data (no-op for in-memory).

Returnsโ€‹

Promise<void>

Promise that resolves when seeding is complete.

Implementation ofโ€‹

Store.seed


subscribe()โ€‹

subscribe(streams): Promise<{ subscribed: number; watermark: number; }>

Defined in: libs/act/src/adapters/in-memory-store.ts:527

Registers streams for event processing. When the same stream is resubscribed with a different priority, the maximum wins โ€” so the highest-priority registered reaction sets the scheduling lane. Use prioritize for operator runtime overrides.

Parametersโ€‹

streamsโ€‹

object[]

Streams to register with optional source + priority.

Returnsโ€‹

Promise<{ subscribed: number; watermark: number; }>

subscribed count and current max watermark.

Implementation ofโ€‹

Store.subscribe


truncate()โ€‹

truncate(targets): Promise<Map<string, { committed: Committed<Schemas, string>; deleted: number; }>>

Defined in: libs/act/src/adapters/in-memory-store.ts:878

Atomically truncates streams and seeds each with a snapshot or tombstone.

Parametersโ€‹

targetsโ€‹

object[]

Streams to truncate with optional snapshot state and meta.

Returnsโ€‹

Promise<Map<string, { committed: Committed<Schemas, string>; deleted: number; }>>

Map keyed by stream name, each entry with deleted count and committed event.

Implementation ofโ€‹

Store.truncate


unblock()โ€‹

unblock(input): Promise<number>

Defined in: libs/act/src/adapters/in-memory-store.ts:668

Clear the blocked flag (and retry / error / lease) on the matched streams without touching the watermark. Streams that aren't blocked at call time are silently skipped. Accepts either an explicit list of names or a StreamFilter. The filter form always restricts to blocked streams โ€” passing blocked: false matches nothing. See Store.unblock.

Parametersโ€‹

inputโ€‹

string[] | StreamFilter

Stream names or a filter selecting the streams to unblock.

Returnsโ€‹

Promise<number>

Count of streams that were actually flipped (were blocked).

Implementation ofโ€‹

Store.unblock