diff --git a/src/adsb/process_icao_chunk.py b/src/adsb/process_icao_chunk.py index d2eed6a..f208a85 100644 --- a/src/adsb/process_icao_chunk.py +++ b/src/adsb/process_icao_chunk.py @@ -1,18 +1,12 @@ """ -Processes a chunk of ICAOs from pre-extracted trace files. +Processes a chunk of ICAOs from pre-extracted trace files for a single day. This is the map phase of the map-reduce pipeline. -Supports both single-day (daily) and multi-day (historical) modes. - Expects extract_dir to already exist with trace files. Reads ICAO manifest to determine which ICAOs to process based on chunk-id. Usage: - # Daily mode (single day) - python -m src.adsb.process_icao_chunk --chunk-id 0 --total-chunks 4 - - # Historical mode (date range) - python -m src.adsb.process_icao_chunk --chunk-id 0 --total-chunks 4 --start-date 2024-01-01 --end-date 2024-01-07 + python -m src.adsb.process_icao_chunk --chunk-id 0 --total-chunks 4 --date 2026-01-01 """ import gc import os @@ -248,69 +242,11 @@ def process_single_day( return process_chunk(chunk_id, total_chunks, trace_map, icaos, date_str) -def process_date_range( - chunk_id: int, - total_chunks: int, - start_date: datetime, - end_date: datetime, -) -> str | None: - """Process a date range for this chunk. - - Combines trace files from all days in the range. - - Args: - chunk_id: This chunk's ID (0-indexed) - total_chunks: Total number of chunks - start_date: Start date (inclusive) - end_date: End date (inclusive) - """ - start_str = start_date.strftime("%Y-%m-%d") - end_str = end_date.strftime("%Y-%m-%d") - manifest_id = f"{start_str}_{end_str}" - - print(f"Processing date range: {start_str} to {end_str}") - - # Build combined trace map from all days - keep ALL trace files (one ICAO may have multiple days) - combined_trace_map: dict[str, list[str]] = {} - current = start_date - - # Both start and end are inclusive - while current <= end_date: - version_date = f"v{current.strftime('%Y.%m.%d')}" - extract_dir = os.path.join(OUTPUT_DIR, f"{version_date}-planes-readsb-prod-0.tar_0") - - if os.path.isdir(extract_dir): - trace_map = build_trace_file_map(extract_dir) - # Collect ALL trace files for each ICAO across all days - for icao, trace_file in trace_map.items(): - if icao not in combined_trace_map: - combined_trace_map[icao] = [] - combined_trace_map[icao].append(trace_file) - print(f" {current.strftime('%Y-%m-%d')}: {len(trace_map)} trace files") - else: - print(f" {current.strftime('%Y-%m-%d')}: no extract directory") - - current += timedelta(days=1) - - if not combined_trace_map: - print("No trace files found in date range") - return None - - print(f"Combined trace map: {len(combined_trace_map)} ICAOs with {sum(len(files) for files in combined_trace_map.values())} total trace files") - - icaos = read_manifest(manifest_id) - print(f"Total ICAOs in manifest: {len(icaos)}") - - return process_chunk(chunk_id, total_chunks, combined_trace_map, icaos, manifest_id) - - def main(): - parser = argparse.ArgumentParser(description="Process a chunk of ICAOs") + parser = argparse.ArgumentParser(description="Process a chunk of ICAOs for a single day") parser.add_argument("--chunk-id", type=int, required=True, help="Chunk ID (0-indexed)") parser.add_argument("--total-chunks", type=int, required=True, help="Total number of chunks") parser.add_argument("--date", type=str, help="Single date in YYYY-MM-DD format (default: yesterday)") - parser.add_argument("--start-date", type=str, help="Start date for range (YYYY-MM-DD)") - parser.add_argument("--end-date", type=str, help="End date for range (YYYY-MM-DD)") args = parser.parse_args() print(f"Processing chunk {args.chunk_id}/{args.total_chunks}") @@ -326,19 +262,13 @@ def main(): else: print(f" Directory does not exist!") - # Determine mode: single day or date range - if args.start_date and args.end_date: - # Historical mode - start_date = datetime.strptime(args.start_date, "%Y-%m-%d") - end_date = datetime.strptime(args.end_date, "%Y-%m-%d") - output_path = process_date_range(args.chunk_id, args.total_chunks, start_date, end_date) + # Process single day + if args.date: + target_day = datetime.strptime(args.date, "%Y-%m-%d") else: - # Daily mode - if args.date: - target_day = datetime.strptime(args.date, "%Y-%m-%d") - else: - target_day = get_target_day() - output_path = process_single_day(args.chunk_id, args.total_chunks, target_day) + target_day = get_target_day() + + output_path = process_single_day(args.chunk_id, args.total_chunks, target_day) if output_path: print(f"Output: {output_path}") @@ -347,4 +277,4 @@ def main(): if __name__ == "__main__": - main() + main() \ No newline at end of file