mirror of https://github.com/PlaneQuery/OpenAirframes.git, synced 2026-04-23 19:46:09 +02:00
delete aws
@@ -1,11 +0,0 @@
#!/usr/bin/env python3
import os
import aws_cdk as cdk
from stack import AdsbProcessingStack

app = cdk.App()
AdsbProcessingStack(app, "AdsbProcessingStack", env=cdk.Environment(
    account=os.environ["CDK_DEFAULT_ACCOUNT"],
    region=os.environ["CDK_DEFAULT_REGION"],
))
app.synth()
@@ -1,3 +0,0 @@
{
    "app": "python3 app.py"
}
@@ -1,2 +0,0 @@
aws-cdk-lib>=2.170.0
constructs>=10.0.0
@@ -1,213 +0,0 @@
import aws_cdk as cdk
from aws_cdk import (
    Stack,
    Duration,
    RemovalPolicy,
    aws_s3 as s3,
    aws_ecs as ecs,
    aws_ec2 as ec2,
    aws_ecr_assets,
    aws_iam as iam,
    aws_logs as logs,
    aws_stepfunctions as sfn,
    aws_stepfunctions_tasks as sfn_tasks,
)
from constructs import Construct
from pathlib import Path


class AdsbProcessingStack(Stack):
    def __init__(self, scope: Construct, id: str, **kwargs):
        super().__init__(scope, id, **kwargs)

        # --- S3 bucket for intermediate and final results ---
        bucket = s3.Bucket(
            self, "ResultsBucket",
            bucket_name="openairframes-dev",
            removal_policy=RemovalPolicy.DESTROY,
            auto_delete_objects=True,
            lifecycle_rules=[
                s3.LifecycleRule(
                    prefix="intermediate/",
                    expiration=Duration.days(7),
                )
            ],
        )

        # --- Use default VPC (no additional cost) ---
        vpc = ec2.Vpc.from_lookup(
            self, "Vpc",
            is_default=True,
        )

        # --- ECS Cluster ---
        cluster = ecs.Cluster(
            self, "Cluster",
            vpc=vpc,
            container_insights=True,
        )

        # --- Log group ---
        log_group = logs.LogGroup(
            self, "LogGroup",
            log_group_name="/adsb-processing",
            removal_policy=RemovalPolicy.DESTROY,
            retention=logs.RetentionDays.TWO_WEEKS,
        )

        # --- Docker images (built from local Dockerfiles) ---
        adsb_dir = str(Path(__file__).parent.parent / "src" / "adsb")

        worker_image = ecs.ContainerImage.from_asset(
            adsb_dir,
            file="Dockerfile.worker",
            platform=aws_ecr_assets.Platform.LINUX_ARM64,
        )
        reducer_image = ecs.ContainerImage.from_asset(
            adsb_dir,
            file="Dockerfile.reducer",
            platform=aws_ecr_assets.Platform.LINUX_ARM64,
        )

        # --- Task role (shared) ---
        task_role = iam.Role(
            self, "TaskRole",
            assumed_by=iam.ServicePrincipal("ecs-tasks.amazonaws.com"),
        )
        bucket.grant_read_write(task_role)

        # --- MAP: worker task definition ---
        map_task_def = ecs.FargateTaskDefinition(
            self, "MapTaskDef",
            cpu=4096,  # 4 vCPU
            memory_limit_mib=30720,  # 30 GB
            task_role=task_role,
            runtime_platform=ecs.RuntimePlatform(
                cpu_architecture=ecs.CpuArchitecture.ARM64,
                operating_system_family=ecs.OperatingSystemFamily.LINUX,
            ),
        )
        map_container = map_task_def.add_container(
            "worker",
            image=worker_image,
            logging=ecs.LogDrivers.aws_logs(
                stream_prefix="map",
                log_group=log_group,
            ),
            environment={
                "S3_BUCKET": bucket.bucket_name,
            },
        )

        # --- REDUCE: reducer task definition ---
        reduce_task_def = ecs.FargateTaskDefinition(
            self, "ReduceTaskDef",
            cpu=4096,  # 4 vCPU
            memory_limit_mib=30720,  # 30 GB — must hold full year in memory
            task_role=task_role,
            runtime_platform=ecs.RuntimePlatform(
                cpu_architecture=ecs.CpuArchitecture.ARM64,
                operating_system_family=ecs.OperatingSystemFamily.LINUX,
            ),
        )
        reduce_container = reduce_task_def.add_container(
            "reducer",
            image=reducer_image,
            logging=ecs.LogDrivers.aws_logs(
                stream_prefix="reduce",
                log_group=log_group,
            ),
            environment={
                "S3_BUCKET": bucket.bucket_name,
            },
        )

        # --- Step Functions ---

        # Map task: run ECS Fargate for each date chunk
        map_ecs_task = sfn_tasks.EcsRunTask(
            self, "ProcessChunk",
            integration_pattern=sfn.IntegrationPattern.RUN_JOB,
            cluster=cluster,
            task_definition=map_task_def,
            launch_target=sfn_tasks.EcsFargateLaunchTarget(
                platform_version=ecs.FargatePlatformVersion.LATEST,
            ),
            container_overrides=[
                sfn_tasks.ContainerOverride(
                    container_definition=map_container,
                    environment=[
                        sfn_tasks.TaskEnvironmentVariable(
                            name="START_DATE",
                            value=sfn.JsonPath.string_at("$.start_date"),
                        ),
                        sfn_tasks.TaskEnvironmentVariable(
                            name="END_DATE",
                            value=sfn.JsonPath.string_at("$.end_date"),
                        ),
                        sfn_tasks.TaskEnvironmentVariable(
                            name="RUN_ID",
                            value=sfn.JsonPath.string_at("$.run_id"),
                        ),
                    ],
                )
            ],
            assign_public_ip=True,
            subnets=ec2.SubnetSelection(subnet_type=ec2.SubnetType.PUBLIC),
            result_path="$.task_result",
        )

        # Map state — max 3 concurrent workers
        map_state = sfn.Map(
            self, "FanOutChunks",
            items_path="$.chunks",
            max_concurrency=3,
            result_path="$.map_results",
        )
        map_state.item_processor(map_ecs_task)

        # Reduce task: combine all chunk CSVs
        reduce_ecs_task = sfn_tasks.EcsRunTask(
            self, "ReduceResults",
            integration_pattern=sfn.IntegrationPattern.RUN_JOB,
            cluster=cluster,
            task_definition=reduce_task_def,
            launch_target=sfn_tasks.EcsFargateLaunchTarget(
                platform_version=ecs.FargatePlatformVersion.LATEST,
            ),
            container_overrides=[
                sfn_tasks.ContainerOverride(
                    container_definition=reduce_container,
                    environment=[
                        sfn_tasks.TaskEnvironmentVariable(
                            name="RUN_ID",
                            value=sfn.JsonPath.string_at("$.run_id"),
                        ),
                        sfn_tasks.TaskEnvironmentVariable(
                            name="GLOBAL_START_DATE",
                            value=sfn.JsonPath.string_at("$.global_start_date"),
                        ),
                        sfn_tasks.TaskEnvironmentVariable(
                            name="GLOBAL_END_DATE",
                            value=sfn.JsonPath.string_at("$.global_end_date"),
                        ),
                    ],
                )
            ],
            assign_public_ip=True,
            subnets=ec2.SubnetSelection(subnet_type=ec2.SubnetType.PUBLIC),
        )

        # Chain: fan-out map → reduce
        definition = map_state.next(reduce_ecs_task)

        sfn.StateMachine(
            self, "Pipeline",
            state_machine_name="adsb-map-reduce",
            definition_body=sfn.DefinitionBody.from_chainable(definition),
            timeout=Duration.hours(48),
        )

        # --- Outputs ---
        cdk.CfnOutput(self, "BucketName", value=bucket.bucket_name)
        cdk.CfnOutput(self, "StateMachineName", value="adsb-map-reduce")
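For orientation, here is a minimal sketch of how a worker entry point could consume the values the stack wires in above: S3_BUCKET is set on the task definition, while START_DATE, END_DATE, and RUN_ID are injected per chunk by the Map state's container override. The real entry point lives behind Dockerfile.worker and is not part of this diff, so the snippet is illustrative only.

# Illustrative sketch only; the actual worker behind Dockerfile.worker is not shown in this diff.
import os

s3_bucket = os.environ["S3_BUCKET"]      # set on the task definition above
start_date = os.environ["START_DATE"]    # per-chunk value from the Map state override
end_date = os.environ["END_DATE"]        # exclusive end of this chunk
run_id = os.environ["RUN_ID"]            # groups all chunks of one pipeline run
print(f"processing {start_date}..{end_date} for {run_id} into s3://{s3_bucket}/intermediate/")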
@@ -1,90 +0,0 @@
"""
Generate Step Functions input and start the pipeline.

Usage:
    python trigger_pipeline.py 2024-01-01 2025-01-01
    python trigger_pipeline.py 2024-01-01 2025-01-01 --chunk-days 30
    python trigger_pipeline.py 2024-01-01 2025-01-01 --dry-run
"""
import argparse
import json
import os
import uuid
from datetime import datetime, timedelta

import boto3


def generate_chunks(start_date: str, end_date: str, chunk_days: int = 1):
    """Split a date range into chunks of chunk_days."""
    start = datetime.strptime(start_date, "%Y-%m-%d")
    end = datetime.strptime(end_date, "%Y-%m-%d")

    chunks = []
    current = start
    while current < end:
        chunk_end = min(current + timedelta(days=chunk_days), end)
        chunks.append({
            "start_date": current.strftime("%Y-%m-%d"),
            "end_date": chunk_end.strftime("%Y-%m-%d"),
        })
        current = chunk_end

    return chunks


def main():
    parser = argparse.ArgumentParser(description="Trigger ADS-B map-reduce pipeline")
    parser.add_argument("start_date", help="Start date (YYYY-MM-DD, inclusive)")
    parser.add_argument("end_date", help="End date (YYYY-MM-DD, exclusive)")
    parser.add_argument("--chunk-days", type=int, default=1,
                        help="Days per chunk (default: 1)")
    parser.add_argument("--dry-run", action="store_true",
                        help="Print input JSON without starting execution")
    args = parser.parse_args()

    run_id = f"run-{datetime.utcnow().strftime('%Y%m%dT%H%M%S')}-{uuid.uuid4().hex[:8]}"
    chunks = generate_chunks(args.start_date, args.end_date, args.chunk_days)

    # Inject run_id into each chunk
    for chunk in chunks:
        chunk["run_id"] = run_id

    sfn_input = {
        "run_id": run_id,
        "global_start_date": args.start_date,
        "global_end_date": args.end_date,
        "chunks": chunks,
    }

    print(f"Run ID: {run_id}")
    print(f"Chunks: {len(chunks)} (at {args.chunk_days} days each)")
    print("Max concurrency: 3 (enforced by Step Functions Map state)")
    print()
    print(json.dumps(sfn_input, indent=2))

    if args.dry_run:
        print("\n--dry-run: not starting execution")
        return

    client = boto3.client("stepfunctions")

    # Find the state machine ARN
    machines = client.list_state_machines()["stateMachines"]
    arn = next(
        m["stateMachineArn"]
        for m in machines
        if m["name"] == "adsb-map-reduce"
    )

    response = client.start_execution(
        stateMachineArn=arn,
        name=run_id,
        input=json.dumps(sfn_input),
    )

    print(f"\nStarted execution: {response['executionArn']}")


if __name__ == "__main__":
    main()
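As a quick illustration of the input this script assembles (derived from the code above; the exact run_id includes a timestamp and a random suffix, so any concrete value would be made up):

# Illustrative only: generate_chunks() output for a two-day range at the default chunk size.
>>> generate_chunks("2024-01-01", "2024-01-03", chunk_days=1)
[{'start_date': '2024-01-01', 'end_date': '2024-01-02'},
 {'start_date': '2024-01-02', 'end_date': '2024-01-03'}]
# main() then copies the run_id into every chunk and wraps the list as
# {"run_id": ..., "global_start_date": ..., "global_end_date": ..., "chunks": [...]}
# before printing it and, unless --dry-run is given, passing it to start_execution.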