Merge feature/ai_module into dev

Add AI module with A2A wrapper and task agent
This commit is contained in:
tduhamel42
2025-10-14 15:03:15 +02:00
33 changed files with 1941 additions and 16 deletions

3
.gitignore vendored
View File

@@ -185,6 +185,9 @@ logs/
# FuzzForge project directories (user projects should manage their own .gitignore)
.fuzzforge/
# Docker volume configs (keep .env.example but ignore actual .env)
volumes/env/.env
# Test project databases and configurations
test_projects/*/.fuzzforge/
test_projects/*/findings.db*

View File

@@ -0,0 +1,9 @@
__pycache__
*.pyc
*.pyo
*.pytest_cache
*.coverage
coverage.xml
build/
dist/
.env

View File

@@ -0,0 +1,10 @@
# Default LiteLLM configuration
LITELLM_MODEL=gemini/gemini-2.0-flash-001
# LITELLM_PROVIDER=gemini
# API keys (uncomment and fill as needed)
# GOOGLE_API_KEY=
# OPENAI_API_KEY=
# ANTHROPIC_API_KEY=
# OPENROUTER_API_KEY=
# MISTRAL_API_KEY=

View File

@@ -0,0 +1,82 @@
# Architecture Overview
This package is a minimal ADK agent that keeps runtime behaviour and A2A access in separate layers so it can double as boilerplate.
## Directory Layout
```text
agent_with_adk_format/
├── __init__.py # Exposes root_agent for ADK runners
├── a2a_hot_swap.py # JSON-RPC helper for model/prompt swaps
├── README.md, QUICKSTART.md # Operational docs
├── ARCHITECTURE.md # This document
├── .env # Active environment (gitignored)
├── .env.example # Environment template
└── litellm_agent/
├── agent.py # Root Agent definition (LiteLLM shell)
├── callbacks.py # before_agent / before_model hooks
├── config.py # Defaults, state keys, control prefix
├── control.py # HOTSWAP command parsing/serialization
├── state.py # Session state wrapper + LiteLLM factory
├── tools.py # set_model / set_prompt / get_config
├── prompts.py # Base instruction text
└── agent.json # A2A agent card (served under /.well-known)
```
```mermaid
flowchart TD
subgraph ADK Runner
A["adk api_server / adk web / adk run"]
B["agent_with_adk_format/__init__.py"]
C["litellm_agent/agent.py (root_agent)"]
D["HotSwapState (state.py)"]
E["LiteLlm(model, provider)"]
end
subgraph Session State
S1[app:litellm_agent/model]
S2[app:litellm_agent/provider]
S3[app:litellm_agent/prompt]
end
A --> B --> C
C --> D
D -->|instantiate| E
D --> S1
D --> S2
D --> S3
E --> C
```
## Runtime Flow (ADK Runners)
1. **Startup**: `adk api_server`/`adk web` imports `agent_with_adk_format`, which exposes `root_agent` from `litellm_agent/agent.py`. `.env` at package root is loaded before the runner constructs the agent.
2. **Session State**: `callbacks.py` and `tools.py` read/write through `state.py`. We store `model`, `provider`, and `prompt` keys (prefixed `app:litellm_agent/…`) which persist across turns.
3. **Instruction Generation**: `provide_instruction` composes the base persona from `prompts.py` plus any stored prompt override. The current model/provider is appended for observability.
4. **Model Hot-Swap**: When a control message is detected (`[HOTSWAP:MODEL:…]`) the callback parses it via `control.py`, updates the session state, and calls `state.apply_state_to_agent` to instantiate a new `LiteLlm(model=…, custom_llm_provider=…)`. ADK runners reuse that instance for subsequent turns.
5. **Prompt Hot-Swap**: Similar path (`set_prompt` tool/callback) updates state; the dynamic instruction immediately reflects the change.
6. **Config Reporting**: Both the callback and the tool surface the summary string produced by `HotSwapState.describe()`, ensuring CLI, A2A, and UI all show the same data.
## A2A Integration
- `agent.json` defines the agent card and enables ADK to register `/a2a/litellm_agent` routes when launched with `--a2a`.
- `a2a_hot_swap.py` uses `a2a.client.A2AClient` to programmatically send control messages and user text via JSON-RPC. It supports streaming when available and falls back to blocking requests otherwise.
```mermaid
sequenceDiagram
participant Client as a2a_hot_swap.py
participant Server as ADK API Server
participant Agent as root_agent
Client->>Server: POST /a2a/litellm_agent (message/stream or message/send)
Server->>Agent: Invoke callbacks/tools
Agent->>Server: Status / artifacts / final message
Server->>Client: Streamed Task events
Client->>Client: Extract text & print summary
```
## Extending the Boilerplate
- Add tools under `litellm_agent/tools.py` and register them in `agent.py` to expose new capabilities.
- Use `state.py` to track additional configuration or session data (store under your own prefix to avoid collisions).
- When layering business logic, prefer expanding callbacks or adding higher-level agents while leaving the hot-swap mechanism untouched for reuse.

View File

@@ -0,0 +1,71 @@
# Docker & Kubernetes Deployment
## Local Docker
Build from the repository root:
```bash
docker build -t litellm-hot-swap:latest agent_with_adk_format
```
Run the container (port 8000, inject provider keys via env file or flags):
```bash
docker run \
-p 8000:8000 \
--env-file agent_with_adk_format/.env \
litellm-hot-swap:latest
```
The container serves Uvicorn on `http://localhost:8000`. Update `.env` (or pass `-e KEY=value`) before launching if you plan to hot-swap providers.
## Kubernetes (example manifest)
Use the same image, optionally pushed to a registry (`docker tag` + `docker push`). A simple Deployment/Service pair:
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: litellm-hot-swap
spec:
replicas: 1
selector:
matchLabels:
app: litellm-hot-swap
template:
metadata:
labels:
app: litellm-hot-swap
spec:
containers:
- name: server
image: <REGISTRY_URI>/litellm-hot-swap:latest
ports:
- containerPort: 8000
env:
- name: PORT
value: "8000"
- name: LITELLM_MODEL
value: gemini/gemini-2.0-flash-001
# Add provider keys as needed
# - name: OPENAI_API_KEY
# valueFrom:
# secretKeyRef:
# name: litellm-secrets
# key: OPENAI_API_KEY
---
apiVersion: v1
kind: Service
metadata:
name: litellm-hot-swap
spec:
type: LoadBalancer
selector:
app: litellm-hot-swap
ports:
- port: 80
targetPort: 8000
```
Apply with `kubectl apply -f deployment.yaml`. Provide secrets via `env` or Kubernetes Secrets.

View File

@@ -0,0 +1,19 @@
# syntax=docker/dockerfile:1
FROM python:3.11-slim AS base
ENV PYTHONUNBUFFERED=1 \
PYTHONDONTWRITEBYTECODE=1 \
PIP_NO_CACHE_DIR=1 \
PORT=8000
WORKDIR /app
COPY requirements.txt ./requirements.txt
RUN pip install --upgrade pip && pip install -r requirements.txt
COPY . /app/agent_with_adk_format
WORKDIR /app/agent_with_adk_format
ENV PYTHONPATH=/app
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

View File

@@ -0,0 +1,61 @@
# Quick Start Guide
## Launch the Agent
From the repository root you can expose the agent through any ADK entry point:
```bash
# A2A / HTTP server
adk api_server --a2a --port 8000 agent_with_adk_format
# Browser UI
adk web agent_with_adk_format
# Interactive terminal
adk run agent_with_adk_format
```
The A2A server exposes the JSON-RPC endpoint at `http://localhost:8000/a2a/litellm_agent`.
## Hot-Swap from the Command Line
Use the bundled helper to change model and prompt via A2A without touching the UI:
```bash
python agent_with_adk_format/a2a_hot_swap.py \
--model openai gpt-4o \
--prompt "You are concise." \
--config \
--context demo-session
```
The script sends the control messages for you and prints the servers responses. The `--context` flag lets you reuse the same conversation across multiple invocations.
### Follow-up Messages
Once the swaps are applied you can send a user message on the same session:
```bash
python agent_with_adk_format/a2a_hot_swap.py \
--context demo-session \
--message "Summarise the current configuration in five words."
```
### Clearing the Prompt
```bash
python agent_with_adk_format/a2a_hot_swap.py \
--context demo-session \
--prompt "" \
--config
```
## Control Messages (for reference)
Behind the scenes the helper sends plain text messages understood by the callbacks:
- `[HOTSWAP:MODEL:provider/model]`
- `[HOTSWAP:PROMPT:text]`
- `[HOTSWAP:GET_CONFIG]`
You can craft the same messages from any A2A client if you prefer.

View File

@@ -0,0 +1,349 @@
# LiteLLM Agent with Hot-Swap Support
A flexible AI agent powered by LiteLLM that supports runtime hot-swapping of models and system prompts. Compatible with ADK and A2A protocols.
## Features
- 🔄 **Hot-Swap Models**: Change LLM models on-the-fly without restarting
- 📝 **Dynamic Prompts**: Update system prompts during conversation
- 🌐 **Multi-Provider Support**: Works with OpenAI, Anthropic, Google, OpenRouter, and more
- 🔌 **A2A Compatible**: Can be served as an A2A agent
- 🛠️ **ADK Integration**: Run with `adk web`, `adk run`, or `adk api_server`
## Architecture
```
task_agent/
├── __init__.py # Exposes root_agent for ADK
├── a2a_hot_swap.py # JSON-RPC helper for hot-swapping
├── README.md # This guide
├── QUICKSTART.md # Quick-start walkthrough
├── .env # Active environment (gitignored)
├── .env.example # Environment template
└── litellm_agent/
├── __init__.py
├── agent.py # Main agent implementation
├── agent.json # A2A agent card
├── callbacks.py # ADK callbacks
├── config.py # Defaults and state keys
├── control.py # HOTSWAP message helpers
├── prompts.py # Base instruction
├── state.py # Session state utilities
└── tools.py # set_model / set_prompt / get_config
```
## Setup
### 1. Environment Configuration
Copying the example file is optional—the repository already ships with a root-level `.env` seeded with defaults. Adjust the values at the package root:
```bash
cd task_agent
# Optionally refresh from the template
# cp .env.example .env
```
Edit `.env` (or `.env.example`) and add your API keys. The agent must be restarted after changes so the values are picked up:
```bash
# Set default model
LITELLM_MODEL=gemini/gemini-2.0-flash-001
# Add API keys for providers you want to use
GOOGLE_API_KEY=your_google_api_key
OPENAI_API_KEY=your_openai_api_key
ANTHROPIC_API_KEY=your_anthropic_api_key
OPENROUTER_API_KEY=your_openrouter_api_key
```
### 2. Install Dependencies
```bash
pip install "google-adk" "a2a-sdk[all]" "python-dotenv" "litellm"
```
### 3. Run in Docker
Build the container (this image can be pushed to any registry or run locally):
```bash
docker build -t litellm-hot-swap:latest task_agent
```
Provide environment configuration at runtime (either pass variables individually or mount a file):
```bash
docker run \
-p 8000:8000 \
--env-file task_agent/.env \
litellm-hot-swap:latest
```
The container starts Uvicorn with the ADK app (`main.py`) listening on port 8000.
## Running the Agent
### Option 1: ADK Web UI (Recommended for Testing)
Start the web interface:
```bash
adk web task_agent
```
> **Tip:** before launching `adk web`/`adk run`/`adk api_server`, ensure the root-level `.env` contains valid API keys for any provider you plan to hot-swap to (e.g. set `OPENAI_API_KEY` before switching to `openai/gpt-4o`).
Open http://localhost:8000 in your browser and interact with the agent.
### Option 2: ADK Terminal
Run in terminal mode:
```bash
adk run task_agent
```
### Option 3: A2A API Server
Start as an A2A-compatible API server:
```bash
adk api_server --a2a --port 8000 task_agent
```
The agent will be available at: `http://localhost:8000/a2a/litellm_agent`
### Command-line helper
Use the bundled script to drive hot-swaps and user messages over A2A:
```bash
python task_agent/a2a_hot_swap.py \
--url http://127.0.0.1:8000/a2a/litellm_agent \
--model openai gpt-4o \
--prompt "You are concise." \
--config \
--context demo-session
```
To send a follow-up prompt in the same session (with a larger timeout for long answers):
```bash
python task_agent/a2a_hot_swap.py \
--url http://127.0.0.1:8000/a2a/litellm_agent \
--model openai gpt-4o \
--prompt "You are concise." \
--message "Give me a fuzzing harness." \
--context demo-session \
--timeout 120
```
> Ensure the corresponding provider keys are present in `.env` (or passed via environment variables) before issuing model swaps.
## Hot-Swap Tools
The agent provides three special tools:
### 1. `set_model` - Change the LLM Model
Change the model during conversation:
```
User: Use the set_model tool to change to gpt-4o with openai provider
Agent: ✅ Model configured to: openai/gpt-4o
This change is active now!
```
**Parameters:**
- `model`: Model name (e.g., "gpt-4o", "claude-3-sonnet-20240229")
- `custom_llm_provider`: Optional provider prefix (e.g., "openai", "anthropic", "openrouter")
**Examples:**
- OpenAI: `set_model(model="gpt-4o", custom_llm_provider="openai")`
- Anthropic: `set_model(model="claude-3-sonnet-20240229", custom_llm_provider="anthropic")`
- Google: `set_model(model="gemini-2.0-flash-001", custom_llm_provider="gemini")`
### 2. `set_prompt` - Change System Prompt
Update the system instructions:
```
User: Use set_prompt to change my behavior to "You are a helpful coding assistant"
Agent: ✅ System prompt updated:
You are a helpful coding assistant
This change is active now!
```
### 3. `get_config` - View Configuration
Check current model and prompt:
```
User: Use get_config to show me your configuration
Agent: 📊 Current Configuration:
━━━━━━━━━━━━━━━━━━━━━━
Model: openai/gpt-4o
System Prompt: You are a helpful coding assistant
━━━━━━━━━━━━━━━━━━━━━━
```
## Testing
### Basic A2A Client Test
```bash
python agent/test_a2a_client.py
```
### Hot-Swap Functionality Test
```bash
python agent/test_hotswap.py
```
This will:
1. Check initial configuration
2. Query with default model
3. Hot-swap to GPT-4o
4. Verify model changed
5. Change system prompt
6. Test new prompt behavior
7. Hot-swap to Claude
8. Verify final configuration
### Command-Line Hot-Swap Helper
You can trigger model and prompt changes directly against the A2A endpoint without the interactive CLI:
```bash
# Start the agent first (in another terminal):
adk api_server --a2a --port 8000 task_agent
# Apply swaps via pure A2A calls
python task_agent/a2a_hot_swap.py --model openai gpt-4o --prompt "You are concise." --config
python task_agent/a2a_hot_swap.py --model anthropic claude-3-sonnet-20240229 --context shared-session --config
python task_agent/a2a_hot_swap.py --prompt "" --context shared-session --config # Clear the prompt and show current state
```
`--model` accepts either `provider/model` or a provider/model pair. Add `--context` if you want to reuse the same conversation across invocations. Use `--config` to dump the agent's configuration after the changes are applied.
## Supported Models
### OpenAI
- `openai/gpt-4o`
- `openai/gpt-4-turbo`
- `openai/gpt-3.5-turbo`
### Anthropic
- `anthropic/claude-3-opus-20240229`
- `anthropic/claude-3-sonnet-20240229`
- `anthropic/claude-3-haiku-20240307`
### Google
- `gemini/gemini-2.0-flash-001`
- `gemini/gemini-2.5-pro-exp-03-25`
- `vertex_ai/gemini-2.0-flash-001`
### OpenRouter
- `openrouter/anthropic/claude-3-opus`
- `openrouter/openai/gpt-4`
- Any model from OpenRouter catalog
## How It Works
### Session State
- Model and prompt settings are stored in session state
- Each session maintains its own configuration
- Settings persist across messages in the same session
### Hot-Swap Mechanism
1. Tools update session state with new model/prompt
2. `before_agent_callback` checks for changes
3. If model changed, directly updates: `agent.model = LiteLlm(model=new_model)`
4. Dynamic instruction function reads custom prompt from session state
### A2A Compatibility
- Agent card at `agent.json` defines A2A metadata
- Served at `/a2a/litellm_agent` endpoint
- Compatible with A2A client protocol
## Example Usage
### Interactive Session
```python
from a2a.client import A2AClient
import asyncio
async def chat():
client = A2AClient("http://localhost:8000/a2a/litellm_agent")
context_id = "my-session-123"
# Start with default model
async for msg in client.send_message("Hello!", context_id=context_id):
print(msg)
# Switch to GPT-4
async for msg in client.send_message(
"Use set_model with model gpt-4o and provider openai",
context_id=context_id
):
print(msg)
# Continue with new model
async for msg in client.send_message(
"Help me write a function",
context_id=context_id
):
print(msg)
asyncio.run(chat())
```
## Troubleshooting
### Model Not Found
- Ensure API key for the provider is set in `.env`
- Check model name is correct for the provider
- Verify LiteLLM supports the model (https://docs.litellm.ai/docs/providers)
### Connection Refused
- Ensure the agent is running (`adk api_server --a2a task_agent`)
- Check the port matches (default: 8000)
- Verify no firewall blocking localhost
### Hot-Swap Not Working
- Check that you're using the same `context_id` across messages
- Ensure the tool is being called (not just asked to switch)
- Look for `🔄 Hot-swapped model to:` in server logs
## Development
### Adding New Tools
```python
async def my_tool(tool_ctx: ToolContext, param: str) -> str:
"""Your tool description."""
# Access session state
tool_ctx.state["my_key"] = "my_value"
return "Tool result"
# Add to agent
root_agent = LlmAgent(
# ...
tools=[set_model, set_prompt, get_config, my_tool],
)
```
### Modifying Callbacks
```python
async def after_model_callback(
callback_context: CallbackContext,
llm_response: LlmResponse
) -> Optional[LlmResponse]:
"""Modify response after model generates it."""
# Your logic here
return llm_response
```
## License
Apache 2.0

View File

@@ -0,0 +1,5 @@
"""Package entry point for the ADK-formatted hot swap agent."""
from .litellm_agent.agent import root_agent
__all__ = ["root_agent"]

View File

@@ -0,0 +1,224 @@
#!/usr/bin/env python3
"""Minimal A2A client utility for hot-swapping LiteLLM model/prompt."""
from __future__ import annotations
import argparse
import asyncio
from typing import Optional
from uuid import uuid4
import httpx
from a2a.client import A2AClient
from a2a.client.errors import A2AClientHTTPError
from a2a.types import (
JSONRPCErrorResponse,
Message,
MessageSendConfiguration,
MessageSendParams,
Part,
Role,
SendMessageRequest,
SendStreamingMessageRequest,
Task,
TaskArtifactUpdateEvent,
TaskStatusUpdateEvent,
TextPart,
)
from litellm_agent.control import (
HotSwapCommand,
build_control_message,
parse_model_spec,
serialize_model_spec,
)
DEFAULT_URL = "http://localhost:8000/a2a/litellm_agent"
async def _collect_text(client: A2AClient, message: str, context_id: str) -> str:
"""Send a message and collect streamed agent text into a single string."""
params = MessageSendParams(
configuration=MessageSendConfiguration(blocking=True),
message=Message(
context_id=context_id,
message_id=str(uuid4()),
role=Role.user,
parts=[Part(root=TextPart(text=message))],
),
)
stream_request = SendStreamingMessageRequest(id=str(uuid4()), params=params)
buffer: list[str] = []
try:
async for response in client.send_message_streaming(stream_request):
root = response.root
if isinstance(root, JSONRPCErrorResponse):
raise RuntimeError(f"A2A error: {root.error}")
payload = root.result
buffer.extend(_extract_text(payload))
except A2AClientHTTPError as exc:
if "text/event-stream" not in str(exc):
raise
send_request = SendMessageRequest(id=str(uuid4()), params=params)
response = await client.send_message(send_request)
root = response.root
if isinstance(root, JSONRPCErrorResponse):
raise RuntimeError(f"A2A error: {root.error}")
payload = root.result
buffer.extend(_extract_text(payload))
if buffer:
buffer = list(dict.fromkeys(buffer))
return "\n".join(buffer).strip()
def _extract_text(
result: Message | Task | TaskStatusUpdateEvent | TaskArtifactUpdateEvent,
) -> list[str]:
texts: list[str] = []
if isinstance(result, Message):
if result.role is Role.agent:
for part in result.parts:
root_part = part.root
text = getattr(root_part, "text", None)
if text:
texts.append(text)
elif isinstance(result, Task) and result.history:
for msg in result.history:
if msg.role is Role.agent:
for part in msg.parts:
root_part = part.root
text = getattr(root_part, "text", None)
if text:
texts.append(text)
elif isinstance(result, TaskStatusUpdateEvent):
message = result.status.message
if message:
texts.extend(_extract_text(message))
elif isinstance(result, TaskArtifactUpdateEvent):
artifact = result.artifact
if artifact and artifact.parts:
for part in artifact.parts:
root_part = part.root
text = getattr(root_part, "text", None)
if text:
texts.append(text)
return texts
def _split_model_args(model_args: Optional[list[str]]) -> tuple[Optional[str], Optional[str]]:
if not model_args:
return None, None
if len(model_args) == 1:
return model_args[0], None
provider = model_args[0]
model = " ".join(model_args[1:])
return model, provider
async def hot_swap(
url: str,
*,
model_args: Optional[list[str]],
provider: Optional[str],
prompt: Optional[str],
message: Optional[str],
show_config: bool,
context_id: Optional[str],
timeout: float,
) -> None:
"""Execute the requested hot-swap operations against the A2A endpoint."""
timeout_config = httpx.Timeout(timeout)
async with httpx.AsyncClient(timeout=timeout_config) as http_client:
client = A2AClient(url=url, httpx_client=http_client)
session_id = context_id or str(uuid4())
model, derived_provider = _split_model_args(model_args)
if model:
spec = parse_model_spec(model, provider=provider or derived_provider)
payload = serialize_model_spec(spec)
control_msg = build_control_message(HotSwapCommand.MODEL, payload)
result = await _collect_text(client, control_msg, session_id)
print(f"Model response: {result or '(no response)'}")
if prompt is not None:
control_msg = build_control_message(HotSwapCommand.PROMPT, prompt)
result = await _collect_text(client, control_msg, session_id)
print(f"Prompt response: {result or '(no response)'}")
if show_config:
control_msg = build_control_message(HotSwapCommand.GET_CONFIG)
result = await _collect_text(client, control_msg, session_id)
print(f"Config:\n{result or '(no response)'}")
if message:
result = await _collect_text(client, message, session_id)
print(f"Message response: {result or '(no response)'}")
print(f"Context ID: {session_id}")
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument(
"--url",
default=DEFAULT_URL,
help=f"A2A endpoint for the agent (default: {DEFAULT_URL})",
)
parser.add_argument(
"--model",
nargs="+",
help="LiteLLM model spec: either 'provider/model' or '<provider> <model>'.",
)
parser.add_argument(
"--provider",
help="Optional LiteLLM provider when --model lacks a prefix.")
parser.add_argument(
"--prompt",
help="Set the system prompt (omit to leave unchanged; empty string clears it).",
)
parser.add_argument(
"--message",
help="Send an additional user message after the swaps complete.")
parser.add_argument(
"--config",
action="store_true",
help="Print the agent configuration after performing swaps.")
parser.add_argument(
"--context",
help="Optional context/session identifier to reuse across calls.")
parser.add_argument(
"--timeout",
type=float,
default=60.0,
help="Request timeout (seconds) for A2A calls (default: 60).",
)
return parser.parse_args()
def main() -> None:
args = parse_args()
asyncio.run(
hot_swap(
args.url,
model_args=args.model,
provider=args.provider,
prompt=args.prompt,
message=args.message,
show_config=args.config,
context_id=args.context,
timeout=args.timeout,
)
)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,24 @@
version: '3.8'
services:
task-agent:
build:
context: .
dockerfile: Dockerfile
container_name: fuzzforge-task-agent
ports:
- "10900:8000"
env_file:
- ../../../volumes/env/.env
environment:
- PORT=8000
- PYTHONUNBUFFERED=1
volumes:
# Mount volumes/env for runtime config access
- ../../../volumes/env:/app/config:ro
restart: unless-stopped
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
interval: 30s
timeout: 10s
retries: 3

View File

@@ -0,0 +1,55 @@
"""LiteLLM hot-swap agent package exports."""
from .agent import root_agent
from .callbacks import (
before_agent_callback,
before_model_callback,
provide_instruction,
)
from .config import (
AGENT_DESCRIPTION,
AGENT_NAME,
CONTROL_PREFIX,
DEFAULT_MODEL,
DEFAULT_PROVIDER,
STATE_MODEL_KEY,
STATE_PROVIDER_KEY,
STATE_PROMPT_KEY,
)
from .control import (
HotSwapCommand,
ModelSpec,
build_control_message,
parse_control_message,
parse_model_spec,
serialize_model_spec,
)
from .state import HotSwapState, apply_state_to_agent
from .tools import HOTSWAP_TOOLS, get_config, set_model, set_prompt
__all__ = [
"root_agent",
"before_agent_callback",
"before_model_callback",
"provide_instruction",
"AGENT_DESCRIPTION",
"AGENT_NAME",
"CONTROL_PREFIX",
"DEFAULT_MODEL",
"DEFAULT_PROVIDER",
"STATE_MODEL_KEY",
"STATE_PROVIDER_KEY",
"STATE_PROMPT_KEY",
"HotSwapCommand",
"ModelSpec",
"HotSwapState",
"apply_state_to_agent",
"build_control_message",
"parse_control_message",
"parse_model_spec",
"serialize_model_spec",
"HOTSWAP_TOOLS",
"get_config",
"set_model",
"set_prompt",
]

View File

@@ -0,0 +1,24 @@
{
"name": "litellm_agent",
"description": "A flexible AI agent powered by LiteLLM with hot-swappable models from OpenRouter and other providers",
"url": "http://localhost:8000",
"version": "1.0.0",
"defaultInputModes": ["text/plain"],
"defaultOutputModes": ["text/plain"],
"capabilities": {
"streaming": true
},
"skills": [
{
"id": "litellm-general-purpose",
"name": "General Purpose AI Assistant",
"description": "A flexible AI assistant that can help with various tasks using any LiteLLM-supported model. Supports runtime model and prompt hot-swapping.",
"tags": ["ai", "assistant", "litellm", "flexible", "hot-swap"],
"examples": [
"Help me write a Python function",
"Explain quantum computing",
"Switch to Claude model and help me code"
]
}
]
}

View File

@@ -0,0 +1,29 @@
"""Root agent definition for the LiteLLM hot-swap shell."""
from __future__ import annotations
from google.adk.agents import Agent
from .callbacks import (
before_agent_callback,
before_model_callback,
provide_instruction,
)
from .config import AGENT_DESCRIPTION, AGENT_NAME, DEFAULT_MODEL, DEFAULT_PROVIDER
from .state import HotSwapState
from .tools import HOTSWAP_TOOLS
_initial_state = HotSwapState(model=DEFAULT_MODEL, provider=DEFAULT_PROVIDER)
root_agent = Agent(
name=AGENT_NAME,
model=_initial_state.instantiate_llm(),
description=AGENT_DESCRIPTION,
instruction=provide_instruction,
tools=HOTSWAP_TOOLS,
before_agent_callback=before_agent_callback,
before_model_callback=before_model_callback,
)
__all__ = ["root_agent"]

View File

@@ -0,0 +1,137 @@
"""Callbacks and instruction providers for the LiteLLM hot-swap agent."""
from __future__ import annotations
import logging
from typing import Optional
from google.adk.agents.callback_context import CallbackContext
from google.adk.agents.readonly_context import ReadonlyContext
from google.adk.models.llm_request import LlmRequest
from google.genai import types
from .config import CONTROL_PREFIX, DEFAULT_MODEL
from .control import HotSwapCommand, parse_control_message, parse_model_spec
from .prompts import BASE_INSTRUCTION
from .state import HotSwapState, apply_state_to_agent
_LOGGER = logging.getLogger(__name__)
def provide_instruction(ctx: ReadonlyContext | None = None) -> str:
"""Compose the system instruction using the stored state."""
state_mapping = getattr(ctx, "state", None)
state = HotSwapState.from_mapping(state_mapping)
prompt = state.prompt or BASE_INSTRUCTION
return f"{prompt}\n\nActive model: {state.display_model}"
def _ensure_state(callback_context: CallbackContext) -> HotSwapState:
state = HotSwapState.from_mapping(callback_context.state)
state.persist(callback_context.state)
return state
def _session_id(callback_context: CallbackContext) -> str:
session = getattr(callback_context, "session", None)
if session is None:
session = getattr(callback_context._invocation_context, "session", None)
return getattr(session, "id", "unknown-session")
async def before_model_callback(
callback_context: CallbackContext,
llm_request: LlmRequest,
) -> Optional[types.Content]:
"""Ensure outgoing requests use the active model from session state."""
state = _ensure_state(callback_context)
try:
apply_state_to_agent(callback_context._invocation_context, state)
except Exception: # pragma: no cover - defensive logging
_LOGGER.exception(
"Failed to apply LiteLLM model '%s' (provider=%s) for session %s",
state.model,
state.provider,
callback_context.session.id,
)
llm_request.model = state.model or DEFAULT_MODEL
return None
async def before_agent_callback(
callback_context: CallbackContext,
) -> Optional[types.Content]:
"""Intercept hot-swap control messages and update session state."""
user_content = callback_context.user_content
if not user_content or not user_content.parts:
return None
first_part = user_content.parts[0]
message_text = (first_part.text or "").strip()
if not message_text.startswith(CONTROL_PREFIX):
return None
parsed = parse_control_message(message_text)
if not parsed:
return None
command, payload = parsed
state = _ensure_state(callback_context)
if command is HotSwapCommand.MODEL:
if not payload:
return _render("❌ Missing model specification for hot-swap.")
try:
spec = parse_model_spec(payload)
except ValueError as exc:
return _render(f"❌ Invalid model specification: {exc}")
state.model = spec.model
state.provider = spec.provider
state.persist(callback_context.state)
try:
apply_state_to_agent(callback_context._invocation_context, state)
except Exception: # pragma: no cover - defensive logging
_LOGGER.exception(
"Failed to apply LiteLLM model '%s' (provider=%s) for session %s",
state.model,
state.provider,
_session_id(callback_context),
)
_LOGGER.info(
"Hot-swapped model to %s (provider=%s, session=%s)",
state.model,
state.provider,
_session_id(callback_context),
)
label = state.display_model
return _render(f"✅ Model switched to: {label}")
if command is HotSwapCommand.PROMPT:
prompt_value = (payload or "").strip()
state.prompt = prompt_value or None
state.persist(callback_context.state)
if state.prompt:
_LOGGER.info(
"Updated prompt for session %s", _session_id(callback_context)
)
return _render(
"✅ System prompt updated. This change takes effect immediately."
)
return _render("✅ System prompt cleared. Reverting to default instruction.")
if command is HotSwapCommand.GET_CONFIG:
return _render(state.describe())
expected = ", ".join(HotSwapCommand.choices())
return _render(
"⚠️ Unsupported hot-swap command. Available verbs: "
f"{expected}."
)
def _render(message: str) -> types.ModelContent:
return types.ModelContent(parts=[types.Part(text=message)])

View File

@@ -0,0 +1,20 @@
"""Configuration constants for the LiteLLM hot-swap agent."""
from __future__ import annotations
import os
AGENT_NAME = "litellm_agent"
AGENT_DESCRIPTION = (
"A LiteLLM-backed shell that exposes hot-swappable model and prompt controls."
)
DEFAULT_MODEL = os.getenv("LITELLM_MODEL", "gemini-2.0-flash-001")
DEFAULT_PROVIDER = os.getenv("LITELLM_PROVIDER")
STATE_PREFIX = "app:litellm_agent/"
STATE_MODEL_KEY = f"{STATE_PREFIX}model"
STATE_PROVIDER_KEY = f"{STATE_PREFIX}provider"
STATE_PROMPT_KEY = f"{STATE_PREFIX}prompt"
CONTROL_PREFIX = "[HOTSWAP"

View File

@@ -0,0 +1,99 @@
"""Control message helpers for hot-swapping model and prompt."""
from __future__ import annotations
import re
from dataclasses import dataclass
from enum import Enum
from typing import Optional, Tuple
from .config import DEFAULT_PROVIDER
class HotSwapCommand(str, Enum):
"""Supported control verbs embedded in user messages."""
MODEL = "MODEL"
PROMPT = "PROMPT"
GET_CONFIG = "GET_CONFIG"
@classmethod
def choices(cls) -> tuple[str, ...]:
return tuple(item.value for item in cls)
@dataclass(frozen=True)
class ModelSpec:
"""Represents a LiteLLM model and optional provider."""
model: str
provider: Optional[str] = None
_COMMAND_PATTERN = re.compile(
r"^\[HOTSWAP:(?P<verb>[A-Z_]+)(?::(?P<payload>.*))?\]$",
)
def parse_control_message(text: str) -> Optional[Tuple[HotSwapCommand, Optional[str]]]:
"""Return hot-swap command tuple when the string matches the control format."""
match = _COMMAND_PATTERN.match(text.strip())
if not match:
return None
verb = match.group("verb")
if verb not in HotSwapCommand.choices():
return None
payload = match.group("payload")
return HotSwapCommand(verb), payload if payload else None
def build_control_message(command: HotSwapCommand, payload: Optional[str] = None) -> str:
"""Serialise a control command for downstream clients."""
if command not in HotSwapCommand:
raise ValueError(f"Unsupported hot-swap command: {command}")
if payload is None or payload == "":
return f"[HOTSWAP:{command.value}]"
return f"[HOTSWAP:{command.value}:{payload}]"
def parse_model_spec(model: str, provider: Optional[str] = None) -> ModelSpec:
"""Parse model/provider inputs into a structured ModelSpec."""
candidate = (model or "").strip()
if not candidate:
raise ValueError("Model name cannot be empty")
if provider:
provider_clean = provider.strip()
if not provider_clean:
raise ValueError("Provider cannot be empty when supplied")
if "/" in candidate:
raise ValueError(
"Provide either provider/model or use provider argument, not both",
)
return ModelSpec(model=candidate, provider=provider_clean)
if "/" in candidate:
provider_part, model_part = candidate.split("/", 1)
provider_part = provider_part.strip()
model_part = model_part.strip()
if not provider_part or not model_part:
raise ValueError("Model spec must include provider and model when using '/' format")
return ModelSpec(model=model_part, provider=provider_part)
if DEFAULT_PROVIDER:
return ModelSpec(model=candidate, provider=DEFAULT_PROVIDER.strip())
return ModelSpec(model=candidate, provider=None)
def serialize_model_spec(spec: ModelSpec) -> str:
"""Render a ModelSpec to provider/model string for control messages."""
if spec.provider:
return f"{spec.provider}/{spec.model}"
return spec.model

View File

@@ -0,0 +1,9 @@
"""System prompt templates for the LiteLLM agent."""
BASE_INSTRUCTION = (
"You are a focused orchestration layer that relays between the user and a"
" LiteLLM managed model."
"\n- Keep answers concise and actionable."
"\n- Prefer plain language; reveal intermediate reasoning only when helpful."
"\n- Surface any tool results clearly with short explanations."
)

View File

@@ -0,0 +1,86 @@
"""Session state utilities for the LiteLLM hot-swap agent."""
from __future__ import annotations
from dataclasses import dataclass
from typing import Any, Mapping, MutableMapping, Optional
from .config import (
DEFAULT_MODEL,
DEFAULT_PROVIDER,
STATE_MODEL_KEY,
STATE_PROMPT_KEY,
STATE_PROVIDER_KEY,
)
@dataclass(slots=True)
class HotSwapState:
"""Lightweight view of the hot-swap session state."""
model: str = DEFAULT_MODEL
provider: Optional[str] = None
prompt: Optional[str] = None
@classmethod
def from_mapping(cls, mapping: Optional[Mapping[str, Any]]) -> "HotSwapState":
if not mapping:
return cls()
raw_model = mapping.get(STATE_MODEL_KEY, DEFAULT_MODEL)
raw_provider = mapping.get(STATE_PROVIDER_KEY)
raw_prompt = mapping.get(STATE_PROMPT_KEY)
model = raw_model.strip() if isinstance(raw_model, str) else DEFAULT_MODEL
provider = raw_provider.strip() if isinstance(raw_provider, str) else None
if not provider and DEFAULT_PROVIDER:
provider = DEFAULT_PROVIDER.strip() or None
prompt = raw_prompt.strip() if isinstance(raw_prompt, str) else None
return cls(
model=model or DEFAULT_MODEL,
provider=provider or None,
prompt=prompt or None,
)
def persist(self, store: MutableMapping[str, object]) -> None:
store[STATE_MODEL_KEY] = self.model
if self.provider:
store[STATE_PROVIDER_KEY] = self.provider
else:
store[STATE_PROVIDER_KEY] = None
store[STATE_PROMPT_KEY] = self.prompt
def describe(self) -> str:
prompt_value = self.prompt if self.prompt else "(default prompt)"
provider_value = self.provider if self.provider else "(default provider)"
return (
"📊 Current Configuration\n"
"━━━━━━━━━━━━━━━━━━━━━━\n"
f"Model: {self.model}\n"
f"Provider: {provider_value}\n"
f"System Prompt: {prompt_value}\n"
"━━━━━━━━━━━━━━━━━━━━━━"
)
def instantiate_llm(self):
"""Create a LiteLlm instance for the current state."""
from google.adk.models.lite_llm import LiteLlm # Lazy import to avoid cycle
kwargs = {"model": self.model}
if self.provider:
kwargs["custom_llm_provider"] = self.provider
return LiteLlm(**kwargs)
@property
def display_model(self) -> str:
if self.provider:
return f"{self.provider}/{self.model}"
return self.model
def apply_state_to_agent(invocation_context, state: HotSwapState) -> None:
"""Update the provided agent with a LiteLLM instance matching state."""
agent = invocation_context.agent
agent.model = state.instantiate_llm()

View File

@@ -0,0 +1,64 @@
"""Tool definitions exposed to the LiteLLM agent."""
from __future__ import annotations
from typing import Optional
from google.adk.tools import FunctionTool, ToolContext
from .control import parse_model_spec
from .state import HotSwapState, apply_state_to_agent
async def set_model(
model: str,
*,
provider: Optional[str] = None,
tool_context: ToolContext,
) -> str:
"""Hot-swap the active LiteLLM model for this session."""
spec = parse_model_spec(model, provider=provider)
state = HotSwapState.from_mapping(tool_context.state)
state.model = spec.model
state.provider = spec.provider
state.persist(tool_context.state)
try:
apply_state_to_agent(tool_context._invocation_context, state)
except Exception as exc: # pragma: no cover - defensive reporting
return f"❌ Failed to apply model '{state.display_model}': {exc}"
return f"✅ Model switched to: {state.display_model}"
async def set_prompt(prompt: str, *, tool_context: ToolContext) -> str:
"""Update or clear the system prompt used for this session."""
state = HotSwapState.from_mapping(tool_context.state)
prompt_value = prompt.strip()
state.prompt = prompt_value or None
state.persist(tool_context.state)
if state.prompt:
return "✅ System prompt updated. This change takes effect immediately."
return "✅ System prompt cleared. Reverting to default instruction."
async def get_config(*, tool_context: ToolContext) -> str:
"""Return a summary of the current model and prompt configuration."""
state = HotSwapState.from_mapping(tool_context.state)
return state.describe()
HOTSWAP_TOOLS = [
FunctionTool(set_model),
FunctionTool(set_prompt),
FunctionTool(get_config),
]
__all__ = [
"set_model",
"set_prompt",
"get_config",
"HOTSWAP_TOOLS",
]

View File

@@ -0,0 +1,13 @@
"""ASGI entrypoint for containerized deployments."""
from pathlib import Path
from google.adk.cli.fast_api import get_fast_api_app
AGENT_DIR = Path(__file__).resolve().parent
app = get_fast_api_app(
agents_dir=str(AGENT_DIR),
web=False,
a2a=True,
)

View File

@@ -0,0 +1,4 @@
google-adk
a2a-sdk[all]
litellm
python-dotenv

View File

@@ -3,6 +3,11 @@ FuzzForge AI Module - Agent-to-Agent orchestration system
This module integrates the fuzzforge_ai components into FuzzForge,
providing intelligent AI agent capabilities for security analysis.
Usage:
from fuzzforge_ai.a2a_wrapper import send_agent_task
from fuzzforge_ai.agent import FuzzForgeAgent
from fuzzforge_ai.config_manager import ConfigManager
"""
# Copyright (c) 2025 FuzzingLabs
#
@@ -17,8 +22,3 @@ providing intelligent AI agent capabilities for security analysis.
__version__ = "0.6.0"
from .agent import FuzzForgeAgent
from .config_manager import ConfigManager
__all__ = ['FuzzForgeAgent', 'ConfigManager']

View File

@@ -0,0 +1,288 @@
"""
A2A Wrapper Module for FuzzForge
Programmatic interface to send tasks to A2A agents with custom model/prompt/context
"""
# Copyright (c) 2025 FuzzingLabs
#
# Licensed under the Business Source License 1.1 (BSL). See the LICENSE file
# at the root of this repository for details.
#
# After the Change Date (four years from publication), this version of the
# Licensed Work will be made available under the Apache License, Version 2.0.
# See the LICENSE-APACHE file or http://www.apache.org/licenses/LICENSE-2.0
#
# Additional attribution and requirements are provided in the NOTICE file.
from __future__ import annotations
from typing import Optional, Dict, Any
from uuid import uuid4
import httpx
from a2a.client import A2AClient
from a2a.client.errors import A2AClientHTTPError
from a2a.types import (
JSONRPCErrorResponse,
Message,
MessageSendConfiguration,
MessageSendParams,
Part,
Role,
SendMessageRequest,
SendStreamingMessageRequest,
Task,
TaskArtifactUpdateEvent,
TaskStatusUpdateEvent,
TextPart,
)
class A2ATaskResult:
"""Result from an A2A agent task"""
def __init__(self, text: str, context_id: str, raw_response: Any = None):
self.text = text
self.context_id = context_id
self.raw_response = raw_response
def __str__(self) -> str:
return self.text
def __repr__(self) -> str:
return f"A2ATaskResult(text={self.text[:50]}..., context_id={self.context_id})"
def _build_control_message(command: str, payload: Optional[str] = None) -> str:
"""Build a control message for hot-swapping agent configuration"""
if payload is None or payload == "":
return f"[HOTSWAP:{command}]"
return f"[HOTSWAP:{command}:{payload}]"
def _extract_text(
result: Message | Task | TaskStatusUpdateEvent | TaskArtifactUpdateEvent,
) -> list[str]:
"""Extract text content from A2A response objects"""
texts: list[str] = []
if isinstance(result, Message):
if result.role is Role.agent:
for part in result.parts:
root_part = part.root
text = getattr(root_part, "text", None)
if text:
texts.append(text)
elif isinstance(result, Task) and result.history:
for msg in result.history:
if msg.role is Role.agent:
for part in msg.parts:
root_part = part.root
text = getattr(root_part, "text", None)
if text:
texts.append(text)
elif isinstance(result, TaskStatusUpdateEvent):
message = result.status.message
if message:
texts.extend(_extract_text(message))
elif isinstance(result, TaskArtifactUpdateEvent):
artifact = result.artifact
if artifact and artifact.parts:
for part in artifact.parts:
root_part = part.root
text = getattr(root_part, "text", None)
if text:
texts.append(text)
return texts
async def _send_message(
client: A2AClient,
message: str,
context_id: str,
) -> str:
"""Send a message to the A2A agent and collect the response"""
params = MessageSendParams(
configuration=MessageSendConfiguration(blocking=True),
message=Message(
context_id=context_id,
message_id=str(uuid4()),
role=Role.user,
parts=[Part(root=TextPart(text=message))],
),
)
stream_request = SendStreamingMessageRequest(id=str(uuid4()), params=params)
buffer: list[str] = []
try:
async for response in client.send_message_streaming(stream_request):
root = response.root
if isinstance(root, JSONRPCErrorResponse):
raise RuntimeError(f"A2A error: {root.error}")
payload = root.result
buffer.extend(_extract_text(payload))
except A2AClientHTTPError as exc:
if "text/event-stream" not in str(exc):
raise
# Fallback to non-streaming
send_request = SendMessageRequest(id=str(uuid4()), params=params)
response = await client.send_message(send_request)
root = response.root
if isinstance(root, JSONRPCErrorResponse):
raise RuntimeError(f"A2A error: {root.error}")
payload = root.result
buffer.extend(_extract_text(payload))
if buffer:
buffer = list(dict.fromkeys(buffer)) # Remove duplicates
return "\n".join(buffer).strip()
async def send_agent_task(
url: str,
message: str,
*,
model: Optional[str] = None,
provider: Optional[str] = None,
prompt: Optional[str] = None,
context: Optional[str] = None,
timeout: float = 120.0,
) -> A2ATaskResult:
"""
Send a task to an A2A agent with optional model/prompt configuration.
Args:
url: A2A endpoint URL (e.g., "http://127.0.0.1:8000/a2a/litellm_agent")
message: The task message to send to the agent
model: Optional model name (e.g., "gpt-4o", "gemini-2.0-flash")
provider: Optional provider name (e.g., "openai", "gemini")
prompt: Optional system prompt to set before sending the message
context: Optional context/session ID (generated if not provided)
timeout: Request timeout in seconds (default: 120)
Returns:
A2ATaskResult with the agent's response text and context ID
Example:
>>> result = await send_agent_task(
... url="http://127.0.0.1:8000/a2a/litellm_agent",
... model="gpt-4o",
... provider="openai",
... prompt="You are concise.",
... message="Give me a fuzzing harness.",
... context="fuzzing",
... timeout=120
... )
>>> print(result.text)
"""
timeout_config = httpx.Timeout(timeout)
context_id = context or str(uuid4())
async with httpx.AsyncClient(timeout=timeout_config) as http_client:
client = A2AClient(url=url, httpx_client=http_client)
# Set model if provided
if model:
model_spec = f"{provider}/{model}" if provider else model
control_msg = _build_control_message("MODEL", model_spec)
await _send_message(client, control_msg, context_id)
# Set prompt if provided
if prompt is not None:
control_msg = _build_control_message("PROMPT", prompt)
await _send_message(client, control_msg, context_id)
# Send the actual task message
response_text = await _send_message(client, message, context_id)
return A2ATaskResult(
text=response_text,
context_id=context_id,
)
async def get_agent_config(
url: str,
context: Optional[str] = None,
timeout: float = 60.0,
) -> str:
"""
Get the current configuration of an A2A agent.
Args:
url: A2A endpoint URL
context: Optional context/session ID
timeout: Request timeout in seconds
Returns:
Configuration string from the agent
"""
timeout_config = httpx.Timeout(timeout)
context_id = context or str(uuid4())
async with httpx.AsyncClient(timeout=timeout_config) as http_client:
client = A2AClient(url=url, httpx_client=http_client)
control_msg = _build_control_message("GET_CONFIG")
config_text = await _send_message(client, control_msg, context_id)
return config_text
async def hot_swap_model(
url: str,
model: str,
provider: Optional[str] = None,
context: Optional[str] = None,
timeout: float = 60.0,
) -> str:
"""
Hot-swap the model of an A2A agent without sending a task.
Args:
url: A2A endpoint URL
model: Model name to switch to
provider: Optional provider name
context: Optional context/session ID
timeout: Request timeout in seconds
Returns:
Response from the agent
"""
timeout_config = httpx.Timeout(timeout)
context_id = context or str(uuid4())
async with httpx.AsyncClient(timeout=timeout_config) as http_client:
client = A2AClient(url=url, httpx_client=http_client)
model_spec = f"{provider}/{model}" if provider else model
control_msg = _build_control_message("MODEL", model_spec)
response = await _send_message(client, control_msg, context_id)
return response
async def hot_swap_prompt(
url: str,
prompt: str,
context: Optional[str] = None,
timeout: float = 60.0,
) -> str:
"""
Hot-swap the system prompt of an A2A agent.
Args:
url: A2A endpoint URL
prompt: System prompt to set
context: Optional context/session ID
timeout: Request timeout in seconds
Returns:
Response from the agent
"""
timeout_config = httpx.Timeout(timeout)
context_id = context or str(uuid4())
async with httpx.AsyncClient(timeout=timeout_config) as http_client:
client = A2AClient(url=url, httpx_client=http_client)
control_msg = _build_control_message("PROMPT", prompt)
response = await _send_message(client, control_msg, context_id)
return response

View File

@@ -60,7 +60,7 @@ class FuzzForgeAgent:
debug=os.getenv('FUZZFORGE_DEBUG', '0') == '1',
memory_service=self.memory_service,
session_persistence=os.getenv('SESSION_PERSISTENCE', 'inmemory'),
fuzzforge_mcp_url=os.getenv('FUZZFORGE_MCP_URL'),
fuzzforge_mcp_url=None, # Disabled
)
# Create Hybrid Memory Manager (ADK + Cognee direct integration)

View File

@@ -172,7 +172,6 @@ def get_fuzzforge_agent_card(url: str = "http://localhost:10100") -> AgentCard:
orchestration_skill,
memory_skill,
conversation_skill,
workflow_automation_skill,
agent_management_skill
],
capabilities=fuzzforge_capabilities,

View File

@@ -21,10 +21,10 @@ except ImportError: # pragma: no cover - used when CLI not available
raise ImportError(
"ProjectConfigManager is unavailable. Install the FuzzForge CLI "
"package or supply a compatible configuration object."
) from exc
)
def __getattr__(name): # pragma: no cover - defensive
raise ImportError("ProjectConfigManager unavailable") from exc
raise ImportError("ProjectConfigManager unavailable")
ProjectConfigManager = _ProjectConfigManager

View File

@@ -403,12 +403,12 @@ class ProjectConfigManager:
if max_tokens:
os.environ["LLM_MAX_TOKENS"] = str(max_tokens)
# Provide a default MCP endpoint for local FuzzForge backend access when unset
if not os.getenv("FUZZFORGE_MCP_URL"):
os.environ["FUZZFORGE_MCP_URL"] = os.getenv(
"FUZZFORGE_DEFAULT_MCP_URL",
"http://localhost:8010/mcp",
)
# Disabled - FuzzForge MCP backend connection
# if not os.getenv("FUZZFORGE_MCP_URL"):
# os.environ["FUZZFORGE_MCP_URL"] = os.getenv(
# "FUZZFORGE_DEFAULT_MCP_URL",
# "http://localhost:8010/mcp",
# )
def refresh(self) -> None:
"""Reload configuration from disk."""

View File

@@ -448,6 +448,27 @@ services:
timeout: 10s
retries: 3
# ============================================================================
# Task Agent - A2A LiteLLM Agent
# ============================================================================
task-agent:
build:
context: ./ai/agents/task_agent
dockerfile: Dockerfile
container_name: fuzzforge-task-agent
ports:
- "10900:8000"
env_file:
- ./volumes/env/.env
environment:
- PORT=8000
- PYTHONUNBUFFERED=1
volumes:
- ./volumes/env:/app/config:ro
networks:
- fuzzforge-network
restart: unless-stopped
# ============================================================================
# Vertical Worker: OSS-Fuzz Campaigns
# ============================================================================

View File

@@ -0,0 +1,30 @@
#!/usr/bin/env python3
"""
Simple example of using the A2A wrapper
Run from project root: python examples/test_a2a_simple.py
"""
import asyncio
async def main():
# Clean import!
from fuzzforge_ai.a2a_wrapper import send_agent_task
print("Sending task to agent at http://127.0.0.1:10900...")
result = await send_agent_task(
url="http://127.0.0.1:10900/a2a/litellm_agent",
model="gpt-4o-mini",
provider="openai",
prompt="You are concise.",
message="Give me a simple Python function that adds two numbers.",
context="test_session",
timeout=120
)
print(f"\nContext ID: {result.context_id}")
print(f"\nResponse:\n{result.text}")
if __name__ == "__main__":
asyncio.run(main())

151
test_a2a_wrapper.py Executable file
View File

@@ -0,0 +1,151 @@
#!/usr/bin/env python3
"""
Test script for A2A wrapper module
Sends tasks to the task-agent to verify functionality
"""
import asyncio
import sys
from pathlib import Path
# Add ai module to path
ai_src = Path(__file__).parent / "ai" / "src"
sys.path.insert(0, str(ai_src))
from fuzzforge_ai.a2a_wrapper import send_agent_task, get_agent_config
async def test_basic_task():
"""Test sending a basic task to the agent"""
print("=" * 80)
print("Test 1: Basic task without model specification")
print("=" * 80)
result = await send_agent_task(
url="http://127.0.0.1:10900/a2a/litellm_agent",
message="What is 2+2? Answer in one sentence.",
timeout=30
)
print(f"Context ID: {result.context_id}")
print(f"Response:\n{result.text}")
print()
return result.context_id
async def test_with_model_and_prompt():
"""Test sending a task with custom model and prompt"""
print("=" * 80)
print("Test 2: Task with model and prompt specification")
print("=" * 80)
result = await send_agent_task(
url="http://127.0.0.1:10900/a2a/litellm_agent",
model="gpt-4o-mini",
provider="openai",
prompt="You are a concise Python expert. Answer in 2 sentences max.",
message="Write a simple Python function that checks if a number is prime.",
context="python_test",
timeout=60
)
print(f"Context ID: {result.context_id}")
print(f"Response:\n{result.text}")
print()
return result.context_id
async def test_fuzzing_task():
"""Test a fuzzing-related task"""
print("=" * 80)
print("Test 3: Fuzzing harness generation task")
print("=" * 80)
result = await send_agent_task(
url="http://127.0.0.1:10900/a2a/litellm_agent",
model="gpt-4o-mini",
provider="openai",
prompt="You are a security testing expert. Provide practical, working code.",
message="Generate a simple fuzzing harness for a C function that parses JSON strings. Include only the essential code.",
context="fuzzing_session",
timeout=90
)
print(f"Context ID: {result.context_id}")
print(f"Response:\n{result.text}")
print()
async def test_get_config():
"""Test getting agent configuration"""
print("=" * 80)
print("Test 4: Get agent configuration")
print("=" * 80)
config = await get_agent_config(
url="http://127.0.0.1:10900/a2a/litellm_agent",
timeout=30
)
print(f"Agent Config:\n{config}")
print()
async def test_multi_turn():
"""Test multi-turn conversation with same context"""
print("=" * 80)
print("Test 5: Multi-turn conversation")
print("=" * 80)
# First message
result1 = await send_agent_task(
url="http://127.0.0.1:10900/a2a/litellm_agent",
message="What is the capital of France?",
context="geography_quiz",
timeout=30
)
print(f"Q1: What is the capital of France?")
print(f"A1: {result1.text}")
print()
# Follow-up in same context
result2 = await send_agent_task(
url="http://127.0.0.1:10900/a2a/litellm_agent",
message="What is the population of that city?",
context="geography_quiz", # Same context
timeout=30
)
print(f"Q2: What is the population of that city?")
print(f"A2: {result2.text}")
print()
async def main():
"""Run all tests"""
print("\n" + "=" * 80)
print("FuzzForge A2A Wrapper Test Suite")
print("=" * 80 + "\n")
try:
# Run tests
await test_basic_task()
await test_with_model_and_prompt()
await test_fuzzing_task()
await test_get_config()
await test_multi_turn()
print("=" * 80)
print("✅ All tests completed successfully!")
print("=" * 80)
except Exception as e:
print(f"\n❌ Test failed with error: {e}")
import traceback
traceback.print_exc()
return 1
return 0
if __name__ == "__main__":
exit_code = asyncio.run(main())
sys.exit(exit_code)

17
volumes/env/.env.example vendored Normal file
View File

@@ -0,0 +1,17 @@
# FuzzForge Agent Configuration
# Copy this to .env and configure your API keys
# LiteLLM Model Configuration
LITELLM_MODEL=gemini/gemini-2.0-flash-001
# LITELLM_PROVIDER=gemini
# API Keys (uncomment and configure as needed)
# GOOGLE_API_KEY=
# OPENAI_API_KEY=
# ANTHROPIC_API_KEY=
# OPENROUTER_API_KEY=
# MISTRAL_API_KEY=
# Agent Configuration
# DEFAULT_TIMEOUT=120
# DEFAULT_CONTEXT_ID=default

22
volumes/env/README.md vendored Normal file
View File

@@ -0,0 +1,22 @@
# FuzzForge Environment Configuration
This directory contains environment files that are mounted into Docker containers.
## Files
- `.env.example` - Template configuration file
- `.env` - Your actual configuration (create by copying .env.example)
## Usage
1. Copy the example file:
```bash
cp .env.example .env
```
2. Edit `.env` and add your API keys
3. Restart Docker containers to apply changes:
```bash
docker-compose restart
```