InMemoryStore
Class: InMemoryStore
Defined in: libs/act/src/adapters/in-memory-store.ts:283
In-memory event store implementation.
This is the default store used by Act when no other store is injected. It stores all events in memory and is suitable for:
- Development and prototyping
- Unit and integration testing
- Demonstrations and examples
Not suitable for production - all data is lost when the process exits. Use PostgresStore for production deployments.
The in-memory store provides:
- Full Store interface implementation
- Optimistic concurrency control
- Stream leasing for distributed processing simulation
- Snapshot support
- Fast performance (no I/O overhead)
Store.notify is intentionally not implemented. The notify hook is a
cross-process wake-up signal, and local commits already arm the drain via
do(). An in-memory store is single-process by definition, so there is
no remote writer to be notified of. The Act orchestrator
detects the absence and falls back to the existing debounce/poll path.
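The fallback described above amounts to feature-detecting an optional hook. The following is a minimal sketch of that pattern only: `WakeUpSource`, `chooseWakeStrategy`, and the `notify` signature shown here are hypothetical and are not taken from the Act source.

```typescript
// Hypothetical, simplified shape: only the optional hook matters here.
type WakeUpSource = {
  // Assumed signature for illustration; the real Store.notify may differ.
  notify?: (onWake: () => void) => void;
};

type WakeStrategy = "notify" | "poll";

// Picks a wake-up strategy by checking whether the optional hook exists.
function chooseWakeStrategy(
  source: WakeUpSource,
  onWake: () => void
): WakeStrategy {
  if (typeof source.notify === "function") {
    source.notify(onWake); // a cross-process signal is available
    return "notify";
  }
  return "poll"; // no hook: rely on the debounce/poll path
}

// An in-memory style source exposes no hook; a durable one might.
const inMemoryLike: WakeUpSource = {};
const durableLike: WakeUpSource = { notify: (onWake) => onWake() };

console.log(chooseWakeStrategy(inMemoryLike, () => {}));
console.log(chooseWakeStrategy(durableLike, () => {}));
```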
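Examples
Resetting the store between tests. This fragment assumes a test runner that provides describe, beforeEach, it, and expect, and that app, Counter, and target are defined elsewhere in the test file:

    import { store } from "@rotorsoft/act";

    describe("Counter", () => {
      beforeEach(async () => {
        // Reset store between tests: drop() clears all data
        // (seed() is documented below as a no-op for the in-memory store)
        await store().drop();
        await store().seed();
      });

      it("increments", async () => {
        await app.do("increment", target, { by: 5 });
        const snapshot = await app.load(Counter, "counter-1");
        expect(snapshot.state.count).toBe(5);
      });
    });

Using a dedicated instance. Here events and meta stand for the messages and event metadata prepared by the test:

    import { InMemoryStore } from "@rotorsoft/act";

    const testStore = new InMemoryStore();
    await testStore.seed();
    // Use for specific test scenarios
    await testStore.commit("test-stream", events, meta);

Querying events:

    import { store } from "@rotorsoft/act";

    // Collect every event from one stream through the query callback
    const events: any[] = [];
    await store().query(
      (event) => events.push(event),
      { stream: "test-stream" }
    );
    console.log(`Found ${events.length} events`);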
Implements
Store
Constructors
Constructor
new InMemoryStore(): InMemoryStore
Returns
InMemoryStore
Methods
ack()
ack(leases): Promise<object[]>
Defined in: libs/act/src/adapters/in-memory-store.ts:566
Acknowledge completion of processing for leased streams.
Parameters
leases
Lease[]
Leases to acknowledge, including last processed watermark and lease holder.
Returns
Promise<object[]>
Implementation of
Store.ack
block()
block(leases): Promise<object[]>
Defined in: libs/act/src/adapters/in-memory-store.ts:578
Block streams whose processing has failed and reached the maximum number of retries with blocking enabled.
Parameters
leases
Leases to block, including lease holder and last error message.
Returns
Promise<object[]>
Blocked leases.
Implementation of
Store.block
claim()
claim(lagging, leading, by, millis, lane?): Promise<Lease[]>
Defined in: libs/act/src/adapters/in-memory-store.ts:449
Atomically discovers and leases streams for processing. Fuses poll + lease into a single operation.
Parameters
lagging
number
Max streams from lagging frontier.
leading
number
Max streams from leading frontier.
by
string
Lease holder identifier.
millis
number
Lease duration in milliseconds.
lane?
string
Returns
Promise<Lease[]>
Granted leases.
Implementation of
Store.claim
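To illustrate what fusing discovery and leasing means, here is a simplified, synchronous model of the claim and ack cycle. It is a sketch, not the adapter's code: `LeaseTable` and its types are hypothetical, the lagging/leading split is collapsed into a single limit, and the clock is passed in explicitly to keep the example deterministic.

```typescript
type StreamState = {
  stream: string;
  at: number; // watermark: last processed event id, -1 when nothing processed
  leasedBy?: string;
  leasedUntil?: number;
};
type GrantedLease = { stream: string; at: number; by: string; until: number };

class LeaseTable {
  private streams: StreamState[];

  constructor(names: string[]) {
    this.streams = names.map((stream) => ({ stream, at: -1 }));
  }

  // Discover free (unleased or expired) streams and lease them in one step.
  claim(limit: number, by: string, millis: number, now: number): GrantedLease[] {
    const granted: GrantedLease[] = [];
    for (const s of this.streams) {
      if (granted.length >= limit) break;
      const free = s.leasedUntil === undefined || s.leasedUntil <= now;
      if (!free) continue;
      const until = now + millis;
      s.leasedBy = by;
      s.leasedUntil = until;
      granted.push({ stream: s.stream, at: s.at, by, until });
    }
    return granted;
  }

  // Advance the watermark and release the lease, but only for the holder.
  ack(leases: { stream: string; by: string; at: number }[]): number {
    let acked = 0;
    for (const l of leases) {
      const s = this.streams.find((x) => x.stream === l.stream);
      if (s && s.leasedBy === l.by) {
        s.at = l.at;
        s.leasedBy = undefined;
        s.leasedUntil = undefined;
        acked++;
      }
    }
    return acked;
  }
}

const table = new LeaseTable(["orders", "billing"]);
const workerA = table.claim(1, "worker-a", 1000, 0); // leases "orders"
const workerB = table.claim(2, "worker-b", 1000, 10); // only "billing" is free
table.ack([{ stream: "orders", by: "worker-a", at: 7 }]);
const again = table.claim(2, "worker-b", 1000, 20); // "orders" is free again
```

Because discovery and leasing happen in the same call, two workers in this model can never be granted the same stream while a lease is live.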
commit()
commit<E>(stream, msgs, meta, expectedVersion?): Promise<Committed<E, keyof E>[]>
Defined in: libs/act/src/adapters/in-memory-store.ts:393
Commit one or more events to a stream.
Type Parameters
E
E extends Schemas
Parameters
stream
string
The stream name.
msgs
Message<E, keyof E>[]
The events/messages to commit.
meta
Event metadata.
expectedVersion?
number
Optional expected stream version, used for the optimistic concurrency check.
Returns
Promise<Committed<E, keyof E>[]>
The committed events with metadata.
Throws
ConcurrencyError if expectedVersion does not match the stream's current version.
Implementation of
Store.commit
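The optimistic concurrency check can be pictured with a small self-contained model. This is a sketch under stated assumptions, not the adapter's implementation: `TinyStore`, `StoredEvent`, and `VersionConflict` (a stand-in for the library's ConcurrencyError) are hypothetical, the model is synchronous, and it assumes an empty stream has version -1.

```typescript
// Hypothetical stand-in for the library's ConcurrencyError.
class VersionConflict extends Error {
  constructor(stream: string, expected: number, actual: number) {
    super(`stream ${stream}: expected version ${expected}, found ${actual}`);
    this.name = "VersionConflict";
  }
}

type StoredEvent = {
  id: number;
  stream: string;
  version: number;
  name: string;
  data: unknown;
};

class TinyStore {
  private log: StoredEvent[] = [];

  // Version of the last event in the stream, or -1 when the stream is empty
  // (assumed convention for this sketch).
  versionOf(stream: string): number {
    let version = -1;
    for (const e of this.log) if (e.stream === stream) version = e.version;
    return version;
  }

  commit(
    stream: string,
    msgs: { name: string; data: unknown }[],
    expectedVersion?: number
  ): StoredEvent[] {
    const current = this.versionOf(stream);
    // Reject the write when the caller's view of the stream is stale.
    if (expectedVersion !== undefined && expectedVersion !== current) {
      throw new VersionConflict(stream, expectedVersion, current);
    }
    const committed: StoredEvent[] = msgs.map((m, i) => ({
      id: this.log.length + i, // ids follow the log length
      stream,
      version: current + 1 + i,
      name: m.name,
      data: m.data,
    }));
    this.log.push(...committed);
    return committed;
  }
}

const tiny = new TinyStore();
const first = tiny.commit("counter-1", [{ name: "Incremented", data: { by: 5 } }]);
// A writer that read the stream at version 0 may append.
tiny.commit("counter-1", [{ name: "Incremented", data: { by: 1 } }], first[0].version);
// A second writer still holding version 0 is rejected.
let conflict = false;
try {
  tiny.commit("counter-1", [{ name: "Incremented", data: { by: 2 } }], 0);
} catch (err) {
  conflict = err instanceof Error && err.name === "VersionConflict";
}
```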
dispose()
dispose(): Promise<void>
Defined in: libs/act/src/adapters/in-memory-store.ts:307
Dispose of the store and clear all events.
Returns
Promise<void>
Promise that resolves when disposal is complete.
Implementation of
Store.dispose
drop()
drop(): Promise<void>
Defined in: libs/act/src/adapters/in-memory-store.ts:324
Drop all data from the store.
Returns
Promise<void>
Promise that resolves when the store is cleared.
Implementation of
Store.drop
prioritize()
prioritize(filter, priority): Promise<number>
Defined in: libs/act/src/adapters/in-memory-store.ts:697
Bulk-update priority of streams matching filter. Mirrors
query_streams's filter semantics (see Store.prioritize).
Unlike subscribe (which keeps max() of registered
priorities), this sets the priority outright: an operator override
for the build-time scheduling policy.
Parameters
filter
priority
number
Returns
Promise<number>
Count of streams whose priority changed.
Implementation of
Store.prioritize
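The difference between the two priority rules (subscribe keeps the maximum, prioritize overwrites) can be shown in a few lines. This is a hedged sketch: `PriorityTable` is hypothetical, the filter is reduced to a plain predicate instead of the library's filter object, and a default priority of 0 is an assumption.

```typescript
class PriorityTable {
  private priorities = new Map<string, number>();

  // Registration keeps the highest priority seen so far for a stream.
  subscribe(stream: string, priority = 0): void {
    const current = this.priorities.get(stream);
    this.priorities.set(
      stream,
      current === undefined ? priority : Math.max(current, priority)
    );
  }

  // Operator override: sets the priority outright and counts real changes.
  prioritize(matches: (stream: string) => boolean, priority: number): number {
    let changed = 0;
    this.priorities.forEach((current, stream) => {
      if (matches(stream) && current !== priority) {
        this.priorities.set(stream, priority);
        changed++;
      }
    });
    return changed;
  }

  get(stream: string): number | undefined {
    return this.priorities.get(stream);
  }
}

const lanes = new PriorityTable();
lanes.subscribe("emails", 1);
lanes.subscribe("emails", 5); // the maximum wins: 5
lanes.subscribe("emails", 2); // still 5
const afterSubscribe = lanes.get("emails");
const changed = lanes.prioritize((s) => s === "emails", 0); // forced down to 0
const unchanged = lanes.prioritize((s) => s === "emails", 0); // already 0
```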
query()
query<E>(callback, query?): Promise<number>
Defined in: libs/act/src/adapters/in-memory-store.ts:349
Query events in the store, optionally filtered by query options.
Type Parameters
E
E extends Schemas
Parameters
callback
(event) => void
Function to call for each event.
query?
Readonly<{ after?: number; backward?: boolean; before?: number; correlation?: string; created_after?: Date; created_before?: Date; limit?: number; names?: string[]; stream?: string; stream_exact?: boolean; with_snaps?: boolean; }>
Optional query options.
Returns
Promise<number>
The number of events processed.
Implementation of
Store.query
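Because query pushes events into a callback and resolves with a count, callers often wrap it in a small helper that returns an array. The sketch below shows one way to do that against a minimal structural type. `Queryable`, `QueryOptions`, `collect`, and the `fake` source are hypothetical names for this example; the fake matches stream names exactly and treats `after` as an exclusive lower bound on the event id, which may not mirror the real adapter.

```typescript
// Minimal structural view of the query method; the real options are richer.
type QueryOptions = {
  stream?: string;
  names?: string[];
  after?: number;
  limit?: number;
};
type Queryable<T> = {
  query(callback: (event: T) => void, query?: QueryOptions): Promise<number>;
};

// Collects everything the callback receives into an array.
async function collect<T>(source: Queryable<T>, query?: QueryOptions): Promise<T[]> {
  const found: T[] = [];
  const count = await source.query((event) => {
    found.push(event);
  }, query);
  if (count !== found.length) throw new Error("callback count mismatch");
  return found;
}

// A tiny fake source, used only to exercise the helper.
type Row = { id: number; stream: string; name: string };
const rows: Row[] = [
  { id: 0, stream: "test-stream", name: "Incremented" },
  { id: 1, stream: "other", name: "Incremented" },
  { id: 2, stream: "test-stream", name: "Reset" },
];
const fake: Queryable<Row> = {
  async query(callback: (event: Row) => void, query: QueryOptions = {}): Promise<number> {
    let n = 0;
    for (const row of rows) {
      if (query.stream !== undefined && row.stream !== query.stream) continue;
      if (query.names && query.names.indexOf(row.name) < 0) continue;
      if (query.after !== undefined && row.id <= query.after) continue;
      if (query.limit !== undefined && n >= query.limit) break;
      callback(row);
      n++;
    }
    return n;
  },
};

collect(fake, { stream: "test-stream" }).then((found) => {
  console.log(`Found ${found.length} events`);
});
```

Since the helper depends only on the shape of `query`, the same function should accept any object exposing a compatible method.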
query_stats()
query_stats<E>(input, options?): Promise<Map<string, StreamStats<E>>>
Defined in: libs/act/src/adapters/in-memory-store.ts:793
Per-stream aggregated stats (see Store.query_stats).
Single forward scan over the in-memory event list, accumulating per stream. The "cheap heads" cost tier from durable adapters doesn't apply here (InMemory has no indexes); correctness is the goal, and performance is a non-issue.
Scope rules:
- Array input: explicit stream names, regardless of subscription.
- Filter input: stream / stream_exact match against event-bearing stream names; source / source_exact / blocked require a corresponding subscription in _streams (those are subscription concepts, not event concepts). An empty filter {} matches every event-bearing stream.
Type Parameters
E
E extends Schemas
Parameters
input
string[] | Pick<StreamFilter, "stream" | "stream_exact">
options?
Returns
Promise<Map<string, StreamStats<E>>>
Implementation of
Store.query_stats
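A single forward scan that accumulates per stream might look like the following sketch. The `TinyStats` fields (count, firstId, lastId) are hypothetical and chosen only for illustration; the actual StreamStats shape is not shown in this page. Only the array form of the input is modeled, with an omitted list standing in for the empty filter.

```typescript
type ScannedEvent = { id: number; stream: string; name: string };
type TinyStats = { count: number; firstId: number; lastId: number };

// One pass over the log, accumulating stats per stream.
// When `streams` is omitted, every event-bearing stream is included.
function scanStats(log: ScannedEvent[], streams?: string[]): Map<string, TinyStats> {
  const wanted = streams ? new Set(streams) : undefined;
  const stats = new Map<string, TinyStats>();
  for (const e of log) {
    if (wanted && !wanted.has(e.stream)) continue;
    const s = stats.get(e.stream);
    if (s) {
      s.count++;
      s.lastId = e.id;
    } else {
      stats.set(e.stream, { count: 1, firstId: e.id, lastId: e.id });
    }
  }
  return stats;
}

const sampleLog: ScannedEvent[] = [
  { id: 0, stream: "a", name: "X" },
  { id: 1, stream: "b", name: "X" },
  { id: 2, stream: "a", name: "Y" },
];
const allStats = scanStats(sampleLog); // both "a" and "b"
const someStats = scanStats(sampleLog, ["b", "missing"]); // only "b" has events
```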
query_streams()
query_streams(callback, query?): Promise<QueryStreamsResult>
Defined in: libs/act/src/adapters/in-memory-store.ts:716
Emits registered subscription positions to the callback, ordered by stream name. Returns the highest event id in the store and the count of positions emitted.
Parameters
callback
(position) => void
query?
Returns
Promise<QueryStreamsResult>
Implementation of
Store.query_streams
reset()
reset(input): Promise<number>
Defined in: libs/act/src/adapters/in-memory-store.ts:634
Reset watermarks to -1, clearing retry, blocked, error, and lease state so the matched streams can be replayed from the beginning. Accepts either an explicit list of names or a StreamFilter.
Parameters
input
string[] | StreamFilter
Stream names or a filter selecting the streams to reset.
Returns
Promise<number>
Count of streams that were actually reset.
Implementation of
Store.reset
restore()
restore(driver): Promise<void>
Defined in: libs/act/src/adapters/in-memory-store.ts:944
Atomically wipe-and-rebuild the store under an in-process snapshot.
Captures every index state up front, clears it, then hands the
orchestrator a per-event insert callback via the driver. Any
throw inside the driver restores the snapshot, leaving the store
byte-for-byte unchanged from the operator's perspective.
ids are reassigned 0..N-1 as events arrive (matching the
adapter's commit-id convention; InMemory uses _events.length).
created is preserved verbatim from the source.
Parameters
driver
(callback) => Promise<void>
Returns
Promise<void>
Implementation of
Store.restore
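The snapshot-and-rollback behavior described for restore can be sketched with a single array standing in for "every index state". `RestorableLog`, `LoggedEvent`, and `IncomingEvent` are hypothetical names; the real adapter tracks more state than this model does.

```typescript
type LoggedEvent = { id: number; stream: string; name: string; created: Date };
type IncomingEvent = Omit<LoggedEvent, "id">;

class RestorableLog {
  private log: LoggedEvent[] = [];

  all(): readonly LoggedEvent[] {
    return this.log;
  }

  // Ids follow the log length; created is kept as provided.
  append(e: IncomingEvent): void {
    this.log.push({ ...e, id: this.log.length });
  }

  async restore(
    driver: (insert: (e: IncomingEvent) => void) => Promise<void>
  ): Promise<void> {
    const snapshot = this.log; // capture current state
    this.log = []; // wipe
    try {
      await driver((e) => this.append(e)); // rebuild, ids reassigned 0..N-1
    } catch (err) {
      this.log = snapshot; // roll back to the captured state
      throw err;
    }
  }
}

async function demo(): Promise<{ ids: number[]; afterFailure: number }> {
  const target = new RestorableLog();
  target.append({ stream: "old", name: "Stale", created: new Date(0) });

  // A successful restore replaces the old content.
  await target.restore(async (insert) => {
    insert({ stream: "a", name: "Created", created: new Date(1000) });
    insert({ stream: "b", name: "Created", created: new Date(2000) });
  });
  const ids = target.all().map((e) => e.id);

  // A failing driver leaves the two restored events in place.
  await target
    .restore(async (insert) => {
      insert({ stream: "c", name: "Created", created: new Date(3000) });
      throw new Error("source unavailable");
    })
    .catch(() => undefined);

  return { ids, afterFailure: target.all().length };
}
```

Swapping in a fresh array, rather than mutating the captured one, is what makes the rollback in this sketch a single assignment.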
seed()
seed(): Promise<void>
Defined in: libs/act/src/adapters/in-memory-store.ts:316
Seed the store with initial data (no-op for in-memory).
Returns
Promise<void>
Promise that resolves when seeding is complete.
Implementation of
Store.seed
subscribe()
subscribe(streams): Promise<{ subscribed: number; watermark: number; }>
Defined in: libs/act/src/adapters/in-memory-store.ts:527
Registers streams for event processing. When the same stream is resubscribed with a different priority, the maximum wins, so the highest-priority registered reaction sets the scheduling lane. Use prioritize for operator runtime overrides.
Parameters
streams
object[]
Streams to register with optional source + priority.
Returns
Promise<{ subscribed: number; watermark: number; }>
subscribed count and current max watermark.
Implementation of
Store.subscribe
truncate()
truncate(targets): Promise<Map<string, { committed: Committed<Schemas, string>; deleted: number; }>>
Defined in: libs/act/src/adapters/in-memory-store.ts:878
Atomically truncates streams and seeds each with a snapshot or tombstone.
Parameters
targets
object[]
Streams to truncate with optional snapshot state and meta.
Returns
Promise<Map<string, { committed: Committed<Schemas, string>; deleted: number; }>>
Map keyed by stream name, each entry with deleted count and committed event.
Implementation of
Store.truncate
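As a rough picture of truncate-and-seed, the sketch below removes a stream's events and appends one marker event per target. All names are hypothetical, including the `"snapshot"` and `"tombstone"` event names and the id scheme (next id after the current maximum); the library's actual markers and id handling may differ.

```typescript
type TruncEvent = { id: number; stream: string; name: string; data: unknown };
type TruncateTarget = { stream: string; snapshot?: unknown };
type TruncateResult = { deleted: number; committed: TruncEvent };

// Hypothetical marker names; the library's actual event names may differ.
const SNAPSHOT = "snapshot";
const TOMBSTONE = "tombstone";

// Builds a new log instead of mutating the input, so the caller can swap
// the result in as a single step.
function truncateStreams(
  log: TruncEvent[],
  targets: TruncateTarget[]
): { log: TruncEvent[]; results: Map<string, TruncateResult> } {
  let next = log.slice();
  let nextId = log.reduce((max, e) => Math.max(max, e.id), -1) + 1;
  const results = new Map<string, TruncateResult>();
  for (const t of targets) {
    const kept = next.filter((e) => e.stream !== t.stream);
    const deleted = next.length - kept.length;
    // Seed with a snapshot when state is provided, otherwise a tombstone.
    const committed: TruncEvent = {
      id: nextId++,
      stream: t.stream,
      name: t.snapshot !== undefined ? SNAPSHOT : TOMBSTONE,
      data: t.snapshot ?? null,
    };
    kept.push(committed);
    next = kept;
    results.set(t.stream, { deleted, committed });
  }
  return { log: next, results };
}

const before: TruncEvent[] = [
  { id: 0, stream: "a", name: "Incremented", data: { by: 1 } },
  { id: 1, stream: "a", name: "Incremented", data: { by: 2 } },
  { id: 2, stream: "b", name: "Opened", data: {} },
  { id: 3, stream: "a", name: "Incremented", data: { by: 3 } },
];
const truncated = truncateStreams(before, [
  { stream: "a", snapshot: { count: 6 } }, // keep state as a snapshot
  { stream: "b" }, // no state: leave a tombstone
]);
```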
unblock()
unblock(input): Promise<number>
Defined in: libs/act/src/adapters/in-memory-store.ts:668
Clear the blocked flag (and retry / error / lease) on the matched
streams without touching the watermark. Streams that aren't blocked
at call time are silently skipped. Accepts either an explicit list
of names or a StreamFilter. The filter form always restricts
to blocked streams; passing blocked: false matches nothing.
See Store.unblock.
Parameters
input
string[] | StreamFilter
Stream names or a filter selecting the streams to unblock.
Returns
Promise<number>
Count of streams that were actually flipped (were blocked).
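The contrast between unblock and reset is easiest to see side by side. The following is a minimal sketch over a hypothetical `Subscription` record, using only the name-list form of the input. It assumes reset counts every matched subscription, which may differ from how the adapter decides a stream was "actually reset".

```typescript
type Subscription = {
  stream: string;
  at: number; // watermark
  retry: number;
  blocked: boolean;
  error?: string;
  leasedBy?: string;
};

// Reset: rewind the watermark to -1 and clear retry, blocked, error, lease.
function resetStreams(subs: Subscription[], names: string[]): number {
  let count = 0;
  for (const s of subs) {
    if (names.indexOf(s.stream) < 0) continue;
    s.at = -1;
    s.retry = 0;
    s.blocked = false;
    s.error = undefined;
    s.leasedBy = undefined;
    count++;
  }
  return count;
}

// Unblock: clear blocked, retry, error, lease; keep the watermark.
// Streams that are not blocked are skipped and not counted.
function unblockStreams(subs: Subscription[], names: string[]): number {
  let count = 0;
  for (const s of subs) {
    if (names.indexOf(s.stream) < 0 || !s.blocked) continue;
    s.blocked = false;
    s.retry = 0;
    s.error = undefined;
    s.leasedBy = undefined;
    count++;
  }
  return count;
}

const subs: Subscription[] = [
  { stream: "orders", at: 42, retry: 3, blocked: true, error: "boom" },
  { stream: "billing", at: 10, retry: 0, blocked: false },
];
const flipped = unblockStreams(subs, ["orders", "billing"]); // only "orders"
const rewound = resetStreams(subs, ["billing"]); // "billing" back to -1
```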