diff --git a/scripts/download_and_concat_runs.py b/scripts/download_and_concat_runs.py new file mode 100644 index 0000000..b931eb1 --- /dev/null +++ b/scripts/download_and_concat_runs.py @@ -0,0 +1,182 @@ +#!/usr/bin/env python3 +""" +Download and concatenate artifacts from a specific set of workflow runs. + +Usage: + python scripts/download_and_concat_runs.py triggered_runs_20260216_123456.json +""" + +import argparse +import json +import os +import subprocess +import sys +from pathlib import Path + + +def download_run_artifact(run_id, output_dir): + """Download artifact from a specific workflow run.""" + print(f" Downloading artifacts from run {run_id}...") + + cmd = [ + 'gh', 'run', 'download', str(run_id), + '--pattern', 'openairframes_adsb-*', + '--dir', output_dir + ] + + result = subprocess.run(cmd, capture_output=True, text=True) + + if result.returncode == 0: + print(f" ✓ Downloaded") + return True + else: + if "no artifacts" in result.stderr.lower(): + print(f" ⚠ No artifacts found (workflow may still be running)") + else: + print(f" ✗ Failed: {result.stderr}") + return False + + +def find_csv_files(download_dir): + """Find all CSV.gz files in the download directory.""" + csv_files = [] + for root, dirs, files in os.walk(download_dir): + for file in files: + if file.endswith('.csv.gz'): + csv_files.append(os.path.join(root, file)) + return sorted(csv_files) + + +def concatenate_csv_files(csv_files, output_file): + """Concatenate CSV files in order, preserving headers.""" + import gzip + + print(f"\nConcatenating {len(csv_files)} CSV files...") + + with gzip.open(output_file, 'wt') as outf: + header_written = False + + for i, csv_file in enumerate(csv_files, 1): + print(f" [{i}/{len(csv_files)}] Processing {os.path.basename(csv_file)}") + + with gzip.open(csv_file, 'rt') as inf: + lines = inf.readlines() + + if not header_written: + # Write header from first file + outf.writelines(lines) + header_written = True + else: + # Skip header for subsequent files + outf.writelines(lines[1:]) + + print(f"\n✓ Concatenated CSV saved to: {output_file}") + + # Show file size + size_mb = os.path.getsize(output_file) / (1024 * 1024) + print(f" Size: {size_mb:.1f} MB") + + +def main(): + parser = argparse.ArgumentParser( + description='Download and concatenate artifacts from workflow runs' + ) + parser.add_argument( + 'runs_file', + help='JSON file containing run IDs (from run_historical_adsb_action.py)' + ) + parser.add_argument( + '--output-dir', + default='./downloads/historical_concat', + help='Directory for downloads (default: ./downloads/historical_concat)' + ) + parser.add_argument( + '--wait', + action='store_true', + help='Wait for workflows to complete before downloading' + ) + + args = parser.parse_args() + + # Load run IDs + if not os.path.exists(args.runs_file): + print(f"Error: File not found: {args.runs_file}") + sys.exit(1) + + with open(args.runs_file, 'r') as f: + data = json.load(f) + + runs = data['runs'] + start_date = data['start_date'] + end_date = data['end_date'] + + print("=" * 60) + print("Download and Concatenate Historical Artifacts") + print("=" * 60) + print(f"Date range: {start_date} to {end_date}") + print(f"Workflow runs: {len(runs)}") + print(f"Output directory: {args.output_dir}") + print("=" * 60) + + # Create output directory + os.makedirs(args.output_dir, exist_ok=True) + + # Wait for workflows to complete if requested + if args.wait: + print("\nWaiting for workflows to complete...") + for run_info in runs: + run_id = run_info['run_id'] + print(f" Checking run {run_id}...") + + cmd = ['gh', 'run', 'watch', str(run_id)] + subprocess.run(cmd) + + # Download artifacts + print("\nDownloading artifacts...") + successful_downloads = 0 + + for i, run_info in enumerate(runs, 1): + run_id = run_info['run_id'] + print(f"\n[{i}/{len(runs)}] Run {run_id} ({run_info['start']} to {run_info['end']})") + + if download_run_artifact(run_id, args.output_dir): + successful_downloads += 1 + + print(f"\n\nDownload Summary: {successful_downloads}/{len(runs)} artifacts downloaded") + + if successful_downloads == 0: + print("\nNo artifacts downloaded. Workflows may still be running.") + print("Use --wait to wait for completion, or try again later.") + sys.exit(1) + + # Find all CSV files + csv_files = find_csv_files(args.output_dir) + + if not csv_files: + print("\nError: No CSV files found in download directory") + sys.exit(1) + + print(f"\nFound {len(csv_files)} CSV file(s):") + for csv_file in csv_files: + print(f" - {os.path.basename(csv_file)}") + + # Concatenate + # Calculate actual end date for filename (end_date - 1 day since it's exclusive) + from datetime import datetime, timedelta + end_dt = datetime.strptime(end_date, '%Y-%m-%d') - timedelta(days=1) + actual_end = end_dt.strftime('%Y-%m-%d') + + output_file = os.path.join( + args.output_dir, + f"openairframes_adsb_{start_date}_{actual_end}.csv.gz" + ) + + concatenate_csv_files(csv_files, output_file) + + print("\n" + "=" * 60) + print("Done!") + print("=" * 60) + + +if __name__ == '__main__': + main() diff --git a/scripts/run_historical_adsb_action.py b/scripts/run_historical_adsb_action.py index 3ed649a..4a17c76 100644 --- a/scripts/run_historical_adsb_action.py +++ b/scripts/run_historical_adsb_action.py @@ -38,7 +38,7 @@ def generate_date_chunks(start_date_str, end_date_str, chunk_days=15): return chunks -def trigger_workflow(start_date, end_date, chunk_days=3, branch='main', dry_run=False): +def trigger_workflow(start_date, end_date, chunk_days=1, branch='main', dry_run=False): """Trigger the historical-adsb workflow via GitHub CLI.""" cmd = [ 'gh', 'workflow', 'run', 'historical-adsb.yaml', @@ -50,18 +50,36 @@ def trigger_workflow(start_date, end_date, chunk_days=3, branch='main', dry_run= if dry_run: print(f"[DRY RUN] Would run: {' '.join(cmd)}") - return True + return True, None print(f"Triggering workflow: {start_date} to {end_date} (on {branch})") result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode == 0: print(f"✓ Successfully triggered workflow for {start_date} to {end_date}") - return True + + # Get the run ID of the workflow we just triggered + # Wait a moment for it to appear + import time + time.sleep(2) + + # Get the most recent run (should be the one we just triggered) + list_cmd = [ + 'gh', 'run', 'list', + '--workflow', 'historical-adsb.yaml', + '--branch', branch, + '--limit', '1', + '--json', 'databaseId', + '--jq', '.[0].databaseId' + ] + list_result = subprocess.run(list_cmd, capture_output=True, text=True) + run_id = list_result.stdout.strip() if list_result.returncode == 0 else None + + return True, run_id else: print(f"✗ Failed to trigger workflow for {start_date} to {end_date}") print(f"Error: {result.stderr}") - return False + return False, None def main(): @@ -81,8 +99,8 @@ def main(): parser.add_argument( '--chunk-days', type=int, - default=3, - help='Days per job chunk within each workflow run (default: 3)' + default=1, + help='Days per job chunk within each workflow run (default: 1)' ) parser.add_argument( '--workflow-chunk-days', @@ -139,18 +157,27 @@ def main(): # Trigger workflows import time success_count = 0 + triggered_runs = [] for i, chunk in enumerate(chunks, 1): print(f"\n[{i}/{len(chunks)}] ", end='') - if trigger_workflow( + success, run_id = trigger_workflow( chunk['start'], chunk['end'], chunk_days=args.chunk_days, branch=args.branch, dry_run=args.dry_run - ): + ) + + if success: success_count += 1 + if run_id: + triggered_runs.append({ + 'run_id': run_id, + 'start': chunk['start'], + 'end': chunk['end'] + }) # Add delay between triggers (except for last one) if i < len(chunks) and not args.dry_run: @@ -158,6 +185,22 @@ def main(): print(f"\n\nSummary: {success_count}/{len(chunks)} workflows triggered successfully") + # Save triggered run IDs to a file + if triggered_runs and not args.dry_run: + import json + timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') + runs_file = f"./triggered_runs_{timestamp}.json" + with open(runs_file, 'w') as f: + json.dump({ + 'start_date': args.start_date, + 'end_date': args.end_date, + 'branch': args.branch, + 'runs': triggered_runs + }, f, indent=2) + print(f"\nRun IDs saved to: {runs_file}") + print(f"\nTo download and concatenate these artifacts, run:") + print(f" python scripts/download_and_concat_runs.py {runs_file}") + if success_count < len(chunks): sys.exit(1)