diff --git a/.github/ISSUE_TEMPLATE/community_submission.yaml b/.github/ISSUE_TEMPLATE/community_submission.yaml new file mode 100644 index 0000000..104aa8e --- /dev/null +++ b/.github/ISSUE_TEMPLATE/community_submission.yaml @@ -0,0 +1,89 @@ +name: Community submission (JSON) +description: Submit one or more community records (JSON) to be reviewed and approved. +title: "Community submission: " +labels: + - community + - submission +body: + - type: markdown + attributes: + value: | + Submit **one object** or an **array of objects** that matches the community submission schema. + + **Rules (enforced on review/automation):** + - Each object must include **at least one** of: + - `registration_number` + - `transponder_code_hex` (6 hex chars) + - `planequery_airframe_id` + - Your contributor name (entered below) will be applied to all objects. + - `contributor_uuid` is derived from your GitHub account automatically. + - `creation_timestamp` is created by the system (you may omit it). + + **Example: single object** + ```json + { + "transponder_code_hex": "a1b2c3" + } + ``` + + **Example: multiple objects (array)** + ```json + [ + { + "registration_number": "N123AB" + }, + { + "planequery_airframe_id": "cessna|172s|12345", + "transponder_code_hex": "0f1234" + } + ] + ``` + + - type: input + id: contributor_name + attributes: + label: Contributor Name + description: Your display name for attribution. Leave blank to use your GitHub username. Max 150 characters. + placeholder: "e.g., JamesBerry.com or leave blank" + validations: + required: false + + - type: textarea + id: submission_json + attributes: + label: Submission JSON + description: Paste either one JSON object or an array of JSON objects. Must be valid JSON. Do not include contributor_name or contributor_uuid in your JSON. + placeholder: | + Paste JSON here... + validations: + required: true + + - type: dropdown + id: submission_type + attributes: + label: What did you submit? 
+ options: + - Single object + - Multiple objects (array) + validations: + required: true + + - type: checkboxes + id: confirmations + attributes: + label: Confirmations + options: + - label: "I confirm this is valid JSON (not JSONL) and matches the field names exactly." + required: true + - label: "I confirm `transponder_code_hex` values (if provided) are 6 hex characters." + required: true + - label: "I understand submissions are reviewed and may be rejected or require changes." + required: true + + - type: textarea + id: notes + attributes: + label: Notes (optional) + description: Any context, sources, or links that help validate your submission. + validations: + required: false \ No newline at end of file diff --git a/.github/workflows/approve-community-submission.yaml b/.github/workflows/approve-community-submission.yaml new file mode 100644 index 0000000..19f9fbd --- /dev/null +++ b/.github/workflows/approve-community-submission.yaml @@ -0,0 +1,46 @@ +name: Approve Community Submission + +on: + issues: + types: [labeled] + +permissions: + contents: write + pull-requests: write + issues: write + +jobs: + approve: + if: github.event.label.name == 'approved' && contains(github.event.issue.labels.*.name, 'validated') + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Setup Python + uses: actions/setup-python@v5 + with: + python-version: "3.12" + + - name: Install dependencies + run: pip install jsonschema + + - name: Get issue author ID + id: author + uses: actions/github-script@v7 + with: + script: | + const issue = context.payload.issue; + core.setOutput('username', issue.user.login); + core.setOutput('user_id', issue.user.id); + + - name: Process and create PR + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + GITHUB_REPOSITORY: ${{ github.repository }} + run: | + python -m src.contributions.approve_submission \ + --issue-number ${{ github.event.issue.number }} \ + --issue-body "${{ github.event.issue.body }}" \ + 
--author "${{ steps.author.outputs.username }}" \ + --author-id ${{ steps.author.outputs.user_id }} diff --git a/.github/workflows/planequery-aircraft-daily-release.yaml b/.github/workflows/planequery-aircraft-daily-release.yaml index 6e50e58..37c96d1 100644 --- a/.github/workflows/planequery-aircraft-daily-release.yaml +++ b/.github/workflows/planequery-aircraft-daily-release.yaml @@ -2,52 +2,314 @@ name: planequery-aircraft Daily Release on: schedule: - # 6:00pm UTC every day + # 6:00pm UTC every day - runs on default branch, triggers both - cron: "0 06 * * *" - workflow_dispatch: {} + workflow_dispatch: permissions: contents: write jobs: - build-and-release: + trigger-releases: runs-on: ubuntu-latest + if: github.event_name == 'schedule' + steps: + - name: Trigger main branch release + uses: actions/github-script@v7 + with: + script: | + await github.rest.actions.createWorkflowDispatch({ + owner: context.repo.owner, + repo: context.repo.repo, + workflow_id: 'planequery-aircraft-daily-release.yaml', + ref: 'main' + }); + + - name: Trigger develop branch release + uses: actions/github-script@v7 + with: + script: | + await github.rest.actions.createWorkflowDispatch({ + owner: context.repo.owner, + repo: context.repo.repo, + workflow_id: 'planequery-aircraft-daily-release.yaml', + ref: 'develop' + }); + build-faa: + runs-on: ubuntu-24.04-arm + if: github.event_name != 'schedule' steps: - name: Checkout - uses: actions/checkout@v4 + uses: actions/checkout@v6 with: fetch-depth: 0 - name: Setup Python - uses: actions/setup-python@v5 + uses: actions/setup-python@v6 with: - python-version: "3.12" + python-version: "3.14" - name: Install dependencies run: | python -m pip install --upgrade pip pip install -r requirements.txt - - name: Run daily release script + - name: Run FAA release script run: | python src/create_daily_planequery_aircraft_release.py ls -lah data/faa_releasable ls -lah data/planequery_aircraft + - name: Upload FAA artifacts + uses: 
actions/upload-artifact@v4 + with: + name: faa-release + path: | + data/planequery_aircraft/planequery_aircraft_faa_*.csv + data/faa_releasable/ReleasableAircraft_*.zip + retention-days: 1 + + adsb-extract: + runs-on: ubuntu-24.04-arm + if: github.event_name != 'schedule' + outputs: + manifest-exists: ${{ steps.check.outputs.exists }} + steps: + - name: Checkout + uses: actions/checkout@v6 + with: + fetch-depth: 0 + + - name: Setup Python + uses: actions/setup-python@v6 + with: + python-version: "3.14" + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements.txt + + - name: Download and extract ADS-B data + run: | + python -m src.adsb.download_and_list_icaos + ls -lah data/output/ + + - name: Check manifest exists + id: check + run: | + if ls data/output/icao_manifest_*.txt 1>/dev/null 2>&1; then + echo "exists=true" >> "$GITHUB_OUTPUT" + else + echo "exists=false" >> "$GITHUB_OUTPUT" + fi + + - name: Create tar of extracted data + run: | + cd data/output + tar -cf extracted_data.tar *-planes-readsb-prod-0.tar_0 icao_manifest_*.txt + ls -lah extracted_data.tar + + - name: Upload extracted data + uses: actions/upload-artifact@v4 + with: + name: adsb-extracted + path: data/output/extracted_data.tar + retention-days: 1 + compression-level: 0 # Already compressed trace files + + adsb-map: + runs-on: ubuntu-24.04-arm + needs: adsb-extract + if: github.event_name != 'schedule' && needs.adsb-extract.outputs.manifest-exists == 'true' + strategy: + fail-fast: false + matrix: + chunk: [0, 1, 2, 3] + steps: + - name: Checkout + uses: actions/checkout@v6 + with: + fetch-depth: 0 + + - name: Setup Python + uses: actions/setup-python@v6 + with: + python-version: "3.14" + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements.txt + + - name: Download extracted data + uses: actions/download-artifact@v4 + with: + name: adsb-extracted + path: data/output/ + + - name: Extract tar 
+ run: | + cd data/output + tar -xf extracted_data.tar + rm extracted_data.tar + echo "=== Contents of data/output ===" + ls -lah + echo "=== Looking for manifest ===" + cat icao_manifest_*.txt | head -20 || echo "No manifest found" + echo "=== Looking for extracted dirs ===" + ls -d *-planes-readsb-prod-0* 2>/dev/null || echo "No extracted dirs" + + - name: Process chunk ${{ matrix.chunk }} + run: | + python -m src.adsb.process_icao_chunk --chunk-id ${{ matrix.chunk }} --total-chunks 4 + mkdir -p data/output/adsb_chunks + ls -lah data/output/adsb_chunks/ || echo "No chunks created" + + - name: Upload chunk artifacts + uses: actions/upload-artifact@v4 + with: + name: adsb-chunk-${{ matrix.chunk }} + path: data/output/adsb_chunks/ + retention-days: 1 + + adsb-reduce: + runs-on: ubuntu-24.04-arm + needs: adsb-map + if: github.event_name != 'schedule' + steps: + - name: Checkout + uses: actions/checkout@v6 + with: + fetch-depth: 0 + + - name: Setup Python + uses: actions/setup-python@v6 + with: + python-version: "3.14" + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements.txt + + - name: Download all chunk artifacts + uses: actions/download-artifact@v4 + with: + pattern: adsb-chunk-* + path: data/output/adsb_chunks/ + merge-multiple: true + + - name: Debug downloaded files + run: | + echo "=== Listing data/ ===" + find data/ -type f 2>/dev/null | head -50 || echo "No files in data/" + echo "=== Looking for parquet files ===" + find . 
-name "*.parquet" 2>/dev/null | head -20 || echo "No parquet files found" + + - name: Combine chunks to CSV + run: | + mkdir -p data/output/adsb_chunks + ls -lah data/output/adsb_chunks/ || echo "Directory empty or does not exist" + python -m src.adsb.combine_chunks_to_csv --chunks-dir data/output/adsb_chunks + ls -lah data/planequery_aircraft/ + + - name: Upload ADS-B artifacts + uses: actions/upload-artifact@v4 + with: + name: adsb-release + path: data/planequery_aircraft/planequery_aircraft_adsb_*.csv + retention-days: 1 + + build-community: + runs-on: ubuntu-latest + if: github.event_name != 'schedule' + steps: + - name: Checkout + uses: actions/checkout@v6 + with: + fetch-depth: 0 + + - name: Setup Python + uses: actions/setup-python@v6 + with: + python-version: "3.14" + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install pandas + + - name: Run Community release script + run: | + python -m src.contributions.create_daily_community_release + ls -lah data/planequery_aircraft + + - name: Upload Community artifacts + uses: actions/upload-artifact@v4 + with: + name: community-release + path: data/planequery_aircraft/planequery_aircraft_community_*.csv + retention-days: 1 + + create-release: + runs-on: ubuntu-latest + needs: [build-faa, adsb-reduce, build-community] + if: github.event_name != 'schedule' + steps: + - name: Download FAA artifacts + uses: actions/download-artifact@v4 + with: + name: faa-release + path: artifacts/faa + + - name: Download ADS-B artifacts + uses: actions/download-artifact@v4 + with: + name: adsb-release + path: artifacts/adsb + + - name: Download Community artifacts + uses: actions/download-artifact@v4 + with: + name: community-release + path: artifacts/community + - name: Prepare release metadata id: meta run: | DATE=$(date -u +"%Y-%m-%d") - TAG="planequery-aircraft-${DATE}" - # Find the CSV file in data/planequery_aircraft matching the pattern - CSV_FILE=$(ls 
data/planequery_aircraft/planequery_aircraft_*_${DATE}.csv | head -1) - CSV_BASENAME=$(basename "$CSV_FILE") + BRANCH_NAME="${GITHUB_REF#refs/heads/}" + BRANCH_SUFFIX="" + if [ "$BRANCH_NAME" = "main" ]; then + BRANCH_SUFFIX="-main" + elif [ "$BRANCH_NAME" = "develop" ]; then + BRANCH_SUFFIX="-develop" + fi + TAG="planequery-aircraft-${DATE}${BRANCH_SUFFIX}" + + # Find files from artifacts + CSV_FILE_FAA=$(ls artifacts/faa/data/planequery_aircraft/planequery_aircraft_faa_*.csv | head -1) + CSV_BASENAME_FAA=$(basename "$CSV_FILE_FAA") + CSV_FILE_ADSB=$(ls artifacts/adsb/planequery_aircraft_adsb_*.csv | head -1) + CSV_BASENAME_ADSB=$(basename "$CSV_FILE_ADSB") + CSV_FILE_COMMUNITY=$(ls artifacts/community/planequery_aircraft_community_*.csv 2>/dev/null | head -1 || echo "") + CSV_BASENAME_COMMUNITY=$(basename "$CSV_FILE_COMMUNITY" 2>/dev/null || echo "") + ZIP_FILE=$(ls artifacts/faa/data/faa_releasable/ReleasableAircraft_*.zip | head -1) + ZIP_BASENAME=$(basename "$ZIP_FILE") + echo "date=$DATE" >> "$GITHUB_OUTPUT" echo "tag=$TAG" >> "$GITHUB_OUTPUT" - echo "csv_file=$CSV_FILE" >> "$GITHUB_OUTPUT" - echo "csv_basename=$CSV_BASENAME" >> "$GITHUB_OUTPUT" - echo "name=planequery-aircraft snapshot ($DATE)" >> "$GITHUB_OUTPUT" + echo "csv_file_faa=$CSV_FILE_FAA" >> "$GITHUB_OUTPUT" + echo "csv_basename_faa=$CSV_BASENAME_FAA" >> "$GITHUB_OUTPUT" + echo "csv_file_adsb=$CSV_FILE_ADSB" >> "$GITHUB_OUTPUT" + echo "csv_basename_adsb=$CSV_BASENAME_ADSB" >> "$GITHUB_OUTPUT" + echo "csv_file_community=$CSV_FILE_COMMUNITY" >> "$GITHUB_OUTPUT" + echo "csv_basename_community=$CSV_BASENAME_COMMUNITY" >> "$GITHUB_OUTPUT" + echo "zip_file=$ZIP_FILE" >> "$GITHUB_OUTPUT" + echo "zip_basename=$ZIP_BASENAME" >> "$GITHUB_OUTPUT" + echo "name=planequery-aircraft snapshot ($DATE)${BRANCH_SUFFIX}" >> "$GITHUB_OUTPUT" - name: Create GitHub Release and upload assets uses: softprops/action-gh-release@v2 @@ -58,10 +320,14 @@ jobs: Automated daily snapshot generated at 06:00 UTC for ${{ 
steps.meta.outputs.date }}. Assets: - - ${{ steps.meta.outputs.csv_basename }} - - ReleasableAircraft_${{ steps.meta.outputs.date }}.zip + - ${{ steps.meta.outputs.csv_basename_faa }} + - ${{ steps.meta.outputs.csv_basename_adsb }} + - ${{ steps.meta.outputs.csv_basename_community }} + - ${{ steps.meta.outputs.zip_basename }} files: | - ${{ steps.meta.outputs.csv_file }} - data/faa_releasable/ReleasableAircraft_${{ steps.meta.outputs.date }}.zip + ${{ steps.meta.outputs.csv_file_faa }} + ${{ steps.meta.outputs.csv_file_adsb }} + ${{ steps.meta.outputs.csv_file_community }} + ${{ steps.meta.outputs.zip_file }} env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} diff --git a/.github/workflows/validate-community-submission.yaml b/.github/workflows/validate-community-submission.yaml new file mode 100644 index 0000000..e217401 --- /dev/null +++ b/.github/workflows/validate-community-submission.yaml @@ -0,0 +1,30 @@ +name: Validate Community Submission + +on: + issues: + types: [opened, edited] + +jobs: + validate: + if: contains(github.event.issue.labels.*.name, 'submission') + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Setup Python + uses: actions/setup-python@v5 + with: + python-version: "3.12" + + - name: Install dependencies + run: pip install jsonschema + + - name: Validate submission + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + GITHUB_REPOSITORY: ${{ github.repository }} + run: | + python -m src.contributions.validate_submission \ + --issue-body "${{ github.event.issue.body }}" \ + --issue-number ${{ github.event.issue.number }} diff --git a/.gitignore b/.gitignore index 74326ca..7ef2c0b 100644 --- a/.gitignore +++ b/.gitignore @@ -218,4 +218,67 @@ __marimo__/ # Custom data/ .DS_Store -notebooks/ \ No newline at end of file + +# --- CDK --- +# VSCode extension + +# Store launch config in repo but not settings +.vscode/settings.json +/.favorites.json + +# TypeScript incremental build states +*.tsbuildinfo + +# 
Local state files & OS specifics +.DS_Store +node_modules/ +lerna-debug.log +dist/ +pack/ +.BUILD_COMPLETED +.local-npm/ +.tools/ +coverage/ +.nyc_output +.nycrc +.LAST_BUILD +*.sw[a-z] +*~ +.idea +*.iml +junit.xml + +# We don't want tsconfig at the root +/tsconfig.json + +# CDK Context & Staging files +cdk.context.json +.cdk.staging/ +cdk.out/ +*.tabl.json +cdk-integ.out.*/ + +# Yarn error log +yarn-error.log + +# VSCode history plugin +.vscode/.history/ + +# Cloud9 +.c9 +.nzm-* + +/.versionrc.json +RELEASE_NOTES.md + +# Produced by integ tests +read*lock + +# VSCode jest plugin +.test-output + +# Nx cache +.nx/ + +# jsii-rosetta files +type-fingerprints.txt \ No newline at end of file diff --git a/infra/app.py b/infra/app.py new file mode 100644 index 0000000..83509f6 --- /dev/null +++ b/infra/app.py @@ -0,0 +1,11 @@ +#!/usr/bin/env python3 +import os +import aws_cdk as cdk +from stack import AdsbProcessingStack + +app = cdk.App() +AdsbProcessingStack(app, "AdsbProcessingStack", env=cdk.Environment( + account=os.environ["CDK_DEFAULT_ACCOUNT"], + region=os.environ["CDK_DEFAULT_REGION"], +)) +app.synth() diff --git a/infra/cdk.json b/infra/cdk.json new file mode 100644 index 0000000..b4baa10 --- /dev/null +++ b/infra/cdk.json @@ -0,0 +1,3 @@ +{ + "app": "python3 app.py" +} diff --git a/infra/requirements.txt b/infra/requirements.txt new file mode 100644 index 0000000..32b3387 --- /dev/null +++ b/infra/requirements.txt @@ -0,0 +1,2 @@ +aws-cdk-lib>=2.170.0 +constructs>=10.0.0 diff --git a/infra/stack.py b/infra/stack.py new file mode 100644 index 0000000..a54bd79 --- /dev/null +++ b/infra/stack.py @@ -0,0 +1,213 @@ +import aws_cdk as cdk +from aws_cdk import ( + Stack, + Duration, + RemovalPolicy, + aws_s3 as s3, + aws_ecs as ecs, + aws_ec2 as ec2, + aws_ecr_assets, + aws_iam as iam, + aws_logs as logs, + aws_stepfunctions as sfn, + aws_stepfunctions_tasks as sfn_tasks, +) +from constructs import Construct +from pathlib import Path + + +class 
AdsbProcessingStack(Stack): + def __init__(self, scope: Construct, id: str, **kwargs): + super().__init__(scope, id, **kwargs) + + # --- S3 bucket for intermediate and final results --- + bucket = s3.Bucket( + self, "ResultsBucket", + bucket_name="planequery-aircraft-dev", + removal_policy=RemovalPolicy.DESTROY, + auto_delete_objects=True, + lifecycle_rules=[ + s3.LifecycleRule( + prefix="intermediate/", + expiration=Duration.days(7), + ) + ], + ) + + # --- Use default VPC (no additional cost) --- + vpc = ec2.Vpc.from_lookup( + self, "Vpc", + is_default=True, + ) + + # --- ECS Cluster --- + cluster = ecs.Cluster( + self, "Cluster", + vpc=vpc, + container_insights=True, + ) + + # --- Log group --- + log_group = logs.LogGroup( + self, "LogGroup", + log_group_name="/adsb-processing", + removal_policy=RemovalPolicy.DESTROY, + retention=logs.RetentionDays.TWO_WEEKS, + ) + + # --- Docker images (built from local Dockerfiles) --- + adsb_dir = str(Path(__file__).parent.parent / "src" / "adsb") + + worker_image = ecs.ContainerImage.from_asset( + adsb_dir, + file="Dockerfile.worker", + platform=cdk.aws_ecr_assets.Platform.LINUX_ARM64, + ) + reducer_image = ecs.ContainerImage.from_asset( + adsb_dir, + file="Dockerfile.reducer", + platform=cdk.aws_ecr_assets.Platform.LINUX_ARM64, + ) + + # --- Task role (shared) --- + task_role = iam.Role( + self, "TaskRole", + assumed_by=iam.ServicePrincipal("ecs-tasks.amazonaws.com"), + ) + bucket.grant_read_write(task_role) + + # --- MAP: worker task definition --- + map_task_def = ecs.FargateTaskDefinition( + self, "MapTaskDef", + cpu=4096, # 4 vCPU + memory_limit_mib=30720, # 30 GB + task_role=task_role, + runtime_platform=ecs.RuntimePlatform( + cpu_architecture=ecs.CpuArchitecture.ARM64, + operating_system_family=ecs.OperatingSystemFamily.LINUX, + ), + ) + map_container = map_task_def.add_container( + "worker", + image=worker_image, + logging=ecs.LogDrivers.aws_logs( + stream_prefix="map", + log_group=log_group, + ), + environment={ + 
"S3_BUCKET": bucket.bucket_name, + }, + ) + + # --- REDUCE: reducer task definition --- + reduce_task_def = ecs.FargateTaskDefinition( + self, "ReduceTaskDef", + cpu=4096, # 4 vCPU + memory_limit_mib=30720, # 30 GB — must hold full year in memory + task_role=task_role, + runtime_platform=ecs.RuntimePlatform( + cpu_architecture=ecs.CpuArchitecture.ARM64, + operating_system_family=ecs.OperatingSystemFamily.LINUX, + ), + ) + reduce_container = reduce_task_def.add_container( + "reducer", + image=reducer_image, + logging=ecs.LogDrivers.aws_logs( + stream_prefix="reduce", + log_group=log_group, + ), + environment={ + "S3_BUCKET": bucket.bucket_name, + }, + ) + + # --- Step Functions --- + + # Map task: run ECS Fargate for each date chunk + map_ecs_task = sfn_tasks.EcsRunTask( + self, "ProcessChunk", + integration_pattern=sfn.IntegrationPattern.RUN_JOB, + cluster=cluster, + task_definition=map_task_def, + launch_target=sfn_tasks.EcsFargateLaunchTarget( + platform_version=ecs.FargatePlatformVersion.LATEST, + ), + container_overrides=[ + sfn_tasks.ContainerOverride( + container_definition=map_container, + environment=[ + sfn_tasks.TaskEnvironmentVariable( + name="START_DATE", + value=sfn.JsonPath.string_at("$.start_date"), + ), + sfn_tasks.TaskEnvironmentVariable( + name="END_DATE", + value=sfn.JsonPath.string_at("$.end_date"), + ), + sfn_tasks.TaskEnvironmentVariable( + name="RUN_ID", + value=sfn.JsonPath.string_at("$.run_id"), + ), + ], + ) + ], + assign_public_ip=True, + subnets=ec2.SubnetSelection(subnet_type=ec2.SubnetType.PUBLIC), + result_path="$.task_result", + ) + + # Map state — max 3 concurrent workers + map_state = sfn.Map( + self, "FanOutChunks", + items_path="$.chunks", + max_concurrency=3, + result_path="$.map_results", + ) + map_state.item_processor(map_ecs_task) + + # Reduce task: combine all chunk CSVs + reduce_ecs_task = sfn_tasks.EcsRunTask( + self, "ReduceResults", + integration_pattern=sfn.IntegrationPattern.RUN_JOB, + cluster=cluster, + 
task_definition=reduce_task_def, + launch_target=sfn_tasks.EcsFargateLaunchTarget( + platform_version=ecs.FargatePlatformVersion.LATEST, + ), + container_overrides=[ + sfn_tasks.ContainerOverride( + container_definition=reduce_container, + environment=[ + sfn_tasks.TaskEnvironmentVariable( + name="RUN_ID", + value=sfn.JsonPath.string_at("$.run_id"), + ), + sfn_tasks.TaskEnvironmentVariable( + name="GLOBAL_START_DATE", + value=sfn.JsonPath.string_at("$.global_start_date"), + ), + sfn_tasks.TaskEnvironmentVariable( + name="GLOBAL_END_DATE", + value=sfn.JsonPath.string_at("$.global_end_date"), + ), + ], + ) + ], + assign_public_ip=True, + subnets=ec2.SubnetSelection(subnet_type=ec2.SubnetType.PUBLIC), + ) + + # Chain: fan-out map → reduce + definition = map_state.next(reduce_ecs_task) + + sfn.StateMachine( + self, "Pipeline", + state_machine_name="adsb-map-reduce", + definition_body=sfn.DefinitionBody.from_chainable(definition), + timeout=Duration.hours(48), + ) + + # --- Outputs --- + cdk.CfnOutput(self, "BucketName", value=bucket.bucket_name) + cdk.CfnOutput(self, "StateMachineName", value="adsb-map-reduce") diff --git a/notebooks/planequery_adsb_read.ipynb b/notebooks/planequery_adsb_read.ipynb new file mode 100644 index 0000000..f0d98e0 --- /dev/null +++ b/notebooks/planequery_adsb_read.ipynb @@ -0,0 +1,640 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "id": "06ae0319", + "metadata": {}, + "outputs": [], + "source": [ + "import clickhouse_connect\n", + "client = clickhouse_connect.get_client(\n", + " host=os.environ[\"CLICKHOUSE_HOST\"],\n", + " username=os.environ[\"CLICKHOUSE_USERNAME\"],\n", + " password=os.environ[\"CLICKHOUSE_PASSWORD\"],\n", + " secure=True,\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "779710f0", + "metadata": {}, + "outputs": [], + "source": [ + "df = client.query_df(\"SELECT time, icao,r,t,dbFlags,ownOp,year,desc,aircraft FROM adsb_messages Where time > '2024-01-01 00:00:00' 
AND time < '2024-01-02 00:00:00'\")\n", + "df_copy = df.copy()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "bf024da8", + "metadata": {}, + "outputs": [], + "source": [ + "# -- military = dbFlags & 1; interesting = dbFlags & 2; PIA = dbFlags & 4; LADD = dbFlags & 8;" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "270607b5", + "metadata": {}, + "outputs": [], + "source": [ + "df = load_raw_adsb_for_day(datetime(2024,1,1))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "ac06a30e", + "metadata": {}, + "outputs": [], + "source": [ + "df['aircraft']" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "91edab3e", + "metadata": {}, + "outputs": [], + "source": [ + "COLUMNS = ['dbFlags', 'ownOp', 'year', 'desc', 'aircraft_category', 'r', 't']\n", + "def compress_df(df):\n", + " icao = df.name\n", + " df[\"_signature\"] = df[COLUMNS].astype(str).agg('|'.join, axis=1)\n", + " original_df = df.copy()\n", + " df = df.groupby(\"_signature\", as_index=False).last() # check if it works with both last and first.\n", + " # For each row, create a dict of non-empty column values. 
This is using sets and subsets...\n", + " def get_non_empty_dict(row):\n", + " return {col: row[col] for col in COLUMNS if row[col] != ''}\n", + " \n", + " df['_non_empty_dict'] = df.apply(get_non_empty_dict, axis=1)\n", + " df['_non_empty_count'] = df['_non_empty_dict'].apply(len)\n", + " \n", + " # Check if row i's non-empty values are a subset of row j's non-empty values\n", + " def is_subset_of_any(idx):\n", + " row_dict = df.loc[idx, '_non_empty_dict']\n", + " row_count = df.loc[idx, '_non_empty_count']\n", + " \n", + " for other_idx in df.index:\n", + " if idx == other_idx:\n", + " continue\n", + " other_dict = df.loc[other_idx, '_non_empty_dict']\n", + " other_count = df.loc[other_idx, '_non_empty_count']\n", + " \n", + " # Check if all non-empty values in current row match those in other row\n", + " if all(row_dict.get(k) == other_dict.get(k) for k in row_dict.keys()):\n", + " # If they match and other has more defined columns, current row is redundant\n", + " if other_count > row_count:\n", + " return True\n", + " return False\n", + " \n", + " # Keep rows that are not subsets of any other row\n", + " keep_mask = ~df.index.to_series().apply(is_subset_of_any)\n", + " df = df[keep_mask]\n", + "\n", + " if len(df) > 1:\n", + " original_df = original_df[original_df['_signature'].isin(df['_signature'])]\n", + " value_counts = original_df[\"_signature\"].value_counts()\n", + " max_signature = value_counts.idxmax()\n", + " df = df[df['_signature'] == max_signature]\n", + "\n", + " df['icao'] = icao\n", + " df = df.drop(columns=['_non_empty_dict', '_non_empty_count', '_signature'])\n", + " return df\n", + "\n", + "# df = df_copy\n", + "# df = df_copy.iloc[0:100000]\n", + "# df = df[df['r'] == \"N4131T\"]\n", + "# df = df[(df['icao'] == \"008081\")]\n", + "# df = df.iloc[0:500]\n", + "df['aircraft_category'] = df['aircraft'].apply(lambda x: x.get('category') if isinstance(x, dict) else None)\n", + "df = df.drop(columns=['aircraft'])\n", + "df = 
df.sort_values(['icao', 'time'])\n", + "df[COLUMNS] = df[COLUMNS].fillna('')\n", + "ORIGINAL_COLUMNS = df.columns.tolist()\n", + "df_compressed = df.groupby('icao',group_keys=False).apply(compress_df)\n", + "cols = df_compressed.columns.tolist()\n", + "cols.remove(\"icao\")\n", + "cols.insert(1, \"icao\")\n", + "df_compressed = df_compressed[cols]\n", + "df_compressed" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "efdfcb2c", + "metadata": {}, + "outputs": [], + "source": [ + "df['aircraft_category'] = df['aircraft'].apply(lambda x: x.get('category') if isinstance(x, dict) else None)\n", + "df[~df['aircraft_category'].isna()]" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "495c5025", + "metadata": {}, + "outputs": [], + "source": [ + "# SOME KIND OF MAP REDUCE SYSTEM\n", + "import os\n", + "\n", + "COLUMNS = ['dbFlags', 'ownOp', 'year', 'desc', 'aircraft_category', 'r', 't']\n", + "def compress_df(df):\n", + " icao = df.name\n", + " df[\"_signature\"] = df[COLUMNS].astype(str).agg('|'.join, axis=1)\n", + " \n", + " # Compute signature counts before grouping (avoid copy)\n", + " signature_counts = df[\"_signature\"].value_counts()\n", + " \n", + " df = df.groupby(\"_signature\", as_index=False).first() # check if it works with both last and first.\n", + " # For each row, create a dict of non-empty column values. 
This is using sets and subsets...\n", + " def get_non_empty_dict(row):\n", + " return {col: row[col] for col in COLUMNS if row[col] != ''}\n", + " \n", + " df['_non_empty_dict'] = df.apply(get_non_empty_dict, axis=1)\n", + " df['_non_empty_count'] = df['_non_empty_dict'].apply(len)\n", + " \n", + " # Check if row i's non-empty values are a subset of row j's non-empty values\n", + " def is_subset_of_any(idx):\n", + " row_dict = df.loc[idx, '_non_empty_dict']\n", + " row_count = df.loc[idx, '_non_empty_count']\n", + " \n", + " for other_idx in df.index:\n", + " if idx == other_idx:\n", + " continue\n", + " other_dict = df.loc[other_idx, '_non_empty_dict']\n", + " other_count = df.loc[other_idx, '_non_empty_count']\n", + " \n", + " # Check if all non-empty values in current row match those in other row\n", + " if all(row_dict.get(k) == other_dict.get(k) for k in row_dict.keys()):\n", + " # If they match and other has more defined columns, current row is redundant\n", + " if other_count > row_count:\n", + " return True\n", + " return False\n", + " \n", + " # Keep rows that are not subsets of any other row\n", + " keep_mask = ~df.index.to_series().apply(is_subset_of_any)\n", + " df = df[keep_mask]\n", + "\n", + " if len(df) > 1:\n", + " # Use pre-computed signature counts instead of original_df\n", + " remaining_sigs = df['_signature']\n", + " sig_counts = signature_counts[remaining_sigs]\n", + " max_signature = sig_counts.idxmax()\n", + " df = df[df['_signature'] == max_signature]\n", + "\n", + " df['icao'] = icao\n", + " df = df.drop(columns=['_non_empty_dict', '_non_empty_count', '_signature'])\n", + " return df\n", + "\n", + "# names of releases something like\n", + "# planequery_aircraft_adsb_2024-06-01T00-00-00Z.csv.gz\n", + "\n", + "# Let's build historical first. 
\n", + "\n", + "_ch_client = None\n", + "\n", + "def _get_clickhouse_client():\n", + " \"\"\"Return a reusable ClickHouse client, with retry/backoff for transient DNS or connection errors.\"\"\"\n", + " global _ch_client\n", + " if _ch_client is not None:\n", + " return _ch_client\n", + "\n", + " import clickhouse_connect\n", + " import time\n", + "\n", + " max_retries = 5\n", + " for attempt in range(1, max_retries + 1):\n", + " try:\n", + " _ch_client = clickhouse_connect.get_client(\n", + " host=os.environ[\"CLICKHOUSE_HOST\"],\n", + " username=os.environ[\"CLICKHOUSE_USERNAME\"],\n", + " password=os.environ[\"CLICKHOUSE_PASSWORD\"],\n", + " secure=True,\n", + " )\n", + " return _ch_client\n", + " except Exception as e:\n", + " wait = min(2 ** attempt, 30)\n", + " print(f\" ClickHouse connect attempt {attempt}/{max_retries} failed: {e}\")\n", + " if attempt == max_retries:\n", + " raise\n", + " print(f\" Retrying in {wait}s...\")\n", + " time.sleep(wait)\n", + "\n", + "\n", + "def load_raw_adsb_for_day(day):\n", + " \"\"\"Load raw ADS-B data for a day from cache or ClickHouse.\"\"\"\n", + " from datetime import timedelta\n", + " from pathlib import Path\n", + " import pandas as pd\n", + " import time\n", + " \n", + " start_time = day.replace(hour=0, minute=0, second=0, microsecond=0)\n", + " end_time = start_time + timedelta(days=1)\n", + " \n", + " # Set up caching\n", + " cache_dir = Path(\"data/adsb\")\n", + " cache_dir.mkdir(parents=True, exist_ok=True)\n", + " cache_file = cache_dir / f\"adsb_raw_{start_time.strftime('%Y-%m-%d')}.csv.zst\"\n", + " \n", + " # Check if cache exists\n", + " if cache_file.exists():\n", + " print(f\" Loading from cache: {cache_file}\")\n", + " df = pd.read_csv(cache_file, compression='zstd')\n", + " df['time'] = pd.to_datetime(df['time'])\n", + " else:\n", + " # Format dates for the query\n", + " start_str = start_time.strftime('%Y-%m-%d %H:%M:%S')\n", + " end_str = end_time.strftime('%Y-%m-%d %H:%M:%S')\n", + " \n", + " 
max_retries = 3\n", + " for attempt in range(1, max_retries + 1):\n", + " try:\n", + " client = _get_clickhouse_client()\n", + " print(f\" Querying ClickHouse for {start_time.strftime('%Y-%m-%d')}\")\n", + " df = client.query_df(f\"SELECT time, icao,r,t,dbFlags,ownOp,year,desc,aircraft FROM adsb_messages Where time > '{start_str}' AND time < '{end_str}'\")\n", + " break\n", + " except Exception as e:\n", + " wait = min(2 ** attempt, 30)\n", + " print(f\" Query attempt {attempt}/{max_retries} failed: {e}\")\n", + " if attempt == max_retries:\n", + " raise\n", + " # Reset client in case connection is stale\n", + " global _ch_client\n", + " _ch_client = None\n", + " print(f\" Retrying in {wait}s...\")\n", + " time.sleep(wait)\n", + " \n", + " # Save to cache\n", + " df.to_csv(cache_file, index=False, compression='zstd')\n", + " print(f\" Saved to cache: {cache_file}\")\n", + " \n", + " return df\n", + "\n", + "def load_historical_for_day(day):\n", + " from pathlib import Path\n", + " import pandas as pd\n", + " \n", + " df = load_raw_adsb_for_day(day)\n", + " print(df)\n", + " df['aircraft_category'] = df['aircraft'].apply(lambda x: x.get('category') if isinstance(x, dict) else None)\n", + " df = df.drop(columns=['aircraft'])\n", + " df = df.sort_values(['icao', 'time'])\n", + " df[COLUMNS] = df[COLUMNS].fillna('')\n", + " df_compressed = df.groupby('icao',group_keys=False).apply(compress_df)\n", + " cols = df_compressed.columns.tolist()\n", + " cols.remove('time')\n", + " cols.insert(0, 'time')\n", + " cols.remove(\"icao\")\n", + " cols.insert(1, \"icao\")\n", + " df_compressed = df_compressed[cols]\n", + " return df_compressed\n", + "\n", + "\n", + "def concat_compressed_dfs(df_base, df_new):\n", + " \"\"\"Concatenate base and new compressed dataframes, keeping the most informative row per ICAO.\"\"\"\n", + " import pandas as pd\n", + " \n", + " # Combine both dataframes\n", + " df_combined = pd.concat([df_base, df_new], ignore_index=True)\n", + " \n", + " # Sort by 
ICAO and time\n", + " df_combined = df_combined.sort_values(['icao', 'time'])\n", + " \n", + " # Fill NaN values\n", + " df_combined[COLUMNS] = df_combined[COLUMNS].fillna('')\n", + " \n", + " # Apply compression logic per ICAO to get the best row\n", + " df_compressed = df_combined.groupby('icao', group_keys=False).apply(compress_df)\n", + " \n", + " # Sort by time\n", + " df_compressed = df_compressed.sort_values('time')\n", + " \n", + " return df_compressed\n", + "\n", + "\n", + "def get_latest_aircraft_adsb_csv_df():\n", + " \"\"\"Download and load the latest ADS-B CSV from GitHub releases.\"\"\"\n", + " from get_latest_planequery_aircraft_release import download_latest_aircraft_adsb_csv\n", + " \n", + " import pandas as pd\n", + " import re\n", + " \n", + " csv_path = download_latest_aircraft_adsb_csv()\n", + " df = pd.read_csv(csv_path)\n", + " df = df.fillna(\"\")\n", + " \n", + " # Extract start date from filename pattern: planequery_aircraft_adsb_{start_date}_{end_date}.csv\n", + " match = re.search(r\"planequery_aircraft_adsb_(\\d{4}-\\d{2}-\\d{2})_\", str(csv_path))\n", + " if not match:\n", + " raise ValueError(f\"Could not extract date from filename: {csv_path.name}\")\n", + " \n", + " date_str = match.group(1)\n", + " return df, date_str\n", + "\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "7f66acf7", + "metadata": {}, + "outputs": [], + "source": [ + "# SOME KIND OF MAP REDUCE SYSTEM\n", + "\n", + "\n", + "COLUMNS = ['dbFlags', 'ownOp', 'year', 'desc', 'aircraft_category', 'r', 't']\n", + "def compress_df(df):\n", + " icao = df.name\n", + " df[\"_signature\"] = df[COLUMNS].astype(str).agg('|'.join, axis=1)\n", + " original_df = df.copy()\n", + " df = df.groupby(\"_signature\", as_index=False).first() # check if it works with both last and first.\n", + " # For each row, create a dict of non-empty column values. 
This is using sets and subsets...\n", + " def get_non_empty_dict(row):\n", + " return {col: row[col] for col in COLUMNS if row[col] != ''}\n", + " \n", + " df['_non_empty_dict'] = df.apply(get_non_empty_dict, axis=1)\n", + " df['_non_empty_count'] = df['_non_empty_dict'].apply(len)\n", + " \n", + " # Check if row i's non-empty values are a subset of row j's non-empty values\n", + " def is_subset_of_any(idx):\n", + " row_dict = df.loc[idx, '_non_empty_dict']\n", + " row_count = df.loc[idx, '_non_empty_count']\n", + " \n", + " for other_idx in df.index:\n", + " if idx == other_idx:\n", + " continue\n", + " other_dict = df.loc[other_idx, '_non_empty_dict']\n", + " other_count = df.loc[other_idx, '_non_empty_count']\n", + " \n", + " # Check if all non-empty values in current row match those in other row\n", + " if all(row_dict.get(k) == other_dict.get(k) for k in row_dict.keys()):\n", + " # If they match and other has more defined columns, current row is redundant\n", + " if other_count > row_count:\n", + " return True\n", + " return False\n", + " \n", + " # Keep rows that are not subsets of any other row\n", + " keep_mask = ~df.index.to_series().apply(is_subset_of_any)\n", + " df = df[keep_mask]\n", + "\n", + " if len(df) > 1:\n", + " original_df = original_df[original_df['_signature'].isin(df['_signature'])]\n", + " value_counts = original_df[\"_signature\"].value_counts()\n", + " max_signature = value_counts.idxmax()\n", + " df = df[df['_signature'] == max_signature]\n", + "\n", + " df['icao'] = icao\n", + " df = df.drop(columns=['_non_empty_dict', '_non_empty_count', '_signature'])\n", + " return df\n", + "\n", + "# names of releases something like\n", + "# planequery_aircraft_adsb_2024-06-01T00-00-00Z.csv.gz\n", + "\n", + "# Let's build historical first. 
\n", + "\n", + "def load_raw_adsb_for_day(day):\n", + " \"\"\"Load raw ADS-B data for a day from cache or ClickHouse.\"\"\"\n", + " from datetime import timedelta\n", + " import clickhouse_connect\n", + " from pathlib import Path\n", + " import pandas as pd\n", + " \n", + " start_time = day.replace(hour=0, minute=0, second=0, microsecond=0)\n", + " end_time = start_time + timedelta(days=1)\n", + " \n", + " # Set up caching\n", + " cache_dir = Path(\"data/adsb\")\n", + " cache_dir.mkdir(parents=True, exist_ok=True)\n", + " cache_file = cache_dir / f\"adsb_raw_{start_time.strftime('%Y-%m-%d')}.csv.zst\"\n", + " \n", + " # Check if cache exists\n", + " if cache_file.exists():\n", + " print(f\" Loading from cache: {cache_file}\")\n", + " df = pd.read_csv(cache_file, compression='zstd')\n", + " df['time'] = pd.to_datetime(df['time'])\n", + " else:\n", + " # Format dates for the query\n", + " start_str = start_time.strftime('%Y-%m-%d %H:%M:%S')\n", + " end_str = end_time.strftime('%Y-%m-%d %H:%M:%S')\n", + " \n", + " client = clickhouse_connect.get_client(\n", + " host=os.environ[\"CLICKHOUSE_HOST\"],\n", + " username=os.environ[\"CLICKHOUSE_USERNAME\"],\n", + " password=os.environ[\"CLICKHOUSE_PASSWORD\"],\n", + " secure=True,\n", + " )\n", + " print(f\" Querying ClickHouse for {start_time.strftime('%Y-%m-%d')}\")\n", + " df = client.query_df(f\"SELECT time, icao,r,t,dbFlags,ownOp,year,desc,aircraft FROM adsb_messages Where time > '{start_str}' AND time < '{end_str}'\")\n", + " \n", + " # Save to cache\n", + " df.to_csv(cache_file, index=False, compression='zstd')\n", + " print(f\" Saved to cache: {cache_file}\")\n", + " \n", + " return df\n", + "\n", + "def load_historical_for_day(day):\n", + " from pathlib import Path\n", + " import pandas as pd\n", + " \n", + " df = load_raw_adsb_for_day(day)\n", + " \n", + " df['aircraft_category'] = df['aircraft'].apply(lambda x: x.get('category') if isinstance(x, dict) else None)\n", + " df = df.drop(columns=['aircraft'])\n", + " 
df = df.sort_values(['icao', 'time'])\n", + " df[COLUMNS] = df[COLUMNS].fillna('')\n", + " df_compressed = df.groupby('icao',group_keys=False).apply(compress_df)\n", + " cols = df_compressed.columns.tolist()\n", + " cols.remove('time')\n", + " cols.insert(0, 'time')\n", + " cols.remove(\"icao\")\n", + " cols.insert(1, \"icao\")\n", + " df_compressed = df_compressed[cols]\n", + " return df_compressed\n", + "\n", + "\n", + "def concat_compressed_dfs(df_base, df_new):\n", + " \"\"\"Concatenate base and new compressed dataframes, keeping the most informative row per ICAO.\"\"\"\n", + " import pandas as pd\n", + " \n", + " # Combine both dataframes\n", + " df_combined = pd.concat([df_base, df_new], ignore_index=True)\n", + " \n", + " # Sort by ICAO and time\n", + " df_combined = df_combined.sort_values(['icao', 'time'])\n", + " \n", + " # Fill NaN values\n", + " df_combined[COLUMNS] = df_combined[COLUMNS].fillna('')\n", + " \n", + " # Apply compression logic per ICAO to get the best row\n", + " df_compressed = df_combined.groupby('icao', group_keys=False).apply(compress_df)\n", + " \n", + " # Sort by time\n", + " df_compressed = df_compressed.sort_values('time')\n", + " \n", + " return df_compressed\n", + "\n", + "\n", + "def get_latest_aircraft_adsb_csv_df():\n", + " \"\"\"Download and load the latest ADS-B CSV from GitHub releases.\"\"\"\n", + " from get_latest_planequery_aircraft_release import download_latest_aircraft_adsb_csv\n", + " \n", + " import pandas as pd\n", + " import re\n", + " \n", + " csv_path = download_latest_aircraft_adsb_csv()\n", + " df = pd.read_csv(csv_path)\n", + " df = df.fillna(\"\")\n", + " \n", + " # Extract start date from filename pattern: planequery_aircraft_adsb_{start_date}_{end_date}.csv\n", + " match = re.search(r\"planequery_aircraft_adsb_(\\d{4}-\\d{2}-\\d{2})_\", str(csv_path))\n", + " if not match:\n", + " raise ValueError(f\"Could not extract date from filename: {csv_path.name}\")\n", + " \n", + " date_str = match.group(1)\n", + 
" return df, date_str\n", + "\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "e14c8363", + "metadata": {}, + "outputs": [], + "source": [ + "from datetime import datetime\n", + "df = load_historical_for_day(datetime(2024,1,1))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "3874ba4d", + "metadata": {}, + "outputs": [], + "source": [ + "len(df)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "bcae50ad", + "metadata": {}, + "outputs": [], + "source": [ + "df[(df['icao'] == \"008081\")]" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "50921c86", + "metadata": {}, + "outputs": [], + "source": [ + "df[df['icao'] == \"a4e1d2\"]" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "8194d9aa", + "metadata": {}, + "outputs": [], + "source": [ + "df[df['r'] == \"N4131T\"]" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "1e3b7aa2", + "metadata": {}, + "outputs": [], + "source": [ + "df_compressed[df_compressed['icao'].duplicated(keep=False)]\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "40613bc1", + "metadata": {}, + "outputs": [], + "source": [ + "import gzip\n", + "import json\n", + "\n", + "path = \"/Users/jonahgoode/Downloads/test_extract/traces/fb/trace_full_acbbfb.json\"\n", + "\n", + "with gzip.open(path, \"rt\", encoding=\"utf-8\") as f:\n", + " data = json.load(f)\n", + "\n", + "print(type(data))\n", + "# use `data` here\n", + "import json\n", + "print(json.dumps(data, indent=2)[:2000])\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "320109b2", + "metadata": {}, + "outputs": [], + "source": [ + "# First, load the JSON to inspect its structure\n", + "import json\n", + "with open(\"/Users/jonahgoode/Documents/PlaneQuery/Other-Code/readsb-protobuf/webapp/src/db/aircrafts.json\", 'r') as f:\n", + " data = json.load(f)\n", + "\n", + "# Check the structure\n", + 
"print(type(data))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "590134f4", + "metadata": {}, + "outputs": [], + "source": [ + "data['AC97E3']" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": ".venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.10" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/requirements.txt b/requirements.txt index c30c4f5..6a4ec9a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,5 @@ faa-aircraft-registry==0.1.0 pandas==3.0.0 - +pyarrow==23.0.0 +orjson==3.11.7 +polars==1.38.1 diff --git a/schemas/community_submission.v1.schema.json b/schemas/community_submission.v1.schema.json new file mode 100644 index 0000000..18609a4 --- /dev/null +++ b/schemas/community_submission.v1.schema.json @@ -0,0 +1,80 @@ +{ + "$schema": "https://json-schema.org/draft/2020-12/schema", + "title": "PlaneQuery Aircraft Community Submission (v1)", + "type": "object", + "additionalProperties": false, + + "properties": { + "registration_number": { + "type": "string", + "minLength": 1 + }, + "transponder_code_hex": { + "type": "string", + "pattern": "^[0-9A-Fa-f]{6}$" + }, + "planequery_airframe_id": { + "type": "string", + "minLength": 1 + }, + + "contributor_uuid": { + "type": "string", + "format": "uuid" + }, + "contributor_name": { + "type": "string", + "minLength": 0, + "maxLength": 150, + "description": "Display name (may be blank)" + }, + + "creation_timestamp": { + "type": "string", + "format": "date-time", + "description": "Set by the system when the submission is persisted/approved.", + "readOnly": true + }, + + "tags": { + "type": "object", + "description": "Additional community-defined tags as key/value pairs (values may be scalar, array, 
or object).", + "propertyNames": { + "type": "string", + "pattern": "^[a-z][a-z0-9_]{0,63}$" + }, + "additionalProperties": { "$ref": "#/$defs/tagValue" } + } + }, + + "allOf": [ + { + "anyOf": [ + { "required": ["registration_number"] }, + { "required": ["transponder_code_hex"] }, + { "required": ["planequery_airframe_id"] } + ] + } + ], + + "$defs": { + "tagScalar": { + "type": ["string", "number", "integer", "boolean", "null"] + }, + "tagValue": { + "anyOf": [ + { "$ref": "#/$defs/tagScalar" }, + { + "type": "array", + "maxItems": 50, + "items": { "$ref": "#/$defs/tagScalar" } + }, + { + "type": "object", + "maxProperties": 50, + "additionalProperties": { "$ref": "#/$defs/tagScalar" } + } + ] + } + } +} \ No newline at end of file diff --git a/src/adsb/Dockerfile.reducer b/src/adsb/Dockerfile.reducer new file mode 100644 index 0000000..b375f46 --- /dev/null +++ b/src/adsb/Dockerfile.reducer @@ -0,0 +1,11 @@ +FROM --platform=linux/arm64 python:3.12-slim + +WORKDIR /app + +COPY requirements.reducer.txt requirements.txt +RUN pip install --no-cache-dir -r requirements.txt + +COPY compress_adsb_to_aircraft_data.py . +COPY reducer.py . + +CMD ["python", "-u", "reducer.py"] diff --git a/src/adsb/Dockerfile.worker b/src/adsb/Dockerfile.worker new file mode 100644 index 0000000..dc4336d --- /dev/null +++ b/src/adsb/Dockerfile.worker @@ -0,0 +1,12 @@ +FROM --platform=linux/arm64 python:3.12-slim + +WORKDIR /app + +COPY requirements.worker.txt requirements.txt +RUN pip install --no-cache-dir -r requirements.txt + +COPY compress_adsb_to_aircraft_data.py . +COPY download_adsb_data_to_parquet.py . +COPY worker.py . + +CMD ["python", "-u", "worker.py"] diff --git a/src/adsb/combine_chunks_to_csv.py b/src/adsb/combine_chunks_to_csv.py new file mode 100644 index 0000000..d8941d3 --- /dev/null +++ b/src/adsb/combine_chunks_to_csv.py @@ -0,0 +1,205 @@ +""" +Combines chunk parquet files and compresses to final aircraft CSV. +This is the reduce phase of the map-reduce pipeline. 
+ +Memory-efficient: processes each chunk separately, compresses, then combines. + +Usage: + python -m src.adsb.combine_chunks_to_csv --chunks-dir data/output/adsb_chunks +""" +import gc +import os +import sys +import glob +import argparse +from datetime import datetime, timedelta + +import polars as pl + +from src.adsb.download_adsb_data_to_parquet import OUTPUT_DIR, get_resource_usage +from src.adsb.compress_adsb_to_aircraft_data import compress_multi_icao_df, COLUMNS + + +DEFAULT_CHUNK_DIR = os.path.join(OUTPUT_DIR, "adsb_chunks") +FINAL_OUTPUT_DIR = "./data/planequery_aircraft" +os.makedirs(FINAL_OUTPUT_DIR, exist_ok=True) + + +def get_target_day() -> datetime: + """Get yesterday's date (the day we're processing).""" + return datetime.utcnow() - timedelta(days=1) + + +def process_single_chunk(chunk_path: str) -> pl.DataFrame: + """Load and compress a single chunk parquet file.""" + print(f"Processing {os.path.basename(chunk_path)}... | {get_resource_usage()}") + + # Load chunk - only columns we need + needed_columns = ['time', 'icao'] + COLUMNS + df = pl.read_parquet(chunk_path, columns=needed_columns) + print(f" Loaded {len(df)} rows") + + # Compress to aircraft records (one per ICAO) using shared function + compressed = compress_multi_icao_df(df, verbose=True) + print(f" Compressed to {len(compressed)} aircraft records") + + del df + gc.collect() + + return compressed + + +def combine_compressed_chunks(compressed_dfs: list[pl.DataFrame]) -> pl.DataFrame: + """Combine multiple compressed DataFrames. + + Since chunks are partitioned by ICAO hash, each ICAO only appears in one chunk. + No deduplication needed here - just concatenate. + """ + print(f"Combining {len(compressed_dfs)} compressed chunks... 
| {get_resource_usage()}") + + # Concat all + combined = pl.concat(compressed_dfs) + print(f"Combined: {len(combined)} records") + + return combined + + +def download_and_merge_base_release(compressed_df: pl.DataFrame) -> pl.DataFrame: + """Download base release and merge with new data.""" + from src.get_latest_planequery_aircraft_release import download_latest_aircraft_adsb_csv + + print("Downloading base ADS-B release...") + try: + base_path = download_latest_aircraft_adsb_csv( + output_dir="./data/planequery_aircraft_base" + ) + print(f"Download returned: {base_path}") + + if base_path and os.path.exists(str(base_path)): + print(f"Loading base release from {base_path}") + base_df = pl.read_csv(base_path) + print(f"Base release has {len(base_df)} records") + + # Ensure columns match + base_cols = set(base_df.columns) + new_cols = set(compressed_df.columns) + print(f"Base columns: {sorted(base_cols)}") + print(f"New columns: {sorted(new_cols)}") + + # Add missing columns + for col in new_cols - base_cols: + base_df = base_df.with_columns(pl.lit(None).alias(col)) + for col in base_cols - new_cols: + compressed_df = compressed_df.with_columns(pl.lit(None).alias(col)) + + # Reorder columns to match + compressed_df = compressed_df.select(base_df.columns) + + # Concat and deduplicate by icao (keep new data - it comes last) + combined = pl.concat([base_df, compressed_df]) + print(f"After concat: {len(combined)} records") + + deduplicated = combined.unique(subset=["icao"], keep="last") + + print(f"Combined with base: {len(combined)} -> {len(deduplicated)} after dedup") + + del base_df, combined + gc.collect() + + return deduplicated + else: + print(f"No base release found at {base_path}, using only new data") + return compressed_df + except Exception as e: + import traceback + print(f"Failed to download base release: {e}") + traceback.print_exc() + return compressed_df + + +def cleanup_chunks(date_str: str, chunks_dir: str): + """Delete chunk parquet files after 
successful merge.""" + pattern = os.path.join(chunks_dir, f"chunk_*_{date_str}.parquet") + chunk_files = glob.glob(pattern) + for f in chunk_files: + try: + os.remove(f) + print(f"Deleted {f}") + except Exception as e: + print(f"Failed to delete {f}: {e}") + + +def main(): + parser = argparse.ArgumentParser(description="Combine chunk parquets to final CSV") + parser.add_argument("--date", type=str, help="Date in YYYY-MM-DD format (default: yesterday)") + parser.add_argument("--chunks-dir", type=str, default=DEFAULT_CHUNK_DIR, help="Directory containing chunk parquet files") + parser.add_argument("--skip-base", action="store_true", help="Skip downloading and merging base release") + parser.add_argument("--keep-chunks", action="store_true", help="Keep chunk files after merging") + args = parser.parse_args() + + if args.date: + target_day = datetime.strptime(args.date, "%Y-%m-%d") + else: + target_day = get_target_day() + + date_str = target_day.strftime("%Y-%m-%d") + chunks_dir = args.chunks_dir + + print(f"Combining chunks for {date_str}") + print(f"Chunks directory: {chunks_dir}") + print(f"Resource usage at start: {get_resource_usage()}") + + # Find chunk files + pattern = os.path.join(chunks_dir, f"chunk_*_{date_str}.parquet") + chunk_files = sorted(glob.glob(pattern)) + + if not chunk_files: + print(f"No chunk files found matching: {pattern}") + sys.exit(1) + + print(f"Found {len(chunk_files)} chunk files") + + # Process each chunk separately to save memory + compressed_chunks = [] + for chunk_path in chunk_files: + compressed = process_single_chunk(chunk_path) + compressed_chunks.append(compressed) + gc.collect() + + # Combine all compressed chunks + combined = combine_compressed_chunks(compressed_chunks) + + # Free memory from individual chunks + del compressed_chunks + gc.collect() + print(f"After combining: {get_resource_usage()}") + + # Merge with base release + if not args.skip_base: + combined = download_and_merge_base_release(combined) + + # Convert list 
columns to strings for CSV compatibility + for col in combined.columns: + if combined[col].dtype == pl.List: + combined = combined.with_columns( + pl.col(col).list.join(",").alias(col) + ) + + # Sort by time for consistent output + if 'time' in combined.columns: + combined = combined.sort('time') + + # Write final CSV + output_path = os.path.join(FINAL_OUTPUT_DIR, f"planequery_aircraft_adsb_{date_str}.csv") + combined.write_csv(output_path) + print(f"Wrote {len(combined)} records to {output_path}") + + # Cleanup + if not args.keep_chunks: + cleanup_chunks(date_str, chunks_dir) + + print(f"Done! | {get_resource_usage()}") + + +if __name__ == "__main__": + main() diff --git a/src/adsb/compress_adsb_to_aircraft_data.py b/src/adsb/compress_adsb_to_aircraft_data.py new file mode 100644 index 0000000..5ae58cd --- /dev/null +++ b/src/adsb/compress_adsb_to_aircraft_data.py @@ -0,0 +1,274 @@ +# Shared compression logic for ADS-B aircraft data +import os +import polars as pl + +COLUMNS = ['dbFlags', 'ownOp', 'year', 'desc', 'aircraft_category', 'r', 't'] + + +def deduplicate_by_signature(df: pl.DataFrame) -> pl.DataFrame: + """For each icao, keep only the earliest row with each unique signature. + + This is used for deduplicating across multiple compressed chunks. 
+ """ + # Create signature column + df = df.with_columns( + pl.concat_str([pl.col(c).cast(pl.Utf8).fill_null("") for c in COLUMNS], separator="|").alias("_signature") + ) + # Group by icao and signature, take first row (earliest due to time sort) + df = df.sort("time") + df_deduped = df.group_by(["icao", "_signature"]).first() + df_deduped = df_deduped.drop("_signature") + df_deduped = df_deduped.sort("time") + return df_deduped + + +def compress_df_polars(df: pl.DataFrame, icao: str) -> pl.DataFrame: + """Compress a single ICAO group to its most informative row using Polars.""" + # Create signature string + df = df.with_columns( + pl.concat_str([pl.col(c).cast(pl.Utf8) for c in COLUMNS], separator="|").alias("_signature") + ) + + # Compute signature counts + signature_counts = df.group_by("_signature").len().rename({"len": "_sig_count"}) + + # Group by signature and take first row + df = df.group_by("_signature").first() + + if df.height == 1: + # Only one unique signature, return it + result = df.drop("_signature").with_columns(pl.lit(icao).alias("icao")) + return result + + # For each row, create dict of non-empty column values and check subsets + # Convert to list of dicts for subset checking (same logic as pandas version) + rows_data = [] + for row in df.iter_rows(named=True): + non_empty = {col: row[col] for col in COLUMNS if row[col] != '' and row[col] is not None} + rows_data.append({ + 'signature': row['_signature'], + 'non_empty_dict': non_empty, + 'non_empty_count': len(non_empty), + 'row_data': row + }) + + # Check if row i's non-empty values are a subset of row j's non-empty values + def is_subset_of_any(idx): + row_dict = rows_data[idx]['non_empty_dict'] + row_count = rows_data[idx]['non_empty_count'] + + for other_idx, other_data in enumerate(rows_data): + if idx == other_idx: + continue + other_dict = other_data['non_empty_dict'] + other_count = other_data['non_empty_count'] + + # Check if all non-empty values in current row match those in other row 
+ if all(row_dict.get(k) == other_dict.get(k) for k in row_dict.keys()): + # If they match and other has more defined columns, current row is redundant + if other_count > row_count: + return True + return False + + # Keep rows that are not subsets of any other row + keep_indices = [i for i in range(len(rows_data)) if not is_subset_of_any(i)] + + if len(keep_indices) == 0: + keep_indices = [0] # Fallback: keep first row + + remaining_signatures = [rows_data[i]['signature'] for i in keep_indices] + df = df.filter(pl.col("_signature").is_in(remaining_signatures)) + + if df.height > 1: + # Use signature counts to pick the most frequent one + df = df.join(signature_counts, on="_signature", how="left") + max_count = df["_sig_count"].max() + df = df.filter(pl.col("_sig_count") == max_count).head(1) + df = df.drop("_sig_count") + + result = df.drop("_signature").with_columns(pl.lit(icao).alias("icao")) + + # Ensure empty strings are preserved + for col in COLUMNS: + if col in result.columns: + result = result.with_columns(pl.col(col).fill_null("")) + + return result + + +def compress_multi_icao_df(df: pl.DataFrame, verbose: bool = True) -> pl.DataFrame: + """Compress a DataFrame with multiple ICAOs to one row per ICAO. + + This is the main entry point for compressing ADS-B data. + Used by both daily GitHub Actions runs and historical AWS runs. 
+ + Args: + df: DataFrame with columns ['time', 'icao'] + COLUMNS + verbose: Whether to print progress + + Returns: + Compressed DataFrame with one row per ICAO + """ + if df.height == 0: + return df + + # Sort by icao and time + df = df.sort(['icao', 'time']) + + # Fill null values with empty strings for COLUMNS + for col in COLUMNS: + if col in df.columns: + df = df.with_columns(pl.col(col).cast(pl.Utf8).fill_null("")) + + # First pass: quick deduplication of exact duplicates + df = df.unique(subset=['icao'] + COLUMNS, keep='first') + if verbose: + print(f"After quick dedup: {df.height} records") + + # Second pass: sophisticated compression per ICAO + if verbose: + print("Compressing per ICAO...") + + # Process each ICAO group + icao_groups = df.partition_by('icao', as_dict=True, maintain_order=True) + compressed_dfs = [] + + for icao_key, group_df in icao_groups.items(): + # partition_by with as_dict=True returns tuple keys, extract first element + icao = icao_key[0] if isinstance(icao_key, tuple) else icao_key + compressed = compress_df_polars(group_df, str(icao)) + compressed_dfs.append(compressed) + + if compressed_dfs: + df_compressed = pl.concat(compressed_dfs) + else: + df_compressed = df.head(0) # Empty with same schema + + if verbose: + print(f"After compress: {df_compressed.height} records") + + # Reorder columns: time first, then icao + cols = df_compressed.columns + ordered_cols = ['time', 'icao'] + [c for c in cols if c not in ['time', 'icao']] + df_compressed = df_compressed.select(ordered_cols) + + return df_compressed + + +def load_raw_adsb_for_day(day): + """Load raw ADS-B data for a day from parquet file.""" + from datetime import timedelta + from pathlib import Path + + start_time = day.replace(hour=0, minute=0, second=0, microsecond=0) + + # Check for parquet file first + version_date = f"v{start_time.strftime('%Y.%m.%d')}" + parquet_file = Path(f"data/output/parquet_output/{version_date}.parquet") + + if not parquet_file.exists(): + # Try to 
generate parquet file by calling the download function + print(f" Parquet file not found: {parquet_file}") + print(f" Attempting to download and generate parquet for {start_time.strftime('%Y-%m-%d')}...") + + from download_adsb_data_to_parquet import create_parquet_for_day + result_path = create_parquet_for_day(start_time, keep_folders=False) + + if result_path: + print(f" Successfully generated parquet file: {result_path}") + else: + raise Exception("Failed to generate parquet file") + + if parquet_file.exists(): + print(f" Loading from parquet: {parquet_file}") + df = pl.read_parquet( + parquet_file, + columns=['time', 'icao', 'r', 't', 'dbFlags', 'ownOp', 'year', 'desc', 'aircraft_category'] + ) + + # Convert to timezone-naive datetime + if df["time"].dtype == pl.Datetime: + df = df.with_columns(pl.col("time").dt.replace_time_zone(None)) + + return df + else: + # Return empty DataFrame if parquet file doesn't exist + print(f" No data available for {start_time.strftime('%Y-%m-%d')}") + return pl.DataFrame(schema={ + 'time': pl.Datetime, + 'icao': pl.Utf8, + 'r': pl.Utf8, + 't': pl.Utf8, + 'dbFlags': pl.Int64, + 'ownOp': pl.Utf8, + 'year': pl.Int64, + 'desc': pl.Utf8, + 'aircraft_category': pl.Utf8 + }) + + +def load_historical_for_day(day): + """Load and compress historical ADS-B data for a day.""" + df = load_raw_adsb_for_day(day) + if df.height == 0: + return df + + print(f"Loaded {df.height} raw records for {day.strftime('%Y-%m-%d')}") + + # Use shared compression function + return compress_multi_icao_df(df, verbose=True) + + +def concat_compressed_dfs(df_base, df_new): + """Concatenate base and new compressed dataframes, keeping the most informative row per ICAO.""" + # Combine both dataframes + df_combined = pl.concat([df_base, df_new]) + + # Sort by ICAO and time + df_combined = df_combined.sort(['icao', 'time']) + + # Fill null values + for col in COLUMNS: + if col in df_combined.columns: + df_combined = df_combined.with_columns(pl.col(col).fill_null("")) 
+ + # Apply compression logic per ICAO to get the best row + icao_groups = df_combined.partition_by('icao', as_dict=True, maintain_order=True) + compressed_dfs = [] + + for icao, group_df in icao_groups.items(): + compressed = compress_df_polars(group_df, icao) + compressed_dfs.append(compressed) + + if compressed_dfs: + df_compressed = pl.concat(compressed_dfs) + else: + df_compressed = df_combined.head(0) + + # Sort by time + df_compressed = df_compressed.sort('time') + + return df_compressed + + +def get_latest_aircraft_adsb_csv_df(): + """Download and load the latest ADS-B CSV from GitHub releases.""" + from get_latest_planequery_aircraft_release import download_latest_aircraft_adsb_csv + import re + + csv_path = download_latest_aircraft_adsb_csv() + df = pl.read_csv(csv_path, null_values=[""]) + + # Fill nulls with empty strings + for col in df.columns: + if df[col].dtype == pl.Utf8: + df = df.with_columns(pl.col(col).fill_null("")) + + # Extract start date from filename pattern: planequery_aircraft_adsb_{start_date}_{end_date}.csv + match = re.search(r"planequery_aircraft_adsb_(\d{4}-\d{2}-\d{2})_", str(csv_path)) + if not match: + raise ValueError(f"Could not extract date from filename: {csv_path.name}") + + date_str = match.group(1) + return df, date_str + diff --git a/src/adsb/download_adsb_data_to_parquet.py b/src/adsb/download_adsb_data_to_parquet.py new file mode 100644 index 0000000..4be76d6 --- /dev/null +++ b/src/adsb/download_adsb_data_to_parquet.py @@ -0,0 +1,739 @@ +""" +Downloads adsb.lol data and writes to Parquet files. + +Usage: + python -m src.process_historical_adsb_data.download_to_parquet 2025-01-01 2025-01-02 + +This will download trace data for the specified date range and output Parquet files. + +This file is self-contained and does not import from other project modules. 
+""" +import gc +import glob +import gzip +import resource +import shutil +import sys +import logging +import time +import re +import signal +import concurrent.futures +import subprocess +import os +import argparse +import datetime as dt +from datetime import datetime, timedelta, timezone +import urllib.request +import urllib.error + +import orjson +import pyarrow as pa +import pyarrow.parquet as pq + + +# ============================================================================ +# Configuration +# ============================================================================ + +OUTPUT_DIR = "./data/output" +os.makedirs(OUTPUT_DIR, exist_ok=True) + +PARQUET_DIR = os.path.join(OUTPUT_DIR, "parquet_output") +os.makedirs(PARQUET_DIR, exist_ok=True) + +TOKEN = os.environ.get('GITHUB_TOKEN') # Optional: for higher GitHub API rate limits +HEADERS = {"Authorization": f"token {TOKEN}"} if TOKEN else {} + + +def get_resource_usage() -> str: + """Get current RAM and disk usage as a formatted string.""" + # RAM usage (RSS = Resident Set Size) + ram_bytes = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss + # On macOS, ru_maxrss is in bytes; on Linux, it's in KB + if sys.platform == 'darwin': + ram_gb = ram_bytes / (1024**3) + else: + ram_gb = ram_bytes / (1024**2) # Convert KB to GB + + # Disk usage + disk = shutil.disk_usage('.') + disk_free_gb = disk.free / (1024**3) + disk_total_gb = disk.total / (1024**3) + + return f"RAM: {ram_gb:.2f}GB | Disk: {disk_free_gb:.1f}GB free / {disk_total_gb:.1f}GB total" + + +# ============================================================================ +# GitHub Release Fetching and Downloading +# ============================================================================ + +class DownloadTimeoutException(Exception): + pass + + +def timeout_handler(signum, frame): + raise DownloadTimeoutException("Download timed out after 40 seconds") + + +def fetch_releases(version_date: str) -> list: + """Fetch GitHub releases for a given version date 
def fetch_releases(version_date: str) -> list:
    """Fetch GitHub releases for a given version date from adsblol.

    Pages through the archive repository's releases API, retrying each page
    up to 10 times (60 s apart), and keeps releases whose tag starts with
    "{version_date}-planes-readsb-prod-0".
    """
    year = version_date.split('.')[0][1:]
    # The 2024-12-31 snapshot was published in the 2025 archive repository.
    if version_date == "v2024.12.31":
        year = "2025"
    BASE_URL = f"https://api.github.com/repos/adsblol/globe_history_{year}/releases"
    PREFIX = f"{version_date}-planes-readsb-prod-0"
    releases = []
    page = 1

    while True:
        max_retries = 10
        retry_delay = 60
        data = None

        for attempt in range(1, max_retries + 1):
            try:
                req = urllib.request.Request(f"{BASE_URL}?page={page}", headers=HEADERS)
                with urllib.request.urlopen(req) as response:
                    if response.status == 200:
                        data = orjson.loads(response.read())
                        break
                    else:
                        print(f"Failed to fetch releases (attempt {attempt}/{max_retries}): {response.status} {response.reason}")
                        if attempt < max_retries:
                            print(f"Waiting {retry_delay} seconds before retry...")
                            time.sleep(retry_delay)
                        else:
                            print(f"Giving up after {max_retries} attempts")
                            return releases
            except Exception as e:
                print(f"Request exception (attempt {attempt}/{max_retries}): {e}")
                if attempt < max_retries:
                    print(f"Waiting {retry_delay} seconds before retry...")
                    time.sleep(retry_delay)
                else:
                    print(f"Giving up after {max_retries} attempts")
                    return releases
        if not data:
            break
        for release in data:
            # BUGFIX: the original used re.match() with the unescaped tag
            # prefix, in which the '.' characters of the version date matched
            # any character. A plain prefix comparison is the intended test.
            if release["tag_name"].startswith(PREFIX):
                releases.append(release)
        page += 1
    return releases


def download_asset(asset_url: str, file_path: str) -> bool:
    """Download a single release asset to ``file_path``.

    Returns True on success (or if the file already exists). A SIGALRM-based
    40-second timeout guards against a hung connection; note that signal
    alarms only work in the main thread.
    """
    os.makedirs(os.path.dirname(file_path) or OUTPUT_DIR, exist_ok=True)

    if os.path.exists(file_path):
        print(f"[SKIP] {file_path} already downloaded.")
        return True

    print(f"Downloading {asset_url}...")
    try:
        signal.signal(signal.SIGALRM, timeout_handler)
        signal.alarm(40)  # 40-second timeout for establishing the connection
        try:
            req = urllib.request.Request(asset_url, headers=HEADERS)
            with urllib.request.urlopen(req) as response:
                signal.alarm(0)

                if response.status == 200:
                    with open(file_path, "wb") as file:
                        while True:
                            chunk = response.read(8192)
                            if not chunk:
                                break
                            file.write(chunk)
                    print(f"Saved {file_path}")
                    return True
                else:
                    # NOTE(review): urllib raises HTTPError for non-2xx, so
                    # this branch is rarely reached; kept for safety.
                    print(f"Failed to download {asset_url}: {response.status} {response.msg}")
                    return False
        finally:
            # BUGFIX: if urlopen raised, the alarm used to stay armed and
            # could deliver a stray SIGALRM to unrelated code later; always
            # cancel it on the way out.
            signal.alarm(0)
    except DownloadTimeoutException as e:
        print(f"Download aborted for {asset_url}: {e}")
        return False
    except Exception as e:
        print(f"An error occurred while downloading {asset_url}: {e}")
        return False


def extract_split_archive(file_paths: list, extract_dir: str) -> bool:
    """
    Extracts a split archive by concatenating the parts using 'cat'
    and then extracting with 'tar' in one pipeline.
    Deletes the tar files immediately after extraction to save disk space.
    """
    if os.path.isdir(extract_dir):
        print(f"[SKIP] Extraction directory already exists: {extract_dir}")
        return True

    def sort_key(path: str):
        # Order split parts numerically (.0, .1, ...) ahead of alphabetic
        # suffixes (.aa, .ab, ...), with anything else last by basename.
        base = os.path.basename(path)
        parts = base.rsplit('.', maxsplit=1)
        if len(parts) == 2:
            suffix = parts[1]
            if suffix.isdigit():
                return (0, int(suffix))
            if re.fullmatch(r'[a-zA-Z]+', suffix):
                return (1, suffix)
        return (2, base)

    file_paths = sorted(file_paths, key=sort_key)
    os.makedirs(extract_dir, exist_ok=True)

    try:
        # cat part0 part1 ... | tar xf - : avoids materializing the joined
        # archive on disk before extraction.
        cat_proc = subprocess.Popen(
            ["cat"] + file_paths,
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL
        )
        tar_cmd = ["tar", "xf", "-", "-C", extract_dir, "--strip-components=1"]
        subprocess.run(
            tar_cmd,
            stdin=cat_proc.stdout,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            check=True
        )
        cat_proc.stdout.close()
        cat_proc.wait()

        print(f"Successfully extracted archive to {extract_dir}")

        # Delete tar files immediately after extraction
        for tar_file in file_paths:
            try:
                os.remove(tar_file)
                print(f"Deleted tar file: {tar_file}")
            except Exception as e:
                print(f"Failed to delete {tar_file}: {e}")

        # Check disk usage after deletion
        disk = shutil.disk_usage('.')
        free_gb = disk.free / (1024**3)
        print(f"Disk space after tar deletion: {free_gb:.1f}GB free")

        return True
    except subprocess.CalledProcessError as e:
        print(f"Failed to extract split archive: {e}")
        return False
# (continuation of extract_split_archive's try/except — the `except` keyword
# is on the preceding line): report the tar failure to the caller.
subprocess.CalledProcessError as e:
        print(f"Failed to extract split archive: {e}")
        return False


# ============================================================================
# Trace File Processing (with alt_baro/on_ground handling)
# ============================================================================

# Accepted upstream feed names; '' stands for "unknown source".
ALLOWED_DATA_SOURCE = {'', 'adsb.lol', 'adsbexchange', 'airplanes.live'}


def process_file(filepath: str) -> list:
    """
    Process a single trace file and return list of rows.
    Handles alt_baro/on_ground: if altitude == "ground", on_ground=True and alt_baro=None.

    Each returned row is a flat list whose element order must match COLUMNS
    and PARQUET_SCHEMA below. Files missing 'icao', 'timestamp' or 'trace'
    are skipped with a message and yield [].
    """
    insert_rows = []
    with gzip.open(filepath, 'rb') as f:
        data = orjson.loads(f.read())
    icao = data.get('icao', None)
    if icao is None:
        print(f"Skipping file {filepath} as it does not contain 'icao'")
        return []

    # Aircraft-level metadata shared by every trace point in this file.
    r = data.get('r', "")
    t = data.get('t', "")
    dbFlags = data.get('dbFlags', 0)
    noRegData = data.get('noRegData', False)
    ownOp = data.get('ownOp', "")
    year = int(data.get('year', 0))
    timestamp = data.get('timestamp', None)
    desc = data.get('desc', "")
    trace_data = data.get('trace', None)

    if timestamp is None or trace_data is None:
        print(f"Skipping file {filepath} as it does not contain 'timestamp' or 'trace'")
        return []

    for row in trace_data:
        # Positional trace-point layout — assumed to follow the readsb
        # trace_full format (indices 0..13); TODO confirm against producer.
        time_offset = row[0]
        lat = row[1]
        lon = row[2]
        altitude = row[3]

        # Handle alt_baro/on_ground
        alt_baro = None
        on_ground = False
        if type(altitude) is str and altitude == "ground":
            on_ground = True
        elif type(altitude) is int:
            alt_baro = altitude
        elif type(altitude) is float:
            alt_baro = int(altitude)

        ground_speed = row[4]
        track_degrees = row[5]
        flags = row[6]
        vertical_rate = row[7]
        aircraft = row[8]
        source = row[9]
        # NOTE(review): this condition is constant-true — it tests the
        # literal "adsb.lol" against ALLOWED_DATA_SOURCE instead of the
        # per-row `source` value, so data_source_value is always "adsb.lol".
        # Presumably the intent was to validate `source`; confirm before
        # changing, as output files already carry this value.
        data_source_value = "adsb.lol" if "adsb.lol" in ALLOWED_DATA_SOURCE else ""
        geometric_altitude = row[10]
        geometric_vertical_rate = row[11]
        indicated_airspeed = row[12]
        roll_angle = row[13]

        # time_offset is seconds relative to the file's base timestamp.
        time_val = timestamp + time_offset
        dt64 = dt.datetime.fromtimestamp(time_val, tz=dt.timezone.utc)

        # Prepare base fields
        inserted_row = [
            dt64, icao, r, t, dbFlags, noRegData, ownOp, year, desc,
            lat, lon, alt_baro, on_ground, ground_speed, track_degrees,
            flags, vertical_rate
        ]
        next_part = [
            source, geometric_altitude, geometric_vertical_rate,
            indicated_airspeed, roll_angle
        ]
        inserted_row.extend(next_part)

        # The 9th trace element may be null or a dict of extra ADS-B fields;
        # normalize to a dict so every .get() below has a uniform default.
        if aircraft is None or type(aircraft) is not dict:
            aircraft = dict()

        # Key order here fixes the column order of the aircraft_* fields —
        # keep in sync with COLUMNS and PARQUET_SCHEMA.
        aircraft_data = {
            'alert': aircraft.get('alert', None),
            'alt_geom': aircraft.get('alt_geom', None),
            'gva': aircraft.get('gva', None),
            'nac_p': aircraft.get('nac_p', None),
            'nac_v': aircraft.get('nac_v', None),
            'nic': aircraft.get('nic', None),
            'nic_baro': aircraft.get('nic_baro', None),
            'rc': aircraft.get('rc', None),
            'sda': aircraft.get('sda', None),
            'sil': aircraft.get('sil', None),
            'sil_type': aircraft.get('sil_type', ""),
            'spi': aircraft.get('spi', None),
            'track': aircraft.get('track', None),
            'type': aircraft.get('type', ""),
            'version': aircraft.get('version', None),
            'category': aircraft.get('category', ''),
            'emergency': aircraft.get('emergency', ''),
            'flight': aircraft.get('flight', ""),
            'squawk': aircraft.get('squawk', ""),
            'baro_rate': aircraft.get('baro_rate', None),
            'nav_altitude_fms': aircraft.get('nav_altitude_fms', None),
            'nav_altitude_mcp': aircraft.get('nav_altitude_mcp', None),
            'nav_modes': aircraft.get('nav_modes', []),
            'nav_qnh': aircraft.get('nav_qnh', None),
            'geom_rate': aircraft.get('geom_rate', None),
            'ias': aircraft.get('ias', None),
            'mach': aircraft.get('mach', None),
            'mag_heading': aircraft.get('mag_heading', None),
            'oat': aircraft.get('oat', None),
            'roll': aircraft.get('roll', None),
            'tas': aircraft.get('tas', None),
            'tat': aircraft.get('tat', None),
            'true_heading': aircraft.get('true_heading', None),
            'wd': aircraft.get('wd', None),
            'ws': aircraft.get('ws', None),
            'track_rate': aircraft.get('track_rate', None),
            'nav_heading': aircraft.get('nav_heading', None)
        }

        aircraft_list = list(aircraft_data.values())
        inserted_row.extend(aircraft_list)
        inserted_row.append(data_source_value)

        insert_rows.append(inserted_row)

    if insert_rows:
        # print(f"Got {len(insert_rows)} rows from {filepath}")
        return insert_rows
    else:
        return []


# ============================================================================
# Parquet Writing
# ============================================================================

# Column names matching the order of data in inserted_row
# (must stay in sync with process_file's row construction and PARQUET_SCHEMA).
COLUMNS = [
    "time", "icao",
    "r", "t", "dbFlags", "noRegData", "ownOp", "year", "desc",
    "lat", "lon", "alt_baro", "on_ground", "ground_speed", "track_degrees",
    "flags", "vertical_rate", "source", "geometric_altitude",
    "geometric_vertical_rate", "indicated_airspeed", "roll_angle",
    "aircraft_alert", "aircraft_alt_geom", "aircraft_gva", "aircraft_nac_p",
    "aircraft_nac_v", "aircraft_nic", "aircraft_nic_baro", "aircraft_rc",
    "aircraft_sda", "aircraft_sil", "aircraft_sil_type", "aircraft_spi",
    "aircraft_track", "aircraft_type", "aircraft_version", "aircraft_category",
    "aircraft_emergency", "aircraft_flight", "aircraft_squawk",
    "aircraft_baro_rate", "aircraft_nav_altitude_fms", "aircraft_nav_altitude_mcp",
    "aircraft_nav_modes", "aircraft_nav_qnh", "aircraft_geom_rate",
    "aircraft_ias", "aircraft_mach", "aircraft_mag_heading", "aircraft_oat",
    "aircraft_roll", "aircraft_tas", "aircraft_tat", "aircraft_true_heading",
    "aircraft_wd", "aircraft_ws", "aircraft_track_rate", "aircraft_nav_heading",
    "data_source",
]


# Parallelism: use all cores on larger machines, stay single-process on
# small ones (<= 4 cores) where worker overhead outweighs the gain.
OS_CPU_COUNT = os.cpu_count() or 1
MAX_WORKERS = OS_CPU_COUNT if OS_CPU_COUNT > 4 else 1
CHUNK_SIZE = MAX_WORKERS * 500  # Reduced for lower RAM usage
BATCH_SIZE = 250_000  # Fixed size for predictable memory usage (~500MB per batch)

# PyArrow schema for efficient Parquet writing
# (field order mirrors COLUMNS exactly).
PARQUET_SCHEMA = pa.schema([
    ("time", pa.timestamp("ms", tz="UTC")),
    ("icao", pa.string()),
    ("r", pa.string()),
    ("t", pa.string()),
    ("dbFlags", pa.int32()),
    ("noRegData", pa.bool_()),
    ("ownOp", pa.string()),
    ("year", pa.uint16()),
    ("desc", pa.string()),
    ("lat", pa.float64()),
    ("lon", pa.float64()),
    ("alt_baro", pa.int32()),
    ("on_ground", pa.bool_()),
    ("ground_speed", pa.float32()),
    ("track_degrees", pa.float32()),
    ("flags", pa.uint32()),
    ("vertical_rate", pa.int32()),
    ("source", pa.string()),
    ("geometric_altitude", pa.int32()),
    ("geometric_vertical_rate", pa.int32()),
    ("indicated_airspeed", pa.int32()),
    ("roll_angle", pa.float32()),
    ("aircraft_alert", pa.int64()),
    ("aircraft_alt_geom", pa.int64()),
    ("aircraft_gva", pa.int64()),
    ("aircraft_nac_p", pa.int64()),
    ("aircraft_nac_v", pa.int64()),
    ("aircraft_nic", pa.int64()),
    ("aircraft_nic_baro", pa.int64()),
    ("aircraft_rc", pa.int64()),
    ("aircraft_sda", pa.int64()),
    ("aircraft_sil", pa.int64()),
    ("aircraft_sil_type", pa.string()),
    ("aircraft_spi", pa.int64()),
    ("aircraft_track", pa.float64()),
    ("aircraft_type", pa.string()),
    ("aircraft_version", pa.int64()),
    ("aircraft_category", pa.string()),
    ("aircraft_emergency", pa.string()),
    ("aircraft_flight", pa.string()),
    ("aircraft_squawk", pa.string()),
    ("aircraft_baro_rate", pa.int64()),
    ("aircraft_nav_altitude_fms", pa.int64()),
    ("aircraft_nav_altitude_mcp", pa.int64()),
    ("aircraft_nav_modes", pa.list_(pa.string())),
    ("aircraft_nav_qnh", pa.float64()),
    ("aircraft_geom_rate", pa.int64()),
    ("aircraft_ias", pa.int64()),
    ("aircraft_mach", pa.float64()),
    ("aircraft_mag_heading", pa.float64()),
    ("aircraft_oat", pa.int64()),
    ("aircraft_roll", pa.float64()),
    ("aircraft_tas", pa.int64()),
    ("aircraft_tat", pa.int64()),
    ("aircraft_true_heading", pa.float64()),
    ("aircraft_wd", pa.int64()),
    ("aircraft_ws", pa.int64()),
    ("aircraft_track_rate", pa.float64()),
    ("aircraft_nav_heading", pa.float64()),
    ("data_source", pa.string()),
])
def collect_trace_files_with_find(root_dir):
    """Find all trace_full_*.json files in the extracted directory."""
    trace_dict: dict[str, str] = {}
    result = subprocess.run(
        ['find', root_dir, '-type', 'f', '-name', 'trace_full_*.json'],
        stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True,
    )
    if result.returncode != 0:
        print(f"Error executing find: {result.stderr}")
        return trace_dict

    # Map ICAO hex id -> file path, parsing the id out of the filename.
    prefix, suffix = "trace_full_", ".json"
    for path in filter(None, result.stdout.strip().split('\n')):
        base = os.path.basename(path)
        if base.startswith(prefix) and base.endswith(suffix):
            trace_dict[base[len(prefix):-len(suffix)]] = path

    return trace_dict


def generate_version_dates(start_date: str, end_date: str) -> list:
    """Generate a list of dates from start_date to end_date inclusive."""
    first = datetime.strptime(start_date, "%Y-%m-%d")
    last = datetime.strptime(end_date, "%Y-%m-%d")
    n_days = (last - first).days + 1
    return [first + timedelta(days=offset) for offset in range(n_days)]


def safe_process(fp):
    """Safely process a file, returning empty list on error."""
    try:
        return process_file(fp)
    except Exception as exc:
        logging.error(f"Error processing {fp}: {exc}")
        return []


def rows_to_arrow_table(rows: list) -> "pa.Table":
    """Convert list of rows to a PyArrow Table directly (no pandas)."""
    # Transpose row-major data into per-column tuples, then build one Arrow
    # array per schema field (missing trailing columns become all-null).
    transposed = list(zip(*rows))
    arrays = [
        pa.array(
            list(transposed[idx]) if idx < len(transposed) else [None] * len(rows),
            type=field.type,
        )
        for idx, field in enumerate(PARQUET_SCHEMA)
    ]
    return pa.Table.from_arrays(arrays, schema=PARQUET_SCHEMA)


def write_batch_to_parquet(rows: list, version_date: str, batch_idx: int):
    """Write a batch of rows to a Parquet file."""
    if not rows:
        return

    out_path = os.path.join(
        PARQUET_DIR, f"{version_date}_batch_{batch_idx:04d}.parquet"
    )
    pq.write_table(rows_to_arrow_table(rows), out_path, compression='snappy')

    print(f"Written parquet batch {batch_idx} ({len(rows)} rows) | {get_resource_usage()}")


def merge_parquet_files(version_date: str, delete_batches: bool = True):
    """Merge all batch parquet files for a version_date into a single file using streaming."""
    batch_files = sorted(
        glob.glob(os.path.join(PARQUET_DIR, f"{version_date}_batch_*.parquet"))
    )
    if not batch_files:
        print(f"No batch files found for {version_date}")
        return None

    print(f"Merging {len(batch_files)} batch files for {version_date} (streaming)...")

    merged_path = os.path.join(PARQUET_DIR, f"{version_date}.parquet")
    total_rows = 0
    writer = None

    # Stream one batch at a time to keep peak RAM low; batch files are
    # removed as soon as their contents have been copied into the merge.
    try:
        for idx, batch_path in enumerate(batch_files):
            table = pq.read_table(batch_path)
            total_rows += table.num_rows

            if writer is None:
                writer = pq.ParquetWriter(merged_path, table.schema, compression='snappy')
            writer.write_table(table)

            if delete_batches:
                os.remove(batch_path)

            del table
            if (idx + 1) % 10 == 0:
                gc.collect()
                print(f"  Merged {idx + 1}/{len(batch_files)} batches... | {get_resource_usage()}")
    finally:
        if writer is not None:
            writer.close()

    print(f"Merged parquet file written to {merged_path} ({total_rows} total rows) | {get_resource_usage()}")

    if delete_batches:
        print(f"Deleted {len(batch_files)} batch files during merge")

    gc.collect()
    return merged_path
def process_version_date(version_date: str, keep_folders: bool = False):
    """Download, extract, and process trace files for a single version date.

    Returns the total number of rows written; 0 when no trace files exist.
    Batches are flushed to per-batch parquet files as BATCH_SIZE is reached,
    then merged into a single {version_date}.parquet at the end.
    """
    print(f"\nProcessing version_date: {version_date}")
    extract_dir = os.path.join(OUTPUT_DIR, f"{version_date}-planes-readsb-prod-0.tar_0")

    def collect_trace_files_for_version_date(vd):
        # Download all matching release assets, extract, and index trace files.
        releases = fetch_releases(vd)
        if len(releases) == 0:
            print(f"No releases found for {vd}.")
            return None

        downloaded_files = []
        for release in releases:
            tag_name = release["tag_name"]
            print(f"Processing release: {tag_name}")

            # Only download prod-0 if available, else prod-0tmp
            assets = release.get("assets", [])
            normal_assets = [
                a for a in assets
                if "planes-readsb-prod-0." in a["name"] and "tmp" not in a["name"]
            ]
            tmp_assets = [
                a for a in assets
                if "planes-readsb-prod-0tmp" in a["name"]
            ]
            use_assets = normal_assets if normal_assets else tmp_assets

            for asset in use_assets:
                asset_name = asset["name"]
                asset_url = asset["browser_download_url"]
                file_path = os.path.join(OUTPUT_DIR, asset_name)
                result = download_asset(asset_url, file_path)
                if result:
                    downloaded_files.append(file_path)

        extract_split_archive(downloaded_files, extract_dir)
        return collect_trace_files_with_find(extract_dir)

    # Check if files already exist
    pattern = os.path.join(OUTPUT_DIR, f"{version_date}-planes-readsb-prod-0*")
    matches = [p for p in glob.glob(pattern) if os.path.isfile(p)]

    if matches:
        print(f"Found existing files for {version_date}:")
        # Prefer non-tmp slices when reusing existing files
        normal_matches = [
            p for p in matches
            if "-planes-readsb-prod-0." in os.path.basename(p)
            and "tmp" not in os.path.basename(p)
        ]
        downloaded_files = normal_matches if normal_matches else matches

        extract_split_archive(downloaded_files, extract_dir)
        trace_files = collect_trace_files_with_find(extract_dir)
    else:
        trace_files = collect_trace_files_for_version_date(version_date)

    if trace_files is None or len(trace_files) == 0:
        print(f"No trace files found for version_date: {version_date}")
        return 0

    file_list = list(trace_files.values())

    start_time = time.perf_counter()
    total_num_rows = 0
    batch_rows = []
    batch_idx = 0

    # Process files in chunks; a fresh process pool per chunk bounds worker
    # memory growth at the cost of pool start-up overhead.
    for offset in range(0, len(file_list), CHUNK_SIZE):
        chunk = file_list[offset:offset + CHUNK_SIZE]
        with concurrent.futures.ProcessPoolExecutor(max_workers=MAX_WORKERS) as process_executor:
            for rows in process_executor.map(safe_process, chunk):
                if not rows:
                    continue
                batch_rows.extend(rows)

                # Flush a full batch to its own parquet file to cap RAM.
                if len(batch_rows) >= BATCH_SIZE:
                    total_num_rows += len(batch_rows)
                    write_batch_to_parquet(batch_rows, version_date, batch_idx)
                    batch_idx += 1
                    batch_rows = []

        elapsed = time.perf_counter() - start_time
        speed = total_num_rows / elapsed if elapsed > 0 else 0
        print(f"[{version_date}] processed {total_num_rows} rows in {elapsed:.2f}s ({speed:.2f} rows/s)")

        gc.collect()

    # Final batch
    if batch_rows:
        total_num_rows += len(batch_rows)
        write_batch_to_parquet(batch_rows, version_date, batch_idx)
        elapsed = time.perf_counter() - start_time
        speed = total_num_rows / elapsed if elapsed > 0 else 0
        print(f"[{version_date}] processed {total_num_rows} rows in {elapsed:.2f}s ({speed:.2f} rows/s)")

    print(f"Total rows processed for version_date {version_date}: {total_num_rows}")

    # Clean up extracted directory immediately after processing (before merging parquet files)
    if not keep_folders and os.path.isdir(extract_dir):
        print(f"Deleting extraction directory with 100,000+ files: {extract_dir}")
        shutil.rmtree(extract_dir)
        print(f"Successfully deleted extraction directory: {extract_dir} | {get_resource_usage()}")

    # Merge batch files into a single parquet file
    merge_parquet_files(version_date, delete_batches=True)

    return total_num_rows


def create_parquet_for_day(day, keep_folders: bool = False):
    """Create parquet file for a single day.

    Args:
        day: datetime object or string in 'YYYY-MM-DD' format
        keep_folders: Whether to keep extracted folders after processing

    Returns:
        Path to the created parquet file, or None if failed
    """
    from pathlib import Path

    if isinstance(day, str):
        day = datetime.strptime(day, "%Y-%m-%d")

    version_date = f"v{day.strftime('%Y.%m.%d')}"

    # Check if parquet already exists
    parquet_path = Path(PARQUET_DIR) / f"{version_date}.parquet"
    if parquet_path.exists():
        print(f"Parquet file already exists: {parquet_path}")
        return parquet_path

    print(f"Creating parquet for {version_date}...")
    rows_processed = process_version_date(version_date, keep_folders)

    # Success requires both rows processed and the merged file on disk.
    if rows_processed > 0 and parquet_path.exists():
        return parquet_path
    else:
        return None


def main(start_date: str, end_date: str, keep_folders: bool = False):
    """Main function to download and convert adsb.lol data to Parquet."""
    version_dates = [f"v{date.strftime('%Y.%m.%d')}" for date in generate_version_dates(start_date, end_date)]
    print(f"Processing dates: {version_dates}")

    total_rows_all = 0
    for version_date in version_dates:
        rows_processed = process_version_date(version_date, keep_folders)
        total_rows_all += rows_processed

    print(f"\n=== Summary ===")
    print(f"Total dates processed: {len(version_dates)}")
    print(f"Total rows written to Parquet: {total_rows_all}")
    print(f"Parquet files location: {PARQUET_DIR}")


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO, stream=sys.stdout, force=True)

    parser = argparse.ArgumentParser(
        description="Download adsb.lol data and write to Parquet files"
    )
    parser.add_argument("start_date", type=str, help="Start date in YYYY-MM-DD format")
    parser.add_argument("end_date", type=str, help="End date in YYYY-MM-DD format")
    parser.add_argument("--keep-folders", action="store_true",
                        help="Keep extracted folders after processing")

    args = parser.parse_args()

    main(args.start_date, args.end_date, args.keep_folders)
"""
Downloads and extracts adsb.lol tar files, then lists all ICAO folders.
This is the first step of the map-reduce pipeline.

Outputs:
- Extracted trace files in data/output/{version_date}-planes-readsb-prod-0.tar_0/
- ICAO manifest at data/output/icao_manifest_{date}.txt
"""
import os
import sys
import argparse
import glob
import subprocess  # NOTE(review): not used in this module — candidate for removal
from datetime import datetime, timedelta

# Re-use download/extract functions from download_adsb_data_to_parquet
from src.adsb.download_adsb_data_to_parquet import (
    OUTPUT_DIR,
    fetch_releases,
    download_asset,
    extract_split_archive,
    collect_trace_files_with_find,
)


def get_target_day() -> datetime:
    """Get yesterday's date (the day we're processing).

    NOTE: datetime.utcnow() is deprecated since Python 3.12; the naive UTC
    value is only used for strftime formatting here.
    """
    return datetime.utcnow() - timedelta(days=1)


def download_and_extract(version_date: str) -> str | None:
    """Download and extract tar files, return extract directory path.

    Reuses an already-extracted directory or already-downloaded tar slices
    when present; otherwise downloads the release assets from GitHub.
    Returns None when nothing could be downloaded or extraction failed.
    """
    extract_dir = os.path.join(OUTPUT_DIR, f"{version_date}-planes-readsb-prod-0.tar_0")

    # Check if already extracted
    if os.path.isdir(extract_dir):
        print(f"[SKIP] Already extracted: {extract_dir}")
        return extract_dir

    # Check for existing tar files
    pattern = os.path.join(OUTPUT_DIR, f"{version_date}-planes-readsb-prod-0*")
    matches = [p for p in glob.glob(pattern) if os.path.isfile(p)]

    if matches:
        print(f"Found existing tar files for {version_date}")
        # Prefer non-tmp slices over the prod-0tmp fallbacks.
        normal_matches = [
            p for p in matches
            if "-planes-readsb-prod-0." in os.path.basename(p)
            and "tmp" not in os.path.basename(p)
        ]
        downloaded_files = normal_matches if normal_matches else matches
    else:
        # Download from GitHub
        print(f"Downloading releases for {version_date}...")
        releases = fetch_releases(version_date)
        if not releases:
            print(f"No releases found for {version_date}")
            return None

        downloaded_files = []
        for release in releases:
            tag_name = release["tag_name"]
            print(f"Processing release: {tag_name}")

            # Only take prod-0 assets if any exist, else fall back to prod-0tmp.
            assets = release.get("assets", [])
            normal_assets = [
                a for a in assets
                if "planes-readsb-prod-0." in a["name"] and "tmp" not in a["name"]
            ]
            tmp_assets = [
                a for a in assets
                if "planes-readsb-prod-0tmp" in a["name"]
            ]
            use_assets = normal_assets if normal_assets else tmp_assets

            for asset in use_assets:
                asset_name = asset["name"]
                asset_url = asset["browser_download_url"]
                file_path = os.path.join(OUTPUT_DIR, asset_name)
                if download_asset(asset_url, file_path):
                    downloaded_files.append(file_path)

        if not downloaded_files:
            print(f"No files downloaded for {version_date}")
            return None

    # Extract
    if extract_split_archive(downloaded_files, extract_dir):
        return extract_dir
    return None


def list_icao_folders(extract_dir: str) -> list[str]:
    """List all ICAO folder names from extracted directory."""
    trace_files = collect_trace_files_with_find(extract_dir)
    icaos = sorted(trace_files.keys())
    print(f"Found {len(icaos)} unique ICAOs")
    return icaos


def write_manifest(icaos: list[str], date_str: str) -> str:
    """Write ICAO list to manifest file (one ICAO per line); returns its path."""
    manifest_path = os.path.join(OUTPUT_DIR, f"icao_manifest_{date_str}.txt")
    with open(manifest_path, "w") as f:
        for icao in icaos:
            f.write(f"{icao}\n")
    print(f"Wrote manifest with {len(icaos)} ICAOs to {manifest_path}")
    return manifest_path


def main():
    parser = argparse.ArgumentParser(description="Download and list ICAOs from adsb.lol data")
    parser.add_argument("--date", type=str, help="Date in YYYY-MM-DD format (default: yesterday)")
    args = parser.parse_args()

    if args.date:
        target_day = datetime.strptime(args.date, "%Y-%m-%d")
    else:
        target_day = get_target_day()

    date_str = target_day.strftime("%Y-%m-%d")
    version_date = f"v{target_day.strftime('%Y.%m.%d')}"

    print(f"Processing date: {date_str} (version: {version_date})")

    # Download and extract
    extract_dir = download_and_extract(version_date)
    if not extract_dir:
        print("Failed to download/extract data")
        sys.exit(1)

    # List ICAOs
    icaos = list_icao_folders(extract_dir)
    if not icaos:
        print("No ICAOs found")
        sys.exit(1)

    # Write manifest
    manifest_path = write_manifest(icaos, date_str)

    print(f"\nDone! Extract dir: {extract_dir}")
    print(f"Manifest: {manifest_path}")
    print(f"Total ICAOs: {len(icaos)}")


if __name__ == "__main__":
    main()
import gc
import os
import sys
import argparse
import time
import concurrent.futures
from datetime import datetime, timedelta

import pyarrow as pa
import pyarrow.parquet as pq

from src.adsb.download_adsb_data_to_parquet import (
    OUTPUT_DIR,
    PARQUET_DIR,
    PARQUET_SCHEMA,
    COLUMNS,
    MAX_WORKERS,
    process_file,
    get_resource_usage,
    collect_trace_files_with_find,
)


# Per-chunk parquet outputs land here, one file per (chunk_id, date).
CHUNK_OUTPUT_DIR = os.path.join(OUTPUT_DIR, "adsb_chunks")
os.makedirs(CHUNK_OUTPUT_DIR, exist_ok=True)

# Smaller batch size for memory efficiency
BATCH_SIZE = 100_000


def get_target_day() -> datetime:
    """Get yesterday's date (the day we're processing).

    NOTE: datetime.utcnow() is deprecated since Python 3.12; the naive
    value is only used for strftime formatting here.
    """
    return datetime.utcnow() - timedelta(days=1)


def read_manifest(date_str: str) -> list[str]:
    """Read ICAO manifest file (written by download_and_list_icaos)."""
    manifest_path = os.path.join(OUTPUT_DIR, f"icao_manifest_{date_str}.txt")
    if not os.path.exists(manifest_path):
        raise FileNotFoundError(f"Manifest not found: {manifest_path}")

    with open(manifest_path, "r") as f:
        icaos = [line.strip() for line in f if line.strip()]
    return icaos


def deterministic_hash(s: str) -> int:
    """Return a deterministic hash for a string (unlike Python's hash() which is randomized)."""
    # Use sum of byte values - simple but deterministic.
    # NOTE(review): anagrams collide, so chunks may be uneven; acceptable for
    # partitioning as long as every worker runs the same function.
    return sum(ord(c) for c in s)


def get_chunk_icaos(icaos: list[str], chunk_id: int, total_chunks: int) -> list[str]:
    """Get the subset of ICAOs for this chunk based on deterministic hash partitioning."""
    return [icao for icao in icaos if deterministic_hash(icao) % total_chunks == chunk_id]


def build_trace_file_map(extract_dir: str) -> dict[str, str]:
    """Build a map of ICAO -> trace file path using find command.

    Includes verbose debug output (directory listings, a manual `find`) to
    diagnose empty extraction directories in CI.
    """
    print(f"Building trace file map from {extract_dir}...")

    # Debug: check what's in extract_dir
    if os.path.isdir(extract_dir):
        items = os.listdir(extract_dir)[:10]
        print(f"First 10 items in extract_dir: {items}")
        # Check if there are subdirectories
        for item in items[:3]:
            subpath = os.path.join(extract_dir, item)
            if os.path.isdir(subpath):
                subitems = os.listdir(subpath)[:5]
                print(f"  Contents of {item}/: {subitems}")

    trace_map = collect_trace_files_with_find(extract_dir)
    print(f"Found {len(trace_map)} trace files")

    if len(trace_map) == 0:
        # Debug: try manual find
        import subprocess
        result = subprocess.run(
            ['find', extract_dir, '-type', 'f', '-name', 'trace_full_*'],
            capture_output=True, text=True
        )
        print(f"Manual find output (first 500 chars): {result.stdout[:500]}")
        print(f"Manual find stderr: {result.stderr[:200]}")

    return trace_map


def safe_process(filepath: str) -> list:
    """Safely process a file, returning empty list on error."""
    try:
        return process_file(filepath)
    except Exception as e:
        print(f"Error processing {filepath}: {e}")
        return []


def rows_to_table(rows: list) -> pa.Table:
    """Convert list of rows to PyArrow table."""
    import pandas as pd
    df = pd.DataFrame(rows, columns=COLUMNS)
    # Series.dt.tz is None for naive timestamps; process_file emits tz-aware
    # UTC datetimes, so localization is normally skipped.
    if not df['time'].dt.tz:
        df['time'] = df['time'].dt.tz_localize('UTC')
    return pa.Table.from_pandas(df, schema=PARQUET_SCHEMA, preserve_index=False)


def process_chunk(
    chunk_id: int,
    total_chunks: int,
    trace_map: dict[str, str],
    icaos: list[str],
    date_str: str,
) -> str | None:
    """Process a chunk of ICAOs and write to parquet.

    Streams rows into a single ParquetWriter in BATCH_SIZE-row batches.
    Returns the output path, or None if this chunk produced no rows.
    """
    chunk_icaos = get_chunk_icaos(icaos, chunk_id, total_chunks)
    print(f"Chunk {chunk_id}/{total_chunks}: Processing {len(chunk_icaos)} ICAOs")

    if not chunk_icaos:
        print(f"Chunk {chunk_id}: No ICAOs to process")
        return None

    # Get trace file paths from the map
    trace_files = []
    for icao in chunk_icaos:
        if icao in trace_map:
            trace_files.append(trace_map[icao])

    print(f"Chunk {chunk_id}: Found {len(trace_files)} trace files")

    if not trace_files:
        print(f"Chunk {chunk_id}: No trace files found")
        return None

    # Process files and write parquet in batches
    output_path = os.path.join(CHUNK_OUTPUT_DIR, f"chunk_{chunk_id}_{date_str}.parquet")

    start_time = time.perf_counter()
    total_rows = 0
    batch_rows = []
    writer = None

    try:
        # Process in parallel batches; a fresh pool per file batch bounds
        # worker memory growth.
        files_per_batch = MAX_WORKERS * 100
        for offset in range(0, len(trace_files), files_per_batch):
            batch_files = trace_files[offset:offset + files_per_batch]

            with concurrent.futures.ProcessPoolExecutor(max_workers=MAX_WORKERS) as executor:
                for rows in executor.map(safe_process, batch_files):
                    if rows:
                        batch_rows.extend(rows)

                    # Write when batch is full
                    if len(batch_rows) >= BATCH_SIZE:
                        table = rows_to_table(batch_rows)
                        total_rows += len(batch_rows)

                        # Writer is created lazily so an empty chunk never
                        # leaves a zero-row parquet file behind.
                        if writer is None:
                            writer = pq.ParquetWriter(output_path, PARQUET_SCHEMA, compression='snappy')
                        writer.write_table(table)

                        batch_rows = []
                        del table
                        gc.collect()

            elapsed = time.perf_counter() - start_time
            print(f"Chunk {chunk_id}: {total_rows} rows, {elapsed:.1f}s | {get_resource_usage()}")

            gc.collect()

        # Write remaining rows
        if batch_rows:
            table = rows_to_table(batch_rows)
            total_rows += len(batch_rows)

            if writer is None:
                writer = pq.ParquetWriter(output_path, PARQUET_SCHEMA, compression='snappy')
            writer.write_table(table)
            del table

    finally:
        if writer:
            writer.close()

    elapsed = time.perf_counter() - start_time
    print(f"Chunk {chunk_id}: Done! {total_rows} rows in {elapsed:.1f}s | {get_resource_usage()}")

    if total_rows > 0:
        return output_path
    return None


def main():
    parser = argparse.ArgumentParser(description="Process a chunk of ICAOs")
    parser.add_argument("--chunk-id", type=int, required=True, help="Chunk ID (0-indexed)")
    parser.add_argument("--total-chunks", type=int, required=True, help="Total number of chunks")
    parser.add_argument("--date", type=str, help="Date in YYYY-MM-DD format (default: yesterday)")
    args = parser.parse_args()

    if args.date:
        target_day = datetime.strptime(args.date, "%Y-%m-%d")
    else:
        target_day = get_target_day()

    date_str = target_day.strftime("%Y-%m-%d")
    version_date = f"v{target_day.strftime('%Y.%m.%d')}"

    print(f"Processing chunk {args.chunk_id}/{args.total_chunks} for {date_str}")
    print(f"OUTPUT_DIR: {OUTPUT_DIR}")
    print(f"CHUNK_OUTPUT_DIR: {CHUNK_OUTPUT_DIR}")
    print(f"Resource usage at start: {get_resource_usage()}")

    # Debug: List what's in OUTPUT_DIR
    print(f"\nContents of {OUTPUT_DIR}:")
    if os.path.isdir(OUTPUT_DIR):
        for item in os.listdir(OUTPUT_DIR)[:20]:
            print(f"  - {item}")
    else:
        print(f"  Directory does not exist!")

    # Find extract directory (must have been created by the download step).
    extract_dir = os.path.join(OUTPUT_DIR, f"{version_date}-planes-readsb-prod-0.tar_0")
    print(f"\nLooking for extract_dir: {extract_dir}")
    if not os.path.isdir(extract_dir):
        print(f"Extract directory not found: {extract_dir}")
        # Try to find any extracted directory
        import glob
        pattern = os.path.join(OUTPUT_DIR, "*-planes-readsb-prod-0*")
        matches = glob.glob(pattern)
        print(f"Searching for pattern: {pattern}")
        print(f"Found matches: {matches}")
        sys.exit(1)

    # Build trace file map using find
    trace_map = build_trace_file_map(extract_dir)
    if not trace_map:
        print("No trace files found in extract directory")
        sys.exit(1)

    # Read manifest
    icaos = read_manifest(date_str)
    print(f"Total ICAOs in manifest: {len(icaos)}")

    # Process chunk
def main():
    """Reduce step: merge all per-chunk CSVs for RUN_ID into one file.

    Lists the run's gzipped chunk CSVs in S3, downloads and decompresses
    each, concatenates them with polars, applies the global signature
    deduplication, and uploads the final gzipped CSV under ``final/``.
    Configuration comes from environment variables (see module docstring).
    """
    s3_bucket = os.environ["S3_BUCKET"]
    run_id = os.environ.get("RUN_ID", "default")
    global_start = os.environ["GLOBAL_START_DATE"]
    global_end = os.environ["GLOBAL_END_DATE"]

    s3 = boto3.client("s3")
    prefix = f"intermediate/{run_id}/"

    # List all chunk files for this run (paginated: runs may exceed the
    # 1000-key page limit of list_objects_v2)
    paginator = s3.get_paginator("list_objects_v2")
    chunk_keys = []
    for page in paginator.paginate(Bucket=s3_bucket, Prefix=prefix):
        for obj in page.get("Contents", []):
            if obj["Key"].endswith(".csv.gz"):
                chunk_keys.append(obj["Key"])

    chunk_keys.sort()
    print(f"Found {len(chunk_keys)} chunks to combine")

    if not chunk_keys:
        print("No chunks found — nothing to reduce.")
        return

    # Download and concatenate all chunks
    download_dir = Path("/tmp/chunks")
    download_dir.mkdir(parents=True, exist_ok=True)

    dfs = []

    for key in chunk_keys:
        gz_path = download_dir / Path(key).name
        csv_path = gz_path.with_suffix("")  # Remove .gz
        print(f"Downloading {key}...")
        s3.download_file(s3_bucket, key, str(gz_path))

        # Decompress
        with gzip.open(gz_path, 'rb') as f_in:
            with open(csv_path, 'wb') as f_out:
                shutil.copyfileobj(f_in, f_out)
        gz_path.unlink()

        df_chunk = pl.read_csv(csv_path)
        print(f" Loaded {df_chunk.height} rows from {csv_path.name}")
        dfs.append(df_chunk)

        # Free disk space after loading
        csv_path.unlink()

    df_accumulated = pl.concat(dfs) if dfs else pl.DataFrame()
    print(f"Combined: {df_accumulated.height} rows before dedup")

    # Final global deduplication — each map worker only deduped within its
    # own chunk, so cross-chunk duplicates are removed here.
    df_accumulated = deduplicate_by_signature(df_accumulated)
    print(f"After dedup: {df_accumulated.height} rows")

    # Write and upload final result
    output_name = f"planequery_aircraft_adsb_{global_start}_{global_end}.csv.gz"
    csv_output = Path(f"/tmp/planequery_aircraft_adsb_{global_start}_{global_end}.csv")
    gz_output = Path(f"/tmp/{output_name}")

    df_accumulated.write_csv(csv_output)
    with open(csv_output, 'rb') as f_in:
        with gzip.open(gz_output, 'wb') as f_out:
            shutil.copyfileobj(f_in, f_out)
    csv_output.unlink()

    final_key = f"final/{output_name}"
    print(f"Uploading to s3://{s3_bucket}/{final_key}")
    s3.upload_file(str(gz_output), s3_bucket, final_key)

    print(f"Final output: {df_accumulated.height} records -> {final_key}")
def main():
    """Map worker: process [START_DATE, END_DATE) and upload the chunk to S3.

    Loads one day of historical data at a time, deletes the local cache
    after each day to bound container disk usage, deduplicates the
    concatenated result within this chunk, and uploads it as a gzipped CSV
    under ``intermediate/{RUN_ID}/``.

    Raises:
        RuntimeError: If any day in the range yields zero rows (fail fast
            rather than silently upload a partial chunk).
    """
    # Hoisted out of the loop/body: the original re-ran `import shutil`
    # on every loop iteration.
    import gzip
    import shutil

    start_date_str = os.environ["START_DATE"]
    end_date_str = os.environ["END_DATE"]
    s3_bucket = os.environ["S3_BUCKET"]
    run_id = os.environ.get("RUN_ID", "default")

    start_date = datetime.strptime(start_date_str, "%Y-%m-%d")
    end_date = datetime.strptime(end_date_str, "%Y-%m-%d")

    total_days = (end_date - start_date).days
    print(f"Worker: processing {total_days} days [{start_date_str}, {end_date_str})")

    dfs = []
    # Running counter: the original recomputed sum(df.height for df in dfs)
    # every iteration, which is O(days^2) over the whole range.
    total_rows = 0
    current_date = start_date

    while current_date < end_date:
        day_str = current_date.strftime("%Y-%m-%d")
        print(f" Loading {day_str}...")

        df_compressed = load_historical_for_day(current_date)
        if df_compressed.height == 0:
            raise RuntimeError(f"No data found for {day_str}")

        dfs.append(df_compressed)
        total_rows += df_compressed.height
        print(f" +{df_compressed.height} rows (total: {total_rows})")

        # Delete local cache after each day to save disk in container
        cache_dir = Path("data/adsb")
        if cache_dir.exists():
            shutil.rmtree(cache_dir)

        current_date += timedelta(days=1)

    # Concatenate all days
    df_accumulated = pl.concat(dfs) if dfs else pl.DataFrame()

    # Deduplicate within this chunk (the reduce step dedups globally)
    df_accumulated = deduplicate_by_signature(df_accumulated)
    print(f"After dedup: {df_accumulated.height} rows")

    # Write to local file then upload to S3
    local_path = Path(f"/tmp/chunk_{start_date_str}_{end_date_str}.csv")
    df_accumulated.write_csv(local_path)

    # Compress with gzip
    gz_path = Path(f"/tmp/chunk_{start_date_str}_{end_date_str}.csv.gz")
    with open(local_path, 'rb') as f_in:
        with gzip.open(gz_path, 'wb') as f_out:
            shutil.copyfileobj(f_in, f_out)
    local_path.unlink()  # Remove uncompressed file

    s3_key = f"intermediate/{run_id}/chunk_{start_date_str}_{end_date_str}.csv.gz"
    print(f"Uploading to s3://{s3_bucket}/{s3_key}")

    s3 = boto3.client("s3")
    s3.upload_file(str(gz_path), s3_bucket, s3_key)
    print("Done.")
def concatenate_aircraft_csvs(
    input_dir: Path = Path("data/concat"),
    output_dir: Path = Path("data/planequery_aircraft"),
    filename_pattern: str = r"planequery_aircraft_(\d{4}-\d{2}-\d{2})_(\d{4}-\d{2}-\d{2})\.csv"
):
    """
    Read all CSVs matching the pattern from input_dir in order,
    concatenate them using concat_faa_historical_df, and output a single CSV.

    Args:
        input_dir: Directory containing the CSV files to concatenate
        output_dir: Directory where the output CSV will be saved
        filename_pattern: Regex pattern to match CSV filenames

    Returns:
        Path to the written combined CSV.

    Raises:
        FileNotFoundError: If no files match the pattern in input_dir.
        ValueError: If the combined download_date column is not
            monotonically increasing.
    """
    input_dir = Path(input_dir)
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    # Find all matching CSV files, capturing (start_date, end_date)
    pattern = re.compile(filename_pattern)
    csv_files = []
    for csv_path in sorted(input_dir.glob("*.csv")):
        match = pattern.search(csv_path.name)
        if match:
            csv_files.append((match.group(1), match.group(2), csv_path))

    # Sort by start date, then end date (ISO dates sort lexicographically)
    csv_files.sort(key=lambda x: (x[0], x[1]))

    if not csv_files:
        raise FileNotFoundError(f"No CSV files matching pattern found in {input_dir}")

    print(f"Found {len(csv_files)} CSV files to concatenate")

    # These columns must stay strings (identifiers with leading zeros);
    # shared dict replaces the dtype mapping duplicated at both read sites.
    str_dtypes = {
        'transponder_code': str,
        'unique_regulatory_id': str,
        'registrant_county': str
    }

    # Read first CSV as base
    first_start_date, first_end_date, first_path = csv_files[0]
    print(f"Reading base file: {first_path.name}")
    df_base = pd.read_csv(first_path, dtype=str_dtypes)

    # Concatenate remaining CSVs
    for start_date, end_date, csv_path in csv_files[1:]:
        print(f"Concatenating: {csv_path.name}")
        df_new = pd.read_csv(csv_path, dtype=str_dtypes)
        df_base = concat_faa_historical_df(df_base, df_new)

    # Verify monotonic increasing download_date with a real exception:
    # a bare `assert` is stripped when Python runs with -O.
    if not df_base['download_date'].is_monotonic_increasing:
        raise ValueError("download_date is not monotonic increasing")

    # Output filename uses first start date and last end date
    last_start_date, last_end_date, _ = csv_files[-1]
    output_filename = f"planequery_aircraft_{first_start_date}_{last_end_date}.csv"
    output_path = output_dir / output_filename

    print(f"Writing output to: {output_path}")
    df_base.to_csv(output_path, index=False)
    print(f"Successfully concatenated {len(csv_files)} files into {output_filename}")
    print(f"Total rows: {len(df_base)}")

    return output_path
--author "username" --author-id 12345 + +Environment variables: + GITHUB_TOKEN: GitHub API token with repo write permissions + GITHUB_REPOSITORY: owner/repo +""" +import argparse +import base64 +import json +import os +import sys +import urllib.request +import urllib.error +from datetime import datetime, timezone + +from .schema import extract_json_from_issue_body, extract_contributor_name_from_issue_body, parse_and_validate +from .contributor import ( + generate_contributor_uuid, + generate_submission_filename, + compute_content_hash, +) + + +def github_api_request( + method: str, + endpoint: str, + data: dict | None = None, + accept: str = "application/vnd.github.v3+json" +) -> dict: + """Make a GitHub API request.""" + token = os.environ.get("GITHUB_TOKEN") + repo = os.environ.get("GITHUB_REPOSITORY") + + if not token or not repo: + raise EnvironmentError("GITHUB_TOKEN and GITHUB_REPOSITORY must be set") + + url = f"https://api.github.com/repos/{repo}{endpoint}" + headers = { + "Authorization": f"token {token}", + "Accept": accept, + "Content-Type": "application/json", + } + + body = json.dumps(data).encode() if data else None + req = urllib.request.Request(url, data=body, headers=headers, method=method) + + try: + with urllib.request.urlopen(req) as response: + return json.loads(response.read()) + except urllib.error.HTTPError as e: + error_body = e.read().decode() if e.fp else "" + print(f"GitHub API error: {e.code} {e.reason}: {error_body}", file=sys.stderr) + raise + + +def add_issue_comment(issue_number: int, body: str) -> None: + """Add a comment to a GitHub issue.""" + github_api_request("POST", f"/issues/{issue_number}/comments", {"body": body}) + + +def get_default_branch_sha() -> str: + """Get the SHA of the default branch (main).""" + ref = github_api_request("GET", "/git/ref/heads/main") + return ref["object"]["sha"] + + +def create_branch(branch_name: str, sha: str) -> None: + """Create a new branch from a SHA.""" + try: + github_api_request("POST", 
def create_or_update_file(path: str, content: str, message: str, branch: str) -> None:
    """Create or update a file in the repository via the contents API.

    BUG FIX: the GitHub contents endpoint requires the current blob ``sha``
    when the file already exists; without it the PUT fails with 422 and
    "update" never actually worked. We now look up the existing file on the
    target branch and include its sha when present.

    Args:
        path: Repository-relative file path.
        content: File content (plain text; base64-encoded for the API).
        message: Commit message.
        branch: Branch to commit to.
    """
    content_b64 = base64.b64encode(content.encode()).decode()
    payload = {
        "message": message,
        "content": content_b64,
        "branch": branch,
    }

    # Fetch the existing file's sha (404 means "new file" — plain create).
    try:
        existing = github_api_request("GET", f"/contents/{path}?ref={branch}")
        if isinstance(existing, dict) and "sha" in existing:
            payload["sha"] = existing["sha"]
    except urllib.error.HTTPError as e:
        if e.code != 404:
            raise

    github_api_request("PUT", f"/contents/{path}", payload)
def process_submission(
    issue_number: int,
    issue_body: str,
    author_username: str,
    author_id: int,
) -> bool:
    """
    Process an approved submission and create a PR.

    Args:
        issue_number: The GitHub issue number
        issue_body: The issue body text
        author_username: The GitHub username of the issue author
        author_id: The numeric GitHub user ID

    Returns:
        True if successful, False otherwise
    """
    # Extract and validate JSON
    json_str = extract_json_from_issue_body(issue_body)
    if not json_str:
        add_issue_comment(issue_number, "❌ Could not extract JSON from submission.")
        return False

    data, errors = parse_and_validate(json_str)
    if errors:
        error_list = "\n".join(f"- {e}" for e in errors)
        add_issue_comment(issue_number, f"❌ **Validation Failed**\n\n{error_list}")
        return False

    # Normalize to list
    submissions = data if isinstance(data, list) else [data]

    # Generate contributor UUID from GitHub ID (deterministic per account)
    contributor_uuid = generate_contributor_uuid(author_id)

    # Extract contributor name from issue form (or default to GitHub username)
    contributor_name = extract_contributor_name_from_issue_body(issue_body)
    if not contributor_name:
        contributor_name = f"@{author_username}"

    # Add metadata to each submission
    now = datetime.now(timezone.utc)
    date_str = now.strftime("%Y-%m-%d")
    timestamp_str = now.isoformat()

    for submission in submissions:
        submission["contributor_uuid"] = contributor_uuid
        submission["contributor_name"] = contributor_name
        submission["creation_timestamp"] = timestamp_str

    # Generate unique filename and use it for the committed file.
    # BUG FIX: `filename` was previously computed but never used — the
    # file path and PR title were hard-coded instead.
    content_json = json.dumps(submissions, indent=2, sort_keys=True)
    content_hash = compute_content_hash(content_json)
    filename = generate_submission_filename(author_username, date_str, content_hash)
    file_path = f"community/{filename}"

    # Create branch from the default branch head
    branch_name = f"community-submission-{issue_number}"
    default_sha = get_default_branch_sha()
    create_branch(branch_name, default_sha)

    # Create file
    commit_message = f"Add community submission from @{author_username} (closes #{issue_number})"
    create_or_update_file(file_path, content_json, commit_message, branch_name)

    # Create PR
    pr_body = f"""## Community Submission

Adds {len(submissions)} submission(s) from @{author_username}.

**File:** `{file_path}`
**Contributor UUID:** `{contributor_uuid}`

Closes #{issue_number}

---

### Submissions
```json
{content_json}
```"""

    pr = create_pull_request(
        title=f"Community submission: {filename}",
        head=branch_name,
        base="main",
        body=pr_body,
    )

    # Add labels to PR
    add_labels_to_issue(pr["number"], ["community", "auto-generated"])

    # Comment on original issue
    add_issue_comment(
        issue_number,
        f"✅ **Submission Approved**\n\n"
        f"PR #{pr['number']} has been created to add your submission.\n\n"
        f"**File:** `{file_path}`\n"
        f"**Your Contributor UUID:** `{contributor_uuid}`\n\n"
        f"The PR will be merged by a maintainer."
    )

    print(f"Created PR #{pr['number']} for submission")
    return True
def generate_contributor_uuid(github_user_id: int) -> str:
    """
    Generate a deterministic UUID v5 from a GitHub user ID.

    The same GitHub account always maps to the same contributor UUID.
    (The module's DNS_NAMESPACE constant is exactly uuid.NAMESPACE_DNS.)

    Args:
        github_user_id: The numeric GitHub user ID

    Returns:
        UUID string in standard format
    """
    return str(uuid.uuid5(uuid.NAMESPACE_DNS, f"github:{github_user_id}"))


def sanitize_username(username: str, max_length: int = 20) -> str:
    """
    Sanitize a GitHub username for use in filenames.

    Lowercases, replaces every non-alphanumeric character with '_',
    collapses runs of '_', strips leading/trailing '_', and truncates.

    Args:
        username: GitHub username
        max_length: Maximum length of sanitized name

    Returns:
        Lowercase alphanumeric string with underscores
    """
    cleaned = "".join(ch if ch.isalnum() else "_" for ch in username.lower())
    while "__" in cleaned:
        cleaned = cleaned.replace("__", "_")
    return cleaned.strip("_")[:max_length]


def generate_submission_filename(
    username: str,
    date_str: str,
    content_hash: str,
    extension: str = ".json"
) -> str:
    """
    Generate a unique filename for a community submission.

    Format: {sanitized_username}_{date}_{short_hash}{extension}

    Args:
        username: GitHub username
        date_str: Date in YYYY-MM-DD format
        content_hash: Hash of the submission content (truncated to 8 chars)
        extension: File extension (default: .json)

    Returns:
        Unique filename string
    """
    return f"{sanitize_username(username)}_{date_str}_{content_hash[:8]}{extension}"
def compute_content_hash(content: str) -> str:
    """
    Compute SHA256 hash of content.

    Args:
        content: String content to hash (UTF-8 encoded before hashing)

    Returns:
        Hex digest of SHA256 hash
    """
    digest = hashlib.sha256()
    digest.update(content.encode())
    return digest.hexdigest()
def submissions_to_dataframe(submissions: list[dict]) -> pd.DataFrame:
    """
    Convert submissions to a DataFrame with proper column ordering.

    Column order:
    - creation_timestamp (first)
    - transponder_code_hex
    - registration_number
    - planequery_airframe_id
    - contributor_name
    - [other columns alphabetically]
    - contributor_uuid (last)

    Rows are sorted by creation_timestamp ascending (missing values last),
    and any of the required columns absent from the input are added as None.
    """
    if not submissions:
        return pd.DataFrame()

    frame = pd.DataFrame(submissions)

    # Required columns, in their output order (contributor_uuid goes last).
    required = [
        "creation_timestamp",
        "transponder_code_hex",
        "registration_number",
        "planequery_airframe_id",
        "contributor_name",
        "contributor_uuid",
    ]
    for column in required:
        if column not in frame.columns:
            frame[column] = None

    frame = frame.sort_values("creation_timestamp", ascending=True, na_position="last")

    leading = required[:-1]
    trailing = ["contributor_uuid"]
    middle = sorted(
        column for column in frame.columns
        if column not in leading and column not in trailing
    )

    return frame[leading + middle + trailing].reset_index(drop=True)
def main():
    """Generate the daily community contributions CSV.

    Reads every JSON file under community/, flattens them into a single
    DataFrame (see submissions_to_dataframe for ordering), and writes
    ``planequery_aircraft_community_{start}_{end}.csv`` under OUT_ROOT,
    where start is the earliest creation_timestamp and end is today (UTC).

    Returns:
        Path of the written CSV file.
    """
    date_str = datetime.now(timezone.utc).strftime("%Y-%m-%d")

    print(f"Reading community submissions from {COMMUNITY_DIR}")
    submissions = read_all_submissions(COMMUNITY_DIR)

    if not submissions:
        print("No community submissions found.")
        # Still create an empty CSV with headers so downstream consumers
        # always find a file with the expected schema.
        df = pd.DataFrame(columns=[
            "creation_timestamp",
            "transponder_code_hex",
            "registration_number",
            "planequery_airframe_id",
            "contributor_name",
            "tags",
            "contributor_uuid",
        ])
    else:
        print(f"Found {len(submissions)} total submissions")
        df = submissions_to_dataframe(submissions)

    # Determine date range for filename
    if not df.empty and df["creation_timestamp"].notna().any():
        # Get earliest timestamp for start date
        earliest = pd.to_datetime(df["creation_timestamp"]).min()
        start_date_str = earliest.strftime("%Y-%m-%d")
    else:
        # No timestamps available: degenerate range of today..today
        start_date_str = date_str

    # Output
    OUT_ROOT.mkdir(parents=True, exist_ok=True)
    output_file = OUT_ROOT / f"planequery_aircraft_community_{start_date_str}_{date_str}.csv"

    df.to_csv(output_file, index=False)

    print(f"Saved: {output_file}")
    print(f"Total contributions: {len(df)}")

    return output_file
def group_by_identifier(submissions: list[dict]) -> dict[str, list[dict]]:
    """
    Group submissions by their identifier (registration, transponder, or airframe ID).

    Identifier precedence: registration_number, then transponder_code_hex,
    then planequery_airframe_id; submissions with none of them go under
    the "_unknown" bucket.

    Returns:
        Dict mapping identifier to list of submissions for that identifier
    """
    grouped: dict[str, list[dict]] = {}

    for entry in submissions:
        if "registration_number" in entry:
            bucket = f"reg:{entry['registration_number']}"
        elif "transponder_code_hex" in entry:
            bucket = f"icao:{entry['transponder_code_hex']}"
        elif "planequery_airframe_id" in entry:
            bucket = f"id:{entry['planequery_airframe_id']}"
        else:
            bucket = "_unknown"

        grouped.setdefault(bucket, []).append(entry)

    return grouped
def load_schema() -> dict:
    """Load and parse the community submission schema from SCHEMA_PATH."""
    return json.loads(SCHEMA_PATH.read_text())
+ """ + if Draft202012Validator is None: + raise ImportError("jsonschema is required: pip install jsonschema") + + if schema is None: + schema = load_schema() + + submissions = data if isinstance(data, list) else [data] + errors = [] + + validator = Draft202012Validator(schema) + + for i, submission in enumerate(submissions): + prefix = f"[{i}] " if len(submissions) > 1 else "" + for error in validator.iter_errors(submission): + path = ".".join(str(p) for p in error.path) if error.path else "(root)" + errors.append(f"{prefix}{path}: {error.message}") + + return errors + + +def extract_json_from_issue_body(body: str) -> str | None: + """ + Extract JSON from GitHub issue body. + + Looks for JSON in the 'Submission JSON' section wrapped in code blocks. + + Args: + body: The issue body text + + Returns: + Extracted JSON string or None if not found + """ + # Match JSON in "### Submission JSON" section + pattern = r"### Submission JSON\s*\n\s*```(?:json)?\s*\n([\s\S]*?)\n\s*```" + match = re.search(pattern, body) + + if match: + return match.group(1).strip() + + return None + + +def extract_contributor_name_from_issue_body(body: str) -> str | None: + """ + Extract contributor name from GitHub issue body. + + Looks for the 'Contributor Name' field in the issue form. + + Args: + body: The issue body text + + Returns: + Contributor name string or None if not found/empty + """ + # Match "### Contributor Name" section + pattern = r"### Contributor Name\s*\n\s*(.+?)(?=\n###|\n\n|$)" + match = re.search(pattern, body) + + if match: + name = match.group(1).strip() + # GitHub issue forms show "_No response_" for empty optional fields + if name and name != "_No response_": + return name + + return None + + +def parse_and_validate(json_str: str, schema: dict | None = None) -> tuple[list | dict | None, list[str]]: + """ + Parse JSON string and validate against schema. 
+ + Args: + json_str: JSON string to parse + schema: Optional schema dict + + Returns: + Tuple of (parsed data or None, list of errors) + """ + try: + data = json.loads(json_str) + except json.JSONDecodeError as e: + return None, [f"Invalid JSON: {e}"] + + errors = validate_submission(data, schema) + return data, errors diff --git a/src/contributions/validate_submission.py b/src/contributions/validate_submission.py new file mode 100644 index 0000000..e4d45b6 --- /dev/null +++ b/src/contributions/validate_submission.py @@ -0,0 +1,140 @@ +#!/usr/bin/env python3 +""" +Validate a community submission from a GitHub issue. + +This script is called by the GitHub Actions workflow to validate +submissions when issues are opened or edited. + +Usage: + python -m src.contributions.validate_submission --issue-body "..." + python -m src.contributions.validate_submission --file submission.json + echo '{"registration_number": "N12345"}' | python -m src.contributions.validate_submission --stdin + +Environment variables (for GitHub Actions): + GITHUB_TOKEN: GitHub API token + GITHUB_REPOSITORY: owner/repo + ISSUE_NUMBER: Issue number to comment on +""" +import argparse +import json +import os +import sys +import urllib.request +import urllib.error + +from .schema import extract_json_from_issue_body, parse_and_validate, load_schema + + +def github_api_request(method: str, endpoint: str, data: dict | None = None) -> dict: + """Make a GitHub API request.""" + token = os.environ.get("GITHUB_TOKEN") + repo = os.environ.get("GITHUB_REPOSITORY") + + if not token or not repo: + raise EnvironmentError("GITHUB_TOKEN and GITHUB_REPOSITORY must be set") + + url = f"https://api.github.com/repos/{repo}{endpoint}" + headers = { + "Authorization": f"token {token}", + "Accept": "application/vnd.github.v3+json", + "Content-Type": "application/json", + } + + body = json.dumps(data).encode() if data else None + req = urllib.request.Request(url, data=body, headers=headers, method=method) + + with 
urllib.request.urlopen(req) as response: + return json.loads(response.read()) + + +def add_issue_comment(issue_number: int, body: str) -> None: + """Add a comment to a GitHub issue.""" + github_api_request("POST", f"/issues/{issue_number}/comments", {"body": body}) + + +def add_issue_label(issue_number: int, label: str) -> None: + """Add a label to a GitHub issue.""" + github_api_request("POST", f"/issues/{issue_number}/labels", {"labels": [label]}) + + +def remove_issue_label(issue_number: int, label: str) -> None: + """Remove a label from a GitHub issue.""" + try: + github_api_request("DELETE", f"/issues/{issue_number}/labels/{label}") + except urllib.error.HTTPError: + pass # Label might not exist + + +def validate_and_report(json_str: str, issue_number: int | None = None) -> bool: + """ + Validate JSON and optionally report to GitHub issue. + + Args: + json_str: JSON string to validate + issue_number: Optional issue number to comment on + + Returns: + True if validation passed, False otherwise + """ + data, errors = parse_and_validate(json_str) + + if errors: + error_list = "\n".join(f"- {e}" for e in errors) + message = f"❌ **Validation Failed**\n\n{error_list}\n\nPlease fix the errors and edit your submission." + + print(message, file=sys.stderr) + + if issue_number: + add_issue_comment(issue_number, message) + remove_issue_label(issue_number, "validated") + + return False + + count = len(data) if isinstance(data, list) else 1 + message = f"✅ **Validation Passed**\n\n{count} submission(s) validated successfully against the schema.\n\nA maintainer can approve this submission by adding the `approved` label." 
+ + print(message) + + if issue_number: + add_issue_comment(issue_number, message) + add_issue_label(issue_number, "validated") + + return True + + +def main(): + parser = argparse.ArgumentParser(description="Validate community submission JSON") + source_group = parser.add_mutually_exclusive_group(required=True) + source_group.add_argument("--issue-body", help="Issue body text containing JSON") + source_group.add_argument("--file", help="JSON file to validate") + source_group.add_argument("--stdin", action="store_true", help="Read JSON from stdin") + + parser.add_argument("--issue-number", type=int, help="GitHub issue number to comment on") + + args = parser.parse_args() + + # Get JSON string + if args.issue_body: + json_str = extract_json_from_issue_body(args.issue_body) + if not json_str: + print("❌ Could not extract JSON from issue body", file=sys.stderr) + if args.issue_number: + add_issue_comment( + args.issue_number, + "❌ **Validation Failed**\n\nCould not extract JSON from submission. " + "Please ensure your JSON is in the 'Submission JSON' field wrapped in code blocks." 
+                )
+            sys.exit(1)
+    elif args.file:
+        with open(args.file) as f:
+            json_str = f.read()
+    else:  # stdin
+        json_str = sys.stdin.read()
+
+    # Validate
+    success = validate_and_report(json_str, args.issue_number)
+    sys.exit(0 if success else 1)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/src/create_daily_planequery_aircraft_adsb_release.py b/src/create_daily_planequery_aircraft_adsb_release.py
new file mode 100644
index 0000000..e5de1f8
--- /dev/null
+++ b/src/create_daily_planequery_aircraft_adsb_release.py
@@ -0,0 +1,84 @@
+from pathlib import Path
+from datetime import datetime, timezone, timedelta
+import sys
+
+import polars as pl
+
+# Add adsb directory to path
+sys.path.insert(0, str(Path(__file__).parent / "adsb"))  # TODO: Fix this hacky path manipulation
+
+from adsb.compress_adsb_to_aircraft_data import (
+    load_historical_for_day,
+    concat_compressed_dfs,
+    get_latest_aircraft_adsb_csv_df,
+)
+
+if __name__ == '__main__':
+    # Get yesterday's date (data for the previous day)
+    day = datetime.now(timezone.utc) - timedelta(days=1)
+
+    # Find a day with complete data
+    max_attempts = 2  # Look back at most 2 days (NOTE(review): earlier comment said "a week" — confirm intended window)
+    for attempt in range(max_attempts):
+        date_str = day.strftime("%Y-%m-%d")
+        print(f"Processing ADS-B data for {date_str}")
+
+        print("Loading new ADS-B data...")
+        df_new = load_historical_for_day(day)
+        if df_new.height == 0:
+            day = day - timedelta(days=1)
+            continue
+        max_time = df_new['time'].max()
+        if max_time is not None:
+            # Tag the timestamp as UTC so it compares against aware datetimes (assumes stored times are UTC — TODO confirm)
+            max_time_dt = max_time
+            if hasattr(max_time_dt, 'replace'):
+                max_time_dt = max_time_dt.replace(tzinfo=timezone.utc)
+
+            end_of_day = day.replace(hour=23, minute=59, second=59, tzinfo=timezone.utc) - timedelta(minutes=5)
+
+            # Compare as a datetime when possible; otherwise fall back to a raw-value comparison below
+            if isinstance(max_time_dt, datetime):
+                if max_time_dt.replace(tzinfo=timezone.utc) >= end_of_day:
+                    break
+            else:
+                # Non-datetime scalar: compare directly against the 23:54:59 cutoff
+                if max_time >= day.replace(hour=23, minute=54, 
second=59): + break + + print(f"WARNING: Latest data time is {max_time}, which is more than 5 minutes before end of day.") + day = day - timedelta(days=1) + else: + raise RuntimeError(f"Could not find complete data in the last {max_attempts} days") + + try: + # Get the latest release data + print("Downloading latest ADS-B release...") + df_base, start_date_str = get_latest_aircraft_adsb_csv_df() + # Combine with historical data + print("Combining with historical data...") + df_combined = concat_compressed_dfs(df_base, df_new) + except Exception as e: + print(f"Error downloading latest ADS-B release: {e}") + df_combined = df_new + start_date_str = date_str + + # Sort by time for consistent ordering + df_combined = df_combined.sort('time') + + # Convert any list columns to strings for CSV compatibility + for col in df_combined.columns: + if df_combined[col].dtype == pl.List: + df_combined = df_combined.with_columns( + pl.col(col).list.join(",").alias(col) + ) + + # Save the result + OUT_ROOT = Path("data/planequery_aircraft") + OUT_ROOT.mkdir(parents=True, exist_ok=True) + + output_file = OUT_ROOT / f"planequery_aircraft_adsb_{start_date_str}_{date_str}.csv" + df_combined.write_csv(output_file) + + print(f"Saved: {output_file}") + print(f"Total aircraft: {df_combined.height}") diff --git a/src/create_daily_planequery_aircraft_release.py b/src/create_daily_planequery_aircraft_release.py index 4019aa7..559f8fc 100644 --- a/src/create_daily_planequery_aircraft_release.py +++ b/src/create_daily_planequery_aircraft_release.py @@ -25,9 +25,9 @@ if not zip_path.exists(): OUT_ROOT = Path("data/planequery_aircraft") OUT_ROOT.mkdir(parents=True, exist_ok=True) from derive_from_faa_master_txt import convert_faa_master_txt_to_df, concat_faa_historical_df -from get_latest_planequery_aircraft_release import get_latest_aircraft_csv_df +from get_latest_planequery_aircraft_release import get_latest_aircraft_faa_csv_df df_new = convert_faa_master_txt_to_df(zip_path, date_str) 
-df_base, start_date_str = get_latest_aircraft_csv_df() +df_base, start_date_str = get_latest_aircraft_faa_csv_df() df_base = concat_faa_historical_df(df_base, df_new) assert df_base['download_date'].is_monotonic_increasing, "download_date is not monotonic increasing" -df_base.to_csv(OUT_ROOT / f"planequery_aircraft_{start_date_str}_{date_str}.csv", index=False) \ No newline at end of file +df_base.to_csv(OUT_ROOT / f"planequery_aircraft_faa_{start_date_str}_{date_str}.csv", index=False) \ No newline at end of file diff --git a/src/get_historical_faa.py b/src/get_historical_faa.py index 51cdcd4..656345e 100644 --- a/src/get_historical_faa.py +++ b/src/get_historical_faa.py @@ -112,5 +112,5 @@ for date, sha in date_to_sha.items(): print(len(df_base), "total entries so far") assert df_base['download_date'].is_monotonic_increasing, "download_date is not monotonic increasing" -df_base.to_csv(OUT_ROOT / f"planequery_aircraft_{start_date}_{end_date}.csv", index=False) +df_base.to_csv(OUT_ROOT / f"planequery_aircraft_faa_{start_date}_{end_date}.csv", index=False) # TODO: get average number of new rows per day. diff --git a/src/get_latest_planequery_aircraft_release.py b/src/get_latest_planequery_aircraft_release.py index c264250..9867a5d 100644 --- a/src/get_latest_planequery_aircraft_release.py +++ b/src/get_latest_planequery_aircraft_release.py @@ -109,7 +109,7 @@ def download_latest_aircraft_csv( repo: str = REPO, ) -> Path: """ - Download the latest planequery_aircraft_*.csv file from the latest GitHub release. + Download the latest planequery_aircraft_faa_*.csv file from the latest GitHub release. 
Args: output_dir: Directory to save the downloaded file (default: "downloads") @@ -120,25 +120,70 @@ def download_latest_aircraft_csv( Path to the downloaded file """ assets = get_latest_release_assets(repo, github_token=github_token) - asset = pick_asset(assets, name_regex=r"^planequery_aircraft_.*\.csv$") + try: + asset = pick_asset(assets, name_regex=r"^planequery_aircraft_faa_.*\.csv$") + except FileNotFoundError: + # Fallback to old naming pattern + asset = pick_asset(assets, name_regex=r"^planequery_aircraft_\d{4}-\d{2}-\d{2}_.*\.csv$") saved_to = download_asset(asset, output_dir / asset.name, github_token=github_token) print(f"Downloaded: {asset.name} ({asset.size} bytes) -> {saved_to}") return saved_to -def get_latest_aircraft_csv_df(): +def get_latest_aircraft_faa_csv_df(): csv_path = download_latest_aircraft_csv() import pandas as pd df = pd.read_csv(csv_path, dtype={'transponder_code': str, 'unique_regulatory_id': str, 'registrant_county': str}) df = df.fillna("") - # Extract date from filename pattern: planequery_aircraft_{date}_{date}.csv - match = re.search(r"planequery_aircraft_(\d{4}-\d{2}-\d{2})_", str(csv_path)) + # Extract start date from filename pattern: planequery_aircraft_faa_{start_date}_{end_date}.csv + match = re.search(r"planequery_aircraft_faa_(\d{4}-\d{2}-\d{2})_", str(csv_path)) + if not match: + # Fallback to old naming pattern: planequery_aircraft_{start_date}_{end_date}.csv + match = re.search(r"planequery_aircraft_(\d{4}-\d{2}-\d{2})_", str(csv_path)) if not match: raise ValueError(f"Could not extract date from filename: {csv_path.name}") date_str = match.group(1) return df, date_str + +def download_latest_aircraft_adsb_csv( + output_dir: Path = Path("downloads"), + github_token: Optional[str] = None, + repo: str = REPO, +) -> Path: + """ + Download the latest planequery_aircraft_adsb_*.csv file from the latest GitHub release. 
+ + Args: + output_dir: Directory to save the downloaded file (default: "downloads") + github_token: Optional GitHub token for authentication + repo: GitHub repository in format "owner/repo" (default: REPO) + + Returns: + Path to the downloaded file + """ + assets = get_latest_release_assets(repo, github_token=github_token) + asset = pick_asset(assets, name_regex=r"^planequery_aircraft_adsb_.*\.csv$") + saved_to = download_asset(asset, output_dir / asset.name, github_token=github_token) + print(f"Downloaded: {asset.name} ({asset.size} bytes) -> {saved_to}") + return saved_to + + +def get_latest_aircraft_adsb_csv_df(): + csv_path = download_latest_aircraft_adsb_csv() + import pandas as pd + df = pd.read_csv(csv_path) + df = df.fillna("") + # Extract start date from filename pattern: planequery_aircraft_adsb_{start_date}_{end_date}.csv + match = re.search(r"planequery_aircraft_adsb_(\d{4}-\d{2}-\d{2})_", str(csv_path)) + if not match: + raise ValueError(f"Could not extract date from filename: {csv_path.name}") + + date_str = match.group(1) + return df, date_str + + if __name__ == "__main__": download_latest_aircraft_csv() diff --git a/trigger_pipeline.py b/trigger_pipeline.py new file mode 100644 index 0000000..56f47d8 --- /dev/null +++ b/trigger_pipeline.py @@ -0,0 +1,90 @@ +""" +Generate Step Functions input and start the pipeline. 
+ +Usage: + python trigger_pipeline.py 2024-01-01 2025-01-01 + python trigger_pipeline.py 2024-01-01 2025-01-01 --chunk-days 30 + python trigger_pipeline.py 2024-01-01 2025-01-01 --dry-run +""" +import argparse +import json +import os +import uuid +from datetime import datetime, timedelta + +import boto3 + + +def generate_chunks(start_date: str, end_date: str, chunk_days: int = 1): + """Split a date range into chunks of chunk_days.""" + start = datetime.strptime(start_date, "%Y-%m-%d") + end = datetime.strptime(end_date, "%Y-%m-%d") + + chunks = [] + current = start + while current < end: + chunk_end = min(current + timedelta(days=chunk_days), end) + chunks.append({ + "start_date": current.strftime("%Y-%m-%d"), + "end_date": chunk_end.strftime("%Y-%m-%d"), + }) + current = chunk_end + + return chunks + + +def main(): + parser = argparse.ArgumentParser(description="Trigger ADS-B map-reduce pipeline") + parser.add_argument("start_date", help="Start date (YYYY-MM-DD, inclusive)") + parser.add_argument("end_date", help="End date (YYYY-MM-DD, exclusive)") + parser.add_argument("--chunk-days", type=int, default=1, + help="Days per chunk (default: 1)") + parser.add_argument("--dry-run", action="store_true", + help="Print input JSON without starting execution") + args = parser.parse_args() + + run_id = f"run-{datetime.utcnow().strftime('%Y%m%dT%H%M%S')}-{uuid.uuid4().hex[:8]}" + chunks = generate_chunks(args.start_date, args.end_date, args.chunk_days) + + # Inject run_id into each chunk + for chunk in chunks: + chunk["run_id"] = run_id + + sfn_input = { + "run_id": run_id, + "global_start_date": args.start_date, + "global_end_date": args.end_date, + "chunks": chunks, + } + + print(f"Run ID: {run_id}") + print(f"Chunks: {len(chunks)} (at {args.chunk_days} days each)") + print(f"Max concurrency: 3 (enforced by Step Functions Map state)") + print() + print(json.dumps(sfn_input, indent=2)) + + if args.dry_run: + print("\n--dry-run: not starting execution") + return + + client 
= boto3.client("stepfunctions") + + # Find the state machine ARN + machines = client.list_state_machines()["stateMachines"] + arn = next( + m["stateMachineArn"] + for m in machines + if m["name"] == "adsb-map-reduce" + ) + + response = client.start_execution( + stateMachineArn=arn, + name=run_id, + input=json.dumps(sfn_input), + ) + + print(f"\nStarted execution: {response['executionArn']}") + + +if __name__ == "__main__": + main()