mirror of
https://github.com/PlaneQuery/OpenAirframes.git
synced 2026-05-13 03:44:45 +02:00
Daily ADSB and Histoircal updates. Update readme.md
This commit is contained in:
+54
-242
@@ -1,18 +1,9 @@
|
||||
"""
|
||||
Processes a chunk of ICAOs from pre-extracted trace files.
|
||||
Processes trace files from a single archive part for a single day.
|
||||
This is the map phase of the map-reduce pipeline.
|
||||
|
||||
Supports both single-day (daily) and multi-day (historical) modes.
|
||||
|
||||
Expects extract_dir to already exist with trace files.
|
||||
Reads ICAO manifest to determine which ICAOs to process based on chunk-id.
|
||||
|
||||
Usage:
|
||||
# Daily mode (single day)
|
||||
python -m src.adsb.process_icao_chunk --chunk-id 0 --total-chunks 4
|
||||
|
||||
# Historical mode (date range)
|
||||
python -m src.adsb.process_icao_chunk --chunk-id 0 --total-chunks 4 --start-date 2024-01-01 --end-date 2024-01-07
|
||||
python -m src.adsb.process_icao_chunk --part-id 1 --date 2026-01-01
|
||||
"""
|
||||
import gc
|
||||
import os
|
||||
@@ -21,6 +12,9 @@ import argparse
|
||||
import time
|
||||
import concurrent.futures
|
||||
from datetime import datetime, timedelta
|
||||
import tarfile
|
||||
import tempfile
|
||||
import shutil
|
||||
|
||||
import pyarrow as pa
|
||||
import pyarrow.parquet as pq
|
||||
@@ -37,72 +31,21 @@ from src.adsb.download_adsb_data_to_parquet import (
|
||||
)
|
||||
|
||||
|
||||
CHUNK_OUTPUT_DIR = os.path.join(OUTPUT_DIR, "adsb_chunks")
|
||||
os.makedirs(CHUNK_OUTPUT_DIR, exist_ok=True)
|
||||
|
||||
# Smaller batch size for memory efficiency
|
||||
BATCH_SIZE = 100_000
|
||||
|
||||
|
||||
def get_target_day() -> datetime:
|
||||
"""Get yesterday's date (the day we're processing)."""
|
||||
return datetime.utcnow() - timedelta(days=1)
|
||||
|
||||
|
||||
def read_manifest(manifest_id: str) -> list[str]:
|
||||
"""Read ICAO manifest file.
|
||||
def build_trace_file_map(archive_path: str) -> dict[str, str]:
|
||||
"""Build a map of ICAO -> trace file path by extracting tar.gz archive."""
|
||||
print(f"Extracting {archive_path}...")
|
||||
|
||||
Args:
|
||||
manifest_id: Either a date string (YYYY-MM-DD) or range string (YYYY-MM-DD_YYYY-MM-DD)
|
||||
"""
|
||||
manifest_path = os.path.join(OUTPUT_DIR, f"icao_manifest_{manifest_id}.txt")
|
||||
if not os.path.exists(manifest_path):
|
||||
raise FileNotFoundError(f"Manifest not found: {manifest_path}")
|
||||
temp_dir = tempfile.mkdtemp(prefix="adsb_extract_")
|
||||
|
||||
with open(manifest_path, "r") as f:
|
||||
icaos = [line.strip() for line in f if line.strip()]
|
||||
return icaos
|
||||
|
||||
|
||||
def deterministic_hash(s: str) -> int:
|
||||
"""Return a deterministic hash for a string (unlike Python's hash() which is randomized)."""
|
||||
# Use sum of byte values - simple but deterministic
|
||||
return sum(ord(c) for c in s)
|
||||
|
||||
|
||||
def get_chunk_icaos(icaos: list[str], chunk_id: int, total_chunks: int) -> list[str]:
|
||||
"""Get the subset of ICAOs for this chunk based on deterministic hash partitioning."""
|
||||
return [icao for icao in icaos if deterministic_hash(icao) % total_chunks == chunk_id]
|
||||
|
||||
|
||||
def build_trace_file_map(extract_dir: str) -> dict[str, str]:
|
||||
"""Build a map of ICAO -> trace file path using find command."""
|
||||
print(f"Building trace file map from {extract_dir}...")
|
||||
with tarfile.open(archive_path, 'r:gz') as tar:
|
||||
tar.extractall(path=temp_dir, filter='data')
|
||||
|
||||
# Debug: check what's in extract_dir
|
||||
if os.path.isdir(extract_dir):
|
||||
items = os.listdir(extract_dir)[:10]
|
||||
print(f"First 10 items in extract_dir: {items}")
|
||||
# Check if there are subdirectories
|
||||
for item in items[:3]:
|
||||
subpath = os.path.join(extract_dir, item)
|
||||
if os.path.isdir(subpath):
|
||||
subitems = os.listdir(subpath)[:5]
|
||||
print(f" Contents of {item}/: {subitems}")
|
||||
|
||||
trace_map = collect_trace_files_with_find(extract_dir)
|
||||
trace_map = collect_trace_files_with_find(temp_dir)
|
||||
print(f"Found {len(trace_map)} trace files")
|
||||
|
||||
if len(trace_map) == 0:
|
||||
# Debug: try manual find
|
||||
import subprocess
|
||||
result = subprocess.run(
|
||||
['find', extract_dir, '-type', 'f', '-name', 'trace_full_*'],
|
||||
capture_output=True, text=True
|
||||
)
|
||||
print(f"Manual find output (first 500 chars): {result.stdout[:500]}")
|
||||
print(f"Manual find stderr: {result.stderr[:200]}")
|
||||
|
||||
return trace_map
|
||||
|
||||
|
||||
@@ -125,42 +68,13 @@ def rows_to_table(rows: list) -> pa.Table:
|
||||
|
||||
|
||||
def process_chunk(
|
||||
chunk_id: int,
|
||||
total_chunks: int,
|
||||
trace_map: dict[str, str],
|
||||
icaos: list[str],
|
||||
output_id: str,
|
||||
trace_files: list[str],
|
||||
part_id: int,
|
||||
date_str: str,
|
||||
) -> str | None:
|
||||
"""Process a chunk of ICAOs and write to parquet.
|
||||
"""Process trace files and write to a single parquet file."""
|
||||
|
||||
Args:
|
||||
chunk_id: This chunk's ID (0-indexed)
|
||||
total_chunks: Total number of chunks
|
||||
trace_map: Map of ICAO -> trace file path
|
||||
icaos: Full list of ICAOs from manifest
|
||||
output_id: Identifier for output file (date or date range)
|
||||
"""
|
||||
chunk_icaos = get_chunk_icaos(icaos, chunk_id, total_chunks)
|
||||
print(f"Chunk {chunk_id}/{total_chunks}: Processing {len(chunk_icaos)} ICAOs")
|
||||
|
||||
if not chunk_icaos:
|
||||
print(f"Chunk {chunk_id}: No ICAOs to process")
|
||||
return None
|
||||
|
||||
# Get trace file paths from the map
|
||||
trace_files = []
|
||||
for icao in chunk_icaos:
|
||||
if icao in trace_map:
|
||||
trace_files.append(trace_map[icao])
|
||||
|
||||
print(f"Chunk {chunk_id}: Found {len(trace_files)} trace files")
|
||||
|
||||
if not trace_files:
|
||||
print(f"Chunk {chunk_id}: No trace files found")
|
||||
return None
|
||||
|
||||
# Process files and write parquet in batches
|
||||
output_path = os.path.join(CHUNK_OUTPUT_DIR, f"chunk_{chunk_id}_{output_id}.parquet")
|
||||
output_path = os.path.join(PARQUET_DIR, f"part_{part_id}_{date_str}.parquet")
|
||||
|
||||
start_time = time.perf_counter()
|
||||
total_rows = 0
|
||||
@@ -168,7 +82,8 @@ def process_chunk(
|
||||
writer = None
|
||||
|
||||
try:
|
||||
# Process in parallel batches
|
||||
writer = pq.ParquetWriter(output_path, PARQUET_SCHEMA, compression='snappy')
|
||||
|
||||
files_per_batch = MAX_WORKERS * 100
|
||||
for offset in range(0, len(trace_files), files_per_batch):
|
||||
batch_files = trace_files[offset:offset + files_per_batch]
|
||||
@@ -178,166 +93,63 @@ def process_chunk(
|
||||
if rows:
|
||||
batch_rows.extend(rows)
|
||||
|
||||
# Write when batch is full
|
||||
if len(batch_rows) >= BATCH_SIZE:
|
||||
table = rows_to_table(batch_rows)
|
||||
writer.write_table(rows_to_table(batch_rows))
|
||||
total_rows += len(batch_rows)
|
||||
|
||||
if writer is None:
|
||||
writer = pq.ParquetWriter(output_path, PARQUET_SCHEMA, compression='snappy')
|
||||
writer.write_table(table)
|
||||
|
||||
batch_rows = []
|
||||
del table
|
||||
gc.collect()
|
||||
|
||||
elapsed = time.perf_counter() - start_time
|
||||
print(f"Chunk {chunk_id}: {total_rows} rows, {elapsed:.1f}s | {get_resource_usage()}")
|
||||
|
||||
gc.collect()
|
||||
|
||||
# Write remaining rows
|
||||
if batch_rows:
|
||||
table = rows_to_table(batch_rows)
|
||||
writer.write_table(rows_to_table(batch_rows))
|
||||
total_rows += len(batch_rows)
|
||||
|
||||
if writer is None:
|
||||
writer = pq.ParquetWriter(output_path, PARQUET_SCHEMA, compression='snappy')
|
||||
writer.write_table(table)
|
||||
del table
|
||||
|
||||
finally:
|
||||
if writer:
|
||||
writer.close()
|
||||
|
||||
elapsed = time.perf_counter() - start_time
|
||||
print(f"Chunk {chunk_id}: Done! {total_rows} rows in {elapsed:.1f}s | {get_resource_usage()}")
|
||||
print(f"Part {part_id}: Done! {total_rows} rows in {time.perf_counter() - start_time:.1f}s | {get_resource_usage()}")
|
||||
|
||||
if total_rows > 0:
|
||||
return output_path
|
||||
return None
|
||||
|
||||
|
||||
def process_single_day(
|
||||
chunk_id: int,
|
||||
total_chunks: int,
|
||||
target_day: datetime,
|
||||
) -> str | None:
|
||||
"""Process a single day for this chunk."""
|
||||
date_str = target_day.strftime("%Y-%m-%d")
|
||||
version_date = f"v{target_day.strftime('%Y.%m.%d')}"
|
||||
|
||||
extract_dir = os.path.join(OUTPUT_DIR, f"{version_date}-planes-readsb-prod-0.tar_0")
|
||||
|
||||
if not os.path.isdir(extract_dir):
|
||||
print(f"Extract directory not found: {extract_dir}")
|
||||
return None
|
||||
|
||||
trace_map = build_trace_file_map(extract_dir)
|
||||
if not trace_map:
|
||||
print("No trace files found")
|
||||
return None
|
||||
|
||||
icaos = read_manifest(date_str)
|
||||
print(f"Total ICAOs in manifest: {len(icaos)}")
|
||||
|
||||
return process_chunk(chunk_id, total_chunks, trace_map, icaos, date_str)
|
||||
|
||||
|
||||
def process_date_range(
|
||||
chunk_id: int,
|
||||
total_chunks: int,
|
||||
start_date: datetime,
|
||||
end_date: datetime,
|
||||
) -> str | None:
|
||||
"""Process a date range for this chunk.
|
||||
|
||||
Combines trace files from all days in the range.
|
||||
|
||||
Args:
|
||||
chunk_id: This chunk's ID (0-indexed)
|
||||
total_chunks: Total number of chunks
|
||||
start_date: Start date (inclusive)
|
||||
end_date: End date (inclusive)
|
||||
"""
|
||||
start_str = start_date.strftime("%Y-%m-%d")
|
||||
end_str = end_date.strftime("%Y-%m-%d")
|
||||
manifest_id = f"{start_str}_{end_str}"
|
||||
|
||||
print(f"Processing date range: {start_str} to {end_str}")
|
||||
|
||||
# Build combined trace map from all days
|
||||
combined_trace_map: dict[str, str] = {}
|
||||
current = start_date
|
||||
|
||||
# Both start and end are inclusive
|
||||
while current <= end_date:
|
||||
version_date = f"v{current.strftime('%Y.%m.%d')}"
|
||||
extract_dir = os.path.join(OUTPUT_DIR, f"{version_date}-planes-readsb-prod-0.tar_0")
|
||||
|
||||
if os.path.isdir(extract_dir):
|
||||
trace_map = build_trace_file_map(extract_dir)
|
||||
# Later days override earlier days (use most recent trace file)
|
||||
combined_trace_map.update(trace_map)
|
||||
print(f" {current.strftime('%Y-%m-%d')}: {len(trace_map)} trace files")
|
||||
else:
|
||||
print(f" {current.strftime('%Y-%m-%d')}: no extract directory")
|
||||
|
||||
current += timedelta(days=1)
|
||||
|
||||
if not combined_trace_map:
|
||||
print("No trace files found in date range")
|
||||
return None
|
||||
|
||||
print(f"Combined trace map: {len(combined_trace_map)} ICAOs")
|
||||
|
||||
icaos = read_manifest(manifest_id)
|
||||
print(f"Total ICAOs in manifest: {len(icaos)}")
|
||||
|
||||
return process_chunk(chunk_id, total_chunks, combined_trace_map, icaos, manifest_id)
|
||||
return output_path if total_rows > 0 else None
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Process a chunk of ICAOs")
|
||||
parser.add_argument("--chunk-id", type=int, required=True, help="Chunk ID (0-indexed)")
|
||||
parser.add_argument("--total-chunks", type=int, required=True, help="Total number of chunks")
|
||||
parser.add_argument("--date", type=str, help="Single date in YYYY-MM-DD format (default: yesterday)")
|
||||
parser.add_argument("--start-date", type=str, help="Start date for range (YYYY-MM-DD)")
|
||||
parser.add_argument("--end-date", type=str, help="End date for range (YYYY-MM-DD)")
|
||||
parser = argparse.ArgumentParser(description="Process a single archive part for a day")
|
||||
parser.add_argument("--part-id", type=int, required=True, help="Part ID (1-indexed)")
|
||||
parser.add_argument("--date", type=str, required=True, help="Date in YYYY-MM-DD format")
|
||||
args = parser.parse_args()
|
||||
|
||||
print(f"Processing chunk {args.chunk_id}/{args.total_chunks}")
|
||||
print(f"OUTPUT_DIR: {OUTPUT_DIR}")
|
||||
print(f"CHUNK_OUTPUT_DIR: {CHUNK_OUTPUT_DIR}")
|
||||
print(f"Resource usage at start: {get_resource_usage()}")
|
||||
print(f"Processing part {args.part_id} for {args.date}")
|
||||
|
||||
# Debug: List what's in OUTPUT_DIR
|
||||
print(f"\nContents of {OUTPUT_DIR}:")
|
||||
if os.path.isdir(OUTPUT_DIR):
|
||||
for item in os.listdir(OUTPUT_DIR)[:20]:
|
||||
print(f" - {item}")
|
||||
else:
|
||||
print(f" Directory does not exist!")
|
||||
# Get specific archive file for this part
|
||||
archive_path = os.path.join(OUTPUT_DIR, "adsb_archives", args.date, f"{args.date}_part_{args.part_id}.tar.gz")
|
||||
|
||||
# Determine mode: single day or date range
|
||||
if args.start_date and args.end_date:
|
||||
# Historical mode
|
||||
start_date = datetime.strptime(args.start_date, "%Y-%m-%d")
|
||||
end_date = datetime.strptime(args.end_date, "%Y-%m-%d")
|
||||
output_path = process_date_range(args.chunk_id, args.total_chunks, start_date, end_date)
|
||||
else:
|
||||
# Daily mode
|
||||
if args.date:
|
||||
target_day = datetime.strptime(args.date, "%Y-%m-%d")
|
||||
else:
|
||||
target_day = get_target_day()
|
||||
output_path = process_single_day(args.chunk_id, args.total_chunks, target_day)
|
||||
# Extract and collect trace files
|
||||
trace_map = build_trace_file_map(archive_path)
|
||||
all_trace_files = list(trace_map.values())
|
||||
|
||||
if output_path:
|
||||
print(f"Output: {output_path}")
|
||||
else:
|
||||
print("No output generated")
|
||||
print(f"Total trace files: {len(all_trace_files)}")
|
||||
|
||||
# Process and write output
|
||||
output_path = process_chunk(all_trace_files, args.part_id, args.date)
|
||||
|
||||
from src.adsb.compress_adsb_to_aircraft_data import compress_parquet_part
|
||||
df_compressed = compress_parquet_part(args.part_id, args.date)
|
||||
|
||||
# Write parquet
|
||||
df_compressed_output = OUTPUT_DIR / "compressed" / args.date/ f"part_{args.part_id}_{args.date}.parquet"
|
||||
os.makedirs(df_compressed_output.parent, exist_ok=True)
|
||||
df_compressed.write_parquet(df_compressed_output, compression='snappy')
|
||||
|
||||
# Write CSV
|
||||
csv_output = OUTPUT_DIR / "compressed" / args.date / f"part_{args.part_id}_{args.date}.csv"
|
||||
df_compressed.write_csv(csv_output)
|
||||
|
||||
print(f"Raw output: {output_path}" if output_path else "No raw output generated")
|
||||
print(f"Compressed parquet: {df_compressed_output}")
|
||||
print(f"Compressed CSV: {csv_output}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
main()
|
||||
Reference in New Issue
Block a user