mirror of
https://github.com/PlaneQuery/OpenAirframes.git
synced 2026-06-08 06:03:55 +02:00
remvoe aws worker, reducer
This commit is contained in:
@@ -1,11 +0,0 @@
|
|||||||
FROM --platform=linux/arm64 python:3.12-slim
|
|
||||||
|
|
||||||
WORKDIR /app
|
|
||||||
|
|
||||||
COPY requirements.reducer.txt requirements.txt
|
|
||||||
RUN pip install --no-cache-dir -r requirements.txt
|
|
||||||
|
|
||||||
COPY compress_adsb_to_aircraft_data.py .
|
|
||||||
COPY reducer.py .
|
|
||||||
|
|
||||||
CMD ["python", "-u", "reducer.py"]
|
|
||||||
@@ -1,12 +0,0 @@
|
|||||||
FROM --platform=linux/arm64 python:3.12-slim
|
|
||||||
|
|
||||||
WORKDIR /app
|
|
||||||
|
|
||||||
COPY requirements.worker.txt requirements.txt
|
|
||||||
RUN pip install --no-cache-dir -r requirements.txt
|
|
||||||
|
|
||||||
COPY compress_adsb_to_aircraft_data.py .
|
|
||||||
COPY download_adsb_data_to_parquet.py .
|
|
||||||
COPY worker.py .
|
|
||||||
|
|
||||||
CMD ["python", "-u", "worker.py"]
|
|
||||||
@@ -1,97 +0,0 @@
|
|||||||
"""
|
|
||||||
Reduce step: downloads all chunk CSVs from S3, combines them,
|
|
||||||
deduplicates across the full dataset, and uploads the final result.
|
|
||||||
|
|
||||||
Environment variables:
|
|
||||||
S3_BUCKET — bucket with intermediate results
|
|
||||||
RUN_ID — run identifier matching the map workers
|
|
||||||
GLOBAL_START_DATE — overall start date for output filename
|
|
||||||
GLOBAL_END_DATE — overall end date for output filename
|
|
||||||
"""
|
|
||||||
import gzip
|
|
||||||
import os
|
|
||||||
import shutil
|
|
||||||
from pathlib import Path
|
|
||||||
|
|
||||||
import boto3
|
|
||||||
import polars as pl
|
|
||||||
|
|
||||||
from compress_adsb_to_aircraft_data import COLUMNS, deduplicate_by_signature
|
|
||||||
|
|
||||||
|
|
||||||
def main():
|
|
||||||
s3_bucket = os.environ["S3_BUCKET"]
|
|
||||||
run_id = os.environ.get("RUN_ID", "default")
|
|
||||||
global_start = os.environ["GLOBAL_START_DATE"]
|
|
||||||
global_end = os.environ["GLOBAL_END_DATE"]
|
|
||||||
|
|
||||||
s3 = boto3.client("s3")
|
|
||||||
prefix = f"intermediate/{run_id}/"
|
|
||||||
|
|
||||||
# List all chunk files for this run
|
|
||||||
paginator = s3.get_paginator("list_objects_v2")
|
|
||||||
chunk_keys = []
|
|
||||||
for page in paginator.paginate(Bucket=s3_bucket, Prefix=prefix):
|
|
||||||
for obj in page.get("Contents", []):
|
|
||||||
if obj["Key"].endswith(".csv.gz"):
|
|
||||||
chunk_keys.append(obj["Key"])
|
|
||||||
|
|
||||||
chunk_keys.sort()
|
|
||||||
print(f"Found {len(chunk_keys)} chunks to combine")
|
|
||||||
|
|
||||||
if not chunk_keys:
|
|
||||||
print("No chunks found — nothing to reduce.")
|
|
||||||
return
|
|
||||||
|
|
||||||
# Download and concatenate all chunks
|
|
||||||
download_dir = Path("/tmp/chunks")
|
|
||||||
download_dir.mkdir(parents=True, exist_ok=True)
|
|
||||||
|
|
||||||
dfs = []
|
|
||||||
|
|
||||||
for key in chunk_keys:
|
|
||||||
gz_path = download_dir / Path(key).name
|
|
||||||
csv_path = gz_path.with_suffix("") # Remove .gz
|
|
||||||
print(f"Downloading {key}...")
|
|
||||||
s3.download_file(s3_bucket, key, str(gz_path))
|
|
||||||
|
|
||||||
# Decompress
|
|
||||||
with gzip.open(gz_path, 'rb') as f_in:
|
|
||||||
with open(csv_path, 'wb') as f_out:
|
|
||||||
shutil.copyfileobj(f_in, f_out)
|
|
||||||
gz_path.unlink()
|
|
||||||
|
|
||||||
df_chunk = pl.read_csv(csv_path)
|
|
||||||
print(f" Loaded {df_chunk.height} rows from {csv_path.name}")
|
|
||||||
dfs.append(df_chunk)
|
|
||||||
|
|
||||||
# Free disk space after loading
|
|
||||||
csv_path.unlink()
|
|
||||||
|
|
||||||
df_accumulated = pl.concat(dfs) if dfs else pl.DataFrame()
|
|
||||||
print(f"Combined: {df_accumulated.height} rows before dedup")
|
|
||||||
|
|
||||||
# Final global deduplication
|
|
||||||
df_accumulated = deduplicate_by_signature(df_accumulated)
|
|
||||||
print(f"After dedup: {df_accumulated.height} rows")
|
|
||||||
|
|
||||||
# Write and upload final result
|
|
||||||
output_name = f"openairframes_adsb_{global_start}_{global_end}.csv.gz"
|
|
||||||
csv_output = Path(f"/tmp/openairframes_adsb_{global_start}_{global_end}.csv")
|
|
||||||
gz_output = Path(f"/tmp/{output_name}")
|
|
||||||
|
|
||||||
df_accumulated.write_csv(csv_output)
|
|
||||||
with open(csv_output, 'rb') as f_in:
|
|
||||||
with gzip.open(gz_output, 'wb') as f_out:
|
|
||||||
shutil.copyfileobj(f_in, f_out)
|
|
||||||
csv_output.unlink()
|
|
||||||
|
|
||||||
final_key = f"final/{output_name}"
|
|
||||||
print(f"Uploading to s3://{s3_bucket}/{final_key}")
|
|
||||||
s3.upload_file(str(gz_output), s3_bucket, final_key)
|
|
||||||
|
|
||||||
print(f"Final output: {df_accumulated.height} records -> {final_key}")
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
main()
|
|
||||||
@@ -1,89 +0,0 @@
|
|||||||
"""
|
|
||||||
Map worker: processes a date range chunk, uploads result to S3.
|
|
||||||
|
|
||||||
Environment variables:
|
|
||||||
START_DATE — inclusive, YYYY-MM-DD
|
|
||||||
END_DATE — exclusive, YYYY-MM-DD
|
|
||||||
S3_BUCKET — bucket for intermediate results
|
|
||||||
RUN_ID — unique run identifier for namespacing S3 keys
|
|
||||||
"""
|
|
||||||
import os
|
|
||||||
import sys
|
|
||||||
from datetime import datetime, timedelta
|
|
||||||
from pathlib import Path
|
|
||||||
|
|
||||||
import boto3
|
|
||||||
import polars as pl
|
|
||||||
|
|
||||||
from compress_adsb_to_aircraft_data import (
|
|
||||||
load_historical_for_day,
|
|
||||||
deduplicate_by_signature,
|
|
||||||
COLUMNS,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def main():
|
|
||||||
start_date_str = os.environ["START_DATE"]
|
|
||||||
end_date_str = os.environ["END_DATE"]
|
|
||||||
s3_bucket = os.environ["S3_BUCKET"]
|
|
||||||
run_id = os.environ.get("RUN_ID", "default")
|
|
||||||
|
|
||||||
start_date = datetime.strptime(start_date_str, "%Y-%m-%d")
|
|
||||||
end_date = datetime.strptime(end_date_str, "%Y-%m-%d")
|
|
||||||
|
|
||||||
total_days = (end_date - start_date).days
|
|
||||||
print(f"Worker: processing {total_days} days [{start_date_str}, {end_date_str})")
|
|
||||||
|
|
||||||
dfs = []
|
|
||||||
current_date = start_date
|
|
||||||
|
|
||||||
while current_date < end_date:
|
|
||||||
day_str = current_date.strftime("%Y-%m-%d")
|
|
||||||
print(f" Loading {day_str}...")
|
|
||||||
|
|
||||||
df_compressed = load_historical_for_day(current_date)
|
|
||||||
if df_compressed.height == 0:
|
|
||||||
raise RuntimeError(f"No data found for {day_str}")
|
|
||||||
|
|
||||||
dfs.append(df_compressed)
|
|
||||||
total_rows = sum(df.height for df in dfs)
|
|
||||||
print(f" +{df_compressed.height} rows (total: {total_rows})")
|
|
||||||
|
|
||||||
# Delete local cache after each day to save disk in container
|
|
||||||
cache_dir = Path("data/adsb")
|
|
||||||
if cache_dir.exists():
|
|
||||||
import shutil
|
|
||||||
shutil.rmtree(cache_dir)
|
|
||||||
|
|
||||||
current_date += timedelta(days=1)
|
|
||||||
|
|
||||||
# Concatenate all days
|
|
||||||
df_accumulated = pl.concat(dfs) if dfs else pl.DataFrame()
|
|
||||||
|
|
||||||
# Deduplicate within this chunk
|
|
||||||
df_accumulated = deduplicate_by_signature(df_accumulated)
|
|
||||||
print(f"After dedup: {df_accumulated.height} rows")
|
|
||||||
|
|
||||||
# Write to local file then upload to S3
|
|
||||||
local_path = Path(f"/tmp/chunk_{start_date_str}_{end_date_str}.csv")
|
|
||||||
df_accumulated.write_csv(local_path)
|
|
||||||
|
|
||||||
# Compress with gzip
|
|
||||||
import gzip
|
|
||||||
import shutil
|
|
||||||
gz_path = Path(f"/tmp/chunk_{start_date_str}_{end_date_str}.csv.gz")
|
|
||||||
with open(local_path, 'rb') as f_in:
|
|
||||||
with gzip.open(gz_path, 'wb') as f_out:
|
|
||||||
shutil.copyfileobj(f_in, f_out)
|
|
||||||
local_path.unlink() # Remove uncompressed file
|
|
||||||
|
|
||||||
s3_key = f"intermediate/{run_id}/chunk_{start_date_str}_{end_date_str}.csv.gz"
|
|
||||||
print(f"Uploading to s3://{s3_bucket}/{s3_key}")
|
|
||||||
|
|
||||||
s3 = boto3.client("s3")
|
|
||||||
s3.upload_file(str(gz_path), s3_bucket, s3_key)
|
|
||||||
print("Done.")
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
main()
|
|
||||||
Reference in New Issue
Block a user