From f17adc4574f32dc3aeebbbcaf9ba4f2c9f45ed69 Mon Sep 17 00:00:00 2001
From: ggman12
Date: Sat, 14 Feb 2026 22:21:14 -0500
Subject: [PATCH] remove aws worker, reducer

---
 src/adsb/Dockerfile.reducer | 11 -----
 src/adsb/Dockerfile.worker  | 12 -----
 src/adsb/reducer.py         | 97 -------------------------------------
 src/adsb/worker.py          | 89 ----------------------------------
 4 files changed, 209 deletions(-)
 delete mode 100644 src/adsb/Dockerfile.reducer
 delete mode 100644 src/adsb/Dockerfile.worker
 delete mode 100644 src/adsb/reducer.py
 delete mode 100644 src/adsb/worker.py

diff --git a/src/adsb/Dockerfile.reducer b/src/adsb/Dockerfile.reducer
deleted file mode 100644
index b375f46..0000000
--- a/src/adsb/Dockerfile.reducer
+++ /dev/null
@@ -1,11 +0,0 @@
-FROM --platform=linux/arm64 python:3.12-slim
-
-WORKDIR /app
-
-COPY requirements.reducer.txt requirements.txt
-RUN pip install --no-cache-dir -r requirements.txt
-
-COPY compress_adsb_to_aircraft_data.py .
-COPY reducer.py .
-
-CMD ["python", "-u", "reducer.py"]
diff --git a/src/adsb/Dockerfile.worker b/src/adsb/Dockerfile.worker
deleted file mode 100644
index dc4336d..0000000
--- a/src/adsb/Dockerfile.worker
+++ /dev/null
@@ -1,12 +0,0 @@
-FROM --platform=linux/arm64 python:3.12-slim
-
-WORKDIR /app
-
-COPY requirements.worker.txt requirements.txt
-RUN pip install --no-cache-dir -r requirements.txt
-
-COPY compress_adsb_to_aircraft_data.py .
-COPY download_adsb_data_to_parquet.py .
-COPY worker.py .
-
-CMD ["python", "-u", "worker.py"]
diff --git a/src/adsb/reducer.py b/src/adsb/reducer.py
deleted file mode 100644
index 9dcdb91..0000000
--- a/src/adsb/reducer.py
+++ /dev/null
@@ -1,97 +0,0 @@
-"""
-Reduce step: downloads all chunk CSVs from S3, combines them,
-deduplicates across the full dataset, and uploads the final result.
-
-Environment variables:
-    S3_BUCKET — bucket with intermediate results
-    RUN_ID — run identifier matching the map workers
-    GLOBAL_START_DATE — overall start date for output filename
-    GLOBAL_END_DATE — overall end date for output filename
-"""
-import gzip
-import os
-import shutil
-from pathlib import Path
-
-import boto3
-import polars as pl
-
-from compress_adsb_to_aircraft_data import COLUMNS, deduplicate_by_signature
-
-
-def main():
-    s3_bucket = os.environ["S3_BUCKET"]
-    run_id = os.environ.get("RUN_ID", "default")
-    global_start = os.environ["GLOBAL_START_DATE"]
-    global_end = os.environ["GLOBAL_END_DATE"]
-
-    s3 = boto3.client("s3")
-    prefix = f"intermediate/{run_id}/"
-
-    # List all chunk files for this run
-    paginator = s3.get_paginator("list_objects_v2")
-    chunk_keys = []
-    for page in paginator.paginate(Bucket=s3_bucket, Prefix=prefix):
-        for obj in page.get("Contents", []):
-            if obj["Key"].endswith(".csv.gz"):
-                chunk_keys.append(obj["Key"])
-
-    chunk_keys.sort()
-    print(f"Found {len(chunk_keys)} chunks to combine")
-
-    if not chunk_keys:
-        print("No chunks found — nothing to reduce.")
-        return
-
-    # Download and concatenate all chunks
-    download_dir = Path("/tmp/chunks")
-    download_dir.mkdir(parents=True, exist_ok=True)
-
-    dfs = []
-
-    for key in chunk_keys:
-        gz_path = download_dir / Path(key).name
-        csv_path = gz_path.with_suffix("")  # Remove .gz
-        print(f"Downloading {key}...")
-        s3.download_file(s3_bucket, key, str(gz_path))
-
-        # Decompress
-        with gzip.open(gz_path, 'rb') as f_in:
-            with open(csv_path, 'wb') as f_out:
-                shutil.copyfileobj(f_in, f_out)
-        gz_path.unlink()
-
-        df_chunk = pl.read_csv(csv_path)
-        print(f" Loaded {df_chunk.height} rows from {csv_path.name}")
-        dfs.append(df_chunk)
-
-        # Free disk space after loading
-        csv_path.unlink()
-
-    df_accumulated = pl.concat(dfs) if dfs else pl.DataFrame()
-    print(f"Combined: {df_accumulated.height} rows before dedup")
-
-    # Final global deduplication
-    df_accumulated = deduplicate_by_signature(df_accumulated)
-    print(f"After dedup: {df_accumulated.height} rows")
-
-    # Write and upload final result
-    output_name = f"openairframes_adsb_{global_start}_{global_end}.csv.gz"
-    csv_output = Path(f"/tmp/openairframes_adsb_{global_start}_{global_end}.csv")
-    gz_output = Path(f"/tmp/{output_name}")
-
-    df_accumulated.write_csv(csv_output)
-    with open(csv_output, 'rb') as f_in:
-        with gzip.open(gz_output, 'wb') as f_out:
-            shutil.copyfileobj(f_in, f_out)
-    csv_output.unlink()
-
-    final_key = f"final/{output_name}"
-    print(f"Uploading to s3://{s3_bucket}/{final_key}")
-    s3.upload_file(str(gz_output), s3_bucket, final_key)
-
-    print(f"Final output: {df_accumulated.height} records -> {final_key}")
-
-
-if __name__ == "__main__":
-    main()
diff --git a/src/adsb/worker.py b/src/adsb/worker.py
deleted file mode 100644
index 9884ce7..0000000
--- a/src/adsb/worker.py
+++ /dev/null
@@ -1,89 +0,0 @@
-"""
-Map worker: processes a date range chunk, uploads result to S3.
-
-Environment variables:
-    START_DATE — inclusive, YYYY-MM-DD
-    END_DATE — exclusive, YYYY-MM-DD
-    S3_BUCKET — bucket for intermediate results
-    RUN_ID — unique run identifier for namespacing S3 keys
-"""
-import os
-import sys
-from datetime import datetime, timedelta
-from pathlib import Path
-
-import boto3
-import polars as pl
-
-from compress_adsb_to_aircraft_data import (
-    load_historical_for_day,
-    deduplicate_by_signature,
-    COLUMNS,
-)
-
-
-def main():
-    start_date_str = os.environ["START_DATE"]
-    end_date_str = os.environ["END_DATE"]
-    s3_bucket = os.environ["S3_BUCKET"]
-    run_id = os.environ.get("RUN_ID", "default")
-
-    start_date = datetime.strptime(start_date_str, "%Y-%m-%d")
-    end_date = datetime.strptime(end_date_str, "%Y-%m-%d")
-
-    total_days = (end_date - start_date).days
-    print(f"Worker: processing {total_days} days [{start_date_str}, {end_date_str})")
-
-    dfs = []
-    current_date = start_date
-
-    while current_date < end_date:
-        day_str = current_date.strftime("%Y-%m-%d")
-        print(f" Loading {day_str}...")
-
-        df_compressed = load_historical_for_day(current_date)
-        if df_compressed.height == 0:
-            raise RuntimeError(f"No data found for {day_str}")
-
-        dfs.append(df_compressed)
-        total_rows = sum(df.height for df in dfs)
-        print(f" +{df_compressed.height} rows (total: {total_rows})")
-
-        # Delete local cache after each day to save disk in container
-        cache_dir = Path("data/adsb")
-        if cache_dir.exists():
-            import shutil
-            shutil.rmtree(cache_dir)
-
-        current_date += timedelta(days=1)
-
-    # Concatenate all days
-    df_accumulated = pl.concat(dfs) if dfs else pl.DataFrame()
-
-    # Deduplicate within this chunk
-    df_accumulated = deduplicate_by_signature(df_accumulated)
-    print(f"After dedup: {df_accumulated.height} rows")
-
-    # Write to local file then upload to S3
-    local_path = Path(f"/tmp/chunk_{start_date_str}_{end_date_str}.csv")
-    df_accumulated.write_csv(local_path)
-
-    # Compress with gzip
-    import gzip
-    import shutil
-    gz_path = Path(f"/tmp/chunk_{start_date_str}_{end_date_str}.csv.gz")
-    with open(local_path, 'rb') as f_in:
-        with gzip.open(gz_path, 'wb') as f_out:
-            shutil.copyfileobj(f_in, f_out)
-    local_path.unlink()  # Remove uncompressed file
-
-    s3_key = f"intermediate/{run_id}/chunk_{start_date_str}_{end_date_str}.csv.gz"
-    print(f"Uploading to s3://{s3_bucket}/{s3_key}")
-
-    s3 = boto3.client("s3")
-    s3.upload_file(str(gz_path), s3_bucket, s3_key)
-    print("Done.")
-
-
-if __name__ == "__main__":
-    main()