Fuzzforge agent store code ingested in artifact and transfer it to external agents

This commit is contained in:
AFredefon
2025-10-13 02:00:50 +02:00
parent 4ad44332ee
commit 85420c2328
3 changed files with 202 additions and 9 deletions

View File

@@ -65,7 +65,7 @@ def create_a2a_app():
port = int(os.getenv('FUZZFORGE_PORT', 10100))
# Get the FuzzForge agent
fuzzforge = get_fuzzforge_agent()
fuzzforge = get_fuzzforge_agent(auto_start_server=False)
# Print ASCII banner
print("\033[95m") # Purple color

View File

@@ -15,8 +15,12 @@ The core agent that combines all components
import os
import threading
import time
import socket
import asyncio
from pathlib import Path
from typing import Dict, Any, List
from typing import Dict, Any, List, Optional
from google.adk import Agent
from google.adk.models.lite_llm import LiteLlm
from .agent_card import get_fuzzforge_agent_card
@@ -43,11 +47,19 @@ class FuzzForgeAgent:
model: str = None,
cognee_url: str = None,
port: int = 10100,
auto_start_server: Optional[bool] = None,
):
"""Initialize FuzzForge agent with configuration"""
self.model = model or os.getenv('LITELLM_MODEL', 'gpt-4o-mini')
self.cognee_url = cognee_url or os.getenv('COGNEE_MCP_URL')
self.port = port
self.port = int(os.getenv('FUZZFORGE_PORT', port))
self._auto_start_server = (
auto_start_server
if auto_start_server is not None
else os.getenv('FUZZFORGE_AUTO_A2A_SERVER', '1') not in {'0', 'false', 'False'}
)
self._uvicorn_server = None
self._a2a_server_thread: Optional[threading.Thread] = None
# Initialize ADK Memory Service for conversational memory
memory_type = os.getenv('MEMORY_SERVICE', 'inmemory')
@@ -75,6 +87,9 @@ class FuzzForgeAgent:
# Create the ADK agent (for A2A server mode)
self.adk_agent = self._create_adk_agent()
if self._auto_start_server:
self._ensure_a2a_server_running()
def _create_adk_agent(self) -> Agent:
"""Create the ADK agent for A2A server mode"""
@@ -119,15 +134,85 @@ When responding to requests:
async def cleanup(self):
"""Clean up resources"""
await self._stop_a2a_server()
await self.executor.cleanup()
def _ensure_a2a_server_running(self):
"""Start the A2A server in the background if it's not already running."""
if self._a2a_server_thread and self._a2a_server_thread.is_alive():
return
try:
from uvicorn import Config, Server
from .a2a_server import create_a2a_app as create_custom_a2a_app
except ImportError as exc:
if os.getenv('FUZZFORGE_DEBUG', '0') == '1':
print(f"[DEBUG] Unable to start A2A server automatically: {exc}")
return
app = create_custom_a2a_app(
self.adk_agent,
port=self.port,
executor=self.executor,
)
log_level = os.getenv('FUZZFORGE_UVICORN_LOG_LEVEL', 'error')
config = Config(app=app, host='127.0.0.1', port=self.port, log_level=log_level, loop='asyncio')
server = Server(config=config)
self._uvicorn_server = server
def _run_server():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
async def _serve():
await server.serve()
try:
loop.run_until_complete(_serve())
finally:
loop.close()
thread = threading.Thread(target=_run_server, name='FuzzForgeA2AServer', daemon=True)
thread.start()
self._a2a_server_thread = thread
# Give the server a moment to bind to the port for downstream agents
for _ in range(50):
if server.should_exit:
break
try:
with socket.create_connection(('127.0.0.1', self.port), timeout=0.1):
if os.getenv('FUZZFORGE_DEBUG', '0') == '1':
print(f"[DEBUG] Auto-started A2A server on http://127.0.0.1:{self.port}")
break
except OSError:
time.sleep(0.1)
async def _stop_a2a_server(self):
"""Shut down the background A2A server if we started one."""
server = self._uvicorn_server
if server is None:
return
server.should_exit = True
if self._a2a_server_thread and self._a2a_server_thread.is_alive():
# Allow server loop to exit gracefully without blocking event loop
try:
await asyncio.wait_for(asyncio.to_thread(self._a2a_server_thread.join, 5), timeout=6)
except (asyncio.TimeoutError, RuntimeError):
pass
self._uvicorn_server = None
self._a2a_server_thread = None
# Create a singleton instance for import
_instance = None
def get_fuzzforge_agent() -> FuzzForgeAgent:
def get_fuzzforge_agent(auto_start_server: Optional[bool] = None) -> FuzzForgeAgent:
"""Get the singleton FuzzForge agent instance"""
global _instance
if _instance is None:
_instance = FuzzForgeAgent()
_instance = FuzzForgeAgent(auto_start_server=auto_start_server)
return _instance

View File

@@ -16,7 +16,7 @@ import base64
import time
import uuid
import json
from typing import Dict, Any, List, Union
from typing import Dict, Any, List, Union, Optional
from datetime import datetime
import os
import warnings
@@ -93,7 +93,8 @@ class FuzzForgeExecutor:
self._background_tasks: set[asyncio.Task] = set()
self.pending_runs: Dict[str, Dict[str, Any]] = {}
self.session_metadata: Dict[str, Dict[str, Any]] = {}
self._artifact_cache_dir = Path(os.getenv('FUZZFORGE_ARTIFACT_DIR', Path.cwd() / '.fuzzforge' / 'artifacts'))
self._project_root = self._detect_project_root()
self._artifact_cache_dir = self._resolve_artifact_cache_dir()
self._knowledge_integration = None
# Initialize Cognee service if available
@@ -194,6 +195,38 @@ class FuzzForgeExecutor:
if self.debug:
print(f"[DEBUG] Auto-registration error for {url}: {e}")
def _detect_project_root(self) -> Optional[Path]:
"""Locate the active FuzzForge project root directory if available."""
env_root = os.getenv('FUZZFORGE_PROJECT_DIR')
if env_root:
candidate = Path(env_root).expanduser().resolve()
if candidate.joinpath('.fuzzforge').is_dir():
return candidate
try:
config = ProjectConfigManager()
return config.config_path.parent.resolve()
except Exception:
pass
current = Path.cwd().resolve()
for path in (current,) + tuple(current.parents):
if path.joinpath('.fuzzforge').is_dir():
return path
return None
def _resolve_artifact_cache_dir(self) -> Path:
"""Determine the artifact cache directory, prioritizing project context."""
env_dir = os.getenv('FUZZFORGE_ARTIFACT_DIR')
if env_dir:
return Path(env_dir).expanduser().resolve()
project_root = self._project_root
if project_root:
return (project_root / '.fuzzforge' / 'artifacts').resolve()
return (Path.cwd() / '.fuzzforge' / 'artifacts').resolve()
def _create_artifact_service(self):
"""Create artifact service based on configuration"""
artifact_storage = os.getenv('ARTIFACT_STORAGE', 'inmemory')
@@ -788,6 +821,39 @@ class FuzzForgeExecutor:
tools.append(FunctionTool(send_file_to_agent))
async def send_code_snippet_to_agent(
agent_name: str,
code: str,
filename: str = "",
note: str = "",
tool_context: ToolContext | None = None,
) -> str:
"""Create an artifact from raw code and send it to a registered agent."""
if not agent_name:
return "agent_name is required"
if not code or not code.strip():
return "code is required"
session = None
context_id = None
if tool_context and getattr(tool_context, "invocation_context", None):
invocation = tool_context.invocation_context
session = invocation.session
context_id = self.session_lookup.get(getattr(session, 'id', None))
target_filename = filename or "snippet.rs"
snippet_note = note or "Please analyse the provided code snippet."
return await self.delegate_code_snippet_to_agent(
agent_name,
target_filename,
code,
note=snippet_note,
session=session,
context_id=context_id,
)
tools.append(FunctionTool(send_code_snippet_to_agent))
if self.debug:
print("[DEBUG] Added Cognee project integration tools")
@@ -1886,11 +1952,14 @@ Be concise and intelligent in your responses."""
async def create_project_file_artifact_api(self, file_path: str) -> Dict[str, Any]:
try:
config = ProjectConfigManager()
config = ProjectConfigManager(self._project_root) if self._project_root else ProjectConfigManager()
if not config.is_initialized():
return {"error": "Project not initialized. Run 'fuzzforge init' first."}
project_root = config.config_path.parent.resolve()
project_root = self._project_root or config.config_path.parent.resolve()
if self._project_root is None:
self._project_root = project_root
self._artifact_cache_dir = self._resolve_artifact_cache_dir()
requested_file = (project_root / file_path).resolve()
try:
@@ -2101,6 +2170,45 @@ Be concise and intelligent in your responses."""
await self._append_external_event(session, agent_name, response_text)
return response_text
async def delegate_code_snippet_to_agent(
self,
agent_name: str,
filename: str,
code: str,
note: str = "",
session: Any = None,
context_id: str | None = None,
) -> str:
try:
if not code or not code.strip():
return "No code snippet provided for delegation."
cache_dir = self._prepare_artifact_cache_dir()
artifact_id = uuid.uuid4().hex
# Normalise filename and ensure extension
safe_filename = (filename or "snippet.rs").strip()
if not safe_filename:
safe_filename = "snippet.rs"
if "." not in safe_filename:
safe_filename = f"{safe_filename}.rs"
snippet_dir = cache_dir / artifact_id
snippet_dir.mkdir(parents=True, exist_ok=True)
file_path = snippet_dir / safe_filename
file_path.write_text(code, encoding="utf-8")
message_note = note or f"Please analyse the code snippet {safe_filename}."
return await self.delegate_file_to_agent(
agent_name,
str(file_path),
message_note,
session=session,
context_id=context_id,
)
except Exception as exc:
return f"Failed to delegate code snippet: {exc}"
async def delegate_file_to_agent(
self,
agent_name: str,