mirror of
https://github.com/FuzzingLabs/fuzzforge_ai.git
synced 2026-02-13 02:32:47 +00:00
Wire FuzzForge agent to Cognee MCP tools
This commit is contained in:
@@ -16,12 +16,15 @@ The core agent that combines all components
|
||||
|
||||
import os
|
||||
from pathlib import Path
|
||||
from typing import Dict, Any, List
|
||||
from typing import Dict, Any, List, Optional
|
||||
from google.adk import Agent
|
||||
from google.adk.models.lite_llm import LiteLlm
|
||||
from .agent_card import get_fuzzforge_agent_card
|
||||
from .agent_executor import FuzzForgeExecutor
|
||||
from .memory_service import FuzzForgeMemoryService, HybridMemoryManager
|
||||
from .mcp.cognee_mcp_client import CogneeMCPTools
|
||||
from .config_bridge import ProjectConfigManager
|
||||
from .utils.project_env import load_project_env
|
||||
|
||||
# Load environment variables from the AI module's .env file
|
||||
try:
|
||||
@@ -45,29 +48,62 @@ class FuzzForgeAgent:
|
||||
port: int = 10100,
|
||||
):
|
||||
"""Initialize FuzzForge agent with configuration"""
|
||||
load_project_env()
|
||||
|
||||
self.model = model or os.getenv('LITELLM_MODEL', 'gpt-4o-mini')
|
||||
self.cognee_url = cognee_url or os.getenv('COGNEE_MCP_URL')
|
||||
self.cognee_url = cognee_url or os.getenv('COGNEE_MCP_URL') or 'http://localhost:18001/mcp'
|
||||
os.environ.setdefault('COGNEE_MCP_URL', self.cognee_url)
|
||||
self.port = port
|
||||
|
||||
project_context: Dict[str, Any] = {}
|
||||
self.project_config: Optional[ProjectConfigManager] = None
|
||||
try:
|
||||
self.project_config = ProjectConfigManager()
|
||||
self.project_config.setup_cognee_environment()
|
||||
project_context = self.project_config.get_project_context()
|
||||
except Exception:
|
||||
self.project_config = None
|
||||
|
||||
# Initialize ADK Memory Service for conversational memory
|
||||
memory_type = os.getenv('MEMORY_SERVICE', 'inmemory')
|
||||
self.memory_service = FuzzForgeMemoryService(memory_type=memory_type)
|
||||
|
||||
|
||||
default_dataset = (
|
||||
os.getenv('COGNEE_DEFAULT_DATASET')
|
||||
or os.getenv('COGNEE_DATASET_NAME')
|
||||
)
|
||||
if not default_dataset and project_context.get('project_name'):
|
||||
default_dataset = f"{project_context['project_name']}_codebase"
|
||||
|
||||
cognee_email = os.getenv('COGNEE_SERVICE_USER_EMAIL') or os.getenv('DEFAULT_USER_EMAIL')
|
||||
cognee_password = os.getenv('COGNEE_SERVICE_USER_PASSWORD') or os.getenv('DEFAULT_USER_PASSWORD')
|
||||
service_url = os.getenv('COGNEE_SERVICE_URL') or os.getenv('COGNEE_API_URL')
|
||||
|
||||
self.cognee_tools: Optional[CogneeMCPTools] = None
|
||||
if self.cognee_url:
|
||||
self.cognee_tools = CogneeMCPTools(
|
||||
base_url=self.cognee_url,
|
||||
default_dataset=default_dataset,
|
||||
default_email=cognee_email,
|
||||
default_password=cognee_password,
|
||||
default_service_url=service_url,
|
||||
)
|
||||
|
||||
# Create the executor (the brain) with memory and session services
|
||||
self.executor = FuzzForgeExecutor(
|
||||
model=self.model,
|
||||
cognee_url=self.cognee_url,
|
||||
debug=os.getenv('FUZZFORGE_DEBUG', '0') == '1',
|
||||
memory_service=self.memory_service,
|
||||
memory_manager=self.memory_manager,
|
||||
session_persistence=os.getenv('SESSION_PERSISTENCE', 'inmemory'),
|
||||
fuzzforge_mcp_url=os.getenv('FUZZFORGE_MCP_URL'),
|
||||
)
|
||||
|
||||
# Create Hybrid Memory Manager (ADK + Cognee direct integration)
|
||||
# MCP tools removed - using direct Cognee integration only
|
||||
self.memory_manager = HybridMemoryManager(
|
||||
memory_service=self.memory_service,
|
||||
cognee_tools=None # No MCP tools, direct integration used instead
|
||||
cognee_tools=self.cognee_tools,
|
||||
)
|
||||
|
||||
# Get the agent card (the identity)
|
||||
|
||||
@@ -80,6 +80,7 @@ class FuzzForgeExecutor:
|
||||
cognee_url: str = None,
|
||||
debug: bool = False,
|
||||
memory_service=None,
|
||||
memory_manager=None,
|
||||
session_persistence: str = None,
|
||||
fuzzforge_mcp_url: str = None,
|
||||
):
|
||||
@@ -88,6 +89,7 @@ class FuzzForgeExecutor:
|
||||
self.cognee_url = cognee_url or os.getenv('COGNEE_MCP_URL')
|
||||
self.debug = debug
|
||||
self.memory_service = memory_service # ADK memory service
|
||||
self.memory_manager = memory_manager
|
||||
self.session_persistence = session_persistence or os.getenv('SESSION_PERSISTENCE', 'inmemory')
|
||||
self.fuzzforge_mcp_url = fuzzforge_mcp_url or os.getenv('FUZZFORGE_MCP_URL')
|
||||
self._background_tasks: set[asyncio.Task] = set()
|
||||
@@ -308,13 +310,13 @@ class FuzzForgeExecutor:
|
||||
return self._knowledge_integration
|
||||
|
||||
try:
|
||||
from .cognee_integration import CogneeProjectIntegration
|
||||
from .cognee_integration import create_cognee_integration
|
||||
|
||||
integration = CogneeProjectIntegration()
|
||||
integration = create_cognee_integration()
|
||||
initialised = await integration.initialize()
|
||||
if not initialised:
|
||||
if self.debug:
|
||||
print("[DEBUG] CogneeProjectIntegration initialization failed")
|
||||
print("[DEBUG] Cognee integration initialization failed")
|
||||
return None
|
||||
|
||||
self._knowledge_integration = integration
|
||||
@@ -345,6 +347,11 @@ class FuzzForgeExecutor:
|
||||
async def cognee_search(query: str) -> str:
|
||||
"""Search Cognee knowledge graph memory"""
|
||||
try:
|
||||
if self.memory_manager and getattr(self.memory_manager, "cognee_tools", None):
|
||||
results = await self.memory_manager.search_knowledge_graph(query)
|
||||
if isinstance(results, (dict, list)):
|
||||
return json.dumps(results, indent=2)
|
||||
return str(results)
|
||||
if self.cognee_service:
|
||||
results = await self.cognee_service.search_memory(query)
|
||||
return f"Cognee search results: {results}"
|
||||
@@ -554,8 +561,8 @@ class FuzzForgeExecutor:
|
||||
async def cognify_information(text: str) -> str:
|
||||
"""Transform information into knowledge graph format"""
|
||||
try:
|
||||
from .cognee_integration import CogneeProjectIntegration
|
||||
integration = CogneeProjectIntegration()
|
||||
from .cognee_integration import create_cognee_integration
|
||||
integration = create_cognee_integration()
|
||||
result = await integration.cognify_text(text)
|
||||
|
||||
if "error" in result:
|
||||
|
||||
@@ -21,6 +21,10 @@ import json
|
||||
from typing import Dict, List, Any, Optional, Union
|
||||
from pathlib import Path
|
||||
|
||||
from .config_bridge import ProjectConfigManager
|
||||
from .mcp.cognee_mcp_client import CogneeMCPTools
|
||||
from .utils.project_env import load_project_env
|
||||
|
||||
|
||||
class CogneeProjectIntegration:
|
||||
"""
|
||||
@@ -363,7 +367,7 @@ async def search_project_codebase(query: str, project_dir: Optional[str] = None,
|
||||
Returns:
|
||||
Formatted search results as string
|
||||
"""
|
||||
cognee_integration = CogneeProjectIntegration(project_dir)
|
||||
cognee_integration = create_cognee_integration(project_dir)
|
||||
result = await cognee_integration.search_knowledge_graph(query, search_type, dataset)
|
||||
|
||||
if "error" in result:
|
||||
@@ -402,7 +406,118 @@ async def search_project_codebase(query: str, project_dir: Optional[str] = None,
|
||||
else:
|
||||
output += f"{str(results)[:500]}..."
|
||||
|
||||
return output
|
||||
return output
|
||||
|
||||
|
||||
class CogneeServiceIntegration:
|
||||
"""Cognee integration backed by the MCP service."""
|
||||
|
||||
def __init__(self, project_dir: Optional[str] = None):
|
||||
self.project_dir = Path(project_dir) if project_dir else Path.cwd()
|
||||
self.project_manager: Optional[ProjectConfigManager] = None
|
||||
self.project_context: Dict[str, Any] = {}
|
||||
self.cognee_tools: Optional[CogneeMCPTools] = None
|
||||
self.dataset: Optional[str] = None
|
||||
self._initialized = False
|
||||
|
||||
async def initialize(self) -> bool:
|
||||
if self._initialized:
|
||||
return True
|
||||
|
||||
try:
|
||||
load_project_env(self.project_dir)
|
||||
self.project_manager = ProjectConfigManager(self.project_dir)
|
||||
self.project_manager.setup_cognee_environment()
|
||||
self.project_context = self.project_manager.get_project_context()
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
dataset = os.getenv("COGNEE_DEFAULT_DATASET") or os.getenv("COGNEE_DATASET_NAME")
|
||||
if not dataset:
|
||||
project_name = self.project_context.get("project_name") or "fuzzforge"
|
||||
dataset = f"{project_name}_codebase"
|
||||
self.dataset = dataset
|
||||
|
||||
mcp_url = os.getenv("COGNEE_MCP_URL") or "http://localhost:18001/mcp"
|
||||
service_url = os.getenv("COGNEE_SERVICE_URL") or os.getenv("COGNEE_API_URL")
|
||||
user_email = os.getenv("COGNEE_SERVICE_USER_EMAIL") or os.getenv("DEFAULT_USER_EMAIL")
|
||||
user_password = os.getenv("COGNEE_SERVICE_USER_PASSWORD") or os.getenv("DEFAULT_USER_PASSWORD")
|
||||
|
||||
self.cognee_tools = CogneeMCPTools(
|
||||
base_url=mcp_url,
|
||||
default_dataset=self.dataset,
|
||||
default_email=user_email,
|
||||
default_password=user_password,
|
||||
default_service_url=service_url,
|
||||
)
|
||||
|
||||
os.environ.setdefault("COGNEE_MCP_URL", mcp_url)
|
||||
self._initialized = True
|
||||
return True
|
||||
|
||||
async def search_knowledge_graph(
|
||||
self,
|
||||
query: str,
|
||||
search_type: str = "GRAPH_COMPLETION",
|
||||
dataset: Optional[str] = None,
|
||||
**kwargs: Any,
|
||||
) -> Dict[str, Any]:
|
||||
if not self._initialized and not await self.initialize():
|
||||
return {"error": "Cognee service not initialised"}
|
||||
if not self.cognee_tools:
|
||||
return {"error": "Cognee tools unavailable"}
|
||||
|
||||
target_dataset = dataset or self.dataset
|
||||
result = await self.cognee_tools.search(
|
||||
query=query,
|
||||
search_type=search_type,
|
||||
dataset=target_dataset,
|
||||
**kwargs,
|
||||
)
|
||||
return {
|
||||
"tenant": result.get("tenant") if isinstance(result, dict) else None,
|
||||
"dataset": target_dataset,
|
||||
"search_type": search_type,
|
||||
"results": result,
|
||||
}
|
||||
|
||||
async def list_knowledge_data(self) -> Dict[str, Any]:
|
||||
if not self._initialized and not await self.initialize():
|
||||
return {"error": "Cognee service not initialised"}
|
||||
if not self.cognee_tools:
|
||||
return {"error": "Cognee tools unavailable"}
|
||||
|
||||
datasets = await self.cognee_tools.list_datasets()
|
||||
return {
|
||||
"project": self.project_context.get("project_name"),
|
||||
"available_data": datasets,
|
||||
}
|
||||
|
||||
async def ingest_text_to_dataset(self, text: str, dataset: str = None) -> Dict[str, Any]:
|
||||
if not self._initialized and not await self.initialize():
|
||||
return {"error": "Cognee service not initialised"}
|
||||
|
||||
target_dataset = dataset or self.dataset
|
||||
try:
|
||||
from .cognee_service import CogneeService
|
||||
|
||||
service = CogneeService(self.project_manager)
|
||||
await service.initialize()
|
||||
success = await service.ingest_text(text, target_dataset)
|
||||
if success:
|
||||
await self.cognee_tools.cognify(dataset=target_dataset, run_in_background=True)
|
||||
return {"status": "queued", "dataset": target_dataset}
|
||||
return {"error": "Cognee ingestion failed"}
|
||||
except Exception as exc:
|
||||
return {"error": f"Service ingestion failed: {exc}"}
|
||||
|
||||
|
||||
def create_cognee_integration(project_dir: Optional[str] = None):
|
||||
storage_mode = (os.getenv("COGNEE_STORAGE_MODE") or "").lower()
|
||||
service_url = os.getenv("COGNEE_SERVICE_URL") or os.getenv("COGNEE_API_URL")
|
||||
if storage_mode == "service" or service_url:
|
||||
return CogneeServiceIntegration(project_dir)
|
||||
return CogneeProjectIntegration(project_dir)
|
||||
|
||||
|
||||
async def list_project_knowledge(project_dir: Optional[str] = None) -> str:
|
||||
@@ -415,7 +530,7 @@ async def list_project_knowledge(project_dir: Optional[str] = None) -> str:
|
||||
Returns:
|
||||
Formatted list of available data
|
||||
"""
|
||||
cognee_integration = CogneeProjectIntegration(project_dir)
|
||||
cognee_integration = create_cognee_integration(project_dir)
|
||||
result = await cognee_integration.list_knowledge_data()
|
||||
|
||||
if "error" in result:
|
||||
|
||||
166
ai/src/fuzzforge_ai/mcp/cognee_mcp_client.py
Normal file
166
ai/src/fuzzforge_ai/mcp/cognee_mcp_client.py
Normal file
@@ -0,0 +1,166 @@
|
||||
"""Async helper for interacting with the Cognee MCP mirror."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import os
|
||||
from dataclasses import dataclass
|
||||
from typing import Any, Dict, Iterable, Optional
|
||||
|
||||
from mcp.client import streamable_http
|
||||
from mcp.client.session import ClientSession
|
||||
from mcp.types import Implementation, CallToolResult
|
||||
|
||||
from fuzzforge_ai import __version__
|
||||
|
||||
|
||||
@dataclass
|
||||
class MCPCallResult:
|
||||
raw: CallToolResult
|
||||
payload: Any
|
||||
|
||||
|
||||
class CogneeMCPTools:
|
||||
"""Minimal client for invoking Cognee MCP tools."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
base_url: str,
|
||||
*,
|
||||
default_dataset: Optional[str] = None,
|
||||
default_email: Optional[str] = None,
|
||||
default_password: Optional[str] = None,
|
||||
default_service_url: Optional[str] = None,
|
||||
) -> None:
|
||||
self.base_url = base_url.rstrip("/")
|
||||
self.default_dataset = default_dataset
|
||||
self.default_email = default_email
|
||||
self.default_password = default_password
|
||||
self.default_service_url = default_service_url
|
||||
self._client_info = Implementation(name="FuzzForge-Agent", version=__version__)
|
||||
self._lock = asyncio.Lock()
|
||||
|
||||
async def _call_tool(self, name: str, arguments: Dict[str, Any]) -> MCPCallResult:
|
||||
async with streamable_http.streamablehttp_client(self.base_url) as (read_stream, write_stream, _):
|
||||
session = ClientSession(read_stream, write_stream, client_info=self._client_info)
|
||||
await session.initialize()
|
||||
result = await session.call_tool(name, arguments)
|
||||
|
||||
if result.isError:
|
||||
message = getattr(result, "message", None) or "Unknown MCP tool error"
|
||||
raise RuntimeError(f"Tool '{name}' failed: {message}")
|
||||
|
||||
payload = self._extract_payload(result)
|
||||
return MCPCallResult(raw=result, payload=payload)
|
||||
|
||||
def _with_defaults(self, overrides: Dict[str, Any]) -> Dict[str, Any]:
|
||||
merged: Dict[str, Any] = {}
|
||||
if self.default_email and "user_email" not in overrides:
|
||||
merged["user_email"] = self.default_email
|
||||
if self.default_password and "user_password" not in overrides:
|
||||
merged["user_password"] = self.default_password
|
||||
if self.default_service_url and "service_url" not in overrides:
|
||||
merged["service_url"] = self.default_service_url
|
||||
if self.default_dataset and "dataset" not in overrides and "dataset" not in merged:
|
||||
merged["dataset"] = self.default_dataset
|
||||
merged.update({k: v for k, v in overrides.items() if v is not None})
|
||||
return merged
|
||||
|
||||
@staticmethod
|
||||
def _extract_payload(result: CallToolResult) -> Any:
|
||||
if result.content:
|
||||
texts: list[str] = []
|
||||
for item in result.content:
|
||||
text = getattr(item, "text", None)
|
||||
if text:
|
||||
texts.append(text)
|
||||
if texts:
|
||||
combined = "\n".join(texts).strip()
|
||||
if combined:
|
||||
try:
|
||||
return json.loads(combined)
|
||||
except json.JSONDecodeError:
|
||||
return combined
|
||||
return None
|
||||
|
||||
async def list_datasets(self, **overrides: Any) -> Any:
|
||||
args = self._with_defaults(overrides)
|
||||
result = await self._call_tool("list_datasets", args)
|
||||
return result.payload
|
||||
|
||||
async def search(
|
||||
self,
|
||||
*,
|
||||
query: str,
|
||||
search_type: str = "CHUNKS",
|
||||
top_k: Optional[int] = None,
|
||||
only_context: Optional[bool] = None,
|
||||
node_name: Optional[Iterable[str]] = None,
|
||||
system_prompt: Optional[str] = None,
|
||||
use_combined_context: Optional[bool] = None,
|
||||
dataset: Optional[str] = None,
|
||||
**overrides: Any,
|
||||
) -> Any:
|
||||
args = {
|
||||
"query": query,
|
||||
"search_type": search_type,
|
||||
"top_k": top_k,
|
||||
"only_context": only_context,
|
||||
"node_name": list(node_name) if node_name is not None else None,
|
||||
"system_prompt": system_prompt,
|
||||
"use_combined_context": use_combined_context,
|
||||
"dataset": dataset,
|
||||
}
|
||||
payload = self._with_defaults({**args, **overrides})
|
||||
result = await self._call_tool("search", payload)
|
||||
return result.payload
|
||||
|
||||
async def cognify(
|
||||
self,
|
||||
*,
|
||||
dataset: Optional[str] = None,
|
||||
run_in_background: bool = False,
|
||||
custom_prompt: Optional[str] = None,
|
||||
**overrides: Any,
|
||||
) -> Any:
|
||||
args = {
|
||||
"dataset": dataset,
|
||||
"run_in_background": run_in_background,
|
||||
"custom_prompt": custom_prompt,
|
||||
}
|
||||
payload = self._with_defaults({**args, **overrides})
|
||||
result = await self._call_tool("cognify", payload)
|
||||
return result.payload
|
||||
|
||||
async def memify(
|
||||
self,
|
||||
*,
|
||||
dataset: Optional[str] = None,
|
||||
extraction_tasks: Optional[Iterable[str]] = None,
|
||||
enrichment_tasks: Optional[Iterable[str]] = None,
|
||||
data: Optional[str] = None,
|
||||
node_name: Optional[Iterable[str]] = None,
|
||||
run_in_background: bool = False,
|
||||
**overrides: Any,
|
||||
) -> Any:
|
||||
args = {
|
||||
"dataset": dataset,
|
||||
"extraction_tasks": list(extraction_tasks) if extraction_tasks is not None else None,
|
||||
"enrichment_tasks": list(enrichment_tasks) if enrichment_tasks is not None else None,
|
||||
"data": data,
|
||||
"node_name": list(node_name) if node_name is not None else None,
|
||||
"run_in_background": run_in_background,
|
||||
}
|
||||
payload = self._with_defaults({**args, **overrides})
|
||||
result = await self._call_tool("memify", payload)
|
||||
return result.payload
|
||||
|
||||
async def search_chunks(self, **kwargs: Any) -> Any:
|
||||
return await self.search(search_type="CHUNKS", **kwargs)
|
||||
|
||||
async def search_insights(self, **kwargs: Any) -> Any:
|
||||
return await self.search(search_type="INSIGHTS", **kwargs)
|
||||
|
||||
|
||||
__all__ = ["CogneeMCPTools"]
|
||||
@@ -11,11 +11,11 @@ from pathlib import Path
|
||||
from typing import Dict, Iterable, List, Optional
|
||||
from urllib.parse import urlparse
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from mcp.server import FastMCP
|
||||
import mcp.types as types
|
||||
|
||||
from fuzzforge_ai.cognee_api_client import CogneeAPIClient, CogneeAPIError
|
||||
from fuzzforge_ai.utils.project_env import load_project_env
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
logging.basicConfig(level=os.getenv("MCP_LOG_LEVEL", "INFO"))
|
||||
@@ -23,14 +23,6 @@ logging.basicConfig(level=os.getenv("MCP_LOG_LEVEL", "INFO"))
|
||||
mcp = FastMCP("FuzzForge-Cognee")
|
||||
|
||||
|
||||
def _load_project_env() -> None:
|
||||
"""Load environment variables from `.fuzzforge/.env` if present."""
|
||||
|
||||
env_path = Path.cwd() / ".fuzzforge" / ".env"
|
||||
if env_path.exists():
|
||||
load_dotenv(env_path, override=False)
|
||||
|
||||
|
||||
class TenantCredentials:
|
||||
def __init__(self, alias: str, service_url: str, email: str, password: str, dataset: Optional[str]):
|
||||
self.alias = alias.lower()
|
||||
@@ -64,7 +56,7 @@ class CogneeMCPContext:
|
||||
TENANT_CONFIG_DEFAULT = Path(".fuzzforge") / "cognee.tenants.json"
|
||||
|
||||
def __init__(self) -> None:
|
||||
_load_project_env()
|
||||
load_project_env()
|
||||
self.verify_ssl = os.getenv("COGNEE_VERIFY_SSL", "true").lower() != "false"
|
||||
self.tenants = self._load_tenants()
|
||||
if not self.tenants:
|
||||
|
||||
@@ -209,7 +209,13 @@ class HybridMemoryManager:
|
||||
"""Search past conversations using ADK memory"""
|
||||
return await self.memory_service.search_memory(query)
|
||||
|
||||
async def search_knowledge_graph(self, query: str, search_type: str = "GRAPH_COMPLETION"):
|
||||
async def search_knowledge_graph(
|
||||
self,
|
||||
query: str,
|
||||
search_type: str = "GRAPH_COMPLETION",
|
||||
dataset: Optional[str] = None,
|
||||
**kwargs: Any,
|
||||
):
|
||||
"""Search Cognee knowledge graph (for RAG/codebase in future)"""
|
||||
if not self.cognee_tools:
|
||||
return None
|
||||
@@ -218,20 +224,22 @@ class HybridMemoryManager:
|
||||
# Use Cognee's graph search
|
||||
return await self.cognee_tools.search(
|
||||
query=query,
|
||||
search_type=search_type
|
||||
search_type=search_type,
|
||||
dataset=dataset,
|
||||
**kwargs,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.debug(f"Cognee search failed: {e}")
|
||||
return None
|
||||
|
||||
async def store_in_graph(self, content: str):
|
||||
async def store_in_graph(self, content: str, dataset: Optional[str] = None):
|
||||
"""Store in Cognee knowledge graph (for codebase analysis later)"""
|
||||
if not self.cognee_tools:
|
||||
return None
|
||||
|
||||
try:
|
||||
# Use cognify to create graph structures
|
||||
return await self.cognee_tools.cognify(content)
|
||||
return await self.cognee_tools.cognify(dataset=dataset)
|
||||
except Exception as e:
|
||||
logger.debug(f"Cognee store failed: {e}")
|
||||
return None
|
||||
@@ -244,4 +252,4 @@ class HybridMemoryManager:
|
||||
"active": self.cognee_tools is not None,
|
||||
"purpose": "RAG/codebase analysis (future)"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
48
ai/src/fuzzforge_ai/utils/project_env.py
Normal file
48
ai/src/fuzzforge_ai/utils/project_env.py
Normal file
@@ -0,0 +1,48 @@
|
||||
"""Helpers for loading project-scoped environment variables."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
try:
|
||||
from dotenv import load_dotenv
|
||||
except ImportError: # pragma: no cover - optional dependency
|
||||
load_dotenv = None # type: ignore
|
||||
|
||||
|
||||
def load_project_env(start_dir: Optional[Path] = None, *, override: bool = False) -> None:
|
||||
"""Load `.fuzzforge/.env` walking up from the given directory.
|
||||
|
||||
When running the AI agent outside of the CLI entrypoints we still want to
|
||||
honour the project-scoped environment file generated by `fuzzforge init` so
|
||||
credentials (Cognee, LLM provider, etc.) are available.
|
||||
"""
|
||||
|
||||
root = Path(start_dir) if start_dir else Path.cwd()
|
||||
|
||||
# Check the current directory first, then walk parents until we find a
|
||||
# project marker.
|
||||
for directory in [root] + list(root.parents):
|
||||
env_file = directory / ".fuzzforge" / ".env"
|
||||
if env_file.exists():
|
||||
if load_dotenv:
|
||||
load_dotenv(env_file, override=override)
|
||||
else: # pragma: no cover - best effort fallback when python-dotenv missing
|
||||
try:
|
||||
for line in env_file.read_text(encoding="utf-8").splitlines():
|
||||
stripped = line.strip()
|
||||
if not stripped or stripped.startswith("#"):
|
||||
continue
|
||||
if "=" not in stripped:
|
||||
continue
|
||||
key, value = stripped.split("=", 1)
|
||||
if override or key.strip() not in os.environ:
|
||||
os.environ[key.strip()] = value.strip()
|
||||
except Exception:
|
||||
pass
|
||||
break
|
||||
|
||||
|
||||
__all__ = ["load_project_env"]
|
||||
Reference in New Issue
Block a user