diff --git a/src/adsb/combine_chunks_to_csv.py b/src/adsb/combine_chunks_to_csv.py index 113a45e..6153ede 100644 --- a/src/adsb/combine_chunks_to_csv.py +++ b/src/adsb/combine_chunks_to_csv.py @@ -84,8 +84,12 @@ def combine_compressed_chunks(compressed_dfs: list[pl.DataFrame]) -> pl.DataFram return combined -def download_and_merge_base_release(compressed_df: pl.DataFrame) -> pl.DataFrame: - """Download base release and merge with new data.""" +def download_and_merge_base_release(compressed_df: pl.DataFrame) -> tuple[pl.DataFrame, str | None]: + """Download base release and merge with new data. + + Returns: + Tuple of (merged_df, earliest_date_str) where earliest_date_str is None if no base release was merged + """ from src.get_latest_release import download_latest_aircraft_adsb_csv print("Downloading base ADS-B release...") @@ -100,6 +104,19 @@ def download_and_merge_base_release(compressed_df: pl.DataFrame) -> pl.DataFrame base_df = pl.read_csv(base_path) print(f"Base release has {len(base_df)} records") + # Extract earliest date from base release + earliest_date = None + if 'time' in base_df.columns and len(base_df) > 0: + try: + earliest_timestamp = base_df['time'].min() + if earliest_timestamp: + # Parse timestamp and extract date + earliest_dt = datetime.fromisoformat(str(earliest_timestamp).replace('Z', '+00:00')) + earliest_date = earliest_dt.strftime('%Y-%m-%d') + print(f"Earliest date in base release: {earliest_date}") + except Exception as e: + print(f"Could not extract earliest date from base release: {e}") + # Ensure columns match base_cols = set(base_df.columns) new_cols = set(compressed_df.columns) @@ -126,15 +143,15 @@ def download_and_merge_base_release(compressed_df: pl.DataFrame) -> pl.DataFrame del base_df, combined gc.collect() - return deduplicated + return deduplicated, earliest_date else: print(f"No base release found at {base_path}, using only new data") - return compressed_df + return compressed_df, None except Exception as e: import traceback print(f"Failed to download base release: {e}") traceback.print_exc() - return compressed_df + return compressed_df, None def cleanup_chunks(output_id: str, chunks_dir: str): @@ -221,8 +238,15 @@ def main(): print(f"After combining: {get_resource_usage()}") # Merge with base release (unless skipped) + base_start_date = None if not args.skip_base: - combined = download_and_merge_base_release(combined) + combined, base_start_date = download_and_merge_base_release(combined) + + # Update filename if we merged with base release and got a start date + if base_start_date and not (args.start_date and args.end_date): + # Only update filename for daily mode when base was merged + output_filename = f"openairframes_adsb_{base_start_date}_{date_str}.csv.gz" + print(f"Updated filename to reflect date range: {output_filename}") # Convert list columns to strings for CSV compatibility for col in combined.columns: