Mirror of https://github.com/PlaneQuery/OpenAirframes.git, synced 2026-04-23 19:46:09 +02:00
make a historical runner for adsb
@@ -0,0 +1,128 @@
name: Historical ADS-B Processing

on:
  workflow_dispatch:
    inputs:
      start_date:
        description: 'Start date (YYYY-MM-DD, inclusive)'
        required: true
        type: string
      end_date:
        description: 'End date (YYYY-MM-DD, inclusive)'
        required: true
        type: string
      chunk_days:
        description: 'Days per job chunk (default: 7)'
        required: false
        type: number
        default: 7

jobs:
  generate-matrix:
    runs-on: ubuntu-latest
    outputs:
      chunks: ${{ steps.generate.outputs.chunks }}
      global_start: ${{ inputs.start_date }}
      global_end: ${{ inputs.end_date }}
    steps:
      - name: Checkout
        uses: actions/checkout@v4

      - name: Setup Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.12'

      - name: Generate date chunks
        id: generate
        env:
          INPUT_START_DATE: ${{ inputs.start_date }}
          INPUT_END_DATE: ${{ inputs.end_date }}
          INPUT_CHUNK_DAYS: ${{ inputs.chunk_days }}
        run: python src/adsb/historical_generate_matrix.py

  process-chunk:
    needs: generate-matrix
    runs-on: ubuntu-latest
    strategy:
      matrix:
        chunk: ${{ fromJson(needs.generate-matrix.outputs.chunks) }}
      max-parallel: 3
      fail-fast: false
    steps:
      - name: Checkout
        uses: actions/checkout@v4

      - name: Setup Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.12'

      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install polars pyarrow orjson zstandard

      - name: Free disk space
        run: |
          sudo rm -rf /usr/share/dotnet
          sudo rm -rf /opt/ghc
          sudo rm -rf /usr/local/share/boost
          df -h

      - name: Process date range
        env:
          CHUNK_START_DATE: ${{ matrix.chunk.start_date }}
          CHUNK_END_DATE: ${{ matrix.chunk.end_date }}
        working-directory: src/adsb
        run: python historical_process_chunk.py

      - name: Upload chunk artifact
        uses: actions/upload-artifact@v4
        with:
          name: chunk-${{ matrix.chunk.start_date }}-${{ matrix.chunk.end_date }}
          path: data/chunks/*.csv
          retention-days: 1
          if-no-files-found: ignore

  combine-chunks:
    needs: [generate-matrix, process-chunk]
    runs-on: ubuntu-latest
    steps:
      - name: Checkout
        uses: actions/checkout@v4

      - name: Setup Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.12'

      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install polars

      - name: Download all chunk artifacts
        uses: actions/download-artifact@v4
        with:
          path: chunks
          pattern: chunk-*
          merge-multiple: true

      - name: List downloaded chunks
        run: |
          echo "Downloaded chunks:"
          find chunks -name "*.csv" -type f 2>/dev/null || echo "No CSV files found"

      - name: Combine chunks
        env:
          GLOBAL_START_DATE: ${{ needs.generate-matrix.outputs.global_start }}
          GLOBAL_END_DATE: ${{ needs.generate-matrix.outputs.global_end }}
        run: python src/adsb/historical_combine_chunks.py

      - name: Upload final artifact
        uses: actions/upload-artifact@v4
        with:
          name: planequery_aircraft_adsb-${{ needs.generate-matrix.outputs.global_start }}-${{ needs.generate-matrix.outputs.global_end }}
          path: data/planequery_aircraft/*.csv
          retention-days: 30
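Before dispatching the workflow, the matrix plumbing can be checked locally by running the generator with the same environment variables the "Generate date chunks" step sets. A minimal sketch (the dates are illustrative, and it assumes the repo root as the working directory):

    # Local dry run of the matrix generator; mirrors the workflow's env contract.
    import os
    import subprocess

    env = dict(
        os.environ,
        INPUT_START_DATE="2026-01-01",  # illustrative, not from the commit
        INPUT_END_DATE="2026-01-20",
        INPUT_CHUNK_DAYS="7",
    )
    result = subprocess.run(
        ["python", "src/adsb/historical_generate_matrix.py"],
        env=env, capture_output=True, text=True, check=True,
    )
    # With GITHUB_OUTPUT unset, the script pretty-prints the chunk JSON
    # that the workflow's fromJson() would expand into the job matrix.
    print(result.stdout)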
@@ -0,0 +1,85 @@
#!/usr/bin/env python3
"""Combine processed chunks into final historical ADS-B release."""

import os
import sys
from pathlib import Path

import polars as pl


def combine_chunks(chunks_dir: Path, output_dir: Path, start_date: str, end_date: str) -> Path:
    """Combine all chunk CSVs into final output.

    Args:
        chunks_dir: Directory containing chunk CSV files
        output_dir: Directory to write final output
        start_date: Global start date for filename
        end_date: Global end date for filename

    Returns:
        Path to final output CSV
    """
    # Import here to allow script to be run from repo root
    sys.path.insert(0, str(Path(__file__).parent))
    from compress_adsb_to_aircraft_data import deduplicate_by_signature

    csv_files = sorted(chunks_dir.glob("**/*.csv"))
    print(f"Found {len(csv_files)} chunk files")

    if not csv_files:
        print("ERROR: No chunk files found", file=sys.stderr)
        sys.exit(1)

    dfs: list[pl.DataFrame] = []
    for csv_file in csv_files:
        print(f"Loading {csv_file}")
        df = pl.read_csv(csv_file, null_values=[""])
        dfs.append(df)
        print(f" {df.height} rows")

    df_combined = pl.concat(dfs)
    print(f"Combined: {df_combined.height} rows")

    df_combined = deduplicate_by_signature(df_combined)
    print(f"After final dedup: {df_combined.height} rows")

    # Sort by time
    if "time" in df_combined.columns:
        df_combined = df_combined.sort("time")

    # Convert list columns to strings for CSV compatibility
    for col in df_combined.columns:
        if df_combined[col].dtype == pl.List:
            df_combined = df_combined.with_columns(
                pl.col(col).list.join(",").alias(col)
            )

    # Write output
    output_dir.mkdir(parents=True, exist_ok=True)
    output_path = output_dir / f"planequery_aircraft_adsb_{start_date}_{end_date}.csv"

    df_combined.write_csv(output_path)
    print(f"Wrote final output: {output_path}")
    print(f"Total records: {df_combined.height}")

    return output_path


def main() -> None:
    """Main entry point for GitHub Actions."""
    start_date = os.environ.get("GLOBAL_START_DATE")
    end_date = os.environ.get("GLOBAL_END_DATE")

    if not start_date or not end_date:
        print("ERROR: GLOBAL_START_DATE and GLOBAL_END_DATE must be set", file=sys.stderr)
        sys.exit(1)

    chunks_dir = Path("chunks")
    output_dir = Path("data/planequery_aircraft")

    combine_chunks(chunks_dir, output_dir, start_date, end_date)


if __name__ == "__main__":
    main()
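deduplicate_by_signature comes from compress_adsb_to_aircraft_data.py, which is not part of this commit. In Polars, signature-style dedup typically reduces to DataFrame.unique over the identifying columns; a hedged sketch only, since the real signature definition is not shown (the column names "icao" and "reg" are placeholders, not the project's actual signature):

    import polars as pl

    def deduplicate_by_signature_sketch(df: pl.DataFrame) -> pl.DataFrame:
        # Keep one row per signature. The real module defines the signature
        # columns; "icao" and "reg" here are illustrative placeholders.
        sig_cols = [c for c in ("icao", "reg") if c in df.columns]
        return df.unique(subset=sig_cols or None, keep="first")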
@@ -0,0 +1,62 @@
#!/usr/bin/env python3
"""Generate date chunk matrix for historical ADS-B processing."""

import json
import os
import sys
from datetime import datetime, timedelta


def generate_chunks(start_date: str, end_date: str, chunk_days: int) -> list[dict]:
    """Generate date chunks for parallel processing.

    Args:
        start_date: Start date in YYYY-MM-DD format
        end_date: End date in YYYY-MM-DD format
        chunk_days: Number of days per chunk

    Returns:
        List of chunk dictionaries with start_date and end_date
    """
    start = datetime.strptime(start_date, "%Y-%m-%d")
    end = datetime.strptime(end_date, "%Y-%m-%d")

    chunks = []
    current = start

    while current <= end:
        chunk_end = min(current + timedelta(days=chunk_days - 1), end)
        chunks.append({
            "start_date": current.strftime("%Y-%m-%d"),
            "end_date": chunk_end.strftime("%Y-%m-%d"),
        })
        current = chunk_end + timedelta(days=1)

    return chunks


def main() -> None:
    """Main entry point for GitHub Actions."""
    start_date = os.environ.get("INPUT_START_DATE")
    end_date = os.environ.get("INPUT_END_DATE")
    chunk_days = int(os.environ.get("INPUT_CHUNK_DAYS", "7"))

    if not start_date or not end_date:
        print("ERROR: INPUT_START_DATE and INPUT_END_DATE must be set", file=sys.stderr)
        sys.exit(1)

    chunks = generate_chunks(start_date, end_date, chunk_days)
    print(f"Generated {len(chunks)} chunks for {start_date} to {end_date}")

    # Write to GitHub Actions output
    github_output = os.environ.get("GITHUB_OUTPUT")
    if github_output:
        with open(github_output, "a") as f:
            f.write(f"chunks={json.dumps(chunks)}\n")
    else:
        # For local testing, just print
        print(json.dumps(chunks, indent=2))


if __name__ == "__main__":
    main()
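The chunk arithmetic is inclusive on both ends, and the last chunk is truncated at end_date. For example (illustrative dates; run from src/adsb so the import resolves):

    from historical_generate_matrix import generate_chunks

    # 20 days at chunk_days=7 -> two full chunks plus a 6-day remainder.
    print(generate_chunks("2026-01-01", "2026-01-20", 7))
    # [{'start_date': '2026-01-01', 'end_date': '2026-01-07'},
    #  {'start_date': '2026-01-08', 'end_date': '2026-01-14'},
    #  {'start_date': '2026-01-15', 'end_date': '2026-01-20'}]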
@@ -0,0 +1,91 @@
#!/usr/bin/env python3
"""Process a single date chunk for historical ADS-B data."""

import os
import sys
from datetime import datetime, timedelta
from pathlib import Path

# Add parent directory to path for imports when run from repo root
sys.path.insert(0, str(Path(__file__).parent))


def process_chunk(start_date: str, end_date: str, output_dir: Path) -> Path | None:
    """Process a date range and output compressed CSV.

    Args:
        start_date: Start date in YYYY-MM-DD format
        end_date: End date in YYYY-MM-DD format
        output_dir: Directory to write output CSV

    Returns:
        Path to output CSV, or None if no data
    """
    from compress_adsb_to_aircraft_data import (
        load_historical_for_day,
        deduplicate_by_signature,
    )
    import polars as pl

    start = datetime.strptime(start_date, "%Y-%m-%d")
    end = datetime.strptime(end_date, "%Y-%m-%d")

    total_days = (end - start).days + 1
    print(f"Processing {total_days} days [{start_date}, {end_date}]")

    dfs: list[pl.DataFrame] = []
    current_date = start

    while current_date <= end:
        day_str = current_date.strftime("%Y-%m-%d")
        print(f" Loading {day_str}...")

        try:
            df_compressed = load_historical_for_day(current_date)
            if df_compressed.height > 0:
                dfs.append(df_compressed)
                total_rows = sum(df.height for df in dfs)
                print(f" +{df_compressed.height} rows (total: {total_rows})")
        except Exception as e:
            print(f" Warning: Failed to load {day_str}: {e}")

        current_date += timedelta(days=1)

    if not dfs:
        print("No data found for this chunk")
        return None

    df_accumulated = pl.concat(dfs)
    df_accumulated = deduplicate_by_signature(df_accumulated)
    print(f"After dedup: {df_accumulated.height} rows")

    # Write output
    output_dir.mkdir(parents=True, exist_ok=True)
    output_path = output_dir / f"chunk_{start_date}_{end_date}.csv"
    df_accumulated.write_csv(output_path)
    print(f"Wrote {output_path}")

    return output_path


def main() -> None:
    """Main entry point for GitHub Actions."""
    start_date = os.environ.get("CHUNK_START_DATE")
    end_date = os.environ.get("CHUNK_END_DATE")

    if not start_date or not end_date:
        print("ERROR: CHUNK_START_DATE and CHUNK_END_DATE must be set", file=sys.stderr)
        sys.exit(1)

    # Output to repo root data/chunks (script runs from src/adsb)
    repo_root = Path(__file__).parent.parent.parent
    output_dir = repo_root / "data" / "chunks"
    result = process_chunk(start_date, end_date, output_dir)

    if result is None:
        print("No data produced for this chunk")
        sys.exit(0)


if __name__ == "__main__":
    main()
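The chunk processor can be smoke-tested the same way the workflow invokes it, from src/adsb with the chunk dates in the environment. A sketch (illustrative dates; it assumes compress_adsb_to_aircraft_data.py is present and that load_historical_for_day can reach its data source locally):

    import os
    import subprocess

    env = dict(
        os.environ,
        CHUNK_START_DATE="2026-01-01",  # illustrative
        CHUNK_END_DATE="2026-01-07",
    )
    subprocess.run(
        ["python", "historical_process_chunk.py"],
        cwd="src/adsb", env=env, check=True,
    )
    # On success this writes data/chunks/chunk_2026-01-01_2026-01-07.csv at the
    # repo root -- the same path the workflow's upload-artifact step collects.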