mirror of
https://github.com/PlaneQuery/OpenAirframes.git
synced 2026-06-10 23:17:47 +02:00
Compare commits
26 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| a8b2b66952 | |||
| 3f38263a0c | |||
| 1a553d5f44 | |||
| 405855c566 | |||
| 4e81dde201 | |||
| fde8ef029c | |||
| 18ab51bd60 | |||
| 83b9d2a76d | |||
| 8874619ab0 | |||
| 823f291728 | |||
| 982011b36f | |||
| 1b15e43669 | |||
| f17adc4574 | |||
| 6a250a63fb | |||
| 9e24fcbc63 | |||
| 8ce04f1f83 | |||
| 9441761ac9 | |||
| ccf55b2308 | |||
| 76eaf118ef | |||
| 0fcbad0fbc | |||
| 0c7484e7bf | |||
| 8c60ac611d | |||
| 145f1006be | |||
| f5465f0552 | |||
| 17098ae39a | |||
| 6f6b65780a |
@@ -8,7 +8,7 @@ body:
|
|||||||
- type: markdown
|
- type: markdown
|
||||||
attributes:
|
attributes:
|
||||||
value: |
|
value: |
|
||||||
Submit **one object** or an **array of objects** that matches the community submission schema.
|
Submit **one object** or an **array of objects** that matches the community submission [schema](https://github.com/PlaneQuery/OpenAirframes/blob/main/schemas/community_submission.v1.schema.json). Reuse existing tags from the schema when possible.
|
||||||
|
|
||||||
**Rules (enforced on review/automation):**
|
**Rules (enforced on review/automation):**
|
||||||
- Each object must include **at least one** of:
|
- Each object must include **at least one** of:
|
||||||
@@ -27,7 +27,7 @@ body:
|
|||||||
```json
|
```json
|
||||||
{
|
{
|
||||||
"registration_number": "N12345",
|
"registration_number": "N12345",
|
||||||
"tags": {"owner": "John Doe"},
|
"tags": {"owner": "John Doe", "photo": "https://example.com/photo.jpg"},
|
||||||
"start_date": "2025-01-01"
|
"start_date": "2025-01-01"
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|||||||
@@ -95,20 +95,27 @@ jobs:
|
|||||||
# Verify tar integrity
|
# Verify tar integrity
|
||||||
tar -tf extracted_data.tar > /dev/null && echo "Tar integrity check passed" || { echo "Tar integrity check FAILED"; exit 1; }
|
tar -tf extracted_data.tar > /dev/null && echo "Tar integrity check passed" || { echo "Tar integrity check FAILED"; exit 1; }
|
||||||
|
|
||||||
# Create checksum of the FULL tar before splitting (for verification after reassembly)
|
# Record tar size and checksum for verification after reassembly
|
||||||
echo "=== Creating checksum of full tar ==="
|
echo "=== Recording tar metadata ==="
|
||||||
sha256sum extracted_data.tar > full_tar.sha256
|
ORIGINAL_SIZE=$(stat --format=%s extracted_data.tar)
|
||||||
cat full_tar.sha256
|
ORIGINAL_SHA=$(sha256sum extracted_data.tar | awk '{print $1}')
|
||||||
|
echo "Size: $ORIGINAL_SIZE"
|
||||||
|
echo "SHA256: $ORIGINAL_SHA"
|
||||||
|
|
||||||
# Split into 500MB chunks to avoid artifact upload issues
|
# Split into 500MB chunks to avoid artifact upload issues
|
||||||
echo "=== Splitting tar into 500MB chunks ==="
|
echo "=== Splitting tar into 500MB chunks ==="
|
||||||
mkdir -p tar_chunks
|
mkdir -p tar_chunks
|
||||||
split -b 500M extracted_data.tar tar_chunks/extracted_data.tar.part_
|
split -b 500M extracted_data.tar tar_chunks/extracted_data.tar.part_
|
||||||
rm extracted_data.tar
|
rm extracted_data.tar
|
||||||
mv full_tar.sha256 tar_chunks/
|
|
||||||
|
# Write metadata file (plain text so artifact upload won't skip it)
|
||||||
|
echo "$ORIGINAL_SHA extracted_data.tar" > tar_chunks/checksum.txt
|
||||||
|
echo "$ORIGINAL_SIZE" >> tar_chunks/checksum.txt
|
||||||
|
|
||||||
echo "=== Chunks created ==="
|
echo "=== Chunks created ==="
|
||||||
ls -lah tar_chunks/
|
ls -lah tar_chunks/
|
||||||
|
echo "=== Checksum file ==="
|
||||||
|
cat tar_chunks/checksum.txt
|
||||||
else
|
else
|
||||||
echo "ERROR: No extracted directories found, cannot create tar"
|
echo "ERROR: No extracted directories found, cannot create tar"
|
||||||
exit 1
|
exit 1
|
||||||
@@ -179,19 +186,30 @@ jobs:
|
|||||||
echo "=== Reassembled tar file info ==="
|
echo "=== Reassembled tar file info ==="
|
||||||
ls -lah extracted_data.tar
|
ls -lah extracted_data.tar
|
||||||
|
|
||||||
# Verify checksum of reassembled tar matches original
|
# Verify integrity
|
||||||
echo "=== Verifying reassembled tar checksum ==="
|
echo "=== Verifying reassembled tar ==="
|
||||||
echo "Original checksum:"
|
if [ -f tar_chunks/checksum.txt ]; then
|
||||||
cat tar_chunks/full_tar.sha256
|
EXPECTED_SHA=$(head -1 tar_chunks/checksum.txt | awk '{print $1}')
|
||||||
echo "Reassembled checksum:"
|
EXPECTED_SIZE=$(sed -n '2p' tar_chunks/checksum.txt)
|
||||||
sha256sum extracted_data.tar
|
ACTUAL_SHA=$(sha256sum extracted_data.tar | awk '{print $1}')
|
||||||
sha256sum -c tar_chunks/full_tar.sha256 || { echo "ERROR: Reassembled tar checksum mismatch - data corrupted during transfer"; exit 1; }
|
ACTUAL_SIZE=$(stat --format=%s extracted_data.tar)
|
||||||
echo "Checksum verified - data integrity confirmed"
|
echo "Expected: SHA=$EXPECTED_SHA Size=$EXPECTED_SIZE"
|
||||||
|
echo "Actual: SHA=$ACTUAL_SHA Size=$ACTUAL_SIZE"
|
||||||
|
if [ "$EXPECTED_SHA" != "$ACTUAL_SHA" ] || [ "$EXPECTED_SIZE" != "$ACTUAL_SIZE" ]; then
|
||||||
|
echo "ERROR: Reassembled tar does not match original - data corrupted during transfer"
|
||||||
|
exit 1
|
||||||
|
fi
|
||||||
|
echo "Checksum and size verified"
|
||||||
|
else
|
||||||
|
echo "WARNING: No checksum file found, falling back to tar integrity check"
|
||||||
|
tar -tf extracted_data.tar > /dev/null || { echo "ERROR: Tar file is corrupted"; exit 1; }
|
||||||
|
echo "Tar integrity check passed"
|
||||||
|
fi
|
||||||
|
|
||||||
rm -rf tar_chunks
|
rm -rf tar_chunks
|
||||||
|
|
||||||
echo "=== Extracting ==="
|
echo "=== Extracting ==="
|
||||||
tar -xvf extracted_data.tar
|
tar -xf extracted_data.tar
|
||||||
rm extracted_data.tar
|
rm extracted_data.tar
|
||||||
echo "has_data=true" >> "$GITHUB_OUTPUT"
|
echo "has_data=true" >> "$GITHUB_OUTPUT"
|
||||||
echo "=== Contents of data/output ==="
|
echo "=== Contents of data/output ==="
|
||||||
@@ -264,5 +282,5 @@ jobs:
|
|||||||
uses: actions/upload-artifact@v4
|
uses: actions/upload-artifact@v4
|
||||||
with:
|
with:
|
||||||
name: openairframes_adsb-${{ needs.generate-matrix.outputs.global_start }}-${{ needs.generate-matrix.outputs.global_end }}
|
name: openairframes_adsb-${{ needs.generate-matrix.outputs.global_start }}-${{ needs.generate-matrix.outputs.global_end }}
|
||||||
path: data/openairframes/*.csv
|
path: data/openairframes/*.csv.gz
|
||||||
retention-days: 30
|
retention-days: 30
|
||||||
|
|||||||
@@ -227,7 +227,7 @@ jobs:
|
|||||||
uses: actions/upload-artifact@v4
|
uses: actions/upload-artifact@v4
|
||||||
with:
|
with:
|
||||||
name: adsb-release
|
name: adsb-release
|
||||||
path: data/openairframes/openairframes_adsb_*.csv
|
path: data/openairframes/openairframes_adsb_*.csv.gz
|
||||||
retention-days: 1
|
retention-days: 1
|
||||||
|
|
||||||
build-community:
|
build-community:
|
||||||
@@ -261,10 +261,64 @@ jobs:
|
|||||||
path: data/openairframes/openairframes_community_*.csv
|
path: data/openairframes/openairframes_community_*.csv
|
||||||
retention-days: 1
|
retention-days: 1
|
||||||
|
|
||||||
|
build-adsbexchange-json:
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
if: github.event_name != 'schedule'
|
||||||
|
steps:
|
||||||
|
- name: Checkout
|
||||||
|
uses: actions/checkout@v6
|
||||||
|
with:
|
||||||
|
fetch-depth: 0
|
||||||
|
|
||||||
|
- name: Setup Python
|
||||||
|
uses: actions/setup-python@v6
|
||||||
|
with:
|
||||||
|
python-version: "3.14"
|
||||||
|
|
||||||
|
- name: Run ADS-B Exchange JSON release script
|
||||||
|
run: |
|
||||||
|
python -m src.contributions.create_daily_adsbexchange_release ${{ inputs.date && format('--date {0}', inputs.date) || '' }}
|
||||||
|
ls -lah data/openairframes
|
||||||
|
|
||||||
|
- name: Upload ADS-B Exchange JSON artifact
|
||||||
|
uses: actions/upload-artifact@v4
|
||||||
|
with:
|
||||||
|
name: adsbexchange-json
|
||||||
|
path: data/openairframes/basic-ac-db_*.json.gz
|
||||||
|
retention-days: 1
|
||||||
|
|
||||||
|
build-mictronics-db:
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
if: github.event_name != 'schedule'
|
||||||
|
steps:
|
||||||
|
- name: Checkout
|
||||||
|
uses: actions/checkout@v6
|
||||||
|
with:
|
||||||
|
fetch-depth: 0
|
||||||
|
|
||||||
|
- name: Setup Python
|
||||||
|
uses: actions/setup-python@v6
|
||||||
|
with:
|
||||||
|
python-version: "3.14"
|
||||||
|
|
||||||
|
- name: Run Mictronics DB release script
|
||||||
|
continue-on-error: true
|
||||||
|
run: |
|
||||||
|
python -m src.contributions.create_daily_microtonics_release ${{ inputs.date && format('--date {0}', inputs.date) || '' }}
|
||||||
|
ls -lah data/openairframes
|
||||||
|
|
||||||
|
- name: Upload Mictronics DB artifact
|
||||||
|
uses: actions/upload-artifact@v4
|
||||||
|
with:
|
||||||
|
name: mictronics-db
|
||||||
|
path: data/openairframes/mictronics-db_*.zip
|
||||||
|
retention-days: 1
|
||||||
|
if-no-files-found: ignore
|
||||||
|
|
||||||
create-release:
|
create-release:
|
||||||
runs-on: ubuntu-latest
|
runs-on: ubuntu-latest
|
||||||
needs: [build-faa, adsb-reduce, build-community]
|
needs: [build-faa, adsb-reduce, build-community, build-adsbexchange-json, build-mictronics-db]
|
||||||
if: github.event_name != 'schedule'
|
if: github.event_name != 'schedule' && !failure() && !cancelled()
|
||||||
steps:
|
steps:
|
||||||
- name: Checkout for gh CLI
|
- name: Checkout for gh CLI
|
||||||
uses: actions/checkout@v4
|
uses: actions/checkout@v4
|
||||||
@@ -291,6 +345,19 @@ jobs:
|
|||||||
name: community-release
|
name: community-release
|
||||||
path: artifacts/community
|
path: artifacts/community
|
||||||
|
|
||||||
|
- name: Download ADS-B Exchange JSON artifact
|
||||||
|
uses: actions/download-artifact@v4
|
||||||
|
with:
|
||||||
|
name: adsbexchange-json
|
||||||
|
path: artifacts/adsbexchange
|
||||||
|
|
||||||
|
- name: Download Mictronics DB artifact
|
||||||
|
uses: actions/download-artifact@v4
|
||||||
|
continue-on-error: true
|
||||||
|
with:
|
||||||
|
name: mictronics-db
|
||||||
|
path: artifacts/mictronics
|
||||||
|
|
||||||
- name: Debug artifact structure
|
- name: Debug artifact structure
|
||||||
run: |
|
run: |
|
||||||
echo "=== Full artifacts tree ==="
|
echo "=== Full artifacts tree ==="
|
||||||
@@ -301,6 +368,10 @@ jobs:
|
|||||||
find artifacts/adsb -type f 2>/dev/null || echo "No files found in artifacts/adsb"
|
find artifacts/adsb -type f 2>/dev/null || echo "No files found in artifacts/adsb"
|
||||||
echo "=== Community artifacts ==="
|
echo "=== Community artifacts ==="
|
||||||
find artifacts/community -type f 2>/dev/null || echo "No files found in artifacts/community"
|
find artifacts/community -type f 2>/dev/null || echo "No files found in artifacts/community"
|
||||||
|
echo "=== ADS-B Exchange JSON artifacts ==="
|
||||||
|
find artifacts/adsbexchange -type f 2>/dev/null || echo "No files found in artifacts/adsbexchange"
|
||||||
|
echo "=== Mictronics DB artifacts ==="
|
||||||
|
find artifacts/mictronics -type f 2>/dev/null || echo "No files found in artifacts/mictronics"
|
||||||
|
|
||||||
- name: Prepare release metadata
|
- name: Prepare release metadata
|
||||||
id: meta
|
id: meta
|
||||||
@@ -317,9 +388,11 @@ jobs:
|
|||||||
|
|
||||||
# Find files from artifacts using find (handles nested structures)
|
# Find files from artifacts using find (handles nested structures)
|
||||||
CSV_FILE_FAA=$(find artifacts/faa -name "openairframes_faa_*.csv" -type f 2>/dev/null | head -1)
|
CSV_FILE_FAA=$(find artifacts/faa -name "openairframes_faa_*.csv" -type f 2>/dev/null | head -1)
|
||||||
CSV_FILE_ADSB=$(find artifacts/adsb -name "openairframes_adsb_*.csv" -type f 2>/dev/null | head -1)
|
CSV_FILE_ADSB=$(find artifacts/adsb -name "openairframes_adsb_*.csv.gz" -type f 2>/dev/null | head -1)
|
||||||
CSV_FILE_COMMUNITY=$(find artifacts/community -name "openairframes_community_*.csv" -type f 2>/dev/null | head -1)
|
CSV_FILE_COMMUNITY=$(find artifacts/community -name "openairframes_community_*.csv" -type f 2>/dev/null | head -1)
|
||||||
ZIP_FILE=$(find artifacts/faa -name "ReleasableAircraft_*.zip" -type f 2>/dev/null | head -1)
|
ZIP_FILE=$(find artifacts/faa -name "ReleasableAircraft_*.zip" -type f 2>/dev/null | head -1)
|
||||||
|
JSON_FILE_ADSBX=$(find artifacts/adsbexchange -name "basic-ac-db_*.json.gz" -type f 2>/dev/null | head -1)
|
||||||
|
ZIP_FILE_MICTRONICS=$(find artifacts/mictronics -name "mictronics-db_*.zip" -type f 2>/dev/null | head -1)
|
||||||
|
|
||||||
# Validate required files exist
|
# Validate required files exist
|
||||||
MISSING_FILES=""
|
MISSING_FILES=""
|
||||||
@@ -332,12 +405,24 @@ jobs:
|
|||||||
if [ -z "$ZIP_FILE" ] || [ ! -f "$ZIP_FILE" ]; then
|
if [ -z "$ZIP_FILE" ] || [ ! -f "$ZIP_FILE" ]; then
|
||||||
MISSING_FILES="$MISSING_FILES FAA_ZIP"
|
MISSING_FILES="$MISSING_FILES FAA_ZIP"
|
||||||
fi
|
fi
|
||||||
|
if [ -z "$JSON_FILE_ADSBX" ] || [ ! -f "$JSON_FILE_ADSBX" ]; then
|
||||||
|
MISSING_FILES="$MISSING_FILES ADSBX_JSON"
|
||||||
|
fi
|
||||||
|
|
||||||
|
# Optional files - warn but don't fail
|
||||||
|
OPTIONAL_MISSING=""
|
||||||
|
if [ -z "$ZIP_FILE_MICTRONICS" ] || [ ! -f "$ZIP_FILE_MICTRONICS" ]; then
|
||||||
|
OPTIONAL_MISSING="$OPTIONAL_MISSING MICTRONICS_ZIP"
|
||||||
|
ZIP_FILE_MICTRONICS=""
|
||||||
|
fi
|
||||||
|
|
||||||
if [ -n "$MISSING_FILES" ]; then
|
if [ -n "$MISSING_FILES" ]; then
|
||||||
echo "ERROR: Missing required release files:$MISSING_FILES"
|
echo "ERROR: Missing required release files:$MISSING_FILES"
|
||||||
echo "FAA CSV: $CSV_FILE_FAA"
|
echo "FAA CSV: $CSV_FILE_FAA"
|
||||||
echo "ADSB CSV: $CSV_FILE_ADSB"
|
echo "ADSB CSV: $CSV_FILE_ADSB"
|
||||||
echo "ZIP: $ZIP_FILE"
|
echo "ZIP: $ZIP_FILE"
|
||||||
|
echo "ADSBX JSON: $JSON_FILE_ADSBX"
|
||||||
|
echo "MICTRONICS ZIP: $ZIP_FILE_MICTRONICS"
|
||||||
exit 1
|
exit 1
|
||||||
fi
|
fi
|
||||||
|
|
||||||
@@ -346,6 +431,15 @@ jobs:
|
|||||||
CSV_BASENAME_ADSB=$(basename "$CSV_FILE_ADSB")
|
CSV_BASENAME_ADSB=$(basename "$CSV_FILE_ADSB")
|
||||||
CSV_BASENAME_COMMUNITY=$(basename "$CSV_FILE_COMMUNITY" 2>/dev/null || echo "")
|
CSV_BASENAME_COMMUNITY=$(basename "$CSV_FILE_COMMUNITY" 2>/dev/null || echo "")
|
||||||
ZIP_BASENAME=$(basename "$ZIP_FILE")
|
ZIP_BASENAME=$(basename "$ZIP_FILE")
|
||||||
|
JSON_BASENAME_ADSBX=$(basename "$JSON_FILE_ADSBX")
|
||||||
|
ZIP_BASENAME_MICTRONICS=""
|
||||||
|
if [ -n "$ZIP_FILE_MICTRONICS" ]; then
|
||||||
|
ZIP_BASENAME_MICTRONICS=$(basename "$ZIP_FILE_MICTRONICS")
|
||||||
|
fi
|
||||||
|
|
||||||
|
if [ -n "$OPTIONAL_MISSING" ]; then
|
||||||
|
echo "WARNING: Optional files missing:$OPTIONAL_MISSING (will continue without them)"
|
||||||
|
fi
|
||||||
|
|
||||||
echo "date=$DATE" >> "$GITHUB_OUTPUT"
|
echo "date=$DATE" >> "$GITHUB_OUTPUT"
|
||||||
echo "tag=$TAG" >> "$GITHUB_OUTPUT"
|
echo "tag=$TAG" >> "$GITHUB_OUTPUT"
|
||||||
@@ -357,6 +451,10 @@ jobs:
|
|||||||
echo "csv_basename_community=$CSV_BASENAME_COMMUNITY" >> "$GITHUB_OUTPUT"
|
echo "csv_basename_community=$CSV_BASENAME_COMMUNITY" >> "$GITHUB_OUTPUT"
|
||||||
echo "zip_file=$ZIP_FILE" >> "$GITHUB_OUTPUT"
|
echo "zip_file=$ZIP_FILE" >> "$GITHUB_OUTPUT"
|
||||||
echo "zip_basename=$ZIP_BASENAME" >> "$GITHUB_OUTPUT"
|
echo "zip_basename=$ZIP_BASENAME" >> "$GITHUB_OUTPUT"
|
||||||
|
echo "json_file_adsbx=$JSON_FILE_ADSBX" >> "$GITHUB_OUTPUT"
|
||||||
|
echo "json_basename_adsbx=$JSON_BASENAME_ADSBX" >> "$GITHUB_OUTPUT"
|
||||||
|
echo "zip_file_mictronics=$ZIP_FILE_MICTRONICS" >> "$GITHUB_OUTPUT"
|
||||||
|
echo "zip_basename_mictronics=$ZIP_BASENAME_MICTRONICS" >> "$GITHUB_OUTPUT"
|
||||||
echo "name=OpenAirframes snapshot ($DATE)${BRANCH_SUFFIX}" >> "$GITHUB_OUTPUT"
|
echo "name=OpenAirframes snapshot ($DATE)${BRANCH_SUFFIX}" >> "$GITHUB_OUTPUT"
|
||||||
|
|
||||||
echo "Found files:"
|
echo "Found files:"
|
||||||
@@ -364,6 +462,8 @@ jobs:
|
|||||||
echo " ADSB CSV: $CSV_FILE_ADSB"
|
echo " ADSB CSV: $CSV_FILE_ADSB"
|
||||||
echo " Community CSV: $CSV_FILE_COMMUNITY"
|
echo " Community CSV: $CSV_FILE_COMMUNITY"
|
||||||
echo " ZIP: $ZIP_FILE"
|
echo " ZIP: $ZIP_FILE"
|
||||||
|
echo " ADSBX JSON: $JSON_FILE_ADSBX"
|
||||||
|
echo " MICTRONICS ZIP: $ZIP_FILE_MICTRONICS"
|
||||||
|
|
||||||
- name: Delete existing release if exists
|
- name: Delete existing release if exists
|
||||||
run: |
|
run: |
|
||||||
@@ -377,7 +477,7 @@ jobs:
|
|||||||
with:
|
with:
|
||||||
tag_name: ${{ steps.meta.outputs.tag }}
|
tag_name: ${{ steps.meta.outputs.tag }}
|
||||||
name: ${{ steps.meta.outputs.name }}
|
name: ${{ steps.meta.outputs.name }}
|
||||||
fail_on_unmatched_files: true
|
fail_on_unmatched_files: false
|
||||||
body: |
|
body: |
|
||||||
Automated daily snapshot generated at 06:00 UTC for ${{ steps.meta.outputs.date }}.
|
Automated daily snapshot generated at 06:00 UTC for ${{ steps.meta.outputs.date }}.
|
||||||
|
|
||||||
@@ -386,10 +486,14 @@ jobs:
|
|||||||
- ${{ steps.meta.outputs.csv_basename_adsb }}
|
- ${{ steps.meta.outputs.csv_basename_adsb }}
|
||||||
- ${{ steps.meta.outputs.csv_basename_community }}
|
- ${{ steps.meta.outputs.csv_basename_community }}
|
||||||
- ${{ steps.meta.outputs.zip_basename }}
|
- ${{ steps.meta.outputs.zip_basename }}
|
||||||
|
- ${{ steps.meta.outputs.json_basename_adsbx }}
|
||||||
|
${{ steps.meta.outputs.zip_basename_mictronics && format('- {0}', steps.meta.outputs.zip_basename_mictronics) || '' }}
|
||||||
files: |
|
files: |
|
||||||
${{ steps.meta.outputs.csv_file_faa }}
|
${{ steps.meta.outputs.csv_file_faa }}
|
||||||
${{ steps.meta.outputs.csv_file_adsb }}
|
${{ steps.meta.outputs.csv_file_adsb }}
|
||||||
${{ steps.meta.outputs.csv_file_community }}
|
${{ steps.meta.outputs.csv_file_community }}
|
||||||
${{ steps.meta.outputs.zip_file }}
|
${{ steps.meta.outputs.zip_file }}
|
||||||
|
${{ steps.meta.outputs.json_file_adsbx }}
|
||||||
|
${{ steps.meta.outputs.zip_file_mictronics }}
|
||||||
env:
|
env:
|
||||||
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
|
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
|
||||||
|
|||||||
@@ -48,29 +48,52 @@ jobs:
|
|||||||
git fetch origin "$branch_name"
|
git fetch origin "$branch_name"
|
||||||
git checkout "$branch_name"
|
git checkout "$branch_name"
|
||||||
|
|
||||||
# Merge main into PR branch
|
|
||||||
git config user.name "github-actions[bot]"
|
git config user.name "github-actions[bot]"
|
||||||
git config user.email "github-actions[bot]@users.noreply.github.com"
|
git config user.email "github-actions[bot]@users.noreply.github.com"
|
||||||
|
|
||||||
if git merge origin/main -m "Merge main to update schema"; then
|
# Get the community submission file(s) and schema from this branch
|
||||||
# Regenerate schema for this PR's submission (adds any new tags)
|
community_files=$(git diff --name-only origin/main...HEAD -- 'community/' 'schemas/')
|
||||||
python -m src.contributions.regenerate_pr_schema || true
|
|
||||||
|
|
||||||
# If there are changes, commit and push
|
if [ -z "$community_files" ]; then
|
||||||
if [ -n "$(git status --porcelain schemas/)" ]; then
|
echo " No community/schema files found in PR #$pr_number, skipping"
|
||||||
git add schemas/
|
git checkout main
|
||||||
git commit -m "Update schema with new tags"
|
continue
|
||||||
git push origin "$branch_name"
|
fi
|
||||||
echo " Updated PR #$pr_number with schema changes"
|
|
||||||
else
|
echo " Files to preserve: $community_files"
|
||||||
git push origin "$branch_name"
|
|
||||||
echo " Merged main into PR #$pr_number"
|
# Save the community files content
|
||||||
|
mkdir -p /tmp/pr_files
|
||||||
|
for file in $community_files; do
|
||||||
|
if [ -f "$file" ]; then
|
||||||
|
mkdir -p "/tmp/pr_files/$(dirname "$file")"
|
||||||
|
cp "$file" "/tmp/pr_files/$file"
|
||||||
fi
|
fi
|
||||||
|
done
|
||||||
|
|
||||||
|
# Reset branch to main (clean slate)
|
||||||
|
git reset --hard origin/main
|
||||||
|
|
||||||
|
# Restore the community files
|
||||||
|
for file in $community_files; do
|
||||||
|
if [ -f "/tmp/pr_files/$file" ]; then
|
||||||
|
mkdir -p "$(dirname "$file")"
|
||||||
|
cp "/tmp/pr_files/$file" "$file"
|
||||||
|
fi
|
||||||
|
done
|
||||||
|
rm -rf /tmp/pr_files
|
||||||
|
|
||||||
|
# Regenerate schema with current main + this submission's tags
|
||||||
|
python -m src.contributions.regenerate_pr_schema || true
|
||||||
|
|
||||||
|
# Stage and commit all changes
|
||||||
|
git add community/ schemas/
|
||||||
|
if ! git diff --cached --quiet; then
|
||||||
|
git commit -m "Community submission (rebased on main)"
|
||||||
|
git push --force origin "$branch_name"
|
||||||
|
echo " Rebased PR #$pr_number onto main"
|
||||||
else
|
else
|
||||||
echo " Merge conflict in PR #$pr_number, adding comment"
|
echo " No changes needed for PR #$pr_number"
|
||||||
gh pr comment "$pr_number" --body $'⚠️ **Merge Conflict**\n\nAnother community submission was merged and this PR has conflicts.\n\nA maintainer may need to:\n1. Close this PR\n2. Remove the `approved` label from the original issue\n3. Re-add the `approved` label to regenerate the PR'
|
|
||||||
git merge --abort
|
|
||||||
fi
|
|
||||||
fi
|
fi
|
||||||
|
|
||||||
git checkout main
|
git checkout main
|
||||||
|
|||||||
@@ -20,7 +20,7 @@ A daily release is created at **06:00 UTC** and includes:
|
|||||||
All [FAA registration data](https://www.faa.gov/licenses_certificates/aircraft_certification/aircraft_registry/releasable_aircraft_download) from 2023-08-16 to present (~260 MB)
|
All [FAA registration data](https://www.faa.gov/licenses_certificates/aircraft_certification/aircraft_registry/releasable_aircraft_download) from 2023-08-16 to present (~260 MB)
|
||||||
|
|
||||||
- **openairframes_adsb.csv**
|
- **openairframes_adsb.csv**
|
||||||
Airframe information derived from ADS-B messages on the [ADSB.lol](https://www.adsb.lol/) network, from 2026-02-12 to present. The airframe information originates from [mictronics aircraft database](https://www.mictronics.de/aircraft-database/) (~5 MB).
|
Airframe information derived from ADS-B messages on the [ADSB.lol](https://www.adsb.lol/) network, from 2026-02-12 to present (will be from 2024-01-01 soon). The airframe information originates from [mictronics aircraft database](https://www.mictronics.de/aircraft-database/) (~5 MB).
|
||||||
|
|
||||||
- **ReleasableAircraft_{date}.zip**
|
- **ReleasableAircraft_{date}.zip**
|
||||||
A daily snapshot of the FAA database, which updates at **05:30 UTC**
|
A daily snapshot of the FAA database, which updates at **05:30 UTC**
|
||||||
@@ -43,7 +43,8 @@ Please try to follow the submission formatting guidelines. If you are struggling
|
|||||||
|
|
||||||
## For Developers
|
## For Developers
|
||||||
All code, compute (GitHub Actions), and storage (releases) are in this GitHub repository Improvements are welcome. Potential features include:
|
All code, compute (GitHub Actions), and storage (releases) are in this GitHub repository Improvements are welcome. Potential features include:
|
||||||
- Web UI
|
- Web UI for data
|
||||||
|
- Web UI for contributors
|
||||||
- Additional export formats in the daily release
|
- Additional export formats in the daily release
|
||||||
- Data fusion from multiple sources in the daily release
|
- Data fusion from multiple sources in the daily release
|
||||||
- Automated airframe data connectors, including (but not limited to) civil aviation authorities and airline APIs
|
- Automated airframe data connectors, including (but not limited to) civil aviation authorities and airline APIs
|
||||||
|
|||||||
@@ -1,11 +0,0 @@
|
|||||||
FROM --platform=linux/arm64 python:3.12-slim
|
|
||||||
|
|
||||||
WORKDIR /app
|
|
||||||
|
|
||||||
COPY requirements.reducer.txt requirements.txt
|
|
||||||
RUN pip install --no-cache-dir -r requirements.txt
|
|
||||||
|
|
||||||
COPY compress_adsb_to_aircraft_data.py .
|
|
||||||
COPY reducer.py .
|
|
||||||
|
|
||||||
CMD ["python", "-u", "reducer.py"]
|
|
||||||
@@ -1,12 +0,0 @@
|
|||||||
FROM --platform=linux/arm64 python:3.12-slim
|
|
||||||
|
|
||||||
WORKDIR /app
|
|
||||||
|
|
||||||
COPY requirements.worker.txt requirements.txt
|
|
||||||
RUN pip install --no-cache-dir -r requirements.txt
|
|
||||||
|
|
||||||
COPY compress_adsb_to_aircraft_data.py .
|
|
||||||
COPY download_adsb_data_to_parquet.py .
|
|
||||||
COPY worker.py .
|
|
||||||
|
|
||||||
CMD ["python", "-u", "worker.py"]
|
|
||||||
@@ -14,11 +14,12 @@ Usage:
|
|||||||
python -m src.adsb.combine_chunks_to_csv --chunks-dir data/output/adsb_chunks --start-date 2024-01-01 --end-date 2024-01-07 --skip-base
|
python -m src.adsb.combine_chunks_to_csv --chunks-dir data/output/adsb_chunks --start-date 2024-01-01 --end-date 2024-01-07 --skip-base
|
||||||
"""
|
"""
|
||||||
import gc
|
import gc
|
||||||
|
import gzip
|
||||||
import os
|
import os
|
||||||
import sys
|
import sys
|
||||||
import glob
|
import glob
|
||||||
import argparse
|
import argparse
|
||||||
from datetime import datetime, timedelta
|
from datetime import datetime, timedelta, timezone
|
||||||
|
|
||||||
import polars as pl
|
import polars as pl
|
||||||
|
|
||||||
@@ -33,7 +34,7 @@ os.makedirs(FINAL_OUTPUT_DIR, exist_ok=True)
|
|||||||
|
|
||||||
def get_target_day() -> datetime:
|
def get_target_day() -> datetime:
|
||||||
"""Get yesterday's date (the day we're processing)."""
|
"""Get yesterday's date (the day we're processing)."""
|
||||||
return datetime.utcnow() - timedelta(days=1)
|
return datetime.now(timezone.utc) - timedelta(days=1)
|
||||||
|
|
||||||
|
|
||||||
def process_single_chunk(chunk_path: str, delete_after_load: bool = False) -> pl.DataFrame:
|
def process_single_chunk(chunk_path: str, delete_after_load: bool = False) -> pl.DataFrame:
|
||||||
@@ -83,58 +84,53 @@ def combine_compressed_chunks(compressed_dfs: list[pl.DataFrame]) -> pl.DataFram
|
|||||||
return combined
|
return combined
|
||||||
|
|
||||||
|
|
||||||
def download_and_merge_base_release(compressed_df: pl.DataFrame) -> pl.DataFrame:
|
def download_and_merge_base_release(compressed_df: pl.DataFrame) -> tuple[pl.DataFrame, str | None]:
|
||||||
"""Download base release and merge with new data."""
|
"""Download base release and merge with new data.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Tuple of (merged_df, earliest_date_str) where earliest_date_str is None if no base release was merged
|
||||||
|
"""
|
||||||
from src.get_latest_release import download_latest_aircraft_adsb_csv
|
from src.get_latest_release import download_latest_aircraft_adsb_csv
|
||||||
|
|
||||||
print("Downloading base ADS-B release...")
|
print("Downloading base ADS-B release...")
|
||||||
try:
|
base_path = download_latest_aircraft_adsb_csv(
|
||||||
base_path = download_latest_aircraft_adsb_csv(
|
output_dir="./data/openairframes_base"
|
||||||
output_dir="./data/openairframes_base"
|
)
|
||||||
)
|
print(f"Download returned: {base_path}")
|
||||||
print(f"Download returned: {base_path}")
|
|
||||||
|
|
||||||
if base_path and os.path.exists(str(base_path)):
|
print(f"Loading base release from {base_path}")
|
||||||
print(f"Loading base release from {base_path}")
|
|
||||||
base_df = pl.read_csv(base_path)
|
|
||||||
print(f"Base release has {len(base_df)} records")
|
|
||||||
|
|
||||||
# Ensure columns match
|
# Extract start date from filename (e.g., openairframes_adsb_2025-05-01_2026-02-14.csv.gz)
|
||||||
base_cols = set(base_df.columns)
|
import re
|
||||||
new_cols = set(compressed_df.columns)
|
filename = os.path.basename(str(base_path))
|
||||||
print(f"Base columns: {sorted(base_cols)}")
|
match = re.search(r'openairframes_adsb_(\d{4}-\d{2}-\d{2})_', filename)
|
||||||
print(f"New columns: {sorted(new_cols)}")
|
earliest_date = match.group(1) if match else None
|
||||||
|
print(f"Start date from base filename: {earliest_date}")
|
||||||
|
|
||||||
# Add missing columns
|
# Read CSV with schema matching the new data
|
||||||
for col in new_cols - base_cols:
|
base_df = pl.read_csv(base_path, schema=compressed_df.schema)
|
||||||
base_df = base_df.with_columns(pl.lit(None).alias(col))
|
print(f"Base release has {len(base_df)} records")
|
||||||
for col in base_cols - new_cols:
|
|
||||||
compressed_df = compressed_df.with_columns(pl.lit(None).alias(col))
|
|
||||||
|
|
||||||
# Reorder columns to match
|
# Ensure columns match
|
||||||
compressed_df = compressed_df.select(base_df.columns)
|
base_cols = set(base_df.columns)
|
||||||
|
new_cols = set(compressed_df.columns)
|
||||||
|
print(f"Base columns: {sorted(base_cols)}")
|
||||||
|
print(f"New columns: {sorted(new_cols)}")
|
||||||
|
|
||||||
# Concat and deduplicate by icao (keep new data - it comes last)
|
# Add missing columns
|
||||||
combined = pl.concat([base_df, compressed_df])
|
for col in new_cols - base_cols:
|
||||||
print(f"After concat: {len(combined)} records")
|
base_df = base_df.with_columns(pl.lit(None).alias(col))
|
||||||
|
for col in base_cols - new_cols:
|
||||||
|
compressed_df = compressed_df.with_columns(pl.lit(None).alias(col))
|
||||||
|
|
||||||
deduplicated = combined.unique(subset=["icao"], keep="last")
|
# Reorder columns to match
|
||||||
|
compressed_df = compressed_df.select(base_df.columns)
|
||||||
|
|
||||||
print(f"Combined with base: {len(combined)} -> {len(deduplicated)} after dedup")
|
# Concat and deduplicate by icao (keep new data - it comes last)
|
||||||
|
combined = pl.concat([base_df, compressed_df])
|
||||||
del base_df, combined
|
print(f"After concat: {len(combined)} records")
|
||||||
gc.collect()
|
|
||||||
|
|
||||||
return deduplicated
|
|
||||||
else:
|
|
||||||
print(f"No base release found at {base_path}, using only new data")
|
|
||||||
return compressed_df
|
|
||||||
except Exception as e:
|
|
||||||
import traceback
|
|
||||||
print(f"Failed to download base release: {e}")
|
|
||||||
traceback.print_exc()
|
|
||||||
return compressed_df
|
|
||||||
|
|
||||||
|
return combined, earliest_date
|
||||||
|
|
||||||
def cleanup_chunks(output_id: str, chunks_dir: str):
|
def cleanup_chunks(output_id: str, chunks_dir: str):
|
||||||
"""Delete chunk parquet files after successful merge."""
|
"""Delete chunk parquet files after successful merge."""
|
||||||
@@ -176,7 +172,7 @@ def main():
|
|||||||
if args.start_date and args.end_date:
|
if args.start_date and args.end_date:
|
||||||
# Historical mode
|
# Historical mode
|
||||||
output_id = f"{args.start_date}_{args.end_date}"
|
output_id = f"{args.start_date}_{args.end_date}"
|
||||||
output_filename = f"openairframes_adsb_{args.start_date}_{args.end_date}.csv"
|
output_filename = f"openairframes_adsb_{args.start_date}_{args.end_date}.csv.gz"
|
||||||
print(f"Combining chunks for date range: {args.start_date} to {args.end_date}")
|
print(f"Combining chunks for date range: {args.start_date} to {args.end_date}")
|
||||||
else:
|
else:
|
||||||
# Daily mode - use same date for start and end
|
# Daily mode - use same date for start and end
|
||||||
@@ -187,7 +183,7 @@ def main():
|
|||||||
|
|
||||||
date_str = target_day.strftime("%Y-%m-%d")
|
date_str = target_day.strftime("%Y-%m-%d")
|
||||||
output_id = date_str
|
output_id = date_str
|
||||||
output_filename = f"openairframes_adsb_{date_str}_{date_str}.csv"
|
output_filename = f"openairframes_adsb_{date_str}_{date_str}.csv.gz"
|
||||||
print(f"Combining chunks for {date_str}")
|
print(f"Combining chunks for {date_str}")
|
||||||
|
|
||||||
chunks_dir = args.chunks_dir
|
chunks_dir = args.chunks_dir
|
||||||
@@ -220,8 +216,15 @@ def main():
|
|||||||
print(f"After combining: {get_resource_usage()}")
|
print(f"After combining: {get_resource_usage()}")
|
||||||
|
|
||||||
# Merge with base release (unless skipped)
|
# Merge with base release (unless skipped)
|
||||||
|
base_start_date = None
|
||||||
if not args.skip_base:
|
if not args.skip_base:
|
||||||
combined = download_and_merge_base_release(combined)
|
combined, base_start_date = download_and_merge_base_release(combined)
|
||||||
|
|
||||||
|
# Update filename if we merged with base release and got a start date
|
||||||
|
if base_start_date and not (args.start_date and args.end_date):
|
||||||
|
# Only update filename for daily mode when base was merged
|
||||||
|
output_filename = f"openairframes_adsb_{base_start_date}_{date_str}.csv.gz"
|
||||||
|
print(f"Updated filename to reflect date range: {output_filename}")
|
||||||
|
|
||||||
# Convert list columns to strings for CSV compatibility
|
# Convert list columns to strings for CSV compatibility
|
||||||
for col in combined.columns:
|
for col in combined.columns:
|
||||||
@@ -234,9 +237,17 @@ def main():
|
|||||||
if 'time' in combined.columns:
|
if 'time' in combined.columns:
|
||||||
combined = combined.sort('time')
|
combined = combined.sort('time')
|
||||||
|
|
||||||
|
# Replace empty strings with null across all string columns to avoid quoted empty strings
|
||||||
|
for col in combined.columns:
|
||||||
|
if combined[col].dtype == pl.Utf8:
|
||||||
|
combined = combined.with_columns(
|
||||||
|
pl.when(pl.col(col) == "").then(None).otherwise(pl.col(col)).alias(col)
|
||||||
|
)
|
||||||
|
|
||||||
# Write final CSV
|
# Write final CSV
|
||||||
output_path = os.path.join(FINAL_OUTPUT_DIR, output_filename)
|
output_path = os.path.join(FINAL_OUTPUT_DIR, output_filename)
|
||||||
combined.write_csv(output_path)
|
with gzip.open(output_path, "wb", compresslevel=9) as f:
|
||||||
|
combined.write_csv(f, null_value='', quote_style='necessary')
|
||||||
print(f"Wrote {len(combined)} records to {output_path}")
|
print(f"Wrote {len(combined)} records to {output_path}")
|
||||||
|
|
||||||
# Cleanup
|
# Cleanup
|
||||||
|
|||||||
@@ -264,7 +264,7 @@ def get_latest_aircraft_adsb_csv_df():
|
|||||||
if df[col].dtype == pl.Utf8:
|
if df[col].dtype == pl.Utf8:
|
||||||
df = df.with_columns(pl.col(col).fill_null(""))
|
df = df.with_columns(pl.col(col).fill_null(""))
|
||||||
|
|
||||||
# Extract start date from filename pattern: openairframes_adsb_{start_date}_{end_date}.csv
|
# Extract start date from filename pattern: openairframes_adsb_{start_date}_{end_date}.csv[.gz]
|
||||||
match = re.search(r"openairframes_adsb_(\d{4}-\d{2}-\d{2})_", str(csv_path))
|
match = re.search(r"openairframes_adsb_(\d{4}-\d{2}-\d{2})_", str(csv_path))
|
||||||
if not match:
|
if not match:
|
||||||
raise ValueError(f"Could not extract date from filename: {csv_path.name}")
|
raise ValueError(f"Could not extract date from filename: {csv_path.name}")
|
||||||
|
|||||||
@@ -76,14 +76,10 @@ def timeout_handler(signum, frame):
|
|||||||
raise DownloadTimeoutException("Download timed out after 40 seconds")
|
raise DownloadTimeoutException("Download timed out after 40 seconds")
|
||||||
|
|
||||||
|
|
||||||
def fetch_releases(version_date: str) -> list:
|
def _fetch_releases_from_repo(year: str, version_date: str) -> list:
|
||||||
"""Fetch GitHub releases for a given version date from adsblol."""
|
"""Fetch GitHub releases for a given version date from a specific year's adsblol repo."""
|
||||||
year = version_date.split('.')[0][1:]
|
|
||||||
if version_date == "v2024.12.31":
|
|
||||||
year = "2025"
|
|
||||||
BASE_URL = f"https://api.github.com/repos/adsblol/globe_history_{year}/releases"
|
BASE_URL = f"https://api.github.com/repos/adsblol/globe_history_{year}/releases"
|
||||||
# Match exact release name, exclude tmp releases
|
PATTERN = rf"^{re.escape(version_date)}-planes-readsb-prod-\d+(tmp)?$"
|
||||||
PATTERN = rf"^{re.escape(version_date)}-planes-readsb-prod-\d+$"
|
|
||||||
releases = []
|
releases = []
|
||||||
page = 1
|
page = 1
|
||||||
|
|
||||||
@@ -123,6 +119,25 @@ def fetch_releases(version_date: str) -> list:
|
|||||||
return releases
|
return releases
|
||||||
|
|
||||||
|
|
||||||
|
def fetch_releases(version_date: str) -> list:
|
||||||
|
"""Fetch GitHub releases for a given version date from adsblol.
|
||||||
|
|
||||||
|
For Dec 31 dates, if no releases are found in the current year's repo,
|
||||||
|
also checks the next year's repo (adsblol sometimes publishes Dec 31
|
||||||
|
data in the following year's repository).
|
||||||
|
"""
|
||||||
|
year = version_date.split('.')[0][1:]
|
||||||
|
releases = _fetch_releases_from_repo(year, version_date)
|
||||||
|
|
||||||
|
# For last day of year, also check next year's repo if nothing found
|
||||||
|
if not releases and version_date.endswith(".12.31"):
|
||||||
|
next_year = str(int(year) + 1)
|
||||||
|
print(f"No releases found for {version_date} in {year} repo, checking {next_year} repo...")
|
||||||
|
releases = _fetch_releases_from_repo(next_year, version_date)
|
||||||
|
|
||||||
|
return releases
|
||||||
|
|
||||||
|
|
||||||
def download_asset(asset_url: str, file_path: str) -> bool:
|
def download_asset(asset_url: str, file_path: str) -> bool:
|
||||||
"""Download a single release asset."""
|
"""Download a single release asset."""
|
||||||
os.makedirs(os.path.dirname(file_path) or OUTPUT_DIR, exist_ok=True)
|
os.makedirs(os.path.dirname(file_path) or OUTPUT_DIR, exist_ok=True)
|
||||||
@@ -582,6 +597,12 @@ def process_version_date(version_date: str, keep_folders: bool = False):
|
|||||||
print(f"No releases found for {vd}.")
|
print(f"No releases found for {vd}.")
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
# Prefer non-tmp releases; only use tmp if no normal releases exist
|
||||||
|
normal_releases = [r for r in releases if "tmp" not in r["tag_name"]]
|
||||||
|
tmp_releases = [r for r in releases if "tmp" in r["tag_name"]]
|
||||||
|
releases = normal_releases if normal_releases else tmp_releases
|
||||||
|
print(f"Using {'normal' if normal_releases else 'tmp'} releases ({len(releases)} found)")
|
||||||
|
|
||||||
downloaded_files = []
|
downloaded_files = []
|
||||||
for release in releases:
|
for release in releases:
|
||||||
tag_name = release["tag_name"]
|
tag_name = release["tag_name"]
|
||||||
|
|||||||
@@ -59,6 +59,12 @@ def download_and_extract(version_date: str) -> str | None:
|
|||||||
print(f"No releases found for {version_date}")
|
print(f"No releases found for {version_date}")
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
# Prefer non-tmp releases; only use tmp if no normal releases exist
|
||||||
|
normal_releases = [r for r in releases if "tmp" not in r["tag_name"]]
|
||||||
|
tmp_releases = [r for r in releases if "tmp" in r["tag_name"]]
|
||||||
|
releases = normal_releases if normal_releases else tmp_releases
|
||||||
|
print(f"Using {'normal' if normal_releases else 'tmp'} releases ({len(releases)} found)")
|
||||||
|
|
||||||
downloaded_files = []
|
downloaded_files = []
|
||||||
for release in releases:
|
for release in releases:
|
||||||
tag_name = release["tag_name"]
|
tag_name = release["tag_name"]
|
||||||
|
|||||||
@@ -1,97 +0,0 @@
|
|||||||
"""
|
|
||||||
Reduce step: downloads all chunk CSVs from S3, combines them,
|
|
||||||
deduplicates across the full dataset, and uploads the final result.
|
|
||||||
|
|
||||||
Environment variables:
|
|
||||||
S3_BUCKET — bucket with intermediate results
|
|
||||||
RUN_ID — run identifier matching the map workers
|
|
||||||
GLOBAL_START_DATE — overall start date for output filename
|
|
||||||
GLOBAL_END_DATE — overall end date for output filename
|
|
||||||
"""
|
|
||||||
import gzip
|
|
||||||
import os
|
|
||||||
import shutil
|
|
||||||
from pathlib import Path
|
|
||||||
|
|
||||||
import boto3
|
|
||||||
import polars as pl
|
|
||||||
|
|
||||||
from compress_adsb_to_aircraft_data import COLUMNS, deduplicate_by_signature
|
|
||||||
|
|
||||||
|
|
||||||
def main():
|
|
||||||
s3_bucket = os.environ["S3_BUCKET"]
|
|
||||||
run_id = os.environ.get("RUN_ID", "default")
|
|
||||||
global_start = os.environ["GLOBAL_START_DATE"]
|
|
||||||
global_end = os.environ["GLOBAL_END_DATE"]
|
|
||||||
|
|
||||||
s3 = boto3.client("s3")
|
|
||||||
prefix = f"intermediate/{run_id}/"
|
|
||||||
|
|
||||||
# List all chunk files for this run
|
|
||||||
paginator = s3.get_paginator("list_objects_v2")
|
|
||||||
chunk_keys = []
|
|
||||||
for page in paginator.paginate(Bucket=s3_bucket, Prefix=prefix):
|
|
||||||
for obj in page.get("Contents", []):
|
|
||||||
if obj["Key"].endswith(".csv.gz"):
|
|
||||||
chunk_keys.append(obj["Key"])
|
|
||||||
|
|
||||||
chunk_keys.sort()
|
|
||||||
print(f"Found {len(chunk_keys)} chunks to combine")
|
|
||||||
|
|
||||||
if not chunk_keys:
|
|
||||||
print("No chunks found — nothing to reduce.")
|
|
||||||
return
|
|
||||||
|
|
||||||
# Download and concatenate all chunks
|
|
||||||
download_dir = Path("/tmp/chunks")
|
|
||||||
download_dir.mkdir(parents=True, exist_ok=True)
|
|
||||||
|
|
||||||
dfs = []
|
|
||||||
|
|
||||||
for key in chunk_keys:
|
|
||||||
gz_path = download_dir / Path(key).name
|
|
||||||
csv_path = gz_path.with_suffix("") # Remove .gz
|
|
||||||
print(f"Downloading {key}...")
|
|
||||||
s3.download_file(s3_bucket, key, str(gz_path))
|
|
||||||
|
|
||||||
# Decompress
|
|
||||||
with gzip.open(gz_path, 'rb') as f_in:
|
|
||||||
with open(csv_path, 'wb') as f_out:
|
|
||||||
shutil.copyfileobj(f_in, f_out)
|
|
||||||
gz_path.unlink()
|
|
||||||
|
|
||||||
df_chunk = pl.read_csv(csv_path)
|
|
||||||
print(f" Loaded {df_chunk.height} rows from {csv_path.name}")
|
|
||||||
dfs.append(df_chunk)
|
|
||||||
|
|
||||||
# Free disk space after loading
|
|
||||||
csv_path.unlink()
|
|
||||||
|
|
||||||
df_accumulated = pl.concat(dfs) if dfs else pl.DataFrame()
|
|
||||||
print(f"Combined: {df_accumulated.height} rows before dedup")
|
|
||||||
|
|
||||||
# Final global deduplication
|
|
||||||
df_accumulated = deduplicate_by_signature(df_accumulated)
|
|
||||||
print(f"After dedup: {df_accumulated.height} rows")
|
|
||||||
|
|
||||||
# Write and upload final result
|
|
||||||
output_name = f"openairframes_adsb_{global_start}_{global_end}.csv.gz"
|
|
||||||
csv_output = Path(f"/tmp/openairframes_adsb_{global_start}_{global_end}.csv")
|
|
||||||
gz_output = Path(f"/tmp/{output_name}")
|
|
||||||
|
|
||||||
df_accumulated.write_csv(csv_output)
|
|
||||||
with open(csv_output, 'rb') as f_in:
|
|
||||||
with gzip.open(gz_output, 'wb') as f_out:
|
|
||||||
shutil.copyfileobj(f_in, f_out)
|
|
||||||
csv_output.unlink()
|
|
||||||
|
|
||||||
final_key = f"final/{output_name}"
|
|
||||||
print(f"Uploading to s3://{s3_bucket}/{final_key}")
|
|
||||||
s3.upload_file(str(gz_output), s3_bucket, final_key)
|
|
||||||
|
|
||||||
print(f"Final output: {df_accumulated.height} records -> {final_key}")
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
main()
|
|
||||||
@@ -0,0 +1,155 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""
|
||||||
|
Run the full ADS-B processing pipeline locally.
|
||||||
|
|
||||||
|
Downloads adsb.lol data, processes trace files, and outputs openairframes_adsb CSV.
|
||||||
|
|
||||||
|
Usage:
|
||||||
|
# Single day (yesterday by default)
|
||||||
|
python -m src.adsb.run_local
|
||||||
|
|
||||||
|
# Single day (specific date)
|
||||||
|
python -m src.adsb.run_local 2024-01-15
|
||||||
|
|
||||||
|
# Date range (inclusive)
|
||||||
|
python -m src.adsb.run_local 2024-01-01 2024-01-07
|
||||||
|
"""
|
||||||
|
import argparse
|
||||||
|
import os
|
||||||
|
import subprocess
|
||||||
|
import sys
|
||||||
|
from datetime import datetime, timedelta
|
||||||
|
|
||||||
|
|
||||||
|
def run_cmd(cmd: list[str], description: str) -> None:
|
||||||
|
"""Run a command and exit on failure."""
|
||||||
|
print(f"\n>>> {' '.join(cmd)}")
|
||||||
|
result = subprocess.run(cmd)
|
||||||
|
if result.returncode != 0:
|
||||||
|
print(f"ERROR: {description} failed with exit code {result.returncode}")
|
||||||
|
sys.exit(result.returncode)
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
parser = argparse.ArgumentParser(
|
||||||
|
description="Run full ADS-B processing pipeline locally",
|
||||||
|
usage="python -m src.adsb.run_local [start_date] [end_date]"
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"start_date",
|
||||||
|
nargs="?",
|
||||||
|
help="Start date (YYYY-MM-DD). Default: yesterday"
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"end_date",
|
||||||
|
nargs="?",
|
||||||
|
help="End date (YYYY-MM-DD, inclusive). If omitted, processes single day"
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"--chunks",
|
||||||
|
type=int,
|
||||||
|
default=4,
|
||||||
|
help="Number of parallel chunks (default: 4)"
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"--skip-base",
|
||||||
|
action="store_true",
|
||||||
|
help="Skip downloading and merging with base release"
|
||||||
|
)
|
||||||
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
# Determine dates
|
||||||
|
if args.start_date:
|
||||||
|
start_date = datetime.strptime(args.start_date, "%Y-%m-%d")
|
||||||
|
else:
|
||||||
|
start_date = datetime.utcnow() - timedelta(days=1)
|
||||||
|
|
||||||
|
end_date = None
|
||||||
|
if args.end_date:
|
||||||
|
end_date = datetime.strptime(args.end_date, "%Y-%m-%d")
|
||||||
|
|
||||||
|
start_str = start_date.strftime("%Y-%m-%d")
|
||||||
|
end_str = end_date.strftime("%Y-%m-%d") if end_date else None
|
||||||
|
|
||||||
|
print("=" * 60)
|
||||||
|
print("ADS-B Processing Pipeline")
|
||||||
|
print("=" * 60)
|
||||||
|
if end_str:
|
||||||
|
print(f"Date range: {start_str} to {end_str}")
|
||||||
|
else:
|
||||||
|
print(f"Date: {start_str}")
|
||||||
|
print(f"Chunks: {args.chunks}")
|
||||||
|
print("=" * 60)
|
||||||
|
|
||||||
|
# Step 1: Download and extract
|
||||||
|
print("\n" + "=" * 60)
|
||||||
|
print("Step 1: Download and Extract")
|
||||||
|
print("=" * 60)
|
||||||
|
|
||||||
|
if end_str:
|
||||||
|
cmd = ["python", "-m", "src.adsb.download_and_list_icaos",
|
||||||
|
"--start-date", start_str, "--end-date", end_str]
|
||||||
|
else:
|
||||||
|
cmd = ["python", "-m", "src.adsb.download_and_list_icaos",
|
||||||
|
"--date", start_str]
|
||||||
|
run_cmd(cmd, "Download and extract")
|
||||||
|
|
||||||
|
# Step 2: Process chunks
|
||||||
|
print("\n" + "=" * 60)
|
||||||
|
print("Step 2: Process Chunks")
|
||||||
|
print("=" * 60)
|
||||||
|
|
||||||
|
for chunk_id in range(args.chunks):
|
||||||
|
print(f"\n--- Chunk {chunk_id + 1}/{args.chunks} ---")
|
||||||
|
if end_str:
|
||||||
|
cmd = ["python", "-m", "src.adsb.process_icao_chunk",
|
||||||
|
"--chunk-id", str(chunk_id),
|
||||||
|
"--total-chunks", str(args.chunks),
|
||||||
|
"--start-date", start_str,
|
||||||
|
"--end-date", end_str]
|
||||||
|
else:
|
||||||
|
cmd = ["python", "-m", "src.adsb.process_icao_chunk",
|
||||||
|
"--chunk-id", str(chunk_id),
|
||||||
|
"--total-chunks", str(args.chunks),
|
||||||
|
"--date", start_str]
|
||||||
|
run_cmd(cmd, f"Process chunk {chunk_id}")
|
||||||
|
|
||||||
|
# Step 3: Combine chunks to CSV
|
||||||
|
print("\n" + "=" * 60)
|
||||||
|
print("Step 3: Combine to CSV")
|
||||||
|
print("=" * 60)
|
||||||
|
|
||||||
|
chunks_dir = "./data/output/adsb_chunks"
|
||||||
|
cmd = ["python", "-m", "src.adsb.combine_chunks_to_csv",
|
||||||
|
"--chunks-dir", chunks_dir]
|
||||||
|
|
||||||
|
if end_str:
|
||||||
|
cmd.extend(["--start-date", start_str, "--end-date", end_str])
|
||||||
|
else:
|
||||||
|
cmd.extend(["--date", start_str])
|
||||||
|
|
||||||
|
if args.skip_base:
|
||||||
|
cmd.append("--skip-base")
|
||||||
|
|
||||||
|
run_cmd(cmd, "Combine chunks")
|
||||||
|
|
||||||
|
print("\n" + "=" * 60)
|
||||||
|
print("Done!")
|
||||||
|
print("=" * 60)
|
||||||
|
|
||||||
|
# Show output
|
||||||
|
output_dir = "./data/openairframes"
|
||||||
|
if end_str:
|
||||||
|
output_file = f"openairframes_adsb_{start_str}_{end_str}.csv"
|
||||||
|
else:
|
||||||
|
output_file = f"openairframes_adsb_{start_str}_{start_str}.csv"
|
||||||
|
|
||||||
|
output_path = os.path.join(output_dir, output_file)
|
||||||
|
if os.path.exists(output_path):
|
||||||
|
size_mb = os.path.getsize(output_path) / (1024 * 1024)
|
||||||
|
print(f"Output: {output_path}")
|
||||||
|
print(f"Size: {size_mb:.1f} MB")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
@@ -1,89 +0,0 @@
|
|||||||
"""
|
|
||||||
Map worker: processes a date range chunk, uploads result to S3.
|
|
||||||
|
|
||||||
Environment variables:
|
|
||||||
START_DATE — inclusive, YYYY-MM-DD
|
|
||||||
END_DATE — exclusive, YYYY-MM-DD
|
|
||||||
S3_BUCKET — bucket for intermediate results
|
|
||||||
RUN_ID — unique run identifier for namespacing S3 keys
|
|
||||||
"""
|
|
||||||
import os
|
|
||||||
import sys
|
|
||||||
from datetime import datetime, timedelta
|
|
||||||
from pathlib import Path
|
|
||||||
|
|
||||||
import boto3
|
|
||||||
import polars as pl
|
|
||||||
|
|
||||||
from compress_adsb_to_aircraft_data import (
|
|
||||||
load_historical_for_day,
|
|
||||||
deduplicate_by_signature,
|
|
||||||
COLUMNS,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def main():
|
|
||||||
start_date_str = os.environ["START_DATE"]
|
|
||||||
end_date_str = os.environ["END_DATE"]
|
|
||||||
s3_bucket = os.environ["S3_BUCKET"]
|
|
||||||
run_id = os.environ.get("RUN_ID", "default")
|
|
||||||
|
|
||||||
start_date = datetime.strptime(start_date_str, "%Y-%m-%d")
|
|
||||||
end_date = datetime.strptime(end_date_str, "%Y-%m-%d")
|
|
||||||
|
|
||||||
total_days = (end_date - start_date).days
|
|
||||||
print(f"Worker: processing {total_days} days [{start_date_str}, {end_date_str})")
|
|
||||||
|
|
||||||
dfs = []
|
|
||||||
current_date = start_date
|
|
||||||
|
|
||||||
while current_date < end_date:
|
|
||||||
day_str = current_date.strftime("%Y-%m-%d")
|
|
||||||
print(f" Loading {day_str}...")
|
|
||||||
|
|
||||||
df_compressed = load_historical_for_day(current_date)
|
|
||||||
if df_compressed.height == 0:
|
|
||||||
raise RuntimeError(f"No data found for {day_str}")
|
|
||||||
|
|
||||||
dfs.append(df_compressed)
|
|
||||||
total_rows = sum(df.height for df in dfs)
|
|
||||||
print(f" +{df_compressed.height} rows (total: {total_rows})")
|
|
||||||
|
|
||||||
# Delete local cache after each day to save disk in container
|
|
||||||
cache_dir = Path("data/adsb")
|
|
||||||
if cache_dir.exists():
|
|
||||||
import shutil
|
|
||||||
shutil.rmtree(cache_dir)
|
|
||||||
|
|
||||||
current_date += timedelta(days=1)
|
|
||||||
|
|
||||||
# Concatenate all days
|
|
||||||
df_accumulated = pl.concat(dfs) if dfs else pl.DataFrame()
|
|
||||||
|
|
||||||
# Deduplicate within this chunk
|
|
||||||
df_accumulated = deduplicate_by_signature(df_accumulated)
|
|
||||||
print(f"After dedup: {df_accumulated.height} rows")
|
|
||||||
|
|
||||||
# Write to local file then upload to S3
|
|
||||||
local_path = Path(f"/tmp/chunk_{start_date_str}_{end_date_str}.csv")
|
|
||||||
df_accumulated.write_csv(local_path)
|
|
||||||
|
|
||||||
# Compress with gzip
|
|
||||||
import gzip
|
|
||||||
import shutil
|
|
||||||
gz_path = Path(f"/tmp/chunk_{start_date_str}_{end_date_str}.csv.gz")
|
|
||||||
with open(local_path, 'rb') as f_in:
|
|
||||||
with gzip.open(gz_path, 'wb') as f_out:
|
|
||||||
shutil.copyfileobj(f_in, f_out)
|
|
||||||
local_path.unlink() # Remove uncompressed file
|
|
||||||
|
|
||||||
s3_key = f"intermediate/{run_id}/chunk_{start_date_str}_{end_date_str}.csv.gz"
|
|
||||||
print(f"Uploading to s3://{s3_bucket}/{s3_key}")
|
|
||||||
|
|
||||||
s3 = boto3.client("s3")
|
|
||||||
s3.upload_file(str(gz_path), s3_bucket, s3_key)
|
|
||||||
print("Done.")
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
main()
|
|
||||||
@@ -0,0 +1,40 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""
|
||||||
|
Download ADS-B Exchange basic-ac-db.json.gz.
|
||||||
|
|
||||||
|
Usage:
|
||||||
|
python -m src.contributions.create_daily_adsbexchange_release [--date YYYY-MM-DD]
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
import shutil
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
from pathlib import Path
|
||||||
|
from urllib.request import Request, urlopen
|
||||||
|
|
||||||
|
URL = "https://downloads.adsbexchange.com/downloads/basic-ac-db.json.gz"
|
||||||
|
OUT_ROOT = Path("data/openairframes")
|
||||||
|
|
||||||
|
|
||||||
|
def main() -> None:
|
||||||
|
parser = argparse.ArgumentParser(description="Create daily ADS-B Exchange JSON release")
|
||||||
|
parser.add_argument("--date", type=str, help="Date to process (YYYY-MM-DD format, default: today UTC)")
|
||||||
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
date_str = args.date or datetime.now(timezone.utc).strftime("%Y-%m-%d")
|
||||||
|
|
||||||
|
OUT_ROOT.mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
|
gz_path = OUT_ROOT / f"basic-ac-db_{date_str}.json.gz"
|
||||||
|
|
||||||
|
print(f"Downloading {URL}...")
|
||||||
|
req = Request(URL, headers={"User-Agent": "openairframes-downloader/1.0"}, method="GET")
|
||||||
|
with urlopen(req, timeout=300) as r, gz_path.open("wb") as f:
|
||||||
|
shutil.copyfileobj(r, f)
|
||||||
|
|
||||||
|
print(f"Wrote: {gz_path}")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
@@ -0,0 +1,55 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""
|
||||||
|
Download Mictronics aircraft database zip.
|
||||||
|
|
||||||
|
Usage:
|
||||||
|
python -m src.contributions.create_daily_microtonics_release [--date YYYY-MM-DD]
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
import shutil
|
||||||
|
import sys
|
||||||
|
import time
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
from pathlib import Path
|
||||||
|
from urllib.error import URLError
|
||||||
|
from urllib.request import Request, urlopen
|
||||||
|
|
||||||
|
URL = "https://www.mictronics.de/aircraft-database/indexedDB_old.php"
|
||||||
|
OUT_ROOT = Path("data/openairframes")
|
||||||
|
MAX_RETRIES = 3
|
||||||
|
RETRY_DELAY = 30 # seconds
|
||||||
|
|
||||||
|
|
||||||
|
def main() -> None:
|
||||||
|
parser = argparse.ArgumentParser(description="Create daily Mictronics database release")
|
||||||
|
parser.add_argument("--date", type=str, help="Date to process (YYYY-MM-DD format, default: today UTC)")
|
||||||
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
date_str = args.date or datetime.now(timezone.utc).strftime("%Y-%m-%d")
|
||||||
|
|
||||||
|
OUT_ROOT.mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
|
zip_path = OUT_ROOT / f"mictronics-db_{date_str}.zip"
|
||||||
|
|
||||||
|
for attempt in range(1, MAX_RETRIES + 1):
|
||||||
|
try:
|
||||||
|
print(f"Downloading {URL} (attempt {attempt}/{MAX_RETRIES})...")
|
||||||
|
req = Request(URL, headers={"User-Agent": "Mozilla/5.0 (compatible; openairframes-downloader/1.0)"}, method="GET")
|
||||||
|
with urlopen(req, timeout=120) as r, zip_path.open("wb") as f:
|
||||||
|
shutil.copyfileobj(r, f)
|
||||||
|
print(f"Wrote: {zip_path}")
|
||||||
|
return
|
||||||
|
except (URLError, TimeoutError) as e:
|
||||||
|
print(f"Attempt {attempt} failed: {e}")
|
||||||
|
if attempt < MAX_RETRIES:
|
||||||
|
print(f"Retrying in {RETRY_DELAY} seconds...")
|
||||||
|
time.sleep(RETRY_DELAY)
|
||||||
|
else:
|
||||||
|
print("All retries exhausted. Mictronics download failed.")
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
@@ -77,7 +77,7 @@ if __name__ == '__main__':
|
|||||||
OUT_ROOT = Path("data/openairframes")
|
OUT_ROOT = Path("data/openairframes")
|
||||||
OUT_ROOT.mkdir(parents=True, exist_ok=True)
|
OUT_ROOT.mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
output_file = OUT_ROOT / f"openairframes_adsb_{start_date_str}_{date_str}.csv"
|
output_file = OUT_ROOT / f"openairframes_adsb_{start_date_str}_{date_str}.csv.gz"
|
||||||
df_combined.write_csv(output_file)
|
df_combined.write_csv(output_file)
|
||||||
|
|
||||||
print(f"Saved: {output_file}")
|
print(f"Saved: {output_file}")
|
||||||
|
|||||||
@@ -47,6 +47,9 @@ def convert_faa_master_txt_to_df(zip_path: Path, date: str):
|
|||||||
|
|
||||||
# Convert all NaN to empty strings
|
# Convert all NaN to empty strings
|
||||||
df = df.fillna("")
|
df = df.fillna("")
|
||||||
|
# The FAA parser can produce the literal string "None" for missing values;
|
||||||
|
# replace those so they match the empty-string convention used everywhere else.
|
||||||
|
df = df.replace("None", "")
|
||||||
|
|
||||||
return df
|
return df
|
||||||
|
|
||||||
@@ -84,8 +87,8 @@ def concat_faa_historical_df(df_base, df_new):
|
|||||||
# Convert to string
|
# Convert to string
|
||||||
val_str = str(val).strip()
|
val_str = str(val).strip()
|
||||||
|
|
||||||
# Handle empty strings
|
# Handle empty strings and null-like literals
|
||||||
if val_str == "" or val_str == "nan":
|
if val_str == "" or val_str == "nan" or val_str == "None":
|
||||||
return ""
|
return ""
|
||||||
|
|
||||||
# Check if it looks like a list representation (starts with [ )
|
# Check if it looks like a list representation (starts with [ )
|
||||||
|
|||||||
@@ -119,6 +119,7 @@ def download_latest_aircraft_csv(
|
|||||||
Returns:
|
Returns:
|
||||||
Path to the downloaded file
|
Path to the downloaded file
|
||||||
"""
|
"""
|
||||||
|
output_dir = Path(output_dir)
|
||||||
assets = get_latest_release_assets(repo, github_token=github_token)
|
assets = get_latest_release_assets(repo, github_token=github_token)
|
||||||
try:
|
try:
|
||||||
asset = pick_asset(assets, name_regex=r"^openairframes_faa_.*\.csv$")
|
asset = pick_asset(assets, name_regex=r"^openairframes_faa_.*\.csv$")
|
||||||
@@ -164,8 +165,9 @@ def download_latest_aircraft_adsb_csv(
|
|||||||
Returns:
|
Returns:
|
||||||
Path to the downloaded file
|
Path to the downloaded file
|
||||||
"""
|
"""
|
||||||
|
output_dir = Path(output_dir)
|
||||||
assets = get_latest_release_assets(repo, github_token=github_token)
|
assets = get_latest_release_assets(repo, github_token=github_token)
|
||||||
asset = pick_asset(assets, name_regex=r"^openairframes_adsb_.*\.csv$")
|
asset = pick_asset(assets, name_regex=r"^openairframes_adsb_.*\.csv(\.gz)?$")
|
||||||
saved_to = download_asset(asset, output_dir / asset.name, github_token=github_token)
|
saved_to = download_asset(asset, output_dir / asset.name, github_token=github_token)
|
||||||
print(f"Downloaded: {asset.name} ({asset.size} bytes) -> {saved_to}")
|
print(f"Downloaded: {asset.name} ({asset.size} bytes) -> {saved_to}")
|
||||||
return saved_to
|
return saved_to
|
||||||
@@ -176,7 +178,7 @@ def get_latest_aircraft_adsb_csv_df():
|
|||||||
import pandas as pd
|
import pandas as pd
|
||||||
df = pd.read_csv(csv_path)
|
df = pd.read_csv(csv_path)
|
||||||
df = df.fillna("")
|
df = df.fillna("")
|
||||||
# Extract start date from filename pattern: openairframes_adsb_{start_date}_{end_date}.csv
|
# Extract start date from filename pattern: openairframes_adsb_{start_date}_{end_date}.csv[.gz]
|
||||||
match = re.search(r"openairframes_adsb_(\d{4}-\d{2}-\d{2})_", str(csv_path))
|
match = re.search(r"openairframes_adsb_(\d{4}-\d{2}-\d{2})_", str(csv_path))
|
||||||
if not match:
|
if not match:
|
||||||
raise ValueError(f"Could not extract date from filename: {csv_path.name}")
|
raise ValueError(f"Could not extract date from filename: {csv_path.name}")
|
||||||
|
|||||||
Reference in New Issue
Block a user