Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rebuild the query and allDocs API #464

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/crdt-helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ export async function applyBulkUpdateToCrdt<T extends DocTypes>(
return { head: result.head } as CRDTMeta;
}

export function docUpdateToDocWithId<T extends DocTypes>({ id, del, value }: DocUpdate<T>): DocWithId<T> {
return (del ? { _id: id, _deleted: true } : { _id: id, ...value }) as DocWithId<T>;
}

// this whole thing can get pulled outside of the write queue
async function writeDocContent<T extends DocTypes>(
store: StoreRuntime,
Expand Down
116 changes: 95 additions & 21 deletions src/crdt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,27 +9,29 @@ import {
toStoreRuntime,
} from "./blockstore/index.js";
import {
clockChangesSince,
applyBulkUpdateToCrdt,
getValueFromCrdt,
readFiles,
getAllEntries,
clockVis,
getBlock,
doCompact,
docUpdateToDocWithId,
getAllEntries,
} from "./crdt-helpers.js";
import type {
DocUpdate,
CRDTMeta,
ClockHead,
ChangesOptions,
DocValue,
IndexKeyType,
DocWithId,
DocTypes,
Falsy,
SuperThis,
IndexTransactionMeta,
QueryResponse,
ListenerFn,
QueryStreamMarker,
} from "./types.js";
import { index, type Index } from "./indexer.js";
import { CRDTClock } from "./crdt-clock.js";
Expand All @@ -55,6 +57,11 @@ export class CRDT<T extends DocTypes> {
readonly logger: Logger;
readonly sthis: SuperThis;

// Subscriptions
_listening = false;
readonly _listeners = new Set<ListenerFn<T>>();
readonly _noupdate_listeners = new Set<ListenerFn<T>>();

constructor(sthis: SuperThis, opts: LedgerOpts) {
this.sthis = sthis;
this.logger = ensureLogger(sthis, "CRDT");
Expand Down Expand Up @@ -151,13 +158,91 @@ export class CRDT<T extends DocTypes> {

// if (snap) await this.clock.applyHead(crdtMeta.head, this.clock.head)

async allDocs(): Promise<{ result: DocUpdate<T>[]; head: ClockHead }> {
await this.ready();
const result: DocUpdate<T>[] = [];
for await (const entry of getAllEntries<T>(this.blockstore, this.clock.head, this.logger)) {
result.push(entry);
}
return { result, head: this.clock.head };
/**
* Retrieve the current set of documents.
*/
allDocs<T extends DocTypes>(opts: { waitFor?: Promise<unknown> } = {}): QueryResponse<T> {
const waitFor = opts.waitFor;

const currentDocs = (since?: ClockHead) => {
void since;

// TODO:
// const opts: ChangesOptions = {};

// TODO:
// if (since) {
// return clockChangesSince<T>(this.blockstore, this.clock.head, since || [], opts, this.logger).then((a) => a.result);
// }

const iterator = getAllEntries<T>(this.blockstore, this.clock.head, this.logger);
return iterator;
};

const snapshot = async () => {
await waitFor;
await this.ready();
// TODO: Map over async iterable
// return currentDocs().map(docUpdateToDocWithId)

// NOTE:
// return Array.fromAsync(currentDocs()).then((a) => a.map(docUpdateToDocWithId));

const docs: DocWithId<T>[] = [];
for await (const update of currentDocs()) {
docs.push(docUpdateToDocWithId(update));
}
return docs;
};

const stream = (opts: { futureOnly: boolean; since?: ClockHead }) => {
const clock = this.clock;
const ready = this.ready.bind(this);

return new ReadableStream<{ doc: DocWithId<T>; marker: QueryStreamMarker }>({
async start(controller) {
await waitFor;
await ready();

if (opts.futureOnly === false) {
const it = currentDocs(opts.since);

async function iterate(prevValue: DocUpdate<T>) {
const { done, value } = await it.next();

controller.enqueue({
doc: docUpdateToDocWithId(prevValue),
marker: { kind: "preexisting", done: done || false },
});

if (!done) await iterate(value);
}

const { value } = await it.next();
if (value) await iterate(value);
}

clock.onTick((updates: DocUpdate<NonNullable<unknown>>[]) => {
updates.forEach((update) => {
controller.enqueue({ doc: docUpdateToDocWithId(update as DocUpdate<T>), marker: { kind: "new" } });
});
});
},

// NOTE: Ideally we unsubscribe from `onTick` here.
// cancel() {}
});
};

return {
snapshot,
live(opts?: { since?: ClockHead }) {
return stream({ futureOnly: false, since: opts?.since });
},
future() {
return stream({ futureOnly: true });
},
};
}

async vis(): Promise<string> {
Expand All @@ -181,17 +266,6 @@ export class CRDT<T extends DocTypes> {
return result;
}

async changes(
since: ClockHead = [],
opts: ChangesOptions = {},
): Promise<{
result: DocUpdate<T>[];
head: ClockHead;
}> {
await this.ready();
return await clockChangesSince<T>(this.blockstore, this.clock.head, since, opts, this.logger);
}

async compact(): Promise<void> {
const blocks = this.blockstore as EncryptedBlockstore;
return await blocks.compact();
Expand Down
122 changes: 10 additions & 112 deletions src/ledger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,20 @@ import { CRDT, HasCRDT } from "./crdt.js";
import { index } from "./indexer.js";
import {
type DocUpdate,
type ClockHead,
type ConfigOpts,
type MapFn,
type QueryOpts,
type ChangesOptions,
type DocSet,
type DocWithId,
type IndexKeyType,
type ListenerFn,
type DocResponse,
type ChangesResponse,
type DocTypes,
type IndexRows,
type DocFragment,
type ChangesResponseRow,
type CRDTMeta,
type AllDocsQueryOpts,
type AllDocsResponse,
type SuperThis,
PARAM,
QueryResponse,
} from "./types.js";
import { DbMeta, StoreEnDeFile, StoreURIRuntime, StoreUrlsOpts, getDefaultURI, GatewayInterceptor } from "./blockstore/index.js";
import { ensureLogger, ensureSuperThis, NotFoundError, toSortedArray } from "./utils.js";
Expand Down Expand Up @@ -74,16 +68,8 @@ export interface Ledger<DT extends DocTypes = NonNullable<unknown>> extends HasC
get<T extends DocTypes>(id: string): Promise<DocWithId<T>>;
put<T extends DocTypes>(doc: DocSet<T>): Promise<DocResponse>;
del(id: string): Promise<DocResponse>;
changes<T extends DocTypes>(since?: ClockHead, opts?: ChangesOptions): Promise<ChangesResponse<T>>;
allDocs<T extends DocTypes>(opts?: AllDocsQueryOpts): Promise<AllDocsResponse<T>>;
allDocuments<T extends DocTypes>(): Promise<{
rows: {
key: string;
value: DocWithId<T>;
}[];
clock: ClockHead;
}>;
subscribe<T extends DocTypes>(listener: ListenerFn<T>, updates?: boolean): () => void;
allDocs<T extends DocTypes>(): QueryResponse<T>;
allDocuments<T extends DocTypes>(): QueryResponse<T>;

query<K extends IndexKeyType, T extends DocTypes, R extends DocFragment = T>(
field: string | MapFn<T>,
Expand Down Expand Up @@ -139,7 +125,6 @@ export class LedgerShell<DT extends DocTypes = NonNullable<unknown>> implements
get crdt(): CRDT<DT> {
return this.ref.crdt;
}

get name(): string {
return this.ref.name;
}
Expand All @@ -164,24 +149,12 @@ export class LedgerShell<DT extends DocTypes = NonNullable<unknown>> implements
del(id: string): Promise<DocResponse> {
return this.ref.del(id);
}
changes<T extends DocTypes>(since?: ClockHead, opts?: ChangesOptions): Promise<ChangesResponse<T>> {
return this.ref.changes(since, opts);
}
allDocs<T extends DocTypes>(opts?: AllDocsQueryOpts): Promise<AllDocsResponse<T>> {
return this.ref.allDocs(opts);
allDocs<T extends DocTypes>(): QueryResponse<T> {
return this.ref.allDocs();
}
allDocuments<T extends DocTypes>(): Promise<{
rows: {
key: string;
value: DocWithId<T>;
}[];
clock: ClockHead;
}> {
allDocuments<T extends DocTypes>(): QueryResponse<T> {
return this.ref.allDocuments();
}
subscribe<T extends DocTypes>(listener: ListenerFn<T>, updates?: boolean): () => void {
return this.ref.subscribe(listener, updates);
}
query<K extends IndexKeyType, T extends DocTypes, R extends DocFragment = T>(
field: string | MapFn<T>,
opts?: QueryOpts<K>,
Expand All @@ -197,9 +170,6 @@ class LedgerImpl<DT extends DocTypes = NonNullable<unknown>> implements Ledger<D
// readonly name: string;
readonly opts: LedgerOpts;

_listening = false;
readonly _listeners = new Set<ListenerFn<DT>>();
readonly _noupdate_listeners = new Set<ListenerFn<DT>>();
readonly crdt: CRDT<DT>;
readonly _writeQueue: WriteQueue<DT>;
// readonly blockstore: BaseBlockstore;
Expand Down Expand Up @@ -262,9 +232,6 @@ class LedgerImpl<DT extends DocTypes = NonNullable<unknown>> implements Ledger<D
this._writeQueue = writeQueue(async (updates: DocUpdate<DT>[]) => {
return await this.crdt.bulk(updates);
}); //, Infinity)
this.crdt.clock.onTock(() => {
this._no_update_notify();
});
}

get name(): string {
Expand Down Expand Up @@ -306,59 +273,13 @@ class LedgerImpl<DT extends DocTypes = NonNullable<unknown>> implements Ledger<D
return { id, clock: result?.head, name: this.name } as DocResponse;
}

async changes<T extends DocTypes>(since: ClockHead = [], opts: ChangesOptions = {}): Promise<ChangesResponse<T>> {
await this.ready();
this.logger.Debug().Any("since", since).Any("opts", opts).Msg("changes");
const { result, head } = await this.crdt.changes(since, opts);
const rows: ChangesResponseRow<T>[] = result.map(({ id: key, value, del, clock }) => ({
key,
value: (del ? { _id: key, _deleted: true } : { _id: key, ...value }) as DocWithId<T>,
clock,
}));
return { rows, clock: head, name: this.name };
}

async allDocs<T extends DocTypes>(opts: AllDocsQueryOpts = {}): Promise<AllDocsResponse<T>> {
await this.ready();
void opts;
allDocs<T extends DocTypes>(): QueryResponse<T> {
this.logger.Debug().Msg("allDocs");
const { result, head } = await this.crdt.allDocs();
const rows = result.map(({ id: key, value, del }) => ({
key,
value: (del ? { _id: key, _deleted: true } : { _id: key, ...value }) as DocWithId<T>,
}));
return { rows, clock: head, name: this.name };
}

async allDocuments<T extends DocTypes>(): Promise<{
rows: {
key: string;
value: DocWithId<T>;
}[];
clock: ClockHead;
}> {
return this.allDocs<T>();
return this.crdt.allDocs({ waitFor: this.ready() });
}

subscribe<T extends DocTypes>(listener: ListenerFn<T>, updates?: boolean): () => void {
this.logger.Debug().Bool("updates", updates).Msg("subscribe");
if (updates) {
if (!this._listening) {
this._listening = true;
this.crdt.clock.onTick((updates: DocUpdate<NonNullable<unknown>>[]) => {
void this._notify(updates);
});
}
this._listeners.add(listener as ListenerFn<NonNullable<unknown>>);
return () => {
this._listeners.delete(listener as ListenerFn<NonNullable<unknown>>);
};
} else {
this._noupdate_listeners.add(listener as ListenerFn<NonNullable<unknown>>);
return () => {
this._noupdate_listeners.delete(listener as ListenerFn<NonNullable<unknown>>);
};
}
allDocuments<T extends DocTypes>(): QueryResponse<T> {
return this.allDocs<T>();
}

// todo if we add this onto dbs in fireproof.ts then we can make index.ts a separate package
Expand All @@ -380,29 +301,6 @@ class LedgerImpl<DT extends DocTypes = NonNullable<unknown>> implements Ledger<D
await this.ready();
await this.crdt.compact();
}

async _notify(updates: DocUpdate<NonNullable<unknown>>[]) {
await this.ready();
if (this._listeners.size) {
const docs: DocWithId<NonNullable<unknown>>[] = updates.map(({ id, value }) => ({ ...value, _id: id }));
for (const listener of this._listeners) {
await (async () => await listener(docs as DocWithId<DT>[]))().catch((e: Error) => {
this.logger.Error().Err(e).Msg("subscriber error");
});
}
}
}

async _no_update_notify() {
await this.ready();
if (this._noupdate_listeners.size) {
for (const listener of this._noupdate_listeners) {
await (async () => await listener([]))().catch((e: Error) => {
this.logger.Error().Err(e).Msg("subscriber error");
});
}
}
}
}

function defaultURI(
Expand Down
13 changes: 6 additions & 7 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -266,13 +266,12 @@ export interface AllDocsQueryOpts extends QueryOpts<string> {
prefix?: string;
}

export interface AllDocsResponse<T extends DocTypes> {
readonly rows: {
readonly key: string;
readonly value: DocWithId<T>;
}[];
readonly clock: ClockHead;
readonly name?: string;
export type QueryStreamMarker = { kind: "preexisting"; done: boolean } | { kind: "new" };

export interface QueryResponse<T extends DocTypes> {
snapshot(): Promise<DocWithId<T>[]>;
live(opts?: { since?: ClockHead }): ReadableStream<{ doc: DocWithId<T>; marker: QueryStreamMarker }>;
future(): ReadableStream<{ doc: DocWithId<T>; marker: QueryStreamMarker }>;
}

type EmitFn = (k: IndexKeyType, v?: DocFragment) => void;
Expand Down
Loading
Loading