mirror of
https://github.com/PlaneQuery/OpenAirframes.git
synced 2026-04-23 11:36:35 +02:00
get rid of unused code
This commit is contained in:
@@ -1,265 +0,0 @@
|
||||
"""
|
||||
Combines chunk parquet files and compresses to final aircraft CSV.
|
||||
This is the reduce phase of the map-reduce pipeline.
|
||||
|
||||
Supports both single-day (daily) and multi-day (historical) modes.
|
||||
|
||||
Memory-efficient: processes each chunk separately, compresses, then combines.
|
||||
|
||||
Usage:
|
||||
# Daily mode
|
||||
python -m src.adsb.combine_chunks_to_csv --chunks-dir data/output/adsb_chunks
|
||||
|
||||
# Historical mode
|
||||
python -m src.adsb.combine_chunks_to_csv --chunks-dir data/output/adsb_chunks --start-date 2024-01-01 --end-date 2024-01-07 --skip-base
|
||||
"""
|
||||
import gc
|
||||
import gzip
|
||||
import os
|
||||
import sys
|
||||
import glob
|
||||
import argparse
|
||||
from datetime import datetime, timedelta, timezone
|
||||
|
||||
import polars as pl
|
||||
|
||||
from src.adsb.download_adsb_data_to_parquet import OUTPUT_DIR, get_resource_usage
|
||||
from src.adsb.compress_adsb_to_aircraft_data import compress_multi_icao_df, COLUMNS
|
||||
|
||||
|
||||
DEFAULT_CHUNK_DIR = os.path.join(OUTPUT_DIR, "adsb_chunks")
|
||||
FINAL_OUTPUT_DIR = "./data/openairframes"
|
||||
os.makedirs(FINAL_OUTPUT_DIR, exist_ok=True)
|
||||
|
||||
|
||||
def get_target_day() -> datetime:
|
||||
"""Get yesterday's date (the day we're processing)."""
|
||||
return datetime.now(timezone.utc) - timedelta(days=1)
|
||||
|
||||
|
||||
def process_single_chunk(chunk_path: str, delete_after_load: bool = False) -> pl.DataFrame:
|
||||
"""Load and compress a single chunk parquet file.
|
||||
|
||||
Args:
|
||||
chunk_path: Path to parquet file
|
||||
delete_after_load: If True, delete the parquet file after loading to free disk space
|
||||
"""
|
||||
print(f"Processing {os.path.basename(chunk_path)}... | {get_resource_usage()}")
|
||||
|
||||
# Load chunk - only columns we need
|
||||
needed_columns = ['time', 'icao'] + COLUMNS
|
||||
df = pl.read_parquet(chunk_path, columns=needed_columns)
|
||||
print(f" Loaded {len(df)} rows")
|
||||
|
||||
# Delete file immediately after loading to free disk space
|
||||
if delete_after_load:
|
||||
try:
|
||||
os.remove(chunk_path)
|
||||
print(f" Deleted {chunk_path} to free disk space")
|
||||
except Exception as e:
|
||||
print(f" Warning: Failed to delete {chunk_path}: {e}")
|
||||
|
||||
# Compress to aircraft records (one per ICAO) using shared function
|
||||
compressed = compress_multi_icao_df(df, verbose=True)
|
||||
print(f" Compressed to {len(compressed)} aircraft records")
|
||||
|
||||
del df
|
||||
gc.collect()
|
||||
|
||||
return compressed
|
||||
|
||||
|
||||
def combine_compressed_chunks(compressed_dfs: list[pl.DataFrame]) -> pl.DataFrame:
|
||||
"""Combine multiple compressed DataFrames.
|
||||
|
||||
Since chunks are partitioned by ICAO hash, each ICAO only appears in one chunk.
|
||||
No deduplication needed here - just concatenate.
|
||||
"""
|
||||
print(f"Combining {len(compressed_dfs)} compressed chunks... | {get_resource_usage()}")
|
||||
|
||||
# Concat all
|
||||
combined = pl.concat(compressed_dfs)
|
||||
print(f"Combined: {len(combined)} records")
|
||||
|
||||
return combined
|
||||
|
||||
|
||||
def download_and_merge_base_release(compressed_df: pl.DataFrame) -> tuple[pl.DataFrame, str | None]:
|
||||
"""Download base release and merge with new data.
|
||||
|
||||
Returns:
|
||||
Tuple of (merged_df, earliest_date_str) where earliest_date_str is None if no base release was merged
|
||||
"""
|
||||
from src.get_latest_release import download_latest_aircraft_adsb_csv
|
||||
|
||||
print("Downloading base ADS-B release...")
|
||||
base_path = download_latest_aircraft_adsb_csv(
|
||||
output_dir="./data/openairframes_base"
|
||||
)
|
||||
print(f"Download returned: {base_path}")
|
||||
|
||||
print(f"Loading base release from {base_path}")
|
||||
|
||||
# Extract start date from filename (e.g., openairframes_adsb_2025-05-01_2026-02-14.csv.gz)
|
||||
import re
|
||||
filename = os.path.basename(str(base_path))
|
||||
match = re.search(r'openairframes_adsb_(\d{4}-\d{2}-\d{2})_', filename)
|
||||
earliest_date = match.group(1) if match else None
|
||||
print(f"Start date from base filename: {earliest_date}")
|
||||
|
||||
# Read CSV with schema matching the new data
|
||||
base_df = pl.read_csv(base_path, schema=compressed_df.schema)
|
||||
print(f"Base release has {len(base_df)} records")
|
||||
|
||||
# Ensure columns match
|
||||
base_cols = set(base_df.columns)
|
||||
new_cols = set(compressed_df.columns)
|
||||
print(f"Base columns: {sorted(base_cols)}")
|
||||
print(f"New columns: {sorted(new_cols)}")
|
||||
|
||||
# Add missing columns
|
||||
for col in new_cols - base_cols:
|
||||
base_df = base_df.with_columns(pl.lit(None).alias(col))
|
||||
for col in base_cols - new_cols:
|
||||
compressed_df = compressed_df.with_columns(pl.lit(None).alias(col))
|
||||
|
||||
# Reorder columns to match
|
||||
compressed_df = compressed_df.select(base_df.columns)
|
||||
|
||||
# Concat base (old days) with new data (new days)
|
||||
# No deduplication needed since they represent different UTC days
|
||||
combined = pl.concat([base_df, compressed_df])
|
||||
print(f"After concat: {len(combined)} records")
|
||||
|
||||
# Sort by time for consistent output
|
||||
combined = combined.sort('time')
|
||||
|
||||
return combined, earliest_date
|
||||
|
||||
def cleanup_chunks(output_id: str, chunks_dir: str):
|
||||
"""Delete chunk parquet files after successful merge."""
|
||||
pattern = os.path.join(chunks_dir, f"chunk_*_{output_id}.parquet")
|
||||
chunk_files = glob.glob(pattern)
|
||||
for f in chunk_files:
|
||||
try:
|
||||
os.remove(f)
|
||||
print(f"Deleted {f}")
|
||||
except Exception as e:
|
||||
print(f"Failed to delete {f}: {e}")
|
||||
|
||||
|
||||
def find_chunk_files(chunks_dir: str, output_id: str) -> list[str]:
|
||||
"""Find chunk parquet files matching the output ID."""
|
||||
pattern = os.path.join(chunks_dir, f"chunk_*_{output_id}.parquet")
|
||||
chunk_files = sorted(glob.glob(pattern))
|
||||
|
||||
if not chunk_files:
|
||||
# Try recursive search for historical mode with merged artifacts
|
||||
pattern = os.path.join(chunks_dir, "**", "*.parquet")
|
||||
chunk_files = sorted(glob.glob(pattern, recursive=True))
|
||||
|
||||
return chunk_files
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Combine chunk parquets to final CSV")
|
||||
parser.add_argument("--date", type=str, help="Single date in YYYY-MM-DD format (default: yesterday)")
|
||||
parser.add_argument("--start-date", type=str, help="Start date for range (YYYY-MM-DD)")
|
||||
parser.add_argument("--end-date", type=str, help="End date for range (YYYY-MM-DD)")
|
||||
parser.add_argument("--chunks-dir", type=str, default=DEFAULT_CHUNK_DIR, help="Directory containing chunk parquet files")
|
||||
parser.add_argument("--skip-base", action="store_true", help="Skip downloading and merging base release")
|
||||
parser.add_argument("--keep-chunks", action="store_true", help="Keep chunk files after merging")
|
||||
parser.add_argument("--stream", action="store_true", help="Delete parquet files immediately after loading to save disk space")
|
||||
args = parser.parse_args()
|
||||
|
||||
# Determine output ID and filename based on mode
|
||||
if args.start_date and args.end_date:
|
||||
# Historical mode
|
||||
output_id = f"{args.start_date}_{args.end_date}"
|
||||
output_filename = f"openairframes_adsb_{args.start_date}_{args.end_date}.csv.gz"
|
||||
print(f"Combining chunks for date range: {args.start_date} to {args.end_date}")
|
||||
else:
|
||||
# Daily mode - use same date for start and end
|
||||
if args.date:
|
||||
target_day = datetime.strptime(args.date, "%Y-%m-%d")
|
||||
else:
|
||||
target_day = get_target_day()
|
||||
|
||||
date_str = target_day.strftime("%Y-%m-%d")
|
||||
output_id = date_str
|
||||
output_filename = f"openairframes_adsb_{date_str}_{date_str}.csv.gz"
|
||||
print(f"Combining chunks for {date_str}")
|
||||
|
||||
chunks_dir = args.chunks_dir
|
||||
print(f"Chunks directory: {chunks_dir}")
|
||||
print(f"Resource usage at start: {get_resource_usage()}")
|
||||
|
||||
# Find chunk files
|
||||
chunk_files = find_chunk_files(chunks_dir, output_id)
|
||||
|
||||
if not chunk_files:
|
||||
print(f"No chunk files found in: {chunks_dir}")
|
||||
sys.exit(1)
|
||||
|
||||
print(f"Found {len(chunk_files)} chunk files")
|
||||
|
||||
# Process each chunk separately to save memory
|
||||
# With --stream, delete parquet files immediately after loading to save disk space
|
||||
compressed_chunks = []
|
||||
for chunk_path in chunk_files:
|
||||
compressed = process_single_chunk(chunk_path, delete_after_load=args.stream)
|
||||
compressed_chunks.append(compressed)
|
||||
gc.collect()
|
||||
|
||||
# Combine all compressed chunks
|
||||
combined = combine_compressed_chunks(compressed_chunks)
|
||||
|
||||
# Free memory from individual chunks
|
||||
del compressed_chunks
|
||||
gc.collect()
|
||||
print(f"After combining: {get_resource_usage()}")
|
||||
|
||||
# Merge with base release (unless skipped)
|
||||
base_start_date = None
|
||||
if not args.skip_base:
|
||||
combined, base_start_date = download_and_merge_base_release(combined)
|
||||
|
||||
# Update filename if we merged with base release and got a start date
|
||||
if base_start_date and not (args.start_date and args.end_date):
|
||||
# Only update filename for daily mode when base was merged
|
||||
output_filename = f"openairframes_adsb_{base_start_date}_{date_str}.csv.gz"
|
||||
print(f"Updated filename to reflect date range: {output_filename}")
|
||||
|
||||
# Convert list columns to strings for CSV compatibility
|
||||
for col in combined.columns:
|
||||
if combined[col].dtype == pl.List:
|
||||
combined = combined.with_columns(
|
||||
pl.col(col).list.join(",").alias(col)
|
||||
)
|
||||
|
||||
# Sort by time for consistent output
|
||||
if 'time' in combined.columns:
|
||||
combined = combined.sort('time')
|
||||
|
||||
# Replace empty strings with null across all string columns to avoid quoted empty strings
|
||||
for col in combined.columns:
|
||||
if combined[col].dtype == pl.Utf8:
|
||||
combined = combined.with_columns(
|
||||
pl.when(pl.col(col) == "").then(None).otherwise(pl.col(col)).alias(col)
|
||||
)
|
||||
|
||||
# Write final CSV
|
||||
output_path = os.path.join(FINAL_OUTPUT_DIR, output_filename)
|
||||
with gzip.open(output_path, "wb", compresslevel=9) as f:
|
||||
combined.write_csv(f, null_value='', quote_style='necessary')
|
||||
print(f"Wrote {len(combined)} records to {output_path}")
|
||||
|
||||
# Cleanup
|
||||
if not args.keep_chunks:
|
||||
cleanup_chunks(output_id, chunks_dir)
|
||||
|
||||
print(f"Done! | {get_resource_usage()}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -1,2 +0,0 @@
|
||||
polars>=1.0
|
||||
boto3>=1.34
|
||||
@@ -1,5 +0,0 @@
|
||||
polars>=1.0
|
||||
pyarrow>=14.0
|
||||
orjson>=3.9
|
||||
boto3>=1.34
|
||||
zstandard>=0.22
|
||||
Reference in New Issue
Block a user