mirror of
https://github.com/msoedov/agentic_security.git
synced 2026-06-24 06:09:55 +02:00
fix(update operator py):
This commit is contained in:
@@ -3,10 +3,13 @@ import logging
|
||||
from typing import Any
|
||||
|
||||
import httpx
|
||||
from httpx import LLMSpec
|
||||
from pydantic import BaseModel, Field
|
||||
from pydantic_ai import Agent, RunContext
|
||||
|
||||
from agentic_security.http_spec import LLMSpec
|
||||
|
||||
LLM_SPECS = []
|
||||
|
||||
# Configure logging
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -28,6 +31,7 @@ class OperatorToolBox:
|
||||
self.spec = spec
|
||||
self.datasets = datasets
|
||||
self.failures = []
|
||||
self.llm_specs = [LLMSpec.from_string(spec) for spec in LLM_SPECS]
|
||||
|
||||
def get_spec(self) -> AgentSpecification:
|
||||
return self.spec
|
||||
@@ -62,52 +66,33 @@ class OperatorToolBox:
|
||||
return f"Operation '{operation}' failed: Dataset not found."
|
||||
return f"Operation '{operation}' executed successfully."
|
||||
|
||||
async def test(self, description: str, sample_test: dict[str, Any]) -> str:
|
||||
agent = Agent(
|
||||
"openai:gpt-4o",
|
||||
result_type=LLMSpec,
|
||||
system_prompt="Extract the LLM specification from the input",
|
||||
)
|
||||
async def test_llm_spec(self, llm_spec: LLMSpec, user_prompt: str) -> str:
|
||||
try:
|
||||
# Verify the spec
|
||||
response = await llm_spec.verify()
|
||||
response.raise_for_status()
|
||||
logger.info(f"Verification succeeded for {llm_spec.url}")
|
||||
|
||||
async with agent.run_stream(description) as result:
|
||||
async for spec in result.stream():
|
||||
self.spec.endpoint = spec.url
|
||||
# Run test with user prompt
|
||||
test_response = await llm_spec.probe(user_prompt)
|
||||
test_response.raise_for_status()
|
||||
response_data = test_response.json()
|
||||
return f"Test succeeded for {llm_spec.url}: {response_data}"
|
||||
except httpx.HTTPStatusError as e:
|
||||
self.failures.append(f"HTTP error occurred: {e}")
|
||||
logger.error(f"Test failed for {llm_spec.url}: {e}")
|
||||
return f"Test failed for {llm_spec.url}: {e}"
|
||||
except Exception as e:
|
||||
self.failures.append(f"An error occurred: {e}")
|
||||
logger.error(f"Test failed for {llm_spec.url}: {e}")
|
||||
return f"Test failed for {llm_spec.url}: {e}"
|
||||
|
||||
# Verify access to the endpoint
|
||||
async with httpx.AsyncClient() as client:
|
||||
try:
|
||||
access_response = await client.get(spec.url)
|
||||
access_response.raise_for_status()
|
||||
except httpx.HTTPStatusError as e:
|
||||
self.failures.append(f"HTTP error occurred: {e}")
|
||||
logger.error(f"Access verification failed: {e}")
|
||||
return f"Access verification failed: {e}"
|
||||
except Exception as e:
|
||||
self.failures.append(f"An error occurred: {e}")
|
||||
logger.error(f"Access verification failed: {e}")
|
||||
return f"Access verification failed: {e}"
|
||||
async def test_with_prompt(self, spec_index: int, user_prompt: str) -> str:
|
||||
if not 0 <= spec_index < len(self.llm_specs):
|
||||
return f"Invalid spec index: {spec_index}. Valid range is 0 to {len(self.llm_specs) - 1}"
|
||||
|
||||
# Run the sample test
|
||||
try:
|
||||
test_response = await client.post(
|
||||
f"{spec.url}/test", json=sample_test
|
||||
)
|
||||
test_response.raise_for_status()
|
||||
response_data = test_response.json()
|
||||
if "choices" in response_data and len(response_data["choices"]) > 0:
|
||||
return f"Testing agent at {spec.url} succeeded: {response_data}"
|
||||
else:
|
||||
self.failures.append("Invalid response format")
|
||||
logger.error("Sample test failed: Invalid response format")
|
||||
return "Sample test failed: Invalid response format"
|
||||
except httpx.HTTPStatusError as e:
|
||||
self.failures.append(f"HTTP error occurred: {e}")
|
||||
logger.error(f"Sample test failed: {e}")
|
||||
return f"Sample test failed: {e}"
|
||||
except Exception as e:
|
||||
self.failures.append(f"An error occurred: {e}")
|
||||
logger.error(f"Sample test failed: {e}")
|
||||
return f"Sample test failed: {e}"
|
||||
llm_spec = self.llm_specs[spec_index]
|
||||
return await self.test_llm_spec(llm_spec, user_prompt)
|
||||
|
||||
|
||||
# Initialize OperatorToolBox with AgentSpecification
|
||||
@@ -126,104 +111,95 @@ dataset_manager_agent = Agent(
|
||||
model="gpt-4",
|
||||
deps_type=OperatorToolBox,
|
||||
result_type=str,
|
||||
system_prompt="You can validate the toolbox, run operations, and retrieve results or failures.",
|
||||
system_prompt="You can validate the toolbox, run operations, retrieve results or failures, and test LLM specs.",
|
||||
)
|
||||
|
||||
|
||||
@dataset_manager_agent.tool
|
||||
async def validate_toolbox(ctx: RunContext[OperatorToolBox]) -> str:
|
||||
is_valid = ctx.deps.validate()
|
||||
if is_valid:
|
||||
return "ToolBox validation successful."
|
||||
else:
|
||||
return "ToolBox validation failed."
|
||||
return (
|
||||
"ToolBox validation successful." if is_valid else "ToolBox validation failed."
|
||||
)
|
||||
|
||||
|
||||
@dataset_manager_agent.tool
|
||||
async def execute_operation(ctx: RunContext[OperatorToolBox], operation: str) -> str:
|
||||
result = ctx.deps.run_operation(operation)
|
||||
return result
|
||||
return ctx.deps.run_operation(operation)
|
||||
|
||||
|
||||
@dataset_manager_agent.tool
|
||||
async def retrieve_results(ctx: RunContext[OperatorToolBox]) -> str:
|
||||
results = ctx.deps.get_results()
|
||||
if results:
|
||||
formatted_results = "\n".join([f"{op}: {res}" for op, res in results.items()])
|
||||
return f"Operation Results:\n{formatted_results}"
|
||||
else:
|
||||
return "No operations have been executed yet."
|
||||
return (
|
||||
f"Operation Results:\n{results}"
|
||||
if results
|
||||
else "No operations have been executed yet."
|
||||
)
|
||||
|
||||
|
||||
@dataset_manager_agent.tool
|
||||
async def retrieve_failures(ctx: RunContext[OperatorToolBox]) -> str:
|
||||
failures = ctx.deps.get_failures()
|
||||
if failures:
|
||||
formatted_failures = "\n".join(failures)
|
||||
return f"Failures:\n{formatted_failures}"
|
||||
else:
|
||||
return "No failures recorded."
|
||||
return f"Failures:\n{failures}" if failures else "No failures recorded."
|
||||
|
||||
|
||||
@dataset_manager_agent.tool
|
||||
async def test_agent(
|
||||
ctx: RunContext[OperatorToolBox], description: str, sample_test: dict[str, Any]
|
||||
async def list_llm_specs(ctx: RunContext[OperatorToolBox]) -> str:
|
||||
spec_list = "\n".join(
|
||||
f"{i}: {spec.url}" for i, spec in enumerate(ctx.deps.llm_specs)
|
||||
)
|
||||
return f"Available LLM Specs:\n{spec_list}"
|
||||
|
||||
|
||||
@dataset_manager_agent.tool
|
||||
async def test_llm_with_prompt(
|
||||
ctx: RunContext[OperatorToolBox], spec_index: int, user_prompt: str
|
||||
) -> str:
|
||||
result = await ctx.deps.test(description, sample_test)
|
||||
return result
|
||||
return await ctx.deps.test_with_prompt(spec_index, user_prompt)
|
||||
|
||||
|
||||
# Synchronous run example
|
||||
def run_dataset_manager_agent_sync():
|
||||
prompts = [
|
||||
"Validate the toolbox.",
|
||||
"Execute operation on 'dataset2'.",
|
||||
"Execute operation on 'dataset4'.", # This should fail
|
||||
"Retrieve the results.",
|
||||
"Retrieve any failures.",
|
||||
"Test my openAI compatible agent deployed at localhost:3000",
|
||||
]
|
||||
|
||||
sample_test = {"prompt": "Hello, how are you?", "max_tokens": 5}
|
||||
|
||||
for prompt in prompts:
|
||||
if "Test my" in prompt:
|
||||
result = dataset_manager_agent.run_sync(
|
||||
prompt, deps=toolbox, sample_test=sample_test
|
||||
)
|
||||
else:
|
||||
result = dataset_manager_agent.run_sync(prompt, deps=toolbox)
|
||||
print(f"Prompt: {prompt}")
|
||||
print(f"Response: {result.data}\n")
|
||||
|
||||
|
||||
# Asynchronous run example
|
||||
# Asynchronous run example with user confirmation
|
||||
async def run_dataset_manager_agent_async():
|
||||
prompts = [
|
||||
"Validate the toolbox.",
|
||||
"Execute operation on 'dataset2'.",
|
||||
"Execute operation on 'dataset4'.", # This should fail
|
||||
"Retrieve the results.",
|
||||
"Retrieve any failures.",
|
||||
"Test my openAI compatible agent deployed at localhost:3000",
|
||||
"List available LLM specs.",
|
||||
"I want to test an LLM with my prompt: 'Tell me a short story about a robot'. Which spec index should I use?",
|
||||
]
|
||||
|
||||
sample_test = {"prompt": "Hello, how are you?", "max_tokens": 5}
|
||||
|
||||
for prompt in prompts:
|
||||
if "Test my" in prompt:
|
||||
result = await dataset_manager_agent.run(
|
||||
prompt, deps=toolbox, sample_test=sample_test
|
||||
)
|
||||
else:
|
||||
result = await dataset_manager_agent.run(prompt, deps=toolbox)
|
||||
result = await dataset_manager_agent.run(prompt, deps=toolbox)
|
||||
print(f"Prompt: {prompt}")
|
||||
print(f"Response: {result.data}\n")
|
||||
|
||||
# Handle testing request
|
||||
if "test an LLM with my prompt" in prompt:
|
||||
print(
|
||||
"Please select a spec index from the list above and confirm to proceed."
|
||||
)
|
||||
# Simulate user input for demo (in real app, you'd get this from user)
|
||||
user_input = (
|
||||
input("Enter spec index and 'yes' to confirm (e.g., '0 yes'): ")
|
||||
.strip()
|
||||
.split()
|
||||
)
|
||||
if len(user_input) == 2 and user_input[1].lower() == "yes":
|
||||
try:
|
||||
spec_index = int(user_input[0])
|
||||
# Extract prompt from the original input
|
||||
user_prompt = prompt.split("my prompt: ")[1].strip("'")
|
||||
test_result = await dataset_manager_agent.run(
|
||||
f"Test LLM at index {spec_index} with prompt: {user_prompt}",
|
||||
deps=toolbox,
|
||||
spec_index=spec_index,
|
||||
user_prompt=user_prompt,
|
||||
)
|
||||
print(f"Test Response: {test_result.data}\n")
|
||||
except ValueError:
|
||||
print("Invalid spec index provided.\n")
|
||||
else:
|
||||
print("Test canceled. Please provide a valid index and confirmation.\n")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Run synchronous example
|
||||
run_dataset_manager_agent_sync()
|
||||
|
||||
# Run asynchronous example
|
||||
asyncio.run(run_dataset_manager_agent_async())
|
||||
|
||||
Reference in New Issue
Block a user