Extend Cognee MCP mirror with search and memify

Songbird
2025-10-03 20:41:46 +02:00
parent c7adfabe0a
commit 1bb0ec4764
7 changed files with 937 additions and 157 deletions

View File

@@ -18,7 +18,14 @@ providing intelligent AI agent capabilities for security analysis.
__version__ = "0.6.0"
try:
from .agent import FuzzForgeAgent
except ImportError:  # pragma: no cover - optional dependency graph
FuzzForgeAgent = None  # type: ignore
try:
from .config_manager import ConfigManager
except ImportError:  # pragma: no cover - optional dependency graph
ConfigManager = None  # type: ignore
__all__ = ['FuzzForgeAgent', 'ConfigManager']
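Because both imports are now optional, the exports may be None at runtime; a minimal sketch of a defensive caller (assuming the package is importable as fuzzforge_ai, as the Dockerfile below suggests):

from fuzzforge_ai import ConfigManager, FuzzForgeAgent

if FuzzForgeAgent is None:  # the optional dependency graph was not installed
    raise RuntimeError("FuzzForgeAgent is unavailable; install the agent dependencies")
agent = FuzzForgeAgent()  # constructor arguments omitted for brevity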

View File

@@ -214,8 +214,16 @@ class CogneeAPIClient:
f"Cognee ingestion failed ({response.status_code}): {response.text.strip()}"
)
async def cognify(
self,
dataset_name: str,
*,
run_in_background: bool = False,
custom_prompt: Optional[str] = None,
) -> Dict[str, Any]:
payload: Dict[str, Any] = {"datasets": [dataset_name], "run_in_background": run_in_background}
if custom_prompt is not None:
payload["custom_prompt"] = custom_prompt
try:
response = await self._client.post("/api/v1/cognify", json=payload)
except httpx.RequestError as exc:
@@ -258,6 +266,10 @@ class CogneeAPIClient:
datasets: Optional[Iterable[str]] = None,
search_type: str = "INSIGHTS",
top_k: Optional[int] = None,
only_context: Optional[bool] = None,
node_name: Optional[Iterable[str]] = None,
system_prompt: Optional[str] = None,
use_combined_context: Optional[bool] = None,
) -> Any:
payload: Dict[str, Any] = {
"query": query,
@@ -267,6 +279,14 @@ class CogneeAPIClient:
payload["datasets"] = list(datasets)
if top_k is not None:
payload["top_k"] = top_k
if only_context is not None:
payload["only_context"] = only_context
if node_name is not None:
payload["node_name"] = list(node_name)
if system_prompt is not None:
payload["system_prompt"] = system_prompt
if use_combined_context is not None:
payload["use_combined_context"] = use_combined_context
try:
response = await self._client.post("/api/v1/search", json=payload)
@@ -274,12 +294,49 @@ class CogneeAPIClient:
raise CogneeAPIError(f"Cognee search request failed: {exc}") from exc
if response.status_code in {200, 201}:
# decode the JSON body so callers always receive native Python objects
return response.json()
raise CogneeAPIError(
f"Cognee search failed ({response.status_code}): {response.text.strip()}"
)
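For orientation, a sketch of a search call that exercises the new keyword arguments; an authenticated CogneeAPIClient is assumed and the argument values are illustrative:

results = await client.search(
    "where is request authentication handled?",
    datasets=["project_codebase"],
    search_type="GRAPH_COMPLETION",
    top_k=10,
    only_context=True,              # return supporting context only
    node_name=["AuthMiddleware"],   # hypothetical graph node filter
    system_prompt="Answer as a security analyst.",
)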
async def memify(
self,
*,
dataset_name: Optional[str] = None,
dataset_id: Optional[str] = None,
extraction_tasks: Optional[Iterable[str]] = None,
enrichment_tasks: Optional[Iterable[str]] = None,
data: Optional[str] = None,
node_name: Optional[Iterable[str]] = None,
run_in_background: bool = False,
) -> Dict[str, Any]:
payload: Dict[str, Any] = {
"datasetName": dataset_name,
"datasetId": dataset_id,
"runInBackground": run_in_background,
}
if extraction_tasks is not None:
payload["extractionTasks"] = list(extraction_tasks)
if enrichment_tasks is not None:
payload["enrichmentTasks"] = list(enrichment_tasks)
if data is not None:
payload["data"] = data
if node_name is not None:
payload["nodeName"] = list(node_name)
payload = {key: value for key, value in payload.items() if value is not None}
try:
response = await self._client.post("/api/v1/memify", json=payload)
except httpx.RequestError as exc:
raise CogneeAPIError(f"Cognee memify request failed: {exc}") from exc
if response.status_code in {200, 202}:
return response.json()
raise CogneeAPIError(
f"Cognee memify failed ({response.status_code}): {response.text.strip()}"
)
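A sketch of a memify invocation under the same assumptions; the task names are placeholders, not a documented task catalogue:

response = await client.memify(
    dataset_name="project_codebase",
    extraction_tasks=["extract_coding_rules"],   # placeholder task name
    enrichment_tasks=["link_related_findings"],  # placeholder task name
    run_in_background=True,
)
print(response)  # a run descriptor; the exact shape depends on the service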
async def status(self) -> Dict[str, Any]:
"""Return health information along with dataset list."""
info: Dict[str, Any] = {}

View File

@@ -13,18 +13,23 @@
try:
from fuzzforge_cli.config import ProjectConfigManager as _ProjectConfigManager
except ImportError:  # pragma: no cover - used when CLI not available
_MISSING_MSG = (
"ProjectConfigManager is unavailable. Install the FuzzForge CLI package "
"or supply a compatible configuration object."
)
def _raise_missing() -> None:
raise ImportError(_MISSING_MSG)
class _ProjectConfigManager:  # type: ignore[no-redef]
"""Fallback implementation that raises a helpful error."""
def __init__(self, *args, **kwargs):
_raise_missing()
def __getattr__(name): # pragma: no cover - defensive
_raise_missing()
ProjectConfigManager = _ProjectConfigManager

View File

@@ -8,7 +8,8 @@ import json
import logging
import os
from pathlib import Path
from typing import List, Optional
from typing import Dict, Iterable, List, Optional
from urllib.parse import urlparse
from dotenv import load_dotenv
from mcp.server import FastMCP
@@ -30,79 +31,293 @@ def _load_project_env() -> None:
load_dotenv(env_path, override=False)
class TenantCredentials:
def __init__(self, alias: str, service_url: str, email: str, password: str, dataset: Optional[str]):
self.alias = alias.lower()
self.service_url = service_url.rstrip("/")
self.email = email
self.password = password
self.dataset = dataset
class CogneeMCPContext:
"""Holds shared state for MCP tools and manages the Cognee API client."""
"""Manages Cognee API clients for multiple tenants."""
T_PREFIX = "COGNEE_TENANT_"
SUPPORTED_SEARCH_TYPES = {
"SUMMARIES",
"INSIGHTS",
"CHUNKS",
"RAG_COMPLETION",
"GRAPH_COMPLETION",
"GRAPH_SUMMARY_COMPLETION",
"CODE",
"CYPHER",
"NATURAL_LANGUAGE",
"GRAPH_COMPLETION_COT",
"GRAPH_COMPLETION_CONTEXT_EXTENSION",
"FEELING_LUCKY",
"FEEDBACK",
"TEMPORAL",
"CODING_RULES",
}
TENANT_CONFIG_DEFAULT = Path(".fuzzforge") / "cognee.tenants.json"
def __init__(self) -> None:
_load_project_env()
self.verify_ssl = os.getenv("COGNEE_VERIFY_SSL", "true").lower() != "false"
self.tenants = self._load_tenants()
if not self.tenants:
logger.info(
"No preconfigured tenants detected; credentials must be provided per tool call."
)
self.clients: dict[str, CogneeAPIClient] = {}
self.dataset_cache: dict[tuple[str, str], str] = {}
def _load_tenants(self) -> dict[str, TenantCredentials]:
tenants = self._load_tenants_from_env()
file_tenants = self._load_tenants_from_file()
for alias, creds in file_tenants.items():
if alias in tenants:
existing = tenants[alias]
# Allow config file to augment missing fields from env-derived entry
if not existing.dataset and creds.dataset:
existing.dataset = creds.dataset
if creds.service_url and creds.service_url != existing.service_url:
existing.service_url = creds.service_url
continue
tenants[alias] = creds
return tenants
def _load_tenants_from_env(self) -> dict[str, TenantCredentials]:
tenants: dict[str, TenantCredentials] = {}
service_url_default = os.getenv("COGNEE_SERVICE_URL", "").rstrip("/")
for key, value in os.environ.items():
if not key.startswith(self.T_PREFIX) or not key.endswith("_EMAIL"):
continue
alias = key[len(self.T_PREFIX):-len("_EMAIL")].lower()
email = value
password = os.getenv(f"{self.T_PREFIX}{alias.upper()}_PASSWORD")
dataset = os.getenv(f"{self.T_PREFIX}{alias.upper()}_DATASET")
service_url = (
os.getenv(f"{self.T_PREFIX}{alias.upper()}_SERVICE_URL")
or service_url_default
).rstrip("/")
if not (service_url and email and password):
logger.warning("Skipping tenant '%s' due to incomplete credentials", alias)
continue
tenants[alias] = TenantCredentials(alias, service_url, email, password, dataset)
# fallback to legacy single-tenant vars
if not tenants and service_url_default:
email = os.getenv("COGNEE_SERVICE_USER_EMAIL") or os.getenv("DEFAULT_USER_EMAIL")
password = os.getenv("COGNEE_SERVICE_USER_PASSWORD") or os.getenv("DEFAULT_USER_PASSWORD")
dataset = os.getenv("COGNEE_DATASET_NAME")
if email and password:
tenants["default"] = TenantCredentials("default", service_url_default, email, password, dataset)
return tenants
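To make the expected variable shape concrete, here is a sketch of the environment for a single tenant alias "acme"; the names follow the COGNEE_TENANT_<ALIAS>_* pattern parsed above and the values are illustrative:

import os

os.environ["COGNEE_TENANT_ACME_EMAIL"] = "acme@example.com"
os.environ["COGNEE_TENANT_ACME_PASSWORD"] = "s3cret"
os.environ["COGNEE_TENANT_ACME_DATASET"] = "acme_codebase"  # optional per-tenant default
os.environ["COGNEE_TENANT_ACME_SERVICE_URL"] = "http://cognee-service:8000"  # optional; falls back to COGNEE_SERVICE_URL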
def _load_tenants_from_file(self) -> dict[str, TenantCredentials]:
tenants: Dict[str, TenantCredentials] = {}
config_path = os.getenv("COGNEE_TENANTS_FILE")
if config_path:
candidate = Path(config_path).expanduser()
else:
candidate = (Path.cwd() / self.TENANT_CONFIG_DEFAULT).resolve()
if not candidate.exists():
return tenants
try:
raw = candidate.read_text(encoding="utf-8")
data = json.loads(raw)
except (OSError, json.JSONDecodeError) as exc:
logger.error("Failed to load tenant config '%s': %s", candidate, exc)
return tenants
entries = []
if isinstance(data, list):
entries = data
elif isinstance(data, dict):
entries = data.get("tenants", [])
else:
logger.error("Tenant config '%s' must be a list or dict with 'tenants'", candidate)
return tenants
for item in entries:
if not isinstance(item, dict):
continue
alias = item.get("alias")
email = item.get("email")
password = item.get("password")
dataset = item.get("dataset")
service_url = (item.get("service_url") or os.getenv("COGNEE_SERVICE_URL", "")).rstrip("/")
if not (alias and email and password and service_url):
logger.warning("Skipping tenant entry with missing fields: %s", item)
continue
tenants[alias.lower()] = TenantCredentials(alias, service_url, email, password, dataset)
if tenants:
logger.info("Loaded %d tenant(s) from %s", len(tenants), candidate)
return tenants
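The loader accepts either a bare list or an object with a "tenants" key; a sketch of a matching cognee.tenants.json, generated here from Python with illustrative values:

import json
from pathlib import Path

config = {
    "tenants": [
        {
            "alias": "acme",
            "email": "acme@example.com",
            "password": "s3cret",
            "dataset": "acme_codebase",                   # optional
            "service_url": "http://cognee-service:8000",  # optional
        }
    ]
}
Path(".fuzzforge").mkdir(exist_ok=True)
Path(".fuzzforge/cognee.tenants.json").write_text(json.dumps(config, indent=2), encoding="utf-8")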
async def get_client(self, alias: str) -> CogneeAPIClient:
alias = alias.lower()
if alias not in self.tenants:
raise ValueError(f"Unknown tenant alias '{alias}'. Available: {', '.join(self.tenants)}")
if alias not in self.clients:
tenant = self.tenants[alias]
client = CogneeAPIClient(tenant.service_url, verify_ssl=self.verify_ssl)
await client.ensure_user(tenant.email, tenant.password, tenant_name=self._tenant_name(alias))
self.clients[alias] = client
return self.clients[alias]
async def close(self) -> None:
await asyncio.gather(*(client.close() for client in self.clients.values()))
self.clients.clear()
def _tenant_name(self, alias: str) -> str:
alias = alias.lower()
env_alias = alias.upper().replace(":", "_").replace("@", "_").replace("/", "_")
fallback = alias.replace(":", "-").replace("@", "-").replace("/", "-")
return os.getenv(f"COGNEE_TENANT_{env_alias}_NAME") or fallback
def _service_url(self, override: Optional[str]) -> str:
url = (override or os.getenv("COGNEE_SERVICE_URL", "")).strip()
url = url.rstrip("/")
if url:
parsed = urlparse(url)
host = parsed.hostname
if host in {"localhost", "127.0.0.1"}:
internal = os.getenv("COGNEE_INTERNAL_SERVICE_URL", "").rstrip("/")
if internal:
url = internal
if not url:
raise ValueError(
"Service URL not provided. Set COGNEE_SERVICE_URL or pass service_url argument."
)
return url
def _default_dataset_name(self) -> str:
project_name = os.getenv("FUZZFORGE_PROJECT_NAME") or "project"
return f"{project_name}_codebase"
def _alias_for_credentials(self, email: str, service_url: str) -> str:
safe_url = service_url.lower().replace("://", "_").replace("/", "_")
return f"user:{email.lower()}@{safe_url}"
def _ensure_tenant_entry(
self,
*,
tenant: Optional[str],
email: Optional[str],
password: Optional[str],
service_url: Optional[str],
dataset: Optional[str],
) -> tuple[str, TenantCredentials]:
if tenant:
alias = tenant.lower()
if alias not in self.tenants:
raise ValueError(
f"Unknown tenant alias '{alias}'. Available: {', '.join(self.tenants)}"
)
creds = self.tenants[alias]
if service_url and service_url.rstrip("/") != creds.service_url:
raise ValueError(
"service_url override does not match configured tenant service URL"
)
if dataset and not creds.dataset:
creds.dataset = dataset
return alias, creds
if not (email and password):
raise ValueError(
"Provide either a tenant alias or user_email and user_password credentials."
)
base_url = self._service_url(service_url)
alias = self._alias_for_credentials(email, base_url)
creds = self.tenants.get(alias)
if not creds:
creds = TenantCredentials(alias, base_url, email, password, dataset)
self.tenants[alias] = creds
else:
if dataset and not creds.dataset:
creds.dataset = dataset
# Update password if changed to avoid stale credential reuse
if password != creds.password:
creds.password = password
return alias, creds
async def prepare_client(
self,
*,
tenant: Optional[str],
email: Optional[str],
password: Optional[str],
service_url: Optional[str],
) -> tuple[str, CogneeAPIClient]:
alias, _ = self._ensure_tenant_entry(
tenant=tenant,
email=email,
password=password,
service_url=service_url,
dataset=None,
)
client = await self.get_client(alias)
return alias, client
async def prepare_dataset_request(
self,
*,
tenant: Optional[str],
dataset: Optional[str],
email: Optional[str],
password: Optional[str],
service_url: Optional[str],
) -> tuple[str, CogneeAPIClient, str, str]:
alias, _ = self._ensure_tenant_entry(
tenant=tenant,
email=email,
password=password,
service_url=service_url,
dataset=dataset,
)
client = await self.get_client(alias)
dataset_name, dataset_id = await self.resolve_dataset(alias, dataset)
return alias, client, dataset_name, dataset_id
async def start(self) -> None:
for alias, creds in self.tenants.items():
if creds.dataset:
try:
await self.resolve_dataset(alias, creds.dataset)
except Exception as exc:
logger.warning("Failed to preload dataset for tenant '%s': %s", alias, exc)
logger.info("Tenants available: %s", ", ".join(sorted(self.tenants)))
async def resolve_dataset(self, alias: str, dataset: Optional[str]) -> tuple[str, str]:
alias = alias.lower()
if alias not in self.tenants:
raise ValueError(f"Unknown tenant alias '{alias}'. Available: {', '.join(self.tenants)}")
tenant = self.tenants[alias]
dataset_name = dataset or tenant.dataset
if not dataset_name:
env_alias = alias.upper().replace(":", "_").replace("@", "_").replace("/", "_")
raise ValueError(
"Dataset not specified. Provide the dataset argument or set "
f"COGNEE_TENANT_{env_alias}_DATASET."
)
cache_key = (alias, dataset_name)
if cache_key in self.dataset_cache:
return dataset_name, self.dataset_cache[cache_key]
client = await self.get_client(alias)
info = await client.ensure_dataset(dataset_name)
dataset_id = info.get("id")
if not dataset_id:
raise RuntimeError(f"Failed to resolve dataset id for '{dataset_name}'")
self.dataset_cache[cache_key] = dataset_id
return dataset_name, dataset_id
def list_tenants(self) -> List[str]:
return sorted(self.tenants.keys())
CONTEXT = CogneeMCPContext()
@@ -113,29 +328,44 @@ def _text(message: str) -> List[types.TextContent]:
@mcp.tool()
async def list_tenants() -> List[types.TextContent]:
"""List known tenant aliases (configured or dynamically added)."""
aliases = CONTEXT.list_tenants()
return _text(json.dumps({"tenants": aliases}, indent=2))
@mcp.tool()
async def ingest_file(
path: str,
tenant: Optional[str] = None,
dataset: Optional[str] = None,
user_email: Optional[str] = None,
user_password: Optional[str] = None,
service_url: Optional[str] = None,
) -> List[types.TextContent]:
"""Upload a local file into the Cognee dataset for a tenant or explicit user."""
file_path = Path(path).expanduser().resolve()
if not file_path.is_file():
raise ValueError(f"File not found: {file_path}")
alias, client, dataset_name, _ = await CONTEXT.prepare_dataset_request(
tenant=tenant,
dataset=dataset,
email=user_email,
password=user_password,
service_url=service_url,
)
result = await client.ingest_files(dataset_name, [file_path])
success = result.get("success", 0)
failed = result.get("failed", 0)
errors = result.get("errors", [])
message = [
f"Uploaded {success} file(s) to dataset '{dataset_name}'.",
f"Tenant: {tenant or alias}",
f"Dataset: {dataset_name}",
f"Uploaded: {success} file(s)",
f"Failed: {failed}",
]
if errors:
@@ -145,70 +375,272 @@ async def ingest_file(path: str, dataset: Optional[str] = None) -> List[types.Te
@mcp.tool()
async def cognify(
tenant: Optional[str] = None,
dataset: Optional[str] = None,
run_in_background: bool = False,
user_email: Optional[str] = None,
user_password: Optional[str] = None,
service_url: Optional[str] = None,
custom_prompt: Optional[str] = None,
) -> List[types.TextContent]:
"""Trigger the Cognify pipeline for the dataset."""
alias, client, dataset_name, _ = await CONTEXT.prepare_dataset_request(
tenant=tenant,
dataset=dataset,
email=user_email,
password=user_password,
service_url=service_url,
)
response = await client.cognify(
dataset_name,
run_in_background=run_in_background,
custom_prompt=custom_prompt,
)
payload = {
"tenant": tenant or alias,
"dataset": dataset_name,
"response": response,
}
return _text(json.dumps(payload, indent=2))
@mcp.tool()
async def cognify_status(
tenant: Optional[str] = None,
dataset: Optional[str] = None,
user_email: Optional[str] = None,
user_password: Optional[str] = None,
service_url: Optional[str] = None,
) -> List[types.TextContent]:
"""Return the status of recent Cognify runs."""
alias, client, _, dataset_id = await CONTEXT.prepare_dataset_request(
tenant=tenant,
dataset=dataset,
email=user_email,
password=user_password,
service_url=service_url,
)
status = await client.status()
payload = {
"tenant": tenant or alias,
"dataset_id": dataset_id,
"status": status,
}
return _text(json.dumps(payload, indent=2))
@mcp.tool()
async def search_chunks(
query: str,
tenant: Optional[str] = None,
top_k: int = 5,
dataset: Optional[str] = None,
user_email: Optional[str] = None,
user_password: Optional[str] = None,
service_url: Optional[str] = None,
) -> List[types.TextContent]:
"""Search dataset for relevant document chunks."""
alias, client, dataset_name, _ = await CONTEXT.prepare_dataset_request(
tenant=tenant,
dataset=dataset,
email=user_email,
password=user_password,
service_url=service_url,
)
result = await client.search(
query,
datasets=[dataset_name],
search_type="CHUNKS",
top_k=top_k,
)
payload = {
"tenant": tenant or alias,
"dataset": dataset_name,
"results": result,
}
return _text(json.dumps(payload, indent=2))
@mcp.tool()
async def search_insights(
query: str,
tenant: Optional[str] = None,
top_k: int = 5,
dataset: Optional[str] = None,
user_email: Optional[str] = None,
user_password: Optional[str] = None,
service_url: Optional[str] = None,
) -> List[types.TextContent]:
"""Search dataset for graph insights."""
alias, client, dataset_name, _ = await CONTEXT.prepare_dataset_request(
tenant=tenant,
dataset=dataset,
email=user_email,
password=user_password,
service_url=service_url,
)
result = await client.search(
query,
datasets=[dataset_name],
search_type="INSIGHTS",
top_k=top_k,
)
payload = {
"tenant": tenant or alias,
"dataset": dataset_name,
"results": result,
}
return _text(json.dumps(payload, indent=2))
@mcp.tool()
async def search(
query: str,
search_type: str = "CHUNKS",
tenant: Optional[str] = None,
dataset: Optional[str] = None,
top_k: Optional[int] = None,
only_context: Optional[bool] = None,
node_name: Optional[List[str]] = None,
system_prompt: Optional[str] = None,
use_combined_context: Optional[bool] = None,
user_email: Optional[str] = None,
user_password: Optional[str] = None,
service_url: Optional[str] = None,
) -> List[types.TextContent]:
"""Execute a Cognee semantic search with custom modes."""
alias, client, dataset_name, _ = await CONTEXT.prepare_dataset_request(
tenant=tenant,
dataset=dataset,
email=user_email,
password=user_password,
service_url=service_url,
)
search_type_upper = search_type.upper()
if search_type_upper not in CONTEXT.SUPPORTED_SEARCH_TYPES:
raise ValueError(
"Unsupported search_type. Choose one of: "
+ ", ".join(sorted(CONTEXT.SUPPORTED_SEARCH_TYPES))
)
node_iter: Optional[Iterable[str]] = node_name if node_name is not None else None
result = await client.search(
query,
datasets=[dataset_name],
search_type=search_type_upper,
top_k=top_k,
only_context=only_context,
node_name=node_iter,
system_prompt=system_prompt,
use_combined_context=use_combined_context,
)
payload = {
"tenant": tenant or alias,
"dataset": dataset_name,
"search_type": search_type_upper,
"results": result,
}
return _text(json.dumps(payload, indent=2))
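Over the wire, an MCP client reaches this tool through a tools/call request; a sketch of the arguments payload (mirroring the test script at the end of this commit, values illustrative):

search_call = {
    "name": "search",
    "arguments": {
        "tenant": "acme",            # or pass user_email/user_password + service_url
        "dataset": "acme_codebase",
        "query": "hello",
        "search_type": "CHUNKS",
        "top_k": 3,
    },
}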
@mcp.tool()
async def memify(
tenant: Optional[str] = None,
dataset: Optional[str] = None,
extraction_tasks: Optional[List[str]] = None,
enrichment_tasks: Optional[List[str]] = None,
data: Optional[str] = None,
node_name: Optional[List[str]] = None,
run_in_background: bool = False,
user_email: Optional[str] = None,
user_password: Optional[str] = None,
service_url: Optional[str] = None,
) -> List[types.TextContent]:
"""Run the Memify enrichment pipeline for a dataset."""
alias, client, dataset_name, dataset_id = await CONTEXT.prepare_dataset_request(
tenant=tenant,
dataset=dataset,
email=user_email,
password=user_password,
service_url=service_url,
)
response = await client.memify(
dataset_name=dataset_name,
dataset_id=dataset_id,
extraction_tasks=extraction_tasks,
enrichment_tasks=enrichment_tasks,
data=data,
node_name=node_name,
run_in_background=run_in_background,
)
payload = {
"tenant": tenant or alias,
"dataset": dataset_name,
"response": response,
}
return _text(json.dumps(payload, indent=2))
@mcp.tool()
async def list_datasets(
tenant: Optional[str] = None,
user_email: Optional[str] = None,
user_password: Optional[str] = None,
service_url: Optional[str] = None,
) -> List[types.TextContent]:
"""List datasets accessible to configured tenants or provided user credentials."""
result: dict[str, List[dict[str, object]]] = {}
if tenant or user_email:
alias, client = await CONTEXT.prepare_client(
tenant=tenant,
email=user_email,
password=user_password,
service_url=service_url,
)
result[tenant or alias] = await client.list_datasets()
else:
for alias in CONTEXT.list_tenants():
client = await CONTEXT.get_client(alias)
datasets = await client.list_datasets()
result[alias] = datasets
return _text(json.dumps(result, indent=2))
@mcp.tool()
async def prune_dataset(
tenant: Optional[str] = None,
dataset: Optional[str] = None,
user_email: Optional[str] = None,
user_password: Optional[str] = None,
service_url: Optional[str] = None,
) -> List[types.TextContent]:
"""Delete the dataset and recreate it empty."""
alias, client, dataset_name, dataset_id = await CONTEXT.prepare_dataset_request(
tenant=tenant,
dataset=dataset,
email=user_email,
password=user_password,
service_url=service_url,
)
await client.delete_dataset(dataset_id)
dataset_info = await client.ensure_dataset(dataset_name)
CONTEXT.dataset_cache[(alias, dataset_name)] = dataset_info.get("id")
return _text(
f"Dataset '{dataset_name}' (tenant {tenant or alias}) has been pruned and recreated."
)
async def _async_main(args: argparse.Namespace) -> None:
@@ -222,6 +654,14 @@ async def _async_main(args: argparse.Namespace) -> None:
port = args.port
path = args.path
# Configure FastMCP server settings before launch
mcp.settings.host = host
mcp.settings.port = port
if transport == "http":
mcp.settings.streamable_http_path = path
elif transport == "sse":
mcp.settings.sse_path = path
logger.info(
"Starting Cognee MCP server",
extra={
@@ -229,17 +669,17 @@ async def _async_main(args: argparse.Namespace) -> None:
"host": host,
"port": port,
"path": path,
"dataset": CONTEXT.dataset_name,
"tenants": CONTEXT.list_tenants(),
},
)
try:
if transport == "http":
await mcp.run_streamable_http_async()
elif transport == "sse":
await mcp.run_sse_async(path)
else:
await mcp.run_stdio_async()
finally:
await CONTEXT.close()

View File

@@ -16,23 +16,23 @@ services:
DATA_ROOT_DIRECTORY: ${DATA_ROOT_DIRECTORY:-s3://cognee-bucket/data}
SYSTEM_ROOT_DIRECTORY: ${SYSTEM_ROOT_DIRECTORY:-s3://cognee-bucket/system}
# Uppercase for standard env
AWS_ENDPOINT_URL: ${AWS_ENDPOINT_URL:-http://fuzzforge-minio:9000}
AWS_REGION: ${AWS_REGION:-us-east-1}
AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID:-fuzzforge}
AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY:-fuzzforge123}
# Lowercase for boto3/s3fs
aws_endpoint_url: ${AWS_ENDPOINT_URL:-http://fuzzforge-minio:9000}
aws_region: ${AWS_REGION:-us-east-1}
aws_access_key_id: ${AWS_ACCESS_KEY_ID:-fuzzforge}
aws_secret_access_key: ${AWS_SECRET_ACCESS_KEY:-fuzzforge123}
# For LanceDB object_store
S3_ENDPOINT: ${AWS_ENDPOINT_URL:-http://fuzzforge-minio:9000}
S3_REGION: ${AWS_REGION:-us-east-1}
S3_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID:-fuzzforge}
S3_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY:-fuzzforge123}
S3_ALLOW_HTTP: "true"
DB_PROVIDER: ${DB_PROVIDER:-postgres}
DB_HOST: ${DB_HOST:-cognee-postgres}
DB_PORT: ${DB_PORT:-5432}
DB_NAME: ${DB_NAME:-cognee_db}
DB_USERNAME: ${DB_USERNAME:-cognee}
@@ -43,13 +43,14 @@ services:
- cognee-system:/app/.cognee_system
- cognee-data:/app/.data_storage
ports:
- "${COGNEE_SERVICE_PORT:-8000}:8000"
- "${COGNEE_SERVICE_PORT:-18000}:8000"
depends_on:
- cognee-postgres
- cognee-minio-setup
networks:
- cognee
- fuzzforge_temporal_network
cognee-postgres:
image: pgvector/pgvector:pg17
container_name: cognee-postgres
env_file:
@@ -61,56 +62,57 @@ services:
volumes:
- cognee-postgres:/var/lib/postgresql/data
ports:
- "5432:5432"
- "5433:5432"
networks:
- cognee
- fuzzforge_temporal_network
cognee-minio-setup:
image: minio/mc:latest
container_name: cognee-minio-setup
env_file:
- *cognee_env_file
entrypoint: >
/bin/sh -c "
echo 'Setting up Cognee bucket in MinIO...';
sleep 5;
mc alias set fuzzforge http://fuzzforge-minio:9000 fuzzforge fuzzforge123;
mc mb fuzzforge/cognee-bucket --ignore-existing;
mc anonymous set private fuzzforge/cognee-bucket;
echo 'Cognee bucket setup complete!';
exit 0;
"
networks:
- fuzzforge_temporal_network
cognee-mcp:
build:
context: .
dockerfile: docker/Dockerfile.cognee_mcp
container_name: cognee-mcp
env_file:
- *cognee_env_file
environment:
COGNEE_TENANTS_FILE: ${COGNEE_TENANTS_FILE:-/config/cognee.tenants.json}
COGNEE_SERVICE_URL: ${COGNEE_SERVICE_URL:-http://cognee-service:8000}
COGNEE_INTERNAL_SERVICE_URL: ${COGNEE_INTERNAL_SERVICE_URL:-http://cognee-service:8000}
MCP_HOST: 0.0.0.0
MCP_PORT: ${MCP_PORT:-18001}
MCP_HTTP_PATH: ${MCP_HTTP_PATH:-/mcp}
volumes:
- ${COGNEE_TENANTS_FILE:-/dev/null}:/config/cognee.tenants.json:ro
depends_on:
cognee:
condition: service_started
ports:
- "${MCP_HTTP_PORT:-18001}:18001"
networks:
- fuzzforge_temporal_network
networks:
cognee:
driver: bridge
fuzzforge_temporal_network:
external: true
volumes:
cognee-system:
cognee-data:
cognee-postgres:
cognee-minio:

View File

@@ -0,0 +1,27 @@
FROM python:3.11-slim
ENV PYTHONUNBUFFERED=1
WORKDIR /app
# Install system deps needed by cognee + http clients
RUN apt-get update && apt-get install -y --no-install-recommends \
build-essential \
git \
&& rm -rf /var/lib/apt/lists/*
# Copy source (AI module + Cognee client lib)
COPY ai /app/ai
COPY cognee/cognee /app/cognee
# Adjust local dependency path for container build
RUN sed -i 's|cognee @ file://../cognee/cognee|cognee @ file:///app/cognee|g' /app/ai/pyproject.toml
# Install dependencies
RUN pip install --no-cache-dir /app/cognee && \
pip install --no-cache-dir /app/ai
# Expose MCP HTTP port
EXPOSE 18001
# Default command - can be overridden by docker-compose
CMD ["python", "-m", "fuzzforge_ai.mcp.cognee_mcp_server", "--transport", "http", "--host", "0.0.0.0", "--port", "18001"]

test_cognee_mcp.py (new file, 242 lines)
View File

@@ -0,0 +1,242 @@
#!/usr/bin/env python3
"""Test multi-tenant Cognee MCP via HTTP"""
import json
import requests
MCP_URL = "http://localhost:18001/mcp"
HEADERS = {
"Content-Type": "application/json",
"Accept": "application/json, text/event-stream",
}
def send_mcp_request(method, params=None, session_id=None, is_notification=False):
"""Send MCP JSON-RPC request or notification."""
payload = {
"jsonrpc": "2.0",
"method": method,
}
if not is_notification:
payload["id"] = 1
if params:
payload["params"] = params
headers = HEADERS.copy()
if session_id:
headers["Mcp-Session-Id"] = session_id
response = requests.post(MCP_URL, json=payload, headers=headers, stream=True)
new_session_id = response.headers.get("Mcp-Session-Id")
if is_notification:
return None, new_session_id
for line in response.iter_lines():
if not line:
continue
line_str = line.decode("utf-8")
if line_str.startswith("data: "):
data = json.loads(line_str[6:])
return data, new_session_id
return None, new_session_id
def extract_text(result):
"""Return the first text content from an MCP result payload."""
if not result:
return None
payload = result.get("result") or {}
content = payload.get("content") or []
if not content:
return None
return content[0].get("text")
def test_multi_tenant_mcp():
print("🧪 Testing Multi-Tenant Cognee MCP\n")
# Test 1: Initialize
print("1⃣ Initializing MCP connection...")
result, session_id = send_mcp_request(
"initialize",
{
"protocolVersion": "2024-11-05",
"capabilities": {},
"clientInfo": {"name": "test", "version": "1.0"},
},
)
if not result:
print(" ❌ No response from initialize")
return
server_info = result.get("result", {}).get("serverInfo", {})
print(
f" ✅ Server: {server_info.get('name')} v{server_info.get('version')}"
)
print(f" ✅ Session ID: {session_id}")
# Send initialized notification (required by MCP spec)
print("\n1b⃣ Sending initialized notification...")
send_mcp_request(
"notifications/initialized",
{},
session_id=session_id,
is_notification=True,
)
print(" ✅ Initialized")
# Test 2: List tools
print("\n2⃣ Listing available tools...")
result, _ = send_mcp_request("tools/list", session_id=session_id)
if not result or "result" not in result:
print(f" ❌ Error: Got response {result}")
return
tools = result["result"].get("tools", [])
print(f" ✅ Found {len(tools)} tools:")
for tool in tools[:5]:
print(f" - {tool['name']}")
cognify_tool = next((t for t in tools if t["name"] == "cognify"), None)
if cognify_tool:
params = cognify_tool["inputSchema"].get("properties", {})
print("\n 🔍 Cognify tool parameters:")
for field in [
"tenant",
"dataset",
"run_in_background",
"user_email",
"user_password",
"service_url",
"custom_prompt",
]:
print(f" - {field}: {'' if field in params else ''}")
else:
print("\n ⚠️ Cognify tool not advertised")
# Test 3: Discover tenants
print("\n3⃣ Fetching configured tenant aliases...")
result, _ = send_mcp_request(
"tools/call",
{"name": "list_tenants", "arguments": {}},
session_id=session_id,
)
tenants_payload = {}
tenants_text = extract_text(result)
if tenants_text:
try:
tenants_payload = json.loads(tenants_text)
except json.JSONDecodeError:
print(f" ❌ Failed to parse tenants payload: {tenants_text}")
tenants = tenants_payload.get("tenants", []) if isinstance(tenants_payload, dict) else []
if tenants:
print(f" ✅ Found tenants: {', '.join(tenants)}")
else:
print(" ⚠️ No tenants discovered; configure COGNEE_TENANT_* env vars")
# Test 4: Trigger cognify for up to two tenants
for idx, tenant in enumerate(tenants[:2], start=1):
print(f"\n{idx + 3}️⃣ Triggering cognify for tenant '{tenant}'...")
result, _ = send_mcp_request(
"tools/call",
{
"name": "cognify",
"arguments": {
"tenant": tenant,
"run_in_background": True,
},
},
session_id=session_id,
)
cognify_text = extract_text(result)
if not cognify_text:
print(" ❌ No response from cognify tool")
continue
try:
payload = json.loads(cognify_text)
except json.JSONDecodeError:
print(f" ❌ Unable to parse cognify response: {cognify_text[:100]}...")
continue
dataset = payload.get("dataset")
response = payload.get("response", {})
status = response.get("status") or response.get("detail")
print(f" ✅ Cognify triggered (dataset={dataset}, status={status})")
status_result, _ = send_mcp_request(
"tools/call",
{"name": "cognify_status", "arguments": {"tenant": tenant}},
session_id=session_id,
)
status_text = extract_text(status_result)
if status_text:
try:
status_payload = json.loads(status_text)
except json.JSONDecodeError:
print(f" ⚠️ Status not JSON: {status_text[:100]}...")
else:
dataset_id = status_payload.get("dataset_id")
print(f" 📦 Dataset ID: {dataset_id}")
if dataset:
print(f" 🔎 Running search for tenant '{tenant}'...")
search_result, _ = send_mcp_request(
"tools/call",
{
"name": "search",
"arguments": {
"tenant": tenant,
"dataset": dataset,
"query": "hello",
"search_type": "CHUNKS",
"top_k": 3,
},
},
session_id=session_id,
)
search_text = extract_text(search_result)
if search_text:
print(f" 🔍 Search response preview: {search_text[:80]}...")
print(f" 🧠 Triggering memify for tenant '{tenant}'...")
memify_result, _ = send_mcp_request(
"tools/call",
{
"name": "memify",
"arguments": {
"tenant": tenant,
"dataset": dataset,
"run_in_background": True,
"data": "Automated memify check",
},
},
session_id=session_id,
)
memify_text = extract_text(memify_result)
if memify_text:
print(f" 🧾 Memify response preview: {memify_text[:80]}...")
else:
print(" ⚠️ Skipping search/memify because dataset name was not resolved")
print("\n✅ Multi-tenant MCP test complete!")
print("\n📝 Summary:")
print(" - MCP server responding correctly")
print(" - Tools expose tenant or credential parameters")
if tenants:
print(f" - Tested tenants: {', '.join(tenants[:2])}")
else:
print(" - Configure at least one tenant via environment variables")
print(" - Server URL: http://localhost:18001/mcp")
if __name__ == "__main__":
try:
test_multi_tenant_mcp()
except Exception as exc:
print(f"❌ Error: {exc}")
import traceback
traceback.print_exc()