mirror of
https://github.com/FuzzingLabs/fuzzforge_ai.git
synced 2026-03-13 07:06:46 +00:00
Compare commits
20 Commits
refactor/r
...
master
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
07c32de294 | ||
|
|
bc5e9373ce | ||
|
|
73a0170d65 | ||
|
|
6cdd0caec0 | ||
|
|
462f6ed408 | ||
|
|
9cfbc29677 | ||
|
|
6ced81affc | ||
|
|
b975d285c6 | ||
|
|
1891a43189 | ||
|
|
a3441676a3 | ||
|
|
f192771b9b | ||
|
|
976947cf5c | ||
|
|
544569ddbd | ||
|
|
6f967fff63 | ||
|
|
47c254e2bd | ||
|
|
b137f48e7f | ||
|
|
f8002254e5 | ||
|
|
f2dca0a7e7 | ||
|
|
9376645197 | ||
|
|
3e0d1cd02f |
86
.github/workflows/ci.yml
vendored
Normal file
86
.github/workflows/ci.yml
vendored
Normal file
@@ -0,0 +1,86 @@
|
||||
name: CI
|
||||
|
||||
on:
|
||||
push:
|
||||
branches: [main, dev, feature/*]
|
||||
pull_request:
|
||||
branches: [main, dev]
|
||||
workflow_dispatch:
|
||||
|
||||
jobs:
|
||||
lint-and-typecheck:
|
||||
name: Lint & Type Check
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
|
||||
- name: Install uv
|
||||
uses: astral-sh/setup-uv@v5
|
||||
with:
|
||||
version: "latest"
|
||||
|
||||
- name: Set up Python
|
||||
run: uv python install 3.14
|
||||
|
||||
- name: Install dependencies
|
||||
run: uv sync
|
||||
|
||||
- name: Ruff check (fuzzforge-cli)
|
||||
run: |
|
||||
cd fuzzforge-cli
|
||||
uv run --extra lints ruff check src/
|
||||
|
||||
- name: Ruff check (fuzzforge-mcp)
|
||||
run: |
|
||||
cd fuzzforge-mcp
|
||||
uv run --extra lints ruff check src/
|
||||
|
||||
- name: Ruff check (fuzzforge-common)
|
||||
run: |
|
||||
cd fuzzforge-common
|
||||
uv run --extra lints ruff check src/
|
||||
|
||||
- name: Mypy type check (fuzzforge-cli)
|
||||
run: |
|
||||
cd fuzzforge-cli
|
||||
uv run --extra lints mypy src/
|
||||
|
||||
- name: Mypy type check (fuzzforge-mcp)
|
||||
run: |
|
||||
cd fuzzforge-mcp
|
||||
uv run --extra lints mypy src/
|
||||
|
||||
# NOTE: Mypy check for fuzzforge-common temporarily disabled
|
||||
# due to 37 pre-existing type errors in legacy code.
|
||||
# TODO: Fix type errors and re-enable strict checking
|
||||
#- name: Mypy type check (fuzzforge-common)
|
||||
# run: |
|
||||
# cd fuzzforge-common
|
||||
# uv run --extra lints mypy src/
|
||||
|
||||
test:
|
||||
name: Tests
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
|
||||
- name: Install uv
|
||||
uses: astral-sh/setup-uv@v5
|
||||
with:
|
||||
version: "latest"
|
||||
|
||||
- name: Set up Python
|
||||
run: uv python install 3.14
|
||||
|
||||
- name: Install dependencies
|
||||
run: uv sync --all-extras
|
||||
|
||||
- name: Run MCP tests
|
||||
run: |
|
||||
cd fuzzforge-mcp
|
||||
uv run --extra tests pytest -v
|
||||
|
||||
- name: Run common tests
|
||||
run: |
|
||||
cd fuzzforge-common
|
||||
uv run --extra tests pytest -v
|
||||
49
.github/workflows/mcp-server.yml
vendored
Normal file
49
.github/workflows/mcp-server.yml
vendored
Normal file
@@ -0,0 +1,49 @@
|
||||
name: MCP Server Smoke Test
|
||||
|
||||
on:
|
||||
push:
|
||||
branches: [main, dev]
|
||||
pull_request:
|
||||
branches: [main, dev]
|
||||
workflow_dispatch:
|
||||
|
||||
jobs:
|
||||
mcp-server:
|
||||
name: MCP Server Test
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
|
||||
- name: Install uv
|
||||
uses: astral-sh/setup-uv@v5
|
||||
with:
|
||||
version: "latest"
|
||||
|
||||
- name: Set up Python
|
||||
run: uv python install 3.14
|
||||
|
||||
- name: Install dependencies
|
||||
run: uv sync --all-extras
|
||||
|
||||
- name: Start MCP server in background
|
||||
run: |
|
||||
cd fuzzforge-mcp
|
||||
nohup uv run python -m fuzzforge_mcp.server > server.log 2>&1 &
|
||||
echo $! > server.pid
|
||||
sleep 3
|
||||
|
||||
- name: Run MCP tool tests
|
||||
run: |
|
||||
cd fuzzforge-mcp
|
||||
uv run --extra tests pytest tests/test_resources.py -v
|
||||
|
||||
- name: Stop MCP server
|
||||
if: always()
|
||||
run: |
|
||||
if [ -f fuzzforge-mcp/server.pid ]; then
|
||||
kill $(cat fuzzforge-mcp/server.pid) || true
|
||||
fi
|
||||
|
||||
- name: Show server logs
|
||||
if: failure()
|
||||
run: cat fuzzforge-mcp/server.log || true
|
||||
426
USAGE.md
426
USAGE.md
@@ -1,8 +1,9 @@
|
||||
# FuzzForge AI Usage Guide
|
||||
|
||||
This guide covers everything you need to know to get started with FuzzForge AI - from installation to running your first security research workflow with AI.
|
||||
This guide covers everything you need to know to get started with FuzzForge AI — from installation to linking your first MCP hub and running security research workflows with AI.
|
||||
|
||||
> **FuzzForge is designed to be used with AI agents** (GitHub Copilot, Claude, etc.) via MCP.
|
||||
> A terminal UI (`fuzzforge ui`) is provided for managing agents and hubs.
|
||||
> The CLI is available for advanced users but the primary experience is through natural language interaction with your AI assistant.
|
||||
|
||||
---
|
||||
@@ -12,8 +13,17 @@ This guide covers everything you need to know to get started with FuzzForge AI -
|
||||
- [Quick Start](#quick-start)
|
||||
- [Prerequisites](#prerequisites)
|
||||
- [Installation](#installation)
|
||||
- [Building Modules](#building-modules)
|
||||
- [MCP Server Configuration](#mcp-server-configuration)
|
||||
- [Terminal UI](#terminal-ui)
|
||||
- [Launching the UI](#launching-the-ui)
|
||||
- [Dashboard](#dashboard)
|
||||
- [Agent Setup](#agent-setup)
|
||||
- [Hub Manager](#hub-manager)
|
||||
- [MCP Hub System](#mcp-hub-system)
|
||||
- [What is an MCP Hub?](#what-is-an-mcp-hub)
|
||||
- [FuzzingLabs Security Hub](#fuzzinglabs-security-hub)
|
||||
- [Linking a Custom Hub](#linking-a-custom-hub)
|
||||
- [Building Hub Images](#building-hub-images)
|
||||
- [MCP Server Configuration (CLI)](#mcp-server-configuration-cli)
|
||||
- [GitHub Copilot](#github-copilot)
|
||||
- [Claude Code (CLI)](#claude-code-cli)
|
||||
- [Claude Desktop](#claude-desktop)
|
||||
@@ -27,7 +37,7 @@ This guide covers everything you need to know to get started with FuzzForge AI -
|
||||
## Quick Start
|
||||
|
||||
> **Prerequisites:** You need [uv](https://docs.astral.sh/uv/) and [Docker](https://docs.docker.com/get-docker/) installed.
|
||||
> See the [Prerequisites](#prerequisites) section for installation instructions.
|
||||
> See the [Prerequisites](#prerequisites) section for details.
|
||||
|
||||
```bash
|
||||
# 1. Clone and install
|
||||
@@ -35,20 +45,35 @@ git clone https://github.com/FuzzingLabs/fuzzforge_ai.git
|
||||
cd fuzzforge_ai
|
||||
uv sync
|
||||
|
||||
# 2. Build the module images (one-time setup)
|
||||
make build-modules
|
||||
# 2. Launch the terminal UI
|
||||
uv run fuzzforge ui
|
||||
|
||||
# 3. Install MCP for your AI agent
|
||||
uv run fuzzforge mcp install copilot # For VS Code + GitHub Copilot
|
||||
# OR
|
||||
uv run fuzzforge mcp install claude-code # For Claude Code CLI
|
||||
# 3. Press 'h' → "FuzzingLabs Hub" to clone & link the default security hub
|
||||
# 4. Select an agent row and press Enter to install the MCP server for your agent
|
||||
# 5. Build the Docker images for the hub tools (required before tools can run)
|
||||
./scripts/build-hub-images.sh
|
||||
|
||||
# 4. Restart your AI agent (VS Code, Claude, etc.)
|
||||
|
||||
# 5. Start talking to your AI:
|
||||
# "List available FuzzForge modules"
|
||||
# 6. Restart your AI agent and start talking:
|
||||
# "What security tools are available?"
|
||||
# "Scan this binary with binwalk and yara"
|
||||
# "Analyze this Rust crate for fuzzable functions"
|
||||
# "Start fuzzing the parse_input function"
|
||||
```
|
||||
|
||||
Or do it entirely from the command line:
|
||||
|
||||
```bash
|
||||
# Install MCP for your AI agent
|
||||
uv run fuzzforge mcp install copilot # For VS Code + GitHub Copilot
|
||||
# OR
|
||||
uv run fuzzforge mcp install claude-code # For Claude Code CLI
|
||||
|
||||
# Clone and link the default security hub
|
||||
git clone git@github.com:FuzzingLabs/mcp-security-hub.git ~/.fuzzforge/hubs/mcp-security-hub
|
||||
|
||||
# Build hub tool images (required — tools only run once their image is built)
|
||||
./scripts/build-hub-images.sh
|
||||
|
||||
# Restart your AI agent — done!
|
||||
```
|
||||
|
||||
> **Note:** FuzzForge uses Docker by default. Podman is also supported via `--engine podman`.
|
||||
@@ -59,9 +84,10 @@ uv run fuzzforge mcp install claude-code # For Claude Code CLI
|
||||
|
||||
Before installing FuzzForge AI, ensure you have:
|
||||
|
||||
- **Python 3.12+** - [Download Python](https://www.python.org/downloads/)
|
||||
- **uv** package manager - [Install uv](https://docs.astral.sh/uv/)
|
||||
- **Docker** - Container runtime ([Install Docker](https://docs.docker.com/get-docker/))
|
||||
- **Python 3.12+** — [Download Python](https://www.python.org/downloads/)
|
||||
- **uv** package manager — [Install uv](https://docs.astral.sh/uv/)
|
||||
- **Docker** — Container runtime ([Install Docker](https://docs.docker.com/get-docker/))
|
||||
- **Git** — For cloning hub repositories
|
||||
|
||||
### Installing uv
|
||||
|
||||
@@ -115,74 +141,164 @@ uv run fuzzforge --help
|
||||
|
||||
---
|
||||
|
||||
## Building Modules
|
||||
## Terminal UI
|
||||
|
||||
FuzzForge modules are containerized security tools. After cloning, you need to build them once:
|
||||
FuzzForge ships with a terminal user interface (TUI) built on [Textual](https://textual.textualize.io/) for managing AI agents and MCP hub servers from a single dashboard.
|
||||
|
||||
### Build All Modules
|
||||
### Launching the UI
|
||||
|
||||
```bash
|
||||
# From the fuzzforge_ai directory
|
||||
make build-modules
|
||||
uv run fuzzforge ui
|
||||
```
|
||||
|
||||
This builds all available modules:
|
||||
- `fuzzforge-rust-analyzer` - Analyzes Rust code for fuzzable functions
|
||||
- `fuzzforge-cargo-fuzzer` - Runs cargo-fuzz on Rust crates
|
||||
- `fuzzforge-harness-validator` - Validates generated fuzzing harnesses
|
||||
- `fuzzforge-crash-analyzer` - Analyzes crash inputs
|
||||
### Dashboard
|
||||
|
||||
### Build a Single Module
|
||||
The main screen is split into two panels:
|
||||
|
||||
```bash
|
||||
# Build a specific module
|
||||
cd fuzzforge-modules/rust-analyzer
|
||||
make build
|
||||
```
|
||||
| Panel | Content |
|
||||
|-------|---------|
|
||||
| **AI Agents** (left) | Shows GitHub Copilot, Claude Desktop, and Claude Code with live link status and config file path |
|
||||
| **Hub Servers** (right) | Shows all configured MCP hub tools with Docker image name, source hub, and build status (✓ Ready / ✗ Not built) |
|
||||
|
||||
### Verify Modules are Built
|
||||
### Keyboard Shortcuts
|
||||
|
||||
```bash
|
||||
# List built module images
|
||||
docker images | grep fuzzforge
|
||||
```
|
||||
| Key | Action |
|
||||
|-----|--------|
|
||||
| `Enter` | **Select** — Act on the selected row (setup/unlink an agent) |
|
||||
| `h` | **Hub Manager** — Open the hub management screen |
|
||||
| `r` | **Refresh** — Re-check all agent and hub statuses |
|
||||
| `q` | **Quit** |
|
||||
|
||||
You should see something like:
|
||||
```
|
||||
fuzzforge-rust-analyzer 0.1.0 abc123def456 2 minutes ago 850 MB
|
||||
fuzzforge-cargo-fuzzer 0.1.0 789ghi012jkl 2 minutes ago 1.2 GB
|
||||
...
|
||||
```
|
||||
### Agent Setup
|
||||
|
||||
Select an agent row in the AI Agents table and press `Enter`:
|
||||
|
||||
- **If the agent is not linked** → a setup dialog opens asking for your container engine (Docker or Podman), then installs the FuzzForge MCP configuration
|
||||
- **If the agent is already linked** → a confirmation dialog offers to unlink it (removes the `fuzzforge` entry without touching other MCP servers)
|
||||
|
||||
The setup auto-detects:
|
||||
- FuzzForge installation root
|
||||
- Docker/Podman socket path
|
||||
- Hub configuration from `hub-config.json`
|
||||
|
||||
### Hub Manager
|
||||
|
||||
Press `h` to open the hub manager. This is where you manage your MCP hub repositories:
|
||||
|
||||
| Button | Action |
|
||||
|--------|--------|
|
||||
| **FuzzingLabs Hub** | One-click clone of the official [mcp-security-hub](https://github.com/FuzzingLabs/mcp-security-hub) repository — clones to `~/.fuzzforge/hubs/mcp-security-hub`, scans for tools, and registers them in `hub-config.json` |
|
||||
| **Link Path** | Link any local directory as a hub — enter a name and path, FuzzForge scans it for `category/tool-name/Dockerfile` patterns |
|
||||
| **Clone URL** | Clone any git repository and link it as a hub |
|
||||
| **Remove** | Unlink the selected hub and remove its servers from the configuration |
|
||||
|
||||
The hub table shows:
|
||||
- **Name** — Hub name (★ prefix for the default hub)
|
||||
- **Path** — Local directory path
|
||||
- **Servers** — Number of MCP tools discovered
|
||||
- **Source** — Git URL or "local"
|
||||
|
||||
---
|
||||
|
||||
## MCP Server Configuration
|
||||
## MCP Hub System
|
||||
|
||||
FuzzForge integrates with AI agents through the Model Context Protocol (MCP). Configure your preferred AI agent to use FuzzForge tools.
|
||||
### What is an MCP Hub?
|
||||
|
||||
An MCP hub is a directory containing one or more containerized MCP tools, organized by category:
|
||||
|
||||
```
|
||||
my-hub/
|
||||
├── category-a/
|
||||
│ ├── tool-1/
|
||||
│ │ └── Dockerfile
|
||||
│ └── tool-2/
|
||||
│ └── Dockerfile
|
||||
├── category-b/
|
||||
│ └── tool-3/
|
||||
│ └── Dockerfile
|
||||
└── ...
|
||||
```
|
||||
|
||||
FuzzForge scans for the pattern `category/tool-name/Dockerfile` and auto-generates server configuration entries for each discovered tool.
|
||||
|
||||
### FuzzingLabs Security Hub
|
||||
|
||||
The default MCP hub is [mcp-security-hub](https://github.com/FuzzingLabs/mcp-security-hub), maintained by FuzzingLabs. It includes **40+ security tools** across categories:
|
||||
|
||||
| Category | Tools |
|
||||
|----------|-------|
|
||||
| **Reconnaissance** | nmap, masscan, shodan, zoomeye, whatweb, pd-tools, externalattacker, networksdb |
|
||||
| **Binary Analysis** | binwalk, yara, capa, radare2, ghidra, ida |
|
||||
| **Code Security** | semgrep, rust-analyzer, harness-tester, cargo-fuzzer, crash-analyzer |
|
||||
| **Web Security** | nuclei, nikto, sqlmap, ffuf, burp, waybackurls |
|
||||
| **Fuzzing** | boofuzz, dharma |
|
||||
| **Exploitation** | searchsploit |
|
||||
| **Secrets** | gitleaks |
|
||||
| **Cloud Security** | trivy, prowler, roadrecon |
|
||||
| **OSINT** | maigret, dnstwist |
|
||||
| **Threat Intel** | virustotal, otx |
|
||||
| **Password Cracking** | hashcat |
|
||||
| **Blockchain** | medusa, solazy, daml-viewer |
|
||||
|
||||
**Clone it via the UI:**
|
||||
|
||||
1. `uv run fuzzforge ui`
|
||||
2. Press `h` → click **FuzzingLabs Hub**
|
||||
3. Wait for the clone to finish — servers are auto-registered
|
||||
|
||||
**Or clone manually:**
|
||||
|
||||
```bash
|
||||
git clone git@github.com:FuzzingLabs/mcp-security-hub.git ~/.fuzzforge/hubs/mcp-security-hub
|
||||
```
|
||||
|
||||
### Linking a Custom Hub
|
||||
|
||||
You can link any directory that follows the `category/tool-name/Dockerfile` layout:
|
||||
|
||||
**Via the UI:**
|
||||
|
||||
1. Press `h` → **Link Path**
|
||||
2. Enter a name and the directory path
|
||||
|
||||
**Via the CLI (planned):** Not yet available — use the UI.
|
||||
|
||||
### Building Hub Images
|
||||
|
||||
After linking a hub, you need to build the Docker images before the tools can be used:
|
||||
|
||||
```bash
|
||||
# Build all images from the default security hub
|
||||
./scripts/build-hub-images.sh
|
||||
|
||||
# Or build a single tool image
|
||||
docker build -t semgrep-mcp:latest mcp-security-hub/code-security/semgrep-mcp/
|
||||
```
|
||||
|
||||
The dashboard hub table shows ✓ Ready for built images and ✗ Not built for missing ones.
|
||||
|
||||
---
|
||||
|
||||
## MCP Server Configuration (CLI)
|
||||
|
||||
If you prefer the command line over the TUI, you can configure agents directly:
|
||||
|
||||
### GitHub Copilot
|
||||
|
||||
```bash
|
||||
# That's it! Just run this command:
|
||||
uv run fuzzforge mcp install copilot
|
||||
```
|
||||
|
||||
The command auto-detects everything:
|
||||
- **FuzzForge root** - Where FuzzForge is installed
|
||||
- **Modules path** - Defaults to `fuzzforge_ai/fuzzforge-modules`
|
||||
- **Docker socket** - Auto-detects `/var/run/docker.sock`
|
||||
The command auto-detects:
|
||||
- **FuzzForge root** — Where FuzzForge is installed
|
||||
- **Docker socket** — Auto-detects `/var/run/docker.sock`
|
||||
|
||||
**Optional overrides** (usually not needed):
|
||||
**Optional overrides:**
|
||||
```bash
|
||||
uv run fuzzforge mcp install copilot \
|
||||
--modules /path/to/modules \
|
||||
--engine podman # if using Podman instead of Docker
|
||||
uv run fuzzforge mcp install copilot --engine podman
|
||||
```
|
||||
|
||||
**After installation:**
|
||||
1. Restart VS Code
|
||||
2. Open GitHub Copilot Chat
|
||||
3. FuzzForge tools are now available!
|
||||
**After installation:** Restart VS Code. FuzzForge tools appear in GitHub Copilot Chat.
|
||||
|
||||
### Claude Code (CLI)
|
||||
|
||||
@@ -190,143 +306,89 @@ uv run fuzzforge mcp install copilot \
|
||||
uv run fuzzforge mcp install claude-code
|
||||
```
|
||||
|
||||
Installs to `~/.claude.json` so FuzzForge tools are available from any directory.
|
||||
|
||||
**After installation:**
|
||||
1. Run `claude` from any directory
|
||||
2. FuzzForge tools are now available!
|
||||
Installs to `~/.claude.json`. FuzzForge tools are available from any directory after restarting Claude.
|
||||
|
||||
### Claude Desktop
|
||||
|
||||
```bash
|
||||
# Automatic installation
|
||||
uv run fuzzforge mcp install claude-desktop
|
||||
|
||||
# Verify
|
||||
uv run fuzzforge mcp status
|
||||
```
|
||||
|
||||
**After installation:**
|
||||
1. Restart Claude Desktop
|
||||
2. FuzzForge tools are now available!
|
||||
**After installation:** Restart Claude Desktop.
|
||||
|
||||
### Check MCP Status
|
||||
### Check Status
|
||||
|
||||
```bash
|
||||
uv run fuzzforge mcp status
|
||||
```
|
||||
|
||||
Shows configuration status for all supported AI agents:
|
||||
|
||||
```
|
||||
┏━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━┓
|
||||
┃ Agent ┃ Config Path ┃ Status ┃ FuzzForge Configured ┃
|
||||
┡━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━┩
|
||||
│ GitHub Copilot │ ~/.config/Code/User/mcp.json │ ✓ Exists │ ✓ Yes │
|
||||
│ Claude Desktop │ ~/.config/Claude/claude_desktop_config... │ Not found │ - │
|
||||
│ Claude Code │ ~/.claude.json │ ✓ Exists │ ✓ Yes │
|
||||
└──────────────────────┴───────────────────────────────────────────┴──────────────┴─────────────────────────┘
|
||||
```
|
||||
|
||||
### Generate Config Without Installing
|
||||
|
||||
```bash
|
||||
# Preview the configuration that would be installed
|
||||
uv run fuzzforge mcp generate copilot
|
||||
uv run fuzzforge mcp generate claude-desktop
|
||||
uv run fuzzforge mcp generate claude-code
|
||||
```
|
||||
|
||||
### Remove MCP Configuration
|
||||
### Remove Configuration
|
||||
|
||||
```bash
|
||||
uv run fuzzforge mcp uninstall copilot
|
||||
uv run fuzzforge mcp uninstall claude-desktop
|
||||
uv run fuzzforge mcp uninstall claude-code
|
||||
uv run fuzzforge mcp uninstall claude-desktop
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Using FuzzForge with AI
|
||||
|
||||
Once MCP is configured, you interact with FuzzForge through natural language with your AI assistant.
|
||||
Once MCP is configured and hub images are built, interact with FuzzForge through natural language with your AI assistant.
|
||||
|
||||
### Example Conversations
|
||||
|
||||
**Discover available tools:**
|
||||
```
|
||||
You: "What FuzzForge modules are available?"
|
||||
AI: Uses list_modules → "I found 4 modules: rust-analyzer, cargo-fuzzer,
|
||||
harness-validator, and crash-analyzer..."
|
||||
You: "What security tools are available in FuzzForge?"
|
||||
AI: Queries hub tools → "I found 15 tools across categories: nmap for
|
||||
port scanning, binwalk for firmware analysis, semgrep for code
|
||||
scanning, cargo-fuzzer for Rust fuzzing..."
|
||||
```
|
||||
|
||||
**Analyze code for fuzzing targets:**
|
||||
**Analyze a binary:**
|
||||
```
|
||||
You: "Extract and analyze this firmware image"
|
||||
AI: Uses binwalk to extract → yara for pattern matching → capa for
|
||||
capability detection → "Found 3 embedded filesystems, 2 YARA
|
||||
matches for known vulnerabilities..."
|
||||
```
|
||||
|
||||
**Fuzz Rust code:**
|
||||
```
|
||||
You: "Analyze this Rust crate for functions I should fuzz"
|
||||
AI: Uses execute_module("rust-analyzer") → "I found 3 good fuzzing candidates:
|
||||
- parse_input() in src/parser.rs - handles untrusted input
|
||||
- decode_message() in src/codec.rs - complex parsing logic
|
||||
..."
|
||||
```
|
||||
AI: Uses rust-analyzer → "Found 3 fuzzable entry points..."
|
||||
|
||||
**Generate and validate harnesses:**
|
||||
```
|
||||
You: "Generate a fuzzing harness for the parse_input function"
|
||||
AI: Creates harness code, then uses execute_module("harness-validator")
|
||||
→ "Here's a harness that compiles successfully..."
|
||||
```
|
||||
|
||||
**Run continuous fuzzing:**
|
||||
```
|
||||
You: "Start fuzzing parse_input for 10 minutes"
|
||||
AI: Uses start_continuous_module("cargo-fuzzer") → "Started fuzzing session abc123"
|
||||
|
||||
You: "How's the fuzzing going?"
|
||||
AI: Uses get_continuous_status("abc123") → "Running for 5 minutes:
|
||||
- 150,000 executions
|
||||
- 2 crashes found
|
||||
- 45% edge coverage"
|
||||
|
||||
You: "Stop and show me the crashes"
|
||||
AI: Uses stop_continuous_module("abc123") → "Found 2 unique crashes..."
|
||||
AI: Uses cargo-fuzzer → "Fuzzing session started. 2 crashes found..."
|
||||
```
|
||||
|
||||
### Available MCP Tools
|
||||
|
||||
| Tool | Description |
|
||||
|------|-------------|
|
||||
| `list_modules` | List all available security modules |
|
||||
| `execute_module` | Run a module once and get results |
|
||||
| `start_continuous_module` | Start a long-running module (e.g., fuzzing) |
|
||||
| `get_continuous_status` | Check status of a continuous session |
|
||||
| `stop_continuous_module` | Stop a continuous session |
|
||||
| `list_continuous_sessions` | List all active sessions |
|
||||
| `get_execution_results` | Retrieve results from an execution |
|
||||
| `execute_workflow` | Run a multi-step workflow |
|
||||
**Scan for vulnerabilities:**
|
||||
```
|
||||
You: "Scan this codebase with semgrep for security issues"
|
||||
AI: Uses semgrep-mcp → "Found 5 findings: 2 high severity SQL injection
|
||||
patterns, 3 medium severity hardcoded secrets..."
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## CLI Reference
|
||||
|
||||
> **Note:** The CLI is for advanced users. Most users should interact with FuzzForge through their AI assistant.
|
||||
### UI Command
|
||||
|
||||
```bash
|
||||
uv run fuzzforge ui # Launch the terminal dashboard
|
||||
```
|
||||
|
||||
### MCP Commands
|
||||
|
||||
```bash
|
||||
uv run fuzzforge mcp status # Check configuration status
|
||||
uv run fuzzforge mcp install <agent> # Install MCP config
|
||||
uv run fuzzforge mcp status # Check agent configuration status
|
||||
uv run fuzzforge mcp install <agent> # Install MCP config (copilot|claude-code|claude-desktop)
|
||||
uv run fuzzforge mcp uninstall <agent> # Remove MCP config
|
||||
uv run fuzzforge mcp generate <agent> # Preview config without installing
|
||||
```
|
||||
|
||||
### Module Commands
|
||||
|
||||
```bash
|
||||
uv run fuzzforge modules list # List available modules
|
||||
uv run fuzzforge modules info <module> # Show module details
|
||||
uv run fuzzforge modules run <module> --assets . # Run a module
|
||||
```
|
||||
|
||||
### Project Commands
|
||||
|
||||
```bash
|
||||
@@ -343,14 +405,20 @@ uv run fuzzforge project results <id> # Get execution results
|
||||
Configure FuzzForge using environment variables:
|
||||
|
||||
```bash
|
||||
# Project paths
|
||||
export FUZZFORGE_MODULES_PATH=/path/to/modules
|
||||
export FUZZFORGE_STORAGE_PATH=/path/to/storage
|
||||
# Override the FuzzForge installation root (auto-detected from cwd by default)
|
||||
export FUZZFORGE_ROOT=/path/to/fuzzforge_ai
|
||||
|
||||
# Override the user-global data directory (default: ~/.fuzzforge)
|
||||
# Useful for isolated testing without touching your real installation
|
||||
export FUZZFORGE_USER_DIR=/tmp/my-fuzzforge-test
|
||||
|
||||
# Storage path for projects and execution results (default: <workspace>/.fuzzforge/storage)
|
||||
export FUZZFORGE_STORAGE__PATH=/path/to/storage
|
||||
|
||||
# Container engine (Docker is default)
|
||||
export FUZZFORGE_ENGINE__TYPE=docker # or podman
|
||||
|
||||
# Podman-specific settings (only needed if using Podman under Snap)
|
||||
# Podman-specific container storage paths
|
||||
export FUZZFORGE_ENGINE__GRAPHROOT=~/.fuzzforge/containers/storage
|
||||
export FUZZFORGE_ENGINE__RUNROOT=~/.fuzzforge/containers/run
|
||||
```
|
||||
@@ -384,66 +452,62 @@ Error: Permission denied connecting to Docker socket
|
||||
|
||||
**Solution:**
|
||||
```bash
|
||||
# Add your user to the docker group
|
||||
sudo usermod -aG docker $USER
|
||||
|
||||
# Log out and back in for changes to take effect
|
||||
# Then verify:
|
||||
# Log out and back in, then verify:
|
||||
docker run --rm hello-world
|
||||
```
|
||||
|
||||
### No Modules Found
|
||||
### Hub Images Not Built
|
||||
|
||||
```
|
||||
No modules found.
|
||||
```
|
||||
The dashboard shows ✗ Not built for tools:
|
||||
|
||||
**Solution:**
|
||||
1. Build the modules first: `make build-modules`
|
||||
2. Check the modules path: `uv run fuzzforge modules list`
|
||||
3. Verify images exist: `docker images | grep fuzzforge`
|
||||
```bash
|
||||
# Build all hub images
|
||||
./scripts/build-hub-images.sh
|
||||
|
||||
# Or build a single tool
|
||||
docker build -t <tool-name>:latest mcp-security-hub/<category>/<tool-name>/
|
||||
```
|
||||
|
||||
### MCP Server Not Starting
|
||||
|
||||
Check the MCP configuration:
|
||||
```bash
|
||||
# Check agent configuration
|
||||
uv run fuzzforge mcp status
|
||||
```
|
||||
|
||||
Verify the configuration file path exists and contains valid JSON.
|
||||
|
||||
### Module Container Fails to Build
|
||||
|
||||
```bash
|
||||
# Build module container manually to see errors
|
||||
cd fuzzforge-modules/<module-name>
|
||||
docker build -t <module-name> .
|
||||
# Verify the config file path exists and contains valid JSON
|
||||
cat ~/.config/Code/User/mcp.json # Copilot
|
||||
cat ~/.claude.json # Claude Code
|
||||
```
|
||||
|
||||
### Using Podman Instead of Docker
|
||||
|
||||
If you prefer Podman:
|
||||
```bash
|
||||
# Use --engine podman with CLI
|
||||
# Install with Podman engine
|
||||
uv run fuzzforge mcp install copilot --engine podman
|
||||
|
||||
# Or set environment variable
|
||||
export FUZZFORGE_ENGINE=podman
|
||||
```
|
||||
|
||||
### Check Logs
|
||||
### Hub Registry
|
||||
|
||||
FuzzForge stores linked hub information in `~/.fuzzforge/hubs.json`. If something goes wrong:
|
||||
|
||||
FuzzForge stores execution logs in the storage directory:
|
||||
```bash
|
||||
ls -la ~/.fuzzforge/storage/<project-id>/<execution-id>/
|
||||
# View registry
|
||||
cat ~/.fuzzforge/hubs.json
|
||||
|
||||
# Reset registry
|
||||
rm ~/.fuzzforge/hubs.json
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Next Steps
|
||||
|
||||
- 📖 Read the [Module SDK Guide](fuzzforge-modules/fuzzforge-modules-sdk/README.md) to create custom modules
|
||||
- 🎬 Check the demos in the [README](README.md)
|
||||
- 🖥️ Launch `uv run fuzzforge ui` and explore the dashboard
|
||||
- 🔒 Clone the [mcp-security-hub](https://github.com/FuzzingLabs/mcp-security-hub) for 40+ security tools
|
||||
- 💬 Join our [Discord](https://discord.gg/8XEX33UUwZ) for support
|
||||
|
||||
---
|
||||
|
||||
@@ -8,6 +8,7 @@ requires-python = ">=3.14"
|
||||
dependencies = [
|
||||
"fuzzforge-mcp==0.0.1",
|
||||
"rich>=14.0.0",
|
||||
"textual>=1.0.0",
|
||||
"typer==0.20.1",
|
||||
]
|
||||
|
||||
|
||||
@@ -13,3 +13,49 @@ ignore = [
|
||||
"PLR2004", # allowing comparisons using unamed numerical constants in tests
|
||||
"S101", # allowing 'assert' statements in tests
|
||||
]
|
||||
"src/fuzzforge_cli/tui/**" = [
|
||||
"ARG002", # unused method argument: callback signature
|
||||
"BLE001", # blind exception: broad error handling in UI
|
||||
"C901", # complexity: UI logic
|
||||
"D107", # missing docstring in __init__: simple dataclasses
|
||||
"FBT001", # boolean positional arg
|
||||
"FBT002", # boolean default arg
|
||||
"PLC0415", # import outside top-level: lazy loading
|
||||
"PLR0911", # too many return statements
|
||||
"PLR0912", # too many branches
|
||||
"PLR2004", # magic value comparison
|
||||
"RUF012", # mutable class default: Textual pattern
|
||||
"S603", # subprocess: validated inputs
|
||||
"S607", # subprocess: PATH lookup
|
||||
"SIM108", # ternary: readability preference
|
||||
"TC001", # TYPE_CHECKING: runtime type needs
|
||||
"TC002", # TYPE_CHECKING: runtime type needs
|
||||
"TC003", # TYPE_CHECKING: runtime type needs
|
||||
"TRY300", # try-else: existing pattern
|
||||
]
|
||||
"tui/*.py" = [
|
||||
"D107", # missing docstring in __init__: simple dataclasses
|
||||
"TC001", # TYPE_CHECKING: runtime type needs
|
||||
"TC002", # TYPE_CHECKING: runtime type needs
|
||||
"TC003", # TYPE_CHECKING: runtime type needs
|
||||
]
|
||||
"src/fuzzforge_cli/commands/mcp.py" = [
|
||||
"ARG001", # unused argument: callback signature
|
||||
"B904", # raise from: existing pattern
|
||||
"F841", # unused variable: legacy code
|
||||
"FBT002", # boolean default arg
|
||||
"PLR0912", # too many branches
|
||||
"PLR0915", # too many statements
|
||||
"SIM108", # ternary: readability preference
|
||||
]
|
||||
"src/fuzzforge_cli/application.py" = [
|
||||
"B008", # function call in default: Path.cwd()
|
||||
"PLC0415", # import outside top-level: lazy loading
|
||||
]
|
||||
"src/fuzzforge_cli/commands/projects.py" = [
|
||||
"TC003", # TYPE_CHECKING: runtime type needs
|
||||
]
|
||||
"src/fuzzforge_cli/context.py" = [
|
||||
"TC002", # TYPE_CHECKING: runtime type needs
|
||||
"TC003", # TYPE_CHECKING: runtime type needs
|
||||
]
|
||||
|
||||
@@ -3,12 +3,12 @@
|
||||
from pathlib import Path
|
||||
from typing import Annotated
|
||||
|
||||
from fuzzforge_mcp.storage import LocalStorage # type: ignore[import-untyped]
|
||||
from typer import Context as TyperContext
|
||||
from typer import Option, Typer
|
||||
|
||||
from fuzzforge_cli.commands import mcp, projects
|
||||
from fuzzforge_cli.context import Context
|
||||
from fuzzforge_mcp.storage import LocalStorage
|
||||
|
||||
application: Typer = Typer(
|
||||
name="fuzzforge",
|
||||
@@ -34,7 +34,7 @@ def main(
|
||||
envvar="FUZZFORGE_STORAGE__PATH",
|
||||
help="Path to the storage directory.",
|
||||
),
|
||||
] = Path.home() / ".fuzzforge" / "storage",
|
||||
] = Path.cwd() / ".fuzzforge" / "storage",
|
||||
context: TyperContext = None, # type: ignore[assignment]
|
||||
) -> None:
|
||||
"""FuzzForge AI - Security research orchestration platform.
|
||||
@@ -42,7 +42,7 @@ def main(
|
||||
Discover and execute MCP hub tools for security research.
|
||||
|
||||
"""
|
||||
storage = LocalStorage(storage_path=storage_path)
|
||||
storage = LocalStorage(base_path=storage_path)
|
||||
|
||||
context.obj = Context(
|
||||
storage=storage,
|
||||
@@ -52,3 +52,19 @@ def main(
|
||||
|
||||
application.add_typer(mcp.application)
|
||||
application.add_typer(projects.application)
|
||||
|
||||
|
||||
@application.command(
|
||||
name="ui",
|
||||
help="Launch the FuzzForge terminal interface.",
|
||||
)
|
||||
def launch_ui() -> None:
|
||||
"""Launch the interactive FuzzForge TUI dashboard.
|
||||
|
||||
Provides a visual dashboard showing AI agent connection status
|
||||
and hub server availability, with wizards for setup and configuration.
|
||||
|
||||
"""
|
||||
from fuzzforge_cli.tui.app import FuzzForgeApp
|
||||
|
||||
FuzzForgeApp().run()
|
||||
|
||||
@@ -12,7 +12,7 @@ import os
|
||||
import sys
|
||||
from enum import StrEnum
|
||||
from pathlib import Path
|
||||
from typing import Annotated
|
||||
from typing import Annotated, Any
|
||||
|
||||
from rich.console import Console
|
||||
from rich.panel import Panel
|
||||
@@ -44,10 +44,10 @@ def _get_copilot_mcp_path() -> Path:
|
||||
"""
|
||||
if sys.platform == "darwin":
|
||||
return Path.home() / "Library" / "Application Support" / "Code" / "User" / "mcp.json"
|
||||
elif sys.platform == "win32":
|
||||
if sys.platform == "win32":
|
||||
return Path(os.environ.get("APPDATA", "")) / "Code" / "User" / "mcp.json"
|
||||
else: # Linux
|
||||
return Path.home() / ".config" / "Code" / "User" / "mcp.json"
|
||||
# Linux
|
||||
return Path.home() / ".config" / "Code" / "User" / "mcp.json"
|
||||
|
||||
|
||||
def _get_claude_desktop_mcp_path() -> Path:
|
||||
@@ -58,10 +58,10 @@ def _get_claude_desktop_mcp_path() -> Path:
|
||||
"""
|
||||
if sys.platform == "darwin":
|
||||
return Path.home() / "Library" / "Application Support" / "Claude" / "claude_desktop_config.json"
|
||||
elif sys.platform == "win32":
|
||||
if sys.platform == "win32":
|
||||
return Path(os.environ.get("APPDATA", "")) / "Claude" / "claude_desktop_config.json"
|
||||
else: # Linux
|
||||
return Path.home() / ".config" / "Claude" / "claude_desktop_config.json"
|
||||
# Linux
|
||||
return Path.home() / ".config" / "Claude" / "claude_desktop_config.json"
|
||||
|
||||
|
||||
def _get_claude_code_mcp_path(project_path: Path | None = None) -> Path:
|
||||
@@ -114,13 +114,13 @@ def _detect_docker_socket() -> str:
|
||||
:returns: Path to the Docker socket.
|
||||
|
||||
"""
|
||||
socket_paths = [
|
||||
"/var/run/docker.sock",
|
||||
socket_paths: list[Path] = [
|
||||
Path("/var/run/docker.sock"),
|
||||
Path.home() / ".docker" / "run" / "docker.sock",
|
||||
]
|
||||
|
||||
for path in socket_paths:
|
||||
if Path(path).exists():
|
||||
if path.exists():
|
||||
return str(path)
|
||||
|
||||
return "/var/run/docker.sock"
|
||||
@@ -132,15 +132,22 @@ def _find_fuzzforge_root() -> Path:
|
||||
:returns: Path to fuzzforge-oss directory.
|
||||
|
||||
"""
|
||||
# Try to find from current file location
|
||||
current = Path(__file__).resolve()
|
||||
# Check environment variable override first
|
||||
env_root = os.environ.get("FUZZFORGE_ROOT")
|
||||
if env_root:
|
||||
return Path(env_root).resolve()
|
||||
|
||||
# Walk up to find fuzzforge-oss root
|
||||
# Walk up from cwd to find a fuzzforge root (hub-config.json is the marker)
|
||||
for parent in [Path.cwd(), *Path.cwd().parents]:
|
||||
if (parent / "hub-config.json").is_file():
|
||||
return parent
|
||||
|
||||
# Fall back to __file__-based search (dev install inside fuzzforge-oss)
|
||||
current = Path(__file__).resolve()
|
||||
for parent in current.parents:
|
||||
if (parent / "fuzzforge-mcp").is_dir():
|
||||
return parent
|
||||
|
||||
# Fall back to cwd
|
||||
return Path.cwd()
|
||||
|
||||
|
||||
@@ -148,7 +155,7 @@ def _generate_mcp_config(
|
||||
fuzzforge_root: Path,
|
||||
engine_type: str,
|
||||
engine_socket: str,
|
||||
) -> dict:
|
||||
) -> dict[str, Any]:
|
||||
"""Generate MCP server configuration.
|
||||
|
||||
:param fuzzforge_root: Path to fuzzforge-oss installation.
|
||||
@@ -167,9 +174,12 @@ def _generate_mcp_config(
|
||||
command = "uv"
|
||||
args = ["--directory", str(fuzzforge_root), "run", "fuzzforge-mcp"]
|
||||
|
||||
# Self-contained storage paths for FuzzForge containers
|
||||
# This isolates FuzzForge from system Podman and avoids snap issues
|
||||
fuzzforge_home = Path.home() / ".fuzzforge"
|
||||
# User-global storage paths for FuzzForge containers.
|
||||
# Kept under ~/.fuzzforge so images are built once and shared across
|
||||
# all workspaces — regardless of where `fuzzforge mcp install` is run.
|
||||
# Override with FUZZFORGE_USER_DIR for isolated testing.
|
||||
user_dir_env = os.environ.get("FUZZFORGE_USER_DIR")
|
||||
fuzzforge_home = Path(user_dir_env).resolve() if user_dir_env else Path.home() / ".fuzzforge"
|
||||
graphroot = fuzzforge_home / "containers" / "storage"
|
||||
runroot = fuzzforge_home / "containers" / "run"
|
||||
|
||||
|
||||
@@ -5,7 +5,7 @@ from __future__ import annotations
|
||||
from pathlib import Path
|
||||
from typing import TYPE_CHECKING, cast
|
||||
|
||||
from fuzzforge_mcp.storage import LocalStorage
|
||||
from fuzzforge_mcp.storage import LocalStorage # type: ignore[import-untyped]
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from typer import Context as TyperContext
|
||||
|
||||
1
fuzzforge-cli/src/fuzzforge_cli/tui/__init__.py
Normal file
1
fuzzforge-cli/src/fuzzforge_cli/tui/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
"""FuzzForge terminal user interface."""
|
||||
562
fuzzforge-cli/src/fuzzforge_cli/tui/app.py
Normal file
562
fuzzforge-cli/src/fuzzforge_cli/tui/app.py
Normal file
@@ -0,0 +1,562 @@
|
||||
"""FuzzForge TUI application.
|
||||
|
||||
Main terminal user interface for FuzzForge, providing a dashboard
|
||||
with AI agent connection status, hub server availability, and
|
||||
hub management capabilities.
|
||||
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from collections import defaultdict
|
||||
from pathlib import Path
|
||||
from typing import TYPE_CHECKING, Any
|
||||
|
||||
from rich.text import Text
|
||||
from textual import events, work
|
||||
from textual.app import App, ComposeResult
|
||||
from textual.binding import Binding
|
||||
from textual.containers import Horizontal, Vertical, VerticalScroll
|
||||
from textual.message import Message
|
||||
from textual.widgets import Button, DataTable, Footer, Header
|
||||
|
||||
from fuzzforge_cli.tui.helpers import (
|
||||
check_agent_status,
|
||||
check_hub_image,
|
||||
find_fuzzforge_root,
|
||||
get_agent_configs,
|
||||
load_hub_config,
|
||||
)
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from fuzzforge_cli.commands.mcp import AIAgent
|
||||
|
||||
# Agent config entries stored alongside their linked status for row mapping
|
||||
_AgentRow = tuple[str, "AIAgent", Path, str, bool]
|
||||
|
||||
|
||||
class SingleClickDataTable(DataTable[Any]):
|
||||
"""DataTable subclass that also fires ``RowClicked`` on a single mouse click.
|
||||
|
||||
Textual's built-in ``RowSelected`` only fires on Enter or on a second click
|
||||
of an already-highlighted row. ``RowClicked`` fires on every first click,
|
||||
enabling single-click-to-act UX without requiring Enter.
|
||||
"""
|
||||
|
||||
class RowClicked(Message):
|
||||
"""Fired on every single mouse click on a data row."""
|
||||
|
||||
def __init__(self, data_table: SingleClickDataTable, cursor_row: int) -> None:
|
||||
self.data_table = data_table
|
||||
self.cursor_row = cursor_row
|
||||
super().__init__()
|
||||
|
||||
@property
|
||||
def control(self) -> SingleClickDataTable:
|
||||
"""Return the data table that fired this event."""
|
||||
return self.data_table
|
||||
|
||||
async def _on_click(self, event: events.Click) -> None:
|
||||
"""Forward to parent, then post RowClicked on every mouse click.
|
||||
|
||||
The hub table is handled exclusively via RowClicked. RowSelected is
|
||||
intentionally NOT used for the hub table to avoid double-dispatch.
|
||||
"""
|
||||
await super()._on_click(event)
|
||||
meta = event.style.meta
|
||||
if meta and "row" in meta and self.cursor_type == "row":
|
||||
row_index: int = int(meta["row"])
|
||||
if row_index >= 0:
|
||||
self.post_message(SingleClickDataTable.RowClicked(self, row_index))
|
||||
|
||||
|
||||
class FuzzForgeApp(App[None]):
|
||||
"""FuzzForge AI terminal user interface."""
|
||||
|
||||
TITLE = "FuzzForge AI"
|
||||
SUB_TITLE = "Security Research Orchestration"
|
||||
|
||||
CSS = """
|
||||
Screen {
|
||||
background: $surface;
|
||||
}
|
||||
|
||||
#main {
|
||||
height: 1fr;
|
||||
margin: 1 2;
|
||||
}
|
||||
|
||||
.panel {
|
||||
width: 1fr;
|
||||
border: round #4699fc;
|
||||
padding: 1 2;
|
||||
margin: 0 0 1 0;
|
||||
}
|
||||
|
||||
#hub-panel {
|
||||
height: 12;
|
||||
}
|
||||
|
||||
#hub-table {
|
||||
height: 1fr;
|
||||
}
|
||||
|
||||
#agents-panel {
|
||||
height: auto;
|
||||
}
|
||||
|
||||
.panel-title {
|
||||
text-style: bold;
|
||||
color: #4699fc;
|
||||
text-align: left;
|
||||
margin-bottom: 1;
|
||||
}
|
||||
|
||||
#hub-title-bar {
|
||||
height: auto;
|
||||
align: center middle;
|
||||
margin: 0 0 1 0;
|
||||
}
|
||||
|
||||
#btn-hub-manager {
|
||||
min-width: 40;
|
||||
margin-right: 2;
|
||||
}
|
||||
|
||||
#btn-fuzzinglabs-hub {
|
||||
min-width: 30;
|
||||
}
|
||||
|
||||
#agents-table {
|
||||
height: auto;
|
||||
}
|
||||
|
||||
/* Modal screens */
|
||||
AgentSetupScreen, AgentUnlinkScreen,
|
||||
HubManagerScreen, LinkHubScreen, CloneHubScreen,
|
||||
BuildImageScreen, BuildLogScreen {
|
||||
align: center middle;
|
||||
}
|
||||
|
||||
#setup-dialog, #unlink-dialog {
|
||||
width: 56;
|
||||
height: auto;
|
||||
max-height: 80%;
|
||||
border: thick #4699fc;
|
||||
background: $surface;
|
||||
padding: 2 3;
|
||||
overflow-y: auto;
|
||||
}
|
||||
|
||||
#hub-manager-dialog {
|
||||
width: 100;
|
||||
height: auto;
|
||||
max-height: 85%;
|
||||
border: thick #4699fc;
|
||||
background: $surface;
|
||||
padding: 2 3;
|
||||
overflow-y: auto;
|
||||
}
|
||||
|
||||
#link-dialog, #clone-dialog {
|
||||
width: 72;
|
||||
height: auto;
|
||||
max-height: 80%;
|
||||
border: thick #4699fc;
|
||||
background: $surface;
|
||||
padding: 2 3;
|
||||
overflow-y: auto;
|
||||
}
|
||||
|
||||
#build-dialog {
|
||||
width: 72;
|
||||
height: auto;
|
||||
max-height: 80%;
|
||||
border: thick #4699fc;
|
||||
background: $surface;
|
||||
padding: 2 3;
|
||||
}
|
||||
|
||||
#confirm-text {
|
||||
margin: 1 0 2 0;
|
||||
}
|
||||
|
||||
#build-log {
|
||||
height: 30;
|
||||
border: round $panel;
|
||||
margin: 1 0;
|
||||
}
|
||||
|
||||
#build-subtitle {
|
||||
color: $text-muted;
|
||||
margin-bottom: 1;
|
||||
}
|
||||
|
||||
#build-status {
|
||||
height: 1;
|
||||
margin-top: 1;
|
||||
}
|
||||
|
||||
.dialog-title {
|
||||
text-style: bold;
|
||||
text-align: center;
|
||||
color: #4699fc;
|
||||
margin-bottom: 1;
|
||||
}
|
||||
|
||||
.field-label {
|
||||
margin-top: 1;
|
||||
text-style: bold;
|
||||
}
|
||||
|
||||
RadioSet {
|
||||
height: auto;
|
||||
margin: 0 0 1 2;
|
||||
}
|
||||
|
||||
Input {
|
||||
margin: 0 0 1 0;
|
||||
}
|
||||
|
||||
.dialog-buttons {
|
||||
layout: horizontal;
|
||||
height: 3;
|
||||
align: center middle;
|
||||
margin-top: 1;
|
||||
}
|
||||
|
||||
.dialog-buttons Button {
|
||||
margin: 0 1;
|
||||
min-width: 14;
|
||||
}
|
||||
"""
|
||||
|
||||
BINDINGS = [
|
||||
Binding("q", "quit", "Quit"),
|
||||
Binding("h", "manage_hubs", "Hub Manager"),
|
||||
Binding("r", "refresh", "Refresh"),
|
||||
Binding("enter", "select_row", "Select", show=False),
|
||||
]
|
||||
|
||||
def compose(self) -> ComposeResult:
|
||||
"""Compose the dashboard layout."""
|
||||
yield Header()
|
||||
with VerticalScroll(id="main"):
|
||||
with Vertical(id="hub-panel", classes="panel"):
|
||||
yield SingleClickDataTable(id="hub-table")
|
||||
with Horizontal(id="hub-title-bar"):
|
||||
yield Button(
|
||||
"Hub Manager (h)",
|
||||
variant="primary",
|
||||
id="btn-hub-manager",
|
||||
)
|
||||
yield Button(
|
||||
"FuzzingLabs Hub",
|
||||
variant="primary",
|
||||
id="btn-fuzzinglabs-hub",
|
||||
)
|
||||
with Vertical(id="agents-panel", classes="panel"):
|
||||
yield DataTable(id="agents-table")
|
||||
yield Footer()
|
||||
|
||||
def on_mount(self) -> None:
|
||||
"""Populate tables on startup."""
|
||||
self._agent_rows: list[_AgentRow] = []
|
||||
self._hub_rows: list[tuple[str, str, str, bool] | None] = []
|
||||
# Background build tracking
|
||||
self._active_builds: dict[str, object] = {} # image -> Popen
|
||||
self._build_logs: dict[str, list[str]] = {} # image -> log lines
|
||||
self._build_results: dict[str, bool] = {} # image -> success
|
||||
self.query_one("#hub-panel").border_title = "Hub Servers [dim](click ✗ Not built to build)[/dim]"
|
||||
self.query_one("#agents-panel").border_title = "AI Agents"
|
||||
self._refresh_agents()
|
||||
self._refresh_hub()
|
||||
|
||||
def _refresh_agents(self) -> None:
|
||||
"""Refresh the AI agents status table."""
|
||||
table = self.query_one("#agents-table", DataTable)
|
||||
table.clear(columns=True)
|
||||
table.add_columns("Agent", "Status", "Config Path")
|
||||
table.cursor_type = "row"
|
||||
|
||||
self._agent_rows = []
|
||||
for display_name, agent, config_path, servers_key in get_agent_configs():
|
||||
is_linked, status_text = check_agent_status(config_path, servers_key)
|
||||
if is_linked:
|
||||
status_cell = Text(f"✓ {status_text}", style="green")
|
||||
else:
|
||||
status_cell = Text(f"✗ {status_text}", style="red")
|
||||
table.add_row(display_name, status_cell, str(config_path))
|
||||
self._agent_rows.append(
|
||||
(display_name, agent, config_path, servers_key, is_linked)
|
||||
)
|
||||
|
||||
def _refresh_hub(self) -> None:
|
||||
"""Refresh the hub servers table, grouped by source hub."""
|
||||
self._hub_rows = []
|
||||
table = self.query_one("#hub-table", SingleClickDataTable)
|
||||
table.clear(columns=True)
|
||||
table.add_columns("Server", "Image", "Hub", "Status")
|
||||
table.cursor_type = "row"
|
||||
|
||||
try:
|
||||
fuzzforge_root = find_fuzzforge_root()
|
||||
hub_config = load_hub_config(fuzzforge_root)
|
||||
except Exception:
|
||||
table.add_row(
|
||||
Text("Error loading config", style="red"), "", "", ""
|
||||
)
|
||||
return
|
||||
|
||||
servers = hub_config.get("servers", [])
|
||||
if not servers:
|
||||
table.add_row(
|
||||
Text("No servers — press h", style="dim"), "", "", ""
|
||||
)
|
||||
return
|
||||
|
||||
# Group servers by source hub
|
||||
groups: dict[str, list[dict[str, Any]]] = defaultdict(list)
|
||||
for server in servers:
|
||||
source = server.get("source_hub", "manual")
|
||||
groups[source].append(server)
|
||||
|
||||
for hub_name, hub_servers in groups.items():
|
||||
ready_count = 0
|
||||
total = len(hub_servers)
|
||||
|
||||
statuses: list[tuple[dict[str, Any], bool, str]] = []
|
||||
for server in hub_servers:
|
||||
enabled = server.get("enabled", True)
|
||||
if not enabled:
|
||||
statuses.append((server, False, "Disabled"))
|
||||
else:
|
||||
is_ready, status_text = check_hub_image(
|
||||
server.get("image", "")
|
||||
)
|
||||
if is_ready:
|
||||
ready_count += 1
|
||||
statuses.append((server, is_ready, status_text))
|
||||
|
||||
# Group header row
|
||||
if hub_name == "manual":
|
||||
header = Text(
|
||||
f"▼ 📦 Local config ({ready_count}/{total} ready)",
|
||||
style="bold",
|
||||
)
|
||||
else:
|
||||
header = Text(
|
||||
f"▼ 🔗 {hub_name} ({ready_count}/{total} ready)",
|
||||
style="bold",
|
||||
)
|
||||
table.add_row(header, "", "", "")
|
||||
self._hub_rows.append(None) # group header — not selectable
|
||||
|
||||
# Tool rows
|
||||
for server, is_ready, status_text in statuses:
|
||||
name = server.get("name", "unknown")
|
||||
image = server.get("image", "unknown")
|
||||
enabled = server.get("enabled", True)
|
||||
|
||||
if image in getattr(self, "_active_builds", {}):
|
||||
status_cell = Text("⏳ Building…", style="yellow")
|
||||
elif not enabled:
|
||||
status_cell = Text("Disabled", style="dim")
|
||||
elif is_ready:
|
||||
status_cell = Text("✓ Ready", style="green")
|
||||
else:
|
||||
status_cell = Text(f"✗ {status_text}", style="red dim")
|
||||
|
||||
table.add_row(
|
||||
f" {name}",
|
||||
Text(image, style="dim"),
|
||||
hub_name,
|
||||
status_cell,
|
||||
)
|
||||
self._hub_rows.append((name, image, hub_name, is_ready))
|
||||
|
||||
def on_data_table_row_selected(self, event: DataTable.RowSelected) -> None:
|
||||
"""Handle Enter-key row selection (agents table only).
|
||||
|
||||
Hub table uses RowClicked exclusively — wiring it to RowSelected too
|
||||
would cause a double push on every click since Textual 8 fires
|
||||
RowSelected on ALL clicks, not just second-click-on-same-row.
|
||||
"""
|
||||
if event.data_table.id == "agents-table":
|
||||
self._handle_agent_row(event.cursor_row)
|
||||
|
||||
def on_single_click_data_table_row_clicked(
|
||||
self, event: SingleClickDataTable.RowClicked
|
||||
) -> None:
|
||||
"""Handle single mouse-click on a hub table row."""
|
||||
if event.data_table.id == "hub-table":
|
||||
self._handle_hub_row(event.cursor_row)
|
||||
|
||||
def _handle_agent_row(self, idx: int) -> None:
|
||||
"""Open agent setup/unlink for the selected agent row."""
|
||||
if idx < 0 or idx >= len(self._agent_rows):
|
||||
return
|
||||
|
||||
display_name, agent, _config_path, _servers_key, is_linked = self._agent_rows[idx]
|
||||
|
||||
if is_linked:
|
||||
from fuzzforge_cli.tui.screens.agent_setup import AgentUnlinkScreen
|
||||
|
||||
self.push_screen(
|
||||
AgentUnlinkScreen(agent, display_name),
|
||||
callback=self._on_agent_changed,
|
||||
)
|
||||
else:
|
||||
from fuzzforge_cli.tui.screens.agent_setup import AgentSetupScreen
|
||||
|
||||
self.push_screen(
|
||||
AgentSetupScreen(agent, display_name),
|
||||
callback=self._on_agent_changed,
|
||||
)
|
||||
|
||||
def _handle_hub_row(self, idx: int) -> None:
|
||||
"""Handle a click on a hub table row."""
|
||||
# Guard: never push two build dialogs at once (double-click protection)
|
||||
if getattr(self, "_build_dialog_open", False):
|
||||
return
|
||||
|
||||
if idx < 0 or idx >= len(self._hub_rows):
|
||||
return
|
||||
row_data = self._hub_rows[idx]
|
||||
if row_data is None:
|
||||
return # group header row — ignore
|
||||
|
||||
server_name, image, hub_name, is_ready = row_data
|
||||
|
||||
# If a build is already running, open the live log viewer
|
||||
if image in self._active_builds:
|
||||
from fuzzforge_cli.tui.screens.build_log import BuildLogScreen
|
||||
self._build_dialog_open = True
|
||||
self.push_screen(
|
||||
BuildLogScreen(image),
|
||||
callback=lambda _: setattr(self, "_build_dialog_open", False),
|
||||
)
|
||||
return
|
||||
|
||||
if is_ready:
|
||||
self.notify(f"{image} is already built ✓", severity="information")
|
||||
return
|
||||
|
||||
if hub_name == "manual":
|
||||
self.notify("Manual servers must be built outside FuzzForge")
|
||||
return
|
||||
|
||||
from fuzzforge_cli.tui.screens.build_image import BuildImageScreen
|
||||
|
||||
self._build_dialog_open = True
|
||||
|
||||
def _on_build_dialog_done(result: bool | None) -> None:
|
||||
self._build_dialog_open = False
|
||||
if result is not None:
|
||||
self._on_build_confirmed(result, server_name, image, hub_name)
|
||||
|
||||
self.push_screen(
|
||||
BuildImageScreen(server_name, image, hub_name),
|
||||
callback=_on_build_dialog_done,
|
||||
)
|
||||
|
||||
def _on_build_confirmed(self, confirmed: bool, server_name: str, image: str, hub_name: str) -> None:
|
||||
"""Start a background build if the user confirmed."""
|
||||
if not confirmed:
|
||||
return
|
||||
self._build_logs[image] = []
|
||||
self._build_results.pop(image, None)
|
||||
self._active_builds[image] = True # mark as pending so ⏳ shows immediately
|
||||
self._refresh_hub() # show ⏳ Building… immediately
|
||||
self._run_build(server_name, image, hub_name)
|
||||
|
||||
@work(thread=True)
|
||||
def _run_build(self, server_name: str, image: str, hub_name: str) -> None:
|
||||
"""Build a Docker/Podman image in a background thread."""
|
||||
from fuzzforge_cli.tui.helpers import build_image, find_dockerfile_for_server
|
||||
|
||||
logs = self._build_logs.setdefault(image, [])
|
||||
|
||||
dockerfile = find_dockerfile_for_server(server_name, hub_name)
|
||||
if dockerfile is None:
|
||||
logs.append(f"ERROR: Dockerfile not found for '{server_name}' in hub '{hub_name}'")
|
||||
self._build_results[image] = False
|
||||
self._active_builds.pop(image, None)
|
||||
self.call_from_thread(self._on_build_done, image, success=False)
|
||||
return
|
||||
|
||||
logs.append(f"Building {image} from {dockerfile.parent}")
|
||||
logs.append("")
|
||||
|
||||
try:
|
||||
proc = build_image(image, dockerfile)
|
||||
except FileNotFoundError as exc:
|
||||
logs.append(f"ERROR: {exc}")
|
||||
self._build_results[image] = False
|
||||
self._active_builds.pop(image, None)
|
||||
self.call_from_thread(self._on_build_done, image, success=False)
|
||||
return
|
||||
|
||||
self._active_builds[image] = proc # replace pending marker with actual process
|
||||
self.call_from_thread(self._refresh_hub) # show ⏳ in table
|
||||
|
||||
if proc.stdout is None:
|
||||
return
|
||||
for line in proc.stdout:
|
||||
logs.append(line.rstrip())
|
||||
|
||||
proc.wait()
|
||||
self._active_builds.pop(image, None)
|
||||
success = proc.returncode == 0
|
||||
self._build_results[image] = success
|
||||
self.call_from_thread(self._on_build_done, image, success=success)
|
||||
|
||||
def _on_build_done(self, image: str, *, success: bool) -> None:
|
||||
"""Handle completion of a background build on the main thread."""
|
||||
self._refresh_hub()
|
||||
if success:
|
||||
self.notify(f"✓ {image} built successfully", severity="information")
|
||||
else:
|
||||
self.notify(f"✗ {image} build failed — click row for log", severity="error")
|
||||
|
||||
def on_button_pressed(self, event: Button.Pressed) -> None:
|
||||
"""Handle button presses."""
|
||||
if event.button.id == "btn-hub-manager":
|
||||
self.action_manage_hubs()
|
||||
elif event.button.id == "btn-fuzzinglabs-hub":
|
||||
self.action_add_fuzzinglabs_hub()
|
||||
|
||||
def action_add_fuzzinglabs_hub(self) -> None:
|
||||
"""Open the clone dialog pre-filled with the FuzzingLabs hub URL."""
|
||||
from fuzzforge_cli.tui.screens.hub_manager import CloneHubScreen
|
||||
|
||||
self.push_screen(
|
||||
CloneHubScreen(
|
||||
default_url="https://github.com/FuzzingLabs/mcp-security-hub",
|
||||
default_name="mcp-security-hub",
|
||||
is_default=True,
|
||||
),
|
||||
callback=self._on_hub_changed,
|
||||
)
|
||||
|
||||
def action_manage_hubs(self) -> None:
|
||||
"""Open the hub manager."""
|
||||
from fuzzforge_cli.tui.screens.hub_manager import HubManagerScreen
|
||||
|
||||
self.push_screen(HubManagerScreen(), callback=self._on_hub_changed)
|
||||
|
||||
def _on_agent_changed(self, result: str | None) -> None:
|
||||
"""Handle agent setup/unlink completion."""
|
||||
if result:
|
||||
self.notify(result)
|
||||
self._refresh_agents()
|
||||
|
||||
def _on_hub_changed(self, result: str | None) -> None:
|
||||
"""Handle hub manager completion — refresh the hub table."""
|
||||
self._refresh_hub()
|
||||
|
||||
def action_refresh(self) -> None:
|
||||
"""Refresh all status panels."""
|
||||
self._refresh_agents()
|
||||
self._refresh_hub()
|
||||
self.notify("Status refreshed")
|
||||
687
fuzzforge-cli/src/fuzzforge_cli/tui/helpers.py
Normal file
687
fuzzforge-cli/src/fuzzforge_cli/tui/helpers.py
Normal file
@@ -0,0 +1,687 @@
|
||||
"""Shared helpers for FuzzForge TUI and CLI.
|
||||
|
||||
Provides utility functions for checking AI agent configuration status,
|
||||
hub server image availability, installing/removing MCP configurations,
|
||||
and managing linked MCP hub repositories.
|
||||
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import contextlib
|
||||
import json
|
||||
import os
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from fuzzforge_cli.commands.mcp import (
|
||||
AIAgent,
|
||||
_detect_docker_socket,
|
||||
_detect_podman_socket,
|
||||
_find_fuzzforge_root,
|
||||
_generate_mcp_config,
|
||||
_get_claude_code_user_mcp_path,
|
||||
_get_claude_desktop_mcp_path,
|
||||
_get_copilot_mcp_path,
|
||||
)
|
||||
|
||||
# --- Hub Management Constants ---
|
||||
|
||||
FUZZFORGE_DEFAULT_HUB_URL = "git@github.com:FuzzingLabs/mcp-security-hub.git"
|
||||
FUZZFORGE_DEFAULT_HUB_NAME = "mcp-security-hub"
|
||||
|
||||
|
||||
def get_fuzzforge_user_dir() -> Path:
|
||||
"""Return the user-global ``~/.fuzzforge/`` directory.
|
||||
|
||||
Stores data that is shared across all workspaces: cloned hub
|
||||
repositories, the hub registry, container storage (graphroot/runroot),
|
||||
and the hub workspace volume.
|
||||
|
||||
Override with the ``FUZZFORGE_USER_DIR`` environment variable to
|
||||
redirect all user-global data to a custom path — useful for testing
|
||||
a fresh install without touching the real ``~/.fuzzforge/``.
|
||||
|
||||
:return: ``Path.home() / ".fuzzforge"`` or ``$FUZZFORGE_USER_DIR``
|
||||
|
||||
"""
|
||||
env_dir = os.environ.get("FUZZFORGE_USER_DIR")
|
||||
if env_dir:
|
||||
return Path(env_dir).resolve()
|
||||
return Path.home() / ".fuzzforge"
|
||||
|
||||
|
||||
def get_fuzzforge_dir() -> Path:
|
||||
"""Return the project-local ``.fuzzforge/`` directory.
|
||||
|
||||
Stores data that is specific to the current workspace: fuzzing
|
||||
results and project artifacts. Similar to how ``.git/`` scopes
|
||||
version-control data to a single project.
|
||||
|
||||
:return: ``Path.cwd() / ".fuzzforge"``
|
||||
|
||||
"""
|
||||
return Path.cwd() / ".fuzzforge"
|
||||
|
||||
# Categories that typically need NET_RAW capability for network access
|
||||
_NET_RAW_CATEGORIES = {"reconnaissance", "web-security"}
|
||||
|
||||
# Directories to skip when scanning a hub for MCP tool Dockerfiles
|
||||
_SCAN_SKIP_DIRS = {
|
||||
".git",
|
||||
".github",
|
||||
"scripts",
|
||||
"tests",
|
||||
"examples",
|
||||
"meta",
|
||||
"__pycache__",
|
||||
"node_modules",
|
||||
".venv",
|
||||
}
|
||||
|
||||
|
||||
def get_agent_configs() -> list[tuple[str, AIAgent, Path, str]]:
|
||||
"""Return agent display configs with resolved paths.
|
||||
|
||||
Each tuple contains:
|
||||
- Display name
|
||||
- AIAgent enum value
|
||||
- Config file path
|
||||
- Servers JSON key
|
||||
|
||||
:return: List of agent configuration tuples.
|
||||
|
||||
"""
|
||||
return [
|
||||
("GitHub Copilot", AIAgent.COPILOT, _get_copilot_mcp_path(), "servers"),
|
||||
("Claude Desktop", AIAgent.CLAUDE_DESKTOP, _get_claude_desktop_mcp_path(), "mcpServers"),
|
||||
("Claude Code", AIAgent.CLAUDE_CODE, _get_claude_code_user_mcp_path(), "mcpServers"),
|
||||
]
|
||||
|
||||
|
||||
def check_agent_status(config_path: Path, servers_key: str) -> tuple[bool, str]:
|
||||
"""Check whether an AI agent has FuzzForge configured.
|
||||
|
||||
:param config_path: Path to the agent's MCP config file.
|
||||
:param servers_key: JSON key for the servers dict (e.g. "servers" or "mcpServers").
|
||||
:return: Tuple of (is_linked, status_description).
|
||||
|
||||
"""
|
||||
if not config_path.exists():
|
||||
return False, "Not configured"
|
||||
try:
|
||||
config = json.loads(config_path.read_text())
|
||||
servers = config.get(servers_key, {})
|
||||
if "fuzzforge" in servers:
|
||||
return True, "Linked"
|
||||
return False, "Config exists, not linked"
|
||||
except json.JSONDecodeError:
|
||||
return False, "Invalid config file"
|
||||
|
||||
|
||||
def check_hub_image(image: str) -> tuple[bool, str]:
|
||||
"""Check whether a container image exists locally.
|
||||
|
||||
Respects the ``FUZZFORGE_ENGINE__TYPE`` environment variable so that
|
||||
Podman users see the correct build status instead of always "Not built".
|
||||
|
||||
:param image: Image name (e.g. "semgrep-mcp:latest").
|
||||
:return: Tuple of (is_ready, status_description).
|
||||
|
||||
"""
|
||||
engine = os.environ.get("FUZZFORGE_ENGINE__TYPE", "docker").lower()
|
||||
cmd = "podman" if engine == "podman" else "docker"
|
||||
try:
|
||||
result = subprocess.run(
|
||||
[cmd, "image", "inspect", image],
|
||||
check=False, capture_output=True,
|
||||
text=True,
|
||||
timeout=5,
|
||||
)
|
||||
if result.returncode == 0:
|
||||
return True, "Ready"
|
||||
return False, "Not built"
|
||||
except subprocess.TimeoutExpired:
|
||||
return False, "Timeout"
|
||||
except FileNotFoundError:
|
||||
return False, f"{cmd} not found"
|
||||
|
||||
|
||||
def load_hub_config(fuzzforge_root: Path) -> dict[str, Any]:
|
||||
"""Load hub-config.json from the FuzzForge root.
|
||||
|
||||
:param fuzzforge_root: Path to fuzzforge-oss directory.
|
||||
:return: Parsed hub configuration dict, empty dict on error.
|
||||
|
||||
"""
|
||||
config_path = fuzzforge_root / "hub-config.json"
|
||||
if not config_path.exists():
|
||||
return {}
|
||||
try:
|
||||
data: dict[str, Any] = json.loads(config_path.read_text())
|
||||
return data
|
||||
except json.JSONDecodeError:
|
||||
return {}
|
||||
|
||||
|
||||
def find_fuzzforge_root() -> Path:
|
||||
"""Find the FuzzForge installation root directory.
|
||||
|
||||
:return: Path to the fuzzforge-oss directory.
|
||||
|
||||
"""
|
||||
return _find_fuzzforge_root()
|
||||
|
||||
|
||||
def install_agent_config(agent: AIAgent, engine: str, force: bool = False) -> str:
|
||||
"""Install FuzzForge MCP configuration for an AI agent.
|
||||
|
||||
:param agent: Target AI agent.
|
||||
:param engine: Container engine type ("docker" or "podman").
|
||||
:param force: Overwrite existing configuration.
|
||||
:return: Result message string.
|
||||
|
||||
"""
|
||||
fuzzforge_root = _find_fuzzforge_root()
|
||||
|
||||
if agent == AIAgent.COPILOT:
|
||||
config_path = _get_copilot_mcp_path()
|
||||
servers_key = "servers"
|
||||
elif agent == AIAgent.CLAUDE_CODE:
|
||||
config_path = _get_claude_code_user_mcp_path()
|
||||
servers_key = "mcpServers"
|
||||
else:
|
||||
config_path = _get_claude_desktop_mcp_path()
|
||||
servers_key = "mcpServers"
|
||||
|
||||
socket = _detect_docker_socket() if engine == "docker" else _detect_podman_socket()
|
||||
|
||||
server_config = _generate_mcp_config(
|
||||
fuzzforge_root=fuzzforge_root,
|
||||
engine_type=engine,
|
||||
engine_socket=socket,
|
||||
)
|
||||
|
||||
if config_path.exists():
|
||||
try:
|
||||
existing = json.loads(config_path.read_text())
|
||||
except json.JSONDecodeError:
|
||||
return f"Error: Invalid JSON in {config_path}"
|
||||
|
||||
servers = existing.get(servers_key, {})
|
||||
if "fuzzforge" in servers and not force:
|
||||
return "Already configured (use force to overwrite)"
|
||||
|
||||
if servers_key not in existing:
|
||||
existing[servers_key] = {}
|
||||
existing[servers_key]["fuzzforge"] = server_config
|
||||
full_config = existing
|
||||
else:
|
||||
config_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
full_config = {servers_key: {"fuzzforge": server_config}}
|
||||
|
||||
config_path.write_text(json.dumps(full_config, indent=4))
|
||||
return f"Installed FuzzForge for {agent.value}"
|
||||
|
||||
|
||||
def uninstall_agent_config(agent: AIAgent) -> str:
|
||||
"""Remove FuzzForge MCP configuration from an AI agent.
|
||||
|
||||
:param agent: Target AI agent.
|
||||
:return: Result message string.
|
||||
|
||||
"""
|
||||
if agent == AIAgent.COPILOT:
|
||||
config_path = _get_copilot_mcp_path()
|
||||
servers_key = "servers"
|
||||
elif agent == AIAgent.CLAUDE_CODE:
|
||||
config_path = _get_claude_code_user_mcp_path()
|
||||
servers_key = "mcpServers"
|
||||
else:
|
||||
config_path = _get_claude_desktop_mcp_path()
|
||||
servers_key = "mcpServers"
|
||||
|
||||
if not config_path.exists():
|
||||
return "Configuration file not found"
|
||||
|
||||
try:
|
||||
config = json.loads(config_path.read_text())
|
||||
except json.JSONDecodeError:
|
||||
return "Error: Invalid JSON in config file"
|
||||
|
||||
servers = config.get(servers_key, {})
|
||||
if "fuzzforge" not in servers:
|
||||
return "FuzzForge is not configured for this agent"
|
||||
|
||||
del servers["fuzzforge"]
|
||||
config_path.write_text(json.dumps(config, indent=4))
|
||||
return f"Removed FuzzForge from {agent.value}"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Hub Management
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def get_hubs_registry_path() -> Path:
|
||||
"""Return path to the hubs registry file (``~/.fuzzforge/hubs.json``).
|
||||
|
||||
Stored in the user-global directory so the registry is shared across
|
||||
all workspaces.
|
||||
|
||||
:return: Path to the registry JSON file.
|
||||
|
||||
"""
|
||||
return get_fuzzforge_user_dir() / "hubs.json"
|
||||
|
||||
|
||||
def get_default_hubs_dir() -> Path:
|
||||
"""Return default directory for cloned hubs (``~/.fuzzforge/hubs/``).
|
||||
|
||||
Stored in the user-global directory so hubs are cloned once and
|
||||
reused in every workspace.
|
||||
|
||||
:return: Path to the default hubs directory.
|
||||
|
||||
"""
|
||||
return get_fuzzforge_user_dir() / "hubs"
|
||||
|
||||
|
||||
def _discover_hub_dirs() -> list[Path]:
|
||||
"""Scan known hub directories for cloned repos.
|
||||
|
||||
Checks both the current global location (``~/.fuzzforge/hubs/``) and the
|
||||
legacy workspace-local location (``<cwd>/.fuzzforge/hubs/``) so that hubs
|
||||
cloned before the global-dir migration are still found.
|
||||
|
||||
:return: List of hub directory paths (each is a direct child with a ``.git``
|
||||
sub-directory).
|
||||
|
||||
"""
|
||||
candidates: list[Path] = []
|
||||
for base in (get_fuzzforge_user_dir() / "hubs", get_fuzzforge_dir() / "hubs"):
|
||||
if base.is_dir():
|
||||
candidates.extend(
|
||||
entry for entry in base.iterdir()
|
||||
if entry.is_dir() and (entry / ".git").is_dir()
|
||||
)
|
||||
return candidates
|
||||
|
||||
|
||||
def load_hubs_registry() -> dict[str, Any]:
|
||||
"""Load the hubs registry from disk.
|
||||
|
||||
If the registry file does not exist, auto-recovers it by scanning known hub
|
||||
directories and rebuilding entries for any discovered hubs. This handles
|
||||
the migration from the old workspace-local ``<cwd>/.fuzzforge/hubs.json``
|
||||
path to the global ``~/.fuzzforge/hubs.json`` path, as well as any case
|
||||
where the registry was lost.
|
||||
|
||||
:return: Registry dict with ``hubs`` key containing a list of hub entries.
|
||||
|
||||
"""
|
||||
path = get_hubs_registry_path()
|
||||
if path.exists():
|
||||
try:
|
||||
data: dict[str, Any] = json.loads(path.read_text())
|
||||
return data
|
||||
except (json.JSONDecodeError, OSError):
|
||||
pass
|
||||
|
||||
# Registry missing — attempt to rebuild from discovered hub directories.
|
||||
discovered = _discover_hub_dirs()
|
||||
if not discovered:
|
||||
return {"hubs": []}
|
||||
|
||||
hubs: list[dict[str, Any]] = []
|
||||
for hub_dir in discovered:
|
||||
name = hub_dir.name
|
||||
# Try to read the git remote URL
|
||||
git_url: str = ""
|
||||
try:
|
||||
import subprocess as _sp
|
||||
r = _sp.run(
|
||||
["git", "-C", str(hub_dir), "remote", "get-url", "origin"],
|
||||
check=False, capture_output=True, text=True, timeout=5,
|
||||
)
|
||||
if r.returncode == 0:
|
||||
git_url = r.stdout.strip()
|
||||
except Exception: # noqa: S110 - git URL is optional, failure is acceptable
|
||||
pass
|
||||
hubs.append({
|
||||
"name": name,
|
||||
"path": str(hub_dir),
|
||||
"git_url": git_url,
|
||||
"is_default": name == FUZZFORGE_DEFAULT_HUB_NAME,
|
||||
})
|
||||
|
||||
registry: dict[str, Any] = {"hubs": hubs}
|
||||
# Persist so we don't re-scan on every load
|
||||
with contextlib.suppress(OSError):
|
||||
save_hubs_registry(registry)
|
||||
return registry
|
||||
|
||||
|
||||
def save_hubs_registry(registry: dict[str, Any]) -> None:
|
||||
"""Save the hubs registry to disk.
|
||||
|
||||
:param registry: Registry dict to persist.
|
||||
|
||||
"""
|
||||
path = get_hubs_registry_path()
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
path.write_text(json.dumps(registry, indent=2))
|
||||
|
||||
|
||||
def scan_hub_for_servers(hub_path: Path) -> list[dict[str, Any]]:
|
||||
"""Scan a hub directory for MCP tool Dockerfiles.
|
||||
|
||||
Looks for the ``category/tool-name/Dockerfile`` pattern and generates
|
||||
a server configuration entry for each discovered tool.
|
||||
|
||||
:param hub_path: Root directory of the hub repository.
|
||||
:return: Sorted list of server configuration dicts.
|
||||
|
||||
"""
|
||||
servers: list[dict[str, Any]] = []
|
||||
|
||||
if not hub_path.is_dir():
|
||||
return servers
|
||||
|
||||
for dockerfile in sorted(hub_path.rglob("Dockerfile")):
|
||||
rel = dockerfile.relative_to(hub_path)
|
||||
parts = rel.parts
|
||||
|
||||
# Expected layout: category/tool-name/Dockerfile (exactly 3 parts)
|
||||
if len(parts) != 3:
|
||||
continue
|
||||
|
||||
category, tool_name, _ = parts
|
||||
|
||||
if category in _SCAN_SKIP_DIRS:
|
||||
continue
|
||||
|
||||
capabilities: list[str] = []
|
||||
if category in _NET_RAW_CATEGORIES:
|
||||
capabilities = ["NET_RAW"]
|
||||
|
||||
servers.append(
|
||||
{
|
||||
"name": tool_name,
|
||||
"description": f"{tool_name} — {category}",
|
||||
"type": "docker",
|
||||
"image": f"{tool_name}:latest",
|
||||
"category": category,
|
||||
"capabilities": capabilities,
|
||||
"volumes": [f"{get_fuzzforge_user_dir()}/hub/workspace:/data"],
|
||||
"enabled": True,
|
||||
}
|
||||
)
|
||||
|
||||
return servers
|
||||
|
||||
|
||||
def link_hub(
|
||||
name: str,
|
||||
path: str | Path,
|
||||
git_url: str | None = None,
|
||||
is_default: bool = False,
|
||||
) -> str:
|
||||
"""Link a hub directory and add its servers to hub-config.json.
|
||||
|
||||
:param name: Display name for the hub.
|
||||
:param path: Local directory path containing the hub.
|
||||
:param git_url: Optional git remote URL (for tracking).
|
||||
:param is_default: Whether this is the default FuzzingLabs hub.
|
||||
:return: Result message string.
|
||||
|
||||
"""
|
||||
hub_path = Path(path).resolve()
|
||||
|
||||
if not hub_path.is_dir():
|
||||
return f"Error: directory not found: {hub_path}"
|
||||
|
||||
# Update registry
|
||||
registry = load_hubs_registry()
|
||||
hubs = registry.get("hubs", [])
|
||||
|
||||
# Remove existing entry with same name
|
||||
hubs = [h for h in hubs if h.get("name") != name]
|
||||
|
||||
hubs.append(
|
||||
{
|
||||
"name": name,
|
||||
"path": str(hub_path),
|
||||
"git_url": git_url,
|
||||
"is_default": is_default,
|
||||
}
|
||||
)
|
||||
|
||||
registry["hubs"] = hubs
|
||||
save_hubs_registry(registry)
|
||||
|
||||
# Scan and update hub-config.json
|
||||
scanned = scan_hub_for_servers(hub_path)
|
||||
if not scanned:
|
||||
return f"Linked '{name}' (0 servers found)"
|
||||
|
||||
try:
|
||||
added = _merge_servers_into_hub_config(name, scanned)
|
||||
except Exception as exc:
|
||||
return f"Linked '{name}' but config update failed: {exc}"
|
||||
|
||||
return f"Linked '{name}' — {added} new servers added ({len(scanned)} scanned)"
|
||||
|
||||
|
||||
def unlink_hub(name: str) -> str:
|
||||
"""Unlink a hub and remove its servers from hub-config.json.
|
||||
|
||||
:param name: Name of the hub to unlink.
|
||||
:return: Result message string.
|
||||
|
||||
"""
|
||||
registry = load_hubs_registry()
|
||||
hubs = registry.get("hubs", [])
|
||||
|
||||
if not any(h.get("name") == name for h in hubs):
|
||||
return f"Hub '{name}' is not linked"
|
||||
|
||||
hubs = [h for h in hubs if h.get("name") != name]
|
||||
registry["hubs"] = hubs
|
||||
save_hubs_registry(registry)
|
||||
|
||||
try:
|
||||
removed = _remove_hub_servers_from_config(name)
|
||||
except Exception:
|
||||
removed = 0
|
||||
|
||||
return f"Unlinked '{name}' — {removed} server(s) removed"
|
||||
|
||||
|
||||
def clone_hub(
|
||||
git_url: str,
|
||||
dest: Path | None = None,
|
||||
name: str | None = None,
|
||||
) -> tuple[bool, str, Path | None]:
|
||||
"""Clone a git hub repository.
|
||||
|
||||
If the destination already exists and is a git repo, pulls instead.
|
||||
|
||||
:param git_url: Git remote URL to clone.
|
||||
:param dest: Destination directory (auto-derived from URL if *None*).
|
||||
:param name: Hub name (auto-derived from URL if *None*).
|
||||
:return: Tuple of ``(success, message, clone_path)``.
|
||||
|
||||
"""
|
||||
if name is None:
|
||||
name = git_url.rstrip("/").split("/")[-1]
|
||||
name = name.removesuffix(".git")
|
||||
|
||||
if dest is None:
|
||||
dest = get_default_hubs_dir() / name
|
||||
|
||||
if dest.exists():
|
||||
if (dest / ".git").is_dir():
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["git", "-C", str(dest), "pull"],
|
||||
check=False, capture_output=True,
|
||||
text=True,
|
||||
timeout=120,
|
||||
)
|
||||
if result.returncode == 0:
|
||||
return True, f"Updated existing clone at {dest}", dest
|
||||
return False, f"Git pull failed: {result.stderr.strip()}", None
|
||||
except subprocess.TimeoutExpired:
|
||||
return False, "Git pull timed out", None
|
||||
except FileNotFoundError:
|
||||
return False, "Git not found", None
|
||||
return False, f"Directory already exists (not a git repo): {dest}", None
|
||||
|
||||
dest.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["git", "clone", git_url, str(dest)],
|
||||
check=False, capture_output=True,
|
||||
text=True,
|
||||
timeout=300,
|
||||
)
|
||||
if result.returncode == 0:
|
||||
return True, f"Cloned to {dest}", dest
|
||||
return False, f"Git clone failed: {result.stderr.strip()}", None
|
||||
except subprocess.TimeoutExpired:
|
||||
return False, "Git clone timed out (5 min limit)", None
|
||||
except FileNotFoundError:
|
||||
return False, "Git not found on PATH", None
|
||||
|
||||
|
||||
def _merge_servers_into_hub_config(
|
||||
hub_name: str,
|
||||
servers: list[dict[str, Any]],
|
||||
) -> int:
|
||||
"""Merge scanned servers into hub-config.json.
|
||||
|
||||
Only adds servers whose name does not already exist in the config.
|
||||
New entries are tagged with ``source_hub`` for later removal.
|
||||
|
||||
:param hub_name: Name of the source hub (used for tagging).
|
||||
:param servers: List of server dicts from :func:`scan_hub_for_servers`.
|
||||
:return: Number of newly added servers.
|
||||
|
||||
"""
|
||||
fuzzforge_root = find_fuzzforge_root()
|
||||
config_path = fuzzforge_root / "hub-config.json"
|
||||
|
||||
if config_path.exists():
|
||||
try:
|
||||
config = json.loads(config_path.read_text())
|
||||
except json.JSONDecodeError:
|
||||
config = {"servers": [], "default_timeout": 300, "cache_tools": True}
|
||||
else:
|
||||
config = {"servers": [], "default_timeout": 300, "cache_tools": True}
|
||||
|
||||
existing = config.get("servers", [])
|
||||
existing_names = {s.get("name") for s in existing}
|
||||
|
||||
added = 0
|
||||
for server in servers:
|
||||
if server["name"] not in existing_names:
|
||||
server["source_hub"] = hub_name
|
||||
existing.append(server)
|
||||
existing_names.add(server["name"])
|
||||
added += 1
|
||||
|
||||
config["servers"] = existing
|
||||
config_path.write_text(json.dumps(config, indent=2))
|
||||
return added
|
||||
|
||||
|
||||
def _remove_hub_servers_from_config(hub_name: str) -> int:
|
||||
"""Remove servers belonging to a hub from hub-config.json.
|
||||
|
||||
Only removes servers tagged with the given ``source_hub`` value.
|
||||
Manually-added servers (without a tag) are preserved.
|
||||
|
||||
:param hub_name: Name of the hub whose servers should be removed.
|
||||
:return: Number of servers removed.
|
||||
|
||||
"""
|
||||
fuzzforge_root = find_fuzzforge_root()
|
||||
config_path = fuzzforge_root / "hub-config.json"
|
||||
|
||||
if not config_path.exists():
|
||||
return 0
|
||||
|
||||
try:
|
||||
config = json.loads(config_path.read_text())
|
||||
except json.JSONDecodeError:
|
||||
return 0
|
||||
|
||||
existing = config.get("servers", [])
|
||||
before = len(existing)
|
||||
config["servers"] = [s for s in existing if s.get("source_hub") != hub_name]
|
||||
after = len(config["servers"])
|
||||
|
||||
config_path.write_text(json.dumps(config, indent=2))
|
||||
return before - after
|
||||
|
||||
|
||||
def find_dockerfile_for_server(server_name: str, hub_name: str) -> Path | None:
|
||||
"""Find the Dockerfile for a hub server tool.
|
||||
|
||||
Looks up the hub path from the registry, then scans for
|
||||
``category/<server_name>/Dockerfile``.
|
||||
|
||||
:param server_name: Tool name (e.g. ``"nmap-mcp"``).
|
||||
:param hub_name: Hub name as stored in the registry.
|
||||
:return: Absolute path to the Dockerfile, or ``None`` if not found.
|
||||
|
||||
"""
|
||||
registry = load_hubs_registry()
|
||||
hub_entry = next(
|
||||
(h for h in registry.get("hubs", []) if h.get("name") == hub_name),
|
||||
None,
|
||||
)
|
||||
if not hub_entry:
|
||||
return None
|
||||
|
||||
hub_path = Path(hub_entry["path"])
|
||||
for dockerfile in hub_path.rglob("Dockerfile"):
|
||||
rel = dockerfile.relative_to(hub_path)
|
||||
parts = rel.parts
|
||||
if len(parts) == 3 and parts[1] == server_name:
|
||||
return dockerfile
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def build_image(
|
||||
image: str,
|
||||
dockerfile: Path,
|
||||
*,
|
||||
engine: str | None = None,
|
||||
) -> subprocess.Popen[str]:
|
||||
"""Start a non-blocking ``docker/podman build`` subprocess.
|
||||
|
||||
Returns the running :class:`subprocess.Popen` object so the caller
|
||||
can stream ``stdout`` / ``stderr`` lines incrementally.
|
||||
|
||||
:param image: Image tag (e.g. ``"nmap-mcp:latest"``).
|
||||
:param dockerfile: Path to the ``Dockerfile``.
|
||||
:param engine: ``"docker"`` or ``"podman"`` (auto-detected if ``None``).
|
||||
:return: Running subprocess with merged stdout+stderr.
|
||||
|
||||
"""
|
||||
if engine is None:
|
||||
engine = os.environ.get("FUZZFORGE_ENGINE__TYPE", "docker").lower()
|
||||
engine = "podman" if engine == "podman" else "docker"
|
||||
|
||||
context_dir = str(dockerfile.parent)
|
||||
return subprocess.Popen(
|
||||
[engine, "build", "-t", image, context_dir],
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.STDOUT,
|
||||
text=True,
|
||||
)
|
||||
1
fuzzforge-cli/src/fuzzforge_cli/tui/screens/__init__.py
Normal file
1
fuzzforge-cli/src/fuzzforge_cli/tui/screens/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
"""TUI screens for FuzzForge."""
|
||||
96
fuzzforge-cli/src/fuzzforge_cli/tui/screens/agent_setup.py
Normal file
96
fuzzforge-cli/src/fuzzforge_cli/tui/screens/agent_setup.py
Normal file
@@ -0,0 +1,96 @@
|
||||
"""Agent setup and unlink modal screens for FuzzForge TUI.
|
||||
|
||||
Provides context-aware modals that receive the target agent directly
|
||||
from the dashboard row selection — no redundant agent picker needed.
|
||||
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from textual.app import ComposeResult
|
||||
from textual.containers import Horizontal, Vertical
|
||||
from textual.screen import ModalScreen
|
||||
from textual.widgets import Button, Label, RadioButton, RadioSet
|
||||
|
||||
from fuzzforge_cli.commands.mcp import AIAgent
|
||||
from fuzzforge_cli.tui.helpers import install_agent_config, uninstall_agent_config
|
||||
|
||||
|
||||
class AgentSetupScreen(ModalScreen[str | None]):
|
||||
"""Modal for linking a specific agent — only asks for engine choice."""
|
||||
|
||||
BINDINGS = [("escape", "cancel", "Cancel")]
|
||||
|
||||
def __init__(self, agent: AIAgent, display_name: str) -> None:
|
||||
super().__init__()
|
||||
self._agent = agent
|
||||
self._display_name = display_name
|
||||
|
||||
def compose(self) -> ComposeResult:
|
||||
"""Compose the setup dialog layout."""
|
||||
with Vertical(id="setup-dialog"):
|
||||
yield Label(f"Setup {self._display_name}", classes="dialog-title")
|
||||
|
||||
yield Label("Container Engine:", classes="field-label")
|
||||
yield RadioSet(
|
||||
RadioButton("Docker", value=True),
|
||||
RadioButton("Podman"),
|
||||
id="engine-select",
|
||||
)
|
||||
|
||||
with Horizontal(classes="dialog-buttons"):
|
||||
yield Button("Install", variant="primary", id="btn-install")
|
||||
yield Button("Cancel", variant="default", id="btn-cancel")
|
||||
|
||||
def on_button_pressed(self, event: Button.Pressed) -> None:
|
||||
"""Handle button clicks."""
|
||||
if event.button.id == "btn-cancel":
|
||||
self.dismiss(None)
|
||||
elif event.button.id == "btn-install":
|
||||
self._do_install()
|
||||
|
||||
def action_cancel(self) -> None:
|
||||
"""Dismiss the dialog without action."""
|
||||
self.dismiss(None)
|
||||
|
||||
def _do_install(self) -> None:
|
||||
"""Execute the installation."""
|
||||
engine_set = self.query_one("#engine-select", RadioSet)
|
||||
engine = "docker" if engine_set.pressed_index <= 0 else "podman"
|
||||
result = install_agent_config(self._agent, engine, force=True)
|
||||
self.dismiss(result)
|
||||
|
||||
|
||||
class AgentUnlinkScreen(ModalScreen[str | None]):
|
||||
"""Confirmation modal for unlinking a specific agent."""
|
||||
|
||||
BINDINGS = [("escape", "cancel", "Cancel")]
|
||||
|
||||
def __init__(self, agent: AIAgent, display_name: str) -> None:
|
||||
super().__init__()
|
||||
self._agent = agent
|
||||
self._display_name = display_name
|
||||
|
||||
def compose(self) -> ComposeResult:
|
||||
"""Compose the unlink confirmation layout."""
|
||||
with Vertical(id="unlink-dialog"):
|
||||
yield Label(f"Unlink {self._display_name}?", classes="dialog-title")
|
||||
yield Label(
|
||||
f"This will remove the FuzzForge MCP configuration from {self._display_name}.",
|
||||
)
|
||||
|
||||
with Horizontal(classes="dialog-buttons"):
|
||||
yield Button("Unlink", variant="warning", id="btn-unlink")
|
||||
yield Button("Cancel", variant="default", id="btn-cancel")
|
||||
|
||||
def on_button_pressed(self, event: Button.Pressed) -> None:
|
||||
"""Handle button clicks."""
|
||||
if event.button.id == "btn-cancel":
|
||||
self.dismiss(None)
|
||||
elif event.button.id == "btn-unlink":
|
||||
result = uninstall_agent_config(self._agent)
|
||||
self.dismiss(result)
|
||||
|
||||
def action_cancel(self) -> None:
|
||||
"""Dismiss without action."""
|
||||
self.dismiss(None)
|
||||
58
fuzzforge-cli/src/fuzzforge_cli/tui/screens/build_image.py
Normal file
58
fuzzforge-cli/src/fuzzforge_cli/tui/screens/build_image.py
Normal file
@@ -0,0 +1,58 @@
|
||||
"""Build-image confirm dialog for FuzzForge TUI.
|
||||
|
||||
Simple modal that asks the user to confirm before starting a background
|
||||
build. The actual build is managed by the app so the user is never
|
||||
locked on this screen.
|
||||
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from textual.app import ComposeResult
|
||||
from textual.containers import Horizontal, Vertical
|
||||
from textual.screen import ModalScreen
|
||||
from textual.widgets import Button, Label
|
||||
|
||||
|
||||
class _NoFocusButton(Button):
|
||||
can_focus = False
|
||||
|
||||
|
||||
class BuildImageScreen(ModalScreen[bool]):
|
||||
"""Quick confirmation before starting a background Docker/Podman build."""
|
||||
|
||||
BINDINGS = [("escape", "cancel", "Cancel")]
|
||||
|
||||
def __init__(self, server_name: str, image: str, hub_name: str) -> None:
|
||||
super().__init__()
|
||||
self._server_name = server_name
|
||||
self._image = image
|
||||
self._hub_name = hub_name
|
||||
|
||||
def compose(self) -> ComposeResult:
|
||||
"""Build the confirmation dialog UI."""
|
||||
with Vertical(id="build-dialog"):
|
||||
yield Label(f"Build {self._image}", classes="dialog-title")
|
||||
yield Label(
|
||||
f"Hub: {self._hub_name} • Tool: {self._server_name}",
|
||||
id="build-subtitle",
|
||||
)
|
||||
yield Label(
|
||||
"The image will be built in the background.\n"
|
||||
"You'll receive a notification when it's done.",
|
||||
id="confirm-text",
|
||||
)
|
||||
with Horizontal(classes="dialog-buttons"):
|
||||
yield _NoFocusButton("Build", variant="primary", id="btn-build")
|
||||
yield _NoFocusButton("Cancel", variant="default", id="btn-cancel")
|
||||
|
||||
def on_button_pressed(self, event: Button.Pressed) -> None:
|
||||
"""Handle Build or Cancel button clicks."""
|
||||
if event.button.id == "btn-build":
|
||||
self.dismiss(result=True)
|
||||
elif event.button.id == "btn-cancel":
|
||||
self.dismiss(result=False)
|
||||
|
||||
def action_cancel(self) -> None:
|
||||
"""Dismiss the dialog when Escape is pressed."""
|
||||
self.dismiss(result=False)
|
||||
80
fuzzforge-cli/src/fuzzforge_cli/tui/screens/build_log.py
Normal file
80
fuzzforge-cli/src/fuzzforge_cli/tui/screens/build_log.py
Normal file
@@ -0,0 +1,80 @@
|
||||
"""Build-log viewer screen for FuzzForge TUI.
|
||||
|
||||
Shows live output of a background build started by the app. Polls the
|
||||
app's ``_build_logs`` buffer every 500 ms so the user can pop this screen
|
||||
open at any time while the build is running and see up-to-date output.
|
||||
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any
|
||||
|
||||
from textual.app import ComposeResult
|
||||
from textual.containers import Horizontal, Vertical
|
||||
from textual.screen import ModalScreen
|
||||
from textual.widgets import Button, Label, Log
|
||||
|
||||
|
||||
class _NoFocusButton(Button):
|
||||
can_focus = False
|
||||
|
||||
|
||||
class BuildLogScreen(ModalScreen[None]):
|
||||
"""Live log viewer for a background build job managed by the app."""
|
||||
|
||||
BINDINGS = [("escape", "close", "Close")]
|
||||
|
||||
def __init__(self, image: str) -> None:
|
||||
super().__init__()
|
||||
self._image = image
|
||||
self._last_line: int = 0
|
||||
|
||||
def compose(self) -> ComposeResult:
|
||||
"""Build the log viewer UI."""
|
||||
with Vertical(id="build-dialog"):
|
||||
yield Label(f"Build log — {self._image}", classes="dialog-title")
|
||||
yield Label("", id="build-status")
|
||||
yield Log(id="build-log", auto_scroll=True)
|
||||
with Horizontal(classes="dialog-buttons"):
|
||||
yield _NoFocusButton("Close", variant="default", id="btn-close")
|
||||
|
||||
def on_mount(self) -> None:
|
||||
"""Initialize log polling when the screen is mounted."""
|
||||
self._flush_log()
|
||||
self.set_interval(0.5, self._poll_log)
|
||||
|
||||
def _flush_log(self) -> None:
|
||||
"""Write any new lines since the last flush."""
|
||||
logs: list[str] = getattr(self.app, "_build_logs", {}).get(self._image, [])
|
||||
log_widget = self.query_one("#build-log", Log)
|
||||
new_lines = logs[self._last_line :]
|
||||
for line in new_lines:
|
||||
log_widget.write_line(line)
|
||||
self._last_line += len(new_lines)
|
||||
|
||||
active: dict[str, Any] = getattr(self.app, "_active_builds", {})
|
||||
status = self.query_one("#build-status", Label)
|
||||
if self._image in active:
|
||||
status.update("[yellow]⏳ Building…[/yellow]")
|
||||
else:
|
||||
# Build is done — check if we have a result stored
|
||||
results: dict[str, Any] = getattr(self.app, "_build_results", {})
|
||||
if self._image in results:
|
||||
if results[self._image]:
|
||||
status.update(f"[green]✓ {self._image} built successfully[/green]")
|
||||
else:
|
||||
status.update(f"[red]✗ {self._image} build failed[/red]")
|
||||
|
||||
def _poll_log(self) -> None:
|
||||
"""Poll for new log lines periodically."""
|
||||
self._flush_log()
|
||||
|
||||
def on_button_pressed(self, event: Button.Pressed) -> None:
|
||||
"""Handle Close button click."""
|
||||
if event.button.id == "btn-close":
|
||||
self.dismiss(None)
|
||||
|
||||
def action_close(self) -> None:
|
||||
"""Dismiss the dialog when Escape is pressed."""
|
||||
self.dismiss(None)
|
||||
301
fuzzforge-cli/src/fuzzforge_cli/tui/screens/hub_manager.py
Normal file
301
fuzzforge-cli/src/fuzzforge_cli/tui/screens/hub_manager.py
Normal file
@@ -0,0 +1,301 @@
|
||||
"""Hub management screens for FuzzForge TUI.
|
||||
|
||||
Provides modal dialogs for managing linked MCP hub repositories:
|
||||
- HubManagerScreen: list, add, remove linked hubs
|
||||
- LinkHubScreen: link a local directory as a hub
|
||||
- CloneHubScreen: clone a git repo and link it (defaults to FuzzingLabs hub)
|
||||
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
from rich.text import Text
|
||||
from textual import work
|
||||
from textual.app import ComposeResult
|
||||
from textual.containers import Horizontal, Vertical
|
||||
from textual.screen import ModalScreen
|
||||
from textual.widgets import Button, DataTable, Input, Label, Static
|
||||
|
||||
from fuzzforge_cli.tui.helpers import (
|
||||
FUZZFORGE_DEFAULT_HUB_NAME,
|
||||
FUZZFORGE_DEFAULT_HUB_URL,
|
||||
clone_hub,
|
||||
link_hub,
|
||||
load_hubs_registry,
|
||||
scan_hub_for_servers,
|
||||
unlink_hub,
|
||||
)
|
||||
|
||||
|
||||
class HubManagerScreen(ModalScreen[str | None]):
|
||||
"""Modal screen for managing linked MCP hubs."""
|
||||
|
||||
BINDINGS = [("escape", "cancel", "Close")]
|
||||
|
||||
def compose(self) -> ComposeResult:
|
||||
"""Compose the hub manager layout."""
|
||||
with Vertical(id="hub-manager-dialog"):
|
||||
yield Label("Hub Manager", classes="dialog-title")
|
||||
yield DataTable(id="hubs-table")
|
||||
yield Label("", id="hub-status")
|
||||
with Horizontal(classes="dialog-buttons"):
|
||||
yield Button(
|
||||
"FuzzingLabs Hub",
|
||||
variant="primary",
|
||||
id="btn-clone-default",
|
||||
)
|
||||
yield Button("Link Path", variant="default", id="btn-link")
|
||||
yield Button("Clone URL", variant="default", id="btn-clone")
|
||||
yield Button("Remove", variant="primary", id="btn-remove")
|
||||
yield Button("Close", variant="default", id="btn-close")
|
||||
|
||||
def on_mount(self) -> None:
|
||||
"""Populate the hubs table on startup."""
|
||||
self._refresh_hubs()
|
||||
|
||||
def _refresh_hubs(self) -> None:
|
||||
"""Refresh the linked hubs table."""
|
||||
table = self.query_one("#hubs-table", DataTable)
|
||||
table.clear(columns=True)
|
||||
table.add_columns("Name", "Path", "Servers", "Source")
|
||||
table.cursor_type = "row"
|
||||
|
||||
registry = load_hubs_registry()
|
||||
hubs = registry.get("hubs", [])
|
||||
|
||||
if not hubs:
|
||||
table.add_row(
|
||||
Text("No hubs linked", style="dim"),
|
||||
Text("Press 'FuzzingLabs Hub' to get started", style="dim"),
|
||||
"",
|
||||
"",
|
||||
)
|
||||
return
|
||||
|
||||
for hub in hubs:
|
||||
name = hub.get("name", "unknown")
|
||||
path = hub.get("path", "")
|
||||
git_url = hub.get("git_url", "")
|
||||
is_default = hub.get("is_default", False)
|
||||
|
||||
hub_path = Path(path)
|
||||
count: str | Text
|
||||
if hub_path.is_dir():
|
||||
servers = scan_hub_for_servers(hub_path)
|
||||
count = str(len(servers))
|
||||
else:
|
||||
count = Text("dir missing", style="yellow")
|
||||
|
||||
source = git_url or "local"
|
||||
name_cell: str | Text
|
||||
if is_default:
|
||||
name_cell = Text(f"★ {name}", style="bold")
|
||||
else:
|
||||
name_cell = name
|
||||
|
||||
table.add_row(name_cell, path, count, source)
|
||||
|
||||
def on_button_pressed(self, event: Button.Pressed) -> None:
|
||||
"""Route button actions."""
|
||||
if event.button.id == "btn-close":
|
||||
self.dismiss("refreshed")
|
||||
elif event.button.id == "btn-clone-default":
|
||||
self.app.push_screen(
|
||||
CloneHubScreen(
|
||||
FUZZFORGE_DEFAULT_HUB_URL,
|
||||
FUZZFORGE_DEFAULT_HUB_NAME,
|
||||
is_default=True,
|
||||
),
|
||||
callback=self._on_hub_action,
|
||||
)
|
||||
elif event.button.id == "btn-link":
|
||||
self.app.push_screen(
|
||||
LinkHubScreen(),
|
||||
callback=self._on_hub_action,
|
||||
)
|
||||
elif event.button.id == "btn-clone":
|
||||
self.app.push_screen(
|
||||
CloneHubScreen(),
|
||||
callback=self._on_hub_action,
|
||||
)
|
||||
elif event.button.id == "btn-remove":
|
||||
self._remove_selected()
|
||||
|
||||
def _on_hub_action(self, result: str | None) -> None:
|
||||
"""Handle result from a sub-screen."""
|
||||
if result:
|
||||
self.query_one("#hub-status", Label).update(result)
|
||||
self.app.notify(result)
|
||||
self._refresh_hubs()
|
||||
|
||||
def _remove_selected(self) -> None:
|
||||
"""Remove the currently selected hub."""
|
||||
table = self.query_one("#hubs-table", DataTable)
|
||||
registry = load_hubs_registry()
|
||||
hubs = registry.get("hubs", [])
|
||||
|
||||
if not hubs:
|
||||
self.app.notify("No hubs to remove", severity="warning")
|
||||
return
|
||||
|
||||
idx = table.cursor_row
|
||||
if idx is None or idx < 0 or idx >= len(hubs):
|
||||
self.app.notify("Select a hub to remove", severity="warning")
|
||||
return
|
||||
|
||||
name = hubs[idx].get("name", "")
|
||||
result = unlink_hub(name)
|
||||
self.query_one("#hub-status", Label).update(result)
|
||||
self._refresh_hubs()
|
||||
self.app.notify(result)
|
||||
|
||||
def action_cancel(self) -> None:
|
||||
"""Close the hub manager."""
|
||||
self.dismiss("refreshed")
|
||||
|
||||
|
||||
class LinkHubScreen(ModalScreen[str | None]):
|
||||
"""Modal for linking a local directory as an MCP hub."""
|
||||
|
||||
BINDINGS = [("escape", "cancel", "Cancel")]
|
||||
|
||||
def compose(self) -> ComposeResult:
|
||||
"""Compose the link dialog layout."""
|
||||
with Vertical(id="link-dialog"):
|
||||
yield Label("Link Local Hub", classes="dialog-title")
|
||||
|
||||
yield Label("Hub Name:", classes="field-label")
|
||||
yield Input(placeholder="my-hub", id="name-input")
|
||||
|
||||
yield Label("Directory Path:", classes="field-label")
|
||||
yield Input(placeholder="/path/to/hub-directory", id="path-input")
|
||||
|
||||
yield Label("", id="link-status")
|
||||
with Horizontal(classes="dialog-buttons"):
|
||||
yield Button("Link", variant="primary", id="btn-link")
|
||||
yield Button("Cancel", variant="default", id="btn-cancel")
|
||||
|
||||
def on_button_pressed(self, event: Button.Pressed) -> None:
|
||||
"""Handle button clicks."""
|
||||
if event.button.id == "btn-cancel":
|
||||
self.dismiss(None)
|
||||
elif event.button.id == "btn-link":
|
||||
self._do_link()
|
||||
|
||||
def _do_link(self) -> None:
|
||||
"""Execute the link operation."""
|
||||
name = self.query_one("#name-input", Input).value.strip()
|
||||
path = self.query_one("#path-input", Input).value.strip()
|
||||
|
||||
if not name:
|
||||
self.app.notify("Please enter a hub name", severity="warning")
|
||||
return
|
||||
if not path:
|
||||
self.app.notify("Please enter a directory path", severity="warning")
|
||||
return
|
||||
|
||||
result = link_hub(name, path)
|
||||
self.dismiss(result)
|
||||
|
||||
def action_cancel(self) -> None:
|
||||
"""Dismiss without action."""
|
||||
self.dismiss(None)
|
||||
|
||||
|
||||
class CloneHubScreen(ModalScreen[str | None]):
|
||||
"""Modal for cloning a git hub repository and linking it.
|
||||
|
||||
When instantiated with *is_default=True* and FuzzingLabs URL,
|
||||
provides a one-click setup for the standard security hub.
|
||||
|
||||
"""
|
||||
|
||||
BINDINGS = [("escape", "cancel", "Cancel")]
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
default_url: str = "",
|
||||
default_name: str = "",
|
||||
is_default: bool = False,
|
||||
) -> None:
|
||||
super().__init__()
|
||||
self._default_url = default_url
|
||||
self._default_name = default_name
|
||||
self._is_default = is_default
|
||||
|
||||
def compose(self) -> ComposeResult:
|
||||
"""Compose the clone dialog layout."""
|
||||
title = "Clone FuzzingLabs Hub" if self._is_default else "Clone Git Hub"
|
||||
with Vertical(id="clone-dialog"):
|
||||
yield Label(title, classes="dialog-title")
|
||||
|
||||
yield Label("Git URL:", classes="field-label")
|
||||
yield Input(
|
||||
value=self._default_url,
|
||||
placeholder="git@github.com:org/repo.git",
|
||||
id="url-input",
|
||||
)
|
||||
|
||||
yield Label("Hub Name (optional):", classes="field-label")
|
||||
yield Input(
|
||||
value=self._default_name,
|
||||
placeholder="auto-detect from URL",
|
||||
id="name-input",
|
||||
)
|
||||
|
||||
yield Static("", id="clone-status")
|
||||
with Horizontal(classes="dialog-buttons"):
|
||||
yield Button(
|
||||
"Clone & Link",
|
||||
variant="primary",
|
||||
id="btn-clone",
|
||||
)
|
||||
yield Button("Cancel", variant="default", id="btn-cancel")
|
||||
|
||||
def on_button_pressed(self, event: Button.Pressed) -> None:
|
||||
"""Handle button clicks."""
|
||||
if event.button.id == "btn-cancel":
|
||||
self.dismiss(None)
|
||||
elif event.button.id == "btn-clone":
|
||||
self._start_clone()
|
||||
|
||||
def _start_clone(self) -> None:
|
||||
"""Validate input and start the async clone operation."""
|
||||
url = self.query_one("#url-input", Input).value.strip()
|
||||
if not url:
|
||||
self.app.notify("Please enter a git URL", severity="warning")
|
||||
return
|
||||
|
||||
self.query_one("#btn-clone", Button).disabled = True
|
||||
self.query_one("#clone-status", Static).update("⏳ Cloning repository...")
|
||||
self._do_clone(url)
|
||||
|
||||
@work(thread=True)
|
||||
def _do_clone(self, url: str) -> None:
|
||||
"""Clone the repo in a background thread."""
|
||||
name_input = self.query_one("#name-input", Input).value.strip()
|
||||
name = name_input or None
|
||||
|
||||
success, msg, path = clone_hub(url, name=name)
|
||||
if success and path:
|
||||
hub_name = name or path.name
|
||||
link_result = link_hub(
|
||||
hub_name,
|
||||
path,
|
||||
git_url=url,
|
||||
is_default=self._is_default,
|
||||
)
|
||||
self.app.call_from_thread(self.dismiss, f"✓ {link_result}")
|
||||
else:
|
||||
self.app.call_from_thread(self._on_clone_failed, msg)
|
||||
|
||||
def _on_clone_failed(self, msg: str) -> None:
|
||||
"""Handle a failed clone — re-enable the button and show the error."""
|
||||
self.query_one("#clone-status", Static).update(f"✗ {msg}")
|
||||
self.query_one("#btn-clone", Button).disabled = False
|
||||
|
||||
def action_cancel(self) -> None:
|
||||
"""Dismiss without action."""
|
||||
self.dismiss(None)
|
||||
@@ -18,3 +18,32 @@ ignore = [
|
||||
"PLR2004", # allowing comparisons using unamed numerical constants in tests
|
||||
"S101", # allowing 'assert' statements in tests
|
||||
]
|
||||
"src/**" = [
|
||||
"ANN201", # missing return type: legacy code
|
||||
"ARG002", # unused argument: callback pattern
|
||||
"ASYNC109", # async with timeout param: intentional pattern
|
||||
"BLE001", # blind exception: broad error handling needed
|
||||
"C901", # complexity: legacy code
|
||||
"EM102", # f-string in exception: existing pattern
|
||||
"F401", # unused import: re-export pattern
|
||||
"FBT001", # boolean positional arg
|
||||
"FBT002", # boolean default arg
|
||||
"FIX002", # TODO comments: documented tech debt
|
||||
"N806", # variable naming: intentional constants
|
||||
"PERF401", # list comprehension: readability over perf
|
||||
"PLW0603", # global statement: intentional for shared state
|
||||
"PTH111", # os.path usage: legacy code
|
||||
"RUF005", # collection literal: legacy style
|
||||
"S110", # try-except-pass: intentional suppression
|
||||
"S603", # subprocess: validated inputs
|
||||
"SIM108", # ternary: readability preference
|
||||
"TC001", # TYPE_CHECKING: causes circular imports
|
||||
"TC003", # TYPE_CHECKING: causes circular imports
|
||||
"TRY003", # message in exception: existing pattern
|
||||
"TRY300", # try-else: existing pattern
|
||||
"TRY400", # logging.error vs exception: existing pattern
|
||||
"UP017", # datetime.UTC: Python 3.11+ only
|
||||
"UP041", # TimeoutError alias: compatibility
|
||||
"UP043", # unnecessary type args: compatibility
|
||||
"W293", # blank line whitespace: formatting
|
||||
]
|
||||
|
||||
@@ -176,6 +176,7 @@ class HubClient:
|
||||
arguments: dict[str, Any],
|
||||
*,
|
||||
timeout: int | None = None,
|
||||
extra_volumes: list[str] | None = None,
|
||||
) -> dict[str, Any]:
|
||||
"""Execute a tool on a hub server.
|
||||
|
||||
@@ -183,6 +184,7 @@ class HubClient:
|
||||
:param tool_name: Name of the tool to execute.
|
||||
:param arguments: Tool arguments.
|
||||
:param timeout: Execution timeout (uses default if None).
|
||||
:param extra_volumes: Additional Docker volume mounts to inject.
|
||||
:returns: Tool execution result.
|
||||
:raises HubClientError: If execution fails.
|
||||
|
||||
@@ -199,7 +201,7 @@ class HubClient:
|
||||
)
|
||||
|
||||
try:
|
||||
async with self._connect(config) as (reader, writer):
|
||||
async with self._connect(config, extra_volumes=extra_volumes) as (reader, writer):
|
||||
# Initialise MCP session (skip for persistent — already done)
|
||||
if not self._persistent_sessions.get(config.name):
|
||||
await self._initialize_session(reader, writer, config.name)
|
||||
@@ -248,6 +250,7 @@ class HubClient:
|
||||
async def _connect(
|
||||
self,
|
||||
config: HubServerConfig,
|
||||
extra_volumes: list[str] | None = None,
|
||||
) -> AsyncGenerator[tuple[asyncio.StreamReader, asyncio.StreamWriter], None]:
|
||||
"""Connect to an MCP server.
|
||||
|
||||
@@ -256,6 +259,7 @@ class HubClient:
|
||||
ephemeral per-call connection logic.
|
||||
|
||||
:param config: Server configuration.
|
||||
:param extra_volumes: Additional Docker volume mounts to inject.
|
||||
:yields: Tuple of (reader, writer) for communication.
|
||||
|
||||
"""
|
||||
@@ -268,7 +272,7 @@ class HubClient:
|
||||
|
||||
# Ephemeral connection (original behaviour)
|
||||
if config.type == HubServerType.DOCKER:
|
||||
async with self._connect_docker(config) as streams:
|
||||
async with self._connect_docker(config, extra_volumes=extra_volumes) as streams:
|
||||
yield streams
|
||||
elif config.type == HubServerType.COMMAND:
|
||||
async with self._connect_command(config) as streams:
|
||||
@@ -284,10 +288,12 @@ class HubClient:
|
||||
async def _connect_docker(
|
||||
self,
|
||||
config: HubServerConfig,
|
||||
extra_volumes: list[str] | None = None,
|
||||
) -> AsyncGenerator[tuple[asyncio.StreamReader, asyncio.StreamWriter], None]:
|
||||
"""Connect to a Docker-based MCP server.
|
||||
|
||||
:param config: Server configuration with image name.
|
||||
:param extra_volumes: Additional volume mounts to inject (e.g. project assets).
|
||||
:yields: Tuple of (reader, writer) for stdio communication.
|
||||
|
||||
"""
|
||||
@@ -302,10 +308,14 @@ class HubClient:
|
||||
for cap in config.capabilities:
|
||||
cmd.extend(["--cap-add", cap])
|
||||
|
||||
# Add volumes
|
||||
# Add volumes from server config
|
||||
for volume in config.volumes:
|
||||
cmd.extend(["-v", os.path.expanduser(volume)])
|
||||
|
||||
# Add extra volumes (e.g. project assets injected at runtime)
|
||||
for volume in (extra_volumes or []):
|
||||
cmd.extend(["-v", os.path.expanduser(volume)])
|
||||
|
||||
# Add environment variables
|
||||
for key, value in config.environment.items():
|
||||
cmd.extend(["-e", f"{key}={value}"])
|
||||
@@ -529,6 +539,7 @@ class HubClient:
|
||||
async def start_persistent_session(
|
||||
self,
|
||||
config: HubServerConfig,
|
||||
extra_volumes: list[str] | None = None,
|
||||
) -> PersistentSession:
|
||||
"""Start a persistent Docker container and initialise MCP session.
|
||||
|
||||
@@ -536,6 +547,7 @@ class HubClient:
|
||||
called, allowing multiple tool calls on the same session.
|
||||
|
||||
:param config: Server configuration (must be Docker type).
|
||||
:param extra_volumes: Additional host:container volume mounts to inject.
|
||||
:returns: The created persistent session.
|
||||
:raises HubClientError: If the container cannot be started.
|
||||
|
||||
@@ -580,6 +592,9 @@ class HubClient:
|
||||
for volume in config.volumes:
|
||||
cmd.extend(["-v", os.path.expanduser(volume)])
|
||||
|
||||
for extra_vol in (extra_volumes or []):
|
||||
cmd.extend(["-v", extra_vol])
|
||||
|
||||
for key, value in config.environment.items():
|
||||
cmd.extend(["-e", f"{key}={value}"])
|
||||
|
||||
|
||||
@@ -180,12 +180,14 @@ class HubExecutor:
|
||||
arguments: dict[str, Any] | None = None,
|
||||
*,
|
||||
timeout: int | None = None,
|
||||
extra_volumes: list[str] | None = None,
|
||||
) -> HubExecutionResult:
|
||||
"""Execute a hub tool.
|
||||
|
||||
:param identifier: Tool identifier (hub:server:tool or server:tool).
|
||||
:param arguments: Tool arguments.
|
||||
:param timeout: Execution timeout.
|
||||
:param extra_volumes: Additional Docker volume mounts to inject.
|
||||
:returns: Execution result.
|
||||
|
||||
"""
|
||||
@@ -232,6 +234,7 @@ class HubExecutor:
|
||||
tool_name_to_use or tool_name,
|
||||
arguments,
|
||||
timeout=timeout,
|
||||
extra_volumes=extra_volumes,
|
||||
)
|
||||
return HubExecutionResult(
|
||||
success=True,
|
||||
@@ -268,6 +271,7 @@ class HubExecutor:
|
||||
tool.name,
|
||||
arguments,
|
||||
timeout=timeout,
|
||||
extra_volumes=extra_volumes,
|
||||
)
|
||||
return HubExecutionResult(
|
||||
success=True,
|
||||
@@ -341,13 +345,14 @@ class HubExecutor:
|
||||
# Persistent session management
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
async def start_persistent_server(self, server_name: str) -> dict[str, Any]:
|
||||
async def start_persistent_server(self, server_name: str, extra_volumes: list[str] | None = None) -> dict[str, Any]:
|
||||
"""Start a persistent container session for a server.
|
||||
|
||||
The container stays running between tool calls, allowing stateful
|
||||
interactions (e.g., radare2 sessions, long-running fuzzing).
|
||||
|
||||
:param server_name: Name of the hub server to start.
|
||||
:param extra_volumes: Additional host:container volume mounts to inject.
|
||||
:returns: Session status dictionary.
|
||||
:raises ValueError: If server not found.
|
||||
|
||||
@@ -358,7 +363,7 @@ class HubExecutor:
|
||||
msg = f"Server '{server_name}' not found"
|
||||
raise ValueError(msg)
|
||||
|
||||
session = await self._client.start_persistent_session(server.config)
|
||||
session = await self._client.start_persistent_session(server.config, extra_volumes=extra_volumes)
|
||||
|
||||
# Auto-discover tools on the new session
|
||||
try:
|
||||
|
||||
@@ -4,7 +4,7 @@ from typing import TYPE_CHECKING
|
||||
from pydantic import BaseModel
|
||||
|
||||
from fuzzforge_common.sandboxes.engines.enumeration import (
|
||||
FuzzForgeSandboxEngines, # noqa: TC001 (required by 'pydantic' at runtime)
|
||||
FuzzForgeSandboxEngines,
|
||||
)
|
||||
|
||||
if TYPE_CHECKING:
|
||||
|
||||
@@ -14,3 +14,18 @@ ignore = [
|
||||
"PLR2004", # allowing comparisons using unamed numerical constants in tests
|
||||
"S101", # allowing 'assert' statements in tests
|
||||
]
|
||||
"src/**" = [
|
||||
"ASYNC109", # async with timeout param: intentional pattern
|
||||
"EM102", # f-string in exception: existing pattern
|
||||
"PERF401", # list comprehension: readability over perf
|
||||
"PLR0913", # too many arguments: API compatibility
|
||||
"PLW0602", # global variable: intentional for shared state
|
||||
"PLW0603", # global statement: intentional for shared state
|
||||
"RET504", # unnecessary assignment: readability
|
||||
"RET505", # unnecessary elif after return: readability
|
||||
"TC001", # TYPE_CHECKING: causes circular imports
|
||||
"TC003", # TYPE_CHECKING: causes circular imports
|
||||
"TRY300", # try-else: existing pattern
|
||||
"TRY301", # abstract raise: existing pattern
|
||||
"TRY003", # message in exception: existing pattern
|
||||
]
|
||||
|
||||
@@ -10,7 +10,6 @@ from fastmcp.exceptions import ResourceError
|
||||
|
||||
from fuzzforge_mcp.dependencies import get_project_path, get_storage
|
||||
|
||||
|
||||
mcp: FastMCP = FastMCP()
|
||||
|
||||
|
||||
|
||||
@@ -10,7 +10,6 @@ from fastmcp.exceptions import ResourceError
|
||||
|
||||
from fuzzforge_mcp.dependencies import get_project_path, get_settings, get_storage
|
||||
|
||||
|
||||
mcp: FastMCP = FastMCP()
|
||||
|
||||
|
||||
|
||||
@@ -13,9 +13,9 @@ from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
import shutil
|
||||
from pathlib import Path
|
||||
from tarfile import open as Archive # noqa: N812
|
||||
from typing import Any
|
||||
|
||||
logger = logging.getLogger("fuzzforge-mcp")
|
||||
|
||||
@@ -131,7 +131,7 @@ class LocalStorage:
|
||||
storage_path.mkdir(parents=True, exist_ok=True)
|
||||
config_path = storage_path / "config.json"
|
||||
|
||||
config: dict = {}
|
||||
config: dict[str, Any] = {}
|
||||
if config_path.exists():
|
||||
config = json.loads(config_path.read_text())
|
||||
|
||||
|
||||
@@ -10,14 +10,13 @@ through the FuzzForge hub. AI agents can:
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from fastmcp import FastMCP
|
||||
from fastmcp.exceptions import ToolError
|
||||
|
||||
from fuzzforge_common.hub import HubExecutor, HubServerConfig, HubServerType
|
||||
from fuzzforge_mcp.dependencies import get_settings
|
||||
|
||||
from fuzzforge_mcp.dependencies import get_project_path, get_settings, get_storage
|
||||
|
||||
mcp: FastMCP = FastMCP()
|
||||
|
||||
@@ -173,18 +172,44 @@ async def execute_hub_tool(
|
||||
:return: Tool execution result.
|
||||
|
||||
Example identifiers:
|
||||
- "hub:binwalk-mcp:binwalk_scan"
|
||||
- "hub:yara-mcp:yara_scan_with_rules"
|
||||
- "hub:nmap:nmap_scan"
|
||||
- "nmap:nmap_scan"
|
||||
- "hub:nuclei:nuclei_scan"
|
||||
|
||||
FILE ACCESS — if set_project_assets was called, the assets directory is
|
||||
mounted read-only inside the container at two standard paths:
|
||||
- /app/uploads/ (used by binwalk, and tools with UPLOAD_DIR)
|
||||
- /app/samples/ (used by yara, capa, and tools with SAMPLES_DIR)
|
||||
Always use /app/uploads/<filename> or /app/samples/<filename> when
|
||||
passing file paths to hub tools — do NOT use the host path.
|
||||
|
||||
"""
|
||||
try:
|
||||
executor = _get_hub_executor()
|
||||
|
||||
# Inject project assets as Docker volume mounts if configured.
|
||||
# Mounts the assets directory at the standard paths used by hub tools:
|
||||
# /app/uploads — binwalk, and other tools that use UPLOAD_DIR
|
||||
# /app/samples — yara, capa, and other tools that use SAMPLES_DIR
|
||||
extra_volumes: list[str] = []
|
||||
try:
|
||||
storage = get_storage()
|
||||
project_path = get_project_path()
|
||||
assets_path = storage.get_project_assets_path(project_path)
|
||||
if assets_path:
|
||||
assets_str = str(assets_path)
|
||||
extra_volumes = [
|
||||
f"{assets_str}:/app/uploads:ro",
|
||||
f"{assets_str}:/app/samples:ro",
|
||||
]
|
||||
except Exception: # noqa: BLE001 - never block tool execution due to asset injection failure
|
||||
extra_volumes = []
|
||||
|
||||
result = await executor.execute_tool(
|
||||
identifier=identifier,
|
||||
arguments=arguments or {},
|
||||
timeout=timeout,
|
||||
extra_volumes=extra_volumes or None,
|
||||
)
|
||||
|
||||
return result.to_dict()
|
||||
@@ -335,7 +360,22 @@ async def start_hub_server(server_name: str) -> dict[str, Any]:
|
||||
try:
|
||||
executor = _get_hub_executor()
|
||||
|
||||
result = await executor.start_persistent_server(server_name)
|
||||
# Inject project assets as Docker volume mounts (same logic as execute_hub_tool).
|
||||
extra_volumes: list[str] = []
|
||||
try:
|
||||
storage = get_storage()
|
||||
project_path = get_project_path()
|
||||
assets_path = storage.get_project_assets_path(project_path)
|
||||
if assets_path:
|
||||
assets_str = str(assets_path)
|
||||
extra_volumes = [
|
||||
f"{assets_str}:/app/uploads:ro",
|
||||
f"{assets_str}:/app/samples:ro",
|
||||
]
|
||||
except Exception: # noqa: BLE001 - never block server start due to asset injection failure
|
||||
extra_volumes = []
|
||||
|
||||
result = await executor.start_persistent_server(server_name, extra_volumes=extra_volumes or None)
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
|
||||
@@ -10,7 +10,6 @@ from fastmcp.exceptions import ToolError
|
||||
|
||||
from fuzzforge_mcp.dependencies import get_project_path, get_storage, set_current_project_path
|
||||
|
||||
|
||||
mcp: FastMCP = FastMCP()
|
||||
|
||||
|
||||
|
||||
106
hub-config.json
106
hub-config.json
@@ -1,105 +1 @@
|
||||
{
|
||||
"servers": [
|
||||
{
|
||||
"name": "nmap-mcp",
|
||||
"description": "Network reconnaissance using Nmap - port scanning, service detection, OS fingerprinting",
|
||||
"type": "docker",
|
||||
"image": "nmap-mcp:latest",
|
||||
"category": "reconnaissance",
|
||||
"capabilities": ["NET_RAW"],
|
||||
"enabled": true
|
||||
},
|
||||
{
|
||||
"name": "binwalk-mcp",
|
||||
"description": "Firmware extraction and analysis using Binwalk - file signatures, entropy analysis, embedded file extraction",
|
||||
"type": "docker",
|
||||
"image": "binwalk-mcp:latest",
|
||||
"category": "binary-analysis",
|
||||
"capabilities": [],
|
||||
"volumes": ["~/.fuzzforge/hub/workspace:/data"],
|
||||
"enabled": true
|
||||
},
|
||||
{
|
||||
"name": "yara-mcp",
|
||||
"description": "Pattern matching and malware classification using YARA rules",
|
||||
"type": "docker",
|
||||
"image": "yara-mcp:latest",
|
||||
"category": "binary-analysis",
|
||||
"capabilities": [],
|
||||
"volumes": ["~/.fuzzforge/hub/workspace:/data"],
|
||||
"enabled": true
|
||||
},
|
||||
{
|
||||
"name": "capa-mcp",
|
||||
"description": "Static capability detection using capa - identifies malware capabilities in binaries",
|
||||
"type": "docker",
|
||||
"image": "capa-mcp:latest",
|
||||
"category": "binary-analysis",
|
||||
"capabilities": [],
|
||||
"volumes": ["~/.fuzzforge/hub/workspace:/data"],
|
||||
"enabled": true
|
||||
},
|
||||
{
|
||||
"name": "radare2-mcp",
|
||||
"description": "Binary analysis and reverse engineering using radare2",
|
||||
"type": "docker",
|
||||
"image": "radare2-mcp:latest",
|
||||
"category": "binary-analysis",
|
||||
"capabilities": [],
|
||||
"volumes": ["~/.fuzzforge/hub/workspace:/data"],
|
||||
"enabled": true
|
||||
},
|
||||
{
|
||||
"name": "ghidra-mcp",
|
||||
"description": "Advanced binary decompilation and reverse engineering using Ghidra",
|
||||
"type": "docker",
|
||||
"image": "ghcr.io/clearbluejar/pyghidra-mcp:latest",
|
||||
"category": "binary-analysis",
|
||||
"capabilities": [],
|
||||
"volumes": ["~/.fuzzforge/hub/workspace:/data"],
|
||||
"enabled": true
|
||||
},
|
||||
{
|
||||
"name": "searchsploit-mcp",
|
||||
"description": "CVE and exploit search using SearchSploit / Exploit-DB",
|
||||
"type": "docker",
|
||||
"image": "searchsploit-mcp:latest",
|
||||
"category": "exploitation",
|
||||
"capabilities": [],
|
||||
"volumes": ["~/.fuzzforge/hub/workspace:/data"],
|
||||
"enabled": true
|
||||
},
|
||||
{
|
||||
"name": "nuclei-mcp",
|
||||
"description": "Vulnerability scanning using Nuclei templates",
|
||||
"type": "docker",
|
||||
"image": "nuclei-mcp:latest",
|
||||
"category": "web-security",
|
||||
"capabilities": ["NET_RAW"],
|
||||
"volumes": ["~/.fuzzforge/hub/workspace:/data"],
|
||||
"enabled": true
|
||||
},
|
||||
{
|
||||
"name": "trivy-mcp",
|
||||
"description": "Container and filesystem vulnerability scanning using Trivy",
|
||||
"type": "docker",
|
||||
"image": "trivy-mcp:latest",
|
||||
"category": "cloud-security",
|
||||
"capabilities": [],
|
||||
"volumes": ["~/.fuzzforge/hub/workspace:/data"],
|
||||
"enabled": true
|
||||
},
|
||||
{
|
||||
"name": "gitleaks-mcp",
|
||||
"description": "Secret and credential detection in code and firmware using Gitleaks",
|
||||
"type": "docker",
|
||||
"image": "gitleaks-mcp:latest",
|
||||
"category": "secrets",
|
||||
"capabilities": [],
|
||||
"volumes": ["~/.fuzzforge/hub/workspace:/data"],
|
||||
"enabled": true
|
||||
}
|
||||
],
|
||||
"default_timeout": 300,
|
||||
"cache_tools": true
|
||||
}
|
||||
{"servers": []}
|
||||
|
||||
@@ -7,6 +7,11 @@ requires-python = ">=3.14"
|
||||
authors = [
|
||||
{ name = "FuzzingLabs", email = "contact@fuzzinglabs.com" }
|
||||
]
|
||||
dependencies = [
|
||||
"fuzzforge-cli",
|
||||
"fuzzforge-mcp",
|
||||
"fuzzforge-common",
|
||||
]
|
||||
|
||||
[project.optional-dependencies]
|
||||
dev = [
|
||||
|
||||
Reference in New Issue
Block a user