From 85420c2328ad53da4e08565b3ccf6b781b56c740 Mon Sep 17 00:00:00 2001 From: AFredefon Date: Mon, 13 Oct 2025 02:00:50 +0200 Subject: [PATCH] Fuzzforge agent store code ingested in artifact and transfer it to external agents --- ai/src/fuzzforge_ai/__main__.py | 2 +- ai/src/fuzzforge_ai/agent.py | 93 ++++++++++++++++++++- ai/src/fuzzforge_ai/agent_executor.py | 116 +++++++++++++++++++++++++- 3 files changed, 202 insertions(+), 9 deletions(-) diff --git a/ai/src/fuzzforge_ai/__main__.py b/ai/src/fuzzforge_ai/__main__.py index 9a3e73b..4d5d198 100644 --- a/ai/src/fuzzforge_ai/__main__.py +++ b/ai/src/fuzzforge_ai/__main__.py @@ -65,7 +65,7 @@ def create_a2a_app(): port = int(os.getenv('FUZZFORGE_PORT', 10100)) # Get the FuzzForge agent - fuzzforge = get_fuzzforge_agent() + fuzzforge = get_fuzzforge_agent(auto_start_server=False) # Print ASCII banner print("\033[95m") # Purple color diff --git a/ai/src/fuzzforge_ai/agent.py b/ai/src/fuzzforge_ai/agent.py index 0cedc7a..6929df5 100644 --- a/ai/src/fuzzforge_ai/agent.py +++ b/ai/src/fuzzforge_ai/agent.py @@ -15,8 +15,12 @@ The core agent that combines all components import os +import threading +import time +import socket +import asyncio from pathlib import Path -from typing import Dict, Any, List +from typing import Dict, Any, List, Optional from google.adk import Agent from google.adk.models.lite_llm import LiteLlm from .agent_card import get_fuzzforge_agent_card @@ -43,11 +47,19 @@ class FuzzForgeAgent: model: str = None, cognee_url: str = None, port: int = 10100, + auto_start_server: Optional[bool] = None, ): """Initialize FuzzForge agent with configuration""" self.model = model or os.getenv('LITELLM_MODEL', 'gpt-4o-mini') self.cognee_url = cognee_url or os.getenv('COGNEE_MCP_URL') - self.port = port + self.port = int(os.getenv('FUZZFORGE_PORT', port)) + self._auto_start_server = ( + auto_start_server + if auto_start_server is not None + else os.getenv('FUZZFORGE_AUTO_A2A_SERVER', '1') not in {'0', 'false', 'False'} + ) + self._uvicorn_server = None + self._a2a_server_thread: Optional[threading.Thread] = None # Initialize ADK Memory Service for conversational memory memory_type = os.getenv('MEMORY_SERVICE', 'inmemory') @@ -75,6 +87,9 @@ class FuzzForgeAgent: # Create the ADK agent (for A2A server mode) self.adk_agent = self._create_adk_agent() + + if self._auto_start_server: + self._ensure_a2a_server_running() def _create_adk_agent(self) -> Agent: """Create the ADK agent for A2A server mode""" @@ -119,15 +134,85 @@ When responding to requests: async def cleanup(self): """Clean up resources""" + await self._stop_a2a_server() await self.executor.cleanup() + def _ensure_a2a_server_running(self): + """Start the A2A server in the background if it's not already running.""" + if self._a2a_server_thread and self._a2a_server_thread.is_alive(): + return + + try: + from uvicorn import Config, Server + from .a2a_server import create_a2a_app as create_custom_a2a_app + except ImportError as exc: + if os.getenv('FUZZFORGE_DEBUG', '0') == '1': + print(f"[DEBUG] Unable to start A2A server automatically: {exc}") + return + + app = create_custom_a2a_app( + self.adk_agent, + port=self.port, + executor=self.executor, + ) + + log_level = os.getenv('FUZZFORGE_UVICORN_LOG_LEVEL', 'error') + config = Config(app=app, host='127.0.0.1', port=self.port, log_level=log_level, loop='asyncio') + server = Server(config=config) + self._uvicorn_server = server + + def _run_server(): + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + async def _serve(): + await server.serve() + + try: + loop.run_until_complete(_serve()) + finally: + loop.close() + + thread = threading.Thread(target=_run_server, name='FuzzForgeA2AServer', daemon=True) + thread.start() + self._a2a_server_thread = thread + + # Give the server a moment to bind to the port for downstream agents + for _ in range(50): + if server.should_exit: + break + try: + with socket.create_connection(('127.0.0.1', self.port), timeout=0.1): + if os.getenv('FUZZFORGE_DEBUG', '0') == '1': + print(f"[DEBUG] Auto-started A2A server on http://127.0.0.1:{self.port}") + break + except OSError: + time.sleep(0.1) + + async def _stop_a2a_server(self): + """Shut down the background A2A server if we started one.""" + server = self._uvicorn_server + if server is None: + return + + server.should_exit = True + if self._a2a_server_thread and self._a2a_server_thread.is_alive(): + # Allow server loop to exit gracefully without blocking event loop + try: + await asyncio.wait_for(asyncio.to_thread(self._a2a_server_thread.join, 5), timeout=6) + except (asyncio.TimeoutError, RuntimeError): + pass + + self._uvicorn_server = None + self._a2a_server_thread = None + # Create a singleton instance for import _instance = None -def get_fuzzforge_agent() -> FuzzForgeAgent: +def get_fuzzforge_agent(auto_start_server: Optional[bool] = None) -> FuzzForgeAgent: """Get the singleton FuzzForge agent instance""" global _instance if _instance is None: - _instance = FuzzForgeAgent() + _instance = FuzzForgeAgent(auto_start_server=auto_start_server) return _instance diff --git a/ai/src/fuzzforge_ai/agent_executor.py b/ai/src/fuzzforge_ai/agent_executor.py index 6c0be70..b22b899 100644 --- a/ai/src/fuzzforge_ai/agent_executor.py +++ b/ai/src/fuzzforge_ai/agent_executor.py @@ -16,7 +16,7 @@ import base64 import time import uuid import json -from typing import Dict, Any, List, Union +from typing import Dict, Any, List, Union, Optional from datetime import datetime import os import warnings @@ -93,7 +93,8 @@ class FuzzForgeExecutor: self._background_tasks: set[asyncio.Task] = set() self.pending_runs: Dict[str, Dict[str, Any]] = {} self.session_metadata: Dict[str, Dict[str, Any]] = {} - self._artifact_cache_dir = Path(os.getenv('FUZZFORGE_ARTIFACT_DIR', Path.cwd() / '.fuzzforge' / 'artifacts')) + self._project_root = self._detect_project_root() + self._artifact_cache_dir = self._resolve_artifact_cache_dir() self._knowledge_integration = None # Initialize Cognee service if available @@ -194,6 +195,38 @@ class FuzzForgeExecutor: if self.debug: print(f"[DEBUG] Auto-registration error for {url}: {e}") + def _detect_project_root(self) -> Optional[Path]: + """Locate the active FuzzForge project root directory if available.""" + env_root = os.getenv('FUZZFORGE_PROJECT_DIR') + if env_root: + candidate = Path(env_root).expanduser().resolve() + if candidate.joinpath('.fuzzforge').is_dir(): + return candidate + + try: + config = ProjectConfigManager() + return config.config_path.parent.resolve() + except Exception: + pass + + current = Path.cwd().resolve() + for path in (current,) + tuple(current.parents): + if path.joinpath('.fuzzforge').is_dir(): + return path + return None + + def _resolve_artifact_cache_dir(self) -> Path: + """Determine the artifact cache directory, prioritizing project context.""" + env_dir = os.getenv('FUZZFORGE_ARTIFACT_DIR') + if env_dir: + return Path(env_dir).expanduser().resolve() + + project_root = self._project_root + if project_root: + return (project_root / '.fuzzforge' / 'artifacts').resolve() + + return (Path.cwd() / '.fuzzforge' / 'artifacts').resolve() + def _create_artifact_service(self): """Create artifact service based on configuration""" artifact_storage = os.getenv('ARTIFACT_STORAGE', 'inmemory') @@ -788,6 +821,39 @@ class FuzzForgeExecutor: tools.append(FunctionTool(send_file_to_agent)) + async def send_code_snippet_to_agent( + agent_name: str, + code: str, + filename: str = "", + note: str = "", + tool_context: ToolContext | None = None, + ) -> str: + """Create an artifact from raw code and send it to a registered agent.""" + if not agent_name: + return "agent_name is required" + if not code or not code.strip(): + return "code is required" + + session = None + context_id = None + if tool_context and getattr(tool_context, "invocation_context", None): + invocation = tool_context.invocation_context + session = invocation.session + context_id = self.session_lookup.get(getattr(session, 'id', None)) + + target_filename = filename or "snippet.rs" + snippet_note = note or "Please analyse the provided code snippet." + return await self.delegate_code_snippet_to_agent( + agent_name, + target_filename, + code, + note=snippet_note, + session=session, + context_id=context_id, + ) + + tools.append(FunctionTool(send_code_snippet_to_agent)) + if self.debug: print("[DEBUG] Added Cognee project integration tools") @@ -1886,11 +1952,14 @@ Be concise and intelligent in your responses.""" async def create_project_file_artifact_api(self, file_path: str) -> Dict[str, Any]: try: - config = ProjectConfigManager() + config = ProjectConfigManager(self._project_root) if self._project_root else ProjectConfigManager() if not config.is_initialized(): return {"error": "Project not initialized. Run 'fuzzforge init' first."} - project_root = config.config_path.parent.resolve() + project_root = self._project_root or config.config_path.parent.resolve() + if self._project_root is None: + self._project_root = project_root + self._artifact_cache_dir = self._resolve_artifact_cache_dir() requested_file = (project_root / file_path).resolve() try: @@ -2101,6 +2170,45 @@ Be concise and intelligent in your responses.""" await self._append_external_event(session, agent_name, response_text) return response_text + async def delegate_code_snippet_to_agent( + self, + agent_name: str, + filename: str, + code: str, + note: str = "", + session: Any = None, + context_id: str | None = None, + ) -> str: + try: + if not code or not code.strip(): + return "No code snippet provided for delegation." + + cache_dir = self._prepare_artifact_cache_dir() + artifact_id = uuid.uuid4().hex + + # Normalise filename and ensure extension + safe_filename = (filename or "snippet.rs").strip() + if not safe_filename: + safe_filename = "snippet.rs" + if "." not in safe_filename: + safe_filename = f"{safe_filename}.rs" + + snippet_dir = cache_dir / artifact_id + snippet_dir.mkdir(parents=True, exist_ok=True) + file_path = snippet_dir / safe_filename + file_path.write_text(code, encoding="utf-8") + + message_note = note or f"Please analyse the code snippet {safe_filename}." + return await self.delegate_file_to_agent( + agent_name, + str(file_path), + message_note, + session=session, + context_id=context_id, + ) + except Exception as exc: + return f"Failed to delegate code snippet: {exc}" + async def delegate_file_to_agent( self, agent_name: str,