From 2b2095700fcd558085bd0c1ba58b957b43325da6 Mon Sep 17 00:00:00 2001 From: ggman12 Date: Sun, 15 Feb 2026 19:53:09 -0500 Subject: [PATCH] use chunks in run_local --- src/adsb/run_local.py | 103 +++++++++++++++++++++++++++--------------- 1 file changed, 67 insertions(+), 36 deletions(-) diff --git a/src/adsb/run_local.py b/src/adsb/run_local.py index 8c2ff77..122a4ce 100644 --- a/src/adsb/run_local.py +++ b/src/adsb/run_local.py @@ -51,6 +51,12 @@ def main(): default=4, help="Number of parallel chunks (default: 4)" ) + parser.add_argument( + "--chunk-days", + type=int, + default=1, + help="Days per chunk for date range processing (default: 1)" + ) parser.add_argument( "--skip-base", action="store_true", @@ -71,52 +77,77 @@ def main(): start_str = start_date.strftime("%Y-%m-%d") end_str = end_date.strftime("%Y-%m-%d") if end_date else None + # Generate date chunks if processing a range + date_chunks = [] + if end_str: + current = start_date + while current < end_date: + chunk_end = min(current + timedelta(days=args.chunk_days), end_date) + date_chunks.append({ + 'start': current.strftime("%Y-%m-%d"), + 'end': chunk_end.strftime("%Y-%m-%d") + }) + current = chunk_end + else: + # Single day + date_chunks = [{'start': start_str, 'end': start_str}] + print("=" * 60) print("ADS-B Processing Pipeline") print("=" * 60) if end_str: print(f"Date range: {start_str} to {end_str}") + print(f"Date chunks: {len(date_chunks)} ({args.chunk_days} days each)") else: print(f"Date: {start_str}") - print(f"Chunks: {args.chunks}") + print(f"ICAO chunks: {args.chunks}") print("=" * 60) - # Step 1: Download and extract - print("\n" + "=" * 60) - print("Step 1: Download and Extract") - print("=" * 60) - - if end_str: - cmd = ["python", "-m", "src.adsb.download_and_list_icaos", - "--start-date", start_str, "--end-date", end_str] - else: - cmd = ["python", "-m", "src.adsb.download_and_list_icaos", - "--date", start_str] - run_cmd(cmd, "Download and extract") - - # Step 2: Process chunks - print("\n" + "=" * 60) - print("Step 2: Process Chunks") - print("=" * 60) - - for chunk_id in range(args.chunks): - print(f"\n--- Chunk {chunk_id + 1}/{args.chunks} ---") - if end_str: - cmd = ["python", "-m", "src.adsb.process_icao_chunk", - "--chunk-id", str(chunk_id), - "--total-chunks", str(args.chunks), - "--start-date", start_str, - "--end-date", end_str] + # Process each date chunk + for idx, date_chunk in enumerate(date_chunks, 1): + chunk_start = date_chunk['start'] + chunk_end = date_chunk['end'] + + print(f"\n{'=' * 60}") + print(f"Processing Date Chunk {idx}/{len(date_chunks)}: {chunk_start} to {chunk_end}") + print('=' * 60) + + # Step 1: Download and extract + print("\n" + "=" * 60) + print("Step 1: Download and Extract") + print("=" * 60) + + if chunk_start == chunk_end: + cmd = ["python", "-m", "src.adsb.download_and_list_icaos", + "--date", chunk_start] else: - cmd = ["python", "-m", "src.adsb.process_icao_chunk", - "--chunk-id", str(chunk_id), - "--total-chunks", str(args.chunks), - "--date", start_str] - run_cmd(cmd, f"Process chunk {chunk_id}") + cmd = ["python", "-m", "src.adsb.download_and_list_icaos", + "--start-date", chunk_start, "--end-date", chunk_end] + run_cmd(cmd, "Download and extract") + + # Step 2: Process chunks + print("\n" + "=" * 60) + print("Step 2: Process Chunks") + print("=" * 60) + + for chunk_id in range(args.chunks): + print(f"\n--- ICAO Chunk {chunk_id + 1}/{args.chunks} ---") + if chunk_start == chunk_end: + cmd = ["python", "-m", "src.adsb.process_icao_chunk", + "--chunk-id", str(chunk_id), + "--total-chunks", str(args.chunks), + "--date", chunk_start] + else: + cmd = ["python", "-m", "src.adsb.process_icao_chunk", + "--chunk-id", str(chunk_id), + "--total-chunks", str(args.chunks), + "--start-date", chunk_start, + "--end-date", chunk_end] + run_cmd(cmd, f"Process ICAO chunk {chunk_id}") - # Step 3: Combine chunks to CSV + # Step 3: Combine all chunks to CSV print("\n" + "=" * 60) - print("Step 3: Combine to CSV") + print("Step 3: Combine All Chunks to CSV") print("=" * 60) chunks_dir = "./data/output/adsb_chunks" @@ -140,9 +171,9 @@ def main(): # Show output output_dir = "./data/openairframes" if end_str: - output_file = f"openairframes_adsb_{start_str}_{end_str}.csv" + output_file = f"openairframes_adsb_{start_str}_{end_str}.csv.gz" else: - output_file = f"openairframes_adsb_{start_str}_{start_str}.csv" + output_file = f"openairframes_adsb_{start_str}_{start_str}.csv.gz" output_path = os.path.join(output_dir, output_file) if os.path.exists(output_path):