Files
OpenAirframes/src/adsb/process_icao_chunk.py
T
2026-02-19 17:51:49 -05:00

164 lines
5.3 KiB
Python

"""
Processes trace files from a single archive part for a single day.
This is the map phase of the map-reduce pipeline.
Usage:
python -m src.adsb.process_icao_chunk --part-id 1 --date 2026-01-01
"""
import gc
import os
import sys
import argparse
import time
import concurrent.futures
from datetime import datetime, timedelta
import tarfile
import tempfile
import shutil
import pyarrow as pa
import pyarrow.parquet as pq
from src.adsb.download_adsb_data_to_parquet import (
OUTPUT_DIR,
PARQUET_DIR,
PARQUET_SCHEMA,
COLUMNS,
MAX_WORKERS,
process_file,
get_resource_usage,
collect_trace_files_with_find,
)
# Smaller batch size for memory efficiency
BATCH_SIZE = 100_000
def build_trace_file_map(archive_path: str) -> dict[str, str]:
"""Build a map of ICAO -> trace file path by extracting tar.gz archive."""
print(f"Extracting {archive_path}...")
temp_dir = tempfile.mkdtemp(prefix="adsb_extract_")
with tarfile.open(archive_path, 'r:gz') as tar:
tar.extractall(path=temp_dir, filter='data')
trace_map = collect_trace_files_with_find(temp_dir)
print(f"Found {len(trace_map)} trace files")
return trace_map
def safe_process(filepath: str) -> list:
"""Safely process a file, returning empty list on error."""
try:
return process_file(filepath)
except Exception as e:
print(f"Error processing {filepath}: {e}")
return []
def rows_to_table(rows: list) -> pa.Table:
"""Convert list of rows to PyArrow table."""
import pandas as pd
df = pd.DataFrame(rows, columns=COLUMNS)
if not df['time'].dt.tz:
df['time'] = df['time'].dt.tz_localize('UTC')
return pa.Table.from_pandas(df, schema=PARQUET_SCHEMA, preserve_index=False)
def process_chunk(
trace_files: list[str],
part_id: int,
date_str: str,
) -> str | None:
"""Process trace files and write to a single parquet file."""
output_path = os.path.join(PARQUET_DIR, f"part_{part_id}_{date_str}.parquet")
start_time = time.perf_counter()
total_rows = 0
batch_rows = []
writer = None
try:
writer = pq.ParquetWriter(output_path, PARQUET_SCHEMA, compression='snappy')
files_per_batch = MAX_WORKERS * 100
for offset in range(0, len(trace_files), files_per_batch):
batch_files = trace_files[offset:offset + files_per_batch]
with concurrent.futures.ProcessPoolExecutor(max_workers=MAX_WORKERS) as executor:
for rows in executor.map(safe_process, batch_files):
if rows:
batch_rows.extend(rows)
if len(batch_rows) >= BATCH_SIZE:
writer.write_table(rows_to_table(batch_rows))
total_rows += len(batch_rows)
batch_rows = []
gc.collect()
gc.collect()
if batch_rows:
writer.write_table(rows_to_table(batch_rows))
total_rows += len(batch_rows)
finally:
if writer:
writer.close()
print(f"Part {part_id}: Done! {total_rows} rows in {time.perf_counter() - start_time:.1f}s | {get_resource_usage()}")
return output_path if total_rows > 0 else None
from pathlib import Path
def main():
parser = argparse.ArgumentParser(description="Process a single archive part for a day")
parser.add_argument("--part-id", type=int, required=True, help="Part ID (1-indexed)")
parser.add_argument("--date", type=str, required=True, help="Date in YYYY-MM-DD format")
args = parser.parse_args()
print(f"Processing part {args.part_id} for {args.date}")
# Get specific archive file for this part
archive_dir = os.path.join(OUTPUT_DIR, "adsb_archives", args.date)
archive_path = os.path.join(archive_dir, f"{args.date}_part_{args.part_id}.tar.gz")
if not os.path.isfile(archive_path):
print(f"ERROR: Archive not found: {archive_path}")
if os.path.isdir(archive_dir):
print(f"Files in {archive_dir}: {os.listdir(archive_dir)}")
else:
print(f"Directory does not exist: {archive_dir}")
sys.exit(1)
# Extract and collect trace files
trace_map = build_trace_file_map(archive_path)
all_trace_files = list(trace_map.values())
print(f"Total trace files: {len(all_trace_files)}")
# Process and write output
output_path = process_chunk(all_trace_files, args.part_id, args.date)
from src.adsb.compress_adsb_to_aircraft_data import compress_parquet_part
df_compressed = compress_parquet_part(args.part_id, args.date)
# Write parquet
df_compressed_output = OUTPUT_DIR / "compressed" / args.date/ f"part_{args.part_id}_{args.date}.parquet"
os.makedirs(df_compressed_output.parent, exist_ok=True)
df_compressed.write_parquet(df_compressed_output, compression='snappy')
# Write CSV
csv_output = OUTPUT_DIR / "compressed" / args.date / f"part_{args.part_id}_{args.date}.csv"
df_compressed.write_csv(csv_output)
print(f"Raw output: {output_path}" if output_path else "No raw output generated")
print(f"Compressed parquet: {df_compressed_output}")
print(f"Compressed CSV: {csv_output}")
if __name__ == "__main__":
main()