mirror of
https://github.com/msoedov/agentic_security.git
synced 2026-06-24 06:09:55 +02:00
feat: enhance AgentSpecification and OperatorToolBox with optional typing and improved logging
This commit is contained in:
@@ -1,10 +1,9 @@
|
||||
import asyncio
|
||||
from typing import Any, Optional, Dict
|
||||
|
||||
from typing import Any, Optional, Dict, List
|
||||
from pydantic import BaseModel, Field
|
||||
from pydantic_ai import Agent, RunContext
|
||||
import re
|
||||
import httpx
|
||||
from httpx import LLMSpec
|
||||
import logging
|
||||
|
||||
# Configure logging
|
||||
@@ -12,17 +11,13 @@ logging.basicConfig(level=logging.INFO)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class AgentSpecification(BaseModel):
|
||||
name: str | None = Field(None, description="Name of the LLM/agent")
|
||||
version: str | None = Field(None, description="Version of the LLM/agent")
|
||||
description: str | None = Field(None, description="Description of the LLM/agent")
|
||||
capabilities: list[str] | None = Field(None, description="List of capabilities")
|
||||
configuration: dict[str, Any] | None = Field(
|
||||
None, description="Configuration settings"
|
||||
)
|
||||
name: Optional[str] = Field(None, description="Name of the LLM/agent")
|
||||
version: Optional[str] = Field(None, description="Version of the LLM/agent")
|
||||
description: Optional[str] = Field(None, description="Description of the LLM/agent")
|
||||
capabilities: Optional[List[str]] = Field(None, description="List of capabilities")
|
||||
configuration: Optional[Dict[str, Any]] = Field(None, description="Configuration settings")
|
||||
endpoint: Optional[str] = Field(None, description="Endpoint URL of the deployed agent")
|
||||
|
||||
|
||||
# Define the OperatorToolBox class
|
||||
class OperatorToolBox:
|
||||
def __init__(self, spec: AgentSpecification, datasets: list[dict[str, Any]]):
|
||||
self.spec = spec
|
||||
@@ -36,7 +31,6 @@ class OperatorToolBox:
|
||||
return self.datasets
|
||||
|
||||
def validate(self) -> bool:
|
||||
# Validate the tool box based on the specification
|
||||
if not self.spec.name or not self.spec.version:
|
||||
self.failures.append("Invalid specification: Name or version is missing.")
|
||||
return False
|
||||
@@ -46,73 +40,67 @@ class OperatorToolBox:
|
||||
return True
|
||||
|
||||
def stop(self) -> None:
|
||||
# Stop the tool box
|
||||
print("Stopping the toolbox...")
|
||||
logger.info("Stopping the toolbox...")
|
||||
|
||||
def run(self) -> None:
|
||||
# Run the tool box
|
||||
print("Running the toolbox...")
|
||||
logger.info("Running the toolbox...")
|
||||
|
||||
def get_results(self) -> list[dict[str, Any]]:
|
||||
# Get the results
|
||||
def get_results(self) -> List[Dict[str, Any]]:
|
||||
return self.datasets
|
||||
|
||||
def get_failures(self) -> list[str]:
|
||||
# Handle failure
|
||||
def get_failures(self) -> List[str]:
|
||||
return self.failures
|
||||
|
||||
def run_operation(self, operation: str) -> str:
|
||||
# Run an operation based on the specification
|
||||
if operation not in ["dataset1", "dataset2", "dataset3"]:
|
||||
self.failures.append(f"Operation '{operation}' failed: Dataset not found.")
|
||||
return f"Operation '{operation}' failed: Dataset not found."
|
||||
return f"Operation '{operation}' executed successfully."
|
||||
|
||||
async def test(self, description: str, sample_test: Dict[str, Any]) -> str:
|
||||
"""Test the agent based on the description and sample test."""
|
||||
match = re.search(r"Test my (.+) agent deployed at (.+)", description)
|
||||
if match:
|
||||
agent_type = match.group(1)
|
||||
endpoint = match.group(2)
|
||||
self.spec.endpoint = endpoint
|
||||
agent = Agent(
|
||||
'openai:gpt-4o',
|
||||
result_type=LLMSpec,
|
||||
system_prompt='Extract the LLM specification from the input',
|
||||
)
|
||||
|
||||
# Verify access to the endpoint
|
||||
async with httpx.AsyncClient() as client:
|
||||
async with agent.run_stream(description) as result:
|
||||
async for spec in result.stream():
|
||||
self.spec.endpoint = spec.url
|
||||
|
||||
# Verify access to the endpoint
|
||||
async with httpx.AsyncClient() as client:
|
||||
try:
|
||||
access_response = await client.get(spec.url)
|
||||
access_response.raise_for_status()
|
||||
except httpx.HTTPStatusError as e:
|
||||
self.failures.append(f"HTTP error occurred: {e}")
|
||||
logger.error(f"Access verification failed: {e}")
|
||||
return f"Access verification failed: {e}"
|
||||
except Exception as e:
|
||||
self.failures.append(f"An error occurred: {e}")
|
||||
logger.error(f"Access verification failed: {e}")
|
||||
return f"Access verification failed: {e}"
|
||||
|
||||
# Run the sample test
|
||||
try:
|
||||
access_response = await client.get(endpoint)
|
||||
access_response.raise_for_status()
|
||||
test_response = await client.post(f"{spec.url}/test", json=sample_test)
|
||||
test_response.raise_for_status()
|
||||
response_data = test_response.json()
|
||||
if "choices" in response_data and len(response_data["choices"]) > 0:
|
||||
return f"Testing agent at {spec.url} succeeded: {response_data}"
|
||||
else:
|
||||
self.failures.append("Invalid response format")
|
||||
logger.error("Sample test failed: Invalid response format")
|
||||
return "Sample test failed: Invalid response format"
|
||||
except httpx.HTTPStatusError as e:
|
||||
self.failures.append(f"HTTP error occurred: {e}")
|
||||
logger.error(f"Access verification failed: {e}")
|
||||
return f"Access verification failed: {e}"
|
||||
logger.error(f"Sample test failed: {e}")
|
||||
return f"Sample test failed: {e}"
|
||||
except Exception as e:
|
||||
self.failures.append(f"An error occurred: {e}")
|
||||
logger.error(f"Access verification failed: {e}")
|
||||
return f"Access verification failed: {e}"
|
||||
|
||||
# Run the sample test
|
||||
try:
|
||||
test_response = await client.post(f"{endpoint}/test", json=sample_test)
|
||||
test_response.raise_for_status()
|
||||
response_data = test_response.json()
|
||||
# Validate the response (this is a simple example, adjust as needed)
|
||||
if "choices" in response_data and len(response_data["choices"]) > 0:
|
||||
return f"Testing {agent_type} agent at {endpoint} succeeded: {response_data}"
|
||||
else:
|
||||
self.failures.append("Invalid response format")
|
||||
logger.error("Sample test failed: Invalid response format")
|
||||
return "Sample test failed: Invalid response format"
|
||||
except httpx.HTTPStatusError as e:
|
||||
self.failures.append(f"HTTP error occurred: {e}")
|
||||
logger.error(f"Sample test failed: {e}")
|
||||
return f"Sample test failed: {e}"
|
||||
except Exception as e:
|
||||
self.failures.append(f"An error occurred: {e}")
|
||||
logger.error(f"Sample test failed: {e}")
|
||||
return f"Sample test failed: {e}"
|
||||
else:
|
||||
logger.error("Invalid description format.")
|
||||
return "Invalid description format."
|
||||
logger.error(f"Sample test failed: {e}")
|
||||
return f"Sample test failed: {e}"
|
||||
|
||||
# Initialize OperatorToolBox with AgentSpecification
|
||||
spec = AgentSpecification(
|
||||
@@ -123,41 +111,31 @@ spec = AgentSpecification(
|
||||
configuration={"max_tokens": 100},
|
||||
)
|
||||
|
||||
# dataset_manager_agent.py
|
||||
|
||||
|
||||
# Initialize OperatorToolBox
|
||||
toolbox = OperatorToolBox(spec=spec, datasets=["dataset1", "dataset2", "dataset3"])
|
||||
|
||||
# Define the agent with OperatorToolBox as its dependency
|
||||
dataset_manager_agent = Agent(
|
||||
model="gpt-4",
|
||||
deps_type=OperatorToolBox,
|
||||
result_type=str, # The agent will return string results
|
||||
result_type=str,
|
||||
system_prompt="You can validate the toolbox, run operations, and retrieve results or failures.",
|
||||
)
|
||||
|
||||
|
||||
@dataset_manager_agent.tool
|
||||
async def validate_toolbox(ctx: RunContext[OperatorToolBox]) -> str:
|
||||
"""Validate the OperatorToolBox."""
|
||||
is_valid = ctx.deps.validate()
|
||||
if is_valid:
|
||||
return "ToolBox validation successful."
|
||||
else:
|
||||
return "ToolBox validation failed."
|
||||
|
||||
|
||||
@dataset_manager_agent.tool
|
||||
async def execute_operation(ctx: RunContext[OperatorToolBox], operation: str) -> str:
|
||||
"""Execute an operation on a dataset."""
|
||||
result = ctx.deps.run_operation(operation)
|
||||
return result
|
||||
|
||||
|
||||
@dataset_manager_agent.tool
|
||||
async def retrieve_results(ctx: RunContext[OperatorToolBox]) -> str:
|
||||
"""Retrieve the results of operations."""
|
||||
results = ctx.deps.get_results()
|
||||
if results:
|
||||
formatted_results = "\n".join([f"{op}: {res}" for op, res in results.items()])
|
||||
@@ -165,10 +143,8 @@ async def retrieve_results(ctx: RunContext[OperatorToolBox]) -> str:
|
||||
else:
|
||||
return "No operations have been executed yet."
|
||||
|
||||
|
||||
@dataset_manager_agent.tool
|
||||
async def retrieve_failures(ctx: RunContext[OperatorToolBox]) -> str:
|
||||
"""Retrieve the list of failures."""
|
||||
failures = ctx.deps.get_failures()
|
||||
if failures:
|
||||
formatted_failures = "\n".join(failures)
|
||||
@@ -176,6 +152,10 @@ async def retrieve_failures(ctx: RunContext[OperatorToolBox]) -> str:
|
||||
else:
|
||||
return "No failures recorded."
|
||||
|
||||
@dataset_manager_agent.tool
|
||||
async def test_agent(ctx: RunContext[OperatorToolBox], description: str, sample_test: Dict[str, Any]) -> str:
|
||||
result = await ctx.deps.test(description, sample_test)
|
||||
return result
|
||||
|
||||
# Synchronous run example
|
||||
def run_dataset_manager_agent_sync():
|
||||
@@ -225,10 +205,9 @@ async def run_dataset_manager_agent_async():
|
||||
print(f"Prompt: {prompt}")
|
||||
print(f"Response: {result.data}\n")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Run synchronous example
|
||||
run_dataset_manager_agent_sync()
|
||||
|
||||
# Run asynchronous example
|
||||
asyncio.run(run_dataset_manager_agent_async())
|
||||
asyncio.run(run_dataset_manager_agent_async())
|
||||
Reference in New Issue
Block a user