delete unused code

This commit is contained in:
ggman12
2026-02-17 12:48:01 -05:00
parent 24c0fc970c
commit 11ed7e597d
2 changed files with 8 additions and 347 deletions
+8 -263
View File
@@ -1,31 +1,21 @@
"""
Downloads adsb.lol data and writes to Parquet files.
Usage:
python -m src.process_historical_adsb_data.download_to_parquet 2025-01-01 2025-01-02
This will download trace data for the specified date range and output Parquet files.
This file is self-contained and does not import from other project modules.
This file contains utility functions for downloading and processing adsb.lol trace data.
Used by the historical ADS-B processing pipeline.
"""
import gc
import glob
import datetime as dt
import gzip
import os
import re
import resource
import shutil
import sys
import logging
import time
import re
import signal
import concurrent.futures
import subprocess
import os
import argparse
import datetime as dt
from datetime import datetime, timedelta, timezone
import urllib.request
import sys
import urllib.error
import urllib.request
from datetime import datetime
import orjson
import pyarrow as pa
@@ -404,8 +394,6 @@ COLUMNS = [
OS_CPU_COUNT = os.cpu_count() or 1
MAX_WORKERS = OS_CPU_COUNT if OS_CPU_COUNT > 4 else 1
CHUNK_SIZE = MAX_WORKERS * 500 # Reduced for lower RAM usage
BATCH_SIZE = 250_000 # Fixed size for predictable memory usage (~500MB per batch)
# PyArrow schema for efficient Parquet writing
PARQUET_SCHEMA = pa.schema([
@@ -493,217 +481,6 @@ def collect_trace_files_with_find(root_dir):
return trace_dict
def generate_version_dates(start_date: str, end_date: str) -> list:
"""Generate a list of dates from start_date to end_date inclusive."""
start = datetime.strptime(start_date, "%Y-%m-%d")
end = datetime.strptime(end_date, "%Y-%m-%d")
delta = end - start
return [start + timedelta(days=i) for i in range(delta.days + 1)]
def safe_process(fp):
"""Safely process a file, returning empty list on error."""
try:
return process_file(fp)
except Exception as e:
logging.error(f"Error processing {fp}: {e}")
return []
def rows_to_arrow_table(rows: list) -> pa.Table:
"""Convert list of rows to a PyArrow Table directly (no pandas)."""
# Transpose rows into columns
columns = list(zip(*rows))
# Build arrays for each column according to schema
arrays = []
for i, field in enumerate(PARQUET_SCHEMA):
col_data = list(columns[i]) if i < len(columns) else [None] * len(rows)
arrays.append(pa.array(col_data, type=field.type))
return pa.Table.from_arrays(arrays, schema=PARQUET_SCHEMA)
def write_batch_to_parquet(rows: list, version_date: str, batch_idx: int):
"""Write a batch of rows to a Parquet file."""
if not rows:
return
table = rows_to_arrow_table(rows)
parquet_path = os.path.join(PARQUET_DIR, f"{version_date}_batch_{batch_idx:04d}.parquet")
pq.write_table(table, parquet_path, compression='snappy')
print(f"Written parquet batch {batch_idx} ({len(rows)} rows) | {get_resource_usage()}")
def merge_parquet_files(version_date: str, delete_batches: bool = True):
"""Merge all batch parquet files for a version_date into a single file using streaming."""
pattern = os.path.join(PARQUET_DIR, f"{version_date}_batch_*.parquet")
batch_files = sorted(glob.glob(pattern))
if not batch_files:
print(f"No batch files found for {version_date}")
return None
print(f"Merging {len(batch_files)} batch files for {version_date} (streaming)...")
merged_path = os.path.join(PARQUET_DIR, f"{version_date}.parquet")
total_rows = 0
# Stream write: read one batch at a time to minimize RAM usage
writer = None
try:
for i, f in enumerate(batch_files):
table = pq.read_table(f)
total_rows += table.num_rows
if writer is None:
writer = pq.ParquetWriter(merged_path, table.schema, compression='snappy')
writer.write_table(table)
# Delete batch file immediately after reading to free disk space
if delete_batches:
os.remove(f)
# Free memory
del table
if (i + 1) % 10 == 0:
gc.collect()
print(f" Merged {i + 1}/{len(batch_files)} batches... | {get_resource_usage()}")
finally:
if writer is not None:
writer.close()
print(f"Merged parquet file written to {merged_path} ({total_rows} total rows) | {get_resource_usage()}")
if delete_batches:
print(f"Deleted {len(batch_files)} batch files during merge")
gc.collect()
return merged_path
def process_version_date(version_date: str, keep_folders: bool = False):
"""Download, extract, and process trace files for a single version date."""
print(f"\nProcessing version_date: {version_date}")
extract_dir = os.path.join(OUTPUT_DIR, f"{version_date}-planes-readsb-prod-0.tar_0")
def collect_trace_files_for_version_date(vd):
releases = fetch_releases(vd)
if len(releases) == 0:
print(f"No releases found for {vd}.")
return None
# Prefer non-tmp releases; only use tmp if no normal releases exist
normal_releases = [r for r in releases if "tmp" not in r["tag_name"]]
tmp_releases = [r for r in releases if "tmp" in r["tag_name"]]
releases = normal_releases if normal_releases else tmp_releases
print(f"Using {'normal' if normal_releases else 'tmp'} releases ({len(releases)} found)")
downloaded_files = []
for release in releases:
tag_name = release["tag_name"]
print(f"Processing release: {tag_name}")
# Only download prod-0 if available, else prod-0tmp
assets = release.get("assets", [])
normal_assets = [
a for a in assets
if "planes-readsb-prod-0." in a["name"] and "tmp" not in a["name"]
]
tmp_assets = [
a for a in assets
if "planes-readsb-prod-0tmp" in a["name"]
]
use_assets = normal_assets if normal_assets else tmp_assets
for asset in use_assets:
asset_name = asset["name"]
asset_url = asset["browser_download_url"]
file_path = os.path.join(OUTPUT_DIR, asset_name)
result = download_asset(asset_url, file_path)
if result:
downloaded_files.append(file_path)
extract_split_archive(downloaded_files, extract_dir)
return collect_trace_files_with_find(extract_dir)
# Check if files already exist
pattern = os.path.join(OUTPUT_DIR, f"{version_date}-planes-readsb-prod-0*")
matches = [p for p in glob.glob(pattern) if os.path.isfile(p)]
if matches:
print(f"Found existing files for {version_date}:")
# Prefer non-tmp slices when reusing existing files
normal_matches = [
p for p in matches
if "-planes-readsb-prod-0." in os.path.basename(p)
and "tmp" not in os.path.basename(p)
]
downloaded_files = normal_matches if normal_matches else matches
extract_split_archive(downloaded_files, extract_dir)
trace_files = collect_trace_files_with_find(extract_dir)
else:
trace_files = collect_trace_files_for_version_date(version_date)
if trace_files is None or len(trace_files) == 0:
print(f"No trace files found for version_date: {version_date}")
return 0
file_list = list(trace_files.values())
start_time = time.perf_counter()
total_num_rows = 0
batch_rows = []
batch_idx = 0
# Process files in chunks
for offset in range(0, len(file_list), CHUNK_SIZE):
chunk = file_list[offset:offset + CHUNK_SIZE]
with concurrent.futures.ProcessPoolExecutor(max_workers=MAX_WORKERS) as process_executor:
for rows in process_executor.map(safe_process, chunk):
if not rows:
continue
batch_rows.extend(rows)
if len(batch_rows) >= BATCH_SIZE:
total_num_rows += len(batch_rows)
write_batch_to_parquet(batch_rows, version_date, batch_idx)
batch_idx += 1
batch_rows = []
elapsed = time.perf_counter() - start_time
speed = total_num_rows / elapsed if elapsed > 0 else 0
print(f"[{version_date}] processed {total_num_rows} rows in {elapsed:.2f}s ({speed:.2f} rows/s)")
gc.collect()
# Final batch
if batch_rows:
total_num_rows += len(batch_rows)
write_batch_to_parquet(batch_rows, version_date, batch_idx)
elapsed = time.perf_counter() - start_time
speed = total_num_rows / elapsed if elapsed > 0 else 0
print(f"[{version_date}] processed {total_num_rows} rows in {elapsed:.2f}s ({speed:.2f} rows/s)")
print(f"Total rows processed for version_date {version_date}: {total_num_rows}")
# Clean up extracted directory immediately after processing (before merging parquet files)
if not keep_folders and os.path.isdir(extract_dir):
print(f"Deleting extraction directory with 100,000+ files: {extract_dir}")
shutil.rmtree(extract_dir)
print(f"Successfully deleted extraction directory: {extract_dir} | {get_resource_usage()}")
# Merge batch files into a single parquet file
merge_parquet_files(version_date, delete_batches=True)
return total_num_rows
def create_parquet_for_day(day, keep_folders: bool = False):
"""Create parquet file for a single day.
@@ -734,35 +511,3 @@ def create_parquet_for_day(day, keep_folders: bool = False):
return parquet_path
else:
return None
def main(start_date: str, end_date: str, keep_folders: bool = False):
"""Main function to download and convert adsb.lol data to Parquet."""
version_dates = [f"v{date.strftime('%Y.%m.%d')}" for date in generate_version_dates(start_date, end_date)]
print(f"Processing dates: {version_dates}")
total_rows_all = 0
for version_date in version_dates:
rows_processed = process_version_date(version_date, keep_folders)
total_rows_all += rows_processed
print(f"\n=== Summary ===")
print(f"Total dates processed: {len(version_dates)}")
print(f"Total rows written to Parquet: {total_rows_all}")
print(f"Parquet files location: {PARQUET_DIR}")
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO, stream=sys.stdout, force=True)
parser = argparse.ArgumentParser(
description="Download adsb.lol data and write to Parquet files"
)
parser.add_argument("start_date", type=str, help="Start date in YYYY-MM-DD format")
parser.add_argument("end_date", type=str, help="End date in YYYY-MM-DD format")
parser.add_argument("--keep-folders", action="store_true",
help="Keep extracted folders after processing")
args = parser.parse_args()
main(args.start_date, args.end_date, args.keep_folders)
-84
View File
@@ -1,84 +0,0 @@
from pathlib import Path
from datetime import datetime, timezone, timedelta
import sys
import polars as pl
# Add adsb directory to path
sys.path.insert(0, str(Path(__file__).parent / "adsb")) # TODO: Fix this hacky path manipulation
from adsb.compress_adsb_to_aircraft_data import (
load_historical_for_day,
concat_compressed_dfs,
get_latest_aircraft_adsb_csv_df,
)
if __name__ == '__main__':
# Get yesterday's date (data for the previous day)
day = datetime.now(timezone.utc) - timedelta(days=1)
# Find a day with complete data
max_attempts = 2 # Don't look back more than a week
for attempt in range(max_attempts):
date_str = day.strftime("%Y-%m-%d")
print(f"Processing ADS-B data for {date_str}")
print("Loading new ADS-B data...")
df_new = load_historical_for_day(day)
if df_new.height == 0:
day = day - timedelta(days=1)
continue
max_time = df_new['time'].max()
if max_time is not None:
# Handle timezone
max_time_dt = max_time
if hasattr(max_time_dt, 'replace'):
max_time_dt = max_time_dt.replace(tzinfo=timezone.utc)
end_of_day = day.replace(hour=23, minute=59, second=59, tzinfo=timezone.utc) - timedelta(minutes=5)
# Convert polars datetime to python datetime if needed
if isinstance(max_time_dt, datetime):
if max_time_dt.replace(tzinfo=timezone.utc) >= end_of_day:
break
else:
# Polars returns python datetime already
if max_time >= day.replace(hour=23, minute=54, second=59):
break
print(f"WARNING: Latest data time is {max_time}, which is more than 5 minutes before end of day.")
day = day - timedelta(days=1)
else:
raise RuntimeError(f"Could not find complete data in the last {max_attempts} days")
try:
# Get the latest release data
print("Downloading latest ADS-B release...")
df_base, start_date_str = get_latest_aircraft_adsb_csv_df()
# Combine with historical data
print("Combining with historical data...")
df_combined = concat_compressed_dfs(df_base, df_new)
except Exception as e:
print(f"Error downloading latest ADS-B release: {e}")
df_combined = df_new
start_date_str = date_str
# Sort by time for consistent ordering
df_combined = df_combined.sort('time')
# Convert any list columns to strings for CSV compatibility
for col in df_combined.columns:
if df_combined[col].dtype == pl.List:
df_combined = df_combined.with_columns(
pl.col(col).list.join(",").alias(col)
)
# Save the result
OUT_ROOT = Path("data/openairframes")
OUT_ROOT.mkdir(parents=True, exist_ok=True)
output_file = OUT_ROOT / f"openairframes_adsb_{start_date_str}_{date_str}.csv.gz"
df_combined.write_csv(output_file)
print(f"Saved: {output_file}")
print(f"Total aircraft: {df_combined.height}")