Compare commits

...

26 Commits

Author SHA1 Message Date
ggman12 a8b2b66952 fix .csv to .csv.gz transition 2026-02-15 19:08:51 -05:00
ggman12 3f38263a0c stop depue that destroys previous days 2026-02-15 17:55:16 -05:00
ggman12 1a553d5f44 use date of file instead of min timestamp 2026-02-15 16:44:09 -05:00
ggman12 405855c566 deal with whole schema 2026-02-15 16:43:00 -05:00
ggman12 4e81dde201 fix date parsing 2026-02-15 14:55:32 -05:00
ggman12 fde8ef029c update csv writing to handle empty data. Save space with higher gzip compression 2026-02-15 14:14:54 -05:00
ggman12 18ab51bd60 update naming 2026-02-15 13:45:03 -05:00
ggman12 83b9d2a76d write gzip 2026-02-15 13:41:09 -05:00
ggman12 8874619ab0 write gzip 2026-02-15 13:41:02 -05:00
ggman12 823f291728 fix errors in daily release due to new .gz file 2026-02-15 13:21:51 -05:00
ggman12 982011b36f end of year check 2026-02-14 22:42:32 -05:00
ggman12 1b15e43669 use .csv.gz 2026-02-14 22:22:14 -05:00
ggman12 f17adc4574 remvoe aws worker, reducer 2026-02-14 22:21:14 -05:00
ggman12 6a250a63fb fix None value comparision 2026-02-14 20:21:32 -05:00
ggman12 9e24fcbc63 update integrity checker. Hopefully solve issue. 2026-02-14 19:56:25 -05:00
ggman12 8ce04f1f83 Revert "update for historical run"
This reverts commit ccf55b2308.
2026-02-14 18:44:21 -05:00
ggman12 9441761ac9 use temp release too. 2026-02-14 18:43:25 -05:00
ggman12 ccf55b2308 update for historical run 2026-02-14 15:57:16 -05:00
ggman12 76eaf118ef add run_local.py 2026-02-14 15:54:36 -05:00
ggman12 0fcbad0fbc let mictronics retry 2026-02-14 15:07:08 -05:00
ggman12 0c7484e7bf create_daily_microtonics release 2026-02-13 22:19:02 -05:00
ggman12 8c60ac611d create daily adsbexchange database snapshot release 2026-02-13 22:19:02 -05:00
ggman12 145f1006be update template 2026-02-13 12:12:24 -05:00
ggman12 f5465f0552 update .github/workflows/update-community-prs.yaml 2026-02-13 12:00:10 -05:00
ggman12 17098ae39a fix update-community-prs.yaml 2026-02-13 11:52:53 -05:00
ggman12 6f6b65780a update community_submission.yaml. update Readme.md 2026-02-13 11:49:18 -05:00
19 changed files with 549 additions and 319 deletions
@@ -8,8 +8,8 @@ body:
- type: markdown
attributes:
value: |
Submit **one object** or an **array of objects** that matches the community submission schema.
Submit **one object** or an **array of objects** that matches the community submission [schema](https://github.com/PlaneQuery/OpenAirframes/blob/main/schemas/community_submission.v1.schema.json). Reuse existing tags from the schema when possible.
**Rules (enforced on review/automation):**
- Each object must include **at least one** of:
- `registration_number`
@@ -27,7 +27,7 @@ body:
```json
{
"registration_number": "N12345",
"tags": {"owner": "John Doe"},
"tags": {"owner": "John Doe", "photo": "https://example.com/photo.jpg"},
"start_date": "2025-01-01"
}
```
+33 -15
View File
@@ -95,20 +95,27 @@ jobs:
# Verify tar integrity
tar -tf extracted_data.tar > /dev/null && echo "Tar integrity check passed" || { echo "Tar integrity check FAILED"; exit 1; }
# Create checksum of the FULL tar before splitting (for verification after reassembly)
echo "=== Creating checksum of full tar ==="
sha256sum extracted_data.tar > full_tar.sha256
cat full_tar.sha256
# Record tar size and checksum for verification after reassembly
echo "=== Recording tar metadata ==="
ORIGINAL_SIZE=$(stat --format=%s extracted_data.tar)
ORIGINAL_SHA=$(sha256sum extracted_data.tar | awk '{print $1}')
echo "Size: $ORIGINAL_SIZE"
echo "SHA256: $ORIGINAL_SHA"
# Split into 500MB chunks to avoid artifact upload issues
echo "=== Splitting tar into 500MB chunks ==="
mkdir -p tar_chunks
split -b 500M extracted_data.tar tar_chunks/extracted_data.tar.part_
rm extracted_data.tar
mv full_tar.sha256 tar_chunks/
# Write metadata file (plain text so artifact upload won't skip it)
echo "$ORIGINAL_SHA extracted_data.tar" > tar_chunks/checksum.txt
echo "$ORIGINAL_SIZE" >> tar_chunks/checksum.txt
echo "=== Chunks created ==="
ls -lah tar_chunks/
echo "=== Checksum file ==="
cat tar_chunks/checksum.txt
else
echo "ERROR: No extracted directories found, cannot create tar"
exit 1
@@ -179,19 +186,30 @@ jobs:
echo "=== Reassembled tar file info ==="
ls -lah extracted_data.tar
# Verify checksum of reassembled tar matches original
echo "=== Verifying reassembled tar checksum ==="
echo "Original checksum:"
cat tar_chunks/full_tar.sha256
echo "Reassembled checksum:"
sha256sum extracted_data.tar
sha256sum -c tar_chunks/full_tar.sha256 || { echo "ERROR: Reassembled tar checksum mismatch - data corrupted during transfer"; exit 1; }
echo "Checksum verified - data integrity confirmed"
# Verify integrity
echo "=== Verifying reassembled tar ==="
if [ -f tar_chunks/checksum.txt ]; then
EXPECTED_SHA=$(head -1 tar_chunks/checksum.txt | awk '{print $1}')
EXPECTED_SIZE=$(sed -n '2p' tar_chunks/checksum.txt)
ACTUAL_SHA=$(sha256sum extracted_data.tar | awk '{print $1}')
ACTUAL_SIZE=$(stat --format=%s extracted_data.tar)
echo "Expected: SHA=$EXPECTED_SHA Size=$EXPECTED_SIZE"
echo "Actual: SHA=$ACTUAL_SHA Size=$ACTUAL_SIZE"
if [ "$EXPECTED_SHA" != "$ACTUAL_SHA" ] || [ "$EXPECTED_SIZE" != "$ACTUAL_SIZE" ]; then
echo "ERROR: Reassembled tar does not match original - data corrupted during transfer"
exit 1
fi
echo "Checksum and size verified"
else
echo "WARNING: No checksum file found, falling back to tar integrity check"
tar -tf extracted_data.tar > /dev/null || { echo "ERROR: Tar file is corrupted"; exit 1; }
echo "Tar integrity check passed"
fi
rm -rf tar_chunks
echo "=== Extracting ==="
tar -xvf extracted_data.tar
tar -xf extracted_data.tar
rm extracted_data.tar
echo "has_data=true" >> "$GITHUB_OUTPUT"
echo "=== Contents of data/output ==="
@@ -264,5 +282,5 @@ jobs:
uses: actions/upload-artifact@v4
with:
name: openairframes_adsb-${{ needs.generate-matrix.outputs.global_start }}-${{ needs.generate-matrix.outputs.global_end }}
path: data/openairframes/*.csv
path: data/openairframes/*.csv.gz
retention-days: 30
@@ -227,7 +227,7 @@ jobs:
uses: actions/upload-artifact@v4
with:
name: adsb-release
path: data/openairframes/openairframes_adsb_*.csv
path: data/openairframes/openairframes_adsb_*.csv.gz
retention-days: 1
build-community:
@@ -261,10 +261,64 @@ jobs:
path: data/openairframes/openairframes_community_*.csv
retention-days: 1
build-adsbexchange-json:
runs-on: ubuntu-latest
if: github.event_name != 'schedule'
steps:
- name: Checkout
uses: actions/checkout@v6
with:
fetch-depth: 0
- name: Setup Python
uses: actions/setup-python@v6
with:
python-version: "3.14"
- name: Run ADS-B Exchange JSON release script
run: |
python -m src.contributions.create_daily_adsbexchange_release ${{ inputs.date && format('--date {0}', inputs.date) || '' }}
ls -lah data/openairframes
- name: Upload ADS-B Exchange JSON artifact
uses: actions/upload-artifact@v4
with:
name: adsbexchange-json
path: data/openairframes/basic-ac-db_*.json.gz
retention-days: 1
build-mictronics-db:
runs-on: ubuntu-latest
if: github.event_name != 'schedule'
steps:
- name: Checkout
uses: actions/checkout@v6
with:
fetch-depth: 0
- name: Setup Python
uses: actions/setup-python@v6
with:
python-version: "3.14"
- name: Run Mictronics DB release script
continue-on-error: true
run: |
python -m src.contributions.create_daily_microtonics_release ${{ inputs.date && format('--date {0}', inputs.date) || '' }}
ls -lah data/openairframes
- name: Upload Mictronics DB artifact
uses: actions/upload-artifact@v4
with:
name: mictronics-db
path: data/openairframes/mictronics-db_*.zip
retention-days: 1
if-no-files-found: ignore
create-release:
runs-on: ubuntu-latest
needs: [build-faa, adsb-reduce, build-community]
if: github.event_name != 'schedule'
needs: [build-faa, adsb-reduce, build-community, build-adsbexchange-json, build-mictronics-db]
if: github.event_name != 'schedule' && !failure() && !cancelled()
steps:
- name: Checkout for gh CLI
uses: actions/checkout@v4
@@ -291,6 +345,19 @@ jobs:
name: community-release
path: artifacts/community
- name: Download ADS-B Exchange JSON artifact
uses: actions/download-artifact@v4
with:
name: adsbexchange-json
path: artifacts/adsbexchange
- name: Download Mictronics DB artifact
uses: actions/download-artifact@v4
continue-on-error: true
with:
name: mictronics-db
path: artifacts/mictronics
- name: Debug artifact structure
run: |
echo "=== Full artifacts tree ==="
@@ -301,6 +368,10 @@ jobs:
find artifacts/adsb -type f 2>/dev/null || echo "No files found in artifacts/adsb"
echo "=== Community artifacts ==="
find artifacts/community -type f 2>/dev/null || echo "No files found in artifacts/community"
echo "=== ADS-B Exchange JSON artifacts ==="
find artifacts/adsbexchange -type f 2>/dev/null || echo "No files found in artifacts/adsbexchange"
echo "=== Mictronics DB artifacts ==="
find artifacts/mictronics -type f 2>/dev/null || echo "No files found in artifacts/mictronics"
- name: Prepare release metadata
id: meta
@@ -317,9 +388,11 @@ jobs:
# Find files from artifacts using find (handles nested structures)
CSV_FILE_FAA=$(find artifacts/faa -name "openairframes_faa_*.csv" -type f 2>/dev/null | head -1)
CSV_FILE_ADSB=$(find artifacts/adsb -name "openairframes_adsb_*.csv" -type f 2>/dev/null | head -1)
CSV_FILE_ADSB=$(find artifacts/adsb -name "openairframes_adsb_*.csv.gz" -type f 2>/dev/null | head -1)
CSV_FILE_COMMUNITY=$(find artifacts/community -name "openairframes_community_*.csv" -type f 2>/dev/null | head -1)
ZIP_FILE=$(find artifacts/faa -name "ReleasableAircraft_*.zip" -type f 2>/dev/null | head -1)
JSON_FILE_ADSBX=$(find artifacts/adsbexchange -name "basic-ac-db_*.json.gz" -type f 2>/dev/null | head -1)
ZIP_FILE_MICTRONICS=$(find artifacts/mictronics -name "mictronics-db_*.zip" -type f 2>/dev/null | head -1)
# Validate required files exist
MISSING_FILES=""
@@ -332,12 +405,24 @@ jobs:
if [ -z "$ZIP_FILE" ] || [ ! -f "$ZIP_FILE" ]; then
MISSING_FILES="$MISSING_FILES FAA_ZIP"
fi
if [ -z "$JSON_FILE_ADSBX" ] || [ ! -f "$JSON_FILE_ADSBX" ]; then
MISSING_FILES="$MISSING_FILES ADSBX_JSON"
fi
# Optional files - warn but don't fail
OPTIONAL_MISSING=""
if [ -z "$ZIP_FILE_MICTRONICS" ] || [ ! -f "$ZIP_FILE_MICTRONICS" ]; then
OPTIONAL_MISSING="$OPTIONAL_MISSING MICTRONICS_ZIP"
ZIP_FILE_MICTRONICS=""
fi
if [ -n "$MISSING_FILES" ]; then
echo "ERROR: Missing required release files:$MISSING_FILES"
echo "FAA CSV: $CSV_FILE_FAA"
echo "ADSB CSV: $CSV_FILE_ADSB"
echo "ZIP: $ZIP_FILE"
echo "ADSBX JSON: $JSON_FILE_ADSBX"
echo "MICTRONICS ZIP: $ZIP_FILE_MICTRONICS"
exit 1
fi
@@ -346,6 +431,15 @@ jobs:
CSV_BASENAME_ADSB=$(basename "$CSV_FILE_ADSB")
CSV_BASENAME_COMMUNITY=$(basename "$CSV_FILE_COMMUNITY" 2>/dev/null || echo "")
ZIP_BASENAME=$(basename "$ZIP_FILE")
JSON_BASENAME_ADSBX=$(basename "$JSON_FILE_ADSBX")
ZIP_BASENAME_MICTRONICS=""
if [ -n "$ZIP_FILE_MICTRONICS" ]; then
ZIP_BASENAME_MICTRONICS=$(basename "$ZIP_FILE_MICTRONICS")
fi
if [ -n "$OPTIONAL_MISSING" ]; then
echo "WARNING: Optional files missing:$OPTIONAL_MISSING (will continue without them)"
fi
echo "date=$DATE" >> "$GITHUB_OUTPUT"
echo "tag=$TAG" >> "$GITHUB_OUTPUT"
@@ -357,6 +451,10 @@ jobs:
echo "csv_basename_community=$CSV_BASENAME_COMMUNITY" >> "$GITHUB_OUTPUT"
echo "zip_file=$ZIP_FILE" >> "$GITHUB_OUTPUT"
echo "zip_basename=$ZIP_BASENAME" >> "$GITHUB_OUTPUT"
echo "json_file_adsbx=$JSON_FILE_ADSBX" >> "$GITHUB_OUTPUT"
echo "json_basename_adsbx=$JSON_BASENAME_ADSBX" >> "$GITHUB_OUTPUT"
echo "zip_file_mictronics=$ZIP_FILE_MICTRONICS" >> "$GITHUB_OUTPUT"
echo "zip_basename_mictronics=$ZIP_BASENAME_MICTRONICS" >> "$GITHUB_OUTPUT"
echo "name=OpenAirframes snapshot ($DATE)${BRANCH_SUFFIX}" >> "$GITHUB_OUTPUT"
echo "Found files:"
@@ -364,6 +462,8 @@ jobs:
echo " ADSB CSV: $CSV_FILE_ADSB"
echo " Community CSV: $CSV_FILE_COMMUNITY"
echo " ZIP: $ZIP_FILE"
echo " ADSBX JSON: $JSON_FILE_ADSBX"
echo " MICTRONICS ZIP: $ZIP_FILE_MICTRONICS"
- name: Delete existing release if exists
run: |
@@ -377,7 +477,7 @@ jobs:
with:
tag_name: ${{ steps.meta.outputs.tag }}
name: ${{ steps.meta.outputs.name }}
fail_on_unmatched_files: true
fail_on_unmatched_files: false
body: |
Automated daily snapshot generated at 06:00 UTC for ${{ steps.meta.outputs.date }}.
@@ -386,10 +486,14 @@ jobs:
- ${{ steps.meta.outputs.csv_basename_adsb }}
- ${{ steps.meta.outputs.csv_basename_community }}
- ${{ steps.meta.outputs.zip_basename }}
- ${{ steps.meta.outputs.json_basename_adsbx }}
${{ steps.meta.outputs.zip_basename_mictronics && format('- {0}', steps.meta.outputs.zip_basename_mictronics) || '' }}
files: |
${{ steps.meta.outputs.csv_file_faa }}
${{ steps.meta.outputs.csv_file_adsb }}
${{ steps.meta.outputs.csv_file_community }}
${{ steps.meta.outputs.zip_file }}
${{ steps.meta.outputs.json_file_adsbx }}
${{ steps.meta.outputs.zip_file_mictronics }}
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
+41 -18
View File
@@ -48,29 +48,52 @@ jobs:
git fetch origin "$branch_name"
git checkout "$branch_name"
# Merge main into PR branch
git config user.name "github-actions[bot]"
git config user.email "github-actions[bot]@users.noreply.github.com"
if git merge origin/main -m "Merge main to update schema"; then
# Regenerate schema for this PR's submission (adds any new tags)
python -m src.contributions.regenerate_pr_schema || true
# If there are changes, commit and push
if [ -n "$(git status --porcelain schemas/)" ]; then
git add schemas/
git commit -m "Update schema with new tags"
git push origin "$branch_name"
echo " Updated PR #$pr_number with schema changes"
else
git push origin "$branch_name"
echo " Merged main into PR #$pr_number"
# Get the community submission file(s) and schema from this branch
community_files=$(git diff --name-only origin/main...HEAD -- 'community/' 'schemas/')
if [ -z "$community_files" ]; then
echo " No community/schema files found in PR #$pr_number, skipping"
git checkout main
continue
fi
echo " Files to preserve: $community_files"
# Save the community files content
mkdir -p /tmp/pr_files
for file in $community_files; do
if [ -f "$file" ]; then
mkdir -p "/tmp/pr_files/$(dirname "$file")"
cp "$file" "/tmp/pr_files/$file"
fi
done
# Reset branch to main (clean slate)
git reset --hard origin/main
# Restore the community files
for file in $community_files; do
if [ -f "/tmp/pr_files/$file" ]; then
mkdir -p "$(dirname "$file")"
cp "/tmp/pr_files/$file" "$file"
fi
done
rm -rf /tmp/pr_files
# Regenerate schema with current main + this submission's tags
python -m src.contributions.regenerate_pr_schema || true
# Stage and commit all changes
git add community/ schemas/
if ! git diff --cached --quiet; then
git commit -m "Community submission (rebased on main)"
git push --force origin "$branch_name"
echo " Rebased PR #$pr_number onto main"
else
echo " Merge conflict in PR #$pr_number, adding comment"
gh pr comment "$pr_number" --body $'⚠️ **Merge Conflict**\n\nAnother community submission was merged and this PR has conflicts.\n\nA maintainer may need to:\n1. Close this PR\n2. Remove the `approved` label from the original issue\n3. Re-add the `approved` label to regenerate the PR'
git merge --abort
fi
echo " No changes needed for PR #$pr_number"
fi
git checkout main
+3 -2
View File
@@ -20,7 +20,7 @@ A daily release is created at **06:00 UTC** and includes:
All [FAA registration data](https://www.faa.gov/licenses_certificates/aircraft_certification/aircraft_registry/releasable_aircraft_download) from 2023-08-16 to present (~260 MB)
- **openairframes_adsb.csv**
Airframe information derived from ADS-B messages on the [ADSB.lol](https://www.adsb.lol/) network, from 2026-02-12 to present. The airframe information originates from [mictronics aircraft database](https://www.mictronics.de/aircraft-database/) (~5 MB).
Airframe information derived from ADS-B messages on the [ADSB.lol](https://www.adsb.lol/) network, from 2026-02-12 to present (will be from 2024-01-01 soon). The airframe information originates from [mictronics aircraft database](https://www.mictronics.de/aircraft-database/) (~5 MB).
- **ReleasableAircraft_{date}.zip**
A daily snapshot of the FAA database, which updates at **05:30 UTC**
@@ -43,7 +43,8 @@ Please try to follow the submission formatting guidelines. If you are struggling
## For Developers
All code, compute (GitHub Actions), and storage (releases) are in this GitHub repository Improvements are welcome. Potential features include:
- Web UI
- Web UI for data
- Web UI for contributors
- Additional export formats in the daily release
- Data fusion from multiple sources in the daily release
- Automated airframe data connectors, including (but not limited to) civil aviation authorities and airline APIs
-11
View File
@@ -1,11 +0,0 @@
FROM --platform=linux/arm64 python:3.12-slim
WORKDIR /app
COPY requirements.reducer.txt requirements.txt
RUN pip install --no-cache-dir -r requirements.txt
COPY compress_adsb_to_aircraft_data.py .
COPY reducer.py .
CMD ["python", "-u", "reducer.py"]
-12
View File
@@ -1,12 +0,0 @@
FROM --platform=linux/arm64 python:3.12-slim
WORKDIR /app
COPY requirements.worker.txt requirements.txt
RUN pip install --no-cache-dir -r requirements.txt
COPY compress_adsb_to_aircraft_data.py .
COPY download_adsb_data_to_parquet.py .
COPY worker.py .
CMD ["python", "-u", "worker.py"]
+65 -54
View File
@@ -14,11 +14,12 @@ Usage:
python -m src.adsb.combine_chunks_to_csv --chunks-dir data/output/adsb_chunks --start-date 2024-01-01 --end-date 2024-01-07 --skip-base
"""
import gc
import gzip
import os
import sys
import glob
import argparse
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone
import polars as pl
@@ -33,7 +34,7 @@ os.makedirs(FINAL_OUTPUT_DIR, exist_ok=True)
def get_target_day() -> datetime:
"""Get yesterday's date (the day we're processing)."""
return datetime.utcnow() - timedelta(days=1)
return datetime.now(timezone.utc) - timedelta(days=1)
def process_single_chunk(chunk_path: str, delete_after_load: bool = False) -> pl.DataFrame:
@@ -83,58 +84,53 @@ def combine_compressed_chunks(compressed_dfs: list[pl.DataFrame]) -> pl.DataFram
return combined
def download_and_merge_base_release(compressed_df: pl.DataFrame) -> pl.DataFrame:
"""Download base release and merge with new data."""
def download_and_merge_base_release(compressed_df: pl.DataFrame) -> tuple[pl.DataFrame, str | None]:
"""Download base release and merge with new data.
Returns:
Tuple of (merged_df, earliest_date_str) where earliest_date_str is None if no base release was merged
"""
from src.get_latest_release import download_latest_aircraft_adsb_csv
print("Downloading base ADS-B release...")
try:
base_path = download_latest_aircraft_adsb_csv(
output_dir="./data/openairframes_base"
)
print(f"Download returned: {base_path}")
if base_path and os.path.exists(str(base_path)):
print(f"Loading base release from {base_path}")
base_df = pl.read_csv(base_path)
print(f"Base release has {len(base_df)} records")
# Ensure columns match
base_cols = set(base_df.columns)
new_cols = set(compressed_df.columns)
print(f"Base columns: {sorted(base_cols)}")
print(f"New columns: {sorted(new_cols)}")
# Add missing columns
for col in new_cols - base_cols:
base_df = base_df.with_columns(pl.lit(None).alias(col))
for col in base_cols - new_cols:
compressed_df = compressed_df.with_columns(pl.lit(None).alias(col))
# Reorder columns to match
compressed_df = compressed_df.select(base_df.columns)
# Concat and deduplicate by icao (keep new data - it comes last)
combined = pl.concat([base_df, compressed_df])
print(f"After concat: {len(combined)} records")
deduplicated = combined.unique(subset=["icao"], keep="last")
print(f"Combined with base: {len(combined)} -> {len(deduplicated)} after dedup")
del base_df, combined
gc.collect()
return deduplicated
else:
print(f"No base release found at {base_path}, using only new data")
return compressed_df
except Exception as e:
import traceback
print(f"Failed to download base release: {e}")
traceback.print_exc()
return compressed_df
base_path = download_latest_aircraft_adsb_csv(
output_dir="./data/openairframes_base"
)
print(f"Download returned: {base_path}")
print(f"Loading base release from {base_path}")
# Extract start date from filename (e.g., openairframes_adsb_2025-05-01_2026-02-14.csv.gz)
import re
filename = os.path.basename(str(base_path))
match = re.search(r'openairframes_adsb_(\d{4}-\d{2}-\d{2})_', filename)
earliest_date = match.group(1) if match else None
print(f"Start date from base filename: {earliest_date}")
# Read CSV with schema matching the new data
base_df = pl.read_csv(base_path, schema=compressed_df.schema)
print(f"Base release has {len(base_df)} records")
# Ensure columns match
base_cols = set(base_df.columns)
new_cols = set(compressed_df.columns)
print(f"Base columns: {sorted(base_cols)}")
print(f"New columns: {sorted(new_cols)}")
# Add missing columns
for col in new_cols - base_cols:
base_df = base_df.with_columns(pl.lit(None).alias(col))
for col in base_cols - new_cols:
compressed_df = compressed_df.with_columns(pl.lit(None).alias(col))
# Reorder columns to match
compressed_df = compressed_df.select(base_df.columns)
# Concat and deduplicate by icao (keep new data - it comes last)
combined = pl.concat([base_df, compressed_df])
print(f"After concat: {len(combined)} records")
return combined, earliest_date
def cleanup_chunks(output_id: str, chunks_dir: str):
"""Delete chunk parquet files after successful merge."""
@@ -176,7 +172,7 @@ def main():
if args.start_date and args.end_date:
# Historical mode
output_id = f"{args.start_date}_{args.end_date}"
output_filename = f"openairframes_adsb_{args.start_date}_{args.end_date}.csv"
output_filename = f"openairframes_adsb_{args.start_date}_{args.end_date}.csv.gz"
print(f"Combining chunks for date range: {args.start_date} to {args.end_date}")
else:
# Daily mode - use same date for start and end
@@ -187,7 +183,7 @@ def main():
date_str = target_day.strftime("%Y-%m-%d")
output_id = date_str
output_filename = f"openairframes_adsb_{date_str}_{date_str}.csv"
output_filename = f"openairframes_adsb_{date_str}_{date_str}.csv.gz"
print(f"Combining chunks for {date_str}")
chunks_dir = args.chunks_dir
@@ -220,8 +216,15 @@ def main():
print(f"After combining: {get_resource_usage()}")
# Merge with base release (unless skipped)
base_start_date = None
if not args.skip_base:
combined = download_and_merge_base_release(combined)
combined, base_start_date = download_and_merge_base_release(combined)
# Update filename if we merged with base release and got a start date
if base_start_date and not (args.start_date and args.end_date):
# Only update filename for daily mode when base was merged
output_filename = f"openairframes_adsb_{base_start_date}_{date_str}.csv.gz"
print(f"Updated filename to reflect date range: {output_filename}")
# Convert list columns to strings for CSV compatibility
for col in combined.columns:
@@ -234,9 +237,17 @@ def main():
if 'time' in combined.columns:
combined = combined.sort('time')
# Replace empty strings with null across all string columns to avoid quoted empty strings
for col in combined.columns:
if combined[col].dtype == pl.Utf8:
combined = combined.with_columns(
pl.when(pl.col(col) == "").then(None).otherwise(pl.col(col)).alias(col)
)
# Write final CSV
output_path = os.path.join(FINAL_OUTPUT_DIR, output_filename)
combined.write_csv(output_path)
with gzip.open(output_path, "wb", compresslevel=9) as f:
combined.write_csv(f, null_value='', quote_style='necessary')
print(f"Wrote {len(combined)} records to {output_path}")
# Cleanup
+1 -1
View File
@@ -264,7 +264,7 @@ def get_latest_aircraft_adsb_csv_df():
if df[col].dtype == pl.Utf8:
df = df.with_columns(pl.col(col).fill_null(""))
# Extract start date from filename pattern: openairframes_adsb_{start_date}_{end_date}.csv
# Extract start date from filename pattern: openairframes_adsb_{start_date}_{end_date}.csv[.gz]
match = re.search(r"openairframes_adsb_(\d{4}-\d{2}-\d{2})_", str(csv_path))
if not match:
raise ValueError(f"Could not extract date from filename: {csv_path.name}")
+28 -7
View File
@@ -76,14 +76,10 @@ def timeout_handler(signum, frame):
raise DownloadTimeoutException("Download timed out after 40 seconds")
def fetch_releases(version_date: str) -> list:
"""Fetch GitHub releases for a given version date from adsblol."""
year = version_date.split('.')[0][1:]
if version_date == "v2024.12.31":
year = "2025"
def _fetch_releases_from_repo(year: str, version_date: str) -> list:
"""Fetch GitHub releases for a given version date from a specific year's adsblol repo."""
BASE_URL = f"https://api.github.com/repos/adsblol/globe_history_{year}/releases"
# Match exact release name, exclude tmp releases
PATTERN = rf"^{re.escape(version_date)}-planes-readsb-prod-\d+$"
PATTERN = rf"^{re.escape(version_date)}-planes-readsb-prod-\d+(tmp)?$"
releases = []
page = 1
@@ -123,6 +119,25 @@ def fetch_releases(version_date: str) -> list:
return releases
def fetch_releases(version_date: str) -> list:
"""Fetch GitHub releases for a given version date from adsblol.
For Dec 31 dates, if no releases are found in the current year's repo,
also checks the next year's repo (adsblol sometimes publishes Dec 31
data in the following year's repository).
"""
year = version_date.split('.')[0][1:]
releases = _fetch_releases_from_repo(year, version_date)
# For last day of year, also check next year's repo if nothing found
if not releases and version_date.endswith(".12.31"):
next_year = str(int(year) + 1)
print(f"No releases found for {version_date} in {year} repo, checking {next_year} repo...")
releases = _fetch_releases_from_repo(next_year, version_date)
return releases
def download_asset(asset_url: str, file_path: str) -> bool:
"""Download a single release asset."""
os.makedirs(os.path.dirname(file_path) or OUTPUT_DIR, exist_ok=True)
@@ -582,6 +597,12 @@ def process_version_date(version_date: str, keep_folders: bool = False):
print(f"No releases found for {vd}.")
return None
# Prefer non-tmp releases; only use tmp if no normal releases exist
normal_releases = [r for r in releases if "tmp" not in r["tag_name"]]
tmp_releases = [r for r in releases if "tmp" in r["tag_name"]]
releases = normal_releases if normal_releases else tmp_releases
print(f"Using {'normal' if normal_releases else 'tmp'} releases ({len(releases)} found)")
downloaded_files = []
for release in releases:
tag_name = release["tag_name"]
+6
View File
@@ -59,6 +59,12 @@ def download_and_extract(version_date: str) -> str | None:
print(f"No releases found for {version_date}")
return None
# Prefer non-tmp releases; only use tmp if no normal releases exist
normal_releases = [r for r in releases if "tmp" not in r["tag_name"]]
tmp_releases = [r for r in releases if "tmp" in r["tag_name"]]
releases = normal_releases if normal_releases else tmp_releases
print(f"Using {'normal' if normal_releases else 'tmp'} releases ({len(releases)} found)")
downloaded_files = []
for release in releases:
tag_name = release["tag_name"]
-97
View File
@@ -1,97 +0,0 @@
"""
Reduce step: downloads all chunk CSVs from S3, combines them,
deduplicates across the full dataset, and uploads the final result.
Environment variables:
S3_BUCKET — bucket with intermediate results
RUN_ID — run identifier matching the map workers
GLOBAL_START_DATE — overall start date for output filename
GLOBAL_END_DATE — overall end date for output filename
"""
import gzip
import os
import shutil
from pathlib import Path
import boto3
import polars as pl
from compress_adsb_to_aircraft_data import COLUMNS, deduplicate_by_signature
def main():
s3_bucket = os.environ["S3_BUCKET"]
run_id = os.environ.get("RUN_ID", "default")
global_start = os.environ["GLOBAL_START_DATE"]
global_end = os.environ["GLOBAL_END_DATE"]
s3 = boto3.client("s3")
prefix = f"intermediate/{run_id}/"
# List all chunk files for this run
paginator = s3.get_paginator("list_objects_v2")
chunk_keys = []
for page in paginator.paginate(Bucket=s3_bucket, Prefix=prefix):
for obj in page.get("Contents", []):
if obj["Key"].endswith(".csv.gz"):
chunk_keys.append(obj["Key"])
chunk_keys.sort()
print(f"Found {len(chunk_keys)} chunks to combine")
if not chunk_keys:
print("No chunks found — nothing to reduce.")
return
# Download and concatenate all chunks
download_dir = Path("/tmp/chunks")
download_dir.mkdir(parents=True, exist_ok=True)
dfs = []
for key in chunk_keys:
gz_path = download_dir / Path(key).name
csv_path = gz_path.with_suffix("") # Remove .gz
print(f"Downloading {key}...")
s3.download_file(s3_bucket, key, str(gz_path))
# Decompress
with gzip.open(gz_path, 'rb') as f_in:
with open(csv_path, 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)
gz_path.unlink()
df_chunk = pl.read_csv(csv_path)
print(f" Loaded {df_chunk.height} rows from {csv_path.name}")
dfs.append(df_chunk)
# Free disk space after loading
csv_path.unlink()
df_accumulated = pl.concat(dfs) if dfs else pl.DataFrame()
print(f"Combined: {df_accumulated.height} rows before dedup")
# Final global deduplication
df_accumulated = deduplicate_by_signature(df_accumulated)
print(f"After dedup: {df_accumulated.height} rows")
# Write and upload final result
output_name = f"openairframes_adsb_{global_start}_{global_end}.csv.gz"
csv_output = Path(f"/tmp/openairframes_adsb_{global_start}_{global_end}.csv")
gz_output = Path(f"/tmp/{output_name}")
df_accumulated.write_csv(csv_output)
with open(csv_output, 'rb') as f_in:
with gzip.open(gz_output, 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)
csv_output.unlink()
final_key = f"final/{output_name}"
print(f"Uploading to s3://{s3_bucket}/{final_key}")
s3.upload_file(str(gz_output), s3_bucket, final_key)
print(f"Final output: {df_accumulated.height} records -> {final_key}")
if __name__ == "__main__":
main()
+155
View File
@@ -0,0 +1,155 @@
#!/usr/bin/env python3
"""
Run the full ADS-B processing pipeline locally.
Downloads adsb.lol data, processes trace files, and outputs openairframes_adsb CSV.
Usage:
# Single day (yesterday by default)
python -m src.adsb.run_local
# Single day (specific date)
python -m src.adsb.run_local 2024-01-15
# Date range (inclusive)
python -m src.adsb.run_local 2024-01-01 2024-01-07
"""
import argparse
import os
import subprocess
import sys
from datetime import datetime, timedelta
def run_cmd(cmd: list[str], description: str) -> None:
"""Run a command and exit on failure."""
print(f"\n>>> {' '.join(cmd)}")
result = subprocess.run(cmd)
if result.returncode != 0:
print(f"ERROR: {description} failed with exit code {result.returncode}")
sys.exit(result.returncode)
def main():
parser = argparse.ArgumentParser(
description="Run full ADS-B processing pipeline locally",
usage="python -m src.adsb.run_local [start_date] [end_date]"
)
parser.add_argument(
"start_date",
nargs="?",
help="Start date (YYYY-MM-DD). Default: yesterday"
)
parser.add_argument(
"end_date",
nargs="?",
help="End date (YYYY-MM-DD, inclusive). If omitted, processes single day"
)
parser.add_argument(
"--chunks",
type=int,
default=4,
help="Number of parallel chunks (default: 4)"
)
parser.add_argument(
"--skip-base",
action="store_true",
help="Skip downloading and merging with base release"
)
args = parser.parse_args()
# Determine dates
if args.start_date:
start_date = datetime.strptime(args.start_date, "%Y-%m-%d")
else:
start_date = datetime.utcnow() - timedelta(days=1)
end_date = None
if args.end_date:
end_date = datetime.strptime(args.end_date, "%Y-%m-%d")
start_str = start_date.strftime("%Y-%m-%d")
end_str = end_date.strftime("%Y-%m-%d") if end_date else None
print("=" * 60)
print("ADS-B Processing Pipeline")
print("=" * 60)
if end_str:
print(f"Date range: {start_str} to {end_str}")
else:
print(f"Date: {start_str}")
print(f"Chunks: {args.chunks}")
print("=" * 60)
# Step 1: Download and extract
print("\n" + "=" * 60)
print("Step 1: Download and Extract")
print("=" * 60)
if end_str:
cmd = ["python", "-m", "src.adsb.download_and_list_icaos",
"--start-date", start_str, "--end-date", end_str]
else:
cmd = ["python", "-m", "src.adsb.download_and_list_icaos",
"--date", start_str]
run_cmd(cmd, "Download and extract")
# Step 2: Process chunks
print("\n" + "=" * 60)
print("Step 2: Process Chunks")
print("=" * 60)
for chunk_id in range(args.chunks):
print(f"\n--- Chunk {chunk_id + 1}/{args.chunks} ---")
if end_str:
cmd = ["python", "-m", "src.adsb.process_icao_chunk",
"--chunk-id", str(chunk_id),
"--total-chunks", str(args.chunks),
"--start-date", start_str,
"--end-date", end_str]
else:
cmd = ["python", "-m", "src.adsb.process_icao_chunk",
"--chunk-id", str(chunk_id),
"--total-chunks", str(args.chunks),
"--date", start_str]
run_cmd(cmd, f"Process chunk {chunk_id}")
# Step 3: Combine chunks to CSV
print("\n" + "=" * 60)
print("Step 3: Combine to CSV")
print("=" * 60)
chunks_dir = "./data/output/adsb_chunks"
cmd = ["python", "-m", "src.adsb.combine_chunks_to_csv",
"--chunks-dir", chunks_dir]
if end_str:
cmd.extend(["--start-date", start_str, "--end-date", end_str])
else:
cmd.extend(["--date", start_str])
if args.skip_base:
cmd.append("--skip-base")
run_cmd(cmd, "Combine chunks")
print("\n" + "=" * 60)
print("Done!")
print("=" * 60)
# Show output
output_dir = "./data/openairframes"
if end_str:
output_file = f"openairframes_adsb_{start_str}_{end_str}.csv"
else:
output_file = f"openairframes_adsb_{start_str}_{start_str}.csv"
output_path = os.path.join(output_dir, output_file)
if os.path.exists(output_path):
size_mb = os.path.getsize(output_path) / (1024 * 1024)
print(f"Output: {output_path}")
print(f"Size: {size_mb:.1f} MB")
if __name__ == "__main__":
main()
-89
View File
@@ -1,89 +0,0 @@
"""
Map worker: processes a date range chunk, uploads result to S3.
Environment variables:
START_DATE — inclusive, YYYY-MM-DD
END_DATE — exclusive, YYYY-MM-DD
S3_BUCKET — bucket for intermediate results
RUN_ID — unique run identifier for namespacing S3 keys
"""
import os
import sys
from datetime import datetime, timedelta
from pathlib import Path
import boto3
import polars as pl
from compress_adsb_to_aircraft_data import (
load_historical_for_day,
deduplicate_by_signature,
COLUMNS,
)
def main():
start_date_str = os.environ["START_DATE"]
end_date_str = os.environ["END_DATE"]
s3_bucket = os.environ["S3_BUCKET"]
run_id = os.environ.get("RUN_ID", "default")
start_date = datetime.strptime(start_date_str, "%Y-%m-%d")
end_date = datetime.strptime(end_date_str, "%Y-%m-%d")
total_days = (end_date - start_date).days
print(f"Worker: processing {total_days} days [{start_date_str}, {end_date_str})")
dfs = []
current_date = start_date
while current_date < end_date:
day_str = current_date.strftime("%Y-%m-%d")
print(f" Loading {day_str}...")
df_compressed = load_historical_for_day(current_date)
if df_compressed.height == 0:
raise RuntimeError(f"No data found for {day_str}")
dfs.append(df_compressed)
total_rows = sum(df.height for df in dfs)
print(f" +{df_compressed.height} rows (total: {total_rows})")
# Delete local cache after each day to save disk in container
cache_dir = Path("data/adsb")
if cache_dir.exists():
import shutil
shutil.rmtree(cache_dir)
current_date += timedelta(days=1)
# Concatenate all days
df_accumulated = pl.concat(dfs) if dfs else pl.DataFrame()
# Deduplicate within this chunk
df_accumulated = deduplicate_by_signature(df_accumulated)
print(f"After dedup: {df_accumulated.height} rows")
# Write to local file then upload to S3
local_path = Path(f"/tmp/chunk_{start_date_str}_{end_date_str}.csv")
df_accumulated.write_csv(local_path)
# Compress with gzip
import gzip
import shutil
gz_path = Path(f"/tmp/chunk_{start_date_str}_{end_date_str}.csv.gz")
with open(local_path, 'rb') as f_in:
with gzip.open(gz_path, 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)
local_path.unlink() # Remove uncompressed file
s3_key = f"intermediate/{run_id}/chunk_{start_date_str}_{end_date_str}.csv.gz"
print(f"Uploading to s3://{s3_bucket}/{s3_key}")
s3 = boto3.client("s3")
s3.upload_file(str(gz_path), s3_bucket, s3_key)
print("Done.")
if __name__ == "__main__":
main()
@@ -0,0 +1,40 @@
#!/usr/bin/env python3
"""
Download ADS-B Exchange basic-ac-db.json.gz.
Usage:
python -m src.contributions.create_daily_adsbexchange_release [--date YYYY-MM-DD]
"""
from __future__ import annotations
import argparse
import shutil
from datetime import datetime, timezone
from pathlib import Path
from urllib.request import Request, urlopen
URL = "https://downloads.adsbexchange.com/downloads/basic-ac-db.json.gz"
OUT_ROOT = Path("data/openairframes")
def main() -> None:
parser = argparse.ArgumentParser(description="Create daily ADS-B Exchange JSON release")
parser.add_argument("--date", type=str, help="Date to process (YYYY-MM-DD format, default: today UTC)")
args = parser.parse_args()
date_str = args.date or datetime.now(timezone.utc).strftime("%Y-%m-%d")
OUT_ROOT.mkdir(parents=True, exist_ok=True)
gz_path = OUT_ROOT / f"basic-ac-db_{date_str}.json.gz"
print(f"Downloading {URL}...")
req = Request(URL, headers={"User-Agent": "openairframes-downloader/1.0"}, method="GET")
with urlopen(req, timeout=300) as r, gz_path.open("wb") as f:
shutil.copyfileobj(r, f)
print(f"Wrote: {gz_path}")
if __name__ == "__main__":
main()
@@ -0,0 +1,55 @@
#!/usr/bin/env python3
"""
Download Mictronics aircraft database zip.
Usage:
python -m src.contributions.create_daily_microtonics_release [--date YYYY-MM-DD]
"""
from __future__ import annotations
import argparse
import shutil
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from urllib.error import URLError
from urllib.request import Request, urlopen
URL = "https://www.mictronics.de/aircraft-database/indexedDB_old.php"
OUT_ROOT = Path("data/openairframes")
MAX_RETRIES = 3
RETRY_DELAY = 30 # seconds
def main() -> None:
parser = argparse.ArgumentParser(description="Create daily Mictronics database release")
parser.add_argument("--date", type=str, help="Date to process (YYYY-MM-DD format, default: today UTC)")
args = parser.parse_args()
date_str = args.date or datetime.now(timezone.utc).strftime("%Y-%m-%d")
OUT_ROOT.mkdir(parents=True, exist_ok=True)
zip_path = OUT_ROOT / f"mictronics-db_{date_str}.zip"
for attempt in range(1, MAX_RETRIES + 1):
try:
print(f"Downloading {URL} (attempt {attempt}/{MAX_RETRIES})...")
req = Request(URL, headers={"User-Agent": "Mozilla/5.0 (compatible; openairframes-downloader/1.0)"}, method="GET")
with urlopen(req, timeout=120) as r, zip_path.open("wb") as f:
shutil.copyfileobj(r, f)
print(f"Wrote: {zip_path}")
return
except (URLError, TimeoutError) as e:
print(f"Attempt {attempt} failed: {e}")
if attempt < MAX_RETRIES:
print(f"Retrying in {RETRY_DELAY} seconds...")
time.sleep(RETRY_DELAY)
else:
print("All retries exhausted. Mictronics download failed.")
sys.exit(1)
if __name__ == "__main__":
main()
+1 -1
View File
@@ -77,7 +77,7 @@ if __name__ == '__main__':
OUT_ROOT = Path("data/openairframes")
OUT_ROOT.mkdir(parents=True, exist_ok=True)
output_file = OUT_ROOT / f"openairframes_adsb_{start_date_str}_{date_str}.csv"
output_file = OUT_ROOT / f"openairframes_adsb_{start_date_str}_{date_str}.csv.gz"
df_combined.write_csv(output_file)
print(f"Saved: {output_file}")
+5 -2
View File
@@ -47,6 +47,9 @@ def convert_faa_master_txt_to_df(zip_path: Path, date: str):
# Convert all NaN to empty strings
df = df.fillna("")
# The FAA parser can produce the literal string "None" for missing values;
# replace those so they match the empty-string convention used everywhere else.
df = df.replace("None", "")
return df
@@ -84,8 +87,8 @@ def concat_faa_historical_df(df_base, df_new):
# Convert to string
val_str = str(val).strip()
# Handle empty strings
if val_str == "" or val_str == "nan":
# Handle empty strings and null-like literals
if val_str == "" or val_str == "nan" or val_str == "None":
return ""
# Check if it looks like a list representation (starts with [ )
+4 -2
View File
@@ -119,6 +119,7 @@ def download_latest_aircraft_csv(
Returns:
Path to the downloaded file
"""
output_dir = Path(output_dir)
assets = get_latest_release_assets(repo, github_token=github_token)
try:
asset = pick_asset(assets, name_regex=r"^openairframes_faa_.*\.csv$")
@@ -164,8 +165,9 @@ def download_latest_aircraft_adsb_csv(
Returns:
Path to the downloaded file
"""
output_dir = Path(output_dir)
assets = get_latest_release_assets(repo, github_token=github_token)
asset = pick_asset(assets, name_regex=r"^openairframes_adsb_.*\.csv$")
asset = pick_asset(assets, name_regex=r"^openairframes_adsb_.*\.csv(\.gz)?$")
saved_to = download_asset(asset, output_dir / asset.name, github_token=github_token)
print(f"Downloaded: {asset.name} ({asset.size} bytes) -> {saved_to}")
return saved_to
@@ -176,7 +178,7 @@ def get_latest_aircraft_adsb_csv_df():
import pandas as pd
df = pd.read_csv(csv_path)
df = df.fillna("")
# Extract start date from filename pattern: openairframes_adsb_{start_date}_{end_date}.csv
# Extract start date from filename pattern: openairframes_adsb_{start_date}_{end_date}.csv[.gz]
match = re.search(r"openairframes_adsb_(\d{4}-\d{2}-\d{2})_", str(csv_path))
if not match:
raise ValueError(f"Could not extract date from filename: {csv_path.name}")