diff --git a/.github/ISSUE_TEMPLATE/community_submission.yaml b/.github/ISSUE_TEMPLATE/community_submission.yaml index 104aa8e..938843d 100644 --- a/.github/ISSUE_TEMPLATE/community_submission.yaml +++ b/.github/ISSUE_TEMPLATE/community_submission.yaml @@ -43,7 +43,7 @@ body: id: contributor_name attributes: label: Contributor Name - description: Your display name for attribution. Leave blank to use your GitHub username. Max 150 characters. + description: Your display name for attribution. Leave blank for no attribution. Max 150 characters. placeholder: "e.g., JamesBerry.com or leave blank" validations: required: false @@ -58,28 +58,6 @@ body: validations: required: true - - type: dropdown - id: submission_type - attributes: - label: What did you submit? - options: - - Single object - - Multiple objects (array) - validations: - required: true - - - type: checkboxes - id: confirmations - attributes: - label: Confirmations - options: - - label: "I confirm this is valid JSON (not JSONL) and matches the field names exactly." - required: true - - label: "I confirm `transponder_code_hex` values (if provided) are 6 hex characters." - required: true - - label: "I understand submissions are reviewed and may be rejected or require changes." - required: true - - type: textarea id: notes attributes: diff --git a/.github/workflows/historical-adsb.yaml b/.github/workflows/historical-adsb.yaml new file mode 100644 index 0000000..0bb99a1 --- /dev/null +++ b/.github/workflows/historical-adsb.yaml @@ -0,0 +1,209 @@ +name: Historical ADS-B Processing + +on: + workflow_dispatch: + inputs: + start_date: + description: 'Start date (YYYY-MM-DD, inclusive)' + required: true + type: string + end_date: + description: 'End date (YYYY-MM-DD, exclusive)' + required: true + type: string + chunk_days: + description: 'Days per job chunk (default: 7)' + required: false + type: number + default: 7 + +jobs: + generate-matrix: + runs-on: ubuntu-latest + outputs: + chunks: ${{ steps.generate.outputs.chunks }} + global_start: ${{ inputs.start_date }} + global_end: ${{ inputs.end_date }} + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Setup Python + uses: actions/setup-python@v5 + with: + python-version: '3.12' + + - name: Generate date chunks + id: generate + env: + INPUT_START_DATE: ${{ inputs.start_date }} + INPUT_END_DATE: ${{ inputs.end_date }} + INPUT_CHUNK_DAYS: ${{ inputs.chunk_days }} + run: python src/adsb/historical_generate_matrix.py + + adsb-extract: + needs: generate-matrix + runs-on: ubuntu-24.04-arm + strategy: + matrix: + chunk: ${{ fromJson(needs.generate-matrix.outputs.chunks) }} + max-parallel: 3 + fail-fast: false + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Setup Python + uses: actions/setup-python@v5 + with: + python-version: '3.12' + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements.txt + + - name: Free disk space + run: | + sudo rm -rf /usr/share/dotnet + sudo rm -rf /opt/ghc + sudo rm -rf /usr/local/share/boost + df -h + + - name: Download and extract ADS-B data + env: + START_DATE: ${{ matrix.chunk.start_date }} + END_DATE: ${{ matrix.chunk.end_date }} + run: | + python -m src.adsb.download_and_list_icaos --start-date "$START_DATE" --end-date "$END_DATE" + ls -lah data/output/ + + - name: Create tar of extracted data + run: | + cd data/output + tar -cf extracted_data.tar *-planes-readsb-prod-0.tar_0 icao_manifest_*.txt 2>/dev/null || echo "Some files may not exist" + ls -lah extracted_data.tar || echo "No tar created" + + - name: Upload extracted data + uses: actions/upload-artifact@v4 + with: + name: adsb-extracted-${{ matrix.chunk.start_date }}-${{ matrix.chunk.end_date }} + path: data/output/extracted_data.tar + retention-days: 1 + compression-level: 0 + if-no-files-found: warn + + adsb-map: + needs: [generate-matrix, adsb-extract] + runs-on: ubuntu-24.04-arm + strategy: + fail-fast: false + matrix: + chunk: ${{ fromJson(needs.generate-matrix.outputs.chunks) }} + icao_chunk: [0, 1, 2, 3] + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Setup Python + uses: actions/setup-python@v5 + with: + python-version: '3.12' + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements.txt + + - name: Free disk space + run: | + sudo rm -rf /usr/share/dotnet + sudo rm -rf /opt/ghc + sudo rm -rf /usr/local/share/boost + df -h + + - name: Download extracted data + uses: actions/download-artifact@v4 + with: + name: adsb-extracted-${{ matrix.chunk.start_date }}-${{ matrix.chunk.end_date }} + path: data/output/ + continue-on-error: true + + - name: Extract tar + id: extract + run: | + cd data/output + if [ -f extracted_data.tar ]; then + tar -xf extracted_data.tar + rm extracted_data.tar + echo "has_data=true" >> "$GITHUB_OUTPUT" + echo "=== Contents of data/output ===" + ls -lah + else + echo "No extracted_data.tar found" + echo "has_data=false" >> "$GITHUB_OUTPUT" + fi + + - name: Process ICAO chunk + if: steps.extract.outputs.has_data == 'true' + env: + START_DATE: ${{ matrix.chunk.start_date }} + END_DATE: ${{ matrix.chunk.end_date }} + run: | + python -m src.adsb.process_icao_chunk --chunk-id ${{ matrix.icao_chunk }} --total-chunks 4 --start-date "$START_DATE" --end-date "$END_DATE" + ls -lah data/output/adsb_chunks/ || echo "No chunks created" + + - name: Upload chunk artifacts + if: steps.extract.outputs.has_data == 'true' + uses: actions/upload-artifact@v4 + with: + name: adsb-map-${{ matrix.chunk.start_date }}-${{ matrix.chunk.end_date }}-chunk-${{ matrix.icao_chunk }} + path: data/output/adsb_chunks/ + retention-days: 1 + if-no-files-found: ignore + + adsb-reduce: + needs: [generate-matrix, adsb-map] + runs-on: ubuntu-24.04-arm + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Setup Python + uses: actions/setup-python@v5 + with: + python-version: '3.12' + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements.txt + + - name: Download all chunk artifacts + uses: actions/download-artifact@v4 + with: + pattern: adsb-map-* + path: data/output/adsb_chunks/ + merge-multiple: true + + - name: Debug downloaded files + run: | + echo "=== Listing data/output/adsb_chunks/ ===" + find data/output/adsb_chunks/ -type f 2>/dev/null | head -50 || echo "No files found" + echo "=== Looking for parquet files ===" + find . -name "*.parquet" 2>/dev/null | head -20 || echo "No parquet files found" + + - name: Combine chunks to CSV + env: + START_DATE: ${{ needs.generate-matrix.outputs.global_start }} + END_DATE: ${{ needs.generate-matrix.outputs.global_end }} + run: | + python -m src.adsb.combine_chunks_to_csv --chunks-dir data/output/adsb_chunks --start-date "$START_DATE" --end-date "$END_DATE" --skip-base + ls -lah data/planequery_aircraft/ + + - name: Upload final artifact + uses: actions/upload-artifact@v4 + with: + name: planequery_aircraft_adsb-${{ needs.generate-matrix.outputs.global_start }}-${{ needs.generate-matrix.outputs.global_end }} + path: data/planequery_aircraft/*.csv + retention-days: 30 diff --git a/.github/workflows/planequery-aircraft-daily-release.yaml b/.github/workflows/planequery-aircraft-daily-release.yaml index 37c96d1..00838cb 100644 --- a/.github/workflows/planequery-aircraft-daily-release.yaml +++ b/.github/workflows/planequery-aircraft-daily-release.yaml @@ -8,6 +8,7 @@ on: permissions: contents: write + actions: write jobs: trigger-releases: diff --git a/.github/workflows/process-historical-faa.yaml b/.github/workflows/process-historical-faa.yaml deleted file mode 100644 index d015499..0000000 --- a/.github/workflows/process-historical-faa.yaml +++ /dev/null @@ -1,171 +0,0 @@ -name: Process Historical FAA Data - -on: - workflow_dispatch: # Manual trigger - -jobs: - generate-matrix: - runs-on: ubuntu-latest - outputs: - matrix: ${{ steps.set-matrix.outputs.matrix }} - steps: - - name: Generate date ranges - id: set-matrix - run: | - python3 << 'EOF' - import json - from datetime import datetime, timedelta - - start = datetime(2023, 8, 16) - end = datetime(2026, 1, 1) - - ranges = [] - current = start - - # Process in 4-day chunks - while current < end: - chunk_end = current + timedelta(days=4) - # Don't go past the end date - if chunk_end > end: - chunk_end = end - - ranges.append({ - "since": current.strftime("%Y-%m-%d"), - "until": chunk_end.strftime("%Y-%m-%d") - }) - - current = chunk_end - - print(f"::set-output name=matrix::{json.dumps(ranges)}") - EOF - - clone-faa-repo: - runs-on: ubuntu-latest - steps: - - name: Cache FAA repository - id: cache-faa-repo - uses: actions/cache@v4 - with: - path: data/scrape-faa-releasable-aircraft - key: faa-repo-v1 - - - name: Clone FAA repository - if: steps.cache-faa-repo.outputs.cache-hit != 'true' - run: | - mkdir -p data - git clone https://github.com/simonw/scrape-faa-releasable-aircraft data/scrape-faa-releasable-aircraft - echo "Repository cloned successfully" - - process-chunk: - needs: [generate-matrix, clone-faa-repo] - runs-on: ubuntu-latest - strategy: - max-parallel: 5 # Process 5 chunks at a time - matrix: - range: ${{ fromJson(needs.generate-matrix.outputs.matrix) }} - steps: - - name: Checkout repository - uses: actions/checkout@v4 - - - name: Set up Python - uses: actions/setup-python@v5 - with: - python-version: '3.12' - - - name: Restore FAA repository cache - uses: actions/cache/restore@v4 - with: - path: data/scrape-faa-releasable-aircraft - key: faa-repo-v1 - fail-on-cache-miss: true - - - name: Install dependencies - run: | - pip install -r requirements.txt - - - name: Process chunk ${{ matrix.range.since }} to ${{ matrix.range.until }} - run: | - python src/get_historical_faa.py "${{ matrix.range.since }}" "${{ matrix.range.until }}" - - - name: Upload CSV artifact - uses: actions/upload-artifact@v4 - with: - name: csv-${{ matrix.range.since }}-to-${{ matrix.range.until }} - path: data/faa_releasable_historical/*.csv - retention-days: 1 - - create-release: - needs: process-chunk - runs-on: ubuntu-latest - permissions: - contents: write - steps: - - name: Download all artifacts - uses: actions/download-artifact@v4 - with: - path: artifacts - - - name: Prepare release files - run: | - mkdir -p release-files - find artifacts -name "*.csv" -exec cp {} release-files/ \; - ls -lh release-files/ - - - name: Create Release - uses: softprops/action-gh-release@v1 - with: - tag_name: historical-faa-${{ github.run_number }} - name: Historical FAA Data Release ${{ github.run_number }} - body: | - Automated release of historical FAA aircraft data - Processing period: 2023-08-16 to 2026-01-01 - Generated: ${{ github.event.repository.updated_at }} - files: release-files/*.csv - draft: false - prerelease: false - - concatenate-and-release: - needs: process-chunk - runs-on: ubuntu-latest - permissions: - contents: write - steps: - - name: Checkout repository - uses: actions/checkout@v4 - - - name: Set up Python - uses: actions/setup-python@v5 - with: - python-version: '3.12' - - - name: Install dependencies - run: | - pip install -r requirements.txt - - - name: Download all artifacts - uses: actions/download-artifact@v4 - with: - path: artifacts - - - name: Prepare CSVs for concatenation - run: | - mkdir -p data/faa_releasable_historical - find artifacts -name "*.csv" -exec cp {} data/faa_releasable_historical/ \; - ls -lh data/faa_releasable_historical/ - - - name: Concatenate all CSVs - run: | - python scripts/concat_csvs.py - - - name: Create Combined Release - uses: softprops/action-gh-release@v1 - with: - tag_name: historical-faa-combined-${{ github.run_number }} - name: Historical FAA Data Combined Release ${{ github.run_number }} - body: | - Combined historical FAA aircraft data (all chunks concatenated) - Processing period: 2023-08-16 to 2026-01-01 - Generated: ${{ github.event.repository.updated_at }} - files: data/planequery_aircraft/*.csv - draft: false - prerelease: false \ No newline at end of file diff --git a/src/adsb/combine_chunks_to_csv.py b/src/adsb/combine_chunks_to_csv.py index d8941d3..2fe8b4e 100644 --- a/src/adsb/combine_chunks_to_csv.py +++ b/src/adsb/combine_chunks_to_csv.py @@ -2,10 +2,16 @@ Combines chunk parquet files and compresses to final aircraft CSV. This is the reduce phase of the map-reduce pipeline. +Supports both single-day (daily) and multi-day (historical) modes. + Memory-efficient: processes each chunk separately, compresses, then combines. Usage: + # Daily mode python -m src.adsb.combine_chunks_to_csv --chunks-dir data/output/adsb_chunks + + # Historical mode + python -m src.adsb.combine_chunks_to_csv --chunks-dir data/output/adsb_chunks --start-date 2024-01-01 --end-date 2024-01-07 --skip-base """ import gc import os @@ -117,9 +123,9 @@ def download_and_merge_base_release(compressed_df: pl.DataFrame) -> pl.DataFrame return compressed_df -def cleanup_chunks(date_str: str, chunks_dir: str): +def cleanup_chunks(output_id: str, chunks_dir: str): """Delete chunk parquet files after successful merge.""" - pattern = os.path.join(chunks_dir, f"chunk_*_{date_str}.parquet") + pattern = os.path.join(chunks_dir, f"chunk_*_{output_id}.parquet") chunk_files = glob.glob(pattern) for f in chunk_files: try: @@ -129,32 +135,56 @@ def cleanup_chunks(date_str: str, chunks_dir: str): print(f"Failed to delete {f}: {e}") +def find_chunk_files(chunks_dir: str, output_id: str) -> list[str]: + """Find chunk parquet files matching the output ID.""" + pattern = os.path.join(chunks_dir, f"chunk_*_{output_id}.parquet") + chunk_files = sorted(glob.glob(pattern)) + + if not chunk_files: + # Try recursive search for historical mode with merged artifacts + pattern = os.path.join(chunks_dir, "**", "*.parquet") + chunk_files = sorted(glob.glob(pattern, recursive=True)) + + return chunk_files + + def main(): parser = argparse.ArgumentParser(description="Combine chunk parquets to final CSV") - parser.add_argument("--date", type=str, help="Date in YYYY-MM-DD format (default: yesterday)") + parser.add_argument("--date", type=str, help="Single date in YYYY-MM-DD format (default: yesterday)") + parser.add_argument("--start-date", type=str, help="Start date for range (YYYY-MM-DD)") + parser.add_argument("--end-date", type=str, help="End date for range (YYYY-MM-DD)") parser.add_argument("--chunks-dir", type=str, default=DEFAULT_CHUNK_DIR, help="Directory containing chunk parquet files") parser.add_argument("--skip-base", action="store_true", help="Skip downloading and merging base release") parser.add_argument("--keep-chunks", action="store_true", help="Keep chunk files after merging") args = parser.parse_args() - if args.date: - target_day = datetime.strptime(args.date, "%Y-%m-%d") + # Determine output ID and filename based on mode + if args.start_date and args.end_date: + # Historical mode + output_id = f"{args.start_date}_{args.end_date}" + output_filename = f"planequery_aircraft_adsb_{args.start_date}_{args.end_date}.csv" + print(f"Combining chunks for date range: {args.start_date} to {args.end_date}") else: - target_day = get_target_day() + # Daily mode + if args.date: + target_day = datetime.strptime(args.date, "%Y-%m-%d") + else: + target_day = get_target_day() + + date_str = target_day.strftime("%Y-%m-%d") + output_id = date_str + output_filename = f"planequery_aircraft_adsb_{date_str}.csv" + print(f"Combining chunks for {date_str}") - date_str = target_day.strftime("%Y-%m-%d") chunks_dir = args.chunks_dir - - print(f"Combining chunks for {date_str}") print(f"Chunks directory: {chunks_dir}") print(f"Resource usage at start: {get_resource_usage()}") # Find chunk files - pattern = os.path.join(chunks_dir, f"chunk_*_{date_str}.parquet") - chunk_files = sorted(glob.glob(pattern)) + chunk_files = find_chunk_files(chunks_dir, output_id) if not chunk_files: - print(f"No chunk files found matching: {pattern}") + print(f"No chunk files found in: {chunks_dir}") sys.exit(1) print(f"Found {len(chunk_files)} chunk files") @@ -174,7 +204,7 @@ def main(): gc.collect() print(f"After combining: {get_resource_usage()}") - # Merge with base release + # Merge with base release (unless skipped) if not args.skip_base: combined = download_and_merge_base_release(combined) @@ -190,13 +220,13 @@ def main(): combined = combined.sort('time') # Write final CSV - output_path = os.path.join(FINAL_OUTPUT_DIR, f"planequery_aircraft_adsb_{date_str}.csv") + output_path = os.path.join(FINAL_OUTPUT_DIR, output_filename) combined.write_csv(output_path) print(f"Wrote {len(combined)} records to {output_path}") # Cleanup if not args.keep_chunks: - cleanup_chunks(date_str, chunks_dir) + cleanup_chunks(output_id, chunks_dir) print(f"Done! | {get_resource_usage()}") diff --git a/src/adsb/download_and_list_icaos.py b/src/adsb/download_and_list_icaos.py index b058b5d..8893f24 100644 --- a/src/adsb/download_and_list_icaos.py +++ b/src/adsb/download_and_list_icaos.py @@ -2,6 +2,8 @@ Downloads and extracts adsb.lol tar files, then lists all ICAO folders. This is the first step of the map-reduce pipeline. +Supports both single-day (daily) and multi-day (historical) modes. + Outputs: - Extracted trace files in data/output/{version_date}-planes-readsb-prod-0.tar_0/ - ICAO manifest at data/output/icao_manifest_{date}.txt @@ -25,7 +27,6 @@ from src.adsb.download_adsb_data_to_parquet import ( def get_target_day() -> datetime: """Get yesterday's date (the day we're processing).""" - # return datetime.utcnow() - timedelta(days=1) return datetime.utcnow() - timedelta(days=1) @@ -99,49 +100,111 @@ def list_icao_folders(extract_dir: str) -> list[str]: return icaos -def write_manifest(icaos: list[str], date_str: str) -> str: - """Write ICAO list to manifest file.""" - manifest_path = os.path.join(OUTPUT_DIR, f"icao_manifest_{date_str}.txt") +def write_manifest(icaos: list[str], manifest_id: str) -> str: + """Write ICAO list to manifest file. + + Args: + icaos: List of ICAO codes + manifest_id: Identifier for manifest file (date or date range) + """ + manifest_path = os.path.join(OUTPUT_DIR, f"icao_manifest_{manifest_id}.txt") with open(manifest_path, "w") as f: - for icao in icaos: + for icao in sorted(icaos): f.write(f"{icao}\n") print(f"Wrote manifest with {len(icaos)} ICAOs to {manifest_path}") return manifest_path -def main(): - parser = argparse.ArgumentParser(description="Download and list ICAOs from adsb.lol data") - parser.add_argument("--date", type=str, help="Date in YYYY-MM-DD format (default: yesterday)") - args = parser.parse_args() - - if args.date: - target_day = datetime.strptime(args.date, "%Y-%m-%d") - else: - target_day = get_target_day() +def process_single_day(target_day: datetime) -> tuple[str | None, list[str]]: + """Process a single day: download, extract, list ICAOs. + Returns: + Tuple of (extract_dir, icaos) + """ date_str = target_day.strftime("%Y-%m-%d") version_date = f"v{target_day.strftime('%Y.%m.%d')}" print(f"Processing date: {date_str} (version: {version_date})") - # Download and extract extract_dir = download_and_extract(version_date) if not extract_dir: - print("Failed to download/extract data") - sys.exit(1) + print(f"Failed to download/extract data for {date_str}") + return None, [] - # List ICAOs icaos = list_icao_folders(extract_dir) - if not icaos: - print("No ICAOs found") - sys.exit(1) + print(f"Found {len(icaos)} ICAOs for {date_str}") - # Write manifest - manifest_path = write_manifest(icaos, date_str) + return extract_dir, icaos + + +def process_date_range(start_date: datetime, end_date: datetime) -> set[str]: + """Process multiple days: download, extract, combine ICAO lists. - print(f"\nDone! Extract dir: {extract_dir}") - print(f"Manifest: {manifest_path}") - print(f"Total ICAOs: {len(icaos)}") + Args: + start_date: Start date (inclusive) + end_date: End date (inclusive) + + Returns: + Combined set of all ICAOs across the date range + """ + all_icaos: set[str] = set() + current = start_date + + # Both start and end are inclusive + while current <= end_date: + _, icaos = process_single_day(current) + all_icaos.update(icaos) + current += timedelta(days=1) + + return all_icaos + + +def main(): + parser = argparse.ArgumentParser(description="Download and list ICAOs from adsb.lol data") + parser.add_argument("--date", type=str, help="Single date in YYYY-MM-DD format (default: yesterday)") + parser.add_argument("--start-date", type=str, help="Start date for range (YYYY-MM-DD)") + parser.add_argument("--end-date", type=str, help="End date for range (YYYY-MM-DD)") + args = parser.parse_args() + + # Determine mode: single day or date range + if args.start_date and args.end_date: + # Historical mode: process date range + start_date = datetime.strptime(args.start_date, "%Y-%m-%d") + end_date = datetime.strptime(args.end_date, "%Y-%m-%d") + + print(f"Processing date range: {args.start_date} to {args.end_date}") + + all_icaos = process_date_range(start_date, end_date) + + if not all_icaos: + print("No ICAOs found in date range") + sys.exit(1) + + # Write combined manifest with range identifier + manifest_id = f"{args.start_date}_{args.end_date}" + write_manifest(list(all_icaos), manifest_id) + + print(f"\nDone! Total ICAOs: {len(all_icaos)}") + + else: + # Daily mode: single day + if args.date: + target_day = datetime.strptime(args.date, "%Y-%m-%d") + else: + target_day = get_target_day() + + date_str = target_day.strftime("%Y-%m-%d") + + extract_dir, icaos = process_single_day(target_day) + + if not icaos: + print("No ICAOs found") + sys.exit(1) + + write_manifest(icaos, date_str) + + print(f"\nDone! Extract dir: {extract_dir}") + print(f"Total ICAOs: {len(icaos)}") if __name__ == "__main__": diff --git a/src/adsb/historical_generate_matrix.py b/src/adsb/historical_generate_matrix.py new file mode 100644 index 0000000..cd11789 --- /dev/null +++ b/src/adsb/historical_generate_matrix.py @@ -0,0 +1,64 @@ +#!/usr/bin/env python3 +"""Generate date chunk matrix for historical ADS-B processing.""" + +import json +import os +import sys +from datetime import datetime, timedelta + + +def generate_chunks(start_date: str, end_date: str, chunk_days: int) -> list[dict]: + """Generate date chunks for parallel processing. + + Args: + start_date: Start date in YYYY-MM-DD format (inclusive) + end_date: End date in YYYY-MM-DD format (exclusive) + chunk_days: Number of days per chunk + + Returns: + List of chunk dictionaries with start_date and end_date (both inclusive within chunk) + """ + start = datetime.strptime(start_date, "%Y-%m-%d") + end = datetime.strptime(end_date, "%Y-%m-%d") + + chunks = [] + current = start + + # end_date is exclusive, so we process up to but not including it + while current < end: + # chunk_end is inclusive, so subtract 1 from the next chunk start + chunk_end = min(current + timedelta(days=chunk_days - 1), end - timedelta(days=1)) + chunks.append({ + "start_date": current.strftime("%Y-%m-%d"), + "end_date": chunk_end.strftime("%Y-%m-%d"), + }) + current = chunk_end + timedelta(days=1) + + return chunks + + +def main() -> None: + """Main entry point for GitHub Actions.""" + start_date = os.environ.get("INPUT_START_DATE") + end_date = os.environ.get("INPUT_END_DATE") + chunk_days = int(os.environ.get("INPUT_CHUNK_DAYS", "7")) + + if not start_date or not end_date: + print("ERROR: INPUT_START_DATE and INPUT_END_DATE must be set", file=sys.stderr) + sys.exit(1) + + chunks = generate_chunks(start_date, end_date, chunk_days) + print(f"Generated {len(chunks)} chunks for {start_date} to {end_date}") + + # Write to GitHub Actions output + github_output = os.environ.get("GITHUB_OUTPUT") + if github_output: + with open(github_output, "a") as f: + f.write(f"chunks={json.dumps(chunks)}\n") + else: + # For local testing, just print + print(json.dumps(chunks, indent=2)) + + +if __name__ == "__main__": + main() diff --git a/src/adsb/process_icao_chunk.py b/src/adsb/process_icao_chunk.py index 9092088..e67673f 100644 --- a/src/adsb/process_icao_chunk.py +++ b/src/adsb/process_icao_chunk.py @@ -2,11 +2,17 @@ Processes a chunk of ICAOs from pre-extracted trace files. This is the map phase of the map-reduce pipeline. +Supports both single-day (daily) and multi-day (historical) modes. + Expects extract_dir to already exist with trace files. Reads ICAO manifest to determine which ICAOs to process based on chunk-id. Usage: + # Daily mode (single day) python -m src.adsb.process_icao_chunk --chunk-id 0 --total-chunks 4 + + # Historical mode (date range) + python -m src.adsb.process_icao_chunk --chunk-id 0 --total-chunks 4 --start-date 2024-01-01 --end-date 2024-01-07 """ import gc import os @@ -43,9 +49,13 @@ def get_target_day() -> datetime: return datetime.utcnow() - timedelta(days=1) -def read_manifest(date_str: str) -> list[str]: - """Read ICAO manifest file.""" - manifest_path = os.path.join(OUTPUT_DIR, f"icao_manifest_{date_str}.txt") +def read_manifest(manifest_id: str) -> list[str]: + """Read ICAO manifest file. + + Args: + manifest_id: Either a date string (YYYY-MM-DD) or range string (YYYY-MM-DD_YYYY-MM-DD) + """ + manifest_path = os.path.join(OUTPUT_DIR, f"icao_manifest_{manifest_id}.txt") if not os.path.exists(manifest_path): raise FileNotFoundError(f"Manifest not found: {manifest_path}") @@ -119,9 +129,17 @@ def process_chunk( total_chunks: int, trace_map: dict[str, str], icaos: list[str], - date_str: str, + output_id: str, ) -> str | None: - """Process a chunk of ICAOs and write to parquet.""" + """Process a chunk of ICAOs and write to parquet. + + Args: + chunk_id: This chunk's ID (0-indexed) + total_chunks: Total number of chunks + trace_map: Map of ICAO -> trace file path + icaos: Full list of ICAOs from manifest + output_id: Identifier for output file (date or date range) + """ chunk_icaos = get_chunk_icaos(icaos, chunk_id, total_chunks) print(f"Chunk {chunk_id}/{total_chunks}: Processing {len(chunk_icaos)} ICAOs") @@ -142,7 +160,7 @@ def process_chunk( return None # Process files and write parquet in batches - output_path = os.path.join(CHUNK_OUTPUT_DIR, f"chunk_{chunk_id}_{date_str}.parquet") + output_path = os.path.join(CHUNK_OUTPUT_DIR, f"chunk_{chunk_id}_{output_id}.parquet") start_time = time.perf_counter() total_rows = 0 @@ -200,22 +218,95 @@ def process_chunk( return None +def process_single_day( + chunk_id: int, + total_chunks: int, + target_day: datetime, +) -> str | None: + """Process a single day for this chunk.""" + date_str = target_day.strftime("%Y-%m-%d") + version_date = f"v{target_day.strftime('%Y.%m.%d')}" + + extract_dir = os.path.join(OUTPUT_DIR, f"{version_date}-planes-readsb-prod-0.tar_0") + + if not os.path.isdir(extract_dir): + print(f"Extract directory not found: {extract_dir}") + return None + + trace_map = build_trace_file_map(extract_dir) + if not trace_map: + print("No trace files found") + return None + + icaos = read_manifest(date_str) + print(f"Total ICAOs in manifest: {len(icaos)}") + + return process_chunk(chunk_id, total_chunks, trace_map, icaos, date_str) + + +def process_date_range( + chunk_id: int, + total_chunks: int, + start_date: datetime, + end_date: datetime, +) -> str | None: + """Process a date range for this chunk. + + Combines trace files from all days in the range. + + Args: + chunk_id: This chunk's ID (0-indexed) + total_chunks: Total number of chunks + start_date: Start date (inclusive) + end_date: End date (inclusive) + """ + start_str = start_date.strftime("%Y-%m-%d") + end_str = end_date.strftime("%Y-%m-%d") + manifest_id = f"{start_str}_{end_str}" + + print(f"Processing date range: {start_str} to {end_str}") + + # Build combined trace map from all days + combined_trace_map: dict[str, str] = {} + current = start_date + + # Both start and end are inclusive + while current <= end_date: + version_date = f"v{current.strftime('%Y.%m.%d')}" + extract_dir = os.path.join(OUTPUT_DIR, f"{version_date}-planes-readsb-prod-0.tar_0") + + if os.path.isdir(extract_dir): + trace_map = build_trace_file_map(extract_dir) + # Later days override earlier days (use most recent trace file) + combined_trace_map.update(trace_map) + print(f" {current.strftime('%Y-%m-%d')}: {len(trace_map)} trace files") + else: + print(f" {current.strftime('%Y-%m-%d')}: no extract directory") + + current += timedelta(days=1) + + if not combined_trace_map: + print("No trace files found in date range") + return None + + print(f"Combined trace map: {len(combined_trace_map)} ICAOs") + + icaos = read_manifest(manifest_id) + print(f"Total ICAOs in manifest: {len(icaos)}") + + return process_chunk(chunk_id, total_chunks, combined_trace_map, icaos, manifest_id) + + def main(): parser = argparse.ArgumentParser(description="Process a chunk of ICAOs") parser.add_argument("--chunk-id", type=int, required=True, help="Chunk ID (0-indexed)") parser.add_argument("--total-chunks", type=int, required=True, help="Total number of chunks") - parser.add_argument("--date", type=str, help="Date in YYYY-MM-DD format (default: yesterday)") + parser.add_argument("--date", type=str, help="Single date in YYYY-MM-DD format (default: yesterday)") + parser.add_argument("--start-date", type=str, help="Start date for range (YYYY-MM-DD)") + parser.add_argument("--end-date", type=str, help="End date for range (YYYY-MM-DD)") args = parser.parse_args() - if args.date: - target_day = datetime.strptime(args.date, "%Y-%m-%d") - else: - target_day = get_target_day() - - date_str = target_day.strftime("%Y-%m-%d") - version_date = f"v{target_day.strftime('%Y.%m.%d')}" - - print(f"Processing chunk {args.chunk_id}/{args.total_chunks} for {date_str}") + print(f"Processing chunk {args.chunk_id}/{args.total_chunks}") print(f"OUTPUT_DIR: {OUTPUT_DIR}") print(f"CHUNK_OUTPUT_DIR: {CHUNK_OUTPUT_DIR}") print(f"Resource usage at start: {get_resource_usage()}") @@ -228,37 +319,19 @@ def main(): else: print(f" Directory does not exist!") - # Find extract directory - extract_dir = os.path.join(OUTPUT_DIR, f"{version_date}-planes-readsb-prod-0.tar_0") - print(f"\nLooking for extract_dir: {extract_dir}") - if not os.path.isdir(extract_dir): - print(f"Extract directory not found: {extract_dir}") - # Try to find any extracted directory - import glob - pattern = os.path.join(OUTPUT_DIR, "*-planes-readsb-prod-0*") - matches = glob.glob(pattern) - print(f"Searching for pattern: {pattern}") - print(f"Found matches: {matches}") - sys.exit(1) - - # Build trace file map using find - trace_map = build_trace_file_map(extract_dir) - if not trace_map: - print("No trace files found in extract directory") - sys.exit(1) - - # Read manifest - icaos = read_manifest(date_str) - print(f"Total ICAOs in manifest: {len(icaos)}") - - # Process chunk - output_path = process_chunk( - args.chunk_id, - args.total_chunks, - trace_map, - icaos, - date_str, - ) + # Determine mode: single day or date range + if args.start_date and args.end_date: + # Historical mode + start_date = datetime.strptime(args.start_date, "%Y-%m-%d") + end_date = datetime.strptime(args.end_date, "%Y-%m-%d") + output_path = process_date_range(args.chunk_id, args.total_chunks, start_date, end_date) + else: + # Daily mode + if args.date: + target_day = datetime.strptime(args.date, "%Y-%m-%d") + else: + target_day = get_target_day() + output_path = process_single_day(args.chunk_id, args.total_chunks, target_day) if output_path: print(f"Output: {output_path}") diff --git a/src/get_historical_faa.py b/src/get_historical_faa.py deleted file mode 100644 index 656345e..0000000 --- a/src/get_historical_faa.py +++ /dev/null @@ -1,116 +0,0 @@ -""" -For each commit-day in Feb 2024 (last commit per day): -- Write ALL FAA text files from that commit into: data/faa_releasable_historical/YYYY-MM-DD/ - ACFTREF.txt, DEALER.txt, DOCINDEX.txt, ENGINE.txt, RESERVED.txt -- Recombine MASTER-*.txt into Master.txt -- Produce Master.csv via convert_faa_master_txt_to_csv - -Assumes the non-master files are present in every commit. -""" -import subprocess, re -from pathlib import Path -import shutil -from collections import OrderedDict -from derive_from_faa_master_txt import convert_faa_master_txt_to_df, concat_faa_historical_df -import zipfile -import pandas as pd -import argparse -from datetime import datetime, timedelta - -# Parse command line arguments -parser = argparse.ArgumentParser(description="Process historical FAA data from git commits") -parser.add_argument("since", help="Start date (YYYY-MM-DD)") -parser.add_argument("until", help="End date (YYYY-MM-DD)") -args = parser.parse_args() - -# Clone repository if it doesn't exist -REPO = Path("data/scrape-faa-releasable-aircraft") -OUT_ROOT = Path("data/faa_releasable_historical") -OUT_ROOT.mkdir(parents=True, exist_ok=True) - -def run_git_text(*args: str) -> str: - return subprocess.check_output(["git", "-C", str(REPO), *args], text=True).strip() - -def run_git_bytes(*args: str) -> bytes: - return subprocess.check_output(["git", "-C", str(REPO), *args]) - -# Parse dates and adjust --since to the day before -since_date = datetime.strptime(args.since, "%Y-%m-%d") -adjusted_since = (since_date - timedelta(days=1)).strftime("%Y-%m-%d") - -# All commits in specified date range (oldest -> newest) -log = run_git_text( - "log", - "--reverse", - "--format=%H %cs", - f"--since={adjusted_since}", - f"--until={args.until}", -) -lines = [ln for ln in log.splitlines() if ln.strip()] -if not lines: - raise SystemExit(f"No commits found between {args.since} and {args.until}.") - -# date -> last SHA that day -date_to_sha = OrderedDict() -for ln in lines: - sha, date = ln.split() - date_to_sha[date] = sha - -OTHER_FILES = ["ACFTREF.txt", "DEALER.txt", "DOCINDEX.txt", "ENGINE.txt", "RESERVED.txt"] -master_re = re.compile(r"^MASTER-(\d+)\.txt$") -df_base = pd.DataFrame() -start_date = None -end_date = None -for date, sha in date_to_sha.items(): - if start_date is None: - start_date = date - end_date = date - day_dir = OUT_ROOT / date - day_dir.mkdir(parents=True, exist_ok=True) - - # Write auxiliary files (assumed present) - for fname in OTHER_FILES: - (day_dir / fname).write_bytes(run_git_bytes("show", f"{sha}:{fname}")) - - # Recombine MASTER parts - names = run_git_text("ls-tree", "--name-only", sha).splitlines() - parts = [] - for n in names: - m = master_re.match(n) - if m: - parts.append((int(m.group(1)), n)) - parts.sort() - if not parts: - raise RuntimeError(f"{date} {sha[:7]}: no MASTER-*.txt parts found") - - master_path = day_dir / "MASTER.txt" - with master_path.open("wb") as w: - for _, fname in parts: - data = run_git_bytes("show", f"{sha}:{fname}") - w.write(data) - if data and not data.endswith(b"\n"): - w.write(b"\n") - - # 3) Zip the day's files - zip_path = day_dir / f"ReleasableAircraft.zip" - with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as z: - for p in day_dir.iterdir(): - z.write(p, arcname=p.name) - - print(f"{date} {sha[:7]} -> {day_dir} (master parts: {len(parts)})") - # 4) Convert ZIP -> CSV - df_new = convert_faa_master_txt_to_df(zip_path, date) - if df_base.empty: - df_base = df_new - print(len(df_base), "total entries so far") - # Delete all files in the day directory - shutil.rmtree(day_dir) - continue - - df_base = concat_faa_historical_df(df_base, df_new) - shutil.rmtree(day_dir) - print(len(df_base), "total entries so far") - -assert df_base['download_date'].is_monotonic_increasing, "download_date is not monotonic increasing" -df_base.to_csv(OUT_ROOT / f"planequery_aircraft_faa_{start_date}_{end_date}.csv", index=False) -# TODO: get average number of new rows per day.