From 11ed7e597daf19b3a5ae9505b24429371599f502 Mon Sep 17 00:00:00 2001
From: ggman12
Date: Tue, 17 Feb 2026 12:48:01 -0500
Subject: [PATCH] delete unused code

---
 src/adsb/download_adsb_data_to_parquet.py | 271 +---------------------
 src/create_daily_adsb_release.py          |  84 -------
 2 files changed, 8 insertions(+), 347 deletions(-)
 delete mode 100644 src/create_daily_adsb_release.py

diff --git a/src/adsb/download_adsb_data_to_parquet.py b/src/adsb/download_adsb_data_to_parquet.py
index 59c4e8d..f082b68 100644
--- a/src/adsb/download_adsb_data_to_parquet.py
+++ b/src/adsb/download_adsb_data_to_parquet.py
@@ -1,31 +1,21 @@
 """
 Downloads adsb.lol data and writes to Parquet files.
-Usage:
-    python -m src.process_historical_adsb_data.download_to_parquet 2025-01-01 2025-01-02
-
-This will download trace data for the specified date range and output Parquet files.
-
-This file is self-contained and does not import from other project modules.
+This file contains utility functions for downloading and processing adsb.lol trace data.
+Used by the historical ADS-B processing pipeline.
 """
 
-import gc
-import glob
+import datetime as dt
 import gzip
+import os
+import re
 import resource
 import shutil
-import sys
-import logging
-import time
-import re
 import signal
-import concurrent.futures
 import subprocess
-import os
-import argparse
-import datetime as dt
-from datetime import datetime, timedelta, timezone
-import urllib.request
+import sys
 import urllib.error
+import urllib.request
+from datetime import datetime
 
 import orjson
 import pyarrow as pa
@@ -404,8 +394,6 @@ COLUMNS = [
 
 OS_CPU_COUNT = os.cpu_count() or 1
 MAX_WORKERS = OS_CPU_COUNT if OS_CPU_COUNT > 4 else 1
-CHUNK_SIZE = MAX_WORKERS * 500  # Reduced for lower RAM usage
-BATCH_SIZE = 250_000  # Fixed size for predictable memory usage (~500MB per batch)
 
 # PyArrow schema for efficient Parquet writing
 PARQUET_SCHEMA = pa.schema([
@@ -493,217 +481,6 @@ def collect_trace_files_with_find(root_dir):
     return trace_dict
 
-
-def generate_version_dates(start_date: str, end_date: str) -> list:
-    """Generate a list of dates from start_date to end_date inclusive."""
-    start = datetime.strptime(start_date, "%Y-%m-%d")
-    end = datetime.strptime(end_date, "%Y-%m-%d")
-    delta = end - start
-    return [start + timedelta(days=i) for i in range(delta.days + 1)]
-
-
-def safe_process(fp):
-    """Safely process a file, returning empty list on error."""
-    try:
-        return process_file(fp)
-    except Exception as e:
-        logging.error(f"Error processing {fp}: {e}")
-        return []
-
-
-def rows_to_arrow_table(rows: list) -> pa.Table:
-    """Convert list of rows to a PyArrow Table directly (no pandas)."""
-    # Transpose rows into columns
-    columns = list(zip(*rows))
-
-    # Build arrays for each column according to schema
-    arrays = []
-    for i, field in enumerate(PARQUET_SCHEMA):
-        col_data = list(columns[i]) if i < len(columns) else [None] * len(rows)
-        arrays.append(pa.array(col_data, type=field.type))
-
-    return pa.Table.from_arrays(arrays, schema=PARQUET_SCHEMA)
-
-
-def write_batch_to_parquet(rows: list, version_date: str, batch_idx: int):
-    """Write a batch of rows to a Parquet file."""
-    if not rows:
-        return
-
-    table = rows_to_arrow_table(rows)
-
-    parquet_path = os.path.join(PARQUET_DIR, f"{version_date}_batch_{batch_idx:04d}.parquet")
-
-    pq.write_table(table, parquet_path, compression='snappy')
-
-    print(f"Written parquet batch {batch_idx} ({len(rows)} rows) | {get_resource_usage()}")
-
-
-def merge_parquet_files(version_date: str, delete_batches: bool = True):
-    """Merge all batch parquet files for a version_date into a single file using streaming."""
-    pattern = os.path.join(PARQUET_DIR, f"{version_date}_batch_*.parquet")
-    batch_files = sorted(glob.glob(pattern))
-
-    if not batch_files:
-        print(f"No batch files found for {version_date}")
-        return None
-
-    print(f"Merging {len(batch_files)} batch files for {version_date} (streaming)...")
-
-    merged_path = os.path.join(PARQUET_DIR, f"{version_date}.parquet")
-    total_rows = 0
-
-    # Stream write: read one batch at a time to minimize RAM usage
-    writer = None
-    try:
-        for i, f in enumerate(batch_files):
-            table = pq.read_table(f)
-            total_rows += table.num_rows
-
-            if writer is None:
-                writer = pq.ParquetWriter(merged_path, table.schema, compression='snappy')
-
-            writer.write_table(table)
-
-            # Delete batch file immediately after reading to free disk space
-            if delete_batches:
-                os.remove(f)
-
-            # Free memory
-            del table
-            if (i + 1) % 10 == 0:
-                gc.collect()
-                print(f"  Merged {i + 1}/{len(batch_files)} batches... | {get_resource_usage()}")
-    finally:
-        if writer is not None:
-            writer.close()
-
-    print(f"Merged parquet file written to {merged_path} ({total_rows} total rows) | {get_resource_usage()}")
-
-    if delete_batches:
-        print(f"Deleted {len(batch_files)} batch files during merge")
-
-    gc.collect()
-    return merged_path
-
-
-def process_version_date(version_date: str, keep_folders: bool = False):
-    """Download, extract, and process trace files for a single version date."""
-    print(f"\nProcessing version_date: {version_date}")
-    extract_dir = os.path.join(OUTPUT_DIR, f"{version_date}-planes-readsb-prod-0.tar_0")
-
-    def collect_trace_files_for_version_date(vd):
-        releases = fetch_releases(vd)
-        if len(releases) == 0:
-            print(f"No releases found for {vd}.")
-            return None
-
-        # Prefer non-tmp releases; only use tmp if no normal releases exist
-        normal_releases = [r for r in releases if "tmp" not in r["tag_name"]]
-        tmp_releases = [r for r in releases if "tmp" in r["tag_name"]]
-        releases = normal_releases if normal_releases else tmp_releases
-        print(f"Using {'normal' if normal_releases else 'tmp'} releases ({len(releases)} found)")
-
-        downloaded_files = []
-        for release in releases:
-            tag_name = release["tag_name"]
-            print(f"Processing release: {tag_name}")
-
-            # Only download prod-0 if available, else prod-0tmp
-            assets = release.get("assets", [])
-            normal_assets = [
-                a for a in assets
-                if "planes-readsb-prod-0." in a["name"] and "tmp" not in a["name"]
-            ]
-            tmp_assets = [
-                a for a in assets
-                if "planes-readsb-prod-0tmp" in a["name"]
-            ]
-            use_assets = normal_assets if normal_assets else tmp_assets
-
-            for asset in use_assets:
-                asset_name = asset["name"]
-                asset_url = asset["browser_download_url"]
-                file_path = os.path.join(OUTPUT_DIR, asset_name)
-                result = download_asset(asset_url, file_path)
-                if result:
-                    downloaded_files.append(file_path)
-
-        extract_split_archive(downloaded_files, extract_dir)
-        return collect_trace_files_with_find(extract_dir)
-
-    # Check if files already exist
-    pattern = os.path.join(OUTPUT_DIR, f"{version_date}-planes-readsb-prod-0*")
-    matches = [p for p in glob.glob(pattern) if os.path.isfile(p)]
-
-    if matches:
-        print(f"Found existing files for {version_date}:")
-        # Prefer non-tmp slices when reusing existing files
-        normal_matches = [
-            p for p in matches
-            if "-planes-readsb-prod-0." in os.path.basename(p)
-            and "tmp" not in os.path.basename(p)
-        ]
-        downloaded_files = normal_matches if normal_matches else matches
-
-        extract_split_archive(downloaded_files, extract_dir)
-        trace_files = collect_trace_files_with_find(extract_dir)
-    else:
-        trace_files = collect_trace_files_for_version_date(version_date)
-
-    if trace_files is None or len(trace_files) == 0:
-        print(f"No trace files found for version_date: {version_date}")
-        return 0
-
-    file_list = list(trace_files.values())
-
-    start_time = time.perf_counter()
-    total_num_rows = 0
-    batch_rows = []
-    batch_idx = 0
-
-    # Process files in chunks
-    for offset in range(0, len(file_list), CHUNK_SIZE):
-        chunk = file_list[offset:offset + CHUNK_SIZE]
-        with concurrent.futures.ProcessPoolExecutor(max_workers=MAX_WORKERS) as process_executor:
-            for rows in process_executor.map(safe_process, chunk):
-                if not rows:
-                    continue
-                batch_rows.extend(rows)
-
-                if len(batch_rows) >= BATCH_SIZE:
-                    total_num_rows += len(batch_rows)
-                    write_batch_to_parquet(batch_rows, version_date, batch_idx)
-                    batch_idx += 1
-                    batch_rows = []
-
-                    elapsed = time.perf_counter() - start_time
-                    speed = total_num_rows / elapsed if elapsed > 0 else 0
-                    print(f"[{version_date}] processed {total_num_rows} rows in {elapsed:.2f}s ({speed:.2f} rows/s)")
-
-                    gc.collect()
-
-    # Final batch
-    if batch_rows:
-        total_num_rows += len(batch_rows)
-        write_batch_to_parquet(batch_rows, version_date, batch_idx)
-        elapsed = time.perf_counter() - start_time
-        speed = total_num_rows / elapsed if elapsed > 0 else 0
-        print(f"[{version_date}] processed {total_num_rows} rows in {elapsed:.2f}s ({speed:.2f} rows/s)")
-
-    print(f"Total rows processed for version_date {version_date}: {total_num_rows}")
-
-    # Clean up extracted directory immediately after processing (before merging parquet files)
-    if not keep_folders and os.path.isdir(extract_dir):
-        print(f"Deleting extraction directory with 100,000+ files: {extract_dir}")
-        shutil.rmtree(extract_dir)
-        print(f"Successfully deleted extraction directory: {extract_dir} | {get_resource_usage()}")
-
-    # Merge batch files into a single parquet file
-    merge_parquet_files(version_date, delete_batches=True)
-
-    return total_num_rows
-
-
 def create_parquet_for_day(day, keep_folders: bool = False):
     """Create parquet file for a single day.
 
@@ -734,35 +511,3 @@ def create_parquet_for_day(day, keep_folders: bool = False):
         return parquet_path
     else:
         return None
-
-
-def main(start_date: str, end_date: str, keep_folders: bool = False):
-    """Main function to download and convert adsb.lol data to Parquet."""
-    version_dates = [f"v{date.strftime('%Y.%m.%d')}" for date in generate_version_dates(start_date, end_date)]
-    print(f"Processing dates: {version_dates}")
-
-    total_rows_all = 0
-    for version_date in version_dates:
-        rows_processed = process_version_date(version_date, keep_folders)
-        total_rows_all += rows_processed
-
-    print(f"\n=== Summary ===")
-    print(f"Total dates processed: {len(version_dates)}")
-    print(f"Total rows written to Parquet: {total_rows_all}")
-    print(f"Parquet files location: {PARQUET_DIR}")
-
-
-if __name__ == "__main__":
-    logging.basicConfig(level=logging.INFO, stream=sys.stdout, force=True)
-
-    parser = argparse.ArgumentParser(
-        description="Download adsb.lol data and write to Parquet files"
-    )
-    parser.add_argument("start_date", type=str, help="Start date in YYYY-MM-DD format")
-    parser.add_argument("end_date", type=str, help="End date in YYYY-MM-DD format")
-    parser.add_argument("--keep-folders", action="store_true",
-                        help="Keep extracted folders after processing")
-
-    args = parser.parse_args()
-
-    main(args.start_date, args.end_date, args.keep_folders)
diff --git a/src/create_daily_adsb_release.py b/src/create_daily_adsb_release.py
deleted file mode 100644
index 887ea97..0000000
--- a/src/create_daily_adsb_release.py
+++ /dev/null
@@ -1,84 +0,0 @@
-from pathlib import Path
-from datetime import datetime, timezone, timedelta
-import sys
-
-import polars as pl
-
-# Add adsb directory to path
-sys.path.insert(0, str(Path(__file__).parent / "adsb")) # TODO: Fix this hacky path manipulation
-
-from adsb.compress_adsb_to_aircraft_data import (
-    load_historical_for_day,
-    concat_compressed_dfs,
-    get_latest_aircraft_adsb_csv_df,
-)
-
-if __name__ == '__main__':
-    # Get yesterday's date (data for the previous day)
-    day = datetime.now(timezone.utc) - timedelta(days=1)
-
-    # Find a day with complete data
-    max_attempts = 2 # Don't look back more than a week
-    for attempt in range(max_attempts):
-        date_str = day.strftime("%Y-%m-%d")
-        print(f"Processing ADS-B data for {date_str}")
-
-        print("Loading new ADS-B data...")
-        df_new = load_historical_for_day(day)
-        if df_new.height == 0:
-            day = day - timedelta(days=1)
-            continue
-        max_time = df_new['time'].max()
-        if max_time is not None:
-            # Handle timezone
-            max_time_dt = max_time
-            if hasattr(max_time_dt, 'replace'):
-                max_time_dt = max_time_dt.replace(tzinfo=timezone.utc)
-
-            end_of_day = day.replace(hour=23, minute=59, second=59, tzinfo=timezone.utc) - timedelta(minutes=5)
-
-            # Convert polars datetime to python datetime if needed
-            if isinstance(max_time_dt, datetime):
-                if max_time_dt.replace(tzinfo=timezone.utc) >= end_of_day:
-                    break
-            else:
-                # Polars returns python datetime already
-                if max_time >= day.replace(hour=23, minute=54, second=59):
-                    break
-
-        print(f"WARNING: Latest data time is {max_time}, which is more than 5 minutes before end of day.")
-        day = day - timedelta(days=1)
-    else:
-        raise RuntimeError(f"Could not find complete data in the last {max_attempts} days")
-
-    try:
-        # Get the latest release data
-        print("Downloading latest ADS-B release...")
-        df_base, start_date_str = get_latest_aircraft_adsb_csv_df()
-        # Combine with historical data
-        print("Combining with historical data...")
-        df_combined = concat_compressed_dfs(df_base, df_new)
-    except Exception as e:
-        print(f"Error downloading latest ADS-B release: {e}")
-        df_combined = df_new
-        start_date_str = date_str
-
-    # Sort by time for consistent ordering
-    df_combined = df_combined.sort('time')
-
-    # Convert any list columns to strings for CSV compatibility
-    for col in df_combined.columns:
-        if df_combined[col].dtype == pl.List:
-            df_combined = df_combined.with_columns(
-                pl.col(col).list.join(",").alias(col)
-            )
-
-    # Save the result
-    OUT_ROOT = Path("data/openairframes")
-    OUT_ROOT.mkdir(parents=True, exist_ok=True)
-
-    output_file = OUT_ROOT / f"openairframes_adsb_{start_date_str}_{date_str}.csv.gz"
-    df_combined.write_csv(output_file)
-
-    print(f"Saved: {output_file}")
-    print(f"Total aircraft: {df_combined.height}")
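
Usage note (not part of the patch): with main() and the argparse entry point removed, download_adsb_data_to_parquet.py no longer runs standalone; per the new module docstring it is driven by the historical ADS-B processing pipeline through the surviving create_parquet_for_day() helper, which returns the written Parquet path or None. The minimal caller sketch below is illustrative only: the import path mirrors the file location and the assumption that the day argument accepts a datetime.date is not confirmed by this diff.

    # Sketch of a pipeline-side caller, replacing the deleted CLI loop over a date range.
    import datetime as dt

    from src.adsb.download_adsb_data_to_parquet import create_parquet_for_day  # assumed import path

    start, end = dt.date(2025, 1, 1), dt.date(2025, 1, 2)  # hypothetical date range
    day = start
    while day <= end:
        # create_parquet_for_day returns the merged Parquet path, or None when no data was produced
        path = create_parquet_for_day(day, keep_folders=False)
        print(f"{day}: {path or 'no data'}")
        day += dt.timedelta(days=1)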