mirror of
https://github.com/CyberSecurityUP/NeuroSploit.git
synced 2026-02-12 22:12:45 +00:00
451 lines
15 KiB
Python
451 lines
15 KiB
Python
"""
|
|
NeuroSploit v3 - Prompt Parser
|
|
|
|
Parses user prompts to extract:
|
|
1. Vulnerability types to test
|
|
2. Testing scope and depth
|
|
3. Special instructions
|
|
4. Output format preferences
|
|
|
|
This enables dynamic, prompt-driven testing instead of hardcoded vulnerability types.
|
|
"""
|
|
import re
|
|
from typing import List, Dict, Optional, Tuple
|
|
from backend.schemas.prompt import (
|
|
PromptParseResult,
|
|
VulnerabilityTypeExtracted,
|
|
TestingScope
|
|
)
|
|
|
|
|
|
class PromptParser:
|
|
"""
|
|
Parses penetration testing prompts to extract structured testing instructions.
|
|
|
|
Instead of requiring specific LLM calls for every parse, this uses pattern matching
|
|
and keyword analysis for fast, deterministic extraction.
|
|
"""
|
|
|
|
# Vulnerability keyword mappings
|
|
VULNERABILITY_KEYWORDS = {
|
|
# XSS variants
|
|
"xss_reflected": [
|
|
"xss", "cross-site scripting", "reflected xss", "reflected cross-site",
|
|
"script injection", "html injection"
|
|
],
|
|
"xss_stored": [
|
|
"stored xss", "persistent xss", "stored cross-site", "persistent cross-site"
|
|
],
|
|
"xss_dom": [
|
|
"dom xss", "dom-based xss", "dom based", "client-side xss"
|
|
],
|
|
|
|
# SQL Injection variants
|
|
"sqli_error": [
|
|
"sql injection", "sqli", "sql error", "error-based sql"
|
|
],
|
|
"sqli_union": [
|
|
"union sql", "union injection", "union-based", "union based"
|
|
],
|
|
"sqli_blind": [
|
|
"blind sql", "blind injection", "boolean sql", "boolean-based"
|
|
],
|
|
"sqli_time": [
|
|
"time-based sql", "time based sql", "time-based injection"
|
|
],
|
|
|
|
# Other injections
|
|
"nosql_injection": [
|
|
"nosql", "mongodb injection", "nosql injection"
|
|
],
|
|
"command_injection": [
|
|
"command injection", "os command", "shell injection", "rce",
|
|
"remote code execution", "code execution"
|
|
],
|
|
"ssti": [
|
|
"ssti", "template injection", "server-side template", "jinja injection",
|
|
"twig injection"
|
|
],
|
|
"ldap_injection": [
|
|
"ldap injection", "ldap"
|
|
],
|
|
"xpath_injection": [
|
|
"xpath injection", "xpath"
|
|
],
|
|
"header_injection": [
|
|
"header injection", "http header"
|
|
],
|
|
"crlf_injection": [
|
|
"crlf", "carriage return", "header splitting"
|
|
],
|
|
|
|
# File access
|
|
"lfi": [
|
|
"lfi", "local file inclusion", "file inclusion", "path traversal",
|
|
"directory traversal", "../"
|
|
],
|
|
"rfi": [
|
|
"rfi", "remote file inclusion"
|
|
],
|
|
"path_traversal": [
|
|
"path traversal", "directory traversal", "dot dot slash"
|
|
],
|
|
"file_upload": [
|
|
"file upload", "upload vulnerability", "unrestricted upload",
|
|
"malicious upload"
|
|
],
|
|
"xxe": [
|
|
"xxe", "xml external entity", "xml injection"
|
|
],
|
|
|
|
# Request forgery
|
|
"ssrf": [
|
|
"ssrf", "server-side request forgery", "server side request",
|
|
"internal request"
|
|
],
|
|
"ssrf_cloud": [
|
|
"cloud metadata", "169.254.169.254", "metadata service", "aws metadata",
|
|
"gcp metadata"
|
|
],
|
|
"csrf": [
|
|
"csrf", "cross-site request forgery", "xsrf"
|
|
],
|
|
|
|
# Authentication
|
|
"auth_bypass": [
|
|
"authentication bypass", "auth bypass", "login bypass", "broken auth"
|
|
],
|
|
"session_fixation": [
|
|
"session fixation", "session hijacking"
|
|
],
|
|
"jwt_manipulation": [
|
|
"jwt", "json web token", "token manipulation", "jwt bypass"
|
|
],
|
|
"weak_password": [
|
|
"weak password", "password policy", "credential"
|
|
],
|
|
"brute_force": [
|
|
"brute force", "credential stuffing", "password spray"
|
|
],
|
|
|
|
# Authorization
|
|
"idor": [
|
|
"idor", "insecure direct object", "direct object reference"
|
|
],
|
|
"bola": [
|
|
"bola", "broken object level", "api authorization"
|
|
],
|
|
"privilege_escalation": [
|
|
"privilege escalation", "privesc", "priv esc", "elevation"
|
|
],
|
|
|
|
# API Security
|
|
"rate_limiting": [
|
|
"rate limit", "rate limiting", "throttling"
|
|
],
|
|
"mass_assignment": [
|
|
"mass assignment", "parameter pollution"
|
|
],
|
|
"excessive_data": [
|
|
"excessive data", "data exposure", "over-fetching"
|
|
],
|
|
"graphql_introspection": [
|
|
"graphql introspection", "graphql schema"
|
|
],
|
|
"graphql_injection": [
|
|
"graphql injection", "graphql attack"
|
|
],
|
|
|
|
# Client-side
|
|
"cors_misconfig": [
|
|
"cors", "cross-origin", "cors misconfiguration"
|
|
],
|
|
"clickjacking": [
|
|
"clickjacking", "click jacking", "ui redressing", "x-frame-options"
|
|
],
|
|
"open_redirect": [
|
|
"open redirect", "url redirect", "redirect vulnerability"
|
|
],
|
|
|
|
# Information disclosure
|
|
"error_disclosure": [
|
|
"error message", "stack trace", "debug information"
|
|
],
|
|
"sensitive_data": [
|
|
"sensitive data", "pii exposure", "data leak"
|
|
],
|
|
"debug_endpoints": [
|
|
"debug endpoint", "admin panel", "hidden endpoint"
|
|
],
|
|
|
|
# Infrastructure
|
|
"security_headers": [
|
|
"security headers", "http headers", "csp", "content-security-policy",
|
|
"hsts", "x-content-type"
|
|
],
|
|
"ssl_issues": [
|
|
"ssl", "tls", "certificate", "https"
|
|
],
|
|
"http_methods": [
|
|
"http methods", "options method", "trace method", "put method"
|
|
],
|
|
|
|
# Logic flaws
|
|
"race_condition": [
|
|
"race condition", "toctou", "time of check"
|
|
],
|
|
"business_logic": [
|
|
"business logic", "logic flaw", "workflow"
|
|
]
|
|
}
|
|
|
|
# Category mappings
|
|
VULNERABILITY_CATEGORIES = {
|
|
"injection": [
|
|
"xss_reflected", "xss_stored", "xss_dom", "sqli_error", "sqli_union",
|
|
"sqli_blind", "sqli_time", "nosql_injection", "command_injection",
|
|
"ssti", "ldap_injection", "xpath_injection", "header_injection", "crlf_injection"
|
|
],
|
|
"file_access": ["lfi", "rfi", "path_traversal", "file_upload", "xxe"],
|
|
"request_forgery": ["ssrf", "ssrf_cloud", "csrf"],
|
|
"authentication": [
|
|
"auth_bypass", "session_fixation", "jwt_manipulation",
|
|
"weak_password", "brute_force"
|
|
],
|
|
"authorization": ["idor", "bola", "privilege_escalation"],
|
|
"api_security": [
|
|
"rate_limiting", "mass_assignment", "excessive_data",
|
|
"graphql_introspection", "graphql_injection"
|
|
],
|
|
"client_side": ["cors_misconfig", "clickjacking", "open_redirect"],
|
|
"information_disclosure": ["error_disclosure", "sensitive_data", "debug_endpoints"],
|
|
"infrastructure": ["security_headers", "ssl_issues", "http_methods"],
|
|
"logic_flaws": ["race_condition", "business_logic"]
|
|
}
|
|
|
|
# Depth keywords
|
|
DEPTH_KEYWORDS = {
|
|
"quick": ["quick", "fast", "basic", "simple", "light"],
|
|
"standard": ["standard", "normal", "default"],
|
|
"thorough": ["thorough", "comprehensive", "complete", "full", "deep"],
|
|
"exhaustive": ["exhaustive", "extensive", "all", "everything", "maximum"]
|
|
}
|
|
|
|
def __init__(self):
|
|
# Compile regex patterns for efficiency
|
|
self._compile_patterns()
|
|
|
|
def _compile_patterns(self):
|
|
"""Compile regex patterns for keyword matching"""
|
|
self.vuln_patterns = {}
|
|
for vuln_type, keywords in self.VULNERABILITY_KEYWORDS.items():
|
|
pattern = r'\b(' + '|'.join(re.escape(kw) for kw in keywords) + r')\b'
|
|
self.vuln_patterns[vuln_type] = re.compile(pattern, re.IGNORECASE)
|
|
|
|
async def parse(self, prompt: str) -> PromptParseResult:
|
|
"""
|
|
Parse a prompt to extract testing instructions.
|
|
|
|
Args:
|
|
prompt: User's penetration testing prompt
|
|
|
|
Returns:
|
|
PromptParseResult with extracted vulnerabilities and scope
|
|
"""
|
|
prompt_lower = prompt.lower()
|
|
|
|
# Extract vulnerability types
|
|
vulnerabilities = self._extract_vulnerabilities(prompt, prompt_lower)
|
|
|
|
# If no specific vulnerabilities mentioned but comprehensive keywords found,
|
|
# add all vulnerabilities
|
|
if not vulnerabilities:
|
|
if any(kw in prompt_lower for kw in ["all vulnerabilities", "comprehensive", "full pentest", "everything"]):
|
|
vulnerabilities = self._get_all_vulnerabilities(prompt)
|
|
|
|
# Extract testing scope
|
|
scope = self._extract_scope(prompt_lower)
|
|
|
|
# Extract special instructions
|
|
special_instructions = self._extract_special_instructions(prompt)
|
|
|
|
# Extract target filters
|
|
target_filters = self._extract_target_filters(prompt)
|
|
|
|
# Extract output preferences
|
|
output_preferences = self._extract_output_preferences(prompt_lower)
|
|
|
|
return PromptParseResult(
|
|
vulnerabilities_to_test=vulnerabilities,
|
|
testing_scope=scope,
|
|
special_instructions=special_instructions,
|
|
target_filters=target_filters,
|
|
output_preferences=output_preferences
|
|
)
|
|
|
|
def _extract_vulnerabilities(self, prompt: str, prompt_lower: str) -> List[VulnerabilityTypeExtracted]:
|
|
"""Extract vulnerability types from prompt"""
|
|
vulnerabilities = []
|
|
found_types = set()
|
|
|
|
for vuln_type, pattern in self.vuln_patterns.items():
|
|
matches = pattern.findall(prompt_lower)
|
|
if matches:
|
|
# Calculate confidence based on number of matches and context
|
|
confidence = min(0.9, 0.5 + len(matches) * 0.1)
|
|
|
|
# Get category
|
|
category = self._get_category(vuln_type)
|
|
|
|
# Extract context (surrounding text)
|
|
context = self._extract_context(prompt, matches[0])
|
|
|
|
if vuln_type not in found_types:
|
|
found_types.add(vuln_type)
|
|
vulnerabilities.append(VulnerabilityTypeExtracted(
|
|
type=vuln_type,
|
|
category=category,
|
|
confidence=confidence,
|
|
context=context
|
|
))
|
|
|
|
return vulnerabilities
|
|
|
|
def _get_all_vulnerabilities(self, prompt: str) -> List[VulnerabilityTypeExtracted]:
|
|
"""Get all vulnerability types for comprehensive testing"""
|
|
vulnerabilities = []
|
|
for vuln_type in self.VULNERABILITY_KEYWORDS.keys():
|
|
category = self._get_category(vuln_type)
|
|
vulnerabilities.append(VulnerabilityTypeExtracted(
|
|
type=vuln_type,
|
|
category=category,
|
|
confidence=0.7,
|
|
context="Comprehensive testing requested"
|
|
))
|
|
return vulnerabilities
|
|
|
|
def _get_category(self, vuln_type: str) -> str:
|
|
"""Get category for a vulnerability type"""
|
|
for category, types in self.VULNERABILITY_CATEGORIES.items():
|
|
if vuln_type in types:
|
|
return category
|
|
return "other"
|
|
|
|
def _extract_context(self, prompt: str, keyword: str, window: int = 50) -> str:
|
|
"""Extract context around a keyword"""
|
|
idx = prompt.lower().find(keyword.lower())
|
|
if idx == -1:
|
|
return ""
|
|
start = max(0, idx - window)
|
|
end = min(len(prompt), idx + len(keyword) + window)
|
|
return prompt[start:end].strip()
|
|
|
|
def _extract_scope(self, prompt_lower: str) -> TestingScope:
|
|
"""Extract testing scope from prompt"""
|
|
# Determine depth
|
|
depth = "standard"
|
|
for level, keywords in self.DEPTH_KEYWORDS.items():
|
|
if any(kw in prompt_lower for kw in keywords):
|
|
depth = level
|
|
break
|
|
|
|
# Check for recon
|
|
include_recon = not any(
|
|
kw in prompt_lower for kw in ["no recon", "skip recon", "without recon"]
|
|
)
|
|
|
|
# Extract time limits
|
|
time_limit = None
|
|
time_match = re.search(r'(\d+)\s*(minute|min|hour|hr)', prompt_lower)
|
|
if time_match:
|
|
value = int(time_match.group(1))
|
|
unit = time_match.group(2)
|
|
if 'hour' in unit or 'hr' in unit:
|
|
time_limit = value * 60
|
|
else:
|
|
time_limit = value
|
|
|
|
# Extract request limits
|
|
max_requests = None
|
|
req_match = re.search(r'(\d+)\s*(request|req)', prompt_lower)
|
|
if req_match:
|
|
max_requests = int(req_match.group(1))
|
|
|
|
return TestingScope(
|
|
include_recon=include_recon,
|
|
depth=depth,
|
|
max_requests_per_endpoint=max_requests,
|
|
time_limit_minutes=time_limit
|
|
)
|
|
|
|
def _extract_special_instructions(self, prompt: str) -> List[str]:
|
|
"""Extract special instructions from prompt"""
|
|
instructions = []
|
|
|
|
# Look for explicit instructions
|
|
instruction_patterns = [
|
|
r'focus on[:\s]+([^.]+)',
|
|
r'prioritize[:\s]+([^.]+)',
|
|
r'especially[:\s]+([^.]+)',
|
|
r'important[:\s]+([^.]+)',
|
|
r'make sure to[:\s]+([^.]+)',
|
|
r'don\'t forget to[:\s]+([^.]+)'
|
|
]
|
|
|
|
for pattern in instruction_patterns:
|
|
matches = re.findall(pattern, prompt, re.IGNORECASE)
|
|
instructions.extend(matches)
|
|
|
|
return instructions
|
|
|
|
def _extract_target_filters(self, prompt: str) -> Dict:
|
|
"""Extract target filtering preferences"""
|
|
filters = {
|
|
"include_patterns": [],
|
|
"exclude_patterns": [],
|
|
"focus_on_parameters": []
|
|
}
|
|
|
|
# Look for include patterns
|
|
include_match = re.findall(r'only\s+test\s+([^.]+)', prompt, re.IGNORECASE)
|
|
if include_match:
|
|
filters["include_patterns"].extend(include_match)
|
|
|
|
# Look for exclude patterns
|
|
exclude_match = re.findall(r'(?:skip|exclude|ignore)\s+([^.]+)', prompt, re.IGNORECASE)
|
|
if exclude_match:
|
|
filters["exclude_patterns"].extend(exclude_match)
|
|
|
|
# Look for parameter focus
|
|
param_match = re.findall(r'parameter[s]?\s+(?:like|named|called)\s+(\w+)', prompt, re.IGNORECASE)
|
|
if param_match:
|
|
filters["focus_on_parameters"].extend(param_match)
|
|
|
|
return filters
|
|
|
|
def _extract_output_preferences(self, prompt_lower: str) -> Dict:
|
|
"""Extract output and reporting preferences"""
|
|
preferences = {
|
|
"severity_threshold": "all",
|
|
"include_poc": True,
|
|
"include_remediation": True
|
|
}
|
|
|
|
# Severity threshold
|
|
if "critical only" in prompt_lower or "only critical" in prompt_lower:
|
|
preferences["severity_threshold"] = "critical"
|
|
elif "high and above" in prompt_lower or "high severity" in prompt_lower:
|
|
preferences["severity_threshold"] = "high"
|
|
elif "medium and above" in prompt_lower:
|
|
preferences["severity_threshold"] = "medium"
|
|
|
|
# PoC preference
|
|
if "no poc" in prompt_lower or "without poc" in prompt_lower:
|
|
preferences["include_poc"] = False
|
|
|
|
# Remediation preference
|
|
if "no remediation" in prompt_lower or "without remediation" in prompt_lower:
|
|
preferences["include_remediation"] = False
|
|
|
|
return preferences
|