Mirror of https://github.com/esekercan/sans-paper-public.git (synced 2026-02-12 15:52:45 +00:00)

Commit: Clean commit with proof-of-concept code, architecture, readme, and input files
README.md (Normal file)
@@ -0,0 +1,5 @@
# sans-paper

This is the associated repository for the SANS Whitepaper: "Comparative Analysis of Large Language Model Performance in Automated Threat Modeling: A WordPress Application Case Study"

It was written in 2025 as part of a graduate program.
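For reference, the exact command used for the whitepaper experiment is recorded in the docstring of openrouter_threatmodeler.py below:

python openrouter_threatmodeler.py $PWD/wordpress-helm-chart -o combined.csv -n 5 -m anthropic/claude-3-5-haiku-latest anthropic/claude-3-7-sonnet-latest anthropic/claude-opus-4-0 openai/o4-mini openai/chatgpt-4o-latest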
application_architecture_simplified.mm (Normal file)
@@ -0,0 +1,54 @@
flowchart TB
    subgraph Inputs["Inputs"]
        HC["Helm Chart Directory"]
        PROMPTS["System & User Prompts"]
        CONFIG["Configuration<br>• API Keys<br>• Models<br>• Run Count"]
    end
    subgraph subGraph1["API Selection"]
        DECISION{"API Key Available?"}
        ANTHROPIC["Anthropic API<br>Claude Models"]
        OPENAI["OpenAI API<br>GPT Models"]
        OPENROUTER["OpenRouter API<br>Fallback"]
    end
    subgraph subGraph2["Analysis Engine"]
        LOAD["Load Helm Files"]
        ANALYZE["Security Analysis"]
        PARSE["Parse AI Response"]
    end
    subgraph Results["Results"]
        CSV["CSV Report<br>• MITRE ATT&CK Data<br>• Risk Priorities"]
        MD["Markdown Report<br>• Full Responses<br>• Token Usage"]
    end
    HC --> ANALYZER["Helm Security Analyzer"]
    PROMPTS --> ANALYZER
    CONFIG --> ANALYZER
    ANALYZER --> LOAD
    LOAD --> ANALYZE
    ANALYZE --> DECISION
    DECISION -- Anthropic Key --> ANTHROPIC
    DECISION -- OpenAI Key --> OPENAI
    DECISION -- No Direct Key --> OPENROUTER
    ANTHROPIC --> PARSE
    OPENAI --> PARSE
    OPENROUTER --> PARSE
    PARSE --> CSV & MD
    HC:::input
    PROMPTS:::input
    CONFIG:::input
    DECISION:::decision
    ANTHROPIC:::api
    OPENAI:::api
    OPENROUTER:::api
    LOAD:::process
    ANALYZE:::process
    PARSE:::process
    ANALYZER:::core
    classDef input fill:#e3f2fd,stroke:#1976d2,stroke-width:2px
    classDef core fill:#f3e5f5,stroke:#7b1fa2,stroke-width:2px
    classDef api fill:#fff3e0,stroke:#f57c00,stroke-width:2px
    classDef process fill:#e8f5e9,stroke:#388e3c,stroke-width:2px
    classDef decision fill:#fff8e1,stroke:#fbc02d,stroke-width:2px
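The "API Key Available?" decision above corresponds to the provider dispatch in HelmSecurityAnalyzer.analyze_helm_chart (openrouter_threatmodeler.py, below). A minimal illustrative sketch of that selection logic, not part of the repository:

from typing import Optional

def select_provider(model: str,
                    anthropic_api_key: Optional[str] = None,
                    openai_api_key: Optional[str] = None) -> str:
    """Prefer a direct provider API when its key is present; otherwise fall back to OpenRouter."""
    provider = model.split('/')[0] if '/' in model else ""
    if provider == "anthropic" and anthropic_api_key:
        return "anthropic"    # handled by call_anthropic_api()
    if provider == "openai" and openai_api_key:
        return "openai"       # handled by call_openai_api()
    return "openrouter"       # fallback: OpenRouter chat/completions endpoint

# Example: an Anthropic model with no direct key routes through OpenRouter.
print(select_provider("anthropic/claude-3-5-haiku-latest"))  # -> openrouter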
openrouter_threatmodeler.py (Normal file)
@@ -0,0 +1,946 @@
#!/usr/bin/env python3
"""
OpenRouter Helm Chart Security Analysis Tool

This tool automates security analysis of Helm charts using OpenRouter.ai,
supporting multiple AI models, mapping findings to MITRE ATT&CK framework,
and outputting results to CSV and Markdown.

The tool can use Anthropic and OpenAI APIs directly when API keys are available,
falling back to OpenRouter when they're not. It also supports OpenAI reasoning models
like o3 and o4-mini which require a different request schema.

# Run 3 times with a single model
python openrouter_threatmodeler.py ./helm-chart -n 3 -m claude-3-haiku

# Run 2 times each with 3 different models (6 total runs)
python openrouter_threatmodeler.py ./helm-chart -n 2 -m claude-3-haiku gpt-4 mistral-large

# Mix of short names and direct paths
python openrouter_threatmodeler.py ./helm-chart -n 1 -m claude-3-opus openai/gpt-4-turbo-preview

# Use direct API access with environment variables
ANTHROPIC_API_KEY=your_key OPENAI_API_KEY=your_key python openrouter_threatmodeler.py ./helm-chart -n 2 -m claude-3-haiku gpt-4

# Use direct API access with command-line arguments
python openrouter_threatmodeler.py ./helm-chart -n 2 -m claude-3-haiku gpt-4 --anthropic-api-key=your_key --openai-api-key=your_key

# Run used for the SANS Whitepaper Experiment:
python openrouter_threatmodeler.py $PWD/wordpress-helm-chart -o combined.csv -n 5 -m anthropic/claude-3-5-haiku-latest anthropic/claude-3-7-sonnet-latest anthropic/claude-opus-4-0 openai/o4-mini openai/chatgpt-4o-latest

"""

import os
import csv
import json
import yaml
import time
import argparse
import logging
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Any, Optional, Tuple, Union
from dataclasses import dataclass, asdict
import re
import requests


@dataclass
class SecurityFinding:
    """Data class for security findings"""
    run_id: int
    timestamp: str
    mitre_attack_tactic: str
    mitre_attack_base_technique_id: str
    mitre_attack_sub_technique_id: Optional[str]
    mitre_attack_technique_name: str
    misconfiguration: str
    risk_priority: str
    mitigation_steps: str
    helm_file: str
    raw_response: str
    model: str  # Added model field


@dataclass
class SimplifiedFinding:
    """Simplified data class for CSV output"""
    run_number: int
    model: str  # Added model field
    mitre_attack_tactic: str
    mitre_attack_base_technique_id: str
    mitre_attack_sub_technique_id: Optional[str]
    mitre_attack_technique_name: str
    risk_priority: str


class HelmSecurityAnalyzer:
    """Main class for analyzing Helm charts using OpenRouter API"""

    # Available models on OpenRouter
    AVAILABLE_MODELS = {
        'claude-3-opus': 'anthropic/claude-3-opus',
        'claude-3-sonnet': 'anthropic/claude-3-sonnet-20240229',
        'claude-3-haiku': 'anthropic/claude-3-haiku',
        'gpt-4': 'openai/gpt-4',
        'gpt-4-turbo': 'openai/gpt-4-turbo-preview',
        'gpt-3.5-turbo': 'openai/gpt-3.5-turbo',
        'o3': 'openai/o3',
        'o4-mini': 'openai/o4-mini',
        'mistral-large': 'mistralai/mistral-large',
        'mixtral-8x7b': 'mistralai/mixtral-8x7b-instruct',
        'gemini-pro': 'google/gemini-pro',
        'llama-3-70b': 'meta-llama/llama-3-70b-instruct',
        'llama-3-8b': 'meta-llama/llama-3-8b-instruct'
    }

    # Known reasoning models that require special formatting
    REASONING_MODELS = ["o3", "o4-mini"]

    def __init__(self, api_key: str, model: str = "anthropic/claude-3-haiku",
                 site_url: Optional[str] = None, app_name: Optional[str] = None,
                 anthropic_api_key: Optional[str] = None, openai_api_key: Optional[str] = None):
        """Initialize the analyzer with API credentials"""
        self.api_key = api_key  # OpenRouter API key
        self.anthropic_api_key = anthropic_api_key or os.environ.get("ANTHROPIC_API_KEY")
        self.openai_api_key = openai_api_key or os.environ.get("OPENAI_API_KEY")
        self.base_url = "https://openrouter.ai/api/v1"

        # Map short model names to full OpenRouter model paths
        if model in self.AVAILABLE_MODELS:
            self.model = self.AVAILABLE_MODELS[model]
        else:
            # Allow direct model path specification
            self.model = model

        self.site_url = site_url or "https://github.com/esekercan/sans-paper/tree/main/python"
        self.app_name = app_name or "Helm Security Analyzer"
        self.logger = logging.getLogger(__name__)

    def set_model(self, model: str):
        """Set the current model"""
        if model in self.AVAILABLE_MODELS:
            self.model = self.AVAILABLE_MODELS[model]
        else:
            self.model = model

    def call_anthropic_api(self, system_prompt: str, user_prompt: str) -> Tuple[str, Dict[str, int]]:
        """Call Anthropic API directly when API key is available"""
        self.logger.info(f"Calling Anthropic API directly with model: {self.model}")

        # Extract the model name from the full path (e.g., "anthropic/claude-3-haiku" -> "claude-3-haiku")
        model_name = self.model.split('/')[-1] if '/' in self.model else self.model

        headers = {
            "x-api-key": self.anthropic_api_key,
            "anthropic-version": "2023-06-01",
            "content-type": "application/json"
        }

        data = {
            "model": model_name,
            "system": system_prompt,
            "messages": [
                {"role": "user", "content": user_prompt}
            ],
            "temperature": 0.1,
            "max_tokens": 5000,
            "top_p": 1
        }

        try:
            response = requests.post(
                "https://api.anthropic.com/v1/messages",
                headers=headers,
                json=data,
                timeout=120
            )
            response.raise_for_status()

            result = response.json()
            content = result['content'][0]['text']

            # Extract usage information (Anthropic might provide different format)
            usage = {
                'input_tokens': result.get('usage', {}).get('input_tokens', 0),
                'output_tokens': result.get('usage', {}).get('output_tokens', 0),
                'total_tokens': result.get('usage', {}).get('input_tokens', 0) + result.get('usage', {}).get('output_tokens', 0)
            }

            self.logger.info(f"Anthropic API call successful - Tokens used: {usage['total_tokens']} (input: {usage['input_tokens']}, output: {usage['output_tokens']})")
            return content, usage

        except requests.exceptions.RequestException as e:
            self.logger.error(f"Anthropic API call failed: {e}")
            if hasattr(e, 'response') and e.response is not None:
                self.logger.error(f"Response content: {e.response.text}")
            raise

    def call_openai_api(self, system_prompt: str, user_prompt: str) -> Tuple[str, Dict[str, int]]:
        """Call OpenAI API directly when API key is available"""
        self.logger.info(f"Calling OpenAI API directly with model: {self.model}")

        # Extract the model name from the full path (e.g., "openai/gpt-4" -> "gpt-4")
        model_name = self.model.split('/')[-1] if '/' in self.model else self.model

        # Check if this is a reasoning model that requires the Responses API
        is_reasoning_model = model_name in self.REASONING_MODELS

        if is_reasoning_model:
            return self._call_openai_responses_api(system_prompt, user_prompt, model_name)
        else:
            return self._call_openai_chat_completions_api(system_prompt, user_prompt, model_name)

    def _call_openai_chat_completions_api(self, system_prompt: str, user_prompt: str, model_name: str) -> Tuple[str, Dict[str, int]]:
        """Call OpenAI Chat Completions API for standard models"""
        self.logger.info(f"Using Chat Completions API for model: {model_name}")

        headers = {
            "Authorization": f"Bearer {self.openai_api_key}",
            "Content-Type": "application/json"
        }

        # Base request data for chat completions
        data = {
            "model": model_name,
            "messages": [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt}
            ],
            "temperature": 0.1,
            "max_tokens": 5000,
            "top_p": 1,
            "frequency_penalty": 0,
            "presence_penalty": 0
        }

        try:
            response = requests.post(
                "https://api.openai.com/v1/chat/completions",
                headers=headers,
                json=data,
                timeout=120
            )
            response.raise_for_status()

            result = response.json()

            if 'choices' not in result or not result['choices']:
                self.logger.error(f"No choices in OpenAI response: {result}")
                raise ValueError("No choices returned in OpenAI API response")

            content = result['choices'][0]['message']['content']

            # Extract usage information
            usage_data = result.get('usage', {})
            usage = {
                'input_tokens': usage_data.get('prompt_tokens', 0),
                'output_tokens': usage_data.get('completion_tokens', 0),
                'total_tokens': usage_data.get('total_tokens', 0)
            }

            self.logger.info(f"Chat Completions API call successful - Tokens used: {usage['total_tokens']} (input: {usage['input_tokens']}, output: {usage['output_tokens']})")
            return content, usage

        except requests.exceptions.RequestException as e:
            self.logger.error(f"Chat Completions API call failed: {e}")
            if hasattr(e, 'response') and e.response is not None:
                self.logger.error(f"Response content: {e.response.text}")
            raise
        except Exception as e:
            self.logger.error(f"Unexpected error in Chat Completions API call: {e}")
            raise

    def _call_openai_responses_api(self, system_prompt: str, user_prompt: str, model_name: str) -> Tuple[str, Dict[str, int]]:
        """Call OpenAI Responses API for reasoning models like o3 and o4-mini"""
        self.logger.info(f"Using Responses API for reasoning model: {model_name}")

        headers = {
            "Authorization": f"Bearer {self.openai_api_key}",
            "Content-Type": "application/json"
        }

        # Base request data for responses API (note: uses "input" instead of "messages", no temperature)
        data = {
            "model": model_name,
            "input": [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt}
            ],
            "max_output_tokens": 4000,
            "reasoning": {"effort": "medium"}
        }

        try:
            response = requests.post(
                "https://api.openai.com/v1/responses",
                headers=headers,
                json=data,
                timeout=120
            )
            response.raise_for_status()

            result = response.json()

            # Parse the Responses API structure which has an 'output' array
            content = None
            if 'output' in result and isinstance(result['output'], list):
                # Look for the message object in the output array
                for output_item in result['output']:
                    if output_item.get('type') == 'message' and 'content' in output_item:
                        # Extract text from the content array
                        content_array = output_item['content']
                        if isinstance(content_array, list) and len(content_array) > 0:
                            # Look for the text content
                            for content_item in content_array:
                                if content_item.get('type') == 'output_text' and 'text' in content_item:
                                    content = content_item['text']
                                    break
                        break

            # Fallback to legacy structure parsing if needed
            if content is None:
                if 'choices' in result and result['choices']:
                    content = result['choices'][0]['message']['content']
                elif 'response' in result:
                    content = result['response']
                elif 'content' in result:
                    content = result['content']
                else:
                    self.logger.error(f"Unexpected response structure from Responses API: {list(result.keys())}")
                    raise ValueError("Unexpected response structure from OpenAI Responses API")

            # Extract usage information (might be in different format for Responses API)
            usage_data = result.get('usage', {})
            usage = {
                'input_tokens': usage_data.get('prompt_tokens', 0) or usage_data.get('input_tokens', 0),
                'output_tokens': usage_data.get('completion_tokens', 0) or usage_data.get('output_tokens', 0),
                'total_tokens': usage_data.get('total_tokens', 0)
            }

            # Calculate total if not provided
            if usage['total_tokens'] == 0:
                usage['total_tokens'] = usage['input_tokens'] + usage['output_tokens']

            self.logger.info(f"Responses API call successful - Tokens used: {usage['total_tokens']} (input: {usage['input_tokens']}, output: {usage['output_tokens']})")
            return content, usage

        except requests.exceptions.RequestException as e:
            self.logger.error(f"Responses API call failed: {e}")
            if hasattr(e, 'response') and e.response is not None:
                self.logger.error(f"Response content: {e.response.text}")
            raise
        except Exception as e:
            self.logger.error(f"Unexpected error in Responses API call: {e}")
            raise

    def load_helm_files(self, helm_dir: Path) -> Dict[str, str]:
        """Load all YAML files from a Helm chart directory"""
        helm_files = {}

        # Common Helm chart file patterns
        patterns = ['*.yaml', '*.yml', '**/*.yaml', '**/*.yml']

        for pattern in patterns:
            for file_path in helm_dir.glob(pattern):
                if file_path.is_file():
                    try:
                        with open(file_path, 'r') as f:
                            content = f.read()
                        relative_path = file_path.relative_to(helm_dir)
                        helm_files[str(relative_path)] = content
                        self.logger.info(f"Loaded {relative_path}")
                    except Exception as e:
                        self.logger.error(f"Error loading {file_path}: {e}")

        return helm_files

    def load_prompt_from_file(self, prompt_file: Path) -> str:
        """Load prompt from file - raises error if not found"""
        if not prompt_file.exists():
            raise FileNotFoundError(f"Required prompt file not found: {prompt_file}")

        try:
            with open(prompt_file, 'r', encoding='utf-8') as f:
                content = f.read().strip()
            self.logger.info(f"Loaded prompt from {prompt_file}")
            return content
        except Exception as e:
            self.logger.error(f"Error loading prompt file {prompt_file}: {e}")
            raise

    def create_system_prompt(self, system_prompt_file: Path) -> str:
        """Load the system prompt from file"""
        return self.load_prompt_from_file(system_prompt_file)

    def create_user_prompt(self, helm_files: Dict[str, str], user_prompt_file: Path) -> str:
        """Load the user prompt from file and append Helm chart content"""
        user_prompt = self.load_prompt_from_file(user_prompt_file)

        # Add Helm files content
        helm_content = "\n\n**Helm Chart Files:**\n\n"
        for filename, content in helm_files.items():
            helm_content += f"### {filename}\n```yaml\n{content}\n```\n\n"

        return user_prompt + helm_content

    def parse_response(self, response: str, run_id: int, helm_file: str, model: str) -> List[SecurityFinding]:
        """Parse the LLM response into structured findings"""
        findings = []
        timestamp = datetime.now().isoformat()

        # Try to parse table format
        # Look for table rows (lines with | separators)
        lines = response.split('\n')
        table_rows = []
        in_table = False

        for line in lines:
            if '|' in line and line.count('|') >= 4:
                # Skip header and separator rows
                if any(header in line.lower() for header in ['technique', 'tactic', 'misconfiguration', 'risk', 'mitigation']):
                    in_table = True
                    continue
                if line.strip().startswith('|---') or line.strip().startswith('| ---'):
                    continue
                if in_table:
                    table_rows.append(line)

        # If we found table rows, parse them
        if table_rows:
            for row in table_rows:
                cells = [cell.strip() for cell in row.split('|') if cell.strip()]

                # Handle new format (with separate base and sub technique columns)
                if len(cells) >= 7:
                    finding = SecurityFinding(
                        run_id=run_id,
                        timestamp=timestamp,
                        mitre_attack_base_technique_id=cells[0],
                        mitre_attack_sub_technique_id=cells[1] if cells[1] and cells[1] != "N/A" else None,
                        mitre_attack_technique_name=cells[2],
                        mitre_attack_tactic=cells[3],
                        misconfiguration=cells[4],
                        risk_priority=cells[5],
                        mitigation_steps=cells[6],
                        helm_file=helm_file,
                        raw_response=response,
                        model=model
                    )
                    findings.append(finding)
                # Handle old format (with single technique column) for backward compatibility
                elif len(cells) >= 5:
                    # Try to split the technique into base and sub if it contains a dot
                    technique = cells[0]
                    base_technique = technique
                    sub_technique = None
                    technique_name = ""

                    # Extract technique name if present (e.g., "T1234 - Brute Force")
                    if " - " in technique:
                        parts = technique.split(" - ", 1)
                        technique_id = parts[0].strip()
                        technique_name = parts[1].strip()

                        # Now check if the ID part contains a dot for sub-technique
                        if '.' in technique_id:
                            id_parts = technique_id.split('.')
                            base_technique = id_parts[0]
                            sub_technique = f"{base_technique}.{id_parts[1]}"
                        else:
                            base_technique = technique_id
                    # If no name separator but contains a dot (e.g., T1234.001)
                    elif '.' in technique:
                        parts = technique.split('.')
                        base_technique = parts[0]
                        sub_technique = f"{base_technique}.{parts[1]}"

                    finding = SecurityFinding(
                        run_id=run_id,
                        timestamp=timestamp,
                        mitre_attack_base_technique_id=base_technique,
                        mitre_attack_sub_technique_id=sub_technique,
                        mitre_attack_technique_name=technique_name,
                        mitre_attack_tactic=cells[1],
                        misconfiguration=cells[2],
                        risk_priority=cells[3],
                        mitigation_steps=cells[4],
                        helm_file=helm_file,
                        raw_response=response,
                        model=model
                    )
                    findings.append(finding)
        else:
            # If no table format found, try to extract findings from text
            # This is a fallback parser
            self.logger.warning("No table format found in response, attempting text extraction")

            # Create a single finding with the full response
            finding = SecurityFinding(
                run_id=run_id,
                timestamp=timestamp,
                mitre_attack_base_technique_id="See raw response",
                mitre_attack_sub_technique_id=None,
                mitre_attack_technique_name="See raw response",
                mitre_attack_tactic="See raw response",
                misconfiguration="See raw response",
                risk_priority="See raw response",
                mitigation_steps="See raw response",
                helm_file=helm_file,
                raw_response=response,
                model=model
            )
            findings.append(finding)

        return findings

    def analyze_helm_chart(self, helm_dir: Path, system_prompt_file: Path,
                           user_prompt_file: Path) -> tuple[str, Dict[str, str], Dict[str, int]]:
        """Analyze a single Helm chart and return findings"""
        # Load Helm files
        helm_files = self.load_helm_files(helm_dir)
        if not helm_files:
            raise ValueError(f"No YAML files found in {helm_dir}")

        # Create prompts
        system_prompt = self.create_system_prompt(system_prompt_file)
        user_prompt = self.create_user_prompt(helm_files, user_prompt_file)

        # Check if we should use direct API access based on model and available API keys
        model_provider = self.model.split('/')[0] if '/' in self.model else ""

        # Try to use Anthropic API directly for Anthropic models
        if model_provider == "anthropic" and self.anthropic_api_key:
            try:
                self.logger.info(f"Using direct Anthropic API for model: {self.model}")
                content, usage = self.call_anthropic_api(system_prompt, user_prompt)
                return content, helm_files, usage
            except Exception as e:
                self.logger.warning(f"Direct Anthropic API call failed, falling back to OpenRouter: {e}")
                # Fall back to OpenRouter

        # Try to use OpenAI API directly for OpenAI models
        elif model_provider == "openai" and self.openai_api_key:
            try:
                self.logger.info(f"Using direct OpenAI API for model: {self.model}")
                content, usage = self.call_openai_api(system_prompt, user_prompt)
                return content, helm_files, usage
            except Exception as e:
                self.logger.warning(f"Direct OpenAI API call failed, falling back to OpenRouter: {e}")
                # Fall back to OpenRouter

        # Use OpenRouter API as fallback
        self.logger.info(f"Using OpenRouter API with model: {self.model}")

        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "HTTP-Referer": self.site_url,
            "X-Title": self.app_name,
            "Content-Type": "application/json"
        }

        # Base request data
        data = {
            "model": self.model,
            "messages": [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt}
            ],
            "temperature": 0.1,
            "top_p": 1,
            "frequency_penalty": 0,
            "presence_penalty": 0
        }

        # Extract model name and apply reasoning model specific formatting if needed
        model_name = self.model.split('/')[-1] if '/' in self.model else self.model
        if model_name in self.REASONING_MODELS:
            self.logger.info(f"OpenRouter: Applying reasoning model formatting for: {model_name}")
            data["max_output_tokens"] = 4000
            data["reasoning"] = {"effort": "medium"}
        else:
            self.logger.info(f"OpenRouter: Using standard formatting for: {model_name}")
            data["max_tokens"] = 4000

        try:
            response = requests.post(
                f"{self.base_url}/chat/completions",
                headers=headers,
                json=data,
                timeout=120
            )
            response.raise_for_status()

            result = response.json()
            content = result['choices'][0]['message']['content']

            # Extract usage information
            usage_data = result.get('usage', {})
            usage = {
                'input_tokens': usage_data.get('prompt_tokens', 0),
                'output_tokens': usage_data.get('completion_tokens', 0),
                'total_tokens': usage_data.get('total_tokens', 0)
            }

            self.logger.info(f"OpenRouter API call successful - Tokens used: {usage['total_tokens']} (input: {usage['input_tokens']}, output: {usage['output_tokens']})")
            return content, helm_files, usage

        except requests.exceptions.RequestException as e:
            self.logger.error(f"OpenRouter API call failed: {e}")
            if hasattr(e, 'response') and e.response is not None:
                self.logger.error(f"Response content: {e.response.text}")
            raise

    def run_experiments(self, helm_dir: Path, models: List[str], num_runs: int,
                        output_file: Path, system_prompt_file: Path,
                        user_prompt_file: Path, delay_seconds: float = 1.0):
        """Run multiple experiments with multiple models and save results"""
        all_findings = []
        all_responses = []
        total_usage_by_model = {}
        overall_run_id = 0

        # Initialize usage tracking for each model
        for model in models:
            model_name = self.AVAILABLE_MODELS.get(model, model)
            total_usage_by_model[model_name] = {
                'input_tokens': 0,
                'output_tokens': 0,
                'total_tokens': 0,
                'runs': 0
            }

        # Run experiments for each model
        for model_idx, model in enumerate(models):
            self.set_model(model)
            model_name = self.model
            self.logger.info(f"\n{'='*60}")
            self.logger.info(f"Starting experiments with model {model_idx + 1}/{len(models)}: {model_name}")
            self.logger.info(f"{'='*60}\n")

            for run_in_model in range(1, num_runs + 1):
                overall_run_id += 1
                self.logger.info(f"Model '{model_name}' - Run {run_in_model}/{num_runs} (Overall run {overall_run_id})")

                try:
                    response, helm_files, usage = self.analyze_helm_chart(
                        helm_dir, system_prompt_file, user_prompt_file
                    )
                    helm_files_str = ", ".join(helm_files.keys())
                    findings = self.parse_response(response, overall_run_id, helm_files_str, model_name)
                    all_findings.extend(findings)

                    # Update usage for this model
                    total_usage_by_model[model_name]['input_tokens'] += usage['input_tokens']
                    total_usage_by_model[model_name]['output_tokens'] += usage['output_tokens']
                    total_usage_by_model[model_name]['total_tokens'] += usage['total_tokens']
                    total_usage_by_model[model_name]['runs'] += 1

                    # Store the raw response with metadata
                    all_responses.append({
                        'run_id': overall_run_id,
                        'model_run_id': run_in_model,
                        'timestamp': datetime.now().isoformat(),
                        'helm_files': helm_files_str,
                        'response': response,
                        'usage': usage,
                        'model': model_name
                    })

                    self.logger.info(f"Run {overall_run_id} completed with {len(findings)} findings")

                    # Add delay between API calls to avoid rate limiting
                    if overall_run_id < len(models) * num_runs:
                        time.sleep(delay_seconds)

                except Exception as e:
                    self.logger.error(f"Error in run {overall_run_id} (model: {model_name}): {e}")
                    # Continue with next run
                    continue

        # Save outputs
        self.save_to_csv(all_findings, output_file)
        self.save_to_markdown(all_responses, output_file.with_suffix('.md'), total_usage_by_model)
        return all_findings

    def save_to_csv(self, findings: List[SecurityFinding], output_file: Path):
        """Save simplified findings to CSV file"""
        if not findings:
            self.logger.warning("No findings to save")
            return

        # Create simplified findings with only the requested fields
        simplified_findings = []
        for finding in findings:
            simplified = SimplifiedFinding(
                run_number=finding.run_id,
                model=finding.model,
                mitre_attack_base_technique_id=finding.mitre_attack_base_technique_id,
                mitre_attack_sub_technique_id=finding.mitre_attack_sub_technique_id,
                mitre_attack_technique_name=finding.mitre_attack_technique_name,
                mitre_attack_tactic=finding.mitre_attack_tactic,
                risk_priority=finding.risk_priority
            )
            simplified_findings.append(asdict(simplified))

        # Write to CSV
        with open(output_file, 'w', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=['run_number', 'model', 'mitre_attack_base_technique_id',
                                                   'mitre_attack_sub_technique_id', 'mitre_attack_technique_name',
                                                   'mitre_attack_tactic', 'risk_priority'])
            writer.writeheader()
            writer.writerows(simplified_findings)

        self.logger.info(f"Saved {len(simplified_findings)} findings to {output_file}")

    def save_to_markdown(self, responses: List[Dict[str, Any]], output_file: Path,
                         total_usage_by_model: Dict[str, Dict[str, int]]):
        """Save all responses to a Markdown file"""
        if not responses:
            self.logger.warning("No responses to save")
            return

        with open(output_file, 'w', encoding='utf-8') as f:
            f.write("# Helm Chart Security Analysis Results\n\n")
            f.write(f"Generated on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")
            f.write(f"Total runs: {len(responses)}\n\n")

            # Add summary of models used
            f.write("## Models Used\n\n")
            for model, usage in total_usage_by_model.items():
                if usage['runs'] > 0:
                    f.write(f"- **{model}**: {usage['runs']} runs\n")
            f.write("\n")

            # Add total token usage summary by model
            f.write("## Token Usage Summary by Model\n\n")
            grand_total_tokens = 0
            for model, usage in total_usage_by_model.items():
                if usage['runs'] > 0:
                    f.write(f"### {model}\n\n")
                    f.write(f"- **Runs:** {usage['runs']}\n")
                    f.write(f"- **Total Input Tokens:** {usage['input_tokens']:,}\n")
                    f.write(f"- **Total Output Tokens:** {usage['output_tokens']:,}\n")
                    f.write(f"- **Total Tokens:** {usage['total_tokens']:,}\n")
                    f.write(f"- **Average Tokens per Run:** {usage['total_tokens'] // usage['runs']:,}\n\n")
                    grand_total_tokens += usage['total_tokens']

            f.write(f"### Grand Total\n\n")
            f.write(f"- **Total Tokens Across All Models:** {grand_total_tokens:,}\n\n")

            f.write("---\n\n")

            # Group responses by model for better organization
            from collections import defaultdict
            responses_by_model = defaultdict(list)
            for response_data in responses:
                responses_by_model[response_data['model']].append(response_data)

            # Write responses grouped by model
            for model, model_responses in responses_by_model.items():
                f.write(f"## Model: {model}\n\n")

                for response_data in model_responses:
                    f.write(f"### Run {response_data['run_id']} (Model Run {response_data['model_run_id']})\n\n")
                    f.write(f"**Timestamp:** {response_data['timestamp']}\n\n")
                    f.write(f"**Analyzed files:** {response_data['helm_files']}\n\n")

                    # Add token usage for this run
                    if 'usage' in response_data:
                        f.write("#### Token Usage\n\n")
                        f.write(f"- **Input Tokens:** {response_data['usage']['input_tokens']:,}\n")
                        f.write(f"- **Output Tokens:** {response_data['usage']['output_tokens']:,}\n")
                        f.write(f"- **Total Tokens:** {response_data['usage']['total_tokens']:,}\n\n")

                    f.write("#### Analysis Results\n\n")
                    f.write(response_data['response'])
                    f.write("\n\n---\n\n")

        self.logger.info(f"Saved {len(responses)} responses to {output_file}")


def main():
    """Main entry point"""
    parser = argparse.ArgumentParser(
        description="Analyze Helm charts for security issues using OpenRouter API",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=f"""
Available models:
{chr(10).join(f' - {key}: {value}' for key, value in HelmSecurityAnalyzer.AVAILABLE_MODELS.items())}

You can also specify a direct model path like 'anthropic/claude-3-opus-20240229'

Examples:
# Run 3 times with claude-3-haiku
%(prog)s ./helm-chart -n 3 -m claude-3-haiku

# Run 2 times each with 3 different models
%(prog)s ./helm-chart -n 2 -m claude-3-haiku gpt-4 mistral-large

# Run with direct model paths
%(prog)s ./helm-chart -n 1 -m anthropic/claude-3-opus-20240229 openai/gpt-4-turbo

# Use direct API access with environment variables
ANTHROPIC_API_KEY=your_key OPENAI_API_KEY=your_key %(prog)s ./helm-chart -n 2 -m claude-3-haiku gpt-4

# Use direct API access with command-line arguments
%(prog)s ./helm-chart -n 2 -m claude-3-haiku gpt-4 --anthropic-api-key=your_key --openai-api-key=your_key
"""
    )
    parser.add_argument(
        "helm_dir",
        type=Path,
        help="Path to Helm chart directory"
    )
    parser.add_argument(
        "-n", "--num-runs",
        type=int,
        default=1,
        help="Number of times to run the analysis per model (default: 1)"
    )
    parser.add_argument(
        "-o", "--output",
        type=Path,
        default=Path("security_findings.csv"),
        help="Output CSV file (default: security_findings.csv)"
    )
    parser.add_argument(
        "-k", "--api-key",
        type=str,
        help="OpenRouter API key (or set OPENROUTER_API_KEY env var)"
    )
    parser.add_argument(
        "--anthropic-api-key",
        type=str,
        help="Anthropic API key (or set ANTHROPIC_API_KEY env var)"
    )
    parser.add_argument(
        "--openai-api-key",
        type=str,
        help="OpenAI API key (or set OPENAI_API_KEY env var)"
    )
    parser.add_argument(
        "-m", "--models",
        type=str,
        nargs='+',
        default=["anthropic/claude-3-haiku"],
        help="Models to use (can specify multiple)"
    )
    parser.add_argument(
        "-sp", "--system-prompt",
        type=Path,
        default=Path("prompts/system_prompt.md"),
        help="System prompt file (default: prompts/system_prompt.md)"
    )
    parser.add_argument(
        "-up", "--user-prompt",
        type=Path,
        default=Path("prompts/user_prompt.md"),
        help="User prompt file (default: prompts/user_prompt.md)"
    )
    parser.add_argument(
        "--site-url",
        type=str,
        help="Your site URL for OpenRouter tracking"
    )
    parser.add_argument(
        "--app-name",
        type=str,
        help="Your app name for OpenRouter tracking"
    )
    parser.add_argument(
        "-d", "--delay",
        type=float,
        default=1.0,
        help="Delay between API calls in seconds (default: 1.0)"
    )
    parser.add_argument(
        "-v", "--verbose",
        action="store_true",
        help="Enable verbose logging"
    )
    parser.add_argument(
        "--list-models",
        action="store_true",
        help="List available models and exit"
    )

    args = parser.parse_args()

    # Handle list-models flag
    if args.list_models:
        print("Available models:")
        for key, value in HelmSecurityAnalyzer.AVAILABLE_MODELS.items():
            print(f" - {key}: {value}")
        return 0

    # Setup logging
    logging.basicConfig(
        level=logging.DEBUG if args.verbose else logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )

    # Get API key
    api_key = args.api_key or os.environ.get("OPENROUTER_API_KEY")
    if not api_key:
        parser.error("API key required: use --api-key or set OPENROUTER_API_KEY")

    # Check that prompt files exist
    if not args.system_prompt.exists():
        parser.error(f"System prompt file not found: {args.system_prompt}")
    if not args.user_prompt.exists():
        parser.error(f"User prompt file not found: {args.user_prompt}")

    # Create analyzer (model will be set dynamically)
    analyzer = HelmSecurityAnalyzer(
        api_key=api_key,
        model=args.models[0],  # Initialize with first model
        site_url=args.site_url,
        app_name=args.app_name,
        anthropic_api_key=args.anthropic_api_key,
        openai_api_key=args.openai_api_key
    )

    try:
        # Display run plan
        total_runs = len(args.models) * args.num_runs
        print(f"\nAnalysis Plan:")
        print(f" - Models: {len(args.models)}")
        for model in args.models:
            model_name = analyzer.AVAILABLE_MODELS.get(model, model)
            print(f"   • {model_name}")
        print(f" - Runs per model: {args.num_runs}")
        print(f" - Total runs: {total_runs}")
        print(f" - Estimated time: ~{total_runs * (args.delay + 10):.0f} seconds\n")

        findings = analyzer.run_experiments(
            helm_dir=args.helm_dir,
            models=args.models,
            num_runs=args.num_runs,
            output_file=args.output,
            system_prompt_file=args.system_prompt,
            user_prompt_file=args.user_prompt,
            delay_seconds=args.delay
        )

        print(f"\nAnalysis complete!")
        print(f" - Total security findings: {len(findings)}")
        print(f" - Models used: {len(args.models)}")
        print(f" - Total runs: {total_runs}")
        print(f"\nResults saved to:")
        print(f" - CSV: {args.output}")
        print(f" - Markdown: {args.output.with_suffix('.md')}")
        print(f"\nPrompts loaded from:")
        print(f" - System: {args.system_prompt}")
        print(f" - User: {args.user_prompt}")

    except Exception as e:
        logging.error(f"Fatal error: {e}")
        return 1

    return 0


if __name__ == "__main__":
    exit(main())
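As a quick illustration of the table parser above (not part of the repository), the following sketch feeds parse_response a single well-formed seven-column row; no API request is made:

from openrouter_threatmodeler import HelmSecurityAnalyzer

analyzer = HelmSecurityAnalyzer(api_key="dummy-key")  # hypothetical key; nothing is sent anywhere
sample = (
    "| Base Technique ID | Sub-Technique ID | Technique Name | Tactic | Misconfiguration | Risk | Mitigation |\n"
    "|---|---|---|---|---|---|---|\n"
    "| T1552 | T1552.001 | Unsecured Credentials | Credential Access | Plaintext DB password | High | Use Kubernetes Secrets |"
)
findings = analyzer.parse_response(sample, run_id=1, helm_file="values.yaml", model="example-model")
print(findings[0].mitre_attack_base_technique_id)  # -> T1552
print(findings[0].risk_priority)                   # -> High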
prompts/basic_system_prompt.md (Normal file)
@@ -0,0 +1 @@
You are a helpful assistant.
prompts/basic_user_prompt.md (Normal file)
@@ -0,0 +1 @@
You are analyzing a helm chart for security vulnerabilities and reporting those back in a tabular format.
prompts/system_prompt.md (Normal file)
@@ -0,0 +1,9 @@
You are a Kubernetes and cloud security expert specializing in analyzing Helm charts
for security vulnerabilities. You have deep knowledge of the MITRE ATT&CK framework and how it
applies to containerized environments. Your analysis should be thorough, accurate, and actionable.

When analyzing Helm charts, you should:
1. Carefully examine all configurations for security implications
2. Map findings to specific MITRE ATT&CK tactics and techniques
3. Provide clear risk assessments (High, Medium, Low)
4. Suggest concrete mitigation steps
prompts/user_prompt.md (Normal file)
@@ -0,0 +1,22 @@
**Context**: You are analyzing a **Helm chart** used for deploying WordPress on an **Amazon EKS cluster**. The deployment consists of Kubernetes resources such as **Deployments, Services, PVCs, and Secrets**.
**Objective**: Perform a **security analysis** of the Helm chart using **MITRE ATT&CK**. Identify potential threats based on misconfigurations and map them to **ATT&CK Tactics and Techniques**.
**Scope**:
* Focus on specific configurations in the Helm chart (values.yaml, Deployment, PVC, Secrets).
* Identify **realistic attack vectors** that could be exploited.
* Provide **actionable mitigations** for each identified risk.
**Structure**: Output the findings in a table with the following columns:
1. **MITRE ATT&CK Technique** (e.g., specific attack method T1234)
2. **MITRE ATT&CK Tactic** (e.g., Initial Access, Persistence, Privilege Escalation)
3. **Misconfiguration in Helm Chart** (e.g., unencrypted secrets, lack of network policies)
4. **Risk Priority** (High, Medium, Low)
5. **Recommended Mitigation Steps**
**Reasoning**: Use a **step-by-step chain-of-thought approach**:
1. **Identify relevant ATT&CK Tactics** that apply to Kubernetes environments.
2. **Map misconfigurations in the Helm chart** to specific ATT&CK techniques.
3. **Assess the risk level** and **propose effective mitigations** to reduce the attack surface.

Format your response as a structured table with columns in this order:

| MITRE ATT&CK Base Technique ID | MITRE ATT&CK Sub-Technique ID | MITRE ATT&CK Technique Name | MITRE ATT&CK Tactic | Misconfiguration in Helm Chart | Risk Priority | Recommended Mitigation Steps |
|---------------------|---------------------|---------------------|------------------------|--------------------------------|---------------|------------------------------|
| T#### | T####.### (if applicable) | ... | ... | ... | ... | ... |
wordpress-helm-chart/Chart.yaml (Normal file)
@@ -0,0 +1,6 @@
apiVersion: v2
name: wordpress-tm-example
description: A Helm chart for WordPress for the purposes of Threat Modeling against a constant system.
type: application
version: 0.1.0
appVersion: "6.0"
wordpress-helm-chart/templates/deployment.yaml (Normal file)
@@ -0,0 +1,30 @@
apiVersion: apps/v1
kind: Deployment
metadata:
  name: wordpress
spec:
  replicas: {{ .Values.wordpress.replicas }}
  selector:
    matchLabels:
      app: wordpress
  template:
    metadata:
      labels:
        app: wordpress
    spec:
      containers:
        - name: wordpress
          image: {{ .Values.wordpress.image }}
          ports:
            - containerPort: {{ .Values.wordpress.containerPort }}
          env:
            - name: WORDPRESS_DB_HOST
              value: "{{ .Values.wordpress.env.WORDPRESS_DB_HOST }}"
            - name: WORDPRESS_DB_USER
              value: "{{ .Values.wordpress.env.WORDPRESS_DB_USER }}"
            - name: WORDPRESS_DB_PASSWORD
              value: "{{ .Values.wordpress.env.WORDPRESS_DB_PASSWORD }}"
            - name: WORDPRESS_DB_NAME
              value: "{{ .Values.wordpress.env.WORDPRESS_DB_NAME }}"
          securityContext:
            runAsUser: 0  # Running as root (insecure)
wordpress-helm-chart/templates/ingress.yaml (Normal file)
@@ -0,0 +1,16 @@
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: wordpress-ingress
spec:
  rules:
    - host: {{ .Values.ingress.hosts[0].host }}
      http:
        paths:
          - path: /
            pathType: ImplementationSpecific
            backend:
              service:
                name: wordpress
                port:
                  number: 80
wordpress-helm-chart/templates/mysql-deployment.yaml (Normal file)
@@ -0,0 +1,28 @@
apiVersion: apps/v1
kind: Deployment
metadata:
  name: mysql
spec:
  replicas: 1
  selector:
    matchLabels:
      app: mysql
  template:
    metadata:
      labels:
        app: mysql
    spec:
      containers:
        - name: mysql
          image: {{ .Values.mysql.image }}
          ports:
            - containerPort: {{ .Values.mysql.containerPort }}
          env:
            - name: MYSQL_ROOT_PASSWORD
              value: "{{ .Values.mysql.rootPassword }}"
          args:
            - "--default-authentication-plugin=mysql_native_password"
            - "--skip-secure-auth"
            - "--allow-root-remote={{ .Values.mysql.allowRootRemote }}"  # Insecure
          securityContext:
            runAsUser: 0  # Running as root (insecure)
wordpress-helm-chart/templates/mysql-service.yaml (Normal file)
@@ -0,0 +1,11 @@
apiVersion: v1
kind: Service
metadata:
  name: mysql
spec:
  type: {{ .Values.mysqlService.type }}
  ports:
    - port: {{ .Values.mysqlService.port }}
      targetPort: {{ .Values.mysql.containerPort }}
  selector:
    app: mysql
wordpress-helm-chart/templates/service.yaml (Normal file)
@@ -0,0 +1,11 @@
apiVersion: v1
kind: Service
metadata:
  name: wordpress
spec:
  type: {{ .Values.service.type }}
  ports:
    - port: {{ .Values.service.port }}
      targetPort: {{ .Values.wordpress.containerPort }}
  selector:
    app: wordpress
wordpress-helm-chart/values.yaml (Normal file)
@@ -0,0 +1,46 @@
wordpress:
  image: wordpress:latest
  replicas: 1
  containerPort: 80
  env:
    WORDPRESS_DB_HOST: mysql
    WORDPRESS_DB_USER: root
    WORDPRESS_DB_PASSWORD: root
    WORDPRESS_DB_NAME: wordpress

mysql:
  image: mysql:5.7
  containerPort: 3306
  rootPassword: root
  allowRootRemote: true  # Allows remote root login (insecure)
  persistence:
    enabled: false  # No persistence, making it prone to data loss

service:
  type: LoadBalancer  # Exposes MySQL publicly (insecure)
  port: 80

mysqlService:
  type: LoadBalancer
  port: 3306

ingress:
  enabled: true
  annotations: {}
  hosts:
    - host: wordpress.local
      paths:
        - path: /
          pathType: ImplementationSpecific

resources:
  limits: {}
  requests: {}

securityContext:
  enabled: false  # No security hardening
  runAsUser: 0  # Runs as root (insecure)
  runAsGroup: 0

networkPolicy:
  enabled: false  # No network restrictions