SqliteStore
Class: SqliteStore
Defined in: libs/act-sqlite/src/sqlite-store.ts:84
SQLite event store adapter for @rotorsoft/act.
Provides persistent event storage using SQLite via @libsql/client.
All write operations use transactions for ACID guarantees.
Since SQLite serializes writes at the database level, the concurrency
model is equivalent to PostgreSQL's FOR UPDATE SKIP LOCKED for
single-server deployments.
Store.notify is intentionally not implemented. The notify hook is
a cross-process wake-up signal that lets a horizontally-scaled Act
deployment wake settle() immediately on remote commits. SQLite is
single-node by design (there is no remote writer to be notified of),
so the Act orchestrator falls back to the existing
debounce/poll path, which is correct for this topology.
Example
import { store } from "@rotorsoft/act";
import { SqliteStore } from "@rotorsoft/act-sqlite";
store(new SqliteStore({ url: "file:myapp.db" }));
await store().seed();
Implements
Constructors
Constructor
new SqliteStore(config?): SqliteStore
Defined in: libs/act-sqlite/src/sqlite-store.ts:87
Parameters
config?
Partial<SqliteConfig> = {}
Returns
SqliteStore
Methods
ack()
ack(leases): Promise<Lease[]>
Defined in: libs/act-sqlite/src/sqlite-store.ts:454
Acknowledges successful processing of leased streams.
Updates the watermark to indicate events have been processed successfully. Releases the lease so other workers can process subsequent events.
Parameters
leases
Lease[]
Leases to acknowledge with updated watermarks
Returns
Promise<Lease[]>
Acknowledged leases
Example
const leased = await store().claim(5, 5, randomUUID(), 10000);
// Process events up to ID 150
await store().ack(leased.map(l => ({ ...l, at: 150 })));
See
claim for acquiring leases
Implementation of
block()
block(leases): Promise<BlockedLease[]>
Defined in: libs/act-sqlite/src/sqlite-store.ts:475
Blocks streams after persistent processing failures.
Blocked streams won't be returned by claim until manually unblocked. This prevents poison messages from repeatedly failing and consuming resources.
Streams are typically blocked when:
- Max retries reached
- blockOnError option is true
- Handler throws an error
Parameters
leases
Leases to block with error messages
Returns
Promise<BlockedLease[]>
Blocked leases
Example
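try {
  await processEvents(lease);
  await store().ack([lease]);
} catch (error) {
  // Block only once the retry budget (3 in this example) is exhausted
  if (lease.retry >= 3) {
    await store().block([{
      ...lease,
      // `error` is `unknown` in a TypeScript catch clause, so narrow it before reading `.message`
      error: error instanceof Error ? error.message : String(error)
    }]);
  }
}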
See
claim for lease management
Implementation of
claim()
claim(lagging, leading, by, millis, lane?): Promise<Lease[]>
Defined in: libs/act-sqlite/src/sqlite-store.ts:355
Atomically discovers and leases streams for reaction processing.
Atomically discovers a stream and acquires a lease in one round-trip, eliminating the race that exists when discovery and locking are separate calls (a competing worker can grab the stream between the two).
PostgresStore uses FOR UPDATE SKIP LOCKED for zero-contention competing
consumer semantics: workers never block each other, each grabbing different
streams atomically. InMemoryStore fuses its poll+lease logic equivalently.
Used by Act.drain() as the primary stream acquisition method.
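The fused discover-and-lease step can be illustrated with a small in-memory model. This is only a sketch under stated assumptions: `Row`, `LeaseSketch`, and `claimSketch` are hypothetical names, the row shape is not the adapter's schema, and the real store performs the equivalent work inside a database transaction rather than over an array.

```typescript
// Hypothetical in-memory model of registered streams (not the adapter's schema).
type Row = { stream: string; at: number; blocked: boolean; leasedUntil?: number; leasedBy?: string };
type LeaseSketch = { stream: string; at: number; by: string; lagging: boolean };

function claimSketch(
  rows: Row[], lagging: number, leading: number, by: string, millis: number, now: number
): LeaseSketch[] {
  // Available = not blocked and no unexpired lease.
  const free = rows.filter(
    (r) => !r.blocked && (r.leasedUntil === undefined || r.leasedUntil <= now)
  );
  // Lagging frontier: lowest watermarks first.
  const ascending = free.slice().sort((a, b) => a.at - b.at);
  const lag = ascending.slice(0, lagging);
  // Leading frontier: highest watermarks first, excluding rows already picked as lagging.
  const lead = ascending.slice(lagging).reverse().slice(0, leading);
  const picked = lag
    .map((row) => ({ row, isLagging: true }))
    .concat(lead.map((row) => ({ row, isLagging: false })));
  // Selection and lease write happen in one step, so no second caller can
  // observe a stream as free between discovery and locking.
  return picked.map(({ row, isLagging }) => {
    row.leasedBy = by;
    row.leasedUntil = now + millis;
    return { stream: row.stream, at: row.at, by, lagging: isLagging };
  });
}

const rows: Row[] = [
  { stream: "a", at: 1, blocked: false },
  { stream: "b", at: 5, blocked: false },
  { stream: "c", at: 9, blocked: false },
  { stream: "d", at: 3, blocked: true },
];
// worker-1 takes one lagging and one leading stream; worker-2 gets what is left.
const first = claimSketch(rows, 1, 1, "worker-1", 1000, 0);
const second = claimSketch(rows, 5, 5, "worker-2", 1000, 10);
```

In this model the second caller never receives a stream the first caller still holds, and the blocked stream is skipped by both.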
Parameters
lagging
number
Max streams from the lagging frontier (ascending watermark)
leading
number
Max streams from the leading frontier (descending watermark)
by
string
Unique lease holder identifier (UUID)
millis
number
Lease duration in milliseconds
lane?
string
Optional lane filter (ACT-1103)
Returns
Promise<Lease[]>
Array of successfully leased streams with metadata
Example
const leased = await store().claim(5, 5, randomUUID(), 10000);
leased.forEach(({ stream, at, lagging }) => {
console.log(`Leased ${stream} at ${at} (lagging: ${lagging})`);
});
See
- subscribe for registering new streams (used by correlate)
- ack for acknowledging completion
- block for blocking failed streams
Implementation of
commit()
commit<E>(stream, msgs, meta, expectedVersion?): Promise<Committed<E, keyof E>[]>
Defined in: libs/act-sqlite/src/sqlite-store.ts:167
Commits one or more events to a stream atomically.
This is the core method for persisting events. It must:
- Assign global sequence IDs to events
- Increment the stream version
- Check optimistic concurrency if expectedVersion is provided
- Store events atomically (all or nothing)
- Attach metadata (id, stream, version, created timestamp)
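The requirements listed above can be sketched against an in-memory log. Everything here is hypothetical (`MsgSketch`, `CommittedSketch`, `commitSketch`, the `log` array), and the version numbering (a new stream is at version 0, following the comment in this page's commit example) is an assumption of the sketch rather than a statement about the adapter.

```typescript
// Hypothetical in-memory model; the numbering convention is an assumption of this sketch.
type MsgSketch = { name: string; data: unknown };
type CommittedSketch = MsgSketch & { id: number; stream: string; version: number; created: Date };

const log: CommittedSketch[] = [];

function commitSketch(stream: string, msgs: MsgSketch[], expectedVersion?: number): CommittedSketch[] {
  // Assumed convention: a stream's version is the number of events it holds (0 when new).
  const current = log.filter((e) => e.stream === stream).length;
  if (expectedVersion !== undefined && expectedVersion !== current) {
    throw new Error(`Concurrency conflict on ${stream}: expected ${expectedVersion}, found ${current}`);
  }
  // Build the full batch before touching the log, so a failure stores nothing.
  const batch = msgs.map((m, i) => ({
    name: m.name,
    data: m.data,
    id: log.length + i + 1, // global sequence across all streams
    stream,
    version: current + i + 1, // per-stream version
    created: new Date(),
  }));
  batch.forEach((e) => log.push(e));
  return batch;
}

const created = commitSketch(
  "user-123",
  [{ name: "UserCreated", data: { email: "user@example.com" } }],
  0
);
let conflict = false;
try {
  // Stale expectation: the stream has moved past version 0.
  commitSketch("user-123", [{ name: "UserRenamed", data: {} }], 0);
} catch {
  conflict = true;
}
```

The second call fails because its expected version is stale, and the log is left untouched by the rejected batch.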
Type Parameters
E
E extends Schemas
Event schemas
Parameters
stream
string
The stream ID to commit to
msgs
Message<E, keyof E>[]
Array of messages (events) to commit
meta
Event metadata (correlation, causation)
expectedVersion?
number
Expected current version for optimistic concurrency
Returns
Promise<Committed<E, keyof E>[]>
Array of committed events with full metadata
Throws
If expectedVersion doesn't match current version
Example
const events = await store().commit(
"user-123",
[{ name: "UserCreated", data: { email: "user@example.com" } }],
{ correlation: "req-456", causation: { action: {...} } },
0 // Expect version 0 (new stream)
);
Implementation of
dispose()
dispose(): Promise<void>
Defined in: libs/act-sqlite/src/sqlite-store.ts:161
Returns
Promise<void>
Implementation of
Store.dispose
drop()
drop(): Promise<void>
Defined in: libs/act-sqlite/src/sqlite-store.ts:156
Drops all data from the store.
Dangerous operation that deletes all events and state. Use with extreme caution, primarily for testing or development environments.
Returns
Promise<void>
Example
// Clean up after tests
afterAll(async () => {
await store().drop();
});
Implementation of
prioritize()
prioritize(filter, priority): Promise<number>
Defined in: libs/act-sqlite/src/sqlite-store.ts:949
Bulk-update the scheduling priority of streams matching a filter.
Used by Act.prioritize for operator runtime control over
lagging-frontier claim() ordering. Unlike subscribe,
which keeps the per-stream priority at the max() of all
registered reactions targeting that stream, prioritize sets the
priority directly to priority for matching rows, letting
operators override the build-time scheduling policy.
Filter semantics mirror query_streams: stream/source
are regex by default, exact with the *_exact flags. blocked
restricts to blocked or unblocked rows. Omitted fields don't
filter. An empty filter ({}) updates every registered
stream, which is useful for "reset all priorities to N" but a footgun
otherwise.
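The filter semantics can be sketched in memory as follows. The names `FilterSketch`, `StreamRow`, `matchesField`, and `prioritizeSketch` are hypothetical, and counting only rows whose value actually changes is an assumption about how the returned count is computed.

```typescript
// Hypothetical shapes; the real PrioritizeFilter may carry other fields.
type FilterSketch = {
  stream?: string; stream_exact?: boolean;
  source?: string; source_exact?: boolean;
  blocked?: boolean;
};
type StreamRow = { stream: string; source?: string; blocked: boolean; priority: number };

function matchesField(value: string | undefined, pattern: string | undefined, exact?: boolean): boolean {
  if (pattern === undefined) return true; // omitted fields don't filter
  if (value === undefined) return false;
  // Regex by default, exact string comparison with the *_exact flag.
  return exact ? value === pattern : new RegExp(pattern).test(value);
}

function prioritizeSketch(table: StreamRow[], filter: FilterSketch, priority: number): number {
  let changed = 0;
  table.forEach((r) => {
    const hit =
      matchesField(r.stream, filter.stream, filter.stream_exact) &&
      matchesField(r.source, filter.source, filter.source_exact) &&
      (filter.blocked === undefined || filter.blocked === r.blocked);
    if (hit && r.priority !== priority) {
      r.priority = priority; // set as-is: no max(), no clamp
      changed++;
    }
  });
  return changed;
}

const table: StreamRow[] = [
  { stream: "projection-orders", blocked: false, priority: 0 },
  { stream: "projection-orders-eu", blocked: true, priority: 0 },
  { stream: "audit-log", source: "audit-1", blocked: false, priority: 0 },
];
const byRegex = prioritizeSketch(table, { stream: "^projection-orders" }, 10);
const byExact = prioritizeSketch(table, { stream: "projection-orders", stream_exact: true }, 5);
const bySource = prioritizeSketch(table, { source: "^audit-" }, -5);
const everything = prioritizeSketch(table, {}, 0);
```

The last call shows why the empty filter needs care: it rewrites the priority of every row in the table.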
Parameters
filter
PrioritizeFilter selecting which streams
to update. Required (use {} to target all).
priority
number
New priority value. Set as-is: no max(),
no clamp.
Returns
Promise<number>
Count of streams whose priority was changed.
Examples
await store().prioritize(
{ stream: "^projection-orders$", stream_exact: false },
10
);
await store().prioritize({ source: "^audit-" }, -5);
See
- Act.prioritize for the orchestrator-level wrapper
- claim for how priority biases stream scheduling
Implementation of
query()
query<E>(callback, query?): Promise<number>
Defined in: libs/act-sqlite/src/sqlite-store.ts:231
Queries events from the store with optional filtering.
Calls the callback for each matching event. The callback approach allows processing large result sets without loading everything into memory.
Type Parameters
E
E extends Schemas
Event schemas
Parameters
callback
(event) => void
Function invoked for each matching event
query?
Readonly<{ after?: number; backward?: boolean; before?: number; correlation?: string; created_after?: Date; created_before?: Date; limit?: number; names?: string[]; stream?: string; stream_exact?: boolean; with_snaps?: boolean; }>
Optional filter criteria; see Query for fields
(stream, names, after, before, created_after, created_before,
limit, with_snaps, stream_exact).
Returns
Promise<number>
Total number of events processed
Example
let count = 0;
await store().query(
(event) => {
console.log(event.name, event.data);
count++;
},
{ stream: "user-123" }
);
console.log(`Found ${count} events`);
Implementation of
query_stats()
query_stats<E>(input, options?): Promise<Map<string, StreamStats<E>>>
Defined in: libs/act-sqlite/src/sqlite-store.ts:700
Per-stream aggregated stats; see Store.query_stats.
Two code paths (mirrors the PostgresStore strategy):
- Heads-only path (no count, no names): one or two queries using ROW_NUMBER() OVER (PARTITION BY stream ORDER BY version DESC|ASC) (SQLite lacks PG's DISTINCT ON). Window function + WHERE rn = 1 materializes the head (or tail) per stream from the (stream, version) unique index. Parallel Promise.all when tail is requested.
- Full-scan path (count or names set): one CTE materializes the filtered events, then GROUP BY stream, name → json_group_object(name, n) for the names map plus SUM(n) for count. Heads (and tails when requested) come from the same scan.
SQLite specifics:
- data and meta are stored as TEXT (JSON-encoded); the reader JSON-parses them when materializing the Committed rows.
- blocked is stored as 0/1 integer; the filter converts.
- Array input expands to a placeholder list (IN (?, ?, ...)) since SQLite has no native array type.
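The three SQLite specifics above can be sketched as small helpers. These are illustrative only: `inClause`, `decodeJsonColumns`, and `toBlockedFlag` are hypothetical names, and the column name passed in is an assumption, not the adapter's actual query text.

```typescript
// Expand an array argument into one "?" placeholder per value.
function inClause(column: string, values: string[]): { sql: string; args: string[] } {
  const marks = values.map(() => "?").join(", ");
  // Values are bound positionally, in the same order as the placeholders.
  return { sql: `${column} IN (${marks})`, args: values.slice() };
}

// data and meta arrive as JSON-encoded TEXT and are parsed on read.
function decodeJsonColumns(raw: { data: string; meta: string }): { data: unknown; meta: unknown } {
  return { data: JSON.parse(raw.data), meta: JSON.parse(raw.meta) };
}

// A boolean filter value becomes the 0/1 integer stored in the table.
function toBlockedFlag(blocked: boolean): 0 | 1 {
  return blocked ? 1 : 0;
}

const clause = inClause("stream", ["user-1", "user-2", "user-3"]);
// clause.sql  -> "stream IN (?, ?, ?)"
// clause.args -> ["user-1", "user-2", "user-3"]
const decoded = decodeJsonColumns({ data: '{"email":"user@example.com"}', meta: '{"correlation":"req-456"}' });
const flag = toBlockedFlag(true);
```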
Type Parameters
E
E extends Schemas
Parameters
input
string[] | Pick<StreamFilter, "stream" | "stream_exact">
options?
Returns
Promise<Map<string, StreamStats<E>>>
Implementation of
query_streams()
query_streams(callback, query?): Promise<QueryStreamsResult>
Defined in: libs/act-sqlite/src/sqlite-store.ts:607
Streams registered subscription positions to a callback, plus the highest event id in the store.
Read-only introspection for operational dashboards (Store / Subscriptions tab, projection lag, blocked subscriptions). Avoids forcing apps to open a second connection and run raw SQL against adapter-specific schemas.
Mirrors the Store.query callback pattern: the callback is
invoked once per matching position, allowing large result sets to be
processed without buffering. Results are ordered by stream name; use
query.after (last seen stream name) for keyset pagination on big
tables (dynamic reactions can produce one subscription per aggregate).
Parameters
callback
(position) => void
Invoked once per matching StreamPosition.
query?
Optional QueryStreams filter (default limit: 100).
Returns
Promise<QueryStreamsResult>
maxEventId and the count of positions emitted.
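Examples
const blocked: StreamPosition[] = [];
const { maxEventId } = await store().query_streams(
  (s) => blocked.push(s),
  { blocked: true, limit: 50 }
);
// Log after the call resolves: maxEventId is not yet initialized while the callback runs
for (const s of blocked)
  console.log(`${s.stream}: lag=${maxEventId - s.at} ${s.error}`);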
let after: string | undefined;
for (;;) {
const page: StreamPosition[] = [];
const { count } = await store().query_streams(
(s) => page.push(s),
{ after, limit: 100 }
);
if (!count) break;
// ... use page ...
after = page.at(-1)?.stream;
}
Implementation of
reset()
reset(input): Promise<number>
Defined in: libs/act-sqlite/src/sqlite-store.ts:537
Resets watermarks for the given streams to -1, making them eligible for replay from the beginning. Also clears retry, blocked, error, and lease state so the streams can be claimed immediately.
Prefer Act.reset() over calling this directly. This primitive
only resets the store; it does not raise the orchestrator's internal
"needs drain" flag, so a settled Act instance will short-circuit and
skip the replay. Act.reset() wraps this and arms the flag.
Accepts either an explicit list of stream names or a
StreamFilter for bulk operations (e.g., "rebuild every
blocked stream"). The filter form is the same shape used by
prioritize and query_streams. An empty filter
({}) matches every registered stream, which is typically a footgun for
reset; prefer narrower filters like { blocked: true }.
Parameters
input
string[] | StreamFilter
Stream names or a StreamFilter
Returns
Promise<number>
Count of streams that were actually reset
Example
// By name
await app.reset(["my-projection"]);
// By filter: rebuild every blocked stream in a projection family
await app.reset({ stream: "^proj-orders-", blocked: true });
// Low-level (does NOT trigger replay on settled apps)
await store().reset(["my-projection"]);
See
Act.reset for the high-level rebuild API that wraps this primitive and arms the orchestrator's drain flag
Implementation of
restore()
restore(driver): Promise<void>
Defined in: libs/act-sqlite/src/sqlite-store.ts:1041
Atomically wipe-and-rebuild the store inside a single libsql
write transaction.
On any throw inside the driver the transaction rolls back and the
store is byte-for-byte unchanged. DELETE FROM events + DELETE FROM streams wipe both tables; DELETE FROM sqlite_sequence WHERE name = 'events' resets the autoincrement counter so the
new sequence is dense from 1. created is preserved verbatim
from the source.
Parameters
driver
(callback) => Promise<void>
Returns
Promise<void>
Implementation of
seed()
seed(): Promise<void>
Defined in: libs/act-sqlite/src/sqlite-store.ts:95
Initializes or resets the store.
Used primarily for testing to ensure a clean state between tests. For production stores, this might create necessary tables or indexes.
Returns
Promise<void>
Example
// Reset store between tests
beforeEach(async () => {
await store().seed();
});
Implementation of
subscribe()
subscribe(streams): Promise<{ subscribed: number; watermark: number; }>
Defined in: libs/act-sqlite/src/sqlite-store.ts:305
Registers streams for event processing.
Upserts stream entries so they become visible to claim. Used by
correlate() to register dynamically discovered reaction target streams.
Also returns the current maximum watermark across all subscribed streams, used internally for correlation checkpoint initialization on cold start.
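A minimal in-memory sketch of the upsert and the returned values follows. `SubRow`, `SubInput`, and `subscribeSketch` are hypothetical names; the starting watermark of -1 and the default priority of 0 are assumptions of the sketch, while keeping the maximum priority follows the description of subscribe given under prioritize.

```typescript
// Hypothetical row and input shapes (not the adapter's schema).
type SubRow = { stream: string; source: string | undefined; at: number; priority: number };
type SubInput = { stream: string; source?: string; priority?: number };

function subscribeSketch(registry: SubRow[], inputs: SubInput[]): { subscribed: number; watermark: number } {
  let subscribed = 0;
  inputs.forEach((input) => {
    const existing = registry.filter((r) => r.stream === input.stream)[0];
    const priority = input.priority === undefined ? 0 : input.priority;
    if (existing === undefined) {
      // New stream: assumed to start at watermark -1 (nothing processed yet).
      registry.push({ stream: input.stream, source: input.source, at: -1, priority });
      subscribed++;
    } else {
      // Already registered: keep the highest priority requested so far.
      existing.priority = Math.max(existing.priority, priority);
    }
  });
  // Max watermark across all registered streams (-1 when the registry is empty).
  const watermark = registry.reduce((max, r) => Math.max(max, r.at), -1);
  return { subscribed, watermark };
}

const registry: SubRow[] = [{ stream: "stats-user-1", source: "user-1", at: 42, priority: 3 }];
const outcome = subscribeSketch(registry, [
  { stream: "stats-user-1", source: "user-1", priority: 1 },
  { stream: "stats-user-2", source: "user-2", priority: 10 },
]);
```

Only the second stream counts as newly subscribed; the first keeps its existing watermark and its higher priority.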
Parameters
streams
object[]
Streams to register with optional source hint
Returns
Promise<{ subscribed: number; watermark: number; }>
subscribed: count of newly registered streams; watermark: maximum at across all streams
Example
const { subscribed, watermark } = await store().subscribe([
{ stream: "stats-user-1", source: "user-1" },
{ stream: "stats-user-2", source: "user-2", priority: 10 },
]);
See
- claim for discovering and leasing registered streams
- prioritize for changing priority after subscription
Implementation of
truncate()
truncate(targets): Promise<Map<string, { committed: Committed<Schemas, string>; deleted: number; }>>
Defined in: libs/act-sqlite/src/sqlite-store.ts:964
Atomically truncates streams and seeds each with a final event.
For each target, in a single transaction:
- Deletes all events for the stream
- Removes the stream's entry from the streams table
- Inserts a __snapshot__ (when snapshot is provided) or __tombstone__ event as the sole event on the stream
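The three steps can be sketched over an in-memory store. All names here (`EvSketch`, `TargetSketch`, `StoreSketch`, `truncateSketch`) are hypothetical, the seed event's version of 0 and its empty tombstone payload are assumptions, and the copy-then-swap at the end only stands in for the single transaction the adapter uses.

```typescript
// Hypothetical in-memory shapes (not the adapter's schema).
type EvSketch = { id: number; stream: string; version: number; name: string; data: unknown };
type TargetSketch = { stream: string; snapshot?: unknown };
type StoreSketch = { events: EvSketch[]; streams: string[]; nextId: number };
type TruncateResult = Record<string, { deleted: number; committed: EvSketch }>;

function truncateSketch(db: StoreSketch, targets: TargetSketch[]): TruncateResult {
  // Work on copies and swap at the end, standing in for a single transaction.
  let events = db.events.slice();
  let streams = db.streams.slice();
  let nextId = db.nextId;
  const result: TruncateResult = {};
  targets.forEach((t) => {
    const before = events.length;
    events = events.filter((e) => e.stream !== t.stream); // 1. delete the stream's events
    streams = streams.filter((s) => s !== t.stream); // 2. drop its streams-table entry
    const deleted = before - events.length;
    const hasSnapshot = t.snapshot !== undefined;
    const committed: EvSketch = {
      id: nextId,
      stream: t.stream,
      version: 0, // assumption: the seed event restarts the stream's numbering
      name: hasSnapshot ? "__snapshot__" : "__tombstone__",
      data: hasSnapshot ? t.snapshot : {},
    };
    nextId += 1;
    events.push(committed); // 3. seed the sole remaining event
    result[t.stream] = { deleted, committed };
  });
  db.events = events;
  db.streams = streams;
  db.nextId = nextId;
  return result;
}

const ledger: StoreSketch = {
  events: [
    { id: 1, stream: "acct-1", version: 0, name: "Opened", data: {} },
    { id: 2, stream: "acct-1", version: 1, name: "Deposited", data: { amount: 10 } },
    { id: 3, stream: "acct-2", version: 0, name: "Opened", data: {} },
  ],
  streams: ["acct-1", "proj-balances"],
  nextId: 4,
};
const truncated = truncateSketch(ledger, [
  { stream: "acct-1", snapshot: { balance: 10 } },
  { stream: "acct-2" },
]);
```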
Parameters
targets
object[]
Streams to truncate with optional snapshot state and meta
Returns
Promise<Map<string, { committed: Committed<Schemas, string>; deleted: number; }>>
Map keyed by stream name, each entry with deleted count and committed event
See
Act.close for the high-level close-the-books API that orchestrates safety checks, archive callbacks, and atomic truncate+seed
Implementation of
unblock()
unblock(input): Promise<number>
Defined in: libs/act-sqlite/src/sqlite-store.ts:570
Clears the blocked flag on streams without replaying their history.
Sets blocked = false, retry_count = 0, error = null, and
clears any lease bookkeeping. The at watermark stays where it
was: the stream resumes from the next event after the last
successful ack, not from zero.
The distinction from reset matters: reset() is for
projection rebuilds (replay from event 0); unblock() is for
recovering from a poison message after the operator fixes the
underlying issue. Use unblock() when you don't want to re-process
history.
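The difference between the two operations comes down to what happens to the watermark. The sketch below models a single position row; `PositionSketch`, `resetSketch`, and `unblockSketch` are hypothetical names, and the field names are assumptions rather than the adapter's column names.

```typescript
// Hypothetical position row (not the adapter's schema).
type PositionSketch = {
  stream: string; at: number; retry: number;
  blocked: boolean; error: string | null; leasedBy: string | null;
};

// reset: rewind the watermark to -1 and clear retry, blocked, error, and lease state.
function resetSketch(p: PositionSketch): PositionSketch {
  return { stream: p.stream, at: -1, retry: 0, blocked: false, error: null, leasedBy: null };
}

// unblock: clear the same bookkeeping but leave the watermark where it was.
function unblockSketch(p: PositionSketch): PositionSketch {
  if (!p.blocked) return p; // only blocked streams are touched
  return { stream: p.stream, at: p.at, retry: 0, blocked: false, error: null, leasedBy: null };
}

const poisoned: PositionSketch = {
  stream: "webhooks-out-customer-42", at: 150, retry: 3,
  blocked: true, error: "HTTP 500", leasedBy: null,
};
const rebuilt = resetSketch(poisoned); // replays from the beginning
const resumed = unblockSketch(poisoned); // continues after event 150
```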
Prefer Act.unblock() over calling this directly. Like
reset(), this primitive doesn't raise the orchestrator's internal
"needs drain" flag, so a settled Act instance will short-circuit and
skip the resume. Act.unblock() wraps this and arms the flag.
Only streams that were actually blocked at call time count toward
the return value; already-unblocked streams and unknown stream
names are silently skipped. The atomic single-statement update
makes the call safe to issue concurrently with claim(): workers
holding a FOR UPDATE SKIP LOCKED lock won't see partial state.
Accepts either an explicit list of stream names or a
StreamFilter for bulk recovery (e.g., "unblock every
blocked order projection"). The blocked = true predicate is
always applied, so passing blocked: false in the filter matches
nothing. An empty filter ({}) means "unblock everything that's
blocked," which is a sane post-incident bulk recovery.
Parameters
input
string[] | StreamFilter
Stream names or a StreamFilter
Returns
Promise<number>
Count of streams that were actually flipped (were blocked)
Example
// By name (single targeted recovery)
await app.unblock(["webhooks-out-customer-42"]);
// By filter: unblock every blocked stream in a family
await app.unblock({ stream: "^webhooks-out-" });
// Post-incident: unblock everything that's blocked
await app.unblock({});
// Low-level (does NOT trigger resume on settled apps)
await store().unblock(["webhooks-out-customer-42"]);
See
- Act.unblock for the high-level recovery API
- reset for the rebuild-from-zero alternative