#!/usr/bin/env python3
"""
OpenRouter Helm Chart Security Analysis Tool
This tool automates security analysis of Helm charts using OpenRouter.ai,
supporting multiple AI models, mapping findings to the MITRE ATT&CK framework,
and outputting results to CSV and Markdown.
The tool can use Anthropic and OpenAI APIs directly when API keys are available,
falling back to OpenRouter when they're not. It also supports OpenAI reasoning models
like o3 and o4-mini, which require a different request schema.

Usage examples:
# Run 3 times with a single model
python openrouter_threatmodeler.py ./helm-chart -n 3 -m claude-3-haiku
# Run 2 times each with 3 different models (6 total runs)
python openrouter_threatmodeler.py ./helm-chart -n 2 -m claude-3-haiku gpt-4 mistral-large
# Mix of short names and direct paths
python openrouter_threatmodeler.py ./helm-chart -n 1 -m claude-3-opus openai/gpt-4-turbo-preview
# Use direct API access with environment variables
ANTHROPIC_API_KEY=your_key OPENAI_API_KEY=your_key python openrouter_threatmodeler.py ./helm-chart -n 2 -m claude-3-haiku gpt-4
# Use direct API access with command-line arguments
python openrouter_threatmodeler.py ./helm-chart -n 2 -m claude-3-haiku gpt-4 --anthropic-api-key=your_key --openai-api-key=your_key
# Run used for the SANS whitepaper experiment:
python openrouter_threatmodeler.py $PWD/wordpress-helm-chart -o combined.csv -n 5 -m anthropic/claude-3-5-haiku-latest anthropic/claude-3-7-sonnet-latest anthropic/claude-opus-4-0 openai/o4-mini openai/chatgpt-4o-latest
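
Prompt files default to prompts/system_prompt.md and prompts/user_prompt.md and
must exist; override with -sp/--system-prompt and -up/--user-prompt.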
"""
import argparse
import csv
import logging
import os
import sys
import time
from collections import defaultdict
from dataclasses import dataclass, asdict
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import requests
@dataclass
class SecurityFinding:
"""Data class for security findings"""
run_id: int
timestamp: str
mitre_attack_tactic: str
mitre_attack_base_technique_id: str
mitre_attack_sub_technique_id: Optional[str]
mitre_attack_technique_name: str
misconfiguration: str
risk_priority: str
mitigation_steps: str
helm_file: str
raw_response: str
    model: str  # Model that produced this finding
@dataclass
class SimplifiedFinding:
"""Simplified data class for CSV output"""
run_number: int
    model: str  # Model that produced this finding
mitre_attack_tactic: str
mitre_attack_base_technique_id: str
mitre_attack_sub_technique_id: Optional[str]
mitre_attack_technique_name: str
risk_priority: str
class HelmSecurityAnalyzer:
"""Main class for analyzing Helm charts using OpenRouter API"""
# Available models on OpenRouter
AVAILABLE_MODELS = {
'claude-3-opus': 'anthropic/claude-3-opus',
'claude-3-sonnet': 'anthropic/claude-3-sonnet-20240229',
'claude-3-haiku': 'anthropic/claude-3-haiku',
'gpt-4': 'openai/gpt-4',
'gpt-4-turbo': 'openai/gpt-4-turbo-preview',
'gpt-3.5-turbo': 'openai/gpt-3.5-turbo',
'o3': 'openai/o3',
'o4-mini': 'openai/o4-mini',
'mistral-large': 'mistralai/mistral-large',
'mixtral-8x7b': 'mistralai/mixtral-8x7b-instruct',
'gemini-pro': 'google/gemini-pro',
'llama-3-70b': 'meta-llama/llama-3-70b-instruct',
'llama-3-8b': 'meta-llama/llama-3-8b-instruct'
}
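    # Short names map to full OpenRouter slugs; anything not listed here is
    # passed through verbatim as a direct provider/model path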
# Known reasoning models that require special formatting
REASONING_MODELS = ["o3", "o4-mini"]
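    # Reasoning models are routed to OpenAI's Responses API, which takes an
    # "input" array plus a "reasoning" object instead of "messages" and
    # "temperature" (see _call_openai_responses_api below)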
def __init__(self, api_key: str, model: str = "anthropic/claude-3-haiku",
site_url: Optional[str] = None, app_name: Optional[str] = None,
anthropic_api_key: Optional[str] = None, openai_api_key: Optional[str] = None):
"""Initialize the analyzer with API credentials"""
self.api_key = api_key # OpenRouter API key
self.anthropic_api_key = anthropic_api_key or os.environ.get("ANTHROPIC_API_KEY")
self.openai_api_key = openai_api_key or os.environ.get("OPENAI_API_KEY")
self.base_url = "https://openrouter.ai/api/v1"
# Map short model names to full OpenRouter model paths
if model in self.AVAILABLE_MODELS:
self.model = self.AVAILABLE_MODELS[model]
else:
# Allow direct model path specification
self.model = model
self.site_url = site_url or "https://github.com/esekercan/sans-paper/tree/main/python"
self.app_name = app_name or "Helm Security Analyzer"
self.logger = logging.getLogger(__name__)
def set_model(self, model: str):
"""Set the current model"""
if model in self.AVAILABLE_MODELS:
self.model = self.AVAILABLE_MODELS[model]
else:
self.model = model
def call_anthropic_api(self, system_prompt: str, user_prompt: str) -> Tuple[str, Dict[str, int]]:
"""Call Anthropic API directly when API key is available"""
self.logger.info(f"Calling Anthropic API directly with model: {self.model}")
# Extract the model name from the full path (e.g., "anthropic/claude-3-haiku" -> "claude-3-haiku")
model_name = self.model.split('/')[-1] if '/' in self.model else self.model
headers = {
"x-api-key": self.anthropic_api_key,
"anthropic-version": "2023-06-01",
"content-type": "application/json"
}
data = {
"model": model_name,
"system": system_prompt,
"messages": [
{"role": "user", "content": user_prompt}
],
"temperature": 0.1,
"max_tokens": 5000,
"top_p": 1
}
try:
response = requests.post(
"https://api.anthropic.com/v1/messages",
headers=headers,
json=data,
timeout=120
)
response.raise_for_status()
result = response.json()
content = result['content'][0]['text']
            # Anthropic reports usage as input_tokens/output_tokens rather than
            # prompt_tokens/completion_tokens, so compute the total ourselves
usage = {
'input_tokens': result.get('usage', {}).get('input_tokens', 0),
'output_tokens': result.get('usage', {}).get('output_tokens', 0),
'total_tokens': result.get('usage', {}).get('input_tokens', 0) + result.get('usage', {}).get('output_tokens', 0)
}
self.logger.info(f"Anthropic API call successful - Tokens used: {usage['total_tokens']} (input: {usage['input_tokens']}, output: {usage['output_tokens']})")
return content, usage
except requests.exceptions.RequestException as e:
self.logger.error(f"Anthropic API call failed: {e}")
if hasattr(e, 'response') and e.response is not None:
self.logger.error(f"Response content: {e.response.text}")
raise
def call_openai_api(self, system_prompt: str, user_prompt: str) -> Tuple[str, Dict[str, int]]:
"""Call OpenAI API directly when API key is available"""
self.logger.info(f"Calling OpenAI API directly with model: {self.model}")
# Extract the model name from the full path (e.g., "openai/gpt-4" -> "gpt-4")
model_name = self.model.split('/')[-1] if '/' in self.model else self.model
# Check if this is a reasoning model that requires the Responses API
is_reasoning_model = model_name in self.REASONING_MODELS
if is_reasoning_model:
return self._call_openai_responses_api(system_prompt, user_prompt, model_name)
else:
return self._call_openai_chat_completions_api(system_prompt, user_prompt, model_name)
def _call_openai_chat_completions_api(self, system_prompt: str, user_prompt: str, model_name: str) -> Tuple[str, Dict[str, int]]:
"""Call OpenAI Chat Completions API for standard models"""
self.logger.info(f"Using Chat Completions API for model: {model_name}")
headers = {
"Authorization": f"Bearer {self.openai_api_key}",
"Content-Type": "application/json"
}
# Base request data for chat completions
data = {
"model": model_name,
"messages": [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt}
],
"temperature": 0.1,
"max_tokens": 5000,
"top_p": 1,
"frequency_penalty": 0,
"presence_penalty": 0
}
try:
response = requests.post(
"https://api.openai.com/v1/chat/completions",
headers=headers,
json=data,
timeout=120
)
response.raise_for_status()
result = response.json()
if 'choices' not in result or not result['choices']:
self.logger.error(f"No choices in OpenAI response: {result}")
raise ValueError("No choices returned in OpenAI API response")
content = result['choices'][0]['message']['content']
# Extract usage information
usage_data = result.get('usage', {})
usage = {
'input_tokens': usage_data.get('prompt_tokens', 0),
'output_tokens': usage_data.get('completion_tokens', 0),
'total_tokens': usage_data.get('total_tokens', 0)
}
self.logger.info(f"Chat Completions API call successful - Tokens used: {usage['total_tokens']} (input: {usage['input_tokens']}, output: {usage['output_tokens']})")
return content, usage
except requests.exceptions.RequestException as e:
self.logger.error(f"Chat Completions API call failed: {e}")
if hasattr(e, 'response') and e.response is not None:
self.logger.error(f"Response content: {e.response.text}")
raise
except Exception as e:
self.logger.error(f"Unexpected error in Chat Completions API call: {e}")
raise
def _call_openai_responses_api(self, system_prompt: str, user_prompt: str, model_name: str) -> Tuple[str, Dict[str, int]]:
"""Call OpenAI Responses API for reasoning models like o3 and o4-mini"""
self.logger.info(f"Using Responses API for reasoning model: {model_name}")
headers = {
"Authorization": f"Bearer {self.openai_api_key}",
"Content-Type": "application/json"
}
# Base request data for responses API (note: uses "input" instead of "messages", no temperature)
data = {
"model": model_name,
"input": [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt}
],
"max_output_tokens": 4000,
"reasoning": {"effort": "medium"}
}
try:
response = requests.post(
"https://api.openai.com/v1/responses",
headers=headers,
json=data,
timeout=120
)
response.raise_for_status()
result = response.json()
# Parse the Responses API structure which has an 'output' array
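            # Abridged example of the expected payload (illustrative values):
            #   {"output": [{"type": "reasoning", ...},
            #               {"type": "message",
            #                "content": [{"type": "output_text", "text": "..."}]}],
            #    "usage": {"input_tokens": 123, "output_tokens": 456}}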
content = None
if 'output' in result and isinstance(result['output'], list):
# Look for the message object in the output array
for output_item in result['output']:
if output_item.get('type') == 'message' and 'content' in output_item:
# Extract text from the content array
content_array = output_item['content']
if isinstance(content_array, list) and len(content_array) > 0:
# Look for the text content
for content_item in content_array:
if content_item.get('type') == 'output_text' and 'text' in content_item:
content = content_item['text']
break
break
# Fallback to legacy structure parsing if needed
if content is None:
if 'choices' in result and result['choices']:
content = result['choices'][0]['message']['content']
elif 'response' in result:
content = result['response']
elif 'content' in result:
content = result['content']
else:
self.logger.error(f"Unexpected response structure from Responses API: {list(result.keys())}")
raise ValueError("Unexpected response structure from OpenAI Responses API")
            # Extract usage information (the Responses API may report
            # input_tokens/output_tokens instead of the Chat Completions
            # prompt_tokens/completion_tokens names, so try both)
usage_data = result.get('usage', {})
usage = {
'input_tokens': usage_data.get('prompt_tokens', 0) or usage_data.get('input_tokens', 0),
'output_tokens': usage_data.get('completion_tokens', 0) or usage_data.get('output_tokens', 0),
'total_tokens': usage_data.get('total_tokens', 0)
}
# Calculate total if not provided
if usage['total_tokens'] == 0:
usage['total_tokens'] = usage['input_tokens'] + usage['output_tokens']
self.logger.info(f"Responses API call successful - Tokens used: {usage['total_tokens']} (input: {usage['input_tokens']}, output: {usage['output_tokens']})")
return content, usage
except requests.exceptions.RequestException as e:
self.logger.error(f"Responses API call failed: {e}")
if hasattr(e, 'response') and e.response is not None:
self.logger.error(f"Response content: {e.response.text}")
raise
except Exception as e:
self.logger.error(f"Unexpected error in Responses API call: {e}")
raise
def load_helm_files(self, helm_dir: Path) -> Dict[str, str]:
"""Load all YAML files from a Helm chart directory"""
helm_files = {}
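        # Keys are chart-relative paths, e.g. "templates/deployment.yaml"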
        # Recursive globs ("**") already match files in the chart root, so two
        # patterns cover every YAML file without loading anything twice
        patterns = ['**/*.yaml', '**/*.yml']
for pattern in patterns:
for file_path in helm_dir.glob(pattern):
if file_path.is_file():
try:
with open(file_path, 'r') as f:
content = f.read()
relative_path = file_path.relative_to(helm_dir)
helm_files[str(relative_path)] = content
self.logger.info(f"Loaded {relative_path}")
except Exception as e:
self.logger.error(f"Error loading {file_path}: {e}")
return helm_files
def load_prompt_from_file(self, prompt_file: Path) -> str:
"""Load prompt from file - raises error if not found"""
if not prompt_file.exists():
raise FileNotFoundError(f"Required prompt file not found: {prompt_file}")
try:
with open(prompt_file, 'r', encoding='utf-8') as f:
content = f.read().strip()
self.logger.info(f"Loaded prompt from {prompt_file}")
return content
except Exception as e:
self.logger.error(f"Error loading prompt file {prompt_file}: {e}")
raise
def create_system_prompt(self, system_prompt_file: Path) -> str:
"""Load the system prompt from file"""
return self.load_prompt_from_file(system_prompt_file)
def create_user_prompt(self, helm_files: Dict[str, str], user_prompt_file: Path) -> str:
"""Load the user prompt from file and append Helm chart content"""
user_prompt = self.load_prompt_from_file(user_prompt_file)
# Add Helm files content
helm_content = "\n\n**Helm Chart Files:**\n\n"
for filename, content in helm_files.items():
helm_content += f"### {filename}\n```yaml\n{content}\n```\n\n"
return user_prompt + helm_content
def parse_response(self, response: str, run_id: int, helm_file: str, model: str) -> List[SecurityFinding]:
"""Parse the LLM response into structured findings"""
findings = []
timestamp = datetime.now().isoformat()
# Try to parse table format
# Look for table rows (lines with | separators)
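        # Expected new-format row (illustrative values):
        #   | T1552 | T1552.001 | Credentials In Files | Credential Access | Secret in values.yaml | High | Use a secrets manager |
        # Old-format rows collapse the first three columns into a single
        # "T1234.001 - Technique Name" cell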
lines = response.split('\n')
table_rows = []
in_table = False
for line in lines:
if '|' in line and line.count('|') >= 4:
                # Skip the header row; only match header keywords before the
                # table body starts so data rows mentioning them aren't dropped
                if not in_table and any(header in line.lower() for header in ['technique', 'tactic', 'misconfiguration', 'risk', 'mitigation']):
                    in_table = True
                    continue
if line.strip().startswith('|---') or line.strip().startswith('| ---'):
continue
if in_table:
table_rows.append(line)
# If we found table rows, parse them
if table_rows:
for row in table_rows:
cells = [cell.strip() for cell in row.split('|') if cell.strip()]
# Handle new format (with separate base and sub technique columns)
if len(cells) >= 7:
finding = SecurityFinding(
run_id=run_id,
timestamp=timestamp,
mitre_attack_base_technique_id=cells[0],
mitre_attack_sub_technique_id=cells[1] if cells[1] and cells[1] != "N/A" else None,
mitre_attack_technique_name=cells[2],
mitre_attack_tactic=cells[3],
misconfiguration=cells[4],
risk_priority=cells[5],
mitigation_steps=cells[6],
helm_file=helm_file,
raw_response=response,
model=model
)
findings.append(finding)
# Handle old format (with single technique column) for backward compatibility
elif len(cells) >= 5:
# Try to split the technique into base and sub if it contains a dot
technique = cells[0]
base_technique = technique
sub_technique = None
technique_name = ""
# Extract technique name if present (e.g., "T1234 - Brute Force")
if " - " in technique:
parts = technique.split(" - ", 1)
technique_id = parts[0].strip()
technique_name = parts[1].strip()
# Now check if the ID part contains a dot for sub-technique
if '.' in technique_id:
id_parts = technique_id.split('.')
base_technique = id_parts[0]
sub_technique = f"{base_technique}.{id_parts[1]}"
else:
base_technique = technique_id
# If no name separator but contains a dot (e.g., T1234.001)
elif '.' in technique:
parts = technique.split('.')
base_technique = parts[0]
sub_technique = f"{base_technique}.{parts[1]}"
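                    # e.g. "T1552.001 - Credentials In Files" yields base "T1552",
                    # sub "T1552.001", name "Credentials In Files"; a bare
                    # "T1552.001" gives the same IDs with an empty name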
finding = SecurityFinding(
run_id=run_id,
timestamp=timestamp,
mitre_attack_base_technique_id=base_technique,
mitre_attack_sub_technique_id=sub_technique,
mitre_attack_technique_name=technique_name,
mitre_attack_tactic=cells[1],
misconfiguration=cells[2],
risk_priority=cells[3],
mitigation_steps=cells[4],
helm_file=helm_file,
raw_response=response,
model=model
)
findings.append(finding)
else:
            # Fallback: no table was found, so record a single placeholder
            # finding that points the reader at the raw response
            self.logger.warning("No table format found in response; storing raw response as a single placeholder finding")
            # Create a single finding with the full response
finding = SecurityFinding(
run_id=run_id,
timestamp=timestamp,
mitre_attack_base_technique_id="See raw response",
mitre_attack_sub_technique_id=None,
mitre_attack_technique_name="See raw response",
mitre_attack_tactic="See raw response",
misconfiguration="See raw response",
risk_priority="See raw response",
mitigation_steps="See raw response",
helm_file=helm_file,
raw_response=response,
model=model
)
findings.append(finding)
return findings
def analyze_helm_chart(self, helm_dir: Path, system_prompt_file: Path,
                           user_prompt_file: Path) -> Tuple[str, Dict[str, str], Dict[str, int]]:
"""Analyze a single Helm chart and return findings"""
# Load Helm files
helm_files = self.load_helm_files(helm_dir)
if not helm_files:
raise ValueError(f"No YAML files found in {helm_dir}")
# Create prompts
system_prompt = self.create_system_prompt(system_prompt_file)
user_prompt = self.create_user_prompt(helm_files, user_prompt_file)
# Check if we should use direct API access based on model and available API keys
model_provider = self.model.split('/')[0] if '/' in self.model else ""
# Try to use Anthropic API directly for Anthropic models
if model_provider == "anthropic" and self.anthropic_api_key:
try:
self.logger.info(f"Using direct Anthropic API for model: {self.model}")
content, usage = self.call_anthropic_api(system_prompt, user_prompt)
return content, helm_files, usage
except Exception as e:
self.logger.warning(f"Direct Anthropic API call failed, falling back to OpenRouter: {e}")
# Fall back to OpenRouter
# Try to use OpenAI API directly for OpenAI models
elif model_provider == "openai" and self.openai_api_key:
try:
self.logger.info(f"Using direct OpenAI API for model: {self.model}")
content, usage = self.call_openai_api(system_prompt, user_prompt)
return content, helm_files, usage
except Exception as e:
self.logger.warning(f"Direct OpenAI API call failed, falling back to OpenRouter: {e}")
# Fall back to OpenRouter
# Use OpenRouter API as fallback
self.logger.info(f"Using OpenRouter API with model: {self.model}")
headers = {
"Authorization": f"Bearer {self.api_key}",
"HTTP-Referer": self.site_url,
"X-Title": self.app_name,
"Content-Type": "application/json"
}
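        # HTTP-Referer and X-Title are optional OpenRouter headers used for
        # app attribution/rankings on openrouter.ai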
# Base request data
data = {
"model": self.model,
"messages": [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt}
],
"temperature": .1,
"top_p": 1,
"frequency_penalty": 0,
"presence_penalty": 0
}
# Extract model name and apply reasoning model specific formatting if needed
model_name = self.model.split('/')[-1] if '/' in self.model else self.model
if model_name in self.REASONING_MODELS:
self.logger.info(f"OpenRouter: Applying reasoning model formatting for: {model_name}")
data["max_output_tokens"] = 4000
data["reasoning"] = {"effort": "medium"}
else:
self.logger.info(f"OpenRouter: Using standard formatting for: {model_name}")
data["max_tokens"] = 4000
try:
response = requests.post(
f"{self.base_url}/chat/completions",
headers=headers,
json=data,
timeout=120
)
response.raise_for_status()
result = response.json()
content = result['choices'][0]['message']['content']
# Extract usage information
usage_data = result.get('usage', {})
usage = {
'input_tokens': usage_data.get('prompt_tokens', 0),
'output_tokens': usage_data.get('completion_tokens', 0),
'total_tokens': usage_data.get('total_tokens', 0)
}
self.logger.info(f"OpenRouter API call successful - Tokens used: {usage['total_tokens']} (input: {usage['input_tokens']}, output: {usage['output_tokens']})")
return content, helm_files, usage
except requests.exceptions.RequestException as e:
self.logger.error(f"OpenRouter API call failed: {e}")
if hasattr(e, 'response') and e.response is not None:
self.logger.error(f"Response content: {e.response.text}")
raise
def run_experiments(self, helm_dir: Path, models: List[str], num_runs: int,
output_file: Path, system_prompt_file: Path,
user_prompt_file: Path, delay_seconds: float = 1.0):
"""Run multiple experiments with multiple models and save results"""
all_findings = []
all_responses = []
total_usage_by_model = {}
overall_run_id = 0
# Initialize usage tracking for each model
for model in models:
model_name = self.AVAILABLE_MODELS.get(model, model)
total_usage_by_model[model_name] = {
'input_tokens': 0,
'output_tokens': 0,
'total_tokens': 0,
'runs': 0
}
# Run experiments for each model
for model_idx, model in enumerate(models):
self.set_model(model)
model_name = self.model
self.logger.info(f"\n{'='*60}")
self.logger.info(f"Starting experiments with model {model_idx + 1}/{len(models)}: {model_name}")
self.logger.info(f"{'='*60}\n")
for run_in_model in range(1, num_runs + 1):
overall_run_id += 1
self.logger.info(f"Model '{model_name}' - Run {run_in_model}/{num_runs} (Overall run {overall_run_id})")
try:
response, helm_files, usage = self.analyze_helm_chart(
helm_dir, system_prompt_file, user_prompt_file
)
helm_files_str = ", ".join(helm_files.keys())
findings = self.parse_response(response, overall_run_id, helm_files_str, model_name)
all_findings.extend(findings)
# Update usage for this model
total_usage_by_model[model_name]['input_tokens'] += usage['input_tokens']
total_usage_by_model[model_name]['output_tokens'] += usage['output_tokens']
total_usage_by_model[model_name]['total_tokens'] += usage['total_tokens']
total_usage_by_model[model_name]['runs'] += 1
# Store the raw response with metadata
all_responses.append({
'run_id': overall_run_id,
'model_run_id': run_in_model,
'timestamp': datetime.now().isoformat(),
'helm_files': helm_files_str,
'response': response,
'usage': usage,
'model': model_name
})
self.logger.info(f"Run {overall_run_id} completed with {len(findings)} findings")
# Add delay between API calls to avoid rate limiting
if overall_run_id < len(models) * num_runs:
time.sleep(delay_seconds)
except Exception as e:
self.logger.error(f"Error in run {overall_run_id} (model: {model_name}): {e}")
# Continue with next run
continue
# Save outputs
self.save_to_csv(all_findings, output_file)
self.save_to_markdown(all_responses, output_file.with_suffix('.md'), total_usage_by_model)
return all_findings
def save_to_csv(self, findings: List[SecurityFinding], output_file: Path):
"""Save simplified findings to CSV file"""
if not findings:
self.logger.warning("No findings to save")
return
# Create simplified findings with only the requested fields
simplified_findings = []
for finding in findings:
simplified = SimplifiedFinding(
run_number=finding.run_id,
model=finding.model,
mitre_attack_base_technique_id=finding.mitre_attack_base_technique_id,
mitre_attack_sub_technique_id=finding.mitre_attack_sub_technique_id,
mitre_attack_technique_name=finding.mitre_attack_technique_name,
mitre_attack_tactic=finding.mitre_attack_tactic,
risk_priority=finding.risk_priority
)
simplified_findings.append(asdict(simplified))
# Write to CSV
with open(output_file, 'w', newline='', encoding='utf-8') as f:
writer = csv.DictWriter(f, fieldnames=['run_number', 'model', 'mitre_attack_base_technique_id',
'mitre_attack_sub_technique_id', 'mitre_attack_technique_name',
'mitre_attack_tactic', 'risk_priority'])
writer.writeheader()
writer.writerows(simplified_findings)
self.logger.info(f"Saved {len(simplified_findings)} findings to {output_file}")
def save_to_markdown(self, responses: List[Dict[str, Any]], output_file: Path,
total_usage_by_model: Dict[str, Dict[str, int]]):
"""Save all responses to a Markdown file"""
if not responses:
self.logger.warning("No responses to save")
return
with open(output_file, 'w', encoding='utf-8') as f:
f.write("# Helm Chart Security Analysis Results\n\n")
f.write(f"Generated on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")
f.write(f"Total runs: {len(responses)}\n\n")
# Add summary of models used
f.write("## Models Used\n\n")
for model, usage in total_usage_by_model.items():
if usage['runs'] > 0:
f.write(f"- **{model}**: {usage['runs']} runs\n")
f.write("\n")
# Add total token usage summary by model
f.write("## Token Usage Summary by Model\n\n")
grand_total_tokens = 0
for model, usage in total_usage_by_model.items():
if usage['runs'] > 0:
f.write(f"### {model}\n\n")
f.write(f"- **Runs:** {usage['runs']}\n")
f.write(f"- **Total Input Tokens:** {usage['input_tokens']:,}\n")
f.write(f"- **Total Output Tokens:** {usage['output_tokens']:,}\n")
f.write(f"- **Total Tokens:** {usage['total_tokens']:,}\n")
f.write(f"- **Average Tokens per Run:** {usage['total_tokens'] // usage['runs']:,}\n\n")
grand_total_tokens += usage['total_tokens']
f.write(f"### Grand Total\n\n")
f.write(f"- **Total Tokens Across All Models:** {grand_total_tokens:,}\n\n")
f.write("---\n\n")
            # Group responses by model for better organization
            responses_by_model = defaultdict(list)
for response_data in responses:
responses_by_model[response_data['model']].append(response_data)
# Write responses grouped by model
for model, model_responses in responses_by_model.items():
f.write(f"## Model: {model}\n\n")
for response_data in model_responses:
f.write(f"### Run {response_data['run_id']} (Model Run {response_data['model_run_id']})\n\n")
f.write(f"**Timestamp:** {response_data['timestamp']}\n\n")
f.write(f"**Analyzed files:** {response_data['helm_files']}\n\n")
# Add token usage for this run
if 'usage' in response_data:
f.write("#### Token Usage\n\n")
f.write(f"- **Input Tokens:** {response_data['usage']['input_tokens']:,}\n")
f.write(f"- **Output Tokens:** {response_data['usage']['output_tokens']:,}\n")
f.write(f"- **Total Tokens:** {response_data['usage']['total_tokens']:,}\n\n")
f.write("#### Analysis Results\n\n")
f.write(response_data['response'])
f.write("\n\n---\n\n")
self.logger.info(f"Saved {len(responses)} responses to {output_file}")
def main():
"""Main entry point"""
parser = argparse.ArgumentParser(
description="Analyze Helm charts for security issues using OpenRouter API",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=f"""
Available models:
{chr(10).join(f' - {key}: {value}' for key, value in HelmSecurityAnalyzer.AVAILABLE_MODELS.items())}
You can also specify a direct model path like 'anthropic/claude-3-opus-20240229'
Examples:
# Run 3 times with claude-3-haiku
%(prog)s ./helm-chart -n 3 -m claude-3-haiku
# Run 2 times each with 3 different models
%(prog)s ./helm-chart -n 2 -m claude-3-haiku gpt-4 mistral-large
# Run with direct model paths
%(prog)s ./helm-chart -n 1 -m anthropic/claude-3-opus-20240229 openai/gpt-4-turbo
# Use direct API access with environment variables
ANTHROPIC_API_KEY=your_key OPENAI_API_KEY=your_key %(prog)s ./helm-chart -n 2 -m claude-3-haiku gpt-4
# Use direct API access with command-line arguments
%(prog)s ./helm-chart -n 2 -m claude-3-haiku gpt-4 --anthropic-api-key=your_key --openai-api-key=your_key
"""
)
parser.add_argument(
"helm_dir",
type=Path,
help="Path to Helm chart directory"
)
parser.add_argument(
"-n", "--num-runs",
type=int,
default=1,
help="Number of times to run the analysis per model (default: 1)"
)
parser.add_argument(
"-o", "--output",
type=Path,
default=Path("security_findings.csv"),
help="Output CSV file (default: security_findings.csv)"
)
parser.add_argument(
"-k", "--api-key",
type=str,
help="OpenRouter API key (or set OPENROUTER_API_KEY env var)"
)
parser.add_argument(
"--anthropic-api-key",
type=str,
help="Anthropic API key (or set ANTHROPIC_API_KEY env var)"
)
parser.add_argument(
"--openai-api-key",
type=str,
help="OpenAI API key (or set OPENAI_API_KEY env var)"
)
parser.add_argument(
"-m", "--models",
type=str,
nargs='+',
default=["anthropic/claude-3-haiku"],
help="Models to use (can specify multiple)"
)
parser.add_argument(
"-sp", "--system-prompt",
type=Path,
default=Path("prompts/system_prompt.md"),
help="System prompt file (default: prompts/system_prompt.md)"
)
parser.add_argument(
"-up", "--user-prompt",
type=Path,
default=Path("prompts/user_prompt.md"),
help="User prompt file (default: prompts/user_prompt.md)"
)
parser.add_argument(
"--site-url",
type=str,
help="Your site URL for OpenRouter tracking"
)
parser.add_argument(
"--app-name",
type=str,
help="Your app name for OpenRouter tracking"
)
parser.add_argument(
"-d", "--delay",
type=float,
default=1.0,
help="Delay between API calls in seconds (default: 1.0)"
)
parser.add_argument(
"-v", "--verbose",
action="store_true",
help="Enable verbose logging"
)
parser.add_argument(
"--list-models",
action="store_true",
help="List available models and exit"
)
args = parser.parse_args()
# Handle list-models flag
if args.list_models:
print("Available models:")
for key, value in HelmSecurityAnalyzer.AVAILABLE_MODELS.items():
print(f" - {key}: {value}")
return 0
# Setup logging
logging.basicConfig(
level=logging.DEBUG if args.verbose else logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Get API key
api_key = args.api_key or os.environ.get("OPENROUTER_API_KEY")
if not api_key:
parser.error("API key required: use --api-key or set OPENROUTER_API_KEY")
# Check that prompt files exist
if not args.system_prompt.exists():
parser.error(f"System prompt file not found: {args.system_prompt}")
if not args.user_prompt.exists():
parser.error(f"User prompt file not found: {args.user_prompt}")
# Create analyzer (model will be set dynamically)
analyzer = HelmSecurityAnalyzer(
api_key=api_key,
model=args.models[0], # Initialize with first model
site_url=args.site_url,
app_name=args.app_name,
anthropic_api_key=args.anthropic_api_key,
openai_api_key=args.openai_api_key
)
try:
# Display run plan
total_runs = len(args.models) * args.num_runs
print(f"\nAnalysis Plan:")
print(f" - Models: {len(args.models)}")
for model in args.models:
model_name = analyzer.AVAILABLE_MODELS.get(model, model)
print(f"{model_name}")
print(f" - Runs per model: {args.num_runs}")
print(f" - Total runs: {total_runs}")
print(f" - Estimated time: ~{total_runs * (args.delay + 10):.0f} seconds\n")
findings = analyzer.run_experiments(
helm_dir=args.helm_dir,
models=args.models,
num_runs=args.num_runs,
output_file=args.output,
system_prompt_file=args.system_prompt,
user_prompt_file=args.user_prompt,
delay_seconds=args.delay
)
print(f"\nAnalysis complete!")
print(f" - Total security findings: {len(findings)}")
print(f" - Models used: {len(args.models)}")
print(f" - Total runs: {total_runs}")
print(f"\nResults saved to:")
print(f" - CSV: {args.output}")
print(f" - Markdown: {args.output.with_suffix('.md')}")
print(f"\nPrompts loaded from:")
print(f" - System: {args.system_prompt}")
print(f" - User: {args.user_prompt}")
except Exception as e:
logging.error(f"Fatal error: {e}")
return 1
return 0
if __name__ == "__main__":
    sys.exit(main())