From 722bcdf791c0ce033f807378e7cc9f3c81d7b90e Mon Sep 17 00:00:00 2001 From: ggman12 Date: Wed, 11 Feb 2026 14:04:27 -0500 Subject: [PATCH] FEATURE: Add contributions framework. Fix and improve daily adsb release using Github actions for map reduce. --- .../ISSUE_TEMPLATE/community_submission.yaml | 89 ++++++ .../approve-community-submission.yaml | 46 +++ .../planequery-aircraft-daily-release.yaml | 247 +++++++++++++++- .../validate-community-submission.yaml | 30 ++ infra/stack.py | 16 +- requirements.txt | 4 +- schemas/community_submission.v1.schema.json | 80 ++++++ src/adsb/Dockerfile.reducer | 1 + src/adsb/Dockerfile.worker | 1 + src/adsb/adsb_to_aircraft_data_daily.py | 7 - src/adsb/adsb_to_aircraft_data_historical.py | 87 ------ src/adsb/combine_chunks_to_csv.py | 205 +++++++++++++ src/adsb/compress_adsb_to_aircraft_data.py | 250 +++++++++++----- src/adsb/download_adsb_data_to_parquet.py | 183 +++++++----- src/adsb/download_and_list_icaos.py | 148 ++++++++++ src/adsb/process_icao_chunk.py | 270 ++++++++++++++++++ src/adsb/reducer.py | 60 ++-- src/adsb/requirements.reducer.txt | 2 +- src/adsb/requirements.worker.txt | 5 +- src/adsb/worker.py | 61 ++-- src/contributions/__init__.py | 1 + src/contributions/approve_submission.py | 249 ++++++++++++++++ src/contributions/contributor.py | 86 ++++++ .../create_daily_community_release.py | 141 +++++++++ src/contributions/read_community_data.py | 115 ++++++++ src/contributions/schema.py | 117 ++++++++ src/contributions/validate_submission.py | 140 +++++++++ ..._daily_planequery_aircraft_adsb_release.py | 40 ++- trigger_pipeline.py | 9 +- 29 files changed, 2347 insertions(+), 343 deletions(-) create mode 100644 .github/ISSUE_TEMPLATE/community_submission.yaml create mode 100644 .github/workflows/approve-community-submission.yaml create mode 100644 .github/workflows/validate-community-submission.yaml create mode 100644 schemas/community_submission.v1.schema.json delete mode 100644 src/adsb/adsb_to_aircraft_data_daily.py 
delete mode 100644 src/adsb/adsb_to_aircraft_data_historical.py create mode 100644 src/adsb/combine_chunks_to_csv.py create mode 100644 src/adsb/download_and_list_icaos.py create mode 100644 src/adsb/process_icao_chunk.py create mode 100644 src/contributions/__init__.py create mode 100644 src/contributions/approve_submission.py create mode 100644 src/contributions/contributor.py create mode 100644 src/contributions/create_daily_community_release.py create mode 100644 src/contributions/read_community_data.py create mode 100644 src/contributions/schema.py create mode 100644 src/contributions/validate_submission.py diff --git a/.github/ISSUE_TEMPLATE/community_submission.yaml b/.github/ISSUE_TEMPLATE/community_submission.yaml new file mode 100644 index 0000000..104aa8e --- /dev/null +++ b/.github/ISSUE_TEMPLATE/community_submission.yaml @@ -0,0 +1,89 @@ +name: Community submission (JSON) +description: Submit one or more community records (JSON) to be reviewed and approved. +title: "Community submission: " +labels: + - community + - submission +body: + - type: markdown + attributes: + value: | + Submit **one object** or an **array of objects** that matches the community submission schema. + + **Rules (enforced on review/automation):** + - Each object must include **at least one** of: + - `registration_number` + - `transponder_code_hex` (6 hex chars) + - `planequery_airframe_id` + - Your contributor name (entered below) will be applied to all objects. + - `contributor_uuid` is derived from your GitHub account automatically. + - `creation_timestamp` is created by the system (you may omit it). 
+ + **Example: single object** + ```json + { + "transponder_code_hex": "a1b2c3" + } + ``` + + **Example: multiple objects (array)** + ```json + [ + { + "registration_number": "N123AB" + }, + { + "planequery_airframe_id": "cessna|172s|12345", + "transponder_code_hex": "0f1234" + } + ] + ``` + + - type: input + id: contributor_name + attributes: + label: Contributor Name + description: Your display name for attribution. Leave blank to use your GitHub username. Max 150 characters. + placeholder: "e.g., JamesBerry.com or leave blank" + validations: + required: false + + - type: textarea + id: submission_json + attributes: + label: Submission JSON + description: Paste either one JSON object or an array of JSON objects. Must be valid JSON. Do not include contributor_name or contributor_uuid in your JSON. + placeholder: | + Paste JSON here... + validations: + required: true + + - type: dropdown + id: submission_type + attributes: + label: What did you submit? + options: + - Single object + - Multiple objects (array) + validations: + required: true + + - type: checkboxes + id: confirmations + attributes: + label: Confirmations + options: + - label: "I confirm this is valid JSON (not JSONL) and matches the field names exactly." + required: true + - label: "I confirm `transponder_code_hex` values (if provided) are 6 hex characters." + required: true + - label: "I understand submissions are reviewed and may be rejected or require changes." + required: true + + - type: textarea + id: notes + attributes: + label: Notes (optional) + description: Any context, sources, or links that help validate your submission. 
+ validations: + required: false \ No newline at end of file diff --git a/.github/workflows/approve-community-submission.yaml b/.github/workflows/approve-community-submission.yaml new file mode 100644 index 0000000..19f9fbd --- /dev/null +++ b/.github/workflows/approve-community-submission.yaml @@ -0,0 +1,46 @@ +name: Approve Community Submission + +on: + issues: + types: [labeled] + +permissions: + contents: write + pull-requests: write + issues: write + +jobs: + approve: + if: github.event.label.name == 'approved' && contains(github.event.issue.labels.*.name, 'validated') + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Setup Python + uses: actions/setup-python@v5 + with: + python-version: "3.12" + + - name: Install dependencies + run: pip install jsonschema + + - name: Get issue author ID + id: author + uses: actions/github-script@v7 + with: + script: | + const issue = context.payload.issue; + core.setOutput('username', issue.user.login); + core.setOutput('user_id', issue.user.id); + + - name: Process and create PR + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + GITHUB_REPOSITORY: ${{ github.repository }} + run: | + python -m src.contributions.approve_submission \ + --issue-number ${{ github.event.issue.number }} \ + --issue-body "${{ github.event.issue.body }}" \ + --author "${{ steps.author.outputs.username }}" \ + --author-id ${{ steps.author.outputs.user_id }} diff --git a/.github/workflows/planequery-aircraft-daily-release.yaml b/.github/workflows/planequery-aircraft-daily-release.yaml index 3cb65f9..37c96d1 100644 --- a/.github/workflows/planequery-aircraft-daily-release.yaml +++ b/.github/workflows/planequery-aircraft-daily-release.yaml @@ -36,10 +36,9 @@ jobs: ref: 'develop' }); - build-and-release: + build-faa: runs-on: ubuntu-24.04-arm if: github.event_name != 'schedule' - steps: - name: Checkout uses: actions/checkout@v6 @@ -56,17 +55,227 @@ jobs: python -m pip install --upgrade pip pip install -r 
requirements.txt - - name: Run daily release script - env: - CLICKHOUSE_HOST: ${{ secrets.CLICKHOUSE_HOST }} - CLICKHOUSE_USERNAME: ${{ secrets.CLICKHOUSE_USERNAME }} - CLICKHOUSE_PASSWORD: ${{ secrets.CLICKHOUSE_PASSWORD }} + - name: Run FAA release script run: | python src/create_daily_planequery_aircraft_release.py - python src/create_daily_planequery_aircraft_adsb_release.py ls -lah data/faa_releasable ls -lah data/planequery_aircraft + - name: Upload FAA artifacts + uses: actions/upload-artifact@v4 + with: + name: faa-release + path: | + data/planequery_aircraft/planequery_aircraft_faa_*.csv + data/faa_releasable/ReleasableAircraft_*.zip + retention-days: 1 + + adsb-extract: + runs-on: ubuntu-24.04-arm + if: github.event_name != 'schedule' + outputs: + manifest-exists: ${{ steps.check.outputs.exists }} + steps: + - name: Checkout + uses: actions/checkout@v6 + with: + fetch-depth: 0 + + - name: Setup Python + uses: actions/setup-python@v6 + with: + python-version: "3.14" + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements.txt + + - name: Download and extract ADS-B data + run: | + python -m src.adsb.download_and_list_icaos + ls -lah data/output/ + + - name: Check manifest exists + id: check + run: | + if ls data/output/icao_manifest_*.txt 1>/dev/null 2>&1; then + echo "exists=true" >> "$GITHUB_OUTPUT" + else + echo "exists=false" >> "$GITHUB_OUTPUT" + fi + + - name: Create tar of extracted data + run: | + cd data/output + tar -cf extracted_data.tar *-planes-readsb-prod-0.tar_0 icao_manifest_*.txt + ls -lah extracted_data.tar + + - name: Upload extracted data + uses: actions/upload-artifact@v4 + with: + name: adsb-extracted + path: data/output/extracted_data.tar + retention-days: 1 + compression-level: 0 # Already compressed trace files + + adsb-map: + runs-on: ubuntu-24.04-arm + needs: adsb-extract + if: github.event_name != 'schedule' && needs.adsb-extract.outputs.manifest-exists == 'true' + strategy: + 
fail-fast: false + matrix: + chunk: [0, 1, 2, 3] + steps: + - name: Checkout + uses: actions/checkout@v6 + with: + fetch-depth: 0 + + - name: Setup Python + uses: actions/setup-python@v6 + with: + python-version: "3.14" + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements.txt + + - name: Download extracted data + uses: actions/download-artifact@v4 + with: + name: adsb-extracted + path: data/output/ + + - name: Extract tar + run: | + cd data/output + tar -xf extracted_data.tar + rm extracted_data.tar + echo "=== Contents of data/output ===" + ls -lah + echo "=== Looking for manifest ===" + cat icao_manifest_*.txt | head -20 || echo "No manifest found" + echo "=== Looking for extracted dirs ===" + ls -d *-planes-readsb-prod-0* 2>/dev/null || echo "No extracted dirs" + + - name: Process chunk ${{ matrix.chunk }} + run: | + python -m src.adsb.process_icao_chunk --chunk-id ${{ matrix.chunk }} --total-chunks 4 + mkdir -p data/output/adsb_chunks + ls -lah data/output/adsb_chunks/ || echo "No chunks created" + + - name: Upload chunk artifacts + uses: actions/upload-artifact@v4 + with: + name: adsb-chunk-${{ matrix.chunk }} + path: data/output/adsb_chunks/ + retention-days: 1 + + adsb-reduce: + runs-on: ubuntu-24.04-arm + needs: adsb-map + if: github.event_name != 'schedule' + steps: + - name: Checkout + uses: actions/checkout@v6 + with: + fetch-depth: 0 + + - name: Setup Python + uses: actions/setup-python@v6 + with: + python-version: "3.14" + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements.txt + + - name: Download all chunk artifacts + uses: actions/download-artifact@v4 + with: + pattern: adsb-chunk-* + path: data/output/adsb_chunks/ + merge-multiple: true + + - name: Debug downloaded files + run: | + echo "=== Listing data/ ===" + find data/ -type f 2>/dev/null | head -50 || echo "No files in data/" + echo "=== Looking for parquet files ===" + find . 
-name "*.parquet" 2>/dev/null | head -20 || echo "No parquet files found" + + - name: Combine chunks to CSV + run: | + mkdir -p data/output/adsb_chunks + ls -lah data/output/adsb_chunks/ || echo "Directory empty or does not exist" + python -m src.adsb.combine_chunks_to_csv --chunks-dir data/output/adsb_chunks + ls -lah data/planequery_aircraft/ + + - name: Upload ADS-B artifacts + uses: actions/upload-artifact@v4 + with: + name: adsb-release + path: data/planequery_aircraft/planequery_aircraft_adsb_*.csv + retention-days: 1 + + build-community: + runs-on: ubuntu-latest + if: github.event_name != 'schedule' + steps: + - name: Checkout + uses: actions/checkout@v6 + with: + fetch-depth: 0 + + - name: Setup Python + uses: actions/setup-python@v6 + with: + python-version: "3.14" + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install pandas + + - name: Run Community release script + run: | + python -m src.contributions.create_daily_community_release + ls -lah data/planequery_aircraft + + - name: Upload Community artifacts + uses: actions/upload-artifact@v4 + with: + name: community-release + path: data/planequery_aircraft/planequery_aircraft_community_*.csv + retention-days: 1 + + create-release: + runs-on: ubuntu-latest + needs: [build-faa, adsb-reduce, build-community] + if: github.event_name != 'schedule' + steps: + - name: Download FAA artifacts + uses: actions/download-artifact@v4 + with: + name: faa-release + path: artifacts/faa + + - name: Download ADS-B artifacts + uses: actions/download-artifact@v4 + with: + name: adsb-release + path: artifacts/adsb + + - name: Download Community artifacts + uses: actions/download-artifact@v4 + with: + name: community-release + path: artifacts/community + - name: Prepare release metadata id: meta run: | @@ -79,17 +288,27 @@ jobs: BRANCH_SUFFIX="-develop" fi TAG="planequery-aircraft-${DATE}${BRANCH_SUFFIX}" - # Find the CSV files in data/planequery_aircraft matching the patterns - 
CSV_FILE_FAA=$(ls data/planequery_aircraft/planequery_aircraft_faa_*_${DATE}.csv | head -1) + + # Find files from artifacts + CSV_FILE_FAA=$(ls artifacts/faa/data/planequery_aircraft/planequery_aircraft_faa_*.csv | head -1) CSV_BASENAME_FAA=$(basename "$CSV_FILE_FAA") - CSV_FILE_ADSB=$(ls data/planequery_aircraft/planequery_aircraft_adsb_*_${DATE}.csv | head -1) + CSV_FILE_ADSB=$(ls artifacts/adsb/planequery_aircraft_adsb_*.csv | head -1) CSV_BASENAME_ADSB=$(basename "$CSV_FILE_ADSB") + CSV_FILE_COMMUNITY=$(ls artifacts/community/planequery_aircraft_community_*.csv 2>/dev/null | head -1 || echo "") + CSV_BASENAME_COMMUNITY=$(basename "$CSV_FILE_COMMUNITY" 2>/dev/null || echo "") + ZIP_FILE=$(ls artifacts/faa/data/faa_releasable/ReleasableAircraft_*.zip | head -1) + ZIP_BASENAME=$(basename "$ZIP_FILE") + echo "date=$DATE" >> "$GITHUB_OUTPUT" echo "tag=$TAG" >> "$GITHUB_OUTPUT" echo "csv_file_faa=$CSV_FILE_FAA" >> "$GITHUB_OUTPUT" echo "csv_basename_faa=$CSV_BASENAME_FAA" >> "$GITHUB_OUTPUT" echo "csv_file_adsb=$CSV_FILE_ADSB" >> "$GITHUB_OUTPUT" echo "csv_basename_adsb=$CSV_BASENAME_ADSB" >> "$GITHUB_OUTPUT" + echo "csv_file_community=$CSV_FILE_COMMUNITY" >> "$GITHUB_OUTPUT" + echo "csv_basename_community=$CSV_BASENAME_COMMUNITY" >> "$GITHUB_OUTPUT" + echo "zip_file=$ZIP_FILE" >> "$GITHUB_OUTPUT" + echo "zip_basename=$ZIP_BASENAME" >> "$GITHUB_OUTPUT" echo "name=planequery-aircraft snapshot ($DATE)${BRANCH_SUFFIX}" >> "$GITHUB_OUTPUT" - name: Create GitHub Release and upload assets @@ -103,10 +322,12 @@ jobs: Assets: - ${{ steps.meta.outputs.csv_basename_faa }} - ${{ steps.meta.outputs.csv_basename_adsb }} - - ReleasableAircraft_${{ steps.meta.outputs.date }}.zip + - ${{ steps.meta.outputs.csv_basename_community }} + - ${{ steps.meta.outputs.zip_basename }} files: | ${{ steps.meta.outputs.csv_file_faa }} ${{ steps.meta.outputs.csv_file_adsb }} - data/faa_releasable/ReleasableAircraft_${{ steps.meta.outputs.date }}.zip + ${{ steps.meta.outputs.csv_file_community }} + 
${{ steps.meta.outputs.zip_file }} env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} diff --git a/.github/workflows/validate-community-submission.yaml b/.github/workflows/validate-community-submission.yaml new file mode 100644 index 0000000..e217401 --- /dev/null +++ b/.github/workflows/validate-community-submission.yaml @@ -0,0 +1,30 @@ +name: Validate Community Submission + +on: + issues: + types: [opened, edited] + +jobs: + validate: + if: contains(github.event.issue.labels.*.name, 'submission') + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Setup Python + uses: actions/setup-python@v5 + with: + python-version: "3.12" + + - name: Install dependencies + run: pip install jsonschema + + - name: Validate submission + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + GITHUB_REPOSITORY: ${{ github.repository }} + run: | + python -m src.contributions.validate_submission \ + --issue-body "${{ github.event.issue.body }}" \ + --issue-number ${{ github.event.issue.number }} diff --git a/infra/stack.py b/infra/stack.py index 84f6d49..a54bd79 100644 --- a/infra/stack.py +++ b/infra/stack.py @@ -79,8 +79,8 @@ class AdsbProcessingStack(Stack): # --- MAP: worker task definition --- map_task_def = ecs.FargateTaskDefinition( self, "MapTaskDef", - cpu=16384, # 16 vCPU - memory_limit_mib=65536, # 64 GB — high memory for pandas operations on large datasets + cpu=4096, # 4 vCPU + memory_limit_mib=30720, # 30 GB task_role=task_role, runtime_platform=ecs.RuntimePlatform( cpu_architecture=ecs.CpuArchitecture.ARM64, @@ -149,18 +149,6 @@ class AdsbProcessingStack(Stack): name="RUN_ID", value=sfn.JsonPath.string_at("$.run_id"), ), - sfn_tasks.TaskEnvironmentVariable( - name="CLICKHOUSE_HOST", - value=sfn.JsonPath.string_at("$.clickhouse_host"), - ), - sfn_tasks.TaskEnvironmentVariable( - name="CLICKHOUSE_USERNAME", - value=sfn.JsonPath.string_at("$.clickhouse_username"), - ), - sfn_tasks.TaskEnvironmentVariable( - name="CLICKHOUSE_PASSWORD", - 
value=sfn.JsonPath.string_at("$.clickhouse_password"), - ), ], ) ], diff --git a/requirements.txt b/requirements.txt index 387292e..6a4ec9a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,5 @@ faa-aircraft-registry==0.1.0 pandas==3.0.0 -clickhouse-connect==0.10.0 +pyarrow==23.0.0 +orjson==3.11.7 +polars==1.38.1 diff --git a/schemas/community_submission.v1.schema.json b/schemas/community_submission.v1.schema.json new file mode 100644 index 0000000..18609a4 --- /dev/null +++ b/schemas/community_submission.v1.schema.json @@ -0,0 +1,80 @@ +{ + "$schema": "https://json-schema.org/draft/2020-12/schema", + "title": "PlaneQuery Aircraft Community Submission (v1)", + "type": "object", + "additionalProperties": false, + + "properties": { + "registration_number": { + "type": "string", + "minLength": 1 + }, + "transponder_code_hex": { + "type": "string", + "pattern": "^[0-9A-Fa-f]{6}$" + }, + "planequery_airframe_id": { + "type": "string", + "minLength": 1 + }, + + "contributor_uuid": { + "type": "string", + "format": "uuid" + }, + "contributor_name": { + "type": "string", + "minLength": 0, + "maxLength": 150, + "description": "Display name (may be blank)" + }, + + "creation_timestamp": { + "type": "string", + "format": "date-time", + "description": "Set by the system when the submission is persisted/approved.", + "readOnly": true + }, + + "tags": { + "type": "object", + "description": "Additional community-defined tags as key/value pairs (values may be scalar, array, or object).", + "propertyNames": { + "type": "string", + "pattern": "^[a-z][a-z0-9_]{0,63}$" + }, + "additionalProperties": { "$ref": "#/$defs/tagValue" } + } + }, + + "allOf": [ + { + "anyOf": [ + { "required": ["registration_number"] }, + { "required": ["transponder_code_hex"] }, + { "required": ["planequery_airframe_id"] } + ] + } + ], + + "$defs": { + "tagScalar": { + "type": ["string", "number", "integer", "boolean", "null"] + }, + "tagValue": { + "anyOf": [ + { "$ref": "#/$defs/tagScalar" }, 
+ { + "type": "array", + "maxItems": 50, + "items": { "$ref": "#/$defs/tagScalar" } + }, + { + "type": "object", + "maxProperties": 50, + "additionalProperties": { "$ref": "#/$defs/tagScalar" } + } + ] + } + } +} \ No newline at end of file diff --git a/src/adsb/Dockerfile.reducer b/src/adsb/Dockerfile.reducer index ddc06db..b375f46 100644 --- a/src/adsb/Dockerfile.reducer +++ b/src/adsb/Dockerfile.reducer @@ -5,6 +5,7 @@ WORKDIR /app COPY requirements.reducer.txt requirements.txt RUN pip install --no-cache-dir -r requirements.txt +COPY compress_adsb_to_aircraft_data.py . COPY reducer.py . CMD ["python", "-u", "reducer.py"] diff --git a/src/adsb/Dockerfile.worker b/src/adsb/Dockerfile.worker index e721442..dc4336d 100644 --- a/src/adsb/Dockerfile.worker +++ b/src/adsb/Dockerfile.worker @@ -6,6 +6,7 @@ COPY requirements.worker.txt requirements.txt RUN pip install --no-cache-dir -r requirements.txt COPY compress_adsb_to_aircraft_data.py . +COPY download_adsb_data_to_parquet.py . COPY worker.py . CMD ["python", "-u", "worker.py"] diff --git a/src/adsb/adsb_to_aircraft_data_daily.py b/src/adsb/adsb_to_aircraft_data_daily.py deleted file mode 100644 index 380f187..0000000 --- a/src/adsb/adsb_to_aircraft_data_daily.py +++ /dev/null @@ -1,7 +0,0 @@ -from pathlib import Path -from datetime import datetime, timezone,timedelta -from adsb_to_aircraft_data_historical import load_historical_for_day - - -day = datetime.now(timezone.utc) - timedelta(days=1) -load_historical_for_day(day) \ No newline at end of file diff --git a/src/adsb/adsb_to_aircraft_data_historical.py b/src/adsb/adsb_to_aircraft_data_historical.py deleted file mode 100644 index 1ff7acf..0000000 --- a/src/adsb/adsb_to_aircraft_data_historical.py +++ /dev/null @@ -1,87 +0,0 @@ -""" -Process historical ADS-B data by date range. -Downloads and compresses ADS-B messages for each day in the specified range. 
-""" -import argparse -from datetime import datetime, timedelta -from pathlib import Path -import pandas as pd -from compress_adsb_to_aircraft_data import load_historical_for_day, COLUMNS - -def deduplicate_by_signature(df): - """For each icao, keep only the earliest row with each unique signature.""" - df["_signature"] = df[COLUMNS].astype(str).agg('|'.join, axis=1) - # Group by icao and signature, keep first (earliest) occurrence - df_deduped = df.groupby(['icao', '_signature'], as_index=False).first() - df_deduped = df_deduped.drop(columns=['_signature']) - df_deduped = df_deduped.sort_values('time') - return df_deduped - - -def main(start_date_str: str, end_date_str: str): - """Process historical ADS-B data for the given date range.""" - OUT_ROOT = Path("data/planequery_aircraft") - OUT_ROOT.mkdir(parents=True, exist_ok=True) - - # Parse dates - start_date = datetime.strptime(start_date_str, "%Y-%m-%d") - end_date = datetime.strptime(end_date_str, "%Y-%m-%d") - - # Calculate total number of days - total_days = (end_date - start_date).days - print(f"Processing {total_days} days from {start_date_str} to {end_date_str}") - - # Initialize accumulated dataframe - df_accumulated = pd.DataFrame() - - # Cache directory path - cache_dir = Path("data/adsb") - - # Iterate through each day - current_date = start_date - while current_date < end_date: - print(f"Processing {current_date.strftime('%Y-%m-%d')}...") - - df_compressed = load_historical_for_day(current_date) - - # Concatenate to accumulated dataframe - if df_accumulated.empty: - df_accumulated = df_compressed - else: - df_accumulated = pd.concat([df_accumulated, df_compressed], ignore_index=True) - - print(f" Added {len(df_compressed)} records (total: {len(df_accumulated)})") - - # Save intermediate output after each day - current_date_str = current_date.strftime('%Y-%m-%d') - output_file = OUT_ROOT / f"planequery_aircraft_adsb_{start_date_str}_{current_date_str}.csv.gz" - df_deduped = 
deduplicate_by_signature(df_accumulated.copy()) - df_deduped.to_csv(output_file, index=False, compression='gzip') - print(f" Saved to {output_file.name}") - - # Delete cache after processing if processing more than 10 days - if total_days > 5 and cache_dir.exists(): - import shutil - shutil.rmtree(cache_dir) - print(f" Deleted cache directory to save space") - - # Move to next day - current_date += timedelta(days=1) - - # Save the final accumulated data - output_file = OUT_ROOT / f"planequery_aircraft_adsb_{start_date_str}_{end_date_str}.csv.gz" - df_accumulated = deduplicate_by_signature(df_accumulated) - df_accumulated.to_csv(output_file, index=False, compression='gzip') - - print(f"Completed processing from {start_date_str} to {end_date_str}") - print(f"Saved {len(df_accumulated)} total records to {output_file}") - - -if __name__ == '__main__': - # Parse command line arguments - parser = argparse.ArgumentParser(description="Process historical ADS-B data from ClickHouse") - parser.add_argument("start_date", help="Start date (YYYY-MM-DD, inclusive)") - parser.add_argument("end_date", help="End date (YYYY-MM-DD, exclusive)") - args = parser.parse_args() - - main(args.start_date, args.end_date) diff --git a/src/adsb/combine_chunks_to_csv.py b/src/adsb/combine_chunks_to_csv.py new file mode 100644 index 0000000..d8941d3 --- /dev/null +++ b/src/adsb/combine_chunks_to_csv.py @@ -0,0 +1,205 @@ +""" +Combines chunk parquet files and compresses to final aircraft CSV. +This is the reduce phase of the map-reduce pipeline. + +Memory-efficient: processes each chunk separately, compresses, then combines. 
+ +Usage: + python -m src.adsb.combine_chunks_to_csv --chunks-dir data/output/adsb_chunks +""" +import gc +import os +import sys +import glob +import argparse +from datetime import datetime, timedelta + +import polars as pl + +from src.adsb.download_adsb_data_to_parquet import OUTPUT_DIR, get_resource_usage +from src.adsb.compress_adsb_to_aircraft_data import compress_multi_icao_df, COLUMNS + + +DEFAULT_CHUNK_DIR = os.path.join(OUTPUT_DIR, "adsb_chunks") +FINAL_OUTPUT_DIR = "./data/planequery_aircraft" +os.makedirs(FINAL_OUTPUT_DIR, exist_ok=True) + + +def get_target_day() -> datetime: + """Get yesterday's date (the day we're processing).""" + return datetime.utcnow() - timedelta(days=1) + + +def process_single_chunk(chunk_path: str) -> pl.DataFrame: + """Load and compress a single chunk parquet file.""" + print(f"Processing {os.path.basename(chunk_path)}... | {get_resource_usage()}") + + # Load chunk - only columns we need + needed_columns = ['time', 'icao'] + COLUMNS + df = pl.read_parquet(chunk_path, columns=needed_columns) + print(f" Loaded {len(df)} rows") + + # Compress to aircraft records (one per ICAO) using shared function + compressed = compress_multi_icao_df(df, verbose=True) + print(f" Compressed to {len(compressed)} aircraft records") + + del df + gc.collect() + + return compressed + + +def combine_compressed_chunks(compressed_dfs: list[pl.DataFrame]) -> pl.DataFrame: + """Combine multiple compressed DataFrames. + + Since chunks are partitioned by ICAO hash, each ICAO only appears in one chunk. + No deduplication needed here - just concatenate. + """ + print(f"Combining {len(compressed_dfs)} compressed chunks... 
| {get_resource_usage()}") + + # Concat all + combined = pl.concat(compressed_dfs) + print(f"Combined: {len(combined)} records") + + return combined + + +def download_and_merge_base_release(compressed_df: pl.DataFrame) -> pl.DataFrame: + """Download base release and merge with new data.""" + from src.get_latest_planequery_aircraft_release import download_latest_aircraft_adsb_csv + + print("Downloading base ADS-B release...") + try: + base_path = download_latest_aircraft_adsb_csv( + output_dir="./data/planequery_aircraft_base" + ) + print(f"Download returned: {base_path}") + + if base_path and os.path.exists(str(base_path)): + print(f"Loading base release from {base_path}") + base_df = pl.read_csv(base_path) + print(f"Base release has {len(base_df)} records") + + # Ensure columns match + base_cols = set(base_df.columns) + new_cols = set(compressed_df.columns) + print(f"Base columns: {sorted(base_cols)}") + print(f"New columns: {sorted(new_cols)}") + + # Add missing columns + for col in new_cols - base_cols: + base_df = base_df.with_columns(pl.lit(None).alias(col)) + for col in base_cols - new_cols: + compressed_df = compressed_df.with_columns(pl.lit(None).alias(col)) + + # Reorder columns to match + compressed_df = compressed_df.select(base_df.columns) + + # Concat and deduplicate by icao (keep new data - it comes last) + combined = pl.concat([base_df, compressed_df]) + print(f"After concat: {len(combined)} records") + + deduplicated = combined.unique(subset=["icao"], keep="last") + + print(f"Combined with base: {len(combined)} -> {len(deduplicated)} after dedup") + + del base_df, combined + gc.collect() + + return deduplicated + else: + print(f"No base release found at {base_path}, using only new data") + return compressed_df + except Exception as e: + import traceback + print(f"Failed to download base release: {e}") + traceback.print_exc() + return compressed_df + + +def cleanup_chunks(date_str: str, chunks_dir: str): + """Delete chunk parquet files after 
successful merge.""" + pattern = os.path.join(chunks_dir, f"chunk_*_{date_str}.parquet") + chunk_files = glob.glob(pattern) + for f in chunk_files: + try: + os.remove(f) + print(f"Deleted {f}") + except Exception as e: + print(f"Failed to delete {f}: {e}") + + +def main(): + parser = argparse.ArgumentParser(description="Combine chunk parquets to final CSV") + parser.add_argument("--date", type=str, help="Date in YYYY-MM-DD format (default: yesterday)") + parser.add_argument("--chunks-dir", type=str, default=DEFAULT_CHUNK_DIR, help="Directory containing chunk parquet files") + parser.add_argument("--skip-base", action="store_true", help="Skip downloading and merging base release") + parser.add_argument("--keep-chunks", action="store_true", help="Keep chunk files after merging") + args = parser.parse_args() + + if args.date: + target_day = datetime.strptime(args.date, "%Y-%m-%d") + else: + target_day = get_target_day() + + date_str = target_day.strftime("%Y-%m-%d") + chunks_dir = args.chunks_dir + + print(f"Combining chunks for {date_str}") + print(f"Chunks directory: {chunks_dir}") + print(f"Resource usage at start: {get_resource_usage()}") + + # Find chunk files + pattern = os.path.join(chunks_dir, f"chunk_*_{date_str}.parquet") + chunk_files = sorted(glob.glob(pattern)) + + if not chunk_files: + print(f"No chunk files found matching: {pattern}") + sys.exit(1) + + print(f"Found {len(chunk_files)} chunk files") + + # Process each chunk separately to save memory + compressed_chunks = [] + for chunk_path in chunk_files: + compressed = process_single_chunk(chunk_path) + compressed_chunks.append(compressed) + gc.collect() + + # Combine all compressed chunks + combined = combine_compressed_chunks(compressed_chunks) + + # Free memory from individual chunks + del compressed_chunks + gc.collect() + print(f"After combining: {get_resource_usage()}") + + # Merge with base release + if not args.skip_base: + combined = download_and_merge_base_release(combined) + + # Convert list 
columns to strings for CSV compatibility + for col in combined.columns: + if combined[col].dtype == pl.List: + combined = combined.with_columns( + pl.col(col).list.join(",").alias(col) + ) + + # Sort by time for consistent output + if 'time' in combined.columns: + combined = combined.sort('time') + + # Write final CSV + output_path = os.path.join(FINAL_OUTPUT_DIR, f"planequery_aircraft_adsb_{date_str}.csv") + combined.write_csv(output_path) + print(f"Wrote {len(combined)} records to {output_path}") + + # Cleanup + if not args.keep_chunks: + cleanup_chunks(date_str, chunks_dir) + + print(f"Done! | {get_resource_usage()}") + + +if __name__ == "__main__": + main() diff --git a/src/adsb/compress_adsb_to_aircraft_data.py b/src/adsb/compress_adsb_to_aircraft_data.py index d191c47..5ae58cd 100644 --- a/src/adsb/compress_adsb_to_aircraft_data.py +++ b/src/adsb/compress_adsb_to_aircraft_data.py @@ -1,32 +1,67 @@ -# SOME KIND OF MAP REDUCE SYSTEM +# Shared compression logic for ADS-B aircraft data import os +import polars as pl COLUMNS = ['dbFlags', 'ownOp', 'year', 'desc', 'aircraft_category', 'r', 't'] -def compress_df(df): - icao = df.name - df["_signature"] = df[COLUMNS].astype(str).agg('|'.join, axis=1) + + +def deduplicate_by_signature(df: pl.DataFrame) -> pl.DataFrame: + """For each icao, keep only the earliest row with each unique signature. - # Compute signature counts before grouping (avoid copy) - signature_counts = df["_signature"].value_counts() + This is used for deduplicating across multiple compressed chunks. 
+ """ + # Create signature column + df = df.with_columns( + pl.concat_str([pl.col(c).cast(pl.Utf8).fill_null("") for c in COLUMNS], separator="|").alias("_signature") + ) + # Group by icao and signature, take first row (earliest due to time sort) + df = df.sort("time") + df_deduped = df.group_by(["icao", "_signature"]).first() + df_deduped = df_deduped.drop("_signature") + df_deduped = df_deduped.sort("time") + return df_deduped + + +def compress_df_polars(df: pl.DataFrame, icao: str) -> pl.DataFrame: + """Compress a single ICAO group to its most informative row using Polars.""" + # Create signature string + df = df.with_columns( + pl.concat_str([pl.col(c).cast(pl.Utf8) for c in COLUMNS], separator="|").alias("_signature") + ) - df = df.groupby("_signature", as_index=False).first() # check if it works with both last and first. - # For each row, create a dict of non-empty column values. This is using sets and subsets... - def get_non_empty_dict(row): - return {col: row[col] for col in COLUMNS if row[col] != ''} + # Compute signature counts + signature_counts = df.group_by("_signature").len().rename({"len": "_sig_count"}) - df['_non_empty_dict'] = df.apply(get_non_empty_dict, axis=1) - df['_non_empty_count'] = df['_non_empty_dict'].apply(len) + # Group by signature and take first row + df = df.group_by("_signature").first() + + if df.height == 1: + # Only one unique signature, return it + result = df.drop("_signature").with_columns(pl.lit(icao).alias("icao")) + return result + + # For each row, create dict of non-empty column values and check subsets + # Convert to list of dicts for subset checking (same logic as pandas version) + rows_data = [] + for row in df.iter_rows(named=True): + non_empty = {col: row[col] for col in COLUMNS if row[col] != '' and row[col] is not None} + rows_data.append({ + 'signature': row['_signature'], + 'non_empty_dict': non_empty, + 'non_empty_count': len(non_empty), + 'row_data': row + }) # Check if row i's non-empty values are a subset 
of row j's non-empty values def is_subset_of_any(idx): - row_dict = df.loc[idx, '_non_empty_dict'] - row_count = df.loc[idx, '_non_empty_count'] + row_dict = rows_data[idx]['non_empty_dict'] + row_count = rows_data[idx]['non_empty_count'] - for other_idx in df.index: + for other_idx, other_data in enumerate(rows_data): if idx == other_idx: continue - other_dict = df.loc[other_idx, '_non_empty_dict'] - other_count = df.loc[other_idx, '_non_empty_count'] + other_dict = other_data['non_empty_dict'] + other_count = other_data['non_empty_count'] # Check if all non-empty values in current row match those in other row if all(row_dict.get(k) == other_dict.get(k) for k in row_dict.keys()): @@ -36,32 +71,94 @@ def compress_df(df): return False # Keep rows that are not subsets of any other row - keep_mask = ~df.index.to_series().apply(is_subset_of_any) - df = df[keep_mask] + keep_indices = [i for i in range(len(rows_data)) if not is_subset_of_any(i)] + + if len(keep_indices) == 0: + keep_indices = [0] # Fallback: keep first row + + remaining_signatures = [rows_data[i]['signature'] for i in keep_indices] + df = df.filter(pl.col("_signature").is_in(remaining_signatures)) + + if df.height > 1: + # Use signature counts to pick the most frequent one + df = df.join(signature_counts, on="_signature", how="left") + max_count = df["_sig_count"].max() + df = df.filter(pl.col("_sig_count") == max_count).head(1) + df = df.drop("_sig_count") + + result = df.drop("_signature").with_columns(pl.lit(icao).alias("icao")) + + # Ensure empty strings are preserved + for col in COLUMNS: + if col in result.columns: + result = result.with_columns(pl.col(col).fill_null("")) + + return result - if len(df) > 1: - # Use pre-computed signature counts instead of original_df - remaining_sigs = df['_signature'] - sig_counts = signature_counts[remaining_sigs] - max_signature = sig_counts.idxmax() - df = df[df['_signature'] == max_signature] - df['icao'] = icao - df = df.drop(columns=['_non_empty_dict', 
'_non_empty_count', '_signature']) - # Ensure empty strings are preserved, not NaN - df[COLUMNS] = df[COLUMNS].fillna('') - return df +def compress_multi_icao_df(df: pl.DataFrame, verbose: bool = True) -> pl.DataFrame: + """Compress a DataFrame with multiple ICAOs to one row per ICAO. + + This is the main entry point for compressing ADS-B data. + Used by both daily GitHub Actions runs and historical AWS runs. + + Args: + df: DataFrame with columns ['time', 'icao'] + COLUMNS + verbose: Whether to print progress + + Returns: + Compressed DataFrame with one row per ICAO + """ + if df.height == 0: + return df + + # Sort by icao and time + df = df.sort(['icao', 'time']) + + # Fill null values with empty strings for COLUMNS + for col in COLUMNS: + if col in df.columns: + df = df.with_columns(pl.col(col).cast(pl.Utf8).fill_null("")) + + # First pass: quick deduplication of exact duplicates + df = df.unique(subset=['icao'] + COLUMNS, keep='first') + if verbose: + print(f"After quick dedup: {df.height} records") + + # Second pass: sophisticated compression per ICAO + if verbose: + print("Compressing per ICAO...") + + # Process each ICAO group + icao_groups = df.partition_by('icao', as_dict=True, maintain_order=True) + compressed_dfs = [] + + for icao_key, group_df in icao_groups.items(): + # partition_by with as_dict=True returns tuple keys, extract first element + icao = icao_key[0] if isinstance(icao_key, tuple) else icao_key + compressed = compress_df_polars(group_df, str(icao)) + compressed_dfs.append(compressed) + + if compressed_dfs: + df_compressed = pl.concat(compressed_dfs) + else: + df_compressed = df.head(0) # Empty with same schema + + if verbose: + print(f"After compress: {df_compressed.height} records") + + # Reorder columns: time first, then icao + cols = df_compressed.columns + ordered_cols = ['time', 'icao'] + [c for c in cols if c not in ['time', 'icao']] + df_compressed = df_compressed.select(ordered_cols) + + return df_compressed -# names of releases 
something like -# planequery_aircraft_adsb_2024-06-01T00-00-00Z.csv.gz - -# Let's build historical first. def load_raw_adsb_for_day(day): """Load raw ADS-B data for a day from parquet file.""" from datetime import timedelta from pathlib import Path - import pandas as pd start_time = day.replace(hour=0, minute=0, second=0, microsecond=0) @@ -84,67 +181,72 @@ def load_raw_adsb_for_day(day): if parquet_file.exists(): print(f" Loading from parquet: {parquet_file}") - df = pd.read_parquet( + df = pl.read_parquet( parquet_file, columns=['time', 'icao', 'r', 't', 'dbFlags', 'ownOp', 'year', 'desc', 'aircraft_category'] ) # Convert to timezone-naive datetime - df['time'] = df['time'].dt.tz_localize(None) + if df["time"].dtype == pl.Datetime: + df = df.with_columns(pl.col("time").dt.replace_time_zone(None)) + return df else: # Return empty DataFrame if parquet file doesn't exist print(f" No data available for {start_time.strftime('%Y-%m-%d')}") - import pandas as pd - return pd.DataFrame(columns=['time', 'icao', 'r', 't', 'dbFlags', 'ownOp', 'year', 'desc', 'aircraft_category']) + return pl.DataFrame(schema={ + 'time': pl.Datetime, + 'icao': pl.Utf8, + 'r': pl.Utf8, + 't': pl.Utf8, + 'dbFlags': pl.Int64, + 'ownOp': pl.Utf8, + 'year': pl.Int64, + 'desc': pl.Utf8, + 'aircraft_category': pl.Utf8 + }) + def load_historical_for_day(day): - from pathlib import Path - import pandas as pd + """Load and compress historical ADS-B data for a day.""" df = load_raw_adsb_for_day(day) - if df.empty: + if df.height == 0: return df - print(f"Loaded {len(df)} raw records for {day.strftime('%Y-%m-%d')}") - df = df.sort_values(['icao', 'time']) - print("done sort") - df[COLUMNS] = df[COLUMNS].fillna('') - # First pass: quick deduplication of exact duplicates - df = df.drop_duplicates(subset=['icao'] + COLUMNS, keep='first') - print(f"After quick dedup: {len(df)} records") + print(f"Loaded {df.height} raw records for {day.strftime('%Y-%m-%d')}") - # Second pass: sophisticated compression per 
ICAO - print("Compressing per ICAO...") - df_compressed = df.groupby('icao', group_keys=False).apply(compress_df) - print(f"After compress: {len(df_compressed)} records") - - cols = df_compressed.columns.tolist() - cols.remove('time') - cols.insert(0, 'time') - cols.remove("icao") - cols.insert(1, "icao") - df_compressed = df_compressed[cols] - return df_compressed + # Use shared compression function + return compress_multi_icao_df(df, verbose=True) def concat_compressed_dfs(df_base, df_new): """Concatenate base and new compressed dataframes, keeping the most informative row per ICAO.""" - import pandas as pd - # Combine both dataframes - df_combined = pd.concat([df_base, df_new], ignore_index=True) + df_combined = pl.concat([df_base, df_new]) # Sort by ICAO and time - df_combined = df_combined.sort_values(['icao', 'time']) + df_combined = df_combined.sort(['icao', 'time']) - # Fill NaN values - df_combined[COLUMNS] = df_combined[COLUMNS].fillna('') + # Fill null values + for col in COLUMNS: + if col in df_combined.columns: + df_combined = df_combined.with_columns(pl.col(col).fill_null("")) # Apply compression logic per ICAO to get the best row - df_compressed = df_combined.groupby('icao', group_keys=False).apply(compress_df) + icao_groups = df_combined.partition_by('icao', as_dict=True, maintain_order=True) + compressed_dfs = [] + + for icao, group_df in icao_groups.items(): + compressed = compress_df_polars(group_df, icao) + compressed_dfs.append(compressed) + + if compressed_dfs: + df_compressed = pl.concat(compressed_dfs) + else: + df_compressed = df_combined.head(0) # Sort by time - df_compressed = df_compressed.sort_values('time') + df_compressed = df_compressed.sort('time') return df_compressed @@ -152,13 +254,15 @@ def concat_compressed_dfs(df_base, df_new): def get_latest_aircraft_adsb_csv_df(): """Download and load the latest ADS-B CSV from GitHub releases.""" from get_latest_planequery_aircraft_release import download_latest_aircraft_adsb_csv - - import 
pandas as pd import re csv_path = download_latest_aircraft_adsb_csv() - df = pd.read_csv(csv_path) - df = df.fillna("") + df = pl.read_csv(csv_path, null_values=[""]) + + # Fill nulls with empty strings + for col in df.columns: + if df[col].dtype == pl.Utf8: + df = df.with_columns(pl.col(col).fill_null("")) # Extract start date from filename pattern: planequery_aircraft_adsb_{start_date}_{end_date}.csv match = re.search(r"planequery_aircraft_adsb_(\d{4}-\d{2}-\d{2})_", str(csv_path)) diff --git a/src/adsb/download_adsb_data_to_parquet.py b/src/adsb/download_adsb_data_to_parquet.py index deb1c82..4be76d6 100644 --- a/src/adsb/download_adsb_data_to_parquet.py +++ b/src/adsb/download_adsb_data_to_parquet.py @@ -11,6 +11,8 @@ This file is self-contained and does not import from other project modules. import gc import glob import gzip +import resource +import shutil import sys import logging import time @@ -22,10 +24,10 @@ import os import argparse import datetime as dt from datetime import datetime, timedelta, timezone +import urllib.request +import urllib.error -import requests import orjson -import pandas as pd import pyarrow as pa import pyarrow.parquet as pq @@ -44,6 +46,24 @@ TOKEN = os.environ.get('GITHUB_TOKEN') # Optional: for higher GitHub API rate l HEADERS = {"Authorization": f"token {TOKEN}"} if TOKEN else {} +def get_resource_usage() -> str: + """Get current RAM and disk usage as a formatted string.""" + # RAM usage (RSS = Resident Set Size) + ram_bytes = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss + # On macOS, ru_maxrss is in bytes; on Linux, it's in KB + if sys.platform == 'darwin': + ram_gb = ram_bytes / (1024**3) + else: + ram_gb = ram_bytes / (1024**2) # Convert KB to GB + + # Disk usage + disk = shutil.disk_usage('.') + disk_free_gb = disk.free / (1024**3) + disk_total_gb = disk.total / (1024**3) + + return f"RAM: {ram_gb:.2f}GB | Disk: {disk_free_gb:.1f}GB free / {disk_total_gb:.1f}GB total" + + # 
============================================================================ # GitHub Release Fetching and Downloading # ============================================================================ @@ -72,17 +92,19 @@ def fetch_releases(version_date: str) -> list: for attempt in range(1, max_retries + 1): try: - response = requests.get(f"{BASE_URL}?page={page}", headers=HEADERS) - if response.status_code == 200: - break - else: - print(f"Failed to fetch releases (attempt {attempt}/{max_retries}): {response.status_code} {response.reason}") - if attempt < max_retries: - print(f"Waiting {retry_delay} seconds before retry...") - time.sleep(retry_delay) + req = urllib.request.Request(f"{BASE_URL}?page={page}", headers=HEADERS) + with urllib.request.urlopen(req) as response: + if response.status == 200: + data = orjson.loads(response.read()) + break else: - print(f"Giving up after {max_retries} attempts") - return releases + print(f"Failed to fetch releases (attempt {attempt}/{max_retries}): {response.status} {response.reason}") + if attempt < max_retries: + print(f"Waiting {retry_delay} seconds before retry...") + time.sleep(retry_delay) + else: + print(f"Giving up after {max_retries} attempts") + return releases except Exception as e: print(f"Request exception (attempt {attempt}/{max_retries}): {e}") if attempt < max_retries: @@ -91,8 +113,6 @@ def fetch_releases(version_date: str) -> list: else: print(f"Giving up after {max_retries} attempts") return releases - - data = response.json() if not data: break for release in data: @@ -115,18 +135,22 @@ def download_asset(asset_url: str, file_path: str) -> bool: signal.signal(signal.SIGALRM, timeout_handler) signal.alarm(40) # 40-second timeout - response = requests.get(asset_url, headers=HEADERS, stream=True) - signal.alarm(0) - - if response.status_code == 200: - with open(file_path, "wb") as file: - for chunk in response.iter_content(chunk_size=8192): - file.write(chunk) - print(f"Saved {file_path}") - return True - else: 
- print(f"Failed to download {asset_url}: {response.status_code} {response.reason}") - return False + req = urllib.request.Request(asset_url, headers=HEADERS) + with urllib.request.urlopen(req) as response: + signal.alarm(0) + + if response.status == 200: + with open(file_path, "wb") as file: + while True: + chunk = response.read(8192) + if not chunk: + break + file.write(chunk) + print(f"Saved {file_path}") + return True + else: + print(f"Failed to download {asset_url}: {response.status} {response.msg}") + return False except DownloadTimeoutException as e: print(f"Download aborted for {asset_url}: {e}") return False @@ -139,6 +163,7 @@ def extract_split_archive(file_paths: list, extract_dir: str) -> bool: """ Extracts a split archive by concatenating the parts using 'cat' and then extracting with 'tar' in one pipeline. + Deletes the tar files immediately after extraction to save disk space. """ if os.path.isdir(extract_dir): print(f"[SKIP] Extraction directory already exists: {extract_dir}") @@ -176,6 +201,20 @@ def extract_split_archive(file_paths: list, extract_dir: str) -> bool: cat_proc.wait() print(f"Successfully extracted archive to {extract_dir}") + + # Delete tar files immediately after extraction + for tar_file in file_paths: + try: + os.remove(tar_file) + print(f"Deleted tar file: {tar_file}") + except Exception as e: + print(f"Failed to delete {tar_file}: {e}") + + # Check disk usage after deletion + disk = shutil.disk_usage('.') + free_gb = disk.free / (1024**3) + print(f"Disk space after tar deletion: {free_gb:.1f}GB free") + return True except subprocess.CalledProcessError as e: print(f"Failed to extract split archive: {e}") @@ -309,7 +348,7 @@ def process_file(filepath: str) -> list: insert_rows.append(inserted_row) if insert_rows: - print(f"Got {len(insert_rows)} rows from {filepath}") + # print(f"Got {len(insert_rows)} rows from {filepath}") return insert_rows else: return [] @@ -342,8 +381,8 @@ COLUMNS = [ OS_CPU_COUNT = os.cpu_count() or 1 
MAX_WORKERS = OS_CPU_COUNT if OS_CPU_COUNT > 4 else 1 -CHUNK_SIZE = MAX_WORKERS * 1000 -BATCH_SIZE = (os.cpu_count() or 1) * 100000 +CHUNK_SIZE = MAX_WORKERS * 500 # Reduced for lower RAM usage +BATCH_SIZE = 250_000 # Fixed size for predictable memory usage (~500MB per batch) # PyArrow schema for efficient Parquet writing PARQUET_SCHEMA = pa.schema([ @@ -448,10 +487,18 @@ def safe_process(fp): return [] -def rows_to_dataframe(rows: list) -> pd.DataFrame: - """Convert list of rows to a pandas DataFrame.""" - df = pd.DataFrame(rows, columns=COLUMNS) - return df +def rows_to_arrow_table(rows: list) -> pa.Table: + """Convert list of rows to a PyArrow Table directly (no pandas).""" + # Transpose rows into columns + columns = list(zip(*rows)) + + # Build arrays for each column according to schema + arrays = [] + for i, field in enumerate(PARQUET_SCHEMA): + col_data = list(columns[i]) if i < len(columns) else [None] * len(rows) + arrays.append(pa.array(col_data, type=field.type)) + + return pa.Table.from_arrays(arrays, schema=PARQUET_SCHEMA) def write_batch_to_parquet(rows: list, version_date: str, batch_idx: int): @@ -459,23 +506,17 @@ def write_batch_to_parquet(rows: list, version_date: str, batch_idx: int): if not rows: return - df = rows_to_dataframe(rows) - - # Ensure datetime column is timezone-aware - if not df['time'].dt.tz: - df['time'] = df['time'].dt.tz_localize('UTC') + table = rows_to_arrow_table(rows) parquet_path = os.path.join(PARQUET_DIR, f"{version_date}_batch_{batch_idx:04d}.parquet") - # Convert to PyArrow table and write - table = pa.Table.from_pandas(df, schema=PARQUET_SCHEMA, preserve_index=False) pq.write_table(table, parquet_path, compression='snappy') - print(f"Written parquet batch {batch_idx} ({len(rows)} rows) to {parquet_path}") + print(f"Written parquet batch {batch_idx} ({len(rows)} rows) | {get_resource_usage()}") def merge_parquet_files(version_date: str, delete_batches: bool = True): - """Merge all batch parquet files for a version_date 
into a single file.""" + """Merge all batch parquet files for a version_date into a single file using streaming.""" pattern = os.path.join(PARQUET_DIR, f"{version_date}_batch_*.parquet") batch_files = sorted(glob.glob(pattern)) @@ -483,28 +524,42 @@ def merge_parquet_files(version_date: str, delete_batches: bool = True): print(f"No batch files found for {version_date}") return None - print(f"Merging {len(batch_files)} batch files for {version_date}...") + print(f"Merging {len(batch_files)} batch files for {version_date} (streaming)...") - # Read all batch files - tables = [] - for f in batch_files: - tables.append(pq.read_table(f)) - - # Concatenate all tables - merged_table = pa.concat_tables(tables) - - # Write merged file merged_path = os.path.join(PARQUET_DIR, f"{version_date}.parquet") - pq.write_table(merged_table, merged_path, compression='snappy') + total_rows = 0 - print(f"Merged parquet file written to {merged_path} ({merged_table.num_rows} total rows)") + # Stream write: read one batch at a time to minimize RAM usage + writer = None + try: + for i, f in enumerate(batch_files): + table = pq.read_table(f) + total_rows += table.num_rows + + if writer is None: + writer = pq.ParquetWriter(merged_path, table.schema, compression='snappy') + + writer.write_table(table) + + # Delete batch file immediately after reading to free disk space + if delete_batches: + os.remove(f) + + # Free memory + del table + if (i + 1) % 10 == 0: + gc.collect() + print(f" Merged {i + 1}/{len(batch_files)} batches... 
| {get_resource_usage()}") + finally: + if writer is not None: + writer.close() + + print(f"Merged parquet file written to {merged_path} ({total_rows} total rows) | {get_resource_usage()}") - # Optionally delete batch files if delete_batches: - for f in batch_files: - os.remove(f) - print(f"Deleted {len(batch_files)} batch files") + print(f"Deleted {len(batch_files)} batch files during merge") + gc.collect() return merged_path @@ -608,15 +663,15 @@ def process_version_date(version_date: str, keep_folders: bool = False): print(f"Total rows processed for version_date {version_date}: {total_num_rows}") + # Clean up extracted directory immediately after processing (before merging parquet files) + if not keep_folders and os.path.isdir(extract_dir): + print(f"Deleting extraction directory with 100,000+ files: {extract_dir}") + shutil.rmtree(extract_dir) + print(f"Successfully deleted extraction directory: {extract_dir} | {get_resource_usage()}") + # Merge batch files into a single parquet file merge_parquet_files(version_date, delete_batches=True) - # Clean up extracted directory if not keeping - if not keep_folders and os.path.isdir(extract_dir): - import shutil - shutil.rmtree(extract_dir) - print(f"Cleaned up extraction directory: {extract_dir}") - return total_num_rows diff --git a/src/adsb/download_and_list_icaos.py b/src/adsb/download_and_list_icaos.py new file mode 100644 index 0000000..b058b5d --- /dev/null +++ b/src/adsb/download_and_list_icaos.py @@ -0,0 +1,148 @@ +""" +Downloads and extracts adsb.lol tar files, then lists all ICAO folders. +This is the first step of the map-reduce pipeline. 
+ +Outputs: +- Extracted trace files in data/output/{version_date}-planes-readsb-prod-0.tar_0/ +- ICAO manifest at data/output/icao_manifest_{date}.txt +""" +import os +import sys +import argparse +import glob +import subprocess +from datetime import datetime, timedelta + +# Re-use download/extract functions from download_adsb_data_to_parquet +from src.adsb.download_adsb_data_to_parquet import ( + OUTPUT_DIR, + fetch_releases, + download_asset, + extract_split_archive, + collect_trace_files_with_find, +) + + +def get_target_day() -> datetime: + """Get yesterday's date (the day we're processing).""" + # return datetime.utcnow() - timedelta(days=1) + return datetime.utcnow() - timedelta(days=1) + + +def download_and_extract(version_date: str) -> str | None: + """Download and extract tar files, return extract directory path.""" + extract_dir = os.path.join(OUTPUT_DIR, f"{version_date}-planes-readsb-prod-0.tar_0") + + # Check if already extracted + if os.path.isdir(extract_dir): + print(f"[SKIP] Already extracted: {extract_dir}") + return extract_dir + + # Check for existing tar files + pattern = os.path.join(OUTPUT_DIR, f"{version_date}-planes-readsb-prod-0*") + matches = [p for p in glob.glob(pattern) if os.path.isfile(p)] + + if matches: + print(f"Found existing tar files for {version_date}") + normal_matches = [ + p for p in matches + if "-planes-readsb-prod-0." in os.path.basename(p) + and "tmp" not in os.path.basename(p) + ] + downloaded_files = normal_matches if normal_matches else matches + else: + # Download from GitHub + print(f"Downloading releases for {version_date}...") + releases = fetch_releases(version_date) + if not releases: + print(f"No releases found for {version_date}") + return None + + downloaded_files = [] + for release in releases: + tag_name = release["tag_name"] + print(f"Processing release: {tag_name}") + + assets = release.get("assets", []) + normal_assets = [ + a for a in assets + if "planes-readsb-prod-0." 
in a["name"] and "tmp" not in a["name"] + ] + tmp_assets = [ + a for a in assets + if "planes-readsb-prod-0tmp" in a["name"] + ] + use_assets = normal_assets if normal_assets else tmp_assets + + for asset in use_assets: + asset_name = asset["name"] + asset_url = asset["browser_download_url"] + file_path = os.path.join(OUTPUT_DIR, asset_name) + if download_asset(asset_url, file_path): + downloaded_files.append(file_path) + + if not downloaded_files: + print(f"No files downloaded for {version_date}") + return None + + # Extract + if extract_split_archive(downloaded_files, extract_dir): + return extract_dir + return None + + +def list_icao_folders(extract_dir: str) -> list[str]: + """List all ICAO folder names from extracted directory.""" + trace_files = collect_trace_files_with_find(extract_dir) + icaos = sorted(trace_files.keys()) + print(f"Found {len(icaos)} unique ICAOs") + return icaos + + +def write_manifest(icaos: list[str], date_str: str) -> str: + """Write ICAO list to manifest file.""" + manifest_path = os.path.join(OUTPUT_DIR, f"icao_manifest_{date_str}.txt") + with open(manifest_path, "w") as f: + for icao in icaos: + f.write(f"{icao}\n") + print(f"Wrote manifest with {len(icaos)} ICAOs to {manifest_path}") + return manifest_path + + +def main(): + parser = argparse.ArgumentParser(description="Download and list ICAOs from adsb.lol data") + parser.add_argument("--date", type=str, help="Date in YYYY-MM-DD format (default: yesterday)") + args = parser.parse_args() + + if args.date: + target_day = datetime.strptime(args.date, "%Y-%m-%d") + else: + target_day = get_target_day() + + date_str = target_day.strftime("%Y-%m-%d") + version_date = f"v{target_day.strftime('%Y.%m.%d')}" + + print(f"Processing date: {date_str} (version: {version_date})") + + # Download and extract + extract_dir = download_and_extract(version_date) + if not extract_dir: + print("Failed to download/extract data") + sys.exit(1) + + # List ICAOs + icaos = list_icao_folders(extract_dir) + if 
not icaos: + print("No ICAOs found") + sys.exit(1) + + # Write manifest + manifest_path = write_manifest(icaos, date_str) + + print(f"\nDone! Extract dir: {extract_dir}") + print(f"Manifest: {manifest_path}") + print(f"Total ICAOs: {len(icaos)}") + + +if __name__ == "__main__": + main() diff --git a/src/adsb/process_icao_chunk.py b/src/adsb/process_icao_chunk.py new file mode 100644 index 0000000..9092088 --- /dev/null +++ b/src/adsb/process_icao_chunk.py @@ -0,0 +1,270 @@ +""" +Processes a chunk of ICAOs from pre-extracted trace files. +This is the map phase of the map-reduce pipeline. + +Expects extract_dir to already exist with trace files. +Reads ICAO manifest to determine which ICAOs to process based on chunk-id. + +Usage: + python -m src.adsb.process_icao_chunk --chunk-id 0 --total-chunks 4 +""" +import gc +import os +import sys +import argparse +import time +import concurrent.futures +from datetime import datetime, timedelta + +import pyarrow as pa +import pyarrow.parquet as pq + +from src.adsb.download_adsb_data_to_parquet import ( + OUTPUT_DIR, + PARQUET_DIR, + PARQUET_SCHEMA, + COLUMNS, + MAX_WORKERS, + process_file, + get_resource_usage, + collect_trace_files_with_find, +) + + +CHUNK_OUTPUT_DIR = os.path.join(OUTPUT_DIR, "adsb_chunks") +os.makedirs(CHUNK_OUTPUT_DIR, exist_ok=True) + +# Smaller batch size for memory efficiency +BATCH_SIZE = 100_000 + + +def get_target_day() -> datetime: + """Get yesterday's date (the day we're processing).""" + return datetime.utcnow() - timedelta(days=1) + + +def read_manifest(date_str: str) -> list[str]: + """Read ICAO manifest file.""" + manifest_path = os.path.join(OUTPUT_DIR, f"icao_manifest_{date_str}.txt") + if not os.path.exists(manifest_path): + raise FileNotFoundError(f"Manifest not found: {manifest_path}") + + with open(manifest_path, "r") as f: + icaos = [line.strip() for line in f if line.strip()] + return icaos + + +def deterministic_hash(s: str) -> int: + """Return a deterministic hash for a string (unlike 
Python's hash() which is randomized).""" + # Use sum of byte values - simple but deterministic + return sum(ord(c) for c in s) + + +def get_chunk_icaos(icaos: list[str], chunk_id: int, total_chunks: int) -> list[str]: + """Get the subset of ICAOs for this chunk based on deterministic hash partitioning.""" + return [icao for icao in icaos if deterministic_hash(icao) % total_chunks == chunk_id] + + +def build_trace_file_map(extract_dir: str) -> dict[str, str]: + """Build a map of ICAO -> trace file path using find command.""" + print(f"Building trace file map from {extract_dir}...") + + # Debug: check what's in extract_dir + if os.path.isdir(extract_dir): + items = os.listdir(extract_dir)[:10] + print(f"First 10 items in extract_dir: {items}") + # Check if there are subdirectories + for item in items[:3]: + subpath = os.path.join(extract_dir, item) + if os.path.isdir(subpath): + subitems = os.listdir(subpath)[:5] + print(f" Contents of {item}/: {subitems}") + + trace_map = collect_trace_files_with_find(extract_dir) + print(f"Found {len(trace_map)} trace files") + + if len(trace_map) == 0: + # Debug: try manual find + import subprocess + result = subprocess.run( + ['find', extract_dir, '-type', 'f', '-name', 'trace_full_*'], + capture_output=True, text=True + ) + print(f"Manual find output (first 500 chars): {result.stdout[:500]}") + print(f"Manual find stderr: {result.stderr[:200]}") + + return trace_map + + +def safe_process(filepath: str) -> list: + """Safely process a file, returning empty list on error.""" + try: + return process_file(filepath) + except Exception as e: + print(f"Error processing {filepath}: {e}") + return [] + + +def rows_to_table(rows: list) -> pa.Table: + """Convert list of rows to PyArrow table.""" + import pandas as pd + df = pd.DataFrame(rows, columns=COLUMNS) + if not df['time'].dt.tz: + df['time'] = df['time'].dt.tz_localize('UTC') + return pa.Table.from_pandas(df, schema=PARQUET_SCHEMA, preserve_index=False) + + +def process_chunk( + 
chunk_id: int, + total_chunks: int, + trace_map: dict[str, str], + icaos: list[str], + date_str: str, +) -> str | None: + """Process a chunk of ICAOs and write to parquet.""" + chunk_icaos = get_chunk_icaos(icaos, chunk_id, total_chunks) + print(f"Chunk {chunk_id}/{total_chunks}: Processing {len(chunk_icaos)} ICAOs") + + if not chunk_icaos: + print(f"Chunk {chunk_id}: No ICAOs to process") + return None + + # Get trace file paths from the map + trace_files = [] + for icao in chunk_icaos: + if icao in trace_map: + trace_files.append(trace_map[icao]) + + print(f"Chunk {chunk_id}: Found {len(trace_files)} trace files") + + if not trace_files: + print(f"Chunk {chunk_id}: No trace files found") + return None + + # Process files and write parquet in batches + output_path = os.path.join(CHUNK_OUTPUT_DIR, f"chunk_{chunk_id}_{date_str}.parquet") + + start_time = time.perf_counter() + total_rows = 0 + batch_rows = [] + writer = None + + try: + # Process in parallel batches + files_per_batch = MAX_WORKERS * 100 + for offset in range(0, len(trace_files), files_per_batch): + batch_files = trace_files[offset:offset + files_per_batch] + + with concurrent.futures.ProcessPoolExecutor(max_workers=MAX_WORKERS) as executor: + for rows in executor.map(safe_process, batch_files): + if rows: + batch_rows.extend(rows) + + # Write when batch is full + if len(batch_rows) >= BATCH_SIZE: + table = rows_to_table(batch_rows) + total_rows += len(batch_rows) + + if writer is None: + writer = pq.ParquetWriter(output_path, PARQUET_SCHEMA, compression='snappy') + writer.write_table(table) + + batch_rows = [] + del table + gc.collect() + + elapsed = time.perf_counter() - start_time + print(f"Chunk {chunk_id}: {total_rows} rows, {elapsed:.1f}s | {get_resource_usage()}") + + gc.collect() + + # Write remaining rows + if batch_rows: + table = rows_to_table(batch_rows) + total_rows += len(batch_rows) + + if writer is None: + writer = pq.ParquetWriter(output_path, PARQUET_SCHEMA, compression='snappy') + 
writer.write_table(table) + del table + + finally: + if writer: + writer.close() + + elapsed = time.perf_counter() - start_time + print(f"Chunk {chunk_id}: Done! {total_rows} rows in {elapsed:.1f}s | {get_resource_usage()}") + + if total_rows > 0: + return output_path + return None + + +def main(): + parser = argparse.ArgumentParser(description="Process a chunk of ICAOs") + parser.add_argument("--chunk-id", type=int, required=True, help="Chunk ID (0-indexed)") + parser.add_argument("--total-chunks", type=int, required=True, help="Total number of chunks") + parser.add_argument("--date", type=str, help="Date in YYYY-MM-DD format (default: yesterday)") + args = parser.parse_args() + + if args.date: + target_day = datetime.strptime(args.date, "%Y-%m-%d") + else: + target_day = get_target_day() + + date_str = target_day.strftime("%Y-%m-%d") + version_date = f"v{target_day.strftime('%Y.%m.%d')}" + + print(f"Processing chunk {args.chunk_id}/{args.total_chunks} for {date_str}") + print(f"OUTPUT_DIR: {OUTPUT_DIR}") + print(f"CHUNK_OUTPUT_DIR: {CHUNK_OUTPUT_DIR}") + print(f"Resource usage at start: {get_resource_usage()}") + + # Debug: List what's in OUTPUT_DIR + print(f"\nContents of {OUTPUT_DIR}:") + if os.path.isdir(OUTPUT_DIR): + for item in os.listdir(OUTPUT_DIR)[:20]: + print(f" - {item}") + else: + print(f" Directory does not exist!") + + # Find extract directory + extract_dir = os.path.join(OUTPUT_DIR, f"{version_date}-planes-readsb-prod-0.tar_0") + print(f"\nLooking for extract_dir: {extract_dir}") + if not os.path.isdir(extract_dir): + print(f"Extract directory not found: {extract_dir}") + # Try to find any extracted directory + import glob + pattern = os.path.join(OUTPUT_DIR, "*-planes-readsb-prod-0*") + matches = glob.glob(pattern) + print(f"Searching for pattern: {pattern}") + print(f"Found matches: {matches}") + sys.exit(1) + + # Build trace file map using find + trace_map = build_trace_file_map(extract_dir) + if not trace_map: + print("No trace files found in 
extract directory") + sys.exit(1) + + # Read manifest + icaos = read_manifest(date_str) + print(f"Total ICAOs in manifest: {len(icaos)}") + + # Process chunk + output_path = process_chunk( + args.chunk_id, + args.total_chunks, + trace_map, + icaos, + date_str, + ) + + if output_path: + print(f"Output: {output_path}") + else: + print("No output generated") + + +if __name__ == "__main__": + main() diff --git a/src/adsb/reducer.py b/src/adsb/reducer.py index 1bdd85c..3ad7f40 100644 --- a/src/adsb/reducer.py +++ b/src/adsb/reducer.py @@ -8,23 +8,15 @@ Environment variables: GLOBAL_START_DATE — overall start date for output filename GLOBAL_END_DATE — overall end date for output filename """ +import gzip import os +import shutil from pathlib import Path import boto3 -import pandas as pd +import polars as pl - -COLUMNS = ["dbFlags", "ownOp", "year", "desc", "aircraft_category", "r", "t"] - - -def deduplicate_by_signature(df: pd.DataFrame) -> pd.DataFrame: - """For each icao, keep only the earliest row with each unique signature.""" - df["_signature"] = df[COLUMNS].astype(str).agg("|".join, axis=1) - df_deduped = df.groupby(["icao", "_signature"], as_index=False).first() - df_deduped = df_deduped.drop(columns=["_signature"]) - df_deduped = df_deduped.sort_values("time") - return df_deduped +from compress_adsb_to_aircraft_data import COLUMNS, deduplicate_by_signature def main(): @@ -55,42 +47,50 @@ def main(): download_dir = Path("/tmp/chunks") download_dir.mkdir(parents=True, exist_ok=True) - df_accumulated = pd.DataFrame() + dfs = [] for key in chunk_keys: - local_path = download_dir / Path(key).name + gz_path = download_dir / Path(key).name + csv_path = gz_path.with_suffix("") # Remove .gz print(f"Downloading {key}...") - s3.download_file(s3_bucket, key, str(local_path)) + s3.download_file(s3_bucket, key, str(gz_path)) - df_chunk = pd.read_csv(local_path, compression="gzip", keep_default_na=False) - print(f" Loaded {len(df_chunk)} rows from {local_path.name}") + # 
Decompress + with gzip.open(gz_path, 'rb') as f_in: + with open(csv_path, 'wb') as f_out: + shutil.copyfileobj(f_in, f_out) + gz_path.unlink() - if df_accumulated.empty: - df_accumulated = df_chunk - else: - df_accumulated = pd.concat( - [df_accumulated, df_chunk], ignore_index=True - ) + df_chunk = pl.read_csv(csv_path) + print(f" Loaded {df_chunk.height} rows from {csv_path.name}") + dfs.append(df_chunk) # Free disk space after loading - local_path.unlink() + csv_path.unlink() - print(f"Combined: {len(df_accumulated)} rows before dedup") + df_accumulated = pl.concat(dfs) if dfs else pl.DataFrame() + print(f"Combined: {df_accumulated.height} rows before dedup") # Final global deduplication df_accumulated = deduplicate_by_signature(df_accumulated) - print(f"After dedup: {len(df_accumulated)} rows") + print(f"After dedup: {df_accumulated.height} rows") # Write and upload final result output_name = f"planequery_aircraft_adsb_{global_start}_{global_end}.csv.gz" - local_output = Path(f"/tmp/{output_name}") - df_accumulated.to_csv(local_output, index=False, compression="gzip") + csv_output = Path(f"/tmp/planequery_aircraft_adsb_{global_start}_{global_end}.csv") + gz_output = Path(f"/tmp/{output_name}") + + df_accumulated.write_csv(csv_output) + with open(csv_output, 'rb') as f_in: + with gzip.open(gz_output, 'wb') as f_out: + shutil.copyfileobj(f_in, f_out) + csv_output.unlink() final_key = f"final/{output_name}" print(f"Uploading to s3://{s3_bucket}/{final_key}") - s3.upload_file(str(local_output), s3_bucket, final_key) + s3.upload_file(str(gz_output), s3_bucket, final_key) - print(f"Final output: {len(df_accumulated)} records -> {final_key}") + print(f"Final output: {df_accumulated.height} records -> {final_key}") if __name__ == "__main__": diff --git a/src/adsb/requirements.reducer.txt b/src/adsb/requirements.reducer.txt index 8aafff3..29e6bf9 100644 --- a/src/adsb/requirements.reducer.txt +++ b/src/adsb/requirements.reducer.txt @@ -1,2 +1,2 @@ -pandas>=2.0 
+polars>=1.0 boto3>=1.34 diff --git a/src/adsb/requirements.worker.txt b/src/adsb/requirements.worker.txt index 9566c25..cf305a7 100644 --- a/src/adsb/requirements.worker.txt +++ b/src/adsb/requirements.worker.txt @@ -1,4 +1,5 @@ -pandas>=2.0 -clickhouse-connect>=0.7 +polars>=1.0 +pyarrow>=14.0 +orjson>=3.9 boto3>=1.34 zstandard>=0.22 diff --git a/src/adsb/worker.py b/src/adsb/worker.py index ad2b09b..9884ce7 100644 --- a/src/adsb/worker.py +++ b/src/adsb/worker.py @@ -13,18 +13,13 @@ from datetime import datetime, timedelta from pathlib import Path import boto3 -import pandas as pd +import polars as pl -from compress_adsb_to_aircraft_data import load_historical_for_day, COLUMNS - - -def deduplicate_by_signature(df: pd.DataFrame) -> pd.DataFrame: - """For each icao, keep only the earliest row with each unique signature.""" - df["_signature"] = df[COLUMNS].astype(str).agg("|".join, axis=1) - df_deduped = df.groupby(["icao", "_signature"], as_index=False).first() - df_deduped = df_deduped.drop(columns=["_signature"]) - df_deduped = df_deduped.sort_values("time") - return df_deduped +from compress_adsb_to_aircraft_data import ( + load_historical_for_day, + deduplicate_by_signature, + COLUMNS, +) def main(): @@ -39,28 +34,20 @@ def main(): total_days = (end_date - start_date).days print(f"Worker: processing {total_days} days [{start_date_str}, {end_date_str})") - df_accumulated = pd.DataFrame() + dfs = [] current_date = start_date while current_date < end_date: day_str = current_date.strftime("%Y-%m-%d") print(f" Loading {day_str}...") - try: - df_compressed = load_historical_for_day(current_date) - except Exception as e: - print(f" WARNING: Failed to load {day_str}: {e}") - current_date += timedelta(days=1) - continue + df_compressed = load_historical_for_day(current_date) + if df_compressed.height == 0: + raise RuntimeError(f"No data found for {day_str}") - if df_accumulated.empty: - df_accumulated = df_compressed - else: - df_accumulated = pd.concat( - 
[df_accumulated, df_compressed], ignore_index=True - ) - - print(f" +{len(df_compressed)} rows (total: {len(df_accumulated)})") + dfs.append(df_compressed) + total_rows = sum(df.height for df in dfs) + print(f" +{df_compressed.height} rows (total: {total_rows})") # Delete local cache after each day to save disk in container cache_dir = Path("data/adsb") @@ -70,23 +57,31 @@ def main(): current_date += timedelta(days=1) - if df_accumulated.empty: - print("No data collected — exiting.") - return + # Concatenate all days + df_accumulated = pl.concat(dfs) if dfs else pl.DataFrame() # Deduplicate within this chunk df_accumulated = deduplicate_by_signature(df_accumulated) - print(f"After dedup: {len(df_accumulated)} rows") + print(f"After dedup: {df_accumulated.height} rows") # Write to local file then upload to S3 - local_path = Path(f"/tmp/chunk_{start_date_str}_{end_date_str}.csv.gz") - df_accumulated.to_csv(local_path, index=False, compression="gzip") + local_path = Path(f"/tmp/chunk_{start_date_str}_{end_date_str}.csv") + df_accumulated.write_csv(local_path) + + # Compress with gzip + import gzip + import shutil + gz_path = Path(f"/tmp/chunk_{start_date_str}_{end_date_str}.csv.gz") + with open(local_path, 'rb') as f_in: + with gzip.open(gz_path, 'wb') as f_out: + shutil.copyfileobj(f_in, f_out) + local_path.unlink() # Remove uncompressed file s3_key = f"intermediate/{run_id}/chunk_{start_date_str}_{end_date_str}.csv.gz" print(f"Uploading to s3://{s3_bucket}/{s3_key}") s3 = boto3.client("s3") - s3.upload_file(str(local_path), s3_bucket, s3_key) + s3.upload_file(str(gz_path), s3_bucket, s3_key) print("Done.") diff --git a/src/contributions/__init__.py b/src/contributions/__init__.py new file mode 100644 index 0000000..b630a86 --- /dev/null +++ b/src/contributions/__init__.py @@ -0,0 +1 @@ +"""Community contributions processing module.""" diff --git a/src/contributions/approve_submission.py b/src/contributions/approve_submission.py new file mode 100644 index 
0000000..5d953a4 --- /dev/null +++ b/src/contributions/approve_submission.py @@ -0,0 +1,249 @@ +#!/usr/bin/env python3 +""" +Approve a community submission and create a PR. + +This script is called by the GitHub Actions workflow when the 'approved' +label is added to a validated submission issue. + +Usage: + python -m src.contributions.approve_submission --issue-number 123 --issue-body "..." --author "username" --author-id 12345 + +Environment variables: + GITHUB_TOKEN: GitHub API token with repo write permissions + GITHUB_REPOSITORY: owner/repo +""" +import argparse +import base64 +import json +import os +import sys +import urllib.request +import urllib.error +from datetime import datetime, timezone + +from .schema import extract_json_from_issue_body, extract_contributor_name_from_issue_body, parse_and_validate +from .contributor import ( + generate_contributor_uuid, + generate_submission_filename, + compute_content_hash, +) + + +def github_api_request( + method: str, + endpoint: str, + data: dict | None = None, + accept: str = "application/vnd.github.v3+json" +) -> dict: + """Make a GitHub API request.""" + token = os.environ.get("GITHUB_TOKEN") + repo = os.environ.get("GITHUB_REPOSITORY") + + if not token or not repo: + raise EnvironmentError("GITHUB_TOKEN and GITHUB_REPOSITORY must be set") + + url = f"https://api.github.com/repos/{repo}{endpoint}" + headers = { + "Authorization": f"token {token}", + "Accept": accept, + "Content-Type": "application/json", + } + + body = json.dumps(data).encode() if data else None + req = urllib.request.Request(url, data=body, headers=headers, method=method) + + try: + with urllib.request.urlopen(req) as response: + return json.loads(response.read()) + except urllib.error.HTTPError as e: + error_body = e.read().decode() if e.fp else "" + print(f"GitHub API error: {e.code} {e.reason}: {error_body}", file=sys.stderr) + raise + + +def add_issue_comment(issue_number: int, body: str) -> None: + """Add a comment to a GitHub issue.""" 
+ github_api_request("POST", f"/issues/{issue_number}/comments", {"body": body}) + + +def get_default_branch_sha() -> str: + """Get the SHA of the default branch (main).""" + ref = github_api_request("GET", "/git/ref/heads/main") + return ref["object"]["sha"] + + +def create_branch(branch_name: str, sha: str) -> None: + """Create a new branch from a SHA.""" + try: + github_api_request("POST", "/git/refs", { + "ref": f"refs/heads/{branch_name}", + "sha": sha, + }) + except urllib.error.HTTPError as e: + if e.code == 422: # Branch exists + # Delete and recreate + try: + github_api_request("DELETE", f"/git/refs/heads/{branch_name}") + except urllib.error.HTTPError: + pass + github_api_request("POST", "/git/refs", { + "ref": f"refs/heads/{branch_name}", + "sha": sha, + }) + else: + raise + + +def create_or_update_file(path: str, content: str, message: str, branch: str) -> None: + """Create or update a file in the repository.""" + content_b64 = base64.b64encode(content.encode()).decode() + github_api_request("PUT", f"/contents/{path}", { + "message": message, + "content": content_b64, + "branch": branch, + }) + + +def create_pull_request(title: str, head: str, base: str, body: str) -> dict: + """Create a pull request.""" + return github_api_request("POST", "/pulls", { + "title": title, + "head": head, + "base": base, + "body": body, + }) + + +def add_labels_to_issue(issue_number: int, labels: list[str]) -> None: + """Add labels to an issue or PR.""" + github_api_request("POST", f"/issues/{issue_number}/labels", {"labels": labels}) + + +def process_submission( + issue_number: int, + issue_body: str, + author_username: str, + author_id: int, +) -> bool: + """ + Process an approved submission and create a PR. 
+
+    Args:
+        issue_number: The GitHub issue number
+        issue_body: The issue body text
+        author_username: The GitHub username of the issue author
+        author_id: The numeric GitHub user ID
+
+    Returns:
+        True if successful, False otherwise
+    """
+    # Extract and validate JSON
+    json_str = extract_json_from_issue_body(issue_body)
+    if not json_str:
+        add_issue_comment(issue_number, "❌ Could not extract JSON from submission.")
+        return False
+
+    data, errors = parse_and_validate(json_str)
+    if errors:
+        error_list = "\n".join(f"- {e}" for e in errors)
+        add_issue_comment(issue_number, f"❌ **Validation Failed**\n\n{error_list}")
+        return False
+
+    # Normalize to list
+    submissions = data if isinstance(data, list) else [data]
+
+    # Generate contributor UUID from GitHub ID
+    contributor_uuid = generate_contributor_uuid(author_id)
+
+    # Extract contributor name from issue form (or default to GitHub username)
+    contributor_name = extract_contributor_name_from_issue_body(issue_body)
+    if not contributor_name:
+        contributor_name = f"@{author_username}"
+
+    # Add metadata to each submission
+    now = datetime.now(timezone.utc)
+    date_str = now.strftime("%Y-%m-%d")
+    timestamp_str = now.isoformat()
+
+    for submission in submissions:
+        submission["contributor_uuid"] = contributor_uuid
+        submission["contributor_name"] = contributor_name
+        submission["creation_timestamp"] = timestamp_str
+
+    # Generate unique filename
+    content_json = json.dumps(submissions, indent=2, sort_keys=True)
+    content_hash = compute_content_hash(content_json)
+    filename = generate_submission_filename(author_username, date_str, content_hash)
+    file_path = f"community/{filename}"
+
+    # Create branch
+    branch_name = f"community-submission-{issue_number}"
+    default_sha = get_default_branch_sha()
+    create_branch(branch_name, default_sha)
+
+    # Create file
+    commit_message = f"Add community submission from @{author_username} (closes #{issue_number})"
+    create_or_update_file(file_path, content_json, commit_message, 
branch_name)
+
+    # Create PR
+    pr_body = f"""## Community Submission
+
+Adds {len(submissions)} submission(s) from @{author_username}.
+
+**File:** `{file_path}`
+**Contributor UUID:** `{contributor_uuid}`
+
+Closes #{issue_number}
+
+---
+
+### Submissions
+```json
+{content_json}
+```"""
+
+    pr = create_pull_request(
+        title=f"Community submission: {filename}",
+        head=branch_name,
+        base="main",
+        body=pr_body,
+    )
+
+    # Add labels to PR
+    add_labels_to_issue(pr["number"], ["community", "auto-generated"])
+
+    # Comment on original issue
+    add_issue_comment(
+        issue_number,
+        f"✅ **Submission Approved**\n\n"
+        f"PR #{pr['number']} has been created to add your submission.\n\n"
+        f"**File:** `{file_path}`\n"
+        f"**Your Contributor UUID:** `{contributor_uuid}`\n\n"
+        f"The PR will be merged by a maintainer."
+    )
+
+    print(f"Created PR #{pr['number']} for submission")
+    return True
+
+
+def main():
+    parser = argparse.ArgumentParser(description="Approve community submission and create PR")
+    parser.add_argument("--issue-number", type=int, required=True, help="GitHub issue number")
+    parser.add_argument("--issue-body", required=True, help="Issue body text")
+    parser.add_argument("--author", required=True, help="Issue author username")
+    parser.add_argument("--author-id", type=int, required=True, help="Issue author numeric ID")
+
+    args = parser.parse_args()
+
+    success = process_submission(
+        issue_number=args.issue_number,
+        issue_body=args.issue_body,
+        author_username=args.author,
+        author_id=args.author_id,
+    )
+
+    sys.exit(0 if success else 1)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/src/contributions/contributor.py b/src/contributions/contributor.py
new file mode 100644
index 0000000..7e7c10b
--- /dev/null
+++ b/src/contributions/contributor.py
@@ -0,0 +1,86 @@
+"""Contributor identification utilities."""
+import hashlib
+import uuid
+
+
+# DNS namespace UUID for generating UUIDv5
+DNS_NAMESPACE = uuid.UUID('6ba7b810-9dad-11d1-80b4-00c04fd430c8')
+
+
+def generate_contributor_uuid(github_user_id: int) -> str: + """ + Generate a deterministic UUID v5 from a GitHub user ID. + + This ensures the same GitHub account always gets the same contributor UUID. + + Args: + github_user_id: The numeric GitHub user ID + + Returns: + UUID string in standard format + """ + name = f"github:{github_user_id}" + return str(uuid.uuid5(DNS_NAMESPACE, name)) + + +def sanitize_username(username: str, max_length: int = 20) -> str: + """ + Sanitize a GitHub username for use in filenames. + + Args: + username: GitHub username + max_length: Maximum length of sanitized name + + Returns: + Lowercase alphanumeric string with underscores + """ + sanitized = "" + for char in username.lower(): + if char.isalnum(): + sanitized += char + else: + sanitized += "_" + + # Collapse multiple underscores + while "__" in sanitized: + sanitized = sanitized.replace("__", "_") + + return sanitized.strip("_")[:max_length] + + +def generate_submission_filename( + username: str, + date_str: str, + content_hash: str, + extension: str = ".json" +) -> str: + """ + Generate a unique filename for a community submission. + + Format: {sanitized_username}_{date}_{short_hash}.json + + Args: + username: GitHub username + date_str: Date in YYYY-MM-DD format + content_hash: Hash of the submission content (will be truncated to 8 chars) + extension: File extension (default: .json) + + Returns: + Unique filename string + """ + sanitized_name = sanitize_username(username) + short_hash = content_hash[:8] + return f"{sanitized_name}_{date_str}_{short_hash}{extension}" + + +def compute_content_hash(content: str) -> str: + """ + Compute SHA256 hash of content. 
+ + Args: + content: String content to hash + + Returns: + Hex digest of SHA256 hash + """ + return hashlib.sha256(content.encode()).hexdigest() diff --git a/src/contributions/create_daily_community_release.py b/src/contributions/create_daily_community_release.py new file mode 100644 index 0000000..f709d24 --- /dev/null +++ b/src/contributions/create_daily_community_release.py @@ -0,0 +1,141 @@ +#!/usr/bin/env python3 +""" +Generate a daily CSV of all community contributions. + +Reads all JSON files from the community/ directory and outputs a sorted CSV +with creation_timestamp as the first column and contributor_name/contributor_uuid as the last columns. + +Usage: + python -m src.contributions.create_daily_community_release +""" +from datetime import datetime, timezone +from pathlib import Path +import json +import sys + +import pandas as pd + + +COMMUNITY_DIR = Path(__file__).parent.parent.parent / "community" +OUT_ROOT = Path("data/planequery_aircraft") + + +def read_all_submissions(community_dir: Path) -> list[dict]: + """Read all JSON submissions from the community directory.""" + all_submissions = [] + + for json_file in sorted(community_dir.glob("*.json")): + try: + with open(json_file) as f: + data = json.load(f) + + # Normalize to list + submissions = data if isinstance(data, list) else [data] + all_submissions.extend(submissions) + + except (json.JSONDecodeError, OSError) as e: + print(f"Warning: Failed to read {json_file}: {e}", file=sys.stderr) + + return all_submissions + + +def submissions_to_dataframe(submissions: list[dict]) -> pd.DataFrame: + """ + Convert submissions to a DataFrame with proper column ordering. 
+ + Column order: + - creation_timestamp (first) + - transponder_code_hex + - registration_number + - planequery_airframe_id + - contributor_name + - [other columns alphabetically] + - contributor_uuid (last) + """ + if not submissions: + return pd.DataFrame() + + df = pd.DataFrame(submissions) + + # Ensure required columns exist + required_cols = [ + "creation_timestamp", + "transponder_code_hex", + "registration_number", + "planequery_airframe_id", + "contributor_name", + "contributor_uuid", + ] + for col in required_cols: + if col not in df.columns: + df[col] = None + + # Sort by creation_timestamp ascending + df = df.sort_values("creation_timestamp", ascending=True, na_position="last") + + # Reorder columns: specific order first, contributor_uuid last + first_cols = [ + "creation_timestamp", + "transponder_code_hex", + "registration_number", + "planequery_airframe_id", + "contributor_name", + ] + last_cols = ["contributor_uuid"] + + middle_cols = sorted([ + col for col in df.columns + if col not in first_cols and col not in last_cols + ]) + + ordered_cols = first_cols + middle_cols + last_cols + df = df[ordered_cols] + + return df.reset_index(drop=True) + + +def main(): + """Generate the daily community contributions CSV.""" + date_str = datetime.now(timezone.utc).strftime("%Y-%m-%d") + + print(f"Reading community submissions from {COMMUNITY_DIR}") + submissions = read_all_submissions(COMMUNITY_DIR) + + if not submissions: + print("No community submissions found.") + # Still create an empty CSV with headers + df = pd.DataFrame(columns=[ + "creation_timestamp", + "transponder_code_hex", + "registration_number", + "planequery_airframe_id", + "contributor_name", + "tags", + "contributor_uuid", + ]) + else: + print(f"Found {len(submissions)} total submissions") + df = submissions_to_dataframe(submissions) + + # Determine date range for filename + if not df.empty and df["creation_timestamp"].notna().any(): + # Get earliest timestamp for start date + earliest = 
pd.to_datetime(df["creation_timestamp"]).min() + start_date_str = earliest.strftime("%Y-%m-%d") + else: + start_date_str = date_str + + # Output + OUT_ROOT.mkdir(parents=True, exist_ok=True) + output_file = OUT_ROOT / f"planequery_aircraft_community_{start_date_str}_{date_str}.csv" + + df.to_csv(output_file, index=False) + + print(f"Saved: {output_file}") + print(f"Total contributions: {len(df)}") + + return output_file + + +if __name__ == "__main__": + main() diff --git a/src/contributions/read_community_data.py b/src/contributions/read_community_data.py new file mode 100644 index 0000000..0e6e4ea --- /dev/null +++ b/src/contributions/read_community_data.py @@ -0,0 +1,115 @@ +#!/usr/bin/env python3 +""" +Read and aggregate all community submission data. + +Usage: + python -m src.contributions.read_community_data + python -m src.contributions.read_community_data --output merged.json +""" +import argparse +import json +import sys +from pathlib import Path + + +COMMUNITY_DIR = Path(__file__).parent.parent.parent / "community" + + +def read_all_submissions(community_dir: Path | None = None) -> list[dict]: + """ + Read all JSON submissions from the community directory. + + Args: + community_dir: Path to community directory. Uses default if None. 
+ + Returns: + List of all submission dictionaries + """ + if community_dir is None: + community_dir = COMMUNITY_DIR + + all_submissions = [] + + for json_file in sorted(community_dir.glob("*.json")): + try: + with open(json_file) as f: + data = json.load(f) + + # Normalize to list + submissions = data if isinstance(data, list) else [data] + + # Add source file metadata + for submission in submissions: + submission["_source_file"] = json_file.name + + all_submissions.extend(submissions) + + except (json.JSONDecodeError, OSError) as e: + print(f"Warning: Failed to read {json_file}: {e}", file=sys.stderr) + + return all_submissions + + +def group_by_identifier(submissions: list[dict]) -> dict[str, list[dict]]: + """ + Group submissions by their identifier (registration, transponder, or airframe ID). + + Returns: + Dict mapping identifier to list of submissions for that identifier + """ + grouped = {} + + for submission in submissions: + # Determine identifier + if "registration_number" in submission: + key = f"reg:{submission['registration_number']}" + elif "transponder_code_hex" in submission: + key = f"icao:{submission['transponder_code_hex']}" + elif "planequery_airframe_id" in submission: + key = f"id:{submission['planequery_airframe_id']}" + else: + key = "_unknown" + + if key not in grouped: + grouped[key] = [] + grouped[key].append(submission) + + return grouped + + +def main(): + parser = argparse.ArgumentParser(description="Read community submission data") + parser.add_argument("--output", "-o", help="Output file (default: stdout)") + parser.add_argument("--group", action="store_true", help="Group by identifier") + parser.add_argument("--stats", action="store_true", help="Print statistics only") + + args = parser.parse_args() + + submissions = read_all_submissions() + + if args.stats: + grouped = group_by_identifier(submissions) + contributors = set(s.get("contributor_uuid", "unknown") for s in submissions) + + print(f"Total submissions: {len(submissions)}") 
+ print(f"Unique identifiers: {len(grouped)}") + print(f"Unique contributors: {len(contributors)}") + return + + if args.group: + result = group_by_identifier(submissions) + else: + result = submissions + + output = json.dumps(result, indent=2) + + if args.output: + with open(args.output, "w") as f: + f.write(output) + print(f"Wrote {len(submissions)} submissions to {args.output}") + else: + print(output) + + +if __name__ == "__main__": + main() diff --git a/src/contributions/schema.py b/src/contributions/schema.py new file mode 100644 index 0000000..3bc9539 --- /dev/null +++ b/src/contributions/schema.py @@ -0,0 +1,117 @@ +"""Schema validation for community submissions.""" +import json +import re +from pathlib import Path +from typing import Any + +try: + from jsonschema import Draft202012Validator +except ImportError: + Draft202012Validator = None + + +SCHEMA_PATH = Path(__file__).parent.parent.parent / "schemas" / "community_submission.v1.schema.json" + + +def load_schema() -> dict: + """Load the community submission schema.""" + with open(SCHEMA_PATH) as f: + return json.load(f) + + +def validate_submission(data: dict | list, schema: dict | None = None) -> list[str]: + """ + Validate submission(s) against schema. + + Args: + data: Single submission dict or list of submissions + schema: Optional schema dict. If None, loads from default path. + + Returns: + List of error messages. Empty list means validation passed. 
+ """ + if Draft202012Validator is None: + raise ImportError("jsonschema is required: pip install jsonschema") + + if schema is None: + schema = load_schema() + + submissions = data if isinstance(data, list) else [data] + errors = [] + + validator = Draft202012Validator(schema) + + for i, submission in enumerate(submissions): + prefix = f"[{i}] " if len(submissions) > 1 else "" + for error in validator.iter_errors(submission): + path = ".".join(str(p) for p in error.path) if error.path else "(root)" + errors.append(f"{prefix}{path}: {error.message}") + + return errors + + +def extract_json_from_issue_body(body: str) -> str | None: + """ + Extract JSON from GitHub issue body. + + Looks for JSON in the 'Submission JSON' section wrapped in code blocks. + + Args: + body: The issue body text + + Returns: + Extracted JSON string or None if not found + """ + # Match JSON in "### Submission JSON" section + pattern = r"### Submission JSON\s*\n\s*```(?:json)?\s*\n([\s\S]*?)\n\s*```" + match = re.search(pattern, body) + + if match: + return match.group(1).strip() + + return None + + +def extract_contributor_name_from_issue_body(body: str) -> str | None: + """ + Extract contributor name from GitHub issue body. + + Looks for the 'Contributor Name' field in the issue form. + + Args: + body: The issue body text + + Returns: + Contributor name string or None if not found/empty + """ + # Match "### Contributor Name" section + pattern = r"### Contributor Name\s*\n\s*(.+?)(?=\n###|\n\n|$)" + match = re.search(pattern, body) + + if match: + name = match.group(1).strip() + # GitHub issue forms show "_No response_" for empty optional fields + if name and name != "_No response_": + return name + + return None + + +def parse_and_validate(json_str: str, schema: dict | None = None) -> tuple[list | dict | None, list[str]]: + """ + Parse JSON string and validate against schema. 
+ + Args: + json_str: JSON string to parse + schema: Optional schema dict + + Returns: + Tuple of (parsed data or None, list of errors) + """ + try: + data = json.loads(json_str) + except json.JSONDecodeError as e: + return None, [f"Invalid JSON: {e}"] + + errors = validate_submission(data, schema) + return data, errors diff --git a/src/contributions/validate_submission.py b/src/contributions/validate_submission.py new file mode 100644 index 0000000..e4d45b6 --- /dev/null +++ b/src/contributions/validate_submission.py @@ -0,0 +1,140 @@ +#!/usr/bin/env python3 +""" +Validate a community submission from a GitHub issue. + +This script is called by the GitHub Actions workflow to validate +submissions when issues are opened or edited. + +Usage: + python -m src.contributions.validate_submission --issue-body "..." + python -m src.contributions.validate_submission --file submission.json + echo '{"registration_number": "N12345"}' | python -m src.contributions.validate_submission --stdin + +Environment variables (for GitHub Actions): + GITHUB_TOKEN: GitHub API token + GITHUB_REPOSITORY: owner/repo + ISSUE_NUMBER: Issue number to comment on +""" +import argparse +import json +import os +import sys +import urllib.request +import urllib.error + +from .schema import extract_json_from_issue_body, parse_and_validate, load_schema + + +def github_api_request(method: str, endpoint: str, data: dict | None = None) -> dict: + """Make a GitHub API request.""" + token = os.environ.get("GITHUB_TOKEN") + repo = os.environ.get("GITHUB_REPOSITORY") + + if not token or not repo: + raise EnvironmentError("GITHUB_TOKEN and GITHUB_REPOSITORY must be set") + + url = f"https://api.github.com/repos/{repo}{endpoint}" + headers = { + "Authorization": f"token {token}", + "Accept": "application/vnd.github.v3+json", + "Content-Type": "application/json", + } + + body = json.dumps(data).encode() if data else None + req = urllib.request.Request(url, data=body, headers=headers, method=method) + + with 
urllib.request.urlopen(req) as response: + return json.loads(response.read()) + + +def add_issue_comment(issue_number: int, body: str) -> None: + """Add a comment to a GitHub issue.""" + github_api_request("POST", f"/issues/{issue_number}/comments", {"body": body}) + + +def add_issue_label(issue_number: int, label: str) -> None: + """Add a label to a GitHub issue.""" + github_api_request("POST", f"/issues/{issue_number}/labels", {"labels": [label]}) + + +def remove_issue_label(issue_number: int, label: str) -> None: + """Remove a label from a GitHub issue.""" + try: + github_api_request("DELETE", f"/issues/{issue_number}/labels/{label}") + except urllib.error.HTTPError: + pass # Label might not exist + + +def validate_and_report(json_str: str, issue_number: int | None = None) -> bool: + """ + Validate JSON and optionally report to GitHub issue. + + Args: + json_str: JSON string to validate + issue_number: Optional issue number to comment on + + Returns: + True if validation passed, False otherwise + """ + data, errors = parse_and_validate(json_str) + + if errors: + error_list = "\n".join(f"- {e}" for e in errors) + message = f"❌ **Validation Failed**\n\n{error_list}\n\nPlease fix the errors and edit your submission." + + print(message, file=sys.stderr) + + if issue_number: + add_issue_comment(issue_number, message) + remove_issue_label(issue_number, "validated") + + return False + + count = len(data) if isinstance(data, list) else 1 + message = f"✅ **Validation Passed**\n\n{count} submission(s) validated successfully against the schema.\n\nA maintainer can approve this submission by adding the `approved` label." 
+ + print(message) + + if issue_number: + add_issue_comment(issue_number, message) + add_issue_label(issue_number, "validated") + + return True + + +def main(): + parser = argparse.ArgumentParser(description="Validate community submission JSON") + source_group = parser.add_mutually_exclusive_group(required=True) + source_group.add_argument("--issue-body", help="Issue body text containing JSON") + source_group.add_argument("--file", help="JSON file to validate") + source_group.add_argument("--stdin", action="store_true", help="Read JSON from stdin") + + parser.add_argument("--issue-number", type=int, help="GitHub issue number to comment on") + + args = parser.parse_args() + + # Get JSON string + if args.issue_body: + json_str = extract_json_from_issue_body(args.issue_body) + if not json_str: + print("❌ Could not extract JSON from issue body", file=sys.stderr) + if args.issue_number: + add_issue_comment( + args.issue_number, + "❌ **Validation Failed**\n\nCould not extract JSON from submission. " + "Please ensure your JSON is in the 'Submission JSON' field wrapped in code blocks." 
+ ) + sys.exit(1) + elif args.file: + with open(args.file) as f: + json_str = f.read() + else: # stdin + json_str = sys.stdin.read() + + # Validate + success = validate_and_report(json_str, args.issue_number) + sys.exit(0 if success else 1) + + +if __name__ == "__main__": + main() diff --git a/src/create_daily_planequery_aircraft_adsb_release.py b/src/create_daily_planequery_aircraft_adsb_release.py index faa6b6d..e5de1f8 100644 --- a/src/create_daily_planequery_aircraft_adsb_release.py +++ b/src/create_daily_planequery_aircraft_adsb_release.py @@ -2,6 +2,8 @@ from pathlib import Path from datetime import datetime, timezone, timedelta import sys +import polars as pl + # Add adsb directory to path sys.path.insert(0, str(Path(__file__).parent / "adsb")) # TODO: Fix this hacky path manipulation @@ -23,15 +25,26 @@ if __name__ == '__main__': print("Loading new ADS-B data...") df_new = load_historical_for_day(day) - if df_new.empty: + if df_new.height == 0: day = day - timedelta(days=1) continue - max_time = df_new['time'].max() - max_time = max_time.replace(tzinfo=timezone.utc) - - if max_time >= day.replace(hour=23, minute=59, second=59) - timedelta(minutes=5): - # Data is complete - break + max_time = df_new['time'].max() + if max_time is not None: + # Handle timezone + max_time_dt = max_time + if hasattr(max_time_dt, 'replace'): + max_time_dt = max_time_dt.replace(tzinfo=timezone.utc) + + end_of_day = day.replace(hour=23, minute=59, second=59, tzinfo=timezone.utc) - timedelta(minutes=5) + + # Convert polars datetime to python datetime if needed + if isinstance(max_time_dt, datetime): + if max_time_dt.replace(tzinfo=timezone.utc) >= end_of_day: + break + else: + # Polars returns python datetime already + if max_time >= day.replace(hour=23, minute=54, second=59): + break print(f"WARNING: Latest data time is {max_time}, which is more than 5 minutes before end of day.") day = day - timedelta(days=1) @@ -51,14 +64,21 @@ if __name__ == '__main__': start_date_str = 
date_str # Sort by time for consistent ordering - df_combined = df_combined.sort_values('time').reset_index(drop=True) + df_combined = df_combined.sort('time') + + # Convert any list columns to strings for CSV compatibility + for col in df_combined.columns: + if df_combined[col].dtype == pl.List: + df_combined = df_combined.with_columns( + pl.col(col).list.join(",").alias(col) + ) # Save the result OUT_ROOT = Path("data/planequery_aircraft") OUT_ROOT.mkdir(parents=True, exist_ok=True) output_file = OUT_ROOT / f"planequery_aircraft_adsb_{start_date_str}_{date_str}.csv" - df_combined.to_csv(output_file, index=False) + df_combined.write_csv(output_file) print(f"Saved: {output_file}") - print(f"Total aircraft: {len(df_combined)}") + print(f"Total aircraft: {df_combined.height}") diff --git a/trigger_pipeline.py b/trigger_pipeline.py index d386b25..56f47d8 100644 --- a/trigger_pipeline.py +++ b/trigger_pipeline.py @@ -46,16 +46,9 @@ def main(): run_id = f"run-{datetime.utcnow().strftime('%Y%m%dT%H%M%S')}-{uuid.uuid4().hex[:8]}" chunks = generate_chunks(args.start_date, args.end_date, args.chunk_days) - clickhouse_host = os.environ["CLICKHOUSE_HOST"] - clickhouse_username = os.environ["CLICKHOUSE_USERNAME"] - clickhouse_password = os.environ["CLICKHOUSE_PASSWORD"] - - # Inject run_id and ClickHouse credentials into each chunk + # Inject run_id into each chunk for chunk in chunks: chunk["run_id"] = run_id - chunk["clickhouse_host"] = clickhouse_host - chunk["clickhouse_username"] = clickhouse_username - chunk["clickhouse_password"] = clickhouse_password sfn_input = { "run_id": run_id,