FEATURE: add historical adsb aircraft data and update daily adsb aircraft data derivation.

add clickhouse_connect

use 32GB

update to no longer do df.copy()

Add planequery_adsb_read.ipynb

INCREASE: update Fargate task definition to 16 vCPU and 64 GB memory for improved performance on large datasets

update notebook

remove print(df)

Ensure empty strings are preserved in DataFrame columns

check if day has data for adsb

update notebook
This commit is contained in:
ggman12
2026-02-09 18:58:49 -05:00
parent b94bfdc575
commit 27da93801e
24 changed files with 2387 additions and 115 deletions
@@ -2,27 +2,54 @@ name: planequery-aircraft Daily Release
on:
schedule:
# 6:00am UTC every day
# 6:00am UTC every day - runs on default branch, triggers both main and develop releases
- cron: "0 06 * * *"
workflow_dispatch: {}
workflow_dispatch:
permissions:
  # Needed to create the release and upload its assets.
  contents: write
  # Needed by the createWorkflowDispatch calls in the trigger-releases job;
  # scopes not listed here are not granted to GITHUB_TOKEN.
  actions: write
jobs:
build-and-release:
# Fan-out job: on the cron trigger, dispatch the daily-release workflow once
# for the main ref and once for the develop ref.
trigger-releases:
runs-on: ubuntu-latest
# Runs only for the schedule event; build-and-release is guarded by the
# opposite condition, so dispatched/manual runs skip this job.
if: github.event_name == 'schedule'
steps:
# Dispatch 'planequery-aircraft-daily-release.yaml' on ref 'main'.
# NOTE(review): presumably that file name is this workflow itself - confirm.
- name: Trigger main branch release
uses: actions/github-script@v7
with:
script: |
await github.rest.actions.createWorkflowDispatch({
owner: context.repo.owner,
repo: context.repo.repo,
workflow_id: 'planequery-aircraft-daily-release.yaml',
ref: 'main'
});
# Same dispatch call, targeting ref 'develop'.
- name: Trigger develop branch release
uses: actions/github-script@v7
with:
script: |
await github.rest.actions.createWorkflowDispatch({
owner: context.repo.owner,
repo: context.repo.repo,
workflow_id: 'planequery-aircraft-daily-release.yaml',
ref: 'develop'
});
build-and-release:
runs-on: ubuntu-24.04-arm
if: github.event_name != 'schedule'
steps:
- name: Checkout
uses: actions/checkout@v4
uses: actions/checkout@v6
with:
fetch-depth: 0
- name: Setup Python
uses: actions/setup-python@v5
uses: actions/setup-python@v6
with:
python-version: "3.12"
python-version: "3.14"
- name: Install dependencies
run: |
@@ -30,8 +57,13 @@ jobs:
pip install -r requirements.txt
- name: Run daily release script
env:
CLICKHOUSE_HOST: ${{ secrets.CLICKHOUSE_HOST }}
CLICKHOUSE_USERNAME: ${{ secrets.CLICKHOUSE_USERNAME }}
CLICKHOUSE_PASSWORD: ${{ secrets.CLICKHOUSE_PASSWORD }}
run: |
python src/create_daily_planequery_aircraft_release.py
python src/create_daily_planequery_aircraft_adsb_release.py
ls -lah data/faa_releasable
ls -lah data/planequery_aircraft
@@ -39,15 +71,26 @@ jobs:
id: meta
run: |
DATE=$(date -u +"%Y-%m-%d")
TAG="planequery-aircraft-${DATE}"
# Find the CSV file in data/planequery_aircraft matching the pattern
CSV_FILE=$(ls data/planequery_aircraft/planequery_aircraft_*_${DATE}.csv | head -1)
CSV_BASENAME=$(basename "$CSV_FILE")
BRANCH_NAME="${GITHUB_REF#refs/heads/}"
BRANCH_SUFFIX=""
if [ "$BRANCH_NAME" = "main" ]; then
BRANCH_SUFFIX="-main"
elif [ "$BRANCH_NAME" = "develop" ]; then
BRANCH_SUFFIX="-develop"
fi
TAG="planequery-aircraft-${DATE}${BRANCH_SUFFIX}"
# Find the CSV files in data/planequery_aircraft matching the patterns
CSV_FILE_FAA=$(ls data/planequery_aircraft/planequery_aircraft_faa_*_${DATE}.csv | head -1)
CSV_BASENAME_FAA=$(basename "$CSV_FILE_FAA")
CSV_FILE_ADSB=$(ls data/planequery_aircraft/planequery_aircraft_adsb_*_${DATE}.csv | head -1)
CSV_BASENAME_ADSB=$(basename "$CSV_FILE_ADSB")
echo "date=$DATE" >> "$GITHUB_OUTPUT"
echo "tag=$TAG" >> "$GITHUB_OUTPUT"
echo "csv_file=$CSV_FILE" >> "$GITHUB_OUTPUT"
echo "csv_basename=$CSV_BASENAME" >> "$GITHUB_OUTPUT"
echo "name=planequery-aircraft snapshot ($DATE)" >> "$GITHUB_OUTPUT"
echo "csv_file_faa=$CSV_FILE_FAA" >> "$GITHUB_OUTPUT"
echo "csv_basename_faa=$CSV_BASENAME_FAA" >> "$GITHUB_OUTPUT"
echo "csv_file_adsb=$CSV_FILE_ADSB" >> "$GITHUB_OUTPUT"
echo "csv_basename_adsb=$CSV_BASENAME_ADSB" >> "$GITHUB_OUTPUT"
echo "name=planequery-aircraft snapshot ($DATE)${BRANCH_SUFFIX}" >> "$GITHUB_OUTPUT"
- name: Create GitHub Release and upload assets
uses: softprops/action-gh-release@v2
@@ -58,10 +101,12 @@ jobs:
Automated daily snapshot generated at 06:00 UTC for ${{ steps.meta.outputs.date }}.
Assets:
- ${{ steps.meta.outputs.csv_basename }}
- ${{ steps.meta.outputs.csv_basename_faa }}
- ${{ steps.meta.outputs.csv_basename_adsb }}
- ReleasableAircraft_${{ steps.meta.outputs.date }}.zip
files: |
${{ steps.meta.outputs.csv_file }}
${{ steps.meta.outputs.csv_file_faa }}
${{ steps.meta.outputs.csv_file_adsb }}
data/faa_releasable/ReleasableAircraft_${{ steps.meta.outputs.date }}.zip
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
+64 -1
View File
@@ -218,4 +218,67 @@ __marimo__/
# Custom
data/
.DS_Store
notebooks/
# --- CDK ---
# VSCode extension
# Store launch config in repo but not settings
.vscode/settings.json
/.favorites.json
# TypeScript incremental build states
*.tsbuildinfo
# Local state files & OS specifics
.DS_Store
node_modules/
lerna-debug.log
dist/
pack/
.BUILD_COMPLETED
.local-npm/
.tools/
coverage/
.nyc_output
.nycrc
.LAST_BUILD
*.sw[a-z]
*~
.idea
*.iml
junit.xml
# We don't want tsconfig at the root
/tsconfig.json
# CDK Context & Staging files
cdk.context.json
.cdk.staging/
cdk.out/
*.tabl.json
cdk-integ.out.*/
# Yarn error log
yarn-error.log
# VSCode history plugin
.vscode/.history/
# Cloud9
.c9
.nzm-*
/.versionrc.json
RELEASE_NOTES.md
# Produced by integ tests
read*lock
# VSCode jest plugin
.test-output
# Nx cache
.nx/
# jsii-rosetta files
type-fingerprints.txt
+11
View File
@@ -0,0 +1,11 @@
#!/usr/bin/env python3
import os
import aws_cdk as cdk
from stack import AdsbProcessingStack
app = cdk.App()

# Pin the stack to an explicit account/region taken from the environment.
# Both variables are mandatory here: a missing one raises KeyError.
# NOTE(review): presumably these are the values the CDK CLI exports and the
# stack's default-VPC lookup depends on them - confirm.
target_env = cdk.Environment(
    account=os.environ["CDK_DEFAULT_ACCOUNT"],
    region=os.environ["CDK_DEFAULT_REGION"],
)

# The construct registers itself on `app`; no reference needs to be kept.
AdsbProcessingStack(app, "AdsbProcessingStack", env=target_env)

app.synth()
+3
View File
@@ -0,0 +1,3 @@
{
"app": "python3 app.py"
}
+2
View File
@@ -0,0 +1,2 @@
aws-cdk-lib>=2.170.0
constructs>=10.0.0
+225
View File
@@ -0,0 +1,225 @@
import aws_cdk as cdk
from aws_cdk import (
Stack,
Duration,
RemovalPolicy,
aws_s3 as s3,
aws_ecs as ecs,
aws_ec2 as ec2,
aws_ecr_assets,
aws_iam as iam,
aws_logs as logs,
aws_stepfunctions as sfn,
aws_stepfunctions_tasks as sfn_tasks,
)
from constructs import Construct
from pathlib import Path
class AdsbProcessingStack(Stack):
    """CDK stack for the ADS-B map/reduce processing pipeline.

    Resources created:
      * an S3 bucket for intermediate and final results,
      * an ECS cluster in the account's default VPC,
      * one log group and one task role shared by both task definitions,
      * two ARM64 Fargate task definitions ("worker" and "reducer") whose
        images are built from the Dockerfiles under ``src/adsb``,
      * a Step Functions state machine that runs one worker task per entry of
        ``$.chunks`` (at most 3 concurrently) followed by one reducer task.

    State machine input read by the JsonPath expressions below:
      * ``chunks``: list of objects, each providing ``start_date``,
        ``end_date``, ``run_id``, ``clickhouse_host``, ``clickhouse_username``
        and ``clickhouse_password``;
      * top-level ``run_id``, ``global_start_date`` and ``global_end_date``.
    """

    def __init__(self, scope: Construct, id: str, **kwargs):
        """Define every resource of the pipeline.

        Args:
            scope: Parent construct (the CDK app).
            id: Construct id of this stack.
            **kwargs: Forwarded unchanged to ``Stack.__init__`` (e.g. ``env``).
        """
        super().__init__(scope, id, **kwargs)

        # Used for both the state machine and the CloudFormation output so the
        # two can never drift apart.
        state_machine_name = "adsb-map-reduce"

        # --- S3 bucket for intermediate and final results ---
        # NOTE(review): DESTROY + auto_delete_objects means deleting the stack
        # also deletes the stored results - confirm this is intended.
        bucket = s3.Bucket(
            self, "ResultsBucket",
            bucket_name="planequery-aircraft-dev",
            removal_policy=RemovalPolicy.DESTROY,
            auto_delete_objects=True,
            lifecycle_rules=[
                # Objects under intermediate/ expire after 7 days.
                s3.LifecycleRule(
                    prefix="intermediate/",
                    expiration=Duration.days(7),
                )
            ],
        )

        # --- Use default VPC (no additional cost) ---
        vpc = ec2.Vpc.from_lookup(
            self, "Vpc",
            is_default=True,
        )

        # --- ECS Cluster ---
        cluster = ecs.Cluster(
            self, "Cluster",
            vpc=vpc,
            container_insights=True,
        )

        # --- Log group (shared; streams are prefixed "map" / "reduce") ---
        log_group = logs.LogGroup(
            self, "LogGroup",
            log_group_name="/adsb-processing",
            removal_policy=RemovalPolicy.DESTROY,
            retention=logs.RetentionDays.TWO_WEEKS,
        )

        # --- Docker images (built from local Dockerfiles) ---
        # Build context is <repo>/src/adsb, resolved relative to this file.
        adsb_dir = str(Path(__file__).parent.parent / "src" / "adsb")

        worker_image = ecs.ContainerImage.from_asset(
            adsb_dir,
            file="Dockerfile.worker",
            platform=aws_ecr_assets.Platform.LINUX_ARM64,
        )
        reducer_image = ecs.ContainerImage.from_asset(
            adsb_dir,
            file="Dockerfile.reducer",
            platform=aws_ecr_assets.Platform.LINUX_ARM64,
        )

        # --- Task role (shared) ---
        task_role = iam.Role(
            self, "TaskRole",
            assumed_by=iam.ServicePrincipal("ecs-tasks.amazonaws.com"),
        )
        bucket.grant_read_write(task_role)

        # --- MAP: worker task definition ---
        map_task_def = ecs.FargateTaskDefinition(
            self, "MapTaskDef",
            cpu=16384,  # 16 vCPU
            memory_limit_mib=65536,  # 64 GB — high memory for pandas operations on large datasets
            task_role=task_role,
            runtime_platform=self._arm64_linux_platform(),
        )
        map_container = map_task_def.add_container(
            "worker",
            image=worker_image,
            logging=ecs.LogDrivers.aws_logs(
                stream_prefix="map",
                log_group=log_group,
            ),
            environment={
                "S3_BUCKET": bucket.bucket_name,
            },
        )

        # --- REDUCE: reducer task definition ---
        reduce_task_def = ecs.FargateTaskDefinition(
            self, "ReduceTaskDef",
            cpu=4096,  # 4 vCPU
            memory_limit_mib=30720,  # 30 GB — must hold full year in memory
            task_role=task_role,
            runtime_platform=self._arm64_linux_platform(),
        )
        reduce_container = reduce_task_def.add_container(
            "reducer",
            image=reducer_image,
            logging=ecs.LogDrivers.aws_logs(
                stream_prefix="reduce",
                log_group=log_group,
            ),
            environment={
                "S3_BUCKET": bucket.bucket_name,
            },
        )

        # --- Step Functions ---

        # Map task: run ECS Fargate for each date chunk.
        # NOTE(review): the ClickHouse credentials are taken from the state
        # input and injected as plain container environment overrides, so they
        # are likely visible wherever the execution input / task overrides are
        # shown. Consider a secrets store reference instead.
        map_ecs_task = sfn_tasks.EcsRunTask(
            self, "ProcessChunk",
            integration_pattern=sfn.IntegrationPattern.RUN_JOB,
            cluster=cluster,
            task_definition=map_task_def,
            launch_target=sfn_tasks.EcsFargateLaunchTarget(
                platform_version=ecs.FargatePlatformVersion.LATEST,
            ),
            container_overrides=[
                sfn_tasks.ContainerOverride(
                    container_definition=map_container,
                    environment=self._env_from_state({
                        "START_DATE": "$.start_date",
                        "END_DATE": "$.end_date",
                        "RUN_ID": "$.run_id",
                        "CLICKHOUSE_HOST": "$.clickhouse_host",
                        "CLICKHOUSE_USERNAME": "$.clickhouse_username",
                        "CLICKHOUSE_PASSWORD": "$.clickhouse_password",
                    }),
                )
            ],
            # Tasks run in the public subnets with a public IP.
            # NOTE(review): presumably because the default VPC has no NAT - confirm.
            assign_public_ip=True,
            subnets=ec2.SubnetSelection(subnet_type=ec2.SubnetType.PUBLIC),
            result_path="$.task_result",
        )

        # Map state — max 3 concurrent workers
        map_state = sfn.Map(
            self, "FanOutChunks",
            items_path="$.chunks",
            max_concurrency=3,
            result_path="$.map_results",
        )
        map_state.item_processor(map_ecs_task)

        # Reduce task: combine all chunk CSVs
        reduce_ecs_task = sfn_tasks.EcsRunTask(
            self, "ReduceResults",
            integration_pattern=sfn.IntegrationPattern.RUN_JOB,
            cluster=cluster,
            task_definition=reduce_task_def,
            launch_target=sfn_tasks.EcsFargateLaunchTarget(
                platform_version=ecs.FargatePlatformVersion.LATEST,
            ),
            container_overrides=[
                sfn_tasks.ContainerOverride(
                    container_definition=reduce_container,
                    environment=self._env_from_state({
                        "RUN_ID": "$.run_id",
                        "GLOBAL_START_DATE": "$.global_start_date",
                        "GLOBAL_END_DATE": "$.global_end_date",
                    }),
                )
            ],
            assign_public_ip=True,
            subnets=ec2.SubnetSelection(subnet_type=ec2.SubnetType.PUBLIC),
        )

        # Chain: fan-out map → reduce
        definition = map_state.next(reduce_ecs_task)

        sfn.StateMachine(
            self, "Pipeline",
            state_machine_name=state_machine_name,
            definition_body=sfn.DefinitionBody.from_chainable(definition),
            timeout=Duration.hours(48),
        )

        # --- Outputs ---
        cdk.CfnOutput(self, "BucketName", value=bucket.bucket_name)
        cdk.CfnOutput(self, "StateMachineName", value=state_machine_name)

    @staticmethod
    def _arm64_linux_platform():
        """Return a fresh Linux/ARM64 runtime platform for a task definition."""
        return ecs.RuntimePlatform(
            cpu_architecture=ecs.CpuArchitecture.ARM64,
            operating_system_family=ecs.OperatingSystemFamily.LINUX,
        )

    @staticmethod
    def _env_from_state(paths):
        """Build container env overrides whose values come from state input.

        Args:
            paths: Mapping of environment variable name to the JSONPath that
                supplies its value; insertion order is preserved.

        Returns:
            A list of ``TaskEnvironmentVariable`` objects, one per entry.
        """
        return [
            sfn_tasks.TaskEnvironmentVariable(
                name=name,
                value=sfn.JsonPath.string_at(path),
            )
            for name, path in paths.items()
        ]
+640
View File
@@ -0,0 +1,640 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"id": "06ae0319",
"metadata": {},
"outputs": [],
"source": [
"import clickhouse_connect\n",
"client = clickhouse_connect.get_client(\n",
" host=os.environ[\"CLICKHOUSE_HOST\"],\n",
" username=os.environ[\"CLICKHOUSE_USERNAME\"],\n",
" password=os.environ[\"CLICKHOUSE_PASSWORD\"],\n",
" secure=True,\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "779710f0",
"metadata": {},
"outputs": [],
"source": [
"df = client.query_df(\"SELECT time, icao,r,t,dbFlags,ownOp,year,desc,aircraft FROM adsb_messages Where time > '2024-01-01 00:00:00' AND time < '2024-01-02 00:00:00'\")\n",
"df_copy = df.copy()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "bf024da8",
"metadata": {},
"outputs": [],
"source": [
"# -- military = dbFlags & 1; interesting = dbFlags & 2; PIA = dbFlags & 4; LADD = dbFlags & 8;"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "270607b5",
"metadata": {},
"outputs": [],
"source": [
"df = load_raw_adsb_for_day(datetime(2024,1,1))"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "ac06a30e",
"metadata": {},
"outputs": [],
"source": [
"df['aircraft']"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "91edab3e",
"metadata": {},
"outputs": [],
"source": [
"COLUMNS = ['dbFlags', 'ownOp', 'year', 'desc', 'aircraft_category', 'r', 't']\n",
"def compress_df(df):\n",
" icao = df.name\n",
" df[\"_signature\"] = df[COLUMNS].astype(str).agg('|'.join, axis=1)\n",
" original_df = df.copy()\n",
" df = df.groupby(\"_signature\", as_index=False).last() # check if it works with both last and first.\n",
" # For each row, create a dict of non-empty column values. This is using sets and subsets...\n",
" def get_non_empty_dict(row):\n",
" return {col: row[col] for col in COLUMNS if row[col] != ''}\n",
" \n",
" df['_non_empty_dict'] = df.apply(get_non_empty_dict, axis=1)\n",
" df['_non_empty_count'] = df['_non_empty_dict'].apply(len)\n",
" \n",
" # Check if row i's non-empty values are a subset of row j's non-empty values\n",
" def is_subset_of_any(idx):\n",
" row_dict = df.loc[idx, '_non_empty_dict']\n",
" row_count = df.loc[idx, '_non_empty_count']\n",
" \n",
" for other_idx in df.index:\n",
" if idx == other_idx:\n",
" continue\n",
" other_dict = df.loc[other_idx, '_non_empty_dict']\n",
" other_count = df.loc[other_idx, '_non_empty_count']\n",
" \n",
" # Check if all non-empty values in current row match those in other row\n",
" if all(row_dict.get(k) == other_dict.get(k) for k in row_dict.keys()):\n",
" # If they match and other has more defined columns, current row is redundant\n",
" if other_count > row_count:\n",
" return True\n",
" return False\n",
" \n",
" # Keep rows that are not subsets of any other row\n",
" keep_mask = ~df.index.to_series().apply(is_subset_of_any)\n",
" df = df[keep_mask]\n",
"\n",
" if len(df) > 1:\n",
" original_df = original_df[original_df['_signature'].isin(df['_signature'])]\n",
" value_counts = original_df[\"_signature\"].value_counts()\n",
" max_signature = value_counts.idxmax()\n",
" df = df[df['_signature'] == max_signature]\n",
"\n",
" df['icao'] = icao\n",
" df = df.drop(columns=['_non_empty_dict', '_non_empty_count', '_signature'])\n",
" return df\n",
"\n",
"# df = df_copy\n",
"# df = df_copy.iloc[0:100000]\n",
"# df = df[df['r'] == \"N4131T\"]\n",
"# df = df[(df['icao'] == \"008081\")]\n",
"# df = df.iloc[0:500]\n",
"df['aircraft_category'] = df['aircraft'].apply(lambda x: x.get('category') if isinstance(x, dict) else None)\n",
"df = df.drop(columns=['aircraft'])\n",
"df = df.sort_values(['icao', 'time'])\n",
"df[COLUMNS] = df[COLUMNS].fillna('')\n",
"ORIGINAL_COLUMNS = df.columns.tolist()\n",
"df_compressed = df.groupby('icao',group_keys=False).apply(compress_df)\n",
"cols = df_compressed.columns.tolist()\n",
"cols.remove(\"icao\")\n",
"cols.insert(1, \"icao\")\n",
"df_compressed = df_compressed[cols]\n",
"df_compressed"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "efdfcb2c",
"metadata": {},
"outputs": [],
"source": [
"df['aircraft_category'] = df['aircraft'].apply(lambda x: x.get('category') if isinstance(x, dict) else None)\n",
"df[~df['aircraft_category'].isna()]"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "495c5025",
"metadata": {},
"outputs": [],
"source": [
"# SOME KIND OF MAP REDUCE SYSTEM\n",
"import os\n",
"\n",
"COLUMNS = ['dbFlags', 'ownOp', 'year', 'desc', 'aircraft_category', 'r', 't']\n",
"def compress_df(df):\n",
" icao = df.name\n",
" df[\"_signature\"] = df[COLUMNS].astype(str).agg('|'.join, axis=1)\n",
" \n",
" # Compute signature counts before grouping (avoid copy)\n",
" signature_counts = df[\"_signature\"].value_counts()\n",
" \n",
" df = df.groupby(\"_signature\", as_index=False).first() # check if it works with both last and first.\n",
" # For each row, create a dict of non-empty column values. This is using sets and subsets...\n",
" def get_non_empty_dict(row):\n",
" return {col: row[col] for col in COLUMNS if row[col] != ''}\n",
" \n",
" df['_non_empty_dict'] = df.apply(get_non_empty_dict, axis=1)\n",
" df['_non_empty_count'] = df['_non_empty_dict'].apply(len)\n",
" \n",
" # Check if row i's non-empty values are a subset of row j's non-empty values\n",
" def is_subset_of_any(idx):\n",
" row_dict = df.loc[idx, '_non_empty_dict']\n",
" row_count = df.loc[idx, '_non_empty_count']\n",
" \n",
" for other_idx in df.index:\n",
" if idx == other_idx:\n",
" continue\n",
" other_dict = df.loc[other_idx, '_non_empty_dict']\n",
" other_count = df.loc[other_idx, '_non_empty_count']\n",
" \n",
" # Check if all non-empty values in current row match those in other row\n",
" if all(row_dict.get(k) == other_dict.get(k) for k in row_dict.keys()):\n",
" # If they match and other has more defined columns, current row is redundant\n",
" if other_count > row_count:\n",
" return True\n",
" return False\n",
" \n",
" # Keep rows that are not subsets of any other row\n",
" keep_mask = ~df.index.to_series().apply(is_subset_of_any)\n",
" df = df[keep_mask]\n",
"\n",
" if len(df) > 1:\n",
" # Use pre-computed signature counts instead of original_df\n",
" remaining_sigs = df['_signature']\n",
" sig_counts = signature_counts[remaining_sigs]\n",
" max_signature = sig_counts.idxmax()\n",
" df = df[df['_signature'] == max_signature]\n",
"\n",
" df['icao'] = icao\n",
" df = df.drop(columns=['_non_empty_dict', '_non_empty_count', '_signature'])\n",
" return df\n",
"\n",
"# names of releases something like\n",
"# planequery_aircraft_adsb_2024-06-01T00-00-00Z.csv.gz\n",
"\n",
"# Let's build historical first. \n",
"\n",
"_ch_client = None\n",
"\n",
"def _get_clickhouse_client():\n",
" \"\"\"Return a reusable ClickHouse client, with retry/backoff for transient DNS or connection errors.\"\"\"\n",
" global _ch_client\n",
" if _ch_client is not None:\n",
" return _ch_client\n",
"\n",
" import clickhouse_connect\n",
" import time\n",
"\n",
" max_retries = 5\n",
" for attempt in range(1, max_retries + 1):\n",
" try:\n",
" _ch_client = clickhouse_connect.get_client(\n",
" host=os.environ[\"CLICKHOUSE_HOST\"],\n",
" username=os.environ[\"CLICKHOUSE_USERNAME\"],\n",
" password=os.environ[\"CLICKHOUSE_PASSWORD\"],\n",
" secure=True,\n",
" )\n",
" return _ch_client\n",
" except Exception as e:\n",
" wait = min(2 ** attempt, 30)\n",
" print(f\" ClickHouse connect attempt {attempt}/{max_retries} failed: {e}\")\n",
" if attempt == max_retries:\n",
" raise\n",
" print(f\" Retrying in {wait}s...\")\n",
" time.sleep(wait)\n",
"\n",
"\n",
"def load_raw_adsb_for_day(day):\n",
" \"\"\"Load raw ADS-B data for a day from cache or ClickHouse.\"\"\"\n",
" from datetime import timedelta\n",
" from pathlib import Path\n",
" import pandas as pd\n",
" import time\n",
" \n",
" start_time = day.replace(hour=0, minute=0, second=0, microsecond=0)\n",
" end_time = start_time + timedelta(days=1)\n",
" \n",
" # Set up caching\n",
" cache_dir = Path(\"data/adsb\")\n",
" cache_dir.mkdir(parents=True, exist_ok=True)\n",
" cache_file = cache_dir / f\"adsb_raw_{start_time.strftime('%Y-%m-%d')}.csv.zst\"\n",
" \n",
" # Check if cache exists\n",
" if cache_file.exists():\n",
" print(f\" Loading from cache: {cache_file}\")\n",
" df = pd.read_csv(cache_file, compression='zstd')\n",
" df['time'] = pd.to_datetime(df['time'])\n",
" else:\n",
" # Format dates for the query\n",
" start_str = start_time.strftime('%Y-%m-%d %H:%M:%S')\n",
" end_str = end_time.strftime('%Y-%m-%d %H:%M:%S')\n",
" \n",
" max_retries = 3\n",
" for attempt in range(1, max_retries + 1):\n",
" try:\n",
" client = _get_clickhouse_client()\n",
" print(f\" Querying ClickHouse for {start_time.strftime('%Y-%m-%d')}\")\n",
" df = client.query_df(f\"SELECT time, icao,r,t,dbFlags,ownOp,year,desc,aircraft FROM adsb_messages Where time > '{start_str}' AND time < '{end_str}'\")\n",
" break\n",
" except Exception as e:\n",
" wait = min(2 ** attempt, 30)\n",
" print(f\" Query attempt {attempt}/{max_retries} failed: {e}\")\n",
" if attempt == max_retries:\n",
" raise\n",
" # Reset client in case connection is stale\n",
" global _ch_client\n",
" _ch_client = None\n",
" print(f\" Retrying in {wait}s...\")\n",
" time.sleep(wait)\n",
" \n",
" # Save to cache\n",
" df.to_csv(cache_file, index=False, compression='zstd')\n",
" print(f\" Saved to cache: {cache_file}\")\n",
" \n",
" return df\n",
"\n",
"def load_historical_for_day(day):\n",
" from pathlib import Path\n",
" import pandas as pd\n",
" \n",
" df = load_raw_adsb_for_day(day)\n",
" print(df)\n",
" df['aircraft_category'] = df['aircraft'].apply(lambda x: x.get('category') if isinstance(x, dict) else None)\n",
" df = df.drop(columns=['aircraft'])\n",
" df = df.sort_values(['icao', 'time'])\n",
" df[COLUMNS] = df[COLUMNS].fillna('')\n",
" df_compressed = df.groupby('icao',group_keys=False).apply(compress_df)\n",
" cols = df_compressed.columns.tolist()\n",
" cols.remove('time')\n",
" cols.insert(0, 'time')\n",
" cols.remove(\"icao\")\n",
" cols.insert(1, \"icao\")\n",
" df_compressed = df_compressed[cols]\n",
" return df_compressed\n",
"\n",
"\n",
"def concat_compressed_dfs(df_base, df_new):\n",
" \"\"\"Concatenate base and new compressed dataframes, keeping the most informative row per ICAO.\"\"\"\n",
" import pandas as pd\n",
" \n",
" # Combine both dataframes\n",
" df_combined = pd.concat([df_base, df_new], ignore_index=True)\n",
" \n",
" # Sort by ICAO and time\n",
" df_combined = df_combined.sort_values(['icao', 'time'])\n",
" \n",
" # Fill NaN values\n",
" df_combined[COLUMNS] = df_combined[COLUMNS].fillna('')\n",
" \n",
" # Apply compression logic per ICAO to get the best row\n",
" df_compressed = df_combined.groupby('icao', group_keys=False).apply(compress_df)\n",
" \n",
" # Sort by time\n",
" df_compressed = df_compressed.sort_values('time')\n",
" \n",
" return df_compressed\n",
"\n",
"\n",
"def get_latest_aircraft_adsb_csv_df():\n",
" \"\"\"Download and load the latest ADS-B CSV from GitHub releases.\"\"\"\n",
" from get_latest_planequery_aircraft_release import download_latest_aircraft_adsb_csv\n",
" \n",
" import pandas as pd\n",
" import re\n",
" \n",
" csv_path = download_latest_aircraft_adsb_csv()\n",
" df = pd.read_csv(csv_path)\n",
" df = df.fillna(\"\")\n",
" \n",
" # Extract start date from filename pattern: planequery_aircraft_adsb_{start_date}_{end_date}.csv\n",
" match = re.search(r\"planequery_aircraft_adsb_(\\d{4}-\\d{2}-\\d{2})_\", str(csv_path))\n",
" if not match:\n",
" raise ValueError(f\"Could not extract date from filename: {csv_path.name}\")\n",
" \n",
" date_str = match.group(1)\n",
" return df, date_str\n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "7f66acf7",
"metadata": {},
"outputs": [],
"source": [
"# SOME KIND OF MAP REDUCE SYSTEM\n",
"\n",
"\n",
"COLUMNS = ['dbFlags', 'ownOp', 'year', 'desc', 'aircraft_category', 'r', 't']\n",
"def compress_df(df):\n",
" icao = df.name\n",
" df[\"_signature\"] = df[COLUMNS].astype(str).agg('|'.join, axis=1)\n",
" original_df = df.copy()\n",
" df = df.groupby(\"_signature\", as_index=False).first() # check if it works with both last and first.\n",
" # For each row, create a dict of non-empty column values. This is using sets and subsets...\n",
" def get_non_empty_dict(row):\n",
" return {col: row[col] for col in COLUMNS if row[col] != ''}\n",
" \n",
" df['_non_empty_dict'] = df.apply(get_non_empty_dict, axis=1)\n",
" df['_non_empty_count'] = df['_non_empty_dict'].apply(len)\n",
" \n",
" # Check if row i's non-empty values are a subset of row j's non-empty values\n",
" def is_subset_of_any(idx):\n",
" row_dict = df.loc[idx, '_non_empty_dict']\n",
" row_count = df.loc[idx, '_non_empty_count']\n",
" \n",
" for other_idx in df.index:\n",
" if idx == other_idx:\n",
" continue\n",
" other_dict = df.loc[other_idx, '_non_empty_dict']\n",
" other_count = df.loc[other_idx, '_non_empty_count']\n",
" \n",
" # Check if all non-empty values in current row match those in other row\n",
" if all(row_dict.get(k) == other_dict.get(k) for k in row_dict.keys()):\n",
" # If they match and other has more defined columns, current row is redundant\n",
" if other_count > row_count:\n",
" return True\n",
" return False\n",
" \n",
" # Keep rows that are not subsets of any other row\n",
" keep_mask = ~df.index.to_series().apply(is_subset_of_any)\n",
" df = df[keep_mask]\n",
"\n",
" if len(df) > 1:\n",
" original_df = original_df[original_df['_signature'].isin(df['_signature'])]\n",
" value_counts = original_df[\"_signature\"].value_counts()\n",
" max_signature = value_counts.idxmax()\n",
" df = df[df['_signature'] == max_signature]\n",
"\n",
" df['icao'] = icao\n",
" df = df.drop(columns=['_non_empty_dict', '_non_empty_count', '_signature'])\n",
" return df\n",
"\n",
"# names of releases something like\n",
"# planequery_aircraft_adsb_2024-06-01T00-00-00Z.csv.gz\n",
"\n",
"# Let's build historical first. \n",
"\n",
"def load_raw_adsb_for_day(day):\n",
" \"\"\"Load raw ADS-B data for a day from cache or ClickHouse.\"\"\"\n",
" from datetime import timedelta\n",
" import clickhouse_connect\n",
" from pathlib import Path\n",
" import pandas as pd\n",
" \n",
" start_time = day.replace(hour=0, minute=0, second=0, microsecond=0)\n",
" end_time = start_time + timedelta(days=1)\n",
" \n",
" # Set up caching\n",
" cache_dir = Path(\"data/adsb\")\n",
" cache_dir.mkdir(parents=True, exist_ok=True)\n",
" cache_file = cache_dir / f\"adsb_raw_{start_time.strftime('%Y-%m-%d')}.csv.zst\"\n",
" \n",
" # Check if cache exists\n",
" if cache_file.exists():\n",
" print(f\" Loading from cache: {cache_file}\")\n",
" df = pd.read_csv(cache_file, compression='zstd')\n",
" df['time'] = pd.to_datetime(df['time'])\n",
" else:\n",
" # Format dates for the query\n",
" start_str = start_time.strftime('%Y-%m-%d %H:%M:%S')\n",
" end_str = end_time.strftime('%Y-%m-%d %H:%M:%S')\n",
" \n",
" client = clickhouse_connect.get_client(\n",
" host=os.environ[\"CLICKHOUSE_HOST\"],\n",
" username=os.environ[\"CLICKHOUSE_USERNAME\"],\n",
" password=os.environ[\"CLICKHOUSE_PASSWORD\"],\n",
" secure=True,\n",
" )\n",
" print(f\" Querying ClickHouse for {start_time.strftime('%Y-%m-%d')}\")\n",
" df = client.query_df(f\"SELECT time, icao,r,t,dbFlags,ownOp,year,desc,aircraft FROM adsb_messages Where time > '{start_str}' AND time < '{end_str}'\")\n",
" \n",
" # Save to cache\n",
" df.to_csv(cache_file, index=False, compression='zstd')\n",
" print(f\" Saved to cache: {cache_file}\")\n",
" \n",
" return df\n",
"\n",
"def load_historical_for_day(day):\n",
" from pathlib import Path\n",
" import pandas as pd\n",
" \n",
" df = load_raw_adsb_for_day(day)\n",
" \n",
" df['aircraft_category'] = df['aircraft'].apply(lambda x: x.get('category') if isinstance(x, dict) else None)\n",
" df = df.drop(columns=['aircraft'])\n",
" df = df.sort_values(['icao', 'time'])\n",
" df[COLUMNS] = df[COLUMNS].fillna('')\n",
" df_compressed = df.groupby('icao',group_keys=False).apply(compress_df)\n",
" cols = df_compressed.columns.tolist()\n",
" cols.remove('time')\n",
" cols.insert(0, 'time')\n",
" cols.remove(\"icao\")\n",
" cols.insert(1, \"icao\")\n",
" df_compressed = df_compressed[cols]\n",
" return df_compressed\n",
"\n",
"\n",
"def concat_compressed_dfs(df_base, df_new):\n",
" \"\"\"Concatenate base and new compressed dataframes, keeping the most informative row per ICAO.\"\"\"\n",
" import pandas as pd\n",
" \n",
" # Combine both dataframes\n",
" df_combined = pd.concat([df_base, df_new], ignore_index=True)\n",
" \n",
" # Sort by ICAO and time\n",
" df_combined = df_combined.sort_values(['icao', 'time'])\n",
" \n",
" # Fill NaN values\n",
" df_combined[COLUMNS] = df_combined[COLUMNS].fillna('')\n",
" \n",
" # Apply compression logic per ICAO to get the best row\n",
" df_compressed = df_combined.groupby('icao', group_keys=False).apply(compress_df)\n",
" \n",
" # Sort by time\n",
" df_compressed = df_compressed.sort_values('time')\n",
" \n",
" return df_compressed\n",
"\n",
"\n",
"def get_latest_aircraft_adsb_csv_df():\n",
" \"\"\"Download and load the latest ADS-B CSV from GitHub releases.\"\"\"\n",
" from get_latest_planequery_aircraft_release import download_latest_aircraft_adsb_csv\n",
" \n",
" import pandas as pd\n",
" import re\n",
" \n",
" csv_path = download_latest_aircraft_adsb_csv()\n",
" df = pd.read_csv(csv_path)\n",
" df = df.fillna(\"\")\n",
" \n",
" # Extract start date from filename pattern: planequery_aircraft_adsb_{start_date}_{end_date}.csv\n",
" match = re.search(r\"planequery_aircraft_adsb_(\\d{4}-\\d{2}-\\d{2})_\", str(csv_path))\n",
" if not match:\n",
" raise ValueError(f\"Could not extract date from filename: {csv_path.name}\")\n",
" \n",
" date_str = match.group(1)\n",
" return df, date_str\n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "e14c8363",
"metadata": {},
"outputs": [],
"source": [
"from datetime import datetime\n",
"df = load_historical_for_day(datetime(2024,1,1))"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "3874ba4d",
"metadata": {},
"outputs": [],
"source": [
"len(df)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "bcae50ad",
"metadata": {},
"outputs": [],
"source": [
"df[(df['icao'] == \"008081\")]"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "50921c86",
"metadata": {},
"outputs": [],
"source": [
"df[df['icao'] == \"a4e1d2\"]"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "8194d9aa",
"metadata": {},
"outputs": [],
"source": [
"df[df['r'] == \"N4131T\"]"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "1e3b7aa2",
"metadata": {},
"outputs": [],
"source": [
"df_compressed[df_compressed['icao'].duplicated(keep=False)]\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "40613bc1",
"metadata": {},
"outputs": [],
"source": [
"import gzip\n",
"import json\n",
"\n",
"path = \"/Users/jonahgoode/Downloads/test_extract/traces/fb/trace_full_acbbfb.json\"\n",
"\n",
"with gzip.open(path, \"rt\", encoding=\"utf-8\") as f:\n",
" data = json.load(f)\n",
"\n",
"print(type(data))\n",
"# use `data` here\n",
"import json\n",
"print(json.dumps(data, indent=2)[:2000])\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "320109b2",
"metadata": {},
"outputs": [],
"source": [
"# First, load the JSON to inspect its structure\n",
"import json\n",
"with open(\"/Users/jonahgoode/Documents/PlaneQuery/Other-Code/readsb-protobuf/webapp/src/db/aircrafts.json\", 'r') as f:\n",
" data = json.load(f)\n",
"\n",
"# Check the structure\n",
"print(type(data))"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "590134f4",
"metadata": {},
"outputs": [],
"source": [
"data['AC97E3']"
]
}
],
"metadata": {
"kernelspec": {
"display_name": ".venv",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.10"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
+1 -1
View File
@@ -1,3 +1,3 @@
faa-aircraft-registry==0.1.0
pandas==3.0.0
clickhouse-connect==0.10.0
+10
View File
@@ -0,0 +1,10 @@
# Image that runs reducer.py.
# NOTE(review): presumably the "Dockerfile.reducer" built by the CDK stack - confirm.
# The base image is pinned to linux/arm64.
FROM --platform=linux/arm64 python:3.12-slim
WORKDIR /app
# Dependencies are installed before the script is copied in.
COPY requirements.reducer.txt requirements.txt
RUN pip install --no-cache-dir -r requirements.txt
COPY reducer.py .
# -u runs Python with unbuffered stdout/stderr.
CMD ["python", "-u", "reducer.py"]
+11
View File
@@ -0,0 +1,11 @@
# Image that runs worker.py.
# NOTE(review): presumably the "Dockerfile.worker" built by the CDK stack - confirm.
# The base image is pinned to linux/arm64.
FROM --platform=linux/arm64 python:3.12-slim
WORKDIR /app
# Dependencies are installed before the scripts are copied in.
COPY requirements.worker.txt requirements.txt
RUN pip install --no-cache-dir -r requirements.txt
# Copied next to worker.py.
# NOTE(review): presumably worker.py imports this module - confirm.
COPY compress_adsb_to_aircraft_data.py .
COPY worker.py .
# -u runs Python with unbuffered stdout/stderr.
CMD ["python", "-u", "worker.py"]
+7
View File
@@ -0,0 +1,7 @@
# Small driver script: calls load_historical_for_day for the previous UTC day.
# NOTE(review): Path is not used in the lines shown here.
from pathlib import Path
from datetime import datetime, timezone,timedelta
# NOTE(review): the date-range script in this change imports
# load_historical_for_day from compress_adsb_to_aircraft_data instead;
# confirm that the adsb_to_aircraft_data_historical module still exists.
from adsb_to_aircraft_data_historical import load_historical_for_day
# Current UTC time minus 24 hours, i.e. a timezone-aware timestamp inside "yesterday".
day = datetime.now(timezone.utc) - timedelta(days=1)
# The return value is discarded; only the call's side effects matter here.
load_historical_for_day(day)
@@ -0,0 +1,87 @@
"""
Process historical ADS-B data by date range.
Downloads and compresses ADS-B messages for each day in the specified range.
"""
import argparse
from datetime import datetime, timedelta
from pathlib import Path
import pandas as pd
from compress_adsb_to_aircraft_data import load_historical_for_day, COLUMNS
def deduplicate_by_signature(df):
    """Keep one row per (icao, signature) pair and return them sorted by time.

    The signature is the '|'-joined string form of the COLUMNS values.
    ``GroupBy.first()`` takes, per group, the first non-null value of every
    remaining column in the current row order, so callers that want the
    earliest observation must pass rows in chronological order.

    The input frame is left untouched: the helper ``_signature`` column only
    ever exists on an internal copy.

    Args:
        df: DataFrame containing 'icao', 'time' and every column in COLUMNS.

    Returns:
        A new DataFrame without the helper column, sorted by 'time'.
    """
    signature = df[COLUMNS].astype(str).agg('|'.join, axis=1)
    # assign() returns a new frame, so the caller's DataFrame never gains the
    # temporary '_signature' column.
    keyed = df.assign(_signature=signature)
    # Group by icao and signature, keep the first occurrence of each pair.
    deduped = keyed.groupby(['icao', '_signature'], as_index=False).first()
    return deduped.drop(columns=['_signature']).sort_values('time')
def main(start_date_str: str, end_date_str: str):
    """Process historical ADS-B data for every day in [start_date, end_date).

    Each day is loaded with load_historical_for_day and appended to an
    accumulator. After every day a cumulative, deduplicated snapshot is written
    to data/planequery_aircraft; a final file covering the whole range is
    written once the loop finishes.

    Args:
        start_date_str: first day to process, 'YYYY-MM-DD' (inclusive).
        end_date_str: day to stop before, 'YYYY-MM-DD' (exclusive).
    """
    import shutil

    OUT_ROOT = Path("data/planequery_aircraft")
    OUT_ROOT.mkdir(parents=True, exist_ok=True)

    # Parse dates
    start_date = datetime.strptime(start_date_str, "%Y-%m-%d")
    end_date = datetime.strptime(end_date_str, "%Y-%m-%d")

    # Calculate total number of days
    total_days = (end_date - start_date).days
    print(f"Processing {total_days} days from {start_date_str} to {end_date_str}")

    if total_days <= 0:
        # Without this guard the accumulator stays a column-less DataFrame and
        # the final deduplication fails with a KeyError on df[COLUMNS].
        print("Nothing to process: end_date must be after start_date")
        return

    # Initialize accumulated dataframe
    df_accumulated = pd.DataFrame()

    # Cache directory path
    cache_dir = Path("data/adsb")

    # Iterate through each day
    current_date = start_date
    while current_date < end_date:
        print(f"Processing {current_date.strftime('%Y-%m-%d')}...")

        df_compressed = load_historical_for_day(current_date)

        # Concatenate to accumulated dataframe
        if df_accumulated.empty:
            df_accumulated = df_compressed
        else:
            df_accumulated = pd.concat([df_accumulated, df_compressed], ignore_index=True)

        print(f" Added {len(df_compressed)} records (total: {len(df_accumulated)})")

        # Save intermediate output after each day. The snapshot is built from a
        # copy so the accumulator itself is never altered by this step.
        current_date_str = current_date.strftime('%Y-%m-%d')
        output_file = OUT_ROOT / f"planequery_aircraft_adsb_{start_date_str}_{current_date_str}.csv.gz"
        df_deduped = deduplicate_by_signature(df_accumulated.copy())
        df_deduped.to_csv(output_file, index=False, compression='gzip')
        print(f" Saved to {output_file.name}")

        # Delete the cache after each day when the run spans more than 5 days
        if total_days > 5 and cache_dir.exists():
            shutil.rmtree(cache_dir)
            print(f" Deleted cache directory to save space")

        # Move to next day
        current_date += timedelta(days=1)

    # Save the final accumulated data
    output_file = OUT_ROOT / f"planequery_aircraft_adsb_{start_date_str}_{end_date_str}.csv.gz"
    df_accumulated = deduplicate_by_signature(df_accumulated)
    df_accumulated.to_csv(output_file, index=False, compression='gzip')

    print(f"Completed processing from {start_date_str} to {end_date_str}")
    print(f"Saved {len(df_accumulated)} total records to {output_file}")
if __name__ == '__main__':
    # Command-line entry point: two positional dates forwarded to main().
    cli = argparse.ArgumentParser(description="Process historical ADS-B data from ClickHouse")
    for arg_name, arg_help in (
        ("start_date", "Start date (YYYY-MM-DD, inclusive)"),
        ("end_date", "End date (YYYY-MM-DD, exclusive)"),
    ):
        cli.add_argument(arg_name, help=arg_help)
    cli_args = cli.parse_args()

    main(cli_args.start_date, cli_args.end_date)
+170
View File
@@ -0,0 +1,170 @@
# SOME KIND OF MAP REDUCE SYSTEM
import os
COLUMNS = ['dbFlags', 'ownOp', 'year', 'desc', 'aircraft_category', 'r', 't']
def compress_df(df):
    """Collapse the rows of one ICAO group to its most informative row(s).

    Written to be passed to ``groupby('icao').apply(...)``: the ICAO value is
    read from ``df.name`` and written back as an ``icao`` column on the result.

    Steps:
      1. Build a '|'-joined string signature from the COLUMNS values and keep
         one row per distinct signature.
      2. Drop every row whose non-empty COLUMNS values all match another row
         that has strictly more non-empty values.
      3. If more than one row is left, keep only the signature that occurred
         most often in the input.

    Note: a ``_signature`` column is assigned on the frame that is passed in.
    """
    # Group key, read from the .name attribute of the group frame.
    icao = df.name
    df["_signature"] = df[COLUMNS].astype(str).agg('|'.join, axis=1)

    # Count occurrences per signature before the rows are collapsed, so the
    # counts are still available for the tie-break at the end.
    signature_counts = df["_signature"].value_counts()

    # TODO(review): original note — check whether this works with both
    # last() and first().
    df = df.groupby("_signature", as_index=False).first() # check if it works with both last and first.
    # For each row, build a dict of its non-empty COLUMNS values; the subset
    # test below compares these dicts.
    def get_non_empty_dict(row):
        return {col: row[col] for col in COLUMNS if row[col] != ''}

    df['_non_empty_dict'] = df.apply(get_non_empty_dict, axis=1)
    df['_non_empty_count'] = df['_non_empty_dict'].apply(len)

    # True when the non-empty values of row `idx` are all matched by some
    # other row that defines strictly more columns.
    def is_subset_of_any(idx):
        row_dict = df.loc[idx, '_non_empty_dict']
        row_count = df.loc[idx, '_non_empty_count']

        # Pairwise comparison against every other row of this group.
        for other_idx in df.index:
            if idx == other_idx:
                continue
            other_dict = df.loc[other_idx, '_non_empty_dict']
            other_count = df.loc[other_idx, '_non_empty_count']

            # Check if all non-empty values in current row match those in other row
            if all(row_dict.get(k) == other_dict.get(k) for k in row_dict.keys()):
                # If they match and other has more defined columns, current row is redundant
                if other_count > row_count:
                    return True
        return False

    # Keep rows that are not subsets of any other row
    keep_mask = ~df.index.to_series().apply(is_subset_of_any)
    df = df[keep_mask]

    if len(df) > 1:
        # Several incomparable rows remain: look up the pre-computed counts
        # for the surviving signatures and keep the most frequent one.
        remaining_sigs = df['_signature']
        sig_counts = signature_counts[remaining_sigs]
        max_signature = sig_counts.idxmax()
        df = df[df['_signature'] == max_signature]

    df['icao'] = icao
    df = df.drop(columns=['_non_empty_dict', '_non_empty_count', '_signature'])
    # Ensure empty strings are preserved, not NaN
    df[COLUMNS] = df[COLUMNS].fillna('')
    return df
# Release file names look something like:
# planequery_aircraft_adsb_2024-06-01T00-00-00Z.csv.gz
# The historical data set is built first.
def load_raw_adsb_for_day(day):
    """Load the raw ADS-B rows for one day from its daily parquet file.

    The file is looked up at data/output/parquet_output/v<YYYY.MM.DD>.parquet.
    When it is missing, create_parquet_for_day is asked to download the data
    and build it.

    Args:
        day: datetime whose date selects the file (the time part is reset to
            midnight before the file name is derived).

    Returns:
        DataFrame with the columns time, icao, r, t, dbFlags, ownOp, year,
        desc and aircraft_category; 'time' is made timezone-naive. An empty
        frame with the same columns is returned when generation reported
        success but the expected file still does not exist.

    Raises:
        Exception: if the parquet file is missing and could not be generated.
    """
    from pathlib import Path
    import pandas as pd

    # Single definition of the columns read from parquet and used for the
    # empty fallback frame, so the two cannot drift apart.
    columns = ['time', 'icao', 'r', 't', 'dbFlags', 'ownOp', 'year', 'desc', 'aircraft_category']

    start_time = day.replace(hour=0, minute=0, second=0, microsecond=0)

    # Check for parquet file first
    version_date = f"v{start_time.strftime('%Y.%m.%d')}"
    parquet_file = Path(f"data/output/parquet_output/{version_date}.parquet")

    if not parquet_file.exists():
        # Try to generate parquet file by calling the download function
        print(f" Parquet file not found: {parquet_file}")
        print(f" Attempting to download and generate parquet for {start_time.strftime('%Y-%m-%d')}...")

        # Imported lazily: only needed when the parquet file has to be built.
        from download_adsb_data_to_parquet import create_parquet_for_day
        result_path = create_parquet_for_day(start_time, keep_folders=False)

        if result_path:
            print(f" Successfully generated parquet file: {result_path}")
        else:
            raise Exception("Failed to generate parquet file")

    if not parquet_file.exists():
        # Return empty DataFrame if parquet file doesn't exist
        print(f" No data available for {start_time.strftime('%Y-%m-%d')}")
        return pd.DataFrame(columns=columns)

    print(f" Loading from parquet: {parquet_file}")
    df = pd.read_parquet(parquet_file, columns=columns)

    # Convert to timezone-naive datetime
    df['time'] = df['time'].dt.tz_localize(None)
    return df
def load_historical_for_day(day):
    """Load one day of raw ADS-B rows and compress them per ICAO.

    Returns the raw (empty) frame unchanged when the day has no rows.
    Otherwise rows are sorted by (icao, time), missing COLUMNS values become
    '', exact duplicates are dropped, compress_df is applied per ICAO, and
    'time' and 'icao' are moved to the first two column positions.
    """
    from pathlib import Path
    import pandas as pd

    raw = load_raw_adsb_for_day(day)
    if raw.empty:
        return raw

    print(f"Loaded {len(raw)} raw records for {day.strftime('%Y-%m-%d')}")
    ordered = raw.sort_values(['icao', 'time'])
    print("done sort")
    ordered[COLUMNS] = ordered[COLUMNS].fillna('')

    # Pass 1: cheap removal of rows that repeat the same icao + COLUMNS values.
    unique_rows = ordered.drop_duplicates(subset=['icao'] + COLUMNS, keep='first')
    print(f"After quick dedup: {len(unique_rows)} records")

    # Pass 2: per-ICAO compression.
    print("Compressing per ICAO...")
    df_compressed = unique_rows.groupby('icao', group_keys=False).apply(compress_df)
    print(f"After compress: {len(df_compressed)} records")

    # Move 'time' to position 0 and 'icao' to position 1; the remaining
    # columns keep their relative order.
    column_order = df_compressed.columns.tolist()
    for position, leading_name in enumerate(('time', 'icao')):
        column_order.remove(leading_name)
        column_order.insert(position, leading_name)
    return df_compressed[column_order]
def concat_compressed_dfs(df_base, df_new):
    """Merge two compressed frames and re-run the per-ICAO compression.

    The frames are stacked, ordered by (icao, time), missing COLUMNS values
    are replaced with '', compress_df is applied per ICAO, and the result is
    returned sorted by time.
    """
    import pandas as pd

    stacked = pd.concat([df_base, df_new], ignore_index=True).sort_values(['icao', 'time'])
    stacked[COLUMNS] = stacked[COLUMNS].fillna('')

    # compress_df picks the row(s) to keep for each ICAO.
    best_rows = stacked.groupby('icao', group_keys=False).apply(compress_df)
    return best_rows.sort_values('time')
def get_latest_aircraft_adsb_csv_df():
    """Download and load the latest ADS-B CSV from GitHub releases.

    Returns:
        (df, date_str): the CSV as a DataFrame with missing values replaced by
        '', and the start date ('YYYY-MM-DD') parsed from the file name.

    Raises:
        ValueError: if the path does not contain the
            planequery_aircraft_adsb_{start_date}_ pattern.
    """
    from pathlib import Path
    from get_latest_planequery_aircraft_release import download_latest_aircraft_adsb_csv
    import pandas as pd
    import re

    csv_path = download_latest_aircraft_adsb_csv()
    df = pd.read_csv(csv_path)
    df = df.fillna("")
    # Extract start date from filename pattern: planequery_aircraft_adsb_{start_date}_{end_date}.csv
    match = re.search(r"planequery_aircraft_adsb_(\d{4}-\d{2}-\d{2})_", str(csv_path))
    if not match:
        # Path(...) makes the message work whether the downloader returned a
        # str or a Path; a plain str has no .name attribute and would turn
        # this ValueError into an AttributeError.
        raise ValueError(f"Could not extract date from filename: {Path(csv_path).name}")

    date_str = match.group(1)
    return df, date_str
+684
View File
@@ -0,0 +1,684 @@
"""
Downloads adsb.lol data and writes to Parquet files.
Usage:
python -m src.process_historical_adsb_data.download_to_parquet 2025-01-01 2025-01-02
This will download trace data for the specified date range and output Parquet files.
This file is self-contained and does not import from other project modules.
"""
import gc
import glob
import gzip
import sys
import logging
import time
import re
import signal
import concurrent.futures
import subprocess
import os
import argparse
import datetime as dt
from datetime import datetime, timedelta, timezone
import requests
import orjson
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
# ============================================================================
# Configuration
# ============================================================================
OUTPUT_DIR = "./data/output"
os.makedirs(OUTPUT_DIR, exist_ok=True)
PARQUET_DIR = os.path.join(OUTPUT_DIR, "parquet_output")
os.makedirs(PARQUET_DIR, exist_ok=True)
TOKEN = os.environ.get('GITHUB_TOKEN') # Optional: for higher GitHub API rate limits
HEADERS = {"Authorization": f"token {TOKEN}"} if TOKEN else {}
# ============================================================================
# GitHub Release Fetching and Downloading
# ============================================================================
class DownloadTimeoutException(Exception):
    """Raised by timeout_handler when the download alarm fires."""


def timeout_handler(signum, frame):
    """Signal handler that aborts the running download by raising.

    Both arguments are the standard signal-handler arguments; neither is used.
    """
    message = "Download timed out after 40 seconds"
    raise DownloadTimeoutException(message)
def fetch_releases(version_date: str) -> list:
    """Fetch GitHub releases for a given version date from adsblol.

    Pages through the releases of adsblol/globe_history_<year> and keeps those
    whose tag starts with '<version_date>-planes-readsb-prod-0'.

    Each page is attempted up to 10 times with a 60 second pause between
    attempts; every request has a timeout so a stalled connection counts as a
    failed attempt instead of blocking forever.

    Args:
        version_date: version string of the form 'vYYYY.MM.DD'.

    Returns:
        List of matching release dicts. If a page cannot be fetched after all
        retries, the releases collected so far are returned.
    """
    year = version_date.split('.')[0][1:]
    if version_date == "v2024.12.31":
        # This one date is looked up in the 2025 repository.
        year = "2025"

    BASE_URL = f"https://api.github.com/repos/adsblol/globe_history_{year}/releases"
    PATTERN = f"{version_date}-planes-readsb-prod-0"
    max_retries = 10
    retry_delay = 60
    request_timeout = 30  # seconds, passed to requests as the per-request timeout

    releases = []
    page = 1

    while True:
        response = None
        for attempt in range(1, max_retries + 1):
            try:
                response = requests.get(
                    f"{BASE_URL}?page={page}", headers=HEADERS, timeout=request_timeout
                )
                if response.status_code == 200:
                    break
                print(f"Failed to fetch releases (attempt {attempt}/{max_retries}): {response.status_code} {response.reason}")
            except Exception as e:
                print(f"Request exception (attempt {attempt}/{max_retries}): {e}")

            # Shared retry / give-up handling for both failure kinds above.
            if attempt < max_retries:
                print(f"Waiting {retry_delay} seconds before retry...")
                time.sleep(retry_delay)
            else:
                print(f"Giving up after {max_retries} attempts")
                return releases

        data = response.json()
        if not data:
            # An empty page marks the end of the listing.
            break
        for release in data:
            # Literal prefix test: the dots in the version must not act as
            # regex wildcards.
            if release["tag_name"].startswith(PATTERN):
                releases.append(release)
        page += 1
    return releases
def download_asset(asset_url: str, file_path: str) -> bool:
    """Download a single release asset.

    The body is streamed into a hidden temporary file next to ``file_path``
    and renamed into place only after the download finished, so an
    interrupted download can never be mistaken for a completed one by the
    "already downloaded" check.

    Args:
        asset_url: URL of the asset.
        file_path: destination path on disk.

    Returns:
        True if the file already existed or was downloaded, False on any
        failure (non-200 status, timeout, or other exception).
    """
    os.makedirs(os.path.dirname(file_path) or OUTPUT_DIR, exist_ok=True)

    if os.path.exists(file_path):
        print(f"[SKIP] {file_path} already downloaded.")
        return True

    print(f"Downloading {asset_url}...")
    # A leading dot keeps the partial file from matching globs that look for
    # finished assets by their name prefix.
    directory, base_name = os.path.split(file_path)
    partial_path = os.path.join(directory, f".{base_name}.part")
    try:
        signal.signal(signal.SIGALRM, timeout_handler)
        signal.alarm(40)  # 40-second timeout
        try:
            response = requests.get(asset_url, headers=HEADERS, stream=True)
        finally:
            # Always disarm the alarm; a pending alarm would otherwise fire
            # later inside unrelated code.
            signal.alarm(0)

        with response:
            if response.status_code != 200:
                print(f"Failed to download {asset_url}: {response.status_code} {response.reason}")
                return False
            with open(partial_path, "wb") as file:
                for chunk in response.iter_content(chunk_size=8192):
                    file.write(chunk)

        os.replace(partial_path, file_path)
        print(f"Saved {file_path}")
        return True
    except DownloadTimeoutException as e:
        print(f"Download aborted for {asset_url}: {e}")
        return False
    except Exception as e:
        print(f"An error occurred while downloading {asset_url}: {e}")
        return False
    finally:
        # Only present when the download did not complete.
        if os.path.exists(partial_path):
            os.remove(partial_path)
def extract_split_archive(file_paths: list, extract_dir: str) -> bool:
    """
    Extracts a split archive by concatenating the parts using 'cat'
    and then extracting with 'tar' in one pipeline.

    Args:
        file_paths: paths of the archive parts (any order; they are sorted by
            their numeric or alphabetic suffix before concatenation).
        extract_dir: directory to extract into (top-level archive directory
            is stripped).

    Returns:
        True if ``extract_dir`` already existed or extraction succeeded,
        False if there were no parts or tar failed. On failure the directory
        created by this call is removed again, so a later call retries instead
        of skipping.
    """
    import shutil

    if os.path.isdir(extract_dir):
        print(f"[SKIP] Extraction directory already exists: {extract_dir}")
        return True

    if not file_paths:
        # `cat` without operands reads its standard input and could block.
        print(f"Failed to extract split archive: no archive parts for {extract_dir}")
        return False

    def sort_key(path: str):
        # Numeric suffixes first (by value), then alphabetic suffixes, then
        # everything else by base name.
        base = os.path.basename(path)
        parts = base.rsplit('.', maxsplit=1)
        if len(parts) == 2:
            suffix = parts[1]
            if suffix.isdigit():
                return (0, int(suffix))
            if re.fullmatch(r'[a-zA-Z]+', suffix):
                return (1, suffix)
        return (2, base)

    file_paths = sorted(file_paths, key=sort_key)
    os.makedirs(extract_dir, exist_ok=True)

    cat_proc = subprocess.Popen(
        ["cat"] + file_paths,
        stdin=subprocess.DEVNULL,
        stdout=subprocess.PIPE,
        stderr=subprocess.DEVNULL
    )
    try:
        tar_cmd = ["tar", "xf", "-", "-C", extract_dir, "--strip-components=1"]
        subprocess.run(
            tar_cmd,
            stdin=cat_proc.stdout,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            check=True
        )
    except subprocess.CalledProcessError as e:
        print(f"Failed to extract split archive: {e}")
        # Remove the partially filled directory so the "[SKIP]" check above
        # does not treat a failed extraction as done.
        shutil.rmtree(extract_dir, ignore_errors=True)
        return False
    finally:
        # Close our end of the pipe and reap `cat` on success and failure.
        cat_proc.stdout.close()
        cat_proc.wait()

    print(f"Successfully extracted archive to {extract_dir}")
    return True
# ============================================================================
# Trace File Processing (with alt_baro/on_ground handling)
# ============================================================================
ALLOWED_DATA_SOURCE = {'', 'adsb.lol', 'adsbexchange', 'airplanes.live'}
# (key in the per-point aircraft dict, default used when the key is absent),
# listed in the same order as the aircraft_* entries of COLUMNS.
_AIRCRAFT_FIELDS = (
    ('alert', None),
    ('alt_geom', None),
    ('gva', None),
    ('nac_p', None),
    ('nac_v', None),
    ('nic', None),
    ('nic_baro', None),
    ('rc', None),
    ('sda', None),
    ('sil', None),
    ('sil_type', ""),
    ('spi', None),
    ('track', None),
    ('type', ""),
    ('version', None),
    ('category', ''),
    ('emergency', ''),
    ('flight', ""),
    ('squawk', ""),
    ('baro_rate', None),
    ('nav_altitude_fms', None),
    ('nav_altitude_mcp', None),
    ('nav_modes', []),
    ('nav_qnh', None),
    ('geom_rate', None),
    ('ias', None),
    ('mach', None),
    ('mag_heading', None),
    ('oat', None),
    ('roll', None),
    ('tas', None),
    ('tat', None),
    ('true_heading', None),
    ('wd', None),
    ('ws', None),
    ('track_rate', None),
    ('nav_heading', None),
)


def process_file(filepath: str) -> list:
    """
    Process a single trace file and return list of rows.

    Handles alt_baro/on_ground: if altitude == "ground", on_ground=True and alt_baro=None.

    The file is a gzip-compressed JSON document. Each entry of its 'trace'
    list becomes one output row whose values follow the order of COLUMNS:
    time, the per-file metadata, the positional trace values, the
    _AIRCRAFT_FIELDS values and finally the data source.

    Returns:
        List of rows (lists); empty when 'icao', 'timestamp' or 'trace' is
        missing from the file.
    """
    with gzip.open(filepath, 'rb') as f:
        data = orjson.loads(f.read())

    icao = data.get('icao', None)
    if icao is None:
        print(f"Skipping file {filepath} as it does not contain 'icao'")
        return []

    # Per-file metadata, repeated on every row.
    r = data.get('r', "")
    t = data.get('t', "")
    dbFlags = data.get('dbFlags', 0)
    noRegData = data.get('noRegData', False)
    ownOp = data.get('ownOp', "")
    year = int(data.get('year', 0))
    timestamp = data.get('timestamp', None)
    desc = data.get('desc', "")
    trace_data = data.get('trace', None)

    if timestamp is None or trace_data is None:
        print(f"Skipping file {filepath} as it does not contain 'timestamp' or 'trace'")
        return []

    # Same for every row, so it is computed once instead of once per point.
    data_source_value = "adsb.lol" if "adsb.lol" in ALLOWED_DATA_SOURCE else ""

    insert_rows = []
    for row in trace_data:
        time_offset = row[0]
        lat = row[1]
        lon = row[2]
        altitude = row[3]

        # Handle alt_baro/on_ground. Exact type checks are deliberate: a bool
        # must not be treated as an int altitude.
        alt_baro = None
        on_ground = False
        if type(altitude) is str and altitude == "ground":
            on_ground = True
        elif type(altitude) is int:
            alt_baro = altitude
        elif type(altitude) is float:
            alt_baro = int(altitude)

        ground_speed = row[4]
        track_degrees = row[5]
        flags = row[6]
        vertical_rate = row[7]
        aircraft = row[8]
        source = row[9]
        geometric_altitude = row[10]
        geometric_vertical_rate = row[11]
        indicated_airspeed = row[12]
        roll_angle = row[13]

        # Point time = file timestamp + per-point offset, as an aware UTC datetime.
        dt64 = dt.datetime.fromtimestamp(timestamp + time_offset, tz=dt.timezone.utc)

        inserted_row = [
            dt64, icao, r, t, dbFlags, noRegData, ownOp, year, desc,
            lat, lon, alt_baro, on_ground, ground_speed, track_degrees,
            flags, vertical_rate,
            source, geometric_altitude, geometric_vertical_rate,
            indicated_airspeed, roll_angle,
        ]

        if aircraft is None or type(aircraft) is not dict:
            aircraft = dict()

        for key, default in _AIRCRAFT_FIELDS:
            value = aircraft.get(key, default)
            if value is default and isinstance(default, list):
                # Fresh list per row so rows never share one mutable default.
                value = []
            inserted_row.append(value)

        inserted_row.append(data_source_value)
        insert_rows.append(inserted_row)

    if insert_rows:
        print(f"Got {len(insert_rows)} rows from {filepath}")
    return insert_rows
# ============================================================================
# Parquet Writing
# ============================================================================
# Column names matching the order of data in inserted_row
COLUMNS = [
"time", "icao",
"r", "t", "dbFlags", "noRegData", "ownOp", "year", "desc",
"lat", "lon", "alt_baro", "on_ground", "ground_speed", "track_degrees",
"flags", "vertical_rate", "source", "geometric_altitude",
"geometric_vertical_rate", "indicated_airspeed", "roll_angle",
"aircraft_alert", "aircraft_alt_geom", "aircraft_gva", "aircraft_nac_p",
"aircraft_nac_v", "aircraft_nic", "aircraft_nic_baro", "aircraft_rc",
"aircraft_sda", "aircraft_sil", "aircraft_sil_type", "aircraft_spi",
"aircraft_track", "aircraft_type", "aircraft_version", "aircraft_category",
"aircraft_emergency", "aircraft_flight", "aircraft_squawk",
"aircraft_baro_rate", "aircraft_nav_altitude_fms", "aircraft_nav_altitude_mcp",
"aircraft_nav_modes", "aircraft_nav_qnh", "aircraft_geom_rate",
"aircraft_ias", "aircraft_mach", "aircraft_mag_heading", "aircraft_oat",
"aircraft_roll", "aircraft_tas", "aircraft_tat", "aircraft_true_heading",
"aircraft_wd", "aircraft_ws", "aircraft_track_rate", "aircraft_nav_heading",
"data_source",
]
# Sizing knobs derived from the number of CPUs (1 if it cannot be determined).
OS_CPU_COUNT = os.cpu_count() or 1
# All cores on machines with more than 4 of them, otherwise a single worker.
MAX_WORKERS = 1 if OS_CPU_COUNT <= 4 else OS_CPU_COUNT
# Number of trace files handed to one process-pool run.
CHUNK_SIZE = 1000 * MAX_WORKERS
# Number of rows buffered before a parquet batch file is written.
BATCH_SIZE = 100000 * OS_CPU_COUNT
# PyArrow schema for efficient Parquet writing
PARQUET_SCHEMA = pa.schema([
("time", pa.timestamp("ms", tz="UTC")),
("icao", pa.string()),
("r", pa.string()),
("t", pa.string()),
("dbFlags", pa.int32()),
("noRegData", pa.bool_()),
("ownOp", pa.string()),
("year", pa.uint16()),
("desc", pa.string()),
("lat", pa.float64()),
("lon", pa.float64()),
("alt_baro", pa.int32()),
("on_ground", pa.bool_()),
("ground_speed", pa.float32()),
("track_degrees", pa.float32()),
("flags", pa.uint32()),
("vertical_rate", pa.int32()),
("source", pa.string()),
("geometric_altitude", pa.int32()),
("geometric_vertical_rate", pa.int32()),
("indicated_airspeed", pa.int32()),
("roll_angle", pa.float32()),
("aircraft_alert", pa.int64()),
("aircraft_alt_geom", pa.int64()),
("aircraft_gva", pa.int64()),
("aircraft_nac_p", pa.int64()),
("aircraft_nac_v", pa.int64()),
("aircraft_nic", pa.int64()),
("aircraft_nic_baro", pa.int64()),
("aircraft_rc", pa.int64()),
("aircraft_sda", pa.int64()),
("aircraft_sil", pa.int64()),
("aircraft_sil_type", pa.string()),
("aircraft_spi", pa.int64()),
("aircraft_track", pa.float64()),
("aircraft_type", pa.string()),
("aircraft_version", pa.int64()),
("aircraft_category", pa.string()),
("aircraft_emergency", pa.string()),
("aircraft_flight", pa.string()),
("aircraft_squawk", pa.string()),
("aircraft_baro_rate", pa.int64()),
("aircraft_nav_altitude_fms", pa.int64()),
("aircraft_nav_altitude_mcp", pa.int64()),
("aircraft_nav_modes", pa.list_(pa.string())),
("aircraft_nav_qnh", pa.float64()),
("aircraft_geom_rate", pa.int64()),
("aircraft_ias", pa.int64()),
("aircraft_mach", pa.float64()),
("aircraft_mag_heading", pa.float64()),
("aircraft_oat", pa.int64()),
("aircraft_roll", pa.float64()),
("aircraft_tas", pa.int64()),
("aircraft_tat", pa.int64()),
("aircraft_true_heading", pa.float64()),
("aircraft_wd", pa.int64()),
("aircraft_ws", pa.int64()),
("aircraft_track_rate", pa.float64()),
("aircraft_nav_heading", pa.float64()),
("data_source", pa.string()),
])
def collect_trace_files_with_find(root_dir):
    """Map ICAO -> path for every trace_full_<icao>.json file under root_dir.

    The search is delegated to the external ``find`` command. If it exits with
    a non-zero status its stderr is printed and an empty dict is returned.
    """
    prefix, suffix = "trace_full_", ".json"
    found: dict[str, str] = {}

    completed = subprocess.run(
        ['find', root_dir, '-type', 'f', '-name', 'trace_full_*.json'],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
    )
    if completed.returncode != 0:
        print(f"Error executing find: {completed.stderr}")
        return found

    for path in completed.stdout.strip().split('\n'):
        if not path:
            # Empty output yields a single empty string after the split.
            continue
        name = os.path.basename(path)
        if name.startswith(prefix) and name.endswith(suffix):
            # The ICAO is whatever sits between the fixed prefix and suffix.
            found[name[len(prefix):-len(suffix)]] = path

    return found
def generate_version_dates(start_date: str, end_date: str) -> list:
    """Return one datetime per day from start_date through end_date (inclusive).

    Both arguments are 'YYYY-MM-DD' strings. The list is empty when end_date
    lies before start_date.
    """
    first_day = datetime.strptime(start_date, "%Y-%m-%d")
    last_day = datetime.strptime(end_date, "%Y-%m-%d")
    # +1 because the end date itself is part of the range.
    day_count = (last_day - first_day).days + 1
    return [first_day + timedelta(days=offset) for offset in range(day_count)]
def safe_process(fp):
    """Run process_file on ``fp``; log any exception and return [] instead."""
    try:
        rows = process_file(fp)
    except Exception as e:
        logging.error(f"Error processing {fp}: {e}")
        rows = []
    return rows
def rows_to_dataframe(rows: list) -> pd.DataFrame:
    """Build a DataFrame from row lists, labelling the columns with COLUMNS."""
    return pd.DataFrame(rows, columns=COLUMNS)
def write_batch_to_parquet(rows: list, version_date: str, batch_idx: int):
    """Write one batch of rows to <PARQUET_DIR>/<version_date>_batch_<idx>.parquet.

    Does nothing for an empty batch. The rows are converted to a DataFrame,
    the 'time' column is localized to UTC if it carries no timezone, and the
    table is written with PARQUET_SCHEMA and snappy compression.
    """
    if not rows:
        return

    frame = rows_to_dataframe(rows)

    time_is_naive = not frame['time'].dt.tz
    if time_is_naive:
        frame['time'] = frame['time'].dt.tz_localize('UTC')

    target_path = os.path.join(PARQUET_DIR, f"{version_date}_batch_{batch_idx:04d}.parquet")

    # The explicit schema fixes the column types of every batch file.
    arrow_table = pa.Table.from_pandas(frame, schema=PARQUET_SCHEMA, preserve_index=False)
    pq.write_table(arrow_table, target_path, compression='snappy')

    print(f"Written parquet batch {batch_idx} ({len(rows)} rows) to {target_path}")
def merge_parquet_files(version_date: str, delete_batches: bool = True):
    """Merge all batch parquet files for a version_date into a single file.

    Batches are streamed one at a time through a ParquetWriter, so only one
    batch table is held in memory at once. The merged file is written under a
    temporary name and renamed into place at the end, so a failed merge never
    leaves a partial <version_date>.parquet behind.

    Args:
        version_date: version string used in the batch and merged file names.
        delete_batches: remove the batch files after a successful merge.

    Returns:
        Path of the merged file, or None when no batch files exist.
    """
    pattern = os.path.join(PARQUET_DIR, f"{version_date}_batch_*.parquet")
    batch_files = sorted(glob.glob(pattern))

    if not batch_files:
        print(f"No batch files found for {version_date}")
        return None

    print(f"Merging {len(batch_files)} batch files for {version_date}...")

    merged_path = os.path.join(PARQUET_DIR, f"{version_date}.parquet")
    partial_path = f"{merged_path}.tmp"
    total_rows = 0
    writer = None
    try:
        try:
            for batch_file in batch_files:
                table = pq.read_table(batch_file)
                if writer is None:
                    # The first batch's schema is used for the merged file.
                    writer = pq.ParquetWriter(partial_path, table.schema, compression='snappy')
                writer.write_table(table)
                total_rows += table.num_rows
        finally:
            if writer is not None:
                writer.close()
        os.replace(partial_path, merged_path)
    except Exception:
        # Do not leave a half-written file around after a failed merge.
        if os.path.exists(partial_path):
            os.remove(partial_path)
        raise

    print(f"Merged parquet file written to {merged_path} ({total_rows} total rows)")

    # Optionally delete batch files
    if delete_batches:
        for f in batch_files:
            os.remove(f)
        print(f"Deleted {len(batch_files)} batch files")

    return merged_path
def process_version_date(version_date: str, keep_folders: bool = False):
    """Download, extract, and process trace files for a single version date.

    Reuses release archives already present in OUTPUT_DIR when there are any,
    otherwise downloads them. The trace files are parsed in a process pool,
    written to parquet in batches, and the batches are merged into one file.

    Args:
        version_date: version string such as 'v2025.01.01'.
        keep_folders: keep the extracted directory instead of deleting it.

    Returns:
        Number of rows written; 0 when no trace files were found.
    """
    print(f"\nProcessing version_date: {version_date}")
    extract_dir = os.path.join(OUTPUT_DIR, f"{version_date}-planes-readsb-prod-0.tar_0")

    # Download path: fetch the releases, download their assets, extract them
    # and return the ICAO -> trace file mapping (None if there is no release).
    def collect_trace_files_for_version_date(vd):
        releases = fetch_releases(vd)
        if len(releases) == 0:
            print(f"No releases found for {vd}.")
            return None

        downloaded_files = []
        for release in releases:
            tag_name = release["tag_name"]
            print(f"Processing release: {tag_name}")

            # Only download prod-0 if available, else prod-0tmp
            assets = release.get("assets", [])
            normal_assets = [
                a for a in assets
                if "planes-readsb-prod-0." in a["name"] and "tmp" not in a["name"]
            ]
            tmp_assets = [
                a for a in assets
                if "planes-readsb-prod-0tmp" in a["name"]
            ]
            use_assets = normal_assets if normal_assets else tmp_assets

            for asset in use_assets:
                asset_name = asset["name"]
                asset_url = asset["browser_download_url"]
                file_path = os.path.join(OUTPUT_DIR, asset_name)
                result = download_asset(asset_url, file_path)
                # Only successfully downloaded parts are passed on to extraction.
                if result:
                    downloaded_files.append(file_path)

        # The return value of the extraction is not checked here; a failed
        # extraction shows up as "no trace files found" further down.
        extract_split_archive(downloaded_files, extract_dir)
        return collect_trace_files_with_find(extract_dir)

    # Check if files already exist
    pattern = os.path.join(OUTPUT_DIR, f"{version_date}-planes-readsb-prod-0*")
    matches = [p for p in glob.glob(pattern) if os.path.isfile(p)]

    if matches:
        print(f"Found existing files for {version_date}:")
        # Prefer non-tmp slices when reusing existing files
        normal_matches = [
            p for p in matches
            if "-planes-readsb-prod-0." in os.path.basename(p)
            and "tmp" not in os.path.basename(p)
        ]
        downloaded_files = normal_matches if normal_matches else matches

        extract_split_archive(downloaded_files, extract_dir)
        trace_files = collect_trace_files_with_find(extract_dir)
    else:
        trace_files = collect_trace_files_for_version_date(version_date)

    if trace_files is None or len(trace_files) == 0:
        print(f"No trace files found for version_date: {version_date}")
        return 0

    file_list = list(trace_files.values())

    start_time = time.perf_counter()
    total_num_rows = 0
    batch_rows = []
    batch_idx = 0

    # Process files in chunks; a new process pool is created for every chunk.
    for offset in range(0, len(file_list), CHUNK_SIZE):
        chunk = file_list[offset:offset + CHUNK_SIZE]
        with concurrent.futures.ProcessPoolExecutor(max_workers=MAX_WORKERS) as process_executor:
            for rows in process_executor.map(safe_process, chunk):
                if not rows:
                    continue
                batch_rows.extend(rows)

                # Flush to a parquet batch file once enough rows are buffered.
                if len(batch_rows) >= BATCH_SIZE:
                    total_num_rows += len(batch_rows)
                    write_batch_to_parquet(batch_rows, version_date, batch_idx)
                    batch_idx += 1
                    batch_rows = []

                    elapsed = time.perf_counter() - start_time
                    speed = total_num_rows / elapsed if elapsed > 0 else 0
                    print(f"[{version_date}] processed {total_num_rows} rows in {elapsed:.2f}s ({speed:.2f} rows/s)")

        gc.collect()

    # Final batch
    if batch_rows:
        total_num_rows += len(batch_rows)
        write_batch_to_parquet(batch_rows, version_date, batch_idx)
        elapsed = time.perf_counter() - start_time
        speed = total_num_rows / elapsed if elapsed > 0 else 0
        print(f"[{version_date}] processed {total_num_rows} rows in {elapsed:.2f}s ({speed:.2f} rows/s)")

    print(f"Total rows processed for version_date {version_date}: {total_num_rows}")

    # Merge batch files into a single parquet file
    merge_parquet_files(version_date, delete_batches=True)

    # Clean up extracted directory if not keeping
    if not keep_folders and os.path.isdir(extract_dir):
        import shutil
        shutil.rmtree(extract_dir)
        print(f"Cleaned up extraction directory: {extract_dir}")

    return total_num_rows
def create_parquet_for_day(day, keep_folders: bool = False):
    """Make sure the merged parquet file for one day exists.

    Args:
        day: datetime object or string in 'YYYY-MM-DD' format
        keep_folders: Whether to keep extracted folders after processing

    Returns:
        Path of the day's parquet file (existing or newly created), or None
        when processing produced no rows or no file.
    """
    from pathlib import Path

    target_day = datetime.strptime(day, "%Y-%m-%d") if isinstance(day, str) else day
    version_date = f"v{target_day.strftime('%Y.%m.%d')}"
    parquet_path = Path(PARQUET_DIR) / f"{version_date}.parquet"

    # Nothing to do when a previous run already produced the file.
    if parquet_path.exists():
        print(f"Parquet file already exists: {parquet_path}")
        return parquet_path

    print(f"Creating parquet for {version_date}...")
    rows_processed = process_version_date(version_date, keep_folders)

    created = rows_processed > 0 and parquet_path.exists()
    return parquet_path if created else None
def main(start_date: str, end_date: str, keep_folders: bool = False):
    """Convert adsb.lol data to Parquet for every day from start_date through end_date.

    Each day is handled by process_version_date; a short summary is printed
    once all days are done.
    """
    version_dates = []
    for date in generate_version_dates(start_date, end_date):
        version_dates.append(f"v{date.strftime('%Y.%m.%d')}")
    print(f"Processing dates: {version_dates}")

    # Days are processed one after another, in date order.
    total_rows_all = sum(
        process_version_date(version_date, keep_folders)
        for version_date in version_dates
    )

    print(f"\n=== Summary ===")
    print(f"Total dates processed: {len(version_dates)}")
    print(f"Total rows written to Parquet: {total_rows_all}")
    print(f"Parquet files location: {PARQUET_DIR}")
if __name__ == "__main__":
    # Command-line entry point: log to stdout, parse the arguments, run main().
    logging.basicConfig(level=logging.INFO, stream=sys.stdout, force=True)

    cli = argparse.ArgumentParser(
        description="Download adsb.lol data and write to Parquet files"
    )
    for date_arg, date_help in (
        ("start_date", "Start date in YYYY-MM-DD format"),
        ("end_date", "End date in YYYY-MM-DD format"),
    ):
        cli.add_argument(date_arg, type=str, help=date_help)
    cli.add_argument("--keep-folders", action="store_true",
                     help="Keep extracted folders after processing")

    cli_args = cli.parse_args()
    main(cli_args.start_date, cli_args.end_date, cli_args.keep_folders)
+97
View File
@@ -0,0 +1,97 @@
"""
Reduce step: downloads all chunk CSVs from S3, combines them,
deduplicates across the full dataset, and uploads the final result.
Environment variables:
S3_BUCKET — bucket with intermediate results
RUN_ID — run identifier matching the map workers
GLOBAL_START_DATE — overall start date for output filename
GLOBAL_END_DATE — overall end date for output filename
"""
import os
from pathlib import Path
import boto3
import pandas as pd
# Columns whose combined values form the signature of an aircraft row.
COLUMNS = ["dbFlags", "ownOp", "year", "desc", "aircraft_category", "r", "t"]


def deduplicate_by_signature(df: pd.DataFrame) -> pd.DataFrame:
    """Keep one row per (icao, signature) pair and return them sorted by time.

    The signature is the "|"-joined string form of the COLUMNS values.
    ``GroupBy.first()`` takes, per group, the first non-null value of every
    remaining column in the current row order, so callers that want the
    earliest observation must pass rows in chronological order.

    The input frame is left untouched: the helper ``_signature`` column only
    ever exists on an internal copy.

    Args:
        df: DataFrame containing "icao", "time" and every column in COLUMNS.

    Returns:
        A new DataFrame without the helper column, sorted by "time".
    """
    signature = df[COLUMNS].astype(str).agg("|".join, axis=1)
    # assign() returns a new frame, so the caller's DataFrame never gains the
    # temporary "_signature" column.
    keyed = df.assign(_signature=signature)
    deduped = keyed.groupby(["icao", "_signature"], as_index=False).first()
    return deduped.drop(columns=["_signature"]).sort_values("time")
def main():
    """Combine all chunk CSVs of one run into a single deduplicated CSV on S3.

    Reads S3_BUCKET, RUN_ID (default "default"), GLOBAL_START_DATE and
    GLOBAL_END_DATE from the environment, downloads every *.csv.gz object
    under intermediate/<RUN_ID>/, deduplicates the combined rows and uploads
    the result to final/planequery_aircraft_adsb_<start>_<end>.csv.gz.
    Returns without writing anything when no chunks are found.
    """
    s3_bucket = os.environ["S3_BUCKET"]
    run_id = os.environ.get("RUN_ID", "default")
    global_start = os.environ["GLOBAL_START_DATE"]
    global_end = os.environ["GLOBAL_END_DATE"]

    s3 = boto3.client("s3")
    prefix = f"intermediate/{run_id}/"

    # List all chunk files for this run
    paginator = s3.get_paginator("list_objects_v2")
    chunk_keys = []
    for page in paginator.paginate(Bucket=s3_bucket, Prefix=prefix):
        for obj in page.get("Contents", []):
            if obj["Key"].endswith(".csv.gz"):
                chunk_keys.append(obj["Key"])

    chunk_keys.sort()
    print(f"Found {len(chunk_keys)} chunks to combine")

    if not chunk_keys:
        print("No chunks found — nothing to reduce.")
        return

    # Download all chunks; they are concatenated once after the loop instead
    # of re-copying the growing frame for every chunk.
    download_dir = Path("/tmp/chunks")
    download_dir.mkdir(parents=True, exist_ok=True)

    chunk_frames = []
    last_chunk = None
    for key in chunk_keys:
        local_path = download_dir / Path(key).name
        print(f"Downloading {key}...")
        s3.download_file(s3_bucket, key, str(local_path))

        try:
            # keep_default_na=False keeps empty strings as '' instead of NaN.
            df_chunk = pd.read_csv(local_path, compression="gzip", keep_default_na=False)
        finally:
            # Free disk space, also when the chunk could not be parsed.
            local_path.unlink()
        print(f" Loaded {len(df_chunk)} rows from {local_path.name}")

        last_chunk = df_chunk
        if not df_chunk.empty:
            chunk_frames.append(df_chunk)

    if not chunk_frames:
        # Every chunk was empty: keep the last one so the columns survive.
        df_accumulated = last_chunk
    elif len(chunk_frames) == 1:
        df_accumulated = chunk_frames[0]
    else:
        df_accumulated = pd.concat(chunk_frames, ignore_index=True)

    print(f"Combined: {len(df_accumulated)} rows before dedup")

    # Final global deduplication
    df_accumulated = deduplicate_by_signature(df_accumulated)
    print(f"After dedup: {len(df_accumulated)} rows")

    # Write and upload final result
    output_name = f"planequery_aircraft_adsb_{global_start}_{global_end}.csv.gz"
    local_output = Path(f"/tmp/{output_name}")
    df_accumulated.to_csv(local_output, index=False, compression="gzip")

    final_key = f"final/{output_name}"
    print(f"Uploading to s3://{s3_bucket}/{final_key}")
    s3.upload_file(str(local_output), s3_bucket, final_key)

    print(f"Final output: {len(df_accumulated)} records -> {final_key}")
# Run the reduce step only when this file is executed as a script.
if __name__ == "__main__":
    main()
+2
View File
@@ -0,0 +1,2 @@
pandas>=2.0
boto3>=1.34
+4
View File
@@ -0,0 +1,4 @@
pandas>=2.0
clickhouse-connect>=0.7
boto3>=1.34
zstandard>=0.22
+94
View File
@@ -0,0 +1,94 @@
"""
Map worker: processes a date range chunk, uploads result to S3.
Environment variables:
START_DATE — inclusive, YYYY-MM-DD
END_DATE — exclusive, YYYY-MM-DD
S3_BUCKET — bucket for intermediate results
RUN_ID — unique run identifier for namespacing S3 keys
"""
import os
import sys
from datetime import datetime, timedelta
from pathlib import Path
import boto3
import pandas as pd
from compress_adsb_to_aircraft_data import load_historical_for_day, COLUMNS
def deduplicate_by_signature(df: pd.DataFrame) -> pd.DataFrame:
    """Collapse rows that share an icao and an identical COLUMNS signature.

    The signature is the string form of every COLUMNS value joined with "|".
    One row is kept per (icao, signature) pair and the result is returned
    sorted by "time".

    Note: a "_signature" column is added to the passed-in ``df`` in place;
    only the returned frame has it removed.
    """
    # NOTE(review): GroupBy.first() follows the existing row order, so the
    # kept row is the earliest only if rows arrive time-ordered — confirm
    # with callers.
    as_text = df[COLUMNS].astype(str)
    df["_signature"] = as_text.agg("|".join, axis=1)
    per_signature = df.groupby(["icao", "_signature"], as_index=False)
    return per_signature.first().drop(columns=["_signature"]).sort_values("time")
def main():
    """Build and upload the deduplicated chunk for one date range.

    Reads START_DATE (inclusive) and END_DATE (exclusive), both YYYY-MM-DD,
    plus S3_BUCKET and the optional RUN_ID (default "default") from the
    environment. Each day is loaded with ``load_historical_for_day``; a day
    that fails to load is logged and skipped. The collected rows are
    deduplicated and uploaded as
    ``intermediate/{run_id}/chunk_{START_DATE}_{END_DATE}.csv.gz``. Nothing
    is uploaded when no rows were collected.

    Raises:
        KeyError: if START_DATE, END_DATE or S3_BUCKET is not set.
        ValueError: if a date is not in YYYY-MM-DD format.
    """
    import shutil

    start_date_str = os.environ["START_DATE"]
    end_date_str = os.environ["END_DATE"]
    s3_bucket = os.environ["S3_BUCKET"]
    run_id = os.environ.get("RUN_ID", "default")

    start_date = datetime.strptime(start_date_str, "%Y-%m-%d")
    end_date = datetime.strptime(end_date_str, "%Y-%m-%d")

    total_days = (end_date - start_date).days
    print(f"Worker: processing {total_days} days [{start_date_str}, {end_date_str})")

    cache_dir = Path("data/adsb")
    frames = []
    total_rows = 0
    current_date = start_date

    while current_date < end_date:
        day_str = current_date.strftime("%Y-%m-%d")
        print(f" Loading {day_str}...")

        try:
            df_compressed = load_historical_for_day(current_date)
        except Exception as e:
            # Best effort: a failed day is reported and skipped, not fatal.
            print(f" WARNING: Failed to load {day_str}: {e}")
        else:
            frames.append(df_compressed)
            total_rows += len(df_compressed)
            print(f" +{len(df_compressed)} rows (total: {total_rows})")
        finally:
            # Delete local cache after each day to save disk in container.
            # Done for failed days too, so partial downloads do not pile up.
            if cache_dir.exists():
                shutil.rmtree(cache_dir)

        current_date += timedelta(days=1)

    # One concat over all days: concatenating pairwise inside the loop
    # re-copies the accumulated rows for every day.
    df_accumulated = pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()
    # Drop the per-day frames so they are not kept alive during the dedup.
    frames.clear()

    if df_accumulated.empty:
        print("No data collected — exiting.")
        return

    # Deduplicate within this chunk
    df_accumulated = deduplicate_by_signature(df_accumulated)
    print(f"After dedup: {len(df_accumulated)} rows")

    # Write to local file then upload to S3
    local_path = Path(f"/tmp/chunk_{start_date_str}_{end_date_str}.csv.gz")
    df_accumulated.to_csv(local_path, index=False, compression="gzip")

    s3_key = f"intermediate/{run_id}/chunk_{start_date_str}_{end_date_str}.csv.gz"
    print(f"Uploading to s3://{s3_bucket}/{s3_key}")

    s3 = boto3.client("s3")
    s3.upload_file(str(local_path), s3_bucket, s3_key)
    print("Done.")


if __name__ == "__main__":
    main()
-89
View File
@@ -1,89 +0,0 @@
from pathlib import Path
import pandas as pd
import re
from derive_from_faa_master_txt import concat_faa_historical_df
def concatenate_aircraft_csvs(
    input_dir: Path = Path("data/concat"),
    output_dir: Path = Path("data/planequery_aircraft"),
    filename_pattern: str = r"planequery_aircraft_(\d{4}-\d{2}-\d{2})_(\d{4}-\d{2}-\d{2})\.csv"
):
    """
    Merge the dated aircraft CSVs found in input_dir into a single CSV.

    Files are picked up when their name matches filename_pattern, ordered by
    the two dates captured from the name, and folded together one after the
    other with concat_faa_historical_df. The result is written to output_dir
    as planequery_aircraft_{first start date}_{last end date}.csv.

    Args:
        input_dir: Directory containing the CSV files to concatenate
        output_dir: Directory where the output CSV is saved (created if missing)
        filename_pattern: Regex whose first two groups capture the start and
            end date of a file

    Returns:
        Path of the CSV that was written.

    Raises:
        FileNotFoundError: if no file in input_dir matches filename_pattern.
    """
    input_dir = Path(input_dir)
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    def read_aircraft_csv(path):
        # These three columns are forced to str on every read.
        return pd.read_csv(
            path,
            dtype={
                'transponder_code': str,
                'unique_regulatory_id': str,
                'registrant_county': str
            }
        )

    # Collect (start_date, end_date, path) for every matching file
    name_re = re.compile(filename_pattern)
    dated_files = []
    for candidate in sorted(input_dir.glob("*.csv")):
        found = name_re.search(candidate.name)
        if found is None:
            continue
        dated_files.append((found.group(1), found.group(2), candidate))

    # Order by start date, then end date
    dated_files.sort(key=lambda entry: entry[:2])

    if not dated_files:
        raise FileNotFoundError(f"No CSV files matching pattern found in {input_dir}")

    print(f"Found {len(dated_files)} CSV files to concatenate")

    # The first file is the base; every later file is merged into it
    (first_start_date, _, base_path), *later_files = dated_files
    print(f"Reading base file: {base_path.name}")
    df_base = read_aircraft_csv(base_path)

    for _, _, next_path in later_files:
        print(f"Concatenating: {next_path.name}")
        df_base = concat_faa_historical_df(df_base, read_aircraft_csv(next_path))

    # download_date must never decrease
    assert df_base['download_date'].is_monotonic_increasing, "download_date is not monotonic increasing"

    # The output name spans the first start date to the last end date
    last_end_date = dated_files[-1][1]
    output_filename = f"planequery_aircraft_{first_start_date}_{last_end_date}.csv"
    output_path = output_dir / output_filename

    print(f"Writing output to: {output_path}")
    df_base.to_csv(output_path, index=False)
    print(f"Successfully concatenated {len(dated_files)} files into {output_filename}")
    print(f"Total rows: {len(df_base)}")

    return output_path


if __name__ == "__main__":
    # Sample invocation; adjust the directories to your layout
    concatenate_aircraft_csvs(
        input_dir=Path("data/concat"),
        output_dir=Path("data/planequery_aircraft")
    )
@@ -0,0 +1,64 @@
from pathlib import Path
from datetime import datetime, timezone, timedelta
import sys
# Add adsb directory to path
sys.path.insert(0, str(Path(__file__).parent / "adsb")) # TODO: Fix this hacky path manipulation
from adsb.compress_adsb_to_aircraft_data import (
load_historical_for_day,
concat_compressed_dfs,
get_latest_aircraft_adsb_csv_df,
)
if __name__ == '__main__':
    # Daily ADS-B derivation: load the most recent complete day, merge it into
    # the latest released ADS-B CSV, and write the combined file under
    # data/planequery_aircraft.

    # Get yesterday's date (data for the previous day)
    day = datetime.now(timezone.utc) - timedelta(days=1)

    # Find a day with complete data
    max_attempts = 2  # Days to try: yesterday first, then one day further back
    for attempt in range(max_attempts):
        date_str = day.strftime("%Y-%m-%d")
        print(f"Processing ADS-B data for {date_str}")

        print("Loading new ADS-B data...")
        df_new = load_historical_for_day(day)
        if df_new.empty:
            # No rows at all for this day: try the previous one.
            day = day - timedelta(days=1)
            continue
        max_time = df_new['time'].max()
        # NOTE(review): assumes 'time' holds naive timestamps that are UTC — confirm.
        max_time = max_time.replace(tzinfo=timezone.utc)

        # A day counts as complete when its newest row is within 5 minutes of
        # 23:59:59 on that day.
        if max_time >= day.replace(hour=23, minute=59, second=59) - timedelta(minutes=5):
            # Data is complete
            break

        print(f"WARNING: Latest data time is {max_time}, which is more than 5 minutes before end of day.")
        day = day - timedelta(days=1)
    else:
        # Reached only when no attempt hit `break`.
        raise RuntimeError(f"Could not find complete data in the last {max_attempts} days")

    try:
        # Get the latest release data
        print("Downloading latest ADS-B release...")
        df_base, start_date_str = get_latest_aircraft_adsb_csv_df()
    except Exception as e:
        # Best effort: without a previous release, start a new history from
        # this day alone.
        print(f"Error downloading latest ADS-B release: {e}")
        df_combined = df_new
        start_date_str = date_str
    else:
        # Combine with historical data. Kept outside the fallback above so a
        # failed merge stops the run instead of being reported as a download
        # error and silently replaced by a single-day file.
        print("Combining with historical data...")
        df_combined = concat_compressed_dfs(df_base, df_new)

    # Sort by time for consistent ordering
    df_combined = df_combined.sort_values('time').reset_index(drop=True)

    # Save the result
    OUT_ROOT = Path("data/planequery_aircraft")
    OUT_ROOT.mkdir(parents=True, exist_ok=True)

    output_file = OUT_ROOT / f"planequery_aircraft_adsb_{start_date_str}_{date_str}.csv"
    df_combined.to_csv(output_file, index=False)

    print(f"Saved: {output_file}")
    print(f"Total aircraft: {len(df_combined)}")
@@ -25,9 +25,9 @@ if not zip_path.exists():
OUT_ROOT = Path("data/planequery_aircraft")
OUT_ROOT.mkdir(parents=True, exist_ok=True)
from derive_from_faa_master_txt import convert_faa_master_txt_to_df, concat_faa_historical_df
from get_latest_planequery_aircraft_release import get_latest_aircraft_csv_df
from get_latest_planequery_aircraft_release import get_latest_aircraft_faa_csv_df
df_new = convert_faa_master_txt_to_df(zip_path, date_str)
df_base, start_date_str = get_latest_aircraft_csv_df()
df_base, start_date_str = get_latest_aircraft_faa_csv_df()
df_base = concat_faa_historical_df(df_base, df_new)
assert df_base['download_date'].is_monotonic_increasing, "download_date is not monotonic increasing"
df_base.to_csv(OUT_ROOT / f"planequery_aircraft_{start_date_str}_{date_str}.csv", index=False)
df_base.to_csv(OUT_ROOT / f"planequery_aircraft_faa_{start_date_str}_{date_str}.csv", index=False)
+1 -1
View File
@@ -112,5 +112,5 @@ for date, sha in date_to_sha.items():
print(len(df_base), "total entries so far")
assert df_base['download_date'].is_monotonic_increasing, "download_date is not monotonic increasing"
df_base.to_csv(OUT_ROOT / f"planequery_aircraft_{start_date}_{end_date}.csv", index=False)
df_base.to_csv(OUT_ROOT / f"planequery_aircraft_faa_{start_date}_{end_date}.csv", index=False)
# TODO: get average number of new rows per day.
+50 -5
View File
@@ -109,7 +109,7 @@ def download_latest_aircraft_csv(
repo: str = REPO,
) -> Path:
"""
Download the latest planequery_aircraft_*.csv file from the latest GitHub release.
Download the latest planequery_aircraft_faa_*.csv file from the latest GitHub release.
Args:
output_dir: Directory to save the downloaded file (default: "downloads")
@@ -120,25 +120,70 @@ def download_latest_aircraft_csv(
Path to the downloaded file
"""
assets = get_latest_release_assets(repo, github_token=github_token)
asset = pick_asset(assets, name_regex=r"^planequery_aircraft_.*\.csv$")
try:
asset = pick_asset(assets, name_regex=r"^planequery_aircraft_faa_.*\.csv$")
except FileNotFoundError:
# Fallback to old naming pattern
asset = pick_asset(assets, name_regex=r"^planequery_aircraft_\d{4}-\d{2}-\d{2}_.*\.csv$")
saved_to = download_asset(asset, output_dir / asset.name, github_token=github_token)
print(f"Downloaded: {asset.name} ({asset.size} bytes) -> {saved_to}")
return saved_to
def get_latest_aircraft_csv_df():
def get_latest_aircraft_faa_csv_df():
csv_path = download_latest_aircraft_csv()
import pandas as pd
df = pd.read_csv(csv_path, dtype={'transponder_code': str,
'unique_regulatory_id': str,
'registrant_county': str})
df = df.fillna("")
# Extract date from filename pattern: planequery_aircraft_{date}_{date}.csv
match = re.search(r"planequery_aircraft_(\d{4}-\d{2}-\d{2})_", str(csv_path))
# Extract start date from filename pattern: planequery_aircraft_faa_{start_date}_{end_date}.csv
match = re.search(r"planequery_aircraft_faa_(\d{4}-\d{2}-\d{2})_", str(csv_path))
if not match:
# Fallback to old naming pattern: planequery_aircraft_{start_date}_{end_date}.csv
match = re.search(r"planequery_aircraft_(\d{4}-\d{2}-\d{2})_", str(csv_path))
if not match:
raise ValueError(f"Could not extract date from filename: {csv_path.name}")
date_str = match.group(1)
return df, date_str
def download_latest_aircraft_adsb_csv(
    output_dir: Path = Path("downloads"),
    github_token: Optional[str] = None,
    repo: str = REPO,
) -> Path:
    """
    Fetch the planequery_aircraft_adsb_*.csv asset of the latest GitHub release.

    Args:
        output_dir: Directory the asset is saved into (default: "downloads")
        github_token: Optional GitHub token, passed on to the release lookup
            and to the download
        repo: GitHub repository in format "owner/repo" (default: REPO)

    Returns:
        Path to the downloaded file
    """
    release_assets = get_latest_release_assets(repo, github_token=github_token)
    adsb_asset = pick_asset(release_assets, name_regex=r"^planequery_aircraft_adsb_.*\.csv$")

    # The asset keeps its release name inside output_dir.
    destination = output_dir / adsb_asset.name
    local_file = download_asset(adsb_asset, destination, github_token=github_token)

    print(f"Downloaded: {adsb_asset.name} ({adsb_asset.size} bytes) -> {local_file}")
    return local_file
def get_latest_aircraft_adsb_csv_df():
    """
    Download the latest ADS-B release CSV and load it as a DataFrame.

    Returns:
        (df, start_date): the CSV contents with missing values replaced by "",
        and the first YYYY-MM-DD date found in the file name
        planequery_aircraft_adsb_{start_date}_{end_date}.csv.

    Raises:
        ValueError: if the file name does not contain the expected start date.
    """
    downloaded = download_latest_aircraft_adsb_csv()

    import pandas as pd
    # NOTE(review): default NA parsing also turns literal values such as "NA"
    # into "" here, while the reducer reads with keep_default_na=False —
    # confirm the difference is intended.
    frame = pd.read_csv(downloaded).fillna("")

    # The start date is the first date embedded in the file name.
    found = re.search(r"planequery_aircraft_adsb_(\d{4}-\d{2}-\d{2})_", str(downloaded))
    if found is None:
        raise ValueError(f"Could not extract date from filename: {downloaded.name}")

    return frame, found.group(1)
if __name__ == "__main__":
download_latest_aircraft_csv()
+97
View File
@@ -0,0 +1,97 @@
"""
Generate Step Functions input and start the pipeline.
Usage:
python trigger_pipeline.py 2024-01-01 2025-01-01
python trigger_pipeline.py 2024-01-01 2025-01-01 --chunk-days 30
python trigger_pipeline.py 2024-01-01 2025-01-01 --dry-run
"""
import argparse
import json
import os
import uuid
from datetime import datetime, timedelta
import boto3
def generate_chunks(start_date: str, end_date: str, chunk_days: int = 1):
    """Split the half-open range [start_date, end_date) into consecutive chunks.

    Args:
        start_date: First day of the range, YYYY-MM-DD (inclusive).
        end_date: End of the range, YYYY-MM-DD (exclusive).
        chunk_days: Maximum number of days per chunk; must be at least 1.

    Returns:
        A list of {"start_date": ..., "end_date": ...} dicts with YYYY-MM-DD
        strings. Each chunk ends where the next begins, and the last chunk is
        cut off at end_date. The list is empty when start_date is not before
        end_date.

    Raises:
        ValueError: if chunk_days is less than 1, or a date is not YYYY-MM-DD.
    """
    # A step of zero or less never advances `current`, so the loop below
    # would never terminate.
    if chunk_days < 1:
        raise ValueError(f"chunk_days must be at least 1, got {chunk_days}")

    start = datetime.strptime(start_date, "%Y-%m-%d")
    end = datetime.strptime(end_date, "%Y-%m-%d")
    step = timedelta(days=chunk_days)

    chunks = []
    current = start
    while current < end:
        chunk_end = min(current + step, end)
        chunks.append({
            "start_date": current.strftime("%Y-%m-%d"),
            "end_date": chunk_end.strftime("%Y-%m-%d"),
        })
        current = chunk_end

    return chunks
def main():
    """Build the Step Functions input for a date range and start the pipeline.

    Command line: ``start_date end_date [--chunk-days N] [--dry-run]``.
    CLICKHOUSE_HOST, CLICKHOUSE_USERNAME and CLICKHOUSE_PASSWORD are read
    from the environment and copied into every chunk of the input. The input
    JSON is printed with the password masked; with ``--dry-run`` nothing is
    started. Otherwise an execution named after the generated run id is
    started on the "adsb-map-reduce" state machine.

    Raises:
        KeyError: if a ClickHouse environment variable is not set.
        RuntimeError: if no state machine named "adsb-map-reduce" exists.
    """
    # Function-scope import: the file-level import only brings in
    # datetime and timedelta.
    from datetime import timezone

    parser = argparse.ArgumentParser(description="Trigger ADS-B map-reduce pipeline")
    parser.add_argument("start_date", help="Start date (YYYY-MM-DD, inclusive)")
    parser.add_argument("end_date", help="End date (YYYY-MM-DD, exclusive)")
    parser.add_argument("--chunk-days", type=int, default=1,
                        help="Days per chunk (default: 1)")
    parser.add_argument("--dry-run", action="store_true",
                        help="Print input JSON without starting execution")
    args = parser.parse_args()

    # datetime.utcnow() is deprecated since Python 3.12; this yields the same
    # UTC timestamp text.
    timestamp = datetime.now(timezone.utc).strftime('%Y%m%dT%H%M%S')
    run_id = f"run-{timestamp}-{uuid.uuid4().hex[:8]}"
    chunks = generate_chunks(args.start_date, args.end_date, args.chunk_days)

    clickhouse_host = os.environ["CLICKHOUSE_HOST"]
    clickhouse_username = os.environ["CLICKHOUSE_USERNAME"]
    clickhouse_password = os.environ["CLICKHOUSE_PASSWORD"]

    # Inject run_id and ClickHouse credentials into each chunk
    # NOTE(review): the password travels inside the execution input; consider
    # passing a secret reference instead.
    for chunk in chunks:
        chunk["run_id"] = run_id
        chunk["clickhouse_host"] = clickhouse_host
        chunk["clickhouse_username"] = clickhouse_username
        chunk["clickhouse_password"] = clickhouse_password

    sfn_input = {
        "run_id": run_id,
        "global_start_date": args.start_date,
        "global_end_date": args.end_date,
        "chunks": chunks,
    }

    # Printed copy only: never echo the password to the terminal or CI logs.
    printable_input = {
        **sfn_input,
        "chunks": [{**chunk, "clickhouse_password": "***"} for chunk in chunks],
    }

    print(f"Run ID: {run_id}")
    print(f"Chunks: {len(chunks)} (at {args.chunk_days} days each)")
    print(f"Max concurrency: 3 (enforced by Step Functions Map state)")
    print()
    print(json.dumps(printable_input, indent=2))

    if args.dry_run:
        print("\n--dry-run: not starting execution")
        return

    client = boto3.client("stepfunctions")

    # Find the state machine ARN. The listing is paginated, so every page is
    # searched rather than only the first response.
    paginator = client.get_paginator("list_state_machines")
    arn = next(
        (
            machine["stateMachineArn"]
            for page in paginator.paginate()
            for machine in page["stateMachines"]
            if machine["name"] == "adsb-map-reduce"
        ),
        None,
    )
    if arn is None:
        raise RuntimeError('State machine "adsb-map-reduce" not found')

    response = client.start_execution(
        stateMachineArn=arn,
        name=run_id,
        input=json.dumps(sfn_input),
    )

    print(f"\nStarted execution: {response['executionArn']}")


if __name__ == "__main__":
    main()