Keep Cognee artifacts out of ingestion

Songbird
2025-11-14 10:58:41 +01:00
parent d33847609f
commit a05a8235ea
3 changed files with 98 additions and 10 deletions

View File

@@ -125,6 +125,7 @@ services:
condition: service_healthy
entrypoint: >
/bin/sh -c "
set -e;
echo 'Waiting for MinIO to be ready...';
sleep 2;
@@ -139,8 +140,8 @@ services:
mc mb fuzzforge/projects --ignore-existing;
echo 'Configuring project ingestion events...';
mc event remove fuzzforge/projects arn:minio:sqs::ingest || true;
mc event add fuzzforge/projects arn:minio:sqs::ingest --event put;
mc event remove fuzzforge/projects --force || true;
mc event add fuzzforge/projects arn:minio:sqs::ingest:amqp --event put -p;
echo 'Setting lifecycle policies...';
mc ilm add fuzzforge/targets --expiry-days 7;
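With the put event wired to the ingest AMQP target, MinIO publishes a bucket notification for every upload under fuzzforge/projects. A record in that payload, as later consumed by the dispatcher's parse_records, looks roughly like the sketch below (the project id and object key are illustrative):

# Illustrative MinIO/S3 bucket-notification record; only the fields the
# dispatcher reads are shown, and the project id / key are made up.
sample_payload = {
    "Records": [
        {
            "eventName": "s3:ObjectCreated:Put",
            "s3": {
                "bucket": {"name": "projects"},
                "object": {"key": "acme/files/src/app.py"},
            },
        }
    ]
}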

View File

@@ -29,6 +29,11 @@ services:
LLM_ENDPOINT: http://llm-proxy:4000
LOG_LEVEL: INFO
ENVIRONMENT: dev
COGNEE_TEMP_DIR: /tmp/cognee
# Processed text files must be persisted so downstream ingestion doesn't 404.
COGNEE_SKIP_PROCESSED_FILE_STORAGE: "false"
# Store normalized Cognee artifacts outside of ingestion prefixes
COGNEE_PROCESSED_SUBDIR: cognee_artifacts
ports:
- "18000:8000"
networks:
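For reference, a minimal sketch of how a service could read these variables; the exact keys and defaults Cognee honours are defined by its own configuration, so treat the names and fallbacks below as assumptions copied from the compose file:

import os

# Assumed mapping from the compose environment above; not Cognee's actual config code.
temp_dir = os.getenv("COGNEE_TEMP_DIR", "/tmp/cognee")
keep_processed_files = os.getenv("COGNEE_SKIP_PROCESSED_FILE_STORAGE", "false").lower() != "true"
processed_subdir = os.getenv("COGNEE_PROCESSED_SUBDIR", "cognee_artifacts")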

View File

@@ -40,7 +40,10 @@ class S3Client:
@contextmanager
def download(self, key: str, bucket: Optional[str] = None):
tmp = tempfile.NamedTemporaryFile(delete=False)
# Use /tmp for dispatcher temp files (never inside project workspace)
temp_dir = os.getenv("DISPATCHER_TEMP_DIR", "/tmp/dispatcher_tmp")
os.makedirs(temp_dir, exist_ok=True)
tmp = tempfile.NamedTemporaryFile(delete=False, dir=temp_dir)
tmp.close()
try:
target_bucket = bucket or self.default_bucket
@@ -93,13 +96,26 @@ class CogneeApiClient:
self._client.headers["Authorization"] = f"Bearer {self._token}"
return True
def add_file(self, file_path: str, dataset: str) -> None:
def add_file(
self, file_path: str, dataset: str, original_filename: Optional[str] = None
) -> bool:
with open(file_path, "rb") as fh:
files = {"data": (os.path.basename(file_path), fh)}
# Use original filename from S3 key instead of temp filename
filename = original_filename or os.path.basename(file_path)
files = {"data": (filename, fh)}
data = {"datasetName": dataset}
response = self._client.post(f"{self.base_url}/api/v1/add", data=data, files=files)
if response.status_code == 409:
LOGGER.info(
"Dataset %s already has %s (%s)",
dataset,
filename,
response.text.strip(),
)
return False
if response.status_code not in (200, 201):
raise RuntimeError(f"Add failed: {response.text}")
return True
def cognify(self, dataset: str) -> None:
payload = {"datasets": [dataset], "run_in_background": False}
@@ -129,6 +145,8 @@ class Dispatcher:
def handle_record(self, record: Record) -> None:
LOGGER.info("Processing %s -> dataset %s", record.key, record.dataset)
# Extract original filename from S3 key
original_filename = record.key.split("/")[-1]
with self.s3.download(record.key, record.bucket) as local_path:
client = CogneeApiClient(
base_url=self.cognee_url,
@@ -137,11 +155,56 @@ class Dispatcher:
)
try:
client.ensure_authenticated()
client.add_file(local_path, record.dataset)
client.cognify(record.dataset)
created = client.add_file(
local_path, record.dataset, original_filename=original_filename
)
if created:
client.cognify(record.dataset)
# Remove Cognee's temp/text artifacts so the bucket stays tidy.
self._cleanup_cognee_artifacts(record.project_id, record.category)
else:
LOGGER.info(
"Skipping cognify for %s; file already present", record.dataset
)
finally:
client.close()
def _cleanup_cognee_artifacts(self, project_id: str, category: str) -> None:
"""Remove tmp* and text_* files that Cognee creates during processing."""
try:
prefix = f"{project_id}/{category}/"
import boto3
s3_client = boto3.client(
service_name='s3',
endpoint_url=os.getenv("S3_ENDPOINT"),
aws_access_key_id=os.getenv("S3_ACCESS_KEY"),
aws_secret_access_key=os.getenv("S3_SECRET_KEY"),
region_name=os.getenv("S3_REGION", "us-east-1"),
)
response = s3_client.list_objects_v2(
Bucket=self.s3.default_bucket,
Prefix=prefix,
MaxKeys=100
)
to_delete = []
for obj in response.get('Contents', []):
key = obj['Key']
filename = key.split('/')[-1]
# Delete temp files created by Cognee
if (filename.startswith('tmp') and '.' not in filename) or filename.startswith('text_'):
to_delete.append({'Key': key})
if to_delete:
s3_client.delete_objects(
Bucket=self.s3.default_bucket,
Delete={'Objects': to_delete}
)
LOGGER.info("Cleaned up %d Cognee artifacts from %s", len(to_delete), prefix)
except Exception as e:
LOGGER.warning("Failed to cleanup Cognee artifacts: %s", e)
def _service_email(self, project_id: str) -> str:
return f"project_{project_id}@{self.email_domain}"
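The artifact test used in _cleanup_cognee_artifacts (and again in parse_records below) boils down to a single predicate; a standalone check with hypothetical filenames shows what gets deleted versus kept:

# Same filter as _cleanup_cognee_artifacts, extracted for illustration.
def is_cognee_artifact(filename: str) -> bool:
    return (filename.startswith("tmp") and "." not in filename) or filename.startswith("text_")

assert is_cognee_artifact("tmpa1b2c3")           # Python tempfile artifact -> deleted
assert is_cognee_artifact("text_9f86d081.txt")   # Cognee processed-text artifact -> deleted
assert not is_cognee_artifact("tmp_notes.md")    # has an extension -> kept
assert not is_cognee_artifact("report.pdf")      # regular upload -> kept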
@@ -149,6 +212,18 @@ class Dispatcher:
return sha256(project_id.encode()).hexdigest()[:20]
def parse_records(self, payload: Dict) -> Iterable[Record]:
"""Parse S3 event records and filter out files that shouldn't be ingested.
Ingestion scope:
- s3://projects/<project-id>/files/ → <project-id>_codebase
- s3://projects/<project-id>/findings/ → <project-id>_findings
- s3://projects/<project-id>/docs/ → <project-id>_docs
Exclusions:
- s3://projects/<project-id>/tmp/ → Not in category map (agent temp files)
- Files named tmp* without extension → Python tempfile artifacts
- Files named text_*.txt → Cognee processing artifacts
"""
for record in payload.get("Records", []):
s3_info = record.get("s3", {})
bucket = s3_info.get("bucket", {}).get("name")
@@ -159,8 +234,9 @@ class Dispatcher:
continue
project_id, category = key_parts[0], key_parts[1]
filename = key_parts[-1]
if filename.startswith("tmp") and "." not in filename:
LOGGER.debug("Skipping temporary upload key: %s", key)
# Skip temp files: tmp* without extension, text_<hash>.txt from Cognee processing
if (filename.startswith("tmp") and "." not in filename) or filename.startswith("text_"):
LOGGER.debug("Skipping temporary/processed file: %s", key)
continue
dataset_suffix = self.category_map.get(category)
if not dataset_suffix:
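Putting the docstring's scope rules together, the key-to-dataset mapping works out as follows (project id and filenames are illustrative):

# Expected outcomes per the ingestion scope above; None means the record is skipped.
examples = {
    "acme/files/src/app.py":        "acme_codebase",
    "acme/findings/scan.json":      "acme_findings",
    "acme/docs/design.md":          "acme_docs",
    "acme/tmp/scratch.bin":         None,  # tmp/ is not in the category map
    "acme/files/tmpa1b2c3":         None,  # tempfile artifact, skipped
    "acme/files/text_9f86d081.txt": None,  # Cognee processing artifact, skipped
}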
@@ -190,7 +266,13 @@ def main() -> None:
dispatcher.handle_record(record)
except Exception as exc: # pragma: no cover
LOGGER.exception("Failed to process event: %s", exc)
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
# Don't requeue 404s (file deleted/never existed) - ack and move on
from botocore.exceptions import ClientError
if isinstance(exc, ClientError) and exc.response.get('Error', {}).get('Code') == '404':
LOGGER.warning("File not found (404), acking message to avoid retry loop")
ch.basic_ack(delivery_tag=method.delivery_tag)
else:
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
return
ch.basic_ack(delivery_tag=method.delivery_tag)
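For context, the ack/nack calls above live inside a RabbitMQ consumer callback; a minimal sketch of that wiring, assuming the pika library and an illustrative queue name (the real callback adds the try/except and 404 handling shown above):

import json
import pika

# Illustrative wiring only: host and queue name are assumptions, `dispatcher`
# stands for the Dispatcher instance that main() builds from S3/Cognee settings,
# and the real callback wraps handle_record in the error handling shown in the diff.
def run_consumer(dispatcher) -> None:
    connection = pika.BlockingConnection(pika.ConnectionParameters(host="rabbitmq"))
    channel = connection.channel()

    def on_message(ch, method, properties, body):
        payload = json.loads(body)
        for record in dispatcher.parse_records(payload):
            dispatcher.handle_record(record)
        ch.basic_ack(delivery_tag=method.delivery_tag)

    channel.basic_consume(queue="ingest", on_message_callback=on_message)
    channel.start_consuming()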