mirror of
https://github.com/dongdongunique/EvoSynth.git
synced 2026-06-02 21:21:42 +02:00
608 lines
27 KiB
Python
608 lines
27 KiB
Python
"""
|
|
Simple disk saving utilities for storing session results
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
from datetime import datetime
|
|
from typing import Dict, List, Any
|
|
import aiofiles
|
|
|
|
class SimpleDataSaver:
|
|
"""Simple disk saving for session results - one folder per attack session"""
|
|
|
|
def __init__(self, base_path: str = "./attack_sessions"):
|
|
self.base_path = base_path
|
|
os.makedirs(base_path, exist_ok=True)
|
|
|
|
def create_session_folder(self, session_id: str = None,target_model="", create_subfolder=False) -> str:
|
|
"""Create a folder for the current session"""
|
|
if session_id is None:
|
|
session_id = f"attack_{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}"
|
|
|
|
if create_subfolder and target_model:
|
|
# Create subfolder with session_id+target_model
|
|
session_folder = os.path.join(self.base_path, session_id+"-"+target_model)
|
|
os.makedirs(session_folder, exist_ok=True)
|
|
else:
|
|
# Just use the base_path as session folder
|
|
session_folder = self.base_path
|
|
os.makedirs(session_folder, exist_ok=True)
|
|
|
|
print(f"📁 Session folder created: {session_folder}")
|
|
return session_folder
|
|
|
|
async def save_queries(self, queries: List[str], session_folder: str) -> str:
|
|
"""Save the queries list to JSON file asynchronously"""
|
|
filename = os.path.join(session_folder, "queries.json")
|
|
|
|
data = {
|
|
"queries": queries,
|
|
"count": len(queries),
|
|
"timestamp": datetime.now().isoformat()
|
|
}
|
|
|
|
async with aiofiles.open(filename, 'w', encoding='utf-8') as f:
|
|
await f.write(json.dumps(data, indent=2, ensure_ascii=False))
|
|
|
|
print(f"📝 Queries saved to: {filename}")
|
|
return filename
|
|
|
|
async def save_session_results(self, session_data: Dict, session_folder: str) -> str:
|
|
"""Save complete session results to JSON file asynchronously"""
|
|
filename = os.path.join(session_folder, "session_results.json")
|
|
|
|
data = {
|
|
"timestamp": datetime.now().isoformat(),
|
|
"session_data": session_data
|
|
}
|
|
|
|
async with aiofiles.open(filename, 'w', encoding='utf-8') as f:
|
|
await f.write(json.dumps(data, indent=2, ensure_ascii=False, default=str))
|
|
|
|
print(f"📊 Session results saved to: {filename}")
|
|
return filename
|
|
|
|
async def extract_and_save_tool_usage(self, session_data: Dict, session_folder: str, context=None) -> str:
|
|
"""Extract detailed tool data from session results and save to JSON file"""
|
|
filename = os.path.join(session_folder, "tool_usage.json")
|
|
|
|
# Extract ALL created tools using the to_dict function
|
|
all_tools_data = []
|
|
|
|
# Get all created tools from context if available
|
|
if context and hasattr(context, 'created_tools'):
|
|
# Handle both dict and list cases
|
|
if isinstance(context.created_tools, dict):
|
|
all_tools = list(context.created_tools.values())
|
|
else:
|
|
all_tools = list(context.created_tools)
|
|
|
|
# Use the to_dict method for formatted tool data
|
|
for tool in all_tools:
|
|
tool_dict = tool.to_dict()
|
|
all_tools_data.append(tool_dict)
|
|
|
|
# Extract tool data organized by queries
|
|
tools_data = []
|
|
query_results = session_data.get("query_results", {})
|
|
|
|
for query, results in query_results.items():
|
|
# Get tools that have performance data for this specific query
|
|
query_tools = []
|
|
|
|
for tool_dict in all_tools_data:
|
|
# Check if this tool has performance data for the current query
|
|
has_query_performance = False
|
|
for query_perf in tool_dict["performance"]["query_performance"]:
|
|
if query_perf.get('query') == query:
|
|
has_query_performance = True
|
|
break
|
|
|
|
if has_query_performance:
|
|
query_tools.append(tool_dict)
|
|
|
|
# Get tool creation results from session data
|
|
tool_creation_results = results.get("tool_creation_results", [])
|
|
|
|
query_data = {
|
|
"query": query,
|
|
"query_successful": results.get("query_successful", False),
|
|
"tools_created_count": len(query_tools),
|
|
"tool_creation_results": tool_creation_results,
|
|
"created_tools": query_tools
|
|
}
|
|
tools_data.append(query_data)
|
|
|
|
# Calculate overall statistics
|
|
total_tools = len(all_tools_data)
|
|
successful_queries_with_tools = len([t for t in tools_data if t["tools_created_count"] > 0])
|
|
|
|
# Tool category statistics
|
|
category_stats = {}
|
|
for tool_dict in all_tools_data:
|
|
category = tool_dict["tool_category"]
|
|
if category not in category_stats:
|
|
category_stats[category] = {"count": 0, "tools": []}
|
|
category_stats[category]["count"] += 1
|
|
category_stats[category]["tools"].append(tool_dict["tool_name"])
|
|
|
|
data = {
|
|
"timestamp": datetime.now().isoformat(),
|
|
"all_created_tools": all_tools_data,
|
|
"tools_by_query": tools_data,
|
|
"summary_statistics": {
|
|
"total_tools_created": total_tools,
|
|
"successful_queries_with_tools": successful_queries_with_tools,
|
|
"queries_with_tools_rate": f"{successful_queries_with_tools}/{len(query_results)}",
|
|
"total_unique_tool_names": len(set(tool["tool_name"] for tool in all_tools_data))
|
|
},
|
|
"tool_category_statistics": category_stats
|
|
}
|
|
|
|
async with aiofiles.open(filename, 'w', encoding='utf-8') as f:
|
|
await f.write(json.dumps(data, indent=2, ensure_ascii=False, default=str))
|
|
|
|
print(f"🛠️ Detailed tool usage saved to: {filename}")
|
|
print(f" - Total tools created: {total_tools}")
|
|
print(f" - Queries with tools: {successful_queries_with_tools}/{len(query_results)}")
|
|
print(f" - Tool categories: {len(category_stats)}")
|
|
return filename
|
|
|
|
async def save_final_report(self, report_data: Dict, session_folder: str) -> str:
|
|
"""Save final session report to JSON file"""
|
|
filename = os.path.join(session_folder, "final_report.json")
|
|
|
|
data = {
|
|
"timestamp": datetime.now().isoformat(),
|
|
"report": report_data
|
|
}
|
|
|
|
async with aiofiles.open(filename, 'w', encoding='utf-8') as f:
|
|
await f.write(json.dumps(data, indent=2, ensure_ascii=False, default=str))
|
|
|
|
print(f"📋 Final report saved to: {filename}")
|
|
return filename
|
|
|
|
async def save_successful_multi_turn_attacks(self, session_data: Dict, session_folder: str, context=None) -> str:
|
|
"""Save successful multi-turn attacks with strategies, conversations and scores for the current query"""
|
|
filename = os.path.join(session_folder, "successful_multi_turn_attacks.json")
|
|
|
|
if not context:
|
|
return filename
|
|
|
|
# Get attack history for current query only
|
|
attack_history = getattr(context, 'session_data', {}).get('attack_history', [])
|
|
print(f"found {len(attack_history)} attacks")
|
|
attack_history = [attack for attack in attack_history if attack.get("original_query", "") == context.original_query]
|
|
print(f"found {len(attack_history)} successful original query attacks")
|
|
|
|
successful_attacks = []
|
|
|
|
for attack in attack_history:
|
|
#print(attack.keys())
|
|
#print("final judge score: ",attack['final_judge_score'])
|
|
# print(attack)
|
|
if attack.get('final_judge_score', 0) >= 5:
|
|
conversation_result = attack.get('multi_turn_results', {})
|
|
conversation_history = conversation_result.get('conversation_history', [])
|
|
|
|
# Extract conversation turns
|
|
|
|
successful_attacks.append({
|
|
"query": attack.get('original_query', ''),
|
|
"tool_name": attack.get('tool_name', ''),
|
|
"strategy_plan": attack.get('strategy_plan', ''),
|
|
"final_score": attack.get('final_judge_score', 0),
|
|
"total_turns": attack.get('total_turns', 0),
|
|
"conversation_turns": conversation_history,
|
|
"conversation_successful": conversation_result.get('conversation_successful', False),
|
|
"highest_score": conversation_result.get('highest_score', 0),
|
|
"average_score": conversation_result.get('average_score', 0),
|
|
"success_turn": conversation_result.get('success_turn', 0),
|
|
"timestamp": conversation_result.get('completed_at', '')
|
|
})
|
|
|
|
export_data = {
|
|
"timestamp": datetime.now().isoformat(),
|
|
"query": context.original_query,
|
|
"total_successful_attacks": len(successful_attacks),
|
|
"successful_attacks": successful_attacks
|
|
}
|
|
|
|
async with aiofiles.open(filename, 'w', encoding='utf-8') as f:
|
|
await f.write(json.dumps(export_data, indent=2, ensure_ascii=False, default=str))
|
|
|
|
print(f"🎯 Saved {len(successful_attacks)} successful multi-turn attacks for query to: {filename}")
|
|
return filename
|
|
|
|
async def save_session_summary(self, session_data: Dict, session_folder: str, context=None) -> str:
|
|
"""Save a comprehensive text summary of the session with jailbreak tool details and attack results"""
|
|
filename = os.path.join(session_folder, "summary.txt")
|
|
|
|
query_results = session_data.get("query_results", {})
|
|
successful_queries = session_data.get("successful_queries", [])
|
|
failed_queries = session_data.get("failed_queries", [])
|
|
|
|
summary_content = f"""Session Summary
|
|
================
|
|
Timestamp: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
|
|
Total Queries: {len(query_results)}
|
|
Successful Queries: {len(successful_queries)}
|
|
Failed Queries: {len(failed_queries)}
|
|
Success Rate: {len(successful_queries)/len(query_results)*100:.1f}% if query_results else 0
|
|
|
|
"""
|
|
|
|
# Extract attack history from context if available
|
|
attack_history = []
|
|
all_tools = []
|
|
|
|
if context:
|
|
# Get attack history from session_data
|
|
if hasattr(context, 'session_data'):
|
|
attack_history = context.session_data.get('attack_history', [])
|
|
|
|
# Get all created tools from context
|
|
if hasattr(context, 'created_tools'):
|
|
# Handle both dict and list cases
|
|
if isinstance(context.created_tools, dict):
|
|
all_tools = list(context.created_tools.values())
|
|
else:
|
|
all_tools = list(context.created_tools)
|
|
|
|
# Organize successful attacks by query
|
|
successful_attacks_by_query = {}
|
|
|
|
# Process attack history to find successful jailbreaks
|
|
for attack in attack_history:
|
|
if attack.get('attack_successful', False): # Check attack_successful field
|
|
query = attack.get('original_query', 'Unknown Query')
|
|
tool_name = attack.get('tool_name', 'Unknown Tool')
|
|
score = attack.get('final_judge_score', 0)
|
|
|
|
if query not in successful_attacks_by_query:
|
|
successful_attacks_by_query[query] = []
|
|
|
|
successful_attacks_by_query[query].append({
|
|
'tool_name': tool_name,
|
|
'score': score,
|
|
'total_turns': attack.get('total_turns', 0),
|
|
'attack_record': attack
|
|
})
|
|
|
|
# Sort tools by their query performance
|
|
tool_performance_by_query = {}
|
|
for tool in all_tools:
|
|
# Sort query performance by score (highest first)
|
|
sorted_query_performance = sorted(
|
|
tool.performance.query_performance,
|
|
key=lambda x: x.get('score', 0),
|
|
reverse=True
|
|
)
|
|
|
|
for query_perf in sorted_query_performance:
|
|
query = query_perf.get('query', 'Unknown Query')
|
|
score = query_perf.get('score', 0)
|
|
|
|
if query not in tool_performance_by_query:
|
|
tool_performance_by_query[query] = []
|
|
|
|
tool_performance_by_query[query].append({
|
|
'tool_name': tool.tool_name,
|
|
'tool_id': tool.tool_id,
|
|
'score': score,
|
|
'tool': tool
|
|
})
|
|
|
|
# DETAILED QUERY ANALYSIS
|
|
summary_content += """
|
|
==========================
|
|
DETAILED QUERY ANALYSIS
|
|
==========================
|
|
"""
|
|
|
|
for query, results in query_results.items():
|
|
status = "SUCCESS" if results.get("query_successful", False) else "FAILED"
|
|
iterations = len(results.get("coordinator_decisions", []))
|
|
tools_created = sum(r.get("tools_created", 0) for r in results.get("tool_creation_results", []))
|
|
best_score = max([r.get("best_judge_score", 0) for r in results.get("exploitation_results", [])], default=0)
|
|
|
|
summary_content += f"""
|
|
Query: {query}
|
|
Status: {status}
|
|
Iterations: {iterations}
|
|
Tools Created: {tools_created}
|
|
Best Score: {best_score}
|
|
"""
|
|
|
|
# Add successful jailbreak tools for this query
|
|
if query in successful_attacks_by_query:
|
|
summary_content += f"""
|
|
SUCCESSFUL JAILBREAK TOOLS for '{query}':
|
|
"""
|
|
for attack_info in successful_attacks_by_query[query]:
|
|
tool_name = attack_info['tool_name']
|
|
score = attack_info['score']
|
|
turns = attack_info['total_turns']
|
|
attack_record = attack_info['attack_record']
|
|
|
|
summary_content += f"""
|
|
┌─────────────────────────────────────────
|
|
│ Tool: {tool_name}
|
|
│ Score: {score}/5 (SUCCESSFUL JAILBREAK)
|
|
│ Turns: {turns}
|
|
│ Timestamp: {attack_record.get('timestamp', 'N/A')}
|
|
│ Strategy: {attack_record.get('strategy_plan', 'N/A')[:100]}...
|
|
│
|
|
│ MULTI-TURN CONVERSATION RESULTS:
|
|
│"""
|
|
|
|
multi_turn_results = attack_record.get('multi_turn_results', {})
|
|
if isinstance(multi_turn_results, dict):
|
|
for turn_key, turn_data in multi_turn_results.items():
|
|
if turn_key.startswith('turn_'):
|
|
turn_num = turn_key.split('_')[1]
|
|
prompt = turn_data.get('prompt', 'N/A')[:100] + '...' if len(turn_data.get('prompt', 'N/A')) > 100 else turn_data.get('prompt', 'N/A')
|
|
response = turn_data.get('response', 'N/A')[:100] + '...' if len(turn_data.get('response', 'N/A')) > 100 else turn_data.get('response', 'N/A')
|
|
judge_score = turn_data.get('judge_score', 0)
|
|
|
|
summary_content += f"""
|
|
│ Turn {turn_num}:
|
|
│ Prompt: {prompt}
|
|
│ Response: {response}
|
|
│ Judge Score: {judge_score}"""
|
|
|
|
final_judge_score = attack_record.get('final_judge_score', 0)
|
|
summary_content += f"""
|
|
│ FINAL JUDGE SCORE: {final_judge_score}/5 {'✓ SUCCESS' if final_judge_score >= 5 else '✗ FAILED'}
|
|
└─────────────────────────────────────────
|
|
"""
|
|
|
|
# Show tool performance ranking for this query (sorted by query performance)
|
|
if query in tool_performance_by_query:
|
|
summary_content += f"""
|
|
TOOL PERFORMANCE RANKING for '{query}':
|
|
"""
|
|
for i, tool_info in enumerate(tool_performance_by_query[query], 1):
|
|
tool = tool_info['tool']
|
|
summary_content += f"""
|
|
{i}. {tool_info['tool_name']} (Score: {tool_info['score']}/5)
|
|
- Success Rate: {tool.performance.success_count}/{tool.performance.execution_count} ({tool.performance.success_count/max(1, tool.performance.execution_count)*100:.1f}%)
|
|
- Category: {tool.tool_category}
|
|
- Description: {tool.tool_description[:80]}...
|
|
"""
|
|
|
|
# OVERALL SESSION STATISTICS
|
|
total_successful_attacks = sum(len(attacks) for attacks in successful_attacks_by_query.values())
|
|
total_attacks = len(attack_history)
|
|
|
|
summary_content += f"""
|
|
==========================
|
|
SESSION STATISTICS
|
|
==========================
|
|
Total Attacks Executed: {total_attacks}
|
|
Successful Jailbreaks: {total_successful_attacks}
|
|
Overall Attack Success Rate: {total_successful_attacks/total_attacks*100:.1f}% if total_attacks > 0 else 0
|
|
|
|
Queries with Successful Jailbreaks: {len(successful_attacks_by_query)}/{len(query_results)}
|
|
"""
|
|
|
|
# BEST PERFORMING TOOLS OVERALL (based on query performance)
|
|
if all_tools:
|
|
# Sort tools by their best query performance score
|
|
top_tools = sorted(all_tools, key=lambda x: max([q.get('score', 0) for q in x.performance.query_performance], default=0), reverse=True)[:3]
|
|
summary_content += f"""
|
|
==========================
|
|
TOP 3 PERFORMING TOOLS OVERALL
|
|
==========================
|
|
"""
|
|
for i, tool in enumerate(top_tools, 1):
|
|
best_score = max([q.get('score', 0) for q in tool.performance.query_performance], default=0)
|
|
summary_content += f"""
|
|
{i}. {tool.tool_name} (Best Query Score: {best_score}/5)
|
|
- Category: {tool.tool_category}
|
|
- Success Rate: {tool.performance.success_count}/{tool.performance.execution_count} ({tool.performance.success_count/max(1, tool.performance.execution_count)*100:.1f}%)
|
|
- Avg Execution Time: {tool.performance.average_execution_time:.2f}s
|
|
- Total Executions: {tool.performance.execution_count}
|
|
- Description: {tool.tool_description[:100]}...
|
|
- Query Performance: {tool.get_previous_queries_summary()}
|
|
"""
|
|
|
|
# Save the comprehensive summary
|
|
async with aiofiles.open(filename, 'w', encoding='utf-8') as f:
|
|
await f.write(summary_content)
|
|
|
|
print(f"📄 Comprehensive session summary saved to: {filename}")
|
|
return filename
|
|
|
|
async def save_session_summary_json(self, session_data: Dict, session_folder: str, context=None) -> str:
|
|
"""Save a comprehensive JSON summary of the session with jailbreak tool details and attack results"""
|
|
filename = os.path.join(session_folder, "summary.json")
|
|
|
|
query_results = session_data.get("query_results", {})
|
|
successful_queries = session_data.get("successful_queries", [])
|
|
failed_queries = session_data.get("failed_queries", [])
|
|
|
|
# Extract attack history from context if available
|
|
attack_history = []
|
|
all_tools = []
|
|
|
|
if context:
|
|
# Get attack history from session_data
|
|
if hasattr(context, 'session_data'):
|
|
attack_history = context.session_data.get('attack_history', [])
|
|
|
|
# Get all created tools from context
|
|
if hasattr(context, 'created_tools'):
|
|
# Handle both dict and list cases
|
|
if isinstance(context.created_tools, dict):
|
|
all_tools = list(context.created_tools.values())
|
|
else:
|
|
all_tools = list(context.created_tools)
|
|
|
|
# Organize successful attacks by query
|
|
successful_attacks_by_query = {}
|
|
|
|
# Process attack history to find successful jailbreaks
|
|
for attack in attack_history:
|
|
if attack.get('attack_successful', False): # Check attack_successful field
|
|
query = attack.get('original_query', 'Unknown Query')
|
|
tool_name = attack.get('tool_name', 'Unknown Tool')
|
|
score = attack.get('final_judge_score', 0)
|
|
|
|
if query not in successful_attacks_by_query:
|
|
successful_attacks_by_query[query] = []
|
|
|
|
successful_attacks_by_query[query].append({
|
|
'tool_name': tool_name,
|
|
'score': score,
|
|
'total_turns': attack.get('total_turns', 0),
|
|
'attack_record': attack
|
|
})
|
|
|
|
# Sort tools by their query performance
|
|
tool_performance_by_query = {}
|
|
for tool in all_tools:
|
|
# Sort query performance by score (highest first)
|
|
sorted_query_performance = sorted(
|
|
tool.performance.query_performance,
|
|
key=lambda x: x.get('score', 0),
|
|
reverse=True
|
|
)
|
|
|
|
for query_perf in sorted_query_performance:
|
|
query = query_perf.get('query', 'Unknown Query')
|
|
score = query_perf.get('score', 0)
|
|
|
|
if query not in tool_performance_by_query:
|
|
tool_performance_by_query[query] = []
|
|
|
|
tool_performance_by_query[query].append({
|
|
'tool_name': tool.tool_name,
|
|
'tool_id': tool.tool_id,
|
|
'score': score,
|
|
'tool': tool
|
|
})
|
|
|
|
# Build detailed query analysis
|
|
detailed_query_analysis = []
|
|
for query, results in query_results.items():
|
|
status = "SUCCESS" if results.get("query_successful", False) else "FAILED"
|
|
iterations = len(results.get("coordinator_decisions", []))
|
|
tools_created = sum(r.get("tools_created", 0) for r in results.get("tool_creation_results", []))
|
|
best_score = max([r.get("best_judge_score", 0) for r in results.get("exploitation_results", [])], default=0)
|
|
|
|
query_info = {
|
|
"query": query,
|
|
"status": status,
|
|
"iterations": iterations,
|
|
"tools_created": tools_created,
|
|
"best_score": best_score,
|
|
"successful_jailbreak_tools": [],
|
|
"tool_performance_ranking": []
|
|
}
|
|
|
|
# Add successful jailbreak tools for this query
|
|
if query in successful_attacks_by_query:
|
|
for attack_info in successful_attacks_by_query[query]:
|
|
tool_name = attack_info['tool_name']
|
|
score = attack_info['score']
|
|
turns = attack_info['total_turns']
|
|
attack_record = attack_info['attack_record']
|
|
|
|
tool_info = {
|
|
"tool_name": tool_name,
|
|
"score": score,
|
|
"turns": turns,
|
|
"timestamp": attack_record.get('timestamp', 'N/A'),
|
|
"strategy": attack_record.get('strategy_plan', 'N/A'),
|
|
"final_judge_score": attack_record.get('final_judge_score', 0),
|
|
"multi_turn_conversation": []
|
|
}
|
|
|
|
# Extract multi-turn conversation
|
|
multi_turn_results = attack_record.get('multi_turn_results', {})
|
|
if isinstance(multi_turn_results, dict):
|
|
for turn_key, turn_data in multi_turn_results.items():
|
|
if turn_key.startswith('turn_'):
|
|
turn_num = turn_key.split('_')[1]
|
|
tool_info["multi_turn_conversation"].append({
|
|
"turn": int(turn_num),
|
|
"attack_response": turn_data.get('attack_response', 'N/A'),
|
|
"target_response": turn_data.get('target_response', 'N/A'),
|
|
"judge_score": turn_data.get('judge_score', 0)
|
|
})
|
|
|
|
query_info["successful_jailbreak_tools"].append(tool_info)
|
|
|
|
# Show tool performance ranking for this query
|
|
if query in tool_performance_by_query:
|
|
for i, tool_info in enumerate(tool_performance_by_query[query], 1):
|
|
tool = tool_info['tool']
|
|
query_info["tool_performance_ranking"].append({
|
|
"rank": i,
|
|
"tool_name": tool_info['tool_name'],
|
|
"tool_id": tool_info['tool_id'],
|
|
"score": tool_info['score'],
|
|
"success_rate": f"{tool.performance.success_count}/{tool.performance.execution_count} ({tool.performance.success_count/max(1, tool.performance.execution_count)*100:.1f}%)",
|
|
"category": tool.tool_category,
|
|
"description": tool.tool_description
|
|
})
|
|
|
|
detailed_query_analysis.append(query_info)
|
|
|
|
# Calculate session statistics
|
|
total_successful_attacks = sum(len(attacks) for attacks in successful_attacks_by_query.values())
|
|
total_attacks = len(attack_history)
|
|
|
|
# Get top performing tools overall
|
|
top_tools = []
|
|
if all_tools:
|
|
# Sort tools by their best query performance score
|
|
top_tools = sorted(all_tools, key=lambda x: max([q.get('score', 0) for q in x.performance.query_performance], default=0), reverse=True)[:3]
|
|
|
|
top_tools_info = []
|
|
for i, tool in enumerate(top_tools, 1):
|
|
best_score = max([q.get('score', 0) for q in tool.performance.query_performance], default=0)
|
|
top_tools_info.append({
|
|
"rank": i,
|
|
"tool_name": tool.tool_name,
|
|
"best_query_score": best_score,
|
|
"category": tool.tool_category,
|
|
"success_rate": f"{tool.performance.success_count}/{tool.performance.execution_count} ({tool.performance.success_count/max(1, tool.performance.execution_count)*100:.1f}%)",
|
|
"avg_execution_time": tool.performance.average_execution_time,
|
|
"total_executions": tool.performance.execution_count,
|
|
"description": tool.tool_description,
|
|
"query_performance": tool.performance.query_performance
|
|
})
|
|
|
|
# Build final JSON structure
|
|
json_summary = {
|
|
"timestamp": datetime.now().isoformat(),
|
|
"session_summary": {
|
|
"total_queries": len(query_results),
|
|
"successful_queries": len(successful_queries),
|
|
"failed_queries": len(failed_queries),
|
|
"success_rate": len(successful_queries)/len(query_results)*100 if query_results else 0
|
|
},
|
|
"session_statistics": {
|
|
"total_attacks_executed": total_attacks,
|
|
"successful_jailbreaks": total_successful_attacks,
|
|
"overall_attack_success_rate": total_successful_attacks/total_attacks*100 if total_attacks > 0 else 0,
|
|
"queries_with_successful_jailbreaks": len(successful_attacks_by_query),
|
|
"queries_with_successful_jailbreaks_rate": f"{len(successful_attacks_by_query)}/{len(query_results)}"
|
|
},
|
|
"detailed_query_analysis": detailed_query_analysis,
|
|
"top_performing_tools": top_tools_info
|
|
}
|
|
|
|
# Save the JSON summary
|
|
async with aiofiles.open(filename, 'w', encoding='utf-8') as f:
|
|
await f.write(json.dumps(json_summary, indent=2, ensure_ascii=False, default=str))
|
|
|
|
print(f"📊 Comprehensive session summary (JSON) saved to: {filename}")
|
|
return filename
|
|
|
|
# Global instance
|
|
data_saver = SimpleDataSaver() |