Compare commits

..

5 Commits

Author SHA1 Message Date
ggman12 145f1006be update template 2026-02-13 12:12:24 -05:00
ggman12 f5465f0552 update .github/workflows/update-community-prs.yaml 2026-02-13 12:00:10 -05:00
ggman12 17098ae39a fix update-community-prs.yaml 2026-02-13 11:52:53 -05:00
ggman12 6f6b65780a update community_submission.yaml. update Readme.md 2026-02-13 11:49:18 -05:00
ggman12 4015a5fcf1 OpenAirframes 1.0 2026-02-13 11:37:31 -05:00
27 changed files with 494 additions and 1131 deletions
@@ -8,13 +8,13 @@ body:
- type: markdown
attributes:
value: |
Submit **one object** or an **array of objects** that matches the community submission schema.
Submit **one object** or an **array of objects** that matches the community submission [schema](https://github.com/PlaneQuery/OpenAirframes/blob/main/schemas/community_submission.v1.schema.json). Reuse existing tags from the schema when possible.
**Rules (enforced on review/automation):**
- Each object must include **at least one** of:
- `registration_number`
- `transponder_code_hex` (6 uppercase hex chars, e.g., `ABC123`)
- `planequery_airframe_id`
- `openairframes_id`
- Your contributor name (entered below) will be applied to all objects.
- `contributor_uuid` is derived from your GitHub account automatically.
- `creation_timestamp` is created by the system (you may omit it).
@@ -27,7 +27,7 @@ body:
```json
{
"registration_number": "N12345",
"tags": {"owner": "John Doe"},
"tags": {"owner": "John Doe", "photo": "https://example.com/photo.jpg"},
"start_date": "2025-01-01"
}
```
@@ -77,6 +77,5 @@ body:
id: notes
attributes:
label: Notes (optional)
description: Any context, sources, or links that help validate your submission.
validations:
required: false
+53 -15
View File
@@ -48,7 +48,7 @@ jobs:
matrix:
chunk: ${{ fromJson(needs.generate-matrix.outputs.chunks) }}
max-parallel: 3
fail-fast: false
fail-fast: true
steps:
- name: Checkout
uses: actions/checkout@v4
@@ -74,11 +74,12 @@ jobs:
env:
START_DATE: ${{ matrix.chunk.start_date }}
END_DATE: ${{ matrix.chunk.end_date }}
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: |
python -m src.adsb.download_and_list_icaos --start-date "$START_DATE" --end-date "$END_DATE"
ls -lah data/output/
- name: Create tar of extracted data
- name: Create tar of extracted data and split into chunks
run: |
cd data/output
echo "=== Disk space before tar ==="
@@ -93,16 +94,31 @@ jobs:
ls -lah extracted_data.tar
# Verify tar integrity
tar -tf extracted_data.tar > /dev/null && echo "Tar integrity check passed" || { echo "Tar integrity check FAILED"; exit 1; }
# Create checksum of the FULL tar before splitting (for verification after reassembly)
echo "=== Creating checksum of full tar ==="
sha256sum extracted_data.tar > full_tar.sha256
cat full_tar.sha256
# Split into 500MB chunks to avoid artifact upload issues
echo "=== Splitting tar into 500MB chunks ==="
mkdir -p tar_chunks
split -b 500M extracted_data.tar tar_chunks/extracted_data.tar.part_
rm extracted_data.tar
mv full_tar.sha256 tar_chunks/
echo "=== Chunks created ==="
ls -lah tar_chunks/
else
echo "ERROR: No extracted directories found, cannot create tar"
exit 1
fi
- name: Upload extracted data
- name: Upload extracted data chunks
uses: actions/upload-artifact@v4
with:
name: adsb-extracted-${{ matrix.chunk.start_date }}-${{ matrix.chunk.end_date }}
path: data/output/extracted_data.tar
path: data/output/tar_chunks/
retention-days: 1
compression-level: 0
if-no-files-found: warn
@@ -140,18 +156,40 @@ jobs:
uses: actions/download-artifact@v4
with:
name: adsb-extracted-${{ matrix.chunk.start_date }}-${{ matrix.chunk.end_date }}
path: data/output/
continue-on-error: true
path: data/output/tar_chunks/
- name: Extract tar
- name: Reassemble and extract tar
id: extract
run: |
cd data/output
if [ -f extracted_data.tar ]; then
echo "=== Tar file info ==="
if [ -d tar_chunks ] && ls tar_chunks/extracted_data.tar.part_* 1>/dev/null 2>&1; then
echo "=== Chunk files info ==="
ls -lah tar_chunks/
cd tar_chunks
# Reassemble tar with explicit sorting
echo "=== Reassembling tar file ==="
ls -1 extracted_data.tar.part_?? | sort | while read part; do
echo "Appending $part..."
cat "$part" >> ../extracted_data.tar
done
cd ..
echo "=== Reassembled tar file info ==="
ls -lah extracted_data.tar
echo "=== Verifying tar integrity ==="
tar -tf extracted_data.tar > /dev/null || { echo "ERROR: Tar file is corrupted"; exit 1; }
# Verify checksum of reassembled tar matches original
echo "=== Verifying reassembled tar checksum ==="
echo "Original checksum:"
cat tar_chunks/full_tar.sha256
echo "Reassembled checksum:"
sha256sum extracted_data.tar
sha256sum -c tar_chunks/full_tar.sha256 || { echo "ERROR: Reassembled tar checksum mismatch - data corrupted during transfer"; exit 1; }
echo "Checksum verified - data integrity confirmed"
rm -rf tar_chunks
echo "=== Extracting ==="
tar -xvf extracted_data.tar
rm extracted_data.tar
@@ -159,7 +197,7 @@ jobs:
echo "=== Contents of data/output ==="
ls -lah
else
echo "No extracted_data.tar found"
echo "No tar chunks found"
echo "has_data=false" >> "$GITHUB_OUTPUT"
fi
@@ -220,11 +258,11 @@ jobs:
END_DATE: ${{ needs.generate-matrix.outputs.global_end }}
run: |
python -m src.adsb.combine_chunks_to_csv --chunks-dir data/output/adsb_chunks --start-date "$START_DATE" --end-date "$END_DATE" --skip-base --stream
ls -lah data/planequery_aircraft/
ls -lah data/openairframes/
- name: Upload final artifact
uses: actions/upload-artifact@v4
with:
name: planequery_aircraft_adsb-${{ needs.generate-matrix.outputs.global_start }}-${{ needs.generate-matrix.outputs.global_end }}
path: data/planequery_aircraft/*.csv
name: openairframes_adsb-${{ needs.generate-matrix.outputs.global_start }}-${{ needs.generate-matrix.outputs.global_end }}
path: data/openairframes/*.csv
retention-days: 30
@@ -1,10 +1,15 @@
name: planequery-aircraft Daily Release
name: OpenAirframes Daily Release
on:
schedule:
# 6:00pm UTC every day - runs on default branch, triggers both
- cron: "0 06 * * *"
workflow_dispatch:
inputs:
date:
description: 'Date to process (YYYY-MM-DD format, default: yesterday)'
required: false
type: string
permissions:
contents: write
@@ -22,7 +27,7 @@ jobs:
await github.rest.actions.createWorkflowDispatch({
owner: context.repo.owner,
repo: context.repo.repo,
workflow_id: 'planequery-aircraft-daily-release.yaml',
workflow_id: 'openairframes-daily-release.yaml',
ref: 'main'
});
@@ -33,7 +38,7 @@ jobs:
await github.rest.actions.createWorkflowDispatch({
owner: context.repo.owner,
repo: context.repo.repo,
workflow_id: 'planequery-aircraft-daily-release.yaml',
workflow_id: 'openairframes-daily-release.yaml',
ref: 'develop'
});
@@ -58,16 +63,16 @@ jobs:
- name: Run FAA release script
run: |
python src/create_daily_planequery_aircraft_release.py
python src/create_daily_faa_release.py ${{ inputs.date && format('--date {0}', inputs.date) || '' }}
ls -lah data/faa_releasable
ls -lah data/planequery_aircraft
ls -lah data/openairframes
- name: Upload FAA artifacts
uses: actions/upload-artifact@v4
with:
name: faa-release
path: |
data/planequery_aircraft/planequery_aircraft_faa_*.csv
data/openairframes/openairframes_faa_*.csv
data/faa_releasable/ReleasableAircraft_*.zip
retention-days: 1
@@ -93,8 +98,10 @@ jobs:
pip install -r requirements.txt
- name: Download and extract ADS-B data
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: |
python -m src.adsb.download_and_list_icaos
python -m src.adsb.download_and_list_icaos ${{ inputs.date && format('--date {0}', inputs.date) || '' }}
ls -lah data/output/
- name: Check manifest exists
@@ -164,7 +171,7 @@ jobs:
- name: Process chunk ${{ matrix.chunk }}
run: |
python -m src.adsb.process_icao_chunk --chunk-id ${{ matrix.chunk }} --total-chunks 4
python -m src.adsb.process_icao_chunk --chunk-id ${{ matrix.chunk }} --total-chunks 4 ${{ inputs.date && format('--date {0}', inputs.date) || '' }}
mkdir -p data/output/adsb_chunks
ls -lah data/output/adsb_chunks/ || echo "No chunks created"
@@ -213,14 +220,14 @@ jobs:
run: |
mkdir -p data/output/adsb_chunks
ls -lah data/output/adsb_chunks/ || echo "Directory empty or does not exist"
python -m src.adsb.combine_chunks_to_csv --chunks-dir data/output/adsb_chunks
ls -lah data/planequery_aircraft/
python -m src.adsb.combine_chunks_to_csv --chunks-dir data/output/adsb_chunks ${{ inputs.date && format('--date {0}', inputs.date) || '' }}
ls -lah data/openairframes/
- name: Upload ADS-B artifacts
uses: actions/upload-artifact@v4
with:
name: adsb-release
path: data/planequery_aircraft/planequery_aircraft_adsb_*.csv
path: data/openairframes/openairframes_adsb_*.csv
retention-days: 1
build-community:
@@ -245,13 +252,13 @@ jobs:
- name: Run Community release script
run: |
python -m src.contributions.create_daily_community_release
ls -lah data/planequery_aircraft
ls -lah data/openairframes
- name: Upload Community artifacts
uses: actions/upload-artifact@v4
with:
name: community-release
path: data/planequery_aircraft/planequery_aircraft_community_*.csv
path: data/openairframes/openairframes_community_*.csv
retention-days: 1
create-release:
@@ -259,6 +266,13 @@ jobs:
needs: [build-faa, adsb-reduce, build-community]
if: github.event_name != 'schedule'
steps:
- name: Checkout for gh CLI
uses: actions/checkout@v4
with:
sparse-checkout: |
.github
sparse-checkout-cone-mode: false
- name: Download FAA artifacts
uses: actions/download-artifact@v4
with:
@@ -279,6 +293,8 @@ jobs:
- name: Debug artifact structure
run: |
echo "=== Full artifacts tree ==="
find artifacts -type f 2>/dev/null || echo "No files found in artifacts"
echo "=== FAA artifacts ==="
find artifacts/faa -type f 2>/dev/null || echo "No files found in artifacts/faa"
echo "=== ADS-B artifacts ==="
@@ -297,16 +313,38 @@ jobs:
elif [ "$BRANCH_NAME" = "develop" ]; then
BRANCH_SUFFIX="-develop"
fi
TAG="planequery-aircraft-${DATE}${BRANCH_SUFFIX}"
TAG="openairframes-${DATE}${BRANCH_SUFFIX}"
# Find files from artifacts
CSV_FILE_FAA=$(ls artifacts/faa/data/planequery_aircraft/planequery_aircraft_faa_*.csv | head -1)
# Find files from artifacts using find (handles nested structures)
CSV_FILE_FAA=$(find artifacts/faa -name "openairframes_faa_*.csv" -type f 2>/dev/null | head -1)
CSV_FILE_ADSB=$(find artifacts/adsb -name "openairframes_adsb_*.csv" -type f 2>/dev/null | head -1)
CSV_FILE_COMMUNITY=$(find artifacts/community -name "openairframes_community_*.csv" -type f 2>/dev/null | head -1)
ZIP_FILE=$(find artifacts/faa -name "ReleasableAircraft_*.zip" -type f 2>/dev/null | head -1)
# Validate required files exist
MISSING_FILES=""
if [ -z "$CSV_FILE_FAA" ] || [ ! -f "$CSV_FILE_FAA" ]; then
MISSING_FILES="$MISSING_FILES FAA_CSV"
fi
if [ -z "$CSV_FILE_ADSB" ] || [ ! -f "$CSV_FILE_ADSB" ]; then
MISSING_FILES="$MISSING_FILES ADSB_CSV"
fi
if [ -z "$ZIP_FILE" ] || [ ! -f "$ZIP_FILE" ]; then
MISSING_FILES="$MISSING_FILES FAA_ZIP"
fi
if [ -n "$MISSING_FILES" ]; then
echo "ERROR: Missing required release files:$MISSING_FILES"
echo "FAA CSV: $CSV_FILE_FAA"
echo "ADSB CSV: $CSV_FILE_ADSB"
echo "ZIP: $ZIP_FILE"
exit 1
fi
# Get basenames for display
CSV_BASENAME_FAA=$(basename "$CSV_FILE_FAA")
CSV_FILE_ADSB=$(ls artifacts/adsb/planequery_aircraft_adsb_*.csv | head -1)
CSV_BASENAME_ADSB=$(basename "$CSV_FILE_ADSB")
CSV_FILE_COMMUNITY=$(ls artifacts/community/planequery_aircraft_community_*.csv 2>/dev/null | head -1 || echo "")
CSV_BASENAME_COMMUNITY=$(basename "$CSV_FILE_COMMUNITY" 2>/dev/null || echo "")
ZIP_FILE=$(ls artifacts/faa/data/faa_releasable/ReleasableAircraft_*.zip | head -1)
ZIP_BASENAME=$(basename "$ZIP_FILE")
echo "date=$DATE" >> "$GITHUB_OUTPUT"
@@ -319,12 +357,18 @@ jobs:
echo "csv_basename_community=$CSV_BASENAME_COMMUNITY" >> "$GITHUB_OUTPUT"
echo "zip_file=$ZIP_FILE" >> "$GITHUB_OUTPUT"
echo "zip_basename=$ZIP_BASENAME" >> "$GITHUB_OUTPUT"
echo "name=planequery-aircraft snapshot ($DATE)${BRANCH_SUFFIX}" >> "$GITHUB_OUTPUT"
echo "name=OpenAirframes snapshot ($DATE)${BRANCH_SUFFIX}" >> "$GITHUB_OUTPUT"
echo "Found files:"
echo " FAA CSV: $CSV_FILE_FAA"
echo " ADSB CSV: $CSV_FILE_ADSB"
echo " Community CSV: $CSV_FILE_COMMUNITY"
echo " ZIP: $ZIP_FILE"
- name: Delete existing release if exists
run: |
gh release delete "${{ steps.meta.outputs.tag }}" --yes 2>/dev/null || true
git push --delete origin "refs/tags/${{ steps.meta.outputs.tag }}" 2>/dev/null || true
echo "Attempting to delete release: ${{ steps.meta.outputs.tag }}"
gh release delete "${{ steps.meta.outputs.tag }}" --yes --cleanup-tag || echo "No existing release to delete"
env:
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
@@ -333,6 +377,7 @@ jobs:
with:
tag_name: ${{ steps.meta.outputs.tag }}
name: ${{ steps.meta.outputs.name }}
fail_on_unmatched_files: true
body: |
Automated daily snapshot generated at 06:00 UTC for ${{ steps.meta.outputs.date }}.
@@ -0,0 +1,171 @@
name: Process Historical FAA Data
on:
workflow_dispatch: # Manual trigger
jobs:
generate-matrix:
runs-on: ubuntu-latest
outputs:
matrix: ${{ steps.set-matrix.outputs.matrix }}
steps:
- name: Generate date ranges
id: set-matrix
run: |
python3 << 'EOF'
import json
from datetime import datetime, timedelta
start = datetime(2023, 8, 16)
end = datetime(2026, 1, 1)
ranges = []
current = start
# Process in 4-day chunks
while current < end:
chunk_end = current + timedelta(days=4)
# Don't go past the end date
if chunk_end > end:
chunk_end = end
ranges.append({
"since": current.strftime("%Y-%m-%d"),
"until": chunk_end.strftime("%Y-%m-%d")
})
current = chunk_end
print(f"::set-output name=matrix::{json.dumps(ranges)}")
EOF
clone-faa-repo:
runs-on: ubuntu-latest
steps:
- name: Cache FAA repository
id: cache-faa-repo
uses: actions/cache@v4
with:
path: data/scrape-faa-releasable-aircraft
key: faa-repo-v1
- name: Clone FAA repository
if: steps.cache-faa-repo.outputs.cache-hit != 'true'
run: |
mkdir -p data
git clone https://github.com/simonw/scrape-faa-releasable-aircraft data/scrape-faa-releasable-aircraft
echo "Repository cloned successfully"
process-chunk:
needs: [generate-matrix, clone-faa-repo]
runs-on: ubuntu-latest
strategy:
max-parallel: 5 # Process 5 chunks at a time
matrix:
range: ${{ fromJson(needs.generate-matrix.outputs.matrix) }}
steps:
- name: Checkout repository
uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: '3.12'
- name: Restore FAA repository cache
uses: actions/cache/restore@v4
with:
path: data/scrape-faa-releasable-aircraft
key: faa-repo-v1
fail-on-cache-miss: true
- name: Install dependencies
run: |
pip install -r requirements.txt
- name: Process chunk ${{ matrix.range.since }} to ${{ matrix.range.until }}
run: |
python src/get_historical_faa.py "${{ matrix.range.since }}" "${{ matrix.range.until }}"
- name: Upload CSV artifact
uses: actions/upload-artifact@v4
with:
name: csv-${{ matrix.range.since }}-to-${{ matrix.range.until }}
path: data/faa_releasable_historical/*.csv
retention-days: 1
create-release:
needs: process-chunk
runs-on: ubuntu-latest
permissions:
contents: write
steps:
- name: Download all artifacts
uses: actions/download-artifact@v4
with:
path: artifacts
- name: Prepare release files
run: |
mkdir -p release-files
find artifacts -name "*.csv" -exec cp {} release-files/ \;
ls -lh release-files/
- name: Create Release
uses: softprops/action-gh-release@v1
with:
tag_name: historical-faa-${{ github.run_number }}
name: Historical FAA Data Release ${{ github.run_number }}
body: |
Automated release of historical FAA aircraft data
Processing period: 2023-08-16 to 2026-01-01
Generated: ${{ github.event.repository.updated_at }}
files: release-files/*.csv
draft: false
prerelease: false
concatenate-and-release:
needs: process-chunk
runs-on: ubuntu-latest
permissions:
contents: write
steps:
- name: Checkout repository
uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: '3.12'
- name: Install dependencies
run: |
pip install -r requirements.txt
- name: Download all artifacts
uses: actions/download-artifact@v4
with:
path: artifacts
- name: Prepare CSVs for concatenation
run: |
mkdir -p data/faa_releasable_historical
find artifacts -name "*.csv" -exec cp {} data/faa_releasable_historical/ \;
ls -lh data/faa_releasable_historical/
- name: Concatenate all CSVs
run: |
python scripts/concat_csvs.py
- name: Create Combined Release
uses: softprops/action-gh-release@v1
with:
tag_name: historical-faa-combined-${{ github.run_number }}
name: Historical FAA Data Combined Release ${{ github.run_number }}
body: |
Combined historical FAA aircraft data (all chunks concatenated)
Processing period: 2023-08-16 to 2026-01-01
Generated: ${{ github.event.repository.updated_at }}
files: data/openairframes/*.csv
draft: false
prerelease: false
+41 -18
View File
@@ -48,29 +48,52 @@ jobs:
git fetch origin "$branch_name"
git checkout "$branch_name"
# Merge main into PR branch
git config user.name "github-actions[bot]"
git config user.email "github-actions[bot]@users.noreply.github.com"
if git merge origin/main -m "Merge main to update schema"; then
# Regenerate schema for this PR's submission (adds any new tags)
python -m src.contributions.regenerate_pr_schema || true
# If there are changes, commit and push
if [ -n "$(git status --porcelain schemas/)" ]; then
git add schemas/
git commit -m "Update schema with new tags"
git push origin "$branch_name"
echo " Updated PR #$pr_number with schema changes"
else
git push origin "$branch_name"
echo " Merged main into PR #$pr_number"
# Get the community submission file(s) and schema from this branch
community_files=$(git diff --name-only origin/main...HEAD -- 'community/' 'schemas/')
if [ -z "$community_files" ]; then
echo " No community/schema files found in PR #$pr_number, skipping"
git checkout main
continue
fi
echo " Files to preserve: $community_files"
# Save the community files content
mkdir -p /tmp/pr_files
for file in $community_files; do
if [ -f "$file" ]; then
mkdir -p "/tmp/pr_files/$(dirname "$file")"
cp "$file" "/tmp/pr_files/$file"
fi
done
# Reset branch to main (clean slate)
git reset --hard origin/main
# Restore the community files
for file in $community_files; do
if [ -f "/tmp/pr_files/$file" ]; then
mkdir -p "$(dirname "$file")"
cp "/tmp/pr_files/$file" "$file"
fi
done
rm -rf /tmp/pr_files
# Regenerate schema with current main + this submission's tags
python -m src.contributions.regenerate_pr_schema || true
# Stage and commit all changes
git add community/ schemas/
if ! git diff --cached --quiet; then
git commit -m "Community submission (rebased on main)"
git push --force origin "$branch_name"
echo " Rebased PR #$pr_number onto main"
else
echo " Merge conflict in PR #$pr_number, adding comment"
gh pr comment "$pr_number" --body $'⚠️ **Merge Conflict**\n\nAnother community submission was merged and this PR has conflicts.\n\nA maintainer may need to:\n1. Close this PR\n2. Remove the `approved` label from the original issue\n3. Re-add the `approved` label to regenerate the PR'
git merge --abort
fi
echo " No changes needed for PR #$pr_number"
fi
git checkout main
+1 -1
View File
@@ -1,6 +1,6 @@
MIT License
Copyright (c) 2026 PlaneQuery
Copyright (c) 2026 OpenAirframes
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
+50 -1
View File
@@ -1 +1,50 @@
Downloads [`https://registry.faa.gov/database/ReleasableAircraft.zip`](https://registry.faa.gov/database/ReleasableAircraft.zip). Creates a daily GitHub Release at 06:00 UTC containing the unaltered `ReleasableAircraft.zip` and a derived CSV file with all data from FAA database since 2023-08-16. The FAA database updates daily at 05:30 UTC.
# OpenAirframes.org
OpenAirframes.org is an open-source, community-driven airframes database.
The data includes:
- Registration information from Civil Aviation Authorities (FAA)
- Airline data (e.g., Air France)
- Community contributions such as ownership details, military aircraft info, photos, and more
---
## For Users
A daily release is created at **06:00 UTC** and includes:
- **openairframes_community.csv**
All community submissions
- **openairframes_faa.csv**
All [FAA registration data](https://www.faa.gov/licenses_certificates/aircraft_certification/aircraft_registry/releasable_aircraft_download) from 2023-08-16 to present (~260 MB)
- **openairframes_adsb.csv**
Airframe information derived from ADS-B messages on the [ADSB.lol](https://www.adsb.lol/) network, from 2026-02-12 to present (will be from 2024-01-01 soon). The airframe information originates from [mictronics aircraft database](https://www.mictronics.de/aircraft-database/) (~5 MB).
- **ReleasableAircraft_{date}.zip**
A daily snapshot of the FAA database, which updates at **05:30 UTC**
---
## For Contributors
Submit data via a [GitHub Issue](https://github.com/PlaneQuery/OpenAirframes/issues/new?template=community_submission.yaml) with your preferred attribution. Once approved, it will appear in the daily release. A leaderboard will be available in the future.
All data is valuable. Examples include:
- Celebrity ownership (with citations)
- Photos
- Internet capability
- Military aircraft information
- Unique facts (e.g., an airframe that crashed, performs aerobatics, etc.)
Please try to follow the submission formatting guidelines. If you are struggling with them, that is fine—submit your data anyway and it will be formatted for you.
---
## For Developers
All code, compute (GitHub Actions), and storage (releases) are in this GitHub repository Improvements are welcome. Potential features include:
- Web UI for data
- Web UI for contributors
- Additional export formats in the daily release
- Data fusion from multiple sources in the daily release
- Automated airframe data connectors, including (but not limited to) civil aviation authorities and airline APIs
@@ -1,14 +0,0 @@
[
{
"contributor_name": "hellohello",
"contributor_uuid": "2981c3ee-8712-5f96-84bf-732eda515a3f",
"creation_timestamp": "2026-02-12T21:02:32.325360+00:00",
"registration_number": "N12345",
"tags": {
"airshows": true,
"cat_friendly": false,
"dog_friendly": true,
"notes": "is a pet carrier"
}
}
]
-11
View File
@@ -1,11 +0,0 @@
#!/usr/bin/env python3
import os
import aws_cdk as cdk
from stack import AdsbProcessingStack
app = cdk.App()
AdsbProcessingStack(app, "AdsbProcessingStack", env=cdk.Environment(
account=os.environ["CDK_DEFAULT_ACCOUNT"],
region=os.environ["CDK_DEFAULT_REGION"],
))
app.synth()
-3
View File
@@ -1,3 +0,0 @@
{
"app": "python3 app.py"
}
-2
View File
@@ -1,2 +0,0 @@
aws-cdk-lib>=2.170.0
constructs>=10.0.0
-213
View File
@@ -1,213 +0,0 @@
import aws_cdk as cdk
from aws_cdk import (
Stack,
Duration,
RemovalPolicy,
aws_s3 as s3,
aws_ecs as ecs,
aws_ec2 as ec2,
aws_ecr_assets,
aws_iam as iam,
aws_logs as logs,
aws_stepfunctions as sfn,
aws_stepfunctions_tasks as sfn_tasks,
)
from constructs import Construct
from pathlib import Path
class AdsbProcessingStack(Stack):
def __init__(self, scope: Construct, id: str, **kwargs):
super().__init__(scope, id, **kwargs)
# --- S3 bucket for intermediate and final results ---
bucket = s3.Bucket(
self, "ResultsBucket",
bucket_name="planequery-aircraft-dev",
removal_policy=RemovalPolicy.DESTROY,
auto_delete_objects=True,
lifecycle_rules=[
s3.LifecycleRule(
prefix="intermediate/",
expiration=Duration.days(7),
)
],
)
# --- Use default VPC (no additional cost) ---
vpc = ec2.Vpc.from_lookup(
self, "Vpc",
is_default=True,
)
# --- ECS Cluster ---
cluster = ecs.Cluster(
self, "Cluster",
vpc=vpc,
container_insights=True,
)
# --- Log group ---
log_group = logs.LogGroup(
self, "LogGroup",
log_group_name="/adsb-processing",
removal_policy=RemovalPolicy.DESTROY,
retention=logs.RetentionDays.TWO_WEEKS,
)
# --- Docker images (built from local Dockerfiles) ---
adsb_dir = str(Path(__file__).parent.parent / "src" / "adsb")
worker_image = ecs.ContainerImage.from_asset(
adsb_dir,
file="Dockerfile.worker",
platform=cdk.aws_ecr_assets.Platform.LINUX_ARM64,
)
reducer_image = ecs.ContainerImage.from_asset(
adsb_dir,
file="Dockerfile.reducer",
platform=cdk.aws_ecr_assets.Platform.LINUX_ARM64,
)
# --- Task role (shared) ---
task_role = iam.Role(
self, "TaskRole",
assumed_by=iam.ServicePrincipal("ecs-tasks.amazonaws.com"),
)
bucket.grant_read_write(task_role)
# --- MAP: worker task definition ---
map_task_def = ecs.FargateTaskDefinition(
self, "MapTaskDef",
cpu=4096, # 4 vCPU
memory_limit_mib=30720, # 30 GB
task_role=task_role,
runtime_platform=ecs.RuntimePlatform(
cpu_architecture=ecs.CpuArchitecture.ARM64,
operating_system_family=ecs.OperatingSystemFamily.LINUX,
),
)
map_container = map_task_def.add_container(
"worker",
image=worker_image,
logging=ecs.LogDrivers.aws_logs(
stream_prefix="map",
log_group=log_group,
),
environment={
"S3_BUCKET": bucket.bucket_name,
},
)
# --- REDUCE: reducer task definition ---
reduce_task_def = ecs.FargateTaskDefinition(
self, "ReduceTaskDef",
cpu=4096, # 4 vCPU
memory_limit_mib=30720, # 30 GB — must hold full year in memory
task_role=task_role,
runtime_platform=ecs.RuntimePlatform(
cpu_architecture=ecs.CpuArchitecture.ARM64,
operating_system_family=ecs.OperatingSystemFamily.LINUX,
),
)
reduce_container = reduce_task_def.add_container(
"reducer",
image=reducer_image,
logging=ecs.LogDrivers.aws_logs(
stream_prefix="reduce",
log_group=log_group,
),
environment={
"S3_BUCKET": bucket.bucket_name,
},
)
# --- Step Functions ---
# Map task: run ECS Fargate for each date chunk
map_ecs_task = sfn_tasks.EcsRunTask(
self, "ProcessChunk",
integration_pattern=sfn.IntegrationPattern.RUN_JOB,
cluster=cluster,
task_definition=map_task_def,
launch_target=sfn_tasks.EcsFargateLaunchTarget(
platform_version=ecs.FargatePlatformVersion.LATEST,
),
container_overrides=[
sfn_tasks.ContainerOverride(
container_definition=map_container,
environment=[
sfn_tasks.TaskEnvironmentVariable(
name="START_DATE",
value=sfn.JsonPath.string_at("$.start_date"),
),
sfn_tasks.TaskEnvironmentVariable(
name="END_DATE",
value=sfn.JsonPath.string_at("$.end_date"),
),
sfn_tasks.TaskEnvironmentVariable(
name="RUN_ID",
value=sfn.JsonPath.string_at("$.run_id"),
),
],
)
],
assign_public_ip=True,
subnets=ec2.SubnetSelection(subnet_type=ec2.SubnetType.PUBLIC),
result_path="$.task_result",
)
# Map state — max 3 concurrent workers
map_state = sfn.Map(
self, "FanOutChunks",
items_path="$.chunks",
max_concurrency=3,
result_path="$.map_results",
)
map_state.item_processor(map_ecs_task)
# Reduce task: combine all chunk CSVs
reduce_ecs_task = sfn_tasks.EcsRunTask(
self, "ReduceResults",
integration_pattern=sfn.IntegrationPattern.RUN_JOB,
cluster=cluster,
task_definition=reduce_task_def,
launch_target=sfn_tasks.EcsFargateLaunchTarget(
platform_version=ecs.FargatePlatformVersion.LATEST,
),
container_overrides=[
sfn_tasks.ContainerOverride(
container_definition=reduce_container,
environment=[
sfn_tasks.TaskEnvironmentVariable(
name="RUN_ID",
value=sfn.JsonPath.string_at("$.run_id"),
),
sfn_tasks.TaskEnvironmentVariable(
name="GLOBAL_START_DATE",
value=sfn.JsonPath.string_at("$.global_start_date"),
),
sfn_tasks.TaskEnvironmentVariable(
name="GLOBAL_END_DATE",
value=sfn.JsonPath.string_at("$.global_end_date"),
),
],
)
],
assign_public_ip=True,
subnets=ec2.SubnetSelection(subnet_type=ec2.SubnetType.PUBLIC),
)
# Chain: fan-out map → reduce
definition = map_state.next(reduce_ecs_task)
sfn.StateMachine(
self, "Pipeline",
state_machine_name="adsb-map-reduce",
definition_body=sfn.DefinitionBody.from_chainable(definition),
timeout=Duration.hours(48),
)
# --- Outputs ---
cdk.CfnOutput(self, "BucketName", value=bucket.bucket_name)
cdk.CfnOutput(self, "StateMachineName", value="adsb-map-reduce")
-640
View File
@@ -1,640 +0,0 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"id": "06ae0319",
"metadata": {},
"outputs": [],
"source": [
"import clickhouse_connect\n",
"client = clickhouse_connect.get_client(\n",
" host=os.environ[\"CLICKHOUSE_HOST\"],\n",
" username=os.environ[\"CLICKHOUSE_USERNAME\"],\n",
" password=os.environ[\"CLICKHOUSE_PASSWORD\"],\n",
" secure=True,\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "779710f0",
"metadata": {},
"outputs": [],
"source": [
"df = client.query_df(\"SELECT time, icao,r,t,dbFlags,ownOp,year,desc,aircraft FROM adsb_messages Where time > '2024-01-01 00:00:00' AND time < '2024-01-02 00:00:00'\")\n",
"df_copy = df.copy()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "bf024da8",
"metadata": {},
"outputs": [],
"source": [
"# -- military = dbFlags & 1; interesting = dbFlags & 2; PIA = dbFlags & 4; LADD = dbFlags & 8;"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "270607b5",
"metadata": {},
"outputs": [],
"source": [
"df = load_raw_adsb_for_day(datetime(2024,1,1))"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "ac06a30e",
"metadata": {},
"outputs": [],
"source": [
"df['aircraft']"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "91edab3e",
"metadata": {},
"outputs": [],
"source": [
"COLUMNS = ['dbFlags', 'ownOp', 'year', 'desc', 'aircraft_category', 'r', 't']\n",
"def compress_df(df):\n",
" icao = df.name\n",
" df[\"_signature\"] = df[COLUMNS].astype(str).agg('|'.join, axis=1)\n",
" original_df = df.copy()\n",
" df = df.groupby(\"_signature\", as_index=False).last() # check if it works with both last and first.\n",
" # For each row, create a dict of non-empty column values. This is using sets and subsets...\n",
" def get_non_empty_dict(row):\n",
" return {col: row[col] for col in COLUMNS if row[col] != ''}\n",
" \n",
" df['_non_empty_dict'] = df.apply(get_non_empty_dict, axis=1)\n",
" df['_non_empty_count'] = df['_non_empty_dict'].apply(len)\n",
" \n",
" # Check if row i's non-empty values are a subset of row j's non-empty values\n",
" def is_subset_of_any(idx):\n",
" row_dict = df.loc[idx, '_non_empty_dict']\n",
" row_count = df.loc[idx, '_non_empty_count']\n",
" \n",
" for other_idx in df.index:\n",
" if idx == other_idx:\n",
" continue\n",
" other_dict = df.loc[other_idx, '_non_empty_dict']\n",
" other_count = df.loc[other_idx, '_non_empty_count']\n",
" \n",
" # Check if all non-empty values in current row match those in other row\n",
" if all(row_dict.get(k) == other_dict.get(k) for k in row_dict.keys()):\n",
" # If they match and other has more defined columns, current row is redundant\n",
" if other_count > row_count:\n",
" return True\n",
" return False\n",
" \n",
" # Keep rows that are not subsets of any other row\n",
" keep_mask = ~df.index.to_series().apply(is_subset_of_any)\n",
" df = df[keep_mask]\n",
"\n",
" if len(df) > 1:\n",
" original_df = original_df[original_df['_signature'].isin(df['_signature'])]\n",
" value_counts = original_df[\"_signature\"].value_counts()\n",
" max_signature = value_counts.idxmax()\n",
" df = df[df['_signature'] == max_signature]\n",
"\n",
" df['icao'] = icao\n",
" df = df.drop(columns=['_non_empty_dict', '_non_empty_count', '_signature'])\n",
" return df\n",
"\n",
"# df = df_copy\n",
"# df = df_copy.iloc[0:100000]\n",
"# df = df[df['r'] == \"N4131T\"]\n",
"# df = df[(df['icao'] == \"008081\")]\n",
"# df = df.iloc[0:500]\n",
"df['aircraft_category'] = df['aircraft'].apply(lambda x: x.get('category') if isinstance(x, dict) else None)\n",
"df = df.drop(columns=['aircraft'])\n",
"df = df.sort_values(['icao', 'time'])\n",
"df[COLUMNS] = df[COLUMNS].fillna('')\n",
"ORIGINAL_COLUMNS = df.columns.tolist()\n",
"df_compressed = df.groupby('icao',group_keys=False).apply(compress_df)\n",
"cols = df_compressed.columns.tolist()\n",
"cols.remove(\"icao\")\n",
"cols.insert(1, \"icao\")\n",
"df_compressed = df_compressed[cols]\n",
"df_compressed"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "efdfcb2c",
"metadata": {},
"outputs": [],
"source": [
"df['aircraft_category'] = df['aircraft'].apply(lambda x: x.get('category') if isinstance(x, dict) else None)\n",
"df[~df['aircraft_category'].isna()]"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "495c5025",
"metadata": {},
"outputs": [],
"source": [
"# SOME KIND OF MAP REDUCE SYSTEM\n",
"import os\n",
"\n",
"COLUMNS = ['dbFlags', 'ownOp', 'year', 'desc', 'aircraft_category', 'r', 't']\n",
"def compress_df(df):\n",
" icao = df.name\n",
" df[\"_signature\"] = df[COLUMNS].astype(str).agg('|'.join, axis=1)\n",
" \n",
" # Compute signature counts before grouping (avoid copy)\n",
" signature_counts = df[\"_signature\"].value_counts()\n",
" \n",
" df = df.groupby(\"_signature\", as_index=False).first() # check if it works with both last and first.\n",
" # For each row, create a dict of non-empty column values. This is using sets and subsets...\n",
" def get_non_empty_dict(row):\n",
" return {col: row[col] for col in COLUMNS if row[col] != ''}\n",
" \n",
" df['_non_empty_dict'] = df.apply(get_non_empty_dict, axis=1)\n",
" df['_non_empty_count'] = df['_non_empty_dict'].apply(len)\n",
" \n",
" # Check if row i's non-empty values are a subset of row j's non-empty values\n",
" def is_subset_of_any(idx):\n",
" row_dict = df.loc[idx, '_non_empty_dict']\n",
" row_count = df.loc[idx, '_non_empty_count']\n",
" \n",
" for other_idx in df.index:\n",
" if idx == other_idx:\n",
" continue\n",
" other_dict = df.loc[other_idx, '_non_empty_dict']\n",
" other_count = df.loc[other_idx, '_non_empty_count']\n",
" \n",
" # Check if all non-empty values in current row match those in other row\n",
" if all(row_dict.get(k) == other_dict.get(k) for k in row_dict.keys()):\n",
" # If they match and other has more defined columns, current row is redundant\n",
" if other_count > row_count:\n",
" return True\n",
" return False\n",
" \n",
" # Keep rows that are not subsets of any other row\n",
" keep_mask = ~df.index.to_series().apply(is_subset_of_any)\n",
" df = df[keep_mask]\n",
"\n",
" if len(df) > 1:\n",
" # Use pre-computed signature counts instead of original_df\n",
" remaining_sigs = df['_signature']\n",
" sig_counts = signature_counts[remaining_sigs]\n",
" max_signature = sig_counts.idxmax()\n",
" df = df[df['_signature'] == max_signature]\n",
"\n",
" df['icao'] = icao\n",
" df = df.drop(columns=['_non_empty_dict', '_non_empty_count', '_signature'])\n",
" return df\n",
"\n",
"# names of releases something like\n",
"# planequery_aircraft_adsb_2024-06-01T00-00-00Z.csv.gz\n",
"\n",
"# Let's build historical first. \n",
"\n",
"_ch_client = None\n",
"\n",
"def _get_clickhouse_client():\n",
" \"\"\"Return a reusable ClickHouse client, with retry/backoff for transient DNS or connection errors.\"\"\"\n",
" global _ch_client\n",
" if _ch_client is not None:\n",
" return _ch_client\n",
"\n",
" import clickhouse_connect\n",
" import time\n",
"\n",
" max_retries = 5\n",
" for attempt in range(1, max_retries + 1):\n",
" try:\n",
" _ch_client = clickhouse_connect.get_client(\n",
" host=os.environ[\"CLICKHOUSE_HOST\"],\n",
" username=os.environ[\"CLICKHOUSE_USERNAME\"],\n",
" password=os.environ[\"CLICKHOUSE_PASSWORD\"],\n",
" secure=True,\n",
" )\n",
" return _ch_client\n",
" except Exception as e:\n",
" wait = min(2 ** attempt, 30)\n",
" print(f\" ClickHouse connect attempt {attempt}/{max_retries} failed: {e}\")\n",
" if attempt == max_retries:\n",
" raise\n",
" print(f\" Retrying in {wait}s...\")\n",
" time.sleep(wait)\n",
"\n",
"\n",
"def load_raw_adsb_for_day(day):\n",
" \"\"\"Load raw ADS-B data for a day from cache or ClickHouse.\"\"\"\n",
" from datetime import timedelta\n",
" from pathlib import Path\n",
" import pandas as pd\n",
" import time\n",
" \n",
" start_time = day.replace(hour=0, minute=0, second=0, microsecond=0)\n",
" end_time = start_time + timedelta(days=1)\n",
" \n",
" # Set up caching\n",
" cache_dir = Path(\"data/adsb\")\n",
" cache_dir.mkdir(parents=True, exist_ok=True)\n",
" cache_file = cache_dir / f\"adsb_raw_{start_time.strftime('%Y-%m-%d')}.csv.zst\"\n",
" \n",
" # Check if cache exists\n",
" if cache_file.exists():\n",
" print(f\" Loading from cache: {cache_file}\")\n",
" df = pd.read_csv(cache_file, compression='zstd')\n",
" df['time'] = pd.to_datetime(df['time'])\n",
" else:\n",
" # Format dates for the query\n",
" start_str = start_time.strftime('%Y-%m-%d %H:%M:%S')\n",
" end_str = end_time.strftime('%Y-%m-%d %H:%M:%S')\n",
" \n",
" max_retries = 3\n",
" for attempt in range(1, max_retries + 1):\n",
" try:\n",
" client = _get_clickhouse_client()\n",
" print(f\" Querying ClickHouse for {start_time.strftime('%Y-%m-%d')}\")\n",
" df = client.query_df(f\"SELECT time, icao,r,t,dbFlags,ownOp,year,desc,aircraft FROM adsb_messages Where time > '{start_str}' AND time < '{end_str}'\")\n",
" break\n",
" except Exception as e:\n",
" wait = min(2 ** attempt, 30)\n",
" print(f\" Query attempt {attempt}/{max_retries} failed: {e}\")\n",
" if attempt == max_retries:\n",
" raise\n",
" # Reset client in case connection is stale\n",
" global _ch_client\n",
" _ch_client = None\n",
" print(f\" Retrying in {wait}s...\")\n",
" time.sleep(wait)\n",
" \n",
" # Save to cache\n",
" df.to_csv(cache_file, index=False, compression='zstd')\n",
" print(f\" Saved to cache: {cache_file}\")\n",
" \n",
" return df\n",
"\n",
"def load_historical_for_day(day):\n",
" from pathlib import Path\n",
" import pandas as pd\n",
" \n",
" df = load_raw_adsb_for_day(day)\n",
" print(df)\n",
" df['aircraft_category'] = df['aircraft'].apply(lambda x: x.get('category') if isinstance(x, dict) else None)\n",
" df = df.drop(columns=['aircraft'])\n",
" df = df.sort_values(['icao', 'time'])\n",
" df[COLUMNS] = df[COLUMNS].fillna('')\n",
" df_compressed = df.groupby('icao',group_keys=False).apply(compress_df)\n",
" cols = df_compressed.columns.tolist()\n",
" cols.remove('time')\n",
" cols.insert(0, 'time')\n",
" cols.remove(\"icao\")\n",
" cols.insert(1, \"icao\")\n",
" df_compressed = df_compressed[cols]\n",
" return df_compressed\n",
"\n",
"\n",
"def concat_compressed_dfs(df_base, df_new):\n",
" \"\"\"Concatenate base and new compressed dataframes, keeping the most informative row per ICAO.\"\"\"\n",
" import pandas as pd\n",
" \n",
" # Combine both dataframes\n",
" df_combined = pd.concat([df_base, df_new], ignore_index=True)\n",
" \n",
" # Sort by ICAO and time\n",
" df_combined = df_combined.sort_values(['icao', 'time'])\n",
" \n",
" # Fill NaN values\n",
" df_combined[COLUMNS] = df_combined[COLUMNS].fillna('')\n",
" \n",
" # Apply compression logic per ICAO to get the best row\n",
" df_compressed = df_combined.groupby('icao', group_keys=False).apply(compress_df)\n",
" \n",
" # Sort by time\n",
" df_compressed = df_compressed.sort_values('time')\n",
" \n",
" return df_compressed\n",
"\n",
"\n",
"def get_latest_aircraft_adsb_csv_df():\n",
" \"\"\"Download and load the latest ADS-B CSV from GitHub releases.\"\"\"\n",
" from get_latest_planequery_aircraft_release import download_latest_aircraft_adsb_csv\n",
" \n",
" import pandas as pd\n",
" import re\n",
" \n",
" csv_path = download_latest_aircraft_adsb_csv()\n",
" df = pd.read_csv(csv_path)\n",
" df = df.fillna(\"\")\n",
" \n",
" # Extract start date from filename pattern: planequery_aircraft_adsb_{start_date}_{end_date}.csv\n",
" match = re.search(r\"planequery_aircraft_adsb_(\\d{4}-\\d{2}-\\d{2})_\", str(csv_path))\n",
" if not match:\n",
" raise ValueError(f\"Could not extract date from filename: {csv_path.name}\")\n",
" \n",
" date_str = match.group(1)\n",
" return df, date_str\n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "7f66acf7",
"metadata": {},
"outputs": [],
"source": [
"# SOME KIND OF MAP REDUCE SYSTEM\n",
"\n",
"\n",
"COLUMNS = ['dbFlags', 'ownOp', 'year', 'desc', 'aircraft_category', 'r', 't']\n",
"def compress_df(df):\n",
" icao = df.name\n",
" df[\"_signature\"] = df[COLUMNS].astype(str).agg('|'.join, axis=1)\n",
" original_df = df.copy()\n",
" df = df.groupby(\"_signature\", as_index=False).first() # check if it works with both last and first.\n",
" # For each row, create a dict of non-empty column values. This is using sets and subsets...\n",
" def get_non_empty_dict(row):\n",
" return {col: row[col] for col in COLUMNS if row[col] != ''}\n",
" \n",
" df['_non_empty_dict'] = df.apply(get_non_empty_dict, axis=1)\n",
" df['_non_empty_count'] = df['_non_empty_dict'].apply(len)\n",
" \n",
" # Check if row i's non-empty values are a subset of row j's non-empty values\n",
" def is_subset_of_any(idx):\n",
" row_dict = df.loc[idx, '_non_empty_dict']\n",
" row_count = df.loc[idx, '_non_empty_count']\n",
" \n",
" for other_idx in df.index:\n",
" if idx == other_idx:\n",
" continue\n",
" other_dict = df.loc[other_idx, '_non_empty_dict']\n",
" other_count = df.loc[other_idx, '_non_empty_count']\n",
" \n",
" # Check if all non-empty values in current row match those in other row\n",
" if all(row_dict.get(k) == other_dict.get(k) for k in row_dict.keys()):\n",
" # If they match and other has more defined columns, current row is redundant\n",
" if other_count > row_count:\n",
" return True\n",
" return False\n",
" \n",
" # Keep rows that are not subsets of any other row\n",
" keep_mask = ~df.index.to_series().apply(is_subset_of_any)\n",
" df = df[keep_mask]\n",
"\n",
" if len(df) > 1:\n",
" original_df = original_df[original_df['_signature'].isin(df['_signature'])]\n",
" value_counts = original_df[\"_signature\"].value_counts()\n",
" max_signature = value_counts.idxmax()\n",
" df = df[df['_signature'] == max_signature]\n",
"\n",
" df['icao'] = icao\n",
" df = df.drop(columns=['_non_empty_dict', '_non_empty_count', '_signature'])\n",
" return df\n",
"\n",
"# names of releases something like\n",
"# planequery_aircraft_adsb_2024-06-01T00-00-00Z.csv.gz\n",
"\n",
"# Let's build historical first. \n",
"\n",
"def load_raw_adsb_for_day(day):\n",
" \"\"\"Load raw ADS-B data for a day from cache or ClickHouse.\"\"\"\n",
" from datetime import timedelta\n",
" import clickhouse_connect\n",
" from pathlib import Path\n",
" import pandas as pd\n",
" \n",
" start_time = day.replace(hour=0, minute=0, second=0, microsecond=0)\n",
" end_time = start_time + timedelta(days=1)\n",
" \n",
" # Set up caching\n",
" cache_dir = Path(\"data/adsb\")\n",
" cache_dir.mkdir(parents=True, exist_ok=True)\n",
" cache_file = cache_dir / f\"adsb_raw_{start_time.strftime('%Y-%m-%d')}.csv.zst\"\n",
" \n",
" # Check if cache exists\n",
" if cache_file.exists():\n",
" print(f\" Loading from cache: {cache_file}\")\n",
" df = pd.read_csv(cache_file, compression='zstd')\n",
" df['time'] = pd.to_datetime(df['time'])\n",
" else:\n",
" # Format dates for the query\n",
" start_str = start_time.strftime('%Y-%m-%d %H:%M:%S')\n",
" end_str = end_time.strftime('%Y-%m-%d %H:%M:%S')\n",
" \n",
" client = clickhouse_connect.get_client(\n",
" host=os.environ[\"CLICKHOUSE_HOST\"],\n",
" username=os.environ[\"CLICKHOUSE_USERNAME\"],\n",
" password=os.environ[\"CLICKHOUSE_PASSWORD\"],\n",
" secure=True,\n",
" )\n",
" print(f\" Querying ClickHouse for {start_time.strftime('%Y-%m-%d')}\")\n",
" df = client.query_df(f\"SELECT time, icao,r,t,dbFlags,ownOp,year,desc,aircraft FROM adsb_messages Where time > '{start_str}' AND time < '{end_str}'\")\n",
" \n",
" # Save to cache\n",
" df.to_csv(cache_file, index=False, compression='zstd')\n",
" print(f\" Saved to cache: {cache_file}\")\n",
" \n",
" return df\n",
"\n",
"def load_historical_for_day(day):\n",
" from pathlib import Path\n",
" import pandas as pd\n",
" \n",
" df = load_raw_adsb_for_day(day)\n",
" \n",
" df['aircraft_category'] = df['aircraft'].apply(lambda x: x.get('category') if isinstance(x, dict) else None)\n",
" df = df.drop(columns=['aircraft'])\n",
" df = df.sort_values(['icao', 'time'])\n",
" df[COLUMNS] = df[COLUMNS].fillna('')\n",
" df_compressed = df.groupby('icao',group_keys=False).apply(compress_df)\n",
" cols = df_compressed.columns.tolist()\n",
" cols.remove('time')\n",
" cols.insert(0, 'time')\n",
" cols.remove(\"icao\")\n",
" cols.insert(1, \"icao\")\n",
" df_compressed = df_compressed[cols]\n",
" return df_compressed\n",
"\n",
"\n",
"def concat_compressed_dfs(df_base, df_new):\n",
" \"\"\"Concatenate base and new compressed dataframes, keeping the most informative row per ICAO.\"\"\"\n",
" import pandas as pd\n",
" \n",
" # Combine both dataframes\n",
" df_combined = pd.concat([df_base, df_new], ignore_index=True)\n",
" \n",
" # Sort by ICAO and time\n",
" df_combined = df_combined.sort_values(['icao', 'time'])\n",
" \n",
" # Fill NaN values\n",
" df_combined[COLUMNS] = df_combined[COLUMNS].fillna('')\n",
" \n",
" # Apply compression logic per ICAO to get the best row\n",
" df_compressed = df_combined.groupby('icao', group_keys=False).apply(compress_df)\n",
" \n",
" # Sort by time\n",
" df_compressed = df_compressed.sort_values('time')\n",
" \n",
" return df_compressed\n",
"\n",
"\n",
"def get_latest_aircraft_adsb_csv_df():\n",
" \"\"\"Download and load the latest ADS-B CSV from GitHub releases.\"\"\"\n",
" from get_latest_planequery_aircraft_release import download_latest_aircraft_adsb_csv\n",
" \n",
" import pandas as pd\n",
" import re\n",
" \n",
" csv_path = download_latest_aircraft_adsb_csv()\n",
" df = pd.read_csv(csv_path)\n",
" df = df.fillna(\"\")\n",
" \n",
" # Extract start date from filename pattern: planequery_aircraft_adsb_{start_date}_{end_date}.csv\n",
" match = re.search(r\"planequery_aircraft_adsb_(\\d{4}-\\d{2}-\\d{2})_\", str(csv_path))\n",
" if not match:\n",
" raise ValueError(f\"Could not extract date from filename: {csv_path.name}\")\n",
" \n",
" date_str = match.group(1)\n",
" return df, date_str\n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "e14c8363",
"metadata": {},
"outputs": [],
"source": [
"from datetime import datetime\n",
"df = load_historical_for_day(datetime(2024,1,1))"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "3874ba4d",
"metadata": {},
"outputs": [],
"source": [
"len(df)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "bcae50ad",
"metadata": {},
"outputs": [],
"source": [
"df[(df['icao'] == \"008081\")]"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "50921c86",
"metadata": {},
"outputs": [],
"source": [
"df[df['icao'] == \"a4e1d2\"]"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "8194d9aa",
"metadata": {},
"outputs": [],
"source": [
"df[df['r'] == \"N4131T\"]"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "1e3b7aa2",
"metadata": {},
"outputs": [],
"source": [
"df_compressed[df_compressed['icao'].duplicated(keep=False)]\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "40613bc1",
"metadata": {},
"outputs": [],
"source": [
"import gzip\n",
"import json\n",
"\n",
"path = \"/Users/jonahgoode/Downloads/test_extract/traces/fb/trace_full_acbbfb.json\"\n",
"\n",
"with gzip.open(path, \"rt\", encoding=\"utf-8\") as f:\n",
" data = json.load(f)\n",
"\n",
"print(type(data))\n",
"# use `data` here\n",
"import json\n",
"print(json.dumps(data, indent=2)[:2000])\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "320109b2",
"metadata": {},
"outputs": [],
"source": [
"# First, load the JSON to inspect its structure\n",
"import json\n",
"with open(\"/Users/jonahgoode/Documents/PlaneQuery/Other-Code/readsb-protobuf/webapp/src/db/aircrafts.json\", 'r') as f:\n",
" data = json.load(f)\n",
"\n",
"# Check the structure\n",
"print(type(data))"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "590134f4",
"metadata": {},
"outputs": [],
"source": [
"data['AC97E3']"
]
}
],
"metadata": {
"kernelspec": {
"display_name": ".venv",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.10"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
+4 -17
View File
@@ -1,6 +1,6 @@
{
"$schema": "https://json-schema.org/draft/2020-12/schema",
"title": "PlaneQuery Aircraft Community Submission (v1)",
"title": "OpenAirframes Community Submission (v1)",
"type": "object",
"additionalProperties": false,
"properties": {
@@ -12,7 +12,7 @@
"type": "string",
"pattern": "^[0-9A-F]{6}$"
},
"planequery_airframe_id": {
"openairframes_id": {
"type": "string",
"minLength": 1
},
@@ -54,20 +54,7 @@
"additionalProperties": {
"$ref": "#/$defs/tagValue"
},
"properties": {
"airshows": {
"type": "boolean"
},
"cat_friendly": {
"type": "boolean"
},
"dog_friendly": {
"type": "boolean"
},
"notes": {
"type": "string"
}
}
"properties": {}
}
},
"allOf": [
@@ -85,7 +72,7 @@
},
{
"required": [
"planequery_airframe_id"
"openairframes_id"
]
}
]
+6 -6
View File
@@ -27,7 +27,7 @@ from src.adsb.compress_adsb_to_aircraft_data import compress_multi_icao_df, COLU
DEFAULT_CHUNK_DIR = os.path.join(OUTPUT_DIR, "adsb_chunks")
FINAL_OUTPUT_DIR = "./data/planequery_aircraft"
FINAL_OUTPUT_DIR = "./data/openairframes"
os.makedirs(FINAL_OUTPUT_DIR, exist_ok=True)
@@ -85,12 +85,12 @@ def combine_compressed_chunks(compressed_dfs: list[pl.DataFrame]) -> pl.DataFram
def download_and_merge_base_release(compressed_df: pl.DataFrame) -> pl.DataFrame:
"""Download base release and merge with new data."""
from src.get_latest_planequery_aircraft_release import download_latest_aircraft_adsb_csv
from src.get_latest_release import download_latest_aircraft_adsb_csv
print("Downloading base ADS-B release...")
try:
base_path = download_latest_aircraft_adsb_csv(
output_dir="./data/planequery_aircraft_base"
output_dir="./data/openairframes_base"
)
print(f"Download returned: {base_path}")
@@ -176,10 +176,10 @@ def main():
if args.start_date and args.end_date:
# Historical mode
output_id = f"{args.start_date}_{args.end_date}"
output_filename = f"planequery_aircraft_adsb_{args.start_date}_{args.end_date}.csv"
output_filename = f"openairframes_adsb_{args.start_date}_{args.end_date}.csv"
print(f"Combining chunks for date range: {args.start_date} to {args.end_date}")
else:
# Daily mode
# Daily mode - use same date for start and end
if args.date:
target_day = datetime.strptime(args.date, "%Y-%m-%d")
else:
@@ -187,7 +187,7 @@ def main():
date_str = target_day.strftime("%Y-%m-%d")
output_id = date_str
output_filename = f"planequery_aircraft_adsb_{date_str}.csv"
output_filename = f"openairframes_adsb_{date_str}_{date_str}.csv"
print(f"Combining chunks for {date_str}")
chunks_dir = args.chunks_dir
+3 -3
View File
@@ -253,7 +253,7 @@ def concat_compressed_dfs(df_base, df_new):
def get_latest_aircraft_adsb_csv_df():
"""Download and load the latest ADS-B CSV from GitHub releases."""
from get_latest_planequery_aircraft_release import download_latest_aircraft_adsb_csv
from get_latest_release import download_latest_aircraft_adsb_csv
import re
csv_path = download_latest_aircraft_adsb_csv()
@@ -264,8 +264,8 @@ def get_latest_aircraft_adsb_csv_df():
if df[col].dtype == pl.Utf8:
df = df.with_columns(pl.col(col).fill_null(""))
# Extract start date from filename pattern: planequery_aircraft_adsb_{start_date}_{end_date}.csv
match = re.search(r"planequery_aircraft_adsb_(\d{4}-\d{2}-\d{2})_", str(csv_path))
# Extract start date from filename pattern: openairframes_adsb_{start_date}_{end_date}.csv
match = re.search(r"openairframes_adsb_(\d{4}-\d{2}-\d{2})_", str(csv_path))
if not match:
raise ValueError(f"Could not extract date from filename: {csv_path.name}")
+13 -5
View File
@@ -82,7 +82,8 @@ def fetch_releases(version_date: str) -> list:
if version_date == "v2024.12.31":
year = "2025"
BASE_URL = f"https://api.github.com/repos/adsblol/globe_history_{year}/releases"
PATTERN = f"{version_date}-planes-readsb-prod-0"
# Match exact release name, exclude tmp releases
PATTERN = rf"^{re.escape(version_date)}-planes-readsb-prod-\d+$"
releases = []
page = 1
@@ -187,19 +188,23 @@ def extract_split_archive(file_paths: list, extract_dir: str) -> bool:
cat_proc = subprocess.Popen(
["cat"] + file_paths,
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL
stderr=subprocess.PIPE
)
tar_cmd = ["tar", "xf", "-", "-C", extract_dir, "--strip-components=1"]
subprocess.run(
result = subprocess.run(
tar_cmd,
stdin=cat_proc.stdout,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
check=True
)
cat_proc.stdout.close()
cat_stderr = cat_proc.stderr.read().decode() if cat_proc.stderr else ""
cat_proc.wait()
if cat_stderr:
print(f"cat stderr: {cat_stderr}")
print(f"Successfully extracted archive to {extract_dir}")
# Delete tar files immediately after extraction
@@ -217,7 +222,10 @@ def extract_split_archive(file_paths: list, extract_dir: str) -> bool:
return True
except subprocess.CalledProcessError as e:
stderr_output = e.stderr.decode() if e.stderr else ""
print(f"Failed to extract split archive: {e}")
if stderr_output:
print(f"tar stderr: {stderr_output}")
return False
+2 -2
View File
@@ -76,8 +76,8 @@ def main():
print(f"After dedup: {df_accumulated.height} rows")
# Write and upload final result
output_name = f"planequery_aircraft_adsb_{global_start}_{global_end}.csv.gz"
csv_output = Path(f"/tmp/planequery_aircraft_adsb_{global_start}_{global_end}.csv")
output_name = f"openairframes_adsb_{global_start}_{global_end}.csv.gz"
csv_output = Path(f"/tmp/openairframes_adsb_{global_start}_{global_end}.csv")
gz_output = Path(f"/tmp/{output_name}")
df_accumulated.write_csv(csv_output)
@@ -17,7 +17,7 @@ import pandas as pd
COMMUNITY_DIR = Path(__file__).parent.parent.parent / "community"
OUT_ROOT = Path("data/planequery_aircraft")
OUT_ROOT = Path("data/openairframes")
def read_all_submissions(community_dir: Path) -> list[dict]:
@@ -47,7 +47,7 @@ def submissions_to_dataframe(submissions: list[dict]) -> pd.DataFrame:
- creation_timestamp (first)
- transponder_code_hex
- registration_number
- planequery_airframe_id
- openairframes_id
- contributor_name
- [other columns alphabetically]
- contributor_uuid (last)
@@ -62,7 +62,7 @@ def submissions_to_dataframe(submissions: list[dict]) -> pd.DataFrame:
"creation_timestamp",
"transponder_code_hex",
"registration_number",
"planequery_airframe_id",
"openairframes_id",
"contributor_name",
"contributor_uuid",
]
@@ -78,7 +78,7 @@ def submissions_to_dataframe(submissions: list[dict]) -> pd.DataFrame:
"creation_timestamp",
"transponder_code_hex",
"registration_number",
"planequery_airframe_id",
"openairframes_id",
"contributor_name",
]
last_cols = ["contributor_uuid"]
@@ -108,7 +108,7 @@ def main():
"creation_timestamp",
"transponder_code_hex",
"registration_number",
"planequery_airframe_id",
"openairframes_id",
"contributor_name",
"tags",
"contributor_uuid",
@@ -127,7 +127,7 @@ def main():
# Output
OUT_ROOT.mkdir(parents=True, exist_ok=True)
output_file = OUT_ROOT / f"planequery_aircraft_community_{start_date_str}_{date_str}.csv"
output_file = OUT_ROOT / f"openairframes_community_{start_date_str}_{date_str}.csv"
df.to_csv(output_file, index=False)
+2 -2
View File
@@ -112,8 +112,8 @@ def group_by_identifier(submissions: list[dict]) -> dict[str, list[dict]]:
key = f"reg:{submission['registration_number']}"
elif "transponder_code_hex" in submission:
key = f"icao:{submission['transponder_code_hex']}"
elif "planequery_airframe_id" in submission:
key = f"id:{submission['planequery_airframe_id']}"
elif "openairframes_id" in submission:
key = f"id:{submission['openairframes_id']}"
else:
key = "_unknown"
+1 -1
View File
@@ -111,7 +111,7 @@ def download_github_attachment(url: str) -> str | None:
import urllib.error
try:
req = urllib.request.Request(url, headers={"User-Agent": "PlaneQuery-Bot"})
req = urllib.request.Request(url, headers={"User-Agent": "OpenAirframes-Bot"})
with urllib.request.urlopen(req, timeout=30) as response:
return response.read().decode("utf-8")
except (urllib.error.URLError, urllib.error.HTTPError, UnicodeDecodeError) as e:
@@ -74,10 +74,10 @@ if __name__ == '__main__':
)
# Save the result
OUT_ROOT = Path("data/planequery_aircraft")
OUT_ROOT = Path("data/openairframes")
OUT_ROOT.mkdir(parents=True, exist_ok=True)
output_file = OUT_ROOT / f"planequery_aircraft_adsb_{start_date_str}_{date_str}.csv"
output_file = OUT_ROOT / f"openairframes_adsb_{start_date_str}_{date_str}.csv"
df_combined.write_csv(output_file)
print(f"Saved: {output_file}")
+49
View File
@@ -0,0 +1,49 @@
from pathlib import Path
from datetime import datetime, timezone, timedelta
import argparse
parser = argparse.ArgumentParser(description="Create daily FAA release")
parser.add_argument("--date", type=str, help="Date to process (YYYY-MM-DD format, default: today)")
args = parser.parse_args()
if args.date:
date_str = args.date
else:
date_str = datetime.now(timezone.utc).strftime("%Y-%m-%d")
out_dir = Path("data/faa_releasable")
out_dir.mkdir(parents=True, exist_ok=True)
zip_name = f"ReleasableAircraft_{date_str}.zip"
zip_path = out_dir / zip_name
if not zip_path.exists():
# URL and paths
url = "https://registry.faa.gov/database/ReleasableAircraft.zip"
from urllib.request import Request, urlopen
req = Request(
url,
headers={"User-Agent": "Mozilla/5.0"},
method="GET",
)
with urlopen(req, timeout=120) as r:
body = r.read()
zip_path.write_bytes(body)
OUT_ROOT = Path("data/openairframes")
OUT_ROOT.mkdir(parents=True, exist_ok=True)
from derive_from_faa_master_txt import convert_faa_master_txt_to_df, concat_faa_historical_df
from get_latest_release import get_latest_aircraft_faa_csv_df
df_new = convert_faa_master_txt_to_df(zip_path, date_str)
try:
df_base, start_date_str = get_latest_aircraft_faa_csv_df()
df_base = concat_faa_historical_df(df_base, df_new)
assert df_base['download_date'].is_monotonic_increasing, "download_date is not monotonic increasing"
except Exception as e:
print(f"No existing FAA release found, using only new data: {e}")
df_base = df_new
start_date_str = date_str
df_base.to_csv(OUT_ROOT / f"openairframes_faa_{start_date_str}_{date_str}.csv", index=False)
@@ -1,33 +0,0 @@
from pathlib import Path
from datetime import datetime, timezone
date_str = datetime.now(timezone.utc).strftime("%Y-%m-%d")
out_dir = Path("data/faa_releasable")
out_dir.mkdir(parents=True, exist_ok=True)
zip_name = f"ReleasableAircraft_{date_str}.zip"
zip_path = out_dir / zip_name
if not zip_path.exists():
# URL and paths
url = "https://registry.faa.gov/database/ReleasableAircraft.zip"
from urllib.request import Request, urlopen
req = Request(
url,
headers={"User-Agent": "Mozilla/5.0"},
method="GET",
)
with urlopen(req, timeout=120) as r:
body = r.read()
zip_path.write_bytes(body)
OUT_ROOT = Path("data/planequery_aircraft")
OUT_ROOT.mkdir(parents=True, exist_ok=True)
from derive_from_faa_master_txt import convert_faa_master_txt_to_df, concat_faa_historical_df
from get_latest_planequery_aircraft_release import get_latest_aircraft_faa_csv_df
df_new = convert_faa_master_txt_to_df(zip_path, date_str)
df_base, start_date_str = get_latest_aircraft_faa_csv_df()
df_base = concat_faa_historical_df(df_base, df_new)
assert df_base['download_date'].is_monotonic_increasing, "download_date is not monotonic increasing"
df_base.to_csv(OUT_ROOT / f"planequery_aircraft_faa_{start_date_str}_{date_str}.csv", index=False)
+5 -5
View File
@@ -29,8 +29,8 @@ def convert_faa_master_txt_to_df(zip_path: Path, date: str):
certification = pd.json_normalize(df["certification"].where(df["certification"].notna(), {})).add_prefix("certificate_")
df = df.drop(columns="certification").join(certification)
# Create planequery_airframe_id
df["planequery_airframe_id"] = (
# Create openairframes_id
df["openairframes_id"] = (
normalize(df["aircraft_manufacturer"])
+ "|"
+ normalize(df["aircraft_model"])
@@ -38,11 +38,11 @@ def convert_faa_master_txt_to_df(zip_path: Path, date: str):
+ normalize(df["serial_number"])
)
# Move planequery_airframe_id to come after registration_number
# Move openairframes_id to come after registration_number
cols = df.columns.tolist()
cols.remove("planequery_airframe_id")
cols.remove("openairframes_id")
reg_idx = cols.index("registration_number")
cols.insert(reg_idx + 1, "planequery_airframe_id")
cols.insert(reg_idx + 1, "openairframes_id")
df = df[cols]
# Convert all NaN to empty strings
@@ -9,7 +9,7 @@ import urllib.error
import json
REPO = "PlaneQuery/planequery-aircraft"
REPO = "PlaneQuery/openairframes"
LATEST_RELEASE_URL = f"https://api.github.com/repos/{REPO}/releases/latest"
@@ -31,7 +31,7 @@ def get_latest_release_assets(repo: str = REPO, github_token: Optional[str] = No
url = f"https://api.github.com/repos/{repo}/releases/latest"
headers = {
"Accept": "application/vnd.github+json",
"User-Agent": "planequery-aircraft-downloader/1.0",
"User-Agent": "openairframes-downloader/1.0",
}
if github_token:
headers["Authorization"] = f"Bearer {github_token}"
@@ -80,7 +80,7 @@ def download_asset(asset: ReleaseAsset, out_path: Path, github_token: Optional[s
out_path.parent.mkdir(parents=True, exist_ok=True)
headers = {
"User-Agent": "planequery-aircraft-downloader/1.0",
"User-Agent": "openairframes-downloader/1.0",
"Accept": "application/octet-stream",
}
if github_token:
@@ -109,7 +109,7 @@ def download_latest_aircraft_csv(
repo: str = REPO,
) -> Path:
"""
Download the latest planequery_aircraft_faa_*.csv file from the latest GitHub release.
Download the latest openairframes_faa_*.csv file from the latest GitHub release.
Args:
output_dir: Directory to save the downloaded file (default: "downloads")
@@ -121,10 +121,10 @@ def download_latest_aircraft_csv(
"""
assets = get_latest_release_assets(repo, github_token=github_token)
try:
asset = pick_asset(assets, name_regex=r"^planequery_aircraft_faa_.*\.csv$")
asset = pick_asset(assets, name_regex=r"^openairframes_faa_.*\.csv$")
except FileNotFoundError:
# Fallback to old naming pattern
asset = pick_asset(assets, name_regex=r"^planequery_aircraft_\d{4}-\d{2}-\d{2}_.*\.csv$")
asset = pick_asset(assets, name_regex=r"^openairframes_\d{4}-\d{2}-\d{2}_.*\.csv$")
saved_to = download_asset(asset, output_dir / asset.name, github_token=github_token)
print(f"Downloaded: {asset.name} ({asset.size} bytes) -> {saved_to}")
return saved_to
@@ -136,11 +136,11 @@ def get_latest_aircraft_faa_csv_df():
'unique_regulatory_id': str,
'registrant_county': str})
df = df.fillna("")
# Extract start date from filename pattern: planequery_aircraft_faa_{start_date}_{end_date}.csv
match = re.search(r"planequery_aircraft_faa_(\d{4}-\d{2}-\d{2})_", str(csv_path))
# Extract start date from filename pattern: openairframes_faa_{start_date}_{end_date}.csv
match = re.search(r"openairframes_faa_(\d{4}-\d{2}-\d{2})_", str(csv_path))
if not match:
# Fallback to old naming pattern: planequery_aircraft_{start_date}_{end_date}.csv
match = re.search(r"planequery_aircraft_(\d{4}-\d{2}-\d{2})_", str(csv_path))
# Fallback to old naming pattern: openairframes_{start_date}_{end_date}.csv
match = re.search(r"openairframes_(\d{4}-\d{2}-\d{2})_", str(csv_path))
if not match:
raise ValueError(f"Could not extract date from filename: {csv_path.name}")
@@ -154,7 +154,7 @@ def download_latest_aircraft_adsb_csv(
repo: str = REPO,
) -> Path:
"""
Download the latest planequery_aircraft_adsb_*.csv file from the latest GitHub release.
Download the latest openairframes_adsb_*.csv file from the latest GitHub release.
Args:
output_dir: Directory to save the downloaded file (default: "downloads")
@@ -165,7 +165,7 @@ def download_latest_aircraft_adsb_csv(
Path to the downloaded file
"""
assets = get_latest_release_assets(repo, github_token=github_token)
asset = pick_asset(assets, name_regex=r"^planequery_aircraft_adsb_.*\.csv$")
asset = pick_asset(assets, name_regex=r"^openairframes_adsb_.*\.csv$")
saved_to = download_asset(asset, output_dir / asset.name, github_token=github_token)
print(f"Downloaded: {asset.name} ({asset.size} bytes) -> {saved_to}")
return saved_to
@@ -176,8 +176,8 @@ def get_latest_aircraft_adsb_csv_df():
import pandas as pd
df = pd.read_csv(csv_path)
df = df.fillna("")
# Extract start date from filename pattern: planequery_aircraft_adsb_{start_date}_{end_date}.csv
match = re.search(r"planequery_aircraft_adsb_(\d{4}-\d{2}-\d{2})_", str(csv_path))
# Extract start date from filename pattern: openairframes_adsb_{start_date}_{end_date}.csv
match = re.search(r"openairframes_adsb_(\d{4}-\d{2}-\d{2})_", str(csv_path))
if not match:
raise ValueError(f"Could not extract date from filename: {csv_path.name}")
-90
View File
@@ -1,90 +0,0 @@
"""
Generate Step Functions input and start the pipeline.
Usage:
python trigger_pipeline.py 2024-01-01 2025-01-01
python trigger_pipeline.py 2024-01-01 2025-01-01 --chunk-days 30
python trigger_pipeline.py 2024-01-01 2025-01-01 --dry-run
"""
import argparse
import json
import os
import uuid
from datetime import datetime, timedelta
import boto3
def generate_chunks(start_date: str, end_date: str, chunk_days: int = 1):
"""Split a date range into chunks of chunk_days."""
start = datetime.strptime(start_date, "%Y-%m-%d")
end = datetime.strptime(end_date, "%Y-%m-%d")
chunks = []
current = start
while current < end:
chunk_end = min(current + timedelta(days=chunk_days), end)
chunks.append({
"start_date": current.strftime("%Y-%m-%d"),
"end_date": chunk_end.strftime("%Y-%m-%d"),
})
current = chunk_end
return chunks
def main():
parser = argparse.ArgumentParser(description="Trigger ADS-B map-reduce pipeline")
parser.add_argument("start_date", help="Start date (YYYY-MM-DD, inclusive)")
parser.add_argument("end_date", help="End date (YYYY-MM-DD, exclusive)")
parser.add_argument("--chunk-days", type=int, default=1,
help="Days per chunk (default: 1)")
parser.add_argument("--dry-run", action="store_true",
help="Print input JSON without starting execution")
args = parser.parse_args()
run_id = f"run-{datetime.utcnow().strftime('%Y%m%dT%H%M%S')}-{uuid.uuid4().hex[:8]}"
chunks = generate_chunks(args.start_date, args.end_date, args.chunk_days)
# Inject run_id into each chunk
for chunk in chunks:
chunk["run_id"] = run_id
sfn_input = {
"run_id": run_id,
"global_start_date": args.start_date,
"global_end_date": args.end_date,
"chunks": chunks,
}
print(f"Run ID: {run_id}")
print(f"Chunks: {len(chunks)} (at {args.chunk_days} days each)")
print(f"Max concurrency: 3 (enforced by Step Functions Map state)")
print()
print(json.dumps(sfn_input, indent=2))
if args.dry_run:
print("\n--dry-run: not starting execution")
return
client = boto3.client("stepfunctions")
# Find the state machine ARN
machines = client.list_state_machines()["stateMachines"]
arn = next(
m["stateMachineArn"]
for m in machines
if m["name"] == "adsb-map-reduce"
)
response = client.start_execution(
stateMachineArn=arn,
name=run_id,
input=json.dumps(sfn_input),
)
print(f"\nStarted execution: {response['executionArn']}")
if __name__ == "__main__":
main()