SqliteStore
Class: SqliteStore
Defined in: libs/act-sqlite/src/SqliteStore.ts:74
SQLite event store adapter for @rotorsoft/act.
Provides persistent event storage using SQLite via @libsql/client.
All write operations use transactions for ACID guarantees.
Since SQLite serializes writes at the database level, the concurrency
model is equivalent to PostgreSQL's FOR UPDATE SKIP LOCKED for
single-server deployments.
Example
import { store } from "@rotorsoft/act";
import { SqliteStore } from "@rotorsoft/act-sqlite";
store(new SqliteStore({ url: "file:myapp.db" }));
await store().seed();
Implements
Constructors
Constructor
new SqliteStore(config?): SqliteStore
Defined in: libs/act-sqlite/src/SqliteStore.ts:77
Parameters
config?
Partial<SqliteConfig> = {}
Returns
SqliteStore
Methods
ack()
ack(leases): Promise<Lease[]>
Defined in: libs/act-sqlite/src/SqliteStore.ts:369
Acknowledges successful processing of leased streams.
Updates the watermark to indicate events have been processed successfully. Releases the lease so other workers can process subsequent events.
Parameters
leases
Lease[]
Leases to acknowledge with updated watermarks
Returns
Promise<Lease[]>
Acknowledged leases
Example
import { randomUUID } from "node:crypto";
import { store } from "@rotorsoft/act";
const leased = await store().claim(5, 5, randomUUID(), 10000);
// Process events up to ID 150
await store().ack(leased.map(l => ({ ...l, at: 150 })));
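The watermark update and lease release described above can be modelled against an in-memory map. This is an illustrative sketch, not the adapter's SQL: `AckRow`, `AckLease`, `ackSketch`, and the rule that a lease held by another worker is skipped are all assumptions made for the example.

```typescript
// Hypothetical in-memory subscription row; field names are illustrative only.
type AckRow = { stream: string; at: number; leasedBy: string | null };
type AckLease = { stream: string; by: string; at: number };

// Advance the watermark and release the lease for each lease still held by `by`.
function ackSketch(rows: Map<string, AckRow>, leases: AckLease[]): AckLease[] {
  const acked: AckLease[] = [];
  for (const lease of leases) {
    const row = rows.get(lease.stream);
    // Assumption: a lease that is not held by this worker is skipped.
    if (!row || row.leasedBy !== lease.by) continue;
    row.at = lease.at; // new watermark
    row.leasedBy = null; // released, so the stream can be claimed again
    acked.push(lease);
  }
  return acked;
}

const ackRows = new Map<string, AckRow>([
  ["stats-user-1", { stream: "stats-user-1", at: 100, leasedBy: "worker-1" }],
  ["stats-user-2", { stream: "stats-user-2", at: 100, leasedBy: "worker-2" }],
]);
const acked = ackSketch(ackRows, [
  { stream: "stats-user-1", by: "worker-1", at: 150 },
  { stream: "stats-user-2", by: "worker-1", at: 150 }, // held by another worker
]);
```

Only the first lease is acknowledged here; the second row keeps its watermark and its lease because a different worker holds it.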
See
claim for acquiring leases
Implementation of
block()
block(leases): Promise<BlockedLease[]>
Defined in: libs/act-sqlite/src/SqliteStore.ts:390
Blocks streams after persistent processing failures.
Blocked streams won't be returned by claim until manually unblocked. This prevents poison messages from repeatedly failing and consuming resources.
Streams are typically blocked when:
- Max retries reached
- blockOnError option is true
- Handler throws an error
Parameters
leases
Leases to block with error messages
Returns
Promise<BlockedLease[]>
Blocked leases
Example
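try {
  await processEvents(lease);
  await store().ack([lease]);
} catch (error) {
  if (lease.retry >= 3) {
    await store().block([{
      ...lease,
      // `error` is `unknown` in a catch clause, so narrow it before reading `.message`
      error: error instanceof Error ? error.message : String(error)
    }]);
  }
}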
See
claim for lease management
Implementation of
claim()
claim(lagging, leading, by, millis): Promise<Lease[]>
Defined in: libs/act-sqlite/src/SqliteStore.ts:289
Atomically discovers and leases streams for reaction processing.
Discovery and lease acquisition happen in one round-trip, which eliminates the race that exists when they are separate calls (a competing worker can grab the stream between the two).
PostgresStore uses FOR UPDATE SKIP LOCKED for zero-contention competing
consumer semantics — workers never block each other, each grabbing different
streams atomically. InMemoryStore fuses its poll+lease logic equivalently.
Used by Act.drain() as the primary stream acquisition method.
Parameters
lagging
number
Max streams from the lagging frontier (ascending watermark)
leading
number
Max streams from the leading frontier (descending watermark)
by
string
Unique lease holder identifier (UUID)
millis
number
Lease duration in milliseconds
Returns
Promise<Lease[]>
Array of successfully leased streams with metadata
Example
const leased = await store().claim(5, 5, randomUUID(), 10000);
leased.forEach(({ stream, at, lagging }) => {
  console.log(`Leased ${stream} at ${at} (lagging: ${lagging})`);
});
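The lagging and leading frontiers can be pictured with a small in-memory model. This sketch is hypothetical: `ClaimRow`, `ClaimLease`, `claimSketch`, the explicit `now` argument, and the choice to let the lagging frontier win when a stream qualifies for both are assumptions, and the real adapter performs the selection in SQL inside a transaction.

```typescript
// Hypothetical subscription row; field names are illustrative only.
type ClaimRow = {
  stream: string;
  at: number; // watermark: last processed event id
  blocked: boolean;
  leasedBy: string | null;
  leasedUntil: number; // epoch millis, 0 when never leased
};
type ClaimLease = { stream: string; at: number; by: string; lagging: boolean };

function claimSketch(
  rows: ClaimRow[],
  lagging: number,
  leading: number,
  by: string,
  millis: number,
  now: number
): ClaimLease[] {
  // Blocked rows and rows with a live lease are not eligible.
  const free = rows.filter((r) => !r.blocked && r.leasedUntil <= now);
  // Lagging frontier: lowest watermarks first.
  const asc = free.slice().sort((a, b) => a.at - b.at).slice(0, lagging);
  const taken = new Set(asc.map((r) => r.stream));
  // Leading frontier: highest watermarks first, excluding rows already picked.
  const desc = free
    .slice()
    .sort((a, b) => b.at - a.at)
    .filter((r) => !taken.has(r.stream))
    .slice(0, leading);

  const lease = (r: ClaimRow, isLagging: boolean): ClaimLease => {
    r.leasedBy = by;
    r.leasedUntil = now + millis;
    return { stream: r.stream, at: r.at, by, lagging: isLagging };
  };
  return asc.map((r) => lease(r, true)).concat(desc.map((r) => lease(r, false)));
}

const claimRows: ClaimRow[] = [
  { stream: "a", at: 10, blocked: false, leasedBy: null, leasedUntil: 0 },
  { stream: "b", at: 50, blocked: false, leasedBy: null, leasedUntil: 0 },
  { stream: "c", at: 30, blocked: true, leasedBy: null, leasedUntil: 0 },
  { stream: "d", at: 90, blocked: false, leasedBy: null, leasedUntil: 0 },
];
const firstClaim = claimSketch(claimRows, 1, 1, "worker-1", 10000, 0);
const secondClaim = claimSketch(claimRows, 1, 1, "worker-2", 10000, 1);
```

The first worker leases `a` (lagging) and `d` (leading). The second worker, arriving while those leases are live, gets only `b`, and the blocked stream `c` is never returned.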
See
- subscribe for registering new streams (used by correlate)
- ack for acknowledging completion
- block for blocking failed streams
Implementation of
commit()
commit<E>(stream, msgs, meta, expectedVersion?): Promise<Committed<E, keyof E>[]>
Defined in: libs/act-sqlite/src/SqliteStore.ts:130
Commits one or more events to a stream atomically.
This is the core method for persisting events. It must:
- Assign global sequence IDs to events
- Increment the stream version
- Check optimistic concurrency if expectedVersion is provided
- Store events atomically (all or nothing)
- Attach metadata (id, stream, version, created timestamp)
Type Parameters
E
E extends Schemas
Event schemas
Parameters
stream
string
The stream ID to commit to
msgs
Message<E, keyof E>[]
Array of messages (events) to commit
meta
Event metadata (correlation, causation)
expectedVersion?
number
Expected current version for optimistic concurrency
Returns
Promise<Committed<E, keyof E>[]>
Array of committed events with full metadata
Throws
If expectedVersion doesn't match current version
Example
const events = await store().commit(
  "user-123",
  [{ name: "UserCreated", data: { email: "user@example.com" } }],
  { correlation: "req-456", causation: { action: { /* ... */ } } },
  0 // Expect version 0 (new stream)
);
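The optimistic concurrency check and all-or-nothing append can be illustrated with a minimal in-memory sketch. Everything here is hypothetical (`SketchMsg`, `SketchCommitted`, `eventLog`, `commitSketch`), metadata is omitted, and the version convention (a new stream is at version 0, following the comment in the example above) is an assumption of the sketch rather than a statement about the adapter.

```typescript
type SketchMsg = { name: string; data: unknown };
type SketchCommitted = SketchMsg & {
  id: number; // global sequence id
  stream: string;
  version: number; // per-stream version
  created: Date;
};

const eventLog: SketchCommitted[] = [];

function commitSketch(
  stream: string,
  msgs: SketchMsg[],
  expectedVersion?: number
): SketchCommitted[] {
  // Assumption: a stream's version equals its event count, so a new stream is at 0.
  const current = eventLog.filter((e) => e.stream === stream).length;
  if (expectedVersion !== undefined && expectedVersion !== current) {
    throw new Error(
      `Concurrency conflict on ${stream}: expected ${expectedVersion}, found ${current}`
    );
  }
  // Build the whole batch first, then append it in one step (all or nothing).
  const batch = msgs.map((m, i) => ({
    ...m,
    id: eventLog.length + i + 1,
    stream,
    version: current + i + 1,
    created: new Date(),
  }));
  eventLog.push(...batch);
  return batch;
}

const created = commitSketch(
  "user-123",
  [{ name: "UserCreated", data: { email: "user@example.com" } }],
  0
);

// A second writer that still believes the stream is new is rejected.
let conflict = false;
try {
  commitSketch("user-123", [{ name: "UserRenamed", data: {} }], 0);
} catch (err) {
  conflict = err instanceof Error;
}
```

The rejected commit leaves the log unchanged, which is the property callers rely on when they retry after reloading the stream.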
Implementation of
dispose()
dispose(): Promise<void>
Defined in: libs/act-sqlite/src/SqliteStore.ts:124
Returns
Promise<void>
Implementation of
Store.dispose
drop()
drop(): Promise<void>
Defined in: libs/act-sqlite/src/SqliteStore.ts:119
Drops all data from the store.
Dangerous operation that deletes all events and state. Use with extreme caution, primarily for testing or development environments.
Returns
Promise<void>
Example
// Clean up after tests
afterAll(async () => {
  await store().drop();
});
Implementation of
query()
query<E>(callback, query?): Promise<number>
Defined in: libs/act-sqlite/src/SqliteStore.ts:194
Queries events from the store with optional filtering.
Calls the callback for each matching event. The callback approach allows processing large result sets without loading everything into memory.
Type Parameters
E
E extends Schemas
Event schemas
Parameters
callback
(event) => void
Function invoked for each matching event
query?
Readonly<{ after?: number; backward?: boolean; before?: number; correlation?: string; created_after?: Date; created_before?: Date; limit?: number; names?: string[]; stream?: string; stream_exact?: boolean; with_snaps?: boolean; }>
Optional filter criteria; see Query for fields
(stream, names, after, before, created_after, created_before,
limit, with_snaps, stream_exact, backward, correlation).
Returns
Promise<number>
Total number of events processed
Example
let count = 0;
await store().query(
  (event) => {
    console.log(event.name, event.data);
    count++;
  },
  { stream: "user-123" }
);
console.log(`Found ${count} events`);
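The callback pattern can be sketched over a plain array to show why the caller never holds the full result set. `QEvent`, `QFilter`, and `querySketch` are hypothetical names. The sketch covers only a subset of the filter fields and matches stream names exactly, so it does not model how the adapter interprets `stream` when `stream_exact` is not set.

```typescript
type QEvent = { id: number; stream: string; name: string };
type QFilter = { stream?: string; names?: string[]; after?: number; limit?: number };

// Invoke `callback` once per matching event and return how many were delivered.
function querySketch(
  events: QEvent[],
  callback: (event: QEvent) => void,
  q: QFilter = {}
): number {
  let count = 0;
  for (const e of events) {
    if (q.limit !== undefined && count >= q.limit) break;
    if (q.stream !== undefined && e.stream !== q.stream) continue;
    if (q.names !== undefined && q.names.indexOf(e.name) === -1) continue;
    if (q.after !== undefined && e.id <= q.after) continue;
    callback(e); // one event at a time; nothing is buffered by the store
    count++;
  }
  return count;
}

const sampleEvents: QEvent[] = [
  { id: 1, stream: "user-123", name: "UserCreated" },
  { id: 2, stream: "user-456", name: "UserCreated" },
  { id: 3, stream: "user-123", name: "UserRenamed" },
  { id: 4, stream: "user-123", name: "UserDeleted" },
];
const seenIds: number[] = [];
const total = querySketch(sampleEvents, (e) => seenIds.push(e.id), {
  stream: "user-123",
  after: 1,
  limit: 2,
});
```

Here the callback sees events 3 and 4 only: event 1 is excluded by `after` and event 2 belongs to another stream.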
Implementation of
query_streams()
query_streams(callback, query?): Promise<QueryStreamsResult>
Defined in: libs/act-sqlite/src/SqliteStore.ts:432
Streams registered subscription positions to a callback, plus the highest event id in the store.
Read-only introspection for operational dashboards (Store / Subscriptions tab, projection lag, blocked subscriptions). Avoids forcing apps to open a second connection and run raw SQL against adapter-specific schemas.
Mirrors the Store.query callback pattern: the callback is
invoked once per matching position, allowing large result sets to be
processed without buffering. Results are ordered by stream name; use
query.after (last seen stream name) for keyset pagination on big
tables (dynamic reactions can produce one subscription per aggregate).
Parameters
callback
(position) => void
Invoked once per matching StreamPosition.
query?
Optional QueryStreams filter (default limit: 100).
Returns
Promise<QueryStreamsResult>
maxEventId and the count of positions emitted.
Examples
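const blocked: StreamPosition[] = [];
const { maxEventId } = await store().query_streams(
  (s) => blocked.push(s),
  { blocked: true, limit: 50 }
);
// maxEventId is only available once the call resolves, so compute lag afterwards
for (const s of blocked)
  console.log(`${s.stream}: lag=${maxEventId - s.at} ${s.error}`);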
let after: string | undefined;
for (;;) {
  const page: StreamPosition[] = [];
  const { count } = await store().query_streams(
    (s) => page.push(s),
    { after, limit: 100 }
  );
  if (!count) break;
  // ... use page ...
  after = page.at(-1)?.stream;
}
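Keyset pagination by stream name, together with the lag calculation, can be shown end to end with a synchronous in-memory stand-in. `Position`, `queryStreamsSketch`, and the sample data are hypothetical. The real method is asynchronous and reads from the adapter's tables.

```typescript
type Position = { stream: string; at: number };

// Emit positions ordered by stream name, starting strictly after `q.after`.
function queryStreamsSketch(
  positions: Position[],
  maxEventId: number,
  callback: (p: Position) => void,
  q: { after?: string | undefined; limit?: number } = {}
): { maxEventId: number; count: number } {
  const limit = q.limit ?? 100;
  const after = q.after;
  const page = positions
    .slice()
    .sort((a, b) => (a.stream < b.stream ? -1 : a.stream > b.stream ? 1 : 0))
    .filter((p) => after === undefined || p.stream > after)
    .slice(0, limit);
  page.forEach((p) => callback(p));
  return { maxEventId, count: page.length };
}

const allPositions: Position[] = [
  { stream: "c", at: 5 },
  { stream: "a", at: 9 },
  { stream: "b", at: 7 },
];

const lags: string[] = [];
let cursor: string | undefined;
for (;;) {
  const page: Position[] = [];
  const res = queryStreamsSketch(allPositions, 10, (p) => page.push(p), {
    after: cursor,
    limit: 2,
  });
  if (res.count === 0) break;
  for (const p of page) {
    lags.push(`${p.stream}:${res.maxEventId - p.at}`); // lag = maxEventId - watermark
    cursor = p.stream; // last seen stream name becomes the next keyset cursor
  }
}
```

Because the cursor is the last stream name seen rather than an offset, rows inserted between pages cannot shift later pages or cause a row to be emitted twice.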
Implementation of
reset()
reset(streams): Promise<number>
Defined in: libs/act-sqlite/src/SqliteStore.ts:411
Resets watermarks for the given streams to -1, making them eligible for replay from the beginning. Also clears retry, blocked, error, and lease state so the streams can be claimed immediately.
Prefer Act.reset() over calling this directly. This primitive
only resets the store; it does not raise the orchestrator's internal
"needs drain" flag, so a settled Act instance will short-circuit and
skip the replay. Act.reset() wraps this and arms the flag.
Parameters
streams
string[]
Stream names to reset
Returns
Promise<number>
Count of streams that were actually reset
Example
// Recommended
await app.reset(["my-projection"]);
// Low-level (does NOT trigger replay on settled apps)
await store().reset(["my-projection"]);
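The store-level effect of a reset can be modelled on an in-memory map. `ResetRow` and `resetSketch` are hypothetical names, and the row fields simply mirror the state the description says is cleared. The orchestrator's drain flag is deliberately not modelled, which is exactly the gap that Act.reset() covers.

```typescript
type ResetRow = {
  at: number;
  retry: number;
  blocked: boolean;
  error: string | null;
  leasedBy: string | null;
};

// Rewind each known stream to -1 and clear retry, blocked, error, and lease state.
function resetSketch(rows: Map<string, ResetRow>, streams: string[]): number {
  let count = 0;
  for (const name of streams) {
    const row = rows.get(name);
    if (!row) continue; // unknown streams are not counted
    row.at = -1;
    row.retry = 0;
    row.blocked = false;
    row.error = null;
    row.leasedBy = null;
    count++;
  }
  return count;
}

const resetRows = new Map<string, ResetRow>([
  ["my-projection", { at: 42, retry: 3, blocked: true, error: "boom", leasedBy: null }],
]);
const resetCount = resetSketch(resetRows, ["my-projection", "missing-stream"]);
```

The return value is 1 rather than 2 because only streams that exist are counted as reset.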
See
Act.reset for the high-level rebuild API that wraps this primitive and arms the orchestrator's drain flag
Implementation of
seed()
seed(): Promise<void>
Defined in: libs/act-sqlite/src/SqliteStore.ts:85
Initializes or resets the store.
Used primarily for testing to ensure a clean state between tests. For production stores, this might create necessary tables or indexes.
Returns
Promise<void>
Example
// Reset store between tests
beforeEach(async () => {
  await store().seed();
});
Implementation of
subscribe()
subscribe(streams): Promise<{ subscribed: number; watermark: number; }>
Defined in: libs/act-sqlite/src/SqliteStore.ts:265
Registers streams for event processing.
Upserts stream entries so they become visible to claim. Used by
correlate() to register dynamically discovered reaction target streams.
Also returns the current maximum watermark across all subscribed streams, used internally for correlation checkpoint initialization on cold start.
Parameters
streams
object[]
Streams to register with optional source hint
Returns
Promise<{ subscribed: number; watermark: number; }>
subscribed is the count of newly registered streams; watermark is the maximum at across all streams
Example
const { subscribed, watermark } = await store().subscribe([
  { stream: "stats-user-1", source: "user-1" },
  { stream: "stats-user-2", source: "user-2" },
]);
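The upsert and the two returned numbers can be illustrated with an in-memory map. `SubRow` and `subscribeSketch` are hypothetical, and two details are assumptions of the sketch: newly registered streams start at watermark -1, and rows that already exist are left untouched.

```typescript
type SubRow = { at: number; source: string | null };

function subscribeSketch(
  rows: Map<string, SubRow>,
  streams: { stream: string; source?: string }[]
): { subscribed: number; watermark: number } {
  let subscribed = 0;
  for (const s of streams) {
    // Upsert: keep existing rows as they are, insert the rest as unprocessed.
    if (rows.has(s.stream)) continue;
    rows.set(s.stream, { at: -1, source: s.source ?? null });
    subscribed++;
  }
  // Maximum watermark across every subscribed stream, old and new.
  let watermark = -1;
  rows.forEach((row) => {
    if (row.at > watermark) watermark = row.at;
  });
  return { subscribed, watermark };
}

const subRows = new Map<string, SubRow>([
  ["stats-user-1", { at: 40, source: "user-1" }],
]);
const subResult = subscribeSketch(subRows, [
  { stream: "stats-user-1", source: "user-1" }, // already registered
  { stream: "stats-user-2", source: "user-2" }, // new
]);
```

Registering an already known stream is a no-op, so only `stats-user-2` counts toward `subscribed`, while `watermark` reflects the existing row at 40.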
See
claim for discovering and leasing registered streams
Implementation of
truncate()
truncate(targets): Promise<Map<string, { committed: Committed<Schemas, string>; deleted: number; }>>
Defined in: libs/act-sqlite/src/SqliteStore.ts:496
Atomically truncates streams and seeds each with a final event.
For each target, in a single transaction:
- Deletes all events for the stream
- Removes the stream's entry from the streams table
- Inserts a __snapshot__ (when snapshot is provided) or __tombstone__ event as the sole event on the stream
Parameters
targets
object[]
Streams to truncate with optional snapshot state and meta
Returns
Promise<Map<string, { committed: Committed<Schemas, string>; deleted: number; }>>
Map keyed by stream name, each entry with deleted count and committed event
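The three steps performed per target can be sketched on in-memory state. `TruncEvent`, `TruncState`, and `truncateSketch` are hypothetical, event metadata and versions are omitted, and working on copies that are swapped in at the end merely stands in for the single transaction the adapter uses.

```typescript
type TruncEvent = { id: number; stream: string; name: string; data: unknown };
type TruncState = { events: TruncEvent[]; streams: Set<string>; nextId: number };
type TruncResult = Map<string, { committed: TruncEvent; deleted: number }>;

function truncateSketch(
  state: TruncState,
  targets: { stream: string; snapshot?: unknown }[]
): TruncResult {
  const result: TruncResult = new Map();
  // Mutate copies only; assign them back at the end to mimic commit-or-rollback.
  let events = state.events.slice();
  const streams = new Set(state.streams);
  let nextId = state.nextId;

  for (const t of targets) {
    const before = events.length;
    events = events.filter((e) => e.stream !== t.stream); // 1. delete all events
    const deleted = before - events.length;
    streams.delete(t.stream); // 2. remove the streams-table entry
    const committed: TruncEvent = // 3. seed with a final event
      t.snapshot !== undefined
        ? { id: nextId++, stream: t.stream, name: "__snapshot__", data: t.snapshot }
        : { id: nextId++, stream: t.stream, name: "__tombstone__", data: {} };
    events.push(committed);
    result.set(t.stream, { committed, deleted });
  }

  state.events = events;
  state.streams = streams;
  state.nextId = nextId;
  return result;
}

const truncState: TruncState = {
  events: [
    { id: 1, stream: "acct-1", name: "Opened", data: {} },
    { id: 2, stream: "acct-1", name: "Deposited", data: { amount: 3 } },
    { id: 3, stream: "acct-2", name: "Opened", data: {} },
  ],
  streams: new Set(["acct-1", "acct-2", "balances"]),
  nextId: 4,
};
const truncated = truncateSketch(truncState, [
  { stream: "acct-1", snapshot: { balance: 3 } },
  { stream: "acct-2" },
]);
```

After the call each truncated stream holds exactly one event: a `__snapshot__` for `acct-1`, which supplied state, and a `__tombstone__` for `acct-2`, which did not.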
See
Act.close for the high-level close-the-books API that orchestrates safety checks, archive callbacks, and atomic truncate+seed