Compare commits

...

30 Commits

Author SHA1 Message Date
copilot-swe-agent[bot] 2b720f3d88 Remove step-level timeouts to allow artifact downloads to complete
Co-authored-by: ggman12 <17393221+ggman12@users.noreply.github.com>
2026-02-16 19:38:27 +00:00
copilot-swe-agent[bot] e65da2cd95 Fix YAML property ordering for timeout-minutes
Co-authored-by: ggman12 <17393221+ggman12@users.noreply.github.com>
2026-02-16 17:50:18 +00:00
copilot-swe-agent[bot] 7ea815732c Add timeout and retry logic to adsb-reduce artifact download step
Co-authored-by: ggman12 <17393221+ggman12@users.noreply.github.com>
2026-02-16 17:49:41 +00:00
copilot-swe-agent[bot] 7fd5df8777 Initial plan 2026-02-16 17:44:39 +00:00
ggman12 a8b2b66952 fix .csv to .csv.gz transition 2026-02-15 19:08:51 -05:00
ggman12 3f38263a0c stop dedupe that destroys previous days 2026-02-15 17:55:16 -05:00
ggman12 1a553d5f44 use date of file instead of min timestamp 2026-02-15 16:44:09 -05:00
ggman12 405855c566 deal with whole schema 2026-02-15 16:43:00 -05:00
ggman12 4e81dde201 fix date parsing 2026-02-15 14:55:32 -05:00
ggman12 fde8ef029c update csv writing to handle empty data. Save space with higher gzip compression 2026-02-15 14:14:54 -05:00
ggman12 18ab51bd60 update naming 2026-02-15 13:45:03 -05:00
ggman12 83b9d2a76d write gzip 2026-02-15 13:41:09 -05:00
ggman12 8874619ab0 write gzip 2026-02-15 13:41:02 -05:00
ggman12 823f291728 fix errors in daily release due to new .gz file 2026-02-15 13:21:51 -05:00
ggman12 982011b36f end of year check 2026-02-14 22:42:32 -05:00
ggman12 1b15e43669 use .csv.gz 2026-02-14 22:22:14 -05:00
ggman12 f17adc4574 remove aws worker, reducer 2026-02-14 22:21:14 -05:00
ggman12 6a250a63fb fix None value comparison 2026-02-14 20:21:32 -05:00
ggman12 9e24fcbc63 update integrity checker. Hopefully solve issue. 2026-02-14 19:56:25 -05:00
ggman12 8ce04f1f83 Revert "update for historical run"
This reverts commit ccf55b2308.
2026-02-14 18:44:21 -05:00
ggman12 9441761ac9 use temp release too. 2026-02-14 18:43:25 -05:00
ggman12 ccf55b2308 update for historical run 2026-02-14 15:57:16 -05:00
ggman12 76eaf118ef add run_local.py 2026-02-14 15:54:36 -05:00
ggman12 0fcbad0fbc let mictronics retry 2026-02-14 15:07:08 -05:00
ggman12 0c7484e7bf create_daily_microtonics release 2026-02-13 22:19:02 -05:00
ggman12 8c60ac611d create daily adsbexchange database snapshot release 2026-02-13 22:19:02 -05:00
ggman12 145f1006be update template 2026-02-13 12:12:24 -05:00
ggman12 f5465f0552 update .github/workflows/update-community-prs.yaml 2026-02-13 12:00:10 -05:00
ggman12 17098ae39a fix update-community-prs.yaml 2026-02-13 11:52:53 -05:00
ggman12 6f6b65780a update community_submission.yaml. update Readme.md 2026-02-13 11:49:18 -05:00
19 changed files with 560 additions and 319 deletions
@@ -8,8 +8,8 @@ body:
- type: markdown - type: markdown
attributes: attributes:
value: | value: |
Submit **one object** or an **array of objects** that matches the community submission schema. Submit **one object** or an **array of objects** that matches the community submission [schema](https://github.com/PlaneQuery/OpenAirframes/blob/main/schemas/community_submission.v1.schema.json). Reuse existing tags from the schema when possible.
**Rules (enforced on review/automation):** **Rules (enforced on review/automation):**
- Each object must include **at least one** of: - Each object must include **at least one** of:
- `registration_number` - `registration_number`
@@ -27,7 +27,7 @@ body:
```json ```json
{ {
"registration_number": "N12345", "registration_number": "N12345",
"tags": {"owner": "John Doe"}, "tags": {"owner": "John Doe", "photo": "https://example.com/photo.jpg"},
"start_date": "2025-01-01" "start_date": "2025-01-01"
} }
``` ```
+44 -15
View File
@@ -95,20 +95,27 @@ jobs:
# Verify tar integrity # Verify tar integrity
tar -tf extracted_data.tar > /dev/null && echo "Tar integrity check passed" || { echo "Tar integrity check FAILED"; exit 1; } tar -tf extracted_data.tar > /dev/null && echo "Tar integrity check passed" || { echo "Tar integrity check FAILED"; exit 1; }
# Create checksum of the FULL tar before splitting (for verification after reassembly) # Record tar size and checksum for verification after reassembly
echo "=== Creating checksum of full tar ===" echo "=== Recording tar metadata ==="
sha256sum extracted_data.tar > full_tar.sha256 ORIGINAL_SIZE=$(stat --format=%s extracted_data.tar)
cat full_tar.sha256 ORIGINAL_SHA=$(sha256sum extracted_data.tar | awk '{print $1}')
echo "Size: $ORIGINAL_SIZE"
echo "SHA256: $ORIGINAL_SHA"
# Split into 500MB chunks to avoid artifact upload issues # Split into 500MB chunks to avoid artifact upload issues
echo "=== Splitting tar into 500MB chunks ===" echo "=== Splitting tar into 500MB chunks ==="
mkdir -p tar_chunks mkdir -p tar_chunks
split -b 500M extracted_data.tar tar_chunks/extracted_data.tar.part_ split -b 500M extracted_data.tar tar_chunks/extracted_data.tar.part_
rm extracted_data.tar rm extracted_data.tar
mv full_tar.sha256 tar_chunks/
# Write metadata file (plain text so artifact upload won't skip it)
echo "$ORIGINAL_SHA extracted_data.tar" > tar_chunks/checksum.txt
echo "$ORIGINAL_SIZE" >> tar_chunks/checksum.txt
echo "=== Chunks created ===" echo "=== Chunks created ==="
ls -lah tar_chunks/ ls -lah tar_chunks/
echo "=== Checksum file ==="
cat tar_chunks/checksum.txt
else else
echo "ERROR: No extracted directories found, cannot create tar" echo "ERROR: No extracted directories found, cannot create tar"
exit 1 exit 1
@@ -179,19 +186,30 @@ jobs:
echo "=== Reassembled tar file info ===" echo "=== Reassembled tar file info ==="
ls -lah extracted_data.tar ls -lah extracted_data.tar
# Verify checksum of reassembled tar matches original # Verify integrity
echo "=== Verifying reassembled tar checksum ===" echo "=== Verifying reassembled tar ==="
echo "Original checksum:" if [ -f tar_chunks/checksum.txt ]; then
cat tar_chunks/full_tar.sha256 EXPECTED_SHA=$(head -1 tar_chunks/checksum.txt | awk '{print $1}')
echo "Reassembled checksum:" EXPECTED_SIZE=$(sed -n '2p' tar_chunks/checksum.txt)
sha256sum extracted_data.tar ACTUAL_SHA=$(sha256sum extracted_data.tar | awk '{print $1}')
sha256sum -c tar_chunks/full_tar.sha256 || { echo "ERROR: Reassembled tar checksum mismatch - data corrupted during transfer"; exit 1; } ACTUAL_SIZE=$(stat --format=%s extracted_data.tar)
echo "Checksum verified - data integrity confirmed" echo "Expected: SHA=$EXPECTED_SHA Size=$EXPECTED_SIZE"
echo "Actual: SHA=$ACTUAL_SHA Size=$ACTUAL_SIZE"
if [ "$EXPECTED_SHA" != "$ACTUAL_SHA" ] || [ "$EXPECTED_SIZE" != "$ACTUAL_SIZE" ]; then
echo "ERROR: Reassembled tar does not match original - data corrupted during transfer"
exit 1
fi
echo "Checksum and size verified"
else
echo "WARNING: No checksum file found, falling back to tar integrity check"
tar -tf extracted_data.tar > /dev/null || { echo "ERROR: Tar file is corrupted"; exit 1; }
echo "Tar integrity check passed"
fi
rm -rf tar_chunks rm -rf tar_chunks
echo "=== Extracting ===" echo "=== Extracting ==="
tar -xvf extracted_data.tar tar -xf extracted_data.tar
rm extracted_data.tar rm extracted_data.tar
echo "has_data=true" >> "$GITHUB_OUTPUT" echo "has_data=true" >> "$GITHUB_OUTPUT"
echo "=== Contents of data/output ===" echo "=== Contents of data/output ==="
@@ -222,6 +240,7 @@ jobs:
adsb-reduce: adsb-reduce:
needs: [generate-matrix, adsb-map] needs: [generate-matrix, adsb-map]
runs-on: ubuntu-24.04-arm runs-on: ubuntu-24.04-arm
timeout-minutes: 120
steps: steps:
- name: Checkout - name: Checkout
uses: actions/checkout@v4 uses: actions/checkout@v4
@@ -237,6 +256,16 @@ jobs:
pip install -r requirements.txt pip install -r requirements.txt
- name: Download all chunk artifacts - name: Download all chunk artifacts
id: download
continue-on-error: true
uses: actions/download-artifact@v4
with:
pattern: adsb-map-*
path: data/output/adsb_chunks/
merge-multiple: true
- name: Retry artifact download on failure
if: steps.download.outcome == 'failure'
uses: actions/download-artifact@v4 uses: actions/download-artifact@v4
with: with:
pattern: adsb-map-* pattern: adsb-map-*
@@ -264,5 +293,5 @@ jobs:
uses: actions/upload-artifact@v4 uses: actions/upload-artifact@v4
with: with:
name: openairframes_adsb-${{ needs.generate-matrix.outputs.global_start }}-${{ needs.generate-matrix.outputs.global_end }} name: openairframes_adsb-${{ needs.generate-matrix.outputs.global_start }}-${{ needs.generate-matrix.outputs.global_end }}
path: data/openairframes/*.csv path: data/openairframes/*.csv.gz
retention-days: 30 retention-days: 30
@@ -227,7 +227,7 @@ jobs:
uses: actions/upload-artifact@v4 uses: actions/upload-artifact@v4
with: with:
name: adsb-release name: adsb-release
path: data/openairframes/openairframes_adsb_*.csv path: data/openairframes/openairframes_adsb_*.csv.gz
retention-days: 1 retention-days: 1
build-community: build-community:
@@ -261,10 +261,64 @@ jobs:
path: data/openairframes/openairframes_community_*.csv path: data/openairframes/openairframes_community_*.csv
retention-days: 1 retention-days: 1
build-adsbexchange-json:
runs-on: ubuntu-latest
if: github.event_name != 'schedule'
steps:
- name: Checkout
uses: actions/checkout@v6
with:
fetch-depth: 0
- name: Setup Python
uses: actions/setup-python@v6
with:
python-version: "3.14"
- name: Run ADS-B Exchange JSON release script
run: |
python -m src.contributions.create_daily_adsbexchange_release ${{ inputs.date && format('--date {0}', inputs.date) || '' }}
ls -lah data/openairframes
- name: Upload ADS-B Exchange JSON artifact
uses: actions/upload-artifact@v4
with:
name: adsbexchange-json
path: data/openairframes/basic-ac-db_*.json.gz
retention-days: 1
build-mictronics-db:
runs-on: ubuntu-latest
if: github.event_name != 'schedule'
steps:
- name: Checkout
uses: actions/checkout@v6
with:
fetch-depth: 0
- name: Setup Python
uses: actions/setup-python@v6
with:
python-version: "3.14"
- name: Run Mictronics DB release script
continue-on-error: true
run: |
python -m src.contributions.create_daily_microtonics_release ${{ inputs.date && format('--date {0}', inputs.date) || '' }}
ls -lah data/openairframes
- name: Upload Mictronics DB artifact
uses: actions/upload-artifact@v4
with:
name: mictronics-db
path: data/openairframes/mictronics-db_*.zip
retention-days: 1
if-no-files-found: ignore
create-release: create-release:
runs-on: ubuntu-latest runs-on: ubuntu-latest
needs: [build-faa, adsb-reduce, build-community] needs: [build-faa, adsb-reduce, build-community, build-adsbexchange-json, build-mictronics-db]
if: github.event_name != 'schedule' if: github.event_name != 'schedule' && !failure() && !cancelled()
steps: steps:
- name: Checkout for gh CLI - name: Checkout for gh CLI
uses: actions/checkout@v4 uses: actions/checkout@v4
@@ -291,6 +345,19 @@ jobs:
name: community-release name: community-release
path: artifacts/community path: artifacts/community
- name: Download ADS-B Exchange JSON artifact
uses: actions/download-artifact@v4
with:
name: adsbexchange-json
path: artifacts/adsbexchange
- name: Download Mictronics DB artifact
uses: actions/download-artifact@v4
continue-on-error: true
with:
name: mictronics-db
path: artifacts/mictronics
- name: Debug artifact structure - name: Debug artifact structure
run: | run: |
echo "=== Full artifacts tree ===" echo "=== Full artifacts tree ==="
@@ -301,6 +368,10 @@ jobs:
find artifacts/adsb -type f 2>/dev/null || echo "No files found in artifacts/adsb" find artifacts/adsb -type f 2>/dev/null || echo "No files found in artifacts/adsb"
echo "=== Community artifacts ===" echo "=== Community artifacts ==="
find artifacts/community -type f 2>/dev/null || echo "No files found in artifacts/community" find artifacts/community -type f 2>/dev/null || echo "No files found in artifacts/community"
echo "=== ADS-B Exchange JSON artifacts ==="
find artifacts/adsbexchange -type f 2>/dev/null || echo "No files found in artifacts/adsbexchange"
echo "=== Mictronics DB artifacts ==="
find artifacts/mictronics -type f 2>/dev/null || echo "No files found in artifacts/mictronics"
- name: Prepare release metadata - name: Prepare release metadata
id: meta id: meta
@@ -317,9 +388,11 @@ jobs:
# Find files from artifacts using find (handles nested structures) # Find files from artifacts using find (handles nested structures)
CSV_FILE_FAA=$(find artifacts/faa -name "openairframes_faa_*.csv" -type f 2>/dev/null | head -1) CSV_FILE_FAA=$(find artifacts/faa -name "openairframes_faa_*.csv" -type f 2>/dev/null | head -1)
CSV_FILE_ADSB=$(find artifacts/adsb -name "openairframes_adsb_*.csv" -type f 2>/dev/null | head -1) CSV_FILE_ADSB=$(find artifacts/adsb -name "openairframes_adsb_*.csv.gz" -type f 2>/dev/null | head -1)
CSV_FILE_COMMUNITY=$(find artifacts/community -name "openairframes_community_*.csv" -type f 2>/dev/null | head -1) CSV_FILE_COMMUNITY=$(find artifacts/community -name "openairframes_community_*.csv" -type f 2>/dev/null | head -1)
ZIP_FILE=$(find artifacts/faa -name "ReleasableAircraft_*.zip" -type f 2>/dev/null | head -1) ZIP_FILE=$(find artifacts/faa -name "ReleasableAircraft_*.zip" -type f 2>/dev/null | head -1)
JSON_FILE_ADSBX=$(find artifacts/adsbexchange -name "basic-ac-db_*.json.gz" -type f 2>/dev/null | head -1)
ZIP_FILE_MICTRONICS=$(find artifacts/mictronics -name "mictronics-db_*.zip" -type f 2>/dev/null | head -1)
# Validate required files exist # Validate required files exist
MISSING_FILES="" MISSING_FILES=""
@@ -332,12 +405,24 @@ jobs:
if [ -z "$ZIP_FILE" ] || [ ! -f "$ZIP_FILE" ]; then if [ -z "$ZIP_FILE" ] || [ ! -f "$ZIP_FILE" ]; then
MISSING_FILES="$MISSING_FILES FAA_ZIP" MISSING_FILES="$MISSING_FILES FAA_ZIP"
fi fi
if [ -z "$JSON_FILE_ADSBX" ] || [ ! -f "$JSON_FILE_ADSBX" ]; then
MISSING_FILES="$MISSING_FILES ADSBX_JSON"
fi
# Optional files - warn but don't fail
OPTIONAL_MISSING=""
if [ -z "$ZIP_FILE_MICTRONICS" ] || [ ! -f "$ZIP_FILE_MICTRONICS" ]; then
OPTIONAL_MISSING="$OPTIONAL_MISSING MICTRONICS_ZIP"
ZIP_FILE_MICTRONICS=""
fi
if [ -n "$MISSING_FILES" ]; then if [ -n "$MISSING_FILES" ]; then
echo "ERROR: Missing required release files:$MISSING_FILES" echo "ERROR: Missing required release files:$MISSING_FILES"
echo "FAA CSV: $CSV_FILE_FAA" echo "FAA CSV: $CSV_FILE_FAA"
echo "ADSB CSV: $CSV_FILE_ADSB" echo "ADSB CSV: $CSV_FILE_ADSB"
echo "ZIP: $ZIP_FILE" echo "ZIP: $ZIP_FILE"
echo "ADSBX JSON: $JSON_FILE_ADSBX"
echo "MICTRONICS ZIP: $ZIP_FILE_MICTRONICS"
exit 1 exit 1
fi fi
@@ -346,6 +431,15 @@ jobs:
CSV_BASENAME_ADSB=$(basename "$CSV_FILE_ADSB") CSV_BASENAME_ADSB=$(basename "$CSV_FILE_ADSB")
CSV_BASENAME_COMMUNITY=$(basename "$CSV_FILE_COMMUNITY" 2>/dev/null || echo "") CSV_BASENAME_COMMUNITY=$(basename "$CSV_FILE_COMMUNITY" 2>/dev/null || echo "")
ZIP_BASENAME=$(basename "$ZIP_FILE") ZIP_BASENAME=$(basename "$ZIP_FILE")
JSON_BASENAME_ADSBX=$(basename "$JSON_FILE_ADSBX")
ZIP_BASENAME_MICTRONICS=""
if [ -n "$ZIP_FILE_MICTRONICS" ]; then
ZIP_BASENAME_MICTRONICS=$(basename "$ZIP_FILE_MICTRONICS")
fi
if [ -n "$OPTIONAL_MISSING" ]; then
echo "WARNING: Optional files missing:$OPTIONAL_MISSING (will continue without them)"
fi
echo "date=$DATE" >> "$GITHUB_OUTPUT" echo "date=$DATE" >> "$GITHUB_OUTPUT"
echo "tag=$TAG" >> "$GITHUB_OUTPUT" echo "tag=$TAG" >> "$GITHUB_OUTPUT"
@@ -357,6 +451,10 @@ jobs:
echo "csv_basename_community=$CSV_BASENAME_COMMUNITY" >> "$GITHUB_OUTPUT" echo "csv_basename_community=$CSV_BASENAME_COMMUNITY" >> "$GITHUB_OUTPUT"
echo "zip_file=$ZIP_FILE" >> "$GITHUB_OUTPUT" echo "zip_file=$ZIP_FILE" >> "$GITHUB_OUTPUT"
echo "zip_basename=$ZIP_BASENAME" >> "$GITHUB_OUTPUT" echo "zip_basename=$ZIP_BASENAME" >> "$GITHUB_OUTPUT"
echo "json_file_adsbx=$JSON_FILE_ADSBX" >> "$GITHUB_OUTPUT"
echo "json_basename_adsbx=$JSON_BASENAME_ADSBX" >> "$GITHUB_OUTPUT"
echo "zip_file_mictronics=$ZIP_FILE_MICTRONICS" >> "$GITHUB_OUTPUT"
echo "zip_basename_mictronics=$ZIP_BASENAME_MICTRONICS" >> "$GITHUB_OUTPUT"
echo "name=OpenAirframes snapshot ($DATE)${BRANCH_SUFFIX}" >> "$GITHUB_OUTPUT" echo "name=OpenAirframes snapshot ($DATE)${BRANCH_SUFFIX}" >> "$GITHUB_OUTPUT"
echo "Found files:" echo "Found files:"
@@ -364,6 +462,8 @@ jobs:
echo " ADSB CSV: $CSV_FILE_ADSB" echo " ADSB CSV: $CSV_FILE_ADSB"
echo " Community CSV: $CSV_FILE_COMMUNITY" echo " Community CSV: $CSV_FILE_COMMUNITY"
echo " ZIP: $ZIP_FILE" echo " ZIP: $ZIP_FILE"
echo " ADSBX JSON: $JSON_FILE_ADSBX"
echo " MICTRONICS ZIP: $ZIP_FILE_MICTRONICS"
- name: Delete existing release if exists - name: Delete existing release if exists
run: | run: |
@@ -377,7 +477,7 @@ jobs:
with: with:
tag_name: ${{ steps.meta.outputs.tag }} tag_name: ${{ steps.meta.outputs.tag }}
name: ${{ steps.meta.outputs.name }} name: ${{ steps.meta.outputs.name }}
fail_on_unmatched_files: true fail_on_unmatched_files: false
body: | body: |
Automated daily snapshot generated at 06:00 UTC for ${{ steps.meta.outputs.date }}. Automated daily snapshot generated at 06:00 UTC for ${{ steps.meta.outputs.date }}.
@@ -386,10 +486,14 @@ jobs:
- ${{ steps.meta.outputs.csv_basename_adsb }} - ${{ steps.meta.outputs.csv_basename_adsb }}
- ${{ steps.meta.outputs.csv_basename_community }} - ${{ steps.meta.outputs.csv_basename_community }}
- ${{ steps.meta.outputs.zip_basename }} - ${{ steps.meta.outputs.zip_basename }}
- ${{ steps.meta.outputs.json_basename_adsbx }}
${{ steps.meta.outputs.zip_basename_mictronics && format('- {0}', steps.meta.outputs.zip_basename_mictronics) || '' }}
files: | files: |
${{ steps.meta.outputs.csv_file_faa }} ${{ steps.meta.outputs.csv_file_faa }}
${{ steps.meta.outputs.csv_file_adsb }} ${{ steps.meta.outputs.csv_file_adsb }}
${{ steps.meta.outputs.csv_file_community }} ${{ steps.meta.outputs.csv_file_community }}
${{ steps.meta.outputs.zip_file }} ${{ steps.meta.outputs.zip_file }}
${{ steps.meta.outputs.json_file_adsbx }}
${{ steps.meta.outputs.zip_file_mictronics }}
env: env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
+41 -18
View File
@@ -48,29 +48,52 @@ jobs:
git fetch origin "$branch_name" git fetch origin "$branch_name"
git checkout "$branch_name" git checkout "$branch_name"
# Merge main into PR branch
git config user.name "github-actions[bot]" git config user.name "github-actions[bot]"
git config user.email "github-actions[bot]@users.noreply.github.com" git config user.email "github-actions[bot]@users.noreply.github.com"
if git merge origin/main -m "Merge main to update schema"; then # Get the community submission file(s) and schema from this branch
# Regenerate schema for this PR's submission (adds any new tags) community_files=$(git diff --name-only origin/main...HEAD -- 'community/' 'schemas/')
python -m src.contributions.regenerate_pr_schema || true
if [ -z "$community_files" ]; then
# If there are changes, commit and push echo " No community/schema files found in PR #$pr_number, skipping"
if [ -n "$(git status --porcelain schemas/)" ]; then git checkout main
git add schemas/ continue
git commit -m "Update schema with new tags" fi
git push origin "$branch_name"
echo " Updated PR #$pr_number with schema changes" echo " Files to preserve: $community_files"
else
git push origin "$branch_name" # Save the community files content
echo " Merged main into PR #$pr_number" mkdir -p /tmp/pr_files
for file in $community_files; do
if [ -f "$file" ]; then
mkdir -p "/tmp/pr_files/$(dirname "$file")"
cp "$file" "/tmp/pr_files/$file"
fi fi
done
# Reset branch to main (clean slate)
git reset --hard origin/main
# Restore the community files
for file in $community_files; do
if [ -f "/tmp/pr_files/$file" ]; then
mkdir -p "$(dirname "$file")"
cp "/tmp/pr_files/$file" "$file"
fi
done
rm -rf /tmp/pr_files
# Regenerate schema with current main + this submission's tags
python -m src.contributions.regenerate_pr_schema || true
# Stage and commit all changes
git add community/ schemas/
if ! git diff --cached --quiet; then
git commit -m "Community submission (rebased on main)"
git push --force origin "$branch_name"
echo " Rebased PR #$pr_number onto main"
else else
echo " Merge conflict in PR #$pr_number, adding comment" echo " No changes needed for PR #$pr_number"
gh pr comment "$pr_number" --body $'⚠️ **Merge Conflict**\n\nAnother community submission was merged and this PR has conflicts.\n\nA maintainer may need to:\n1. Close this PR\n2. Remove the `approved` label from the original issue\n3. Re-add the `approved` label to regenerate the PR'
git merge --abort
fi
fi fi
git checkout main git checkout main
+3 -2
View File
@@ -20,7 +20,7 @@ A daily release is created at **06:00 UTC** and includes:
All [FAA registration data](https://www.faa.gov/licenses_certificates/aircraft_certification/aircraft_registry/releasable_aircraft_download) from 2023-08-16 to present (~260 MB) All [FAA registration data](https://www.faa.gov/licenses_certificates/aircraft_certification/aircraft_registry/releasable_aircraft_download) from 2023-08-16 to present (~260 MB)
- **openairframes_adsb.csv** - **openairframes_adsb.csv**
Airframe information derived from ADS-B messages on the [ADSB.lol](https://www.adsb.lol/) network, from 2026-02-12 to present. The airframe information originates from [mictronics aircraft database](https://www.mictronics.de/aircraft-database/) (~5 MB). Airframe information derived from ADS-B messages on the [ADSB.lol](https://www.adsb.lol/) network, from 2026-02-12 to present (will be from 2024-01-01 soon). The airframe information originates from [mictronics aircraft database](https://www.mictronics.de/aircraft-database/) (~5 MB).
- **ReleasableAircraft_{date}.zip** - **ReleasableAircraft_{date}.zip**
A daily snapshot of the FAA database, which updates at **05:30 UTC** A daily snapshot of the FAA database, which updates at **05:30 UTC**
@@ -43,7 +43,8 @@ Please try to follow the submission formatting guidelines. If you are struggling
## For Developers ## For Developers
All code, compute (GitHub Actions), and storage (releases) are in this GitHub repository. Improvements are welcome. Potential features include: All code, compute (GitHub Actions), and storage (releases) are in this GitHub repository. Improvements are welcome. Potential features include:
- Web UI - Web UI for data
- Web UI for contributors
- Additional export formats in the daily release - Additional export formats in the daily release
- Data fusion from multiple sources in the daily release - Data fusion from multiple sources in the daily release
- Automated airframe data connectors, including (but not limited to) civil aviation authorities and airline APIs - Automated airframe data connectors, including (but not limited to) civil aviation authorities and airline APIs
-11
View File
@@ -1,11 +0,0 @@
FROM --platform=linux/arm64 python:3.12-slim
WORKDIR /app
COPY requirements.reducer.txt requirements.txt
RUN pip install --no-cache-dir -r requirements.txt
COPY compress_adsb_to_aircraft_data.py .
COPY reducer.py .
CMD ["python", "-u", "reducer.py"]
-12
View File
@@ -1,12 +0,0 @@
FROM --platform=linux/arm64 python:3.12-slim
WORKDIR /app
COPY requirements.worker.txt requirements.txt
RUN pip install --no-cache-dir -r requirements.txt
COPY compress_adsb_to_aircraft_data.py .
COPY download_adsb_data_to_parquet.py .
COPY worker.py .
CMD ["python", "-u", "worker.py"]
+65 -54
View File
@@ -14,11 +14,12 @@ Usage:
python -m src.adsb.combine_chunks_to_csv --chunks-dir data/output/adsb_chunks --start-date 2024-01-01 --end-date 2024-01-07 --skip-base python -m src.adsb.combine_chunks_to_csv --chunks-dir data/output/adsb_chunks --start-date 2024-01-01 --end-date 2024-01-07 --skip-base
""" """
import gc import gc
import gzip
import os import os
import sys import sys
import glob import glob
import argparse import argparse
from datetime import datetime, timedelta from datetime import datetime, timedelta, timezone
import polars as pl import polars as pl
@@ -33,7 +34,7 @@ os.makedirs(FINAL_OUTPUT_DIR, exist_ok=True)
def get_target_day() -> datetime: def get_target_day() -> datetime:
"""Get yesterday's date (the day we're processing).""" """Get yesterday's date (the day we're processing)."""
return datetime.utcnow() - timedelta(days=1) return datetime.now(timezone.utc) - timedelta(days=1)
def process_single_chunk(chunk_path: str, delete_after_load: bool = False) -> pl.DataFrame: def process_single_chunk(chunk_path: str, delete_after_load: bool = False) -> pl.DataFrame:
@@ -83,58 +84,53 @@ def combine_compressed_chunks(compressed_dfs: list[pl.DataFrame]) -> pl.DataFram
return combined return combined
def download_and_merge_base_release(compressed_df: pl.DataFrame) -> pl.DataFrame: def download_and_merge_base_release(compressed_df: pl.DataFrame) -> tuple[pl.DataFrame, str | None]:
"""Download base release and merge with new data.""" """Download base release and merge with new data.
Returns:
Tuple of (merged_df, earliest_date_str) where earliest_date_str is None if no base release was merged
"""
from src.get_latest_release import download_latest_aircraft_adsb_csv from src.get_latest_release import download_latest_aircraft_adsb_csv
print("Downloading base ADS-B release...") print("Downloading base ADS-B release...")
try: base_path = download_latest_aircraft_adsb_csv(
base_path = download_latest_aircraft_adsb_csv( output_dir="./data/openairframes_base"
output_dir="./data/openairframes_base" )
) print(f"Download returned: {base_path}")
print(f"Download returned: {base_path}")
print(f"Loading base release from {base_path}")
if base_path and os.path.exists(str(base_path)):
print(f"Loading base release from {base_path}") # Extract start date from filename (e.g., openairframes_adsb_2025-05-01_2026-02-14.csv.gz)
base_df = pl.read_csv(base_path) import re
print(f"Base release has {len(base_df)} records") filename = os.path.basename(str(base_path))
match = re.search(r'openairframes_adsb_(\d{4}-\d{2}-\d{2})_', filename)
# Ensure columns match earliest_date = match.group(1) if match else None
base_cols = set(base_df.columns) print(f"Start date from base filename: {earliest_date}")
new_cols = set(compressed_df.columns)
print(f"Base columns: {sorted(base_cols)}") # Read CSV with schema matching the new data
print(f"New columns: {sorted(new_cols)}") base_df = pl.read_csv(base_path, schema=compressed_df.schema)
print(f"Base release has {len(base_df)} records")
# Add missing columns
for col in new_cols - base_cols: # Ensure columns match
base_df = base_df.with_columns(pl.lit(None).alias(col)) base_cols = set(base_df.columns)
for col in base_cols - new_cols: new_cols = set(compressed_df.columns)
compressed_df = compressed_df.with_columns(pl.lit(None).alias(col)) print(f"Base columns: {sorted(base_cols)}")
print(f"New columns: {sorted(new_cols)}")
# Reorder columns to match
compressed_df = compressed_df.select(base_df.columns) # Add missing columns
for col in new_cols - base_cols:
# Concat and deduplicate by icao (keep new data - it comes last) base_df = base_df.with_columns(pl.lit(None).alias(col))
combined = pl.concat([base_df, compressed_df]) for col in base_cols - new_cols:
print(f"After concat: {len(combined)} records") compressed_df = compressed_df.with_columns(pl.lit(None).alias(col))
deduplicated = combined.unique(subset=["icao"], keep="last") # Reorder columns to match
compressed_df = compressed_df.select(base_df.columns)
print(f"Combined with base: {len(combined)} -> {len(deduplicated)} after dedup")
# Concat and deduplicate by icao (keep new data - it comes last)
del base_df, combined combined = pl.concat([base_df, compressed_df])
gc.collect() print(f"After concat: {len(combined)} records")
return deduplicated
else:
print(f"No base release found at {base_path}, using only new data")
return compressed_df
except Exception as e:
import traceback
print(f"Failed to download base release: {e}")
traceback.print_exc()
return compressed_df
return combined, earliest_date
def cleanup_chunks(output_id: str, chunks_dir: str): def cleanup_chunks(output_id: str, chunks_dir: str):
"""Delete chunk parquet files after successful merge.""" """Delete chunk parquet files after successful merge."""
@@ -176,7 +172,7 @@ def main():
if args.start_date and args.end_date: if args.start_date and args.end_date:
# Historical mode # Historical mode
output_id = f"{args.start_date}_{args.end_date}" output_id = f"{args.start_date}_{args.end_date}"
output_filename = f"openairframes_adsb_{args.start_date}_{args.end_date}.csv" output_filename = f"openairframes_adsb_{args.start_date}_{args.end_date}.csv.gz"
print(f"Combining chunks for date range: {args.start_date} to {args.end_date}") print(f"Combining chunks for date range: {args.start_date} to {args.end_date}")
else: else:
# Daily mode - use same date for start and end # Daily mode - use same date for start and end
@@ -187,7 +183,7 @@ def main():
date_str = target_day.strftime("%Y-%m-%d") date_str = target_day.strftime("%Y-%m-%d")
output_id = date_str output_id = date_str
output_filename = f"openairframes_adsb_{date_str}_{date_str}.csv" output_filename = f"openairframes_adsb_{date_str}_{date_str}.csv.gz"
print(f"Combining chunks for {date_str}") print(f"Combining chunks for {date_str}")
chunks_dir = args.chunks_dir chunks_dir = args.chunks_dir
@@ -220,8 +216,15 @@ def main():
print(f"After combining: {get_resource_usage()}") print(f"After combining: {get_resource_usage()}")
# Merge with base release (unless skipped) # Merge with base release (unless skipped)
base_start_date = None
if not args.skip_base: if not args.skip_base:
combined = download_and_merge_base_release(combined) combined, base_start_date = download_and_merge_base_release(combined)
# Update filename if we merged with base release and got a start date
if base_start_date and not (args.start_date and args.end_date):
# Only update filename for daily mode when base was merged
output_filename = f"openairframes_adsb_{base_start_date}_{date_str}.csv.gz"
print(f"Updated filename to reflect date range: {output_filename}")
# Convert list columns to strings for CSV compatibility # Convert list columns to strings for CSV compatibility
for col in combined.columns: for col in combined.columns:
@@ -234,9 +237,17 @@ def main():
if 'time' in combined.columns: if 'time' in combined.columns:
combined = combined.sort('time') combined = combined.sort('time')
# Replace empty strings with null across all string columns to avoid quoted empty strings
for col in combined.columns:
if combined[col].dtype == pl.Utf8:
combined = combined.with_columns(
pl.when(pl.col(col) == "").then(None).otherwise(pl.col(col)).alias(col)
)
# Write final CSV # Write final CSV
output_path = os.path.join(FINAL_OUTPUT_DIR, output_filename) output_path = os.path.join(FINAL_OUTPUT_DIR, output_filename)
combined.write_csv(output_path) with gzip.open(output_path, "wb", compresslevel=9) as f:
combined.write_csv(f, null_value='', quote_style='necessary')
print(f"Wrote {len(combined)} records to {output_path}") print(f"Wrote {len(combined)} records to {output_path}")
# Cleanup # Cleanup
+1 -1
View File
@@ -264,7 +264,7 @@ def get_latest_aircraft_adsb_csv_df():
if df[col].dtype == pl.Utf8: if df[col].dtype == pl.Utf8:
df = df.with_columns(pl.col(col).fill_null("")) df = df.with_columns(pl.col(col).fill_null(""))
# Extract start date from filename pattern: openairframes_adsb_{start_date}_{end_date}.csv # Extract start date from filename pattern: openairframes_adsb_{start_date}_{end_date}.csv[.gz]
match = re.search(r"openairframes_adsb_(\d{4}-\d{2}-\d{2})_", str(csv_path)) match = re.search(r"openairframes_adsb_(\d{4}-\d{2}-\d{2})_", str(csv_path))
if not match: if not match:
raise ValueError(f"Could not extract date from filename: {csv_path.name}") raise ValueError(f"Could not extract date from filename: {csv_path.name}")
+28 -7
View File
@@ -76,14 +76,10 @@ def timeout_handler(signum, frame):
raise DownloadTimeoutException("Download timed out after 40 seconds") raise DownloadTimeoutException("Download timed out after 40 seconds")
def fetch_releases(version_date: str) -> list: def _fetch_releases_from_repo(year: str, version_date: str) -> list:
"""Fetch GitHub releases for a given version date from adsblol.""" """Fetch GitHub releases for a given version date from a specific year's adsblol repo."""
year = version_date.split('.')[0][1:]
if version_date == "v2024.12.31":
year = "2025"
BASE_URL = f"https://api.github.com/repos/adsblol/globe_history_{year}/releases" BASE_URL = f"https://api.github.com/repos/adsblol/globe_history_{year}/releases"
# Match exact release name, exclude tmp releases PATTERN = rf"^{re.escape(version_date)}-planes-readsb-prod-\d+(tmp)?$"
PATTERN = rf"^{re.escape(version_date)}-planes-readsb-prod-\d+$"
releases = [] releases = []
page = 1 page = 1
@@ -123,6 +119,25 @@ def fetch_releases(version_date: str) -> list:
return releases return releases
def fetch_releases(version_date: str) -> list:
    """Return the adsblol GitHub releases matching ``version_date``.

    The repository year is taken from the text before the first ``.`` in
    ``version_date`` with its leading character dropped (``"v2024.12.31"``
    gives ``"2024"``).  When that repository yields nothing and the version
    date ends in ``.12.31``, the following year's repository is queried as
    well, because Dec 31 data is sometimes published there.
    """
    year_tag, _, _ = version_date.partition('.')
    year = year_tag[1:]

    found = _fetch_releases_from_repo(year, version_date)
    if found or not version_date.endswith(".12.31"):
        return found

    # Last day of the year with nothing in its own repo: try the next one.
    following_year = str(int(year) + 1)
    print(f"No releases found for {version_date} in {year} repo, checking {following_year} repo...")
    return _fetch_releases_from_repo(following_year, version_date)
def download_asset(asset_url: str, file_path: str) -> bool: def download_asset(asset_url: str, file_path: str) -> bool:
"""Download a single release asset.""" """Download a single release asset."""
os.makedirs(os.path.dirname(file_path) or OUTPUT_DIR, exist_ok=True) os.makedirs(os.path.dirname(file_path) or OUTPUT_DIR, exist_ok=True)
@@ -582,6 +597,12 @@ def process_version_date(version_date: str, keep_folders: bool = False):
print(f"No releases found for {vd}.") print(f"No releases found for {vd}.")
return None return None
# Prefer non-tmp releases; only use tmp if no normal releases exist
normal_releases = [r for r in releases if "tmp" not in r["tag_name"]]
tmp_releases = [r for r in releases if "tmp" in r["tag_name"]]
releases = normal_releases if normal_releases else tmp_releases
print(f"Using {'normal' if normal_releases else 'tmp'} releases ({len(releases)} found)")
downloaded_files = [] downloaded_files = []
for release in releases: for release in releases:
tag_name = release["tag_name"] tag_name = release["tag_name"]
+6
View File
@@ -59,6 +59,12 @@ def download_and_extract(version_date: str) -> str | None:
print(f"No releases found for {version_date}") print(f"No releases found for {version_date}")
return None return None
# Prefer non-tmp releases; only use tmp if no normal releases exist
normal_releases = [r for r in releases if "tmp" not in r["tag_name"]]
tmp_releases = [r for r in releases if "tmp" in r["tag_name"]]
releases = normal_releases if normal_releases else tmp_releases
print(f"Using {'normal' if normal_releases else 'tmp'} releases ({len(releases)} found)")
downloaded_files = [] downloaded_files = []
for release in releases: for release in releases:
tag_name = release["tag_name"] tag_name = release["tag_name"]
-97
View File
@@ -1,97 +0,0 @@
"""
Reduce step: downloads all chunk CSVs from S3, combines them,
deduplicates across the full dataset, and uploads the final result.
Environment variables:
S3_BUCKET — bucket with intermediate results
RUN_ID — run identifier matching the map workers
GLOBAL_START_DATE — overall start date for output filename
GLOBAL_END_DATE — overall end date for output filename
"""
import gzip
import os
import shutil
from pathlib import Path
import boto3
import polars as pl
from compress_adsb_to_aircraft_data import COLUMNS, deduplicate_by_signature
def main():
    """Combine every chunk CSV of one run into a single deduplicated upload.

    Reads ``S3_BUCKET``, ``RUN_ID`` (defaults to ``"default"``),
    ``GLOBAL_START_DATE`` and ``GLOBAL_END_DATE`` from the environment, lists
    the ``.csv.gz`` objects under ``intermediate/{run_id}/``, loads each one,
    concatenates them, runs ``deduplicate_by_signature`` on the result and
    uploads it gzip-compressed to ``final/``.  Returns early when the run has
    no chunks.
    """
    bucket = os.environ["S3_BUCKET"]
    run_id = os.environ.get("RUN_ID", "default")
    global_start = os.environ["GLOBAL_START_DATE"]
    global_end = os.environ["GLOBAL_END_DATE"]

    s3 = boto3.client("s3")
    prefix = f"intermediate/{run_id}/"

    # Collect the keys of all compressed chunk files belonging to this run.
    pages = s3.get_paginator("list_objects_v2").paginate(Bucket=bucket, Prefix=prefix)
    chunk_keys = sorted(
        entry["Key"]
        for page in pages
        for entry in page.get("Contents", [])
        if entry["Key"].endswith(".csv.gz")
    )
    print(f"Found {len(chunk_keys)} chunks to combine")

    if not chunk_keys:
        print("No chunks found — nothing to reduce.")
        return

    # Fetch each chunk, inflate it next to the archive and load it.
    work_dir = Path("/tmp/chunks")
    work_dir.mkdir(parents=True, exist_ok=True)

    frames = []
    for key in chunk_keys:
        archive = work_dir / Path(key).name
        plain = archive.with_suffix("")  # same name without the trailing .gz
        print(f"Downloading {key}...")
        s3.download_file(bucket, key, str(archive))

        with gzip.open(archive, 'rb') as packed, open(plain, 'wb') as unpacked:
            shutil.copyfileobj(packed, unpacked)
        archive.unlink()

        frame = pl.read_csv(plain)
        print(f" Loaded {frame.height} rows from {plain.name}")
        frames.append(frame)

        # The CSV is no longer needed once it is in memory.
        plain.unlink()

    if frames:
        df_accumulated = pl.concat(frames)
    else:
        df_accumulated = pl.DataFrame()
    print(f"Combined: {df_accumulated.height} rows before dedup")

    # Deduplicate across the whole combined dataset.
    df_accumulated = deduplicate_by_signature(df_accumulated)
    print(f"After dedup: {df_accumulated.height} rows")

    # Write the CSV, compress it, then upload the compressed copy.
    output_name = f"openairframes_adsb_{global_start}_{global_end}.csv.gz"
    csv_output = Path(f"/tmp/openairframes_adsb_{global_start}_{global_end}.csv")
    gz_output = Path(f"/tmp/{output_name}")

    df_accumulated.write_csv(csv_output)
    with open(csv_output, 'rb') as raw, gzip.open(gz_output, 'wb') as packed:
        shutil.copyfileobj(raw, packed)
    csv_output.unlink()

    final_key = f"final/{output_name}"
    print(f"Uploading to s3://{bucket}/{final_key}")
    s3.upload_file(str(gz_output), bucket, final_key)
    print(f"Final output: {df_accumulated.height} records -> {final_key}")


if __name__ == "__main__":
    main()
+155
View File
@@ -0,0 +1,155 @@
#!/usr/bin/env python3
"""
Run the full ADS-B processing pipeline locally.
Downloads adsb.lol data, processes trace files, and outputs openairframes_adsb CSV.
Usage:
# Single day (yesterday by default)
python -m src.adsb.run_local
# Single day (specific date)
python -m src.adsb.run_local 2024-01-15
# Date range (inclusive)
python -m src.adsb.run_local 2024-01-01 2024-01-07
"""
import argparse
import os
import subprocess
import sys
from datetime import datetime, timedelta
def run_cmd(cmd: list[str], description: str) -> None:
    """Echo ``cmd``, execute it, and terminate the process if it fails.

    The command is printed before it runs.  On a non-zero exit status an
    error line naming ``description`` is printed and the interpreter exits
    with that same status; on success the function simply returns.
    """
    command_line = ' '.join(cmd)
    print(f"\n>>> {command_line}")

    exit_code = subprocess.run(cmd).returncode
    if exit_code == 0:
        return

    print(f"ERROR: {description} failed with exit code {exit_code}")
    sys.exit(exit_code)
def main():
    """Run the ADS-B pipeline locally for one day or an inclusive date range.

    Parses the command line and then runs three steps as child processes,
    stopping at the first failure (``run_cmd`` exits on a non-zero status):

    1. ``src.adsb.download_and_list_icaos``
    2. ``src.adsb.process_icao_chunk``, once per chunk id in ``range(--chunks)``
    3. ``src.adsb.combine_chunks_to_csv`` (with ``--skip-base`` if requested)

    Without a start date, yesterday (UTC) is processed.  Invalid dates, an end
    date before the start date, or ``--chunks`` below 1 end the program with an
    argparse usage error.  After the last step the expected output file is
    looked up under ``./data/openairframes`` and its size is printed.
    """
    # Only `datetime` and `timedelta` are imported at file level.
    from datetime import timezone

    parser = argparse.ArgumentParser(
        description="Run full ADS-B processing pipeline locally",
        usage="python -m src.adsb.run_local [start_date] [end_date]"
    )
    parser.add_argument(
        "start_date",
        nargs="?",
        help="Start date (YYYY-MM-DD). Default: yesterday"
    )
    parser.add_argument(
        "end_date",
        nargs="?",
        help="End date (YYYY-MM-DD, inclusive). If omitted, processes single day"
    )
    parser.add_argument(
        "--chunks",
        type=int,
        default=4,
        help="Number of parallel chunks (default: 4)"
    )
    parser.add_argument(
        "--skip-base",
        action="store_true",
        help="Skip downloading and merging with base release"
    )
    args = parser.parse_args()

    # With zero chunks nothing would be processed, yet the combine step
    # would still run; reject that up front.
    if args.chunks < 1:
        parser.error("--chunks must be at least 1")

    def parse_day(raw: str, label: str) -> datetime:
        """Parse a YYYY-MM-DD argument or stop with a usage error."""
        try:
            return datetime.strptime(raw, "%Y-%m-%d")
        except ValueError:
            parser.error(f"{label} must be in YYYY-MM-DD format, got {raw!r}")

    # Determine dates
    if args.start_date:
        start_date = parse_day(args.start_date, "start_date")
    else:
        # datetime.utcnow() is deprecated; use an aware UTC "now" instead.
        start_date = datetime.now(timezone.utc) - timedelta(days=1)

    end_date = parse_day(args.end_date, "end_date") if args.end_date else None
    # end_date can only be given together with start_date (both positional),
    # so both values are naive datetimes here and can be compared directly.
    if end_date is not None and end_date < start_date:
        parser.error("end_date must not be earlier than start_date")

    start_str = start_date.strftime("%Y-%m-%d")
    end_str = end_date.strftime("%Y-%m-%d") if end_date else None

    # Every step takes either a date range or a single date.
    if end_str:
        date_args = ["--start-date", start_str, "--end-date", end_str]
    else:
        date_args = ["--date", start_str]

    # Launch the steps with the interpreter running this script, so they see
    # the same environment even when "python" on PATH points elsewhere.
    python = sys.executable or "python"

    print("=" * 60)
    print("ADS-B Processing Pipeline")
    print("=" * 60)
    if end_str:
        print(f"Date range: {start_str} to {end_str}")
    else:
        print(f"Date: {start_str}")
    print(f"Chunks: {args.chunks}")
    print("=" * 60)

    # Step 1: Download and extract
    print("\n" + "=" * 60)
    print("Step 1: Download and Extract")
    print("=" * 60)
    cmd = [python, "-m", "src.adsb.download_and_list_icaos", *date_args]
    run_cmd(cmd, "Download and extract")

    # Step 2: Process chunks
    print("\n" + "=" * 60)
    print("Step 2: Process Chunks")
    print("=" * 60)
    for chunk_id in range(args.chunks):
        print(f"\n--- Chunk {chunk_id + 1}/{args.chunks} ---")
        cmd = [python, "-m", "src.adsb.process_icao_chunk",
               "--chunk-id", str(chunk_id),
               "--total-chunks", str(args.chunks),
               *date_args]
        run_cmd(cmd, f"Process chunk {chunk_id}")

    # Step 3: Combine chunks to CSV
    print("\n" + "=" * 60)
    print("Step 3: Combine to CSV")
    print("=" * 60)
    chunks_dir = "./data/output/adsb_chunks"
    cmd = [python, "-m", "src.adsb.combine_chunks_to_csv",
           "--chunks-dir", chunks_dir,
           *date_args]
    if args.skip_base:
        cmd.append("--skip-base")
    run_cmd(cmd, "Combine chunks")

    print("\n" + "=" * 60)
    print("Done!")
    print("=" * 60)

    # Show output.  The combine step may write gzip-compressed output
    # (.csv.gz), so look for that name first and fall back to plain .csv.
    output_dir = "./data/openairframes"
    base_name = f"openairframes_adsb_{start_str}_{end_str or start_str}.csv"
    for candidate in (base_name + ".gz", base_name):
        output_path = os.path.join(output_dir, candidate)
        if os.path.exists(output_path):
            size_mb = os.path.getsize(output_path) / (1024 * 1024)
            print(f"Output: {output_path}")
            print(f"Size: {size_mb:.1f} MB")
            break
    else:
        print(f"Output file not found under {output_dir} "
              f"(looked for {base_name}.gz and {base_name})")


if __name__ == "__main__":
    main()
-89
View File
@@ -1,89 +0,0 @@
"""
Map worker: processes a date range chunk, uploads result to S3.
Environment variables:
START_DATE — inclusive, YYYY-MM-DD
END_DATE — exclusive, YYYY-MM-DD
S3_BUCKET — bucket for intermediate results
RUN_ID — unique run identifier for namespacing S3 keys
"""
import os
import sys
from datetime import datetime, timedelta
from pathlib import Path
import boto3
import polars as pl
from compress_adsb_to_aircraft_data import (
load_historical_for_day,
deduplicate_by_signature,
COLUMNS,
)
def main():
    """Process one date-range chunk and upload the result to S3.

    Reads ``START_DATE`` (inclusive), ``END_DATE`` (exclusive), ``S3_BUCKET``
    and ``RUN_ID`` (defaults to ``"default"``) from the environment.  Each day
    is loaded with ``load_historical_for_day``; a day without rows raises
    ``RuntimeError``.  The days are concatenated, passed through
    ``deduplicate_by_signature``, written as CSV, gzip-compressed and uploaded
    under ``intermediate/{run_id}/``.
    """
    import gzip
    import shutil

    start_date_str = os.environ["START_DATE"]
    end_date_str = os.environ["END_DATE"]
    s3_bucket = os.environ["S3_BUCKET"]
    run_id = os.environ.get("RUN_ID", "default")

    first_day = datetime.strptime(start_date_str, "%Y-%m-%d")
    stop_day = datetime.strptime(end_date_str, "%Y-%m-%d")
    total_days = (stop_day - first_day).days
    print(f"Worker: processing {total_days} days [{start_date_str}, {end_date_str})")

    daily_frames = []
    rows_so_far = 0
    cache_dir = Path("data/adsb")
    # Both bounds are whole days, so stepping by day offset visits exactly
    # the days in [first_day, stop_day); a non-positive span visits none.
    for offset in range(total_days):
        day = first_day + timedelta(days=offset)
        day_str = day.strftime("%Y-%m-%d")
        print(f" Loading {day_str}...")

        day_frame = load_historical_for_day(day)
        if day_frame.height == 0:
            raise RuntimeError(f"No data found for {day_str}")

        daily_frames.append(day_frame)
        rows_so_far += day_frame.height
        print(f" +{day_frame.height} rows (total: {rows_so_far})")

        # Drop the local cache after every day to save disk in the container.
        if cache_dir.exists():
            shutil.rmtree(cache_dir)

    # Concatenate all days, then deduplicate within this chunk.
    if daily_frames:
        df_accumulated = pl.concat(daily_frames)
    else:
        df_accumulated = pl.DataFrame()
    df_accumulated = deduplicate_by_signature(df_accumulated)
    print(f"After dedup: {df_accumulated.height} rows")

    # Write the CSV locally, compress it, and discard the uncompressed copy.
    local_path = Path(f"/tmp/chunk_{start_date_str}_{end_date_str}.csv")
    df_accumulated.write_csv(local_path)

    gz_path = Path(f"/tmp/chunk_{start_date_str}_{end_date_str}.csv.gz")
    with open(local_path, 'rb') as raw, gzip.open(gz_path, 'wb') as packed:
        shutil.copyfileobj(raw, packed)
    local_path.unlink()

    s3_key = f"intermediate/{run_id}/chunk_{start_date_str}_{end_date_str}.csv.gz"
    print(f"Uploading to s3://{s3_bucket}/{s3_key}")
    boto3.client("s3").upload_file(str(gz_path), s3_bucket, s3_key)
    print("Done.")


if __name__ == "__main__":
    main()
@@ -0,0 +1,40 @@
#!/usr/bin/env python3
"""
Download ADS-B Exchange basic-ac-db.json.gz.
Usage:
python -m src.contributions.create_daily_adsbexchange_release [--date YYYY-MM-DD]
"""
from __future__ import annotations
import argparse
import shutil
from datetime import datetime, timezone
from pathlib import Path
from urllib.request import Request, urlopen
URL = "https://downloads.adsbexchange.com/downloads/basic-ac-db.json.gz"
OUT_ROOT = Path("data/openairframes")
def main() -> None:
    """Download the ADS-B Exchange database and store it under a dated name.

    The file is saved as ``basic-ac-db_{date}.json.gz`` inside ``OUT_ROOT``,
    where ``date`` is the ``--date`` argument or, if omitted, today's date in
    UTC.  The response is streamed into a ``.part`` file that is renamed onto
    the final name only after the transfer has finished, so an interrupted
    download never leaves a truncated file under the release name.  Network
    errors propagate to the caller.
    """
    parser = argparse.ArgumentParser(description="Create daily ADS-B Exchange JSON release")
    parser.add_argument("--date", type=str, help="Date to process (YYYY-MM-DD format, default: today UTC)")
    args = parser.parse_args()

    date_str = args.date or datetime.now(timezone.utc).strftime("%Y-%m-%d")

    OUT_ROOT.mkdir(parents=True, exist_ok=True)
    gz_path = OUT_ROOT / f"basic-ac-db_{date_str}.json.gz"
    part_path = gz_path.with_name(gz_path.name + ".part")

    print(f"Downloading {URL}...")
    req = Request(URL, headers={"User-Agent": "openairframes-downloader/1.0"}, method="GET")
    try:
        with urlopen(req, timeout=300) as r, part_path.open("wb") as f:
            shutil.copyfileobj(r, f)
        # Same directory, so the rename replaces the target in one step.
        part_path.replace(gz_path)
    finally:
        # No-op after a successful rename; removes the partial file on failure.
        part_path.unlink(missing_ok=True)

    print(f"Wrote: {gz_path}")


if __name__ == "__main__":
    main()
@@ -0,0 +1,55 @@
#!/usr/bin/env python3
"""
Download Mictronics aircraft database zip.
Usage:
python -m src.contributions.create_daily_microtonics_release [--date YYYY-MM-DD]
"""
from __future__ import annotations
import argparse
import shutil
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from urllib.error import URLError
from urllib.request import Request, urlopen
URL = "https://www.mictronics.de/aircraft-database/indexedDB_old.php"
OUT_ROOT = Path("data/openairframes")
MAX_RETRIES = 3
RETRY_DELAY = 30 # seconds
def main() -> None:
    """Download the Mictronics aircraft database zip, retrying on failure.

    The file is saved as ``mictronics-db_{date}.zip`` inside ``OUT_ROOT``,
    where ``date`` is the ``--date`` argument or, if omitted, today's date in
    UTC.  Up to ``MAX_RETRIES`` attempts are made, ``RETRY_DELAY`` seconds
    apart; when all of them fail the process exits with status 1.

    Each attempt streams into a ``.part`` file that is renamed onto the final
    name only after the transfer has finished, so a failed attempt never
    leaves a truncated zip under the release name.
    """
    parser = argparse.ArgumentParser(description="Create daily Mictronics database release")
    parser.add_argument("--date", type=str, help="Date to process (YYYY-MM-DD format, default: today UTC)")
    args = parser.parse_args()

    date_str = args.date or datetime.now(timezone.utc).strftime("%Y-%m-%d")

    OUT_ROOT.mkdir(parents=True, exist_ok=True)
    zip_path = OUT_ROOT / f"mictronics-db_{date_str}.zip"
    part_path = zip_path.with_name(zip_path.name + ".part")

    for attempt in range(1, MAX_RETRIES + 1):
        try:
            print(f"Downloading {URL} (attempt {attempt}/{MAX_RETRIES})...")
            req = Request(URL, headers={"User-Agent": "Mozilla/5.0 (compatible; openairframes-downloader/1.0)"}, method="GET")
            with urlopen(req, timeout=120) as r, part_path.open("wb") as f:
                shutil.copyfileobj(r, f)
            # Same directory, so the rename replaces the target in one step.
            part_path.replace(zip_path)
            print(f"Wrote: {zip_path}")
            return
        except (URLError, TimeoutError, ConnectionError) as e:
            # ConnectionError also covers resets that happen mid-stream,
            # after urlopen() itself has already succeeded.
            print(f"Attempt {attempt} failed: {e}")
            if attempt < MAX_RETRIES:
                print(f"Retrying in {RETRY_DELAY} seconds...")
                time.sleep(RETRY_DELAY)
            else:
                print("All retries exhausted. Mictronics download failed.")
                sys.exit(1)
        finally:
            # No-op after a successful rename; removes the partial file otherwise.
            part_path.unlink(missing_ok=True)


if __name__ == "__main__":
    main()
+1 -1
View File
@@ -77,7 +77,7 @@ if __name__ == '__main__':
OUT_ROOT = Path("data/openairframes") OUT_ROOT = Path("data/openairframes")
OUT_ROOT.mkdir(parents=True, exist_ok=True) OUT_ROOT.mkdir(parents=True, exist_ok=True)
output_file = OUT_ROOT / f"openairframes_adsb_{start_date_str}_{date_str}.csv" output_file = OUT_ROOT / f"openairframes_adsb_{start_date_str}_{date_str}.csv.gz"
df_combined.write_csv(output_file) df_combined.write_csv(output_file)
print(f"Saved: {output_file}") print(f"Saved: {output_file}")
+5 -2
View File
@@ -47,6 +47,9 @@ def convert_faa_master_txt_to_df(zip_path: Path, date: str):
# Convert all NaN to empty strings # Convert all NaN to empty strings
df = df.fillna("") df = df.fillna("")
# The FAA parser can produce the literal string "None" for missing values;
# replace those so they match the empty-string convention used everywhere else.
df = df.replace("None", "")
return df return df
@@ -84,8 +87,8 @@ def concat_faa_historical_df(df_base, df_new):
# Convert to string # Convert to string
val_str = str(val).strip() val_str = str(val).strip()
# Handle empty strings # Handle empty strings and null-like literals
if val_str == "" or val_str == "nan": if val_str == "" or val_str == "nan" or val_str == "None":
return "" return ""
# Check if it looks like a list representation (starts with [ ) # Check if it looks like a list representation (starts with [ )
+4 -2
View File
@@ -119,6 +119,7 @@ def download_latest_aircraft_csv(
Returns: Returns:
Path to the downloaded file Path to the downloaded file
""" """
output_dir = Path(output_dir)
assets = get_latest_release_assets(repo, github_token=github_token) assets = get_latest_release_assets(repo, github_token=github_token)
try: try:
asset = pick_asset(assets, name_regex=r"^openairframes_faa_.*\.csv$") asset = pick_asset(assets, name_regex=r"^openairframes_faa_.*\.csv$")
@@ -164,8 +165,9 @@ def download_latest_aircraft_adsb_csv(
Returns: Returns:
Path to the downloaded file Path to the downloaded file
""" """
output_dir = Path(output_dir)
assets = get_latest_release_assets(repo, github_token=github_token) assets = get_latest_release_assets(repo, github_token=github_token)
asset = pick_asset(assets, name_regex=r"^openairframes_adsb_.*\.csv$") asset = pick_asset(assets, name_regex=r"^openairframes_adsb_.*\.csv(\.gz)?$")
saved_to = download_asset(asset, output_dir / asset.name, github_token=github_token) saved_to = download_asset(asset, output_dir / asset.name, github_token=github_token)
print(f"Downloaded: {asset.name} ({asset.size} bytes) -> {saved_to}") print(f"Downloaded: {asset.name} ({asset.size} bytes) -> {saved_to}")
return saved_to return saved_to
@@ -176,7 +178,7 @@ def get_latest_aircraft_adsb_csv_df():
import pandas as pd import pandas as pd
df = pd.read_csv(csv_path) df = pd.read_csv(csv_path)
df = df.fillna("") df = df.fillna("")
# Extract start date from filename pattern: openairframes_adsb_{start_date}_{end_date}.csv # Extract start date from filename pattern: openairframes_adsb_{start_date}_{end_date}.csv[.gz]
match = re.search(r"openairframes_adsb_(\d{4}-\d{2}-\d{2})_", str(csv_path)) match = re.search(r"openairframes_adsb_(\d{4}-\d{2}-\d{2})_", str(csv_path))
if not match: if not match:
raise ValueError(f"Could not extract date from filename: {csv_path.name}") raise ValueError(f"Could not extract date from filename: {csv_path.name}")