diff --git a/src/adsb/process_icao_chunk.py b/src/adsb/process_icao_chunk.py index f86c91b..eb12558 100644 --- a/src/adsb/process_icao_chunk.py +++ b/src/adsb/process_icao_chunk.py @@ -1,11 +1,9 @@ """ -Processes trace files from pre-extracted directory for a single day. +Processes trace files from a single archive part for a single day. This is the map phase of the map-reduce pipeline. -Expects extract_dir to already exist with trace files. - Usage: - python -m src.adsb.process_icao_chunk --chunk-id 0 --date 2026-01-01 + python -m src.adsb.process_icao_chunk --part-id 1 --date 2026-01-01 """ import gc import os @@ -23,7 +21,6 @@ import pyarrow.parquet as pq from src.adsb.download_adsb_data_to_parquet import ( OUTPUT_DIR, - PARQUET_DIR, PARQUET_SCHEMA, COLUMNS, MAX_WORKERS, @@ -73,23 +70,13 @@ def rows_to_table(rows: list) -> pa.Table: def process_chunk( - trace_map: dict[str, str] | dict[str, list[str]], - chunk_id: int, - output_id: str, + trace_files: list[str], + part_id: int, + date_str: str, ) -> str | None: - """Process trace files and write to a single parquet file. + """Process trace files and write to a single parquet file.""" - Args: - trace_map: Map of ICAO -> trace file path (str) or list of trace file paths (list[str]) - chunk_id: This chunk's ID (0-indexed) - output_id: Identifier for output file (date or date range) - """ - - # Get trace file paths from the map - trace_files = list(trace_map.values()) - - # Single output file - output_path = os.path.join(CHUNK_OUTPUT_DIR, f"chunk_{chunk_id}_{output_id}.parquet") + output_path = os.path.join(CHUNK_OUTPUT_DIR, f"part_{part_id}_{date_str}.parquet") start_time = time.perf_counter() total_rows = 0 @@ -97,10 +84,8 @@ def process_chunk( writer = None try: - # Open writer once at the start writer = pq.ParquetWriter(output_path, PARQUET_SCHEMA, compression='snappy') - # Process files in batches files_per_batch = MAX_WORKERS * 100 for offset in range(0, len(trace_files), files_per_batch): batch_files = trace_files[offset:offset + files_per_batch] @@ -110,85 +95,50 @@ def process_chunk( if rows: batch_rows.extend(rows) - # Write when batch is full if len(batch_rows) >= BATCH_SIZE: - table = rows_to_table(batch_rows) - writer.write_table(table) + writer.write_table(rows_to_table(batch_rows)) total_rows += len(batch_rows) - batch_rows = [] - del table gc.collect() - elapsed = time.perf_counter() - start_time - print(f"Chunk {chunk_id}: {total_rows} rows, {elapsed:.1f}s | {get_resource_usage()}") + print(f"Part {part_id}: {total_rows} rows, {time.perf_counter() - start_time:.1f}s | {get_resource_usage()}") gc.collect() - # Write remaining rows if batch_rows: - table = rows_to_table(batch_rows) - writer.write_table(table) + writer.write_table(rows_to_table(batch_rows)) total_rows += len(batch_rows) - del table finally: if writer: writer.close() - elapsed = time.perf_counter() - start_time - print(f"Chunk {chunk_id}: Done! {total_rows} rows in {elapsed:.1f}s | {get_resource_usage()}") + print(f"Part {part_id}: Done! {total_rows} rows in {time.perf_counter() - start_time:.1f}s | {get_resource_usage()}") - if total_rows > 0: - return output_path - return None - - -def process_single_day( - chunk_id: int, - target_day: datetime, -) -> str | None: - """Process a single day for this chunk.""" - date_str = target_day.strftime("%Y-%m-%d") - archive_dir = os.path.join(OUTPUT_DIR, "adsb_archives", date_str) - - archive_files = sorted([ - os.path.join(archive_dir, f) - for f in os.listdir(archive_dir) - if f.startswith(f"{date_str}_part_") and f.endswith(".tar.gz") - ]) - - print(f"Processing {len(archive_files)} archive files") - - all_trace_files = [] - for archive_path in archive_files: - trace_map = build_trace_file_map(archive_path) - all_trace_files.extend(trace_map.values()) - - print(f"Total trace files: {len(all_trace_files)}") - - # Convert list to dict for process_chunk compatibility - trace_map = {str(i): path for i, path in enumerate(all_trace_files)} - - return process_chunk(trace_map, chunk_id, date_str) + return output_path if total_rows > 0 else None def main(): - parser = argparse.ArgumentParser(description="Process a chunk of ICAOs for a single day") - parser.add_argument("--chunk-id", type=int, required=True, help="Chunk ID (0-indexed)") + parser = argparse.ArgumentParser(description="Process a single archive part for a day") + parser.add_argument("--part-id", type=int, required=True, help="Part ID (1-indexed)") parser.add_argument("--date", type=str, required=True, help="Date in YYYY-MM-DD format") args = parser.parse_args() - print(f"Processing chunk {args.chunk_id} for {args.date}") - print(f"Resource usage: {get_resource_usage()}") + print(f"Processing part {args.part_id} for {args.date}") - target_day = datetime.strptime(args.date, "%Y-%m-%d") - output_path = process_single_day(args.chunk_id, target_day) + # Get specific archive file for this part + archive_path = os.path.join(OUTPUT_DIR, "adsb_archives", args.date, f"{args.date}_part_{args.part_id}.tar.gz") - if output_path: - print(f"Output: {output_path}") - else: - print("No output generated") + # Extract and collect trace files + trace_map = build_trace_file_map(archive_path) + all_trace_files = list(trace_map.values()) + + print(f"Total trace files: {len(all_trace_files)}") + + # Process and write output + output_path = process_chunk(all_trace_files, args.part_id, args.date) + + print(f"Output: {output_path}" if output_path else "No output generated") if __name__ == "__main__":