Compare commits

..

67 Commits

Author SHA1 Message Date
ggman12 4703383648 add hisotircal-adsb-run.yaml 2026-02-17 17:55:49 -05:00
ggman12 53633bc09f use v6 for python 2026-02-17 17:39:54 -05:00
ggman12 f6897c1b72 updates to .github/workflows/historical-adsb.yaml 2026-02-17 17:36:47 -05:00
ggman12 19f3c5b63c Add main pipeline for processing ADS-B data with date argument 2026-02-17 17:27:44 -05:00
ggman12 b2f6a751fa Refactor concat_parquet_to_final.py to accept date as an argument and streamline file handling 2026-02-17 17:27:41 -05:00
ggman12 88b00c1cf6 add .snapshots to .gitignore 2026-02-17 17:27:28 -05:00
ggman12 b6bf915cec Filter rows by date in compress_parquet_part function 2026-02-17 16:59:09 -05:00
ggman12 6306aade16 Make global NUMBER_PARTS. remove print. 2026-02-17 16:42:59 -05:00
ggman12 9c54d9f1e4 remove print 2026-02-17 16:40:25 -05:00
ggman12 e8707ab853 get rid of unused code 2026-02-17 16:18:26 -05:00
ggman12 ca5cb23a4d use date folder 2026-02-17 16:16:46 -05:00
ggman12 121dccf26c concat file 2026-02-17 16:16:41 -05:00
ggman12 db98b3021a change to use path 2026-02-17 16:16:35 -05:00
ggman12 94cf50ac3a make return consistant 2026-02-17 16:09:09 -05:00
ggman12 ac177e8025 output .csv too 2026-02-17 16:09:01 -05:00
ggman12 6b7068bc84 works 2026-02-17 15:51:20 -05:00
ggman12 70ec797535 works 2026-02-17 15:46:07 -05:00
ggman12 1afe2bed4e remvoe code from src/adsb/process_icao_chunk.py 2026-02-17 15:42:45 -05:00
ggman12 d3c52266e5 fix tar corrruption 2026-02-17 15:42:35 -05:00
ggman12 c0dca14b83 remove unused code 2026-02-17 14:57:29 -05:00
ggman12 1fc4a94743 do only a single day instead of multiple 2026-02-17 14:23:30 -05:00
ggman12 f29abad52a output to parted tar.gz 2026-02-17 14:10:01 -05:00
ggman12 6eb84a894b add notebooks/whatever.ipynb to .gitignore 2026-02-17 12:48:59 -05:00
ggman12 0c81490513 make it single day 2026-02-17 12:48:28 -05:00
ggman12 11ed7e597d delete unused code 2026-02-17 12:48:01 -05:00
ggman12 24c0fc970c use exclusive end_date 2026-02-17 12:47:44 -05:00
ggman12 c12e855b5a change from 7 days to 1 2026-02-16 20:36:20 -05:00
ggman12 b55690638c feat: implement download and concatenate script for workflow artifacts 2026-02-16 20:34:22 -05:00
ggman12 dcee136f09 refactor: update historical-adsb script to use 15-day chunks and improve argument handling 2026-02-16 20:14:04 -05:00
ggman12 035748fc61 skip using base release in run_local.py 2026-02-16 18:26:53 -05:00
ggman12 13432068e6 src/adsb/run_local.py works 2026-02-16 17:45:31 -05:00
ggman12 9cb4c5045b remove compression from github action 2026-02-16 17:41:15 -05:00
ggman12 343a391a3f change default chunk_days from 7 to 3 2026-02-16 16:35:05 -05:00
ggman12 2bc45ff6a4 increase retry_delay to 5 minutes. 2026-02-16 15:35:02 -05:00
ggman12 03291d93a8 add scripts/run_historical_adsb_action.py 2026-02-16 14:54:25 -05:00
ggman12 5883b459ac fix bug with no dupliacte icaos across days 2026-02-15 21:08:17 -05:00
ggman12 f8ba66375b preserve time 2026-02-15 21:08:03 -05:00
ggman12 7a62faecef sort by time in end 2026-02-15 20:33:06 -05:00
ggman12 9964ce576b slight update for compress by day 2026-02-15 20:32:33 -05:00
ggman12 be33fd2eaf compress by day 2026-02-15 19:59:50 -05:00
ggman12 2b2095700f use chunks in run_local 2026-02-15 19:53:09 -05:00
ggman12 a8b2b66952 fix .csv to .csv.gz transition 2026-02-15 19:08:51 -05:00
ggman12 3f38263a0c stop depue that destroys previous days 2026-02-15 17:55:16 -05:00
ggman12 1a553d5f44 use date of file instead of min timestamp 2026-02-15 16:44:09 -05:00
ggman12 405855c566 deal with whole schema 2026-02-15 16:43:00 -05:00
ggman12 4e81dde201 fix date parsing 2026-02-15 14:55:32 -05:00
ggman12 fde8ef029c update csv writing to handle empty data. Save space with higher gzip compression 2026-02-15 14:14:54 -05:00
ggman12 18ab51bd60 update naming 2026-02-15 13:45:03 -05:00
ggman12 83b9d2a76d write gzip 2026-02-15 13:41:09 -05:00
ggman12 8874619ab0 write gzip 2026-02-15 13:41:02 -05:00
ggman12 823f291728 fix errors in daily release due to new .gz file 2026-02-15 13:21:51 -05:00
ggman12 982011b36f end of year check 2026-02-14 22:42:32 -05:00
ggman12 1b15e43669 use .csv.gz 2026-02-14 22:22:14 -05:00
ggman12 f17adc4574 remvoe aws worker, reducer 2026-02-14 22:21:14 -05:00
ggman12 6a250a63fb fix None value comparision 2026-02-14 20:21:32 -05:00
ggman12 9e24fcbc63 update integrity checker. Hopefully solve issue. 2026-02-14 19:56:25 -05:00
ggman12 8ce04f1f83 Revert "update for historical run"
This reverts commit ccf55b2308.
2026-02-14 18:44:21 -05:00
ggman12 9441761ac9 use temp release too. 2026-02-14 18:43:25 -05:00
ggman12 ccf55b2308 update for historical run 2026-02-14 15:57:16 -05:00
ggman12 76eaf118ef add run_local.py 2026-02-14 15:54:36 -05:00
ggman12 0fcbad0fbc let mictronics retry 2026-02-14 15:07:08 -05:00
ggman12 0c7484e7bf create_daily_microtonics release 2026-02-13 22:19:02 -05:00
ggman12 8c60ac611d create daily adsbexchange database snapshot release 2026-02-13 22:19:02 -05:00
ggman12 145f1006be update template 2026-02-13 12:12:24 -05:00
ggman12 f5465f0552 update .github/workflows/update-community-prs.yaml 2026-02-13 12:00:10 -05:00
ggman12 17098ae39a fix update-community-prs.yaml 2026-02-13 11:52:53 -05:00
ggman12 6f6b65780a update community_submission.yaml. update Readme.md 2026-02-13 11:49:18 -05:00
24 changed files with 539 additions and 1160 deletions
@@ -1,4 +1,4 @@
name: adsb-to-aircraft-multiple-day-run name: Historical ADS-B Run
on: on:
workflow_dispatch: workflow_dispatch:
@@ -26,11 +26,10 @@ jobs:
run: | run: |
python - <<'PY' python - <<'PY'
import json import json
import os
from datetime import datetime, timedelta from datetime import datetime, timedelta
start = datetime.strptime(os.environ["START_DATE"], "%Y-%m-%d") start = datetime.strptime("${START_DATE}", "%Y-%m-%d")
end = datetime.strptime(os.environ["END_DATE"], "%Y-%m-%d") end = datetime.strptime("${END_DATE}", "%Y-%m-%d")
if end <= start: if end <= start:
raise SystemExit("end_date must be after start_date") raise SystemExit("end_date must be after start_date")
@@ -40,7 +39,7 @@ jobs:
dates.append(cur.strftime("%Y-%m-%d")) dates.append(cur.strftime("%Y-%m-%d"))
cur += timedelta(days=1) cur += timedelta(days=1)
with open(os.environ["GITHUB_OUTPUT"], "a") as f: with open("$GITHUB_OUTPUT", "a") as f:
f.write(f"dates={json.dumps(dates)}\n") f.write(f"dates={json.dumps(dates)}\n")
PY PY
@@ -50,7 +49,7 @@ jobs:
fail-fast: true fail-fast: true
matrix: matrix:
date: ${{ fromJson(needs.generate-dates.outputs.dates) }} date: ${{ fromJson(needs.generate-dates.outputs.dates) }}
uses: ./.github/workflows/adsb-to-aircraft-for-day.yaml uses: ./.github/workflows/historical-adsb.yaml
with: with:
date: ${{ matrix.date }} date: ${{ matrix.date }}
@@ -84,15 +83,14 @@ jobs:
END_DATE: ${{ inputs.end_date }} END_DATE: ${{ inputs.end_date }}
run: | run: |
python - <<'PY' python - <<'PY'
import os
import re import re
from pathlib import Path from pathlib import Path
import polars as pl import polars as pl
start = os.environ["START_DATE"] start = "${START_DATE}"
end = os.environ["END_DATE"] end = "${END_DATE}"
daily_dir = Path("outputs/daily") daily_dir = Path("outputs/daily")
files = sorted(daily_dir.glob("openairframes_adsb_*.csv.gz")) files = sorted(daily_dir.glob("openairframes_adsb_*.csv"))
if not files: if not files:
raise SystemExit("No daily CSVs found") raise SystemExit("No daily CSVs found")
@@ -104,8 +102,8 @@ jobs:
frames = [pl.read_csv(p) for p in files] frames = [pl.read_csv(p) for p in files]
df = pl.concat(frames, how="vertical", rechunk=True) df = pl.concat(frames, how="vertical", rechunk=True)
output_path = Path("outputs") / f"openairframes_adsb_{start}_{end}.csv.gz" output_path = Path("outputs") / f"openairframes_adsb_{start}_{end}.csv"
df.write_csv(output_path, compression="gzip") df.write_csv(output_path)
print(f"Wrote {output_path} with {df.height} rows") print(f"Wrote {output_path} with {df.height} rows")
PY PY
@@ -113,6 +111,5 @@ jobs:
uses: actions/upload-artifact@v4 uses: actions/upload-artifact@v4
with: with:
name: openairframes_adsb-${{ inputs.start_date }}-${{ inputs.end_date }} name: openairframes_adsb-${{ inputs.start_date }}-${{ inputs.end_date }}
path: outputs/openairframes_adsb_${{ inputs.start_date }}_${{ inputs.end_date }}.csv.gz path: outputs/openairframes_adsb_${{ inputs.start_date }}_${{ inputs.end_date }}.csv
retention-days: 30 retention-days: 30
# gh workflow run adsb-to-aircraft-multiple-day-run.yaml --repo ggman12/OpenAirframes --ref jonah/fix-historical-proper -f start_date=2025-12-31 -f end_date=2026-01-02
@@ -7,22 +7,12 @@ on:
description: 'YYYY-MM-DD' description: 'YYYY-MM-DD'
required: true required: true
type: string type: string
concat_with_latest_csv:
description: 'Also concatenate with latest CSV from GitHub releases'
required: false
type: boolean
default: false
workflow_call: workflow_call:
inputs: inputs:
date: date:
description: 'YYYY-MM-DD' description: 'YYYY-MM-DD'
required: true required: true
type: string type: string
concat_with_latest_csv:
description: 'Also concatenate with latest CSV from GitHub releases'
required: false
type: boolean
default: false
jobs: jobs:
adsb-extract: adsb-extract:
@@ -49,38 +39,11 @@ jobs:
python -m src.adsb.download_and_list_icaos --date "$DATE" python -m src.adsb.download_and_list_icaos --date "$DATE"
ls -lah data/output/adsb_archives/"$DATE" || true ls -lah data/output/adsb_archives/"$DATE" || true
- name: Upload archive part 0 - name: Upload archives
uses: actions/upload-artifact@v4 uses: actions/upload-artifact@v4
with: with:
name: adsb-archive-${{ inputs.date }}-part-0 name: adsb-archives-${{ inputs.date }}
path: data/output/adsb_archives/${{ inputs.date }}/${{ inputs.date }}_part_0.tar.gz path: data/output/adsb_archives/${{ inputs.date }}
retention-days: 1
compression-level: 0
if-no-files-found: error
- name: Upload archive part 1
uses: actions/upload-artifact@v4
with:
name: adsb-archive-${{ inputs.date }}-part-1
path: data/output/adsb_archives/${{ inputs.date }}/${{ inputs.date }}_part_1.tar.gz
retention-days: 1
compression-level: 0
if-no-files-found: error
- name: Upload archive part 2
uses: actions/upload-artifact@v4
with:
name: adsb-archive-${{ inputs.date }}-part-2
path: data/output/adsb_archives/${{ inputs.date }}/${{ inputs.date }}_part_2.tar.gz
retention-days: 1
compression-level: 0
if-no-files-found: error
- name: Upload archive part 3
uses: actions/upload-artifact@v4
with:
name: adsb-archive-${{ inputs.date }}-part-3
path: data/output/adsb_archives/${{ inputs.date }}/${{ inputs.date }}_part_3.tar.gz
retention-days: 1 retention-days: 1
compression-level: 0 compression-level: 0
if-no-files-found: error if-no-files-found: error
@@ -91,7 +54,7 @@ jobs:
strategy: strategy:
fail-fast: true fail-fast: true
matrix: matrix:
part_id: [0, 1, 2, 3] part_id: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]
steps: steps:
- name: Checkout - name: Checkout
uses: actions/checkout@v6 uses: actions/checkout@v6
@@ -106,22 +69,12 @@ jobs:
python -m pip install --upgrade pip python -m pip install --upgrade pip
pip install -r requirements.txt pip install -r requirements.txt
- name: Download archive part - name: Download archives
uses: actions/download-artifact@v4 uses: actions/download-artifact@v4
with: with:
name: adsb-archive-${{ inputs.date }}-part-${{ matrix.part_id }} name: adsb-archives-${{ inputs.date }}
path: data/output/adsb_archives/${{ inputs.date }} path: data/output/adsb_archives/${{ inputs.date }}
- name: Verify archive
run: |
FILE="data/output/adsb_archives/${{ inputs.date }}/${{ inputs.date }}_part_${{ matrix.part_id }}.tar.gz"
ls -lah data/output/adsb_archives/${{ inputs.date }}/
if [ ! -f "$FILE" ]; then
echo "::error::Archive not found: $FILE"
exit 1
fi
echo "Verified: $(du -h "$FILE")"
- name: Process part - name: Process part
env: env:
DATE: ${{ inputs.date }} DATE: ${{ inputs.date }}
@@ -158,25 +111,19 @@ jobs:
uses: actions/download-artifact@v4 uses: actions/download-artifact@v4
with: with:
pattern: adsb-compressed-${{ inputs.date }}-part-* pattern: adsb-compressed-${{ inputs.date }}-part-*
path: data/output/compressed/${{ inputs.date }} path: outputs/compressed/${{ inputs.date }}
merge-multiple: true merge-multiple: true
- name: Concatenate final outputs - name: Concatenate final outputs
env: env:
DATE: ${{ inputs.date }} DATE: ${{ inputs.date }}
CONCAT_WITH_LATEST_CSV: ${{ inputs.concat_with_latest_csv }}
run: | run: |
EXTRA="" python src/adsb/concat_parquet_to_final.py --date "$DATE"
if [ "$CONCAT_WITH_LATEST_CSV" = "true" ]; then ls -lah outputs/ || true
EXTRA="--concat_with_latest_csv"
fi
python -m src.adsb.concat_parquet_to_final --date "$DATE" $EXTRA
ls -lah data/output/ || true
- name: Upload final artifacts - name: Upload final artifacts
uses: actions/upload-artifact@v4 uses: actions/upload-artifact@v4
with: with:
name: openairframes_adsb-${{ inputs.date }} name: openairframes_adsb-${{ inputs.date }}
path: data/output/openairframes_adsb_* path: outputs/openairframes_adsb_${{ inputs.date }}_${{ inputs.date }}.*
retention-days: 30 retention-days: 30
if-no-files-found: error
@@ -1,4 +1,4 @@
name: openairframes-daily-release name: OpenAirframes Daily Release
on: on:
schedule: schedule:
@@ -76,75 +76,159 @@ jobs:
data/faa_releasable/ReleasableAircraft_*.zip data/faa_releasable/ReleasableAircraft_*.zip
retention-days: 1 retention-days: 1
resolve-dates: adsb-extract:
runs-on: ubuntu-latest runs-on: ubuntu-24.04-arm
if: github.event_name != 'schedule' if: github.event_name != 'schedule'
outputs: outputs:
date: ${{ steps.out.outputs.date }} manifest-exists: ${{ steps.check.outputs.exists }}
adsb_date: ${{ steps.out.outputs.adsb_date }}
steps:
- id: out
run: |
if [ -n "${{ inputs.date }}" ]; then
echo "date=${{ inputs.date }}" >> "$GITHUB_OUTPUT"
echo "adsb_date=${{ inputs.date }}" >> "$GITHUB_OUTPUT"
else
echo "date=$(date -u -d 'yesterday' +%Y-%m-%d)" >> "$GITHUB_OUTPUT"
echo "adsb_date=$(date -u -d 'yesterday' +%Y-%m-%d)" >> "$GITHUB_OUTPUT"
fi
adsb-to-aircraft:
needs: resolve-dates
if: github.event_name != 'schedule'
uses: ./.github/workflows/adsb-to-aircraft-for-day.yaml
with:
date: ${{ needs.resolve-dates.outputs.adsb_date }}
concat_with_latest_csv: true
adsb-reduce:
needs: [resolve-dates, adsb-to-aircraft]
if: always() && github.event_name != 'schedule' && needs.adsb-to-aircraft.result == 'failure'
runs-on: ubuntu-24.04-arm
steps: steps:
- name: Checkout - name: Checkout
uses: actions/checkout@v6 uses: actions/checkout@v6
with:
fetch-depth: 0
- name: Setup Python - name: Setup Python
uses: actions/setup-python@v6 uses: actions/setup-python@v6
with: with:
python-version: '3.12' python-version: "3.14"
- name: Install dependencies - name: Install dependencies
run: | run: |
python -m pip install --upgrade pip python -m pip install --upgrade pip
pip install -r requirements.txt pip install -r requirements.txt
- name: Download compressed outputs - name: Download and extract ADS-B data
uses: actions/download-artifact@v4
with:
pattern: adsb-compressed-${{ needs.resolve-dates.outputs.adsb_date }}-part-*
path: data/output/compressed/${{ needs.resolve-dates.outputs.adsb_date }}
merge-multiple: true
- name: Concatenate final outputs
env: env:
DATE: ${{ needs.resolve-dates.outputs.adsb_date }} GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
CONCAT_WITH_LATEST_CSV: true
run: | run: |
EXTRA="" python -m src.adsb.download_and_list_icaos ${{ inputs.date && format('--date {0}', inputs.date) || '' }}
if [ "$CONCAT_WITH_LATEST_CSV" = "true" ]; then ls -lah data/output/
EXTRA="--concat_with_latest_csv"
fi
python -m src.adsb.concat_parquet_to_final --date "$DATE" $EXTRA
ls -lah data/output/ || true
- name: Upload final artifacts - name: Check manifest exists
id: check
run: |
if ls data/output/icao_manifest_*.txt 1>/dev/null 2>&1; then
echo "exists=true" >> "$GITHUB_OUTPUT"
else
echo "exists=false" >> "$GITHUB_OUTPUT"
fi
- name: Create tar of extracted data
run: |
cd data/output
tar -cf extracted_data.tar *-planes-readsb-prod-0.tar_0 icao_manifest_*.txt
ls -lah extracted_data.tar
- name: Upload extracted data
uses: actions/upload-artifact@v4 uses: actions/upload-artifact@v4
with: with:
name: openairframes_adsb-${{ needs.resolve-dates.outputs.adsb_date }} name: adsb-extracted
path: data/output/openairframes_adsb_* path: data/output/extracted_data.tar
retention-days: 30 retention-days: 1
if-no-files-found: error compression-level: 0 # Already compressed trace files
adsb-map:
runs-on: ubuntu-24.04-arm
needs: adsb-extract
if: github.event_name != 'schedule' && needs.adsb-extract.outputs.manifest-exists == 'true'
strategy:
fail-fast: false
matrix:
chunk: [0, 1, 2, 3]
steps:
- name: Checkout
uses: actions/checkout@v6
with:
fetch-depth: 0
- name: Setup Python
uses: actions/setup-python@v6
with:
python-version: "3.14"
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
- name: Download extracted data
uses: actions/download-artifact@v4
with:
name: adsb-extracted
path: data/output/
- name: Extract tar
run: |
cd data/output
tar -xf extracted_data.tar
rm extracted_data.tar
echo "=== Contents of data/output ==="
ls -lah
echo "=== Looking for manifest ==="
cat icao_manifest_*.txt | head -20 || echo "No manifest found"
echo "=== Looking for extracted dirs ==="
ls -d *-planes-readsb-prod-0* 2>/dev/null || echo "No extracted dirs"
- name: Process chunk ${{ matrix.chunk }}
run: |
python -m src.adsb.process_icao_chunk --chunk-id ${{ matrix.chunk }} --total-chunks 4 ${{ inputs.date && format('--date {0}', inputs.date) || '' }}
mkdir -p data/output/adsb_chunks
ls -lah data/output/adsb_chunks/ || echo "No chunks created"
- name: Upload chunk artifacts
uses: actions/upload-artifact@v4
with:
name: adsb-chunk-${{ matrix.chunk }}
path: data/output/adsb_chunks/
retention-days: 1
adsb-reduce:
runs-on: ubuntu-24.04-arm
needs: adsb-map
if: github.event_name != 'schedule'
steps:
- name: Checkout
uses: actions/checkout@v6
with:
fetch-depth: 0
- name: Setup Python
uses: actions/setup-python@v6
with:
python-version: "3.14"
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
- name: Download all chunk artifacts
uses: actions/download-artifact@v4
with:
pattern: adsb-chunk-*
path: data/output/adsb_chunks/
merge-multiple: true
- name: Debug downloaded files
run: |
echo "=== Listing data/ ==="
find data/ -type f 2>/dev/null | head -50 || echo "No files in data/"
echo "=== Looking for parquet files ==="
find . -name "*.parquet" 2>/dev/null | head -20 || echo "No parquet files found"
- name: Combine chunks to CSV
run: |
mkdir -p data/output/adsb_chunks
ls -lah data/output/adsb_chunks/ || echo "Directory empty or does not exist"
python -m src.adsb.combine_chunks_to_csv --chunks-dir data/output/adsb_chunks ${{ inputs.date && format('--date {0}', inputs.date) || '' }}
ls -lah data/openairframes/
- name: Upload ADS-B artifacts
uses: actions/upload-artifact@v4
with:
name: adsb-release
path: data/openairframes/openairframes_adsb_*.csv.gz
retention-days: 1
build-community: build-community:
runs-on: ubuntu-latest runs-on: ubuntu-latest
@@ -233,14 +317,9 @@ jobs:
create-release: create-release:
runs-on: ubuntu-latest runs-on: ubuntu-latest
needs: [resolve-dates, build-faa, adsb-to-aircraft, adsb-reduce, build-community, build-adsbexchange-json, build-mictronics-db] needs: [build-faa, adsb-reduce, build-community, build-adsbexchange-json, build-mictronics-db]
if: github.event_name != 'schedule' && !cancelled() if: github.event_name != 'schedule' && !failure() && !cancelled()
steps: steps:
- name: Check ADS-B workflow status
if: needs.adsb-to-aircraft.result != 'success' && needs.adsb-reduce.result != 'success'
run: |
echo "WARNING: ADS-B workflow failed (adsb-to-aircraft='${{ needs.adsb-to-aircraft.result }}', adsb-reduce='${{ needs.adsb-reduce.result }}'), will continue without ADS-B artifacts"
- name: Checkout for gh CLI - name: Checkout for gh CLI
uses: actions/checkout@v4 uses: actions/checkout@v4
with: with:
@@ -249,33 +328,31 @@ jobs:
sparse-checkout-cone-mode: false sparse-checkout-cone-mode: false
- name: Download FAA artifacts - name: Download FAA artifacts
uses: actions/download-artifact@v5 uses: actions/download-artifact@v4
with: with:
name: faa-release name: faa-release
path: artifacts/faa path: artifacts/faa
- name: Download ADS-B artifacts - name: Download ADS-B artifacts
uses: actions/download-artifact@v5 uses: actions/download-artifact@v4
if: needs.adsb-to-aircraft.result == 'success' || needs.adsb-reduce.result == 'success'
continue-on-error: true
with: with:
name: openairframes_adsb-${{ needs.resolve-dates.outputs.adsb_date }} name: adsb-release
path: artifacts/adsb path: artifacts/adsb
- name: Download Community artifacts - name: Download Community artifacts
uses: actions/download-artifact@v5 uses: actions/download-artifact@v4
with: with:
name: community-release name: community-release
path: artifacts/community path: artifacts/community
- name: Download ADS-B Exchange JSON artifact - name: Download ADS-B Exchange JSON artifact
uses: actions/download-artifact@v5 uses: actions/download-artifact@v4
with: with:
name: adsbexchange-json name: adsbexchange-json
path: artifacts/adsbexchange path: artifacts/adsbexchange
- name: Download Mictronics DB artifact - name: Download Mictronics DB artifact
uses: actions/download-artifact@v5 uses: actions/download-artifact@v4
continue-on-error: true continue-on-error: true
with: with:
name: mictronics-db name: mictronics-db
@@ -311,11 +388,7 @@ jobs:
# Find files from artifacts using find (handles nested structures) # Find files from artifacts using find (handles nested structures)
CSV_FILE_FAA=$(find artifacts/faa -name "openairframes_faa_*.csv" -type f 2>/dev/null | head -1) CSV_FILE_FAA=$(find artifacts/faa -name "openairframes_faa_*.csv" -type f 2>/dev/null | head -1)
# Prefer concatenated file (with date range) over single-day file CSV_FILE_ADSB=$(find artifacts/adsb -name "openairframes_adsb_*.csv.gz" -type f 2>/dev/null | head -1)
CSV_FILE_ADSB=$(find artifacts/adsb -name "openairframes_adsb_*_*.csv.gz" -type f 2>/dev/null | head -1)
if [ -z "$CSV_FILE_ADSB" ]; then
CSV_FILE_ADSB=$(find artifacts/adsb -name "openairframes_adsb_*.csv.gz" -type f 2>/dev/null | head -1)
fi
CSV_FILE_COMMUNITY=$(find artifacts/community -name "openairframes_community_*.csv" -type f 2>/dev/null | head -1) CSV_FILE_COMMUNITY=$(find artifacts/community -name "openairframes_community_*.csv" -type f 2>/dev/null | head -1)
ZIP_FILE=$(find artifacts/faa -name "ReleasableAircraft_*.zip" -type f 2>/dev/null | head -1) ZIP_FILE=$(find artifacts/faa -name "ReleasableAircraft_*.zip" -type f 2>/dev/null | head -1)
JSON_FILE_ADSBX=$(find artifacts/adsbexchange -name "basic-ac-db_*.json.gz" -type f 2>/dev/null | head -1) JSON_FILE_ADSBX=$(find artifacts/adsbexchange -name "basic-ac-db_*.json.gz" -type f 2>/dev/null | head -1)
@@ -326,6 +399,9 @@ jobs:
if [ -z "$CSV_FILE_FAA" ] || [ ! -f "$CSV_FILE_FAA" ]; then if [ -z "$CSV_FILE_FAA" ] || [ ! -f "$CSV_FILE_FAA" ]; then
MISSING_FILES="$MISSING_FILES FAA_CSV" MISSING_FILES="$MISSING_FILES FAA_CSV"
fi fi
if [ -z "$CSV_FILE_ADSB" ] || [ ! -f "$CSV_FILE_ADSB" ]; then
MISSING_FILES="$MISSING_FILES ADSB_CSV"
fi
if [ -z "$ZIP_FILE" ] || [ ! -f "$ZIP_FILE" ]; then if [ -z "$ZIP_FILE" ] || [ ! -f "$ZIP_FILE" ]; then
MISSING_FILES="$MISSING_FILES FAA_ZIP" MISSING_FILES="$MISSING_FILES FAA_ZIP"
fi fi
@@ -335,11 +411,6 @@ jobs:
# Optional files - warn but don't fail # Optional files - warn but don't fail
OPTIONAL_MISSING="" OPTIONAL_MISSING=""
if [ -z "$CSV_FILE_ADSB" ] || [ ! -f "$CSV_FILE_ADSB" ]; then
OPTIONAL_MISSING="$OPTIONAL_MISSING ADSB_CSV"
CSV_FILE_ADSB=""
CSV_BASENAME_ADSB=""
fi
if [ -z "$ZIP_FILE_MICTRONICS" ] || [ ! -f "$ZIP_FILE_MICTRONICS" ]; then if [ -z "$ZIP_FILE_MICTRONICS" ] || [ ! -f "$ZIP_FILE_MICTRONICS" ]; then
OPTIONAL_MISSING="$OPTIONAL_MISSING MICTRONICS_ZIP" OPTIONAL_MISSING="$OPTIONAL_MISSING MICTRONICS_ZIP"
ZIP_FILE_MICTRONICS="" ZIP_FILE_MICTRONICS=""
@@ -357,9 +428,7 @@ jobs:
# Get basenames for display # Get basenames for display
CSV_BASENAME_FAA=$(basename "$CSV_FILE_FAA") CSV_BASENAME_FAA=$(basename "$CSV_FILE_FAA")
if [ -n "$CSV_FILE_ADSB" ]; then CSV_BASENAME_ADSB=$(basename "$CSV_FILE_ADSB")
CSV_BASENAME_ADSB=$(basename "$CSV_FILE_ADSB")
fi
CSV_BASENAME_COMMUNITY=$(basename "$CSV_FILE_COMMUNITY" 2>/dev/null || echo "") CSV_BASENAME_COMMUNITY=$(basename "$CSV_FILE_COMMUNITY" 2>/dev/null || echo "")
ZIP_BASENAME=$(basename "$ZIP_FILE") ZIP_BASENAME=$(basename "$ZIP_FILE")
JSON_BASENAME_ADSBX=$(basename "$JSON_FILE_ADSBX") JSON_BASENAME_ADSBX=$(basename "$JSON_FILE_ADSBX")
@@ -414,7 +483,7 @@ jobs:
Assets: Assets:
- ${{ steps.meta.outputs.csv_basename_faa }} - ${{ steps.meta.outputs.csv_basename_faa }}
${{ steps.meta.outputs.csv_basename_adsb && format('- {0}', steps.meta.outputs.csv_basename_adsb) || '' }} - ${{ steps.meta.outputs.csv_basename_adsb }}
- ${{ steps.meta.outputs.csv_basename_community }} - ${{ steps.meta.outputs.csv_basename_community }}
- ${{ steps.meta.outputs.zip_basename }} - ${{ steps.meta.outputs.zip_basename }}
- ${{ steps.meta.outputs.json_basename_adsbx }} - ${{ steps.meta.outputs.json_basename_adsbx }}
+2 -10
View File
@@ -16,19 +16,11 @@ A daily release is created at **06:00 UTC** and includes:
- **openairframes_community.csv** - **openairframes_community.csv**
All community submissions All community submissions
- **openairframes_adsb.csv**
Airframes dataset derived from ADSB.lol network data. For each UTC day, a row is created for every ICAO address observed in that day's ADS-B messages, using registration data from [tar1090-db](https://github.com/wiedehopf/tar1090-db) (ADSBExchange & Mictronics).
Example Usage:
```python
import pandas as pd
url = "https://github.com/PlaneQuery/OpenAirframes/releases/download/openairframes-2026-03-18-main/openairframes_adsb_2024-01-01_2026-03-17.csv.gz" # 1GB
df = pd.read_csv(url)
df
```
![](docs/images/df_adsb_example_0.png)
- **openairframes_faa.csv** - **openairframes_faa.csv**
All [FAA registration data](https://www.faa.gov/licenses_certificates/aircraft_certification/aircraft_registry/releasable_aircraft_download) from 2023-08-16 to present (~260 MB) All [FAA registration data](https://www.faa.gov/licenses_certificates/aircraft_certification/aircraft_registry/releasable_aircraft_download) from 2023-08-16 to present (~260 MB)
- **openairframes_adsb.csv**
Airframe information derived from ADS-B messages on the [ADSB.lol](https://www.adsb.lol/) network, from 2026-02-12 to present (will be from 2024-01-01 soon). The airframe information originates from [mictronics aircraft database](https://www.mictronics.de/aircraft-database/) (~5 MB).
- **ReleasableAircraft_{date}.zip** - **ReleasableAircraft_{date}.zip**
A daily snapshot of the FAA database, which updates at **05:30 UTC** A daily snapshot of the FAA database, which updates at **05:30 UTC**
@@ -1,40 +0,0 @@
[
{
"contributor_name": "JohnSmith.com",
"contributor_uuid": "2981c3ee-8712-5f96-84bf-732eda515a3f",
"creation_timestamp": "2026-02-18T22:18:11.349009+00:00",
"registration_number": "ZM146",
"tags": {
"citation_0": "https://assets.publishing.service.gov.uk/media/5c07a65f40f0b6705f11cf37/10389.pdf",
"icao_aircraft_type": "L1J",
"manufacturer_icao": "LOCKHEED MARTIN",
"manufacturer_name": "Lockheed-martin",
"model": "F-35B Lightning II",
"operator": "Royal Air Force",
"operator_callsign": "RAFAIR",
"operator_icao": "RFR",
"serial_number": "BK-12",
"type_code": "VF35"
},
"transponder_code_hex": "43C81C"
},
{
"contributor_name": "JohnSmith.com",
"contributor_uuid": "2981c3ee-8712-5f96-84bf-732eda515a3f",
"creation_timestamp": "2026-02-18T22:18:11.349009+00:00",
"registration_number": "ZM148",
"tags": {
"citation_0": "https://assets.publishing.service.gov.uk/media/5c07a65f40f0b6705f11cf37/10389.pdf",
"icao_aircraft_type": "L1J",
"manufacturer_icao": "LOCKHEED MARTIN",
"manufacturer_name": "Lockheed-martin",
"model": "F-35B Lightning II",
"operator": "Royal Air Force",
"operator_callsign": "RAFAIR",
"operator_icao": "RFR",
"serial_number": "BK-14",
"type_code": "VF35"
},
"transponder_code_hex": "43C811"
}
]
Binary file not shown.

Before

Width:  |  Height:  |  Size: 99 KiB

+1 -32
View File
@@ -54,38 +54,7 @@
"additionalProperties": { "additionalProperties": {
"$ref": "#/$defs/tagValue" "$ref": "#/$defs/tagValue"
}, },
"properties": { "properties": {}
"citation_0": {
"type": "string"
},
"icao_aircraft_type": {
"type": "string"
},
"manufacturer_icao": {
"type": "string"
},
"manufacturer_name": {
"type": "string"
},
"model": {
"type": "string"
},
"operator": {
"type": "string"
},
"operator_callsign": {
"type": "string"
},
"operator_icao": {
"type": "string"
},
"serial_number": {
"type": "string"
},
"type_code": {
"type": "string"
}
}
} }
}, },
"allOf": [ "allOf": [
-49
View File
@@ -1,49 +0,0 @@
#!/usr/bin/env python3
import re
from pathlib import Path
import polars as pl
# Range-style artifact name: openairframes_adsb_<start>_<end>.csv.gz.
# Compiled once at import so the loop over files does not rebuild it.
_RANGE_NAME_RE = re.compile(
    r"openairframes_adsb_(\d{4}-\d{2}-\d{2})_(\d{4}-\d{2}-\d{2})\.csv\.gz"
)


def extract_dates(path: Path) -> "tuple[str | None, str | None]":
    """Extract start and end dates from filename.

    Args:
        path: Path whose final component is searched for the
            ``openairframes_adsb_<start>_<end>.csv.gz`` pattern.

    Returns:
        ``(start, end)`` as ``YYYY-MM-DD`` strings when the name matches,
        otherwise ``(None, None)``.
    """
    m = _RANGE_NAME_RE.search(path.name)
    if m:
        return m.group(1), m.group(2)
    return None, None


def main() -> None:
    """Concatenate every downloaded ADS-B CSV.gz into one date-ranged file.

    Reads ``downloads/adsb_artifacts/*/openairframes_adsb_*.csv.gz`` and
    writes ``downloads/openairframes_adsb_<earliest>_<latest>.csv.gz``, where
    the range comes from the dates embedded in the input file names.

    Raises:
        SystemExit: If no input files are found, or none of the file names
            contains a start/end date pair.
    """
    # Find all CSV.gz files in the downloaded artifacts
    artifacts_dir = Path("downloads/adsb_artifacts")
    files = sorted(artifacts_dir.glob("*/openairframes_adsb_*.csv.gz"))
    if not files:
        raise SystemExit("No CSV.gz files found in downloads/adsb_artifacts/")
    print(f"Found {len(files)} files to concatenate")

    # Collect all dates
    all_dates = []
    for f in files:
        start, end = extract_dates(f)
        if start and end:
            all_dates.extend([start, end])
            print(f" {f.name}: {start} to {end}")
        else:
            # The glob also accepts names without a date range. Such files
            # are still concatenated below, so say so instead of silently
            # leaving them out of the output name's range.
            print(f" {f.name}: no start/end dates in name; excluded from the date range")
    if not all_dates:
        raise SystemExit("Could not extract dates from filenames")

    # Find earliest and latest dates (ISO dates sort correctly as strings)
    earliest = min(all_dates)
    latest = max(all_dates)
    print(f"\nDate range: {earliest} to {latest}")

    # Read and concatenate all files
    print("\nReading and concatenating files...")
    frames = [pl.read_csv(f) for f in files]
    df = pl.concat(frames, how="vertical", rechunk=True)

    # Write output
    output_path = Path("downloads") / f"openairframes_adsb_{earliest}_{latest}.csv.gz"
    output_path.parent.mkdir(parents=True, exist_ok=True)
    df.write_csv(output_path, compression="gzip")
    print(f"\nWrote {output_path} with {df.height:,} rows")


if __name__ == "__main__":
    main()
-40
View File
@@ -1,40 +0,0 @@
#!/bin/bash
# Download date-range ADS-B artifacts from recent runs of the
# adsb-to-aircraft-multiple-day-run workflow into downloads/adsb_artifacts/,
# one sub-directory per artifact. Artifacts whose directory already holds
# files are skipped, so the script can be re-run to pick up missing ones.

# Repository from the workflow comment
REPO="ggman12/OpenAirframes"
DEST_DIR="downloads/adsb_artifacts"

# Create download directory
mkdir -p "$DEST_DIR"

# Get last 15 runs of the workflow and download matching artifacts
gh run list \
  --repo "$REPO" \
  --workflow adsb-to-aircraft-multiple-day-run.yaml \
  --limit 15 \
  --json databaseId \
  --jq '.[].databaseId' | while read -r run_id; do
  echo "Checking run ID: $run_id"

  # List artifacts for this run using the API
  # Match pattern: openairframes_adsb-YYYY-MM-DD-YYYY-MM-DD (with second date)
  gh api \
    --paginate \
    "repos/$REPO/actions/runs/$run_id/artifacts" \
    --jq '.artifacts[] | select(.name | test("^openairframes_adsb-[0-9]{4}-[0-9]{2}-[0-9]{2}-[0-9]{4}-[0-9]{2}-[0-9]{2}$")) | .name' | while read -r artifact_name; do
    # Never operate on the download root itself if a blank name slips through.
    [ -n "$artifact_name" ] || continue
    target="$DEST_DIR/$artifact_name"

    # Check if artifact directory already exists and has files
    if [ -d "$target" ] && [ -n "$(ls -A "$target" 2>/dev/null)" ]; then
      echo " Skipping (already exists): $artifact_name"
      continue
    fi

    echo " Downloading: $artifact_name"
    if ! gh run download "$run_id" \
      --repo "$REPO" \
      --name "$artifact_name" \
      --dir "$target"; then
      # A failed download may leave a partially filled directory. The
      # "already exists" check above would then skip it on every later run,
      # so remove it and let the next run retry.
      echo " Download failed, removing partial directory: $artifact_name" >&2
      rm -rf -- "${target:?}"
    fi
  done
done

echo "Download complete! Files saved to downloads/adsb_artifacts/"
+34 -40
View File
@@ -1,6 +1,6 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
""" """
Script to trigger adsb-to-aircraft-multiple-day-run workflow runs in monthly chunks. Script to trigger historical-adsb workflow runs in 15-day chunks.
Usage: Usage:
python scripts/run_historical_adsb_action.py --start-date 2025-01-01 --end-date 2025-06-01 python scripts/run_historical_adsb_action.py --start-date 2025-01-01 --end-date 2025-06-01
@@ -10,14 +10,10 @@ import argparse
import subprocess import subprocess
import sys import sys
from datetime import datetime, timedelta from datetime import datetime, timedelta
from calendar import monthrange
def generate_monthly_chunks(start_date_str, end_date_str): def generate_date_chunks(start_date_str, end_date_str, chunk_days=15):
"""Generate date ranges in monthly chunks from start to end date. """Generate date ranges in fixed-day chunks from start to end date."""
End dates are exclusive (e.g., to process Jan 1-31, end_date should be Feb 1).
"""
start_date = datetime.strptime(start_date_str, '%Y-%m-%d') start_date = datetime.strptime(start_date_str, '%Y-%m-%d')
end_date = datetime.strptime(end_date_str, '%Y-%m-%d') end_date = datetime.strptime(end_date_str, '%Y-%m-%d')
@@ -25,35 +21,31 @@ def generate_monthly_chunks(start_date_str, end_date_str):
current = start_date current = start_date
while current < end_date: while current < end_date:
# Get the first day of the next month (exclusive end) # Calculate end of current chunk
_, days_in_month = monthrange(current.year, current.month) chunk_end = current + timedelta(days=chunk_days)
month_end = current.replace(day=days_in_month)
next_month_start = month_end + timedelta(days=1)
# Don't go past the global end date # Don't go past the global end date
chunk_end = min(next_month_start, end_date) if chunk_end > end_date:
chunk_end = end_date
chunks.append({ chunks.append({
'start': current.strftime('%Y-%m-%d'), 'start': current.strftime('%Y-%m-%d'),
'end': chunk_end.strftime('%Y-%m-%d') 'end': chunk_end.strftime('%Y-%m-%d')
}) })
# Move to first day of next month current = chunk_end
if next_month_start >= end_date:
break
current = next_month_start
return chunks return chunks
def trigger_workflow(start_date, end_date, repo='ggman12/OpenAirframes', branch='main', dry_run=False): def trigger_workflow(start_date, end_date, chunk_days=1, branch='main', dry_run=False):
"""Trigger the adsb-to-aircraft-multiple-day-run workflow via GitHub CLI.""" """Trigger the historical-adsb workflow via GitHub CLI."""
cmd = [ cmd = [
'gh', 'workflow', 'run', 'adsb-to-aircraft-multiple-day-run.yaml', 'gh', 'workflow', 'run', 'historical-adsb.yaml',
'--repo', repo,
'--ref', branch, '--ref', branch,
'-f', f'start_date={start_date}', '-f', f'start_date={start_date}',
'-f', f'end_date={end_date}' '-f', f'end_date={end_date}',
'-f', f'chunk_days={chunk_days}'
] ]
if dry_run: if dry_run:
@@ -74,8 +66,7 @@ def trigger_workflow(start_date, end_date, repo='ggman12/OpenAirframes', branch=
# Get the most recent run (should be the one we just triggered) # Get the most recent run (should be the one we just triggered)
list_cmd = [ list_cmd = [
'gh', 'run', 'list', 'gh', 'run', 'list',
'--repo', repo, '--workflow', 'historical-adsb.yaml',
'--workflow', 'adsb-to-aircraft-multiple-day-run.yaml',
'--branch', branch, '--branch', branch,
'--limit', '1', '--limit', '1',
'--json', 'databaseId', '--json', 'databaseId',
@@ -93,25 +84,29 @@ def trigger_workflow(start_date, end_date, repo='ggman12/OpenAirframes', branch=
def main(): def main():
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
description='Trigger adsb-to-aircraft-multiple-day-run workflow runs in monthly chunks' description='Trigger historical-adsb workflow runs in monthly chunks'
) )
parser.add_argument( parser.add_argument(
'--start-date', '--start_date', '--start-date',
dest='start_date',
required=True, required=True,
help='Start date in YYYY-MM-DD format (inclusive)' help='Start date in YYYY-MM-DD format (inclusive)'
) )
parser.add_argument( parser.add_argument(
'--end-date', '--end_date', '--end-date',
dest='end_date',
required=True, required=True,
help='End date in YYYY-MM-DD format (exclusive)' help='End date in YYYY-MM-DD format (exclusive)'
) )
parser.add_argument( parser.add_argument(
'--repo', '--chunk-days',
type=str, type=int,
default='ggman12/OpenAirframes', default=1,
help='GitHub repository (default: ggman12/OpenAirframes)' help='Days per job chunk within each workflow run (default: 1)'
)
parser.add_argument(
'--workflow-chunk-days',
type=int,
default=15,
help='Days per workflow run (default: 15)'
) )
parser.add_argument( parser.add_argument(
'--branch', '--branch',
@@ -137,17 +132,17 @@ def main():
try: try:
start = datetime.strptime(args.start_date, '%Y-%m-%d') start = datetime.strptime(args.start_date, '%Y-%m-%d')
end = datetime.strptime(args.end_date, '%Y-%m-%d') end = datetime.strptime(args.end_date, '%Y-%m-%d')
if start > end: if start >= end:
print("Error: start_date must be before or equal to end_date") print("Error: start_date must be before end_date")
sys.exit(1) sys.exit(1)
except ValueError as e: except ValueError as e:
print(f"Error: Invalid date format - {e}") print(f"Error: Invalid date format - {e}")
sys.exit(1) sys.exit(1)
# Generate monthly chunks # Generate date chunks
chunks = generate_monthly_chunks(args.start_date, args.end_date) chunks = generate_date_chunks(args.start_date, args.end_date, chunk_days=args.workflow_chunk_days)
print(f"\nGenerating {len(chunks)} monthly workflow runs on branch '{args.branch}' (repo: {args.repo}):") print(f"\nGenerating {len(chunks)} workflow runs ({args.workflow_chunk_days} days each) on branch '{args.branch}':")
for i, chunk in enumerate(chunks, 1): for i, chunk in enumerate(chunks, 1):
print(f" {i}. {chunk['start']} to {chunk['end']}") print(f" {i}. {chunk['start']} to {chunk['end']}")
@@ -170,7 +165,7 @@ def main():
success, run_id = trigger_workflow( success, run_id = trigger_workflow(
chunk['start'], chunk['start'],
chunk['end'], chunk['end'],
repo=args.repo, chunk_days=args.chunk_days,
branch=args.branch, branch=args.branch,
dry_run=args.dry_run dry_run=args.dry_run
) )
@@ -194,12 +189,11 @@ def main():
if triggered_runs and not args.dry_run: if triggered_runs and not args.dry_run:
import json import json
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
runs_file = f"./output/triggered_runs_{timestamp}.json" runs_file = f"./triggered_runs_{timestamp}.json"
with open(runs_file, 'w') as f: with open(runs_file, 'w') as f:
json.dump({ json.dump({
'start_date': args.start_date, 'start_date': args.start_date,
'end_date': args.end_date, 'end_date': args.end_date,
'repo': args.repo,
'branch': args.branch, 'branch': args.branch,
'runs': triggered_runs 'runs': triggered_runs
}, f, indent=2) }, f, indent=2)
-82
View File
@@ -1,82 +0,0 @@
#!/usr/bin/env python3
"""
Run src.adsb.main from a temporary snapshot copy of src/ (created under
.snapshots/) so edits in the main working tree won't affect subprocess
imports during the run.
Usage:
python scripts/run_main_isolated.py 2026-01-01
python scripts/run_main_isolated.py --start_date 2026-01-01 --end_date 2026-01-03
"""
import argparse
import os
import shutil
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path
def run(
cmd: list[str],
*,
cwd: Path | None = None,
check: bool = True,
) -> subprocess.CompletedProcess:
print(f"\n>>> {' '.join(cmd)}")
return subprocess.run(cmd, cwd=cwd, check=check)
def main() -> int:
    """Run ``src.adsb.main`` against a private snapshot copy of ``src/``.

    Command-line arguments are validated, ``<repo>/src`` is copied into a
    fresh directory under ``<repo>/.snapshots/``, and ``src.adsb.main`` is
    executed in a child interpreter whose ``sys.path`` starts with that
    snapshot. The snapshot is removed afterwards whether or not the run
    succeeded.

    Returns:
        0 on success, otherwise the exit status of the child interpreter.

    Raises:
        SystemExit: If the date arguments are combined incorrectly or missing.
        ValueError: If a supplied date is not in ``YYYY-MM-DD`` format.
    """
    # Local import keeps this function self-contained.
    import tempfile

    parser = argparse.ArgumentParser(description="Run src.adsb.main in an isolated worktree")
    parser.add_argument("date", nargs="?", help="Single date to process (YYYY-MM-DD)")
    parser.add_argument("--start_date", help="Start date (inclusive, YYYY-MM-DD)")
    parser.add_argument("--end_date", help="End date (exclusive, YYYY-MM-DD)")
    parser.add_argument("--concat_with_latest_csv", action="store_true", help="Also concatenate with latest CSV from GitHub releases")
    args = parser.parse_args()

    if args.date and (args.start_date or args.end_date):
        raise SystemExit("Use a single date or --start_date/--end_date, not both.")

    if args.date:
        # strptime is used purely as a format check; the string is forwarded as-is.
        datetime.strptime(args.date, "%Y-%m-%d")
        main_args = ["--date", args.date]
    else:
        if not args.start_date or not args.end_date:
            raise SystemExit("Provide --start_date and --end_date, or a single date.")
        datetime.strptime(args.start_date, "%Y-%m-%d")
        datetime.strptime(args.end_date, "%Y-%m-%d")
        main_args = ["--start_date", args.start_date, "--end_date", args.end_date]

    if args.concat_with_latest_csv:
        main_args.append("--concat_with_latest_csv")

    repo_root = Path(__file__).resolve().parents[1]
    snapshots_root = repo_root / ".snapshots"
    snapshots_root.mkdir(exist_ok=True)

    timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
    # mkdtemp guarantees a directory owned by this invocation. A bare
    # timestamp has one-second resolution, so two runs started in the same
    # second would collide and the cleanup below would delete the other
    # run's snapshot.
    snapshot_root = Path(tempfile.mkdtemp(prefix=f"run_{timestamp}_", dir=snapshots_root))
    snapshot_src = snapshot_root / "src"

    exit_code = 0
    try:
        shutil.copytree(repo_root / "src", snapshot_src)

        # The child puts the snapshot first on sys.path so `src.*` imports
        # resolve to the frozen copy rather than the live working tree.
        runner = (
            "import sys, runpy; "
            f"sys.path.insert(0, {repr(str(snapshot_root))}); "
            f"sys.argv = ['src.adsb.main'] + {main_args!r}; "
            "runpy.run_module('src.adsb.main', run_name='__main__')"
        )
        cmd = [sys.executable, "-c", runner]
        run(cmd, cwd=repo_root)
    except subprocess.CalledProcessError as exc:
        exit_code = exc.returncode
    finally:
        shutil.rmtree(snapshot_root, ignore_errors=True)

    return exit_code


if __name__ == "__main__":
    raise SystemExit(main())
-242
View File
@@ -1,242 +0,0 @@
#!/usr/bin/env python3
"""
Parse TheAirTraffic Database CSV and produce community_submission.v1 JSON.
Source: "TheAirTraffic Database - Aircraft 2.csv"
Output: community/YYYY-MM-DD/theairtraffic_<date>.json
Categories in the spreadsheet columns (paired: name, registrations, separator):
Col 1-3: Business
Col 4-6: Government
Col 7-9: People
Col 10-12: Sports
Col 13-15: Celebrity
Col 16-18: State Govt./Law
Col 19-21: Other
Col 22-24: Test Aircraft
Col 25-27: YouTubers
Col 28-30: Formula 1 VIP's
Col 31-33: Active GII's and GIII's (test/demo aircraft)
Col 34-37: Russia & Ukraine (extra col for old/new)
Col 38-40: Helicopters & Blimps
Col 41-43: Unique Reg's
Col 44-46: Saudi & UAE
Col 47-49: Schools
Col 50-52: Special Charter
Col 53-55: Unknown Owners
Col 56-59: Frequent Flyers (extra cols: name, aircraft, logged, hours)
"""
import csv
import json
import hashlib
import re
import sys
import uuid
from datetime import datetime, timezone
from pathlib import Path
# ── Category mapping ────────────────────────────────────────────────────────
# Each entry: (name_col, reg_col, owner_category_tags)
# owner_category_tags is a dict of tag keys to add beyond "owner"
# name_col / reg_col are used directly as list indices into each parsed CSV
# row (row[name_col], row[reg_col]) by main().
CATEGORY_COLUMNS = [
    # (name_col, reg_col, {tag_key: tag_value, ...})
    (1, 2, {"owner_category_0": "business"}),
    (4, 5, {"owner_category_0": "government"}),
    # NOTE(review): the module docstring labels this column group "People";
    # it is tagged "celebrity" here — confirm that is intended.
    (7, 8, {"owner_category_0": "celebrity"}),
    (10, 11, {"owner_category_0": "sports"}),
    (13, 14, {"owner_category_0": "celebrity"}),
    (16, 17, {"owner_category_0": "government", "owner_category_1": "law_enforcement"}),
    (19, 20, {"owner_category_0": "other"}),
    (22, 23, {"owner_category_0": "test_aircraft"}),
    (25, 26, {"owner_category_0": "youtuber", "owner_category_1": "celebrity"}),
    (28, 29, {"owner_category_0": "celebrity", "owner_category_1": "motorsport"}),
    (31, 32, {"owner_category_0": "test_aircraft"}),
    # Russia & Ukraine: col 34=name, col 35 or 36 may have reg
    (34, 35, {"owner_category_0": "russia_ukraine"}),
    # This entry uses the extra tag key "category" instead of "owner_category_1".
    (38, 39, {"owner_category_0": "celebrity", "category": "helicopter_or_blimp"}),
    (41, 42, {"owner_category_0": "other"}),
    (44, 45, {"owner_category_0": "government", "owner_category_1": "royal_family"}),
    (47, 48, {"owner_category_0": "education"}),
    (50, 51, {"owner_category_0": "charter"}),
    (53, 54, {"owner_category_0": "unknown"}),
    (56, 57, {"owner_category_0": "celebrity"}),  # Frequent Flyers name col, aircraft col
]

# First data row index (0-based) in the CSV; rows before it are never read by main().
DATA_START_ROW = 4

# ── Contributor info ────────────────────────────────────────────────────────
CONTRIBUTOR_NAME = "TheAirTraffic"
# Deterministic UUID v5 derived from the contributor's URL in the URL namespace
CONTRIBUTOR_UUID = str(uuid.uuid5(uuid.NAMESPACE_URL, "https://theairtraffic.com"))

# Citation: stored under the "citation_0" tag of every generated submission
CITATION = "https://docs.google.com/spreadsheets/d/1JHhfJBnJPNBA6TgiSHjkXFkHBdVTTz_nXxaUDRWcHpk"
def looks_like_military_serial(reg: str) -> bool:
    """Return True when *reg* is shaped like a serial rather than a civil registration.

    Three shapes qualify: ``NN-NNNN`` (e.g. 92-9000), exactly six digits
    (e.g. 929000), and one to five digits (e.g. "01", "676"). Such values
    are stored under ``openairframes_id`` instead of ``registration_number``.
    """
    serial_shapes = (
        r'^\d{2}-\d{4}$',  # NN-NNNN
        r'^\d{6}$',        # pure 6-digit number
        r'^\d{1,5}$',      # short numeric-only value
    )
    return any(re.match(shape, reg) is not None for shape in serial_shapes)
def normalize_reg(raw: str) -> str:
    """Return *raw* with surrounding whitespace, trailing commas and all line breaks removed."""
    trimmed = raw.strip()
    cleaned = trimmed.rstrip(',').strip()
    # Carriage returns / newlines can also sit in the middle of the value.
    for line_break in ('\r', '\n'):
        cleaned = cleaned.replace(line_break, '')
    return cleaned.strip()
def parse_regs(cell_value: str) -> list[str]:
    """Split a spreadsheet cell into its individual registration strings.

    The cell is split on commas and each piece is cleaned with
    ``normalize_reg``. Empty cells, cells consisting only of "." or ",",
    and cells containing an ADS-B Exchange globe URL yield an empty list.
    Pieces that are empty, start with "http" or "Link", equal "Section 1",
    or consist only of dots are dropped.
    """
    stripped_cell = cell_value.strip() if cell_value else ''
    if stripped_cell in ('', '.', ','):
        return []
    # Some cells hold ADS-B Exchange URLs instead of registrations; skip those.
    if 'globe.adsbexchange.com' in cell_value:
        return []

    def is_registration(candidate: str) -> bool:
        # Reject blanks, URLs, section labels and dot-only placeholders.
        if not candidate or candidate == 'Section 1':
            return False
        if candidate.startswith(('http', 'Link')):
            return False
        return candidate not in ('.', '..', '...')

    cleaned = (normalize_reg(piece) for piece in cell_value.split(','))
    return [candidate for candidate in cleaned if is_registration(candidate)]
def make_submission(
    reg: str,
    owner: str,
    category_tags: dict[str, str],
) -> dict:
    """Build a single community_submission.v1 object.

    The identifier is stored under ``openairframes_id`` when *reg* looks
    like a military serial and under ``registration_number`` otherwise.
    Tags always carry the citation; the stripped owner is added when
    non-empty, and *category_tags* are applied last, so they win on any
    key collision.
    """
    if looks_like_military_serial(reg):
        id_field = "openairframes_id"
    else:
        id_field = "registration_number"

    tags: dict = {"citation_0": CITATION}
    if owner:
        tags["owner"] = owner.strip()
    for tag_key, tag_value in category_tags.items():
        tags[tag_key] = tag_value

    return {id_field: reg, "tags": tags}
def main():
    """Convert the TheAirTraffic CSV export into a community submission JSON file.

    The CSV path is taken from ``sys.argv[1]`` when given, otherwise a
    built-in default path is used. Every (owner, registration) pair found in
    the column groups listed in ``CATEGORY_COLUMNS`` becomes one submission;
    duplicate pairs are emitted once. The result is written to
    ``<project root>/community/<UTC date>/theairtraffic_<UTC date>.json``
    and a per-category count is printed.

    Exits with status 1 when the CSV file does not exist.
    """
    # Local import keeps this function self-contained.
    from collections import Counter

    csv_path = Path(sys.argv[1]) if len(sys.argv) > 1 else Path(
        "/Users/jonahgoode/Downloads/TheAirTraffic Database - Aircraft 2.csv"
    )
    if not csv_path.exists():
        print(f"ERROR: CSV not found at {csv_path}", file=sys.stderr)
        sys.exit(1)

    # Read CSV. newline='' lets the csv module handle line endings itself,
    # as its documentation requires for quoted fields with embedded newlines.
    with open(csv_path, 'r', encoding='utf-8-sig', newline='') as f:
        rows = list(csv.reader(f))

    print(f"Read {len(rows)} rows from {csv_path.name}")

    date_str = datetime.now(timezone.utc).strftime("%Y-%m-%d")

    submissions: list[dict] = []
    seen: set[tuple] = set()  # (reg, owner) dedup

    for row in rows[DATA_START_ROW:]:
        if len(row) < 3:
            continue

        for name_col, reg_col, cat_tags in CATEGORY_COLUMNS:
            if reg_col >= len(row) or name_col >= len(row):
                continue

            owner_raw = row[name_col].strip().rstrip(',').strip()
            reg_raw = row[reg_col]

            # Clean owner name
            owner = owner_raw.replace('\r', '').replace('\n', '').strip()
            if not owner or owner in ('.', ',', 'Section 1'):
                continue
            # Skip header-like values
            if owner.startswith(('http', 'Link ')):
                continue

            regs = parse_regs(reg_raw)
            if not regs:
                # For Russia & Ukraine, try the next column too (col 35 might have old reg, col 36 new)
                if name_col == 34 and reg_col + 1 < len(row):
                    regs = parse_regs(row[reg_col + 1])

            for reg in regs:
                key = (reg, owner)
                if key in seen:
                    continue
                seen.add(key)
                submissions.append(make_submission(reg, owner, cat_tags))

    print(f"Generated {len(submissions)} submissions")

    # Write output
    proj_root = Path(__file__).resolve().parent.parent
    out_dir = proj_root / "community" / date_str
    out_dir.mkdir(parents=True, exist_ok=True)
    out_file = out_dir / f"theairtraffic_{date_str}.json"

    with open(out_file, 'w', encoding='utf-8') as f:
        json.dump(submissions, f, indent=2, ensure_ascii=False)

    print(f"Written to {out_file}")
    # A CSV with no usable rows yields no submissions; indexing [0] would
    # raise IndexError after the file has already been written.
    if submissions:
        print(f"Sample entry:\n{json.dumps(submissions[0], indent=2)}")

    # Quick stats
    cats = Counter(s['tags'].get('owner_category_0', 'NONE') for s in submissions)
    print("\nCategory breakdown:")
    for c, n in cats.most_common():
        print(f" {c}: {n}")


if __name__ == "__main__":
    main()
-69
View File
@@ -1,69 +0,0 @@
#!/usr/bin/env python3
"""Validate the generated theairtraffic JSON output."""
import json
import glob
import re
import sys
from collections import Counter

# Pattern every tag key must match; compiled once instead of per tag.
TAG_KEY_PATTERN = re.compile(r"^[a-z][a-z0-9_]{0,63}$")
# An entry is acceptable when it carries at least one of these identifier fields.
IDENTIFIER_FIELDS = ("registration_number", "transponder_code_hex", "openairframes_id")


def _tags(entry):
    """Return the entry's tags, or an empty dict when the entry has none.

    Lets the summary sections below run on malformed entries so that the
    schema check can report them instead of the script dying on KeyError.
    """
    return entry.get("tags", {})


# Find the latest output. Date-named directories sort chronologically, so the
# last match is the newest file regardless of month.
files = sorted(glob.glob("community/*/theairtraffic_*.json"))
if not files:
    print("No output files found!")
    sys.exit(1)

path = files[-1]
print(f"Validating: {path}")

with open(path) as f:
    data = json.load(f)

print(f"Total entries: {len(data)}")

# Check military serial handling
mil = [d for d in data if "openairframes_id" in d]
print(f"\nEntries using openairframes_id: {len(mil)}")
for m in mil[:10]:
    print(f" {m['openairframes_id']} -> owner: {_tags(m).get('owner', '?')}")

# Check youtuber entries
yt = [d for d in data if _tags(d).get("owner_category_0") == "youtuber"]
print(f"\nYouTuber entries: {len(yt)}")
for y in yt[:5]:
    reg = y.get("registration_number", y.get("openairframes_id"))
    c0 = _tags(y).get("owner_category_0")
    c1 = _tags(y).get("owner_category_1")
    print(f" {reg} -> owner: {_tags(y).get('owner', '?')}, cat0: {c0}, cat1: {c1}")

# Check US Govt / military
gov = [d for d in data if _tags(d).get("owner") == "United States of America 747/757"]
print(f"\nUSA 747/757 entries: {len(gov)}")
for g in gov:
    oid = g.get("openairframes_id", g.get("registration_number"))
    print(f" {oid}")

# Schema validation
issues = 0
for i, d in enumerate(data):
    if not any(k in d for k in IDENTIFIER_FIELDS):
        print(f" Entry {i}: no identifier!")
        issues += 1
    if "tags" not in d:
        print(f" Entry {i}: no tags!")
        issues += 1
    # Check tag key format
    for k in _tags(d):
        if not TAG_KEY_PATTERN.match(k):
            print(f" Entry {i}: invalid tag key '{k}'")
            issues += 1

print(f"\nSchema issues: {issues}")

# Category breakdown
cats = Counter(_tags(s).get("owner_category_0", "NONE") for s in data)
print("\nCategory breakdown:")
for c, n in cats.most_common():
    print(f" {c}: {n}")
+51 -2
View File
@@ -170,7 +170,7 @@ def load_parquet_part(part_id: int, date: str) -> pl.DataFrame:
# Convert to timezone-naive datetime # Convert to timezone-naive datetime
if df["time"].dtype == pl.Datetime: if df["time"].dtype == pl.Datetime:
df = df.with_columns(pl.col("time").dt.replace_time_zone(None)) df = df.with_columns(pl.col("time").dt.replace_time_zone(None))
os.remove(parquet_file)
return df return df
@@ -194,4 +194,53 @@ def concat_compressed_dfs(df_base, df_new):
"""Concatenate base and new compressed dataframes, keeping the most informative row per ICAO.""" """Concatenate base and new compressed dataframes, keeping the most informative row per ICAO."""
# Combine both dataframes # Combine both dataframes
df_combined = pl.concat([df_base, df_new]) df_combined = pl.concat([df_base, df_new])
return df_combined
# Sort by ICAO and time
df_combined = df_combined.sort(['icao', 'time'])
# Fill null values
for col in COLUMNS:
if col in df_combined.columns:
df_combined = df_combined.with_columns(pl.col(col).fill_null(""))
# Apply compression logic per ICAO to get the best row
icao_groups = df_combined.partition_by('icao', as_dict=True, maintain_order=True)
compressed_dfs = []
for icao_key, group_df in icao_groups.items():
icao = icao_key[0]
compressed = compress_df_polars(group_df, str(icao))
compressed_dfs.append(compressed)
if compressed_dfs:
df_compressed = pl.concat(compressed_dfs)
else:
df_compressed = df_combined.head(0)
# Sort by time
df_compressed = df_compressed.sort('time')
return df_compressed
def get_latest_aircraft_adsb_csv_df():
    """Download the latest ADS-B CSV from GitHub releases and load it.

    Returns:
        A ``(df, date_str)`` tuple: the CSV as a polars DataFrame with nulls
        in string columns replaced by empty strings, and the start date
        (``YYYY-MM-DD``) parsed from the downloaded file's name.

    Raises:
        ValueError: If the file name does not contain
            ``openairframes_adsb_<YYYY-MM-DD>_``.
    """
    from get_latest_release import download_latest_aircraft_adsb_csv
    import re

    csv_path = download_latest_aircraft_adsb_csv()
    df = pl.read_csv(csv_path, null_values=[""])

    # Fill nulls with empty strings, string columns only
    text_columns = [name for name in df.columns if df[name].dtype == pl.Utf8]
    if text_columns:
        df = df.with_columns([pl.col(name).fill_null("") for name in text_columns])

    # Extract start date from filename pattern: openairframes_adsb_{start_date}_{end_date}.csv[.gz]
    found = re.search(r"openairframes_adsb_(\d{4}-\d{2}-\d{2})_", str(csv_path))
    if found is None:
        # NOTE(review): `.name` assumes csv_path is a pathlib.Path — confirm
        # against download_latest_aircraft_adsb_csv's return type.
        raise ValueError(f"Could not extract date from filename: {csv_path.name}")

    return df, found.group(1)
+15 -48
View File
@@ -1,67 +1,34 @@
from pathlib import Path from pathlib import Path
import polars as pl import polars as pl
import argparse import argparse
import os
OUTPUT_DIR = Path("./data/output") OUTPUT_DIR = Path("./outputs")
CORRECT_ORDER_OF_COLUMNS = ["time", "icao", "r", "t", "dbFlags", "ownOp", "year", "desc", "aircraft_category"]
def main(): def main():
parser = argparse.ArgumentParser(description="Concatenate compressed parquet files for a single day") parser = argparse.ArgumentParser(description="Concatenate compressed parquet files for a single day")
parser.add_argument("--date", type=str, required=True, help="Date in YYYY-MM-DD format") parser.add_argument("--date", type=str, required=True, help="Date in YYYY-MM-DD format")
parser.add_argument("--concat_with_latest_csv", action="store_true", help="Whether to also concatenate with the latest CSV from GitHub releases")
args = parser.parse_args() args = parser.parse_args()
compressed_dir = OUTPUT_DIR / "compressed" compressed_dir = OUTPUT_DIR / "compressed"
date_dir = compressed_dir / args.date date_dir = compressed_dir / args.date
if not date_dir.is_dir():
raise FileNotFoundError(f"No date folder found: {date_dir}")
parquet_files = sorted(date_dir.glob("*.parquet")) parquet_files = sorted(date_dir.glob("*.parquet"))
df = None if not parquet_files:
if parquet_files: # TODO: This logic could be updated slightly. raise FileNotFoundError(f"No parquet files found in {date_dir}")
print(f"No parquet files found in {date_dir}")
frames = [pl.read_parquet(p) for p in parquet_files] frames = [pl.read_parquet(p) for p in parquet_files]
df = pl.concat(frames, how="vertical", rechunk=True) df = pl.concat(frames, how="vertical", rechunk=True)
df = df.sort(["time", "icao"]) df = df.sort(["time", "icao"])
df = df.select(CORRECT_ORDER_OF_COLUMNS) output_path = OUTPUT_DIR / f"openairframes_adsb_{args.date}_{args.date}.parquet"
print(f"Writing combined parquet to {output_path} with {df.height} rows")
output_path = OUTPUT_DIR / f"openairframes_adsb_{args.date}.parquet" df.write_parquet(output_path)
print(f"Writing combined parquet to {output_path} with {df.height} rows")
df.write_parquet(output_path)
csv_output_path = OUTPUT_DIR / f"openairframes_adsb_{args.date}.csv.gz" csv_output_path = OUTPUT_DIR / f"openairframes_adsb_{args.date}_{args.date}.csv"
print(f"Writing combined csv.gz to {csv_output_path} with {df.height} rows") print(f"Writing combined csv to {csv_output_path} with {df.height} rows")
df.write_csv(csv_output_path, compression="gzip") df.write_csv(csv_output_path)
if args.concat_with_latest_csv:
print("Loading latest CSV from GitHub releases to concatenate with...")
from src.get_latest_release import get_latest_aircraft_adsb_csv_df
from datetime import datetime
df_latest_csv, csv_start_date, csv_end_date = get_latest_aircraft_adsb_csv_df()
# Compare dates: end_date is exclusive, so if csv_end_date > args.date,
# the latest CSV already includes this day's data
csv_end_dt = datetime.strptime(csv_end_date, "%Y-%m-%d")
args_dt = datetime.strptime(args.date, "%Y-%m-%d")
if df is None or csv_end_dt >= args_dt:
print(f"Latest CSV already includes data through {args.date} (end_date={csv_end_date} is exclusive)")
print("Writing latest CSV directly without concatenation to avoid duplicates")
os.makedirs(OUTPUT_DIR, exist_ok=True)
final_csv_output_path = OUTPUT_DIR / f"openairframes_adsb_{csv_start_date}_{csv_end_date}.csv.gz"
df_latest_csv = df_latest_csv.select(CORRECT_ORDER_OF_COLUMNS)
df_latest_csv.write_csv(final_csv_output_path, compression="gzip")
else:
print(f"Concatenating latest CSV (through {csv_end_date}) with new data ({args.date})")
# Ensure column order matches before concatenating
df_latest_csv = df_latest_csv.select(CORRECT_ORDER_OF_COLUMNS)
from src.adsb.compress_adsb_to_aircraft_data import concat_compressed_dfs
df_final = concat_compressed_dfs(df_latest_csv, df)
df_final = df_final.select(CORRECT_ORDER_OF_COLUMNS)
final_csv_output_path = OUTPUT_DIR / f"openairframes_adsb_{csv_start_date}_{args.date}.csv.gz"
df_final.write_csv(final_csv_output_path, compression="gzip")
print(f"Final CSV written to {final_csv_output_path}")
if __name__ == "__main__": if __name__ == "__main__":
main() main()
+39 -112
View File
@@ -16,7 +16,7 @@ import sys
import urllib.error import urllib.error
import urllib.request import urllib.request
from datetime import datetime from datetime import datetime
import time
import orjson import orjson
import pyarrow as pa import pyarrow as pa
import pyarrow.parquet as pq import pyarrow.parquet as pq
@@ -88,7 +88,7 @@ def _fetch_releases_from_repo(year: str, version_date: str) -> list:
else: else:
print(f"Failed to fetch releases (attempt {attempt}/{max_retries}): {response.status} {response.reason}") print(f"Failed to fetch releases (attempt {attempt}/{max_retries}): {response.status} {response.reason}")
if attempt < max_retries: if attempt < max_retries:
print(f"Waiting {retry_delay} seconds before retry") print(f"Waiting {retry_delay} seconds before retry...")
time.sleep(retry_delay) time.sleep(retry_delay)
else: else:
print(f"Giving up after {max_retries} attempts") print(f"Giving up after {max_retries} attempts")
@@ -96,7 +96,7 @@ def _fetch_releases_from_repo(year: str, version_date: str) -> list:
except Exception as e: except Exception as e:
print(f"Request exception (attempt {attempt}/{max_retries}): {e}") print(f"Request exception (attempt {attempt}/{max_retries}): {e}")
if attempt < max_retries: if attempt < max_retries:
print(f"Waiting {retry_delay} seconds before retry") print(f"Waiting {retry_delay} seconds before retry...")
time.sleep(retry_delay) time.sleep(retry_delay)
else: else:
print(f"Giving up after {max_retries} attempts") print(f"Giving up after {max_retries} attempts")
@@ -123,105 +123,47 @@ def fetch_releases(version_date: str) -> list:
# For last day of year, also check next year's repo if nothing found # For last day of year, also check next year's repo if nothing found
if not releases and version_date.endswith(".12.31"): if not releases and version_date.endswith(".12.31"):
next_year = str(int(year) + 1) next_year = str(int(year) + 1)
print(f"No releases found for {version_date} in {year} repo, checking {next_year} repo") print(f"No releases found for {version_date} in {year} repo, checking {next_year} repo...")
releases = _fetch_releases_from_repo(next_year, version_date) releases = _fetch_releases_from_repo(next_year, version_date)
return releases return releases
def download_asset(asset_url: str, file_path: str, expected_size: int | None = None) -> bool: def download_asset(asset_url: str, file_path: str) -> bool:
"""Download a single release asset with size verification. """Download a single release asset."""
Args:
asset_url: URL to download from
file_path: Local path to save to
expected_size: Expected file size in bytes (for verification)
Returns:
True if download succeeded and size matches (if provided), False otherwise
"""
os.makedirs(os.path.dirname(file_path) or OUTPUT_DIR, exist_ok=True) os.makedirs(os.path.dirname(file_path) or OUTPUT_DIR, exist_ok=True)
# Check if file exists and has correct size
if os.path.exists(file_path): if os.path.exists(file_path):
if expected_size is not None: print(f"[SKIP] {file_path} already downloaded.")
actual_size = os.path.getsize(file_path) return True
if actual_size == expected_size:
print(f"[SKIP] {file_path} already downloaded and verified ({actual_size} bytes).") print(f"Downloading {asset_url}...")
try:
signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(40) # 40-second timeout
req = urllib.request.Request(asset_url, headers=HEADERS)
with urllib.request.urlopen(req) as response:
signal.alarm(0)
if response.status == 200:
with open(file_path, "wb") as file:
while True:
chunk = response.read(8192)
if not chunk:
break
file.write(chunk)
print(f"Saved {file_path}")
return True return True
else: else:
print(f"[WARN] {file_path} exists but size mismatch (expected {expected_size}, got {actual_size}). Re-downloading.") print(f"Failed to download {asset_url}: {response.status} {response.msg}")
os.remove(file_path)
else:
print(f"[SKIP] {file_path} already downloaded.")
return True
max_retries = 2
retry_delay = 30
timeout_seconds = 140
for attempt in range(1, max_retries + 1):
print(f"Downloading {asset_url} (attempt {attempt}/{max_retries})")
try:
req = urllib.request.Request(asset_url, headers=HEADERS)
with urllib.request.urlopen(req, timeout=timeout_seconds) as response:
if response.status == 200:
with open(file_path, "wb") as file:
while True:
chunk = response.read(8192)
if not chunk:
break
file.write(chunk)
# Verify file size if expected_size was provided
if expected_size is not None:
actual_size = os.path.getsize(file_path)
if actual_size != expected_size:
print(f"[ERROR] Size mismatch for {file_path}: expected {expected_size} bytes, got {actual_size} bytes")
os.remove(file_path)
if attempt < max_retries:
print(f"Waiting {retry_delay} seconds before retry")
time.sleep(retry_delay)
continue
return False
print(f"Saved {file_path} ({actual_size} bytes, verified)")
else:
print(f"Saved {file_path}")
return True
else:
print(f"Failed to download {asset_url}: {response.status} {response.msg}")
if attempt < max_retries:
print(f"Waiting {retry_delay} seconds before retry")
time.sleep(retry_delay)
else:
return False
except urllib.error.HTTPError as e:
if e.code == 404:
print(f"404 Not Found: {asset_url}")
raise Exception(f"Asset not found (404): {asset_url}")
else:
print(f"HTTP error occurred (attempt {attempt}/{max_retries}): {e.code} {e.reason}")
if attempt < max_retries:
print(f"Waiting {retry_delay} seconds before retry")
time.sleep(retry_delay)
else:
return False
except urllib.error.URLError as e:
print(f"URL/Timeout error (attempt {attempt}/{max_retries}): {e}")
if attempt < max_retries:
print(f"Waiting {retry_delay} seconds before retry")
time.sleep(retry_delay)
else:
return False return False
except Exception as e: except DownloadTimeoutException as e:
print(f"An error occurred (attempt {attempt}/{max_retries}): {e}") print(f"Download aborted for {asset_url}: {e}")
if attempt < max_retries: return False
print(f"Waiting {retry_delay} seconds before retry") except Exception as e:
time.sleep(retry_delay) print(f"An error occurred while downloading {asset_url}: {e}")
else: return False
return False
return False
def extract_split_archive(file_paths: list, extract_dir: str) -> bool: def extract_split_archive(file_paths: list, extract_dir: str) -> bool:
@@ -260,6 +202,7 @@ def extract_split_archive(file_paths: list, extract_dir: str) -> bool:
stdin=cat_proc.stdout, stdin=cat_proc.stdout,
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, stderr=subprocess.PIPE,
check=True
) )
cat_proc.stdout.close() cat_proc.stdout.close()
cat_stderr = cat_proc.stderr.read().decode() if cat_proc.stderr else "" cat_stderr = cat_proc.stderr.read().decode() if cat_proc.stderr else ""
@@ -268,24 +211,6 @@ def extract_split_archive(file_paths: list, extract_dir: str) -> bool:
if cat_stderr: if cat_stderr:
print(f"cat stderr: {cat_stderr}") print(f"cat stderr: {cat_stderr}")
tar_stderr = result.stderr.decode() if result.stderr else ""
if result.returncode != 0:
# GNU tar exits non-zero for format issues that BSD tar silently
# tolerates (e.g. trailing junk after the last valid entry).
# Check whether files were actually extracted before giving up.
extracted_items = os.listdir(extract_dir)
if extracted_items:
print(f"[WARN] tar exited {result.returncode} but extracted "
f"{len(extracted_items)} items — treating as success")
if tar_stderr:
print(f"tar stderr: {tar_stderr}")
else:
print(f"Failed to extract split archive (tar exit {result.returncode})")
if tar_stderr:
print(f"tar stderr: {tar_stderr}")
shutil.rmtree(extract_dir, ignore_errors=True)
return False
print(f"Successfully extracted archive to {extract_dir}") print(f"Successfully extracted archive to {extract_dir}")
# Delete tar files immediately after extraction # Delete tar files immediately after extraction
@@ -302,9 +227,11 @@ def extract_split_archive(file_paths: list, extract_dir: str) -> bool:
print(f"Disk space after tar deletion: {free_gb:.1f}GB free") print(f"Disk space after tar deletion: {free_gb:.1f}GB free")
return True return True
except Exception as e: except subprocess.CalledProcessError as e:
stderr_output = e.stderr.decode() if e.stderr else ""
print(f"Failed to extract split archive: {e}") print(f"Failed to extract split archive: {e}")
shutil.rmtree(extract_dir, ignore_errors=True) if stderr_output:
print(f"tar stderr: {stderr_output}")
return False return False
@@ -578,7 +505,7 @@ def create_parquet_for_day(day, keep_folders: bool = False):
print(f"Parquet file already exists: {parquet_path}") print(f"Parquet file already exists: {parquet_path}")
return parquet_path return parquet_path
print(f"Creating parquet for {version_date}") print(f"Creating parquet for {version_date}...")
rows_processed = process_version_date(version_date, keep_folders) rows_processed = process_version_date(version_date, keep_folders)
if rows_processed > 0 and parquet_path.exists(): if rows_processed > 0 and parquet_path.exists():
+2 -3
View File
@@ -77,9 +77,8 @@ def download_and_extract(version_date: str) -> str | None:
for asset in use_assets: for asset in use_assets:
asset_name = asset["name"] asset_name = asset["name"]
asset_url = asset["browser_download_url"] asset_url = asset["browser_download_url"]
asset_size = asset.get("size") # Get expected file size
file_path = os.path.join(OUTPUT_DIR, asset_name) file_path = os.path.join(OUTPUT_DIR, asset_name)
if download_asset(asset_url, file_path, expected_size=asset_size): if download_asset(asset_url, file_path):
downloaded_files.append(file_path) downloaded_files.append(file_path)
if not downloaded_files: if not downloaded_files:
@@ -123,7 +122,7 @@ def process_single_day(target_day: datetime) -> tuple[str | None, list[str]]:
from pathlib import Path from pathlib import Path
import tarfile import tarfile
NUMBER_PARTS = 4 NUMBER_PARTS = 16
def split_folders_into_gzip_archives(extract_dir: Path, tar_output_dir: Path, icaos: list[str], parts = NUMBER_PARTS) -> list[str]: def split_folders_into_gzip_archives(extract_dir: Path, tar_output_dir: Path, icaos: list[str], parts = NUMBER_PARTS) -> list[str]:
traces_dir = extract_dir / "traces" traces_dir = extract_dir / "traces"
buckets = sorted(traces_dir.iterdir()) buckets = sorted(traces_dir.iterdir())
+16 -57
View File
@@ -3,74 +3,33 @@ Main pipeline for processing ADS-B data from adsb.lol.
Usage: Usage:
python -m src.adsb.main --date 2026-01-01 python -m src.adsb.main --date 2026-01-01
python -m src.adsb.main --start_date 2026-01-01 --end_date 2026-01-03
""" """
import argparse import argparse
import subprocess import subprocess
import sys import sys
from datetime import datetime, timedelta from datetime import datetime, timedelta
import polars as pl
from src.adsb.download_and_list_icaos import NUMBER_PARTS from src.adsb.download_and_list_icaos import NUMBER_PARTS
def main(): def main():
parser = argparse.ArgumentParser(description="Process ADS-B data for a single day or date range") parser = argparse.ArgumentParser(description="Process ADS-B data for a single day")
parser.add_argument("--date", type=str, help="Single date in YYYY-MM-DD format") parser.add_argument("--date", type=str, required=True)
parser.add_argument("--start_date", type=str, help="Start date (inclusive, YYYY-MM-DD)")
parser.add_argument("--end_date", type=str, help="End date (exclusive, YYYY-MM-DD)")
parser.add_argument("--concat_with_latest_csv", action="store_true", help="Also concatenate with latest CSV from GitHub releases")
args = parser.parse_args() args = parser.parse_args()
if args.date and (args.start_date or args.end_date): date_str = datetime.strptime(args.date, "%Y-%m-%d").strftime("%Y-%m-%d")
raise SystemExit("Use --date or --start_date/--end_date, not both.") print(f"Processing day: {date_str}")
if args.date: # Download and split
start_date = datetime.strptime(args.date, "%Y-%m-%d") subprocess.run([sys.executable, "-m", "src.adsb.download_and_list_icaos", "--date", date_str], check=True)
end_date = start_date + timedelta(days=1)
else: # Process parts
if not args.start_date or not args.end_date: for part_id in range(NUMBER_PARTS):
raise SystemExit("Provide --start_date and --end_date, or use --date.") subprocess.run([sys.executable, "-m", "src.adsb.process_icao_chunk", "--part-id", str(part_id), "--date", date_str], check=True)
start_date = datetime.strptime(args.start_date, "%Y-%m-%d")
end_date = datetime.strptime(args.end_date, "%Y-%m-%d") # Concatenate
subprocess.run([sys.executable, "src/adsb/concat_parquet_to_final.py", "--date", date_str], check=True)
current = start_date
while current < end_date:
date_str = current.strftime("%Y-%m-%d")
print(f"Processing day: {date_str}")
# Download and split
subprocess.run([sys.executable, "-m", "src.adsb.download_and_list_icaos", "--date", date_str], check=True)
# Process parts
for part_id in range(NUMBER_PARTS):
subprocess.run([sys.executable, "-m", "src.adsb.process_icao_chunk", "--part-id", str(part_id), "--date", date_str], check=True)
# Concatenate
concat_cmd = [sys.executable, "-m", "src.adsb.concat_parquet_to_final", "--date", date_str]
if args.concat_with_latest_csv:
concat_cmd.append("--concat_with_latest_csv")
subprocess.run(concat_cmd, check=True)
current += timedelta(days=1)
if end_date - start_date > timedelta(days=1):
dates = []
cur = start_date
while cur < end_date:
dates.append(cur.strftime("%Y-%m-%d"))
cur += timedelta(days=1)
csv_files = [
f"data/outputs/openairframes_adsb_{d}_{d}.csv"
for d in dates
]
frames = [pl.read_csv(p) for p in csv_files]
df = pl.concat(frames, how="vertical", rechunk=True)
output_path = f"data/outputs/openairframes_adsb_{start_date.strftime('%Y-%m-%d')}_{end_date.strftime('%Y-%m-%d')}.csv"
df.write_csv(output_path)
print(f"Wrote combined CSV: {output_path}")
print("Done") print("Done")
+4 -10
View File
@@ -31,6 +31,9 @@ from src.adsb.download_adsb_data_to_parquet import (
) )
CHUNK_OUTPUT_DIR = os.path.join(OUTPUT_DIR, "adsb_chunks")
os.makedirs(CHUNK_OUTPUT_DIR, exist_ok=True)
# Smaller batch size for memory efficiency # Smaller batch size for memory efficiency
BATCH_SIZE = 100_000 BATCH_SIZE = 100_000
@@ -123,16 +126,7 @@ def main():
print(f"Processing part {args.part_id} for {args.date}") print(f"Processing part {args.part_id} for {args.date}")
# Get specific archive file for this part # Get specific archive file for this part
archive_dir = os.path.join(OUTPUT_DIR, "adsb_archives", args.date) archive_path = os.path.join(OUTPUT_DIR, "adsb_archives", args.date, f"{args.date}_part_{args.part_id}.tar.gz")
archive_path = os.path.join(archive_dir, f"{args.date}_part_{args.part_id}.tar.gz")
if not os.path.isfile(archive_path):
print(f"ERROR: Archive not found: {archive_path}")
if os.path.isdir(archive_dir):
print(f"Files in {archive_dir}: {os.listdir(archive_dir)}")
else:
print(f"Directory does not exist: {archive_dir}")
sys.exit(1)
# Extract and collect trace files # Extract and collect trace files
trace_map = build_trace_file_map(archive_path) trace_map = build_trace_file_map(archive_path)
+173
View File
@@ -0,0 +1,173 @@
#!/usr/bin/env python3
"""
Run the full ADS-B processing pipeline locally.
Downloads adsb.lol data, processes trace files, and outputs openairframes_adsb CSV.
Usage:
# Single day (yesterday by default)
python -m src.adsb.run_local
# Single day (specific date, processes 2024-01-15 only)
python -m src.adsb.run_local 2024-01-15 2024-01-16
# Date range (end date is exclusive)
python -m src.adsb.run_local 2024-01-01 2024-01-07
"""
import argparse
import os
import subprocess
import sys
from datetime import datetime, timedelta
def run_cmd(cmd: list[str], description: str) -> None:
    """Echo a command, execute it, and abort the process if it fails.

    Args:
        cmd: Argument vector handed to ``subprocess.run`` (no shell involved).
        description: Human-readable step name used in the error message.

    Raises:
        SystemExit: With the child's exit status when that status is non-zero.
    """
    command_line = " ".join(cmd)
    print(f"\n>>> {command_line}")

    exit_code = subprocess.run(cmd).returncode
    if exit_code == 0:
        return

    # Propagate the child's status so callers/CI see the real failure code.
    print(f"ERROR: {description} failed with exit code {exit_code}")
    sys.exit(exit_code)
def main():
    """Run download, per-chunk processing and the combine step for a date range.

    The range comes from the optional positional ``start_date`` and
    ``end_date`` arguments (``end_date`` is exclusive). With no arguments the
    previous UTC day is processed. The range is cut into pieces of
    ``--chunk-days`` days; each piece is downloaded and then processed once
    per ICAO chunk. A single combine step runs over the whole range at the end.

    Every step is launched through ``run_cmd`` with the interpreter that is
    running this script, so the subprocesses use the same environment.

    Raises:
        SystemExit: Status 2 for invalid arguments (argparse usage error), or
            the failing subprocess's status when a step fails in ``run_cmd``.
    """
    parser = argparse.ArgumentParser(
        description="Run full ADS-B processing pipeline locally",
        usage="python -m src.adsb.run_local [start_date] [end_date]"
    )
    parser.add_argument(
        "start_date",
        nargs="?",
        help="Start date (YYYY-MM-DD, inclusive). Default: yesterday"
    )
    parser.add_argument(
        "end_date",
        nargs="?",
        help="End date (YYYY-MM-DD, exclusive). If omitted, processes single day (start_date + 1)"
    )
    parser.add_argument(
        "--chunks",
        type=int,
        default=4,
        help="Number of parallel chunks (default: 4)"
    )
    parser.add_argument(
        "--chunk-days",
        type=int,
        default=1,
        help="Days per chunk for date range processing (default: 1)"
    )
    # BooleanOptionalAction keeps ``--skip-base`` (and the True default) but
    # also provides ``--no-skip-base``; with plain store_true + default=True
    # the option could never be switched off.
    parser.add_argument(
        "--skip-base",
        action=argparse.BooleanOptionalAction,
        default=True,
        help="Skip downloading and merging with base release (default: True for historical runs)"
    )
    args = parser.parse_args()

    # A non-positive --chunk-days would never advance the chunking loop below
    # (infinite loop); a non-positive --chunks would silently process nothing.
    if args.chunks < 1:
        parser.error("--chunks must be at least 1")
    if args.chunk_days < 1:
        parser.error("--chunk-days must be at least 1")

    # Determine dates
    if args.start_date:
        start_date = datetime.strptime(args.start_date, "%Y-%m-%d")
    else:
        start_date = datetime.utcnow() - timedelta(days=1)

    if args.end_date:
        end_date = datetime.strptime(args.end_date, "%Y-%m-%d")
    else:
        # Default: process single day (end = start + 1 day, exclusive)
        end_date = start_date + timedelta(days=1)

    # An empty or inverted range would skip every chunk and still invoke the
    # combine step with nonsensical dates.
    if end_date <= start_date:
        parser.error("end_date must be after start_date (end_date is exclusive)")

    start_str = start_date.strftime("%Y-%m-%d")
    end_str = end_date.strftime("%Y-%m-%d")

    # Generate date chunks; each chunk's 'end' is exclusive and clamped to end_date.
    date_chunks = []
    current = start_date
    while current < end_date:
        chunk_end = min(current + timedelta(days=args.chunk_days), end_date)
        date_chunks.append({
            'start': current.strftime("%Y-%m-%d"),
            'end': chunk_end.strftime("%Y-%m-%d")
        })
        current = chunk_end

    print("=" * 60)
    print("ADS-B Processing Pipeline")
    print("=" * 60)
    print(f"Date range: {start_str} to {end_str} (exclusive)")
    print(f"Date chunks: {len(date_chunks)} ({args.chunk_days} days each)")
    print(f"ICAO chunks: {args.chunks}")
    print("=" * 60)

    # Process each date chunk
    for idx, date_chunk in enumerate(date_chunks, 1):
        chunk_start = date_chunk['start']
        chunk_end = date_chunk['end']
        # Convert exclusive end date to inclusive for subcommands
        # download_and_list_icaos and process_icao_chunk treat both dates as inclusive
        chunk_end_inclusive = (datetime.strptime(chunk_end, "%Y-%m-%d") - timedelta(days=1)).strftime("%Y-%m-%d")

        print(f"\n{'=' * 60}")
        print(f"Processing Date Chunk {idx}/{len(date_chunks)}: {chunk_start} to {chunk_end_inclusive} (inclusive)")
        print('=' * 60)

        # Step 1: Download and extract
        print("\n" + "=" * 60)
        print("Step 1: Download and Extract")
        print("=" * 60)
        # sys.executable (not a bare "python") so the child runs under the
        # same interpreter/virtualenv as this script.
        cmd = [sys.executable, "-m", "src.adsb.download_and_list_icaos",
               "--start-date", chunk_start, "--end-date", chunk_end_inclusive]
        run_cmd(cmd, "Download and extract")

        # Step 2: Process chunks
        print("\n" + "=" * 60)
        print("Step 2: Process Chunks")
        print("=" * 60)
        for chunk_id in range(args.chunks):
            print(f"\n--- ICAO Chunk {chunk_id + 1}/{args.chunks} ---")
            cmd = [sys.executable, "-m", "src.adsb.process_icao_chunk",
                   "--chunk-id", str(chunk_id),
                   "--total-chunks", str(args.chunks),
                   "--start-date", chunk_start,
                   "--end-date", chunk_end_inclusive]
            run_cmd(cmd, f"Process ICAO chunk {chunk_id}")

    # Step 3: Combine all chunks to CSV
    print("\n" + "=" * 60)
    print("Step 3: Combine All Chunks to CSV")
    print("=" * 60)
    chunks_dir = "./data/output/adsb_chunks"
    cmd = [sys.executable, "-m", "src.adsb.combine_chunks_to_csv",
           "--chunks-dir", chunks_dir,
           "--start-date", start_str,
           "--end-date", end_str,
           "--stream"]
    if args.skip_base:
        cmd.append("--skip-base")
    run_cmd(cmd, "Combine chunks")

    print("\n" + "=" * 60)
    print("Done!")
    print("=" * 60)

    # Show output
    output_dir = "./data/openairframes"
    # Calculate actual end date for filename (end_date - 1 day since it's exclusive)
    actual_end = (end_date - timedelta(days=1)).strftime("%Y-%m-%d")
    output_file = f"openairframes_adsb_{start_str}_{actual_end}.csv.gz"
    output_path = os.path.join(output_dir, output_file)
    if os.path.exists(output_path):
        size_mb = os.path.getsize(output_path) / (1024 * 1024)
        print(f"Output: {output_path}")
        print(f"Size: {size_mb:.1f} MB")
if __name__ == "__main__":
main()
+4 -15
View File
@@ -246,20 +246,6 @@ def process_submission(
if schema_updated: if schema_updated:
schema_note = f"\n**Schema Updated:** Added new tags: `{', '.join(new_tags)}`\n" schema_note = f"\n**Schema Updated:** Added new tags: `{', '.join(new_tags)}`\n"
# Truncate JSON preview to stay under GitHub's 65536 char body limit
max_json_preview = 50000
if len(content_json) > max_json_preview:
# Show first few entries as a preview
preview_entries = submissions[:10]
preview_json = json.dumps(preview_entries, indent=2, sort_keys=True)
json_section = (
f"### Submissions (showing 10 of {len(submissions)})\n"
f"```json\n{preview_json}\n```\n\n"
f"*Full submission ({len(submissions)} entries, {len(content_json):,} chars) is in the committed file.*"
)
else:
json_section = f"### Submissions\n```json\n{content_json}\n```"
pr_body = f"""## Community Submission pr_body = f"""## Community Submission
Adds {len(submissions)} submission(s) from @{author_username}. Adds {len(submissions)} submission(s) from @{author_username}.
@@ -271,7 +257,10 @@ Closes #{issue_number}
--- ---
{json_section}""" ### Submissions
```json
{content_json}
```"""
pr = create_pull_request( pr = create_pull_request(
title=f"Community submission: {filename}", title=f"Community submission: {filename}",
@@ -24,7 +24,7 @@ def read_all_submissions(community_dir: Path) -> list[dict]:
"""Read all JSON submissions from the community directory.""" """Read all JSON submissions from the community directory."""
all_submissions = [] all_submissions = []
for json_file in sorted(community_dir.glob("**/*.json")): for json_file in sorted(community_dir.glob("*.json")):
try: try:
with open(json_file) as f: with open(json_file) as f:
data = json.load(f) data = json.load(f)
+3 -65
View File
@@ -36,52 +36,6 @@ def get_latest_schema_version() -> int:
return max_version return max_version
def _is_balanced_json(text: str) -> bool:
    """
    Report whether every ``[``/``{`` in *text* is closed in the right order.

    Brackets and braces that appear inside double-quoted strings are not
    counted. A backslash causes the character after it to be skipped
    entirely, so an escaped quote does not toggle string state.

    Args:
        text: JSON text to check

    Returns:
        True when all openers are matched and no string is left open,
        False otherwise
    """
    closer_for = {'[': ']', '{': '}'}
    closers = set(closer_for.values())
    open_stack = []
    inside_string = False
    skip_next = False

    for ch in text:
        if skip_next:
            # Character following a backslash: consume it without inspection.
            skip_next = False
        elif ch == '\\':
            skip_next = True
        elif ch == '"':
            inside_string = not inside_string
        elif inside_string:
            pass
        elif ch in closer_for:
            open_stack.append(ch)
        elif ch in closers:
            # A closer with nothing open, or one that does not match the most
            # recent opener, means the text is unbalanced.
            if not open_stack or closer_for[open_stack.pop()] != ch:
                return False

    return not open_stack and not inside_string
def get_schema_path(version: int | None = None) -> Path: def get_schema_path(version: int | None = None) -> Path:
""" """
Get path to a specific schema version, or latest if version is None. Get path to a specific schema version, or latest if version is None.
@@ -208,14 +162,10 @@ def extract_json_from_issue_body(body: str) -> str | None:
return match.group(1).strip() return match.group(1).strip()
# Try: Raw JSON after "### Submission JSON" until next section or end # Try: Raw JSON after "### Submission JSON" until next section or end
# Use greedy matching since we have a clear boundary (next ### or end) pattern_raw = r"### Submission JSON\s*\n\s*([\[{][\s\S]*?[\]}])(?=\n###|\n\n###|$)"
pattern_raw = r"### Submission JSON\s*\n\s*([\[{][\s\S]*[\]}])(?=\s*\n###|\s*$)"
match = re.search(pattern_raw, body) match = re.search(pattern_raw, body)
if match: if match:
candidate = match.group(1).strip() return match.group(1).strip()
# Validate it's complete JSON by checking balanced brackets
if _is_balanced_json(candidate):
return candidate
# Try: Any JSON object/array in the body (fallback) # Try: Any JSON object/array in the body (fallback)
pattern_any = r"([\[{][\s\S]*?[\]}])" pattern_any = r"([\[{][\s\S]*?[\]}])"
@@ -269,19 +219,7 @@ def parse_and_validate(json_str: str, schema: dict | None = None) -> tuple[list
try: try:
data = json.loads(json_str) data = json.loads(json_str)
except json.JSONDecodeError as e: except json.JSONDecodeError as e:
# Provide detailed error context return None, [f"Invalid JSON: {e}"]
error_msg = f"Invalid JSON: {e}"
# Show context around the error position
if hasattr(e, 'pos') and e.pos is not None:
start = max(0, e.pos - 50)
end = min(len(json_str), e.pos + 50)
context = json_str[start:end]
# Escape for readability
context_escaped = repr(context)
error_msg += f"\n\nContext around position {e.pos}: {context_escaped}"
return None, [error_msg]
errors = validate_submission(data, schema) errors = validate_submission(data, schema)
return data, errors return data, errors
+25 -87
View File
@@ -27,33 +27,6 @@ def _http_get_json(url: str, headers: dict[str, str]) -> dict:
return json.loads(data.decode("utf-8")) return json.loads(data.decode("utf-8"))
def get_releases(repo: str = REPO, github_token: Optional[str] = None, per_page: int = 30) -> list[dict]:
    """Fetch one page of releases for *repo* from the GitHub REST API.

    Args:
        repo: Repository identifier interpolated into the API path.
        github_token: Optional token; when truthy it is sent as a Bearer
            ``Authorization`` header.
        per_page: Value of the ``per_page`` query parameter.

    Returns:
        The decoded JSON payload returned by ``_http_get_json``.
    """
    request_headers = {
        "Accept": "application/vnd.github+json",
        "User-Agent": "openairframes-downloader/1.0",
    }
    if github_token:
        request_headers["Authorization"] = f"Bearer {github_token}"

    releases_url = f"https://api.github.com/repos/{repo}/releases?per_page={per_page}"
    return _http_get_json(releases_url, headers=request_headers)
def get_release_assets_from_release_data(release_data: dict) -> list[ReleaseAsset]:
    """Build ``ReleaseAsset`` objects from a release payload's ``assets`` list.

    Args:
        release_data: Release dictionary; a missing ``"assets"`` key is
            treated as an empty list.

    Returns:
        One ``ReleaseAsset`` per entry, with ``size`` coerced to ``int``
        (0 when the entry has no ``"size"``).

    Raises:
        KeyError: If an asset entry lacks ``"name"`` or
            ``"browser_download_url"``.
    """
    return [
        ReleaseAsset(
            name=entry["name"],
            download_url=entry["browser_download_url"],
            size=int(entry.get("size", 0)),
        )
        for entry in release_data.get("assets", [])
    ]
def get_latest_release_assets(repo: str = REPO, github_token: Optional[str] = None) -> list[ReleaseAsset]: def get_latest_release_assets(repo: str = REPO, github_token: Optional[str] = None) -> list[ReleaseAsset]:
url = f"https://api.github.com/repos/{repo}/releases/latest" url = f"https://api.github.com/repos/{repo}/releases/latest"
headers = { headers = {
@@ -64,7 +37,16 @@ def get_latest_release_assets(repo: str = REPO, github_token: Optional[str] = No
headers["Authorization"] = f"Bearer {github_token}" headers["Authorization"] = f"Bearer {github_token}"
payload = _http_get_json(url, headers=headers) payload = _http_get_json(url, headers=headers)
return get_release_assets_from_release_data(payload) assets = []
for a in payload.get("assets", []):
assets.append(
ReleaseAsset(
name=a["name"],
download_url=a["browser_download_url"],
size=int(a.get("size", 0)),
)
)
return assets
def pick_asset( def pick_asset(
@@ -173,8 +155,7 @@ def download_latest_aircraft_adsb_csv(
repo: str = REPO, repo: str = REPO,
) -> Path: ) -> Path:
""" """
Download the latest openairframes_adsb_*.csv file from GitHub releases. Download the latest openairframes_adsb_*.csv file from the latest GitHub release.
If the latest release doesn't have the file, searches previous releases.
Args: Args:
output_dir: Directory to save the downloaded file (default: "downloads") output_dir: Directory to save the downloaded file (default: "downloads")
@@ -185,69 +166,26 @@ def download_latest_aircraft_adsb_csv(
Path to the downloaded file Path to the downloaded file
""" """
output_dir = Path(output_dir) output_dir = Path(output_dir)
assets = get_latest_release_assets(repo, github_token=github_token)
# Get multiple releases asset = pick_asset(assets, name_regex=r"^openairframes_adsb_.*\.csv(\.gz)?$")
releases = get_releases(repo, github_token=github_token, per_page=30) saved_to = download_asset(asset, output_dir / asset.name, github_token=github_token)
print(f"Downloaded: {asset.name} ({asset.size} bytes) -> {saved_to}")
# Try each release until we find one with the matching asset return saved_to
for release in releases:
assets = get_release_assets_from_release_data(release)
try:
asset = pick_asset(assets, name_regex=r"^openairframes_adsb_.*\.csv(\.gz)?$")
saved_to = download_asset(asset, output_dir / asset.name, github_token=github_token)
print(f"Downloaded: {asset.name} ({asset.size} bytes) -> {saved_to}")
return saved_to
except FileNotFoundError:
# This release doesn't have the matching asset, try the next one
continue
raise FileNotFoundError(
f"No release in the last 30 releases has an asset matching 'openairframes_adsb_.*\\.csv(\\.gz)?$'"
)
import polars as pl
def get_latest_aircraft_adsb_csv_df(): def get_latest_aircraft_adsb_csv_df():
"""Download and load the latest ADS-B CSV from GitHub releases.
Returns:
tuple: (df, start_date, end_date) where dates are in YYYY-MM-DD format
"""
import re
csv_path = download_latest_aircraft_adsb_csv() csv_path = download_latest_aircraft_adsb_csv()
df = pl.read_csv(csv_path, null_values=[""]) import pandas as pd
df = pd.read_csv(csv_path)
# Parse time column: values like "2025-12-31T00:00:00.040" or "2025-05-11T15:15:50.540+0000" df = df.fillna("")
# Try with timezone first (convert to naive), then without timezone # Extract start date from filename pattern: openairframes_adsb_{start_date}_{end_date}.csv[.gz]
df = df.with_columns( match = re.search(r"openairframes_adsb_(\d{4}-\d{2}-\d{2})_", str(csv_path))
pl.col("time").str.strptime(pl.Datetime("ms"), "%Y-%m-%dT%H:%M:%S%.f%z", strict=False)
.dt.replace_time_zone(None) # Convert to naive datetime first
.fill_null(pl.col("time").str.strptime(pl.Datetime("ms"), "%Y-%m-%dT%H:%M:%S%.f", strict=False))
)
# Cast dbFlags and year to strings to match the schema used in compress functions
for col in ['dbFlags', 'year']:
if col in df.columns:
df = df.with_columns(pl.col(col).cast(pl.Utf8))
# Fill nulls with empty strings for string columns
for col in df.columns:
if df[col].dtype == pl.Utf8:
df = df.with_columns(pl.col(col).fill_null(""))
# Extract start and end dates from filename pattern: openairframes_adsb_{start_date}_{end_date}.csv[.gz]
match = re.search(r"openairframes_adsb_(\d{4}-\d{2}-\d{2})_(\d{4}-\d{2}-\d{2})\.csv", str(csv_path))
if not match: if not match:
raise ValueError(f"Could not extract dates from filename: {csv_path.name}") raise ValueError(f"Could not extract date from filename: {csv_path.name}")
start_date = match.group(1) date_str = match.group(1)
end_date = match.group(2) return df, date_str
print(df.columns)
print(df.dtypes)
return df, start_date, end_date
if __name__ == "__main__": if __name__ == "__main__":
download_latest_aircraft_csv() download_latest_aircraft_csv()
download_latest_aircraft_adsb_csv()