From ff00146f2029710d8cb7ecf55727098a327797e9 Mon Sep 17 00:00:00 2001 From: tduhamel42 Date: Tue, 14 Oct 2025 16:43:14 +0200 Subject: [PATCH] feat: Add LLM analysis workflow and ruff linter fixes LLM Analysis Workflow: - Add llm_analyzer module for AI-powered code security analysis - Add llm_analysis workflow with SARIF output support - Mount AI module in Python worker for A2A wrapper access - Add a2a-sdk dependency to Python worker requirements - Fix workflow parameter ordering in Temporal manager Ruff Linter Fixes: - Fix bare except clauses (E722) across AI and CLI modules - Add noqa comments for intentional late imports (E402) - Replace undefined get_ai_status_async with TODO placeholder - Remove unused imports and variables - Remove container diagnostics display from exception handler MCP Configuration: - Reactivate FUZZFORGE_MCP_URL with default value - Set default MCP URL to http://localhost:8010/mcp in init --- ai/src/fuzzforge_ai/__main__.py | 1 + ai/src/fuzzforge_ai/a2a_wrapper.py | 2 +- ai/src/fuzzforge_ai/agent_executor.py | 9 +- ai/src/fuzzforge_ai/cli.py | 3 +- ai/src/fuzzforge_ai/cognee_service.py | 2 +- ai/src/fuzzforge_ai/remote_agent.py | 2 +- backend/src/temporal/manager.py | 17 +- .../toolbox/modules/analyzer/llm_analyzer.py | 349 ++++++++++++++++++ .../workflows/llm_analysis/__init__.py | 19 + .../workflows/llm_analysis/activities.py | 162 ++++++++ .../workflows/llm_analysis/metadata.yaml | 64 ++++ .../workflows/llm_analysis/workflow.py | 236 ++++++++++++ cli/src/fuzzforge_cli/commands/ai.py | 45 +-- cli/src/fuzzforge_cli/commands/init.py | 5 +- cli/src/fuzzforge_cli/config.py | 12 +- cli/src/fuzzforge_cli/database.py | 6 +- cli/src/fuzzforge_cli/exceptions.py | 6 - cli/src/fuzzforge_cli/main.py | 1 - docker-compose.yml | 2 + sdk/test_exception_handling.py | 1 + test_a2a_wrapper.py | 5 +- workers/python/Dockerfile | 4 +- workers/python/requirements.txt | 3 + 23 files changed, 882 insertions(+), 74 deletions(-) create mode 100644 backend/toolbox/modules/analyzer/llm_analyzer.py create mode 100644 backend/toolbox/workflows/llm_analysis/__init__.py create mode 100644 backend/toolbox/workflows/llm_analysis/activities.py create mode 100644 backend/toolbox/workflows/llm_analysis/metadata.yaml create mode 100644 backend/toolbox/workflows/llm_analysis/workflow.py diff --git a/ai/src/fuzzforge_ai/__main__.py b/ai/src/fuzzforge_ai/__main__.py index ac166f7..297369f 100644 --- a/ai/src/fuzzforge_ai/__main__.py +++ b/ai/src/fuzzforge_ai/__main__.py @@ -1,3 +1,4 @@ +# ruff: noqa: E402 # Imports delayed for environment/logging setup """ FuzzForge A2A Server Run this to expose FuzzForge as an A2A-compatible agent diff --git a/ai/src/fuzzforge_ai/a2a_wrapper.py b/ai/src/fuzzforge_ai/a2a_wrapper.py index b3f57b8..0535404 100644 --- a/ai/src/fuzzforge_ai/a2a_wrapper.py +++ b/ai/src/fuzzforge_ai/a2a_wrapper.py @@ -15,7 +15,7 @@ Programmatic interface to send tasks to A2A agents with custom model/prompt/cont from __future__ import annotations -from typing import Optional, Dict, Any +from typing import Optional, Any from uuid import uuid4 import httpx diff --git a/ai/src/fuzzforge_ai/agent_executor.py b/ai/src/fuzzforge_ai/agent_executor.py index 0a858f0..fd1f1d9 100644 --- a/ai/src/fuzzforge_ai/agent_executor.py +++ b/ai/src/fuzzforge_ai/agent_executor.py @@ -1,3 +1,4 @@ +# ruff: noqa: E402 # Imports delayed for environment/logging setup """FuzzForge Agent Executor - orchestrates workflows and delegation.""" # Copyright (c) 2025 FuzzingLabs # @@ -173,7 +174,7 @@ class FuzzForgeExecutor: 
else: # Run now if no loop is running loop.run_until_complete(self._register_agent_async(url, name)) - except: + except Exception: # Ignore auto-registration failures pass except Exception as e: @@ -451,11 +452,11 @@ class FuzzForgeExecutor: try: user = await get_user(user_email) logger.info(f"Using existing user: {user_email}") - except: + except Exception: try: user = await create_user(user_email, user_tenant) logger.info(f"Created new user: {user_email}") - except: + except Exception: user = None if user: @@ -1704,7 +1705,7 @@ Be concise and intelligent in your responses.""" if self.agentops_trace: try: agentops.end_trace() - except: + except Exception: pass # Cancel background monitors diff --git a/ai/src/fuzzforge_ai/cli.py b/ai/src/fuzzforge_ai/cli.py index 5ff40da..4f5549f 100755 --- a/ai/src/fuzzforge_ai/cli.py +++ b/ai/src/fuzzforge_ai/cli.py @@ -1,3 +1,4 @@ #!/usr/bin/env python3 +# ruff: noqa: E402 # Imports delayed for environment/logging setup # Copyright (c) 2025 FuzzingLabs # @@ -428,7 +429,7 @@ class FuzzForgeCLI: text = data['parts'][0].get('text', '')[:150] role = data.get('role', 'unknown') console.print(f"{i}. [{role}]: {text}...") - except: + except Exception: console.print(f"{i}. {content[:150]}...") else: console.print("[yellow]No matches found in SQLite either[/yellow]") diff --git a/ai/src/fuzzforge_ai/cognee_service.py b/ai/src/fuzzforge_ai/cognee_service.py index 7526764..968e956 100644 --- a/ai/src/fuzzforge_ai/cognee_service.py +++ b/ai/src/fuzzforge_ai/cognee_service.py @@ -156,7 +156,7 @@ class CogneeService: self._user = await get_user(fallback_email) logger.info(f"Using existing user: {fallback_email}") return - except: + except Exception: # User doesn't exist, try to create fallback pass diff --git a/ai/src/fuzzforge_ai/remote_agent.py b/ai/src/fuzzforge_ai/remote_agent.py index 52da844..bac6872 100644 --- a/ai/src/fuzzforge_ai/remote_agent.py +++ b/ai/src/fuzzforge_ai/remote_agent.py @@ -37,7 +37,7 @@ class RemoteAgentConnection: response.raise_for_status() self.agent_card = response.json() return self.agent_card - except: + except Exception: # Try old path for compatibility try: response = await self.client.get(f"{self.url}/.well-known/agent.json") diff --git a/backend/src/temporal/manager.py b/backend/src/temporal/manager.py index 7522e60..9a44e8b 100644 --- a/backend/src/temporal/manager.py +++ b/backend/src/temporal/manager.py @@ -181,15 +181,18 @@ class TemporalManager: # Prepare workflow input arguments workflow_params = workflow_params or {} - # Build args list: [target_id, ...workflow_params values] + # Build args list: [target_id, ...workflow_params in schema order] # The workflow parameters are passed as individual positional args workflow_args = [target_id] - # Add parameters in order based on workflow signature - # For security_assessment: scanner_config, analyzer_config, reporter_config - # For atheris_fuzzing: target_file, max_iterations, timeout_seconds - if workflow_params: - workflow_args.extend(workflow_params.values()) + # Add parameters in order based on metadata schema + # This ensures parameters match the workflow signature order + if workflow_params and 'parameters' in workflow_info.metadata: + param_schema = workflow_info.metadata['parameters'].get('properties', {}) + # Iterate parameters in schema order and add values + for param_name in param_schema.keys(): + param_value = workflow_params.get(param_name) + workflow_args.append(param_value) # Determine task queue from workflow vertical vertical = 
workflow_info.metadata.get("vertical", "default") @@ -199,6 +202,8 @@ class TemporalManager: f"Starting workflow: {workflow_name} " f"(id={workflow_id}, queue={task_queue}, target={target_id})" ) + logger.info(f"DEBUG: workflow_args = {workflow_args}") + logger.info(f"DEBUG: workflow_params received = {workflow_params}") try: # Start workflow execution with positional arguments diff --git a/backend/toolbox/modules/analyzer/llm_analyzer.py b/backend/toolbox/modules/analyzer/llm_analyzer.py new file mode 100644 index 0000000..b3b1374 --- /dev/null +++ b/backend/toolbox/modules/analyzer/llm_analyzer.py @@ -0,0 +1,349 @@ +""" +LLM Analyzer Module - Uses AI to analyze code for security issues +""" + +# Copyright (c) 2025 FuzzingLabs +# +# Licensed under the Business Source License 1.1 (BSL). See the LICENSE file +# at the root of this repository for details. +# +# After the Change Date (four years from publication), this version of the +# Licensed Work will be made available under the Apache License, Version 2.0. +# See the LICENSE-APACHE file or http://www.apache.org/licenses/LICENSE-2.0 +# +# Additional attribution and requirements are provided in the NOTICE file. + +import logging +from pathlib import Path +from typing import Dict, Any, List + +try: + from toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult +except ImportError: + try: + from modules.base import BaseModule, ModuleMetadata, ModuleResult + except ImportError: + from src.toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult + +logger = logging.getLogger(__name__) + + +class LLMAnalyzer(BaseModule): + """ + Uses an LLM to analyze code for potential security issues. + + This module: + - Sends code to an LLM agent via A2A protocol + - Asks the LLM to identify security vulnerabilities + - Collects findings and returns them in structured format + """ + + def get_metadata(self) -> ModuleMetadata: + """Get module metadata""" + return ModuleMetadata( + name="llm_analyzer", + version="1.0.0", + description="Uses AI to analyze code for security issues", + author="FuzzForge Team", + category="analyzer", + tags=["llm", "ai", "security", "analysis"], + input_schema={ + "agent_url": { + "type": "string", + "description": "A2A agent endpoint URL", + "default": "http://fuzzforge-task-agent:8000/a2a/litellm_agent" + }, + "llm_model": { + "type": "string", + "description": "LLM model to use", + "default": "gpt-4o-mini" + }, + "llm_provider": { + "type": "string", + "description": "LLM provider (openai, anthropic, etc.)", + "default": "openai" + }, + "file_patterns": { + "type": "array", + "items": {"type": "string"}, + "description": "File patterns to analyze", + "default": ["*.py", "*.js", "*.ts", "*.java", "*.go"] + }, + "max_files": { + "type": "integer", + "description": "Maximum number of files to analyze", + "default": 5 + }, + "max_file_size": { + "type": "integer", + "description": "Maximum file size in bytes", + "default": 50000 # 50KB + }, + "timeout": { + "type": "integer", + "description": "Timeout per file in seconds", + "default": 60 + } + }, + output_schema={ + "findings": { + "type": "array", + "description": "Security issues identified by LLM" + } + }, + requires_workspace=True + ) + + def validate_config(self, config: Dict[str, Any]) -> bool: + """Validate module configuration""" + # Lazy import to avoid Temporal sandbox restrictions + try: + from fuzzforge_ai.a2a_wrapper import send_agent_task # noqa: F401 + except ImportError: + raise RuntimeError( + "A2A wrapper not available. 
Ensure fuzzforge_ai module is accessible." + ) + + agent_url = config.get("agent_url") + if not agent_url or not isinstance(agent_url, str): + raise ValueError("agent_url must be a valid URL string") + + max_files = config.get("max_files", 5) + if not isinstance(max_files, int) or max_files <= 0: + raise ValueError("max_files must be a positive integer") + + return True + + async def execute(self, config: Dict[str, Any], workspace: Path) -> ModuleResult: + """ + Execute the LLM analysis module. + + Args: + config: Module configuration + workspace: Path to the workspace containing code to analyze + + Returns: + ModuleResult with findings from LLM analysis + """ + # Start execution timer + self.start_timer() + + logger.info(f"Starting LLM analysis in workspace: {workspace}") + + # Extract configuration + agent_url = config.get("agent_url", "http://fuzzforge-task-agent:8000/a2a/litellm_agent") + llm_model = config.get("llm_model", "gpt-4o-mini") + llm_provider = config.get("llm_provider", "openai") + file_patterns = config.get("file_patterns", ["*.py", "*.js", "*.ts", "*.java", "*.go"]) + max_files = config.get("max_files", 5) + max_file_size = config.get("max_file_size", 50000) + timeout = config.get("timeout", 60) + + # Find files to analyze + files_to_analyze = [] + for pattern in file_patterns: + for file_path in workspace.rglob(pattern): + if file_path.is_file(): + try: + # Check file size + if file_path.stat().st_size > max_file_size: + logger.debug(f"Skipping {file_path} (too large)") + continue + + files_to_analyze.append(file_path) + + if len(files_to_analyze) >= max_files: + break + except Exception as e: + logger.warning(f"Error checking file {file_path}: {e}") + continue + + if len(files_to_analyze) >= max_files: + break + + logger.info(f"Found {len(files_to_analyze)} files to analyze") + + # Analyze each file + all_findings = [] + for file_path in files_to_analyze: + logger.info(f"Analyzing: {file_path.relative_to(workspace)}") + + try: + findings = await self._analyze_file( + file_path=file_path, + workspace=workspace, + agent_url=agent_url, + llm_model=llm_model, + llm_provider=llm_provider, + timeout=timeout + ) + all_findings.extend(findings) + + except Exception as e: + logger.error(f"Error analyzing {file_path}: {e}") + # Continue with next file + continue + + logger.info(f"LLM analysis complete. Found {len(all_findings)} issues.") + + # Create result using base module helper + return self.create_result( + findings=all_findings, + status="success", + summary={ + "files_analyzed": len(files_to_analyze), + "total_findings": len(all_findings), + "agent_url": agent_url, + "model": f"{llm_provider}/{llm_model}" + } + ) + + async def _analyze_file( + self, + file_path: Path, + workspace: Path, + agent_url: str, + llm_model: str, + llm_provider: str, + timeout: int + ) -> List[Dict[str, Any]]: + """Analyze a single file with LLM""" + + # Read file content + try: + with open(file_path, 'r', encoding='utf-8') as f: + code_content = f.read() + except Exception as e: + logger.error(f"Failed to read {file_path}: {e}") + return [] + + # Determine language from extension + extension = file_path.suffix.lower() + language_map = { + ".py": "python", + ".js": "javascript", + ".ts": "typescript", + ".java": "java", + ".go": "go", + ".rs": "rust", + ".c": "c", + ".cpp": "cpp", + } + language = language_map.get(extension, "code") + + # Build prompt for LLM + system_prompt = ( + "You are a security code analyzer. 
Analyze the provided code and identify " + "potential security vulnerabilities, bugs, and code quality issues. " + "For each issue found, respond in this exact format:\n" + "ISSUE: [short title]\n" + "SEVERITY: [error/warning/note]\n" + "LINE: [line number or 'unknown']\n" + "DESCRIPTION: [detailed explanation]\n\n" + "If no issues are found, respond with 'NO_ISSUES_FOUND'." + ) + + user_message = ( + f"Analyze this {language} code for security vulnerabilities:\n\n" + f"File: {file_path.relative_to(workspace)}\n\n" + f"```{language}\n{code_content}\n```" + ) + + # Call LLM via A2A wrapper (lazy import to avoid Temporal sandbox restrictions) + try: + from fuzzforge_ai.a2a_wrapper import send_agent_task + + result = await send_agent_task( + url=agent_url, + model=llm_model, + provider=llm_provider, + prompt=system_prompt, + message=user_message, + context=f"llm_analysis_{file_path.stem}", + timeout=float(timeout) + ) + + llm_response = result.text + + except Exception as e: + logger.error(f"A2A call failed for {file_path}: {e}") + return [] + + # Parse LLM response into findings + findings = self._parse_llm_response( + llm_response=llm_response, + file_path=file_path, + workspace=workspace + ) + + return findings + + def _parse_llm_response( + self, + llm_response: str, + file_path: Path, + workspace: Path + ) -> List: + """Parse LLM response into structured findings""" + + if "NO_ISSUES_FOUND" in llm_response: + return [] + + findings = [] + relative_path = str(file_path.relative_to(workspace)) + + # Simple parser for the expected format + lines = llm_response.split('\n') + current_issue = {} + + for line in lines: + line = line.strip() + + if line.startswith("ISSUE:"): + # Save previous issue if exists + if current_issue: + findings.append(self._create_module_finding(current_issue, relative_path)) + current_issue = {"title": line.replace("ISSUE:", "").strip()} + + elif line.startswith("SEVERITY:"): + current_issue["severity"] = line.replace("SEVERITY:", "").strip().lower() + + elif line.startswith("LINE:"): + line_num = line.replace("LINE:", "").strip() + try: + current_issue["line"] = int(line_num) + except ValueError: + current_issue["line"] = None + + elif line.startswith("DESCRIPTION:"): + current_issue["description"] = line.replace("DESCRIPTION:", "").strip() + + # Save last issue + if current_issue: + findings.append(self._create_module_finding(current_issue, relative_path)) + + return findings + + def _create_module_finding(self, issue: Dict[str, Any], file_path: str): + """Create a ModuleFinding from parsed issue""" + + severity_map = { + "error": "critical", + "warning": "medium", + "note": "low", + "info": "low" + } + + # Use base class helper to create proper ModuleFinding + return self.create_finding( + title=issue.get("title", "Security issue detected"), + description=issue.get("description", ""), + severity=severity_map.get(issue.get("severity", "warning"), "medium"), + category="security", + file_path=file_path, + line_start=issue.get("line"), + metadata={ + "tool": "llm-analyzer", + "type": "llm-security-analysis" + } + ) diff --git a/backend/toolbox/workflows/llm_analysis/__init__.py b/backend/toolbox/workflows/llm_analysis/__init__.py new file mode 100644 index 0000000..028946c --- /dev/null +++ b/backend/toolbox/workflows/llm_analysis/__init__.py @@ -0,0 +1,19 @@ +""" +LLM Analysis Workflow +""" + +# Copyright (c) 2025 FuzzingLabs +# +# Licensed under the Business Source License 1.1 (BSL). See the LICENSE file +# at the root of this repository for details. 
+# +# After the Change Date (four years from publication), this version of the +# Licensed Work will be made available under the Apache License, Version 2.0. +# See the LICENSE-APACHE file or http://www.apache.org/licenses/LICENSE-2.0 +# +# Additional attribution and requirements are provided in the NOTICE file. + +from .workflow import LlmAnalysisWorkflow +from .activities import analyze_with_llm + +__all__ = ["LlmAnalysisWorkflow", "analyze_with_llm"] diff --git a/backend/toolbox/workflows/llm_analysis/activities.py b/backend/toolbox/workflows/llm_analysis/activities.py new file mode 100644 index 0000000..cb47599 --- /dev/null +++ b/backend/toolbox/workflows/llm_analysis/activities.py @@ -0,0 +1,162 @@ +""" +LLM Analysis Workflow Activities +""" + +# Copyright (c) 2025 FuzzingLabs +# +# Licensed under the Business Source License 1.1 (BSL). See the LICENSE file +# at the root of this repository for details. +# +# After the Change Date (four years from publication), this version of the +# Licensed Work will be made available under the Apache License, Version 2.0. +# See the LICENSE-APACHE file or http://www.apache.org/licenses/LICENSE-2.0 +# +# Additional attribution and requirements are provided in the NOTICE file. + +import logging +from pathlib import Path +from typing import Dict, Any + +from temporalio import activity + +try: + from toolbox.modules.analyzer.llm_analyzer import LLMAnalyzer +except ImportError: + try: + from modules.analyzer.llm_analyzer import LLMAnalyzer + except ImportError: + from src.toolbox.modules.analyzer.llm_analyzer import LLMAnalyzer + +logger = logging.getLogger(__name__) + + +@activity.defn(name="llm_generate_sarif") +async def llm_generate_sarif(findings: list, metadata: Dict[str, Any]) -> Dict[str, Any]: + """ + Generate SARIF report from LLM findings. 
+ + Args: + findings: List of finding dictionaries + metadata: Metadata including tool_name, tool_version, run_id + + Returns: + SARIF report dictionary + """ + activity.logger.info(f"Generating SARIF report from {len(findings)} findings") + + # Basic SARIF 2.1.0 structure + sarif_report = { + "version": "2.1.0", + "$schema": "https://raw.githubusercontent.com/oasis-tcs/sarif-spec/master/Schemata/sarif-schema-2.1.0.json", + "runs": [ + { + "tool": { + "driver": { + "name": metadata.get("tool_name", "llm-analyzer"), + "version": metadata.get("tool_version", "1.0.0"), + "informationUri": "https://github.com/FuzzingLabs/fuzzforge_ai" + } + }, + "results": [] + } + ] + } + + # Convert findings to SARIF results + for finding in findings: + sarif_result = { + "ruleId": finding.get("id", "unknown"), + "level": _severity_to_sarif_level(finding.get("severity", "warning")), + "message": { + "text": finding.get("title", "Security issue detected") + }, + "locations": [] + } + + # Add description if present + if finding.get("description"): + sarif_result["message"]["markdown"] = finding["description"] + + # Add location if file path is present + if finding.get("file_path"): + location = { + "physicalLocation": { + "artifactLocation": { + "uri": finding["file_path"] + } + } + } + + # Add region if line number is present + if finding.get("line_start"): + location["physicalLocation"]["region"] = { + "startLine": finding["line_start"] + } + if finding.get("line_end"): + location["physicalLocation"]["region"]["endLine"] = finding["line_end"] + + sarif_result["locations"].append(location) + + sarif_report["runs"][0]["results"].append(sarif_result) + + activity.logger.info(f"Generated SARIF report with {len(sarif_report['runs'][0]['results'])} results") + + return sarif_report + + +def _severity_to_sarif_level(severity: str) -> str: + """Convert severity to SARIF level""" + severity_map = { + "critical": "error", + "high": "error", + "medium": "warning", + "low": "note", + "info": "note" + } + return severity_map.get(severity.lower(), "warning") + + +@activity.defn(name="analyze_with_llm") +async def analyze_with_llm(target_path: str, config: Dict[str, Any]) -> Dict[str, Any]: + """ + Analyze code using LLM. 
+ + Args: + target_path: Path to the workspace containing code + config: LLM analyzer configuration + + Returns: + Dictionary containing findings and summary + """ + activity.logger.info(f"Starting LLM analysis: {target_path}") + activity.logger.info(f"Config: {config}") + + workspace = Path(target_path) + + if not workspace.exists(): + raise FileNotFoundError(f"Workspace not found: {target_path}") + + # Create and execute LLM analyzer + analyzer = LLMAnalyzer() + + # Validate configuration + analyzer.validate_config(config) + + # Execute analysis + result = await analyzer.execute(config, workspace) + + if result.status == "failed": + raise RuntimeError(f"LLM analysis failed: {result.error or 'Unknown error'}") + + activity.logger.info( + f"LLM analysis completed: {len(result.findings)} findings from " + f"{result.summary.get('files_analyzed', 0)} files" + ) + + # Convert ModuleFinding objects to dicts for serialization + findings_dicts = [finding.model_dump() for finding in result.findings] + + return { + "findings": findings_dicts, + "summary": result.summary + } diff --git a/backend/toolbox/workflows/llm_analysis/metadata.yaml b/backend/toolbox/workflows/llm_analysis/metadata.yaml new file mode 100644 index 0000000..83818bd --- /dev/null +++ b/backend/toolbox/workflows/llm_analysis/metadata.yaml @@ -0,0 +1,64 @@ +name: llm_analysis +version: "1.0.0" +vertical: python +description: "Uses AI/LLM to analyze code for security vulnerabilities and code quality issues" +author: "FuzzForge Team" +tags: + - "llm" + - "ai" + - "security" + - "static-analysis" + - "code-quality" + +# Workspace isolation mode +workspace_isolation: "shared" + +default_parameters: + agent_url: "http://fuzzforge-task-agent:8000/a2a/litellm_agent" + llm_model: "gpt-4o-mini" + llm_provider: "openai" + max_files: 5 + +parameters: + type: object + properties: + agent_url: + type: string + description: "A2A agent endpoint URL" + llm_model: + type: string + description: "LLM model to use (e.g., gpt-4o-mini, claude-3-5-sonnet)" + llm_provider: + type: string + description: "LLM provider (openai, anthropic, etc.)" + file_patterns: + type: array + items: + type: string + description: "File patterns to analyze (e.g., ['*.py', '*.js'])" + max_files: + type: integer + description: "Maximum number of files to analyze" + max_file_size: + type: integer + description: "Maximum file size in bytes" + timeout: + type: integer + description: "Timeout per file in seconds" + +output_schema: + type: object + properties: + sarif: + type: object + description: "SARIF-formatted security findings from LLM" + summary: + type: object + description: "Analysis summary" + properties: + files_analyzed: + type: integer + total_findings: + type: integer + model_used: + type: string diff --git a/backend/toolbox/workflows/llm_analysis/workflow.py b/backend/toolbox/workflows/llm_analysis/workflow.py new file mode 100644 index 0000000..136e844 --- /dev/null +++ b/backend/toolbox/workflows/llm_analysis/workflow.py @@ -0,0 +1,236 @@ +""" +LLM Analysis Workflow - Temporal Version + +Uses AI/LLM to analyze code for security issues. +""" + +# Copyright (c) 2025 FuzzingLabs +# +# Licensed under the Business Source License 1.1 (BSL). See the LICENSE file +# at the root of this repository for details. +# +# After the Change Date (four years from publication), this version of the +# Licensed Work will be made available under the Apache License, Version 2.0. 
+# See the LICENSE-APACHE file or http://www.apache.org/licenses/LICENSE-2.0 +# +# Additional attribution and requirements are provided in the NOTICE file. + +from datetime import timedelta +from typing import Dict, Any, Optional + +from temporalio import workflow +from temporalio.common import RetryPolicy + +# Import for type hints (will be executed by worker) +with workflow.unsafe.imports_passed_through(): + import logging + +logger = logging.getLogger(__name__) + + +@workflow.defn +class LlmAnalysisWorkflow: + """ + Analyze code using AI/LLM for security vulnerabilities. + + User workflow: + 1. User runs: ff workflow run llm_analysis . + 2. CLI uploads project to MinIO + 3. Worker downloads project + 4. Worker calls LLM analyzer module + 5. LLM analyzes code files and reports findings + 6. Results returned in SARIF format + """ + + @workflow.run + async def run( + self, + target_id: str, # MinIO UUID of uploaded user code + agent_url: Optional[str] = None, + llm_model: Optional[str] = None, + llm_provider: Optional[str] = None, + file_patterns: Optional[list] = None, + max_files: Optional[int] = None, + max_file_size: Optional[int] = None, + timeout: Optional[int] = None + ) -> Dict[str, Any]: + """ + Main workflow execution. + + Args: + target_id: UUID of the uploaded target in MinIO + agent_url: A2A agent endpoint URL + llm_model: LLM model to use + llm_provider: LLM provider + file_patterns: File patterns to analyze + max_files: Maximum number of files to analyze + max_file_size: Maximum file size in bytes + timeout: Timeout per file in seconds + + Returns: + Dictionary containing findings and summary + """ + workflow_id = workflow.info().workflow_id + + workflow.logger.info( + f"Starting LLMAnalysisWorkflow " + f"(workflow_id={workflow_id}, target_id={target_id}, model={llm_model})" + ) + + results = { + "workflow_id": workflow_id, + "target_id": target_id, + "status": "running", + "steps": [], + "findings": [] + } + + try: + # Get run ID for workspace isolation + run_id = workflow.info().run_id + + # Step 1: Download user's project from MinIO + workflow.logger.info("Step 1: Downloading user code from MinIO") + target_path = await workflow.execute_activity( + "get_target", + args=[target_id, run_id, "shared"], + start_to_close_timeout=timedelta(minutes=5), + retry_policy=RetryPolicy( + initial_interval=timedelta(seconds=1), + maximum_interval=timedelta(seconds=30), + maximum_attempts=3 + ) + ) + results["steps"].append({ + "step": "download", + "status": "success", + "target_path": target_path + }) + workflow.logger.info(f"✓ Target downloaded to: {target_path}") + + # Step 2: Run LLM analysis + workflow.logger.info("Step 2: Analyzing code with LLM") + + # Build analyzer config + analyzer_config = {} + if agent_url: + analyzer_config["agent_url"] = agent_url + if llm_model: + analyzer_config["llm_model"] = llm_model + if llm_provider: + analyzer_config["llm_provider"] = llm_provider + if file_patterns: + analyzer_config["file_patterns"] = file_patterns + if max_files is not None: + analyzer_config["max_files"] = max_files + if max_file_size is not None: + analyzer_config["max_file_size"] = max_file_size + if timeout is not None: + analyzer_config["timeout"] = timeout + + analysis_results = await workflow.execute_activity( + "analyze_with_llm", + args=[target_path, analyzer_config], + start_to_close_timeout=timedelta(minutes=30), # LLM calls can be slow + retry_policy=RetryPolicy( + initial_interval=timedelta(seconds=5), + maximum_interval=timedelta(minutes=1), + maximum_attempts=2 + 
) + ) + + findings = analysis_results.get("findings", []) + summary = analysis_results.get("summary", {}) + + results["steps"].append({ + "step": "llm_analysis", + "status": "success", + "files_analyzed": summary.get("files_analyzed", 0), + "findings_count": len(findings) + }) + + workflow.logger.info( + f"✓ LLM analysis completed: " + f"{summary.get('files_analyzed', 0)} files, " + f"{len(findings)} findings" + ) + + # Step 3: Generate SARIF report + workflow.logger.info("Step 3: Generating SARIF report") + + sarif_report = await workflow.execute_activity( + "llm_generate_sarif", + args=[findings, { + "tool_name": "llm-analyzer", + "tool_version": "1.0.0", + "run_id": run_id + }], + start_to_close_timeout=timedelta(minutes=5), + retry_policy=RetryPolicy( + initial_interval=timedelta(seconds=1), + maximum_interval=timedelta(seconds=30), + maximum_attempts=3 + ) + ) + + results["steps"].append({ + "step": "sarif_generation", + "status": "success", + "results_count": len(sarif_report.get("runs", [{}])[0].get("results", [])) + }) + + workflow.logger.info( + f"✓ SARIF report generated: " + f"{len(sarif_report.get('runs', [{}])[0].get('results', []))} results" + ) + + # Step 4: Upload results to MinIO + workflow.logger.info("Step 4: Uploading results to MinIO") + + # Upload SARIF report + if sarif_report: + results_url = await workflow.execute_activity( + "upload_results", + args=[run_id, sarif_report], + start_to_close_timeout=timedelta(minutes=5), + retry_policy=RetryPolicy( + initial_interval=timedelta(seconds=1), + maximum_interval=timedelta(seconds=30), + maximum_attempts=3 + ) + ) + results["results_url"] = results_url + workflow.logger.info(f"✓ Results uploaded to: {results_url}") + + # Step 5: Cleanup cache + workflow.logger.info("Step 5: Cleaning up cache") + await workflow.execute_activity( + "cleanup_cache", + args=[target_id], + start_to_close_timeout=timedelta(minutes=2), + retry_policy=RetryPolicy( + initial_interval=timedelta(seconds=1), + maximum_interval=timedelta(seconds=10), + maximum_attempts=2 + ) + ) + workflow.logger.info("✓ Cache cleaned up") + + # Mark workflow as successful + results["status"] = "success" + results["sarif"] = sarif_report + results["summary"] = summary + results["findings"] = findings + + workflow.logger.info( + f"✅ LLMAnalysisWorkflow completed successfully: " + f"{len(findings)} findings" + ) + + except Exception as e: + workflow.logger.error(f"❌ Workflow failed: {e}") + results["status"] = "failed" + results["error"] = str(e) + raise + + return results diff --git a/cli/src/fuzzforge_cli/commands/ai.py b/cli/src/fuzzforge_cli/commands/ai.py index e67505e..6a39493 100644 --- a/cli/src/fuzzforge_cli/commands/ai.py +++ b/cli/src/fuzzforge_cli/commands/ai.py @@ -18,7 +18,6 @@ import os import typer from rich.console import Console -from rich.table import Table console = Console() @@ -49,41 +48,15 @@ def ai_agent() -> None: @app.command("status") def ai_status() -> None: """Show AI system health and configuration.""" - try: - status = asyncio.run(get_ai_status_async()) - except Exception as exc: # pragma: no cover - console.print(f"[red]Failed to get AI status:[/red] {exc}") - raise typer.Exit(1) from exc - - console.print("[bold cyan]🤖 FuzzForge AI System Status[/bold cyan]\n") - - config_table = Table(title="Configuration", show_header=True, header_style="bold magenta") - config_table.add_column("Setting", style="bold") - config_table.add_column("Value", style="cyan") - config_table.add_column("Status", style="green") - - for key, info in 
status["config"].items(): - status_icon = "✅" if info["configured"] else "❌" - display_value = info["value"] if info["value"] else "-" - config_table.add_row(key, display_value, f"{status_icon}") - - console.print(config_table) - console.print() - - components_table = Table(title="AI Components", show_header=True, header_style="bold magenta") - components_table.add_column("Component", style="bold") - components_table.add_column("Status", style="green") - components_table.add_column("Details", style="dim") - - for component, info in status["components"].items(): - status_icon = "🟢" if info["available"] else "🔴" - components_table.add_row(component, status_icon, info["details"]) - - console.print(components_table) - - if status["agents"]: - console.print() - console.print(f"[bold green]✓[/bold green] {len(status['agents'])} agents registered") + # TODO: Implement AI status checking + # This command is a placeholder for future health monitoring functionality + console.print("🚧 [yellow]AI status command is not yet implemented.[/yellow]") + console.print("\nPlanned features:") + console.print(" • LLM provider connectivity") + console.print(" • API key validation") + console.print(" • Registered agents status") + console.print(" • Memory/session persistence health") + console.print("\nFor now, use [cyan]ff ai agent[/cyan] to launch the AI agent.") @app.command("server") diff --git a/cli/src/fuzzforge_cli/commands/init.py b/cli/src/fuzzforge_cli/commands/init.py index b9e3242..17167c2 100644 --- a/cli/src/fuzzforge_cli/commands/init.py +++ b/cli/src/fuzzforge_cli/commands/init.py @@ -205,9 +205,6 @@ def _ensure_env_file(fuzzforge_dir: Path, force: bool) -> None: console=console, ) - enable_cognee = False - cognee_url = "" - session_db_path = fuzzforge_dir / "fuzzforge_sessions.db" session_db_rel = session_db_path.relative_to(fuzzforge_dir.parent) @@ -219,7 +216,7 @@ def _ensure_env_file(fuzzforge_dir: Path, force: bool) -> None: f"LLM_MODEL={llm_model}", f"LITELLM_MODEL={llm_model}", f"OPENAI_API_KEY={api_key}", - f"FUZZFORGE_MCP_URL={os.getenv('FUZZFORGE_MCP_URL', 'http://localhost:8010/mcp')}", + "FUZZFORGE_MCP_URL=http://localhost:8010/mcp", "", "# Cognee configuration mirrors the primary LLM by default", f"LLM_COGNEE_PROVIDER={llm_provider}", diff --git a/cli/src/fuzzforge_cli/config.py b/cli/src/fuzzforge_cli/config.py index c47b1dc..f21b87d 100644 --- a/cli/src/fuzzforge_cli/config.py +++ b/cli/src/fuzzforge_cli/config.py @@ -403,12 +403,12 @@ class ProjectConfigManager: if max_tokens: os.environ["LLM_MAX_TOKENS"] = str(max_tokens) - # Disabled - FuzzForge MCP backend connection - # if not os.getenv("FUZZFORGE_MCP_URL"): - # os.environ["FUZZFORGE_MCP_URL"] = os.getenv( - # "FUZZFORGE_DEFAULT_MCP_URL", - # "http://localhost:8010/mcp", - # ) + # FuzzForge MCP backend connection - fallback if not in .env + if not os.getenv("FUZZFORGE_MCP_URL"): + os.environ["FUZZFORGE_MCP_URL"] = os.getenv( + "FUZZFORGE_DEFAULT_MCP_URL", + "http://localhost:8010/mcp", + ) def refresh(self) -> None: """Reload configuration from disk.""" diff --git a/cli/src/fuzzforge_cli/database.py b/cli/src/fuzzforge_cli/database.py index cd42c40..3c8e86c 100644 --- a/cli/src/fuzzforge_cli/database.py +++ b/cli/src/fuzzforge_cli/database.py @@ -152,7 +152,7 @@ class FuzzForgeDatabase: if conn: try: conn.rollback() - except: + except Exception: pass # Connection might be broken if "database is locked" in str(e).lower(): raise sqlite3.OperationalError( @@ -167,14 +167,14 @@ class FuzzForgeDatabase: if conn: try: conn.rollback() 
- except: + except Exception: pass # Connection might be broken raise finally: if conn: try: conn.close() - except: + except Exception: pass # Ensure cleanup even if close fails # Run management methods diff --git a/cli/src/fuzzforge_cli/exceptions.py b/cli/src/fuzzforge_cli/exceptions.py index 55a6f27..d1137f3 100644 --- a/cli/src/fuzzforge_cli/exceptions.py +++ b/cli/src/fuzzforge_cli/exceptions.py @@ -259,12 +259,6 @@ def handle_error(error: Exception, context: str = "") -> None: if hasattr(error, 'context') and error.context: ctx = error.context - # Container diagnostics - if ctx.container_diagnostics: - console.print("\n[bold]Container Diagnostics:[/bold]") - display_container_diagnostics(ctx.container_diagnostics) - display_container_logs(ctx.container_diagnostics) - # Error patterns if ctx.error_patterns: display_error_patterns(ctx.error_patterns) diff --git a/cli/src/fuzzforge_cli/main.py b/cli/src/fuzzforge_cli/main.py index 15a8530..5726275 100644 --- a/cli/src/fuzzforge_cli/main.py +++ b/cli/src/fuzzforge_cli/main.py @@ -20,7 +20,6 @@ from typing import Optional, List import sys from .commands import ( - init, workflows, workflow_exec, findings, diff --git a/docker-compose.yml b/docker-compose.yml index d342995..18fe145 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -259,6 +259,8 @@ services: volumes: # Mount workflow code (read-only) for dynamic discovery - ./backend/toolbox:/app/toolbox:ro + # Mount AI module for A2A wrapper access + - ./ai/src:/app/ai_src:ro # Worker cache for downloaded targets - worker_python_cache:/cache networks: diff --git a/sdk/test_exception_handling.py b/sdk/test_exception_handling.py index 7330db0..fe60f72 100644 --- a/sdk/test_exception_handling.py +++ b/sdk/test_exception_handling.py @@ -1,3 +1,4 @@ #!/usr/bin/env python3 +# ruff: noqa: E402 # Imports delayed for environment/logging setup """ Quick smoke test for SDK exception handling after exceptions.py modifications. 
diff --git a/test_a2a_wrapper.py b/test_a2a_wrapper.py index 8e97065..5748eb0 100755 --- a/test_a2a_wrapper.py +++ b/test_a2a_wrapper.py @@ -1,3 +1,4 @@ #!/usr/bin/env python3 +# ruff: noqa: E402 # Imports delayed for environment/logging setup """ Test script for A2A wrapper module @@ -103,7 +104,7 @@ async def test_multi_turn(): context="geography_quiz", timeout=30 ) - print(f"Q1: What is the capital of France?") + print("Q1: What is the capital of France?") print(f"A1: {result1.text}") print() @@ -114,7 +115,7 @@ async def test_multi_turn(): context="geography_quiz", # Same context timeout=30 ) - print(f"Q2: What is the population of that city?") + print("Q2: What is the population of that city?") print(f"A2: {result2.text}") print() diff --git a/workers/python/Dockerfile b/workers/python/Dockerfile index 09b4689..54a8cb8 100644 --- a/workers/python/Dockerfile +++ b/workers/python/Dockerfile @@ -35,8 +35,8 @@ RUN mkdir -p /cache && chmod 755 /cache # Copy worker entrypoint COPY worker.py /app/worker.py -# Add toolbox to Python path (mounted at runtime) -ENV PYTHONPATH="/app:/app/toolbox:${PYTHONPATH}" +# Add toolbox and AI module to Python path (mounted at runtime) +ENV PYTHONPATH="/app:/app/toolbox:/app/ai_src:${PYTHONPATH}" ENV PYTHONUNBUFFERED=1 # Healthcheck diff --git a/workers/python/requirements.txt b/workers/python/requirements.txt index ecfd2c8..2e30f4a 100644 --- a/workers/python/requirements.txt +++ b/workers/python/requirements.txt @@ -11,5 +11,8 @@ pyyaml>=6.0.0 # HTTP Client (for real-time stats reporting) httpx>=0.27.0 +# A2A Agent Communication +a2a-sdk[all]>=0.1.0 + # Fuzzing atheris>=2.3.0
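--

Appendix: the argument-ordering contract that the TemporalManager hunk above relies on, as a minimal runnable sketch. The helper name build_workflow_args is illustrative only; in the patch the equivalent logic is inlined in TemporalManager, and the schema shape mirrors the parameters block of metadata.yaml.

def build_workflow_args(target_id, workflow_params, metadata):
    """Build positional workflow args in metadata schema order (illustrative)."""
    args = [target_id]
    properties = metadata.get("parameters", {}).get("properties", {})
    # Python dicts preserve insertion order, so schema order maps onto
    # the workflow run() signature order
    for name in properties:
        args.append(workflow_params.get(name))  # None when a param is omitted
    return args

# With the llm_analysis schema order (agent_url, llm_model, llm_provider,
# file_patterns, max_files, max_file_size, timeout):
metadata = {"parameters": {"properties": {
    "agent_url": {}, "llm_model": {}, "llm_provider": {}, "file_patterns": {},
    "max_files": {}, "max_file_size": {}, "timeout": {},
}}}
assert build_workflow_args("uuid-1234", {"llm_model": "gpt-4o-mini"}, metadata) == [
    "uuid-1234", None, "gpt-4o-mini", None, None, None, None, None,
]

Omitted parameters are passed as None, which is why every parameter after target_id in LlmAnalysisWorkflow.run() is declared Optional with a None default.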
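The analyzer and its parser agree on a plain-text record format rather than JSON. A sketch of one round trip follows; the model reply below is invented for illustration, while the parsing behavior matches _parse_llm_response and severity_map in llm_analyzer.py.

sample_response = (
    "ISSUE: SQL query built by string concatenation\n"
    "SEVERITY: error\n"
    "LINE: 42\n"
    "DESCRIPTION: User input is interpolated into the query, enabling SQL injection.\n"
)
# The line-oriented parser accumulates this record into:
#   {"title": "SQL query built by string concatenation",
#    "severity": "error",   # severity_map translates "error" -> "critical"
#    "line": 42,
#    "description": "User input is interpolated into the query, ..."}
# create_finding() then wraps it as a ModuleFinding with category="security"
# and metadata {"tool": "llm-analyzer", "type": "llm-security-analysis"}.
# A reply of "NO_ISSUES_FOUND" short-circuits to an empty findings list.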
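Downstream, llm_generate_sarif maps that finding into a SARIF 2.1.0 result: "critical" and "high" become level "error", "medium" becomes "warning", and "low"/"info" become "note". Roughly (the file path here is a placeholder, and ruleId falls back to "unknown" when a finding carries no id):

sarif_result = {
    "ruleId": "unknown",
    "level": "error",  # from severity "critical"
    "message": {
        "text": "SQL query built by string concatenation",
        "markdown": "User input is interpolated into the query, enabling SQL injection.",
    },
    "locations": [{
        "physicalLocation": {
            "artifactLocation": {"uri": "app/db.py"},  # placeholder path
            "region": {"startLine": 42},
        }
    }],
}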