diff --git a/agentic_security/probe_actor/operator.py b/agentic_security/probe_actor/operator.py index d815b8c..76890eb 100644 --- a/agentic_security/probe_actor/operator.py +++ b/agentic_security/probe_actor/operator.py @@ -10,13 +10,19 @@ import logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) + class AgentSpecification(BaseModel): name: Optional[str] = Field(None, description="Name of the LLM/agent") version: Optional[str] = Field(None, description="Version of the LLM/agent") description: Optional[str] = Field(None, description="Description of the LLM/agent") capabilities: Optional[list[str]] = Field(None, description="List of capabilities") - configuration: Optional[dict[str, Any]] = Field(None, description="Configuration settings") - endpoint: Optional[str] = Field(None, description="Endpoint URL of the deployed agent") + configuration: Optional[dict[str, Any]] = Field( + None, description="Configuration settings" + ) + endpoint: Optional[str] = Field( + None, description="Endpoint URL of the deployed agent" + ) + class OperatorToolBox: def __init__(self, spec: AgentSpecification, datasets: list[dict[str, Any]]): @@ -59,9 +65,9 @@ class OperatorToolBox: async def test(self, description: str, sample_test: dict[str, Any]) -> str: agent = Agent( - 'openai:gpt-4o', + "openai:gpt-4o", result_type=LLMSpec, - system_prompt='Extract the LLM specification from the input', + system_prompt="Extract the LLM specification from the input", ) async with agent.run_stream(description) as result: @@ -84,7 +90,9 @@ class OperatorToolBox: # Run the sample test try: - test_response = await client.post(f"{spec.url}/test", json=sample_test) + test_response = await client.post( + f"{spec.url}/test", json=sample_test + ) test_response.raise_for_status() response_data = test_response.json() if "choices" in response_data and len(response_data["choices"]) > 0: @@ -102,6 +110,7 @@ class OperatorToolBox: logger.error(f"Sample test failed: {e}") return f"Sample test failed: {e}" + # Initialize OperatorToolBox with AgentSpecification spec = AgentSpecification( name="GPT-4", @@ -121,6 +130,7 @@ dataset_manager_agent = Agent( system_prompt="You can validate the toolbox, run operations, and retrieve results or failures.", ) + @dataset_manager_agent.tool async def validate_toolbox(ctx: RunContext[OperatorToolBox]) -> str: is_valid = ctx.deps.validate() @@ -129,11 +139,13 @@ async def validate_toolbox(ctx: RunContext[OperatorToolBox]) -> str: else: return "ToolBox validation failed." + @dataset_manager_agent.tool async def execute_operation(ctx: RunContext[OperatorToolBox], operation: str) -> str: result = ctx.deps.run_operation(operation) return result + @dataset_manager_agent.tool async def retrieve_results(ctx: RunContext[OperatorToolBox]) -> str: results = ctx.deps.get_results() @@ -143,6 +155,7 @@ async def retrieve_results(ctx: RunContext[OperatorToolBox]) -> str: else: return "No operations have been executed yet." + @dataset_manager_agent.tool async def retrieve_failures(ctx: RunContext[OperatorToolBox]) -> str: failures = ctx.deps.get_failures() @@ -152,11 +165,15 @@ async def retrieve_failures(ctx: RunContext[OperatorToolBox]) -> str: else: return "No failures recorded." + @dataset_manager_agent.tool -async def test_agent(ctx: RunContext[OperatorToolBox], description: str, sample_test: dict[str, Any]) -> str: +async def test_agent( + ctx: RunContext[OperatorToolBox], description: str, sample_test: dict[str, Any] +) -> str: result = await ctx.deps.test(description, sample_test) return result + # Synchronous run example def run_dataset_manager_agent_sync(): prompts = [ @@ -165,22 +182,22 @@ def run_dataset_manager_agent_sync(): "Execute operation on 'dataset4'.", # This should fail "Retrieve the results.", "Retrieve any failures.", - "Test my openAI compatible agent deployed at localhost:3000" + "Test my openAI compatible agent deployed at localhost:3000", ] - sample_test = { - "prompt": "Hello, how are you?", - "max_tokens": 5 - } + sample_test = {"prompt": "Hello, how are you?", "max_tokens": 5} for prompt in prompts: if "Test my" in prompt: - result = dataset_manager_agent.run_sync(prompt, deps=toolbox, sample_test=sample_test) + result = dataset_manager_agent.run_sync( + prompt, deps=toolbox, sample_test=sample_test + ) else: result = dataset_manager_agent.run_sync(prompt, deps=toolbox) print(f"Prompt: {prompt}") print(f"Response: {result.data}\n") + # Asynchronous run example async def run_dataset_manager_agent_async(): prompts = [ @@ -189,25 +206,25 @@ async def run_dataset_manager_agent_async(): "Execute operation on 'dataset4'.", # This should fail "Retrieve the results.", "Retrieve any failures.", - "Test my openAI compatible agent deployed at localhost:3000" + "Test my openAI compatible agent deployed at localhost:3000", ] - sample_test = { - "prompt": "Hello, how are you?", - "max_tokens": 5 - } + sample_test = {"prompt": "Hello, how are you?", "max_tokens": 5} for prompt in prompts: if "Test my" in prompt: - result = await dataset_manager_agent.run(prompt, deps=toolbox, sample_test=sample_test) + result = await dataset_manager_agent.run( + prompt, deps=toolbox, sample_test=sample_test + ) else: result = await dataset_manager_agent.run(prompt, deps=toolbox) print(f"Prompt: {prompt}") print(f"Response: {result.data}\n") + if __name__ == "__main__": # Run synchronous example run_dataset_manager_agent_sync() # Run asynchronous example - asyncio.run(run_dataset_manager_agent_async()) \ No newline at end of file + asyncio.run(run_dataset_manager_agent_async())