From 3f38263a0cee20d4bfef56a197dc33e01d405f39 Mon Sep 17 00:00:00 2001 From: ggman12 Date: Sun, 15 Feb 2026 17:55:16 -0500 Subject: [PATCH] stop dedupe that destroys previous days --- src/adsb/combine_chunks_to_csv.py | 92 +++++++++++++------------------ 1 file changed, 37 insertions(+), 55 deletions(-) diff --git a/src/adsb/combine_chunks_to_csv.py b/src/adsb/combine_chunks_to_csv.py index 8a7e693..046becb 100644 --- a/src/adsb/combine_chunks_to_csv.py +++ b/src/adsb/combine_chunks_to_csv.py @@ -93,62 +93,44 @@ def download_and_merge_base_release(compressed_df: pl.DataFrame) -> tuple[pl.Dat from src.get_latest_release import download_latest_aircraft_adsb_csv print("Downloading base ADS-B release...") - try: - base_path = download_latest_aircraft_adsb_csv( - output_dir="./data/openairframes_base" - ) - print(f"Download returned: {base_path}") - - if base_path and os.path.exists(str(base_path)): - print(f"Loading base release from {base_path}") - - # Extract start date from filename (e.g., openairframes_adsb_2025-05-01_2026-02-14.csv.gz) - import re - filename = os.path.basename(str(base_path)) - match = re.search(r'openairframes_adsb_(\d{4}-\d{2}-\d{2})_', filename) - earliest_date = match.group(1) if match else None - print(f"Start date from base filename: {earliest_date}") - - # Read CSV with schema matching the new data - base_df = pl.read_csv(base_path, schema=compressed_df.schema) - print(f"Base release has {len(base_df)} records") - - # Ensure columns match - base_cols = set(base_df.columns) - new_cols = set(compressed_df.columns) - print(f"Base columns: {sorted(base_cols)}") - print(f"New columns: {sorted(new_cols)}") - - # Add missing columns - for col in new_cols - base_cols: - base_df = base_df.with_columns(pl.lit(None).alias(col)) - for col in base_cols - new_cols: - compressed_df = compressed_df.with_columns(pl.lit(None).alias(col)) - - # Reorder columns to match - compressed_df = compressed_df.select(base_df.columns) - - # Concat and deduplicate by icao 
(keep new data - it comes last) - combined = pl.concat([base_df, compressed_df]) - print(f"After concat: {len(combined)} records") - - deduplicated = combined.unique(subset=["icao"], keep="last") - - print(f"Combined with base: {len(combined)} -> {len(deduplicated)} after dedup") - - del base_df, combined - gc.collect() - - return deduplicated, earliest_date - else: - print(f"No base release found at {base_path}, using only new data") - return compressed_df, None - except Exception as e: - import traceback - print(f"Failed to download base release: {e}") - traceback.print_exc() - return compressed_df, None + base_path = download_latest_aircraft_adsb_csv( + output_dir="./data/openairframes_base" + ) + print(f"Download returned: {base_path}") + + print(f"Loading base release from {base_path}") + + # Extract start date from filename (e.g., openairframes_adsb_2025-05-01_2026-02-14.csv.gz) + import re + filename = os.path.basename(str(base_path)) + match = re.search(r'openairframes_adsb_(\d{4}-\d{2}-\d{2})_', filename) + earliest_date = match.group(1) if match else None + print(f"Start date from base filename: {earliest_date}") + + # Read CSV with schema matching the new data + base_df = pl.read_csv(base_path, schema=compressed_df.schema) + print(f"Base release has {len(base_df)} records") + + # Ensure columns match + base_cols = set(base_df.columns) + new_cols = set(compressed_df.columns) + print(f"Base columns: {sorted(base_cols)}") + print(f"New columns: {sorted(new_cols)}") + + # Add missing columns + for col in new_cols - base_cols: + base_df = base_df.with_columns(pl.lit(None).alias(col)) + for col in base_cols - new_cols: + compressed_df = compressed_df.with_columns(pl.lit(None).alias(col)) + + # Reorder columns to match + compressed_df = compressed_df.select(base_df.columns) + + # Concat and deduplicate by icao (keep new data - it comes last) + combined = pl.concat([base_df, compressed_df]) + print(f"After concat: {len(combined)} records") + return combined, 
earliest_date def cleanup_chunks(output_id: str, chunks_dir: str): """Delete chunk parquet files after successful merge."""