FEATURE: Add contributions framework. Fix and improve the daily ADS-B release using GitHub Actions for map-reduce.

ggman12
2026-02-11 14:04:27 -05:00
parent 27da93801e
commit 722bcdf791
29 changed files with 2347 additions and 343 deletions
@@ -0,0 +1,89 @@
name: Community submission (JSON)
description: Submit one or more community records (JSON) to be reviewed and approved.
title: "Community submission: "
labels:
- community
- submission
body:
- type: markdown
attributes:
value: |
Submit **one object** or an **array of objects** that matches the community submission schema.
**Rules (enforced on review/automation):**
- Each object must include **at least one** of:
- `registration_number`
- `transponder_code_hex` (6 hex chars)
- `planequery_airframe_id`
- Your contributor name (entered below) will be applied to all objects.
- `contributor_uuid` is derived from your GitHub account automatically.
- `creation_timestamp` is created by the system (you may omit it).
**Example: single object**
```json
{
"transponder_code_hex": "a1b2c3"
}
```
**Example: multiple objects (array)**
```json
[
{
"registration_number": "N123AB"
},
{
"planequery_airframe_id": "cessna|172s|12345",
"transponder_code_hex": "0f1234"
}
]
```
- type: input
id: contributor_name
attributes:
label: Contributor Name
description: Your display name for attribution. Leave blank to use your GitHub username. Max 150 characters.
placeholder: "e.g., JamesBerry.com or leave blank"
validations:
required: false
- type: textarea
id: submission_json
attributes:
label: Submission JSON
description: Paste either one JSON object or an array of JSON objects. Must be valid JSON. Do not include contributor_name or contributor_uuid in your JSON.
placeholder: |
Paste JSON here...
validations:
required: true
- type: dropdown
id: submission_type
attributes:
label: What did you submit?
options:
- Single object
- Multiple objects (array)
validations:
required: true
- type: checkboxes
id: confirmations
attributes:
label: Confirmations
options:
- label: "I confirm this is valid JSON (not JSONL) and matches the field names exactly."
required: true
- label: "I confirm `transponder_code_hex` values (if provided) are 6 hex characters."
required: true
- label: "I understand submissions are reviewed and may be rejected or require changes."
required: true
- type: textarea
id: notes
attributes:
label: Notes (optional)
description: Any context, sources, or links that help validate your submission.
validations:
required: false
@@ -0,0 +1,46 @@
name: Approve Community Submission
on:
issues:
types: [labeled]
permissions:
contents: write
pull-requests: write
issues: write
jobs:
approve:
if: github.event.label.name == 'approved' && contains(github.event.issue.labels.*.name, 'validated')
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v4
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: "3.12"
- name: Install dependencies
run: pip install jsonschema
- name: Get issue author ID
id: author
uses: actions/github-script@v7
with:
script: |
const issue = context.payload.issue;
core.setOutput('username', issue.user.login);
core.setOutput('user_id', issue.user.id);
- name: Process and create PR
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
GITHUB_REPOSITORY: ${{ github.repository }}
# Pass the issue body through the environment: interpolating it directly
# into the run script breaks on multi-line bodies and is a shell-injection risk.
ISSUE_BODY: ${{ github.event.issue.body }}
run: |
python -m src.contributions.approve_submission \
--issue-number ${{ github.event.issue.number }} \
--issue-body "$ISSUE_BODY" \
--author "${{ steps.author.outputs.username }}" \
--author-id ${{ steps.author.outputs.user_id }}
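The `src.contributions.approve_submission` module itself is not part of this hunk. As a rough, hedged sketch of the parsing step it implies (the helper name is an assumption; the `### <label>` layout is how GitHub renders issue-form fields, with the label "Submission JSON" coming from the form above):

```python
import json
import re


def extract_submission_json(issue_body: str) -> list[dict]:
    """Pull the JSON pasted under the 'Submission JSON' textarea out of an
    issue-form body and normalize it to a list of objects (assumed layout)."""
    # GitHub issue forms render each input as "### <label>" followed by its value.
    match = re.search(r"### Submission JSON\s+(.*?)(?:\n### |\Z)", issue_body, re.S)
    if not match:
        raise ValueError("No 'Submission JSON' section found in issue body")
    data = json.loads(match.group(1).strip())
    return data if isinstance(data, list) else [data]
```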
@@ -36,10 +36,9 @@ jobs:
ref: 'develop'
});
build-and-release:
build-faa:
runs-on: ubuntu-24.04-arm
if: github.event_name != 'schedule'
steps:
- name: Checkout
uses: actions/checkout@v6
@@ -56,17 +55,227 @@ jobs:
python -m pip install --upgrade pip
pip install -r requirements.txt
- name: Run daily release script
env:
CLICKHOUSE_HOST: ${{ secrets.CLICKHOUSE_HOST }}
CLICKHOUSE_USERNAME: ${{ secrets.CLICKHOUSE_USERNAME }}
CLICKHOUSE_PASSWORD: ${{ secrets.CLICKHOUSE_PASSWORD }}
- name: Run FAA release script
run: |
python src/create_daily_planequery_aircraft_release.py
python src/create_daily_planequery_aircraft_adsb_release.py
ls -lah data/faa_releasable
ls -lah data/planequery_aircraft
- name: Upload FAA artifacts
uses: actions/upload-artifact@v4
with:
name: faa-release
path: |
data/planequery_aircraft/planequery_aircraft_faa_*.csv
data/faa_releasable/ReleasableAircraft_*.zip
retention-days: 1
adsb-extract:
runs-on: ubuntu-24.04-arm
if: github.event_name != 'schedule'
outputs:
manifest-exists: ${{ steps.check.outputs.exists }}
steps:
- name: Checkout
uses: actions/checkout@v6
with:
fetch-depth: 0
- name: Setup Python
uses: actions/setup-python@v6
with:
python-version: "3.14"
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
- name: Download and extract ADS-B data
run: |
python -m src.adsb.download_and_list_icaos
ls -lah data/output/
- name: Check manifest exists
id: check
run: |
if ls data/output/icao_manifest_*.txt 1>/dev/null 2>&1; then
echo "exists=true" >> "$GITHUB_OUTPUT"
else
echo "exists=false" >> "$GITHUB_OUTPUT"
fi
- name: Create tar of extracted data
run: |
cd data/output
tar -cf extracted_data.tar *-planes-readsb-prod-0.tar_0 icao_manifest_*.txt
ls -lah extracted_data.tar
- name: Upload extracted data
uses: actions/upload-artifact@v4
with:
name: adsb-extracted
path: data/output/extracted_data.tar
retention-days: 1
compression-level: 0 # Already compressed trace files
adsb-map:
runs-on: ubuntu-24.04-arm
needs: adsb-extract
if: github.event_name != 'schedule' && needs.adsb-extract.outputs.manifest-exists == 'true'
strategy:
fail-fast: false
matrix:
chunk: [0, 1, 2, 3]
steps:
- name: Checkout
uses: actions/checkout@v6
with:
fetch-depth: 0
- name: Setup Python
uses: actions/setup-python@v6
with:
python-version: "3.14"
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
- name: Download extracted data
uses: actions/download-artifact@v4
with:
name: adsb-extracted
path: data/output/
- name: Extract tar
run: |
cd data/output
tar -xf extracted_data.tar
rm extracted_data.tar
echo "=== Contents of data/output ==="
ls -lah
echo "=== Looking for manifest ==="
head -20 icao_manifest_*.txt 2>/dev/null || echo "No manifest found"
echo "=== Looking for extracted dirs ==="
ls -d *-planes-readsb-prod-0* 2>/dev/null || echo "No extracted dirs"
- name: Process chunk ${{ matrix.chunk }}
run: |
python -m src.adsb.process_icao_chunk --chunk-id ${{ matrix.chunk }} --total-chunks 4
mkdir -p data/output/adsb_chunks
ls -lah data/output/adsb_chunks/ || echo "No chunks created"
- name: Upload chunk artifacts
uses: actions/upload-artifact@v4
with:
name: adsb-chunk-${{ matrix.chunk }}
path: data/output/adsb_chunks/
retention-days: 1
adsb-reduce:
runs-on: ubuntu-24.04-arm
needs: adsb-map
if: github.event_name != 'schedule'
steps:
- name: Checkout
uses: actions/checkout@v6
with:
fetch-depth: 0
- name: Setup Python
uses: actions/setup-python@v6
with:
python-version: "3.14"
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
- name: Download all chunk artifacts
uses: actions/download-artifact@v4
with:
pattern: adsb-chunk-*
path: data/output/adsb_chunks/
merge-multiple: true
- name: Debug downloaded files
run: |
echo "=== Listing data/ ==="
find data/ -type f 2>/dev/null | head -50 || echo "No files in data/"
echo "=== Looking for parquet files ==="
find . -name "*.parquet" 2>/dev/null | head -20 || echo "No parquet files found"
- name: Combine chunks to CSV
run: |
mkdir -p data/output/adsb_chunks
ls -lah data/output/adsb_chunks/ || echo "Directory empty or does not exist"
python -m src.adsb.combine_chunks_to_csv --chunks-dir data/output/adsb_chunks
ls -lah data/planequery_aircraft/
- name: Upload ADS-B artifacts
uses: actions/upload-artifact@v4
with:
name: adsb-release
path: data/planequery_aircraft/planequery_aircraft_adsb_*.csv
retention-days: 1
build-community:
runs-on: ubuntu-latest
if: github.event_name != 'schedule'
steps:
- name: Checkout
uses: actions/checkout@v6
with:
fetch-depth: 0
- name: Setup Python
uses: actions/setup-python@v6
with:
python-version: "3.14"
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install pandas
- name: Run Community release script
run: |
python -m src.contributions.create_daily_community_release
ls -lah data/planequery_aircraft
- name: Upload Community artifacts
uses: actions/upload-artifact@v4
with:
name: community-release
path: data/planequery_aircraft/planequery_aircraft_community_*.csv
retention-days: 1
create-release:
runs-on: ubuntu-latest
needs: [build-faa, adsb-reduce, build-community]
if: github.event_name != 'schedule'
steps:
- name: Download FAA artifacts
uses: actions/download-artifact@v4
with:
name: faa-release
path: artifacts/faa
- name: Download ADS-B artifacts
uses: actions/download-artifact@v4
with:
name: adsb-release
path: artifacts/adsb
- name: Download Community artifacts
uses: actions/download-artifact@v4
with:
name: community-release
path: artifacts/community
- name: Prepare release metadata
id: meta
run: |
@@ -79,17 +288,27 @@ jobs:
BRANCH_SUFFIX="-develop"
fi
TAG="planequery-aircraft-${DATE}${BRANCH_SUFFIX}"
# Find the CSV files in data/planequery_aircraft matching the patterns
CSV_FILE_FAA=$(ls data/planequery_aircraft/planequery_aircraft_faa_*_${DATE}.csv | head -1)
# Find files from artifacts
CSV_FILE_FAA=$(ls artifacts/faa/data/planequery_aircraft/planequery_aircraft_faa_*.csv | head -1)
CSV_BASENAME_FAA=$(basename "$CSV_FILE_FAA")
CSV_FILE_ADSB=$(ls data/planequery_aircraft/planequery_aircraft_adsb_*_${DATE}.csv | head -1)
CSV_FILE_ADSB=$(ls artifacts/adsb/planequery_aircraft_adsb_*.csv | head -1)
CSV_BASENAME_ADSB=$(basename "$CSV_FILE_ADSB")
CSV_FILE_COMMUNITY=$(ls artifacts/community/planequery_aircraft_community_*.csv 2>/dev/null | head -1 || echo "")
CSV_BASENAME_COMMUNITY=$(basename "$CSV_FILE_COMMUNITY" 2>/dev/null || echo "")
ZIP_FILE=$(ls artifacts/faa/data/faa_releasable/ReleasableAircraft_*.zip | head -1)
ZIP_BASENAME=$(basename "$ZIP_FILE")
echo "date=$DATE" >> "$GITHUB_OUTPUT"
echo "tag=$TAG" >> "$GITHUB_OUTPUT"
echo "csv_file_faa=$CSV_FILE_FAA" >> "$GITHUB_OUTPUT"
echo "csv_basename_faa=$CSV_BASENAME_FAA" >> "$GITHUB_OUTPUT"
echo "csv_file_adsb=$CSV_FILE_ADSB" >> "$GITHUB_OUTPUT"
echo "csv_basename_adsb=$CSV_BASENAME_ADSB" >> "$GITHUB_OUTPUT"
echo "csv_file_community=$CSV_FILE_COMMUNITY" >> "$GITHUB_OUTPUT"
echo "csv_basename_community=$CSV_BASENAME_COMMUNITY" >> "$GITHUB_OUTPUT"
echo "zip_file=$ZIP_FILE" >> "$GITHUB_OUTPUT"
echo "zip_basename=$ZIP_BASENAME" >> "$GITHUB_OUTPUT"
echo "name=planequery-aircraft snapshot ($DATE)${BRANCH_SUFFIX}" >> "$GITHUB_OUTPUT"
- name: Create GitHub Release and upload assets
@@ -103,10 +322,12 @@ jobs:
Assets:
- ${{ steps.meta.outputs.csv_basename_faa }}
- ${{ steps.meta.outputs.csv_basename_adsb }}
- ReleasableAircraft_${{ steps.meta.outputs.date }}.zip
- ${{ steps.meta.outputs.csv_basename_community }}
- ${{ steps.meta.outputs.zip_basename }}
files: |
${{ steps.meta.outputs.csv_file_faa }}
${{ steps.meta.outputs.csv_file_adsb }}
data/faa_releasable/ReleasableAircraft_${{ steps.meta.outputs.date }}.zip
${{ steps.meta.outputs.csv_file_community }}
${{ steps.meta.outputs.zip_file }}
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
@@ -0,0 +1,30 @@
name: Validate Community Submission
on:
issues:
types: [opened, edited]
jobs:
validate:
if: contains(github.event.issue.labels.*.name, 'submission')
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v4
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: "3.12"
- name: Install dependencies
run: pip install jsonschema
- name: Validate submission
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
GITHUB_REPOSITORY: ${{ github.repository }}
# Same pattern as the approve workflow: pass the body via the environment
# rather than interpolating it into the shell command.
ISSUE_BODY: ${{ github.event.issue.body }}
run: |
python -m src.contributions.validate_submission \
--issue-body "$ISSUE_BODY" \
--issue-number ${{ github.event.issue.number }}
+2 -14
@@ -79,8 +79,8 @@ class AdsbProcessingStack(Stack):
# --- MAP: worker task definition ---
map_task_def = ecs.FargateTaskDefinition(
self, "MapTaskDef",
cpu=16384, # 16 vCPU
memory_limit_mib=65536, # 64 GB — high memory for pandas operations on large datasets
cpu=4096, # 4 vCPU
memory_limit_mib=30720, # 30 GB
task_role=task_role,
runtime_platform=ecs.RuntimePlatform(
cpu_architecture=ecs.CpuArchitecture.ARM64,
@@ -149,18 +149,6 @@ class AdsbProcessingStack(Stack):
name="RUN_ID",
value=sfn.JsonPath.string_at("$.run_id"),
),
sfn_tasks.TaskEnvironmentVariable(
name="CLICKHOUSE_HOST",
value=sfn.JsonPath.string_at("$.clickhouse_host"),
),
sfn_tasks.TaskEnvironmentVariable(
name="CLICKHOUSE_USERNAME",
value=sfn.JsonPath.string_at("$.clickhouse_username"),
),
sfn_tasks.TaskEnvironmentVariable(
name="CLICKHOUSE_PASSWORD",
value=sfn.JsonPath.string_at("$.clickhouse_password"),
),
],
)
],
+3 -1
@@ -1,3 +1,5 @@
faa-aircraft-registry==0.1.0
pandas==3.0.0
clickhouse-connect==0.10.0
pyarrow==23.0.0
orjson==3.11.7
polars==1.38.1
@@ -0,0 +1,80 @@
{
"$schema": "https://json-schema.org/draft/2020-12/schema",
"title": "PlaneQuery Aircraft Community Submission (v1)",
"type": "object",
"additionalProperties": false,
"properties": {
"registration_number": {
"type": "string",
"minLength": 1
},
"transponder_code_hex": {
"type": "string",
"pattern": "^[0-9A-Fa-f]{6}$"
},
"planequery_airframe_id": {
"type": "string",
"minLength": 1
},
"contributor_uuid": {
"type": "string",
"format": "uuid"
},
"contributor_name": {
"type": "string",
"minLength": 0,
"maxLength": 150,
"description": "Display name (may be blank)"
},
"creation_timestamp": {
"type": "string",
"format": "date-time",
"description": "Set by the system when the submission is persisted/approved.",
"readOnly": true
},
"tags": {
"type": "object",
"description": "Additional community-defined tags as key/value pairs (values may be scalar, array, or object).",
"propertyNames": {
"type": "string",
"pattern": "^[a-z][a-z0-9_]{0,63}$"
},
"additionalProperties": { "$ref": "#/$defs/tagValue" }
}
},
"allOf": [
{
"anyOf": [
{ "required": ["registration_number"] },
{ "required": ["transponder_code_hex"] },
{ "required": ["planequery_airframe_id"] }
]
}
],
"$defs": {
"tagScalar": {
"type": ["string", "number", "integer", "boolean", "null"]
},
"tagValue": {
"anyOf": [
{ "$ref": "#/$defs/tagScalar" },
{
"type": "array",
"maxItems": 50,
"items": { "$ref": "#/$defs/tagScalar" }
},
{
"type": "object",
"maxProperties": 50,
"additionalProperties": { "$ref": "#/$defs/tagScalar" }
}
]
}
}
}
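For a quick local check before opening an issue, a record can be validated against this schema with the `jsonschema` package the workflows already install. A minimal sketch (the schema file path and the sample values are illustrative, not taken from the repo):

```python
import json

from jsonschema import Draft202012Validator

# Path is illustrative; point it at wherever the schema file lives in the repo.
with open("community_submission_schema.json") as f:
    schema = json.load(f)

# A record that should pass: it has one of the required identifiers, a valid
# 6-character hex transponder code, and tag keys matching ^[a-z][a-z0-9_]{0,63}$.
record = {
    "registration_number": "N123AB",
    "transponder_code_hex": "a1b2c3",
    "tags": {"paint_scheme": "retro livery", "seen_at": ["KSFO", "KLAX"]},
}

validator = Draft202012Validator(schema)
errors = list(validator.iter_errors(record))
for err in errors:
    print(f"{list(err.path)}: {err.message}")
print("valid" if not errors else f"{len(errors)} validation error(s)")
```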
+1
@@ -5,6 +5,7 @@ WORKDIR /app
COPY requirements.reducer.txt requirements.txt
RUN pip install --no-cache-dir -r requirements.txt
COPY compress_adsb_to_aircraft_data.py .
COPY reducer.py .
CMD ["python", "-u", "reducer.py"]
+1
@@ -6,6 +6,7 @@ COPY requirements.worker.txt requirements.txt
RUN pip install --no-cache-dir -r requirements.txt
COPY compress_adsb_to_aircraft_data.py .
COPY download_adsb_data_to_parquet.py .
COPY worker.py .
CMD ["python", "-u", "worker.py"]
-7
@@ -1,7 +0,0 @@
from pathlib import Path
from datetime import datetime, timezone,timedelta
from adsb_to_aircraft_data_historical import load_historical_for_day
day = datetime.now(timezone.utc) - timedelta(days=1)
load_historical_for_day(day)
@@ -1,87 +0,0 @@
"""
Process historical ADS-B data by date range.
Downloads and compresses ADS-B messages for each day in the specified range.
"""
import argparse
from datetime import datetime, timedelta
from pathlib import Path
import pandas as pd
from compress_adsb_to_aircraft_data import load_historical_for_day, COLUMNS
def deduplicate_by_signature(df):
"""For each icao, keep only the earliest row with each unique signature."""
df["_signature"] = df[COLUMNS].astype(str).agg('|'.join, axis=1)
# Group by icao and signature, keep first (earliest) occurrence
df_deduped = df.groupby(['icao', '_signature'], as_index=False).first()
df_deduped = df_deduped.drop(columns=['_signature'])
df_deduped = df_deduped.sort_values('time')
return df_deduped
def main(start_date_str: str, end_date_str: str):
"""Process historical ADS-B data for the given date range."""
OUT_ROOT = Path("data/planequery_aircraft")
OUT_ROOT.mkdir(parents=True, exist_ok=True)
# Parse dates
start_date = datetime.strptime(start_date_str, "%Y-%m-%d")
end_date = datetime.strptime(end_date_str, "%Y-%m-%d")
# Calculate total number of days
total_days = (end_date - start_date).days
print(f"Processing {total_days} days from {start_date_str} to {end_date_str}")
# Initialize accumulated dataframe
df_accumulated = pd.DataFrame()
# Cache directory path
cache_dir = Path("data/adsb")
# Iterate through each day
current_date = start_date
while current_date < end_date:
print(f"Processing {current_date.strftime('%Y-%m-%d')}...")
df_compressed = load_historical_for_day(current_date)
# Concatenate to accumulated dataframe
if df_accumulated.empty:
df_accumulated = df_compressed
else:
df_accumulated = pd.concat([df_accumulated, df_compressed], ignore_index=True)
print(f" Added {len(df_compressed)} records (total: {len(df_accumulated)})")
# Save intermediate output after each day
current_date_str = current_date.strftime('%Y-%m-%d')
output_file = OUT_ROOT / f"planequery_aircraft_adsb_{start_date_str}_{current_date_str}.csv.gz"
df_deduped = deduplicate_by_signature(df_accumulated.copy())
df_deduped.to_csv(output_file, index=False, compression='gzip')
print(f" Saved to {output_file.name}")
# Delete cache after processing if processing more than 10 days
if total_days > 5 and cache_dir.exists():
import shutil
shutil.rmtree(cache_dir)
print(f" Deleted cache directory to save space")
# Move to next day
current_date += timedelta(days=1)
# Save the final accumulated data
output_file = OUT_ROOT / f"planequery_aircraft_adsb_{start_date_str}_{end_date_str}.csv.gz"
df_accumulated = deduplicate_by_signature(df_accumulated)
df_accumulated.to_csv(output_file, index=False, compression='gzip')
print(f"Completed processing from {start_date_str} to {end_date_str}")
print(f"Saved {len(df_accumulated)} total records to {output_file}")
if __name__ == '__main__':
# Parse command line arguments
parser = argparse.ArgumentParser(description="Process historical ADS-B data from ClickHouse")
parser.add_argument("start_date", help="Start date (YYYY-MM-DD, inclusive)")
parser.add_argument("end_date", help="End date (YYYY-MM-DD, exclusive)")
args = parser.parse_args()
main(args.start_date, args.end_date)
+205
@@ -0,0 +1,205 @@
"""
Combines chunk parquet files and compresses to final aircraft CSV.
This is the reduce phase of the map-reduce pipeline.
Memory-efficient: processes each chunk separately, compresses, then combines.
Usage:
python -m src.adsb.combine_chunks_to_csv --chunks-dir data/output/adsb_chunks
"""
import gc
import os
import sys
import glob
import argparse
from datetime import datetime, timedelta
import polars as pl
from src.adsb.download_adsb_data_to_parquet import OUTPUT_DIR, get_resource_usage
from src.adsb.compress_adsb_to_aircraft_data import compress_multi_icao_df, COLUMNS
DEFAULT_CHUNK_DIR = os.path.join(OUTPUT_DIR, "adsb_chunks")
FINAL_OUTPUT_DIR = "./data/planequery_aircraft"
os.makedirs(FINAL_OUTPUT_DIR, exist_ok=True)
def get_target_day() -> datetime:
"""Get yesterday's date (the day we're processing)."""
return datetime.utcnow() - timedelta(days=1)
def process_single_chunk(chunk_path: str) -> pl.DataFrame:
"""Load and compress a single chunk parquet file."""
print(f"Processing {os.path.basename(chunk_path)}... | {get_resource_usage()}")
# Load chunk - only columns we need
needed_columns = ['time', 'icao'] + COLUMNS
df = pl.read_parquet(chunk_path, columns=needed_columns)
print(f" Loaded {len(df)} rows")
# Compress to aircraft records (one per ICAO) using shared function
compressed = compress_multi_icao_df(df, verbose=True)
print(f" Compressed to {len(compressed)} aircraft records")
del df
gc.collect()
return compressed
def combine_compressed_chunks(compressed_dfs: list[pl.DataFrame]) -> pl.DataFrame:
"""Combine multiple compressed DataFrames.
Since chunks are partitioned by ICAO hash, each ICAO only appears in one chunk.
No deduplication needed here - just concatenate.
"""
print(f"Combining {len(compressed_dfs)} compressed chunks... | {get_resource_usage()}")
# Concat all
combined = pl.concat(compressed_dfs)
print(f"Combined: {len(combined)} records")
return combined
def download_and_merge_base_release(compressed_df: pl.DataFrame) -> pl.DataFrame:
"""Download base release and merge with new data."""
from src.get_latest_planequery_aircraft_release import download_latest_aircraft_adsb_csv
print("Downloading base ADS-B release...")
try:
base_path = download_latest_aircraft_adsb_csv(
output_dir="./data/planequery_aircraft_base"
)
print(f"Download returned: {base_path}")
if base_path and os.path.exists(str(base_path)):
print(f"Loading base release from {base_path}")
base_df = pl.read_csv(base_path)
print(f"Base release has {len(base_df)} records")
# Ensure columns match
base_cols = set(base_df.columns)
new_cols = set(compressed_df.columns)
print(f"Base columns: {sorted(base_cols)}")
print(f"New columns: {sorted(new_cols)}")
# Add missing columns
for col in new_cols - base_cols:
base_df = base_df.with_columns(pl.lit(None).alias(col))
for col in base_cols - new_cols:
compressed_df = compressed_df.with_columns(pl.lit(None).alias(col))
# Reorder columns to match
compressed_df = compressed_df.select(base_df.columns)
# Concat and deduplicate by icao (keep new data - it comes last)
combined = pl.concat([base_df, compressed_df])
print(f"After concat: {len(combined)} records")
deduplicated = combined.unique(subset=["icao"], keep="last")
print(f"Combined with base: {len(combined)} -> {len(deduplicated)} after dedup")
del base_df, combined
gc.collect()
return deduplicated
else:
print(f"No base release found at {base_path}, using only new data")
return compressed_df
except Exception as e:
import traceback
print(f"Failed to download base release: {e}")
traceback.print_exc()
return compressed_df
def cleanup_chunks(date_str: str, chunks_dir: str):
"""Delete chunk parquet files after successful merge."""
pattern = os.path.join(chunks_dir, f"chunk_*_{date_str}.parquet")
chunk_files = glob.glob(pattern)
for f in chunk_files:
try:
os.remove(f)
print(f"Deleted {f}")
except Exception as e:
print(f"Failed to delete {f}: {e}")
def main():
parser = argparse.ArgumentParser(description="Combine chunk parquets to final CSV")
parser.add_argument("--date", type=str, help="Date in YYYY-MM-DD format (default: yesterday)")
parser.add_argument("--chunks-dir", type=str, default=DEFAULT_CHUNK_DIR, help="Directory containing chunk parquet files")
parser.add_argument("--skip-base", action="store_true", help="Skip downloading and merging base release")
parser.add_argument("--keep-chunks", action="store_true", help="Keep chunk files after merging")
args = parser.parse_args()
if args.date:
target_day = datetime.strptime(args.date, "%Y-%m-%d")
else:
target_day = get_target_day()
date_str = target_day.strftime("%Y-%m-%d")
chunks_dir = args.chunks_dir
print(f"Combining chunks for {date_str}")
print(f"Chunks directory: {chunks_dir}")
print(f"Resource usage at start: {get_resource_usage()}")
# Find chunk files
pattern = os.path.join(chunks_dir, f"chunk_*_{date_str}.parquet")
chunk_files = sorted(glob.glob(pattern))
if not chunk_files:
print(f"No chunk files found matching: {pattern}")
sys.exit(1)
print(f"Found {len(chunk_files)} chunk files")
# Process each chunk separately to save memory
compressed_chunks = []
for chunk_path in chunk_files:
compressed = process_single_chunk(chunk_path)
compressed_chunks.append(compressed)
gc.collect()
# Combine all compressed chunks
combined = combine_compressed_chunks(compressed_chunks)
# Free memory from individual chunks
del compressed_chunks
gc.collect()
print(f"After combining: {get_resource_usage()}")
# Merge with base release
if not args.skip_base:
combined = download_and_merge_base_release(combined)
# Convert list columns to strings for CSV compatibility
for col in combined.columns:
if combined[col].dtype == pl.List:
combined = combined.with_columns(
pl.col(col).list.join(",").alias(col)
)
# Sort by time for consistent output
if 'time' in combined.columns:
combined = combined.sort('time')
# Write final CSV
output_path = os.path.join(FINAL_OUTPUT_DIR, f"planequery_aircraft_adsb_{date_str}.csv")
combined.write_csv(output_path)
print(f"Wrote {len(combined)} records to {output_path}")
# Cleanup
if not args.keep_chunks:
cleanup_chunks(date_str, chunks_dir)
print(f"Done! | {get_resource_usage()}")
if __name__ == "__main__":
main()
+177 -73
@@ -1,32 +1,67 @@
# SOME KIND OF MAP REDUCE SYSTEM
# Shared compression logic for ADS-B aircraft data
import os
import polars as pl
COLUMNS = ['dbFlags', 'ownOp', 'year', 'desc', 'aircraft_category', 'r', 't']
def compress_df(df):
icao = df.name
df["_signature"] = df[COLUMNS].astype(str).agg('|'.join, axis=1)
def deduplicate_by_signature(df: pl.DataFrame) -> pl.DataFrame:
"""For each icao, keep only the earliest row with each unique signature.
# Compute signature counts before grouping (avoid copy)
signature_counts = df["_signature"].value_counts()
This is used for deduplicating across multiple compressed chunks.
"""
# Create signature column
df = df.with_columns(
pl.concat_str([pl.col(c).cast(pl.Utf8).fill_null("") for c in COLUMNS], separator="|").alias("_signature")
)
# Group by icao and signature, take first row (earliest due to time sort)
df = df.sort("time")
df_deduped = df.group_by(["icao", "_signature"]).first()
df_deduped = df_deduped.drop("_signature")
df_deduped = df_deduped.sort("time")
return df_deduped
def compress_df_polars(df: pl.DataFrame, icao: str) -> pl.DataFrame:
"""Compress a single ICAO group to its most informative row using Polars."""
# Create signature string
df = df.with_columns(
pl.concat_str([pl.col(c).cast(pl.Utf8) for c in COLUMNS], separator="|").alias("_signature")
)
df = df.groupby("_signature", as_index=False).first() # check if it works with both last and first.
# For each row, create a dict of non-empty column values. This is using sets and subsets...
def get_non_empty_dict(row):
return {col: row[col] for col in COLUMNS if row[col] != ''}
# Compute signature counts
signature_counts = df.group_by("_signature").len().rename({"len": "_sig_count"})
df['_non_empty_dict'] = df.apply(get_non_empty_dict, axis=1)
df['_non_empty_count'] = df['_non_empty_dict'].apply(len)
# Group by signature and take first row
df = df.group_by("_signature").first()
if df.height == 1:
# Only one unique signature, return it
result = df.drop("_signature").with_columns(pl.lit(icao).alias("icao"))
return result
# For each row, create dict of non-empty column values and check subsets
# Convert to list of dicts for subset checking (same logic as pandas version)
rows_data = []
for row in df.iter_rows(named=True):
non_empty = {col: row[col] for col in COLUMNS if row[col] != '' and row[col] is not None}
rows_data.append({
'signature': row['_signature'],
'non_empty_dict': non_empty,
'non_empty_count': len(non_empty),
'row_data': row
})
# Check if row i's non-empty values are a subset of row j's non-empty values
def is_subset_of_any(idx):
row_dict = df.loc[idx, '_non_empty_dict']
row_count = df.loc[idx, '_non_empty_count']
row_dict = rows_data[idx]['non_empty_dict']
row_count = rows_data[idx]['non_empty_count']
for other_idx in df.index:
for other_idx, other_data in enumerate(rows_data):
if idx == other_idx:
continue
other_dict = df.loc[other_idx, '_non_empty_dict']
other_count = df.loc[other_idx, '_non_empty_count']
other_dict = other_data['non_empty_dict']
other_count = other_data['non_empty_count']
# Check if all non-empty values in current row match those in other row
if all(row_dict.get(k) == other_dict.get(k) for k in row_dict.keys()):
@@ -36,32 +71,94 @@ def compress_df(df):
return False
# Keep rows that are not subsets of any other row
keep_mask = ~df.index.to_series().apply(is_subset_of_any)
df = df[keep_mask]
keep_indices = [i for i in range(len(rows_data)) if not is_subset_of_any(i)]
if len(keep_indices) == 0:
keep_indices = [0] # Fallback: keep first row
remaining_signatures = [rows_data[i]['signature'] for i in keep_indices]
df = df.filter(pl.col("_signature").is_in(remaining_signatures))
if df.height > 1:
# Use signature counts to pick the most frequent one
df = df.join(signature_counts, on="_signature", how="left")
max_count = df["_sig_count"].max()
df = df.filter(pl.col("_sig_count") == max_count).head(1)
df = df.drop("_sig_count")
result = df.drop("_signature").with_columns(pl.lit(icao).alias("icao"))
# Ensure empty strings are preserved
for col in COLUMNS:
if col in result.columns:
result = result.with_columns(pl.col(col).fill_null(""))
return result
if len(df) > 1:
# Use pre-computed signature counts instead of original_df
remaining_sigs = df['_signature']
sig_counts = signature_counts[remaining_sigs]
max_signature = sig_counts.idxmax()
df = df[df['_signature'] == max_signature]
df['icao'] = icao
df = df.drop(columns=['_non_empty_dict', '_non_empty_count', '_signature'])
# Ensure empty strings are preserved, not NaN
df[COLUMNS] = df[COLUMNS].fillna('')
return df
def compress_multi_icao_df(df: pl.DataFrame, verbose: bool = True) -> pl.DataFrame:
"""Compress a DataFrame with multiple ICAOs to one row per ICAO.
This is the main entry point for compressing ADS-B data.
Used by both daily GitHub Actions runs and historical AWS runs.
Args:
df: DataFrame with columns ['time', 'icao'] + COLUMNS
verbose: Whether to print progress
Returns:
Compressed DataFrame with one row per ICAO
"""
if df.height == 0:
return df
# Sort by icao and time
df = df.sort(['icao', 'time'])
# Fill null values with empty strings for COLUMNS
for col in COLUMNS:
if col in df.columns:
df = df.with_columns(pl.col(col).cast(pl.Utf8).fill_null(""))
# First pass: quick deduplication of exact duplicates
df = df.unique(subset=['icao'] + COLUMNS, keep='first')
if verbose:
print(f"After quick dedup: {df.height} records")
# Second pass: sophisticated compression per ICAO
if verbose:
print("Compressing per ICAO...")
# Process each ICAO group
icao_groups = df.partition_by('icao', as_dict=True, maintain_order=True)
compressed_dfs = []
for icao_key, group_df in icao_groups.items():
# partition_by with as_dict=True returns tuple keys, extract first element
icao = icao_key[0] if isinstance(icao_key, tuple) else icao_key
compressed = compress_df_polars(group_df, str(icao))
compressed_dfs.append(compressed)
if compressed_dfs:
df_compressed = pl.concat(compressed_dfs)
else:
df_compressed = df.head(0) # Empty with same schema
if verbose:
print(f"After compress: {df_compressed.height} records")
# Reorder columns: time first, then icao
cols = df_compressed.columns
ordered_cols = ['time', 'icao'] + [c for c in cols if c not in ['time', 'icao']]
df_compressed = df_compressed.select(ordered_cols)
return df_compressed
# Release asset names look like:
# planequery_aircraft_adsb_2024-06-01T00-00-00Z.csv.gz
def load_raw_adsb_for_day(day):
"""Load raw ADS-B data for a day from parquet file."""
from datetime import timedelta
from pathlib import Path
import pandas as pd
start_time = day.replace(hour=0, minute=0, second=0, microsecond=0)
@@ -84,67 +181,72 @@ def load_raw_adsb_for_day(day):
if parquet_file.exists():
print(f" Loading from parquet: {parquet_file}")
df = pd.read_parquet(
df = pl.read_parquet(
parquet_file,
columns=['time', 'icao', 'r', 't', 'dbFlags', 'ownOp', 'year', 'desc', 'aircraft_category']
)
# Convert to timezone-naive datetime
df['time'] = df['time'].dt.tz_localize(None)
if df["time"].dtype == pl.Datetime:
df = df.with_columns(pl.col("time").dt.replace_time_zone(None))
return df
else:
# Return empty DataFrame if parquet file doesn't exist
print(f" No data available for {start_time.strftime('%Y-%m-%d')}")
import pandas as pd
return pd.DataFrame(columns=['time', 'icao', 'r', 't', 'dbFlags', 'ownOp', 'year', 'desc', 'aircraft_category'])
return pl.DataFrame(schema={
'time': pl.Datetime,
'icao': pl.Utf8,
'r': pl.Utf8,
't': pl.Utf8,
'dbFlags': pl.Int64,
'ownOp': pl.Utf8,
'year': pl.Int64,
'desc': pl.Utf8,
'aircraft_category': pl.Utf8
})
def load_historical_for_day(day):
from pathlib import Path
import pandas as pd
"""Load and compress historical ADS-B data for a day."""
df = load_raw_adsb_for_day(day)
if df.empty:
if df.height == 0:
return df
print(f"Loaded {len(df)} raw records for {day.strftime('%Y-%m-%d')}")
df = df.sort_values(['icao', 'time'])
print("done sort")
df[COLUMNS] = df[COLUMNS].fillna('')
# First pass: quick deduplication of exact duplicates
df = df.drop_duplicates(subset=['icao'] + COLUMNS, keep='first')
print(f"After quick dedup: {len(df)} records")
print(f"Loaded {df.height} raw records for {day.strftime('%Y-%m-%d')}")
# Second pass: sophisticated compression per ICAO
print("Compressing per ICAO...")
df_compressed = df.groupby('icao', group_keys=False).apply(compress_df)
print(f"After compress: {len(df_compressed)} records")
cols = df_compressed.columns.tolist()
cols.remove('time')
cols.insert(0, 'time')
cols.remove("icao")
cols.insert(1, "icao")
df_compressed = df_compressed[cols]
return df_compressed
# Use shared compression function
return compress_multi_icao_df(df, verbose=True)
def concat_compressed_dfs(df_base, df_new):
"""Concatenate base and new compressed dataframes, keeping the most informative row per ICAO."""
import pandas as pd
# Combine both dataframes
df_combined = pd.concat([df_base, df_new], ignore_index=True)
df_combined = pl.concat([df_base, df_new])
# Sort by ICAO and time
df_combined = df_combined.sort_values(['icao', 'time'])
df_combined = df_combined.sort(['icao', 'time'])
# Fill NaN values
df_combined[COLUMNS] = df_combined[COLUMNS].fillna('')
# Fill null values
for col in COLUMNS:
if col in df_combined.columns:
df_combined = df_combined.with_columns(pl.col(col).fill_null(""))
# Apply compression logic per ICAO to get the best row
df_compressed = df_combined.groupby('icao', group_keys=False).apply(compress_df)
icao_groups = df_combined.partition_by('icao', as_dict=True, maintain_order=True)
compressed_dfs = []
for icao_key, group_df in icao_groups.items():
# partition_by(as_dict=True) returns tuple keys; extract the ICAO string
icao = icao_key[0] if isinstance(icao_key, tuple) else icao_key
compressed = compress_df_polars(group_df, str(icao))
compressed_dfs.append(compressed)
if compressed_dfs:
df_compressed = pl.concat(compressed_dfs)
else:
df_compressed = df_combined.head(0)
# Sort by time
df_compressed = df_compressed.sort_values('time')
df_compressed = df_compressed.sort('time')
return df_compressed
@@ -152,13 +254,15 @@ def concat_compressed_dfs(df_base, df_new):
def get_latest_aircraft_adsb_csv_df():
"""Download and load the latest ADS-B CSV from GitHub releases."""
from get_latest_planequery_aircraft_release import download_latest_aircraft_adsb_csv
import pandas as pd
import re
csv_path = download_latest_aircraft_adsb_csv()
df = pd.read_csv(csv_path)
df = df.fillna("")
df = pl.read_csv(csv_path, null_values=[""])
# Fill nulls with empty strings
for col in df.columns:
if df[col].dtype == pl.Utf8:
df = df.with_columns(pl.col(col).fill_null(""))
# Extract start date from filename pattern: planequery_aircraft_adsb_{start_date}_{end_date}.csv
match = re.search(r"planequery_aircraft_adsb_(\d{4}-\d{2}-\d{2})_", str(csv_path))
+119 -64
@@ -11,6 +11,8 @@ This file is self-contained and does not import from other project modules.
import gc
import glob
import gzip
import resource
import shutil
import sys
import logging
import time
@@ -22,10 +24,10 @@ import os
import argparse
import datetime as dt
from datetime import datetime, timedelta, timezone
import urllib.request
import urllib.error
import requests
import orjson
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
@@ -44,6 +46,24 @@ TOKEN = os.environ.get('GITHUB_TOKEN') # Optional: for higher GitHub API rate l
HEADERS = {"Authorization": f"token {TOKEN}"} if TOKEN else {}
def get_resource_usage() -> str:
"""Get current RAM and disk usage as a formatted string."""
# Peak RSS (ru_maxrss): reported in bytes on macOS, kilobytes on Linux
max_rss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
if sys.platform == 'darwin':
ram_gb = max_rss / (1024**3)  # bytes -> GB
else:
ram_gb = max_rss / (1024**2)  # KB -> GB
# Disk usage
disk = shutil.disk_usage('.')
disk_free_gb = disk.free / (1024**3)
disk_total_gb = disk.total / (1024**3)
return f"RAM: {ram_gb:.2f}GB | Disk: {disk_free_gb:.1f}GB free / {disk_total_gb:.1f}GB total"
# ============================================================================
# GitHub Release Fetching and Downloading
# ============================================================================
@@ -72,17 +92,19 @@ def fetch_releases(version_date: str) -> list:
for attempt in range(1, max_retries + 1):
try:
response = requests.get(f"{BASE_URL}?page={page}", headers=HEADERS)
if response.status_code == 200:
break
else:
print(f"Failed to fetch releases (attempt {attempt}/{max_retries}): {response.status_code} {response.reason}")
if attempt < max_retries:
print(f"Waiting {retry_delay} seconds before retry...")
time.sleep(retry_delay)
req = urllib.request.Request(f"{BASE_URL}?page={page}", headers=HEADERS)
with urllib.request.urlopen(req) as response:
if response.status == 200:
data = orjson.loads(response.read())
break
else:
print(f"Giving up after {max_retries} attempts")
return releases
print(f"Failed to fetch releases (attempt {attempt}/{max_retries}): {response.status} {response.reason}")
if attempt < max_retries:
print(f"Waiting {retry_delay} seconds before retry...")
time.sleep(retry_delay)
else:
print(f"Giving up after {max_retries} attempts")
return releases
except Exception as e:
print(f"Request exception (attempt {attempt}/{max_retries}): {e}")
if attempt < max_retries:
@@ -91,8 +113,6 @@ def fetch_releases(version_date: str) -> list:
else:
print(f"Giving up after {max_retries} attempts")
return releases
data = response.json()
if not data:
break
for release in data:
@@ -115,18 +135,22 @@ def download_asset(asset_url: str, file_path: str) -> bool:
signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(40) # 40-second timeout
response = requests.get(asset_url, headers=HEADERS, stream=True)
signal.alarm(0)
if response.status_code == 200:
with open(file_path, "wb") as file:
for chunk in response.iter_content(chunk_size=8192):
file.write(chunk)
print(f"Saved {file_path}")
return True
else:
print(f"Failed to download {asset_url}: {response.status_code} {response.reason}")
return False
req = urllib.request.Request(asset_url, headers=HEADERS)
with urllib.request.urlopen(req) as response:
signal.alarm(0)
if response.status == 200:
with open(file_path, "wb") as file:
while True:
chunk = response.read(8192)
if not chunk:
break
file.write(chunk)
print(f"Saved {file_path}")
return True
else:
print(f"Failed to download {asset_url}: {response.status} {response.msg}")
return False
except DownloadTimeoutException as e:
print(f"Download aborted for {asset_url}: {e}")
return False
@@ -139,6 +163,7 @@ def extract_split_archive(file_paths: list, extract_dir: str) -> bool:
"""
Extracts a split archive by concatenating the parts using 'cat'
and then extracting with 'tar' in one pipeline.
Deletes the tar files immediately after extraction to save disk space.
"""
if os.path.isdir(extract_dir):
print(f"[SKIP] Extraction directory already exists: {extract_dir}")
@@ -176,6 +201,20 @@ def extract_split_archive(file_paths: list, extract_dir: str) -> bool:
cat_proc.wait()
print(f"Successfully extracted archive to {extract_dir}")
# Delete tar files immediately after extraction
for tar_file in file_paths:
try:
os.remove(tar_file)
print(f"Deleted tar file: {tar_file}")
except Exception as e:
print(f"Failed to delete {tar_file}: {e}")
# Check disk usage after deletion
disk = shutil.disk_usage('.')
free_gb = disk.free / (1024**3)
print(f"Disk space after tar deletion: {free_gb:.1f}GB free")
return True
except subprocess.CalledProcessError as e:
print(f"Failed to extract split archive: {e}")
@@ -309,7 +348,7 @@ def process_file(filepath: str) -> list:
insert_rows.append(inserted_row)
if insert_rows:
print(f"Got {len(insert_rows)} rows from {filepath}")
# print(f"Got {len(insert_rows)} rows from {filepath}")
return insert_rows
else:
return []
@@ -342,8 +381,8 @@ COLUMNS = [
OS_CPU_COUNT = os.cpu_count() or 1
MAX_WORKERS = OS_CPU_COUNT if OS_CPU_COUNT > 4 else 1
CHUNK_SIZE = MAX_WORKERS * 1000
BATCH_SIZE = (os.cpu_count() or 1) * 100000
CHUNK_SIZE = MAX_WORKERS * 500 # Reduced for lower RAM usage
BATCH_SIZE = 250_000 # Fixed size for predictable memory usage (~500MB per batch)
# PyArrow schema for efficient Parquet writing
PARQUET_SCHEMA = pa.schema([
@@ -448,10 +487,18 @@ def safe_process(fp):
return []
def rows_to_dataframe(rows: list) -> pd.DataFrame:
"""Convert list of rows to a pandas DataFrame."""
df = pd.DataFrame(rows, columns=COLUMNS)
return df
def rows_to_arrow_table(rows: list) -> pa.Table:
"""Convert list of rows to a PyArrow Table directly (no pandas)."""
# Transpose rows into columns
columns = list(zip(*rows))
# Build arrays for each column according to schema
arrays = []
for i, field in enumerate(PARQUET_SCHEMA):
col_data = list(columns[i]) if i < len(columns) else [None] * len(rows)
arrays.append(pa.array(col_data, type=field.type))
return pa.Table.from_arrays(arrays, schema=PARQUET_SCHEMA)
def write_batch_to_parquet(rows: list, version_date: str, batch_idx: int):
@@ -459,23 +506,17 @@ def write_batch_to_parquet(rows: list, version_date: str, batch_idx: int):
if not rows:
return
df = rows_to_dataframe(rows)
# Ensure datetime column is timezone-aware
if not df['time'].dt.tz:
df['time'] = df['time'].dt.tz_localize('UTC')
table = rows_to_arrow_table(rows)
parquet_path = os.path.join(PARQUET_DIR, f"{version_date}_batch_{batch_idx:04d}.parquet")
# Convert to PyArrow table and write
table = pa.Table.from_pandas(df, schema=PARQUET_SCHEMA, preserve_index=False)
pq.write_table(table, parquet_path, compression='snappy')
print(f"Written parquet batch {batch_idx} ({len(rows)} rows) to {parquet_path}")
print(f"Written parquet batch {batch_idx} ({len(rows)} rows) | {get_resource_usage()}")
def merge_parquet_files(version_date: str, delete_batches: bool = True):
"""Merge all batch parquet files for a version_date into a single file."""
"""Merge all batch parquet files for a version_date into a single file using streaming."""
pattern = os.path.join(PARQUET_DIR, f"{version_date}_batch_*.parquet")
batch_files = sorted(glob.glob(pattern))
@@ -483,28 +524,42 @@ def merge_parquet_files(version_date: str, delete_batches: bool = True):
print(f"No batch files found for {version_date}")
return None
print(f"Merging {len(batch_files)} batch files for {version_date}...")
print(f"Merging {len(batch_files)} batch files for {version_date} (streaming)...")
# Read all batch files
tables = []
for f in batch_files:
tables.append(pq.read_table(f))
# Concatenate all tables
merged_table = pa.concat_tables(tables)
# Write merged file
merged_path = os.path.join(PARQUET_DIR, f"{version_date}.parquet")
pq.write_table(merged_table, merged_path, compression='snappy')
total_rows = 0
print(f"Merged parquet file written to {merged_path} ({merged_table.num_rows} total rows)")
# Stream write: read one batch at a time to minimize RAM usage
writer = None
try:
for i, f in enumerate(batch_files):
table = pq.read_table(f)
total_rows += table.num_rows
if writer is None:
writer = pq.ParquetWriter(merged_path, table.schema, compression='snappy')
writer.write_table(table)
# Delete batch file immediately after reading to free disk space
if delete_batches:
os.remove(f)
# Free memory
del table
if (i + 1) % 10 == 0:
gc.collect()
print(f" Merged {i + 1}/{len(batch_files)} batches... | {get_resource_usage()}")
finally:
if writer is not None:
writer.close()
print(f"Merged parquet file written to {merged_path} ({total_rows} total rows) | {get_resource_usage()}")
# Optionally delete batch files
if delete_batches:
for f in batch_files:
os.remove(f)
print(f"Deleted {len(batch_files)} batch files")
print(f"Deleted {len(batch_files)} batch files during merge")
gc.collect()
return merged_path
@@ -608,15 +663,15 @@ def process_version_date(version_date: str, keep_folders: bool = False):
print(f"Total rows processed for version_date {version_date}: {total_num_rows}")
# Clean up extracted directory immediately after processing (before merging parquet files)
if not keep_folders and os.path.isdir(extract_dir):
print(f"Deleting extraction directory with 100,000+ files: {extract_dir}")
shutil.rmtree(extract_dir)
print(f"Successfully deleted extraction directory: {extract_dir} | {get_resource_usage()}")
# Merge batch files into a single parquet file
merge_parquet_files(version_date, delete_batches=True)
# Clean up extracted directory if not keeping
if not keep_folders and os.path.isdir(extract_dir):
import shutil
shutil.rmtree(extract_dir)
print(f"Cleaned up extraction directory: {extract_dir}")
return total_num_rows
+148
@@ -0,0 +1,148 @@
"""
Downloads and extracts adsb.lol tar files, then lists all ICAO folders.
This is the first step of the map-reduce pipeline.
Outputs:
- Extracted trace files in data/output/{version_date}-planes-readsb-prod-0.tar_0/
- ICAO manifest at data/output/icao_manifest_{date}.txt
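Usage:
python -m src.adsb.download_and_list_icaos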
"""
import os
import sys
import argparse
import glob
import subprocess
from datetime import datetime, timedelta
# Re-use download/extract functions from download_adsb_data_to_parquet
from src.adsb.download_adsb_data_to_parquet import (
OUTPUT_DIR,
fetch_releases,
download_asset,
extract_split_archive,
collect_trace_files_with_find,
)
def get_target_day() -> datetime:
"""Get yesterday's date (the day we're processing)."""
return datetime.utcnow() - timedelta(days=1)
def download_and_extract(version_date: str) -> str | None:
"""Download and extract tar files, return extract directory path."""
extract_dir = os.path.join(OUTPUT_DIR, f"{version_date}-planes-readsb-prod-0.tar_0")
# Check if already extracted
if os.path.isdir(extract_dir):
print(f"[SKIP] Already extracted: {extract_dir}")
return extract_dir
# Check for existing tar files
pattern = os.path.join(OUTPUT_DIR, f"{version_date}-planes-readsb-prod-0*")
matches = [p for p in glob.glob(pattern) if os.path.isfile(p)]
if matches:
print(f"Found existing tar files for {version_date}")
normal_matches = [
p for p in matches
if "-planes-readsb-prod-0." in os.path.basename(p)
and "tmp" not in os.path.basename(p)
]
downloaded_files = normal_matches if normal_matches else matches
else:
# Download from GitHub
print(f"Downloading releases for {version_date}...")
releases = fetch_releases(version_date)
if not releases:
print(f"No releases found for {version_date}")
return None
downloaded_files = []
for release in releases:
tag_name = release["tag_name"]
print(f"Processing release: {tag_name}")
assets = release.get("assets", [])
normal_assets = [
a for a in assets
if "planes-readsb-prod-0." in a["name"] and "tmp" not in a["name"]
]
tmp_assets = [
a for a in assets
if "planes-readsb-prod-0tmp" in a["name"]
]
use_assets = normal_assets if normal_assets else tmp_assets
for asset in use_assets:
asset_name = asset["name"]
asset_url = asset["browser_download_url"]
file_path = os.path.join(OUTPUT_DIR, asset_name)
if download_asset(asset_url, file_path):
downloaded_files.append(file_path)
if not downloaded_files:
print(f"No files downloaded for {version_date}")
return None
# Extract
if extract_split_archive(downloaded_files, extract_dir):
return extract_dir
return None
def list_icao_folders(extract_dir: str) -> list[str]:
"""List all ICAO folder names from extracted directory."""
trace_files = collect_trace_files_with_find(extract_dir)
icaos = sorted(trace_files.keys())
print(f"Found {len(icaos)} unique ICAOs")
return icaos
def write_manifest(icaos: list[str], date_str: str) -> str:
"""Write ICAO list to manifest file."""
manifest_path = os.path.join(OUTPUT_DIR, f"icao_manifest_{date_str}.txt")
with open(manifest_path, "w") as f:
for icao in icaos:
f.write(f"{icao}\n")
print(f"Wrote manifest with {len(icaos)} ICAOs to {manifest_path}")
return manifest_path
def main():
parser = argparse.ArgumentParser(description="Download and list ICAOs from adsb.lol data")
parser.add_argument("--date", type=str, help="Date in YYYY-MM-DD format (default: yesterday)")
args = parser.parse_args()
if args.date:
target_day = datetime.strptime(args.date, "%Y-%m-%d")
else:
target_day = get_target_day()
date_str = target_day.strftime("%Y-%m-%d")
version_date = f"v{target_day.strftime('%Y.%m.%d')}"
print(f"Processing date: {date_str} (version: {version_date})")
# Download and extract
extract_dir = download_and_extract(version_date)
if not extract_dir:
print("Failed to download/extract data")
sys.exit(1)
# List ICAOs
icaos = list_icao_folders(extract_dir)
if not icaos:
print("No ICAOs found")
sys.exit(1)
# Write manifest
manifest_path = write_manifest(icaos, date_str)
print(f"\nDone! Extract dir: {extract_dir}")
print(f"Manifest: {manifest_path}")
print(f"Total ICAOs: {len(icaos)}")
if __name__ == "__main__":
main()
+270
@@ -0,0 +1,270 @@
"""
Processes a chunk of ICAOs from pre-extracted trace files.
This is the map phase of the map-reduce pipeline.
Expects extract_dir to already exist with trace files.
Reads ICAO manifest to determine which ICAOs to process based on chunk-id.
Usage:
python -m src.adsb.process_icao_chunk --chunk-id 0 --total-chunks 4
"""
import gc
import os
import sys
import argparse
import time
import concurrent.futures
from datetime import datetime, timedelta
import pyarrow as pa
import pyarrow.parquet as pq
from src.adsb.download_adsb_data_to_parquet import (
OUTPUT_DIR,
PARQUET_DIR,
PARQUET_SCHEMA,
COLUMNS,
MAX_WORKERS,
process_file,
get_resource_usage,
collect_trace_files_with_find,
)
CHUNK_OUTPUT_DIR = os.path.join(OUTPUT_DIR, "adsb_chunks")
os.makedirs(CHUNK_OUTPUT_DIR, exist_ok=True)
# Smaller batch size for memory efficiency
BATCH_SIZE = 100_000
def get_target_day() -> datetime:
"""Get yesterday's date (the day we're processing)."""
return datetime.utcnow() - timedelta(days=1)
def read_manifest(date_str: str) -> list[str]:
"""Read ICAO manifest file."""
manifest_path = os.path.join(OUTPUT_DIR, f"icao_manifest_{date_str}.txt")
if not os.path.exists(manifest_path):
raise FileNotFoundError(f"Manifest not found: {manifest_path}")
with open(manifest_path, "r") as f:
icaos = [line.strip() for line in f if line.strip()]
return icaos
def deterministic_hash(s: str) -> int:
"""Return a deterministic hash for a string (unlike Python's hash() which is randomized)."""
# Sum of character code points: simple but deterministic
return sum(ord(c) for c in s)
def get_chunk_icaos(icaos: list[str], chunk_id: int, total_chunks: int) -> list[str]:
"""Get the subset of ICAOs for this chunk based on deterministic hash partitioning."""
return [icao for icao in icaos if deterministic_hash(icao) % total_chunks == chunk_id]
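# Illustration (values computed by hand): with total_chunks=4,
# deterministic_hash("a1b2c3") = 97+49+98+50+99+51 = 444 and 444 % 4 == 0,
# so ICAO "a1b2c3" always lands in chunk 0, independent of PYTHONHASHSEED.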
def build_trace_file_map(extract_dir: str) -> dict[str, str]:
"""Build a map of ICAO -> trace file path using find command."""
print(f"Building trace file map from {extract_dir}...")
# Debug: check what's in extract_dir
if os.path.isdir(extract_dir):
items = os.listdir(extract_dir)[:10]
print(f"First 10 items in extract_dir: {items}")
# Check if there are subdirectories
for item in items[:3]:
subpath = os.path.join(extract_dir, item)
if os.path.isdir(subpath):
subitems = os.listdir(subpath)[:5]
print(f" Contents of {item}/: {subitems}")
trace_map = collect_trace_files_with_find(extract_dir)
print(f"Found {len(trace_map)} trace files")
if len(trace_map) == 0:
# Debug: try manual find
import subprocess
result = subprocess.run(
['find', extract_dir, '-type', 'f', '-name', 'trace_full_*'],
capture_output=True, text=True
)
print(f"Manual find output (first 500 chars): {result.stdout[:500]}")
print(f"Manual find stderr: {result.stderr[:200]}")
return trace_map
def safe_process(filepath: str) -> list:
"""Safely process a file, returning empty list on error."""
try:
return process_file(filepath)
except Exception as e:
print(f"Error processing {filepath}: {e}")
return []
def rows_to_table(rows: list) -> pa.Table:
"""Convert list of rows to PyArrow table."""
import pandas as pd
df = pd.DataFrame(rows, columns=COLUMNS)
if not df['time'].dt.tz:
df['time'] = df['time'].dt.tz_localize('UTC')
return pa.Table.from_pandas(df, schema=PARQUET_SCHEMA, preserve_index=False)
def process_chunk(
chunk_id: int,
total_chunks: int,
trace_map: dict[str, str],
icaos: list[str],
date_str: str,
) -> str | None:
"""Process a chunk of ICAOs and write to parquet."""
chunk_icaos = get_chunk_icaos(icaos, chunk_id, total_chunks)
print(f"Chunk {chunk_id}/{total_chunks}: Processing {len(chunk_icaos)} ICAOs")
if not chunk_icaos:
print(f"Chunk {chunk_id}: No ICAOs to process")
return None
# Get trace file paths from the map
trace_files = []
for icao in chunk_icaos:
if icao in trace_map:
trace_files.append(trace_map[icao])
print(f"Chunk {chunk_id}: Found {len(trace_files)} trace files")
if not trace_files:
print(f"Chunk {chunk_id}: No trace files found")
return None
# Process files and write parquet in batches
output_path = os.path.join(CHUNK_OUTPUT_DIR, f"chunk_{chunk_id}_{date_str}.parquet")
start_time = time.perf_counter()
total_rows = 0
batch_rows = []
writer = None
try:
# Process in parallel batches
files_per_batch = MAX_WORKERS * 100
for offset in range(0, len(trace_files), files_per_batch):
batch_files = trace_files[offset:offset + files_per_batch]
with concurrent.futures.ProcessPoolExecutor(max_workers=MAX_WORKERS) as executor:
for rows in executor.map(safe_process, batch_files):
if rows:
batch_rows.extend(rows)
# Write when batch is full
if len(batch_rows) >= BATCH_SIZE:
table = rows_to_table(batch_rows)
total_rows += len(batch_rows)
if writer is None:
writer = pq.ParquetWriter(output_path, PARQUET_SCHEMA, compression='snappy')
writer.write_table(table)
batch_rows = []
del table
gc.collect()
elapsed = time.perf_counter() - start_time
print(f"Chunk {chunk_id}: {total_rows} rows, {elapsed:.1f}s | {get_resource_usage()}")
gc.collect()
# Write remaining rows
if batch_rows:
table = rows_to_table(batch_rows)
total_rows += len(batch_rows)
if writer is None:
writer = pq.ParquetWriter(output_path, PARQUET_SCHEMA, compression='snappy')
writer.write_table(table)
del table
finally:
if writer:
writer.close()
elapsed = time.perf_counter() - start_time
print(f"Chunk {chunk_id}: Done! {total_rows} rows in {elapsed:.1f}s | {get_resource_usage()}")
if total_rows > 0:
return output_path
return None
def main():
parser = argparse.ArgumentParser(description="Process a chunk of ICAOs")
parser.add_argument("--chunk-id", type=int, required=True, help="Chunk ID (0-indexed)")
parser.add_argument("--total-chunks", type=int, required=True, help="Total number of chunks")
parser.add_argument("--date", type=str, help="Date in YYYY-MM-DD format (default: yesterday)")
args = parser.parse_args()
if args.date:
target_day = datetime.strptime(args.date, "%Y-%m-%d")
else:
target_day = get_target_day()
date_str = target_day.strftime("%Y-%m-%d")
version_date = f"v{target_day.strftime('%Y.%m.%d')}"
print(f"Processing chunk {args.chunk_id}/{args.total_chunks} for {date_str}")
print(f"OUTPUT_DIR: {OUTPUT_DIR}")
print(f"CHUNK_OUTPUT_DIR: {CHUNK_OUTPUT_DIR}")
print(f"Resource usage at start: {get_resource_usage()}")
# Debug: List what's in OUTPUT_DIR
print(f"\nContents of {OUTPUT_DIR}:")
if os.path.isdir(OUTPUT_DIR):
for item in os.listdir(OUTPUT_DIR)[:20]:
print(f" - {item}")
else:
print(f" Directory does not exist!")
# Find extract directory
extract_dir = os.path.join(OUTPUT_DIR, f"{version_date}-planes-readsb-prod-0.tar_0")
print(f"\nLooking for extract_dir: {extract_dir}")
if not os.path.isdir(extract_dir):
print(f"Extract directory not found: {extract_dir}")
# Try to find any extracted directory
import glob
pattern = os.path.join(OUTPUT_DIR, "*-planes-readsb-prod-0*")
matches = glob.glob(pattern)
print(f"Searching for pattern: {pattern}")
print(f"Found matches: {matches}")
sys.exit(1)
# Build trace file map using find
trace_map = build_trace_file_map(extract_dir)
if not trace_map:
print("No trace files found in extract directory")
sys.exit(1)
# Read manifest
icaos = read_manifest(date_str)
print(f"Total ICAOs in manifest: {len(icaos)}")
# Process chunk
output_path = process_chunk(
args.chunk_id,
args.total_chunks,
trace_map,
icaos,
date_str,
)
if output_path:
print(f"Output: {output_path}")
else:
print("No output generated")
if __name__ == "__main__":
main()
+30 -30
@@ -8,23 +8,15 @@ Environment variables:
GLOBAL_START_DATE — overall start date for output filename
GLOBAL_END_DATE — overall end date for output filename
"""
import gzip
import os
import shutil
from pathlib import Path
import boto3
import pandas as pd
import polars as pl
COLUMNS = ["dbFlags", "ownOp", "year", "desc", "aircraft_category", "r", "t"]
def deduplicate_by_signature(df: pd.DataFrame) -> pd.DataFrame:
"""For each icao, keep only the earliest row with each unique signature."""
df["_signature"] = df[COLUMNS].astype(str).agg("|".join, axis=1)
df_deduped = df.groupby(["icao", "_signature"], as_index=False).first()
df_deduped = df_deduped.drop(columns=["_signature"])
df_deduped = df_deduped.sort_values("time")
return df_deduped
from compress_adsb_to_aircraft_data import COLUMNS, deduplicate_by_signature
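# A minimal polars sketch (an assumption — the imported implementation is not
# shown in this hunk): per icao, keep only the earliest row for each unique
# combination of the signature COLUMNS.
#
#     def deduplicate_by_signature_sketch(df: pl.DataFrame) -> pl.DataFrame:
#         return df.sort("time").unique(
#             subset=["icao", *COLUMNS], keep="first", maintain_order=True
#         )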
def main():
@@ -55,42 +47,50 @@ def main():
download_dir = Path("/tmp/chunks")
download_dir.mkdir(parents=True, exist_ok=True)
df_accumulated = pd.DataFrame()
dfs = []
for key in chunk_keys:
local_path = download_dir / Path(key).name
gz_path = download_dir / Path(key).name
csv_path = gz_path.with_suffix("") # Remove .gz
print(f"Downloading {key}...")
s3.download_file(s3_bucket, key, str(local_path))
s3.download_file(s3_bucket, key, str(gz_path))
df_chunk = pd.read_csv(local_path, compression="gzip", keep_default_na=False)
print(f" Loaded {len(df_chunk)} rows from {local_path.name}")
# Decompress
with gzip.open(gz_path, 'rb') as f_in:
with open(csv_path, 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)
gz_path.unlink()
if df_accumulated.empty:
df_accumulated = df_chunk
else:
df_accumulated = pd.concat(
[df_accumulated, df_chunk], ignore_index=True
)
df_chunk = pl.read_csv(csv_path)
print(f" Loaded {df_chunk.height} rows from {csv_path.name}")
dfs.append(df_chunk)
# Free disk space after loading
local_path.unlink()
csv_path.unlink()
print(f"Combined: {len(df_accumulated)} rows before dedup")
df_accumulated = pl.concat(dfs) if dfs else pl.DataFrame()
print(f"Combined: {df_accumulated.height} rows before dedup")
# Final global deduplication
df_accumulated = deduplicate_by_signature(df_accumulated)
print(f"After dedup: {len(df_accumulated)} rows")
print(f"After dedup: {df_accumulated.height} rows")
# Write and upload final result
output_name = f"planequery_aircraft_adsb_{global_start}_{global_end}.csv.gz"
local_output = Path(f"/tmp/{output_name}")
df_accumulated.to_csv(local_output, index=False, compression="gzip")
csv_output = Path(f"/tmp/planequery_aircraft_adsb_{global_start}_{global_end}.csv")
gz_output = Path(f"/tmp/{output_name}")
df_accumulated.write_csv(csv_output)
with open(csv_output, 'rb') as f_in:
with gzip.open(gz_output, 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)
csv_output.unlink()
final_key = f"final/{output_name}"
print(f"Uploading to s3://{s3_bucket}/{final_key}")
s3.upload_file(str(local_output), s3_bucket, final_key)
s3.upload_file(str(gz_output), s3_bucket, final_key)
print(f"Final output: {len(df_accumulated)} records -> {final_key}")
print(f"Final output: {df_accumulated.height} records -> {final_key}")
if __name__ == "__main__":
+1 -1
@@ -1,2 +1,2 @@
pandas>=2.0
polars>=1.0
boto3>=1.34
+3 -2
@@ -1,4 +1,5 @@
pandas>=2.0
clickhouse-connect>=0.7
polars>=1.0
pyarrow>=14.0
orjson>=3.9
boto3>=1.34
zstandard>=0.22
+28 -33
@@ -13,18 +13,13 @@ from datetime import datetime, timedelta
from pathlib import Path
import boto3
import pandas as pd
import polars as pl
from compress_adsb_to_aircraft_data import load_historical_for_day, COLUMNS
def deduplicate_by_signature(df: pd.DataFrame) -> pd.DataFrame:
"""For each icao, keep only the earliest row with each unique signature."""
df["_signature"] = df[COLUMNS].astype(str).agg("|".join, axis=1)
df_deduped = df.groupby(["icao", "_signature"], as_index=False).first()
df_deduped = df_deduped.drop(columns=["_signature"])
df_deduped = df_deduped.sort_values("time")
return df_deduped
from compress_adsb_to_aircraft_data import (
load_historical_for_day,
deduplicate_by_signature,
COLUMNS,
)
def main():
@@ -39,28 +34,20 @@ def main():
total_days = (end_date - start_date).days
print(f"Worker: processing {total_days} days [{start_date_str}, {end_date_str})")
df_accumulated = pd.DataFrame()
dfs = []
current_date = start_date
while current_date < end_date:
day_str = current_date.strftime("%Y-%m-%d")
print(f" Loading {day_str}...")
try:
df_compressed = load_historical_for_day(current_date)
except Exception as e:
print(f" WARNING: Failed to load {day_str}: {e}")
current_date += timedelta(days=1)
continue
df_compressed = load_historical_for_day(current_date)
if df_compressed.height == 0:
raise RuntimeError(f"No data found for {day_str}")
if df_accumulated.empty:
df_accumulated = df_compressed
else:
df_accumulated = pd.concat(
[df_accumulated, df_compressed], ignore_index=True
)
print(f" +{len(df_compressed)} rows (total: {len(df_accumulated)})")
dfs.append(df_compressed)
total_rows = sum(df.height for df in dfs)
print(f" +{df_compressed.height} rows (total: {total_rows})")
# Delete local cache after each day to save disk in container
cache_dir = Path("data/adsb")
@@ -70,23 +57,31 @@ def main():
current_date += timedelta(days=1)
if df_accumulated.empty:
print("No data collected — exiting.")
return
# Concatenate all days
df_accumulated = pl.concat(dfs) if dfs else pl.DataFrame()
# Deduplicate within this chunk
df_accumulated = deduplicate_by_signature(df_accumulated)
print(f"After dedup: {len(df_accumulated)} rows")
print(f"After dedup: {df_accumulated.height} rows")
# Write to local file then upload to S3
local_path = Path(f"/tmp/chunk_{start_date_str}_{end_date_str}.csv.gz")
df_accumulated.to_csv(local_path, index=False, compression="gzip")
local_path = Path(f"/tmp/chunk_{start_date_str}_{end_date_str}.csv")
df_accumulated.write_csv(local_path)
# Compress with gzip
import gzip
import shutil
gz_path = Path(f"/tmp/chunk_{start_date_str}_{end_date_str}.csv.gz")
with open(local_path, 'rb') as f_in:
with gzip.open(gz_path, 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)
local_path.unlink() # Remove uncompressed file
s3_key = f"intermediate/{run_id}/chunk_{start_date_str}_{end_date_str}.csv.gz"
print(f"Uploading to s3://{s3_bucket}/{s3_key}")
s3 = boto3.client("s3")
s3.upload_file(str(local_path), s3_bucket, s3_key)
s3.upload_file(str(gz_path), s3_bucket, s3_key)
print("Done.")
+1
@@ -0,0 +1 @@
"""Community contributions processing module."""
+249
@@ -0,0 +1,249 @@
#!/usr/bin/env python3
"""
Approve a community submission and create a PR.
This script is called by the GitHub Actions workflow when the 'approved'
label is added to a validated submission issue.
Usage:
python -m src.contributions.approve_submission --issue-number 123 --issue-body "..." --author "username" --author-id 12345
Environment variables:
GITHUB_TOKEN: GitHub API token with repo write permissions
GITHUB_REPOSITORY: owner/repo
"""
import argparse
import base64
import json
import os
import sys
import urllib.request
import urllib.error
from datetime import datetime, timezone
from .schema import extract_json_from_issue_body, extract_contributor_name_from_issue_body, parse_and_validate
from .contributor import (
generate_contributor_uuid,
generate_submission_filename,
compute_content_hash,
)
def github_api_request(
method: str,
endpoint: str,
data: dict | None = None,
accept: str = "application/vnd.github.v3+json"
) -> dict:
"""Make a GitHub API request."""
token = os.environ.get("GITHUB_TOKEN")
repo = os.environ.get("GITHUB_REPOSITORY")
if not token or not repo:
raise EnvironmentError("GITHUB_TOKEN and GITHUB_REPOSITORY must be set")
url = f"https://api.github.com/repos/{repo}{endpoint}"
headers = {
"Authorization": f"token {token}",
"Accept": accept,
"Content-Type": "application/json",
}
body = json.dumps(data).encode() if data else None
req = urllib.request.Request(url, data=body, headers=headers, method=method)
try:
with urllib.request.urlopen(req) as response:
return json.loads(response.read())
except urllib.error.HTTPError as e:
error_body = e.read().decode() if e.fp else ""
print(f"GitHub API error: {e.code} {e.reason}: {error_body}", file=sys.stderr)
raise
def add_issue_comment(issue_number: int, body: str) -> None:
"""Add a comment to a GitHub issue."""
github_api_request("POST", f"/issues/{issue_number}/comments", {"body": body})
def get_default_branch_sha() -> str:
"""Get the SHA of the default branch (main)."""
ref = github_api_request("GET", "/git/ref/heads/main")
return ref["object"]["sha"]
def create_branch(branch_name: str, sha: str) -> None:
"""Create a new branch from a SHA."""
try:
github_api_request("POST", "/git/refs", {
"ref": f"refs/heads/{branch_name}",
"sha": sha,
})
except urllib.error.HTTPError as e:
if e.code == 422: # Branch exists
# Delete and recreate
try:
github_api_request("DELETE", f"/git/refs/heads/{branch_name}")
except urllib.error.HTTPError:
pass
github_api_request("POST", "/git/refs", {
"ref": f"refs/heads/{branch_name}",
"sha": sha,
})
else:
raise
def create_or_update_file(path: str, content: str, message: str, branch: str) -> None:
"""Create or update a file in the repository."""
content_b64 = base64.b64encode(content.encode()).decode()
github_api_request("PUT", f"/contents/{path}", {
"message": message,
"content": content_b64,
"branch": branch,
})
def create_pull_request(title: str, head: str, base: str, body: str) -> dict:
"""Create a pull request."""
return github_api_request("POST", "/pulls", {
"title": title,
"head": head,
"base": base,
"body": body,
})
def add_labels_to_issue(issue_number: int, labels: list[str]) -> None:
"""Add labels to an issue or PR."""
github_api_request("POST", f"/issues/{issue_number}/labels", {"labels": labels})
def process_submission(
issue_number: int,
issue_body: str,
author_username: str,
author_id: int,
) -> bool:
"""
Process an approved submission and create a PR.
Args:
issue_number: The GitHub issue number
issue_body: The issue body text
author_username: The GitHub username of the issue author
author_id: The numeric GitHub user ID
Returns:
True if successful, False otherwise
"""
# Extract and validate JSON
json_str = extract_json_from_issue_body(issue_body)
if not json_str:
add_issue_comment(issue_number, "❌ Could not extract JSON from submission.")
return False
data, errors = parse_and_validate(json_str)
if errors:
error_list = "\n".join(f"- {e}" for e in errors)
add_issue_comment(issue_number, f"❌ **Validation Failed**\n\n{error_list}")
return False
# Normalize to list
submissions = data if isinstance(data, list) else [data]
# Generate contributor UUID from GitHub ID
contributor_uuid = generate_contributor_uuid(author_id)
# Extract contributor name from issue form (or default to GitHub username)
contributor_name = extract_contributor_name_from_issue_body(issue_body)
if not contributor_name:
contributor_name = f"@{author_username}"
# Add metadata to each submission
now = datetime.now(timezone.utc)
date_str = now.strftime("%Y-%m-%d")
timestamp_str = now.isoformat()
for submission in submissions:
submission["contributor_uuid"] = contributor_uuid
submission["contributor_name"] = contributor_name
submission["creation_timestamp"] = timestamp_str
# Generate unique filename
content_json = json.dumps(submissions, indent=2, sort_keys=True)
content_hash = compute_content_hash(content_json)
filename = generate_submission_filename(author_username, date_str, content_hash)
file_path = f"community/{filename}"
# Create branch
branch_name = f"community-submission-{issue_number}"
default_sha = get_default_branch_sha()
create_branch(branch_name, default_sha)
# Create file
commit_message = f"Add community submission from @{author_username} (closes #{issue_number})"
create_or_update_file(file_path, content_json, commit_message, branch_name)
# Create PR
pr_body = f"""## Community Submission
Adds {len(submissions)} submission(s) from @{author_username}.
**File:** `{file_path}`
**Contributor UUID:** `{contributor_uuid}`
Closes #{issue_number}
---
### Submissions
```json
{content_json}
```"""
pr = create_pull_request(
title=f"Community submission: {filename}",
head=branch_name,
base="main",
body=pr_body,
)
# Add labels to PR
add_labels_to_issue(pr["number"], ["community", "auto-generated"])
# Comment on original issue
add_issue_comment(
issue_number,
f"✅ **Submission Approved**\n\n"
f"PR #{pr['number']} has been created to add your submission.\n\n"
f"**File:** `{file_path}`\n"
f"**Your Contributor UUID:** `{contributor_uuid}`\n\n"
f"The PR will be merged by a maintainer."
)
print(f"Created PR #{pr['number']} for submission")
return True
def main():
parser = argparse.ArgumentParser(description="Approve community submission and create PR")
parser.add_argument("--issue-number", type=int, required=True, help="GitHub issue number")
parser.add_argument("--issue-body", required=True, help="Issue body text")
parser.add_argument("--author", required=True, help="Issue author username")
parser.add_argument("--author-id", type=int, required=True, help="Issue author numeric ID")
args = parser.parse_args()
success = process_submission(
issue_number=args.issue_number,
issue_body=args.issue_body,
author_username=args.author,
author_id=args.author_id,
)
sys.exit(0 if success else 1)
if __name__ == "__main__":
main()
+86
@@ -0,0 +1,86 @@
"""Contributor identification utilities."""
import hashlib
import uuid
# DNS namespace UUID for generating UUIDv5 (the same value as uuid.NAMESPACE_DNS)
DNS_NAMESPACE = uuid.UUID('6ba7b810-9dad-11d1-80b4-00c04fd430c8')
def generate_contributor_uuid(github_user_id: int) -> str:
"""
Generate a deterministic UUID v5 from a GitHub user ID.
This ensures the same GitHub account always gets the same contributor UUID.
Args:
github_user_id: The numeric GitHub user ID
Returns:
UUID string in standard format
"""
name = f"github:{github_user_id}"
return str(uuid.uuid5(DNS_NAMESPACE, name))
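# Example (illustrative): uuid5 is a pure function of (namespace, name), so the
# same GitHub user ID always maps to the same contributor UUID:
#     generate_contributor_uuid(12345) == generate_contributor_uuid(12345)  # True
#     generate_contributor_uuid(12345) == generate_contributor_uuid(67890)  # False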
def sanitize_username(username: str, max_length: int = 20) -> str:
"""
Sanitize a GitHub username for use in filenames.
Args:
username: GitHub username
max_length: Maximum length of sanitized name
Returns:
Lowercase alphanumeric string with underscores
"""
sanitized = ""
for char in username.lower():
if char.isalnum():
sanitized += char
else:
sanitized += "_"
# Collapse multiple underscores
while "__" in sanitized:
sanitized = sanitized.replace("__", "_")
return sanitized.strip("_")[:max_length]
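# Example (illustrative):
#     sanitize_username("James Berry.com") -> "james_berry_com"
# (lowercased, runs of non-alphanumeric characters collapsed to single
# underscores, trimmed to max_length characters).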
def generate_submission_filename(
username: str,
date_str: str,
content_hash: str,
extension: str = ".json"
) -> str:
"""
Generate a unique filename for a community submission.
Format: {sanitized_username}_{date}_{short_hash}.json
Args:
username: GitHub username
date_str: Date in YYYY-MM-DD format
content_hash: Hash of the submission content (will be truncated to 8 chars)
extension: File extension (default: .json)
Returns:
Unique filename string
"""
sanitized_name = sanitize_username(username)
short_hash = content_hash[:8]
return f"{sanitized_name}_{date_str}_{short_hash}{extension}"
def compute_content_hash(content: str) -> str:
"""
Compute SHA256 hash of content.
Args:
content: String content to hash
Returns:
Hex digest of SHA256 hash
"""
return hashlib.sha256(content.encode()).hexdigest()
@@ -0,0 +1,141 @@
#!/usr/bin/env python3
"""
Generate a daily CSV of all community contributions.
Reads all JSON files from the community/ directory and outputs a sorted CSV
with creation_timestamp as the first column and contributor_uuid as the last column.
Usage:
python -m src.contributions.create_daily_community_release
"""
from datetime import datetime, timezone
from pathlib import Path
import json
import sys
import pandas as pd
COMMUNITY_DIR = Path(__file__).parent.parent.parent / "community"
OUT_ROOT = Path("data/planequery_aircraft")
def read_all_submissions(community_dir: Path) -> list[dict]:
"""Read all JSON submissions from the community directory."""
all_submissions = []
for json_file in sorted(community_dir.glob("*.json")):
try:
with open(json_file) as f:
data = json.load(f)
# Normalize to list
submissions = data if isinstance(data, list) else [data]
all_submissions.extend(submissions)
except (json.JSONDecodeError, OSError) as e:
print(f"Warning: Failed to read {json_file}: {e}", file=sys.stderr)
return all_submissions
def submissions_to_dataframe(submissions: list[dict]) -> pd.DataFrame:
"""
Convert submissions to a DataFrame with proper column ordering.
Column order:
- creation_timestamp (first)
- transponder_code_hex
- registration_number
- planequery_airframe_id
- contributor_name
- [other columns alphabetically]
- contributor_uuid (last)
"""
if not submissions:
return pd.DataFrame()
df = pd.DataFrame(submissions)
# Ensure required columns exist
required_cols = [
"creation_timestamp",
"transponder_code_hex",
"registration_number",
"planequery_airframe_id",
"contributor_name",
"contributor_uuid",
]
for col in required_cols:
if col not in df.columns:
df[col] = None
# Sort by creation_timestamp ascending
df = df.sort_values("creation_timestamp", ascending=True, na_position="last")
# Reorder columns: specific order first, contributor_uuid last
first_cols = [
"creation_timestamp",
"transponder_code_hex",
"registration_number",
"planequery_airframe_id",
"contributor_name",
]
last_cols = ["contributor_uuid"]
middle_cols = sorted([
col for col in df.columns
if col not in first_cols and col not in last_cols
])
ordered_cols = first_cols + middle_cols + last_cols
df = df[ordered_cols]
return df.reset_index(drop=True)
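# Example (illustrative): a submission carrying only a transponder code plus a
# "tags" field comes out with columns ordered
#     creation_timestamp, transponder_code_hex, registration_number,
#     planequery_airframe_id, contributor_name, tags, contributor_uuid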
def main():
"""Generate the daily community contributions CSV."""
date_str = datetime.now(timezone.utc).strftime("%Y-%m-%d")
print(f"Reading community submissions from {COMMUNITY_DIR}")
submissions = read_all_submissions(COMMUNITY_DIR)
if not submissions:
print("No community submissions found.")
# Still create an empty CSV with headers
df = pd.DataFrame(columns=[
"creation_timestamp",
"transponder_code_hex",
"registration_number",
"planequery_airframe_id",
"contributor_name",
"tags",
"contributor_uuid",
])
else:
print(f"Found {len(submissions)} total submissions")
df = submissions_to_dataframe(submissions)
# Determine date range for filename
if not df.empty and df["creation_timestamp"].notna().any():
# Get earliest timestamp for start date
earliest = pd.to_datetime(df["creation_timestamp"]).min()
start_date_str = earliest.strftime("%Y-%m-%d")
else:
start_date_str = date_str
# Output
OUT_ROOT.mkdir(parents=True, exist_ok=True)
output_file = OUT_ROOT / f"planequery_aircraft_community_{start_date_str}_{date_str}.csv"
df.to_csv(output_file, index=False)
print(f"Saved: {output_file}")
print(f"Total contributions: {len(df)}")
return output_file
if __name__ == "__main__":
main()
+115
@@ -0,0 +1,115 @@
#!/usr/bin/env python3
"""
Read and aggregate all community submission data.
Usage:
python -m src.contributions.read_community_data
python -m src.contributions.read_community_data --output merged.json
"""
import argparse
import json
import sys
from pathlib import Path
COMMUNITY_DIR = Path(__file__).parent.parent.parent / "community"
def read_all_submissions(community_dir: Path | None = None) -> list[dict]:
"""
Read all JSON submissions from the community directory.
Args:
community_dir: Path to community directory. Uses default if None.
Returns:
List of all submission dictionaries
"""
if community_dir is None:
community_dir = COMMUNITY_DIR
all_submissions = []
for json_file in sorted(community_dir.glob("*.json")):
try:
with open(json_file) as f:
data = json.load(f)
# Normalize to list
submissions = data if isinstance(data, list) else [data]
# Add source file metadata
for submission in submissions:
submission["_source_file"] = json_file.name
all_submissions.extend(submissions)
except (json.JSONDecodeError, OSError) as e:
print(f"Warning: Failed to read {json_file}: {e}", file=sys.stderr)
return all_submissions
def group_by_identifier(submissions: list[dict]) -> dict[str, list[dict]]:
"""
Group submissions by their identifier (registration, transponder, or airframe ID).
Returns:
Dict mapping identifier to list of submissions for that identifier
"""
grouped = {}
for submission in submissions:
# Determine identifier
if "registration_number" in submission:
key = f"reg:{submission['registration_number']}"
elif "transponder_code_hex" in submission:
key = f"icao:{submission['transponder_code_hex']}"
elif "planequery_airframe_id" in submission:
key = f"id:{submission['planequery_airframe_id']}"
else:
key = "_unknown"
if key not in grouped:
grouped[key] = []
grouped[key].append(submission)
return grouped
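# Example (illustrative): submissions are keyed by the first identifier found,
# checked in the order registration -> transponder -> airframe id:
#     {"registration_number": "N123AB"}               -> "reg:N123AB"
#     {"transponder_code_hex": "a1b2c3"}              -> "icao:a1b2c3"
#     {"planequery_airframe_id": "cessna|172s|12345"} -> "id:cessna|172s|12345"
# Submissions that share a key end up in the same list.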
def main():
parser = argparse.ArgumentParser(description="Read community submission data")
parser.add_argument("--output", "-o", help="Output file (default: stdout)")
parser.add_argument("--group", action="store_true", help="Group by identifier")
parser.add_argument("--stats", action="store_true", help="Print statistics only")
args = parser.parse_args()
submissions = read_all_submissions()
if args.stats:
grouped = group_by_identifier(submissions)
contributors = set(s.get("contributor_uuid", "unknown") for s in submissions)
print(f"Total submissions: {len(submissions)}")
print(f"Unique identifiers: {len(grouped)}")
print(f"Unique contributors: {len(contributors)}")
return
if args.group:
result = group_by_identifier(submissions)
else:
result = submissions
output = json.dumps(result, indent=2)
if args.output:
with open(args.output, "w") as f:
f.write(output)
print(f"Wrote {len(submissions)} submissions to {args.output}")
else:
print(output)
if __name__ == "__main__":
main()
+117
@@ -0,0 +1,117 @@
"""Schema validation for community submissions."""
import json
import re
from pathlib import Path
from typing import Any
try:
from jsonschema import Draft202012Validator
except ImportError:
Draft202012Validator = None
SCHEMA_PATH = Path(__file__).parent.parent.parent / "schemas" / "community_submission.v1.schema.json"
def load_schema() -> dict:
"""Load the community submission schema."""
with open(SCHEMA_PATH) as f:
return json.load(f)
def validate_submission(data: dict | list, schema: dict | None = None) -> list[str]:
"""
Validate submission(s) against schema.
Args:
data: Single submission dict or list of submissions
schema: Optional schema dict. If None, loads from default path.
Returns:
List of error messages. Empty list means validation passed.
"""
if Draft202012Validator is None:
raise ImportError("jsonschema is required: pip install jsonschema")
if schema is None:
schema = load_schema()
submissions = data if isinstance(data, list) else [data]
errors = []
validator = Draft202012Validator(schema)
for i, submission in enumerate(submissions):
prefix = f"[{i}] " if len(submissions) > 1 else ""
for error in validator.iter_errors(submission):
path = ".".join(str(p) for p in error.path) if error.path else "(root)"
errors.append(f"{prefix}{path}: {error.message}")
return errors
def extract_json_from_issue_body(body: str) -> str | None:
"""
Extract JSON from GitHub issue body.
Looks for JSON in the 'Submission JSON' section wrapped in code blocks.
Args:
body: The issue body text
Returns:
Extracted JSON string or None if not found
"""
# Match JSON in "### Submission JSON" section
pattern = r"### Submission JSON\s*\n\s*```(?:json)?\s*\n([\s\S]*?)\n\s*```"
match = re.search(pattern, body)
if match:
return match.group(1).strip()
return None
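# Example (illustrative): for an issue body containing
#     ### Submission JSON
# followed by a fenced ```json code block whose body is
#     {"transponder_code_hex": "a1b2c3"}
# the function returns the string '{"transponder_code_hex": "a1b2c3"}'.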
def extract_contributor_name_from_issue_body(body: str) -> str | None:
"""
Extract contributor name from GitHub issue body.
Looks for the 'Contributor Name' field in the issue form.
Args:
body: The issue body text
Returns:
Contributor name string or None if not found/empty
"""
# Match "### Contributor Name" section
pattern = r"### Contributor Name\s*\n\s*(.+?)(?=\n###|\n\n|$)"
match = re.search(pattern, body)
if match:
name = match.group(1).strip()
# GitHub issue forms show "_No response_" for empty optional fields
if name and name != "_No response_":
return name
return None
def parse_and_validate(json_str: str, schema: dict | None = None) -> tuple[list | dict | None, list[str]]:
"""
Parse JSON string and validate against schema.
Args:
json_str: JSON string to parse
schema: Optional schema dict
Returns:
Tuple of (parsed data or None, list of errors)
"""
try:
data = json.loads(json_str)
except json.JSONDecodeError as e:
return None, [f"Invalid JSON: {e}"]
errors = validate_submission(data, schema)
return data, errors
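# Illustrative usage (a sketch; assumes jsonschema is installed and the schema
# file exists at SCHEMA_PATH):
#
#     data, errors = parse_and_validate('{"transponder_code_hex": "a1b2c3"}')
#     if errors:
#         print("invalid:", errors)
#     else:
#         print("ok:", data)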
+140
@@ -0,0 +1,140 @@
#!/usr/bin/env python3
"""
Validate a community submission from a GitHub issue.
This script is called by the GitHub Actions workflow to validate
submissions when issues are opened or edited.
Usage:
python -m src.contributions.validate_submission --issue-body "..."
python -m src.contributions.validate_submission --file submission.json
echo '{"registration_number": "N12345"}' | python -m src.contributions.validate_submission --stdin
Environment variables (for GitHub Actions):
GITHUB_TOKEN: GitHub API token
GITHUB_REPOSITORY: owner/repo
ISSUE_NUMBER: Issue number to comment on
"""
import argparse
import json
import os
import sys
import urllib.request
import urllib.error
from .schema import extract_json_from_issue_body, parse_and_validate, load_schema
def github_api_request(method: str, endpoint: str, data: dict | None = None) -> dict:
"""Make a GitHub API request."""
token = os.environ.get("GITHUB_TOKEN")
repo = os.environ.get("GITHUB_REPOSITORY")
if not token or not repo:
raise EnvironmentError("GITHUB_TOKEN and GITHUB_REPOSITORY must be set")
url = f"https://api.github.com/repos/{repo}{endpoint}"
headers = {
"Authorization": f"token {token}",
"Accept": "application/vnd.github.v3+json",
"Content-Type": "application/json",
}
body = json.dumps(data).encode() if data else None
req = urllib.request.Request(url, data=body, headers=headers, method=method)
with urllib.request.urlopen(req) as response:
return json.loads(response.read())
def add_issue_comment(issue_number: int, body: str) -> None:
"""Add a comment to a GitHub issue."""
github_api_request("POST", f"/issues/{issue_number}/comments", {"body": body})
def add_issue_label(issue_number: int, label: str) -> None:
"""Add a label to a GitHub issue."""
github_api_request("POST", f"/issues/{issue_number}/labels", {"labels": [label]})
def remove_issue_label(issue_number: int, label: str) -> None:
"""Remove a label from a GitHub issue."""
try:
github_api_request("DELETE", f"/issues/{issue_number}/labels/{label}")
except urllib.error.HTTPError:
pass # Label might not exist
def validate_and_report(json_str: str, issue_number: int | None = None) -> bool:
"""
Validate JSON and optionally report to GitHub issue.
Args:
json_str: JSON string to validate
issue_number: Optional issue number to comment on
Returns:
True if validation passed, False otherwise
"""
data, errors = parse_and_validate(json_str)
if errors:
error_list = "\n".join(f"- {e}" for e in errors)
message = f"❌ **Validation Failed**\n\n{error_list}\n\nPlease fix the errors and edit your submission."
print(message, file=sys.stderr)
if issue_number:
add_issue_comment(issue_number, message)
remove_issue_label(issue_number, "validated")
return False
count = len(data) if isinstance(data, list) else 1
message = f"✅ **Validation Passed**\n\n{count} submission(s) validated successfully against the schema.\n\nA maintainer can approve this submission by adding the `approved` label."
print(message)
if issue_number:
add_issue_comment(issue_number, message)
add_issue_label(issue_number, "validated")
return True
def main():
parser = argparse.ArgumentParser(description="Validate community submission JSON")
source_group = parser.add_mutually_exclusive_group(required=True)
source_group.add_argument("--issue-body", help="Issue body text containing JSON")
source_group.add_argument("--file", help="JSON file to validate")
source_group.add_argument("--stdin", action="store_true", help="Read JSON from stdin")
parser.add_argument("--issue-number", type=int, help="GitHub issue number to comment on")
args = parser.parse_args()
# Get JSON string
if args.issue_body:
json_str = extract_json_from_issue_body(args.issue_body)
if not json_str:
print("❌ Could not extract JSON from issue body", file=sys.stderr)
if args.issue_number:
add_issue_comment(
args.issue_number,
"❌ **Validation Failed**\n\nCould not extract JSON from submission. "
"Please ensure your JSON is in the 'Submission JSON' field wrapped in code blocks."
)
sys.exit(1)
elif args.file:
with open(args.file) as f:
json_str = f.read()
else: # stdin
json_str = sys.stdin.read()
# Validate
success = validate_and_report(json_str, args.issue_number)
sys.exit(0 if success else 1)
if __name__ == "__main__":
main()
@@ -2,6 +2,8 @@ from pathlib import Path
from datetime import datetime, timezone, timedelta
import sys
import polars as pl
# Add adsb directory to path
sys.path.insert(0, str(Path(__file__).parent / "adsb")) # TODO: Fix this hacky path manipulation
@@ -23,15 +25,26 @@ if __name__ == '__main__':
print("Loading new ADS-B data...")
df_new = load_historical_for_day(day)
if df_new.empty:
if df_new.height == 0:
day = day - timedelta(days=1)
continue
max_time = df_new['time'].max()
max_time = max_time.replace(tzinfo=timezone.utc)
if max_time >= day.replace(hour=23, minute=59, second=59) - timedelta(minutes=5):
# Data is complete
break
max_time = df_new['time'].max()
if max_time is not None:
# Handle timezone
max_time_dt = max_time
if hasattr(max_time_dt, 'replace'):
max_time_dt = max_time_dt.replace(tzinfo=timezone.utc)
end_of_day = day.replace(hour=23, minute=59, second=59, tzinfo=timezone.utc) - timedelta(minutes=5)
# If max_time is a python datetime, compare it as UTC against end_of_day
if isinstance(max_time_dt, datetime):
if max_time_dt.replace(tzinfo=timezone.utc) >= end_of_day:
break
else:
# Otherwise fall back to a naive comparison (23:54:59 is five minutes before end of day)
if max_time >= day.replace(hour=23, minute=54, second=59):
break
print(f"WARNING: Latest data time is {max_time}, which is more than 5 minutes before end of day.")
day = day - timedelta(days=1)
@@ -51,14 +64,21 @@ if __name__ == '__main__':
start_date_str = date_str
# Sort by time for consistent ordering
df_combined = df_combined.sort_values('time').reset_index(drop=True)
df_combined = df_combined.sort('time')
# Convert any list columns to strings for CSV compatibility
for col in df_combined.columns:
if df_combined[col].dtype == pl.List:
df_combined = df_combined.with_columns(
pl.col(col).list.join(",").alias(col)
)
# Save the result
OUT_ROOT = Path("data/planequery_aircraft")
OUT_ROOT.mkdir(parents=True, exist_ok=True)
output_file = OUT_ROOT / f"planequery_aircraft_adsb_{start_date_str}_{date_str}.csv"
df_combined.to_csv(output_file, index=False)
df_combined.write_csv(output_file)
print(f"Saved: {output_file}")
print(f"Total aircraft: {len(df_combined)}")
print(f"Total aircraft: {df_combined.height}")
+1 -8
@@ -46,16 +46,9 @@ def main():
run_id = f"run-{datetime.utcnow().strftime('%Y%m%dT%H%M%S')}-{uuid.uuid4().hex[:8]}"
chunks = generate_chunks(args.start_date, args.end_date, args.chunk_days)
clickhouse_host = os.environ["CLICKHOUSE_HOST"]
clickhouse_username = os.environ["CLICKHOUSE_USERNAME"]
clickhouse_password = os.environ["CLICKHOUSE_PASSWORD"]
# Inject run_id and ClickHouse credentials into each chunk
# Inject run_id into each chunk
for chunk in chunks:
chunk["run_id"] = run_id
chunk["clickhouse_host"] = clickhouse_host
chunk["clickhouse_username"] = clickhouse_username
chunk["clickhouse_password"] = clickhouse_password
sfn_input = {
"run_id": run_id,