Compare commits


29 Commits

Author SHA1 Message Date
tduhamel42
511a89c8c2 Update GitHub link to fuzzforge_ai 2025-11-04 17:42:52 +01:00
tduhamel42
321b9d5eed chore: bump all package versions to 0.7.3 for consistency 2025-11-04 14:04:33 +01:00
tduhamel42
7782e3917a docs: update CHANGELOG with missing versions and recent changes
- Add Unreleased section for post-v0.7.3 documentation updates
- Add v0.7.2 entry with bug fixes and worker improvements
- Document that v0.7.1 was re-tagged as v0.7.2
- Fix v0.6.0 date to "Undocumented" (no tag exists)
- Add version comparison links for easier navigation
2025-11-04 14:04:33 +01:00
tduhamel42
e33c611711 chore: add worker startup documentation and cleanup .gitignore
- Add workflow-to-worker mapping tables across documentation
- Update troubleshooting guide with worker requirements section
- Enhance getting started guide with worker examples
- Add quick reference to docker setup guide
- Add WEEK_SUMMARY*.md pattern to .gitignore
2025-11-04 14:04:33 +01:00
tduhamel42
bdcedec091 docs: fix broken documentation links in cli-reference 2025-11-04 14:04:33 +01:00
tduhamel42
1a835b95ee chore: bump version to 0.7.3 2025-11-04 14:04:33 +01:00
tduhamel42
d005521c78 fix: MobSF scanner now properly parses files dict structure
MobSF returns 'files' as a dict (not list):
{"filename": "line_numbers"}

The parser was treating it as a list, causing zero findings
to be extracted. Now properly iterates over the dict and
creates one finding per affected file with correct line numbers
and metadata (CWE, OWASP, MASVS, CVSS).

Fixed in both code_analysis and behaviour sections.
2025-11-04 14:04:33 +01:00
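A minimal sketch of the dict-based parsing this commit describes. The `files` shape (`{"filename": "line_numbers"}`) and the metadata keys follow the commit text; the function name and finding layout are illustrative, not the actual parser.

```python
# Hypothetical helper mirroring the fix: iterate the 'files' dict instead of
# treating it as a list, emitting one finding per affected file.
def findings_from_mobsf_entry(rule_id: str, entry: dict) -> list[dict]:
    findings = []
    # MobSF reports affected files as {"path/To/File.java": "12,34"}.
    for file_path, line_numbers in entry.get("files", {}).items():
        findings.append({
            "rule_id": rule_id,
            "file": file_path,
            "lines": [int(n) for n in str(line_numbers).split(",") if n.strip().isdigit()],
            "metadata": {
                "cwe": entry.get("cwe"),
                "owasp": entry.get("owasp"),
                "masvs": entry.get("masvs"),
                "cvss": entry.get("cvss"),
            },
        })
    return findings
```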
tduhamel42
9a7138fdb6 feat(cli): add worker management commands with improved progress feedback
Add comprehensive CLI commands for managing Temporal workers:
- ff worker list - List workers with status and uptime
- ff worker start <name> - Start specific worker with optional rebuild
- ff worker stop - Safely stop all workers without affecting core services

Improvements:
- Live progress display during worker startup with Rich Status spinner
- Real-time elapsed time counter and container state updates
- Health check status tracking (starting → unhealthy → healthy)
- Helpful contextual hints at 10s, 30s, 60s intervals
- Better timeout messages showing last known state

Worker management enhancements:
- Use 'docker compose' (space) instead of 'docker-compose' (hyphen)
- Stop workers individually with 'docker stop' to avoid stopping core services
- Platform detection and Dockerfile selection (ARM64/AMD64)

Documentation:
- Updated docker-setup.md with CLI commands as primary method
- Created comprehensive cli-reference.md with all commands and examples
- Added worker management best practices
2025-11-04 14:04:33 +01:00
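A rough sketch of the live startup feedback described in this commit, assuming a Rich status spinner and a polling callback; `get_state` is a hypothetical stand-in for the real Docker health checks.

```python
import time
from rich.console import Console

console = Console()

# Sketch only: poll a worker's container state with a live spinner, elapsed
# counter, and contextual hints at 10s/30s/60s, as the commit describes.
def wait_for_worker(name: str, get_state, timeout: float = 120.0) -> str:
    start = time.monotonic()
    with console.status(f"Starting worker '{name}'...") as status:
        while time.monotonic() - start < timeout:
            elapsed = int(time.monotonic() - start)
            state = get_state(name)  # e.g. "starting" -> "unhealthy" -> "healthy"
            status.update(f"[{elapsed}s] {name}: {state}")
            if state == "healthy":
                return state
            if elapsed in (10, 30, 60):
                console.log(f"Still waiting ({elapsed}s): first builds can take a while")
            time.sleep(1.0)
    return "timeout"
```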
Songbird
8bf5e1bb77 refactor: replace .env.example with .env.template in documentation
- Remove volumes/env/.env.example file
- Update all documentation references to use .env.template instead
- Update bootstrap script error message
- Update .gitignore comment
2025-11-04 14:04:33 +01:00
Songbird
97d8af4c52 fix: add default values to llm_analysis workflow parameters
Resolves validation error where agent_url was None when not explicitly provided. The TemporalManager applies defaults from metadata.yaml, not from module input schemas, so all parameters need defaults in the workflow metadata.

Changes:
- Add default agent_url, llm_model (gpt-5-mini), llm_provider (openai)
- Expand file_patterns to 45 comprehensive patterns covering code, configs, secrets, and Docker files
- Increase default limits: max_files (10), max_file_size (100KB), timeout (90s)
2025-11-04 14:04:33 +01:00
Songbird99
f77c3ff1e9 Feature/litellm proxy (#27)
* feat: seed governance config and responses routing

* Add env-configurable timeout for proxy providers

* Integrate LiteLLM OTEL collector and update docs

* Make .env.litellm optional for LiteLLM proxy

* Add LiteLLM proxy integration with model-agnostic virtual keys

Changes:
- Bootstrap generates 3 virtual keys with individual budgets (CLI: $100, Task-Agent: $25, Cognee: $50)
- Task-agent loads config at runtime via entrypoint script to wait for bootstrap completion
- All keys are model-agnostic by default (no LITELLM_DEFAULT_MODELS restrictions)
- Bootstrap handles database/env mismatch after docker prune by deleting stale aliases
- CLI and Cognee configured to use LiteLLM proxy with virtual keys
- Added comprehensive documentation in volumes/env/README.md

Technical details:
- task-agent entrypoint waits for keys in .env file before starting uvicorn
- Bootstrap creates/updates TASK_AGENT_API_KEY, COGNEE_API_KEY, and OPENAI_API_KEY
- Removed hardcoded API keys from docker-compose.yml
- All services route through http://localhost:10999 proxy

* Fix CLI not loading virtual keys from global .env

Project .env files with empty OPENAI_API_KEY values were overriding
the global virtual keys. Updated _load_env_file_if_exists to only
override with non-empty values.

* Fix agent executor not passing API key to LiteLLM

The agent was initializing LiteLlm without api_key or api_base,
causing authentication errors when using the LiteLLM proxy. Now
reads from OPENAI_API_KEY/LLM_API_KEY and LLM_ENDPOINT environment
variables and passes them to LiteLlm constructor.

* Auto-populate project .env with virtual key from global config

When running 'ff init', the command now checks for a global
volumes/env/.env file and automatically uses the OPENAI_API_KEY
virtual key if found. This ensures projects work with LiteLLM
proxy out of the box without manual key configuration.

* docs: Update README with LiteLLM configuration instructions

Add note about LITELLM_GEMINI_API_KEY configuration and clarify that OPENAI_API_KEY default value should not be changed as it's used for the LLM proxy.

* Refactor workflow parameters to use JSON Schema defaults

Consolidates parameter defaults into JSON Schema format, removing the separate default_parameters field. Adds extract_defaults_from_json_schema() helper to extract defaults from the standard schema structure. Updates LiteLLM proxy config to use LITELLM_OPENAI_API_KEY environment variable.

* Remove .env.example from task_agent

* Fix MDX syntax error in llm-proxy.md

* fix: apply default parameters from metadata.yaml automatically

Fixed TemporalManager.run_workflow() to correctly apply default parameter
values from workflow metadata.yaml files when parameters are not provided
by the caller.

Previous behavior:
- When workflow_params was empty {}, the condition
  `if workflow_params and 'parameters' in metadata` would fail
- Parameters would not be extracted from schema, resulting in workflows
  receiving only target_id with no other parameters

New behavior:
- Removed the `workflow_params and` requirement from the condition
- Now explicitly checks for defaults in parameter spec
- Applies defaults from metadata.yaml automatically when param not provided
- Workflows receive all parameters with proper fallback:
  provided value > metadata default > None

This makes metadata.yaml the single source of truth for parameter defaults,
removing the need for workflows to implement defensive default handling.

Affected workflows:
- llm_secret_detection (was failing with KeyError)
- All other workflows now benefit from automatic default application

Co-authored-by: tduhamel42 <tduhamel@fuzzinglabs.com>
2025-11-04 14:04:10 +01:00
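A hedged sketch of the default-parameter handling this PR describes. Only the helper name `extract_defaults_from_json_schema` and the precedence rule (provided value > metadata default > None) come from the commit text; the schema layout (`{"properties": {...}}`) is an assumption.

```python
from typing import Any

# Assumed JSON Schema layout: defaults live under properties.<name>.default.
def extract_defaults_from_json_schema(schema: dict[str, Any]) -> dict[str, Any]:
    return {
        name: spec["default"]
        for name, spec in schema.get("properties", {}).items()
        if isinstance(spec, dict) and "default" in spec
    }

def resolve_workflow_params(provided: dict[str, Any], schema: dict[str, Any]) -> dict[str, Any]:
    # Precedence: provided value > metadata default > None.
    defaults = extract_defaults_from_json_schema(schema)
    names = set(schema.get("properties", {})) | set(provided)
    return {name: provided.get(name, defaults.get(name)) for name in names}
```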
tduhamel42
bd94d19d34 Merge pull request #28 from FuzzingLabs/feature/android-workflow-conversion
feat: Android Static Analysis Workflow with ARM64 Support
2025-10-24 17:22:49 +02:00
tduhamel42
b0a0d591e4 ci: support multi-platform Dockerfiles in worker validation
Updated worker validation script to accept both:
- Single Dockerfile pattern (existing workers)
- Multi-platform Dockerfile pattern (Dockerfile.amd64, Dockerfile.arm64, etc.)

This enables platform-aware worker architectures like the Android worker
which uses different Dockerfiles for x86_64 and ARM64 platforms.
2025-10-24 17:06:00 +02:00
tduhamel42
1fd525f904 fix: resolve linter errors in Android modules
- Remove unused imports from mobsf_scanner.py (asyncio, hashlib, json, Optional)
- Remove unused variables from opengrep_android.py (start_col, end_col)
- Remove duplicate Path import from workflow.py
2025-10-24 17:05:04 +02:00
tduhamel42
73dc26493d docs: update CHANGELOG with Android workflow and ARM64 support
Added [Unreleased] section documenting:
- Android Static Analysis Workflow (Jadx, OpenGrep, MobSF)
- Platform-Aware Worker Architecture with ARM64 support
- Python SAST Workflow
- CI/CD improvements and worker validation
- CLI enhancements
- Bug fixes and technical changes

Fixed date typo: 2025-01-16 → 2025-10-16
2025-10-24 16:52:48 +02:00
tduhamel42
b1a98dbf73 fix: make MobSFScanner import conditional for ARM64 compatibility
- Add try-except block to conditionally import MobSFScanner in modules/android/__init__.py
- Allows Android worker to start on ARM64 without MobSF dependencies (aiohttp)
- MobSF activity gracefully skips on ARM64 with clear warning message
- Remove workflow path detection logic (not needed - workflows receive directories)

Platform-aware architecture fully functional on ARM64:
- CLI detects ARM64 and selects Dockerfile.arm64 automatically
- Worker builds and runs without MobSF on ARM64
- Jadx successfully decompiles APKs (4145 files from BeetleBug.apk)
- OpenGrep finds security vulnerabilities (8 issues found)
- MobSF gracefully skips with warning on ARM64
- Graceful degradation working as designed

Tested with:
  ff workflow run android_static_analysis test_projects/android_test/ \
    --wait --no-interactive apk_path=BeetleBug.apk decompile_apk=true

Results: 8 security findings (1 ERROR, 7 WARNINGS)
2025-10-24 15:14:06 +02:00
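A sketch of the conditional-import pattern this commit describes, roughly as it might appear in `modules/android/__init__.py`; the exact module path and flag name are assumptions.

```python
import logging

logger = logging.getLogger(__name__)

try:
    # Needs aiohttp, which is absent from the ARM64 worker image.
    from .mobsf_scanner import MobSFScanner
    MOBSF_AVAILABLE = True
except ImportError as exc:
    MobSFScanner = None  # activity code checks this and skips gracefully
    MOBSF_AVAILABLE = False
    logger.warning("MobSFScanner unavailable on this platform: %s", exc)
```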
tduhamel42
0801ca3d78 feat: add platform-aware worker architecture with ARM64 support
Implement platform-specific Dockerfile selection and graceful tool degradation to support both x86_64 and ARM64 (Apple Silicon) platforms.

**Backend Changes:**
- Add system info API endpoint (/system/info) exposing host filesystem paths
- Add FUZZFORGE_HOST_ROOT environment variable to backend service
- Add graceful degradation in MobSF activity for ARM64 platforms

**CLI Changes:**
- Implement multi-strategy path resolution (backend API, .fuzzforge marker, env var)
- Add platform detection (linux/amd64 vs linux/arm64)
- Add worker metadata.yaml reading for platform capabilities
- Auto-select appropriate Dockerfile based on detected platform
- Pass platform-specific env vars to docker-compose

**Worker Changes:**
- Create workers/android/metadata.yaml defining platform capabilities
- Rename Dockerfile -> Dockerfile.amd64 (full toolchain with MobSF)
- Create Dockerfile.arm64 (excludes MobSF due to Rosetta 2 incompatibility)
- Update docker-compose.yml to use ${ANDROID_DOCKERFILE} variable

**Workflow Changes:**
- Handle MobSF "skipped" status gracefully in workflow
- Log clear warnings when tools are unavailable on platform

**Key Features:**
- Automatic platform detection and Dockerfile selection
- Graceful degradation when tools unavailable (MobSF on ARM64)
- Works from any directory (backend API provides paths)
- Manual override via environment variables
- Clear user feedback about platform and selected Dockerfile

**Benefits:**
- Android workflow now works on Apple Silicon Macs
- No code changes needed for other workflows
- Convention established for future platform-specific workers

Closes: MobSF Rosetta 2 incompatibility issue
Implements: Platform-aware worker architecture (Option B)
2025-10-23 16:43:17 +02:00
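A minimal sketch of the platform detection and Dockerfile selection described above, assuming standard library detection; the `ANDROID_DOCKERFILE` variable name comes from the commit, the rest is illustrative.

```python
import os
import platform

def select_android_dockerfile() -> str:
    machine = platform.machine().lower()
    if machine in ("arm64", "aarch64"):
        dockerfile = "Dockerfile.arm64"   # MobSF excluded on this platform
    else:
        dockerfile = "Dockerfile.amd64"   # full toolchain, including MobSF
    os.environ["ANDROID_DOCKERFILE"] = dockerfile  # consumed by docker-compose
    return dockerfile
```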
tduhamel42
1d3e033bcc fix(android): correct activity names and MobSF API key generation
- Fix activity names in workflow.py (get_target, upload_results, cleanup_cache)
- Fix MobSF API key generation in Dockerfile startup script (cut delimiter)
- Update activity parameter signatures to match actual implementations
- Workflow now executes successfully with Jadx and OpenGrep
2025-10-23 16:36:39 +02:00
tduhamel42
cfcbe91610 feat: Add Android static analysis workflow with Jadx, OpenGrep, and MobSF
Comprehensive Android security testing workflow converted from Prefect to Temporal architecture:

Modules (3):
- JadxDecompiler: APK to Java source code decompilation
- OpenGrepAndroid: Static analysis with Android-specific security rules
- MobSFScanner: Comprehensive mobile security framework integration

Custom Rules (13):
- clipboard-sensitive-data, hardcoded-secrets, insecure-data-storage
- insecure-deeplink, insecure-logging, intent-redirection
- sensitive_data_sharedPreferences, sqlite-injection
- vulnerable-activity, vulnerable-content-provider, vulnerable-service
- webview-javascript-enabled, webview-load-arbitrary-url

Workflow:
- 6-phase Temporal workflow: download → Jadx → OpenGrep → MobSF → SARIF → upload
- 4 activities: decompile_with_jadx, scan_with_opengrep, scan_with_mobsf, generate_android_sarif
- SARIF output combining findings from all security tools

Docker Worker:
- ARM64 Mac compatibility via amd64 platform emulation
- Pre-installed: Android SDK, Jadx 1.4.7, OpenGrep 1.45.0, MobSF 3.9.7
- MobSF runs as background service with API key auto-generation
- Added aiohttp for async HTTP communication

Test APKs:
- BeetleBug.apk and shopnest.apk for workflow validation
2025-10-23 10:25:52 +02:00
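A condensed sketch of the phased Temporal workflow this commit introduces. The activity names come from the commit message; signatures, payloads, and timeouts are assumptions, and the download/upload phases are omitted for brevity.

```python
from datetime import timedelta
from temporalio import workflow

@workflow.defn
class AndroidStaticAnalysisWorkflow:
    @workflow.run
    async def run(self, params: dict) -> dict:
        timeout = timedelta(minutes=30)
        # Phase: decompile the APK to Java sources with Jadx.
        sources = await workflow.execute_activity(
            "decompile_with_jadx", params, start_to_close_timeout=timeout)
        # Phase: static analysis over the decompiled sources.
        opengrep = await workflow.execute_activity(
            "scan_with_opengrep", sources, start_to_close_timeout=timeout)
        # Phase: MobSF scan (may report "skipped" on unsupported platforms).
        mobsf = await workflow.execute_activity(
            "scan_with_mobsf", params, start_to_close_timeout=timeout)
        # Phase: merge findings from all tools into a single SARIF report.
        return await workflow.execute_activity(
            "generate_android_sarif", {"opengrep": opengrep, "mobsf": mobsf},
            start_to_close_timeout=timeout)
```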
tduhamel42
e180431b1e Merge pull request #24 from FuzzingLabs/fix/cleanup-and-bugs
fix: resolve live monitoring bug, remove deprecated parameters, and auto-start Python worker
2025-10-22 17:12:08 +02:00
tduhamel42
6ca5cf36c0 fix: resolve linter errors and optimize CI worker builds
- Remove unused Literal import from backend findings model
- Remove unnecessary f-string prefixes in CLI findings command
- Optimize GitHub Actions to build only modified workers
  - Detect specific worker changes (python, secrets, rust, android, ossfuzz)
  - Build only changed workers instead of all 5
  - Build all workers if docker-compose.yml changes
  - Significantly reduces CI build time
2025-10-22 16:56:51 +02:00
tduhamel42
09951d68d7 fix: resolve live monitoring bug, remove deprecated parameters, and auto-start Python worker
- Fix live monitoring style error by calling _live_monitor() helper directly
- Remove default_parameters duplication from 10 workflow metadata files
- Remove deprecated volume_mode parameter from 26 files across CLI, SDK, backend, and docs
- Configure Python worker to start automatically with docker compose up
- Clean up constants, validation, completion, and example files

Fixes #
- Live monitoring now works correctly with --live flag
- Workflow metadata follows JSON Schema standard
- Cleaner codebase without deprecated volume_mode
- Python worker (most commonly used) starts by default
2025-10-22 16:26:58 +02:00
tduhamel42
1c3c7a801e Merge pull request #23 from FuzzingLabs/feature/python-sast-workflow
feat: Add Python SAST workflow (Issue #5)
2025-10-22 15:55:26 +02:00
tduhamel42
66e797a0e7 fix: Remove unused imports to pass linter 2025-10-22 15:36:35 +02:00
tduhamel42
9468a8b023 feat: Add Python SAST workflow with three security analysis tools
Implements Issue #5 - Python SAST workflow that combines:
- Dependency scanning (pip-audit) for CVE detection
- Security linting (Bandit) for vulnerability patterns
- Type checking (Mypy) for type safety issues

## Changes

**New Modules:**
- `DependencyScanner`: Scans Python dependencies for known CVEs using pip-audit
- `BanditAnalyzer`: Analyzes Python code for security issues using Bandit
- `MypyAnalyzer`: Checks Python code for type safety issues using Mypy

**New Workflow:**
- `python_sast`: Temporal workflow that orchestrates all three SAST tools
  - Runs tools in parallel for fast feedback (3-5 min vs hours for fuzzing)
  - Generates unified SARIF report with findings from all tools
  - Supports configurable severity/confidence thresholds

**Updates:**
- Added SAST dependencies to Python worker (bandit, pip-audit, mypy)
- Updated module __init__.py files to export new analyzers
- Added type_errors.py test file to vulnerable_app for Mypy validation

## Testing

Workflow tested successfully on vulnerable_app:
- Bandit: Detected 9 security issues (command injection, unsafe functions)
- Mypy: Detected 5 type errors
- DependencyScanner: Ran successfully (no CVEs in test dependencies)
- SARIF export: Generated valid SARIF with 14 total findings
2025-10-22 15:28:19 +02:00
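A sketch of running the three SAST tools in parallel, as this commit describes; the command-line flags shown are common defaults for each tool, not verified project settings.

```python
import asyncio

async def run_tool(*cmd: str) -> tuple[int, str]:
    # Run one tool as a subprocess and capture its combined output.
    proc = await asyncio.create_subprocess_exec(
        *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT)
    out, _ = await proc.communicate()
    return proc.returncode, out.decode(errors="replace")

async def run_python_sast(target: str) -> dict[str, tuple[int, str]]:
    # All three tools run concurrently for fast feedback.
    bandit, mypy, pip_audit = await asyncio.gather(
        run_tool("bandit", "-r", target, "-f", "json"),
        run_tool("mypy", target, "--no-error-summary"),
        run_tool("pip-audit", "-f", "json"),
    )
    return {"bandit": bandit, "mypy": mypy, "pip-audit": pip_audit}
```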
tduhamel42
6e4241a15f fix: properly detect worker file changes in CI
The previous condition used an invalid GitHub context field.
Now uses git diff to properly detect changes to workers/ or docker-compose.yml.

Behavior:
- Job always runs the check step
- Detects if workers/ or docker-compose.yml modified
- Only builds Docker images if workers actually changed
- Shows clear skip message when no worker changes detected
2025-10-22 11:51:32 +02:00
tduhamel42
d68344867b fix: add dev branch to test workflow triggers
The test workflow was configured for 'develop' but the actual branch is named 'dev'.
This caused tests not to run on PRs to dev branch.

Now tests will run on:
- PRs to: main, master, dev, develop
- Pushes to: main, master, dev, develop, feature/**
2025-10-22 11:49:06 +02:00
tduhamel42
f5554d0836 Merge pull request #22 from FuzzingLabs/ci/worker-validation-and-docker-builds
ci: add worker validation and Docker build checks
2025-10-22 11:46:58 +02:00
tduhamel42
3e949b2ae8 ci: add worker validation and Docker build checks
Add automated validation to prevent worker-related issues:

**Worker Validation Script:**
- New script: .github/scripts/validate-workers.sh
- Validates all workers in docker-compose.yml exist
- Checks required files: Dockerfile, requirements.txt, worker.py
- Verifies files are tracked by git (not gitignored)
- Detects gitignore issues that could hide workers

**CI Workflow Updates:**
- Added validate-workers job (runs on every PR)
- Added build-workers job (runs if workers/ modified)
- Uses Docker Buildx for caching
- Validates Docker images build successfully
- Updated test-summary to check validation results

**PR Template:**
- New pull request template with comprehensive checklist
- Specific section for worker-related changes
- Reminds contributors to validate worker files
- Includes documentation and changelog reminders

These checks would have caught the secrets worker gitignore issue.

Implements Phase 1 improvements from CI/CD quality assessment.
2025-10-22 11:45:04 +02:00
24 changed files with 98 additions and 1091 deletions

View File

@@ -7,11 +7,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
### ✨ Enhancements
- Added Ladybug-backed Cognee integration with optional MinIO/S3 storage. Projects can now set `COGNEE_STORAGE_BACKEND=s3` (plus the `COGNEE_S3_*` settings) to keep knowledge graphs in the shared MinIO bucket seeded by `docker-compose`, enabling multi-tenant ingestion across workers and containers.
- Introduced a dedicated Cognee service (`docker/docker-compose.cognee.yml`) and HTTP client so `fuzzforge ingest` streams data to the shared backend (`COGNEE_SERVICE_URL`) instead of importing Cognee locally. Each project now auto-provisions its own Cognee account/tenant and authenticates via the REST API, keeping datasets isolated even though the service is shared.
- Added an event-driven ingestion pipeline: MinIO publishes `PUT` events from `s3://projects/<project-id>/...` to RabbitMQ, and the new `ingestion-dispatcher` container downloads the file, logs into Cognee as that project's tenant, and invokes `/api/v1/add` + `/api/v1/cognify`. Uploading files (rsync, CI, etc.) now keeps datasets fresh without touching the CLI.
### 📝 Documentation
- Added comprehensive worker startup documentation across all guides
- Added workflow-to-worker mapping tables in README, troubleshooting guide, getting started guide, and docker setup guide

View File

@@ -6,7 +6,7 @@
<p align="center"><strong>AI-powered workflow automation and AI Agents for AppSec, Fuzzing & Offensive Security</strong></p>
<p align="center">
<a href="https://discord.gg/8XEX33UUwZ"><img src="https://img.shields.io/discord/1420767905255133267?logo=discord&label=Discord" alt="Discord"></a>
<a href="https://discord.gg/8XEX33UUwZ/"><img src="https://img.shields.io/discord/1420767905255133267?logo=discord&label=Discord" alt="Discord"></a>
<a href="LICENSE"><img src="https://img.shields.io/badge/license-BSL%20%2B%20Apache-orange" alt="License: BSL + Apache"></a>
<a href="https://www.python.org/downloads/"><img src="https://img.shields.io/badge/python-3.11%2B-blue" alt="Python 3.11+"/></a>
<a href="https://fuzzforge.ai"><img src="https://img.shields.io/badge/Website-fuzzforge.ai-blue" alt="Website"/></a>
@@ -60,8 +60,6 @@ _Setting up and running security workflows through the interface_
- 🤖 **AI Agents for Security** Specialized agents for AppSec, reversing, and fuzzing
- 🛠 **Workflow Automation** Define & execute AppSec workflows as code
- 🧠 **Knowledge Graphs backed by Cognee** Multi-tenant Ladybug graphs stored in MinIO/S3 and reachable as a shared service for every project
- **Event-Driven Ingestion** Upload files to MinIO and let RabbitMQ + the dispatcher stream them into Cognee datasets automatically
- 📈 **Vulnerability Research at Scale** Rediscover 1-days & find 0-days with automation
- 🔗 **Fuzzer Integration** Atheris (Python), cargo-fuzz (Rust), OSS-Fuzz campaigns
- 🌐 **Community Marketplace** Share workflows, corpora, PoCs, and modules
@@ -159,9 +157,6 @@ cp volumes/env/.env.template volumes/env/.env
# 3. Start FuzzForge with Temporal
docker compose up -d
# 3b. Start the shared Cognee service (Ladybug + MinIO)
docker compose -f docker/docker-compose.cognee.yml up -d
# 4. Start the Python worker (needed for security_assessment workflow)
docker compose up -d worker-python
```
@@ -191,28 +186,6 @@ ff workflow run security_assessment . # Start workflow - CLI uploads files au
# - Create a compressed tarball
# - Upload to backend (via MinIO)
# - Start the workflow on vertical worker
### Automated Cognee Ingestion
Uploading files into MinIO automatically streams them into Cognee:
```
s3://projects/<project-id>/
files/... # → <project-id>_codebase dataset
findings/... # → <project-id>_findings dataset
docs/... # → <project-id>_docs dataset
```
MinIO emits the object-created event to RabbitMQ, the `ingestion-dispatcher` downloads the file, and it calls the Cognee REST API on behalf of that project's tenant. Use any upload mechanism you like (`aws s3 cp`, rsync to MinIO, etc.); once the object lands in the bucket it is ingested and cognified automatically.
#### Monitoring / Debugging
- RabbitMQ Management UI: `http://localhost:15672` (user `ingest`, password `ingest`). The `cognee-ingest` exchange fans out events into the `cognee-ingestion-dispatcher` queue; a growing `Ready` count means the dispatcher is down or congested.
- Dispatcher logs: `docker logs -f fuzzforge-ingestion-dispatcher` immediately show each object as `Processing <project>/<category>/<file> -> dataset ...` followed by `/auth/login`, `/add`, and `/cognify` 200s.
- Cognee service logs: `docker logs -f fuzzforge-cognee` display the full pipeline (`ingest_data`, `extract_graph_from_data`, etc.) for each dataset run.
- Bucket verification: `docker run --rm --network=fuzzforge_temporal_network -e AWS_ACCESS_KEY_ID=fuzzforge -e AWS_SECRET_ACCESS_KEY=fuzzforge123 amazon/aws-cli s3 ls --recursive --endpoint-url http://minio:9000 s3://projects/<project-id>` confirms files, Ladybug DBs (`graph/…`) and LanceDB indexes (`vector/…`).
If you still upload into the legacy `s3://cognee/...` hierarchy, copy the object over (or update `COGNEE_S3_BUCKET`) so MinIO emits the event for the watched bucket.
```
**What's running:**

View File

@@ -26,50 +26,27 @@ class RemoteAgentConnection:
"""Initialize connection to a remote agent"""
self.url = url.rstrip('/')
self.agent_card = None
self.client = httpx.AsyncClient(timeout=120.0, follow_redirects=True)
self.client = httpx.AsyncClient(timeout=120.0)
self.context_id = None
async def get_agent_card(self) -> Optional[Dict[str, Any]]:
"""Get the agent card from the remote agent"""
# If URL already points to a .json file, fetch it directly
if self.url.endswith('.json'):
try:
# Try new path first (A2A 0.3.0+)
response = await self.client.get(f"{self.url}/.well-known/agent-card.json")
response.raise_for_status()
self.agent_card = response.json()
return self.agent_card
except Exception:
# Try old path for compatibility
try:
response = await self.client.get(self.url)
response = await self.client.get(f"{self.url}/.well-known/agent.json")
response.raise_for_status()
self.agent_card = response.json()
# Use canonical URL from agent card if provided
if isinstance(self.agent_card, dict) and "url" in self.agent_card:
self.url = self.agent_card["url"].rstrip('/')
return self.agent_card
except Exception as e:
print(f"Failed to get agent card from {self.url}: {e}")
return None
# Try both agent-card.json (A2A 0.3.0+) and agent.json (legacy)
well_known_paths = [
"/.well-known/agent-card.json",
"/.well-known/agent.json",
]
for path in well_known_paths:
try:
response = await self.client.get(f"{self.url}{path}")
response.raise_for_status()
self.agent_card = response.json()
# Use canonical URL from agent card if provided
if isinstance(self.agent_card, dict) and "url" in self.agent_card:
self.url = self.agent_card["url"].rstrip('/')
return self.agent_card
except Exception:
continue
print(f"Failed to get agent card from {self.url}")
print("Tip: If agent is at /a2a/something, use full URL: /register http://host:port/a2a/something")
return None
async def send_message(self, message: str | Dict[str, Any] | List[Dict[str, Any]]) -> str:
"""Send a message to the remote agent using A2A protocol"""
@@ -116,7 +93,7 @@ class RemoteAgentConnection:
payload["params"]["contextId"] = self.context_id
# Send to root endpoint per A2A protocol
response = await self.client.post(self.url, json=payload)
response = await self.client.post(f"{self.url}/", json=payload)
response.raise_for_status()
result = response.json()

View File

@@ -78,7 +78,7 @@ fuzzforge workflows list
fuzzforge workflows info security_assessment
# Submit a workflow for analysis
fuzzforge workflow run security_assessment /path/to/your/code
fuzzforge workflow security_assessment /path/to/your/code
# View findings when complete
@@ -150,24 +150,24 @@ fuzzforge workflows parameters security_assessment --no-interactive
### Workflow Execution
#### `fuzzforge workflow run <workflow> <target-path>`
#### `fuzzforge workflow <workflow> <target-path>`
Execute a security testing workflow with **automatic file upload**.
```bash
# Basic execution - CLI automatically detects local files and uploads them
fuzzforge workflow run security_assessment /path/to/code
fuzzforge workflow security_assessment /path/to/code
# With parameters
fuzzforge workflow run security_assessment /path/to/binary \
fuzzforge workflow security_assessment /path/to/binary \
--param timeout=3600 \
--param iterations=10000
# With parameter file
fuzzforge workflow run security_assessment /path/to/code \
fuzzforge workflow security_assessment /path/to/code \
--param-file my-params.json
# Wait for completion
fuzzforge workflow run security_assessment /path/to/code --wait
fuzzforge workflow security_assessment /path/to/code --wait
```
**Automatic File Upload Behavior:**

View File

@@ -1,159 +0,0 @@
"""HTTP client for the Cognee REST API."""
from __future__ import annotations
from pathlib import Path
from typing import Any, Iterable, Sequence
import httpx
class CogneeApiError(RuntimeError):
"""Raised when the Cognee API returns an error status."""
def __init__(self, message: str, *, status_code: int | None = None):
super().__init__(message)
self.status_code = status_code
class CogneeApiClient:
"""Async client for interacting with the Cognee service."""
def __init__(
self,
base_url: str,
api_key: str | None = None,
*,
email: str | None = None,
password: str | None = None,
timeout: float = 180.0,
):
base = base_url.rstrip("/")
headers = {}
if api_key:
headers["Authorization"] = f"Bearer {api_key}"
self._client = httpx.AsyncClient(
base_url=base,
timeout=httpx.Timeout(timeout),
follow_redirects=True,
headers=headers,
)
self._email = email
self._password = password
self._token: str | None = None
async def __aenter__(self) -> "CogneeApiClient":
return self
async def __aexit__(self, exc_type, exc, tb) -> None:
await self.close()
async def close(self) -> None:
await self._client.aclose()
async def ensure_authenticated(self) -> None:
"""Ensure we have a bearer token before making privileged calls."""
if self._client.headers.get("Authorization") or self._token:
return
if not (self._email and self._password):
# Service might be running with authentication disabled.
return
try:
await self.register_user(self._email, self._password)
except CogneeApiError as exc:
if exc.status_code not in (400, 409):
raise
token = await self.login(self._email, self._password)
self._token = token
self._client.headers["Authorization"] = f"Bearer {token}"
async def register_user(self, email: str, password: str) -> Any:
payload = {
"email": email,
"password": password,
"is_active": True,
"is_verified": True,
}
response = await self._client.post("/api/v1/auth/register", json=payload)
return self._handle_response(response)
async def login(self, email: str, password: str) -> str:
data = {"username": email, "password": password}
response = await self._client.post("/api/v1/auth/login", data=data)
payload = self._handle_response(response)
token = payload.get("access_token")
if not token:
raise CogneeApiError("Cognee auth response did not include an access_token")
return token
async def add_files(self, file_paths: Iterable[Path], dataset_name: str) -> Any:
await self.ensure_authenticated()
files: list[tuple[str, tuple[str, bytes, str]]] = []
for path in file_paths:
data = path.read_bytes()
files.append(("data", (path.name, data, "application/octet-stream")))
data = {"datasetName": dataset_name}
response = await self._client.post("/api/v1/add", data=data, files=files)
return self._handle_response(response)
async def add_texts(self, texts: Sequence[str], dataset_name: str) -> Any:
await self.ensure_authenticated()
files: list[tuple[str, tuple[str, bytes, str]]] = []
for idx, text in enumerate(texts):
data = text.encode("utf-8")
files.append(("data", (f"snippet_{idx}.txt", data, "text/plain")))
response = await self._client.post(
"/api/v1/add",
data={"datasetName": dataset_name},
files=files,
)
return self._handle_response(response)
async def cognify(self, datasets: Sequence[str]) -> Any:
await self.ensure_authenticated()
payload = {"datasets": list(datasets), "run_in_background": False}
response = await self._client.post("/api/v1/cognify", json=payload)
return self._handle_response(response)
async def search(
self,
*,
query: str,
search_type: str,
datasets: Sequence[str] | None = None,
top_k: int | None = None,
only_context: bool = False,
) -> Any:
await self.ensure_authenticated()
payload: dict[str, object] = {
"query": query,
"search_type": search_type,
"only_context": only_context,
}
if datasets:
payload["datasets"] = list(datasets)
if top_k is not None:
payload["top_k"] = top_k
response = await self._client.post("/api/v1/search", json=payload)
return self._handle_response(response)
def _handle_response(self, response: httpx.Response) -> Any:
try:
response.raise_for_status()
except httpx.HTTPStatusError as exc: # pragma: no cover - surfaced to caller
message = exc.response.text
raise CogneeApiError(
f"Cognee API request failed ({exc.response.status_code}): {message}",
status_code=exc.response.status_code,
) from exc
if response.content:
return response.json()
return {}

View File

@@ -23,7 +23,6 @@ from rich.console import Console
from rich.prompt import Confirm
from ..config import ProjectConfigManager
from ..cognee_api import CogneeApiClient, CogneeApiError
from ..ingest_utils import collect_ingest_files
console = Console()
@@ -93,18 +92,23 @@ def ingest_callback(
config.setup_cognee_environment()
if os.getenv("FUZZFORGE_DEBUG", "0") == "1":
storage_backend = os.getenv("COGNEE_STORAGE_BACKEND", "local")
console.print(
"[dim]Cognee directories:\n"
f" DATA: {os.getenv('COGNEE_DATA_ROOT', 'unset')}\n"
f" SYSTEM: {os.getenv('COGNEE_SYSTEM_ROOT', 'unset')}\n"
f" USER: {os.getenv('COGNEE_USER_ID', 'unset')}\n"
f" STORAGE: {storage_backend}\n",
f" USER: {os.getenv('COGNEE_USER_ID', 'unset')}\n",
)
project_context = config.get_project_context()
target_path = path or Path.cwd()
dataset_name = dataset or f"{project_context['project_id']}_codebase"
dataset_name = dataset or f"{project_context['project_name']}_codebase"
try:
import cognee # noqa: F401 # Just to validate installation
except ImportError as exc:
console.print("[red]Cognee is not installed.[/red]")
console.print("Install with: pip install 'cognee[all]' litellm")
raise typer.Exit(1) from exc
console.print(f"[bold]🔍 Ingesting {target_path} into Cognee knowledge graph[/bold]")
console.print(
@@ -151,21 +155,10 @@ async def _run_ingestion(
force: bool,
) -> None:
"""Perform the actual ingestion work."""
cognee_cfg = config.get_cognee_config()
service_url = (
cognee_cfg.get("service_url")
or os.getenv("COGNEE_SERVICE_URL")
or "http://localhost:18000"
)
service_email = os.getenv("COGNEE_SERVICE_EMAIL") or cognee_cfg.get("service_email")
service_password = os.getenv("COGNEE_SERVICE_PASSWORD") or cognee_cfg.get("service_password")
from fuzzforge_ai.cognee_service import CogneeService
if not service_email or not service_password:
console.print(
"[red]Missing Cognee service credentials.[/red] Run `ff init` again or set "
"COGNEE_SERVICE_EMAIL / COGNEE_SERVICE_PASSWORD in .fuzzforge/.env."
)
return
cognee_service = CogneeService(config)
await cognee_service.initialize()
# Always skip internal bookkeeping directories
exclude_patterns = list(exclude or [])
@@ -199,9 +192,11 @@ async def _run_ingestion(
console.print(f"Found [green]{len(files_to_ingest)}[/green] files to ingest")
if force:
console.print(
"[yellow]Warning:[/yellow] Force re-ingest is not yet supported for the remote Cognee service."
)
console.print("Cleaning existing data for this project...")
try:
await cognee_service.clear_data(confirm=True)
except Exception as exc:
console.print(f"[yellow]Warning:[/yellow] Could not clean existing data: {exc}")
console.print("Adding files to Cognee...")
valid_file_paths = []
@@ -218,62 +213,39 @@ async def _run_ingestion(
console.print("[yellow]No readable files found to ingest[/yellow]")
return
async with CogneeApiClient(
service_url,
email=service_email,
password=service_password,
) as client:
try:
await client.ensure_authenticated()
except CogneeApiError as exc:
console.print(f"[red]Cognee authentication failed:[/red] {exc}")
return
except Exception as exc:
console.print(f"[red]Cognee authentication error:[/red] {exc}")
return
try:
await client.add_files(valid_file_paths, dataset)
await client.cognify([dataset])
except CogneeApiError as exc:
console.print(f"[red]Cognee API error:[/red] {exc}")
return
except Exception as exc:
console.print(f"[red]Unexpected Cognee error:[/red] {exc}")
return
results = await cognee_service.ingest_files(valid_file_paths, dataset)
console.print(
f"[green]✅ Successfully ingested {results['success']} files into knowledge graph[/green]"
)
if results["failed"]:
console.print(
f"[green]✅ Successfully ingested {len(valid_file_paths)} files into knowledge graph[/green]"
f"[yellow]⚠️ Skipped {results['failed']} files due to errors[/yellow]"
)
try:
insights = await client.search(
query=f"What insights can you provide about the {dataset} dataset?",
search_type="INSIGHTS",
datasets=[dataset],
)
insight_list = insights if isinstance(insights, list) else insights.get("results", [])
if insight_list:
console.print(f"\n[bold]📊 Generated {len(insight_list)} insights:[/bold]")
for index, insight in enumerate(insight_list[:3], 1):
console.print(f" {index}. {insight}")
if len(insight_list) > 3:
console.print(f" ... and {len(insight_list) - 3} more")
try:
insights = await cognee_service.search_insights(
query=f"What insights can you provide about the {dataset} dataset?",
dataset=dataset,
)
if insights:
console.print(f"\n[bold]📊 Generated {len(insights)} insights:[/bold]")
for index, insight in enumerate(insights[:3], 1):
console.print(f" {index}. {insight}")
if len(insights) > 3:
console.print(f" ... and {len(insights) - 3} more")
chunks = await client.search(
query=f"functions classes methods in {dataset}",
search_type="CHUNKS",
datasets=[dataset],
top_k=5,
chunks = await cognee_service.search_chunks(
query=f"functions classes methods in {dataset}",
dataset=dataset,
)
if chunks:
console.print(
f"\n[bold]🔍 Sample searchable content ({len(chunks)} chunks found):[/bold]"
)
chunk_list = chunks if isinstance(chunks, list) else chunks.get("results", [])
if chunk_list:
console.print(
f"\n[bold]🔍 Sample searchable content ({len(chunk_list)} chunks found):[/bold]"
)
for index, chunk in enumerate(chunk_list[:2], 1):
text = str(chunk)
preview = text[:100] + "..." if len(text) > 100 else text
console.print(f" {index}. {preview}")
except Exception:
pass
for index, chunk in enumerate(chunks[:2], 1):
preview = chunk[:100] + "..." if len(chunk) > 100 else chunk
console.print(f" {index}. {preview}")
except Exception:
# Best-effort stats — ignore failures here
pass

View File

@@ -141,7 +141,7 @@ FuzzForge security testing project.
fuzzforge workflows
# Submit a workflow for analysis
fuzzforge workflow run <workflow-name> /path/to/target
fuzzforge workflow <workflow-name> /path/to/target
# View findings
fuzzforge finding <run-id>
@@ -246,17 +246,6 @@ def _ensure_env_file(fuzzforge_dir: Path, force: bool) -> None:
"LLM_COGNEE_EMBEDDING_MODEL=litellm_proxy/text-embedding-3-large",
"LLM_COGNEE_EMBEDDING_ENDPOINT=http://localhost:10999",
"COGNEE_MCP_URL=",
"COGNEE_SERVICE_URL=http://localhost:18000",
"COGNEE_STORAGE_BACKEND=s3",
"COGNEE_SERVICE_EMAIL=",
"COGNEE_SERVICE_PASSWORD=",
"COGNEE_S3_BUCKET=cognee",
"COGNEE_S3_PREFIX=",
"COGNEE_S3_ENDPOINT=http://localhost:9000",
"COGNEE_S3_REGION=us-east-1",
"COGNEE_S3_ACCESS_KEY=",
"COGNEE_S3_SECRET_KEY=",
"COGNEE_S3_ALLOW_HTTP=1",
"",
"# Session persistence options: inmemory | sqlite",
"SESSION_PERSISTENCE=sqlite",

View File

@@ -428,7 +428,7 @@ def execute_workflow(
# Suggest --live for fuzzing workflows
if not live and not wait and "fuzzing" in workflow.lower():
console.print(f"💡 Next time try: [bold cyan]fuzzforge workflow run {workflow} {target_path} --live[/bold cyan] for real-time monitoring", style="dim")
console.print(f"💡 Next time try: [bold cyan]fuzzforge workflow {workflow} {target_path} --live[/bold cyan] for real-time monitoring", style="dim")
# Start live monitoring if requested
if live:

View File

@@ -21,7 +21,7 @@ from __future__ import annotations
import hashlib
import os
from pathlib import Path
from typing import Any, Dict, Optional, Literal
from typing import Any, Dict, Optional
try: # Optional dependency; fall back if not installed
from dotenv import load_dotenv
@@ -131,19 +131,7 @@ class CogneeConfig(BaseModel):
"""Cognee integration metadata."""
enabled: bool = True
graph_database_provider: str = "ladybug"
service_url: str = "http://localhost:18000"
api_key: Optional[str] = None
service_email: Optional[str] = None
service_password: Optional[str] = None
storage_backend: Literal["local", "s3"] = "s3"
s3_bucket: Optional[str] = None
s3_prefix: Optional[str] = None
s3_endpoint_url: Optional[str] = None
s3_region: Optional[str] = None
s3_access_key: Optional[str] = None
s3_secret_key: Optional[str] = None
s3_allow_http: bool = False
graph_database_provider: str = "kuzu"
data_directory: Optional[str] = None
system_directory: Optional[str] = None
backend_access_control: bool = True
@@ -213,63 +201,25 @@ class FuzzForgeConfig(BaseModel):
cognee.tenant_id = self.project.tenant_id
changed = True
if not cognee.service_url:
cognee.service_url = "http://localhost:18000"
changed = True
base_dir = project_dir / ".fuzzforge" / "cognee" / f"project_{self.project.id}"
data_dir = base_dir / "data"
system_dir = base_dir / "system"
if not cognee.s3_bucket:
cognee.s3_bucket = "projects"
changed = True
if cognee.s3_prefix is None:
cognee.s3_prefix = ""
changed = True
default_email = f"project_{self.project.id}@fuzzforge.dev"
if not cognee.service_email or cognee.service_email.endswith(
("@cognee.local", "@cognee.localhost")
for path in (
base_dir,
data_dir,
system_dir,
system_dir / "kuzu_db",
system_dir / "lancedb",
):
cognee.service_email = default_email
if not path.exists():
path.mkdir(parents=True, exist_ok=True)
if cognee.data_directory != str(data_dir):
cognee.data_directory = str(data_dir)
changed = True
derived_password = hashlib.sha256(self.project.id.encode()).hexdigest()[:20]
if not cognee.service_password or len(cognee.service_password) < 12:
cognee.service_password = derived_password
changed = True
if cognee.storage_backend.lower() == "s3":
bucket = cognee.s3_bucket or "projects"
prefix = (cognee.s3_prefix or "").strip("/")
path_parts = [f"s3://{bucket}"]
if prefix:
path_parts.append(prefix)
path_parts.append(self.project.id)
base_uri = "/".join(path_parts)
data_dir = f"{base_uri}/files"
system_dir = f"{base_uri}/graph"
else:
base_dir = project_dir / ".fuzzforge" / "cognee" / f"project_{self.project.id}"
data_path = base_dir / "data"
system_path = base_dir / "system"
for path in (
base_dir,
data_path,
system_path,
system_path / "ladybug",
system_path / "lancedb",
):
if not path.exists():
path.mkdir(parents=True, exist_ok=True)
data_dir = str(data_path)
system_dir = str(system_path)
if cognee.data_directory != data_dir:
cognee.data_directory = data_dir
changed = True
if cognee.system_directory != system_dir:
cognee.system_directory = system_dir
if cognee.system_directory != str(system_dir):
cognee.system_directory = str(system_dir)
changed = True
return changed
@@ -418,67 +368,16 @@ class ProjectConfigManager:
backend_access = "true" if cognee.get("backend_access_control", True) else "false"
os.environ["ENABLE_BACKEND_ACCESS_CONTROL"] = backend_access
graph_provider = cognee.get("graph_database_provider", "ladybug")
os.environ["GRAPH_DATABASE_PROVIDER"] = graph_provider
service_url = cognee.get("service_url") or os.getenv("COGNEE_SERVICE_URL") or "http://localhost:18000"
os.environ["COGNEE_SERVICE_URL"] = service_url
api_key = os.getenv("COGNEE_API_KEY") or cognee.get("api_key")
if api_key:
os.environ["COGNEE_API_KEY"] = api_key
service_email = os.getenv("COGNEE_SERVICE_EMAIL") or cognee.get("service_email")
if service_email:
os.environ["COGNEE_SERVICE_EMAIL"] = service_email
service_password = os.getenv("COGNEE_SERVICE_PASSWORD") or cognee.get("service_password")
if service_password:
os.environ["COGNEE_SERVICE_PASSWORD"] = service_password
os.environ["GRAPH_DATABASE_PROVIDER"] = cognee.get("graph_database_provider", "kuzu")
data_dir = cognee.get("data_directory")
system_dir = cognee.get("system_directory")
tenant_id = cognee.get("tenant_id", "fuzzforge_tenant")
storage_backend = cognee.get("storage_backend", "local").lower()
os.environ["COGNEE_STORAGE_BACKEND"] = storage_backend
if storage_backend == "s3":
os.environ["STORAGE_BACKEND"] = "s3"
bucket = os.getenv("COGNEE_S3_BUCKET") or cognee.get("s3_bucket") or "cognee"
os.environ["STORAGE_BUCKET_NAME"] = bucket
os.environ["COGNEE_S3_BUCKET"] = bucket
prefix_override = os.getenv("COGNEE_S3_PREFIX") or cognee.get("s3_prefix")
if prefix_override:
os.environ["COGNEE_S3_PREFIX"] = prefix_override
endpoint = os.getenv("COGNEE_S3_ENDPOINT") or cognee.get("s3_endpoint_url") or "http://localhost:9000"
os.environ["AWS_ENDPOINT_URL"] = endpoint
os.environ["COGNEE_S3_ENDPOINT"] = endpoint
region = os.getenv("COGNEE_S3_REGION") or cognee.get("s3_region") or "us-east-1"
os.environ["AWS_REGION"] = region
os.environ["COGNEE_S3_REGION"] = region
access_key = os.getenv("COGNEE_S3_ACCESS_KEY") or cognee.get("s3_access_key")
secret_key = os.getenv("COGNEE_S3_SECRET_KEY") or cognee.get("s3_secret_key")
if access_key:
os.environ.setdefault("AWS_ACCESS_KEY_ID", access_key)
os.environ["COGNEE_S3_ACCESS_KEY"] = access_key
if secret_key:
os.environ.setdefault("AWS_SECRET_ACCESS_KEY", secret_key)
os.environ["COGNEE_S3_SECRET_KEY"] = secret_key
allow_http_env = os.getenv("COGNEE_S3_ALLOW_HTTP")
allow_http_flag = allow_http_env if allow_http_env is not None else ("true" if cognee.get("s3_allow_http") else "false")
if allow_http_flag.lower() in {"1", "true", "yes"}:
os.environ["AWS_ALLOW_HTTP"] = "true"
os.environ["COGNEE_S3_ALLOW_HTTP"] = "1"
if data_dir:
os.environ["COGNEE_DATA_ROOT"] = data_dir
os.environ.setdefault("DATA_ROOT_DIRECTORY", data_dir)
if system_dir:
os.environ["COGNEE_SYSTEM_ROOT"] = system_dir
os.environ.setdefault("SYSTEM_ROOT_DIRECTORY", system_dir)
os.environ["COGNEE_USER_ID"] = tenant_id
os.environ["COGNEE_TENANT_ID"] = tenant_id

View File

@@ -251,7 +251,7 @@ def workflow_main():
Execute workflows and manage workflow executions
Examples:
fuzzforge workflow run security_assessment ./target # Execute workflow
fuzzforge workflow security_assessment ./target # Execute workflow
fuzzforge workflow status # Check latest status
fuzzforge workflow history # Show execution history
"""

View File

@@ -97,12 +97,6 @@ services:
MINIO_ROOT_PASSWORD: fuzzforge123
# Lightweight mode for development (reduces memory to 256MB)
MINIO_CI_CD: "true"
MINIO_NOTIFY_AMQP_ENABLE_INGEST: "on"
MINIO_NOTIFY_AMQP_URL_INGEST: amqp://ingest:ingest@rabbitmq:5672
MINIO_NOTIFY_AMQP_EXCHANGE_INGEST: cognee-ingest
MINIO_NOTIFY_AMQP_EXCHANGE_TYPE_INGEST: fanout
MINIO_NOTIFY_AMQP_ROUTING_KEY_INGEST: ""
MINIO_NOTIFY_AMQP_DELIVERY_MODE_INGEST: "1"
volumes:
- minio_data:/data
networks:
@@ -125,7 +119,6 @@ services:
condition: service_healthy
entrypoint: >
/bin/sh -c "
set -e;
echo 'Waiting for MinIO to be ready...';
sleep 2;
@@ -136,18 +129,11 @@ services:
mc mb fuzzforge/targets --ignore-existing;
mc mb fuzzforge/results --ignore-existing;
mc mb fuzzforge/cache --ignore-existing;
mc mb fuzzforge/cognee --ignore-existing;
mc mb fuzzforge/projects --ignore-existing;
echo 'Configuring project ingestion events...';
mc event remove fuzzforge/projects --force || true;
mc event add fuzzforge/projects arn:minio:sqs::ingest:amqp --event put -p;
echo 'Setting lifecycle policies...';
mc ilm add fuzzforge/targets --expiry-days 7;
mc ilm add fuzzforge/results --expiry-days 30;
mc ilm add fuzzforge/cache --expiry-days 3;
mc ilm add fuzzforge/cognee --expiry-days 90;
echo 'Setting access policies...';
mc anonymous set download fuzzforge/results;
@@ -158,24 +144,6 @@ services:
networks:
- fuzzforge-network
rabbitmq:
image: rabbitmq:3.13-management
container_name: fuzzforge-rabbitmq
environment:
RABBITMQ_DEFAULT_USER: ingest
RABBITMQ_DEFAULT_PASS: ingest
ports:
- "5672:5672"
- "15672:15672"
networks:
- fuzzforge-network
healthcheck:
test: ["CMD", "rabbitmq-diagnostics", "-q", "ping"]
interval: 10s
timeout: 5s
retries: 5
restart: unless-stopped
# ============================================================================
# LLM Proxy - LiteLLM Gateway
# ============================================================================
@@ -573,39 +541,11 @@ services:
networks:
- fuzzforge-network
restart: unless-stopped
ingestion-dispatcher:
build:
context: ./services/ingestion_dispatcher
dockerfile: Dockerfile
container_name: fuzzforge-ingestion-dispatcher
depends_on:
rabbitmq:
condition: service_healthy
minio:
condition: service_healthy
environment:
RABBITMQ_URL: amqp://ingest:ingest@rabbitmq:5672/
RABBITMQ_EXCHANGE: cognee-ingest
RABBITMQ_QUEUE: cognee-ingestion-dispatcher
S3_ENDPOINT: http://minio:9000
S3_REGION: us-east-1
S3_BUCKET: projects
S3_ACCESS_KEY: fuzzforge
S3_SECRET_KEY: fuzzforge123
COGNEE_SERVICE_URL: http://fuzzforge-cognee:8000
DATASET_CATEGORY_MAP: files:codebase,findings:findings,docs:docs
EMAIL_DOMAIN: fuzzforge.dev
LOG_LEVEL: INFO
networks:
- fuzzforge-network
healthcheck:
test: ["CMD", "python", "healthcheck.py"]
test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
interval: 30s
timeout: 5s
timeout: 10s
retries: 3
start_period: 10s
restart: unless-stopped
# ============================================================================
# Task Agent - A2A LiteLLM Agent

View File

@@ -1,48 +0,0 @@
version: "3.9"
services:
cognee:
build:
context: ../../cognee
dockerfile: Dockerfile
container_name: fuzzforge-cognee
env_file:
- ../volumes/env/.env
environment:
GRAPH_DATABASE_PROVIDER: ladybug
ENABLE_BACKEND_ACCESS_CONTROL: "true"
STORAGE_BACKEND: s3
STORAGE_BUCKET_NAME: ${COGNEE_S3_BUCKET:-projects}
DB_PROVIDER: sqlite
DB_PATH: /data/relational
DB_NAME: cognee.db
MIGRATION_DB_PROVIDER: sqlite
MIGRATION_DB_PATH: /data/relational
MIGRATION_DB_NAME: cognee.db
AWS_ENDPOINT_URL: ${COGNEE_S3_ENDPOINT:-http://minio:9000}
AWS_REGION: ${COGNEE_S3_REGION:-us-east-1}
AWS_ACCESS_KEY_ID: ${COGNEE_S3_ACCESS_KEY:-fuzzforge}
AWS_SECRET_ACCESS_KEY: ${COGNEE_S3_SECRET_KEY:-fuzzforge123}
AWS_ALLOW_HTTP: ${COGNEE_S3_ALLOW_HTTP:-1}
LITELLM_PROXY_API_BASE: http://llm-proxy:4000
OPENAI_API_BASE: http://llm-proxy:4000
LLM_ENDPOINT: http://llm-proxy:4000
LOG_LEVEL: INFO
ENVIRONMENT: dev
COGNEE_TEMP_DIR: /tmp/cognee
# Processed text files must be persisted so downstream ingestion doesn't 404.
COGNEE_SKIP_PROCESSED_FILE_STORAGE: "false"
# Store normalized Cognee artifacts outside of ingestion prefixes
COGNEE_PROCESSED_SUBDIR: cognee_artifacts
ports:
- "18000:8000"
networks:
- fuzzforge-network
volumes:
- ../volumes/cognee-db:/data/relational
restart: unless-stopped
networks:
fuzzforge-network:
external: true
name: fuzzforge_temporal_network

View File

@@ -10,9 +10,9 @@ fuzzforge ai server
Run the command from a project directory that already contains `.fuzzforge/`. The server reads the project configuration and reuses the same environment variables as the CLI shell.
**Default directories / services**
**Default directories**
- Logs: `.fuzzforge/logs/cognee.log`
- Cognee datasets: hosted by the shared Cognee service (`COGNEE_SERVICE_URL`) inside the configured MinIO/S3 bucket. Local mode falls back to `.fuzzforge/cognee/project_<id>/{data,system}`. Uploads dropped into `s3://projects/<project-id>/...` are ingested automatically via RabbitMQ + the dispatcher.
- Cognee datasets: `.fuzzforge/cognee/project_<id>/{data,system}`
- Artifact cache: `.fuzzforge/artifacts`
## HTTP Endpoints

View File

@@ -73,8 +73,7 @@ sequenceDiagram
- **Remote agent registry** (`ai/src/fuzzforge_ai/remote_agent.py`) holds metadata for downstream agents and handles capability discovery over HTTP. Auto-registration is configured by `ConfigManager` so known agents attach on startup.
- **Memory services**:
- `FuzzForgeMemoryService` and `HybridMemoryManager` (`ai/src/fuzzforge_ai/memory_service.py`) provide conversation recall and bridge to Cognee datasets when configured.
- Cognee bootstrap (`ai/src/fuzzforge_ai/cognee_service.py`) ensures ingestion and knowledge queries stay scoped to the current project and forwards them to the shared Cognee service (`COGNEE_SERVICE_URL`). Datasets live inside the configured MinIO/S3 bucket, with `.fuzzforge/cognee/` available only when `COGNEE_STORAGE_BACKEND=local`.
- MinIO bucket notifications push object-created events into RabbitMQ. The `ingestion-dispatcher` container listens on `cognee-ingest`, downloads the object, and invokes Cognee's REST API on behalf of the project's tenant so uploads become datasets without a manual CLI hop.
- Cognee bootstrap (`ai/src/fuzzforge_ai/cognee_service.py`) ensures ingestion and knowledge queries stay scoped to the current project.
## Workflow Automation
@@ -92,7 +91,7 @@ The CLI surface mirrors these helpers as natural-language prompts (`You> run fuz
## Knowledge & Ingestion
- The `fuzzforge ingest` and `fuzzforge rag ingest` commands call into `ai/src/fuzzforge_ai/ingest_utils.py`, which filters file types, ignores caches, and streams files to the Cognee service where they are stored under `s3://projects/<project-id>/`. When files land directly in `s3://projects/<project-id>/<category>/...`, the dispatcher performs the same workflow automatically via RabbitMQ events.
- The `fuzzforge ingest` and `fuzzforge rag ingest` commands call into `ai/src/fuzzforge_ai/ingest_utils.py`, which filters file types, ignores caches, and populates Cognee datasets under `.fuzzforge/cognee/project_<id>/`.
- Runtime queries hit `query_project_knowledge_api` on the executor, which defers to `cognee_service` for dataset lookup and semantic search. When Cognee credentials are absent the tools return a friendly "not configured" response.
## Artifact Pipeline
@@ -141,15 +140,7 @@ graph LR
- **Session persistence** is controlled by `SESSION_PERSISTENCE`. When set to `sqlite`, ADK's `DatabaseSessionService` writes transcripts to the path configured by `SESSION_DB_PATH` (defaults to `./fuzzforge_sessions.db`). With `inmemory`, the context is scoped to the current process.
- **Semantic recall** stores vector embeddings so `/recall` queries can surface earlier prompts, even after restarts when using SQLite.
- **Hybrid memory manager** (`HybridMemoryManager`) stitches Cognee results into the ADK session. When a knowledge query hits Cognee, the relevant nodes are appended back into the session context so follow-up prompts can reference them naturally.
- **Cognee datasets** are unique per project. Ingestion runs populate `<project>_codebase` while custom calls to `ingest_to_dataset` let you maintain dedicated buckets (e.g., `insights`). Data is persisted inside the Cognee service's bucket/prefix and is shared across CLI, HTTP server, and MCP integrations.
- **Cognee datasets** are unique per project. Ingestion runs populate `<project>_codebase` while custom calls to `ingest_to_dataset` let you maintain dedicated buckets (e.g., `insights`). Data is persisted inside `.fuzzforge/cognee/project_<id>/` and shared across CLI and A2A modes.
- **Task metadata** (workflow runs, artifact descriptors) lives in the executor's in-memory caches but is also mirrored through A2A task events so remote agents can resubscribe if the CLI restarts.
- **Operational check**: Run `/recall <keyword>` or `You> search project knowledge for "topic" using INSIGHTS` after ingestion to confirm both ADK session recall and Cognee graph access are active.
- **CLI quick check**: `/memory status` summarises the current memory type, session persistence, and Cognee dataset directories from inside the agent shell.
## Reliability Considerations
- **Per-dataset storage**: every dataset (`<project>_codebase`, `<project>_docs`, `<project>_findings`) owns its own graph database (`projects/<project>/graph/<uuid>.db`) and LanceDB vector store, so a failed cognify or rebuild never stomps other categories. The dispatcher derives the dataset name from the MinIO key and Cognee isolates the backing files accordingly.
- **Notification scope**: MinIO publishes a single AMQP notification for the `projects` bucket and the dispatcher filters on `files/`, `docs/`, and `findings/`. That keeps the pipeline simple (one queue) while still ignoring Cognees own artifacts or temporary uploads.
- **Processed-file hygiene**: Cognee normalises uploads into deterministic `text_<hash>.txt` files under `COGNEE_PROCESSED_SUBDIR` (defaults to `cognee_artifacts`). The dispatcher's `_cleanup_cognee_artifacts` helper and the standalone `services/ingestion_dispatcher/cleanup_cognee_artifacts.py` script delete any `tmp*` or `text_*` objects that slip into the ingestion prefixes, keeping MinIO tidy.
- **RabbitMQ health**: `services/ingestion_dispatcher/healthcheck.py` exercises the broker connection every 30 seconds via Docker's `healthcheck`. If RabbitMQ restarts or credentials break, the container flips to `unhealthy` and Compose/Kubernetes restarts it automatically instead of silently hanging.
- **Crash recovery**: because uploads land in MinIO first, you can rerun the cleanup script or replay outstanding objects after a dispatcher outage simply by re-seeding the queue (`mc event add … --event put -p`)—no state is lost in the dispatcher itself.

View File

@@ -81,33 +81,6 @@ LLM_COGNEE_API_KEY=sk-your-key
If the Cognee variables are omitted, graph-specific tools remain available but return a friendly "not configured" response.
### Cognee Storage Backend
Cognee defaults to local storage under `.fuzzforge/cognee/`, but you can mirror datasets to MinIO/S3 for multi-tenant or containerised deployments:
```env
COGNEE_STORAGE_BACKEND=s3
COGNEE_S3_BUCKET=cognee
COGNEE_S3_PREFIX=project_${PROJECT_ID}
COGNEE_S3_ENDPOINT=http://localhost:9000
COGNEE_S3_REGION=us-east-1
COGNEE_S3_ACCESS_KEY=fuzzforge
COGNEE_S3_SECRET_KEY=fuzzforge123
COGNEE_S3_ALLOW_HTTP=1
```
Set the values to match your MinIO/S3 endpoint; the docker compose stack seeds a `cognee` bucket automatically. When S3 mode is active, ingestion and search work exactly the same but Cognee writes metadata to `s3://<bucket>/<prefix>/project_<id>/{data,system}`.
### Cognee Service URL
The CLI and workers talk to Cognee over HTTP. Point `COGNEE_SERVICE_URL` at the service (defaults to `http://localhost:18000` when you run `docker/docker-compose.cognee.yml`) and provide `COGNEE_API_KEY` if you protect the API behind LiteLLM.
Every project gets its own Cognee login so datasets stay isolated. The CLI auto-derives an email/password pair (e.g., `project_<id>@fuzzforge.dev`) and registers it the first time you run `fuzzforge ingest`. Override those defaults by setting `COGNEE_SERVICE_EMAIL` / `COGNEE_SERVICE_PASSWORD` in `.fuzzforge/.env` before running ingestion if you need to reuse an existing account.
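The derived pair mirrors the dispatcher's `_service_email` / `_service_password` helpers shown later in this diff; a sketch (domain and truncation follow the dispatcher defaults, `demo123` is a placeholder):
```python
from hashlib import sha256

def service_email(project_id: str, domain: str = "fuzzforge.dev") -> str:
    return f"project_{project_id}@{domain}"

def service_password(project_id: str) -> str:
    # First 20 hex characters of SHA-256(project_id), matching the dispatcher.
    return sha256(project_id.encode()).hexdigest()[:20]

print(service_email("demo123"), service_password("demo123"))
```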
### MinIO Event Mapping
The ingestion dispatcher converts S3 prefixes to datasets using `DATASET_CATEGORY_MAP` (default `files:codebase,findings:findings,docs:docs`). Adjust it in `docker-compose.yml` if you want to add more categories or rename datasets.
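The parsing is a plain split on `,` and `:` (see `_parse_category_map` in the dispatcher further down); a small sketch of how the default value maps prefixes to dataset names:
```python
def parse_category_map(raw: str) -> dict:
    # "files:codebase,findings:findings,docs:docs" -> {"files": "codebase", ...}
    mapping = {}
    for pair in raw.split(","):
        if ":" in pair:
            category, suffix = pair.split(":", 1)
            mapping[category.strip()] = suffix.strip()
    return mapping

mapping = parse_category_map("files:codebase,findings:findings,docs:docs")
print({category: f"<project-id>_{suffix}" for category, suffix in mapping.items()})
# {'files': '<project-id>_codebase', 'findings': '<project-id>_findings', 'docs': '<project-id>_docs'}
```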
## MCP / Backend Integration
```env

View File

@@ -38,13 +38,12 @@ All runs automatically skip `.fuzzforge/**` and `.git/**` to avoid recursive ing
- Primary dataset: `<project>_codebase`
- Additional datasets: create ad-hoc buckets such as `insights` via the `ingest_to_dataset` tool
- Storage location (service default): `s3://<bucket>/<prefix>/project_<id>/{data,system}` as defined by the Cognee service (the docker compose stack seeds a `cognee` bucket automatically).
- Local mode (opt-in): set `COGNEE_STORAGE_BACKEND=local` to fall back to `.fuzzforge/cognee/project_<id>/` when developing without MinIO.
- Storage location: `.fuzzforge/cognee/project_<id>/`
### Persistence Details
- The Cognee service keeps datasets inside the configured bucket/prefix (`s3://<bucket>/<prefix>/project_<id>/{data,system}`) so every project has its own Ladybug + LanceDB pair. Local mode mirrors the same layout under `.fuzzforge/cognee/project_<id>/`.
- Cognee assigns deterministic IDs per project; copy the entire prefix (local or S3) if you migrate repositories to retain graph history.
- Every dataset lives under `.fuzzforge/cognee/project_<id>/{data,system}`. These directories are safe to commit to long-lived storage (they only contain embeddings and metadata).
- Cognee assigns deterministic IDs per project; if you move the repository, copy the entire `.fuzzforge/cognee/` tree to retain graph history.
- `HybridMemoryManager` ensures answers from Cognee are written back into the ADK session store so future prompts can refer to the same nodes without repeating the query.
- All Cognee processing runs locally against the files you ingest. No external service calls are made unless you configure a remote Cognee endpoint.
@@ -78,40 +77,10 @@ FUZZFORGE_MCP_URL=http://localhost:8010/mcp
LLM_COGNEE_PROVIDER=openai
LLM_COGNEE_MODEL=gpt-5-mini
LLM_COGNEE_API_KEY=sk-your-key
COGNEE_SERVICE_URL=http://localhost:18000
COGNEE_API_KEY=
```
The CLI auto-registers a dedicated Cognee account per project the first time you ingest (email pattern `project_<id>@fuzzforge.dev`). Set `COGNEE_SERVICE_EMAIL` / `COGNEE_SERVICE_PASSWORD` in `.fuzzforge/.env` if you prefer to reuse an existing login.
Switch the knowledge graph storage to S3/MinIO by adding:
```env
COGNEE_STORAGE_BACKEND=s3
COGNEE_S3_BUCKET=cognee
COGNEE_S3_PREFIX=project_${PROJECT_ID}
COGNEE_S3_ENDPOINT=http://localhost:9000
COGNEE_S3_ACCESS_KEY=fuzzforge
COGNEE_S3_SECRET_KEY=fuzzforge123
COGNEE_S3_ALLOW_HTTP=1
```
The default `docker compose` stack already seeds a `cognee` bucket inside MinIO, so these values work out of the box. Point `COGNEE_SERVICE_URL` at the Cognee container (included in `docker/docker-compose.cognee.yml`) so `fuzzforge ingest` sends all requests to the shared service instead of importing Cognee locally.
Add comments or project-specific overrides as needed; the agent reads these variables on startup.
## Event-Driven Ingestion
Uploading files directly into MinIO triggers Cognee automatically. The dispatcher watches `s3://projects/<project-id>/...` and translates the top-level folder into a dataset:
| Prefix | Dataset name |
|-----------|---------------------------------------|
| `files/` | `<project-id>_codebase` |
| `findings/` | `<project-id>_findings` |
| `docs/` | `<project-id>_docs` |
Under the hood, MinIO publishes a `PUT` event to RabbitMQ (the `cognee-ingest` exchange), and the `ingestion-dispatcher` container downloads the object and calls `/api/v1/add` + `/api/v1/cognify` using the deterministic project credentials (`project_<id>@fuzzforge.dev`). That means rsync, `aws s3 cp`, GitHub Actions, or any other tool that writes to the bucket can seed Cognee without touching the CLI.
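Any S3 client will do; a boto3 equivalent of the smoke-test upload (a sketch: endpoint and credentials are this stack's MinIO defaults, and the project id and key are placeholders):
```python
import boto3

project_id = "demo123"  # placeholder project id
s3 = boto3.client(
    "s3",
    endpoint_url="http://localhost:9000",
    aws_access_key_id="fuzzforge",
    aws_secret_access_key="fuzzforge123",
    region_name="us-east-1",
)

# Landing under files/ routes the object to the <project-id>_codebase dataset.
s3.put_object(
    Bucket="projects",
    Key=f"{project_id}/files/notes/architecture.md",
    Body=b"# Architecture notes\nUploaded straight to MinIO; the dispatcher handles ingestion.\n",
)
```
Watch `docker logs -f fuzzforge-ingestion-dispatcher` to see the add and cognify calls go through.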
## Tips
- Re-run ingestion after significant code changes to keep the knowledge graph fresh.

View File

@@ -231,20 +231,6 @@ nano volumes/env/.env
See [Getting Started](../tutorial/getting-started.md) for detailed environment setup.
### Cognee Service Stack
Cognee now runs as its own container so every project shares the same multi-tenant backend (Ladybug + LanceDB sitting on MinIO). After the core stack is running, bring the service online with:
```bash
docker compose -f docker/docker-compose.cognee.yml up -d
```
This spins up the Cognee API on `http://localhost:18000`, publishes it to the host, and stores knowledge graphs in the `cognee` bucket that the main compose file seeds. Point the CLI at it by setting `COGNEE_SERVICE_URL=http://localhost:18000` (already included in `.env.template`).
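As a quick reachability check, a login sketch against the REST API using `httpx` (the per-project email/password pattern described in this diff; the account only exists after the project's first `fuzzforge ingest`, and `demo123` is a placeholder):
```python
from hashlib import sha256

import httpx

project_id = "demo123"  # placeholder project id
email = f"project_{project_id}@fuzzforge.dev"
password = sha256(project_id.encode()).hexdigest()[:20]

# The account is auto-registered the first time the project ingests.
resp = httpx.post(
    "http://localhost:18000/api/v1/auth/login",
    data={"username": email, "password": password},
    timeout=30.0,
)
resp.raise_for_status()
print("access token:", resp.json()["access_token"][:16], "...")
```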
### RabbitMQ + Dispatcher
`docker-compose.yml` also launches RabbitMQ (management UI at `http://localhost:15672`, credentials `ingest`/`ingest`) and the `ingestion-dispatcher` container. MinIO publishes `PUT` events from `s3://projects/<project-id>/...` to the `cognee-ingest` exchange, and the dispatcher downloads the object and calls Cognee's REST API. That means any rsync or upload into the `projects` bucket automatically becomes a dataset.
---
## Troubleshooting

View File

@@ -1,111 +0,0 @@
#!/usr/bin/env bash
# Stand up the Cognee/Ladybug ingestion pipeline (MinIO + RabbitMQ + dispatcher)
# and optionally push a sample file through the MinIO bucket to prove the
# RabbitMQ → dispatcher → Cognee path is healthy.
set -euo pipefail
ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)"
cd "$ROOT_DIR"
PROJECT_ID="${PROJECT_ID:-demo123}"
VERIFY=0
usage() {
cat <<'USAGE'
Usage: scripts/setup_auto_ingest.sh [--verify] [--project <id>]
--verify Upload a sample file into MinIO after services start.
--project <id> Project ID for the verification upload (default: demo123).
Environment overrides:
PROJECT_ID Same as --project.
AWS_ENDPOINT Override the MinIO endpoint (default http://minio:9000 inside Docker network).
USAGE
}
while [[ $# -gt 0 ]]; do
case "$1" in
--verify)
VERIFY=1
shift
;;
--project)
PROJECT_ID="$2"
shift 2
;;
-h|--help)
usage
exit 0
;;
*)
echo "Unknown option: $1" >&2
usage
exit 1
;;
esac
done
AWS_ENDPOINT="${AWS_ENDPOINT:-http://minio:9000}"
require_file() {
if [[ ! -f "$1" ]]; then
echo "Missing $1. Copy volumes/env/.env.template to volumes/env/.env first." >&2
exit 1
fi
}
run() {
echo "[$(date +%H:%M:%S)] $*"
"$@"
}
require_file "volumes/env/.env"
echo "Bootstrapping auto-ingestion stack from $ROOT_DIR"
run docker compose up -d
run docker compose -f docker/docker-compose.cognee.yml up -d
# Ensure MinIO buckets, lifecycle policies, and AMQP events are in place.
run docker compose up minio-setup
# Make sure the dispatcher is online (restarts to pick up env/file changes).
run docker compose up -d ingestion-dispatcher
echo "Current ingestion dispatcher status:"
docker compose ps ingestion-dispatcher
if [[ "$VERIFY" -eq 1 ]]; then
TMP_FILE="$(mktemp)"
SAMPLE_KEY="files/ingest_smoketest_$(date +%s).txt"
cat <<EOF >"$TMP_FILE"
Automatic ingestion smoke test at $(date)
Project: $PROJECT_ID
EOF
echo "Uploading $SAMPLE_KEY into s3://projects/$PROJECT_ID via aws-cli container..."
run docker run --rm --network fuzzforge_temporal_network \
-e AWS_ACCESS_KEY_ID=fuzzforge \
-e AWS_SECRET_ACCESS_KEY=fuzzforge123 \
-e AWS_DEFAULT_REGION=us-east-1 \
-v "$TMP_FILE:/tmp/sample.txt:ro" \
amazon/aws-cli s3 cp /tmp/sample.txt "s3://projects/${PROJECT_ID}/${SAMPLE_KEY}" \
--endpoint-url "$AWS_ENDPOINT"
rm -f "$TMP_FILE"
cat <<EOF
Sample file enqueued. Watch the dispatcher logs with:
docker logs -f fuzzforge-ingestion-dispatcher
Datasets will appear via:
curl -s -X POST http://localhost:18000/api/v1/auth/login \\
-d "username=project_${PROJECT_ID}@fuzzforge.dev&password=\$(python3 - <<'PY'
from hashlib import sha256
print(sha256(b"$PROJECT_ID").hexdigest()[:20])
PY
)" | python3 -c 'import sys,json; print(json.load(sys.stdin)["access_token"])'
EOF
fi
echo "Auto-ingestion stack ready."

View File

@@ -1,11 +0,0 @@
FROM python:3.12-slim
ENV PYTHONUNBUFFERED=1
WORKDIR /app
COPY requirements.txt ./
RUN pip install --no-cache-dir -r requirements.txt
COPY app.py healthcheck.py ./
CMD ["python", "app.py"]

View File

@@ -1,285 +0,0 @@
import json
import logging
import os
import tempfile
from contextlib import contextmanager
from dataclasses import dataclass
from hashlib import sha256
from typing import Dict, Iterable, List, Optional
from urllib.parse import unquote_plus
import boto3
import httpx
import pika
from tenacity import retry, stop_after_attempt, wait_fixed
logging.basicConfig(level=os.getenv("LOG_LEVEL", "INFO"))
LOGGER = logging.getLogger("cognee-dispatcher")
@dataclass(frozen=True)
class Record:
bucket: str
key: str
project_id: str
category: str
dataset: str
class S3Client:
def __init__(self) -> None:
self.default_bucket = os.getenv("S3_BUCKET", "projects")
session = boto3.session.Session()
self.client = session.client(
"s3",
endpoint_url=os.getenv("S3_ENDPOINT", "http://minio:9000"),
aws_access_key_id=os.getenv("S3_ACCESS_KEY"),
aws_secret_access_key=os.getenv("S3_SECRET_KEY"),
region_name=os.getenv("S3_REGION", "us-east-1"),
)
@contextmanager
def download(self, key: str, bucket: Optional[str] = None):
# Use /tmp for dispatcher temp files (never inside project workspace)
temp_dir = os.getenv("DISPATCHER_TEMP_DIR", "/tmp/dispatcher_tmp")
os.makedirs(temp_dir, exist_ok=True)
tmp = tempfile.NamedTemporaryFile(delete=False, dir=temp_dir)
tmp.close()
try:
target_bucket = bucket or self.default_bucket
self.client.download_file(target_bucket, key, tmp.name)
yield tmp.name
finally:
try:
os.unlink(tmp.name)
except FileNotFoundError:
pass
class CogneeApiClient:
def __init__(self, base_url: str, email: str, password: str) -> None:
self.base_url = base_url.rstrip("/")
self.email = email
self.password = password
self._token: Optional[str] = None
self._client = httpx.Client(timeout=httpx.Timeout(180.0))
def close(self) -> None:
self._client.close()
@retry(stop=stop_after_attempt(3), wait=wait_fixed(2))
def ensure_authenticated(self) -> None:
if self._token:
return
if not self._login():
LOGGER.info("Registering new Cognee user %s", self.email)
self._register()
if not self._login(): # pragma: no cover
raise RuntimeError("Unable to authenticate with Cognee service")
def _register(self) -> None:
response = self._client.post(
f"{self.base_url}/api/v1/auth/register",
json={"email": self.email, "password": self.password},
)
if response.status_code not in (200, 201, 400):
response.raise_for_status()
def _login(self) -> bool:
response = self._client.post(
f"{self.base_url}/api/v1/auth/login",
data={"username": self.email, "password": self.password},
)
if response.status_code != 200:
return False
self._token = response.json().get("access_token")
self._client.headers["Authorization"] = f"Bearer {self._token}"
return True
def add_file(
self, file_path: str, dataset: str, original_filename: Optional[str] = None
) -> bool:
with open(file_path, "rb") as fh:
# Use original filename from S3 key instead of temp filename
filename = original_filename or os.path.basename(file_path)
files = {"data": (filename, fh)}
data = {"datasetName": dataset}
response = self._client.post(f"{self.base_url}/api/v1/add", data=data, files=files)
if response.status_code == 409:
LOGGER.info(
"Dataset %s already has %s (%s)",
dataset,
filename,
response.text.strip(),
)
return False
if response.status_code not in (200, 201):
raise RuntimeError(f"Add failed: {response.text}")
return True
def cognify(self, dataset: str) -> None:
payload = {"datasets": [dataset], "run_in_background": False}
response = self._client.post(f"{self.base_url}/api/v1/cognify", json=payload)
if response.status_code not in (200, 201):
raise RuntimeError(f"Cognify failed: {response.text}")
class Dispatcher:
def __init__(self) -> None:
self.s3 = S3Client()
self.cognee_url = os.getenv("COGNEE_SERVICE_URL", "http://fuzzforge-cognee:8000")
self.email_domain = os.getenv("EMAIL_DOMAIN", "fuzzforge.dev")
self.category_map = self._parse_category_map(os.getenv("DATASET_CATEGORY_MAP"))
@staticmethod
def _parse_category_map(raw: Optional[str]) -> Dict[str, str]:
mapping: Dict[str, str] = {}
if not raw:
return mapping
for pair in raw.split(","):
if ":" not in pair:
continue
category, suffix = pair.split(":", 1)
mapping[category.strip()] = suffix.strip()
return mapping
def handle_record(self, record: Record) -> None:
LOGGER.info("Processing %s -> dataset %s", record.key, record.dataset)
# Extract original filename from S3 key
original_filename = record.key.split("/")[-1]
with self.s3.download(record.key, record.bucket) as local_path:
client = CogneeApiClient(
base_url=self.cognee_url,
email=self._service_email(record.project_id),
password=self._service_password(record.project_id),
)
try:
client.ensure_authenticated()
created = client.add_file(
local_path, record.dataset, original_filename=original_filename
)
if created:
client.cognify(record.dataset)
# Remove Cognee's temp/text artifacts so the bucket stays tidy.
self._cleanup_cognee_artifacts(record.project_id, record.category)
else:
LOGGER.info(
"Skipping cognify for %s; file already present", record.dataset
)
finally:
client.close()
def _cleanup_cognee_artifacts(self, project_id: str, category: str) -> None:
"""Remove tmp* and text_* files that Cognee creates during processing."""
try:
prefix = f"{project_id}/{category}/"
import boto3
s3_client = boto3.client(
service_name='s3',
endpoint_url=os.getenv("S3_ENDPOINT"),
aws_access_key_id=os.getenv("S3_ACCESS_KEY"),
aws_secret_access_key=os.getenv("S3_SECRET_KEY"),
region_name=os.getenv("S3_REGION", "us-east-1"),
)
response = s3_client.list_objects_v2(
Bucket=self.s3.default_bucket,
Prefix=prefix,
MaxKeys=100
)
to_delete = []
for obj in response.get('Contents', []):
key = obj['Key']
filename = key.split('/')[-1]
# Delete temp files created by Cognee
if (filename.startswith('tmp') and '.' not in filename) or filename.startswith('text_'):
to_delete.append({'Key': key})
if to_delete:
s3_client.delete_objects(
Bucket=self.s3.default_bucket,
Delete={'Objects': to_delete}
)
LOGGER.info("Cleaned up %d Cognee artifacts from %s", len(to_delete), prefix)
except Exception as e:
LOGGER.warning("Failed to cleanup Cognee artifacts: %s", e)
def _service_email(self, project_id: str) -> str:
return f"project_{project_id}@{self.email_domain}"
def _service_password(self, project_id: str) -> str:
return sha256(project_id.encode()).hexdigest()[:20]
def parse_records(self, payload: Dict) -> Iterable[Record]:
"""Parse S3 event records and filter out files that shouldn't be ingested.
Ingestion scope:
- s3://projects/<project-id>/files/ → <project-id>_codebase
- s3://projects/<project-id>/findings/ → <project-id>_findings
- s3://projects/<project-id>/docs/ → <project-id>_docs
Exclusions:
- s3://projects/<project-id>/tmp/ → Not in category map (agent temp files)
- Files named tmp* without extension → Python tempfile artifacts
- Files named text_*.txt → Cognee processing artifacts
"""
for record in payload.get("Records", []):
s3_info = record.get("s3", {})
bucket = s3_info.get("bucket", {}).get("name")
key = unquote_plus(s3_info.get("object", {}).get("key", ""))
key_parts = key.split("/")
if len(key_parts) < 3:
LOGGER.debug("Skipping key without project/category: %s", key)
continue
project_id, category = key_parts[0], key_parts[1]
filename = key_parts[-1]
# Skip temp files: tmp* without extension, text_<hash>.txt from Cognee processing
if (filename.startswith("tmp") and "." not in filename) or filename.startswith("text_"):
LOGGER.debug("Skipping temporary/processed file: %s", key)
continue
dataset_suffix = self.category_map.get(category)
if not dataset_suffix:
LOGGER.debug("Ignoring category %s for %s", category, key)
continue
dataset = f"{project_id}_{dataset_suffix}"
yield Record(bucket=bucket or self.s3.default_bucket, key="/".join(key_parts), project_id=project_id, category=category, dataset=dataset)
def main() -> None:
dispatcher = Dispatcher()
rabbit_url = os.getenv("RABBITMQ_URL", "amqp://ingest:ingest@rabbitmq:5672/")
exchange = os.getenv("RABBITMQ_EXCHANGE", "cognee-ingest")
queue_name = os.getenv("RABBITMQ_QUEUE", "cognee-ingestion-dispatcher")
connection = pika.BlockingConnection(pika.URLParameters(rabbit_url))
channel = connection.channel()
channel.exchange_declare(exchange=exchange, exchange_type="fanout", durable=True)
channel.queue_declare(queue=queue_name, durable=True)
channel.queue_bind(queue=queue_name, exchange=exchange)
channel.basic_qos(prefetch_count=1)
def _callback(ch, method, _properties, body):
try:
payload = json.loads(body.decode("utf-8"))
for record in dispatcher.parse_records(payload):
dispatcher.handle_record(record)
except Exception as exc: # pragma: no cover
LOGGER.exception("Failed to process event: %s", exc)
# Don't requeue 404s (file deleted/never existed) - ack and move on
from botocore.exceptions import ClientError
if isinstance(exc, ClientError) and exc.response.get('Error', {}).get('Code') == '404':
LOGGER.warning("File not found (404), acking message to avoid retry loop")
ch.basic_ack(delivery_tag=method.delivery_tag)
else:
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
return
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(queue=queue_name, on_message_callback=_callback)
LOGGER.info("Ingestion dispatcher listening on %s", queue_name)
channel.start_consuming()
if __name__ == "__main__":
main()

View File

@@ -1,26 +0,0 @@
"""Simple healthcheck to verify RabbitMQ connectivity."""
from __future__ import annotations
import os
import sys
import pika
def main() -> int:
rabbit_url = os.getenv("RABBITMQ_URL", "amqp://ingest:ingest@rabbitmq:5672/")
try:
connection = pika.BlockingConnection(pika.URLParameters(rabbit_url))
try:
channel = connection.channel()
channel.basic_qos(prefetch_count=1)
finally:
connection.close()
except Exception as exc: # pragma: no cover - run-time diagnostic
print(f"[healthcheck] RabbitMQ unavailable: {exc}", file=sys.stderr)
return 1
return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -1,4 +0,0 @@
boto3==1.34.146
pika==1.3.2
httpx==0.28.1
tenacity==9.0.0

View File

@@ -9,7 +9,7 @@ FuzzForge security testing project.
fuzzforge workflows
# Submit a workflow for analysis
fuzzforge workflow run <workflow-name> /path/to/target
fuzzforge workflow <workflow-name> /path/to/target
# View findings
fuzzforge finding <run-id>

View File

@@ -63,16 +63,3 @@ LLM_EMBEDDING_MODEL=litellm_proxy/text-embedding-3-large
# -----------------------------------------------------------------------------
UI_USERNAME=fuzzforge
UI_PASSWORD=fuzzforge123
# Cognee service configuration
COGNEE_MCP_URL=
COGNEE_SERVICE_URL=http://localhost:18000
COGNEE_API_KEY=
COGNEE_STORAGE_BACKEND=s3
COGNEE_S3_BUCKET=projects
COGNEE_S3_PREFIX=
COGNEE_S3_ENDPOINT=http://localhost:9000
COGNEE_S3_REGION=us-east-1
COGNEE_S3_ACCESS_KEY=fuzzforge
COGNEE_S3_SECRET_KEY=fuzzforge123
COGNEE_S3_ALLOW_HTTP=1