From d216ea932944db8936f3dead7548ecd84e5e642c Mon Sep 17 00:00:00 2001 From: ggman12 Date: Fri, 13 Feb 2026 11:49:18 -0500 Subject: [PATCH] Daily ADSB and Historical updates. Update README.md --- .../ISSUE_TEMPLATE/community_submission.yaml | 6 +- .../workflows/adsb-to-aircraft-for-day.yaml | 145 +++++++ .../adsb-to-aircraft-multiple-day-run.yaml | 118 ++++++ .github/workflows/historical-adsb.yaml | 268 ------------ .../openairframes-daily-release.yaml | 289 +++++++------ .github/workflows/update-community-prs.yaml | 59 ++- .gitignore | 5 +- README.md | 5 +- scripts/concat_downloads.py | 49 +++ scripts/download.sh | 34 ++ scripts/download_and_concat_runs.py | 182 +++++++++ scripts/run_historical_adsb_action.py | 215 ++++++++++ scripts/run_main_isolated.py | 82 ++++ src/adsb/Dockerfile.reducer | 11 - src/adsb/Dockerfile.worker | 12 - src/adsb/combine_chunks_to_csv.py | 250 ------------ src/adsb/compress_adsb_to_aircraft_data.py | 153 ++----- src/adsb/concat_parquet_to_final.py | 50 +++ src/adsb/download_adsb_data_to_parquet.py | 382 ++++-------------- src/adsb/download_and_list_icaos.py | 126 ++---- src/adsb/historical_generate_matrix.py | 2 +- src/adsb/main.py | 78 ++++ src/adsb/process_icao_chunk.py | 296 +++----------- src/adsb/reducer.py | 97 ----- src/adsb/requirements.reducer.txt | 2 - src/adsb/requirements.worker.txt | 5 - src/adsb/worker.py | 89 ---- .../create_daily_adsbexchange_release.py | 40 ++ .../create_daily_microtonics_release.py | 55 +++ src/create_daily_adsb_release.py | 84 ---- src/derive_from_faa_master_txt.py | 7 +- src/get_latest_release.py | 37 +- 32 files changed, 1489 insertions(+), 1744 deletions(-) create mode 100644 .github/workflows/adsb-to-aircraft-for-day.yaml create mode 100644 .github/workflows/adsb-to-aircraft-multiple-day-run.yaml delete mode 100644 .github/workflows/historical-adsb.yaml create mode 100644 scripts/concat_downloads.py create mode 100644 scripts/download.sh create mode 100644 scripts/download_and_concat_runs.py 
create mode 100644 scripts/run_historical_adsb_action.py create mode 100644 scripts/run_main_isolated.py delete mode 100644 src/adsb/Dockerfile.reducer delete mode 100644 src/adsb/Dockerfile.worker delete mode 100644 src/adsb/combine_chunks_to_csv.py create mode 100644 src/adsb/concat_parquet_to_final.py create mode 100644 src/adsb/main.py delete mode 100644 src/adsb/reducer.py delete mode 100644 src/adsb/requirements.reducer.txt delete mode 100644 src/adsb/requirements.worker.txt delete mode 100644 src/adsb/worker.py create mode 100644 src/contributions/create_daily_adsbexchange_release.py create mode 100644 src/contributions/create_daily_microtonics_release.py delete mode 100644 src/create_daily_adsb_release.py diff --git a/.github/ISSUE_TEMPLATE/community_submission.yaml b/.github/ISSUE_TEMPLATE/community_submission.yaml index 8ee54c5..471fbd2 100644 --- a/.github/ISSUE_TEMPLATE/community_submission.yaml +++ b/.github/ISSUE_TEMPLATE/community_submission.yaml @@ -8,8 +8,8 @@ body: - type: markdown attributes: value: | - Submit **one object** or an **array of objects** that matches the community submission schema. - + Submit **one object** or an **array of objects** that matches the community submission [schema](https://github.com/PlaneQuery/OpenAirframes/blob/main/schemas/community_submission.v1.schema.json). Reuse existing tags from the schema when possible. 
+ **Rules (enforced on review/automation):** - Each object must include **at least one** of: - `registration_number` @@ -27,7 +27,7 @@ body: ```json { "registration_number": "N12345", - "tags": {"owner": "John Doe"}, + "tags": {"owner": "John Doe", "photo": "https://example.com/photo.jpg"}, "start_date": "2025-01-01" } ``` diff --git a/.github/workflows/adsb-to-aircraft-for-day.yaml b/.github/workflows/adsb-to-aircraft-for-day.yaml new file mode 100644 index 0000000..d10676e --- /dev/null +++ b/.github/workflows/adsb-to-aircraft-for-day.yaml @@ -0,0 +1,145 @@ +name: Historical ADS-B Processing + +on: + workflow_dispatch: + inputs: + date: + description: 'YYYY-MM-DD' + required: true + type: string + concat_with_latest_csv: + description: 'Also concatenate with latest CSV from GitHub releases' + required: false + type: boolean + default: false + workflow_call: + inputs: + date: + description: 'YYYY-MM-DD' + required: true + type: string + concat_with_latest_csv: + description: 'Also concatenate with latest CSV from GitHub releases' + required: false + type: boolean + default: false + +jobs: + adsb-extract: + runs-on: ubuntu-24.04-arm + steps: + - name: Checkout + uses: actions/checkout@v6 + + - name: Setup Python + uses: actions/setup-python@v6 + with: + python-version: '3.12' + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements.txt + + - name: Download and split ADS-B data + env: + DATE: ${{ inputs.date }} + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + run: | + python -m src.adsb.download_and_list_icaos --date "$DATE" + ls -lah data/output/adsb_archives/"$DATE" || true + + - name: Upload archives + uses: actions/upload-artifact@v4 + with: + name: adsb-archives-${{ inputs.date }} + path: data/output/adsb_archives/${{ inputs.date }} + retention-days: 1 + compression-level: 0 + if-no-files-found: error + + adsb-map: + needs: adsb-extract + runs-on: ubuntu-24.04-arm + strategy: + fail-fast: true + matrix: + 
part_id: [0, 1, 2, 3] + steps: + - name: Checkout + uses: actions/checkout@v6 + + - name: Setup Python + uses: actions/setup-python@v6 + with: + python-version: '3.12' + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements.txt + + - name: Download archives + uses: actions/download-artifact@v4 + with: + name: adsb-archives-${{ inputs.date }} + path: data/output/adsb_archives/${{ inputs.date }} + + - name: Process part + env: + DATE: ${{ inputs.date }} + run: | + python -m src.adsb.process_icao_chunk --part-id ${{ matrix.part_id }} --date "$DATE" + + - name: Upload compressed outputs + uses: actions/upload-artifact@v4 + with: + name: adsb-compressed-${{ inputs.date }}-part-${{ matrix.part_id }} + path: data/output/compressed/${{ inputs.date }} + retention-days: 1 + compression-level: 0 + if-no-files-found: error + + adsb-reduce: + needs: adsb-map + runs-on: ubuntu-24.04-arm + steps: + - name: Checkout + uses: actions/checkout@v6 + + - name: Setup Python + uses: actions/setup-python@v6 + with: + python-version: '3.12' + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements.txt + + - name: Download compressed outputs + uses: actions/download-artifact@v4 + with: + pattern: adsb-compressed-${{ inputs.date }}-part-* + path: data/output/compressed/${{ inputs.date }} + merge-multiple: true + + - name: Concatenate final outputs + env: + DATE: ${{ inputs.date }} + CONCAT_WITH_LATEST_CSV: ${{ inputs.concat_with_latest_csv }} + run: | + EXTRA="" + if [ "$CONCAT_WITH_LATEST_CSV" = "true" ]; then + EXTRA="--concat_with_latest_csv" + fi + python -m src.adsb.concat_parquet_to_final --date "$DATE" $EXTRA + ls -lah data/output/ || true + + - name: Upload final artifacts + uses: actions/upload-artifact@v4 + with: + name: openairframes_adsb-${{ inputs.date }} + path: data/output/openairframes_adsb_${{ inputs.date }}* + retention-days: 30 + if-no-files-found: error diff --git 
a/.github/workflows/adsb-to-aircraft-multiple-day-run.yaml b/.github/workflows/adsb-to-aircraft-multiple-day-run.yaml new file mode 100644 index 0000000..68d9cce --- /dev/null +++ b/.github/workflows/adsb-to-aircraft-multiple-day-run.yaml @@ -0,0 +1,118 @@ +name: adsb-to-aircraft-multiple-day-run + +on: + workflow_dispatch: + inputs: + start_date: + description: 'YYYY-MM-DD (inclusive)' + required: true + type: string + end_date: + description: 'YYYY-MM-DD (exclusive)' + required: true + type: string + +jobs: + generate-dates: + runs-on: ubuntu-24.04-arm + outputs: + dates: ${{ steps.generate.outputs.dates }} + steps: + - name: Generate date list + id: generate + env: + START_DATE: ${{ inputs.start_date }} + END_DATE: ${{ inputs.end_date }} + run: | + python - <<'PY' + import json + import os + from datetime import datetime, timedelta + + start = datetime.strptime(os.environ["START_DATE"], "%Y-%m-%d") + end = datetime.strptime(os.environ["END_DATE"], "%Y-%m-%d") + if end <= start: + raise SystemExit("end_date must be after start_date") + + dates = [] + cur = start + while cur < end: + dates.append(cur.strftime("%Y-%m-%d")) + cur += timedelta(days=1) + + with open(os.environ["GITHUB_OUTPUT"], "a") as f: + f.write(f"dates={json.dumps(dates)}\n") + PY + + adsb-day: + needs: generate-dates + strategy: + fail-fast: true + matrix: + date: ${{ fromJson(needs.generate-dates.outputs.dates) }} + uses: ./.github/workflows/adsb-to-aircraft-for-day.yaml + with: + date: ${{ matrix.date }} + + adsb-final: + needs: adsb-day + runs-on: ubuntu-24.04-arm + steps: + - name: Checkout + uses: actions/checkout@v6 + + - name: Setup Python + uses: actions/setup-python@v6 + with: + python-version: '3.12' + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements.txt + + - name: Download daily CSVs + uses: actions/download-artifact@v4 + with: + pattern: openairframes_adsb-* + path: outputs/daily/ + merge-multiple: true + + - name: 
Concatenate all days to final CSV + env: + START_DATE: ${{ inputs.start_date }} + END_DATE: ${{ inputs.end_date }} + run: | + python - <<'PY' + import os + import re + from pathlib import Path + import polars as pl + + start = os.environ["START_DATE"] + end = os.environ["END_DATE"] + daily_dir = Path("outputs/daily") + files = sorted(daily_dir.glob("openairframes_adsb_*.csv.gz")) + if not files: + raise SystemExit("No daily CSVs found") + + def date_key(path: Path) -> str: + m = re.match(r"openairframes_adsb_(\d{4}-\d{2}-\d{2})_", path.name) + return m.group(1) if m else path.name + + files = sorted(files, key=date_key) + frames = [pl.read_csv(p) for p in files] + df = pl.concat(frames, how="vertical", rechunk=True) + + output_path = Path("outputs") / f"openairframes_adsb_{start}_{end}.csv.gz" + df.write_csv(output_path, compression="gzip") + print(f"Wrote {output_path} with {df.height} rows") + PY + + - name: Upload final CSV + uses: actions/upload-artifact@v4 + with: + name: openairframes_adsb-${{ inputs.start_date }}-${{ inputs.end_date }} + path: outputs/openairframes_adsb_${{ inputs.start_date }}_${{ inputs.end_date }}.csv.gz + retention-days: 30 +# gh workflow run adsb-to-aircraft-multiple-day-run.yaml --repo ggman12/OpenAirframes --ref jonah/fix-historical-proper -f start_date=2025-12-31 -f end_date=2026-01-02 \ No newline at end of file diff --git a/.github/workflows/historical-adsb.yaml b/.github/workflows/historical-adsb.yaml deleted file mode 100644 index d3ce334..0000000 --- a/.github/workflows/historical-adsb.yaml +++ /dev/null @@ -1,268 +0,0 @@ -name: Historical ADS-B Processing - -on: - workflow_dispatch: - inputs: - start_date: - description: 'Start date (YYYY-MM-DD, inclusive)' - required: true - type: string - end_date: - description: 'End date (YYYY-MM-DD, exclusive)' - required: true - type: string - chunk_days: - description: 'Days per job chunk (default: 7)' - required: false - type: number - default: 7 - -jobs: - generate-matrix: - runs-on: 
ubuntu-latest - outputs: - chunks: ${{ steps.generate.outputs.chunks }} - global_start: ${{ inputs.start_date }} - global_end: ${{ inputs.end_date }} - steps: - - name: Checkout - uses: actions/checkout@v4 - - - name: Setup Python - uses: actions/setup-python@v5 - with: - python-version: '3.12' - - - name: Generate date chunks - id: generate - env: - INPUT_START_DATE: ${{ inputs.start_date }} - INPUT_END_DATE: ${{ inputs.end_date }} - INPUT_CHUNK_DAYS: ${{ inputs.chunk_days }} - run: python src/adsb/historical_generate_matrix.py - - adsb-extract: - needs: generate-matrix - runs-on: ubuntu-24.04-arm - strategy: - matrix: - chunk: ${{ fromJson(needs.generate-matrix.outputs.chunks) }} - max-parallel: 3 - fail-fast: true - steps: - - name: Checkout - uses: actions/checkout@v4 - - - name: Setup Python - uses: actions/setup-python@v5 - with: - python-version: '3.12' - - - name: Install dependencies - run: | - python -m pip install --upgrade pip - pip install -r requirements.txt - - - name: Free disk space - run: | - sudo rm -rf /usr/share/dotnet - sudo rm -rf /opt/ghc - sudo rm -rf /usr/local/share/boost - df -h - - - name: Download and extract ADS-B data - env: - START_DATE: ${{ matrix.chunk.start_date }} - END_DATE: ${{ matrix.chunk.end_date }} - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - run: | - python -m src.adsb.download_and_list_icaos --start-date "$START_DATE" --end-date "$END_DATE" - ls -lah data/output/ - - - name: Create tar of extracted data and split into chunks - run: | - cd data/output - echo "=== Disk space before tar ===" - df -h . 
- echo "=== Files to tar ===" - ls -lah *-planes-readsb-prod-0.tar_0 icao_manifest_*.txt 2>/dev/null || echo "No files found" - - # Create tar with explicit error checking - if ls *-planes-readsb-prod-0.tar_0 1>/dev/null 2>&1; then - tar -cvf extracted_data.tar *-planes-readsb-prod-0.tar_0 icao_manifest_*.txt - echo "=== Tar file created ===" - ls -lah extracted_data.tar - # Verify tar integrity - tar -tf extracted_data.tar > /dev/null && echo "Tar integrity check passed" || { echo "Tar integrity check FAILED"; exit 1; } - - # Create checksum of the FULL tar before splitting (for verification after reassembly) - echo "=== Creating checksum of full tar ===" - sha256sum extracted_data.tar > full_tar.sha256 - cat full_tar.sha256 - - # Split into 500MB chunks to avoid artifact upload issues - echo "=== Splitting tar into 500MB chunks ===" - mkdir -p tar_chunks - split -b 500M extracted_data.tar tar_chunks/extracted_data.tar.part_ - rm extracted_data.tar - mv full_tar.sha256 tar_chunks/ - - echo "=== Chunks created ===" - ls -lah tar_chunks/ - else - echo "ERROR: No extracted directories found, cannot create tar" - exit 1 - fi - - - name: Upload extracted data chunks - uses: actions/upload-artifact@v4 - with: - name: adsb-extracted-${{ matrix.chunk.start_date }}-${{ matrix.chunk.end_date }} - path: data/output/tar_chunks/ - retention-days: 1 - compression-level: 0 - if-no-files-found: warn - - adsb-map: - needs: [generate-matrix, adsb-extract] - runs-on: ubuntu-24.04-arm - strategy: - fail-fast: true - matrix: - chunk: ${{ fromJson(needs.generate-matrix.outputs.chunks) }} - icao_chunk: [0, 1, 2, 3] - steps: - - name: Checkout - uses: actions/checkout@v4 - - - name: Setup Python - uses: actions/setup-python@v5 - with: - python-version: '3.12' - - - name: Install dependencies - run: | - python -m pip install --upgrade pip - pip install -r requirements.txt - - - name: Free disk space - run: | - sudo rm -rf /usr/share/dotnet - sudo rm -rf /opt/ghc - sudo rm -rf 
/usr/local/share/boost - df -h - - - name: Download extracted data - uses: actions/download-artifact@v4 - with: - name: adsb-extracted-${{ matrix.chunk.start_date }}-${{ matrix.chunk.end_date }} - path: data/output/tar_chunks/ - - - name: Reassemble and extract tar - id: extract - run: | - cd data/output - if [ -d tar_chunks ] && ls tar_chunks/extracted_data.tar.part_* 1>/dev/null 2>&1; then - echo "=== Chunk files info ===" - ls -lah tar_chunks/ - - cd tar_chunks - - # Reassemble tar with explicit sorting - echo "=== Reassembling tar file ===" - ls -1 extracted_data.tar.part_?? | sort | while read part; do - echo "Appending $part..." - cat "$part" >> ../extracted_data.tar - done - cd .. - - echo "=== Reassembled tar file info ===" - ls -lah extracted_data.tar - - # Verify checksum of reassembled tar matches original - echo "=== Verifying reassembled tar checksum ===" - echo "Original checksum:" - cat tar_chunks/full_tar.sha256 - echo "Reassembled checksum:" - sha256sum extracted_data.tar - sha256sum -c tar_chunks/full_tar.sha256 || { echo "ERROR: Reassembled tar checksum mismatch - data corrupted during transfer"; exit 1; } - echo "Checksum verified - data integrity confirmed" - - rm -rf tar_chunks - - echo "=== Extracting ===" - tar -xvf extracted_data.tar - rm extracted_data.tar - echo "has_data=true" >> "$GITHUB_OUTPUT" - echo "=== Contents of data/output ===" - ls -lah - else - echo "No tar chunks found" - echo "has_data=false" >> "$GITHUB_OUTPUT" - fi - - - name: Process ICAO chunk - if: steps.extract.outputs.has_data == 'true' - env: - START_DATE: ${{ matrix.chunk.start_date }} - END_DATE: ${{ matrix.chunk.end_date }} - run: | - python -m src.adsb.process_icao_chunk --chunk-id ${{ matrix.icao_chunk }} --total-chunks 4 --start-date "$START_DATE" --end-date "$END_DATE" - ls -lah data/output/adsb_chunks/ || echo "No chunks created" - - - name: Upload chunk artifacts - if: steps.extract.outputs.has_data == 'true' - uses: actions/upload-artifact@v4 - with: - 
name: adsb-map-${{ matrix.chunk.start_date }}-${{ matrix.chunk.end_date }}-chunk-${{ matrix.icao_chunk }} - path: data/output/adsb_chunks/ - retention-days: 1 - if-no-files-found: ignore - - adsb-reduce: - needs: [generate-matrix, adsb-map] - runs-on: ubuntu-24.04-arm - steps: - - name: Checkout - uses: actions/checkout@v4 - - - name: Setup Python - uses: actions/setup-python@v5 - with: - python-version: '3.12' - - - name: Install dependencies - run: | - python -m pip install --upgrade pip - pip install -r requirements.txt - - - name: Download all chunk artifacts - uses: actions/download-artifact@v4 - with: - pattern: adsb-map-* - path: data/output/adsb_chunks/ - merge-multiple: true - - - name: Debug downloaded files - run: | - echo "=== Disk space before processing ===" - df -h - echo "=== Listing data/output/adsb_chunks/ ===" - find data/output/adsb_chunks/ -type f 2>/dev/null | wc -l - echo "=== Total parquet size ===" - du -sh data/output/adsb_chunks/ || echo "No chunks dir" - - - name: Combine chunks to CSV - env: - START_DATE: ${{ needs.generate-matrix.outputs.global_start }} - END_DATE: ${{ needs.generate-matrix.outputs.global_end }} - run: | - python -m src.adsb.combine_chunks_to_csv --chunks-dir data/output/adsb_chunks --start-date "$START_DATE" --end-date "$END_DATE" --skip-base --stream - ls -lah data/openairframes/ - - - name: Upload final artifact - uses: actions/upload-artifact@v4 - with: - name: openairframes_adsb-${{ needs.generate-matrix.outputs.global_start }}-${{ needs.generate-matrix.outputs.global_end }} - path: data/openairframes/*.csv - retention-days: 30 diff --git a/.github/workflows/openairframes-daily-release.yaml b/.github/workflows/openairframes-daily-release.yaml index 7f684bf..17f23be 100644 --- a/.github/workflows/openairframes-daily-release.yaml +++ b/.github/workflows/openairframes-daily-release.yaml @@ -1,4 +1,4 @@ -name: OpenAirframes Daily Release +name: openairframes-daily-release on: schedule: @@ -76,159 +76,30 @@ jobs: 
data/faa_releasable/ReleasableAircraft_*.zip retention-days: 1 - adsb-extract: - runs-on: ubuntu-24.04-arm + resolve-dates: + runs-on: ubuntu-latest if: github.event_name != 'schedule' outputs: - manifest-exists: ${{ steps.check.outputs.exists }} + date: ${{ steps.out.outputs.date }} + adsb_date: ${{ steps.out.outputs.adsb_date }} steps: - - name: Checkout - uses: actions/checkout@v6 - with: - fetch-depth: 0 - - - name: Setup Python - uses: actions/setup-python@v6 - with: - python-version: "3.14" - - - name: Install dependencies + - id: out run: | - python -m pip install --upgrade pip - pip install -r requirements.txt - - - name: Download and extract ADS-B data - env: - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - run: | - python -m src.adsb.download_and_list_icaos ${{ inputs.date && format('--date {0}', inputs.date) || '' }} - ls -lah data/output/ - - - name: Check manifest exists - id: check - run: | - if ls data/output/icao_manifest_*.txt 1>/dev/null 2>&1; then - echo "exists=true" >> "$GITHUB_OUTPUT" + if [ -n "${{ inputs.date }}" ]; then + echo "date=${{ inputs.date }}" >> "$GITHUB_OUTPUT" + echo "adsb_date=${{ inputs.date }}" >> "$GITHUB_OUTPUT" else - echo "exists=false" >> "$GITHUB_OUTPUT" + echo "date=$(date -u -d 'yesterday' +%Y-%m-%d)" >> "$GITHUB_OUTPUT" + echo "adsb_date=$(date -u -d 'yesterday' +%Y-%m-%d)" >> "$GITHUB_OUTPUT" fi - - name: Create tar of extracted data - run: | - cd data/output - tar -cf extracted_data.tar *-planes-readsb-prod-0.tar_0 icao_manifest_*.txt - ls -lah extracted_data.tar - - - name: Upload extracted data - uses: actions/upload-artifact@v4 - with: - name: adsb-extracted - path: data/output/extracted_data.tar - retention-days: 1 - compression-level: 0 # Already compressed trace files - - adsb-map: - runs-on: ubuntu-24.04-arm - needs: adsb-extract - if: github.event_name != 'schedule' && needs.adsb-extract.outputs.manifest-exists == 'true' - strategy: - fail-fast: false - matrix: - chunk: [0, 1, 2, 3] - steps: - - name: Checkout 
- uses: actions/checkout@v6 - with: - fetch-depth: 0 - - - name: Setup Python - uses: actions/setup-python@v6 - with: - python-version: "3.14" - - - name: Install dependencies - run: | - python -m pip install --upgrade pip - pip install -r requirements.txt - - - name: Download extracted data - uses: actions/download-artifact@v4 - with: - name: adsb-extracted - path: data/output/ - - - name: Extract tar - run: | - cd data/output - tar -xf extracted_data.tar - rm extracted_data.tar - echo "=== Contents of data/output ===" - ls -lah - echo "=== Looking for manifest ===" - cat icao_manifest_*.txt | head -20 || echo "No manifest found" - echo "=== Looking for extracted dirs ===" - ls -d *-planes-readsb-prod-0* 2>/dev/null || echo "No extracted dirs" - - - name: Process chunk ${{ matrix.chunk }} - run: | - python -m src.adsb.process_icao_chunk --chunk-id ${{ matrix.chunk }} --total-chunks 4 ${{ inputs.date && format('--date {0}', inputs.date) || '' }} - mkdir -p data/output/adsb_chunks - ls -lah data/output/adsb_chunks/ || echo "No chunks created" - - - name: Upload chunk artifacts - uses: actions/upload-artifact@v4 - with: - name: adsb-chunk-${{ matrix.chunk }} - path: data/output/adsb_chunks/ - retention-days: 1 - - adsb-reduce: - runs-on: ubuntu-24.04-arm - needs: adsb-map + adsb-to-aircraft: + needs: resolve-dates if: github.event_name != 'schedule' - steps: - - name: Checkout - uses: actions/checkout@v6 - with: - fetch-depth: 0 - - - name: Setup Python - uses: actions/setup-python@v6 - with: - python-version: "3.14" - - - name: Install dependencies - run: | - python -m pip install --upgrade pip - pip install -r requirements.txt - - - name: Download all chunk artifacts - uses: actions/download-artifact@v4 - with: - pattern: adsb-chunk-* - path: data/output/adsb_chunks/ - merge-multiple: true - - - name: Debug downloaded files - run: | - echo "=== Listing data/ ===" - find data/ -type f 2>/dev/null | head -50 || echo "No files in data/" - echo "=== Looking for parquet 
files ===" - find . -name "*.parquet" 2>/dev/null | head -20 || echo "No parquet files found" - - - name: Combine chunks to CSV - run: | - mkdir -p data/output/adsb_chunks - ls -lah data/output/adsb_chunks/ || echo "Directory empty or does not exist" - python -m src.adsb.combine_chunks_to_csv --chunks-dir data/output/adsb_chunks ${{ inputs.date && format('--date {0}', inputs.date) || '' }} - ls -lah data/openairframes/ - - - name: Upload ADS-B artifacts - uses: actions/upload-artifact@v4 - with: - name: adsb-release - path: data/openairframes/openairframes_adsb_*.csv - retention-days: 1 + uses: ./.github/workflows/adsb-to-aircraft-for-day.yaml + with: + date: ${{ needs.resolve-dates.outputs.adsb_date }} + concat_with_latest_csv: true build-community: runs-on: ubuntu-latest @@ -261,11 +132,71 @@ jobs: path: data/openairframes/openairframes_community_*.csv retention-days: 1 - create-release: + build-adsbexchange-json: runs-on: ubuntu-latest - needs: [build-faa, adsb-reduce, build-community] if: github.event_name != 'schedule' steps: + - name: Checkout + uses: actions/checkout@v6 + with: + fetch-depth: 0 + + - name: Setup Python + uses: actions/setup-python@v6 + with: + python-version: "3.14" + + - name: Run ADS-B Exchange JSON release script + run: | + python -m src.contributions.create_daily_adsbexchange_release ${{ inputs.date && format('--date {0}', inputs.date) || '' }} + ls -lah data/openairframes + + - name: Upload ADS-B Exchange JSON artifact + uses: actions/upload-artifact@v4 + with: + name: adsbexchange-json + path: data/openairframes/basic-ac-db_*.json.gz + retention-days: 1 + + build-mictronics-db: + runs-on: ubuntu-latest + if: github.event_name != 'schedule' + steps: + - name: Checkout + uses: actions/checkout@v6 + with: + fetch-depth: 0 + + - name: Setup Python + uses: actions/setup-python@v6 + with: + python-version: "3.14" + + - name: Run Mictronics DB release script + continue-on-error: true + run: | + python -m 
src.contributions.create_daily_microtonics_release ${{ inputs.date && format('--date {0}', inputs.date) || '' }} + ls -lah data/openairframes + + - name: Upload Mictronics DB artifact + uses: actions/upload-artifact@v4 + with: + name: mictronics-db + path: data/openairframes/mictronics-db_*.zip + retention-days: 1 + if-no-files-found: ignore + + create-release: + runs-on: ubuntu-latest + needs: [resolve-dates, build-faa, adsb-to-aircraft, build-community, build-adsbexchange-json, build-mictronics-db] + if: github.event_name != 'schedule' && !cancelled() + steps: + - name: Require adsb-to-aircraft success + if: needs.adsb-to-aircraft.result != 'success' + run: | + echo "adsb-to-aircraft result was '${{ needs.adsb-to-aircraft.result }}', expected 'success'" + exit 1 + - name: Checkout for gh CLI uses: actions/checkout@v4 with: @@ -274,23 +205,36 @@ jobs: sparse-checkout-cone-mode: false - name: Download FAA artifacts - uses: actions/download-artifact@v4 + uses: actions/download-artifact@v5 with: name: faa-release path: artifacts/faa - name: Download ADS-B artifacts - uses: actions/download-artifact@v4 + uses: actions/download-artifact@v5 with: - name: adsb-release + name: openairframes_adsb-${{ needs.resolve-dates.outputs.adsb_date }} path: artifacts/adsb - name: Download Community artifacts - uses: actions/download-artifact@v4 + uses: actions/download-artifact@v5 with: name: community-release path: artifacts/community + - name: Download ADS-B Exchange JSON artifact + uses: actions/download-artifact@v5 + with: + name: adsbexchange-json + path: artifacts/adsbexchange + + - name: Download Mictronics DB artifact + uses: actions/download-artifact@v5 + continue-on-error: true + with: + name: mictronics-db + path: artifacts/mictronics + - name: Debug artifact structure run: | echo "=== Full artifacts tree ===" @@ -301,6 +245,10 @@ jobs: find artifacts/adsb -type f 2>/dev/null || echo "No files found in artifacts/adsb" echo "=== Community artifacts ===" find 
artifacts/community -type f 2>/dev/null || echo "No files found in artifacts/community" + echo "=== ADS-B Exchange JSON artifacts ===" + find artifacts/adsbexchange -type f 2>/dev/null || echo "No files found in artifacts/adsbexchange" + echo "=== Mictronics DB artifacts ===" + find artifacts/mictronics -type f 2>/dev/null || echo "No files found in artifacts/mictronics" - name: Prepare release metadata id: meta @@ -317,9 +265,11 @@ jobs: # Find files from artifacts using find (handles nested structures) CSV_FILE_FAA=$(find artifacts/faa -name "openairframes_faa_*.csv" -type f 2>/dev/null | head -1) - CSV_FILE_ADSB=$(find artifacts/adsb -name "openairframes_adsb_*.csv" -type f 2>/dev/null | head -1) + CSV_FILE_ADSB=$(find artifacts/adsb -name "openairframes_adsb_*.csv.gz" -type f 2>/dev/null | head -1) CSV_FILE_COMMUNITY=$(find artifacts/community -name "openairframes_community_*.csv" -type f 2>/dev/null | head -1) ZIP_FILE=$(find artifacts/faa -name "ReleasableAircraft_*.zip" -type f 2>/dev/null | head -1) + JSON_FILE_ADSBX=$(find artifacts/adsbexchange -name "basic-ac-db_*.json.gz" -type f 2>/dev/null | head -1) + ZIP_FILE_MICTRONICS=$(find artifacts/mictronics -name "mictronics-db_*.zip" -type f 2>/dev/null | head -1) # Validate required files exist MISSING_FILES="" @@ -332,12 +282,24 @@ jobs: if [ -z "$ZIP_FILE" ] || [ ! -f "$ZIP_FILE" ]; then MISSING_FILES="$MISSING_FILES FAA_ZIP" fi + if [ -z "$JSON_FILE_ADSBX" ] || [ ! -f "$JSON_FILE_ADSBX" ]; then + MISSING_FILES="$MISSING_FILES ADSBX_JSON" + fi + + # Optional files - warn but don't fail + OPTIONAL_MISSING="" + if [ -z "$ZIP_FILE_MICTRONICS" ] || [ ! 
-f "$ZIP_FILE_MICTRONICS" ]; then + OPTIONAL_MISSING="$OPTIONAL_MISSING MICTRONICS_ZIP" + ZIP_FILE_MICTRONICS="" + fi if [ -n "$MISSING_FILES" ]; then echo "ERROR: Missing required release files:$MISSING_FILES" echo "FAA CSV: $CSV_FILE_FAA" echo "ADSB CSV: $CSV_FILE_ADSB" echo "ZIP: $ZIP_FILE" + echo "ADSBX JSON: $JSON_FILE_ADSBX" + echo "MICTRONICS ZIP: $ZIP_FILE_MICTRONICS" exit 1 fi @@ -346,6 +308,15 @@ jobs: CSV_BASENAME_ADSB=$(basename "$CSV_FILE_ADSB") CSV_BASENAME_COMMUNITY=$(basename "$CSV_FILE_COMMUNITY" 2>/dev/null || echo "") ZIP_BASENAME=$(basename "$ZIP_FILE") + JSON_BASENAME_ADSBX=$(basename "$JSON_FILE_ADSBX") + ZIP_BASENAME_MICTRONICS="" + if [ -n "$ZIP_FILE_MICTRONICS" ]; then + ZIP_BASENAME_MICTRONICS=$(basename "$ZIP_FILE_MICTRONICS") + fi + + if [ -n "$OPTIONAL_MISSING" ]; then + echo "WARNING: Optional files missing:$OPTIONAL_MISSING (will continue without them)" + fi echo "date=$DATE" >> "$GITHUB_OUTPUT" echo "tag=$TAG" >> "$GITHUB_OUTPUT" @@ -357,6 +328,10 @@ jobs: echo "csv_basename_community=$CSV_BASENAME_COMMUNITY" >> "$GITHUB_OUTPUT" echo "zip_file=$ZIP_FILE" >> "$GITHUB_OUTPUT" echo "zip_basename=$ZIP_BASENAME" >> "$GITHUB_OUTPUT" + echo "json_file_adsbx=$JSON_FILE_ADSBX" >> "$GITHUB_OUTPUT" + echo "json_basename_adsbx=$JSON_BASENAME_ADSBX" >> "$GITHUB_OUTPUT" + echo "zip_file_mictronics=$ZIP_FILE_MICTRONICS" >> "$GITHUB_OUTPUT" + echo "zip_basename_mictronics=$ZIP_BASENAME_MICTRONICS" >> "$GITHUB_OUTPUT" echo "name=OpenAirframes snapshot ($DATE)${BRANCH_SUFFIX}" >> "$GITHUB_OUTPUT" echo "Found files:" @@ -364,6 +339,8 @@ jobs: echo " ADSB CSV: $CSV_FILE_ADSB" echo " Community CSV: $CSV_FILE_COMMUNITY" echo " ZIP: $ZIP_FILE" + echo " ADSBX JSON: $JSON_FILE_ADSBX" + echo " MICTRONICS ZIP: $ZIP_FILE_MICTRONICS" - name: Delete existing release if exists run: | @@ -377,7 +354,7 @@ jobs: with: tag_name: ${{ steps.meta.outputs.tag }} name: ${{ steps.meta.outputs.name }} - fail_on_unmatched_files: true + fail_on_unmatched_files: false body: | 
Automated daily snapshot generated at 06:00 UTC for ${{ steps.meta.outputs.date }}. @@ -386,10 +363,14 @@ jobs: - ${{ steps.meta.outputs.csv_basename_adsb }} - ${{ steps.meta.outputs.csv_basename_community }} - ${{ steps.meta.outputs.zip_basename }} + - ${{ steps.meta.outputs.json_basename_adsbx }} + ${{ steps.meta.outputs.zip_basename_mictronics && format('- {0}', steps.meta.outputs.zip_basename_mictronics) || '' }} files: | ${{ steps.meta.outputs.csv_file_faa }} ${{ steps.meta.outputs.csv_file_adsb }} ${{ steps.meta.outputs.csv_file_community }} ${{ steps.meta.outputs.zip_file }} + ${{ steps.meta.outputs.json_file_adsbx }} + ${{ steps.meta.outputs.zip_file_mictronics }} env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} diff --git a/.github/workflows/update-community-prs.yaml b/.github/workflows/update-community-prs.yaml index 711df76..cf7e826 100644 --- a/.github/workflows/update-community-prs.yaml +++ b/.github/workflows/update-community-prs.yaml @@ -48,29 +48,52 @@ jobs: git fetch origin "$branch_name" git checkout "$branch_name" - # Merge main into PR branch git config user.name "github-actions[bot]" git config user.email "github-actions[bot]@users.noreply.github.com" - if git merge origin/main -m "Merge main to update schema"; then - # Regenerate schema for this PR's submission (adds any new tags) - python -m src.contributions.regenerate_pr_schema || true - - # If there are changes, commit and push - if [ -n "$(git status --porcelain schemas/)" ]; then - git add schemas/ - git commit -m "Update schema with new tags" - git push origin "$branch_name" - echo " Updated PR #$pr_number with schema changes" - else - git push origin "$branch_name" - echo " Merged main into PR #$pr_number" + # Get the community submission file(s) and schema from this branch + community_files=$(git diff --name-only origin/main...HEAD -- 'community/' 'schemas/') + + if [ -z "$community_files" ]; then + echo " No community/schema files found in PR #$pr_number, skipping" + git checkout main 
+ continue + fi + + echo " Files to preserve: $community_files" + + # Save the community files content + mkdir -p /tmp/pr_files + for file in $community_files; do + if [ -f "$file" ]; then + mkdir -p "/tmp/pr_files/$(dirname "$file")" + cp "$file" "/tmp/pr_files/$file" fi + done + + # Reset branch to main (clean slate) + git reset --hard origin/main + + # Restore the community files + for file in $community_files; do + if [ -f "/tmp/pr_files/$file" ]; then + mkdir -p "$(dirname "$file")" + cp "/tmp/pr_files/$file" "$file" + fi + done + rm -rf /tmp/pr_files + + # Regenerate schema with current main + this submission's tags + python -m src.contributions.regenerate_pr_schema || true + + # Stage and commit all changes + git add community/ schemas/ + if ! git diff --cached --quiet; then + git commit -m "Community submission (rebased on main)" + git push --force origin "$branch_name" + echo " Rebased PR #$pr_number onto main" else - echo " Merge conflict in PR #$pr_number, adding comment" - gh pr comment "$pr_number" --body $'⚠️ **Merge Conflict**\n\nAnother community submission was merged and this PR has conflicts.\n\nA maintainer may need to:\n1. Close this PR\n2. Remove the `approved` label from the original issue\n3. 
Re-add the `approved` label to regenerate the PR' - git merge --abort - fi + echo " No changes needed for PR #$pr_number" fi git checkout main diff --git a/.gitignore b/.gitignore index 7ef2c0b..2291163 100644 --- a/.gitignore +++ b/.gitignore @@ -281,4 +281,7 @@ read*lock .nx/ # jsii-rosetta files -type-fingerprints.txt \ No newline at end of file +type-fingerprints.txt + +notebooks/whatever.ipynb +.snapshots/ \ No newline at end of file diff --git a/README.md b/README.md index 3141ccd..fe2095a 100644 --- a/README.md +++ b/README.md @@ -20,7 +20,7 @@ A daily release is created at **06:00 UTC** and includes: All [FAA registration data](https://www.faa.gov/licenses_certificates/aircraft_certification/aircraft_registry/releasable_aircraft_download) from 2023-08-16 to present (~260 MB) - **openairframes_adsb.csv** - Airframe information derived from ADS-B messages on the [ADSB.lol](https://www.adsb.lol/) network, from 2026-02-12 to present. The airframe information originates from [mictronics aircraft database](https://www.mictronics.de/aircraft-database/) (~5 MB). + Airframe information derived from ADS-B messages on the [ADSB.lol](https://www.adsb.lol/) network, from 2026-02-12 to present (will be from 2024-01-01 soon). The airframe information originates from [mictronics aircraft database](https://www.mictronics.de/aircraft-database/) (~5 MB). - **ReleasableAircraft_{date}.zip** A daily snapshot of the FAA database, which updates at **05:30 UTC** @@ -43,7 +43,8 @@ Please try to follow the submission formatting guidelines. If you are struggling ## For Developers All code, compute (GitHub Actions), and storage (releases) are in this GitHub repository Improvements are welcome. 
Potential features include: -- Web UI +- Web UI for data +- Web UI for contributors - Additional export formats in the daily release - Data fusion from multiple sources in the daily release - Automated airframe data connectors, including (but not limited to) civil aviation authorities and airline APIs diff --git a/scripts/concat_downloads.py b/scripts/concat_downloads.py new file mode 100644 index 0000000..7f7bf2b --- /dev/null +++ b/scripts/concat_downloads.py @@ -0,0 +1,49 @@ +#!/usr/bin/env python3 +import re +from pathlib import Path +import polars as pl + +# Find all CSV.gz files in the downloaded artifacts +artifacts_dir = Path("downloads/adsb_artifacts") +files = sorted(artifacts_dir.glob("*/openairframes_adsb_*.csv.gz")) + +if not files: + raise SystemExit("No CSV.gz files found in downloads/adsb_artifacts/") + +print(f"Found {len(files)} files to concatenate") + +# Extract dates from filenames to determine range +def extract_dates(path: Path) -> tuple[str, str]: + """Extract start and end dates from filename""" + m = re.search(r"openairframes_adsb_(\d{4}-\d{2}-\d{2})_(\d{4}-\d{2}-\d{2})\.csv\.gz", path.name) + if m: + return m.group(1), m.group(2) + return None, None + +# Collect all dates +all_dates = [] +for f in files: + start, end = extract_dates(f) + if start and end: + all_dates.extend([start, end]) + print(f" {f.name}: {start} to {end}") + +if not all_dates: + raise SystemExit("Could not extract dates from filenames") + +# Find earliest and latest dates +earliest = min(all_dates) +latest = max(all_dates) +print(f"\nDate range: {earliest} to {latest}") + +# Read and concatenate all files +print("\nReading and concatenating files...") +frames = [pl.read_csv(f) for f in files] +df = pl.concat(frames, how="vertical", rechunk=True) + +# Write output +output_path = Path("downloads") / f"openairframes_adsb_{earliest}_{latest}.csv.gz" +output_path.parent.mkdir(parents=True, exist_ok=True) +df.write_csv(output_path, compression="gzip") + +print(f"\nWrote 
{output_path} with {df.height:,} rows") \ No newline at end of file diff --git a/scripts/download.sh b/scripts/download.sh new file mode 100644 index 0000000..1817ae0 --- /dev/null +++ b/scripts/download.sh @@ -0,0 +1,34 @@ +#!/bin/bash + +# Create download directory +mkdir -p downloads/adsb_artifacts + +# Repository from the workflow comment +REPO="ggman12/OpenAirframes" + +# Get last 15 runs of the workflow and download matching artifacts +gh run list \ + --repo "$REPO" \ + --workflow adsb-to-aircraft-multiple-day-run.yaml \ + --limit 15 \ + --json databaseId \ + --jq '.[].databaseId' | while read -r run_id; do + + echo "Checking run ID: $run_id" + + # List artifacts for this run using the API + # Match pattern: openairframes_adsb-YYYY-MM-DD-YYYY-MM-DD (with second date) + gh api \ + --paginate \ + "repos/$REPO/actions/runs/$run_id/artifacts" \ + --jq '.artifacts[] | select(.name | test("^openairframes_adsb-[0-9]{4}-[0-9]{2}-[0-9]{2}-[0-9]{4}-[0-9]{2}-[0-9]{2}$")) | .name' | while read -r artifact_name; do + + echo " Downloading: $artifact_name" + gh run download "$run_id" \ + --repo "$REPO" \ + --name "$artifact_name" \ + --dir "downloads/adsb_artifacts/$artifact_name" + done +done + +echo "Download complete! Files saved to downloads/adsb_artifacts/" \ No newline at end of file diff --git a/scripts/download_and_concat_runs.py b/scripts/download_and_concat_runs.py new file mode 100644 index 0000000..b931eb1 --- /dev/null +++ b/scripts/download_and_concat_runs.py @@ -0,0 +1,182 @@ +#!/usr/bin/env python3 +""" +Download and concatenate artifacts from a specific set of workflow runs. 
+ +Usage: + python scripts/download_and_concat_runs.py triggered_runs_20260216_123456.json +""" + +import argparse +import json +import os +import subprocess +import sys +from pathlib import Path + + +def download_run_artifact(run_id, output_dir): + """Download artifact from a specific workflow run.""" + print(f" Downloading artifacts from run {run_id}...") + + cmd = [ + 'gh', 'run', 'download', str(run_id), + '--pattern', 'openairframes_adsb-*', + '--dir', output_dir + ] + + result = subprocess.run(cmd, capture_output=True, text=True) + + if result.returncode == 0: + print(f" ✓ Downloaded") + return True + else: + if "no artifacts" in result.stderr.lower(): + print(f" ⚠ No artifacts found (workflow may still be running)") + else: + print(f" ✗ Failed: {result.stderr}") + return False + + +def find_csv_files(download_dir): + """Find all CSV.gz files in the download directory.""" + csv_files = [] + for root, dirs, files in os.walk(download_dir): + for file in files: + if file.endswith('.csv.gz'): + csv_files.append(os.path.join(root, file)) + return sorted(csv_files) + + +def concatenate_csv_files(csv_files, output_file): + """Concatenate CSV files in order, preserving headers.""" + import gzip + + print(f"\nConcatenating {len(csv_files)} CSV files...") + + with gzip.open(output_file, 'wt') as outf: + header_written = False + + for i, csv_file in enumerate(csv_files, 1): + print(f" [{i}/{len(csv_files)}] Processing {os.path.basename(csv_file)}") + + with gzip.open(csv_file, 'rt') as inf: + lines = inf.readlines() + + if not header_written: + # Write header from first file + outf.writelines(lines) + header_written = True + else: + # Skip header for subsequent files + outf.writelines(lines[1:]) + + print(f"\n✓ Concatenated CSV saved to: {output_file}") + + # Show file size + size_mb = os.path.getsize(output_file) / (1024 * 1024) + print(f" Size: {size_mb:.1f} MB") + + +def main(): + parser = argparse.ArgumentParser( + description='Download and concatenate artifacts 
from workflow runs' + ) + parser.add_argument( + 'runs_file', + help='JSON file containing run IDs (from run_historical_adsb_action.py)' + ) + parser.add_argument( + '--output-dir', + default='./downloads/historical_concat', + help='Directory for downloads (default: ./downloads/historical_concat)' + ) + parser.add_argument( + '--wait', + action='store_true', + help='Wait for workflows to complete before downloading' + ) + + args = parser.parse_args() + + # Load run IDs + if not os.path.exists(args.runs_file): + print(f"Error: File not found: {args.runs_file}") + sys.exit(1) + + with open(args.runs_file, 'r') as f: + data = json.load(f) + + runs = data['runs'] + start_date = data['start_date'] + end_date = data['end_date'] + + print("=" * 60) + print("Download and Concatenate Historical Artifacts") + print("=" * 60) + print(f"Date range: {start_date} to {end_date}") + print(f"Workflow runs: {len(runs)}") + print(f"Output directory: {args.output_dir}") + print("=" * 60) + + # Create output directory + os.makedirs(args.output_dir, exist_ok=True) + + # Wait for workflows to complete if requested + if args.wait: + print("\nWaiting for workflows to complete...") + for run_info in runs: + run_id = run_info['run_id'] + print(f" Checking run {run_id}...") + + cmd = ['gh', 'run', 'watch', str(run_id)] + subprocess.run(cmd) + + # Download artifacts + print("\nDownloading artifacts...") + successful_downloads = 0 + + for i, run_info in enumerate(runs, 1): + run_id = run_info['run_id'] + print(f"\n[{i}/{len(runs)}] Run {run_id} ({run_info['start']} to {run_info['end']})") + + if download_run_artifact(run_id, args.output_dir): + successful_downloads += 1 + + print(f"\n\nDownload Summary: {successful_downloads}/{len(runs)} artifacts downloaded") + + if successful_downloads == 0: + print("\nNo artifacts downloaded. 
Workflows may still be running.") + print("Use --wait to wait for completion, or try again later.") + sys.exit(1) + + # Find all CSV files + csv_files = find_csv_files(args.output_dir) + + if not csv_files: + print("\nError: No CSV files found in download directory") + sys.exit(1) + + print(f"\nFound {len(csv_files)} CSV file(s):") + for csv_file in csv_files: + print(f" - {os.path.basename(csv_file)}") + + # Concatenate + # Calculate actual end date for filename (end_date - 1 day since it's exclusive) + from datetime import datetime, timedelta + end_dt = datetime.strptime(end_date, '%Y-%m-%d') - timedelta(days=1) + actual_end = end_dt.strftime('%Y-%m-%d') + + output_file = os.path.join( + args.output_dir, + f"openairframes_adsb_{start_date}_{actual_end}.csv.gz" + ) + + concatenate_csv_files(csv_files, output_file) + + print("\n" + "=" * 60) + print("Done!") + print("=" * 60) + + +if __name__ == '__main__': + main() diff --git a/scripts/run_historical_adsb_action.py b/scripts/run_historical_adsb_action.py new file mode 100644 index 0000000..f728529 --- /dev/null +++ b/scripts/run_historical_adsb_action.py @@ -0,0 +1,215 @@ +#!/usr/bin/env python3 +""" +Script to trigger adsb-to-aircraft-multiple-day-run workflow runs in monthly chunks. + +Usage: + python scripts/run_historical_adsb_action.py --start-date 2025-01-01 --end-date 2025-06-01 +""" + +import argparse +import subprocess +import sys +from datetime import datetime, timedelta +from calendar import monthrange + + +def generate_monthly_chunks(start_date_str, end_date_str): + """Generate date ranges in monthly chunks from start to end date. + + End dates are exclusive (e.g., to process Jan 1-31, end_date should be Feb 1). 
+ """ + start_date = datetime.strptime(start_date_str, '%Y-%m-%d') + end_date = datetime.strptime(end_date_str, '%Y-%m-%d') + + chunks = [] + current = start_date + + while current < end_date: + # Get the first day of the next month (exclusive end) + _, days_in_month = monthrange(current.year, current.month) + month_end = current.replace(day=days_in_month) + next_month_start = month_end + timedelta(days=1) + + # Don't go past the global end date + chunk_end = min(next_month_start, end_date) + + chunks.append({ + 'start': current.strftime('%Y-%m-%d'), + 'end': chunk_end.strftime('%Y-%m-%d') + }) + + # Move to first day of next month + if next_month_start >= end_date: + break + current = next_month_start + + return chunks + + +def trigger_workflow(start_date, end_date, repo='ggman12/OpenAirframes', branch='main', dry_run=False): + """Trigger the adsb-to-aircraft-multiple-day-run workflow via GitHub CLI.""" + cmd = [ + 'gh', 'workflow', 'run', 'adsb-to-aircraft-multiple-day-run.yaml', + '--repo', repo, + '--ref', branch, + '-f', f'start_date={start_date}', + '-f', f'end_date={end_date}' + ] + + if dry_run: + print(f"[DRY RUN] Would run: {' '.join(cmd)}") + return True, None + + print(f"Triggering workflow: {start_date} to {end_date} (on {branch})") + result = subprocess.run(cmd, capture_output=True, text=True) + + if result.returncode == 0: + print(f"✓ Successfully triggered workflow for {start_date} to {end_date}") + + # Get the run ID of the workflow we just triggered + # Wait a moment for it to appear + import time + time.sleep(2) + + # Get the most recent run (should be the one we just triggered) + list_cmd = [ + 'gh', 'run', 'list', + '--repo', repo, + '--workflow', 'adsb-to-aircraft-multiple-day-run.yaml', + '--branch', branch, + '--limit', '1', + '--json', 'databaseId', + '--jq', '.[0].databaseId' + ] + list_result = subprocess.run(list_cmd, capture_output=True, text=True) + run_id = list_result.stdout.strip() if list_result.returncode == 0 else None + + return 
True, run_id + else: + print(f"✗ Failed to trigger workflow for {start_date} to {end_date}") + print(f"Error: {result.stderr}") + return False, None + + +def main(): + parser = argparse.ArgumentParser( + description='Trigger adsb-to-aircraft-multiple-day-run workflow runs in monthly chunks' + ) + parser.add_argument( + '--start-date', '--start_date', + dest='start_date', + required=True, + help='Start date in YYYY-MM-DD format (inclusive)' + ) + parser.add_argument( + '--end-date', '--end_date', + dest='end_date', + required=True, + help='End date in YYYY-MM-DD format (exclusive)' + ) + parser.add_argument( + '--repo', + type=str, + default='ggman12/OpenAirframes', + help='GitHub repository (default: ggman12/OpenAirframes)' + ) + parser.add_argument( + '--branch', + type=str, + default='main', + help='Branch to run the workflow on (default: main)' + ) + parser.add_argument( + '--dry-run', + action='store_true', + help='Print commands without executing them' + ) + parser.add_argument( + '--delay', + type=int, + default=5, + help='Delay in seconds between workflow triggers (default: 5)' + ) + + args = parser.parse_args() + + # Validate dates + try: + start = datetime.strptime(args.start_date, '%Y-%m-%d') + end = datetime.strptime(args.end_date, '%Y-%m-%d') + if start > end: + print("Error: start_date must be before or equal to end_date") + sys.exit(1) + except ValueError as e: + print(f"Error: Invalid date format - {e}") + sys.exit(1) + + # Generate monthly chunks + chunks = generate_monthly_chunks(args.start_date, args.end_date) + + print(f"\nGenerating {len(chunks)} monthly workflow runs on branch '{args.branch}' (repo: {args.repo}):") + for i, chunk in enumerate(chunks, 1): + print(f" {i}. {chunk['start']} to {chunk['end']}") + + if not args.dry_run: + response = input(f"\nProceed with triggering {len(chunks)} workflows on '{args.branch}'? 
[y/N]: ") + if response.lower() != 'y': + print("Cancelled.") + sys.exit(0) + + print() + + # Trigger workflows + import time + success_count = 0 + triggered_runs = [] + + for i, chunk in enumerate(chunks, 1): + print(f"\n[{i}/{len(chunks)}] ", end='') + + success, run_id = trigger_workflow( + chunk['start'], + chunk['end'], + repo=args.repo, + branch=args.branch, + dry_run=args.dry_run + ) + + if success: + success_count += 1 + if run_id: + triggered_runs.append({ + 'run_id': run_id, + 'start': chunk['start'], + 'end': chunk['end'] + }) + + # Add delay between triggers (except for last one) + if i < len(chunks) and not args.dry_run: + time.sleep(args.delay) + + print(f"\n\nSummary: {success_count}/{len(chunks)} workflows triggered successfully") + + # Save triggered run IDs to a file + if triggered_runs and not args.dry_run: + import json + timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') + runs_file = f"./triggered_runs_{timestamp}.json" + with open(runs_file, 'w') as f: + json.dump({ + 'start_date': args.start_date, + 'end_date': args.end_date, + 'repo': args.repo, + 'branch': args.branch, + 'runs': triggered_runs + }, f, indent=2) + print(f"\nRun IDs saved to: {runs_file}") + print(f"\nTo download and concatenate these artifacts, run:") + print(f" python scripts/download_and_concat_runs.py {runs_file}") + + if success_count < len(chunks): + sys.exit(1) + + +if __name__ == '__main__': + main() diff --git a/scripts/run_main_isolated.py b/scripts/run_main_isolated.py new file mode 100644 index 0000000..92527cb --- /dev/null +++ b/scripts/run_main_isolated.py @@ -0,0 +1,82 @@ +#!/usr/bin/env python3 +""" +Run src.adsb.main in an isolated git worktree so edits in the main +working tree won't affect subprocess imports during the run. 
+ +Usage: + python scripts/run_main_isolated.py 2026-01-01 + python scripts/run_main_isolated.py --start_date 2026-01-01 --end_date 2026-01-03 +""" +import argparse +import os +import shutil +import subprocess +import sys +from datetime import datetime, timezone +from pathlib import Path + + +def run( + cmd: list[str], + *, + cwd: Path | None = None, + check: bool = True, +) -> subprocess.CompletedProcess: + print(f"\n>>> {' '.join(cmd)}") + return subprocess.run(cmd, cwd=cwd, check=check) + + +def main() -> int: + parser = argparse.ArgumentParser(description="Run src.adsb.main in an isolated worktree") + parser.add_argument("date", nargs="?", help="Single date to process (YYYY-MM-DD)") + parser.add_argument("--start_date", help="Start date (inclusive, YYYY-MM-DD)") + parser.add_argument("--end_date", help="End date (exclusive, YYYY-MM-DD)") + parser.add_argument("--concat_with_latest_csv", action="store_true", help="Also concatenate with latest CSV from GitHub releases") + args = parser.parse_args() + + if args.date and (args.start_date or args.end_date): + raise SystemExit("Use a single date or --start_date/--end_date, not both.") + + if args.date: + datetime.strptime(args.date, "%Y-%m-%d") + main_args = ["--date", args.date] + else: + if not args.start_date or not args.end_date: + raise SystemExit("Provide --start_date and --end_date, or a single date.") + datetime.strptime(args.start_date, "%Y-%m-%d") + datetime.strptime(args.end_date, "%Y-%m-%d") + main_args = ["--start_date", args.start_date, "--end_date", args.end_date] + + if args.concat_with_latest_csv: + main_args.append("--concat_with_latest_csv") + + repo_root = Path(__file__).resolve().parents[1] + snapshots_root = repo_root / ".snapshots" + snapshots_root.mkdir(exist_ok=True) + + timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S") + snapshot_root = snapshots_root / f"run_{timestamp}" + snapshot_src = snapshot_root / "src" + + exit_code = 0 + try: + shutil.copytree(repo_root / "src", 
snapshot_src) + + runner = ( + "import sys, runpy; " + f"sys.path.insert(0, {repr(str(snapshot_root))}); " + f"sys.argv = ['src.adsb.main'] + {main_args!r}; " + "runpy.run_module('src.adsb.main', run_name='__main__')" + ) + cmd = [sys.executable, "-c", runner] + run(cmd, cwd=repo_root) + except subprocess.CalledProcessError as exc: + exit_code = exc.returncode + finally: + shutil.rmtree(snapshot_root, ignore_errors=True) + + return exit_code + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/src/adsb/Dockerfile.reducer b/src/adsb/Dockerfile.reducer deleted file mode 100644 index b375f46..0000000 --- a/src/adsb/Dockerfile.reducer +++ /dev/null @@ -1,11 +0,0 @@ -FROM --platform=linux/arm64 python:3.12-slim - -WORKDIR /app - -COPY requirements.reducer.txt requirements.txt -RUN pip install --no-cache-dir -r requirements.txt - -COPY compress_adsb_to_aircraft_data.py . -COPY reducer.py . - -CMD ["python", "-u", "reducer.py"] diff --git a/src/adsb/Dockerfile.worker b/src/adsb/Dockerfile.worker deleted file mode 100644 index dc4336d..0000000 --- a/src/adsb/Dockerfile.worker +++ /dev/null @@ -1,12 +0,0 @@ -FROM --platform=linux/arm64 python:3.12-slim - -WORKDIR /app - -COPY requirements.worker.txt requirements.txt -RUN pip install --no-cache-dir -r requirements.txt - -COPY compress_adsb_to_aircraft_data.py . -COPY download_adsb_data_to_parquet.py . -COPY worker.py . - -CMD ["python", "-u", "worker.py"] diff --git a/src/adsb/combine_chunks_to_csv.py b/src/adsb/combine_chunks_to_csv.py deleted file mode 100644 index b5afca3..0000000 --- a/src/adsb/combine_chunks_to_csv.py +++ /dev/null @@ -1,250 +0,0 @@ -""" -Combines chunk parquet files and compresses to final aircraft CSV. -This is the reduce phase of the map-reduce pipeline. - -Supports both single-day (daily) and multi-day (historical) modes. - -Memory-efficient: processes each chunk separately, compresses, then combines. 
- -Usage: - # Daily mode - python -m src.adsb.combine_chunks_to_csv --chunks-dir data/output/adsb_chunks - - # Historical mode - python -m src.adsb.combine_chunks_to_csv --chunks-dir data/output/adsb_chunks --start-date 2024-01-01 --end-date 2024-01-07 --skip-base -""" -import gc -import os -import sys -import glob -import argparse -from datetime import datetime, timedelta - -import polars as pl - -from src.adsb.download_adsb_data_to_parquet import OUTPUT_DIR, get_resource_usage -from src.adsb.compress_adsb_to_aircraft_data import compress_multi_icao_df, COLUMNS - - -DEFAULT_CHUNK_DIR = os.path.join(OUTPUT_DIR, "adsb_chunks") -FINAL_OUTPUT_DIR = "./data/openairframes" -os.makedirs(FINAL_OUTPUT_DIR, exist_ok=True) - - -def get_target_day() -> datetime: - """Get yesterday's date (the day we're processing).""" - return datetime.utcnow() - timedelta(days=1) - - -def process_single_chunk(chunk_path: str, delete_after_load: bool = False) -> pl.DataFrame: - """Load and compress a single chunk parquet file. - - Args: - chunk_path: Path to parquet file - delete_after_load: If True, delete the parquet file after loading to free disk space - """ - print(f"Processing {os.path.basename(chunk_path)}... 
| {get_resource_usage()}") - - # Load chunk - only columns we need - needed_columns = ['time', 'icao'] + COLUMNS - df = pl.read_parquet(chunk_path, columns=needed_columns) - print(f" Loaded {len(df)} rows") - - # Delete file immediately after loading to free disk space - if delete_after_load: - try: - os.remove(chunk_path) - print(f" Deleted {chunk_path} to free disk space") - except Exception as e: - print(f" Warning: Failed to delete {chunk_path}: {e}") - - # Compress to aircraft records (one per ICAO) using shared function - compressed = compress_multi_icao_df(df, verbose=True) - print(f" Compressed to {len(compressed)} aircraft records") - - del df - gc.collect() - - return compressed - - -def combine_compressed_chunks(compressed_dfs: list[pl.DataFrame]) -> pl.DataFrame: - """Combine multiple compressed DataFrames. - - Since chunks are partitioned by ICAO hash, each ICAO only appears in one chunk. - No deduplication needed here - just concatenate. - """ - print(f"Combining {len(compressed_dfs)} compressed chunks... 
| {get_resource_usage()}") - - # Concat all - combined = pl.concat(compressed_dfs) - print(f"Combined: {len(combined)} records") - - return combined - - -def download_and_merge_base_release(compressed_df: pl.DataFrame) -> pl.DataFrame: - """Download base release and merge with new data.""" - from src.get_latest_release import download_latest_aircraft_adsb_csv - - print("Downloading base ADS-B release...") - try: - base_path = download_latest_aircraft_adsb_csv( - output_dir="./data/openairframes_base" - ) - print(f"Download returned: {base_path}") - - if base_path and os.path.exists(str(base_path)): - print(f"Loading base release from {base_path}") - base_df = pl.read_csv(base_path) - print(f"Base release has {len(base_df)} records") - - # Ensure columns match - base_cols = set(base_df.columns) - new_cols = set(compressed_df.columns) - print(f"Base columns: {sorted(base_cols)}") - print(f"New columns: {sorted(new_cols)}") - - # Add missing columns - for col in new_cols - base_cols: - base_df = base_df.with_columns(pl.lit(None).alias(col)) - for col in base_cols - new_cols: - compressed_df = compressed_df.with_columns(pl.lit(None).alias(col)) - - # Reorder columns to match - compressed_df = compressed_df.select(base_df.columns) - - # Concat and deduplicate by icao (keep new data - it comes last) - combined = pl.concat([base_df, compressed_df]) - print(f"After concat: {len(combined)} records") - - deduplicated = combined.unique(subset=["icao"], keep="last") - - print(f"Combined with base: {len(combined)} -> {len(deduplicated)} after dedup") - - del base_df, combined - gc.collect() - - return deduplicated - else: - print(f"No base release found at {base_path}, using only new data") - return compressed_df - except Exception as e: - import traceback - print(f"Failed to download base release: {e}") - traceback.print_exc() - return compressed_df - - -def cleanup_chunks(output_id: str, chunks_dir: str): - """Delete chunk parquet files after successful merge.""" - pattern = 
os.path.join(chunks_dir, f"chunk_*_{output_id}.parquet") - chunk_files = glob.glob(pattern) - for f in chunk_files: - try: - os.remove(f) - print(f"Deleted {f}") - except Exception as e: - print(f"Failed to delete {f}: {e}") - - -def find_chunk_files(chunks_dir: str, output_id: str) -> list[str]: - """Find chunk parquet files matching the output ID.""" - pattern = os.path.join(chunks_dir, f"chunk_*_{output_id}.parquet") - chunk_files = sorted(glob.glob(pattern)) - - if not chunk_files: - # Try recursive search for historical mode with merged artifacts - pattern = os.path.join(chunks_dir, "**", "*.parquet") - chunk_files = sorted(glob.glob(pattern, recursive=True)) - - return chunk_files - - -def main(): - parser = argparse.ArgumentParser(description="Combine chunk parquets to final CSV") - parser.add_argument("--date", type=str, help="Single date in YYYY-MM-DD format (default: yesterday)") - parser.add_argument("--start-date", type=str, help="Start date for range (YYYY-MM-DD)") - parser.add_argument("--end-date", type=str, help="End date for range (YYYY-MM-DD)") - parser.add_argument("--chunks-dir", type=str, default=DEFAULT_CHUNK_DIR, help="Directory containing chunk parquet files") - parser.add_argument("--skip-base", action="store_true", help="Skip downloading and merging base release") - parser.add_argument("--keep-chunks", action="store_true", help="Keep chunk files after merging") - parser.add_argument("--stream", action="store_true", help="Delete parquet files immediately after loading to save disk space") - args = parser.parse_args() - - # Determine output ID and filename based on mode - if args.start_date and args.end_date: - # Historical mode - output_id = f"{args.start_date}_{args.end_date}" - output_filename = f"openairframes_adsb_{args.start_date}_{args.end_date}.csv" - print(f"Combining chunks for date range: {args.start_date} to {args.end_date}") - else: - # Daily mode - use same date for start and end - if args.date: - target_day = 
datetime.strptime(args.date, "%Y-%m-%d") - else: - target_day = get_target_day() - - date_str = target_day.strftime("%Y-%m-%d") - output_id = date_str - output_filename = f"openairframes_adsb_{date_str}_{date_str}.csv" - print(f"Combining chunks for {date_str}") - - chunks_dir = args.chunks_dir - print(f"Chunks directory: {chunks_dir}") - print(f"Resource usage at start: {get_resource_usage()}") - - # Find chunk files - chunk_files = find_chunk_files(chunks_dir, output_id) - - if not chunk_files: - print(f"No chunk files found in: {chunks_dir}") - sys.exit(1) - - print(f"Found {len(chunk_files)} chunk files") - - # Process each chunk separately to save memory - # With --stream, delete parquet files immediately after loading to save disk space - compressed_chunks = [] - for chunk_path in chunk_files: - compressed = process_single_chunk(chunk_path, delete_after_load=args.stream) - compressed_chunks.append(compressed) - gc.collect() - - # Combine all compressed chunks - combined = combine_compressed_chunks(compressed_chunks) - - # Free memory from individual chunks - del compressed_chunks - gc.collect() - print(f"After combining: {get_resource_usage()}") - - # Merge with base release (unless skipped) - if not args.skip_base: - combined = download_and_merge_base_release(combined) - - # Convert list columns to strings for CSV compatibility - for col in combined.columns: - if combined[col].dtype == pl.List: - combined = combined.with_columns( - pl.col(col).list.join(",").alias(col) - ) - - # Sort by time for consistent output - if 'time' in combined.columns: - combined = combined.sort('time') - - # Write final CSV - output_path = os.path.join(FINAL_OUTPUT_DIR, output_filename) - combined.write_csv(output_path) - print(f"Wrote {len(combined)} records to {output_path}") - - # Cleanup - if not args.keep_chunks: - cleanup_chunks(output_id, chunks_dir) - - print(f"Done! 
| {get_resource_usage()}") - - -if __name__ == "__main__": - main() diff --git a/src/adsb/compress_adsb_to_aircraft_data.py b/src/adsb/compress_adsb_to_aircraft_data.py index 0938883..cf995b7 100644 --- a/src/adsb/compress_adsb_to_aircraft_data.py +++ b/src/adsb/compress_adsb_to_aircraft_data.py @@ -5,23 +5,6 @@ import polars as pl COLUMNS = ['dbFlags', 'ownOp', 'year', 'desc', 'aircraft_category', 'r', 't'] -def deduplicate_by_signature(df: pl.DataFrame) -> pl.DataFrame: - """For each icao, keep only the earliest row with each unique signature. - - This is used for deduplicating across multiple compressed chunks. - """ - # Create signature column - df = df.with_columns( - pl.concat_str([pl.col(c).cast(pl.Utf8).fill_null("") for c in COLUMNS], separator="|").alias("_signature") - ) - # Group by icao and signature, take first row (earliest due to time sort) - df = df.sort("time") - df_deduped = df.group_by(["icao", "_signature"]).first() - df_deduped = df_deduped.drop("_signature") - df_deduped = df_deduped.sort("time") - return df_deduped - - def compress_df_polars(df: pl.DataFrame, icao: str) -> pl.DataFrame: """Compress a single ICAO group to its most informative row using Polars.""" # Create signature string @@ -99,9 +82,6 @@ def compress_df_polars(df: pl.DataFrame, icao: str) -> pl.DataFrame: def compress_multi_icao_df(df: pl.DataFrame, verbose: bool = True) -> pl.DataFrame: """Compress a DataFrame with multiple ICAOs to one row per ICAO. - This is the main entry point for compressing ADS-B data. - Used by both daily GitHub Actions runs and historical AWS runs. 
- Args: df: DataFrame with columns ['time', 'icao'] + COLUMNS verbose: Whether to print progress @@ -120,29 +100,27 @@ def compress_multi_icao_df(df: pl.DataFrame, verbose: bool = True) -> pl.DataFra if col in df.columns: df = df.with_columns(pl.col(col).cast(pl.Utf8).fill_null("")) - # First pass: quick deduplication of exact duplicates + # Quick deduplication of exact duplicates df = df.unique(subset=['icao'] + COLUMNS, keep='first') if verbose: print(f"After quick dedup: {df.height} records") - # Second pass: sophisticated compression per ICAO + # Compress per ICAO if verbose: print("Compressing per ICAO...") - # Process each ICAO group icao_groups = df.partition_by('icao', as_dict=True, maintain_order=True) compressed_dfs = [] for icao_key, group_df in icao_groups.items(): - # partition_by with as_dict=True returns tuple keys, extract first element - icao = icao_key[0] if isinstance(icao_key, tuple) else icao_key + icao = icao_key[0] compressed = compress_df_polars(group_df, str(icao)) compressed_dfs.append(compressed) if compressed_dfs: df_compressed = pl.concat(compressed_dfs) else: - df_compressed = df.head(0) # Empty with same schema + df_compressed = df.head(0) if verbose: print(f"After compress: {df_compressed.height} records") @@ -155,45 +133,22 @@ def compress_multi_icao_df(df: pl.DataFrame, verbose: bool = True) -> pl.DataFra return df_compressed -def load_raw_adsb_for_day(day): - """Load raw ADS-B data for a day from parquet file.""" - from datetime import timedelta +def load_parquet_part(part_id: int, date: str) -> pl.DataFrame: + """Load a single parquet part file for a date. 
+ + Args: + part_id: Part ID (e.g., 1, 2, 3) + date: Date string in YYYY-MM-DD format + + Returns: + DataFrame with ADS-B data + """ from pathlib import Path - start_time = day.replace(hour=0, minute=0, second=0, microsecond=0) - - # Check for parquet file first - version_date = f"v{start_time.strftime('%Y.%m.%d')}" - parquet_file = Path(f"data/output/parquet_output/{version_date}.parquet") + parquet_file = Path(f"data/output/parquet_output/part_{part_id}_{date}.parquet") if not parquet_file.exists(): - # Try to generate parquet file by calling the download function - print(f" Parquet file not found: {parquet_file}") - print(f" Attempting to download and generate parquet for {start_time.strftime('%Y-%m-%d')}...") - - from download_adsb_data_to_parquet import create_parquet_for_day - result_path = create_parquet_for_day(start_time, keep_folders=False) - - if result_path: - print(f" Successfully generated parquet file: {result_path}") - else: - raise Exception("Failed to generate parquet file") - - if parquet_file.exists(): - print(f" Loading from parquet: {parquet_file}") - df = pl.read_parquet( - parquet_file, - columns=['time', 'icao', 'r', 't', 'dbFlags', 'ownOp', 'year', 'desc', 'aircraft_category'] - ) - - # Convert to timezone-naive datetime - if df["time"].dtype == pl.Datetime: - df = df.with_columns(pl.col("time").dt.replace_time_zone(None)) - - return df - else: - # Return empty DataFrame if parquet file doesn't exist - print(f" No data available for {start_time.strftime('%Y-%m-%d')}") + print(f"Parquet file not found: {parquet_file}") return pl.DataFrame(schema={ 'time': pl.Datetime, 'icao': pl.Utf8, @@ -205,17 +160,33 @@ def load_raw_adsb_for_day(day): 'desc': pl.Utf8, 'aircraft_category': pl.Utf8 }) + + print(f"Loading from parquet: {parquet_file}") + df = pl.read_parquet( + parquet_file, + columns=['time', 'icao', 'r', 't', 'dbFlags', 'ownOp', 'year', 'desc', 'aircraft_category'] + ) + + # Convert to timezone-naive datetime + if df["time"].dtype == 
pl.Datetime: + df = df.with_columns(pl.col("time").dt.replace_time_zone(None)) + os.remove(parquet_file) + return df -def load_historical_for_day(day): - """Load and compress historical ADS-B data for a day.""" - df = load_raw_adsb_for_day(day) +def compress_parquet_part(part_id: int, date: str) -> pl.DataFrame: + """Load and compress a single parquet part file.""" + df = load_parquet_part(part_id, date) + if df.height == 0: return df + + # Filter to rows within the given date (UTC-naive). This is because sometimes adsb.lol export can have rows at 00:00:00 of next day or similar. + date_lit = pl.lit(date).str.strptime(pl.Date, "%Y-%m-%d") + df = df.filter(pl.col("time").dt.date() == date_lit) - print(f"Loaded {df.height} raw records for {day.strftime('%Y-%m-%d')}") + print(f"Loaded {df.height} raw records for part {part_id}, date {date}") - # Use shared compression function return compress_multi_icao_df(df, verbose=True) @@ -223,52 +194,4 @@ def concat_compressed_dfs(df_base, df_new): """Concatenate base and new compressed dataframes, keeping the most informative row per ICAO.""" # Combine both dataframes df_combined = pl.concat([df_base, df_new]) - - # Sort by ICAO and time - df_combined = df_combined.sort(['icao', 'time']) - - # Fill null values - for col in COLUMNS: - if col in df_combined.columns: - df_combined = df_combined.with_columns(pl.col(col).fill_null("")) - - # Apply compression logic per ICAO to get the best row - icao_groups = df_combined.partition_by('icao', as_dict=True, maintain_order=True) - compressed_dfs = [] - - for icao, group_df in icao_groups.items(): - compressed = compress_df_polars(group_df, icao) - compressed_dfs.append(compressed) - - if compressed_dfs: - df_compressed = pl.concat(compressed_dfs) - else: - df_compressed = df_combined.head(0) - - # Sort by time - df_compressed = df_compressed.sort('time') - - return df_compressed - - -def get_latest_aircraft_adsb_csv_df(): - """Download and load the latest ADS-B CSV from GitHub 
releases.""" - from get_latest_release import download_latest_aircraft_adsb_csv - import re - - csv_path = download_latest_aircraft_adsb_csv() - df = pl.read_csv(csv_path, null_values=[""]) - - # Fill nulls with empty strings - for col in df.columns: - if df[col].dtype == pl.Utf8: - df = df.with_columns(pl.col(col).fill_null("")) - - # Extract start date from filename pattern: openairframes_adsb_{start_date}_{end_date}.csv - match = re.search(r"openairframes_adsb_(\d{4}-\d{2}-\d{2})_", str(csv_path)) - if not match: - raise ValueError(f"Could not extract date from filename: {csv_path.name}") - - date_str = match.group(1) - return df, date_str - + return df_combined \ No newline at end of file diff --git a/src/adsb/concat_parquet_to_final.py b/src/adsb/concat_parquet_to_final.py new file mode 100644 index 0000000..da88019 --- /dev/null +++ b/src/adsb/concat_parquet_to_final.py @@ -0,0 +1,50 @@ +from pathlib import Path +import polars as pl +import argparse + +OUTPUT_DIR = Path("./data/output") +CORRECT_ORDER_OF_COLUMNS = ["time", "icao", "r", "t", "dbFlags", "ownOp", "year", "desc", "aircraft_category"] + +def main(): + parser = argparse.ArgumentParser(description="Concatenate compressed parquet files for a single day") + parser.add_argument("--date", type=str, required=True, help="Date in YYYY-MM-DD format") + parser.add_argument("--concat_with_latest_csv", action="store_true", help="Whether to also concatenate with the latest CSV from GitHub releases") + args = parser.parse_args() + + compressed_dir = OUTPUT_DIR / "compressed" + date_dir = compressed_dir / args.date + if not date_dir.is_dir(): + raise FileNotFoundError(f"No date folder found: {date_dir}") + + parquet_files = sorted(date_dir.glob("*.parquet")) + if not parquet_files: + raise FileNotFoundError(f"No parquet files found in {date_dir}") + + frames = [pl.read_parquet(p) for p in parquet_files] + df = pl.concat(frames, how="vertical", rechunk=True) + + df = df.sort(["time", "icao"]) + df = 
df.select(CORRECT_ORDER_OF_COLUMNS) + + output_path = OUTPUT_DIR / f"openairframes_adsb_{args.date}.parquet" + print(f"Writing combined parquet to {output_path} with {df.height} rows") + df.write_parquet(output_path) + + csv_output_path = OUTPUT_DIR / f"openairframes_adsb_{args.date}.csv.gz" + print(f"Writing combined csv.gz to {csv_output_path} with {df.height} rows") + df.write_csv(csv_output_path, compression="gzip") + + if args.concat_with_latest_csv: + print("Loading latest CSV from GitHub releases to concatenate with...") + from src.get_latest_release import get_latest_aircraft_adsb_csv_df + df_latest_csv, csv_date = get_latest_aircraft_adsb_csv_df() + # Ensure column order matches before concatenating + df_latest_csv = df_latest_csv.select(CORRECT_ORDER_OF_COLUMNS) + from src.adsb.compress_adsb_to_aircraft_data import concat_compressed_dfs + df_final = concat_compressed_dfs(df_latest_csv, df) + df_final = df_final.select(CORRECT_ORDER_OF_COLUMNS) + final_csv_output_path = OUTPUT_DIR / f"openairframes_adsb_{csv_date}_{args.date}.csv.gz" + df_final.write_csv(final_csv_output_path, compression="gzip") + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/src/adsb/download_adsb_data_to_parquet.py b/src/adsb/download_adsb_data_to_parquet.py index 579a375..4dfe3eb 100644 --- a/src/adsb/download_adsb_data_to_parquet.py +++ b/src/adsb/download_adsb_data_to_parquet.py @@ -1,42 +1,33 @@ """ Downloads adsb.lol data and writes to Parquet files. -Usage: - python -m src.process_historical_adsb_data.download_to_parquet 2025-01-01 2025-01-02 - -This will download trace data for the specified date range and output Parquet files. - -This file is self-contained and does not import from other project modules. +This file contains utility functions for downloading and processing adsb.lol trace data. +Used by the historical ADS-B processing pipeline. 
""" -import gc -import glob +import datetime as dt import gzip +import os +import re import resource import shutil -import sys -import logging -import time -import re import signal -import concurrent.futures import subprocess -import os -import argparse -import datetime as dt -from datetime import datetime, timedelta, timezone -import urllib.request +import sys import urllib.error - +import urllib.request +from datetime import datetime +import time import orjson import pyarrow as pa import pyarrow.parquet as pq +from pathlib import Path # ============================================================================ # Configuration # ============================================================================ -OUTPUT_DIR = "./data/output" +OUTPUT_DIR = Path("./data/output") os.makedirs(OUTPUT_DIR, exist_ok=True) PARQUET_DIR = os.path.join(OUTPUT_DIR, "parquet_output") @@ -76,20 +67,16 @@ def timeout_handler(signum, frame): raise DownloadTimeoutException("Download timed out after 40 seconds") -def fetch_releases(version_date: str) -> list: - """Fetch GitHub releases for a given version date from adsblol.""" - year = version_date.split('.')[0][1:] - if version_date == "v2024.12.31": - year = "2025" +def _fetch_releases_from_repo(year: str, version_date: str) -> list: + """Fetch GitHub releases for a given version date from a specific year's adsblol repo.""" BASE_URL = f"https://api.github.com/repos/adsblol/globe_history_{year}/releases" - # Match exact release name, exclude tmp releases - PATTERN = rf"^{re.escape(version_date)}-planes-readsb-prod-\d+$" + PATTERN = rf"^{re.escape(version_date)}-planes-readsb-prod-\d+(tmp)?$" releases = [] page = 1 while True: max_retries = 10 - retry_delay = 60 + retry_delay = 60*5 for attempt in range(1, max_retries + 1): try: @@ -101,7 +88,7 @@ def fetch_releases(version_date: str) -> list: else: print(f"Failed to fetch releases (attempt {attempt}/{max_retries}): {response.status} {response.reason}") if attempt < max_retries: - 
print(f"Waiting {retry_delay} seconds before retry...") + print(f"Waiting {retry_delay} seconds before retry") time.sleep(retry_delay) else: print(f"Giving up after {max_retries} attempts") @@ -109,7 +96,7 @@ def fetch_releases(version_date: str) -> list: except Exception as e: print(f"Request exception (attempt {attempt}/{max_retries}): {e}") if attempt < max_retries: - print(f"Waiting {retry_delay} seconds before retry...") + print(f"Waiting {retry_delay} seconds before retry") time.sleep(retry_delay) else: print(f"Giving up after {max_retries} attempts") @@ -123,6 +110,25 @@ def fetch_releases(version_date: str) -> list: return releases +def fetch_releases(version_date: str) -> list: + """Fetch GitHub releases for a given version date from adsblol. + + For Dec 31 dates, if no releases are found in the current year's repo, + also checks the next year's repo (adsblol sometimes publishes Dec 31 + data in the following year's repository). + """ + year = version_date.split('.')[0][1:] + releases = _fetch_releases_from_repo(year, version_date) + + # For last day of year, also check next year's repo if nothing found + if not releases and version_date.endswith(".12.31"): + next_year = str(int(year) + 1) + print(f"No releases found for {version_date} in {year} repo, checking {next_year} repo") + releases = _fetch_releases_from_repo(next_year, version_date) + + return releases + + def download_asset(asset_url: str, file_path: str) -> bool: """Download a single release asset.""" os.makedirs(os.path.dirname(file_path) or OUTPUT_DIR, exist_ok=True) @@ -131,33 +137,58 @@ def download_asset(asset_url: str, file_path: str) -> bool: print(f"[SKIP] {file_path} already downloaded.") return True - print(f"Downloading {asset_url}...") - try: - signal.signal(signal.SIGALRM, timeout_handler) - signal.alarm(40) # 40-second timeout - - req = urllib.request.Request(asset_url, headers=HEADERS) - with urllib.request.urlopen(req) as response: - signal.alarm(0) - - if response.status == 200: 
- with open(file_path, "wb") as file: - while True: - chunk = response.read(8192) - if not chunk: - break - file.write(chunk) - print(f"Saved {file_path}") - return True + max_retries = 2 + retry_delay = 30 + timeout_seconds = 140 + + for attempt in range(1, max_retries + 1): + print(f"Downloading {asset_url} (attempt {attempt}/{max_retries})") + try: + req = urllib.request.Request(asset_url, headers=HEADERS) + with urllib.request.urlopen(req, timeout=timeout_seconds) as response: + if response.status == 200: + with open(file_path, "wb") as file: + while True: + chunk = response.read(8192) + if not chunk: + break + file.write(chunk) + print(f"Saved {file_path}") + return True + else: + print(f"Failed to download {asset_url}: {response.status} {response.msg}") + if attempt < max_retries: + print(f"Waiting {retry_delay} seconds before retry") + time.sleep(retry_delay) + else: + return False + except urllib.error.HTTPError as e: + if e.code == 404: + print(f"404 Not Found: {asset_url}") + raise Exception(f"Asset not found (404): {asset_url}") + else: + print(f"HTTP error occurred (attempt {attempt}/{max_retries}): {e.code} {e.reason}") + if attempt < max_retries: + print(f"Waiting {retry_delay} seconds before retry") + time.sleep(retry_delay) + else: + return False + except urllib.error.URLError as e: + print(f"URL/Timeout error (attempt {attempt}/{max_retries}): {e}") + if attempt < max_retries: + print(f"Waiting {retry_delay} seconds before retry") + time.sleep(retry_delay) else: - print(f"Failed to download {asset_url}: {response.status} {response.msg}") return False - except DownloadTimeoutException as e: - print(f"Download aborted for {asset_url}: {e}") - return False - except Exception as e: - print(f"An error occurred while downloading {asset_url}: {e}") - return False + except Exception as e: + print(f"An error occurred (attempt {attempt}/{max_retries}): {e}") + if attempt < max_retries: + print(f"Waiting {retry_delay} seconds before retry") + 
time.sleep(retry_delay) + else: + return False + + return False def extract_split_archive(file_paths: list, extract_dir: str) -> bool: @@ -389,8 +420,6 @@ COLUMNS = [ OS_CPU_COUNT = os.cpu_count() or 1 MAX_WORKERS = OS_CPU_COUNT if OS_CPU_COUNT > 4 else 1 -CHUNK_SIZE = MAX_WORKERS * 500 # Reduced for lower RAM usage -BATCH_SIZE = 250_000 # Fixed size for predictable memory usage (~500MB per batch) # PyArrow schema for efficient Parquet writing PARQUET_SCHEMA = pa.schema([ @@ -478,211 +507,6 @@ def collect_trace_files_with_find(root_dir): return trace_dict -def generate_version_dates(start_date: str, end_date: str) -> list: - """Generate a list of dates from start_date to end_date inclusive.""" - start = datetime.strptime(start_date, "%Y-%m-%d") - end = datetime.strptime(end_date, "%Y-%m-%d") - delta = end - start - return [start + timedelta(days=i) for i in range(delta.days + 1)] - - -def safe_process(fp): - """Safely process a file, returning empty list on error.""" - try: - return process_file(fp) - except Exception as e: - logging.error(f"Error processing {fp}: {e}") - return [] - - -def rows_to_arrow_table(rows: list) -> pa.Table: - """Convert list of rows to a PyArrow Table directly (no pandas).""" - # Transpose rows into columns - columns = list(zip(*rows)) - - # Build arrays for each column according to schema - arrays = [] - for i, field in enumerate(PARQUET_SCHEMA): - col_data = list(columns[i]) if i < len(columns) else [None] * len(rows) - arrays.append(pa.array(col_data, type=field.type)) - - return pa.Table.from_arrays(arrays, schema=PARQUET_SCHEMA) - - -def write_batch_to_parquet(rows: list, version_date: str, batch_idx: int): - """Write a batch of rows to a Parquet file.""" - if not rows: - return - - table = rows_to_arrow_table(rows) - - parquet_path = os.path.join(PARQUET_DIR, f"{version_date}_batch_{batch_idx:04d}.parquet") - - pq.write_table(table, parquet_path, compression='snappy') - - print(f"Written parquet batch {batch_idx} ({len(rows)} rows) 
| {get_resource_usage()}") - - -def merge_parquet_files(version_date: str, delete_batches: bool = True): - """Merge all batch parquet files for a version_date into a single file using streaming.""" - pattern = os.path.join(PARQUET_DIR, f"{version_date}_batch_*.parquet") - batch_files = sorted(glob.glob(pattern)) - - if not batch_files: - print(f"No batch files found for {version_date}") - return None - - print(f"Merging {len(batch_files)} batch files for {version_date} (streaming)...") - - merged_path = os.path.join(PARQUET_DIR, f"{version_date}.parquet") - total_rows = 0 - - # Stream write: read one batch at a time to minimize RAM usage - writer = None - try: - for i, f in enumerate(batch_files): - table = pq.read_table(f) - total_rows += table.num_rows - - if writer is None: - writer = pq.ParquetWriter(merged_path, table.schema, compression='snappy') - - writer.write_table(table) - - # Delete batch file immediately after reading to free disk space - if delete_batches: - os.remove(f) - - # Free memory - del table - if (i + 1) % 10 == 0: - gc.collect() - print(f" Merged {i + 1}/{len(batch_files)} batches... 
| {get_resource_usage()}") - finally: - if writer is not None: - writer.close() - - print(f"Merged parquet file written to {merged_path} ({total_rows} total rows) | {get_resource_usage()}") - - if delete_batches: - print(f"Deleted {len(batch_files)} batch files during merge") - - gc.collect() - return merged_path - - -def process_version_date(version_date: str, keep_folders: bool = False): - """Download, extract, and process trace files for a single version date.""" - print(f"\nProcessing version_date: {version_date}") - extract_dir = os.path.join(OUTPUT_DIR, f"{version_date}-planes-readsb-prod-0.tar_0") - - def collect_trace_files_for_version_date(vd): - releases = fetch_releases(vd) - if len(releases) == 0: - print(f"No releases found for {vd}.") - return None - - downloaded_files = [] - for release in releases: - tag_name = release["tag_name"] - print(f"Processing release: {tag_name}") - - # Only download prod-0 if available, else prod-0tmp - assets = release.get("assets", []) - normal_assets = [ - a for a in assets - if "planes-readsb-prod-0." in a["name"] and "tmp" not in a["name"] - ] - tmp_assets = [ - a for a in assets - if "planes-readsb-prod-0tmp" in a["name"] - ] - use_assets = normal_assets if normal_assets else tmp_assets - - for asset in use_assets: - asset_name = asset["name"] - asset_url = asset["browser_download_url"] - file_path = os.path.join(OUTPUT_DIR, asset_name) - result = download_asset(asset_url, file_path) - if result: - downloaded_files.append(file_path) - - extract_split_archive(downloaded_files, extract_dir) - return collect_trace_files_with_find(extract_dir) - - # Check if files already exist - pattern = os.path.join(OUTPUT_DIR, f"{version_date}-planes-readsb-prod-0*") - matches = [p for p in glob.glob(pattern) if os.path.isfile(p)] - - if matches: - print(f"Found existing files for {version_date}:") - # Prefer non-tmp slices when reusing existing files - normal_matches = [ - p for p in matches - if "-planes-readsb-prod-0." 
in os.path.basename(p) - and "tmp" not in os.path.basename(p) - ] - downloaded_files = normal_matches if normal_matches else matches - - extract_split_archive(downloaded_files, extract_dir) - trace_files = collect_trace_files_with_find(extract_dir) - else: - trace_files = collect_trace_files_for_version_date(version_date) - - if trace_files is None or len(trace_files) == 0: - print(f"No trace files found for version_date: {version_date}") - return 0 - - file_list = list(trace_files.values()) - - start_time = time.perf_counter() - total_num_rows = 0 - batch_rows = [] - batch_idx = 0 - - # Process files in chunks - for offset in range(0, len(file_list), CHUNK_SIZE): - chunk = file_list[offset:offset + CHUNK_SIZE] - with concurrent.futures.ProcessPoolExecutor(max_workers=MAX_WORKERS) as process_executor: - for rows in process_executor.map(safe_process, chunk): - if not rows: - continue - batch_rows.extend(rows) - - if len(batch_rows) >= BATCH_SIZE: - total_num_rows += len(batch_rows) - write_batch_to_parquet(batch_rows, version_date, batch_idx) - batch_idx += 1 - batch_rows = [] - - elapsed = time.perf_counter() - start_time - speed = total_num_rows / elapsed if elapsed > 0 else 0 - print(f"[{version_date}] processed {total_num_rows} rows in {elapsed:.2f}s ({speed:.2f} rows/s)") - - gc.collect() - - # Final batch - if batch_rows: - total_num_rows += len(batch_rows) - write_batch_to_parquet(batch_rows, version_date, batch_idx) - elapsed = time.perf_counter() - start_time - speed = total_num_rows / elapsed if elapsed > 0 else 0 - print(f"[{version_date}] processed {total_num_rows} rows in {elapsed:.2f}s ({speed:.2f} rows/s)") - - print(f"Total rows processed for version_date {version_date}: {total_num_rows}") - - # Clean up extracted directory immediately after processing (before merging parquet files) - if not keep_folders and os.path.isdir(extract_dir): - print(f"Deleting extraction directory with 100,000+ files: {extract_dir}") - shutil.rmtree(extract_dir) - 
print(f"Successfully deleted extraction directory: {extract_dir} | {get_resource_usage()}") - - # Merge batch files into a single parquet file - merge_parquet_files(version_date, delete_batches=True) - - return total_num_rows - - def create_parquet_for_day(day, keep_folders: bool = False): """Create parquet file for a single day. @@ -706,42 +530,10 @@ def create_parquet_for_day(day, keep_folders: bool = False): print(f"Parquet file already exists: {parquet_path}") return parquet_path - print(f"Creating parquet for {version_date}...") + print(f"Creating parquet for {version_date}") rows_processed = process_version_date(version_date, keep_folders) if rows_processed > 0 and parquet_path.exists(): return parquet_path else: return None - - -def main(start_date: str, end_date: str, keep_folders: bool = False): - """Main function to download and convert adsb.lol data to Parquet.""" - version_dates = [f"v{date.strftime('%Y.%m.%d')}" for date in generate_version_dates(start_date, end_date)] - print(f"Processing dates: {version_dates}") - - total_rows_all = 0 - for version_date in version_dates: - rows_processed = process_version_date(version_date, keep_folders) - total_rows_all += rows_processed - - print(f"\n=== Summary ===") - print(f"Total dates processed: {len(version_dates)}") - print(f"Total rows written to Parquet: {total_rows_all}") - print(f"Parquet files location: {PARQUET_DIR}") - - -if __name__ == "__main__": - logging.basicConfig(level=logging.INFO, stream=sys.stdout, force=True) - - parser = argparse.ArgumentParser( - description="Download adsb.lol data and write to Parquet files" - ) - parser.add_argument("start_date", type=str, help="Start date in YYYY-MM-DD format") - parser.add_argument("end_date", type=str, help="End date in YYYY-MM-DD format") - parser.add_argument("--keep-folders", action="store_true", - help="Keep extracted folders after processing") - - args = parser.parse_args() - - main(args.start_date, args.end_date, args.keep_folders) diff --git 
a/src/adsb/download_and_list_icaos.py b/src/adsb/download_and_list_icaos.py index 8893f24..fb12377 100644 --- a/src/adsb/download_and_list_icaos.py +++ b/src/adsb/download_and_list_icaos.py @@ -1,9 +1,7 @@ """ -Downloads and extracts adsb.lol tar files, then lists all ICAO folders. +Downloads and extracts adsb.lol tar files for a single day, then lists all ICAO folders. This is the first step of the map-reduce pipeline. -Supports both single-day (daily) and multi-day (historical) modes. - Outputs: - Extracted trace files in data/output/{version_date}-planes-readsb-prod-0.tar_0/ - ICAO manifest at data/output/icao_manifest_{date}.txt @@ -25,11 +23,6 @@ from src.adsb.download_adsb_data_to_parquet import ( ) -def get_target_day() -> datetime: - """Get yesterday's date (the day we're processing).""" - return datetime.utcnow() - timedelta(days=1) - - def download_and_extract(version_date: str) -> str | None: """Download and extract tar files, return extract directory path.""" extract_dir = os.path.join(OUTPUT_DIR, f"{version_date}-planes-readsb-prod-0.tar_0") @@ -59,6 +52,12 @@ def download_and_extract(version_date: str) -> str | None: print(f"No releases found for {version_date}") return None + # Prefer non-tmp releases; only use tmp if no normal releases exist + normal_releases = [r for r in releases if "tmp" not in r["tag_name"]] + tmp_releases = [r for r in releases if "tmp" in r["tag_name"]] + releases = normal_releases if normal_releases else tmp_releases + print(f"Using {'normal' if normal_releases else 'tmp'} releases ({len(releases)} found)") + downloaded_files = [] for release in releases: tag_name = release["tag_name"] @@ -100,21 +99,6 @@ def list_icao_folders(extract_dir: str) -> list[str]: return icaos -def write_manifest(icaos: list[str], manifest_id: str) -> str: - """Write ICAO list to manifest file. 
- - Args: - icaos: List of ICAO codes - manifest_id: Identifier for manifest file (date or date range) - """ - manifest_path = os.path.join(OUTPUT_DIR, f"icao_manifest_{manifest_id}.txt") - with open(manifest_path, "w") as f: - for icao in sorted(icaos): - f.write(f"{icao}\n") - print(f"Wrote manifest with {len(icaos)} ICAOs to {manifest_path}") - return manifest_path - - def process_single_day(target_day: datetime) -> tuple[str | None, list[str]]: """Process a single day: download, extract, list ICAOs. @@ -129,82 +113,50 @@ def process_single_day(target_day: datetime) -> tuple[str | None, list[str]]: extract_dir = download_and_extract(version_date) if not extract_dir: print(f"Failed to download/extract data for {date_str}") - return None, [] + raise Exception(f"No data available for {date_str}") icaos = list_icao_folders(extract_dir) print(f"Found {len(icaos)} ICAOs for {date_str}") return extract_dir, icaos - -def process_date_range(start_date: datetime, end_date: datetime) -> set[str]: - """Process multiple days: download, extract, combine ICAO lists. 
- - Args: - start_date: Start date (inclusive) - end_date: End date (inclusive) - - Returns: - Combined set of all ICAOs across the date range - """ - all_icaos: set[str] = set() - current = start_date - - # Both start and end are inclusive - while current <= end_date: - _, icaos = process_single_day(current) - all_icaos.update(icaos) - current += timedelta(days=1) - - return all_icaos +from pathlib import Path +import tarfile +NUMBER_PARTS = 4 +def split_folders_into_gzip_archives(extract_dir: Path, tar_output_dir: Path, icaos: list[str], parts = NUMBER_PARTS) -> list[str]: + traces_dir = extract_dir / "traces" + buckets = sorted(traces_dir.iterdir()) + tars = [] + for i in range(parts): + tar_path = tar_output_dir / f"{tar_output_dir.name}_part_{i}.tar.gz" + tars.append(tarfile.open(tar_path, "w:gz")) + for idx, bucket_path in enumerate(buckets): + tar_idx = idx % parts + tars[tar_idx].add(bucket_path, arcname=bucket_path.name) + for tar in tars: + tar.close() def main(): - parser = argparse.ArgumentParser(description="Download and list ICAOs from adsb.lol data") + parser = argparse.ArgumentParser(description="Download and list ICAOs from adsb.lol data for a single day") parser.add_argument("--date", type=str, help="Single date in YYYY-MM-DD format (default: yesterday)") - parser.add_argument("--start-date", type=str, help="Start date for range (YYYY-MM-DD)") - parser.add_argument("--end-date", type=str, help="End date for range (YYYY-MM-DD)") args = parser.parse_args() - # Determine mode: single day or date range - if args.start_date and args.end_date: - # Historical mode: process date range - start_date = datetime.strptime(args.start_date, "%Y-%m-%d") - end_date = datetime.strptime(args.end_date, "%Y-%m-%d") - - print(f"Processing date range: {args.start_date} to {args.end_date}") - - all_icaos = process_date_range(start_date, end_date) - - if not all_icaos: - print("No ICAOs found in date range") - sys.exit(1) - - # Write combined manifest with range 
identifier - manifest_id = f"{args.start_date}_{args.end_date}" - write_manifest(list(all_icaos), manifest_id) - - print(f"\nDone! Total ICAOs: {len(all_icaos)}") - - else: - # Daily mode: single day - if args.date: - target_day = datetime.strptime(args.date, "%Y-%m-%d") - else: - target_day = get_target_day() - - date_str = target_day.strftime("%Y-%m-%d") - - extract_dir, icaos = process_single_day(target_day) - - if not icaos: - print("No ICAOs found") - sys.exit(1) - - write_manifest(icaos, date_str) - - print(f"\nDone! Extract dir: {extract_dir}") - print(f"Total ICAOs: {len(icaos)}") + target_day = datetime.strptime(args.date, "%Y-%m-%d") + date_str = target_day.strftime("%Y-%m-%d") + tar_output_dir = Path(f"./data/output/adsb_archives/{date_str}") + + extract_dir, icaos = process_single_day(target_day) + extract_dir = Path(extract_dir) + print(extract_dir) + tar_output_dir.mkdir(parents=True, exist_ok=True) + split_folders_into_gzip_archives(extract_dir, tar_output_dir, icaos) + if not icaos: + print("No ICAOs found") + sys.exit(1) + + print(f"\nDone! Extract dir: {extract_dir}") + print(f"Total ICAOs: {len(icaos)}") if __name__ == "__main__": diff --git a/src/adsb/historical_generate_matrix.py b/src/adsb/historical_generate_matrix.py index cd11789..ef28e4f 100644 --- a/src/adsb/historical_generate_matrix.py +++ b/src/adsb/historical_generate_matrix.py @@ -41,7 +41,7 @@ def main() -> None: """Main entry point for GitHub Actions.""" start_date = os.environ.get("INPUT_START_DATE") end_date = os.environ.get("INPUT_END_DATE") - chunk_days = int(os.environ.get("INPUT_CHUNK_DAYS", "7")) + chunk_days = int(os.environ.get("INPUT_CHUNK_DAYS", "1")) if not start_date or not end_date: print("ERROR: INPUT_START_DATE and INPUT_END_DATE must be set", file=sys.stderr) diff --git a/src/adsb/main.py b/src/adsb/main.py new file mode 100644 index 0000000..08b4412 --- /dev/null +++ b/src/adsb/main.py @@ -0,0 +1,78 @@ +""" +Main pipeline for processing ADS-B data from adsb.lol. 
+ +Usage: + python -m src.adsb.main --date 2026-01-01 + python -m src.adsb.main --start_date 2026-01-01 --end_date 2026-01-03 +""" +import argparse +import subprocess +import sys +from datetime import datetime, timedelta + +import polars as pl + +from src.adsb.download_and_list_icaos import NUMBER_PARTS + + +def main(): + parser = argparse.ArgumentParser(description="Process ADS-B data for a single day or date range") + parser.add_argument("--date", type=str, help="Single date in YYYY-MM-DD format") + parser.add_argument("--start_date", type=str, help="Start date (inclusive, YYYY-MM-DD)") + parser.add_argument("--end_date", type=str, help="End date (exclusive, YYYY-MM-DD)") + parser.add_argument("--concat_with_latest_csv", action="store_true", help="Also concatenate with latest CSV from GitHub releases") + args = parser.parse_args() + + if args.date and (args.start_date or args.end_date): + raise SystemExit("Use --date or --start_date/--end_date, not both.") + + if args.date: + start_date = datetime.strptime(args.date, "%Y-%m-%d") + end_date = start_date + timedelta(days=1) + else: + if not args.start_date or not args.end_date: + raise SystemExit("Provide --start_date and --end_date, or use --date.") + start_date = datetime.strptime(args.start_date, "%Y-%m-%d") + end_date = datetime.strptime(args.end_date, "%Y-%m-%d") + + current = start_date + while current < end_date: + date_str = current.strftime("%Y-%m-%d") + print(f"Processing day: {date_str}") + + # Download and split + subprocess.run([sys.executable, "-m", "src.adsb.download_and_list_icaos", "--date", date_str], check=True) + + # Process parts + for part_id in range(NUMBER_PARTS): + subprocess.run([sys.executable, "-m", "src.adsb.process_icao_chunk", "--part-id", str(part_id), "--date", date_str], check=True) + + # Concatenate + concat_cmd = [sys.executable, "-m", "src.adsb.concat_parquet_to_final", "--date", date_str] + if args.concat_with_latest_csv: + concat_cmd.append("--concat_with_latest_csv") + 
subprocess.run(concat_cmd, check=True) + + current += timedelta(days=1) + + if end_date - start_date > timedelta(days=1): + dates = [] + cur = start_date + while cur < end_date: + dates.append(cur.strftime("%Y-%m-%d")) + cur += timedelta(days=1) + csv_files = [ + f"data/outputs/openairframes_adsb_{d}_{d}.csv" + for d in dates + ] + frames = [pl.read_csv(p) for p in csv_files] + df = pl.concat(frames, how="vertical", rechunk=True) + output_path = f"data/outputs/openairframes_adsb_{start_date.strftime('%Y-%m-%d')}_{end_date.strftime('%Y-%m-%d')}.csv" + df.write_csv(output_path) + print(f"Wrote combined CSV: {output_path}") + + print("Done") + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/src/adsb/process_icao_chunk.py b/src/adsb/process_icao_chunk.py index e67673f..0687652 100644 --- a/src/adsb/process_icao_chunk.py +++ b/src/adsb/process_icao_chunk.py @@ -1,18 +1,9 @@ """ -Processes a chunk of ICAOs from pre-extracted trace files. +Processes trace files from a single archive part for a single day. This is the map phase of the map-reduce pipeline. -Supports both single-day (daily) and multi-day (historical) modes. - -Expects extract_dir to already exist with trace files. -Reads ICAO manifest to determine which ICAOs to process based on chunk-id. 
- Usage: - # Daily mode (single day) - python -m src.adsb.process_icao_chunk --chunk-id 0 --total-chunks 4 - - # Historical mode (date range) - python -m src.adsb.process_icao_chunk --chunk-id 0 --total-chunks 4 --start-date 2024-01-01 --end-date 2024-01-07 + python -m src.adsb.process_icao_chunk --part-id 1 --date 2026-01-01 """ import gc import os @@ -21,6 +12,9 @@ import argparse import time import concurrent.futures from datetime import datetime, timedelta +import tarfile +import tempfile +import shutil import pyarrow as pa import pyarrow.parquet as pq @@ -37,72 +31,21 @@ from src.adsb.download_adsb_data_to_parquet import ( ) -CHUNK_OUTPUT_DIR = os.path.join(OUTPUT_DIR, "adsb_chunks") -os.makedirs(CHUNK_OUTPUT_DIR, exist_ok=True) - # Smaller batch size for memory efficiency BATCH_SIZE = 100_000 - -def get_target_day() -> datetime: - """Get yesterday's date (the day we're processing).""" - return datetime.utcnow() - timedelta(days=1) - - -def read_manifest(manifest_id: str) -> list[str]: - """Read ICAO manifest file. 
+def build_trace_file_map(archive_path: str) -> dict[str, str]: + """Build a map of ICAO -> trace file path by extracting tar.gz archive.""" + print(f"Extracting {archive_path}...") - Args: - manifest_id: Either a date string (YYYY-MM-DD) or range string (YYYY-MM-DD_YYYY-MM-DD) - """ - manifest_path = os.path.join(OUTPUT_DIR, f"icao_manifest_{manifest_id}.txt") - if not os.path.exists(manifest_path): - raise FileNotFoundError(f"Manifest not found: {manifest_path}") + temp_dir = tempfile.mkdtemp(prefix="adsb_extract_") - with open(manifest_path, "r") as f: - icaos = [line.strip() for line in f if line.strip()] - return icaos - - -def deterministic_hash(s: str) -> int: - """Return a deterministic hash for a string (unlike Python's hash() which is randomized).""" - # Use sum of byte values - simple but deterministic - return sum(ord(c) for c in s) - - -def get_chunk_icaos(icaos: list[str], chunk_id: int, total_chunks: int) -> list[str]: - """Get the subset of ICAOs for this chunk based on deterministic hash partitioning.""" - return [icao for icao in icaos if deterministic_hash(icao) % total_chunks == chunk_id] - - -def build_trace_file_map(extract_dir: str) -> dict[str, str]: - """Build a map of ICAO -> trace file path using find command.""" - print(f"Building trace file map from {extract_dir}...") + with tarfile.open(archive_path, 'r:gz') as tar: + tar.extractall(path=temp_dir, filter='data') - # Debug: check what's in extract_dir - if os.path.isdir(extract_dir): - items = os.listdir(extract_dir)[:10] - print(f"First 10 items in extract_dir: {items}") - # Check if there are subdirectories - for item in items[:3]: - subpath = os.path.join(extract_dir, item) - if os.path.isdir(subpath): - subitems = os.listdir(subpath)[:5] - print(f" Contents of {item}/: {subitems}") - - trace_map = collect_trace_files_with_find(extract_dir) + trace_map = collect_trace_files_with_find(temp_dir) print(f"Found {len(trace_map)} trace files") - if len(trace_map) == 0: - # Debug: try 
manual find - import subprocess - result = subprocess.run( - ['find', extract_dir, '-type', 'f', '-name', 'trace_full_*'], - capture_output=True, text=True - ) - print(f"Manual find output (first 500 chars): {result.stdout[:500]}") - print(f"Manual find stderr: {result.stderr[:200]}") - return trace_map @@ -125,42 +68,13 @@ def rows_to_table(rows: list) -> pa.Table: def process_chunk( - chunk_id: int, - total_chunks: int, - trace_map: dict[str, str], - icaos: list[str], - output_id: str, + trace_files: list[str], + part_id: int, + date_str: str, ) -> str | None: - """Process a chunk of ICAOs and write to parquet. + """Process trace files and write to a single parquet file.""" - Args: - chunk_id: This chunk's ID (0-indexed) - total_chunks: Total number of chunks - trace_map: Map of ICAO -> trace file path - icaos: Full list of ICAOs from manifest - output_id: Identifier for output file (date or date range) - """ - chunk_icaos = get_chunk_icaos(icaos, chunk_id, total_chunks) - print(f"Chunk {chunk_id}/{total_chunks}: Processing {len(chunk_icaos)} ICAOs") - - if not chunk_icaos: - print(f"Chunk {chunk_id}: No ICAOs to process") - return None - - # Get trace file paths from the map - trace_files = [] - for icao in chunk_icaos: - if icao in trace_map: - trace_files.append(trace_map[icao]) - - print(f"Chunk {chunk_id}: Found {len(trace_files)} trace files") - - if not trace_files: - print(f"Chunk {chunk_id}: No trace files found") - return None - - # Process files and write parquet in batches - output_path = os.path.join(CHUNK_OUTPUT_DIR, f"chunk_{chunk_id}_{output_id}.parquet") + output_path = os.path.join(PARQUET_DIR, f"part_{part_id}_{date_str}.parquet") start_time = time.perf_counter() total_rows = 0 @@ -168,7 +82,8 @@ def process_chunk( writer = None try: - # Process in parallel batches + writer = pq.ParquetWriter(output_path, PARQUET_SCHEMA, compression='snappy') + files_per_batch = MAX_WORKERS * 100 for offset in range(0, len(trace_files), files_per_batch): 
batch_files = trace_files[offset:offset + files_per_batch] @@ -178,166 +93,63 @@ def process_chunk( if rows: batch_rows.extend(rows) - # Write when batch is full if len(batch_rows) >= BATCH_SIZE: - table = rows_to_table(batch_rows) + writer.write_table(rows_to_table(batch_rows)) total_rows += len(batch_rows) - - if writer is None: - writer = pq.ParquetWriter(output_path, PARQUET_SCHEMA, compression='snappy') - writer.write_table(table) - batch_rows = [] - del table gc.collect() - - elapsed = time.perf_counter() - start_time - print(f"Chunk {chunk_id}: {total_rows} rows, {elapsed:.1f}s | {get_resource_usage()}") - gc.collect() - # Write remaining rows if batch_rows: - table = rows_to_table(batch_rows) + writer.write_table(rows_to_table(batch_rows)) total_rows += len(batch_rows) - - if writer is None: - writer = pq.ParquetWriter(output_path, PARQUET_SCHEMA, compression='snappy') - writer.write_table(table) - del table finally: if writer: writer.close() - elapsed = time.perf_counter() - start_time - print(f"Chunk {chunk_id}: Done! {total_rows} rows in {elapsed:.1f}s | {get_resource_usage()}") + print(f"Part {part_id}: Done! 
{total_rows} rows in {time.perf_counter() - start_time:.1f}s | {get_resource_usage()}") - if total_rows > 0: - return output_path - return None - - -def process_single_day( - chunk_id: int, - total_chunks: int, - target_day: datetime, -) -> str | None: - """Process a single day for this chunk.""" - date_str = target_day.strftime("%Y-%m-%d") - version_date = f"v{target_day.strftime('%Y.%m.%d')}" - - extract_dir = os.path.join(OUTPUT_DIR, f"{version_date}-planes-readsb-prod-0.tar_0") - - if not os.path.isdir(extract_dir): - print(f"Extract directory not found: {extract_dir}") - return None - - trace_map = build_trace_file_map(extract_dir) - if not trace_map: - print("No trace files found") - return None - - icaos = read_manifest(date_str) - print(f"Total ICAOs in manifest: {len(icaos)}") - - return process_chunk(chunk_id, total_chunks, trace_map, icaos, date_str) - - -def process_date_range( - chunk_id: int, - total_chunks: int, - start_date: datetime, - end_date: datetime, -) -> str | None: - """Process a date range for this chunk. - - Combines trace files from all days in the range. 
- - Args: - chunk_id: This chunk's ID (0-indexed) - total_chunks: Total number of chunks - start_date: Start date (inclusive) - end_date: End date (inclusive) - """ - start_str = start_date.strftime("%Y-%m-%d") - end_str = end_date.strftime("%Y-%m-%d") - manifest_id = f"{start_str}_{end_str}" - - print(f"Processing date range: {start_str} to {end_str}") - - # Build combined trace map from all days - combined_trace_map: dict[str, str] = {} - current = start_date - - # Both start and end are inclusive - while current <= end_date: - version_date = f"v{current.strftime('%Y.%m.%d')}" - extract_dir = os.path.join(OUTPUT_DIR, f"{version_date}-planes-readsb-prod-0.tar_0") - - if os.path.isdir(extract_dir): - trace_map = build_trace_file_map(extract_dir) - # Later days override earlier days (use most recent trace file) - combined_trace_map.update(trace_map) - print(f" {current.strftime('%Y-%m-%d')}: {len(trace_map)} trace files") - else: - print(f" {current.strftime('%Y-%m-%d')}: no extract directory") - - current += timedelta(days=1) - - if not combined_trace_map: - print("No trace files found in date range") - return None - - print(f"Combined trace map: {len(combined_trace_map)} ICAOs") - - icaos = read_manifest(manifest_id) - print(f"Total ICAOs in manifest: {len(icaos)}") - - return process_chunk(chunk_id, total_chunks, combined_trace_map, icaos, manifest_id) + return output_path if total_rows > 0 else None +from pathlib import Path def main(): - parser = argparse.ArgumentParser(description="Process a chunk of ICAOs") - parser.add_argument("--chunk-id", type=int, required=True, help="Chunk ID (0-indexed)") - parser.add_argument("--total-chunks", type=int, required=True, help="Total number of chunks") - parser.add_argument("--date", type=str, help="Single date in YYYY-MM-DD format (default: yesterday)") - parser.add_argument("--start-date", type=str, help="Start date for range (YYYY-MM-DD)") - parser.add_argument("--end-date", type=str, help="End date for range 
(YYYY-MM-DD)") + parser = argparse.ArgumentParser(description="Process a single archive part for a day") + parser.add_argument("--part-id", type=int, required=True, help="Part ID (1-indexed)") + parser.add_argument("--date", type=str, required=True, help="Date in YYYY-MM-DD format") args = parser.parse_args() - print(f"Processing chunk {args.chunk_id}/{args.total_chunks}") - print(f"OUTPUT_DIR: {OUTPUT_DIR}") - print(f"CHUNK_OUTPUT_DIR: {CHUNK_OUTPUT_DIR}") - print(f"Resource usage at start: {get_resource_usage()}") + print(f"Processing part {args.part_id} for {args.date}") - # Debug: List what's in OUTPUT_DIR - print(f"\nContents of {OUTPUT_DIR}:") - if os.path.isdir(OUTPUT_DIR): - for item in os.listdir(OUTPUT_DIR)[:20]: - print(f" - {item}") - else: - print(f" Directory does not exist!") + # Get specific archive file for this part + archive_path = os.path.join(OUTPUT_DIR, "adsb_archives", args.date, f"{args.date}_part_{args.part_id}.tar.gz") - # Determine mode: single day or date range - if args.start_date and args.end_date: - # Historical mode - start_date = datetime.strptime(args.start_date, "%Y-%m-%d") - end_date = datetime.strptime(args.end_date, "%Y-%m-%d") - output_path = process_date_range(args.chunk_id, args.total_chunks, start_date, end_date) - else: - # Daily mode - if args.date: - target_day = datetime.strptime(args.date, "%Y-%m-%d") - else: - target_day = get_target_day() - output_path = process_single_day(args.chunk_id, args.total_chunks, target_day) + # Extract and collect trace files + trace_map = build_trace_file_map(archive_path) + all_trace_files = list(trace_map.values()) - if output_path: - print(f"Output: {output_path}") - else: - print("No output generated") + print(f"Total trace files: {len(all_trace_files)}") + + # Process and write output + output_path = process_chunk(all_trace_files, args.part_id, args.date) + + from src.adsb.compress_adsb_to_aircraft_data import compress_parquet_part + df_compressed = 
compress_parquet_part(args.part_id, args.date) + + # Write parquet + df_compressed_output = OUTPUT_DIR / "compressed" / args.date/ f"part_{args.part_id}_{args.date}.parquet" + os.makedirs(df_compressed_output.parent, exist_ok=True) + df_compressed.write_parquet(df_compressed_output, compression='snappy') + + # Write CSV + csv_output = OUTPUT_DIR / "compressed" / args.date / f"part_{args.part_id}_{args.date}.csv" + df_compressed.write_csv(csv_output) + + print(f"Raw output: {output_path}" if output_path else "No raw output generated") + print(f"Compressed parquet: {df_compressed_output}") + print(f"Compressed CSV: {csv_output}") if __name__ == "__main__": - main() + main() \ No newline at end of file diff --git a/src/adsb/reducer.py b/src/adsb/reducer.py deleted file mode 100644 index 9dcdb91..0000000 --- a/src/adsb/reducer.py +++ /dev/null @@ -1,97 +0,0 @@ -""" -Reduce step: downloads all chunk CSVs from S3, combines them, -deduplicates across the full dataset, and uploads the final result. 
- -Environment variables: - S3_BUCKET — bucket with intermediate results - RUN_ID — run identifier matching the map workers - GLOBAL_START_DATE — overall start date for output filename - GLOBAL_END_DATE — overall end date for output filename -""" -import gzip -import os -import shutil -from pathlib import Path - -import boto3 -import polars as pl - -from compress_adsb_to_aircraft_data import COLUMNS, deduplicate_by_signature - - -def main(): - s3_bucket = os.environ["S3_BUCKET"] - run_id = os.environ.get("RUN_ID", "default") - global_start = os.environ["GLOBAL_START_DATE"] - global_end = os.environ["GLOBAL_END_DATE"] - - s3 = boto3.client("s3") - prefix = f"intermediate/{run_id}/" - - # List all chunk files for this run - paginator = s3.get_paginator("list_objects_v2") - chunk_keys = [] - for page in paginator.paginate(Bucket=s3_bucket, Prefix=prefix): - for obj in page.get("Contents", []): - if obj["Key"].endswith(".csv.gz"): - chunk_keys.append(obj["Key"]) - - chunk_keys.sort() - print(f"Found {len(chunk_keys)} chunks to combine") - - if not chunk_keys: - print("No chunks found — nothing to reduce.") - return - - # Download and concatenate all chunks - download_dir = Path("/tmp/chunks") - download_dir.mkdir(parents=True, exist_ok=True) - - dfs = [] - - for key in chunk_keys: - gz_path = download_dir / Path(key).name - csv_path = gz_path.with_suffix("") # Remove .gz - print(f"Downloading {key}...") - s3.download_file(s3_bucket, key, str(gz_path)) - - # Decompress - with gzip.open(gz_path, 'rb') as f_in: - with open(csv_path, 'wb') as f_out: - shutil.copyfileobj(f_in, f_out) - gz_path.unlink() - - df_chunk = pl.read_csv(csv_path) - print(f" Loaded {df_chunk.height} rows from {csv_path.name}") - dfs.append(df_chunk) - - # Free disk space after loading - csv_path.unlink() - - df_accumulated = pl.concat(dfs) if dfs else pl.DataFrame() - print(f"Combined: {df_accumulated.height} rows before dedup") - - # Final global deduplication - df_accumulated = 
deduplicate_by_signature(df_accumulated) - print(f"After dedup: {df_accumulated.height} rows") - - # Write and upload final result - output_name = f"openairframes_adsb_{global_start}_{global_end}.csv.gz" - csv_output = Path(f"/tmp/openairframes_adsb_{global_start}_{global_end}.csv") - gz_output = Path(f"/tmp/{output_name}") - - df_accumulated.write_csv(csv_output) - with open(csv_output, 'rb') as f_in: - with gzip.open(gz_output, 'wb') as f_out: - shutil.copyfileobj(f_in, f_out) - csv_output.unlink() - - final_key = f"final/{output_name}" - print(f"Uploading to s3://{s3_bucket}/{final_key}") - s3.upload_file(str(gz_output), s3_bucket, final_key) - - print(f"Final output: {df_accumulated.height} records -> {final_key}") - - -if __name__ == "__main__": - main() diff --git a/src/adsb/requirements.reducer.txt b/src/adsb/requirements.reducer.txt deleted file mode 100644 index 29e6bf9..0000000 --- a/src/adsb/requirements.reducer.txt +++ /dev/null @@ -1,2 +0,0 @@ -polars>=1.0 -boto3>=1.34 diff --git a/src/adsb/requirements.worker.txt b/src/adsb/requirements.worker.txt deleted file mode 100644 index cf305a7..0000000 --- a/src/adsb/requirements.worker.txt +++ /dev/null @@ -1,5 +0,0 @@ -polars>=1.0 -pyarrow>=14.0 -orjson>=3.9 -boto3>=1.34 -zstandard>=0.22 diff --git a/src/adsb/worker.py b/src/adsb/worker.py deleted file mode 100644 index 9884ce7..0000000 --- a/src/adsb/worker.py +++ /dev/null @@ -1,89 +0,0 @@ -""" -Map worker: processes a date range chunk, uploads result to S3. 
- -Environment variables: - START_DATE — inclusive, YYYY-MM-DD - END_DATE — exclusive, YYYY-MM-DD - S3_BUCKET — bucket for intermediate results - RUN_ID — unique run identifier for namespacing S3 keys -""" -import os -import sys -from datetime import datetime, timedelta -from pathlib import Path - -import boto3 -import polars as pl - -from compress_adsb_to_aircraft_data import ( - load_historical_for_day, - deduplicate_by_signature, - COLUMNS, -) - - -def main(): - start_date_str = os.environ["START_DATE"] - end_date_str = os.environ["END_DATE"] - s3_bucket = os.environ["S3_BUCKET"] - run_id = os.environ.get("RUN_ID", "default") - - start_date = datetime.strptime(start_date_str, "%Y-%m-%d") - end_date = datetime.strptime(end_date_str, "%Y-%m-%d") - - total_days = (end_date - start_date).days - print(f"Worker: processing {total_days} days [{start_date_str}, {end_date_str})") - - dfs = [] - current_date = start_date - - while current_date < end_date: - day_str = current_date.strftime("%Y-%m-%d") - print(f" Loading {day_str}...") - - df_compressed = load_historical_for_day(current_date) - if df_compressed.height == 0: - raise RuntimeError(f"No data found for {day_str}") - - dfs.append(df_compressed) - total_rows = sum(df.height for df in dfs) - print(f" +{df_compressed.height} rows (total: {total_rows})") - - # Delete local cache after each day to save disk in container - cache_dir = Path("data/adsb") - if cache_dir.exists(): - import shutil - shutil.rmtree(cache_dir) - - current_date += timedelta(days=1) - - # Concatenate all days - df_accumulated = pl.concat(dfs) if dfs else pl.DataFrame() - - # Deduplicate within this chunk - df_accumulated = deduplicate_by_signature(df_accumulated) - print(f"After dedup: {df_accumulated.height} rows") - - # Write to local file then upload to S3 - local_path = Path(f"/tmp/chunk_{start_date_str}_{end_date_str}.csv") - df_accumulated.write_csv(local_path) - - # Compress with gzip - import gzip - import shutil - gz_path = 
Path(f"/tmp/chunk_{start_date_str}_{end_date_str}.csv.gz") - with open(local_path, 'rb') as f_in: - with gzip.open(gz_path, 'wb') as f_out: - shutil.copyfileobj(f_in, f_out) - local_path.unlink() # Remove uncompressed file - - s3_key = f"intermediate/{run_id}/chunk_{start_date_str}_{end_date_str}.csv.gz" - print(f"Uploading to s3://{s3_bucket}/{s3_key}") - - s3 = boto3.client("s3") - s3.upload_file(str(gz_path), s3_bucket, s3_key) - print("Done.") - - -if __name__ == "__main__": - main() diff --git a/src/contributions/create_daily_adsbexchange_release.py b/src/contributions/create_daily_adsbexchange_release.py new file mode 100644 index 0000000..1693d05 --- /dev/null +++ b/src/contributions/create_daily_adsbexchange_release.py @@ -0,0 +1,40 @@ +#!/usr/bin/env python3 +""" +Download ADS-B Exchange basic-ac-db.json.gz. + +Usage: + python -m src.contributions.create_daily_adsbexchange_release [--date YYYY-MM-DD] +""" +from __future__ import annotations + +import argparse +import shutil +from datetime import datetime, timezone +from pathlib import Path +from urllib.request import Request, urlopen + +URL = "https://downloads.adsbexchange.com/downloads/basic-ac-db.json.gz" +OUT_ROOT = Path("data/openairframes") + + +def main() -> None: + parser = argparse.ArgumentParser(description="Create daily ADS-B Exchange JSON release") + parser.add_argument("--date", type=str, help="Date to process (YYYY-MM-DD format, default: today UTC)") + args = parser.parse_args() + + date_str = args.date or datetime.now(timezone.utc).strftime("%Y-%m-%d") + + OUT_ROOT.mkdir(parents=True, exist_ok=True) + + gz_path = OUT_ROOT / f"basic-ac-db_{date_str}.json.gz" + + print(f"Downloading {URL}...") + req = Request(URL, headers={"User-Agent": "openairframes-downloader/1.0"}, method="GET") + with urlopen(req, timeout=300) as r, gz_path.open("wb") as f: + shutil.copyfileobj(r, f) + + print(f"Wrote: {gz_path}") + + +if __name__ == "__main__": + main() diff --git 
a/src/contributions/create_daily_microtonics_release.py b/src/contributions/create_daily_microtonics_release.py new file mode 100644 index 0000000..007b4e4 --- /dev/null +++ b/src/contributions/create_daily_microtonics_release.py @@ -0,0 +1,55 @@ +#!/usr/bin/env python3 +""" +Download Mictronics aircraft database zip. + +Usage: + python -m src.contributions.create_daily_microtonics_release [--date YYYY-MM-DD] +""" +from __future__ import annotations + +import argparse +import shutil +import sys +import time +from datetime import datetime, timezone +from pathlib import Path +from urllib.error import URLError +from urllib.request import Request, urlopen + +URL = "https://www.mictronics.de/aircraft-database/indexedDB_old.php" +OUT_ROOT = Path("data/openairframes") +MAX_RETRIES = 3 +RETRY_DELAY = 30 # seconds + + +def main() -> None: + parser = argparse.ArgumentParser(description="Create daily Mictronics database release") + parser.add_argument("--date", type=str, help="Date to process (YYYY-MM-DD format, default: today UTC)") + args = parser.parse_args() + + date_str = args.date or datetime.now(timezone.utc).strftime("%Y-%m-%d") + + OUT_ROOT.mkdir(parents=True, exist_ok=True) + + zip_path = OUT_ROOT / f"mictronics-db_{date_str}.zip" + + for attempt in range(1, MAX_RETRIES + 1): + try: + print(f"Downloading {URL} (attempt {attempt}/{MAX_RETRIES})...") + req = Request(URL, headers={"User-Agent": "Mozilla/5.0 (compatible; openairframes-downloader/1.0)"}, method="GET") + with urlopen(req, timeout=120) as r, zip_path.open("wb") as f: + shutil.copyfileobj(r, f) + print(f"Wrote: {zip_path}") + return + except (URLError, TimeoutError) as e: + print(f"Attempt {attempt} failed: {e}") + if attempt < MAX_RETRIES: + print(f"Retrying in {RETRY_DELAY} seconds...") + time.sleep(RETRY_DELAY) + else: + print("All retries exhausted. 
Mictronics download failed.") + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/src/create_daily_adsb_release.py b/src/create_daily_adsb_release.py deleted file mode 100644 index 0a5137e..0000000 --- a/src/create_daily_adsb_release.py +++ /dev/null @@ -1,84 +0,0 @@ -from pathlib import Path -from datetime import datetime, timezone, timedelta -import sys - -import polars as pl - -# Add adsb directory to path -sys.path.insert(0, str(Path(__file__).parent / "adsb")) # TODO: Fix this hacky path manipulation - -from adsb.compress_adsb_to_aircraft_data import ( - load_historical_for_day, - concat_compressed_dfs, - get_latest_aircraft_adsb_csv_df, -) - -if __name__ == '__main__': - # Get yesterday's date (data for the previous day) - day = datetime.now(timezone.utc) - timedelta(days=1) - - # Find a day with complete data - max_attempts = 2 # Don't look back more than a week - for attempt in range(max_attempts): - date_str = day.strftime("%Y-%m-%d") - print(f"Processing ADS-B data for {date_str}") - - print("Loading new ADS-B data...") - df_new = load_historical_for_day(day) - if df_new.height == 0: - day = day - timedelta(days=1) - continue - max_time = df_new['time'].max() - if max_time is not None: - # Handle timezone - max_time_dt = max_time - if hasattr(max_time_dt, 'replace'): - max_time_dt = max_time_dt.replace(tzinfo=timezone.utc) - - end_of_day = day.replace(hour=23, minute=59, second=59, tzinfo=timezone.utc) - timedelta(minutes=5) - - # Convert polars datetime to python datetime if needed - if isinstance(max_time_dt, datetime): - if max_time_dt.replace(tzinfo=timezone.utc) >= end_of_day: - break - else: - # Polars returns python datetime already - if max_time >= day.replace(hour=23, minute=54, second=59): - break - - print(f"WARNING: Latest data time is {max_time}, which is more than 5 minutes before end of day.") - day = day - timedelta(days=1) - else: - raise RuntimeError(f"Could not find complete data in the last {max_attempts} days") - - 
try: - # Get the latest release data - print("Downloading latest ADS-B release...") - df_base, start_date_str = get_latest_aircraft_adsb_csv_df() - # Combine with historical data - print("Combining with historical data...") - df_combined = concat_compressed_dfs(df_base, df_new) - except Exception as e: - print(f"Error downloading latest ADS-B release: {e}") - df_combined = df_new - start_date_str = date_str - - # Sort by time for consistent ordering - df_combined = df_combined.sort('time') - - # Convert any list columns to strings for CSV compatibility - for col in df_combined.columns: - if df_combined[col].dtype == pl.List: - df_combined = df_combined.with_columns( - pl.col(col).list.join(",").alias(col) - ) - - # Save the result - OUT_ROOT = Path("data/openairframes") - OUT_ROOT.mkdir(parents=True, exist_ok=True) - - output_file = OUT_ROOT / f"openairframes_adsb_{start_date_str}_{date_str}.csv" - df_combined.write_csv(output_file) - - print(f"Saved: {output_file}") - print(f"Total aircraft: {df_combined.height}") diff --git a/src/derive_from_faa_master_txt.py b/src/derive_from_faa_master_txt.py index ea4d167..fd98b05 100644 --- a/src/derive_from_faa_master_txt.py +++ b/src/derive_from_faa_master_txt.py @@ -47,6 +47,9 @@ def convert_faa_master_txt_to_df(zip_path: Path, date: str): # Convert all NaN to empty strings df = df.fillna("") + # The FAA parser can produce the literal string "None" for missing values; + # replace those so they match the empty-string convention used everywhere else. 
+ df = df.replace("None", "") return df @@ -84,8 +87,8 @@ def concat_faa_historical_df(df_base, df_new): # Convert to string val_str = str(val).strip() - # Handle empty strings - if val_str == "" or val_str == "nan": + # Handle empty strings and null-like literals + if val_str == "" or val_str == "nan" or val_str == "None": return "" # Check if it looks like a list representation (starts with [ ) diff --git a/src/get_latest_release.py b/src/get_latest_release.py index b29b82a..3283b06 100644 --- a/src/get_latest_release.py +++ b/src/get_latest_release.py @@ -119,6 +119,7 @@ def download_latest_aircraft_csv( Returns: Path to the downloaded file """ + output_dir = Path(output_dir) assets = get_latest_release_assets(repo, github_token=github_token) try: asset = pick_asset(assets, name_regex=r"^openairframes_faa_.*\.csv$") @@ -164,26 +165,50 @@ def download_latest_aircraft_adsb_csv( Returns: Path to the downloaded file """ + output_dir = Path(output_dir) assets = get_latest_release_assets(repo, github_token=github_token) - asset = pick_asset(assets, name_regex=r"^openairframes_adsb_.*\.csv$") + asset = pick_asset(assets, name_regex=r"^openairframes_adsb_.*\.csv(\.gz)?$") saved_to = download_asset(asset, output_dir / asset.name, github_token=github_token) print(f"Downloaded: {asset.name} ({asset.size} bytes) -> {saved_to}") return saved_to - +import polars as pl def get_latest_aircraft_adsb_csv_df(): + """Download and load the latest ADS-B CSV from GitHub releases.""" + import re + csv_path = download_latest_aircraft_adsb_csv() - import pandas as pd - df = pd.read_csv(csv_path) - df = df.fillna("") - # Extract start date from filename pattern: openairframes_adsb_{start_date}_{end_date}.csv + df = pl.read_csv(csv_path, null_values=[""]) + + # Parse time column: values like "2025-12-31T00:00:00.040" or "2025-05-11T15:15:50.540+0000" + # Try with timezone first (convert to naive), then without timezone + df = df.with_columns( + 
pl.col("time").str.strptime(pl.Datetime("ms"), "%Y-%m-%dT%H:%M:%S%.f%z", strict=False) + .dt.replace_time_zone(None) # Convert to naive datetime first + .fill_null(pl.col("time").str.strptime(pl.Datetime("ms"), "%Y-%m-%dT%H:%M:%S%.f", strict=False)) + ) + + # Cast dbFlags and year to strings to match the schema used in compress functions + for col in ['dbFlags', 'year']: + if col in df.columns: + df = df.with_columns(pl.col(col).cast(pl.Utf8)) + + # Fill nulls with empty strings for string columns + for col in df.columns: + if df[col].dtype == pl.Utf8: + df = df.with_columns(pl.col(col).fill_null("")) + + # Extract start date from filename pattern: openairframes_adsb_{start_date}_{end_date}.csv[.gz] match = re.search(r"openairframes_adsb_(\d{4}-\d{2}-\d{2})_", str(csv_path)) if not match: raise ValueError(f"Could not extract date from filename: {csv_path.name}") date_str = match.group(1) + print(df.columns) + print(df.dtypes) return df, date_str + if __name__ == "__main__": download_latest_aircraft_csv()