remove code from src/adsb/process_icao_chunk.py

This commit is contained in:
ggman12
2026-02-17 15:42:45 -05:00
parent d3c52266e5
commit 1afe2bed4e
+44 -129
View File
@@ -1,12 +1,11 @@
""" """
Processes a chunk of ICAOs from pre-extracted trace files for a single day. Processes trace files from pre-extracted directory for a single day.
This is the map phase of the map-reduce pipeline. This is the map phase of the map-reduce pipeline.
Expects extract_dir to already exist with trace files. Expects extract_dir to already exist with trace files.
Reads ICAO manifest to determine which ICAOs to process based on chunk-id.
Usage: Usage:
python -m src.adsb.process_icao_chunk --chunk-id 0 --total-chunks 4 --date 2026-01-01 python -m src.adsb.process_icao_chunk --chunk-id 0 --date 2026-01-01
""" """
import gc import gc
import os import os
@@ -15,6 +14,9 @@ import argparse
import time import time
import concurrent.futures import concurrent.futures
from datetime import datetime, timedelta from datetime import datetime, timedelta
import tarfile
import tempfile
import shutil
import pyarrow as pa import pyarrow as pa
import pyarrow.parquet as pq import pyarrow.parquet as pq
@@ -37,66 +39,18 @@ os.makedirs(CHUNK_OUTPUT_DIR, exist_ok=True)
# Smaller batch size for memory efficiency # Smaller batch size for memory efficiency
BATCH_SIZE = 100_000 BATCH_SIZE = 100_000
def build_trace_file_map(archive_path: str) -> dict[str, str]:
"""Build a map of ICAO -> trace file path by extracting tar.gz archive."""
print(f"Extracting {archive_path}...")
def get_target_day() -> datetime: temp_dir = tempfile.mkdtemp(prefix="adsb_extract_")
"""Get yesterday's date (the day we're processing)."""
return datetime.utcnow() - timedelta(days=1)
with tarfile.open(archive_path, 'r:gz') as tar:
tar.extractall(path=temp_dir, filter='data')
def read_manifest(manifest_id: str) -> list[str]: trace_map = collect_trace_files_with_find(temp_dir)
"""Read ICAO manifest file.
Args:
manifest_id: Either a date string (YYYY-MM-DD) or range string (YYYY-MM-DD_YYYY-MM-DD)
"""
manifest_path = os.path.join(OUTPUT_DIR, f"icao_manifest_{manifest_id}.txt")
if not os.path.exists(manifest_path):
raise FileNotFoundError(f"Manifest not found: {manifest_path}")
with open(manifest_path, "r") as f:
icaos = [line.strip() for line in f if line.strip()]
return icaos
def deterministic_hash(s: str) -> int:
"""Return a deterministic hash for a string (unlike Python's hash() which is randomized)."""
# Use sum of byte values - simple but deterministic
return sum(ord(c) for c in s)
def get_chunk_icaos(icaos: list[str], chunk_id: int, total_chunks: int) -> list[str]:
"""Get the subset of ICAOs for this chunk based on deterministic hash partitioning."""
return [icao for icao in icaos if deterministic_hash(icao) % total_chunks == chunk_id]
def build_trace_file_map(extract_dir: str) -> dict[str, str]:
"""Build a map of ICAO -> trace file path using find command."""
print(f"Building trace file map from {extract_dir}...")
# Debug: check what's in extract_dir
if os.path.isdir(extract_dir):
items = os.listdir(extract_dir)[:10]
print(f"First 10 items in extract_dir: {items}")
# Check if there are subdirectories
for item in items[:3]:
subpath = os.path.join(extract_dir, item)
if os.path.isdir(subpath):
subitems = os.listdir(subpath)[:5]
print(f" Contents of {item}/: {subitems}")
trace_map = collect_trace_files_with_find(extract_dir)
print(f"Found {len(trace_map)} trace files") print(f"Found {len(trace_map)} trace files")
if len(trace_map) == 0:
# Debug: try manual find
import subprocess
result = subprocess.run(
['find', extract_dir, '-type', 'f', '-name', 'trace_full_*'],
capture_output=True, text=True
)
print(f"Manual find output (first 500 chars): {result.stdout[:500]}")
print(f"Manual find stderr: {result.stderr[:200]}")
return trace_map return trace_map
@@ -119,45 +73,22 @@ def rows_to_table(rows: list) -> pa.Table:
def process_chunk( def process_chunk(
chunk_id: int,
total_chunks: int,
trace_map: dict[str, str] | dict[str, list[str]], trace_map: dict[str, str] | dict[str, list[str]],
icaos: list[str], chunk_id: int,
output_id: str, output_id: str,
) -> str | None: ) -> str | None:
"""Process a chunk of ICAOs and write to parquet. """Process trace files and write to a single parquet file.
Args: Args:
chunk_id: This chunk's ID (0-indexed)
total_chunks: Total number of chunks
trace_map: Map of ICAO -> trace file path (str) or list of trace file paths (list[str]) trace_map: Map of ICAO -> trace file path (str) or list of trace file paths (list[str])
icaos: Full list of ICAOs from manifest chunk_id: This chunk's ID (0-indexed)
output_id: Identifier for output file (date or date range) output_id: Identifier for output file (date or date range)
""" """
chunk_icaos = get_chunk_icaos(icaos, chunk_id, total_chunks)
print(f"Chunk {chunk_id}/{total_chunks}: Processing {len(chunk_icaos)} ICAOs")
if not chunk_icaos: # Get trace file paths from the map
print(f"Chunk {chunk_id}: No ICAOs to process") trace_files = list(trace_map.values())
return None
# Get trace file paths from the map (flatten lists if needed) # Single output file
trace_files = []
for icao in chunk_icaos:
if icao in trace_map:
files = trace_map[icao]
if isinstance(files, list):
trace_files.extend(files)
else:
trace_files.append(files)
print(f"Chunk {chunk_id}: Found {len(trace_files)} trace files")
if not trace_files:
print(f"Chunk {chunk_id}: No trace files found")
return None
# Process files and write parquet in batches
output_path = os.path.join(CHUNK_OUTPUT_DIR, f"chunk_{chunk_id}_{output_id}.parquet") output_path = os.path.join(CHUNK_OUTPUT_DIR, f"chunk_{chunk_id}_{output_id}.parquet")
start_time = time.perf_counter() start_time = time.perf_counter()
@@ -166,7 +97,10 @@ def process_chunk(
writer = None writer = None
try: try:
# Process in parallel batches # Open writer once at the start
writer = pq.ParquetWriter(output_path, PARQUET_SCHEMA, compression='snappy')
# Process files in batches
files_per_batch = MAX_WORKERS * 100 files_per_batch = MAX_WORKERS * 100
for offset in range(0, len(trace_files), files_per_batch): for offset in range(0, len(trace_files), files_per_batch):
batch_files = trace_files[offset:offset + files_per_batch] batch_files = trace_files[offset:offset + files_per_batch]
@@ -179,11 +113,8 @@ def process_chunk(
# Write when batch is full # Write when batch is full
if len(batch_rows) >= BATCH_SIZE: if len(batch_rows) >= BATCH_SIZE:
table = rows_to_table(batch_rows) table = rows_to_table(batch_rows)
total_rows += len(batch_rows)
if writer is None:
writer = pq.ParquetWriter(output_path, PARQUET_SCHEMA, compression='snappy')
writer.write_table(table) writer.write_table(table)
total_rows += len(batch_rows)
batch_rows = [] batch_rows = []
del table del table
@@ -197,11 +128,8 @@ def process_chunk(
# Write remaining rows # Write remaining rows
if batch_rows: if batch_rows:
table = rows_to_table(batch_rows) table = rows_to_table(batch_rows)
total_rows += len(batch_rows)
if writer is None:
writer = pq.ParquetWriter(output_path, PARQUET_SCHEMA, compression='snappy')
writer.write_table(table) writer.write_table(table)
total_rows += len(batch_rows)
del table del table
finally: finally:
@@ -218,57 +146,44 @@ def process_chunk(
def process_single_day( def process_single_day(
chunk_id: int, chunk_id: int,
total_chunks: int,
target_day: datetime, target_day: datetime,
) -> str | None: ) -> str | None:
"""Process a single day for this chunk.""" """Process a single day for this chunk."""
date_str = target_day.strftime("%Y-%m-%d") date_str = target_day.strftime("%Y-%m-%d")
version_date = f"v{target_day.strftime('%Y.%m.%d')}" archive_dir = os.path.join(OUTPUT_DIR, "adsb_archives", date_str)
extract_dir = os.path.join(OUTPUT_DIR, f"{version_date}-planes-readsb-prod-0.tar_0") archive_files = sorted([
os.path.join(archive_dir, f)
for f in os.listdir(archive_dir)
if f.startswith(f"{date_str}_part_") and f.endswith(".tar.gz")
])
if not os.path.isdir(extract_dir): print(f"Processing {len(archive_files)} archive files")
print(f"Extract directory not found: {extract_dir}")
return None
trace_map = build_trace_file_map(extract_dir) all_trace_files = []
if not trace_map: for archive_path in archive_files:
print("No trace files found") trace_map = build_trace_file_map(archive_path)
return None all_trace_files.extend(trace_map.values())
icaos = read_manifest(date_str) print(f"Total trace files: {len(all_trace_files)}")
print(f"Total ICAOs in manifest: {len(icaos)}")
return process_chunk(chunk_id, total_chunks, trace_map, icaos, date_str) # Convert list to dict for process_chunk compatibility
trace_map = {str(i): path for i, path in enumerate(all_trace_files)}
return process_chunk(trace_map, chunk_id, date_str)
def main(): def main():
parser = argparse.ArgumentParser(description="Process a chunk of ICAOs for a single day") parser = argparse.ArgumentParser(description="Process a chunk of ICAOs for a single day")
parser.add_argument("--chunk-id", type=int, required=True, help="Chunk ID (0-indexed)") parser.add_argument("--chunk-id", type=int, required=True, help="Chunk ID (0-indexed)")
parser.add_argument("--total-chunks", type=int, required=True, help="Total number of chunks") parser.add_argument("--date", type=str, required=True, help="Date in YYYY-MM-DD format")
parser.add_argument("--date", type=str, help="Single date in YYYY-MM-DD format (default: yesterday)")
args = parser.parse_args() args = parser.parse_args()
print(f"Processing chunk {args.chunk_id}/{args.total_chunks}") print(f"Processing chunk {args.chunk_id} for {args.date}")
print(f"OUTPUT_DIR: {OUTPUT_DIR}") print(f"Resource usage: {get_resource_usage()}")
print(f"CHUNK_OUTPUT_DIR: {CHUNK_OUTPUT_DIR}")
print(f"Resource usage at start: {get_resource_usage()}")
# Debug: List what's in OUTPUT_DIR
print(f"\nContents of {OUTPUT_DIR}:")
if os.path.isdir(OUTPUT_DIR):
for item in os.listdir(OUTPUT_DIR)[:20]:
print(f" - {item}")
else:
print(f" Directory does not exist!")
# Process single day
if args.date:
target_day = datetime.strptime(args.date, "%Y-%m-%d") target_day = datetime.strptime(args.date, "%Y-%m-%d")
else: output_path = process_single_day(args.chunk_id, target_day)
target_day = get_target_day()
output_path = process_single_day(args.chunk_id, args.total_chunks, target_day)
if output_path: if output_path:
print(f"Output: {output_path}") print(f"Output: {output_path}")