mirror of
https://github.com/PlaneQuery/OpenAirframes.git
synced 2026-04-23 19:46:09 +02:00
use chunks in run_local
This commit is contained in:
+67
-36
@@ -51,6 +51,12 @@ def main():
|
||||
default=4,
|
||||
help="Number of parallel chunks (default: 4)"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--chunk-days",
|
||||
type=int,
|
||||
default=1,
|
||||
help="Days per chunk for date range processing (default: 1)"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--skip-base",
|
||||
action="store_true",
|
||||
@@ -71,52 +77,77 @@ def main():
|
||||
start_str = start_date.strftime("%Y-%m-%d")
|
||||
end_str = end_date.strftime("%Y-%m-%d") if end_date else None
|
||||
|
||||
# Generate date chunks if processing a range
|
||||
date_chunks = []
|
||||
if end_str:
|
||||
current = start_date
|
||||
while current < end_date:
|
||||
chunk_end = min(current + timedelta(days=args.chunk_days), end_date)
|
||||
date_chunks.append({
|
||||
'start': current.strftime("%Y-%m-%d"),
|
||||
'end': chunk_end.strftime("%Y-%m-%d")
|
||||
})
|
||||
current = chunk_end
|
||||
else:
|
||||
# Single day
|
||||
date_chunks = [{'start': start_str, 'end': start_str}]
|
||||
|
||||
print("=" * 60)
|
||||
print("ADS-B Processing Pipeline")
|
||||
print("=" * 60)
|
||||
if end_str:
|
||||
print(f"Date range: {start_str} to {end_str}")
|
||||
print(f"Date chunks: {len(date_chunks)} ({args.chunk_days} days each)")
|
||||
else:
|
||||
print(f"Date: {start_str}")
|
||||
print(f"Chunks: {args.chunks}")
|
||||
print(f"ICAO chunks: {args.chunks}")
|
||||
print("=" * 60)
|
||||
|
||||
# Step 1: Download and extract
|
||||
print("\n" + "=" * 60)
|
||||
print("Step 1: Download and Extract")
|
||||
print("=" * 60)
|
||||
|
||||
if end_str:
|
||||
cmd = ["python", "-m", "src.adsb.download_and_list_icaos",
|
||||
"--start-date", start_str, "--end-date", end_str]
|
||||
else:
|
||||
cmd = ["python", "-m", "src.adsb.download_and_list_icaos",
|
||||
"--date", start_str]
|
||||
run_cmd(cmd, "Download and extract")
|
||||
|
||||
# Step 2: Process chunks
|
||||
print("\n" + "=" * 60)
|
||||
print("Step 2: Process Chunks")
|
||||
print("=" * 60)
|
||||
|
||||
for chunk_id in range(args.chunks):
|
||||
print(f"\n--- Chunk {chunk_id + 1}/{args.chunks} ---")
|
||||
if end_str:
|
||||
cmd = ["python", "-m", "src.adsb.process_icao_chunk",
|
||||
"--chunk-id", str(chunk_id),
|
||||
"--total-chunks", str(args.chunks),
|
||||
"--start-date", start_str,
|
||||
"--end-date", end_str]
|
||||
# Process each date chunk
|
||||
for idx, date_chunk in enumerate(date_chunks, 1):
|
||||
chunk_start = date_chunk['start']
|
||||
chunk_end = date_chunk['end']
|
||||
|
||||
print(f"\n{'=' * 60}")
|
||||
print(f"Processing Date Chunk {idx}/{len(date_chunks)}: {chunk_start} to {chunk_end}")
|
||||
print('=' * 60)
|
||||
|
||||
# Step 1: Download and extract
|
||||
print("\n" + "=" * 60)
|
||||
print("Step 1: Download and Extract")
|
||||
print("=" * 60)
|
||||
|
||||
if chunk_start == chunk_end:
|
||||
cmd = ["python", "-m", "src.adsb.download_and_list_icaos",
|
||||
"--date", chunk_start]
|
||||
else:
|
||||
cmd = ["python", "-m", "src.adsb.process_icao_chunk",
|
||||
"--chunk-id", str(chunk_id),
|
||||
"--total-chunks", str(args.chunks),
|
||||
"--date", start_str]
|
||||
run_cmd(cmd, f"Process chunk {chunk_id}")
|
||||
cmd = ["python", "-m", "src.adsb.download_and_list_icaos",
|
||||
"--start-date", chunk_start, "--end-date", chunk_end]
|
||||
run_cmd(cmd, "Download and extract")
|
||||
|
||||
# Step 2: Process chunks
|
||||
print("\n" + "=" * 60)
|
||||
print("Step 2: Process Chunks")
|
||||
print("=" * 60)
|
||||
|
||||
for chunk_id in range(args.chunks):
|
||||
print(f"\n--- ICAO Chunk {chunk_id + 1}/{args.chunks} ---")
|
||||
if chunk_start == chunk_end:
|
||||
cmd = ["python", "-m", "src.adsb.process_icao_chunk",
|
||||
"--chunk-id", str(chunk_id),
|
||||
"--total-chunks", str(args.chunks),
|
||||
"--date", chunk_start]
|
||||
else:
|
||||
cmd = ["python", "-m", "src.adsb.process_icao_chunk",
|
||||
"--chunk-id", str(chunk_id),
|
||||
"--total-chunks", str(args.chunks),
|
||||
"--start-date", chunk_start,
|
||||
"--end-date", chunk_end]
|
||||
run_cmd(cmd, f"Process ICAO chunk {chunk_id}")
|
||||
|
||||
# Step 3: Combine chunks to CSV
|
||||
# Step 3: Combine all chunks to CSV
|
||||
print("\n" + "=" * 60)
|
||||
print("Step 3: Combine to CSV")
|
||||
print("Step 3: Combine All Chunks to CSV")
|
||||
print("=" * 60)
|
||||
|
||||
chunks_dir = "./data/output/adsb_chunks"
|
||||
@@ -140,9 +171,9 @@ def main():
|
||||
# Show output
|
||||
output_dir = "./data/openairframes"
|
||||
if end_str:
|
||||
output_file = f"openairframes_adsb_{start_str}_{end_str}.csv"
|
||||
output_file = f"openairframes_adsb_{start_str}_{end_str}.csv.gz"
|
||||
else:
|
||||
output_file = f"openairframes_adsb_{start_str}_{start_str}.csv"
|
||||
output_file = f"openairframes_adsb_{start_str}_{start_str}.csv.gz"
|
||||
|
||||
output_path = os.path.join(output_dir, output_file)
|
||||
if os.path.exists(output_path):
|
||||
|
||||
Reference in New Issue
Block a user