Compare commits

..

23 Commits

Author SHA1 Message Date
JG 2829e5fb6e Merge pull request #35 from PlaneQuery/develop
update readme.md
2026-03-18 14:31:29 -04:00
ggman12 9c744b0baf update readme.md 2026-03-18 14:29:13 -04:00
JG ebda04767f Merge pull request #34 from PlaneQuery/develop
Develop to main: theairtraffic google sheet
2026-03-10 05:12:11 -04:00
ggman12 3fdf443894 add russia_ukraine 2026-03-10 05:08:19 -04:00
ggman12 24313603c5 works 2026-03-10 05:08:19 -04:00
JG 2bb0a5eac3 Merge pull request #33 from PlaneQuery/develop
Develop to Main: Handle ADSB when ADSB.lol has not released any data for day. Just rerelease latest adsb
2026-02-26 15:32:59 -05:00
ggman12 b54f33aa56 Handle ADSB when ADSB.lol has not released any data for day. Just rerelease latest adsb 2026-02-26 15:31:47 -05:00
JG 2dda3d341c Merge pull request #32 from PlaneQuery/develop
Develop to Main: Fix Community Submission export. Fix CSV concatenation logic to prevent duplicates when there is no new ADSB.lol data.
2026-02-24 15:37:54 -05:00
ggman12 b0526f0a95 Fix Community Submission export. Fix CSV concatenation logic to prevent duplicates when there is no new ADSB.lol data. 2026-02-24 15:36:10 -05:00
JG 4b6a043a9d Merge pull request #31 from PlaneQuery/develop
Develop to Main Fix adsb asset retrival to be more fault tolerant. Fix download issue
2026-02-24 02:17:08 -05:00
ggman12 55c464aad7 Fix adsb asset retrival to be more fault tolerant. Fix download issue for 2024-07-03 2026-02-24 02:12:55 -05:00
ggman12 aa509e8560 attempt to fix download issue for 2024-07-03 2026-02-19 17:51:49 -05:00
ggman12 82d11d8d24 try less strict tar extract for 2025-10-15 and other days that fail 2026-02-19 00:20:03 -05:00
ggman12 76a217ad14 src/contributions/approve_submission.py handle big json files 2026-02-18 23:18:19 -05:00
ggman12 ec2d1a1291 update download.sh 2026-02-18 23:18:19 -05:00
ggman12 97284c69a9 verify downlaod asssets 2026-02-18 23:18:19 -05:00
JG 892ffa78af Merge pull request #28 from PlaneQuery/community-submission-27
Community submission: ggman12_2026-02-18_5ddbb8bd.json
2026-02-18 17:18:49 -05:00
github-actions[bot] f77a91db2c Update schema with new tags: manufacturer_icao, manufacturer_name, model, type_code, serial_number, icao_aircraft_type, operator, operator_callsign, operator_icao, citation_0 2026-02-18 22:18:12 +00:00
github-actions[bot] b3bd654998 Add community submission from @ggman12 (closes #27) 2026-02-18 22:18:12 +00:00
ggman12 302be8b8dc update checker for arrays issue 2026-02-18 17:11:14 -05:00
ggman12 b61dc0f5e5 provide more error 2026-02-18 17:08:43 -05:00
ggman12 1ff17cc6a8 allow adsb to fail for when adsb.lol hasen't uploaded file yet. 2026-02-18 16:49:02 -05:00
ggman12 d216ea9329 Daily ADSB and Histoircal updates. Update readme.md 2026-02-18 16:34:06 -05:00
24 changed files with 1153 additions and 532 deletions
@@ -7,12 +7,22 @@ on:
description: 'YYYY-MM-DD' description: 'YYYY-MM-DD'
required: true required: true
type: string type: string
concat_with_latest_csv:
description: 'Also concatenate with latest CSV from GitHub releases'
required: false
type: boolean
default: false
workflow_call: workflow_call:
inputs: inputs:
date: date:
description: 'YYYY-MM-DD' description: 'YYYY-MM-DD'
required: true required: true
type: string type: string
concat_with_latest_csv:
description: 'Also concatenate with latest CSV from GitHub releases'
required: false
type: boolean
default: false
jobs: jobs:
adsb-extract: adsb-extract:
@@ -39,11 +49,38 @@ jobs:
python -m src.adsb.download_and_list_icaos --date "$DATE" python -m src.adsb.download_and_list_icaos --date "$DATE"
ls -lah data/output/adsb_archives/"$DATE" || true ls -lah data/output/adsb_archives/"$DATE" || true
- name: Upload archives - name: Upload archive part 0
uses: actions/upload-artifact@v4 uses: actions/upload-artifact@v4
with: with:
name: adsb-archives-${{ inputs.date }} name: adsb-archive-${{ inputs.date }}-part-0
path: data/output/adsb_archives/${{ inputs.date }} path: data/output/adsb_archives/${{ inputs.date }}/${{ inputs.date }}_part_0.tar.gz
retention-days: 1
compression-level: 0
if-no-files-found: error
- name: Upload archive part 1
uses: actions/upload-artifact@v4
with:
name: adsb-archive-${{ inputs.date }}-part-1
path: data/output/adsb_archives/${{ inputs.date }}/${{ inputs.date }}_part_1.tar.gz
retention-days: 1
compression-level: 0
if-no-files-found: error
- name: Upload archive part 2
uses: actions/upload-artifact@v4
with:
name: adsb-archive-${{ inputs.date }}-part-2
path: data/output/adsb_archives/${{ inputs.date }}/${{ inputs.date }}_part_2.tar.gz
retention-days: 1
compression-level: 0
if-no-files-found: error
- name: Upload archive part 3
uses: actions/upload-artifact@v4
with:
name: adsb-archive-${{ inputs.date }}-part-3
path: data/output/adsb_archives/${{ inputs.date }}/${{ inputs.date }}_part_3.tar.gz
retention-days: 1 retention-days: 1
compression-level: 0 compression-level: 0
if-no-files-found: error if-no-files-found: error
@@ -54,7 +91,7 @@ jobs:
strategy: strategy:
fail-fast: true fail-fast: true
matrix: matrix:
part_id: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15] part_id: [0, 1, 2, 3]
steps: steps:
- name: Checkout - name: Checkout
uses: actions/checkout@v6 uses: actions/checkout@v6
@@ -69,12 +106,22 @@ jobs:
python -m pip install --upgrade pip python -m pip install --upgrade pip
pip install -r requirements.txt pip install -r requirements.txt
- name: Download archives - name: Download archive part
uses: actions/download-artifact@v4 uses: actions/download-artifact@v4
with: with:
name: adsb-archives-${{ inputs.date }} name: adsb-archive-${{ inputs.date }}-part-${{ matrix.part_id }}
path: data/output/adsb_archives/${{ inputs.date }} path: data/output/adsb_archives/${{ inputs.date }}
- name: Verify archive
run: |
FILE="data/output/adsb_archives/${{ inputs.date }}/${{ inputs.date }}_part_${{ matrix.part_id }}.tar.gz"
ls -lah data/output/adsb_archives/${{ inputs.date }}/
if [ ! -f "$FILE" ]; then
echo "::error::Archive not found: $FILE"
exit 1
fi
echo "Verified: $(du -h "$FILE")"
- name: Process part - name: Process part
env: env:
DATE: ${{ inputs.date }} DATE: ${{ inputs.date }}
@@ -111,19 +158,25 @@ jobs:
uses: actions/download-artifact@v4 uses: actions/download-artifact@v4
with: with:
pattern: adsb-compressed-${{ inputs.date }}-part-* pattern: adsb-compressed-${{ inputs.date }}-part-*
path: outputs/compressed/${{ inputs.date }} path: data/output/compressed/${{ inputs.date }}
merge-multiple: true merge-multiple: true
- name: Concatenate final outputs - name: Concatenate final outputs
env: env:
DATE: ${{ inputs.date }} DATE: ${{ inputs.date }}
CONCAT_WITH_LATEST_CSV: ${{ inputs.concat_with_latest_csv }}
run: | run: |
python src/adsb/concat_parquet_to_final.py --date "$DATE" EXTRA=""
ls -lah outputs/ || true if [ "$CONCAT_WITH_LATEST_CSV" = "true" ]; then
EXTRA="--concat_with_latest_csv"
fi
python -m src.adsb.concat_parquet_to_final --date "$DATE" $EXTRA
ls -lah data/output/ || true
- name: Upload final artifacts - name: Upload final artifacts
uses: actions/upload-artifact@v4 uses: actions/upload-artifact@v4
with: with:
name: openairframes_adsb-${{ inputs.date }} name: openairframes_adsb-${{ inputs.date }}
path: outputs/openairframes_adsb_${{ inputs.date }}_${{ inputs.date }}.* path: data/output/openairframes_adsb_*
retention-days: 30 retention-days: 30
if-no-files-found: error
@@ -1,4 +1,4 @@
name: Historical ADS-B Run name: adsb-to-aircraft-multiple-day-run
on: on:
workflow_dispatch: workflow_dispatch:
@@ -26,10 +26,11 @@ jobs:
run: | run: |
python - <<'PY' python - <<'PY'
import json import json
import os
from datetime import datetime, timedelta from datetime import datetime, timedelta
start = datetime.strptime("${START_DATE}", "%Y-%m-%d") start = datetime.strptime(os.environ["START_DATE"], "%Y-%m-%d")
end = datetime.strptime("${END_DATE}", "%Y-%m-%d") end = datetime.strptime(os.environ["END_DATE"], "%Y-%m-%d")
if end <= start: if end <= start:
raise SystemExit("end_date must be after start_date") raise SystemExit("end_date must be after start_date")
@@ -39,7 +40,7 @@ jobs:
dates.append(cur.strftime("%Y-%m-%d")) dates.append(cur.strftime("%Y-%m-%d"))
cur += timedelta(days=1) cur += timedelta(days=1)
with open("$GITHUB_OUTPUT", "a") as f: with open(os.environ["GITHUB_OUTPUT"], "a") as f:
f.write(f"dates={json.dumps(dates)}\n") f.write(f"dates={json.dumps(dates)}\n")
PY PY
@@ -49,7 +50,7 @@ jobs:
fail-fast: true fail-fast: true
matrix: matrix:
date: ${{ fromJson(needs.generate-dates.outputs.dates) }} date: ${{ fromJson(needs.generate-dates.outputs.dates) }}
uses: ./.github/workflows/historical-adsb.yaml uses: ./.github/workflows/adsb-to-aircraft-for-day.yaml
with: with:
date: ${{ matrix.date }} date: ${{ matrix.date }}
@@ -83,14 +84,15 @@ jobs:
END_DATE: ${{ inputs.end_date }} END_DATE: ${{ inputs.end_date }}
run: | run: |
python - <<'PY' python - <<'PY'
import os
import re import re
from pathlib import Path from pathlib import Path
import polars as pl import polars as pl
start = "${START_DATE}" start = os.environ["START_DATE"]
end = "${END_DATE}" end = os.environ["END_DATE"]
daily_dir = Path("outputs/daily") daily_dir = Path("outputs/daily")
files = sorted(daily_dir.glob("openairframes_adsb_*.csv")) files = sorted(daily_dir.glob("openairframes_adsb_*.csv.gz"))
if not files: if not files:
raise SystemExit("No daily CSVs found") raise SystemExit("No daily CSVs found")
@@ -102,8 +104,8 @@ jobs:
frames = [pl.read_csv(p) for p in files] frames = [pl.read_csv(p) for p in files]
df = pl.concat(frames, how="vertical", rechunk=True) df = pl.concat(frames, how="vertical", rechunk=True)
output_path = Path("outputs") / f"openairframes_adsb_{start}_{end}.csv" output_path = Path("outputs") / f"openairframes_adsb_{start}_{end}.csv.gz"
df.write_csv(output_path) df.write_csv(output_path, compression="gzip")
print(f"Wrote {output_path} with {df.height} rows") print(f"Wrote {output_path} with {df.height} rows")
PY PY
@@ -111,5 +113,6 @@ jobs:
uses: actions/upload-artifact@v4 uses: actions/upload-artifact@v4
with: with:
name: openairframes_adsb-${{ inputs.start_date }}-${{ inputs.end_date }} name: openairframes_adsb-${{ inputs.start_date }}-${{ inputs.end_date }}
path: outputs/openairframes_adsb_${{ inputs.start_date }}_${{ inputs.end_date }}.csv path: outputs/openairframes_adsb_${{ inputs.start_date }}_${{ inputs.end_date }}.csv.gz
retention-days: 30 retention-days: 30
# gh workflow run adsb-to-aircraft-multiple-day-run.yaml --repo ggman12/OpenAirframes --ref jonah/fix-historical-proper -f start_date=2025-12-31 -f end_date=2026-01-02
@@ -1,4 +1,4 @@
name: OpenAirframes Daily Release name: openairframes-daily-release
on: on:
schedule: schedule:
@@ -76,159 +76,75 @@ jobs:
data/faa_releasable/ReleasableAircraft_*.zip data/faa_releasable/ReleasableAircraft_*.zip
retention-days: 1 retention-days: 1
adsb-extract: resolve-dates:
runs-on: ubuntu-24.04-arm runs-on: ubuntu-latest
if: github.event_name != 'schedule' if: github.event_name != 'schedule'
outputs: outputs:
manifest-exists: ${{ steps.check.outputs.exists }} date: ${{ steps.out.outputs.date }}
adsb_date: ${{ steps.out.outputs.adsb_date }}
steps: steps:
- name: Checkout - id: out
uses: actions/checkout@v6
with:
fetch-depth: 0
- name: Setup Python
uses: actions/setup-python@v6
with:
python-version: "3.14"
- name: Install dependencies
run: | run: |
python -m pip install --upgrade pip if [ -n "${{ inputs.date }}" ]; then
pip install -r requirements.txt echo "date=${{ inputs.date }}" >> "$GITHUB_OUTPUT"
echo "adsb_date=${{ inputs.date }}" >> "$GITHUB_OUTPUT"
- name: Download and extract ADS-B data
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: |
python -m src.adsb.download_and_list_icaos ${{ inputs.date && format('--date {0}', inputs.date) || '' }}
ls -lah data/output/
- name: Check manifest exists
id: check
run: |
if ls data/output/icao_manifest_*.txt 1>/dev/null 2>&1; then
echo "exists=true" >> "$GITHUB_OUTPUT"
else else
echo "exists=false" >> "$GITHUB_OUTPUT" echo "date=$(date -u -d 'yesterday' +%Y-%m-%d)" >> "$GITHUB_OUTPUT"
echo "adsb_date=$(date -u -d 'yesterday' +%Y-%m-%d)" >> "$GITHUB_OUTPUT"
fi fi
- name: Create tar of extracted data adsb-to-aircraft:
run: | needs: resolve-dates
cd data/output if: github.event_name != 'schedule'
tar -cf extracted_data.tar *-planes-readsb-prod-0.tar_0 icao_manifest_*.txt uses: ./.github/workflows/adsb-to-aircraft-for-day.yaml
ls -lah extracted_data.tar with:
date: ${{ needs.resolve-dates.outputs.adsb_date }}
- name: Upload extracted data concat_with_latest_csv: true
uses: actions/upload-artifact@v4
with:
name: adsb-extracted
path: data/output/extracted_data.tar
retention-days: 1
compression-level: 0 # Already compressed trace files
adsb-map:
runs-on: ubuntu-24.04-arm
needs: adsb-extract
if: github.event_name != 'schedule' && needs.adsb-extract.outputs.manifest-exists == 'true'
strategy:
fail-fast: false
matrix:
chunk: [0, 1, 2, 3]
steps:
- name: Checkout
uses: actions/checkout@v6
with:
fetch-depth: 0
- name: Setup Python
uses: actions/setup-python@v6
with:
python-version: "3.14"
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
- name: Download extracted data
uses: actions/download-artifact@v4
with:
name: adsb-extracted
path: data/output/
- name: Extract tar
run: |
cd data/output
tar -xf extracted_data.tar
rm extracted_data.tar
echo "=== Contents of data/output ==="
ls -lah
echo "=== Looking for manifest ==="
cat icao_manifest_*.txt | head -20 || echo "No manifest found"
echo "=== Looking for extracted dirs ==="
ls -d *-planes-readsb-prod-0* 2>/dev/null || echo "No extracted dirs"
- name: Process chunk ${{ matrix.chunk }}
run: |
python -m src.adsb.process_icao_chunk --chunk-id ${{ matrix.chunk }} --total-chunks 4 ${{ inputs.date && format('--date {0}', inputs.date) || '' }}
mkdir -p data/output/adsb_chunks
ls -lah data/output/adsb_chunks/ || echo "No chunks created"
- name: Upload chunk artifacts
uses: actions/upload-artifact@v4
with:
name: adsb-chunk-${{ matrix.chunk }}
path: data/output/adsb_chunks/
retention-days: 1
adsb-reduce: adsb-reduce:
needs: [resolve-dates, adsb-to-aircraft]
if: always() && github.event_name != 'schedule' && needs.adsb-to-aircraft.result == 'failure'
runs-on: ubuntu-24.04-arm runs-on: ubuntu-24.04-arm
needs: adsb-map
if: github.event_name != 'schedule'
steps: steps:
- name: Checkout - name: Checkout
uses: actions/checkout@v6 uses: actions/checkout@v6
with:
fetch-depth: 0
- name: Setup Python - name: Setup Python
uses: actions/setup-python@v6 uses: actions/setup-python@v6
with: with:
python-version: "3.14" python-version: '3.12'
- name: Install dependencies - name: Install dependencies
run: | run: |
python -m pip install --upgrade pip python -m pip install --upgrade pip
pip install -r requirements.txt pip install -r requirements.txt
- name: Download all chunk artifacts - name: Download compressed outputs
uses: actions/download-artifact@v4 uses: actions/download-artifact@v4
with: with:
pattern: adsb-chunk-* pattern: adsb-compressed-${{ needs.resolve-dates.outputs.adsb_date }}-part-*
path: data/output/adsb_chunks/ path: data/output/compressed/${{ needs.resolve-dates.outputs.adsb_date }}
merge-multiple: true merge-multiple: true
- name: Debug downloaded files - name: Concatenate final outputs
env:
DATE: ${{ needs.resolve-dates.outputs.adsb_date }}
CONCAT_WITH_LATEST_CSV: true
run: | run: |
echo "=== Listing data/ ===" EXTRA=""
find data/ -type f 2>/dev/null | head -50 || echo "No files in data/" if [ "$CONCAT_WITH_LATEST_CSV" = "true" ]; then
echo "=== Looking for parquet files ===" EXTRA="--concat_with_latest_csv"
find . -name "*.parquet" 2>/dev/null | head -20 || echo "No parquet files found" fi
python -m src.adsb.concat_parquet_to_final --date "$DATE" $EXTRA
ls -lah data/output/ || true
- name: Combine chunks to CSV - name: Upload final artifacts
run: |
mkdir -p data/output/adsb_chunks
ls -lah data/output/adsb_chunks/ || echo "Directory empty or does not exist"
python -m src.adsb.combine_chunks_to_csv --chunks-dir data/output/adsb_chunks ${{ inputs.date && format('--date {0}', inputs.date) || '' }}
ls -lah data/openairframes/
- name: Upload ADS-B artifacts
uses: actions/upload-artifact@v4 uses: actions/upload-artifact@v4
with: with:
name: adsb-release name: openairframes_adsb-${{ needs.resolve-dates.outputs.adsb_date }}
path: data/openairframes/openairframes_adsb_*.csv.gz path: data/output/openairframes_adsb_*
retention-days: 1 retention-days: 30
if-no-files-found: error
build-community: build-community:
runs-on: ubuntu-latest runs-on: ubuntu-latest
@@ -317,9 +233,14 @@ jobs:
create-release: create-release:
runs-on: ubuntu-latest runs-on: ubuntu-latest
needs: [build-faa, adsb-reduce, build-community, build-adsbexchange-json, build-mictronics-db] needs: [resolve-dates, build-faa, adsb-to-aircraft, adsb-reduce, build-community, build-adsbexchange-json, build-mictronics-db]
if: github.event_name != 'schedule' && !failure() && !cancelled() if: github.event_name != 'schedule' && !cancelled()
steps: steps:
- name: Check ADS-B workflow status
if: needs.adsb-to-aircraft.result != 'success' && needs.adsb-reduce.result != 'success'
run: |
echo "WARNING: ADS-B workflow failed (adsb-to-aircraft='${{ needs.adsb-to-aircraft.result }}', adsb-reduce='${{ needs.adsb-reduce.result }}'), will continue without ADS-B artifacts"
- name: Checkout for gh CLI - name: Checkout for gh CLI
uses: actions/checkout@v4 uses: actions/checkout@v4
with: with:
@@ -328,31 +249,33 @@ jobs:
sparse-checkout-cone-mode: false sparse-checkout-cone-mode: false
- name: Download FAA artifacts - name: Download FAA artifacts
uses: actions/download-artifact@v4 uses: actions/download-artifact@v5
with: with:
name: faa-release name: faa-release
path: artifacts/faa path: artifacts/faa
- name: Download ADS-B artifacts - name: Download ADS-B artifacts
uses: actions/download-artifact@v4 uses: actions/download-artifact@v5
if: needs.adsb-to-aircraft.result == 'success' || needs.adsb-reduce.result == 'success'
continue-on-error: true
with: with:
name: adsb-release name: openairframes_adsb-${{ needs.resolve-dates.outputs.adsb_date }}
path: artifacts/adsb path: artifacts/adsb
- name: Download Community artifacts - name: Download Community artifacts
uses: actions/download-artifact@v4 uses: actions/download-artifact@v5
with: with:
name: community-release name: community-release
path: artifacts/community path: artifacts/community
- name: Download ADS-B Exchange JSON artifact - name: Download ADS-B Exchange JSON artifact
uses: actions/download-artifact@v4 uses: actions/download-artifact@v5
with: with:
name: adsbexchange-json name: adsbexchange-json
path: artifacts/adsbexchange path: artifacts/adsbexchange
- name: Download Mictronics DB artifact - name: Download Mictronics DB artifact
uses: actions/download-artifact@v4 uses: actions/download-artifact@v5
continue-on-error: true continue-on-error: true
with: with:
name: mictronics-db name: mictronics-db
@@ -388,7 +311,11 @@ jobs:
# Find files from artifacts using find (handles nested structures) # Find files from artifacts using find (handles nested structures)
CSV_FILE_FAA=$(find artifacts/faa -name "openairframes_faa_*.csv" -type f 2>/dev/null | head -1) CSV_FILE_FAA=$(find artifacts/faa -name "openairframes_faa_*.csv" -type f 2>/dev/null | head -1)
CSV_FILE_ADSB=$(find artifacts/adsb -name "openairframes_adsb_*.csv.gz" -type f 2>/dev/null | head -1) # Prefer concatenated file (with date range) over single-day file
CSV_FILE_ADSB=$(find artifacts/adsb -name "openairframes_adsb_*_*.csv.gz" -type f 2>/dev/null | head -1)
if [ -z "$CSV_FILE_ADSB" ]; then
CSV_FILE_ADSB=$(find artifacts/adsb -name "openairframes_adsb_*.csv.gz" -type f 2>/dev/null | head -1)
fi
CSV_FILE_COMMUNITY=$(find artifacts/community -name "openairframes_community_*.csv" -type f 2>/dev/null | head -1) CSV_FILE_COMMUNITY=$(find artifacts/community -name "openairframes_community_*.csv" -type f 2>/dev/null | head -1)
ZIP_FILE=$(find artifacts/faa -name "ReleasableAircraft_*.zip" -type f 2>/dev/null | head -1) ZIP_FILE=$(find artifacts/faa -name "ReleasableAircraft_*.zip" -type f 2>/dev/null | head -1)
JSON_FILE_ADSBX=$(find artifacts/adsbexchange -name "basic-ac-db_*.json.gz" -type f 2>/dev/null | head -1) JSON_FILE_ADSBX=$(find artifacts/adsbexchange -name "basic-ac-db_*.json.gz" -type f 2>/dev/null | head -1)
@@ -399,9 +326,6 @@ jobs:
if [ -z "$CSV_FILE_FAA" ] || [ ! -f "$CSV_FILE_FAA" ]; then if [ -z "$CSV_FILE_FAA" ] || [ ! -f "$CSV_FILE_FAA" ]; then
MISSING_FILES="$MISSING_FILES FAA_CSV" MISSING_FILES="$MISSING_FILES FAA_CSV"
fi fi
if [ -z "$CSV_FILE_ADSB" ] || [ ! -f "$CSV_FILE_ADSB" ]; then
MISSING_FILES="$MISSING_FILES ADSB_CSV"
fi
if [ -z "$ZIP_FILE" ] || [ ! -f "$ZIP_FILE" ]; then if [ -z "$ZIP_FILE" ] || [ ! -f "$ZIP_FILE" ]; then
MISSING_FILES="$MISSING_FILES FAA_ZIP" MISSING_FILES="$MISSING_FILES FAA_ZIP"
fi fi
@@ -411,6 +335,11 @@ jobs:
# Optional files - warn but don't fail # Optional files - warn but don't fail
OPTIONAL_MISSING="" OPTIONAL_MISSING=""
if [ -z "$CSV_FILE_ADSB" ] || [ ! -f "$CSV_FILE_ADSB" ]; then
OPTIONAL_MISSING="$OPTIONAL_MISSING ADSB_CSV"
CSV_FILE_ADSB=""
CSV_BASENAME_ADSB=""
fi
if [ -z "$ZIP_FILE_MICTRONICS" ] || [ ! -f "$ZIP_FILE_MICTRONICS" ]; then if [ -z "$ZIP_FILE_MICTRONICS" ] || [ ! -f "$ZIP_FILE_MICTRONICS" ]; then
OPTIONAL_MISSING="$OPTIONAL_MISSING MICTRONICS_ZIP" OPTIONAL_MISSING="$OPTIONAL_MISSING MICTRONICS_ZIP"
ZIP_FILE_MICTRONICS="" ZIP_FILE_MICTRONICS=""
@@ -428,7 +357,9 @@ jobs:
# Get basenames for display # Get basenames for display
CSV_BASENAME_FAA=$(basename "$CSV_FILE_FAA") CSV_BASENAME_FAA=$(basename "$CSV_FILE_FAA")
CSV_BASENAME_ADSB=$(basename "$CSV_FILE_ADSB") if [ -n "$CSV_FILE_ADSB" ]; then
CSV_BASENAME_ADSB=$(basename "$CSV_FILE_ADSB")
fi
CSV_BASENAME_COMMUNITY=$(basename "$CSV_FILE_COMMUNITY" 2>/dev/null || echo "") CSV_BASENAME_COMMUNITY=$(basename "$CSV_FILE_COMMUNITY" 2>/dev/null || echo "")
ZIP_BASENAME=$(basename "$ZIP_FILE") ZIP_BASENAME=$(basename "$ZIP_FILE")
JSON_BASENAME_ADSBX=$(basename "$JSON_FILE_ADSBX") JSON_BASENAME_ADSBX=$(basename "$JSON_FILE_ADSBX")
@@ -483,7 +414,7 @@ jobs:
Assets: Assets:
- ${{ steps.meta.outputs.csv_basename_faa }} - ${{ steps.meta.outputs.csv_basename_faa }}
- ${{ steps.meta.outputs.csv_basename_adsb }} ${{ steps.meta.outputs.csv_basename_adsb && format('- {0}', steps.meta.outputs.csv_basename_adsb) || '' }}
- ${{ steps.meta.outputs.csv_basename_community }} - ${{ steps.meta.outputs.csv_basename_community }}
- ${{ steps.meta.outputs.zip_basename }} - ${{ steps.meta.outputs.zip_basename }}
- ${{ steps.meta.outputs.json_basename_adsbx }} - ${{ steps.meta.outputs.json_basename_adsbx }}
+10 -2
View File
@@ -16,11 +16,19 @@ A daily release is created at **06:00 UTC** and includes:
- **openairframes_community.csv** - **openairframes_community.csv**
All community submissions All community submissions
- **openairframes_adsb.csv**
Airframes dataset derived from ADSB.lol network data. For each UTC day, a row is created for every icao observed in that days ADS-B messages, using registration data from [tar1090-db](https://github.com/wiedehopf/tar1090-db) (ADSBExchange & Mictronics).
Example Usage:
```python
import pandas as pd
url = "https://github.com/PlaneQuery/OpenAirframes/releases/download/openairframes-2026-03-18-main/openairframes_adsb_2024-01-01_2026-03-17.csv.gz" # 1GB
df = pd.read_csv(url)
df
```
![](docs/images/df_adsb_example_0.png)
- **openairframes_faa.csv** - **openairframes_faa.csv**
All [FAA registration data](https://www.faa.gov/licenses_certificates/aircraft_certification/aircraft_registry/releasable_aircraft_download) from 2023-08-16 to present (~260 MB) All [FAA registration data](https://www.faa.gov/licenses_certificates/aircraft_certification/aircraft_registry/releasable_aircraft_download) from 2023-08-16 to present (~260 MB)
- **openairframes_adsb.csv**
Airframe information derived from ADS-B messages on the [ADSB.lol](https://www.adsb.lol/) network, from 2026-02-12 to present (will be from 2024-01-01 soon). The airframe information originates from [mictronics aircraft database](https://www.mictronics.de/aircraft-database/) (~5 MB).
- **ReleasableAircraft_{date}.zip** - **ReleasableAircraft_{date}.zip**
A daily snapshot of the FAA database, which updates at **05:30 UTC** A daily snapshot of the FAA database, which updates at **05:30 UTC**
@@ -0,0 +1,40 @@
[
{
"contributor_name": "JohnSmith.com",
"contributor_uuid": "2981c3ee-8712-5f96-84bf-732eda515a3f",
"creation_timestamp": "2026-02-18T22:18:11.349009+00:00",
"registration_number": "ZM146",
"tags": {
"citation_0": "https://assets.publishing.service.gov.uk/media/5c07a65f40f0b6705f11cf37/10389.pdf",
"icao_aircraft_type": "L1J",
"manufacturer_icao": "LOCKHEED MARTIN",
"manufacturer_name": "Lockheed-martin",
"model": "F-35B Lightning II",
"operator": "Royal Air Force",
"operator_callsign": "RAFAIR",
"operator_icao": "RFR",
"serial_number": "BK-12",
"type_code": "VF35"
},
"transponder_code_hex": "43C81C"
},
{
"contributor_name": "JohnSmith.com",
"contributor_uuid": "2981c3ee-8712-5f96-84bf-732eda515a3f",
"creation_timestamp": "2026-02-18T22:18:11.349009+00:00",
"registration_number": "ZM148",
"tags": {
"citation_0": "https://assets.publishing.service.gov.uk/media/5c07a65f40f0b6705f11cf37/10389.pdf",
"icao_aircraft_type": "L1J",
"manufacturer_icao": "LOCKHEED MARTIN",
"manufacturer_name": "Lockheed-martin",
"model": "F-35B Lightning II",
"operator": "Royal Air Force",
"operator_callsign": "RAFAIR",
"operator_icao": "RFR",
"serial_number": "BK-14",
"type_code": "VF35"
},
"transponder_code_hex": "43C811"
}
]
Binary file not shown.

After

Width:  |  Height:  |  Size: 99 KiB

+32 -1
View File
@@ -54,7 +54,38 @@
"additionalProperties": { "additionalProperties": {
"$ref": "#/$defs/tagValue" "$ref": "#/$defs/tagValue"
}, },
"properties": {} "properties": {
"citation_0": {
"type": "string"
},
"icao_aircraft_type": {
"type": "string"
},
"manufacturer_icao": {
"type": "string"
},
"manufacturer_name": {
"type": "string"
},
"model": {
"type": "string"
},
"operator": {
"type": "string"
},
"operator_callsign": {
"type": "string"
},
"operator_icao": {
"type": "string"
},
"serial_number": {
"type": "string"
},
"type_code": {
"type": "string"
}
}
} }
}, },
"allOf": [ "allOf": [
+49
View File
@@ -0,0 +1,49 @@
#!/usr/bin/env python3
import re
from pathlib import Path
import polars as pl
# Find all CSV.gz files in the downloaded artifacts
artifacts_dir = Path("downloads/adsb_artifacts")
files = sorted(artifacts_dir.glob("*/openairframes_adsb_*.csv.gz"))
if not files:
raise SystemExit("No CSV.gz files found in downloads/adsb_artifacts/")
print(f"Found {len(files)} files to concatenate")
# Extract dates from filenames to determine range
def extract_dates(path: Path) -> tuple[str, str]:
"""Extract start and end dates from filename"""
m = re.search(r"openairframes_adsb_(\d{4}-\d{2}-\d{2})_(\d{4}-\d{2}-\d{2})\.csv\.gz", path.name)
if m:
return m.group(1), m.group(2)
return None, None
# Collect all dates
all_dates = []
for f in files:
start, end = extract_dates(f)
if start and end:
all_dates.extend([start, end])
print(f" {f.name}: {start} to {end}")
if not all_dates:
raise SystemExit("Could not extract dates from filenames")
# Find earliest and latest dates
earliest = min(all_dates)
latest = max(all_dates)
print(f"\nDate range: {earliest} to {latest}")
# Read and concatenate all files
print("\nReading and concatenating files...")
frames = [pl.read_csv(f) for f in files]
df = pl.concat(frames, how="vertical", rechunk=True)
# Write output
output_path = Path("downloads") / f"openairframes_adsb_{earliest}_{latest}.csv.gz"
output_path.parent.mkdir(parents=True, exist_ok=True)
df.write_csv(output_path, compression="gzip")
print(f"\nWrote {output_path} with {df.height:,} rows")
+40
View File
@@ -0,0 +1,40 @@
#!/bin/bash
# Create download directory
mkdir -p downloads/adsb_artifacts
# Repository from the workflow comment
REPO="ggman12/OpenAirframes"
# Get last 15 runs of the workflow and download matching artifacts
gh run list \
--repo "$REPO" \
--workflow adsb-to-aircraft-multiple-day-run.yaml \
--limit 15 \
--json databaseId \
--jq '.[].databaseId' | while read -r run_id; do
echo "Checking run ID: $run_id"
# List artifacts for this run using the API
# Match pattern: openairframes_adsb-YYYY-MM-DD-YYYY-MM-DD (with second date)
gh api \
--paginate \
"repos/$REPO/actions/runs/$run_id/artifacts" \
--jq '.artifacts[] | select(.name | test("^openairframes_adsb-[0-9]{4}-[0-9]{2}-[0-9]{2}-[0-9]{4}-[0-9]{2}-[0-9]{2}$")) | .name' | while read -r artifact_name; do
# Check if artifact directory already exists and has files
if [ -d "downloads/adsb_artifacts/$artifact_name" ] && [ -n "$(ls -A "downloads/adsb_artifacts/$artifact_name" 2>/dev/null)" ]; then
echo " Skipping (already exists): $artifact_name"
continue
fi
echo " Downloading: $artifact_name"
gh run download "$run_id" \
--repo "$REPO" \
--name "$artifact_name" \
--dir "downloads/adsb_artifacts/$artifact_name"
done
done
echo "Download complete! Files saved to downloads/adsb_artifacts/"
+40 -34
View File
@@ -1,6 +1,6 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
""" """
Script to trigger historical-adsb workflow runs in 15-day chunks. Script to trigger adsb-to-aircraft-multiple-day-run workflow runs in monthly chunks.
Usage: Usage:
python scripts/run_historical_adsb_action.py --start-date 2025-01-01 --end-date 2025-06-01 python scripts/run_historical_adsb_action.py --start-date 2025-01-01 --end-date 2025-06-01
@@ -10,10 +10,14 @@ import argparse
import subprocess import subprocess
import sys import sys
from datetime import datetime, timedelta from datetime import datetime, timedelta
from calendar import monthrange
def generate_date_chunks(start_date_str, end_date_str, chunk_days=15): def generate_monthly_chunks(start_date_str, end_date_str):
"""Generate date ranges in fixed-day chunks from start to end date.""" """Generate date ranges in monthly chunks from start to end date.
End dates are exclusive (e.g., to process Jan 1-31, end_date should be Feb 1).
"""
start_date = datetime.strptime(start_date_str, '%Y-%m-%d') start_date = datetime.strptime(start_date_str, '%Y-%m-%d')
end_date = datetime.strptime(end_date_str, '%Y-%m-%d') end_date = datetime.strptime(end_date_str, '%Y-%m-%d')
@@ -21,31 +25,35 @@ def generate_date_chunks(start_date_str, end_date_str, chunk_days=15):
current = start_date current = start_date
while current < end_date: while current < end_date:
# Calculate end of current chunk # Get the first day of the next month (exclusive end)
chunk_end = current + timedelta(days=chunk_days) _, days_in_month = monthrange(current.year, current.month)
month_end = current.replace(day=days_in_month)
next_month_start = month_end + timedelta(days=1)
# Don't go past the global end date # Don't go past the global end date
if chunk_end > end_date: chunk_end = min(next_month_start, end_date)
chunk_end = end_date
chunks.append({ chunks.append({
'start': current.strftime('%Y-%m-%d'), 'start': current.strftime('%Y-%m-%d'),
'end': chunk_end.strftime('%Y-%m-%d') 'end': chunk_end.strftime('%Y-%m-%d')
}) })
current = chunk_end # Move to first day of next month
if next_month_start >= end_date:
break
current = next_month_start
return chunks return chunks
def trigger_workflow(start_date, end_date, chunk_days=1, branch='main', dry_run=False): def trigger_workflow(start_date, end_date, repo='ggman12/OpenAirframes', branch='main', dry_run=False):
"""Trigger the historical-adsb workflow via GitHub CLI.""" """Trigger the adsb-to-aircraft-multiple-day-run workflow via GitHub CLI."""
cmd = [ cmd = [
'gh', 'workflow', 'run', 'historical-adsb.yaml', 'gh', 'workflow', 'run', 'adsb-to-aircraft-multiple-day-run.yaml',
'--repo', repo,
'--ref', branch, '--ref', branch,
'-f', f'start_date={start_date}', '-f', f'start_date={start_date}',
'-f', f'end_date={end_date}', '-f', f'end_date={end_date}'
'-f', f'chunk_days={chunk_days}'
] ]
if dry_run: if dry_run:
@@ -66,7 +74,8 @@ def trigger_workflow(start_date, end_date, chunk_days=1, branch='main', dry_run=
# Get the most recent run (should be the one we just triggered) # Get the most recent run (should be the one we just triggered)
list_cmd = [ list_cmd = [
'gh', 'run', 'list', 'gh', 'run', 'list',
'--workflow', 'historical-adsb.yaml', '--repo', repo,
'--workflow', 'adsb-to-aircraft-multiple-day-run.yaml',
'--branch', branch, '--branch', branch,
'--limit', '1', '--limit', '1',
'--json', 'databaseId', '--json', 'databaseId',
@@ -84,29 +93,25 @@ def trigger_workflow(start_date, end_date, chunk_days=1, branch='main', dry_run=
def main(): def main():
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
description='Trigger historical-adsb workflow runs in monthly chunks' description='Trigger adsb-to-aircraft-multiple-day-run workflow runs in monthly chunks'
) )
parser.add_argument( parser.add_argument(
'--start-date', '--start-date', '--start_date',
dest='start_date',
required=True, required=True,
help='Start date in YYYY-MM-DD format (inclusive)' help='Start date in YYYY-MM-DD format (inclusive)'
) )
parser.add_argument( parser.add_argument(
'--end-date', '--end-date', '--end_date',
dest='end_date',
required=True, required=True,
help='End date in YYYY-MM-DD format (exclusive)' help='End date in YYYY-MM-DD format (exclusive)'
) )
parser.add_argument( parser.add_argument(
'--chunk-days', '--repo',
type=int, type=str,
default=1, default='ggman12/OpenAirframes',
help='Days per job chunk within each workflow run (default: 1)' help='GitHub repository (default: ggman12/OpenAirframes)'
)
parser.add_argument(
'--workflow-chunk-days',
type=int,
default=15,
help='Days per workflow run (default: 15)'
) )
parser.add_argument( parser.add_argument(
'--branch', '--branch',
@@ -132,17 +137,17 @@ def main():
try: try:
start = datetime.strptime(args.start_date, '%Y-%m-%d') start = datetime.strptime(args.start_date, '%Y-%m-%d')
end = datetime.strptime(args.end_date, '%Y-%m-%d') end = datetime.strptime(args.end_date, '%Y-%m-%d')
if start >= end: if start > end:
print("Error: start_date must be before end_date") print("Error: start_date must be before or equal to end_date")
sys.exit(1) sys.exit(1)
except ValueError as e: except ValueError as e:
print(f"Error: Invalid date format - {e}") print(f"Error: Invalid date format - {e}")
sys.exit(1) sys.exit(1)
# Generate date chunks # Generate monthly chunks
chunks = generate_date_chunks(args.start_date, args.end_date, chunk_days=args.workflow_chunk_days) chunks = generate_monthly_chunks(args.start_date, args.end_date)
print(f"\nGenerating {len(chunks)} workflow runs ({args.workflow_chunk_days} days each) on branch '{args.branch}':") print(f"\nGenerating {len(chunks)} monthly workflow runs on branch '{args.branch}' (repo: {args.repo}):")
for i, chunk in enumerate(chunks, 1): for i, chunk in enumerate(chunks, 1):
print(f" {i}. {chunk['start']} to {chunk['end']}") print(f" {i}. {chunk['start']} to {chunk['end']}")
@@ -165,7 +170,7 @@ def main():
success, run_id = trigger_workflow( success, run_id = trigger_workflow(
chunk['start'], chunk['start'],
chunk['end'], chunk['end'],
chunk_days=args.chunk_days, repo=args.repo,
branch=args.branch, branch=args.branch,
dry_run=args.dry_run dry_run=args.dry_run
) )
@@ -189,11 +194,12 @@ def main():
if triggered_runs and not args.dry_run: if triggered_runs and not args.dry_run:
import json import json
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
runs_file = f"./triggered_runs_{timestamp}.json" runs_file = f"./output/triggered_runs_{timestamp}.json"
with open(runs_file, 'w') as f: with open(runs_file, 'w') as f:
json.dump({ json.dump({
'start_date': args.start_date, 'start_date': args.start_date,
'end_date': args.end_date, 'end_date': args.end_date,
'repo': args.repo,
'branch': args.branch, 'branch': args.branch,
'runs': triggered_runs 'runs': triggered_runs
}, f, indent=2) }, f, indent=2)
+82
View File
@@ -0,0 +1,82 @@
#!/usr/bin/env python3
"""
Run src.adsb.main in an isolated git worktree so edits in the main
working tree won't affect subprocess imports during the run.
Usage:
python scripts/run_main_isolated.py 2026-01-01
python scripts/run_main_isolated.py --start_date 2026-01-01 --end_date 2026-01-03
"""
import argparse
import os
import shutil
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path
def run(
cmd: list[str],
*,
cwd: Path | None = None,
check: bool = True,
) -> subprocess.CompletedProcess:
print(f"\n>>> {' '.join(cmd)}")
return subprocess.run(cmd, cwd=cwd, check=check)
def main() -> int:
parser = argparse.ArgumentParser(description="Run src.adsb.main in an isolated worktree")
parser.add_argument("date", nargs="?", help="Single date to process (YYYY-MM-DD)")
parser.add_argument("--start_date", help="Start date (inclusive, YYYY-MM-DD)")
parser.add_argument("--end_date", help="End date (exclusive, YYYY-MM-DD)")
parser.add_argument("--concat_with_latest_csv", action="store_true", help="Also concatenate with latest CSV from GitHub releases")
args = parser.parse_args()
if args.date and (args.start_date or args.end_date):
raise SystemExit("Use a single date or --start_date/--end_date, not both.")
if args.date:
datetime.strptime(args.date, "%Y-%m-%d")
main_args = ["--date", args.date]
else:
if not args.start_date or not args.end_date:
raise SystemExit("Provide --start_date and --end_date, or a single date.")
datetime.strptime(args.start_date, "%Y-%m-%d")
datetime.strptime(args.end_date, "%Y-%m-%d")
main_args = ["--start_date", args.start_date, "--end_date", args.end_date]
if args.concat_with_latest_csv:
main_args.append("--concat_with_latest_csv")
repo_root = Path(__file__).resolve().parents[1]
snapshots_root = repo_root / ".snapshots"
snapshots_root.mkdir(exist_ok=True)
timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
snapshot_root = snapshots_root / f"run_{timestamp}"
snapshot_src = snapshot_root / "src"
exit_code = 0
try:
shutil.copytree(repo_root / "src", snapshot_src)
runner = (
"import sys, runpy; "
f"sys.path.insert(0, {repr(str(snapshot_root))}); "
f"sys.argv = ['src.adsb.main'] + {main_args!r}; "
"runpy.run_module('src.adsb.main', run_name='__main__')"
)
cmd = [sys.executable, "-c", runner]
run(cmd, cwd=repo_root)
except subprocess.CalledProcessError as exc:
exit_code = exc.returncode
finally:
shutil.rmtree(snapshot_root, ignore_errors=True)
return exit_code
if __name__ == "__main__":
raise SystemExit(main())
+242
View File
@@ -0,0 +1,242 @@
#!/usr/bin/env python3
"""
Parse TheAirTraffic Database CSV and produce community_submission.v1 JSON.
Source: "TheAirTraffic Database - Aircraft 2.csv"
Output: community/YYYY-MM-DD/theairtraffic_<date>_<hash>.json
Categories in the spreadsheet columns (paired: name, registrations, separator):
Col 1-3: Business
Col 4-6: Government
Col 7-9: People
Col 10-12: Sports
Col 13-15: Celebrity
Col 16-18: State Govt./Law
Col 19-21: Other
Col 22-24: Test Aircraft
Col 25-27: YouTubers
Col 28-30: Formula 1 VIP's
Col 31-33: Active GII's and GIII's (test/demo aircraft)
Col 34-37: Russia & Ukraine (extra col for old/new)
Col 38-40: Helicopters & Blimps
Col 41-43: Unique Reg's
Col 44-46: Saudi & UAE
Col 47-49: Schools
Col 50-52: Special Charter
Col 53-55: Unknown Owners
Col 56-59: Frequent Flyers (extra cols: name, aircraft, logged, hours)
"""
import csv
import json
import hashlib
import re
import sys
import uuid
from datetime import datetime, timezone
from pathlib import Path
# ── Category mapping ────────────────────────────────────────────────────────
# Each entry: (name_col, reg_col, owner_category_tags)
# owner_category_tags is a dict of tag keys to add beyond "owner"
CATEGORY_COLUMNS = [
# (name_col, reg_col, {tag_key: tag_value, ...})
(1, 2, {"owner_category_0": "business"}),
(4, 5, {"owner_category_0": "government"}),
(7, 8, {"owner_category_0": "celebrity"}),
(10, 11, {"owner_category_0": "sports"}),
(13, 14, {"owner_category_0": "celebrity"}),
(16, 17, {"owner_category_0": "government", "owner_category_1": "law_enforcement"}),
(19, 20, {"owner_category_0": "other"}),
(22, 23, {"owner_category_0": "test_aircraft"}),
(25, 26, {"owner_category_0": "youtuber", "owner_category_1": "celebrity"}),
(28, 29, {"owner_category_0": "celebrity", "owner_category_1": "motorsport"}),
(31, 32, {"owner_category_0": "test_aircraft"}),
# Russia & Ukraine: col 34=name, col 35 or 36 may have reg
(34, 35, {"owner_category_0": "russia_ukraine"}),
(38, 39, {"owner_category_0": "celebrity", "category": "helicopter_or_blimp"}),
(41, 42, {"owner_category_0": "other"}),
(44, 45, {"owner_category_0": "government", "owner_category_1": "royal_family"}),
(47, 48, {"owner_category_0": "education"}),
(50, 51, {"owner_category_0": "charter"}),
(53, 54, {"owner_category_0": "unknown"}),
(56, 57, {"owner_category_0": "celebrity"}), # Frequent Flyers name col, aircraft col
]
# First data row index (0-based) in the CSV
DATA_START_ROW = 4
# ── Contributor info ────────────────────────────────────────────────────────
CONTRIBUTOR_NAME = "TheAirTraffic"
# Deterministic UUID v5 from contributor name
CONTRIBUTOR_UUID = str(uuid.uuid5(uuid.NAMESPACE_URL, "https://theairtraffic.com"))
# Citation
CITATION = "https://docs.google.com/spreadsheets/d/1JHhfJBnJPNBA6TgiSHjkXFkHBdVTTz_nXxaUDRWcHpk"
def looks_like_military_serial(reg: str) -> bool:
"""
Detect military-style serials like 92-9000, 82-8000, 98-0001
or pure numeric IDs like 929000, 828000, 980001.
These aren't standard civil registrations; use openairframes_id.
"""
# Pattern: NN-NNNN
if re.match(r'^\d{2}-\d{4}$', reg):
return True
# Pure 6-digit numbers (likely ICAO hex or military mode-S)
if re.match(r'^\d{6}$', reg):
return True
# Short numeric-only (1-5 digits) like "01", "02", "676"
if re.match(r'^\d{1,5}$', reg):
return True
return False
def normalize_reg(raw: str) -> str:
"""Clean up a registration string."""
reg = raw.strip().rstrip(',').strip()
# Remove carriage returns and other whitespace
reg = reg.replace('\r', '').replace('\n', '').strip()
return reg
def parse_regs(cell_value: str) -> list[str]:
"""
Parse a cell that may contain one or many registrations,
separated by commas, possibly wrapped in quotes.
"""
if not cell_value or not cell_value.strip():
return []
# Some cells have ADS-B exchange URLs skip those
if 'globe.adsbexchange.com' in cell_value:
return []
if cell_value.strip() in ('.', ',', ''):
return []
results = []
# Split on comma
parts = cell_value.split(',')
for part in parts:
reg = normalize_reg(part)
if not reg:
continue
# Skip URLs, section labels, etc.
if reg.startswith('http') or reg.startswith('Link') or reg == 'Section 1':
continue
# Skip if it's just whitespace or dots
if reg in ('.', '..', '...'):
continue
results.append(reg)
return results
def make_submission(
reg: str,
owner: str,
category_tags: dict[str, str],
) -> dict:
"""Build a single community_submission.v1 object."""
entry: dict = {}
# Decide identifier field
if looks_like_military_serial(reg):
entry["openairframes_id"] = reg
else:
entry["registration_number"] = reg
# Tags
tags: dict = {
"citation_0": CITATION,
}
if owner:
tags["owner"] = owner.strip()
tags.update(category_tags)
entry["tags"] = tags
return entry
def main():
csv_path = Path(sys.argv[1]) if len(sys.argv) > 1 else Path(
"/Users/jonahgoode/Downloads/TheAirTraffic Database - Aircraft 2.csv"
)
if not csv_path.exists():
print(f"ERROR: CSV not found at {csv_path}", file=sys.stderr)
sys.exit(1)
# Read CSV
with open(csv_path, 'r', encoding='utf-8-sig') as f:
reader = csv.reader(f)
rows = list(reader)
print(f"Read {len(rows)} rows from {csv_path.name}")
date_str = datetime.now(timezone.utc).strftime("%Y-%m-%d")
submissions: list[dict] = []
seen: set[tuple] = set() # (reg, owner) dedup
for row_idx in range(DATA_START_ROW, len(rows)):
row = rows[row_idx]
if len(row) < 3:
continue
for name_col, reg_col, cat_tags in CATEGORY_COLUMNS:
if reg_col >= len(row) or name_col >= len(row):
continue
owner_raw = row[name_col].strip().rstrip(',').strip()
reg_raw = row[reg_col]
# Clean owner name
owner = owner_raw.replace('\r', '').replace('\n', '').strip()
if not owner or owner in ('.', ',', 'Section 1'):
continue
# Skip header-like values
if owner.startswith('http') or owner.startswith('Link '):
continue
regs = parse_regs(reg_raw)
if not regs:
# For Russia & Ukraine, try the next column too (col 35 might have old reg, col 36 new)
if name_col == 34 and reg_col + 1 < len(row):
regs = parse_regs(row[reg_col + 1])
for reg in regs:
key = (reg, owner)
if key in seen:
continue
seen.add(key)
submissions.append(make_submission(reg, owner, cat_tags))
print(f"Generated {len(submissions)} submissions")
# Write output
proj_root = Path(__file__).resolve().parent.parent
out_dir = proj_root / "community" / date_str
out_dir.mkdir(parents=True, exist_ok=True)
out_file = out_dir / f"theairtraffic_{date_str}.json"
with open(out_file, 'w', encoding='utf-8') as f:
json.dump(submissions, f, indent=2, ensure_ascii=False)
print(f"Written to {out_file}")
print(f"Sample entry:\n{json.dumps(submissions[0], indent=2)}")
# Quick stats
cats = {}
for s in submissions:
c = s['tags'].get('owner_category_0', 'NONE')
cats[c] = cats.get(c, 0) + 1
print("\nCategory breakdown:")
for c, n in sorted(cats.items(), key=lambda x: -x[1]):
print(f" {c}: {n}")
if __name__ == "__main__":
main()
+69
View File
@@ -0,0 +1,69 @@
#!/usr/bin/env python3
"""Validate the generated theairtraffic JSON output."""
import json
import glob
import sys
# Find the latest output
files = sorted(glob.glob("community/2026-02-*/theairtraffic_*.json"))
if not files:
print("No output files found!")
sys.exit(1)
path = files[-1]
print(f"Validating: {path}")
with open(path) as f:
data = json.load(f)
print(f"Total entries: {len(data)}")
# Check military serial handling
mil = [d for d in data if "openairframes_id" in d]
print(f"\nEntries using openairframes_id: {len(mil)}")
for m in mil[:10]:
print(f" {m['openairframes_id']} -> owner: {m['tags'].get('owner','?')}")
# Check youtuber entries
yt = [d for d in data if d["tags"].get("owner_category_0") == "youtuber"]
print(f"\nYouTuber entries: {len(yt)}")
for y in yt[:5]:
reg = y.get("registration_number", y.get("openairframes_id"))
c0 = y["tags"].get("owner_category_0")
c1 = y["tags"].get("owner_category_1")
print(f" {reg} -> owner: {y['tags']['owner']}, cat0: {c0}, cat1: {c1}")
# Check US Govt / military
gov = [d for d in data if d["tags"].get("owner") == "United States of America 747/757"]
print(f"\nUSA 747/757 entries: {len(gov)}")
for g in gov:
oid = g.get("openairframes_id", g.get("registration_number"))
print(f" {oid}")
# Schema validation
issues = 0
for i, d in enumerate(data):
has_id = any(k in d for k in ["registration_number", "transponder_code_hex", "openairframes_id"])
if not has_id:
print(f" Entry {i}: no identifier!")
issues += 1
if "tags" not in d:
print(f" Entry {i}: no tags!")
issues += 1
# Check tag key format
for k in d.get("tags", {}):
import re
if not re.match(r"^[a-z][a-z0-9_]{0,63}$", k):
print(f" Entry {i}: invalid tag key '{k}'")
issues += 1
print(f"\nSchema issues: {issues}")
# Category breakdown
cats = {}
for s in data:
c = s["tags"].get("owner_category_0", "NONE")
cats[c] = cats.get(c, 0) + 1
print("\nCategory breakdown:")
for c, n in sorted(cats.items(), key=lambda x: -x[1]):
print(f" {c}: {n}")
+2 -51
View File
@@ -170,7 +170,7 @@ def load_parquet_part(part_id: int, date: str) -> pl.DataFrame:
# Convert to timezone-naive datetime # Convert to timezone-naive datetime
if df["time"].dtype == pl.Datetime: if df["time"].dtype == pl.Datetime:
df = df.with_columns(pl.col("time").dt.replace_time_zone(None)) df = df.with_columns(pl.col("time").dt.replace_time_zone(None))
os.remove(parquet_file)
return df return df
@@ -194,53 +194,4 @@ def concat_compressed_dfs(df_base, df_new):
"""Concatenate base and new compressed dataframes, keeping the most informative row per ICAO.""" """Concatenate base and new compressed dataframes, keeping the most informative row per ICAO."""
# Combine both dataframes # Combine both dataframes
df_combined = pl.concat([df_base, df_new]) df_combined = pl.concat([df_base, df_new])
return df_combined
# Sort by ICAO and time
df_combined = df_combined.sort(['icao', 'time'])
# Fill null values
for col in COLUMNS:
if col in df_combined.columns:
df_combined = df_combined.with_columns(pl.col(col).fill_null(""))
# Apply compression logic per ICAO to get the best row
icao_groups = df_combined.partition_by('icao', as_dict=True, maintain_order=True)
compressed_dfs = []
for icao_key, group_df in icao_groups.items():
icao = icao_key[0]
compressed = compress_df_polars(group_df, str(icao))
compressed_dfs.append(compressed)
if compressed_dfs:
df_compressed = pl.concat(compressed_dfs)
else:
df_compressed = df_combined.head(0)
# Sort by time
df_compressed = df_compressed.sort('time')
return df_compressed
def get_latest_aircraft_adsb_csv_df():
"""Download and load the latest ADS-B CSV from GitHub releases."""
from get_latest_release import download_latest_aircraft_adsb_csv
import re
csv_path = download_latest_aircraft_adsb_csv()
df = pl.read_csv(csv_path, null_values=[""])
# Fill nulls with empty strings
for col in df.columns:
if df[col].dtype == pl.Utf8:
df = df.with_columns(pl.col(col).fill_null(""))
# Extract start date from filename pattern: openairframes_adsb_{start_date}_{end_date}.csv[.gz]
match = re.search(r"openairframes_adsb_(\d{4}-\d{2}-\d{2})_", str(csv_path))
if not match:
raise ValueError(f"Could not extract date from filename: {csv_path.name}")
date_str = match.group(1)
return df, date_str
+48 -15
View File
@@ -1,34 +1,67 @@
from pathlib import Path from pathlib import Path
import polars as pl import polars as pl
import argparse import argparse
import os
OUTPUT_DIR = Path("./outputs") OUTPUT_DIR = Path("./data/output")
CORRECT_ORDER_OF_COLUMNS = ["time", "icao", "r", "t", "dbFlags", "ownOp", "year", "desc", "aircraft_category"]
def main(): def main():
parser = argparse.ArgumentParser(description="Concatenate compressed parquet files for a single day") parser = argparse.ArgumentParser(description="Concatenate compressed parquet files for a single day")
parser.add_argument("--date", type=str, required=True, help="Date in YYYY-MM-DD format") parser.add_argument("--date", type=str, required=True, help="Date in YYYY-MM-DD format")
parser.add_argument("--concat_with_latest_csv", action="store_true", help="Whether to also concatenate with the latest CSV from GitHub releases")
args = parser.parse_args() args = parser.parse_args()
compressed_dir = OUTPUT_DIR / "compressed" compressed_dir = OUTPUT_DIR / "compressed"
date_dir = compressed_dir / args.date date_dir = compressed_dir / args.date
if not date_dir.is_dir():
raise FileNotFoundError(f"No date folder found: {date_dir}")
parquet_files = sorted(date_dir.glob("*.parquet")) parquet_files = sorted(date_dir.glob("*.parquet"))
if not parquet_files: df = None
raise FileNotFoundError(f"No parquet files found in {date_dir}") if parquet_files: # TODO: This logic could be updated slightly.
print(f"No parquet files found in {date_dir}")
frames = [pl.read_parquet(p) for p in parquet_files] frames = [pl.read_parquet(p) for p in parquet_files]
df = pl.concat(frames, how="vertical", rechunk=True) df = pl.concat(frames, how="vertical", rechunk=True)
df = df.sort(["time", "icao"]) df = df.sort(["time", "icao"])
output_path = OUTPUT_DIR / f"openairframes_adsb_{args.date}_{args.date}.parquet" df = df.select(CORRECT_ORDER_OF_COLUMNS)
print(f"Writing combined parquet to {output_path} with {df.height} rows")
df.write_parquet(output_path) output_path = OUTPUT_DIR / f"openairframes_adsb_{args.date}.parquet"
print(f"Writing combined parquet to {output_path} with {df.height} rows")
df.write_parquet(output_path)
csv_output_path = OUTPUT_DIR / f"openairframes_adsb_{args.date}_{args.date}.csv" csv_output_path = OUTPUT_DIR / f"openairframes_adsb_{args.date}.csv.gz"
print(f"Writing combined csv to {csv_output_path} with {df.height} rows") print(f"Writing combined csv.gz to {csv_output_path} with {df.height} rows")
df.write_csv(csv_output_path) df.write_csv(csv_output_path, compression="gzip")
if args.concat_with_latest_csv:
print("Loading latest CSV from GitHub releases to concatenate with...")
from src.get_latest_release import get_latest_aircraft_adsb_csv_df
from datetime import datetime
df_latest_csv, csv_start_date, csv_end_date = get_latest_aircraft_adsb_csv_df()
# Compare dates: end_date is exclusive, so if csv_end_date > args.date,
# the latest CSV already includes this day's data
csv_end_dt = datetime.strptime(csv_end_date, "%Y-%m-%d")
args_dt = datetime.strptime(args.date, "%Y-%m-%d")
if df is None or csv_end_dt >= args_dt:
print(f"Latest CSV already includes data through {args.date} (end_date={csv_end_date} is exclusive)")
print("Writing latest CSV directly without concatenation to avoid duplicates")
os.makedirs(OUTPUT_DIR, exist_ok=True)
final_csv_output_path = OUTPUT_DIR / f"openairframes_adsb_{csv_start_date}_{csv_end_date}.csv.gz"
df_latest_csv = df_latest_csv.select(CORRECT_ORDER_OF_COLUMNS)
df_latest_csv.write_csv(final_csv_output_path, compression="gzip")
else:
print(f"Concatenating latest CSV (through {csv_end_date}) with new data ({args.date})")
# Ensure column order matches before concatenating
df_latest_csv = df_latest_csv.select(CORRECT_ORDER_OF_COLUMNS)
from src.adsb.compress_adsb_to_aircraft_data import concat_compressed_dfs
df_final = concat_compressed_dfs(df_latest_csv, df)
df_final = df_final.select(CORRECT_ORDER_OF_COLUMNS)
final_csv_output_path = OUTPUT_DIR / f"openairframes_adsb_{csv_start_date}_{args.date}.csv.gz"
df_final.write_csv(final_csv_output_path, compression="gzip")
print(f"Final CSV written to {final_csv_output_path}")
if __name__ == "__main__": if __name__ == "__main__":
main() main()
+112 -39
View File
@@ -16,7 +16,7 @@ import sys
import urllib.error import urllib.error
import urllib.request import urllib.request
from datetime import datetime from datetime import datetime
import time
import orjson import orjson
import pyarrow as pa import pyarrow as pa
import pyarrow.parquet as pq import pyarrow.parquet as pq
@@ -88,7 +88,7 @@ def _fetch_releases_from_repo(year: str, version_date: str) -> list:
else: else:
print(f"Failed to fetch releases (attempt {attempt}/{max_retries}): {response.status} {response.reason}") print(f"Failed to fetch releases (attempt {attempt}/{max_retries}): {response.status} {response.reason}")
if attempt < max_retries: if attempt < max_retries:
print(f"Waiting {retry_delay} seconds before retry...") print(f"Waiting {retry_delay} seconds before retry")
time.sleep(retry_delay) time.sleep(retry_delay)
else: else:
print(f"Giving up after {max_retries} attempts") print(f"Giving up after {max_retries} attempts")
@@ -96,7 +96,7 @@ def _fetch_releases_from_repo(year: str, version_date: str) -> list:
except Exception as e: except Exception as e:
print(f"Request exception (attempt {attempt}/{max_retries}): {e}") print(f"Request exception (attempt {attempt}/{max_retries}): {e}")
if attempt < max_retries: if attempt < max_retries:
print(f"Waiting {retry_delay} seconds before retry...") print(f"Waiting {retry_delay} seconds before retry")
time.sleep(retry_delay) time.sleep(retry_delay)
else: else:
print(f"Giving up after {max_retries} attempts") print(f"Giving up after {max_retries} attempts")
@@ -123,47 +123,105 @@ def fetch_releases(version_date: str) -> list:
# For last day of year, also check next year's repo if nothing found # For last day of year, also check next year's repo if nothing found
if not releases and version_date.endswith(".12.31"): if not releases and version_date.endswith(".12.31"):
next_year = str(int(year) + 1) next_year = str(int(year) + 1)
print(f"No releases found for {version_date} in {year} repo, checking {next_year} repo...") print(f"No releases found for {version_date} in {year} repo, checking {next_year} repo")
releases = _fetch_releases_from_repo(next_year, version_date) releases = _fetch_releases_from_repo(next_year, version_date)
return releases return releases
def download_asset(asset_url: str, file_path: str) -> bool: def download_asset(asset_url: str, file_path: str, expected_size: int | None = None) -> bool:
"""Download a single release asset.""" """Download a single release asset with size verification.
Args:
asset_url: URL to download from
file_path: Local path to save to
expected_size: Expected file size in bytes (for verification)
Returns:
True if download succeeded and size matches (if provided), False otherwise
"""
os.makedirs(os.path.dirname(file_path) or OUTPUT_DIR, exist_ok=True) os.makedirs(os.path.dirname(file_path) or OUTPUT_DIR, exist_ok=True)
# Check if file exists and has correct size
if os.path.exists(file_path): if os.path.exists(file_path):
print(f"[SKIP] {file_path} already downloaded.") if expected_size is not None:
return True actual_size = os.path.getsize(file_path)
if actual_size == expected_size:
print(f"Downloading {asset_url}...") print(f"[SKIP] {file_path} already downloaded and verified ({actual_size} bytes).")
try:
signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(40) # 40-second timeout
req = urllib.request.Request(asset_url, headers=HEADERS)
with urllib.request.urlopen(req) as response:
signal.alarm(0)
if response.status == 200:
with open(file_path, "wb") as file:
while True:
chunk = response.read(8192)
if not chunk:
break
file.write(chunk)
print(f"Saved {file_path}")
return True return True
else: else:
print(f"Failed to download {asset_url}: {response.status} {response.msg}") print(f"[WARN] {file_path} exists but size mismatch (expected {expected_size}, got {actual_size}). Re-downloading.")
os.remove(file_path)
else:
print(f"[SKIP] {file_path} already downloaded.")
return True
max_retries = 2
retry_delay = 30
timeout_seconds = 140
for attempt in range(1, max_retries + 1):
print(f"Downloading {asset_url} (attempt {attempt}/{max_retries})")
try:
req = urllib.request.Request(asset_url, headers=HEADERS)
with urllib.request.urlopen(req, timeout=timeout_seconds) as response:
if response.status == 200:
with open(file_path, "wb") as file:
while True:
chunk = response.read(8192)
if not chunk:
break
file.write(chunk)
# Verify file size if expected_size was provided
if expected_size is not None:
actual_size = os.path.getsize(file_path)
if actual_size != expected_size:
print(f"[ERROR] Size mismatch for {file_path}: expected {expected_size} bytes, got {actual_size} bytes")
os.remove(file_path)
if attempt < max_retries:
print(f"Waiting {retry_delay} seconds before retry")
time.sleep(retry_delay)
continue
return False
print(f"Saved {file_path} ({actual_size} bytes, verified)")
else:
print(f"Saved {file_path}")
return True
else:
print(f"Failed to download {asset_url}: {response.status} {response.msg}")
if attempt < max_retries:
print(f"Waiting {retry_delay} seconds before retry")
time.sleep(retry_delay)
else:
return False
except urllib.error.HTTPError as e:
if e.code == 404:
print(f"404 Not Found: {asset_url}")
raise Exception(f"Asset not found (404): {asset_url}")
else:
print(f"HTTP error occurred (attempt {attempt}/{max_retries}): {e.code} {e.reason}")
if attempt < max_retries:
print(f"Waiting {retry_delay} seconds before retry")
time.sleep(retry_delay)
else:
return False
except urllib.error.URLError as e:
print(f"URL/Timeout error (attempt {attempt}/{max_retries}): {e}")
if attempt < max_retries:
print(f"Waiting {retry_delay} seconds before retry")
time.sleep(retry_delay)
else:
return False return False
except DownloadTimeoutException as e: except Exception as e:
print(f"Download aborted for {asset_url}: {e}") print(f"An error occurred (attempt {attempt}/{max_retries}): {e}")
return False if attempt < max_retries:
except Exception as e: print(f"Waiting {retry_delay} seconds before retry")
print(f"An error occurred while downloading {asset_url}: {e}") time.sleep(retry_delay)
return False else:
return False
return False
def extract_split_archive(file_paths: list, extract_dir: str) -> bool: def extract_split_archive(file_paths: list, extract_dir: str) -> bool:
@@ -202,7 +260,6 @@ def extract_split_archive(file_paths: list, extract_dir: str) -> bool:
stdin=cat_proc.stdout, stdin=cat_proc.stdout,
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, stderr=subprocess.PIPE,
check=True
) )
cat_proc.stdout.close() cat_proc.stdout.close()
cat_stderr = cat_proc.stderr.read().decode() if cat_proc.stderr else "" cat_stderr = cat_proc.stderr.read().decode() if cat_proc.stderr else ""
@@ -211,6 +268,24 @@ def extract_split_archive(file_paths: list, extract_dir: str) -> bool:
if cat_stderr: if cat_stderr:
print(f"cat stderr: {cat_stderr}") print(f"cat stderr: {cat_stderr}")
tar_stderr = result.stderr.decode() if result.stderr else ""
if result.returncode != 0:
# GNU tar exits non-zero for format issues that BSD tar silently
# tolerates (e.g. trailing junk after the last valid entry).
# Check whether files were actually extracted before giving up.
extracted_items = os.listdir(extract_dir)
if extracted_items:
print(f"[WARN] tar exited {result.returncode} but extracted "
f"{len(extracted_items)} items — treating as success")
if tar_stderr:
print(f"tar stderr: {tar_stderr}")
else:
print(f"Failed to extract split archive (tar exit {result.returncode})")
if tar_stderr:
print(f"tar stderr: {tar_stderr}")
shutil.rmtree(extract_dir, ignore_errors=True)
return False
print(f"Successfully extracted archive to {extract_dir}") print(f"Successfully extracted archive to {extract_dir}")
# Delete tar files immediately after extraction # Delete tar files immediately after extraction
@@ -227,11 +302,9 @@ def extract_split_archive(file_paths: list, extract_dir: str) -> bool:
print(f"Disk space after tar deletion: {free_gb:.1f}GB free") print(f"Disk space after tar deletion: {free_gb:.1f}GB free")
return True return True
except subprocess.CalledProcessError as e: except Exception as e:
stderr_output = e.stderr.decode() if e.stderr else ""
print(f"Failed to extract split archive: {e}") print(f"Failed to extract split archive: {e}")
if stderr_output: shutil.rmtree(extract_dir, ignore_errors=True)
print(f"tar stderr: {stderr_output}")
return False return False
@@ -505,7 +578,7 @@ def create_parquet_for_day(day, keep_folders: bool = False):
print(f"Parquet file already exists: {parquet_path}") print(f"Parquet file already exists: {parquet_path}")
return parquet_path return parquet_path
print(f"Creating parquet for {version_date}...") print(f"Creating parquet for {version_date}")
rows_processed = process_version_date(version_date, keep_folders) rows_processed = process_version_date(version_date, keep_folders)
if rows_processed > 0 and parquet_path.exists(): if rows_processed > 0 and parquet_path.exists():
+3 -2
View File
@@ -77,8 +77,9 @@ def download_and_extract(version_date: str) -> str | None:
for asset in use_assets: for asset in use_assets:
asset_name = asset["name"] asset_name = asset["name"]
asset_url = asset["browser_download_url"] asset_url = asset["browser_download_url"]
asset_size = asset.get("size") # Get expected file size
file_path = os.path.join(OUTPUT_DIR, asset_name) file_path = os.path.join(OUTPUT_DIR, asset_name)
if download_asset(asset_url, file_path): if download_asset(asset_url, file_path, expected_size=asset_size):
downloaded_files.append(file_path) downloaded_files.append(file_path)
if not downloaded_files: if not downloaded_files:
@@ -122,7 +123,7 @@ def process_single_day(target_day: datetime) -> tuple[str | None, list[str]]:
from pathlib import Path from pathlib import Path
import tarfile import tarfile
NUMBER_PARTS = 16 NUMBER_PARTS = 4
def split_folders_into_gzip_archives(extract_dir: Path, tar_output_dir: Path, icaos: list[str], parts = NUMBER_PARTS) -> list[str]: def split_folders_into_gzip_archives(extract_dir: Path, tar_output_dir: Path, icaos: list[str], parts = NUMBER_PARTS) -> list[str]:
traces_dir = extract_dir / "traces" traces_dir = extract_dir / "traces"
buckets = sorted(traces_dir.iterdir()) buckets = sorted(traces_dir.iterdir())
+57 -16
View File
@@ -3,33 +3,74 @@ Main pipeline for processing ADS-B data from adsb.lol.
Usage: Usage:
python -m src.adsb.main --date 2026-01-01 python -m src.adsb.main --date 2026-01-01
python -m src.adsb.main --start_date 2026-01-01 --end_date 2026-01-03
""" """
import argparse import argparse
import subprocess import subprocess
import sys import sys
from datetime import datetime, timedelta from datetime import datetime, timedelta
import polars as pl
from src.adsb.download_and_list_icaos import NUMBER_PARTS from src.adsb.download_and_list_icaos import NUMBER_PARTS
def main(): def main():
parser = argparse.ArgumentParser(description="Process ADS-B data for a single day") parser = argparse.ArgumentParser(description="Process ADS-B data for a single day or date range")
parser.add_argument("--date", type=str, required=True) parser.add_argument("--date", type=str, help="Single date in YYYY-MM-DD format")
parser.add_argument("--start_date", type=str, help="Start date (inclusive, YYYY-MM-DD)")
parser.add_argument("--end_date", type=str, help="End date (exclusive, YYYY-MM-DD)")
parser.add_argument("--concat_with_latest_csv", action="store_true", help="Also concatenate with latest CSV from GitHub releases")
args = parser.parse_args() args = parser.parse_args()
date_str = datetime.strptime(args.date, "%Y-%m-%d").strftime("%Y-%m-%d") if args.date and (args.start_date or args.end_date):
print(f"Processing day: {date_str}") raise SystemExit("Use --date or --start_date/--end_date, not both.")
# Download and split if args.date:
subprocess.run([sys.executable, "-m", "src.adsb.download_and_list_icaos", "--date", date_str], check=True) start_date = datetime.strptime(args.date, "%Y-%m-%d")
end_date = start_date + timedelta(days=1)
# Process parts else:
for part_id in range(NUMBER_PARTS): if not args.start_date or not args.end_date:
subprocess.run([sys.executable, "-m", "src.adsb.process_icao_chunk", "--part-id", str(part_id), "--date", date_str], check=True) raise SystemExit("Provide --start_date and --end_date, or use --date.")
start_date = datetime.strptime(args.start_date, "%Y-%m-%d")
# Concatenate end_date = datetime.strptime(args.end_date, "%Y-%m-%d")
subprocess.run([sys.executable, "src/adsb/concat_parquet_to_final.py", "--date", date_str], check=True)
current = start_date
while current < end_date:
date_str = current.strftime("%Y-%m-%d")
print(f"Processing day: {date_str}")
# Download and split
subprocess.run([sys.executable, "-m", "src.adsb.download_and_list_icaos", "--date", date_str], check=True)
# Process parts
for part_id in range(NUMBER_PARTS):
subprocess.run([sys.executable, "-m", "src.adsb.process_icao_chunk", "--part-id", str(part_id), "--date", date_str], check=True)
# Concatenate
concat_cmd = [sys.executable, "-m", "src.adsb.concat_parquet_to_final", "--date", date_str]
if args.concat_with_latest_csv:
concat_cmd.append("--concat_with_latest_csv")
subprocess.run(concat_cmd, check=True)
current += timedelta(days=1)
if end_date - start_date > timedelta(days=1):
dates = []
cur = start_date
while cur < end_date:
dates.append(cur.strftime("%Y-%m-%d"))
cur += timedelta(days=1)
csv_files = [
f"data/outputs/openairframes_adsb_{d}_{d}.csv"
for d in dates
]
frames = [pl.read_csv(p) for p in csv_files]
df = pl.concat(frames, how="vertical", rechunk=True)
output_path = f"data/outputs/openairframes_adsb_{start_date.strftime('%Y-%m-%d')}_{end_date.strftime('%Y-%m-%d')}.csv"
df.write_csv(output_path)
print(f"Wrote combined CSV: {output_path}")
print("Done") print("Done")
+10 -4
View File
@@ -31,9 +31,6 @@ from src.adsb.download_adsb_data_to_parquet import (
) )
CHUNK_OUTPUT_DIR = os.path.join(OUTPUT_DIR, "adsb_chunks")
os.makedirs(CHUNK_OUTPUT_DIR, exist_ok=True)
# Smaller batch size for memory efficiency # Smaller batch size for memory efficiency
BATCH_SIZE = 100_000 BATCH_SIZE = 100_000
@@ -126,7 +123,16 @@ def main():
print(f"Processing part {args.part_id} for {args.date}") print(f"Processing part {args.part_id} for {args.date}")
# Get specific archive file for this part # Get specific archive file for this part
archive_path = os.path.join(OUTPUT_DIR, "adsb_archives", args.date, f"{args.date}_part_{args.part_id}.tar.gz") archive_dir = os.path.join(OUTPUT_DIR, "adsb_archives", args.date)
archive_path = os.path.join(archive_dir, f"{args.date}_part_{args.part_id}.tar.gz")
if not os.path.isfile(archive_path):
print(f"ERROR: Archive not found: {archive_path}")
if os.path.isdir(archive_dir):
print(f"Files in {archive_dir}: {os.listdir(archive_dir)}")
else:
print(f"Directory does not exist: {archive_dir}")
sys.exit(1)
# Extract and collect trace files # Extract and collect trace files
trace_map = build_trace_file_map(archive_path) trace_map = build_trace_file_map(archive_path)
-173
View File
@@ -1,173 +0,0 @@
#!/usr/bin/env python3
"""
Run the full ADS-B processing pipeline locally.
Downloads adsb.lol data, processes trace files, and outputs openairframes_adsb CSV.
Usage:
# Single day (yesterday by default)
python -m src.adsb.run_local
# Single day (specific date, processes 2024-01-15 only)
python -m src.adsb.run_local 2024-01-15 2024-01-16
# Date range (end date is exclusive)
python -m src.adsb.run_local 2024-01-01 2024-01-07
"""
import argparse
import os
import subprocess
import sys
from datetime import datetime, timedelta
def run_cmd(cmd: list[str], description: str) -> None:
"""Run a command and exit on failure."""
print(f"\n>>> {' '.join(cmd)}")
result = subprocess.run(cmd)
if result.returncode != 0:
print(f"ERROR: {description} failed with exit code {result.returncode}")
sys.exit(result.returncode)
def main():
parser = argparse.ArgumentParser(
description="Run full ADS-B processing pipeline locally",
usage="python -m src.adsb.run_local [start_date] [end_date]"
)
parser.add_argument(
"start_date",
nargs="?",
help="Start date (YYYY-MM-DD, inclusive). Default: yesterday"
)
parser.add_argument(
"end_date",
nargs="?",
help="End date (YYYY-MM-DD, exclusive). If omitted, processes single day (start_date + 1)"
)
parser.add_argument(
"--chunks",
type=int,
default=4,
help="Number of parallel chunks (default: 4)"
)
parser.add_argument(
"--chunk-days",
type=int,
default=1,
help="Days per chunk for date range processing (default: 1)"
)
parser.add_argument(
"--skip-base",
action="store_true",
default=True,
help="Skip downloading and merging with base release (default: True for historical runs)"
)
args = parser.parse_args()
# Determine dates
if args.start_date:
start_date = datetime.strptime(args.start_date, "%Y-%m-%d")
else:
start_date = datetime.utcnow() - timedelta(days=1)
if args.end_date:
end_date = datetime.strptime(args.end_date, "%Y-%m-%d")
else:
# Default: process single day (end = start + 1 day, exclusive)
end_date = start_date + timedelta(days=1)
start_str = start_date.strftime("%Y-%m-%d")
end_str = end_date.strftime("%Y-%m-%d")
# Generate date chunks
date_chunks = []
current = start_date
while current < end_date:
chunk_end = min(current + timedelta(days=args.chunk_days), end_date)
date_chunks.append({
'start': current.strftime("%Y-%m-%d"),
'end': chunk_end.strftime("%Y-%m-%d")
})
current = chunk_end
print("=" * 60)
print("ADS-B Processing Pipeline")
print("=" * 60)
print(f"Date range: {start_str} to {end_str} (exclusive)")
print(f"Date chunks: {len(date_chunks)} ({args.chunk_days} days each)")
print(f"ICAO chunks: {args.chunks}")
print("=" * 60)
# Process each date chunk
for idx, date_chunk in enumerate(date_chunks, 1):
chunk_start = date_chunk['start']
chunk_end = date_chunk['end']
# Convert exclusive end date to inclusive for subcommands
# download_and_list_icaos and process_icao_chunk treat both dates as inclusive
chunk_end_inclusive = (datetime.strptime(chunk_end, "%Y-%m-%d") - timedelta(days=1)).strftime("%Y-%m-%d")
print(f"\n{'=' * 60}")
print(f"Processing Date Chunk {idx}/{len(date_chunks)}: {chunk_start} to {chunk_end_inclusive} (inclusive)")
print('=' * 60)
# Step 1: Download and extract
print("\n" + "=" * 60)
print("Step 1: Download and Extract")
print("=" * 60)
cmd = ["python", "-m", "src.adsb.download_and_list_icaos",
"--start-date", chunk_start, "--end-date", chunk_end_inclusive]
run_cmd(cmd, "Download and extract")
# Step 2: Process chunks
print("\n" + "=" * 60)
print("Step 2: Process Chunks")
print("=" * 60)
for chunk_id in range(args.chunks):
print(f"\n--- ICAO Chunk {chunk_id + 1}/{args.chunks} ---")
cmd = ["python", "-m", "src.adsb.process_icao_chunk",
"--chunk-id", str(chunk_id),
"--total-chunks", str(args.chunks),
"--start-date", chunk_start,
"--end-date", chunk_end_inclusive]
run_cmd(cmd, f"Process ICAO chunk {chunk_id}")
# Step 3: Combine all chunks to CSV
print("\n" + "=" * 60)
print("Step 3: Combine All Chunks to CSV")
print("=" * 60)
chunks_dir = "./data/output/adsb_chunks"
cmd = ["python", "-m", "src.adsb.combine_chunks_to_csv",
"--chunks-dir", chunks_dir,
"--start-date", start_str,
"--end-date", end_str,
"--stream"]
if args.skip_base:
cmd.append("--skip-base")
run_cmd(cmd, "Combine chunks")
print("\n" + "=" * 60)
print("Done!")
print("=" * 60)
# Show output
output_dir = "./data/openairframes"
# Calculate actual end date for filename (end_date - 1 day since it's exclusive)
actual_end = (end_date - timedelta(days=1)).strftime("%Y-%m-%d")
output_file = f"openairframes_adsb_{start_str}_{actual_end}.csv.gz"
output_path = os.path.join(output_dir, output_file)
if os.path.exists(output_path):
size_mb = os.path.getsize(output_path) / (1024 * 1024)
print(f"Output: {output_path}")
print(f"Size: {size_mb:.1f} MB")
if __name__ == "__main__":
main()
+15 -4
View File
@@ -246,6 +246,20 @@ def process_submission(
if schema_updated: if schema_updated:
schema_note = f"\n**Schema Updated:** Added new tags: `{', '.join(new_tags)}`\n" schema_note = f"\n**Schema Updated:** Added new tags: `{', '.join(new_tags)}`\n"
# Truncate JSON preview to stay under GitHub's 65536 char body limit
max_json_preview = 50000
if len(content_json) > max_json_preview:
# Show first few entries as a preview
preview_entries = submissions[:10]
preview_json = json.dumps(preview_entries, indent=2, sort_keys=True)
json_section = (
f"### Submissions (showing 10 of {len(submissions)})\n"
f"```json\n{preview_json}\n```\n\n"
f"*Full submission ({len(submissions)} entries, {len(content_json):,} chars) is in the committed file.*"
)
else:
json_section = f"### Submissions\n```json\n{content_json}\n```"
pr_body = f"""## Community Submission pr_body = f"""## Community Submission
Adds {len(submissions)} submission(s) from @{author_username}. Adds {len(submissions)} submission(s) from @{author_username}.
@@ -257,10 +271,7 @@ Closes #{issue_number}
--- ---
### Submissions {json_section}"""
```json
{content_json}
```"""
pr = create_pull_request( pr = create_pull_request(
title=f"Community submission: {filename}", title=f"Community submission: {filename}",
@@ -24,7 +24,7 @@ def read_all_submissions(community_dir: Path) -> list[dict]:
"""Read all JSON submissions from the community directory.""" """Read all JSON submissions from the community directory."""
all_submissions = [] all_submissions = []
for json_file in sorted(community_dir.glob("*.json")): for json_file in sorted(community_dir.glob("**/*.json")):
try: try:
with open(json_file) as f: with open(json_file) as f:
data = json.load(f) data = json.load(f)
+65 -3
View File
@@ -36,6 +36,52 @@ def get_latest_schema_version() -> int:
return max_version return max_version
def _is_balanced_json(text: str) -> bool:
"""
Check if JSON has balanced brackets/braces.
This is a simple check to ensure we captured complete JSON.
Ignores brackets/braces inside strings.
Args:
text: JSON text to check
Returns:
True if balanced, False otherwise
"""
in_string = False
escape = False
stack = []
pairs = {'[': ']', '{': '}'}
for char in text:
if escape:
escape = False
continue
if char == '\\':
escape = True
continue
if char == '"' and not escape:
in_string = not in_string
continue
if in_string:
continue
if char in pairs:
stack.append(char)
elif char in pairs.values():
if not stack:
return False
if pairs[stack[-1]] != char:
return False
stack.pop()
return len(stack) == 0 and not in_string
def get_schema_path(version: int | None = None) -> Path: def get_schema_path(version: int | None = None) -> Path:
""" """
Get path to a specific schema version, or latest if version is None. Get path to a specific schema version, or latest if version is None.
@@ -162,10 +208,14 @@ def extract_json_from_issue_body(body: str) -> str | None:
return match.group(1).strip() return match.group(1).strip()
# Try: Raw JSON after "### Submission JSON" until next section or end # Try: Raw JSON after "### Submission JSON" until next section or end
pattern_raw = r"### Submission JSON\s*\n\s*([\[{][\s\S]*?[\]}])(?=\n###|\n\n###|$)" # Use greedy matching since we have a clear boundary (next ### or end)
pattern_raw = r"### Submission JSON\s*\n\s*([\[{][\s\S]*[\]}])(?=\s*\n###|\s*$)"
match = re.search(pattern_raw, body) match = re.search(pattern_raw, body)
if match: if match:
return match.group(1).strip() candidate = match.group(1).strip()
# Validate it's complete JSON by checking balanced brackets
if _is_balanced_json(candidate):
return candidate
# Try: Any JSON object/array in the body (fallback) # Try: Any JSON object/array in the body (fallback)
pattern_any = r"([\[{][\s\S]*?[\]}])" pattern_any = r"([\[{][\s\S]*?[\]}])"
@@ -219,7 +269,19 @@ def parse_and_validate(json_str: str, schema: dict | None = None) -> tuple[list
try: try:
data = json.loads(json_str) data = json.loads(json_str)
except json.JSONDecodeError as e: except json.JSONDecodeError as e:
return None, [f"Invalid JSON: {e}"] # Provide detailed error context
error_msg = f"Invalid JSON: {e}"
# Show context around the error position
if hasattr(e, 'pos') and e.pos is not None:
start = max(0, e.pos - 50)
end = min(len(json_str), e.pos + 50)
context = json_str[start:end]
# Escape for readability
context_escaped = repr(context)
error_msg += f"\n\nContext around position {e.pos}: {context_escaped}"
return None, [error_msg]
errors = validate_submission(data, schema) errors = validate_submission(data, schema)
return data, errors return data, errors
+91 -29
View File
@@ -27,6 +27,33 @@ def _http_get_json(url: str, headers: dict[str, str]) -> dict:
return json.loads(data.decode("utf-8")) return json.loads(data.decode("utf-8"))
def get_releases(repo: str = REPO, github_token: Optional[str] = None, per_page: int = 30) -> list[dict]:
"""Get a list of releases from the repository."""
url = f"https://api.github.com/repos/{repo}/releases?per_page={per_page}"
headers = {
"Accept": "application/vnd.github+json",
"User-Agent": "openairframes-downloader/1.0",
}
if github_token:
headers["Authorization"] = f"Bearer {github_token}"
return _http_get_json(url, headers=headers)
def get_release_assets_from_release_data(release_data: dict) -> list[ReleaseAsset]:
"""Extract assets from a release data dictionary."""
assets = []
for a in release_data.get("assets", []):
assets.append(
ReleaseAsset(
name=a["name"],
download_url=a["browser_download_url"],
size=int(a.get("size", 0)),
)
)
return assets
def get_latest_release_assets(repo: str = REPO, github_token: Optional[str] = None) -> list[ReleaseAsset]: def get_latest_release_assets(repo: str = REPO, github_token: Optional[str] = None) -> list[ReleaseAsset]:
url = f"https://api.github.com/repos/{repo}/releases/latest" url = f"https://api.github.com/repos/{repo}/releases/latest"
headers = { headers = {
@@ -37,16 +64,7 @@ def get_latest_release_assets(repo: str = REPO, github_token: Optional[str] = No
headers["Authorization"] = f"Bearer {github_token}" headers["Authorization"] = f"Bearer {github_token}"
payload = _http_get_json(url, headers=headers) payload = _http_get_json(url, headers=headers)
assets = [] return get_release_assets_from_release_data(payload)
for a in payload.get("assets", []):
assets.append(
ReleaseAsset(
name=a["name"],
download_url=a["browser_download_url"],
size=int(a.get("size", 0)),
)
)
return assets
def pick_asset( def pick_asset(
@@ -155,7 +173,8 @@ def download_latest_aircraft_adsb_csv(
repo: str = REPO, repo: str = REPO,
) -> Path: ) -> Path:
""" """
Download the latest openairframes_adsb_*.csv file from the latest GitHub release. Download the latest openairframes_adsb_*.csv file from GitHub releases.
If the latest release doesn't have the file, searches previous releases.
Args: Args:
output_dir: Directory to save the downloaded file (default: "downloads") output_dir: Directory to save the downloaded file (default: "downloads")
@@ -166,26 +185,69 @@ def download_latest_aircraft_adsb_csv(
Path to the downloaded file Path to the downloaded file
""" """
output_dir = Path(output_dir) output_dir = Path(output_dir)
assets = get_latest_release_assets(repo, github_token=github_token)
asset = pick_asset(assets, name_regex=r"^openairframes_adsb_.*\.csv(\.gz)?$")
saved_to = download_asset(asset, output_dir / asset.name, github_token=github_token)
print(f"Downloaded: {asset.name} ({asset.size} bytes) -> {saved_to}")
return saved_to
def get_latest_aircraft_adsb_csv_df():
csv_path = download_latest_aircraft_adsb_csv()
import pandas as pd
df = pd.read_csv(csv_path)
df = df.fillna("")
# Extract start date from filename pattern: openairframes_adsb_{start_date}_{end_date}.csv[.gz]
match = re.search(r"openairframes_adsb_(\d{4}-\d{2}-\d{2})_", str(csv_path))
if not match:
raise ValueError(f"Could not extract date from filename: {csv_path.name}")
date_str = match.group(1) # Get multiple releases
return df, date_str releases = get_releases(repo, github_token=github_token, per_page=30)
# Try each release until we find one with the matching asset
for release in releases:
assets = get_release_assets_from_release_data(release)
try:
asset = pick_asset(assets, name_regex=r"^openairframes_adsb_.*\.csv(\.gz)?$")
saved_to = download_asset(asset, output_dir / asset.name, github_token=github_token)
print(f"Downloaded: {asset.name} ({asset.size} bytes) -> {saved_to}")
return saved_to
except FileNotFoundError:
# This release doesn't have the matching asset, try the next one
continue
raise FileNotFoundError(
f"No release in the last 30 releases has an asset matching 'openairframes_adsb_.*\\.csv(\\.gz)?$'"
)
import polars as pl
def get_latest_aircraft_adsb_csv_df():
"""Download and load the latest ADS-B CSV from GitHub releases.
Returns:
tuple: (df, start_date, end_date) where dates are in YYYY-MM-DD format
"""
import re
csv_path = download_latest_aircraft_adsb_csv()
df = pl.read_csv(csv_path, null_values=[""])
# Parse time column: values like "2025-12-31T00:00:00.040" or "2025-05-11T15:15:50.540+0000"
# Try with timezone first (convert to naive), then without timezone
df = df.with_columns(
pl.col("time").str.strptime(pl.Datetime("ms"), "%Y-%m-%dT%H:%M:%S%.f%z", strict=False)
.dt.replace_time_zone(None) # Convert to naive datetime first
.fill_null(pl.col("time").str.strptime(pl.Datetime("ms"), "%Y-%m-%dT%H:%M:%S%.f", strict=False))
)
# Cast dbFlags and year to strings to match the schema used in compress functions
for col in ['dbFlags', 'year']:
if col in df.columns:
df = df.with_columns(pl.col(col).cast(pl.Utf8))
# Fill nulls with empty strings for string columns
for col in df.columns:
if df[col].dtype == pl.Utf8:
df = df.with_columns(pl.col(col).fill_null(""))
# Extract start and end dates from filename pattern: openairframes_adsb_{start_date}_{end_date}.csv[.gz]
match = re.search(r"openairframes_adsb_(\d{4}-\d{2}-\d{2})_(\d{4}-\d{2}-\d{2})\.csv", str(csv_path))
if not match:
raise ValueError(f"Could not extract dates from filename: {csv_path.name}")
start_date = match.group(1)
end_date = match.group(2)
print(df.columns)
print(df.dtypes)
return df, start_date, end_date
if __name__ == "__main__": if __name__ == "__main__":
download_latest_aircraft_csv() download_latest_aircraft_csv()
download_latest_aircraft_adsb_csv()