diff --git a/ai/src/fuzzforge_ai/agent.py b/ai/src/fuzzforge_ai/agent.py index 0cedc7a..f535b2b 100644 --- a/ai/src/fuzzforge_ai/agent.py +++ b/ai/src/fuzzforge_ai/agent.py @@ -16,12 +16,15 @@ The core agent that combines all components import os from pathlib import Path -from typing import Dict, Any, List +from typing import Dict, Any, List, Optional from google.adk import Agent from google.adk.models.lite_llm import LiteLlm from .agent_card import get_fuzzforge_agent_card from .agent_executor import FuzzForgeExecutor from .memory_service import FuzzForgeMemoryService, HybridMemoryManager +from .mcp.cognee_mcp_client import CogneeMCPTools +from .config_bridge import ProjectConfigManager +from .utils.project_env import load_project_env # Load environment variables from the AI module's .env file try: @@ -45,29 +48,62 @@ class FuzzForgeAgent: port: int = 10100, ): """Initialize FuzzForge agent with configuration""" + load_project_env() + self.model = model or os.getenv('LITELLM_MODEL', 'gpt-4o-mini') - self.cognee_url = cognee_url or os.getenv('COGNEE_MCP_URL') + self.cognee_url = cognee_url or os.getenv('COGNEE_MCP_URL') or 'http://localhost:18001/mcp' + os.environ.setdefault('COGNEE_MCP_URL', self.cognee_url) self.port = port + project_context: Dict[str, Any] = {} + self.project_config: Optional[ProjectConfigManager] = None + try: + self.project_config = ProjectConfigManager() + self.project_config.setup_cognee_environment() + project_context = self.project_config.get_project_context() + except Exception: + self.project_config = None + # Initialize ADK Memory Service for conversational memory memory_type = os.getenv('MEMORY_SERVICE', 'inmemory') self.memory_service = FuzzForgeMemoryService(memory_type=memory_type) - + + default_dataset = ( + os.getenv('COGNEE_DEFAULT_DATASET') + or os.getenv('COGNEE_DATASET_NAME') + ) + if not default_dataset and project_context.get('project_name'): + default_dataset = f"{project_context['project_name']}_codebase" + + cognee_email 
= os.getenv('COGNEE_SERVICE_USER_EMAIL') or os.getenv('DEFAULT_USER_EMAIL') + cognee_password = os.getenv('COGNEE_SERVICE_USER_PASSWORD') or os.getenv('DEFAULT_USER_PASSWORD') + service_url = os.getenv('COGNEE_SERVICE_URL') or os.getenv('COGNEE_API_URL') + + self.cognee_tools: Optional[CogneeMCPTools] = None + if self.cognee_url: + self.cognee_tools = CogneeMCPTools( + base_url=self.cognee_url, + default_dataset=default_dataset, + default_email=cognee_email, + default_password=cognee_password, + default_service_url=service_url, + ) + # Create the executor (the brain) with memory and session services self.executor = FuzzForgeExecutor( model=self.model, cognee_url=self.cognee_url, debug=os.getenv('FUZZFORGE_DEBUG', '0') == '1', memory_service=self.memory_service, + memory_manager=self.memory_manager, session_persistence=os.getenv('SESSION_PERSISTENCE', 'inmemory'), fuzzforge_mcp_url=os.getenv('FUZZFORGE_MCP_URL'), ) # Create Hybrid Memory Manager (ADK + Cognee direct integration) - # MCP tools removed - using direct Cognee integration only self.memory_manager = HybridMemoryManager( memory_service=self.memory_service, - cognee_tools=None # No MCP tools, direct integration used instead + cognee_tools=self.cognee_tools, ) # Get the agent card (the identity) diff --git a/ai/src/fuzzforge_ai/agent_executor.py b/ai/src/fuzzforge_ai/agent_executor.py index 6c0be70..9eb0d8f 100644 --- a/ai/src/fuzzforge_ai/agent_executor.py +++ b/ai/src/fuzzforge_ai/agent_executor.py @@ -80,6 +80,7 @@ class FuzzForgeExecutor: cognee_url: str = None, debug: bool = False, memory_service=None, + memory_manager=None, session_persistence: str = None, fuzzforge_mcp_url: str = None, ): @@ -88,6 +89,7 @@ class FuzzForgeExecutor: self.cognee_url = cognee_url or os.getenv('COGNEE_MCP_URL') self.debug = debug self.memory_service = memory_service # ADK memory service + self.memory_manager = memory_manager self.session_persistence = session_persistence or os.getenv('SESSION_PERSISTENCE', 'inmemory') 
self.fuzzforge_mcp_url = fuzzforge_mcp_url or os.getenv('FUZZFORGE_MCP_URL') self._background_tasks: set[asyncio.Task] = set() @@ -308,13 +310,13 @@ class FuzzForgeExecutor: return self._knowledge_integration try: - from .cognee_integration import CogneeProjectIntegration + from .cognee_integration import create_cognee_integration - integration = CogneeProjectIntegration() + integration = create_cognee_integration() initialised = await integration.initialize() if not initialised: if self.debug: - print("[DEBUG] CogneeProjectIntegration initialization failed") + print("[DEBUG] Cognee integration initialization failed") return None self._knowledge_integration = integration @@ -345,6 +347,11 @@ class FuzzForgeExecutor: async def cognee_search(query: str) -> str: """Search Cognee knowledge graph memory""" try: + if self.memory_manager and getattr(self.memory_manager, "cognee_tools", None): + results = await self.memory_manager.search_knowledge_graph(query) + if isinstance(results, (dict, list)): + return json.dumps(results, indent=2) + return str(results) if self.cognee_service: results = await self.cognee_service.search_memory(query) return f"Cognee search results: {results}" @@ -554,8 +561,8 @@ class FuzzForgeExecutor: async def cognify_information(text: str) -> str: """Transform information into knowledge graph format""" try: - from .cognee_integration import CogneeProjectIntegration - integration = CogneeProjectIntegration() + from .cognee_integration import create_cognee_integration + integration = create_cognee_integration() result = await integration.cognify_text(text) if "error" in result: diff --git a/ai/src/fuzzforge_ai/cognee_integration.py b/ai/src/fuzzforge_ai/cognee_integration.py index 2f134ce..62621ff 100644 --- a/ai/src/fuzzforge_ai/cognee_integration.py +++ b/ai/src/fuzzforge_ai/cognee_integration.py @@ -21,6 +21,10 @@ import json from typing import Dict, List, Any, Optional, Union from pathlib import Path +from .config_bridge import 
class CogneeServiceIntegration:
    """Cognee integration backed by the MCP service.

    Mirrors the interface of ``CogneeProjectIntegration`` (``initialize`` /
    ``search_knowledge_graph`` / ``list_knowledge_data`` /
    ``ingest_text_to_dataset``) but routes calls through the Cognee MCP
    mirror instead of the in-process library.
    """

    def __init__(self, project_dir: Optional[str] = None):
        # Construction never touches I/O; everything heavy is deferred to
        # initialize(). Forward-ref annotations keep this importable even
        # when the optional project modules are unavailable.
        self.project_dir = Path(project_dir) if project_dir else Path.cwd()
        self.project_manager: Optional["ProjectConfigManager"] = None
        self.project_context: Dict[str, Any] = {}
        self.cognee_tools: Optional["CogneeMCPTools"] = None
        self.dataset: Optional[str] = None
        self._initialized = False

    async def initialize(self) -> bool:
        """Load project env/config and build the MCP tool client.

        Returns True on success; False when the project configuration
        cannot be loaded (deliberate best-effort — callers degrade
        gracefully rather than crash).
        """
        if self._initialized:
            return True

        try:
            load_project_env(self.project_dir)
            self.project_manager = ProjectConfigManager(self.project_dir)
            self.project_manager.setup_cognee_environment()
            self.project_context = self.project_manager.get_project_context()
        except Exception:
            # Without project config we cannot derive tenant credentials.
            return False

        # Dataset precedence: explicit env vars, then "<project>_codebase".
        dataset = os.getenv("COGNEE_DEFAULT_DATASET") or os.getenv("COGNEE_DATASET_NAME")
        if not dataset:
            project_name = self.project_context.get("project_name") or "fuzzforge"
            dataset = f"{project_name}_codebase"
        self.dataset = dataset

        mcp_url = os.getenv("COGNEE_MCP_URL") or "http://localhost:18001/mcp"
        service_url = os.getenv("COGNEE_SERVICE_URL") or os.getenv("COGNEE_API_URL")
        user_email = os.getenv("COGNEE_SERVICE_USER_EMAIL") or os.getenv("DEFAULT_USER_EMAIL")
        user_password = os.getenv("COGNEE_SERVICE_USER_PASSWORD") or os.getenv("DEFAULT_USER_PASSWORD")

        self.cognee_tools = CogneeMCPTools(
            base_url=mcp_url,
            default_dataset=self.dataset,
            default_email=user_email,
            default_password=user_password,
            default_service_url=service_url,
        )

        # Publish the resolved URL without clobbering an explicit setting.
        os.environ.setdefault("COGNEE_MCP_URL", mcp_url)
        self._initialized = True
        return True

    async def search_knowledge_graph(
        self,
        query: str,
        search_type: str = "GRAPH_COMPLETION",
        dataset: Optional[str] = None,
        **kwargs: Any,
    ) -> Dict[str, Any]:
        """Search the knowledge graph via the MCP service.

        Returns a dict with tenant/dataset/search_type metadata plus the
        raw ``results`` payload, or a dict with an ``error`` key.
        """
        if not self._initialized and not await self.initialize():
            return {"error": "Cognee service not initialised"}
        if not self.cognee_tools:
            return {"error": "Cognee tools unavailable"}

        target_dataset = dataset or self.dataset
        result = await self.cognee_tools.search(
            query=query,
            search_type=search_type,
            dataset=target_dataset,
            **kwargs,
        )
        return {
            "tenant": result.get("tenant") if isinstance(result, dict) else None,
            "dataset": target_dataset,
            "search_type": search_type,
            "results": result,
        }

    async def list_knowledge_data(self) -> Dict[str, Any]:
        """List datasets available to this project's tenant."""
        if not self._initialized and not await self.initialize():
            return {"error": "Cognee service not initialised"}
        if not self.cognee_tools:
            return {"error": "Cognee tools unavailable"}

        datasets = await self.cognee_tools.list_datasets()
        return {
            "project": self.project_context.get("project_name"),
            "available_data": datasets,
        }

    async def ingest_text_to_dataset(self, text: str, dataset: Optional[str] = None) -> Dict[str, Any]:
        """Ingest *text* into *dataset* and queue background cognify.

        FIX: annotation corrected to ``Optional[str]`` and an explicit
        ``cognee_tools`` guard added for consistency with the sibling
        methods (avoids a raw AttributeError on a half-initialised state).
        """
        if not self._initialized and not await self.initialize():
            return {"error": "Cognee service not initialised"}
        if not self.cognee_tools:
            return {"error": "Cognee tools unavailable"}

        target_dataset = dataset or self.dataset
        try:
            from .cognee_service import CogneeService

            service = CogneeService(self.project_manager)
            await service.initialize()
            success = await service.ingest_text(text, target_dataset)
            if success:
                # Graph building is slow; run it out-of-band.
                await self.cognee_tools.cognify(dataset=target_dataset, run_in_background=True)
                return {"status": "queued", "dataset": target_dataset}
            return {"error": "Cognee ingestion failed"}
        except Exception as exc:
            return {"error": f"Service ingestion failed: {exc}"}


def create_cognee_integration(project_dir: Optional[str] = None):
    """Factory selecting the Cognee backend.

    Service mode is chosen when ``COGNEE_STORAGE_MODE=service`` or when a
    Cognee service URL is configured; otherwise the in-process
    ``CogneeProjectIntegration`` is returned.
    """
    storage_mode = (os.getenv("COGNEE_STORAGE_MODE") or "").lower()
    service_url = os.getenv("COGNEE_SERVICE_URL") or os.getenv("COGNEE_API_URL")
    if storage_mode == "service" or service_url:
        return CogneeServiceIntegration(project_dir)
    return CogneeProjectIntegration(project_dir)
+ ) -> None: + self.base_url = base_url.rstrip("/") + self.default_dataset = default_dataset + self.default_email = default_email + self.default_password = default_password + self.default_service_url = default_service_url + self._client_info = Implementation(name="FuzzForge-Agent", version=__version__) + self._lock = asyncio.Lock() + + async def _call_tool(self, name: str, arguments: Dict[str, Any]) -> MCPCallResult: + async with streamable_http.streamablehttp_client(self.base_url) as (read_stream, write_stream, _): + session = ClientSession(read_stream, write_stream, client_info=self._client_info) + await session.initialize() + result = await session.call_tool(name, arguments) + + if result.isError: + message = getattr(result, "message", None) or "Unknown MCP tool error" + raise RuntimeError(f"Tool '{name}' failed: {message}") + + payload = self._extract_payload(result) + return MCPCallResult(raw=result, payload=payload) + + def _with_defaults(self, overrides: Dict[str, Any]) -> Dict[str, Any]: + merged: Dict[str, Any] = {} + if self.default_email and "user_email" not in overrides: + merged["user_email"] = self.default_email + if self.default_password and "user_password" not in overrides: + merged["user_password"] = self.default_password + if self.default_service_url and "service_url" not in overrides: + merged["service_url"] = self.default_service_url + if self.default_dataset and "dataset" not in overrides and "dataset" not in merged: + merged["dataset"] = self.default_dataset + merged.update({k: v for k, v in overrides.items() if v is not None}) + return merged + + @staticmethod + def _extract_payload(result: CallToolResult) -> Any: + if result.content: + texts: list[str] = [] + for item in result.content: + text = getattr(item, "text", None) + if text: + texts.append(text) + if texts: + combined = "\n".join(texts).strip() + if combined: + try: + return json.loads(combined) + except json.JSONDecodeError: + return combined + return None + + async def 
list_datasets(self, **overrides: Any) -> Any: + args = self._with_defaults(overrides) + result = await self._call_tool("list_datasets", args) + return result.payload + + async def search( + self, + *, + query: str, + search_type: str = "CHUNKS", + top_k: Optional[int] = None, + only_context: Optional[bool] = None, + node_name: Optional[Iterable[str]] = None, + system_prompt: Optional[str] = None, + use_combined_context: Optional[bool] = None, + dataset: Optional[str] = None, + **overrides: Any, + ) -> Any: + args = { + "query": query, + "search_type": search_type, + "top_k": top_k, + "only_context": only_context, + "node_name": list(node_name) if node_name is not None else None, + "system_prompt": system_prompt, + "use_combined_context": use_combined_context, + "dataset": dataset, + } + payload = self._with_defaults({**args, **overrides}) + result = await self._call_tool("search", payload) + return result.payload + + async def cognify( + self, + *, + dataset: Optional[str] = None, + run_in_background: bool = False, + custom_prompt: Optional[str] = None, + **overrides: Any, + ) -> Any: + args = { + "dataset": dataset, + "run_in_background": run_in_background, + "custom_prompt": custom_prompt, + } + payload = self._with_defaults({**args, **overrides}) + result = await self._call_tool("cognify", payload) + return result.payload + + async def memify( + self, + *, + dataset: Optional[str] = None, + extraction_tasks: Optional[Iterable[str]] = None, + enrichment_tasks: Optional[Iterable[str]] = None, + data: Optional[str] = None, + node_name: Optional[Iterable[str]] = None, + run_in_background: bool = False, + **overrides: Any, + ) -> Any: + args = { + "dataset": dataset, + "extraction_tasks": list(extraction_tasks) if extraction_tasks is not None else None, + "enrichment_tasks": list(enrichment_tasks) if enrichment_tasks is not None else None, + "data": data, + "node_name": list(node_name) if node_name is not None else None, + "run_in_background": run_in_background, + } 
    async def search_chunks(self, **kwargs: Any) -> Any:
        """Convenience wrapper: ``search`` with ``search_type="CHUNKS"``
        (raw text chunks). All other keyword arguments pass through."""
        return await self.search(search_type="CHUNKS", **kwargs)

    async def search_insights(self, **kwargs: Any) -> Any:
        """Convenience wrapper: ``search`` with ``search_type="INSIGHTS"``
        (graph-derived insights). All other keyword arguments pass through."""
        return await self.search(search_type="INSIGHTS", **kwargs)
    async def store_in_graph(self, content: str, dataset: Optional[str] = None):
        """Store in Cognee knowledge graph (for codebase analysis later)"""
        # NOTE(review): `content` is accepted but never transmitted —
        # cognify() only (re)triggers graph building on what is already in
        # `dataset`. Confirm callers ingest `content` into the dataset
        # beforehand, otherwise this silently drops the text.
        if not self.cognee_tools:
            return None

        try:
            # Use cognify to create graph structures
            return await self.cognee_tools.cognify(dataset=dataset)
        except Exception as e:
            # Best-effort: graph storage failures never propagate.
            logger.debug(f"Cognee store failed: {e}")
            return None
def load_project_env(start_dir: Optional[Path] = None, *, override: bool = False) -> None:
    """Load ``.fuzzforge/.env`` walking up from *start_dir* (default: cwd).

    When running the AI agent outside of the CLI entrypoints we still want
    to honour the project-scoped environment file generated by
    ``fuzzforge init`` so credentials (Cognee, LLM provider, etc.) are
    available. Existing variables win unless *override* is True; the
    nearest (deepest) ``.fuzzforge/.env`` wins and the walk stops there.

    BUG FIX: the manual fallback parser now strips matching surrounding
    quotes, so KEY="value" yields the same result whether or not
    python-dotenv is installed.
    """
    # Local guarded import keeps this function self-contained; dotenv is
    # an optional dependency.
    try:
        from dotenv import load_dotenv
    except ImportError:  # pragma: no cover - optional dependency
        load_dotenv = None

    root = Path(start_dir) if start_dir else Path.cwd()

    # Check the current directory first, then walk parents until we find a
    # project marker.
    for directory in [root] + list(root.parents):
        env_file = directory / ".fuzzforge" / ".env"
        if not env_file.exists():
            continue
        if load_dotenv:
            load_dotenv(env_file, override=override)
        else:  # pragma: no cover - best effort fallback without python-dotenv
            try:
                for line in env_file.read_text(encoding="utf-8").splitlines():
                    stripped = line.strip()
                    if not stripped or stripped.startswith("#"):
                        continue
                    if "=" not in stripped:
                        continue
                    key, value = stripped.split("=", 1)
                    key = key.strip()
                    value = value.strip()
                    # Mirror python-dotenv: drop matching surrounding quotes.
                    if len(value) >= 2 and value[0] == value[-1] and value[0] in {'"', "'"}:
                        value = value[1:-1]
                    if override or key not in os.environ:
                        os.environ[key] = value
            except Exception:
                # Env loading must never break agent startup.
                pass
        break