diff --git a/.github/workflows/historical-adsb.yaml b/.github/workflows/historical-adsb.yaml new file mode 100644 index 0000000..db85e99 --- /dev/null +++ b/.github/workflows/historical-adsb.yaml @@ -0,0 +1,128 @@ +name: Historical ADS-B Processing + +on: + workflow_dispatch: + inputs: + start_date: + description: 'Start date (YYYY-MM-DD, inclusive)' + required: true + type: string + end_date: + description: 'End date (YYYY-MM-DD, inclusive)' + required: true + type: string + chunk_days: + description: 'Days per job chunk (default: 7)' + required: false + type: number + default: 7 + +jobs: + generate-matrix: + runs-on: ubuntu-latest + outputs: + chunks: ${{ steps.generate.outputs.chunks }} + global_start: ${{ inputs.start_date }} + global_end: ${{ inputs.end_date }} + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Setup Python + uses: actions/setup-python@v5 + with: + python-version: '3.12' + + - name: Generate date chunks + id: generate + env: + INPUT_START_DATE: ${{ inputs.start_date }} + INPUT_END_DATE: ${{ inputs.end_date }} + INPUT_CHUNK_DAYS: ${{ inputs.chunk_days }} + run: python src/adsb/historical_generate_matrix.py + + process-chunk: + needs: generate-matrix + runs-on: ubuntu-latest + strategy: + matrix: + chunk: ${{ fromJson(needs.generate-matrix.outputs.chunks) }} + max-parallel: 3 + fail-fast: false + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Setup Python + uses: actions/setup-python@v5 + with: + python-version: '3.12' + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install polars pyarrow orjson zstandard + + - name: Free disk space + run: | + sudo rm -rf /usr/share/dotnet + sudo rm -rf /opt/ghc + sudo rm -rf /usr/local/share/boost + df -h + + - name: Process date range + env: + CHUNK_START_DATE: ${{ matrix.chunk.start_date }} + CHUNK_END_DATE: ${{ matrix.chunk.end_date }} + working-directory: src/adsb + run: python historical_process_chunk.py + + - name: Upload chunk artifact + uses: actions/upload-artifact@v4 + with: + name: chunk-${{ matrix.chunk.start_date }}-${{ matrix.chunk.end_date }} + path: data/chunks/*.csv + retention-days: 1 + if-no-files-found: ignore + + combine-chunks: + needs: [generate-matrix, process-chunk] + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Setup Python + uses: actions/setup-python@v5 + with: + python-version: '3.12' + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install polars + + - name: Download all chunk artifacts + uses: actions/download-artifact@v4 + with: + path: chunks + pattern: chunk-* + merge-multiple: true + + - name: List downloaded chunks + run: | + echo "Downloaded chunks:" + find chunks -name "*.csv" -type f 2>/dev/null || echo "No CSV files found" + + - name: Combine chunks + env: + GLOBAL_START_DATE: ${{ needs.generate-matrix.outputs.global_start }} + GLOBAL_END_DATE: ${{ needs.generate-matrix.outputs.global_end }} + run: python src/adsb/historical_combine_chunks.py + + - name: Upload final artifact + uses: actions/upload-artifact@v4 + with: + name: planequery_aircraft_adsb-${{ needs.generate-matrix.outputs.global_start }}-${{ needs.generate-matrix.outputs.global_end }} + path: data/planequery_aircraft/*.csv + retention-days: 30 diff --git a/src/adsb/historical_combine_chunks.py b/src/adsb/historical_combine_chunks.py new file mode 100644 index 0000000..06ce5c6 --- /dev/null +++ b/src/adsb/historical_combine_chunks.py @@ -0,0 +1,85 @@ +#!/usr/bin/env python3 +"""Combine processed chunks into final historical ADS-B release.""" + +import os +import sys +from pathlib import Path + +import polars as pl + + +def combine_chunks(chunks_dir: Path, output_dir: Path, start_date: str, end_date: str) -> Path: + """Combine all chunk CSVs into final output. + + Args: + chunks_dir: Directory containing chunk CSV files + output_dir: Directory to write final output + start_date: Global start date for filename + end_date: Global end date for filename + + Returns: + Path to final output CSV + """ + # Import here to allow script to be run from repo root + sys.path.insert(0, str(Path(__file__).parent)) + from compress_adsb_to_aircraft_data import deduplicate_by_signature + + csv_files = sorted(chunks_dir.glob("**/*.csv")) + print(f"Found {len(csv_files)} chunk files") + + if not csv_files: + print("ERROR: No chunk files found", file=sys.stderr) + sys.exit(1) + + dfs: list[pl.DataFrame] = [] + for csv_file in csv_files: + print(f"Loading {csv_file}") + df = pl.read_csv(csv_file, null_values=[""]) + dfs.append(df) + print(f" {df.height} rows") + + df_combined = pl.concat(dfs) + print(f"Combined: {df_combined.height} rows") + + df_combined = deduplicate_by_signature(df_combined) + print(f"After final dedup: {df_combined.height} rows") + + # Sort by time + if "time" in df_combined.columns: + df_combined = df_combined.sort("time") + + # Convert list columns to strings for CSV compatibility + for col in df_combined.columns: + if df_combined[col].dtype == pl.List: + df_combined = df_combined.with_columns( + pl.col(col).list.join(",").alias(col) + ) + + # Write output + output_dir.mkdir(parents=True, exist_ok=True) + output_path = output_dir / f"planequery_aircraft_adsb_{start_date}_{end_date}.csv" + + df_combined.write_csv(output_path) + print(f"Wrote final output: {output_path}") + print(f"Total records: {df_combined.height}") + + return output_path + + +def main() -> None: + """Main entry point for GitHub Actions.""" + start_date = os.environ.get("GLOBAL_START_DATE") + end_date = os.environ.get("GLOBAL_END_DATE") + + if not start_date or not end_date: + print("ERROR: GLOBAL_START_DATE and GLOBAL_END_DATE must be set", file=sys.stderr) + sys.exit(1) + + chunks_dir = Path("chunks") + output_dir = Path("data/planequery_aircraft") + + combine_chunks(chunks_dir, output_dir, start_date, end_date) + + +if __name__ == "__main__": + main() diff --git a/src/adsb/historical_generate_matrix.py b/src/adsb/historical_generate_matrix.py new file mode 100644 index 0000000..3a687e5 --- /dev/null +++ b/src/adsb/historical_generate_matrix.py @@ -0,0 +1,62 @@ +#!/usr/bin/env python3 +"""Generate date chunk matrix for historical ADS-B processing.""" + +import json +import os +import sys +from datetime import datetime, timedelta + + +def generate_chunks(start_date: str, end_date: str, chunk_days: int) -> list[dict]: + """Generate date chunks for parallel processing. + + Args: + start_date: Start date in YYYY-MM-DD format + end_date: End date in YYYY-MM-DD format + chunk_days: Number of days per chunk + + Returns: + List of chunk dictionaries with start_date and end_date + """ + start = datetime.strptime(start_date, "%Y-%m-%d") + end = datetime.strptime(end_date, "%Y-%m-%d") + + chunks = [] + current = start + + while current <= end: + chunk_end = min(current + timedelta(days=chunk_days - 1), end) + chunks.append({ + "start_date": current.strftime("%Y-%m-%d"), + "end_date": chunk_end.strftime("%Y-%m-%d"), + }) + current = chunk_end + timedelta(days=1) + + return chunks + + +def main() -> None: + """Main entry point for GitHub Actions.""" + start_date = os.environ.get("INPUT_START_DATE") + end_date = os.environ.get("INPUT_END_DATE") + chunk_days = int(os.environ.get("INPUT_CHUNK_DAYS", "7")) + + if not start_date or not end_date: + print("ERROR: INPUT_START_DATE and INPUT_END_DATE must be set", file=sys.stderr) + sys.exit(1) + + chunks = generate_chunks(start_date, end_date, chunk_days) + print(f"Generated {len(chunks)} chunks for {start_date} to {end_date}") + + # Write to GitHub Actions output + github_output = os.environ.get("GITHUB_OUTPUT") + if github_output: + with open(github_output, "a") as f: + f.write(f"chunks={json.dumps(chunks)}\n") + else: + # For local testing, just print + print(json.dumps(chunks, indent=2)) + + +if __name__ == "__main__": + main() diff --git a/src/adsb/historical_process_chunk.py b/src/adsb/historical_process_chunk.py new file mode 100644 index 0000000..f5dbe1e --- /dev/null +++ b/src/adsb/historical_process_chunk.py @@ -0,0 +1,91 @@ +#!/usr/bin/env python3 +"""Process a single date chunk for historical ADS-B data.""" + +import os +import sys +from datetime import datetime, timedelta +from pathlib import Path + +# Add parent directory to path for imports when run from repo root +sys.path.insert(0, str(Path(__file__).parent)) + + +def process_chunk(start_date: str, end_date: str, output_dir: Path) -> Path | None: + """Process a date range and output compressed CSV. + + Args: + start_date: Start date in YYYY-MM-DD format + end_date: End date in YYYY-MM-DD format + output_dir: Directory to write output CSV + + Returns: + Path to output CSV, or None if no data + """ + from compress_adsb_to_aircraft_data import ( + load_historical_for_day, + deduplicate_by_signature, + ) + import polars as pl + + start = datetime.strptime(start_date, "%Y-%m-%d") + end = datetime.strptime(end_date, "%Y-%m-%d") + + total_days = (end - start).days + 1 + print(f"Processing {total_days} days [{start_date}, {end_date}]") + + dfs: list[pl.DataFrame] = [] + current_date = start + + while current_date <= end: + day_str = current_date.strftime("%Y-%m-%d") + print(f" Loading {day_str}...") + + try: + df_compressed = load_historical_for_day(current_date) + if df_compressed.height > 0: + dfs.append(df_compressed) + total_rows = sum(df.height for df in dfs) + print(f" +{df_compressed.height} rows (total: {total_rows})") + except Exception as e: + print(f" Warning: Failed to load {day_str}: {e}") + + current_date += timedelta(days=1) + + if not dfs: + print("No data found for this chunk") + return None + + df_accumulated = pl.concat(dfs) + df_accumulated = deduplicate_by_signature(df_accumulated) + print(f"After dedup: {df_accumulated.height} rows") + + # Write output + output_dir.mkdir(parents=True, exist_ok=True) + output_path = output_dir / f"chunk_{start_date}_{end_date}.csv" + df_accumulated.write_csv(output_path) + print(f"Wrote {output_path}") + + return output_path + + +def main() -> None: + """Main entry point for GitHub Actions.""" + start_date = os.environ.get("CHUNK_START_DATE") + end_date = os.environ.get("CHUNK_END_DATE") + + if not start_date or not end_date: + print("ERROR: CHUNK_START_DATE and CHUNK_END_DATE must be set", file=sys.stderr) + sys.exit(1) + + # Output to repo root data/chunks (script runs from src/adsb) + repo_root = Path(__file__).parent.parent.parent + output_dir = repo_root / "data" / "chunks" + result = process_chunk(start_date, end_date, output_dir) + + if result is None: + print("No data produced for this chunk") + sys.exit(0) + + +if __name__ == "__main__": + main()