Mirror of https://github.com/PlaneQuery/OpenAirframes.git (synced 2026-05-04 00:35:07 +02:00)
Compare commits
35 Commits
| SHA1 |
|---|
| 50267f3c57 |
| dd323f6e55 |
| 0e8b21daf9 |
| 3960e6936c |
| 48623ef79e |
| 5affe8937c |
| d0254146f3 |
| 1699ad6d8a |
| 2a6892c347 |
| 47ccecb9ba |
| 2826dfd450 |
| fecf9ff0ea |
| 7e0a396fc7 |
| b0503bb3b2 |
| 0b89138daf |
| 4b756cdaef |
| 9acffe1e56 |
| 1694fe0b46 |
| c6d9e59d01 |
| dd6cd7b6fd |
| f543b671f8 |
| efb4cbb953 |
| 5578133a99 |
| eace7d5a63 |
| 82f47b662c |
| 787796c3ab |
| 61aae586ee |
| 5abfa6b226 |
| a743b74ae5 |
| 53a020ab73 |
| 2de41c9883 |
| bccc634158 |
| 43b07942b0 |
| 2c9e994a12 |
| 99b680476a |
@@ -8,8 +8,8 @@ body:
- type: markdown
attributes:
value: |
Submit **one object** or an **array of objects** that matches the community submission [schema](https://github.com/PlaneQuery/OpenAirframes/blob/main/schemas/community_submission.v1.schema.json). Reuse existing tags from the schema when possible.
Submit **one object** or an **array of objects** that matches the community submission schema.

**Rules (enforced on review/automation):**
- Each object must include **at least one** of:
  - `registration_number`
@@ -27,7 +27,7 @@ body:
```json
{
"registration_number": "N12345",
"tags": {"owner": "John Doe", "photo": "https://example.com/photo.jpg"},
"tags": {"owner": "John Doe"},
"start_date": "2025-01-01"
}
```
@@ -77,5 +77,6 @@ body:
id: notes
attributes:
label: Notes (optional)
description: Any context, sources, or links that help validate your submission.
validations:
required: false
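For reference, a minimal sketch of checking a submission object against the published schema before opening the issue. It assumes a local checkout of the repository and the third-party `jsonschema` package; the example object simply reuses the fields from the JSON snippet in the template above.

```python
# Hedged sketch: validate one community submission object against the schema
# referenced in the issue template. Assumes the repo is checked out locally and
# that `jsonschema` is installed (pip install jsonschema).
import json
from pathlib import Path

from jsonschema import ValidationError, validate

schema = json.loads(Path("schemas/community_submission.v1.schema.json").read_text())

submission = {
    "registration_number": "N12345",
    "tags": {"owner": "John Doe"},
    "start_date": "2025-01-01",
}

try:
    validate(instance=submission, schema=schema)
    print("Submission matches the schema")
except ValidationError as err:
    print(f"Submission rejected: {err.message}")
```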
@@ -74,12 +74,11 @@ jobs:
env:
START_DATE: ${{ matrix.chunk.start_date }}
END_DATE: ${{ matrix.chunk.end_date }}
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: |
python -m src.adsb.download_and_list_icaos --start-date "$START_DATE" --end-date "$END_DATE"
ls -lah data/output/

- name: Create tar of extracted data and split into chunks
- name: Create tar of extracted data
run: |
cd data/output
echo "=== Disk space before tar ==="
@@ -94,31 +93,16 @@ jobs:
ls -lah extracted_data.tar
# Verify tar integrity
tar -tf extracted_data.tar > /dev/null && echo "Tar integrity check passed" || { echo "Tar integrity check FAILED"; exit 1; }

# Create checksum of the FULL tar before splitting (for verification after reassembly)
echo "=== Creating checksum of full tar ==="
sha256sum extracted_data.tar > full_tar.sha256
cat full_tar.sha256

# Split into 500MB chunks to avoid artifact upload issues
echo "=== Splitting tar into 500MB chunks ==="
mkdir -p tar_chunks
split -b 500M extracted_data.tar tar_chunks/extracted_data.tar.part_
rm extracted_data.tar
mv full_tar.sha256 tar_chunks/

echo "=== Chunks created ==="
ls -lah tar_chunks/
else
echo "ERROR: No extracted directories found, cannot create tar"
exit 1
fi

- name: Upload extracted data chunks
- name: Upload extracted data
uses: actions/upload-artifact@v4
with:
name: adsb-extracted-${{ matrix.chunk.start_date }}-${{ matrix.chunk.end_date }}
path: data/output/tar_chunks/
path: data/output/extracted_data.tar
retention-days: 1
compression-level: 0
if-no-files-found: warn
@@ -156,40 +140,18 @@ jobs:
uses: actions/download-artifact@v4
with:
name: adsb-extracted-${{ matrix.chunk.start_date }}-${{ matrix.chunk.end_date }}
path: data/output/tar_chunks/
path: data/output/
continue-on-error: true

- name: Reassemble and extract tar
- name: Extract tar
id: extract
run: |
cd data/output
if [ -d tar_chunks ] && ls tar_chunks/extracted_data.tar.part_* 1>/dev/null 2>&1; then
echo "=== Chunk files info ==="
ls -lah tar_chunks/

cd tar_chunks

# Reassemble tar with explicit sorting
echo "=== Reassembling tar file ==="
ls -1 extracted_data.tar.part_?? | sort | while read part; do
echo "Appending $part..."
cat "$part" >> ../extracted_data.tar
done
cd ..

echo "=== Reassembled tar file info ==="
if [ -f extracted_data.tar ]; then
echo "=== Tar file info ==="
ls -lah extracted_data.tar

# Verify checksum of reassembled tar matches original
echo "=== Verifying reassembled tar checksum ==="
echo "Original checksum:"
cat tar_chunks/full_tar.sha256
echo "Reassembled checksum:"
sha256sum extracted_data.tar
sha256sum -c tar_chunks/full_tar.sha256 || { echo "ERROR: Reassembled tar checksum mismatch - data corrupted during transfer"; exit 1; }
echo "Checksum verified - data integrity confirmed"

rm -rf tar_chunks

echo "=== Verifying tar integrity ==="
tar -tf extracted_data.tar > /dev/null || { echo "ERROR: Tar file is corrupted"; exit 1; }
echo "=== Extracting ==="
tar -xvf extracted_data.tar
rm extracted_data.tar
@@ -197,7 +159,7 @@ jobs:
echo "=== Contents of data/output ==="
ls -lah
else
echo "No tar chunks found"
echo "No extracted_data.tar found"
echo "has_data=false" >> "$GITHUB_OUTPUT"
fi
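The split, checksum, and reassembly steps above are plain coreutils (`split`, `sha256sum`, `cat`). The following is a minimal Python sketch of the same round trip, useful for reproducing the integrity check locally. The chunk size mirrors `split -b 500M`; the part-naming scheme and paths are illustrative assumptions rather than the workflow's exact layout.

```python
# Hedged sketch of the workflow's split -> sha256 -> reassemble -> verify cycle.
# Part naming and paths are illustrative; the workflow itself uses coreutils.
import hashlib
from pathlib import Path

CHUNK_SIZE = 500 * 1024 * 1024  # matches `split -b 500M`


def sha256_of(path: Path) -> str:
    digest = hashlib.sha256()
    with path.open("rb") as fh:
        for block in iter(lambda: fh.read(1024 * 1024), b""):
            digest.update(block)
    return digest.hexdigest()


def split_file(tar_path: Path, parts_dir: Path) -> str:
    """Write fixed-size parts of tar_path and return its checksum."""
    parts_dir.mkdir(parents=True, exist_ok=True)
    checksum = sha256_of(tar_path)
    with tar_path.open("rb") as fh:
        for index, block in enumerate(iter(lambda: fh.read(CHUNK_SIZE), b"")):
            (parts_dir / f"extracted_data.tar.part_{index:03d}").write_bytes(block)
    return checksum


def reassemble(parts_dir: Path, out_path: Path, expected: str) -> None:
    """Concatenate parts in sorted order and fail if the checksum changed."""
    with out_path.open("wb") as out:
        for part in sorted(parts_dir.glob("extracted_data.tar.part_*")):
            out.write(part.read_bytes())
    if sha256_of(out_path) != expected:
        raise RuntimeError("Reassembled tar checksum mismatch")
```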
@@ -5,11 +5,6 @@ on:
# 06:00 UTC every day - runs on default branch, triggers both
- cron: "0 06 * * *"
workflow_dispatch:
inputs:
date:
description: 'Date to process (YYYY-MM-DD format, default: yesterday)'
required: false
type: string

permissions:
contents: write
@@ -63,7 +58,7 @@ jobs:

- name: Run FAA release script
run: |
python src/create_daily_faa_release.py ${{ inputs.date && format('--date {0}', inputs.date) || '' }}
python src/create_daily_faa_release.py
ls -lah data/faa_releasable
ls -lah data/openairframes

@@ -98,10 +93,8 @@ jobs:
pip install -r requirements.txt

- name: Download and extract ADS-B data
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: |
python -m src.adsb.download_and_list_icaos ${{ inputs.date && format('--date {0}', inputs.date) || '' }}
python -m src.adsb.download_and_list_icaos
ls -lah data/output/

- name: Check manifest exists
@@ -171,7 +164,7 @@ jobs:

- name: Process chunk ${{ matrix.chunk }}
run: |
python -m src.adsb.process_icao_chunk --chunk-id ${{ matrix.chunk }} --total-chunks 4 ${{ inputs.date && format('--date {0}', inputs.date) || '' }}
python -m src.adsb.process_icao_chunk --chunk-id ${{ matrix.chunk }} --total-chunks 4
mkdir -p data/output/adsb_chunks
ls -lah data/output/adsb_chunks/ || echo "No chunks created"

@@ -220,7 +213,7 @@ jobs:
run: |
mkdir -p data/output/adsb_chunks
ls -lah data/output/adsb_chunks/ || echo "Directory empty or does not exist"
python -m src.adsb.combine_chunks_to_csv --chunks-dir data/output/adsb_chunks ${{ inputs.date && format('--date {0}', inputs.date) || '' }}
python -m src.adsb.combine_chunks_to_csv --chunks-dir data/output/adsb_chunks
ls -lah data/openairframes/

- name: Upload ADS-B artifacts
@@ -266,13 +259,6 @@ jobs:
needs: [build-faa, adsb-reduce, build-community]
if: github.event_name != 'schedule'
steps:
- name: Checkout for gh CLI
uses: actions/checkout@v4
with:
sparse-checkout: |
.github
sparse-checkout-cone-mode: false

- name: Download FAA artifacts
uses: actions/download-artifact@v4
with:
@@ -293,8 +279,6 @@ jobs:

- name: Debug artifact structure
run: |
echo "=== Full artifacts tree ==="
find artifacts -type f 2>/dev/null || echo "No files found in artifacts"
echo "=== FAA artifacts ==="
find artifacts/faa -type f 2>/dev/null || echo "No files found in artifacts/faa"
echo "=== ADS-B artifacts ==="
@@ -316,35 +300,13 @@ jobs:
TAG="openairframes-${DATE}${BRANCH_SUFFIX}"

# Find files from artifacts using find (handles nested structures)
CSV_FILE_FAA=$(find artifacts/faa -name "openairframes_faa_*.csv" -type f 2>/dev/null | head -1)
CSV_FILE_ADSB=$(find artifacts/adsb -name "openairframes_adsb_*.csv" -type f 2>/dev/null | head -1)
CSV_FILE_COMMUNITY=$(find artifacts/community -name "openairframes_community_*.csv" -type f 2>/dev/null | head -1)
ZIP_FILE=$(find artifacts/faa -name "ReleasableAircraft_*.zip" -type f 2>/dev/null | head -1)

# Validate required files exist
MISSING_FILES=""
if [ -z "$CSV_FILE_FAA" ] || [ ! -f "$CSV_FILE_FAA" ]; then
MISSING_FILES="$MISSING_FILES FAA_CSV"
fi
if [ -z "$CSV_FILE_ADSB" ] || [ ! -f "$CSV_FILE_ADSB" ]; then
MISSING_FILES="$MISSING_FILES ADSB_CSV"
fi
if [ -z "$ZIP_FILE" ] || [ ! -f "$ZIP_FILE" ]; then
MISSING_FILES="$MISSING_FILES FAA_ZIP"
fi

if [ -n "$MISSING_FILES" ]; then
echo "ERROR: Missing required release files:$MISSING_FILES"
echo "FAA CSV: $CSV_FILE_FAA"
echo "ADSB CSV: $CSV_FILE_ADSB"
echo "ZIP: $ZIP_FILE"
exit 1
fi

# Get basenames for display
CSV_FILE_FAA=$(find artifacts/faa -name "openairframes_faa_*.csv" | head -1)
CSV_BASENAME_FAA=$(basename "$CSV_FILE_FAA")
CSV_FILE_ADSB=$(find artifacts/adsb -name "openairframes_adsb_*.csv" | head -1)
CSV_BASENAME_ADSB=$(basename "$CSV_FILE_ADSB")
CSV_FILE_COMMUNITY=$(find artifacts/community -name "openairframes_community_*.csv" 2>/dev/null | head -1 || echo "")
CSV_BASENAME_COMMUNITY=$(basename "$CSV_FILE_COMMUNITY" 2>/dev/null || echo "")
ZIP_FILE=$(find artifacts/faa -name "ReleasableAircraft_*.zip" | head -1)
ZIP_BASENAME=$(basename "$ZIP_FILE")

echo "date=$DATE" >> "$GITHUB_OUTPUT"
@@ -358,12 +320,9 @@ jobs:
echo "zip_file=$ZIP_FILE" >> "$GITHUB_OUTPUT"
echo "zip_basename=$ZIP_BASENAME" >> "$GITHUB_OUTPUT"
echo "name=OpenAirframes snapshot ($DATE)${BRANCH_SUFFIX}" >> "$GITHUB_OUTPUT"

echo "Found files:"
echo " FAA CSV: $CSV_FILE_FAA"
echo " ADSB CSV: $CSV_FILE_ADSB"
echo " Community CSV: $CSV_FILE_COMMUNITY"
echo " ZIP: $ZIP_FILE"

- name: Checkout for gh CLI
uses: actions/checkout@v4

- name: Delete existing release if exists
run: |
@@ -377,7 +336,6 @@ jobs:
with:
tag_name: ${{ steps.meta.outputs.tag }}
name: ${{ steps.meta.outputs.name }}
fail_on_unmatched_files: true
body: |
Automated daily snapshot generated at 06:00 UTC for ${{ steps.meta.outputs.date }}.
@@ -48,52 +48,29 @@ jobs:
git fetch origin "$branch_name"
git checkout "$branch_name"

# Merge main into PR branch
git config user.name "github-actions[bot]"
git config user.email "github-actions[bot]@users.noreply.github.com"

# Get the community submission file(s) and schema from this branch
community_files=$(git diff --name-only origin/main...HEAD -- 'community/' 'schemas/')

if [ -z "$community_files" ]; then
echo " No community/schema files found in PR #$pr_number, skipping"
git checkout main
continue
fi

echo " Files to preserve: $community_files"

# Save the community files content
mkdir -p /tmp/pr_files
for file in $community_files; do
if [ -f "$file" ]; then
mkdir -p "/tmp/pr_files/$(dirname "$file")"
cp "$file" "/tmp/pr_files/$file"
if git merge origin/main -m "Merge main to update schema"; then
# Regenerate schema for this PR's submission (adds any new tags)
python -m src.contributions.regenerate_pr_schema || true

# If there are changes, commit and push
if [ -n "$(git status --porcelain schemas/)" ]; then
git add schemas/
git commit -m "Update schema with new tags"
git push origin "$branch_name"
echo " Updated PR #$pr_number with schema changes"
else
git push origin "$branch_name"
echo " Merged main into PR #$pr_number"
fi
done

# Reset branch to main (clean slate)
git reset --hard origin/main

# Restore the community files
for file in $community_files; do
if [ -f "/tmp/pr_files/$file" ]; then
mkdir -p "$(dirname "$file")"
cp "/tmp/pr_files/$file" "$file"
fi
done
rm -rf /tmp/pr_files

# Regenerate schema with current main + this submission's tags
python -m src.contributions.regenerate_pr_schema || true

# Stage and commit all changes
git add community/ schemas/
if ! git diff --cached --quiet; then
git commit -m "Community submission (rebased on main)"
git push --force origin "$branch_name"
echo " Rebased PR #$pr_number onto main"
else
echo " No changes needed for PR #$pr_number"
echo " Merge conflict in PR #$pr_number, adding comment"
gh pr comment "$pr_number" --body $'⚠️ **Merge Conflict**\n\nAnother community submission was merged and this PR has conflicts.\n\nA maintainer may need to:\n1. Close this PR\n2. Remove the `approved` label from the original issue\n3. Re-add the `approved` label to regenerate the PR'
git merge --abort
fi
fi

git checkout main
@@ -1,50 +1 @@
# OpenAirframes.org

OpenAirframes.org is an open-source, community-driven airframes database.

The data includes:
- Registration information from Civil Aviation Authorities (FAA)
- Airline data (e.g., Air France)
- Community contributions such as ownership details, military aircraft info, photos, and more

---

## For Users

A daily release is created at **06:00 UTC** and includes:

- **openairframes_community.csv**
  All community submissions

- **openairframes_faa.csv**
  All [FAA registration data](https://www.faa.gov/licenses_certificates/aircraft_certification/aircraft_registry/releasable_aircraft_download) from 2023-08-16 to present (~260 MB)

- **openairframes_adsb.csv**
  Airframe information derived from ADS-B messages on the [ADSB.lol](https://www.adsb.lol/) network, from 2026-02-12 to present (will be from 2024-01-01 soon). The airframe information originates from the [mictronics aircraft database](https://www.mictronics.de/aircraft-database/) (~5 MB).

- **ReleasableAircraft_{date}.zip**
  A daily snapshot of the FAA database, which updates at **05:30 UTC**
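The release assets are plain CSV files, so they can be consumed directly with standard tooling. Below is a minimal loading sketch, assuming the CSVs have already been downloaded from a release into the working directory; note that the actual assets may carry a date suffix in their file names, and column layouts should be checked against the files themselves.

```python
# Hedged sketch: load the three CSV assets from a downloaded daily release.
# File names follow the README listing; released assets may include a date
# suffix, and the columns are whatever each CSV actually ships with.
import pandas as pd

community = pd.read_csv("openairframes_community.csv")
faa = pd.read_csv("openairframes_faa.csv")    # large file (~260 MB)
adsb = pd.read_csv("openairframes_adsb.csv")

print(f"community rows: {len(community)}")
print(f"faa rows:       {len(faa)}")
print(f"adsb rows:      {len(adsb)}")
```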
---

## For Contributors

Submit data via a [GitHub Issue](https://github.com/PlaneQuery/OpenAirframes/issues/new?template=community_submission.yaml) with your preferred attribution. Once approved, it will appear in the daily release. A leaderboard will be available in the future.

All data is valuable. Examples include:
- Celebrity ownership (with citations)
- Photos
- Internet capability
- Military aircraft information
- Unique facts (e.g., an airframe that crashed, performs aerobatics, etc.)

Please try to follow the submission formatting guidelines. If you are struggling with them, that is fine—submit your data anyway and it will be formatted for you.

---

## For Developers
All code, compute (GitHub Actions), and storage (releases) are in this GitHub repository. Improvements are welcome. Potential features include:
- Web UI for data
- Web UI for contributors
- Additional export formats in the daily release
- Data fusion from multiple sources in the daily release
- Automated airframe data connectors, including (but not limited to) civil aviation authorities and airline APIs

Downloads [`https://registry.faa.gov/database/ReleasableAircraft.zip`](https://registry.faa.gov/database/ReleasableAircraft.zip). Creates a daily GitHub Release at 06:00 UTC containing the unaltered `ReleasableAircraft.zip` and a derived CSV file with all data from the FAA database since 2023-08-16. The FAA database updates daily at 05:30 UTC.
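For orientation, a minimal sketch of the fetch step described above: download the FAA `ReleasableAircraft.zip` and list its contents. It uses the third-party `requests` package and keeps the archive in memory; the actual release script may download and process the file differently.

```python
# Hedged sketch: fetch the FAA ReleasableAircraft.zip and list what it contains.
# Uses `requests` (pip install requests); the real release script may differ.
import io
import zipfile

import requests

FAA_URL = "https://registry.faa.gov/database/ReleasableAircraft.zip"

response = requests.get(FAA_URL, timeout=300)
response.raise_for_status()

with zipfile.ZipFile(io.BytesIO(response.content)) as archive:
    print("Files in ReleasableAircraft.zip:")
    for name in archive.namelist():
        print(f"  {name}")
```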
@@ -0,0 +1,11 @@
#!/usr/bin/env python3
import os
import aws_cdk as cdk
from stack import AdsbProcessingStack

app = cdk.App()
AdsbProcessingStack(app, "AdsbProcessingStack", env=cdk.Environment(
    account=os.environ["CDK_DEFAULT_ACCOUNT"],
    region=os.environ["CDK_DEFAULT_REGION"],
))
app.synth()
@@ -0,0 +1,3 @@
{
  "app": "python3 app.py"
}
@@ -0,0 +1,2 @@
aws-cdk-lib>=2.170.0
constructs>=10.0.0
@@ -0,0 +1,213 @@
import aws_cdk as cdk
from aws_cdk import (
    Stack,
    Duration,
    RemovalPolicy,
    aws_s3 as s3,
    aws_ecs as ecs,
    aws_ec2 as ec2,
    aws_ecr_assets,
    aws_iam as iam,
    aws_logs as logs,
    aws_stepfunctions as sfn,
    aws_stepfunctions_tasks as sfn_tasks,
)
from constructs import Construct
from pathlib import Path


class AdsbProcessingStack(Stack):
    def __init__(self, scope: Construct, id: str, **kwargs):
        super().__init__(scope, id, **kwargs)

        # --- S3 bucket for intermediate and final results ---
        bucket = s3.Bucket(
            self, "ResultsBucket",
            bucket_name="openairframes-dev",
            removal_policy=RemovalPolicy.DESTROY,
            auto_delete_objects=True,
            lifecycle_rules=[
                s3.LifecycleRule(
                    prefix="intermediate/",
                    expiration=Duration.days(7),
                )
            ],
        )

        # --- Use default VPC (no additional cost) ---
        vpc = ec2.Vpc.from_lookup(
            self, "Vpc",
            is_default=True,
        )

        # --- ECS Cluster ---
        cluster = ecs.Cluster(
            self, "Cluster",
            vpc=vpc,
            container_insights=True,
        )

        # --- Log group ---
        log_group = logs.LogGroup(
            self, "LogGroup",
            log_group_name="/adsb-processing",
            removal_policy=RemovalPolicy.DESTROY,
            retention=logs.RetentionDays.TWO_WEEKS,
        )

        # --- Docker images (built from local Dockerfiles) ---
        adsb_dir = str(Path(__file__).parent.parent / "src" / "adsb")

        worker_image = ecs.ContainerImage.from_asset(
            adsb_dir,
            file="Dockerfile.worker",
            platform=cdk.aws_ecr_assets.Platform.LINUX_ARM64,
        )
        reducer_image = ecs.ContainerImage.from_asset(
            adsb_dir,
            file="Dockerfile.reducer",
            platform=cdk.aws_ecr_assets.Platform.LINUX_ARM64,
        )

        # --- Task role (shared) ---
        task_role = iam.Role(
            self, "TaskRole",
            assumed_by=iam.ServicePrincipal("ecs-tasks.amazonaws.com"),
        )
        bucket.grant_read_write(task_role)

        # --- MAP: worker task definition ---
        map_task_def = ecs.FargateTaskDefinition(
            self, "MapTaskDef",
            cpu=4096,  # 4 vCPU
            memory_limit_mib=30720,  # 30 GB
            task_role=task_role,
            runtime_platform=ecs.RuntimePlatform(
                cpu_architecture=ecs.CpuArchitecture.ARM64,
                operating_system_family=ecs.OperatingSystemFamily.LINUX,
            ),
        )
        map_container = map_task_def.add_container(
            "worker",
            image=worker_image,
            logging=ecs.LogDrivers.aws_logs(
                stream_prefix="map",
                log_group=log_group,
            ),
            environment={
                "S3_BUCKET": bucket.bucket_name,
            },
        )

        # --- REDUCE: reducer task definition ---
        reduce_task_def = ecs.FargateTaskDefinition(
            self, "ReduceTaskDef",
            cpu=4096,  # 4 vCPU
            memory_limit_mib=30720,  # 30 GB — must hold full year in memory
            task_role=task_role,
            runtime_platform=ecs.RuntimePlatform(
                cpu_architecture=ecs.CpuArchitecture.ARM64,
                operating_system_family=ecs.OperatingSystemFamily.LINUX,
            ),
        )
        reduce_container = reduce_task_def.add_container(
            "reducer",
            image=reducer_image,
            logging=ecs.LogDrivers.aws_logs(
                stream_prefix="reduce",
                log_group=log_group,
            ),
            environment={
                "S3_BUCKET": bucket.bucket_name,
            },
        )

        # --- Step Functions ---

        # Map task: run ECS Fargate for each date chunk
        map_ecs_task = sfn_tasks.EcsRunTask(
            self, "ProcessChunk",
            integration_pattern=sfn.IntegrationPattern.RUN_JOB,
            cluster=cluster,
            task_definition=map_task_def,
            launch_target=sfn_tasks.EcsFargateLaunchTarget(
                platform_version=ecs.FargatePlatformVersion.LATEST,
            ),
            container_overrides=[
                sfn_tasks.ContainerOverride(
                    container_definition=map_container,
                    environment=[
                        sfn_tasks.TaskEnvironmentVariable(
                            name="START_DATE",
                            value=sfn.JsonPath.string_at("$.start_date"),
                        ),
                        sfn_tasks.TaskEnvironmentVariable(
                            name="END_DATE",
                            value=sfn.JsonPath.string_at("$.end_date"),
                        ),
                        sfn_tasks.TaskEnvironmentVariable(
                            name="RUN_ID",
                            value=sfn.JsonPath.string_at("$.run_id"),
                        ),
                    ],
                )
            ],
            assign_public_ip=True,
            subnets=ec2.SubnetSelection(subnet_type=ec2.SubnetType.PUBLIC),
            result_path="$.task_result",
        )

        # Map state — max 3 concurrent workers
        map_state = sfn.Map(
            self, "FanOutChunks",
            items_path="$.chunks",
            max_concurrency=3,
            result_path="$.map_results",
        )
        map_state.item_processor(map_ecs_task)

        # Reduce task: combine all chunk CSVs
        reduce_ecs_task = sfn_tasks.EcsRunTask(
            self, "ReduceResults",
            integration_pattern=sfn.IntegrationPattern.RUN_JOB,
            cluster=cluster,
            task_definition=reduce_task_def,
            launch_target=sfn_tasks.EcsFargateLaunchTarget(
                platform_version=ecs.FargatePlatformVersion.LATEST,
            ),
            container_overrides=[
                sfn_tasks.ContainerOverride(
                    container_definition=reduce_container,
                    environment=[
                        sfn_tasks.TaskEnvironmentVariable(
                            name="RUN_ID",
                            value=sfn.JsonPath.string_at("$.run_id"),
                        ),
                        sfn_tasks.TaskEnvironmentVariable(
                            name="GLOBAL_START_DATE",
                            value=sfn.JsonPath.string_at("$.global_start_date"),
                        ),
                        sfn_tasks.TaskEnvironmentVariable(
                            name="GLOBAL_END_DATE",
                            value=sfn.JsonPath.string_at("$.global_end_date"),
                        ),
                    ],
                )
            ],
            assign_public_ip=True,
            subnets=ec2.SubnetSelection(subnet_type=ec2.SubnetType.PUBLIC),
        )

        # Chain: fan-out map → reduce
        definition = map_state.next(reduce_ecs_task)

        sfn.StateMachine(
            self, "Pipeline",
            state_machine_name="adsb-map-reduce",
            definition_body=sfn.DefinitionBody.from_chainable(definition),
            timeout=Duration.hours(48),
        )

        # --- Outputs ---
        cdk.CfnOutput(self, "BucketName", value=bucket.bucket_name)
        cdk.CfnOutput(self, "StateMachineName", value="adsb-map-reduce")
@@ -1,15 +1,6 @@
from pathlib import Path
from datetime import datetime, timezone, timedelta
import argparse

parser = argparse.ArgumentParser(description="Create daily FAA release")
parser.add_argument("--date", type=str, help="Date to process (YYYY-MM-DD format, default: today)")
args = parser.parse_args()

if args.date:
    date_str = args.date
else:
    date_str = datetime.now(timezone.utc).strftime("%Y-%m-%d")
from datetime import datetime, timezone
date_str = datetime.now(timezone.utc).strftime("%Y-%m-%d")

out_dir = Path("data/faa_releasable")
out_dir.mkdir(parents=True, exist_ok=True)
@@ -0,0 +1,90 @@
"""
Generate Step Functions input and start the pipeline.

Usage:
    python trigger_pipeline.py 2024-01-01 2025-01-01
    python trigger_pipeline.py 2024-01-01 2025-01-01 --chunk-days 30
    python trigger_pipeline.py 2024-01-01 2025-01-01 --dry-run
"""
import argparse
import json
import os
import uuid
from datetime import datetime, timedelta

import boto3


def generate_chunks(start_date: str, end_date: str, chunk_days: int = 1):
    """Split a date range into chunks of chunk_days."""
    start = datetime.strptime(start_date, "%Y-%m-%d")
    end = datetime.strptime(end_date, "%Y-%m-%d")

    chunks = []
    current = start
    while current < end:
        chunk_end = min(current + timedelta(days=chunk_days), end)
        chunks.append({
            "start_date": current.strftime("%Y-%m-%d"),
            "end_date": chunk_end.strftime("%Y-%m-%d"),
        })
        current = chunk_end

    return chunks


def main():
    parser = argparse.ArgumentParser(description="Trigger ADS-B map-reduce pipeline")
    parser.add_argument("start_date", help="Start date (YYYY-MM-DD, inclusive)")
    parser.add_argument("end_date", help="End date (YYYY-MM-DD, exclusive)")
    parser.add_argument("--chunk-days", type=int, default=1,
                        help="Days per chunk (default: 1)")
    parser.add_argument("--dry-run", action="store_true",
                        help="Print input JSON without starting execution")
    args = parser.parse_args()

    run_id = f"run-{datetime.utcnow().strftime('%Y%m%dT%H%M%S')}-{uuid.uuid4().hex[:8]}"
    chunks = generate_chunks(args.start_date, args.end_date, args.chunk_days)

    # Inject run_id into each chunk
    for chunk in chunks:
        chunk["run_id"] = run_id

    sfn_input = {
        "run_id": run_id,
        "global_start_date": args.start_date,
        "global_end_date": args.end_date,
        "chunks": chunks,
    }

    print(f"Run ID: {run_id}")
    print(f"Chunks: {len(chunks)} (at {args.chunk_days} days each)")
    print(f"Max concurrency: 3 (enforced by Step Functions Map state)")
    print()
    print(json.dumps(sfn_input, indent=2))

    if args.dry_run:
        print("\n--dry-run: not starting execution")
        return

    client = boto3.client("stepfunctions")

    # Find the state machine ARN
    machines = client.list_state_machines()["stateMachines"]
    arn = next(
        m["stateMachineArn"]
        for m in machines
        if m["name"] == "adsb-map-reduce"
    )

    response = client.start_execution(
        stateMachineArn=arn,
        name=run_id,
        input=json.dumps(sfn_input),
    )

    print(f"\nStarted execution: {response['executionArn']}")


if __name__ == "__main__":
    main()