diff --git a/src/index.ts b/src/index.ts index 7e591a10..cdec6bb9 100644 --- a/src/index.ts +++ b/src/index.ts @@ -5,6 +5,8 @@ export * from "./crdt.js"; export * from "./indexer.js"; +export { defaultWriteQueueOpts } from "./write-queue.js"; + export * as bs from "./blockstore/index.js"; export * as blockstore from "./blockstore/index.js"; diff --git a/src/ledger.ts b/src/ledger.ts index d8da28f3..d1a6432b 100644 --- a/src/ledger.ts +++ b/src/ledger.ts @@ -1,6 +1,6 @@ import { BuildURI, CoerceURI, KeyedResolvOnce, Logger, ResolveOnce, URI } from "@adviser/cement"; -import { WriteQueue, writeQueue } from "./write-queue.js"; +import { defaultWriteQueueOpts, WriteQueue, writeQueue, WriteQueueParams } from "./write-queue.js"; import { CRDT, HasCRDT } from "./crdt.js"; import { index } from "./indexer.js"; import { @@ -15,6 +15,7 @@ import { type IndexKeyType, type ListenerFn, type DocResponse, + type BulkResponse, type ChangesResponse, type DocTypes, type IndexRows, @@ -48,6 +49,8 @@ export interface LedgerOpts { // readonly public?: boolean; readonly meta?: DbMeta; readonly gatewayInterceptor?: GatewayInterceptor; + + readonly writeQueue: WriteQueueParams; // readonly factoryUnreg?: () => void; // readonly persistIndexes?: boolean; // readonly autoCompact?: number; @@ -73,6 +76,7 @@ export interface Ledger
> extends HasC get(id: string): Promise>; put(doc: DocSet): Promise; + bulk(docs: DocSet[]): Promise; del(id: string): Promise; changes(since?: ClockHead, opts?: ChangesOptions): Promise>; allDocs(opts?: AllDocsQueryOpts): Promise>; @@ -106,6 +110,7 @@ export function LedgerFactory>(name: s keyBag: defaultKeyBagOpts(sthis, opts?.keyBag), storeUrls: toStoreURIRuntime(sthis, name, opts?.storeUrls), gatewayInterceptor: opts?.gatewayInterceptor, + writeQueue: defaultWriteQueueOpts(opts?.writeQueue), storeEnDe: { encodeFile, decodeFile, @@ -161,6 +166,9 @@ export class LedgerShell
> implements put(doc: DocSet): Promise { return this.ref.put(doc); } + bulk(docs: DocSet[]): Promise { + return this.ref.bulk(docs); + } del(id: string): Promise { return this.ref.del(id); } @@ -225,6 +233,7 @@ class LedgerImpl
> implements Ledger fn()); } // await this.blockstore.close(); @@ -256,15 +265,9 @@ class LedgerImpl
> implements Ledger[]) => { - return await this.crdt.bulk(updates); - }); //, Infinity) - this.crdt.clock.onTock(() => { - this._no_update_notify(); - }); + this._writeQueue = writeQueue(this.sthis, async (updates: DocUpdate
[]) => this.crdt.bulk(updates), this.opts.writeQueue); + this.crdt.clock.onTock(() => this._no_update_notify()); } get name(): string { @@ -299,6 +302,23 @@ class LedgerImpl
> implements Ledger(docs: DocSet[]): Promise { + await this.ready(); + + const updates = docs.map((doc) => { + const id = doc._id || this.sthis.timeOrderedNextId().str; + return { + id, + value: { + ...(doc as unknown as DocSet
), + _id: id, + }, + }; + }); + const result = (await this._writeQueue.bulk(updates)) as CRDTMeta; + return { ids: updates.map((u) => u.id), clock: result.head, name: this.name } as BulkResponse; + } + async del(id: string): Promise { await this.ready(); this.logger.Debug().Str("id", id).Msg("del"); diff --git a/src/types.ts b/src/types.ts index d484eb1b..ce40c3df 100644 --- a/src/types.ts +++ b/src/types.ts @@ -6,6 +6,7 @@ import { EnvFactoryOpts, Env, Logger, CryptoRuntime, Result } from "@adviser/cem // import type { MakeDirectoryOptions, PathLike, Stats } from "fs"; import { KeyBagOpts } from "./runtime/key-bag.js"; +import { WriteQueueParams } from "./write-queue.js"; export type { DbMeta }; @@ -118,6 +119,7 @@ export interface ConfigOpts extends Partial { readonly public?: boolean; readonly meta?: DbMeta; // readonly persistIndexes?: boolean; + readonly writeQueue?: Partial; readonly gatewayInterceptor?: GatewayInterceptor; readonly autoCompact?: number; readonly storeUrls?: StoreUrlsOpts; @@ -301,6 +303,12 @@ export interface DocResponse { readonly name?: string; } +export interface BulkResponse { + readonly ids: string[]; + readonly clock: ClockHead; + readonly name?: string; +} + export type UpdateListenerFn = (docs: DocWithId[]) => Promise | void; export type NoUpdateListenerFn = () => Promise | void; export type ListenerFn = UpdateListenerFn | NoUpdateListenerFn; diff --git a/src/write-queue.ts b/src/write-queue.ts index a2430603..cfd68126 100644 --- a/src/write-queue.ts +++ b/src/write-queue.ts @@ -1,60 +1,100 @@ -import { DocTypes, MetaType, DocUpdate } from "./types.js"; +import { ensureLogger } from "./utils.js"; +import { DocTypes, MetaType, DocUpdate, SuperThis } from "./types.js"; +import { Future, Logger } from "@adviser/cement"; type WorkerFunction = (tasks: DocUpdate[]) => Promise; export interface WriteQueue { push(task: DocUpdate): Promise; + bulk(tasks: DocUpdate[]): Promise; + close(): Promise; } interface WriteQueueItem { - readonly task: DocUpdate; + // readonly task?: DocUpdate; + readonly tasks?: DocUpdate[]; resolve(result: MetaType): void; reject(error: Error): void; } -export function writeQueue(worker: WorkerFunction, payload = Infinity, unbounded = false): WriteQueue { - const queue: WriteQueueItem[] = []; - let isProcessing = false; +export interface WriteQueueParams { + // default 32 + // if chunkSize is 1 the result will be ordered in time + readonly chunkSize: number; +} + +export function defaultWriteQueueOpts(opts: Partial = {}): WriteQueueParams { + return { + ...opts, + chunkSize: opts.chunkSize && opts.chunkSize > 0 ? opts.chunkSize : 32, + }; +} - async function process() { - if (isProcessing || queue.length === 0) return; - isProcessing = true; +class WriteQueueImpl implements WriteQueue { + private readonly opts: WriteQueueParams; - const tasksToProcess = queue.splice(0, payload); - const updates = tasksToProcess.map((item) => item.task); + private readonly queue: WriteQueueItem[] = []; + private readonly worker: WorkerFunction; + private isProcessing = false; + private readonly logger: Logger; + + constructor(sthis: SuperThis, worker: WorkerFunction, opts: WriteQueueParams) { + this.logger = ensureLogger(sthis, "WriteQueueImpl"); + this.worker = worker; + this.opts = defaultWriteQueueOpts(opts); + } + + private waitForEmptyQueue?: Future; + private testEmptyQueue() { + if (this.waitForEmptyQueue && this.queue.length === 0) { + this.waitForEmptyQueue.resolve(); + } + } - if (unbounded) { - // Run all updates in parallel and resolve/reject them individually + private async process() { + if (this.isProcessing || this.queue.length === 0) { + this.testEmptyQueue(); + return; + } + this.isProcessing = true; + try { + this.logger.Debug().Any("opts", this.opts).Len(this.queue).Msg("Processing tasks"); + const tasksToProcess = this.queue.splice(0, this.opts.chunkSize); + const updates = tasksToProcess.map((item) => item.tasks).filter((item) => item) as DocUpdate[][]; const promises = updates.map(async (update, index) => { try { - const result = await worker([update]); + const result = await this.worker(update); tasksToProcess[index].resolve(result); } catch (error) { - tasksToProcess[index].reject(error as Error); + tasksToProcess[index].reject(this.logger.Error().Err(error).Msg("Error processing task").AsError()); } }); - - await Promise.all(promises); - } else { - // Original logic: Run updates in a batch and resolve/reject them together - try { - const result = await worker(updates); - tasksToProcess.forEach((task) => task.resolve(result)); - } catch (error) { - tasksToProcess.forEach((task) => task.reject(error as Error)); - } + await Promise.allSettled(promises); + this.logger.Debug().Any("opts", this.opts).Len(this.queue).Msg("Processed tasks"); + } catch (error) { + this.logger.Error().Err(error).Msg("Error processing tasks"); + } finally { + this.isProcessing = false; + setTimeout(() => this.process(), 0); } + } - isProcessing = false; - void process(); + bulk(tasks: DocUpdate[]): Promise { + return new Promise((resolve, reject) => { + this.queue.push({ tasks, resolve, reject }); + this.process(); + }); + } + push(task: DocUpdate): Promise { + return this.bulk([task]); + } + close(): Promise { + this.waitForEmptyQueue = new Future(); + this.testEmptyQueue(); + return this.waitForEmptyQueue.asPromise(); } +} - return { - push(task: DocUpdate): Promise { - return new Promise((resolve, reject) => { - queue.push({ task, resolve, reject }); - void process(); - }); - }, - }; +export function writeQueue(sthis: SuperThis, worker: WorkerFunction, opts: WriteQueueParams): WriteQueue { + return new WriteQueueImpl(sthis, worker, opts); } diff --git a/tests/fireproof/crdt.test.ts b/tests/fireproof/crdt.test.ts index 75ed9e33..0c1153b5 100644 --- a/tests/fireproof/crdt.test.ts +++ b/tests/fireproof/crdt.test.ts @@ -1,4 +1,4 @@ -import { CRDT, LedgerOpts, toStoreURIRuntime } from "@fireproof/core"; +import { CRDT, defaultWriteQueueOpts, LedgerOpts, toStoreURIRuntime } from "@fireproof/core"; import { bs } from "@fireproof/core"; import { CRDTMeta, DocValue } from "@fireproof/core"; import { Index, index } from "@fireproof/core"; @@ -15,6 +15,7 @@ describe("Fresh crdt", function () { beforeEach(async function () { await sthis.start(); const dbOpts: LedgerOpts = { + writeQueue: defaultWriteQueueOpts({}), keyBag: defaultKeyBagOpts(sthis), storeUrls: toStoreURIRuntime(sthis, "test-crdt-cold"), storeEnDe: bs.ensureStoreEnDeFile({}), @@ -57,6 +58,7 @@ describe("CRDT with one record", function () { beforeEach(async function () { await sthis.start(); const dbOpts: LedgerOpts = { + writeQueue: defaultWriteQueueOpts({}), keyBag: defaultKeyBagOpts(sthis), storeUrls: toStoreURIRuntime(sthis, `test@${sthis.nextId()}`), storeEnDe: bs.ensureStoreEnDeFile({}), @@ -112,6 +114,7 @@ describe("CRDT with a multi-write", function () { beforeEach(async function () { await sthis.start(); const dbOpts: LedgerOpts = { + writeQueue: defaultWriteQueueOpts({}), keyBag: defaultKeyBagOpts(sthis), storeUrls: toStoreURIRuntime(sthis, "test-crdt-cold"), storeEnDe: bs.ensureStoreEnDeFile({}), @@ -182,6 +185,7 @@ describe("CRDT with two multi-writes", function () { beforeEach(async () => { await sthis.start(); const dbOpts: LedgerOpts = { + writeQueue: defaultWriteQueueOpts({}), keyBag: defaultKeyBagOpts(sthis), storeUrls: toStoreURIRuntime(sthis, `test-multiple-writes@${sthis.nextId()}`), storeEnDe: bs.ensureStoreEnDeFile({}), @@ -235,6 +239,7 @@ describe("Compact a named CRDT with writes", function () { beforeEach(async function () { await sthis.start(); const dbOpts: LedgerOpts = { + writeQueue: defaultWriteQueueOpts({}), keyBag: defaultKeyBagOpts(sthis), storeUrls: toStoreURIRuntime(sthis, `named-crdt-compaction`), storeEnDe: bs.ensureStoreEnDeFile({}), @@ -296,6 +301,7 @@ describe("CRDT with an index", function () { beforeEach(async function () { await sthis.start(); const dbOpts: LedgerOpts = { + writeQueue: defaultWriteQueueOpts({}), keyBag: defaultKeyBagOpts(sthis), storeUrls: toStoreURIRuntime(sthis, "test-crdt-cold"), storeEnDe: bs.ensureStoreEnDeFile({}), @@ -347,6 +353,7 @@ describe("Loader with a committed transaction", function () { beforeEach(async function () { await sthis.start(); const dbOpts: LedgerOpts = { + writeQueue: defaultWriteQueueOpts({}), keyBag: defaultKeyBagOpts(sthis), storeUrls: toStoreURIRuntime(sthis, dbname), storeEnDe: bs.ensureStoreEnDeFile({}), @@ -400,6 +407,7 @@ describe("Loader with two committed transactions", function () { beforeEach(async function () { await sthis.start(); const dbOpts: LedgerOpts = { + writeQueue: defaultWriteQueueOpts({}), keyBag: defaultKeyBagOpts(sthis), storeUrls: toStoreURIRuntime(sthis, "test-loader"), storeEnDe: bs.ensureStoreEnDeFile({}), @@ -455,6 +463,7 @@ describe("Loader with many committed transactions", function () { beforeEach(async function () { await sthis.start(); const dbOpts: LedgerOpts = { + writeQueue: defaultWriteQueueOpts({}), keyBag: defaultKeyBagOpts(sthis), storeUrls: toStoreURIRuntime(sthis, "test-loader-many"), storeEnDe: bs.ensureStoreEnDeFile({}), diff --git a/tests/fireproof/fireproof.test.ts b/tests/fireproof/fireproof.test.ts index da092620..15e68f13 100644 --- a/tests/fireproof/fireproof.test.ts +++ b/tests/fireproof/fireproof.test.ts @@ -139,6 +139,15 @@ describe("basic ledger", function () { const got = await db.get(ok.id); expect(got.foo).toBe("bam"); }); + it("can bulk an array", async function () { + const ok = await db.bulk([{ foo: "cool" }, { foo: "dude" }]); + expect(ok).toBeTruthy(); + expect(ok.ids.length).toBe(2); + const got = await db.get(ok.ids[0]); + expect(got.foo).toBe("cool"); + const got2 = await db.get(ok.ids[1]); + expect(got2.foo).toBe("dude"); + }); it("can define an index", async function () { const ok = await db.put({ _id: "test", foo: "bar" }); expect(ok).toBeTruthy(); diff --git a/tests/fireproof/indexer.test.ts b/tests/fireproof/indexer.test.ts index aa139487..e1084b7e 100644 --- a/tests/fireproof/indexer.test.ts +++ b/tests/fireproof/indexer.test.ts @@ -1,4 +1,16 @@ -import { Index, index, Ledger, CRDT, IndexRows, LedgerOpts, toStoreURIRuntime, bs, rt, LedgerFactory } from "@fireproof/core"; +import { + Index, + index, + Ledger, + CRDT, + IndexRows, + LedgerOpts, + toStoreURIRuntime, + bs, + rt, + LedgerFactory, + defaultWriteQueueOpts, +} from "@fireproof/core"; import { mockSuperThis } from "../helpers.js"; interface TestType { @@ -278,6 +290,7 @@ describe("basic Index upon cold start", function () { const logger = sthis.logger.With().Module("IndexerTest").Logger(); logger.Debug().Msg("enter beforeEach"); dbOpts = { + writeQueue: defaultWriteQueueOpts({}), keyBag: rt.kb.defaultKeyBagOpts(sthis), storeUrls: toStoreURIRuntime(sthis, "test-indexer-cold"), storeEnDe: bs.ensureStoreEnDeFile({}), diff --git a/tests/fireproof/ledger.test.ts b/tests/fireproof/ledger.test.ts index 55847660..f8b60fd5 100644 --- a/tests/fireproof/ledger.test.ts +++ b/tests/fireproof/ledger.test.ts @@ -233,7 +233,7 @@ describe("named Ledger with record", function () { // await Promise.all(writes) // }) -describe("basic Ledger parallel writes / public", function () { +describe("basic Ledger parallel writes / public ordered", () => { let db: Ledger; const writes: Promise[] = []; const sthis = mockSuperThis(); @@ -241,20 +241,56 @@ describe("basic Ledger parallel writes / public", function () { await db.close(); await db.destroy(); }); - beforeEach(async function () { + beforeEach(async () => { await sthis.start(); - db = LedgerFactory("test-parallel-writes", { public: true }); + db = LedgerFactory("test-parallel-writes-ordered", { writeQueue: { chunkSize: 1 } }); for (let i = 0; i < 10; i++) { const doc = { _id: `id-${i}`, hello: "world" }; writes.push(db.put(doc)); } await Promise.all(writes); }); - it("should have one head", function () { + + it("should have one head", () => { const crdt = db.crdt; expect(crdt.clock.head.length).toBe(1); }); - it("should write all", async function () { + + it("has changes ordered", async function () { + const { rows, clock } = await db.changes([]); + expect(clock[0]).toBe(db.crdt.clock.head[0]); + expect(rows.length).toBe(10); + for (let i = 0; i < 10; i++) { + expect(rows[i].key).toBe("id-" + i); + expect(rows[i].clock).toBeTruthy(); + } + }); +}); + +describe("basic Ledger parallel writes / public", () => { + let db: Ledger; + const writes: Promise[] = []; + const sthis = mockSuperThis(); + afterEach(async () => { + await db.close(); + await db.destroy(); + }); + beforeEach(async () => { + await sthis.start(); + db = LedgerFactory("test-parallel-writes", { writeQueue: { chunkSize: 32 } }); + for (let i = 0; i < 10; i++) { + const doc = { _id: `id-${i}`, hello: "world" }; + writes.push(db.put(doc)); + } + await Promise.all(writes); + }); + it("should resolve to one head", async () => { + const crdt = db.crdt; + expect(crdt.clock.head.length).toBe(9); + await db.put({ _id: "id-10", hello: "world" }); + expect(crdt.clock.head.length).toBe(1); + }); + it("should write all", async () => { for (let i = 0; i < 10; i++) { const id = `id-${i}`; const doc = await db.get<{ hello: string }>(id); @@ -286,11 +322,12 @@ describe("basic Ledger parallel writes / public", function () { expect(e.message).toMatch(/Not found/); } }); - it("has changes", async function () { + it("has changes not ordered", async function () { const { rows, clock } = await db.changes([]); expect(clock[0]).toBe(db.crdt.clock.head[0]); expect(rows.length).toBe(10); - // rows.sort((a, b) => a.key.localeCompare(b.key)); + rows.sort((a, b) => a.key.localeCompare(b.key)); + // console.log(rows); for (let i = 0; i < 10; i++) { expect(rows[i].key).toBe("id-" + i); expect(rows[i].clock).toBeTruthy();