Add event-driven Cognee ingestion pipeline

This commit is contained in:
Songbird
2025-11-07 17:21:18 +01:00
parent 83244ee537
commit 3ef6bf2437
17 changed files with 795 additions and 82 deletions

View File

@@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
### ✨ Enhancements
- Added Ladybug-backed Cognee integration with optional MinIO/S3 storage. Projects can now set `COGNEE_STORAGE_BACKEND=s3` (plus the `COGNEE_S3_*` settings) to keep knowledge graphs in the shared MinIO bucket seeded by `docker-compose`, enabling multi-tenant ingestion across workers and containers.
- Introduced a dedicated Cognee service (`docker/docker-compose.cognee.yml`) and HTTP client so `fuzzforge ingest` streams data to the shared backend (`COGNEE_SERVICE_URL`) instead of importing Cognee locally. Each project now auto-provisions its own Cognee account/tenant and authenticates via the REST API, keeping datasets isolated even though the service is shared.
- Added an event-driven ingestion pipeline: MinIO publishes `PUT` events from `s3://cognee/projects/<project-id>/...` to RabbitMQ, and the new `ingestion-dispatcher` container downloads the file, logs into Cognee as that project's tenant, and invokes `/api/v1/add` + `/api/v1/cognify`. Uploading files (rsync, CI, etc.) now keeps datasets fresh without touching the CLI.
### 📝 Documentation
- Added comprehensive worker startup documentation across all guides
- Added workflow-to-worker mapping tables in README, troubleshooting guide, getting started guide, and docker setup guide

View File

@@ -60,6 +60,8 @@ _Setting up and running security workflows through the interface_
- 🤖 **AI Agents for Security** Specialized agents for AppSec, reversing, and fuzzing
- 🛠 **Workflow Automation** Define & execute AppSec workflows as code
- 🧠 **Knowledge Graphs backed by Cognee** Multi-tenant Ladybug graphs stored in MinIO/S3 and reachable as a shared service for every project
- ⚡ **Event-Driven Ingestion** Upload files to MinIO and let RabbitMQ + the dispatcher stream them into Cognee datasets automatically
- 📈 **Vulnerability Research at Scale** Rediscover 1-days & find 0-days with automation
- 🔗 **Fuzzer Integration** Atheris (Python), cargo-fuzz (Rust), OSS-Fuzz campaigns
- 🌐 **Community Marketplace** Share workflows, corpora, PoCs, and modules
@@ -157,6 +159,9 @@ cp volumes/env/.env.template volumes/env/.env
# 3. Start FuzzForge with Temporal
docker compose up -d
# 3b. Start the shared Cognee service (Ladybug + MinIO)
docker compose -f docker/docker-compose.cognee.yml up -d
# 4. Start the Python worker (needed for security_assessment workflow)
docker compose up -d worker-python
```
@@ -186,6 +191,19 @@ ff workflow run security_assessment . # Start workflow - CLI uploads files au
# - Create a compressed tarball
# - Upload to backend (via MinIO)
# - Start the workflow on vertical worker
### Automated Cognee Ingestion
Uploading files into MinIO automatically streams them into Cognee:
```
s3://cognee/projects/<project-id>/
files/... # → <project-id>_codebase dataset
findings/... # → <project-id>_findings dataset
docs/... # → <project-id>_docs dataset
```
MinIO emits the object-created event to RabbitMQ, the `ingestion-dispatcher` downloads the file and calls the Cognee REST API on behalf of that project's tenant. Use any upload mechanism you like (`aws s3 cp`, rsync to MinIO, etc.); once the object lands in the bucket it is ingested and cognified automatically.
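For example, a minimal boto3 sketch that drops a source file into the `files/` prefix (the endpoint and credentials are the docker-compose defaults; the project id is a placeholder — adjust bucket/prefix to whatever your MinIO notification watches):
```python
# Sketch: upload a file so the dispatcher ingests it into <project-id>_codebase.
# Endpoint and credentials are the docker-compose defaults; the project id is a
# placeholder taken from your .fuzzforge project metadata.
import boto3

s3 = boto3.client(
    "s3",
    endpoint_url="http://localhost:9000",
    aws_access_key_id="fuzzforge",
    aws_secret_access_key="fuzzforge123",
    region_name="us-east-1",
)

project_id = "3f2a9c1e"  # placeholder project id
s3.upload_file("src/main.py", "cognee", f"projects/{project_id}/files/src/main.py")
```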
**What's running:**

View File

@@ -0,0 +1,159 @@
"""HTTP client for the Cognee REST API."""
from __future__ import annotations
from pathlib import Path
from typing import Any, Iterable, Sequence
import httpx
class CogneeApiError(RuntimeError):
"""Raised when the Cognee API returns an error status."""
def __init__(self, message: str, *, status_code: int | None = None):
super().__init__(message)
self.status_code = status_code
class CogneeApiClient:
"""Async client for interacting with the Cognee service."""
def __init__(
self,
base_url: str,
api_key: str | None = None,
*,
email: str | None = None,
password: str | None = None,
timeout: float = 180.0,
):
base = base_url.rstrip("/")
headers = {}
if api_key:
headers["Authorization"] = f"Bearer {api_key}"
self._client = httpx.AsyncClient(
base_url=base,
timeout=httpx.Timeout(timeout),
follow_redirects=True,
headers=headers,
)
self._email = email
self._password = password
self._token: str | None = None
async def __aenter__(self) -> "CogneeApiClient":
return self
async def __aexit__(self, exc_type, exc, tb) -> None:
await self.close()
async def close(self) -> None:
await self._client.aclose()
async def ensure_authenticated(self) -> None:
"""Ensure we have a bearer token before making privileged calls."""
if self._client.headers.get("Authorization") or self._token:
return
if not (self._email and self._password):
# Service might be running with authentication disabled.
return
try:
await self.register_user(self._email, self._password)
except CogneeApiError as exc:
if exc.status_code not in (400, 409):
raise
token = await self.login(self._email, self._password)
self._token = token
self._client.headers["Authorization"] = f"Bearer {token}"
async def register_user(self, email: str, password: str) -> Any:
payload = {
"email": email,
"password": password,
"is_active": True,
"is_verified": True,
}
response = await self._client.post("/api/v1/auth/register", json=payload)
return self._handle_response(response)
async def login(self, email: str, password: str) -> str:
data = {"username": email, "password": password}
response = await self._client.post("/api/v1/auth/login", data=data)
payload = self._handle_response(response)
token = payload.get("access_token")
if not token:
raise CogneeApiError("Cognee auth response did not include an access_token")
return token
async def add_files(self, file_paths: Iterable[Path], dataset_name: str) -> Any:
await self.ensure_authenticated()
files: list[tuple[str, tuple[str, bytes, str]]] = []
for path in file_paths:
data = path.read_bytes()
files.append(("data", (path.name, data, "application/octet-stream")))
data = {"datasetName": dataset_name}
response = await self._client.post("/api/v1/add", data=data, files=files)
return self._handle_response(response)
async def add_texts(self, texts: Sequence[str], dataset_name: str) -> Any:
await self.ensure_authenticated()
files: list[tuple[str, tuple[str, bytes, str]]] = []
for idx, text in enumerate(texts):
data = text.encode("utf-8")
files.append(("data", (f"snippet_{idx}.txt", data, "text/plain")))
response = await self._client.post(
"/api/v1/add",
data={"datasetName": dataset_name},
files=files,
)
return self._handle_response(response)
async def cognify(self, datasets: Sequence[str]) -> Any:
await self.ensure_authenticated()
payload = {"datasets": list(datasets), "run_in_background": False}
response = await self._client.post("/api/v1/cognify", json=payload)
return self._handle_response(response)
async def search(
self,
*,
query: str,
search_type: str,
datasets: Sequence[str] | None = None,
top_k: int | None = None,
only_context: bool = False,
) -> Any:
await self.ensure_authenticated()
payload: dict[str, object] = {
"query": query,
"search_type": search_type,
"only_context": only_context,
}
if datasets:
payload["datasets"] = list(datasets)
if top_k is not None:
payload["top_k"] = top_k
response = await self._client.post("/api/v1/search", json=payload)
return self._handle_response(response)
def _handle_response(self, response: httpx.Response) -> Any:
try:
response.raise_for_status()
except httpx.HTTPStatusError as exc: # pragma: no cover - surfaced to caller
message = exc.response.text
raise CogneeApiError(
f"Cognee API request failed ({exc.response.status_code}): {message}",
status_code=exc.response.status_code,
) from exc
if response.content:
return response.json()
return {}
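# --- Usage sketch (not part of this module) ---------------------------------
# Minimal example of driving CogneeApiClient: ingest two files, cognify the
# dataset, then run a CHUNKS search. The URL and credentials are placeholders.
#
# import asyncio
# from pathlib import Path
#
# async def _demo() -> None:
#     async with CogneeApiClient(
#         "http://localhost:18000",
#         email="project_demo@fuzzforge.dev",   # placeholder account
#         password="change-me-please-1234",     # placeholder password
#     ) as client:
#         dataset = "demo_codebase"
#         await client.add_files([Path("README.md"), Path("main.py")], dataset)
#         await client.cognify([dataset])
#         chunks = await client.search(
#             query="entry points",
#             search_type="CHUNKS",
#             datasets=[dataset],
#             top_k=5,
#         )
#         print(chunks)
#
# asyncio.run(_demo())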

View File

@@ -23,6 +23,7 @@ from rich.console import Console
from rich.prompt import Confirm
from ..config import ProjectConfigManager
from ..cognee_api import CogneeApiClient, CogneeApiError
from ..ingest_utils import collect_ingest_files
console = Console()
@@ -92,23 +93,18 @@ def ingest_callback(
config.setup_cognee_environment()
if os.getenv("FUZZFORGE_DEBUG", "0") == "1":
storage_backend = os.getenv("COGNEE_STORAGE_BACKEND", "local")
console.print(
"[dim]Cognee directories:\n"
f" DATA: {os.getenv('COGNEE_DATA_ROOT', 'unset')}\n"
f" SYSTEM: {os.getenv('COGNEE_SYSTEM_ROOT', 'unset')}\n"
f" USER: {os.getenv('COGNEE_USER_ID', 'unset')}\n",
f" USER: {os.getenv('COGNEE_USER_ID', 'unset')}\n"
f" STORAGE: {storage_backend}\n",
)
project_context = config.get_project_context()
target_path = path or Path.cwd()
dataset_name = dataset or f"{project_context['project_name']}_codebase"
try:
import cognee # noqa: F401 # Just to validate installation
except ImportError as exc:
console.print("[red]Cognee is not installed.[/red]")
console.print("Install with: pip install 'cognee[all]' litellm")
raise typer.Exit(1) from exc
dataset_name = dataset or f"{project_context['project_id']}_codebase"
console.print(f"[bold]🔍 Ingesting {target_path} into Cognee knowledge graph[/bold]")
console.print(
@@ -155,10 +151,21 @@ async def _run_ingestion(
force: bool,
) -> None:
"""Perform the actual ingestion work."""
from fuzzforge_ai.cognee_service import CogneeService
cognee_cfg = config.get_cognee_config()
service_url = (
cognee_cfg.get("service_url")
or os.getenv("COGNEE_SERVICE_URL")
or "http://localhost:18000"
)
service_email = os.getenv("COGNEE_SERVICE_EMAIL") or cognee_cfg.get("service_email")
service_password = os.getenv("COGNEE_SERVICE_PASSWORD") or cognee_cfg.get("service_password")
cognee_service = CogneeService(config)
await cognee_service.initialize()
if not service_email or not service_password:
console.print(
"[red]Missing Cognee service credentials.[/red] Run `ff init` again or set "
"COGNEE_SERVICE_EMAIL / COGNEE_SERVICE_PASSWORD in .fuzzforge/.env."
)
return
# Always skip internal bookkeeping directories
exclude_patterns = list(exclude or [])
@@ -192,11 +199,9 @@ async def _run_ingestion(
console.print(f"Found [green]{len(files_to_ingest)}[/green] files to ingest")
if force:
console.print("Cleaning existing data for this project...")
try:
await cognee_service.clear_data(confirm=True)
except Exception as exc:
console.print(f"[yellow]Warning:[/yellow] Could not clean existing data: {exc}")
console.print(
"[yellow]Warning:[/yellow] Force re-ingest is not yet supported for the remote Cognee service."
)
console.print("Adding files to Cognee...")
valid_file_paths = []
@@ -213,39 +218,62 @@ async def _run_ingestion(
console.print("[yellow]No readable files found to ingest[/yellow]")
return
results = await cognee_service.ingest_files(valid_file_paths, dataset)
async with CogneeApiClient(
service_url,
email=service_email,
password=service_password,
) as client:
try:
await client.ensure_authenticated()
except CogneeApiError as exc:
console.print(f"[red]Cognee authentication failed:[/red] {exc}")
return
except Exception as exc:
console.print(f"[red]Cognee authentication error:[/red] {exc}")
return
try:
await client.add_files(valid_file_paths, dataset)
await client.cognify([dataset])
except CogneeApiError as exc:
console.print(f"[red]Cognee API error:[/red] {exc}")
return
except Exception as exc:
console.print(f"[red]Unexpected Cognee error:[/red] {exc}")
return
console.print(
f"[green]✅ Successfully ingested {results['success']} files into knowledge graph[/green]"
)
if results["failed"]:
console.print(
f"[yellow]⚠️ Skipped {results['failed']} files due to errors[/yellow]"
f"[green]✅ Successfully ingested {len(valid_file_paths)} files into knowledge graph[/green]"
)
try:
insights = await cognee_service.search_insights(
query=f"What insights can you provide about the {dataset} dataset?",
dataset=dataset,
)
if insights:
console.print(f"\n[bold]📊 Generated {len(insights)} insights:[/bold]")
for index, insight in enumerate(insights[:3], 1):
console.print(f" {index}. {insight}")
if len(insights) > 3:
console.print(f" ... and {len(insights) - 3} more")
chunks = await cognee_service.search_chunks(
query=f"functions classes methods in {dataset}",
dataset=dataset,
)
if chunks:
console.print(
f"\n[bold]🔍 Sample searchable content ({len(chunks)} chunks found):[/bold]"
try:
insights = await client.search(
query=f"What insights can you provide about the {dataset} dataset?",
search_type="INSIGHTS",
datasets=[dataset],
)
for index, chunk in enumerate(chunks[:2], 1):
preview = chunk[:100] + "..." if len(chunk) > 100 else chunk
console.print(f" {index}. {preview}")
except Exception:
# Best-effort stats — ignore failures here
pass
insight_list = insights if isinstance(insights, list) else insights.get("results", [])
if insight_list:
console.print(f"\n[bold]📊 Generated {len(insight_list)} insights:[/bold]")
for index, insight in enumerate(insight_list[:3], 1):
console.print(f" {index}. {insight}")
if len(insight_list) > 3:
console.print(f" ... and {len(insight_list) - 3} more")
chunks = await client.search(
query=f"functions classes methods in {dataset}",
search_type="CHUNKS",
datasets=[dataset],
top_k=5,
)
chunk_list = chunks if isinstance(chunks, list) else chunks.get("results", [])
if chunk_list:
console.print(
f"\n[bold]🔍 Sample searchable content ({len(chunk_list)} chunks found):[/bold]"
)
for index, chunk in enumerate(chunk_list[:2], 1):
text = str(chunk)
preview = text[:100] + "..." if len(text) > 100 else text
console.print(f" {index}. {preview}")
except Exception:
pass

View File

@@ -246,6 +246,17 @@ def _ensure_env_file(fuzzforge_dir: Path, force: bool) -> None:
"LLM_COGNEE_EMBEDDING_MODEL=litellm_proxy/text-embedding-3-large",
"LLM_COGNEE_EMBEDDING_ENDPOINT=http://localhost:10999",
"COGNEE_MCP_URL=",
"COGNEE_SERVICE_URL=http://localhost:18000",
"COGNEE_STORAGE_BACKEND=s3",
"COGNEE_SERVICE_EMAIL=",
"COGNEE_SERVICE_PASSWORD=",
"COGNEE_S3_BUCKET=cognee",
"COGNEE_S3_PREFIX=",
"COGNEE_S3_ENDPOINT=http://localhost:9000",
"COGNEE_S3_REGION=us-east-1",
"COGNEE_S3_ACCESS_KEY=",
"COGNEE_S3_SECRET_KEY=",
"COGNEE_S3_ALLOW_HTTP=1",
"",
"# Session persistence options: inmemory | sqlite",
"SESSION_PERSISTENCE=sqlite",

View File

@@ -21,7 +21,7 @@ from __future__ import annotations
import hashlib
import os
from pathlib import Path
from typing import Any, Dict, Optional
from typing import Any, Dict, Optional, Literal
try: # Optional dependency; fall back if not installed
from dotenv import load_dotenv
@@ -131,7 +131,19 @@ class CogneeConfig(BaseModel):
"""Cognee integration metadata."""
enabled: bool = True
graph_database_provider: str = "kuzu"
graph_database_provider: str = "ladybug"
service_url: str = "http://localhost:18000"
api_key: Optional[str] = None
service_email: Optional[str] = None
service_password: Optional[str] = None
storage_backend: Literal["local", "s3"] = "s3"
s3_bucket: Optional[str] = None
s3_prefix: Optional[str] = "projects"
s3_endpoint_url: Optional[str] = None
s3_region: Optional[str] = None
s3_access_key: Optional[str] = None
s3_secret_key: Optional[str] = None
s3_allow_http: bool = False
data_directory: Optional[str] = None
system_directory: Optional[str] = None
backend_access_control: bool = True
@@ -201,25 +213,55 @@ class FuzzForgeConfig(BaseModel):
cognee.tenant_id = self.project.tenant_id
changed = True
base_dir = project_dir / ".fuzzforge" / "cognee" / f"project_{self.project.id}"
data_dir = base_dir / "data"
system_dir = base_dir / "system"
for path in (
base_dir,
data_dir,
system_dir,
system_dir / "kuzu_db",
system_dir / "lancedb",
):
if not path.exists():
path.mkdir(parents=True, exist_ok=True)
if cognee.data_directory != str(data_dir):
cognee.data_directory = str(data_dir)
if not cognee.service_url:
cognee.service_url = "http://localhost:18000"
changed = True
if cognee.system_directory != str(system_dir):
cognee.system_directory = str(system_dir)
if not cognee.s3_prefix:
cognee.s3_prefix = "projects"
changed = True
default_email = f"project_{self.project.id}@fuzzforge.dev"
if not cognee.service_email or cognee.service_email.endswith(
("@cognee.local", "@cognee.localhost")
):
cognee.service_email = default_email
changed = True
derived_password = hashlib.sha256(self.project.id.encode()).hexdigest()[:20]
if not cognee.service_password or len(cognee.service_password) < 12:
cognee.service_password = derived_password
changed = True
if cognee.storage_backend.lower() == "s3":
bucket = cognee.s3_bucket or "cognee"
prefix = (cognee.s3_prefix or "projects").strip("/")
base_uri = f"s3://{bucket}/{prefix}/{self.project.id}"
data_dir = f"{base_uri}/files"
system_dir = f"{base_uri}/graph"
else:
base_dir = project_dir / ".fuzzforge" / "cognee" / f"project_{self.project.id}"
data_path = base_dir / "data"
system_path = base_dir / "system"
for path in (
base_dir,
data_path,
system_path,
system_path / "ladybug",
system_path / "lancedb",
):
if not path.exists():
path.mkdir(parents=True, exist_ok=True)
data_dir = str(data_path)
system_dir = str(system_path)
if cognee.data_directory != data_dir:
cognee.data_directory = data_dir
changed = True
if cognee.system_directory != system_dir:
cognee.system_directory = system_dir
changed = True
return changed
@@ -368,16 +410,67 @@ class ProjectConfigManager:
backend_access = "true" if cognee.get("backend_access_control", True) else "false"
os.environ["ENABLE_BACKEND_ACCESS_CONTROL"] = backend_access
os.environ["GRAPH_DATABASE_PROVIDER"] = cognee.get("graph_database_provider", "kuzu")
graph_provider = cognee.get("graph_database_provider", "ladybug")
os.environ["GRAPH_DATABASE_PROVIDER"] = graph_provider
service_url = cognee.get("service_url") or os.getenv("COGNEE_SERVICE_URL") or "http://localhost:18000"
os.environ["COGNEE_SERVICE_URL"] = service_url
api_key = os.getenv("COGNEE_API_KEY") or cognee.get("api_key")
if api_key:
os.environ["COGNEE_API_KEY"] = api_key
service_email = os.getenv("COGNEE_SERVICE_EMAIL") or cognee.get("service_email")
if service_email:
os.environ["COGNEE_SERVICE_EMAIL"] = service_email
service_password = os.getenv("COGNEE_SERVICE_PASSWORD") or cognee.get("service_password")
if service_password:
os.environ["COGNEE_SERVICE_PASSWORD"] = service_password
data_dir = cognee.get("data_directory")
system_dir = cognee.get("system_directory")
tenant_id = cognee.get("tenant_id", "fuzzforge_tenant")
storage_backend = cognee.get("storage_backend", "local").lower()
os.environ["COGNEE_STORAGE_BACKEND"] = storage_backend
if storage_backend == "s3":
os.environ["STORAGE_BACKEND"] = "s3"
bucket = os.getenv("COGNEE_S3_BUCKET") or cognee.get("s3_bucket") or "cognee"
os.environ["STORAGE_BUCKET_NAME"] = bucket
os.environ["COGNEE_S3_BUCKET"] = bucket
prefix_override = os.getenv("COGNEE_S3_PREFIX") or cognee.get("s3_prefix")
if prefix_override:
os.environ["COGNEE_S3_PREFIX"] = prefix_override
endpoint = os.getenv("COGNEE_S3_ENDPOINT") or cognee.get("s3_endpoint_url") or "http://localhost:9000"
os.environ["AWS_ENDPOINT_URL"] = endpoint
os.environ["COGNEE_S3_ENDPOINT"] = endpoint
region = os.getenv("COGNEE_S3_REGION") or cognee.get("s3_region") or "us-east-1"
os.environ["AWS_REGION"] = region
os.environ["COGNEE_S3_REGION"] = region
access_key = os.getenv("COGNEE_S3_ACCESS_KEY") or cognee.get("s3_access_key")
secret_key = os.getenv("COGNEE_S3_SECRET_KEY") or cognee.get("s3_secret_key")
if access_key:
os.environ.setdefault("AWS_ACCESS_KEY_ID", access_key)
os.environ["COGNEE_S3_ACCESS_KEY"] = access_key
if secret_key:
os.environ.setdefault("AWS_SECRET_ACCESS_KEY", secret_key)
os.environ["COGNEE_S3_SECRET_KEY"] = secret_key
allow_http_env = os.getenv("COGNEE_S3_ALLOW_HTTP")
allow_http_flag = allow_http_env if allow_http_env is not None else ("true" if cognee.get("s3_allow_http") else "false")
if allow_http_flag.lower() in {"1", "true", "yes"}:
os.environ["AWS_ALLOW_HTTP"] = "true"
os.environ["COGNEE_S3_ALLOW_HTTP"] = "1"
if data_dir:
os.environ["COGNEE_DATA_ROOT"] = data_dir
os.environ.setdefault("DATA_ROOT_DIRECTORY", data_dir)
if system_dir:
os.environ["COGNEE_SYSTEM_ROOT"] = system_dir
os.environ.setdefault("SYSTEM_ROOT_DIRECTORY", system_dir)
os.environ["COGNEE_USER_ID"] = tenant_id
os.environ["COGNEE_TENANT_ID"] = tenant_id

View File

@@ -97,6 +97,12 @@ services:
MINIO_ROOT_PASSWORD: fuzzforge123
# Lightweight mode for development (reduces memory to 256MB)
MINIO_CI_CD: "true"
MINIO_NOTIFY_AMQP_ENABLE_INGEST: "on"
MINIO_NOTIFY_AMQP_URL_INGEST: amqp://ingest:ingest@rabbitmq:5672
MINIO_NOTIFY_AMQP_EXCHANGE_INGEST: cognee-ingest
MINIO_NOTIFY_AMQP_EXCHANGE_TYPE_INGEST: fanout
MINIO_NOTIFY_AMQP_ROUTING_KEY_INGEST: ""
MINIO_NOTIFY_AMQP_DELIVERY_MODE_INGEST: "1"
volumes:
- minio_data:/data
networks:
@@ -129,11 +135,18 @@ services:
mc mb fuzzforge/targets --ignore-existing;
mc mb fuzzforge/results --ignore-existing;
mc mb fuzzforge/cache --ignore-existing;
mc mb fuzzforge/cognee --ignore-existing;
mc mb fuzzforge/projects --ignore-existing;
echo 'Configuring project ingestion events...';
mc event remove fuzzforge/projects arn:minio:sqs::ingest || true;
mc event add fuzzforge/projects arn:minio:sqs::ingest --event put;
echo 'Setting lifecycle policies...';
mc ilm add fuzzforge/targets --expiry-days 7;
mc ilm add fuzzforge/results --expiry-days 30;
mc ilm add fuzzforge/cache --expiry-days 3;
mc ilm add fuzzforge/cognee --expiry-days 90;
echo 'Setting access policies...';
mc anonymous set download fuzzforge/results;
@@ -144,6 +157,24 @@ services:
networks:
- fuzzforge-network
rabbitmq:
image: rabbitmq:3.13-management
container_name: fuzzforge-rabbitmq
environment:
RABBITMQ_DEFAULT_USER: ingest
RABBITMQ_DEFAULT_PASS: ingest
ports:
- "5672:5672"
- "15672:15672"
networks:
- fuzzforge-network
healthcheck:
test: ["CMD", "rabbitmq-diagnostics", "-q", "ping"]
interval: 10s
timeout: 5s
retries: 5
restart: unless-stopped
# ============================================================================
# LLM Proxy - LiteLLM Gateway
# ============================================================================
@@ -541,11 +572,33 @@ services:
networks:
- fuzzforge-network
restart: unless-stopped
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
interval: 30s
timeout: 10s
retries: 3
ingestion-dispatcher:
build:
context: ./services/ingestion_dispatcher
dockerfile: Dockerfile
container_name: fuzzforge-ingestion-dispatcher
depends_on:
rabbitmq:
condition: service_healthy
minio:
condition: service_healthy
environment:
RABBITMQ_URL: amqp://ingest:ingest@rabbitmq:5672/
RABBITMQ_EXCHANGE: cognee-ingest
RABBITMQ_QUEUE: cognee-ingestion-dispatcher
S3_ENDPOINT: http://minio:9000
S3_REGION: us-east-1
S3_BUCKET: projects
S3_ACCESS_KEY: fuzzforge
S3_SECRET_KEY: fuzzforge123
COGNEE_SERVICE_URL: http://fuzzforge-cognee:8000
DATASET_CATEGORY_MAP: files:codebase,findings:findings,docs:docs
EMAIL_DOMAIN: fuzzforge.dev
LOG_LEVEL: INFO
networks:
- fuzzforge-network
restart: unless-stopped
# ============================================================================
# Task Agent - A2A LiteLLM Agent

View File

@@ -0,0 +1,46 @@
version: "3.9"
services:
cognee:
build:
context: ../../cognee
dockerfile: Dockerfile
container_name: fuzzforge-cognee
env_file:
- ../volumes/env/.env
environment:
GRAPH_DATABASE_PROVIDER: ladybug
ENABLE_BACKEND_ACCESS_CONTROL: "true"
STORAGE_BACKEND: s3
STORAGE_BUCKET_NAME: ${COGNEE_S3_BUCKET:-cognee}
DATA_ROOT_DIRECTORY: s3://${COGNEE_S3_BUCKET:-cognee}/${COGNEE_S3_PREFIX:-projects}
SYSTEM_ROOT_DIRECTORY: s3://${COGNEE_S3_BUCKET:-cognee}/${COGNEE_S3_PREFIX:-projects}
CACHE_ROOT_DIRECTORY: s3://${COGNEE_S3_BUCKET:-cognee}/${COGNEE_S3_PREFIX:-projects}/cache
DB_PROVIDER: sqlite
DB_PATH: /data/relational
DB_NAME: cognee.db
MIGRATION_DB_PROVIDER: sqlite
MIGRATION_DB_PATH: /data/relational
MIGRATION_DB_NAME: cognee.db
AWS_ENDPOINT_URL: ${COGNEE_S3_ENDPOINT:-http://minio:9000}
AWS_REGION: ${COGNEE_S3_REGION:-us-east-1}
AWS_ACCESS_KEY_ID: ${COGNEE_S3_ACCESS_KEY:-fuzzforge}
AWS_SECRET_ACCESS_KEY: ${COGNEE_S3_SECRET_KEY:-fuzzforge123}
AWS_ALLOW_HTTP: ${COGNEE_S3_ALLOW_HTTP:-1}
LITELLM_PROXY_API_BASE: http://llm-proxy:4000
OPENAI_API_BASE: http://llm-proxy:4000
LLM_ENDPOINT: http://llm-proxy:4000
LOG_LEVEL: INFO
ENVIRONMENT: dev
ports:
- "18000:8000"
networks:
- fuzzforge-network
volumes:
- ../volumes/cognee-db:/data/relational
restart: unless-stopped
networks:
fuzzforge-network:
external: true
name: fuzzforge_temporal_network

View File

@@ -10,9 +10,9 @@ fuzzforge ai server
Run the command from a project directory that already contains `.fuzzforge/`. The server reads the project configuration and reuses the same environment variables as the CLI shell.
**Default directories**
**Default directories / services**
- Logs: `.fuzzforge/logs/cognee.log`
- Cognee datasets: `.fuzzforge/cognee/project_<id>/{data,system}`
- Cognee datasets: hosted by the shared Cognee service (`COGNEE_SERVICE_URL`) inside the configured MinIO/S3 bucket. Local mode falls back to `.fuzzforge/cognee/project_<id>/{data,system}`. Uploads dropped into `s3://cognee/projects/<project-id>/...` are ingested automatically via RabbitMQ + the dispatcher.
- Artifact cache: `.fuzzforge/artifacts`
## HTTP Endpoints

View File

@@ -73,7 +73,8 @@ sequenceDiagram
- **Remote agent registry** (`ai/src/fuzzforge_ai/remote_agent.py`) holds metadata for downstream agents and handles capability discovery over HTTP. Auto-registration is configured by `ConfigManager` so known agents attach on startup.
- **Memory services**:
- `FuzzForgeMemoryService` and `HybridMemoryManager` (`ai/src/fuzzforge_ai/memory_service.py`) provide conversation recall and bridge to Cognee datasets when configured.
- Cognee bootstrap (`ai/src/fuzzforge_ai/cognee_service.py`) ensures ingestion and knowledge queries stay scoped to the current project.
- Cognee bootstrap (`ai/src/fuzzforge_ai/cognee_service.py`) ensures ingestion and knowledge queries stay scoped to the current project and forwards them to the shared Cognee service (`COGNEE_SERVICE_URL`). Datasets live inside the configured MinIO/S3 bucket, with `.fuzzforge/cognee/` available only when `COGNEE_STORAGE_BACKEND=local`.
- MinIO bucket notifications push object-created events into RabbitMQ. The `ingestion-dispatcher` container listens on `cognee-ingest`, downloads the object, and invokes Cognee's REST API on behalf of the project's tenant so uploads become datasets without a manual CLI hop.
## Workflow Automation
@@ -91,7 +92,7 @@ The CLI surface mirrors these helpers as natural-language prompts (`You> run fuz
## Knowledge & Ingestion
- The `fuzzforge ingest` and `fuzzforge rag ingest` commands call into `ai/src/fuzzforge_ai/ingest_utils.py`, which filters file types, ignores caches, and populates Cognee datasets under `.fuzzforge/cognee/project_<id>/`.
- The `fuzzforge ingest` and `fuzzforge rag ingest` commands call into `ai/src/fuzzforge_ai/ingest_utils.py`, which filters file types, ignores caches, and streams files to the Cognee service where they are stored under `s3://<bucket>/<prefix>/project_<id>/`. When files land directly in `s3://cognee/projects/<project-id>/<category>/...`, the dispatcher performs the same workflow automatically via RabbitMQ events.
- Runtime queries hit `query_project_knowledge_api` on the executor, which defers to `cognee_service` for dataset lookup and semantic search. When Cognee credentials are absent the tools return a friendly "not configured" response.
## Artifact Pipeline
@@ -140,7 +141,7 @@ graph LR
- **Session persistence** is controlled by `SESSION_PERSISTENCE`. When set to `sqlite`, ADK's `DatabaseSessionService` writes transcripts to the path configured by `SESSION_DB_PATH` (defaults to `./fuzzforge_sessions.db`). With `inmemory`, the context is scoped to the current process.
- **Semantic recall** stores vector embeddings so `/recall` queries can surface earlier prompts, even after restarts when using SQLite.
- **Hybrid memory manager** (`HybridMemoryManager`) stitches Cognee results into the ADK session. When a knowledge query hits Cognee, the relevant nodes are appended back into the session context so follow-up prompts can reference them naturally.
- **Cognee datasets** are unique per project. Ingestion runs populate `<project>_codebase` while custom calls to `ingest_to_dataset` let you maintain dedicated buckets (e.g., `insights`). Data is persisted inside `.fuzzforge/cognee/project_<id>/` and shared across CLI and A2A modes.
- **Cognee datasets** are unique per project. Ingestion runs populate `<project>_codebase` while custom calls to `ingest_to_dataset` let you maintain dedicated buckets (e.g., `insights`). Data is persisted inside the Cognee service's bucket/prefix and is shared across CLI, HTTP server, and MCP integrations.
- **Task metadata** (workflow runs, artifact descriptors) lives in the executor's in-memory caches but is also mirrored through A2A task events so remote agents can resubscribe if the CLI restarts.
- **Operational check**: Run `/recall <keyword>` or `You> search project knowledge for "topic" using INSIGHTS` after ingestion to confirm both ADK session recall and Cognee graph access are active.
- **CLI quick check**: `/memory status` summarises the current memory type, session persistence, and Cognee dataset directories from inside the agent shell.

View File

@@ -81,6 +81,33 @@ LLM_COGNEE_API_KEY=sk-your-key
If the Cognee variables are omitted, graph-specific tools remain available but return a friendly "not configured" response.
### Cognee Storage Backend
Cognee defaults to local storage under `.fuzzforge/cognee/`, but you can mirror datasets to MinIO/S3 for multi-tenant or containerised deployments:
```env
COGNEE_STORAGE_BACKEND=s3
COGNEE_S3_BUCKET=cognee
COGNEE_S3_PREFIX=project_${PROJECT_ID}
COGNEE_S3_ENDPOINT=http://localhost:9000
COGNEE_S3_REGION=us-east-1
COGNEE_S3_ACCESS_KEY=fuzzforge
COGNEE_S3_SECRET_KEY=fuzzforge123
COGNEE_S3_ALLOW_HTTP=1
```
Set the values to match your MinIO/S3 endpoint; the docker compose stack seeds a `cognee` bucket automatically. When S3 mode is active, ingestion and search work exactly the same but Cognee writes metadata to `s3://<bucket>/<prefix>/project_<id>/{data,system}`.
### Cognee Service URL
The CLI and workers talk to Cognee over HTTP. Point `COGNEE_SERVICE_URL` at the service (defaults to `http://localhost:18000` when you run `docker/docker-compose.cognee.yml`) and provide `COGNEE_API_KEY` if you protect the API behind LiteLLM.
Every project gets its own Cognee login so datasets stay isolated. The CLI auto-derives an email/password pair (e.g., `project_<id>@fuzzforge.dev`) and registers it the first time you run `fuzzforge ingest`. Override those defaults by setting `COGNEE_SERVICE_EMAIL` / `COGNEE_SERVICE_PASSWORD` in `.fuzzforge/.env` before running ingestion if you need to reuse an existing account.
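For reference, a sketch of how those defaults are derived (mirroring the CLI logic; the project id below is a placeholder):
```python
# Sketch: deterministic per-project Cognee credentials, as derived by the CLI.
import hashlib

project_id = "3f2a9c1e"  # placeholder: taken from the .fuzzforge project metadata
email = f"project_{project_id}@fuzzforge.dev"
password = hashlib.sha256(project_id.encode()).hexdigest()[:20]
print(email, password)
```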
### MinIO Event Mapping
The ingestion dispatcher converts S3 prefixes to datasets using `DATASET_CATEGORY_MAP` (default `files:codebase,findings:findings,docs:docs`). Adjust it in `docker-compose.yml` if you want to add more categories or rename datasets.
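As a sketch, this is how the dispatcher resolves a dataset name from that map and an object key (keys are `<project-id>/<category>/...` relative to the watched bucket; the values here are placeholders):
```python
# Sketch: resolve a Cognee dataset from DATASET_CATEGORY_MAP and an object key.
raw_map = "files:codebase,findings:findings,docs:docs"
category_map = dict(pair.split(":", 1) for pair in raw_map.split(","))

key = "3f2a9c1e/files/src/main.py"  # <project-id>/<category>/<path>
project_id, category, *_ = key.split("/")
print(f"{project_id}_{category_map[category]}")  # -> 3f2a9c1e_codebase
```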
## MCP / Backend Integration
```env

View File

@@ -38,12 +38,13 @@ All runs automatically skip `.fuzzforge/**` and `.git/**` to avoid recursive ing
- Primary dataset: `<project>_codebase`
- Additional datasets: create ad-hoc buckets such as `insights` via the `ingest_to_dataset` tool
- Storage location: `.fuzzforge/cognee/project_<id>/`
- Storage location (service default): `s3://<bucket>/<prefix>/project_<id>/{data,system}` as defined by the Cognee service (the docker compose stack seeds a `cognee` bucket automatically).
- Local mode (opt-in): set `COGNEE_STORAGE_BACKEND=local` to fall back to `.fuzzforge/cognee/project_<id>/` when developing without MinIO.
### Persistence Details
- Every dataset lives under `.fuzzforge/cognee/project_<id>/{data,system}`. These directories are safe to commit to long-lived storage (they only contain embeddings and metadata).
- Cognee assigns deterministic IDs per project; if you move the repository, copy the entire `.fuzzforge/cognee/` tree to retain graph history.
- The Cognee service keeps datasets inside the configured bucket/prefix (`s3://<bucket>/<prefix>/project_<id>/{data,system}`) so every project has its own Ladybug + LanceDB pair. Local mode mirrors the same layout under `.fuzzforge/cognee/project_<id>/`.
- Cognee assigns deterministic IDs per project; copy the entire prefix (local or S3) if you migrate repositories to retain graph history.
- `HybridMemoryManager` ensures answers from Cognee are written back into the ADK session store so future prompts can refer to the same nodes without repeating the query.
- All Cognee processing runs locally against the files you ingest. No external service calls are made unless you configure a remote Cognee endpoint.
@@ -77,10 +78,40 @@ FUZZFORGE_MCP_URL=http://localhost:8010/mcp
LLM_COGNEE_PROVIDER=openai
LLM_COGNEE_MODEL=gpt-5-mini
LLM_COGNEE_API_KEY=sk-your-key
COGNEE_SERVICE_URL=http://localhost:18000
COGNEE_API_KEY=
```
The CLI auto-registers a dedicated Cognee account per project the first time you ingest (email pattern `project_<id>@fuzzforge.dev`). Set `COGNEE_SERVICE_EMAIL` / `COGNEE_SERVICE_PASSWORD` in `.fuzzforge/.env` if you prefer to reuse an existing login.
Switch the knowledge graph storage to S3/MinIO by adding:
```env
COGNEE_STORAGE_BACKEND=s3
COGNEE_S3_BUCKET=cognee
COGNEE_S3_PREFIX=project_${PROJECT_ID}
COGNEE_S3_ENDPOINT=http://localhost:9000
COGNEE_S3_ACCESS_KEY=fuzzforge
COGNEE_S3_SECRET_KEY=fuzzforge123
COGNEE_S3_ALLOW_HTTP=1
```
The default `docker-compose` stack already seeds a `cognee` bucket inside MinIO so these values work out-of-the-box. Point `COGNEE_SERVICE_URL` at the Cognee container (included in `docker/docker-compose.cognee.yml`) so `fuzzforge ingest` sends all requests to the shared service instead of importing Cognee locally.
Add comments or project-specific overrides as needed; the agent reads these variables on startup.
## Event-Driven Ingestion
Uploading files directly into MinIO triggers Cognee automatically. The dispatcher watches `s3://cognee/projects/<project-id>/...` and translates the top-level folder into a dataset:
| Prefix | Dataset name |
|-----------|---------------------------------------|
| `files/` | `<project-id>_codebase` |
| `findings/` | `<project-id>_findings` |
| `docs/` | `<project-id>_docs` |
Under the hood, MinIO publishes a `PUT` event → RabbitMQ (`cognee-ingest` exchange) → the `ingestion-dispatcher` container downloads the object and calls `/api/v1/add` + `/api/v1/cognify` using the deterministic project credentials (`project_<id>@fuzzforge.dev`). That means rsync, `aws s3 cp`, GitHub Actions, or any other tool that writes to the bucket can seed Cognee without touching the CLI.
## Tips
- Re-run ingestion after significant code changes to keep the knowledge graph fresh.

View File

@@ -231,6 +231,20 @@ nano volumes/env/.env
See [Getting Started](../tutorial/getting-started.md) for detailed environment setup.
### Cognee Service Stack
Cognee now runs as its own container so every project shares the same multi-tenant backend (Ladybug + LanceDB sitting on MinIO). After the core stack is running, bring the service online with:
```bash
docker compose -f docker/docker-compose.cognee.yml up -d
```
This spins up the Cognee API on `http://localhost:18000`, publishes it to the host, and stores knowledge graphs in the `cognee` bucket that the main compose file seeds. Point the CLI at it by setting `COGNEE_SERVICE_URL=http://localhost:18000` (already included in `.env.template`).
### RabbitMQ + Dispatcher
`docker-compose.yml` also launches RabbitMQ (`http://localhost:15672`, ingest/ingest) and the `ingestion-dispatcher` container. MinIO publishes `PUT` events from `s3://cognee/projects/<project-id>/...` to the `cognee-ingest` exchange, and the dispatcher downloads the object and calls Cognee's REST API. That means any rsync/upload into the project's bucket automatically becomes a dataset.
---
## Troubleshooting

View File

@@ -0,0 +1,11 @@
FROM python:3.12-slim
ENV PYTHONUNBUFFERED=1
WORKDIR /app
COPY requirements.txt ./
RUN pip install --no-cache-dir -r requirements.txt
COPY app.py ./
CMD ["python", "app.py"]

View File

@@ -0,0 +1,199 @@
import json
import logging
import os
import tempfile
from contextlib import contextmanager
from dataclasses import dataclass
from hashlib import sha256
from typing import Dict, Iterable, List, Optional
from urllib.parse import unquote_plus
import boto3
import httpx
import pika
from tenacity import retry, stop_after_attempt, wait_fixed
logging.basicConfig(level=os.getenv("LOG_LEVEL", "INFO"))
LOGGER = logging.getLogger("cognee-dispatcher")
@dataclass(frozen=True)
class Record:
bucket: str
key: str
project_id: str
category: str
dataset: str
class S3Client:
def __init__(self) -> None:
self.default_bucket = os.getenv("S3_BUCKET", "projects")
session = boto3.session.Session()
self.client = session.client(
"s3",
endpoint_url=os.getenv("S3_ENDPOINT", "http://minio:9000"),
aws_access_key_id=os.getenv("S3_ACCESS_KEY"),
aws_secret_access_key=os.getenv("S3_SECRET_KEY"),
region_name=os.getenv("S3_REGION", "us-east-1"),
)
@contextmanager
def download(self, key: str, bucket: Optional[str] = None):
tmp = tempfile.NamedTemporaryFile(delete=False)
tmp.close()
try:
target_bucket = bucket or self.default_bucket
self.client.download_file(target_bucket, key, tmp.name)
yield tmp.name
finally:
try:
os.unlink(tmp.name)
except FileNotFoundError:
pass
class CogneeApiClient:
def __init__(self, base_url: str, email: str, password: str) -> None:
self.base_url = base_url.rstrip("/")
self.email = email
self.password = password
self._token: Optional[str] = None
self._client = httpx.Client(timeout=httpx.Timeout(180.0))
def close(self) -> None:
self._client.close()
@retry(stop=stop_after_attempt(3), wait=wait_fixed(2))
def ensure_authenticated(self) -> None:
if self._token:
return
if not self._login():
LOGGER.info("Registering new Cognee user %s", self.email)
self._register()
if not self._login(): # pragma: no cover
raise RuntimeError("Unable to authenticate with Cognee service")
def _register(self) -> None:
response = self._client.post(
f"{self.base_url}/api/v1/auth/register",
json={"email": self.email, "password": self.password},
)
if response.status_code not in (200, 201, 400):
response.raise_for_status()
def _login(self) -> bool:
response = self._client.post(
f"{self.base_url}/api/v1/auth/login",
data={"username": self.email, "password": self.password},
)
if response.status_code != 200:
return False
self._token = response.json().get("access_token")
self._client.headers["Authorization"] = f"Bearer {self._token}"
return True
def add_file(self, file_path: str, dataset: str) -> None:
with open(file_path, "rb") as fh:
files = {"data": (os.path.basename(file_path), fh)}
data = {"datasetName": dataset}
response = self._client.post(f"{self.base_url}/api/v1/add", data=data, files=files)
if response.status_code not in (200, 201):
raise RuntimeError(f"Add failed: {response.text}")
def cognify(self, dataset: str) -> None:
payload = {"datasets": [dataset], "run_in_background": False}
response = self._client.post(f"{self.base_url}/api/v1/cognify", json=payload)
if response.status_code not in (200, 201):
raise RuntimeError(f"Cognify failed: {response.text}")
class Dispatcher:
def __init__(self) -> None:
self.s3 = S3Client()
self.cognee_url = os.getenv("COGNEE_SERVICE_URL", "http://fuzzforge-cognee:8000")
self.email_domain = os.getenv("EMAIL_DOMAIN", "fuzzforge.dev")
self.category_map = self._parse_category_map(os.getenv("DATASET_CATEGORY_MAP"))
@staticmethod
def _parse_category_map(raw: Optional[str]) -> Dict[str, str]:
mapping: Dict[str, str] = {}
if not raw:
return mapping
for pair in raw.split(","):
if ":" not in pair:
continue
category, suffix = pair.split(":", 1)
mapping[category.strip()] = suffix.strip()
return mapping
def handle_record(self, record: Record) -> None:
LOGGER.info("Processing %s -> dataset %s", record.key, record.dataset)
with self.s3.download(record.key, record.bucket) as local_path:
client = CogneeApiClient(
base_url=self.cognee_url,
email=self._service_email(record.project_id),
password=self._service_password(record.project_id),
)
try:
client.ensure_authenticated()
client.add_file(local_path, record.dataset)
client.cognify(record.dataset)
finally:
client.close()
def _service_email(self, project_id: str) -> str:
return f"project_{project_id}@{self.email_domain}"
def _service_password(self, project_id: str) -> str:
return sha256(project_id.encode()).hexdigest()[:20]
def parse_records(self, payload: Dict) -> Iterable[Record]:
for record in payload.get("Records", []):
s3_info = record.get("s3", {})
bucket = s3_info.get("bucket", {}).get("name")
key = unquote_plus(s3_info.get("object", {}).get("key", ""))
key_parts = key.split("/")
if len(key_parts) < 3:
LOGGER.debug("Skipping key without project/category: %s", key)
continue
project_id, category = key_parts[0], key_parts[1]
dataset_suffix = self.category_map.get(category)
if not dataset_suffix:
LOGGER.debug("Ignoring category %s for %s", category, key)
continue
dataset = f"{project_id}_{dataset_suffix}"
yield Record(bucket=bucket or self.s3.default_bucket, key="/".join(key_parts), project_id=project_id, category=category, dataset=dataset)
def main() -> None:
dispatcher = Dispatcher()
rabbit_url = os.getenv("RABBITMQ_URL", "amqp://ingest:ingest@rabbitmq:5672/")
exchange = os.getenv("RABBITMQ_EXCHANGE", "cognee-ingest")
queue_name = os.getenv("RABBITMQ_QUEUE", "cognee-ingestion-dispatcher")
connection = pika.BlockingConnection(pika.URLParameters(rabbit_url))
channel = connection.channel()
channel.exchange_declare(exchange=exchange, exchange_type="fanout", durable=True)
channel.queue_declare(queue=queue_name, durable=True)
channel.queue_bind(queue=queue_name, exchange=exchange)
channel.basic_qos(prefetch_count=1)
def _callback(ch, method, _properties, body):
try:
payload = json.loads(body.decode("utf-8"))
for record in dispatcher.parse_records(payload):
dispatcher.handle_record(record)
except Exception as exc: # pragma: no cover
LOGGER.exception("Failed to process event: %s", exc)
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
return
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(queue=queue_name, on_message_callback=_callback)
LOGGER.info("Ingestion dispatcher listening on %s", queue_name)
channel.start_consuming()
if __name__ == "__main__":
main()
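# --- Local testing sketch (not part of this module) --------------------------
# Publish a synthetic S3 "object created" event so the dispatcher processes it
# without going through MinIO. Exchange name and payload shape mirror the code
# above; the bucket, project id, and key are placeholders.
#
# import json
# import pika
#
# event = {
#     "Records": [
#         {
#             "s3": {
#                 "bucket": {"name": "projects"},
#                 "object": {"key": "3f2a9c1e/files/src/main.py"},
#             }
#         }
#     ]
# }
# connection = pika.BlockingConnection(
#     pika.URLParameters("amqp://ingest:ingest@localhost:5672/")
# )
# channel = connection.channel()
# channel.exchange_declare(exchange="cognee-ingest", exchange_type="fanout", durable=True)
# channel.basic_publish(exchange="cognee-ingest", routing_key="", body=json.dumps(event))
# connection.close()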

View File

@@ -0,0 +1,4 @@
boto3==1.34.146
pika==1.3.2
httpx==0.28.1
tenacity==9.0.0

View File

@@ -63,3 +63,16 @@ LLM_EMBEDDING_MODEL=litellm_proxy/text-embedding-3-large
# -----------------------------------------------------------------------------
UI_USERNAME=fuzzforge
UI_PASSWORD=fuzzforge123
# Cognee service configuration
COGNEE_MCP_URL=
COGNEE_SERVICE_URL=http://localhost:18000
COGNEE_API_KEY=
COGNEE_STORAGE_BACKEND=s3
COGNEE_S3_BUCKET=cognee
COGNEE_S3_PREFIX=projects
COGNEE_S3_ENDPOINT=http://localhost:9000
COGNEE_S3_REGION=us-east-1
COGNEE_S3_ACCESS_KEY=fuzzforge
COGNEE_S3_SECRET_KEY=fuzzforge123
COGNEE_S3_ALLOW_HTTP=1