Better error handling #31

Closed · wants to merge 7 commits
5 changes: 5 additions & 0 deletions .gitignore
@@ -22,6 +22,11 @@ typescript/*.tgz
*aws-exports.json
!download.js

docs/*.py
docs/*.py
docs/*.txt
docs/code_execution_env

examples/local-demo/.env
typescript/coverage/**/*

73 changes: 66 additions & 7 deletions docs/src/content/docs/agents/built-in/bedrock-translator-agent.mdx
@@ -12,6 +12,7 @@ The `BedrockTranslatorAgent` uses Amazon Bedrock's language models to translate
- Allows dynamic setting of source and target languages
- Can be used standalone or as part of a [ChainAgent](/multi-agent-orchestrator/agents/built-in/chain-agent)
- Configurable inference parameters for fine-tuned control
- Supports both streaming and non-streaming responses

## Creating a Bedrock Translator Agent

@@ -29,7 +30,8 @@ import { Tabs, TabItem } from '@astrojs/starlight/components';
const agent = new BedrockTranslatorAgent({
name: 'BasicTranslator',
description: 'Translates text to English',
targetLanguage: 'English'
targetLanguage: 'English',
streaming: false // Set to true for streaming responses
});
```
</TabItem>
@@ -40,7 +42,8 @@ import { Tabs, TabItem } from '@astrojs/starlight/components';
agent = BedrockTranslatorAgent(BedrockTranslatorAgentOptions(
name='BasicTranslator',
description='Translates text to English',
target_language='English'
target_language='English',
streaming=False # Set to True for streaming responses
))
```
</TabItem>
@@ -62,6 +65,7 @@ For more complex use cases, you can create a BedrockTranslatorAgent with custom
targetLanguage: 'German',
modelId: BEDROCK_MODEL_ID_CLAUDE_3_SONNET,
region: 'us-west-2',
streaming: true, // Enable streaming responses
inferenceConfig: {
maxTokens: 2000,
temperature: 0.1,
@@ -85,6 +89,7 @@ For more complex use cases, you can create a BedrockTranslatorAgent with custom
target_language='German',
model_id=BEDROCK_MODEL_ID_CLAUDE_3_SONNET,
region='us-west-2',
streaming=True, # Enable streaming responses
inference_config={
'maxTokens': 2000,
'temperature': 0.1,
@@ -98,6 +103,56 @@ For more complex use cases, you can create a BedrockTranslatorAgent with custom
</TabItem>
</Tabs>

## Streaming Responses

The `streaming` parameter lets you choose between receiving the entire translation at once and receiving it as a stream of partial responses. When set to `true`, the agent returns an asynchronous iterable of string chunks, which is useful for displaying translations in real time or for processing very large texts.

### Example of using streaming responses

<Tabs syncKey="runtime">
<TabItem label="TypeScript" icon="seti:typescript" color="blue">
```typescript
import { BedrockTranslatorAgent, BedrockTranslatorAgentOptions } from 'multi-agent-orchestrator';

const agent = new BedrockTranslatorAgent({
  name: 'StreamingTranslator',
  description: 'Translates text with streaming responses',
  targetLanguage: 'French',
  streaming: true
});

async function translateWithStreaming(text: string) {
  // Pass an empty chat history for a standalone translation request
  const response = await agent.processRequest(text, 'user123', 'session456', []);
  for await (const chunk of response) {
    console.log('Partial translation:', chunk);
  }
}

translateWithStreaming("Hello, world!");
```
</TabItem>
<TabItem label="Python" icon="seti:python">
```python
import asyncio

from multi_agent_orchestrator.agents import BedrockTranslatorAgent, BedrockTranslatorAgentOptions

agent = BedrockTranslatorAgent(BedrockTranslatorAgentOptions(
    name='StreamingTranslator',
    description='Translates text with streaming responses',
    target_language='French',
    streaming=True
))

async def translate_with_streaming(text: str):
    # Pass an empty chat history for a standalone translation request
    response = await agent.process_request(text, 'user123', 'session456', [])
    async for chunk in response:
        print('Partial translation:', chunk)

asyncio.run(translate_with_streaming("Hello, world!"))
```
</TabItem>
</Tabs>

## Dynamic Language Setting

To set the language during the invocation:
@@ -109,7 +164,8 @@ To set the language during the invocation:

const translator = new BedrockTranslatorAgent({
name: 'DynamicTranslator',
description: 'Translator with dynamically set languages'
description: 'Translator with dynamically set languages',
streaming: false // Set to true if you want streaming responses
});

const orchestrator = new MultiAgentOrchestrator();
@@ -140,7 +196,8 @@ To set the language during the invocation:

translator = BedrockTranslatorAgent(BedrockTranslatorAgentOptions(
name='DynamicTranslator',
description='Translator with dynamically set languages'
description='Translator with dynamically set languages',
streaming=False # Set to True if you want streaming responses
))

orchestrator = MultiAgentOrchestrator()
@@ -180,7 +237,8 @@ The `BedrockTranslatorAgent` can be effectively used within a `ChainAgent` for c
const translatorToEnglish = new BedrockTranslatorAgent({
name: 'TranslatorToEnglish',
description: 'Translates input to English',
targetLanguage: 'English'
targetLanguage: 'English',
streaming: false // Set to true for streaming responses
});

// Create a processing agent (e.g., a BedrockLLMAgent)
@@ -226,7 +284,8 @@ The `BedrockTranslatorAgent` can be effectively used within a `ChainAgent` for c
translator_to_english = BedrockTranslatorAgent(BedrockTranslatorAgentOptions(
name='TranslatorToEnglish',
description='Translates input to English',
target_language='English'
target_language='English',
streaming=False # Set to True for streaming responses
))

# Create a processing agent (e.g., a BedrockLLMAgent)
@@ -273,4 +332,4 @@ This setup allows for seamless multilingual processing, where the core logic can

---

By leveraging the `BedrockTranslatorAgent`, you can create sophisticated multilingual applications and workflows, enabling seamless communication and processing across language barriers in your Multi-Agent Orchestrator system.
By leveraging the `BedrockTranslatorAgent`, you can create sophisticated multilingual applications and workflows, enabling seamless communication and processing across language barriers in your Multi-Agent Orchestrator system. The streaming capability allows for real-time translation of large texts or integration into applications that require immediate feedback.
18 changes: 17 additions & 1 deletion python/src/multi_agent_orchestrator/agents/agent.py
@@ -1,7 +1,9 @@
from typing import Dict, List, Union, AsyncIterable, Optional, Any
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from multi_agent_orchestrator.types import ConversationMessage
from multi_agent_orchestrator.types import ConversationMessage, ParticipantRole
from multi_agent_orchestrator.utils import Logger


@dataclass
class AgentProcessingResult:
@@ -60,3 +62,17 @@ async def process_request(
additional_params: Optional[Dict[str, str]] = None
) -> Union[ConversationMessage, AsyncIterable[any]]:
pass

    def create_error_response(self, message: str, error: Optional[Exception] = None) -> ConversationMessage:
        error_message = "Sorry, I encountered an error while processing your request."
        if error is not None:
            error_message += f" Error details: {str(error)}"
        else:
            error_message += f" {message}"

        Logger.error(f"{self.name} Error: {error_message}")

        return ConversationMessage(
            role=ParticipantRole.ASSISTANT,
            content=[{"text": error_message}]
        )
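
The helper above becomes the single failure path shared by every agent: log the error, then hand the caller a well-formed assistant message instead of an exception. A minimal sketch of how a custom agent subclass might use it; the `EchoAgent` class is hypothetical and only illustrates the pattern, under the assumption that `Agent` and `AgentOptions` are importable from `multi_agent_orchestrator.agents`:

```python
from typing import Any, AsyncIterable, Dict, List, Optional, Union

from multi_agent_orchestrator.agents import Agent, AgentOptions
from multi_agent_orchestrator.types import ConversationMessage, ParticipantRole


class EchoAgent(Agent):
    """Hypothetical agent that echoes its input and reuses the shared error path."""

    async def process_request(
        self,
        input_text: str,
        user_id: str,
        session_id: str,
        chat_history: List[ConversationMessage],
        additional_params: Optional[Dict[str, str]] = None
    ) -> Union[ConversationMessage, AsyncIterable[Any]]:
        try:
            if not input_text:
                raise ValueError("empty input")
            return ConversationMessage(
                role=ParticipantRole.ASSISTANT.value,
                content=[{"text": input_text}]
            )
        except Exception as error:
            # Inherited from Agent: logs the failure and returns a
            # well-formed assistant message instead of propagating.
            return self.create_error_response("Echo failed.", error)

# Hypothetical usage:
# echo = EchoAgent(AgentOptions(name="Echo", description="Echoes user input"))
```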
15 changes: 2 additions & 13 deletions python/src/multi_agent_orchestrator/agents/amazon_bedrock_agent.py
@@ -9,25 +9,21 @@
from multi_agent_orchestrator.types import ConversationMessage, ParticipantRole
from multi_agent_orchestrator.utils import Logger


@dataclass
class AmazonBedrockAgentOptions(AgentOptions):
"""Options for Amazon Bedrock Agent."""
agent_id: str = None
agent_alias_id: str = None


class AmazonBedrockAgent(Agent):
"""
Represents an Amazon Bedrock agent that interacts with a runtime client.
Extends base Agent class and implements specific methods for Amazon Bedrock.
"""

def __init__(self, options: AmazonBedrockAgentOptions):
"""
Constructs an instance of AmazonBedrockAgent with the specified options.
Initializes the agent ID, agent alias ID, and creates a new Bedrock agent runtime client.

:param options: Options to configure the Amazon Bedrock agent.
"""
super().__init__(options)
@@ -46,12 +42,11 @@ async def process_request(
) -> ConversationMessage:
"""
Processes a user request by sending it to the Amazon Bedrock agent for processing.

:param input_text: The user input as a string.
:param user_id: The ID of the user sending the request.
:param session_id: The ID of the session associated with the conversation.
:param chat_history: A list of ConversationMessage objects representing
the conversation history.
the conversation history.
:param additional_params: Optional additional parameters as key-value pairs.
:return: A ConversationMessage object containing the agent's response.
"""
@@ -62,7 +57,6 @@ async def process_request(
sessionId=session_id,
inputText=input_text
)

completion = ""
for event in response['completion']:
if 'chunk' in event:
@@ -71,15 +65,10 @@ async def process_request(
completion += decoded_response
else:
Logger.warn("Received a chunk event with no chunk data")

return ConversationMessage(
role=ParticipantRole.ASSISTANT,
content=[{"text": completion}]
)

except (BotoCoreError, ClientError) as error:
Logger.error(f"Error processing request: {error}")
return ConversationMessage(
role=ParticipantRole.ASSISTANT,
content=[{"text": "Sorry, I encountered an error while processing your request."}]
)
return self.create_error_response("An error occurred while processing your request.", error)
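
With the shared helper in place, a `BotoCoreError` or `ClientError` from the Bedrock agent runtime now surfaces to callers as an assistant-role message rather than an exception. A small sketch of what that looks like from the calling side; the agent and alias IDs are placeholders, and the empty list stands in for chat history:

```python
import asyncio

from multi_agent_orchestrator.agents import AmazonBedrockAgent, AmazonBedrockAgentOptions

agent = AmazonBedrockAgent(AmazonBedrockAgentOptions(
    name='SupportAgent',
    description='Answers support questions via an Amazon Bedrock agent',
    agent_id='YOUR_AGENT_ID',         # placeholder
    agent_alias_id='YOUR_ALIAS_ID'    # placeholder
))

async def main() -> None:
    # On a runtime failure this resolves to the message built by
    # Agent.create_error_response instead of raising.
    reply = await agent.process_request("Where is my order?", 'user123', 'session456', [])
    print(reply.content[0].get('text', ''))

asyncio.run(main())
```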
101 changes: 52 additions & 49 deletions python/src/multi_agent_orchestrator/agents/bedrock_llm_agent.py
@@ -81,65 +81,68 @@ async def process_request(
chat_history: List[ConversationMessage],
additional_params: Optional[Dict[str, str]] = None
) -> Union[ConversationMessage, AsyncIterable[Any]]:

user_message =ConversationMessage(
role=ParticipantRole.USER.value,
content=[{'text': input_text}]
)
try:
user_message = ConversationMessage(
role=ParticipantRole.USER.value,
content=[{'text': input_text}]
)

conversation = [*chat_history, user_message]
conversation = [*chat_history, user_message]

self.update_system_prompt()
self.update_system_prompt()

system_prompt = self.system_prompt

system_prompt = self.system_prompt

if self.retriever:
response = await self.retriever.retrieve_and_combine_results(input_text)
context_prompt = "\nHere is the context to use to answer the user's question:\n" + response
system_prompt += context_prompt

converse_cmd = {
'modelId': self.model_id,
'messages': conversation_to_dict(conversation),
'system': [{'text': system_prompt}],
'inferenceConfig': {
'maxTokens': self.inference_config.get('maxTokens'),
'temperature': self.inference_config.get('temperature'),
'topP': self.inference_config.get('topP'),
'stopSequences': self.inference_config.get('stopSequences'),
if self.retriever:
response = await self.retriever.retrieve_and_combine_results(input_text)
context_prompt = "\nHere is the context to use to answer the user's question:\n" + response
system_prompt += context_prompt

converse_cmd = {
'modelId': self.model_id,
'messages': conversation_to_dict(conversation),
'system': [{'text': system_prompt}],
'inferenceConfig': {
'maxTokens': self.inference_config.get('maxTokens'),
'temperature': self.inference_config.get('temperature'),
'topP': self.inference_config.get('topP'),
'stopSequences': self.inference_config.get('stopSequences'),
}
}
}

if self.guardrail_config:
converse_cmd["guardrailConfig"] = self.guardrail_config
if self.guardrail_config:
converse_cmd["guardrailConfig"] = self.guardrail_config

if self.tool_config:
converse_cmd["toolConfig"] = self.tool_config["tool"]
if self.tool_config:
converse_cmd["toolConfig"] = self.tool_config["tool"]

if self.tool_config:
continue_with_tools = True
final_message: ConversationMessage = {'role': ParticipantRole.USER.value, 'content': []}
max_recursions = self.tool_config.get('toolMaxRecursions', self.default_max_recursions)
if self.tool_config:
continue_with_tools = True
final_message: ConversationMessage = {'role': ParticipantRole.USER.value, 'content': []}
max_recursions = self.tool_config.get('toolMaxRecursions', self.default_max_recursions)

while continue_with_tools and max_recursions > 0:
bedrock_response = await self.handle_single_response(converse_cmd)
conversation.append(bedrock_response)
while continue_with_tools and max_recursions > 0:
bedrock_response = await self.handle_single_response(converse_cmd)
conversation.append(bedrock_response)

if any('toolUse' in content for content in bedrock_response.content):
await self.tool_config['useToolHandler'](bedrock_response, conversation)
else:
continue_with_tools = False
final_message = bedrock_response
if any('toolUse' in content for content in bedrock_response.content):
await self.tool_config['useToolHandler'](bedrock_response, conversation)
else:
continue_with_tools = False
final_message = bedrock_response

max_recursions -= 1
converse_cmd['messages'] = conversation
max_recursions -= 1
converse_cmd['messages'] = conversation

return final_message
return final_message

if self.streaming:
return await self.handle_streaming_response(converse_cmd)
if self.streaming:
return await self.handle_streaming_response(converse_cmd)

return await self.handle_single_response(converse_cmd)
return await self.handle_single_response(converse_cmd)
except Exception as error:
Logger.error("Error in BedrockLLMAgent.process_request:", error)
return self.createErrorResponse("An error occurred while processing your request.", error)

async def handle_single_response(self, converse_input: Dict[str, Any]) -> ConversationMessage:
try:
Expand All @@ -152,7 +155,7 @@ async def handle_single_response(self, converse_input: Dict[str, Any]) -> Conver
)
except Exception as error:
Logger.error("Error invoking Bedrock model:", error)
raise
return self.create_error_response("An error occurred while processing your request with the Bedrock model.", error)

async def handle_streaming_response(self, converse_input: Dict[str, Any]) -> ConversationMessage:
try:
@@ -168,7 +171,7 @@ async def handle_streaming_response(self, converse_input: Dict[str, Any]) -> Con
)
except Exception as error:
Logger.error("Error getting stream from Bedrock model:", error)
raise
return self.create_error_response("An error occurred while streaming the response from the Bedrock model.", error)

def set_system_prompt(self,
template: Optional[str] = None,
@@ -192,4 +195,4 @@ def replace(match):
return '\n'.join(value) if isinstance(value, list) else str(value)
return match.group(0)

return re.sub(r'{{(\w+)}}', replace, template)
return re.sub(r'{{(\w+)}}', replace, template)
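
Because `handle_single_response` and `handle_streaming_response` now return `create_error_response(...)` instead of re-raising, callers of `BedrockLLMAgent.process_request` receive a normal `ConversationMessage` on failure. A hedged sketch of caller-side branching under that assumption; the prefix check mirrors the error text defined in `agent.py` above and is illustrative, not a library API:

```python
import asyncio

from multi_agent_orchestrator.agents import BedrockLLMAgent, BedrockLLMAgentOptions
from multi_agent_orchestrator.types import ConversationMessage

agent = BedrockLLMAgent(BedrockLLMAgentOptions(
    name='Assistant',
    description='General-purpose assistant'
))

async def ask(text: str) -> None:
    result = await agent.process_request(text, 'user123', 'session456', [])
    if isinstance(result, ConversationMessage):
        reply = result.content[0].get('text', '')
        if reply.startswith("Sorry, I encountered an error"):
            # Failures now arrive as data, not exceptions.
            print("Agent reported an error:", reply)
        else:
            print("Reply:", reply)
    else:
        # Streaming mode: result is an async iterable of chunks.
        async for chunk in result:
            print(chunk, end="")

asyncio.run(ask("Hello!"))
```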