mirror of
https://github.com/FuzzingLabs/fuzzforge_ai.git
synced 2026-02-13 19:12:44 +00:00
LLM Analysis Workflow: - Add llm_analyzer module for AI-powered code security analysis - Add llm_analysis workflow with SARIF output support - Mount AI module in Python worker for A2A wrapper access - Add a2a-sdk dependency to Python worker requirements - Fix workflow parameter ordering in Temporal manager Ruff Linter Fixes: - Fix bare except clauses (E722) across AI and CLI modules - Add noqa comments for intentional late imports (E402) - Replace undefined get_ai_status_async with TODO placeholder - Remove unused imports and variables - Remove container diagnostics display from exception handler MCP Configuration: - Reactivate FUZZFORGE_MCP_URL with default value - Set default MCP URL to http://localhost:8010/mcp in init
163 lines
5.0 KiB
Python
163 lines
5.0 KiB
Python
"""
|
|
LLM Analysis Workflow Activities
|
|
"""
|
|
|
|
# Copyright (c) 2025 FuzzingLabs
|
|
#
|
|
# Licensed under the Business Source License 1.1 (BSL). See the LICENSE file
|
|
# at the root of this repository for details.
|
|
#
|
|
# After the Change Date (four years from publication), this version of the
|
|
# Licensed Work will be made available under the Apache License, Version 2.0.
|
|
# See the LICENSE-APACHE file or http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Additional attribution and requirements are provided in the NOTICE file.
|
|
|
|
import logging
|
|
from pathlib import Path
|
|
from typing import Dict, Any
|
|
|
|
from temporalio import activity
|
|
|
|
try:
|
|
from toolbox.modules.analyzer.llm_analyzer import LLMAnalyzer
|
|
except ImportError:
|
|
try:
|
|
from modules.analyzer.llm_analyzer import LLMAnalyzer
|
|
except ImportError:
|
|
from src.toolbox.modules.analyzer.llm_analyzer import LLMAnalyzer
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@activity.defn(name="llm_generate_sarif")
|
|
async def llm_generate_sarif(findings: list, metadata: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""
|
|
Generate SARIF report from LLM findings.
|
|
|
|
Args:
|
|
findings: List of finding dictionaries
|
|
metadata: Metadata including tool_name, tool_version, run_id
|
|
|
|
Returns:
|
|
SARIF report dictionary
|
|
"""
|
|
activity.logger.info(f"Generating SARIF report from {len(findings)} findings")
|
|
|
|
# Basic SARIF 2.1.0 structure
|
|
sarif_report = {
|
|
"version": "2.1.0",
|
|
"$schema": "https://raw.githubusercontent.com/oasis-tcs/sarif-spec/master/Schemata/sarif-schema-2.1.0.json",
|
|
"runs": [
|
|
{
|
|
"tool": {
|
|
"driver": {
|
|
"name": metadata.get("tool_name", "llm-analyzer"),
|
|
"version": metadata.get("tool_version", "1.0.0"),
|
|
"informationUri": "https://github.com/FuzzingLabs/fuzzforge_ai"
|
|
}
|
|
},
|
|
"results": []
|
|
}
|
|
]
|
|
}
|
|
|
|
# Convert findings to SARIF results
|
|
for finding in findings:
|
|
sarif_result = {
|
|
"ruleId": finding.get("id", "unknown"),
|
|
"level": _severity_to_sarif_level(finding.get("severity", "warning")),
|
|
"message": {
|
|
"text": finding.get("title", "Security issue detected")
|
|
},
|
|
"locations": []
|
|
}
|
|
|
|
# Add description if present
|
|
if finding.get("description"):
|
|
sarif_result["message"]["markdown"] = finding["description"]
|
|
|
|
# Add location if file path is present
|
|
if finding.get("file_path"):
|
|
location = {
|
|
"physicalLocation": {
|
|
"artifactLocation": {
|
|
"uri": finding["file_path"]
|
|
}
|
|
}
|
|
}
|
|
|
|
# Add region if line number is present
|
|
if finding.get("line_start"):
|
|
location["physicalLocation"]["region"] = {
|
|
"startLine": finding["line_start"]
|
|
}
|
|
if finding.get("line_end"):
|
|
location["physicalLocation"]["region"]["endLine"] = finding["line_end"]
|
|
|
|
sarif_result["locations"].append(location)
|
|
|
|
sarif_report["runs"][0]["results"].append(sarif_result)
|
|
|
|
activity.logger.info(f"Generated SARIF report with {len(sarif_report['runs'][0]['results'])} results")
|
|
|
|
return sarif_report
|
|
|
|
|
|
def _severity_to_sarif_level(severity: str) -> str:
|
|
"""Convert severity to SARIF level"""
|
|
severity_map = {
|
|
"critical": "error",
|
|
"high": "error",
|
|
"medium": "warning",
|
|
"low": "note",
|
|
"info": "note"
|
|
}
|
|
return severity_map.get(severity.lower(), "warning")
|
|
|
|
|
|
@activity.defn(name="analyze_with_llm")
|
|
async def analyze_with_llm(target_path: str, config: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""
|
|
Analyze code using LLM.
|
|
|
|
Args:
|
|
target_path: Path to the workspace containing code
|
|
config: LLM analyzer configuration
|
|
|
|
Returns:
|
|
Dictionary containing findings and summary
|
|
"""
|
|
activity.logger.info(f"Starting LLM analysis: {target_path}")
|
|
activity.logger.info(f"Config: {config}")
|
|
|
|
workspace = Path(target_path)
|
|
|
|
if not workspace.exists():
|
|
raise FileNotFoundError(f"Workspace not found: {target_path}")
|
|
|
|
# Create and execute LLM analyzer
|
|
analyzer = LLMAnalyzer()
|
|
|
|
# Validate configuration
|
|
analyzer.validate_config(config)
|
|
|
|
# Execute analysis
|
|
result = await analyzer.execute(config, workspace)
|
|
|
|
if result.status == "failed":
|
|
raise RuntimeError(f"LLM analysis failed: {result.error or 'Unknown error'}")
|
|
|
|
activity.logger.info(
|
|
f"LLM analysis completed: {len(result.findings)} findings from "
|
|
f"{result.summary.get('files_analyzed', 0)} files"
|
|
)
|
|
|
|
# Convert ModuleFinding objects to dicts for serialization
|
|
findings_dicts = [finding.model_dump() for finding in result.findings]
|
|
|
|
return {
|
|
"findings": findings_dicts,
|
|
"summary": result.summary
|
|
}
|