From c79172b4df9176cbfa7155bfdeac655c5f69520f Mon Sep 17 00:00:00 2001 From: Alexander Myasoedov Date: Sat, 22 Feb 2025 12:13:31 +0200 Subject: [PATCH] feat(add pydantic_ai): --- agentic_security/agents/operator_pydantic.py | 238 +++++++++++++++++++ 1 file changed, 238 insertions(+) create mode 100644 agentic_security/agents/operator_pydantic.py diff --git a/agentic_security/agents/operator_pydantic.py b/agentic_security/agents/operator_pydantic.py new file mode 100644 index 0000000..9af09e6 --- /dev/null +++ b/agentic_security/agents/operator_pydantic.py @@ -0,0 +1,238 @@ +import asyncio +import logging +from typing import Any, List, Dict + +import httpx +from pydantic import BaseModel, Field, ConfigDict +from pydantic_ai import Agent, Tool, RunContext + +# Assuming LLMSpec is defined elsewhere; placeholder import +from agentic_security.http_spec import LLMSpec + +LLM_SPECS = [] # Populate this list with LLM spec strings if needed + +# Configure logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +# Define AgentSpecification model +class AgentSpecification(BaseModel): + name: str | None = Field(None, description="Name of the LLM/agent") + version: str | None = Field(None, description="Version of the LLM/agent") + description: str | None = Field(None, description="Description of the LLM/agent") + capabilities: List[str] | None = Field(None, description="List of capabilities") + configuration: Dict[str, Any] | None = Field( + None, description="Configuration settings" + ) + endpoint: str | None = Field(None, description="Endpoint URL of the deployed agent") + + model_config = ConfigDict(arbitrary_types_allowed=True) + + +# Define OperatorToolBox class +class OperatorToolBox: + def __init__(self, spec: AgentSpecification, datasets: List[Dict[str, Any]]): + self.spec = spec + self.datasets = datasets + self.failures = [] + self.llm_specs = [LLMSpec.from_string(spec) for spec in LLM_SPECS] + + def get_spec(self) -> AgentSpecification: + return self.spec + + def get_datasets(self) -> List[Dict[str, Any]]: + return self.datasets + + def validate(self) -> bool: + if not self.spec.name or not self.spec.version: + self.failures.append("Invalid specification: Name or version is missing.") + return False + if not self.datasets: + self.failures.append("No datasets provided.") + return False + return True + + def stop(self) -> None: + logger.info("Stopping the toolbox...") + + def run(self) -> None: + logger.info("Running the toolbox...") + + def get_results(self) -> List[Dict[str, Any]]: + return self.datasets + + def get_failures(self) -> List[str]: + return self.failures + + def run_operation(self, operation: str) -> str: + if operation not in ["dataset1", "dataset2", "dataset3"]: + self.failures.append(f"Operation '{operation}' failed: Dataset not found.") + return f"Operation '{operation}' failed: Dataset not found." + return f"Operation '{operation}' executed successfully." + + async def test_llm_spec(self, llm_spec: LLMSpec, user_prompt: str) -> str: + try: + response = await llm_spec.verify() + response.raise_for_status() + logger.info(f"Verification succeeded for {llm_spec.url}") + + test_response = await llm_spec.probe(user_prompt) + test_response.raise_for_status() + response_data = test_response.json() + return f"Test succeeded for {llm_spec.url}: {response_data}" + except httpx.HTTPStatusError as e: + self.failures.append(f"HTTP error occurred: {e}") + logger.error(f"Test failed for {llm_spec.url}: {e}") + return f"Test failed for {llm_spec.url}: {e}" + except Exception as e: + self.failures.append(f"An error occurred: {e}") + logger.error(f"Test failed for {llm_spec.url}: {e}") + return f"Test failed for {llm_spec.url}: {e}" + + async def test_with_prompt(self, spec_index: int, user_prompt: str) -> str: + if not 0 <= spec_index < len(self.llm_specs): + return f"Invalid spec index: {spec_index}. Valid range is 0 to {len(self.llm_specs) - 1}" + llm_spec = self.llm_specs[spec_index] + return await self.test_llm_spec(llm_spec, user_prompt) + + +# Define the Agent +class DatasetManagerAgent(Agent): + model: str = "deepseek:chat" + system_prompt: str = ( + "You are an AI agent managing an OperatorToolBox. You can validate the toolbox, run operations, " + "retrieve results or failures, list LLM specs, and test LLM specs with user prompts. " + "Use the provided tools to assist the user based on their request." + ) + + def __init__(self, toolbox: OperatorToolBox, **kwargs): + super().__init__(**kwargs) + self.toolbox = toolbox + + # Define async tools within __init__ + async def validate_toolbox(ctx: RunContext[Any]) -> str: + is_valid = self.toolbox.validate() + return ( + "ToolBox validation successful." + if is_valid + else "ToolBox validation failed." + ) + + async def execute_operation(ctx: RunContext[Any], operation: str) -> str: + return self.toolbox.run_operation(operation) + + async def retrieve_results(ctx: RunContext[Any]) -> str: + results = self.toolbox.get_results() + return ( + f"Operation Results:\n{results}" + if results + else "No operations have been executed yet." + ) + + async def retrieve_failures(ctx: RunContext[Any]) -> str: + failures = self.toolbox.get_failures() + return f"Failures:\n{failures}" if failures else "No failures recorded." + + async def list_llm_specs(ctx: RunContext[Any]) -> str: + spec_list = "\n".join( + f"{i}: {spec.url}" for i, spec in enumerate(self.toolbox.llm_specs) + ) + return f"Available LLM Specs:\n{spec_list}" + + async def test_llm_with_prompt( + ctx: RunContext[Any], spec_index: int, user_prompt: str + ) -> str: + return await self.toolbox.test_with_prompt(spec_index, user_prompt) + + # Register tools + self.tools = [ + Tool( + name="validate_toolbox", + description="Validate the toolbox configuration.", + function=validate_toolbox, + ), + Tool( + name="execute_operation", + description="Execute a dataset operation.", + function=execute_operation, + ), + Tool( + name="retrieve_results", + description="Retrieve the results of operations.", + function=retrieve_results, + ), + Tool( + name="retrieve_failures", + description="Retrieve recorded failures.", + function=retrieve_failures, + ), + Tool( + name="list_llm_specs", + description="List available LLM specifications.", + function=list_llm_specs, + ), + Tool( + name="test_llm_with_prompt", + description="Test an LLM spec with a user prompt.", + function=test_llm_with_prompt, + ), + ] + + +# Setup and run example +async def run_dataset_manager_agent_async(): + # Initialize OperatorToolBox with AgentSpecification + spec = AgentSpecification( + name="DeepSeek Chat", + version="1.0", + description="A powerful language model", + capabilities=["text-generation", "question-answering"], + configuration={"max_tokens": 100}, + ) + toolbox = OperatorToolBox( + spec=spec, datasets=[{"id": "dataset1"}, {"id": "dataset2"}, {"id": "dataset3"}] + ) + + # Create the agent + agent = DatasetManagerAgent(toolbox=toolbox) + + # Example prompts + prompts = [ + "Validate the toolbox.", + "List available LLM specs.", + "I want to test an LLM with my prompt: 'Tell me a short story about a robot'. Which spec index should I use?", + ] + + for prompt in prompts: + result = await agent.run(prompt) + print(f"Prompt: {prompt}") + print(f"Response: {result}\n") + + # Handle testing request + if "test an LLM with my prompt" in prompt: + print( + "Please select a spec index from the list above and confirm to proceed." + ) + # Simulate user input (replace with real input in practice) + user_input = ( + input("Enter spec index and 'yes' to confirm (e.g., '0 yes'): ") + .strip() + .split() + ) + if len(user_input) == 2 and user_input[1].lower() == "yes": + try: + spec_index = int(user_input[0]) + user_prompt = prompt.split("my prompt: ")[1].strip("'") + test_result = await agent.run( + f"Test LLM at index {spec_index} with prompt: {user_prompt}" + ) + print(f"Test Response: {test_result}\n") + except ValueError: + print("Invalid spec index provided.\n") + else: + print("Test canceled. Please provide a valid index and confirmation.\n") + + +if __name__ == "__main__": + asyncio.run(run_dataset_manager_agent_async())