diff --git a/.github/workflows/planequery-aircraft-daily-release.yaml b/.github/workflows/planequery-aircraft-daily-release.yaml index 6e50e58..3cb65f9 100644 --- a/.github/workflows/planequery-aircraft-daily-release.yaml +++ b/.github/workflows/planequery-aircraft-daily-release.yaml @@ -2,27 +2,54 @@ name: planequery-aircraft Daily Release on: schedule: - # 6:00pm UTC every day + # 6:00pm UTC every day - runs on default branch, triggers both - cron: "0 06 * * *" - workflow_dispatch: {} + workflow_dispatch: permissions: contents: write jobs: - build-and-release: + trigger-releases: runs-on: ubuntu-latest + if: github.event_name == 'schedule' + steps: + - name: Trigger main branch release + uses: actions/github-script@v7 + with: + script: | + await github.rest.actions.createWorkflowDispatch({ + owner: context.repo.owner, + repo: context.repo.repo, + workflow_id: 'planequery-aircraft-daily-release.yaml', + ref: 'main' + }); + + - name: Trigger develop branch release + uses: actions/github-script@v7 + with: + script: | + await github.rest.actions.createWorkflowDispatch({ + owner: context.repo.owner, + repo: context.repo.repo, + workflow_id: 'planequery-aircraft-daily-release.yaml', + ref: 'develop' + }); + + build-and-release: + runs-on: ubuntu-24.04-arm + if: github.event_name != 'schedule' steps: - name: Checkout - uses: actions/checkout@v4 + uses: actions/checkout@v6 with: fetch-depth: 0 - name: Setup Python - uses: actions/setup-python@v5 + uses: actions/setup-python@v6 with: - python-version: "3.12" + python-version: "3.14" - name: Install dependencies run: | @@ -30,8 +57,13 @@ jobs: pip install -r requirements.txt - name: Run daily release script + env: + CLICKHOUSE_HOST: ${{ secrets.CLICKHOUSE_HOST }} + CLICKHOUSE_USERNAME: ${{ secrets.CLICKHOUSE_USERNAME }} + CLICKHOUSE_PASSWORD: ${{ secrets.CLICKHOUSE_PASSWORD }} run: | python src/create_daily_planequery_aircraft_release.py + python src/create_daily_planequery_aircraft_adsb_release.py ls -lah 
data/faa_releasable ls -lah data/planequery_aircraft @@ -39,15 +71,26 @@ jobs: id: meta run: | DATE=$(date -u +"%Y-%m-%d") - TAG="planequery-aircraft-${DATE}" - # Find the CSV file in data/planequery_aircraft matching the pattern - CSV_FILE=$(ls data/planequery_aircraft/planequery_aircraft_*_${DATE}.csv | head -1) - CSV_BASENAME=$(basename "$CSV_FILE") + BRANCH_NAME="${GITHUB_REF#refs/heads/}" + BRANCH_SUFFIX="" + if [ "$BRANCH_NAME" = "main" ]; then + BRANCH_SUFFIX="-main" + elif [ "$BRANCH_NAME" = "develop" ]; then + BRANCH_SUFFIX="-develop" + fi + TAG="planequery-aircraft-${DATE}${BRANCH_SUFFIX}" + # Find the CSV files in data/planequery_aircraft matching the patterns + CSV_FILE_FAA=$(ls data/planequery_aircraft/planequery_aircraft_faa_*_${DATE}.csv | head -1) + CSV_BASENAME_FAA=$(basename "$CSV_FILE_FAA") + CSV_FILE_ADSB=$(ls data/planequery_aircraft/planequery_aircraft_adsb_*_${DATE}.csv | head -1) + CSV_BASENAME_ADSB=$(basename "$CSV_FILE_ADSB") echo "date=$DATE" >> "$GITHUB_OUTPUT" echo "tag=$TAG" >> "$GITHUB_OUTPUT" - echo "csv_file=$CSV_FILE" >> "$GITHUB_OUTPUT" - echo "csv_basename=$CSV_BASENAME" >> "$GITHUB_OUTPUT" - echo "name=planequery-aircraft snapshot ($DATE)" >> "$GITHUB_OUTPUT" + echo "csv_file_faa=$CSV_FILE_FAA" >> "$GITHUB_OUTPUT" + echo "csv_basename_faa=$CSV_BASENAME_FAA" >> "$GITHUB_OUTPUT" + echo "csv_file_adsb=$CSV_FILE_ADSB" >> "$GITHUB_OUTPUT" + echo "csv_basename_adsb=$CSV_BASENAME_ADSB" >> "$GITHUB_OUTPUT" + echo "name=planequery-aircraft snapshot ($DATE)${BRANCH_SUFFIX}" >> "$GITHUB_OUTPUT" - name: Create GitHub Release and upload assets uses: softprops/action-gh-release@v2 @@ -58,10 +101,12 @@ jobs: Automated daily snapshot generated at 06:00 UTC for ${{ steps.meta.outputs.date }}. 
Assets: - - ${{ steps.meta.outputs.csv_basename }} + - ${{ steps.meta.outputs.csv_basename_faa }} + - ${{ steps.meta.outputs.csv_basename_adsb }} - ReleasableAircraft_${{ steps.meta.outputs.date }}.zip files: | - ${{ steps.meta.outputs.csv_file }} + ${{ steps.meta.outputs.csv_file_faa }} + ${{ steps.meta.outputs.csv_file_adsb }} data/faa_releasable/ReleasableAircraft_${{ steps.meta.outputs.date }}.zip env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} diff --git a/.gitignore b/.gitignore index 74326ca..7ef2c0b 100644 --- a/.gitignore +++ b/.gitignore @@ -218,4 +218,67 @@ __marimo__/ # Custom data/ .DS_Store -notebooks/ \ No newline at end of file + +# --- CDK --- +# VSCode extension + +# Store launch config in repo but not settings +.vscode/settings.json +/.favorites.json + +# TypeScript incremental build states +*.tsbuildinfo + +# Local state files & OS specifics +.DS_Store +node_modules/ +lerna-debug.log +dist/ +pack/ +.BUILD_COMPLETED +.local-npm/ +.tools/ +coverage/ +.nyc_output +.nycrc +.LAST_BUILD +*.sw[a-z] +*~ +.idea +*.iml +junit.xml + +# We don't want tsconfig at the root +/tsconfig.json + +# CDK Context & Staging files +cdk.context.json +.cdk.staging/ +cdk.out/ +*.tabl.json +cdk-integ.out.*/ + +# Yarn error log +yarn-error.log + +# VSCode history plugin +.vscode/.history/ + +# Cloud9 +.c9 +.nzm-* + +/.versionrc.json +RELEASE_NOTES.md + +# Produced by integ tests +read*lock + +# VSCode jest plugin +.test-output + +# Nx cache +.nx/ + +# jsii-rosetta files +type-fingerprints.txt \ No newline at end of file diff --git a/infra/app.py b/infra/app.py new file mode 100644 index 0000000..83509f6 --- /dev/null +++ b/infra/app.py @@ -0,0 +1,11 @@ +#!/usr/bin/env python3 +import os +import aws_cdk as cdk +from stack import AdsbProcessingStack + +app = cdk.App() +AdsbProcessingStack(app, "AdsbProcessingStack", env=cdk.Environment( + account=os.environ["CDK_DEFAULT_ACCOUNT"], + region=os.environ["CDK_DEFAULT_REGION"], +)) +app.synth() diff --git a/infra/cdk.json 
b/infra/cdk.json new file mode 100644 index 0000000..b4baa10 --- /dev/null +++ b/infra/cdk.json @@ -0,0 +1,3 @@ +{ + "app": "python3 app.py" +} diff --git a/infra/requirements.txt b/infra/requirements.txt new file mode 100644 index 0000000..32b3387 --- /dev/null +++ b/infra/requirements.txt @@ -0,0 +1,2 @@ +aws-cdk-lib>=2.170.0 +constructs>=10.0.0 diff --git a/infra/stack.py b/infra/stack.py new file mode 100644 index 0000000..84f6d49 --- /dev/null +++ b/infra/stack.py @@ -0,0 +1,225 @@ +import aws_cdk as cdk +from aws_cdk import ( + Stack, + Duration, + RemovalPolicy, + aws_s3 as s3, + aws_ecs as ecs, + aws_ec2 as ec2, + aws_ecr_assets, + aws_iam as iam, + aws_logs as logs, + aws_stepfunctions as sfn, + aws_stepfunctions_tasks as sfn_tasks, +) +from constructs import Construct +from pathlib import Path + + +class AdsbProcessingStack(Stack): + def __init__(self, scope: Construct, id: str, **kwargs): + super().__init__(scope, id, **kwargs) + + # --- S3 bucket for intermediate and final results --- + bucket = s3.Bucket( + self, "ResultsBucket", + bucket_name="planequery-aircraft-dev", + removal_policy=RemovalPolicy.DESTROY, + auto_delete_objects=True, + lifecycle_rules=[ + s3.LifecycleRule( + prefix="intermediate/", + expiration=Duration.days(7), + ) + ], + ) + + # --- Use default VPC (no additional cost) --- + vpc = ec2.Vpc.from_lookup( + self, "Vpc", + is_default=True, + ) + + # --- ECS Cluster --- + cluster = ecs.Cluster( + self, "Cluster", + vpc=vpc, + container_insights=True, + ) + + # --- Log group --- + log_group = logs.LogGroup( + self, "LogGroup", + log_group_name="/adsb-processing", + removal_policy=RemovalPolicy.DESTROY, + retention=logs.RetentionDays.TWO_WEEKS, + ) + + # --- Docker images (built from local Dockerfiles) --- + adsb_dir = str(Path(__file__).parent.parent / "src" / "adsb") + + worker_image = ecs.ContainerImage.from_asset( + adsb_dir, + file="Dockerfile.worker", + platform=cdk.aws_ecr_assets.Platform.LINUX_ARM64, + ) + reducer_image = 
ecs.ContainerImage.from_asset( + adsb_dir, + file="Dockerfile.reducer", + platform=cdk.aws_ecr_assets.Platform.LINUX_ARM64, + ) + + # --- Task role (shared) --- + task_role = iam.Role( + self, "TaskRole", + assumed_by=iam.ServicePrincipal("ecs-tasks.amazonaws.com"), + ) + bucket.grant_read_write(task_role) + + # --- MAP: worker task definition --- + map_task_def = ecs.FargateTaskDefinition( + self, "MapTaskDef", + cpu=16384, # 16 vCPU + memory_limit_mib=65536, # 64 GB — high memory for pandas operations on large datasets + task_role=task_role, + runtime_platform=ecs.RuntimePlatform( + cpu_architecture=ecs.CpuArchitecture.ARM64, + operating_system_family=ecs.OperatingSystemFamily.LINUX, + ), + ) + map_container = map_task_def.add_container( + "worker", + image=worker_image, + logging=ecs.LogDrivers.aws_logs( + stream_prefix="map", + log_group=log_group, + ), + environment={ + "S3_BUCKET": bucket.bucket_name, + }, + ) + + # --- REDUCE: reducer task definition --- + reduce_task_def = ecs.FargateTaskDefinition( + self, "ReduceTaskDef", + cpu=4096, # 4 vCPU + memory_limit_mib=30720, # 30 GB — must hold full year in memory + task_role=task_role, + runtime_platform=ecs.RuntimePlatform( + cpu_architecture=ecs.CpuArchitecture.ARM64, + operating_system_family=ecs.OperatingSystemFamily.LINUX, + ), + ) + reduce_container = reduce_task_def.add_container( + "reducer", + image=reducer_image, + logging=ecs.LogDrivers.aws_logs( + stream_prefix="reduce", + log_group=log_group, + ), + environment={ + "S3_BUCKET": bucket.bucket_name, + }, + ) + + # --- Step Functions --- + + # Map task: run ECS Fargate for each date chunk + map_ecs_task = sfn_tasks.EcsRunTask( + self, "ProcessChunk", + integration_pattern=sfn.IntegrationPattern.RUN_JOB, + cluster=cluster, + task_definition=map_task_def, + launch_target=sfn_tasks.EcsFargateLaunchTarget( + platform_version=ecs.FargatePlatformVersion.LATEST, + ), + container_overrides=[ + sfn_tasks.ContainerOverride( + container_definition=map_container, 
+ environment=[ + sfn_tasks.TaskEnvironmentVariable( + name="START_DATE", + value=sfn.JsonPath.string_at("$.start_date"), + ), + sfn_tasks.TaskEnvironmentVariable( + name="END_DATE", + value=sfn.JsonPath.string_at("$.end_date"), + ), + sfn_tasks.TaskEnvironmentVariable( + name="RUN_ID", + value=sfn.JsonPath.string_at("$.run_id"), + ), + sfn_tasks.TaskEnvironmentVariable( + name="CLICKHOUSE_HOST", + value=sfn.JsonPath.string_at("$.clickhouse_host"), + ), + sfn_tasks.TaskEnvironmentVariable( + name="CLICKHOUSE_USERNAME", + value=sfn.JsonPath.string_at("$.clickhouse_username"), + ), + sfn_tasks.TaskEnvironmentVariable( + name="CLICKHOUSE_PASSWORD", + value=sfn.JsonPath.string_at("$.clickhouse_password"), + ), + ], + ) + ], + assign_public_ip=True, + subnets=ec2.SubnetSelection(subnet_type=ec2.SubnetType.PUBLIC), + result_path="$.task_result", + ) + + # Map state — max 3 concurrent workers + map_state = sfn.Map( + self, "FanOutChunks", + items_path="$.chunks", + max_concurrency=3, + result_path="$.map_results", + ) + map_state.item_processor(map_ecs_task) + + # Reduce task: combine all chunk CSVs + reduce_ecs_task = sfn_tasks.EcsRunTask( + self, "ReduceResults", + integration_pattern=sfn.IntegrationPattern.RUN_JOB, + cluster=cluster, + task_definition=reduce_task_def, + launch_target=sfn_tasks.EcsFargateLaunchTarget( + platform_version=ecs.FargatePlatformVersion.LATEST, + ), + container_overrides=[ + sfn_tasks.ContainerOverride( + container_definition=reduce_container, + environment=[ + sfn_tasks.TaskEnvironmentVariable( + name="RUN_ID", + value=sfn.JsonPath.string_at("$.run_id"), + ), + sfn_tasks.TaskEnvironmentVariable( + name="GLOBAL_START_DATE", + value=sfn.JsonPath.string_at("$.global_start_date"), + ), + sfn_tasks.TaskEnvironmentVariable( + name="GLOBAL_END_DATE", + value=sfn.JsonPath.string_at("$.global_end_date"), + ), + ], + ) + ], + assign_public_ip=True, + subnets=ec2.SubnetSelection(subnet_type=ec2.SubnetType.PUBLIC), + ) + + # Chain: fan-out map → reduce + 
definition = map_state.next(reduce_ecs_task) + + sfn.StateMachine( + self, "Pipeline", + state_machine_name="adsb-map-reduce", + definition_body=sfn.DefinitionBody.from_chainable(definition), + timeout=Duration.hours(48), + ) + + # --- Outputs --- + cdk.CfnOutput(self, "BucketName", value=bucket.bucket_name) + cdk.CfnOutput(self, "StateMachineName", value="adsb-map-reduce") diff --git a/notebooks/planequery_adsb_read.ipynb b/notebooks/planequery_adsb_read.ipynb new file mode 100644 index 0000000..f0d98e0 --- /dev/null +++ b/notebooks/planequery_adsb_read.ipynb @@ -0,0 +1,640 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "id": "06ae0319", + "metadata": {}, + "outputs": [], + "source": [ + "import clickhouse_connect\n", + "client = clickhouse_connect.get_client(\n", + " host=os.environ[\"CLICKHOUSE_HOST\"],\n", + " username=os.environ[\"CLICKHOUSE_USERNAME\"],\n", + " password=os.environ[\"CLICKHOUSE_PASSWORD\"],\n", + " secure=True,\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "779710f0", + "metadata": {}, + "outputs": [], + "source": [ + "df = client.query_df(\"SELECT time, icao,r,t,dbFlags,ownOp,year,desc,aircraft FROM adsb_messages Where time > '2024-01-01 00:00:00' AND time < '2024-01-02 00:00:00'\")\n", + "df_copy = df.copy()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "bf024da8", + "metadata": {}, + "outputs": [], + "source": [ + "# -- military = dbFlags & 1; interesting = dbFlags & 2; PIA = dbFlags & 4; LADD = dbFlags & 8;" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "270607b5", + "metadata": {}, + "outputs": [], + "source": [ + "df = load_raw_adsb_for_day(datetime(2024,1,1))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "ac06a30e", + "metadata": {}, + "outputs": [], + "source": [ + "df['aircraft']" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "91edab3e", + "metadata": {}, + "outputs": [], 
+ "source": [ + "COLUMNS = ['dbFlags', 'ownOp', 'year', 'desc', 'aircraft_category', 'r', 't']\n", + "def compress_df(df):\n", + " icao = df.name\n", + " df[\"_signature\"] = df[COLUMNS].astype(str).agg('|'.join, axis=1)\n", + " original_df = df.copy()\n", + " df = df.groupby(\"_signature\", as_index=False).last() # check if it works with both last and first.\n", + " # For each row, create a dict of non-empty column values. This is using sets and subsets...\n", + " def get_non_empty_dict(row):\n", + " return {col: row[col] for col in COLUMNS if row[col] != ''}\n", + " \n", + " df['_non_empty_dict'] = df.apply(get_non_empty_dict, axis=1)\n", + " df['_non_empty_count'] = df['_non_empty_dict'].apply(len)\n", + " \n", + " # Check if row i's non-empty values are a subset of row j's non-empty values\n", + " def is_subset_of_any(idx):\n", + " row_dict = df.loc[idx, '_non_empty_dict']\n", + " row_count = df.loc[idx, '_non_empty_count']\n", + " \n", + " for other_idx in df.index:\n", + " if idx == other_idx:\n", + " continue\n", + " other_dict = df.loc[other_idx, '_non_empty_dict']\n", + " other_count = df.loc[other_idx, '_non_empty_count']\n", + " \n", + " # Check if all non-empty values in current row match those in other row\n", + " if all(row_dict.get(k) == other_dict.get(k) for k in row_dict.keys()):\n", + " # If they match and other has more defined columns, current row is redundant\n", + " if other_count > row_count:\n", + " return True\n", + " return False\n", + " \n", + " # Keep rows that are not subsets of any other row\n", + " keep_mask = ~df.index.to_series().apply(is_subset_of_any)\n", + " df = df[keep_mask]\n", + "\n", + " if len(df) > 1:\n", + " original_df = original_df[original_df['_signature'].isin(df['_signature'])]\n", + " value_counts = original_df[\"_signature\"].value_counts()\n", + " max_signature = value_counts.idxmax()\n", + " df = df[df['_signature'] == max_signature]\n", + "\n", + " df['icao'] = icao\n", + " df = 
df.drop(columns=['_non_empty_dict', '_non_empty_count', '_signature'])\n", + " return df\n", + "\n", + "# df = df_copy\n", + "# df = df_copy.iloc[0:100000]\n", + "# df = df[df['r'] == \"N4131T\"]\n", + "# df = df[(df['icao'] == \"008081\")]\n", + "# df = df.iloc[0:500]\n", + "df['aircraft_category'] = df['aircraft'].apply(lambda x: x.get('category') if isinstance(x, dict) else None)\n", + "df = df.drop(columns=['aircraft'])\n", + "df = df.sort_values(['icao', 'time'])\n", + "df[COLUMNS] = df[COLUMNS].fillna('')\n", + "ORIGINAL_COLUMNS = df.columns.tolist()\n", + "df_compressed = df.groupby('icao',group_keys=False).apply(compress_df)\n", + "cols = df_compressed.columns.tolist()\n", + "cols.remove(\"icao\")\n", + "cols.insert(1, \"icao\")\n", + "df_compressed = df_compressed[cols]\n", + "df_compressed" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "efdfcb2c", + "metadata": {}, + "outputs": [], + "source": [ + "df['aircraft_category'] = df['aircraft'].apply(lambda x: x.get('category') if isinstance(x, dict) else None)\n", + "df[~df['aircraft_category'].isna()]" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "495c5025", + "metadata": {}, + "outputs": [], + "source": [ + "# SOME KIND OF MAP REDUCE SYSTEM\n", + "import os\n", + "\n", + "COLUMNS = ['dbFlags', 'ownOp', 'year', 'desc', 'aircraft_category', 'r', 't']\n", + "def compress_df(df):\n", + " icao = df.name\n", + " df[\"_signature\"] = df[COLUMNS].astype(str).agg('|'.join, axis=1)\n", + " \n", + " # Compute signature counts before grouping (avoid copy)\n", + " signature_counts = df[\"_signature\"].value_counts()\n", + " \n", + " df = df.groupby(\"_signature\", as_index=False).first() # check if it works with both last and first.\n", + " # For each row, create a dict of non-empty column values. 
This is using sets and subsets...\n", + " def get_non_empty_dict(row):\n", + " return {col: row[col] for col in COLUMNS if row[col] != ''}\n", + " \n", + " df['_non_empty_dict'] = df.apply(get_non_empty_dict, axis=1)\n", + " df['_non_empty_count'] = df['_non_empty_dict'].apply(len)\n", + " \n", + " # Check if row i's non-empty values are a subset of row j's non-empty values\n", + " def is_subset_of_any(idx):\n", + " row_dict = df.loc[idx, '_non_empty_dict']\n", + " row_count = df.loc[idx, '_non_empty_count']\n", + " \n", + " for other_idx in df.index:\n", + " if idx == other_idx:\n", + " continue\n", + " other_dict = df.loc[other_idx, '_non_empty_dict']\n", + " other_count = df.loc[other_idx, '_non_empty_count']\n", + " \n", + " # Check if all non-empty values in current row match those in other row\n", + " if all(row_dict.get(k) == other_dict.get(k) for k in row_dict.keys()):\n", + " # If they match and other has more defined columns, current row is redundant\n", + " if other_count > row_count:\n", + " return True\n", + " return False\n", + " \n", + " # Keep rows that are not subsets of any other row\n", + " keep_mask = ~df.index.to_series().apply(is_subset_of_any)\n", + " df = df[keep_mask]\n", + "\n", + " if len(df) > 1:\n", + " # Use pre-computed signature counts instead of original_df\n", + " remaining_sigs = df['_signature']\n", + " sig_counts = signature_counts[remaining_sigs]\n", + " max_signature = sig_counts.idxmax()\n", + " df = df[df['_signature'] == max_signature]\n", + "\n", + " df['icao'] = icao\n", + " df = df.drop(columns=['_non_empty_dict', '_non_empty_count', '_signature'])\n", + " return df\n", + "\n", + "# names of releases something like\n", + "# planequery_aircraft_adsb_2024-06-01T00-00-00Z.csv.gz\n", + "\n", + "# Let's build historical first. 
\n", + "\n", + "_ch_client = None\n", + "\n", + "def _get_clickhouse_client():\n", + " \"\"\"Return a reusable ClickHouse client, with retry/backoff for transient DNS or connection errors.\"\"\"\n", + " global _ch_client\n", + " if _ch_client is not None:\n", + " return _ch_client\n", + "\n", + " import clickhouse_connect\n", + " import time\n", + "\n", + " max_retries = 5\n", + " for attempt in range(1, max_retries + 1):\n", + " try:\n", + " _ch_client = clickhouse_connect.get_client(\n", + " host=os.environ[\"CLICKHOUSE_HOST\"],\n", + " username=os.environ[\"CLICKHOUSE_USERNAME\"],\n", + " password=os.environ[\"CLICKHOUSE_PASSWORD\"],\n", + " secure=True,\n", + " )\n", + " return _ch_client\n", + " except Exception as e:\n", + " wait = min(2 ** attempt, 30)\n", + " print(f\" ClickHouse connect attempt {attempt}/{max_retries} failed: {e}\")\n", + " if attempt == max_retries:\n", + " raise\n", + " print(f\" Retrying in {wait}s...\")\n", + " time.sleep(wait)\n", + "\n", + "\n", + "def load_raw_adsb_for_day(day):\n", + " \"\"\"Load raw ADS-B data for a day from cache or ClickHouse.\"\"\"\n", + " from datetime import timedelta\n", + " from pathlib import Path\n", + " import pandas as pd\n", + " import time\n", + " \n", + " start_time = day.replace(hour=0, minute=0, second=0, microsecond=0)\n", + " end_time = start_time + timedelta(days=1)\n", + " \n", + " # Set up caching\n", + " cache_dir = Path(\"data/adsb\")\n", + " cache_dir.mkdir(parents=True, exist_ok=True)\n", + " cache_file = cache_dir / f\"adsb_raw_{start_time.strftime('%Y-%m-%d')}.csv.zst\"\n", + " \n", + " # Check if cache exists\n", + " if cache_file.exists():\n", + " print(f\" Loading from cache: {cache_file}\")\n", + " df = pd.read_csv(cache_file, compression='zstd')\n", + " df['time'] = pd.to_datetime(df['time'])\n", + " else:\n", + " # Format dates for the query\n", + " start_str = start_time.strftime('%Y-%m-%d %H:%M:%S')\n", + " end_str = end_time.strftime('%Y-%m-%d %H:%M:%S')\n", + " \n", + " 
max_retries = 3\n", + " for attempt in range(1, max_retries + 1):\n", + " try:\n", + " client = _get_clickhouse_client()\n", + " print(f\" Querying ClickHouse for {start_time.strftime('%Y-%m-%d')}\")\n", + " df = client.query_df(f\"SELECT time, icao,r,t,dbFlags,ownOp,year,desc,aircraft FROM adsb_messages Where time > '{start_str}' AND time < '{end_str}'\")\n", + " break\n", + " except Exception as e:\n", + " wait = min(2 ** attempt, 30)\n", + " print(f\" Query attempt {attempt}/{max_retries} failed: {e}\")\n", + " if attempt == max_retries:\n", + " raise\n", + " # Reset client in case connection is stale\n", + " global _ch_client\n", + " _ch_client = None\n", + " print(f\" Retrying in {wait}s...\")\n", + " time.sleep(wait)\n", + " \n", + " # Save to cache\n", + " df.to_csv(cache_file, index=False, compression='zstd')\n", + " print(f\" Saved to cache: {cache_file}\")\n", + " \n", + " return df\n", + "\n", + "def load_historical_for_day(day):\n", + " from pathlib import Path\n", + " import pandas as pd\n", + " \n", + " df = load_raw_adsb_for_day(day)\n", + " print(df)\n", + " df['aircraft_category'] = df['aircraft'].apply(lambda x: x.get('category') if isinstance(x, dict) else None)\n", + " df = df.drop(columns=['aircraft'])\n", + " df = df.sort_values(['icao', 'time'])\n", + " df[COLUMNS] = df[COLUMNS].fillna('')\n", + " df_compressed = df.groupby('icao',group_keys=False).apply(compress_df)\n", + " cols = df_compressed.columns.tolist()\n", + " cols.remove('time')\n", + " cols.insert(0, 'time')\n", + " cols.remove(\"icao\")\n", + " cols.insert(1, \"icao\")\n", + " df_compressed = df_compressed[cols]\n", + " return df_compressed\n", + "\n", + "\n", + "def concat_compressed_dfs(df_base, df_new):\n", + " \"\"\"Concatenate base and new compressed dataframes, keeping the most informative row per ICAO.\"\"\"\n", + " import pandas as pd\n", + " \n", + " # Combine both dataframes\n", + " df_combined = pd.concat([df_base, df_new], ignore_index=True)\n", + " \n", + " # Sort by 
ICAO and time\n", + " df_combined = df_combined.sort_values(['icao', 'time'])\n", + " \n", + " # Fill NaN values\n", + " df_combined[COLUMNS] = df_combined[COLUMNS].fillna('')\n", + " \n", + " # Apply compression logic per ICAO to get the best row\n", + " df_compressed = df_combined.groupby('icao', group_keys=False).apply(compress_df)\n", + " \n", + " # Sort by time\n", + " df_compressed = df_compressed.sort_values('time')\n", + " \n", + " return df_compressed\n", + "\n", + "\n", + "def get_latest_aircraft_adsb_csv_df():\n", + " \"\"\"Download and load the latest ADS-B CSV from GitHub releases.\"\"\"\n", + " from get_latest_planequery_aircraft_release import download_latest_aircraft_adsb_csv\n", + " \n", + " import pandas as pd\n", + " import re\n", + " \n", + " csv_path = download_latest_aircraft_adsb_csv()\n", + " df = pd.read_csv(csv_path)\n", + " df = df.fillna(\"\")\n", + " \n", + " # Extract start date from filename pattern: planequery_aircraft_adsb_{start_date}_{end_date}.csv\n", + " match = re.search(r\"planequery_aircraft_adsb_(\\d{4}-\\d{2}-\\d{2})_\", str(csv_path))\n", + " if not match:\n", + " raise ValueError(f\"Could not extract date from filename: {csv_path.name}\")\n", + " \n", + " date_str = match.group(1)\n", + " return df, date_str\n", + "\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "7f66acf7", + "metadata": {}, + "outputs": [], + "source": [ + "# SOME KIND OF MAP REDUCE SYSTEM\n", + "\n", + "\n", + "COLUMNS = ['dbFlags', 'ownOp', 'year', 'desc', 'aircraft_category', 'r', 't']\n", + "def compress_df(df):\n", + " icao = df.name\n", + " df[\"_signature\"] = df[COLUMNS].astype(str).agg('|'.join, axis=1)\n", + " original_df = df.copy()\n", + " df = df.groupby(\"_signature\", as_index=False).first() # check if it works with both last and first.\n", + " # For each row, create a dict of non-empty column values. 
This is using sets and subsets...\n", + " def get_non_empty_dict(row):\n", + " return {col: row[col] for col in COLUMNS if row[col] != ''}\n", + " \n", + " df['_non_empty_dict'] = df.apply(get_non_empty_dict, axis=1)\n", + " df['_non_empty_count'] = df['_non_empty_dict'].apply(len)\n", + " \n", + " # Check if row i's non-empty values are a subset of row j's non-empty values\n", + " def is_subset_of_any(idx):\n", + " row_dict = df.loc[idx, '_non_empty_dict']\n", + " row_count = df.loc[idx, '_non_empty_count']\n", + " \n", + " for other_idx in df.index:\n", + " if idx == other_idx:\n", + " continue\n", + " other_dict = df.loc[other_idx, '_non_empty_dict']\n", + " other_count = df.loc[other_idx, '_non_empty_count']\n", + " \n", + " # Check if all non-empty values in current row match those in other row\n", + " if all(row_dict.get(k) == other_dict.get(k) for k in row_dict.keys()):\n", + " # If they match and other has more defined columns, current row is redundant\n", + " if other_count > row_count:\n", + " return True\n", + " return False\n", + " \n", + " # Keep rows that are not subsets of any other row\n", + " keep_mask = ~df.index.to_series().apply(is_subset_of_any)\n", + " df = df[keep_mask]\n", + "\n", + " if len(df) > 1:\n", + " original_df = original_df[original_df['_signature'].isin(df['_signature'])]\n", + " value_counts = original_df[\"_signature\"].value_counts()\n", + " max_signature = value_counts.idxmax()\n", + " df = df[df['_signature'] == max_signature]\n", + "\n", + " df['icao'] = icao\n", + " df = df.drop(columns=['_non_empty_dict', '_non_empty_count', '_signature'])\n", + " return df\n", + "\n", + "# names of releases something like\n", + "# planequery_aircraft_adsb_2024-06-01T00-00-00Z.csv.gz\n", + "\n", + "# Let's build historical first. 
\n", + "\n", + "def load_raw_adsb_for_day(day):\n", + " \"\"\"Load raw ADS-B data for a day from cache or ClickHouse.\"\"\"\n", + " from datetime import timedelta\n", + " import clickhouse_connect\n", + " from pathlib import Path\n", + " import pandas as pd\n", + " \n", + " start_time = day.replace(hour=0, minute=0, second=0, microsecond=0)\n", + " end_time = start_time + timedelta(days=1)\n", + " \n", + " # Set up caching\n", + " cache_dir = Path(\"data/adsb\")\n", + " cache_dir.mkdir(parents=True, exist_ok=True)\n", + " cache_file = cache_dir / f\"adsb_raw_{start_time.strftime('%Y-%m-%d')}.csv.zst\"\n", + " \n", + " # Check if cache exists\n", + " if cache_file.exists():\n", + " print(f\" Loading from cache: {cache_file}\")\n", + " df = pd.read_csv(cache_file, compression='zstd')\n", + " df['time'] = pd.to_datetime(df['time'])\n", + " else:\n", + " # Format dates for the query\n", + " start_str = start_time.strftime('%Y-%m-%d %H:%M:%S')\n", + " end_str = end_time.strftime('%Y-%m-%d %H:%M:%S')\n", + " \n", + " client = clickhouse_connect.get_client(\n", + " host=os.environ[\"CLICKHOUSE_HOST\"],\n", + " username=os.environ[\"CLICKHOUSE_USERNAME\"],\n", + " password=os.environ[\"CLICKHOUSE_PASSWORD\"],\n", + " secure=True,\n", + " )\n", + " print(f\" Querying ClickHouse for {start_time.strftime('%Y-%m-%d')}\")\n", + " df = client.query_df(f\"SELECT time, icao,r,t,dbFlags,ownOp,year,desc,aircraft FROM adsb_messages Where time > '{start_str}' AND time < '{end_str}'\")\n", + " \n", + " # Save to cache\n", + " df.to_csv(cache_file, index=False, compression='zstd')\n", + " print(f\" Saved to cache: {cache_file}\")\n", + " \n", + " return df\n", + "\n", + "def load_historical_for_day(day):\n", + " from pathlib import Path\n", + " import pandas as pd\n", + " \n", + " df = load_raw_adsb_for_day(day)\n", + " \n", + " df['aircraft_category'] = df['aircraft'].apply(lambda x: x.get('category') if isinstance(x, dict) else None)\n", + " df = df.drop(columns=['aircraft'])\n", + " 
df = df.sort_values(['icao', 'time'])\n", + " df[COLUMNS] = df[COLUMNS].fillna('')\n", + " df_compressed = df.groupby('icao',group_keys=False).apply(compress_df)\n", + " cols = df_compressed.columns.tolist()\n", + " cols.remove('time')\n", + " cols.insert(0, 'time')\n", + " cols.remove(\"icao\")\n", + " cols.insert(1, \"icao\")\n", + " df_compressed = df_compressed[cols]\n", + " return df_compressed\n", + "\n", + "\n", + "def concat_compressed_dfs(df_base, df_new):\n", + " \"\"\"Concatenate base and new compressed dataframes, keeping the most informative row per ICAO.\"\"\"\n", + " import pandas as pd\n", + " \n", + " # Combine both dataframes\n", + " df_combined = pd.concat([df_base, df_new], ignore_index=True)\n", + " \n", + " # Sort by ICAO and time\n", + " df_combined = df_combined.sort_values(['icao', 'time'])\n", + " \n", + " # Fill NaN values\n", + " df_combined[COLUMNS] = df_combined[COLUMNS].fillna('')\n", + " \n", + " # Apply compression logic per ICAO to get the best row\n", + " df_compressed = df_combined.groupby('icao', group_keys=False).apply(compress_df)\n", + " \n", + " # Sort by time\n", + " df_compressed = df_compressed.sort_values('time')\n", + " \n", + " return df_compressed\n", + "\n", + "\n", + "def get_latest_aircraft_adsb_csv_df():\n", + " \"\"\"Download and load the latest ADS-B CSV from GitHub releases.\"\"\"\n", + " from get_latest_planequery_aircraft_release import download_latest_aircraft_adsb_csv\n", + " \n", + " import pandas as pd\n", + " import re\n", + " \n", + " csv_path = download_latest_aircraft_adsb_csv()\n", + " df = pd.read_csv(csv_path)\n", + " df = df.fillna(\"\")\n", + " \n", + " # Extract start date from filename pattern: planequery_aircraft_adsb_{start_date}_{end_date}.csv\n", + " match = re.search(r\"planequery_aircraft_adsb_(\\d{4}-\\d{2}-\\d{2})_\", str(csv_path))\n", + " if not match:\n", + " raise ValueError(f\"Could not extract date from filename: {csv_path.name}\")\n", + " \n", + " date_str = match.group(1)\n", + 
" return df, date_str\n", + "\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "e14c8363", + "metadata": {}, + "outputs": [], + "source": [ + "from datetime import datetime\n", + "df = load_historical_for_day(datetime(2024,1,1))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "3874ba4d", + "metadata": {}, + "outputs": [], + "source": [ + "len(df)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "bcae50ad", + "metadata": {}, + "outputs": [], + "source": [ + "df[(df['icao'] == \"008081\")]" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "50921c86", + "metadata": {}, + "outputs": [], + "source": [ + "df[df['icao'] == \"a4e1d2\"]" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "8194d9aa", + "metadata": {}, + "outputs": [], + "source": [ + "df[df['r'] == \"N4131T\"]" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "1e3b7aa2", + "metadata": {}, + "outputs": [], + "source": [ + "df_compressed[df_compressed['icao'].duplicated(keep=False)]\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "40613bc1", + "metadata": {}, + "outputs": [], + "source": [ + "import gzip\n", + "import json\n", + "\n", + "path = \"/Users/jonahgoode/Downloads/test_extract/traces/fb/trace_full_acbbfb.json\"\n", + "\n", + "with gzip.open(path, \"rt\", encoding=\"utf-8\") as f:\n", + " data = json.load(f)\n", + "\n", + "print(type(data))\n", + "# use `data` here\n", + "import json\n", + "print(json.dumps(data, indent=2)[:2000])\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "320109b2", + "metadata": {}, + "outputs": [], + "source": [ + "# First, load the JSON to inspect its structure\n", + "import json\n", + "with open(\"/Users/jonahgoode/Documents/PlaneQuery/Other-Code/readsb-protobuf/webapp/src/db/aircrafts.json\", 'r') as f:\n", + " data = json.load(f)\n", + "\n", + "# Check the structure\n", + 
"print(type(data))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "590134f4", + "metadata": {}, + "outputs": [], + "source": [ + "data['AC97E3']" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": ".venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.10" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/requirements.txt b/requirements.txt index c30c4f5..387292e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,3 @@ faa-aircraft-registry==0.1.0 pandas==3.0.0 - +clickhouse-connect==0.10.0 diff --git a/src/adsb/Dockerfile.reducer b/src/adsb/Dockerfile.reducer new file mode 100644 index 0000000..ddc06db --- /dev/null +++ b/src/adsb/Dockerfile.reducer @@ -0,0 +1,10 @@ +FROM --platform=linux/arm64 python:3.12-slim + +WORKDIR /app + +COPY requirements.reducer.txt requirements.txt +RUN pip install --no-cache-dir -r requirements.txt + +COPY reducer.py . + +CMD ["python", "-u", "reducer.py"] diff --git a/src/adsb/Dockerfile.worker b/src/adsb/Dockerfile.worker new file mode 100644 index 0000000..e721442 --- /dev/null +++ b/src/adsb/Dockerfile.worker @@ -0,0 +1,11 @@ +FROM --platform=linux/arm64 python:3.12-slim + +WORKDIR /app + +COPY requirements.worker.txt requirements.txt +RUN pip install --no-cache-dir -r requirements.txt + +COPY compress_adsb_to_aircraft_data.py . +COPY worker.py . 
+ +CMD ["python", "-u", "worker.py"] diff --git a/src/adsb/adsb_to_aircraft_data_daily.py b/src/adsb/adsb_to_aircraft_data_daily.py new file mode 100644 index 0000000..380f187 --- /dev/null +++ b/src/adsb/adsb_to_aircraft_data_daily.py @@ -0,0 +1,7 @@ +from pathlib import Path +from datetime import datetime, timezone,timedelta +from adsb_to_aircraft_data_historical import load_historical_for_day + + +day = datetime.now(timezone.utc) - timedelta(days=1) +load_historical_for_day(day) \ No newline at end of file diff --git a/src/adsb/adsb_to_aircraft_data_historical.py b/src/adsb/adsb_to_aircraft_data_historical.py new file mode 100644 index 0000000..1ff7acf --- /dev/null +++ b/src/adsb/adsb_to_aircraft_data_historical.py @@ -0,0 +1,87 @@ +""" +Process historical ADS-B data by date range. +Downloads and compresses ADS-B messages for each day in the specified range. +""" +import argparse +from datetime import datetime, timedelta +from pathlib import Path +import pandas as pd +from compress_adsb_to_aircraft_data import load_historical_for_day, COLUMNS + +def deduplicate_by_signature(df): + """For each icao, keep only the earliest row with each unique signature.""" + df["_signature"] = df[COLUMNS].astype(str).agg('|'.join, axis=1) + # Group by icao and signature, keep first (earliest) occurrence + df_deduped = df.groupby(['icao', '_signature'], as_index=False).first() + df_deduped = df_deduped.drop(columns=['_signature']) + df_deduped = df_deduped.sort_values('time') + return df_deduped + + +def main(start_date_str: str, end_date_str: str): + """Process historical ADS-B data for the given date range.""" + OUT_ROOT = Path("data/planequery_aircraft") + OUT_ROOT.mkdir(parents=True, exist_ok=True) + + # Parse dates + start_date = datetime.strptime(start_date_str, "%Y-%m-%d") + end_date = datetime.strptime(end_date_str, "%Y-%m-%d") + + # Calculate total number of days + total_days = (end_date - start_date).days + print(f"Processing {total_days} days from {start_date_str} to 
{end_date_str}") + + # Initialize accumulated dataframe + df_accumulated = pd.DataFrame() + + # Cache directory path + cache_dir = Path("data/adsb") + + # Iterate through each day + current_date = start_date + while current_date < end_date: + print(f"Processing {current_date.strftime('%Y-%m-%d')}...") + + df_compressed = load_historical_for_day(current_date) + + # Concatenate to accumulated dataframe + if df_accumulated.empty: + df_accumulated = df_compressed + else: + df_accumulated = pd.concat([df_accumulated, df_compressed], ignore_index=True) + + print(f" Added {len(df_compressed)} records (total: {len(df_accumulated)})") + + # Save intermediate output after each day + current_date_str = current_date.strftime('%Y-%m-%d') + output_file = OUT_ROOT / f"planequery_aircraft_adsb_{start_date_str}_{current_date_str}.csv.gz" + df_deduped = deduplicate_by_signature(df_accumulated.copy()) + df_deduped.to_csv(output_file, index=False, compression='gzip') + print(f" Saved to {output_file.name}") + + # Delete cache after processing if processing more than 10 days + if total_days > 5 and cache_dir.exists(): + import shutil + shutil.rmtree(cache_dir) + print(f" Deleted cache directory to save space") + + # Move to next day + current_date += timedelta(days=1) + + # Save the final accumulated data + output_file = OUT_ROOT / f"planequery_aircraft_adsb_{start_date_str}_{end_date_str}.csv.gz" + df_accumulated = deduplicate_by_signature(df_accumulated) + df_accumulated.to_csv(output_file, index=False, compression='gzip') + + print(f"Completed processing from {start_date_str} to {end_date_str}") + print(f"Saved {len(df_accumulated)} total records to {output_file}") + + +if __name__ == '__main__': + # Parse command line arguments + parser = argparse.ArgumentParser(description="Process historical ADS-B data from ClickHouse") + parser.add_argument("start_date", help="Start date (YYYY-MM-DD, inclusive)") + parser.add_argument("end_date", help="End date (YYYY-MM-DD, exclusive)") + args = 
# Per-aircraft identity compression: collapse the many ADS-B rows seen for
# one ICAO into the single most informative row.

# Identity columns whose joined values form a row's dedup "signature".
COLUMNS = ['dbFlags', 'ownOp', 'year', 'desc', 'aircraft_category', 'r', 't']


def compress_df(df):
    """Reduce one ICAO group's rows to the single most informative row.

    Intended to be called via ``groupby('icao').apply``; ``df.name``
    carries the group's ICAO. Rows whose populated identity values are a
    strict subset of another row's are dropped; if several maximal rows
    survive, the one whose signature occurred most often in the raw data
    wins.

    Fixes over the previous version: the group frame is copied before any
    column assignment (groupby must never see its input mutated), and the
    boolean-mask results are copied before assignment so pandas versions
    with chained-assignment checks don't warn or silently drop the write.
    """
    icao = df.name
    df = df.copy()  # never mutate the caller's group frame
    df["_signature"] = df[COLUMNS].astype(str).agg('|'.join, axis=1)

    # Frequency of each signature in the raw (pre-collapse) data, used
    # later for tie-breaking between surviving maximal rows.
    signature_counts = df["_signature"].value_counts()

    # One representative (first-seen) row per distinct signature.
    df = df.groupby("_signature", as_index=False).first()

    def get_non_empty_dict(row):
        # Only the identity values that are actually populated.
        return {col: row[col] for col in COLUMNS if row[col] != ''}

    df['_non_empty_dict'] = df.apply(get_non_empty_dict, axis=1)
    df['_non_empty_count'] = df['_non_empty_dict'].apply(len)

    def is_subset_of_any(idx):
        # True when this row's populated values all agree with some other
        # row that has strictly more populated columns — i.e. this row
        # carries no extra information and is redundant.
        row_dict = df.loc[idx, '_non_empty_dict']
        row_count = df.loc[idx, '_non_empty_count']

        for other_idx in df.index:
            if idx == other_idx:
                continue
            other_dict = df.loc[other_idx, '_non_empty_dict']
            other_count = df.loc[other_idx, '_non_empty_count']

            if all(row_dict.get(k) == other_dict.get(k) for k in row_dict.keys()):
                if other_count > row_count:
                    return True
        return False

    keep_mask = ~df.index.to_series().apply(is_subset_of_any)
    df = df[keep_mask].copy()

    if len(df) > 1:
        # Several maximal rows remain: keep the one whose signature was
        # seen most often in the raw data.
        sig_counts = signature_counts[df['_signature']]
        max_signature = sig_counts.idxmax()
        df = df[df['_signature'] == max_signature].copy()

    df['icao'] = icao
    df = df.drop(columns=['_non_empty_dict', '_non_empty_count', '_signature'])
    # Preserve empty strings (not NaN) so downstream signatures stay stable.
    df[COLUMNS] = df[COLUMNS].fillna('')
    return df


# names of releases something like
# planequery_aircraft_adsb_2024-06-01T00-00-00Z.csv.gz


def load_raw_adsb_for_day(day):
    """Load raw ADS-B data for *day* from its parquet file, generating it on demand.

    Args:
        day: A datetime within the wanted UTC day.

    Returns:
        DataFrame with the identity columns (empty when no data exists).

    Raises:
        Exception: when the parquet file is missing and cannot be generated.
    """
    from pathlib import Path
    import pandas as pd

    start_time = day.replace(hour=0, minute=0, second=0, microsecond=0)

    # Parquet files are keyed by the adsb.lol release naming: vYYYY.MM.DD.
    version_date = f"v{start_time.strftime('%Y.%m.%d')}"
    parquet_file = Path(f"data/output/parquet_output/{version_date}.parquet")

    if not parquet_file.exists():
        print(f"  Parquet file not found: {parquet_file}")
        print(f"  Attempting to download and generate parquet for {start_time.strftime('%Y-%m-%d')}...")

        # Lazy import: the downloader pulls in heavy deps (requests, pyarrow).
        from download_adsb_data_to_parquet import create_parquet_for_day
        result_path = create_parquet_for_day(start_time, keep_folders=False)

        if result_path:
            print(f"  Successfully generated parquet file: {result_path}")
        else:
            raise Exception("Failed to generate parquet file")

    if parquet_file.exists():
        print(f"  Loading from parquet: {parquet_file}")
        df = pd.read_parquet(
            parquet_file,
            columns=['time', 'icao', 'r', 't', 'dbFlags', 'ownOp', 'year', 'desc', 'aircraft_category']
        )

        # Downstream comparisons assume timezone-naive timestamps.
        df['time'] = df['time'].dt.tz_localize(None)
        return df

    # Defensive fallback: generation reported success but the file is not
    # at the expected path — return an empty, correctly-shaped frame.
    print(f"  No data available for {start_time.strftime('%Y-%m-%d')}")
    return pd.DataFrame(columns=['time', 'icao', 'r', 't', 'dbFlags', 'ownOp', 'year', 'desc', 'aircraft_category'])
load_historical_for_day(day): + from pathlib import Path + import pandas as pd + df = load_raw_adsb_for_day(day) + if df.empty: + return df + print(f"Loaded {len(df)} raw records for {day.strftime('%Y-%m-%d')}") + df = df.sort_values(['icao', 'time']) + print("done sort") + df[COLUMNS] = df[COLUMNS].fillna('') + + # First pass: quick deduplication of exact duplicates + df = df.drop_duplicates(subset=['icao'] + COLUMNS, keep='first') + print(f"After quick dedup: {len(df)} records") + + # Second pass: sophisticated compression per ICAO + print("Compressing per ICAO...") + df_compressed = df.groupby('icao', group_keys=False).apply(compress_df) + print(f"After compress: {len(df_compressed)} records") + + cols = df_compressed.columns.tolist() + cols.remove('time') + cols.insert(0, 'time') + cols.remove("icao") + cols.insert(1, "icao") + df_compressed = df_compressed[cols] + return df_compressed + + +def concat_compressed_dfs(df_base, df_new): + """Concatenate base and new compressed dataframes, keeping the most informative row per ICAO.""" + import pandas as pd + + # Combine both dataframes + df_combined = pd.concat([df_base, df_new], ignore_index=True) + + # Sort by ICAO and time + df_combined = df_combined.sort_values(['icao', 'time']) + + # Fill NaN values + df_combined[COLUMNS] = df_combined[COLUMNS].fillna('') + + # Apply compression logic per ICAO to get the best row + df_compressed = df_combined.groupby('icao', group_keys=False).apply(compress_df) + + # Sort by time + df_compressed = df_compressed.sort_values('time') + + return df_compressed + + +def get_latest_aircraft_adsb_csv_df(): + """Download and load the latest ADS-B CSV from GitHub releases.""" + from get_latest_planequery_aircraft_release import download_latest_aircraft_adsb_csv + + import pandas as pd + import re + + csv_path = download_latest_aircraft_adsb_csv() + df = pd.read_csv(csv_path) + df = df.fillna("") + + # Extract start date from filename pattern: 
planequery_aircraft_adsb_{start_date}_{end_date}.csv + match = re.search(r"planequery_aircraft_adsb_(\d{4}-\d{2}-\d{2})_", str(csv_path)) + if not match: + raise ValueError(f"Could not extract date from filename: {csv_path.name}") + + date_str = match.group(1) + return df, date_str + diff --git a/src/adsb/download_adsb_data_to_parquet.py b/src/adsb/download_adsb_data_to_parquet.py new file mode 100644 index 0000000..deb1c82 --- /dev/null +++ b/src/adsb/download_adsb_data_to_parquet.py @@ -0,0 +1,684 @@ +""" +Downloads adsb.lol data and writes to Parquet files. + +Usage: + python -m src.process_historical_adsb_data.download_to_parquet 2025-01-01 2025-01-02 + +This will download trace data for the specified date range and output Parquet files. + +This file is self-contained and does not import from other project modules. +""" +import gc +import glob +import gzip +import sys +import logging +import time +import re +import signal +import concurrent.futures +import subprocess +import os +import argparse +import datetime as dt +from datetime import datetime, timedelta, timezone + +import requests +import orjson +import pandas as pd +import pyarrow as pa +import pyarrow.parquet as pq + + +# ============================================================================ +# Configuration +# ============================================================================ + +OUTPUT_DIR = "./data/output" +os.makedirs(OUTPUT_DIR, exist_ok=True) + +PARQUET_DIR = os.path.join(OUTPUT_DIR, "parquet_output") +os.makedirs(PARQUET_DIR, exist_ok=True) + +TOKEN = os.environ.get('GITHUB_TOKEN') # Optional: for higher GitHub API rate limits +HEADERS = {"Authorization": f"token {TOKEN}"} if TOKEN else {} + + +# ============================================================================ +# GitHub Release Fetching and Downloading +# ============================================================================ + +class DownloadTimeoutException(Exception): + pass + + +def timeout_handler(signum, 
frame): + raise DownloadTimeoutException("Download timed out after 40 seconds") + + +def fetch_releases(version_date: str) -> list: + """Fetch GitHub releases for a given version date from adsblol.""" + year = version_date.split('.')[0][1:] + if version_date == "v2024.12.31": + year = "2025" + BASE_URL = f"https://api.github.com/repos/adsblol/globe_history_{year}/releases" + PATTERN = f"{version_date}-planes-readsb-prod-0" + releases = [] + page = 1 + + while True: + max_retries = 10 + retry_delay = 60 + + for attempt in range(1, max_retries + 1): + try: + response = requests.get(f"{BASE_URL}?page={page}", headers=HEADERS) + if response.status_code == 200: + break + else: + print(f"Failed to fetch releases (attempt {attempt}/{max_retries}): {response.status_code} {response.reason}") + if attempt < max_retries: + print(f"Waiting {retry_delay} seconds before retry...") + time.sleep(retry_delay) + else: + print(f"Giving up after {max_retries} attempts") + return releases + except Exception as e: + print(f"Request exception (attempt {attempt}/{max_retries}): {e}") + if attempt < max_retries: + print(f"Waiting {retry_delay} seconds before retry...") + time.sleep(retry_delay) + else: + print(f"Giving up after {max_retries} attempts") + return releases + + data = response.json() + if not data: + break + for release in data: + if re.match(PATTERN, release["tag_name"]): + releases.append(release) + page += 1 + return releases + + +def download_asset(asset_url: str, file_path: str) -> bool: + """Download a single release asset.""" + os.makedirs(os.path.dirname(file_path) or OUTPUT_DIR, exist_ok=True) + + if os.path.exists(file_path): + print(f"[SKIP] {file_path} already downloaded.") + return True + + print(f"Downloading {asset_url}...") + try: + signal.signal(signal.SIGALRM, timeout_handler) + signal.alarm(40) # 40-second timeout + + response = requests.get(asset_url, headers=HEADERS, stream=True) + signal.alarm(0) + + if response.status_code == 200: + with open(file_path, 
"wb") as file: + for chunk in response.iter_content(chunk_size=8192): + file.write(chunk) + print(f"Saved {file_path}") + return True + else: + print(f"Failed to download {asset_url}: {response.status_code} {response.reason}") + return False + except DownloadTimeoutException as e: + print(f"Download aborted for {asset_url}: {e}") + return False + except Exception as e: + print(f"An error occurred while downloading {asset_url}: {e}") + return False + + +def extract_split_archive(file_paths: list, extract_dir: str) -> bool: + """ + Extracts a split archive by concatenating the parts using 'cat' + and then extracting with 'tar' in one pipeline. + """ + if os.path.isdir(extract_dir): + print(f"[SKIP] Extraction directory already exists: {extract_dir}") + return True + + def sort_key(path: str): + base = os.path.basename(path) + parts = base.rsplit('.', maxsplit=1) + if len(parts) == 2: + suffix = parts[1] + if suffix.isdigit(): + return (0, int(suffix)) + if re.fullmatch(r'[a-zA-Z]+', suffix): + return (1, suffix) + return (2, base) + + file_paths = sorted(file_paths, key=sort_key) + os.makedirs(extract_dir, exist_ok=True) + + try: + cat_proc = subprocess.Popen( + ["cat"] + file_paths, + stdout=subprocess.PIPE, + stderr=subprocess.DEVNULL + ) + tar_cmd = ["tar", "xf", "-", "-C", extract_dir, "--strip-components=1"] + subprocess.run( + tar_cmd, + stdin=cat_proc.stdout, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + check=True + ) + cat_proc.stdout.close() + cat_proc.wait() + + print(f"Successfully extracted archive to {extract_dir}") + return True + except subprocess.CalledProcessError as e: + print(f"Failed to extract split archive: {e}") + return False + + +# ============================================================================ +# Trace File Processing (with alt_baro/on_ground handling) +# ============================================================================ + +ALLOWED_DATA_SOURCE = {'', 'adsb.lol', 'adsbexchange', 'airplanes.live'} + + +def 
def process_file(filepath: str) -> list:
    """Parse one gzipped trace JSON file into a list of flat rows.

    Handles alt_baro/on_ground: if the trace altitude slot is the string
    "ground", on_ground=True and alt_baro=None.

    Returns [] when the file lacks required fields ('icao', 'timestamp',
    'trace'). Fix: 'year' may be present-but-null or empty in upstream
    data, in which case ``dict.get``'s default does not apply and
    ``int(None)`` used to raise, discarding the entire file via
    safe_process; it now falls back to 0.
    """
    with gzip.open(filepath, 'rb') as f:
        data = orjson.loads(f.read())

    icao = data.get('icao', None)
    if icao is None:
        print(f"Skipping file {filepath} as it does not contain 'icao'")
        return []

    r = data.get('r', "")
    t = data.get('t', "")
    dbFlags = data.get('dbFlags', 0)
    noRegData = data.get('noRegData', False)
    ownOp = data.get('ownOp', "")
    try:
        year = int(data.get('year') or 0)
    except (TypeError, ValueError):
        year = 0
    timestamp = data.get('timestamp', None)
    desc = data.get('desc', "")
    trace_data = data.get('trace', None)

    if timestamp is None or trace_data is None:
        print(f"Skipping file {filepath} as it does not contain 'timestamp' or 'trace'")
        return []

    # NOTE(review): this condition is constant-true ('adsb.lol' is always a
    # member of ALLOWED_DATA_SOURCE); presumably it was meant to test the
    # per-row `source` value. Kept as-is (hoisted out of the loop) to
    # preserve output — confirm intent before changing.
    data_source_value = "adsb.lol" if "adsb.lol" in ALLOWED_DATA_SOURCE else ""

    insert_rows = []
    for row in trace_data:
        time_offset = row[0]
        lat = row[1]
        lon = row[2]
        altitude = row[3]

        # "ground" is encoded in the altitude slot. Exact type() checks are
        # kept deliberately: isinstance would also accept bools.
        alt_baro = None
        on_ground = False
        if type(altitude) is str and altitude == "ground":
            on_ground = True
        elif type(altitude) is int:
            alt_baro = altitude
        elif type(altitude) is float:
            alt_baro = int(altitude)

        ground_speed = row[4]
        track_degrees = row[5]
        flags = row[6]
        vertical_rate = row[7]
        aircraft = row[8]
        source = row[9]
        geometric_altitude = row[10]
        geometric_vertical_rate = row[11]
        indicated_airspeed = row[12]
        roll_angle = row[13]

        time_val = timestamp + time_offset
        dt64 = dt.datetime.fromtimestamp(time_val, tz=dt.timezone.utc)

        # Base fields; order must match the module-level COLUMNS list.
        inserted_row = [
            dt64, icao, r, t, dbFlags, noRegData, ownOp, year, desc,
            lat, lon, alt_baro, on_ground, ground_speed, track_degrees,
            flags, vertical_rate
        ]
        next_part = [
            source, geometric_altitude, geometric_vertical_rate,
            indicated_airspeed, roll_angle
        ]
        inserted_row.extend(next_part)

        if aircraft is None or type(aircraft) is not dict:
            aircraft = dict()

        # Per-row aircraft metadata; insertion order matches COLUMNS.
        aircraft_data = {
            'alert': aircraft.get('alert', None),
            'alt_geom': aircraft.get('alt_geom', None),
            'gva': aircraft.get('gva', None),
            'nac_p': aircraft.get('nac_p', None),
            'nac_v': aircraft.get('nac_v', None),
            'nic': aircraft.get('nic', None),
            'nic_baro': aircraft.get('nic_baro', None),
            'rc': aircraft.get('rc', None),
            'sda': aircraft.get('sda', None),
            'sil': aircraft.get('sil', None),
            'sil_type': aircraft.get('sil_type', ""),
            'spi': aircraft.get('spi', None),
            'track': aircraft.get('track', None),
            'type': aircraft.get('type', ""),
            'version': aircraft.get('version', None),
            'category': aircraft.get('category', ''),
            'emergency': aircraft.get('emergency', ''),
            'flight': aircraft.get('flight', ""),
            'squawk': aircraft.get('squawk', ""),
            'baro_rate': aircraft.get('baro_rate', None),
            'nav_altitude_fms': aircraft.get('nav_altitude_fms', None),
            'nav_altitude_mcp': aircraft.get('nav_altitude_mcp', None),
            'nav_modes': aircraft.get('nav_modes', []),
            'nav_qnh': aircraft.get('nav_qnh', None),
            'geom_rate': aircraft.get('geom_rate', None),
            'ias': aircraft.get('ias', None),
            'mach': aircraft.get('mach', None),
            'mag_heading': aircraft.get('mag_heading', None),
            'oat': aircraft.get('oat', None),
            'roll': aircraft.get('roll', None),
            'tas': aircraft.get('tas', None),
            'tat': aircraft.get('tat', None),
            'true_heading': aircraft.get('true_heading', None),
            'wd': aircraft.get('wd', None),
            'ws': aircraft.get('ws', None),
            'track_rate': aircraft.get('track_rate', None),
            'nav_heading': aircraft.get('nav_heading', None)
        }

        aircraft_list = list(aircraft_data.values())
        inserted_row.extend(aircraft_list)
        inserted_row.append(data_source_value)

        insert_rows.append(inserted_row)

    if insert_rows:
        print(f"Got {len(insert_rows)} rows from {filepath}")
        return insert_rows
    else:
        return []
============================================================================ +# Parquet Writing +# ============================================================================ + +# Column names matching the order of data in inserted_row +COLUMNS = [ + "time", "icao", + "r", "t", "dbFlags", "noRegData", "ownOp", "year", "desc", + "lat", "lon", "alt_baro", "on_ground", "ground_speed", "track_degrees", + "flags", "vertical_rate", "source", "geometric_altitude", + "geometric_vertical_rate", "indicated_airspeed", "roll_angle", + "aircraft_alert", "aircraft_alt_geom", "aircraft_gva", "aircraft_nac_p", + "aircraft_nac_v", "aircraft_nic", "aircraft_nic_baro", "aircraft_rc", + "aircraft_sda", "aircraft_sil", "aircraft_sil_type", "aircraft_spi", + "aircraft_track", "aircraft_type", "aircraft_version", "aircraft_category", + "aircraft_emergency", "aircraft_flight", "aircraft_squawk", + "aircraft_baro_rate", "aircraft_nav_altitude_fms", "aircraft_nav_altitude_mcp", + "aircraft_nav_modes", "aircraft_nav_qnh", "aircraft_geom_rate", + "aircraft_ias", "aircraft_mach", "aircraft_mag_heading", "aircraft_oat", + "aircraft_roll", "aircraft_tas", "aircraft_tat", "aircraft_true_heading", + "aircraft_wd", "aircraft_ws", "aircraft_track_rate", "aircraft_nav_heading", + "data_source", +] + + +OS_CPU_COUNT = os.cpu_count() or 1 +MAX_WORKERS = OS_CPU_COUNT if OS_CPU_COUNT > 4 else 1 +CHUNK_SIZE = MAX_WORKERS * 1000 +BATCH_SIZE = (os.cpu_count() or 1) * 100000 + +# PyArrow schema for efficient Parquet writing +PARQUET_SCHEMA = pa.schema([ + ("time", pa.timestamp("ms", tz="UTC")), + ("icao", pa.string()), + ("r", pa.string()), + ("t", pa.string()), + ("dbFlags", pa.int32()), + ("noRegData", pa.bool_()), + ("ownOp", pa.string()), + ("year", pa.uint16()), + ("desc", pa.string()), + ("lat", pa.float64()), + ("lon", pa.float64()), + ("alt_baro", pa.int32()), + ("on_ground", pa.bool_()), + ("ground_speed", pa.float32()), + ("track_degrees", pa.float32()), + ("flags", pa.uint32()), + 
("vertical_rate", pa.int32()), + ("source", pa.string()), + ("geometric_altitude", pa.int32()), + ("geometric_vertical_rate", pa.int32()), + ("indicated_airspeed", pa.int32()), + ("roll_angle", pa.float32()), + ("aircraft_alert", pa.int64()), + ("aircraft_alt_geom", pa.int64()), + ("aircraft_gva", pa.int64()), + ("aircraft_nac_p", pa.int64()), + ("aircraft_nac_v", pa.int64()), + ("aircraft_nic", pa.int64()), + ("aircraft_nic_baro", pa.int64()), + ("aircraft_rc", pa.int64()), + ("aircraft_sda", pa.int64()), + ("aircraft_sil", pa.int64()), + ("aircraft_sil_type", pa.string()), + ("aircraft_spi", pa.int64()), + ("aircraft_track", pa.float64()), + ("aircraft_type", pa.string()), + ("aircraft_version", pa.int64()), + ("aircraft_category", pa.string()), + ("aircraft_emergency", pa.string()), + ("aircraft_flight", pa.string()), + ("aircraft_squawk", pa.string()), + ("aircraft_baro_rate", pa.int64()), + ("aircraft_nav_altitude_fms", pa.int64()), + ("aircraft_nav_altitude_mcp", pa.int64()), + ("aircraft_nav_modes", pa.list_(pa.string())), + ("aircraft_nav_qnh", pa.float64()), + ("aircraft_geom_rate", pa.int64()), + ("aircraft_ias", pa.int64()), + ("aircraft_mach", pa.float64()), + ("aircraft_mag_heading", pa.float64()), + ("aircraft_oat", pa.int64()), + ("aircraft_roll", pa.float64()), + ("aircraft_tas", pa.int64()), + ("aircraft_tat", pa.int64()), + ("aircraft_true_heading", pa.float64()), + ("aircraft_wd", pa.int64()), + ("aircraft_ws", pa.int64()), + ("aircraft_track_rate", pa.float64()), + ("aircraft_nav_heading", pa.float64()), + ("data_source", pa.string()), +]) + + +def collect_trace_files_with_find(root_dir): + """Find all trace_full_*.json files in the extracted directory.""" + trace_dict: dict[str, str] = {} + cmd = ['find', root_dir, '-type', 'f', '-name', 'trace_full_*.json'] + + result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) + + if result.returncode != 0: + print(f"Error executing find: {result.stderr}") + return 
trace_dict + + for file_path in result.stdout.strip().split('\n'): + if file_path: + filename = os.path.basename(file_path) + if filename.startswith("trace_full_") and filename.endswith(".json"): + icao = filename[len("trace_full_"):-len(".json")] + trace_dict[icao] = file_path + + return trace_dict + + +def generate_version_dates(start_date: str, end_date: str) -> list: + """Generate a list of dates from start_date to end_date inclusive.""" + start = datetime.strptime(start_date, "%Y-%m-%d") + end = datetime.strptime(end_date, "%Y-%m-%d") + delta = end - start + return [start + timedelta(days=i) for i in range(delta.days + 1)] + + +def safe_process(fp): + """Safely process a file, returning empty list on error.""" + try: + return process_file(fp) + except Exception as e: + logging.error(f"Error processing {fp}: {e}") + return [] + + +def rows_to_dataframe(rows: list) -> pd.DataFrame: + """Convert list of rows to a pandas DataFrame.""" + df = pd.DataFrame(rows, columns=COLUMNS) + return df + + +def write_batch_to_parquet(rows: list, version_date: str, batch_idx: int): + """Write a batch of rows to a Parquet file.""" + if not rows: + return + + df = rows_to_dataframe(rows) + + # Ensure datetime column is timezone-aware + if not df['time'].dt.tz: + df['time'] = df['time'].dt.tz_localize('UTC') + + parquet_path = os.path.join(PARQUET_DIR, f"{version_date}_batch_{batch_idx:04d}.parquet") + + # Convert to PyArrow table and write + table = pa.Table.from_pandas(df, schema=PARQUET_SCHEMA, preserve_index=False) + pq.write_table(table, parquet_path, compression='snappy') + + print(f"Written parquet batch {batch_idx} ({len(rows)} rows) to {parquet_path}") + + +def merge_parquet_files(version_date: str, delete_batches: bool = True): + """Merge all batch parquet files for a version_date into a single file.""" + pattern = os.path.join(PARQUET_DIR, f"{version_date}_batch_*.parquet") + batch_files = sorted(glob.glob(pattern)) + + if not batch_files: + print(f"No batch files found 
for {version_date}") + return None + + print(f"Merging {len(batch_files)} batch files for {version_date}...") + + # Read all batch files + tables = [] + for f in batch_files: + tables.append(pq.read_table(f)) + + # Concatenate all tables + merged_table = pa.concat_tables(tables) + + # Write merged file + merged_path = os.path.join(PARQUET_DIR, f"{version_date}.parquet") + pq.write_table(merged_table, merged_path, compression='snappy') + + print(f"Merged parquet file written to {merged_path} ({merged_table.num_rows} total rows)") + + # Optionally delete batch files + if delete_batches: + for f in batch_files: + os.remove(f) + print(f"Deleted {len(batch_files)} batch files") + + return merged_path + + +def process_version_date(version_date: str, keep_folders: bool = False): + """Download, extract, and process trace files for a single version date.""" + print(f"\nProcessing version_date: {version_date}") + extract_dir = os.path.join(OUTPUT_DIR, f"{version_date}-planes-readsb-prod-0.tar_0") + + def collect_trace_files_for_version_date(vd): + releases = fetch_releases(vd) + if len(releases) == 0: + print(f"No releases found for {vd}.") + return None + + downloaded_files = [] + for release in releases: + tag_name = release["tag_name"] + print(f"Processing release: {tag_name}") + + # Only download prod-0 if available, else prod-0tmp + assets = release.get("assets", []) + normal_assets = [ + a for a in assets + if "planes-readsb-prod-0." 
in a["name"] and "tmp" not in a["name"] + ] + tmp_assets = [ + a for a in assets + if "planes-readsb-prod-0tmp" in a["name"] + ] + use_assets = normal_assets if normal_assets else tmp_assets + + for asset in use_assets: + asset_name = asset["name"] + asset_url = asset["browser_download_url"] + file_path = os.path.join(OUTPUT_DIR, asset_name) + result = download_asset(asset_url, file_path) + if result: + downloaded_files.append(file_path) + + extract_split_archive(downloaded_files, extract_dir) + return collect_trace_files_with_find(extract_dir) + + # Check if files already exist + pattern = os.path.join(OUTPUT_DIR, f"{version_date}-planes-readsb-prod-0*") + matches = [p for p in glob.glob(pattern) if os.path.isfile(p)] + + if matches: + print(f"Found existing files for {version_date}:") + # Prefer non-tmp slices when reusing existing files + normal_matches = [ + p for p in matches + if "-planes-readsb-prod-0." in os.path.basename(p) + and "tmp" not in os.path.basename(p) + ] + downloaded_files = normal_matches if normal_matches else matches + + extract_split_archive(downloaded_files, extract_dir) + trace_files = collect_trace_files_with_find(extract_dir) + else: + trace_files = collect_trace_files_for_version_date(version_date) + + if trace_files is None or len(trace_files) == 0: + print(f"No trace files found for version_date: {version_date}") + return 0 + + file_list = list(trace_files.values()) + + start_time = time.perf_counter() + total_num_rows = 0 + batch_rows = [] + batch_idx = 0 + + # Process files in chunks + for offset in range(0, len(file_list), CHUNK_SIZE): + chunk = file_list[offset:offset + CHUNK_SIZE] + with concurrent.futures.ProcessPoolExecutor(max_workers=MAX_WORKERS) as process_executor: + for rows in process_executor.map(safe_process, chunk): + if not rows: + continue + batch_rows.extend(rows) + + if len(batch_rows) >= BATCH_SIZE: + total_num_rows += len(batch_rows) + write_batch_to_parquet(batch_rows, version_date, batch_idx) + batch_idx += 1 
+ batch_rows = [] + + elapsed = time.perf_counter() - start_time + speed = total_num_rows / elapsed if elapsed > 0 else 0 + print(f"[{version_date}] processed {total_num_rows} rows in {elapsed:.2f}s ({speed:.2f} rows/s)") + + gc.collect() + + # Final batch + if batch_rows: + total_num_rows += len(batch_rows) + write_batch_to_parquet(batch_rows, version_date, batch_idx) + elapsed = time.perf_counter() - start_time + speed = total_num_rows / elapsed if elapsed > 0 else 0 + print(f"[{version_date}] processed {total_num_rows} rows in {elapsed:.2f}s ({speed:.2f} rows/s)") + + print(f"Total rows processed for version_date {version_date}: {total_num_rows}") + + # Merge batch files into a single parquet file + merge_parquet_files(version_date, delete_batches=True) + + # Clean up extracted directory if not keeping + if not keep_folders and os.path.isdir(extract_dir): + import shutil + shutil.rmtree(extract_dir) + print(f"Cleaned up extraction directory: {extract_dir}") + + return total_num_rows + + +def create_parquet_for_day(day, keep_folders: bool = False): + """Create parquet file for a single day. 
+ + Args: + day: datetime object or string in 'YYYY-MM-DD' format + keep_folders: Whether to keep extracted folders after processing + + Returns: + Path to the created parquet file, or None if failed + """ + from pathlib import Path + + if isinstance(day, str): + day = datetime.strptime(day, "%Y-%m-%d") + + version_date = f"v{day.strftime('%Y.%m.%d')}" + + # Check if parquet already exists + parquet_path = Path(PARQUET_DIR) / f"{version_date}.parquet" + if parquet_path.exists(): + print(f"Parquet file already exists: {parquet_path}") + return parquet_path + + print(f"Creating parquet for {version_date}...") + rows_processed = process_version_date(version_date, keep_folders) + + if rows_processed > 0 and parquet_path.exists(): + return parquet_path + else: + return None + + +def main(start_date: str, end_date: str, keep_folders: bool = False): + """Main function to download and convert adsb.lol data to Parquet.""" + version_dates = [f"v{date.strftime('%Y.%m.%d')}" for date in generate_version_dates(start_date, end_date)] + print(f"Processing dates: {version_dates}") + + total_rows_all = 0 + for version_date in version_dates: + rows_processed = process_version_date(version_date, keep_folders) + total_rows_all += rows_processed + + print(f"\n=== Summary ===") + print(f"Total dates processed: {len(version_dates)}") + print(f"Total rows written to Parquet: {total_rows_all}") + print(f"Parquet files location: {PARQUET_DIR}") + + +if __name__ == "__main__": + logging.basicConfig(level=logging.INFO, stream=sys.stdout, force=True) + + parser = argparse.ArgumentParser( + description="Download adsb.lol data and write to Parquet files" + ) + parser.add_argument("start_date", type=str, help="Start date in YYYY-MM-DD format") + parser.add_argument("end_date", type=str, help="End date in YYYY-MM-DD format") + parser.add_argument("--keep-folders", action="store_true", + help="Keep extracted folders after processing") + + args = parser.parse_args() + + main(args.start_date, 
args.end_date, args.keep_folders) diff --git a/src/adsb/reducer.py b/src/adsb/reducer.py new file mode 100644 index 0000000..1bdd85c --- /dev/null +++ b/src/adsb/reducer.py @@ -0,0 +1,97 @@ +""" +Reduce step: downloads all chunk CSVs from S3, combines them, +deduplicates across the full dataset, and uploads the final result. + +Environment variables: + S3_BUCKET — bucket with intermediate results + RUN_ID — run identifier matching the map workers + GLOBAL_START_DATE — overall start date for output filename + GLOBAL_END_DATE — overall end date for output filename +""" +import os +from pathlib import Path + +import boto3 +import pandas as pd + + +COLUMNS = ["dbFlags", "ownOp", "year", "desc", "aircraft_category", "r", "t"] + + +def deduplicate_by_signature(df: pd.DataFrame) -> pd.DataFrame: + """For each icao, keep only the earliest row with each unique signature.""" + df["_signature"] = df[COLUMNS].astype(str).agg("|".join, axis=1) + df_deduped = df.groupby(["icao", "_signature"], as_index=False).first() + df_deduped = df_deduped.drop(columns=["_signature"]) + df_deduped = df_deduped.sort_values("time") + return df_deduped + + +def main(): + s3_bucket = os.environ["S3_BUCKET"] + run_id = os.environ.get("RUN_ID", "default") + global_start = os.environ["GLOBAL_START_DATE"] + global_end = os.environ["GLOBAL_END_DATE"] + + s3 = boto3.client("s3") + prefix = f"intermediate/{run_id}/" + + # List all chunk files for this run + paginator = s3.get_paginator("list_objects_v2") + chunk_keys = [] + for page in paginator.paginate(Bucket=s3_bucket, Prefix=prefix): + for obj in page.get("Contents", []): + if obj["Key"].endswith(".csv.gz"): + chunk_keys.append(obj["Key"]) + + chunk_keys.sort() + print(f"Found {len(chunk_keys)} chunks to combine") + + if not chunk_keys: + print("No chunks found — nothing to reduce.") + return + + # Download and concatenate all chunks + download_dir = Path("/tmp/chunks") + download_dir.mkdir(parents=True, exist_ok=True) + + df_accumulated = 
pd.DataFrame() + + for key in chunk_keys: + local_path = download_dir / Path(key).name + print(f"Downloading {key}...") + s3.download_file(s3_bucket, key, str(local_path)) + + df_chunk = pd.read_csv(local_path, compression="gzip", keep_default_na=False) + print(f" Loaded {len(df_chunk)} rows from {local_path.name}") + + if df_accumulated.empty: + df_accumulated = df_chunk + else: + df_accumulated = pd.concat( + [df_accumulated, df_chunk], ignore_index=True + ) + + # Free disk space after loading + local_path.unlink() + + print(f"Combined: {len(df_accumulated)} rows before dedup") + + # Final global deduplication + df_accumulated = deduplicate_by_signature(df_accumulated) + print(f"After dedup: {len(df_accumulated)} rows") + + # Write and upload final result + output_name = f"planequery_aircraft_adsb_{global_start}_{global_end}.csv.gz" + local_output = Path(f"/tmp/{output_name}") + df_accumulated.to_csv(local_output, index=False, compression="gzip") + + final_key = f"final/{output_name}" + print(f"Uploading to s3://{s3_bucket}/{final_key}") + s3.upload_file(str(local_output), s3_bucket, final_key) + + print(f"Final output: {len(df_accumulated)} records -> {final_key}") + + +if __name__ == "__main__": + main() diff --git a/src/adsb/requirements.reducer.txt b/src/adsb/requirements.reducer.txt new file mode 100644 index 0000000..8aafff3 --- /dev/null +++ b/src/adsb/requirements.reducer.txt @@ -0,0 +1,2 @@ +pandas>=2.0 +boto3>=1.34 diff --git a/src/adsb/requirements.worker.txt b/src/adsb/requirements.worker.txt new file mode 100644 index 0000000..9566c25 --- /dev/null +++ b/src/adsb/requirements.worker.txt @@ -0,0 +1,4 @@ +pandas>=2.0 +clickhouse-connect>=0.7 +boto3>=1.34 +zstandard>=0.22 diff --git a/src/adsb/worker.py b/src/adsb/worker.py new file mode 100644 index 0000000..ad2b09b --- /dev/null +++ b/src/adsb/worker.py @@ -0,0 +1,94 @@ +""" +Map worker: processes a date range chunk, uploads result to S3. 
+ +Environment variables: + START_DATE — inclusive, YYYY-MM-DD + END_DATE — exclusive, YYYY-MM-DD + S3_BUCKET — bucket for intermediate results + RUN_ID — unique run identifier for namespacing S3 keys +""" +import os +import sys +from datetime import datetime, timedelta +from pathlib import Path + +import boto3 +import pandas as pd + +from compress_adsb_to_aircraft_data import load_historical_for_day, COLUMNS + + +def deduplicate_by_signature(df: pd.DataFrame) -> pd.DataFrame: + """For each icao, keep only the earliest row with each unique signature.""" + df["_signature"] = df[COLUMNS].astype(str).agg("|".join, axis=1) + df_deduped = df.groupby(["icao", "_signature"], as_index=False).first() + df_deduped = df_deduped.drop(columns=["_signature"]) + df_deduped = df_deduped.sort_values("time") + return df_deduped + + +def main(): + start_date_str = os.environ["START_DATE"] + end_date_str = os.environ["END_DATE"] + s3_bucket = os.environ["S3_BUCKET"] + run_id = os.environ.get("RUN_ID", "default") + + start_date = datetime.strptime(start_date_str, "%Y-%m-%d") + end_date = datetime.strptime(end_date_str, "%Y-%m-%d") + + total_days = (end_date - start_date).days + print(f"Worker: processing {total_days} days [{start_date_str}, {end_date_str})") + + df_accumulated = pd.DataFrame() + current_date = start_date + + while current_date < end_date: + day_str = current_date.strftime("%Y-%m-%d") + print(f" Loading {day_str}...") + + try: + df_compressed = load_historical_for_day(current_date) + except Exception as e: + print(f" WARNING: Failed to load {day_str}: {e}") + current_date += timedelta(days=1) + continue + + if df_accumulated.empty: + df_accumulated = df_compressed + else: + df_accumulated = pd.concat( + [df_accumulated, df_compressed], ignore_index=True + ) + + print(f" +{len(df_compressed)} rows (total: {len(df_accumulated)})") + + # Delete local cache after each day to save disk in container + cache_dir = Path("data/adsb") + if cache_dir.exists(): + import shutil + 
shutil.rmtree(cache_dir) + + current_date += timedelta(days=1) + + if df_accumulated.empty: + print("No data collected — exiting.") + return + + # Deduplicate within this chunk + df_accumulated = deduplicate_by_signature(df_accumulated) + print(f"After dedup: {len(df_accumulated)} rows") + + # Write to local file then upload to S3 + local_path = Path(f"/tmp/chunk_{start_date_str}_{end_date_str}.csv.gz") + df_accumulated.to_csv(local_path, index=False, compression="gzip") + + s3_key = f"intermediate/{run_id}/chunk_{start_date_str}_{end_date_str}.csv.gz" + print(f"Uploading to s3://{s3_bucket}/{s3_key}") + + s3 = boto3.client("s3") + s3.upload_file(str(local_path), s3_bucket, s3_key) + print("Done.") + + +if __name__ == "__main__": + main() diff --git a/src/concat_csvs.py b/src/concat_csvs.py deleted file mode 100644 index ce6780e..0000000 --- a/src/concat_csvs.py +++ /dev/null @@ -1,89 +0,0 @@ -from pathlib import Path -import pandas as pd -import re -from derive_from_faa_master_txt import concat_faa_historical_df - -def concatenate_aircraft_csvs( - input_dir: Path = Path("data/concat"), - output_dir: Path = Path("data/planequery_aircraft"), - filename_pattern: str = r"planequery_aircraft_(\d{4}-\d{2}-\d{2})_(\d{4}-\d{2}-\d{2})\.csv" -): - """ - Read all CSVs matching the pattern from input_dir in order, - concatenate them using concat_faa_historical_df, and output a single CSV. 
- - Args: - input_dir: Directory containing the CSV files to concatenate - output_dir: Directory where the output CSV will be saved - filename_pattern: Regex pattern to match CSV filenames - """ - input_dir = Path(input_dir) - output_dir = Path(output_dir) - output_dir.mkdir(parents=True, exist_ok=True) - - # Find all matching CSV files - pattern = re.compile(filename_pattern) - csv_files = [] - - for csv_path in sorted(input_dir.glob("*.csv")): - match = pattern.search(csv_path.name) - if match: - start_date = match.group(1) - end_date = match.group(2) - csv_files.append((start_date, end_date, csv_path)) - - # Sort by start date, then end date - csv_files.sort(key=lambda x: (x[0], x[1])) - - if not csv_files: - raise FileNotFoundError(f"No CSV files matching pattern found in {input_dir}") - - print(f"Found {len(csv_files)} CSV files to concatenate") - - # Read first CSV as base - first_start_date, first_end_date, first_path = csv_files[0] - print(f"Reading base file: {first_path.name}") - df_base = pd.read_csv( - first_path, - dtype={ - 'transponder_code': str, - 'unique_regulatory_id': str, - 'registrant_county': str - } - ) - - # Concatenate remaining CSVs - for start_date, end_date, csv_path in csv_files[1:]: - print(f"Concatenating: {csv_path.name}") - df_new = pd.read_csv( - csv_path, - dtype={ - 'transponder_code': str, - 'unique_regulatory_id': str, - 'registrant_county': str - } - ) - df_base = concat_faa_historical_df(df_base, df_new) - - # Verify monotonic increasing download_date - assert df_base['download_date'].is_monotonic_increasing, "download_date is not monotonic increasing" - - # Output filename uses first start date and last end date - last_start_date, last_end_date, _ = csv_files[-1] - output_filename = f"planequery_aircraft_{first_start_date}_{last_end_date}.csv" - output_path = output_dir / output_filename - - print(f"Writing output to: {output_path}") - df_base.to_csv(output_path, index=False) - print(f"Successfully concatenated 
{len(csv_files)} files into {output_filename}") - print(f"Total rows: {len(df_base)}") - - return output_path - - -if __name__ == "__main__": - # Example usage - modify these paths as needed - concatenate_aircraft_csvs( - input_dir=Path("data/concat"), - output_dir=Path("data/planequery_aircraft") - ) \ No newline at end of file diff --git a/src/create_daily_planequery_aircraft_adsb_release.py b/src/create_daily_planequery_aircraft_adsb_release.py new file mode 100644 index 0000000..faa6b6d --- /dev/null +++ b/src/create_daily_planequery_aircraft_adsb_release.py @@ -0,0 +1,64 @@ +from pathlib import Path +from datetime import datetime, timezone, timedelta +import sys + +# Add adsb directory to path +sys.path.insert(0, str(Path(__file__).parent / "adsb")) # TODO: Fix this hacky path manipulation + +from adsb.compress_adsb_to_aircraft_data import ( + load_historical_for_day, + concat_compressed_dfs, + get_latest_aircraft_adsb_csv_df, +) + +if __name__ == '__main__': + # Get yesterday's date (data for the previous day) + day = datetime.now(timezone.utc) - timedelta(days=1) + + # Find a day with complete data + max_attempts = 2 # Don't look back more than 2 days + for attempt in range(max_attempts): + date_str = day.strftime("%Y-%m-%d") + print(f"Processing ADS-B data for {date_str}") + + print("Loading new ADS-B data...") + df_new = load_historical_for_day(day) + if df_new.empty: + day = day - timedelta(days=1) + continue + max_time = df_new['time'].max() + max_time = max_time.replace(tzinfo=timezone.utc) + + if max_time >= day.replace(hour=23, minute=59, second=59) - timedelta(minutes=5): + # Data is complete + break + + print(f"WARNING: Latest data time is {max_time}, which is more than 5 minutes before end of day.") + day = day - timedelta(days=1) + else: + raise RuntimeError(f"Could not find complete data in the last {max_attempts} days") + + try: + # Get the latest release data + print("Downloading latest ADS-B release...") + df_base, start_date_str = 
get_latest_aircraft_adsb_csv_df() + # Combine with historical data + print("Combining with historical data...") + df_combined = concat_compressed_dfs(df_base, df_new) + except Exception as e: + print(f"Error downloading latest ADS-B release: {e}") + df_combined = df_new + start_date_str = date_str + + # Sort by time for consistent ordering + df_combined = df_combined.sort_values('time').reset_index(drop=True) + + # Save the result + OUT_ROOT = Path("data/planequery_aircraft") + OUT_ROOT.mkdir(parents=True, exist_ok=True) + + output_file = OUT_ROOT / f"planequery_aircraft_adsb_{start_date_str}_{date_str}.csv" + df_combined.to_csv(output_file, index=False) + + print(f"Saved: {output_file}") + print(f"Total aircraft: {len(df_combined)}") diff --git a/src/create_daily_planequery_aircraft_release.py b/src/create_daily_planequery_aircraft_release.py index 4019aa7..559f8fc 100644 --- a/src/create_daily_planequery_aircraft_release.py +++ b/src/create_daily_planequery_aircraft_release.py @@ -25,9 +25,9 @@ if not zip_path.exists(): OUT_ROOT = Path("data/planequery_aircraft") OUT_ROOT.mkdir(parents=True, exist_ok=True) from derive_from_faa_master_txt import convert_faa_master_txt_to_df, concat_faa_historical_df -from get_latest_planequery_aircraft_release import get_latest_aircraft_csv_df +from get_latest_planequery_aircraft_release import get_latest_aircraft_faa_csv_df df_new = convert_faa_master_txt_to_df(zip_path, date_str) -df_base, start_date_str = get_latest_aircraft_csv_df() +df_base, start_date_str = get_latest_aircraft_faa_csv_df() df_base = concat_faa_historical_df(df_base, df_new) assert df_base['download_date'].is_monotonic_increasing, "download_date is not monotonic increasing" -df_base.to_csv(OUT_ROOT / f"planequery_aircraft_{start_date_str}_{date_str}.csv", index=False) \ No newline at end of file +df_base.to_csv(OUT_ROOT / f"planequery_aircraft_faa_{start_date_str}_{date_str}.csv", index=False) \ No newline at end of file diff --git a/src/get_historical_faa.py 
b/src/get_historical_faa.py index 51cdcd4..656345e 100644 --- a/src/get_historical_faa.py +++ b/src/get_historical_faa.py @@ -112,5 +112,5 @@ for date, sha in date_to_sha.items(): print(len(df_base), "total entries so far") assert df_base['download_date'].is_monotonic_increasing, "download_date is not monotonic increasing" -df_base.to_csv(OUT_ROOT / f"planequery_aircraft_{start_date}_{end_date}.csv", index=False) +df_base.to_csv(OUT_ROOT / f"planequery_aircraft_faa_{start_date}_{end_date}.csv", index=False) # TODO: get average number of new rows per day. diff --git a/src/get_latest_planequery_aircraft_release.py b/src/get_latest_planequery_aircraft_release.py index c264250..9867a5d 100644 --- a/src/get_latest_planequery_aircraft_release.py +++ b/src/get_latest_planequery_aircraft_release.py @@ -109,7 +109,7 @@ def download_latest_aircraft_csv( repo: str = REPO, ) -> Path: """ - Download the latest planequery_aircraft_*.csv file from the latest GitHub release. + Download the latest planequery_aircraft_faa_*.csv file from the latest GitHub release. 
Args: output_dir: Directory to save the downloaded file (default: "downloads") @@ -120,25 +120,70 @@ def download_latest_aircraft_csv( Path to the downloaded file """ assets = get_latest_release_assets(repo, github_token=github_token) - asset = pick_asset(assets, name_regex=r"^planequery_aircraft_.*\.csv$") + try: + asset = pick_asset(assets, name_regex=r"^planequery_aircraft_faa_.*\.csv$") + except FileNotFoundError: + # Fallback to old naming pattern + asset = pick_asset(assets, name_regex=r"^planequery_aircraft_\d{4}-\d{2}-\d{2}_.*\.csv$") saved_to = download_asset(asset, output_dir / asset.name, github_token=github_token) print(f"Downloaded: {asset.name} ({asset.size} bytes) -> {saved_to}") return saved_to -def get_latest_aircraft_csv_df(): +def get_latest_aircraft_faa_csv_df(): csv_path = download_latest_aircraft_csv() import pandas as pd df = pd.read_csv(csv_path, dtype={'transponder_code': str, 'unique_regulatory_id': str, 'registrant_county': str}) df = df.fillna("") - # Extract date from filename pattern: planequery_aircraft_{date}_{date}.csv - match = re.search(r"planequery_aircraft_(\d{4}-\d{2}-\d{2})_", str(csv_path)) + # Extract start date from filename pattern: planequery_aircraft_faa_{start_date}_{end_date}.csv + match = re.search(r"planequery_aircraft_faa_(\d{4}-\d{2}-\d{2})_", str(csv_path)) + if not match: + # Fallback to old naming pattern: planequery_aircraft_{start_date}_{end_date}.csv + match = re.search(r"planequery_aircraft_(\d{4}-\d{2}-\d{2})_", str(csv_path)) if not match: raise ValueError(f"Could not extract date from filename: {csv_path.name}") date_str = match.group(1) return df, date_str + +def download_latest_aircraft_adsb_csv( + output_dir: Path = Path("downloads"), + github_token: Optional[str] = None, + repo: str = REPO, +) -> Path: + """ + Download the latest planequery_aircraft_adsb_*.csv file from the latest GitHub release. 
+ + Args: + output_dir: Directory to save the downloaded file (default: "downloads") + github_token: Optional GitHub token for authentication + repo: GitHub repository in format "owner/repo" (default: REPO) + + Returns: + Path to the downloaded file + """ + assets = get_latest_release_assets(repo, github_token=github_token) + asset = pick_asset(assets, name_regex=r"^planequery_aircraft_adsb_.*\.csv$") + saved_to = download_asset(asset, output_dir / asset.name, github_token=github_token) + print(f"Downloaded: {asset.name} ({asset.size} bytes) -> {saved_to}") + return saved_to + + +def get_latest_aircraft_adsb_csv_df(): + csv_path = download_latest_aircraft_adsb_csv() + import pandas as pd + df = pd.read_csv(csv_path) + df = df.fillna("") + # Extract start date from filename pattern: planequery_aircraft_adsb_{start_date}_{end_date}.csv + match = re.search(r"planequery_aircraft_adsb_(\d{4}-\d{2}-\d{2})_", str(csv_path)) + if not match: + raise ValueError(f"Could not extract date from filename: {csv_path.name}") + + date_str = match.group(1) + return df, date_str + + if __name__ == "__main__": download_latest_aircraft_csv() diff --git a/trigger_pipeline.py b/trigger_pipeline.py new file mode 100644 index 0000000..d386b25 --- /dev/null +++ b/trigger_pipeline.py @@ -0,0 +1,97 @@ +""" +Generate Step Functions input and start the pipeline. 
+ +Usage: + python trigger_pipeline.py 2024-01-01 2025-01-01 + python trigger_pipeline.py 2024-01-01 2025-01-01 --chunk-days 30 + python trigger_pipeline.py 2024-01-01 2025-01-01 --dry-run +""" +import argparse +import json +import os +import uuid +from datetime import datetime, timedelta + +import boto3 + + +def generate_chunks(start_date: str, end_date: str, chunk_days: int = 1): + """Split a date range into chunks of chunk_days.""" + start = datetime.strptime(start_date, "%Y-%m-%d") + end = datetime.strptime(end_date, "%Y-%m-%d") + + chunks = [] + current = start + while current < end: + chunk_end = min(current + timedelta(days=chunk_days), end) + chunks.append({ + "start_date": current.strftime("%Y-%m-%d"), + "end_date": chunk_end.strftime("%Y-%m-%d"), + }) + current = chunk_end + + return chunks + + +def main(): + parser = argparse.ArgumentParser(description="Trigger ADS-B map-reduce pipeline") + parser.add_argument("start_date", help="Start date (YYYY-MM-DD, inclusive)") + parser.add_argument("end_date", help="End date (YYYY-MM-DD, exclusive)") + parser.add_argument("--chunk-days", type=int, default=1, + help="Days per chunk (default: 1)") + parser.add_argument("--dry-run", action="store_true", + help="Print input JSON without starting execution") + args = parser.parse_args() + + run_id = f"run-{datetime.utcnow().strftime('%Y%m%dT%H%M%S')}-{uuid.uuid4().hex[:8]}" + chunks = generate_chunks(args.start_date, args.end_date, args.chunk_days) + + clickhouse_host = os.environ["CLICKHOUSE_HOST"] + clickhouse_username = os.environ["CLICKHOUSE_USERNAME"] + clickhouse_password = os.environ["CLICKHOUSE_PASSWORD"] + + # Inject run_id and ClickHouse credentials into each chunk + for chunk in chunks: + chunk["run_id"] = run_id + chunk["clickhouse_host"] = clickhouse_host + chunk["clickhouse_username"] = clickhouse_username + chunk["clickhouse_password"] = clickhouse_password + + sfn_input = { + "run_id": run_id, + "global_start_date": args.start_date, + "global_end_date": 
args.end_date, + "chunks": chunks, + } + + print(f"Run ID: {run_id}") + print(f"Chunks: {len(chunks)} (at {args.chunk_days} days each)") + print(f"Max concurrency: 3 (enforced by Step Functions Map state)") + print() + print(json.dumps(sfn_input, indent=2)) + + if args.dry_run: + print("\n--dry-run: not starting execution") + return + + client = boto3.client("stepfunctions") + + # Find the state machine ARN + machines = client.list_state_machines()["stateMachines"] + arn = next( + m["stateMachineArn"] + for m in machines + if m["name"] == "adsb-map-reduce" + ) + + response = client.start_execution( + stateMachineArn=arn, + name=run_id, + input=json.dumps(sfn_input), + ) + + print(f"\nStarted execution: {response['executionArn']}") + + +if __name__ == "__main__": + main()