Class: SqliteStore

Defined in: libs/act-sqlite/src/sqlite-store.ts:84

SQLite event store adapter for @rotorsoft/act.

Provides persistent event storage using SQLite via @libsql/client. All write operations use transactions for ACID guarantees. Since SQLite serializes writes at the database level, the concurrency model is equivalent to PostgreSQL's FOR UPDATE SKIP LOCKED for single-server deployments.

Store.notify is intentionally not implemented. The notify hook is a cross-process wake-up signal that lets a horizontally-scaled Act deployment wake settle() immediately on remote commits. SQLite is single-node by design (there is no remote writer to be notified of), so the Act orchestrator falls back to the existing debounce/poll path, which is correct for this topology.

Example

import { store } from "@rotorsoft/act";
import { SqliteStore } from "@rotorsoft/act-sqlite";

store(new SqliteStore({ url: "file:myapp.db" }));
await store().seed();

Implements

Store

Constructors

Constructor

new SqliteStore(config?): SqliteStore

Defined in: libs/act-sqlite/src/sqlite-store.ts:87

Parameters

config?

Partial<SqliteConfig> = {}

Returns

SqliteStore

Methods

ack()

ack(leases): Promise<Lease[]>

Defined in: libs/act-sqlite/src/sqlite-store.ts:454

Acknowledges successful processing of leased streams.

Updates the watermark to indicate events have been processed successfully. Releases the lease so other workers can process subsequent events.
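The watermark-and-release behavior can be pictured with a small in-memory model. This is an illustrative sketch only, not the adapter's code: the `Row` and `LeaseLike` shapes, the `ackModel` function, and the rule that only the current lease holder may acknowledge are all assumptions made for the example.

```typescript
// Hypothetical in-memory model of ack semantics; not the SqliteStore implementation.
type Row = { at: number; leasedBy?: string; leasedUntil?: number };
type LeaseLike = { stream: string; by: string; at: number };

function ackModel(rows: Map<string, Row>, leases: LeaseLike[]): LeaseLike[] {
  const acked: LeaseLike[] = [];
  for (const lease of leases) {
    const row = rows.get(lease.stream);
    // Assumption: only the current lease holder may acknowledge.
    if (!row || row.leasedBy !== lease.by) continue;
    row.at = lease.at; // advance the watermark
    delete row.leasedBy; // release the lease for other workers
    delete row.leasedUntil;
    acked.push(lease);
  }
  return acked;
}

const rows = new Map<string, Row>([
  ["stats-user-1", { at: 100, leasedBy: "worker-a", leasedUntil: 10_000 }],
]);
const ackedLeases = ackModel(rows, [
  { stream: "stats-user-1", by: "worker-a", at: 150 },
  { stream: "stats-user-1", by: "worker-b", at: 999 }, // not the holder, skipped
]);
```

In this model the second lease is ignored because the first acknowledgement already released the stream.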

Parameters

leases

Lease[]

Leases to acknowledge with updated watermarks

Returns

Promise<Lease[]>

Acknowledged leases

Example

import { randomUUID } from "node:crypto";

const leased = await store().claim(5, 5, randomUUID(), 10000);
// Process events up to ID 150
await store().ack(leased.map((l) => ({ ...l, at: 150 })));

See

claim for acquiring leases

Implementation of

Store.ack


block()

block(leases): Promise<BlockedLease[]>

Defined in: libs/act-sqlite/src/sqlite-store.ts:475

Blocks streams after persistent processing failures.

Blocked streams won't be returned by claim until manually unblocked. This prevents poison messages from repeatedly failing and consuming resources.

Streams are typically blocked when:

  • Max retries reached
  • blockOnError option is true
  • Handler throws an error

Parameters

leases

BlockedLease[]

Leases to block with error messages

Returns

Promise<BlockedLease[]>

Blocked leases

Example

try {
  await processEvents(lease);
  await store().ack([lease]);
} catch (error) {
  // Block only after repeated failures
  if (lease.retry >= 3) {
    await store().block([
      {
        ...lease,
        error: error instanceof Error ? error.message : String(error),
      },
    ]);
  }
}

See

claim for lease management

Implementation of

Store.block


claim()

claim(lagging, leading, by, millis, lane?): Promise<Lease[]>

Defined in: libs/act-sqlite/src/sqlite-store.ts:355

Atomically discovers and leases streams for reaction processing.

Discovery and lease acquisition happen in one round-trip, which eliminates the race that exists when they are separate calls (a competing worker can grab the stream between the two).

PostgresStore uses FOR UPDATE SKIP LOCKED for zero-contention competing consumer semantics: workers never block each other, each grabbing different streams atomically. InMemoryStore fuses its poll+lease logic equivalently. SqliteStore relies on SQLite's database-level write serialization for the same effect in single-server deployments.

Used by Act.drain() as the primary stream acquisition method.
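The dual-frontier selection can be pictured with a small in-memory model. This is a sketch under stated assumptions, not the adapter's SQL: the `StreamRow` shape, the `claimModel` function, the explicit `now` argument, and the eligibility rule (not blocked, no unexpired lease) are hypothetical.

```typescript
// Hypothetical in-memory model of dual-frontier claiming; not the adapter's SQL.
type StreamRow = { stream: string; at: number; blocked: boolean; leasedBy?: string; leasedUntil: number };
type Claimed = { stream: string; at: number; by: string; lagging: boolean };

function claimModel(
  rows: StreamRow[],
  lagging: number,
  leading: number,
  by: string,
  millis: number,
  now: number
): Claimed[] {
  // Assumption: blocked streams and streams with an unexpired lease are not eligible.
  const free = rows.filter((r) => !r.blocked && r.leasedUntil <= now);
  // Lagging frontier: lowest watermarks first.
  const lag = [...free].sort((a, b) => a.at - b.at).slice(0, lagging);
  // Leading frontier: highest watermarks first, excluding streams already picked.
  const lead = [...free]
    .sort((a, b) => b.at - a.at)
    .filter((r) => lag.indexOf(r) === -1)
    .slice(0, leading);
  const take = (picked: StreamRow[], isLagging: boolean): Claimed[] =>
    picked.map((r) => {
      r.leasedBy = by; // discovery and leasing happen in the same step
      r.leasedUntil = now + millis;
      return { stream: r.stream, at: r.at, by, lagging: isLagging };
    });
  return [...take(lag, true), ...take(lead, false)];
}

const table: StreamRow[] = [
  { stream: "a", at: 5, blocked: false, leasedUntil: 0 },
  { stream: "b", at: 50, blocked: false, leasedUntil: 0 },
  { stream: "c", at: 90, blocked: false, leasedUntil: 0 },
  { stream: "d", at: 70, blocked: true, leasedUntil: 0 },
];
const firstClaim = claimModel(table, 1, 1, "worker-1", 10_000, 0);
const secondClaim = claimModel(table, 1, 1, "worker-2", 10_000, 0);
```

In this model the second worker only sees stream "b", because "a" and "c" are already leased and "d" is blocked.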

Parameters

lagging

number

Max streams from the lagging frontier (ascending watermark)

leading

number

Max streams from the leading frontier (descending watermark)

by

string

Unique lease holder identifier (UUID)

millis

number

Lease duration in milliseconds

lane?

string

Optional lane filter (ACT-1103)

Returns

Promise<Lease[]>

Array of successfully leased streams with metadata

Example

import { randomUUID } from "node:crypto";

const leased = await store().claim(5, 5, randomUUID(), 10000);
leased.forEach(({ stream, at, lagging }) => {
  console.log(`Leased ${stream} at ${at} (lagging: ${lagging})`);
});

See

  • subscribe for registering new streams (used by correlate)
  • ack for acknowledging completion
  • block for blocking failed streams

Implementation of

Store.claim


commit()

commit<E>(stream, msgs, meta, expectedVersion?): Promise<Committed<E, keyof E>[]>

Defined in: libs/act-sqlite/src/sqlite-store.ts:167

Commits one or more events to a stream atomically.

This is the core method for persisting events. It must:

  • Assign global sequence IDs to events
  • Increment the stream version
  • Check optimistic concurrency if expectedVersion is provided
  • Store events atomically (all or nothing)
  • Attach metadata (id, stream, version, created timestamp)
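The responsibilities listed above can be illustrated with a minimal in-memory model. It is a sketch only, not the adapter's transaction code: `commitModel`, `ConcurrencyConflict`, and the `log` array are hypothetical names, and the model assumes a stream's version equals the number of events it already holds (so a new stream is at version 0); the real adapter's numbering may differ.

```typescript
// Hypothetical in-memory model of an optimistic-concurrency commit; names are assumptions.
type Msg = { name: string; data: unknown };
type CommittedModel = Msg & { id: number; stream: string; version: number; created: Date };

class ConcurrencyConflict extends Error {}

const log: CommittedModel[] = [];

function commitModel(stream: string, msgs: Msg[], expectedVersion?: number): CommittedModel[] {
  // Assumption: the current version is the number of events already in the stream.
  const current = log.filter((e) => e.stream === stream).length;
  if (expectedVersion !== undefined && expectedVersion !== current) {
    throw new ConcurrencyConflict(`expected version ${expectedVersion}, found ${current}`);
  }
  // Build every row first, then append in one step so the commit is all-or-nothing.
  const committed = msgs.map((m, i) => ({
    ...m,
    id: log.length + i + 1, // global sequence id
    stream,
    version: current + i, // per-stream version
    created: new Date(),
  }));
  log.push(...committed);
  return committed;
}

const first = commitModel("user-123", [{ name: "UserCreated", data: { email: "user@example.com" } }], 0);
let conflict = false;
try {
  // A second writer that still expects version 0 loses the race.
  commitModel("user-123", [{ name: "UserRenamed", data: {} }], 0);
} catch {
  conflict = true;
}
```

The failed commit leaves the log untouched, which is the all-or-nothing property the list describes.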

Type Parameters

E

E extends Schemas

Event schemas

Parameters

stream

string

The stream ID to commit to

msgs

Message<E, keyof E>[]

Array of messages (events) to commit

meta

EventMeta

Event metadata (correlation, causation)

expectedVersion?

number

Expected current version for optimistic concurrency

Returns

Promise<Committed<E, keyof E>[]>

Array of committed events with full metadata

Throws

If expectedVersion doesn't match current version

Example

const events = await store().commit(
  "user-123",
  [{ name: "UserCreated", data: { email: "user@example.com" } }],
  { correlation: "req-456", causation: { action: {...} } },
  0 // Expect version 0 (new stream)
);

Implementation of

Store.commit


dispose()

dispose(): Promise<void>

Defined in: libs/act-sqlite/src/sqlite-store.ts:161

Returns

Promise<void>

Implementation of

Store.dispose


drop()

drop(): Promise<void>

Defined in: libs/act-sqlite/src/sqlite-store.ts:156

Drops all data from the store.

Dangerous operation that deletes all events and state. Use with extreme caution, primarily for testing or development environments.

Returns

Promise<void>

Example

// Clean up after tests
afterAll(async () => {
  await store().drop();
});

Implementation of

Store.drop


prioritize()

prioritize(filter, priority): Promise<number>

Defined in: libs/act-sqlite/src/sqlite-store.ts:949

Bulk-update the scheduling priority of streams matching a filter.

Used by Act.prioritize for operator runtime control over lagging-frontier claim() ordering. Unlike subscribe, which keeps the per-stream priority at the max() of all registered reactions targeting that stream, prioritize sets the priority directly to priority for matching rows, letting operators override the build-time scheduling policy.

Filter semantics mirror query_streams: stream/source are regex by default, exact with the *_exact flags. blocked restricts to blocked or unblocked rows. Omitted fields don't filter. An empty filter ({}) updates every registered stream, which is useful for "reset all priorities to N" but a footgun otherwise.
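The filter semantics can be sketched as a small matcher over in-memory rows. This is illustrative only: `FilterModel`, `RowModel`, `matches`, and `prioritizeModel` are hypothetical, the `source_exact` flag is inferred from the "*_exact flags" wording, and counting only rows whose value actually changes is an assumption.

```typescript
// Hypothetical matcher mirroring the described filter semantics; not the adapter's SQL.
type FilterModel = {
  stream?: string;
  stream_exact?: boolean;
  source?: string;
  source_exact?: boolean;
  blocked?: boolean;
};
type RowModel = { stream: string; source?: string; blocked: boolean; priority: number };

function matches(row: RowModel, f: FilterModel): boolean {
  // Omitted fields do not filter; present fields are regex unless the exact flag is set.
  const test = (value: string | undefined, pattern: string | undefined, exact: boolean | undefined): boolean =>
    pattern === undefined ||
    (value !== undefined && (exact ? value === pattern : new RegExp(pattern).test(value)));
  return (
    test(row.stream, f.stream, f.stream_exact) &&
    test(row.source, f.source, f.source_exact) &&
    (f.blocked === undefined || row.blocked === f.blocked)
  );
}

function prioritizeModel(rows: RowModel[], f: FilterModel, priority: number): number {
  let changed = 0;
  for (const row of rows) {
    if (matches(row, f) && row.priority !== priority) {
      row.priority = priority; // set as-is: no max(), no clamp
      changed++;
    }
  }
  return changed;
}

const streams: RowModel[] = [
  { stream: "projection-orders", blocked: false, priority: 0 },
  { stream: "audit-log", source: "audit-orders", blocked: false, priority: 0 },
];
const byRegex = prioritizeModel(streams, { stream: "^projection-orders$" }, 10);
const byExactMiss = prioritizeModel(streams, { stream: "projection", stream_exact: true }, 7);
const everything = prioritizeModel(streams, {}, 3);
```

The last call shows why an empty filter deserves care: it rewrites the priority of every row in the model.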

Parameters

filter

StreamFilter

PrioritizeFilter selecting which streams to update. Required (use {} to target all).

priority

number

New priority value. Set as-is: no max(), no clamp.

Returns

Promise<number>

Count of streams whose priority was changed.

Examples

// Raise the priority of a single projection (regex match)
await store().prioritize(
  { stream: "^projection-orders$", stream_exact: false },
  10
);

// Lower the priority of every stream whose source starts with "audit-"
await store().prioritize({ source: "^audit-" }, -5);

See

  • Act.prioritize for the orchestrator-level wrapper
  • claim for how priority biases stream scheduling

Implementation of

Store.prioritize


query()

query<E>(callback, query?): Promise<number>

Defined in: libs/act-sqlite/src/sqlite-store.ts:231

Queries events from the store with optional filtering.

Calls the callback for each matching event. The callback approach allows processing large result sets without loading everything into memory.

Type Parameters

E

E extends Schemas

Event schemas

Parameters

callback

(event) => void

Function invoked for each matching event

query?

Readonly<{ after?: number; backward?: boolean; before?: number; correlation?: string; created_after?: Date; created_before?: Date; limit?: number; names?: string[]; stream?: string; stream_exact?: boolean; with_snaps?: boolean; }>

Optional filter criteria; see Query for fields (stream, stream_exact, names, after, before, created_after, created_before, correlation, backward, limit, with_snaps).

Returns

Promise<number>

Total number of events processed

Example

let count = 0;
await store().query(
  (event) => {
    console.log(event.name, event.data);
    count++;
  },
  { stream: "user-123" }
);
console.log(`Found ${count} events`);

Implementation of

Store.query


query_stats()

query_stats<E>(input, options?): Promise<Map<string, StreamStats<E>>>

Defined in: libs/act-sqlite/src/sqlite-store.ts:700

Per-stream aggregated stats; see Store.query_stats.

Two code paths (mirrors the PostgresStore strategy):

  • Heads-only path (no count, no names): one or two queries using ROW_NUMBER() OVER (PARTITION BY stream ORDER BY version DESC|ASC) (SQLite lacks PG's DISTINCT ON). Window function + WHERE rn = 1 materializes the head (or tail) per stream from the (stream, version) unique index. Parallel Promise.all when tail is requested.

  • Full-scan path (count or names set): one CTE materializes the filtered events, then GROUP BY stream, name → json_group_object(name, n) for the names map plus SUM(n) for count. Heads (and tails when requested) come from the same scan.

SQLite specifics:

  • data and meta are stored as TEXT (JSON-encoded); the reader JSON-parses them when materializing the Committed rows.
  • blocked is stored as 0/1 integer; the filter converts.
  • Array input expands to a placeholder list (IN (?, ?, ...)) since SQLite has no native array type.
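The placeholder expansion and the heads-only window query can be sketched as follows. This is an illustration under assumptions, not the adapter's actual SQL: the `inClause` helper is hypothetical, and the query assumes an `events` table with `stream` and `version` columns as implied by the description above.

```typescript
// Hypothetical helper: expand an array input into "IN (?, ?, ...)" with positional args.
function inClause(column: string, values: string[]): { sql: string; args: string[] } {
  // Assumption: an empty list should match nothing rather than produce invalid SQL.
  if (values.length === 0) return { sql: "0 = 1", args: [] };
  const placeholders = values.map(() => "?").join(", ");
  return { sql: `${column} IN (${placeholders})`, args: [...values] };
}

const clause = inClause("stream", ["user-1", "user-2", "user-3"]);

// Heads-only sketch: rank events per stream by version and keep the newest row.
// Switching DESC to ASC would select the tail (oldest event) instead.
const headsSql = `
SELECT * FROM (
  SELECT e.*, ROW_NUMBER() OVER (PARTITION BY stream ORDER BY version DESC) AS rn
  FROM events e
  WHERE ${clause.sql}
) WHERE rn = 1`;
```

Only column names and placeholders are interpolated into the SQL text; the stream names travel separately in `clause.args`, which keeps the statement parameterized.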

Type Parameters

E

E extends Schemas

Parameters

input

string[] | Pick<StreamFilter, "stream" | "stream_exact">

options?

QueryStatsOptions<E>

Returns

Promise<Map<string, StreamStats<E>>>

Implementation of

Store.query_stats


query_streams()

query_streams(callback, query?): Promise<QueryStreamsResult>

Defined in: libs/act-sqlite/src/sqlite-store.ts:607

Streams registered subscription positions to a callback, plus the highest event id in the store.

Read-only introspection for operational dashboards (Store / Subscriptions tab, projection lag, blocked subscriptions). Avoids forcing apps to open a second connection and run raw SQL against adapter-specific schemas.

Mirrors the Store.query callback pattern: the callback is invoked once per matching position, allowing large result sets to be processed without buffering. Results are ordered by stream name; use query.after (last seen stream name) for keyset pagination on big tables (dynamic reactions can produce one subscription per aggregate).

Parameters

callback

(position) => void

Invoked once per matching StreamPosition.

query?

QueryStreams

Optional QueryStreams filter (default limit: 100).

Returns

Promise<QueryStreamsResult>

maxEventId and the count of positions emitted.

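Examples

// Blocked subscriptions with their lag. Positions are collected first because
// maxEventId is only available once query_streams resolves.
const blocked: StreamPosition[] = [];
const { maxEventId } = await store().query_streams(
  (s) => blocked.push(s),
  { blocked: true, limit: 50 }
);
for (const s of blocked) {
  console.log(`${s.stream}: lag=${maxEventId - s.at} ${s.error}`);
}

// Keyset pagination over all subscriptions
let after: string | undefined;
for (;;) {
  const page: StreamPosition[] = [];
  const { count } = await store().query_streams(
    (s) => page.push(s),
    { after, limit: 100 }
  );
  if (!count) break;
  // ... use page ...
  after = page.at(-1)?.stream;
}

Implementation of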

Store.query_streams


reset()

reset(input): Promise<number>

Defined in: libs/act-sqlite/src/sqlite-store.ts:537

Resets watermarks for the given streams to -1, making them eligible for replay from the beginning. Also clears retry, blocked, error, and lease state so the streams can be claimed immediately.

Prefer Act.reset() over calling this directly. This primitive only resets the store; it does not raise the orchestrator's internal "needs drain" flag, so a settled Act instance will short-circuit and skip the replay. Act.reset() wraps this and arms the flag.

Accepts either an explicit list of stream names or a StreamFilter for bulk operations (e.g., "rebuild every blocked stream"). The filter form is the same shape used by prioritize and query_streams. An empty filter ({}) matches every registered stream, which is typically a footgun for reset; prefer narrower filters like { blocked: true }.

Parameters

input

string[] | StreamFilter

Stream names or a StreamFilter

Returns

Promise<number>

Count of streams that were actually reset

Example

// By name
await app.reset(["my-projection"]);

// By filter: rebuild every blocked stream in a projection family
await app.reset({ stream: "^proj-orders-", blocked: true });

// Low-level (does NOT trigger replay on settled apps)
await store().reset(["my-projection"]);

See

Act.reset for the high-level rebuild API that wraps this primitive and arms the orchestrator's drain flag

Implementation of

Store.reset


restore()

restore(driver): Promise<void>

Defined in: libs/act-sqlite/src/sqlite-store.ts:1041

Atomically wipe-and-rebuild the store inside a single libsql write transaction.

On any throw inside the driver the transaction rolls back and the store is byte-for-byte unchanged. DELETE FROM events + DELETE FROM streams wipe both tables; DELETE FROM sqlite_sequence WHERE name = 'events' resets the autoincrement counter so the new sequence is dense from 1. created is preserved verbatim from the source.

Parameters

driver

(callback) => Promise<void>

Returns

Promise<void>

Implementation of

Store.restore


seed()

seed(): Promise<void>

Defined in: libs/act-sqlite/src/sqlite-store.ts:95

Initializes or resets the store.

Used primarily for testing to ensure a clean state between tests. For production stores, this might create necessary tables or indexes.

Returns

Promise<void>

Example

// Reset store between tests
beforeEach(async () => {
  await store().seed();
});

Implementation of

Store.seed


subscribe()

subscribe(streams): Promise<{ subscribed: number; watermark: number; }>

Defined in: libs/act-sqlite/src/sqlite-store.ts:305

Registers streams for event processing.

Upserts stream entries so they become visible to claim. Used by correlate() to register dynamically discovered reaction target streams.

Also returns the current maximum watermark across all subscribed streams, used internally for correlation checkpoint initialization on cold start.
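The upsert and the returned pair can be modeled in memory. This is a sketch under assumptions, not the adapter's SQL: `subscribeModel`, `Sub`, and `SubRow` are hypothetical, new streams are assumed to start at watermark -1, and the max() priority rule is taken from the prioritize description.

```typescript
// Hypothetical in-memory model of subscribe's upsert; not the adapter's SQL.
type Sub = { stream: string; source?: string; priority?: number };
type SubRow = { source?: string; priority: number; at: number };

function subscribeModel(
  table: Map<string, SubRow>,
  streams: Sub[]
): { subscribed: number; watermark: number } {
  let subscribed = 0;
  for (const { stream, source, priority = 0 } of streams) {
    const row = table.get(stream);
    if (row) {
      // Existing stream: keep the highest priority registered so far.
      row.priority = Math.max(row.priority, priority);
    } else {
      // Assumption: a new stream starts before the first event.
      table.set(stream, { source, priority, at: -1 });
      subscribed++;
    }
  }
  // Watermark is the maximum `at` across all registered streams.
  let watermark = -1;
  table.forEach((row) => {
    watermark = Math.max(watermark, row.at);
  });
  return { subscribed, watermark };
}

const positions = new Map<string, SubRow>([
  ["stats-user-1", { source: "user-1", priority: 5, at: 42 }],
]);
const outcome = subscribeModel(positions, [
  { stream: "stats-user-1", source: "user-1", priority: 2 },
  { stream: "stats-user-2", source: "user-2", priority: 10 },
]);
```

In this model re-subscribing an existing stream never lowers its priority and never counts toward `subscribed`.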

Parameters

streams

object[]

Streams to register with optional source hint

Returns

Promise<{ subscribed: number; watermark: number; }>

subscribed is the count of newly registered streams; watermark is the maximum at across all streams

Example

const { subscribed, watermark } = await store().subscribe([
  { stream: "stats-user-1", source: "user-1" },
  { stream: "stats-user-2", source: "user-2", priority: 10 },
]);

See

  • claim for discovering and leasing registered streams
  • prioritize for changing priority after subscription

Implementation of

Store.subscribe


truncate()

truncate(targets): Promise<Map<string, { committed: Committed<Schemas, string>; deleted: number; }>>

Defined in: libs/act-sqlite/src/sqlite-store.ts:964

Atomically truncates streams and seeds each with a final event.

For each target, in a single transaction:

  1. Deletes all events for the stream
  2. Removes the stream's entry from the streams table
  3. Inserts a __snapshot__ (when snapshot is provided) or __tombstone__ event as the sole event on the stream
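The three steps can be traced with a small in-memory model. It is illustrative only and not the adapter's transaction code: `truncateModel`, `TEvent`, `Target`, and the `subscriptions` set standing in for the streams table are hypothetical.

```typescript
// Hypothetical in-memory model of truncate; not the adapter's transaction code.
type TEvent = { id: number; stream: string; name: string; data: unknown };
type Target = { stream: string; snapshot?: unknown };

function truncateModel(
  events: TEvent[],
  subscriptions: Set<string>,
  targets: Target[]
): Map<string, { deleted: number; committed: TEvent }> {
  const result = new Map<string, { deleted: number; committed: TEvent }>();
  let nextId = events.reduce((max, e) => Math.max(max, e.id), 0) + 1;
  for (const { stream, snapshot } of targets) {
    // 1. Delete all events for the stream (in place, so the caller sees the change).
    const kept = events.filter((e) => e.stream !== stream);
    const deleted = events.length - kept.length;
    events.length = 0;
    events.push(...kept);
    // 2. Remove the stream's entry from the model's streams table.
    subscriptions.delete(stream);
    // 3. Seed the stream with a single closing event.
    const committed: TEvent = {
      id: nextId++,
      stream,
      name: snapshot !== undefined ? "__snapshot__" : "__tombstone__",
      data: snapshot ?? {},
    };
    events.push(committed);
    result.set(stream, { deleted, committed });
  }
  return result;
}

const eventLog: TEvent[] = [
  { id: 1, stream: "a", name: "Opened", data: {} },
  { id: 2, stream: "a", name: "Changed", data: {} },
  { id: 3, stream: "b", name: "Opened", data: {} },
];
const registered = new Set<string>(["a", "b"]);
const truncated = truncateModel(eventLog, registered, [
  { stream: "a", snapshot: { total: 2 } },
  { stream: "b" },
]);
```

After the call each stream in the model holds exactly one event: a snapshot for "a" and a tombstone for "b".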

Parameters

targets

object[]

Streams to truncate with optional snapshot state and meta

Returns

Promise<Map<string, { committed: Committed<Schemas, string>; deleted: number; }>>

Map keyed by stream name, each entry with deleted count and committed event

See

Act.close for the high-level close-the-books API that orchestrates safety checks, archive callbacks, and atomic truncate+seed

Implementation of

Store.truncate


unblock()

unblock(input): Promise<number>

Defined in: libs/act-sqlite/src/sqlite-store.ts:570

Clears the blocked flag on streams without replaying their history. Sets blocked = false, retry_count = 0, error = null, and clears any lease bookkeeping. The at watermark stays where it was: the stream resumes from the next event after the last successful ack, not from zero.

The distinction from reset matters: reset() is for projection rebuilds (replay from event 0); unblock() is for recovering from a poison message after the operator fixes the underlying issue. Use unblock() when you don't want to re-process history.
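The difference between the two operations can be shown on a single in-memory position. This is a sketch, not the adapter's code: `Position`, `resetModel`, and `unblockModel` are hypothetical names; the field names and values follow the descriptions of reset and unblock.

```typescript
// Hypothetical in-memory model contrasting reset and unblock.
type Position = { at: number; blocked: boolean; retry_count: number; error: string | null };

function resetModel(p: Position): void {
  p.at = -1; // rewind the watermark so the stream replays from the beginning
  p.blocked = false;
  p.retry_count = 0;
  p.error = null;
}

function unblockModel(p: Position): boolean {
  if (!p.blocked) return false; // only streams that were blocked count
  p.blocked = false;
  p.retry_count = 0;
  p.error = null; // the watermark is deliberately left untouched
  return true;
}

const position: Position = { at: 120, blocked: true, retry_count: 3, error: "boom" };
const flipped = unblockModel(position); // resumes after event 120
const watermarkAfterUnblock = position.at;
const flippedAgain = unblockModel(position); // already unblocked, nothing to do
resetModel(position); // full rebuild: back to -1
```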

Prefer Act.unblock() over calling this directly. Like reset(), this primitive doesn't raise the orchestrator's internal "needs drain" flag, so a settled Act instance will short-circuit and skip the resume. Act.unblock() wraps this and arms the flag.

Only streams that were actually blocked at call time count toward the return value; already-unblocked streams and unknown stream names are silently skipped. The atomic single-statement update makes the call safe to issue concurrently with claim(): workers holding a FOR UPDATE SKIP LOCKED lock won't see partial state.

Accepts either an explicit list of stream names or a StreamFilter for bulk recovery (e.g., "unblock every blocked order projection"). The blocked = true predicate is always applied, so passing blocked: false in the filter matches nothing. An empty filter ({}) means "unblock everything that's blocked," which is a sane post-incident bulk recovery.

Parameters

input

string[] | StreamFilter

Stream names or a StreamFilter

Returns

Promise<number>

Count of streams that were actually flipped (were blocked)

Example

// By name (single targeted recovery)
await app.unblock(["webhooks-out-customer-42"]);

// By filter: unblock every blocked stream in a family
await app.unblock({ stream: "^webhooks-out-" });

// Post-incident: unblock everything that's blocked
await app.unblock({});

// Low-level (does NOT trigger resume on settled apps)
await store().unblock(["webhooks-out-customer-42"]);

See

  • Act.unblock for the high-level recovery API
  • reset for the rebuild-from-zero alternative

Implementation of

Store.unblock