Document RabbitMQ monitoring and refine dispatcher filtering

This commit is contained in:
Songbird
2025-11-07 17:52:22 +01:00
parent 44d61ad4bf
commit d33847609f
2 changed files with 14 additions and 1 deletions


@@ -203,7 +203,16 @@ s3://projects/<project-id>/
docs/... # → <project-id>_docs dataset
```
MinIO emits the object-created event to RabbitMQ; the `ingestion-dispatcher` downloads the file and calls the Cognee REST API on behalf of that project's tenant. Use any upload mechanism you like (`aws s3 cp`, rsync to MinIO, etc.); once the object lands in the bucket it is ingested and cognified automatically.
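For example, a minimal upload sketch using boto3 (the `localhost:9000` endpoint is assumed to be the MinIO port exposed by the compose stack, the `fuzzforge`/`fuzzforge123` credentials and `projects` bucket mirror the verification command below, and `my-project`/`architecture.md` are placeholder names):

```
import boto3

# Assumed MinIO endpoint and credentials from the compose defaults; adjust to your deployment.
s3 = boto3.client(
    "s3",
    endpoint_url="http://localhost:9000",
    aws_access_key_id="fuzzforge",
    aws_secret_access_key="fuzzforge123",
)

# Dropping the file under <project-id>/docs/ routes it to the <project-id>_docs dataset.
s3.upload_file("architecture.md", "projects", "my-project/docs/architecture.md")
```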
#### Monitoring / Debugging
- RabbitMQ Management UI: `http://localhost:15672` (user `ingest`, password `ingest`). The `cognee-ingest` exchange fans out events into the `cognee-ingestion-dispatcher` queue; a growing `Ready` count means the dispatcher is down or congested (a queue-depth check via the management API is sketched after this list).
- Dispatcher logs: `docker logs -f fuzzforge-ingestion-dispatcher` shows each object as `Processing <project>/<category>/<file> -> dataset ...`, followed by 200 responses for `/auth/login`, `/add`, and `/cognify`.
- Cognee service logs: `docker logs -f fuzzforge-cognee` display the full pipeline (`ingest_data`, `extract_graph_from_data`, etc.) for each dataset run.
- Bucket verification: `docker run --rm --network=fuzzforge_temporal_network -e AWS_ACCESS_KEY_ID=fuzzforge -e AWS_SECRET_ACCESS_KEY=fuzzforge123 amazon/aws-cli s3 ls --recursive --endpoint-url http://minio:9000 s3://projects/<project-id>` confirms files, Ladybug DBs (`graph/…`) and LanceDB indexes (`vector/…`).
If you still upload into the legacy `s3://cognee/...` hierarchy, copy the object over (or update `COGNEE_S3_BUCKET`) so MinIO emits the event for the watched bucket.
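The same `Ready` figure is available without opening the UI via the RabbitMQ management HTTP API; a minimal sketch, assuming the default vhost `/` and the `ingest`/`ingest` credentials listed above:

```
import requests

# Management API path is /api/queues/<vhost>/<queue>; the default vhost "/" is URL-encoded as %2F.
resp = requests.get(
    "http://localhost:15672/api/queues/%2F/cognee-ingestion-dispatcher",
    auth=("ingest", "ingest"),
    timeout=5,
)
resp.raise_for_status()
stats = resp.json()

# A messages_ready value that keeps growing means events arrive faster than the dispatcher consumes them.
print(stats["messages_ready"], "ready |", stats["messages_unacknowledged"], "unacked")
```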
```
**What's running:**


@@ -158,6 +158,10 @@ class Dispatcher:
LOGGER.debug("Skipping key without project/category: %s", key)
continue
project_id, category = key_parts[0], key_parts[1]
filename = key_parts[-1]
if filename.startswith("tmp") and "." not in filename:
LOGGER.debug("Skipping temporary upload key: %s", key)
continue
dataset_suffix = self.category_map.get(category)
if not dataset_suffix:
LOGGER.debug("Ignoring category %s for %s", category, key)