diff --git a/src/adsb/combine_chunks_to_csv.py b/src/adsb/combine_chunks_to_csv.py
deleted file mode 100644
index 709c3cd..0000000
--- a/src/adsb/combine_chunks_to_csv.py
+++ /dev/null
@@ -1,265 +0,0 @@
-"""
-Combines chunk parquet files and compresses to final aircraft CSV.
-This is the reduce phase of the map-reduce pipeline.
-
-Supports both single-day (daily) and multi-day (historical) modes.
-
-Memory-efficient: processes each chunk separately, compresses, then combines.
-
-Usage:
-    # Daily mode
-    python -m src.adsb.combine_chunks_to_csv --chunks-dir data/output/adsb_chunks
-
-    # Historical mode
-    python -m src.adsb.combine_chunks_to_csv --chunks-dir data/output/adsb_chunks --start-date 2024-01-01 --end-date 2024-01-07 --skip-base
-"""
-import gc
-import gzip
-import os
-import sys
-import glob
-import argparse
-from datetime import datetime, timedelta, timezone
-
-import polars as pl
-
-from src.adsb.download_adsb_data_to_parquet import OUTPUT_DIR, get_resource_usage
-from src.adsb.compress_adsb_to_aircraft_data import compress_multi_icao_df, COLUMNS
-
-
-DEFAULT_CHUNK_DIR = os.path.join(OUTPUT_DIR, "adsb_chunks")
-FINAL_OUTPUT_DIR = "./data/openairframes"
-os.makedirs(FINAL_OUTPUT_DIR, exist_ok=True)
-
-
-def get_target_day() -> datetime:
-    """Get yesterday's date (the day we're processing)."""
-    return datetime.now(timezone.utc) - timedelta(days=1)
-
-
-def process_single_chunk(chunk_path: str, delete_after_load: bool = False) -> pl.DataFrame:
-    """Load and compress a single chunk parquet file.
-
-    Args:
-        chunk_path: Path to parquet file
-        delete_after_load: If True, delete the parquet file after loading to free disk space
-    """
-    print(f"Processing {os.path.basename(chunk_path)}... | {get_resource_usage()}")
-
-    # Load chunk - only columns we need
-    needed_columns = ['time', 'icao'] + COLUMNS
-    df = pl.read_parquet(chunk_path, columns=needed_columns)
-    print(f" Loaded {len(df)} rows")
-
-    # Delete file immediately after loading to free disk space
-    if delete_after_load:
-        try:
-            os.remove(chunk_path)
-            print(f" Deleted {chunk_path} to free disk space")
-        except Exception as e:
-            print(f" Warning: Failed to delete {chunk_path}: {e}")
-
-    # Compress to aircraft records (one per ICAO) using shared function
-    compressed = compress_multi_icao_df(df, verbose=True)
-    print(f" Compressed to {len(compressed)} aircraft records")
-
-    del df
-    gc.collect()
-
-    return compressed
-
-
-def combine_compressed_chunks(compressed_dfs: list[pl.DataFrame]) -> pl.DataFrame:
-    """Combine multiple compressed DataFrames.
-
-    Since chunks are partitioned by ICAO hash, each ICAO only appears in one chunk.
-    No deduplication needed here - just concatenate.
-    """
-    print(f"Combining {len(compressed_dfs)} compressed chunks... | {get_resource_usage()}")
-
-    # Concat all
-    combined = pl.concat(compressed_dfs)
-    print(f"Combined: {len(combined)} records")
-
-    return combined
-
-
-def download_and_merge_base_release(compressed_df: pl.DataFrame) -> tuple[pl.DataFrame, str | None]:
-    """Download base release and merge with new data.
-
-    Returns:
-        Tuple of (merged_df, earliest_date_str) where earliest_date_str is None if no base release was merged
-    """
-    from src.get_latest_release import download_latest_aircraft_adsb_csv
-
-    print("Downloading base ADS-B release...")
-    base_path = download_latest_aircraft_adsb_csv(
-        output_dir="./data/openairframes_base"
-    )
-    print(f"Download returned: {base_path}")
-
-    print(f"Loading base release from {base_path}")
-
-    # Extract start date from filename (e.g., openairframes_adsb_2025-05-01_2026-02-14.csv.gz)
-    import re
-    filename = os.path.basename(str(base_path))
-    match = re.search(r'openairframes_adsb_(\d{4}-\d{2}-\d{2})_', filename)
-    earliest_date = match.group(1) if match else None
-    print(f"Start date from base filename: {earliest_date}")
-
-    # Read CSV with schema matching the new data
-    base_df = pl.read_csv(base_path, schema=compressed_df.schema)
-    print(f"Base release has {len(base_df)} records")
-
-    # Ensure columns match
-    base_cols = set(base_df.columns)
-    new_cols = set(compressed_df.columns)
-    print(f"Base columns: {sorted(base_cols)}")
-    print(f"New columns: {sorted(new_cols)}")
-
-    # Add missing columns
-    for col in new_cols - base_cols:
-        base_df = base_df.with_columns(pl.lit(None).alias(col))
-    for col in base_cols - new_cols:
-        compressed_df = compressed_df.with_columns(pl.lit(None).alias(col))
-
-    # Reorder columns to match
-    compressed_df = compressed_df.select(base_df.columns)
-
-    # Concat base (old days) with new data (new days)
-    # No deduplication needed since they represent different UTC days
-    combined = pl.concat([base_df, compressed_df])
-    print(f"After concat: {len(combined)} records")
-
-    # Sort by time for consistent output
-    combined = combined.sort('time')
-
-    return combined, earliest_date
-
-def cleanup_chunks(output_id: str, chunks_dir: str):
-    """Delete chunk parquet files after successful merge."""
-    pattern = os.path.join(chunks_dir, f"chunk_*_{output_id}.parquet")
-    chunk_files = glob.glob(pattern)
-    for f in chunk_files:
-        try:
-            os.remove(f)
-            print(f"Deleted {f}")
-        except Exception as e:
-            print(f"Failed to delete {f}: {e}")
-
-
-def find_chunk_files(chunks_dir: str, output_id: str) -> list[str]:
-    """Find chunk parquet files matching the output ID."""
-    pattern = os.path.join(chunks_dir, f"chunk_*_{output_id}.parquet")
-    chunk_files = sorted(glob.glob(pattern))
-
-    if not chunk_files:
-        # Try recursive search for historical mode with merged artifacts
-        pattern = os.path.join(chunks_dir, "**", "*.parquet")
-        chunk_files = sorted(glob.glob(pattern, recursive=True))
-
-    return chunk_files
-
-
-def main():
-    parser = argparse.ArgumentParser(description="Combine chunk parquets to final CSV")
-    parser.add_argument("--date", type=str, help="Single date in YYYY-MM-DD format (default: yesterday)")
-    parser.add_argument("--start-date", type=str, help="Start date for range (YYYY-MM-DD)")
-    parser.add_argument("--end-date", type=str, help="End date for range (YYYY-MM-DD)")
-    parser.add_argument("--chunks-dir", type=str, default=DEFAULT_CHUNK_DIR, help="Directory containing chunk parquet files")
-    parser.add_argument("--skip-base", action="store_true", help="Skip downloading and merging base release")
-    parser.add_argument("--keep-chunks", action="store_true", help="Keep chunk files after merging")
-    parser.add_argument("--stream", action="store_true", help="Delete parquet files immediately after loading to save disk space")
-    args = parser.parse_args()
-
-    # Determine output ID and filename based on mode
-    if args.start_date and args.end_date:
-        # Historical mode
-        output_id = f"{args.start_date}_{args.end_date}"
-        output_filename = f"openairframes_adsb_{args.start_date}_{args.end_date}.csv.gz"
-        print(f"Combining chunks for date range: {args.start_date} to {args.end_date}")
-    else:
-        # Daily mode - use same date for start and end
-        if args.date:
-            target_day = datetime.strptime(args.date, "%Y-%m-%d")
-        else:
-            target_day = get_target_day()
-
-        date_str = target_day.strftime("%Y-%m-%d")
-        output_id = date_str
-        output_filename = f"openairframes_adsb_{date_str}_{date_str}.csv.gz"
-        print(f"Combining chunks for {date_str}")
-
-    chunks_dir = args.chunks_dir
-    print(f"Chunks directory: {chunks_dir}")
-    print(f"Resource usage at start: {get_resource_usage()}")
-
-    # Find chunk files
-    chunk_files = find_chunk_files(chunks_dir, output_id)
-
-    if not chunk_files:
-        print(f"No chunk files found in: {chunks_dir}")
-        sys.exit(1)
-
-    print(f"Found {len(chunk_files)} chunk files")
-
-    # Process each chunk separately to save memory
-    # With --stream, delete parquet files immediately after loading to save disk space
-    compressed_chunks = []
-    for chunk_path in chunk_files:
-        compressed = process_single_chunk(chunk_path, delete_after_load=args.stream)
-        compressed_chunks.append(compressed)
-        gc.collect()
-
-    # Combine all compressed chunks
-    combined = combine_compressed_chunks(compressed_chunks)
-
-    # Free memory from individual chunks
-    del compressed_chunks
-    gc.collect()
-    print(f"After combining: {get_resource_usage()}")
-
-    # Merge with base release (unless skipped)
-    base_start_date = None
-    if not args.skip_base:
-        combined, base_start_date = download_and_merge_base_release(combined)
-
-    # Update filename if we merged with base release and got a start date
-    if base_start_date and not (args.start_date and args.end_date):
-        # Only update filename for daily mode when base was merged
-        output_filename = f"openairframes_adsb_{base_start_date}_{date_str}.csv.gz"
-        print(f"Updated filename to reflect date range: {output_filename}")
-
-    # Convert list columns to strings for CSV compatibility
-    for col in combined.columns:
-        if combined[col].dtype == pl.List:
-            combined = combined.with_columns(
-                pl.col(col).list.join(",").alias(col)
-            )
-
-    # Sort by time for consistent output
-    if 'time' in combined.columns:
-        combined = combined.sort('time')
-
-    # Replace empty strings with null across all string columns to avoid quoted empty strings
-    for col in combined.columns:
-        if combined[col].dtype == pl.Utf8:
-            combined = combined.with_columns(
-                pl.when(pl.col(col) == "").then(None).otherwise(pl.col(col)).alias(col)
-            )
-
-    # Write final CSV
-    output_path = os.path.join(FINAL_OUTPUT_DIR, output_filename)
-    with gzip.open(output_path, "wb", compresslevel=9) as f:
-        combined.write_csv(f, null_value='', quote_style='necessary')
-    print(f"Wrote {len(combined)} records to {output_path}")
-
-    # Cleanup
-    if not args.keep_chunks:
-        cleanup_chunks(output_id, chunks_dir)
-
-    print(f"Done! | {get_resource_usage()}")
-
-
-if __name__ == "__main__":
-    main()
diff --git a/src/adsb/requirements.reducer.txt b/src/adsb/requirements.reducer.txt
deleted file mode 100644
index 29e6bf9..0000000
--- a/src/adsb/requirements.reducer.txt
+++ /dev/null
@@ -1,2 +0,0 @@
-polars>=1.0
-boto3>=1.34
diff --git a/src/adsb/requirements.worker.txt b/src/adsb/requirements.worker.txt
deleted file mode 100644
index cf305a7..0000000
--- a/src/adsb/requirements.worker.txt
+++ /dev/null
@@ -1,5 +0,0 @@
-polars>=1.0
-pyarrow>=14.0
-orjson>=3.9
-boto3>=1.34
-zstandard>=0.22