This commit is contained in:
ggman12
2026-02-17 15:46:07 -05:00
parent 1afe2bed4e
commit 70ec797535
+27 -77
View File
@@ -1,11 +1,9 @@
""" """
Processes trace files from pre-extracted directory for a single day. Processes trace files from a single archive part for a single day.
This is the map phase of the map-reduce pipeline. This is the map phase of the map-reduce pipeline.
Expects extract_dir to already exist with trace files.
Usage: Usage:
python -m src.adsb.process_icao_chunk --chunk-id 0 --date 2026-01-01 python -m src.adsb.process_icao_chunk --part-id 1 --date 2026-01-01
""" """
import gc import gc
import os import os
@@ -23,7 +21,6 @@ import pyarrow.parquet as pq
from src.adsb.download_adsb_data_to_parquet import ( from src.adsb.download_adsb_data_to_parquet import (
OUTPUT_DIR, OUTPUT_DIR,
PARQUET_DIR,
PARQUET_SCHEMA, PARQUET_SCHEMA,
COLUMNS, COLUMNS,
MAX_WORKERS, MAX_WORKERS,
@@ -73,23 +70,13 @@ def rows_to_table(rows: list) -> pa.Table:
def process_chunk( def process_chunk(
trace_map: dict[str, str] | dict[str, list[str]], trace_files: list[str],
chunk_id: int, part_id: int,
output_id: str, date_str: str,
) -> str | None: ) -> str | None:
"""Process trace files and write to a single parquet file. """Process trace files and write to a single parquet file."""
Args: output_path = os.path.join(CHUNK_OUTPUT_DIR, f"part_{part_id}_{date_str}.parquet")
trace_map: Map of ICAO -> trace file path (str) or list of trace file paths (list[str])
chunk_id: This chunk's ID (0-indexed)
output_id: Identifier for output file (date or date range)
"""
# Get trace file paths from the map
trace_files = list(trace_map.values())
# Single output file
output_path = os.path.join(CHUNK_OUTPUT_DIR, f"chunk_{chunk_id}_{output_id}.parquet")
start_time = time.perf_counter() start_time = time.perf_counter()
total_rows = 0 total_rows = 0
@@ -97,10 +84,8 @@ def process_chunk(
writer = None writer = None
try: try:
# Open writer once at the start
writer = pq.ParquetWriter(output_path, PARQUET_SCHEMA, compression='snappy') writer = pq.ParquetWriter(output_path, PARQUET_SCHEMA, compression='snappy')
# Process files in batches
files_per_batch = MAX_WORKERS * 100 files_per_batch = MAX_WORKERS * 100
for offset in range(0, len(trace_files), files_per_batch): for offset in range(0, len(trace_files), files_per_batch):
batch_files = trace_files[offset:offset + files_per_batch] batch_files = trace_files[offset:offset + files_per_batch]
@@ -110,85 +95,50 @@ def process_chunk(
if rows: if rows:
batch_rows.extend(rows) batch_rows.extend(rows)
# Write when batch is full
if len(batch_rows) >= BATCH_SIZE: if len(batch_rows) >= BATCH_SIZE:
table = rows_to_table(batch_rows) writer.write_table(rows_to_table(batch_rows))
writer.write_table(table)
total_rows += len(batch_rows) total_rows += len(batch_rows)
batch_rows = [] batch_rows = []
del table
gc.collect() gc.collect()
elapsed = time.perf_counter() - start_time print(f"Part {part_id}: {total_rows} rows, {time.perf_counter() - start_time:.1f}s | {get_resource_usage()}")
print(f"Chunk {chunk_id}: {total_rows} rows, {elapsed:.1f}s | {get_resource_usage()}")
gc.collect() gc.collect()
# Write remaining rows
if batch_rows: if batch_rows:
table = rows_to_table(batch_rows) writer.write_table(rows_to_table(batch_rows))
writer.write_table(table)
total_rows += len(batch_rows) total_rows += len(batch_rows)
del table
finally: finally:
if writer: if writer:
writer.close() writer.close()
elapsed = time.perf_counter() - start_time print(f"Part {part_id}: Done! {total_rows} rows in {time.perf_counter() - start_time:.1f}s | {get_resource_usage()}")
print(f"Chunk {chunk_id}: Done! {total_rows} rows in {elapsed:.1f}s | {get_resource_usage()}")
if total_rows > 0: return output_path if total_rows > 0 else None
return output_path
return None
def process_single_day(
    chunk_id: int,
    target_day: datetime,
) -> str | None:
    """Collect every trace file for one day and hand them to process_chunk.

    Lists ``<OUTPUT_DIR>/adsb_archives/<YYYY-MM-DD>``, keeps the entries named
    ``<YYYY-MM-DD>_part_*.tar.gz``, and visits them in sorted path order. The
    values of the map returned by build_trace_file_map for each archive are
    concatenated into one list of trace files.

    Args:
        chunk_id: Identifier forwarded unchanged to process_chunk.
        target_day: Day to process; only its ``%Y-%m-%d`` rendering is used.

    Returns:
        Whatever process_chunk returns for the collected trace files.
    """
    date_str = target_day.strftime("%Y-%m-%d")
    archive_dir = os.path.join(OUTPUT_DIR, "adsb_archives", date_str)
    wanted_prefix = f"{date_str}_part_"

    # Keep only this day's archive parts; sort so the visiting order is stable.
    archive_files = []
    for entry in os.listdir(archive_dir):
        if entry.startswith(wanted_prefix) and entry.endswith(".tar.gz"):
            archive_files.append(os.path.join(archive_dir, entry))
    archive_files.sort()

    print(f"Processing {len(archive_files)} archive files")

    # Flatten the per-archive maps into one list, preserving archive order.
    all_trace_files = [
        trace_path
        for archive_path in archive_files
        for trace_path in build_trace_file_map(archive_path).values()
    ]

    print(f"Total trace files: {len(all_trace_files)}")

    # process_chunk is called with a mapping here, so key each path by its
    # position in the list.
    trace_map = {
        str(position): trace_path
        for position, trace_path in enumerate(all_trace_files)
    }
    return process_chunk(trace_map, chunk_id, date_str)
def main(): def main():
parser = argparse.ArgumentParser(description="Process a chunk of ICAOs for a single day") parser = argparse.ArgumentParser(description="Process a single archive part for a day")
parser.add_argument("--chunk-id", type=int, required=True, help="Chunk ID (0-indexed)") parser.add_argument("--part-id", type=int, required=True, help="Part ID (1-indexed)")
parser.add_argument("--date", type=str, required=True, help="Date in YYYY-MM-DD format") parser.add_argument("--date", type=str, required=True, help="Date in YYYY-MM-DD format")
args = parser.parse_args() args = parser.parse_args()
print(f"Processing chunk {args.chunk_id} for {args.date}") print(f"Processing part {args.part_id} for {args.date}")
print(f"Resource usage: {get_resource_usage()}")
target_day = datetime.strptime(args.date, "%Y-%m-%d") # Get specific archive file for this part
output_path = process_single_day(args.chunk_id, target_day) archive_path = os.path.join(OUTPUT_DIR, "adsb_archives", args.date, f"{args.date}_part_{args.part_id}.tar.gz")
if output_path: # Extract and collect trace files
print(f"Output: {output_path}") trace_map = build_trace_file_map(archive_path)
else: all_trace_files = list(trace_map.values())
print("No output generated")
print(f"Total trace files: {len(all_trace_files)}")
# Process and write output
output_path = process_chunk(all_trace_files, args.part_id, args.date)
print(f"Output: {output_path}" if output_path else "No output generated")
if __name__ == "__main__": if __name__ == "__main__":