This commit is contained in:
Henry
2025-10-29 17:52:31 +08:00
parent ff17ed5406
commit 6433a4ecb2
7 changed files with 1035 additions and 0 deletions

125
analyzer/README.md Normal file

@@ -0,0 +1,125 @@
# SWE-Bench Trajectory Analysis
A comprehensive analysis tool for comparing failed and successful software engineering trajectories across different SWE-bench dataset splits. This tool helps identify patterns in agent failures and provides actionable insights for improving software engineering agents.
## Features
- **Multi-Split Support**: Analyze trajectories across SWE-bench splits (Verified, Lite, Test, Multimodal)
- **Automated Comparison**: Compare your agent's failed trajectories with successful trajectories from top-performing agents
- **LLM-Powered Analysis**: Uses advanced language models to generate detailed failure analysis reports
- **Batch Processing**: Process multiple instances in parallel for faster analysis
- **Leaderboard Integration**: Automatically downloads and analyzes top agent results from SWE-bench leaderboards
## Requirements
- Python 3.8+
- Required packages (see `requirements.txt`):
- `datasets>=2.0.0` - For loading SWE-bench datasets
- `requests>=2.25.0` - For API calls
- `python-dotenv>=0.19.0` - For environment variable management
- `PyYAML>=6.0` - For configuration file parsing
- `boto3>=1.40.52` - For AWS S3 access
## Installation
1. Clone the repository and navigate to the directory:
```bash
cd trajectory_analysis
```
2. Install dependencies:
```bash
pip install -r requirements.txt
```
3. Set up environment variables:
```bash
cp .env.example .env
# Edit .env and add your OPENROUTER_API_KEY
```
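A minimal `.env` only needs the OpenRouter key (the value below is a placeholder):
```bash
OPENROUTER_API_KEY=sk-or-v1-your-key-here
```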
## Configuration
Edit `config.yaml` to configure your analysis:
```yaml
# Dataset configuration
dataset:
split: "Verified" # Options: Verified, Lite, Test, Multimodal
# Paths
paths:
txt_path: "/path/to/unsolved_instances.txt"
intermediate_output_path: "/path/to/intermediate_output/"
output_path: "/path/to/final_report.txt"
logs_directory: "/path/to/failed_trajectories/"
prompts_path: "/path/to/prompts_manager.py"
tools_path: "/path/to/tools/"
# Settings
settings:
max_workers: 20 # Maximum number of parallel requests to OpenRouter
  num_agents: 4 # Number of top SWE-bench leaderboard agents to download trajectories for
```
## Usage
### Basic Usage
1. **Prepare your data**:
- Create a text file listing unresolved instance IDs (one per line)
- Ensure your failed trajectories are in the specified logs directory
2. **Run the analysis**:
```bash
python run.py
```
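The unresolved-instances file from step 1 is a plain text list with one instance ID per line, for example (illustrative IDs):
```text
astropy__astropy-12907
scikit-learn__scikit-learn-10908
sympy__sympy-13031
```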
### Input Requirements
- **Failed Trajectories**: Your agent's failed trajectories in one of these formats:
- Direct `.log` files named `{instance_id}.log`
- Issues directory structure: `issues/{numerical_id}-{instance_id}/logs/{timestamp}/run/{instance_id}/generator/{generator_id}/`
- **Unresolved Instances**: Text file containing instance IDs that your agent failed to solve
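For the issues layout, a hypothetical on-disk tree (the timestamp, numerical ID, and generator ID are illustrative) would look like:
```text
issues/
└── 0042-scikit-learn__scikit-learn-10908/
    └── logs/
        └── 2025-10-29_17-52-31/
            └── run/
                └── scikit-learn__scikit-learn-10908/
                    └── generator/
                        └── 000/
                            ├── debug.log
                            └── notice.log
```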
### Output
The tool generates:
- **Individual Analysis**: Detailed failure analysis for each instance in the intermediate output directory
- **Comprehensive Report**: Categorized analysis of all failures with improvement recommendations
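The comprehensive report follows the structure requested in the batch analysis prompt; an abridged sketch:
```text
# Problem 1:
**Description**: [analysis of this failure category]
**Related Instances**:
- instance_id_1: [brief description of the failure]
- instance_id_2: [brief description of the failure]
**Improvements**: [prompt/tool changes to address the failure cause]

# Problem 2:
...
```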
## Architecture
### Core Components
- **`run.py`**: Main entry point and batch processing coordinator
- **`trajectory_discovery.py`**: Handles trajectory finding and leaderboard integration
- **`pairwise_analysis.py`**: LLM-powered failure analysis and dataset loading
- **`utils.py`**: Utility functions for file operations and trajectory parsing
- **`config.yaml`**: Configuration management
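These components are wired together by `run.py`; a minimal programmatic sketch of the same pipeline (paths and settings come from `config.yaml`):
```python
from run import load_config, batch_failure_analysis

config = load_config("config.yaml")
paths = config["paths"]

# Runs leaderboard discovery, per-instance LLM analysis, and the final categorized report
batch_failure_analysis(
    paths["txt_path"],
    paths["intermediate_output_path"],
    paths["output_path"],
    paths["logs_directory"],
    paths["prompts_path"],
    paths["tools_path"],
    config["dataset"]["split"],
    max_workers=config["settings"]["max_workers"],
    num_agents=config["settings"]["num_agents"],
)
```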
## Supported SWE-bench Splits
| Split | Dataset | Instances | Description |
|-------|---------|-----------|-------------|
| Verified | `princeton-nlp/SWE-bench_Verified` | 500 | Verified high-quality instances |
| Lite | `SWE-bench/SWE-bench_Lite` | 300 | Smaller subset for faster testing |
| Test | `SWE-bench/SWE-bench` | 2,294 | Full test set |
| Multimodal | `SWE-bench/SWE-bench_Multimodal` | 510 | Instances with UI elements |
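For reference, a short sketch of how an instance is looked up in one of these datasets (mirroring `get_swe_bench_problem_and_hints_for_split`; the instance ID is just an example):
```python
from datasets import load_dataset

# "Verified" maps to princeton-nlp/SWE-bench_Verified; every split is loaded with split="test"
ds = load_dataset("princeton-nlp/SWE-bench_Verified", split="test")
row = next(r for r in ds if r["instance_id"] == "scikit-learn__scikit-learn-10908")
print(row["problem_statement"][:300])
print(row["hints_text"][:300])
```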
## Error Handling
The tool includes robust error handling for:
- Missing trajectory files
- API rate limiting and failures
- Dataset loading errors
- Invalid configuration parameters
## Contributor
- [Hengzhi Zhang](https://henryzhang11.github.io)
## License
This project is licensed under the MIT License - see the project LICENSE file for details.

17
analyzer/config.yaml Normal file

@@ -0,0 +1,17 @@
# Configuration file for trajectory analysis
paths:
txt_path: "/root/henry/trajectory_analysis/unsolved.txt"
intermediate_output_path: "/root/henry/trajectory_analysis_test/per_instance_failure_analysis"
output_path: "/root/henry/trajectory_analysis_test/failure_analysis_report.txt"
logs_directory: "/root/peteWong/project/Code-agent/batch_out/batch-first/issues"
prompts_path: "/root/henry/Code-agent/src/managers/prompts/prompts_manager.py"
tools_path: "/root/henry/Code-agent/src/tools"
# Dataset configuration
dataset:
split: "Verified" # Options: Verified, Lite, Test, Multimodal (bash-only not available on Hugging Face)
# Optional: Add other configuration parameters
settings:
max_workers: 20
num_agents: 1

179
analyzer/pairwise_analysis.py Normal file

@@ -0,0 +1,179 @@
import json
import re
import os
import requests
import time
from datetime import datetime
from pathlib import Path
from typing import List, Optional, Callable
from datasets import load_dataset
import random
from dotenv import load_dotenv
# Load environment variables from .env file
load_dotenv()
def failure_analysis_prompt(problem_statement: str, hints_text: str, working_trajectory: str, failed_trajectory: str, code_agent_code: str) -> str:
"""
Generate a failure analysis prompt for comparing successful and failed trajectories.
Creates a structured prompt that asks an AI model to analyze why one trajectory
succeeded while another failed, and what the failed trajectory could have done
differently to succeed.
"""
return f"""Here's a software engineering issue: {problem_statement}.
Here are the hints: {hints_text}.
Here's the trajectory that fixed the issue: {working_trajectory}.
Here's the trajectory that failed: {failed_trajectory}.
Here's the prompts/tools code for the software engineering agent: {code_agent_code}.
Why did the first SWE-agent resolve the issue while the second one failed?
What could the second agent have done differently to resolve the issue?
What improvements could be made to the prompt/tool usage?
Answer in the following format:
# Why the second agent failed
...
# What could the second agent have done differently
...
# What improvements could be made to the prompt/tool usage
..."""
def openrouter_gemini_client(prompt: str, max_retries: int = 10, base_delay: float = 1.0) -> str:
"""
Send a prompt to OpenRouter API using Gemini model for analysis.
Makes API calls to OpenRouter service using Google's Gemini 2.5 Pro model
with exponential backoff retry logic. Handles authentication, rate limiting,
and error recovery for robust API communication.
Args:
prompt (str): The analysis prompt to send to the model
max_retries (int, optional): Maximum number of retry attempts. Defaults to 10.
base_delay (float, optional): Base delay in seconds for exponential backoff. Defaults to 1.0.
Returns:
str: The model's response content as a string
Raises:
ValueError: If OPENROUTER_API_KEY environment variable is not set
Exception: If API request fails after all retry attempts or returns invalid response
Note:
Requires OPENROUTER_API_KEY environment variable to be set with valid API key.
Uses exponential backoff: 1s, 2s, 4s, 8s, 16s, etc. between retries.
"""
api_key = os.getenv('OPENROUTER_API_KEY')
if not api_key:
raise ValueError("OPENROUTER_API_KEY environment variable not found")
url = "https://openrouter.ai/api/v1/chat/completions"
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
data = {
"model": "google/gemini-2.5-pro",
"messages": [
{
"role": "user",
"content": prompt
}
],
"max_tokens": 200000, # Increased limit for comprehensive analysis
"temperature": 0.7 # Lower temperature for more consistent summaries
}
for attempt in range(max_retries):
try:
print(f"ATTEMPT: API attempt {attempt + 1}/{max_retries}...")
response = requests.post(url, headers=headers, json=data, timeout=30)
response.raise_for_status()
result = response.json()
if 'choices' not in result or not result['choices']:
raise Exception(f"Invalid API response: missing or empty choices")
content = result['choices'][0]['message']['content']
if not content or content.strip() == "":
print(f"WARNING: Empty content returned from API on attempt {attempt + 1}/{max_retries}")
if attempt < max_retries - 1:
delay = base_delay * (2 ** attempt) / (2 ** random.uniform(0, 1)) # Exponential backoff with jitter
print(f"WAIT: Waiting {delay:.2f} seconds before retry...")
time.sleep(delay)
continue
else:
raise Exception(f"Empty content returned from API after {max_retries} attempts")
print(f"SUCCESS: API request successful on attempt {attempt + 1}")
return content.strip()
except requests.exceptions.RequestException as e:
print(f"FAILED: Request failed on attempt {attempt + 1}/{max_retries}: {e}")
if attempt < max_retries - 1:
delay = base_delay * (2 ** attempt) / (2 ** random.uniform(0, 1)) # Exponential backoff with jitter
print(f"WAIT: Waiting {delay:.2f} seconds before retry...")
time.sleep(delay)
continue
else:
raise Exception(f"FAILED: OpenRouter API request failed after {max_retries} attempts: {e}")
except (KeyError, IndexError) as e:
raise Exception(f"FAILED: Unexpected response format from OpenRouter API: {e}")
raise Exception(f"FAILED: All {max_retries} attempts failed")
def get_swe_bench_problem_and_hints_for_split(instance_id: str, split_name: str) -> tuple[str, str]:
"""
Retrieve problem statement and hints for a specific SWE-Bench instance in a specific split.
Loads the appropriate SWE-Bench dataset and searches for the specified instance ID to extract the problem statement and hints text.
This data is used for generating analysis prompts and understanding the context of the software engineering problem.
Args:
instance_id (str): The unique identifier for the SWE-Bench instance
(e.g., 'scikit-learn__scikit-learn-10908')
split_name (str): The split name (e.g., "Verified", "Lite", "Test", "Multimodal", "bash-only")
Returns:
tuple[str, str]: A tuple containing (problem_statement, hints_text)
Raises:
ValueError: If the specified instance_id is not found in the dataset
Note:
This function loads the full SWE-Bench dataset for the specified split, which may take some time on first execution due to dataset download and caching.
"""
dataset_mapping = {
"Verified": "princeton-nlp/SWE-bench_Verified",
"Lite": "SWE-bench/SWE-bench_Lite",
"Test": "SWE-bench/SWE-bench",
"Multimodal": "SWE-bench/SWE-bench_Multimodal",
# Note: bash-only dataset not yet available on Hugging Face
}
dataset_name = dataset_mapping.get(split_name)
if not dataset_name:
raise ValueError(f"No dataset mapping found for split '{split_name}'")
ds = load_dataset(dataset_name, split="test")
target_row = None
for row in ds:
if row['instance_id'] == instance_id:
target_row = row
break
if target_row is None:
raise ValueError(f"Instance ID '{instance_id}' not found in the {split_name} dataset")
problem_statement = target_row['problem_statement']
hints_text = target_row['hints_text']
return problem_statement, hints_text

5
analyzer/requirements.txt Normal file

@@ -0,0 +1,5 @@
datasets>=2.0.0
requests>=2.25.0
python-dotenv>=0.19.0
PyYAML>=6.0
boto3>=1.40.52

222
analyzer/run.py Normal file

@@ -0,0 +1,222 @@
import json
import os
import yaml
from typing import List, Dict, Any, Tuple, Optional
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
from trajectory_discovery import find_failed_trajectory, find_successful_trajectory_for_split, download_top_agents_logs_for_split, count_top_agents_solved_for_split
from pairwise_analysis import get_swe_bench_problem_and_hints_for_split, failure_analysis_prompt, openrouter_gemini_client
def load_config(config_path: str = "config.yaml") -> Dict[str, Any]:
"""Load configuration from YAML file."""
with open(config_path, 'r', encoding='utf-8') as f:
return yaml.safe_load(f)
def extract_code_agent_code(prompts_path: str, tools_path: str) -> str:
    """
    Concatenate the contents of prompts_manager.py and the non-test files under tools/ into a single string for inclusion in analysis prompts.
    """
result = ""
prompts_path = Path(prompts_path)
if prompts_path.exists():
try:
with open(prompts_path, 'r', encoding='utf-8') as f:
result += f"{prompts_path.name}: \n" + "```python\n" + f.read() + "```\n"
except Exception as e:
print(f"Error reading prompts_manager.py: {e}")
tools_path = Path(tools_path)
if tools_path.exists():
for file_path in tools_path.iterdir():
if file_path.is_file() and not file_path.name.startswith('test'):
try:
with open(file_path, 'r', encoding='utf-8') as f:
result += f"{file_path.name}: \n" + "```python\n" + f.read() + "```\n"
except Exception as e:
print(f"Error reading {file_path}: {e}")
return result
def analyze_failure_causes(instance_id: str, current_directory: str, code_agent_code: str, split_name: str, num_agents: int = 5) -> Optional[str]:
"""
Compare trajectories alongside test results to analyze specific failure causes.
This function finds both failed and successful trajectories for a given instance, retrieves the problem context, and uses an LLM to analyze why the failure occurred.
Args:
instance_id: The instance ID to analyze
current_directory: Directory containing failed trajectories
code_agent_code: Code agent code for analysis
split_name: The split name (e.g., "Verified", "Lite", "Test", "Multimodal", "bash-only")
num_agents: Number of top agents to check for successful trajectories
"""
failed_trajectory_path = find_failed_trajectory(instance_id, current_directory)
if not failed_trajectory_path:
print(f"No failed trajectory found for {instance_id}")
return None
successful_trajectory_path = find_successful_trajectory_for_split(instance_id, split_name, num_agents)
if not successful_trajectory_path:
print(f"No successful trajectory found for {instance_id} in {split_name} split")
return None
try:
with open(failed_trajectory_path, 'r', encoding='utf-8') as f:
failed_content = f.read()
with open(successful_trajectory_path, 'r', encoding='utf-8') as f:
successful_content = f.read()
problem_statement, hints_text = get_swe_bench_problem_and_hints_for_split(instance_id, split_name)
prompt = failure_analysis_prompt(problem_statement, hints_text, successful_content, failed_content, code_agent_code)
analysis_result = openrouter_gemini_client(prompt)
formatted_result = f"Instance: {instance_id}\nAnalysis: {analysis_result}\n"
print(f"Analysis result for {instance_id}: {analysis_result}")
return formatted_result
except Exception as e:
print(f"Error analyzing {instance_id}: {e}")
return None
def batch_failure_analysis(unresolved_id_txt_path: str, intermediate_output_path: str, output_path: str, logs_directory: str, prompts_path: str, tools_path: str, split_name: str, max_workers: int = 20, num_agents: int = 5) -> None:
"""
Complete batch failure analysis pipeline for a specific split.
Process failed trajectories, find successful trajectories, analyze failure causes, and categorize them by cause. Generate a comprehensive report with recommendations and save it to output_path.
Args:
unresolved_id_txt_path: Path to text file containing unresolved instance IDs
intermediate_output_path: Path to directory for intermediate output files
output_path: Path to final output report file
logs_directory: Directory containing failed trajectory logs
prompts_path: Path to prompts manager Python file
tools_path: Path to tools directory
split_name: The split name (e.g., "Verified", "Lite", "Test", "Multimodal", "bash-only")
max_workers: Maximum number of worker threads
num_agents: Number of top agents to analyze
"""
print(f"Regenerating top agents performance CSV for {split_name} split...")
count_top_agents_solved_for_split(split_name, num_agents)
print(f"Ensuring experiments repository and logs are up to date for {split_name} split...")
download_top_agents_logs_for_split("experiments", split_name, num_agents)
try:
with open(unresolved_id_txt_path, 'r', encoding='utf-8') as f:
unresolved_ids = [line.strip() for line in f.readlines() if line.strip()]
if not unresolved_ids:
print("No unresolved instances found in text file")
return
print(f"Found {len(unresolved_ids)} unresolved instances to analyze")
except Exception as e:
print(f"Error loading text file: {e}")
return
failure_analysis_results = []
skipped_instances = []
code_agent_code = extract_code_agent_code(prompts_path, tools_path)
# create the intermediate output directory if it doesn't exist
os.makedirs(intermediate_output_path, exist_ok=True)
def process_instance(instance_id):
print(f"\nProcessing: {instance_id}")
try:
analysis_result = analyze_failure_causes(instance_id, logs_directory, code_agent_code, split_name, num_agents)
if analysis_result:
with open(os.path.join(intermediate_output_path, f'{instance_id}.txt'), 'w', encoding='utf-8') as f:
f.write(analysis_result)
return (instance_id, analysis_result, None)
else:
print(f"Skipped {instance_id} - no trajectories found")
return (instance_id, None, "no trajectories found")
except Exception as e:
print(f"Error processing {instance_id}: {e}")
return (instance_id, None, str(e))
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = [executor.submit(process_instance, instance_id) for instance_id in unresolved_ids]
for future in as_completed(futures):
instance_id, analysis_result, error = future.result()
if analysis_result:
failure_analysis_results.append(analysis_result)
else:
skipped_instances.append(instance_id)
# combine the intermediate output files into a single file
with open(output_path, 'w', encoding='utf-8') as f:
for file in os.listdir(intermediate_output_path):
if file.endswith('.txt'):
with open(os.path.join(intermediate_output_path, file), 'r', encoding='utf-8') as f1:
f.write(f1.read() + '\n')
# read the combined failure analysis report
with open(output_path, 'r', encoding='utf-8') as f:
failure_analysis_report = f.read()
batch_prompt = f"""Read failure analysis reports and group failure causes into categories.
Here are the failure analysis reports:
{failure_analysis_report}
Here are the prompts/tools code for the software engineering agent:
{code_agent_code}
Please read the above analysis and categorize the failure causes into categories.
For each category, please describe the failure cause in detail, list related instances, and suggest improvements for the prompt/tool usage to address the failure cause.
Format your response as:
# Problem 1:
**Description**: [Your analysis of this problem]
**Related Instances**:
- instance_id_1: [brief description of the failure]
- instance_id_2: [brief description of the failure]
**Improvements**: [Suggestions for improving the prompt/tool usage to address the failure cause]
# Problem 2:
**Description**: [Your analysis of this problem]
**Related Instances**:
- instance_id_3: [brief description of the failure]
- instance_id_4: [brief description of the failure]
**Improvements**: [Suggestions for improving the prompt/tool usage to address the failure cause]
Continue for all identified problems."""
try:
batch_analysis = openrouter_gemini_client(batch_prompt)
with open(output_path, 'w', encoding='utf-8') as f:
f.write(batch_analysis)
except Exception as e:
print(f"Error generating batch analysis: {e}")
return f"Error generating batch analysis: {e}"
if __name__ == "__main__":
config = load_config("config.yaml")
# Validate paths exist
paths = config["paths"]
for path_name in ["txt_path", "logs_directory", "prompts_path", "tools_path"]:
if not os.path.exists(paths[path_name]):
raise FileNotFoundError(f"Path {path_name}: {paths[path_name]} does not exist")
# Validate dataset split
split_name = config["dataset"]["split"]
valid_splits = ["Verified", "Lite", "Test", "Multimodal"] # bash-only not yet available on Hugging Face
if split_name not in valid_splits:
raise ValueError(f"split must be one of {valid_splits}, got: {split_name}")
# Validate max_workers
max_workers = config["settings"]["max_workers"]
if not isinstance(max_workers, int) or max_workers <= 0:
raise ValueError(f"max_workers must be positive integer, got: {max_workers}")
# Validate num_agents
num_agents = config["settings"]["num_agents"]
if not isinstance(num_agents, int) or num_agents <= 0:
raise ValueError(f"num_agents must be positive integer, got: {num_agents}")
batch_failure_analysis(
paths["txt_path"],
paths["intermediate_output_path"],
paths["output_path"],
paths["logs_directory"],
paths["prompts_path"],
paths["tools_path"],
split_name,
max_workers,
num_agents
)

386
analyzer/trajectory_discovery.py Normal file

@@ -0,0 +1,386 @@
from __future__ import annotations
import os
import re
import subprocess
import sys
import json
import pathlib
import csv
import urllib.request
import argparse
from typing import Optional, List, Tuple, Dict, Any
from pathlib import Path
from concurrent.futures import ProcessPoolExecutor, as_completed
import multiprocessing
from utils import read_json_url_or_path
from datasets import load_dataset
from utils import find_failed_trajectory_in_issues_directory
LEADERBOARD_JSON = "https://raw.githubusercontent.com/swe-bench/swe-bench.github.io/master/data/leaderboards.json"
EXPERIMENTS_DIR = str(Path(__file__).parent / "experiments")
EXPERIMENTS_REPO_URL = "https://github.com/SWE-bench/experiments.git"
def top_agents_for_split(leaderboards_json: dict, split_name: str, num_agents: int = 5) -> list[dict]:
"""
Get the top N SWE-bench agents for a specific split from the leaderboard.
Args:
leaderboards_json: The leaderboard JSON data
split_name: The split name (e.g., "Verified", "Lite", "Test", "Multimodal")
num_agents: Number of top agents to return (default: 5)
Returns:
List of top N agents sorted by resolution rate
"""
lbs = leaderboards_json.get("leaderboards") or leaderboards_json
split_lb = next((lb for lb in lbs if (lb.get("name") or "").lower() == split_name.lower()), None)
if not split_lb:
raise KeyError(f"Could not find '{split_name}' leaderboard in leaderboards.json")
rows = [r for r in split_lb.get("results", []) if not r.get("warning")]
rows.sort(key=lambda r: float(r.get("resolved", 0)), reverse=True)
return rows[:num_agents]
def read_submission_resolved_ids(submission_dir: pathlib.Path) -> set[str]:
"""
Read the resolved instance_ids from the results.json file in a SWE-bench verified submission folder.
"""
fp = submission_dir / "results" / "results.json"
if not fp.is_file():
return set()
try:
j = json.loads(fp.read_text(encoding="utf-8"))
return set(j.get("resolved", []))
except Exception:
return set()
def load_canonical_ids_for_split(split_name: str) -> list[str]:
"""
Load canonical instance IDs for a specific SWE-bench split.
Args:
split_name: The split name (e.g., "Verified", "Lite", "Test", "Multimodal")
Returns:
List of sorted instance IDs for the split
"""
dataset_mapping = {
"Verified": "princeton-nlp/SWE-bench_Verified",
"Lite": "SWE-bench/SWE-bench_Lite",
"Test": "SWE-bench/SWE-bench",
"Multimodal": "SWE-bench/SWE-bench_Multimodal",
# Note: bash-only dataset not available on Hugging Face
}
dataset_name = dataset_mapping.get(split_name)
if not dataset_name:
print(f"Warning: No dataset mapping found for split '{split_name}'")
return []
try:
ds = load_dataset(dataset_name, split="test")
ids = sorted(ds["instance_id"])
print(f"Loaded {len(ids)} instance IDs for {split_name} split")
return ids
except Exception as e:
print(f"Error loading dataset for {split_name}: {e}")
return []
def ensure_experiments_repo() -> pathlib.Path:
"""Clone or update the SWE-bench experiments repository."""
experiments_path = pathlib.Path(EXPERIMENTS_DIR).expanduser().resolve()
if experiments_path.exists() and (experiments_path / ".git").exists():
try:
subprocess.run(["git", "pull"], cwd=experiments_path, check=True, capture_output=True, text=True)
except subprocess.CalledProcessError as e:
print(f"Warning: Failed to update repository: {e.stderr}")
print("Continuing with existing version...")
else:
try:
subprocess.run(["git", "clone", EXPERIMENTS_REPO_URL, str(experiments_path)], check=True, capture_output=True, text=True)
print("Repository cloned successfully")
except subprocess.CalledProcessError as e:
print(f"Error: Failed to clone repository: {e.stderr}")
sys.exit(1)
return experiments_path
def count_top_agents_solved_for_split(split_name: str, num_agents: int = 5):
"""
Generate CSV showing which top agents solved which instances for a specific split.
Args:
split_name: The split name (e.g., "Verified", "Lite", "Test", "Multimodal")
num_agents: Number of top agents to analyze
"""
# Ensure we have the latest experiments repository
ensure_experiments_repo()
split_root = Path(EXPERIMENTS_DIR) / "evaluation" / split_name.lower()
if not split_root.is_dir():
print(f"ERROR: {split_root} not found. Set EXPERIMENTS_DIR correctly.", file=sys.stderr)
sys.exit(2)
lb = read_json_url_or_path(LEADERBOARD_JSON)
top_agents = top_agents_for_split(lb, split_name, num_agents)
columns = []
local_union_ids: set[str] = set()
print(f"Top {num_agents} ({split_name}):")
for i, row in enumerate(top_agents, 1):
folder = row.get("folder")
name = row.get("name") or f"rank{i}"
print(f"{i:2d}. {name} | resolved={row.get('resolved')} | folder={folder}")
if not folder:
continue
subdir = split_root / folder
if not subdir.is_dir():
print(f"[warn] missing folder locally: {subdir}", file=sys.stderr)
continue
resolved = read_submission_resolved_ids(subdir)
columns.append((f"{i:02d}_{folder}", resolved))
# also collect skipped ids so rows don't get lost
try:
j = json.loads((subdir / "results" / "results.json").read_text(encoding="utf-8"))
local_union_ids |= set(j.get("resolved", []))
local_union_ids |= set(j.get("no_generation", []))
local_union_ids |= set(j.get("no_logs", []))
except Exception:
pass
# decide row universe (instance_ids)
instance_ids = load_canonical_ids_for_split(split_name)
if not instance_ids:
instance_ids = sorted(local_union_ids)
print(f"[warn] using local union of IDs ({len(instance_ids)}). "
f"For guaranteed canonical IDs, install `datasets` and internet access.", file=sys.stderr)
out_csv = pathlib.Path(f"top_agents_performance_{split_name.lower()}.csv")
headers = ["instance_id"] + [col for col, _ in columns] + ["any_top_agents_solved"]
with out_csv.open("w", newline="", encoding="utf-8") as f:
w = csv.writer(f)
w.writerow(headers)
non_resolved_instances = []
for iid in instance_ids:
row_vals = [iid]
solved_by_any = 0
for _, resolved in columns:
solved = 1 if iid in resolved else 0
row_vals.append(solved)
if solved == 1:
solved_by_any = 1
row_vals.append(solved_by_any)
w.writerow(row_vals)
if solved_by_any == 0:
non_resolved_instances.append(iid)
print(f"\nResults saved to: {out_csv}")
print(f"\nNon-resolved instances ({len(non_resolved_instances)}):")
for instance_id in non_resolved_instances:
print(f" {instance_id}")
def find_failed_trajectory(instance_id: str, experiments_dir: str) -> Optional[str]:
"""
Find a failed trajectory for the given instance within the experiments directory (assume names like '{instance_id}.log').
If the file exists, return the path to the file.
Otherwise, call find_failed_trajectory_in_issues_directory to find the failed trajectory.
"""
trajectory_path = os.path.join(experiments_dir, instance_id + ".log")
if os.path.exists(trajectory_path):
return trajectory_path
else:
return find_failed_trajectory_in_issues_directory(instance_id, experiments_dir)
def find_successful_trajectory_for_split(instance_id: str, split_name: str, num_agents: int = 5) -> Optional[str]:
"""
Find a successful trajectory for the given instance in a specific split.
Checks the top agents performance CSV to find which agents solved the instance, then looks in each agent's directory for trajectory files.
Tries agents in order of ranking until a trajectory is found.
Special handling for 20250915_JoyCode agent where trajectories are in instance-named folders.
Automatically downloads logs and generates CSV if missing.
Args:
instance_id: The instance ID to find trajectory for
split_name: The split name (e.g., "Verified", "Lite", "Test", "Multimodal")
num_agents: Number of top agents to check
"""
# Check if trajectory folders exist, download if missing
split_dir = os.path.join(EXPERIMENTS_DIR, "evaluation", split_name.lower())
if not os.path.exists(split_dir):
download_top_agents_logs_for_split(EXPERIMENTS_DIR, split_name, num_agents)
csv_path = os.path.join(os.path.dirname(__file__), f"top_agents_performance_{split_name.lower()}.csv")
if not os.path.exists(csv_path):
count_top_agents_solved_for_split(split_name, num_agents)
# Get all agents that solved this instance
solved_agents = _find_all_agents_for_instance(instance_id, csv_path)
if not solved_agents:
print(f"No agent found that solved {instance_id} in {split_name} split")
return None
# Try each agent until we find a trajectory
for agent_name in solved_agents:
print(f"Trying agent {agent_name} for {instance_id}")
# Special handling for JoyCode agent
if agent_name == "20250915_JoyCode":
trajectory_path = _find_joycode_trajectory_for_split(instance_id, EXPERIMENTS_DIR, split_name)
else:
trajectory_path = _find_regular_agent_trajectory_for_split(instance_id, agent_name, EXPERIMENTS_DIR, split_name)
if trajectory_path:
print(f"Found trajectory for {instance_id} using agent {agent_name}")
return trajectory_path
else:
print(f"No trajectory found for {instance_id} using agent {agent_name}")
return None
def _find_best_agent_for_instance(instance_id: str, csv_path: str) -> Optional[str]:
"""
Find the best agent that solved the given instance by checking the CSV.
"""
try:
with open(csv_path, 'r', encoding='utf-8') as f:
reader = csv.DictReader(f)
for row in reader:
if row['instance_id'] == instance_id:
for col_name in reader.fieldnames:
if col_name.startswith(('01_', '02_', '03_', '04_', '05_', '06_', '07_', '08_', '09_', '10_')) and row.get(col_name) == '1':
agent_name = col_name[3:]
return agent_name
return None
return None
except Exception as e:
print(f"Error reading CSV: {e}")
return None
return None
def _find_all_agents_for_instance(instance_id: str, csv_path: str) -> List[str]:
"""
Find all agents that solved the given instance by checking the CSV.
Returns a list of agent names sorted by their ranking (01_, 02_, etc.).
"""
agents = []
try:
with open(csv_path, 'r', encoding='utf-8') as f:
reader = csv.DictReader(f)
for row in reader:
if row['instance_id'] == instance_id:
for col_name in reader.fieldnames:
if col_name.startswith(('01_', '02_', '03_', '04_', '05_', '06_', '07_', '08_', '09_', '10_')) and row.get(col_name) == '1':
agent_name = col_name[3:]
agents.append(agent_name)
break
except Exception as e:
print(f"Error reading CSV: {e}")
return []
return agents
def _find_joycode_trajectory_for_split(instance_id: str, experiments_dir: str, split_name: str) -> Optional[str]:
"""
Find trajectory for JoyCode agent in a specific split (special case with instance-named folders).
"""
joycode_trajs_dir = os.path.join(experiments_dir, "evaluation", split_name.lower(), "20250915_JoyCode", "trajs")
instance_dir = os.path.join(joycode_trajs_dir, instance_id)
if not os.path.exists(instance_dir):
print(f"JoyCode trajectory directory not found: {instance_dir}")
return None
completed_logs_path = os.path.join(instance_dir, "completed_logs.txt")
if os.path.exists(completed_logs_path):
return os.path.abspath(completed_logs_path)
else:
print(f"completed_logs.txt not found in {instance_dir}")
return None
def _find_regular_agent_trajectory_for_split(instance_id: str, agent_name: str, experiments_dir: str, split_name: str) -> Optional[str]:
"""
Find trajectory for regular agents in a specific split (files named with instance_id).
"""
agent_trajs_dir = os.path.join(experiments_dir, "evaluation", split_name.lower(), agent_name, "trajs")
if not os.path.exists(agent_trajs_dir):
print(f"Agent trajectory directory not found: {agent_trajs_dir}")
return None
try:
for filename in os.listdir(agent_trajs_dir):
if instance_id in filename:
trajectory_path = os.path.join(agent_trajs_dir, filename)
if os.path.isfile(trajectory_path):
return os.path.abspath(trajectory_path)
except OSError:
pass
return None
def download_single_agent_for_split(agent_info: tuple, experiments_dir: str, split_name: str) -> tuple[str, bool, str]:
"""
Download logs for a single agent in a specific split. Returns (agent_name, success, error_message).
"""
i, agent = agent_info
folder = agent.get("folder")
name = agent.get("name") or f"rank{i}"
if not folder:
return (name, False, "No folder specified")
# Check if trajectories already exist
trajs_path = os.path.join(experiments_dir, "evaluation", split_name.lower(), folder, "trajs")
if os.path.exists(trajs_path) and os.listdir(trajs_path):
print(f"[{name}] ✓ Trajectories already exist, skipping download")
return (name, True, "")
download_path = f"evaluation/{split_name.lower()}/{folder}"
cmd = [sys.executable, "-m", "analysis.download_logs", "--only_trajs", "--skip_existing", download_path]
try:
result = subprocess.run(cmd, cwd=experiments_dir, check=True)
print(f"[{name}] ✓ Download completed successfully")
return (name, True, "")
except Exception as e:
error_msg = f"Unexpected error: {e}"
print(f"[{name}] ✗ {error_msg}")
return (name, False, error_msg)
def download_top_agents_logs_for_split(experiments_dir: str = "experiments", split_name: str = "Verified", num_agents: int = 5) -> None:
"""
Download logs for all top N agents in a specific split using parallel processing.
Args:
experiments_dir: Directory containing the experiments repository
split_name: The split name (e.g., "Verified", "Lite", "Test", "Multimodal")
num_agents: Number of top agents to download logs for (default: 5)
"""
ensure_experiments_repo()
print("Fetching leaderboard data...")
leaderboard_json = read_json_url_or_path(LEADERBOARD_JSON)
top_agents = top_agents_for_split(leaderboard_json, split_name, num_agents)
print(f"Checking/downloading trajectories for {len(top_agents)} agents in {split_name} split...")
agent_infos = [(i, agent) for i, agent in enumerate(top_agents, 1)]
with ProcessPoolExecutor(max_workers=10) as executor:
future_to_agent = {
executor.submit(download_single_agent_for_split, agent_info, experiments_dir, split_name): agent_info[1]
for agent_info in agent_infos
}
successful = 0
failed = 0
for future in as_completed(future_to_agent):
agent = future_to_agent[future]
try:
name, success, error_msg = future.result()
if success:
successful += 1
else:
failed += 1
except Exception as e:
name = agent.get("name", "Unknown")
print(f"[{name}] ✗ Exception: {e}")
failed += 1
print(f"\nDownload completed for {split_name} split. Success: {successful}, Failed: {failed}")

101
analyzer/utils.py Normal file

@@ -0,0 +1,101 @@
import urllib.request
import ast
import json
import pathlib
import os
import re
from typing import Optional
def read_json_url_or_path(src: str) -> dict:
if src.startswith(("http://", "https://")):
with urllib.request.urlopen(src, timeout=60) as r:
return json.load(r)
p = pathlib.Path(src).expanduser().resolve()
with p.open("r", encoding="utf-8") as f:
return json.load(f)
# an "issues" directory containing "{numerical_id}-{instance_id}/logs/{timestamp}/run/{instance_id}/generator/{generator_id}/" and within it "notice.log" and "debug.log"
def find_failed_trajectory_in_issues_directory(instance_id: str, experiments_dir: str) -> Optional[str]:
"""
Find a failed trajectory for the given instance within the experiments directory.
    The function searches for debug.log files that record a completed generation (i.e., a
    non-empty patch) and returns the path to the corresponding notice.log trajectory file.
The function searches through the directory structure:
{experiments_dir}/{numerical_id}-{instance_id}/logs/{timestamp}/run/{instance_id}/generator/{generator_id}/
"""
if not os.path.exists(experiments_dir):
return None
pattern = re.compile(r'^\d+-' + re.escape(instance_id) + r'$')
for item in os.listdir(experiments_dir):
if pattern.match(item):
instance_dir = os.path.join(experiments_dir, item)
if not os.path.isdir(instance_dir):
continue
logs_dir = os.path.join(instance_dir, 'logs')
if not os.path.exists(logs_dir):
continue
timestamp_dirs = []
for timestamp in os.listdir(logs_dir):
timestamp_path = os.path.join(logs_dir, timestamp)
if os.path.isdir(timestamp_path):
timestamp_dirs.append(timestamp)
timestamp_dirs.sort(reverse=True)
for timestamp in timestamp_dirs:
timestamp_path = os.path.join(logs_dir, timestamp)
run_dir = os.path.join(timestamp_path, 'run', instance_id, 'generator')
if not os.path.exists(run_dir):
continue
for generator_id in range(5):
generator_dir = os.path.join(run_dir, f"{generator_id:03d}")
debug_log_path = os.path.join(generator_dir, 'debug.log')
notice_log_path = os.path.join(generator_dir, 'notice.log')
if not os.path.exists(debug_log_path):
continue
if _contains_successful_patch(debug_log_path):
if os.path.exists(notice_log_path):
return os.path.abspath(notice_log_path)
return None
def _contains_successful_patch(debug_log_path: str) -> bool:
"""
Check if a debug.log file contains a successful patch.
Reads the last non-empty line, splits on 'result_data: ', parses the dict,
and checks if the golden_patch contains non-empty patch_content.
"""
try:
with open(debug_log_path, 'r', encoding='utf-8') as f:
lines = f.readlines()
last_line = None
for line in reversed(lines):
if line.strip():
last_line = line.strip()
break
if not last_line:
return False
if 'result_data: ' not in last_line:
return False
parts = last_line.split('result_data: ', 1)
if len(parts) != 2:
return False
dict_str = parts[1]
        try:
            # Parse the logged dict literal safely (avoids arbitrary code execution via eval)
            result_data = ast.literal_eval(dict_str)
        except (ValueError, SyntaxError):
            return False
if not isinstance(result_data, dict):
return False
golden_patch = result_data.get('golden_patch')
if not isinstance(golden_patch, list) or len(golden_patch) == 0:
return False
first_patch = golden_patch[0]
if not isinstance(first_patch, dict):
return False
patch_content = first_patch.get('patch_content')
if not isinstance(patch_content, str) or not patch_content.strip():
return False
return True
    except Exception:
return False