mirror of
https://github.com/PlaneQuery/OpenAirframes.git
synced 2026-06-08 06:03:55 +02:00
works
This commit is contained in:
@@ -80,63 +80,51 @@ def compress_df_polars(df: pl.DataFrame, icao: str) -> pl.DataFrame:
|
|||||||
|
|
||||||
|
|
||||||
def compress_multi_icao_df(df: pl.DataFrame, verbose: bool = True) -> pl.DataFrame:
|
def compress_multi_icao_df(df: pl.DataFrame, verbose: bool = True) -> pl.DataFrame:
|
||||||
"""Compress a DataFrame with multiple ICAOs to one row per ICAO per UTC day.
|
"""Compress a DataFrame with multiple ICAOs to one row per ICAO.
|
||||||
|
|
||||||
This is the main entry point for compressing ADS-B data.
|
|
||||||
Used by both daily GitHub Actions runs and historical AWS runs.
|
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
df: DataFrame with columns ['time', 'icao'] + COLUMNS
|
df: DataFrame with columns ['time', 'icao'] + COLUMNS
|
||||||
verbose: Whether to print progress
|
verbose: Whether to print progress
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Compressed DataFrame with one row per ICAO per UTC day
|
Compressed DataFrame with one row per ICAO
|
||||||
"""
|
"""
|
||||||
if df.height == 0:
|
if df.height == 0:
|
||||||
return df
|
return df
|
||||||
|
|
||||||
# Extract UTC date from time column
|
# Sort by icao and time
|
||||||
df = df.with_columns(pl.col('time').dt.date().alias('_date'))
|
df = df.sort(['icao', 'time'])
|
||||||
|
|
||||||
# Sort by date, icao, and time
|
|
||||||
df = df.sort(['_date', 'icao', 'time'])
|
|
||||||
|
|
||||||
# Fill null values with empty strings for COLUMNS
|
# Fill null values with empty strings for COLUMNS
|
||||||
for col in COLUMNS:
|
for col in COLUMNS:
|
||||||
if col in df.columns:
|
if col in df.columns:
|
||||||
df = df.with_columns(pl.col(col).cast(pl.Utf8).fill_null(""))
|
df = df.with_columns(pl.col(col).cast(pl.Utf8).fill_null(""))
|
||||||
|
|
||||||
# First pass: quick deduplication of exact duplicates per day
|
# Quick deduplication of exact duplicates
|
||||||
df = df.unique(subset=['_date', 'icao'] + COLUMNS, keep='first')
|
df = df.unique(subset=['icao'] + COLUMNS, keep='first')
|
||||||
if verbose:
|
if verbose:
|
||||||
print(f"After quick dedup: {df.height} records")
|
print(f"After quick dedup: {df.height} records")
|
||||||
|
|
||||||
# Second pass: sophisticated compression per date and ICAO
|
# Compress per ICAO
|
||||||
if verbose:
|
if verbose:
|
||||||
print("Compressing per ICAO...")
|
print("Compressing per ICAO...")
|
||||||
|
|
||||||
# Process each date+ICAO group
|
icao_groups = df.partition_by('icao', as_dict=True, maintain_order=True)
|
||||||
date_icao_groups = df.partition_by(['_date', 'icao'], as_dict=True, maintain_order=True)
|
|
||||||
compressed_dfs = []
|
compressed_dfs = []
|
||||||
|
|
||||||
for group_key, group_df in date_icao_groups.items():
|
for icao_key, group_df in icao_groups.items():
|
||||||
# partition_by with as_dict=True returns tuple keys: (date, icao)
|
icao = icao_key[0]
|
||||||
date_val, icao = group_key
|
|
||||||
compressed = compress_df_polars(group_df, str(icao))
|
compressed = compress_df_polars(group_df, str(icao))
|
||||||
# Time is preserved from compress_df_polars (earliest time for this ICAO on this day)
|
|
||||||
compressed_dfs.append(compressed)
|
compressed_dfs.append(compressed)
|
||||||
|
|
||||||
if compressed_dfs:
|
if compressed_dfs:
|
||||||
df_compressed = pl.concat(compressed_dfs)
|
df_compressed = pl.concat(compressed_dfs)
|
||||||
else:
|
else:
|
||||||
df_compressed = df.head(0) # Empty with same schema
|
df_compressed = df.head(0)
|
||||||
|
|
||||||
if verbose:
|
if verbose:
|
||||||
print(f"After compress: {df_compressed.height} records")
|
print(f"After compress: {df_compressed.height} records")
|
||||||
|
|
||||||
# Drop the temporary _date column
|
|
||||||
df_compressed = df_compressed.drop('_date')
|
|
||||||
|
|
||||||
# Reorder columns: time first, then icao
|
# Reorder columns: time first, then icao
|
||||||
cols = df_compressed.columns
|
cols = df_compressed.columns
|
||||||
ordered_cols = ['time', 'icao'] + [c for c in cols if c not in ['time', 'icao']]
|
ordered_cols = ['time', 'icao'] + [c for c in cols if c not in ['time', 'icao']]
|
||||||
@@ -145,45 +133,22 @@ def compress_multi_icao_df(df: pl.DataFrame, verbose: bool = True) -> pl.DataFra
|
|||||||
return df_compressed
|
return df_compressed
|
||||||
|
|
||||||
|
|
||||||
def load_raw_adsb_for_day(day):
|
def load_parquet_part(part_id: int, date: str) -> pl.DataFrame:
|
||||||
"""Load raw ADS-B data for a day from parquet file."""
|
"""Load a single parquet part file for a date.
|
||||||
from datetime import timedelta
|
|
||||||
|
Args:
|
||||||
|
part_id: Part ID (e.g., 1, 2, 3)
|
||||||
|
date: Date string in YYYY-MM-DD format
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
DataFrame with ADS-B data
|
||||||
|
"""
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
start_time = day.replace(hour=0, minute=0, second=0, microsecond=0)
|
parquet_file = Path(f"data/output/parquet_output/part_{part_id}_{date}.parquet")
|
||||||
|
|
||||||
# Check for parquet file first
|
|
||||||
version_date = f"v{start_time.strftime('%Y.%m.%d')}"
|
|
||||||
parquet_file = Path(f"data/output/parquet_output/{version_date}.parquet")
|
|
||||||
|
|
||||||
if not parquet_file.exists():
|
if not parquet_file.exists():
|
||||||
# Try to generate parquet file by calling the download function
|
print(f"Parquet file not found: {parquet_file}")
|
||||||
print(f" Parquet file not found: {parquet_file}")
|
|
||||||
print(f" Attempting to download and generate parquet for {start_time.strftime('%Y-%m-%d')}...")
|
|
||||||
|
|
||||||
from download_adsb_data_to_parquet import create_parquet_for_day
|
|
||||||
result_path = create_parquet_for_day(start_time, keep_folders=False)
|
|
||||||
|
|
||||||
if result_path:
|
|
||||||
print(f" Successfully generated parquet file: {result_path}")
|
|
||||||
else:
|
|
||||||
raise Exception("Failed to generate parquet file")
|
|
||||||
|
|
||||||
if parquet_file.exists():
|
|
||||||
print(f" Loading from parquet: {parquet_file}")
|
|
||||||
df = pl.read_parquet(
|
|
||||||
parquet_file,
|
|
||||||
columns=['time', 'icao', 'r', 't', 'dbFlags', 'ownOp', 'year', 'desc', 'aircraft_category']
|
|
||||||
)
|
|
||||||
|
|
||||||
# Convert to timezone-naive datetime
|
|
||||||
if df["time"].dtype == pl.Datetime:
|
|
||||||
df = df.with_columns(pl.col("time").dt.replace_time_zone(None))
|
|
||||||
|
|
||||||
return df
|
|
||||||
else:
|
|
||||||
# Return empty DataFrame if parquet file doesn't exist
|
|
||||||
print(f" No data available for {start_time.strftime('%Y-%m-%d')}")
|
|
||||||
return pl.DataFrame(schema={
|
return pl.DataFrame(schema={
|
||||||
'time': pl.Datetime,
|
'time': pl.Datetime,
|
||||||
'icao': pl.Utf8,
|
'icao': pl.Utf8,
|
||||||
@@ -196,16 +161,28 @@ def load_raw_adsb_for_day(day):
|
|||||||
'aircraft_category': pl.Utf8
|
'aircraft_category': pl.Utf8
|
||||||
})
|
})
|
||||||
|
|
||||||
|
print(f"Loading from parquet: {parquet_file}")
|
||||||
|
df = pl.read_parquet(
|
||||||
|
parquet_file,
|
||||||
|
columns=['time', 'icao', 'r', 't', 'dbFlags', 'ownOp', 'year', 'desc', 'aircraft_category']
|
||||||
|
)
|
||||||
|
|
||||||
|
# Convert to timezone-naive datetime
|
||||||
|
if df["time"].dtype == pl.Datetime:
|
||||||
|
df = df.with_columns(pl.col("time").dt.replace_time_zone(None))
|
||||||
|
|
||||||
|
return df
|
||||||
|
|
||||||
|
|
||||||
|
def compress_parquet_part(part_id: int, date: str) -> pl.DataFrame:
|
||||||
|
"""Load and compress a single parquet part file."""
|
||||||
|
df = load_parquet_part(part_id, date)
|
||||||
|
|
||||||
def load_historical_for_day(day):
|
|
||||||
"""Load and compress historical ADS-B data for a day."""
|
|
||||||
df = load_raw_adsb_for_day(day)
|
|
||||||
if df.height == 0:
|
if df.height == 0:
|
||||||
return df
|
return df
|
||||||
|
|
||||||
print(f"Loaded {df.height} raw records for {day.strftime('%Y-%m-%d')}")
|
print(f"Loaded {df.height} raw records for part {part_id}, date {date}")
|
||||||
|
|
||||||
# Use shared compression function
|
|
||||||
return compress_multi_icao_df(df, verbose=True)
|
return compress_multi_icao_df(df, verbose=True)
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -21,6 +21,7 @@ import pyarrow.parquet as pq
|
|||||||
|
|
||||||
from src.adsb.download_adsb_data_to_parquet import (
|
from src.adsb.download_adsb_data_to_parquet import (
|
||||||
OUTPUT_DIR,
|
OUTPUT_DIR,
|
||||||
|
PARQUET_DIR,
|
||||||
PARQUET_SCHEMA,
|
PARQUET_SCHEMA,
|
||||||
COLUMNS,
|
COLUMNS,
|
||||||
MAX_WORKERS,
|
MAX_WORKERS,
|
||||||
@@ -76,7 +77,7 @@ def process_chunk(
|
|||||||
) -> str | None:
|
) -> str | None:
|
||||||
"""Process trace files and write to a single parquet file."""
|
"""Process trace files and write to a single parquet file."""
|
||||||
|
|
||||||
output_path = os.path.join(CHUNK_OUTPUT_DIR, f"part_{part_id}_{date_str}.parquet")
|
output_path = os.path.join(PARQUET_DIR, f"part_{part_id}_{date_str}.parquet")
|
||||||
|
|
||||||
start_time = time.perf_counter()
|
start_time = time.perf_counter()
|
||||||
total_rows = 0
|
total_rows = 0
|
||||||
@@ -138,6 +139,10 @@ def main():
|
|||||||
# Process and write output
|
# Process and write output
|
||||||
output_path = process_chunk(all_trace_files, args.part_id, args.date)
|
output_path = process_chunk(all_trace_files, args.part_id, args.date)
|
||||||
|
|
||||||
|
from src.adsb.compress_adsb_to_aircraft_data import compress_parquet_part
|
||||||
|
df_compressed = compress_parquet_part(args.part_id, args.date)
|
||||||
|
print(df_compressed)
|
||||||
|
# compress adsb parquet to aircraft
|
||||||
print(f"Output: {output_path}" if output_path else "No output generated")
|
print(f"Output: {output_path}" if output_path else "No output generated")
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user