delete parquet chunck after load to not use so much space for big historical run

This commit is contained in:
ggman12
2026-02-12 10:52:42 -05:00
parent 1348e1f3a0
commit 99b680476a
2 changed files with 24 additions and 7 deletions
+6 -4
View File
@@ -188,17 +188,19 @@ jobs:
- name: Debug downloaded files
run: |
echo "=== Disk space before processing ==="
df -h
echo "=== Listing data/output/adsb_chunks/ ==="
find data/output/adsb_chunks/ -type f 2>/dev/null | head -50 || echo "No files found"
echo "=== Looking for parquet files ==="
find . -name "*.parquet" 2>/dev/null | head -20 || echo "No parquet files found"
find data/output/adsb_chunks/ -type f 2>/dev/null | wc -l
echo "=== Total parquet size ==="
du -sh data/output/adsb_chunks/ || echo "No chunks dir"
- name: Combine chunks to CSV
env:
START_DATE: ${{ needs.generate-matrix.outputs.global_start }}
END_DATE: ${{ needs.generate-matrix.outputs.global_end }}
run: |
python -m src.adsb.combine_chunks_to_csv --chunks-dir data/output/adsb_chunks --start-date "$START_DATE" --end-date "$END_DATE" --skip-base
python -m src.adsb.combine_chunks_to_csv --chunks-dir data/output/adsb_chunks --start-date "$START_DATE" --end-date "$END_DATE" --skip-base --stream
ls -lah data/planequery_aircraft/
- name: Upload final artifact
+18 -3
View File
@@ -36,8 +36,13 @@ def get_target_day() -> datetime:
return datetime.utcnow() - timedelta(days=1)
def process_single_chunk(chunk_path: str) -> pl.DataFrame:
"""Load and compress a single chunk parquet file."""
def process_single_chunk(chunk_path: str, delete_after_load: bool = False) -> pl.DataFrame:
"""Load and compress a single chunk parquet file.
Args:
chunk_path: Path to parquet file
delete_after_load: If True, delete the parquet file after loading to free disk space
"""
print(f"Processing {os.path.basename(chunk_path)}... | {get_resource_usage()}")
# Load chunk - only columns we need
@@ -45,6 +50,14 @@ def process_single_chunk(chunk_path: str) -> pl.DataFrame:
df = pl.read_parquet(chunk_path, columns=needed_columns)
print(f" Loaded {len(df)} rows")
# Delete file immediately after loading to free disk space
if delete_after_load:
try:
os.remove(chunk_path)
print(f" Deleted {chunk_path} to free disk space")
except Exception as e:
print(f" Warning: Failed to delete {chunk_path}: {e}")
# Compress to aircraft records (one per ICAO) using shared function
compressed = compress_multi_icao_df(df, verbose=True)
print(f" Compressed to {len(compressed)} aircraft records")
@@ -156,6 +169,7 @@ def main():
parser.add_argument("--chunks-dir", type=str, default=DEFAULT_CHUNK_DIR, help="Directory containing chunk parquet files")
parser.add_argument("--skip-base", action="store_true", help="Skip downloading and merging base release")
parser.add_argument("--keep-chunks", action="store_true", help="Keep chunk files after merging")
parser.add_argument("--stream", action="store_true", help="Delete parquet files immediately after loading to save disk space")
args = parser.parse_args()
# Determine output ID and filename based on mode
@@ -190,9 +204,10 @@ def main():
print(f"Found {len(chunk_files)} chunk files")
# Process each chunk separately to save memory
# With --stream, delete parquet files immediately after loading to save disk space
compressed_chunks = []
for chunk_path in chunk_files:
compressed = process_single_chunk(chunk_path)
compressed = process_single_chunk(chunk_path, delete_after_load=args.stream)
compressed_chunks.append(compressed)
gc.collect()