Mirror of https://github.com/PlaneQuery/OpenAirframes.git (synced 2026-05-03 16:25:08 +02:00)
Compare commits
13 Commits
| SHA1 |
|---|
| a8b2b66952 |
| 3f38263a0c |
| 1a553d5f44 |
| 405855c566 |
| 4e81dde201 |
| fde8ef029c |
| 18ab51bd60 |
| 83b9d2a76d |
| 8874619ab0 |
| 823f291728 |
| 982011b36f |
| 1b15e43669 |
| f17adc4574 |
```diff
@@ -282,5 +282,5 @@ jobs:
         uses: actions/upload-artifact@v4
         with:
           name: openairframes_adsb-${{ needs.generate-matrix.outputs.global_start }}-${{ needs.generate-matrix.outputs.global_end }}
-          path: data/openairframes/*.csv
+          path: data/openairframes/*.csv.gz
           retention-days: 30
```
```diff
@@ -227,7 +227,7 @@ jobs:
         uses: actions/upload-artifact@v4
         with:
           name: adsb-release
-          path: data/openairframes/openairframes_adsb_*.csv
+          path: data/openairframes/openairframes_adsb_*.csv.gz
           retention-days: 1

   build-community:
```
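Both jobs now upload gzip-compressed CSVs. Downstream code using polars should need no extra decompression step, since `pl.read_csv` reads gzip input from a path transparently; a minimal sketch (the filename below is illustrative, not a real release):

```python
import polars as pl

# pl.read_csv detects gzip-compressed input and decompresses it on the fly,
# so the renamed *.csv.gz artifacts can be read without a manual gunzip.
# The filename here is illustrative only.
df = pl.read_csv("data/openairframes/openairframes_adsb_2024-01-01_2024-01-07.csv.gz")
print(df.height, "rows,", df.width, "columns")
```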
```diff
@@ -388,7 +388,7 @@ jobs:

           # Find files from artifacts using find (handles nested structures)
           CSV_FILE_FAA=$(find artifacts/faa -name "openairframes_faa_*.csv" -type f 2>/dev/null | head -1)
-          CSV_FILE_ADSB=$(find artifacts/adsb -name "openairframes_adsb_*.csv" -type f 2>/dev/null | head -1)
+          CSV_FILE_ADSB=$(find artifacts/adsb -name "openairframes_adsb_*.csv.gz" -type f 2>/dev/null | head -1)
           CSV_FILE_COMMUNITY=$(find artifacts/community -name "openairframes_community_*.csv" -type f 2>/dev/null | head -1)
           ZIP_FILE=$(find artifacts/faa -name "ReleasableAircraft_*.zip" -type f 2>/dev/null | head -1)
           JSON_FILE_ADSBX=$(find artifacts/adsbexchange -name "basic-ac-db_*.json.gz" -type f 2>/dev/null | head -1)
```
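For reproducing the artifact lookup outside the workflow, a rough Python equivalent of the `find ... | head -1` pipeline (the `first_match` helper is hypothetical, not part of the repo):

```python
from pathlib import Path

def first_match(root: str, pattern: str) -> str | None:
    """First file under root matching pattern, akin to `find ... | head -1`.

    Unlike find, the candidates are sorted, so the choice is deterministic.
    """
    matches = sorted(Path(root).rglob(pattern))
    return str(matches[0]) if matches else None

csv_file_adsb = first_match("artifacts/adsb", "openairframes_adsb_*.csv.gz")
print(csv_file_adsb)  # None if the artifact directory is absent
```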
```diff
@@ -1,11 +0,0 @@
-FROM --platform=linux/arm64 python:3.12-slim
-
-WORKDIR /app
-
-COPY requirements.reducer.txt requirements.txt
-RUN pip install --no-cache-dir -r requirements.txt
-
-COPY compress_adsb_to_aircraft_data.py .
-COPY reducer.py .
-
-CMD ["python", "-u", "reducer.py"]
```
```diff
@@ -1,12 +0,0 @@
-FROM --platform=linux/arm64 python:3.12-slim
-
-WORKDIR /app
-
-COPY requirements.worker.txt requirements.txt
-RUN pip install --no-cache-dir -r requirements.txt
-
-COPY compress_adsb_to_aircraft_data.py .
-COPY download_adsb_data_to_parquet.py .
-COPY worker.py .
-
-CMD ["python", "-u", "worker.py"]
```
```diff
@@ -14,11 +14,12 @@ Usage:
     python -m src.adsb.combine_chunks_to_csv --chunks-dir data/output/adsb_chunks --start-date 2024-01-01 --end-date 2024-01-07 --skip-base
 """
 import gc
+import gzip
 import os
 import sys
 import glob
 import argparse
-from datetime import datetime, timedelta
+from datetime import datetime, timedelta, timezone

 import polars as pl
```
```diff
@@ -33,7 +34,7 @@ os.makedirs(FINAL_OUTPUT_DIR, exist_ok=True)

 def get_target_day() -> datetime:
     """Get yesterday's date (the day we're processing)."""
-    return datetime.utcnow() - timedelta(days=1)
+    return datetime.now(timezone.utc) - timedelta(days=1)


 def process_single_chunk(chunk_path: str, delete_after_load: bool = False) -> pl.DataFrame:
```
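The one-line change above tracks Python 3.12, which deprecates `datetime.utcnow()`. The replacement also differs in kind: it returns a timezone-aware datetime rather than a naive one. A quick demonstration:

```python
from datetime import datetime, timedelta, timezone

# Old form: naive result (tzinfo is None); deprecated since Python 3.12.
naive = datetime.utcnow() - timedelta(days=1)
# New form: aware result pinned to UTC.
aware = datetime.now(timezone.utc) - timedelta(days=1)

print(naive.tzinfo, aware.tzinfo)  # None UTC
print(aware.strftime("%Y-%m-%d"))  # date formatting is unaffected
```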
```diff
@@ -83,58 +84,53 @@ def combine_compressed_chunks(compressed_dfs: list[pl.DataFrame]) -> pl.DataFrame:
     return combined


-def download_and_merge_base_release(compressed_df: pl.DataFrame) -> pl.DataFrame:
-    """Download base release and merge with new data."""
+def download_and_merge_base_release(compressed_df: pl.DataFrame) -> tuple[pl.DataFrame, str | None]:
+    """Download base release and merge with new data.
+
+    Returns:
+        Tuple of (merged_df, earliest_date_str) where earliest_date_str is None if no base release was merged
+    """
     from src.get_latest_release import download_latest_aircraft_adsb_csv

     print("Downloading base ADS-B release...")
-    try:
-        base_path = download_latest_aircraft_adsb_csv(
-            output_dir="./data/openairframes_base"
-        )
-        print(f"Download returned: {base_path}")
-
-        if base_path and os.path.exists(str(base_path)):
-            print(f"Loading base release from {base_path}")
-            base_df = pl.read_csv(base_path)
-            print(f"Base release has {len(base_df)} records")
-
-            # Ensure columns match
-            base_cols = set(base_df.columns)
-            new_cols = set(compressed_df.columns)
-            print(f"Base columns: {sorted(base_cols)}")
-            print(f"New columns: {sorted(new_cols)}")
-
-            # Add missing columns
-            for col in new_cols - base_cols:
-                base_df = base_df.with_columns(pl.lit(None).alias(col))
-            for col in base_cols - new_cols:
-                compressed_df = compressed_df.with_columns(pl.lit(None).alias(col))
-
-            # Reorder columns to match
-            compressed_df = compressed_df.select(base_df.columns)
-
-            # Concat and deduplicate by icao (keep new data - it comes last)
-            combined = pl.concat([base_df, compressed_df])
-            print(f"After concat: {len(combined)} records")
-
-            deduplicated = combined.unique(subset=["icao"], keep="last")
-
-            print(f"Combined with base: {len(combined)} -> {len(deduplicated)} after dedup")
-
-            del base_df, combined
-            gc.collect()
-
-            return deduplicated
-        else:
-            print(f"No base release found at {base_path}, using only new data")
-            return compressed_df
-    except Exception as e:
-        import traceback
-        print(f"Failed to download base release: {e}")
-        traceback.print_exc()
-        return compressed_df
+    base_path = download_latest_aircraft_adsb_csv(
+        output_dir="./data/openairframes_base"
+    )
+    print(f"Download returned: {base_path}")
+
+    print(f"Loading base release from {base_path}")
+
+    # Extract start date from filename (e.g., openairframes_adsb_2025-05-01_2026-02-14.csv.gz)
+    import re
+    filename = os.path.basename(str(base_path))
+    match = re.search(r'openairframes_adsb_(\d{4}-\d{2}-\d{2})_', filename)
+    earliest_date = match.group(1) if match else None
+    print(f"Start date from base filename: {earliest_date}")
+
+    # Read CSV with schema matching the new data
+    base_df = pl.read_csv(base_path, schema=compressed_df.schema)
+    print(f"Base release has {len(base_df)} records")
+
+    # Ensure columns match
+    base_cols = set(base_df.columns)
+    new_cols = set(compressed_df.columns)
+    print(f"Base columns: {sorted(base_cols)}")
+    print(f"New columns: {sorted(new_cols)}")
+
+    # Add missing columns
+    for col in new_cols - base_cols:
+        base_df = base_df.with_columns(pl.lit(None).alias(col))
+    for col in base_cols - new_cols:
+        compressed_df = compressed_df.with_columns(pl.lit(None).alias(col))
+
+    # Reorder columns to match
+    compressed_df = compressed_df.select(base_df.columns)
+
+    # Concat and deduplicate by icao (keep new data - it comes last)
+    combined = pl.concat([base_df, compressed_df])
+    print(f"After concat: {len(combined)} records")
+
+    return combined, earliest_date


 def cleanup_chunks(output_id: str, chunks_dir: str):
     """Delete chunk parquet files after successful merge."""
```
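The key addition in this refactor is deriving the dataset's start date from the base release's filename. The regex can be checked in isolation, using the example name from the diff's own comment:

```python
import os
import re

# Example filename taken from the comment in the new code above.
base_path = "data/openairframes_base/openairframes_adsb_2025-05-01_2026-02-14.csv.gz"
filename = os.path.basename(base_path)
match = re.search(r"openairframes_adsb_(\d{4}-\d{2}-\d{2})_", filename)
earliest_date = match.group(1) if match else None
print(earliest_date)  # 2025-05-01
```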
```diff
@@ -176,7 +172,7 @@ def main():
     if args.start_date and args.end_date:
         # Historical mode
         output_id = f"{args.start_date}_{args.end_date}"
-        output_filename = f"openairframes_adsb_{args.start_date}_{args.end_date}.csv"
+        output_filename = f"openairframes_adsb_{args.start_date}_{args.end_date}.csv.gz"
         print(f"Combining chunks for date range: {args.start_date} to {args.end_date}")
     else:
         # Daily mode - use same date for start and end
```
```diff
@@ -187,7 +183,7 @@ def main():
         date_str = target_day.strftime("%Y-%m-%d")
         output_id = date_str
-        output_filename = f"openairframes_adsb_{date_str}_{date_str}.csv"
+        output_filename = f"openairframes_adsb_{date_str}_{date_str}.csv.gz"
         print(f"Combining chunks for {date_str}")

     chunks_dir = args.chunks_dir
```
```diff
@@ -220,8 +216,15 @@ def main():
     print(f"After combining: {get_resource_usage()}")

     # Merge with base release (unless skipped)
+    base_start_date = None
     if not args.skip_base:
-        combined = download_and_merge_base_release(combined)
+        combined, base_start_date = download_and_merge_base_release(combined)
+
+    # Update filename if we merged with base release and got a start date
+    if base_start_date and not (args.start_date and args.end_date):
+        # Only update filename for daily mode when base was merged
+        output_filename = f"openairframes_adsb_{base_start_date}_{date_str}.csv.gz"
+        print(f"Updated filename to reflect date range: {output_filename}")

     # Convert list columns to strings for CSV compatibility
     for col in combined.columns:
```
```diff
@@ -234,9 +237,17 @@ def main():
     if 'time' in combined.columns:
         combined = combined.sort('time')

+    # Replace empty strings with null across all string columns to avoid quoted empty strings
+    for col in combined.columns:
+        if combined[col].dtype == pl.Utf8:
+            combined = combined.with_columns(
+                pl.when(pl.col(col) == "").then(None).otherwise(pl.col(col)).alias(col)
+            )
+
     # Write final CSV
     output_path = os.path.join(FINAL_OUTPUT_DIR, output_filename)
-    combined.write_csv(output_path)
+    with gzip.open(output_path, "wb", compresslevel=9) as f:
+        combined.write_csv(f, null_value='', quote_style='necessary')
     print(f"Wrote {len(combined)} records to {output_path}")

     # Cleanup
```
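The new write path streams the frame into a gzip file object instead of writing a plain CSV. The same pattern in a self-contained sketch:

```python
import gzip

import polars as pl

df = pl.DataFrame({"icao": ["a1b2c3", "d4e5f6"], "reg": [None, "N12345"]})

# DataFrame.write_csv accepts a writable binary file object, so the frame
# is compressed on the way out with no intermediate uncompressed file.
# null_value="" plus quote_style="necessary" keeps nulls as bare empty fields.
with gzip.open("aircraft.csv.gz", "wb", compresslevel=9) as f:
    df.write_csv(f, null_value="", quote_style="necessary")

print(pl.read_csv("aircraft.csv.gz").height)  # 2
```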
```diff
@@ -264,7 +264,7 @@ def get_latest_aircraft_adsb_csv_df():
         if df[col].dtype == pl.Utf8:
             df = df.with_columns(pl.col(col).fill_null(""))

-    # Extract start date from filename pattern: openairframes_adsb_{start_date}_{end_date}.csv
+    # Extract start date from filename pattern: openairframes_adsb_{start_date}_{end_date}.csv[.gz]
     match = re.search(r"openairframes_adsb_(\d{4}-\d{2}-\d{2})_", str(csv_path))
     if not match:
         raise ValueError(f"Could not extract date from filename: {csv_path.name}")
```
```diff
@@ -76,13 +76,9 @@ def timeout_handler(signum, frame):
     raise DownloadTimeoutException("Download timed out after 40 seconds")


-def fetch_releases(version_date: str) -> list:
-    """Fetch GitHub releases for a given version date from adsblol."""
-    year = version_date.split('.')[0][1:]
-    if version_date == "v2024.12.31":
-        year = "2025"
+def _fetch_releases_from_repo(year: str, version_date: str) -> list:
+    """Fetch GitHub releases for a given version date from a specific year's adsblol repo."""
     BASE_URL = f"https://api.github.com/repos/adsblol/globe_history_{year}/releases"
     # Match both normal and tmp releases
     PATTERN = rf"^{re.escape(version_date)}-planes-readsb-prod-\d+(tmp)?$"
     releases = []
     page = 1
```
```diff
@@ -123,6 +119,25 @@ def fetch_releases(version_date: str) -> list:
     return releases


+def fetch_releases(version_date: str) -> list:
+    """Fetch GitHub releases for a given version date from adsblol.
+
+    For Dec 31 dates, if no releases are found in the current year's repo,
+    also checks the next year's repo (adsblol sometimes publishes Dec 31
+    data in the following year's repository).
+    """
+    year = version_date.split('.')[0][1:]
+    releases = _fetch_releases_from_repo(year, version_date)
+
+    # For last day of year, also check next year's repo if nothing found
+    if not releases and version_date.endswith(".12.31"):
+        next_year = str(int(year) + 1)
+        print(f"No releases found for {version_date} in {year} repo, checking {next_year} repo...")
+        releases = _fetch_releases_from_repo(next_year, version_date)
+
+    return releases
+
+
 def download_asset(asset_url: str, file_path: str) -> bool:
     """Download a single release asset."""
     os.makedirs(os.path.dirname(file_path) or OUTPUT_DIR, exist_ok=True)
```
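The rollover rule in the new wrapper is small enough to check by hand: the year comes from the tag prefix, and only `*.12.31` tags get a second candidate repo. A sketch of the candidate ordering (the `repos_to_check` helper is hypothetical; the real code queries the first repo and only falls back when it returns nothing):

```python
def repos_to_check(version_date: str) -> list[str]:
    """Candidate adsblol repos for a version tag, in query order."""
    year = version_date.split('.')[0][1:]  # "v2024.12.31" -> "2024"
    repos = [f"adsblol/globe_history_{year}"]
    # Dec 31 data is sometimes published under the following year's repo.
    if version_date.endswith(".12.31"):
        repos.append(f"adsblol/globe_history_{int(year) + 1}")
    return repos

print(repos_to_check("v2024.12.31"))  # ['adsblol/globe_history_2024', 'adsblol/globe_history_2025']
print(repos_to_check("v2024.06.15"))  # ['adsblol/globe_history_2024']
```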
```diff
@@ -1,97 +0,0 @@
-"""
-Reduce step: downloads all chunk CSVs from S3, combines them,
-deduplicates across the full dataset, and uploads the final result.
-
-Environment variables:
-    S3_BUCKET — bucket with intermediate results
-    RUN_ID — run identifier matching the map workers
-    GLOBAL_START_DATE — overall start date for output filename
-    GLOBAL_END_DATE — overall end date for output filename
-"""
-import gzip
-import os
-import shutil
-from pathlib import Path
-
-import boto3
-import polars as pl
-
-from compress_adsb_to_aircraft_data import COLUMNS, deduplicate_by_signature
-
-
-def main():
-    s3_bucket = os.environ["S3_BUCKET"]
-    run_id = os.environ.get("RUN_ID", "default")
-    global_start = os.environ["GLOBAL_START_DATE"]
-    global_end = os.environ["GLOBAL_END_DATE"]
-
-    s3 = boto3.client("s3")
-    prefix = f"intermediate/{run_id}/"
-
-    # List all chunk files for this run
-    paginator = s3.get_paginator("list_objects_v2")
-    chunk_keys = []
-    for page in paginator.paginate(Bucket=s3_bucket, Prefix=prefix):
-        for obj in page.get("Contents", []):
-            if obj["Key"].endswith(".csv.gz"):
-                chunk_keys.append(obj["Key"])
-
-    chunk_keys.sort()
-    print(f"Found {len(chunk_keys)} chunks to combine")
-
-    if not chunk_keys:
-        print("No chunks found — nothing to reduce.")
-        return
-
-    # Download and concatenate all chunks
-    download_dir = Path("/tmp/chunks")
-    download_dir.mkdir(parents=True, exist_ok=True)
-
-    dfs = []
-
-    for key in chunk_keys:
-        gz_path = download_dir / Path(key).name
-        csv_path = gz_path.with_suffix("")  # Remove .gz
-        print(f"Downloading {key}...")
-        s3.download_file(s3_bucket, key, str(gz_path))
-
-        # Decompress
-        with gzip.open(gz_path, 'rb') as f_in:
-            with open(csv_path, 'wb') as f_out:
-                shutil.copyfileobj(f_in, f_out)
-        gz_path.unlink()
-
-        df_chunk = pl.read_csv(csv_path)
-        print(f"  Loaded {df_chunk.height} rows from {csv_path.name}")
-        dfs.append(df_chunk)
-
-        # Free disk space after loading
-        csv_path.unlink()
-
-    df_accumulated = pl.concat(dfs) if dfs else pl.DataFrame()
-    print(f"Combined: {df_accumulated.height} rows before dedup")
-
-    # Final global deduplication
-    df_accumulated = deduplicate_by_signature(df_accumulated)
-    print(f"After dedup: {df_accumulated.height} rows")
-
-    # Write and upload final result
-    output_name = f"openairframes_adsb_{global_start}_{global_end}.csv.gz"
-    csv_output = Path(f"/tmp/openairframes_adsb_{global_start}_{global_end}.csv")
-    gz_output = Path(f"/tmp/{output_name}")
-
-    df_accumulated.write_csv(csv_output)
-    with open(csv_output, 'rb') as f_in:
-        with gzip.open(gz_output, 'wb') as f_out:
-            shutil.copyfileobj(f_in, f_out)
-    csv_output.unlink()
-
-    final_key = f"final/{output_name}"
-    print(f"Uploading to s3://{s3_bucket}/{final_key}")
-    s3.upload_file(str(gz_output), s3_bucket, final_key)
-
-    print(f"Final output: {df_accumulated.height} records -> {final_key}")
-
-
-if __name__ == "__main__":
-    main()
```
```diff
@@ -1,89 +0,0 @@
-"""
-Map worker: processes a date range chunk, uploads result to S3.
-
-Environment variables:
-    START_DATE — inclusive, YYYY-MM-DD
-    END_DATE — exclusive, YYYY-MM-DD
-    S3_BUCKET — bucket for intermediate results
-    RUN_ID — unique run identifier for namespacing S3 keys
-"""
-import os
-import sys
-from datetime import datetime, timedelta
-from pathlib import Path
-
-import boto3
-import polars as pl
-
-from compress_adsb_to_aircraft_data import (
-    load_historical_for_day,
-    deduplicate_by_signature,
-    COLUMNS,
-)
-
-
-def main():
-    start_date_str = os.environ["START_DATE"]
-    end_date_str = os.environ["END_DATE"]
-    s3_bucket = os.environ["S3_BUCKET"]
-    run_id = os.environ.get("RUN_ID", "default")
-
-    start_date = datetime.strptime(start_date_str, "%Y-%m-%d")
-    end_date = datetime.strptime(end_date_str, "%Y-%m-%d")
-
-    total_days = (end_date - start_date).days
-    print(f"Worker: processing {total_days} days [{start_date_str}, {end_date_str})")
-
-    dfs = []
-    current_date = start_date
-
-    while current_date < end_date:
-        day_str = current_date.strftime("%Y-%m-%d")
-        print(f"  Loading {day_str}...")
-
-        df_compressed = load_historical_for_day(current_date)
-        if df_compressed.height == 0:
-            raise RuntimeError(f"No data found for {day_str}")
-
-        dfs.append(df_compressed)
-        total_rows = sum(df.height for df in dfs)
-        print(f"  +{df_compressed.height} rows (total: {total_rows})")
-
-        # Delete local cache after each day to save disk in container
-        cache_dir = Path("data/adsb")
-        if cache_dir.exists():
-            import shutil
-            shutil.rmtree(cache_dir)
-
-        current_date += timedelta(days=1)
-
-    # Concatenate all days
-    df_accumulated = pl.concat(dfs) if dfs else pl.DataFrame()
-
-    # Deduplicate within this chunk
-    df_accumulated = deduplicate_by_signature(df_accumulated)
-    print(f"After dedup: {df_accumulated.height} rows")
-
-    # Write to local file then upload to S3
-    local_path = Path(f"/tmp/chunk_{start_date_str}_{end_date_str}.csv")
-    df_accumulated.write_csv(local_path)
-
-    # Compress with gzip
-    import gzip
-    import shutil
-    gz_path = Path(f"/tmp/chunk_{start_date_str}_{end_date_str}.csv.gz")
-    with open(local_path, 'rb') as f_in:
-        with gzip.open(gz_path, 'wb') as f_out:
-            shutil.copyfileobj(f_in, f_out)
-    local_path.unlink()  # Remove uncompressed file
-
-    s3_key = f"intermediate/{run_id}/chunk_{start_date_str}_{end_date_str}.csv.gz"
-    print(f"Uploading to s3://{s3_bucket}/{s3_key}")
-
-    s3 = boto3.client("s3")
-    s3.upload_file(str(gz_path), s3_bucket, s3_key)
-    print("Done.")
-
-
-if __name__ == "__main__":
-    main()
```
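The deleted worker consumed half-open `[START_DATE, END_DATE)` ranges from its environment. How the driver produced those ranges is not shown in this diff; a hypothetical splitter compatible with that contract might look like this (the 7-day chunk size is an assumption, not from the source):

```python
from datetime import datetime, timedelta

def split_date_range(start: str, end: str, days_per_chunk: int = 7):
    """Yield half-open (START_DATE, END_DATE) pairs covering [start, end)."""
    cur = datetime.strptime(start, "%Y-%m-%d")
    stop = datetime.strptime(end, "%Y-%m-%d")
    while cur < stop:
        nxt = min(cur + timedelta(days=days_per_chunk), stop)
        yield cur.strftime("%Y-%m-%d"), nxt.strftime("%Y-%m-%d")
        cur = nxt

print(list(split_date_range("2024-01-01", "2024-01-20")))
# [('2024-01-01', '2024-01-08'), ('2024-01-08', '2024-01-15'), ('2024-01-15', '2024-01-20')]
```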
```diff
@@ -77,7 +77,7 @@ if __name__ == '__main__':
     OUT_ROOT = Path("data/openairframes")
     OUT_ROOT.mkdir(parents=True, exist_ok=True)

-    output_file = OUT_ROOT / f"openairframes_adsb_{start_date_str}_{date_str}.csv"
+    output_file = OUT_ROOT / f"openairframes_adsb_{start_date_str}_{date_str}.csv.gz"
     df_combined.write_csv(output_file)

     print(f"Saved: {output_file}")
```
```diff
@@ -119,6 +119,7 @@ def download_latest_aircraft_csv(
     Returns:
         Path to the downloaded file
     """
+    output_dir = Path(output_dir)
     assets = get_latest_release_assets(repo, github_token=github_token)
     try:
         asset = pick_asset(assets, name_regex=r"^openairframes_faa_.*\.csv$")
```
```diff
@@ -164,8 +165,9 @@ def download_latest_aircraft_adsb_csv(
     Returns:
         Path to the downloaded file
     """
+    output_dir = Path(output_dir)
     assets = get_latest_release_assets(repo, github_token=github_token)
-    asset = pick_asset(assets, name_regex=r"^openairframes_adsb_.*\.csv$")
+    asset = pick_asset(assets, name_regex=r"^openairframes_adsb_.*\.csv(\.gz)?$")
     saved_to = download_asset(asset, output_dir / asset.name, github_token=github_token)
    print(f"Downloaded: {asset.name} ({asset.size} bytes) -> {saved_to}")
     return saved_to
```
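The widened asset regex now accepts both plain and gzipped release names; a quick check:

```python
import re

pattern = r"^openairframes_adsb_.*\.csv(\.gz)?$"
for name in (
    "openairframes_adsb_2024-01-01_2024-01-07.csv",
    "openairframes_adsb_2024-01-01_2024-01-07.csv.gz",
    "openairframes_faa_2024-01-01.csv",
):
    print(name, "->", bool(re.match(pattern, name)))
# The two adsb names match; the faa asset does not.
```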
```diff
@@ -176,7 +178,7 @@ def get_latest_aircraft_adsb_csv_df():
     import pandas as pd
     df = pd.read_csv(csv_path)
     df = df.fillna("")
-    # Extract start date from filename pattern: openairframes_adsb_{start_date}_{end_date}.csv
+    # Extract start date from filename pattern: openairframes_adsb_{start_date}_{end_date}.csv[.gz]
     match = re.search(r"openairframes_adsb_(\d{4}-\d{2}-\d{2})_", str(csv_path))
     if not match:
         raise ValueError(f"Could not extract date from filename: {csv_path.name}")
```