diff --git a/poetry.lock b/poetry.lock index efce317..d370e49 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1205,6 +1205,17 @@ docs = ["myst-parser (==0.18.0)", "sphinx (==5.1.1)"] ssh = ["paramiko (>=2.4.3)"] websockets = ["websocket-client (>=1.3.0)"] +[[package]] +name = "docstring-parser" +version = "0.16" +description = "Parse Python docstrings in reST, Google and Numpydoc format" +optional = false +python-versions = ">=3.6,<4.0" +files = [ + {file = "docstring_parser-0.16-py3-none-any.whl", hash = "sha256:bf0a1387354d3691d102edef7ec124f219ef639982d096e26e3b60aeffa90637"}, + {file = "docstring_parser-0.16.tar.gz", hash = "sha256:538beabd0af1e2db0146b6bd3caa526c35a34d61af9fd2887f3a8a27a739aa6e"}, +] + [[package]] name = "drawsvg" version = "2.4.0" @@ -1838,6 +1849,38 @@ files = [ {file = "iniconfig-2.0.0.tar.gz", hash = "sha256:2d91e135bf72d31a410b17c16da610a82cb55f6b0477d1a902134b24a455b8b3"}, ] +[[package]] +name = "instructor" +version = "1.4.2" +description = "structured outputs for llm" +optional = false +python-versions = "<4.0,>=3.9" +files = [ + {file = "instructor-1.4.2-py3-none-any.whl", hash = "sha256:e3c4791a9346eaa55877dc432024d6cf87d75ed03ab592b4cfc6fb201028af63"}, + {file = "instructor-1.4.2.tar.gz", hash = "sha256:1568bdbcae40eda8505be0e971da8e4c1c4cad7e419dbfb9756df52bc4f69808"}, +] + +[package.dependencies] +aiohttp = ">=3.9.1,<4.0.0" +docstring-parser = ">=0.16,<0.17" +jiter = ">=0.5.0,<0.6.0" +openai = ">=1.45.0,<2.0.0" +pydantic = ">=2.8.0,<3.0.0" +pydantic-core = ">=2.18.0,<3.0.0" +rich = ">=13.7.0,<14.0.0" +tenacity = ">=8.4.1,<9.0.0" +typer = ">=0.9.0,<1.0.0" + +[package.extras] +anthropic = ["anthropic (>=0.27.0,<0.28.0)", "xmltodict (>=0.13.0,<0.14.0)"] +cohere = ["cohere (>=5.1.8,<6.0.0)"] +google-generativeai = ["google-generativeai (>=0.5.4,<0.6.0)"] +groq = ["groq (>=0.4.2,<0.5.0)"] +litellm = ["litellm (>=1.35.31,<2.0.0)"] +mistralai = ["mistralai (>=0.1.8,<0.2.0)"] +test-docs = ["anthropic (>=0.27.0,<0.28.0)", "cohere 
(>=5.1.8,<6.0.0)", "diskcache (>=5.6.3,<6.0.0)", "fastapi (>=0.109.2,<0.110.0)", "groq (>=0.4.2,<0.5.0)", "litellm (>=1.35.31,<2.0.0)", "mistralai (>=0.1.8,<0.2.0)", "pandas (>=2.2.0,<3.0.0)", "pydantic_extra_types (>=2.6.0,<3.0.0)", "redis (>=5.0.1,<6.0.0)", "tabulate (>=0.9.0,<0.10.0)"] +vertexai = ["google-cloud-aiplatform (>=1.53.0,<2.0.0)", "jsonref (>=1.1.0,<2.0.0)"] + [[package]] name = "ipython" version = "8.27.0" @@ -2939,13 +2982,13 @@ sympy = "*" [[package]] name = "openai" -version = "1.43.0" +version = "1.46.0" description = "The official Python library for the openai API" optional = false python-versions = ">=3.7.1" files = [ - {file = "openai-1.43.0-py3-none-any.whl", hash = "sha256:1a748c2728edd3a738a72a0212ba866f4fdbe39c9ae03813508b267d45104abe"}, - {file = "openai-1.43.0.tar.gz", hash = "sha256:e607aff9fc3e28eade107e5edd8ca95a910a4b12589336d3cbb6bfe2ac306b3c"}, + {file = "openai-1.46.0-py3-none-any.whl", hash = "sha256:8e423690b121d0268c7bb83b552e14f339b0ba250e1d0f70d145c194e79c4e1b"}, + {file = "openai-1.46.0.tar.gz", hash = "sha256:0c5a783530d7cd90e2370dbd52d9239d2d53dc7a0badf9ee1e2e23d3f148969b"}, ] [package.dependencies] @@ -4720,13 +4763,13 @@ types-pyyaml = ">=6.0.12.12,<7.0.0.0" [[package]] name = "tenacity" -version = "9.0.0" +version = "8.5.0" description = "Retry code until it succeeds" optional = false python-versions = ">=3.8" files = [ - {file = "tenacity-9.0.0-py3-none-any.whl", hash = "sha256:93de0c98785b27fcf659856aa9f54bfbd399e29969b0621bc7f762bd441b4539"}, - {file = "tenacity-9.0.0.tar.gz", hash = "sha256:807f37ca97d62aa361264d497b0e31e92b8027044942bfa756160d908320d73b"}, + {file = "tenacity-8.5.0-py3-none-any.whl", hash = "sha256:b594c2a5945830c267ce6b79a166228323ed52718f30302c1359836112346687"}, + {file = "tenacity-8.5.0.tar.gz", hash = "sha256:8bc6c0c8a09b31e6cad13c47afbed1a567518250a9a171418582ed8d9c20ca78"}, ] [package.extras] @@ -5745,4 +5788,4 @@ type = ["pytest-mypy"] [metadata] lock-version = "2.0" python-versions = 
">=3.12.4,<3.13" -content-hash = "d759bfcd47a6b7a833326a10ca889fa8bffc72420d6c1661fbe54735d988f001" +content-hash = "b1d3ab5985db81d68abebbcd1ae994ed3d3f1a058270dbc69c0f0993a73c62dd" diff --git a/pyproject.toml b/pyproject.toml index 986c4ba..d9f6fa7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -21,6 +21,7 @@ azure-core = "^1.30.2" drawsvg = {extras = ["all"], version = "^2.4.0"} svgpathtools = "^1.6.1" azure-cosmos = "^4.7.0" +instructor = "^1.4.2" [tool.poetry.group.dev.dependencies] pytest = "^7.4.0" diff --git a/src/Spec.py b/src/Spec.py deleted file mode 100644 index 247c474..0000000 --- a/src/Spec.py +++ /dev/null @@ -1,23 +0,0 @@ -import json -from typing import List, Union, Literal -from pydantic import BaseModel -from svg_to_png.lib.ThreatModel import User_Friendly_Block_Types - - -class Spec(BaseModel): - id: int - spec: str - instructions_to_solve: str - improvement_hints: str - hints_to_send: Union[List[User_Friendly_Block_Types], Literal["all"]] = "all" - - def build_instruction(self): - return f"""Spec id {self.id} - {self.spec}\n{self.instructions_to_solve} -If the criteria is not met, then {self.improvement_hints} -""" - - -def load_specs_from_json(file_path: str) -> List[Spec]: - with open(file_path, "r") as file: - specs_data = json.load(file) - return [Spec(**spec) for spec in specs_data] diff --git a/src/autogen_planner.py b/src/autogen_planner.py index 7c8915d..344359e 100644 --- a/src/autogen_planner.py +++ b/src/autogen_planner.py @@ -1,17 +1,18 @@ import json import re -from typing import List, Callable, Optional, Union -from datetime import datetime from dataclasses import dataclass -from dataclasses_json import dataclass_json -from botbuilder.schema import Attachment +from datetime import datetime +from typing import Callable, List, Optional, Union + +from autogen import Agent, ChatResult, GroupChat, GroupChatManager, runtime_logging from botbuilder.core import CardFactory, TurnContext -from teams.ai.prompts import Message -from 
teams.ai.planners import Planner, Plan, PredictedSayCommand -from autogen import Agent, GroupChat, GroupChatManager, ChatResult, runtime_logging +from botbuilder.schema import Attachment +from config import Config +from dataclasses_json import dataclass_json from state import AppTurnState +from teams.ai.planners import Plan, Planner, PredictedSayCommand +from teams.ai.prompts import Message from teams_user_proxy import TeamsUserProxy -from config import Config @dataclass_json @@ -37,10 +38,13 @@ def __init__( self.message_builder = message_builder super().__init__() - async def begin_task(self, context, state: AppTurnState): + async def begin_task(self, context: TurnContext, state: AppTurnState): return await self.continue_task(context, state) - async def continue_task(self, context, state: AppTurnState): + async def continue_task(self, context: TurnContext, state: AppTurnState): + return await self.run_with_autogen(context, state) + + async def run_with_autogen(self, context: TurnContext, state: AppTurnState): user_proxy = TeamsUserProxy( name="User", system_message="A human admin. This agent is a proxy for the user. 
This agent can help answer questions too.", @@ -95,6 +99,7 @@ async def continue_task(self, context, state: AppTurnState): ) if Config.ENABLE_RUNTIME_LOGGING: + print('Logging is enabled') runtime_logging.start() chat_result = await user_proxy.a_initiate_chat( @@ -157,7 +162,6 @@ async def continue_task(self, context, state: AppTurnState): ] ) - def create_chat_history_ac(message: ChatResult) -> Attachment: facts = [] for value in message.chat_history: diff --git a/src/bot.py b/src/bot.py index ec1502f..8aca79a 100644 --- a/src/bot.py +++ b/src/bot.py @@ -5,16 +5,26 @@ Description: initialize the app and listen for `message` activitys """ -from typing import Dict import sys import traceback -from autogen import GroupChat, Agent -from botbuilder.schema import Activity, ActivityTypes +from typing import Dict -from botbuilder.core import TurnContext, MemoryStorage, InvokeResponse +from autogen import Agent, GroupChat +from autogen_planner import AutoGenPlanner, PredictedSayCommandWithAttachments +from autogen_utils import ( + AppTurnStateConversationState, + TurnChatContext, + TypingCapability, +) +from botbuilder.core import InvokeResponse, MemoryStorage, TurnContext +from botbuilder.schema import Activity, ActivityTypes +from config import Config +from cosmos_memory_storage import CosmosDbPartitionedStorage +from privacy_review_assistant_group import PrivacyReviewAssistantGroup +from state import AppTurnState from teams import Application, ApplicationOptions, TeamsAdapter from teams.ai import AIOptions -from teams.ai.actions import ActionTypes, ActionTurnContext +from teams.ai.actions import ActionTurnContext, ActionTypes from teams.feedback_loop_data import FeedbackLoopData from teams.teams_attachment_downloader.teams_attachment_downloader import ( TeamsAttachmentDownloader, @@ -22,15 +32,6 @@ from teams.teams_attachment_downloader.teams_attachment_downloader_options import ( TeamsAttachmentDownloaderOptions, ) -from autogen_planner import AutoGenPlanner, 
"""Pydantic models and Adaptive Card builders for threat-model spec review."""

import json
from typing import Annotated, Dict, List, Literal, Union

from pydantic import BaseModel
from svg_to_png.lib.ThreatModel import User_Friendly_Block_Types

# Either an explicit list of block types to send hints about, or "all" of them.
Hints_To_Send = Union[List[User_Friendly_Block_Types], Literal["all"]]


class Spec(BaseModel):
    """One review criterion loaded from specs.json."""

    id: int
    spec: str
    instructions_to_solve: str
    improvement_hints: str
    hints_to_send: Hints_To_Send = "all"

    def build_instruction(self):
        """Render this spec as a prompt instruction for the evaluator."""
        return f"""Spec id {self.id} - {self.spec}\n{self.instructions_to_solve}
If the criteria is not met, then {self.improvement_hints}
"""


class SpecAnswer(BaseModel):
    """Structured LLM verdict for a single spec (instructor response model).

    The Annotated strings are prompt-visible field descriptions — do not edit
    them casually; they steer the model's output.
    """

    spec_id: Annotated[int, "The spec id to answer"]
    detailed_answer: Annotated[
        str,
        "Does the threat model meet the spec criteria? Why or why not? Be helpful and specific.",
    ]
    steps_to_improve: Annotated[
        str,
        "What are exact steps to improve the threat model to meet the spec criteria? Use 'None' if no steps to improve",
    ]
    tag: Annotated[
        Union[
            Annotated[Literal["green"], "Spec criteria is met"],
            Annotated[Literal["red"], "Items needs to be fixed to meet criteria"],
            Annotated[Literal["yellow"], "Criteria is met but can be improved"],
        ],
        "The tag of the spec answer",
    ]


class SpecResult(BaseModel):
    """A spec paired with the model's answer for it."""

    spec: Spec
    answer: SpecAnswer


def load_specs_from_json(file_path: str) -> List[Spec]:
    """Load and validate the list of specs from a JSON file."""
    with open(file_path, "r") as fh:
        raw_specs = json.load(fh)
    return [Spec(**raw) for raw in raw_specs]


# Emoji shown next to each verdict tag in the Adaptive Card.
tag_with_headers = {"green": "✅", "red": "❌", "yellow": "⚠️"}


def build_container_for_answer(spec: Spec, spec_answer: SpecAnswer):
    """Build the Adaptive Card container for one spec verdict.

    Layout: bold spec text on top, then a two-column row with the tag emoji on
    the left and the detailed answer (plus optional improvement steps) on the
    right.
    """
    answer_column_items = [
        {
            "type": "TextBlock",
            "text": spec_answer.detailed_answer,
            "wrap": True,
        }
    ]
    steps = spec_answer.steps_to_improve
    # The model signals "nothing to improve" with the literal string 'None'.
    if steps and steps != "None":
        answer_column_items += [
            {
                "type": "TextBlock",
                "text": "Steps to improve:",
                "wrap": True,
                "weight": "Bolder",
            },
            {
                "type": "TextBlock",
                "text": steps,
                "wrap": True,
            },
        ]
    header = {
        "type": "TextBlock",
        "text": spec.spec,
        "wrap": True,
        "weight": "Bolder",
    }
    tag_column = {
        "type": "Column",
        "width": "auto",
        "items": [
            {
                "type": "TextBlock",
                "text": tag_with_headers[spec_answer.tag],
                "wrap": True,
            }
        ],
    }
    answer_column = {
        "type": "Column",
        "width": "stretch",
        "items": answer_column_items,
    }
    return {
        "type": "Container",
        "items": [
            header,
            {
                "type": "ColumnSet",
                "columns": [tag_column, answer_column],
            },
        ],
        "separator": True,
    }


def build_adaptive_card(spec_answer_containers: List[Dict]):
    """Wrap the per-spec containers in a complete Adaptive Card payload."""
    heading = {
        "type": "TextBlock",
        "text": "Threat model review",
        "wrap": True,
        "weight": "Bolder",
        "style": "heading",
    }
    return {
        "type": "AdaptiveCard",
        "$schema": "https://adaptivecards.io/schemas/adaptive-card.json",
        "version": "1.5",
        "body": [heading, *spec_answer_containers],
    }
class ThreatModelValidator:
    """Validates a threat model against the spec list with direct
    structured-output LLM calls (via instructor), without an autogen chat.
    """

    def __init__(self, llm_config: Dict):
        """Build an instructor-wrapped async OpenAI/Azure client.

        Args:
            llm_config: must contain "model"; for plain OpenAI ("api_type"
                absent) it must contain "api_key"; for Azure it may supply
                "base_url", "api_version" and "azure_ad_token_provider".

        Raises:
            ValueError: model/credentials missing, or unrecognized api_type.
        """
        # Raise instead of assert: asserts are stripped under `python -O`.
        if "model" not in llm_config:
            raise ValueError("Model is required in the config")
        self._model = llm_config.get("model")
        if llm_config.get("api_type") is None:
            if "api_key" not in llm_config:
                raise ValueError("Open AI API key is required")
            logging.debug("Using OpenAI API key")
            self.client = instructor.from_openai(
                OpenAI(api_key=llm_config.get("api_key"))
            )
        elif llm_config.get("api_type") == "azure":
            logging.debug("Using Azure API key")
            # NOTE(review): only AAD token auth is wired up here; an Azure
            # "api_key" in llm_config would be ignored — confirm intended.
            self.client = instructor.from_openai(
                AzureOpenAI(
                    base_url=llm_config.get("base_url", ""),
                    api_version=llm_config.get("api_version", ""),
                    azure_ad_token_provider=llm_config.get("azure_ad_token_provider"),
                )
            )
        else:
            # Fail fast: the original fell through and left self.client unset,
            # deferring the failure to an AttributeError on first use.
            raise ValueError(
                f"Unsupported api_type: {llm_config.get('api_type')!r}"
            )
        # NOTE(review): configuring the root logger from a constructor is a
        # global side effect; consider moving this to application startup.
        logging.basicConfig(level=logging.INFO)

    async def _get_threat_model_details(self, state: ConversationState) -> Optional[ThreatModel]:
        """Load the threat model from the conversation's attached XML/SVG file,
        or return None when no file is available."""
        threat_model_file_bytes = get_threat_model_xml_file(state)
        if not threat_model_file_bytes:
            return None
        svg_str = threat_model_file_bytes.decode("utf-8")
        return load_threat_model(svg_content=svg_str)

    async def _get_specs(self) -> List[Spec]:
        """Load the review specs shipped next to this module (specs.json)."""
        folder = os.path.dirname(os.path.abspath(__file__))
        return load_specs_from_json(f"{folder}/specs.json")

    async def _resolve_single_spec(self, threat_model: ThreatModel, spec: Spec) -> SpecAnswer:
        """Ask the LLM whether the threat model satisfies one spec and return
        the structured SpecAnswer."""
        logging.debug("Resolving spec %s", spec.id)
        threat_model_details = f"""The file details for the file you need to validate are:
--------
1. The data for the nodes is: {threat_model.get_node_data()}.
--------
2. The list of label names is {threat_model.get_label_names()}.
--------
3. The list of nodes with labels between them is {threat_model.get_node_label_pair_data()}.
--------
4. The list of boundary names is {threat_model.get_boundary_names()}."""

        spec_question = f"""Now, based on the given details of the spec model, see if it fulfills this criteria
Spec id {spec.id}
{spec.instructions_to_solve}
"""

        result = await self.client.chat.completions.create(
            # BUG FIX: use the configured model; the original hardcoded
            # "gpt-4o-mini" and silently ignored llm_config["model"].
            model=self._model,
            response_model=SpecAnswer,
            messages=[
                {
                    "role": "system",
                    "content": _SYSTEM_MESSAGE,
                },
                {
                    "role": "user",
                    "content": threat_model_details,
                },
                # NOTE(review): the question is sent with role "assistant" —
                # looks like it may have been meant as "user"; confirm before
                # changing, as it alters the prompt.
                {"role": "assistant", "content": spec_question},
            ],
        )

        logging.debug("Resolved spec answer %s", result)

        return result

    async def _resolve_threat_model(self, threat_model: ThreatModel, specs: List[Spec]) -> List[SpecResult]:
        """Resolve all specs concurrently and pair each answer with its spec,
        sorted by spec id."""
        spec_answers = await asyncio.gather(
            *(self._resolve_single_spec(threat_model, spec) for spec in specs)
        )
        spec_results: List[SpecResult] = []
        for spec_answer in spec_answers:
            spec_id = spec_answer.spec_id
            # BUG FIX: next() without a default raises StopIteration when the
            # model returns an unknown spec id, which made the warning branch
            # below unreachable; a None default makes it actually execute.
            spec_question = next((s for s in specs if s.id == spec_id), None)
            if not spec_question:
                logging.warning("Spec with id %s not found", spec_id)
                continue
            spec_results.append(SpecResult(spec=spec_question, answer=spec_answer))
        return sorted(spec_results, key=lambda r: r.spec.id)

    async def handle_request(self, request: ChatContext, state: ConversationState) -> Optional[Dict]:
        """Validate the conversation's threat model against all specs.

        Returns an Adaptive Card dict with one container per spec verdict, or
        None when no threat model file is attached to the conversation.
        """
        threat_model_details = await self._get_threat_model_details(state)
        if not threat_model_details:
            # TODO: Handle — surface a user-facing "no threat model" message.
            return None

        specs = await self._get_specs()

        results = await self._resolve_threat_model(threat_model_details, specs)

        logging.debug("Results %s", results)
        containers = [
            build_container_for_answer(result.spec, result.answer)
            for result in results
        ]
        return build_adaptive_card(containers)
b/src/privacy_review_assistant_group.py @@ -1,18 +1,21 @@ -from autogen import AssistantAgent, GroupChat, Agent +from typing import Optional +from autogen import Agent, AssistantAgent, GroupChat +from autogen_utils import StoppableAgentCapability +from conversation_state import ChatContext, ConversationState from rag_agents import setup_rag_assistant -from visualizer_agent import setup_visualizer_agent from threat_model_visualizer import ThreatModelImageVisualizerCapability -from typing import Optional +from visualizer_agent import setup_visualizer_agent from xml_threat_model_reviewer import ( setup_xml_threat_model_reviewer as setup_xml_threat_model_reviewer_single_prompt, ) from xml_threat_model_reviewer2 import ( setup_xml_threat_model_reviewer as setup_xml_threat_model_reviewer_multi_prompt, ) +from xml_threat_model_reviewer_no_autogen import ( + setup_xml_threat_model_reviewer as setup_xml_threat_model_reviewer_no_autogen, +) -from conversation_state import ConversationState, ChatContext -from autogen_utils import StoppableAgentCapability class PrivacyReviewAssistantGroup: def __init__(self, llm_config): @@ -31,6 +34,10 @@ def group_chat_builder( threat_modeling_assistant = setup_visualizer_agent( self.llm_config, context, state ) + elif threat_model_evaluator_type == "no_autogen": + threat_modeling_assistant = setup_xml_threat_model_reviewer_no_autogen( + self.llm_config, context, state + ) else: threat_modeling_assistant = setup_xml_threat_model_reviewer_multi_prompt( self.llm_config, context, state, typing_capability diff --git a/src/state.py b/src/state.py index 0b44404..81d1bfd 100644 --- a/src/state.py +++ b/src/state.py @@ -3,11 +3,11 @@ Licensed under the MIT License. 
""" -from typing import Optional, List, Dict, Union, Literal +from datetime import datetime +from typing import Dict, List, Literal, Optional, Union from botbuilder.core import Storage, TurnContext from teams.state import ConversationState, TempState, TurnState, UserState -from datetime import datetime class AppConversationState(ConversationState): @@ -17,7 +17,7 @@ class AppConversationState(ConversationState): started_waiting_for_user_input_at: Optional[Union[datetime, str]] = None spec_url: Optional[str] = None threat_model_evaluator: Union[ - Literal["visual"], Literal["xml_single_prompt"], Literal["xml_multi_prompt"] + Literal["visual"], Literal["xml_single_prompt"], Literal["xml_multi_prompt"], Literal["no_autogen"] ] activity_id: Optional[str] = None diff --git a/src/threat_model_evaluation_script.py b/src/threat_model_evaluation_script.py index 7563957..7f50b0f 100644 --- a/src/threat_model_evaluation_script.py +++ b/src/threat_model_evaluation_script.py @@ -1,12 +1,14 @@ import argparse import asyncio -from config import Config -from privacy_review_assistant_group import PrivacyReviewAssistantGroup -from conversation_state import LocalChatContext, LocalConversationState -from autogen import GroupChatManager, ConversableAgent -from typing import Dict, Optional, Union, Literal import base64 import io +from typing import Dict, Literal, Optional, Union + +from autogen import ConversableAgent, GroupChatManager +from config import Config +from conversation_state import LocalChatContext, LocalConversationState +from privacy_review_assistant_group import PrivacyReviewAssistantGroup + def data_uri_to_bytes(data_uri) -> bytes: # Strip the data URI prefix @@ -80,7 +82,7 @@ def always_terminate(self, messages, sender, config): parser.add_argument("output_result_text_file", help="Path to the output result text file") parser.add_argument( "evaluation_type", - choices=["xml_single_prompt", "xml_multi_prompt", "visual"], + choices=["xml_single_prompt", "xml_multi_prompt", 
"visual", "no_autogen"], default="xml_multi_prompt", help="Type of evaluation (default: xml_multi_prompt)", ) diff --git a/src/xml_threat_model_reviewer.py b/src/xml_threat_model_reviewer.py index ca5b1bf..1e75209 100644 --- a/src/xml_threat_model_reviewer.py +++ b/src/xml_threat_model_reviewer.py @@ -1,19 +1,17 @@ import os -from PIL import Image +from asyncio import ensure_future + from autogen.agentchat import AssistantAgent -from autogen.agentchat.contrib.multimodal_conversable_agent import ConversableAgent from autogen.agentchat.contrib.capabilities.agent_capability import AgentCapability from autogen.agentchat.contrib.img_utils import pil_to_data_uri - -from Spec import Spec, load_specs_from_json +from autogen.agentchat.contrib.multimodal_conversable_agent import ConversableAgent +from conversation_state import ChatContext, ConversationState +from models import load_specs_from_json +from PIL import Image from svg_to_png.svg_to_png import load_threat_model -from asyncio import ensure_future - from threat_model_file_utils import get_threat_model_xml_file from threat_model_visualizer import ThreatModelImageVisualizer -from conversation_state import ConversationState, ChatContext - class ThreatModelDataExtractor(ThreatModelImageVisualizer): def __init__(self, context: ChatContext, state: ConversationState): diff --git a/src/xml_threat_model_reviewer2.py b/src/xml_threat_model_reviewer2.py index 5b6f0de..79c7fae 100644 --- a/src/xml_threat_model_reviewer2.py +++ b/src/xml_threat_model_reviewer2.py @@ -1,14 +1,18 @@ -import asyncio -import os import json -from typing import List, Annotated, Dict, Tuple, Union, Literal -from autogen import AssistantAgent, ConversableAgent, Agent -from autogen.agentchat.contrib.capabilities.agent_capability import AgentCapability -from autogen_utils import ImmediateExecutorCapability, TypingCapability -from pydantic import BaseModel +import os +from typing import Annotated, Dict, List, Literal, Tuple, Union +from autogen import 
Agent, AssistantAgent, ConversableAgent +from autogen.agentchat.contrib.capabilities.agent_capability import AgentCapability +from autogen_utils import ImmediateExecutorCapability +from models import ( + Spec, + SpecAnswer, + build_adaptive_card, + build_container_for_answer, + load_specs_from_json, +) from xml_threat_model_reviewer import XMLThreatModelImageAddToMessageCapability -from Spec import Spec, load_specs_from_json folder = os.path.dirname(os.path.abspath(__file__)) specs = load_specs_from_json(f"{folder}/specs.json") @@ -18,29 +22,6 @@ def build_instruction(spec: Spec): return f"""Now, based on the given details of the spec model, see if it fulfills this criteria\nSpec id {spec.id}\n{spec.instructions_to_solve} """ - -tag_with_headers = {"green": "✅", "red": "❌", "yellow": "⚠️"} - - -class SpecAnswer(BaseModel): - spec_id: Annotated[int, "The spec id to answer"] - detailed_answer: Annotated[ - str, - "Does the threat model meet the spec criteria? Why or why not? Be helpful and specific.", - ] - steps_to_improve: Annotated[ - str, - "What are exact steps to improve the threat model to meet the spec criteria? 
Use 'None' if no steps to improve", - ] - tag: Annotated[ - Union[ - Annotated[Literal["green"], "Spec criteria is met"], - Annotated[Literal["red"], "Items needs to be fixed to meet criteria"], - Annotated[Literal["yellow"], "Criteria is met but can be improved"], - ], - "The tag of the spec answer", - ] - class EvaluateSpecCapability(AgentCapability): def __init__(self, specs: List[Spec]): self.specs = specs @@ -58,8 +39,6 @@ def new_term_msg(x): agent._is_termination_msg = new_term_msg async def _send_question(self, self2, messages, sender, config): - # wait for 2 seconds - await asyncio.sleep(2) if self.spec_index < len(self.specs): print(f"Sending question for spec {self.specs[self.spec_index].id}") message = f"{build_instruction(self.specs[self.spec_index])}" @@ -198,80 +177,3 @@ def set_default(obj): return assistant - -def build_container_for_answer(spec: Spec, spec_answer: SpecAnswer): - answer_items = [ - { - "type": "TextBlock", - "text": spec_answer.detailed_answer, - "wrap": True, - } - ] - if spec_answer.steps_to_improve != "None" and spec_answer.steps_to_improve: - answer_items.append( - { - "type": "TextBlock", - "text": "Steps to improve:", - "wrap": True, - "weight": "Bolder", - } - ) - answer_items.append( - { - "type": "TextBlock", - "text": spec_answer.steps_to_improve, - "wrap": True, - } - ) - return { - "type": "Container", - "items": [ - { - "type": "TextBlock", - "text": spec.spec, - "wrap": True, - "weight": "Bolder", - }, - { - "type": "ColumnSet", - "columns": [ - { - "type": "Column", - "width": "auto", - "items": [ - { - "type": "TextBlock", - "text": tag_with_headers[spec_answer.tag], - "wrap": True, - } - ], - }, - { - "type": "Column", - "width": "stretch", - "items": answer_items, - }, - ], - }, - ], - "separator": True, - } - - -def build_adaptive_card(spec_answer_containers: List[Dict]): - body = [ - { - "type": "TextBlock", - "text": "Threat model review", - "wrap": True, - "weight": "Bolder", - "style": "heading", - }, - 
"""Threat-model evaluator agent that bypasses autogen's LLM reply pipeline and
delegates directly to ThreatModelValidator."""

import json
import os
from typing import Dict

from autogen import Agent, AssistantAgent, ConversableAgent
from autogen.agentchat.contrib.capabilities.agent_capability import AgentCapability
from conversation_state import ChatContext, ConversationState
from models import (
    Spec,
    load_specs_from_json,
)
from no_autogen_spec_solver import ThreatModelValidator

folder = os.path.dirname(os.path.abspath(__file__))
# NOTE(review): loaded at import time (file I/O side effect). Neither `specs`
# nor `build_instruction` is referenced in this module — kept for parity with
# the other reviewer modules; TODO confirm they can be removed.
specs = load_specs_from_json(f"{folder}/specs.json")


def build_instruction(spec: Spec):
    """Render the evaluation prompt for one spec (mirrors the other reviewers)."""
    return f"""Now, based on the given details of the spec model, see if it fulfills this criteria\nSpec id {spec.id}\n{spec.instructions_to_solve}
"""


class ValidateSpecsWithoutAutogenCapability(AgentCapability):
    """Replaces the agent's entire reply stack with a direct spec validation."""

    def __init__(
        self, llm_config: Dict, context: ChatContext, state: ConversationState
    ):
        self.context = context
        self.state = state
        self.validator = ThreatModelValidator(llm_config)
        super().__init__()

    def add_to_agent(self, agent: ConversableAgent):
        # remove_other_reply_funcs=True makes _validate the agent's only reply.
        agent.register_reply(
            [Agent, None], self._validate, remove_other_reply_funcs=True
        )

    async def _validate(self, self2, messages, sender, config):
        """Reply hook: run the validator and return an adaptive-card payload.

        Returns autogen's (final, reply) pair as a list; the "adaptive_card:"
        prefix is the marker the downstream planner looks for.
        """
        result = await self.validator.handle_request(self.context, self.state)
        if result is None:
            return [True, "Unable to validate threat model"]

        def set_default(obj):
            # json.dumps fallback: serialize sets as lists, reject the rest.
            if isinstance(obj, set):
                return list(obj)
            raise TypeError

        return [
            True,
            f"adaptive_card:{json.dumps(result, ensure_ascii=False, default=set_default)}",
        ]


class ClearHistoryCapability(AgentCapability):
    """Keeps only the most recent message before the agent replies."""

    def __init__(self):
        super().__init__()

    def add_to_agent(self, agent: ConversableAgent):
        agent.register_hook("process_all_messages_before_reply", self._clear_history)

    def _clear_history(self, messages):
        # BUG FIX: slice instead of indexing so an empty history returns []
        # rather than raising IndexError ([messages[-1]] crashed on []).
        return messages[-1:]


def setup_xml_threat_model_reviewer(llm_config, context, state):
    """Build the no-autogen threat-model evaluator agent.

    The agent is created without an LLM config on purpose: its only reply
    function is the capability's _validate, which calls the validator directly.
    """
    assistant = AssistantAgent(
        name="Threat_Model_Evaluator",
        description="An agent that evaluates the quality of a threat model.",
    )

    capability = ValidateSpecsWithoutAutogenCapability(llm_config, context, state)
    capability.add_to_agent(assistant)

    return assistant