From 7578183ac25e74518e07f06a61b3eec8609ac2ed Mon Sep 17 00:00:00 2001 From: Henry Date: Thu, 7 Dec 2023 18:46:03 +0000 Subject: [PATCH 1/2] add custom analytics --- .../agents/OpenAIAssistant/OpenAIAssistant.ts | 32 +- packages/components/package.json | 3 +- packages/components/src/handler.ts | 489 ++++++++++++++++++ packages/server/src/index.ts | 2 + 4 files changed, 522 insertions(+), 4 deletions(-) diff --git a/packages/components/nodes/agents/OpenAIAssistant/OpenAIAssistant.ts b/packages/components/nodes/agents/OpenAIAssistant/OpenAIAssistant.ts index 7f2377bde67..d442639485f 100644 --- a/packages/components/nodes/agents/OpenAIAssistant/OpenAIAssistant.ts +++ b/packages/components/nodes/agents/OpenAIAssistant/OpenAIAssistant.ts @@ -8,6 +8,7 @@ import * as path from 'node:path' import fetch from 'node-fetch' import { flatten, uniqWith, isEqual } from 'lodash' import { zodToJsonSchema } from 'zod-to-json-schema' +import { AnalyticHandler } from '../../../src/handler' class OpenAIAssistant_Agents implements INode { label: string @@ -149,6 +150,11 @@ class OpenAIAssistant_Agents implements INode { const openai = new OpenAI({ apiKey: openAIApiKey }) + // Start analytics + const analyticHandlers = new AnalyticHandler(nodeData, options) + await analyticHandlers.init() + const parentIds = await analyticHandlers.onChainStart('OpenAIAssistant', input) + try { const assistantDetails = JSON.parse(assistant.details) const openAIAssistantId = assistantDetails.id @@ -171,7 +177,8 @@ class OpenAIAssistant_Agents implements INode { } const chatmessage = await appDataSource.getRepository(databaseEntities['ChatMessage']).findOneBy({ - chatId: options.chatId + chatId: options.chatId, + chatflowid: options.chatflowid }) let threadId = '' @@ -185,7 +192,7 @@ class OpenAIAssistant_Agents implements INode { threadId = thread.id } - // List all runs + // List all runs, in case existing thread is still running if (!isNewThread) { const promise = (threadId: string) => { return new Promise((resolve) => { @@ -221,6 +228,7 @@ class OpenAIAssistant_Agents implements INode { }) // Run assistant thread + const llmIds = await analyticHandlers.onLLMStart('ChatOpenAI', input, parentIds) const runThread = await openai.beta.threads.runs.create(threadId, { assistant_id: retrievedAssistant.id }) @@ -253,7 +261,15 @@ class OpenAIAssistant_Agents implements INode { for (let i = 0; i < actions.length; i += 1) { const tool = tools.find((tool: any) => tool.name === actions[i].tool) if (!tool) continue + + // Start tool analytics + const toolIds = await analyticHandlers.onToolStart(tool.name, actions[i].toolInput, parentIds) + const toolOutput = await tool.call(actions[i].toolInput) + + // End tool analytics + await analyticHandlers.onToolEnd(toolIds, toolOutput) + submitToolOutputs.push({ tool_call_id: actions[i].toolCallId, output: toolOutput @@ -302,7 +318,9 @@ class OpenAIAssistant_Agents implements INode { runThreadId = newRunThread.id state = await promise(threadId, newRunThread.id) } else { - throw new Error(`Error processing thread: ${state}, Thread ID: ${threadId}`) + const errMsg = `Error processing thread: ${state}, Thread ID: ${threadId}` + await analyticHandlers.onChainError(parentIds, errMsg) + throw new Error(errMsg) } } @@ -387,11 +405,18 @@ class OpenAIAssistant_Agents implements INode { const bitmap = fsDefault.readFileSync(filePath) const base64String = Buffer.from(bitmap).toString('base64') + // TODO: Use a file path and retrieve image on the fly. Storing as base64 to localStorage and database will easily hit limits const imgHTML = `${fileObj.filename}
` returnVal += imgHTML } } + const imageRegex = /]*\/>/g + let llmOutput = returnVal.replace(imageRegex, '') + llmOutput = llmOutput.replace('
', '') + await analyticHandlers.onLLMEnd(llmIds, llmOutput) + await analyticHandlers.onChainEnd(parentIds, messageData, true) + return { text: returnVal, usedTools, @@ -399,6 +424,7 @@ class OpenAIAssistant_Agents implements INode { assistant: { assistantId: openAIAssistantId, threadId, runId: runThreadId, messages: messageData } } } catch (error) { + await analyticHandlers.onChainError(parentIds, error, true) throw new Error(error) } } diff --git a/packages/components/package.json b/packages/components/package.json index dd87754d544..a775e630de2 100644 --- a/packages/components/package.json +++ b/packages/components/package.json @@ -51,8 +51,9 @@ "husky": "^8.0.3", "ioredis": "^5.3.2", "langchain": "^0.0.196", + "langfuse": "^1.2.0", "langfuse-langchain": "^1.0.31", - "langsmith": "^0.0.32", + "langsmith": "^0.0.49", "linkifyjs": "^4.1.1", "llmonitor": "^0.5.5", "mammoth": "^1.5.1", diff --git a/packages/components/src/handler.ts b/packages/components/src/handler.ts index 456cf39c30c..ae5a9de00b5 100644 --- a/packages/components/src/handler.ts +++ b/packages/components/src/handler.ts @@ -8,6 +8,10 @@ import { LLMonitorHandler } from 'langchain/callbacks/handlers/llmonitor' import { getCredentialData, getCredentialParam } from './utils' import { ICommonObject, INodeData } from './Interface' import CallbackHandler from 'langfuse-langchain' +import { RunTree, RunTreeConfig, Client as LangsmithClient } from 'langsmith' +import { Langfuse, LangfuseTraceClient, LangfuseSpanClient, LangfuseGenerationClient } from 'langfuse' // or "langfuse-node" +import monitor from 'llmonitor' +import { v4 as uuidv4 } from 'uuid' interface AgentRun extends Run { actions: AgentAction[] @@ -273,3 +277,488 @@ export const additionalCallbacks = async (nodeData: INodeData, options: ICommonO throw new Error(e) } } + +export class AnalyticHandler { + nodeData: INodeData + options: ICommonObject = {} + handlers: ICommonObject = {} + + constructor(nodeData: INodeData, options: ICommonObject) { + this.options = options + this.nodeData = nodeData + this.init() + } + + async init() { + try { + if (!this.options.analytic) return + + const analytic = JSON.parse(this.options.analytic) + + for (const provider in analytic) { + const providerStatus = analytic[provider].status as boolean + + if (providerStatus) { + const credentialId = analytic[provider].credentialId as string + const credentialData = await getCredentialData(credentialId ?? '', this.options) + if (provider === 'langSmith') { + const langSmithProject = analytic[provider].projectName as string + const langSmithApiKey = getCredentialParam('langSmithApiKey', credentialData, this.nodeData) + const langSmithEndpoint = getCredentialParam('langSmithEndpoint', credentialData, this.nodeData) + + const client = new LangsmithClient({ + apiUrl: langSmithEndpoint ?? 'https://api.smith.langchain.com', + apiKey: langSmithApiKey + }) + + this.handlers['langSmith'] = { client, langSmithProject } + } else if (provider === 'langFuse') { + const release = analytic[provider].release as string + const langFuseSecretKey = getCredentialParam('langFuseSecretKey', credentialData, this.nodeData) + const langFusePublicKey = getCredentialParam('langFusePublicKey', credentialData, this.nodeData) + const langFuseEndpoint = getCredentialParam('langFuseEndpoint', credentialData, this.nodeData) + + const langfuse = new Langfuse({ + secretKey: langFuseSecretKey, + publicKey: langFusePublicKey, + baseUrl: langFuseEndpoint ?? 'https://cloud.langfuse.com', + release + }) + this.handlers['langFuse'] = { client: langfuse } + } else if (provider === 'llmonitor') { + const llmonitorAppId = getCredentialParam('llmonitorAppId', credentialData, this.nodeData) + const llmonitorEndpoint = getCredentialParam('llmonitorEndpoint', credentialData, this.nodeData) + + monitor.init({ + appId: llmonitorAppId, + apiUrl: llmonitorEndpoint + }) + + this.handlers['llmonitor'] = { client: monitor } + } + } + } + } catch (e) { + throw new Error(e) + } + } + + async onChainStart(name: string, input: string, parentIds?: ICommonObject) { + const returnIds: ICommonObject = { + langSmith: {}, + langFuse: {}, + llmonitor: {} + } + + if (Object.prototype.hasOwnProperty.call(this.handlers, 'langSmith')) { + if (!parentIds || !Object.keys(parentIds).length) { + const parentRunConfig: RunTreeConfig = { + name, + run_type: 'chain', + inputs: { + text: input + }, + serialized: {}, + project_name: this.handlers['langSmith'].langSmithProject, + client: this.handlers['langSmith'].client + } + const parentRun = new RunTree(parentRunConfig) + await parentRun.postRun() + this.handlers['langSmith'].chainRun = { [parentRun.id]: parentRun } + returnIds['langSmith'].chainRun = parentRun.id + } else { + const parentRun: RunTree | undefined = this.handlers['langSmith'].chainRun[parentIds['langSmith'].chainRun] + if (parentRun) { + const childChainRun = await parentRun.createChild({ + name, + run_type: 'chain', + inputs: { + text: input + } + }) + await childChainRun.postRun() + this.handlers['langSmith'].chainRun = { [childChainRun.id]: childChainRun } + returnIds['langSmith'].chainRun = childChainRun.id + } + } + } + + if (Object.prototype.hasOwnProperty.call(this.handlers, 'langFuse')) { + let langfuseTraceClient: LangfuseTraceClient + + if (!parentIds || !Object.keys(parentIds).length) { + const langfuse: Langfuse = this.handlers['langFuse'].client + langfuseTraceClient = langfuse.trace({ + name, + userId: this.options.chatId, + metadata: { tags: ['openai-assistant'] } + }) + } else { + langfuseTraceClient = this.handlers['langFuse'].trace[parentIds['langFuse']] + } + + if (langfuseTraceClient) { + const span = langfuseTraceClient.span({ + name, + input: { + text: input + } + }) + this.handlers['langFuse'].trace = { [langfuseTraceClient.id]: langfuseTraceClient } + this.handlers['langFuse'].span = { [span.id]: span } + returnIds['langFuse'].trace = langfuseTraceClient.id + returnIds['langFuse'].span = span.id + } + } + + if (Object.prototype.hasOwnProperty.call(this.handlers, 'llmonitor')) { + const monitor = this.handlers['llmonitor'].client + + if (monitor) { + const runId = uuidv4() + await monitor.trackEvent('chain', 'start', { + runId, + name, + userId: this.options.chatId, + input + }) + this.handlers['llmonitor'].chainEvent = { [runId]: runId } + returnIds['llmonitor'].chainEvent = runId + } + } + + return returnIds + } + + async onChainEnd(returnIds: ICommonObject, output: string | object, shutdown = false) { + if (Object.prototype.hasOwnProperty.call(this.handlers, 'langSmith')) { + const chainRun: RunTree | undefined = this.handlers['langSmith'].chainRun[returnIds['langSmith'].chainRun] + if (chainRun) { + await chainRun.end({ + outputs: { + output + } + }) + await chainRun.patchRun() + } + } + + if (Object.prototype.hasOwnProperty.call(this.handlers, 'langFuse')) { + const span: LangfuseSpanClient | undefined = this.handlers['langFuse'].span[returnIds['langFuse'].span] + if (span) { + span.end({ + output + }) + if (shutdown) { + const langfuse: Langfuse = this.handlers['langFuse'].client + await langfuse.shutdownAsync() + } + } + } + + if (Object.prototype.hasOwnProperty.call(this.handlers, 'llmonitor')) { + const chainEventId = returnIds['llmonitor'].chainEvent + const monitor = this.handlers['llmonitor'].client + + if (monitor && chainEventId) { + await monitor.trackEvent('chain', 'end', { + runId: chainEventId, + output + }) + } + } + } + + async onChainError(returnIds: ICommonObject, error: string | object, shutdown = false) { + if (Object.prototype.hasOwnProperty.call(this.handlers, 'langSmith')) { + const chainRun: RunTree | undefined = this.handlers['langSmith'].chainRun[returnIds['langSmith'].chainRun] + if (chainRun) { + await chainRun.end({ + error: { + error + } + }) + await chainRun.patchRun() + } + } + + if (Object.prototype.hasOwnProperty.call(this.handlers, 'langFuse')) { + const span: LangfuseSpanClient | undefined = this.handlers['langFuse'].span[returnIds['langFuse'].span] + if (span) { + span.end({ + output: { + error + } + }) + if (shutdown) { + const langfuse: Langfuse = this.handlers['langFuse'].client + await langfuse.shutdownAsync() + } + } + } + + if (Object.prototype.hasOwnProperty.call(this.handlers, 'llmonitor')) { + const chainEventId = returnIds['llmonitor'].chainEvent + const monitor = this.handlers['llmonitor'].client + + if (monitor && chainEventId) { + await monitor.trackEvent('chain', 'end', { + runId: chainEventId, + output: error + }) + } + } + } + + async onLLMStart(name: string, input: string, parentIds: ICommonObject) { + const returnIds: ICommonObject = { + langSmith: {}, + langFuse: {}, + llmonitor: {} + } + + if (Object.prototype.hasOwnProperty.call(this.handlers, 'langSmith')) { + const parentRun: RunTree | undefined = this.handlers['langSmith'].chainRun[parentIds['langSmith'].chainRun] + if (parentRun) { + const childLLMRun = await parentRun.createChild({ + name, + run_type: 'llm', + inputs: { + prompts: [input] + } + }) + await childLLMRun.postRun() + this.handlers['langSmith'].llmRun = { [childLLMRun.id]: childLLMRun } + returnIds['langSmith'].llmRun = childLLMRun.id + } + } + + if (Object.prototype.hasOwnProperty.call(this.handlers, 'langFuse')) { + const trace: LangfuseTraceClient | undefined = this.handlers['langFuse'].trace[parentIds['langFuse'].trace] + if (trace) { + const generation = trace.generation({ + name, + prompt: input + }) + this.handlers['langFuse'].generation = { [generation.id]: generation } + returnIds['langFuse'].generation = generation.id + } + } + + if (Object.prototype.hasOwnProperty.call(this.handlers, 'llmonitor')) { + const monitor = this.handlers['llmonitor'].client + const chainEventId: string = this.handlers['llmonitor'].chainEvent[parentIds['llmonitor'].chainEvent] + + if (monitor && chainEventId) { + const runId = uuidv4() + await monitor.trackEvent('llm', 'start', { + runId, + parentRunId: chainEventId, + name, + userId: this.options.chatId, + input + }) + this.handlers['llmonitor'].llmEvent = { [runId]: runId } + returnIds['llmonitor'].llmEvent = runId + } + } + + return returnIds + } + + async onLLMEnd(returnIds: ICommonObject, output: string) { + if (Object.prototype.hasOwnProperty.call(this.handlers, 'langSmith')) { + const llmRun: RunTree | undefined = this.handlers['langSmith'].llmRun[returnIds['langSmith'].llmRun] + if (llmRun) { + await llmRun.end({ + outputs: { + generations: [output] + } + }) + await llmRun.patchRun() + } + } + + if (Object.prototype.hasOwnProperty.call(this.handlers, 'langFuse')) { + const generation: LangfuseGenerationClient | undefined = this.handlers['langFuse'].generation[returnIds['langFuse'].generation] + if (generation) { + generation.end({ + completion: output + }) + } + } + + if (Object.prototype.hasOwnProperty.call(this.handlers, 'llmonitor')) { + const llmEventId: string = this.handlers['llmonitor'].llmEvent[returnIds['llmonitor'].llmEvent] + const monitor = this.handlers['llmonitor'].client + + if (monitor && llmEventId) { + await monitor.trackEvent('llm', 'end', { + runId: llmEventId, + output + }) + } + } + } + + async onLLMError(returnIds: ICommonObject, error: string | object) { + if (Object.prototype.hasOwnProperty.call(this.handlers, 'langSmith')) { + const llmRun: RunTree | undefined = this.handlers['langSmith'].llmRun[returnIds['langSmith'].llmRun] + if (llmRun) { + await llmRun.end({ + error: { + error + } + }) + await llmRun.patchRun() + } + } + + if (Object.prototype.hasOwnProperty.call(this.handlers, 'langFuse')) { + const generation: LangfuseGenerationClient | undefined = this.handlers['langFuse'].generation[returnIds['langFuse'].generation] + if (generation) { + generation.end({ + completion: error + }) + } + } + + if (Object.prototype.hasOwnProperty.call(this.handlers, 'llmonitor')) { + const llmEventId: string = this.handlers['llmonitor'].llmEvent[returnIds['llmonitor'].llmEvent] + const monitor = this.handlers['llmonitor'].client + + if (monitor && llmEventId) { + await monitor.trackEvent('llm', 'end', { + runId: llmEventId, + output: error + }) + } + } + } + + async onToolStart(name: string, input: string | object, parentIds: ICommonObject) { + const returnIds: ICommonObject = { + langSmith: {}, + langFuse: {}, + llmonitor: {} + } + + if (Object.prototype.hasOwnProperty.call(this.handlers, 'langSmith')) { + const parentRun: RunTree | undefined = this.handlers['langSmith'].chainRun[parentIds['langSmith'].chainRun] + if (parentRun) { + const childToolRun = await parentRun.createChild({ + name, + run_type: 'tool', + inputs: { + input + } + }) + await childToolRun.postRun() + this.handlers['langSmith'].toolRun = { [childToolRun.id]: childToolRun } + returnIds['langSmith'].toolRun = childToolRun.id + } + } + + if (Object.prototype.hasOwnProperty.call(this.handlers, 'langFuse')) { + const trace: LangfuseTraceClient | undefined = this.handlers['langFuse'].trace[parentIds['langFuse'].trace] + if (trace) { + const toolSpan = trace.span({ + name, + input + }) + this.handlers['langFuse'].toolSpan = { [toolSpan.id]: toolSpan } + returnIds['langFuse'].toolSpan = toolSpan.id + } + } + + if (Object.prototype.hasOwnProperty.call(this.handlers, 'llmonitor')) { + const monitor = this.handlers['llmonitor'].client + const chainEventId: string = this.handlers['llmonitor'].chainEvent[parentIds['llmonitor'].chainEvent] + + if (monitor && chainEventId) { + const runId = uuidv4() + await monitor.trackEvent('tool', 'start', { + runId, + parentRunId: chainEventId, + name, + userId: this.options.chatId, + input + }) + this.handlers['llmonitor'].toolEvent = { [runId]: runId } + returnIds['llmonitor'].toolEvent = runId + } + } + + return returnIds + } + + async onToolEnd(returnIds: ICommonObject, output: string | object) { + if (Object.prototype.hasOwnProperty.call(this.handlers, 'langSmith')) { + const toolRun: RunTree | undefined = this.handlers['langSmith'].toolRun[returnIds['langSmith'].toolRun] + if (toolRun) { + await toolRun.end({ + outputs: { + output + } + }) + await toolRun.patchRun() + } + } + + if (Object.prototype.hasOwnProperty.call(this.handlers, 'langFuse')) { + const toolSpan: LangfuseSpanClient | undefined = this.handlers['langFuse'].toolSpan[returnIds['langFuse'].toolSpan] + if (toolSpan) { + toolSpan.end({ + output + }) + } + } + + if (Object.prototype.hasOwnProperty.call(this.handlers, 'llmonitor')) { + const toolEventId: string = this.handlers['llmonitor'].toolEvent[returnIds['llmonitor'].toolEvent] + const monitor = this.handlers['llmonitor'].client + + if (monitor && toolEventId) { + await monitor.trackEvent('tool', 'end', { + runId: toolEventId, + output + }) + } + } + } + + async onToolError(returnIds: ICommonObject, error: string | object) { + if (Object.prototype.hasOwnProperty.call(this.handlers, 'langSmith')) { + const toolRun: RunTree | undefined = this.handlers['langSmith'].toolRun[returnIds['langSmith'].toolRun] + if (toolRun) { + await toolRun.end({ + error: { + error + } + }) + await toolRun.patchRun() + } + } + + if (Object.prototype.hasOwnProperty.call(this.handlers, 'langFuse')) { + const toolSpan: LangfuseSpanClient | undefined = this.handlers['langFuse'].toolSpan[returnIds['langFuse'].toolSpan] + if (toolSpan) { + toolSpan.end({ + output: error + }) + } + } + + if (Object.prototype.hasOwnProperty.call(this.handlers, 'llmonitor')) { + const toolEventId: string = this.handlers['llmonitor'].llmEvent[returnIds['llmonitor'].toolEvent] + const monitor = this.handlers['llmonitor'].client + + if (monitor && toolEventId) { + await monitor.trackEvent('tool', 'end', { + runId: toolEventId, + output: error + }) + } + } + } +} diff --git a/packages/server/src/index.ts b/packages/server/src/index.ts index d87d2c0ac02..61e55159b1f 100644 --- a/packages/server/src/index.ts +++ b/packages/server/src/index.ts @@ -1470,6 +1470,7 @@ export class App { let result = isStreamValid ? await nodeInstance.run(nodeToExecuteData, incomingInput.question, { + chatflowid, chatHistory, socketIO, socketIOClientId: incomingInput.socketIOClientId, @@ -1480,6 +1481,7 @@ export class App { chatId }) : await nodeInstance.run(nodeToExecuteData, incomingInput.question, { + chatflowid, chatHistory, logger, appDataSource: this.AppDataSource, From da2fe78e4491a3d611266325ba9b2b2e06c1b732 Mon Sep 17 00:00:00 2001 From: Henry Heng Date: Fri, 8 Dec 2023 12:09:29 +0000 Subject: [PATCH 2/2] Update handler.ts --- packages/components/src/handler.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/components/src/handler.ts b/packages/components/src/handler.ts index ae5a9de00b5..29aff3e2f05 100644 --- a/packages/components/src/handler.ts +++ b/packages/components/src/handler.ts @@ -9,7 +9,7 @@ import { getCredentialData, getCredentialParam } from './utils' import { ICommonObject, INodeData } from './Interface' import CallbackHandler from 'langfuse-langchain' import { RunTree, RunTreeConfig, Client as LangsmithClient } from 'langsmith' -import { Langfuse, LangfuseTraceClient, LangfuseSpanClient, LangfuseGenerationClient } from 'langfuse' // or "langfuse-node" +import { Langfuse, LangfuseTraceClient, LangfuseSpanClient, LangfuseGenerationClient } from 'langfuse' import monitor from 'llmonitor' import { v4 as uuidv4 } from 'uuid'