mirror of
https://github.com/esekercan/sans-paper-public.git
synced 2026-02-12 15:52:45 +00:00
947 lines
40 KiB
Python
947 lines
40 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
OpenRouter Helm Chart Security Analysis Tool
|
|
|
|
This tool automates security analysis of Helm charts using OpenRouter.ai,
|
|
supporting multiple AI models, mapping findings to MITRE ATT&CK framework,
|
|
and outputting results to CSV and Markdown.
|
|
|
|
The tool can use Anthropic and OpenAI APIs directly when API keys are available,
|
|
falling back to OpenRouter when they're not. It also supports OpenAI reasoning models
|
|
like o3 and o4-mini, which require a different request schema.
|
|
|
|
Usage examples:

# Run 3 times with a single model
|
|
python openrouter_threatmodeler.py ./helm-chart -n 3 -m claude-3-haiku
|
|
|
|
# Run 2 times each with 3 different models (6 total runs)
|
|
python openrouter_threatmodeler.py ./helm-chart -n 2 -m claude-3-haiku gpt-4 mistral-large
|
|
|
|
# Mix of short names and direct paths
|
|
python openrouter_threatmodeler.py ./helm-chart -n 1 -m claude-3-opus openai/gpt-4-turbo-preview
|
|
|
|
# Use direct API access with environment variables
|
|
ANTHROPIC_API_KEY=your_key OPENAI_API_KEY=your_key python openrouter_threatmodeler.py ./helm-chart -n 2 -m claude-3-haiku gpt-4
|
|
|
|
# Use direct API access with command-line arguments
|
|
python openrouter_threatmodeler.py ./helm-chart -n 2 -m claude-3-haiku gpt-4 --anthropic-api-key=your_key --openai-api-key=your_key
|
|
|
|
# Run used for the SANS whitepaper experiment:
|
|
python openrouter_threatmodeler.py $PWD/wordpress-helm-chart -o combined.csv -n 5 -m anthropic/claude-3-5-haiku-latest anthropic/claude-3-7-sonnet-latest anthropic/claude-opus-4-0 openai/o4-mini openai/chatgpt-4o-latest
|
|
|
|
"""
|
|
|
|
import os
|
|
import csv
|
|
import json
|
|
import yaml
|
|
import time
|
|
import argparse
|
|
import logging
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Dict, List, Any, Optional, Tuple, Union
|
|
from dataclasses import dataclass, asdict
|
|
import re
|
|
import requests
|
|
|
|
|
|
@dataclass
class SecurityFinding:
    """A single security finding extracted from one model response.

    Field order is part of the interface: the generated ``__init__`` accepts
    the values positionally in exactly this sequence.
    """

    # --- Which run produced the finding ---
    run_id: int      # run number passed in by the caller of the parser
    timestamp: str   # creation time as a string (ISO format when built by the parser)

    # --- MITRE ATT&CK mapping ---
    mitre_attack_tactic: str
    mitre_attack_base_technique_id: str
    mitre_attack_sub_technique_id: Optional[str]  # None when no sub-technique applies
    mitre_attack_technique_name: str

    # --- What was found and what to do about it ---
    misconfiguration: str
    risk_priority: str
    mitigation_steps: str

    # --- Provenance ---
    helm_file: str      # description of the chart file(s) that were analyzed
    raw_response: str   # complete, unparsed model response the finding came from
    model: str          # model that produced the response
|
|
|
|
|
|
@dataclass
class SimplifiedFinding:
    """Reduced view of a finding holding only the columns written to the CSV.

    Field order is part of the interface: the generated ``__init__`` accepts
    the values positionally in exactly this sequence.
    """

    # --- Which run and model produced the finding ---
    run_number: int
    model: str

    # --- MITRE ATT&CK mapping ---
    mitre_attack_tactic: str
    mitre_attack_base_technique_id: str
    mitre_attack_sub_technique_id: Optional[str]  # None when no sub-technique applies
    mitre_attack_technique_name: str

    # --- Severity ---
    risk_priority: str
|
|
|
|
|
|
class HelmSecurityAnalyzer:
    """Analyze Helm charts for security issues with LLMs and record the findings.

    Requests go to OpenRouter by default.  When the selected model's path
    starts with ``anthropic/`` or ``openai/`` and a key for that provider is
    available, the provider's own API is tried first and OpenRouter is used
    only if that direct call fails.
    """

    # Short aliases accepted wherever a model is named -> full OpenRouter path.
    AVAILABLE_MODELS = {
        'claude-3-opus': 'anthropic/claude-3-opus',
        'claude-3-sonnet': 'anthropic/claude-3-sonnet-20240229',
        'claude-3-haiku': 'anthropic/claude-3-haiku',
        'gpt-4': 'openai/gpt-4',
        'gpt-4-turbo': 'openai/gpt-4-turbo-preview',
        'gpt-3.5-turbo': 'openai/gpt-3.5-turbo',
        'o3': 'openai/o3',
        'o4-mini': 'openai/o4-mini',
        'mistral-large': 'mistralai/mistral-large',
        'mixtral-8x7b': 'mistralai/mixtral-8x7b-instruct',
        'gemini-pro': 'google/gemini-pro',
        'llama-3-70b': 'meta-llama/llama-3-70b-instruct',
        'llama-3-8b': 'meta-llama/llama-3-8b-instruct'
    }

    # Known reasoning models that require special request formatting.
    REASONING_MODELS = ["o3", "o4-mini"]

    # Seconds to wait for any single HTTP request before giving up.
    REQUEST_TIMEOUT_SECONDS = 120

    # Words whose presence marks a table row as a column-header row.
    _HEADER_KEYWORDS = ('technique', 'tactic', 'misconfiguration', 'risk', 'mitigation')

    # A first cell that starts with an ATT&CK technique ID (optionally wrapped
    # in Markdown emphasis) marks a data row, even if the row also happens to
    # contain one of the header keywords in its free-text columns.
    _TECHNIQUE_ID_PREFIX_RE = re.compile(r'[\s*_`]*T\d{4}\b')

    # One cell of a Markdown delimiter row: dashes with optional alignment colons.
    _SEPARATOR_CELL_RE = re.compile(r':?-+:?')

    # Sub-technique cell values (lower-cased) that mean "no sub-technique".
    _NO_SUB_TECHNIQUE = frozenset({"", "n/a", "na", "none", "-"})

    def __init__(self, api_key: str, model: str = "anthropic/claude-3-haiku",
                 site_url: Optional[str] = None, app_name: Optional[str] = None,
                 anthropic_api_key: Optional[str] = None, openai_api_key: Optional[str] = None):
        """Initialize the analyzer with API credentials.

        Args:
            api_key: OpenRouter API key.
            model: Short alias from ``AVAILABLE_MODELS`` or a full model path.
            site_url: Value sent as the ``HTTP-Referer`` header to OpenRouter.
            app_name: Value sent as the ``X-Title`` header to OpenRouter.
            anthropic_api_key: Key for direct Anthropic calls; falls back to
                the ``ANTHROPIC_API_KEY`` environment variable.
            openai_api_key: Key for direct OpenAI calls; falls back to the
                ``OPENAI_API_KEY`` environment variable.
        """
        self.api_key = api_key  # OpenRouter API key
        self.anthropic_api_key = anthropic_api_key or os.environ.get("ANTHROPIC_API_KEY")
        self.openai_api_key = openai_api_key or os.environ.get("OPENAI_API_KEY")
        self.base_url = "https://openrouter.ai/api/v1"
        self.site_url = site_url or "https://github.com/esekercan/sans-paper/tree/main/python"
        self.app_name = app_name or "Helm Security Analyzer"
        self.logger = logging.getLogger(__name__)
        self.set_model(model)

    @classmethod
    def _resolve_model(cls, model: str) -> str:
        """Return the full model path for an alias; unknown names pass through unchanged."""
        return cls.AVAILABLE_MODELS.get(model, model)

    def set_model(self, model: str):
        """Set the current model from a short alias or a full model path."""
        self.model = self._resolve_model(model)

    def _bare_model_name(self) -> str:
        """Return the model name without its provider prefix ("openai/gpt-4" -> "gpt-4")."""
        return self.model.split('/')[-1]

    def _log_request_failure(self, api_label: str, error: Exception):
        """Log a failed HTTP call, including the response body when one is attached."""
        self.logger.error(f"{api_label} API call failed: {error}")
        failed_response = getattr(error, 'response', None)
        if failed_response is not None:
            self.logger.error(f"Response content: {failed_response.text}")

    def call_anthropic_api(self, system_prompt: str, user_prompt: str) -> Tuple[str, Dict[str, int]]:
        """Call the Anthropic Messages API directly.

        Returns:
            ``(content, usage)`` where ``usage`` has ``input_tokens``,
            ``output_tokens`` and ``total_tokens``.

        Raises:
            requests.exceptions.RequestException: On transport or HTTP errors.
        """
        self.logger.info(f"Calling Anthropic API directly with model: {self.model}")

        headers = {
            "x-api-key": self.anthropic_api_key,
            "anthropic-version": "2023-06-01",
            "content-type": "application/json"
        }
        data = {
            "model": self._bare_model_name(),
            "system": system_prompt,
            "messages": [
                {"role": "user", "content": user_prompt}
            ],
            "temperature": 0.1,
            "max_tokens": 5000,
            "top_p": 1
        }

        try:
            response = requests.post(
                "https://api.anthropic.com/v1/messages",
                headers=headers,
                json=data,
                timeout=self.REQUEST_TIMEOUT_SECONDS
            )
            response.raise_for_status()

            result = response.json()
            content = result['content'][0]['text']

            # The response reports input and output counts; the total is derived here.
            usage_data = result.get('usage', {})
            input_tokens = usage_data.get('input_tokens', 0)
            output_tokens = usage_data.get('output_tokens', 0)
            usage = {
                'input_tokens': input_tokens,
                'output_tokens': output_tokens,
                'total_tokens': input_tokens + output_tokens
            }

            self.logger.info(f"Anthropic API call successful - Tokens used: {usage['total_tokens']} (input: {usage['input_tokens']}, output: {usage['output_tokens']})")
            return content, usage

        except requests.exceptions.RequestException as e:
            self._log_request_failure("Anthropic", e)
            raise

    def call_openai_api(self, system_prompt: str, user_prompt: str) -> Tuple[str, Dict[str, int]]:
        """Call the OpenAI API directly, choosing the endpoint by model type.

        Models listed in ``REASONING_MODELS`` use the Responses API; all other
        models use Chat Completions.
        """
        self.logger.info(f"Calling OpenAI API directly with model: {self.model}")

        model_name = self._bare_model_name()
        if model_name in self.REASONING_MODELS:
            return self._call_openai_responses_api(system_prompt, user_prompt, model_name)
        return self._call_openai_chat_completions_api(system_prompt, user_prompt, model_name)

    def _call_openai_chat_completions_api(self, system_prompt: str, user_prompt: str, model_name: str) -> Tuple[str, Dict[str, int]]:
        """Call the OpenAI Chat Completions API for standard models.

        Raises:
            ValueError: If the response contains no choices.
            requests.exceptions.RequestException: On transport or HTTP errors.
        """
        self.logger.info(f"Using Chat Completions API for model: {model_name}")

        headers = {
            "Authorization": f"Bearer {self.openai_api_key}",
            "Content-Type": "application/json"
        }
        data = {
            "model": model_name,
            "messages": [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt}
            ],
            "temperature": 0.1,
            "max_tokens": 5000,
            "top_p": 1,
            "frequency_penalty": 0,
            "presence_penalty": 0
        }

        try:
            response = requests.post(
                "https://api.openai.com/v1/chat/completions",
                headers=headers,
                json=data,
                timeout=self.REQUEST_TIMEOUT_SECONDS
            )
            response.raise_for_status()

            result = response.json()
            if 'choices' not in result or not result['choices']:
                self.logger.error(f"No choices in OpenAI response: {result}")
                raise ValueError("No choices returned in OpenAI API response")

            content = result['choices'][0]['message']['content']

            usage_data = result.get('usage', {})
            usage = {
                'input_tokens': usage_data.get('prompt_tokens', 0),
                'output_tokens': usage_data.get('completion_tokens', 0),
                'total_tokens': usage_data.get('total_tokens', 0)
            }

            self.logger.info(f"Chat Completions API call successful - Tokens used: {usage['total_tokens']} (input: {usage['input_tokens']}, output: {usage['output_tokens']})")
            return content, usage

        except requests.exceptions.RequestException as e:
            self._log_request_failure("Chat Completions", e)
            raise
        except Exception as e:
            self.logger.error(f"Unexpected error in Chat Completions API call: {e}")
            raise

    def _call_openai_responses_api(self, system_prompt: str, user_prompt: str, model_name: str) -> Tuple[str, Dict[str, int]]:
        """Call the OpenAI Responses API for reasoning models like o3 and o4-mini.

        Raises:
            ValueError: If no text can be located in the response.
            requests.exceptions.RequestException: On transport or HTTP errors.
        """
        self.logger.info(f"Using Responses API for reasoning model: {model_name}")

        headers = {
            "Authorization": f"Bearer {self.openai_api_key}",
            "Content-Type": "application/json"
        }
        # The Responses API takes "input" instead of "messages" and no temperature.
        data = {
            "model": model_name,
            "input": [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt}
            ],
            "max_output_tokens": 4000,
            "reasoning": {"effort": "medium"}
        }

        try:
            response = requests.post(
                "https://api.openai.com/v1/responses",
                headers=headers,
                json=data,
                timeout=self.REQUEST_TIMEOUT_SECONDS
            )
            response.raise_for_status()

            result = response.json()
            content = self._extract_responses_text(result)

            # Fall back to other response layouts when no message text was found.
            if content is None:
                if 'choices' in result and result['choices']:
                    content = result['choices'][0]['message']['content']
                elif 'response' in result:
                    content = result['response']
                elif 'content' in result:
                    content = result['content']
                else:
                    self.logger.error(f"Unexpected response structure from Responses API: {list(result.keys())}")
                    raise ValueError("Unexpected response structure from OpenAI Responses API")

            # Accept either naming scheme for the token counters.
            usage_data = result.get('usage', {})
            usage = {
                'input_tokens': usage_data.get('prompt_tokens', 0) or usage_data.get('input_tokens', 0),
                'output_tokens': usage_data.get('completion_tokens', 0) or usage_data.get('output_tokens', 0),
                'total_tokens': usage_data.get('total_tokens', 0)
            }
            if usage['total_tokens'] == 0:
                usage['total_tokens'] = usage['input_tokens'] + usage['output_tokens']

            self.logger.info(f"Responses API call successful - Tokens used: {usage['total_tokens']} (input: {usage['input_tokens']}, output: {usage['output_tokens']})")
            return content, usage

        except requests.exceptions.RequestException as e:
            self._log_request_failure("Responses", e)
            raise
        except Exception as e:
            self.logger.error(f"Unexpected error in Responses API call: {e}")
            raise

    @staticmethod
    def _extract_responses_text(result: Dict[str, Any]) -> Optional[str]:
        """Return the first ``output_text`` of the first message in ``result['output']``.

        Only the first item of type ``message`` is inspected; ``None`` is
        returned when it holds no ``output_text`` entry or no message exists.
        """
        output_items = result.get('output')
        if not isinstance(output_items, list):
            return None
        for output_item in output_items:
            if output_item.get('type') != 'message' or 'content' not in output_item:
                continue
            content_items = output_item['content']
            if isinstance(content_items, list):
                for content_item in content_items:
                    if content_item.get('type') == 'output_text' and 'text' in content_item:
                        return content_item['text']
            return None
        return None

    def load_helm_files(self, helm_dir: Path) -> Dict[str, str]:
        """Load every ``.yaml``/``.yml`` file under a Helm chart directory.

        Top-level files come first, then nested ones; within each glob pattern
        files are visited in sorted order so the result is deterministic.
        Files that cannot be read are logged and skipped.

        Returns:
            Mapping of path relative to ``helm_dir`` -> file content.
        """
        helm_files = {}
        visited = set()

        # The recursive patterns also match top-level files, so every path is
        # tracked to make sure it is read and logged only once.
        for pattern in ('*.yaml', '*.yml', '**/*.yaml', '**/*.yml'):
            for file_path in sorted(helm_dir.glob(pattern)):
                if file_path in visited or not file_path.is_file():
                    continue
                visited.add(file_path)
                relative_path = file_path.relative_to(helm_dir)
                try:
                    # Explicit encoding so the prompt does not depend on the locale.
                    content = file_path.read_text(encoding='utf-8')
                except (OSError, UnicodeDecodeError) as e:
                    self.logger.error(f"Error loading {file_path}: {e}")
                    continue
                helm_files[str(relative_path)] = content
                self.logger.info(f"Loaded {relative_path}")

        return helm_files

    def load_prompt_from_file(self, prompt_file: Path) -> str:
        """Return the stripped content of a prompt file.

        Raises:
            FileNotFoundError: If ``prompt_file`` does not exist.
        """
        if not prompt_file.exists():
            raise FileNotFoundError(f"Required prompt file not found: {prompt_file}")

        try:
            with open(prompt_file, 'r', encoding='utf-8') as f:
                content = f.read().strip()
        except Exception as e:
            self.logger.error(f"Error loading prompt file {prompt_file}: {e}")
            raise

        self.logger.info(f"Loaded prompt from {prompt_file}")
        return content

    def create_system_prompt(self, system_prompt_file: Path) -> str:
        """Load the system prompt from file."""
        return self.load_prompt_from_file(system_prompt_file)

    def create_user_prompt(self, helm_files: Dict[str, str], user_prompt_file: Path) -> str:
        """Load the user prompt from file and append every Helm file as a YAML code block."""
        user_prompt = self.load_prompt_from_file(user_prompt_file)
        file_sections = "".join(
            f"### {filename}\n```yaml\n{content}\n```\n\n"
            for filename, content in helm_files.items()
        )
        return user_prompt + "\n\n**Helm Chart Files:**\n\n" + file_sections

    @staticmethod
    def _split_table_row(row: str) -> List[str]:
        """Split a Markdown table row into stripped cells, keeping empty cells.

        Only the outer pipes are removed; interior empty cells are preserved so
        a blank column (e.g. no sub-technique) does not shift the columns after it.
        """
        stripped = row.strip()
        if stripped.startswith('|'):
            stripped = stripped[1:]
        if stripped.endswith('|'):
            stripped = stripped[:-1]
        return [cell.strip() for cell in stripped.split('|')]

    @classmethod
    def _is_header_row(cls, line: str, cells: List[str]) -> bool:
        """Return True for a column-header row.

        A row counts as a header when it contains a header keyword and its
        first cell does not start with a technique ID.  The second condition
        keeps data rows whose free text mentions e.g. "risk" or "mitigation".
        """
        lowered = line.lower()
        if not any(keyword in lowered for keyword in cls._HEADER_KEYWORDS):
            return False
        return cls._TECHNIQUE_ID_PREFIX_RE.match(cells[0]) is None

    @classmethod
    def _is_separator_row(cls, line: str, cells: List[str]) -> bool:
        """Return True for a Markdown delimiter row such as ``|---|:---:|``."""
        if line.strip().startswith(('|---', '| ---')):
            return True
        return all(cls._SEPARATOR_CELL_RE.fullmatch(cell) for cell in cells)

    @classmethod
    def _extract_table_rows(cls, response: str) -> List[List[str]]:
        """Return the cells of every data row that follows a recognized header row.

        Lines with fewer than four pipes are ignored, as are rows that appear
        before the first header row.
        """
        data_rows = []
        in_table = False
        for line in response.split('\n'):
            if line.count('|') < 4:
                continue
            cells = cls._split_table_row(line)
            if cls._is_header_row(line, cells):
                in_table = True
                continue
            if cls._is_separator_row(line, cells):
                continue
            if in_table:
                data_rows.append(cells)
        return data_rows

    @staticmethod
    def _split_technique(technique: str) -> Tuple[str, Optional[str], str]:
        """Split a combined technique cell into ``(base_id, sub_id, name)``.

        Handles "T1234 - Name", "T1234.001 - Name" and a bare "T1234.001".
        ``sub_id`` is the full dotted ID, or ``None`` when there is no dot.
        """
        technique_id, technique_name = technique, ""
        if " - " in technique:
            id_part, name_part = technique.split(" - ", 1)
            technique_id, technique_name = id_part.strip(), name_part.strip()

        if '.' not in technique_id:
            return technique_id, None, technique_name
        id_parts = technique_id.split('.')
        base_technique = id_parts[0]
        return base_technique, f"{base_technique}.{id_parts[1]}", technique_name

    @classmethod
    def _cells_to_fields(cls, cells: List[str]) -> Optional[Dict[str, Optional[str]]]:
        """Map one row's cells to the MITRE-related ``SecurityFinding`` fields.

        Rows with seven or more cells use the layout with separate base and
        sub-technique columns; rows with five or six cells use the older layout
        with one combined technique column.  Shorter rows yield ``None``.
        """
        if len(cells) >= 7:
            sub_technique = cells[1]
            if sub_technique.lower() in cls._NO_SUB_TECHNIQUE:
                sub_technique = None
            return {
                'mitre_attack_base_technique_id': cells[0],
                'mitre_attack_sub_technique_id': sub_technique,
                'mitre_attack_technique_name': cells[2],
                'mitre_attack_tactic': cells[3],
                'misconfiguration': cells[4],
                'risk_priority': cells[5],
                'mitigation_steps': cells[6],
            }
        if len(cells) >= 5:
            base_technique, sub_technique, technique_name = cls._split_technique(cells[0])
            return {
                'mitre_attack_base_technique_id': base_technique,
                'mitre_attack_sub_technique_id': sub_technique,
                'mitre_attack_technique_name': technique_name,
                'mitre_attack_tactic': cells[1],
                'misconfiguration': cells[2],
                'risk_priority': cells[3],
                'mitigation_steps': cells[4],
            }
        return None

    def parse_response(self, response: str, run_id: int, helm_file: str, model: str) -> List["SecurityFinding"]:
        """Parse the LLM response into structured findings.

        Findings are read from Markdown table rows.  When no usable row is
        found, a single placeholder finding that points at the raw response is
        returned, so every response produces at least one finding.
        """
        timestamp = datetime.now().isoformat()
        shared = {
            'run_id': run_id,
            'timestamp': timestamp,
            'helm_file': helm_file,
            'raw_response': response,
            'model': model,
        }

        findings = []
        for cells in self._extract_table_rows(response):
            row_fields = self._cells_to_fields(cells)
            if row_fields is not None:
                findings.append(SecurityFinding(**shared, **row_fields))

        if not findings:
            self.logger.warning("No table format found in response, attempting text extraction")
            findings.append(SecurityFinding(
                mitre_attack_base_technique_id="See raw response",
                mitre_attack_sub_technique_id=None,
                mitre_attack_technique_name="See raw response",
                mitre_attack_tactic="See raw response",
                misconfiguration="See raw response",
                risk_priority="See raw response",
                mitigation_steps="See raw response",
                **shared
            ))

        return findings

    def _call_openrouter_api(self, system_prompt: str, user_prompt: str) -> Tuple[str, Dict[str, int]]:
        """Send the prompts to OpenRouter's chat completions endpoint.

        Raises:
            ValueError: If the response contains no choices.
            requests.exceptions.RequestException: On transport or HTTP errors.
        """
        self.logger.info(f"Using OpenRouter API with model: {self.model}")

        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "HTTP-Referer": self.site_url,
            "X-Title": self.app_name,
            "Content-Type": "application/json"
        }
        data = {
            "model": self.model,
            "messages": [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt}
            ],
            "temperature": .1,
            "top_p": 1,
            "frequency_penalty": 0,
            "presence_penalty": 0
        }

        # Reasoning models get their own token-limit and effort settings.
        model_name = self._bare_model_name()
        if model_name in self.REASONING_MODELS:
            self.logger.info(f"OpenRouter: Applying reasoning model formatting for: {model_name}")
            data["max_output_tokens"] = 4000
            data["reasoning"] = {"effort": "medium"}
        else:
            self.logger.info(f"OpenRouter: Using standard formatting for: {model_name}")
            data["max_tokens"] = 4000

        try:
            response = requests.post(
                f"{self.base_url}/chat/completions",
                headers=headers,
                json=data,
                timeout=self.REQUEST_TIMEOUT_SECONDS
            )
            response.raise_for_status()
            result = response.json()
        except requests.exceptions.RequestException as e:
            self._log_request_failure("OpenRouter", e)
            raise

        # Same guard as the direct OpenAI path: fail with a clear message
        # instead of a bare KeyError when the body carries no completion.
        if not result.get('choices'):
            self.logger.error(f"No choices in OpenRouter response: {result}")
            raise ValueError("No choices returned in OpenRouter API response")

        content = result['choices'][0]['message']['content']

        usage_data = result.get('usage', {})
        usage = {
            'input_tokens': usage_data.get('prompt_tokens', 0),
            'output_tokens': usage_data.get('completion_tokens', 0),
            'total_tokens': usage_data.get('total_tokens', 0)
        }

        self.logger.info(f"OpenRouter API call successful - Tokens used: {usage['total_tokens']} (input: {usage['input_tokens']}, output: {usage['output_tokens']})")
        return content, usage

    def analyze_helm_chart(self, helm_dir: Path, system_prompt_file: Path,
                           user_prompt_file: Path) -> Tuple[str, Dict[str, str], Dict[str, int]]:
        """Analyze a single Helm chart with the current model.

        Returns:
            ``(response_text, helm_files, usage)``.

        Raises:
            ValueError: If ``helm_dir`` contains no YAML files.
        """
        helm_files = self.load_helm_files(helm_dir)
        if not helm_files:
            raise ValueError(f"No YAML files found in {helm_dir}")

        system_prompt = self.create_system_prompt(system_prompt_file)
        user_prompt = self.create_user_prompt(helm_files, user_prompt_file)

        # Prefer the provider's own API when the model belongs to that
        # provider and a key for it is available.
        provider, separator, _ = self.model.partition('/')
        if not separator:
            provider = ""

        direct_api = None
        if provider == "anthropic" and self.anthropic_api_key:
            direct_api = ("Anthropic", self.call_anthropic_api)
        elif provider == "openai" and self.openai_api_key:
            direct_api = ("OpenAI", self.call_openai_api)

        if direct_api is not None:
            label, call_provider = direct_api
            try:
                self.logger.info(f"Using direct {label} API for model: {self.model}")
                content, usage = call_provider(system_prompt, user_prompt)
                return content, helm_files, usage
            except Exception as e:
                # Any direct-call failure is non-fatal; OpenRouter is tried next.
                self.logger.warning(f"Direct {label} API call failed, falling back to OpenRouter: {e}")

        content, usage = self._call_openrouter_api(system_prompt, user_prompt)
        return content, helm_files, usage

    def run_experiments(self, helm_dir: Path, models: List[str], num_runs: int,
                        output_file: Path, system_prompt_file: Path,
                        user_prompt_file: Path, delay_seconds: float = 1.0):
        """Run ``num_runs`` analyses per model and save the combined results.

        A failed run is logged and skipped.  The CSV goes to ``output_file``
        and the Markdown report to the same path with an ``.md`` suffix.

        Returns:
            All findings collected across every successful run.
        """
        all_findings = []
        all_responses = []
        total_runs = len(models) * num_runs
        overall_run_id = 0

        total_usage_by_model = {
            self._resolve_model(model): {
                'input_tokens': 0,
                'output_tokens': 0,
                'total_tokens': 0,
                'runs': 0
            }
            for model in models
        }

        for model_idx, model in enumerate(models):
            self.set_model(model)
            model_name = self.model
            self.logger.info(f"\n{'='*60}")
            self.logger.info(f"Starting experiments with model {model_idx + 1}/{len(models)}: {model_name}")
            self.logger.info(f"{'='*60}\n")

            for run_in_model in range(1, num_runs + 1):
                overall_run_id += 1
                self.logger.info(f"Model '{model_name}' - Run {run_in_model}/{num_runs} (Overall run {overall_run_id})")

                try:
                    response, helm_files, usage = self.analyze_helm_chart(
                        helm_dir, system_prompt_file, user_prompt_file
                    )
                    helm_files_str = ", ".join(helm_files.keys())
                    findings = self.parse_response(response, overall_run_id, helm_files_str, model_name)
                    all_findings.extend(findings)

                    model_usage = total_usage_by_model[model_name]
                    model_usage['input_tokens'] += usage['input_tokens']
                    model_usage['output_tokens'] += usage['output_tokens']
                    model_usage['total_tokens'] += usage['total_tokens']
                    model_usage['runs'] += 1

                    all_responses.append({
                        'run_id': overall_run_id,
                        'model_run_id': run_in_model,
                        'timestamp': datetime.now().isoformat(),
                        'helm_files': helm_files_str,
                        'response': response,
                        'usage': usage,
                        'model': model_name
                    })

                    self.logger.info(f"Run {overall_run_id} completed with {len(findings)} findings")

                except Exception as e:
                    # One failed run must not abort the remaining experiments.
                    self.logger.error(f"Error in run {overall_run_id} (model: {model_name}): {e}")

                # Pause between API calls to avoid rate limiting.  This also
                # applies after a failed run, which may itself have been
                # caused by a rate limit.
                if overall_run_id < total_runs:
                    time.sleep(delay_seconds)

        self.save_to_csv(all_findings, output_file)
        self.save_to_markdown(all_responses, output_file.with_suffix('.md'), total_usage_by_model)
        return all_findings

    def save_to_csv(self, findings: List["SecurityFinding"], output_file: Path):
        """Write the simplified columns of every finding to a CSV file.

        Nothing is written when ``findings`` is empty.
        """
        if not findings:
            self.logger.warning("No findings to save")
            return

        simplified_findings = [
            asdict(SimplifiedFinding(
                run_number=finding.run_id,
                model=finding.model,
                mitre_attack_base_technique_id=finding.mitre_attack_base_technique_id,
                mitre_attack_sub_technique_id=finding.mitre_attack_sub_technique_id,
                mitre_attack_technique_name=finding.mitre_attack_technique_name,
                mitre_attack_tactic=finding.mitre_attack_tactic,
                risk_priority=finding.risk_priority
            ))
            for finding in findings
        ]

        fieldnames = ['run_number', 'model', 'mitre_attack_base_technique_id',
                      'mitre_attack_sub_technique_id', 'mitre_attack_technique_name',
                      'mitre_attack_tactic', 'risk_priority']
        with open(output_file, 'w', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(simplified_findings)

        self.logger.info(f"Saved {len(simplified_findings)} findings to {output_file}")

    def save_to_markdown(self, responses: List[Dict[str, Any]], output_file: Path,
                         total_usage_by_model: Dict[str, Dict[str, int]]):
        """Write a Markdown report: per-model usage summary, then every raw response.

        Nothing is written when ``responses`` is empty.
        """
        if not responses:
            self.logger.warning("No responses to save")
            return

        from collections import defaultdict

        # Only models with at least one successful run appear in the summaries.
        used_models = {
            model: usage for model, usage in total_usage_by_model.items() if usage['runs'] > 0
        }

        responses_by_model = defaultdict(list)
        for response_data in responses:
            responses_by_model[response_data['model']].append(response_data)

        with open(output_file, 'w', encoding='utf-8') as f:
            f.write("# Helm Chart Security Analysis Results\n\n")
            f.write(f"Generated on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")
            f.write(f"Total runs: {len(responses)}\n\n")

            f.write("## Models Used\n\n")
            for model, usage in used_models.items():
                f.write(f"- **{model}**: {usage['runs']} runs\n")
            f.write("\n")

            f.write("## Token Usage Summary by Model\n\n")
            grand_total_tokens = 0
            for model, usage in used_models.items():
                f.write(f"### {model}\n\n")
                f.write(f"- **Runs:** {usage['runs']}\n")
                f.write(f"- **Total Input Tokens:** {usage['input_tokens']:,}\n")
                f.write(f"- **Total Output Tokens:** {usage['output_tokens']:,}\n")
                f.write(f"- **Total Tokens:** {usage['total_tokens']:,}\n")
                f.write(f"- **Average Tokens per Run:** {usage['total_tokens'] // usage['runs']:,}\n\n")
                grand_total_tokens += usage['total_tokens']

            f.write("### Grand Total\n\n")
            f.write(f"- **Total Tokens Across All Models:** {grand_total_tokens:,}\n\n")
            f.write("---\n\n")

            for model, model_responses in responses_by_model.items():
                f.write(f"## Model: {model}\n\n")

                for response_data in model_responses:
                    f.write(f"### Run {response_data['run_id']} (Model Run {response_data['model_run_id']})\n\n")
                    f.write(f"**Timestamp:** {response_data['timestamp']}\n\n")
                    f.write(f"**Analyzed files:** {response_data['helm_files']}\n\n")

                    if 'usage' in response_data:
                        run_usage = response_data['usage']
                        f.write("#### Token Usage\n\n")
                        f.write(f"- **Input Tokens:** {run_usage['input_tokens']:,}\n")
                        f.write(f"- **Output Tokens:** {run_usage['output_tokens']:,}\n")
                        f.write(f"- **Total Tokens:** {run_usage['total_tokens']:,}\n\n")

                    f.write("#### Analysis Results\n\n")
                    f.write(response_data['response'])
                    f.write("\n\n---\n\n")

        self.logger.info(f"Saved {len(responses)} responses to {output_file}")
|
|
|
|
|
|
def main():
    """Parse command-line arguments, run the analysis, and report the outcome.

    Returns:
        Process exit status: 0 on success (including ``--list-models``),
        1 when the run raised an error or produced no findings at all.
        Invalid arguments terminate through ``parser.error``.
    """
    parser = argparse.ArgumentParser(
        description="Analyze Helm charts for security issues using OpenRouter API",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=f"""
Available models:
{chr(10).join(f' - {key}: {value}' for key, value in HelmSecurityAnalyzer.AVAILABLE_MODELS.items())}

You can also specify a direct model path like 'anthropic/claude-3-opus-20240229'

Examples:
# Run 3 times with claude-3-haiku
%(prog)s ./helm-chart -n 3 -m claude-3-haiku

# Run 2 times each with 3 different models
%(prog)s ./helm-chart -n 2 -m claude-3-haiku gpt-4 mistral-large

# Run with direct model paths
%(prog)s ./helm-chart -n 1 -m anthropic/claude-3-opus-20240229 openai/gpt-4-turbo

# Use direct API access with environment variables
ANTHROPIC_API_KEY=your_key OPENAI_API_KEY=your_key %(prog)s ./helm-chart -n 2 -m claude-3-haiku gpt-4

# Use direct API access with command-line arguments
%(prog)s ./helm-chart -n 2 -m claude-3-haiku gpt-4 --anthropic-api-key=your_key --openai-api-key=your_key
"""
    )
    parser.add_argument(
        "helm_dir",
        type=Path,
        help="Path to Helm chart directory"
    )
    parser.add_argument(
        "-n", "--num-runs",
        type=int,
        default=1,
        help="Number of times to run the analysis per model (default: 1)"
    )
    parser.add_argument(
        "-o", "--output",
        type=Path,
        default=Path("security_findings.csv"),
        help="Output CSV file (default: security_findings.csv)"
    )
    parser.add_argument(
        "-k", "--api-key",
        type=str,
        help="OpenRouter API key (or set OPENROUTER_API_KEY env var)"
    )
    parser.add_argument(
        "--anthropic-api-key",
        type=str,
        help="Anthropic API key (or set ANTHROPIC_API_KEY env var)"
    )
    parser.add_argument(
        "--openai-api-key",
        type=str,
        help="OpenAI API key (or set OPENAI_API_KEY env var)"
    )
    parser.add_argument(
        "-m", "--models",
        type=str,
        nargs='+',
        default=["anthropic/claude-3-haiku"],
        help="Models to use (can specify multiple)"
    )
    parser.add_argument(
        "-sp", "--system-prompt",
        type=Path,
        default=Path("prompts/system_prompt.md"),
        help="System prompt file (default: prompts/system_prompt.md)"
    )
    parser.add_argument(
        "-up", "--user-prompt",
        type=Path,
        default=Path("prompts/user_prompt.md"),
        help="User prompt file (default: prompts/user_prompt.md)"
    )
    parser.add_argument(
        "--site-url",
        type=str,
        help="Your site URL for OpenRouter tracking"
    )
    parser.add_argument(
        "--app-name",
        type=str,
        help="Your app name for OpenRouter tracking"
    )
    parser.add_argument(
        "-d", "--delay",
        type=float,
        default=1.0,
        help="Delay between API calls in seconds (default: 1.0)"
    )
    parser.add_argument(
        "-v", "--verbose",
        action="store_true",
        help="Enable verbose logging"
    )
    parser.add_argument(
        "--list-models",
        action="store_true",
        help="List available models and exit"
    )

    args = parser.parse_args()

    # Handle list-models flag
    if args.list_models:
        print("Available models:")
        for key, value in HelmSecurityAnalyzer.AVAILABLE_MODELS.items():
            print(f" - {key}: {value}")
        return 0

    logging.basicConfig(
        level=logging.DEBUG if args.verbose else logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )

    api_key = args.api_key or os.environ.get("OPENROUTER_API_KEY")
    if not api_key:
        parser.error("API key required: use --api-key or set OPENROUTER_API_KEY")

    # Reject values that would otherwise make every run a silent no-op or
    # turn each run into a logged error.
    if args.num_runs < 1:
        parser.error("--num-runs must be at least 1")
    if args.delay < 0:
        parser.error("--delay must not be negative")
    if not args.helm_dir.is_dir():
        parser.error(f"Helm chart directory not found: {args.helm_dir}")

    # Check that prompt files exist
    if not args.system_prompt.exists():
        parser.error(f"System prompt file not found: {args.system_prompt}")
    if not args.user_prompt.exists():
        parser.error(f"User prompt file not found: {args.user_prompt}")

    # The analyzer starts on the first model; the rest are selected per run.
    analyzer = HelmSecurityAnalyzer(
        api_key=api_key,
        model=args.models[0],
        site_url=args.site_url,
        app_name=args.app_name,
        anthropic_api_key=args.anthropic_api_key,
        openai_api_key=args.openai_api_key
    )

    try:
        # Display run plan
        total_runs = len(args.models) * args.num_runs
        print("\nAnalysis Plan:")
        print(f" - Models: {len(args.models)}")
        for model in args.models:
            model_name = analyzer.AVAILABLE_MODELS.get(model, model)
            print(f" • {model_name}")
        print(f" - Runs per model: {args.num_runs}")
        print(f" - Total runs: {total_runs}")
        print(f" - Estimated time: ~{total_runs * (args.delay + 10):.0f} seconds\n")

        findings = analyzer.run_experiments(
            helm_dir=args.helm_dir,
            models=args.models,
            num_runs=args.num_runs,
            output_file=args.output,
            system_prompt_file=args.system_prompt,
            user_prompt_file=args.user_prompt,
            delay_seconds=args.delay
        )

    except Exception as e:
        logging.error(f"Fatal error: {e}")
        return 1

    # Failed runs are logged and skipped by run_experiments, and no CSV is
    # written without findings, so an empty result must not be reported as
    # a successful analysis.
    if not findings:
        logging.error("No security findings were produced; see the log above for per-run errors")
        return 1

    print("\nAnalysis complete!")
    print(f" - Total security findings: {len(findings)}")
    print(f" - Models used: {len(args.models)}")
    print(f" - Total runs: {total_runs}")
    print("\nResults saved to:")
    print(f" - CSV: {args.output}")
    print(f" - Markdown: {args.output.with_suffix('.md')}")
    print("\nPrompts loaded from:")
    print(f" - System: {args.system_prompt}")
    print(f" - User: {args.user_prompt}")

    return 0
|
|
|
|
|
|
if __name__ == "__main__":
    # Raise SystemExit directly: the exit() builtin is injected by the site
    # module and is not guaranteed to exist (e.g. under `python -S`).
    raise SystemExit(main())
|