feat: Add LLM analysis workflow and ruff linter fixes

LLM Analysis Workflow:
- Add llm_analyzer module for AI-powered code security analysis
- Add llm_analysis workflow with SARIF output support
- Mount AI module in Python worker for A2A wrapper access
- Add a2a-sdk dependency to Python worker requirements
- Fix workflow parameter ordering in Temporal manager

Ruff Linter Fixes:
- Fix bare except clauses (E722) across AI and CLI modules
- Add noqa comments for intentional late imports (E402)
- Replace undefined get_ai_status_async with TODO placeholder
- Remove unused imports and variables
- Remove container diagnostics display from exception handler

MCP Configuration:
- Reactivate FUZZFORGE_MCP_URL with default value
- Set default MCP URL to http://localhost:8010/mcp in init
This commit is contained in:
tduhamel42
2025-10-14 16:43:14 +02:00
parent 71fbe879df
commit ff00146f20
23 changed files with 882 additions and 74 deletions
+1
View File
@@ -1,3 +1,4 @@
# ruff: noqa: E402 # Imports delayed for environment/logging setup
"""
FuzzForge A2A Server
Run this to expose FuzzForge as an A2A-compatible agent
+1 -1
View File
@@ -15,7 +15,7 @@ Programmatic interface to send tasks to A2A agents with custom model/prompt/cont
from __future__ import annotations
from typing import Optional, Dict, Any
from typing import Optional, Any
from uuid import uuid4
import httpx
+5 -4
View File
@@ -1,3 +1,4 @@
# ruff: noqa: E402 # Imports delayed for environment/logging setup
"""FuzzForge Agent Executor - orchestrates workflows and delegation."""
# Copyright (c) 2025 FuzzingLabs
#
@@ -173,7 +174,7 @@ class FuzzForgeExecutor:
else:
# Run now if no loop is running
loop.run_until_complete(self._register_agent_async(url, name))
except:
except Exception:
# Ignore auto-registration failures
pass
except Exception as e:
@@ -451,11 +452,11 @@ class FuzzForgeExecutor:
try:
user = await get_user(user_email)
logger.info(f"Using existing user: {user_email}")
except:
except Exception:
try:
user = await create_user(user_email, user_tenant)
logger.info(f"Created new user: {user_email}")
except:
except Exception:
user = None
if user:
@@ -1704,7 +1705,7 @@ Be concise and intelligent in your responses."""
if self.agentops_trace:
try:
agentops.end_trace()
except:
except Exception:
pass
# Cancel background monitors
+2 -1
View File
@@ -1,3 +1,4 @@
# ruff: noqa: E402 # Imports delayed for environment/logging setup
#!/usr/bin/env python3
# Copyright (c) 2025 FuzzingLabs
#
@@ -428,7 +429,7 @@ class FuzzForgeCLI:
text = data['parts'][0].get('text', '')[:150]
role = data.get('role', 'unknown')
console.print(f"{i}. [{role}]: {text}...")
except:
except Exception:
console.print(f"{i}. {content[:150]}...")
else:
console.print("[yellow]No matches found in SQLite either[/yellow]")
+1 -1
View File
@@ -156,7 +156,7 @@ class CogneeService:
self._user = await get_user(fallback_email)
logger.info(f"Using existing user: {fallback_email}")
return
except:
except Exception:
# User doesn't exist, try to create fallback
pass
+1 -1
View File
@@ -37,7 +37,7 @@ class RemoteAgentConnection:
response.raise_for_status()
self.agent_card = response.json()
return self.agent_card
except:
except Exception:
# Try old path for compatibility
try:
response = await self.client.get(f"{self.url}/.well-known/agent.json")
+11 -6
View File
@@ -181,15 +181,18 @@ class TemporalManager:
# Prepare workflow input arguments
workflow_params = workflow_params or {}
# Build args list: [target_id, ...workflow_params values]
# Build args list: [target_id, ...workflow_params in schema order]
# The workflow parameters are passed as individual positional args
workflow_args = [target_id]
# Add parameters in order based on workflow signature
# For security_assessment: scanner_config, analyzer_config, reporter_config
# For atheris_fuzzing: target_file, max_iterations, timeout_seconds
if workflow_params:
workflow_args.extend(workflow_params.values())
# Add parameters in order based on metadata schema
# This ensures parameters match the workflow signature order
if workflow_params and 'parameters' in workflow_info.metadata:
param_schema = workflow_info.metadata['parameters'].get('properties', {})
# Iterate parameters in schema order and add values
for param_name in param_schema.keys():
param_value = workflow_params.get(param_name)
workflow_args.append(param_value)
# Determine task queue from workflow vertical
vertical = workflow_info.metadata.get("vertical", "default")
@@ -199,6 +202,8 @@ class TemporalManager:
f"Starting workflow: {workflow_name} "
f"(id={workflow_id}, queue={task_queue}, target={target_id})"
)
logger.info(f"DEBUG: workflow_args = {workflow_args}")
logger.info(f"DEBUG: workflow_params received = {workflow_params}")
try:
# Start workflow execution with positional arguments
@@ -0,0 +1,349 @@
"""
LLM Analyzer Module - Uses AI to analyze code for security issues
"""
# Copyright (c) 2025 FuzzingLabs
#
# Licensed under the Business Source License 1.1 (BSL). See the LICENSE file
# at the root of this repository for details.
#
# After the Change Date (four years from publication), this version of the
# Licensed Work will be made available under the Apache License, Version 2.0.
# See the LICENSE-APACHE file or http://www.apache.org/licenses/LICENSE-2.0
#
# Additional attribution and requirements are provided in the NOTICE file.
import logging
from pathlib import Path
from typing import Dict, Any, List
try:
from toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult
except ImportError:
try:
from modules.base import BaseModule, ModuleMetadata, ModuleResult
except ImportError:
from src.toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult
logger = logging.getLogger(__name__)
class LLMAnalyzer(BaseModule):
"""
Uses an LLM to analyze code for potential security issues.
This module:
- Sends code to an LLM agent via A2A protocol
- Asks the LLM to identify security vulnerabilities
- Collects findings and returns them in structured format
"""
def get_metadata(self) -> ModuleMetadata:
"""Get module metadata"""
return ModuleMetadata(
name="llm_analyzer",
version="1.0.0",
description="Uses AI to analyze code for security issues",
author="FuzzForge Team",
category="analyzer",
tags=["llm", "ai", "security", "analysis"],
input_schema={
"agent_url": {
"type": "string",
"description": "A2A agent endpoint URL",
"default": "http://fuzzforge-task-agent:8000/a2a/litellm_agent"
},
"llm_model": {
"type": "string",
"description": "LLM model to use",
"default": "gpt-4o-mini"
},
"llm_provider": {
"type": "string",
"description": "LLM provider (openai, anthropic, etc.)",
"default": "openai"
},
"file_patterns": {
"type": "array",
"items": {"type": "string"},
"description": "File patterns to analyze",
"default": ["*.py", "*.js", "*.ts", "*.java", "*.go"]
},
"max_files": {
"type": "integer",
"description": "Maximum number of files to analyze",
"default": 5
},
"max_file_size": {
"type": "integer",
"description": "Maximum file size in bytes",
"default": 50000 # 50KB
},
"timeout": {
"type": "integer",
"description": "Timeout per file in seconds",
"default": 60
}
},
output_schema={
"findings": {
"type": "array",
"description": "Security issues identified by LLM"
}
},
requires_workspace=True
)
def validate_config(self, config: Dict[str, Any]) -> bool:
"""Validate module configuration"""
# Lazy import to avoid Temporal sandbox restrictions
try:
from fuzzforge_ai.a2a_wrapper import send_agent_task # noqa: F401
except ImportError:
raise RuntimeError(
"A2A wrapper not available. Ensure fuzzforge_ai module is accessible."
)
agent_url = config.get("agent_url")
if not agent_url or not isinstance(agent_url, str):
raise ValueError("agent_url must be a valid URL string")
max_files = config.get("max_files", 5)
if not isinstance(max_files, int) or max_files <= 0:
raise ValueError("max_files must be a positive integer")
return True
async def execute(self, config: Dict[str, Any], workspace: Path) -> ModuleResult:
"""
Execute the LLM analysis module.
Args:
config: Module configuration
workspace: Path to the workspace containing code to analyze
Returns:
ModuleResult with findings from LLM analysis
"""
# Start execution timer
self.start_timer()
logger.info(f"Starting LLM analysis in workspace: {workspace}")
# Extract configuration
agent_url = config.get("agent_url", "http://fuzzforge-task-agent:8000/a2a/litellm_agent")
llm_model = config.get("llm_model", "gpt-4o-mini")
llm_provider = config.get("llm_provider", "openai")
file_patterns = config.get("file_patterns", ["*.py", "*.js", "*.ts", "*.java", "*.go"])
max_files = config.get("max_files", 5)
max_file_size = config.get("max_file_size", 50000)
timeout = config.get("timeout", 60)
# Find files to analyze
files_to_analyze = []
for pattern in file_patterns:
for file_path in workspace.rglob(pattern):
if file_path.is_file():
try:
# Check file size
if file_path.stat().st_size > max_file_size:
logger.debug(f"Skipping {file_path} (too large)")
continue
files_to_analyze.append(file_path)
if len(files_to_analyze) >= max_files:
break
except Exception as e:
logger.warning(f"Error checking file {file_path}: {e}")
continue
if len(files_to_analyze) >= max_files:
break
logger.info(f"Found {len(files_to_analyze)} files to analyze")
# Analyze each file
all_findings = []
for file_path in files_to_analyze:
logger.info(f"Analyzing: {file_path.relative_to(workspace)}")
try:
findings = await self._analyze_file(
file_path=file_path,
workspace=workspace,
agent_url=agent_url,
llm_model=llm_model,
llm_provider=llm_provider,
timeout=timeout
)
all_findings.extend(findings)
except Exception as e:
logger.error(f"Error analyzing {file_path}: {e}")
# Continue with next file
continue
logger.info(f"LLM analysis complete. Found {len(all_findings)} issues.")
# Create result using base module helper
return self.create_result(
findings=all_findings,
status="success",
summary={
"files_analyzed": len(files_to_analyze),
"total_findings": len(all_findings),
"agent_url": agent_url,
"model": f"{llm_provider}/{llm_model}"
}
)
async def _analyze_file(
self,
file_path: Path,
workspace: Path,
agent_url: str,
llm_model: str,
llm_provider: str,
timeout: int
) -> List[Dict[str, Any]]:
"""Analyze a single file with LLM"""
# Read file content
try:
with open(file_path, 'r', encoding='utf-8') as f:
code_content = f.read()
except Exception as e:
logger.error(f"Failed to read {file_path}: {e}")
return []
# Determine language from extension
extension = file_path.suffix.lower()
language_map = {
".py": "python",
".js": "javascript",
".ts": "typescript",
".java": "java",
".go": "go",
".rs": "rust",
".c": "c",
".cpp": "cpp",
}
language = language_map.get(extension, "code")
# Build prompt for LLM
system_prompt = (
"You are a security code analyzer. Analyze the provided code and identify "
"potential security vulnerabilities, bugs, and code quality issues. "
"For each issue found, respond in this exact format:\n"
"ISSUE: [short title]\n"
"SEVERITY: [error/warning/note]\n"
"LINE: [line number or 'unknown']\n"
"DESCRIPTION: [detailed explanation]\n\n"
"If no issues are found, respond with 'NO_ISSUES_FOUND'."
)
user_message = (
f"Analyze this {language} code for security vulnerabilities:\n\n"
f"File: {file_path.relative_to(workspace)}\n\n"
f"```{language}\n{code_content}\n```"
)
# Call LLM via A2A wrapper (lazy import to avoid Temporal sandbox restrictions)
try:
from fuzzforge_ai.a2a_wrapper import send_agent_task
result = await send_agent_task(
url=agent_url,
model=llm_model,
provider=llm_provider,
prompt=system_prompt,
message=user_message,
context=f"llm_analysis_{file_path.stem}",
timeout=float(timeout)
)
llm_response = result.text
except Exception as e:
logger.error(f"A2A call failed for {file_path}: {e}")
return []
# Parse LLM response into findings
findings = self._parse_llm_response(
llm_response=llm_response,
file_path=file_path,
workspace=workspace
)
return findings
def _parse_llm_response(
self,
llm_response: str,
file_path: Path,
workspace: Path
) -> List:
"""Parse LLM response into structured findings"""
if "NO_ISSUES_FOUND" in llm_response:
return []
findings = []
relative_path = str(file_path.relative_to(workspace))
# Simple parser for the expected format
lines = llm_response.split('\n')
current_issue = {}
for line in lines:
line = line.strip()
if line.startswith("ISSUE:"):
# Save previous issue if exists
if current_issue:
findings.append(self._create_module_finding(current_issue, relative_path))
current_issue = {"title": line.replace("ISSUE:", "").strip()}
elif line.startswith("SEVERITY:"):
current_issue["severity"] = line.replace("SEVERITY:", "").strip().lower()
elif line.startswith("LINE:"):
line_num = line.replace("LINE:", "").strip()
try:
current_issue["line"] = int(line_num)
except ValueError:
current_issue["line"] = None
elif line.startswith("DESCRIPTION:"):
current_issue["description"] = line.replace("DESCRIPTION:", "").strip()
# Save last issue
if current_issue:
findings.append(self._create_module_finding(current_issue, relative_path))
return findings
def _create_module_finding(self, issue: Dict[str, Any], file_path: str):
"""Create a ModuleFinding from parsed issue"""
severity_map = {
"error": "critical",
"warning": "medium",
"note": "low",
"info": "low"
}
# Use base class helper to create proper ModuleFinding
return self.create_finding(
title=issue.get("title", "Security issue detected"),
description=issue.get("description", ""),
severity=severity_map.get(issue.get("severity", "warning"), "medium"),
category="security",
file_path=file_path,
line_start=issue.get("line"),
metadata={
"tool": "llm-analyzer",
"type": "llm-security-analysis"
}
)
@@ -0,0 +1,19 @@
"""
LLM Analysis Workflow
"""
# Copyright (c) 2025 FuzzingLabs
#
# Licensed under the Business Source License 1.1 (BSL). See the LICENSE file
# at the root of this repository for details.
#
# After the Change Date (four years from publication), this version of the
# Licensed Work will be made available under the Apache License, Version 2.0.
# See the LICENSE-APACHE file or http://www.apache.org/licenses/LICENSE-2.0
#
# Additional attribution and requirements are provided in the NOTICE file.
from .workflow import LlmAnalysisWorkflow
from .activities import analyze_with_llm
__all__ = ["LlmAnalysisWorkflow", "analyze_with_llm"]
@@ -0,0 +1,162 @@
"""
LLM Analysis Workflow Activities
"""
# Copyright (c) 2025 FuzzingLabs
#
# Licensed under the Business Source License 1.1 (BSL). See the LICENSE file
# at the root of this repository for details.
#
# After the Change Date (four years from publication), this version of the
# Licensed Work will be made available under the Apache License, Version 2.0.
# See the LICENSE-APACHE file or http://www.apache.org/licenses/LICENSE-2.0
#
# Additional attribution and requirements are provided in the NOTICE file.
import logging
from pathlib import Path
from typing import Dict, Any
from temporalio import activity
try:
from toolbox.modules.analyzer.llm_analyzer import LLMAnalyzer
except ImportError:
try:
from modules.analyzer.llm_analyzer import LLMAnalyzer
except ImportError:
from src.toolbox.modules.analyzer.llm_analyzer import LLMAnalyzer
logger = logging.getLogger(__name__)
@activity.defn(name="llm_generate_sarif")
async def llm_generate_sarif(findings: list, metadata: Dict[str, Any]) -> Dict[str, Any]:
"""
Generate SARIF report from LLM findings.
Args:
findings: List of finding dictionaries
metadata: Metadata including tool_name, tool_version, run_id
Returns:
SARIF report dictionary
"""
activity.logger.info(f"Generating SARIF report from {len(findings)} findings")
# Basic SARIF 2.1.0 structure
sarif_report = {
"version": "2.1.0",
"$schema": "https://raw.githubusercontent.com/oasis-tcs/sarif-spec/master/Schemata/sarif-schema-2.1.0.json",
"runs": [
{
"tool": {
"driver": {
"name": metadata.get("tool_name", "llm-analyzer"),
"version": metadata.get("tool_version", "1.0.0"),
"informationUri": "https://github.com/FuzzingLabs/fuzzforge_ai"
}
},
"results": []
}
]
}
# Convert findings to SARIF results
for finding in findings:
sarif_result = {
"ruleId": finding.get("id", "unknown"),
"level": _severity_to_sarif_level(finding.get("severity", "warning")),
"message": {
"text": finding.get("title", "Security issue detected")
},
"locations": []
}
# Add description if present
if finding.get("description"):
sarif_result["message"]["markdown"] = finding["description"]
# Add location if file path is present
if finding.get("file_path"):
location = {
"physicalLocation": {
"artifactLocation": {
"uri": finding["file_path"]
}
}
}
# Add region if line number is present
if finding.get("line_start"):
location["physicalLocation"]["region"] = {
"startLine": finding["line_start"]
}
if finding.get("line_end"):
location["physicalLocation"]["region"]["endLine"] = finding["line_end"]
sarif_result["locations"].append(location)
sarif_report["runs"][0]["results"].append(sarif_result)
activity.logger.info(f"Generated SARIF report with {len(sarif_report['runs'][0]['results'])} results")
return sarif_report
def _severity_to_sarif_level(severity: str) -> str:
"""Convert severity to SARIF level"""
severity_map = {
"critical": "error",
"high": "error",
"medium": "warning",
"low": "note",
"info": "note"
}
return severity_map.get(severity.lower(), "warning")
@activity.defn(name="analyze_with_llm")
async def analyze_with_llm(target_path: str, config: Dict[str, Any]) -> Dict[str, Any]:
"""
Analyze code using LLM.
Args:
target_path: Path to the workspace containing code
config: LLM analyzer configuration
Returns:
Dictionary containing findings and summary
"""
activity.logger.info(f"Starting LLM analysis: {target_path}")
activity.logger.info(f"Config: {config}")
workspace = Path(target_path)
if not workspace.exists():
raise FileNotFoundError(f"Workspace not found: {target_path}")
# Create and execute LLM analyzer
analyzer = LLMAnalyzer()
# Validate configuration
analyzer.validate_config(config)
# Execute analysis
result = await analyzer.execute(config, workspace)
if result.status == "failed":
raise RuntimeError(f"LLM analysis failed: {result.error or 'Unknown error'}")
activity.logger.info(
f"LLM analysis completed: {len(result.findings)} findings from "
f"{result.summary.get('files_analyzed', 0)} files"
)
# Convert ModuleFinding objects to dicts for serialization
findings_dicts = [finding.model_dump() for finding in result.findings]
return {
"findings": findings_dicts,
"summary": result.summary
}
@@ -0,0 +1,64 @@
name: llm_analysis
version: "1.0.0"
vertical: python
description: "Uses AI/LLM to analyze code for security vulnerabilities and code quality issues"
author: "FuzzForge Team"
tags:
- "llm"
- "ai"
- "security"
- "static-analysis"
- "code-quality"
# Workspace isolation mode
workspace_isolation: "shared"
default_parameters:
agent_url: "http://fuzzforge-task-agent:8000/a2a/litellm_agent"
llm_model: "gpt-4o-mini"
llm_provider: "openai"
max_files: 5
parameters:
type: object
properties:
agent_url:
type: string
description: "A2A agent endpoint URL"
llm_model:
type: string
description: "LLM model to use (e.g., gpt-4o-mini, claude-3-5-sonnet)"
llm_provider:
type: string
description: "LLM provider (openai, anthropic, etc.)"
file_patterns:
type: array
items:
type: string
description: "File patterns to analyze (e.g., ['*.py', '*.js'])"
max_files:
type: integer
description: "Maximum number of files to analyze"
max_file_size:
type: integer
description: "Maximum file size in bytes"
timeout:
type: integer
description: "Timeout per file in seconds"
output_schema:
type: object
properties:
sarif:
type: object
description: "SARIF-formatted security findings from LLM"
summary:
type: object
description: "Analysis summary"
properties:
files_analyzed:
type: integer
total_findings:
type: integer
model_used:
type: string
@@ -0,0 +1,236 @@
"""
LLM Analysis Workflow - Temporal Version
Uses AI/LLM to analyze code for security issues.
"""
# Copyright (c) 2025 FuzzingLabs
#
# Licensed under the Business Source License 1.1 (BSL). See the LICENSE file
# at the root of this repository for details.
#
# After the Change Date (four years from publication), this version of the
# Licensed Work will be made available under the Apache License, Version 2.0.
# See the LICENSE-APACHE file or http://www.apache.org/licenses/LICENSE-2.0
#
# Additional attribution and requirements are provided in the NOTICE file.
from datetime import timedelta
from typing import Dict, Any, Optional
from temporalio import workflow
from temporalio.common import RetryPolicy
# Import for type hints (will be executed by worker)
with workflow.unsafe.imports_passed_through():
import logging
logger = logging.getLogger(__name__)
@workflow.defn
class LlmAnalysisWorkflow:
"""
Analyze code using AI/LLM for security vulnerabilities.
User workflow:
1. User runs: ff workflow run llm_analysis .
2. CLI uploads project to MinIO
3. Worker downloads project
4. Worker calls LLM analyzer module
5. LLM analyzes code files and reports findings
6. Results returned in SARIF format
"""
@workflow.run
async def run(
self,
target_id: str, # MinIO UUID of uploaded user code
agent_url: Optional[str] = None,
llm_model: Optional[str] = None,
llm_provider: Optional[str] = None,
file_patterns: Optional[list] = None,
max_files: Optional[int] = None,
max_file_size: Optional[int] = None,
timeout: Optional[int] = None
) -> Dict[str, Any]:
"""
Main workflow execution.
Args:
target_id: UUID of the uploaded target in MinIO
agent_url: A2A agent endpoint URL
llm_model: LLM model to use
llm_provider: LLM provider
file_patterns: File patterns to analyze
max_files: Maximum number of files to analyze
max_file_size: Maximum file size in bytes
timeout: Timeout per file in seconds
Returns:
Dictionary containing findings and summary
"""
workflow_id = workflow.info().workflow_id
workflow.logger.info(
f"Starting LLMAnalysisWorkflow "
f"(workflow_id={workflow_id}, target_id={target_id}, model={llm_model})"
)
results = {
"workflow_id": workflow_id,
"target_id": target_id,
"status": "running",
"steps": [],
"findings": []
}
try:
# Get run ID for workspace isolation
run_id = workflow.info().run_id
# Step 1: Download user's project from MinIO
workflow.logger.info("Step 1: Downloading user code from MinIO")
target_path = await workflow.execute_activity(
"get_target",
args=[target_id, run_id, "shared"],
start_to_close_timeout=timedelta(minutes=5),
retry_policy=RetryPolicy(
initial_interval=timedelta(seconds=1),
maximum_interval=timedelta(seconds=30),
maximum_attempts=3
)
)
results["steps"].append({
"step": "download",
"status": "success",
"target_path": target_path
})
workflow.logger.info(f"✓ Target downloaded to: {target_path}")
# Step 2: Run LLM analysis
workflow.logger.info("Step 2: Analyzing code with LLM")
# Build analyzer config
analyzer_config = {}
if agent_url:
analyzer_config["agent_url"] = agent_url
if llm_model:
analyzer_config["llm_model"] = llm_model
if llm_provider:
analyzer_config["llm_provider"] = llm_provider
if file_patterns:
analyzer_config["file_patterns"] = file_patterns
if max_files is not None:
analyzer_config["max_files"] = max_files
if max_file_size is not None:
analyzer_config["max_file_size"] = max_file_size
if timeout is not None:
analyzer_config["timeout"] = timeout
analysis_results = await workflow.execute_activity(
"analyze_with_llm",
args=[target_path, analyzer_config],
start_to_close_timeout=timedelta(minutes=30), # LLM calls can be slow
retry_policy=RetryPolicy(
initial_interval=timedelta(seconds=5),
maximum_interval=timedelta(minutes=1),
maximum_attempts=2
)
)
findings = analysis_results.get("findings", [])
summary = analysis_results.get("summary", {})
results["steps"].append({
"step": "llm_analysis",
"status": "success",
"files_analyzed": summary.get("files_analyzed", 0),
"findings_count": len(findings)
})
workflow.logger.info(
f"✓ LLM analysis completed: "
f"{summary.get('files_analyzed', 0)} files, "
f"{len(findings)} findings"
)
# Step 3: Generate SARIF report
workflow.logger.info("Step 3: Generating SARIF report")
sarif_report = await workflow.execute_activity(
"llm_generate_sarif",
args=[findings, {
"tool_name": "llm-analyzer",
"tool_version": "1.0.0",
"run_id": run_id
}],
start_to_close_timeout=timedelta(minutes=5),
retry_policy=RetryPolicy(
initial_interval=timedelta(seconds=1),
maximum_interval=timedelta(seconds=30),
maximum_attempts=3
)
)
results["steps"].append({
"step": "sarif_generation",
"status": "success",
"results_count": len(sarif_report.get("runs", [{}])[0].get("results", []))
})
workflow.logger.info(
f"✓ SARIF report generated: "
f"{len(sarif_report.get('runs', [{}])[0].get('results', []))} results"
)
# Step 4: Upload results to MinIO
workflow.logger.info("Step 4: Uploading results to MinIO")
# Upload SARIF report
if sarif_report:
results_url = await workflow.execute_activity(
"upload_results",
args=[run_id, sarif_report],
start_to_close_timeout=timedelta(minutes=5),
retry_policy=RetryPolicy(
initial_interval=timedelta(seconds=1),
maximum_interval=timedelta(seconds=30),
maximum_attempts=3
)
)
results["results_url"] = results_url
workflow.logger.info(f"✓ Results uploaded to: {results_url}")
# Step 5: Cleanup cache
workflow.logger.info("Step 5: Cleaning up cache")
await workflow.execute_activity(
"cleanup_cache",
args=[target_id],
start_to_close_timeout=timedelta(minutes=2),
retry_policy=RetryPolicy(
initial_interval=timedelta(seconds=1),
maximum_interval=timedelta(seconds=10),
maximum_attempts=2
)
)
workflow.logger.info("✓ Cache cleaned up")
# Mark workflow as successful
results["status"] = "success"
results["sarif"] = sarif_report
results["summary"] = summary
results["findings"] = findings
workflow.logger.info(
f"✅ LLMAnalysisWorkflow completed successfully: "
f"{len(findings)} findings"
)
except Exception as e:
workflow.logger.error(f"❌ Workflow failed: {e}")
results["status"] = "failed"
results["error"] = str(e)
raise
return results
+9 -36
View File
@@ -18,7 +18,6 @@ import os
import typer
from rich.console import Console
from rich.table import Table
console = Console()
@@ -49,41 +48,15 @@ def ai_agent() -> None:
@app.command("status")
def ai_status() -> None:
"""Show AI system health and configuration."""
try:
status = asyncio.run(get_ai_status_async())
except Exception as exc: # pragma: no cover
console.print(f"[red]Failed to get AI status:[/red] {exc}")
raise typer.Exit(1) from exc
console.print("[bold cyan]🤖 FuzzForge AI System Status[/bold cyan]\n")
config_table = Table(title="Configuration", show_header=True, header_style="bold magenta")
config_table.add_column("Setting", style="bold")
config_table.add_column("Value", style="cyan")
config_table.add_column("Status", style="green")
for key, info in status["config"].items():
status_icon = "" if info["configured"] else ""
display_value = info["value"] if info["value"] else "-"
config_table.add_row(key, display_value, f"{status_icon}")
console.print(config_table)
console.print()
components_table = Table(title="AI Components", show_header=True, header_style="bold magenta")
components_table.add_column("Component", style="bold")
components_table.add_column("Status", style="green")
components_table.add_column("Details", style="dim")
for component, info in status["components"].items():
status_icon = "🟢" if info["available"] else "🔴"
components_table.add_row(component, status_icon, info["details"])
console.print(components_table)
if status["agents"]:
console.print()
console.print(f"[bold green]✓[/bold green] {len(status['agents'])} agents registered")
# TODO: Implement AI status checking
# This command is a placeholder for future health monitoring functionality
console.print("🚧 [yellow]AI status command is not yet implemented.[/yellow]")
console.print("\nPlanned features:")
console.print(" • LLM provider connectivity")
console.print(" • API key validation")
console.print(" • Registered agents status")
console.print(" • Memory/session persistence health")
console.print("\nFor now, use [cyan]ff ai agent[/cyan] to launch the AI agent.")
@app.command("server")
+1 -4
View File
@@ -205,9 +205,6 @@ def _ensure_env_file(fuzzforge_dir: Path, force: bool) -> None:
console=console,
)
enable_cognee = False
cognee_url = ""
session_db_path = fuzzforge_dir / "fuzzforge_sessions.db"
session_db_rel = session_db_path.relative_to(fuzzforge_dir.parent)
@@ -219,7 +216,7 @@ def _ensure_env_file(fuzzforge_dir: Path, force: bool) -> None:
f"LLM_MODEL={llm_model}",
f"LITELLM_MODEL={llm_model}",
f"OPENAI_API_KEY={api_key}",
f"FUZZFORGE_MCP_URL={os.getenv('FUZZFORGE_MCP_URL', 'http://localhost:8010/mcp')}",
"FUZZFORGE_MCP_URL=http://localhost:8010/mcp",
"",
"# Cognee configuration mirrors the primary LLM by default",
f"LLM_COGNEE_PROVIDER={llm_provider}",
+6 -6
View File
@@ -403,12 +403,12 @@ class ProjectConfigManager:
if max_tokens:
os.environ["LLM_MAX_TOKENS"] = str(max_tokens)
# Disabled - FuzzForge MCP backend connection
# if not os.getenv("FUZZFORGE_MCP_URL"):
# os.environ["FUZZFORGE_MCP_URL"] = os.getenv(
# "FUZZFORGE_DEFAULT_MCP_URL",
# "http://localhost:8010/mcp",
# )
# FuzzForge MCP backend connection - fallback if not in .env
if not os.getenv("FUZZFORGE_MCP_URL"):
os.environ["FUZZFORGE_MCP_URL"] = os.getenv(
"FUZZFORGE_DEFAULT_MCP_URL",
"http://localhost:8010/mcp",
)
def refresh(self) -> None:
"""Reload configuration from disk."""
+3 -3
View File
@@ -152,7 +152,7 @@ class FuzzForgeDatabase:
if conn:
try:
conn.rollback()
except:
except Exception:
pass # Connection might be broken
if "database is locked" in str(e).lower():
raise sqlite3.OperationalError(
@@ -167,14 +167,14 @@ class FuzzForgeDatabase:
if conn:
try:
conn.rollback()
except:
except Exception:
pass # Connection might be broken
raise
finally:
if conn:
try:
conn.close()
except:
except Exception:
pass # Ensure cleanup even if close fails
# Run management methods
-6
View File
@@ -259,12 +259,6 @@ def handle_error(error: Exception, context: str = "") -> None:
if hasattr(error, 'context') and error.context:
ctx = error.context
# Container diagnostics
if ctx.container_diagnostics:
console.print("\n[bold]Container Diagnostics:[/bold]")
display_container_diagnostics(ctx.container_diagnostics)
display_container_logs(ctx.container_diagnostics)
# Error patterns
if ctx.error_patterns:
display_error_patterns(ctx.error_patterns)
-1
View File
@@ -20,7 +20,6 @@ from typing import Optional, List
import sys
from .commands import (
init,
workflows,
workflow_exec,
findings,
+2
View File
@@ -259,6 +259,8 @@ services:
volumes:
# Mount workflow code (read-only) for dynamic discovery
- ./backend/toolbox:/app/toolbox:ro
# Mount AI module for A2A wrapper access
- ./ai/src:/app/ai_src:ro
# Worker cache for downloaded targets
- worker_python_cache:/cache
networks:
+1
View File
@@ -1,3 +1,4 @@
# ruff: noqa: E402 # Imports delayed for environment/logging setup
#!/usr/bin/env python3
"""
Quick smoke test for SDK exception handling after exceptions.py modifications.
+3 -2
View File
@@ -1,3 +1,4 @@
# ruff: noqa: E402 # Imports delayed for environment/logging setup
#!/usr/bin/env python3
"""
Test script for A2A wrapper module
@@ -103,7 +104,7 @@ async def test_multi_turn():
context="geography_quiz",
timeout=30
)
print(f"Q1: What is the capital of France?")
print("Q1: What is the capital of France?")
print(f"A1: {result1.text}")
print()
@@ -114,7 +115,7 @@ async def test_multi_turn():
context="geography_quiz", # Same context
timeout=30
)
print(f"Q2: What is the population of that city?")
print("Q2: What is the population of that city?")
print(f"A2: {result2.text}")
print()
+2 -2
View File
@@ -35,8 +35,8 @@ RUN mkdir -p /cache && chmod 755 /cache
# Copy worker entrypoint
COPY worker.py /app/worker.py
# Add toolbox to Python path (mounted at runtime)
ENV PYTHONPATH="/app:/app/toolbox:${PYTHONPATH}"
# Add toolbox and AI module to Python path (mounted at runtime)
ENV PYTHONPATH="/app:/app/toolbox:/app/ai_src:${PYTHONPATH}"
ENV PYTHONUNBUFFERED=1
# Healthcheck
+3
View File
@@ -11,5 +11,8 @@ pyyaml>=6.0.0
# HTTP Client (for real-time stats reporting)
httpx>=0.27.0
# A2A Agent Communication
a2a-sdk[all]>=0.1.0
# Fuzzing
atheris>=2.3.0