diff --git a/infra/app.py b/infra/app.py deleted file mode 100644 index 83509f6..0000000 --- a/infra/app.py +++ /dev/null @@ -1,11 +0,0 @@ -#!/usr/bin/env python3 -import os -import aws_cdk as cdk -from stack import AdsbProcessingStack - -app = cdk.App() -AdsbProcessingStack(app, "AdsbProcessingStack", env=cdk.Environment( - account=os.environ["CDK_DEFAULT_ACCOUNT"], - region=os.environ["CDK_DEFAULT_REGION"], -)) -app.synth() diff --git a/infra/cdk.json b/infra/cdk.json deleted file mode 100644 index b4baa10..0000000 --- a/infra/cdk.json +++ /dev/null @@ -1,3 +0,0 @@ -{ - "app": "python3 app.py" -} diff --git a/infra/requirements.txt b/infra/requirements.txt deleted file mode 100644 index 32b3387..0000000 --- a/infra/requirements.txt +++ /dev/null @@ -1,2 +0,0 @@ -aws-cdk-lib>=2.170.0 -constructs>=10.0.0 diff --git a/infra/stack.py b/infra/stack.py deleted file mode 100644 index 81c501f..0000000 --- a/infra/stack.py +++ /dev/null @@ -1,213 +0,0 @@ -import aws_cdk as cdk -from aws_cdk import ( - Stack, - Duration, - RemovalPolicy, - aws_s3 as s3, - aws_ecs as ecs, - aws_ec2 as ec2, - aws_ecr_assets, - aws_iam as iam, - aws_logs as logs, - aws_stepfunctions as sfn, - aws_stepfunctions_tasks as sfn_tasks, -) -from constructs import Construct -from pathlib import Path - - -class AdsbProcessingStack(Stack): - def __init__(self, scope: Construct, id: str, **kwargs): - super().__init__(scope, id, **kwargs) - - # --- S3 bucket for intermediate and final results --- - bucket = s3.Bucket( - self, "ResultsBucket", - bucket_name="openairframes-dev", - removal_policy=RemovalPolicy.DESTROY, - auto_delete_objects=True, - lifecycle_rules=[ - s3.LifecycleRule( - prefix="intermediate/", - expiration=Duration.days(7), - ) - ], - ) - - # --- Use default VPC (no additional cost) --- - vpc = ec2.Vpc.from_lookup( - self, "Vpc", - is_default=True, - ) - - # --- ECS Cluster --- - cluster = ecs.Cluster( - self, "Cluster", - vpc=vpc, - container_insights=True, - ) - - # --- Log group --- - log_group = logs.LogGroup( - self, "LogGroup", - log_group_name="/adsb-processing", - removal_policy=RemovalPolicy.DESTROY, - retention=logs.RetentionDays.TWO_WEEKS, - ) - - # --- Docker images (built from local Dockerfiles) --- - adsb_dir = str(Path(__file__).parent.parent / "src" / "adsb") - - worker_image = ecs.ContainerImage.from_asset( - adsb_dir, - file="Dockerfile.worker", - platform=cdk.aws_ecr_assets.Platform.LINUX_ARM64, - ) - reducer_image = ecs.ContainerImage.from_asset( - adsb_dir, - file="Dockerfile.reducer", - platform=cdk.aws_ecr_assets.Platform.LINUX_ARM64, - ) - - # --- Task role (shared) --- - task_role = iam.Role( - self, "TaskRole", - assumed_by=iam.ServicePrincipal("ecs-tasks.amazonaws.com"), - ) - bucket.grant_read_write(task_role) - - # --- MAP: worker task definition --- - map_task_def = ecs.FargateTaskDefinition( - self, "MapTaskDef", - cpu=4096, # 4 vCPU - memory_limit_mib=30720, # 30 GB - task_role=task_role, - runtime_platform=ecs.RuntimePlatform( - cpu_architecture=ecs.CpuArchitecture.ARM64, - operating_system_family=ecs.OperatingSystemFamily.LINUX, - ), - ) - map_container = map_task_def.add_container( - "worker", - image=worker_image, - logging=ecs.LogDrivers.aws_logs( - stream_prefix="map", - log_group=log_group, - ), - environment={ - "S3_BUCKET": bucket.bucket_name, - }, - ) - - # --- REDUCE: reducer task definition --- - reduce_task_def = ecs.FargateTaskDefinition( - self, "ReduceTaskDef", - cpu=4096, # 4 vCPU - memory_limit_mib=30720, # 30 GB — must hold full year in memory - task_role=task_role, - runtime_platform=ecs.RuntimePlatform( - cpu_architecture=ecs.CpuArchitecture.ARM64, - operating_system_family=ecs.OperatingSystemFamily.LINUX, - ), - ) - reduce_container = reduce_task_def.add_container( - "reducer", - image=reducer_image, - logging=ecs.LogDrivers.aws_logs( - stream_prefix="reduce", - log_group=log_group, - ), - environment={ - "S3_BUCKET": bucket.bucket_name, - }, - ) - - # --- Step Functions --- - - # Map task: run ECS Fargate for each date chunk - map_ecs_task = sfn_tasks.EcsRunTask( - self, "ProcessChunk", - integration_pattern=sfn.IntegrationPattern.RUN_JOB, - cluster=cluster, - task_definition=map_task_def, - launch_target=sfn_tasks.EcsFargateLaunchTarget( - platform_version=ecs.FargatePlatformVersion.LATEST, - ), - container_overrides=[ - sfn_tasks.ContainerOverride( - container_definition=map_container, - environment=[ - sfn_tasks.TaskEnvironmentVariable( - name="START_DATE", - value=sfn.JsonPath.string_at("$.start_date"), - ), - sfn_tasks.TaskEnvironmentVariable( - name="END_DATE", - value=sfn.JsonPath.string_at("$.end_date"), - ), - sfn_tasks.TaskEnvironmentVariable( - name="RUN_ID", - value=sfn.JsonPath.string_at("$.run_id"), - ), - ], - ) - ], - assign_public_ip=True, - subnets=ec2.SubnetSelection(subnet_type=ec2.SubnetType.PUBLIC), - result_path="$.task_result", - ) - - # Map state — max 3 concurrent workers - map_state = sfn.Map( - self, "FanOutChunks", - items_path="$.chunks", - max_concurrency=3, - result_path="$.map_results", - ) - map_state.item_processor(map_ecs_task) - - # Reduce task: combine all chunk CSVs - reduce_ecs_task = sfn_tasks.EcsRunTask( - self, "ReduceResults", - integration_pattern=sfn.IntegrationPattern.RUN_JOB, - cluster=cluster, - task_definition=reduce_task_def, - launch_target=sfn_tasks.EcsFargateLaunchTarget( - platform_version=ecs.FargatePlatformVersion.LATEST, - ), - container_overrides=[ - sfn_tasks.ContainerOverride( - container_definition=reduce_container, - environment=[ - sfn_tasks.TaskEnvironmentVariable( - name="RUN_ID", - value=sfn.JsonPath.string_at("$.run_id"), - ), - sfn_tasks.TaskEnvironmentVariable( - name="GLOBAL_START_DATE", - value=sfn.JsonPath.string_at("$.global_start_date"), - ), - sfn_tasks.TaskEnvironmentVariable( - name="GLOBAL_END_DATE", - value=sfn.JsonPath.string_at("$.global_end_date"), - ), - ], - ) - ], - assign_public_ip=True, - subnets=ec2.SubnetSelection(subnet_type=ec2.SubnetType.PUBLIC), - ) - - # Chain: fan-out map → reduce - definition = map_state.next(reduce_ecs_task) - - sfn.StateMachine( - self, "Pipeline", - state_machine_name="adsb-map-reduce", - definition_body=sfn.DefinitionBody.from_chainable(definition), - timeout=Duration.hours(48), - ) - - # --- Outputs --- - cdk.CfnOutput(self, "BucketName", value=bucket.bucket_name) - cdk.CfnOutput(self, "StateMachineName", value="adsb-map-reduce") diff --git a/trigger_pipeline.py b/trigger_pipeline.py deleted file mode 100644 index 56f47d8..0000000 --- a/trigger_pipeline.py +++ /dev/null @@ -1,90 +0,0 @@ -""" -Generate Step Functions input and start the pipeline. - -Usage: - python trigger_pipeline.py 2024-01-01 2025-01-01 - python trigger_pipeline.py 2024-01-01 2025-01-01 --chunk-days 30 - python trigger_pipeline.py 2024-01-01 2025-01-01 --dry-run -""" -import argparse -import json -import os -import uuid -from datetime import datetime, timedelta - -import boto3 - - -def generate_chunks(start_date: str, end_date: str, chunk_days: int = 1): - """Split a date range into chunks of chunk_days.""" - start = datetime.strptime(start_date, "%Y-%m-%d") - end = datetime.strptime(end_date, "%Y-%m-%d") - - chunks = [] - current = start - while current < end: - chunk_end = min(current + timedelta(days=chunk_days), end) - chunks.append({ - "start_date": current.strftime("%Y-%m-%d"), - "end_date": chunk_end.strftime("%Y-%m-%d"), - }) - current = chunk_end - - return chunks - - -def main(): - parser = argparse.ArgumentParser(description="Trigger ADS-B map-reduce pipeline") - parser.add_argument("start_date", help="Start date (YYYY-MM-DD, inclusive)") - parser.add_argument("end_date", help="End date (YYYY-MM-DD, exclusive)") - parser.add_argument("--chunk-days", type=int, default=1, - help="Days per chunk (default: 1)") - parser.add_argument("--dry-run", action="store_true", - help="Print input JSON without starting execution") - args = parser.parse_args() - - run_id = f"run-{datetime.utcnow().strftime('%Y%m%dT%H%M%S')}-{uuid.uuid4().hex[:8]}" - chunks = generate_chunks(args.start_date, args.end_date, args.chunk_days) - - # Inject run_id into each chunk - for chunk in chunks: - chunk["run_id"] = run_id - - sfn_input = { - "run_id": run_id, - "global_start_date": args.start_date, - "global_end_date": args.end_date, - "chunks": chunks, - } - - print(f"Run ID: {run_id}") - print(f"Chunks: {len(chunks)} (at {args.chunk_days} days each)") - print(f"Max concurrency: 3 (enforced by Step Functions Map state)") - print() - print(json.dumps(sfn_input, indent=2)) - - if args.dry_run: - print("\n--dry-run: not starting execution") - return - - client = boto3.client("stepfunctions") - - # Find the state machine ARN - machines = client.list_state_machines()["stateMachines"] - arn = next( - m["stateMachineArn"] - for m in machines - if m["name"] == "adsb-map-reduce" - ) - - response = client.start_execution( - stateMachineArn=arn, - name=run_id, - input=json.dumps(sfn_input), - ) - - print(f"\nStarted execution: {response['executionArn']}") - - -if __name__ == "__main__": - main()