diff --git a/src/index.ts b/src/index.ts
index 7e591a10..cdec6bb9 100644
--- a/src/index.ts
+++ b/src/index.ts
@@ -5,6 +5,8 @@ export * from "./crdt.js";
export * from "./indexer.js";
+export { defaultWriteQueueOpts } from "./write-queue.js";
+
export * as bs from "./blockstore/index.js";
export * as blockstore from "./blockstore/index.js";
diff --git a/src/ledger.ts b/src/ledger.ts
index d8da28f3..d1a6432b 100644
--- a/src/ledger.ts
+++ b/src/ledger.ts
@@ -1,6 +1,6 @@
import { BuildURI, CoerceURI, KeyedResolvOnce, Logger, ResolveOnce, URI } from "@adviser/cement";
-import { WriteQueue, writeQueue } from "./write-queue.js";
+import { defaultWriteQueueOpts, WriteQueue, writeQueue, WriteQueueParams } from "./write-queue.js";
import { CRDT, HasCRDT } from "./crdt.js";
import { index } from "./indexer.js";
import {
@@ -15,6 +15,7 @@ import {
type IndexKeyType,
type ListenerFn,
type DocResponse,
+ type BulkResponse,
type ChangesResponse,
type DocTypes,
type IndexRows,
@@ -48,6 +49,8 @@ export interface LedgerOpts {
// readonly public?: boolean;
readonly meta?: DbMeta;
readonly gatewayInterceptor?: GatewayInterceptor;
+
+ readonly writeQueue: WriteQueueParams;
// readonly factoryUnreg?: () => void;
// readonly persistIndexes?: boolean;
// readonly autoCompact?: number;
@@ -73,6 +76,7 @@ export interface Ledger<DT extends DocTypes = NonNullable<unknown>>
extends HasCRDT<DT> {
get<T extends DocTypes>(id: string): Promise<DocWithId<T>>;
put<T extends DocTypes>(doc: DocSet<T>): Promise<DocResponse>;
+  bulk<T extends DocTypes>(docs: DocSet<T>[]): Promise<BulkResponse>;
del(id: string): Promise<DocResponse>;
changes<T extends DocTypes>(since?: ClockHead, opts?: ChangesOptions): Promise<ChangesResponse<T>>;
allDocs<T extends DocTypes>(opts?: AllDocsQueryOpts): Promise<AllDocsResponse<T>>;
@@ -106,6 +110,7 @@ export function LedgerFactory<DT extends DocTypes = NonNullable<unknown>>(name: string, opts?: ConfigOpts): Ledger<DT> {
keyBag: defaultKeyBagOpts(sthis, opts?.keyBag),
storeUrls: toStoreURIRuntime(sthis, name, opts?.storeUrls),
gatewayInterceptor: opts?.gatewayInterceptor,
+ writeQueue: defaultWriteQueueOpts(opts?.writeQueue),
storeEnDe: {
encodeFile,
decodeFile,
@@ -161,6 +166,9 @@ export class LedgerShell<DT extends DocTypes = NonNullable<unknown>> implements Ledger<DT> {
put<T extends DocTypes>(doc: DocSet<T>): Promise<DocResponse> {
return this.ref.put(doc);
}
+  bulk<T extends DocTypes>(docs: DocSet<T>[]): Promise<BulkResponse> {
+ return this.ref.bulk(docs);
+ }
del(id: string): Promise<DocResponse> {
return this.ref.del(id);
}
@@ -225,6 +233,7 @@ class LedgerImpl<DT extends DocTypes = NonNullable<unknown>> implements Ledger<DT> {
+    await this._writeQueue.close();
this._onClosedFns.forEach((fn) => fn());
}
// await this.blockstore.close();
@@ -256,15 +265,9 @@ class LedgerImpl<DT extends DocTypes = NonNullable<unknown>> implements Ledger<DT> {
-    this._writeQueue = writeQueue(async (updates: DocUpdate<DT>[]) => {
- return await this.crdt.bulk(updates);
- }); //, Infinity)
- this.crdt.clock.onTock(() => {
- this._no_update_notify();
- });
+    this._writeQueue = writeQueue(this.sthis, async (updates: DocUpdate<DT>[]) => this.crdt.bulk(updates), this.opts.writeQueue);
+ this.crdt.clock.onTock(() => this._no_update_notify());
}
get name(): string {
@@ -299,6 +302,23 @@ class LedgerImpl<DT extends DocTypes = NonNullable<unknown>> implements Ledger<DT> {
+  async bulk<T extends DocTypes>(docs: DocSet<T>[]): Promise<BulkResponse> {
+ await this.ready();
+
+ const updates = docs.map((doc) => {
+ const id = doc._id || this.sthis.timeOrderedNextId().str;
+ return {
+ id,
+ value: {
+        ...(doc as unknown as DocSet<T>),
+ _id: id,
+ },
+ };
+ });
+ const result = (await this._writeQueue.bulk(updates)) as CRDTMeta;
+ return { ids: updates.map((u) => u.id), clock: result.head, name: this.name } as BulkResponse;
+ }
+
async del(id: string): Promise<DocResponse> {
await this.ready();
this.logger.Debug().Str("id", id).Msg("del");
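
For orientation, a hypothetical usage sketch of the new bulk() API wired up above. LedgerFactory, the writeQueue option, bulk(), and the BulkResponse fields all appear in this diff; the ledger name and the documents are invented for illustration.

```ts
import { LedgerFactory } from "@fireproof/core";

async function demo() {
  // writeQueue.chunkSize is the new option threaded through LedgerOpts above.
  const db = LedgerFactory("bulk-demo", { writeQueue: { chunkSize: 32 } });

  const res = await db.bulk<{ hello: string }>([
    { _id: "a", hello: "world" }, // explicit _id is preserved
    { hello: "auto-id" },         // missing _id gets a time-ordered id assigned
  ]);

  // BulkResponse: one id per written doc plus the resulting clock head.
  console.log(res.ids, res.clock, res.name);

  await db.close();
}
```
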
diff --git a/src/types.ts b/src/types.ts
index d484eb1b..ce40c3df 100644
--- a/src/types.ts
+++ b/src/types.ts
@@ -6,6 +6,7 @@ import { EnvFactoryOpts, Env, Logger, CryptoRuntime, Result } from "@adviser/cement";
// import type { MakeDirectoryOptions, PathLike, Stats } from "fs";
import { KeyBagOpts } from "./runtime/key-bag.js";
+import { WriteQueueParams } from "./write-queue.js";
export type { DbMeta };
@@ -118,6 +119,7 @@ export interface ConfigOpts extends Partial<SuperThisOpts> {
readonly public?: boolean;
readonly meta?: DbMeta;
// readonly persistIndexes?: boolean;
+  readonly writeQueue?: Partial<WriteQueueParams>;
readonly gatewayInterceptor?: GatewayInterceptor;
readonly autoCompact?: number;
readonly storeUrls?: StoreUrlsOpts;
@@ -301,6 +303,12 @@ export interface DocResponse {
readonly name?: string;
}
+export interface BulkResponse {
+ readonly ids: string[];
+ readonly clock: ClockHead;
+ readonly name?: string;
+}
+
export type UpdateListenerFn<T extends DocTypes> = (docs: DocWithId<T>[]) => Promise<void> | void;
export type NoUpdateListenerFn = () => Promise<void> | void;
export type ListenerFn<T extends DocTypes> = UpdateListenerFn<T> | NoUpdateListenerFn;
diff --git a/src/write-queue.ts b/src/write-queue.ts
index a2430603..cfd68126 100644
--- a/src/write-queue.ts
+++ b/src/write-queue.ts
@@ -1,60 +1,100 @@
-import { DocTypes, MetaType, DocUpdate } from "./types.js";
+import { ensureLogger } from "./utils.js";
+import { DocTypes, MetaType, DocUpdate, SuperThis } from "./types.js";
+import { Future, Logger } from "@adviser/cement";
type WorkerFunction<T extends DocTypes> = (tasks: DocUpdate<T>[]) => Promise<MetaType>;
export interface WriteQueue<T extends DocTypes> {
push(task: DocUpdate<T>): Promise<MetaType>;
+  bulk(tasks: DocUpdate<T>[]): Promise<MetaType>;
+  close(): Promise<void>;
}
interface WriteQueueItem<T extends DocTypes> {
-  readonly task: DocUpdate<T>;
+  // readonly task?: DocUpdate<T>;
+  readonly tasks?: DocUpdate<T>[];
resolve(result: MetaType): void;
reject(error: Error): void;
}
-export function writeQueue<T extends DocTypes>(worker: WorkerFunction<T>, payload = Infinity, unbounded = false): WriteQueue<T> {
-  const queue: WriteQueueItem<T>[] = [];
- let isProcessing = false;
+export interface WriteQueueParams {
+ // default 32
+ // if chunkSize is 1 the result will be ordered in time
+ readonly chunkSize: number;
+}
+
+export function defaultWriteQueueOpts(opts: Partial<WriteQueueParams> = {}): WriteQueueParams {
+ return {
+ ...opts,
+ chunkSize: opts.chunkSize && opts.chunkSize > 0 ? opts.chunkSize : 32,
+ };
+}
- async function process() {
- if (isProcessing || queue.length === 0) return;
- isProcessing = true;
+class WriteQueueImpl<T extends DocTypes> implements WriteQueue<T> {
+ private readonly opts: WriteQueueParams;
- const tasksToProcess = queue.splice(0, payload);
- const updates = tasksToProcess.map((item) => item.task);
+  private readonly queue: WriteQueueItem<T>[] = [];
+  private readonly worker: WorkerFunction<T>;
+ private isProcessing = false;
+ private readonly logger: Logger;
+
+  constructor(sthis: SuperThis, worker: WorkerFunction<T>, opts: WriteQueueParams) {
+ this.logger = ensureLogger(sthis, "WriteQueueImpl");
+ this.worker = worker;
+ this.opts = defaultWriteQueueOpts(opts);
+ }
+
+  private waitForEmptyQueue?: Future<void>;
+ private testEmptyQueue() {
+ if (this.waitForEmptyQueue && this.queue.length === 0) {
+ this.waitForEmptyQueue.resolve();
+ }
+ }
- if (unbounded) {
- // Run all updates in parallel and resolve/reject them individually
+ private async process() {
+ if (this.isProcessing || this.queue.length === 0) {
+ this.testEmptyQueue();
+ return;
+ }
+ this.isProcessing = true;
+ try {
+ this.logger.Debug().Any("opts", this.opts).Len(this.queue).Msg("Processing tasks");
+ const tasksToProcess = this.queue.splice(0, this.opts.chunkSize);
+      const updates = tasksToProcess.map((item) => item.tasks).filter((item) => item) as DocUpdate<T>[][];
const promises = updates.map(async (update, index) => {
try {
- const result = await worker([update]);
+ const result = await this.worker(update);
tasksToProcess[index].resolve(result);
} catch (error) {
- tasksToProcess[index].reject(error as Error);
+ tasksToProcess[index].reject(this.logger.Error().Err(error).Msg("Error processing task").AsError());
}
});
-
- await Promise.all(promises);
- } else {
- // Original logic: Run updates in a batch and resolve/reject them together
- try {
- const result = await worker(updates);
- tasksToProcess.forEach((task) => task.resolve(result));
- } catch (error) {
- tasksToProcess.forEach((task) => task.reject(error as Error));
- }
+ await Promise.allSettled(promises);
+ this.logger.Debug().Any("opts", this.opts).Len(this.queue).Msg("Processed tasks");
+ } catch (error) {
+ this.logger.Error().Err(error).Msg("Error processing tasks");
+ } finally {
+ this.isProcessing = false;
+ setTimeout(() => this.process(), 0);
}
+ }
- isProcessing = false;
- void process();
+  bulk(tasks: DocUpdate<T>[]): Promise<MetaType> {
+    return new Promise<MetaType>((resolve, reject) => {
+ this.queue.push({ tasks, resolve, reject });
+ this.process();
+ });
+ }
+  push(task: DocUpdate<T>): Promise<MetaType> {
+ return this.bulk([task]);
+ }
+  close(): Promise<void> {
+    this.waitForEmptyQueue = new Future<void>();
+ this.testEmptyQueue();
+ return this.waitForEmptyQueue.asPromise();
}
+}
- return {
-    push(task: DocUpdate<T>): Promise<MetaType> {
-      return new Promise<MetaType>((resolve, reject) => {
- queue.push({ task, resolve, reject });
- void process();
- });
- },
- };
+export function writeQueue<T extends DocTypes>(sthis: SuperThis, worker: WorkerFunction<T>, opts: WriteQueueParams): WriteQueue<T> {
+ return new WriteQueueImpl(sthis, worker, opts);
}
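
For orientation, a short usage sketch of the defaulting behaviour implemented above; the commented results follow from the chunkSize check in defaultWriteQueueOpts, and the chunkSize of 1 case matches the "ordered in time" note in the WriteQueueParams comment.

```ts
import { defaultWriteQueueOpts } from "@fireproof/core";

// Missing or non-positive chunkSize falls back to the default of 32.
console.log(defaultWriteQueueOpts({}));               // { chunkSize: 32 }
console.log(defaultWriteQueueOpts({ chunkSize: 0 })); // { chunkSize: 32 }

// chunkSize: 1 drains the queue one update at a time, so writes stay time-ordered,
// which is what the "parallel writes / public ordered" test below relies on.
console.log(defaultWriteQueueOpts({ chunkSize: 1 })); // { chunkSize: 1 }
```
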
diff --git a/tests/fireproof/crdt.test.ts b/tests/fireproof/crdt.test.ts
index 75ed9e33..0c1153b5 100644
--- a/tests/fireproof/crdt.test.ts
+++ b/tests/fireproof/crdt.test.ts
@@ -1,4 +1,4 @@
-import { CRDT, LedgerOpts, toStoreURIRuntime } from "@fireproof/core";
+import { CRDT, defaultWriteQueueOpts, LedgerOpts, toStoreURIRuntime } from "@fireproof/core";
import { bs } from "@fireproof/core";
import { CRDTMeta, DocValue } from "@fireproof/core";
import { Index, index } from "@fireproof/core";
@@ -15,6 +15,7 @@ describe("Fresh crdt", function () {
beforeEach(async function () {
await sthis.start();
const dbOpts: LedgerOpts = {
+ writeQueue: defaultWriteQueueOpts({}),
keyBag: defaultKeyBagOpts(sthis),
storeUrls: toStoreURIRuntime(sthis, "test-crdt-cold"),
storeEnDe: bs.ensureStoreEnDeFile({}),
@@ -57,6 +58,7 @@ describe("CRDT with one record", function () {
beforeEach(async function () {
await sthis.start();
const dbOpts: LedgerOpts = {
+ writeQueue: defaultWriteQueueOpts({}),
keyBag: defaultKeyBagOpts(sthis),
storeUrls: toStoreURIRuntime(sthis, `test@${sthis.nextId()}`),
storeEnDe: bs.ensureStoreEnDeFile({}),
@@ -112,6 +114,7 @@ describe("CRDT with a multi-write", function () {
beforeEach(async function () {
await sthis.start();
const dbOpts: LedgerOpts = {
+ writeQueue: defaultWriteQueueOpts({}),
keyBag: defaultKeyBagOpts(sthis),
storeUrls: toStoreURIRuntime(sthis, "test-crdt-cold"),
storeEnDe: bs.ensureStoreEnDeFile({}),
@@ -182,6 +185,7 @@ describe("CRDT with two multi-writes", function () {
beforeEach(async () => {
await sthis.start();
const dbOpts: LedgerOpts = {
+ writeQueue: defaultWriteQueueOpts({}),
keyBag: defaultKeyBagOpts(sthis),
storeUrls: toStoreURIRuntime(sthis, `test-multiple-writes@${sthis.nextId()}`),
storeEnDe: bs.ensureStoreEnDeFile({}),
@@ -235,6 +239,7 @@ describe("Compact a named CRDT with writes", function () {
beforeEach(async function () {
await sthis.start();
const dbOpts: LedgerOpts = {
+ writeQueue: defaultWriteQueueOpts({}),
keyBag: defaultKeyBagOpts(sthis),
storeUrls: toStoreURIRuntime(sthis, `named-crdt-compaction`),
storeEnDe: bs.ensureStoreEnDeFile({}),
@@ -296,6 +301,7 @@ describe("CRDT with an index", function () {
beforeEach(async function () {
await sthis.start();
const dbOpts: LedgerOpts = {
+ writeQueue: defaultWriteQueueOpts({}),
keyBag: defaultKeyBagOpts(sthis),
storeUrls: toStoreURIRuntime(sthis, "test-crdt-cold"),
storeEnDe: bs.ensureStoreEnDeFile({}),
@@ -347,6 +353,7 @@ describe("Loader with a committed transaction", function () {
beforeEach(async function () {
await sthis.start();
const dbOpts: LedgerOpts = {
+ writeQueue: defaultWriteQueueOpts({}),
keyBag: defaultKeyBagOpts(sthis),
storeUrls: toStoreURIRuntime(sthis, dbname),
storeEnDe: bs.ensureStoreEnDeFile({}),
@@ -400,6 +407,7 @@ describe("Loader with two committed transactions", function () {
beforeEach(async function () {
await sthis.start();
const dbOpts: LedgerOpts = {
+ writeQueue: defaultWriteQueueOpts({}),
keyBag: defaultKeyBagOpts(sthis),
storeUrls: toStoreURIRuntime(sthis, "test-loader"),
storeEnDe: bs.ensureStoreEnDeFile({}),
@@ -455,6 +463,7 @@ describe("Loader with many committed transactions", function () {
beforeEach(async function () {
await sthis.start();
const dbOpts: LedgerOpts = {
+ writeQueue: defaultWriteQueueOpts({}),
keyBag: defaultKeyBagOpts(sthis),
storeUrls: toStoreURIRuntime(sthis, "test-loader-many"),
storeEnDe: bs.ensureStoreEnDeFile({}),
diff --git a/tests/fireproof/fireproof.test.ts b/tests/fireproof/fireproof.test.ts
index da092620..15e68f13 100644
--- a/tests/fireproof/fireproof.test.ts
+++ b/tests/fireproof/fireproof.test.ts
@@ -139,6 +139,15 @@ describe("basic ledger", function () {
const got = await db.get(ok.id);
expect(got.foo).toBe("bam");
});
+ it("can bulk an array", async function () {
+ const ok = await db.bulk([{ foo: "cool" }, { foo: "dude" }]);
+ expect(ok).toBeTruthy();
+ expect(ok.ids.length).toBe(2);
+ const got = await db.get(ok.ids[0]);
+ expect(got.foo).toBe("cool");
+ const got2 = await db.get(ok.ids[1]);
+ expect(got2.foo).toBe("dude");
+ });
it("can define an index", async function () {
const ok = await db.put({ _id: "test", foo: "bar" });
expect(ok).toBeTruthy();
diff --git a/tests/fireproof/indexer.test.ts b/tests/fireproof/indexer.test.ts
index aa139487..e1084b7e 100644
--- a/tests/fireproof/indexer.test.ts
+++ b/tests/fireproof/indexer.test.ts
@@ -1,4 +1,16 @@
-import { Index, index, Ledger, CRDT, IndexRows, LedgerOpts, toStoreURIRuntime, bs, rt, LedgerFactory } from "@fireproof/core";
+import {
+ Index,
+ index,
+ Ledger,
+ CRDT,
+ IndexRows,
+ LedgerOpts,
+ toStoreURIRuntime,
+ bs,
+ rt,
+ LedgerFactory,
+ defaultWriteQueueOpts,
+} from "@fireproof/core";
import { mockSuperThis } from "../helpers.js";
interface TestType {
@@ -278,6 +290,7 @@ describe("basic Index upon cold start", function () {
const logger = sthis.logger.With().Module("IndexerTest").Logger();
logger.Debug().Msg("enter beforeEach");
dbOpts = {
+ writeQueue: defaultWriteQueueOpts({}),
keyBag: rt.kb.defaultKeyBagOpts(sthis),
storeUrls: toStoreURIRuntime(sthis, "test-indexer-cold"),
storeEnDe: bs.ensureStoreEnDeFile({}),
diff --git a/tests/fireproof/ledger.test.ts b/tests/fireproof/ledger.test.ts
index 55847660..f8b60fd5 100644
--- a/tests/fireproof/ledger.test.ts
+++ b/tests/fireproof/ledger.test.ts
@@ -233,7 +233,7 @@ describe("named Ledger with record", function () {
// await Promise.all(writes)
// })
-describe("basic Ledger parallel writes / public", function () {
+describe("basic Ledger parallel writes / public ordered", () => {
let db: Ledger;
const writes: Promise<DocResponse>[] = [];
const sthis = mockSuperThis();
@@ -241,20 +241,56 @@ describe("basic Ledger parallel writes / public", function () {
await db.close();
await db.destroy();
});
- beforeEach(async function () {
+ beforeEach(async () => {
await sthis.start();
- db = LedgerFactory("test-parallel-writes", { public: true });
+ db = LedgerFactory("test-parallel-writes-ordered", { writeQueue: { chunkSize: 1 } });
for (let i = 0; i < 10; i++) {
const doc = { _id: `id-${i}`, hello: "world" };
writes.push(db.put(doc));
}
await Promise.all(writes);
});
- it("should have one head", function () {
+
+ it("should have one head", () => {
const crdt = db.crdt;
expect(crdt.clock.head.length).toBe(1);
});
- it("should write all", async function () {
+
+ it("has changes ordered", async function () {
+ const { rows, clock } = await db.changes([]);
+ expect(clock[0]).toBe(db.crdt.clock.head[0]);
+ expect(rows.length).toBe(10);
+ for (let i = 0; i < 10; i++) {
+ expect(rows[i].key).toBe("id-" + i);
+ expect(rows[i].clock).toBeTruthy();
+ }
+ });
+});
+
+describe("basic Ledger parallel writes / public", () => {
+ let db: Ledger;
+  const writes: Promise<DocResponse>[] = [];
+ const sthis = mockSuperThis();
+ afterEach(async () => {
+ await db.close();
+ await db.destroy();
+ });
+ beforeEach(async () => {
+ await sthis.start();
+ db = LedgerFactory("test-parallel-writes", { writeQueue: { chunkSize: 32 } });
+ for (let i = 0; i < 10; i++) {
+ const doc = { _id: `id-${i}`, hello: "world" };
+ writes.push(db.put(doc));
+ }
+ await Promise.all(writes);
+ });
+ it("should resolve to one head", async () => {
+ const crdt = db.crdt;
+ expect(crdt.clock.head.length).toBe(9);
+ await db.put({ _id: "id-10", hello: "world" });
+ expect(crdt.clock.head.length).toBe(1);
+ });
+ it("should write all", async () => {
for (let i = 0; i < 10; i++) {
const id = `id-${i}`;
const doc = await db.get<{ hello: string }>(id);
@@ -286,11 +322,12 @@ describe("basic Ledger parallel writes / public", function () {
expect(e.message).toMatch(/Not found/);
}
});
- it("has changes", async function () {
+ it("has changes not ordered", async function () {
const { rows, clock } = await db.changes([]);
expect(clock[0]).toBe(db.crdt.clock.head[0]);
expect(rows.length).toBe(10);
- // rows.sort((a, b) => a.key.localeCompare(b.key));
+ rows.sort((a, b) => a.key.localeCompare(b.key));
+ // console.log(rows);
for (let i = 0; i < 10; i++) {
expect(rows[i].key).toBe("id-" + i);
expect(rows[i].clock).toBeTruthy();