From 4015a5fcf168a1892ea56662c47cd9e863266318 Mon Sep 17 00:00:00 2001 From: ggman12 Date: Thu, 12 Feb 2026 10:52:42 -0500 Subject: [PATCH] OpenAirframes 1.0 --- .../ISSUE_TEMPLATE/community_submission.yaml | 42 +- .../approve-community-submission.yaml | 3 +- .github/workflows/historical-adsb.yaml | 99 ++- ....yaml => openairframes-daily-release.yaml} | 101 ++- .github/workflows/process-historical-faa.yaml | 171 +++++ .github/workflows/update-community-prs.yaml | 77 +++ .../validate-community-submission.yaml | 18 +- LICENSE | 2 +- README.md | 50 +- infra/app.py | 11 - infra/cdk.json | 3 - infra/requirements.txt | 2 - infra/stack.py | 213 ------ notebooks/planequery_adsb_read.ipynb | 640 ------------------ requirements.txt | 1 + schemas/community_submission.v1.schema.json | 69 +- src/adsb/combine_chunks_to_csv.py | 33 +- src/adsb/compress_adsb_to_aircraft_data.py | 6 +- src/adsb/download_adsb_data_to_parquet.py | 18 +- src/adsb/reducer.py | 4 +- src/contributions/approve_submission.py | 86 ++- .../create_daily_community_release.py | 12 +- src/contributions/read_community_data.py | 53 +- src/contributions/regenerate_pr_schema.py | 66 ++ src/contributions/schema.py | 124 +++- src/contributions/update_schema.py | 154 +++++ src/contributions/validate_submission.py | 78 +++ ...elease.py => create_daily_adsb_release.py} | 4 +- src/create_daily_faa_release.py | 49 ++ ...reate_daily_planequery_aircraft_release.py | 33 - src/derive_from_faa_master_txt.py | 10 +- ...craft_release.py => get_latest_release.py} | 28 +- trigger_pipeline.py | 90 --- 33 files changed, 1212 insertions(+), 1138 deletions(-) rename .github/workflows/{planequery-aircraft-daily-release.yaml => openairframes-daily-release.yaml} (71%) create mode 100644 .github/workflows/process-historical-faa.yaml create mode 100644 .github/workflows/update-community-prs.yaml delete mode 100644 infra/app.py delete mode 100644 infra/cdk.json delete mode 100644 infra/requirements.txt delete mode 100644 infra/stack.py 
delete mode 100644 notebooks/planequery_adsb_read.ipynb create mode 100644 src/contributions/regenerate_pr_schema.py create mode 100644 src/contributions/update_schema.py rename src/{create_daily_planequery_aircraft_adsb_release.py => create_daily_adsb_release.py} (95%) create mode 100644 src/create_daily_faa_release.py delete mode 100644 src/create_daily_planequery_aircraft_release.py rename src/{get_latest_planequery_aircraft_release.py => get_latest_release.py} (82%) delete mode 100644 trigger_pipeline.py diff --git a/.github/ISSUE_TEMPLATE/community_submission.yaml b/.github/ISSUE_TEMPLATE/community_submission.yaml index 938843d..8ee54c5 100644 --- a/.github/ISSUE_TEMPLATE/community_submission.yaml +++ b/.github/ISSUE_TEMPLATE/community_submission.yaml @@ -13,29 +13,42 @@ body: **Rules (enforced on review/automation):** - Each object must include **at least one** of: - `registration_number` - - `transponder_code_hex` (6 hex chars) - - `planequery_airframe_id` + - `transponder_code_hex` (6 uppercase hex chars, e.g., `ABC123`) + - `openairframes_id` - Your contributor name (entered below) will be applied to all objects. - `contributor_uuid` is derived from your GitHub account automatically. - `creation_timestamp` is created by the system (you may omit it). 
+ **Optional date scoping:** + - `start_date` - When the tags become valid (ISO 8601: `YYYY-MM-DD`) + - `end_date` - When the tags stop being valid (ISO 8601: `YYYY-MM-DD`) + **Example: single object** ```json { - "transponder_code_hex": "a1b2c3" + "registration_number": "N12345", + "tags": {"owner": "John Doe"}, + "start_date": "2025-01-01" } ``` - **Example: multiple objects (array)** ```json [ - { - "registration_number": "N123AB" - }, - { - "planequery_airframe_id": "cessna|172s|12345", - "transponder_code_hex": "0f1234" - } + { + "registration_number": "N12345", + "tags": {"internet": "starlink"}, + "start_date": "2025-05-01" + }, + { + "registration_number": "N12345", + "tags": {"owner": "John Doe"}, + "start_date": "2025-01-01", + "end_date": "2025-07-20" + }, + { + "transponder_code_hex": "ABC123", + "tags": {"internet": "viasat", "owner": "John Doe"} + } ] ``` @@ -52,9 +65,11 @@ body: id: submission_json attributes: label: Submission JSON - description: Paste either one JSON object or an array of JSON objects. Must be valid JSON. Do not include contributor_name or contributor_uuid in your JSON. + description: | + Paste JSON directly, OR drag-and-drop a .json file here. + Must be valid JSON. Do not include contributor_name or contributor_uuid. placeholder: | - Paste JSON here... + Paste JSON here, or drag-and-drop a .json file... validations: required: true @@ -62,6 +77,5 @@ body: id: notes attributes: label: Notes (optional) - description: Any context, sources, or links that help validate your submission. 
validations: required: false \ No newline at end of file diff --git a/.github/workflows/approve-community-submission.yaml b/.github/workflows/approve-community-submission.yaml index 19f9fbd..6ce80b3 100644 --- a/.github/workflows/approve-community-submission.yaml +++ b/.github/workflows/approve-community-submission.yaml @@ -38,9 +38,10 @@ jobs: env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} GITHUB_REPOSITORY: ${{ github.repository }} + ISSUE_BODY: ${{ github.event.issue.body }} run: | python -m src.contributions.approve_submission \ --issue-number ${{ github.event.issue.number }} \ - --issue-body "${{ github.event.issue.body }}" \ + --issue-body "$ISSUE_BODY" \ --author "${{ steps.author.outputs.username }}" \ --author-id ${{ steps.author.outputs.user_id }} diff --git a/.github/workflows/historical-adsb.yaml b/.github/workflows/historical-adsb.yaml index 0bb99a1..d3ce334 100644 --- a/.github/workflows/historical-adsb.yaml +++ b/.github/workflows/historical-adsb.yaml @@ -48,7 +48,7 @@ jobs: matrix: chunk: ${{ fromJson(needs.generate-matrix.outputs.chunks) }} max-parallel: 3 - fail-fast: false + fail-fast: true steps: - name: Checkout uses: actions/checkout@v4 @@ -74,21 +74,51 @@ jobs: env: START_DATE: ${{ matrix.chunk.start_date }} END_DATE: ${{ matrix.chunk.end_date }} + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} run: | python -m src.adsb.download_and_list_icaos --start-date "$START_DATE" --end-date "$END_DATE" ls -lah data/output/ - - name: Create tar of extracted data + - name: Create tar of extracted data and split into chunks run: | cd data/output - tar -cf extracted_data.tar *-planes-readsb-prod-0.tar_0 icao_manifest_*.txt 2>/dev/null || echo "Some files may not exist" - ls -lah extracted_data.tar || echo "No tar created" + echo "=== Disk space before tar ===" + df -h . 
+ echo "=== Files to tar ===" + ls -lah *-planes-readsb-prod-0.tar_0 icao_manifest_*.txt 2>/dev/null || echo "No files found" + + # Create tar with explicit error checking + if ls *-planes-readsb-prod-0.tar_0 1>/dev/null 2>&1; then + tar -cvf extracted_data.tar *-planes-readsb-prod-0.tar_0 icao_manifest_*.txt + echo "=== Tar file created ===" + ls -lah extracted_data.tar + # Verify tar integrity + tar -tf extracted_data.tar > /dev/null && echo "Tar integrity check passed" || { echo "Tar integrity check FAILED"; exit 1; } + + # Create checksum of the FULL tar before splitting (for verification after reassembly) + echo "=== Creating checksum of full tar ===" + sha256sum extracted_data.tar > full_tar.sha256 + cat full_tar.sha256 + + # Split into 500MB chunks to avoid artifact upload issues + echo "=== Splitting tar into 500MB chunks ===" + mkdir -p tar_chunks + split -b 500M extracted_data.tar tar_chunks/extracted_data.tar.part_ + rm extracted_data.tar + mv full_tar.sha256 tar_chunks/ + + echo "=== Chunks created ===" + ls -lah tar_chunks/ + else + echo "ERROR: No extracted directories found, cannot create tar" + exit 1 + fi - - name: Upload extracted data + - name: Upload extracted data chunks uses: actions/upload-artifact@v4 with: name: adsb-extracted-${{ matrix.chunk.start_date }}-${{ matrix.chunk.end_date }} - path: data/output/extracted_data.tar + path: data/output/tar_chunks/ retention-days: 1 compression-level: 0 if-no-files-found: warn @@ -97,7 +127,7 @@ jobs: needs: [generate-matrix, adsb-extract] runs-on: ubuntu-24.04-arm strategy: - fail-fast: false + fail-fast: true matrix: chunk: ${{ fromJson(needs.generate-matrix.outputs.chunks) }} icao_chunk: [0, 1, 2, 3] @@ -126,21 +156,48 @@ jobs: uses: actions/download-artifact@v4 with: name: adsb-extracted-${{ matrix.chunk.start_date }}-${{ matrix.chunk.end_date }} - path: data/output/ - continue-on-error: true + path: data/output/tar_chunks/ - - name: Extract tar + - name: Reassemble and extract tar id: extract 
run: | cd data/output - if [ -f extracted_data.tar ]; then - tar -xf extracted_data.tar + if [ -d tar_chunks ] && ls tar_chunks/extracted_data.tar.part_* 1>/dev/null 2>&1; then + echo "=== Chunk files info ===" + ls -lah tar_chunks/ + + cd tar_chunks + + # Reassemble tar with explicit sorting + echo "=== Reassembling tar file ===" + ls -1 extracted_data.tar.part_?? | sort | while read part; do + echo "Appending $part..." + cat "$part" >> ../extracted_data.tar + done + cd .. + + echo "=== Reassembled tar file info ===" + ls -lah extracted_data.tar + + # Verify checksum of reassembled tar matches original + echo "=== Verifying reassembled tar checksum ===" + echo "Original checksum:" + cat tar_chunks/full_tar.sha256 + echo "Reassembled checksum:" + sha256sum extracted_data.tar + sha256sum -c tar_chunks/full_tar.sha256 || { echo "ERROR: Reassembled tar checksum mismatch - data corrupted during transfer"; exit 1; } + echo "Checksum verified - data integrity confirmed" + + rm -rf tar_chunks + + echo "=== Extracting ===" + tar -xvf extracted_data.tar rm extracted_data.tar echo "has_data=true" >> "$GITHUB_OUTPUT" echo "=== Contents of data/output ===" ls -lah else - echo "No extracted_data.tar found" + echo "No tar chunks found" echo "has_data=false" >> "$GITHUB_OUTPUT" fi @@ -188,22 +245,24 @@ jobs: - name: Debug downloaded files run: | + echo "=== Disk space before processing ===" + df -h echo "=== Listing data/output/adsb_chunks/ ===" - find data/output/adsb_chunks/ -type f 2>/dev/null | head -50 || echo "No files found" - echo "=== Looking for parquet files ===" - find . 
-name "*.parquet" 2>/dev/null | head -20 || echo "No parquet files found" + find data/output/adsb_chunks/ -type f 2>/dev/null | wc -l + echo "=== Total parquet size ===" + du -sh data/output/adsb_chunks/ || echo "No chunks dir" - name: Combine chunks to CSV env: START_DATE: ${{ needs.generate-matrix.outputs.global_start }} END_DATE: ${{ needs.generate-matrix.outputs.global_end }} run: | - python -m src.adsb.combine_chunks_to_csv --chunks-dir data/output/adsb_chunks --start-date "$START_DATE" --end-date "$END_DATE" --skip-base - ls -lah data/planequery_aircraft/ + python -m src.adsb.combine_chunks_to_csv --chunks-dir data/output/adsb_chunks --start-date "$START_DATE" --end-date "$END_DATE" --skip-base --stream + ls -lah data/openairframes/ - name: Upload final artifact uses: actions/upload-artifact@v4 with: - name: planequery_aircraft_adsb-${{ needs.generate-matrix.outputs.global_start }}-${{ needs.generate-matrix.outputs.global_end }} - path: data/planequery_aircraft/*.csv + name: openairframes_adsb-${{ needs.generate-matrix.outputs.global_start }}-${{ needs.generate-matrix.outputs.global_end }} + path: data/openairframes/*.csv retention-days: 30 diff --git a/.github/workflows/planequery-aircraft-daily-release.yaml b/.github/workflows/openairframes-daily-release.yaml similarity index 71% rename from .github/workflows/planequery-aircraft-daily-release.yaml rename to .github/workflows/openairframes-daily-release.yaml index 00838cb..7f684bf 100644 --- a/.github/workflows/planequery-aircraft-daily-release.yaml +++ b/.github/workflows/openairframes-daily-release.yaml @@ -1,10 +1,15 @@ -name: planequery-aircraft Daily Release +name: OpenAirframes Daily Release on: schedule: # 6:00pm UTC every day - runs on default branch, triggers both - cron: "0 06 * * *" workflow_dispatch: + inputs: + date: + description: 'Date to process (YYYY-MM-DD format, default: yesterday)' + required: false + type: string permissions: contents: write @@ -22,7 +27,7 @@ jobs: await 
github.rest.actions.createWorkflowDispatch({ owner: context.repo.owner, repo: context.repo.repo, - workflow_id: 'planequery-aircraft-daily-release.yaml', + workflow_id: 'openairframes-daily-release.yaml', ref: 'main' }); @@ -33,7 +38,7 @@ jobs: await github.rest.actions.createWorkflowDispatch({ owner: context.repo.owner, repo: context.repo.repo, - workflow_id: 'planequery-aircraft-daily-release.yaml', + workflow_id: 'openairframes-daily-release.yaml', ref: 'develop' }); @@ -58,16 +63,16 @@ jobs: - name: Run FAA release script run: | - python src/create_daily_planequery_aircraft_release.py + python src/create_daily_faa_release.py ${{ inputs.date && format('--date {0}', inputs.date) || '' }} ls -lah data/faa_releasable - ls -lah data/planequery_aircraft + ls -lah data/openairframes - name: Upload FAA artifacts uses: actions/upload-artifact@v4 with: name: faa-release path: | - data/planequery_aircraft/planequery_aircraft_faa_*.csv + data/openairframes/openairframes_faa_*.csv data/faa_releasable/ReleasableAircraft_*.zip retention-days: 1 @@ -93,8 +98,10 @@ jobs: pip install -r requirements.txt - name: Download and extract ADS-B data + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} run: | - python -m src.adsb.download_and_list_icaos + python -m src.adsb.download_and_list_icaos ${{ inputs.date && format('--date {0}', inputs.date) || '' }} ls -lah data/output/ - name: Check manifest exists @@ -164,7 +171,7 @@ jobs: - name: Process chunk ${{ matrix.chunk }} run: | - python -m src.adsb.process_icao_chunk --chunk-id ${{ matrix.chunk }} --total-chunks 4 + python -m src.adsb.process_icao_chunk --chunk-id ${{ matrix.chunk }} --total-chunks 4 ${{ inputs.date && format('--date {0}', inputs.date) || '' }} mkdir -p data/output/adsb_chunks ls -lah data/output/adsb_chunks/ || echo "No chunks created" @@ -213,14 +220,14 @@ jobs: run: | mkdir -p data/output/adsb_chunks ls -lah data/output/adsb_chunks/ || echo "Directory empty or does not exist" - python -m 
src.adsb.combine_chunks_to_csv --chunks-dir data/output/adsb_chunks - ls -lah data/planequery_aircraft/ + python -m src.adsb.combine_chunks_to_csv --chunks-dir data/output/adsb_chunks ${{ inputs.date && format('--date {0}', inputs.date) || '' }} + ls -lah data/openairframes/ - name: Upload ADS-B artifacts uses: actions/upload-artifact@v4 with: name: adsb-release - path: data/planequery_aircraft/planequery_aircraft_adsb_*.csv + path: data/openairframes/openairframes_adsb_*.csv retention-days: 1 build-community: @@ -245,13 +252,13 @@ jobs: - name: Run Community release script run: | python -m src.contributions.create_daily_community_release - ls -lah data/planequery_aircraft + ls -lah data/openairframes - name: Upload Community artifacts uses: actions/upload-artifact@v4 with: name: community-release - path: data/planequery_aircraft/planequery_aircraft_community_*.csv + path: data/openairframes/openairframes_community_*.csv retention-days: 1 create-release: @@ -259,6 +266,13 @@ jobs: needs: [build-faa, adsb-reduce, build-community] if: github.event_name != 'schedule' steps: + - name: Checkout for gh CLI + uses: actions/checkout@v4 + with: + sparse-checkout: | + .github + sparse-checkout-cone-mode: false + - name: Download FAA artifacts uses: actions/download-artifact@v4 with: @@ -277,6 +291,17 @@ jobs: name: community-release path: artifacts/community + - name: Debug artifact structure + run: | + echo "=== Full artifacts tree ===" + find artifacts -type f 2>/dev/null || echo "No files found in artifacts" + echo "=== FAA artifacts ===" + find artifacts/faa -type f 2>/dev/null || echo "No files found in artifacts/faa" + echo "=== ADS-B artifacts ===" + find artifacts/adsb -type f 2>/dev/null || echo "No files found in artifacts/adsb" + echo "=== Community artifacts ===" + find artifacts/community -type f 2>/dev/null || echo "No files found in artifacts/community" + - name: Prepare release metadata id: meta run: | @@ -288,16 +313,38 @@ jobs: elif [ "$BRANCH_NAME" = 
"develop" ]; then BRANCH_SUFFIX="-develop" fi - TAG="planequery-aircraft-${DATE}${BRANCH_SUFFIX}" + TAG="openairframes-${DATE}${BRANCH_SUFFIX}" - # Find files from artifacts - CSV_FILE_FAA=$(ls artifacts/faa/data/planequery_aircraft/planequery_aircraft_faa_*.csv | head -1) + # Find files from artifacts using find (handles nested structures) + CSV_FILE_FAA=$(find artifacts/faa -name "openairframes_faa_*.csv" -type f 2>/dev/null | head -1) + CSV_FILE_ADSB=$(find artifacts/adsb -name "openairframes_adsb_*.csv" -type f 2>/dev/null | head -1) + CSV_FILE_COMMUNITY=$(find artifacts/community -name "openairframes_community_*.csv" -type f 2>/dev/null | head -1) + ZIP_FILE=$(find artifacts/faa -name "ReleasableAircraft_*.zip" -type f 2>/dev/null | head -1) + + # Validate required files exist + MISSING_FILES="" + if [ -z "$CSV_FILE_FAA" ] || [ ! -f "$CSV_FILE_FAA" ]; then + MISSING_FILES="$MISSING_FILES FAA_CSV" + fi + if [ -z "$CSV_FILE_ADSB" ] || [ ! -f "$CSV_FILE_ADSB" ]; then + MISSING_FILES="$MISSING_FILES ADSB_CSV" + fi + if [ -z "$ZIP_FILE" ] || [ ! 
-f "$ZIP_FILE" ]; then + MISSING_FILES="$MISSING_FILES FAA_ZIP" + fi + + if [ -n "$MISSING_FILES" ]; then + echo "ERROR: Missing required release files:$MISSING_FILES" + echo "FAA CSV: $CSV_FILE_FAA" + echo "ADSB CSV: $CSV_FILE_ADSB" + echo "ZIP: $ZIP_FILE" + exit 1 + fi + + # Get basenames for display CSV_BASENAME_FAA=$(basename "$CSV_FILE_FAA") - CSV_FILE_ADSB=$(ls artifacts/adsb/planequery_aircraft_adsb_*.csv | head -1) CSV_BASENAME_ADSB=$(basename "$CSV_FILE_ADSB") - CSV_FILE_COMMUNITY=$(ls artifacts/community/planequery_aircraft_community_*.csv 2>/dev/null | head -1 || echo "") CSV_BASENAME_COMMUNITY=$(basename "$CSV_FILE_COMMUNITY" 2>/dev/null || echo "") - ZIP_FILE=$(ls artifacts/faa/data/faa_releasable/ReleasableAircraft_*.zip | head -1) ZIP_BASENAME=$(basename "$ZIP_FILE") echo "date=$DATE" >> "$GITHUB_OUTPUT" @@ -310,13 +357,27 @@ jobs: echo "csv_basename_community=$CSV_BASENAME_COMMUNITY" >> "$GITHUB_OUTPUT" echo "zip_file=$ZIP_FILE" >> "$GITHUB_OUTPUT" echo "zip_basename=$ZIP_BASENAME" >> "$GITHUB_OUTPUT" - echo "name=planequery-aircraft snapshot ($DATE)${BRANCH_SUFFIX}" >> "$GITHUB_OUTPUT" + echo "name=OpenAirframes snapshot ($DATE)${BRANCH_SUFFIX}" >> "$GITHUB_OUTPUT" + + echo "Found files:" + echo " FAA CSV: $CSV_FILE_FAA" + echo " ADSB CSV: $CSV_FILE_ADSB" + echo " Community CSV: $CSV_FILE_COMMUNITY" + echo " ZIP: $ZIP_FILE" + + - name: Delete existing release if exists + run: | + echo "Attempting to delete release: ${{ steps.meta.outputs.tag }}" + gh release delete "${{ steps.meta.outputs.tag }}" --yes --cleanup-tag || echo "No existing release to delete" + env: + GH_TOKEN: ${{ secrets.GITHUB_TOKEN }} - name: Create GitHub Release and upload assets uses: softprops/action-gh-release@v2 with: tag_name: ${{ steps.meta.outputs.tag }} name: ${{ steps.meta.outputs.name }} + fail_on_unmatched_files: true body: | Automated daily snapshot generated at 06:00 UTC for ${{ steps.meta.outputs.date }}. 
diff --git a/.github/workflows/process-historical-faa.yaml b/.github/workflows/process-historical-faa.yaml new file mode 100644 index 0000000..a5cdc4e --- /dev/null +++ b/.github/workflows/process-historical-faa.yaml @@ -0,0 +1,171 @@ +name: Process Historical FAA Data + +on: + workflow_dispatch: # Manual trigger + +jobs: + generate-matrix: + runs-on: ubuntu-latest + outputs: + matrix: ${{ steps.set-matrix.outputs.matrix }} + steps: + - name: Generate date ranges + id: set-matrix + run: | + python3 << 'EOF' + import json + from datetime import datetime, timedelta + + start = datetime(2023, 8, 16) + end = datetime(2026, 1, 1) + + ranges = [] + current = start + + # Process in 4-day chunks + while current < end: + chunk_end = current + timedelta(days=4) + # Don't go past the end date + if chunk_end > end: + chunk_end = end + + ranges.append({ + "since": current.strftime("%Y-%m-%d"), + "until": chunk_end.strftime("%Y-%m-%d") + }) + + current = chunk_end + + print(f"::set-output name=matrix::{json.dumps(ranges)}") + EOF + + clone-faa-repo: + runs-on: ubuntu-latest + steps: + - name: Cache FAA repository + id: cache-faa-repo + uses: actions/cache@v4 + with: + path: data/scrape-faa-releasable-aircraft + key: faa-repo-v1 + + - name: Clone FAA repository + if: steps.cache-faa-repo.outputs.cache-hit != 'true' + run: | + mkdir -p data + git clone https://github.com/simonw/scrape-faa-releasable-aircraft data/scrape-faa-releasable-aircraft + echo "Repository cloned successfully" + + process-chunk: + needs: [generate-matrix, clone-faa-repo] + runs-on: ubuntu-latest + strategy: + max-parallel: 5 # Process 5 chunks at a time + matrix: + range: ${{ fromJson(needs.generate-matrix.outputs.matrix) }} + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: '3.12' + + - name: Restore FAA repository cache + uses: actions/cache/restore@v4 + with: + path: data/scrape-faa-releasable-aircraft + 
key: faa-repo-v1 + fail-on-cache-miss: true + + - name: Install dependencies + run: | + pip install -r requirements.txt + + - name: Process chunk ${{ matrix.range.since }} to ${{ matrix.range.until }} + run: | + python src/get_historical_faa.py "${{ matrix.range.since }}" "${{ matrix.range.until }}" + + - name: Upload CSV artifact + uses: actions/upload-artifact@v4 + with: + name: csv-${{ matrix.range.since }}-to-${{ matrix.range.until }} + path: data/faa_releasable_historical/*.csv + retention-days: 1 + + create-release: + needs: process-chunk + runs-on: ubuntu-latest + permissions: + contents: write + steps: + - name: Download all artifacts + uses: actions/download-artifact@v4 + with: + path: artifacts + + - name: Prepare release files + run: | + mkdir -p release-files + find artifacts -name "*.csv" -exec cp {} release-files/ \; + ls -lh release-files/ + + - name: Create Release + uses: softprops/action-gh-release@v1 + with: + tag_name: historical-faa-${{ github.run_number }} + name: Historical FAA Data Release ${{ github.run_number }} + body: | + Automated release of historical FAA aircraft data + Processing period: 2023-08-16 to 2026-01-01 + Generated: ${{ github.event.repository.updated_at }} + files: release-files/*.csv + draft: false + prerelease: false + + concatenate-and-release: + needs: process-chunk + runs-on: ubuntu-latest + permissions: + contents: write + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: '3.12' + + - name: Install dependencies + run: | + pip install -r requirements.txt + + - name: Download all artifacts + uses: actions/download-artifact@v4 + with: + path: artifacts + + - name: Prepare CSVs for concatenation + run: | + mkdir -p data/faa_releasable_historical + find artifacts -name "*.csv" -exec cp {} data/faa_releasable_historical/ \; + ls -lh data/faa_releasable_historical/ + + - name: Concatenate all CSVs + run: | + python 
scripts/concat_csvs.py + + - name: Create Combined Release + uses: softprops/action-gh-release@v1 + with: + tag_name: historical-faa-combined-${{ github.run_number }} + name: Historical FAA Data Combined Release ${{ github.run_number }} + body: | + Combined historical FAA aircraft data (all chunks concatenated) + Processing period: 2023-08-16 to 2026-01-01 + Generated: ${{ github.event.repository.updated_at }} + files: data/openairframes/*.csv + draft: false + prerelease: false \ No newline at end of file diff --git a/.github/workflows/update-community-prs.yaml b/.github/workflows/update-community-prs.yaml new file mode 100644 index 0000000..711df76 --- /dev/null +++ b/.github/workflows/update-community-prs.yaml @@ -0,0 +1,77 @@ +name: Update Community PRs After Merge + +on: + push: + branches: [main] + paths: + - 'community/**' + - 'schemas/community_submission.v1.schema.json' + +permissions: + contents: write + pull-requests: write + +jobs: + update-open-prs: + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v4 + with: + fetch-depth: 0 + token: ${{ secrets.GITHUB_TOKEN }} + + - name: Setup Python + uses: actions/setup-python@v5 + with: + python-version: "3.12" + + - name: Install dependencies + run: pip install jsonschema + + - name: Find and update open community PRs + env: + GH_TOKEN: ${{ secrets.GITHUB_TOKEN }} + run: | + # Get list of open community PRs + prs=$(gh pr list --label community --state open --json number,headRefName --jq '.[] | "\(.number) \(.headRefName)"') + + if [ -z "$prs" ]; then + echo "No open community PRs found" + exit 0 + fi + + echo "$prs" | while read pr_number branch_name; do + echo "Processing PR #$pr_number (branch: $branch_name)" + + # Checkout PR branch + git fetch origin "$branch_name" + git checkout "$branch_name" + + # Merge main into PR branch + git config user.name "github-actions[bot]" + git config user.email "github-actions[bot]@users.noreply.github.com" + + if git merge origin/main -m "Merge 
main to update schema"; then + # Regenerate schema for this PR's submission (adds any new tags) + python -m src.contributions.regenerate_pr_schema || true + + # If there are changes, commit and push + if [ -n "$(git status --porcelain schemas/)" ]; then + git add schemas/ + git commit -m "Update schema with new tags" + git push origin "$branch_name" + echo " Updated PR #$pr_number with schema changes" + else + git push origin "$branch_name" + echo " Merged main into PR #$pr_number" + fi + else + echo " Merge conflict in PR #$pr_number, adding comment" + gh pr comment "$pr_number" --body $'⚠️ **Merge Conflict**\n\nAnother community submission was merged and this PR has conflicts.\n\nA maintainer may need to:\n1. Close this PR\n2. Remove the `approved` label from the original issue\n3. Re-add the `approved` label to regenerate the PR' + git merge --abort + fi + fi + + git checkout main + done diff --git a/.github/workflows/validate-community-submission.yaml b/.github/workflows/validate-community-submission.yaml index e217401..ed2973d 100644 --- a/.github/workflows/validate-community-submission.yaml +++ b/.github/workflows/validate-community-submission.yaml @@ -4,6 +4,9 @@ on: issues: types: [opened, edited] +permissions: + issues: write + jobs: validate: if: contains(github.event.issue.labels.*.name, 'submission') @@ -20,11 +23,24 @@ jobs: - name: Install dependencies run: pip install jsonschema + - name: Debug issue body + run: | + echo "=== Issue Body ===" + cat << 'ISSUE_BODY_EOF' + ${{ github.event.issue.body }} + ISSUE_BODY_EOF + + - name: Save issue body to file + run: | + cat << 'ISSUE_BODY_EOF' > /tmp/issue_body.txt + ${{ github.event.issue.body }} + ISSUE_BODY_EOF + - name: Validate submission env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} GITHUB_REPOSITORY: ${{ github.repository }} run: | python -m src.contributions.validate_submission \ - --issue-body "${{ github.event.issue.body }}" \ + --issue-body-file /tmp/issue_body.txt \ --issue-number ${{ 
github.event.issue.number }} diff --git a/LICENSE b/LICENSE index 3d4fbf1..36310d8 100644 --- a/LICENSE +++ b/LICENSE @@ -1,6 +1,6 @@ MIT License -Copyright (c) 2026 PlaneQuery +Copyright (c) 2026 OpenAirframes Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/README.md b/README.md index f78157f..3141ccd 100644 --- a/README.md +++ b/README.md @@ -1 +1,49 @@ -Downloads [`https://registry.faa.gov/database/ReleasableAircraft.zip`](https://registry.faa.gov/database/ReleasableAircraft.zip). Creates a daily GitHub Release at 06:00 UTC containing the unaltered `ReleasableAircraft.zip` and a derived CSV file with all data from FAA database since 2023-08-16. The FAA database updates daily at 05:30 UTC. \ No newline at end of file +# OpenAirframes.org + +OpenAirframes.org is an open-source, community-driven airframes database. + +The data includes: +- Registration information from Civil Aviation Authorities (FAA) +- Airline data (e.g., Air France) +- Community contributions such as ownership details, military aircraft info, photos, and more + +--- + +## For Users + +A daily release is created at **06:00 UTC** and includes: + +- **openairframes_community.csv** + All community submissions + +- **openairframes_faa.csv** + All [FAA registration data](https://www.faa.gov/licenses_certificates/aircraft_certification/aircraft_registry/releasable_aircraft_download) from 2023-08-16 to present (~260 MB) + +- **openairframes_adsb.csv** + Airframe information derived from ADS-B messages on the [ADSB.lol](https://www.adsb.lol/) network, from 2026-02-12 to present. The airframe information originates from [mictronics aircraft database](https://www.mictronics.de/aircraft-database/) (~5 MB). 
+ +- **ReleasableAircraft_{date}.zip** + A daily snapshot of the FAA database, which updates at **05:30 UTC** + +--- + +## For Contributors + +Submit data via a [GitHub Issue](https://github.com/PlaneQuery/OpenAirframes/issues/new?template=community_submission.yaml) with your preferred attribution. Once approved, it will appear in the daily release. A leaderboard will be available in the future. +All data is valuable. Examples include: +- Celebrity ownership (with citations) +- Photos +- Internet capability +- Military aircraft information +- Unique facts (e.g., an airframe that crashed, performs aerobatics, etc.) + +Please try to follow the submission formatting guidelines. If you are struggling with them, that is fine—submit your data anyway and it will be formatted for you. + +--- + +## For Developers +All code, compute (GitHub Actions), and storage (releases) are in this GitHub repository Improvements are welcome. Potential features include: +- Web UI +- Additional export formats in the daily release +- Data fusion from multiple sources in the daily release +- Automated airframe data connectors, including (but not limited to) civil aviation authorities and airline APIs diff --git a/infra/app.py b/infra/app.py deleted file mode 100644 index 83509f6..0000000 --- a/infra/app.py +++ /dev/null @@ -1,11 +0,0 @@ -#!/usr/bin/env python3 -import os -import aws_cdk as cdk -from stack import AdsbProcessingStack - -app = cdk.App() -AdsbProcessingStack(app, "AdsbProcessingStack", env=cdk.Environment( - account=os.environ["CDK_DEFAULT_ACCOUNT"], - region=os.environ["CDK_DEFAULT_REGION"], -)) -app.synth() diff --git a/infra/cdk.json b/infra/cdk.json deleted file mode 100644 index b4baa10..0000000 --- a/infra/cdk.json +++ /dev/null @@ -1,3 +0,0 @@ -{ - "app": "python3 app.py" -} diff --git a/infra/requirements.txt b/infra/requirements.txt deleted file mode 100644 index 32b3387..0000000 --- a/infra/requirements.txt +++ /dev/null @@ -1,2 +0,0 @@ -aws-cdk-lib>=2.170.0 
-constructs>=10.0.0 diff --git a/infra/stack.py b/infra/stack.py deleted file mode 100644 index a54bd79..0000000 --- a/infra/stack.py +++ /dev/null @@ -1,213 +0,0 @@ -import aws_cdk as cdk -from aws_cdk import ( - Stack, - Duration, - RemovalPolicy, - aws_s3 as s3, - aws_ecs as ecs, - aws_ec2 as ec2, - aws_ecr_assets, - aws_iam as iam, - aws_logs as logs, - aws_stepfunctions as sfn, - aws_stepfunctions_tasks as sfn_tasks, -) -from constructs import Construct -from pathlib import Path - - -class AdsbProcessingStack(Stack): - def __init__(self, scope: Construct, id: str, **kwargs): - super().__init__(scope, id, **kwargs) - - # --- S3 bucket for intermediate and final results --- - bucket = s3.Bucket( - self, "ResultsBucket", - bucket_name="planequery-aircraft-dev", - removal_policy=RemovalPolicy.DESTROY, - auto_delete_objects=True, - lifecycle_rules=[ - s3.LifecycleRule( - prefix="intermediate/", - expiration=Duration.days(7), - ) - ], - ) - - # --- Use default VPC (no additional cost) --- - vpc = ec2.Vpc.from_lookup( - self, "Vpc", - is_default=True, - ) - - # --- ECS Cluster --- - cluster = ecs.Cluster( - self, "Cluster", - vpc=vpc, - container_insights=True, - ) - - # --- Log group --- - log_group = logs.LogGroup( - self, "LogGroup", - log_group_name="/adsb-processing", - removal_policy=RemovalPolicy.DESTROY, - retention=logs.RetentionDays.TWO_WEEKS, - ) - - # --- Docker images (built from local Dockerfiles) --- - adsb_dir = str(Path(__file__).parent.parent / "src" / "adsb") - - worker_image = ecs.ContainerImage.from_asset( - adsb_dir, - file="Dockerfile.worker", - platform=cdk.aws_ecr_assets.Platform.LINUX_ARM64, - ) - reducer_image = ecs.ContainerImage.from_asset( - adsb_dir, - file="Dockerfile.reducer", - platform=cdk.aws_ecr_assets.Platform.LINUX_ARM64, - ) - - # --- Task role (shared) --- - task_role = iam.Role( - self, "TaskRole", - assumed_by=iam.ServicePrincipal("ecs-tasks.amazonaws.com"), - ) - bucket.grant_read_write(task_role) - - # --- MAP: worker task 
definition --- - map_task_def = ecs.FargateTaskDefinition( - self, "MapTaskDef", - cpu=4096, # 4 vCPU - memory_limit_mib=30720, # 30 GB - task_role=task_role, - runtime_platform=ecs.RuntimePlatform( - cpu_architecture=ecs.CpuArchitecture.ARM64, - operating_system_family=ecs.OperatingSystemFamily.LINUX, - ), - ) - map_container = map_task_def.add_container( - "worker", - image=worker_image, - logging=ecs.LogDrivers.aws_logs( - stream_prefix="map", - log_group=log_group, - ), - environment={ - "S3_BUCKET": bucket.bucket_name, - }, - ) - - # --- REDUCE: reducer task definition --- - reduce_task_def = ecs.FargateTaskDefinition( - self, "ReduceTaskDef", - cpu=4096, # 4 vCPU - memory_limit_mib=30720, # 30 GB — must hold full year in memory - task_role=task_role, - runtime_platform=ecs.RuntimePlatform( - cpu_architecture=ecs.CpuArchitecture.ARM64, - operating_system_family=ecs.OperatingSystemFamily.LINUX, - ), - ) - reduce_container = reduce_task_def.add_container( - "reducer", - image=reducer_image, - logging=ecs.LogDrivers.aws_logs( - stream_prefix="reduce", - log_group=log_group, - ), - environment={ - "S3_BUCKET": bucket.bucket_name, - }, - ) - - # --- Step Functions --- - - # Map task: run ECS Fargate for each date chunk - map_ecs_task = sfn_tasks.EcsRunTask( - self, "ProcessChunk", - integration_pattern=sfn.IntegrationPattern.RUN_JOB, - cluster=cluster, - task_definition=map_task_def, - launch_target=sfn_tasks.EcsFargateLaunchTarget( - platform_version=ecs.FargatePlatformVersion.LATEST, - ), - container_overrides=[ - sfn_tasks.ContainerOverride( - container_definition=map_container, - environment=[ - sfn_tasks.TaskEnvironmentVariable( - name="START_DATE", - value=sfn.JsonPath.string_at("$.start_date"), - ), - sfn_tasks.TaskEnvironmentVariable( - name="END_DATE", - value=sfn.JsonPath.string_at("$.end_date"), - ), - sfn_tasks.TaskEnvironmentVariable( - name="RUN_ID", - value=sfn.JsonPath.string_at("$.run_id"), - ), - ], - ) - ], - assign_public_ip=True, - 
subnets=ec2.SubnetSelection(subnet_type=ec2.SubnetType.PUBLIC), - result_path="$.task_result", - ) - - # Map state — max 3 concurrent workers - map_state = sfn.Map( - self, "FanOutChunks", - items_path="$.chunks", - max_concurrency=3, - result_path="$.map_results", - ) - map_state.item_processor(map_ecs_task) - - # Reduce task: combine all chunk CSVs - reduce_ecs_task = sfn_tasks.EcsRunTask( - self, "ReduceResults", - integration_pattern=sfn.IntegrationPattern.RUN_JOB, - cluster=cluster, - task_definition=reduce_task_def, - launch_target=sfn_tasks.EcsFargateLaunchTarget( - platform_version=ecs.FargatePlatformVersion.LATEST, - ), - container_overrides=[ - sfn_tasks.ContainerOverride( - container_definition=reduce_container, - environment=[ - sfn_tasks.TaskEnvironmentVariable( - name="RUN_ID", - value=sfn.JsonPath.string_at("$.run_id"), - ), - sfn_tasks.TaskEnvironmentVariable( - name="GLOBAL_START_DATE", - value=sfn.JsonPath.string_at("$.global_start_date"), - ), - sfn_tasks.TaskEnvironmentVariable( - name="GLOBAL_END_DATE", - value=sfn.JsonPath.string_at("$.global_end_date"), - ), - ], - ) - ], - assign_public_ip=True, - subnets=ec2.SubnetSelection(subnet_type=ec2.SubnetType.PUBLIC), - ) - - # Chain: fan-out map → reduce - definition = map_state.next(reduce_ecs_task) - - sfn.StateMachine( - self, "Pipeline", - state_machine_name="adsb-map-reduce", - definition_body=sfn.DefinitionBody.from_chainable(definition), - timeout=Duration.hours(48), - ) - - # --- Outputs --- - cdk.CfnOutput(self, "BucketName", value=bucket.bucket_name) - cdk.CfnOutput(self, "StateMachineName", value="adsb-map-reduce") diff --git a/notebooks/planequery_adsb_read.ipynb b/notebooks/planequery_adsb_read.ipynb deleted file mode 100644 index f0d98e0..0000000 --- a/notebooks/planequery_adsb_read.ipynb +++ /dev/null @@ -1,640 +0,0 @@ -{ - "cells": [ - { - "cell_type": "code", - "execution_count": null, - "id": "06ae0319", - "metadata": {}, - "outputs": [], - "source": [ - "import 
clickhouse_connect\n", - "client = clickhouse_connect.get_client(\n", - " host=os.environ[\"CLICKHOUSE_HOST\"],\n", - " username=os.environ[\"CLICKHOUSE_USERNAME\"],\n", - " password=os.environ[\"CLICKHOUSE_PASSWORD\"],\n", - " secure=True,\n", - ")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "779710f0", - "metadata": {}, - "outputs": [], - "source": [ - "df = client.query_df(\"SELECT time, icao,r,t,dbFlags,ownOp,year,desc,aircraft FROM adsb_messages Where time > '2024-01-01 00:00:00' AND time < '2024-01-02 00:00:00'\")\n", - "df_copy = df.copy()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "bf024da8", - "metadata": {}, - "outputs": [], - "source": [ - "# -- military = dbFlags & 1; interesting = dbFlags & 2; PIA = dbFlags & 4; LADD = dbFlags & 8;" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "270607b5", - "metadata": {}, - "outputs": [], - "source": [ - "df = load_raw_adsb_for_day(datetime(2024,1,1))" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "ac06a30e", - "metadata": {}, - "outputs": [], - "source": [ - "df['aircraft']" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "91edab3e", - "metadata": {}, - "outputs": [], - "source": [ - "COLUMNS = ['dbFlags', 'ownOp', 'year', 'desc', 'aircraft_category', 'r', 't']\n", - "def compress_df(df):\n", - " icao = df.name\n", - " df[\"_signature\"] = df[COLUMNS].astype(str).agg('|'.join, axis=1)\n", - " original_df = df.copy()\n", - " df = df.groupby(\"_signature\", as_index=False).last() # check if it works with both last and first.\n", - " # For each row, create a dict of non-empty column values. 
This is using sets and subsets...\n", - " def get_non_empty_dict(row):\n", - " return {col: row[col] for col in COLUMNS if row[col] != ''}\n", - " \n", - " df['_non_empty_dict'] = df.apply(get_non_empty_dict, axis=1)\n", - " df['_non_empty_count'] = df['_non_empty_dict'].apply(len)\n", - " \n", - " # Check if row i's non-empty values are a subset of row j's non-empty values\n", - " def is_subset_of_any(idx):\n", - " row_dict = df.loc[idx, '_non_empty_dict']\n", - " row_count = df.loc[idx, '_non_empty_count']\n", - " \n", - " for other_idx in df.index:\n", - " if idx == other_idx:\n", - " continue\n", - " other_dict = df.loc[other_idx, '_non_empty_dict']\n", - " other_count = df.loc[other_idx, '_non_empty_count']\n", - " \n", - " # Check if all non-empty values in current row match those in other row\n", - " if all(row_dict.get(k) == other_dict.get(k) for k in row_dict.keys()):\n", - " # If they match and other has more defined columns, current row is redundant\n", - " if other_count > row_count:\n", - " return True\n", - " return False\n", - " \n", - " # Keep rows that are not subsets of any other row\n", - " keep_mask = ~df.index.to_series().apply(is_subset_of_any)\n", - " df = df[keep_mask]\n", - "\n", - " if len(df) > 1:\n", - " original_df = original_df[original_df['_signature'].isin(df['_signature'])]\n", - " value_counts = original_df[\"_signature\"].value_counts()\n", - " max_signature = value_counts.idxmax()\n", - " df = df[df['_signature'] == max_signature]\n", - "\n", - " df['icao'] = icao\n", - " df = df.drop(columns=['_non_empty_dict', '_non_empty_count', '_signature'])\n", - " return df\n", - "\n", - "# df = df_copy\n", - "# df = df_copy.iloc[0:100000]\n", - "# df = df[df['r'] == \"N4131T\"]\n", - "# df = df[(df['icao'] == \"008081\")]\n", - "# df = df.iloc[0:500]\n", - "df['aircraft_category'] = df['aircraft'].apply(lambda x: x.get('category') if isinstance(x, dict) else None)\n", - "df = df.drop(columns=['aircraft'])\n", - "df = 
df.sort_values(['icao', 'time'])\n", - "df[COLUMNS] = df[COLUMNS].fillna('')\n", - "ORIGINAL_COLUMNS = df.columns.tolist()\n", - "df_compressed = df.groupby('icao',group_keys=False).apply(compress_df)\n", - "cols = df_compressed.columns.tolist()\n", - "cols.remove(\"icao\")\n", - "cols.insert(1, \"icao\")\n", - "df_compressed = df_compressed[cols]\n", - "df_compressed" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "efdfcb2c", - "metadata": {}, - "outputs": [], - "source": [ - "df['aircraft_category'] = df['aircraft'].apply(lambda x: x.get('category') if isinstance(x, dict) else None)\n", - "df[~df['aircraft_category'].isna()]" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "495c5025", - "metadata": {}, - "outputs": [], - "source": [ - "# SOME KIND OF MAP REDUCE SYSTEM\n", - "import os\n", - "\n", - "COLUMNS = ['dbFlags', 'ownOp', 'year', 'desc', 'aircraft_category', 'r', 't']\n", - "def compress_df(df):\n", - " icao = df.name\n", - " df[\"_signature\"] = df[COLUMNS].astype(str).agg('|'.join, axis=1)\n", - " \n", - " # Compute signature counts before grouping (avoid copy)\n", - " signature_counts = df[\"_signature\"].value_counts()\n", - " \n", - " df = df.groupby(\"_signature\", as_index=False).first() # check if it works with both last and first.\n", - " # For each row, create a dict of non-empty column values. 
This is using sets and subsets...\n", - " def get_non_empty_dict(row):\n", - " return {col: row[col] for col in COLUMNS if row[col] != ''}\n", - " \n", - " df['_non_empty_dict'] = df.apply(get_non_empty_dict, axis=1)\n", - " df['_non_empty_count'] = df['_non_empty_dict'].apply(len)\n", - " \n", - " # Check if row i's non-empty values are a subset of row j's non-empty values\n", - " def is_subset_of_any(idx):\n", - " row_dict = df.loc[idx, '_non_empty_dict']\n", - " row_count = df.loc[idx, '_non_empty_count']\n", - " \n", - " for other_idx in df.index:\n", - " if idx == other_idx:\n", - " continue\n", - " other_dict = df.loc[other_idx, '_non_empty_dict']\n", - " other_count = df.loc[other_idx, '_non_empty_count']\n", - " \n", - " # Check if all non-empty values in current row match those in other row\n", - " if all(row_dict.get(k) == other_dict.get(k) for k in row_dict.keys()):\n", - " # If they match and other has more defined columns, current row is redundant\n", - " if other_count > row_count:\n", - " return True\n", - " return False\n", - " \n", - " # Keep rows that are not subsets of any other row\n", - " keep_mask = ~df.index.to_series().apply(is_subset_of_any)\n", - " df = df[keep_mask]\n", - "\n", - " if len(df) > 1:\n", - " # Use pre-computed signature counts instead of original_df\n", - " remaining_sigs = df['_signature']\n", - " sig_counts = signature_counts[remaining_sigs]\n", - " max_signature = sig_counts.idxmax()\n", - " df = df[df['_signature'] == max_signature]\n", - "\n", - " df['icao'] = icao\n", - " df = df.drop(columns=['_non_empty_dict', '_non_empty_count', '_signature'])\n", - " return df\n", - "\n", - "# names of releases something like\n", - "# planequery_aircraft_adsb_2024-06-01T00-00-00Z.csv.gz\n", - "\n", - "# Let's build historical first. 
\n", - "\n", - "_ch_client = None\n", - "\n", - "def _get_clickhouse_client():\n", - " \"\"\"Return a reusable ClickHouse client, with retry/backoff for transient DNS or connection errors.\"\"\"\n", - " global _ch_client\n", - " if _ch_client is not None:\n", - " return _ch_client\n", - "\n", - " import clickhouse_connect\n", - " import time\n", - "\n", - " max_retries = 5\n", - " for attempt in range(1, max_retries + 1):\n", - " try:\n", - " _ch_client = clickhouse_connect.get_client(\n", - " host=os.environ[\"CLICKHOUSE_HOST\"],\n", - " username=os.environ[\"CLICKHOUSE_USERNAME\"],\n", - " password=os.environ[\"CLICKHOUSE_PASSWORD\"],\n", - " secure=True,\n", - " )\n", - " return _ch_client\n", - " except Exception as e:\n", - " wait = min(2 ** attempt, 30)\n", - " print(f\" ClickHouse connect attempt {attempt}/{max_retries} failed: {e}\")\n", - " if attempt == max_retries:\n", - " raise\n", - " print(f\" Retrying in {wait}s...\")\n", - " time.sleep(wait)\n", - "\n", - "\n", - "def load_raw_adsb_for_day(day):\n", - " \"\"\"Load raw ADS-B data for a day from cache or ClickHouse.\"\"\"\n", - " from datetime import timedelta\n", - " from pathlib import Path\n", - " import pandas as pd\n", - " import time\n", - " \n", - " start_time = day.replace(hour=0, minute=0, second=0, microsecond=0)\n", - " end_time = start_time + timedelta(days=1)\n", - " \n", - " # Set up caching\n", - " cache_dir = Path(\"data/adsb\")\n", - " cache_dir.mkdir(parents=True, exist_ok=True)\n", - " cache_file = cache_dir / f\"adsb_raw_{start_time.strftime('%Y-%m-%d')}.csv.zst\"\n", - " \n", - " # Check if cache exists\n", - " if cache_file.exists():\n", - " print(f\" Loading from cache: {cache_file}\")\n", - " df = pd.read_csv(cache_file, compression='zstd')\n", - " df['time'] = pd.to_datetime(df['time'])\n", - " else:\n", - " # Format dates for the query\n", - " start_str = start_time.strftime('%Y-%m-%d %H:%M:%S')\n", - " end_str = end_time.strftime('%Y-%m-%d %H:%M:%S')\n", - " \n", - " 
max_retries = 3\n", - " for attempt in range(1, max_retries + 1):\n", - " try:\n", - " client = _get_clickhouse_client()\n", - " print(f\" Querying ClickHouse for {start_time.strftime('%Y-%m-%d')}\")\n", - " df = client.query_df(f\"SELECT time, icao,r,t,dbFlags,ownOp,year,desc,aircraft FROM adsb_messages Where time > '{start_str}' AND time < '{end_str}'\")\n", - " break\n", - " except Exception as e:\n", - " wait = min(2 ** attempt, 30)\n", - " print(f\" Query attempt {attempt}/{max_retries} failed: {e}\")\n", - " if attempt == max_retries:\n", - " raise\n", - " # Reset client in case connection is stale\n", - " global _ch_client\n", - " _ch_client = None\n", - " print(f\" Retrying in {wait}s...\")\n", - " time.sleep(wait)\n", - " \n", - " # Save to cache\n", - " df.to_csv(cache_file, index=False, compression='zstd')\n", - " print(f\" Saved to cache: {cache_file}\")\n", - " \n", - " return df\n", - "\n", - "def load_historical_for_day(day):\n", - " from pathlib import Path\n", - " import pandas as pd\n", - " \n", - " df = load_raw_adsb_for_day(day)\n", - " print(df)\n", - " df['aircraft_category'] = df['aircraft'].apply(lambda x: x.get('category') if isinstance(x, dict) else None)\n", - " df = df.drop(columns=['aircraft'])\n", - " df = df.sort_values(['icao', 'time'])\n", - " df[COLUMNS] = df[COLUMNS].fillna('')\n", - " df_compressed = df.groupby('icao',group_keys=False).apply(compress_df)\n", - " cols = df_compressed.columns.tolist()\n", - " cols.remove('time')\n", - " cols.insert(0, 'time')\n", - " cols.remove(\"icao\")\n", - " cols.insert(1, \"icao\")\n", - " df_compressed = df_compressed[cols]\n", - " return df_compressed\n", - "\n", - "\n", - "def concat_compressed_dfs(df_base, df_new):\n", - " \"\"\"Concatenate base and new compressed dataframes, keeping the most informative row per ICAO.\"\"\"\n", - " import pandas as pd\n", - " \n", - " # Combine both dataframes\n", - " df_combined = pd.concat([df_base, df_new], ignore_index=True)\n", - " \n", - " # Sort by 
ICAO and time\n", - " df_combined = df_combined.sort_values(['icao', 'time'])\n", - " \n", - " # Fill NaN values\n", - " df_combined[COLUMNS] = df_combined[COLUMNS].fillna('')\n", - " \n", - " # Apply compression logic per ICAO to get the best row\n", - " df_compressed = df_combined.groupby('icao', group_keys=False).apply(compress_df)\n", - " \n", - " # Sort by time\n", - " df_compressed = df_compressed.sort_values('time')\n", - " \n", - " return df_compressed\n", - "\n", - "\n", - "def get_latest_aircraft_adsb_csv_df():\n", - " \"\"\"Download and load the latest ADS-B CSV from GitHub releases.\"\"\"\n", - " from get_latest_planequery_aircraft_release import download_latest_aircraft_adsb_csv\n", - " \n", - " import pandas as pd\n", - " import re\n", - " \n", - " csv_path = download_latest_aircraft_adsb_csv()\n", - " df = pd.read_csv(csv_path)\n", - " df = df.fillna(\"\")\n", - " \n", - " # Extract start date from filename pattern: planequery_aircraft_adsb_{start_date}_{end_date}.csv\n", - " match = re.search(r\"planequery_aircraft_adsb_(\\d{4}-\\d{2}-\\d{2})_\", str(csv_path))\n", - " if not match:\n", - " raise ValueError(f\"Could not extract date from filename: {csv_path.name}\")\n", - " \n", - " date_str = match.group(1)\n", - " return df, date_str\n", - "\n" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "7f66acf7", - "metadata": {}, - "outputs": [], - "source": [ - "# SOME KIND OF MAP REDUCE SYSTEM\n", - "\n", - "\n", - "COLUMNS = ['dbFlags', 'ownOp', 'year', 'desc', 'aircraft_category', 'r', 't']\n", - "def compress_df(df):\n", - " icao = df.name\n", - " df[\"_signature\"] = df[COLUMNS].astype(str).agg('|'.join, axis=1)\n", - " original_df = df.copy()\n", - " df = df.groupby(\"_signature\", as_index=False).first() # check if it works with both last and first.\n", - " # For each row, create a dict of non-empty column values. 
This is using sets and subsets...\n", - " def get_non_empty_dict(row):\n", - " return {col: row[col] for col in COLUMNS if row[col] != ''}\n", - " \n", - " df['_non_empty_dict'] = df.apply(get_non_empty_dict, axis=1)\n", - " df['_non_empty_count'] = df['_non_empty_dict'].apply(len)\n", - " \n", - " # Check if row i's non-empty values are a subset of row j's non-empty values\n", - " def is_subset_of_any(idx):\n", - " row_dict = df.loc[idx, '_non_empty_dict']\n", - " row_count = df.loc[idx, '_non_empty_count']\n", - " \n", - " for other_idx in df.index:\n", - " if idx == other_idx:\n", - " continue\n", - " other_dict = df.loc[other_idx, '_non_empty_dict']\n", - " other_count = df.loc[other_idx, '_non_empty_count']\n", - " \n", - " # Check if all non-empty values in current row match those in other row\n", - " if all(row_dict.get(k) == other_dict.get(k) for k in row_dict.keys()):\n", - " # If they match and other has more defined columns, current row is redundant\n", - " if other_count > row_count:\n", - " return True\n", - " return False\n", - " \n", - " # Keep rows that are not subsets of any other row\n", - " keep_mask = ~df.index.to_series().apply(is_subset_of_any)\n", - " df = df[keep_mask]\n", - "\n", - " if len(df) > 1:\n", - " original_df = original_df[original_df['_signature'].isin(df['_signature'])]\n", - " value_counts = original_df[\"_signature\"].value_counts()\n", - " max_signature = value_counts.idxmax()\n", - " df = df[df['_signature'] == max_signature]\n", - "\n", - " df['icao'] = icao\n", - " df = df.drop(columns=['_non_empty_dict', '_non_empty_count', '_signature'])\n", - " return df\n", - "\n", - "# names of releases something like\n", - "# planequery_aircraft_adsb_2024-06-01T00-00-00Z.csv.gz\n", - "\n", - "# Let's build historical first. 
\n", - "\n", - "def load_raw_adsb_for_day(day):\n", - " \"\"\"Load raw ADS-B data for a day from cache or ClickHouse.\"\"\"\n", - " from datetime import timedelta\n", - " import clickhouse_connect\n", - " from pathlib import Path\n", - " import pandas as pd\n", - " \n", - " start_time = day.replace(hour=0, minute=0, second=0, microsecond=0)\n", - " end_time = start_time + timedelta(days=1)\n", - " \n", - " # Set up caching\n", - " cache_dir = Path(\"data/adsb\")\n", - " cache_dir.mkdir(parents=True, exist_ok=True)\n", - " cache_file = cache_dir / f\"adsb_raw_{start_time.strftime('%Y-%m-%d')}.csv.zst\"\n", - " \n", - " # Check if cache exists\n", - " if cache_file.exists():\n", - " print(f\" Loading from cache: {cache_file}\")\n", - " df = pd.read_csv(cache_file, compression='zstd')\n", - " df['time'] = pd.to_datetime(df['time'])\n", - " else:\n", - " # Format dates for the query\n", - " start_str = start_time.strftime('%Y-%m-%d %H:%M:%S')\n", - " end_str = end_time.strftime('%Y-%m-%d %H:%M:%S')\n", - " \n", - " client = clickhouse_connect.get_client(\n", - " host=os.environ[\"CLICKHOUSE_HOST\"],\n", - " username=os.environ[\"CLICKHOUSE_USERNAME\"],\n", - " password=os.environ[\"CLICKHOUSE_PASSWORD\"],\n", - " secure=True,\n", - " )\n", - " print(f\" Querying ClickHouse for {start_time.strftime('%Y-%m-%d')}\")\n", - " df = client.query_df(f\"SELECT time, icao,r,t,dbFlags,ownOp,year,desc,aircraft FROM adsb_messages Where time > '{start_str}' AND time < '{end_str}'\")\n", - " \n", - " # Save to cache\n", - " df.to_csv(cache_file, index=False, compression='zstd')\n", - " print(f\" Saved to cache: {cache_file}\")\n", - " \n", - " return df\n", - "\n", - "def load_historical_for_day(day):\n", - " from pathlib import Path\n", - " import pandas as pd\n", - " \n", - " df = load_raw_adsb_for_day(day)\n", - " \n", - " df['aircraft_category'] = df['aircraft'].apply(lambda x: x.get('category') if isinstance(x, dict) else None)\n", - " df = df.drop(columns=['aircraft'])\n", - " 
df = df.sort_values(['icao', 'time'])\n", - " df[COLUMNS] = df[COLUMNS].fillna('')\n", - " df_compressed = df.groupby('icao',group_keys=False).apply(compress_df)\n", - " cols = df_compressed.columns.tolist()\n", - " cols.remove('time')\n", - " cols.insert(0, 'time')\n", - " cols.remove(\"icao\")\n", - " cols.insert(1, \"icao\")\n", - " df_compressed = df_compressed[cols]\n", - " return df_compressed\n", - "\n", - "\n", - "def concat_compressed_dfs(df_base, df_new):\n", - " \"\"\"Concatenate base and new compressed dataframes, keeping the most informative row per ICAO.\"\"\"\n", - " import pandas as pd\n", - " \n", - " # Combine both dataframes\n", - " df_combined = pd.concat([df_base, df_new], ignore_index=True)\n", - " \n", - " # Sort by ICAO and time\n", - " df_combined = df_combined.sort_values(['icao', 'time'])\n", - " \n", - " # Fill NaN values\n", - " df_combined[COLUMNS] = df_combined[COLUMNS].fillna('')\n", - " \n", - " # Apply compression logic per ICAO to get the best row\n", - " df_compressed = df_combined.groupby('icao', group_keys=False).apply(compress_df)\n", - " \n", - " # Sort by time\n", - " df_compressed = df_compressed.sort_values('time')\n", - " \n", - " return df_compressed\n", - "\n", - "\n", - "def get_latest_aircraft_adsb_csv_df():\n", - " \"\"\"Download and load the latest ADS-B CSV from GitHub releases.\"\"\"\n", - " from get_latest_planequery_aircraft_release import download_latest_aircraft_adsb_csv\n", - " \n", - " import pandas as pd\n", - " import re\n", - " \n", - " csv_path = download_latest_aircraft_adsb_csv()\n", - " df = pd.read_csv(csv_path)\n", - " df = df.fillna(\"\")\n", - " \n", - " # Extract start date from filename pattern: planequery_aircraft_adsb_{start_date}_{end_date}.csv\n", - " match = re.search(r\"planequery_aircraft_adsb_(\\d{4}-\\d{2}-\\d{2})_\", str(csv_path))\n", - " if not match:\n", - " raise ValueError(f\"Could not extract date from filename: {csv_path.name}\")\n", - " \n", - " date_str = match.group(1)\n", - 
" return df, date_str\n", - "\n" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "e14c8363", - "metadata": {}, - "outputs": [], - "source": [ - "from datetime import datetime\n", - "df = load_historical_for_day(datetime(2024,1,1))" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "3874ba4d", - "metadata": {}, - "outputs": [], - "source": [ - "len(df)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "bcae50ad", - "metadata": {}, - "outputs": [], - "source": [ - "df[(df['icao'] == \"008081\")]" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "50921c86", - "metadata": {}, - "outputs": [], - "source": [ - "df[df['icao'] == \"a4e1d2\"]" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "8194d9aa", - "metadata": {}, - "outputs": [], - "source": [ - "df[df['r'] == \"N4131T\"]" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "1e3b7aa2", - "metadata": {}, - "outputs": [], - "source": [ - "df_compressed[df_compressed['icao'].duplicated(keep=False)]\n" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "40613bc1", - "metadata": {}, - "outputs": [], - "source": [ - "import gzip\n", - "import json\n", - "\n", - "path = \"/Users/jonahgoode/Downloads/test_extract/traces/fb/trace_full_acbbfb.json\"\n", - "\n", - "with gzip.open(path, \"rt\", encoding=\"utf-8\") as f:\n", - " data = json.load(f)\n", - "\n", - "print(type(data))\n", - "# use `data` here\n", - "import json\n", - "print(json.dumps(data, indent=2)[:2000])\n" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "320109b2", - "metadata": {}, - "outputs": [], - "source": [ - "# First, load the JSON to inspect its structure\n", - "import json\n", - "with open(\"/Users/jonahgoode/Documents/PlaneQuery/Other-Code/readsb-protobuf/webapp/src/db/aircrafts.json\", 'r') as f:\n", - " data = json.load(f)\n", - "\n", - "# Check the structure\n", - 
"print(type(data))" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "590134f4", - "metadata": {}, - "outputs": [], - "source": [ - "data['AC97E3']" - ] - } - ], - "metadata": { - "kernelspec": { - "display_name": ".venv", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.12.10" - } - }, - "nbformat": 4, - "nbformat_minor": 5 -} diff --git a/requirements.txt b/requirements.txt index 6a4ec9a..5d93f27 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,3 +3,4 @@ pandas==3.0.0 pyarrow==23.0.0 orjson==3.11.7 polars==1.38.1 +jsonschema==4.26.0 \ No newline at end of file diff --git a/schemas/community_submission.v1.schema.json b/schemas/community_submission.v1.schema.json index 18609a4..8d0d94c 100644 --- a/schemas/community_submission.v1.schema.json +++ b/schemas/community_submission.v1.schema.json @@ -1,9 +1,8 @@ { "$schema": "https://json-schema.org/draft/2020-12/schema", - "title": "PlaneQuery Aircraft Community Submission (v1)", + "title": "OpenAirframes Community Submission (v1)", "type": "object", "additionalProperties": false, - "properties": { "registration_number": { "type": "string", @@ -11,13 +10,12 @@ }, "transponder_code_hex": { "type": "string", - "pattern": "^[0-9A-Fa-f]{6}$" + "pattern": "^[0-9A-F]{6}$" }, - "planequery_airframe_id": { + "openairframes_id": { "type": "string", "minLength": 1 }, - "contributor_uuid": { "type": "string", "format": "uuid" @@ -28,14 +26,24 @@ "maxLength": 150, "description": "Display name (may be blank)" }, - "creation_timestamp": { "type": "string", "format": "date-time", "description": "Set by the system when the submission is persisted/approved.", "readOnly": true }, - + "start_date": { + "type": "string", + "format": "date", + "pattern": 
"^\\d{4}-\\d{2}-\\d{2}$", + "description": "Optional start date for when this submission's tags are valid (ISO 8601, e.g., 2025-05-01)." + }, + "end_date": { + "type": "string", + "format": "date", + "pattern": "^\\d{4}-\\d{2}-\\d{2}$", + "description": "Optional end date for when this submission's tags are valid (ISO 8601, e.g., 2025-07-03)." + }, "tags": { "type": "object", "description": "Additional community-defined tags as key/value pairs (values may be scalar, array, or object).", @@ -43,38 +51,63 @@ "type": "string", "pattern": "^[a-z][a-z0-9_]{0,63}$" }, - "additionalProperties": { "$ref": "#/$defs/tagValue" } + "additionalProperties": { + "$ref": "#/$defs/tagValue" + }, + "properties": {} } }, - "allOf": [ { "anyOf": [ - { "required": ["registration_number"] }, - { "required": ["transponder_code_hex"] }, - { "required": ["planequery_airframe_id"] } + { + "required": [ + "registration_number" + ] + }, + { + "required": [ + "transponder_code_hex" + ] + }, + { + "required": [ + "openairframes_id" + ] + } ] } ], - "$defs": { "tagScalar": { - "type": ["string", "number", "integer", "boolean", "null"] + "type": [ + "string", + "number", + "integer", + "boolean", + "null" + ] }, "tagValue": { "anyOf": [ - { "$ref": "#/$defs/tagScalar" }, + { + "$ref": "#/$defs/tagScalar" + }, { "type": "array", "maxItems": 50, - "items": { "$ref": "#/$defs/tagScalar" } + "items": { + "$ref": "#/$defs/tagScalar" + } }, { "type": "object", "maxProperties": 50, - "additionalProperties": { "$ref": "#/$defs/tagScalar" } + "additionalProperties": { + "$ref": "#/$defs/tagScalar" + } } ] } } -} \ No newline at end of file +} diff --git a/src/adsb/combine_chunks_to_csv.py b/src/adsb/combine_chunks_to_csv.py index 2fe8b4e..b5afca3 100644 --- a/src/adsb/combine_chunks_to_csv.py +++ b/src/adsb/combine_chunks_to_csv.py @@ -27,7 +27,7 @@ from src.adsb.compress_adsb_to_aircraft_data import compress_multi_icao_df, COLU DEFAULT_CHUNK_DIR = os.path.join(OUTPUT_DIR, "adsb_chunks") -FINAL_OUTPUT_DIR 
= "./data/planequery_aircraft" +FINAL_OUTPUT_DIR = "./data/openairframes" os.makedirs(FINAL_OUTPUT_DIR, exist_ok=True) @@ -36,8 +36,13 @@ def get_target_day() -> datetime: return datetime.utcnow() - timedelta(days=1) -def process_single_chunk(chunk_path: str) -> pl.DataFrame: - """Load and compress a single chunk parquet file.""" +def process_single_chunk(chunk_path: str, delete_after_load: bool = False) -> pl.DataFrame: + """Load and compress a single chunk parquet file. + + Args: + chunk_path: Path to parquet file + delete_after_load: If True, delete the parquet file after loading to free disk space + """ print(f"Processing {os.path.basename(chunk_path)}... | {get_resource_usage()}") # Load chunk - only columns we need @@ -45,6 +50,14 @@ def process_single_chunk(chunk_path: str) -> pl.DataFrame: df = pl.read_parquet(chunk_path, columns=needed_columns) print(f" Loaded {len(df)} rows") + # Delete file immediately after loading to free disk space + if delete_after_load: + try: + os.remove(chunk_path) + print(f" Deleted {chunk_path} to free disk space") + except Exception as e: + print(f" Warning: Failed to delete {chunk_path}: {e}") + # Compress to aircraft records (one per ICAO) using shared function compressed = compress_multi_icao_df(df, verbose=True) print(f" Compressed to {len(compressed)} aircraft records") @@ -72,12 +85,12 @@ def combine_compressed_chunks(compressed_dfs: list[pl.DataFrame]) -> pl.DataFram def download_and_merge_base_release(compressed_df: pl.DataFrame) -> pl.DataFrame: """Download base release and merge with new data.""" - from src.get_latest_planequery_aircraft_release import download_latest_aircraft_adsb_csv + from src.get_latest_release import download_latest_aircraft_adsb_csv print("Downloading base ADS-B release...") try: base_path = download_latest_aircraft_adsb_csv( - output_dir="./data/planequery_aircraft_base" + output_dir="./data/openairframes_base" ) print(f"Download returned: {base_path}") @@ -156,16 +169,17 @@ def main(): 
parser.add_argument("--chunks-dir", type=str, default=DEFAULT_CHUNK_DIR, help="Directory containing chunk parquet files") parser.add_argument("--skip-base", action="store_true", help="Skip downloading and merging base release") parser.add_argument("--keep-chunks", action="store_true", help="Keep chunk files after merging") + parser.add_argument("--stream", action="store_true", help="Delete parquet files immediately after loading to save disk space") args = parser.parse_args() # Determine output ID and filename based on mode if args.start_date and args.end_date: # Historical mode output_id = f"{args.start_date}_{args.end_date}" - output_filename = f"planequery_aircraft_adsb_{args.start_date}_{args.end_date}.csv" + output_filename = f"openairframes_adsb_{args.start_date}_{args.end_date}.csv" print(f"Combining chunks for date range: {args.start_date} to {args.end_date}") else: - # Daily mode + # Daily mode - use same date for start and end if args.date: target_day = datetime.strptime(args.date, "%Y-%m-%d") else: @@ -173,7 +187,7 @@ def main(): date_str = target_day.strftime("%Y-%m-%d") output_id = date_str - output_filename = f"planequery_aircraft_adsb_{date_str}.csv" + output_filename = f"openairframes_adsb_{date_str}_{date_str}.csv" print(f"Combining chunks for {date_str}") chunks_dir = args.chunks_dir @@ -190,9 +204,10 @@ def main(): print(f"Found {len(chunk_files)} chunk files") # Process each chunk separately to save memory + # With --stream, delete parquet files immediately after loading to save disk space compressed_chunks = [] for chunk_path in chunk_files: - compressed = process_single_chunk(chunk_path) + compressed = process_single_chunk(chunk_path, delete_after_load=args.stream) compressed_chunks.append(compressed) gc.collect() diff --git a/src/adsb/compress_adsb_to_aircraft_data.py b/src/adsb/compress_adsb_to_aircraft_data.py index 5ae58cd..0938883 100644 --- a/src/adsb/compress_adsb_to_aircraft_data.py +++ b/src/adsb/compress_adsb_to_aircraft_data.py @@ 
-253,7 +253,7 @@ def concat_compressed_dfs(df_base, df_new): def get_latest_aircraft_adsb_csv_df(): """Download and load the latest ADS-B CSV from GitHub releases.""" - from get_latest_planequery_aircraft_release import download_latest_aircraft_adsb_csv + from get_latest_release import download_latest_aircraft_adsb_csv import re csv_path = download_latest_aircraft_adsb_csv() @@ -264,8 +264,8 @@ def get_latest_aircraft_adsb_csv_df(): if df[col].dtype == pl.Utf8: df = df.with_columns(pl.col(col).fill_null("")) - # Extract start date from filename pattern: planequery_aircraft_adsb_{start_date}_{end_date}.csv - match = re.search(r"planequery_aircraft_adsb_(\d{4}-\d{2}-\d{2})_", str(csv_path)) + # Extract start date from filename pattern: openairframes_adsb_{start_date}_{end_date}.csv + match = re.search(r"openairframes_adsb_(\d{4}-\d{2}-\d{2})_", str(csv_path)) if not match: raise ValueError(f"Could not extract date from filename: {csv_path.name}") diff --git a/src/adsb/download_adsb_data_to_parquet.py b/src/adsb/download_adsb_data_to_parquet.py index 4be76d6..579a375 100644 --- a/src/adsb/download_adsb_data_to_parquet.py +++ b/src/adsb/download_adsb_data_to_parquet.py @@ -82,7 +82,8 @@ def fetch_releases(version_date: str) -> list: if version_date == "v2024.12.31": year = "2025" BASE_URL = f"https://api.github.com/repos/adsblol/globe_history_{year}/releases" - PATTERN = f"{version_date}-planes-readsb-prod-0" + # Match exact release name, exclude tmp releases + PATTERN = rf"^{re.escape(version_date)}-planes-readsb-prod-\d+$" releases = [] page = 1 @@ -187,19 +188,23 @@ def extract_split_archive(file_paths: list, extract_dir: str) -> bool: cat_proc = subprocess.Popen( ["cat"] + file_paths, stdout=subprocess.PIPE, - stderr=subprocess.DEVNULL + stderr=subprocess.PIPE ) tar_cmd = ["tar", "xf", "-", "-C", extract_dir, "--strip-components=1"] - subprocess.run( + result = subprocess.run( tar_cmd, stdin=cat_proc.stdout, - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL, 
+ stdout=subprocess.PIPE, + stderr=subprocess.PIPE, check=True ) cat_proc.stdout.close() + cat_stderr = cat_proc.stderr.read().decode() if cat_proc.stderr else "" cat_proc.wait() + if cat_stderr: + print(f"cat stderr: {cat_stderr}") + print(f"Successfully extracted archive to {extract_dir}") # Delete tar files immediately after extraction @@ -217,7 +222,10 @@ def extract_split_archive(file_paths: list, extract_dir: str) -> bool: return True except subprocess.CalledProcessError as e: + stderr_output = e.stderr.decode() if e.stderr else "" print(f"Failed to extract split archive: {e}") + if stderr_output: + print(f"tar stderr: {stderr_output}") return False diff --git a/src/adsb/reducer.py b/src/adsb/reducer.py index 3ad7f40..9dcdb91 100644 --- a/src/adsb/reducer.py +++ b/src/adsb/reducer.py @@ -76,8 +76,8 @@ def main(): print(f"After dedup: {df_accumulated.height} rows") # Write and upload final result - output_name = f"planequery_aircraft_adsb_{global_start}_{global_end}.csv.gz" - csv_output = Path(f"/tmp/planequery_aircraft_adsb_{global_start}_{global_end}.csv") + output_name = f"openairframes_adsb_{global_start}_{global_end}.csv.gz" + csv_output = Path(f"/tmp/openairframes_adsb_{global_start}_{global_end}.csv") gz_output = Path(f"/tmp/{output_name}") df_accumulated.write_csv(csv_output) diff --git a/src/contributions/approve_submission.py b/src/contributions/approve_submission.py index 5d953a4..ebdd6b7 100644 --- a/src/contributions/approve_submission.py +++ b/src/contributions/approve_submission.py @@ -21,12 +21,14 @@ import urllib.request import urllib.error from datetime import datetime, timezone -from .schema import extract_json_from_issue_body, extract_contributor_name_from_issue_body, parse_and_validate +from .schema import extract_json_from_issue_body, extract_contributor_name_from_issue_body, parse_and_validate, load_schema, SCHEMAS_DIR from .contributor import ( generate_contributor_uuid, generate_submission_filename, compute_content_hash, ) +from 
.update_schema import generate_updated_schema, check_for_new_tags, get_existing_tag_definitions +from .read_community_data import build_tag_type_registry def github_api_request( @@ -54,7 +56,11 @@ def github_api_request( try: with urllib.request.urlopen(req) as response: - return json.loads(response.read()) + response_body = response.read() + # DELETE requests return empty body (204 No Content) + if not response_body: + return {} + return json.loads(response_body) except urllib.error.HTTPError as e: error_body = e.read().decode() if e.fp else "" print(f"GitHub API error: {e.code} {e.reason}: {error_body}", file=sys.stderr) @@ -94,14 +100,30 @@ def create_branch(branch_name: str, sha: str) -> None: raise +def get_file_sha(path: str, branch: str) -> str | None: + """Get the SHA of an existing file, or None if it doesn't exist.""" + try: + response = github_api_request("GET", f"/contents/{path}?ref={branch}") + return response.get("sha") + except Exception: + return None + + def create_or_update_file(path: str, content: str, message: str, branch: str) -> None: """Create or update a file in the repository.""" content_b64 = base64.b64encode(content.encode()).decode() - github_api_request("PUT", f"/contents/{path}", { + payload = { "message": message, "content": content_b64, "branch": branch, - }) + } + + # If file exists, we need to include its SHA to update it + sha = get_file_sha(path, branch) + if sha: + payload["sha"] = sha + + github_api_request("PUT", f"/contents/{path}", payload) def create_pull_request(title: str, head: str, base: str, body: str) -> dict: @@ -144,21 +166,19 @@ def process_submission( return False data, errors = parse_and_validate(json_str) - if errors: - error_list = "\n".join(f"- {e}" for e in errors) + if errors or data is None: + error_list = "\n".join(f"- {e}" for e in errors) if errors else "Unknown error" add_issue_comment(issue_number, f"❌ **Validation Failed**\n\n{error_list}") return False # Normalize to list - submissions = data if 
isinstance(data, list) else [data]
+    submissions: list[dict] = data if isinstance(data, list) else [data]

     # Generate contributor UUID from GitHub ID
     contributor_uuid = generate_contributor_uuid(author_id)

-    # Extract contributor name from issue form (or default to GitHub username)
+    # Extract contributor name from issue form (None means user opted out of attribution)
     contributor_name = extract_contributor_name_from_issue_body(issue_body)
-    if not contributor_name:
-        contributor_name = f"@{author_username}"

     # Add metadata to each submission
     now = datetime.now(timezone.utc)
@@ -167,14 +187,15 @@

     for submission in submissions:
         submission["contributor_uuid"] = contributor_uuid
-        submission["contributor_name"] = contributor_name
+        if contributor_name:
+            submission["contributor_name"] = contributor_name
         submission["creation_timestamp"] = timestamp_str

     # Generate unique filename
     content_json = json.dumps(submissions, indent=2, sort_keys=True)
     content_hash = compute_content_hash(content_json)
     filename = generate_submission_filename(author_username, date_str, content_hash)
-    file_path = f"community/{filename}"
+    file_path = f"community/{date_str}/{filename}"

     # Create branch
     branch_name = f"community-submission-{issue_number}"
@@ -185,14 +206,53 @@
     commit_message = f"Add community submission from @{author_username} (closes #{issue_number})"
     create_or_update_file(file_path, content_json, commit_message, branch_name)

+    # Update schema with any new tags (modifies v1 in place)
+    schema_updated = False
+    new_tags = []
+    try:
+        # Build tag registry from new submissions
+        tag_registry = build_tag_type_registry(submissions)
+
+        # Get current schema and merge existing tags
+        current_schema = load_schema()
+        existing_tags = get_existing_tag_definitions(current_schema)
+
+        # Merge existing tags into registry
+        for tag_name, tag_def in existing_tags.items():
+            if tag_name not in tag_registry:
+                tag_type = tag_def.get("type", "string")
+
tag_registry[tag_name] = tag_type + + # Check for new tags + new_tags = check_for_new_tags(tag_registry, current_schema) + + if new_tags: + # Generate updated schema + updated_schema = generate_updated_schema(current_schema, tag_registry) + schema_json = json.dumps(updated_schema, indent=2) + "\n" + + create_or_update_file( + "schemas/community_submission.v1.schema.json", + schema_json, + f"Update schema with new tags: {', '.join(new_tags)}", + branch_name + ) + schema_updated = True + except Exception as e: + print(f"Warning: Could not update schema: {e}", file=sys.stderr) + # Create PR + schema_note = "" + if schema_updated: + schema_note = f"\n**Schema Updated:** Added new tags: `{', '.join(new_tags)}`\n" + pr_body = f"""## Community Submission Adds {len(submissions)} submission(s) from @{author_username}. **File:** `{file_path}` **Contributor UUID:** `{contributor_uuid}` - +{schema_note} Closes #{issue_number} --- diff --git a/src/contributions/create_daily_community_release.py b/src/contributions/create_daily_community_release.py index f709d24..ec4d060 100644 --- a/src/contributions/create_daily_community_release.py +++ b/src/contributions/create_daily_community_release.py @@ -17,7 +17,7 @@ import pandas as pd COMMUNITY_DIR = Path(__file__).parent.parent.parent / "community" -OUT_ROOT = Path("data/planequery_aircraft") +OUT_ROOT = Path("data/openairframes") def read_all_submissions(community_dir: Path) -> list[dict]: @@ -47,7 +47,7 @@ def submissions_to_dataframe(submissions: list[dict]) -> pd.DataFrame: - creation_timestamp (first) - transponder_code_hex - registration_number - - planequery_airframe_id + - openairframes_id - contributor_name - [other columns alphabetically] - contributor_uuid (last) @@ -62,7 +62,7 @@ def submissions_to_dataframe(submissions: list[dict]) -> pd.DataFrame: "creation_timestamp", "transponder_code_hex", "registration_number", - "planequery_airframe_id", + "openairframes_id", "contributor_name", "contributor_uuid", ] @@ -78,7 +78,7 
@@ def submissions_to_dataframe(submissions: list[dict]) -> pd.DataFrame: "creation_timestamp", "transponder_code_hex", "registration_number", - "planequery_airframe_id", + "openairframes_id", "contributor_name", ] last_cols = ["contributor_uuid"] @@ -108,7 +108,7 @@ def main(): "creation_timestamp", "transponder_code_hex", "registration_number", - "planequery_airframe_id", + "openairframes_id", "contributor_name", "tags", "contributor_uuid", @@ -127,7 +127,7 @@ def main(): # Output OUT_ROOT.mkdir(parents=True, exist_ok=True) - output_file = OUT_ROOT / f"planequery_aircraft_community_{start_date_str}_{date_str}.csv" + output_file = OUT_ROOT / f"openairframes_community_{start_date_str}_{date_str}.csv" df.to_csv(output_file, index=False) diff --git a/src/contributions/read_community_data.py b/src/contributions/read_community_data.py index 0e6e4ea..525e467 100644 --- a/src/contributions/read_community_data.py +++ b/src/contributions/read_community_data.py @@ -30,7 +30,8 @@ def read_all_submissions(community_dir: Path | None = None) -> list[dict]: all_submissions = [] - for json_file in sorted(community_dir.glob("*.json")): + # Search both root directory and date subdirectories (e.g., 2026-02-12/) + for json_file in sorted(community_dir.glob("**/*.json")): try: with open(json_file) as f: data = json.load(f) @@ -50,6 +51,52 @@ def read_all_submissions(community_dir: Path | None = None) -> list[dict]: return all_submissions +def get_python_type_name(value) -> str: + """Get a normalized type name for a value.""" + if value is None: + return "null" + if isinstance(value, bool): + return "boolean" + if isinstance(value, int): + return "integer" + if isinstance(value, float): + return "number" + if isinstance(value, str): + return "string" + if isinstance(value, list): + return "array" + if isinstance(value, dict): + return "object" + return type(value).__name__ + + +def build_tag_type_registry(submissions: list[dict]) -> dict[str, str]: + """ + Build a registry of tag names 
to their expected types from existing submissions. + + Args: + submissions: List of existing submission dictionaries + + Returns: + Dict mapping tag name to expected type (e.g., {"internet": "string", "year_built": "integer"}) + """ + tag_types = {} + + for submission in submissions: + tags = submission.get("tags", {}) + if not isinstance(tags, dict): + continue + + for key, value in tags.items(): + inferred_type = get_python_type_name(value) + + if key not in tag_types: + tag_types[key] = inferred_type + # If there's a conflict, keep the first type (it's already in use) + + return tag_types + + def group_by_identifier(submissions: list[dict]) -> dict[str, list[dict]]: """ Group submissions by their identifier (registration, transponder, or airframe ID). @@ -65,8 +112,8 @@ def group_by_identifier(submissions: list[dict]) -> dict[str, list[dict]]: key = f"reg:{submission['registration_number']}" elif "transponder_code_hex" in submission: key = f"icao:{submission['transponder_code_hex']}" - elif "planequery_airframe_id" in submission: - key = f"id:{submission['planequery_airframe_id']}" + elif "openairframes_id" in submission: + key = f"id:{submission['openairframes_id']}" else: key = "_unknown" diff --git a/src/contributions/regenerate_pr_schema.py b/src/contributions/regenerate_pr_schema.py new file mode 100644 index 0000000..5209e13 --- /dev/null +++ b/src/contributions/regenerate_pr_schema.py @@ -0,0 +1,66 @@ +#!/usr/bin/env python3 +""" +Regenerate schema for a PR branch after main has been merged in. +This script looks at the submission files in this branch and updates +the schema if new tags were introduced. 
+ +Usage: python -m src.contributions.regenerate_pr_schema +""" + +import json +import sys +from pathlib import Path + +# Add parent to path for imports when running as script +sys.path.insert(0, str(Path(__file__).parent.parent.parent)) + +from src.contributions.read_community_data import read_all_submissions, build_tag_type_registry +from src.contributions.update_schema import ( + get_existing_tag_definitions, + check_for_new_tags, + generate_updated_schema, +) +from src.contributions.schema import load_schema, SCHEMAS_DIR + + +def main(): + """Main entry point.""" + # Load current schema + current_schema = load_schema() + + # Get existing tag definitions from schema + existing_tags = get_existing_tag_definitions(current_schema) + + # Read all submissions (including ones from this PR branch) + submissions = read_all_submissions() + + if not submissions: + print("No submissions found") + return + + # Build tag registry from all submissions + tag_registry = build_tag_type_registry(submissions) + + # Check for new tags not in the current schema + new_tags = check_for_new_tags(tag_registry, current_schema) + + if new_tags: + print(f"Found new tags: {new_tags}") + print("Updating schema...") + + # Generate updated schema + updated_schema = generate_updated_schema(current_schema, tag_registry) + + # Write updated schema (in place) + schema_path = SCHEMAS_DIR / "community_submission.v1.schema.json" + with open(schema_path, 'w') as f: + json.dump(updated_schema, f, indent=2) + f.write("\n") + + print(f"Updated {schema_path}") + else: + print("No new tags found, schema is up to date") + + +if __name__ == "__main__": + main() diff --git a/src/contributions/schema.py b/src/contributions/schema.py index 3bc9539..e2ee54b 100644 --- a/src/contributions/schema.py +++ b/src/contributions/schema.py @@ -10,12 +10,59 @@ except ImportError: Draft202012Validator = None -SCHEMA_PATH = Path(__file__).parent.parent.parent / "schemas" / "community_submission.v1.schema.json" +SCHEMAS_DIR 
= Path(__file__).parent.parent.parent / "schemas" + +# For backwards compatibility +SCHEMA_PATH = SCHEMAS_DIR / "community_submission.v1.schema.json" -def load_schema() -> dict: - """Load the community submission schema.""" - with open(SCHEMA_PATH) as f: +def get_latest_schema_version() -> int: + """ + Find the latest schema version number. + + Returns: + Latest version number (e.g., 1, 2, 3) + """ + import re + pattern = re.compile(r"community_submission\.v(\d+)\.schema\.json$") + max_version = 0 + + for path in SCHEMAS_DIR.glob("community_submission.v*.schema.json"): + match = pattern.search(path.name) + if match: + version = int(match.group(1)) + max_version = max(max_version, version) + + return max_version + + +def get_schema_path(version: int | None = None) -> Path: + """ + Get path to a specific schema version, or latest if version is None. + + Args: + version: Schema version number, or None for latest + + Returns: + Path to schema file + """ + if version is None: + version = get_latest_schema_version() + return SCHEMAS_DIR / f"community_submission.v{version}.schema.json" + + +def load_schema(version: int | None = None) -> dict: + """ + Load the community submission schema. + + Args: + version: Schema version to load. If None, loads the latest version. + + Returns: + Schema dict + """ + schema_path = get_schema_path(version) + with open(schema_path) as f: return json.load(f) @@ -50,11 +97,36 @@ def validate_submission(data: dict | list, schema: dict | None = None) -> list[s return errors +def download_github_attachment(url: str) -> str | None: + """ + Download content from a GitHub attachment URL. + + Args: + url: GitHub attachment URL (e.g., https://github.com/user-attachments/files/...) 
+ + Returns: + File content as string, or None if download failed + """ + import urllib.request + import urllib.error + + try: + req = urllib.request.Request(url, headers={"User-Agent": "OpenAirframes-Bot"}) + with urllib.request.urlopen(req, timeout=30) as response: + return response.read().decode("utf-8") + except (urllib.error.URLError, urllib.error.HTTPError, UnicodeDecodeError) as e: + print(f"Failed to download attachment from {url}: {e}") + return None + + def extract_json_from_issue_body(body: str) -> str | None: """ Extract JSON from GitHub issue body. - Looks for JSON in the 'Submission JSON' section wrapped in code blocks. + Looks for JSON in the 'Submission JSON' section, either: + - A GitHub file attachment URL (drag-and-drop .json file) + - Wrapped in code blocks (```json ... ``` or ``` ... ```) + - Or raw JSON after the header Args: body: The issue body text @@ -62,13 +134,49 @@ def extract_json_from_issue_body(body: str) -> str | None: Returns: Extracted JSON string or None if not found """ - # Match JSON in "### Submission JSON" section - pattern = r"### Submission JSON\s*\n\s*```(?:json)?\s*\n([\s\S]*?)\n\s*```" - match = re.search(pattern, body) + # Try: GitHub attachment URL in the Submission JSON section + # Format: [filename.json](https://github.com/user-attachments/files/...) 
+ # Or just the raw URL + pattern_attachment = r"### Submission JSON\s*\n[\s\S]*?(https://github\.com/(?:user-attachments/files|.*?/files)/[^\s\)\]]+\.json)" + match = re.search(pattern_attachment, body) + if match: + url = match.group(1) + content = download_github_attachment(url) + if content: + return content.strip() + # Also check for GitHub user-attachments URL anywhere in submission section + pattern_attachment_alt = r"\[.*?\.json\]\((https://github\.com/[^\)]+)\)" + match = re.search(pattern_attachment_alt, body) + if match: + url = match.group(1) + if ".json" in url or "user-attachments" in url: + content = download_github_attachment(url) + if content: + return content.strip() + + # Try: JSON in code blocks after "### Submission JSON" + pattern_codeblock = r"### Submission JSON\s*\n\s*```(?:json)?\s*\n([\s\S]*?)\n\s*```" + match = re.search(pattern_codeblock, body) if match: return match.group(1).strip() + # Try: Raw JSON after "### Submission JSON" until next section or end + pattern_raw = r"### Submission JSON\s*\n\s*([\[{][\s\S]*?[\]}])(?=\n###|\n\n###|$)" + match = re.search(pattern_raw, body) + if match: + return match.group(1).strip() + + # Try: Any JSON object/array in the body (fallback) + pattern_any = r"([\[{][\s\S]*?[\]}])" + for match in re.finditer(pattern_any, body): + candidate = match.group(1).strip() + # Validate it looks like JSON + if candidate.startswith('{') and candidate.endswith('}'): + return candidate + if candidate.startswith('[') and candidate.endswith(']'): + return candidate + return None diff --git a/src/contributions/update_schema.py b/src/contributions/update_schema.py new file mode 100644 index 0000000..05b6385 --- /dev/null +++ b/src/contributions/update_schema.py @@ -0,0 +1,154 @@ +#!/usr/bin/env python3 +""" +Update the schema with tag type definitions from existing submissions. + +This script reads all community submissions and generates a new schema version +that includes explicit type definitions for all known tags. 
+ +When new tags are introduced, a new schema version is created (e.g., v1 -> v2 -> v3). + +Usage: + python -m src.contributions.update_schema + python -m src.contributions.update_schema --check # Check if update needed +""" +import argparse +import json +import sys +from pathlib import Path + +from .read_community_data import read_all_submissions, build_tag_type_registry +from .schema import SCHEMAS_DIR, get_latest_schema_version, get_schema_path, load_schema + + +def get_existing_tag_definitions(schema: dict) -> dict[str, dict]: + """Extract existing tag property definitions from schema.""" + tags_props = schema.get("properties", {}).get("tags", {}).get("properties", {}) + return tags_props + + +def type_name_to_json_schema(type_name: str) -> dict: + """Convert a type name to a JSON Schema type definition.""" + type_map = { + "string": {"type": "string"}, + "integer": {"type": "integer"}, + "number": {"type": "number"}, + "boolean": {"type": "boolean"}, + "null": {"type": "null"}, + "array": {"type": "array", "items": {"$ref": "#/$defs/tagScalar"}}, + "object": {"type": "object", "additionalProperties": {"$ref": "#/$defs/tagScalar"}}, + } + return type_map.get(type_name, {"$ref": "#/$defs/tagValue"}) + + +def generate_updated_schema(base_schema: dict, tag_registry: dict[str, str]) -> dict: + """ + Generate an updated schema with explicit tag definitions. 
+ + Args: + base_schema: The current schema to update + tag_registry: Dict mapping tag name to type name + + Returns: + Updated schema dict + """ + schema = json.loads(json.dumps(base_schema)) # Deep copy + + # Build tag properties with explicit types + tag_properties = {} + for tag_name, type_name in sorted(tag_registry.items()): + tag_properties[tag_name] = type_name_to_json_schema(type_name) + + # Only add/update the properties key within tags, preserve everything else + if "properties" in schema and "tags" in schema["properties"]: + schema["properties"]["tags"]["properties"] = tag_properties + + return schema + + +def check_for_new_tags(tag_registry: dict[str, str], current_schema: dict) -> list[str]: + """ + Check which tags in the registry are not yet defined in the schema. + + Returns: + List of new tag names + """ + existing_tags = get_existing_tag_definitions(current_schema) + return [tag for tag in tag_registry if tag not in existing_tags] + + +def update_schema_file( + tag_registry: dict[str, str], + check_only: bool = False +) -> tuple[bool, list[str]]: + """ + Update the v1 schema file with new tag definitions. + + Args: + tag_registry: Dict mapping tag name to type name + check_only: If True, only check if update is needed without writing + + Returns: + Tuple of (was_updated, list_of_new_tags) + """ + current_schema = load_schema() + + # Find new tags + new_tags = check_for_new_tags(tag_registry, current_schema) + + if not new_tags: + return False, [] + + if check_only: + return True, new_tags + + # Generate and write updated schema (in place) + updated_schema = generate_updated_schema(current_schema, tag_registry) + schema_path = get_schema_path() + + with open(schema_path, "w") as f: + json.dump(updated_schema, f, indent=2) + f.write("\n") + + return True, new_tags + + +def update_schema_from_submissions(check_only: bool = False) -> tuple[bool, list[str]]: + """ + Read all submissions and update the schema if needed. 
+ + Args: + check_only: If True, only check if update is needed without writing + + Returns: + Tuple of (was_updated, list_of_new_tags) + """ + submissions = read_all_submissions() + tag_registry = build_tag_type_registry(submissions) + return update_schema_file(tag_registry, check_only) + + +def main(): + parser = argparse.ArgumentParser(description="Update schema with tag definitions") + parser.add_argument("--check", action="store_true", help="Check if update needed without writing") + + args = parser.parse_args() + + was_updated, new_tags = update_schema_from_submissions(check_only=args.check) + + if args.check: + if was_updated: + print(f"Schema update needed. New tags: {', '.join(new_tags)}") + sys.exit(1) + else: + print("Schema is up to date") + sys.exit(0) + else: + if was_updated: + print(f"Updated {get_schema_path()}") + print(f"Added tags: {', '.join(new_tags)}") + else: + print("No update needed") + + +if __name__ == "__main__": + main() diff --git a/src/contributions/validate_submission.py b/src/contributions/validate_submission.py index e4d45b6..84bee1b 100644 --- a/src/contributions/validate_submission.py +++ b/src/contributions/validate_submission.py @@ -7,6 +7,7 @@ submissions when issues are opened or edited. Usage: python -m src.contributions.validate_submission --issue-body "..." 
+ python -m src.contributions.validate_submission --issue-body-file /path/to/body.txt python -m src.contributions.validate_submission --file submission.json echo '{"registration_number": "N12345"}' | python -m src.contributions.validate_submission --stdin @@ -23,6 +24,7 @@ import urllib.request import urllib.error from .schema import extract_json_from_issue_body, parse_and_validate, load_schema +from .read_community_data import read_all_submissions, build_tag_type_registry, get_python_type_name def github_api_request(method: str, endpoint: str, data: dict | None = None) -> dict: @@ -65,6 +67,40 @@ def remove_issue_label(issue_number: int, label: str) -> None: pass # Label might not exist +def validate_tag_consistency(data: dict | list, tag_registry: dict[str, str]) -> list[str]: + """ + Check that tag types in new submissions match existing tag types. + + Args: + data: Single submission dict or list of submissions + tag_registry: Dict mapping tag name to expected type + + Returns: + List of error messages. Empty list means validation passed. + """ + errors = [] + submissions = data if isinstance(data, list) else [data] + + for i, submission in enumerate(submissions): + prefix = f"[{i}] " if len(submissions) > 1 else "" + tags = submission.get("tags", {}) + + if not isinstance(tags, dict): + continue + + for key, value in tags.items(): + actual_type = get_python_type_name(value) + + if key in tag_registry: + expected_type = tag_registry[key] + if actual_type != expected_type: + errors.append( + f"{prefix}tags.{key}: expected type '{expected_type}', got '{actual_type}'" + ) + + return errors + + def validate_and_report(json_str: str, issue_number: int | None = None) -> bool: """ Validate JSON and optionally report to GitHub issue. 
@@ -90,6 +126,33 @@ def validate_and_report(json_str: str, issue_number: int | None = None) -> bool: return False + # Check tag type consistency against existing submissions + if data is not None: + try: + existing_submissions = read_all_submissions() + tag_registry = build_tag_type_registry(existing_submissions) + tag_errors = validate_tag_consistency(data, tag_registry) + + if tag_errors: + error_list = "\n".join(f"- {e}" for e in tag_errors) + message = ( + f"❌ **Tag Type Mismatch**\n\n" + f"Your submission uses tags with types that don't match existing submissions:\n\n" + f"{error_list}\n\n" + f"Please use the same type as existing tags, or use a different tag name." + ) + + print(message, file=sys.stderr) + + if issue_number: + add_issue_comment(issue_number, message) + remove_issue_label(issue_number, "validated") + + return False + except Exception as e: + # Don't fail validation if we can't read existing submissions + print(f"Warning: Could not check tag consistency: {e}", file=sys.stderr) + count = len(data) if isinstance(data, list) else 1 message = f"✅ **Validation Passed**\n\n{count} submission(s) validated successfully against the schema.\n\nA maintainer can approve this submission by adding the `approved` label." @@ -106,6 +169,7 @@ def main(): parser = argparse.ArgumentParser(description="Validate community submission JSON") source_group = parser.add_mutually_exclusive_group(required=True) source_group.add_argument("--issue-body", help="Issue body text containing JSON") + source_group.add_argument("--issue-body-file", help="File containing issue body text") source_group.add_argument("--file", help="JSON file to validate") source_group.add_argument("--stdin", action="store_true", help="Read JSON from stdin") @@ -125,6 +189,20 @@ def main(): "Please ensure your JSON is in the 'Submission JSON' field wrapped in code blocks." 
) sys.exit(1) + elif args.issue_body_file: + with open(args.issue_body_file) as f: + issue_body = f.read() + json_str = extract_json_from_issue_body(issue_body) + if not json_str: + print("❌ Could not extract JSON from issue body", file=sys.stderr) + print(f"Issue body:\n{issue_body}", file=sys.stderr) + if args.issue_number: + add_issue_comment( + args.issue_number, + "❌ **Validation Failed**\n\nCould not extract JSON from submission. " + "Please ensure your JSON is in the 'Submission JSON' field." + ) + sys.exit(1) elif args.file: with open(args.file) as f: json_str = f.read() diff --git a/src/create_daily_planequery_aircraft_adsb_release.py b/src/create_daily_adsb_release.py similarity index 95% rename from src/create_daily_planequery_aircraft_adsb_release.py rename to src/create_daily_adsb_release.py index e5de1f8..0a5137e 100644 --- a/src/create_daily_planequery_aircraft_adsb_release.py +++ b/src/create_daily_adsb_release.py @@ -74,10 +74,10 @@ if __name__ == '__main__': ) # Save the result - OUT_ROOT = Path("data/planequery_aircraft") + OUT_ROOT = Path("data/openairframes") OUT_ROOT.mkdir(parents=True, exist_ok=True) - output_file = OUT_ROOT / f"planequery_aircraft_adsb_{start_date_str}_{date_str}.csv" + output_file = OUT_ROOT / f"openairframes_adsb_{start_date_str}_{date_str}.csv" df_combined.write_csv(output_file) print(f"Saved: {output_file}") diff --git a/src/create_daily_faa_release.py b/src/create_daily_faa_release.py new file mode 100644 index 0000000..4e7adfd --- /dev/null +++ b/src/create_daily_faa_release.py @@ -0,0 +1,49 @@ +from pathlib import Path +from datetime import datetime, timezone, timedelta +import argparse + +parser = argparse.ArgumentParser(description="Create daily FAA release") +parser.add_argument("--date", type=str, help="Date to process (YYYY-MM-DD format, default: today)") +args = parser.parse_args() + +if args.date: + date_str = args.date +else: + date_str = datetime.now(timezone.utc).strftime("%Y-%m-%d") + +out_dir = 
Path("data/faa_releasable") +out_dir.mkdir(parents=True, exist_ok=True) +zip_name = f"ReleasableAircraft_{date_str}.zip" + +zip_path = out_dir / zip_name +if not zip_path.exists(): + # URL and paths + url = "https://registry.faa.gov/database/ReleasableAircraft.zip" + from urllib.request import Request, urlopen + + req = Request( + url, + headers={"User-Agent": "Mozilla/5.0"}, + method="GET", + ) + + with urlopen(req, timeout=120) as r: + body = r.read() + zip_path.write_bytes(body) + +OUT_ROOT = Path("data/openairframes") +OUT_ROOT.mkdir(parents=True, exist_ok=True) +from derive_from_faa_master_txt import convert_faa_master_txt_to_df, concat_faa_historical_df +from get_latest_release import get_latest_aircraft_faa_csv_df +df_new = convert_faa_master_txt_to_df(zip_path, date_str) + +try: + df_base, start_date_str = get_latest_aircraft_faa_csv_df() + df_base = concat_faa_historical_df(df_base, df_new) + assert df_base['download_date'].is_monotonic_increasing, "download_date is not monotonic increasing" +except Exception as e: + print(f"No existing FAA release found, using only new data: {e}") + df_base = df_new + start_date_str = date_str + +df_base.to_csv(OUT_ROOT / f"openairframes_faa_{start_date_str}_{date_str}.csv", index=False) \ No newline at end of file diff --git a/src/create_daily_planequery_aircraft_release.py b/src/create_daily_planequery_aircraft_release.py deleted file mode 100644 index 559f8fc..0000000 --- a/src/create_daily_planequery_aircraft_release.py +++ /dev/null @@ -1,33 +0,0 @@ -from pathlib import Path -from datetime import datetime, timezone -date_str = datetime.now(timezone.utc).strftime("%Y-%m-%d") - -out_dir = Path("data/faa_releasable") -out_dir.mkdir(parents=True, exist_ok=True) -zip_name = f"ReleasableAircraft_{date_str}.zip" - -zip_path = out_dir / zip_name -if not zip_path.exists(): - # URL and paths - url = "https://registry.faa.gov/database/ReleasableAircraft.zip" - from urllib.request import Request, urlopen - - req = Request( - 
url, - headers={"User-Agent": "Mozilla/5.0"}, - method="GET", - ) - - with urlopen(req, timeout=120) as r: - body = r.read() - zip_path.write_bytes(body) - -OUT_ROOT = Path("data/planequery_aircraft") -OUT_ROOT.mkdir(parents=True, exist_ok=True) -from derive_from_faa_master_txt import convert_faa_master_txt_to_df, concat_faa_historical_df -from get_latest_planequery_aircraft_release import get_latest_aircraft_faa_csv_df -df_new = convert_faa_master_txt_to_df(zip_path, date_str) -df_base, start_date_str = get_latest_aircraft_faa_csv_df() -df_base = concat_faa_historical_df(df_base, df_new) -assert df_base['download_date'].is_monotonic_increasing, "download_date is not monotonic increasing" -df_base.to_csv(OUT_ROOT / f"planequery_aircraft_faa_{start_date_str}_{date_str}.csv", index=False) \ No newline at end of file diff --git a/src/derive_from_faa_master_txt.py b/src/derive_from_faa_master_txt.py index 532d3f3..ea4d167 100644 --- a/src/derive_from_faa_master_txt.py +++ b/src/derive_from_faa_master_txt.py @@ -29,8 +29,8 @@ def convert_faa_master_txt_to_df(zip_path: Path, date: str): certification = pd.json_normalize(df["certification"].where(df["certification"].notna(), {})).add_prefix("certificate_") df = df.drop(columns="certification").join(certification) - # Create planequery_airframe_id - df["planequery_airframe_id"] = ( + # Create openairframes_id + df["openairframes_id"] = ( normalize(df["aircraft_manufacturer"]) + "|" + normalize(df["aircraft_model"]) @@ -38,11 +38,11 @@ def convert_faa_master_txt_to_df(zip_path: Path, date: str): + normalize(df["serial_number"]) ) - # Move planequery_airframe_id to come after registration_number + # Move openairframes_id to come after registration_number cols = df.columns.tolist() - cols.remove("planequery_airframe_id") + cols.remove("openairframes_id") reg_idx = cols.index("registration_number") - cols.insert(reg_idx + 1, "planequery_airframe_id") + cols.insert(reg_idx + 1, "openairframes_id") df = df[cols] # Convert all 
NaN to empty strings diff --git a/src/get_latest_planequery_aircraft_release.py b/src/get_latest_release.py similarity index 82% rename from src/get_latest_planequery_aircraft_release.py rename to src/get_latest_release.py index 9867a5d..b29b82a 100644 --- a/src/get_latest_planequery_aircraft_release.py +++ b/src/get_latest_release.py @@ -9,7 +9,7 @@ import urllib.error import json -REPO = "PlaneQuery/planequery-aircraft" +REPO = "PlaneQuery/openairframes" LATEST_RELEASE_URL = f"https://api.github.com/repos/{REPO}/releases/latest" @@ -31,7 +31,7 @@ def get_latest_release_assets(repo: str = REPO, github_token: Optional[str] = No url = f"https://api.github.com/repos/{repo}/releases/latest" headers = { "Accept": "application/vnd.github+json", - "User-Agent": "planequery-aircraft-downloader/1.0", + "User-Agent": "openairframes-downloader/1.0", } if github_token: headers["Authorization"] = f"Bearer {github_token}" @@ -80,7 +80,7 @@ def download_asset(asset: ReleaseAsset, out_path: Path, github_token: Optional[s out_path.parent.mkdir(parents=True, exist_ok=True) headers = { - "User-Agent": "planequery-aircraft-downloader/1.0", + "User-Agent": "openairframes-downloader/1.0", "Accept": "application/octet-stream", } if github_token: @@ -109,7 +109,7 @@ def download_latest_aircraft_csv( repo: str = REPO, ) -> Path: """ - Download the latest planequery_aircraft_faa_*.csv file from the latest GitHub release. + Download the latest openairframes_faa_*.csv file from the latest GitHub release. 
Args: output_dir: Directory to save the downloaded file (default: "downloads") @@ -121,10 +121,10 @@ def download_latest_aircraft_csv( """ assets = get_latest_release_assets(repo, github_token=github_token) try: - asset = pick_asset(assets, name_regex=r"^planequery_aircraft_faa_.*\.csv$") + asset = pick_asset(assets, name_regex=r"^openairframes_faa_.*\.csv$") except FileNotFoundError: # Fallback to old naming pattern - asset = pick_asset(assets, name_regex=r"^planequery_aircraft_\d{4}-\d{2}-\d{2}_.*\.csv$") + asset = pick_asset(assets, name_regex=r"^openairframes_\d{4}-\d{2}-\d{2}_.*\.csv$") saved_to = download_asset(asset, output_dir / asset.name, github_token=github_token) print(f"Downloaded: {asset.name} ({asset.size} bytes) -> {saved_to}") return saved_to @@ -136,11 +136,11 @@ def get_latest_aircraft_faa_csv_df(): 'unique_regulatory_id': str, 'registrant_county': str}) df = df.fillna("") - # Extract start date from filename pattern: planequery_aircraft_faa_{start_date}_{end_date}.csv - match = re.search(r"planequery_aircraft_faa_(\d{4}-\d{2}-\d{2})_", str(csv_path)) + # Extract start date from filename pattern: openairframes_faa_{start_date}_{end_date}.csv + match = re.search(r"openairframes_faa_(\d{4}-\d{2}-\d{2})_", str(csv_path)) if not match: - # Fallback to old naming pattern: planequery_aircraft_{start_date}_{end_date}.csv - match = re.search(r"planequery_aircraft_(\d{4}-\d{2}-\d{2})_", str(csv_path)) + # Fallback to old naming pattern: openairframes_{start_date}_{end_date}.csv + match = re.search(r"openairframes_(\d{4}-\d{2}-\d{2})_", str(csv_path)) if not match: raise ValueError(f"Could not extract date from filename: {csv_path.name}") @@ -154,7 +154,7 @@ def download_latest_aircraft_adsb_csv( repo: str = REPO, ) -> Path: """ - Download the latest planequery_aircraft_adsb_*.csv file from the latest GitHub release. + Download the latest openairframes_adsb_*.csv file from the latest GitHub release. 
Args: output_dir: Directory to save the downloaded file (default: "downloads") @@ -165,7 +165,7 @@ def download_latest_aircraft_adsb_csv( Path to the downloaded file """ assets = get_latest_release_assets(repo, github_token=github_token) - asset = pick_asset(assets, name_regex=r"^planequery_aircraft_adsb_.*\.csv$") + asset = pick_asset(assets, name_regex=r"^openairframes_adsb_.*\.csv$") saved_to = download_asset(asset, output_dir / asset.name, github_token=github_token) print(f"Downloaded: {asset.name} ({asset.size} bytes) -> {saved_to}") return saved_to @@ -176,8 +176,8 @@ def get_latest_aircraft_adsb_csv_df(): import pandas as pd df = pd.read_csv(csv_path) df = df.fillna("") - # Extract start date from filename pattern: planequery_aircraft_adsb_{start_date}_{end_date}.csv - match = re.search(r"planequery_aircraft_adsb_(\d{4}-\d{2}-\d{2})_", str(csv_path)) + # Extract start date from filename pattern: openairframes_adsb_{start_date}_{end_date}.csv + match = re.search(r"openairframes_adsb_(\d{4}-\d{2}-\d{2})_", str(csv_path)) if not match: raise ValueError(f"Could not extract date from filename: {csv_path.name}") diff --git a/trigger_pipeline.py b/trigger_pipeline.py deleted file mode 100644 index 56f47d8..0000000 --- a/trigger_pipeline.py +++ /dev/null @@ -1,90 +0,0 @@ -""" -Generate Step Functions input and start the pipeline. 
- -Usage: - python trigger_pipeline.py 2024-01-01 2025-01-01 - python trigger_pipeline.py 2024-01-01 2025-01-01 --chunk-days 30 - python trigger_pipeline.py 2024-01-01 2025-01-01 --dry-run -""" -import argparse -import json -import os -import uuid -from datetime import datetime, timedelta - -import boto3 - - -def generate_chunks(start_date: str, end_date: str, chunk_days: int = 1): - """Split a date range into chunks of chunk_days.""" - start = datetime.strptime(start_date, "%Y-%m-%d") - end = datetime.strptime(end_date, "%Y-%m-%d") - - chunks = [] - current = start - while current < end: - chunk_end = min(current + timedelta(days=chunk_days), end) - chunks.append({ - "start_date": current.strftime("%Y-%m-%d"), - "end_date": chunk_end.strftime("%Y-%m-%d"), - }) - current = chunk_end - - return chunks - - -def main(): - parser = argparse.ArgumentParser(description="Trigger ADS-B map-reduce pipeline") - parser.add_argument("start_date", help="Start date (YYYY-MM-DD, inclusive)") - parser.add_argument("end_date", help="End date (YYYY-MM-DD, exclusive)") - parser.add_argument("--chunk-days", type=int, default=1, - help="Days per chunk (default: 1)") - parser.add_argument("--dry-run", action="store_true", - help="Print input JSON without starting execution") - args = parser.parse_args() - - run_id = f"run-{datetime.utcnow().strftime('%Y%m%dT%H%M%S')}-{uuid.uuid4().hex[:8]}" - chunks = generate_chunks(args.start_date, args.end_date, args.chunk_days) - - # Inject run_id into each chunk - for chunk in chunks: - chunk["run_id"] = run_id - - sfn_input = { - "run_id": run_id, - "global_start_date": args.start_date, - "global_end_date": args.end_date, - "chunks": chunks, - } - - print(f"Run ID: {run_id}") - print(f"Chunks: {len(chunks)} (at {args.chunk_days} days each)") - print(f"Max concurrency: 3 (enforced by Step Functions Map state)") - print() - print(json.dumps(sfn_input, indent=2)) - - if args.dry_run: - print("\n--dry-run: not starting execution") - return - - client 
= boto3.client("stepfunctions") - - # Find the state machine ARN - machines = client.list_state_machines()["stateMachines"] - arn = next( - m["stateMachineArn"] - for m in machines - if m["name"] == "adsb-map-reduce" - ) - - response = client.start_execution( - stateMachineArn=arn, - name=run_id, - input=json.dumps(sfn_input), - ) - - print(f"\nStarted execution: {response['executionArn']}") - - -if __name__ == "__main__": - main()