diff --git a/.github/workflows/historical-adsb.yaml b/.github/workflows/historical-adsb.yaml index db85e99..0bb99a1 100644 --- a/.github/workflows/historical-adsb.yaml +++ b/.github/workflows/historical-adsb.yaml @@ -8,7 +8,7 @@ on: required: true type: string end_date: - description: 'End date (YYYY-MM-DD, inclusive)' + description: 'End date (YYYY-MM-DD, exclusive)' required: true type: string chunk_days: @@ -41,9 +41,9 @@ jobs: INPUT_CHUNK_DAYS: ${{ inputs.chunk_days }} run: python src/adsb/historical_generate_matrix.py - process-chunk: + adsb-extract: needs: generate-matrix - runs-on: ubuntu-latest + runs-on: ubuntu-24.04-arm strategy: matrix: chunk: ${{ fromJson(needs.generate-matrix.outputs.chunks) }} @@ -61,7 +61,7 @@ jobs: - name: Install dependencies run: | python -m pip install --upgrade pip - pip install polars pyarrow orjson zstandard + pip install -r requirements.txt - name: Free disk space run: | @@ -70,24 +70,37 @@ jobs: sudo rm -rf /usr/local/share/boost df -h - - name: Process date range + - name: Download and extract ADS-B data env: - CHUNK_START_DATE: ${{ matrix.chunk.start_date }} - CHUNK_END_DATE: ${{ matrix.chunk.end_date }} - working-directory: src/adsb - run: python historical_process_chunk.py + START_DATE: ${{ matrix.chunk.start_date }} + END_DATE: ${{ matrix.chunk.end_date }} + run: | + python -m src.adsb.download_and_list_icaos --start-date "$START_DATE" --end-date "$END_DATE" + ls -lah data/output/ - - name: Upload chunk artifact + - name: Create tar of extracted data + run: | + cd data/output + tar -cf extracted_data.tar *-planes-readsb-prod-0.tar_0 icao_manifest_*.txt 2>/dev/null || echo "Some files may not exist" + ls -lah extracted_data.tar || echo "No tar created" + + - name: Upload extracted data uses: actions/upload-artifact@v4 with: - name: chunk-${{ matrix.chunk.start_date }}-${{ matrix.chunk.end_date }} - path: data/chunks/*.csv + name: adsb-extracted-${{ matrix.chunk.start_date }}-${{ matrix.chunk.end_date }} + path: data/output/extracted_data.tar retention-days: 1 - if-no-files-found: ignore + compression-level: 0 + if-no-files-found: warn - combine-chunks: - needs: [generate-matrix, process-chunk] - runs-on: ubuntu-latest + adsb-map: + needs: [generate-matrix, adsb-extract] + runs-on: ubuntu-24.04-arm + strategy: + fail-fast: false + matrix: + chunk: ${{ fromJson(needs.generate-matrix.outputs.chunks) }} + icao_chunk: [0, 1, 2, 3] steps: - name: Checkout uses: actions/checkout@v4 @@ -100,25 +113,93 @@ jobs: - name: Install dependencies run: | python -m pip install --upgrade pip - pip install polars + pip install -r requirements.txt + + - name: Free disk space + run: | + sudo rm -rf /usr/share/dotnet + sudo rm -rf /opt/ghc + sudo rm -rf /usr/local/share/boost + df -h + + - name: Download extracted data + uses: actions/download-artifact@v4 + with: + name: adsb-extracted-${{ matrix.chunk.start_date }}-${{ matrix.chunk.end_date }} + path: data/output/ + continue-on-error: true + + - name: Extract tar + id: extract + run: | + cd data/output + if [ -f extracted_data.tar ]; then + tar -xf extracted_data.tar + rm extracted_data.tar + echo "has_data=true" >> "$GITHUB_OUTPUT" + echo "=== Contents of data/output ===" + ls -lah + else + echo "No extracted_data.tar found" + echo "has_data=false" >> "$GITHUB_OUTPUT" + fi + + - name: Process ICAO chunk + if: steps.extract.outputs.has_data == 'true' + env: + START_DATE: ${{ matrix.chunk.start_date }} + END_DATE: ${{ matrix.chunk.end_date }} + run: | + python -m src.adsb.process_icao_chunk --chunk-id ${{ 
matrix.icao_chunk }} --total-chunks 4 --start-date "$START_DATE" --end-date "$END_DATE" + ls -lah data/output/adsb_chunks/ || echo "No chunks created" + + - name: Upload chunk artifacts + if: steps.extract.outputs.has_data == 'true' + uses: actions/upload-artifact@v4 + with: + name: adsb-map-${{ matrix.chunk.start_date }}-${{ matrix.chunk.end_date }}-chunk-${{ matrix.icao_chunk }} + path: data/output/adsb_chunks/ + retention-days: 1 + if-no-files-found: ignore + + adsb-reduce: + needs: [generate-matrix, adsb-map] + runs-on: ubuntu-24.04-arm + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Setup Python + uses: actions/setup-python@v5 + with: + python-version: '3.12' + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements.txt - name: Download all chunk artifacts uses: actions/download-artifact@v4 with: - path: chunks - pattern: chunk-* + pattern: adsb-map-* + path: data/output/adsb_chunks/ merge-multiple: true - - name: List downloaded chunks + - name: Debug downloaded files run: | - echo "Downloaded chunks:" - find chunks -name "*.csv" -type f 2>/dev/null || echo "No CSV files found" + echo "=== Listing data/output/adsb_chunks/ ===" + find data/output/adsb_chunks/ -type f 2>/dev/null | head -50 || echo "No files found" + echo "=== Looking for parquet files ===" + find . -name "*.parquet" 2>/dev/null | head -20 || echo "No parquet files found" - - name: Combine chunks + - name: Combine chunks to CSV env: - GLOBAL_START_DATE: ${{ needs.generate-matrix.outputs.global_start }} - GLOBAL_END_DATE: ${{ needs.generate-matrix.outputs.global_end }} - run: python src/adsb/historical_combine_chunks.py + START_DATE: ${{ needs.generate-matrix.outputs.global_start }} + END_DATE: ${{ needs.generate-matrix.outputs.global_end }} + run: | + python -m src.adsb.combine_chunks_to_csv --chunks-dir data/output/adsb_chunks --start-date "$START_DATE" --end-date "$END_DATE" --skip-base + ls -lah data/planequery_aircraft/ - name: Upload final artifact uses: actions/upload-artifact@v4 diff --git a/.github/workflows/planequery-aircraft-daily-release.yaml b/.github/workflows/planequery-aircraft-daily-release.yaml index 37c96d1..00838cb 100644 --- a/.github/workflows/planequery-aircraft-daily-release.yaml +++ b/.github/workflows/planequery-aircraft-daily-release.yaml @@ -8,6 +8,7 @@ on: permissions: contents: write + actions: write jobs: trigger-releases: diff --git a/src/adsb/combine_chunks_to_csv.py b/src/adsb/combine_chunks_to_csv.py index d8941d3..2fe8b4e 100644 --- a/src/adsb/combine_chunks_to_csv.py +++ b/src/adsb/combine_chunks_to_csv.py @@ -2,10 +2,16 @@ Combines chunk parquet files and compresses to final aircraft CSV. This is the reduce phase of the map-reduce pipeline. +Supports both single-day (daily) and multi-day (historical) modes. + Memory-efficient: processes each chunk separately, compresses, then combines. 
Usage: + # Daily mode python -m src.adsb.combine_chunks_to_csv --chunks-dir data/output/adsb_chunks + + # Historical mode + python -m src.adsb.combine_chunks_to_csv --chunks-dir data/output/adsb_chunks --start-date 2024-01-01 --end-date 2024-01-07 --skip-base """ import gc import os @@ -117,9 +123,9 @@ def download_and_merge_base_release(compressed_df: pl.DataFrame) -> pl.DataFrame return compressed_df -def cleanup_chunks(date_str: str, chunks_dir: str): +def cleanup_chunks(output_id: str, chunks_dir: str): """Delete chunk parquet files after successful merge.""" - pattern = os.path.join(chunks_dir, f"chunk_*_{date_str}.parquet") + pattern = os.path.join(chunks_dir, f"chunk_*_{output_id}.parquet") chunk_files = glob.glob(pattern) for f in chunk_files: try: @@ -129,32 +135,56 @@ def cleanup_chunks(date_str: str, chunks_dir: str): print(f"Failed to delete {f}: {e}") +def find_chunk_files(chunks_dir: str, output_id: str) -> list[str]: + """Find chunk parquet files matching the output ID.""" + pattern = os.path.join(chunks_dir, f"chunk_*_{output_id}.parquet") + chunk_files = sorted(glob.glob(pattern)) + + if not chunk_files: + # Try recursive search for historical mode with merged artifacts + pattern = os.path.join(chunks_dir, "**", "*.parquet") + chunk_files = sorted(glob.glob(pattern, recursive=True)) + + return chunk_files + + def main(): parser = argparse.ArgumentParser(description="Combine chunk parquets to final CSV") - parser.add_argument("--date", type=str, help="Date in YYYY-MM-DD format (default: yesterday)") + parser.add_argument("--date", type=str, help="Single date in YYYY-MM-DD format (default: yesterday)") + parser.add_argument("--start-date", type=str, help="Start date for range (YYYY-MM-DD)") + parser.add_argument("--end-date", type=str, help="End date for range (YYYY-MM-DD)") parser.add_argument("--chunks-dir", type=str, default=DEFAULT_CHUNK_DIR, help="Directory containing chunk parquet files") parser.add_argument("--skip-base", action="store_true", help="Skip downloading and merging base release") parser.add_argument("--keep-chunks", action="store_true", help="Keep chunk files after merging") args = parser.parse_args() - if args.date: - target_day = datetime.strptime(args.date, "%Y-%m-%d") + # Determine output ID and filename based on mode + if args.start_date and args.end_date: + # Historical mode + output_id = f"{args.start_date}_{args.end_date}" + output_filename = f"planequery_aircraft_adsb_{args.start_date}_{args.end_date}.csv" + print(f"Combining chunks for date range: {args.start_date} to {args.end_date}") else: - target_day = get_target_day() + # Daily mode + if args.date: + target_day = datetime.strptime(args.date, "%Y-%m-%d") + else: + target_day = get_target_day() + + date_str = target_day.strftime("%Y-%m-%d") + output_id = date_str + output_filename = f"planequery_aircraft_adsb_{date_str}.csv" + print(f"Combining chunks for {date_str}") - date_str = target_day.strftime("%Y-%m-%d") chunks_dir = args.chunks_dir - - print(f"Combining chunks for {date_str}") print(f"Chunks directory: {chunks_dir}") print(f"Resource usage at start: {get_resource_usage()}") # Find chunk files - pattern = os.path.join(chunks_dir, f"chunk_*_{date_str}.parquet") - chunk_files = sorted(glob.glob(pattern)) + chunk_files = find_chunk_files(chunks_dir, output_id) if not chunk_files: - print(f"No chunk files found matching: {pattern}") + print(f"No chunk files found in: {chunks_dir}") sys.exit(1) print(f"Found {len(chunk_files)} chunk files") @@ -174,7 +204,7 @@ def main(): gc.collect() 
print(f"After combining: {get_resource_usage()}") - # Merge with base release + # Merge with base release (unless skipped) if not args.skip_base: combined = download_and_merge_base_release(combined) @@ -190,13 +220,13 @@ def main(): combined = combined.sort('time') # Write final CSV - output_path = os.path.join(FINAL_OUTPUT_DIR, f"planequery_aircraft_adsb_{date_str}.csv") + output_path = os.path.join(FINAL_OUTPUT_DIR, output_filename) combined.write_csv(output_path) print(f"Wrote {len(combined)} records to {output_path}") # Cleanup if not args.keep_chunks: - cleanup_chunks(date_str, chunks_dir) + cleanup_chunks(output_id, chunks_dir) print(f"Done! | {get_resource_usage()}") diff --git a/src/adsb/download_and_list_icaos.py b/src/adsb/download_and_list_icaos.py index b058b5d..8893f24 100644 --- a/src/adsb/download_and_list_icaos.py +++ b/src/adsb/download_and_list_icaos.py @@ -2,6 +2,8 @@ Downloads and extracts adsb.lol tar files, then lists all ICAO folders. This is the first step of the map-reduce pipeline. +Supports both single-day (daily) and multi-day (historical) modes. + Outputs: - Extracted trace files in data/output/{version_date}-planes-readsb-prod-0.tar_0/ - ICAO manifest at data/output/icao_manifest_{date}.txt @@ -25,7 +27,6 @@ from src.adsb.download_adsb_data_to_parquet import ( def get_target_day() -> datetime: """Get yesterday's date (the day we're processing).""" - # return datetime.utcnow() - timedelta(days=1) return datetime.utcnow() - timedelta(days=1) @@ -99,49 +100,111 @@ def list_icao_folders(extract_dir: str) -> list[str]: return icaos -def write_manifest(icaos: list[str], date_str: str) -> str: - """Write ICAO list to manifest file.""" - manifest_path = os.path.join(OUTPUT_DIR, f"icao_manifest_{date_str}.txt") +def write_manifest(icaos: list[str], manifest_id: str) -> str: + """Write ICAO list to manifest file. + + Args: + icaos: List of ICAO codes + manifest_id: Identifier for manifest file (date or date range) + """ + manifest_path = os.path.join(OUTPUT_DIR, f"icao_manifest_{manifest_id}.txt") with open(manifest_path, "w") as f: - for icao in icaos: + for icao in sorted(icaos): f.write(f"{icao}\n") print(f"Wrote manifest with {len(icaos)} ICAOs to {manifest_path}") return manifest_path -def main(): - parser = argparse.ArgumentParser(description="Download and list ICAOs from adsb.lol data") - parser.add_argument("--date", type=str, help="Date in YYYY-MM-DD format (default: yesterday)") - args = parser.parse_args() - - if args.date: - target_day = datetime.strptime(args.date, "%Y-%m-%d") - else: - target_day = get_target_day() +def process_single_day(target_day: datetime) -> tuple[str | None, list[str]]: + """Process a single day: download, extract, list ICAOs. 
+ Returns: + Tuple of (extract_dir, icaos) + """ date_str = target_day.strftime("%Y-%m-%d") version_date = f"v{target_day.strftime('%Y.%m.%d')}" print(f"Processing date: {date_str} (version: {version_date})") - # Download and extract extract_dir = download_and_extract(version_date) if not extract_dir: - print("Failed to download/extract data") - sys.exit(1) + print(f"Failed to download/extract data for {date_str}") + return None, [] - # List ICAOs icaos = list_icao_folders(extract_dir) - if not icaos: - print("No ICAOs found") - sys.exit(1) + print(f"Found {len(icaos)} ICAOs for {date_str}") - # Write manifest - manifest_path = write_manifest(icaos, date_str) + return extract_dir, icaos + + +def process_date_range(start_date: datetime, end_date: datetime) -> set[str]: + """Process multiple days: download, extract, combine ICAO lists. - print(f"\nDone! Extract dir: {extract_dir}") - print(f"Manifest: {manifest_path}") - print(f"Total ICAOs: {len(icaos)}") + Args: + start_date: Start date (inclusive) + end_date: End date (inclusive) + + Returns: + Combined set of all ICAOs across the date range + """ + all_icaos: set[str] = set() + current = start_date + + # Both start and end are inclusive + while current <= end_date: + _, icaos = process_single_day(current) + all_icaos.update(icaos) + current += timedelta(days=1) + + return all_icaos + + +def main(): + parser = argparse.ArgumentParser(description="Download and list ICAOs from adsb.lol data") + parser.add_argument("--date", type=str, help="Single date in YYYY-MM-DD format (default: yesterday)") + parser.add_argument("--start-date", type=str, help="Start date for range (YYYY-MM-DD)") + parser.add_argument("--end-date", type=str, help="End date for range (YYYY-MM-DD)") + args = parser.parse_args() + + # Determine mode: single day or date range + if args.start_date and args.end_date: + # Historical mode: process date range + start_date = datetime.strptime(args.start_date, "%Y-%m-%d") + end_date = datetime.strptime(args.end_date, "%Y-%m-%d") + + print(f"Processing date range: {args.start_date} to {args.end_date}") + + all_icaos = process_date_range(start_date, end_date) + + if not all_icaos: + print("No ICAOs found in date range") + sys.exit(1) + + # Write combined manifest with range identifier + manifest_id = f"{args.start_date}_{args.end_date}" + write_manifest(list(all_icaos), manifest_id) + + print(f"\nDone! Total ICAOs: {len(all_icaos)}") + + else: + # Daily mode: single day + if args.date: + target_day = datetime.strptime(args.date, "%Y-%m-%d") + else: + target_day = get_target_day() + + date_str = target_day.strftime("%Y-%m-%d") + + extract_dir, icaos = process_single_day(target_day) + + if not icaos: + print("No ICAOs found") + sys.exit(1) + + write_manifest(icaos, date_str) + + print(f"\nDone! Extract dir: {extract_dir}") + print(f"Total ICAOs: {len(icaos)}") if __name__ == "__main__": diff --git a/src/adsb/historical_combine_chunks.py b/src/adsb/historical_combine_chunks.py deleted file mode 100644 index 06ce5c6..0000000 --- a/src/adsb/historical_combine_chunks.py +++ /dev/null @@ -1,85 +0,0 @@ -#!/usr/bin/env python3 -"""Combine processed chunks into final historical ADS-B release.""" - -import os -import sys -from pathlib import Path - -import polars as pl - - -def combine_chunks(chunks_dir: Path, output_dir: Path, start_date: str, end_date: str) -> Path: - """Combine all chunk CSVs into final output. 
- - Args: - chunks_dir: Directory containing chunk CSV files - output_dir: Directory to write final output - start_date: Global start date for filename - end_date: Global end date for filename - - Returns: - Path to final output CSV - """ - # Import here to allow script to be run from repo root - sys.path.insert(0, str(Path(__file__).parent)) - from compress_adsb_to_aircraft_data import deduplicate_by_signature - - csv_files = sorted(chunks_dir.glob("**/*.csv")) - print(f"Found {len(csv_files)} chunk files") - - if not csv_files: - print("ERROR: No chunk files found", file=sys.stderr) - sys.exit(1) - - dfs: list[pl.DataFrame] = [] - for csv_file in csv_files: - print(f"Loading {csv_file}") - df = pl.read_csv(csv_file, null_values=[""]) - dfs.append(df) - print(f" {df.height} rows") - - df_combined = pl.concat(dfs) - print(f"Combined: {df_combined.height} rows") - - df_combined = deduplicate_by_signature(df_combined) - print(f"After final dedup: {df_combined.height} rows") - - # Sort by time - if "time" in df_combined.columns: - df_combined = df_combined.sort("time") - - # Convert list columns to strings for CSV compatibility - for col in df_combined.columns: - if df_combined[col].dtype == pl.List: - df_combined = df_combined.with_columns( - pl.col(col).list.join(",").alias(col) - ) - - # Write output - output_dir.mkdir(parents=True, exist_ok=True) - output_path = output_dir / f"planequery_aircraft_adsb_{start_date}_{end_date}.csv" - - df_combined.write_csv(output_path) - print(f"Wrote final output: {output_path}") - print(f"Total records: {df_combined.height}") - - return output_path - - -def main() -> None: - """Main entry point for GitHub Actions.""" - start_date = os.environ.get("GLOBAL_START_DATE") - end_date = os.environ.get("GLOBAL_END_DATE") - - if not start_date or not end_date: - print("ERROR: GLOBAL_START_DATE and GLOBAL_END_DATE must be set", file=sys.stderr) - sys.exit(1) - - chunks_dir = Path("chunks") - output_dir = Path("data/planequery_aircraft") - - combine_chunks(chunks_dir, output_dir, start_date, end_date) - - -if __name__ == "__main__": - main() diff --git a/src/adsb/historical_generate_matrix.py b/src/adsb/historical_generate_matrix.py index 3a687e5..cd11789 100644 --- a/src/adsb/historical_generate_matrix.py +++ b/src/adsb/historical_generate_matrix.py @@ -11,12 +11,12 @@ def generate_chunks(start_date: str, end_date: str, chunk_days: int) -> list[dic """Generate date chunks for parallel processing. 
Args: - start_date: Start date in YYYY-MM-DD format - end_date: End date in YYYY-MM-DD format + start_date: Start date in YYYY-MM-DD format (inclusive) + end_date: End date in YYYY-MM-DD format (exclusive) chunk_days: Number of days per chunk Returns: - List of chunk dictionaries with start_date and end_date + List of chunk dictionaries with start_date and end_date (both inclusive within chunk) """ start = datetime.strptime(start_date, "%Y-%m-%d") end = datetime.strptime(end_date, "%Y-%m-%d") @@ -24,8 +24,10 @@ def generate_chunks(start_date: str, end_date: str, chunk_days: int) -> list[dic chunks = [] current = start - while current <= end: - chunk_end = min(current + timedelta(days=chunk_days - 1), end) + # end_date is exclusive, so we process up to but not including it + while current < end: + # chunk_end is inclusive, so subtract 1 from the next chunk start + chunk_end = min(current + timedelta(days=chunk_days - 1), end - timedelta(days=1)) chunks.append({ "start_date": current.strftime("%Y-%m-%d"), "end_date": chunk_end.strftime("%Y-%m-%d"), diff --git a/src/adsb/historical_process_chunk.py b/src/adsb/historical_process_chunk.py deleted file mode 100644 index f5dbe1e..0000000 --- a/src/adsb/historical_process_chunk.py +++ /dev/null @@ -1,91 +0,0 @@ -#!/usr/bin/env python3 -"""Process a single date chunk for historical ADS-B data.""" - -import os -import sys -from datetime import datetime, timedelta -from pathlib import Path - -# Add parent directory to path for imports when run from repo root -sys.path.insert(0, str(Path(__file__).parent)) - - -def process_chunk(start_date: str, end_date: str, output_dir: Path) -> Path | None: - """Process a date range and output compressed CSV. - - Args: - start_date: Start date in YYYY-MM-DD format - end_date: End date in YYYY-MM-DD format - output_dir: Directory to write output CSV - - Returns: - Path to output CSV, or None if no data - """ - from compress_adsb_to_aircraft_data import ( - load_historical_for_day, - deduplicate_by_signature, - ) - import polars as pl - - start = datetime.strptime(start_date, "%Y-%m-%d") - end = datetime.strptime(end_date, "%Y-%m-%d") - - total_days = (end - start).days + 1 - print(f"Processing {total_days} days [{start_date}, {end_date}]") - - dfs: list[pl.DataFrame] = [] - current_date = start - - while current_date <= end: - day_str = current_date.strftime("%Y-%m-%d") - print(f" Loading {day_str}...") - - try: - df_compressed = load_historical_for_day(current_date) - if df_compressed.height > 0: - dfs.append(df_compressed) - total_rows = sum(df.height for df in dfs) - print(f" +{df_compressed.height} rows (total: {total_rows})") - except Exception as e: - print(f" Warning: Failed to load {day_str}: {e}") - - current_date += timedelta(days=1) - - if not dfs: - print("No data found for this chunk") - return None - - df_accumulated = pl.concat(dfs) - df_accumulated = deduplicate_by_signature(df_accumulated) - print(f"After dedup: {df_accumulated.height} rows") - - # Write output - output_dir.mkdir(parents=True, exist_ok=True) - output_path = output_dir / f"chunk_{start_date}_{end_date}.csv" - df_accumulated.write_csv(output_path) - print(f"Wrote {output_path}") - - return output_path - - -def main() -> None: - """Main entry point for GitHub Actions.""" - start_date = os.environ.get("CHUNK_START_DATE") - end_date = os.environ.get("CHUNK_END_DATE") - - if not start_date or not end_date: - print("ERROR: CHUNK_START_DATE and CHUNK_END_DATE must be set", file=sys.stderr) - sys.exit(1) - - # Output to repo root data/chunks 
(script runs from src/adsb) - repo_root = Path(__file__).parent.parent.parent - output_dir = repo_root / "data" / "chunks" - result = process_chunk(start_date, end_date, output_dir) - - if result is None: - print("No data produced for this chunk") - sys.exit(0) - - -if __name__ == "__main__": - main() diff --git a/src/adsb/process_icao_chunk.py b/src/adsb/process_icao_chunk.py index 9092088..e67673f 100644 --- a/src/adsb/process_icao_chunk.py +++ b/src/adsb/process_icao_chunk.py @@ -2,11 +2,17 @@ Processes a chunk of ICAOs from pre-extracted trace files. This is the map phase of the map-reduce pipeline. +Supports both single-day (daily) and multi-day (historical) modes. + Expects extract_dir to already exist with trace files. Reads ICAO manifest to determine which ICAOs to process based on chunk-id. Usage: + # Daily mode (single day) python -m src.adsb.process_icao_chunk --chunk-id 0 --total-chunks 4 + + # Historical mode (date range) + python -m src.adsb.process_icao_chunk --chunk-id 0 --total-chunks 4 --start-date 2024-01-01 --end-date 2024-01-07 """ import gc import os @@ -43,9 +49,13 @@ def get_target_day() -> datetime: return datetime.utcnow() - timedelta(days=1) -def read_manifest(date_str: str) -> list[str]: - """Read ICAO manifest file.""" - manifest_path = os.path.join(OUTPUT_DIR, f"icao_manifest_{date_str}.txt") +def read_manifest(manifest_id: str) -> list[str]: + """Read ICAO manifest file. + + Args: + manifest_id: Either a date string (YYYY-MM-DD) or range string (YYYY-MM-DD_YYYY-MM-DD) + """ + manifest_path = os.path.join(OUTPUT_DIR, f"icao_manifest_{manifest_id}.txt") if not os.path.exists(manifest_path): raise FileNotFoundError(f"Manifest not found: {manifest_path}") @@ -119,9 +129,17 @@ def process_chunk( total_chunks: int, trace_map: dict[str, str], icaos: list[str], - date_str: str, + output_id: str, ) -> str | None: - """Process a chunk of ICAOs and write to parquet.""" + """Process a chunk of ICAOs and write to parquet. 
+ + Args: + chunk_id: This chunk's ID (0-indexed) + total_chunks: Total number of chunks + trace_map: Map of ICAO -> trace file path + icaos: Full list of ICAOs from manifest + output_id: Identifier for output file (date or date range) + """ chunk_icaos = get_chunk_icaos(icaos, chunk_id, total_chunks) print(f"Chunk {chunk_id}/{total_chunks}: Processing {len(chunk_icaos)} ICAOs") @@ -142,7 +160,7 @@ def process_chunk( return None # Process files and write parquet in batches - output_path = os.path.join(CHUNK_OUTPUT_DIR, f"chunk_{chunk_id}_{date_str}.parquet") + output_path = os.path.join(CHUNK_OUTPUT_DIR, f"chunk_{chunk_id}_{output_id}.parquet") start_time = time.perf_counter() total_rows = 0 @@ -200,22 +218,95 @@ def process_chunk( return None +def process_single_day( + chunk_id: int, + total_chunks: int, + target_day: datetime, +) -> str | None: + """Process a single day for this chunk.""" + date_str = target_day.strftime("%Y-%m-%d") + version_date = f"v{target_day.strftime('%Y.%m.%d')}" + + extract_dir = os.path.join(OUTPUT_DIR, f"{version_date}-planes-readsb-prod-0.tar_0") + + if not os.path.isdir(extract_dir): + print(f"Extract directory not found: {extract_dir}") + return None + + trace_map = build_trace_file_map(extract_dir) + if not trace_map: + print("No trace files found") + return None + + icaos = read_manifest(date_str) + print(f"Total ICAOs in manifest: {len(icaos)}") + + return process_chunk(chunk_id, total_chunks, trace_map, icaos, date_str) + + +def process_date_range( + chunk_id: int, + total_chunks: int, + start_date: datetime, + end_date: datetime, +) -> str | None: + """Process a date range for this chunk. + + Combines trace files from all days in the range. + + Args: + chunk_id: This chunk's ID (0-indexed) + total_chunks: Total number of chunks + start_date: Start date (inclusive) + end_date: End date (inclusive) + """ + start_str = start_date.strftime("%Y-%m-%d") + end_str = end_date.strftime("%Y-%m-%d") + manifest_id = f"{start_str}_{end_str}" + + print(f"Processing date range: {start_str} to {end_str}") + + # Build combined trace map from all days + combined_trace_map: dict[str, str] = {} + current = start_date + + # Both start and end are inclusive + while current <= end_date: + version_date = f"v{current.strftime('%Y.%m.%d')}" + extract_dir = os.path.join(OUTPUT_DIR, f"{version_date}-planes-readsb-prod-0.tar_0") + + if os.path.isdir(extract_dir): + trace_map = build_trace_file_map(extract_dir) + # Later days override earlier days (use most recent trace file) + combined_trace_map.update(trace_map) + print(f" {current.strftime('%Y-%m-%d')}: {len(trace_map)} trace files") + else: + print(f" {current.strftime('%Y-%m-%d')}: no extract directory") + + current += timedelta(days=1) + + if not combined_trace_map: + print("No trace files found in date range") + return None + + print(f"Combined trace map: {len(combined_trace_map)} ICAOs") + + icaos = read_manifest(manifest_id) + print(f"Total ICAOs in manifest: {len(icaos)}") + + return process_chunk(chunk_id, total_chunks, combined_trace_map, icaos, manifest_id) + + def main(): parser = argparse.ArgumentParser(description="Process a chunk of ICAOs") parser.add_argument("--chunk-id", type=int, required=True, help="Chunk ID (0-indexed)") parser.add_argument("--total-chunks", type=int, required=True, help="Total number of chunks") - parser.add_argument("--date", type=str, help="Date in YYYY-MM-DD format (default: yesterday)") + parser.add_argument("--date", type=str, help="Single date in YYYY-MM-DD format (default: yesterday)") 
+ parser.add_argument("--start-date", type=str, help="Start date for range (YYYY-MM-DD)") + parser.add_argument("--end-date", type=str, help="End date for range (YYYY-MM-DD)") args = parser.parse_args() - if args.date: - target_day = datetime.strptime(args.date, "%Y-%m-%d") - else: - target_day = get_target_day() - - date_str = target_day.strftime("%Y-%m-%d") - version_date = f"v{target_day.strftime('%Y.%m.%d')}" - - print(f"Processing chunk {args.chunk_id}/{args.total_chunks} for {date_str}") + print(f"Processing chunk {args.chunk_id}/{args.total_chunks}") print(f"OUTPUT_DIR: {OUTPUT_DIR}") print(f"CHUNK_OUTPUT_DIR: {CHUNK_OUTPUT_DIR}") print(f"Resource usage at start: {get_resource_usage()}") @@ -228,37 +319,19 @@ def main(): else: print(f" Directory does not exist!") - # Find extract directory - extract_dir = os.path.join(OUTPUT_DIR, f"{version_date}-planes-readsb-prod-0.tar_0") - print(f"\nLooking for extract_dir: {extract_dir}") - if not os.path.isdir(extract_dir): - print(f"Extract directory not found: {extract_dir}") - # Try to find any extracted directory - import glob - pattern = os.path.join(OUTPUT_DIR, "*-planes-readsb-prod-0*") - matches = glob.glob(pattern) - print(f"Searching for pattern: {pattern}") - print(f"Found matches: {matches}") - sys.exit(1) - - # Build trace file map using find - trace_map = build_trace_file_map(extract_dir) - if not trace_map: - print("No trace files found in extract directory") - sys.exit(1) - - # Read manifest - icaos = read_manifest(date_str) - print(f"Total ICAOs in manifest: {len(icaos)}") - - # Process chunk - output_path = process_chunk( - args.chunk_id, - args.total_chunks, - trace_map, - icaos, - date_str, - ) + # Determine mode: single day or date range + if args.start_date and args.end_date: + # Historical mode + start_date = datetime.strptime(args.start_date, "%Y-%m-%d") + end_date = datetime.strptime(args.end_date, "%Y-%m-%d") + output_path = process_date_range(args.chunk_id, args.total_chunks, start_date, end_date) + else: + # Daily mode + if args.date: + target_day = datetime.strptime(args.date, "%Y-%m-%d") + else: + target_day = get_target_day() + output_path = process_single_day(args.chunk_id, args.total_chunks, target_day) if output_path: print(f"Output: {output_path}")