mirror of
https://github.com/PlaneQuery/OpenAirframes.git
synced 2026-04-24 12:06:31 +02:00
do only a single day instead of multiple
This commit is contained in:
@@ -1,18 +1,12 @@
|
||||
"""
|
||||
Processes a chunk of ICAOs from pre-extracted trace files.
|
||||
Processes a chunk of ICAOs from pre-extracted trace files for a single day.
|
||||
This is the map phase of the map-reduce pipeline.
|
||||
|
||||
Supports both single-day (daily) and multi-day (historical) modes.
|
||||
|
||||
Expects extract_dir to already exist with trace files.
|
||||
Reads ICAO manifest to determine which ICAOs to process based on chunk-id.
|
||||
|
||||
Usage:
|
||||
# Daily mode (single day)
|
||||
python -m src.adsb.process_icao_chunk --chunk-id 0 --total-chunks 4
|
||||
|
||||
# Historical mode (date range)
|
||||
python -m src.adsb.process_icao_chunk --chunk-id 0 --total-chunks 4 --start-date 2024-01-01 --end-date 2024-01-07
|
||||
python -m src.adsb.process_icao_chunk --chunk-id 0 --total-chunks 4 --date 2026-01-01
|
||||
"""
|
||||
import gc
|
||||
import os
|
||||
@@ -248,69 +242,11 @@ def process_single_day(
|
||||
return process_chunk(chunk_id, total_chunks, trace_map, icaos, date_str)
|
||||
|
||||
|
||||
def process_date_range(
|
||||
chunk_id: int,
|
||||
total_chunks: int,
|
||||
start_date: datetime,
|
||||
end_date: datetime,
|
||||
) -> str | None:
|
||||
"""Process a date range for this chunk.
|
||||
|
||||
Combines trace files from all days in the range.
|
||||
|
||||
Args:
|
||||
chunk_id: This chunk's ID (0-indexed)
|
||||
total_chunks: Total number of chunks
|
||||
start_date: Start date (inclusive)
|
||||
end_date: End date (inclusive)
|
||||
"""
|
||||
start_str = start_date.strftime("%Y-%m-%d")
|
||||
end_str = end_date.strftime("%Y-%m-%d")
|
||||
manifest_id = f"{start_str}_{end_str}"
|
||||
|
||||
print(f"Processing date range: {start_str} to {end_str}")
|
||||
|
||||
# Build combined trace map from all days - keep ALL trace files (one ICAO may have multiple days)
|
||||
combined_trace_map: dict[str, list[str]] = {}
|
||||
current = start_date
|
||||
|
||||
# Both start and end are inclusive
|
||||
while current <= end_date:
|
||||
version_date = f"v{current.strftime('%Y.%m.%d')}"
|
||||
extract_dir = os.path.join(OUTPUT_DIR, f"{version_date}-planes-readsb-prod-0.tar_0")
|
||||
|
||||
if os.path.isdir(extract_dir):
|
||||
trace_map = build_trace_file_map(extract_dir)
|
||||
# Collect ALL trace files for each ICAO across all days
|
||||
for icao, trace_file in trace_map.items():
|
||||
if icao not in combined_trace_map:
|
||||
combined_trace_map[icao] = []
|
||||
combined_trace_map[icao].append(trace_file)
|
||||
print(f" {current.strftime('%Y-%m-%d')}: {len(trace_map)} trace files")
|
||||
else:
|
||||
print(f" {current.strftime('%Y-%m-%d')}: no extract directory")
|
||||
|
||||
current += timedelta(days=1)
|
||||
|
||||
if not combined_trace_map:
|
||||
print("No trace files found in date range")
|
||||
return None
|
||||
|
||||
print(f"Combined trace map: {len(combined_trace_map)} ICAOs with {sum(len(files) for files in combined_trace_map.values())} total trace files")
|
||||
|
||||
icaos = read_manifest(manifest_id)
|
||||
print(f"Total ICAOs in manifest: {len(icaos)}")
|
||||
|
||||
return process_chunk(chunk_id, total_chunks, combined_trace_map, icaos, manifest_id)
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Process a chunk of ICAOs")
|
||||
parser = argparse.ArgumentParser(description="Process a chunk of ICAOs for a single day")
|
||||
parser.add_argument("--chunk-id", type=int, required=True, help="Chunk ID (0-indexed)")
|
||||
parser.add_argument("--total-chunks", type=int, required=True, help="Total number of chunks")
|
||||
parser.add_argument("--date", type=str, help="Single date in YYYY-MM-DD format (default: yesterday)")
|
||||
parser.add_argument("--start-date", type=str, help="Start date for range (YYYY-MM-DD)")
|
||||
parser.add_argument("--end-date", type=str, help="End date for range (YYYY-MM-DD)")
|
||||
args = parser.parse_args()
|
||||
|
||||
print(f"Processing chunk {args.chunk_id}/{args.total_chunks}")
|
||||
@@ -326,19 +262,13 @@ def main():
|
||||
else:
|
||||
print(f" Directory does not exist!")
|
||||
|
||||
# Determine mode: single day or date range
|
||||
if args.start_date and args.end_date:
|
||||
# Historical mode
|
||||
start_date = datetime.strptime(args.start_date, "%Y-%m-%d")
|
||||
end_date = datetime.strptime(args.end_date, "%Y-%m-%d")
|
||||
output_path = process_date_range(args.chunk_id, args.total_chunks, start_date, end_date)
|
||||
# Process single day
|
||||
if args.date:
|
||||
target_day = datetime.strptime(args.date, "%Y-%m-%d")
|
||||
else:
|
||||
# Daily mode
|
||||
if args.date:
|
||||
target_day = datetime.strptime(args.date, "%Y-%m-%d")
|
||||
else:
|
||||
target_day = get_target_day()
|
||||
output_path = process_single_day(args.chunk_id, args.total_chunks, target_day)
|
||||
target_day = get_target_day()
|
||||
|
||||
output_path = process_single_day(args.chunk_id, args.total_chunks, target_day)
|
||||
|
||||
if output_path:
|
||||
print(f"Output: {output_path}")
|
||||
@@ -347,4 +277,4 @@ def main():
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
main()
|
||||
Reference in New Issue
Block a user