From 1bb0ec4764fc07ecd55630d85ab54bf863319809 Mon Sep 17 00:00:00 2001 From: Songbird Date: Fri, 3 Oct 2025 20:41:46 +0200 Subject: [PATCH] Extend Cognee MCP mirror with search and memify --- ai/src/fuzzforge_ai/__init__.py | 13 +- ai/src/fuzzforge_ai/cognee_api_client.py | 63 +- ai/src/fuzzforge_ai/config_bridge.py | 17 +- ai/src/fuzzforge_ai/mcp/cognee_mcp_server.py | 640 ++++++++++++++++--- docker-compose.cognee.yaml | 92 +-- docker/Dockerfile.cognee_mcp | 27 + test_cognee_mcp.py | 242 +++++++ 7 files changed, 937 insertions(+), 157 deletions(-) create mode 100644 docker/Dockerfile.cognee_mcp create mode 100644 test_cognee_mcp.py diff --git a/ai/src/fuzzforge_ai/__init__.py b/ai/src/fuzzforge_ai/__init__.py index 5b343a2..b76acbf 100644 --- a/ai/src/fuzzforge_ai/__init__.py +++ b/ai/src/fuzzforge_ai/__init__.py @@ -18,7 +18,14 @@ providing intelligent AI agent capabilities for security analysis. __version__ = "0.6.0" -from .agent import FuzzForgeAgent -from .config_manager import ConfigManager +try: + from .agent import FuzzForgeAgent +except ImportError: # pragma: no cover - optional dependency graph + FuzzForgeAgent = None # type: ignore -__all__ = ['FuzzForgeAgent', 'ConfigManager'] \ No newline at end of file +try: + from .config_manager import ConfigManager +except ImportError: # pragma: no cover - optional dependency graph + ConfigManager = None # type: ignore + +__all__ = ['FuzzForgeAgent', 'ConfigManager'] diff --git a/ai/src/fuzzforge_ai/cognee_api_client.py b/ai/src/fuzzforge_ai/cognee_api_client.py index 21ebfc5..787b326 100644 --- a/ai/src/fuzzforge_ai/cognee_api_client.py +++ b/ai/src/fuzzforge_ai/cognee_api_client.py @@ -214,8 +214,16 @@ class CogneeAPIClient: f"Cognee ingestion failed ({response.status_code}): {response.text.strip()}" ) - async def cognify(self, dataset_name: str) -> Dict[str, Any]: - payload = {"datasets": [dataset_name]} + async def cognify( + self, + dataset_name: str, + *, + run_in_background: bool = False, + custom_prompt: 
Optional[str] = None, + ) -> Dict[str, Any]: + payload: Dict[str, Any] = {"datasets": [dataset_name], "run_in_background": run_in_background} + if custom_prompt is not None: + payload["custom_prompt"] = custom_prompt try: response = await self._client.post("/api/v1/cognify", json=payload) except httpx.RequestError as exc: @@ -258,6 +266,10 @@ class CogneeAPIClient: datasets: Optional[Iterable[str]] = None, search_type: str = "INSIGHTS", top_k: Optional[int] = None, + only_context: Optional[bool] = None, + node_name: Optional[Iterable[str]] = None, + system_prompt: Optional[str] = None, + use_combined_context: Optional[bool] = None, ) -> Any: payload: Dict[str, Any] = { "query": query, @@ -267,6 +279,14 @@ class CogneeAPIClient: payload["datasets"] = list(datasets) if top_k is not None: payload["top_k"] = top_k + if only_context is not None: + payload["only_context"] = only_context + if node_name is not None: + payload["node_name"] = list(node_name) + if system_prompt is not None: + payload["system_prompt"] = system_prompt + if use_combined_context is not None: + payload["use_combined_context"] = use_combined_context try: response = await self._client.post("/api/v1/search", json=payload) @@ -274,12 +294,49 @@ class CogneeAPIClient: raise CogneeAPIError(f"Cognee search request failed: {exc}") from exc if response.status_code in {200, 201}: - # search may already be JSON serialisable but ensure data type is python object return response.json() raise CogneeAPIError( f"Cognee search failed ({response.status_code}): {response.text.strip()}" ) + async def memify( + self, + *, + dataset_name: Optional[str] = None, + dataset_id: Optional[str] = None, + extraction_tasks: Optional[Iterable[str]] = None, + enrichment_tasks: Optional[Iterable[str]] = None, + data: Optional[str] = None, + node_name: Optional[Iterable[str]] = None, + run_in_background: bool = False, + ) -> Dict[str, Any]: + payload: Dict[str, Any] = { + "datasetName": dataset_name, + "datasetId": dataset_id, + 
"runInBackground": run_in_background, + } + if extraction_tasks is not None: + payload["extractionTasks"] = list(extraction_tasks) + if enrichment_tasks is not None: + payload["enrichmentTasks"] = list(enrichment_tasks) + if data is not None: + payload["data"] = data + if node_name is not None: + payload["nodeName"] = list(node_name) + + payload = {key: value for key, value in payload.items() if value is not None} + + try: + response = await self._client.post("/api/v1/memify", json=payload) + except httpx.RequestError as exc: + raise CogneeAPIError(f"Cognee memify request failed: {exc}") from exc + + if response.status_code in {200, 202}: + return response.json() + raise CogneeAPIError( + f"Cognee memify failed ({response.status_code}): {response.text.strip()}" + ) + async def status(self) -> Dict[str, Any]: """Return health information along with dataset list.""" info: Dict[str, Any] = {} diff --git a/ai/src/fuzzforge_ai/config_bridge.py b/ai/src/fuzzforge_ai/config_bridge.py index 668f607..b3e094a 100644 --- a/ai/src/fuzzforge_ai/config_bridge.py +++ b/ai/src/fuzzforge_ai/config_bridge.py @@ -13,18 +13,23 @@ try: from fuzzforge_cli.config import ProjectConfigManager as _ProjectConfigManager -except ImportError as exc: # pragma: no cover - used when CLI not available +except ImportError: # pragma: no cover - used when CLI not available + _MISSING_MSG = ( + "ProjectConfigManager is unavailable. Install the FuzzForge CLI package " + "or supply a compatible configuration object." + ) + + def _raise_missing() -> None: + raise ImportError(_MISSING_MSG) + class _ProjectConfigManager: # type: ignore[no-redef] """Fallback implementation that raises a helpful error.""" def __init__(self, *args, **kwargs): - raise ImportError( - "ProjectConfigManager is unavailable. Install the FuzzForge CLI " - "package or supply a compatible configuration object." 
- ) from exc + _raise_missing() def __getattr__(name): # pragma: no cover - defensive - raise ImportError("ProjectConfigManager unavailable") from exc + _raise_missing() ProjectConfigManager = _ProjectConfigManager diff --git a/ai/src/fuzzforge_ai/mcp/cognee_mcp_server.py b/ai/src/fuzzforge_ai/mcp/cognee_mcp_server.py index b79f048..b5de61e 100644 --- a/ai/src/fuzzforge_ai/mcp/cognee_mcp_server.py +++ b/ai/src/fuzzforge_ai/mcp/cognee_mcp_server.py @@ -8,7 +8,8 @@ import json import logging import os from pathlib import Path -from typing import List, Optional +from typing import Dict, Iterable, List, Optional +from urllib.parse import urlparse from dotenv import load_dotenv from mcp.server import FastMCP @@ -30,79 +31,293 @@ def _load_project_env() -> None: load_dotenv(env_path, override=False) +class TenantCredentials: + def __init__(self, alias: str, service_url: str, email: str, password: str, dataset: Optional[str]): + self.alias = alias.lower() + self.service_url = service_url.rstrip("/") + self.email = email + self.password = password + self.dataset = dataset + + class CogneeMCPContext: - """Holds shared state for MCP tools and manages the Cognee API client.""" + """Manages Cognee API clients for multiple tenants.""" + + T_PREFIX = "COGNEE_TENANT_" + SUPPORTED_SEARCH_TYPES = { + "SUMMARIES", + "INSIGHTS", + "CHUNKS", + "RAG_COMPLETION", + "GRAPH_COMPLETION", + "GRAPH_SUMMARY_COMPLETION", + "CODE", + "CYPHER", + "NATURAL_LANGUAGE", + "GRAPH_COMPLETION_COT", + "GRAPH_COMPLETION_CONTEXT_EXTENSION", + "FEELING_LUCKY", + "FEEDBACK", + "TEMPORAL", + "CODING_RULES", + } + TENANT_CONFIG_DEFAULT = Path(".fuzzforge") / "cognee.tenants.json" def __init__(self) -> None: _load_project_env() - self.service_url = os.getenv("COGNEE_SERVICE_URL", "").rstrip("/") self.verify_ssl = os.getenv("COGNEE_VERIFY_SSL", "true").lower() != "false" - self.email = os.getenv("COGNEE_SERVICE_USER_EMAIL") or os.getenv("DEFAULT_USER_EMAIL") - self.password = 
os.getenv("COGNEE_SERVICE_USER_PASSWORD") or os.getenv("DEFAULT_USER_PASSWORD") - if not (self.service_url and self.email and self.password): - raise RuntimeError( - "COGNEE_SERVICE_URL, COGNEE_SERVICE_USER_EMAIL and" - " COGNEE_SERVICE_USER_PASSWORD must be set in the environment" + self.tenants = self._load_tenants() + if not self.tenants: + logger.info( + "No preconfigured tenants detected; credentials must be provided per tool call." ) - self._client: Optional[CogneeAPIClient] = None - self.active_dataset_name: Optional[str] = os.getenv("COGNEE_DATASET_NAME") or self._default_dataset_name() - self.active_dataset_id: Optional[str] = None + self.clients: dict[str, CogneeAPIClient] = {} + self.dataset_cache: dict[tuple[str, str], str] = {} - async def start(self) -> None: - self._client = CogneeAPIClient(self.service_url, verify_ssl=self.verify_ssl) - await self._client.ensure_user(self.email, self.password, tenant_name=self._tenant_name()) - if self.active_dataset_name: - dataset_info = await self._client.ensure_dataset(self.active_dataset_name) - self.active_dataset_id = dataset_info.get("id") - logger.info( - "Connected to Cognee service", - extra={ - "service_url": self.service_url, - "dataset": self.active_dataset_name, - "dataset_id": self.active_dataset_id, - }, - ) + def _load_tenants(self) -> dict[str, TenantCredentials]: + tenants = self._load_tenants_from_env() + file_tenants = self._load_tenants_from_file() + for alias, creds in file_tenants.items(): + if alias in tenants: + existing = tenants[alias] + # Allow config file to augment missing fields from env-derived entry + if not existing.dataset and creds.dataset: + existing.dataset = creds.dataset + if creds.service_url and creds.service_url != existing.service_url: + existing.service_url = creds.service_url + continue + tenants[alias] = creds + return tenants + + def _load_tenants_from_env(self) -> dict[str, TenantCredentials]: + tenants: dict[str, TenantCredentials] = {} + service_url_default = 
os.getenv("COGNEE_SERVICE_URL", "").rstrip("/") + for key, value in os.environ.items(): + if not key.startswith(self.T_PREFIX) or not key.endswith("_EMAIL"): + continue + alias = key[len(self.T_PREFIX):-len("_EMAIL")].lower() + email = value + password = os.getenv(f"{self.T_PREFIX}{alias.upper()}_PASSWORD") + dataset = os.getenv(f"{self.T_PREFIX}{alias.upper()}_DATASET") + service_url = ( + os.getenv(f"{self.T_PREFIX}{alias.upper()}_SERVICE_URL") + or service_url_default + ).rstrip("/") + if not (service_url and email and password): + logger.warning("Skipping tenant '%s' due to incomplete credentials", alias) + continue + tenants[alias] = TenantCredentials(alias, service_url, email, password, dataset) + + # fallback to legacy single-tenant vars + if not tenants and service_url_default: + email = os.getenv("COGNEE_SERVICE_USER_EMAIL") or os.getenv("DEFAULT_USER_EMAIL") + password = os.getenv("COGNEE_SERVICE_USER_PASSWORD") or os.getenv("DEFAULT_USER_PASSWORD") + dataset = os.getenv("COGNEE_DATASET_NAME") + if email and password: + tenants["default"] = TenantCredentials("default", service_url_default, email, password, dataset) + + return tenants + + def _load_tenants_from_file(self) -> dict[str, TenantCredentials]: + tenants: Dict[str, TenantCredentials] = {} + config_path = os.getenv("COGNEE_TENANTS_FILE") + if config_path: + candidate = Path(config_path).expanduser() + else: + candidate = (Path.cwd() / self.TENANT_CONFIG_DEFAULT).resolve() + + if not candidate.exists(): + return tenants + + try: + raw = candidate.read_text(encoding="utf-8") + data = json.loads(raw) + except (OSError, json.JSONDecodeError) as exc: + logger.error("Failed to load tenant config '%s': %s", candidate, exc) + return tenants + + entries = [] + if isinstance(data, list): + entries = data + elif isinstance(data, dict): + entries = data.get("tenants", []) + else: + logger.error("Tenant config '%s' must be a list or dict with 'tenants'", candidate) + return tenants + + for item in entries: + 
if not isinstance(item, dict): + continue + alias = item.get("alias") + email = item.get("email") + password = item.get("password") + dataset = item.get("dataset") + service_url = (item.get("service_url") or os.getenv("COGNEE_SERVICE_URL", "")).rstrip("/") + if not (alias and email and password and service_url): + logger.warning("Skipping tenant entry with missing fields: %s", item) + continue + tenants[alias.lower()] = TenantCredentials(alias, service_url, email, password, dataset) + + if tenants: + logger.info("Loaded %d tenant(s) from %s", len(tenants), candidate) + return tenants + + async def get_client(self, alias: str) -> CogneeAPIClient: + alias = alias.lower() + if alias not in self.tenants: + raise ValueError(f"Unknown tenant alias '{alias}'. Available: {', '.join(self.tenants)}") + if alias not in self.clients: + tenant = self.tenants[alias] + client = CogneeAPIClient(tenant.service_url, verify_ssl=self.verify_ssl) + await client.ensure_user(tenant.email, tenant.password, tenant_name=self._tenant_name(alias)) + self.clients[alias] = client + return self.clients[alias] async def close(self) -> None: - if self._client: - await self._client.close() + await asyncio.gather(*(client.close() for client in self.clients.values())) + self.clients.clear() - @property - def client(self) -> CogneeAPIClient: - if not self._client: - raise RuntimeError("Cognee API client is not initialised yet") - return self._client + def _tenant_name(self, alias: str) -> str: + alias = alias.lower() + env_alias = alias.upper().replace(":", "_").replace("@", "_").replace("/", "_") + fallback = alias.replace(":", "-").replace("@", "-").replace("/", "-") + return os.getenv(f"COGNEE_TENANT_{env_alias}_NAME") or fallback - def _tenant_name(self) -> str: - return os.getenv("COGNEE_TENANT_NAME") or os.getenv("COGNEE_PROJECT_ID") or "default-tenant" + def _service_url(self, override: Optional[str]) -> str: + url = (override or os.getenv("COGNEE_SERVICE_URL", "")).strip() + url = 
url.rstrip("/") + if url: + parsed = urlparse(url) + host = parsed.hostname + if host in {"localhost", "127.0.0.1"}: + internal = os.getenv("COGNEE_INTERNAL_SERVICE_URL", "").rstrip("/") + if internal: + url = internal + if not url: + raise ValueError( + "Service URL not provided. Set COGNEE_SERVICE_URL or pass service_url argument." + ) + return url - def _default_dataset_name(self) -> str: - project_name = os.getenv("FUZZFORGE_PROJECT_NAME") or "project" - return f"{project_name}_codebase" + def _alias_for_credentials(self, email: str, service_url: str) -> str: + safe_url = service_url.lower().replace("://", "_").replace("/", "_") + return f"user:{email.lower()}@{safe_url}" - async def ensure_dataset(self, dataset_name: str) -> str: - """Ensure the dataset exists and cache its id.""" + def _ensure_tenant_entry( + self, + *, + tenant: Optional[str], + email: Optional[str], + password: Optional[str], + service_url: Optional[str], + dataset: Optional[str], + ) -> tuple[str, TenantCredentials]: + if tenant: + alias = tenant.lower() + if alias not in self.tenants: + raise ValueError( + f"Unknown tenant alias '{alias}'. Available: {', '.join(self.tenants)}" + ) + creds = self.tenants[alias] + if service_url and service_url.rstrip("/") != creds.service_url: + raise ValueError( + "service_url override does not match configured tenant service URL" + ) + if dataset and not creds.dataset: + creds.dataset = dataset + return alias, creds - info = await self.client.ensure_dataset(dataset_name) + if not (email and password): + raise ValueError( + "Provide either a tenant alias or user_email and user_password credentials." 
+ ) + + base_url = self._service_url(service_url) + alias = self._alias_for_credentials(email, base_url) + creds = self.tenants.get(alias) + if not creds: + creds = TenantCredentials(alias, base_url, email, password, dataset) + self.tenants[alias] = creds + else: + if dataset and not creds.dataset: + creds.dataset = dataset + # Update password if changed to avoid stale credential reuse + if password != creds.password: + creds.password = password + return alias, creds + + async def prepare_client( + self, + *, + tenant: Optional[str], + email: Optional[str], + password: Optional[str], + service_url: Optional[str], + ) -> tuple[str, CogneeAPIClient]: + alias, _ = self._ensure_tenant_entry( + tenant=tenant, + email=email, + password=password, + service_url=service_url, + dataset=None, + ) + client = await self.get_client(alias) + return alias, client + + async def prepare_dataset_request( + self, + *, + tenant: Optional[str], + dataset: Optional[str], + email: Optional[str], + password: Optional[str], + service_url: Optional[str], + ) -> tuple[str, CogneeAPIClient, str, str]: + alias, _ = self._ensure_tenant_entry( + tenant=tenant, + email=email, + password=password, + service_url=service_url, + dataset=dataset, + ) + client = await self.get_client(alias) + dataset_name, dataset_id = await self.resolve_dataset(alias, dataset) + return alias, client, dataset_name, dataset_id + + async def start(self) -> None: + for alias, creds in self.tenants.items(): + if creds.dataset: + try: + await self.resolve_dataset(alias, creds.dataset) + except Exception as exc: + logger.warning("Failed to preload dataset for tenant '%s': %s", alias, exc) + logger.info("Tenants available: %s", ", ".join(sorted(self.tenants))) + + async def resolve_dataset(self, alias: str, dataset: Optional[str]) -> tuple[str, str]: + alias = alias.lower() + if alias not in self.tenants: + raise ValueError(f"Unknown tenant alias '{alias}'. 
Available: {', '.join(self.tenants)}") + tenant = self.tenants[alias] + dataset_name = dataset or tenant.dataset + if not dataset_name: + env_alias = alias.upper().replace(":", "_").replace("@", "_").replace("/", "_") + raise ValueError( + "Dataset not specified. Provide the dataset argument or set " + f"COGNEE_TENANT_{env_alias}_DATASET." + ) + cache_key = (alias, dataset_name) + if cache_key in self.dataset_cache: + return dataset_name, self.dataset_cache[cache_key] + client = await self.get_client(alias) + info = await client.ensure_dataset(dataset_name) dataset_id = info.get("id") if not dataset_id: raise RuntimeError(f"Failed to resolve dataset id for '{dataset_name}'") - self.active_dataset_name = dataset_name - self.active_dataset_id = dataset_id - return dataset_id + self.dataset_cache[cache_key] = dataset_id + return dataset_name, dataset_id - async def resolve_dataset(self, dataset: Optional[str]) -> tuple[str, str]: - name = dataset or self.active_dataset_name - if not name: - raise ValueError("Dataset is not specified; set COGNEE_DATASET_NAME or call set_dataset first") - # if the requested dataset differs from the cached one, ensure it - if dataset and dataset != self.active_dataset_name: - dataset_id = await self.ensure_dataset(dataset) - else: - dataset_id = self.active_dataset_id - if not dataset_id: - dataset_id = await self.ensure_dataset(name) - return name, dataset_id + def list_tenants(self) -> List[str]: + return sorted(self.tenants.keys()) CONTEXT = CogneeMCPContext() @@ -113,29 +328,44 @@ def _text(message: str) -> List[types.TextContent]: @mcp.tool() -async def set_dataset(dataset: str) -> List[types.TextContent]: - """Switch the active dataset for subsequent operations.""" +async def list_tenants() -> List[types.TextContent]: + """List known tenant aliases (configured or dynamically added).""" - dataset_id = await CONTEXT.ensure_dataset(dataset) - return _text(f"Active dataset set to '{dataset}' (id: {dataset_id}).") + aliases = 
CONTEXT.list_tenants() + return _text(json.dumps({"tenants": aliases}, indent=2)) @mcp.tool() -async def ingest_file(path: str, dataset: Optional[str] = None) -> List[types.TextContent]: - """Upload a local file into the Cognee dataset.""" +async def ingest_file( + path: str, + tenant: Optional[str] = None, + dataset: Optional[str] = None, + user_email: Optional[str] = None, + user_password: Optional[str] = None, + service_url: Optional[str] = None, +) -> List[types.TextContent]: + """Upload a local file into the Cognee dataset for a tenant or explicit user.""" file_path = Path(path).expanduser().resolve() if not file_path.is_file(): raise ValueError(f"File not found: {file_path}") - dataset_name, _ = await CONTEXT.resolve_dataset(dataset) - result = await CONTEXT.client.ingest_files(dataset_name, [file_path]) + alias, client, dataset_name, _ = await CONTEXT.prepare_dataset_request( + tenant=tenant, + dataset=dataset, + email=user_email, + password=user_password, + service_url=service_url, + ) + result = await client.ingest_files(dataset_name, [file_path]) success = result.get("success", 0) failed = result.get("failed", 0) errors = result.get("errors", []) message = [ - f"Uploaded {success} file(s) to dataset '{dataset_name}'.", + f"Tenant: {tenant or alias}", + f"Dataset: {dataset_name}", + f"Uploaded: {success} file(s)", f"Failed: {failed}", ] if errors: @@ -145,70 +375,272 @@ async def ingest_file(path: str, dataset: Optional[str] = None) -> List[types.Te @mcp.tool() -async def cognify(run_in_background: bool = False, dataset: Optional[str] = None) -> List[types.TextContent]: +async def cognify( + tenant: Optional[str] = None, + dataset: Optional[str] = None, + run_in_background: bool = False, + user_email: Optional[str] = None, + user_password: Optional[str] = None, + service_url: Optional[str] = None, + custom_prompt: Optional[str] = None, +) -> List[types.TextContent]: """Trigger the Cognify pipeline for the dataset.""" - dataset_name, _ = await 
CONTEXT.resolve_dataset(dataset) - response = await CONTEXT.client.cognify(dataset_name, run_in_background=run_in_background) - return _text(json.dumps(response, indent=2)) + alias, client, dataset_name, _ = await CONTEXT.prepare_dataset_request( + tenant=tenant, + dataset=dataset, + email=user_email, + password=user_password, + service_url=service_url, + ) + response = await client.cognify( + dataset_name, + run_in_background=run_in_background, + custom_prompt=custom_prompt, + ) + payload = { + "tenant": tenant or alias, + "dataset": dataset_name, + "response": response, + } + return _text(json.dumps(payload, indent=2)) @mcp.tool() -async def cognify_status(dataset: Optional[str] = None) -> List[types.TextContent]: +async def cognify_status( + tenant: Optional[str] = None, + dataset: Optional[str] = None, + user_email: Optional[str] = None, + user_password: Optional[str] = None, + service_url: Optional[str] = None, +) -> List[types.TextContent]: """Return the status of recent Cognify runs.""" - _, dataset_id = await CONTEXT.resolve_dataset(dataset) - status = await CONTEXT.client.status() - status.setdefault("dataset_ids", []).append(dataset_id) - return _text(json.dumps(status, indent=2)) + alias, client, _, dataset_id = await CONTEXT.prepare_dataset_request( + tenant=tenant, + dataset=dataset, + email=user_email, + password=user_password, + service_url=service_url, + ) + status = await client.status() + payload = { + "tenant": tenant or alias, + "dataset_id": dataset_id, + "status": status, + } + return _text(json.dumps(payload, indent=2)) @mcp.tool() -async def search_chunks(query: str, top_k: int = 5, dataset: Optional[str] = None) -> List[types.TextContent]: +async def search_chunks( + query: str, + tenant: Optional[str] = None, + top_k: int = 5, + dataset: Optional[str] = None, + user_email: Optional[str] = None, + user_password: Optional[str] = None, + service_url: Optional[str] = None, +) -> List[types.TextContent]: """Search dataset for relevant document 
chunks.""" - dataset_name, _ = await CONTEXT.resolve_dataset(dataset) - result = await CONTEXT.client.search( + alias, client, dataset_name, _ = await CONTEXT.prepare_dataset_request( + tenant=tenant, + dataset=dataset, + email=user_email, + password=user_password, + service_url=service_url, + ) + result = await client.search( query, datasets=[dataset_name], search_type="CHUNKS", top_k=top_k, ) - return _text(json.dumps(result, indent=2)) + payload = { + "tenant": tenant or alias, + "dataset": dataset_name, + "results": result, + } + return _text(json.dumps(payload, indent=2)) @mcp.tool() -async def search_insights(query: str, top_k: int = 5, dataset: Optional[str] = None) -> List[types.TextContent]: +async def search_insights( + query: str, + tenant: Optional[str] = None, + top_k: int = 5, + dataset: Optional[str] = None, + user_email: Optional[str] = None, + user_password: Optional[str] = None, + service_url: Optional[str] = None, +) -> List[types.TextContent]: """Search dataset for graph insights.""" - dataset_name, _ = await CONTEXT.resolve_dataset(dataset) - result = await CONTEXT.client.search( + alias, client, dataset_name, _ = await CONTEXT.prepare_dataset_request( + tenant=tenant, + dataset=dataset, + email=user_email, + password=user_password, + service_url=service_url, + ) + result = await client.search( query, datasets=[dataset_name], search_type="INSIGHTS", top_k=top_k, ) + payload = { + "tenant": tenant or alias, + "dataset": dataset_name, + "results": result, + } + return _text(json.dumps(payload, indent=2)) + + +@mcp.tool() +async def search( + query: str, + search_type: str = "CHUNKS", + tenant: Optional[str] = None, + dataset: Optional[str] = None, + top_k: Optional[int] = None, + only_context: Optional[bool] = None, + node_name: Optional[List[str]] = None, + system_prompt: Optional[str] = None, + use_combined_context: Optional[bool] = None, + user_email: Optional[str] = None, + user_password: Optional[str] = None, + service_url: Optional[str] = 
None, +) -> List[types.TextContent]: + """Execute a Cognee semantic search with custom modes.""" + + alias, client, dataset_name, _ = await CONTEXT.prepare_dataset_request( + tenant=tenant, + dataset=dataset, + email=user_email, + password=user_password, + service_url=service_url, + ) + search_type_upper = search_type.upper() + if search_type_upper not in CONTEXT.SUPPORTED_SEARCH_TYPES: + raise ValueError( + "Unsupported search_type. Choose one of: " + + ", ".join(sorted(CONTEXT.SUPPORTED_SEARCH_TYPES)) + ) + + node_iter: Optional[Iterable[str]] = node_name if node_name is not None else None + + result = await client.search( + query, + datasets=[dataset_name], + search_type=search_type_upper, + top_k=top_k, + only_context=only_context, + node_name=node_iter, + system_prompt=system_prompt, + use_combined_context=use_combined_context, + ) + payload = { + "tenant": tenant or alias, + "dataset": dataset_name, + "search_type": search_type_upper, + "results": result, + } + return _text(json.dumps(payload, indent=2)) + + +@mcp.tool() +async def memify( + tenant: Optional[str] = None, + dataset: Optional[str] = None, + extraction_tasks: Optional[List[str]] = None, + enrichment_tasks: Optional[List[str]] = None, + data: Optional[str] = None, + node_name: Optional[List[str]] = None, + run_in_background: bool = False, + user_email: Optional[str] = None, + user_password: Optional[str] = None, + service_url: Optional[str] = None, +) -> List[types.TextContent]: + """Run the Memify enrichment pipeline for a dataset.""" + + alias, client, dataset_name, dataset_id = await CONTEXT.prepare_dataset_request( + tenant=tenant, + dataset=dataset, + email=user_email, + password=user_password, + service_url=service_url, + ) + + response = await client.memify( + dataset_name=dataset_name, + dataset_id=dataset_id, + extraction_tasks=extraction_tasks, + enrichment_tasks=enrichment_tasks, + data=data, + node_name=node_name, + run_in_background=run_in_background, + ) + + payload = { + "tenant": 
tenant or alias, + "dataset": dataset_name, + "response": response, + } + return _text(json.dumps(payload, indent=2)) + + +@mcp.tool() +async def list_datasets( + tenant: Optional[str] = None, + user_email: Optional[str] = None, + user_password: Optional[str] = None, + service_url: Optional[str] = None, +) -> List[types.TextContent]: + """List datasets accessible to configured tenants or provided user credentials.""" + + result: dict[str, List[dict[str, object]]] = {} + + if tenant or user_email: + alias, client = await CONTEXT.prepare_client( + tenant=tenant, + email=user_email, + password=user_password, + service_url=service_url, + ) + result[tenant or alias] = await client.list_datasets() + else: + for alias in CONTEXT.list_tenants(): + client = await CONTEXT.get_client(alias) + datasets = await client.list_datasets() + result[alias] = datasets return _text(json.dumps(result, indent=2)) @mcp.tool() -async def list_datasets() -> List[types.TextContent]: - """List datasets accessible to the current service user.""" - - datasets = await CONTEXT.client.list_datasets() - return _text(json.dumps(datasets, indent=2)) - - -@mcp.tool() -async def prune_dataset(dataset: Optional[str] = None) -> List[types.TextContent]: +async def prune_dataset( + tenant: Optional[str] = None, + dataset: Optional[str] = None, + user_email: Optional[str] = None, + user_password: Optional[str] = None, + service_url: Optional[str] = None, +) -> List[types.TextContent]: """Delete the dataset and recreate it empty.""" - dataset_name, dataset_id = await CONTEXT.resolve_dataset(dataset) - await CONTEXT.client.delete_dataset(dataset_id) - dataset_info = await CONTEXT.client.ensure_dataset(dataset_name) - CONTEXT.active_dataset_name = dataset_name - CONTEXT.active_dataset_id = dataset_info.get("id") - return _text(f"Dataset '{dataset_name}' has been pruned and recreated.") + alias, client, dataset_name, dataset_id = await CONTEXT.prepare_dataset_request( + tenant=tenant, + dataset=dataset, + 
email=user_email, + password=user_password, + service_url=service_url, + ) + await client.delete_dataset(dataset_id) + dataset_info = await client.ensure_dataset(dataset_name) + CONTEXT.dataset_cache[(alias, dataset_name)] = dataset_info.get("id") + return _text( + f"Dataset '{dataset_name}' (tenant {tenant or alias}) has been pruned and recreated." + ) async def _async_main(args: argparse.Namespace) -> None: @@ -222,6 +654,14 @@ async def _async_main(args: argparse.Namespace) -> None: port = args.port path = args.path + # Configure FastMCP server settings before launch + mcp.settings.host = host + mcp.settings.port = port + if transport == "http": + mcp.settings.streamable_http_path = path + elif transport == "sse": + mcp.settings.sse_path = path + logger.info( "Starting Cognee MCP server", extra={ @@ -229,17 +669,17 @@ async def _async_main(args: argparse.Namespace) -> None: "host": host, "port": port, "path": path, - "dataset": CONTEXT.dataset_name, + "tenants": CONTEXT.list_tenants(), }, ) try: if transport == "http": - await mcp.run_http(host=host, port=port, path=path) + await mcp.run_streamable_http_async() elif transport == "sse": - await mcp.run_sse(host=host, port=port, path=path) + await mcp.run_sse_async(path) else: - await mcp.run_stdio() + await mcp.run_stdio_async() finally: await CONTEXT.close() diff --git a/docker-compose.cognee.yaml b/docker-compose.cognee.yaml index fd23550..322d3b8 100644 --- a/docker-compose.cognee.yaml +++ b/docker-compose.cognee.yaml @@ -16,23 +16,23 @@ services: DATA_ROOT_DIRECTORY: ${DATA_ROOT_DIRECTORY:-s3://cognee-bucket/data} SYSTEM_ROOT_DIRECTORY: ${SYSTEM_ROOT_DIRECTORY:-s3://cognee-bucket/system} # Uppercase for standard env - AWS_ENDPOINT_URL: ${AWS_ENDPOINT_URL:-http://minio:9000} + AWS_ENDPOINT_URL: ${AWS_ENDPOINT_URL:-http://fuzzforge-minio:9000} AWS_REGION: ${AWS_REGION:-us-east-1} AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID:-fuzzforge} AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY:-fuzzforge123} # Lowercase for 
boto3/s3fs - aws_endpoint_url: ${AWS_ENDPOINT_URL:-http://minio:9000} + aws_endpoint_url: ${AWS_ENDPOINT_URL:-http://fuzzforge-minio:9000} aws_region: ${AWS_REGION:-us-east-1} aws_access_key_id: ${AWS_ACCESS_KEY_ID:-fuzzforge} aws_secret_access_key: ${AWS_SECRET_ACCESS_KEY:-fuzzforge123} # For LanceDB object_store - S3_ENDPOINT: ${AWS_ENDPOINT_URL:-http://minio:9000} + S3_ENDPOINT: ${AWS_ENDPOINT_URL:-http://fuzzforge-minio:9000} S3_REGION: ${AWS_REGION:-us-east-1} S3_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID:-fuzzforge} S3_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY:-fuzzforge123} S3_ALLOW_HTTP: "true" DB_PROVIDER: ${DB_PROVIDER:-postgres} - DB_HOST: ${DB_HOST:-postgres} + DB_HOST: ${DB_HOST:-cognee-postgres} DB_PORT: ${DB_PORT:-5432} DB_NAME: ${DB_NAME:-cognee_db} DB_USERNAME: ${DB_USERNAME:-cognee} @@ -43,13 +43,14 @@ services: - cognee-system:/app/.cognee_system - cognee-data:/app/.data_storage ports: - - "${COGNEE_SERVICE_PORT:-8000}:8000" + - "${COGNEE_SERVICE_PORT:-18000}:8000" depends_on: - - postgres + - cognee-postgres + - cognee-minio-setup networks: - - cognee + - fuzzforge_temporal_network - postgres: + cognee-postgres: image: pgvector/pgvector:pg17 container_name: cognee-postgres env_file: @@ -61,56 +62,57 @@ services: volumes: - cognee-postgres:/var/lib/postgresql/data ports: - - "5432:5432" + - "5433:5432" networks: - - cognee + - fuzzforge_temporal_network - minio: - image: minio/minio:latest - container_name: cognee-minio - command: server /data --console-address ":9001" + cognee-minio-setup: + image: minio/mc:latest + container_name: cognee-minio-setup + env_file: + - *cognee_env_file + entrypoint: > + /bin/sh -c " + echo 'Setting up Cognee bucket in MinIO...'; + sleep 5; + mc alias set fuzzforge http://fuzzforge-minio:9000 fuzzforge fuzzforge123; + mc mb fuzzforge/cognee-bucket --ignore-existing; + mc anonymous set private fuzzforge/cognee-bucket; + echo 'Cognee bucket setup complete!'; + exit 0; + " + networks: + - fuzzforge_temporal_network + + 
cognee-mcp: + build: + context: . + dockerfile: docker/Dockerfile.cognee_mcp + container_name: cognee-mcp env_file: - *cognee_env_file environment: - MINIO_ROOT_USER: ${AWS_ACCESS_KEY_ID:-fuzzforge} - MINIO_ROOT_PASSWORD: ${AWS_SECRET_ACCESS_KEY:-fuzzforge123} + COGNEE_TENANTS_FILE: ${COGNEE_TENANTS_FILE:-/config/cognee.tenants.json} + COGNEE_SERVICE_URL: ${COGNEE_SERVICE_URL:-http://cognee-service:8000} + COGNEE_INTERNAL_SERVICE_URL: ${COGNEE_INTERNAL_SERVICE_URL:-http://cognee-service:8000} + MCP_HOST: 0.0.0.0 + MCP_PORT: ${MCP_PORT:-18001} + MCP_HTTP_PATH: ${MCP_HTTP_PATH:-/mcp} volumes: - - cognee-minio:/data - ports: - - "9000:9000" - - "9001:9001" - networks: - - cognee - profiles: - - self-hosted-minio - - minio-mc: - image: minio/mc:latest - container_name: cognee-minio-mc + - ${COGNEE_TENANTS_FILE:-/dev/null}:/config/cognee.tenants.json:ro depends_on: - minio: + cognee: condition: service_started - env_file: - - *cognee_env_file - entrypoint: | - /bin/sh -c " - sleep 3 && \ - mc alias set cognee ${AWS_ENDPOINT_URL:-http://minio:9000} ${AWS_ACCESS_KEY_ID:-fuzzforge} ${AWS_SECRET_ACCESS_KEY:-fuzzforge123} && \ - mc mb -p cognee/${STORAGE_BUCKET_NAME:-cognee-bucket} || true && \ - mc anonymous set private cognee/${STORAGE_BUCKET_NAME:-cognee-bucket} && \ - tail -f /dev/null - " + ports: + - "${MCP_HTTP_PORT:-18001}:18001" networks: - - cognee - profiles: - - self-hosted-minio + - fuzzforge_temporal_network networks: - cognee: - driver: bridge + fuzzforge_temporal_network: + external: true volumes: cognee-system: cognee-data: cognee-postgres: - cognee-minio: diff --git a/docker/Dockerfile.cognee_mcp b/docker/Dockerfile.cognee_mcp new file mode 100644 index 0000000..b022a77 --- /dev/null +++ b/docker/Dockerfile.cognee_mcp @@ -0,0 +1,27 @@ +FROM python:3.11-slim + +ENV PYTHONUNBUFFERED=1 +WORKDIR /app + +# Install system deps needed by cognee + http clients +RUN apt-get update && apt-get install -y --no-install-recommends \ + build-essential \ + git \ + && rm 
#!/usr/bin/env python3
"""Test multi-tenant Cognee MCP via HTTP."""

import json

import requests

MCP_URL = "http://localhost:18001/mcp"
HEADERS = {
    "Content-Type": "application/json",
    "Accept": "application/json, text/event-stream",
}
# requests has NO default timeout: without this, a hung MCP server would
# block the test forever.
REQUEST_TIMEOUT = 30


def send_mcp_request(method, params=None, session_id=None, is_notification=False):
    """Send an MCP JSON-RPC request or notification.

    Returns a ``(result, session_id)`` tuple.  For notifications (which per
    JSON-RPC 2.0 carry no ``"id"`` and receive no response body) the result
    is always ``None``.  For regular requests, the first SSE ``data:`` line
    of the streamed response is parsed as JSON and returned.
    """
    payload = {
        "jsonrpc": "2.0",
        "method": method,
    }

    # A JSON-RPC notification is distinguished by the absence of "id".
    if not is_notification:
        payload["id"] = 1

    if params:
        payload["params"] = params

    headers = HEADERS.copy()
    if session_id:
        headers["Mcp-Session-Id"] = session_id

    # Context manager matters here: with stream=True the connection stays
    # open until the body is consumed or closed, and we return early from
    # the iter_lines() loop below — without the `with` the connection leaks.
    with requests.post(
        MCP_URL,
        json=payload,
        headers=headers,
        stream=True,
        timeout=REQUEST_TIMEOUT,
    ) as response:
        new_session_id = response.headers.get("Mcp-Session-Id")

        if is_notification:
            return None, new_session_id

        for line in response.iter_lines():
            if not line:
                continue
            line_str = line.decode("utf-8")
            if line_str.startswith("data: "):
                data = json.loads(line_str[6:])
                return data, new_session_id

        return None, new_session_id


def extract_text(result):
    """Return the first text content from an MCP result payload, or None."""
    if not result:
        return None
    payload = result.get("result") or {}
    content = payload.get("content") or []
    if not content:
        return None
    return content[0].get("text")


def test_multi_tenant_mcp():
    """Exercise the MCP server end to end: init, tool list, cognify, search, memify."""
    print("🧪 Testing Multi-Tenant Cognee MCP\n")

    # Test 1: Initialize
    print("1️⃣ Initializing MCP connection...")
    result, session_id = send_mcp_request(
        "initialize",
        {
            "protocolVersion": "2024-11-05",
            "capabilities": {},
            "clientInfo": {"name": "test", "version": "1.0"},
        },
    )
    if not result:
        print("   ❌ No response from initialize")
        return
    server_info = result.get("result", {}).get("serverInfo", {})
    print(
        f"   ✅ Server: {server_info.get('name')} v{server_info.get('version')}"
    )
    print(f"   ✅ Session ID: {session_id}")

    # Send initialized notification (required by MCP spec)
    print("\n1b️⃣ Sending initialized notification...")
    send_mcp_request(
        "notifications/initialized",
        {},
        session_id=session_id,
        is_notification=True,
    )
    print("   ✅ Initialized")

    # Test 2: List tools
    print("\n2️⃣ Listing available tools...")
    result, _ = send_mcp_request("tools/list", session_id=session_id)
    if not result or "result" not in result:
        print(f"   ❌ Error: Got response {result}")
        return
    tools = result["result"].get("tools", [])
    print(f"   ✅ Found {len(tools)} tools:")
    for tool in tools[:5]:
        print(f"      - {tool['name']}")

    cognify_tool = next((t for t in tools if t["name"] == "cognify"), None)
    if cognify_tool:
        params = cognify_tool["inputSchema"].get("properties", {})
        print("\n   🔍 Cognify tool parameters:")
        for field in [
            "tenant",
            "dataset",
            "run_in_background",
            "user_email",
            "user_password",
            "service_url",
            "custom_prompt",
        ]:
            print(f"      - {field}: {'✅' if field in params else '❌'}")
    else:
        print("\n   ⚠️ Cognify tool not advertised")

    # Test 3: Discover tenants
    print("\n3️⃣ Fetching configured tenant aliases...")
    result, _ = send_mcp_request(
        "tools/call",
        {"name": "list_tenants", "arguments": {}},
        session_id=session_id,
    )
    tenants_payload = {}
    tenants_text = extract_text(result)
    if tenants_text:
        try:
            tenants_payload = json.loads(tenants_text)
        except json.JSONDecodeError:
            print(f"   ❌ Failed to parse tenants payload: {tenants_text}")
    tenants = tenants_payload.get("tenants", []) if isinstance(tenants_payload, dict) else []
    if tenants:
        print(f"   ✅ Found tenants: {', '.join(tenants)}")
    else:
        print("   ⚠️ No tenants discovered; configure COGNEE_TENANT_* env vars")

    # Tests 4+: trigger cognify (then search/memify) for up to two tenants
    for idx, tenant in enumerate(tenants[:2], start=1):
        print(f"\n{idx + 3}️⃣ Triggering cognify for tenant '{tenant}'...")
        result, _ = send_mcp_request(
            "tools/call",
            {
                "name": "cognify",
                "arguments": {
                    "tenant": tenant,
                    "run_in_background": True,
                },
            },
            session_id=session_id,
        )
        cognify_text = extract_text(result)
        if not cognify_text:
            print("   ❌ No response from cognify tool")
            continue
        try:
            payload = json.loads(cognify_text)
        except json.JSONDecodeError:
            print(f"   ❌ Unable to parse cognify response: {cognify_text[:100]}...")
            continue
        dataset = payload.get("dataset")
        response = payload.get("response", {})
        status = response.get("status") or response.get("detail")
        print(f"   ✅ Cognify triggered (dataset={dataset}, status={status})")

        status_result, _ = send_mcp_request(
            "tools/call",
            {"name": "cognify_status", "arguments": {"tenant": tenant}},
            session_id=session_id,
        )
        status_text = extract_text(status_result)
        if status_text:
            try:
                status_payload = json.loads(status_text)
            except json.JSONDecodeError:
                print(f"   ⚠️ Status not JSON: {status_text[:100]}...")
            else:
                dataset_id = status_payload.get("dataset_id")
                print(f"   📦 Dataset ID: {dataset_id}")

        if dataset:
            print(f"   🔎 Running search for tenant '{tenant}'...")
            search_result, _ = send_mcp_request(
                "tools/call",
                {
                    "name": "search",
                    "arguments": {
                        "tenant": tenant,
                        "dataset": dataset,
                        "query": "hello",
                        "search_type": "CHUNKS",
                        "top_k": 3,
                    },
                },
                session_id=session_id,
            )
            search_text = extract_text(search_result)
            if search_text:
                print(f"   🔍 Search response preview: {search_text[:80]}...")

            print(f"   🧠 Triggering memify for tenant '{tenant}'...")
            memify_result, _ = send_mcp_request(
                "tools/call",
                {
                    "name": "memify",
                    "arguments": {
                        "tenant": tenant,
                        "dataset": dataset,
                        "run_in_background": True,
                        "data": "Automated memify check",
                    },
                },
                session_id=session_id,
            )
            memify_text = extract_text(memify_result)
            if memify_text:
                print(f"   🧾 Memify response preview: {memify_text[:80]}...")
        else:
            print("   ⚠️ Skipping search/memify because dataset name was not resolved")

    print("\n✅ Multi-tenant MCP test complete!")
    print("\n📝 Summary:")
    print("   - MCP server responding correctly")
    print("   - Tools expose tenant or credential parameters")
    if tenants:
        print(f"   - Tested tenants: {', '.join(tenants[:2])}")
    else:
        print("   - Configure at least one tenant via environment variables")
    print("   - Server URL: http://localhost:18001/mcp")


if __name__ == "__main__":
    try:
        test_multi_tenant_mcp()
    except Exception as exc:
        print(f"❌ Error: {exc}")
        import traceback

        traceback.print_exc()