diff --git a/agentic_security/probe_actor/operator.py b/agentic_security/probe_actor/operator.py index c7c0ca8..73440cf 100644 --- a/agentic_security/probe_actor/operator.py +++ b/agentic_security/probe_actor/operator.py @@ -1,9 +1,15 @@ import asyncio -from typing import Any +from typing import Any, Optional, Dict from pydantic import BaseModel, Field from pydantic_ai import Agent, RunContext +import re +import httpx +import logging +# Configure logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) class AgentSpecification(BaseModel): name: str | None = Field(None, description="Name of the LLM/agent") @@ -13,6 +19,7 @@ class AgentSpecification(BaseModel): configuration: dict[str, Any] | None = Field( None, description="Configuration settings" ) + endpoint: Optional[str] = Field(None, description="Endpoint URL of the deployed agent") # Define the OperatorToolBox class @@ -61,6 +68,51 @@ class OperatorToolBox: return f"Operation '{operation}' failed: Dataset not found." return f"Operation '{operation}' executed successfully." + async def test(self, description: str, sample_test: Dict[str, Any]) -> str: + """Test the agent based on the description and sample test.""" + match = re.search(r"Test my (.+) agent deployed at (.+)", description) + if match: + agent_type = match.group(1) + endpoint = match.group(2) + self.spec.endpoint = endpoint + + # Verify access to the endpoint + async with httpx.AsyncClient() as client: + try: + access_response = await client.get(endpoint) + access_response.raise_for_status() + except httpx.HTTPStatusError as e: + self.failures.append(f"HTTP error occurred: {e}") + logger.error(f"Access verification failed: {e}") + return f"Access verification failed: {e}" + except Exception as e: + self.failures.append(f"An error occurred: {e}") + logger.error(f"Access verification failed: {e}") + return f"Access verification failed: {e}" + + # Run the sample test + try: + test_response = await client.post(f"{endpoint}/test", json=sample_test) + test_response.raise_for_status() + response_data = test_response.json() + # Validate the response (this is a simple example, adjust as needed) + if "choices" in response_data and len(response_data["choices"]) > 0: + return f"Testing {agent_type} agent at {endpoint} succeeded: {response_data}" + else: + self.failures.append("Invalid response format") + logger.error("Sample test failed: Invalid response format") + return "Sample test failed: Invalid response format" + except httpx.HTTPStatusError as e: + self.failures.append(f"HTTP error occurred: {e}") + logger.error(f"Sample test failed: {e}") + return f"Sample test failed: {e}" + except Exception as e: + self.failures.append(f"An error occurred: {e}") + logger.error(f"Sample test failed: {e}") + return f"Sample test failed: {e}" + else: + logger.error("Invalid description format.") + return "Invalid description format." # Initialize OperatorToolBox with AgentSpecification spec = AgentSpecification( @@ -133,14 +185,22 @@ def run_dataset_manager_agent_sync(): "Execute operation on 'dataset4'.", # This should fail "Retrieve the results.", "Retrieve any failures.", + "Test my openAI compatible agent deployed at localhost:3000" ] + sample_test = { + "prompt": "Hello, how are you?", + "max_tokens": 5 + } + for prompt in prompts: - result = dataset_manager_agent.run_sync(prompt, deps=toolbox) + if "Test my" in prompt: + result = dataset_manager_agent.run_sync(prompt, deps=toolbox, sample_test=sample_test) + else: + result = dataset_manager_agent.run_sync(prompt, deps=toolbox) print(f"Prompt: {prompt}") print(f"Response: {result.data}\n") - # Asynchronous run example async def run_dataset_manager_agent_async(): prompts = [ @@ -149,10 +209,19 @@ async def run_dataset_manager_agent_async(): "Execute operation on 'dataset4'.", # This should fail "Retrieve the results.", "Retrieve any failures.", + "Test my openAI compatible agent deployed at localhost:3000" ] + sample_test = { + "prompt": "Hello, how are you?", + "max_tokens": 5 + } + for prompt in prompts: - result = await dataset_manager_agent.run(prompt, deps=toolbox) + if "Test my" in prompt: + result = await dataset_manager_agent.run(prompt, deps=toolbox, sample_test=sample_test) + else: + result = await dataset_manager_agent.run(prompt, deps=toolbox) print(f"Prompt: {prompt}") print(f"Response: {result.data}\n")