Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

write multiple documents in a single operation #467

Merged
merged 8 commits into from
Dec 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ export * from "./crdt.js";

export * from "./indexer.js";

export { defaultWriteQueueOpts } from "./write-queue.js";

export * as bs from "./blockstore/index.js";
export * as blockstore from "./blockstore/index.js";

Expand Down
38 changes: 29 additions & 9 deletions src/ledger.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { BuildURI, CoerceURI, KeyedResolvOnce, Logger, ResolveOnce, URI } from "@adviser/cement";

import { WriteQueue, writeQueue } from "./write-queue.js";
import { defaultWriteQueueOpts, WriteQueue, writeQueue, WriteQueueParams } from "./write-queue.js";
import { CRDT, HasCRDT } from "./crdt.js";
import { index } from "./indexer.js";
import {
Expand All @@ -15,6 +15,7 @@ import {
type IndexKeyType,
type ListenerFn,
type DocResponse,
type BulkResponse,
type ChangesResponse,
type DocTypes,
type IndexRows,
Expand Down Expand Up @@ -48,6 +49,8 @@ export interface LedgerOpts {
// readonly public?: boolean;
readonly meta?: DbMeta;
readonly gatewayInterceptor?: GatewayInterceptor;

readonly writeQueue: WriteQueueParams;
// readonly factoryUnreg?: () => void;
// readonly persistIndexes?: boolean;
// readonly autoCompact?: number;
Expand All @@ -73,6 +76,7 @@ export interface Ledger<DT extends DocTypes = NonNullable<unknown>> extends HasC

get<T extends DocTypes>(id: string): Promise<DocWithId<T>>;
put<T extends DocTypes>(doc: DocSet<T>): Promise<DocResponse>;
bulk<T extends DocTypes>(docs: DocSet<T>[]): Promise<BulkResponse>;
del(id: string): Promise<DocResponse>;
changes<T extends DocTypes>(since?: ClockHead, opts?: ChangesOptions): Promise<ChangesResponse<T>>;
allDocs<T extends DocTypes>(opts?: AllDocsQueryOpts): Promise<AllDocsResponse<T>>;
Expand Down Expand Up @@ -106,6 +110,7 @@ export function LedgerFactory<T extends DocTypes = NonNullable<unknown>>(name: s
keyBag: defaultKeyBagOpts(sthis, opts?.keyBag),
storeUrls: toStoreURIRuntime(sthis, name, opts?.storeUrls),
gatewayInterceptor: opts?.gatewayInterceptor,
writeQueue: defaultWriteQueueOpts(opts?.writeQueue),
storeEnDe: {
encodeFile,
decodeFile,
Expand Down Expand Up @@ -161,6 +166,9 @@ export class LedgerShell<DT extends DocTypes = NonNullable<unknown>> implements
put<T extends DocTypes>(doc: DocSet<T>): Promise<DocResponse> {
return this.ref.put(doc);
}
bulk<T extends DocTypes>(docs: DocSet<T>[]): Promise<BulkResponse> {
return this.ref.bulk(docs);
}
del(id: string): Promise<DocResponse> {
return this.ref.del(id);
}
Expand Down Expand Up @@ -225,6 +233,7 @@ class LedgerImpl<DT extends DocTypes = NonNullable<unknown>> implements Ledger<D
if (this.shells.size === 0) {
await this.ready();
await this.crdt.close();
await this._writeQueue.close();
this._onClosedFns.forEach((fn) => fn());
}
// await this.blockstore.close();
Expand Down Expand Up @@ -256,15 +265,9 @@ class LedgerImpl<DT extends DocTypes = NonNullable<unknown>> implements Ledger<D
this.sthis = sthis;
this.id = sthis.timeOrderedNextId().str;
this.logger = ensureLogger(this.sthis, "Ledger");
// this.logger.SetDebug("Ledger")
this.crdt = new CRDT(this.sthis, this.opts);
// this.blockstore = this._crdt.blockstore; // for connector compatibility
this._writeQueue = writeQueue(async (updates: DocUpdate<DT>[]) => {
return await this.crdt.bulk(updates);
}); //, Infinity)
this.crdt.clock.onTock(() => {
this._no_update_notify();
});
this._writeQueue = writeQueue(this.sthis, async (updates: DocUpdate<DT>[]) => this.crdt.bulk(updates), this.opts.writeQueue);
this.crdt.clock.onTock(() => this._no_update_notify());
}

get name(): string {
Expand Down Expand Up @@ -299,6 +302,23 @@ class LedgerImpl<DT extends DocTypes = NonNullable<unknown>> implements Ledger<D
return { id: docId, clock: result?.head, name: this.name } as DocResponse;
}

async bulk<T extends DocTypes>(docs: DocSet<T>[]): Promise<BulkResponse> {
await this.ready();

const updates = docs.map((doc) => {
const id = doc._id || this.sthis.timeOrderedNextId().str;
return {
id,
value: {
...(doc as unknown as DocSet<DT>),
_id: id,
},
};
});
const result = (await this._writeQueue.bulk(updates)) as CRDTMeta;
return { ids: updates.map((u) => u.id), clock: result.head, name: this.name } as BulkResponse;
}

async del(id: string): Promise<DocResponse> {
await this.ready();
this.logger.Debug().Str("id", id).Msg("del");
Expand Down
8 changes: 8 additions & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { EnvFactoryOpts, Env, Logger, CryptoRuntime, Result } from "@adviser/cem

// import type { MakeDirectoryOptions, PathLike, Stats } from "fs";
import { KeyBagOpts } from "./runtime/key-bag.js";
import { WriteQueueParams } from "./write-queue.js";

export type { DbMeta };

Expand Down Expand Up @@ -118,6 +119,7 @@ export interface ConfigOpts extends Partial<SuperThisOpts> {
readonly public?: boolean;
readonly meta?: DbMeta;
// readonly persistIndexes?: boolean;
readonly writeQueue?: Partial<WriteQueueParams>;
readonly gatewayInterceptor?: GatewayInterceptor;
readonly autoCompact?: number;
readonly storeUrls?: StoreUrlsOpts;
Expand Down Expand Up @@ -301,6 +303,12 @@ export interface DocResponse {
readonly name?: string;
}

export interface BulkResponse {
readonly ids: string[];
readonly clock: ClockHead;
readonly name?: string;
}

export type UpdateListenerFn<T extends DocTypes> = (docs: DocWithId<T>[]) => Promise<void> | void;
export type NoUpdateListenerFn = () => Promise<void> | void;
export type ListenerFn<T extends DocTypes> = UpdateListenerFn<T> | NoUpdateListenerFn;
108 changes: 74 additions & 34 deletions src/write-queue.ts
Original file line number Diff line number Diff line change
@@ -1,60 +1,100 @@
import { DocTypes, MetaType, DocUpdate } from "./types.js";
import { ensureLogger } from "./utils.js";
import { DocTypes, MetaType, DocUpdate, SuperThis } from "./types.js";
import { Future, Logger } from "@adviser/cement";

type WorkerFunction<T extends DocTypes> = (tasks: DocUpdate<T>[]) => Promise<MetaType>;

export interface WriteQueue<T extends DocTypes> {
push(task: DocUpdate<T>): Promise<MetaType>;
bulk(tasks: DocUpdate<T>[]): Promise<MetaType>;
close(): Promise<void>;
}

interface WriteQueueItem<T extends DocTypes> {
readonly task: DocUpdate<T>;
// readonly task?: DocUpdate<T>;
readonly tasks?: DocUpdate<T>[];
resolve(result: MetaType): void;
reject(error: Error): void;
}

export function writeQueue<T extends DocTypes>(worker: WorkerFunction<T>, payload = Infinity, unbounded = false): WriteQueue<T> {
const queue: WriteQueueItem<T>[] = [];
let isProcessing = false;
export interface WriteQueueParams {
// default 32
// if chunkSize is 1 the result will be ordered in time
readonly chunkSize: number;
}

export function defaultWriteQueueOpts(opts: Partial<WriteQueueParams> = {}): WriteQueueParams {
return {
...opts,
chunkSize: opts.chunkSize && opts.chunkSize > 0 ? opts.chunkSize : 32,
};
}

async function process() {
if (isProcessing || queue.length === 0) return;
isProcessing = true;
class WriteQueueImpl<T extends DocTypes> implements WriteQueue<T> {
private readonly opts: WriteQueueParams;

const tasksToProcess = queue.splice(0, payload);
const updates = tasksToProcess.map((item) => item.task);
private readonly queue: WriteQueueItem<T>[] = [];
private readonly worker: WorkerFunction<T>;
private isProcessing = false;
private readonly logger: Logger;

constructor(sthis: SuperThis, worker: WorkerFunction<T>, opts: WriteQueueParams) {
this.logger = ensureLogger(sthis, "WriteQueueImpl");
this.worker = worker;
this.opts = defaultWriteQueueOpts(opts);
}

private waitForEmptyQueue?: Future<void>;
private testEmptyQueue() {
if (this.waitForEmptyQueue && this.queue.length === 0) {
this.waitForEmptyQueue.resolve();
}
}

if (unbounded) {
// Run all updates in parallel and resolve/reject them individually
private async process() {
if (this.isProcessing || this.queue.length === 0) {
this.testEmptyQueue();
return;
}
this.isProcessing = true;
try {
this.logger.Debug().Any("opts", this.opts).Len(this.queue).Msg("Processing tasks");
const tasksToProcess = this.queue.splice(0, this.opts.chunkSize);
const updates = tasksToProcess.map((item) => item.tasks).filter((item) => item) as DocUpdate<T>[][];
const promises = updates.map(async (update, index) => {
try {
const result = await worker([update]);
const result = await this.worker(update);
tasksToProcess[index].resolve(result);
} catch (error) {
tasksToProcess[index].reject(error as Error);
tasksToProcess[index].reject(this.logger.Error().Err(error).Msg("Error processing task").AsError());
}
});

await Promise.all(promises);
} else {
// Original logic: Run updates in a batch and resolve/reject them together
try {
const result = await worker(updates);
tasksToProcess.forEach((task) => task.resolve(result));
} catch (error) {
tasksToProcess.forEach((task) => task.reject(error as Error));
}
await Promise.allSettled(promises);
this.logger.Debug().Any("opts", this.opts).Len(this.queue).Msg("Processed tasks");
} catch (error) {
this.logger.Error().Err(error).Msg("Error processing tasks");
} finally {
this.isProcessing = false;
setTimeout(() => this.process(), 0);
}
}

isProcessing = false;
void process();
bulk(tasks: DocUpdate<T>[]): Promise<MetaType> {
return new Promise<MetaType>((resolve, reject) => {
this.queue.push({ tasks, resolve, reject });
this.process();
});
}
push(task: DocUpdate<T>): Promise<MetaType> {
return this.bulk([task]);
}
close(): Promise<void> {
this.waitForEmptyQueue = new Future();
this.testEmptyQueue();
return this.waitForEmptyQueue.asPromise();
}
}

return {
push(task: DocUpdate<T>): Promise<MetaType> {
return new Promise<MetaType>((resolve, reject) => {
queue.push({ task, resolve, reject });
void process();
});
},
};
export function writeQueue<T extends DocTypes>(sthis: SuperThis, worker: WorkerFunction<T>, opts: WriteQueueParams): WriteQueue<T> {
return new WriteQueueImpl<T>(sthis, worker, opts);
}
11 changes: 10 additions & 1 deletion tests/fireproof/crdt.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { CRDT, LedgerOpts, toStoreURIRuntime } from "@fireproof/core";
import { CRDT, defaultWriteQueueOpts, LedgerOpts, toStoreURIRuntime } from "@fireproof/core";
import { bs } from "@fireproof/core";
import { CRDTMeta, DocValue } from "@fireproof/core";
import { Index, index } from "@fireproof/core";
Expand All @@ -15,6 +15,7 @@ describe("Fresh crdt", function () {
beforeEach(async function () {
await sthis.start();
const dbOpts: LedgerOpts = {
writeQueue: defaultWriteQueueOpts({}),
keyBag: defaultKeyBagOpts(sthis),
storeUrls: toStoreURIRuntime(sthis, "test-crdt-cold"),
storeEnDe: bs.ensureStoreEnDeFile({}),
Expand Down Expand Up @@ -57,6 +58,7 @@ describe("CRDT with one record", function () {
beforeEach(async function () {
await sthis.start();
const dbOpts: LedgerOpts = {
writeQueue: defaultWriteQueueOpts({}),
keyBag: defaultKeyBagOpts(sthis),
storeUrls: toStoreURIRuntime(sthis, `test@${sthis.nextId()}`),
storeEnDe: bs.ensureStoreEnDeFile({}),
Expand Down Expand Up @@ -112,6 +114,7 @@ describe("CRDT with a multi-write", function () {
beforeEach(async function () {
await sthis.start();
const dbOpts: LedgerOpts = {
writeQueue: defaultWriteQueueOpts({}),
keyBag: defaultKeyBagOpts(sthis),
storeUrls: toStoreURIRuntime(sthis, "test-crdt-cold"),
storeEnDe: bs.ensureStoreEnDeFile({}),
Expand Down Expand Up @@ -182,6 +185,7 @@ describe("CRDT with two multi-writes", function () {
beforeEach(async () => {
await sthis.start();
const dbOpts: LedgerOpts = {
writeQueue: defaultWriteQueueOpts({}),
keyBag: defaultKeyBagOpts(sthis),
storeUrls: toStoreURIRuntime(sthis, `test-multiple-writes@${sthis.nextId()}`),
storeEnDe: bs.ensureStoreEnDeFile({}),
Expand Down Expand Up @@ -235,6 +239,7 @@ describe("Compact a named CRDT with writes", function () {
beforeEach(async function () {
await sthis.start();
const dbOpts: LedgerOpts = {
writeQueue: defaultWriteQueueOpts({}),
keyBag: defaultKeyBagOpts(sthis),
storeUrls: toStoreURIRuntime(sthis, `named-crdt-compaction`),
storeEnDe: bs.ensureStoreEnDeFile({}),
Expand Down Expand Up @@ -296,6 +301,7 @@ describe("CRDT with an index", function () {
beforeEach(async function () {
await sthis.start();
const dbOpts: LedgerOpts = {
writeQueue: defaultWriteQueueOpts({}),
keyBag: defaultKeyBagOpts(sthis),
storeUrls: toStoreURIRuntime(sthis, "test-crdt-cold"),
storeEnDe: bs.ensureStoreEnDeFile({}),
Expand Down Expand Up @@ -347,6 +353,7 @@ describe("Loader with a committed transaction", function () {
beforeEach(async function () {
await sthis.start();
const dbOpts: LedgerOpts = {
writeQueue: defaultWriteQueueOpts({}),
keyBag: defaultKeyBagOpts(sthis),
storeUrls: toStoreURIRuntime(sthis, dbname),
storeEnDe: bs.ensureStoreEnDeFile({}),
Expand Down Expand Up @@ -400,6 +407,7 @@ describe("Loader with two committed transactions", function () {
beforeEach(async function () {
await sthis.start();
const dbOpts: LedgerOpts = {
writeQueue: defaultWriteQueueOpts({}),
keyBag: defaultKeyBagOpts(sthis),
storeUrls: toStoreURIRuntime(sthis, "test-loader"),
storeEnDe: bs.ensureStoreEnDeFile({}),
Expand Down Expand Up @@ -455,6 +463,7 @@ describe("Loader with many committed transactions", function () {
beforeEach(async function () {
await sthis.start();
const dbOpts: LedgerOpts = {
writeQueue: defaultWriteQueueOpts({}),
keyBag: defaultKeyBagOpts(sthis),
storeUrls: toStoreURIRuntime(sthis, "test-loader-many"),
storeEnDe: bs.ensureStoreEnDeFile({}),
Expand Down
9 changes: 9 additions & 0 deletions tests/fireproof/fireproof.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,15 @@ describe("basic ledger", function () {
const got = await db.get<Doc>(ok.id);
expect(got.foo).toBe("bam");
});
it("can bulk an array", async function () {
const ok = await db.bulk([{ foo: "cool" }, { foo: "dude" }]);
expect(ok).toBeTruthy();
expect(ok.ids.length).toBe(2);
const got = await db.get<Doc>(ok.ids[0]);
expect(got.foo).toBe("cool");
const got2 = await db.get<Doc>(ok.ids[1]);
expect(got2.foo).toBe("dude");
});
it("can define an index", async function () {
const ok = await db.put({ _id: "test", foo: "bar" });
expect(ok).toBeTruthy();
Expand Down
15 changes: 14 additions & 1 deletion tests/fireproof/indexer.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,16 @@
import { Index, index, Ledger, CRDT, IndexRows, LedgerOpts, toStoreURIRuntime, bs, rt, LedgerFactory } from "@fireproof/core";
import {
Index,
index,
Ledger,
CRDT,
IndexRows,
LedgerOpts,
toStoreURIRuntime,
bs,
rt,
LedgerFactory,
defaultWriteQueueOpts,
} from "@fireproof/core";
import { mockSuperThis } from "../helpers.js";

interface TestType {
Expand Down Expand Up @@ -278,6 +290,7 @@ describe("basic Index upon cold start", function () {
const logger = sthis.logger.With().Module("IndexerTest").Logger();
logger.Debug().Msg("enter beforeEach");
dbOpts = {
writeQueue: defaultWriteQueueOpts({}),
keyBag: rt.kb.defaultKeyBagOpts(sthis),
storeUrls: toStoreURIRuntime(sthis, "test-indexer-cold"),
storeEnDe: bs.ensureStoreEnDeFile({}),
Expand Down
Loading
Loading