diff --git a/src/adsb/process_icao_chunk.py b/src/adsb/process_icao_chunk.py index f208a85..f86c91b 100644 --- a/src/adsb/process_icao_chunk.py +++ b/src/adsb/process_icao_chunk.py @@ -1,12 +1,11 @@ """ -Processes a chunk of ICAOs from pre-extracted trace files for a single day. +Processes trace files extracted from a day's tar.gz archives. This is the map phase of the map-reduce pipeline. Expects extract_dir to already exist with trace files. -Reads ICAO manifest to determine which ICAOs to process based on chunk-id. Usage: - python -m src.adsb.process_icao_chunk --chunk-id 0 --total-chunks 4 --date 2026-01-01 + python -m src.adsb.process_icao_chunk --chunk-id 0 --date 2026-01-01 """ import gc import os @@ -15,6 +14,9 @@ import argparse import time import concurrent.futures from datetime import datetime, timedelta +import tarfile +import tempfile +import shutil import pyarrow as pa import pyarrow.parquet as pq @@ -37,66 +39,18 @@ os.makedirs(CHUNK_OUTPUT_DIR, exist_ok=True) # Smaller batch size for memory efficiency BATCH_SIZE = 100_000 - -def get_target_day() -> datetime: - """Get yesterday's date (the day we're processing).""" - return datetime.utcnow() - timedelta(days=1) - - -def read_manifest(manifest_id: str) -> list[str]: - """Read ICAO manifest file. 
+def build_trace_file_map(archive_path: str) -> dict[str, str]: + """Build a map of ICAO -> trace file path by extracting tar.gz archive.""" + print(f"Extracting {archive_path}...") - Args: - manifest_id: Either a date string (YYYY-MM-DD) or range string (YYYY-MM-DD_YYYY-MM-DD) - """ - manifest_path = os.path.join(OUTPUT_DIR, f"icao_manifest_{manifest_id}.txt") - if not os.path.exists(manifest_path): - raise FileNotFoundError(f"Manifest not found: {manifest_path}") + temp_dir = tempfile.mkdtemp(prefix="adsb_extract_") - with open(manifest_path, "r") as f: - icaos = [line.strip() for line in f if line.strip()] - return icaos - - -def deterministic_hash(s: str) -> int: - """Return a deterministic hash for a string (unlike Python's hash() which is randomized).""" - # Use sum of byte values - simple but deterministic - return sum(ord(c) for c in s) - - -def get_chunk_icaos(icaos: list[str], chunk_id: int, total_chunks: int) -> list[str]: - """Get the subset of ICAOs for this chunk based on deterministic hash partitioning.""" - return [icao for icao in icaos if deterministic_hash(icao) % total_chunks == chunk_id] - - -def build_trace_file_map(extract_dir: str) -> dict[str, str]: - """Build a map of ICAO -> trace file path using find command.""" - print(f"Building trace file map from {extract_dir}...") + with tarfile.open(archive_path, 'r:gz') as tar: + tar.extractall(path=temp_dir, filter='data') - # Debug: check what's in extract_dir - if os.path.isdir(extract_dir): - items = os.listdir(extract_dir)[:10] - print(f"First 10 items in extract_dir: {items}") - # Check if there are subdirectories - for item in items[:3]: - subpath = os.path.join(extract_dir, item) - if os.path.isdir(subpath): - subitems = os.listdir(subpath)[:5] - print(f" Contents of {item}/: {subitems}") - - trace_map = collect_trace_files_with_find(extract_dir) + trace_map = collect_trace_files_with_find(temp_dir) print(f"Found {len(trace_map)} trace files") - if len(trace_map) == 0: - # Debug: try 
manual find - import subprocess - result = subprocess.run( - ['find', extract_dir, '-type', 'f', '-name', 'trace_full_*'], - capture_output=True, text=True - ) - print(f"Manual find output (first 500 chars): {result.stdout[:500]}") - print(f"Manual find stderr: {result.stderr[:200]}") - return trace_map @@ -119,45 +73,22 @@ def rows_to_table(rows: list) -> pa.Table: def process_chunk( - chunk_id: int, - total_chunks: int, trace_map: dict[str, str] | dict[str, list[str]], - icaos: list[str], + chunk_id: int, output_id: str, ) -> str | None: - """Process a chunk of ICAOs and write to parquet. + """Process trace files and write to a single parquet file. Args: - chunk_id: This chunk's ID (0-indexed) - total_chunks: Total number of chunks trace_map: Map of ICAO -> trace file path (str) or list of trace file paths (list[str]) - icaos: Full list of ICAOs from manifest + chunk_id: This chunk's ID (0-indexed) output_id: Identifier for output file (date or date range) """ - chunk_icaos = get_chunk_icaos(icaos, chunk_id, total_chunks) - print(f"Chunk {chunk_id}/{total_chunks}: Processing {len(chunk_icaos)} ICAOs") - if not chunk_icaos: - print(f"Chunk {chunk_id}: No ICAOs to process") - return None + # Get trace file paths from the map + trace_files = list(trace_map.values()) - # Get trace file paths from the map (flatten lists if needed) - trace_files = [] - for icao in chunk_icaos: - if icao in trace_map: - files = trace_map[icao] - if isinstance(files, list): - trace_files.extend(files) - else: - trace_files.append(files) - - print(f"Chunk {chunk_id}: Found {len(trace_files)} trace files") - - if not trace_files: - print(f"Chunk {chunk_id}: No trace files found") - return None - - # Process files and write parquet in batches + # Single output file output_path = os.path.join(CHUNK_OUTPUT_DIR, f"chunk_{chunk_id}_{output_id}.parquet") start_time = time.perf_counter() @@ -166,7 +97,10 @@ def process_chunk( writer = None try: - # Process in parallel batches + # Open writer once 
at the start + writer = pq.ParquetWriter(output_path, PARQUET_SCHEMA, compression='snappy') + + # Process files in batches files_per_batch = MAX_WORKERS * 100 for offset in range(0, len(trace_files), files_per_batch): batch_files = trace_files[offset:offset + files_per_batch] @@ -179,11 +113,8 @@ def process_chunk( # Write when batch is full if len(batch_rows) >= BATCH_SIZE: table = rows_to_table(batch_rows) - total_rows += len(batch_rows) - - if writer is None: - writer = pq.ParquetWriter(output_path, PARQUET_SCHEMA, compression='snappy') writer.write_table(table) + total_rows += len(batch_rows) batch_rows = [] del table @@ -197,11 +128,8 @@ def process_chunk( # Write remaining rows if batch_rows: table = rows_to_table(batch_rows) - total_rows += len(batch_rows) - - if writer is None: - writer = pq.ParquetWriter(output_path, PARQUET_SCHEMA, compression='snappy') writer.write_table(table) + total_rows += len(batch_rows) del table finally: @@ -218,57 +146,44 @@ def process_chunk( def process_single_day( chunk_id: int, - total_chunks: int, target_day: datetime, ) -> str | None: """Process a single day for this chunk.""" date_str = target_day.strftime("%Y-%m-%d") - version_date = f"v{target_day.strftime('%Y.%m.%d')}" + archive_dir = os.path.join(OUTPUT_DIR, "adsb_archives", date_str) - extract_dir = os.path.join(OUTPUT_DIR, f"{version_date}-planes-readsb-prod-0.tar_0") + archive_files = sorted([ + os.path.join(archive_dir, f) + for f in os.listdir(archive_dir) + if f.startswith(f"{date_str}_part_") and f.endswith(".tar.gz") + ]) - if not os.path.isdir(extract_dir): - print(f"Extract directory not found: {extract_dir}") - return None + print(f"Processing {len(archive_files)} archive files") - trace_map = build_trace_file_map(extract_dir) - if not trace_map: - print("No trace files found") - return None + all_trace_files = [] + for archive_path in archive_files: + trace_map = build_trace_file_map(archive_path) + all_trace_files.extend(trace_map.values()) - icaos = 
read_manifest(date_str) - print(f"Total ICAOs in manifest: {len(icaos)}") + print(f"Total trace files: {len(all_trace_files)}") - return process_chunk(chunk_id, total_chunks, trace_map, icaos, date_str) + # Convert list to dict for process_chunk compatibility + trace_map = {str(i): path for i, path in enumerate(all_trace_files)} + + return process_chunk(trace_map, chunk_id, date_str) def main(): parser = argparse.ArgumentParser(description="Process a chunk of ICAOs for a single day") parser.add_argument("--chunk-id", type=int, required=True, help="Chunk ID (0-indexed)") - parser.add_argument("--total-chunks", type=int, required=True, help="Total number of chunks") - parser.add_argument("--date", type=str, help="Single date in YYYY-MM-DD format (default: yesterday)") + parser.add_argument("--date", type=str, required=True, help="Date in YYYY-MM-DD format") args = parser.parse_args() - print(f"Processing chunk {args.chunk_id}/{args.total_chunks}") - print(f"OUTPUT_DIR: {OUTPUT_DIR}") - print(f"CHUNK_OUTPUT_DIR: {CHUNK_OUTPUT_DIR}") - print(f"Resource usage at start: {get_resource_usage()}") + print(f"Processing chunk {args.chunk_id} for {args.date}") + print(f"Resource usage: {get_resource_usage()}") - # Debug: List what's in OUTPUT_DIR - print(f"\nContents of {OUTPUT_DIR}:") - if os.path.isdir(OUTPUT_DIR): - for item in os.listdir(OUTPUT_DIR)[:20]: - print(f" - {item}") - else: - print(f" Directory does not exist!") - - # Process single day - if args.date: - target_day = datetime.strptime(args.date, "%Y-%m-%d") - else: - target_day = get_target_day() - - output_path = process_single_day(args.chunk_id, args.total_chunks, target_day) + target_day = datetime.strptime(args.date, "%Y-%m-%d") + output_path = process_single_day(args.chunk_id, target_day) if output_path: print(f"Output: {output_path}") @@ -277,4 +192,4 @@ def main(): if __name__ == "__main__": - main() \ No newline at end of file + main() \ No newline at end of file