Compare commits

...

13 Commits

Author SHA1 Message Date
ggman12 a8b2b66952 fix .csv to .csv.gz transition 2026-02-15 19:08:51 -05:00
ggman12 3f38263a0c stop depue that destroys previous days 2026-02-15 17:55:16 -05:00
ggman12 1a553d5f44 use date of file instead of min timestamp 2026-02-15 16:44:09 -05:00
ggman12 405855c566 deal with whole schema 2026-02-15 16:43:00 -05:00
ggman12 4e81dde201 fix date parsing 2026-02-15 14:55:32 -05:00
ggman12 fde8ef029c update csv writing to handle empty data. Save space with higher gzip compression 2026-02-15 14:14:54 -05:00
ggman12 18ab51bd60 update naming 2026-02-15 13:45:03 -05:00
ggman12 83b9d2a76d write gzip 2026-02-15 13:41:09 -05:00
ggman12 8874619ab0 write gzip 2026-02-15 13:41:02 -05:00
ggman12 823f291728 fix errors in daily release due to new .gz file 2026-02-15 13:21:51 -05:00
ggman12 982011b36f end of year check 2026-02-14 22:42:32 -05:00
ggman12 1b15e43669 use .csv.gz 2026-02-14 22:22:14 -05:00
ggman12 f17adc4574 remvoe aws worker, reducer 2026-02-14 22:21:14 -05:00
11 changed files with 95 additions and 276 deletions
+1 -1
View File
@@ -282,5 +282,5 @@ jobs:
uses: actions/upload-artifact@v4
with:
name: openairframes_adsb-${{ needs.generate-matrix.outputs.global_start }}-${{ needs.generate-matrix.outputs.global_end }}
path: data/openairframes/*.csv
path: data/openairframes/*.csv.gz
retention-days: 30
@@ -227,7 +227,7 @@ jobs:
uses: actions/upload-artifact@v4
with:
name: adsb-release
path: data/openairframes/openairframes_adsb_*.csv
path: data/openairframes/openairframes_adsb_*.csv.gz
retention-days: 1
build-community:
@@ -388,7 +388,7 @@ jobs:
# Find files from artifacts using find (handles nested structures)
CSV_FILE_FAA=$(find artifacts/faa -name "openairframes_faa_*.csv" -type f 2>/dev/null | head -1)
CSV_FILE_ADSB=$(find artifacts/adsb -name "openairframes_adsb_*.csv" -type f 2>/dev/null | head -1)
CSV_FILE_ADSB=$(find artifacts/adsb -name "openairframes_adsb_*.csv.gz" -type f 2>/dev/null | head -1)
CSV_FILE_COMMUNITY=$(find artifacts/community -name "openairframes_community_*.csv" -type f 2>/dev/null | head -1)
ZIP_FILE=$(find artifacts/faa -name "ReleasableAircraft_*.zip" -type f 2>/dev/null | head -1)
JSON_FILE_ADSBX=$(find artifacts/adsbexchange -name "basic-ac-db_*.json.gz" -type f 2>/dev/null | head -1)
-11
View File
@@ -1,11 +0,0 @@
FROM --platform=linux/arm64 python:3.12-slim
WORKDIR /app
COPY requirements.reducer.txt requirements.txt
RUN pip install --no-cache-dir -r requirements.txt
COPY compress_adsb_to_aircraft_data.py .
COPY reducer.py .
CMD ["python", "-u", "reducer.py"]
-12
View File
@@ -1,12 +0,0 @@
FROM --platform=linux/arm64 python:3.12-slim
WORKDIR /app
COPY requirements.worker.txt requirements.txt
RUN pip install --no-cache-dir -r requirements.txt
COPY compress_adsb_to_aircraft_data.py .
COPY download_adsb_data_to_parquet.py .
COPY worker.py .
CMD ["python", "-u", "worker.py"]
+65 -54
View File
@@ -14,11 +14,12 @@ Usage:
python -m src.adsb.combine_chunks_to_csv --chunks-dir data/output/adsb_chunks --start-date 2024-01-01 --end-date 2024-01-07 --skip-base
"""
import gc
import gzip
import os
import sys
import glob
import argparse
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone
import polars as pl
@@ -33,7 +34,7 @@ os.makedirs(FINAL_OUTPUT_DIR, exist_ok=True)
def get_target_day() -> datetime:
"""Get yesterday's date (the day we're processing)."""
return datetime.utcnow() - timedelta(days=1)
return datetime.now(timezone.utc) - timedelta(days=1)
def process_single_chunk(chunk_path: str, delete_after_load: bool = False) -> pl.DataFrame:
@@ -83,58 +84,53 @@ def combine_compressed_chunks(compressed_dfs: list[pl.DataFrame]) -> pl.DataFram
return combined
def download_and_merge_base_release(compressed_df: pl.DataFrame) -> pl.DataFrame:
"""Download base release and merge with new data."""
def download_and_merge_base_release(compressed_df: pl.DataFrame) -> tuple[pl.DataFrame, str | None]:
"""Download base release and merge with new data.
Returns:
Tuple of (merged_df, earliest_date_str) where earliest_date_str is None if no base release was merged
"""
from src.get_latest_release import download_latest_aircraft_adsb_csv
print("Downloading base ADS-B release...")
try:
base_path = download_latest_aircraft_adsb_csv(
output_dir="./data/openairframes_base"
)
print(f"Download returned: {base_path}")
if base_path and os.path.exists(str(base_path)):
print(f"Loading base release from {base_path}")
base_df = pl.read_csv(base_path)
print(f"Base release has {len(base_df)} records")
# Ensure columns match
base_cols = set(base_df.columns)
new_cols = set(compressed_df.columns)
print(f"Base columns: {sorted(base_cols)}")
print(f"New columns: {sorted(new_cols)}")
# Add missing columns
for col in new_cols - base_cols:
base_df = base_df.with_columns(pl.lit(None).alias(col))
for col in base_cols - new_cols:
compressed_df = compressed_df.with_columns(pl.lit(None).alias(col))
# Reorder columns to match
compressed_df = compressed_df.select(base_df.columns)
# Concat and deduplicate by icao (keep new data - it comes last)
combined = pl.concat([base_df, compressed_df])
print(f"After concat: {len(combined)} records")
deduplicated = combined.unique(subset=["icao"], keep="last")
print(f"Combined with base: {len(combined)} -> {len(deduplicated)} after dedup")
del base_df, combined
gc.collect()
return deduplicated
else:
print(f"No base release found at {base_path}, using only new data")
return compressed_df
except Exception as e:
import traceback
print(f"Failed to download base release: {e}")
traceback.print_exc()
return compressed_df
base_path = download_latest_aircraft_adsb_csv(
output_dir="./data/openairframes_base"
)
print(f"Download returned: {base_path}")
print(f"Loading base release from {base_path}")
# Extract start date from filename (e.g., openairframes_adsb_2025-05-01_2026-02-14.csv.gz)
import re
filename = os.path.basename(str(base_path))
match = re.search(r'openairframes_adsb_(\d{4}-\d{2}-\d{2})_', filename)
earliest_date = match.group(1) if match else None
print(f"Start date from base filename: {earliest_date}")
# Read CSV with schema matching the new data
base_df = pl.read_csv(base_path, schema=compressed_df.schema)
print(f"Base release has {len(base_df)} records")
# Ensure columns match
base_cols = set(base_df.columns)
new_cols = set(compressed_df.columns)
print(f"Base columns: {sorted(base_cols)}")
print(f"New columns: {sorted(new_cols)}")
# Add missing columns
for col in new_cols - base_cols:
base_df = base_df.with_columns(pl.lit(None).alias(col))
for col in base_cols - new_cols:
compressed_df = compressed_df.with_columns(pl.lit(None).alias(col))
# Reorder columns to match
compressed_df = compressed_df.select(base_df.columns)
# Concat and deduplicate by icao (keep new data - it comes last)
combined = pl.concat([base_df, compressed_df])
print(f"After concat: {len(combined)} records")
return combined, earliest_date
def cleanup_chunks(output_id: str, chunks_dir: str):
"""Delete chunk parquet files after successful merge."""
@@ -176,7 +172,7 @@ def main():
if args.start_date and args.end_date:
# Historical mode
output_id = f"{args.start_date}_{args.end_date}"
output_filename = f"openairframes_adsb_{args.start_date}_{args.end_date}.csv"
output_filename = f"openairframes_adsb_{args.start_date}_{args.end_date}.csv.gz"
print(f"Combining chunks for date range: {args.start_date} to {args.end_date}")
else:
# Daily mode - use same date for start and end
@@ -187,7 +183,7 @@ def main():
date_str = target_day.strftime("%Y-%m-%d")
output_id = date_str
output_filename = f"openairframes_adsb_{date_str}_{date_str}.csv"
output_filename = f"openairframes_adsb_{date_str}_{date_str}.csv.gz"
print(f"Combining chunks for {date_str}")
chunks_dir = args.chunks_dir
@@ -220,8 +216,15 @@ def main():
print(f"After combining: {get_resource_usage()}")
# Merge with base release (unless skipped)
base_start_date = None
if not args.skip_base:
combined = download_and_merge_base_release(combined)
combined, base_start_date = download_and_merge_base_release(combined)
# Update filename if we merged with base release and got a start date
if base_start_date and not (args.start_date and args.end_date):
# Only update filename for daily mode when base was merged
output_filename = f"openairframes_adsb_{base_start_date}_{date_str}.csv.gz"
print(f"Updated filename to reflect date range: {output_filename}")
# Convert list columns to strings for CSV compatibility
for col in combined.columns:
@@ -234,9 +237,17 @@ def main():
if 'time' in combined.columns:
combined = combined.sort('time')
# Replace empty strings with null across all string columns to avoid quoted empty strings
for col in combined.columns:
if combined[col].dtype == pl.Utf8:
combined = combined.with_columns(
pl.when(pl.col(col) == "").then(None).otherwise(pl.col(col)).alias(col)
)
# Write final CSV
output_path = os.path.join(FINAL_OUTPUT_DIR, output_filename)
combined.write_csv(output_path)
with gzip.open(output_path, "wb", compresslevel=9) as f:
combined.write_csv(f, null_value='', quote_style='necessary')
print(f"Wrote {len(combined)} records to {output_path}")
# Cleanup
+1 -1
View File
@@ -264,7 +264,7 @@ def get_latest_aircraft_adsb_csv_df():
if df[col].dtype == pl.Utf8:
df = df.with_columns(pl.col(col).fill_null(""))
# Extract start date from filename pattern: openairframes_adsb_{start_date}_{end_date}.csv
# Extract start date from filename pattern: openairframes_adsb_{start_date}_{end_date}.csv[.gz]
match = re.search(r"openairframes_adsb_(\d{4}-\d{2}-\d{2})_", str(csv_path))
if not match:
raise ValueError(f"Could not extract date from filename: {csv_path.name}")
+21 -6
View File
@@ -76,13 +76,9 @@ def timeout_handler(signum, frame):
raise DownloadTimeoutException("Download timed out after 40 seconds")
def fetch_releases(version_date: str) -> list:
"""Fetch GitHub releases for a given version date from adsblol."""
year = version_date.split('.')[0][1:]
if version_date == "v2024.12.31":
year = "2025"
def _fetch_releases_from_repo(year: str, version_date: str) -> list:
"""Fetch GitHub releases for a given version date from a specific year's adsblol repo."""
BASE_URL = f"https://api.github.com/repos/adsblol/globe_history_{year}/releases"
# Match both normal and tmp releases
PATTERN = rf"^{re.escape(version_date)}-planes-readsb-prod-\d+(tmp)?$"
releases = []
page = 1
@@ -123,6 +119,25 @@ def fetch_releases(version_date: str) -> list:
return releases
def fetch_releases(version_date: str) -> list:
"""Fetch GitHub releases for a given version date from adsblol.
For Dec 31 dates, if no releases are found in the current year's repo,
also checks the next year's repo (adsblol sometimes publishes Dec 31
data in the following year's repository).
"""
year = version_date.split('.')[0][1:]
releases = _fetch_releases_from_repo(year, version_date)
# For last day of year, also check next year's repo if nothing found
if not releases and version_date.endswith(".12.31"):
next_year = str(int(year) + 1)
print(f"No releases found for {version_date} in {year} repo, checking {next_year} repo...")
releases = _fetch_releases_from_repo(next_year, version_date)
return releases
def download_asset(asset_url: str, file_path: str) -> bool:
"""Download a single release asset."""
os.makedirs(os.path.dirname(file_path) or OUTPUT_DIR, exist_ok=True)
-97
View File
@@ -1,97 +0,0 @@
"""
Reduce step: downloads all chunk CSVs from S3, combines them,
deduplicates across the full dataset, and uploads the final result.
Environment variables:
S3_BUCKET — bucket with intermediate results
RUN_ID — run identifier matching the map workers
GLOBAL_START_DATE — overall start date for output filename
GLOBAL_END_DATE — overall end date for output filename
"""
import gzip
import os
import shutil
from pathlib import Path
import boto3
import polars as pl
from compress_adsb_to_aircraft_data import COLUMNS, deduplicate_by_signature
def main():
s3_bucket = os.environ["S3_BUCKET"]
run_id = os.environ.get("RUN_ID", "default")
global_start = os.environ["GLOBAL_START_DATE"]
global_end = os.environ["GLOBAL_END_DATE"]
s3 = boto3.client("s3")
prefix = f"intermediate/{run_id}/"
# List all chunk files for this run
paginator = s3.get_paginator("list_objects_v2")
chunk_keys = []
for page in paginator.paginate(Bucket=s3_bucket, Prefix=prefix):
for obj in page.get("Contents", []):
if obj["Key"].endswith(".csv.gz"):
chunk_keys.append(obj["Key"])
chunk_keys.sort()
print(f"Found {len(chunk_keys)} chunks to combine")
if not chunk_keys:
print("No chunks found — nothing to reduce.")
return
# Download and concatenate all chunks
download_dir = Path("/tmp/chunks")
download_dir.mkdir(parents=True, exist_ok=True)
dfs = []
for key in chunk_keys:
gz_path = download_dir / Path(key).name
csv_path = gz_path.with_suffix("") # Remove .gz
print(f"Downloading {key}...")
s3.download_file(s3_bucket, key, str(gz_path))
# Decompress
with gzip.open(gz_path, 'rb') as f_in:
with open(csv_path, 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)
gz_path.unlink()
df_chunk = pl.read_csv(csv_path)
print(f" Loaded {df_chunk.height} rows from {csv_path.name}")
dfs.append(df_chunk)
# Free disk space after loading
csv_path.unlink()
df_accumulated = pl.concat(dfs) if dfs else pl.DataFrame()
print(f"Combined: {df_accumulated.height} rows before dedup")
# Final global deduplication
df_accumulated = deduplicate_by_signature(df_accumulated)
print(f"After dedup: {df_accumulated.height} rows")
# Write and upload final result
output_name = f"openairframes_adsb_{global_start}_{global_end}.csv.gz"
csv_output = Path(f"/tmp/openairframes_adsb_{global_start}_{global_end}.csv")
gz_output = Path(f"/tmp/{output_name}")
df_accumulated.write_csv(csv_output)
with open(csv_output, 'rb') as f_in:
with gzip.open(gz_output, 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)
csv_output.unlink()
final_key = f"final/{output_name}"
print(f"Uploading to s3://{s3_bucket}/{final_key}")
s3.upload_file(str(gz_output), s3_bucket, final_key)
print(f"Final output: {df_accumulated.height} records -> {final_key}")
if __name__ == "__main__":
main()
-89
View File
@@ -1,89 +0,0 @@
"""
Map worker: processes a date range chunk, uploads result to S3.
Environment variables:
START_DATE — inclusive, YYYY-MM-DD
END_DATE — exclusive, YYYY-MM-DD
S3_BUCKET — bucket for intermediate results
RUN_ID — unique run identifier for namespacing S3 keys
"""
import os
import sys
from datetime import datetime, timedelta
from pathlib import Path
import boto3
import polars as pl
from compress_adsb_to_aircraft_data import (
load_historical_for_day,
deduplicate_by_signature,
COLUMNS,
)
def main():
start_date_str = os.environ["START_DATE"]
end_date_str = os.environ["END_DATE"]
s3_bucket = os.environ["S3_BUCKET"]
run_id = os.environ.get("RUN_ID", "default")
start_date = datetime.strptime(start_date_str, "%Y-%m-%d")
end_date = datetime.strptime(end_date_str, "%Y-%m-%d")
total_days = (end_date - start_date).days
print(f"Worker: processing {total_days} days [{start_date_str}, {end_date_str})")
dfs = []
current_date = start_date
while current_date < end_date:
day_str = current_date.strftime("%Y-%m-%d")
print(f" Loading {day_str}...")
df_compressed = load_historical_for_day(current_date)
if df_compressed.height == 0:
raise RuntimeError(f"No data found for {day_str}")
dfs.append(df_compressed)
total_rows = sum(df.height for df in dfs)
print(f" +{df_compressed.height} rows (total: {total_rows})")
# Delete local cache after each day to save disk in container
cache_dir = Path("data/adsb")
if cache_dir.exists():
import shutil
shutil.rmtree(cache_dir)
current_date += timedelta(days=1)
# Concatenate all days
df_accumulated = pl.concat(dfs) if dfs else pl.DataFrame()
# Deduplicate within this chunk
df_accumulated = deduplicate_by_signature(df_accumulated)
print(f"After dedup: {df_accumulated.height} rows")
# Write to local file then upload to S3
local_path = Path(f"/tmp/chunk_{start_date_str}_{end_date_str}.csv")
df_accumulated.write_csv(local_path)
# Compress with gzip
import gzip
import shutil
gz_path = Path(f"/tmp/chunk_{start_date_str}_{end_date_str}.csv.gz")
with open(local_path, 'rb') as f_in:
with gzip.open(gz_path, 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)
local_path.unlink() # Remove uncompressed file
s3_key = f"intermediate/{run_id}/chunk_{start_date_str}_{end_date_str}.csv.gz"
print(f"Uploading to s3://{s3_bucket}/{s3_key}")
s3 = boto3.client("s3")
s3.upload_file(str(gz_path), s3_bucket, s3_key)
print("Done.")
if __name__ == "__main__":
main()
+1 -1
View File
@@ -77,7 +77,7 @@ if __name__ == '__main__':
OUT_ROOT = Path("data/openairframes")
OUT_ROOT.mkdir(parents=True, exist_ok=True)
output_file = OUT_ROOT / f"openairframes_adsb_{start_date_str}_{date_str}.csv"
output_file = OUT_ROOT / f"openairframes_adsb_{start_date_str}_{date_str}.csv.gz"
df_combined.write_csv(output_file)
print(f"Saved: {output_file}")
+4 -2
View File
@@ -119,6 +119,7 @@ def download_latest_aircraft_csv(
Returns:
Path to the downloaded file
"""
output_dir = Path(output_dir)
assets = get_latest_release_assets(repo, github_token=github_token)
try:
asset = pick_asset(assets, name_regex=r"^openairframes_faa_.*\.csv$")
@@ -164,8 +165,9 @@ def download_latest_aircraft_adsb_csv(
Returns:
Path to the downloaded file
"""
output_dir = Path(output_dir)
assets = get_latest_release_assets(repo, github_token=github_token)
asset = pick_asset(assets, name_regex=r"^openairframes_adsb_.*\.csv$")
asset = pick_asset(assets, name_regex=r"^openairframes_adsb_.*\.csv(\.gz)?$")
saved_to = download_asset(asset, output_dir / asset.name, github_token=github_token)
print(f"Downloaded: {asset.name} ({asset.size} bytes) -> {saved_to}")
return saved_to
@@ -176,7 +178,7 @@ def get_latest_aircraft_adsb_csv_df():
import pandas as pd
df = pd.read_csv(csv_path)
df = df.fillna("")
# Extract start date from filename pattern: openairframes_adsb_{start_date}_{end_date}.csv
# Extract start date from filename pattern: openairframes_adsb_{start_date}_{end_date}.csv[.gz]
match = re.search(r"openairframes_adsb_(\d{4}-\d{2}-\d{2})_", str(csv_path))
if not match:
raise ValueError(f"Could not extract date from filename: {csv_path.name}")