OpenAirframes 1.0

This commit is contained in:
ggman12
2026-02-12 10:52:42 -05:00
parent f9e04337ae
commit 4015a5fcf1
33 changed files with 1212 additions and 1138 deletions
+24 -9
View File
@@ -27,7 +27,7 @@ from src.adsb.compress_adsb_to_aircraft_data import compress_multi_icao_df, COLU
DEFAULT_CHUNK_DIR = os.path.join(OUTPUT_DIR, "adsb_chunks")
FINAL_OUTPUT_DIR = "./data/planequery_aircraft"
FINAL_OUTPUT_DIR = "./data/openairframes"
os.makedirs(FINAL_OUTPUT_DIR, exist_ok=True)
@@ -36,8 +36,13 @@ def get_target_day() -> datetime:
return datetime.utcnow() - timedelta(days=1)
def process_single_chunk(chunk_path: str) -> pl.DataFrame:
"""Load and compress a single chunk parquet file."""
def process_single_chunk(chunk_path: str, delete_after_load: bool = False) -> pl.DataFrame:
"""Load and compress a single chunk parquet file.
Args:
chunk_path: Path to parquet file
delete_after_load: If True, delete the parquet file after loading to free disk space
"""
print(f"Processing {os.path.basename(chunk_path)}... | {get_resource_usage()}")
# Load chunk - only columns we need
@@ -45,6 +50,14 @@ def process_single_chunk(chunk_path: str) -> pl.DataFrame:
df = pl.read_parquet(chunk_path, columns=needed_columns)
print(f" Loaded {len(df)} rows")
# Delete file immediately after loading to free disk space
if delete_after_load:
try:
os.remove(chunk_path)
print(f" Deleted {chunk_path} to free disk space")
except Exception as e:
print(f" Warning: Failed to delete {chunk_path}: {e}")
# Compress to aircraft records (one per ICAO) using shared function
compressed = compress_multi_icao_df(df, verbose=True)
print(f" Compressed to {len(compressed)} aircraft records")
@@ -72,12 +85,12 @@ def combine_compressed_chunks(compressed_dfs: list[pl.DataFrame]) -> pl.DataFram
def download_and_merge_base_release(compressed_df: pl.DataFrame) -> pl.DataFrame:
"""Download base release and merge with new data."""
from src.get_latest_planequery_aircraft_release import download_latest_aircraft_adsb_csv
from src.get_latest_release import download_latest_aircraft_adsb_csv
print("Downloading base ADS-B release...")
try:
base_path = download_latest_aircraft_adsb_csv(
output_dir="./data/planequery_aircraft_base"
output_dir="./data/openairframes_base"
)
print(f"Download returned: {base_path}")
@@ -156,16 +169,17 @@ def main():
parser.add_argument("--chunks-dir", type=str, default=DEFAULT_CHUNK_DIR, help="Directory containing chunk parquet files")
parser.add_argument("--skip-base", action="store_true", help="Skip downloading and merging base release")
parser.add_argument("--keep-chunks", action="store_true", help="Keep chunk files after merging")
parser.add_argument("--stream", action="store_true", help="Delete parquet files immediately after loading to save disk space")
args = parser.parse_args()
# Determine output ID and filename based on mode
if args.start_date and args.end_date:
# Historical mode
output_id = f"{args.start_date}_{args.end_date}"
output_filename = f"planequery_aircraft_adsb_{args.start_date}_{args.end_date}.csv"
output_filename = f"openairframes_adsb_{args.start_date}_{args.end_date}.csv"
print(f"Combining chunks for date range: {args.start_date} to {args.end_date}")
else:
# Daily mode
# Daily mode - use same date for start and end
if args.date:
target_day = datetime.strptime(args.date, "%Y-%m-%d")
else:
@@ -173,7 +187,7 @@ def main():
date_str = target_day.strftime("%Y-%m-%d")
output_id = date_str
output_filename = f"planequery_aircraft_adsb_{date_str}.csv"
output_filename = f"openairframes_adsb_{date_str}_{date_str}.csv"
print(f"Combining chunks for {date_str}")
chunks_dir = args.chunks_dir
@@ -190,9 +204,10 @@ def main():
print(f"Found {len(chunk_files)} chunk files")
# Process each chunk separately to save memory
# With --stream, delete parquet files immediately after loading to save disk space
compressed_chunks = []
for chunk_path in chunk_files:
compressed = process_single_chunk(chunk_path)
compressed = process_single_chunk(chunk_path, delete_after_load=args.stream)
compressed_chunks.append(compressed)
gc.collect()
+3 -3
View File
@@ -253,7 +253,7 @@ def concat_compressed_dfs(df_base, df_new):
def get_latest_aircraft_adsb_csv_df():
"""Download and load the latest ADS-B CSV from GitHub releases."""
from get_latest_planequery_aircraft_release import download_latest_aircraft_adsb_csv
from get_latest_release import download_latest_aircraft_adsb_csv
import re
csv_path = download_latest_aircraft_adsb_csv()
@@ -264,8 +264,8 @@ def get_latest_aircraft_adsb_csv_df():
if df[col].dtype == pl.Utf8:
df = df.with_columns(pl.col(col).fill_null(""))
# Extract start date from filename pattern: planequery_aircraft_adsb_{start_date}_{end_date}.csv
match = re.search(r"planequery_aircraft_adsb_(\d{4}-\d{2}-\d{2})_", str(csv_path))
# Extract start date from filename pattern: openairframes_adsb_{start_date}_{end_date}.csv
match = re.search(r"openairframes_adsb_(\d{4}-\d{2}-\d{2})_", str(csv_path))
if not match:
raise ValueError(f"Could not extract date from filename: {csv_path.name}")
+13 -5
View File
@@ -82,7 +82,8 @@ def fetch_releases(version_date: str) -> list:
if version_date == "v2024.12.31":
year = "2025"
BASE_URL = f"https://api.github.com/repos/adsblol/globe_history_{year}/releases"
PATTERN = f"{version_date}-planes-readsb-prod-0"
# Match exact release name, exclude tmp releases
PATTERN = rf"^{re.escape(version_date)}-planes-readsb-prod-\d+$"
releases = []
page = 1
@@ -187,19 +188,23 @@ def extract_split_archive(file_paths: list, extract_dir: str) -> bool:
cat_proc = subprocess.Popen(
["cat"] + file_paths,
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL
stderr=subprocess.PIPE
)
tar_cmd = ["tar", "xf", "-", "-C", extract_dir, "--strip-components=1"]
subprocess.run(
result = subprocess.run(
tar_cmd,
stdin=cat_proc.stdout,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
check=True
)
cat_proc.stdout.close()
cat_stderr = cat_proc.stderr.read().decode() if cat_proc.stderr else ""
cat_proc.wait()
if cat_stderr:
print(f"cat stderr: {cat_stderr}")
print(f"Successfully extracted archive to {extract_dir}")
# Delete tar files immediately after extraction
@@ -217,7 +222,10 @@ def extract_split_archive(file_paths: list, extract_dir: str) -> bool:
return True
except subprocess.CalledProcessError as e:
stderr_output = e.stderr.decode() if e.stderr else ""
print(f"Failed to extract split archive: {e}")
if stderr_output:
print(f"tar stderr: {stderr_output}")
return False
+2 -2
View File
@@ -76,8 +76,8 @@ def main():
print(f"After dedup: {df_accumulated.height} rows")
# Write and upload final result
output_name = f"planequery_aircraft_adsb_{global_start}_{global_end}.csv.gz"
csv_output = Path(f"/tmp/planequery_aircraft_adsb_{global_start}_{global_end}.csv")
output_name = f"openairframes_adsb_{global_start}_{global_end}.csv.gz"
csv_output = Path(f"/tmp/openairframes_adsb_{global_start}_{global_end}.csv")
gz_output = Path(f"/tmp/{output_name}")
df_accumulated.write_csv(csv_output)