Compare commits

..

35 Commits

Author SHA1 Message Date
ggman12 9c744b0baf update readme.md 2026-03-18 14:29:13 -04:00
JG ebda04767f Merge pull request #34 from PlaneQuery/develop
Develop to main: theairtraffic google sheet
2026-03-10 05:12:11 -04:00
ggman12 3fdf443894 add russia_ukraine 2026-03-10 05:08:19 -04:00
ggman12 24313603c5 works 2026-03-10 05:08:19 -04:00
JG 2bb0a5eac3 Merge pull request #33 from PlaneQuery/develop
Develop to Main: Handle ADSB when ADSB.lol has not released any data for day. Just rerelease latest adsb
2026-02-26 15:32:59 -05:00
ggman12 b54f33aa56 Handle ADSB when ADSB.lol has not released any data for day. Just rerelease latest adsb 2026-02-26 15:31:47 -05:00
JG 2dda3d341c Merge pull request #32 from PlaneQuery/develop
Develop to Main: Fix Community Submission export. Fix CSV concatenation logic to prevent duplicates when there is no new ADSB.lol data.
2026-02-24 15:37:54 -05:00
ggman12 b0526f0a95 Fix Community Submission export. Fix CSV concatenation logic to prevent duplicates when there is no new ADSB.lol data. 2026-02-24 15:36:10 -05:00
JG 4b6a043a9d Merge pull request #31 from PlaneQuery/develop
Develop to Main Fix adsb asset retrival to be more fault tolerant. Fix download issue
2026-02-24 02:17:08 -05:00
ggman12 55c464aad7 Fix adsb asset retrival to be more fault tolerant. Fix download issue for 2024-07-03 2026-02-24 02:12:55 -05:00
ggman12 aa509e8560 attempt to fix download issue for 2024-07-03 2026-02-19 17:51:49 -05:00
ggman12 82d11d8d24 try less strict tar extract for 2025-10-15 and other days that fail 2026-02-19 00:20:03 -05:00
ggman12 76a217ad14 src/contributions/approve_submission.py handle big json files 2026-02-18 23:18:19 -05:00
ggman12 ec2d1a1291 update download.sh 2026-02-18 23:18:19 -05:00
ggman12 97284c69a9 verify downlaod asssets 2026-02-18 23:18:19 -05:00
JG 892ffa78af Merge pull request #28 from PlaneQuery/community-submission-27
Community submission: ggman12_2026-02-18_5ddbb8bd.json
2026-02-18 17:18:49 -05:00
github-actions[bot] f77a91db2c Update schema with new tags: manufacturer_icao, manufacturer_name, model, type_code, serial_number, icao_aircraft_type, operator, operator_callsign, operator_icao, citation_0 2026-02-18 22:18:12 +00:00
github-actions[bot] b3bd654998 Add community submission from @ggman12 (closes #27) 2026-02-18 22:18:12 +00:00
ggman12 302be8b8dc update checker for arrays issue 2026-02-18 17:11:14 -05:00
ggman12 b61dc0f5e5 provide more error 2026-02-18 17:08:43 -05:00
ggman12 1ff17cc6a8 allow adsb to fail for when adsb.lol hasen't uploaded file yet. 2026-02-18 16:49:02 -05:00
ggman12 d216ea9329 Daily ADSB and Histoircal updates. Update readme.md 2026-02-18 16:34:06 -05:00
ggman12 4015a5fcf1 OpenAirframes 1.0 2026-02-13 11:37:31 -05:00
JG f9e04337ae Merge pull request #5 from PlaneQuery/develop
FIX: trigger for planequery-aircraft daily release workflow. Update contributions issue template.
2026-02-12 10:42:47 -05:00
ggman12 1348e1f3a0 Merge branch 'main' into develop 2026-02-12 10:41:26 -05:00
ggman12 b349c01d31 FIX: trigger for planequery-aircraft daily release workflow. Update contributions issue template. 2026-02-12 10:26:05 -05:00
JG a98175bc6c Merge pull request #3 from PlaneQuery/develop
Develop to main new historical adsb workflow. Community Submission updates.
2026-02-11 23:42:40 -05:00
ggman12 953a3647df remove process historical-faa github workflow 2026-02-11 23:41:42 -05:00
ggman12 e5c99b611c make a histoircla runner for adsb 2026-02-11 23:41:42 -05:00
ggman12 4e803dbb45 remove confirmations 2026-02-11 23:41:42 -05:00
JG 59c2aab5c7 Merge pull request #2 from PlaneQuery/develop
develop to main FEATURE: Add contributions framework. Fix and improve daily adsb release
2026-02-11 23:24:01 -05:00
ggman12 722bcdf791 FEATURE: Add contributions framework. Fix and improve daily adsb release using Github actions for map reduce. 2026-02-11 23:22:46 -05:00
ggman12 27da93801e FEATURE: add historical adsb aircraft data and update daily adsb aircraft data derivation.
add clickhouse_connect

use 32GB

update to no longer do df.copy()

Add planequery_adsb_read.ipynb

INCREASE: update Fargate task definition to 16 vCPU and 64 GB memory for improved performance on large datasets

update notebook

remove print(df)

Ensure empty strings are preserved in DataFrame columns

check if day has data for adsb

update notebook
2026-02-11 13:58:56 -05:00
JG b94bfdc575 Merge pull request #1 from PlaneQuery/import/af-klm-fleet
af-klm-fleet from iclems
2026-02-04 17:51:46 -05:00
ggman12 c90bdada76 delete air-france folder 2026-02-04 17:49:25 -05:00
49 changed files with 5361 additions and 461 deletions
@@ -0,0 +1,81 @@
name: Community submission (JSON)
description: Submit one or more community records (JSON) to be reviewed and approved.
title: "Community submission: "
labels:
- community
- submission
body:
- type: markdown
attributes:
value: |
Submit **one object** or an **array of objects** that matches the community submission [schema](https://github.com/PlaneQuery/OpenAirframes/blob/main/schemas/community_submission.v1.schema.json). Reuse existing tags from the schema when possible.
**Rules (enforced on review/automation):**
- Each object must include **at least one** of:
- `registration_number`
- `transponder_code_hex` (6 uppercase hex chars, e.g., `ABC123`)
- `openairframes_id`
- Your contributor name (entered below) will be applied to all objects.
- `contributor_uuid` is derived from your GitHub account automatically.
- `creation_timestamp` is created by the system (you may omit it).
**Optional date scoping:**
- `start_date` - When the tags become valid (ISO 8601: `YYYY-MM-DD`)
- `end_date` - When the tags stop being valid (ISO 8601: `YYYY-MM-DD`)
**Example: single object**
```json
{
"registration_number": "N12345",
"tags": {"owner": "John Doe", "photo": "https://example.com/photo.jpg"},
"start_date": "2025-01-01"
}
```
**Example: multiple objects (array)**
```json
[
{
"registration_number": "N12345",
"tags": {"internet": "starlink"},
"start_date": "2025-05-01"
},
{
"registration_number": "N12345",
"tags": {"owner": "John Doe"},
"start_date": "2025-01-01",
"end_date": "2025-07-20"
},
{
"transponder_code_hex": "ABC123",
"tags": {"internet": "viasat", "owner": "John Doe"}
}
]
```
- type: input
id: contributor_name
attributes:
label: Contributor Name
description: Your display name for attribution. Leave blank for no attribution. Max 150 characters.
placeholder: "e.g., JamesBerry.com or leave blank"
validations:
required: false
- type: textarea
id: submission_json
attributes:
label: Submission JSON
description: |
Paste JSON directly, OR drag-and-drop a .json file here.
Must be valid JSON. Do not include contributor_name or contributor_uuid.
placeholder: |
Paste JSON here, or drag-and-drop a .json file...
validations:
required: true
- type: textarea
id: notes
attributes:
label: Notes (optional)
validations:
required: false
@@ -0,0 +1,182 @@
name: Historical ADS-B Processing
on:
workflow_dispatch:
inputs:
date:
description: 'YYYY-MM-DD'
required: true
type: string
concat_with_latest_csv:
description: 'Also concatenate with latest CSV from GitHub releases'
required: false
type: boolean
default: false
workflow_call:
inputs:
date:
description: 'YYYY-MM-DD'
required: true
type: string
concat_with_latest_csv:
description: 'Also concatenate with latest CSV from GitHub releases'
required: false
type: boolean
default: false
jobs:
adsb-extract:
runs-on: ubuntu-24.04-arm
steps:
- name: Checkout
uses: actions/checkout@v6
- name: Setup Python
uses: actions/setup-python@v6
with:
python-version: '3.12'
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
- name: Download and split ADS-B data
env:
DATE: ${{ inputs.date }}
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: |
python -m src.adsb.download_and_list_icaos --date "$DATE"
ls -lah data/output/adsb_archives/"$DATE" || true
- name: Upload archive part 0
uses: actions/upload-artifact@v4
with:
name: adsb-archive-${{ inputs.date }}-part-0
path: data/output/adsb_archives/${{ inputs.date }}/${{ inputs.date }}_part_0.tar.gz
retention-days: 1
compression-level: 0
if-no-files-found: error
- name: Upload archive part 1
uses: actions/upload-artifact@v4
with:
name: adsb-archive-${{ inputs.date }}-part-1
path: data/output/adsb_archives/${{ inputs.date }}/${{ inputs.date }}_part_1.tar.gz
retention-days: 1
compression-level: 0
if-no-files-found: error
- name: Upload archive part 2
uses: actions/upload-artifact@v4
with:
name: adsb-archive-${{ inputs.date }}-part-2
path: data/output/adsb_archives/${{ inputs.date }}/${{ inputs.date }}_part_2.tar.gz
retention-days: 1
compression-level: 0
if-no-files-found: error
- name: Upload archive part 3
uses: actions/upload-artifact@v4
with:
name: adsb-archive-${{ inputs.date }}-part-3
path: data/output/adsb_archives/${{ inputs.date }}/${{ inputs.date }}_part_3.tar.gz
retention-days: 1
compression-level: 0
if-no-files-found: error
adsb-map:
needs: adsb-extract
runs-on: ubuntu-24.04-arm
strategy:
fail-fast: true
matrix:
part_id: [0, 1, 2, 3]
steps:
- name: Checkout
uses: actions/checkout@v6
- name: Setup Python
uses: actions/setup-python@v6
with:
python-version: '3.12'
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
- name: Download archive part
uses: actions/download-artifact@v4
with:
name: adsb-archive-${{ inputs.date }}-part-${{ matrix.part_id }}
path: data/output/adsb_archives/${{ inputs.date }}
- name: Verify archive
run: |
FILE="data/output/adsb_archives/${{ inputs.date }}/${{ inputs.date }}_part_${{ matrix.part_id }}.tar.gz"
ls -lah data/output/adsb_archives/${{ inputs.date }}/
if [ ! -f "$FILE" ]; then
echo "::error::Archive not found: $FILE"
exit 1
fi
echo "Verified: $(du -h "$FILE")"
- name: Process part
env:
DATE: ${{ inputs.date }}
run: |
python -m src.adsb.process_icao_chunk --part-id ${{ matrix.part_id }} --date "$DATE"
- name: Upload compressed outputs
uses: actions/upload-artifact@v4
with:
name: adsb-compressed-${{ inputs.date }}-part-${{ matrix.part_id }}
path: data/output/compressed/${{ inputs.date }}
retention-days: 1
compression-level: 0
if-no-files-found: error
adsb-reduce:
needs: adsb-map
runs-on: ubuntu-24.04-arm
steps:
- name: Checkout
uses: actions/checkout@v6
- name: Setup Python
uses: actions/setup-python@v6
with:
python-version: '3.12'
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
- name: Download compressed outputs
uses: actions/download-artifact@v4
with:
pattern: adsb-compressed-${{ inputs.date }}-part-*
path: data/output/compressed/${{ inputs.date }}
merge-multiple: true
- name: Concatenate final outputs
env:
DATE: ${{ inputs.date }}
CONCAT_WITH_LATEST_CSV: ${{ inputs.concat_with_latest_csv }}
run: |
EXTRA=""
if [ "$CONCAT_WITH_LATEST_CSV" = "true" ]; then
EXTRA="--concat_with_latest_csv"
fi
python -m src.adsb.concat_parquet_to_final --date "$DATE" $EXTRA
ls -lah data/output/ || true
- name: Upload final artifacts
uses: actions/upload-artifact@v4
with:
name: openairframes_adsb-${{ inputs.date }}
path: data/output/openairframes_adsb_*
retention-days: 30
if-no-files-found: error
@@ -0,0 +1,118 @@
name: adsb-to-aircraft-multiple-day-run
on:
workflow_dispatch:
inputs:
start_date:
description: 'YYYY-MM-DD (inclusive)'
required: true
type: string
end_date:
description: 'YYYY-MM-DD (exclusive)'
required: true
type: string
jobs:
generate-dates:
runs-on: ubuntu-24.04-arm
outputs:
dates: ${{ steps.generate.outputs.dates }}
steps:
- name: Generate date list
id: generate
env:
START_DATE: ${{ inputs.start_date }}
END_DATE: ${{ inputs.end_date }}
run: |
python - <<'PY'
import json
import os
from datetime import datetime, timedelta
start = datetime.strptime(os.environ["START_DATE"], "%Y-%m-%d")
end = datetime.strptime(os.environ["END_DATE"], "%Y-%m-%d")
if end <= start:
raise SystemExit("end_date must be after start_date")
dates = []
cur = start
while cur < end:
dates.append(cur.strftime("%Y-%m-%d"))
cur += timedelta(days=1)
with open(os.environ["GITHUB_OUTPUT"], "a") as f:
f.write(f"dates={json.dumps(dates)}\n")
PY
adsb-day:
needs: generate-dates
strategy:
fail-fast: true
matrix:
date: ${{ fromJson(needs.generate-dates.outputs.dates) }}
uses: ./.github/workflows/adsb-to-aircraft-for-day.yaml
with:
date: ${{ matrix.date }}
adsb-final:
needs: adsb-day
runs-on: ubuntu-24.04-arm
steps:
- name: Checkout
uses: actions/checkout@v6
- name: Setup Python
uses: actions/setup-python@v6
with:
python-version: '3.12'
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
- name: Download daily CSVs
uses: actions/download-artifact@v4
with:
pattern: openairframes_adsb-*
path: outputs/daily/
merge-multiple: true
- name: Concatenate all days to final CSV
env:
START_DATE: ${{ inputs.start_date }}
END_DATE: ${{ inputs.end_date }}
run: |
python - <<'PY'
import os
import re
from pathlib import Path
import polars as pl
start = os.environ["START_DATE"]
end = os.environ["END_DATE"]
daily_dir = Path("outputs/daily")
files = sorted(daily_dir.glob("openairframes_adsb_*.csv.gz"))
if not files:
raise SystemExit("No daily CSVs found")
def date_key(path: Path) -> str:
m = re.match(r"openairframes_adsb_(\d{4}-\d{2}-\d{2})_", path.name)
return m.group(1) if m else path.name
files = sorted(files, key=date_key)
frames = [pl.read_csv(p) for p in files]
df = pl.concat(frames, how="vertical", rechunk=True)
output_path = Path("outputs") / f"openairframes_adsb_{start}_{end}.csv.gz"
df.write_csv(output_path, compression="gzip")
print(f"Wrote {output_path} with {df.height} rows")
PY
- name: Upload final CSV
uses: actions/upload-artifact@v4
with:
name: openairframes_adsb-${{ inputs.start_date }}-${{ inputs.end_date }}
path: outputs/openairframes_adsb_${{ inputs.start_date }}_${{ inputs.end_date }}.csv.gz
retention-days: 30
# gh workflow run adsb-to-aircraft-multiple-day-run.yaml --repo ggman12/OpenAirframes --ref jonah/fix-historical-proper -f start_date=2025-12-31 -f end_date=2026-01-02
@@ -0,0 +1,47 @@
name: Approve Community Submission
on:
issues:
types: [labeled]
permissions:
contents: write
pull-requests: write
issues: write
jobs:
approve:
if: github.event.label.name == 'approved' && contains(github.event.issue.labels.*.name, 'validated')
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v4
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: "3.12"
- name: Install dependencies
run: pip install jsonschema
- name: Get issue author ID
id: author
uses: actions/github-script@v7
with:
script: |
const issue = context.payload.issue;
core.setOutput('username', issue.user.login);
core.setOutput('user_id', issue.user.id);
- name: Process and create PR
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
GITHUB_REPOSITORY: ${{ github.repository }}
ISSUE_BODY: ${{ github.event.issue.body }}
run: |
python -m src.contributions.approve_submission \
--issue-number ${{ github.event.issue.number }} \
--issue-body "$ISSUE_BODY" \
--author "${{ steps.author.outputs.username }}" \
--author-id ${{ steps.author.outputs.user_id }}
@@ -0,0 +1,430 @@
name: openairframes-daily-release
on:
schedule:
# 6:00pm UTC every day - runs on default branch, triggers both
- cron: "0 06 * * *"
workflow_dispatch:
inputs:
date:
description: 'Date to process (YYYY-MM-DD format, default: yesterday)'
required: false
type: string
permissions:
contents: write
actions: write
jobs:
trigger-releases:
runs-on: ubuntu-latest
if: github.event_name == 'schedule'
steps:
- name: Trigger main branch release
uses: actions/github-script@v7
with:
script: |
await github.rest.actions.createWorkflowDispatch({
owner: context.repo.owner,
repo: context.repo.repo,
workflow_id: 'openairframes-daily-release.yaml',
ref: 'main'
});
- name: Trigger develop branch release
uses: actions/github-script@v7
with:
script: |
await github.rest.actions.createWorkflowDispatch({
owner: context.repo.owner,
repo: context.repo.repo,
workflow_id: 'openairframes-daily-release.yaml',
ref: 'develop'
});
build-faa:
runs-on: ubuntu-24.04-arm
if: github.event_name != 'schedule'
steps:
- name: Checkout
uses: actions/checkout@v6
with:
fetch-depth: 0
- name: Setup Python
uses: actions/setup-python@v6
with:
python-version: "3.14"
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
- name: Run FAA release script
run: |
python src/create_daily_faa_release.py ${{ inputs.date && format('--date {0}', inputs.date) || '' }}
ls -lah data/faa_releasable
ls -lah data/openairframes
- name: Upload FAA artifacts
uses: actions/upload-artifact@v4
with:
name: faa-release
path: |
data/openairframes/openairframes_faa_*.csv
data/faa_releasable/ReleasableAircraft_*.zip
retention-days: 1
resolve-dates:
runs-on: ubuntu-latest
if: github.event_name != 'schedule'
outputs:
date: ${{ steps.out.outputs.date }}
adsb_date: ${{ steps.out.outputs.adsb_date }}
steps:
- id: out
run: |
if [ -n "${{ inputs.date }}" ]; then
echo "date=${{ inputs.date }}" >> "$GITHUB_OUTPUT"
echo "adsb_date=${{ inputs.date }}" >> "$GITHUB_OUTPUT"
else
echo "date=$(date -u -d 'yesterday' +%Y-%m-%d)" >> "$GITHUB_OUTPUT"
echo "adsb_date=$(date -u -d 'yesterday' +%Y-%m-%d)" >> "$GITHUB_OUTPUT"
fi
adsb-to-aircraft:
needs: resolve-dates
if: github.event_name != 'schedule'
uses: ./.github/workflows/adsb-to-aircraft-for-day.yaml
with:
date: ${{ needs.resolve-dates.outputs.adsb_date }}
concat_with_latest_csv: true
adsb-reduce:
needs: [resolve-dates, adsb-to-aircraft]
if: always() && github.event_name != 'schedule' && needs.adsb-to-aircraft.result == 'failure'
runs-on: ubuntu-24.04-arm
steps:
- name: Checkout
uses: actions/checkout@v6
- name: Setup Python
uses: actions/setup-python@v6
with:
python-version: '3.12'
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
- name: Download compressed outputs
uses: actions/download-artifact@v4
with:
pattern: adsb-compressed-${{ needs.resolve-dates.outputs.adsb_date }}-part-*
path: data/output/compressed/${{ needs.resolve-dates.outputs.adsb_date }}
merge-multiple: true
- name: Concatenate final outputs
env:
DATE: ${{ needs.resolve-dates.outputs.adsb_date }}
CONCAT_WITH_LATEST_CSV: true
run: |
EXTRA=""
if [ "$CONCAT_WITH_LATEST_CSV" = "true" ]; then
EXTRA="--concat_with_latest_csv"
fi
python -m src.adsb.concat_parquet_to_final --date "$DATE" $EXTRA
ls -lah data/output/ || true
- name: Upload final artifacts
uses: actions/upload-artifact@v4
with:
name: openairframes_adsb-${{ needs.resolve-dates.outputs.adsb_date }}
path: data/output/openairframes_adsb_*
retention-days: 30
if-no-files-found: error
build-community:
runs-on: ubuntu-latest
if: github.event_name != 'schedule'
steps:
- name: Checkout
uses: actions/checkout@v6
with:
fetch-depth: 0
- name: Setup Python
uses: actions/setup-python@v6
with:
python-version: "3.14"
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install pandas
- name: Run Community release script
run: |
python -m src.contributions.create_daily_community_release
ls -lah data/openairframes
- name: Upload Community artifacts
uses: actions/upload-artifact@v4
with:
name: community-release
path: data/openairframes/openairframes_community_*.csv
retention-days: 1
build-adsbexchange-json:
runs-on: ubuntu-latest
if: github.event_name != 'schedule'
steps:
- name: Checkout
uses: actions/checkout@v6
with:
fetch-depth: 0
- name: Setup Python
uses: actions/setup-python@v6
with:
python-version: "3.14"
- name: Run ADS-B Exchange JSON release script
run: |
python -m src.contributions.create_daily_adsbexchange_release ${{ inputs.date && format('--date {0}', inputs.date) || '' }}
ls -lah data/openairframes
- name: Upload ADS-B Exchange JSON artifact
uses: actions/upload-artifact@v4
with:
name: adsbexchange-json
path: data/openairframes/basic-ac-db_*.json.gz
retention-days: 1
build-mictronics-db:
runs-on: ubuntu-latest
if: github.event_name != 'schedule'
steps:
- name: Checkout
uses: actions/checkout@v6
with:
fetch-depth: 0
- name: Setup Python
uses: actions/setup-python@v6
with:
python-version: "3.14"
- name: Run Mictronics DB release script
continue-on-error: true
run: |
python -m src.contributions.create_daily_microtonics_release ${{ inputs.date && format('--date {0}', inputs.date) || '' }}
ls -lah data/openairframes
- name: Upload Mictronics DB artifact
uses: actions/upload-artifact@v4
with:
name: mictronics-db
path: data/openairframes/mictronics-db_*.zip
retention-days: 1
if-no-files-found: ignore
create-release:
runs-on: ubuntu-latest
needs: [resolve-dates, build-faa, adsb-to-aircraft, adsb-reduce, build-community, build-adsbexchange-json, build-mictronics-db]
if: github.event_name != 'schedule' && !cancelled()
steps:
- name: Check ADS-B workflow status
if: needs.adsb-to-aircraft.result != 'success' && needs.adsb-reduce.result != 'success'
run: |
echo "WARNING: ADS-B workflow failed (adsb-to-aircraft='${{ needs.adsb-to-aircraft.result }}', adsb-reduce='${{ needs.adsb-reduce.result }}'), will continue without ADS-B artifacts"
- name: Checkout for gh CLI
uses: actions/checkout@v4
with:
sparse-checkout: |
.github
sparse-checkout-cone-mode: false
- name: Download FAA artifacts
uses: actions/download-artifact@v5
with:
name: faa-release
path: artifacts/faa
- name: Download ADS-B artifacts
uses: actions/download-artifact@v5
if: needs.adsb-to-aircraft.result == 'success' || needs.adsb-reduce.result == 'success'
continue-on-error: true
with:
name: openairframes_adsb-${{ needs.resolve-dates.outputs.adsb_date }}
path: artifacts/adsb
- name: Download Community artifacts
uses: actions/download-artifact@v5
with:
name: community-release
path: artifacts/community
- name: Download ADS-B Exchange JSON artifact
uses: actions/download-artifact@v5
with:
name: adsbexchange-json
path: artifacts/adsbexchange
- name: Download Mictronics DB artifact
uses: actions/download-artifact@v5
continue-on-error: true
with:
name: mictronics-db
path: artifacts/mictronics
- name: Debug artifact structure
run: |
echo "=== Full artifacts tree ==="
find artifacts -type f 2>/dev/null || echo "No files found in artifacts"
echo "=== FAA artifacts ==="
find artifacts/faa -type f 2>/dev/null || echo "No files found in artifacts/faa"
echo "=== ADS-B artifacts ==="
find artifacts/adsb -type f 2>/dev/null || echo "No files found in artifacts/adsb"
echo "=== Community artifacts ==="
find artifacts/community -type f 2>/dev/null || echo "No files found in artifacts/community"
echo "=== ADS-B Exchange JSON artifacts ==="
find artifacts/adsbexchange -type f 2>/dev/null || echo "No files found in artifacts/adsbexchange"
echo "=== Mictronics DB artifacts ==="
find artifacts/mictronics -type f 2>/dev/null || echo "No files found in artifacts/mictronics"
- name: Prepare release metadata
id: meta
run: |
DATE=$(date -u +"%Y-%m-%d")
BRANCH_NAME="${GITHUB_REF#refs/heads/}"
BRANCH_SUFFIX=""
if [ "$BRANCH_NAME" = "main" ]; then
BRANCH_SUFFIX="-main"
elif [ "$BRANCH_NAME" = "develop" ]; then
BRANCH_SUFFIX="-develop"
fi
TAG="openairframes-${DATE}${BRANCH_SUFFIX}"
# Find files from artifacts using find (handles nested structures)
CSV_FILE_FAA=$(find artifacts/faa -name "openairframes_faa_*.csv" -type f 2>/dev/null | head -1)
# Prefer concatenated file (with date range) over single-day file
CSV_FILE_ADSB=$(find artifacts/adsb -name "openairframes_adsb_*_*.csv.gz" -type f 2>/dev/null | head -1)
if [ -z "$CSV_FILE_ADSB" ]; then
CSV_FILE_ADSB=$(find artifacts/adsb -name "openairframes_adsb_*.csv.gz" -type f 2>/dev/null | head -1)
fi
CSV_FILE_COMMUNITY=$(find artifacts/community -name "openairframes_community_*.csv" -type f 2>/dev/null | head -1)
ZIP_FILE=$(find artifacts/faa -name "ReleasableAircraft_*.zip" -type f 2>/dev/null | head -1)
JSON_FILE_ADSBX=$(find artifacts/adsbexchange -name "basic-ac-db_*.json.gz" -type f 2>/dev/null | head -1)
ZIP_FILE_MICTRONICS=$(find artifacts/mictronics -name "mictronics-db_*.zip" -type f 2>/dev/null | head -1)
# Validate required files exist
MISSING_FILES=""
if [ -z "$CSV_FILE_FAA" ] || [ ! -f "$CSV_FILE_FAA" ]; then
MISSING_FILES="$MISSING_FILES FAA_CSV"
fi
if [ -z "$ZIP_FILE" ] || [ ! -f "$ZIP_FILE" ]; then
MISSING_FILES="$MISSING_FILES FAA_ZIP"
fi
if [ -z "$JSON_FILE_ADSBX" ] || [ ! -f "$JSON_FILE_ADSBX" ]; then
MISSING_FILES="$MISSING_FILES ADSBX_JSON"
fi
# Optional files - warn but don't fail
OPTIONAL_MISSING=""
if [ -z "$CSV_FILE_ADSB" ] || [ ! -f "$CSV_FILE_ADSB" ]; then
OPTIONAL_MISSING="$OPTIONAL_MISSING ADSB_CSV"
CSV_FILE_ADSB=""
CSV_BASENAME_ADSB=""
fi
if [ -z "$ZIP_FILE_MICTRONICS" ] || [ ! -f "$ZIP_FILE_MICTRONICS" ]; then
OPTIONAL_MISSING="$OPTIONAL_MISSING MICTRONICS_ZIP"
ZIP_FILE_MICTRONICS=""
fi
if [ -n "$MISSING_FILES" ]; then
echo "ERROR: Missing required release files:$MISSING_FILES"
echo "FAA CSV: $CSV_FILE_FAA"
echo "ADSB CSV: $CSV_FILE_ADSB"
echo "ZIP: $ZIP_FILE"
echo "ADSBX JSON: $JSON_FILE_ADSBX"
echo "MICTRONICS ZIP: $ZIP_FILE_MICTRONICS"
exit 1
fi
# Get basenames for display
CSV_BASENAME_FAA=$(basename "$CSV_FILE_FAA")
if [ -n "$CSV_FILE_ADSB" ]; then
CSV_BASENAME_ADSB=$(basename "$CSV_FILE_ADSB")
fi
CSV_BASENAME_COMMUNITY=$(basename "$CSV_FILE_COMMUNITY" 2>/dev/null || echo "")
ZIP_BASENAME=$(basename "$ZIP_FILE")
JSON_BASENAME_ADSBX=$(basename "$JSON_FILE_ADSBX")
ZIP_BASENAME_MICTRONICS=""
if [ -n "$ZIP_FILE_MICTRONICS" ]; then
ZIP_BASENAME_MICTRONICS=$(basename "$ZIP_FILE_MICTRONICS")
fi
if [ -n "$OPTIONAL_MISSING" ]; then
echo "WARNING: Optional files missing:$OPTIONAL_MISSING (will continue without them)"
fi
echo "date=$DATE" >> "$GITHUB_OUTPUT"
echo "tag=$TAG" >> "$GITHUB_OUTPUT"
echo "csv_file_faa=$CSV_FILE_FAA" >> "$GITHUB_OUTPUT"
echo "csv_basename_faa=$CSV_BASENAME_FAA" >> "$GITHUB_OUTPUT"
echo "csv_file_adsb=$CSV_FILE_ADSB" >> "$GITHUB_OUTPUT"
echo "csv_basename_adsb=$CSV_BASENAME_ADSB" >> "$GITHUB_OUTPUT"
echo "csv_file_community=$CSV_FILE_COMMUNITY" >> "$GITHUB_OUTPUT"
echo "csv_basename_community=$CSV_BASENAME_COMMUNITY" >> "$GITHUB_OUTPUT"
echo "zip_file=$ZIP_FILE" >> "$GITHUB_OUTPUT"
echo "zip_basename=$ZIP_BASENAME" >> "$GITHUB_OUTPUT"
echo "json_file_adsbx=$JSON_FILE_ADSBX" >> "$GITHUB_OUTPUT"
echo "json_basename_adsbx=$JSON_BASENAME_ADSBX" >> "$GITHUB_OUTPUT"
echo "zip_file_mictronics=$ZIP_FILE_MICTRONICS" >> "$GITHUB_OUTPUT"
echo "zip_basename_mictronics=$ZIP_BASENAME_MICTRONICS" >> "$GITHUB_OUTPUT"
echo "name=OpenAirframes snapshot ($DATE)${BRANCH_SUFFIX}" >> "$GITHUB_OUTPUT"
echo "Found files:"
echo " FAA CSV: $CSV_FILE_FAA"
echo " ADSB CSV: $CSV_FILE_ADSB"
echo " Community CSV: $CSV_FILE_COMMUNITY"
echo " ZIP: $ZIP_FILE"
echo " ADSBX JSON: $JSON_FILE_ADSBX"
echo " MICTRONICS ZIP: $ZIP_FILE_MICTRONICS"
- name: Delete existing release if exists
run: |
echo "Attempting to delete release: ${{ steps.meta.outputs.tag }}"
gh release delete "${{ steps.meta.outputs.tag }}" --yes --cleanup-tag || echo "No existing release to delete"
env:
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- name: Create GitHub Release and upload assets
uses: softprops/action-gh-release@v2
with:
tag_name: ${{ steps.meta.outputs.tag }}
name: ${{ steps.meta.outputs.name }}
fail_on_unmatched_files: false
body: |
Automated daily snapshot generated at 06:00 UTC for ${{ steps.meta.outputs.date }}.
Assets:
- ${{ steps.meta.outputs.csv_basename_faa }}
${{ steps.meta.outputs.csv_basename_adsb && format('- {0}', steps.meta.outputs.csv_basename_adsb) || '' }}
- ${{ steps.meta.outputs.csv_basename_community }}
- ${{ steps.meta.outputs.zip_basename }}
- ${{ steps.meta.outputs.json_basename_adsbx }}
${{ steps.meta.outputs.zip_basename_mictronics && format('- {0}', steps.meta.outputs.zip_basename_mictronics) || '' }}
files: |
${{ steps.meta.outputs.csv_file_faa }}
${{ steps.meta.outputs.csv_file_adsb }}
${{ steps.meta.outputs.csv_file_community }}
${{ steps.meta.outputs.zip_file }}
${{ steps.meta.outputs.json_file_adsbx }}
${{ steps.meta.outputs.zip_file_mictronics }}
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
@@ -1,67 +0,0 @@
name: planequery-aircraft Daily Release
on:
schedule:
# 6:00pm UTC every day
- cron: "0 06 * * *"
workflow_dispatch: {}
permissions:
contents: write
jobs:
build-and-release:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: "3.12"
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
- name: Run daily release script
run: |
python src/create_daily_planequery_aircraft_release.py
ls -lah data/faa_releasable
ls -lah data/planequery_aircraft
- name: Prepare release metadata
id: meta
run: |
DATE=$(date -u +"%Y-%m-%d")
TAG="planequery-aircraft-${DATE}"
# Find the CSV file in data/planequery_aircraft matching the pattern
CSV_FILE=$(ls data/planequery_aircraft/planequery_aircraft_*_${DATE}.csv | head -1)
CSV_BASENAME=$(basename "$CSV_FILE")
echo "date=$DATE" >> "$GITHUB_OUTPUT"
echo "tag=$TAG" >> "$GITHUB_OUTPUT"
echo "csv_file=$CSV_FILE" >> "$GITHUB_OUTPUT"
echo "csv_basename=$CSV_BASENAME" >> "$GITHUB_OUTPUT"
echo "name=planequery-aircraft snapshot ($DATE)" >> "$GITHUB_OUTPUT"
- name: Create GitHub Release and upload assets
uses: softprops/action-gh-release@v2
with:
tag_name: ${{ steps.meta.outputs.tag }}
name: ${{ steps.meta.outputs.name }}
body: |
Automated daily snapshot generated at 06:00 UTC for ${{ steps.meta.outputs.date }}.
Assets:
- ${{ steps.meta.outputs.csv_basename }}
- ReleasableAircraft_${{ steps.meta.outputs.date }}.zip
files: |
${{ steps.meta.outputs.csv_file }}
data/faa_releasable/ReleasableAircraft_${{ steps.meta.outputs.date }}.zip
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
@@ -166,6 +166,6 @@ jobs:
Combined historical FAA aircraft data (all chunks concatenated)
Processing period: 2023-08-16 to 2026-01-01
Generated: ${{ github.event.repository.updated_at }}
files: data/planequery_aircraft/*.csv
files: data/openairframes/*.csv
draft: false
prerelease: false
+100
View File
@@ -0,0 +1,100 @@
name: Update Community PRs After Merge
on:
push:
branches: [main]
paths:
- 'community/**'
- 'schemas/community_submission.v1.schema.json'
permissions:
contents: write
pull-requests: write
jobs:
update-open-prs:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v4
with:
fetch-depth: 0
token: ${{ secrets.GITHUB_TOKEN }}
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: "3.12"
- name: Install dependencies
run: pip install jsonschema
- name: Find and update open community PRs
env:
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: |
# Get list of open community PRs
prs=$(gh pr list --label community --state open --json number,headRefName --jq '.[] | "\(.number) \(.headRefName)"')
if [ -z "$prs" ]; then
echo "No open community PRs found"
exit 0
fi
echo "$prs" | while read pr_number branch_name; do
echo "Processing PR #$pr_number (branch: $branch_name)"
# Checkout PR branch
git fetch origin "$branch_name"
git checkout "$branch_name"
git config user.name "github-actions[bot]"
git config user.email "github-actions[bot]@users.noreply.github.com"
# Get the community submission file(s) and schema from this branch
community_files=$(git diff --name-only origin/main...HEAD -- 'community/' 'schemas/')
if [ -z "$community_files" ]; then
echo " No community/schema files found in PR #$pr_number, skipping"
git checkout main
continue
fi
echo " Files to preserve: $community_files"
# Save the community files content
mkdir -p /tmp/pr_files
for file in $community_files; do
if [ -f "$file" ]; then
mkdir -p "/tmp/pr_files/$(dirname "$file")"
cp "$file" "/tmp/pr_files/$file"
fi
done
# Reset branch to main (clean slate)
git reset --hard origin/main
# Restore the community files
for file in $community_files; do
if [ -f "/tmp/pr_files/$file" ]; then
mkdir -p "$(dirname "$file")"
cp "/tmp/pr_files/$file" "$file"
fi
done
rm -rf /tmp/pr_files
# Regenerate schema with current main + this submission's tags
python -m src.contributions.regenerate_pr_schema || true
# Stage and commit all changes
git add community/ schemas/
if ! git diff --cached --quiet; then
git commit -m "Community submission (rebased on main)"
git push --force origin "$branch_name"
echo " Rebased PR #$pr_number onto main"
else
echo " No changes needed for PR #$pr_number"
fi
git checkout main
done
@@ -0,0 +1,46 @@
name: Validate Community Submission
on:
issues:
types: [opened, edited]
permissions:
issues: write
jobs:
validate:
if: contains(github.event.issue.labels.*.name, 'submission')
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v4
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: "3.12"
- name: Install dependencies
run: pip install jsonschema
- name: Debug issue body
run: |
echo "=== Issue Body ==="
cat << 'ISSUE_BODY_EOF'
${{ github.event.issue.body }}
ISSUE_BODY_EOF
- name: Save issue body to file
run: |
cat << 'ISSUE_BODY_EOF' > /tmp/issue_body.txt
${{ github.event.issue.body }}
ISSUE_BODY_EOF
- name: Validate submission
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
GITHUB_REPOSITORY: ${{ github.repository }}
run: |
python -m src.contributions.validate_submission \
--issue-body-file /tmp/issue_body.txt \
--issue-number ${{ github.event.issue.number }}
+67 -1
View File
@@ -218,4 +218,70 @@ __marimo__/
# Custom
data/
.DS_Store
notebooks/
# --- CDK ---
# VSCode extension
# Store launch config in repo but not settings
.vscode/settings.json
/.favorites.json
# TypeScript incremental build states
*.tsbuildinfo
# Local state files & OS specifics
.DS_Store
node_modules/
lerna-debug.log
dist/
pack/
.BUILD_COMPLETED
.local-npm/
.tools/
coverage/
.nyc_output
.nycrc
.LAST_BUILD
*.sw[a-z]
*~
.idea
*.iml
junit.xml
# We don't want tsconfig at the root
/tsconfig.json
# CDK Context & Staging files
cdk.context.json
.cdk.staging/
cdk.out/
*.tabl.json
cdk-integ.out.*/
# Yarn error log
yarn-error.log
# VSCode history plugin
.vscode/.history/
# Cloud9
.c9
.nzm-*
/.versionrc.json
RELEASE_NOTES.md
# Produced by integ tests
read*lock
# VSCode jest plugin
.test-output
# Nx cache
.nx/
# jsii-rosetta files
type-fingerprints.txt
notebooks/whatever.ipynb
.snapshots/
+1 -1
View File
@@ -1,6 +1,6 @@
MIT License
Copyright (c) 2026 PlaneQuery
Copyright (c) 2026 OpenAirframes
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
+58 -1
View File
@@ -1 +1,58 @@
Downloads [`https://registry.faa.gov/database/ReleasableAircraft.zip`](https://registry.faa.gov/database/ReleasableAircraft.zip). Creates a daily GitHub Release at 06:00 UTC containing the unaltered `ReleasableAircraft.zip` and a derived CSV file with all data from FAA database since 2023-08-16. The FAA database updates daily at 05:30 UTC.
# OpenAirframes.org
OpenAirframes.org is an open-source, community-driven airframes database.
The data includes:
- Registration information from Civil Aviation Authorities (FAA)
- Airline data (e.g., Air France)
- Community contributions such as ownership details, military aircraft info, photos, and more
---
## For Users
A daily release is created at **06:00 UTC** and includes:
- **openairframes_community.csv**
All community submissions
- **openairframes_adsb.csv**
Airframes dataset derived from ADSB.lol network data. For each UTC day, a row is created for every icao observed in that days ADS-B messages, using registration data from [tar1090-db](https://github.com/wiedehopf/tar1090-db) (ADSBExchange & Mictronics).
Example Usage:
```python
import pandas as pd
url = "https://github.com/PlaneQuery/OpenAirframes/releases/download/openairframes-2026-03-18-main/openairframes_adsb_2024-01-01_2026-03-17.csv.gz" # 1GB
df = pd.read_csv(url)
df
```
![](docs/images/df_adsb_example_0.png)
- **openairframes_faa.csv**
All [FAA registration data](https://www.faa.gov/licenses_certificates/aircraft_certification/aircraft_registry/releasable_aircraft_download) from 2023-08-16 to present (~260 MB)
- **ReleasableAircraft_{date}.zip**
A daily snapshot of the FAA database, which updates at **05:30 UTC**
---
## For Contributors
Submit data via a [GitHub Issue](https://github.com/PlaneQuery/OpenAirframes/issues/new?template=community_submission.yaml) with your preferred attribution. Once approved, it will appear in the daily release. A leaderboard will be available in the future.
All data is valuable. Examples include:
- Celebrity ownership (with citations)
- Photos
- Internet capability
- Military aircraft information
- Unique facts (e.g., an airframe that crashed, performs aerobatics, etc.)
Please try to follow the submission formatting guidelines. If you are struggling with them, that is fine—submit your data anyway and it will be formatted for you.
---
## For Developers
All code, compute (GitHub Actions), and storage (releases) are in this GitHub repository Improvements are welcome. Potential features include:
- Web UI for data
- Web UI for contributors
- Additional export formats in the daily release
- Data fusion from multiple sources in the daily release
- Automated airframe data connectors, including (but not limited to) civil aviation authorities and airline APIs
View File
@@ -0,0 +1,40 @@
[
{
"contributor_name": "JohnSmith.com",
"contributor_uuid": "2981c3ee-8712-5f96-84bf-732eda515a3f",
"creation_timestamp": "2026-02-18T22:18:11.349009+00:00",
"registration_number": "ZM146",
"tags": {
"citation_0": "https://assets.publishing.service.gov.uk/media/5c07a65f40f0b6705f11cf37/10389.pdf",
"icao_aircraft_type": "L1J",
"manufacturer_icao": "LOCKHEED MARTIN",
"manufacturer_name": "Lockheed-martin",
"model": "F-35B Lightning II",
"operator": "Royal Air Force",
"operator_callsign": "RAFAIR",
"operator_icao": "RFR",
"serial_number": "BK-12",
"type_code": "VF35"
},
"transponder_code_hex": "43C81C"
},
{
"contributor_name": "JohnSmith.com",
"contributor_uuid": "2981c3ee-8712-5f96-84bf-732eda515a3f",
"creation_timestamp": "2026-02-18T22:18:11.349009+00:00",
"registration_number": "ZM148",
"tags": {
"citation_0": "https://assets.publishing.service.gov.uk/media/5c07a65f40f0b6705f11cf37/10389.pdf",
"icao_aircraft_type": "L1J",
"manufacturer_icao": "LOCKHEED MARTIN",
"manufacturer_name": "Lockheed-martin",
"model": "F-35B Lightning II",
"operator": "Royal Air Force",
"operator_callsign": "RAFAIR",
"operator_icao": "RFR",
"serial_number": "BK-14",
"type_code": "VF35"
},
"transponder_code_hex": "43C811"
}
]
Binary file not shown.

After

Width:  |  Height:  |  Size: 99 KiB

+4 -1
View File
@@ -1,3 +1,6 @@
faa-aircraft-registry==0.1.0
pandas==3.0.0
pyarrow==23.0.0
orjson==3.11.7
polars==1.38.1
jsonschema==4.26.0
+144
View File
@@ -0,0 +1,144 @@
{
"$schema": "https://json-schema.org/draft/2020-12/schema",
"title": "OpenAirframes Community Submission (v1)",
"type": "object",
"additionalProperties": false,
"properties": {
"registration_number": {
"type": "string",
"minLength": 1
},
"transponder_code_hex": {
"type": "string",
"pattern": "^[0-9A-F]{6}$"
},
"openairframes_id": {
"type": "string",
"minLength": 1
},
"contributor_uuid": {
"type": "string",
"format": "uuid"
},
"contributor_name": {
"type": "string",
"minLength": 0,
"maxLength": 150,
"description": "Display name (may be blank)"
},
"creation_timestamp": {
"type": "string",
"format": "date-time",
"description": "Set by the system when the submission is persisted/approved.",
"readOnly": true
},
"start_date": {
"type": "string",
"format": "date",
"pattern": "^\\d{4}-\\d{2}-\\d{2}$",
"description": "Optional start date for when this submission's tags are valid (ISO 8601, e.g., 2025-05-01)."
},
"end_date": {
"type": "string",
"format": "date",
"pattern": "^\\d{4}-\\d{2}-\\d{2}$",
"description": "Optional end date for when this submission's tags are valid (ISO 8601, e.g., 2025-07-03)."
},
"tags": {
"type": "object",
"description": "Additional community-defined tags as key/value pairs (values may be scalar, array, or object).",
"propertyNames": {
"type": "string",
"pattern": "^[a-z][a-z0-9_]{0,63}$"
},
"additionalProperties": {
"$ref": "#/$defs/tagValue"
},
"properties": {
"citation_0": {
"type": "string"
},
"icao_aircraft_type": {
"type": "string"
},
"manufacturer_icao": {
"type": "string"
},
"manufacturer_name": {
"type": "string"
},
"model": {
"type": "string"
},
"operator": {
"type": "string"
},
"operator_callsign": {
"type": "string"
},
"operator_icao": {
"type": "string"
},
"serial_number": {
"type": "string"
},
"type_code": {
"type": "string"
}
}
}
},
"allOf": [
{
"anyOf": [
{
"required": [
"registration_number"
]
},
{
"required": [
"transponder_code_hex"
]
},
{
"required": [
"openairframes_id"
]
}
]
}
],
"$defs": {
"tagScalar": {
"type": [
"string",
"number",
"integer",
"boolean",
"null"
]
},
"tagValue": {
"anyOf": [
{
"$ref": "#/$defs/tagScalar"
},
{
"type": "array",
"maxItems": 50,
"items": {
"$ref": "#/$defs/tagScalar"
}
},
{
"type": "object",
"maxProperties": 50,
"additionalProperties": {
"$ref": "#/$defs/tagScalar"
}
}
]
}
}
}
+49
View File
@@ -0,0 +1,49 @@
#!/usr/bin/env python3
import re
from pathlib import Path
import polars as pl
# Find all CSV.gz files in the downloaded artifacts
artifacts_dir = Path("downloads/adsb_artifacts")
files = sorted(artifacts_dir.glob("*/openairframes_adsb_*.csv.gz"))
if not files:
raise SystemExit("No CSV.gz files found in downloads/adsb_artifacts/")
print(f"Found {len(files)} files to concatenate")
# Extract dates from filenames to determine range
def extract_dates(path: Path) -> tuple[str, str]:
"""Extract start and end dates from filename"""
m = re.search(r"openairframes_adsb_(\d{4}-\d{2}-\d{2})_(\d{4}-\d{2}-\d{2})\.csv\.gz", path.name)
if m:
return m.group(1), m.group(2)
return None, None
# Collect all dates
all_dates = []
for f in files:
start, end = extract_dates(f)
if start and end:
all_dates.extend([start, end])
print(f" {f.name}: {start} to {end}")
if not all_dates:
raise SystemExit("Could not extract dates from filenames")
# Find earliest and latest dates
earliest = min(all_dates)
latest = max(all_dates)
print(f"\nDate range: {earliest} to {latest}")
# Read and concatenate all files
print("\nReading and concatenating files...")
frames = [pl.read_csv(f) for f in files]
df = pl.concat(frames, how="vertical", rechunk=True)
# Write output
output_path = Path("downloads") / f"openairframes_adsb_{earliest}_{latest}.csv.gz"
output_path.parent.mkdir(parents=True, exist_ok=True)
df.write_csv(output_path, compression="gzip")
print(f"\nWrote {output_path} with {df.height:,} rows")
+40
View File
@@ -0,0 +1,40 @@
#!/bin/bash
# Create download directory
mkdir -p downloads/adsb_artifacts
# Repository from the workflow comment
REPO="ggman12/OpenAirframes"
# Get last 15 runs of the workflow and download matching artifacts
gh run list \
--repo "$REPO" \
--workflow adsb-to-aircraft-multiple-day-run.yaml \
--limit 15 \
--json databaseId \
--jq '.[].databaseId' | while read -r run_id; do
echo "Checking run ID: $run_id"
# List artifacts for this run using the API
# Match pattern: openairframes_adsb-YYYY-MM-DD-YYYY-MM-DD (with second date)
gh api \
--paginate \
"repos/$REPO/actions/runs/$run_id/artifacts" \
--jq '.artifacts[] | select(.name | test("^openairframes_adsb-[0-9]{4}-[0-9]{2}-[0-9]{2}-[0-9]{4}-[0-9]{2}-[0-9]{2}$")) | .name' | while read -r artifact_name; do
# Check if artifact directory already exists and has files
if [ -d "downloads/adsb_artifacts/$artifact_name" ] && [ -n "$(ls -A "downloads/adsb_artifacts/$artifact_name" 2>/dev/null)" ]; then
echo " Skipping (already exists): $artifact_name"
continue
fi
echo " Downloading: $artifact_name"
gh run download "$run_id" \
--repo "$REPO" \
--name "$artifact_name" \
--dir "downloads/adsb_artifacts/$artifact_name"
done
done
echo "Download complete! Files saved to downloads/adsb_artifacts/"
+182
View File
@@ -0,0 +1,182 @@
#!/usr/bin/env python3
"""
Download and concatenate artifacts from a specific set of workflow runs.
Usage:
python scripts/download_and_concat_runs.py triggered_runs_20260216_123456.json
"""
import argparse
import json
import os
import subprocess
import sys
from pathlib import Path
def download_run_artifact(run_id, output_dir):
"""Download artifact from a specific workflow run."""
print(f" Downloading artifacts from run {run_id}...")
cmd = [
'gh', 'run', 'download', str(run_id),
'--pattern', 'openairframes_adsb-*',
'--dir', output_dir
]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode == 0:
print(f" ✓ Downloaded")
return True
else:
if "no artifacts" in result.stderr.lower():
print(f" ⚠ No artifacts found (workflow may still be running)")
else:
print(f" ✗ Failed: {result.stderr}")
return False
def find_csv_files(download_dir):
"""Find all CSV.gz files in the download directory."""
csv_files = []
for root, dirs, files in os.walk(download_dir):
for file in files:
if file.endswith('.csv.gz'):
csv_files.append(os.path.join(root, file))
return sorted(csv_files)
def concatenate_csv_files(csv_files, output_file):
"""Concatenate CSV files in order, preserving headers."""
import gzip
print(f"\nConcatenating {len(csv_files)} CSV files...")
with gzip.open(output_file, 'wt') as outf:
header_written = False
for i, csv_file in enumerate(csv_files, 1):
print(f" [{i}/{len(csv_files)}] Processing {os.path.basename(csv_file)}")
with gzip.open(csv_file, 'rt') as inf:
lines = inf.readlines()
if not header_written:
# Write header from first file
outf.writelines(lines)
header_written = True
else:
# Skip header for subsequent files
outf.writelines(lines[1:])
print(f"\n✓ Concatenated CSV saved to: {output_file}")
# Show file size
size_mb = os.path.getsize(output_file) / (1024 * 1024)
print(f" Size: {size_mb:.1f} MB")
def main():
parser = argparse.ArgumentParser(
description='Download and concatenate artifacts from workflow runs'
)
parser.add_argument(
'runs_file',
help='JSON file containing run IDs (from run_historical_adsb_action.py)'
)
parser.add_argument(
'--output-dir',
default='./downloads/historical_concat',
help='Directory for downloads (default: ./downloads/historical_concat)'
)
parser.add_argument(
'--wait',
action='store_true',
help='Wait for workflows to complete before downloading'
)
args = parser.parse_args()
# Load run IDs
if not os.path.exists(args.runs_file):
print(f"Error: File not found: {args.runs_file}")
sys.exit(1)
with open(args.runs_file, 'r') as f:
data = json.load(f)
runs = data['runs']
start_date = data['start_date']
end_date = data['end_date']
print("=" * 60)
print("Download and Concatenate Historical Artifacts")
print("=" * 60)
print(f"Date range: {start_date} to {end_date}")
print(f"Workflow runs: {len(runs)}")
print(f"Output directory: {args.output_dir}")
print("=" * 60)
# Create output directory
os.makedirs(args.output_dir, exist_ok=True)
# Wait for workflows to complete if requested
if args.wait:
print("\nWaiting for workflows to complete...")
for run_info in runs:
run_id = run_info['run_id']
print(f" Checking run {run_id}...")
cmd = ['gh', 'run', 'watch', str(run_id)]
subprocess.run(cmd)
# Download artifacts
print("\nDownloading artifacts...")
successful_downloads = 0
for i, run_info in enumerate(runs, 1):
run_id = run_info['run_id']
print(f"\n[{i}/{len(runs)}] Run {run_id} ({run_info['start']} to {run_info['end']})")
if download_run_artifact(run_id, args.output_dir):
successful_downloads += 1
print(f"\n\nDownload Summary: {successful_downloads}/{len(runs)} artifacts downloaded")
if successful_downloads == 0:
print("\nNo artifacts downloaded. Workflows may still be running.")
print("Use --wait to wait for completion, or try again later.")
sys.exit(1)
# Find all CSV files
csv_files = find_csv_files(args.output_dir)
if not csv_files:
print("\nError: No CSV files found in download directory")
sys.exit(1)
print(f"\nFound {len(csv_files)} CSV file(s):")
for csv_file in csv_files:
print(f" - {os.path.basename(csv_file)}")
# Concatenate
# Calculate actual end date for filename (end_date - 1 day since it's exclusive)
from datetime import datetime, timedelta
end_dt = datetime.strptime(end_date, '%Y-%m-%d') - timedelta(days=1)
actual_end = end_dt.strftime('%Y-%m-%d')
output_file = os.path.join(
args.output_dir,
f"openairframes_adsb_{start_date}_{actual_end}.csv.gz"
)
concatenate_csv_files(csv_files, output_file)
print("\n" + "=" * 60)
print("Done!")
print("=" * 60)
if __name__ == '__main__':
main()
+215
View File
@@ -0,0 +1,215 @@
#!/usr/bin/env python3
"""
Script to trigger adsb-to-aircraft-multiple-day-run workflow runs in monthly chunks.
Usage:
python scripts/run_historical_adsb_action.py --start-date 2025-01-01 --end-date 2025-06-01
"""
import argparse
import subprocess
import sys
from datetime import datetime, timedelta
from calendar import monthrange
def generate_monthly_chunks(start_date_str, end_date_str):
"""Generate date ranges in monthly chunks from start to end date.
End dates are exclusive (e.g., to process Jan 1-31, end_date should be Feb 1).
"""
start_date = datetime.strptime(start_date_str, '%Y-%m-%d')
end_date = datetime.strptime(end_date_str, '%Y-%m-%d')
chunks = []
current = start_date
while current < end_date:
# Get the first day of the next month (exclusive end)
_, days_in_month = monthrange(current.year, current.month)
month_end = current.replace(day=days_in_month)
next_month_start = month_end + timedelta(days=1)
# Don't go past the global end date
chunk_end = min(next_month_start, end_date)
chunks.append({
'start': current.strftime('%Y-%m-%d'),
'end': chunk_end.strftime('%Y-%m-%d')
})
# Move to first day of next month
if next_month_start >= end_date:
break
current = next_month_start
return chunks
def trigger_workflow(start_date, end_date, repo='ggman12/OpenAirframes', branch='main', dry_run=False):
"""Trigger the adsb-to-aircraft-multiple-day-run workflow via GitHub CLI."""
cmd = [
'gh', 'workflow', 'run', 'adsb-to-aircraft-multiple-day-run.yaml',
'--repo', repo,
'--ref', branch,
'-f', f'start_date={start_date}',
'-f', f'end_date={end_date}'
]
if dry_run:
print(f"[DRY RUN] Would run: {' '.join(cmd)}")
return True, None
print(f"Triggering workflow: {start_date} to {end_date} (on {branch})")
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode == 0:
print(f"✓ Successfully triggered workflow for {start_date} to {end_date}")
# Get the run ID of the workflow we just triggered
# Wait a moment for it to appear
import time
time.sleep(2)
# Get the most recent run (should be the one we just triggered)
list_cmd = [
'gh', 'run', 'list',
'--repo', repo,
'--workflow', 'adsb-to-aircraft-multiple-day-run.yaml',
'--branch', branch,
'--limit', '1',
'--json', 'databaseId',
'--jq', '.[0].databaseId'
]
list_result = subprocess.run(list_cmd, capture_output=True, text=True)
run_id = list_result.stdout.strip() if list_result.returncode == 0 else None
return True, run_id
else:
print(f"✗ Failed to trigger workflow for {start_date} to {end_date}")
print(f"Error: {result.stderr}")
return False, None
def main():
parser = argparse.ArgumentParser(
description='Trigger adsb-to-aircraft-multiple-day-run workflow runs in monthly chunks'
)
parser.add_argument(
'--start-date', '--start_date',
dest='start_date',
required=True,
help='Start date in YYYY-MM-DD format (inclusive)'
)
parser.add_argument(
'--end-date', '--end_date',
dest='end_date',
required=True,
help='End date in YYYY-MM-DD format (exclusive)'
)
parser.add_argument(
'--repo',
type=str,
default='ggman12/OpenAirframes',
help='GitHub repository (default: ggman12/OpenAirframes)'
)
parser.add_argument(
'--branch',
type=str,
default='main',
help='Branch to run the workflow on (default: main)'
)
parser.add_argument(
'--dry-run',
action='store_true',
help='Print commands without executing them'
)
parser.add_argument(
'--delay',
type=int,
default=5,
help='Delay in seconds between workflow triggers (default: 5)'
)
args = parser.parse_args()
# Validate dates
try:
start = datetime.strptime(args.start_date, '%Y-%m-%d')
end = datetime.strptime(args.end_date, '%Y-%m-%d')
if start > end:
print("Error: start_date must be before or equal to end_date")
sys.exit(1)
except ValueError as e:
print(f"Error: Invalid date format - {e}")
sys.exit(1)
# Generate monthly chunks
chunks = generate_monthly_chunks(args.start_date, args.end_date)
print(f"\nGenerating {len(chunks)} monthly workflow runs on branch '{args.branch}' (repo: {args.repo}):")
for i, chunk in enumerate(chunks, 1):
print(f" {i}. {chunk['start']} to {chunk['end']}")
if not args.dry_run:
response = input(f"\nProceed with triggering {len(chunks)} workflows on '{args.branch}'? [y/N]: ")
if response.lower() != 'y':
print("Cancelled.")
sys.exit(0)
print()
# Trigger workflows
import time
success_count = 0
triggered_runs = []
for i, chunk in enumerate(chunks, 1):
print(f"\n[{i}/{len(chunks)}] ", end='')
success, run_id = trigger_workflow(
chunk['start'],
chunk['end'],
repo=args.repo,
branch=args.branch,
dry_run=args.dry_run
)
if success:
success_count += 1
if run_id:
triggered_runs.append({
'run_id': run_id,
'start': chunk['start'],
'end': chunk['end']
})
# Add delay between triggers (except for last one)
if i < len(chunks) and not args.dry_run:
time.sleep(args.delay)
print(f"\n\nSummary: {success_count}/{len(chunks)} workflows triggered successfully")
# Save triggered run IDs to a file
if triggered_runs and not args.dry_run:
import json
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
runs_file = f"./output/triggered_runs_{timestamp}.json"
with open(runs_file, 'w') as f:
json.dump({
'start_date': args.start_date,
'end_date': args.end_date,
'repo': args.repo,
'branch': args.branch,
'runs': triggered_runs
}, f, indent=2)
print(f"\nRun IDs saved to: {runs_file}")
print(f"\nTo download and concatenate these artifacts, run:")
print(f" python scripts/download_and_concat_runs.py {runs_file}")
if success_count < len(chunks):
sys.exit(1)
if __name__ == '__main__':
main()
+82
View File
@@ -0,0 +1,82 @@
#!/usr/bin/env python3
"""
Run src.adsb.main in an isolated git worktree so edits in the main
working tree won't affect subprocess imports during the run.
Usage:
python scripts/run_main_isolated.py 2026-01-01
python scripts/run_main_isolated.py --start_date 2026-01-01 --end_date 2026-01-03
"""
import argparse
import os
import shutil
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path
def run(
cmd: list[str],
*,
cwd: Path | None = None,
check: bool = True,
) -> subprocess.CompletedProcess:
print(f"\n>>> {' '.join(cmd)}")
return subprocess.run(cmd, cwd=cwd, check=check)
def main() -> int:
parser = argparse.ArgumentParser(description="Run src.adsb.main in an isolated worktree")
parser.add_argument("date", nargs="?", help="Single date to process (YYYY-MM-DD)")
parser.add_argument("--start_date", help="Start date (inclusive, YYYY-MM-DD)")
parser.add_argument("--end_date", help="End date (exclusive, YYYY-MM-DD)")
parser.add_argument("--concat_with_latest_csv", action="store_true", help="Also concatenate with latest CSV from GitHub releases")
args = parser.parse_args()
if args.date and (args.start_date or args.end_date):
raise SystemExit("Use a single date or --start_date/--end_date, not both.")
if args.date:
datetime.strptime(args.date, "%Y-%m-%d")
main_args = ["--date", args.date]
else:
if not args.start_date or not args.end_date:
raise SystemExit("Provide --start_date and --end_date, or a single date.")
datetime.strptime(args.start_date, "%Y-%m-%d")
datetime.strptime(args.end_date, "%Y-%m-%d")
main_args = ["--start_date", args.start_date, "--end_date", args.end_date]
if args.concat_with_latest_csv:
main_args.append("--concat_with_latest_csv")
repo_root = Path(__file__).resolve().parents[1]
snapshots_root = repo_root / ".snapshots"
snapshots_root.mkdir(exist_ok=True)
timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
snapshot_root = snapshots_root / f"run_{timestamp}"
snapshot_src = snapshot_root / "src"
exit_code = 0
try:
shutil.copytree(repo_root / "src", snapshot_src)
runner = (
"import sys, runpy; "
f"sys.path.insert(0, {repr(str(snapshot_root))}); "
f"sys.argv = ['src.adsb.main'] + {main_args!r}; "
"runpy.run_module('src.adsb.main', run_name='__main__')"
)
cmd = [sys.executable, "-c", runner]
run(cmd, cwd=repo_root)
except subprocess.CalledProcessError as exc:
exit_code = exc.returncode
finally:
shutil.rmtree(snapshot_root, ignore_errors=True)
return exit_code
if __name__ == "__main__":
raise SystemExit(main())
+242
View File
@@ -0,0 +1,242 @@
#!/usr/bin/env python3
"""
Parse TheAirTraffic Database CSV and produce community_submission.v1 JSON.
Source: "TheAirTraffic Database - Aircraft 2.csv"
Output: community/YYYY-MM-DD/theairtraffic_<date>_<hash>.json
Categories in the spreadsheet columns (paired: name, registrations, separator):
Col 1-3: Business
Col 4-6: Government
Col 7-9: People
Col 10-12: Sports
Col 13-15: Celebrity
Col 16-18: State Govt./Law
Col 19-21: Other
Col 22-24: Test Aircraft
Col 25-27: YouTubers
Col 28-30: Formula 1 VIP's
Col 31-33: Active GII's and GIII's (test/demo aircraft)
Col 34-37: Russia & Ukraine (extra col for old/new)
Col 38-40: Helicopters & Blimps
Col 41-43: Unique Reg's
Col 44-46: Saudi & UAE
Col 47-49: Schools
Col 50-52: Special Charter
Col 53-55: Unknown Owners
Col 56-59: Frequent Flyers (extra cols: name, aircraft, logged, hours)
"""
import csv
import json
import hashlib
import re
import sys
import uuid
from datetime import datetime, timezone
from pathlib import Path
# ── Category mapping ────────────────────────────────────────────────────────
# Each entry: (name_col, reg_col, owner_category_tags)
# owner_category_tags is a dict of tag keys to add beyond "owner"
CATEGORY_COLUMNS = [
# (name_col, reg_col, {tag_key: tag_value, ...})
(1, 2, {"owner_category_0": "business"}),
(4, 5, {"owner_category_0": "government"}),
(7, 8, {"owner_category_0": "celebrity"}),
(10, 11, {"owner_category_0": "sports"}),
(13, 14, {"owner_category_0": "celebrity"}),
(16, 17, {"owner_category_0": "government", "owner_category_1": "law_enforcement"}),
(19, 20, {"owner_category_0": "other"}),
(22, 23, {"owner_category_0": "test_aircraft"}),
(25, 26, {"owner_category_0": "youtuber", "owner_category_1": "celebrity"}),
(28, 29, {"owner_category_0": "celebrity", "owner_category_1": "motorsport"}),
(31, 32, {"owner_category_0": "test_aircraft"}),
# Russia & Ukraine: col 34=name, col 35 or 36 may have reg
(34, 35, {"owner_category_0": "russia_ukraine"}),
(38, 39, {"owner_category_0": "celebrity", "category": "helicopter_or_blimp"}),
(41, 42, {"owner_category_0": "other"}),
(44, 45, {"owner_category_0": "government", "owner_category_1": "royal_family"}),
(47, 48, {"owner_category_0": "education"}),
(50, 51, {"owner_category_0": "charter"}),
(53, 54, {"owner_category_0": "unknown"}),
(56, 57, {"owner_category_0": "celebrity"}), # Frequent Flyers name col, aircraft col
]
# First data row index (0-based) in the CSV
DATA_START_ROW = 4
# ── Contributor info ────────────────────────────────────────────────────────
CONTRIBUTOR_NAME = "TheAirTraffic"
# Deterministic UUID v5 from contributor name
CONTRIBUTOR_UUID = str(uuid.uuid5(uuid.NAMESPACE_URL, "https://theairtraffic.com"))
# Citation
CITATION = "https://docs.google.com/spreadsheets/d/1JHhfJBnJPNBA6TgiSHjkXFkHBdVTTz_nXxaUDRWcHpk"
def looks_like_military_serial(reg: str) -> bool:
"""
Detect military-style serials like 92-9000, 82-8000, 98-0001
or pure numeric IDs like 929000, 828000, 980001.
These aren't standard civil registrations; use openairframes_id.
"""
# Pattern: NN-NNNN
if re.match(r'^\d{2}-\d{4}$', reg):
return True
# Pure 6-digit numbers (likely ICAO hex or military mode-S)
if re.match(r'^\d{6}$', reg):
return True
# Short numeric-only (1-5 digits) like "01", "02", "676"
if re.match(r'^\d{1,5}$', reg):
return True
return False
def normalize_reg(raw: str) -> str:
"""Clean up a registration string."""
reg = raw.strip().rstrip(',').strip()
# Remove carriage returns and other whitespace
reg = reg.replace('\r', '').replace('\n', '').strip()
return reg
def parse_regs(cell_value: str) -> list[str]:
"""
Parse a cell that may contain one or many registrations,
separated by commas, possibly wrapped in quotes.
"""
if not cell_value or not cell_value.strip():
return []
# Some cells have ADS-B exchange URLs skip those
if 'globe.adsbexchange.com' in cell_value:
return []
if cell_value.strip() in ('.', ',', ''):
return []
results = []
# Split on comma
parts = cell_value.split(',')
for part in parts:
reg = normalize_reg(part)
if not reg:
continue
# Skip URLs, section labels, etc.
if reg.startswith('http') or reg.startswith('Link') or reg == 'Section 1':
continue
# Skip if it's just whitespace or dots
if reg in ('.', '..', '...'):
continue
results.append(reg)
return results
def make_submission(
reg: str,
owner: str,
category_tags: dict[str, str],
) -> dict:
"""Build a single community_submission.v1 object."""
entry: dict = {}
# Decide identifier field
if looks_like_military_serial(reg):
entry["openairframes_id"] = reg
else:
entry["registration_number"] = reg
# Tags
tags: dict = {
"citation_0": CITATION,
}
if owner:
tags["owner"] = owner.strip()
tags.update(category_tags)
entry["tags"] = tags
return entry
def main():
csv_path = Path(sys.argv[1]) if len(sys.argv) > 1 else Path(
"/Users/jonahgoode/Downloads/TheAirTraffic Database - Aircraft 2.csv"
)
if not csv_path.exists():
print(f"ERROR: CSV not found at {csv_path}", file=sys.stderr)
sys.exit(1)
# Read CSV
with open(csv_path, 'r', encoding='utf-8-sig') as f:
reader = csv.reader(f)
rows = list(reader)
print(f"Read {len(rows)} rows from {csv_path.name}")
date_str = datetime.now(timezone.utc).strftime("%Y-%m-%d")
submissions: list[dict] = []
seen: set[tuple] = set() # (reg, owner) dedup
for row_idx in range(DATA_START_ROW, len(rows)):
row = rows[row_idx]
if len(row) < 3:
continue
for name_col, reg_col, cat_tags in CATEGORY_COLUMNS:
if reg_col >= len(row) or name_col >= len(row):
continue
owner_raw = row[name_col].strip().rstrip(',').strip()
reg_raw = row[reg_col]
# Clean owner name
owner = owner_raw.replace('\r', '').replace('\n', '').strip()
if not owner or owner in ('.', ',', 'Section 1'):
continue
# Skip header-like values
if owner.startswith('http') or owner.startswith('Link '):
continue
regs = parse_regs(reg_raw)
if not regs:
# For Russia & Ukraine, try the next column too (col 35 might have old reg, col 36 new)
if name_col == 34 and reg_col + 1 < len(row):
regs = parse_regs(row[reg_col + 1])
for reg in regs:
key = (reg, owner)
if key in seen:
continue
seen.add(key)
submissions.append(make_submission(reg, owner, cat_tags))
print(f"Generated {len(submissions)} submissions")
# Write output
proj_root = Path(__file__).resolve().parent.parent
out_dir = proj_root / "community" / date_str
out_dir.mkdir(parents=True, exist_ok=True)
out_file = out_dir / f"theairtraffic_{date_str}.json"
with open(out_file, 'w', encoding='utf-8') as f:
json.dump(submissions, f, indent=2, ensure_ascii=False)
print(f"Written to {out_file}")
print(f"Sample entry:\n{json.dumps(submissions[0], indent=2)}")
# Quick stats
cats = {}
for s in submissions:
c = s['tags'].get('owner_category_0', 'NONE')
cats[c] = cats.get(c, 0) + 1
print("\nCategory breakdown:")
for c, n in sorted(cats.items(), key=lambda x: -x[1]):
print(f" {c}: {n}")
if __name__ == "__main__":
main()
+69
View File
@@ -0,0 +1,69 @@
#!/usr/bin/env python3
"""Validate the generated theairtraffic JSON output."""
import json
import glob
import sys
# Find the latest output
files = sorted(glob.glob("community/2026-02-*/theairtraffic_*.json"))
if not files:
print("No output files found!")
sys.exit(1)
path = files[-1]
print(f"Validating: {path}")
with open(path) as f:
data = json.load(f)
print(f"Total entries: {len(data)}")
# Check military serial handling
mil = [d for d in data if "openairframes_id" in d]
print(f"\nEntries using openairframes_id: {len(mil)}")
for m in mil[:10]:
print(f" {m['openairframes_id']} -> owner: {m['tags'].get('owner','?')}")
# Check youtuber entries
yt = [d for d in data if d["tags"].get("owner_category_0") == "youtuber"]
print(f"\nYouTuber entries: {len(yt)}")
for y in yt[:5]:
reg = y.get("registration_number", y.get("openairframes_id"))
c0 = y["tags"].get("owner_category_0")
c1 = y["tags"].get("owner_category_1")
print(f" {reg} -> owner: {y['tags']['owner']}, cat0: {c0}, cat1: {c1}")
# Check US Govt / military
gov = [d for d in data if d["tags"].get("owner") == "United States of America 747/757"]
print(f"\nUSA 747/757 entries: {len(gov)}")
for g in gov:
oid = g.get("openairframes_id", g.get("registration_number"))
print(f" {oid}")
# Schema validation
issues = 0
for i, d in enumerate(data):
has_id = any(k in d for k in ["registration_number", "transponder_code_hex", "openairframes_id"])
if not has_id:
print(f" Entry {i}: no identifier!")
issues += 1
if "tags" not in d:
print(f" Entry {i}: no tags!")
issues += 1
# Check tag key format
for k in d.get("tags", {}):
import re
if not re.match(r"^[a-z][a-z0-9_]{0,63}$", k):
print(f" Entry {i}: invalid tag key '{k}'")
issues += 1
print(f"\nSchema issues: {issues}")
# Category breakdown
cats = {}
for s in data:
c = s["tags"].get("owner_category_0", "NONE")
cats[c] = cats.get(c, 0) + 1
print("\nCategory breakdown:")
for c, n in sorted(cats.items(), key=lambda x: -x[1]):
print(f" {c}: {n}")
+197
View File
@@ -0,0 +1,197 @@
# Shared compression logic for ADS-B aircraft data
import os
import polars as pl
COLUMNS = ['dbFlags', 'ownOp', 'year', 'desc', 'aircraft_category', 'r', 't']
def compress_df_polars(df: pl.DataFrame, icao: str) -> pl.DataFrame:
"""Compress a single ICAO group to its most informative row using Polars."""
# Create signature string
df = df.with_columns(
pl.concat_str([pl.col(c).cast(pl.Utf8) for c in COLUMNS], separator="|").alias("_signature")
)
# Compute signature counts
signature_counts = df.group_by("_signature").len().rename({"len": "_sig_count"})
# Group by signature and take first row
df = df.group_by("_signature").first()
if df.height == 1:
# Only one unique signature, return it
result = df.drop("_signature").with_columns(pl.lit(icao).alias("icao"))
return result
# For each row, create dict of non-empty column values and check subsets
# Convert to list of dicts for subset checking (same logic as pandas version)
rows_data = []
for row in df.iter_rows(named=True):
non_empty = {col: row[col] for col in COLUMNS if row[col] != '' and row[col] is not None}
rows_data.append({
'signature': row['_signature'],
'non_empty_dict': non_empty,
'non_empty_count': len(non_empty),
'row_data': row
})
# Check if row i's non-empty values are a subset of row j's non-empty values
def is_subset_of_any(idx):
row_dict = rows_data[idx]['non_empty_dict']
row_count = rows_data[idx]['non_empty_count']
for other_idx, other_data in enumerate(rows_data):
if idx == other_idx:
continue
other_dict = other_data['non_empty_dict']
other_count = other_data['non_empty_count']
# Check if all non-empty values in current row match those in other row
if all(row_dict.get(k) == other_dict.get(k) for k in row_dict.keys()):
# If they match and other has more defined columns, current row is redundant
if other_count > row_count:
return True
return False
# Keep rows that are not subsets of any other row
keep_indices = [i for i in range(len(rows_data)) if not is_subset_of_any(i)]
if len(keep_indices) == 0:
keep_indices = [0] # Fallback: keep first row
remaining_signatures = [rows_data[i]['signature'] for i in keep_indices]
df = df.filter(pl.col("_signature").is_in(remaining_signatures))
if df.height > 1:
# Use signature counts to pick the most frequent one
df = df.join(signature_counts, on="_signature", how="left")
max_count = df["_sig_count"].max()
df = df.filter(pl.col("_sig_count") == max_count).head(1)
df = df.drop("_sig_count")
result = df.drop("_signature").with_columns(pl.lit(icao).alias("icao"))
# Ensure empty strings are preserved
for col in COLUMNS:
if col in result.columns:
result = result.with_columns(pl.col(col).fill_null(""))
return result
def compress_multi_icao_df(df: pl.DataFrame, verbose: bool = True) -> pl.DataFrame:
"""Compress a DataFrame with multiple ICAOs to one row per ICAO.
Args:
df: DataFrame with columns ['time', 'icao'] + COLUMNS
verbose: Whether to print progress
Returns:
Compressed DataFrame with one row per ICAO
"""
if df.height == 0:
return df
# Sort by icao and time
df = df.sort(['icao', 'time'])
# Fill null values with empty strings for COLUMNS
for col in COLUMNS:
if col in df.columns:
df = df.with_columns(pl.col(col).cast(pl.Utf8).fill_null(""))
# Quick deduplication of exact duplicates
df = df.unique(subset=['icao'] + COLUMNS, keep='first')
if verbose:
print(f"After quick dedup: {df.height} records")
# Compress per ICAO
if verbose:
print("Compressing per ICAO...")
icao_groups = df.partition_by('icao', as_dict=True, maintain_order=True)
compressed_dfs = []
for icao_key, group_df in icao_groups.items():
icao = icao_key[0]
compressed = compress_df_polars(group_df, str(icao))
compressed_dfs.append(compressed)
if compressed_dfs:
df_compressed = pl.concat(compressed_dfs)
else:
df_compressed = df.head(0)
if verbose:
print(f"After compress: {df_compressed.height} records")
# Reorder columns: time first, then icao
cols = df_compressed.columns
ordered_cols = ['time', 'icao'] + [c for c in cols if c not in ['time', 'icao']]
df_compressed = df_compressed.select(ordered_cols)
return df_compressed
def load_parquet_part(part_id: int, date: str) -> pl.DataFrame:
"""Load a single parquet part file for a date.
Args:
part_id: Part ID (e.g., 1, 2, 3)
date: Date string in YYYY-MM-DD format
Returns:
DataFrame with ADS-B data
"""
from pathlib import Path
parquet_file = Path(f"data/output/parquet_output/part_{part_id}_{date}.parquet")
if not parquet_file.exists():
print(f"Parquet file not found: {parquet_file}")
return pl.DataFrame(schema={
'time': pl.Datetime,
'icao': pl.Utf8,
'r': pl.Utf8,
't': pl.Utf8,
'dbFlags': pl.Int64,
'ownOp': pl.Utf8,
'year': pl.Int64,
'desc': pl.Utf8,
'aircraft_category': pl.Utf8
})
print(f"Loading from parquet: {parquet_file}")
df = pl.read_parquet(
parquet_file,
columns=['time', 'icao', 'r', 't', 'dbFlags', 'ownOp', 'year', 'desc', 'aircraft_category']
)
# Convert to timezone-naive datetime
if df["time"].dtype == pl.Datetime:
df = df.with_columns(pl.col("time").dt.replace_time_zone(None))
os.remove(parquet_file)
return df
def compress_parquet_part(part_id: int, date: str) -> pl.DataFrame:
"""Load and compress a single parquet part file."""
df = load_parquet_part(part_id, date)
if df.height == 0:
return df
# Filter to rows within the given date (UTC-naive). This is because sometimes adsb.lol export can have rows at 00:00:00 of next day or similar.
date_lit = pl.lit(date).str.strptime(pl.Date, "%Y-%m-%d")
df = df.filter(pl.col("time").dt.date() == date_lit)
print(f"Loaded {df.height} raw records for part {part_id}, date {date}")
return compress_multi_icao_df(df, verbose=True)
def concat_compressed_dfs(df_base, df_new):
"""Concatenate base and new compressed dataframes, keeping the most informative row per ICAO."""
# Combine both dataframes
df_combined = pl.concat([df_base, df_new])
return df_combined
+67
View File
@@ -0,0 +1,67 @@
from pathlib import Path
import polars as pl
import argparse
import os
OUTPUT_DIR = Path("./data/output")
CORRECT_ORDER_OF_COLUMNS = ["time", "icao", "r", "t", "dbFlags", "ownOp", "year", "desc", "aircraft_category"]
def main():
parser = argparse.ArgumentParser(description="Concatenate compressed parquet files for a single day")
parser.add_argument("--date", type=str, required=True, help="Date in YYYY-MM-DD format")
parser.add_argument("--concat_with_latest_csv", action="store_true", help="Whether to also concatenate with the latest CSV from GitHub releases")
args = parser.parse_args()
compressed_dir = OUTPUT_DIR / "compressed"
date_dir = compressed_dir / args.date
parquet_files = sorted(date_dir.glob("*.parquet"))
df = None
if parquet_files: # TODO: This logic could be updated slightly.
print(f"No parquet files found in {date_dir}")
frames = [pl.read_parquet(p) for p in parquet_files]
df = pl.concat(frames, how="vertical", rechunk=True)
df = df.sort(["time", "icao"])
df = df.select(CORRECT_ORDER_OF_COLUMNS)
output_path = OUTPUT_DIR / f"openairframes_adsb_{args.date}.parquet"
print(f"Writing combined parquet to {output_path} with {df.height} rows")
df.write_parquet(output_path)
csv_output_path = OUTPUT_DIR / f"openairframes_adsb_{args.date}.csv.gz"
print(f"Writing combined csv.gz to {csv_output_path} with {df.height} rows")
df.write_csv(csv_output_path, compression="gzip")
if args.concat_with_latest_csv:
print("Loading latest CSV from GitHub releases to concatenate with...")
from src.get_latest_release import get_latest_aircraft_adsb_csv_df
from datetime import datetime
df_latest_csv, csv_start_date, csv_end_date = get_latest_aircraft_adsb_csv_df()
# Compare dates: end_date is exclusive, so if csv_end_date > args.date,
# the latest CSV already includes this day's data
csv_end_dt = datetime.strptime(csv_end_date, "%Y-%m-%d")
args_dt = datetime.strptime(args.date, "%Y-%m-%d")
if df is None or csv_end_dt >= args_dt:
print(f"Latest CSV already includes data through {args.date} (end_date={csv_end_date} is exclusive)")
print("Writing latest CSV directly without concatenation to avoid duplicates")
os.makedirs(OUTPUT_DIR, exist_ok=True)
final_csv_output_path = OUTPUT_DIR / f"openairframes_adsb_{csv_start_date}_{csv_end_date}.csv.gz"
df_latest_csv = df_latest_csv.select(CORRECT_ORDER_OF_COLUMNS)
df_latest_csv.write_csv(final_csv_output_path, compression="gzip")
else:
print(f"Concatenating latest CSV (through {csv_end_date}) with new data ({args.date})")
# Ensure column order matches before concatenating
df_latest_csv = df_latest_csv.select(CORRECT_ORDER_OF_COLUMNS)
from src.adsb.compress_adsb_to_aircraft_data import concat_compressed_dfs
df_final = concat_compressed_dfs(df_latest_csv, df)
df_final = df_final.select(CORRECT_ORDER_OF_COLUMNS)
final_csv_output_path = OUTPUT_DIR / f"openairframes_adsb_{csv_start_date}_{args.date}.csv.gz"
df_final.write_csv(final_csv_output_path, compression="gzip")
print(f"Final CSV written to {final_csv_output_path}")
if __name__ == "__main__":
main()
+587
View File
@@ -0,0 +1,587 @@
"""
Downloads adsb.lol data and writes to Parquet files.
This file contains utility functions for downloading and processing adsb.lol trace data.
Used by the historical ADS-B processing pipeline.
"""
import datetime as dt
import gzip
import os
import re
import resource
import shutil
import signal
import subprocess
import sys
import urllib.error
import urllib.request
from datetime import datetime
import time
import orjson
import pyarrow as pa
import pyarrow.parquet as pq
from pathlib import Path
# ============================================================================
# Configuration
# ============================================================================
OUTPUT_DIR = Path("./data/output")
os.makedirs(OUTPUT_DIR, exist_ok=True)
PARQUET_DIR = os.path.join(OUTPUT_DIR, "parquet_output")
os.makedirs(PARQUET_DIR, exist_ok=True)
TOKEN = os.environ.get('GITHUB_TOKEN') # Optional: for higher GitHub API rate limits
HEADERS = {"Authorization": f"token {TOKEN}"} if TOKEN else {}
def get_resource_usage() -> str:
"""Get current RAM and disk usage as a formatted string."""
# RAM usage (RSS = Resident Set Size)
ram_bytes = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
# On macOS, ru_maxrss is in bytes; on Linux, it's in KB
if sys.platform == 'darwin':
ram_gb = ram_bytes / (1024**3)
else:
ram_gb = ram_bytes / (1024**2) # Convert KB to GB
# Disk usage
disk = shutil.disk_usage('.')
disk_free_gb = disk.free / (1024**3)
disk_total_gb = disk.total / (1024**3)
return f"RAM: {ram_gb:.2f}GB | Disk: {disk_free_gb:.1f}GB free / {disk_total_gb:.1f}GB total"
# ============================================================================
# GitHub Release Fetching and Downloading
# ============================================================================
class DownloadTimeoutException(Exception):
pass
def timeout_handler(signum, frame):
raise DownloadTimeoutException("Download timed out after 40 seconds")
def _fetch_releases_from_repo(year: str, version_date: str) -> list:
"""Fetch GitHub releases for a given version date from a specific year's adsblol repo."""
BASE_URL = f"https://api.github.com/repos/adsblol/globe_history_{year}/releases"
PATTERN = rf"^{re.escape(version_date)}-planes-readsb-prod-\d+(tmp)?$"
releases = []
page = 1
while True:
max_retries = 10
retry_delay = 60*5
for attempt in range(1, max_retries + 1):
try:
req = urllib.request.Request(f"{BASE_URL}?page={page}", headers=HEADERS)
with urllib.request.urlopen(req) as response:
if response.status == 200:
data = orjson.loads(response.read())
break
else:
print(f"Failed to fetch releases (attempt {attempt}/{max_retries}): {response.status} {response.reason}")
if attempt < max_retries:
print(f"Waiting {retry_delay} seconds before retry")
time.sleep(retry_delay)
else:
print(f"Giving up after {max_retries} attempts")
return releases
except Exception as e:
print(f"Request exception (attempt {attempt}/{max_retries}): {e}")
if attempt < max_retries:
print(f"Waiting {retry_delay} seconds before retry")
time.sleep(retry_delay)
else:
print(f"Giving up after {max_retries} attempts")
return releases
if not data:
break
for release in data:
if re.match(PATTERN, release["tag_name"]):
releases.append(release)
page += 1
return releases
def fetch_releases(version_date: str) -> list:
"""Fetch GitHub releases for a given version date from adsblol.
For Dec 31 dates, if no releases are found in the current year's repo,
also checks the next year's repo (adsblol sometimes publishes Dec 31
data in the following year's repository).
"""
year = version_date.split('.')[0][1:]
releases = _fetch_releases_from_repo(year, version_date)
# For last day of year, also check next year's repo if nothing found
if not releases and version_date.endswith(".12.31"):
next_year = str(int(year) + 1)
print(f"No releases found for {version_date} in {year} repo, checking {next_year} repo")
releases = _fetch_releases_from_repo(next_year, version_date)
return releases
def download_asset(asset_url: str, file_path: str, expected_size: int | None = None) -> bool:
"""Download a single release asset with size verification.
Args:
asset_url: URL to download from
file_path: Local path to save to
expected_size: Expected file size in bytes (for verification)
Returns:
True if download succeeded and size matches (if provided), False otherwise
"""
os.makedirs(os.path.dirname(file_path) or OUTPUT_DIR, exist_ok=True)
# Check if file exists and has correct size
if os.path.exists(file_path):
if expected_size is not None:
actual_size = os.path.getsize(file_path)
if actual_size == expected_size:
print(f"[SKIP] {file_path} already downloaded and verified ({actual_size} bytes).")
return True
else:
print(f"[WARN] {file_path} exists but size mismatch (expected {expected_size}, got {actual_size}). Re-downloading.")
os.remove(file_path)
else:
print(f"[SKIP] {file_path} already downloaded.")
return True
max_retries = 2
retry_delay = 30
timeout_seconds = 140
for attempt in range(1, max_retries + 1):
print(f"Downloading {asset_url} (attempt {attempt}/{max_retries})")
try:
req = urllib.request.Request(asset_url, headers=HEADERS)
with urllib.request.urlopen(req, timeout=timeout_seconds) as response:
if response.status == 200:
with open(file_path, "wb") as file:
while True:
chunk = response.read(8192)
if not chunk:
break
file.write(chunk)
# Verify file size if expected_size was provided
if expected_size is not None:
actual_size = os.path.getsize(file_path)
if actual_size != expected_size:
print(f"[ERROR] Size mismatch for {file_path}: expected {expected_size} bytes, got {actual_size} bytes")
os.remove(file_path)
if attempt < max_retries:
print(f"Waiting {retry_delay} seconds before retry")
time.sleep(retry_delay)
continue
return False
print(f"Saved {file_path} ({actual_size} bytes, verified)")
else:
print(f"Saved {file_path}")
return True
else:
print(f"Failed to download {asset_url}: {response.status} {response.msg}")
if attempt < max_retries:
print(f"Waiting {retry_delay} seconds before retry")
time.sleep(retry_delay)
else:
return False
except urllib.error.HTTPError as e:
if e.code == 404:
print(f"404 Not Found: {asset_url}")
raise Exception(f"Asset not found (404): {asset_url}")
else:
print(f"HTTP error occurred (attempt {attempt}/{max_retries}): {e.code} {e.reason}")
if attempt < max_retries:
print(f"Waiting {retry_delay} seconds before retry")
time.sleep(retry_delay)
else:
return False
except urllib.error.URLError as e:
print(f"URL/Timeout error (attempt {attempt}/{max_retries}): {e}")
if attempt < max_retries:
print(f"Waiting {retry_delay} seconds before retry")
time.sleep(retry_delay)
else:
return False
except Exception as e:
print(f"An error occurred (attempt {attempt}/{max_retries}): {e}")
if attempt < max_retries:
print(f"Waiting {retry_delay} seconds before retry")
time.sleep(retry_delay)
else:
return False
return False
def extract_split_archive(file_paths: list, extract_dir: str) -> bool:
"""
Extracts a split archive by concatenating the parts using 'cat'
and then extracting with 'tar' in one pipeline.
Deletes the tar files immediately after extraction to save disk space.
"""
if os.path.isdir(extract_dir):
print(f"[SKIP] Extraction directory already exists: {extract_dir}")
return True
def sort_key(path: str):
base = os.path.basename(path)
parts = base.rsplit('.', maxsplit=1)
if len(parts) == 2:
suffix = parts[1]
if suffix.isdigit():
return (0, int(suffix))
if re.fullmatch(r'[a-zA-Z]+', suffix):
return (1, suffix)
return (2, base)
file_paths = sorted(file_paths, key=sort_key)
os.makedirs(extract_dir, exist_ok=True)
try:
cat_proc = subprocess.Popen(
["cat"] + file_paths,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
tar_cmd = ["tar", "xf", "-", "-C", extract_dir, "--strip-components=1"]
result = subprocess.run(
tar_cmd,
stdin=cat_proc.stdout,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
cat_proc.stdout.close()
cat_stderr = cat_proc.stderr.read().decode() if cat_proc.stderr else ""
cat_proc.wait()
if cat_stderr:
print(f"cat stderr: {cat_stderr}")
tar_stderr = result.stderr.decode() if result.stderr else ""
if result.returncode != 0:
# GNU tar exits non-zero for format issues that BSD tar silently
# tolerates (e.g. trailing junk after the last valid entry).
# Check whether files were actually extracted before giving up.
extracted_items = os.listdir(extract_dir)
if extracted_items:
print(f"[WARN] tar exited {result.returncode} but extracted "
f"{len(extracted_items)} items — treating as success")
if tar_stderr:
print(f"tar stderr: {tar_stderr}")
else:
print(f"Failed to extract split archive (tar exit {result.returncode})")
if tar_stderr:
print(f"tar stderr: {tar_stderr}")
shutil.rmtree(extract_dir, ignore_errors=True)
return False
print(f"Successfully extracted archive to {extract_dir}")
# Delete tar files immediately after extraction
for tar_file in file_paths:
try:
os.remove(tar_file)
print(f"Deleted tar file: {tar_file}")
except Exception as e:
print(f"Failed to delete {tar_file}: {e}")
# Check disk usage after deletion
disk = shutil.disk_usage('.')
free_gb = disk.free / (1024**3)
print(f"Disk space after tar deletion: {free_gb:.1f}GB free")
return True
except Exception as e:
print(f"Failed to extract split archive: {e}")
shutil.rmtree(extract_dir, ignore_errors=True)
return False
# ============================================================================
# Trace File Processing (with alt_baro/on_ground handling)
# ============================================================================
ALLOWED_DATA_SOURCE = {'', 'adsb.lol', 'adsbexchange', 'airplanes.live'}
def process_file(filepath: str) -> list:
"""
Process a single trace file and return list of rows.
Handles alt_baro/on_ground: if altitude == "ground", on_ground=True and alt_baro=None.
"""
insert_rows = []
with gzip.open(filepath, 'rb') as f:
data = orjson.loads(f.read())
icao = data.get('icao', None)
if icao is None:
print(f"Skipping file {filepath} as it does not contain 'icao'")
return []
r = data.get('r', "")
t = data.get('t', "")
dbFlags = data.get('dbFlags', 0)
noRegData = data.get('noRegData', False)
ownOp = data.get('ownOp', "")
year = int(data.get('year', 0))
timestamp = data.get('timestamp', None)
desc = data.get('desc', "")
trace_data = data.get('trace', None)
if timestamp is None or trace_data is None:
print(f"Skipping file {filepath} as it does not contain 'timestamp' or 'trace'")
return []
for row in trace_data:
time_offset = row[0]
lat = row[1]
lon = row[2]
altitude = row[3]
# Handle alt_baro/on_ground
alt_baro = None
on_ground = False
if type(altitude) is str and altitude == "ground":
on_ground = True
elif type(altitude) is int:
alt_baro = altitude
elif type(altitude) is float:
alt_baro = int(altitude)
ground_speed = row[4]
track_degrees = row[5]
flags = row[6]
vertical_rate = row[7]
aircraft = row[8]
source = row[9]
data_source_value = "adsb.lol" if "adsb.lol" in ALLOWED_DATA_SOURCE else ""
geometric_altitude = row[10]
geometric_vertical_rate = row[11]
indicated_airspeed = row[12]
roll_angle = row[13]
time_val = timestamp + time_offset
dt64 = dt.datetime.fromtimestamp(time_val, tz=dt.timezone.utc)
# Prepare base fields
inserted_row = [
dt64, icao, r, t, dbFlags, noRegData, ownOp, year, desc,
lat, lon, alt_baro, on_ground, ground_speed, track_degrees,
flags, vertical_rate
]
next_part = [
source, geometric_altitude, geometric_vertical_rate,
indicated_airspeed, roll_angle
]
inserted_row.extend(next_part)
if aircraft is None or type(aircraft) is not dict:
aircraft = dict()
aircraft_data = {
'alert': aircraft.get('alert', None),
'alt_geom': aircraft.get('alt_geom', None),
'gva': aircraft.get('gva', None),
'nac_p': aircraft.get('nac_p', None),
'nac_v': aircraft.get('nac_v', None),
'nic': aircraft.get('nic', None),
'nic_baro': aircraft.get('nic_baro', None),
'rc': aircraft.get('rc', None),
'sda': aircraft.get('sda', None),
'sil': aircraft.get('sil', None),
'sil_type': aircraft.get('sil_type', ""),
'spi': aircraft.get('spi', None),
'track': aircraft.get('track', None),
'type': aircraft.get('type', ""),
'version': aircraft.get('version', None),
'category': aircraft.get('category', ''),
'emergency': aircraft.get('emergency', ''),
'flight': aircraft.get('flight', ""),
'squawk': aircraft.get('squawk', ""),
'baro_rate': aircraft.get('baro_rate', None),
'nav_altitude_fms': aircraft.get('nav_altitude_fms', None),
'nav_altitude_mcp': aircraft.get('nav_altitude_mcp', None),
'nav_modes': aircraft.get('nav_modes', []),
'nav_qnh': aircraft.get('nav_qnh', None),
'geom_rate': aircraft.get('geom_rate', None),
'ias': aircraft.get('ias', None),
'mach': aircraft.get('mach', None),
'mag_heading': aircraft.get('mag_heading', None),
'oat': aircraft.get('oat', None),
'roll': aircraft.get('roll', None),
'tas': aircraft.get('tas', None),
'tat': aircraft.get('tat', None),
'true_heading': aircraft.get('true_heading', None),
'wd': aircraft.get('wd', None),
'ws': aircraft.get('ws', None),
'track_rate': aircraft.get('track_rate', None),
'nav_heading': aircraft.get('nav_heading', None)
}
aircraft_list = list(aircraft_data.values())
inserted_row.extend(aircraft_list)
inserted_row.append(data_source_value)
insert_rows.append(inserted_row)
if insert_rows:
# print(f"Got {len(insert_rows)} rows from {filepath}")
return insert_rows
else:
return []
# ============================================================================
# Parquet Writing
# ============================================================================
# Column names matching the order of data in inserted_row
COLUMNS = [
"time", "icao",
"r", "t", "dbFlags", "noRegData", "ownOp", "year", "desc",
"lat", "lon", "alt_baro", "on_ground", "ground_speed", "track_degrees",
"flags", "vertical_rate", "source", "geometric_altitude",
"geometric_vertical_rate", "indicated_airspeed", "roll_angle",
"aircraft_alert", "aircraft_alt_geom", "aircraft_gva", "aircraft_nac_p",
"aircraft_nac_v", "aircraft_nic", "aircraft_nic_baro", "aircraft_rc",
"aircraft_sda", "aircraft_sil", "aircraft_sil_type", "aircraft_spi",
"aircraft_track", "aircraft_type", "aircraft_version", "aircraft_category",
"aircraft_emergency", "aircraft_flight", "aircraft_squawk",
"aircraft_baro_rate", "aircraft_nav_altitude_fms", "aircraft_nav_altitude_mcp",
"aircraft_nav_modes", "aircraft_nav_qnh", "aircraft_geom_rate",
"aircraft_ias", "aircraft_mach", "aircraft_mag_heading", "aircraft_oat",
"aircraft_roll", "aircraft_tas", "aircraft_tat", "aircraft_true_heading",
"aircraft_wd", "aircraft_ws", "aircraft_track_rate", "aircraft_nav_heading",
"data_source",
]
OS_CPU_COUNT = os.cpu_count() or 1
MAX_WORKERS = OS_CPU_COUNT if OS_CPU_COUNT > 4 else 1
# PyArrow schema for efficient Parquet writing
PARQUET_SCHEMA = pa.schema([
("time", pa.timestamp("ms", tz="UTC")),
("icao", pa.string()),
("r", pa.string()),
("t", pa.string()),
("dbFlags", pa.int32()),
("noRegData", pa.bool_()),
("ownOp", pa.string()),
("year", pa.uint16()),
("desc", pa.string()),
("lat", pa.float64()),
("lon", pa.float64()),
("alt_baro", pa.int32()),
("on_ground", pa.bool_()),
("ground_speed", pa.float32()),
("track_degrees", pa.float32()),
("flags", pa.uint32()),
("vertical_rate", pa.int32()),
("source", pa.string()),
("geometric_altitude", pa.int32()),
("geometric_vertical_rate", pa.int32()),
("indicated_airspeed", pa.int32()),
("roll_angle", pa.float32()),
("aircraft_alert", pa.int64()),
("aircraft_alt_geom", pa.int64()),
("aircraft_gva", pa.int64()),
("aircraft_nac_p", pa.int64()),
("aircraft_nac_v", pa.int64()),
("aircraft_nic", pa.int64()),
("aircraft_nic_baro", pa.int64()),
("aircraft_rc", pa.int64()),
("aircraft_sda", pa.int64()),
("aircraft_sil", pa.int64()),
("aircraft_sil_type", pa.string()),
("aircraft_spi", pa.int64()),
("aircraft_track", pa.float64()),
("aircraft_type", pa.string()),
("aircraft_version", pa.int64()),
("aircraft_category", pa.string()),
("aircraft_emergency", pa.string()),
("aircraft_flight", pa.string()),
("aircraft_squawk", pa.string()),
("aircraft_baro_rate", pa.int64()),
("aircraft_nav_altitude_fms", pa.int64()),
("aircraft_nav_altitude_mcp", pa.int64()),
("aircraft_nav_modes", pa.list_(pa.string())),
("aircraft_nav_qnh", pa.float64()),
("aircraft_geom_rate", pa.int64()),
("aircraft_ias", pa.int64()),
("aircraft_mach", pa.float64()),
("aircraft_mag_heading", pa.float64()),
("aircraft_oat", pa.int64()),
("aircraft_roll", pa.float64()),
("aircraft_tas", pa.int64()),
("aircraft_tat", pa.int64()),
("aircraft_true_heading", pa.float64()),
("aircraft_wd", pa.int64()),
("aircraft_ws", pa.int64()),
("aircraft_track_rate", pa.float64()),
("aircraft_nav_heading", pa.float64()),
("data_source", pa.string()),
])
def collect_trace_files_with_find(root_dir):
"""Find all trace_full_*.json files in the extracted directory."""
trace_dict: dict[str, str] = {}
cmd = ['find', root_dir, '-type', 'f', '-name', 'trace_full_*.json']
result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
if result.returncode != 0:
print(f"Error executing find: {result.stderr}")
return trace_dict
for file_path in result.stdout.strip().split('\n'):
if file_path:
filename = os.path.basename(file_path)
if filename.startswith("trace_full_") and filename.endswith(".json"):
icao = filename[len("trace_full_"):-len(".json")]
trace_dict[icao] = file_path
return trace_dict
def create_parquet_for_day(day, keep_folders: bool = False):
"""Create parquet file for a single day.
Args:
day: datetime object or string in 'YYYY-MM-DD' format
keep_folders: Whether to keep extracted folders after processing
Returns:
Path to the created parquet file, or None if failed
"""
from pathlib import Path
if isinstance(day, str):
day = datetime.strptime(day, "%Y-%m-%d")
version_date = f"v{day.strftime('%Y.%m.%d')}"
# Check if parquet already exists
parquet_path = Path(PARQUET_DIR) / f"{version_date}.parquet"
if parquet_path.exists():
print(f"Parquet file already exists: {parquet_path}")
return parquet_path
print(f"Creating parquet for {version_date}")
rows_processed = process_version_date(version_date, keep_folders)
if rows_processed > 0 and parquet_path.exists():
return parquet_path
else:
return None
+164
View File
@@ -0,0 +1,164 @@
"""
Downloads and extracts adsb.lol tar files for a single day, then lists all ICAO folders.
This is the first step of the map-reduce pipeline.
Outputs:
- Extracted trace files in data/output/{version_date}-planes-readsb-prod-0.tar_0/
- ICAO manifest at data/output/icao_manifest_{date}.txt
"""
import os
import sys
import argparse
import glob
import subprocess
from datetime import datetime, timedelta
# Re-use download/extract functions from download_adsb_data_to_parquet
from src.adsb.download_adsb_data_to_parquet import (
OUTPUT_DIR,
fetch_releases,
download_asset,
extract_split_archive,
collect_trace_files_with_find,
)
def download_and_extract(version_date: str) -> str | None:
"""Download and extract tar files, return extract directory path."""
extract_dir = os.path.join(OUTPUT_DIR, f"{version_date}-planes-readsb-prod-0.tar_0")
# Check if already extracted
if os.path.isdir(extract_dir):
print(f"[SKIP] Already extracted: {extract_dir}")
return extract_dir
# Check for existing tar files
pattern = os.path.join(OUTPUT_DIR, f"{version_date}-planes-readsb-prod-0*")
matches = [p for p in glob.glob(pattern) if os.path.isfile(p)]
if matches:
print(f"Found existing tar files for {version_date}")
normal_matches = [
p for p in matches
if "-planes-readsb-prod-0." in os.path.basename(p)
and "tmp" not in os.path.basename(p)
]
downloaded_files = normal_matches if normal_matches else matches
else:
# Download from GitHub
print(f"Downloading releases for {version_date}...")
releases = fetch_releases(version_date)
if not releases:
print(f"No releases found for {version_date}")
return None
# Prefer non-tmp releases; only use tmp if no normal releases exist
normal_releases = [r for r in releases if "tmp" not in r["tag_name"]]
tmp_releases = [r for r in releases if "tmp" in r["tag_name"]]
releases = normal_releases if normal_releases else tmp_releases
print(f"Using {'normal' if normal_releases else 'tmp'} releases ({len(releases)} found)")
downloaded_files = []
for release in releases:
tag_name = release["tag_name"]
print(f"Processing release: {tag_name}")
assets = release.get("assets", [])
normal_assets = [
a for a in assets
if "planes-readsb-prod-0." in a["name"] and "tmp" not in a["name"]
]
tmp_assets = [
a for a in assets
if "planes-readsb-prod-0tmp" in a["name"]
]
use_assets = normal_assets if normal_assets else tmp_assets
for asset in use_assets:
asset_name = asset["name"]
asset_url = asset["browser_download_url"]
asset_size = asset.get("size") # Get expected file size
file_path = os.path.join(OUTPUT_DIR, asset_name)
if download_asset(asset_url, file_path, expected_size=asset_size):
downloaded_files.append(file_path)
if not downloaded_files:
print(f"No files downloaded for {version_date}")
return None
# Extract
if extract_split_archive(downloaded_files, extract_dir):
return extract_dir
return None
def list_icao_folders(extract_dir: str) -> list[str]:
"""List all ICAO folder names from extracted directory."""
trace_files = collect_trace_files_with_find(extract_dir)
icaos = sorted(trace_files.keys())
print(f"Found {len(icaos)} unique ICAOs")
return icaos
def process_single_day(target_day: datetime) -> tuple[str | None, list[str]]:
"""Process a single day: download, extract, list ICAOs.
Returns:
Tuple of (extract_dir, icaos)
"""
date_str = target_day.strftime("%Y-%m-%d")
version_date = f"v{target_day.strftime('%Y.%m.%d')}"
print(f"Processing date: {date_str} (version: {version_date})")
extract_dir = download_and_extract(version_date)
if not extract_dir:
print(f"Failed to download/extract data for {date_str}")
raise Exception(f"No data available for {date_str}")
icaos = list_icao_folders(extract_dir)
print(f"Found {len(icaos)} ICAOs for {date_str}")
return extract_dir, icaos
from pathlib import Path
import tarfile
NUMBER_PARTS = 4
def split_folders_into_gzip_archives(extract_dir: Path, tar_output_dir: Path, icaos: list[str], parts = NUMBER_PARTS) -> list[str]:
traces_dir = extract_dir / "traces"
buckets = sorted(traces_dir.iterdir())
tars = []
for i in range(parts):
tar_path = tar_output_dir / f"{tar_output_dir.name}_part_{i}.tar.gz"
tars.append(tarfile.open(tar_path, "w:gz"))
for idx, bucket_path in enumerate(buckets):
tar_idx = idx % parts
tars[tar_idx].add(bucket_path, arcname=bucket_path.name)
for tar in tars:
tar.close()
def main():
parser = argparse.ArgumentParser(description="Download and list ICAOs from adsb.lol data for a single day")
parser.add_argument("--date", type=str, help="Single date in YYYY-MM-DD format (default: yesterday)")
args = parser.parse_args()
target_day = datetime.strptime(args.date, "%Y-%m-%d")
date_str = target_day.strftime("%Y-%m-%d")
tar_output_dir = Path(f"./data/output/adsb_archives/{date_str}")
extract_dir, icaos = process_single_day(target_day)
extract_dir = Path(extract_dir)
print(extract_dir)
tar_output_dir.mkdir(parents=True, exist_ok=True)
split_folders_into_gzip_archives(extract_dir, tar_output_dir, icaos)
if not icaos:
print("No ICAOs found")
sys.exit(1)
print(f"\nDone! Extract dir: {extract_dir}")
print(f"Total ICAOs: {len(icaos)}")
if __name__ == "__main__":
main()
+64
View File
@@ -0,0 +1,64 @@
#!/usr/bin/env python3
"""Generate date chunk matrix for historical ADS-B processing."""
import json
import os
import sys
from datetime import datetime, timedelta
def generate_chunks(start_date: str, end_date: str, chunk_days: int) -> list[dict]:
"""Generate date chunks for parallel processing.
Args:
start_date: Start date in YYYY-MM-DD format (inclusive)
end_date: End date in YYYY-MM-DD format (exclusive)
chunk_days: Number of days per chunk
Returns:
List of chunk dictionaries with start_date and end_date (both inclusive within chunk)
"""
start = datetime.strptime(start_date, "%Y-%m-%d")
end = datetime.strptime(end_date, "%Y-%m-%d")
chunks = []
current = start
# end_date is exclusive, so we process up to but not including it
while current < end:
# chunk_end is inclusive, so subtract 1 from the next chunk start
chunk_end = min(current + timedelta(days=chunk_days - 1), end - timedelta(days=1))
chunks.append({
"start_date": current.strftime("%Y-%m-%d"),
"end_date": chunk_end.strftime("%Y-%m-%d"),
})
current = chunk_end + timedelta(days=1)
return chunks
def main() -> None:
"""Main entry point for GitHub Actions."""
start_date = os.environ.get("INPUT_START_DATE")
end_date = os.environ.get("INPUT_END_DATE")
chunk_days = int(os.environ.get("INPUT_CHUNK_DAYS", "1"))
if not start_date or not end_date:
print("ERROR: INPUT_START_DATE and INPUT_END_DATE must be set", file=sys.stderr)
sys.exit(1)
chunks = generate_chunks(start_date, end_date, chunk_days)
print(f"Generated {len(chunks)} chunks for {start_date} to {end_date}")
# Write to GitHub Actions output
github_output = os.environ.get("GITHUB_OUTPUT")
if github_output:
with open(github_output, "a") as f:
f.write(f"chunks={json.dumps(chunks)}\n")
else:
# For local testing, just print
print(json.dumps(chunks, indent=2))
if __name__ == "__main__":
main()
+78
View File
@@ -0,0 +1,78 @@
"""
Main pipeline for processing ADS-B data from adsb.lol.
Usage:
python -m src.adsb.main --date 2026-01-01
python -m src.adsb.main --start_date 2026-01-01 --end_date 2026-01-03
"""
import argparse
import subprocess
import sys
from datetime import datetime, timedelta
import polars as pl
from src.adsb.download_and_list_icaos import NUMBER_PARTS
def main():
parser = argparse.ArgumentParser(description="Process ADS-B data for a single day or date range")
parser.add_argument("--date", type=str, help="Single date in YYYY-MM-DD format")
parser.add_argument("--start_date", type=str, help="Start date (inclusive, YYYY-MM-DD)")
parser.add_argument("--end_date", type=str, help="End date (exclusive, YYYY-MM-DD)")
parser.add_argument("--concat_with_latest_csv", action="store_true", help="Also concatenate with latest CSV from GitHub releases")
args = parser.parse_args()
if args.date and (args.start_date or args.end_date):
raise SystemExit("Use --date or --start_date/--end_date, not both.")
if args.date:
start_date = datetime.strptime(args.date, "%Y-%m-%d")
end_date = start_date + timedelta(days=1)
else:
if not args.start_date or not args.end_date:
raise SystemExit("Provide --start_date and --end_date, or use --date.")
start_date = datetime.strptime(args.start_date, "%Y-%m-%d")
end_date = datetime.strptime(args.end_date, "%Y-%m-%d")
current = start_date
while current < end_date:
date_str = current.strftime("%Y-%m-%d")
print(f"Processing day: {date_str}")
# Download and split
subprocess.run([sys.executable, "-m", "src.adsb.download_and_list_icaos", "--date", date_str], check=True)
# Process parts
for part_id in range(NUMBER_PARTS):
subprocess.run([sys.executable, "-m", "src.adsb.process_icao_chunk", "--part-id", str(part_id), "--date", date_str], check=True)
# Concatenate
concat_cmd = [sys.executable, "-m", "src.adsb.concat_parquet_to_final", "--date", date_str]
if args.concat_with_latest_csv:
concat_cmd.append("--concat_with_latest_csv")
subprocess.run(concat_cmd, check=True)
current += timedelta(days=1)
if end_date - start_date > timedelta(days=1):
dates = []
cur = start_date
while cur < end_date:
dates.append(cur.strftime("%Y-%m-%d"))
cur += timedelta(days=1)
csv_files = [
f"data/outputs/openairframes_adsb_{d}_{d}.csv"
for d in dates
]
frames = [pl.read_csv(p) for p in csv_files]
df = pl.concat(frames, how="vertical", rechunk=True)
output_path = f"data/outputs/openairframes_adsb_{start_date.strftime('%Y-%m-%d')}_{end_date.strftime('%Y-%m-%d')}.csv"
df.write_csv(output_path)
print(f"Wrote combined CSV: {output_path}")
print("Done")
if __name__ == "__main__":
main()
+164
View File
@@ -0,0 +1,164 @@
"""
Processes trace files from a single archive part for a single day.
This is the map phase of the map-reduce pipeline.
Usage:
python -m src.adsb.process_icao_chunk --part-id 1 --date 2026-01-01
"""
import gc
import os
import sys
import argparse
import time
import concurrent.futures
from datetime import datetime, timedelta
import tarfile
import tempfile
import shutil
import pyarrow as pa
import pyarrow.parquet as pq
from src.adsb.download_adsb_data_to_parquet import (
OUTPUT_DIR,
PARQUET_DIR,
PARQUET_SCHEMA,
COLUMNS,
MAX_WORKERS,
process_file,
get_resource_usage,
collect_trace_files_with_find,
)
# Smaller batch size for memory efficiency
BATCH_SIZE = 100_000
def build_trace_file_map(archive_path: str) -> dict[str, str]:
"""Build a map of ICAO -> trace file path by extracting tar.gz archive."""
print(f"Extracting {archive_path}...")
temp_dir = tempfile.mkdtemp(prefix="adsb_extract_")
with tarfile.open(archive_path, 'r:gz') as tar:
tar.extractall(path=temp_dir, filter='data')
trace_map = collect_trace_files_with_find(temp_dir)
print(f"Found {len(trace_map)} trace files")
return trace_map
def safe_process(filepath: str) -> list:
"""Safely process a file, returning empty list on error."""
try:
return process_file(filepath)
except Exception as e:
print(f"Error processing {filepath}: {e}")
return []
def rows_to_table(rows: list) -> pa.Table:
"""Convert list of rows to PyArrow table."""
import pandas as pd
df = pd.DataFrame(rows, columns=COLUMNS)
if not df['time'].dt.tz:
df['time'] = df['time'].dt.tz_localize('UTC')
return pa.Table.from_pandas(df, schema=PARQUET_SCHEMA, preserve_index=False)
def process_chunk(
trace_files: list[str],
part_id: int,
date_str: str,
) -> str | None:
"""Process trace files and write to a single parquet file."""
output_path = os.path.join(PARQUET_DIR, f"part_{part_id}_{date_str}.parquet")
start_time = time.perf_counter()
total_rows = 0
batch_rows = []
writer = None
try:
writer = pq.ParquetWriter(output_path, PARQUET_SCHEMA, compression='snappy')
files_per_batch = MAX_WORKERS * 100
for offset in range(0, len(trace_files), files_per_batch):
batch_files = trace_files[offset:offset + files_per_batch]
with concurrent.futures.ProcessPoolExecutor(max_workers=MAX_WORKERS) as executor:
for rows in executor.map(safe_process, batch_files):
if rows:
batch_rows.extend(rows)
if len(batch_rows) >= BATCH_SIZE:
writer.write_table(rows_to_table(batch_rows))
total_rows += len(batch_rows)
batch_rows = []
gc.collect()
gc.collect()
if batch_rows:
writer.write_table(rows_to_table(batch_rows))
total_rows += len(batch_rows)
finally:
if writer:
writer.close()
print(f"Part {part_id}: Done! {total_rows} rows in {time.perf_counter() - start_time:.1f}s | {get_resource_usage()}")
return output_path if total_rows > 0 else None
from pathlib import Path
def main():
parser = argparse.ArgumentParser(description="Process a single archive part for a day")
parser.add_argument("--part-id", type=int, required=True, help="Part ID (1-indexed)")
parser.add_argument("--date", type=str, required=True, help="Date in YYYY-MM-DD format")
args = parser.parse_args()
print(f"Processing part {args.part_id} for {args.date}")
# Get specific archive file for this part
archive_dir = os.path.join(OUTPUT_DIR, "adsb_archives", args.date)
archive_path = os.path.join(archive_dir, f"{args.date}_part_{args.part_id}.tar.gz")
if not os.path.isfile(archive_path):
print(f"ERROR: Archive not found: {archive_path}")
if os.path.isdir(archive_dir):
print(f"Files in {archive_dir}: {os.listdir(archive_dir)}")
else:
print(f"Directory does not exist: {archive_dir}")
sys.exit(1)
# Extract and collect trace files
trace_map = build_trace_file_map(archive_path)
all_trace_files = list(trace_map.values())
print(f"Total trace files: {len(all_trace_files)}")
# Process and write output
output_path = process_chunk(all_trace_files, args.part_id, args.date)
from src.adsb.compress_adsb_to_aircraft_data import compress_parquet_part
df_compressed = compress_parquet_part(args.part_id, args.date)
# Write parquet
df_compressed_output = OUTPUT_DIR / "compressed" / args.date/ f"part_{args.part_id}_{args.date}.parquet"
os.makedirs(df_compressed_output.parent, exist_ok=True)
df_compressed.write_parquet(df_compressed_output, compression='snappy')
# Write CSV
csv_output = OUTPUT_DIR / "compressed" / args.date / f"part_{args.part_id}_{args.date}.csv"
df_compressed.write_csv(csv_output)
print(f"Raw output: {output_path}" if output_path else "No raw output generated")
print(f"Compressed parquet: {df_compressed_output}")
print(f"Compressed CSV: {csv_output}")
if __name__ == "__main__":
main()
-89
View File
@@ -1,89 +0,0 @@
from pathlib import Path
import pandas as pd
import re
from derive_from_faa_master_txt import concat_faa_historical_df
def concatenate_aircraft_csvs(
input_dir: Path = Path("data/concat"),
output_dir: Path = Path("data/planequery_aircraft"),
filename_pattern: str = r"planequery_aircraft_(\d{4}-\d{2}-\d{2})_(\d{4}-\d{2}-\d{2})\.csv"
):
"""
Read all CSVs matching the pattern from input_dir in order,
concatenate them using concat_faa_historical_df, and output a single CSV.
Args:
input_dir: Directory containing the CSV files to concatenate
output_dir: Directory where the output CSV will be saved
filename_pattern: Regex pattern to match CSV filenames
"""
input_dir = Path(input_dir)
output_dir = Path(output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
# Find all matching CSV files
pattern = re.compile(filename_pattern)
csv_files = []
for csv_path in sorted(input_dir.glob("*.csv")):
match = pattern.search(csv_path.name)
if match:
start_date = match.group(1)
end_date = match.group(2)
csv_files.append((start_date, end_date, csv_path))
# Sort by start date, then end date
csv_files.sort(key=lambda x: (x[0], x[1]))
if not csv_files:
raise FileNotFoundError(f"No CSV files matching pattern found in {input_dir}")
print(f"Found {len(csv_files)} CSV files to concatenate")
# Read first CSV as base
first_start_date, first_end_date, first_path = csv_files[0]
print(f"Reading base file: {first_path.name}")
df_base = pd.read_csv(
first_path,
dtype={
'transponder_code': str,
'unique_regulatory_id': str,
'registrant_county': str
}
)
# Concatenate remaining CSVs
for start_date, end_date, csv_path in csv_files[1:]:
print(f"Concatenating: {csv_path.name}")
df_new = pd.read_csv(
csv_path,
dtype={
'transponder_code': str,
'unique_regulatory_id': str,
'registrant_county': str
}
)
df_base = concat_faa_historical_df(df_base, df_new)
# Verify monotonic increasing download_date
assert df_base['download_date'].is_monotonic_increasing, "download_date is not monotonic increasing"
# Output filename uses first start date and last end date
last_start_date, last_end_date, _ = csv_files[-1]
output_filename = f"planequery_aircraft_{first_start_date}_{last_end_date}.csv"
output_path = output_dir / output_filename
print(f"Writing output to: {output_path}")
df_base.to_csv(output_path, index=False)
print(f"Successfully concatenated {len(csv_files)} files into {output_filename}")
print(f"Total rows: {len(df_base)}")
return output_path
if __name__ == "__main__":
# Example usage - modify these paths as needed
concatenate_aircraft_csvs(
input_dir=Path("data/concat"),
output_dir=Path("data/planequery_aircraft")
)
+1
View File
@@ -0,0 +1 @@
"""Community contributions processing module."""
+320
View File
@@ -0,0 +1,320 @@
#!/usr/bin/env python3
"""
Approve a community submission and create a PR.
This script is called by the GitHub Actions workflow when the 'approved'
label is added to a validated submission issue.
Usage:
python -m src.contributions.approve_submission --issue-number 123 --issue-body "..." --author "username" --author-id 12345
Environment variables:
GITHUB_TOKEN: GitHub API token with repo write permissions
GITHUB_REPOSITORY: owner/repo
"""
import argparse
import base64
import json
import os
import sys
import urllib.request
import urllib.error
from datetime import datetime, timezone
from .schema import extract_json_from_issue_body, extract_contributor_name_from_issue_body, parse_and_validate, load_schema, SCHEMAS_DIR
from .contributor import (
generate_contributor_uuid,
generate_submission_filename,
compute_content_hash,
)
from .update_schema import generate_updated_schema, check_for_new_tags, get_existing_tag_definitions
from .read_community_data import build_tag_type_registry
def github_api_request(
method: str,
endpoint: str,
data: dict | None = None,
accept: str = "application/vnd.github.v3+json"
) -> dict:
"""Make a GitHub API request."""
token = os.environ.get("GITHUB_TOKEN")
repo = os.environ.get("GITHUB_REPOSITORY")
if not token or not repo:
raise EnvironmentError("GITHUB_TOKEN and GITHUB_REPOSITORY must be set")
url = f"https://api.github.com/repos/{repo}{endpoint}"
headers = {
"Authorization": f"token {token}",
"Accept": accept,
"Content-Type": "application/json",
}
body = json.dumps(data).encode() if data else None
req = urllib.request.Request(url, data=body, headers=headers, method=method)
try:
with urllib.request.urlopen(req) as response:
response_body = response.read()
# DELETE requests return empty body (204 No Content)
if not response_body:
return {}
return json.loads(response_body)
except urllib.error.HTTPError as e:
error_body = e.read().decode() if e.fp else ""
print(f"GitHub API error: {e.code} {e.reason}: {error_body}", file=sys.stderr)
raise
def add_issue_comment(issue_number: int, body: str) -> None:
"""Add a comment to a GitHub issue."""
github_api_request("POST", f"/issues/{issue_number}/comments", {"body": body})
def get_default_branch_sha() -> str:
"""Get the SHA of the default branch (main)."""
ref = github_api_request("GET", "/git/ref/heads/main")
return ref["object"]["sha"]
def create_branch(branch_name: str, sha: str) -> None:
"""Create a new branch from a SHA."""
try:
github_api_request("POST", "/git/refs", {
"ref": f"refs/heads/{branch_name}",
"sha": sha,
})
except urllib.error.HTTPError as e:
if e.code == 422: # Branch exists
# Delete and recreate
try:
github_api_request("DELETE", f"/git/refs/heads/{branch_name}")
except urllib.error.HTTPError:
pass
github_api_request("POST", "/git/refs", {
"ref": f"refs/heads/{branch_name}",
"sha": sha,
})
else:
raise
def get_file_sha(path: str, branch: str) -> str | None:
"""Get the SHA of an existing file, or None if it doesn't exist."""
try:
response = github_api_request("GET", f"/contents/{path}?ref={branch}")
return response.get("sha")
except Exception:
return None
def create_or_update_file(path: str, content: str, message: str, branch: str) -> None:
"""Create or update a file in the repository."""
content_b64 = base64.b64encode(content.encode()).decode()
payload = {
"message": message,
"content": content_b64,
"branch": branch,
}
# If file exists, we need to include its SHA to update it
sha = get_file_sha(path, branch)
if sha:
payload["sha"] = sha
github_api_request("PUT", f"/contents/{path}", payload)
def create_pull_request(title: str, head: str, base: str, body: str) -> dict:
"""Create a pull request."""
return github_api_request("POST", "/pulls", {
"title": title,
"head": head,
"base": base,
"body": body,
})
def add_labels_to_issue(issue_number: int, labels: list[str]) -> None:
"""Add labels to an issue or PR."""
github_api_request("POST", f"/issues/{issue_number}/labels", {"labels": labels})
def process_submission(
issue_number: int,
issue_body: str,
author_username: str,
author_id: int,
) -> bool:
"""
Process an approved submission and create a PR.
Args:
issue_number: The GitHub issue number
issue_body: The issue body text
author_username: The GitHub username of the issue author
author_id: The numeric GitHub user ID
Returns:
True if successful, False otherwise
"""
# Extract and validate JSON
json_str = extract_json_from_issue_body(issue_body)
if not json_str:
add_issue_comment(issue_number, "❌ Could not extract JSON from submission.")
return False
data, errors = parse_and_validate(json_str)
if errors or data is None:
error_list = "\n".join(f"- {e}" for e in errors) if errors else "Unknown error"
add_issue_comment(issue_number, f"❌ **Validation Failed**\n\n{error_list}")
return False
# Normalize to list
submissions: list[dict] = data if isinstance(data, list) else [data]
# Generate contributor UUID from GitHub ID
contributor_uuid = generate_contributor_uuid(author_id)
# Extract contributor name from issue form (None means user opted out of attribution)
contributor_name = extract_contributor_name_from_issue_body(issue_body)
# Add metadata to each submission
now = datetime.now(timezone.utc)
date_str = now.strftime("%Y-%m-%d")
timestamp_str = now.isoformat()
for submission in submissions:
submission["contributor_uuid"] = contributor_uuid
if contributor_name:
submission["contributor_name"] = contributor_name
submission["creation_timestamp"] = timestamp_str
# Generate unique filename
content_json = json.dumps(submissions, indent=2, sort_keys=True)
content_hash = compute_content_hash(content_json)
filename = generate_submission_filename(author_username, date_str, content_hash)
file_path = f"community/{date_str}/{filename}"
# Create branch
branch_name = f"community-submission-{issue_number}"
default_sha = get_default_branch_sha()
create_branch(branch_name, default_sha)
# Create file
commit_message = f"Add community submission from @{author_username} (closes #{issue_number})"
create_or_update_file(file_path, content_json, commit_message, branch_name)
# Update schema with any new tags (modifies v1 in place)
schema_updated = False
new_tags = []
try:
# Build tag registry from new submissions
tag_registry = build_tag_type_registry(submissions)
# Get current schema and merge existing tags
current_schema = load_schema()
existing_tags = get_existing_tag_definitions(current_schema)
# Merge existing tags into registry
for tag_name, tag_def in existing_tags.items():
if tag_name not in tag_registry:
tag_type = tag_def.get("type", "string")
tag_registry[tag_name] = tag_type
# Check for new tags
new_tags = check_for_new_tags(tag_registry, current_schema)
if new_tags:
# Generate updated schema
updated_schema = generate_updated_schema(current_schema, tag_registry)
schema_json = json.dumps(updated_schema, indent=2) + "\n"
create_or_update_file(
"schemas/community_submission.v1.schema.json",
schema_json,
f"Update schema with new tags: {', '.join(new_tags)}",
branch_name
)
schema_updated = True
except Exception as e:
print(f"Warning: Could not update schema: {e}", file=sys.stderr)
# Create PR
schema_note = ""
if schema_updated:
schema_note = f"\n**Schema Updated:** Added new tags: `{', '.join(new_tags)}`\n"
# Truncate JSON preview to stay under GitHub's 65536 char body limit
max_json_preview = 50000
if len(content_json) > max_json_preview:
# Show first few entries as a preview
preview_entries = submissions[:10]
preview_json = json.dumps(preview_entries, indent=2, sort_keys=True)
json_section = (
f"### Submissions (showing 10 of {len(submissions)})\n"
f"```json\n{preview_json}\n```\n\n"
f"*Full submission ({len(submissions)} entries, {len(content_json):,} chars) is in the committed file.*"
)
else:
json_section = f"### Submissions\n```json\n{content_json}\n```"
pr_body = f"""## Community Submission
Adds {len(submissions)} submission(s) from @{author_username}.
**File:** `{file_path}`
**Contributor UUID:** `{contributor_uuid}`
{schema_note}
Closes #{issue_number}
---
{json_section}"""
pr = create_pull_request(
title=f"Community submission: {filename}",
head=branch_name,
base="main",
body=pr_body,
)
# Add labels to PR
add_labels_to_issue(pr["number"], ["community", "auto-generated"])
# Comment on original issue
add_issue_comment(
issue_number,
f"✅ **Submission Approved**\n\n"
f"PR #{pr['number']} has been created to add your submission.\n\n"
f"**File:** `{file_path}`\n"
f"**Your Contributor UUID:** `{contributor_uuid}`\n\n"
f"The PR will be merged by a maintainer."
)
print(f"Created PR #{pr['number']} for submission")
return True
def main():
parser = argparse.ArgumentParser(description="Approve community submission and create PR")
parser.add_argument("--issue-number", type=int, required=True, help="GitHub issue number")
parser.add_argument("--issue-body", required=True, help="Issue body text")
parser.add_argument("--author", required=True, help="Issue author username")
parser.add_argument("--author-id", type=int, required=True, help="Issue author numeric ID")
args = parser.parse_args()
success = process_submission(
issue_number=args.issue_number,
issue_body=args.issue_body,
author_username=args.author,
author_id=args.author_id,
)
sys.exit(0 if success else 1)
if __name__ == "__main__":
main()
+86
View File
@@ -0,0 +1,86 @@
"""Contributor identification utilities."""
import hashlib
import uuid
# DNS namespace UUID for generating UUIDv5
DNS_NAMESPACE = uuid.UUID('6ba7b810-9dad-11d1-80b4-00c04fd430c8')
def generate_contributor_uuid(github_user_id: int) -> str:
"""
Generate a deterministic UUID v5 from a GitHub user ID.
This ensures the same GitHub account always gets the same contributor UUID.
Args:
github_user_id: The numeric GitHub user ID
Returns:
UUID string in standard format
"""
name = f"github:{github_user_id}"
return str(uuid.uuid5(DNS_NAMESPACE, name))
def sanitize_username(username: str, max_length: int = 20) -> str:
"""
Sanitize a GitHub username for use in filenames.
Args:
username: GitHub username
max_length: Maximum length of sanitized name
Returns:
Lowercase alphanumeric string with underscores
"""
sanitized = ""
for char in username.lower():
if char.isalnum():
sanitized += char
else:
sanitized += "_"
# Collapse multiple underscores
while "__" in sanitized:
sanitized = sanitized.replace("__", "_")
return sanitized.strip("_")[:max_length]
def generate_submission_filename(
username: str,
date_str: str,
content_hash: str,
extension: str = ".json"
) -> str:
"""
Generate a unique filename for a community submission.
Format: {sanitized_username}_{date}_{short_hash}.json
Args:
username: GitHub username
date_str: Date in YYYY-MM-DD format
content_hash: Hash of the submission content (will be truncated to 8 chars)
extension: File extension (default: .json)
Returns:
Unique filename string
"""
sanitized_name = sanitize_username(username)
short_hash = content_hash[:8]
return f"{sanitized_name}_{date_str}_{short_hash}{extension}"
def compute_content_hash(content: str) -> str:
"""
Compute SHA256 hash of content.
Args:
content: String content to hash
Returns:
Hex digest of SHA256 hash
"""
return hashlib.sha256(content.encode()).hexdigest()
@@ -0,0 +1,40 @@
#!/usr/bin/env python3
"""
Download ADS-B Exchange basic-ac-db.json.gz.
Usage:
python -m src.contributions.create_daily_adsbexchange_release [--date YYYY-MM-DD]
"""
from __future__ import annotations
import argparse
import shutil
from datetime import datetime, timezone
from pathlib import Path
from urllib.request import Request, urlopen
URL = "https://downloads.adsbexchange.com/downloads/basic-ac-db.json.gz"
OUT_ROOT = Path("data/openairframes")
def main() -> None:
parser = argparse.ArgumentParser(description="Create daily ADS-B Exchange JSON release")
parser.add_argument("--date", type=str, help="Date to process (YYYY-MM-DD format, default: today UTC)")
args = parser.parse_args()
date_str = args.date or datetime.now(timezone.utc).strftime("%Y-%m-%d")
OUT_ROOT.mkdir(parents=True, exist_ok=True)
gz_path = OUT_ROOT / f"basic-ac-db_{date_str}.json.gz"
print(f"Downloading {URL}...")
req = Request(URL, headers={"User-Agent": "openairframes-downloader/1.0"}, method="GET")
with urlopen(req, timeout=300) as r, gz_path.open("wb") as f:
shutil.copyfileobj(r, f)
print(f"Wrote: {gz_path}")
if __name__ == "__main__":
main()
@@ -0,0 +1,141 @@
#!/usr/bin/env python3
"""
Generate a daily CSV of all community contributions.
Reads all JSON files from the community/ directory and outputs a sorted CSV
with creation_timestamp as the first column and contributor_name/contributor_uuid as the last columns.
Usage:
python -m src.contributions.create_daily_community_release
"""
from datetime import datetime, timezone
from pathlib import Path
import json
import sys
import pandas as pd
COMMUNITY_DIR = Path(__file__).parent.parent.parent / "community"
OUT_ROOT = Path("data/openairframes")
def read_all_submissions(community_dir: Path) -> list[dict]:
"""Read all JSON submissions from the community directory."""
all_submissions = []
for json_file in sorted(community_dir.glob("**/*.json")):
try:
with open(json_file) as f:
data = json.load(f)
# Normalize to list
submissions = data if isinstance(data, list) else [data]
all_submissions.extend(submissions)
except (json.JSONDecodeError, OSError) as e:
print(f"Warning: Failed to read {json_file}: {e}", file=sys.stderr)
return all_submissions
def submissions_to_dataframe(submissions: list[dict]) -> pd.DataFrame:
"""
Convert submissions to a DataFrame with proper column ordering.
Column order:
- creation_timestamp (first)
- transponder_code_hex
- registration_number
- openairframes_id
- contributor_name
- [other columns alphabetically]
- contributor_uuid (last)
"""
if not submissions:
return pd.DataFrame()
df = pd.DataFrame(submissions)
# Ensure required columns exist
required_cols = [
"creation_timestamp",
"transponder_code_hex",
"registration_number",
"openairframes_id",
"contributor_name",
"contributor_uuid",
]
for col in required_cols:
if col not in df.columns:
df[col] = None
# Sort by creation_timestamp ascending
df = df.sort_values("creation_timestamp", ascending=True, na_position="last")
# Reorder columns: specific order first, contributor_uuid last
first_cols = [
"creation_timestamp",
"transponder_code_hex",
"registration_number",
"openairframes_id",
"contributor_name",
]
last_cols = ["contributor_uuid"]
middle_cols = sorted([
col for col in df.columns
if col not in first_cols and col not in last_cols
])
ordered_cols = first_cols + middle_cols + last_cols
df = df[ordered_cols]
return df.reset_index(drop=True)
def main():
"""Generate the daily community contributions CSV."""
date_str = datetime.now(timezone.utc).strftime("%Y-%m-%d")
print(f"Reading community submissions from {COMMUNITY_DIR}")
submissions = read_all_submissions(COMMUNITY_DIR)
if not submissions:
print("No community submissions found.")
# Still create an empty CSV with headers
df = pd.DataFrame(columns=[
"creation_timestamp",
"transponder_code_hex",
"registration_number",
"openairframes_id",
"contributor_name",
"tags",
"contributor_uuid",
])
else:
print(f"Found {len(submissions)} total submissions")
df = submissions_to_dataframe(submissions)
# Determine date range for filename
if not df.empty and df["creation_timestamp"].notna().any():
# Get earliest timestamp for start date
earliest = pd.to_datetime(df["creation_timestamp"]).min()
start_date_str = earliest.strftime("%Y-%m-%d")
else:
start_date_str = date_str
# Output
OUT_ROOT.mkdir(parents=True, exist_ok=True)
output_file = OUT_ROOT / f"openairframes_community_{start_date_str}_{date_str}.csv"
df.to_csv(output_file, index=False)
print(f"Saved: {output_file}")
print(f"Total contributions: {len(df)}")
return output_file
if __name__ == "__main__":
main()
@@ -0,0 +1,55 @@
#!/usr/bin/env python3
"""
Download Mictronics aircraft database zip.
Usage:
python -m src.contributions.create_daily_microtonics_release [--date YYYY-MM-DD]
"""
from __future__ import annotations
import argparse
import shutil
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from urllib.error import URLError
from urllib.request import Request, urlopen
URL = "https://www.mictronics.de/aircraft-database/indexedDB_old.php"
OUT_ROOT = Path("data/openairframes")
MAX_RETRIES = 3
RETRY_DELAY = 30 # seconds
def main() -> None:
parser = argparse.ArgumentParser(description="Create daily Mictronics database release")
parser.add_argument("--date", type=str, help="Date to process (YYYY-MM-DD format, default: today UTC)")
args = parser.parse_args()
date_str = args.date or datetime.now(timezone.utc).strftime("%Y-%m-%d")
OUT_ROOT.mkdir(parents=True, exist_ok=True)
zip_path = OUT_ROOT / f"mictronics-db_{date_str}.zip"
for attempt in range(1, MAX_RETRIES + 1):
try:
print(f"Downloading {URL} (attempt {attempt}/{MAX_RETRIES})...")
req = Request(URL, headers={"User-Agent": "Mozilla/5.0 (compatible; openairframes-downloader/1.0)"}, method="GET")
with urlopen(req, timeout=120) as r, zip_path.open("wb") as f:
shutil.copyfileobj(r, f)
print(f"Wrote: {zip_path}")
return
except (URLError, TimeoutError) as e:
print(f"Attempt {attempt} failed: {e}")
if attempt < MAX_RETRIES:
print(f"Retrying in {RETRY_DELAY} seconds...")
time.sleep(RETRY_DELAY)
else:
print("All retries exhausted. Mictronics download failed.")
sys.exit(1)
if __name__ == "__main__":
main()
+162
View File
@@ -0,0 +1,162 @@
#!/usr/bin/env python3
"""
Read and aggregate all community submission data.
Usage:
python -m src.contributions.read_community_data
python -m src.contributions.read_community_data --output merged.json
"""
import argparse
import json
import sys
from pathlib import Path
COMMUNITY_DIR = Path(__file__).parent.parent.parent / "community"
def read_all_submissions(community_dir: Path | None = None) -> list[dict]:
"""
Read all JSON submissions from the community directory.
Args:
community_dir: Path to community directory. Uses default if None.
Returns:
List of all submission dictionaries
"""
if community_dir is None:
community_dir = COMMUNITY_DIR
all_submissions = []
# Search both root directory and date subdirectories (e.g., 2026-02-12/)
for json_file in sorted(community_dir.glob("**/*.json")):
try:
with open(json_file) as f:
data = json.load(f)
# Normalize to list
submissions = data if isinstance(data, list) else [data]
# Add source file metadata
for submission in submissions:
submission["_source_file"] = json_file.name
all_submissions.extend(submissions)
except (json.JSONDecodeError, OSError) as e:
print(f"Warning: Failed to read {json_file}: {e}", file=sys.stderr)
return all_submissions
def get_python_type_name(value) -> str:
"""Get a normalized type name for a value."""
if value is None:
return "null"
if isinstance(value, bool):
return "boolean"
if isinstance(value, int):
return "integer"
if isinstance(value, float):
return "number"
if isinstance(value, str):
return "string"
if isinstance(value, list):
return "array"
if isinstance(value, dict):
return "object"
return type(value).__name__
def build_tag_type_registry(submissions: list[dict]) -> dict[str, str]:
"""
Build a registry of tag names to their expected types from existing submissions.
Args:
submissions: List of existing submission dictionaries
Returns:
Dict mapping tag name to expected type (e.g., {"internet": "string", "year_built": "integer"})
"""
tag_types = {}
for submission in submissions:
tags = submission.get("tags", {})
if not isinstance(tags, dict):
continue
for key, value in tags.items():
inferred_type = get_python_type_name(value)
if key not in tag_types:
tag_types[key] = inferred_type
# If there's a conflict, keep the first type (it's already in use)
return tag_types
def group_by_identifier(submissions: list[dict]) -> dict[str, list[dict]]:
"""
Group submissions by their identifier (registration, transponder, or airframe ID).
Returns:
Dict mapping identifier to list of submissions for that identifier
"""
grouped = {}
for submission in submissions:
# Determine identifier
if "registration_number" in submission:
key = f"reg:{submission['registration_number']}"
elif "transponder_code_hex" in submission:
key = f"icao:{submission['transponder_code_hex']}"
elif "openairframes_id" in submission:
key = f"id:{submission['openairframes_id']}"
else:
key = "_unknown"
if key not in grouped:
grouped[key] = []
grouped[key].append(submission)
return grouped
def main():
parser = argparse.ArgumentParser(description="Read community submission data")
parser.add_argument("--output", "-o", help="Output file (default: stdout)")
parser.add_argument("--group", action="store_true", help="Group by identifier")
parser.add_argument("--stats", action="store_true", help="Print statistics only")
args = parser.parse_args()
submissions = read_all_submissions()
if args.stats:
grouped = group_by_identifier(submissions)
contributors = set(s.get("contributor_uuid", "unknown") for s in submissions)
print(f"Total submissions: {len(submissions)}")
print(f"Unique identifiers: {len(grouped)}")
print(f"Unique contributors: {len(contributors)}")
return
if args.group:
result = group_by_identifier(submissions)
else:
result = submissions
output = json.dumps(result, indent=2)
if args.output:
with open(args.output, "w") as f:
f.write(output)
print(f"Wrote {len(submissions)} submissions to {args.output}")
else:
print(output)
if __name__ == "__main__":
main()
+66
View File
@@ -0,0 +1,66 @@
#!/usr/bin/env python3
"""
Regenerate schema for a PR branch after main has been merged in.
This script looks at the submission files in this branch and updates
the schema if new tags were introduced.
Usage: python -m src.contributions.regenerate_pr_schema
"""
import json
import sys
from pathlib import Path
# Add parent to path for imports when running as script
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
from src.contributions.read_community_data import read_all_submissions, build_tag_type_registry
from src.contributions.update_schema import (
get_existing_tag_definitions,
check_for_new_tags,
generate_updated_schema,
)
from src.contributions.schema import load_schema, SCHEMAS_DIR
def main():
"""Main entry point."""
# Load current schema
current_schema = load_schema()
# Get existing tag definitions from schema
existing_tags = get_existing_tag_definitions(current_schema)
# Read all submissions (including ones from this PR branch)
submissions = read_all_submissions()
if not submissions:
print("No submissions found")
return
# Build tag registry from all submissions
tag_registry = build_tag_type_registry(submissions)
# Check for new tags not in the current schema
new_tags = check_for_new_tags(tag_registry, current_schema)
if new_tags:
print(f"Found new tags: {new_tags}")
print("Updating schema...")
# Generate updated schema
updated_schema = generate_updated_schema(current_schema, tag_registry)
# Write updated schema (in place)
schema_path = SCHEMAS_DIR / "community_submission.v1.schema.json"
with open(schema_path, 'w') as f:
json.dump(updated_schema, f, indent=2)
f.write("\n")
print(f"Updated {schema_path}")
else:
print("No new tags found, schema is up to date")
if __name__ == "__main__":
main()
+287
View File
@@ -0,0 +1,287 @@
"""Schema validation for community submissions."""
import json
import re
from pathlib import Path
from typing import Any
try:
from jsonschema import Draft202012Validator
except ImportError:
Draft202012Validator = None
SCHEMAS_DIR = Path(__file__).parent.parent.parent / "schemas"
# For backwards compatibility
SCHEMA_PATH = SCHEMAS_DIR / "community_submission.v1.schema.json"
def get_latest_schema_version() -> int:
"""
Find the latest schema version number.
Returns:
Latest version number (e.g., 1, 2, 3)
"""
import re
pattern = re.compile(r"community_submission\.v(\d+)\.schema\.json$")
max_version = 0
for path in SCHEMAS_DIR.glob("community_submission.v*.schema.json"):
match = pattern.search(path.name)
if match:
version = int(match.group(1))
max_version = max(max_version, version)
return max_version
def _is_balanced_json(text: str) -> bool:
"""
Check if JSON has balanced brackets/braces.
This is a simple check to ensure we captured complete JSON.
Ignores brackets/braces inside strings.
Args:
text: JSON text to check
Returns:
True if balanced, False otherwise
"""
in_string = False
escape = False
stack = []
pairs = {'[': ']', '{': '}'}
for char in text:
if escape:
escape = False
continue
if char == '\\':
escape = True
continue
if char == '"' and not escape:
in_string = not in_string
continue
if in_string:
continue
if char in pairs:
stack.append(char)
elif char in pairs.values():
if not stack:
return False
if pairs[stack[-1]] != char:
return False
stack.pop()
return len(stack) == 0 and not in_string
def get_schema_path(version: int | None = None) -> Path:
"""
Get path to a specific schema version, or latest if version is None.
Args:
version: Schema version number, or None for latest
Returns:
Path to schema file
"""
if version is None:
version = get_latest_schema_version()
return SCHEMAS_DIR / f"community_submission.v{version}.schema.json"
def load_schema(version: int | None = None) -> dict:
"""
Load the community submission schema.
Args:
version: Schema version to load. If None, loads the latest version.
Returns:
Schema dict
"""
schema_path = get_schema_path(version)
with open(schema_path) as f:
return json.load(f)
def validate_submission(data: dict | list, schema: dict | None = None) -> list[str]:
"""
Validate submission(s) against schema.
Args:
data: Single submission dict or list of submissions
schema: Optional schema dict. If None, loads from default path.
Returns:
List of error messages. Empty list means validation passed.
"""
if Draft202012Validator is None:
raise ImportError("jsonschema is required: pip install jsonschema")
if schema is None:
schema = load_schema()
submissions = data if isinstance(data, list) else [data]
errors = []
validator = Draft202012Validator(schema)
for i, submission in enumerate(submissions):
prefix = f"[{i}] " if len(submissions) > 1 else ""
for error in validator.iter_errors(submission):
path = ".".join(str(p) for p in error.path) if error.path else "(root)"
errors.append(f"{prefix}{path}: {error.message}")
return errors
def download_github_attachment(url: str) -> str | None:
"""
Download content from a GitHub attachment URL.
Args:
url: GitHub attachment URL (e.g., https://github.com/user-attachments/files/...)
Returns:
File content as string, or None if download failed
"""
import urllib.request
import urllib.error
try:
req = urllib.request.Request(url, headers={"User-Agent": "OpenAirframes-Bot"})
with urllib.request.urlopen(req, timeout=30) as response:
return response.read().decode("utf-8")
except (urllib.error.URLError, urllib.error.HTTPError, UnicodeDecodeError) as e:
print(f"Failed to download attachment from {url}: {e}")
return None
def extract_json_from_issue_body(body: str) -> str | None:
"""
Extract JSON from GitHub issue body.
Looks for JSON in the 'Submission JSON' section, either:
- A GitHub file attachment URL (drag-and-drop .json file)
- Wrapped in code blocks (```json ... ``` or ``` ... ```)
- Or raw JSON after the header
Args:
body: The issue body text
Returns:
Extracted JSON string or None if not found
"""
# Try: GitHub attachment URL in the Submission JSON section
# Format: [filename.json](https://github.com/user-attachments/files/...)
# Or just the raw URL
pattern_attachment = r"### Submission JSON\s*\n[\s\S]*?(https://github\.com/(?:user-attachments/files|.*?/files)/[^\s\)\]]+\.json)"
match = re.search(pattern_attachment, body)
if match:
url = match.group(1)
content = download_github_attachment(url)
if content:
return content.strip()
# Also check for GitHub user-attachments URL anywhere in submission section
pattern_attachment_alt = r"\[.*?\.json\]\((https://github\.com/[^\)]+)\)"
match = re.search(pattern_attachment_alt, body)
if match:
url = match.group(1)
if ".json" in url or "user-attachments" in url:
content = download_github_attachment(url)
if content:
return content.strip()
# Try: JSON in code blocks after "### Submission JSON"
pattern_codeblock = r"### Submission JSON\s*\n\s*```(?:json)?\s*\n([\s\S]*?)\n\s*```"
match = re.search(pattern_codeblock, body)
if match:
return match.group(1).strip()
# Try: Raw JSON after "### Submission JSON" until next section or end
# Use greedy matching since we have a clear boundary (next ### or end)
pattern_raw = r"### Submission JSON\s*\n\s*([\[{][\s\S]*[\]}])(?=\s*\n###|\s*$)"
match = re.search(pattern_raw, body)
if match:
candidate = match.group(1).strip()
# Validate it's complete JSON by checking balanced brackets
if _is_balanced_json(candidate):
return candidate
# Try: Any JSON object/array in the body (fallback)
pattern_any = r"([\[{][\s\S]*?[\]}])"
for match in re.finditer(pattern_any, body):
candidate = match.group(1).strip()
# Validate it looks like JSON
if candidate.startswith('{') and candidate.endswith('}'):
return candidate
if candidate.startswith('[') and candidate.endswith(']'):
return candidate
return None
def extract_contributor_name_from_issue_body(body: str) -> str | None:
"""
Extract contributor name from GitHub issue body.
Looks for the 'Contributor Name' field in the issue form.
Args:
body: The issue body text
Returns:
Contributor name string or None if not found/empty
"""
# Match "### Contributor Name" section
pattern = r"### Contributor Name\s*\n\s*(.+?)(?=\n###|\n\n|$)"
match = re.search(pattern, body)
if match:
name = match.group(1).strip()
# GitHub issue forms show "_No response_" for empty optional fields
if name and name != "_No response_":
return name
return None
def parse_and_validate(json_str: str, schema: dict | None = None) -> tuple[list | dict | None, list[str]]:
"""
Parse JSON string and validate against schema.
Args:
json_str: JSON string to parse
schema: Optional schema dict
Returns:
Tuple of (parsed data or None, list of errors)
"""
try:
data = json.loads(json_str)
except json.JSONDecodeError as e:
# Provide detailed error context
error_msg = f"Invalid JSON: {e}"
# Show context around the error position
if hasattr(e, 'pos') and e.pos is not None:
start = max(0, e.pos - 50)
end = min(len(json_str), e.pos + 50)
context = json_str[start:end]
# Escape for readability
context_escaped = repr(context)
error_msg += f"\n\nContext around position {e.pos}: {context_escaped}"
return None, [error_msg]
errors = validate_submission(data, schema)
return data, errors
+154
View File
@@ -0,0 +1,154 @@
#!/usr/bin/env python3
"""
Update the schema with tag type definitions from existing submissions.
This script reads all community submissions and generates a new schema version
that includes explicit type definitions for all known tags.
When new tags are introduced, a new schema version is created (e.g., v1 -> v2 -> v3).
Usage:
python -m src.contributions.update_schema
python -m src.contributions.update_schema --check # Check if update needed
"""
import argparse
import json
import sys
from pathlib import Path
from .read_community_data import read_all_submissions, build_tag_type_registry
from .schema import SCHEMAS_DIR, get_latest_schema_version, get_schema_path, load_schema
def get_existing_tag_definitions(schema: dict) -> dict[str, dict]:
"""Extract existing tag property definitions from schema."""
tags_props = schema.get("properties", {}).get("tags", {}).get("properties", {})
return tags_props
def type_name_to_json_schema(type_name: str) -> dict:
"""Convert a type name to a JSON Schema type definition."""
type_map = {
"string": {"type": "string"},
"integer": {"type": "integer"},
"number": {"type": "number"},
"boolean": {"type": "boolean"},
"null": {"type": "null"},
"array": {"type": "array", "items": {"$ref": "#/$defs/tagScalar"}},
"object": {"type": "object", "additionalProperties": {"$ref": "#/$defs/tagScalar"}},
}
return type_map.get(type_name, {"$ref": "#/$defs/tagValue"})
def generate_updated_schema(base_schema: dict, tag_registry: dict[str, str]) -> dict:
"""
Generate an updated schema with explicit tag definitions.
Args:
base_schema: The current schema to update
tag_registry: Dict mapping tag name to type name
Returns:
Updated schema dict
"""
schema = json.loads(json.dumps(base_schema)) # Deep copy
# Build tag properties with explicit types
tag_properties = {}
for tag_name, type_name in sorted(tag_registry.items()):
tag_properties[tag_name] = type_name_to_json_schema(type_name)
# Only add/update the properties key within tags, preserve everything else
if "properties" in schema and "tags" in schema["properties"]:
schema["properties"]["tags"]["properties"] = tag_properties
return schema
def check_for_new_tags(tag_registry: dict[str, str], current_schema: dict) -> list[str]:
"""
Check which tags in the registry are not yet defined in the schema.
Returns:
List of new tag names
"""
existing_tags = get_existing_tag_definitions(current_schema)
return [tag for tag in tag_registry if tag not in existing_tags]
def update_schema_file(
tag_registry: dict[str, str],
check_only: bool = False
) -> tuple[bool, list[str]]:
"""
Update the v1 schema file with new tag definitions.
Args:
tag_registry: Dict mapping tag name to type name
check_only: If True, only check if update is needed without writing
Returns:
Tuple of (was_updated, list_of_new_tags)
"""
current_schema = load_schema()
# Find new tags
new_tags = check_for_new_tags(tag_registry, current_schema)
if not new_tags:
return False, []
if check_only:
return True, new_tags
# Generate and write updated schema (in place)
updated_schema = generate_updated_schema(current_schema, tag_registry)
schema_path = get_schema_path()
with open(schema_path, "w") as f:
json.dump(updated_schema, f, indent=2)
f.write("\n")
return True, new_tags
def update_schema_from_submissions(check_only: bool = False) -> tuple[bool, list[str]]:
"""
Read all submissions and update the schema if needed.
Args:
check_only: If True, only check if update is needed without writing
Returns:
Tuple of (was_updated, list_of_new_tags)
"""
submissions = read_all_submissions()
tag_registry = build_tag_type_registry(submissions)
return update_schema_file(tag_registry, check_only)
def main():
parser = argparse.ArgumentParser(description="Update schema with tag definitions")
parser.add_argument("--check", action="store_true", help="Check if update needed without writing")
args = parser.parse_args()
was_updated, new_tags = update_schema_from_submissions(check_only=args.check)
if args.check:
if was_updated:
print(f"Schema update needed. New tags: {', '.join(new_tags)}")
sys.exit(1)
else:
print("Schema is up to date")
sys.exit(0)
else:
if was_updated:
print(f"Updated {get_schema_path()}")
print(f"Added tags: {', '.join(new_tags)}")
else:
print("No update needed")
if __name__ == "__main__":
main()
+218
View File
@@ -0,0 +1,218 @@
#!/usr/bin/env python3
"""
Validate a community submission from a GitHub issue.
This script is called by the GitHub Actions workflow to validate
submissions when issues are opened or edited.
Usage:
python -m src.contributions.validate_submission --issue-body "..."
python -m src.contributions.validate_submission --issue-body-file /path/to/body.txt
python -m src.contributions.validate_submission --file submission.json
echo '{"registration_number": "N12345"}' | python -m src.contributions.validate_submission --stdin
Environment variables (for GitHub Actions):
GITHUB_TOKEN: GitHub API token
GITHUB_REPOSITORY: owner/repo
ISSUE_NUMBER: Issue number to comment on
"""
import argparse
import json
import os
import sys
import urllib.request
import urllib.error
from .schema import extract_json_from_issue_body, parse_and_validate, load_schema
from .read_community_data import read_all_submissions, build_tag_type_registry, get_python_type_name
def github_api_request(method: str, endpoint: str, data: dict | None = None) -> dict:
"""Make a GitHub API request."""
token = os.environ.get("GITHUB_TOKEN")
repo = os.environ.get("GITHUB_REPOSITORY")
if not token or not repo:
raise EnvironmentError("GITHUB_TOKEN and GITHUB_REPOSITORY must be set")
url = f"https://api.github.com/repos/{repo}{endpoint}"
headers = {
"Authorization": f"token {token}",
"Accept": "application/vnd.github.v3+json",
"Content-Type": "application/json",
}
body = json.dumps(data).encode() if data else None
req = urllib.request.Request(url, data=body, headers=headers, method=method)
with urllib.request.urlopen(req) as response:
return json.loads(response.read())
def add_issue_comment(issue_number: int, body: str) -> None:
"""Add a comment to a GitHub issue."""
github_api_request("POST", f"/issues/{issue_number}/comments", {"body": body})
def add_issue_label(issue_number: int, label: str) -> None:
"""Add a label to a GitHub issue."""
github_api_request("POST", f"/issues/{issue_number}/labels", {"labels": [label]})
def remove_issue_label(issue_number: int, label: str) -> None:
"""Remove a label from a GitHub issue."""
try:
github_api_request("DELETE", f"/issues/{issue_number}/labels/{label}")
except urllib.error.HTTPError:
pass # Label might not exist
def validate_tag_consistency(data: dict | list, tag_registry: dict[str, str]) -> list[str]:
"""
Check that tag types in new submissions match existing tag types.
Args:
data: Single submission dict or list of submissions
tag_registry: Dict mapping tag name to expected type
Returns:
List of error messages. Empty list means validation passed.
"""
errors = []
submissions = data if isinstance(data, list) else [data]
for i, submission in enumerate(submissions):
prefix = f"[{i}] " if len(submissions) > 1 else ""
tags = submission.get("tags", {})
if not isinstance(tags, dict):
continue
for key, value in tags.items():
actual_type = get_python_type_name(value)
if key in tag_registry:
expected_type = tag_registry[key]
if actual_type != expected_type:
errors.append(
f"{prefix}tags.{key}: expected type '{expected_type}', got '{actual_type}'"
)
return errors
def validate_and_report(json_str: str, issue_number: int | None = None) -> bool:
"""
Validate JSON and optionally report to GitHub issue.
Args:
json_str: JSON string to validate
issue_number: Optional issue number to comment on
Returns:
True if validation passed, False otherwise
"""
data, errors = parse_and_validate(json_str)
if errors:
error_list = "\n".join(f"- {e}" for e in errors)
message = f"❌ **Validation Failed**\n\n{error_list}\n\nPlease fix the errors and edit your submission."
print(message, file=sys.stderr)
if issue_number:
add_issue_comment(issue_number, message)
remove_issue_label(issue_number, "validated")
return False
# Check tag type consistency against existing submissions
if data is not None:
try:
existing_submissions = read_all_submissions()
tag_registry = build_tag_type_registry(existing_submissions)
tag_errors = validate_tag_consistency(data, tag_registry)
if tag_errors:
error_list = "\n".join(f"- {e}" for e in tag_errors)
message = (
f"❌ **Tag Type Mismatch**\n\n"
f"Your submission uses tags with types that don't match existing submissions:\n\n"
f"{error_list}\n\n"
f"Please use the same type as existing tags, or use a different tag name."
)
print(message, file=sys.stderr)
if issue_number:
add_issue_comment(issue_number, message)
remove_issue_label(issue_number, "validated")
return False
except Exception as e:
# Don't fail validation if we can't read existing submissions
print(f"Warning: Could not check tag consistency: {e}", file=sys.stderr)
count = len(data) if isinstance(data, list) else 1
message = f"✅ **Validation Passed**\n\n{count} submission(s) validated successfully against the schema.\n\nA maintainer can approve this submission by adding the `approved` label."
print(message)
if issue_number:
add_issue_comment(issue_number, message)
add_issue_label(issue_number, "validated")
return True
def main():
parser = argparse.ArgumentParser(description="Validate community submission JSON")
source_group = parser.add_mutually_exclusive_group(required=True)
source_group.add_argument("--issue-body", help="Issue body text containing JSON")
source_group.add_argument("--issue-body-file", help="File containing issue body text")
source_group.add_argument("--file", help="JSON file to validate")
source_group.add_argument("--stdin", action="store_true", help="Read JSON from stdin")
parser.add_argument("--issue-number", type=int, help="GitHub issue number to comment on")
args = parser.parse_args()
# Get JSON string
if args.issue_body:
json_str = extract_json_from_issue_body(args.issue_body)
if not json_str:
print("❌ Could not extract JSON from issue body", file=sys.stderr)
if args.issue_number:
add_issue_comment(
args.issue_number,
"❌ **Validation Failed**\n\nCould not extract JSON from submission. "
"Please ensure your JSON is in the 'Submission JSON' field wrapped in code blocks."
)
sys.exit(1)
elif args.issue_body_file:
with open(args.issue_body_file) as f:
issue_body = f.read()
json_str = extract_json_from_issue_body(issue_body)
if not json_str:
print("❌ Could not extract JSON from issue body", file=sys.stderr)
print(f"Issue body:\n{issue_body}", file=sys.stderr)
if args.issue_number:
add_issue_comment(
args.issue_number,
"❌ **Validation Failed**\n\nCould not extract JSON from submission. "
"Please ensure your JSON is in the 'Submission JSON' field."
)
sys.exit(1)
elif args.file:
with open(args.file) as f:
json_str = f.read()
else: # stdin
json_str = sys.stdin.read()
# Validate
success = validate_and_report(json_str, args.issue_number)
sys.exit(0 if success else 1)
if __name__ == "__main__":
main()
+49
View File
@@ -0,0 +1,49 @@
from pathlib import Path
from datetime import datetime, timezone, timedelta
import argparse
parser = argparse.ArgumentParser(description="Create daily FAA release")
parser.add_argument("--date", type=str, help="Date to process (YYYY-MM-DD format, default: today)")
args = parser.parse_args()
if args.date:
date_str = args.date
else:
date_str = datetime.now(timezone.utc).strftime("%Y-%m-%d")
out_dir = Path("data/faa_releasable")
out_dir.mkdir(parents=True, exist_ok=True)
zip_name = f"ReleasableAircraft_{date_str}.zip"
zip_path = out_dir / zip_name
if not zip_path.exists():
# URL and paths
url = "https://registry.faa.gov/database/ReleasableAircraft.zip"
from urllib.request import Request, urlopen
req = Request(
url,
headers={"User-Agent": "Mozilla/5.0"},
method="GET",
)
with urlopen(req, timeout=120) as r:
body = r.read()
zip_path.write_bytes(body)
OUT_ROOT = Path("data/openairframes")
OUT_ROOT.mkdir(parents=True, exist_ok=True)
from derive_from_faa_master_txt import convert_faa_master_txt_to_df, concat_faa_historical_df
from get_latest_release import get_latest_aircraft_faa_csv_df
df_new = convert_faa_master_txt_to_df(zip_path, date_str)
try:
df_base, start_date_str = get_latest_aircraft_faa_csv_df()
df_base = concat_faa_historical_df(df_base, df_new)
assert df_base['download_date'].is_monotonic_increasing, "download_date is not monotonic increasing"
except Exception as e:
print(f"No existing FAA release found, using only new data: {e}")
df_base = df_new
start_date_str = date_str
df_base.to_csv(OUT_ROOT / f"openairframes_faa_{start_date_str}_{date_str}.csv", index=False)
@@ -1,33 +0,0 @@
from pathlib import Path
from datetime import datetime, timezone
date_str = datetime.now(timezone.utc).strftime("%Y-%m-%d")
out_dir = Path("data/faa_releasable")
out_dir.mkdir(parents=True, exist_ok=True)
zip_name = f"ReleasableAircraft_{date_str}.zip"
zip_path = out_dir / zip_name
if not zip_path.exists():
# URL and paths
url = "https://registry.faa.gov/database/ReleasableAircraft.zip"
from urllib.request import Request, urlopen
req = Request(
url,
headers={"User-Agent": "Mozilla/5.0"},
method="GET",
)
with urlopen(req, timeout=120) as r:
body = r.read()
zip_path.write_bytes(body)
OUT_ROOT = Path("data/planequery_aircraft")
OUT_ROOT.mkdir(parents=True, exist_ok=True)
from derive_from_faa_master_txt import convert_faa_master_txt_to_df, concat_faa_historical_df
from get_latest_planequery_aircraft_release import get_latest_aircraft_csv_df
df_new = convert_faa_master_txt_to_df(zip_path, date_str)
df_base, start_date_str = get_latest_aircraft_csv_df()
df_base = concat_faa_historical_df(df_base, df_new)
assert df_base['download_date'].is_monotonic_increasing, "download_date is not monotonic increasing"
df_base.to_csv(OUT_ROOT / f"planequery_aircraft_{start_date_str}_{date_str}.csv", index=False)
+10 -7
View File
@@ -29,8 +29,8 @@ def convert_faa_master_txt_to_df(zip_path: Path, date: str):
certification = pd.json_normalize(df["certification"].where(df["certification"].notna(), {})).add_prefix("certificate_")
df = df.drop(columns="certification").join(certification)
# Create planequery_airframe_id
df["planequery_airframe_id"] = (
# Create openairframes_id
df["openairframes_id"] = (
normalize(df["aircraft_manufacturer"])
+ "|"
+ normalize(df["aircraft_model"])
@@ -38,15 +38,18 @@ def convert_faa_master_txt_to_df(zip_path: Path, date: str):
+ normalize(df["serial_number"])
)
# Move planequery_airframe_id to come after registration_number
# Move openairframes_id to come after registration_number
cols = df.columns.tolist()
cols.remove("planequery_airframe_id")
cols.remove("openairframes_id")
reg_idx = cols.index("registration_number")
cols.insert(reg_idx + 1, "planequery_airframe_id")
cols.insert(reg_idx + 1, "openairframes_id")
df = df[cols]
# Convert all NaN to empty strings
df = df.fillna("")
# The FAA parser can produce the literal string "None" for missing values;
# replace those so they match the empty-string convention used everywhere else.
df = df.replace("None", "")
return df
@@ -84,8 +87,8 @@ def concat_faa_historical_df(df_base, df_new):
# Convert to string
val_str = str(val).strip()
# Handle empty strings
if val_str == "" or val_str == "nan":
# Handle empty strings and null-like literals
if val_str == "" or val_str == "nan" or val_str == "None":
return ""
# Check if it looks like a list representation (starts with [ )
-116
View File
@@ -1,116 +0,0 @@
"""
For each commit-day in Feb 2024 (last commit per day):
- Write ALL FAA text files from that commit into: data/faa_releasable_historical/YYYY-MM-DD/
ACFTREF.txt, DEALER.txt, DOCINDEX.txt, ENGINE.txt, RESERVED.txt
- Recombine MASTER-*.txt into Master.txt
- Produce Master.csv via convert_faa_master_txt_to_csv
Assumes the non-master files are present in every commit.
"""
import subprocess, re
from pathlib import Path
import shutil
from collections import OrderedDict
from derive_from_faa_master_txt import convert_faa_master_txt_to_df, concat_faa_historical_df
import zipfile
import pandas as pd
import argparse
from datetime import datetime, timedelta
# Parse command line arguments
parser = argparse.ArgumentParser(description="Process historical FAA data from git commits")
parser.add_argument("since", help="Start date (YYYY-MM-DD)")
parser.add_argument("until", help="End date (YYYY-MM-DD)")
args = parser.parse_args()
# Clone repository if it doesn't exist
REPO = Path("data/scrape-faa-releasable-aircraft")
OUT_ROOT = Path("data/faa_releasable_historical")
OUT_ROOT.mkdir(parents=True, exist_ok=True)
def run_git_text(*args: str) -> str:
return subprocess.check_output(["git", "-C", str(REPO), *args], text=True).strip()
def run_git_bytes(*args: str) -> bytes:
return subprocess.check_output(["git", "-C", str(REPO), *args])
# Parse dates and adjust --since to the day before
since_date = datetime.strptime(args.since, "%Y-%m-%d")
adjusted_since = (since_date - timedelta(days=1)).strftime("%Y-%m-%d")
# All commits in specified date range (oldest -> newest)
log = run_git_text(
"log",
"--reverse",
"--format=%H %cs",
f"--since={adjusted_since}",
f"--until={args.until}",
)
lines = [ln for ln in log.splitlines() if ln.strip()]
if not lines:
raise SystemExit(f"No commits found between {args.since} and {args.until}.")
# date -> last SHA that day
date_to_sha = OrderedDict()
for ln in lines:
sha, date = ln.split()
date_to_sha[date] = sha
OTHER_FILES = ["ACFTREF.txt", "DEALER.txt", "DOCINDEX.txt", "ENGINE.txt", "RESERVED.txt"]
master_re = re.compile(r"^MASTER-(\d+)\.txt$")
df_base = pd.DataFrame()
start_date = None
end_date = None
for date, sha in date_to_sha.items():
if start_date is None:
start_date = date
end_date = date
day_dir = OUT_ROOT / date
day_dir.mkdir(parents=True, exist_ok=True)
# Write auxiliary files (assumed present)
for fname in OTHER_FILES:
(day_dir / fname).write_bytes(run_git_bytes("show", f"{sha}:{fname}"))
# Recombine MASTER parts
names = run_git_text("ls-tree", "--name-only", sha).splitlines()
parts = []
for n in names:
m = master_re.match(n)
if m:
parts.append((int(m.group(1)), n))
parts.sort()
if not parts:
raise RuntimeError(f"{date} {sha[:7]}: no MASTER-*.txt parts found")
master_path = day_dir / "MASTER.txt"
with master_path.open("wb") as w:
for _, fname in parts:
data = run_git_bytes("show", f"{sha}:{fname}")
w.write(data)
if data and not data.endswith(b"\n"):
w.write(b"\n")
# 3) Zip the day's files
zip_path = day_dir / f"ReleasableAircraft.zip"
with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as z:
for p in day_dir.iterdir():
z.write(p, arcname=p.name)
print(f"{date} {sha[:7]} -> {day_dir} (master parts: {len(parts)})")
# 4) Convert ZIP -> CSV
df_new = convert_faa_master_txt_to_df(zip_path, date)
if df_base.empty:
df_base = df_new
print(len(df_base), "total entries so far")
# Delete all files in the day directory
shutil.rmtree(day_dir)
continue
df_base = concat_faa_historical_df(df_base, df_new)
shutil.rmtree(day_dir)
print(len(df_base), "total entries so far")
assert df_base['download_date'].is_monotonic_increasing, "download_date is not monotonic increasing"
df_base.to_csv(OUT_ROOT / f"planequery_aircraft_{start_date}_{end_date}.csv", index=False)
# TODO: get average number of new rows per day.
@@ -1,144 +0,0 @@
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
from typing import Iterable, Optional
import re
import urllib.request
import urllib.error
import json
REPO = "PlaneQuery/planequery-aircraft"
LATEST_RELEASE_URL = f"https://api.github.com/repos/{REPO}/releases/latest"
@dataclass(frozen=True)
class ReleaseAsset:
name: str
download_url: str
size: int # bytes
def _http_get_json(url: str, headers: dict[str, str]) -> dict:
req = urllib.request.Request(url, headers=headers, method="GET")
with urllib.request.urlopen(req, timeout=120) as resp:
data = resp.read()
return json.loads(data.decode("utf-8"))
def get_latest_release_assets(repo: str = REPO, github_token: Optional[str] = None) -> list[ReleaseAsset]:
url = f"https://api.github.com/repos/{repo}/releases/latest"
headers = {
"Accept": "application/vnd.github+json",
"User-Agent": "planequery-aircraft-downloader/1.0",
}
if github_token:
headers["Authorization"] = f"Bearer {github_token}"
payload = _http_get_json(url, headers=headers)
assets = []
for a in payload.get("assets", []):
assets.append(
ReleaseAsset(
name=a["name"],
download_url=a["browser_download_url"],
size=int(a.get("size", 0)),
)
)
return assets
def pick_asset(
assets: Iterable[ReleaseAsset],
*,
exact_name: Optional[str] = None,
name_regex: Optional[str] = None,
) -> ReleaseAsset:
assets = list(assets)
if exact_name:
for a in assets:
if a.name == exact_name:
return a
raise FileNotFoundError(f"No asset exactly named {exact_name!r}. Available: {[a.name for a in assets]}")
if name_regex:
rx = re.compile(name_regex)
matches = [a for a in assets if rx.search(a.name)]
if not matches:
raise FileNotFoundError(f"No asset matched regex {name_regex!r}. Available: {[a.name for a in assets]}")
if len(matches) > 1:
raise FileExistsError(f"Regex {name_regex!r} matched multiple assets: {[m.name for m in matches]}")
return matches[0]
raise ValueError("Provide either exact_name=... or name_regex=...")
def download_asset(asset: ReleaseAsset, out_path: Path, github_token: Optional[str] = None) -> Path:
out_path = Path(out_path)
out_path.parent.mkdir(parents=True, exist_ok=True)
headers = {
"User-Agent": "planequery-aircraft-downloader/1.0",
"Accept": "application/octet-stream",
}
if github_token:
headers["Authorization"] = f"Bearer {github_token}"
req = urllib.request.Request(asset.download_url, headers=headers, method="GET")
try:
with urllib.request.urlopen(req, timeout=300) as resp, out_path.open("wb") as f:
# Stream download
while True:
chunk = resp.read(1024 * 1024) # 1 MiB
if not chunk:
break
f.write(chunk)
except urllib.error.HTTPError as e:
body = e.read().decode("utf-8", errors="replace") if hasattr(e, "read") else ""
raise RuntimeError(f"HTTPError {e.code} downloading {asset.name}: {body[:500]}") from e
return out_path
def download_latest_aircraft_csv(
output_dir: Path = Path("downloads"),
github_token: Optional[str] = None,
repo: str = REPO,
) -> Path:
"""
Download the latest planequery_aircraft_*.csv file from the latest GitHub release.
Args:
output_dir: Directory to save the downloaded file (default: "downloads")
github_token: Optional GitHub token for authentication
repo: GitHub repository in format "owner/repo" (default: REPO)
Returns:
Path to the downloaded file
"""
assets = get_latest_release_assets(repo, github_token=github_token)
asset = pick_asset(assets, name_regex=r"^planequery_aircraft_.*\.csv$")
saved_to = download_asset(asset, output_dir / asset.name, github_token=github_token)
print(f"Downloaded: {asset.name} ({asset.size} bytes) -> {saved_to}")
return saved_to
def get_latest_aircraft_csv_df():
csv_path = download_latest_aircraft_csv()
import pandas as pd
df = pd.read_csv(csv_path, dtype={'transponder_code': str,
'unique_regulatory_id': str,
'registrant_county': str})
df = df.fillna("")
# Extract date from filename pattern: planequery_aircraft_{date}_{date}.csv
match = re.search(r"planequery_aircraft_(\d{4}-\d{2}-\d{2})_", str(csv_path))
if not match:
raise ValueError(f"Could not extract date from filename: {csv_path.name}")
date_str = match.group(1)
return df, date_str
if __name__ == "__main__":
download_latest_aircraft_csv()
+253
View File
@@ -0,0 +1,253 @@
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
from typing import Iterable, Optional
import re
import urllib.request
import urllib.error
import json
REPO = "PlaneQuery/openairframes"
LATEST_RELEASE_URL = f"https://api.github.com/repos/{REPO}/releases/latest"
@dataclass(frozen=True)
class ReleaseAsset:
name: str
download_url: str
size: int # bytes
def _http_get_json(url: str, headers: dict[str, str]) -> dict:
req = urllib.request.Request(url, headers=headers, method="GET")
with urllib.request.urlopen(req, timeout=120) as resp:
data = resp.read()
return json.loads(data.decode("utf-8"))
def get_releases(repo: str = REPO, github_token: Optional[str] = None, per_page: int = 30) -> list[dict]:
"""Get a list of releases from the repository."""
url = f"https://api.github.com/repos/{repo}/releases?per_page={per_page}"
headers = {
"Accept": "application/vnd.github+json",
"User-Agent": "openairframes-downloader/1.0",
}
if github_token:
headers["Authorization"] = f"Bearer {github_token}"
return _http_get_json(url, headers=headers)
def get_release_assets_from_release_data(release_data: dict) -> list[ReleaseAsset]:
"""Extract assets from a release data dictionary."""
assets = []
for a in release_data.get("assets", []):
assets.append(
ReleaseAsset(
name=a["name"],
download_url=a["browser_download_url"],
size=int(a.get("size", 0)),
)
)
return assets
def get_latest_release_assets(repo: str = REPO, github_token: Optional[str] = None) -> list[ReleaseAsset]:
url = f"https://api.github.com/repos/{repo}/releases/latest"
headers = {
"Accept": "application/vnd.github+json",
"User-Agent": "openairframes-downloader/1.0",
}
if github_token:
headers["Authorization"] = f"Bearer {github_token}"
payload = _http_get_json(url, headers=headers)
return get_release_assets_from_release_data(payload)
def pick_asset(
assets: Iterable[ReleaseAsset],
*,
exact_name: Optional[str] = None,
name_regex: Optional[str] = None,
) -> ReleaseAsset:
assets = list(assets)
if exact_name:
for a in assets:
if a.name == exact_name:
return a
raise FileNotFoundError(f"No asset exactly named {exact_name!r}. Available: {[a.name for a in assets]}")
if name_regex:
rx = re.compile(name_regex)
matches = [a for a in assets if rx.search(a.name)]
if not matches:
raise FileNotFoundError(f"No asset matched regex {name_regex!r}. Available: {[a.name for a in assets]}")
if len(matches) > 1:
raise FileExistsError(f"Regex {name_regex!r} matched multiple assets: {[m.name for m in matches]}")
return matches[0]
raise ValueError("Provide either exact_name=... or name_regex=...")
def download_asset(asset: ReleaseAsset, out_path: Path, github_token: Optional[str] = None) -> Path:
out_path = Path(out_path)
out_path.parent.mkdir(parents=True, exist_ok=True)
headers = {
"User-Agent": "openairframes-downloader/1.0",
"Accept": "application/octet-stream",
}
if github_token:
headers["Authorization"] = f"Bearer {github_token}"
req = urllib.request.Request(asset.download_url, headers=headers, method="GET")
try:
with urllib.request.urlopen(req, timeout=300) as resp, out_path.open("wb") as f:
# Stream download
while True:
chunk = resp.read(1024 * 1024) # 1 MiB
if not chunk:
break
f.write(chunk)
except urllib.error.HTTPError as e:
body = e.read().decode("utf-8", errors="replace") if hasattr(e, "read") else ""
raise RuntimeError(f"HTTPError {e.code} downloading {asset.name}: {body[:500]}") from e
return out_path
def download_latest_aircraft_csv(
output_dir: Path = Path("downloads"),
github_token: Optional[str] = None,
repo: str = REPO,
) -> Path:
"""
Download the latest openairframes_faa_*.csv file from the latest GitHub release.
Args:
output_dir: Directory to save the downloaded file (default: "downloads")
github_token: Optional GitHub token for authentication
repo: GitHub repository in format "owner/repo" (default: REPO)
Returns:
Path to the downloaded file
"""
output_dir = Path(output_dir)
assets = get_latest_release_assets(repo, github_token=github_token)
try:
asset = pick_asset(assets, name_regex=r"^openairframes_faa_.*\.csv$")
except FileNotFoundError:
# Fallback to old naming pattern
asset = pick_asset(assets, name_regex=r"^openairframes_\d{4}-\d{2}-\d{2}_.*\.csv$")
saved_to = download_asset(asset, output_dir / asset.name, github_token=github_token)
print(f"Downloaded: {asset.name} ({asset.size} bytes) -> {saved_to}")
return saved_to
def get_latest_aircraft_faa_csv_df():
csv_path = download_latest_aircraft_csv()
import pandas as pd
df = pd.read_csv(csv_path, dtype={'transponder_code': str,
'unique_regulatory_id': str,
'registrant_county': str})
df = df.fillna("")
# Extract start date from filename pattern: openairframes_faa_{start_date}_{end_date}.csv
match = re.search(r"openairframes_faa_(\d{4}-\d{2}-\d{2})_", str(csv_path))
if not match:
# Fallback to old naming pattern: openairframes_{start_date}_{end_date}.csv
match = re.search(r"openairframes_(\d{4}-\d{2}-\d{2})_", str(csv_path))
if not match:
raise ValueError(f"Could not extract date from filename: {csv_path.name}")
date_str = match.group(1)
return df, date_str
def download_latest_aircraft_adsb_csv(
output_dir: Path = Path("downloads"),
github_token: Optional[str] = None,
repo: str = REPO,
) -> Path:
"""
Download the latest openairframes_adsb_*.csv file from GitHub releases.
If the latest release doesn't have the file, searches previous releases.
Args:
output_dir: Directory to save the downloaded file (default: "downloads")
github_token: Optional GitHub token for authentication
repo: GitHub repository in format "owner/repo" (default: REPO)
Returns:
Path to the downloaded file
"""
output_dir = Path(output_dir)
# Get multiple releases
releases = get_releases(repo, github_token=github_token, per_page=30)
# Try each release until we find one with the matching asset
for release in releases:
assets = get_release_assets_from_release_data(release)
try:
asset = pick_asset(assets, name_regex=r"^openairframes_adsb_.*\.csv(\.gz)?$")
saved_to = download_asset(asset, output_dir / asset.name, github_token=github_token)
print(f"Downloaded: {asset.name} ({asset.size} bytes) -> {saved_to}")
return saved_to
except FileNotFoundError:
# This release doesn't have the matching asset, try the next one
continue
raise FileNotFoundError(
f"No release in the last 30 releases has an asset matching 'openairframes_adsb_.*\\.csv(\\.gz)?$'"
)
import polars as pl
def get_latest_aircraft_adsb_csv_df():
"""Download and load the latest ADS-B CSV from GitHub releases.
Returns:
tuple: (df, start_date, end_date) where dates are in YYYY-MM-DD format
"""
import re
csv_path = download_latest_aircraft_adsb_csv()
df = pl.read_csv(csv_path, null_values=[""])
# Parse time column: values like "2025-12-31T00:00:00.040" or "2025-05-11T15:15:50.540+0000"
# Try with timezone first (convert to naive), then without timezone
df = df.with_columns(
pl.col("time").str.strptime(pl.Datetime("ms"), "%Y-%m-%dT%H:%M:%S%.f%z", strict=False)
.dt.replace_time_zone(None) # Convert to naive datetime first
.fill_null(pl.col("time").str.strptime(pl.Datetime("ms"), "%Y-%m-%dT%H:%M:%S%.f", strict=False))
)
# Cast dbFlags and year to strings to match the schema used in compress functions
for col in ['dbFlags', 'year']:
if col in df.columns:
df = df.with_columns(pl.col(col).cast(pl.Utf8))
# Fill nulls with empty strings for string columns
for col in df.columns:
if df[col].dtype == pl.Utf8:
df = df.with_columns(pl.col(col).fill_null(""))
# Extract start and end dates from filename pattern: openairframes_adsb_{start_date}_{end_date}.csv[.gz]
match = re.search(r"openairframes_adsb_(\d{4}-\d{2}-\d{2})_(\d{4}-\d{2}-\d{2})\.csv", str(csv_path))
if not match:
raise ValueError(f"Could not extract dates from filename: {csv_path.name}")
start_date = match.group(1)
end_date = match.group(2)
print(df.columns)
print(df.dtypes)
return df, start_date, end_date
if __name__ == "__main__":
download_latest_aircraft_csv()
download_latest_aircraft_adsb_csv()