Mirror of https://github.com/FuzzingLabs/fuzzforge_ai.git (synced 2026-02-13 15:13:13 +00:00)
LLM Analysis Workflow:
- Add llm_analyzer module for AI-powered code security analysis
- Add llm_analysis workflow with SARIF output support
- Mount AI module in Python worker for A2A wrapper access
- Add a2a-sdk dependency to Python worker requirements
- Fix workflow parameter ordering in Temporal manager

Ruff Linter Fixes:
- Fix bare except clauses (E722) across AI and CLI modules
- Add noqa comments for intentional late imports (E402)
- Replace undefined get_ai_status_async with TODO placeholder
- Remove unused imports and variables
- Remove container diagnostics display from exception handler

MCP Configuration:
- Reactivate FUZZFORGE_MCP_URL with default value
- Set default MCP URL to http://localhost:8010/mcp in init
350 lines
12 KiB
Python
"""
LLM Analyzer Module - Uses AI to analyze code for security issues
"""

# Copyright (c) 2025 FuzzingLabs
#
# Licensed under the Business Source License 1.1 (BSL). See the LICENSE file
# at the root of this repository for details.
#
# After the Change Date (four years from publication), this version of the
# Licensed Work will be made available under the Apache License, Version 2.0.
# See the LICENSE-APACHE file or http://www.apache.org/licenses/LICENSE-2.0
#
# Additional attribution and requirements are provided in the NOTICE file.

import logging
from pathlib import Path
from typing import Dict, Any, List

try:
    from toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult
except ImportError:
    try:
        from modules.base import BaseModule, ModuleMetadata, ModuleResult
    except ImportError:
        from src.toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult

logger = logging.getLogger(__name__)


class LLMAnalyzer(BaseModule):
    """
    Uses an LLM to analyze code for potential security issues.

    This module:
    - Sends code to an LLM agent via A2A protocol
    - Asks the LLM to identify security vulnerabilities
    - Collects findings and returns them in structured format
    """

    def get_metadata(self) -> ModuleMetadata:
        """Get module metadata"""
        return ModuleMetadata(
            name="llm_analyzer",
            version="1.0.0",
            description="Uses AI to analyze code for security issues",
            author="FuzzForge Team",
            category="analyzer",
            tags=["llm", "ai", "security", "analysis"],
            input_schema={
                "agent_url": {
                    "type": "string",
                    "description": "A2A agent endpoint URL",
                    "default": "http://fuzzforge-task-agent:8000/a2a/litellm_agent"
                },
                "llm_model": {
                    "type": "string",
                    "description": "LLM model to use",
                    "default": "gpt-4o-mini"
                },
                "llm_provider": {
                    "type": "string",
                    "description": "LLM provider (openai, anthropic, etc.)",
                    "default": "openai"
                },
                "file_patterns": {
                    "type": "array",
                    "items": {"type": "string"},
                    "description": "File patterns to analyze",
                    "default": ["*.py", "*.js", "*.ts", "*.java", "*.go"]
                },
                "max_files": {
                    "type": "integer",
                    "description": "Maximum number of files to analyze",
                    "default": 5
                },
                "max_file_size": {
                    "type": "integer",
                    "description": "Maximum file size in bytes",
                    "default": 50000  # 50KB
                },
                "timeout": {
                    "type": "integer",
                    "description": "Timeout per file in seconds",
                    "default": 60
                }
            },
            output_schema={
                "findings": {
                    "type": "array",
                    "description": "Security issues identified by LLM"
                }
            },
            requires_workspace=True
        )
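
    # Example config accepted by execute(); the values shown are just the schema
    # defaults listed above, so any subset may be supplied (illustrative comment
    # only, not used at runtime):
    # {
    #     "agent_url": "http://fuzzforge-task-agent:8000/a2a/litellm_agent",
    #     "llm_model": "gpt-4o-mini",
    #     "llm_provider": "openai",
    #     "file_patterns": ["*.py", "*.js", "*.ts", "*.java", "*.go"],
    #     "max_files": 5,
    #     "max_file_size": 50000,
    #     "timeout": 60,
    # }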

    def validate_config(self, config: Dict[str, Any]) -> bool:
        """Validate module configuration"""
        # Lazy import to avoid Temporal sandbox restrictions
        try:
            from fuzzforge_ai.a2a_wrapper import send_agent_task  # noqa: F401
        except ImportError:
            raise RuntimeError(
                "A2A wrapper not available. Ensure fuzzforge_ai module is accessible."
            )

        agent_url = config.get("agent_url")
        if not agent_url or not isinstance(agent_url, str):
            raise ValueError("agent_url must be a valid URL string")

        max_files = config.get("max_files", 5)
        if not isinstance(max_files, int) or max_files <= 0:
            raise ValueError("max_files must be a positive integer")

        return True

    async def execute(self, config: Dict[str, Any], workspace: Path) -> ModuleResult:
        """
        Execute the LLM analysis module.

        Args:
            config: Module configuration
            workspace: Path to the workspace containing code to analyze

        Returns:
            ModuleResult with findings from LLM analysis
        """
        # Start execution timer
        self.start_timer()

        logger.info(f"Starting LLM analysis in workspace: {workspace}")

        # Extract configuration
        agent_url = config.get("agent_url", "http://fuzzforge-task-agent:8000/a2a/litellm_agent")
        llm_model = config.get("llm_model", "gpt-4o-mini")
        llm_provider = config.get("llm_provider", "openai")
        file_patterns = config.get("file_patterns", ["*.py", "*.js", "*.ts", "*.java", "*.go"])
        max_files = config.get("max_files", 5)
        max_file_size = config.get("max_file_size", 50000)
        timeout = config.get("timeout", 60)

        # Find files to analyze
        files_to_analyze = []
        for pattern in file_patterns:
            for file_path in workspace.rglob(pattern):
                if file_path.is_file():
                    try:
                        # Check file size
                        if file_path.stat().st_size > max_file_size:
                            logger.debug(f"Skipping {file_path} (too large)")
                            continue

                        files_to_analyze.append(file_path)

                        if len(files_to_analyze) >= max_files:
                            break
                    except Exception as e:
                        logger.warning(f"Error checking file {file_path}: {e}")
                        continue

            if len(files_to_analyze) >= max_files:
                break

        logger.info(f"Found {len(files_to_analyze)} files to analyze")

        # Analyze each file
        all_findings = []
        for file_path in files_to_analyze:
            logger.info(f"Analyzing: {file_path.relative_to(workspace)}")

            try:
                findings = await self._analyze_file(
                    file_path=file_path,
                    workspace=workspace,
                    agent_url=agent_url,
                    llm_model=llm_model,
                    llm_provider=llm_provider,
                    timeout=timeout
                )
                all_findings.extend(findings)

            except Exception as e:
                logger.error(f"Error analyzing {file_path}: {e}")
                # Continue with next file
                continue

        logger.info(f"LLM analysis complete. Found {len(all_findings)} issues.")

        # Create result using base module helper
        return self.create_result(
            findings=all_findings,
            status="success",
            summary={
                "files_analyzed": len(files_to_analyze),
                "total_findings": len(all_findings),
                "agent_url": agent_url,
                "model": f"{llm_provider}/{llm_model}"
            }
        )

    async def _analyze_file(
        self,
        file_path: Path,
        workspace: Path,
        agent_url: str,
        llm_model: str,
        llm_provider: str,
        timeout: int
    ) -> List[Dict[str, Any]]:
        """Analyze a single file with LLM"""

        # Read file content
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                code_content = f.read()
        except Exception as e:
            logger.error(f"Failed to read {file_path}: {e}")
            return []

        # Determine language from extension
        extension = file_path.suffix.lower()
        language_map = {
            ".py": "python",
            ".js": "javascript",
            ".ts": "typescript",
            ".java": "java",
            ".go": "go",
            ".rs": "rust",
            ".c": "c",
            ".cpp": "cpp",
        }
        language = language_map.get(extension, "code")

        # Build prompt for LLM
        system_prompt = (
            "You are a security code analyzer. Analyze the provided code and identify "
            "potential security vulnerabilities, bugs, and code quality issues. "
            "For each issue found, respond in this exact format:\n"
            "ISSUE: [short title]\n"
            "SEVERITY: [error/warning/note]\n"
            "LINE: [line number or 'unknown']\n"
            "DESCRIPTION: [detailed explanation]\n\n"
            "If no issues are found, respond with 'NO_ISSUES_FOUND'."
        )
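
        # Illustrative comment only (not produced by this module): a reply that
        # follows the requested format would look like
        #
        #   ISSUE: Hardcoded credentials
        #   SEVERITY: warning
        #   LINE: 42
        #   DESCRIPTION: An API key is embedded directly in the source file.
        #
        # or simply NO_ISSUES_FOUND; _parse_llm_response() below consumes this format.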

        user_message = (
            f"Analyze this {language} code for security vulnerabilities:\n\n"
            f"File: {file_path.relative_to(workspace)}\n\n"
            f"```{language}\n{code_content}\n```"
        )

        # Call LLM via A2A wrapper (lazy import to avoid Temporal sandbox restrictions)
        try:
            from fuzzforge_ai.a2a_wrapper import send_agent_task

            result = await send_agent_task(
                url=agent_url,
                model=llm_model,
                provider=llm_provider,
                prompt=system_prompt,
                message=user_message,
                context=f"llm_analysis_{file_path.stem}",
                timeout=float(timeout)
            )

            llm_response = result.text

        except Exception as e:
            logger.error(f"A2A call failed for {file_path}: {e}")
            return []

        # Parse LLM response into findings
        findings = self._parse_llm_response(
            llm_response=llm_response,
            file_path=file_path,
            workspace=workspace
        )

        return findings

    def _parse_llm_response(
        self,
        llm_response: str,
        file_path: Path,
        workspace: Path
    ) -> List:
        """Parse LLM response into structured findings"""

        if "NO_ISSUES_FOUND" in llm_response:
            return []

        findings = []
        relative_path = str(file_path.relative_to(workspace))

        # Simple parser for the expected format
        lines = llm_response.split('\n')
        current_issue = {}

        for line in lines:
            line = line.strip()

            if line.startswith("ISSUE:"):
                # Save previous issue if exists
                if current_issue:
                    findings.append(self._create_module_finding(current_issue, relative_path))
                current_issue = {"title": line.replace("ISSUE:", "").strip()}

            elif line.startswith("SEVERITY:"):
                current_issue["severity"] = line.replace("SEVERITY:", "").strip().lower()

            elif line.startswith("LINE:"):
                line_num = line.replace("LINE:", "").strip()
                try:
                    current_issue["line"] = int(line_num)
                except ValueError:
                    current_issue["line"] = None

            elif line.startswith("DESCRIPTION:"):
                current_issue["description"] = line.replace("DESCRIPTION:", "").strip()

        # Save last issue
        if current_issue:
            findings.append(self._create_module_finding(current_issue, relative_path))

        return findings

    def _create_module_finding(self, issue: Dict[str, Any], file_path: str):
        """Create a ModuleFinding from parsed issue"""

        severity_map = {
            "error": "critical",
            "warning": "medium",
            "note": "low",
            "info": "low"
        }

        # Use base class helper to create proper ModuleFinding
        return self.create_finding(
            title=issue.get("title", "Security issue detected"),
            description=issue.get("description", ""),
            severity=severity_map.get(issue.get("severity", "warning"), "medium"),
            category="security",
            file_path=file_path,
            line_start=issue.get("line"),
            metadata={
                "tool": "llm-analyzer",
                "type": "llm-security-analysis"
            }
        )
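

# Illustrative usage sketch (commented out; not part of the module). It assumes
# LLMAnalyzer() can be constructed without arguments and that the ModuleResult
# returned by execute() exposes a `findings` attribute, as suggested by
# create_result() above; both are assumptions, not confirmed API.
#
# import asyncio
# from pathlib import Path
#
# async def _demo():
#     analyzer = LLMAnalyzer()
#     result = await analyzer.execute(
#         config={
#             "agent_url": "http://localhost:8000/a2a/litellm_agent",  # placeholder URL
#             "max_files": 2,
#         },
#         workspace=Path("/path/to/project"),  # placeholder path
#     )
#     for finding in result.findings:
#         print(finding)
#
# asyncio.run(_demo())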