diff --git a/lib/sqlsync-react/src/hooks.ts b/lib/sqlsync-react/src/hooks.ts index a9a4e78..1dcc382 100644 --- a/lib/sqlsync-react/src/hooks.ts +++ b/lib/sqlsync-react/src/hooks.ts @@ -18,7 +18,7 @@ export function useSQLSync(): SQLSync { const value = useContext(SQLSyncContext); if (!value) { throw new Error( - "could not find sqlsync context value; please ensure the component is wrapped in a ", + "could not find sqlsync context value; please ensure the component is wrapped in a " ); } return value; @@ -43,7 +43,7 @@ export function createDocHooks(docType: DocType): DocHooks { const sqlsync = useSQLSync(); return useCallback( (mutation: M) => sqlsync.mutate(docId, docType, mutation), - [sqlsync, docId, docType], + [sqlsync, docId, docType] ); }; @@ -55,7 +55,7 @@ export function createDocHooks(docType: DocType): DocHooks { const sqlsync = useSQLSync(); return useCallback( (enabled: boolean) => sqlsync.setConnectionEnabled(docId, docType, enabled), - [sqlsync, docId, docType], + [sqlsync, docId, docType] ); }; @@ -74,7 +74,7 @@ export type QueryState = export function useQuery( docType: DocType, docId: DocId, - rawQuery: ParameterizedQuery | string, + rawQuery: ParameterizedQuery | string ): QueryState { const sqlsync = useSQLSync(); const [state, setState] = useState>({ state: "pending" }); diff --git a/lib/sqlsync-solid-js/src/hooks.ts b/lib/sqlsync-solid-js/src/hooks.ts index 1c86e2a..5d30388 100644 --- a/lib/sqlsync-solid-js/src/hooks.ts +++ b/lib/sqlsync-solid-js/src/hooks.ts @@ -1,9 +1,16 @@ -import { ConnectionStatus, DocId } from "@orbitinghail/sqlsync-worker"; +import { + ConnectionStatus, + DocId, + DocType, + ParameterizedQuery, + QuerySubscription, + Row, + SQLSync, + normalizeQuery, + pendingPromise, +} from "@orbitinghail/sqlsync-worker"; import { Accessor, createEffect, createSignal, onCleanup, useContext } from "solid-js"; import { SQLSyncContext } from "./context"; -import { ParameterizedQuery, normalizeQuery } from "./sql"; -import { DocType, QuerySubscription, Row, SQLSync } from "./sqlsync"; -import { pendingPromise } from "./util"; export function useSQLSync(): Accessor { const [sqlSync] = useContext(SQLSyncContext); diff --git a/lib/sqlsync-solid-js/src/index.ts b/lib/sqlsync-solid-js/src/index.ts index 44ca817..f91277f 100644 --- a/lib/sqlsync-solid-js/src/index.ts +++ b/lib/sqlsync-solid-js/src/index.ts @@ -1,10 +1,6 @@ import { SQLSyncProvider } from "./context"; import { createDocHooks, useConnectionStatus } from "./hooks"; -import { sql } from "./sql"; -import { DocType, Row } from "./sqlsync"; -import { serializeMutationAsJSON } from "./util"; -export { SQLSyncProvider, createDocHooks, serializeMutationAsJSON, sql, useConnectionStatus }; -export type { DocType, Row }; +export { SQLSyncProvider, createDocHooks, useConnectionStatus }; // eof: this file only exports diff --git a/lib/sqlsync-solid-js/src/sql.ts b/lib/sqlsync-solid-js/src/sql.ts deleted file mode 100644 index 195d712..0000000 --- a/lib/sqlsync-solid-js/src/sql.ts +++ /dev/null @@ -1,42 +0,0 @@ -import { QueryKey, SqlValue } from "@orbitinghail/sqlsync-worker"; -import { base58 } from "@scure/base"; -import { sha256Digest } from "./util"; - -const UTF8_ENCODER = new TextEncoder(); - -export interface ParameterizedQuery { - sql: string; - params: SqlValue[]; -} - -export function normalizeQuery(query: ParameterizedQuery | string): ParameterizedQuery { - if (typeof query === "string") { - return { sql: query, params: [] }; - } - return query; -} - -/** - * Returns a parameterized query object with the given SQL string and parameters. - * This function should be used as a template literal tag. - * - * @example - * const query = sql`SELECT * FROM users WHERE id = ${userId}`; - * - * @param chunks - An array of string literals. - * @param params - An array of parameter values to be inserted into the SQL string. - * @returns A parameterized query object with the given SQL string and parameters. - */ -export function sql(chunks: readonly string[], ...params: SqlValue[]): ParameterizedQuery { - return { - sql: chunks.join("?"), - params, - }; -} - -export async function toQueryKey(query: ParameterizedQuery): Promise { - const queryJson = JSON.stringify([query.sql, query.params]); - const encoded = UTF8_ENCODER.encode(queryJson); - const hashed = await sha256Digest(encoded); - return base58.encode(new Uint8Array(hashed)); -} diff --git a/lib/sqlsync-solid-js/src/sqlsync.ts b/lib/sqlsync-solid-js/src/sqlsync.ts deleted file mode 100644 index 85ecbc0..0000000 --- a/lib/sqlsync-solid-js/src/sqlsync.ts +++ /dev/null @@ -1,306 +0,0 @@ -import { - ConnectionStatus, - DocEvent, - DocId, - DocReply, - HandlerId, - QueryKey, - SqlValue, - WorkerRequest, - WorkerToHostMsg, - journalIdToString, -} from "@orbitinghail/sqlsync-worker"; -import { ParameterizedQuery, toQueryKey } from "./sql"; -import { NarrowTaggedEnum, OmitUnion, assertUnreachable, initWorker, toRows } from "./util"; - -export type Row = Record; - -export interface DocType { - readonly reducerUrl: string | URL; - readonly serializeMutation: (mutation: Mutation) => Uint8Array; -} - -type DocReplyTag = DocReply["tag"]; -type SelectDocReply = NarrowTaggedEnum; - -export interface QuerySubscription { - handleRows: (rows: Row[]) => void; - handleErr: (err: string) => void; -} - -const nextHandlerId = (() => { - let handlerId = 0; - return () => handlerId++; -})(); - -export class SQLSync { - #port: MessagePort; - #openDocs = new Set(); - #pendingOpens = new Map>(); - #msgHandlers = new Map void>(); - #querySubscriptions = new Map(); - #connectionStatus: ConnectionStatus = "disconnected"; - #connectionStatusListeners = new Set<(status: ConnectionStatus) => void>(); - - constructor(workerUrl: string | URL, wasmUrl: string | URL, coordinatorUrl?: string | URL) { - this.#msgHandlers = new Map(); - const port = initWorker(workerUrl); - this.#port = port; - - // We use a WeakRef here to avoid a circular reference between this.port and this. - // This allows the SQLSync object to be garbage collected when it is no longer needed. - const weakThis = new WeakRef(this); - this.#port.onmessage = (msg) => { - const thisRef = weakThis.deref(); - if (thisRef) { - thisRef.#handleMessage(msg); - } else { - console.log( - "sqlsync: dropping message; sqlsync object has been garbage collected", - msg.data - ); - // clean up the port - port.postMessage({ tag: "Close", handlerId: 0 }); - port.onmessage = null; - return; - } - }; - - this.#boot(wasmUrl.toString(), coordinatorUrl?.toString()).catch((err) => { - // TODO: expose this error to the app in a nicer way - // probably through some event handlers on the SQLSync object - console.error("sqlsync boot failed", err); - throw err; - }); - } - - close() { - this.#port.onmessage = null; - this.#port.postMessage({ tag: "Close", handlerId: 0 }); - } - - #handleMessage(event: MessageEvent) { - const msg = event.data as WorkerToHostMsg; - - if (msg.tag === "Reply") { - console.log("sqlsync: received reply", msg.handlerId, msg.reply); - const handler = this.#msgHandlers.get(msg.handlerId); - if (handler) { - handler(msg.reply); - } else { - console.error("sqlsync: no handler for message", msg); - throw new Error("no handler for message"); - } - } else if (msg.tag === "Event") { - this.#handleDocEvent(msg.docId, msg.evt); - } else { - assertUnreachable("unknown message", msg as never); - } - } - - #handleDocEvent(docId: DocId, evt: DocEvent) { - console.log(`sqlsync: doc ${journalIdToString(docId)} received event`, evt); - if (evt.tag === "ConnectionStatus") { - this.#connectionStatus = evt.status; - for (const listener of this.#connectionStatusListeners) { - listener(evt.status); - } - } else if (evt.tag === "SubscriptionChanged") { - const subscriptions = this.#querySubscriptions.get(evt.key); - if (subscriptions) { - for (const subscription of subscriptions) { - subscription.handleRows(toRows(evt.columns, evt.rows)); - } - } - } else if (evt.tag === "SubscriptionErr") { - const subscriptions = this.#querySubscriptions.get(evt.key); - if (subscriptions) { - for (const subscription of subscriptions) { - subscription.handleErr(evt.err); - } - } - } else { - assertUnreachable("unknown event", evt as never); - } - } - - #send>( - expectedReplyTag: T, - msg: OmitUnion - ): Promise> { - return new Promise((resolve, reject) => { - const handlerId = nextHandlerId(); - const req: WorkerRequest = { ...msg, handlerId }; - - console.log("sqlsync: sending message", req.handlerId, req.tag === "Doc" ? req.req : req); - - this.#msgHandlers.set(handlerId, (msg: DocReply) => { - this.#msgHandlers.delete(handlerId); - if (msg.tag === "Err") { - reject(msg.err); - } else if (msg.tag === expectedReplyTag) { - // TODO: is it possible to get Typescript to infer this cast? - resolve(msg as SelectDocReply); - } else { - console.warn("sqlsync: unexpected reply", msg); - reject(new Error(`expected ${expectedReplyTag} reply; got ${msg.tag}`)); - } - }); - - this.#port.postMessage(req); - }); - } - - async #boot(wasmUrl: string, coordinatorUrl?: string): Promise { - await this.#send("Ack", { - tag: "Boot", - wasmUrl, - coordinatorUrl, - }); - } - - async #open(docId: DocId, docType: DocType): Promise { - let openPromise = this.#pendingOpens.get(docId); - if (!openPromise) { - openPromise = this.#send("Ack", { - tag: "Doc", - docId, - req: { - tag: "Open", - reducerUrl: docType.reducerUrl.toString(), - }, - }); - this.#pendingOpens.set(docId, openPromise); - } - await openPromise; - this.#pendingOpens.delete(docId); - this.#openDocs.add(docId); - } - - async query( - docId: DocId, - docType: DocType, - sql: string, - params: SqlValue[] - ): Promise { - if (!this.#openDocs.has(docId)) { - await this.#open(docId, docType); - } - - const reply = await this.#send("RecordSet", { - tag: "Doc", - docId: docId, - req: { tag: "Query", sql, params }, - }); - - return toRows(reply.columns, reply.rows); - } - - async subscribe( - docId: DocId, - docType: DocType, - query: ParameterizedQuery, - subscription: QuerySubscription - ): Promise<() => void> { - if (!this.#openDocs.has(docId)) { - await this.#open(docId, docType); - } - const queryKey = await toQueryKey(query); - - // get or create subscription - let subscriptions = this.#querySubscriptions.get(queryKey); - if (!subscriptions) { - subscriptions = []; - this.#querySubscriptions.set(queryKey, subscriptions); - } - if (subscriptions.indexOf(subscription) === -1) { - subscriptions.push(subscription); - } else { - throw new Error("sqlsync: duplicate subscription"); - } - - // send subscribe request - await this.#send("Ack", { - tag: "Doc", - docId, - req: { tag: "QuerySubscribe", key: queryKey, sql: query.sql, params: query.params }, - }); - - // return unsubscribe function - return () => { - const subscriptions = this.#querySubscriptions.get(queryKey); - if (!subscriptions) { - // no subscriptions - return; - } - const idx = subscriptions.indexOf(subscription); - if (idx === -1) { - // no subscription - return; - } - subscriptions.splice(idx, 1); - - window.setTimeout(() => { - // we want to wait a tiny bit before sending finalizing the unsubscribe - // to handle the case that React resubscribes to the same query right away - this.#unsubscribeIfNeeded(docId, queryKey).catch((err) => { - console.error("sqlsync: error unsubscribing", err); - }); - }, 10); - }; - } - - async #unsubscribeIfNeeded(docId: DocId, queryKey: QueryKey): Promise { - const subscriptions = this.#querySubscriptions.get(queryKey); - if (Array.isArray(subscriptions) && subscriptions.length === 0) { - // query subscription is still registered but has no subscriptions on our side - // inform the worker that we are no longer interested in this query - this.#querySubscriptions.delete(queryKey); - - if (this.#openDocs.has(docId)) { - await this.#send("Ack", { - tag: "Doc", - docId, - req: { tag: "QueryUnsubscribe", key: queryKey }, - }); - } - } - } - - async mutate(docId: DocId, docType: DocType, mutation: M): Promise { - if (!this.#openDocs.has(docId)) { - await this.#open(docId, docType); - } - await this.#send("Ack", { - tag: "Doc", - docId, - req: { tag: "Mutate", mutation: docType.serializeMutation(mutation) }, - }); - } - - get connectionStatus(): ConnectionStatus { - return this.#connectionStatus; - } - - addConnectionStatusListener(listener: (status: ConnectionStatus) => void): () => void { - this.#connectionStatusListeners.add(listener); - return () => { - this.#connectionStatusListeners.delete(listener); - }; - } - - async setConnectionEnabled( - docId: DocId, - docType: DocType, - enabled: boolean - ): Promise { - if (!this.#openDocs.has(docId)) { - await this.#open(docId, docType); - } - await this.#send("Ack", { - tag: "Doc", - docId, - req: { tag: "SetConnectionEnabled", enabled }, - }); - } -} diff --git a/lib/sqlsync-solid-js/src/util.ts b/lib/sqlsync-solid-js/src/util.ts deleted file mode 100644 index 902a705..0000000 --- a/lib/sqlsync-solid-js/src/util.ts +++ /dev/null @@ -1,63 +0,0 @@ -import { SqlValue } from "@orbitinghail/sqlsync-worker"; -import * as sha256 from "fast-sha256"; -import { Row } from "./sqlsync"; - -// omits the given keys from each member of the union -// https://stackoverflow.com/a/57103940/65872 -// biome-ignore lint/suspicious/noExplicitAny: any is required for this type magic to work -export type OmitUnion = T extends any ? Omit : never; - -export type NarrowTaggedEnum = E extends { tag: T } ? E : never; - -export function assertUnreachable(err: string, x: never): never { - throw new Error(`unreachable: ${err}; got ${JSON.stringify(x)}`); -} - -export function initWorker(workerUrl: string | URL): MessagePort { - const type: WorkerType = workerUrl.toString().endsWith(".cjs") ? "classic" : "module"; - - if (typeof SharedWorker !== "undefined") { - const worker = new SharedWorker(workerUrl, { type }); - return worker.port; - } - - const worker = new Worker(workerUrl, { type }); - // biome-ignore lint/suspicious/noExplicitAny: WebWorker extends MessagePort via duck typing - return worker as any as MessagePort; -} - -const UTF8Encoder = new TextEncoder(); -export const serializeMutationAsJSON = (mutation: M) => { - const serialized = JSON.stringify(mutation); - return UTF8Encoder.encode(serialized); -}; - -export function toRows(columns: string[], rows: SqlValue[][]): R[] { - const out: R[] = []; - for (const row of rows) { - const obj: Row = {}; - for (let i = 0; i < columns.length; i++) { - obj[columns[i]] = row[i]; - } - out.push(obj as R); - } - return out; -} - -export const pendingPromise = (): [Promise, (v: T) => void] => { - let resolve: (v: T) => void; - const promise = new Promise((r) => { - resolve = r; - }); - // biome-ignore lint/style/noNonNullAssertion: we know resolve is defined because the promise constructor runs syncronously - return [promise, resolve!]; -}; - -export const sha256Digest = async (data: Uint8Array): Promise => { - if (crypto?.subtle?.digest) { - const hash = await crypto.subtle.digest("SHA-256", data); - return new Uint8Array(hash); - } - - return Promise.resolve(sha256.hash(data)); -};