diff --git a/.gitignore b/.gitignore index e6520c3..87e78a1 100644 --- a/.gitignore +++ b/.gitignore @@ -288,4 +288,5 @@ test_projects/*/wallet.json test_projects/*/.npmrc test_projects/*/.git-credentials test_projects/*/credentials.* -test_projects/*/api_keys.* \ No newline at end of file +test_projects/*/api_keys.* +cognee/.env.service diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..2a0ffc8 --- /dev/null +++ b/.gitmodules @@ -0,0 +1,4 @@ +[submodule "cognee/cognee"] + path = cognee/cognee + url = https://github.com/Songbird99/cognee.git + branch = main diff --git a/COGNEE_S3_SYNC_NOTES.md b/COGNEE_S3_SYNC_NOTES.md new file mode 100644 index 0000000..1b9f400 --- /dev/null +++ b/COGNEE_S3_SYNC_NOTES.md @@ -0,0 +1,40 @@ +# Cognee LanceDB S3 Sync Notes + +Date: 2025-10-02 / 2025-10-03 + +## Summary +- `fuzzforge ingest` returned `NoDataError` in multi-tenant S3 mode because LanceDB collections were only written to the local volume (`/app/.cognee_system`) and never mirrored to S3. +- Kuzu (`*.pkl`) already synced correctly; only LanceDB collections were missing. +- Added `_sync_local_db_to_s3()` helper in `cognee/infrastructure/databases/vector/lancedb/LanceDBAdapter.py` and call it after `create_data_points` and `delete_data_points` so every write publishes the Lance directory to S3. +- Rebuilt the `cognee` service container and restarted on port `18000` (port 8000 was in use). +- Wiped the old dataset state (Postgres tables, local Lance directory, S3 prefix) and re-ran `fuzzforge ingest --force ./test_ingest.md`. +- Verified the ingest now succeeds, insights/chunks return data, and LanceDB collections appear under `s3://cognee-bucket/system/databases/<database_id>/<collection>.lance.db/`. 
+
+## Useful Commands
+```
+# rebuild and restart service
+docker compose -f docker-compose.cognee.yaml build cognee
+COGNEE_SERVICE_PORT=18000 docker compose -f docker-compose.cognee.yaml up -d cognee
+
+# clean old dataset data
+docker compose -f docker-compose.cognee.yaml exec postgres psql -U cognee -d cognee_db \
+ -c "DELETE FROM dataset_data; DELETE FROM data; DELETE FROM datasets; DELETE FROM dataset_database;"
+
+# remove local Lance directory & S3 prefix (optional reset)
+docker compose -f docker-compose.cognee.yaml exec cognee rm -rf /app/.cognee_system/databases/
+docker compose -f docker-compose.cognee.yaml exec minio-mc mc rm -r --force \
+ cognee/cognee-bucket/system/databases/
+
+# re-ingest project (from project root)
+fuzzforge ingest --force ./test_ingest.md
+
+# confirm LanceDB collections in S3
+docker compose -f docker-compose.cognee.yaml exec minio-mc \
+ mc ls -r cognee/cognee-bucket/system/databases/<database_id>/<collection>.lance.db/
+```
+
+## Verification
+- API search (`CHUNKS` and `INSIGHTS`) now returns data.
+- `mc ls` shows LanceDB tables in S3.
+- No more `DocumentChunk_text collection not found` errors in the service logs. 
+ diff --git a/ai/pyproject.toml b/ai/pyproject.toml index ef62383..aaf2df0 100644 --- a/ai/pyproject.toml +++ b/ai/pyproject.toml @@ -16,7 +16,8 @@ dependencies = [ "fastmcp", "mcp", "typing-extensions", - "cognee>=0.3.0", + "cognee @ file://../cognee/cognee", + "python-dotenv", ] [project.optional-dependencies] diff --git a/ai/src/fuzzforge_ai/cognee_api_client.py b/ai/src/fuzzforge_ai/cognee_api_client.py new file mode 100644 index 0000000..21ebfc5 --- /dev/null +++ b/ai/src/fuzzforge_ai/cognee_api_client.py @@ -0,0 +1,316 @@ +"""Async HTTP client for interacting with a remote Cognee service.""" + +from __future__ import annotations + +import json +import logging +import mimetypes +from pathlib import Path +from typing import Any, Dict, Iterable, List, Optional, Sequence + +import httpx + +logger = logging.getLogger(__name__) + + +class CogneeAPIError(RuntimeError): + """Raised when the Cognee service returns an unexpected response.""" + + +class CogneeAPIClient: + """Thin async wrapper around Cognee's HTTP API.""" + + def __init__(self, base_url: str, timeout: float = 180.0, verify_ssl: Optional[bool] = None): + if not base_url: + raise ValueError("Cognee service URL must be provided") + self.base_url = base_url.rstrip("/") + kwargs: Dict[str, Any] = {"timeout": timeout, "base_url": self.base_url, "follow_redirects": True} + if verify_ssl is not None: + kwargs["verify"] = verify_ssl + self._client = httpx.AsyncClient(**kwargs) + self._token: Optional[str] = None + + async def close(self) -> None: + await self._client.aclose() + + # ------------------------------------------------------------------ + # Authentication helpers + # ------------------------------------------------------------------ + async def login(self, email: str, password: str) -> bool: + """Attempt to authenticate the user and store the JWT token.""" + form = {"username": email, "password": password} + try: + response = await self._client.post("/api/v1/auth/login", data=form) + except 
httpx.RequestError as exc: + raise CogneeAPIError(f"Cognee login request failed: {exc}") from exc + + if response.status_code == 200: + data = response.json() + self._token = data.get("access_token") + if self._token: + # Update client headers with Bearer token + self._client.headers.update({"Authorization": f"Bearer {self._token}"}) + return True + if response.status_code in {400, 401}: # Invalid credentials / user not found + return False + raise CogneeAPIError( + f"Unexpected login status {response.status_code}: {response.text.strip()}" + ) + + async def register(self, email: str, password: str) -> bool: + """Register a new user; returns True when created, False if it already exists.""" + payload = {"email": email, "password": password} + try: + response = await self._client.post("/api/v1/auth/register", json=payload) + except httpx.RequestError as exc: + raise CogneeAPIError(f"Cognee register request failed: {exc}") from exc + + if response.status_code in {200, 201}: + return True + if response.status_code in {400, 409}: # already exists / validation error + logger.debug("Cognee register response %s: %s", response.status_code, response.text) + return False + raise CogneeAPIError( + f"Unexpected register status {response.status_code}: {response.text.strip()}" + ) + + async def ensure_user(self, email: str, password: str, tenant_name: str) -> Dict[str, Any]: + """Ensure a user exists, authenticate, and provision tenant if needed.""" + logged_in = await self.login(email, password) + if not logged_in: + created = await self.register(email, password) + if not created: + # User may already exist with different password; try login again to surface error + logged_in = await self.login(email, password) + if not logged_in: + raise CogneeAPIError("Unable to authenticate with Cognee service") + else: + # Newly created -> login to capture cookies + if not await self.login(email, password): + raise CogneeAPIError("Login failed after registering Cognee service user") + + # 
Fetch user profile - if this fails with 401, auth may not be enforced + try: + profile = await self._get_me() + except CogneeAPIError as exc: + if "401" in str(exc): + # Auth endpoints exist but may not be enforced - create minimal profile + logger.debug("Auth endpoints returned 401, using minimal profile") + profile = {"email": email, "id": None} + else: + raise + + # Ensure tenant exists and is associated with the user + if tenant_name: + await self._create_tenant_if_needed(tenant_name) + profile["tenant_name"] = tenant_name + + return profile + + async def _get_me(self) -> Dict[str, Any]: + try: + response = await self._client.get("/api/v1/auth/me") + except httpx.RequestError as exc: + raise CogneeAPIError(f"Cognee profile request failed: {exc}") from exc + + if response.status_code == 200: + return response.json() + raise CogneeAPIError( + f"Failed to fetch Cognee profile ({response.status_code}): {response.text.strip()}" + ) + + async def _create_tenant_if_needed(self, tenant_name: str) -> None: + try: + response = await self._client.post( + "/api/v1/permissions/tenants", params={"tenant_name": tenant_name} + ) + except httpx.RequestError as exc: + raise CogneeAPIError(f"Cognee tenant request failed: {exc}") from exc + + if response.status_code in {200, 409}: # created or already exists + return + # FastAPI may surface validation errors as 400/422; treat as already exists + if response.status_code in {400, 422}: + logger.debug("Tenant creation returned %s: %s", response.status_code, response.text) + return + raise CogneeAPIError( + f"Failed to ensure Cognee tenant ({response.status_code}): {response.text.strip()}" + ) + + # ------------------------------------------------------------------ + # Dataset helpers + # ------------------------------------------------------------------ + async def ensure_dataset(self, dataset_name: str) -> Dict[str, Any]: + payload = {"name": dataset_name} + try: + response = await self._client.post("/api/v1/datasets", json=payload) 
+ except httpx.RequestError as exc: + raise CogneeAPIError(f"Cognee dataset request failed: {exc}") from exc + + if response.status_code in {200, 201}: + return response.json() + raise CogneeAPIError( + f"Failed to ensure dataset {dataset_name} ({response.status_code}): {response.text.strip()}" + ) + + async def delete_dataset(self, dataset_id: str) -> None: + try: + response = await self._client.delete(f"/api/v1/datasets/{dataset_id}") + except httpx.RequestError as exc: + raise CogneeAPIError(f"Cognee dataset delete failed: {exc}") from exc + + if response.status_code in {200, 204}: + return + if response.status_code == 404: + return + raise CogneeAPIError( + f"Failed to delete dataset ({response.status_code}): {response.text.strip()}" + ) + + async def list_datasets(self) -> List[Dict[str, Any]]: + try: + response = await self._client.get("/api/v1/datasets") + except httpx.RequestError as exc: + raise CogneeAPIError(f"Cognee datasets list failed: {exc}") from exc + + if response.status_code == 200: + return response.json() + raise CogneeAPIError( + f"Failed to list datasets ({response.status_code}): {response.text.strip()}" + ) + + # ------------------------------------------------------------------ + # Ingestion and processing + # ------------------------------------------------------------------ + async def ingest_files(self, dataset_name: str, files: Sequence[Path]) -> Dict[str, Any]: + form_data = {"datasetName": dataset_name} + multipart_files: List[tuple[str, tuple[str, Any, Optional[str]]]] = [] + for path in files: + mime_type, _ = mimetypes.guess_type(path.name) + try: + content = path.read_bytes() + except OSError as exc: + raise CogneeAPIError(f"Failed to read file for Cognee upload: {path}: {exc}") from exc + multipart_files.append( + ( + "data", + (path.name, content, mime_type or "application/octet-stream"), + ) + ) + + try: + response = await self._client.post( + "/api/v1/add", data=form_data, files=multipart_files + ) + except httpx.RequestError 
as exc: + raise CogneeAPIError(f"Cognee ingestion request failed: {exc}") from exc + + if response.status_code in {200, 201}: + result = response.json() + result.setdefault("status", response.status_code) + return result + + raise CogneeAPIError( + f"Cognee ingestion failed ({response.status_code}): {response.text.strip()}" + ) + + async def cognify(self, dataset_name: str) -> Dict[str, Any]: + payload = {"datasets": [dataset_name]} + try: + response = await self._client.post("/api/v1/cognify", json=payload) + except httpx.RequestError as exc: + raise CogneeAPIError(f"Cognee cognify request failed: {exc}") from exc + + if response.status_code in {200, 201}: + return response.json() + raise CogneeAPIError( + f"Cognee cognify failed ({response.status_code}): {response.text.strip()}" + ) + + async def ingest_text(self, dataset_name: str, content: str) -> Dict[str, Any]: + data_path = Path("virtual.txt") + multipart_files = [ + ( + "data", + (data_path.name, content.encode("utf-8"), "text/plain"), + ) + ] + form_data = {"datasetName": dataset_name} + try: + response = await self._client.post( + "/api/v1/add", data=form_data, files=multipart_files + ) + except httpx.RequestError as exc: + raise CogneeAPIError(f"Cognee text ingestion request failed: {exc}") from exc + + if response.status_code in {200, 201}: + return response.json() + raise CogneeAPIError( + f"Cognee text ingestion failed ({response.status_code}): {response.text.strip()}" + ) + + # ------------------------------------------------------------------ + # Search / insights + # ------------------------------------------------------------------ + async def search( + self, + query: str, + datasets: Optional[Iterable[str]] = None, + search_type: str = "INSIGHTS", + top_k: Optional[int] = None, + ) -> Any: + payload: Dict[str, Any] = { + "query": query, + "search_type": search_type, + } + if datasets: + payload["datasets"] = list(datasets) + if top_k is not None: + payload["top_k"] = top_k + + try: + response = 
await self._client.post("/api/v1/search", json=payload) + except httpx.RequestError as exc: + raise CogneeAPIError(f"Cognee search request failed: {exc}") from exc + + if response.status_code in {200, 201}: + # search may already be JSON serialisable but ensure data type is python object + return response.json() + raise CogneeAPIError( + f"Cognee search failed ({response.status_code}): {response.text.strip()}" + ) + + async def status(self) -> Dict[str, Any]: + """Return health information along with dataset list.""" + info: Dict[str, Any] = {} + try: + response = await self._client.get("/health") + if response.status_code == 200: + info["health"] = response.json() + else: + info["health_error"] = { + "status_code": response.status_code, + "body": response.text.strip(), + } + except httpx.RequestError as exc: + info["health_error"] = {"message": str(exc)} + + try: + datasets = await self.list_datasets() + info["datasets"] = datasets + except CogneeAPIError as exc: + info.setdefault("dataset_error", str(exc)) + + return info + + # ------------------------------------------------------------------ + async def raw_request(self, method: str, url: str, **kwargs: Any) -> httpx.Response: + """Expose a raw request helper for advanced scenarios.""" + try: + response = await self._client.request(method, url, **kwargs) + except httpx.RequestError as exc: + raise CogneeAPIError(f"Cognee request failed: {exc}") from exc + return response + + +__all__ = ["CogneeAPIClient", "CogneeAPIError"] diff --git a/ai/src/fuzzforge_ai/cognee_service.py b/ai/src/fuzzforge_ai/cognee_service.py index dea5d5d..500da9e 100644 --- a/ai/src/fuzzforge_ai/cognee_service.py +++ b/ai/src/fuzzforge_ai/cognee_service.py @@ -15,14 +15,16 @@ Provides integrated Cognee functionality for codebase analysis and knowledge gra import os -import asyncio +import json import logging from pathlib import Path -from typing import Dict, List, Any, Optional +from typing import Dict, List, Any, Optional, Tuple from 
datetime import datetime logger = logging.getLogger(__name__) +from .cognee_api_client import CogneeAPIClient, CogneeAPIError + class CogneeService: """ @@ -33,17 +35,108 @@ class CogneeService: def __init__(self, config): """Initialize with FuzzForge config""" self.config = config - self.cognee_config = config.get_cognee_config() - self.project_context = config.get_project_context() self._cognee = None self._user = None self._initialized = False + self._s3_client = None + self._api_client: Optional[CogneeAPIClient] = None + self._dataset_id: Optional[str] = None + self._sync_config_state() + + def _sync_config_state(self) -> None: + """Refresh cached configuration from the project manager.""" + self.cognee_config = self.config.get_cognee_config() + self.project_context = self.config.get_project_context() + self.project_dir = Path(self.project_context.get("project_dir", Path.cwd())) + self.storage_backend = self.cognee_config.get("storage_backend", "filesystem") + mode_env = os.getenv("COGNEE_STORAGE_MODE") + if mode_env: + self.service_mode = mode_env + else: + self.service_mode = self.cognee_config.get("mode", "embedded") + + bucket_env = os.getenv("COGNEE_S3_BUCKET") or os.getenv("S3_BUCKET") + prefix_env = os.getenv("COGNEE_S3_PREFIX") + self.s3_bucket = bucket_env or self.cognee_config.get("s3_bucket") + self.s3_prefix = prefix_env or self.cognee_config.get("s3_prefix") + + email_env = os.getenv("COGNEE_SERVICE_USER_EMAIL") or os.getenv("DEFAULT_USER_EMAIL") + self.service_user_email = email_env or self.cognee_config.get("service_user_email") + self.service_env_dir = self.cognee_config.get("service_env_dir") + + url_env = os.getenv("COGNEE_SERVICE_URL") or os.getenv("COGNEE_API_URL") + self.service_url = url_env or self.cognee_config.get("service_url") + + def _using_service(self) -> bool: + return (self.service_mode or "").lower() == "service" and bool(self.service_url) + + def _get_service_credentials(self) -> Tuple[str, str]: + email = ( + 
self.service_user_email + or os.getenv("COGNEE_SERVICE_USER_EMAIL") + or os.getenv("DEFAULT_USER_EMAIL") + or f"project_{self.project_context['project_id']}@cognee.dev" + ) + password = ( + os.getenv("COGNEE_SERVICE_USER_PASSWORD") + or os.getenv("DEFAULT_USER_PASSWORD") + or f"{self.project_context['project_id'][:8]}_C0gn33!" + ) + os.environ.setdefault("COGNEE_SERVICE_USER_EMAIL", email) + os.environ.setdefault("DEFAULT_USER_EMAIL", email) + os.environ.setdefault("COGNEE_SERVICE_USER_PASSWORD", password) + os.environ.setdefault("DEFAULT_USER_PASSWORD", password) + return email, password + + async def _initialize_remote(self) -> None: + if not self.service_url: + raise ValueError( + "COGNEE_SERVICE_URL must be configured when Cognee mode is set to 'service'" + ) + + # Ensure env variables are populated so downstream components (LLM config etc.) stay aligned + self.config.setup_cognee_environment() + self._sync_config_state() + + email, password = self._get_service_credentials() + tenant_name = self.project_context.get("tenant_id") + + if self._api_client: + await self._api_client.close() + + self._api_client = CogneeAPIClient(self.service_url) + + try: + profile = await self._api_client.ensure_user(email, password, tenant_name) + self._user = profile + dataset_name = self.get_project_dataset_name() + dataset_info = await self._api_client.ensure_dataset(dataset_name) + self._dataset_id = dataset_info.get("id") + logger.info( + "Connected to Cognee service", + extra={ + "service_url": self.service_url, + "project": self.project_context.get("project_name"), + "dataset": dataset_name, + }, + ) + except Exception: + if self._api_client: + await self._api_client.close() + self._api_client = None + raise async def initialize(self): """Initialize Cognee with project-specific configuration""" try: - # Ensure environment variables for Cognee are set before import + self.config.refresh() self.config.setup_cognee_environment() + self._sync_config_state() + if 
self._using_service(): + await self._initialize_remote() + self._initialized = True + return + logger.debug( "Cognee environment configured", extra={ @@ -119,8 +212,14 @@ class CogneeService: await self._setup_user_context() self._initialized = True - logger.info(f"Cognee initialized for project {self.project_context['project_name']} " - f"with Kuzu at {system_dir}") + logger.info( + "Cognee initialized", + extra={ + "project": self.project_context.get("project_name"), + "storage": self.storage_backend, + "data_root": data_dir, + }, + ) except ImportError: logger.error("Cognee not installed. Install with: pip install cognee") @@ -137,7 +236,7 @@ class CogneeService: try: # Dataset creation is handled automatically by Cognee when adding files # We just ensure we have the right context set up - dataset_name = f"{self.project_context['project_name']}_codebase" + dataset_name = self.get_project_dataset_name() logger.info(f"Dataset {dataset_name} ready for project {self.project_context['project_name']}") return dataset_name except Exception as e: @@ -146,47 +245,187 @@ class CogneeService: async def _setup_user_context(self): """Setup user context for multi-tenant isolation""" + if self._using_service(): + return try: from cognee.modules.users.methods import create_user, get_user - - # Always try fallback email first to avoid validation issues - fallback_email = f"project_{self.project_context['project_id']}@fuzzforge.example" - user_tenant = self.project_context['tenant_id'] - - # Try to get existing fallback user first + from cognee.modules.users.methods.get_user_by_email import get_user_by_email + from cognee.modules.users.tenants.methods import create_tenant + from cognee.infrastructure.databases.exceptions import EntityAlreadyExistsError + + fallback_email = self.service_user_email or f"project_{self.project_context['project_id']}@cognee.dev" + fallback_password = ( + os.getenv("COGNEE_SERVICE_USER_PASSWORD") + or os.getenv("DEFAULT_USER_PASSWORD") + or 
f"{self.project_context['project_id'][:8]}_C0gn33!" + ) + + os.environ.setdefault("DEFAULT_USER_EMAIL", fallback_email) + os.environ.setdefault("DEFAULT_USER_PASSWORD", fallback_password) + try: - self._user = await get_user(fallback_email) - logger.info(f"Using existing user: {fallback_email}") - return - except: - # User doesn't exist, try to create fallback - pass - - # Create fallback user - try: - self._user = await create_user(fallback_email, user_tenant) - logger.info(f"Created fallback user: {fallback_email} for tenant: {user_tenant}") - return - except Exception as fallback_error: - logger.warning(f"Fallback user creation failed: {fallback_error}") - self._user = None - return - + user = await get_user_by_email(fallback_email) + logger.info("Using existing Cognee service user", extra={"email": fallback_email}) + except Exception: + user = None + + if user is None: + try: + user = await create_user( + email=fallback_email, + password=fallback_password, + is_superuser=True, + is_active=True, + is_verified=True, + auto_login=True, + ) + logger.info( + "Created Cognee service user", + extra={"email": fallback_email, "tenant": self.project_context.get("tenant_id")}, + ) + except Exception as exc: + logger.warning("Failed to create Cognee service user", exc_info=True) + self._user = None + return + + tenant_name = self.project_context.get("tenant_id") or f"fuzzforge_project_{self.project_context['project_id']}" + if not getattr(user, "tenant_id", None): + try: + await create_tenant(tenant_name, user.id) + except EntityAlreadyExistsError: + logger.debug("Tenant already exists for project", extra={"tenant": tenant_name}) + except Exception as exc: + logger.warning("Failed to ensure Cognee tenant", exc_info=True) + + try: + user = await get_user(user.id) + except Exception: + pass + + self._user = user + self.service_user_email = fallback_email + except Exception as e: logger.warning(f"Could not setup multi-tenant user context: {e}") logger.info("Proceeding with 
default context") self._user = None + + def _uses_s3_storage(self) -> bool: + if (self.storage_backend or "").lower() == "s3": + return True + if os.getenv("STORAGE_BACKEND", "").lower() == "s3": + return True + + data_dir = (self.cognee_config.get("data_directory") or "").lower() + system_dir = (self.cognee_config.get("system_directory") or "").lower() + return data_dir.startswith("s3://") or system_dir.startswith("s3://") + + def _relative_ingest_path(self, file_path: Path) -> str: + try: + relative = file_path.relative_to(self.project_dir) + except ValueError: + relative = file_path.name + return str(relative).replace(os.sep, "/") + + def _resolve_s3_paths(self) -> Tuple[str, str]: + bucket = ( + self.s3_bucket + or os.getenv("STORAGE_BUCKET_NAME") + or os.getenv("COGNEE_S3_BUCKET") + or os.getenv("S3_BUCKET") + ) + if not bucket: + raise ValueError("S3 bucket not configured for Cognee service storage") + + prefix = ( + self.s3_prefix + or os.getenv("COGNEE_S3_PREFIX") + or f"cognee/projects/{self.project_context['project_id']}" + ) + return bucket, prefix.rstrip("/") + + def _get_s3_client(self): + if self._s3_client is not None: + return self._s3_client + + try: + import boto3 + except ImportError as exc: + raise RuntimeError("boto3 is required for Cognee S3 ingestion") from exc + + endpoint = os.getenv("AWS_ENDPOINT_URL") or os.getenv("S3_ENDPOINT") + access_key = os.getenv("AWS_ACCESS_KEY_ID") or os.getenv("S3_ACCESS_KEY") + secret_key = os.getenv("AWS_SECRET_ACCESS_KEY") or os.getenv("S3_SECRET_KEY") + region = ( + os.getenv("AWS_REGION") + or os.getenv("AWS_DEFAULT_REGION") + or os.getenv("S3_REGION") + or "us-east-1" + ) + use_ssl = os.getenv("S3_USE_SSL", "false").lower() == "true" + + if not (access_key and secret_key): + raise ValueError("AWS/S3 credentials missing for Cognee service ingestion") + + self._s3_client = boto3.client( + "s3", + endpoint_url=endpoint, + aws_access_key_id=access_key, + aws_secret_access_key=secret_key, + region_name=region, 
+ use_ssl=use_ssl, + ) + return self._s3_client + + def _upload_files_to_s3(self, file_paths: List[Path]) -> Tuple[List[str], Dict[str, str]]: + bucket, prefix = self._resolve_s3_paths() + client = self._get_s3_client() + + timestamp = datetime.utcnow().strftime("%Y%m%dT%H%M%S") + try: + from uuid import uuid4 + except ImportError: # pragma: no cover - standard lib always available + uuid4 = None # type: ignore + + batch_suffix = uuid4().hex[:8] if uuid4 else "batch" + base_prefix = f"{prefix}/uploads/{self.project_context['project_id']}/{timestamp}_{batch_suffix}" + + uploaded: Dict[str, str] = {} + uris: List[str] = [] + for file_path in file_paths: + key = f"{base_prefix}/{self._relative_ingest_path(file_path)}" + with open(file_path, "rb") as handle: + client.upload_fileobj(handle, bucket, key) + uri = f"s3://{bucket}/{key}" + uploaded[str(file_path)] = uri + uris.append(uri) + + return uris, uploaded + + def _prepare_ingest_payload(self, file_paths: List[Path]) -> Tuple[List[str], Dict[str, str]]: + if self._uses_s3_storage(): + return self._upload_files_to_s3(file_paths) + return [str(path) for path in file_paths], {} def get_project_dataset_name(self, dataset_suffix: str = "codebase") -> str: """Get project-specific dataset name""" return f"{self.project_context['project_name']}_{dataset_suffix}" - async def ingest_text(self, content: str, dataset: str = "fuzzforge") -> bool: + async def ingest_text(self, content: str, dataset: str = None) -> bool: """Ingest text content into knowledge graph""" if not self._initialized: await self.initialize() + dataset = dataset or self.get_project_dataset_name() try: + if self._using_service(): + if not self._api_client: + raise RuntimeError("Cognee API client not initialized") + await self._api_client.ensure_dataset(dataset) + await self._api_client.ingest_text(dataset, content) + await self._api_client.cognify(dataset) + return True + await self._cognee.add([content], dataset) await self._cognee.cognify([dataset]) return 
True @@ -194,45 +433,76 @@ class CogneeService: logger.error(f"Failed to ingest text: {e}") return False - async def ingest_files(self, file_paths: List[Path], dataset: str = "fuzzforge") -> Dict[str, Any]: + async def ingest_files(self, file_paths: List[Path], dataset: str = None) -> Dict[str, Any]: """Ingest multiple files into knowledge graph""" if not self._initialized: await self.initialize() + self._sync_config_state() + dataset = dataset or self.get_project_dataset_name() results = { "success": 0, "failed": 0, - "errors": [] + "errors": [], + "uploaded": {}, } - - try: - ingest_paths: List[str] = [] - for file_path in file_paths: - try: - with open(file_path, 'r', encoding='utf-8'): - ingest_paths.append(str(file_path)) - results["success"] += 1 - except (UnicodeDecodeError, PermissionError) as exc: - results["failed"] += 1 - results["errors"].append(f"{file_path}: {exc}") - logger.warning("Skipping %s: %s", file_path, exc) + readable_files: List[Path] = [] - if ingest_paths: - await self._cognee.add(ingest_paths, dataset_name=dataset) + for file_path in file_paths: + try: + with open(file_path, "r", encoding="utf-8"): + pass + readable_files.append(Path(file_path)) + except (UnicodeDecodeError, PermissionError) as exc: + results["failed"] += 1 + results["errors"].append(f"{file_path}: {exc}") + logger.warning("Skipping %s: %s", file_path, exc) + + if not readable_files: + return results + + try: + if self._using_service(): + if not self._api_client: + raise RuntimeError("Cognee API client not initialized") + await self._api_client.ensure_dataset(dataset) + service_result = await self._api_client.ingest_files(dataset, readable_files) + await self._api_client.cognify(dataset) + results["success"] = len(readable_files) + results["service"] = service_result + else: + ingest_payload, uploaded = self._prepare_ingest_payload(readable_files) + await self._cognee.add(ingest_payload, dataset_name=dataset) await self._cognee.cognify([dataset]) - + results["success"] = 
len(readable_files) + if uploaded: + results["uploaded"] = uploaded except Exception as e: - logger.error(f"Failed to ingest files: {e}") + logger.error("Failed to ingest files", exc_info=True) results["errors"].append(f"Cognify error: {str(e)}") - + results["failed"] += len(readable_files) + results["success"] = 0 + return results + return results async def search_insights(self, query: str, dataset: str = None) -> List[str]: """Search for insights in the knowledge graph""" if not self._initialized: await self.initialize() - + try: + if self._using_service(): + if not self._api_client: + raise RuntimeError("Cognee API client not initialized") + dataset_name = dataset or self.get_project_dataset_name() + results = await self._api_client.search( + query, + datasets=[dataset_name] if dataset_name else None, + search_type="INSIGHTS", + ) + return self._normalise_service_results(results) + from cognee.modules.search.types import SearchType kwargs = { @@ -256,6 +526,17 @@ class CogneeService: await self.initialize() try: + if self._using_service(): + if not self._api_client: + raise RuntimeError("Cognee API client not initialized") + dataset_name = dataset or self.get_project_dataset_name() + results = await self._api_client.search( + query, + datasets=[dataset_name] if dataset_name else None, + search_type="CHUNKS", + ) + return self._normalise_service_results(results) + from cognee.modules.search.types import SearchType kwargs = { @@ -279,6 +560,16 @@ class CogneeService: await self.initialize() try: + if self._using_service(): + if not self._api_client: + raise RuntimeError("Cognee API client not initialized") + results = await self._api_client.search( + query, + datasets=[self.get_project_dataset_name()], + search_type="GRAPH_COMPLETION", + ) + return self._normalise_service_results(results) + from cognee.modules.search.types import SearchType results = await self._cognee.search( @@ -290,34 +581,77 @@ class CogneeService: except Exception as e: logger.error(f"Failed 
to search graph completion: {e}") return [] - + async def get_status(self) -> Dict[str, Any]: """Get service status and statistics""" + self._sync_config_state() status = { "initialized": self._initialized, "enabled": self.cognee_config.get("enabled", True), "provider": self.cognee_config.get("graph_database_provider", "kuzu"), "data_directory": self.cognee_config.get("data_directory"), "system_directory": self.cognee_config.get("system_directory"), + "storage_backend": self.storage_backend, + "service_mode": self.service_mode, + "s3_bucket": self.s3_bucket, + "s3_prefix": self.s3_prefix, + "dataset_name": self.get_project_dataset_name(), } - - if self._initialized: + if self._dataset_id: + status["dataset_id"] = self._dataset_id + + if self._initialized and self._using_service() and self._api_client: + try: + status.update(await self._api_client.status()) + except CogneeAPIError as exc: + status["service_error"] = str(exc) + + if self._initialized and not self._uses_s3_storage() and not self._using_service(): try: - # Check if directories exist and get sizes data_dir = Path(status["data_directory"]) system_dir = Path(status["system_directory"]) - - status.update({ - "data_dir_exists": data_dir.exists(), - "system_dir_exists": system_dir.exists(), - "kuzu_db_exists": (system_dir / "kuzu_db").exists(), - "lancedb_exists": (system_dir / "lancedb").exists(), - }) - + + status.update( + { + "data_dir_exists": data_dir.exists(), + "system_dir_exists": system_dir.exists(), + "kuzu_db_exists": (system_dir / "kuzu_db").exists(), + "lancedb_exists": (system_dir / "lancedb").exists(), + } + ) + except Exception as e: status["status_error"] = str(e) - + else: + status["s3_storage_active"] = self._uses_s3_storage() + return status + + def _normalise_service_results(self, payload: Any) -> List[str]: + if payload is None: + return [] + if isinstance(payload, list): + return [self._stringify_service_item(item) for item in payload] + if isinstance(payload, dict): + if "results" in 
payload and isinstance(payload["results"], list): + return [self._stringify_service_item(item) for item in payload["results"]] + if "data" in payload and isinstance(payload["data"], list): + return [self._stringify_service_item(item) for item in payload["data"]] + return [self._stringify_service_item(payload)] + + @staticmethod + def _stringify_service_item(item: Any) -> str: + if isinstance(item, str): + return item + try: + return json.dumps(item, ensure_ascii=False) + except Exception: + return str(item) + + async def close(self) -> None: + if self._api_client: + await self._api_client.close() + self._api_client = None async def clear_data(self, confirm: bool = False): """Clear all ingested data (dangerous!)""" @@ -327,6 +661,25 @@ class CogneeService: if not self._initialized: await self.initialize() + if self._using_service(): + if not self._api_client: + raise RuntimeError("Cognee API client not initialized") + dataset_name = self.get_project_dataset_name() + try: + datasets = await self._api_client.list_datasets() + target = next((d for d in datasets if d.get("name") == dataset_name), None) + if target and target.get("id"): + await self._api_client.delete_dataset(target["id"]) + logger.info("Deleted Cognee dataset via service", extra={"dataset": dataset_name}) + else: + logger.info( + "No matching Cognee dataset found to delete", extra={"dataset": dataset_name} + ) + except Exception as exc: + logger.error("Failed to clear Cognee dataset via service", exc_info=True) + raise + return + try: await self._cognee.prune.prune_data() await self._cognee.prune.prune_system(metadata=True) diff --git a/ai/src/fuzzforge_ai/mcp/__init__.py b/ai/src/fuzzforge_ai/mcp/__init__.py new file mode 100644 index 0000000..17383bf --- /dev/null +++ b/ai/src/fuzzforge_ai/mcp/__init__.py @@ -0,0 +1 @@ +"""MCP server utilities for FuzzForge.""" diff --git a/ai/src/fuzzforge_ai/mcp/cognee_mcp_server.py b/ai/src/fuzzforge_ai/mcp/cognee_mcp_server.py new file mode 100644 index 
0000000..b79f048 --- /dev/null +++ b/ai/src/fuzzforge_ai/mcp/cognee_mcp_server.py @@ -0,0 +1,259 @@ +"""Multi-tenant aware MCP server backed by the Cognee service API.""" + +from __future__ import annotations + +import argparse +import asyncio +import json +import logging +import os +from pathlib import Path +from typing import List, Optional + +from dotenv import load_dotenv +from mcp.server import FastMCP +import mcp.types as types + +from fuzzforge_ai.cognee_api_client import CogneeAPIClient, CogneeAPIError + +logger = logging.getLogger(__name__) +logging.basicConfig(level=os.getenv("MCP_LOG_LEVEL", "INFO")) + +mcp = FastMCP("FuzzForge-Cognee") + + +def _load_project_env() -> None: + """Load environment variables from `.fuzzforge/.env` if present.""" + + env_path = Path.cwd() / ".fuzzforge" / ".env" + if env_path.exists(): + load_dotenv(env_path, override=False) + + +class CogneeMCPContext: + """Holds shared state for MCP tools and manages the Cognee API client.""" + + def __init__(self) -> None: + _load_project_env() + self.service_url = os.getenv("COGNEE_SERVICE_URL", "").rstrip("/") + self.verify_ssl = os.getenv("COGNEE_VERIFY_SSL", "true").lower() != "false" + self.email = os.getenv("COGNEE_SERVICE_USER_EMAIL") or os.getenv("DEFAULT_USER_EMAIL") + self.password = os.getenv("COGNEE_SERVICE_USER_PASSWORD") or os.getenv("DEFAULT_USER_PASSWORD") + if not (self.service_url and self.email and self.password): + raise RuntimeError( + "COGNEE_SERVICE_URL, COGNEE_SERVICE_USER_EMAIL and" + " COGNEE_SERVICE_USER_PASSWORD must be set in the environment" + ) + self._client: Optional[CogneeAPIClient] = None + self.active_dataset_name: Optional[str] = os.getenv("COGNEE_DATASET_NAME") or self._default_dataset_name() + self.active_dataset_id: Optional[str] = None + + async def start(self) -> None: + self._client = CogneeAPIClient(self.service_url, verify_ssl=self.verify_ssl) + await self._client.ensure_user(self.email, self.password, tenant_name=self._tenant_name()) + if 
self.active_dataset_name: + dataset_info = await self._client.ensure_dataset(self.active_dataset_name) + self.active_dataset_id = dataset_info.get("id") + logger.info( + "Connected to Cognee service", + extra={ + "service_url": self.service_url, + "dataset": self.active_dataset_name, + "dataset_id": self.active_dataset_id, + }, + ) + + async def close(self) -> None: + if self._client: + await self._client.close() + + @property + def client(self) -> CogneeAPIClient: + if not self._client: + raise RuntimeError("Cognee API client is not initialised yet") + return self._client + + def _tenant_name(self) -> str: + return os.getenv("COGNEE_TENANT_NAME") or os.getenv("COGNEE_PROJECT_ID") or "default-tenant" + + def _default_dataset_name(self) -> str: + project_name = os.getenv("FUZZFORGE_PROJECT_NAME") or "project" + return f"{project_name}_codebase" + + async def ensure_dataset(self, dataset_name: str) -> str: + """Ensure the dataset exists and cache its id.""" + + info = await self.client.ensure_dataset(dataset_name) + dataset_id = info.get("id") + if not dataset_id: + raise RuntimeError(f"Failed to resolve dataset id for '{dataset_name}'") + self.active_dataset_name = dataset_name + self.active_dataset_id = dataset_id + return dataset_id + + async def resolve_dataset(self, dataset: Optional[str]) -> tuple[str, str]: + name = dataset or self.active_dataset_name + if not name: + raise ValueError("Dataset is not specified; set COGNEE_DATASET_NAME or call set_dataset first") + # if the requested dataset differs from the cached one, ensure it + if dataset and dataset != self.active_dataset_name: + dataset_id = await self.ensure_dataset(dataset) + else: + dataset_id = self.active_dataset_id + if not dataset_id: + dataset_id = await self.ensure_dataset(name) + return name, dataset_id + + +CONTEXT = CogneeMCPContext() + + +def _text(message: str) -> List[types.TextContent]: + return [types.TextContent(type="text", text=message)] + + +@mcp.tool() +async def set_dataset(dataset: 
str) -> List[types.TextContent]: + """Switch the active dataset for subsequent operations.""" + + dataset_id = await CONTEXT.ensure_dataset(dataset) + return _text(f"Active dataset set to '{dataset}' (id: {dataset_id}).") + + +@mcp.tool() +async def ingest_file(path: str, dataset: Optional[str] = None) -> List[types.TextContent]: + """Upload a local file into the Cognee dataset.""" + + file_path = Path(path).expanduser().resolve() + if not file_path.is_file(): + raise ValueError(f"File not found: {file_path}") + + dataset_name, _ = await CONTEXT.resolve_dataset(dataset) + result = await CONTEXT.client.ingest_files(dataset_name, [file_path]) + success = result.get("success", 0) + failed = result.get("failed", 0) + errors = result.get("errors", []) + + message = [ + f"Uploaded {success} file(s) to dataset '{dataset_name}'.", + f"Failed: {failed}", + ] + if errors: + message.append("Errors:\n" + "\n".join(errors)) + + return _text("\n".join(message)) + + +@mcp.tool() +async def cognify(run_in_background: bool = False, dataset: Optional[str] = None) -> List[types.TextContent]: + """Trigger the Cognify pipeline for the dataset.""" + + dataset_name, _ = await CONTEXT.resolve_dataset(dataset) + response = await CONTEXT.client.cognify(dataset_name, run_in_background=run_in_background) + return _text(json.dumps(response, indent=2)) + + +@mcp.tool() +async def cognify_status(dataset: Optional[str] = None) -> List[types.TextContent]: + """Return the status of recent Cognify runs.""" + + _, dataset_id = await CONTEXT.resolve_dataset(dataset) + status = await CONTEXT.client.status() + status.setdefault("dataset_ids", []).append(dataset_id) + return _text(json.dumps(status, indent=2)) + + +@mcp.tool() +async def search_chunks(query: str, top_k: int = 5, dataset: Optional[str] = None) -> List[types.TextContent]: + """Search dataset for relevant document chunks.""" + + dataset_name, _ = await CONTEXT.resolve_dataset(dataset) + result = await CONTEXT.client.search( + query, + 
datasets=[dataset_name], + search_type="CHUNKS", + top_k=top_k, + ) + return _text(json.dumps(result, indent=2)) + + +@mcp.tool() +async def search_insights(query: str, top_k: int = 5, dataset: Optional[str] = None) -> List[types.TextContent]: + """Search dataset for graph insights.""" + + dataset_name, _ = await CONTEXT.resolve_dataset(dataset) + result = await CONTEXT.client.search( + query, + datasets=[dataset_name], + search_type="INSIGHTS", + top_k=top_k, + ) + return _text(json.dumps(result, indent=2)) + + +@mcp.tool() +async def list_datasets() -> List[types.TextContent]: + """List datasets accessible to the current service user.""" + + datasets = await CONTEXT.client.list_datasets() + return _text(json.dumps(datasets, indent=2)) + + +@mcp.tool() +async def prune_dataset(dataset: Optional[str] = None) -> List[types.TextContent]: + """Delete the dataset and recreate it empty.""" + + dataset_name, dataset_id = await CONTEXT.resolve_dataset(dataset) + await CONTEXT.client.delete_dataset(dataset_id) + dataset_info = await CONTEXT.client.ensure_dataset(dataset_name) + CONTEXT.active_dataset_name = dataset_name + CONTEXT.active_dataset_id = dataset_info.get("id") + return _text(f"Dataset '{dataset_name}' has been pruned and recreated.") + + +async def _async_main(args: argparse.Namespace) -> None: + try: + await CONTEXT.start() + except CogneeAPIError as exc: + raise SystemExit(f"Failed to initialise Cognee MCP: {exc}") from exc + + transport = args.transport + host = args.host + port = args.port + path = args.path + + logger.info( + "Starting Cognee MCP server", + extra={ + "transport": transport, + "host": host, + "port": port, + "path": path, + "dataset": CONTEXT.dataset_name, + }, + ) + + try: + if transport == "http": + await mcp.run_http(host=host, port=port, path=path) + elif transport == "sse": + await mcp.run_sse(host=host, port=port, path=path) + else: + await mcp.run_stdio() + finally: + await CONTEXT.close() + + +def main(argv: Optional[List[str]] = 
None) -> None: + parser = argparse.ArgumentParser(description="Cognee MCP server (service-backed)") + parser.add_argument("--transport", choices=["stdio", "sse", "http"], default=os.getenv("TRANSPORT_MODE", "stdio")) + parser.add_argument("--host", default=os.getenv("MCP_HOST", "127.0.0.1")) + parser.add_argument("--port", type=int, default=int(os.getenv("MCP_PORT", "8000"))) + parser.add_argument("--path", default=os.getenv("MCP_HTTP_PATH", "/mcp")) + args = parser.parse_args(argv) + + asyncio.run(_async_main(args)) + + +if __name__ == "__main__": + main() diff --git a/cli/src/fuzzforge_cli/commands/init.py b/cli/src/fuzzforge_cli/commands/init.py index 1847349..74e19d3 100644 --- a/cli/src/fuzzforge_cli/commands/init.py +++ b/cli/src/fuzzforge_cli/commands/init.py @@ -22,7 +22,7 @@ import typer from rich.console import Console from rich.prompt import Confirm, Prompt -from ..config import ensure_project_config +from ..config import ensure_project_config, provision_cognee_service_for_project, get_project_config from ..database import ensure_project_db console = Console() @@ -107,6 +107,20 @@ def project( _ensure_env_file(fuzzforge_dir, force) _ensure_agents_registry(fuzzforge_dir, force) + # Provision Cognee service user/tenant/dataset if service mode is enabled + console.print("🧠 Provisioning Cognee service credentials...") + provision_result = provision_cognee_service_for_project(current_dir) + + if provision_result["status"] == "success": + console.print(f" ✅ Created user: {provision_result.get('user', 'N/A')}", style="green") + console.print(f" ✅ Created tenant: {provision_result.get('tenant', 'N/A')}", style="green") + console.print(f" ✅ Created dataset: {provision_result.get('dataset', 'N/A')}", style="green") + elif provision_result["status"] == "skipped": + console.print(f" ⏭️ Skipped: {provision_result.get('message', 'N/A')}", style="dim") + elif provision_result["status"] == "error": + console.print(f" ⚠️ Warning: {provision_result.get('message', 
'N/A')}", style="yellow") + console.print(" 💡 You can provision later when the service is available", style="dim") + # Create .gitignore if needed gitignore_path = current_dir / ".gitignore" gitignore_entries = [ @@ -211,6 +225,21 @@ def _ensure_env_file(fuzzforge_dir: Path, force: bool) -> None: session_db_path = fuzzforge_dir / "fuzzforge_sessions.db" session_db_rel = session_db_path.relative_to(fuzzforge_dir.parent) + project_config = get_project_config(fuzzforge_dir.parent) + cognee_cfg = project_config.cognee if project_config else None + + service_url_default = "http://localhost:18000" + service_url = os.getenv("COGNEE_SERVICE_URL") or (cognee_cfg.service_url if cognee_cfg and cognee_cfg.service_url else service_url_default) + service_port = os.getenv("COGNEE_SERVICE_PORT") or "18000" + s3_bucket = os.getenv("COGNEE_S3_BUCKET") or (cognee_cfg.s3_bucket if cognee_cfg and cognee_cfg.s3_bucket else "cognee-bucket") + s3_prefix = os.getenv("COGNEE_S3_PREFIX") or (cognee_cfg.s3_prefix if cognee_cfg and cognee_cfg.s3_prefix else "cognee/projects") + service_email = os.getenv("COGNEE_SERVICE_USER_EMAIL") or (cognee_cfg.service_user_email if cognee_cfg and cognee_cfg.service_user_email else "") + service_password = os.getenv("COGNEE_SERVICE_USER_PASSWORD") or (cognee_cfg.service_user_password if cognee_cfg and cognee_cfg.service_user_password else "") + aws_endpoint = os.getenv("COGNEE_AWS_ENDPOINT_URL") or "" + aws_region = os.getenv("COGNEE_AWS_REGION") or "" + aws_access = os.getenv("COGNEE_AWS_ACCESS_KEY_ID") or "" + aws_secret = os.getenv("COGNEE_AWS_SECRET_ACCESS_KEY") or "" + env_lines = [ "# FuzzForge AI configuration", "# Populate the API key(s) that match your LLM provider", @@ -228,6 +257,19 @@ def _ensure_env_file(fuzzforge_dir: Path, force: bool) -> None: "LLM_COGNEE_ENDPOINT=", "COGNEE_MCP_URL=", "", + "# Cognee service configuration", + "COGNEE_STORAGE_MODE=service", + f"COGNEE_SERVICE_URL={service_url}", + f"COGNEE_SERVICE_PORT={service_port}", + 
f"COGNEE_S3_BUCKET={s3_bucket}", + f"COGNEE_S3_PREFIX={s3_prefix}", + f"COGNEE_SERVICE_USER_EMAIL={service_email}", + f"COGNEE_SERVICE_USER_PASSWORD={service_password}", + f"COGNEE_AWS_ENDPOINT_URL={aws_endpoint}", + f"COGNEE_AWS_REGION={aws_region}", + f"COGNEE_AWS_ACCESS_KEY_ID={aws_access}", + f"COGNEE_AWS_SECRET_ACCESS_KEY={aws_secret}", + "", "# Session persistence options: inmemory | sqlite", "SESSION_PERSISTENCE=sqlite", f"SESSION_DB_PATH={session_db_rel}", diff --git a/cli/src/fuzzforge_cli/config.py b/cli/src/fuzzforge_cli/config.py index ba67c9e..c3785d3 100644 --- a/cli/src/fuzzforge_cli/config.py +++ b/cli/src/fuzzforge_cli/config.py @@ -21,7 +21,8 @@ from __future__ import annotations import hashlib import os from pathlib import Path -from typing import Any, Dict, Optional +from typing import Any, Dict, Optional, Literal +import asyncio try: # Optional dependency; fall back if not installed from dotenv import load_dotenv @@ -76,6 +77,14 @@ class CogneeConfig(BaseModel): backend_access_control: bool = True project_id: Optional[str] = None tenant_id: Optional[str] = None + mode: Literal["embedded", "service"] = "embedded" + storage_backend: Literal["filesystem", "s3"] = "filesystem" + s3_bucket: Optional[str] = None + s3_prefix: Optional[str] = None + service_env_dir: Optional[str] = None + service_user_email: Optional[str] = None + service_user_password: Optional[str] = None + service_url: Optional[str] = None class FuzzForgeConfig(BaseModel): @@ -138,30 +147,269 @@ class FuzzForgeConfig(BaseModel): if not cognee.tenant_id: cognee.tenant_id = self.project.tenant_id changed = True - - base_dir = project_dir / ".fuzzforge" / "cognee" / f"project_{self.project.id}" - data_dir = base_dir / "data" - system_dir = base_dir / "system" - - for path in ( - base_dir, - data_dir, - system_dir, - system_dir / "kuzu_db", - system_dir / "lancedb", - ): - if not path.exists(): - path.mkdir(parents=True, exist_ok=True) - - if cognee.data_directory != str(data_dir): - 
cognee.data_directory = str(data_dir) + if not cognee.service_user_email or cognee.service_user_email.endswith("@cognee.local"): + cognee.service_user_email = f"project_{self.project.id}@cognee.dev" changed = True - if cognee.system_directory != str(system_dir): - cognee.system_directory = str(system_dir) + if not cognee.service_user_password: + cognee.service_user_password = f"{self.project.id[:8]}_C0gn33!" changed = True + mode_env = (os.getenv("COGNEE_STORAGE_MODE") or os.getenv("COGNEE_SERVICE_MODE")) + if mode_env: + normalized = mode_env.strip().lower() + if normalized in {"embedded", "service"} and normalized != cognee.mode: + cognee.mode = normalized # type: ignore[assignment] + changed = True + + service_url_env = os.getenv("COGNEE_SERVICE_URL") or os.getenv("COGNEE_API_URL") + if service_url_env and cognee.service_url != service_url_env: + cognee.service_url = service_url_env + changed = True + + if cognee.mode not in {"embedded", "service"}: + cognee.mode = "service" + changed = True + + if cognee.mode != "service": + cognee.mode = "service" + changed = True + + if cognee.mode == "service": + bucket = cognee.s3_bucket or os.getenv("COGNEE_S3_BUCKET") or os.getenv("S3_BUCKET") + if bucket and cognee.s3_bucket != bucket: + cognee.s3_bucket = bucket + changed = True + + prefix = cognee.s3_prefix or os.getenv("COGNEE_S3_PREFIX") + if not prefix: + prefix = f"cognee/projects/{self.project.id}" + if cognee.s3_prefix != prefix: + cognee.s3_prefix = prefix + changed = True + + if bucket: + data_dir = f"s3://{bucket}/{prefix}/data" + system_dir = f"s3://{bucket}/{prefix}/system" + + if cognee.data_directory != data_dir: + cognee.data_directory = data_dir + changed = True + if cognee.system_directory != system_dir: + cognee.system_directory = system_dir + changed = True + if cognee.storage_backend != "s3": + cognee.storage_backend = "s3" + changed = True + + service_dir = ( + project_dir + / ".fuzzforge" + / "cognee" + / "service" + / f"project_{self.project.id}" 
+ ) + if cognee.service_env_dir != str(service_dir): + cognee.service_env_dir = str(service_dir) + changed = True + service_dir.mkdir(parents=True, exist_ok=True) + self._write_cognee_service_env(project_dir) + + if cognee.mode == "embedded": + base_dir = project_dir / ".fuzzforge" / "cognee" / f"project_{self.project.id}" + data_dir = base_dir / "data" + system_dir = base_dir / "system" + + for path in ( + base_dir, + data_dir, + system_dir, + system_dir / "kuzu_db", + system_dir / "lancedb", + ): + if not path.exists(): + path.mkdir(parents=True, exist_ok=True) + + if cognee.data_directory != str(data_dir): + cognee.data_directory = str(data_dir) + changed = True + if cognee.system_directory != str(system_dir): + cognee.system_directory = str(system_dir) + changed = True + if cognee.storage_backend != "filesystem": + cognee.storage_backend = "filesystem" + changed = True + if cognee.service_env_dir is not None: + cognee.service_env_dir = None + changed = True + if cognee.service_url is not None: + cognee.service_url = None + changed = True + return changed + def _write_cognee_service_env(self, project_dir: Path) -> None: + """Generate a service .env file for Cognee containers.""" + cognee = self.cognee + if cognee.mode != "service" or not cognee.service_env_dir: + return + + service_dir = Path(cognee.service_env_dir) + service_dir.mkdir(parents=True, exist_ok=True) + env_path = service_dir / ".env" + + def _env(*names: str) -> str: + for name in names: + value = os.getenv(name) + if value: + return value + return "" + + bucket = cognee.s3_bucket or _env("COGNEE_S3_BUCKET", "S3_BUCKET") + prefix = cognee.s3_prefix or "" + backend_flag = "true" if cognee.backend_access_control else "false" + data_root = cognee.data_directory or "" + system_root = cognee.system_directory or "" + + aws_key = _env("COGNEE_AWS_ACCESS_KEY_ID", "AWS_ACCESS_KEY_ID", "S3_ACCESS_KEY") + aws_secret = _env( + "COGNEE_AWS_SECRET_ACCESS_KEY", + "AWS_SECRET_ACCESS_KEY", + "S3_SECRET_KEY", + ) + 
aws_endpoint = _env("COGNEE_AWS_ENDPOINT_URL", "AWS_ENDPOINT_URL", "S3_ENDPOINT") + aws_region = _env("COGNEE_AWS_REGION", "AWS_REGION", "S3_REGION") + + llm_provider = _env("LLM_COGNEE_PROVIDER", "COGNEE_LLM_PROVIDER", "LLM_PROVIDER") + llm_model = _env("LLM_COGNEE_MODEL", "COGNEE_LLM_MODEL", "LITELLM_MODEL") + llm_api_key = _env("LLM_COGNEE_API_KEY", "COGNEE_LLM_API_KEY", "LLM_API_KEY") + + service_password = os.getenv("COGNEE_SERVICE_USER_PASSWORD", "") + + lines = [ + "# Auto-generated by FuzzForge. Updates when project config changes.", + "COGNEE_STORAGE_MODE=service", + "STORAGE_BACKEND=s3", + ] + + if bucket: + lines.append(f"STORAGE_BUCKET_NAME={bucket}") + if prefix: + lines.append(f"COGNEE_S3_PREFIX={prefix}") + if data_root: + lines.append(f"COGNEE_DATA_ROOT={data_root}") + if system_root: + lines.append(f"COGNEE_SYSTEM_ROOT={system_root}") + + lines.extend( + [ + f"ENABLE_BACKEND_ACCESS_CONTROL={backend_flag}", + f"GRAPH_DATABASE_PROVIDER={cognee.graph_database_provider}", + f"COGNEE_PROJECT_ID={self.project.id}", + f"COGNEE_TENANT_KEY={self.project.tenant_id}", + ] + ) + + if cognee.service_user_email: + lines.append(f"COGNEE_SERVICE_USER_EMAIL={cognee.service_user_email}") + lines.append(f"DEFAULT_USER_EMAIL={cognee.service_user_email}") + else: + lines.append("COGNEE_SERVICE_USER_EMAIL=") + lines.append("DEFAULT_USER_EMAIL=") + + if cognee.service_user_password: + service_password = cognee.service_user_password + lines.append(f"COGNEE_SERVICE_USER_PASSWORD={service_password}") + lines.append(f"DEFAULT_USER_PASSWORD={service_password}") + else: + lines.append("COGNEE_SERVICE_USER_PASSWORD=") + lines.append("DEFAULT_USER_PASSWORD=") + + if aws_key: + lines.append(f"AWS_ACCESS_KEY_ID={aws_key}") + if aws_secret: + lines.append(f"AWS_SECRET_ACCESS_KEY={aws_secret}") + if aws_endpoint: + lines.append(f"AWS_ENDPOINT_URL={aws_endpoint}") + if aws_region: + lines.append(f"AWS_REGION={aws_region}") + lines.append(f"AWS_DEFAULT_REGION={aws_region}") + + if 
llm_provider: + lines.append(f"LLM_PROVIDER={llm_provider}") + if llm_model: + lines.append(f"LLM_MODEL={llm_model}") + if llm_api_key: + lines.append(f"LLM_API_KEY={llm_api_key}") + + service_url = cognee.service_url or _env("COGNEE_SERVICE_URL", "COGNEE_API_URL") + if service_url: + lines.append(f"COGNEE_SERVICE_URL={service_url}") + + env_path.write_text("\n".join(lines) + "\n", encoding="utf-8") + + def _provision_cognee_service_account(self, project_dir: Path) -> dict[str, str]: + """Ensure the hosted Cognee service has a user/tenant/dataset for this project. + + Returns: + Dictionary with status info: {"status": "success|error|skipped", "message": "..."} + """ + # Lazy import to avoid circular dependency + try: + from fuzzforge_ai.cognee_api_client import CogneeAPIClient + except ImportError: + return {"status": "skipped", "message": "CogneeAPIClient not available"} + + if self.cognee.mode != "service": + return {"status": "skipped", "message": "Cognee mode is not 'service'"} + + service_url = os.getenv("COGNEE_SERVICE_URL") or self.cognee.service_url + if not service_url: + return {"status": "skipped", "message": "No service URL configured"} + + email = self.cognee.service_user_email + password = self.cognee.service_user_password + if not email or not password: + return {"status": "error", "message": "Missing service credentials"} + + tenant_name = self.project.tenant_id or f"fuzzforge_project_{self.project.id}" + dataset_name = f"{self.project.name}_codebase" + + async def _bootstrap(): + client = CogneeAPIClient(service_url) + try: + profile = await client.ensure_user(email, password, tenant_name) + dataset_info = await client.ensure_dataset(dataset_name) + return { + "status": "success", + "message": f"Provisioned user/tenant/dataset on Cognee service", + "user": profile.get("email"), + "tenant": tenant_name, + "dataset": dataset_name, + "dataset_id": dataset_info.get("id"), + } + except Exception as exc: + return { + "status": "error", + "message": 
f"Failed to provision Cognee service: {exc}", + "error": str(exc), + } + finally: + await client.close() + + try: + return asyncio.run(_bootstrap()) + except RuntimeError as exc: # pragma: no cover - fallback where loop already running + if "event loop" in str(exc): + loop = asyncio.get_event_loop() + return loop.run_until_complete(_bootstrap()) + raise + except Exception as exc: + return { + "status": "error", + "message": f"Failed to run bootstrap: {exc}", + "error": str(exc), + } + def get_api_url(self) -> str: """Get API URL with environment variable override.""" return os.getenv("FUZZFORGE_API_URL", self.project.api_url) @@ -182,6 +430,15 @@ class FuzzForgeConfig(BaseModel): "tenant_id": self.project.tenant_id or "fuzzforge_tenant", "data_directory": self.cognee.data_directory, "system_directory": self.cognee.system_directory, + "project_dir": str(project_dir), + "cognee_mode": self.cognee.mode, + "cognee_storage_backend": self.cognee.storage_backend, + "cognee_s3_bucket": self.cognee.s3_bucket, + "cognee_s3_prefix": self.cognee.s3_prefix, + "cognee_service_user_email": self.cognee.service_user_email, + "cognee_service_user_password": self.cognee.service_user_password, + "cognee_service_env_dir": self.cognee.service_env_dir, + "cognee_service_url": self.cognee.service_url, } def get_cognee_config(self, project_dir: Path) -> Dict[str, Any]: @@ -249,6 +506,28 @@ def ensure_project_config( return config +def provision_cognee_service_for_project( + project_dir: Optional[Path] = None, +) -> dict[str, str]: + """Provision user/tenant/dataset on the Cognee service for this project. + + Args: + project_dir: Project directory path + + Returns: + Dictionary with provisioning status + """ + project_dir = Path(project_dir or Path.cwd()) + config = get_project_config(project_dir) + if config is None: + return { + "status": "error", + "message": "Project not initialized. 
Run 'ff init project' first.", + } + + return config._provision_cognee_service_account(project_dir) + + def get_global_config() -> FuzzForgeConfig: """Get global user configuration.""" home = Path.home() @@ -320,6 +599,53 @@ class ProjectConfigManager: except Exception: # pragma: no cover - best effort fallback pass + def _env(*names: str, default: str | None = None) -> str | None: + for name in names: + value = os.getenv(name) + if value: + return value + return default + + storage_mode = cognee.get("mode", "embedded") + os.environ["COGNEE_STORAGE_MODE"] = storage_mode + + storage_backend = cognee.get("storage_backend", "filesystem") + os.environ["STORAGE_BACKEND"] = storage_backend + + if storage_backend == "s3": + bucket = cognee.get("s3_bucket") or _env("COGNEE_S3_BUCKET", "S3_BUCKET") + if bucket: + os.environ["STORAGE_BUCKET_NAME"] = bucket + + prefix = cognee.get("s3_prefix") or _env("COGNEE_S3_PREFIX") + if prefix: + os.environ["COGNEE_S3_PREFIX"] = prefix + + aws_key = _env("COGNEE_AWS_ACCESS_KEY_ID", "AWS_ACCESS_KEY_ID", "S3_ACCESS_KEY") + if aws_key: + os.environ["AWS_ACCESS_KEY_ID"] = aws_key + + aws_secret = _env( + "COGNEE_AWS_SECRET_ACCESS_KEY", + "AWS_SECRET_ACCESS_KEY", + "S3_SECRET_KEY", + ) + if aws_secret: + os.environ["AWS_SECRET_ACCESS_KEY"] = aws_secret + + aws_endpoint = _env( + "COGNEE_AWS_ENDPOINT_URL", + "AWS_ENDPOINT_URL", + "S3_ENDPOINT", + ) + if aws_endpoint: + os.environ["AWS_ENDPOINT_URL"] = aws_endpoint + + aws_region = _env("COGNEE_AWS_REGION", "AWS_REGION", "S3_REGION") + if aws_region: + os.environ["AWS_REGION"] = aws_region + os.environ["AWS_DEFAULT_REGION"] = aws_region + backend_access = "true" if cognee.get("backend_access_control", True) else "false" os.environ["ENABLE_BACKEND_ACCESS_CONTROL"] = backend_access os.environ["GRAPH_DATABASE_PROVIDER"] = cognee.get("graph_database_provider", "kuzu") @@ -335,14 +661,29 @@ class ProjectConfigManager: os.environ["COGNEE_USER_ID"] = tenant_id os.environ["COGNEE_TENANT_ID"] = 
tenant_id + os.environ["COGNEE_PROJECT_ID"] = cognee.get("project_id", self._config.project.id) + service_user_email = cognee.get("service_user_email") + if service_user_email: + os.environ["COGNEE_SERVICE_USER_EMAIL"] = service_user_email + os.environ["DEFAULT_USER_EMAIL"] = service_user_email + service_password = _env( + "COGNEE_SERVICE_USER_PASSWORD", + "DEFAULT_USER_PASSWORD", + default=self._config.cognee.service_user_password, + ) + if service_password: + os.environ["COGNEE_SERVICE_USER_PASSWORD"] = service_password + os.environ["DEFAULT_USER_PASSWORD"] = service_password + if cognee.get("service_env_dir"): + os.environ["COGNEE_SERVICE_ENV_PATH"] = cognee["service_env_dir"] + + if cognee.get("service_url"): + os.environ["COGNEE_SERVICE_URL"] = cognee["service_url"] + if os.getenv("COGNEE_SERVICE_PORT"): + os.environ["COGNEE_SERVICE_PORT"] = os.getenv("COGNEE_SERVICE_PORT") + # Configure LLM provider defaults for Cognee. Values prefixed with COGNEE_ # take precedence so users can segregate credentials. 
- def _env(*names: str, default: str | None = None) -> str | None: - for name in names: - value = os.getenv(name) - if value: - return value - return default provider = _env( "LLM_COGNEE_PROVIDER", diff --git a/cognee/.env.service.example b/cognee/.env.service.example new file mode 100644 index 0000000..23267fd --- /dev/null +++ b/cognee/.env.service.example @@ -0,0 +1,52 @@ +# Cognee service environment template +# Copy to cognee/.env.service and adjust values before running docker-compose.cognee.yaml + +# --- FuzzForge tenant / project defaults --- +# --- Service defaults --- +COGNEE_STORAGE_MODE=service +COGNEE_SERVICE_URL=http://localhost:8000 +COGNEE_SERVICE_PORT=8000 +COGNEE_S3_BUCKET= +COGNEE_S3_PREFIX= +ENABLE_BACKEND_ACCESS_CONTROL=true + +# --- LLM provider (used by Cognee pipelines) --- +LLM_PROVIDER=openai +LLM_MODEL=gpt-4o-mini +OPENAI_API_KEY=replace_me +LLM_API_KEY=${OPENAI_API_KEY} + +# --- Hosted service user (per installation) --- +# Use a syntactically valid email (Pydantic enforces a real TLD) +COGNEE_SERVICE_USER_EMAIL=service@cognee.dev +COGNEE_SERVICE_USER_PASSWORD=change_me +DEFAULT_USER_EMAIL=${COGNEE_SERVICE_USER_EMAIL} +DEFAULT_USER_PASSWORD=${COGNEE_SERVICE_USER_PASSWORD} + +# --- S3 / MinIO configuration --- +# For an external MinIO (e.g. FuzzForge Temporal stack) set to http://host.docker.internal:9000 +# Point at external MinIO if needed, e.g. 
http://host.docker.internal:9000 +AWS_ENDPOINT_URL=http://minio:9000 +AWS_REGION=us-east-1 +AWS_ACCESS_KEY_ID=fuzzforge +AWS_SECRET_ACCESS_KEY=fuzzforge123 +STORAGE_BACKEND=s3 +STORAGE_BUCKET_NAME=${COGNEE_S3_BUCKET} + +# --- Postgres metadata database --- +DB_PROVIDER=postgres +DB_HOST=postgres +DB_PORT=5432 +DB_NAME=cognee_db +DB_USERNAME=cognee +DB_PASSWORD=cognee + +# Cognee data/system roots (must match mounted volumes) +SYSTEM_ROOT_DIRECTORY=/app/.cognee_system +DATA_ROOT_DIRECTORY=/app/.data_storage +# Optional cache override +# CACHE_ROOT_DIRECTORY=/app/.cognee_cache + +# --- Optional logging tweaks --- +COGNEE_LOG_LEVEL=INFO +LOG_LEVEL=INFO diff --git a/cognee/cognee b/cognee/cognee new file mode 160000 index 0000000..904cb17 --- /dev/null +++ b/cognee/cognee @@ -0,0 +1 @@ +Subproject commit 904cb1706253cf9ff51ebe754eb98e27b9f0626a diff --git a/docker-compose.cognee.yaml b/docker-compose.cognee.yaml new file mode 100644 index 0000000..fd23550 --- /dev/null +++ b/docker-compose.cognee.yaml @@ -0,0 +1,116 @@ +version: "3.9" + +x-cognee-env-file: &cognee_env_file ${COGNEE_SERVICE_ENV_FILE:-./cognee/.env.service} + +services: + cognee: + container_name: cognee-service + build: + context: ./cognee/cognee + dockerfile: Dockerfile + env_file: + - *cognee_env_file + environment: + ENABLE_BACKEND_ACCESS_CONTROL: "true" + STORAGE_BACKEND: "s3" + DATA_ROOT_DIRECTORY: ${DATA_ROOT_DIRECTORY:-s3://cognee-bucket/data} + SYSTEM_ROOT_DIRECTORY: ${SYSTEM_ROOT_DIRECTORY:-s3://cognee-bucket/system} + # Uppercase for standard env + AWS_ENDPOINT_URL: ${AWS_ENDPOINT_URL:-http://minio:9000} + AWS_REGION: ${AWS_REGION:-us-east-1} + AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID:-fuzzforge} + AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY:-fuzzforge123} + # Lowercase for boto3/s3fs + aws_endpoint_url: ${AWS_ENDPOINT_URL:-http://minio:9000} + aws_region: ${AWS_REGION:-us-east-1} + aws_access_key_id: ${AWS_ACCESS_KEY_ID:-fuzzforge} + aws_secret_access_key: 
${AWS_SECRET_ACCESS_KEY:-fuzzforge123} + # For LanceDB object_store + S3_ENDPOINT: ${AWS_ENDPOINT_URL:-http://minio:9000} + S3_REGION: ${AWS_REGION:-us-east-1} + S3_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID:-fuzzforge} + S3_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY:-fuzzforge123} + S3_ALLOW_HTTP: "true" + DB_PROVIDER: ${DB_PROVIDER:-postgres} + DB_HOST: ${DB_HOST:-postgres} + DB_PORT: ${DB_PORT:-5432} + DB_NAME: ${DB_NAME:-cognee_db} + DB_USERNAME: ${DB_USERNAME:-cognee} + DB_PASSWORD: ${DB_PASSWORD:-cognee} + HOST: 0.0.0.0 + LOG_LEVEL: ${COGNEE_LOG_LEVEL:-INFO} + volumes: + - cognee-system:/app/.cognee_system + - cognee-data:/app/.data_storage + ports: + - "${COGNEE_SERVICE_PORT:-8000}:8000" + depends_on: + - postgres + networks: + - cognee + + postgres: + image: pgvector/pgvector:pg17 + container_name: cognee-postgres + env_file: + - *cognee_env_file + environment: + POSTGRES_USER: ${DB_USERNAME:-cognee} + POSTGRES_PASSWORD: ${DB_PASSWORD:-cognee} + POSTGRES_DB: ${DB_NAME:-cognee_db} + volumes: + - cognee-postgres:/var/lib/postgresql/data + ports: + - "5432:5432" + networks: + - cognee + + minio: + image: minio/minio:latest + container_name: cognee-minio + command: server /data --console-address ":9001" + env_file: + - *cognee_env_file + environment: + MINIO_ROOT_USER: ${AWS_ACCESS_KEY_ID:-fuzzforge} + MINIO_ROOT_PASSWORD: ${AWS_SECRET_ACCESS_KEY:-fuzzforge123} + volumes: + - cognee-minio:/data + ports: + - "9000:9000" + - "9001:9001" + networks: + - cognee + profiles: + - self-hosted-minio + + minio-mc: + image: minio/mc:latest + container_name: cognee-minio-mc + depends_on: + minio: + condition: service_started + env_file: + - *cognee_env_file + entrypoint: | + /bin/sh -c " + sleep 3 && \ + mc alias set cognee ${AWS_ENDPOINT_URL:-http://minio:9000} ${AWS_ACCESS_KEY_ID:-fuzzforge} ${AWS_SECRET_ACCESS_KEY:-fuzzforge123} && \ + mc mb -p cognee/${STORAGE_BUCKET_NAME:-cognee-bucket} || true && \ + mc anonymous set private cognee/${STORAGE_BUCKET_NAME:-cognee-bucket} && \ 
+      tail -f /dev/null
+      "
+    networks:
+      - cognee
+    profiles:
+      - self-hosted-minio
+
+networks:
+  cognee:
+    driver: bridge
+
+volumes:
+  cognee-system:
+  cognee-data:
+  cognee-postgres:
+  cognee-minio:
diff --git a/docs/docs/ai/a2a-services.md b/docs/docs/ai/a2a-services.md
index aac4c94..75f0c94 100644
--- a/docs/docs/ai/a2a-services.md
+++ b/docs/docs/ai/a2a-services.md
@@ -12,7 +12,7 @@ Run the command from a project directory that already contains `.fuzzforge/`. Th
 
 **Default directories**
 - Logs: `.fuzzforge/logs/cognee.log`
-- Cognee datasets: `.fuzzforge/cognee/project_<project_id>/{data,system}`
+- Cognee datasets: `.fuzzforge/cognee/project_<project_id>/{data,system}` in embedded mode, or `s3://<bucket>/cognee/projects/<project_id>/` when the service backend is active.
 - Artifact cache: `.fuzzforge/artifacts`
 
 ## HTTP Endpoints
diff --git a/docs/docs/ai/architecture.md b/docs/docs/ai/architecture.md
index 60f334b..73722e5 100644
--- a/docs/docs/ai/architecture.md
+++ b/docs/docs/ai/architecture.md
@@ -140,7 +140,7 @@ graph LR
 - **Session persistence** is controlled by `SESSION_PERSISTENCE`. When set to `sqlite`, ADK’s `DatabaseSessionService` writes transcripts to the path configured by `SESSION_DB_PATH` (defaults to `./fuzzforge_sessions.db`). With `inmemory`, the context is scoped to the current process.
 - **Semantic recall** stores vector embeddings so `/recall` queries can surface earlier prompts, even after restarts when using SQLite.
 - **Hybrid memory manager** (`HybridMemoryManager`) stitches Cognee results into the ADK session. When a knowledge query hits Cognee, the relevant nodes are appended back into the session context so follow-up prompts can reference them naturally.
-- **Cognee datasets** are unique per project. Ingestion runs populate `<project>_codebase` while custom calls to `ingest_to_dataset` let you maintain dedicated buckets (e.g., `insights`). Data is persisted inside `.fuzzforge/cognee/project_<project_id>/` and shared across CLI and A2A modes.
+- **Cognee datasets** are unique per project.
Ingestion runs populate `<project>_codebase` while custom calls to `ingest_to_dataset` let you maintain dedicated buckets (e.g., `insights`). Data is persisted inside `.fuzzforge/cognee/project_<project_id>/` when running embedded, or under `s3://<bucket>/cognee/projects/<project_id>/` when the hosted Cognee service is enabled.
 - **Task metadata** (workflow runs, artifact descriptors) lives in the executor’s in-memory caches but is also mirrored through A2A task events so remote agents can resubscribe if the CLI restarts.
 - **Operational check**: Run `/recall <term>` or `You> search project knowledge for "topic" using INSIGHTS` after ingestion to confirm both ADK session recall and Cognee graph access are active.
 - **CLI quick check**: `/memory status` summarises the current memory type, session persistence, and Cognee dataset directories from inside the agent shell.
diff --git a/docs/docs/ai/configuration.md b/docs/docs/ai/configuration.md
index cb42783..5d22585 100644
--- a/docs/docs/ai/configuration.md
+++ b/docs/docs/ai/configuration.md
@@ -81,6 +81,23 @@ LLM_COGNEE_API_KEY=sk-your-key
 If the Cognee variables are omitted, graph-specific tools remain available but return a friendly "not configured" response.
 
+### Hosted Cognee Service
+
+See [Hosted Cognee Service](./cognee-service.md) for step-by-step instructions on starting the shared backend with Docker.
+
+When you want multiple projects to share a dedicated Cognee backend, point the CLI at the service and shared S3 bucket:
+
+```env
+COGNEE_STORAGE_MODE=service
+COGNEE_SERVICE_URL=http://localhost:8000
+COGNEE_S3_BUCKET=cognee-shared
+COGNEE_S3_PREFIX=cognee/projects
+COGNEE_SERVICE_USER_EMAIL=project_12345678@cognee.local
+COGNEE_SERVICE_USER_PASSWORD=super-secret
+```
+
+During initialisation the CLI writes these values to `.fuzzforge/cognee/service/project_<project_id>/.env`. Each project gets its own scoped dataset (default `<project>_codebase`) while the service persists metadata in `s3://<bucket>/<prefix>/<project_id>/` using the project and tenant identifiers.
+
 
 ## MCP / Backend Integration
 
 ```env
diff --git a/docs/docs/ai/ingestion.md b/docs/docs/ai/ingestion.md
index 0af3c9e..bbc8669 100644
--- a/docs/docs/ai/ingestion.md
+++ b/docs/docs/ai/ingestion.md
@@ -38,14 +38,14 @@ All runs automatically skip `.fuzzforge/**` and `.git/**` to avoid recursive ing
 
 - Primary dataset: `<project>_codebase`
 - Additional datasets: create ad-hoc buckets such as `insights` via the `ingest_to_dataset` tool
-- Storage location: `.fuzzforge/cognee/project_<project_id>/`
+- Storage location: `.fuzzforge/cognee/project_<project_id>/` when running embedded, or `s3://<bucket>/cognee/projects/<project_id>/` when using the Cognee service mode.
 
 ### Persistence Details
 
-- Every dataset lives under `.fuzzforge/cognee/project_<project_id>/{data,system}`. These directories are safe to commit to long-lived storage (they only contain embeddings and metadata).
+- Every dataset lives under `.fuzzforge/cognee/project_<project_id>/{data,system}` when running locally. In service mode the same layout is mirrored to a shared S3 bucket so multiple projects can reuse the hosted Cognee instance without colliding.
 - Cognee assigns deterministic IDs per project; if you move the repository, copy the entire `.fuzzforge/cognee/` tree to retain graph history.
 - `HybridMemoryManager` ensures answers from Cognee are written back into the ADK session store so future prompts can refer to the same nodes without repeating the query.
-- All Cognee processing runs locally against the files you ingest. No external service calls are made unless you configure a remote Cognee endpoint.
+- In embedded mode all Cognee processing runs locally against the files you ingest. When `COGNEE_STORAGE_MODE=service`, the CLI streams files to the Cognee API, which stores them in the shared S3 prefix and runs the pipeline remotely before results flow back into the agent session.
## Prompt Examples @@ -77,6 +77,12 @@ FUZZFORGE_MCP_URL=http://localhost:8010/mcp LLM_COGNEE_PROVIDER=openai LLM_COGNEE_MODEL=gpt-5-mini LLM_COGNEE_API_KEY=sk-your-key + +# Optional: hosted Cognee service +COGNEE_STORAGE_MODE=service +COGNEE_SERVICE_URL=http://localhost:8000 +COGNEE_S3_BUCKET=cognee-shared +COGNEE_S3_PREFIX=cognee/projects ``` Add comments or project-specific overrides as needed; the agent reads these variables on startup.