diff --git a/ai/src/fuzzforge_ai/agent_executor.py b/ai/src/fuzzforge_ai/agent_executor.py index 9eb0d8f..ff324ce 100644 --- a/ai/src/fuzzforge_ai/agent_executor.py +++ b/ai/src/fuzzforge_ai/agent_executor.py @@ -16,7 +16,7 @@ import base64 import time import uuid import json -from typing import Dict, Any, List, Union +from typing import Dict, Any, List, Union, Optional from datetime import datetime import os import warnings @@ -52,7 +52,12 @@ try: except ImportError: AGENTOPS_AVAILABLE = False -# MCP functionality removed - keeping direct Cognee integration only +try: + from google.adk.tools.mcp_tool.mcp_toolset import McpToolset + from google.adk.tools.mcp_tool.mcp_session_manager import StreamableHTTPConnectionParams + MCP_TOOLSET_AVAILABLE = True +except ImportError: # pragma: no cover - MCP integration optional + MCP_TOOLSET_AVAILABLE = False from google.genai.types import Part from a2a.types import ( @@ -97,6 +102,7 @@ class FuzzForgeExecutor: self.session_metadata: Dict[str, Dict[str, Any]] = {} self._artifact_cache_dir = Path(os.getenv('FUZZFORGE_ARTIFACT_DIR', Path.cwd() / '.fuzzforge' / 'artifacts')) self._knowledge_integration = None + self._mcp_toolset: Optional[McpToolset] = None # Initialize Cognee service if available self.cognee_service = None @@ -563,7 +569,7 @@ class FuzzForgeExecutor: try: from .cognee_integration import create_cognee_integration integration = create_cognee_integration() - result = await integration.cognify_text(text) + result = await integration.ingest_text_to_dataset(text) if "error" in result: return f"Error cognifying information: {result['error']}" @@ -580,7 +586,39 @@ class FuzzForgeExecutor: FunctionTool(cognify_information), FunctionTool(self.query_project_knowledge_api) ]) - + + if MCP_TOOLSET_AVAILABLE and self.cognee_url: + try: + params = StreamableHTTPConnectionParams( + url=self.cognee_url, + timeout=float(os.getenv("COGNEE_MCP_TIMEOUT", "5")), + sse_read_timeout=float(os.getenv("COGNEE_MCP_SSE_TIMEOUT", str(60 * 5))), + terminate_on_close=False, + ) + tool_filter = [ + "list_datasets", + "search", + "search_chunks", + "search_insights", + "cognify", + "cognify_status", + "ingest_file", + "memify", + ] + self._mcp_toolset = McpToolset( + connection_params=params, + tool_filter=tool_filter, + ) + tools.append(self._mcp_toolset) + if self.debug: + print(f"[DEBUG] Connected MCP toolset to {self.cognee_url}") + except Exception as exc: + self._mcp_toolset = None + if self.debug: + print(f"[DEBUG] Failed to initialise Cognee MCP toolset: {exc}") + else: + self._mcp_toolset = None + # Add project-local filesystem tools async def list_project_files(path: str, pattern: str) -> str: """List files in the current project directory with optional pattern @@ -1718,6 +1756,15 @@ Be concise and intelligent in your responses.""" except: pass + if self._mcp_toolset: + try: + await self._mcp_toolset.close() + except Exception as exc: + if self.debug: + print(f"[DEBUG] MCP toolset cleanup failed: {exc}") + finally: + self._mcp_toolset = None + # Cancel background monitors for task in list(self._background_tasks): task.cancel()