mirror of
https://github.com/PlaneQuery/OpenAirframes.git
synced 2026-06-09 22:53:55 +02:00
stop depue that destroys previous days
This commit is contained in:
@@ -93,62 +93,44 @@ def download_and_merge_base_release(compressed_df: pl.DataFrame) -> tuple[pl.Dat
|
|||||||
from src.get_latest_release import download_latest_aircraft_adsb_csv
|
from src.get_latest_release import download_latest_aircraft_adsb_csv
|
||||||
|
|
||||||
print("Downloading base ADS-B release...")
|
print("Downloading base ADS-B release...")
|
||||||
try:
|
base_path = download_latest_aircraft_adsb_csv(
|
||||||
base_path = download_latest_aircraft_adsb_csv(
|
output_dir="./data/openairframes_base"
|
||||||
output_dir="./data/openairframes_base"
|
)
|
||||||
)
|
print(f"Download returned: {base_path}")
|
||||||
print(f"Download returned: {base_path}")
|
|
||||||
|
|
||||||
if base_path and os.path.exists(str(base_path)):
|
print(f"Loading base release from {base_path}")
|
||||||
print(f"Loading base release from {base_path}")
|
|
||||||
|
|
||||||
# Extract start date from filename (e.g., openairframes_adsb_2025-05-01_2026-02-14.csv.gz)
|
# Extract start date from filename (e.g., openairframes_adsb_2025-05-01_2026-02-14.csv.gz)
|
||||||
import re
|
import re
|
||||||
filename = os.path.basename(str(base_path))
|
filename = os.path.basename(str(base_path))
|
||||||
match = re.search(r'openairframes_adsb_(\d{4}-\d{2}-\d{2})_', filename)
|
match = re.search(r'openairframes_adsb_(\d{4}-\d{2}-\d{2})_', filename)
|
||||||
earliest_date = match.group(1) if match else None
|
earliest_date = match.group(1) if match else None
|
||||||
print(f"Start date from base filename: {earliest_date}")
|
print(f"Start date from base filename: {earliest_date}")
|
||||||
|
|
||||||
# Read CSV with schema matching the new data
|
# Read CSV with schema matching the new data
|
||||||
base_df = pl.read_csv(base_path, schema=compressed_df.schema)
|
base_df = pl.read_csv(base_path, schema=compressed_df.schema)
|
||||||
print(f"Base release has {len(base_df)} records")
|
print(f"Base release has {len(base_df)} records")
|
||||||
|
|
||||||
# Ensure columns match
|
# Ensure columns match
|
||||||
base_cols = set(base_df.columns)
|
base_cols = set(base_df.columns)
|
||||||
new_cols = set(compressed_df.columns)
|
new_cols = set(compressed_df.columns)
|
||||||
print(f"Base columns: {sorted(base_cols)}")
|
print(f"Base columns: {sorted(base_cols)}")
|
||||||
print(f"New columns: {sorted(new_cols)}")
|
print(f"New columns: {sorted(new_cols)}")
|
||||||
|
|
||||||
# Add missing columns
|
# Add missing columns
|
||||||
for col in new_cols - base_cols:
|
for col in new_cols - base_cols:
|
||||||
base_df = base_df.with_columns(pl.lit(None).alias(col))
|
base_df = base_df.with_columns(pl.lit(None).alias(col))
|
||||||
for col in base_cols - new_cols:
|
for col in base_cols - new_cols:
|
||||||
compressed_df = compressed_df.with_columns(pl.lit(None).alias(col))
|
compressed_df = compressed_df.with_columns(pl.lit(None).alias(col))
|
||||||
|
|
||||||
# Reorder columns to match
|
# Reorder columns to match
|
||||||
compressed_df = compressed_df.select(base_df.columns)
|
compressed_df = compressed_df.select(base_df.columns)
|
||||||
|
|
||||||
# Concat and deduplicate by icao (keep new data - it comes last)
|
# Concat and deduplicate by icao (keep new data - it comes last)
|
||||||
combined = pl.concat([base_df, compressed_df])
|
combined = pl.concat([base_df, compressed_df])
|
||||||
print(f"After concat: {len(combined)} records")
|
print(f"After concat: {len(combined)} records")
|
||||||
|
|
||||||
deduplicated = combined.unique(subset=["icao"], keep="last")
|
|
||||||
|
|
||||||
print(f"Combined with base: {len(combined)} -> {len(deduplicated)} after dedup")
|
|
||||||
|
|
||||||
del base_df, combined
|
|
||||||
gc.collect()
|
|
||||||
|
|
||||||
return deduplicated, earliest_date
|
|
||||||
else:
|
|
||||||
print(f"No base release found at {base_path}, using only new data")
|
|
||||||
return compressed_df, None
|
|
||||||
except Exception as e:
|
|
||||||
import traceback
|
|
||||||
print(f"Failed to download base release: {e}")
|
|
||||||
traceback.print_exc()
|
|
||||||
return compressed_df, None
|
|
||||||
|
|
||||||
|
return combined, earliest_date
|
||||||
|
|
||||||
def cleanup_chunks(output_id: str, chunks_dir: str):
|
def cleanup_chunks(output_id: str, chunks_dir: str):
|
||||||
"""Delete chunk parquet files after successful merge."""
|
"""Delete chunk parquet files after successful merge."""
|
||||||
|
|||||||
Reference in New Issue
Block a user