mirror of
https://github.com/PlaneQuery/OpenAirframes.git
synced 2026-04-23 19:46:09 +02:00
compress by day
This commit is contained in:
@@ -5,23 +5,6 @@ import polars as pl
|
||||
# Attribute columns that are joined into the per-row signature string and,
# together with 'icao', used as the subset for duplicate detection.
COLUMNS = ['dbFlags', 'ownOp', 'year', 'desc', 'aircraft_category', 'r', 't']
|
||||
|
||||
|
||||
def deduplicate_by_signature(df: pl.DataFrame) -> pl.DataFrame:
    """Keep only the earliest row for every (icao, signature) pair.

    The signature is the "|"-joined string form of each column listed in
    COLUMNS, with nulls rendered as empty strings. This is used for
    deduplicating across multiple compressed chunks.

    Args:
        df: Frame containing "icao", "time" and every column in COLUMNS.

    Returns:
        The deduplicated frame, sorted by "time", without the temporary
        "_signature" column.
    """
    signature_parts = [
        pl.col(name).cast(pl.Utf8).fill_null("") for name in COLUMNS
    ]
    signature = pl.concat_str(signature_parts, separator="|").alias("_signature")

    # Sorting by time before grouping makes first() pick the earliest row
    # of each (icao, signature) group.
    earliest = (
        df.with_columns(signature)
        .sort("time")
        .group_by(["icao", "_signature"])
        .first()
    )
    return earliest.drop("_signature").sort("time")
|
||||
|
||||
|
||||
def compress_df_polars(df: pl.DataFrame, icao: str) -> pl.DataFrame:
|
||||
"""Compress a single ICAO group to its most informative row using Polars."""
|
||||
# Create signature string
|
||||
@@ -97,7 +80,7 @@ def compress_df_polars(df: pl.DataFrame, icao: str) -> pl.DataFrame:
|
||||
|
||||
|
||||
def compress_multi_icao_df(df: pl.DataFrame, verbose: bool = True) -> pl.DataFrame:
|
||||
"""Compress a DataFrame with multiple ICAOs to one row per ICAO.
|
||||
"""Compress a DataFrame with multiple ICAOs to one row per ICAO per UTC day.
|
||||
|
||||
This is the main entry point for compressing ADS-B data.
|
||||
Used by both daily GitHub Actions runs and historical AWS runs.
|
||||
@@ -107,35 +90,38 @@ def compress_multi_icao_df(df: pl.DataFrame, verbose: bool = True) -> pl.DataFra
|
||||
verbose: Whether to print progress
|
||||
|
||||
Returns:
|
||||
Compressed DataFrame with one row per ICAO
|
||||
Compressed DataFrame with one row per ICAO per UTC day
|
||||
"""
|
||||
if df.height == 0:
|
||||
return df
|
||||
|
||||
# Sort by icao and time
|
||||
df = df.sort(['icao', 'time'])
|
||||
# Extract UTC date from time column
|
||||
df = df.with_columns(pl.col('time').dt.date().alias('_date'))
|
||||
|
||||
# Sort by date, icao, and time
|
||||
df = df.sort(['_date', 'icao', 'time'])
|
||||
|
||||
# Fill null values with empty strings for COLUMNS
|
||||
for col in COLUMNS:
|
||||
if col in df.columns:
|
||||
df = df.with_columns(pl.col(col).cast(pl.Utf8).fill_null(""))
|
||||
|
||||
# First pass: quick deduplication of exact duplicates
|
||||
df = df.unique(subset=['icao'] + COLUMNS, keep='first')
|
||||
# First pass: quick deduplication of exact duplicates per day
|
||||
df = df.unique(subset=['_date', 'icao'] + COLUMNS, keep='first')
|
||||
if verbose:
|
||||
print(f"After quick dedup: {df.height} records")
|
||||
|
||||
# Second pass: sophisticated compression per ICAO
|
||||
# Second pass: sophisticated compression per date and ICAO
|
||||
if verbose:
|
||||
print("Compressing per ICAO...")
|
||||
|
||||
# Process each ICAO group
|
||||
icao_groups = df.partition_by('icao', as_dict=True, maintain_order=True)
|
||||
# Process each date+ICAO group
|
||||
date_icao_groups = df.partition_by(['_date', 'icao'], as_dict=True, maintain_order=True)
|
||||
compressed_dfs = []
|
||||
|
||||
for icao_key, group_df in icao_groups.items():
|
||||
# partition_by with as_dict=True returns tuple keys, extract first element
|
||||
icao = icao_key[0] if isinstance(icao_key, tuple) else icao_key
|
||||
for group_key, group_df in date_icao_groups.items():
|
||||
# partition_by with as_dict=True returns tuple keys: (date, icao)
|
||||
date_val, icao = group_key
|
||||
compressed = compress_df_polars(group_df, str(icao))
|
||||
compressed_dfs.append(compressed)
|
||||
|
||||
@@ -147,6 +133,9 @@ def compress_multi_icao_df(df: pl.DataFrame, verbose: bool = True) -> pl.DataFra
|
||||
if verbose:
|
||||
print(f"After compress: {df_compressed.height} records")
|
||||
|
||||
# Drop the temporary _date column
|
||||
df_compressed = df_compressed.drop('_date')
|
||||
|
||||
# Reorder columns: time first, then icao
|
||||
cols = df_compressed.columns
|
||||
ordered_cols = ['time', 'icao'] + [c for c in cols if c not in ['time', 'icao']]
|
||||
@@ -236,8 +225,9 @@ def concat_compressed_dfs(df_base, df_new):
|
||||
icao_groups = df_combined.partition_by('icao', as_dict=True, maintain_order=True)
|
||||
compressed_dfs = []
|
||||
|
||||
for icao, group_df in icao_groups.items():
|
||||
compressed = compress_df_polars(group_df, icao)
|
||||
for icao_key, group_df in icao_groups.items():
|
||||
icao = icao_key[0]
|
||||
compressed = compress_df_polars(group_df, str(icao))
|
||||
compressed_dfs.append(compressed)
|
||||
|
||||
if compressed_dfs:
|
||||
|
||||
Reference in New Issue
Block a user