Files
fuzzforge_ai/backend/toolbox/modules/analyzer/llm_analyzer.py
tduhamel42 fccd8f32ab refactor: Update all modules to use new create_finding signature
Updated 10 modules to use the new create_finding() signature with required rule_id and found_by parameters:

- llm_analyzer.py: Added FoundBy and LLMContext for AI-detected findings
- bandit_analyzer.py: Added tool attribution and moved CWE/confidence to proper fields
- security_analyzer.py: Updated all three finding types (secrets, SQL injection, dangerous functions)
- mypy_analyzer.py: Added tool attribution and moved column info to column_start
- mobsf_scanner.py: Updated all 6 finding types (permissions, manifest, code analysis, behavior) with proper line number handling
- opengrep_android.py: Added tool attribution, proper CWE/OWASP formatting, and confidence mapping
- dependency_scanner.py: Added pip-audit attribution for CVE findings
- file_scanner.py: Updated both sensitive file and enumeration findings
- cargo_fuzzer.py: Added fuzzer type attribution for crash findings
- atheris_fuzzer.py: Added fuzzer type attribution for Python crash findings

All modules now properly track:
- Finding source (module, tool name, version, type)
- Confidence levels (high/medium/low)
- CWE and OWASP mappings where applicable
- LLM context for AI-detected issues
2025-11-14 10:51:38 +01:00

395 lines
13 KiB
Python

"""
LLM Analyzer Module - Uses AI to analyze code for security issues
"""
# Copyright (c) 2025 FuzzingLabs
#
# Licensed under the Business Source License 1.1 (BSL). See the LICENSE file
# at the root of this repository for details.
#
# After the Change Date (four years from publication), this version of the
# Licensed Work will be made available under the Apache License, Version 2.0.
# See the LICENSE-APACHE file or http://www.apache.org/licenses/LICENSE-2.0
#
# Additional attribution and requirements are provided in the NOTICE file.
import logging
from pathlib import Path
from typing import Dict, Any, List
try:
from toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult, FoundBy, LLMContext
except ImportError:
try:
from modules.base import BaseModule, ModuleMetadata, ModuleResult, FoundBy, LLMContext
except ImportError:
from src.toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult, FoundBy, LLMContext
logger = logging.getLogger(__name__)
class LLMAnalyzer(BaseModule):
    """
    Uses an LLM to analyze code for potential security issues.

    This module:
    - Walks the workspace for files matching the configured patterns
    - Sends each file's content to an LLM agent via the A2A protocol
    - Parses the LLM's structured text response into findings
    """

    def get_metadata(self) -> ModuleMetadata:
        """Get module metadata, including the config input schema and output schema."""
        return ModuleMetadata(
            name="llm_analyzer",
            version="1.0.0",
            description="Uses AI to analyze code for security issues",
            author="FuzzForge Team",
            category="analyzer",
            tags=["llm", "ai", "security", "analysis"],
            input_schema={
                "agent_url": {
                    "type": "string",
                    "description": "A2A agent endpoint URL",
                    "default": "http://fuzzforge-task-agent:8000/a2a/litellm_agent"
                },
                "llm_model": {
                    "type": "string",
                    "description": "LLM model to use",
                    "default": "gpt-4o-mini"
                },
                "llm_provider": {
                    "type": "string",
                    "description": "LLM provider (openai, anthropic, etc.)",
                    "default": "openai"
                },
                "file_patterns": {
                    "type": "array",
                    "items": {"type": "string"},
                    "description": "File patterns to analyze",
                    "default": ["*.py", "*.js", "*.ts", "*.java", "*.go"]
                },
                "max_files": {
                    "type": "integer",
                    "description": "Maximum number of files to analyze",
                    "default": 5
                },
                "max_file_size": {
                    "type": "integer",
                    "description": "Maximum file size in bytes",
                    "default": 50000  # 50KB
                },
                "timeout": {
                    "type": "integer",
                    "description": "Timeout per file in seconds",
                    "default": 60
                }
            },
            output_schema={
                "findings": {
                    "type": "array",
                    "description": "Security issues identified by LLM"
                }
            },
            requires_workspace=True
        )

    def validate_config(self, config: Dict[str, Any]) -> bool:
        """
        Validate module configuration.

        Returns:
            True if the configuration is valid.

        Raises:
            RuntimeError: If the A2A wrapper cannot be imported.
            ValueError: If agent_url or max_files is invalid.
        """
        # Lazy import to avoid Temporal sandbox restrictions
        try:
            from fuzzforge_ai.a2a_wrapper import send_agent_task  # noqa: F401
        except ImportError:
            raise RuntimeError(
                "A2A wrapper not available. Ensure fuzzforge_ai module is accessible."
            )
        agent_url = config.get("agent_url")
        if not agent_url or not isinstance(agent_url, str):
            raise ValueError("agent_url must be a valid URL string")
        max_files = config.get("max_files", 5)
        if not isinstance(max_files, int) or max_files <= 0:
            raise ValueError("max_files must be a positive integer")
        return True

    async def execute(self, config: Dict[str, Any], workspace: Path) -> ModuleResult:
        """
        Execute the LLM analysis module.

        Args:
            config: Module configuration (see get_metadata for the schema)
            workspace: Path to the workspace containing code to analyze

        Returns:
            ModuleResult with findings from LLM analysis
        """
        # Start execution timer
        self.start_timer()
        logger.info(f"Starting LLM analysis in workspace: {workspace}")

        # Extract configuration (defaults mirror the input_schema in get_metadata)
        agent_url = config.get("agent_url", "http://fuzzforge-task-agent:8000/a2a/litellm_agent")
        llm_model = config.get("llm_model", "gpt-4o-mini")
        llm_provider = config.get("llm_provider", "openai")
        file_patterns = config.get("file_patterns", ["*.py", "*.js", "*.ts", "*.java", "*.go"])
        max_files = config.get("max_files", 5)
        max_file_size = config.get("max_file_size", 50000)
        timeout = config.get("timeout", 60)

        files_to_analyze = self._collect_files(
            workspace, file_patterns, max_files, max_file_size
        )
        logger.info(f"Found {len(files_to_analyze)} files to analyze")

        # Analyze each file; a failure on one file must not abort the run
        all_findings = []
        for file_path in files_to_analyze:
            logger.info(f"Analyzing: {file_path.relative_to(workspace)}")
            try:
                findings = await self._analyze_file(
                    file_path=file_path,
                    workspace=workspace,
                    agent_url=agent_url,
                    llm_model=llm_model,
                    llm_provider=llm_provider,
                    timeout=timeout
                )
                all_findings.extend(findings)
            except Exception as e:
                logger.error(f"Error analyzing {file_path}: {e}")
                # Continue with next file
                continue
        logger.info(f"LLM analysis complete. Found {len(all_findings)} issues.")

        # Create result using base module helper
        return self.create_result(
            findings=all_findings,
            status="success",
            summary={
                "files_analyzed": len(files_to_analyze),
                "total_findings": len(all_findings),
                "agent_url": agent_url,
                "model": f"{llm_provider}/{llm_model}"
            }
        )

    def _collect_files(
        self,
        workspace: Path,
        file_patterns: List[str],
        max_files: int,
        max_file_size: int
    ) -> List[Path]:
        """
        Find up to max_files unique files under workspace matching the patterns,
        skipping files larger than max_file_size bytes.
        """
        selected: List[Path] = []
        seen = set()
        for pattern in file_patterns:
            for file_path in workspace.rglob(pattern):
                # Overlapping patterns could yield the same file twice;
                # analyze each file at most once.
                if not file_path.is_file() or file_path in seen:
                    continue
                try:
                    # Check file size
                    if file_path.stat().st_size > max_file_size:
                        logger.debug(f"Skipping {file_path} (too large)")
                        continue
                except Exception as e:
                    logger.warning(f"Error checking file {file_path}: {e}")
                    continue
                seen.add(file_path)
                selected.append(file_path)
                if len(selected) >= max_files:
                    return selected
        return selected

    async def _analyze_file(
        self,
        file_path: Path,
        workspace: Path,
        agent_url: str,
        llm_model: str,
        llm_provider: str,
        timeout: int
    ) -> List[Dict[str, Any]]:
        """
        Analyze a single file with the LLM.

        Returns an empty list if the file cannot be read or the A2A call fails;
        otherwise the findings parsed from the LLM response.
        """
        # Read file content; unreadable files (e.g. non-UTF-8) are skipped
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                code_content = f.read()
        except Exception as e:
            logger.error(f"Failed to read {file_path}: {e}")
            return []

        # Determine language from extension (used for the prompt and code fence)
        extension = file_path.suffix.lower()
        language_map = {
            ".py": "python",
            ".js": "javascript",
            ".ts": "typescript",
            ".java": "java",
            ".go": "go",
            ".rs": "rust",
            ".c": "c",
            ".cpp": "cpp",
        }
        language = language_map.get(extension, "code")

        # Build prompt for LLM; the response format is parsed by _parse_llm_response
        system_prompt = (
            "You are a security code analyzer. Analyze the provided code and identify "
            "potential security vulnerabilities, bugs, and code quality issues. "
            "For each issue found, respond in this exact format:\n"
            "ISSUE: [short title]\n"
            "SEVERITY: [error/warning/note]\n"
            "LINE: [line number or 'unknown']\n"
            "DESCRIPTION: [detailed explanation]\n\n"
            "If no issues are found, respond with 'NO_ISSUES_FOUND'."
        )
        user_message = (
            f"Analyze this {language} code for security vulnerabilities:\n\n"
            f"File: {file_path.relative_to(workspace)}\n\n"
            f"```{language}\n{code_content}\n```"
        )

        # Call LLM via A2A wrapper (lazy import to avoid Temporal sandbox restrictions)
        try:
            from fuzzforge_ai.a2a_wrapper import send_agent_task
            result = await send_agent_task(
                url=agent_url,
                model=llm_model,
                provider=llm_provider,
                prompt=system_prompt,
                message=user_message,
                context=f"llm_analysis_{file_path.stem}",
                timeout=float(timeout)
            )
            llm_response = result.text
        except Exception as e:
            logger.error(f"A2A call failed for {file_path}: {e}")
            return []

        # Parse LLM response into findings; the full prompt is stored for attribution
        full_prompt = f"{system_prompt}\n\n{user_message}"
        findings = self._parse_llm_response(
            llm_response=llm_response,
            file_path=file_path,
            workspace=workspace,
            llm_model=llm_model,
            llm_provider=llm_provider,
            prompt=full_prompt
        )
        return findings

    def _parse_llm_response(
        self,
        llm_response: str,
        file_path: Path,
        workspace: Path,
        llm_model: str,
        llm_provider: str,
        prompt: str
    ) -> List:
        """
        Parse the LLM's ISSUE/SEVERITY/LINE/DESCRIPTION response into
        structured findings. Returns an empty list for 'NO_ISSUES_FOUND'.
        """
        if "NO_ISSUES_FOUND" in llm_response:
            return []
        findings = []
        relative_path = str(file_path.relative_to(workspace))

        # Simple line-oriented parser for the format requested in the prompt.
        # Prefix slicing (not str.replace) so a tag appearing again inside the
        # text is preserved; only issues that actually started with an "ISSUE:"
        # header are emitted, so stray SEVERITY/LINE/DESCRIPTION lines before
        # the first ISSUE cannot produce a spurious untitled finding.
        current_issue: Dict[str, Any] = {}
        for raw_line in llm_response.split('\n'):
            line = raw_line.strip()
            if line.startswith("ISSUE:"):
                # Save previous issue if it had a title
                if current_issue.get("title"):
                    findings.append(self._create_module_finding(
                        current_issue, relative_path, llm_model, llm_provider, prompt
                    ))
                current_issue = {"title": line[len("ISSUE:"):].strip()}
            elif line.startswith("SEVERITY:"):
                current_issue["severity"] = line[len("SEVERITY:"):].strip().lower()
            elif line.startswith("LINE:"):
                line_num = line[len("LINE:"):].strip()
                try:
                    current_issue["line"] = int(line_num)
                except ValueError:
                    # The LLM may answer 'unknown' or other non-numeric text
                    current_issue["line"] = None
            elif line.startswith("DESCRIPTION:"):
                current_issue["description"] = line[len("DESCRIPTION:"):].strip()
        # Save last issue
        if current_issue.get("title"):
            findings.append(self._create_module_finding(
                current_issue, relative_path, llm_model, llm_provider, prompt
            ))
        return findings

    def _create_module_finding(
        self,
        issue: Dict[str, Any],
        file_path: str,
        llm_model: str,
        llm_provider: str,
        prompt: str
    ):
        """
        Create a ModuleFinding from a parsed issue dict, attributing it to the
        LLM (FoundBy) and attaching the prompt used (LLMContext).
        """
        # Map the LLM's error/warning/note severities onto module severities
        severity_map = {
            "error": "critical",
            "warning": "medium",
            "note": "low",
            "info": "low"
        }
        # Determine confidence based on severity (LLM is more confident on critical issues)
        confidence_map = {
            "error": "high",
            "warning": "medium",
            "note": "medium",
            "info": "low"
        }
        # Create FoundBy attribution
        found_by = FoundBy(
            module="llm_analyzer",
            tool_name=f"{llm_provider}/{llm_model}",
            tool_version="1.0.0",
            type="llm"
        )
        # Create LLM context
        llm_context = LLMContext(
            model=llm_model,
            prompt=prompt,
            temperature=None  # Not exposed in current config
        )
        # Use base class helper to create proper ModuleFinding
        return self.create_finding(
            rule_id=f"llm_security_{issue.get('severity', 'warning')}",
            title=issue.get("title", "Security issue detected"),
            description=issue.get("description", ""),
            severity=severity_map.get(issue.get("severity", "warning"), "medium"),
            category="security",
            found_by=found_by,
            confidence=confidence_map.get(issue.get("severity", "warning"), "medium"),
            llm_context=llm_context,
            file_path=file_path,
            line_start=issue.get("line"),
            metadata={
                "tool": "llm-analyzer",
                "type": "llm-security-analysis"
            }
        )