mirror of
https://github.com/PlaneQuery/OpenAirframes.git
synced 2026-04-24 12:06:31 +02:00
update naming
This commit is contained in:
@@ -84,8 +84,12 @@ def combine_compressed_chunks(compressed_dfs: list[pl.DataFrame]) -> pl.DataFram
|
||||
return combined
|
||||
|
||||
|
||||
def download_and_merge_base_release(compressed_df: pl.DataFrame) -> pl.DataFrame:
|
||||
"""Download base release and merge with new data."""
|
||||
def download_and_merge_base_release(compressed_df: pl.DataFrame) -> tuple[pl.DataFrame, str | None]:
|
||||
"""Download base release and merge with new data.
|
||||
|
||||
Returns:
|
||||
Tuple of (merged_df, earliest_date_str) where earliest_date_str is None if no base release was merged
|
||||
"""
|
||||
from src.get_latest_release import download_latest_aircraft_adsb_csv
|
||||
|
||||
print("Downloading base ADS-B release...")
|
||||
@@ -100,6 +104,19 @@ def download_and_merge_base_release(compressed_df: pl.DataFrame) -> pl.DataFrame
|
||||
base_df = pl.read_csv(base_path)
|
||||
print(f"Base release has {len(base_df)} records")
|
||||
|
||||
# Extract earliest date from base release
|
||||
earliest_date = None
|
||||
if 'time' in base_df.columns and len(base_df) > 0:
|
||||
try:
|
||||
earliest_timestamp = base_df['time'].min()
|
||||
if earliest_timestamp:
|
||||
# Parse timestamp and extract date
|
||||
earliest_dt = datetime.fromisoformat(str(earliest_timestamp).replace('Z', '+00:00'))
|
||||
earliest_date = earliest_dt.strftime('%Y-%m-%d')
|
||||
print(f"Earliest date in base release: {earliest_date}")
|
||||
except Exception as e:
|
||||
print(f"Could not extract earliest date from base release: {e}")
|
||||
|
||||
# Ensure columns match
|
||||
base_cols = set(base_df.columns)
|
||||
new_cols = set(compressed_df.columns)
|
||||
@@ -126,15 +143,15 @@ def download_and_merge_base_release(compressed_df: pl.DataFrame) -> pl.DataFrame
|
||||
del base_df, combined
|
||||
gc.collect()
|
||||
|
||||
return deduplicated
|
||||
return deduplicated, earliest_date
|
||||
else:
|
||||
print(f"No base release found at {base_path}, using only new data")
|
||||
return compressed_df
|
||||
return compressed_df, None
|
||||
except Exception as e:
|
||||
import traceback
|
||||
print(f"Failed to download base release: {e}")
|
||||
traceback.print_exc()
|
||||
return compressed_df
|
||||
return compressed_df, None
|
||||
|
||||
|
||||
def cleanup_chunks(output_id: str, chunks_dir: str):
|
||||
@@ -221,8 +238,15 @@ def main():
|
||||
print(f"After combining: {get_resource_usage()}")
|
||||
|
||||
# Merge with base release (unless skipped)
|
||||
base_start_date = None
|
||||
if not args.skip_base:
|
||||
combined = download_and_merge_base_release(combined)
|
||||
combined, base_start_date = download_and_merge_base_release(combined)
|
||||
|
||||
# Update filename if we merged with base release and got a start date
|
||||
if base_start_date and not (args.start_date and args.end_date):
|
||||
# Only update filename for daily mode when base was merged
|
||||
output_filename = f"openairframes_adsb_{base_start_date}_{date_str}.csv.gz"
|
||||
print(f"Updated filename to reflect date range: {output_filename}")
|
||||
|
||||
# Convert list columns to strings for CSV compatibility
|
||||
for col in combined.columns:
|
||||
|
||||
Reference in New Issue
Block a user