diff --git a/docker-compose.yml b/docker-compose.yml
index 12c2f9d..7f66541 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -125,6 +125,7 @@ services:
         condition: service_healthy

     entrypoint: >
       /bin/sh -c "
+      set -e;
       echo 'Waiting for MinIO to be ready...';
       sleep 2;
@@ -139,8 +140,8 @@ services:
       mc mb fuzzforge/projects --ignore-existing;

       echo 'Configuring project ingestion events...';
-      mc event remove fuzzforge/projects arn:minio:sqs::ingest || true;
-      mc event add fuzzforge/projects arn:minio:sqs::ingest --event put;
+      mc event remove fuzzforge/projects --force || true;
+      mc event add fuzzforge/projects arn:minio:sqs::ingest:amqp --event put -p;

       echo 'Setting lifecycle policies...';
       mc ilm add fuzzforge/targets --expiry-days 7;
diff --git a/docker/docker-compose.cognee.yml b/docker/docker-compose.cognee.yml
index e74e5e5..b75f299 100644
--- a/docker/docker-compose.cognee.yml
+++ b/docker/docker-compose.cognee.yml
@@ -29,6 +29,11 @@ services:
       LLM_ENDPOINT: http://llm-proxy:4000
       LOG_LEVEL: INFO
       ENVIRONMENT: dev
+      COGNEE_TEMP_DIR: /tmp/cognee
+      # Processed text files must be persisted so downstream ingestion doesn't 404.
+      COGNEE_SKIP_PROCESSED_FILE_STORAGE: "false"
+      # Store normalized Cognee artifacts outside of ingestion prefixes
+      COGNEE_PROCESSED_SUBDIR: cognee_artifacts
     ports:
       - "18000:8000"
     networks:
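The `arn:minio:sqs::ingest:amqp` target above makes MinIO publish every `put` into the `projects` bucket to RabbitMQ as a standard S3 event envelope; the dispatcher changes below consume exactly that payload. A minimal consumer sketch, assuming a broker reachable at `rabbitmq` and a queue named `ingest` bound to the notification exchange (both names are assumptions; the real values come from the dispatcher's environment):

```python
import json

import pika

# Hypothetical broker URL and queue name; substitute the deployment's values.
connection = pika.BlockingConnection(
    pika.URLParameters("amqp://guest:guest@rabbitmq:5672/%2F")
)
channel = connection.channel()

def on_message(ch, method, properties, body):
    # MinIO sends S3-style event envelopes: {"Records": [{"s3": ...}]}
    payload = json.loads(body)
    for record in payload.get("Records", []):
        s3 = record.get("s3", {})
        bucket = s3.get("bucket", {}).get("name", "")
        key = s3.get("object", {}).get("key", "")
        print(f"object created: s3://{bucket}/{key}")
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(queue="ingest", on_message_callback=on_message)
channel.start_consuming()
```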
diff --git a/services/ingestion_dispatcher/app.py b/services/ingestion_dispatcher/app.py
index 18bd77e..ad64c93 100644
--- a/services/ingestion_dispatcher/app.py
+++ b/services/ingestion_dispatcher/app.py
@@ -40,7 +40,10 @@ class S3Client:

     @contextmanager
     def download(self, key: str, bucket: Optional[str] = None):
-        tmp = tempfile.NamedTemporaryFile(delete=False)
+        # Use /tmp for dispatcher temp files (never inside project workspace)
+        temp_dir = os.getenv("DISPATCHER_TEMP_DIR", "/tmp/dispatcher_tmp")
+        os.makedirs(temp_dir, exist_ok=True)
+        tmp = tempfile.NamedTemporaryFile(delete=False, dir=temp_dir)
         tmp.close()
         try:
             target_bucket = bucket or self.default_bucket
@@ -93,13 +96,26 @@
             self._client.headers["Authorization"] = f"Bearer {self._token}"
         return True

-    def add_file(self, file_path: str, dataset: str) -> None:
+    def add_file(
+        self, file_path: str, dataset: str, original_filename: Optional[str] = None
+    ) -> bool:
         with open(file_path, "rb") as fh:
-            files = {"data": (os.path.basename(file_path), fh)}
+            # Use original filename from S3 key instead of temp filename
+            filename = original_filename or os.path.basename(file_path)
+            files = {"data": (filename, fh)}
             data = {"datasetName": dataset}
             response = self._client.post(f"{self.base_url}/api/v1/add", data=data, files=files)
+            if response.status_code == 409:
+                LOGGER.info(
+                    "Dataset %s already has %s (%s)",
+                    dataset,
+                    filename,
+                    response.text.strip(),
+                )
+                return False
             if response.status_code not in (200, 201):
                 raise RuntimeError(f"Add failed: {response.text}")
+            return True

     def cognify(self, dataset: str) -> None:
         payload = {"datasets": [dataset], "run_in_background": False}
@@ -129,6 +145,8 @@ class Dispatcher:

     def handle_record(self, record: Record) -> None:
         LOGGER.info("Processing %s -> dataset %s", record.key, record.dataset)
+        # Extract original filename from S3 key
+        original_filename = record.key.split("/")[-1]
         with self.s3.download(record.key, record.bucket) as local_path:
             client = CogneeApiClient(
                 base_url=self.cognee_url,
@@ -137,11 +155,56 @@
             )
             try:
                 client.ensure_authenticated()
-                client.add_file(local_path, record.dataset)
-                client.cognify(record.dataset)
+                created = client.add_file(
+                    local_path, record.dataset, original_filename=original_filename
+                )
+                if created:
+                    client.cognify(record.dataset)
+                    # Remove Cognee's temp/text artifacts so the bucket stays tidy.
+                    self._cleanup_cognee_artifacts(record.project_id, record.category)
+                else:
+                    LOGGER.info(
+                        "Skipping cognify for %s; file already present", record.dataset
+                    )
             finally:
                 client.close()

+    def _cleanup_cognee_artifacts(self, project_id: str, category: str) -> None:
+        """Remove tmp* and text_* files that Cognee creates during processing."""
+        try:
+            prefix = f"{project_id}/{category}/"
+            import boto3
+            s3_client = boto3.client(
+                service_name='s3',
+                endpoint_url=os.getenv("S3_ENDPOINT"),
+                aws_access_key_id=os.getenv("S3_ACCESS_KEY"),
+                aws_secret_access_key=os.getenv("S3_SECRET_KEY"),
+                region_name=os.getenv("S3_REGION", "us-east-1"),
+            )
+
+            response = s3_client.list_objects_v2(
+                Bucket=self.s3.default_bucket,
+                Prefix=prefix,
+                MaxKeys=100
+            )
+
+            to_delete = []
+            for obj in response.get('Contents', []):
+                key = obj['Key']
+                filename = key.split('/')[-1]
+                # Delete temp files created by Cognee
+                if (filename.startswith('tmp') and '.' not in filename) or filename.startswith('text_'):
+                    to_delete.append({'Key': key})
+
+            if to_delete:
+                s3_client.delete_objects(
+                    Bucket=self.s3.default_bucket,
+                    Delete={'Objects': to_delete}
+                )
+                LOGGER.info("Cleaned up %d Cognee artifacts from %s", len(to_delete), prefix)
+        except Exception as e:
+            LOGGER.warning("Failed to cleanup Cognee artifacts: %s", e)
+
     def _service_email(self, project_id: str) -> str:
         return f"project_{project_id}@{self.email_domain}"
@@ -149,6 +212,18 @@
         return sha256(project_id.encode()).hexdigest()[:20]

     def parse_records(self, payload: Dict) -> Iterable[Record]:
+        """Parse S3 event records and filter out files that shouldn't be ingested.
+
+        Ingestion scope:
+        - s3://projects/<project_id>/files/    → <project_id>_codebase
+        - s3://projects/<project_id>/findings/ → <project_id>_findings
+        - s3://projects/<project_id>/docs/     → <project_id>_docs
+
+        Exclusions:
+        - s3://projects/<project_id>/tmp/ → Not in category map (agent temp files)
+        - Files named tmp* without extension → Python tempfile artifacts
+        - Files named text_*.txt → Cognee processing artifacts
+        """
         for record in payload.get("Records", []):
             s3_info = record.get("s3", {})
             bucket = s3_info.get("bucket", {}).get("name")
@@ -159,8 +234,9 @@
                 continue
             project_id, category = key_parts[0], key_parts[1]
             filename = key_parts[-1]
-            if filename.startswith("tmp") and "." not in filename:
-                LOGGER.debug("Skipping temporary upload key: %s", key)
+            # Skip temp files: tmp* without extension, text_*.txt from Cognee processing
+            if (filename.startswith("tmp") and "." not in filename) or filename.startswith("text_"):
+                LOGGER.debug("Skipping temporary/processed file: %s", key)
                 continue
             dataset_suffix = self.category_map.get(category)
             if not dataset_suffix:
@@ -190,7 +266,13 @@ def main() -> None:
                 dispatcher.handle_record(record)
         except Exception as exc:  # pragma: no cover
             LOGGER.exception("Failed to process event: %s", exc)
-            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
+            # Don't requeue 404s (file deleted/never existed) - ack and move on
+            from botocore.exceptions import ClientError
+            if isinstance(exc, ClientError) and exc.response.get('Error', {}).get('Code') == '404':
+                LOGGER.warning("File not found (404), acking message to avoid retry loop")
+                ch.basic_ack(delivery_tag=method.delivery_tag)
+            else:
+                ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
             return

         ch.basic_ack(delivery_tag=method.delivery_tag)
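The new `parse_records` exclusions are easy to sanity-check against a synthetic event payload. A minimal sketch, assuming a configured `Dispatcher` instance named `dispatcher` whose category map covers `files`, `findings`, and `docs`:

```python
# Synthetic MinIO event exercising the new parse_records filters.
# `dispatcher` is assumed to be a configured Dispatcher instance.
def _event(key: str) -> dict:
    return {"s3": {"bucket": {"name": "projects"}, "object": {"key": key}}}

payload = {
    "Records": [
        _event("proj1/files/main.py"),        # ingested into the proj1 codebase dataset
        _event("proj1/files/tmpa1b2c3"),      # skipped: tmp* without extension
        _event("proj1/files/text_0042.txt"),  # skipped: Cognee text_* artifact
        _event("proj1/tmp/scratch.bin"),      # skipped: tmp/ is not in the category map
    ]
}

records = list(dispatcher.parse_records(payload))
assert [r.key for r in records] == ["proj1/files/main.py"]
```

Only the first record survives: the others hit the filename filter (`tmp*` with no extension, `text_*`) or fall outside the category map, so agent scratch files and Cognee artifacts are never sent back into ingestion.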