diff --git a/.github/workflows/historical-adsb.yaml b/.github/workflows/historical-adsb.yaml index 0bb99a1..456d500 100644 --- a/.github/workflows/historical-adsb.yaml +++ b/.github/workflows/historical-adsb.yaml @@ -188,17 +188,19 @@ jobs: - name: Debug downloaded files run: | + echo "=== Disk space before processing ===" + df -h echo "=== Listing data/output/adsb_chunks/ ===" - find data/output/adsb_chunks/ -type f 2>/dev/null | head -50 || echo "No files found" - echo "=== Looking for parquet files ===" - find . -name "*.parquet" 2>/dev/null | head -20 || echo "No parquet files found" + find data/output/adsb_chunks/ -type f 2>/dev/null | wc -l + echo "=== Total parquet size ===" + du -sh data/output/adsb_chunks/ || echo "No chunks dir" - name: Combine chunks to CSV env: START_DATE: ${{ needs.generate-matrix.outputs.global_start }} END_DATE: ${{ needs.generate-matrix.outputs.global_end }} run: | - python -m src.adsb.combine_chunks_to_csv --chunks-dir data/output/adsb_chunks --start-date "$START_DATE" --end-date "$END_DATE" --skip-base + python -m src.adsb.combine_chunks_to_csv --chunks-dir data/output/adsb_chunks --start-date "$START_DATE" --end-date "$END_DATE" --skip-base --stream ls -lah data/planequery_aircraft/ - name: Upload final artifact diff --git a/src/adsb/combine_chunks_to_csv.py b/src/adsb/combine_chunks_to_csv.py index 2fe8b4e..9b6eaab 100644 --- a/src/adsb/combine_chunks_to_csv.py +++ b/src/adsb/combine_chunks_to_csv.py @@ -36,8 +36,13 @@ def get_target_day() -> datetime: return datetime.utcnow() - timedelta(days=1) -def process_single_chunk(chunk_path: str) -> pl.DataFrame: - """Load and compress a single chunk parquet file.""" +def process_single_chunk(chunk_path: str, delete_after_load: bool = False) -> pl.DataFrame: + """Load and compress a single chunk parquet file. + + Args: + chunk_path: Path to parquet file + delete_after_load: If True, delete the parquet file after loading to free disk space + """ print(f"Processing {os.path.basename(chunk_path)}... | {get_resource_usage()}") # Load chunk - only columns we need @@ -45,6 +50,14 @@ def process_single_chunk(chunk_path: str) -> pl.DataFrame: df = pl.read_parquet(chunk_path, columns=needed_columns) print(f" Loaded {len(df)} rows") + # Delete file immediately after loading to free disk space + if delete_after_load: + try: + os.remove(chunk_path) + print(f" Deleted {chunk_path} to free disk space") + except Exception as e: + print(f" Warning: Failed to delete {chunk_path}: {e}") + # Compress to aircraft records (one per ICAO) using shared function compressed = compress_multi_icao_df(df, verbose=True) print(f" Compressed to {len(compressed)} aircraft records") @@ -156,6 +169,7 @@ def main(): parser.add_argument("--chunks-dir", type=str, default=DEFAULT_CHUNK_DIR, help="Directory containing chunk parquet files") parser.add_argument("--skip-base", action="store_true", help="Skip downloading and merging base release") parser.add_argument("--keep-chunks", action="store_true", help="Keep chunk files after merging") + parser.add_argument("--stream", action="store_true", help="Delete parquet files immediately after loading to save disk space") args = parser.parse_args() # Determine output ID and filename based on mode @@ -190,9 +204,10 @@ def main(): print(f"Found {len(chunk_files)} chunk files") # Process each chunk separately to save memory + # With --stream, delete parquet files immediately after loading to save disk space compressed_chunks = [] for chunk_path in chunk_files: - compressed = process_single_chunk(chunk_path) + compressed = process_single_chunk(chunk_path, delete_after_load=args.stream) compressed_chunks.append(compressed) gc.collect()