Compare commits

...

13 Commits

Author SHA1 Message Date
ggman12 a8b2b66952 fix .csv to .csv.gz transition 2026-02-15 19:08:51 -05:00
ggman12 3f38263a0c stop dedupe that destroys previous days 2026-02-15 17:55:16 -05:00
ggman12 1a553d5f44 use date of file instead of min timestamp 2026-02-15 16:44:09 -05:00
ggman12 405855c566 deal with whole schema 2026-02-15 16:43:00 -05:00
ggman12 4e81dde201 fix date parsing 2026-02-15 14:55:32 -05:00
ggman12 fde8ef029c update csv writing to handle empty data. Save space with higher gzip compression 2026-02-15 14:14:54 -05:00
ggman12 18ab51bd60 update naming 2026-02-15 13:45:03 -05:00
ggman12 83b9d2a76d write gzip 2026-02-15 13:41:09 -05:00
ggman12 8874619ab0 write gzip 2026-02-15 13:41:02 -05:00
ggman12 823f291728 fix errors in daily release due to new .gz file 2026-02-15 13:21:51 -05:00
ggman12 982011b36f end of year check 2026-02-14 22:42:32 -05:00
ggman12 1b15e43669 use .csv.gz 2026-02-14 22:22:14 -05:00
ggman12 f17adc4574 remove aws worker, reducer 2026-02-14 22:21:14 -05:00
11 changed files with 95 additions and 276 deletions
+1 -1
View File
@@ -282,5 +282,5 @@ jobs:
uses: actions/upload-artifact@v4 uses: actions/upload-artifact@v4
with: with:
name: openairframes_adsb-${{ needs.generate-matrix.outputs.global_start }}-${{ needs.generate-matrix.outputs.global_end }} name: openairframes_adsb-${{ needs.generate-matrix.outputs.global_start }}-${{ needs.generate-matrix.outputs.global_end }}
path: data/openairframes/*.csv path: data/openairframes/*.csv.gz
retention-days: 30 retention-days: 30
@@ -227,7 +227,7 @@ jobs:
uses: actions/upload-artifact@v4 uses: actions/upload-artifact@v4
with: with:
name: adsb-release name: adsb-release
path: data/openairframes/openairframes_adsb_*.csv path: data/openairframes/openairframes_adsb_*.csv.gz
retention-days: 1 retention-days: 1
build-community: build-community:
@@ -388,7 +388,7 @@ jobs:
# Find files from artifacts using find (handles nested structures) # Find files from artifacts using find (handles nested structures)
CSV_FILE_FAA=$(find artifacts/faa -name "openairframes_faa_*.csv" -type f 2>/dev/null | head -1) CSV_FILE_FAA=$(find artifacts/faa -name "openairframes_faa_*.csv" -type f 2>/dev/null | head -1)
CSV_FILE_ADSB=$(find artifacts/adsb -name "openairframes_adsb_*.csv" -type f 2>/dev/null | head -1) CSV_FILE_ADSB=$(find artifacts/adsb -name "openairframes_adsb_*.csv.gz" -type f 2>/dev/null | head -1)
CSV_FILE_COMMUNITY=$(find artifacts/community -name "openairframes_community_*.csv" -type f 2>/dev/null | head -1) CSV_FILE_COMMUNITY=$(find artifacts/community -name "openairframes_community_*.csv" -type f 2>/dev/null | head -1)
ZIP_FILE=$(find artifacts/faa -name "ReleasableAircraft_*.zip" -type f 2>/dev/null | head -1) ZIP_FILE=$(find artifacts/faa -name "ReleasableAircraft_*.zip" -type f 2>/dev/null | head -1)
JSON_FILE_ADSBX=$(find artifacts/adsbexchange -name "basic-ac-db_*.json.gz" -type f 2>/dev/null | head -1) JSON_FILE_ADSBX=$(find artifacts/adsbexchange -name "basic-ac-db_*.json.gz" -type f 2>/dev/null | head -1)
-11
View File
@@ -1,11 +0,0 @@
# Image for the reduce step: runs reducer.py on Python 3.12 (slim), linux/arm64.
FROM --platform=linux/arm64 python:3.12-slim
WORKDIR /app
# Install the reducer's dependencies; the list is renamed to requirements.txt inside the image.
COPY requirements.reducer.txt requirements.txt
RUN pip install --no-cache-dir -r requirements.txt
# reducer.py imports from compress_adsb_to_aircraft_data, so both scripts are copied.
COPY compress_adsb_to_aircraft_data.py .
COPY reducer.py .
# -u makes Python's stdout/stderr unbuffered.
CMD ["python", "-u", "reducer.py"]
-12
View File
@@ -1,12 +0,0 @@
# Image for the map worker: runs worker.py on Python 3.12 (slim), linux/arm64.
FROM --platform=linux/arm64 python:3.12-slim
WORKDIR /app
# Install the worker's dependencies; the list is renamed to requirements.txt inside the image.
COPY requirements.worker.txt requirements.txt
RUN pip install --no-cache-dir -r requirements.txt
# worker.py imports from compress_adsb_to_aircraft_data.
COPY compress_adsb_to_aircraft_data.py .
# NOTE(review): worker.py does not import this module directly; presumably
# compress_adsb_to_aircraft_data.py needs it — confirm.
COPY download_adsb_data_to_parquet.py .
COPY worker.py .
# -u makes Python's stdout/stderr unbuffered.
CMD ["python", "-u", "worker.py"]
+65 -54
View File
@@ -14,11 +14,12 @@ Usage:
python -m src.adsb.combine_chunks_to_csv --chunks-dir data/output/adsb_chunks --start-date 2024-01-01 --end-date 2024-01-07 --skip-base python -m src.adsb.combine_chunks_to_csv --chunks-dir data/output/adsb_chunks --start-date 2024-01-01 --end-date 2024-01-07 --skip-base
""" """
import gc import gc
import gzip
import os import os
import sys import sys
import glob import glob
import argparse import argparse
from datetime import datetime, timedelta from datetime import datetime, timedelta, timezone
import polars as pl import polars as pl
@@ -33,7 +34,7 @@ os.makedirs(FINAL_OUTPUT_DIR, exist_ok=True)
def get_target_day() -> datetime: def get_target_day() -> datetime:
"""Get yesterday's date (the day we're processing).""" """Get yesterday's date (the day we're processing)."""
return datetime.utcnow() - timedelta(days=1) return datetime.now(timezone.utc) - timedelta(days=1)
def process_single_chunk(chunk_path: str, delete_after_load: bool = False) -> pl.DataFrame: def process_single_chunk(chunk_path: str, delete_after_load: bool = False) -> pl.DataFrame:
@@ -83,58 +84,53 @@ def combine_compressed_chunks(compressed_dfs: list[pl.DataFrame]) -> pl.DataFram
return combined return combined
def download_and_merge_base_release(compressed_df: pl.DataFrame) -> pl.DataFrame: def download_and_merge_base_release(compressed_df: pl.DataFrame) -> tuple[pl.DataFrame, str | None]:
"""Download base release and merge with new data.""" """Download base release and merge with new data.
Returns:
Tuple of (merged_df, earliest_date_str) where earliest_date_str is None if no base release was merged
"""
from src.get_latest_release import download_latest_aircraft_adsb_csv from src.get_latest_release import download_latest_aircraft_adsb_csv
print("Downloading base ADS-B release...") print("Downloading base ADS-B release...")
try: base_path = download_latest_aircraft_adsb_csv(
base_path = download_latest_aircraft_adsb_csv( output_dir="./data/openairframes_base"
output_dir="./data/openairframes_base" )
) print(f"Download returned: {base_path}")
print(f"Download returned: {base_path}")
print(f"Loading base release from {base_path}")
if base_path and os.path.exists(str(base_path)):
print(f"Loading base release from {base_path}") # Extract start date from filename (e.g., openairframes_adsb_2025-05-01_2026-02-14.csv.gz)
base_df = pl.read_csv(base_path) import re
print(f"Base release has {len(base_df)} records") filename = os.path.basename(str(base_path))
match = re.search(r'openairframes_adsb_(\d{4}-\d{2}-\d{2})_', filename)
# Ensure columns match earliest_date = match.group(1) if match else None
base_cols = set(base_df.columns) print(f"Start date from base filename: {earliest_date}")
new_cols = set(compressed_df.columns)
print(f"Base columns: {sorted(base_cols)}") # Read CSV with schema matching the new data
print(f"New columns: {sorted(new_cols)}") base_df = pl.read_csv(base_path, schema=compressed_df.schema)
print(f"Base release has {len(base_df)} records")
# Add missing columns
for col in new_cols - base_cols: # Ensure columns match
base_df = base_df.with_columns(pl.lit(None).alias(col)) base_cols = set(base_df.columns)
for col in base_cols - new_cols: new_cols = set(compressed_df.columns)
compressed_df = compressed_df.with_columns(pl.lit(None).alias(col)) print(f"Base columns: {sorted(base_cols)}")
print(f"New columns: {sorted(new_cols)}")
# Reorder columns to match
compressed_df = compressed_df.select(base_df.columns) # Add missing columns
for col in new_cols - base_cols:
# Concat and deduplicate by icao (keep new data - it comes last) base_df = base_df.with_columns(pl.lit(None).alias(col))
combined = pl.concat([base_df, compressed_df]) for col in base_cols - new_cols:
print(f"After concat: {len(combined)} records") compressed_df = compressed_df.with_columns(pl.lit(None).alias(col))
deduplicated = combined.unique(subset=["icao"], keep="last") # Reorder columns to match
compressed_df = compressed_df.select(base_df.columns)
print(f"Combined with base: {len(combined)} -> {len(deduplicated)} after dedup")
# Concat and deduplicate by icao (keep new data - it comes last)
del base_df, combined combined = pl.concat([base_df, compressed_df])
gc.collect() print(f"After concat: {len(combined)} records")
return deduplicated
else:
print(f"No base release found at {base_path}, using only new data")
return compressed_df
except Exception as e:
import traceback
print(f"Failed to download base release: {e}")
traceback.print_exc()
return compressed_df
return combined, earliest_date
def cleanup_chunks(output_id: str, chunks_dir: str): def cleanup_chunks(output_id: str, chunks_dir: str):
"""Delete chunk parquet files after successful merge.""" """Delete chunk parquet files after successful merge."""
@@ -176,7 +172,7 @@ def main():
if args.start_date and args.end_date: if args.start_date and args.end_date:
# Historical mode # Historical mode
output_id = f"{args.start_date}_{args.end_date}" output_id = f"{args.start_date}_{args.end_date}"
output_filename = f"openairframes_adsb_{args.start_date}_{args.end_date}.csv" output_filename = f"openairframes_adsb_{args.start_date}_{args.end_date}.csv.gz"
print(f"Combining chunks for date range: {args.start_date} to {args.end_date}") print(f"Combining chunks for date range: {args.start_date} to {args.end_date}")
else: else:
# Daily mode - use same date for start and end # Daily mode - use same date for start and end
@@ -187,7 +183,7 @@ def main():
date_str = target_day.strftime("%Y-%m-%d") date_str = target_day.strftime("%Y-%m-%d")
output_id = date_str output_id = date_str
output_filename = f"openairframes_adsb_{date_str}_{date_str}.csv" output_filename = f"openairframes_adsb_{date_str}_{date_str}.csv.gz"
print(f"Combining chunks for {date_str}") print(f"Combining chunks for {date_str}")
chunks_dir = args.chunks_dir chunks_dir = args.chunks_dir
@@ -220,8 +216,15 @@ def main():
print(f"After combining: {get_resource_usage()}") print(f"After combining: {get_resource_usage()}")
# Merge with base release (unless skipped) # Merge with base release (unless skipped)
base_start_date = None
if not args.skip_base: if not args.skip_base:
combined = download_and_merge_base_release(combined) combined, base_start_date = download_and_merge_base_release(combined)
# Update filename if we merged with base release and got a start date
if base_start_date and not (args.start_date and args.end_date):
# Only update filename for daily mode when base was merged
output_filename = f"openairframes_adsb_{base_start_date}_{date_str}.csv.gz"
print(f"Updated filename to reflect date range: {output_filename}")
# Convert list columns to strings for CSV compatibility # Convert list columns to strings for CSV compatibility
for col in combined.columns: for col in combined.columns:
@@ -234,9 +237,17 @@ def main():
if 'time' in combined.columns: if 'time' in combined.columns:
combined = combined.sort('time') combined = combined.sort('time')
# Replace empty strings with null across all string columns to avoid quoted empty strings
for col in combined.columns:
if combined[col].dtype == pl.Utf8:
combined = combined.with_columns(
pl.when(pl.col(col) == "").then(None).otherwise(pl.col(col)).alias(col)
)
# Write final CSV # Write final CSV
output_path = os.path.join(FINAL_OUTPUT_DIR, output_filename) output_path = os.path.join(FINAL_OUTPUT_DIR, output_filename)
combined.write_csv(output_path) with gzip.open(output_path, "wb", compresslevel=9) as f:
combined.write_csv(f, null_value='', quote_style='necessary')
print(f"Wrote {len(combined)} records to {output_path}") print(f"Wrote {len(combined)} records to {output_path}")
# Cleanup # Cleanup
+1 -1
View File
@@ -264,7 +264,7 @@ def get_latest_aircraft_adsb_csv_df():
if df[col].dtype == pl.Utf8: if df[col].dtype == pl.Utf8:
df = df.with_columns(pl.col(col).fill_null("")) df = df.with_columns(pl.col(col).fill_null(""))
# Extract start date from filename pattern: openairframes_adsb_{start_date}_{end_date}.csv # Extract start date from filename pattern: openairframes_adsb_{start_date}_{end_date}.csv[.gz]
match = re.search(r"openairframes_adsb_(\d{4}-\d{2}-\d{2})_", str(csv_path)) match = re.search(r"openairframes_adsb_(\d{4}-\d{2}-\d{2})_", str(csv_path))
if not match: if not match:
raise ValueError(f"Could not extract date from filename: {csv_path.name}") raise ValueError(f"Could not extract date from filename: {csv_path.name}")
+21 -6
View File
@@ -76,13 +76,9 @@ def timeout_handler(signum, frame):
raise DownloadTimeoutException("Download timed out after 40 seconds") raise DownloadTimeoutException("Download timed out after 40 seconds")
def fetch_releases(version_date: str) -> list: def _fetch_releases_from_repo(year: str, version_date: str) -> list:
"""Fetch GitHub releases for a given version date from adsblol.""" """Fetch GitHub releases for a given version date from a specific year's adsblol repo."""
year = version_date.split('.')[0][1:]
if version_date == "v2024.12.31":
year = "2025"
BASE_URL = f"https://api.github.com/repos/adsblol/globe_history_{year}/releases" BASE_URL = f"https://api.github.com/repos/adsblol/globe_history_{year}/releases"
# Match both normal and tmp releases
PATTERN = rf"^{re.escape(version_date)}-planes-readsb-prod-\d+(tmp)?$" PATTERN = rf"^{re.escape(version_date)}-planes-readsb-prod-\d+(tmp)?$"
releases = [] releases = []
page = 1 page = 1
@@ -123,6 +119,25 @@ def fetch_releases(version_date: str) -> list:
return releases return releases
def fetch_releases(version_date: str) -> list:
    """Return the adsblol GitHub releases matching ``version_date``.

    The repository year is read from the tag itself: the text before the
    first ``.`` with its leading character dropped (``"v2024.12.31"`` ->
    ``"2024"``). If that repository yields nothing and the tag ends in
    ``.12.31``, the following year's repository is queried as well, because
    adsblol sometimes publishes Dec 31 data in the next year's repository.
    """
    tag_prefix, _, _ = version_date.partition('.')
    repo_year = tag_prefix[1:]

    found = _fetch_releases_from_repo(repo_year, version_date)
    if found or not version_date.endswith(".12.31"):
        return found

    # Last day of the year with no hits: fall back to next year's repo.
    following_year = str(int(repo_year) + 1)
    print(f"No releases found for {version_date} in {repo_year} repo, checking {following_year} repo...")
    return _fetch_releases_from_repo(following_year, version_date)
def download_asset(asset_url: str, file_path: str) -> bool: def download_asset(asset_url: str, file_path: str) -> bool:
"""Download a single release asset.""" """Download a single release asset."""
os.makedirs(os.path.dirname(file_path) or OUTPUT_DIR, exist_ok=True) os.makedirs(os.path.dirname(file_path) or OUTPUT_DIR, exist_ok=True)
-97
View File
@@ -1,97 +0,0 @@
"""
Reduce step: downloads all chunk CSVs from S3, combines them,
deduplicates across the full dataset, and uploads the final result.
Environment variables:
S3_BUCKET — bucket with intermediate results
RUN_ID — run identifier matching the map workers
GLOBAL_START_DATE — overall start date for output filename
GLOBAL_END_DATE — overall end date for output filename
"""
import gzip
import os
import shutil
from pathlib import Path
import boto3
import polars as pl
from compress_adsb_to_aircraft_data import COLUMNS, deduplicate_by_signature
def main():
    """Combine every chunk of one run into a single deduplicated gzip CSV.

    Reads ``S3_BUCKET``, ``GLOBAL_START_DATE`` and ``GLOBAL_END_DATE``
    (required) and ``RUN_ID`` (default ``"default"``) from the environment.
    Every ``*.csv.gz`` object under ``intermediate/{run_id}/`` is downloaded,
    decompressed and loaded; the frames are concatenated, passed through
    ``deduplicate_by_signature`` and uploaded to
    ``final/openairframes_adsb_{start}_{end}.csv.gz``. Returns early, without
    uploading, when no chunk is found.
    """
    bucket = os.environ["S3_BUCKET"]
    run_id = os.environ.get("RUN_ID", "default")
    global_start = os.environ["GLOBAL_START_DATE"]
    global_end = os.environ["GLOBAL_END_DATE"]

    client = boto3.client("s3")

    # Collect the keys of all gzip-compressed chunk CSVs for this run, sorted.
    pages = client.get_paginator("list_objects_v2").paginate(
        Bucket=bucket, Prefix=f"intermediate/{run_id}/"
    )
    chunk_keys = sorted(
        entry["Key"]
        for page in pages
        for entry in page.get("Contents", [])
        if entry["Key"].endswith(".csv.gz")
    )
    print(f"Found {len(chunk_keys)} chunks to combine")
    if not chunk_keys:
        print("No chunks found — nothing to reduce.")
        return

    scratch_dir = Path("/tmp/chunks")
    scratch_dir.mkdir(parents=True, exist_ok=True)

    frames = []
    for key in chunk_keys:
        compressed = scratch_dir / Path(key).name
        plain = compressed.with_suffix("")  # strip the trailing ".gz"
        print(f"Downloading {key}...")
        client.download_file(bucket, key, str(compressed))

        # Decompress to disk, then drop the archive straight away.
        with gzip.open(compressed, 'rb') as src, open(plain, 'wb') as dst:
            shutil.copyfileobj(src, dst)
        compressed.unlink()

        frame = pl.read_csv(plain)
        print(f" Loaded {frame.height} rows from {plain.name}")
        frames.append(frame)
        # The CSV is no longer needed once it is in memory.
        plain.unlink()

    merged = pl.concat(frames) if frames else pl.DataFrame()
    print(f"Combined: {merged.height} rows before dedup")

    # Deduplicate across the whole combined dataset.
    merged = deduplicate_by_signature(merged)
    print(f"After dedup: {merged.height} rows")

    # Write a plain CSV first, gzip it, then remove the plain copy.
    output_name = f"openairframes_adsb_{global_start}_{global_end}.csv.gz"
    plain_output = Path(f"/tmp/openairframes_adsb_{global_start}_{global_end}.csv")
    gz_output = Path(f"/tmp/{output_name}")
    merged.write_csv(plain_output)
    with open(plain_output, 'rb') as src, gzip.open(gz_output, 'wb') as dst:
        shutil.copyfileobj(src, dst)
    plain_output.unlink()

    final_key = f"final/{output_name}"
    print(f"Uploading to s3://{bucket}/{final_key}")
    client.upload_file(str(gz_output), bucket, final_key)
    print(f"Final output: {merged.height} records -> {final_key}")


if __name__ == "__main__":
    main()
-89
View File
@@ -1,89 +0,0 @@
"""
Map worker: processes a date range chunk, uploads result to S3.
Environment variables:
START_DATE — inclusive, YYYY-MM-DD
END_DATE — exclusive, YYYY-MM-DD
S3_BUCKET — bucket for intermediate results
RUN_ID — unique run identifier for namespacing S3 keys
"""
import os
import sys
from datetime import datetime, timedelta
from pathlib import Path
import boto3
import polars as pl
from compress_adsb_to_aircraft_data import (
load_historical_for_day,
deduplicate_by_signature,
COLUMNS,
)
def main():
    """Process one date-range chunk and upload the deduplicated result to S3.

    Reads ``START_DATE`` (inclusive) and ``END_DATE`` (exclusive), both in
    ``YYYY-MM-DD`` form, plus ``S3_BUCKET`` and the optional ``RUN_ID``
    (default ``"default"``) from the environment. Each day in the range is
    loaded with ``load_historical_for_day``; the days are concatenated,
    deduplicated with ``deduplicate_by_signature``, written as a gzip CSV
    under ``/tmp`` and uploaded to
    ``intermediate/{run_id}/chunk_{START_DATE}_{END_DATE}.csv.gz``.

    Raises:
        KeyError: If a required environment variable is missing.
        ValueError: If a date does not match ``%Y-%m-%d``.
        RuntimeError: If any day in the range yields zero rows.
    """
    # Function-scope imports, done once rather than inside the day loop.
    import gzip
    import shutil

    start_date_str = os.environ["START_DATE"]
    end_date_str = os.environ["END_DATE"]
    s3_bucket = os.environ["S3_BUCKET"]
    run_id = os.environ.get("RUN_ID", "default")

    start_date = datetime.strptime(start_date_str, "%Y-%m-%d")
    end_date = datetime.strptime(end_date_str, "%Y-%m-%d")
    total_days = (end_date - start_date).days
    print(f"Worker: processing {total_days} days [{start_date_str}, {end_date_str})")

    cache_dir = Path("data/adsb")  # loop-invariant local download cache
    dfs = []
    total_rows = 0  # running count; avoids re-summing every frame each day
    # Both dates parse to midnight, so range(total_days) visits exactly the
    # days in [start_date, end_date); it is empty when the range is empty.
    for offset in range(total_days):
        current_date = start_date + timedelta(days=offset)
        day_str = current_date.strftime("%Y-%m-%d")
        print(f" Loading {day_str}...")

        df_compressed = load_historical_for_day(current_date)
        if df_compressed.height == 0:
            raise RuntimeError(f"No data found for {day_str}")
        dfs.append(df_compressed)
        total_rows += df_compressed.height
        print(f" +{df_compressed.height} rows (total: {total_rows})")

        # Delete local cache after each day to save disk in the container.
        if cache_dir.exists():
            shutil.rmtree(cache_dir)

    # Concatenate all days, then deduplicate within this chunk.
    df_accumulated = pl.concat(dfs) if dfs else pl.DataFrame()
    df_accumulated = deduplicate_by_signature(df_accumulated)
    print(f"After dedup: {df_accumulated.height} rows")

    # Write a plain CSV, gzip it, then remove the uncompressed copy.
    local_path = Path(f"/tmp/chunk_{start_date_str}_{end_date_str}.csv")
    df_accumulated.write_csv(local_path)
    gz_path = Path(f"/tmp/chunk_{start_date_str}_{end_date_str}.csv.gz")
    with open(local_path, 'rb') as f_in, gzip.open(gz_path, 'wb') as f_out:
        shutil.copyfileobj(f_in, f_out)
    local_path.unlink()

    s3_key = f"intermediate/{run_id}/chunk_{start_date_str}_{end_date_str}.csv.gz"
    print(f"Uploading to s3://{s3_bucket}/{s3_key}")
    s3 = boto3.client("s3")
    s3.upload_file(str(gz_path), s3_bucket, s3_key)
    print("Done.")


if __name__ == "__main__":
    main()
+1 -1
View File
@@ -77,7 +77,7 @@ if __name__ == '__main__':
OUT_ROOT = Path("data/openairframes") OUT_ROOT = Path("data/openairframes")
OUT_ROOT.mkdir(parents=True, exist_ok=True) OUT_ROOT.mkdir(parents=True, exist_ok=True)
output_file = OUT_ROOT / f"openairframes_adsb_{start_date_str}_{date_str}.csv" output_file = OUT_ROOT / f"openairframes_adsb_{start_date_str}_{date_str}.csv.gz"
df_combined.write_csv(output_file) df_combined.write_csv(output_file)
print(f"Saved: {output_file}") print(f"Saved: {output_file}")
+4 -2
View File
@@ -119,6 +119,7 @@ def download_latest_aircraft_csv(
Returns: Returns:
Path to the downloaded file Path to the downloaded file
""" """
output_dir = Path(output_dir)
assets = get_latest_release_assets(repo, github_token=github_token) assets = get_latest_release_assets(repo, github_token=github_token)
try: try:
asset = pick_asset(assets, name_regex=r"^openairframes_faa_.*\.csv$") asset = pick_asset(assets, name_regex=r"^openairframes_faa_.*\.csv$")
@@ -164,8 +165,9 @@ def download_latest_aircraft_adsb_csv(
Returns: Returns:
Path to the downloaded file Path to the downloaded file
""" """
output_dir = Path(output_dir)
assets = get_latest_release_assets(repo, github_token=github_token) assets = get_latest_release_assets(repo, github_token=github_token)
asset = pick_asset(assets, name_regex=r"^openairframes_adsb_.*\.csv$") asset = pick_asset(assets, name_regex=r"^openairframes_adsb_.*\.csv(\.gz)?$")
saved_to = download_asset(asset, output_dir / asset.name, github_token=github_token) saved_to = download_asset(asset, output_dir / asset.name, github_token=github_token)
print(f"Downloaded: {asset.name} ({asset.size} bytes) -> {saved_to}") print(f"Downloaded: {asset.name} ({asset.size} bytes) -> {saved_to}")
return saved_to return saved_to
@@ -176,7 +178,7 @@ def get_latest_aircraft_adsb_csv_df():
import pandas as pd import pandas as pd
df = pd.read_csv(csv_path) df = pd.read_csv(csv_path)
df = df.fillna("") df = df.fillna("")
# Extract start date from filename pattern: openairframes_adsb_{start_date}_{end_date}.csv # Extract start date from filename pattern: openairframes_adsb_{start_date}_{end_date}.csv[.gz]
match = re.search(r"openairframes_adsb_(\d{4}-\d{2}-\d{2})_", str(csv_path)) match = re.search(r"openairframes_adsb_(\d{4}-\d{2}-\d{2})_", str(csv_path))
if not match: if not match:
raise ValueError(f"Could not extract date from filename: {csv_path.name}") raise ValueError(f"Could not extract date from filename: {csv_path.name}")