mirror of
https://github.com/msoedov/agentic_security.git
synced 2026-06-24 06:09:55 +02:00
feat(operator): add agent testing functionality with endpoint verification
This commit is contained in:
@@ -1,9 +1,15 @@
|
||||
import asyncio
|
||||
from typing import Any
|
||||
from typing import Any, Optional, Dict
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
from pydantic_ai import Agent, RunContext
|
||||
import re
|
||||
import httpx
|
||||
import logging
|
||||
|
||||
# Configure logging
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class AgentSpecification(BaseModel):
|
||||
name: str | None = Field(None, description="Name of the LLM/agent")
|
||||
@@ -13,6 +19,7 @@ class AgentSpecification(BaseModel):
|
||||
configuration: dict[str, Any] | None = Field(
|
||||
None, description="Configuration settings"
|
||||
)
|
||||
endpoint: Optional[str] = Field(None, description="Endpoint URL of the deployed agent")
|
||||
|
||||
|
||||
# Define the OperatorToolBox class
|
||||
@@ -61,6 +68,51 @@ class OperatorToolBox:
|
||||
return f"Operation '{operation}' failed: Dataset not found."
|
||||
return f"Operation '{operation}' executed successfully."
|
||||
|
||||
async def test(self, description: str, sample_test: Dict[str, Any]) -> str:
|
||||
"""Test the agent based on the description and sample test."""
|
||||
match = re.search(r"Test my (.+) agent deployed at (.+)", description)
|
||||
if match:
|
||||
agent_type = match.group(1)
|
||||
endpoint = match.group(2)
|
||||
self.spec.endpoint = endpoint
|
||||
|
||||
# Verify access to the endpoint
|
||||
async with httpx.AsyncClient() as client:
|
||||
try:
|
||||
access_response = await client.get(endpoint)
|
||||
access_response.raise_for_status()
|
||||
except httpx.HTTPStatusError as e:
|
||||
self.failures.append(f"HTTP error occurred: {e}")
|
||||
logger.error(f"Access verification failed: {e}")
|
||||
return f"Access verification failed: {e}"
|
||||
except Exception as e:
|
||||
self.failures.append(f"An error occurred: {e}")
|
||||
logger.error(f"Access verification failed: {e}")
|
||||
return f"Access verification failed: {e}"
|
||||
|
||||
# Run the sample test
|
||||
try:
|
||||
test_response = await client.post(f"{endpoint}/test", json=sample_test)
|
||||
test_response.raise_for_status()
|
||||
response_data = test_response.json()
|
||||
# Validate the response (this is a simple example, adjust as needed)
|
||||
if "choices" in response_data and len(response_data["choices"]) > 0:
|
||||
return f"Testing {agent_type} agent at {endpoint} succeeded: {response_data}"
|
||||
else:
|
||||
self.failures.append("Invalid response format")
|
||||
logger.error("Sample test failed: Invalid response format")
|
||||
return "Sample test failed: Invalid response format"
|
||||
except httpx.HTTPStatusError as e:
|
||||
self.failures.append(f"HTTP error occurred: {e}")
|
||||
logger.error(f"Sample test failed: {e}")
|
||||
return f"Sample test failed: {e}"
|
||||
except Exception as e:
|
||||
self.failures.append(f"An error occurred: {e}")
|
||||
logger.error(f"Sample test failed: {e}")
|
||||
return f"Sample test failed: {e}"
|
||||
else:
|
||||
logger.error("Invalid description format.")
|
||||
return "Invalid description format."
|
||||
|
||||
# Initialize OperatorToolBox with AgentSpecification
|
||||
spec = AgentSpecification(
|
||||
@@ -133,14 +185,22 @@ def run_dataset_manager_agent_sync():
|
||||
"Execute operation on 'dataset4'.", # This should fail
|
||||
"Retrieve the results.",
|
||||
"Retrieve any failures.",
|
||||
"Test my openAI compatible agent deployed at localhost:3000"
|
||||
]
|
||||
|
||||
sample_test = {
|
||||
"prompt": "Hello, how are you?",
|
||||
"max_tokens": 5
|
||||
}
|
||||
|
||||
for prompt in prompts:
|
||||
result = dataset_manager_agent.run_sync(prompt, deps=toolbox)
|
||||
if "Test my" in prompt:
|
||||
result = dataset_manager_agent.run_sync(prompt, deps=toolbox, sample_test=sample_test)
|
||||
else:
|
||||
result = dataset_manager_agent.run_sync(prompt, deps=toolbox)
|
||||
print(f"Prompt: {prompt}")
|
||||
print(f"Response: {result.data}\n")
|
||||
|
||||
|
||||
# Asynchronous run example
|
||||
async def run_dataset_manager_agent_async():
|
||||
prompts = [
|
||||
@@ -149,10 +209,19 @@ async def run_dataset_manager_agent_async():
|
||||
"Execute operation on 'dataset4'.", # This should fail
|
||||
"Retrieve the results.",
|
||||
"Retrieve any failures.",
|
||||
"Test my openAI compatible agent deployed at localhost:3000"
|
||||
]
|
||||
|
||||
sample_test = {
|
||||
"prompt": "Hello, how are you?",
|
||||
"max_tokens": 5
|
||||
}
|
||||
|
||||
for prompt in prompts:
|
||||
result = await dataset_manager_agent.run(prompt, deps=toolbox)
|
||||
if "Test my" in prompt:
|
||||
result = await dataset_manager_agent.run(prompt, deps=toolbox, sample_test=sample_test)
|
||||
else:
|
||||
result = await dataset_manager_agent.run(prompt, deps=toolbox)
|
||||
print(f"Prompt: {prompt}")
|
||||
print(f"Response: {result.data}\n")
|
||||
|
||||
|
||||
Reference in New Issue
Block a user