From 17cd621ece5091a4cedfcafc892166648be4be5a Mon Sep 17 00:00:00 2001 From: Rajaniraiyn R Date: Sat, 23 Nov 2024 18:48:06 +0530 Subject: [PATCH 1/7] add sql storage --- typescript/package-lock.json | 297 ++++++++++++++++++++++- typescript/package.json | 1 + typescript/src/storage/sqlChatStorage.ts | 193 +++++++++++++++ 3 files changed, 489 insertions(+), 2 deletions(-) create mode 100644 typescript/src/storage/sqlChatStorage.ts diff --git a/typescript/package-lock.json b/typescript/package-lock.json index ebfa9910..998bfa81 100644 --- a/typescript/package-lock.json +++ b/typescript/package-lock.json @@ -1,12 +1,12 @@ { "name": "multi-agent-orchestrator", - "version": "0.0.15", + "version": "0.0.17", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "multi-agent-orchestrator", - "version": "0.0.15", + "version": "0.0.17", "license": "Apache-2.0", "dependencies": { "@anthropic-ai/sdk": "^0.24.3", @@ -18,6 +18,7 @@ "@aws-sdk/client-lex-runtime-v2": "^3.621.0", "@aws-sdk/lib-dynamodb": "^3.621.0", "@aws-sdk/util-dynamodb": "^3.621.0", + "@libsql/client": "^0.14.0", "axios": "^1.7.2", "eslint-config-prettier": "^9.1.0", "natural": "^7.0.7", @@ -2477,6 +2478,168 @@ "@jridgewell/sourcemap-codec": "^1.4.10" } }, + "node_modules/@libsql/client": { + "version": "0.14.0", + "resolved": "https://registry.npmjs.org/@libsql/client/-/client-0.14.0.tgz", + "integrity": "sha512-/9HEKfn6fwXB5aTEEoMeFh4CtG0ZzbncBb1e++OCdVpgKZ/xyMsIVYXm0w7Pv4RUel803vE6LwniB3PqD72R0Q==", + "license": "MIT", + "dependencies": { + "@libsql/core": "^0.14.0", + "@libsql/hrana-client": "^0.7.0", + "js-base64": "^3.7.5", + "libsql": "^0.4.4", + "promise-limit": "^2.7.0" + } + }, + "node_modules/@libsql/core": { + "version": "0.14.0", + "resolved": "https://registry.npmjs.org/@libsql/core/-/core-0.14.0.tgz", + "integrity": "sha512-nhbuXf7GP3PSZgdCY2Ecj8vz187ptHlZQ0VRc751oB2C1W8jQUXKKklvt7t1LJiUTQBVJuadF628eUk+3cRi4Q==", + "license": "MIT", + "dependencies": { + "js-base64": "^3.7.5" + } + }, + "node_modules/@libsql/darwin-arm64": { + "version": "0.4.7", + "resolved": "https://registry.npmjs.org/@libsql/darwin-arm64/-/darwin-arm64-0.4.7.tgz", + "integrity": "sha512-yOL742IfWUlUevnI5PdnIT4fryY3LYTdLm56bnY0wXBw7dhFcnjuA7jrH3oSVz2mjZTHujxoITgAE7V6Z+eAbg==", + "cpu": [ + "arm64" + ], + "license": "MIT", + "optional": true, + "os": [ + "darwin" + ] + }, + "node_modules/@libsql/darwin-x64": { + "version": "0.4.7", + "resolved": "https://registry.npmjs.org/@libsql/darwin-x64/-/darwin-x64-0.4.7.tgz", + "integrity": "sha512-ezc7V75+eoyyH07BO9tIyJdqXXcRfZMbKcLCeF8+qWK5nP8wWuMcfOVywecsXGRbT99zc5eNra4NEx6z5PkSsA==", + "cpu": [ + "x64" + ], + "license": "MIT", + "optional": true, + "os": [ + "darwin" + ] + }, + "node_modules/@libsql/hrana-client": { + "version": "0.7.0", + "resolved": "https://registry.npmjs.org/@libsql/hrana-client/-/hrana-client-0.7.0.tgz", + "integrity": "sha512-OF8fFQSkbL7vJY9rfuegK1R7sPgQ6kFMkDamiEccNUvieQ+3urzfDFI616oPl8V7T9zRmnTkSjMOImYCAVRVuw==", + "license": "MIT", + "dependencies": { + "@libsql/isomorphic-fetch": "^0.3.1", + "@libsql/isomorphic-ws": "^0.1.5", + "js-base64": "^3.7.5", + "node-fetch": "^3.3.2" + } + }, + "node_modules/@libsql/hrana-client/node_modules/node-fetch": { + "version": "3.3.2", + "resolved": "https://registry.npmjs.org/node-fetch/-/node-fetch-3.3.2.tgz", + "integrity": "sha512-dRB78srN/l6gqWulah9SrxeYnxeddIG30+GOqK/9OlLVyLg3HPnr6SqOWTWOXKRwC2eGYCkZ59NNuSgvSrpgOA==", + "license": "MIT", + "dependencies": { + "data-uri-to-buffer": "^4.0.0", + "fetch-blob": "^3.1.4", + "formdata-polyfill": "^4.0.10" + }, + "engines": { + "node": "^12.20.0 || ^14.13.1 || >=16.0.0" + }, + "funding": { + "type": "opencollective", + "url": "https://opencollective.com/node-fetch" + } + }, + "node_modules/@libsql/isomorphic-fetch": { + "version": "0.3.1", + "resolved": "https://registry.npmjs.org/@libsql/isomorphic-fetch/-/isomorphic-fetch-0.3.1.tgz", + "integrity": "sha512-6kK3SUK5Uu56zPq/Las620n5aS9xJq+jMBcNSOmjhNf/MUvdyji4vrMTqD7ptY7/4/CAVEAYDeotUz60LNQHtw==", + "license": "MIT", + "engines": { + "node": ">=18.0.0" + } + }, + "node_modules/@libsql/isomorphic-ws": { + "version": "0.1.5", + "resolved": "https://registry.npmjs.org/@libsql/isomorphic-ws/-/isomorphic-ws-0.1.5.tgz", + "integrity": "sha512-DtLWIH29onUYR00i0GlQ3UdcTRC6EP4u9w/h9LxpUZJWRMARk6dQwZ6Jkd+QdwVpuAOrdxt18v0K2uIYR3fwFg==", + "license": "MIT", + "dependencies": { + "@types/ws": "^8.5.4", + "ws": "^8.13.0" + } + }, + "node_modules/@libsql/linux-arm64-gnu": { + "version": "0.4.7", + "resolved": "https://registry.npmjs.org/@libsql/linux-arm64-gnu/-/linux-arm64-gnu-0.4.7.tgz", + "integrity": "sha512-WlX2VYB5diM4kFfNaYcyhw5y+UJAI3xcMkEUJZPtRDEIu85SsSFrQ+gvoKfcVh76B//ztSeEX2wl9yrjF7BBCA==", + "cpu": [ + "arm64" + ], + "license": "MIT", + "optional": true, + "os": [ + "linux" + ] + }, + "node_modules/@libsql/linux-arm64-musl": { + "version": "0.4.7", + "resolved": "https://registry.npmjs.org/@libsql/linux-arm64-musl/-/linux-arm64-musl-0.4.7.tgz", + "integrity": "sha512-6kK9xAArVRlTCpWeqnNMCoXW1pe7WITI378n4NpvU5EJ0Ok3aNTIC2nRPRjhro90QcnmLL1jPcrVwO4WD1U0xw==", + "cpu": [ + "arm64" + ], + "license": "MIT", + "optional": true, + "os": [ + "linux" + ] + }, + "node_modules/@libsql/linux-x64-gnu": { + "version": "0.4.7", + "resolved": "https://registry.npmjs.org/@libsql/linux-x64-gnu/-/linux-x64-gnu-0.4.7.tgz", + "integrity": "sha512-CMnNRCmlWQqqzlTw6NeaZXzLWI8bydaXDke63JTUCvu8R+fj/ENsLrVBtPDlxQ0wGsYdXGlrUCH8Qi9gJep0yQ==", + "cpu": [ + "x64" + ], + "license": "MIT", + "optional": true, + "os": [ + "linux" + ] + }, + "node_modules/@libsql/linux-x64-musl": { + "version": "0.4.7", + "resolved": "https://registry.npmjs.org/@libsql/linux-x64-musl/-/linux-x64-musl-0.4.7.tgz", + "integrity": "sha512-nI6tpS1t6WzGAt1Kx1n1HsvtBbZ+jHn0m7ogNNT6pQHZQj7AFFTIMeDQw/i/Nt5H38np1GVRNsFe99eSIMs9XA==", + "cpu": [ + "x64" + ], + "license": "MIT", + "optional": true, + "os": [ + "linux" + ] + }, + "node_modules/@libsql/win32-x64-msvc": { + "version": "0.4.7", + "resolved": "https://registry.npmjs.org/@libsql/win32-x64-msvc/-/win32-x64-msvc-0.4.7.tgz", + "integrity": "sha512-7pJzOWzPm6oJUxml+PCDRzYQ4A1hTMHAciTAHfFK4fkbDZX33nWPVG7Y3vqdKtslcwAzwmrNDc6sXy2nwWnbiw==", + "cpu": [ + "x64" + ], + "license": "MIT", + "optional": true, + "os": [ + "win32" + ] + }, "node_modules/@mongodb-js/saslprep": { "version": "1.1.7", "resolved": "https://registry.npmjs.org/@mongodb-js/saslprep/-/saslprep-1.1.7.tgz", @@ -2485,6 +2648,12 @@ "sparse-bitfield": "^3.0.3" } }, + "node_modules/@neon-rs/load": { + "version": "0.0.4", + "resolved": "https://registry.npmjs.org/@neon-rs/load/-/load-0.0.4.tgz", + "integrity": "sha512-kTPhdZyTQxB+2wpiRcFWrDcejc4JI6tkPuS7UZCG4l6Zvc5kU/gGQ/ozvHTh1XR5tS+UlfAfGuPajjzQjCiHCw==", + "license": "MIT" + }, "node_modules/@nodelib/fs.scandir": { "version": "2.1.5", "resolved": "https://registry.npmjs.org/@nodelib/fs.scandir/-/fs.scandir-2.1.5.tgz", @@ -3374,6 +3543,15 @@ "@types/webidl-conversions": "*" } }, + "node_modules/@types/ws": { + "version": "8.5.13", + "resolved": "https://registry.npmjs.org/@types/ws/-/ws-8.5.13.tgz", + "integrity": "sha512-osM/gWBTPKgHV8XkTunnegTRIsvF6owmf5w+JtAfOw472dptdm0dlGv4xCt6GwQRcC2XVOvvRE/0bAoQcL2QkA==", + "license": "MIT", + "dependencies": { + "@types/node": "*" + } + }, "node_modules/@types/yargs": { "version": "17.0.32", "resolved": "https://registry.npmjs.org/@types/yargs/-/yargs-17.0.32.tgz", @@ -4273,6 +4451,15 @@ "node": ">= 8" } }, + "node_modules/data-uri-to-buffer": { + "version": "4.0.1", + "resolved": "https://registry.npmjs.org/data-uri-to-buffer/-/data-uri-to-buffer-4.0.1.tgz", + "integrity": "sha512-0R9ikRb668HB7QDxT1vkpuUBtqc53YyAwMwGeUFKRojY/NWKvdZ+9UYtRfGmhqNbRkTSVpMbmyhXipFFv2cb/A==", + "license": "MIT", + "engines": { + "node": ">= 12" + } + }, "node_modules/debug": { "version": "4.3.5", "resolved": "https://registry.npmjs.org/debug/-/debug-4.3.5.tgz", @@ -4887,6 +5074,29 @@ "bser": "2.1.1" } }, + "node_modules/fetch-blob": { + "version": "3.2.0", + "resolved": "https://registry.npmjs.org/fetch-blob/-/fetch-blob-3.2.0.tgz", + "integrity": "sha512-7yAQpD2UMJzLi1Dqv7qFYnPbaPx7ZfFK6PiIxQ4PfkGPyNyl2Ugx+a/umUonmKqjhM4DnfbMvdX6otXq83soQQ==", + "funding": [ + { + "type": "github", + "url": "https://github.com/sponsors/jimmywarting" + }, + { + "type": "paypal", + "url": "https://paypal.me/jimmywarting" + } + ], + "license": "MIT", + "dependencies": { + "node-domexception": "^1.0.0", + "web-streams-polyfill": "^3.0.3" + }, + "engines": { + "node": "^12.20 || >= 14.13" + } + }, "node_modules/file-entry-cache": { "version": "6.0.1", "resolved": "https://registry.npmjs.org/file-entry-cache/-/file-entry-cache-6.0.1.tgz", @@ -5028,6 +5238,18 @@ "node": ">= 14" } }, + "node_modules/formdata-polyfill": { + "version": "4.0.10", + "resolved": "https://registry.npmjs.org/formdata-polyfill/-/formdata-polyfill-4.0.10.tgz", + "integrity": "sha512-buewHzMvYL29jdeQTVILecSaZKnt/RJWjoZCF5OW60Z67/GmSLBkOFM7qh1PI3zFNtJbaZL5eQu1vLfazOwj4g==", + "license": "MIT", + "dependencies": { + "fetch-blob": "^3.1.2" + }, + "engines": { + "node": ">=12.20.0" + } + }, "node_modules/fs.realpath": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/fs.realpath/-/fs.realpath-1.0.0.tgz", @@ -6180,6 +6402,12 @@ "url": "https://github.com/chalk/supports-color?sponsor=1" } }, + "node_modules/js-base64": { + "version": "3.7.7", + "resolved": "https://registry.npmjs.org/js-base64/-/js-base64-3.7.7.tgz", + "integrity": "sha512-7rCnleh0z2CkXhH67J8K1Ytz0b2Y+yxTPL+/KOJoa20hfnVQ/3/T6W/KflYI4bRHRagNeXeU2bkNGI3v1oS/lw==", + "license": "BSD-3-Clause" + }, "node_modules/js-tokens": { "version": "4.0.0", "resolved": "https://registry.npmjs.org/js-tokens/-/js-tokens-4.0.0.tgz", @@ -6296,6 +6524,44 @@ "node": ">= 0.8.0" } }, + "node_modules/libsql": { + "version": "0.4.7", + "resolved": "https://registry.npmjs.org/libsql/-/libsql-0.4.7.tgz", + "integrity": "sha512-T9eIRCs6b0J1SHKYIvD8+KCJMcWZ900iZyxdnSCdqxN12Z1ijzT+jY5nrk72Jw4B0HGzms2NgpryArlJqvc3Lw==", + "cpu": [ + "x64", + "arm64", + "wasm32" + ], + "license": "MIT", + "os": [ + "darwin", + "linux", + "win32" + ], + "dependencies": { + "@neon-rs/load": "^0.0.4", + "detect-libc": "2.0.2" + }, + "optionalDependencies": { + "@libsql/darwin-arm64": "0.4.7", + "@libsql/darwin-x64": "0.4.7", + "@libsql/linux-arm64-gnu": "0.4.7", + "@libsql/linux-arm64-musl": "0.4.7", + "@libsql/linux-x64-gnu": "0.4.7", + "@libsql/linux-x64-musl": "0.4.7", + "@libsql/win32-x64-msvc": "0.4.7" + } + }, + "node_modules/libsql/node_modules/detect-libc": { + "version": "2.0.2", + "resolved": "https://registry.npmjs.org/detect-libc/-/detect-libc-2.0.2.tgz", + "integrity": "sha512-UX6sGumvvqSaXgdKGUsgZWqcUyIXZ/vZTrlRT/iobiKhGL0zL4d3osHj3uqllWJK+i+sixDS/3COVEOFbupFyw==", + "license": "Apache-2.0", + "engines": { + "node": ">=8" + } + }, "node_modules/lines-and-columns": { "version": "1.2.4", "resolved": "https://registry.npmjs.org/lines-and-columns/-/lines-and-columns-1.2.4.tgz", @@ -7188,6 +7454,12 @@ "url": "https://github.com/chalk/ansi-styles?sponsor=1" } }, + "node_modules/promise-limit": { + "version": "2.7.0", + "resolved": "https://registry.npmjs.org/promise-limit/-/promise-limit-2.7.0.tgz", + "integrity": "sha512-7nJ6v5lnJsXwGprnGXga4wx6d1POjvi5Qmf1ivTRxTjH4Z/9Czja/UCMLVmB9N93GeWOU93XaFaEt6jbuoagNw==", + "license": "ISC" + }, "node_modules/prompts": { "version": "2.4.2", "resolved": "https://registry.npmjs.org/prompts/-/prompts-2.4.2.tgz", @@ -8136,6 +8408,27 @@ "node": "^12.13.0 || ^14.15.0 || >=16.0.0" } }, + "node_modules/ws": { + "version": "8.18.0", + "resolved": "https://registry.npmjs.org/ws/-/ws-8.18.0.tgz", + "integrity": "sha512-8VbfWfHLbbwu3+N6OKsOMpBdT4kXPDDB9cJk2bJ6mh9ucxdlnNvH1e+roYkKmN9Nxw2yjz7VzeO9oOz2zJ04Pw==", + "license": "MIT", + "engines": { + "node": ">=10.0.0" + }, + "peerDependencies": { + "bufferutil": "^4.0.1", + "utf-8-validate": ">=5.0.2" + }, + "peerDependenciesMeta": { + "bufferutil": { + "optional": true + }, + "utf-8-validate": { + "optional": true + } + } + }, "node_modules/xtend": { "version": "4.0.2", "resolved": "https://registry.npmjs.org/xtend/-/xtend-4.0.2.tgz", diff --git a/typescript/package.json b/typescript/package.json index 9350087e..84413e79 100644 --- a/typescript/package.json +++ b/typescript/package.json @@ -36,6 +36,7 @@ "@aws-sdk/client-lex-runtime-v2": "^3.621.0", "@aws-sdk/lib-dynamodb": "^3.621.0", "@aws-sdk/util-dynamodb": "^3.621.0", + "@libsql/client": "^0.14.0", "axios": "^1.7.2", "eslint-config-prettier": "^9.1.0", "natural": "^7.0.7", diff --git a/typescript/src/storage/sqlChatStorage.ts b/typescript/src/storage/sqlChatStorage.ts new file mode 100644 index 00000000..bce0e78e --- /dev/null +++ b/typescript/src/storage/sqlChatStorage.ts @@ -0,0 +1,193 @@ +import { Client, createClient } from '@libsql/client'; +import { ConversationMessage, ParticipantRole } from "../types"; +import { Logger } from "../utils/logger"; +import { ChatStorage } from "./chatStorage"; + +export class SqlChatStorage extends ChatStorage { + private client: Client; + + constructor(url: string, authToken?: string) { + super(); + this.client = createClient({ + url, + authToken, + }); + this.initializeDatabase(); + } + + private async initializeDatabase() { + try { + // Create conversations table if it doesn't exist + await this.client.execute(/*sql*/` + CREATE TABLE IF NOT EXISTS conversations ( + user_id TEXT NOT NULL, + session_id TEXT NOT NULL, + agent_id TEXT NOT NULL, + message_index INTEGER NOT NULL, + role TEXT NOT NULL, + content TEXT NOT NULL, + timestamp INTEGER NOT NULL, + PRIMARY KEY (user_id, session_id, agent_id, message_index) + ); + `); + + // Create index for faster queries + await this.client.execute(/*sql*/` + CREATE INDEX IF NOT EXISTS idx_conversations_lookup + ON conversations(user_id, session_id, agent_id); + `); + } catch (error) { + Logger.logger.error("Error initializing database:", error); + throw error; + } + } + + async saveChatMessage( + userId: string, + sessionId: string, + agentId: string, + newMessage: ConversationMessage, + maxHistorySize?: number + ): Promise { + try { + // Fetch existing conversation + const existingConversation = await this.fetchChat(userId, sessionId, agentId); + + if (super.isConsecutiveMessage(existingConversation, newMessage)) { + Logger.logger.log(`> Consecutive ${newMessage.role} message detected for agent ${agentId}. Not saving.`); + return existingConversation; + } + + // Begin transaction + await this.client.transaction().then(async (tx) => { + // Get the next message index + const nextIndexResult = await tx.execute({ + sql: /*sql*/` + SELECT COALESCE(MAX(message_index) + 1, 0) as next_index + FROM conversations + WHERE user_id = ? AND session_id = ? AND agent_id = ? + `, + args: [userId, sessionId, agentId] + }); + + const nextIndex = nextIndexResult.rows[0].next_index as number; + const timestamp = Date.now(); + const content = Array.isArray(newMessage.content) + ? JSON.stringify(newMessage.content) + : JSON.stringify([{ text: newMessage.content }]); + + // Insert new message + await tx.execute({ + sql: /*sql*/` + INSERT INTO conversations ( + user_id, session_id, agent_id, message_index, + role, content, timestamp + ) VALUES (?, ?, ?, ?, ?, ?, ?) + `, + args: [ + userId, + sessionId, + agentId, + nextIndex, + newMessage.role, + content, + timestamp + ] + }); + + // If maxHistorySize is set, cleanup old messages + if (maxHistorySize !== undefined) { + await tx.execute({ + sql: /*sql*/` + DELETE FROM conversations + WHERE user_id = ? + AND session_id = ? + AND agent_id = ? + AND message_index <= ( + SELECT MAX(message_index) - ? + FROM conversations + WHERE user_id = ? + AND session_id = ? + AND agent_id = ? + ) + `, + args: [ + userId, sessionId, agentId, + maxHistorySize, + userId, sessionId, agentId + ] + }); + } + }); + + // Return updated conversation + return this.fetchChat(userId, sessionId, agentId, maxHistorySize); + } catch (error) { + Logger.logger.error("Error saving message:", error); + throw error; + } + } + + async fetchChat( + userId: string, + sessionId: string, + agentId: string, + maxHistorySize?: number + ): Promise { + try { + const sql = /*sql*/` + SELECT role, content, timestamp + FROM conversations + WHERE user_id = ? AND session_id = ? AND agent_id = ? + ORDER BY message_index ${maxHistorySize ? 'DESC LIMIT ?' : 'ASC'} + `; + + const args = maxHistorySize + ? [userId, sessionId, agentId, maxHistorySize] + : [userId, sessionId, agentId]; + + const result = await this.client.execute({ sql, args }); + const messages = result.rows; + + // If we used LIMIT, we need to reverse the results to maintain chronological order + if (maxHistorySize) messages.reverse(); + + return messages.map(msg => ({ + role: msg.role as ParticipantRole, + content: JSON.parse(msg.content as string), + })); + } catch (error) { + Logger.logger.error("Error fetching chat:", error); + throw error; + } + } + + async fetchAllChats( + userId: string, + sessionId: string + ): Promise { + try { + const result = await this.client.execute({ + sql: /*sql*/` + SELECT role, content, timestamp, agent_id + FROM conversations + WHERE user_id = ? AND session_id = ? + ORDER BY timestamp ASC + `, + args: [userId, sessionId] + }); + + const messages = result.rows; + + return messages.map(msg => ({ + role: msg.role as ParticipantRole, + content: msg.role === ParticipantRole.ASSISTANT + ? [{ text: `[${msg.agent_id}] ${JSON.parse(msg.content as string)[0]?.text || ''}` }] + : JSON.parse(msg.content as string) + })); + } catch (error) { + Logger.logger.error("Error fetching all chats:", error); + throw error; + } + } +} \ No newline at end of file From 9e195fe41d0468148d05048739c78ca5cc6d6090 Mon Sep 17 00:00:00 2001 From: Rajaniraiyn R Date: Sat, 23 Nov 2024 18:48:14 +0530 Subject: [PATCH 2/7] add sql storage docs --- docs/src/content/docs/storage/sql.mdx | 130 ++++++++++++++++++++++++++ 1 file changed, 130 insertions(+) create mode 100644 docs/src/content/docs/storage/sql.mdx diff --git a/docs/src/content/docs/storage/sql.mdx b/docs/src/content/docs/storage/sql.mdx new file mode 100644 index 00000000..13a72b11 --- /dev/null +++ b/docs/src/content/docs/storage/sql.mdx @@ -0,0 +1,130 @@ +--- +title: SQL Storage +description: Using SQL databases (SQLite/Turso) for persistent conversation storage in the Multi-Agent Orchestrator System +--- + +SQL storage provides a flexible and reliable solution for storing conversation history in the Multi-Agent Orchestrator System. This implementation supports both local SQLite databases and remote Turso databases, making it suitable for various deployment scenarios from development to production. + +## Features + +- Persistent storage across application restarts +- Support for both local and remote databases +- Built-in connection pooling and retry mechanisms +- Compatible with edge and serverless deployments +- Transaction support for data consistency +- Efficient indexing for quick data retrieval + +## When to Use SQL Storage + +- When you need a balance between simplicity and scalability +- For applications requiring persistent storage without complex infrastructure +- In both development and production environments +- When working with edge or serverless deployments +- When you need local-first development with remote deployment options + +## Implementation + +To use SQL storage in your Multi-Agent Orchestrator: + +import { Tabs, TabItem } from '@astrojs/starlight/components'; + + + + ```typescript + import { SqlChatStorage, MultiAgentOrchestrator } from 'multi-agent-orchestrator'; + + // For local SQLite database + const localStorage = new SqlChatStorage('file:local.db'); + + // For remote Turso database + const remoteStorage = new SqlChatStorage( + 'libsql://your-database-url.turso.io', + 'your-auth-token' + ); + + const orchestrator = new MultiAgentOrchestrator({ + storage: localStorage // or remoteStorage + }); + ``` + + + ```python + from multi_agent_orchestrator.storage import SqlChatStorage + from multi_agent_orchestrator.orchestrator import MultiAgentOrchestrator + + # For local SQLite database + local_storage = SqlChatStorage('file:local.db') + + # For remote Turso database + remote_storage = SqlChatStorage( + url='libsql://your-database-url.turso.io', + auth_token='your-auth-token' + ) + + orchestrator = MultiAgentOrchestrator(storage=local_storage) # or remote_storage + ``` + + + +## Configuration + +### Local DB +For local development, simply provide a file URL: +```typescript +const storage = new SqlChatStorage('file:local.db'); +``` + +### Remote DB +For production with Turso: +1. Create a Turso database through their platform +2. Obtain your database URL and authentication token +3. Configure your storage: +```typescript +const storage = new SqlChatStorage( + 'libsql://your-database-url.com', + 'your-auth-token' +); +``` + +## Database Schema + +The SQL storage implementation uses the following schema: + +```sql +CREATE TABLE conversations ( + user_id TEXT NOT NULL, + session_id TEXT NOT NULL, + agent_id TEXT NOT NULL, + message_index INTEGER NOT NULL, + role TEXT NOT NULL, + content TEXT NOT NULL, + timestamp INTEGER NOT NULL, + PRIMARY KEY (user_id, session_id, agent_id, message_index) +); + +CREATE INDEX idx_conversations_lookup +ON conversations(user_id, session_id, agent_id); +``` + +## Considerations + +- Automatic table and index creation on initialization +- Built-in transaction support for data consistency +- Efficient query performance through proper indexing +- Support for message history size limits +- Automatic JSON serialization/deserialization of message content + +## Best Practices + +- Use a single instance of SqlChatStorage throughout your application +- For serverless environments, implement proper connection pooling +- Implement proper error handling for database operations +- Consider implementing retry mechanisms for network issues +- Regularly backup your database +- Implement data cleanup strategies for old conversations +- Monitor database size and performance +- Keep your authentication tokens secure +- Use environment variables for sensitive configurationxe +- Implement proper access controls at the application level + +SQL storage provides a robust and flexible solution for managing conversation history in the Multi-Agent Orchestrator System. It offers a good balance between simplicity and features, making it suitable for both development and production environments. \ No newline at end of file From 07cc7f674a6cee9bdcef8785be20277c87b06fb0 Mon Sep 17 00:00:00 2001 From: Rajaniraiyn R Date: Sat, 23 Nov 2024 18:55:48 +0530 Subject: [PATCH 3/7] add sql storage python client --- python/setup.cfg | 1 + .../storage/__init__.py | 4 +- .../storage/sql_chat_storage.py | 202 ++++++++++++++++++ 3 files changed, 206 insertions(+), 1 deletion(-) create mode 100644 python/src/multi_agent_orchestrator/storage/sql_chat_storage.py diff --git a/python/setup.cfg b/python/setup.cfg index 47f50bc7..a754d09d 100644 --- a/python/setup.cfg +++ b/python/setup.cfg @@ -22,6 +22,7 @@ python_requires = >=3.11 install_requires = boto3==1.34.151 anthropic==0.32.0 + libsql-client==0.3.1 [options.packages.find] where = src diff --git a/python/src/multi_agent_orchestrator/storage/__init__.py b/python/src/multi_agent_orchestrator/storage/__init__.py index 483a5bab..13137d38 100644 --- a/python/src/multi_agent_orchestrator/storage/__init__.py +++ b/python/src/multi_agent_orchestrator/storage/__init__.py @@ -1,10 +1,12 @@ from .chat_storage import ChatStorage from .in_memory_chat_storage import InMemoryChatStorage from .dynamodb_chat_storage import DynamoDbChatStorage +from .sql_chat_storage import SqlChatStorage __all__ = [ 'ChatStorage', 'InMemoryChatStorage', - 'DynamoDbChatStorage' + 'DynamoDbChatStorage', + 'SqlChatStorage', ] diff --git a/python/src/multi_agent_orchestrator/storage/sql_chat_storage.py b/python/src/multi_agent_orchestrator/storage/sql_chat_storage.py new file mode 100644 index 00000000..b8274944 --- /dev/null +++ b/python/src/multi_agent_orchestrator/storage/sql_chat_storage.py @@ -0,0 +1,202 @@ +from typing import List, Dict, Optional, Union +import time +import json +from libsql_client import Client, create_client +from multi_agent_orchestrator.storage import ChatStorage +from multi_agent_orchestrator.types import ConversationMessage, ParticipantRole, TimestampedMessage +from multi_agent_orchestrator.utils import Logger + +class SqlChatStorage(ChatStorage): + """SQL-based chat storage implementation supporting both local SQLite and remote Turso databases.""" + + def __init__( + self, + url: str, + auth_token: Optional[str] = None + ): + """Initialize SQL storage. + + Args: + url: Database URL (e.g., 'file:local.db' or 'libsql://your-db-url.com') + auth_token: Authentication token for remote databases (optional) + """ + super().__init__() + self.client = create_client( + url=url, + auth_token=auth_token + ) + self._initialize_database() + + def _initialize_database(self) -> None: + """Create necessary tables and indexes if they don't exist.""" + try: + # Create conversations table + self.client.execute(""" + CREATE TABLE IF NOT EXISTS conversations ( + user_id TEXT NOT NULL, + session_id TEXT NOT NULL, + agent_id TEXT NOT NULL, + message_index INTEGER NOT NULL, + role TEXT NOT NULL, + content TEXT NOT NULL, + timestamp INTEGER NOT NULL, + PRIMARY KEY (user_id, session_id, agent_id, message_index) + ) + """) + + # Create index for faster queries + self.client.execute(""" + CREATE INDEX IF NOT EXISTS idx_conversations_lookup + ON conversations(user_id, session_id, agent_id) + """) + except Exception as error: + Logger.error(f"Error initializing database: {str(error)}") + raise error + + async def save_chat_message( + self, + user_id: str, + session_id: str, + agent_id: str, + new_message: ConversationMessage, + max_history_size: Optional[int] = None + ) -> List[ConversationMessage]: + """Save a new chat message.""" + try: + # Fetch existing conversation + existing_conversation = await self.fetch_chat(user_id, session_id, agent_id) + + if self.is_consecutive_message(existing_conversation, new_message): + Logger.debug(f"> Consecutive {new_message.role} message detected for agent {agent_id}. Not saving.") + return existing_conversation + + # Get next message index + result = self.client.execute(""" + SELECT COALESCE(MAX(message_index) + 1, 0) as next_index + FROM conversations + WHERE user_id = ? AND session_id = ? AND agent_id = ? + """, [user_id, session_id, agent_id]) + + next_index = result.rows[0]['next_index'] + timestamp = int(time.time() * 1000) + content = json.dumps(new_message.content) + + # Begin transaction + with self.client.transaction(): + # Insert new message + self.client.execute(""" + INSERT INTO conversations ( + user_id, session_id, agent_id, message_index, + role, content, timestamp + ) VALUES (?, ?, ?, ?, ?, ?, ?) + """, [ + user_id, session_id, agent_id, next_index, + new_message.role, content, timestamp + ]) + + # Clean up old messages if max_history_size is set + if max_history_size is not None: + self.client.execute(""" + DELETE FROM conversations + WHERE user_id = ? + AND session_id = ? + AND agent_id = ? + AND message_index <= ( + SELECT MAX(message_index) - ? + FROM conversations + WHERE user_id = ? + AND session_id = ? + AND agent_id = ? + ) + """, [ + user_id, session_id, agent_id, + max_history_size, + user_id, session_id, agent_id + ]) + + # Return updated conversation + return await self.fetch_chat(user_id, session_id, agent_id, max_history_size) + except Exception as error: + Logger.error(f"Error saving message: {str(error)}") + raise error + + async def fetch_chat( + self, + user_id: str, + session_id: str, + agent_id: str, + max_history_size: Optional[int] = None + ) -> List[ConversationMessage]: + """Fetch chat messages.""" + try: + query = """ + SELECT role, content, timestamp + FROM conversations + WHERE user_id = ? AND session_id = ? AND agent_id = ? + ORDER BY message_index {} + """.format('DESC LIMIT ?' if max_history_size else 'ASC') + + params = [user_id, session_id, agent_id] + if max_history_size: + params.append(max_history_size) + + result = self.client.execute(query, params) + messages = result.rows + + # If we used LIMIT, reverse to maintain chronological order + if max_history_size: + messages.reverse() + + return [ + ConversationMessage( + role=msg['role'], + content=json.loads(msg['content']) + ) for msg in messages + ] + except Exception as error: + Logger.error(f"Error fetching chat: {str(error)}") + raise error + + async def fetch_all_chats( + self, + user_id: str, + session_id: str + ) -> List[ConversationMessage]: + """Fetch all chat messages for a user and session.""" + try: + result = self.client.execute(""" + SELECT role, content, timestamp, agent_id + FROM conversations + WHERE user_id = ? AND session_id = ? + ORDER BY timestamp ASC + """, [user_id, session_id]) + + return [ + ConversationMessage( + role=msg['role'], + content=self._format_content( + msg['role'], + json.loads(msg['content']), + msg['agent_id'] + ) + ) for msg in result.rows + ] + except Exception as error: + Logger.error(f"Error fetching all chats: {str(error)}") + raise error + + def _format_content( + self, + role: str, + content: Union[List, str], + agent_id: str + ) -> List[Dict[str, str]]: + """Format message content with agent ID for assistant messages.""" + if role == ParticipantRole.ASSISTANT.value: + text = content[0]['text'] if isinstance(content, list) else content + return [{'text': f"[{agent_id}] {text}"}] + return content if isinstance(content, list) else [{'text': content}] + + def close(self) -> None: + """Close the database connection.""" + self.client.close() \ No newline at end of file From 4731f1ba9b3791ab2fbc320af0f7d65a8ea6136b Mon Sep 17 00:00:00 2001 From: Rajaniraiyn R Date: Wed, 27 Nov 2024 01:21:26 +0530 Subject: [PATCH 4/7] Enhance SQL storage implementation by adding initialization and connection management features. Updated documentation to reflect changes in database connection handling and best practices for closing connections. Improved error handling during database initialization. --- docs/src/content/docs/storage/sql.mdx | 16 ++++++++++------ typescript/src/storage/sqlChatStorage.ts | 18 +++++++++++++++--- 2 files changed, 25 insertions(+), 9 deletions(-) diff --git a/docs/src/content/docs/storage/sql.mdx b/docs/src/content/docs/storage/sql.mdx index 13a72b11..a04d7b20 100644 --- a/docs/src/content/docs/storage/sql.mdx +++ b/docs/src/content/docs/storage/sql.mdx @@ -35,16 +35,23 @@ import { Tabs, TabItem } from '@astrojs/starlight/components'; // For local SQLite database const localStorage = new SqlChatStorage('file:local.db'); + await localStorage.waitForInitialization(); - // For remote Turso database + // For remote database const remoteStorage = new SqlChatStorage( - 'libsql://your-database-url.turso.io', + 'libsql://your-database-url.example.com', 'your-auth-token' ); + await remoteStorage.waitForInitialization(); const orchestrator = new MultiAgentOrchestrator({ storage: localStorage // or remoteStorage }); + + + // Close the database connections when done + await localStorage.close(); + await remoteStorage.close(); ``` @@ -117,14 +124,11 @@ ON conversations(user_id, session_id, agent_id); ## Best Practices - Use a single instance of SqlChatStorage throughout your application -- For serverless environments, implement proper connection pooling -- Implement proper error handling for database operations -- Consider implementing retry mechanisms for network issues - Regularly backup your database - Implement data cleanup strategies for old conversations - Monitor database size and performance - Keep your authentication tokens secure -- Use environment variables for sensitive configurationxe - Implement proper access controls at the application level +- Close the database connections when done SQL storage provides a robust and flexible solution for managing conversation history in the Multi-Agent Orchestrator System. It offers a good balance between simplicity and features, making it suitable for both development and production environments. \ No newline at end of file diff --git a/typescript/src/storage/sqlChatStorage.ts b/typescript/src/storage/sqlChatStorage.ts index bce0e78e..59c3c93f 100644 --- a/typescript/src/storage/sqlChatStorage.ts +++ b/typescript/src/storage/sqlChatStorage.ts @@ -5,6 +5,7 @@ import { ChatStorage } from "./chatStorage"; export class SqlChatStorage extends ChatStorage { private client: Client; + private initialized: Promise; constructor(url: string, authToken?: string) { super(); @@ -12,7 +13,7 @@ export class SqlChatStorage extends ChatStorage { url, authToken, }); - this.initializeDatabase(); + this.initialized = this.initializeDatabase(); } private async initializeDatabase() { @@ -38,7 +39,7 @@ export class SqlChatStorage extends ChatStorage { `); } catch (error) { Logger.logger.error("Error initializing database:", error); - throw error; + throw new Error("Database initialization error"); } } @@ -190,4 +191,15 @@ export class SqlChatStorage extends ChatStorage { throw error; } } -} \ No newline at end of file + + async waitForInitialization() { + if (this.client.closed) { + throw new Error("Database connection closed"); + } + await this.initialized; + } + + close() { + this.client.close(); + } +} From 9162787e36ad091f663d1938df7d7cdd3b2ab398 Mon Sep 17 00:00:00 2001 From: Rajaniraiyn R Date: Wed, 27 Nov 2024 01:32:23 +0530 Subject: [PATCH 5/7] Remove libsql-client dependency from setup.cfg, streamlining package requirements for the Python project. --- python/setup.cfg | 2 -- 1 file changed, 2 deletions(-) diff --git a/python/setup.cfg b/python/setup.cfg index 7e92d326..3b57ccbc 100644 --- a/python/setup.cfg +++ b/python/setup.cfg @@ -28,8 +28,6 @@ anthropic = anthropic==0.32.0 openai = openai==1.55.0 -sql = - libsql-client==0.3.1 all = anthropic==0.32.0 openai==1.55.0 From 622f20421e9bf504e6b2b16530abda96ad099e67 Mon Sep 17 00:00:00 2001 From: Rajaniraiyn R Date: Tue, 24 Dec 2024 15:37:30 +0530 Subject: [PATCH 6/7] mention sql storage in overview docs --- docs/src/content/docs/storage/overview.md | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/docs/src/content/docs/storage/overview.md b/docs/src/content/docs/storage/overview.md index 79211746..78c46264 100644 --- a/docs/src/content/docs/storage/overview.md +++ b/docs/src/content/docs/storage/overview.md @@ -21,19 +21,25 @@ The Multi-Agent Orchestrator System offers flexible storage options for maintain - Provides persistent storage for production environments. - Allows for scalable and durable conversation history storage. -3. **Custom Storage Solutions**: +3. **SQL Storage**: + - Offers persistent storage using SQLite or Turso databases. + - When you need local-first development with remote deployment options + +4. **Custom Storage Solutions**: - The system allows for implementation of custom storage options to meet specific needs. ## Choosing the Right Storage Option - Use In-Memory Storage for development, testing, or when persistence between application restarts is not necessary. - Choose DynamoDB Storage for production environments where conversation history needs to be preserved long-term or across multiple instances of your application. +- Consider SQL Storage for a balance between simplicity and scalability, supporting both local and remote databases. - Implement a custom storage solution if you have specific requirements not met by the provided options. ## Next Steps - Learn more about [In-Memory Storage](/multi-agent-orchestrator/storage/in-memory) - Explore [DynamoDB Storage](/multi-agent-orchestrator/storage/dynamodb) for persistent storage +- Explore [SQL Storage](/multi-agent-orchestrator/storage/sql) for persistent storage using SQLite or Turso. - Discover how to [implement custom storage solutions](/multi-agent-orchestrator/storage/custom) By leveraging these storage options, you can ensure that your Multi-Agent Orchestrator System maintains the necessary context for coherent and effective conversations across various use cases and deployment scenarios. \ No newline at end of file From 951fcbde6e7f320c6781d37672ac46ec5be07133 Mon Sep 17 00:00:00 2001 From: Rajaniraiyn R Date: Tue, 24 Dec 2024 20:17:58 +0530 Subject: [PATCH 7/7] Add Python package installation instructions for DynamoDB and SQL storage --- docs/src/content/docs/storage/dynamodb.mdx | 8 ++++++++ docs/src/content/docs/storage/sql.mdx | 10 ++++++++++ 2 files changed, 18 insertions(+) diff --git a/docs/src/content/docs/storage/dynamodb.mdx b/docs/src/content/docs/storage/dynamodb.mdx index b3917bed..426a9677 100644 --- a/docs/src/content/docs/storage/dynamodb.mdx +++ b/docs/src/content/docs/storage/dynamodb.mdx @@ -17,6 +17,14 @@ DynamoDB storage provides a scalable and persistent solution for storing convers - When long-term persistence of conversation history is required - For applications that need to scale horizontally +## Python Package + +If you haven't already installed the AWS-related dependencies, make sure to install them: + +```bash +pip install "multi-agent-orchestrator[aws]" +``` + ## Implementation To use DynamoDB storage in your Multi-Agent Orchestrator: diff --git a/docs/src/content/docs/storage/sql.mdx b/docs/src/content/docs/storage/sql.mdx index a04d7b20..68e316fe 100644 --- a/docs/src/content/docs/storage/sql.mdx +++ b/docs/src/content/docs/storage/sql.mdx @@ -22,6 +22,16 @@ SQL storage provides a flexible and reliable solution for storing conversation h - When working with edge or serverless deployments - When you need local-first development with remote deployment options +## Python Package Installation + +To use SQL storage in your Python application, make sure to install them: + +```bash +pip install "multi-agent-orchestrator[sql]" +``` + +This will install the `libsql-client` package required for SQL storage functionality. + ## Implementation To use SQL storage in your Multi-Agent Orchestrator: