From 27da93801ebc099a3e838db605ad2984da4165a6 Mon Sep 17 00:00:00 2001 From: ggman12 Date: Mon, 9 Feb 2026 18:58:49 -0500 Subject: [PATCH] FEATURE: add historical adsb aircraft data and update daily adsb aircraft data derivation. add clickhouse_connect use 32GB update to no longer do df.copy() Add planequery_adsb_read.ipynb INCREASE: update Fargate task definition to 16 vCPU and 64 GB memory for improved performance on large datasets update notebook remove print(df) Ensure empty strings are preserved in DataFrame columns check if day has data for adsb update notebook --- .../planequery-aircraft-daily-release.yaml | 75 +- .gitignore | 65 +- infra/app.py | 11 + infra/cdk.json | 3 + infra/requirements.txt | 2 + infra/stack.py | 225 ++++++ notebooks/planequery_adsb_read.ipynb | 640 ++++++++++++++++ requirements.txt | 2 +- src/adsb/Dockerfile.reducer | 10 + src/adsb/Dockerfile.worker | 11 + src/adsb/adsb_to_aircraft_data_daily.py | 7 + src/adsb/adsb_to_aircraft_data_historical.py | 87 +++ src/adsb/compress_adsb_to_aircraft_data.py | 170 +++++ src/adsb/download_adsb_data_to_parquet.py | 684 ++++++++++++++++++ src/adsb/reducer.py | 97 +++ src/adsb/requirements.reducer.txt | 2 + src/adsb/requirements.worker.txt | 4 + src/adsb/worker.py | 94 +++ src/concat_csvs.py | 89 --- ..._daily_planequery_aircraft_adsb_release.py | 64 ++ ...reate_daily_planequery_aircraft_release.py | 6 +- src/get_historical_faa.py | 2 +- src/get_latest_planequery_aircraft_release.py | 55 +- trigger_pipeline.py | 97 +++ 24 files changed, 2387 insertions(+), 115 deletions(-) create mode 100644 infra/app.py create mode 100644 infra/cdk.json create mode 100644 infra/requirements.txt create mode 100644 infra/stack.py create mode 100644 notebooks/planequery_adsb_read.ipynb create mode 100644 src/adsb/Dockerfile.reducer create mode 100644 src/adsb/Dockerfile.worker create mode 100644 src/adsb/adsb_to_aircraft_data_daily.py create mode 100644 src/adsb/adsb_to_aircraft_data_historical.py create mode 100644 
src/adsb/compress_adsb_to_aircraft_data.py create mode 100644 src/adsb/download_adsb_data_to_parquet.py create mode 100644 src/adsb/reducer.py create mode 100644 src/adsb/requirements.reducer.txt create mode 100644 src/adsb/requirements.worker.txt create mode 100644 src/adsb/worker.py delete mode 100644 src/concat_csvs.py create mode 100644 src/create_daily_planequery_aircraft_adsb_release.py create mode 100644 trigger_pipeline.py diff --git a/.github/workflows/planequery-aircraft-daily-release.yaml b/.github/workflows/planequery-aircraft-daily-release.yaml index 6e50e58..3cb65f9 100644 --- a/.github/workflows/planequery-aircraft-daily-release.yaml +++ b/.github/workflows/planequery-aircraft-daily-release.yaml @@ -2,27 +2,54 @@ name: planequery-aircraft Daily Release on: schedule: - # 6:00pm UTC every day + # 6:00pm UTC every day - runs on default branch, triggers both - cron: "0 06 * * *" - workflow_dispatch: {} + workflow_dispatch: permissions: contents: write jobs: - build-and-release: + trigger-releases: runs-on: ubuntu-latest + if: github.event_name == 'schedule' + steps: + - name: Trigger main branch release + uses: actions/github-script@v7 + with: + script: | + await github.rest.actions.createWorkflowDispatch({ + owner: context.repo.owner, + repo: context.repo.repo, + workflow_id: 'planequery-aircraft-daily-release.yaml', + ref: 'main' + }); + + - name: Trigger develop branch release + uses: actions/github-script@v7 + with: + script: | + await github.rest.actions.createWorkflowDispatch({ + owner: context.repo.owner, + repo: context.repo.repo, + workflow_id: 'planequery-aircraft-daily-release.yaml', + ref: 'develop' + }); + + build-and-release: + runs-on: ubuntu-24.04-arm + if: github.event_name != 'schedule' steps: - name: Checkout - uses: actions/checkout@v4 + uses: actions/checkout@v6 with: fetch-depth: 0 - name: Setup Python - uses: actions/setup-python@v5 + uses: actions/setup-python@v6 with: - python-version: "3.12" + python-version: "3.14" - name: 
Install dependencies run: | @@ -30,8 +57,13 @@ jobs: pip install -r requirements.txt - name: Run daily release script + env: + CLICKHOUSE_HOST: ${{ secrets.CLICKHOUSE_HOST }} + CLICKHOUSE_USERNAME: ${{ secrets.CLICKHOUSE_USERNAME }} + CLICKHOUSE_PASSWORD: ${{ secrets.CLICKHOUSE_PASSWORD }} run: | python src/create_daily_planequery_aircraft_release.py + python src/create_daily_planequery_aircraft_adsb_release.py ls -lah data/faa_releasable ls -lah data/planequery_aircraft @@ -39,15 +71,26 @@ jobs: id: meta run: | DATE=$(date -u +"%Y-%m-%d") - TAG="planequery-aircraft-${DATE}" - # Find the CSV file in data/planequery_aircraft matching the pattern - CSV_FILE=$(ls data/planequery_aircraft/planequery_aircraft_*_${DATE}.csv | head -1) - CSV_BASENAME=$(basename "$CSV_FILE") + BRANCH_NAME="${GITHUB_REF#refs/heads/}" + BRANCH_SUFFIX="" + if [ "$BRANCH_NAME" = "main" ]; then + BRANCH_SUFFIX="-main" + elif [ "$BRANCH_NAME" = "develop" ]; then + BRANCH_SUFFIX="-develop" + fi + TAG="planequery-aircraft-${DATE}${BRANCH_SUFFIX}" + # Find the CSV files in data/planequery_aircraft matching the patterns + CSV_FILE_FAA=$(ls data/planequery_aircraft/planequery_aircraft_faa_*_${DATE}.csv | head -1) + CSV_BASENAME_FAA=$(basename "$CSV_FILE_FAA") + CSV_FILE_ADSB=$(ls data/planequery_aircraft/planequery_aircraft_adsb_*_${DATE}.csv | head -1) + CSV_BASENAME_ADSB=$(basename "$CSV_FILE_ADSB") echo "date=$DATE" >> "$GITHUB_OUTPUT" echo "tag=$TAG" >> "$GITHUB_OUTPUT" - echo "csv_file=$CSV_FILE" >> "$GITHUB_OUTPUT" - echo "csv_basename=$CSV_BASENAME" >> "$GITHUB_OUTPUT" - echo "name=planequery-aircraft snapshot ($DATE)" >> "$GITHUB_OUTPUT" + echo "csv_file_faa=$CSV_FILE_FAA" >> "$GITHUB_OUTPUT" + echo "csv_basename_faa=$CSV_BASENAME_FAA" >> "$GITHUB_OUTPUT" + echo "csv_file_adsb=$CSV_FILE_ADSB" >> "$GITHUB_OUTPUT" + echo "csv_basename_adsb=$CSV_BASENAME_ADSB" >> "$GITHUB_OUTPUT" + echo "name=planequery-aircraft snapshot ($DATE)${BRANCH_SUFFIX}" >> "$GITHUB_OUTPUT" - name: Create GitHub Release 
and upload assets uses: softprops/action-gh-release@v2 @@ -58,10 +101,12 @@ jobs: Automated daily snapshot generated at 06:00 UTC for ${{ steps.meta.outputs.date }}. Assets: - - ${{ steps.meta.outputs.csv_basename }} + - ${{ steps.meta.outputs.csv_basename_faa }} + - ${{ steps.meta.outputs.csv_basename_adsb }} - ReleasableAircraft_${{ steps.meta.outputs.date }}.zip files: | - ${{ steps.meta.outputs.csv_file }} + ${{ steps.meta.outputs.csv_file_faa }} + ${{ steps.meta.outputs.csv_file_adsb }} data/faa_releasable/ReleasableAircraft_${{ steps.meta.outputs.date }}.zip env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} diff --git a/.gitignore b/.gitignore index 74326ca..7ef2c0b 100644 --- a/.gitignore +++ b/.gitignore @@ -218,4 +218,67 @@ __marimo__/ # Custom data/ .DS_Store -notebooks/ \ No newline at end of file + +# --- CDK --- +# VSCode extension + +# Store launch config in repo but not settings +.vscode/settings.json +/.favorites.json + +# TypeScript incremental build states +*.tsbuildinfo + +# Local state files & OS specifics +.DS_Store +node_modules/ +lerna-debug.log +dist/ +pack/ +.BUILD_COMPLETED +.local-npm/ +.tools/ +coverage/ +.nyc_output +.nycrc +.LAST_BUILD +*.sw[a-z] +*~ +.idea +*.iml +junit.xml + +# We don't want tsconfig at the root +/tsconfig.json + +# CDK Context & Staging files +cdk.context.json +.cdk.staging/ +cdk.out/ +*.tabl.json +cdk-integ.out.*/ + +# Yarn error log +yarn-error.log + +# VSCode history plugin +.vscode/.history/ + +# Cloud9 +.c9 +.nzm-* + +/.versionrc.json +RELEASE_NOTES.md + +# Produced by integ tests +read*lock + +# VSCode jest plugin +.test-output + +# Nx cache +.nx/ + +# jsii-rosetta files +type-fingerprints.txt \ No newline at end of file diff --git a/infra/app.py b/infra/app.py new file mode 100644 index 0000000..83509f6 --- /dev/null +++ b/infra/app.py @@ -0,0 +1,11 @@ +#!/usr/bin/env python3 +import os +import aws_cdk as cdk +from stack import AdsbProcessingStack + +app = cdk.App() +AdsbProcessingStack(app, 
"AdsbProcessingStack", env=cdk.Environment( + account=os.environ["CDK_DEFAULT_ACCOUNT"], + region=os.environ["CDK_DEFAULT_REGION"], +)) +app.synth() diff --git a/infra/cdk.json b/infra/cdk.json new file mode 100644 index 0000000..b4baa10 --- /dev/null +++ b/infra/cdk.json @@ -0,0 +1,3 @@ +{ + "app": "python3 app.py" +} diff --git a/infra/requirements.txt b/infra/requirements.txt new file mode 100644 index 0000000..32b3387 --- /dev/null +++ b/infra/requirements.txt @@ -0,0 +1,2 @@ +aws-cdk-lib>=2.170.0 +constructs>=10.0.0 diff --git a/infra/stack.py b/infra/stack.py new file mode 100644 index 0000000..84f6d49 --- /dev/null +++ b/infra/stack.py @@ -0,0 +1,225 @@ +import aws_cdk as cdk +from aws_cdk import ( + Stack, + Duration, + RemovalPolicy, + aws_s3 as s3, + aws_ecs as ecs, + aws_ec2 as ec2, + aws_ecr_assets, + aws_iam as iam, + aws_logs as logs, + aws_stepfunctions as sfn, + aws_stepfunctions_tasks as sfn_tasks, +) +from constructs import Construct +from pathlib import Path + + +class AdsbProcessingStack(Stack): + def __init__(self, scope: Construct, id: str, **kwargs): + super().__init__(scope, id, **kwargs) + + # --- S3 bucket for intermediate and final results --- + bucket = s3.Bucket( + self, "ResultsBucket", + bucket_name="planequery-aircraft-dev", + removal_policy=RemovalPolicy.DESTROY, + auto_delete_objects=True, + lifecycle_rules=[ + s3.LifecycleRule( + prefix="intermediate/", + expiration=Duration.days(7), + ) + ], + ) + + # --- Use default VPC (no additional cost) --- + vpc = ec2.Vpc.from_lookup( + self, "Vpc", + is_default=True, + ) + + # --- ECS Cluster --- + cluster = ecs.Cluster( + self, "Cluster", + vpc=vpc, + container_insights=True, + ) + + # --- Log group --- + log_group = logs.LogGroup( + self, "LogGroup", + log_group_name="/adsb-processing", + removal_policy=RemovalPolicy.DESTROY, + retention=logs.RetentionDays.TWO_WEEKS, + ) + + # --- Docker images (built from local Dockerfiles) --- + adsb_dir = str(Path(__file__).parent.parent / "src" / 
"adsb") + + worker_image = ecs.ContainerImage.from_asset( + adsb_dir, + file="Dockerfile.worker", + platform=cdk.aws_ecr_assets.Platform.LINUX_ARM64, + ) + reducer_image = ecs.ContainerImage.from_asset( + adsb_dir, + file="Dockerfile.reducer", + platform=cdk.aws_ecr_assets.Platform.LINUX_ARM64, + ) + + # --- Task role (shared) --- + task_role = iam.Role( + self, "TaskRole", + assumed_by=iam.ServicePrincipal("ecs-tasks.amazonaws.com"), + ) + bucket.grant_read_write(task_role) + + # --- MAP: worker task definition --- + map_task_def = ecs.FargateTaskDefinition( + self, "MapTaskDef", + cpu=16384, # 16 vCPU + memory_limit_mib=65536, # 64 GB — high memory for pandas operations on large datasets + task_role=task_role, + runtime_platform=ecs.RuntimePlatform( + cpu_architecture=ecs.CpuArchitecture.ARM64, + operating_system_family=ecs.OperatingSystemFamily.LINUX, + ), + ) + map_container = map_task_def.add_container( + "worker", + image=worker_image, + logging=ecs.LogDrivers.aws_logs( + stream_prefix="map", + log_group=log_group, + ), + environment={ + "S3_BUCKET": bucket.bucket_name, + }, + ) + + # --- REDUCE: reducer task definition --- + reduce_task_def = ecs.FargateTaskDefinition( + self, "ReduceTaskDef", + cpu=4096, # 4 vCPU + memory_limit_mib=30720, # 30 GB — must hold full year in memory + task_role=task_role, + runtime_platform=ecs.RuntimePlatform( + cpu_architecture=ecs.CpuArchitecture.ARM64, + operating_system_family=ecs.OperatingSystemFamily.LINUX, + ), + ) + reduce_container = reduce_task_def.add_container( + "reducer", + image=reducer_image, + logging=ecs.LogDrivers.aws_logs( + stream_prefix="reduce", + log_group=log_group, + ), + environment={ + "S3_BUCKET": bucket.bucket_name, + }, + ) + + # --- Step Functions --- + + # Map task: run ECS Fargate for each date chunk + map_ecs_task = sfn_tasks.EcsRunTask( + self, "ProcessChunk", + integration_pattern=sfn.IntegrationPattern.RUN_JOB, + cluster=cluster, + task_definition=map_task_def, + 
launch_target=sfn_tasks.EcsFargateLaunchTarget( + platform_version=ecs.FargatePlatformVersion.LATEST, + ), + container_overrides=[ + sfn_tasks.ContainerOverride( + container_definition=map_container, + environment=[ + sfn_tasks.TaskEnvironmentVariable( + name="START_DATE", + value=sfn.JsonPath.string_at("$.start_date"), + ), + sfn_tasks.TaskEnvironmentVariable( + name="END_DATE", + value=sfn.JsonPath.string_at("$.end_date"), + ), + sfn_tasks.TaskEnvironmentVariable( + name="RUN_ID", + value=sfn.JsonPath.string_at("$.run_id"), + ), + sfn_tasks.TaskEnvironmentVariable( + name="CLICKHOUSE_HOST", + value=sfn.JsonPath.string_at("$.clickhouse_host"), + ), + sfn_tasks.TaskEnvironmentVariable( + name="CLICKHOUSE_USERNAME", + value=sfn.JsonPath.string_at("$.clickhouse_username"), + ), + sfn_tasks.TaskEnvironmentVariable( + name="CLICKHOUSE_PASSWORD", + value=sfn.JsonPath.string_at("$.clickhouse_password"), + ), + ], + ) + ], + assign_public_ip=True, + subnets=ec2.SubnetSelection(subnet_type=ec2.SubnetType.PUBLIC), + result_path="$.task_result", + ) + + # Map state — max 3 concurrent workers + map_state = sfn.Map( + self, "FanOutChunks", + items_path="$.chunks", + max_concurrency=3, + result_path="$.map_results", + ) + map_state.item_processor(map_ecs_task) + + # Reduce task: combine all chunk CSVs + reduce_ecs_task = sfn_tasks.EcsRunTask( + self, "ReduceResults", + integration_pattern=sfn.IntegrationPattern.RUN_JOB, + cluster=cluster, + task_definition=reduce_task_def, + launch_target=sfn_tasks.EcsFargateLaunchTarget( + platform_version=ecs.FargatePlatformVersion.LATEST, + ), + container_overrides=[ + sfn_tasks.ContainerOverride( + container_definition=reduce_container, + environment=[ + sfn_tasks.TaskEnvironmentVariable( + name="RUN_ID", + value=sfn.JsonPath.string_at("$.run_id"), + ), + sfn_tasks.TaskEnvironmentVariable( + name="GLOBAL_START_DATE", + value=sfn.JsonPath.string_at("$.global_start_date"), + ), + sfn_tasks.TaskEnvironmentVariable( + name="GLOBAL_END_DATE", + 
value=sfn.JsonPath.string_at("$.global_end_date"), + ), + ], + ) + ], + assign_public_ip=True, + subnets=ec2.SubnetSelection(subnet_type=ec2.SubnetType.PUBLIC), + ) + + # Chain: fan-out map → reduce + definition = map_state.next(reduce_ecs_task) + + sfn.StateMachine( + self, "Pipeline", + state_machine_name="adsb-map-reduce", + definition_body=sfn.DefinitionBody.from_chainable(definition), + timeout=Duration.hours(48), + ) + + # --- Outputs --- + cdk.CfnOutput(self, "BucketName", value=bucket.bucket_name) + cdk.CfnOutput(self, "StateMachineName", value="adsb-map-reduce") diff --git a/notebooks/planequery_adsb_read.ipynb b/notebooks/planequery_adsb_read.ipynb new file mode 100644 index 0000000..f0d98e0 --- /dev/null +++ b/notebooks/planequery_adsb_read.ipynb @@ -0,0 +1,640 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "id": "06ae0319", + "metadata": {}, + "outputs": [], + "source": [ + "import clickhouse_connect\n", + "client = clickhouse_connect.get_client(\n", + " host=os.environ[\"CLICKHOUSE_HOST\"],\n", + " username=os.environ[\"CLICKHOUSE_USERNAME\"],\n", + " password=os.environ[\"CLICKHOUSE_PASSWORD\"],\n", + " secure=True,\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "779710f0", + "metadata": {}, + "outputs": [], + "source": [ + "df = client.query_df(\"SELECT time, icao,r,t,dbFlags,ownOp,year,desc,aircraft FROM adsb_messages Where time > '2024-01-01 00:00:00' AND time < '2024-01-02 00:00:00'\")\n", + "df_copy = df.copy()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "bf024da8", + "metadata": {}, + "outputs": [], + "source": [ + "# -- military = dbFlags & 1; interesting = dbFlags & 2; PIA = dbFlags & 4; LADD = dbFlags & 8;" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "270607b5", + "metadata": {}, + "outputs": [], + "source": [ + "df = load_raw_adsb_for_day(datetime(2024,1,1))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + 
"id": "ac06a30e", + "metadata": {}, + "outputs": [], + "source": [ + "df['aircraft']" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "91edab3e", + "metadata": {}, + "outputs": [], + "source": [ + "COLUMNS = ['dbFlags', 'ownOp', 'year', 'desc', 'aircraft_category', 'r', 't']\n", + "def compress_df(df):\n", + " icao = df.name\n", + " df[\"_signature\"] = df[COLUMNS].astype(str).agg('|'.join, axis=1)\n", + " original_df = df.copy()\n", + " df = df.groupby(\"_signature\", as_index=False).last() # check if it works with both last and first.\n", + " # For each row, create a dict of non-empty column values. This is using sets and subsets...\n", + " def get_non_empty_dict(row):\n", + " return {col: row[col] for col in COLUMNS if row[col] != ''}\n", + " \n", + " df['_non_empty_dict'] = df.apply(get_non_empty_dict, axis=1)\n", + " df['_non_empty_count'] = df['_non_empty_dict'].apply(len)\n", + " \n", + " # Check if row i's non-empty values are a subset of row j's non-empty values\n", + " def is_subset_of_any(idx):\n", + " row_dict = df.loc[idx, '_non_empty_dict']\n", + " row_count = df.loc[idx, '_non_empty_count']\n", + " \n", + " for other_idx in df.index:\n", + " if idx == other_idx:\n", + " continue\n", + " other_dict = df.loc[other_idx, '_non_empty_dict']\n", + " other_count = df.loc[other_idx, '_non_empty_count']\n", + " \n", + " # Check if all non-empty values in current row match those in other row\n", + " if all(row_dict.get(k) == other_dict.get(k) for k in row_dict.keys()):\n", + " # If they match and other has more defined columns, current row is redundant\n", + " if other_count > row_count:\n", + " return True\n", + " return False\n", + " \n", + " # Keep rows that are not subsets of any other row\n", + " keep_mask = ~df.index.to_series().apply(is_subset_of_any)\n", + " df = df[keep_mask]\n", + "\n", + " if len(df) > 1:\n", + " original_df = original_df[original_df['_signature'].isin(df['_signature'])]\n", + " value_counts = 
original_df[\"_signature\"].value_counts()\n", + " max_signature = value_counts.idxmax()\n", + " df = df[df['_signature'] == max_signature]\n", + "\n", + " df['icao'] = icao\n", + " df = df.drop(columns=['_non_empty_dict', '_non_empty_count', '_signature'])\n", + " return df\n", + "\n", + "# df = df_copy\n", + "# df = df_copy.iloc[0:100000]\n", + "# df = df[df['r'] == \"N4131T\"]\n", + "# df = df[(df['icao'] == \"008081\")]\n", + "# df = df.iloc[0:500]\n", + "df['aircraft_category'] = df['aircraft'].apply(lambda x: x.get('category') if isinstance(x, dict) else None)\n", + "df = df.drop(columns=['aircraft'])\n", + "df = df.sort_values(['icao', 'time'])\n", + "df[COLUMNS] = df[COLUMNS].fillna('')\n", + "ORIGINAL_COLUMNS = df.columns.tolist()\n", + "df_compressed = df.groupby('icao',group_keys=False).apply(compress_df)\n", + "cols = df_compressed.columns.tolist()\n", + "cols.remove(\"icao\")\n", + "cols.insert(1, \"icao\")\n", + "df_compressed = df_compressed[cols]\n", + "df_compressed" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "efdfcb2c", + "metadata": {}, + "outputs": [], + "source": [ + "df['aircraft_category'] = df['aircraft'].apply(lambda x: x.get('category') if isinstance(x, dict) else None)\n", + "df[~df['aircraft_category'].isna()]" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "495c5025", + "metadata": {}, + "outputs": [], + "source": [ + "# SOME KIND OF MAP REDUCE SYSTEM\n", + "import os\n", + "\n", + "COLUMNS = ['dbFlags', 'ownOp', 'year', 'desc', 'aircraft_category', 'r', 't']\n", + "def compress_df(df):\n", + " icao = df.name\n", + " df[\"_signature\"] = df[COLUMNS].astype(str).agg('|'.join, axis=1)\n", + " \n", + " # Compute signature counts before grouping (avoid copy)\n", + " signature_counts = df[\"_signature\"].value_counts()\n", + " \n", + " df = df.groupby(\"_signature\", as_index=False).first() # check if it works with both last and first.\n", + " # For each row, create a dict of non-empty 
column values. This is using sets and subsets...\n", + " def get_non_empty_dict(row):\n", + " return {col: row[col] for col in COLUMNS if row[col] != ''}\n", + " \n", + " df['_non_empty_dict'] = df.apply(get_non_empty_dict, axis=1)\n", + " df['_non_empty_count'] = df['_non_empty_dict'].apply(len)\n", + " \n", + " # Check if row i's non-empty values are a subset of row j's non-empty values\n", + " def is_subset_of_any(idx):\n", + " row_dict = df.loc[idx, '_non_empty_dict']\n", + " row_count = df.loc[idx, '_non_empty_count']\n", + " \n", + " for other_idx in df.index:\n", + " if idx == other_idx:\n", + " continue\n", + " other_dict = df.loc[other_idx, '_non_empty_dict']\n", + " other_count = df.loc[other_idx, '_non_empty_count']\n", + " \n", + " # Check if all non-empty values in current row match those in other row\n", + " if all(row_dict.get(k) == other_dict.get(k) for k in row_dict.keys()):\n", + " # If they match and other has more defined columns, current row is redundant\n", + " if other_count > row_count:\n", + " return True\n", + " return False\n", + " \n", + " # Keep rows that are not subsets of any other row\n", + " keep_mask = ~df.index.to_series().apply(is_subset_of_any)\n", + " df = df[keep_mask]\n", + "\n", + " if len(df) > 1:\n", + " # Use pre-computed signature counts instead of original_df\n", + " remaining_sigs = df['_signature']\n", + " sig_counts = signature_counts[remaining_sigs]\n", + " max_signature = sig_counts.idxmax()\n", + " df = df[df['_signature'] == max_signature]\n", + "\n", + " df['icao'] = icao\n", + " df = df.drop(columns=['_non_empty_dict', '_non_empty_count', '_signature'])\n", + " return df\n", + "\n", + "# names of releases something like\n", + "# planequery_aircraft_adsb_2024-06-01T00-00-00Z.csv.gz\n", + "\n", + "# Let's build historical first. 
\n", + "\n", + "_ch_client = None\n", + "\n", + "def _get_clickhouse_client():\n", + " \"\"\"Return a reusable ClickHouse client, with retry/backoff for transient DNS or connection errors.\"\"\"\n", + " global _ch_client\n", + " if _ch_client is not None:\n", + " return _ch_client\n", + "\n", + " import clickhouse_connect\n", + " import time\n", + "\n", + " max_retries = 5\n", + " for attempt in range(1, max_retries + 1):\n", + " try:\n", + " _ch_client = clickhouse_connect.get_client(\n", + " host=os.environ[\"CLICKHOUSE_HOST\"],\n", + " username=os.environ[\"CLICKHOUSE_USERNAME\"],\n", + " password=os.environ[\"CLICKHOUSE_PASSWORD\"],\n", + " secure=True,\n", + " )\n", + " return _ch_client\n", + " except Exception as e:\n", + " wait = min(2 ** attempt, 30)\n", + " print(f\" ClickHouse connect attempt {attempt}/{max_retries} failed: {e}\")\n", + " if attempt == max_retries:\n", + " raise\n", + " print(f\" Retrying in {wait}s...\")\n", + " time.sleep(wait)\n", + "\n", + "\n", + "def load_raw_adsb_for_day(day):\n", + " \"\"\"Load raw ADS-B data for a day from cache or ClickHouse.\"\"\"\n", + " from datetime import timedelta\n", + " from pathlib import Path\n", + " import pandas as pd\n", + " import time\n", + " \n", + " start_time = day.replace(hour=0, minute=0, second=0, microsecond=0)\n", + " end_time = start_time + timedelta(days=1)\n", + " \n", + " # Set up caching\n", + " cache_dir = Path(\"data/adsb\")\n", + " cache_dir.mkdir(parents=True, exist_ok=True)\n", + " cache_file = cache_dir / f\"adsb_raw_{start_time.strftime('%Y-%m-%d')}.csv.zst\"\n", + " \n", + " # Check if cache exists\n", + " if cache_file.exists():\n", + " print(f\" Loading from cache: {cache_file}\")\n", + " df = pd.read_csv(cache_file, compression='zstd')\n", + " df['time'] = pd.to_datetime(df['time'])\n", + " else:\n", + " # Format dates for the query\n", + " start_str = start_time.strftime('%Y-%m-%d %H:%M:%S')\n", + " end_str = end_time.strftime('%Y-%m-%d %H:%M:%S')\n", + " \n", + " 
max_retries = 3\n", + " for attempt in range(1, max_retries + 1):\n", + " try:\n", + " client = _get_clickhouse_client()\n", + " print(f\" Querying ClickHouse for {start_time.strftime('%Y-%m-%d')}\")\n", + " df = client.query_df(f\"SELECT time, icao,r,t,dbFlags,ownOp,year,desc,aircraft FROM adsb_messages Where time > '{start_str}' AND time < '{end_str}'\")\n", + " break\n", + " except Exception as e:\n", + " wait = min(2 ** attempt, 30)\n", + " print(f\" Query attempt {attempt}/{max_retries} failed: {e}\")\n", + " if attempt == max_retries:\n", + " raise\n", + " # Reset client in case connection is stale\n", + " global _ch_client\n", + " _ch_client = None\n", + " print(f\" Retrying in {wait}s...\")\n", + " time.sleep(wait)\n", + " \n", + " # Save to cache\n", + " df.to_csv(cache_file, index=False, compression='zstd')\n", + " print(f\" Saved to cache: {cache_file}\")\n", + " \n", + " return df\n", + "\n", + "def load_historical_for_day(day):\n", + " from pathlib import Path\n", + " import pandas as pd\n", + " \n", + " df = load_raw_adsb_for_day(day)\n", + " print(df)\n", + " df['aircraft_category'] = df['aircraft'].apply(lambda x: x.get('category') if isinstance(x, dict) else None)\n", + " df = df.drop(columns=['aircraft'])\n", + " df = df.sort_values(['icao', 'time'])\n", + " df[COLUMNS] = df[COLUMNS].fillna('')\n", + " df_compressed = df.groupby('icao',group_keys=False).apply(compress_df)\n", + " cols = df_compressed.columns.tolist()\n", + " cols.remove('time')\n", + " cols.insert(0, 'time')\n", + " cols.remove(\"icao\")\n", + " cols.insert(1, \"icao\")\n", + " df_compressed = df_compressed[cols]\n", + " return df_compressed\n", + "\n", + "\n", + "def concat_compressed_dfs(df_base, df_new):\n", + " \"\"\"Concatenate base and new compressed dataframes, keeping the most informative row per ICAO.\"\"\"\n", + " import pandas as pd\n", + " \n", + " # Combine both dataframes\n", + " df_combined = pd.concat([df_base, df_new], ignore_index=True)\n", + " \n", + " # Sort by 
ICAO and time\n", + " df_combined = df_combined.sort_values(['icao', 'time'])\n", + " \n", + " # Fill NaN values\n", + " df_combined[COLUMNS] = df_combined[COLUMNS].fillna('')\n", + " \n", + " # Apply compression logic per ICAO to get the best row\n", + " df_compressed = df_combined.groupby('icao', group_keys=False).apply(compress_df)\n", + " \n", + " # Sort by time\n", + " df_compressed = df_compressed.sort_values('time')\n", + " \n", + " return df_compressed\n", + "\n", + "\n", + "def get_latest_aircraft_adsb_csv_df():\n", + " \"\"\"Download and load the latest ADS-B CSV from GitHub releases.\"\"\"\n", + " from get_latest_planequery_aircraft_release import download_latest_aircraft_adsb_csv\n", + " \n", + " import pandas as pd\n", + " import re\n", + " \n", + " csv_path = download_latest_aircraft_adsb_csv()\n", + " df = pd.read_csv(csv_path)\n", + " df = df.fillna(\"\")\n", + " \n", + " # Extract start date from filename pattern: planequery_aircraft_adsb_{start_date}_{end_date}.csv\n", + " match = re.search(r\"planequery_aircraft_adsb_(\\d{4}-\\d{2}-\\d{2})_\", str(csv_path))\n", + " if not match:\n", + " raise ValueError(f\"Could not extract date from filename: {csv_path.name}\")\n", + " \n", + " date_str = match.group(1)\n", + " return df, date_str\n", + "\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "7f66acf7", + "metadata": {}, + "outputs": [], + "source": [ + "# SOME KIND OF MAP REDUCE SYSTEM\n", + "\n", + "\n", + "COLUMNS = ['dbFlags', 'ownOp', 'year', 'desc', 'aircraft_category', 'r', 't']\n", + "def compress_df(df):\n", + " icao = df.name\n", + " df[\"_signature\"] = df[COLUMNS].astype(str).agg('|'.join, axis=1)\n", + " original_df = df.copy()\n", + " df = df.groupby(\"_signature\", as_index=False).first() # check if it works with both last and first.\n", + " # For each row, create a dict of non-empty column values. 
This is using sets and subsets...\n", + " def get_non_empty_dict(row):\n", + " return {col: row[col] for col in COLUMNS if row[col] != ''}\n", + " \n", + " df['_non_empty_dict'] = df.apply(get_non_empty_dict, axis=1)\n", + " df['_non_empty_count'] = df['_non_empty_dict'].apply(len)\n", + " \n", + " # Check if row i's non-empty values are a subset of row j's non-empty values\n", + " def is_subset_of_any(idx):\n", + " row_dict = df.loc[idx, '_non_empty_dict']\n", + " row_count = df.loc[idx, '_non_empty_count']\n", + " \n", + " for other_idx in df.index:\n", + " if idx == other_idx:\n", + " continue\n", + " other_dict = df.loc[other_idx, '_non_empty_dict']\n", + " other_count = df.loc[other_idx, '_non_empty_count']\n", + " \n", + " # Check if all non-empty values in current row match those in other row\n", + " if all(row_dict.get(k) == other_dict.get(k) for k in row_dict.keys()):\n", + " # If they match and other has more defined columns, current row is redundant\n", + " if other_count > row_count:\n", + " return True\n", + " return False\n", + " \n", + " # Keep rows that are not subsets of any other row\n", + " keep_mask = ~df.index.to_series().apply(is_subset_of_any)\n", + " df = df[keep_mask]\n", + "\n", + " if len(df) > 1:\n", + " original_df = original_df[original_df['_signature'].isin(df['_signature'])]\n", + " value_counts = original_df[\"_signature\"].value_counts()\n", + " max_signature = value_counts.idxmax()\n", + " df = df[df['_signature'] == max_signature]\n", + "\n", + " df['icao'] = icao\n", + " df = df.drop(columns=['_non_empty_dict', '_non_empty_count', '_signature'])\n", + " return df\n", + "\n", + "# names of releases something like\n", + "# planequery_aircraft_adsb_2024-06-01T00-00-00Z.csv.gz\n", + "\n", + "# Let's build historical first. 
\n", + "\n", + "def load_raw_adsb_for_day(day):\n", + " \"\"\"Load raw ADS-B data for a day from cache or ClickHouse.\"\"\"\n", + " from datetime import timedelta\n", + " import clickhouse_connect\n", + " from pathlib import Path\n", + " import pandas as pd\n", + " \n", + " start_time = day.replace(hour=0, minute=0, second=0, microsecond=0)\n", + " end_time = start_time + timedelta(days=1)\n", + " \n", + " # Set up caching\n", + " cache_dir = Path(\"data/adsb\")\n", + " cache_dir.mkdir(parents=True, exist_ok=True)\n", + " cache_file = cache_dir / f\"adsb_raw_{start_time.strftime('%Y-%m-%d')}.csv.zst\"\n", + " \n", + " # Check if cache exists\n", + " if cache_file.exists():\n", + " print(f\" Loading from cache: {cache_file}\")\n", + " df = pd.read_csv(cache_file, compression='zstd')\n", + " df['time'] = pd.to_datetime(df['time'])\n", + " else:\n", + " # Format dates for the query\n", + " start_str = start_time.strftime('%Y-%m-%d %H:%M:%S')\n", + " end_str = end_time.strftime('%Y-%m-%d %H:%M:%S')\n", + " \n", + " client = clickhouse_connect.get_client(\n", + " host=os.environ[\"CLICKHOUSE_HOST\"],\n", + " username=os.environ[\"CLICKHOUSE_USERNAME\"],\n", + " password=os.environ[\"CLICKHOUSE_PASSWORD\"],\n", + " secure=True,\n", + " )\n", + " print(f\" Querying ClickHouse for {start_time.strftime('%Y-%m-%d')}\")\n", + " df = client.query_df(f\"SELECT time, icao,r,t,dbFlags,ownOp,year,desc,aircraft FROM adsb_messages Where time > '{start_str}' AND time < '{end_str}'\")\n", + " \n", + " # Save to cache\n", + " df.to_csv(cache_file, index=False, compression='zstd')\n", + " print(f\" Saved to cache: {cache_file}\")\n", + " \n", + " return df\n", + "\n", + "def load_historical_for_day(day):\n", + " from pathlib import Path\n", + " import pandas as pd\n", + " \n", + " df = load_raw_adsb_for_day(day)\n", + " \n", + " df['aircraft_category'] = df['aircraft'].apply(lambda x: x.get('category') if isinstance(x, dict) else None)\n", + " df = df.drop(columns=['aircraft'])\n", + " 
df = df.sort_values(['icao', 'time'])\n", + " df[COLUMNS] = df[COLUMNS].fillna('')\n", + " df_compressed = df.groupby('icao',group_keys=False).apply(compress_df)\n", + " cols = df_compressed.columns.tolist()\n", + " cols.remove('time')\n", + " cols.insert(0, 'time')\n", + " cols.remove(\"icao\")\n", + " cols.insert(1, \"icao\")\n", + " df_compressed = df_compressed[cols]\n", + " return df_compressed\n", + "\n", + "\n", + "def concat_compressed_dfs(df_base, df_new):\n", + " \"\"\"Concatenate base and new compressed dataframes, keeping the most informative row per ICAO.\"\"\"\n", + " import pandas as pd\n", + " \n", + " # Combine both dataframes\n", + " df_combined = pd.concat([df_base, df_new], ignore_index=True)\n", + " \n", + " # Sort by ICAO and time\n", + " df_combined = df_combined.sort_values(['icao', 'time'])\n", + " \n", + " # Fill NaN values\n", + " df_combined[COLUMNS] = df_combined[COLUMNS].fillna('')\n", + " \n", + " # Apply compression logic per ICAO to get the best row\n", + " df_compressed = df_combined.groupby('icao', group_keys=False).apply(compress_df)\n", + " \n", + " # Sort by time\n", + " df_compressed = df_compressed.sort_values('time')\n", + " \n", + " return df_compressed\n", + "\n", + "\n", + "def get_latest_aircraft_adsb_csv_df():\n", + " \"\"\"Download and load the latest ADS-B CSV from GitHub releases.\"\"\"\n", + " from get_latest_planequery_aircraft_release import download_latest_aircraft_adsb_csv\n", + " \n", + " import pandas as pd\n", + " import re\n", + " \n", + " csv_path = download_latest_aircraft_adsb_csv()\n", + " df = pd.read_csv(csv_path)\n", + " df = df.fillna(\"\")\n", + " \n", + " # Extract start date from filename pattern: planequery_aircraft_adsb_{start_date}_{end_date}.csv\n", + " match = re.search(r\"planequery_aircraft_adsb_(\\d{4}-\\d{2}-\\d{2})_\", str(csv_path))\n", + " if not match:\n", + " raise ValueError(f\"Could not extract date from filename: {csv_path.name}\")\n", + " \n", + " date_str = match.group(1)\n", + 
" return df, date_str\n", + "\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "e14c8363", + "metadata": {}, + "outputs": [], + "source": [ + "from datetime import datetime\n", + "df = load_historical_for_day(datetime(2024,1,1))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "3874ba4d", + "metadata": {}, + "outputs": [], + "source": [ + "len(df)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "bcae50ad", + "metadata": {}, + "outputs": [], + "source": [ + "df[(df['icao'] == \"008081\")]" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "50921c86", + "metadata": {}, + "outputs": [], + "source": [ + "df[df['icao'] == \"a4e1d2\"]" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "8194d9aa", + "metadata": {}, + "outputs": [], + "source": [ + "df[df['r'] == \"N4131T\"]" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "1e3b7aa2", + "metadata": {}, + "outputs": [], + "source": [ + "df_compressed[df_compressed['icao'].duplicated(keep=False)]\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "40613bc1", + "metadata": {}, + "outputs": [], + "source": [ + "import gzip\n", + "import json\n", + "\n", + "path = \"/Users/jonahgoode/Downloads/test_extract/traces/fb/trace_full_acbbfb.json\"\n", + "\n", + "with gzip.open(path, \"rt\", encoding=\"utf-8\") as f:\n", + " data = json.load(f)\n", + "\n", + "print(type(data))\n", + "# use `data` here\n", + "import json\n", + "print(json.dumps(data, indent=2)[:2000])\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "320109b2", + "metadata": {}, + "outputs": [], + "source": [ + "# First, load the JSON to inspect its structure\n", + "import json\n", + "with open(\"/Users/jonahgoode/Documents/PlaneQuery/Other-Code/readsb-protobuf/webapp/src/db/aircrafts.json\", 'r') as f:\n", + " data = json.load(f)\n", + "\n", + "# Check the structure\n", + 
"print(type(data))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "590134f4", + "metadata": {}, + "outputs": [], + "source": [ + "data['AC97E3']" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": ".venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.10" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/requirements.txt b/requirements.txt index c30c4f5..387292e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,3 @@ faa-aircraft-registry==0.1.0 pandas==3.0.0 - +clickhouse-connect==0.10.0 diff --git a/src/adsb/Dockerfile.reducer b/src/adsb/Dockerfile.reducer new file mode 100644 index 0000000..ddc06db --- /dev/null +++ b/src/adsb/Dockerfile.reducer @@ -0,0 +1,10 @@ +FROM --platform=linux/arm64 python:3.12-slim + +WORKDIR /app + +COPY requirements.reducer.txt requirements.txt +RUN pip install --no-cache-dir -r requirements.txt + +COPY reducer.py . + +CMD ["python", "-u", "reducer.py"] diff --git a/src/adsb/Dockerfile.worker b/src/adsb/Dockerfile.worker new file mode 100644 index 0000000..e721442 --- /dev/null +++ b/src/adsb/Dockerfile.worker @@ -0,0 +1,11 @@ +FROM --platform=linux/arm64 python:3.12-slim + +WORKDIR /app + +COPY requirements.worker.txt requirements.txt +RUN pip install --no-cache-dir -r requirements.txt + +COPY compress_adsb_to_aircraft_data.py . +COPY worker.py . 
+ +CMD ["python", "-u", "worker.py"] diff --git a/src/adsb/adsb_to_aircraft_data_daily.py b/src/adsb/adsb_to_aircraft_data_daily.py new file mode 100644 index 0000000..380f187 --- /dev/null +++ b/src/adsb/adsb_to_aircraft_data_daily.py @@ -0,0 +1,7 @@ +from pathlib import Path +from datetime import datetime, timezone,timedelta +from adsb_to_aircraft_data_historical import load_historical_for_day + + +day = datetime.now(timezone.utc) - timedelta(days=1) +load_historical_for_day(day) \ No newline at end of file diff --git a/src/adsb/adsb_to_aircraft_data_historical.py b/src/adsb/adsb_to_aircraft_data_historical.py new file mode 100644 index 0000000..1ff7acf --- /dev/null +++ b/src/adsb/adsb_to_aircraft_data_historical.py @@ -0,0 +1,87 @@ +""" +Process historical ADS-B data by date range. +Downloads and compresses ADS-B messages for each day in the specified range. +""" +import argparse +from datetime import datetime, timedelta +from pathlib import Path +import pandas as pd +from compress_adsb_to_aircraft_data import load_historical_for_day, COLUMNS + +def deduplicate_by_signature(df): + """For each icao, keep only the earliest row with each unique signature.""" + df["_signature"] = df[COLUMNS].astype(str).agg('|'.join, axis=1) + # Group by icao and signature, keep first (earliest) occurrence + df_deduped = df.groupby(['icao', '_signature'], as_index=False).first() + df_deduped = df_deduped.drop(columns=['_signature']) + df_deduped = df_deduped.sort_values('time') + return df_deduped + + +def main(start_date_str: str, end_date_str: str): + """Process historical ADS-B data for the given date range.""" + OUT_ROOT = Path("data/planequery_aircraft") + OUT_ROOT.mkdir(parents=True, exist_ok=True) + + # Parse dates + start_date = datetime.strptime(start_date_str, "%Y-%m-%d") + end_date = datetime.strptime(end_date_str, "%Y-%m-%d") + + # Calculate total number of days + total_days = (end_date - start_date).days + print(f"Processing {total_days} days from {start_date_str} to 
{end_date_str}") + + # Initialize accumulated dataframe + df_accumulated = pd.DataFrame() + + # Cache directory path + cache_dir = Path("data/adsb") + + # Iterate through each day + current_date = start_date + while current_date < end_date: + print(f"Processing {current_date.strftime('%Y-%m-%d')}...") + + df_compressed = load_historical_for_day(current_date) + + # Concatenate to accumulated dataframe + if df_accumulated.empty: + df_accumulated = df_compressed + else: + df_accumulated = pd.concat([df_accumulated, df_compressed], ignore_index=True) + + print(f" Added {len(df_compressed)} records (total: {len(df_accumulated)})") + + # Save intermediate output after each day + current_date_str = current_date.strftime('%Y-%m-%d') + output_file = OUT_ROOT / f"planequery_aircraft_adsb_{start_date_str}_{current_date_str}.csv.gz" + df_deduped = deduplicate_by_signature(df_accumulated.copy()) + df_deduped.to_csv(output_file, index=False, compression='gzip') + print(f" Saved to {output_file.name}") + + # Delete cache after processing if processing more than 10 days + if total_days > 5 and cache_dir.exists(): + import shutil + shutil.rmtree(cache_dir) + print(f" Deleted cache directory to save space") + + # Move to next day + current_date += timedelta(days=1) + + # Save the final accumulated data + output_file = OUT_ROOT / f"planequery_aircraft_adsb_{start_date_str}_{end_date_str}.csv.gz" + df_accumulated = deduplicate_by_signature(df_accumulated) + df_accumulated.to_csv(output_file, index=False, compression='gzip') + + print(f"Completed processing from {start_date_str} to {end_date_str}") + print(f"Saved {len(df_accumulated)} total records to {output_file}") + + +if __name__ == '__main__': + # Parse command line arguments + parser = argparse.ArgumentParser(description="Process historical ADS-B data from ClickHouse") + parser.add_argument("start_date", help="Start date (YYYY-MM-DD, inclusive)") + parser.add_argument("end_date", help="End date (YYYY-MM-DD, exclusive)") + args = 
parser.parse_args() + + main(args.start_date, args.end_date) diff --git a/src/adsb/compress_adsb_to_aircraft_data.py b/src/adsb/compress_adsb_to_aircraft_data.py new file mode 100644 index 0000000..d191c47 --- /dev/null +++ b/src/adsb/compress_adsb_to_aircraft_data.py @@ -0,0 +1,170 @@ +# SOME KIND OF MAP REDUCE SYSTEM +import os + +COLUMNS = ['dbFlags', 'ownOp', 'year', 'desc', 'aircraft_category', 'r', 't'] +def compress_df(df): + icao = df.name + df["_signature"] = df[COLUMNS].astype(str).agg('|'.join, axis=1) + + # Compute signature counts before grouping (avoid copy) + signature_counts = df["_signature"].value_counts() + + df = df.groupby("_signature", as_index=False).first() # check if it works with both last and first. + # For each row, create a dict of non-empty column values. This is using sets and subsets... + def get_non_empty_dict(row): + return {col: row[col] for col in COLUMNS if row[col] != ''} + + df['_non_empty_dict'] = df.apply(get_non_empty_dict, axis=1) + df['_non_empty_count'] = df['_non_empty_dict'].apply(len) + + # Check if row i's non-empty values are a subset of row j's non-empty values + def is_subset_of_any(idx): + row_dict = df.loc[idx, '_non_empty_dict'] + row_count = df.loc[idx, '_non_empty_count'] + + for other_idx in df.index: + if idx == other_idx: + continue + other_dict = df.loc[other_idx, '_non_empty_dict'] + other_count = df.loc[other_idx, '_non_empty_count'] + + # Check if all non-empty values in current row match those in other row + if all(row_dict.get(k) == other_dict.get(k) for k in row_dict.keys()): + # If they match and other has more defined columns, current row is redundant + if other_count > row_count: + return True + return False + + # Keep rows that are not subsets of any other row + keep_mask = ~df.index.to_series().apply(is_subset_of_any) + df = df[keep_mask] + + if len(df) > 1: + # Use pre-computed signature counts instead of original_df + remaining_sigs = df['_signature'] + sig_counts = 
signature_counts[remaining_sigs] + max_signature = sig_counts.idxmax() + df = df[df['_signature'] == max_signature] + + df['icao'] = icao + df = df.drop(columns=['_non_empty_dict', '_non_empty_count', '_signature']) + # Ensure empty strings are preserved, not NaN + df[COLUMNS] = df[COLUMNS].fillna('') + return df + +# names of releases something like +# planequery_aircraft_adsb_2024-06-01T00-00-00Z.csv.gz + +# Let's build historical first. + +def load_raw_adsb_for_day(day): + """Load raw ADS-B data for a day from parquet file.""" + from datetime import timedelta + from pathlib import Path + import pandas as pd + + start_time = day.replace(hour=0, minute=0, second=0, microsecond=0) + + # Check for parquet file first + version_date = f"v{start_time.strftime('%Y.%m.%d')}" + parquet_file = Path(f"data/output/parquet_output/{version_date}.parquet") + + if not parquet_file.exists(): + # Try to generate parquet file by calling the download function + print(f" Parquet file not found: {parquet_file}") + print(f" Attempting to download and generate parquet for {start_time.strftime('%Y-%m-%d')}...") + + from download_adsb_data_to_parquet import create_parquet_for_day + result_path = create_parquet_for_day(start_time, keep_folders=False) + + if result_path: + print(f" Successfully generated parquet file: {result_path}") + else: + raise Exception("Failed to generate parquet file") + + if parquet_file.exists(): + print(f" Loading from parquet: {parquet_file}") + df = pd.read_parquet( + parquet_file, + columns=['time', 'icao', 'r', 't', 'dbFlags', 'ownOp', 'year', 'desc', 'aircraft_category'] + ) + + # Convert to timezone-naive datetime + df['time'] = df['time'].dt.tz_localize(None) + return df + else: + # Return empty DataFrame if parquet file doesn't exist + print(f" No data available for {start_time.strftime('%Y-%m-%d')}") + import pandas as pd + return pd.DataFrame(columns=['time', 'icao', 'r', 't', 'dbFlags', 'ownOp', 'year', 'desc', 'aircraft_category']) + +def 
load_historical_for_day(day): + from pathlib import Path + import pandas as pd + df = load_raw_adsb_for_day(day) + if df.empty: + return df + print(f"Loaded {len(df)} raw records for {day.strftime('%Y-%m-%d')}") + df = df.sort_values(['icao', 'time']) + print("done sort") + df[COLUMNS] = df[COLUMNS].fillna('') + + # First pass: quick deduplication of exact duplicates + df = df.drop_duplicates(subset=['icao'] + COLUMNS, keep='first') + print(f"After quick dedup: {len(df)} records") + + # Second pass: sophisticated compression per ICAO + print("Compressing per ICAO...") + df_compressed = df.groupby('icao', group_keys=False).apply(compress_df) + print(f"After compress: {len(df_compressed)} records") + + cols = df_compressed.columns.tolist() + cols.remove('time') + cols.insert(0, 'time') + cols.remove("icao") + cols.insert(1, "icao") + df_compressed = df_compressed[cols] + return df_compressed + + +def concat_compressed_dfs(df_base, df_new): + """Concatenate base and new compressed dataframes, keeping the most informative row per ICAO.""" + import pandas as pd + + # Combine both dataframes + df_combined = pd.concat([df_base, df_new], ignore_index=True) + + # Sort by ICAO and time + df_combined = df_combined.sort_values(['icao', 'time']) + + # Fill NaN values + df_combined[COLUMNS] = df_combined[COLUMNS].fillna('') + + # Apply compression logic per ICAO to get the best row + df_compressed = df_combined.groupby('icao', group_keys=False).apply(compress_df) + + # Sort by time + df_compressed = df_compressed.sort_values('time') + + return df_compressed + + +def get_latest_aircraft_adsb_csv_df(): + """Download and load the latest ADS-B CSV from GitHub releases.""" + from get_latest_planequery_aircraft_release import download_latest_aircraft_adsb_csv + + import pandas as pd + import re + + csv_path = download_latest_aircraft_adsb_csv() + df = pd.read_csv(csv_path) + df = df.fillna("") + + # Extract start date from filename pattern: 
planequery_aircraft_adsb_{start_date}_{end_date}.csv
+    match = re.search(r"planequery_aircraft_adsb_(\d{4}-\d{2}-\d{2})_", str(csv_path))
+    if not match:
+        raise ValueError(f"Could not extract date from filename: {csv_path.name}")
+
+    date_str = match.group(1)
+    return df, date_str
+
diff --git a/src/adsb/download_adsb_data_to_parquet.py b/src/adsb/download_adsb_data_to_parquet.py
new file mode 100644
index 0000000..deb1c82
--- /dev/null
+++ b/src/adsb/download_adsb_data_to_parquet.py
@@ -0,0 +1,684 @@
+"""
+Downloads adsb.lol data and writes to Parquet files.
+
+Usage:
+    python -m src.adsb.download_adsb_data_to_parquet 2025-01-01 2025-01-02
+
+This will download trace data for the specified date range and output Parquet files.
+
+This file is self-contained and does not import from other project modules.
+"""
+import gc
+import glob
+import gzip
+import sys
+import logging
+import time
+import re
+import signal
+import concurrent.futures
+import subprocess
+import os
+import argparse
+import datetime as dt
+from datetime import datetime, timedelta, timezone
+
+import requests
+import orjson
+import pandas as pd
+import pyarrow as pa
+import pyarrow.parquet as pq
+
+
+# ============================================================================
+# Configuration
+# ============================================================================
+
+OUTPUT_DIR = "./data/output"
+os.makedirs(OUTPUT_DIR, exist_ok=True)
+
+PARQUET_DIR = os.path.join(OUTPUT_DIR, "parquet_output")
+os.makedirs(PARQUET_DIR, exist_ok=True)
+
+TOKEN = os.environ.get('GITHUB_TOKEN')  # Optional: for higher GitHub API rate limits
+HEADERS = {"Authorization": f"token {TOKEN}"} if TOKEN else {}
+
+
+# ============================================================================
+# GitHub Release Fetching and Downloading
+# ============================================================================
+
+class DownloadTimeoutException(Exception):
+    pass
+
+
+def timeout_handler(signum, 
frame): + raise DownloadTimeoutException("Download timed out after 40 seconds") + + +def fetch_releases(version_date: str) -> list: + """Fetch GitHub releases for a given version date from adsblol.""" + year = version_date.split('.')[0][1:] + if version_date == "v2024.12.31": + year = "2025" + BASE_URL = f"https://api.github.com/repos/adsblol/globe_history_{year}/releases" + PATTERN = f"{version_date}-planes-readsb-prod-0" + releases = [] + page = 1 + + while True: + max_retries = 10 + retry_delay = 60 + + for attempt in range(1, max_retries + 1): + try: + response = requests.get(f"{BASE_URL}?page={page}", headers=HEADERS) + if response.status_code == 200: + break + else: + print(f"Failed to fetch releases (attempt {attempt}/{max_retries}): {response.status_code} {response.reason}") + if attempt < max_retries: + print(f"Waiting {retry_delay} seconds before retry...") + time.sleep(retry_delay) + else: + print(f"Giving up after {max_retries} attempts") + return releases + except Exception as e: + print(f"Request exception (attempt {attempt}/{max_retries}): {e}") + if attempt < max_retries: + print(f"Waiting {retry_delay} seconds before retry...") + time.sleep(retry_delay) + else: + print(f"Giving up after {max_retries} attempts") + return releases + + data = response.json() + if not data: + break + for release in data: + if re.match(PATTERN, release["tag_name"]): + releases.append(release) + page += 1 + return releases + + +def download_asset(asset_url: str, file_path: str) -> bool: + """Download a single release asset.""" + os.makedirs(os.path.dirname(file_path) or OUTPUT_DIR, exist_ok=True) + + if os.path.exists(file_path): + print(f"[SKIP] {file_path} already downloaded.") + return True + + print(f"Downloading {asset_url}...") + try: + signal.signal(signal.SIGALRM, timeout_handler) + signal.alarm(40) # 40-second timeout + + response = requests.get(asset_url, headers=HEADERS, stream=True) + signal.alarm(0) + + if response.status_code == 200: + with open(file_path, 
"wb") as file: + for chunk in response.iter_content(chunk_size=8192): + file.write(chunk) + print(f"Saved {file_path}") + return True + else: + print(f"Failed to download {asset_url}: {response.status_code} {response.reason}") + return False + except DownloadTimeoutException as e: + print(f"Download aborted for {asset_url}: {e}") + return False + except Exception as e: + print(f"An error occurred while downloading {asset_url}: {e}") + return False + + +def extract_split_archive(file_paths: list, extract_dir: str) -> bool: + """ + Extracts a split archive by concatenating the parts using 'cat' + and then extracting with 'tar' in one pipeline. + """ + if os.path.isdir(extract_dir): + print(f"[SKIP] Extraction directory already exists: {extract_dir}") + return True + + def sort_key(path: str): + base = os.path.basename(path) + parts = base.rsplit('.', maxsplit=1) + if len(parts) == 2: + suffix = parts[1] + if suffix.isdigit(): + return (0, int(suffix)) + if re.fullmatch(r'[a-zA-Z]+', suffix): + return (1, suffix) + return (2, base) + + file_paths = sorted(file_paths, key=sort_key) + os.makedirs(extract_dir, exist_ok=True) + + try: + cat_proc = subprocess.Popen( + ["cat"] + file_paths, + stdout=subprocess.PIPE, + stderr=subprocess.DEVNULL + ) + tar_cmd = ["tar", "xf", "-", "-C", extract_dir, "--strip-components=1"] + subprocess.run( + tar_cmd, + stdin=cat_proc.stdout, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + check=True + ) + cat_proc.stdout.close() + cat_proc.wait() + + print(f"Successfully extracted archive to {extract_dir}") + return True + except subprocess.CalledProcessError as e: + print(f"Failed to extract split archive: {e}") + return False + + +# ============================================================================ +# Trace File Processing (with alt_baro/on_ground handling) +# ============================================================================ + +ALLOWED_DATA_SOURCE = {'', 'adsb.lol', 'adsbexchange', 'airplanes.live'} + + +def 
process_file(filepath: str) -> list: + """ + Process a single trace file and return list of rows. + Handles alt_baro/on_ground: if altitude == "ground", on_ground=True and alt_baro=None. + """ + insert_rows = [] + with gzip.open(filepath, 'rb') as f: + data = orjson.loads(f.read()) + icao = data.get('icao', None) + if icao is None: + print(f"Skipping file {filepath} as it does not contain 'icao'") + return [] + + r = data.get('r', "") + t = data.get('t', "") + dbFlags = data.get('dbFlags', 0) + noRegData = data.get('noRegData', False) + ownOp = data.get('ownOp', "") + year = int(data.get('year', 0)) + timestamp = data.get('timestamp', None) + desc = data.get('desc', "") + trace_data = data.get('trace', None) + + if timestamp is None or trace_data is None: + print(f"Skipping file {filepath} as it does not contain 'timestamp' or 'trace'") + return [] + + for row in trace_data: + time_offset = row[0] + lat = row[1] + lon = row[2] + altitude = row[3] + + # Handle alt_baro/on_ground + alt_baro = None + on_ground = False + if type(altitude) is str and altitude == "ground": + on_ground = True + elif type(altitude) is int: + alt_baro = altitude + elif type(altitude) is float: + alt_baro = int(altitude) + + ground_speed = row[4] + track_degrees = row[5] + flags = row[6] + vertical_rate = row[7] + aircraft = row[8] + source = row[9] + data_source_value = "adsb.lol" if "adsb.lol" in ALLOWED_DATA_SOURCE else "" + geometric_altitude = row[10] + geometric_vertical_rate = row[11] + indicated_airspeed = row[12] + roll_angle = row[13] + + time_val = timestamp + time_offset + dt64 = dt.datetime.fromtimestamp(time_val, tz=dt.timezone.utc) + + # Prepare base fields + inserted_row = [ + dt64, icao, r, t, dbFlags, noRegData, ownOp, year, desc, + lat, lon, alt_baro, on_ground, ground_speed, track_degrees, + flags, vertical_rate + ] + next_part = [ + source, geometric_altitude, geometric_vertical_rate, + indicated_airspeed, roll_angle + ] + inserted_row.extend(next_part) + + if aircraft 
is None or type(aircraft) is not dict: + aircraft = dict() + + aircraft_data = { + 'alert': aircraft.get('alert', None), + 'alt_geom': aircraft.get('alt_geom', None), + 'gva': aircraft.get('gva', None), + 'nac_p': aircraft.get('nac_p', None), + 'nac_v': aircraft.get('nac_v', None), + 'nic': aircraft.get('nic', None), + 'nic_baro': aircraft.get('nic_baro', None), + 'rc': aircraft.get('rc', None), + 'sda': aircraft.get('sda', None), + 'sil': aircraft.get('sil', None), + 'sil_type': aircraft.get('sil_type', ""), + 'spi': aircraft.get('spi', None), + 'track': aircraft.get('track', None), + 'type': aircraft.get('type', ""), + 'version': aircraft.get('version', None), + 'category': aircraft.get('category', ''), + 'emergency': aircraft.get('emergency', ''), + 'flight': aircraft.get('flight', ""), + 'squawk': aircraft.get('squawk', ""), + 'baro_rate': aircraft.get('baro_rate', None), + 'nav_altitude_fms': aircraft.get('nav_altitude_fms', None), + 'nav_altitude_mcp': aircraft.get('nav_altitude_mcp', None), + 'nav_modes': aircraft.get('nav_modes', []), + 'nav_qnh': aircraft.get('nav_qnh', None), + 'geom_rate': aircraft.get('geom_rate', None), + 'ias': aircraft.get('ias', None), + 'mach': aircraft.get('mach', None), + 'mag_heading': aircraft.get('mag_heading', None), + 'oat': aircraft.get('oat', None), + 'roll': aircraft.get('roll', None), + 'tas': aircraft.get('tas', None), + 'tat': aircraft.get('tat', None), + 'true_heading': aircraft.get('true_heading', None), + 'wd': aircraft.get('wd', None), + 'ws': aircraft.get('ws', None), + 'track_rate': aircraft.get('track_rate', None), + 'nav_heading': aircraft.get('nav_heading', None) + } + + aircraft_list = list(aircraft_data.values()) + inserted_row.extend(aircraft_list) + inserted_row.append(data_source_value) + + insert_rows.append(inserted_row) + + if insert_rows: + print(f"Got {len(insert_rows)} rows from {filepath}") + return insert_rows + else: + return [] + + +# 
============================================================================ +# Parquet Writing +# ============================================================================ + +# Column names matching the order of data in inserted_row +COLUMNS = [ + "time", "icao", + "r", "t", "dbFlags", "noRegData", "ownOp", "year", "desc", + "lat", "lon", "alt_baro", "on_ground", "ground_speed", "track_degrees", + "flags", "vertical_rate", "source", "geometric_altitude", + "geometric_vertical_rate", "indicated_airspeed", "roll_angle", + "aircraft_alert", "aircraft_alt_geom", "aircraft_gva", "aircraft_nac_p", + "aircraft_nac_v", "aircraft_nic", "aircraft_nic_baro", "aircraft_rc", + "aircraft_sda", "aircraft_sil", "aircraft_sil_type", "aircraft_spi", + "aircraft_track", "aircraft_type", "aircraft_version", "aircraft_category", + "aircraft_emergency", "aircraft_flight", "aircraft_squawk", + "aircraft_baro_rate", "aircraft_nav_altitude_fms", "aircraft_nav_altitude_mcp", + "aircraft_nav_modes", "aircraft_nav_qnh", "aircraft_geom_rate", + "aircraft_ias", "aircraft_mach", "aircraft_mag_heading", "aircraft_oat", + "aircraft_roll", "aircraft_tas", "aircraft_tat", "aircraft_true_heading", + "aircraft_wd", "aircraft_ws", "aircraft_track_rate", "aircraft_nav_heading", + "data_source", +] + + +OS_CPU_COUNT = os.cpu_count() or 1 +MAX_WORKERS = OS_CPU_COUNT if OS_CPU_COUNT > 4 else 1 +CHUNK_SIZE = MAX_WORKERS * 1000 +BATCH_SIZE = (os.cpu_count() or 1) * 100000 + +# PyArrow schema for efficient Parquet writing +PARQUET_SCHEMA = pa.schema([ + ("time", pa.timestamp("ms", tz="UTC")), + ("icao", pa.string()), + ("r", pa.string()), + ("t", pa.string()), + ("dbFlags", pa.int32()), + ("noRegData", pa.bool_()), + ("ownOp", pa.string()), + ("year", pa.uint16()), + ("desc", pa.string()), + ("lat", pa.float64()), + ("lon", pa.float64()), + ("alt_baro", pa.int32()), + ("on_ground", pa.bool_()), + ("ground_speed", pa.float32()), + ("track_degrees", pa.float32()), + ("flags", pa.uint32()), + 
("vertical_rate", pa.int32()), + ("source", pa.string()), + ("geometric_altitude", pa.int32()), + ("geometric_vertical_rate", pa.int32()), + ("indicated_airspeed", pa.int32()), + ("roll_angle", pa.float32()), + ("aircraft_alert", pa.int64()), + ("aircraft_alt_geom", pa.int64()), + ("aircraft_gva", pa.int64()), + ("aircraft_nac_p", pa.int64()), + ("aircraft_nac_v", pa.int64()), + ("aircraft_nic", pa.int64()), + ("aircraft_nic_baro", pa.int64()), + ("aircraft_rc", pa.int64()), + ("aircraft_sda", pa.int64()), + ("aircraft_sil", pa.int64()), + ("aircraft_sil_type", pa.string()), + ("aircraft_spi", pa.int64()), + ("aircraft_track", pa.float64()), + ("aircraft_type", pa.string()), + ("aircraft_version", pa.int64()), + ("aircraft_category", pa.string()), + ("aircraft_emergency", pa.string()), + ("aircraft_flight", pa.string()), + ("aircraft_squawk", pa.string()), + ("aircraft_baro_rate", pa.int64()), + ("aircraft_nav_altitude_fms", pa.int64()), + ("aircraft_nav_altitude_mcp", pa.int64()), + ("aircraft_nav_modes", pa.list_(pa.string())), + ("aircraft_nav_qnh", pa.float64()), + ("aircraft_geom_rate", pa.int64()), + ("aircraft_ias", pa.int64()), + ("aircraft_mach", pa.float64()), + ("aircraft_mag_heading", pa.float64()), + ("aircraft_oat", pa.int64()), + ("aircraft_roll", pa.float64()), + ("aircraft_tas", pa.int64()), + ("aircraft_tat", pa.int64()), + ("aircraft_true_heading", pa.float64()), + ("aircraft_wd", pa.int64()), + ("aircraft_ws", pa.int64()), + ("aircraft_track_rate", pa.float64()), + ("aircraft_nav_heading", pa.float64()), + ("data_source", pa.string()), +]) + + +def collect_trace_files_with_find(root_dir): + """Find all trace_full_*.json files in the extracted directory.""" + trace_dict: dict[str, str] = {} + cmd = ['find', root_dir, '-type', 'f', '-name', 'trace_full_*.json'] + + result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) + + if result.returncode != 0: + print(f"Error executing find: {result.stderr}") + return 
trace_dict + + for file_path in result.stdout.strip().split('\n'): + if file_path: + filename = os.path.basename(file_path) + if filename.startswith("trace_full_") and filename.endswith(".json"): + icao = filename[len("trace_full_"):-len(".json")] + trace_dict[icao] = file_path + + return trace_dict + + +def generate_version_dates(start_date: str, end_date: str) -> list: + """Generate a list of dates from start_date to end_date inclusive.""" + start = datetime.strptime(start_date, "%Y-%m-%d") + end = datetime.strptime(end_date, "%Y-%m-%d") + delta = end - start + return [start + timedelta(days=i) for i in range(delta.days + 1)] + + +def safe_process(fp): + """Safely process a file, returning empty list on error.""" + try: + return process_file(fp) + except Exception as e: + logging.error(f"Error processing {fp}: {e}") + return [] + + +def rows_to_dataframe(rows: list) -> pd.DataFrame: + """Convert list of rows to a pandas DataFrame.""" + df = pd.DataFrame(rows, columns=COLUMNS) + return df + + +def write_batch_to_parquet(rows: list, version_date: str, batch_idx: int): + """Write a batch of rows to a Parquet file.""" + if not rows: + return + + df = rows_to_dataframe(rows) + + # Ensure datetime column is timezone-aware + if not df['time'].dt.tz: + df['time'] = df['time'].dt.tz_localize('UTC') + + parquet_path = os.path.join(PARQUET_DIR, f"{version_date}_batch_{batch_idx:04d}.parquet") + + # Convert to PyArrow table and write + table = pa.Table.from_pandas(df, schema=PARQUET_SCHEMA, preserve_index=False) + pq.write_table(table, parquet_path, compression='snappy') + + print(f"Written parquet batch {batch_idx} ({len(rows)} rows) to {parquet_path}") + + +def merge_parquet_files(version_date: str, delete_batches: bool = True): + """Merge all batch parquet files for a version_date into a single file.""" + pattern = os.path.join(PARQUET_DIR, f"{version_date}_batch_*.parquet") + batch_files = sorted(glob.glob(pattern)) + + if not batch_files: + print(f"No batch files found 
for {version_date}") + return None + + print(f"Merging {len(batch_files)} batch files for {version_date}...") + + # Read all batch files + tables = [] + for f in batch_files: + tables.append(pq.read_table(f)) + + # Concatenate all tables + merged_table = pa.concat_tables(tables) + + # Write merged file + merged_path = os.path.join(PARQUET_DIR, f"{version_date}.parquet") + pq.write_table(merged_table, merged_path, compression='snappy') + + print(f"Merged parquet file written to {merged_path} ({merged_table.num_rows} total rows)") + + # Optionally delete batch files + if delete_batches: + for f in batch_files: + os.remove(f) + print(f"Deleted {len(batch_files)} batch files") + + return merged_path + + +def process_version_date(version_date: str, keep_folders: bool = False): + """Download, extract, and process trace files for a single version date.""" + print(f"\nProcessing version_date: {version_date}") + extract_dir = os.path.join(OUTPUT_DIR, f"{version_date}-planes-readsb-prod-0.tar_0") + + def collect_trace_files_for_version_date(vd): + releases = fetch_releases(vd) + if len(releases) == 0: + print(f"No releases found for {vd}.") + return None + + downloaded_files = [] + for release in releases: + tag_name = release["tag_name"] + print(f"Processing release: {tag_name}") + + # Only download prod-0 if available, else prod-0tmp + assets = release.get("assets", []) + normal_assets = [ + a for a in assets + if "planes-readsb-prod-0." 
in a["name"] and "tmp" not in a["name"] + ] + tmp_assets = [ + a for a in assets + if "planes-readsb-prod-0tmp" in a["name"] + ] + use_assets = normal_assets if normal_assets else tmp_assets + + for asset in use_assets: + asset_name = asset["name"] + asset_url = asset["browser_download_url"] + file_path = os.path.join(OUTPUT_DIR, asset_name) + result = download_asset(asset_url, file_path) + if result: + downloaded_files.append(file_path) + + extract_split_archive(downloaded_files, extract_dir) + return collect_trace_files_with_find(extract_dir) + + # Check if files already exist + pattern = os.path.join(OUTPUT_DIR, f"{version_date}-planes-readsb-prod-0*") + matches = [p for p in glob.glob(pattern) if os.path.isfile(p)] + + if matches: + print(f"Found existing files for {version_date}:") + # Prefer non-tmp slices when reusing existing files + normal_matches = [ + p for p in matches + if "-planes-readsb-prod-0." in os.path.basename(p) + and "tmp" not in os.path.basename(p) + ] + downloaded_files = normal_matches if normal_matches else matches + + extract_split_archive(downloaded_files, extract_dir) + trace_files = collect_trace_files_with_find(extract_dir) + else: + trace_files = collect_trace_files_for_version_date(version_date) + + if trace_files is None or len(trace_files) == 0: + print(f"No trace files found for version_date: {version_date}") + return 0 + + file_list = list(trace_files.values()) + + start_time = time.perf_counter() + total_num_rows = 0 + batch_rows = [] + batch_idx = 0 + + # Process files in chunks + for offset in range(0, len(file_list), CHUNK_SIZE): + chunk = file_list[offset:offset + CHUNK_SIZE] + with concurrent.futures.ProcessPoolExecutor(max_workers=MAX_WORKERS) as process_executor: + for rows in process_executor.map(safe_process, chunk): + if not rows: + continue + batch_rows.extend(rows) + + if len(batch_rows) >= BATCH_SIZE: + total_num_rows += len(batch_rows) + write_batch_to_parquet(batch_rows, version_date, batch_idx) + batch_idx += 1 
+ batch_rows = [] + + elapsed = time.perf_counter() - start_time + speed = total_num_rows / elapsed if elapsed > 0 else 0 + print(f"[{version_date}] processed {total_num_rows} rows in {elapsed:.2f}s ({speed:.2f} rows/s)") + + gc.collect() + + # Final batch + if batch_rows: + total_num_rows += len(batch_rows) + write_batch_to_parquet(batch_rows, version_date, batch_idx) + elapsed = time.perf_counter() - start_time + speed = total_num_rows / elapsed if elapsed > 0 else 0 + print(f"[{version_date}] processed {total_num_rows} rows in {elapsed:.2f}s ({speed:.2f} rows/s)") + + print(f"Total rows processed for version_date {version_date}: {total_num_rows}") + + # Merge batch files into a single parquet file + merge_parquet_files(version_date, delete_batches=True) + + # Clean up extracted directory if not keeping + if not keep_folders and os.path.isdir(extract_dir): + import shutil + shutil.rmtree(extract_dir) + print(f"Cleaned up extraction directory: {extract_dir}") + + return total_num_rows + + +def create_parquet_for_day(day, keep_folders: bool = False): + """Create parquet file for a single day. 
+ + Args: + day: datetime object or string in 'YYYY-MM-DD' format + keep_folders: Whether to keep extracted folders after processing + + Returns: + Path to the created parquet file, or None if failed + """ + from pathlib import Path + + if isinstance(day, str): + day = datetime.strptime(day, "%Y-%m-%d") + + version_date = f"v{day.strftime('%Y.%m.%d')}" + + # Check if parquet already exists + parquet_path = Path(PARQUET_DIR) / f"{version_date}.parquet" + if parquet_path.exists(): + print(f"Parquet file already exists: {parquet_path}") + return parquet_path + + print(f"Creating parquet for {version_date}...") + rows_processed = process_version_date(version_date, keep_folders) + + if rows_processed > 0 and parquet_path.exists(): + return parquet_path + else: + return None + + +def main(start_date: str, end_date: str, keep_folders: bool = False): + """Main function to download and convert adsb.lol data to Parquet.""" + version_dates = [f"v{date.strftime('%Y.%m.%d')}" for date in generate_version_dates(start_date, end_date)] + print(f"Processing dates: {version_dates}") + + total_rows_all = 0 + for version_date in version_dates: + rows_processed = process_version_date(version_date, keep_folders) + total_rows_all += rows_processed + + print(f"\n=== Summary ===") + print(f"Total dates processed: {len(version_dates)}") + print(f"Total rows written to Parquet: {total_rows_all}") + print(f"Parquet files location: {PARQUET_DIR}") + + +if __name__ == "__main__": + logging.basicConfig(level=logging.INFO, stream=sys.stdout, force=True) + + parser = argparse.ArgumentParser( + description="Download adsb.lol data and write to Parquet files" + ) + parser.add_argument("start_date", type=str, help="Start date in YYYY-MM-DD format") + parser.add_argument("end_date", type=str, help="End date in YYYY-MM-DD format") + parser.add_argument("--keep-folders", action="store_true", + help="Keep extracted folders after processing") + + args = parser.parse_args() + + main(args.start_date, 
args.end_date, args.keep_folders) diff --git a/src/adsb/reducer.py b/src/adsb/reducer.py new file mode 100644 index 0000000..1bdd85c --- /dev/null +++ b/src/adsb/reducer.py @@ -0,0 +1,97 @@ +""" +Reduce step: downloads all chunk CSVs from S3, combines them, +deduplicates across the full dataset, and uploads the final result. + +Environment variables: + S3_BUCKET — bucket with intermediate results + RUN_ID — run identifier matching the map workers + GLOBAL_START_DATE — overall start date for output filename + GLOBAL_END_DATE — overall end date for output filename +""" +import os +from pathlib import Path + +import boto3 +import pandas as pd + + +COLUMNS = ["dbFlags", "ownOp", "year", "desc", "aircraft_category", "r", "t"] + + +def deduplicate_by_signature(df: pd.DataFrame) -> pd.DataFrame: + """For each icao, keep only the earliest row with each unique signature.""" + df["_signature"] = df[COLUMNS].astype(str).agg("|".join, axis=1) + df_deduped = df.groupby(["icao", "_signature"], as_index=False).first() + df_deduped = df_deduped.drop(columns=["_signature"]) + df_deduped = df_deduped.sort_values("time") + return df_deduped + + +def main(): + s3_bucket = os.environ["S3_BUCKET"] + run_id = os.environ.get("RUN_ID", "default") + global_start = os.environ["GLOBAL_START_DATE"] + global_end = os.environ["GLOBAL_END_DATE"] + + s3 = boto3.client("s3") + prefix = f"intermediate/{run_id}/" + + # List all chunk files for this run + paginator = s3.get_paginator("list_objects_v2") + chunk_keys = [] + for page in paginator.paginate(Bucket=s3_bucket, Prefix=prefix): + for obj in page.get("Contents", []): + if obj["Key"].endswith(".csv.gz"): + chunk_keys.append(obj["Key"]) + + chunk_keys.sort() + print(f"Found {len(chunk_keys)} chunks to combine") + + if not chunk_keys: + print("No chunks found — nothing to reduce.") + return + + # Download and concatenate all chunks + download_dir = Path("/tmp/chunks") + download_dir.mkdir(parents=True, exist_ok=True) + + df_accumulated = 
pd.DataFrame() + + for key in chunk_keys: + local_path = download_dir / Path(key).name + print(f"Downloading {key}...") + s3.download_file(s3_bucket, key, str(local_path)) + + df_chunk = pd.read_csv(local_path, compression="gzip", keep_default_na=False) + print(f" Loaded {len(df_chunk)} rows from {local_path.name}") + + if df_accumulated.empty: + df_accumulated = df_chunk + else: + df_accumulated = pd.concat( + [df_accumulated, df_chunk], ignore_index=True + ) + + # Free disk space after loading + local_path.unlink() + + print(f"Combined: {len(df_accumulated)} rows before dedup") + + # Final global deduplication + df_accumulated = deduplicate_by_signature(df_accumulated) + print(f"After dedup: {len(df_accumulated)} rows") + + # Write and upload final result + output_name = f"planequery_aircraft_adsb_{global_start}_{global_end}.csv.gz" + local_output = Path(f"/tmp/{output_name}") + df_accumulated.to_csv(local_output, index=False, compression="gzip") + + final_key = f"final/{output_name}" + print(f"Uploading to s3://{s3_bucket}/{final_key}") + s3.upload_file(str(local_output), s3_bucket, final_key) + + print(f"Final output: {len(df_accumulated)} records -> {final_key}") + + +if __name__ == "__main__": + main() diff --git a/src/adsb/requirements.reducer.txt b/src/adsb/requirements.reducer.txt new file mode 100644 index 0000000..8aafff3 --- /dev/null +++ b/src/adsb/requirements.reducer.txt @@ -0,0 +1,2 @@ +pandas>=2.0 +boto3>=1.34 diff --git a/src/adsb/requirements.worker.txt b/src/adsb/requirements.worker.txt new file mode 100644 index 0000000..9566c25 --- /dev/null +++ b/src/adsb/requirements.worker.txt @@ -0,0 +1,4 @@ +pandas>=2.0 +clickhouse-connect>=0.7 +boto3>=1.34 +zstandard>=0.22 diff --git a/src/adsb/worker.py b/src/adsb/worker.py new file mode 100644 index 0000000..ad2b09b --- /dev/null +++ b/src/adsb/worker.py @@ -0,0 +1,94 @@ +""" +Map worker: processes a date range chunk, uploads result to S3. 
+ +Environment variables: + START_DATE — inclusive, YYYY-MM-DD + END_DATE — exclusive, YYYY-MM-DD + S3_BUCKET — bucket for intermediate results + RUN_ID — unique run identifier for namespacing S3 keys +""" +import os +import sys +from datetime import datetime, timedelta +from pathlib import Path + +import boto3 +import pandas as pd + +from compress_adsb_to_aircraft_data import load_historical_for_day, COLUMNS + + +def deduplicate_by_signature(df: pd.DataFrame) -> pd.DataFrame: + """For each icao, keep only the earliest row with each unique signature.""" + df["_signature"] = df[COLUMNS].astype(str).agg("|".join, axis=1) + df_deduped = df.groupby(["icao", "_signature"], as_index=False).first() + df_deduped = df_deduped.drop(columns=["_signature"]) + df_deduped = df_deduped.sort_values("time") + return df_deduped + + +def main(): + start_date_str = os.environ["START_DATE"] + end_date_str = os.environ["END_DATE"] + s3_bucket = os.environ["S3_BUCKET"] + run_id = os.environ.get("RUN_ID", "default") + + start_date = datetime.strptime(start_date_str, "%Y-%m-%d") + end_date = datetime.strptime(end_date_str, "%Y-%m-%d") + + total_days = (end_date - start_date).days + print(f"Worker: processing {total_days} days [{start_date_str}, {end_date_str})") + + df_accumulated = pd.DataFrame() + current_date = start_date + + while current_date < end_date: + day_str = current_date.strftime("%Y-%m-%d") + print(f" Loading {day_str}...") + + try: + df_compressed = load_historical_for_day(current_date) + except Exception as e: + print(f" WARNING: Failed to load {day_str}: {e}") + current_date += timedelta(days=1) + continue + + if df_accumulated.empty: + df_accumulated = df_compressed + else: + df_accumulated = pd.concat( + [df_accumulated, df_compressed], ignore_index=True + ) + + print(f" +{len(df_compressed)} rows (total: {len(df_accumulated)})") + + # Delete local cache after each day to save disk in container + cache_dir = Path("data/adsb") + if cache_dir.exists(): + import shutil + 
shutil.rmtree(cache_dir) + + current_date += timedelta(days=1) + + if df_accumulated.empty: + print("No data collected — exiting.") + return + + # Deduplicate within this chunk + df_accumulated = deduplicate_by_signature(df_accumulated) + print(f"After dedup: {len(df_accumulated)} rows") + + # Write to local file then upload to S3 + local_path = Path(f"/tmp/chunk_{start_date_str}_{end_date_str}.csv.gz") + df_accumulated.to_csv(local_path, index=False, compression="gzip") + + s3_key = f"intermediate/{run_id}/chunk_{start_date_str}_{end_date_str}.csv.gz" + print(f"Uploading to s3://{s3_bucket}/{s3_key}") + + s3 = boto3.client("s3") + s3.upload_file(str(local_path), s3_bucket, s3_key) + print("Done.") + + +if __name__ == "__main__": + main() diff --git a/src/concat_csvs.py b/src/concat_csvs.py deleted file mode 100644 index ce6780e..0000000 --- a/src/concat_csvs.py +++ /dev/null @@ -1,89 +0,0 @@ -from pathlib import Path -import pandas as pd -import re -from derive_from_faa_master_txt import concat_faa_historical_df - -def concatenate_aircraft_csvs( - input_dir: Path = Path("data/concat"), - output_dir: Path = Path("data/planequery_aircraft"), - filename_pattern: str = r"planequery_aircraft_(\d{4}-\d{2}-\d{2})_(\d{4}-\d{2}-\d{2})\.csv" -): - """ - Read all CSVs matching the pattern from input_dir in order, - concatenate them using concat_faa_historical_df, and output a single CSV. 
- - Args: - input_dir: Directory containing the CSV files to concatenate - output_dir: Directory where the output CSV will be saved - filename_pattern: Regex pattern to match CSV filenames - """ - input_dir = Path(input_dir) - output_dir = Path(output_dir) - output_dir.mkdir(parents=True, exist_ok=True) - - # Find all matching CSV files - pattern = re.compile(filename_pattern) - csv_files = [] - - for csv_path in sorted(input_dir.glob("*.csv")): - match = pattern.search(csv_path.name) - if match: - start_date = match.group(1) - end_date = match.group(2) - csv_files.append((start_date, end_date, csv_path)) - - # Sort by start date, then end date - csv_files.sort(key=lambda x: (x[0], x[1])) - - if not csv_files: - raise FileNotFoundError(f"No CSV files matching pattern found in {input_dir}") - - print(f"Found {len(csv_files)} CSV files to concatenate") - - # Read first CSV as base - first_start_date, first_end_date, first_path = csv_files[0] - print(f"Reading base file: {first_path.name}") - df_base = pd.read_csv( - first_path, - dtype={ - 'transponder_code': str, - 'unique_regulatory_id': str, - 'registrant_county': str - } - ) - - # Concatenate remaining CSVs - for start_date, end_date, csv_path in csv_files[1:]: - print(f"Concatenating: {csv_path.name}") - df_new = pd.read_csv( - csv_path, - dtype={ - 'transponder_code': str, - 'unique_regulatory_id': str, - 'registrant_county': str - } - ) - df_base = concat_faa_historical_df(df_base, df_new) - - # Verify monotonic increasing download_date - assert df_base['download_date'].is_monotonic_increasing, "download_date is not monotonic increasing" - - # Output filename uses first start date and last end date - last_start_date, last_end_date, _ = csv_files[-1] - output_filename = f"planequery_aircraft_{first_start_date}_{last_end_date}.csv" - output_path = output_dir / output_filename - - print(f"Writing output to: {output_path}") - df_base.to_csv(output_path, index=False) - print(f"Successfully concatenated 
{len(csv_files)} files into {output_filename}")
-    print(f"Total rows: {len(df_base)}")
-
-    return output_path
-
-
-if __name__ == "__main__":
-    # Example usage - modify these paths as needed
-    concatenate_aircraft_csvs(
-        input_dir=Path("data/concat"),
-        output_dir=Path("data/planequery_aircraft")
-    )
\ No newline at end of file
diff --git a/src/create_daily_planequery_aircraft_adsb_release.py b/src/create_daily_planequery_aircraft_adsb_release.py
new file mode 100644
index 0000000..faa6b6d
--- /dev/null
+++ b/src/create_daily_planequery_aircraft_adsb_release.py
@@ -0,0 +1,64 @@
+from pathlib import Path
+from datetime import datetime, timezone, timedelta
+import sys
+
+# Add adsb directory to path
+sys.path.insert(0, str(Path(__file__).parent / "adsb"))  # TODO: Fix this hacky path manipulation -- NOTE(review): the `from adsb....` import below resolves via `src/` being on sys.path, not via this insert (worker.py imports the module top-level); confirm which is intended
+
+from adsb.compress_adsb_to_aircraft_data import (
+    load_historical_for_day,
+    concat_compressed_dfs,
+    get_latest_aircraft_adsb_csv_df,
+)
+
+if __name__ == '__main__':
+    # Get yesterday's date (data for the previous day)
+    day = datetime.now(timezone.utc) - timedelta(days=1)
+
+    # Find a day with complete data
+    max_attempts = 2  # Don't look back more than 2 days
+    for attempt in range(max_attempts):
+        date_str = day.strftime("%Y-%m-%d")
+        print(f"Processing ADS-B data for {date_str}")
+
+        print("Loading new ADS-B data...")
+        df_new = load_historical_for_day(day)
+        if df_new.empty:
+            day = day - timedelta(days=1)
+            continue
+        max_time = df_new['time'].max()
+        max_time = max_time.replace(tzinfo=timezone.utc)
+
+        if max_time >= day.replace(hour=23, minute=59, second=59) - timedelta(minutes=5):
+            # Data is complete
+            break
+
+        print(f"WARNING: Latest data time is {max_time}, which is more than 5 minutes before end of day.")
+        day = day - timedelta(days=1)
+    else:
+        raise RuntimeError(f"Could not find complete data in the last {max_attempts} days")
+
+    try:
+        # Get the latest release data
+        print("Downloading latest ADS-B release...")
+        df_base, start_date_str = 
get_latest_aircraft_adsb_csv_df() + # Combine with historical data + print("Combining with historical data...") + df_combined = concat_compressed_dfs(df_base, df_new) + except Exception as e: + print(f"Error downloading latest ADS-B release: {e}") + df_combined = df_new + start_date_str = date_str + + # Sort by time for consistent ordering + df_combined = df_combined.sort_values('time').reset_index(drop=True) + + # Save the result + OUT_ROOT = Path("data/planequery_aircraft") + OUT_ROOT.mkdir(parents=True, exist_ok=True) + + output_file = OUT_ROOT / f"planequery_aircraft_adsb_{start_date_str}_{date_str}.csv" + df_combined.to_csv(output_file, index=False) + + print(f"Saved: {output_file}") + print(f"Total aircraft: {len(df_combined)}") diff --git a/src/create_daily_planequery_aircraft_release.py b/src/create_daily_planequery_aircraft_release.py index 4019aa7..559f8fc 100644 --- a/src/create_daily_planequery_aircraft_release.py +++ b/src/create_daily_planequery_aircraft_release.py @@ -25,9 +25,9 @@ if not zip_path.exists(): OUT_ROOT = Path("data/planequery_aircraft") OUT_ROOT.mkdir(parents=True, exist_ok=True) from derive_from_faa_master_txt import convert_faa_master_txt_to_df, concat_faa_historical_df -from get_latest_planequery_aircraft_release import get_latest_aircraft_csv_df +from get_latest_planequery_aircraft_release import get_latest_aircraft_faa_csv_df df_new = convert_faa_master_txt_to_df(zip_path, date_str) -df_base, start_date_str = get_latest_aircraft_csv_df() +df_base, start_date_str = get_latest_aircraft_faa_csv_df() df_base = concat_faa_historical_df(df_base, df_new) assert df_base['download_date'].is_monotonic_increasing, "download_date is not monotonic increasing" -df_base.to_csv(OUT_ROOT / f"planequery_aircraft_{start_date_str}_{date_str}.csv", index=False) \ No newline at end of file +df_base.to_csv(OUT_ROOT / f"planequery_aircraft_faa_{start_date_str}_{date_str}.csv", index=False) \ No newline at end of file diff --git a/src/get_historical_faa.py 
b/src/get_historical_faa.py index 51cdcd4..656345e 100644 --- a/src/get_historical_faa.py +++ b/src/get_historical_faa.py @@ -112,5 +112,5 @@ for date, sha in date_to_sha.items(): print(len(df_base), "total entries so far") assert df_base['download_date'].is_monotonic_increasing, "download_date is not monotonic increasing" -df_base.to_csv(OUT_ROOT / f"planequery_aircraft_{start_date}_{end_date}.csv", index=False) +df_base.to_csv(OUT_ROOT / f"planequery_aircraft_faa_{start_date}_{end_date}.csv", index=False) # TODO: get average number of new rows per day. diff --git a/src/get_latest_planequery_aircraft_release.py b/src/get_latest_planequery_aircraft_release.py index c264250..9867a5d 100644 --- a/src/get_latest_planequery_aircraft_release.py +++ b/src/get_latest_planequery_aircraft_release.py @@ -109,7 +109,7 @@ def download_latest_aircraft_csv( repo: str = REPO, ) -> Path: """ - Download the latest planequery_aircraft_*.csv file from the latest GitHub release. + Download the latest planequery_aircraft_faa_*.csv file from the latest GitHub release. 
Args: output_dir: Directory to save the downloaded file (default: "downloads") @@ -120,25 +120,70 @@ def download_latest_aircraft_csv( Path to the downloaded file """ assets = get_latest_release_assets(repo, github_token=github_token) - asset = pick_asset(assets, name_regex=r"^planequery_aircraft_.*\.csv$") + try: + asset = pick_asset(assets, name_regex=r"^planequery_aircraft_faa_.*\.csv$") + except FileNotFoundError: + # Fallback to old naming pattern + asset = pick_asset(assets, name_regex=r"^planequery_aircraft_\d{4}-\d{2}-\d{2}_.*\.csv$") saved_to = download_asset(asset, output_dir / asset.name, github_token=github_token) print(f"Downloaded: {asset.name} ({asset.size} bytes) -> {saved_to}") return saved_to -def get_latest_aircraft_csv_df(): +def get_latest_aircraft_faa_csv_df(): csv_path = download_latest_aircraft_csv() import pandas as pd df = pd.read_csv(csv_path, dtype={'transponder_code': str, 'unique_regulatory_id': str, 'registrant_county': str}) df = df.fillna("") - # Extract date from filename pattern: planequery_aircraft_{date}_{date}.csv - match = re.search(r"planequery_aircraft_(\d{4}-\d{2}-\d{2})_", str(csv_path)) + # Extract start date from filename pattern: planequery_aircraft_faa_{start_date}_{end_date}.csv + match = re.search(r"planequery_aircraft_faa_(\d{4}-\d{2}-\d{2})_", str(csv_path)) + if not match: + # Fallback to old naming pattern: planequery_aircraft_{start_date}_{end_date}.csv + match = re.search(r"planequery_aircraft_(\d{4}-\d{2}-\d{2})_", str(csv_path)) if not match: raise ValueError(f"Could not extract date from filename: {csv_path.name}") date_str = match.group(1) return df, date_str + +def download_latest_aircraft_adsb_csv( + output_dir: Path = Path("downloads"), + github_token: Optional[str] = None, + repo: str = REPO, +) -> Path: + """ + Download the latest planequery_aircraft_adsb_*.csv file from the latest GitHub release. 
+ + Args: + output_dir: Directory to save the downloaded file (default: "downloads") + github_token: Optional GitHub token for authentication + repo: GitHub repository in format "owner/repo" (default: REPO) + + Returns: + Path to the downloaded file + """ + assets = get_latest_release_assets(repo, github_token=github_token) + asset = pick_asset(assets, name_regex=r"^planequery_aircraft_adsb_.*\.csv$") + saved_to = download_asset(asset, output_dir / asset.name, github_token=github_token) + print(f"Downloaded: {asset.name} ({asset.size} bytes) -> {saved_to}") + return saved_to + + +def get_latest_aircraft_adsb_csv_df(): + csv_path = download_latest_aircraft_adsb_csv() + import pandas as pd + df = pd.read_csv(csv_path) + df = df.fillna("") + # Extract start date from filename pattern: planequery_aircraft_adsb_{start_date}_{end_date}.csv + match = re.search(r"planequery_aircraft_adsb_(\d{4}-\d{2}-\d{2})_", str(csv_path)) + if not match: + raise ValueError(f"Could not extract date from filename: {csv_path.name}") + + date_str = match.group(1) + return df, date_str + + if __name__ == "__main__": download_latest_aircraft_csv() diff --git a/trigger_pipeline.py b/trigger_pipeline.py new file mode 100644 index 0000000..d386b25 --- /dev/null +++ b/trigger_pipeline.py @@ -0,0 +1,97 @@ +""" +Generate Step Functions input and start the pipeline. 
+ +Usage: + python trigger_pipeline.py 2024-01-01 2025-01-01 + python trigger_pipeline.py 2024-01-01 2025-01-01 --chunk-days 30 + python trigger_pipeline.py 2024-01-01 2025-01-01 --dry-run +""" +import argparse +import json +import os +import uuid +from datetime import datetime, timedelta + +import boto3 + + +def generate_chunks(start_date: str, end_date: str, chunk_days: int = 1): + """Split a date range into chunks of chunk_days.""" + start = datetime.strptime(start_date, "%Y-%m-%d") + end = datetime.strptime(end_date, "%Y-%m-%d") + + chunks = [] + current = start + while current < end: + chunk_end = min(current + timedelta(days=chunk_days), end) + chunks.append({ + "start_date": current.strftime("%Y-%m-%d"), + "end_date": chunk_end.strftime("%Y-%m-%d"), + }) + current = chunk_end + + return chunks + + +def main(): + parser = argparse.ArgumentParser(description="Trigger ADS-B map-reduce pipeline") + parser.add_argument("start_date", help="Start date (YYYY-MM-DD, inclusive)") + parser.add_argument("end_date", help="End date (YYYY-MM-DD, exclusive)") + parser.add_argument("--chunk-days", type=int, default=1, + help="Days per chunk (default: 1)") + parser.add_argument("--dry-run", action="store_true", + help="Print input JSON without starting execution") + args = parser.parse_args() + + run_id = f"run-{datetime.utcnow().strftime('%Y%m%dT%H%M%S')}-{uuid.uuid4().hex[:8]}" + chunks = generate_chunks(args.start_date, args.end_date, args.chunk_days) + + clickhouse_host = os.environ["CLICKHOUSE_HOST"] + clickhouse_username = os.environ["CLICKHOUSE_USERNAME"] + clickhouse_password = os.environ["CLICKHOUSE_PASSWORD"] + + # Inject run_id and ClickHouse credentials into each chunk + for chunk in chunks: + chunk["run_id"] = run_id + chunk["clickhouse_host"] = clickhouse_host + chunk["clickhouse_username"] = clickhouse_username + chunk["clickhouse_password"] = clickhouse_password + + sfn_input = { + "run_id": run_id, + "global_start_date": args.start_date, + "global_end_date": 
args.end_date, + "chunks": chunks, + } + + print(f"Run ID: {run_id}") + print(f"Chunks: {len(chunks)} (at {args.chunk_days} days each)") + print(f"Max concurrency: 3 (enforced by Step Functions Map state)") + print() + print(json.dumps(sfn_input, indent=2)) + + if args.dry_run: + print("\n--dry-run: not starting execution") + return + + client = boto3.client("stepfunctions") + + # Find the state machine ARN + machines = client.list_state_machines()["stateMachines"] + arn = next( + m["stateMachineArn"] + for m in machines + if m["name"] == "adsb-map-reduce" + ) + + response = client.start_execution( + stateMachineArn=arn, + name=run_id, + input=json.dumps(sfn_input), + ) + + print(f"\nStarted execution: {response['executionArn']}") + + +if __name__ == "__main__": + main()