Compare commits

..

35 Commits

Author SHA1 Message Date
ggman12 50267f3c57 make faa work with no new data 2026-02-12 17:26:48 -05:00
ggman12 dd323f6e55 delete old files 2026-02-12 17:25:50 -05:00
ggman12 0e8b21daf9 rename from planequery to openairframes 2026-02-12 17:24:08 -05:00
ggman12 3960e6936c use start_date_end_date for adsb naming 2026-02-12 17:13:06 -05:00
ggman12 48623ef79e delete existing release 2026-02-12 17:12:09 -05:00
ggman12 5affe8937c rename to openairframes 2026-02-12 17:09:07 -05:00
ggman12 d0254146f3 update release to fix not grabbing FAA file 2026-02-12 16:42:47 -05:00
ggman12 1699ad6d8a rename file 2026-02-12 16:12:03 -05:00
ggman12 2a6892c347 fix download 2026-02-12 16:08:08 -05:00
ggman12 47ccecb9ba set fail-fast to true 2026-02-12 16:07:42 -05:00
ggman12 2826dfd450 remove notebook 2026-02-12 16:07:28 -05:00
ggman12 fecf9ff0ea format properly 2026-02-12 16:01:14 -05:00
ggman12 7e0a396fc7 only modify key parts of schemas/community_submission.v1.schema.json schema. Lowest diffs 2026-02-12 15:55:44 -05:00
ggman12 b0503bb3b2 fix: should update schema now 2026-02-12 15:46:11 -05:00
ggman12 0b89138daf modify existing json schema instead of creating a new file every time 2026-02-12 15:40:01 -05:00
ggman12 4b756cdaef fix syntax error 2026-02-12 15:32:37 -05:00
ggman12 9acffe1e56 handle multiple PRs with schema changes 2026-02-12 15:31:53 -05:00
ggman12 1694fe0b46 allow fileupload in submission 2026-02-12 15:26:45 -05:00
ggman12 c6d9e59d01 update template 2026-02-12 13:29:45 -05:00
ggman12 dd6cd7b6fd update schema with optional start_date and end_date scope 2026-02-12 13:28:43 -05:00
ggman12 f543b671f8 updating schema 2026-02-12 13:22:56 -05:00
ggman12 efb4cbb953 update example 2026-02-12 13:22:43 -05:00
ggman12 5578133a99 update schema to be uppercase only 2026-02-12 12:36:50 -05:00
ggman12 eace7d5a63 update folder 2026-02-12 12:34:27 -05:00
ggman12 82f47b662c make blank username work 2026-02-12 12:32:41 -05:00
ggman12 787796c3ab update approve_submission 2026-02-12 12:26:54 -05:00
ggman12 61aae586ee fix approve 2026-02-12 12:18:28 -05:00
ggman12 5abfa6b226 update submission validation 2026-02-12 12:15:04 -05:00
ggman12 a743b74ae5 Merge branch 'develop' 2026-02-12 12:10:24 -05:00
ggman12 53a020ab73 add jsonschema to requirements.txt 2026-02-12 12:09:03 -05:00
ggman12 2de41c9883 update historical. To check tar and fail fast if any maps fail 2026-02-12 12:01:13 -05:00
ggman12 bccc634158 remove existing release 2026-02-12 11:50:45 -05:00
ggman12 43b07942b0 add needed permissions 2026-02-12 11:42:49 -05:00
ggman12 2c9e994a12 add debug for FAA 2026-02-12 11:06:38 -05:00
ggman12 99b680476a delete parquet chunk after load to not use so much space for big historical run 2026-02-12 10:52:42 -05:00
11 changed files with 366 additions and 207 deletions
@@ -8,8 +8,8 @@ body:
- type: markdown
attributes:
value: |
Submit **one object** or an **array of objects** that matches the community submission [schema](https://github.com/PlaneQuery/OpenAirframes/blob/main/schemas/community_submission.v1.schema.json). Reuse existing tags from the schema when possible.
Submit **one object** or an **array of objects** that matches the community submission schema.
**Rules (enforced on review/automation):**
- Each object must include **at least one** of:
- `registration_number`
@@ -27,7 +27,7 @@ body:
```json
{
"registration_number": "N12345",
"tags": {"owner": "John Doe", "photo": "https://example.com/photo.jpg"},
"tags": {"owner": "John Doe"},
"start_date": "2025-01-01"
}
```
@@ -77,5 +77,6 @@ body:
id: notes
attributes:
label: Notes (optional)
description: Any context, sources, or links that help validate your submission.
validations:
required: false
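A minimal sketch of checking a submission locally with `jsonschema` (which this change adds to `requirements.txt`) against the schema linked in the template; whether the published schema accepts a bare object as well as an array is taken from the template text above, not verified here:

```python
# Minimal local check of a community submission against the published schema.
# Assumes the repo is checked out and `pip install jsonschema` has been run.
import json
from pathlib import Path

from jsonschema import ValidationError, validate

SCHEMA_PATH = Path("schemas/community_submission.v1.schema.json")

submission = {
    "registration_number": "N12345",
    "tags": {"owner": "John Doe"},
    "start_date": "2025-01-01",
}

schema = json.loads(SCHEMA_PATH.read_text())
try:
    # A single object; per the template, an array of such objects is also accepted.
    validate(instance=submission, schema=schema)
    print("Submission is valid")
except ValidationError as exc:
    print(f"Submission is invalid: {exc.message}")
```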
+11 -49
@@ -74,12 +74,11 @@ jobs:
env:
START_DATE: ${{ matrix.chunk.start_date }}
END_DATE: ${{ matrix.chunk.end_date }}
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: |
python -m src.adsb.download_and_list_icaos --start-date "$START_DATE" --end-date "$END_DATE"
ls -lah data/output/
- name: Create tar of extracted data and split into chunks
- name: Create tar of extracted data
run: |
cd data/output
echo "=== Disk space before tar ==="
@@ -94,31 +93,16 @@ jobs:
ls -lah extracted_data.tar
# Verify tar integrity
tar -tf extracted_data.tar > /dev/null && echo "Tar integrity check passed" || { echo "Tar integrity check FAILED"; exit 1; }
# Create checksum of the FULL tar before splitting (for verification after reassembly)
echo "=== Creating checksum of full tar ==="
sha256sum extracted_data.tar > full_tar.sha256
cat full_tar.sha256
# Split into 500MB chunks to avoid artifact upload issues
echo "=== Splitting tar into 500MB chunks ==="
mkdir -p tar_chunks
split -b 500M extracted_data.tar tar_chunks/extracted_data.tar.part_
rm extracted_data.tar
mv full_tar.sha256 tar_chunks/
echo "=== Chunks created ==="
ls -lah tar_chunks/
else
echo "ERROR: No extracted directories found, cannot create tar"
exit 1
fi
- name: Upload extracted data chunks
- name: Upload extracted data
uses: actions/upload-artifact@v4
with:
name: adsb-extracted-${{ matrix.chunk.start_date }}-${{ matrix.chunk.end_date }}
path: data/output/tar_chunks/
path: data/output/extracted_data.tar
retention-days: 1
compression-level: 0
if-no-files-found: warn
@@ -156,40 +140,18 @@ jobs:
uses: actions/download-artifact@v4
with:
name: adsb-extracted-${{ matrix.chunk.start_date }}-${{ matrix.chunk.end_date }}
path: data/output/tar_chunks/
path: data/output/
continue-on-error: true
- name: Reassemble and extract tar
- name: Extract tar
id: extract
run: |
cd data/output
if [ -d tar_chunks ] && ls tar_chunks/extracted_data.tar.part_* 1>/dev/null 2>&1; then
echo "=== Chunk files info ==="
ls -lah tar_chunks/
cd tar_chunks
# Reassemble tar with explicit sorting
echo "=== Reassembling tar file ==="
ls -1 extracted_data.tar.part_?? | sort | while read part; do
echo "Appending $part..."
cat "$part" >> ../extracted_data.tar
done
cd ..
echo "=== Reassembled tar file info ==="
if [ -f extracted_data.tar ]; then
echo "=== Tar file info ==="
ls -lah extracted_data.tar
# Verify checksum of reassembled tar matches original
echo "=== Verifying reassembled tar checksum ==="
echo "Original checksum:"
cat tar_chunks/full_tar.sha256
echo "Reassembled checksum:"
sha256sum extracted_data.tar
sha256sum -c tar_chunks/full_tar.sha256 || { echo "ERROR: Reassembled tar checksum mismatch - data corrupted during transfer"; exit 1; }
echo "Checksum verified - data integrity confirmed"
rm -rf tar_chunks
echo "=== Verifying tar integrity ==="
tar -tf extracted_data.tar > /dev/null || { echo "ERROR: Tar file is corrupted"; exit 1; }
echo "=== Extracting ==="
tar -xvf extracted_data.tar
rm extracted_data.tar
@@ -197,7 +159,7 @@ jobs:
echo "=== Contents of data/output ==="
ls -lah
else
echo "No tar chunks found"
echo "No extracted_data.tar found"
echo "has_data=false" >> "$GITHUB_OUTPUT"
fi
@@ -5,11 +5,6 @@ on:
# 6:00pm UTC every day - runs on default branch, triggers both
- cron: "0 06 * * *"
workflow_dispatch:
inputs:
date:
description: 'Date to process (YYYY-MM-DD format, default: yesterday)'
required: false
type: string
permissions:
contents: write
@@ -63,7 +58,7 @@ jobs:
- name: Run FAA release script
run: |
python src/create_daily_faa_release.py ${{ inputs.date && format('--date {0}', inputs.date) || '' }}
python src/create_daily_faa_release.py
ls -lah data/faa_releasable
ls -lah data/openairframes
@@ -98,10 +93,8 @@ jobs:
pip install -r requirements.txt
- name: Download and extract ADS-B data
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: |
python -m src.adsb.download_and_list_icaos ${{ inputs.date && format('--date {0}', inputs.date) || '' }}
python -m src.adsb.download_and_list_icaos
ls -lah data/output/
- name: Check manifest exists
@@ -171,7 +164,7 @@ jobs:
- name: Process chunk ${{ matrix.chunk }}
run: |
python -m src.adsb.process_icao_chunk --chunk-id ${{ matrix.chunk }} --total-chunks 4 ${{ inputs.date && format('--date {0}', inputs.date) || '' }}
python -m src.adsb.process_icao_chunk --chunk-id ${{ matrix.chunk }} --total-chunks 4
mkdir -p data/output/adsb_chunks
ls -lah data/output/adsb_chunks/ || echo "No chunks created"
@@ -220,7 +213,7 @@ jobs:
run: |
mkdir -p data/output/adsb_chunks
ls -lah data/output/adsb_chunks/ || echo "Directory empty or does not exist"
python -m src.adsb.combine_chunks_to_csv --chunks-dir data/output/adsb_chunks ${{ inputs.date && format('--date {0}', inputs.date) || '' }}
python -m src.adsb.combine_chunks_to_csv --chunks-dir data/output/adsb_chunks
ls -lah data/openairframes/
- name: Upload ADS-B artifacts
@@ -266,13 +259,6 @@ jobs:
needs: [build-faa, adsb-reduce, build-community]
if: github.event_name != 'schedule'
steps:
- name: Checkout for gh CLI
uses: actions/checkout@v4
with:
sparse-checkout: |
.github
sparse-checkout-cone-mode: false
- name: Download FAA artifacts
uses: actions/download-artifact@v4
with:
@@ -293,8 +279,6 @@ jobs:
- name: Debug artifact structure
run: |
echo "=== Full artifacts tree ==="
find artifacts -type f 2>/dev/null || echo "No files found in artifacts"
echo "=== FAA artifacts ==="
find artifacts/faa -type f 2>/dev/null || echo "No files found in artifacts/faa"
echo "=== ADS-B artifacts ==="
@@ -316,35 +300,13 @@ jobs:
TAG="openairframes-${DATE}${BRANCH_SUFFIX}"
# Find files from artifacts using find (handles nested structures)
CSV_FILE_FAA=$(find artifacts/faa -name "openairframes_faa_*.csv" -type f 2>/dev/null | head -1)
CSV_FILE_ADSB=$(find artifacts/adsb -name "openairframes_adsb_*.csv" -type f 2>/dev/null | head -1)
CSV_FILE_COMMUNITY=$(find artifacts/community -name "openairframes_community_*.csv" -type f 2>/dev/null | head -1)
ZIP_FILE=$(find artifacts/faa -name "ReleasableAircraft_*.zip" -type f 2>/dev/null | head -1)
# Validate required files exist
MISSING_FILES=""
if [ -z "$CSV_FILE_FAA" ] || [ ! -f "$CSV_FILE_FAA" ]; then
MISSING_FILES="$MISSING_FILES FAA_CSV"
fi
if [ -z "$CSV_FILE_ADSB" ] || [ ! -f "$CSV_FILE_ADSB" ]; then
MISSING_FILES="$MISSING_FILES ADSB_CSV"
fi
if [ -z "$ZIP_FILE" ] || [ ! -f "$ZIP_FILE" ]; then
MISSING_FILES="$MISSING_FILES FAA_ZIP"
fi
if [ -n "$MISSING_FILES" ]; then
echo "ERROR: Missing required release files:$MISSING_FILES"
echo "FAA CSV: $CSV_FILE_FAA"
echo "ADSB CSV: $CSV_FILE_ADSB"
echo "ZIP: $ZIP_FILE"
exit 1
fi
# Get basenames for display
CSV_FILE_FAA=$(find artifacts/faa -name "openairframes_faa_*.csv" | head -1)
CSV_BASENAME_FAA=$(basename "$CSV_FILE_FAA")
CSV_FILE_ADSB=$(find artifacts/adsb -name "openairframes_adsb_*.csv" | head -1)
CSV_BASENAME_ADSB=$(basename "$CSV_FILE_ADSB")
CSV_FILE_COMMUNITY=$(find artifacts/community -name "openairframes_community_*.csv" 2>/dev/null | head -1 || echo "")
CSV_BASENAME_COMMUNITY=$(basename "$CSV_FILE_COMMUNITY" 2>/dev/null || echo "")
ZIP_FILE=$(find artifacts/faa -name "ReleasableAircraft_*.zip" | head -1)
ZIP_BASENAME=$(basename "$ZIP_FILE")
echo "date=$DATE" >> "$GITHUB_OUTPUT"
@@ -358,12 +320,9 @@ jobs:
echo "zip_file=$ZIP_FILE" >> "$GITHUB_OUTPUT"
echo "zip_basename=$ZIP_BASENAME" >> "$GITHUB_OUTPUT"
echo "name=OpenAirframes snapshot ($DATE)${BRANCH_SUFFIX}" >> "$GITHUB_OUTPUT"
echo "Found files:"
echo " FAA CSV: $CSV_FILE_FAA"
echo " ADSB CSV: $CSV_FILE_ADSB"
echo " Community CSV: $CSV_FILE_COMMUNITY"
echo " ZIP: $ZIP_FILE"
- name: Checkout for gh CLI
uses: actions/checkout@v4
- name: Delete existing release if exists
run: |
@@ -377,7 +336,6 @@ jobs:
with:
tag_name: ${{ steps.meta.outputs.tag }}
name: ${{ steps.meta.outputs.name }}
fail_on_unmatched_files: true
body: |
Automated daily snapshot generated at 06:00 UTC for ${{ steps.meta.outputs.date }}.
+18 -41
@@ -48,52 +48,29 @@ jobs:
git fetch origin "$branch_name"
git checkout "$branch_name"
# Merge main into PR branch
git config user.name "github-actions[bot]"
git config user.email "github-actions[bot]@users.noreply.github.com"
# Get the community submission file(s) and schema from this branch
community_files=$(git diff --name-only origin/main...HEAD -- 'community/' 'schemas/')
if [ -z "$community_files" ]; then
echo " No community/schema files found in PR #$pr_number, skipping"
git checkout main
continue
fi
echo " Files to preserve: $community_files"
# Save the community files content
mkdir -p /tmp/pr_files
for file in $community_files; do
if [ -f "$file" ]; then
mkdir -p "/tmp/pr_files/$(dirname "$file")"
cp "$file" "/tmp/pr_files/$file"
if git merge origin/main -m "Merge main to update schema"; then
# Regenerate schema for this PR's submission (adds any new tags)
python -m src.contributions.regenerate_pr_schema || true
# If there are changes, commit and push
if [ -n "$(git status --porcelain schemas/)" ]; then
git add schemas/
git commit -m "Update schema with new tags"
git push origin "$branch_name"
echo " Updated PR #$pr_number with schema changes"
else
git push origin "$branch_name"
echo " Merged main into PR #$pr_number"
fi
done
# Reset branch to main (clean slate)
git reset --hard origin/main
# Restore the community files
for file in $community_files; do
if [ -f "/tmp/pr_files/$file" ]; then
mkdir -p "$(dirname "$file")"
cp "/tmp/pr_files/$file" "$file"
fi
done
rm -rf /tmp/pr_files
# Regenerate schema with current main + this submission's tags
python -m src.contributions.regenerate_pr_schema || true
# Stage and commit all changes
git add community/ schemas/
if ! git diff --cached --quiet; then
git commit -m "Community submission (rebased on main)"
git push --force origin "$branch_name"
echo " Rebased PR #$pr_number onto main"
else
echo " No changes needed for PR #$pr_number"
echo " Merge conflict in PR #$pr_number, adding comment"
gh pr comment "$pr_number" --body $'⚠️ **Merge Conflict**\n\nAnother community submission was merged and this PR has conflicts.\n\nA maintainer may need to:\n1. Close this PR\n2. Remove the `approved` label from the original issue\n3. Re-add the `approved` label to regenerate the PR'
git merge --abort
fi
fi
git checkout main
+1 -50
@@ -1,50 +1 @@
# OpenAirframes.org
OpenAirframes.org is an open-source, community-driven airframes database.
The data includes:
- Registration information from Civil Aviation Authorities (FAA)
- Airline data (e.g., Air France)
- Community contributions such as ownership details, military aircraft info, photos, and more
---
## For Users
A daily release is created at **06:00 UTC** and includes:
- **openairframes_community.csv**
All community submissions
- **openairframes_faa.csv**
All [FAA registration data](https://www.faa.gov/licenses_certificates/aircraft_certification/aircraft_registry/releasable_aircraft_download) from 2023-08-16 to present (~260 MB)
- **openairframes_adsb.csv**
Airframe information derived from ADS-B messages on the [ADSB.lol](https://www.adsb.lol/) network, from 2026-02-12 to present (will be from 2024-01-01 soon). The airframe information originates from [mictronics aircraft database](https://www.mictronics.de/aircraft-database/) (~5 MB).
- **ReleasableAircraft_{date}.zip**
A daily snapshot of the FAA database, which updates at **05:30 UTC**
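For scripted access to those assets, a minimal standard-library sketch; the repository path comes from the links in this README and the `openairframes_faa_*.csv` name pattern from the release workflow, but the exact asset names in any given release are an assumption:

```python
# Download the FAA CSV asset from the latest OpenAirframes release.
# Asset names are assumed to follow the openairframes_faa_*.csv pattern
# produced by the release workflow.
import json
import urllib.request

API = "https://api.github.com/repos/PlaneQuery/OpenAirframes/releases/latest"

with urllib.request.urlopen(API) as resp:
    release = json.load(resp)

asset = next(a for a in release["assets"] if a["name"].startswith("openairframes_faa_"))
print(f"Fetching {asset['name']} ({asset['size'] / 1e6:.0f} MB)")
urllib.request.urlretrieve(asset["browser_download_url"], asset["name"])
```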
---
## For Contributors
Submit data via a [GitHub Issue](https://github.com/PlaneQuery/OpenAirframes/issues/new?template=community_submission.yaml) with your preferred attribution. Once approved, it will appear in the daily release. A leaderboard will be available in the future.
All data is valuable. Examples include:
- Celebrity ownership (with citations)
- Photos
- Internet capability
- Military aircraft information
- Unique facts (e.g., an airframe that crashed, performs aerobatics, etc.)
Please try to follow the submission formatting guidelines. If you are struggling with them, that is fine—submit your data anyway and it will be formatted for you.
---
## For Developers
All code, compute (GitHub Actions), and storage (releases) are in this GitHub repository. Improvements are welcome. Potential features include:
- Web UI for data
- Web UI for contributors
- Additional export formats in the daily release
- Data fusion from multiple sources in the daily release
- Automated airframe data connectors, including (but not limited to) civil aviation authorities and airline APIs
Downloads [`https://registry.faa.gov/database/ReleasableAircraft.zip`](https://registry.faa.gov/database/ReleasableAircraft.zip). Creates a daily GitHub Release at 06:00 UTC containing the unaltered `ReleasableAircraft.zip` and a derived CSV file with all data from the FAA database since 2023-08-16. The FAA database updates daily at 05:30 UTC.
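A hedged, standard-library sketch of the fetch that daily job performs; the production logic lives in `src/create_daily_faa_release.py`, and the output directory and filename stamp here mirror those used elsewhere in the repo rather than being guaranteed:

```python
# Fetch the FAA releasable-aircraft snapshot, stamped with today's UTC date.
# Illustrative only; see src/create_daily_faa_release.py for the real job.
import urllib.request
from datetime import datetime, timezone
from pathlib import Path

FAA_URL = "https://registry.faa.gov/database/ReleasableAircraft.zip"

date_str = datetime.now(timezone.utc).strftime("%Y-%m-%d")
out_dir = Path("data/faa_releasable")
out_dir.mkdir(parents=True, exist_ok=True)

dest = out_dir / f"ReleasableAircraft_{date_str}.zip"
urllib.request.urlretrieve(FAA_URL, str(dest))
print(f"Saved {dest}")
```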
+11
@@ -0,0 +1,11 @@
#!/usr/bin/env python3
import os
import aws_cdk as cdk
from stack import AdsbProcessingStack
app = cdk.App()
AdsbProcessingStack(app, "AdsbProcessingStack", env=cdk.Environment(
account=os.environ["CDK_DEFAULT_ACCOUNT"],
region=os.environ["CDK_DEFAULT_REGION"],
))
app.synth()
+3
@@ -0,0 +1,3 @@
{
"app": "python3 app.py"
}
+2
@@ -0,0 +1,2 @@
aws-cdk-lib>=2.170.0
constructs>=10.0.0
+213
@@ -0,0 +1,213 @@
import aws_cdk as cdk
from aws_cdk import (
Stack,
Duration,
RemovalPolicy,
aws_s3 as s3,
aws_ecs as ecs,
aws_ec2 as ec2,
aws_ecr_assets,
aws_iam as iam,
aws_logs as logs,
aws_stepfunctions as sfn,
aws_stepfunctions_tasks as sfn_tasks,
)
from constructs import Construct
from pathlib import Path
class AdsbProcessingStack(Stack):
def __init__(self, scope: Construct, id: str, **kwargs):
super().__init__(scope, id, **kwargs)
# --- S3 bucket for intermediate and final results ---
bucket = s3.Bucket(
self, "ResultsBucket",
bucket_name="openairframes-dev",
removal_policy=RemovalPolicy.DESTROY,
auto_delete_objects=True,
lifecycle_rules=[
s3.LifecycleRule(
prefix="intermediate/",
expiration=Duration.days(7),
)
],
)
# --- Use default VPC (no additional cost) ---
vpc = ec2.Vpc.from_lookup(
self, "Vpc",
is_default=True,
)
# --- ECS Cluster ---
cluster = ecs.Cluster(
self, "Cluster",
vpc=vpc,
container_insights=True,
)
# --- Log group ---
log_group = logs.LogGroup(
self, "LogGroup",
log_group_name="/adsb-processing",
removal_policy=RemovalPolicy.DESTROY,
retention=logs.RetentionDays.TWO_WEEKS,
)
# --- Docker images (built from local Dockerfiles) ---
adsb_dir = str(Path(__file__).parent.parent / "src" / "adsb")
worker_image = ecs.ContainerImage.from_asset(
adsb_dir,
file="Dockerfile.worker",
platform=cdk.aws_ecr_assets.Platform.LINUX_ARM64,
)
reducer_image = ecs.ContainerImage.from_asset(
adsb_dir,
file="Dockerfile.reducer",
platform=cdk.aws_ecr_assets.Platform.LINUX_ARM64,
)
# --- Task role (shared) ---
task_role = iam.Role(
self, "TaskRole",
assumed_by=iam.ServicePrincipal("ecs-tasks.amazonaws.com"),
)
bucket.grant_read_write(task_role)
# --- MAP: worker task definition ---
map_task_def = ecs.FargateTaskDefinition(
self, "MapTaskDef",
cpu=4096, # 4 vCPU
memory_limit_mib=30720, # 30 GB
task_role=task_role,
runtime_platform=ecs.RuntimePlatform(
cpu_architecture=ecs.CpuArchitecture.ARM64,
operating_system_family=ecs.OperatingSystemFamily.LINUX,
),
)
map_container = map_task_def.add_container(
"worker",
image=worker_image,
logging=ecs.LogDrivers.aws_logs(
stream_prefix="map",
log_group=log_group,
),
environment={
"S3_BUCKET": bucket.bucket_name,
},
)
# --- REDUCE: reducer task definition ---
reduce_task_def = ecs.FargateTaskDefinition(
self, "ReduceTaskDef",
cpu=4096, # 4 vCPU
memory_limit_mib=30720, # 30 GB — must hold full year in memory
task_role=task_role,
runtime_platform=ecs.RuntimePlatform(
cpu_architecture=ecs.CpuArchitecture.ARM64,
operating_system_family=ecs.OperatingSystemFamily.LINUX,
),
)
reduce_container = reduce_task_def.add_container(
"reducer",
image=reducer_image,
logging=ecs.LogDrivers.aws_logs(
stream_prefix="reduce",
log_group=log_group,
),
environment={
"S3_BUCKET": bucket.bucket_name,
},
)
# --- Step Functions ---
# Map task: run ECS Fargate for each date chunk
map_ecs_task = sfn_tasks.EcsRunTask(
self, "ProcessChunk",
integration_pattern=sfn.IntegrationPattern.RUN_JOB,
cluster=cluster,
task_definition=map_task_def,
launch_target=sfn_tasks.EcsFargateLaunchTarget(
platform_version=ecs.FargatePlatformVersion.LATEST,
),
container_overrides=[
sfn_tasks.ContainerOverride(
container_definition=map_container,
environment=[
sfn_tasks.TaskEnvironmentVariable(
name="START_DATE",
value=sfn.JsonPath.string_at("$.start_date"),
),
sfn_tasks.TaskEnvironmentVariable(
name="END_DATE",
value=sfn.JsonPath.string_at("$.end_date"),
),
sfn_tasks.TaskEnvironmentVariable(
name="RUN_ID",
value=sfn.JsonPath.string_at("$.run_id"),
),
],
)
],
assign_public_ip=True,
subnets=ec2.SubnetSelection(subnet_type=ec2.SubnetType.PUBLIC),
result_path="$.task_result",
)
# Map state — max 3 concurrent workers
map_state = sfn.Map(
self, "FanOutChunks",
items_path="$.chunks",
max_concurrency=3,
result_path="$.map_results",
)
map_state.item_processor(map_ecs_task)
# Reduce task: combine all chunk CSVs
reduce_ecs_task = sfn_tasks.EcsRunTask(
self, "ReduceResults",
integration_pattern=sfn.IntegrationPattern.RUN_JOB,
cluster=cluster,
task_definition=reduce_task_def,
launch_target=sfn_tasks.EcsFargateLaunchTarget(
platform_version=ecs.FargatePlatformVersion.LATEST,
),
container_overrides=[
sfn_tasks.ContainerOverride(
container_definition=reduce_container,
environment=[
sfn_tasks.TaskEnvironmentVariable(
name="RUN_ID",
value=sfn.JsonPath.string_at("$.run_id"),
),
sfn_tasks.TaskEnvironmentVariable(
name="GLOBAL_START_DATE",
value=sfn.JsonPath.string_at("$.global_start_date"),
),
sfn_tasks.TaskEnvironmentVariable(
name="GLOBAL_END_DATE",
value=sfn.JsonPath.string_at("$.global_end_date"),
),
],
)
],
assign_public_ip=True,
subnets=ec2.SubnetSelection(subnet_type=ec2.SubnetType.PUBLIC),
)
# Chain: fan-out map → reduce
definition = map_state.next(reduce_ecs_task)
sfn.StateMachine(
self, "Pipeline",
state_machine_name="adsb-map-reduce",
definition_body=sfn.DefinitionBody.from_chainable(definition),
timeout=Duration.hours(48),
)
# --- Outputs ---
cdk.CfnOutput(self, "BucketName", value=bucket.bucket_name)
cdk.CfnOutput(self, "StateMachineName", value="adsb-map-reduce")
+2 -11
@@ -1,15 +1,6 @@
from pathlib import Path
from datetime import datetime, timezone, timedelta
import argparse
parser = argparse.ArgumentParser(description="Create daily FAA release")
parser.add_argument("--date", type=str, help="Date to process (YYYY-MM-DD format, default: today)")
args = parser.parse_args()
if args.date:
date_str = args.date
else:
date_str = datetime.now(timezone.utc).strftime("%Y-%m-%d")
from datetime import datetime, timezone
date_str = datetime.now(timezone.utc).strftime("%Y-%m-%d")
out_dir = Path("data/faa_releasable")
out_dir.mkdir(parents=True, exist_ok=True)
+90
@@ -0,0 +1,90 @@
"""
Generate Step Functions input and start the pipeline.
Usage:
python trigger_pipeline.py 2024-01-01 2025-01-01
python trigger_pipeline.py 2024-01-01 2025-01-01 --chunk-days 30
python trigger_pipeline.py 2024-01-01 2025-01-01 --dry-run
"""
import argparse
import json
import os
import uuid
from datetime import datetime, timedelta
import boto3
def generate_chunks(start_date: str, end_date: str, chunk_days: int = 1):
"""Split a date range into chunks of chunk_days."""
start = datetime.strptime(start_date, "%Y-%m-%d")
end = datetime.strptime(end_date, "%Y-%m-%d")
chunks = []
current = start
while current < end:
chunk_end = min(current + timedelta(days=chunk_days), end)
chunks.append({
"start_date": current.strftime("%Y-%m-%d"),
"end_date": chunk_end.strftime("%Y-%m-%d"),
})
current = chunk_end
return chunks
def main():
parser = argparse.ArgumentParser(description="Trigger ADS-B map-reduce pipeline")
parser.add_argument("start_date", help="Start date (YYYY-MM-DD, inclusive)")
parser.add_argument("end_date", help="End date (YYYY-MM-DD, exclusive)")
parser.add_argument("--chunk-days", type=int, default=1,
help="Days per chunk (default: 1)")
parser.add_argument("--dry-run", action="store_true",
help="Print input JSON without starting execution")
args = parser.parse_args()
run_id = f"run-{datetime.utcnow().strftime('%Y%m%dT%H%M%S')}-{uuid.uuid4().hex[:8]}"
chunks = generate_chunks(args.start_date, args.end_date, args.chunk_days)
# Inject run_id into each chunk
for chunk in chunks:
chunk["run_id"] = run_id
sfn_input = {
"run_id": run_id,
"global_start_date": args.start_date,
"global_end_date": args.end_date,
"chunks": chunks,
}
print(f"Run ID: {run_id}")
print(f"Chunks: {len(chunks)} (at {args.chunk_days} days each)")
print(f"Max concurrency: 3 (enforced by Step Functions Map state)")
print()
print(json.dumps(sfn_input, indent=2))
if args.dry_run:
print("\n--dry-run: not starting execution")
return
client = boto3.client("stepfunctions")
# Find the state machine ARN
machines = client.list_state_machines()["stateMachines"]
arn = next(
m["stateMachineArn"]
for m in machines
if m["name"] == "adsb-map-reduce"
)
response = client.start_execution(
stateMachineArn=arn,
name=run_id,
input=json.dumps(sfn_input),
)
print(f"\nStarted execution: {response['executionArn']}")
if __name__ == "__main__":
main()