fuzzforge_ai/backend/toolbox/modules/analyzer/llm_analyzer.py
tduhamel42 4d30b08476 feat: Add LLM analysis workflow and ruff linter fixes
LLM Analysis Workflow:
- Add llm_analyzer module for AI-powered code security analysis
- Add llm_analysis workflow with SARIF output support
- Mount AI module in Python worker for A2A wrapper access
- Add a2a-sdk dependency to Python worker requirements
- Fix workflow parameter ordering in Temporal manager

Ruff Linter Fixes:
- Fix bare except clauses (E722) across AI and CLI modules
- Add noqa comments for intentional late imports (E402)
- Replace undefined get_ai_status_async with TODO placeholder
- Remove unused imports and variables
- Remove container diagnostics display from exception handler

MCP Configuration:
- Reactivate FUZZFORGE_MCP_URL with default value
- Set default MCP URL to http://localhost:8010/mcp in init
2025-10-14 16:43:14 +02:00

350 lines
12 KiB
Python

"""
LLM Analyzer Module - Uses AI to analyze code for security issues
"""
# Copyright (c) 2025 FuzzingLabs
#
# Licensed under the Business Source License 1.1 (BSL). See the LICENSE file
# at the root of this repository for details.
#
# After the Change Date (four years from publication), this version of the
# Licensed Work will be made available under the Apache License, Version 2.0.
# See the LICENSE-APACHE file or http://www.apache.org/licenses/LICENSE-2.0
#
# Additional attribution and requirements are provided in the NOTICE file.
import logging
from pathlib import Path
from typing import Dict, Any, List
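
# Import the shared module base classes from whichever package root is importable;
# the fallback chain below presumably covers the different layouts the toolbox can
# run under (installed package, worker mount, or src/ checkout).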
try:
    from toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult
except ImportError:
    try:
        from modules.base import BaseModule, ModuleMetadata, ModuleResult
    except ImportError:
        from src.toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult

logger = logging.getLogger(__name__)


class LLMAnalyzer(BaseModule):
    """
    Uses an LLM to analyze code for potential security issues.

    This module:
    - Sends code to an LLM agent via the A2A protocol
    - Asks the LLM to identify security vulnerabilities
    - Collects findings and returns them in a structured format
    """

    def get_metadata(self) -> ModuleMetadata:
        """Get module metadata"""
        return ModuleMetadata(
            name="llm_analyzer",
            version="1.0.0",
            description="Uses AI to analyze code for security issues",
            author="FuzzForge Team",
            category="analyzer",
            tags=["llm", "ai", "security", "analysis"],
            input_schema={
                "agent_url": {
                    "type": "string",
                    "description": "A2A agent endpoint URL",
                    "default": "http://fuzzforge-task-agent:8000/a2a/litellm_agent"
                },
                "llm_model": {
                    "type": "string",
                    "description": "LLM model to use",
                    "default": "gpt-4o-mini"
                },
                "llm_provider": {
                    "type": "string",
                    "description": "LLM provider (openai, anthropic, etc.)",
                    "default": "openai"
                },
                "file_patterns": {
                    "type": "array",
                    "items": {"type": "string"},
                    "description": "File patterns to analyze",
                    "default": ["*.py", "*.js", "*.ts", "*.java", "*.go"]
                },
                "max_files": {
                    "type": "integer",
                    "description": "Maximum number of files to analyze",
                    "default": 5
                },
                "max_file_size": {
                    "type": "integer",
                    "description": "Maximum file size in bytes",
                    "default": 50000  # 50 KB
                },
                "timeout": {
                    "type": "integer",
                    "description": "Timeout per file in seconds",
                    "default": 60
                }
            },
            output_schema={
                "findings": {
                    "type": "array",
                    "description": "Security issues identified by the LLM"
                }
            },
            requires_workspace=True
        )

    def validate_config(self, config: Dict[str, Any]) -> bool:
        """Validate module configuration"""
        # Lazy import to avoid Temporal sandbox restrictions
        try:
            from fuzzforge_ai.a2a_wrapper import send_agent_task  # noqa: F401
        except ImportError:
            raise RuntimeError(
                "A2A wrapper not available. Ensure fuzzforge_ai module is accessible."
            )

        agent_url = config.get("agent_url")
        if not agent_url or not isinstance(agent_url, str):
            raise ValueError("agent_url must be a valid URL string")

        max_files = config.get("max_files", 5)
        if not isinstance(max_files, int) or max_files <= 0:
            raise ValueError("max_files must be a positive integer")

        return True

    async def execute(self, config: Dict[str, Any], workspace: Path) -> ModuleResult:
        """
        Execute the LLM analysis module.

        Args:
            config: Module configuration
            workspace: Path to the workspace containing code to analyze

        Returns:
            ModuleResult with findings from LLM analysis
        """
        # Start execution timer
        self.start_timer()
        logger.info(f"Starting LLM analysis in workspace: {workspace}")

        # Extract configuration
        agent_url = config.get("agent_url", "http://fuzzforge-task-agent:8000/a2a/litellm_agent")
        llm_model = config.get("llm_model", "gpt-4o-mini")
        llm_provider = config.get("llm_provider", "openai")
        file_patterns = config.get("file_patterns", ["*.py", "*.js", "*.ts", "*.java", "*.go"])
        max_files = config.get("max_files", 5)
        max_file_size = config.get("max_file_size", 50000)
        timeout = config.get("timeout", 60)

        # Find files to analyze
        files_to_analyze = []
        for pattern in file_patterns:
            for file_path in workspace.rglob(pattern):
                if file_path.is_file():
                    try:
                        # Check file size
                        if file_path.stat().st_size > max_file_size:
                            logger.debug(f"Skipping {file_path} (too large)")
                            continue
                        files_to_analyze.append(file_path)
                        if len(files_to_analyze) >= max_files:
                            break
                    except Exception as e:
                        logger.warning(f"Error checking file {file_path}: {e}")
                        continue
            if len(files_to_analyze) >= max_files:
                break

        logger.info(f"Found {len(files_to_analyze)} files to analyze")

        # Analyze each file
        all_findings = []
        for file_path in files_to_analyze:
            logger.info(f"Analyzing: {file_path.relative_to(workspace)}")
            try:
                findings = await self._analyze_file(
                    file_path=file_path,
                    workspace=workspace,
                    agent_url=agent_url,
                    llm_model=llm_model,
                    llm_provider=llm_provider,
                    timeout=timeout
                )
                all_findings.extend(findings)
            except Exception as e:
                logger.error(f"Error analyzing {file_path}: {e}")
                # Continue with next file
                continue

        logger.info(f"LLM analysis complete. Found {len(all_findings)} issues.")

        # Create result using base module helper
        return self.create_result(
            findings=all_findings,
            status="success",
            summary={
                "files_analyzed": len(files_to_analyze),
                "total_findings": len(all_findings),
                "agent_url": agent_url,
                "model": f"{llm_provider}/{llm_model}"
            }
        )

    async def _analyze_file(
        self,
        file_path: Path,
        workspace: Path,
        agent_url: str,
        llm_model: str,
        llm_provider: str,
        timeout: int
    ) -> List[Dict[str, Any]]:
        """Analyze a single file with LLM"""
        # Read file content
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                code_content = f.read()
        except Exception as e:
            logger.error(f"Failed to read {file_path}: {e}")
            return []

        # Determine language from extension
        extension = file_path.suffix.lower()
        language_map = {
            ".py": "python",
            ".js": "javascript",
            ".ts": "typescript",
            ".java": "java",
            ".go": "go",
            ".rs": "rust",
            ".c": "c",
            ".cpp": "cpp",
        }
        language = language_map.get(extension, "code")

        # Build prompt for LLM
        system_prompt = (
            "You are a security code analyzer. Analyze the provided code and identify "
            "potential security vulnerabilities, bugs, and code quality issues. "
            "For each issue found, respond in this exact format:\n"
            "ISSUE: [short title]\n"
            "SEVERITY: [error/warning/note]\n"
            "LINE: [line number or 'unknown']\n"
            "DESCRIPTION: [detailed explanation]\n\n"
            "If no issues are found, respond with 'NO_ISSUES_FOUND'."
        )
        user_message = (
            f"Analyze this {language} code for security vulnerabilities:\n\n"
            f"File: {file_path.relative_to(workspace)}\n\n"
            f"```{language}\n{code_content}\n```"
        )

        # Call LLM via A2A wrapper (lazy import to avoid Temporal sandbox restrictions)
        try:
            from fuzzforge_ai.a2a_wrapper import send_agent_task
            result = await send_agent_task(
                url=agent_url,
                model=llm_model,
                provider=llm_provider,
                prompt=system_prompt,
                message=user_message,
                context=f"llm_analysis_{file_path.stem}",
                timeout=float(timeout)
            )
            llm_response = result.text
        except Exception as e:
            logger.error(f"A2A call failed for {file_path}: {e}")
            return []

        # Parse LLM response into findings
        findings = self._parse_llm_response(
            llm_response=llm_response,
            file_path=file_path,
            workspace=workspace
        )
        return findings

    def _parse_llm_response(
        self,
        llm_response: str,
        file_path: Path,
        workspace: Path
    ) -> List:
        """Parse LLM response into structured findings"""
        if "NO_ISSUES_FOUND" in llm_response:
            return []

        findings = []
        relative_path = str(file_path.relative_to(workspace))

        # Simple parser for the expected format
        lines = llm_response.split('\n')
        current_issue = {}
        for line in lines:
            line = line.strip()
            if line.startswith("ISSUE:"):
                # Save previous issue if exists
                if current_issue:
                    findings.append(self._create_module_finding(current_issue, relative_path))
                current_issue = {"title": line.replace("ISSUE:", "").strip()}
            elif line.startswith("SEVERITY:"):
                current_issue["severity"] = line.replace("SEVERITY:", "").strip().lower()
            elif line.startswith("LINE:"):
                line_num = line.replace("LINE:", "").strip()
                try:
                    current_issue["line"] = int(line_num)
                except ValueError:
                    current_issue["line"] = None
            elif line.startswith("DESCRIPTION:"):
                current_issue["description"] = line.replace("DESCRIPTION:", "").strip()

        # Save last issue
        if current_issue:
            findings.append(self._create_module_finding(current_issue, relative_path))

        return findings
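
    # Illustrative response in the format the parser above expects; the ISSUE/SEVERITY/
    # LINE/DESCRIPTION layout comes from the system prompt in _analyze_file, while the
    # concrete issue shown here is made up:
    #
    #     ISSUE: Hardcoded credential
    #     SEVERITY: error
    #     LINE: 42
    #     DESCRIPTION: A password literal is embedded directly in the source file.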

    def _create_module_finding(self, issue: Dict[str, Any], file_path: str):
        """Create a ModuleFinding from parsed issue"""
        severity_map = {
            "error": "critical",
            "warning": "medium",
            "note": "low",
            "info": "low"
        }

        # Use base class helper to create proper ModuleFinding
        return self.create_finding(
            title=issue.get("title", "Security issue detected"),
            description=issue.get("description", ""),
            severity=severity_map.get(issue.get("severity", "warning"), "medium"),
            category="security",
            file_path=file_path,
            line_start=issue.get("line"),
            metadata={
                "tool": "llm-analyzer",
                "type": "llm-security-analysis"
            }
        )