From be33fd2eafe7959f91944df706a56137c220d46c Mon Sep 17 00:00:00 2001 From: ggman12 Date: Sun, 15 Feb 2026 19:59:50 -0500 Subject: [PATCH] compress by day --- src/adsb/compress_adsb_to_aircraft_data.py | 52 +++++++++------------- 1 file changed, 21 insertions(+), 31 deletions(-) diff --git a/src/adsb/compress_adsb_to_aircraft_data.py b/src/adsb/compress_adsb_to_aircraft_data.py index 42cc0f4..6252925 100644 --- a/src/adsb/compress_adsb_to_aircraft_data.py +++ b/src/adsb/compress_adsb_to_aircraft_data.py @@ -5,23 +5,6 @@ import polars as pl COLUMNS = ['dbFlags', 'ownOp', 'year', 'desc', 'aircraft_category', 'r', 't'] -def deduplicate_by_signature(df: pl.DataFrame) -> pl.DataFrame: - """For each icao, keep only the earliest row with each unique signature. - - This is used for deduplicating across multiple compressed chunks. - """ - # Create signature column - df = df.with_columns( - pl.concat_str([pl.col(c).cast(pl.Utf8).fill_null("") for c in COLUMNS], separator="|").alias("_signature") - ) - # Group by icao and signature, take first row (earliest due to time sort) - df = df.sort("time") - df_deduped = df.group_by(["icao", "_signature"]).first() - df_deduped = df_deduped.drop("_signature") - df_deduped = df_deduped.sort("time") - return df_deduped - - def compress_df_polars(df: pl.DataFrame, icao: str) -> pl.DataFrame: """Compress a single ICAO group to its most informative row using Polars.""" # Create signature string @@ -97,7 +80,7 @@ def compress_df_polars(df: pl.DataFrame, icao: str) -> pl.DataFrame: def compress_multi_icao_df(df: pl.DataFrame, verbose: bool = True) -> pl.DataFrame: - """Compress a DataFrame with multiple ICAOs to one row per ICAO. + """Compress a DataFrame with multiple ICAOs to one row per ICAO per UTC day. This is the main entry point for compressing ADS-B data. Used by both daily GitHub Actions runs and historical AWS runs. @@ -107,35 +90,38 @@ def compress_multi_icao_df(df: pl.DataFrame, verbose: bool = True) -> pl.DataFra verbose: Whether to print progress Returns: - Compressed DataFrame with one row per ICAO + Compressed DataFrame with one row per ICAO per UTC day """ if df.height == 0: return df - # Sort by icao and time - df = df.sort(['icao', 'time']) + # Extract UTC date from time column + df = df.with_columns(pl.col('time').dt.date().alias('_date')) + + # Sort by date, icao, and time + df = df.sort(['_date', 'icao', 'time']) # Fill null values with empty strings for COLUMNS for col in COLUMNS: if col in df.columns: df = df.with_columns(pl.col(col).cast(pl.Utf8).fill_null("")) - # First pass: quick deduplication of exact duplicates - df = df.unique(subset=['icao'] + COLUMNS, keep='first') + # First pass: quick deduplication of exact duplicates per day + df = df.unique(subset=['_date', 'icao'] + COLUMNS, keep='first') if verbose: print(f"After quick dedup: {df.height} records") - # Second pass: sophisticated compression per ICAO + # Second pass: sophisticated compression per date and ICAO if verbose: print("Compressing per ICAO...") - # Process each ICAO group - icao_groups = df.partition_by('icao', as_dict=True, maintain_order=True) + # Process each date+ICAO group + date_icao_groups = df.partition_by(['_date', 'icao'], as_dict=True, maintain_order=True) compressed_dfs = [] - for icao_key, group_df in icao_groups.items(): - # partition_by with as_dict=True returns tuple keys, extract first element - icao = icao_key[0] if isinstance(icao_key, tuple) else icao_key + for group_key, group_df in date_icao_groups.items(): + # partition_by with as_dict=True returns tuple keys: (date, icao) + date_val, icao = group_key compressed = compress_df_polars(group_df, str(icao)) compressed_dfs.append(compressed) @@ -147,6 +133,9 @@ def compress_multi_icao_df(df: pl.DataFrame, verbose: bool = True) -> pl.DataFra if verbose: print(f"After compress: {df_compressed.height} records") + # Drop the temporary _date column + df_compressed = df_compressed.drop('_date') + # Reorder columns: time first, then icao cols = df_compressed.columns ordered_cols = ['time', 'icao'] + [c for c in cols if c not in ['time', 'icao']] @@ -236,8 +225,9 @@ def concat_compressed_dfs(df_base, df_new): icao_groups = df_combined.partition_by('icao', as_dict=True, maintain_order=True) compressed_dfs = [] - for icao, group_df in icao_groups.items(): - compressed = compress_df_polars(group_df, icao) + for icao_key, group_df in icao_groups.items(): + icao = icao_key[0] + compressed = compress_df_polars(group_df, str(icao)) compressed_dfs.append(compressed) if compressed_dfs: