mirror of https://github.com/PlaneQuery/OpenAirframes.git (synced 2026-04-24 20:16:12 +02:00)
feat: implement download and concatenate script for workflow artifacts
scripts/download_and_concat_runs.py (new file):

@@ -0,0 +1,182 @@
#!/usr/bin/env python3
"""
Download and concatenate artifacts from a specific set of workflow runs.

Usage:
    python scripts/download_and_concat_runs.py triggered_runs_20260216_123456.json
"""

import argparse
import json
import os
import subprocess
import sys
from pathlib import Path
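
# Note: every download and run lookup below shells out to the GitHub CLI
# ('gh'), which must be installed and authenticated for this script to work.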


def download_run_artifact(run_id, output_dir):
    """Download artifact from a specific workflow run."""
    print(f"  Downloading artifacts from run {run_id}...")

    cmd = [
        'gh', 'run', 'download', str(run_id),
        '--pattern', 'openairframes_adsb-*',
        '--dir', output_dir
    ]
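    # '--pattern' limits the download to artifacts matching this glob, so any
    # unrelated artifacts attached to the same run are skipped.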

    result = subprocess.run(cmd, capture_output=True, text=True)

    if result.returncode == 0:
        print(f"  ✓ Downloaded")
        return True
    else:
        if "no artifacts" in result.stderr.lower():
            print(f"  ⚠ No artifacts found (workflow may still be running)")
        else:
            print(f"  ✗ Failed: {result.stderr}")
        return False


def find_csv_files(download_dir):
    """Find all CSV.gz files in the download directory."""
    csv_files = []
    for root, dirs, files in os.walk(download_dir):
        for file in files:
            if file.endswith('.csv.gz'):
                csv_files.append(os.path.join(root, file))
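    # Sorting by filename keeps the chunks in chronological order, assuming
    # the artifact names embed their date range.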
    return sorted(csv_files)


def concatenate_csv_files(csv_files, output_file):
    """Concatenate CSV files in order, preserving headers."""
    import gzip

    print(f"\nConcatenating {len(csv_files)} CSV files...")

    with gzip.open(output_file, 'wt') as outf:
        header_written = False

        for i, csv_file in enumerate(csv_files, 1):
            print(f"  [{i}/{len(csv_files)}] Processing {os.path.basename(csv_file)}")

            with gzip.open(csv_file, 'rt') as inf:
                lines = inf.readlines()
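            # readlines() loads each whole file into memory, which is fine for
            # modest chunk sizes; very large artifacts would be better
            # streamed line by line.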
            if not header_written:
                # Write header from first file
                outf.writelines(lines)
                header_written = True
            else:
                # Skip header for subsequent files
                outf.writelines(lines[1:])

    print(f"\n✓ Concatenated CSV saved to: {output_file}")

    # Show file size
    size_mb = os.path.getsize(output_file) / (1024 * 1024)
    print(f"  Size: {size_mb:.1f} MB")


def main():
    parser = argparse.ArgumentParser(
        description='Download and concatenate artifacts from workflow runs'
    )
    parser.add_argument(
        'runs_file',
        help='JSON file containing run IDs (from run_historical_adsb_action.py)'
    )
    parser.add_argument(
        '--output-dir',
        default='./downloads/historical_concat',
        help='Directory for downloads (default: ./downloads/historical_concat)'
    )
    parser.add_argument(
        '--wait',
        action='store_true',
        help='Wait for workflows to complete before downloading'
    )

    args = parser.parse_args()

    # Load run IDs
    if not os.path.exists(args.runs_file):
        print(f"Error: File not found: {args.runs_file}")
        sys.exit(1)

    with open(args.runs_file, 'r') as f:
        data = json.load(f)

    runs = data['runs']
    start_date = data['start_date']
    end_date = data['end_date']

    print("=" * 60)
    print("Download and Concatenate Historical Artifacts")
    print("=" * 60)
    print(f"Date range: {start_date} to {end_date}")
    print(f"Workflow runs: {len(runs)}")
    print(f"Output directory: {args.output_dir}")
    print("=" * 60)

    # Create output directory
    os.makedirs(args.output_dir, exist_ok=True)

    # Wait for workflows to complete if requested
    if args.wait:
        print("\nWaiting for workflows to complete...")
        for run_info in runs:
            run_id = run_info['run_id']
            print(f"  Checking run {run_id}...")
            cmd = ['gh', 'run', 'watch', str(run_id)]
            subprocess.run(cmd)

    # Download artifacts
    print("\nDownloading artifacts...")
    successful_downloads = 0

    for i, run_info in enumerate(runs, 1):
        run_id = run_info['run_id']
        print(f"\n[{i}/{len(runs)}] Run {run_id} ({run_info['start']} to {run_info['end']})")

        if download_run_artifact(run_id, args.output_dir):
            successful_downloads += 1

    print(f"\n\nDownload Summary: {successful_downloads}/{len(runs)} artifacts downloaded")

    if successful_downloads == 0:
        print("\nNo artifacts downloaded. Workflows may still be running.")
        print("Use --wait to wait for completion, or try again later.")
        sys.exit(1)

    # Find all CSV files
    csv_files = find_csv_files(args.output_dir)

    if not csv_files:
        print("\nError: No CSV files found in download directory")
        sys.exit(1)

    print(f"\nFound {len(csv_files)} CSV file(s):")
    for csv_file in csv_files:
        print(f"  - {os.path.basename(csv_file)}")

    # Concatenate
    # Calculate actual end date for filename (end_date - 1 day since it's exclusive)
    from datetime import datetime, timedelta
    end_dt = datetime.strptime(end_date, '%Y-%m-%d') - timedelta(days=1)
    actual_end = end_dt.strftime('%Y-%m-%d')

    output_file = os.path.join(
        args.output_dir,
        f"openairframes_adsb_{start_date}_{actual_end}.csv.gz"
    )

    concatenate_csv_files(csv_files, output_file)

    print("\n" + "=" * 60)
    print("Done!")
    print("=" * 60)


if __name__ == '__main__':
    main()

run_historical_adsb_action.py:

@@ -38,7 +38,7 @@ def generate_date_chunks(start_date_str, end_date_str, chunk_days=15):
     return chunks
 
 
-def trigger_workflow(start_date, end_date, chunk_days=3, branch='main', dry_run=False):
+def trigger_workflow(start_date, end_date, chunk_days=1, branch='main', dry_run=False):
     """Trigger the historical-adsb workflow via GitHub CLI."""
     cmd = [
         'gh', 'workflow', 'run', 'historical-adsb.yaml',

@@ -50,18 +50,36 @@ def trigger_workflow(start_date, end_date, chunk_days=3, branch='main', dry_run=
 
     if dry_run:
         print(f"[DRY RUN] Would run: {' '.join(cmd)}")
-        return True
+        return True, None
 
     print(f"Triggering workflow: {start_date} to {end_date} (on {branch})")
     result = subprocess.run(cmd, capture_output=True, text=True)
 
     if result.returncode == 0:
         print(f"✓ Successfully triggered workflow for {start_date} to {end_date}")
-        return True
+
+        # Get the run ID of the workflow we just triggered
+        # Wait a moment for it to appear
+        import time
+        time.sleep(2)
+
+        # Get the most recent run (should be the one we just triggered)
+        list_cmd = [
+            'gh', 'run', 'list',
+            '--workflow', 'historical-adsb.yaml',
+            '--branch', branch,
+            '--limit', '1',
+            '--json', 'databaseId',
+            '--jq', '.[0].databaseId'
+        ]
+        list_result = subprocess.run(list_cmd, capture_output=True, text=True)
+        run_id = list_result.stdout.strip() if list_result.returncode == 0 else None
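+        # Note: if another run on this branch started in the meantime, the
+        # most recent run may not be the one we just triggered.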
+
+        return True, run_id
     else:
         print(f"✗ Failed to trigger workflow for {start_date} to {end_date}")
         print(f"Error: {result.stderr}")
-        return False
+        return False, None
 
 
 def main():

@@ -81,8 +99,8 @@ def main():
     parser.add_argument(
         '--chunk-days',
         type=int,
-        default=3,
-        help='Days per job chunk within each workflow run (default: 3)'
+        default=1,
+        help='Days per job chunk within each workflow run (default: 1)'
     )
     parser.add_argument(
         '--workflow-chunk-days',

@@ -139,18 +157,27 @@ def main():
     # Trigger workflows
     import time
     success_count = 0
+    triggered_runs = []
 
     for i, chunk in enumerate(chunks, 1):
         print(f"\n[{i}/{len(chunks)}] ", end='')
 
-        if trigger_workflow(
+        success, run_id = trigger_workflow(
             chunk['start'],
             chunk['end'],
             chunk_days=args.chunk_days,
             branch=args.branch,
             dry_run=args.dry_run
-        ):
+        )
+
+        if success:
             success_count += 1
+            if run_id:
+                triggered_runs.append({
+                    'run_id': run_id,
+                    'start': chunk['start'],
+                    'end': chunk['end']
+                })
 
         # Add delay between triggers (except for last one)
         if i < len(chunks) and not args.dry_run:

@@ -158,6 +185,22 @@ def main():
 
     print(f"\n\nSummary: {success_count}/{len(chunks)} workflows triggered successfully")
 
+    # Save triggered run IDs to a file
+    if triggered_runs and not args.dry_run:
+        import json
+        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
+        runs_file = f"./triggered_runs_{timestamp}.json"
+        with open(runs_file, 'w') as f:
+            json.dump({
+                'start_date': args.start_date,
+                'end_date': args.end_date,
+                'branch': args.branch,
+                'runs': triggered_runs
+            }, f, indent=2)
+        print(f"\nRun IDs saved to: {runs_file}")
+        print(f"\nTo download and concatenate these artifacts, run:")
+        print(f"  python scripts/download_and_concat_runs.py {runs_file}")
+
     if success_count < len(chunks):
         sys.exit(1)