InMemoryStore
Class: InMemoryStore
Defined in: libs/act/src/adapters/in-memory-store.ts:287
In-memory event store implementation.
This is the default store used by Act when no other store is injected. It stores all events in memory and is suitable for:
- Development and prototyping
- Unit and integration testing
- Demonstrations and examples
Not suitable for production - all data is lost when the process exits. Use PostgresStore for production deployments.
The in-memory store provides:
- Full Store interface implementation
- Optimistic concurrency control
- Stream leasing for distributed processing simulation
- Snapshot support
- Fast performance (no I/O overhead)
Store.notify is intentionally not implemented. The notify hook is a
cross-process wake-up signal; local commits already arm the drain via
do(). An in-memory store is single-process by definition, so there is
no remote writer to be notified of. The Act orchestrator
detects the absence and falls back to the existing debounce/poll path.
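The fallback described above amounts to feature detection on an optional method. The sketch below is a minimal illustration under stated assumptions: `WakeSource`, `WakeStrategy`, and `chooseWakeStrategy` are hypothetical names, and the `notify` signature shown is invented for the example rather than taken from the library.

```typescript
// Hypothetical, simplified shapes for illustration only; not the library's types.
interface WakeSource {
  // Optional cross-process wake-up hook; absent on single-process stores.
  notify?: (onWake: () => void) => void;
}

type WakeStrategy = "notify" | "poll";

// Pick a wake-up strategy based on whether the optional hook exists.
function chooseWakeStrategy(source: WakeSource): WakeStrategy {
  return typeof source.notify === "function" ? "notify" : "poll";
}

const inMemoryLike: WakeSource = {}; // no notify, like the in-memory store
const durableLike: WakeSource = { notify: (onWake) => onWake() };

console.log(chooseWakeStrategy(inMemoryLike)); // "poll"
console.log(chooseWakeStrategy(durableLike)); // "notify"
```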
Examples
Using in tests
import { store } from "@rotorsoft/act";

// `app`, `target`, and `Counter` are assumed to be defined by the test setup.
describe("Counter", () => {
  beforeEach(async () => {
    // Clear all data between tests (seed() is a no-op for the in-memory store)
    await store().drop();
  });

  it("increments", async () => {
    await app.do("increment", target, { by: 5 });
    const snapshot = await app.load(Counter, "counter-1");
    expect(snapshot.state.count).toBe(5);
  });
});
Explicit instantiation
import { InMemoryStore } from "@rotorsoft/act";

const testStore = new InMemoryStore();
await testStore.seed();

// Use for specific test scenarios.
// `events` and `meta` are assumed to be prepared by the test.
await testStore.commit("test-stream", events, meta);
Querying events
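import { store } from "@rotorsoft/act";

// Collect every event on the stream through the query callback
const events: any[] = [];
await store().query(
  (event) => events.push(event),
  { stream: "test-stream" }
);
console.log(`Found ${events.length} events`);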
See
Implements
Constructors
Constructor
new InMemoryStore(): InMemoryStore
Returns
InMemoryStore
Methods
ack()
ack(leases): Promise<object[]>
Defined in: libs/act/src/adapters/in-memory-store.ts:601
Acknowledge completion of processing for leased streams.
Parameters
leases
Lease[]
Leases to acknowledge, including last processed watermark and lease holder.
Returns
Promise<object[]>
Implementation of
block()
block(leases): Promise<object[]>
Defined in: libs/act/src/adapters/in-memory-store.ts:613
Block a stream for processing after failing to process and reaching max retries with blocking enabled.
Parameters
leases
Leases to block, including lease holder and last error message.
Returns
Promise<object[]>
Blocked leases.
Implementation of
claim()
claim(lagging, leading, by, millis, lane?): Promise<Lease[]>
Defined in: libs/act/src/adapters/in-memory-store.ts:484
Atomically discovers and leases streams for processing. Fuses poll + lease into a single operation.
Parameters
lagging
number
Max streams from lagging frontier.
leading
number
Max streams from leading frontier.
by
string
Lease holder identifier.
millis
number
Lease duration in milliseconds.
lane?
string
Returns
Promise<Lease[]>
Granted leases.
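To make the "poll + lease in one step" idea concrete, here is a minimal synchronous sketch. It is a toy model, not the adapter's code: `StreamState`, `ToyLease`, and `toyClaim` are hypothetical names, and the sketch assumes that the lagging frontier means the lowest watermarks, the leading frontier means the highest, and that blocked or currently leased streams are skipped. The `lane` parameter is left out.

```typescript
// Toy model only: field names (at, leasedUntil, blocked) are assumptions.
interface StreamState {
  stream: string;
  at: number; // watermark: id of the last acknowledged event
  blocked: boolean;
  leasedBy?: string;
  leasedUntil?: number; // epoch millis
}

interface ToyLease {
  stream: string;
  by: string;
  at: number;
  until: number;
}

function toyClaim(
  states: StreamState[],
  lagging: number,
  leading: number,
  by: string,
  millis: number,
  now: number = Date.now()
): ToyLease[] {
  // Poll: streams that are not blocked and have no live lease, lowest watermark first.
  const available = states
    .filter((s) => !s.blocked && (s.leasedUntil ?? 0) <= now)
    .sort((a, b) => a.at - b.at);

  // Take from both frontiers without picking the same stream twice.
  const fromLagging = available.slice(0, Math.max(0, lagging));
  const rest = available.slice(fromLagging.length);
  const fromLeading = leading > 0 ? rest.slice(-leading) : [];

  // Lease: mark and return in the same synchronous step.
  return [...fromLagging, ...fromLeading].map((s) => {
    const until = now + millis;
    s.leasedBy = by;
    s.leasedUntil = until;
    return { stream: s.stream, by, at: s.at, until };
  });
}

const states: StreamState[] = [
  { stream: "a", at: 1, blocked: false },
  { stream: "b", at: 5, blocked: false },
  { stream: "c", at: 9, blocked: false },
  { stream: "d", at: 0, blocked: true },
];

const firstClaim = toyClaim(states, 1, 1, "worker-1", 1000, 0);
const secondClaim = toyClaim(states, 1, 1, "worker-2", 1000, 500);
console.log(firstClaim.map((l) => l.stream)); // ["a", "c"]
console.log(secondClaim.map((l) => l.stream)); // ["b"]
```

Because selection and marking happen in one function call, a second caller never sees a stream that was discovered but not yet leased, which is the property the fused operation is meant to provide.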
Implementation of
commit()
commit<E>(stream, msgs, meta, expectedVersion?): Promise<Committed<E, keyof E>[]>
Defined in: libs/act/src/adapters/in-memory-store.ts:417
Commit one or more events to a stream.
Type Parameters
E
E extends Schemas
Parameters
stream
string
The stream name.
msgs
Message<E, keyof E>[]
The events/messages to commit.
meta
Event metadata.
expectedVersion?
number
Optional optimistic concurrency check.
Returns
Promise<Committed<E, keyof E>[]>
The committed events with metadata.
Throws
ConcurrencyError if expectedVersion does not match.
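The optimistic concurrency check can be pictured with a small synchronous model. Everything here is illustrative: `toyCommit`, `ToyEvent`, and `ToyConcurrencyError` are hypothetical names, event metadata is omitted, and the sketch assumes that a stream's version is the zero-based position of its last event (-1 when the stream is empty).

```typescript
class ToyConcurrencyError extends Error {
  readonly expected: number;
  readonly actual: number;
  constructor(expected: number, actual: number) {
    super(`expected version ${expected}, found ${actual}`);
    this.name = "ToyConcurrencyError";
    this.expected = expected;
    this.actual = actual;
  }
}

interface ToyEvent {
  id: number;
  stream: string;
  version: number;
  name: string;
  data: unknown;
}

const log: ToyEvent[] = [];

function toyCommit(
  stream: string,
  msgs: { name: string; data: unknown }[],
  expectedVersion?: number
): ToyEvent[] {
  // Assumption: current version is the index of the stream's last event, -1 when empty.
  const current = log.filter((e) => e.stream === stream).length - 1;
  if (expectedVersion !== undefined && expectedVersion !== current) {
    throw new ToyConcurrencyError(expectedVersion, current);
  }
  const committed = msgs.map((m, i) => ({
    id: log.length + i, // global id follows the log length
    stream,
    version: current + 1 + i,
    name: m.name,
    data: m.data,
  }));
  log.push(...committed);
  return committed;
}

toyCommit("counter-1", [{ name: "Incremented", data: { by: 5 } }]); // becomes version 0
toyCommit("counter-1", [{ name: "Incremented", data: { by: 1 } }], 0); // matches, becomes version 1

let staleWriteRejected = false;
try {
  // A writer that still believes the stream is at version 0 is rejected.
  toyCommit("counter-1", [{ name: "Incremented", data: { by: 2 } }], 0);
} catch (err) {
  staleWriteRejected = err instanceof ToyConcurrencyError;
}
console.log(staleWriteRejected); // true
```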
Implementation of
dispose()
dispose(): Promise<void>
Defined in: libs/act/src/adapters/in-memory-store.ts:327
Dispose of the store and clear all events.
Returns
Promise<void>
Promise that resolves when disposal is complete.
Implementation of
Store.dispose
drop()
drop(): Promise<void>
Defined in: libs/act/src/adapters/in-memory-store.ts:344
Drop all data from the store.
Returns
Promise<void>
Promise that resolves when the store is cleared.
Implementation of
forget_pii()
forget_pii(stream): Promise<number>
Defined in: libs/act/src/adapters/in-memory-store.ts:712
Wipe the sensitive-data payload for every event on the stream (see
Store.forget_pii). This is an O(1) drop of the stream's inner Map; the
size of that Map is the count of events that had PII. Idempotent: a
second call finds no inner Map and returns 0.
Parameters
stream
string
Target stream.
Returns
Promise<number>
Count of events whose isolated PII payload was deleted.
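The nested-Map layout described above can be sketched as follows. This is a simplified synchronous model under assumptions: the variable `pii`, the function `toyForgetPii`, and the payload shape are hypothetical, chosen only to show why the drop is a single delete and why a repeat call returns 0.

```typescript
// stream name -> (event id -> isolated PII payload). Names are illustrative.
const pii = new Map<string, Map<number, Record<string, unknown>>>();

function toyForgetPii(stream: string): number {
  const inner = pii.get(stream);
  if (!inner) return 0; // nothing stored, or already forgotten
  pii.delete(stream); // one Map delete, independent of how many events had PII
  return inner.size;
}

// Two events on the stream carried PII.
const customerPii = new Map<number, Record<string, unknown>>();
customerPii.set(0, { email: "a@example.com" });
customerPii.set(3, { phone: "555-0100" });
pii.set("customer-42", customerPii);

const firstCall = toyForgetPii("customer-42");
const secondCall = toyForgetPii("customer-42");
console.log(firstCall, secondCall); // 2 0
```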
Implementation of
prioritize()
prioritize(filter, priority): Promise<number>
Defined in: libs/act/src/adapters/in-memory-store.ts:748
Bulk-update priority of streams matching filter. Mirrors
query_streams's filter semantics (see Store.prioritize).
Unlike subscribe (which keeps max() of registered
priorities), this sets the priority outright: an operator override
for the build-time scheduling policy.
Parameters
filter
priority
number
Returns
Promise<number>
Count of streams whose priority changed.
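The contrast between the two update rules is easy to see side by side. The sketch below is a toy model: `toySubscribe` and `toyPrioritize` are hypothetical names, and a plain `RegExp` stands in for the real filter type, whose exact shape is not shown in this section.

```typescript
const priorities = new Map<string, number>(); // stream name -> priority

// Subscribe rule: repeated registrations keep the maximum priority.
function toySubscribe(stream: string, priority: number = 0): void {
  const current = priorities.get(stream);
  priorities.set(stream, current === undefined ? priority : Math.max(current, priority));
}

// Prioritize rule: overwrite outright and count the streams that changed.
function toyPrioritize(pattern: RegExp, priority: number): number {
  let changed = 0;
  priorities.forEach((current, stream) => {
    if (pattern.test(stream) && current !== priority) {
      priorities.set(stream, priority);
      changed++;
    }
  });
  return changed;
}

toySubscribe("orders-a", 1);
toySubscribe("orders-a", 5);
toySubscribe("orders-a", 2); // max wins, stays at 5
toySubscribe("orders-b", 5);
toySubscribe("billing", 3);
const afterSubscribe = priorities.get("orders-a");

const changed = toyPrioritize(/^orders-/, 0); // operator override, lowers both
console.log(afterSubscribe, changed, priorities.get("orders-a")); // 5 2 0
```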
Implementation of
query()
query<E>(callback, query?): Promise<number>
Defined in: libs/act/src/adapters/in-memory-store.ts:369
Query events in the store, optionally filtered by query options.
Type Parameters
E
E extends Schemas
Parameters
callback
(event) => void
Function to call for each event.
query?
Readonly<{ after?: number; backward?: boolean; before?: number; correlation?: string; created_after?: Date; created_before?: Date; limit?: number; names?: string[]; stream?: string; stream_exact?: boolean; with_snaps?: boolean; }>
Optional query options.
Returns
Promise<number>
The number of events processed.
Implementation of
query_stats()
query_stats<E>(input, options?): Promise<Map<string, StreamStats<E>>>
Defined in: libs/act/src/adapters/in-memory-store.ts:844
Per-stream aggregated stats (see Store.query_stats).
Single forward scan over the in-memory event list, accumulating per stream. The "cheap heads" cost tier from durable adapters doesn't apply here (InMemory has no indexes); correctness is the goal, and performance is a non-issue.
Scope rules:
- Array input: explicit stream names, regardless of subscription.
- Filter input: stream/stream_exact match against event-bearing stream names; source/source_exact/blocked require a corresponding subscription in _streams (those are subscription concepts, not event concepts). Empty filter {} matches every event-bearing stream.
Type Parameters
E
E extends Schemas
Parameters
input
string[] | Pick<StreamFilter, "stream" | "stream_exact">
options?
Returns
Promise<Map<string, StreamStats<E>>>
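A single forward scan that accumulates per stream might look like the sketch below. It covers only the array-input case and is written under assumptions: `ToyStats` and its fields (`count`, `firstId`, `lastId`) are hypothetical and do not reflect the real `StreamStats` shape.

```typescript
interface ToyEvent {
  id: number;
  stream: string;
  name: string;
}

// Hypothetical stats shape for illustration only.
interface ToyStats {
  count: number;
  firstId: number;
  lastId: number;
}

function toyQueryStats(events: ToyEvent[], streams: string[]): Map<string, ToyStats> {
  const wanted = new Set(streams);
  const stats = new Map<string, ToyStats>();
  // One forward pass over the event list, no indexes involved.
  for (const e of events) {
    if (!wanted.has(e.stream)) continue;
    const s = stats.get(e.stream);
    if (s) {
      s.count++;
      s.lastId = e.id;
    } else {
      stats.set(e.stream, { count: 1, firstId: e.id, lastId: e.id });
    }
  }
  return stats;
}

const sampleEvents: ToyEvent[] = [
  { id: 0, stream: "a", name: "Opened" },
  { id: 1, stream: "b", name: "Opened" },
  { id: 2, stream: "a", name: "Closed" },
  { id: 3, stream: "c", name: "Opened" },
];

const stats = toyQueryStats(sampleEvents, ["a", "b", "missing"]);
console.log(stats.get("a")); // { count: 2, firstId: 0, lastId: 2 }
```

In this toy model a requested stream with no events simply has no entry in the result; whether the real adapter behaves the same way is not stated in this section.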
Implementation of
query_streams()
query_streams(callback, query?): Promise<QueryStreamsResult>
Defined in: libs/act/src/adapters/in-memory-store.ts:767
Streams registered subscription positions to the callback, ordered by stream name. Returns the highest event id in the store and the count of positions emitted.
Parameters
callback
(position) => void
query?
Returns
Promise<QueryStreamsResult>
Implementation of
reset()
reset(input): Promise<number>
Defined in: libs/act/src/adapters/in-memory-store.ts:669
Reset watermarks to -1, clearing retry, blocked, error, and lease state so the matched streams can be replayed from the beginning. Accepts either an explicit list of names or a StreamFilter.
Parameters
input
string[] | StreamFilter
Stream names or a filter selecting the streams to reset.
Returns
Promise<number>
Count of streams that were actually reset.
Implementation of
restore()
restore(driver): Promise<void>
Defined in: libs/act/src/adapters/in-memory-store.ts:996
Atomically wipe-and-rebuild the store under an in-process snapshot.
Captures every index state up front, clears it, then hands the
orchestrator a per-event insert callback via the driver. Any
throw inside the driver restores the snapshot, leaving the store
byte-for-byte unchanged from the operator's perspective.
Event ids are reassigned 0..N-1 as events arrive (matching the
adapter's commit-id convention: InMemory uses _events.length).
The created timestamp is preserved verbatim from the source.
Parameters
driver
(callback) => Promise<void>
Returns
Promise<void>
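The snapshot-then-rollback pattern described above can be sketched in a few lines. This is a simplified synchronous model, whereas the real method is asynchronous and tracks more state than a single array; `ToyStore`, `ToyEvent`, and `Insert` are hypothetical names.

```typescript
interface ToyEvent {
  id: number;
  stream: string;
  name: string;
  created: Date;
}

type Insert = (event: Omit<ToyEvent, "id">) => void;

class ToyStore {
  events: ToyEvent[] = [];

  restore(driver: (insert: Insert) => void): void {
    const snapshot = this.events; // capture the current state
    this.events = []; // wipe
    try {
      driver((event) => {
        // ids follow insertion order (0..N-1); created is kept as given
        this.events.push({ ...event, id: this.events.length });
      });
    } catch (err) {
      this.events = snapshot; // roll back on any failure
      throw err;
    }
  }
}

const toyStore = new ToyStore();
toyStore.restore((insert) => {
  insert({ stream: "a", name: "Opened", created: new Date(0) });
  insert({ stream: "a", name: "Closed", created: new Date(1000) });
});

let rolledBack = false;
try {
  toyStore.restore((insert) => {
    insert({ stream: "b", name: "Opened", created: new Date(2000) });
    throw new Error("source went away mid-restore");
  });
} catch {
  // The failed restore leaves the two events from the first restore in place.
  rolledBack = toyStore.events.length === 2 && toyStore.events[0]?.stream === "a";
}
console.log(rolledBack); // true
```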
Implementation of
seed()
seed(): Promise<void>
Defined in: libs/act/src/adapters/in-memory-store.ts:336
Seed the store with initial data (no-op for in-memory).
Returns
Promise<void>
Promise that resolves when seeding is complete.
Implementation of
subscribe()
subscribe(streams): Promise<{ subscribed: number; watermark: number; }>
Defined in: libs/act/src/adapters/in-memory-store.ts:562
Registers streams for event processing. When the same stream is resubscribed with a different priority, the maximum wins, so the highest-priority registered reaction sets the scheduling lane. Use prioritize for operator runtime overrides.
Parameters
streams
object[]
Streams to register with optional source + priority.
Returns
Promise<{ subscribed: number; watermark: number; }>
subscribed count and current max watermark.
Implementation of
truncate()
truncate(targets): Promise<Map<string, { committed: Committed<Schemas, string>; deleted: number; }>>
Defined in: libs/act/src/adapters/in-memory-store.ts:929
Atomically truncates streams and seeds each with a snapshot or tombstone.
Parameters
targets
object[]
Streams to truncate with optional snapshot state and meta.
Returns
Promise<Map<string, { committed: Committed<Schemas, string>; deleted: number; }>>
Map keyed by stream name, each entry with deleted count and committed event.
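The shape of the result is easier to follow with a toy model. The sketch below is synchronous and makes several assumptions: the event names "toy-snapshot" and "toy-tombstone" are placeholders rather than the library's names, ids come from a simple counter, and meta and versions are omitted.

```typescript
interface ToyEvent {
  id: number;
  stream: string;
  name: string;
  data: unknown;
}

interface ToyTarget {
  stream: string;
  snapshot?: unknown; // when absent, a tombstone is written instead
}

let events: ToyEvent[] = [
  { id: 0, stream: "cart-1", name: "ItemAdded", data: { sku: "x" } },
  { id: 1, stream: "cart-1", name: "ItemAdded", data: { sku: "y" } },
  { id: 2, stream: "cart-2", name: "ItemAdded", data: { sku: "z" } },
];
let nextId = 3; // assumption: ids keep increasing after a truncate

function toyTruncate(targets: ToyTarget[]): Map<string, { deleted: number; committed: ToyEvent }> {
  const result = new Map<string, { deleted: number; committed: ToyEvent }>();
  for (const t of targets) {
    // Remove every event on the stream and remember how many were removed.
    const before = events.length;
    events = events.filter((e) => e.stream !== t.stream);
    const deleted = before - events.length;

    // Seed the emptied stream with a single marker event.
    const committed: ToyEvent = {
      id: nextId++,
      stream: t.stream,
      name: t.snapshot !== undefined ? "toy-snapshot" : "toy-tombstone",
      data: t.snapshot ?? null,
    };
    events.push(committed);
    result.set(t.stream, { deleted, committed });
  }
  return result;
}

const truncated = toyTruncate([
  { stream: "cart-1", snapshot: { items: 2 } },
  { stream: "cart-2" },
]);
console.log(truncated.get("cart-1")?.deleted, truncated.get("cart-2")?.committed.name); // 2 "toy-tombstone"
```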
Implementation of
unblock()
unblock(input): Promise<number>
Defined in: libs/act/src/adapters/in-memory-store.ts:719
Clears the blocked flag on streams without replaying their history.
Sets blocked = false, retry_count = 0, error = null, and
clears any lease bookkeeping. The at watermark stays where it
was: the stream resumes from the next event after the last
successful ack, not from zero.
The distinction from reset matters: reset() is for
projection rebuilds (replay from event 0); unblock() is for
recovering from a poison message after the operator fixes the
underlying issue. Use unblock() when you don't want to re-process
history.
Prefer Act.unblock() over calling this directly. Like
reset(), this primitive doesn't raise the orchestrator's internal
"needs drain" flag, so a settled Act instance will short-circuit and
skip the resume. Act.unblock() wraps this and arms the flag.
Only streams that were actually blocked at call time count toward
the return value; already-unblocked streams and unknown stream
names are silently skipped. The atomic single-statement update
makes the call safe to issue concurrently with claim(): workers
holding a FOR UPDATE SKIP LOCKED lock won't see partial state.
Accepts either an explicit list of stream names or a
StreamFilter for bulk recovery (e.g., "unblock every
blocked order projection"). The blocked = true predicate is
always applied, so passing blocked: false in the filter matches
nothing. An empty filter ({}) means "unblock everything that's
blocked," which is a sane post-incident bulk recovery.
Parameters
input
string[] | StreamFilter
Stream names or a StreamFilter.
Returns
Promise<number>
Count of streams that were actually flipped (were blocked).
Example
// By name (single targeted recovery)
await app.unblock(["webhooks-out-customer-42"]);

// By filter: unblock every blocked stream in a family
await app.unblock({ stream: "^webhooks-out-" });

// Post-incident: unblock everything that's blocked
await app.unblock({});

// Low-level (does NOT trigger resume on settled apps)
await store().unblock(["webhooks-out-customer-42"]);
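The difference between unblock and reset comes down to what happens to the watermark. The following toy model uses the field names mentioned in the description (at, blocked, retry_count, error); `ToySubscription`, `toyUnblock`, and `toyReset` are hypothetical names, only the name-list input is modeled, and lease bookkeeping is omitted.

```typescript
interface ToySubscription {
  stream: string;
  at: number; // watermark of the last successful ack
  blocked: boolean;
  retry_count: number;
  error: string | null;
}

// Clears the blocked state but leaves the watermark alone.
function toyUnblock(subs: ToySubscription[], names: string[]): number {
  let flipped = 0;
  for (const s of subs) {
    if (!names.includes(s.stream) || !s.blocked) continue; // skip unknown or not blocked
    s.blocked = false;
    s.retry_count = 0;
    s.error = null;
    flipped++;
  }
  return flipped;
}

// Clears the same state and also rewinds the watermark for a full replay.
function toyReset(subs: ToySubscription[], names: string[]): void {
  for (const s of subs) {
    if (!names.includes(s.stream)) continue;
    s.at = -1;
    s.blocked = false;
    s.retry_count = 0;
    s.error = null;
  }
}

const subs: ToySubscription[] = [
  { stream: "webhooks-out-customer-42", at: 7, blocked: true, retry_count: 3, error: "boom" },
  { stream: "webhooks-out-customer-43", at: 4, blocked: false, retry_count: 0, error: null },
];

const flipped = toyUnblock(subs, ["webhooks-out-customer-42", "webhooks-out-customer-43", "missing"]);
const watermarkAfterUnblock = subs[0]?.at;
toyReset(subs, ["webhooks-out-customer-43"]);
console.log(flipped, watermarkAfterUnblock, subs[1]?.at); // 1 7 -1
```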
See
- Act.unblock for the high-level recovery API
- reset for the rebuild-from-zero alternative