mirror of
https://github.com/PlaneQuery/OpenAirframes.git
synced 2026-04-23 11:36:35 +02:00
Refactor concat_parquet_to_final.py to accept date as an argument and streamline file handling
This commit is contained in:
@@ -1,29 +1,34 @@
|
||||
from pathlib import Path
|
||||
import polars as pl
|
||||
import argparse
|
||||
|
||||
OUTPUT_DIR = Path("./data/output")
|
||||
OUTPUT_DIR = Path("./outputs")
|
||||
|
||||
compressed_dir = OUTPUT_DIR / "compressed"
|
||||
date_dirs = sorted(p for p in compressed_dir.iterdir() if p.is_dir())
|
||||
if not date_dirs:
|
||||
raise FileNotFoundError(f"No date folders found in {compressed_dir}")
|
||||
def main(argv=None):
    """Concatenate one day's compressed parquet files into one parquet and one CSV.

    Reads every ``*.parquet`` file in ``OUTPUT_DIR / "compressed" / <date>``,
    stacks them vertically, sorts the rows by ``time`` then ``icao``, and
    writes the result into ``OUTPUT_DIR`` as
    ``openairframes_adsb_<date>_<date>.parquet`` and the matching ``.csv``.

    Args:
        argv: Command-line arguments to parse. ``None`` (the default) makes
            argparse read ``sys.argv[1:]``, which is the script behaviour.

    Raises:
        SystemExit: If ``--date`` is missing or is not a real calendar date
            written exactly as YYYY-MM-DD (argparse usage error, exit code 2).
        FileNotFoundError: If the date folder does not exist or contains no
            parquet files.
    """
    from datetime import date as _date

    def _iso_date(value):
        """Return *value* unchanged if it is a strict YYYY-MM-DD date."""
        try:
            parsed = _date.fromisoformat(value)
        except ValueError:
            parsed = None
        # The round-trip comparison rejects the alternative ISO spellings that
        # newer Python versions accept (e.g. "20240101"), so the value that
        # ends up in paths is always the canonical ten-character form.
        if parsed is None or parsed.isoformat() != value:
            raise argparse.ArgumentTypeError(
                f"invalid date {value!r}; expected YYYY-MM-DD"
            )
        return value

    parser = argparse.ArgumentParser(description="Concatenate compressed parquet files for a single day")
    # The date is interpolated into the input directory and both output file
    # names, so it is validated up front: a value such as ".." must not be able
    # to point the script at a different directory.
    parser.add_argument("--date", type=_iso_date, required=True, help="Date in YYYY-MM-DD format")
    args = parser.parse_args(argv)

    compressed_dir = OUTPUT_DIR / "compressed"
    date_dir = compressed_dir / args.date
    if not date_dir.is_dir():
        raise FileNotFoundError(f"No date folder found: {date_dir}")

    # Sorted so the concatenation order does not depend on directory listing order.
    parquet_files = sorted(date_dir.glob("*.parquet"))
    if not parquet_files:
        raise FileNotFoundError(f"No parquet files found in {date_dir}")

    frames = [pl.read_parquet(p) for p in parquet_files]
    df = pl.concat(frames, how="vertical", rechunk=True)

    df = df.sort(["time", "icao"])
    # Start and end date are the same because this script handles a single day.
    output_path = OUTPUT_DIR / f"openairframes_adsb_{args.date}_{args.date}.parquet"
    print(f"Writing combined parquet to {output_path} with {df.height} rows")
    df.write_parquet(output_path)

    csv_output_path = OUTPUT_DIR / f"openairframes_adsb_{args.date}_{args.date}.csv"
    print(f"Writing combined csv to {csv_output_path} with {df.height} rows")
    df.write_csv(csv_output_path)


if __name__ == "__main__":
    main()
|
||||
Reference in New Issue
Block a user