diff --git a/.github/workflows/openairframes-daily-release.yaml b/.github/workflows/openairframes-daily-release.yaml
index 9fbae9a..c0787bb 100644
--- a/.github/workflows/openairframes-daily-release.yaml
+++ b/.github/workflows/openairframes-daily-release.yaml
@@ -101,6 +101,51 @@ jobs:
       date: ${{ needs.resolve-dates.outputs.adsb_date }}
       concat_with_latest_csv: true
 
+  # Fallback job: re-runs the final concat/upload step on a single runner
+  # when the main adsb-to-aircraft job failed (e.g. only its last stage broke).
+  adsb-reduce:
+    needs: [resolve-dates, adsb-to-aircraft]
+    if: always() && github.event_name != 'schedule' && needs.adsb-to-aircraft.result == 'failure'
+    runs-on: ubuntu-24.04-arm
+    steps:
+      - name: Checkout
+        uses: actions/checkout@v6
+
+      - name: Setup Python
+        uses: actions/setup-python@v6
+        with:
+          python-version: '3.12'
+
+      - name: Install dependencies
+        run: |
+          python -m pip install --upgrade pip
+          pip install -r requirements.txt
+
+      - name: Download compressed outputs
+        uses: actions/download-artifact@v5
+        with:
+          pattern: adsb-compressed-${{ needs.resolve-dates.outputs.adsb_date }}-part-*
+          path: data/output/compressed/${{ needs.resolve-dates.outputs.adsb_date }}
+          merge-multiple: true
+
+      - name: Concatenate final outputs
+        env:
+          DATE: ${{ needs.resolve-dates.outputs.adsb_date }}
+          CONCAT_WITH_LATEST_CSV: true
+        run: |
+          EXTRA=""
+          if [ "$CONCAT_WITH_LATEST_CSV" = "true" ]; then
+            EXTRA="--concat_with_latest_csv"
+          fi
+          python -m src.adsb.concat_parquet_to_final --date "$DATE" $EXTRA
+          ls -lah data/output/ || true
+
+      - name: Upload final artifacts
+        uses: actions/upload-artifact@v4
+        with:
+          name: openairframes_adsb-${{ needs.resolve-dates.outputs.adsb_date }}
+          path: data/output/openairframes_adsb_*
+          retention-days: 30
+          if-no-files-found: error
+
   build-community:
     runs-on: ubuntu-latest
     if: github.event_name != 'schedule'
@@ -188,13 +233,13 @@ jobs:
 
   create-release:
     runs-on: ubuntu-latest
-    needs: [resolve-dates, build-faa, adsb-to-aircraft, build-community, build-adsbexchange-json, build-mictronics-db]
+    needs: [resolve-dates, build-faa, adsb-to-aircraft, adsb-reduce, build-community, build-adsbexchange-json, build-mictronics-db]
    if: github.event_name != 'schedule' && !cancelled()
     steps:
-      - name: Check adsb-to-aircraft status
-        if: needs.adsb-to-aircraft.result != 'success'
+      - name: Check ADS-B workflow status
+        if: needs.adsb-to-aircraft.result != 'success' && needs.adsb-reduce.result != 'success'
         run: |
-          echo "WARNING: adsb-to-aircraft result was '${{ needs.adsb-to-aircraft.result }}', will continue without ADS-B artifacts"
+          echo "WARNING: ADS-B workflow failed (adsb-to-aircraft='${{ needs.adsb-to-aircraft.result }}', adsb-reduce='${{ needs.adsb-reduce.result }}'), will continue without ADS-B artifacts"
 
       - name: Checkout for gh CLI
         uses: actions/checkout@v4
@@ -211,7 +256,7 @@ jobs:
 
       - name: Download ADS-B artifacts
         uses: actions/download-artifact@v5
-        if: needs.adsb-to-aircraft.result == 'success'
+        if: needs.adsb-to-aircraft.result == 'success' || needs.adsb-reduce.result == 'success'
         continue-on-error: true
         with:
           name: openairframes_adsb-${{ needs.resolve-dates.outputs.adsb_date }}
diff --git a/src/adsb/concat_parquet_to_final.py b/src/adsb/concat_parquet_to_final.py
index a38b796..21b86e4 100644
--- a/src/adsb/concat_parquet_to_final.py
+++ b/src/adsb/concat_parquet_to_final.py
@@ -1,7 +1,7 @@
 from pathlib import Path
 import polars as pl
 import argparse
-
+import os
 OUTPUT_DIR = Path("./data/output")
 
 CORRECT_ORDER_OF_COLUMNS = ["time", "icao", "r", "t", "dbFlags", "ownOp", "year", "desc", "aircraft_category"]
@@ -13,25 +13,25 @@ def main():
     compressed_dir = OUTPUT_DIR / "compressed"
     date_dir = compressed_dir / args.date
 
-    if not date_dir.is_dir():
-        raise FileNotFoundError(f"No date folder found: {date_dir}")
     parquet_files = sorted(date_dir.glob("*.parquet"))
-    if not parquet_files:
-        raise FileNotFoundError(f"No parquet files found in {date_dir}")
+    df = None
+    if not parquet_files:
+        print(f"No parquet files found in {date_dir}")
+    else:
 
-    frames = [pl.read_parquet(p) for p in parquet_files]
-    df = pl.concat(frames, how="vertical", rechunk=True)
+        frames = [pl.read_parquet(p) for p in parquet_files]
+        df = pl.concat(frames, how="vertical", rechunk=True)
 
-    df = df.sort(["time", "icao"])
-    df = df.select(CORRECT_ORDER_OF_COLUMNS)
-
-    output_path = OUTPUT_DIR / f"openairframes_adsb_{args.date}.parquet"
-    print(f"Writing combined parquet to {output_path} with {df.height} rows")
-    df.write_parquet(output_path)
+        df = df.sort(["time", "icao"])
+        df = df.select(CORRECT_ORDER_OF_COLUMNS)
+
+        output_path = OUTPUT_DIR / f"openairframes_adsb_{args.date}.parquet"
+        print(f"Writing combined parquet to {output_path} with {df.height} rows")
+        df.write_parquet(output_path)
 
-    csv_output_path = OUTPUT_DIR / f"openairframes_adsb_{args.date}.csv.gz"
-    print(f"Writing combined csv.gz to {csv_output_path} with {df.height} rows")
-    df.write_csv(csv_output_path, compression="gzip")
+        csv_output_path = OUTPUT_DIR / f"openairframes_adsb_{args.date}.csv.gz"
+        print(f"Writing combined csv.gz to {csv_output_path} with {df.height} rows")
+        df.write_csv(csv_output_path, compression="gzip")
 
     if args.concat_with_latest_csv:
         print("Loading latest CSV from GitHub releases to concatenate with...")
@@ -46,9 +46,10 @@ def main():
         csv_end_dt = datetime.strptime(csv_end_date, "%Y-%m-%d")
         args_dt = datetime.strptime(args.date, "%Y-%m-%d")
 
-        if csv_end_dt >= args_dt:
+        if df is None or csv_end_dt >= args_dt:
             print(f"Latest CSV already includes data through {args.date} (end_date={csv_end_date} is exclusive)")
             print("Writing latest CSV directly without concatenation to avoid duplicates")
+            os.makedirs(OUTPUT_DIR, exist_ok=True)
             final_csv_output_path = OUTPUT_DIR / f"openairframes_adsb_{csv_start_date}_{csv_end_date}.csv.gz"
             df_latest_csv = df_latest_csv.select(CORRECT_ORDER_OF_COLUMNS)
             df_latest_csv.write_csv(final_csv_output_path, compression="gzip")