diff --git a/src/adsb/compress_adsb_to_aircraft_data.py b/src/adsb/compress_adsb_to_aircraft_data.py
index 89585e4..4fde19b 100644
--- a/src/adsb/compress_adsb_to_aircraft_data.py
+++ b/src/adsb/compress_adsb_to_aircraft_data.py
@@ -80,63 +80,51 @@ def compress_df_polars(df: pl.DataFrame, icao: str) -> pl.DataFrame:
 
 
 def compress_multi_icao_df(df: pl.DataFrame, verbose: bool = True) -> pl.DataFrame:
-    """Compress a DataFrame with multiple ICAOs to one row per ICAO per UTC day.
-
-    This is the main entry point for compressing ADS-B data.
-    Used by both daily GitHub Actions runs and historical AWS runs.
+    """Compress a DataFrame with multiple ICAOs to one row per ICAO.
 
     Args:
         df: DataFrame with columns ['time', 'icao'] + COLUMNS
         verbose: Whether to print progress
 
     Returns:
-        Compressed DataFrame with one row per ICAO per UTC day
+        Compressed DataFrame with one row per ICAO
     """
     if df.height == 0:
         return df
 
-    # Extract UTC date from time column
-    df = df.with_columns(pl.col('time').dt.date().alias('_date'))
-
-    # Sort by date, icao, and time
-    df = df.sort(['_date', 'icao', 'time'])
+    # Sort by icao and time
+    df = df.sort(['icao', 'time'])
 
     # Fill null values with empty strings for COLUMNS
     for col in COLUMNS:
         if col in df.columns:
             df = df.with_columns(pl.col(col).cast(pl.Utf8).fill_null(""))
 
-    # First pass: quick deduplication of exact duplicates per day
-    df = df.unique(subset=['_date', 'icao'] + COLUMNS, keep='first')
+    # Quick deduplication of exact duplicates
+    df = df.unique(subset=['icao'] + COLUMNS, keep='first')
 
     if verbose:
         print(f"After quick dedup: {df.height} records")
 
-    # Second pass: sophisticated compression per date and ICAO
+    # Compress per ICAO
     if verbose:
         print("Compressing per ICAO...")
 
-    # Process each date+ICAO group
-    date_icao_groups = df.partition_by(['_date', 'icao'], as_dict=True, maintain_order=True)
+    icao_groups = df.partition_by('icao', as_dict=True, maintain_order=True)
 
     compressed_dfs = []
-    for group_key, group_df in date_icao_groups.items():
-        # partition_by with as_dict=True returns tuple keys: (date, icao)
-        date_val, icao = group_key
+    for icao_key, group_df in icao_groups.items():
+        icao = icao_key[0]
         compressed = compress_df_polars(group_df, str(icao))
-        # Time is preserved from compress_df_polars (earliest time for this ICAO on this day)
         compressed_dfs.append(compressed)
 
     if compressed_dfs:
         df_compressed = pl.concat(compressed_dfs)
     else:
-        df_compressed = df.head(0)  # Empty with same schema
+        df_compressed = df.head(0)
 
     if verbose:
         print(f"After compress: {df_compressed.height} records")
 
-    # Drop the temporary _date column
-    df_compressed = df_compressed.drop('_date')
-
     # Reorder columns: time first, then icao
     cols = df_compressed.columns
     ordered_cols = ['time', 'icao'] + [c for c in cols if c not in ['time', 'icao']]
@@ -145,45 +133,22 @@ def compress_multi_icao_df(df: pl.DataFrame, verbose: bool = True) -> pl.DataFrame:
     return df_compressed
 
 
-def load_raw_adsb_for_day(day):
-    """Load raw ADS-B data for a day from parquet file."""
-    from datetime import timedelta
+def load_parquet_part(part_id: int, date: str) -> pl.DataFrame:
+    """Load a single parquet part file for a date.
+
+    Args:
+        part_id: Part ID (e.g., 1, 2, 3)
+        date: Date string in YYYY-MM-DD format
+
+    Returns:
+        DataFrame with ADS-B data
+    """
     from pathlib import Path
 
-    start_time = day.replace(hour=0, minute=0, second=0, microsecond=0)
-
-    # Check for parquet file first
-    version_date = f"v{start_time.strftime('%Y.%m.%d')}"
-    parquet_file = Path(f"data/output/parquet_output/{version_date}.parquet")
+    parquet_file = Path(f"data/output/parquet_output/part_{part_id}_{date}.parquet")
 
     if not parquet_file.exists():
-        # Try to generate parquet file by calling the download function
-        print(f" Parquet file not found: {parquet_file}")
-        print(f" Attempting to download and generate parquet for {start_time.strftime('%Y-%m-%d')}...")
-
-        from download_adsb_data_to_parquet import create_parquet_for_day
-        result_path = create_parquet_for_day(start_time, keep_folders=False)
-
-        if result_path:
-            print(f" Successfully generated parquet file: {result_path}")
-        else:
-            raise Exception("Failed to generate parquet file")
-
-    if parquet_file.exists():
-        print(f" Loading from parquet: {parquet_file}")
-        df = pl.read_parquet(
-            parquet_file,
-            columns=['time', 'icao', 'r', 't', 'dbFlags', 'ownOp', 'year', 'desc', 'aircraft_category']
-        )
-
-        # Convert to timezone-naive datetime
-        if df["time"].dtype == pl.Datetime:
-            df = df.with_columns(pl.col("time").dt.replace_time_zone(None))
-
-        return df
-    else:
-        # Return empty DataFrame if parquet file doesn't exist
-        print(f" No data available for {start_time.strftime('%Y-%m-%d')}")
+        print(f"Parquet file not found: {parquet_file}")
         return pl.DataFrame(schema={
             'time': pl.Datetime,
             'icao': pl.Utf8,
@@ -195,17 +160,29 @@ def load_raw_adsb_for_day(day):
             'desc': pl.Utf8,
             'aircraft_category': pl.Utf8
         })
+
+    print(f"Loading from parquet: {parquet_file}")
+    df = pl.read_parquet(
+        parquet_file,
+        columns=['time', 'icao', 'r', 't', 'dbFlags', 'ownOp', 'year', 'desc', 'aircraft_category']
+    )
+
+    # Convert to timezone-naive datetime
+    if df["time"].dtype == pl.Datetime:
+        df = df.with_columns(pl.col("time").dt.replace_time_zone(None))
+
+    return df
 
 
-def load_historical_for_day(day):
-    """Load and compress historical ADS-B data for a day."""
-    df = load_raw_adsb_for_day(day)
+def compress_parquet_part(part_id: int, date: str) -> pl.DataFrame:
+    """Load and compress a single parquet part file."""
+    df = load_parquet_part(part_id, date)
+
     if df.height == 0:
         return df
 
-    print(f"Loaded {df.height} raw records for {day.strftime('%Y-%m-%d')}")
+    print(f"Loaded {df.height} raw records for part {part_id}, date {date}")
 
-    # Use shared compression function
     return compress_multi_icao_df(df, verbose=True)
 
 
diff --git a/src/adsb/process_icao_chunk.py b/src/adsb/process_icao_chunk.py
index eb12558..471b957 100644
--- a/src/adsb/process_icao_chunk.py
+++ b/src/adsb/process_icao_chunk.py
@@ -21,6 +21,7 @@ import pyarrow.parquet as pq
 from src.adsb.download_adsb_data_to_parquet import (
     OUTPUT_DIR,
+    PARQUET_DIR,
     PARQUET_SCHEMA,
     COLUMNS,
     MAX_WORKERS,
@@ -76,7 +77,7 @@ def process_chunk(
 ) -> str | None:
     """Process trace files and write to a single parquet file."""
 
-    output_path = os.path.join(CHUNK_OUTPUT_DIR, f"part_{part_id}_{date_str}.parquet")
+    output_path = os.path.join(PARQUET_DIR, f"part_{part_id}_{date_str}.parquet")
 
     start_time = time.perf_counter()
     total_rows = 0
@@ -138,6 +139,10 @@ def main():
     # Process and write output
     output_path = process_chunk(all_trace_files, args.part_id, args.date)
 
+    # Compress the ADS-B parquet for this part into per-aircraft data
+    from src.adsb.compress_adsb_to_aircraft_data import compress_parquet_part
+    df_compressed = compress_parquet_part(args.part_id, args.date)
+    print(df_compressed)
 
     print(f"Output: {output_path}" if output_path else "No output generated")
 
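
Note on the icao_key[0] unpacking in compress_multi_icao_df: recent polars releases
return tuple keys from DataFrame.partition_by(..., as_dict=True) even when
partitioning on a single column, so each group key is ('<icao>',) rather than a
bare string. A minimal sketch of the behavior the new loop relies on (polars
version >= 0.20 assumed):

    import polars as pl

    df = pl.DataFrame({"icao": ["a1b2c3", "a1b2c3", "d4e5f6"],
                       "t": ["B738", "B738", "A320"]})
    groups = df.partition_by("icao", as_dict=True, maintain_order=True)
    print(list(groups.keys()))  # [('a1b2c3',), ('d4e5f6',)] -- tuples, not strings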
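For reference, a sketch of the per-part flow this change wires up (function names
are taken from the diff; the part id and date values below are hypothetical):

    from src.adsb.compress_adsb_to_aircraft_data import compress_parquet_part

    # Loads data/output/parquet_output/part_1_2024-01-15.parquet, strips the time
    # zone from the time column, and collapses the frame to one row per ICAO.
    # Returns an empty frame with the expected schema if the part file is missing.
    df = compress_parquet_part(1, "2024-01-15")
    print(f"{df.height} aircraft rows")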