diff --git a/agentic_security/probe_actor/operator.py b/agentic_security/probe_actor/operator.py index 73440cf..20c82e2 100644 --- a/agentic_security/probe_actor/operator.py +++ b/agentic_security/probe_actor/operator.py @@ -1,10 +1,9 @@ import asyncio -from typing import Any, Optional, Dict - +from typing import Any, Optional, Dict, List from pydantic import BaseModel, Field from pydantic_ai import Agent, RunContext -import re import httpx +from httpx import LLMSpec import logging # Configure logging @@ -12,17 +11,13 @@ logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class AgentSpecification(BaseModel): - name: str | None = Field(None, description="Name of the LLM/agent") - version: str | None = Field(None, description="Version of the LLM/agent") - description: str | None = Field(None, description="Description of the LLM/agent") - capabilities: list[str] | None = Field(None, description="List of capabilities") - configuration: dict[str, Any] | None = Field( - None, description="Configuration settings" - ) + name: Optional[str] = Field(None, description="Name of the LLM/agent") + version: Optional[str] = Field(None, description="Version of the LLM/agent") + description: Optional[str] = Field(None, description="Description of the LLM/agent") + capabilities: Optional[List[str]] = Field(None, description="List of capabilities") + configuration: Optional[Dict[str, Any]] = Field(None, description="Configuration settings") endpoint: Optional[str] = Field(None, description="Endpoint URL of the deployed agent") - -# Define the OperatorToolBox class class OperatorToolBox: def __init__(self, spec: AgentSpecification, datasets: list[dict[str, Any]]): self.spec = spec @@ -36,7 +31,6 @@ class OperatorToolBox: return self.datasets def validate(self) -> bool: - # Validate the tool box based on the specification if not self.spec.name or not self.spec.version: self.failures.append("Invalid specification: Name or version is missing.") return False @@ -46,73 +40,67 @@ class OperatorToolBox: return True def stop(self) -> None: - # Stop the tool box - print("Stopping the toolbox...") + logger.info("Stopping the toolbox...") def run(self) -> None: - # Run the tool box - print("Running the toolbox...") + logger.info("Running the toolbox...") - def get_results(self) -> list[dict[str, Any]]: - # Get the results + def get_results(self) -> List[Dict[str, Any]]: return self.datasets - def get_failures(self) -> list[str]: - # Handle failure + def get_failures(self) -> List[str]: return self.failures def run_operation(self, operation: str) -> str: - # Run an operation based on the specification if operation not in ["dataset1", "dataset2", "dataset3"]: self.failures.append(f"Operation '{operation}' failed: Dataset not found.") return f"Operation '{operation}' failed: Dataset not found." return f"Operation '{operation}' executed successfully." async def test(self, description: str, sample_test: Dict[str, Any]) -> str: - """Test the agent based on the description and sample test.""" - match = re.search(r"Test my (.+) agent deployed at (.+)", description) - if match: - agent_type = match.group(1) - endpoint = match.group(2) - self.spec.endpoint = endpoint + agent = Agent( + 'openai:gpt-4o', + result_type=LLMSpec, + system_prompt='Extract the LLM specification from the input', + ) - # Verify access to the endpoint - async with httpx.AsyncClient() as client: + async with agent.run_stream(description) as result: + async for spec in result.stream(): + self.spec.endpoint = spec.url + + # Verify access to the endpoint + async with httpx.AsyncClient() as client: + try: + access_response = await client.get(spec.url) + access_response.raise_for_status() + except httpx.HTTPStatusError as e: + self.failures.append(f"HTTP error occurred: {e}") + logger.error(f"Access verification failed: {e}") + return f"Access verification failed: {e}" + except Exception as e: + self.failures.append(f"An error occurred: {e}") + logger.error(f"Access verification failed: {e}") + return f"Access verification failed: {e}" + + # Run the sample test try: - access_response = await client.get(endpoint) - access_response.raise_for_status() + test_response = await client.post(f"{spec.url}/test", json=sample_test) + test_response.raise_for_status() + response_data = test_response.json() + if "choices" in response_data and len(response_data["choices"]) > 0: + return f"Testing agent at {spec.url} succeeded: {response_data}" + else: + self.failures.append("Invalid response format") + logger.error("Sample test failed: Invalid response format") + return "Sample test failed: Invalid response format" except httpx.HTTPStatusError as e: self.failures.append(f"HTTP error occurred: {e}") - logger.error(f"Access verification failed: {e}") - return f"Access verification failed: {e}" + logger.error(f"Sample test failed: {e}") + return f"Sample test failed: {e}" except Exception as e: self.failures.append(f"An error occurred: {e}") - logger.error(f"Access verification failed: {e}") - return f"Access verification failed: {e}" - - # Run the sample test - try: - test_response = await client.post(f"{endpoint}/test", json=sample_test) - test_response.raise_for_status() - response_data = test_response.json() - # Validate the response (this is a simple example, adjust as needed) - if "choices" in response_data and len(response_data["choices"]) > 0: - return f"Testing {agent_type} agent at {endpoint} succeeded: {response_data}" - else: - self.failures.append("Invalid response format") - logger.error("Sample test failed: Invalid response format") - return "Sample test failed: Invalid response format" - except httpx.HTTPStatusError as e: - self.failures.append(f"HTTP error occurred: {e}") - logger.error(f"Sample test failed: {e}") - return f"Sample test failed: {e}" - except Exception as e: - self.failures.append(f"An error occurred: {e}") - logger.error(f"Sample test failed: {e}") - return f"Sample test failed: {e}" - else: - logger.error("Invalid description format.") - return "Invalid description format." + logger.error(f"Sample test failed: {e}") + return f"Sample test failed: {e}" # Initialize OperatorToolBox with AgentSpecification spec = AgentSpecification( @@ -123,41 +111,31 @@ spec = AgentSpecification( configuration={"max_tokens": 100}, ) -# dataset_manager_agent.py - - -# Initialize OperatorToolBox toolbox = OperatorToolBox(spec=spec, datasets=["dataset1", "dataset2", "dataset3"]) # Define the agent with OperatorToolBox as its dependency dataset_manager_agent = Agent( model="gpt-4", deps_type=OperatorToolBox, - result_type=str, # The agent will return string results + result_type=str, system_prompt="You can validate the toolbox, run operations, and retrieve results or failures.", ) - @dataset_manager_agent.tool async def validate_toolbox(ctx: RunContext[OperatorToolBox]) -> str: - """Validate the OperatorToolBox.""" is_valid = ctx.deps.validate() if is_valid: return "ToolBox validation successful." else: return "ToolBox validation failed." - @dataset_manager_agent.tool async def execute_operation(ctx: RunContext[OperatorToolBox], operation: str) -> str: - """Execute an operation on a dataset.""" result = ctx.deps.run_operation(operation) return result - @dataset_manager_agent.tool async def retrieve_results(ctx: RunContext[OperatorToolBox]) -> str: - """Retrieve the results of operations.""" results = ctx.deps.get_results() if results: formatted_results = "\n".join([f"{op}: {res}" for op, res in results.items()]) @@ -165,10 +143,8 @@ async def retrieve_results(ctx: RunContext[OperatorToolBox]) -> str: else: return "No operations have been executed yet." - @dataset_manager_agent.tool async def retrieve_failures(ctx: RunContext[OperatorToolBox]) -> str: - """Retrieve the list of failures.""" failures = ctx.deps.get_failures() if failures: formatted_failures = "\n".join(failures) @@ -176,6 +152,10 @@ async def retrieve_failures(ctx: RunContext[OperatorToolBox]) -> str: else: return "No failures recorded." +@dataset_manager_agent.tool +async def test_agent(ctx: RunContext[OperatorToolBox], description: str, sample_test: Dict[str, Any]) -> str: + result = await ctx.deps.test(description, sample_test) + return result # Synchronous run example def run_dataset_manager_agent_sync(): @@ -225,10 +205,9 @@ async def run_dataset_manager_agent_async(): print(f"Prompt: {prompt}") print(f"Response: {result.data}\n") - if __name__ == "__main__": # Run synchronous example run_dataset_manager_agent_sync() # Run asynchronous example - asyncio.run(run_dataset_manager_agent_async()) + asyncio.run(run_dataset_manager_agent_async()) \ No newline at end of file