diff --git a/agentic_security/probe_actor/operator.py b/agentic_security/probe_actor/operator.py index c270dd1..99d65f0 100644 --- a/agentic_security/probe_actor/operator.py +++ b/agentic_security/probe_actor/operator.py @@ -3,10 +3,13 @@ import logging from typing import Any import httpx -from httpx import LLMSpec from pydantic import BaseModel, Field from pydantic_ai import Agent, RunContext +from agentic_security.http_spec import LLMSpec + +LLM_SPECS = [] + # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -28,6 +31,7 @@ class OperatorToolBox: self.spec = spec self.datasets = datasets self.failures = [] + self.llm_specs = [LLMSpec.from_string(spec) for spec in LLM_SPECS] def get_spec(self) -> AgentSpecification: return self.spec @@ -62,52 +66,33 @@ class OperatorToolBox: return f"Operation '{operation}' failed: Dataset not found." return f"Operation '{operation}' executed successfully." - async def test(self, description: str, sample_test: dict[str, Any]) -> str: - agent = Agent( - "openai:gpt-4o", - result_type=LLMSpec, - system_prompt="Extract the LLM specification from the input", - ) + async def test_llm_spec(self, llm_spec: LLMSpec, user_prompt: str) -> str: + try: + # Verify the spec + response = await llm_spec.verify() + response.raise_for_status() + logger.info(f"Verification succeeded for {llm_spec.url}") - async with agent.run_stream(description) as result: - async for spec in result.stream(): - self.spec.endpoint = spec.url + # Run test with user prompt + test_response = await llm_spec.probe(user_prompt) + test_response.raise_for_status() + response_data = test_response.json() + return f"Test succeeded for {llm_spec.url}: {response_data}" + except httpx.HTTPStatusError as e: + self.failures.append(f"HTTP error occurred: {e}") + logger.error(f"Test failed for {llm_spec.url}: {e}") + return f"Test failed for {llm_spec.url}: {e}" + except Exception as e: + self.failures.append(f"An error occurred: {e}") + logger.error(f"Test failed for {llm_spec.url}: {e}") + return f"Test failed for {llm_spec.url}: {e}" - # Verify access to the endpoint - async with httpx.AsyncClient() as client: - try: - access_response = await client.get(spec.url) - access_response.raise_for_status() - except httpx.HTTPStatusError as e: - self.failures.append(f"HTTP error occurred: {e}") - logger.error(f"Access verification failed: {e}") - return f"Access verification failed: {e}" - except Exception as e: - self.failures.append(f"An error occurred: {e}") - logger.error(f"Access verification failed: {e}") - return f"Access verification failed: {e}" + async def test_with_prompt(self, spec_index: int, user_prompt: str) -> str: + if not 0 <= spec_index < len(self.llm_specs): + return f"Invalid spec index: {spec_index}. Valid range is 0 to {len(self.llm_specs) - 1}" - # Run the sample test - try: - test_response = await client.post( - f"{spec.url}/test", json=sample_test - ) - test_response.raise_for_status() - response_data = test_response.json() - if "choices" in response_data and len(response_data["choices"]) > 0: - return f"Testing agent at {spec.url} succeeded: {response_data}" - else: - self.failures.append("Invalid response format") - logger.error("Sample test failed: Invalid response format") - return "Sample test failed: Invalid response format" - except httpx.HTTPStatusError as e: - self.failures.append(f"HTTP error occurred: {e}") - logger.error(f"Sample test failed: {e}") - return f"Sample test failed: {e}" - except Exception as e: - self.failures.append(f"An error occurred: {e}") - logger.error(f"Sample test failed: {e}") - return f"Sample test failed: {e}" + llm_spec = self.llm_specs[spec_index] + return await self.test_llm_spec(llm_spec, user_prompt) # Initialize OperatorToolBox with AgentSpecification @@ -126,104 +111,95 @@ dataset_manager_agent = Agent( model="gpt-4", deps_type=OperatorToolBox, result_type=str, - system_prompt="You can validate the toolbox, run operations, and retrieve results or failures.", + system_prompt="You can validate the toolbox, run operations, retrieve results or failures, and test LLM specs.", ) @dataset_manager_agent.tool async def validate_toolbox(ctx: RunContext[OperatorToolBox]) -> str: is_valid = ctx.deps.validate() - if is_valid: - return "ToolBox validation successful." - else: - return "ToolBox validation failed." + return ( + "ToolBox validation successful." if is_valid else "ToolBox validation failed." + ) @dataset_manager_agent.tool async def execute_operation(ctx: RunContext[OperatorToolBox], operation: str) -> str: - result = ctx.deps.run_operation(operation) - return result + return ctx.deps.run_operation(operation) @dataset_manager_agent.tool async def retrieve_results(ctx: RunContext[OperatorToolBox]) -> str: results = ctx.deps.get_results() - if results: - formatted_results = "\n".join([f"{op}: {res}" for op, res in results.items()]) - return f"Operation Results:\n{formatted_results}" - else: - return "No operations have been executed yet." + return ( + f"Operation Results:\n{results}" + if results + else "No operations have been executed yet." + ) @dataset_manager_agent.tool async def retrieve_failures(ctx: RunContext[OperatorToolBox]) -> str: failures = ctx.deps.get_failures() - if failures: - formatted_failures = "\n".join(failures) - return f"Failures:\n{formatted_failures}" - else: - return "No failures recorded." + return f"Failures:\n{failures}" if failures else "No failures recorded." @dataset_manager_agent.tool -async def test_agent( - ctx: RunContext[OperatorToolBox], description: str, sample_test: dict[str, Any] +async def list_llm_specs(ctx: RunContext[OperatorToolBox]) -> str: + spec_list = "\n".join( + f"{i}: {spec.url}" for i, spec in enumerate(ctx.deps.llm_specs) + ) + return f"Available LLM Specs:\n{spec_list}" + + +@dataset_manager_agent.tool +async def test_llm_with_prompt( + ctx: RunContext[OperatorToolBox], spec_index: int, user_prompt: str ) -> str: - result = await ctx.deps.test(description, sample_test) - return result + return await ctx.deps.test_with_prompt(spec_index, user_prompt) -# Synchronous run example -def run_dataset_manager_agent_sync(): - prompts = [ - "Validate the toolbox.", - "Execute operation on 'dataset2'.", - "Execute operation on 'dataset4'.", # This should fail - "Retrieve the results.", - "Retrieve any failures.", - "Test my openAI compatible agent deployed at localhost:3000", - ] - - sample_test = {"prompt": "Hello, how are you?", "max_tokens": 5} - - for prompt in prompts: - if "Test my" in prompt: - result = dataset_manager_agent.run_sync( - prompt, deps=toolbox, sample_test=sample_test - ) - else: - result = dataset_manager_agent.run_sync(prompt, deps=toolbox) - print(f"Prompt: {prompt}") - print(f"Response: {result.data}\n") - - -# Asynchronous run example +# Asynchronous run example with user confirmation async def run_dataset_manager_agent_async(): prompts = [ "Validate the toolbox.", - "Execute operation on 'dataset2'.", - "Execute operation on 'dataset4'.", # This should fail - "Retrieve the results.", - "Retrieve any failures.", - "Test my openAI compatible agent deployed at localhost:3000", + "List available LLM specs.", + "I want to test an LLM with my prompt: 'Tell me a short story about a robot'. Which spec index should I use?", ] - sample_test = {"prompt": "Hello, how are you?", "max_tokens": 5} - for prompt in prompts: - if "Test my" in prompt: - result = await dataset_manager_agent.run( - prompt, deps=toolbox, sample_test=sample_test - ) - else: - result = await dataset_manager_agent.run(prompt, deps=toolbox) + result = await dataset_manager_agent.run(prompt, deps=toolbox) print(f"Prompt: {prompt}") print(f"Response: {result.data}\n") + # Handle testing request + if "test an LLM with my prompt" in prompt: + print( + "Please select a spec index from the list above and confirm to proceed." + ) + # Simulate user input for demo (in real app, you'd get this from user) + user_input = ( + input("Enter spec index and 'yes' to confirm (e.g., '0 yes'): ") + .strip() + .split() + ) + if len(user_input) == 2 and user_input[1].lower() == "yes": + try: + spec_index = int(user_input[0]) + # Extract prompt from the original input + user_prompt = prompt.split("my prompt: ")[1].strip("'") + test_result = await dataset_manager_agent.run( + f"Test LLM at index {spec_index} with prompt: {user_prompt}", + deps=toolbox, + spec_index=spec_index, + user_prompt=user_prompt, + ) + print(f"Test Response: {test_result.data}\n") + except ValueError: + print("Invalid spec index provided.\n") + else: + print("Test canceled. Please provide a valid index and confirmation.\n") + if __name__ == "__main__": - # Run synchronous example - run_dataset_manager_agent_sync() - - # Run asynchronous example asyncio.run(run_dataset_manager_agent_async())