feat(tui): add terminal UI with hub and agent management

This commit is contained in:
AFredefon
2026-03-10 04:06:50 +01:00
parent 3e0d1cd02f
commit 9376645197
12 changed files with 3592 additions and 196 deletions

View File

@@ -8,6 +8,7 @@ requires-python = ">=3.14"
dependencies = [
"fuzzforge-mcp==0.0.1",
"rich>=14.0.0",
"textual>=1.0.0",
"typer==0.20.1",
]

View File

@@ -34,7 +34,7 @@ def main(
envvar="FUZZFORGE_STORAGE__PATH",
help="Path to the storage directory.",
),
] = Path.home() / ".fuzzforge" / "storage",
] = Path.cwd() / ".fuzzforge" / "storage",
context: TyperContext = None, # type: ignore[assignment]
) -> None:
"""FuzzForge AI - Security research orchestration platform.
@@ -42,7 +42,7 @@ def main(
Discover and execute MCP hub tools for security research.
"""
storage = LocalStorage(storage_path=storage_path)
storage = LocalStorage(base_path=storage_path)
context.obj = Context(
storage=storage,
@@ -52,3 +52,19 @@ def main(
application.add_typer(mcp.application)
application.add_typer(projects.application)
@application.command(
name="ui",
help="Launch the FuzzForge terminal interface.",
)
def launch_ui() -> None:
"""Launch the interactive FuzzForge TUI dashboard.
Provides a visual dashboard showing AI agent connection status
and hub server availability, with wizards for setup and configuration.
"""
from fuzzforge_cli.tui.app import FuzzForgeApp
FuzzForgeApp().run()

View File

@@ -169,7 +169,7 @@ def _generate_mcp_config(
# Self-contained storage paths for FuzzForge containers
# This isolates FuzzForge from system Podman and avoids snap issues
fuzzforge_home = Path.home() / ".fuzzforge"
fuzzforge_home = Path.cwd() / ".fuzzforge"
graphroot = fuzzforge_home / "containers" / "storage"
runroot = fuzzforge_home / "containers" / "run"

View File

@@ -0,0 +1 @@
"""FuzzForge terminal user interface."""

View File

@@ -0,0 +1,360 @@
"""FuzzForge TUI application.
Main terminal user interface for FuzzForge, providing a dashboard
with AI agent connection status, hub server availability, and
hub management capabilities.
"""
from __future__ import annotations
from collections import defaultdict
from rich.text import Text
from textual.app import App, ComposeResult
from textual.binding import Binding
from textual.containers import Horizontal, Vertical, VerticalScroll
from textual.widgets import Button, DataTable, Footer, Header, Label
from fuzzforge_cli.tui.helpers import (
check_agent_status,
check_hub_image,
find_fuzzforge_root,
get_agent_configs,
load_hub_config,
)
# Agent config entries stored alongside their linked status for row mapping
_AgentRow = tuple[str, "AIAgent", "Path", str, bool] # noqa: F821
class FuzzForgeApp(App):
"""FuzzForge AI terminal user interface."""
TITLE = "FuzzForge AI"
SUB_TITLE = "Security Research Orchestration"
CSS = """
Screen {
background: $surface;
}
#main {
height: 1fr;
margin: 1 2;
}
.panel {
width: 1fr;
border: round #4699fc;
padding: 1 2;
margin: 0 0 1 0;
}
#hub-panel {
height: 12;
}
#hub-table {
height: 1fr;
}
#agents-panel {
height: auto;
}
.panel-title {
text-style: bold;
color: #4699fc;
text-align: left;
margin-bottom: 1;
}
#hub-title-bar {
height: auto;
align: center middle;
margin: 0 0 1 0;
}
#btn-hub-manager {
min-width: 40;
margin-right: 2;
}
#btn-fuzzinglabs-hub {
min-width: 30;
}
#agents-table {
height: auto;
}
/* Modal screens */
AgentSetupScreen, AgentUnlinkScreen,
HubManagerScreen, LinkHubScreen, CloneHubScreen {
align: center middle;
}
#setup-dialog, #unlink-dialog {
width: 56;
height: auto;
max-height: 80%;
border: thick #4699fc;
background: $surface;
padding: 2 3;
overflow-y: auto;
}
#hub-manager-dialog {
width: 100;
height: auto;
max-height: 85%;
border: thick #4699fc;
background: $surface;
padding: 2 3;
overflow-y: auto;
}
#link-dialog, #clone-dialog {
width: 72;
height: auto;
max-height: 80%;
border: thick #4699fc;
background: $surface;
padding: 2 3;
overflow-y: auto;
}
.dialog-title {
text-style: bold;
text-align: center;
color: #4699fc;
margin-bottom: 1;
}
.field-label {
margin-top: 1;
text-style: bold;
}
RadioSet {
height: auto;
margin: 0 0 1 2;
}
Input {
margin: 0 0 1 0;
}
.dialog-buttons {
layout: horizontal;
height: 3;
align: center middle;
margin-top: 1;
}
.dialog-buttons Button {
margin: 0 1;
min-width: 14;
}
"""
BINDINGS = [
Binding("q", "quit", "Quit"),
Binding("h", "manage_hubs", "Hub Manager"),
Binding("r", "refresh", "Refresh"),
]
def compose(self) -> ComposeResult:
"""Compose the dashboard layout."""
yield Header()
with VerticalScroll(id="main"):
with Vertical(id="hub-panel", classes="panel"):
yield DataTable(id="hub-table")
with Horizontal(id="hub-title-bar"):
yield Button(
"Hub Manager (h)",
variant="primary",
id="btn-hub-manager",
)
yield Button(
"FuzzingLabs Hub",
variant="primary",
id="btn-fuzzinglabs-hub",
)
with Vertical(id="agents-panel", classes="panel"):
yield DataTable(id="agents-table")
yield Footer()
def on_mount(self) -> None:
"""Populate tables on startup."""
self._agent_rows: list[_AgentRow] = []
self.query_one("#hub-panel").border_title = "Hub Servers"
self.query_one("#agents-panel").border_title = "AI Agents"
self._refresh_agents()
self._refresh_hub()
def _refresh_agents(self) -> None:
"""Refresh the AI agents status table."""
table = self.query_one("#agents-table", DataTable)
table.clear(columns=True)
table.add_columns("Agent", "Status", "Config Path")
table.cursor_type = "row"
self._agent_rows = []
for display_name, agent, config_path, servers_key in get_agent_configs():
is_linked, status_text = check_agent_status(config_path, servers_key)
if is_linked:
status_cell = Text(f"{status_text}", style="green")
else:
status_cell = Text(f"{status_text}", style="red")
table.add_row(display_name, status_cell, str(config_path))
self._agent_rows.append(
(display_name, agent, config_path, servers_key, is_linked)
)
def _refresh_hub(self) -> None:
"""Refresh the hub servers table, grouped by source hub."""
table = self.query_one("#hub-table", DataTable)
table.clear(columns=True)
table.add_columns("Server", "Image", "Hub", "Status")
try:
fuzzforge_root = find_fuzzforge_root()
hub_config = load_hub_config(fuzzforge_root)
except Exception:
table.add_row(
Text("Error loading config", style="red"), "", "", ""
)
return
servers = hub_config.get("servers", [])
if not servers:
table.add_row(
Text("No servers — press h", style="dim"), "", "", ""
)
return
# Group servers by source hub
groups: dict[str, list[dict]] = defaultdict(list)
for server in servers:
source = server.get("source_hub", "manual")
groups[source].append(server)
for hub_name, hub_servers in groups.items():
ready_count = 0
total = len(hub_servers)
statuses: list[tuple[dict, bool, str]] = []
for server in hub_servers:
enabled = server.get("enabled", True)
if not enabled:
statuses.append((server, False, "Disabled"))
else:
is_ready, status_text = check_hub_image(
server.get("image", "")
)
if is_ready:
ready_count += 1
statuses.append((server, is_ready, status_text))
# Group header row
if hub_name == "manual":
header = Text(
f"▼ 📦 Local config ({ready_count}/{total} ready)",
style="bold",
)
else:
header = Text(
f"▼ 🔗 {hub_name} ({ready_count}/{total} ready)",
style="bold",
)
table.add_row(header, "", "", "")
# Tool rows
for server, is_ready, status_text in statuses:
name = server.get("name", "unknown")
image = server.get("image", "unknown")
enabled = server.get("enabled", True)
if not enabled:
status_cell = Text("Disabled", style="dim")
elif is_ready:
status_cell = Text("✓ Ready", style="green")
else:
status_cell = Text(f"{status_text}", style="red")
table.add_row(
f" {name}",
Text(image, style="dim"),
hub_name,
status_cell,
)
def on_data_table_row_selected(self, event: DataTable.RowSelected) -> None:
"""Handle row selection on the agents table."""
if event.data_table.id != "agents-table":
return
idx = event.cursor_row
if idx < 0 or idx >= len(self._agent_rows):
return
display_name, agent, _config_path, _servers_key, is_linked = self._agent_rows[idx]
if is_linked:
from fuzzforge_cli.tui.screens.agent_setup import AgentUnlinkScreen
self.push_screen(
AgentUnlinkScreen(agent, display_name),
callback=self._on_agent_changed,
)
else:
from fuzzforge_cli.tui.screens.agent_setup import AgentSetupScreen
self.push_screen(
AgentSetupScreen(agent, display_name),
callback=self._on_agent_changed,
)
def on_button_pressed(self, event: Button.Pressed) -> None:
"""Handle button presses."""
if event.button.id == "btn-hub-manager":
self.action_manage_hubs()
elif event.button.id == "btn-fuzzinglabs-hub":
self.action_add_fuzzinglabs_hub()
def action_add_fuzzinglabs_hub(self) -> None:
"""Open the clone dialog pre-filled with the FuzzingLabs hub URL."""
from fuzzforge_cli.tui.screens.hub_manager import CloneHubScreen
self.push_screen(
CloneHubScreen(
default_url="https://github.com/FuzzingLabs/mcp-security-hub",
default_name="mcp-security-hub",
is_default=True,
),
callback=self._on_hub_changed,
)
def action_manage_hubs(self) -> None:
"""Open the hub manager."""
from fuzzforge_cli.tui.screens.hub_manager import HubManagerScreen
self.push_screen(HubManagerScreen(), callback=self._on_hub_changed)
def _on_agent_changed(self, result: str | None) -> None:
"""Handle agent setup/unlink completion."""
if result:
self.notify(result)
self._refresh_agents()
def _on_hub_changed(self, result: str | None) -> None:
"""Handle hub manager completion — refresh the hub table."""
self._refresh_hub()
def action_refresh(self) -> None:
"""Refresh all status panels."""
self._refresh_agents()
self._refresh_hub()
self.notify("Status refreshed")

View File

@@ -0,0 +1,535 @@
"""Shared helpers for FuzzForge TUI and CLI.
Provides utility functions for checking AI agent configuration status,
hub server image availability, installing/removing MCP configurations,
and managing linked MCP hub repositories.
"""
from __future__ import annotations
import json
import subprocess
from pathlib import Path
from typing import Any
from fuzzforge_cli.commands.mcp import (
AIAgent,
_detect_docker_socket,
_detect_podman_socket,
_find_fuzzforge_root,
_generate_mcp_config,
_get_claude_code_user_mcp_path,
_get_claude_desktop_mcp_path,
_get_copilot_mcp_path,
)
# --- Hub Management Constants ---
FUZZFORGE_DEFAULT_HUB_URL = "git@github.com:FuzzingLabs/mcp-security-hub.git"
FUZZFORGE_DEFAULT_HUB_NAME = "mcp-security-hub"
def get_fuzzforge_dir() -> Path:
"""Return the project-local ``.fuzzforge/`` directory.
Uses the current working directory so that each project gets its
own isolated FuzzForge configuration, hubs, and storage — similar
to how ``.git/`` or ``.venv/`` work.
:return: ``Path.cwd() / ".fuzzforge"``
"""
return Path.cwd() / ".fuzzforge"
# Categories that typically need NET_RAW capability for network access
_NET_RAW_CATEGORIES = {"reconnaissance", "web-security"}
# Directories to skip when scanning a hub for MCP tool Dockerfiles
_SCAN_SKIP_DIRS = {
".git",
".github",
"scripts",
"tests",
"examples",
"meta",
"__pycache__",
"node_modules",
".venv",
}
def get_agent_configs() -> list[tuple[str, AIAgent, Path, str]]:
"""Return agent display configs with resolved paths.
Each tuple contains:
- Display name
- AIAgent enum value
- Config file path
- Servers JSON key
:return: List of agent configuration tuples.
"""
return [
("GitHub Copilot", AIAgent.COPILOT, _get_copilot_mcp_path(), "servers"),
("Claude Desktop", AIAgent.CLAUDE_DESKTOP, _get_claude_desktop_mcp_path(), "mcpServers"),
("Claude Code", AIAgent.CLAUDE_CODE, _get_claude_code_user_mcp_path(), "mcpServers"),
]
def check_agent_status(config_path: Path, servers_key: str) -> tuple[bool, str]:
"""Check whether an AI agent has FuzzForge configured.
:param config_path: Path to the agent's MCP config file.
:param servers_key: JSON key for the servers dict (e.g. "servers" or "mcpServers").
:return: Tuple of (is_linked, status_description).
"""
if not config_path.exists():
return False, "Not configured"
try:
config = json.loads(config_path.read_text())
servers = config.get(servers_key, {})
if "fuzzforge" in servers:
return True, "Linked"
return False, "Config exists, not linked"
except json.JSONDecodeError:
return False, "Invalid config file"
def check_hub_image(image: str) -> tuple[bool, str]:
"""Check whether a Docker image exists locally.
:param image: Docker image name (e.g. "semgrep-mcp:latest").
:return: Tuple of (is_ready, status_description).
"""
try:
result = subprocess.run(
["docker", "image", "inspect", image],
capture_output=True,
text=True,
timeout=5,
)
if result.returncode == 0:
return True, "Ready"
return False, "Not built"
except subprocess.TimeoutExpired:
return False, "Timeout"
except FileNotFoundError:
return False, "Docker not found"
def load_hub_config(fuzzforge_root: Path) -> dict[str, Any]:
"""Load hub-config.json from the FuzzForge root.
:param fuzzforge_root: Path to fuzzforge-oss directory.
:return: Parsed hub configuration dict, empty dict on error.
"""
config_path = fuzzforge_root / "hub-config.json"
if not config_path.exists():
return {}
try:
return json.loads(config_path.read_text())
except json.JSONDecodeError:
return {}
def find_fuzzforge_root() -> Path:
"""Find the FuzzForge installation root directory.
:return: Path to the fuzzforge-oss directory.
"""
return _find_fuzzforge_root()
def install_agent_config(agent: AIAgent, engine: str, force: bool = False) -> str:
"""Install FuzzForge MCP configuration for an AI agent.
:param agent: Target AI agent.
:param engine: Container engine type ("docker" or "podman").
:param force: Overwrite existing configuration.
:return: Result message string.
"""
fuzzforge_root = _find_fuzzforge_root()
if agent == AIAgent.COPILOT:
config_path = _get_copilot_mcp_path()
servers_key = "servers"
elif agent == AIAgent.CLAUDE_CODE:
config_path = _get_claude_code_user_mcp_path()
servers_key = "mcpServers"
else:
config_path = _get_claude_desktop_mcp_path()
servers_key = "mcpServers"
socket = _detect_docker_socket() if engine == "docker" else _detect_podman_socket()
server_config = _generate_mcp_config(
fuzzforge_root=fuzzforge_root,
engine_type=engine,
engine_socket=socket,
)
if config_path.exists():
try:
existing = json.loads(config_path.read_text())
except json.JSONDecodeError:
return f"Error: Invalid JSON in {config_path}"
servers = existing.get(servers_key, {})
if "fuzzforge" in servers and not force:
return "Already configured (use force to overwrite)"
if servers_key not in existing:
existing[servers_key] = {}
existing[servers_key]["fuzzforge"] = server_config
full_config = existing
else:
config_path.parent.mkdir(parents=True, exist_ok=True)
full_config = {servers_key: {"fuzzforge": server_config}}
config_path.write_text(json.dumps(full_config, indent=4))
return f"Installed FuzzForge for {agent.value}"
def uninstall_agent_config(agent: AIAgent) -> str:
"""Remove FuzzForge MCP configuration from an AI agent.
:param agent: Target AI agent.
:return: Result message string.
"""
if agent == AIAgent.COPILOT:
config_path = _get_copilot_mcp_path()
servers_key = "servers"
elif agent == AIAgent.CLAUDE_CODE:
config_path = _get_claude_code_user_mcp_path()
servers_key = "mcpServers"
else:
config_path = _get_claude_desktop_mcp_path()
servers_key = "mcpServers"
if not config_path.exists():
return "Configuration file not found"
try:
config = json.loads(config_path.read_text())
except json.JSONDecodeError:
return "Error: Invalid JSON in config file"
servers = config.get(servers_key, {})
if "fuzzforge" not in servers:
return "FuzzForge is not configured for this agent"
del servers["fuzzforge"]
config_path.write_text(json.dumps(config, indent=4))
return f"Removed FuzzForge from {agent.value}"
# ---------------------------------------------------------------------------
# Hub Management
# ---------------------------------------------------------------------------
def get_hubs_registry_path() -> Path:
"""Return path to the hubs registry file (``.fuzzforge/hubs.json``).
:return: Path to the registry JSON file.
"""
return get_fuzzforge_dir() / "hubs.json"
def get_default_hubs_dir() -> Path:
"""Return default directory for cloned hubs (``.fuzzforge/hubs/``).
:return: Path to the default hubs directory.
"""
return get_fuzzforge_dir() / "hubs"
def load_hubs_registry() -> dict[str, Any]:
"""Load the hubs registry from disk.
:return: Registry dict with ``hubs`` key containing a list of hub entries.
"""
path = get_hubs_registry_path()
if not path.exists():
return {"hubs": []}
try:
return json.loads(path.read_text())
except (json.JSONDecodeError, OSError):
return {"hubs": []}
def save_hubs_registry(registry: dict[str, Any]) -> None:
"""Save the hubs registry to disk.
:param registry: Registry dict to persist.
"""
path = get_hubs_registry_path()
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(json.dumps(registry, indent=2))
def scan_hub_for_servers(hub_path: Path) -> list[dict[str, Any]]:
"""Scan a hub directory for MCP tool Dockerfiles.
Looks for the ``category/tool-name/Dockerfile`` pattern and generates
a server configuration entry for each discovered tool.
:param hub_path: Root directory of the hub repository.
:return: Sorted list of server configuration dicts.
"""
servers: list[dict[str, Any]] = []
if not hub_path.is_dir():
return servers
for dockerfile in sorted(hub_path.rglob("Dockerfile")):
rel = dockerfile.relative_to(hub_path)
parts = rel.parts
# Expected layout: category/tool-name/Dockerfile (exactly 3 parts)
if len(parts) != 3:
continue
category, tool_name, _ = parts
if category in _SCAN_SKIP_DIRS:
continue
capabilities: list[str] = []
if category in _NET_RAW_CATEGORIES:
capabilities = ["NET_RAW"]
servers.append(
{
"name": tool_name,
"description": f"{tool_name}{category}",
"type": "docker",
"image": f"{tool_name}:latest",
"category": category,
"capabilities": capabilities,
"volumes": [f"{get_fuzzforge_dir()}/hub/workspace:/data"],
"enabled": True,
}
)
return servers
def link_hub(
name: str,
path: str | Path,
git_url: str | None = None,
is_default: bool = False,
) -> str:
"""Link a hub directory and add its servers to hub-config.json.
:param name: Display name for the hub.
:param path: Local directory path containing the hub.
:param git_url: Optional git remote URL (for tracking).
:param is_default: Whether this is the default FuzzingLabs hub.
:return: Result message string.
"""
hub_path = Path(path).resolve()
if not hub_path.is_dir():
return f"Error: directory not found: {hub_path}"
# Update registry
registry = load_hubs_registry()
hubs = registry.get("hubs", [])
# Remove existing entry with same name
hubs = [h for h in hubs if h.get("name") != name]
hubs.append(
{
"name": name,
"path": str(hub_path),
"git_url": git_url,
"is_default": is_default,
}
)
registry["hubs"] = hubs
save_hubs_registry(registry)
# Scan and update hub-config.json
scanned = scan_hub_for_servers(hub_path)
if not scanned:
return f"Linked '{name}' (0 servers found)"
try:
added = _merge_servers_into_hub_config(name, scanned)
except Exception as exc:
return f"Linked '{name}' but config update failed: {exc}"
return f"Linked '{name}'{added} new servers added ({len(scanned)} scanned)"
def unlink_hub(name: str) -> str:
"""Unlink a hub and remove its servers from hub-config.json.
:param name: Name of the hub to unlink.
:return: Result message string.
"""
registry = load_hubs_registry()
hubs = registry.get("hubs", [])
if not any(h.get("name") == name for h in hubs):
return f"Hub '{name}' is not linked"
hubs = [h for h in hubs if h.get("name") != name]
registry["hubs"] = hubs
save_hubs_registry(registry)
try:
removed = _remove_hub_servers_from_config(name)
except Exception:
removed = 0
return f"Unlinked '{name}'{removed} server(s) removed"
def clone_hub(
git_url: str,
dest: Path | None = None,
name: str | None = None,
) -> tuple[bool, str, Path | None]:
"""Clone a git hub repository.
If the destination already exists and is a git repo, pulls instead.
:param git_url: Git remote URL to clone.
:param dest: Destination directory (auto-derived from URL if *None*).
:param name: Hub name (auto-derived from URL if *None*).
:return: Tuple of ``(success, message, clone_path)``.
"""
if name is None:
name = git_url.rstrip("/").split("/")[-1]
if name.endswith(".git"):
name = name[:-4]
if dest is None:
dest = get_default_hubs_dir() / name
if dest.exists():
if (dest / ".git").is_dir():
try:
result = subprocess.run(
["git", "-C", str(dest), "pull"],
capture_output=True,
text=True,
timeout=120,
)
if result.returncode == 0:
return True, f"Updated existing clone at {dest}", dest
return False, f"Git pull failed: {result.stderr.strip()}", None
except subprocess.TimeoutExpired:
return False, "Git pull timed out", None
except FileNotFoundError:
return False, "Git not found", None
return False, f"Directory already exists (not a git repo): {dest}", None
dest.parent.mkdir(parents=True, exist_ok=True)
try:
result = subprocess.run(
["git", "clone", git_url, str(dest)],
capture_output=True,
text=True,
timeout=300,
)
if result.returncode == 0:
return True, f"Cloned to {dest}", dest
return False, f"Git clone failed: {result.stderr.strip()}", None
except subprocess.TimeoutExpired:
return False, "Git clone timed out (5 min limit)", None
except FileNotFoundError:
return False, "Git not found on PATH", None
def _merge_servers_into_hub_config(
hub_name: str,
servers: list[dict[str, Any]],
) -> int:
"""Merge scanned servers into hub-config.json.
Only adds servers whose name does not already exist in the config.
New entries are tagged with ``source_hub`` for later removal.
:param hub_name: Name of the source hub (used for tagging).
:param servers: List of server dicts from :func:`scan_hub_for_servers`.
:return: Number of newly added servers.
"""
fuzzforge_root = find_fuzzforge_root()
config_path = fuzzforge_root / "hub-config.json"
if config_path.exists():
try:
config = json.loads(config_path.read_text())
except json.JSONDecodeError:
config = {"servers": [], "default_timeout": 300, "cache_tools": True}
else:
config = {"servers": [], "default_timeout": 300, "cache_tools": True}
existing = config.get("servers", [])
existing_names = {s.get("name") for s in existing}
added = 0
for server in servers:
if server["name"] not in existing_names:
server["source_hub"] = hub_name
existing.append(server)
existing_names.add(server["name"])
added += 1
config["servers"] = existing
config_path.write_text(json.dumps(config, indent=2))
return added
def _remove_hub_servers_from_config(hub_name: str) -> int:
"""Remove servers belonging to a hub from hub-config.json.
Only removes servers tagged with the given ``source_hub`` value.
Manually-added servers (without a tag) are preserved.
:param hub_name: Name of the hub whose servers should be removed.
:return: Number of servers removed.
"""
fuzzforge_root = find_fuzzforge_root()
config_path = fuzzforge_root / "hub-config.json"
if not config_path.exists():
return 0
try:
config = json.loads(config_path.read_text())
except json.JSONDecodeError:
return 0
existing = config.get("servers", [])
before = len(existing)
config["servers"] = [s for s in existing if s.get("source_hub") != hub_name]
after = len(config["servers"])
config_path.write_text(json.dumps(config, indent=2))
return before - after

View File

@@ -0,0 +1 @@
"""TUI screens for FuzzForge."""

View File

@@ -0,0 +1,96 @@
"""Agent setup and unlink modal screens for FuzzForge TUI.
Provides context-aware modals that receive the target agent directly
from the dashboard row selection — no redundant agent picker needed.
"""
from __future__ import annotations
from textual.app import ComposeResult
from textual.containers import Horizontal, Vertical
from textual.screen import ModalScreen
from textual.widgets import Button, Label, RadioButton, RadioSet
from fuzzforge_cli.commands.mcp import AIAgent
from fuzzforge_cli.tui.helpers import install_agent_config, uninstall_agent_config
class AgentSetupScreen(ModalScreen[str | None]):
"""Modal for linking a specific agent — only asks for engine choice."""
BINDINGS = [("escape", "cancel", "Cancel")]
def __init__(self, agent: AIAgent, display_name: str) -> None:
super().__init__()
self._agent = agent
self._display_name = display_name
def compose(self) -> ComposeResult:
"""Compose the setup dialog layout."""
with Vertical(id="setup-dialog"):
yield Label(f"Setup {self._display_name}", classes="dialog-title")
yield Label("Container Engine:", classes="field-label")
yield RadioSet(
RadioButton("Docker", value=True),
RadioButton("Podman"),
id="engine-select",
)
with Horizontal(classes="dialog-buttons"):
yield Button("Install", variant="primary", id="btn-install")
yield Button("Cancel", variant="default", id="btn-cancel")
def on_button_pressed(self, event: Button.Pressed) -> None:
"""Handle button clicks."""
if event.button.id == "btn-cancel":
self.dismiss(None)
elif event.button.id == "btn-install":
self._do_install()
def action_cancel(self) -> None:
"""Dismiss the dialog without action."""
self.dismiss(None)
def _do_install(self) -> None:
"""Execute the installation."""
engine_set = self.query_one("#engine-select", RadioSet)
engine = "docker" if engine_set.pressed_index <= 0 else "podman"
result = install_agent_config(self._agent, engine, force=True)
self.dismiss(result)
class AgentUnlinkScreen(ModalScreen[str | None]):
"""Confirmation modal for unlinking a specific agent."""
BINDINGS = [("escape", "cancel", "Cancel")]
def __init__(self, agent: AIAgent, display_name: str) -> None:
super().__init__()
self._agent = agent
self._display_name = display_name
def compose(self) -> ComposeResult:
"""Compose the unlink confirmation layout."""
with Vertical(id="unlink-dialog"):
yield Label(f"Unlink {self._display_name}?", classes="dialog-title")
yield Label(
f"This will remove the FuzzForge MCP configuration from {self._display_name}.",
)
with Horizontal(classes="dialog-buttons"):
yield Button("Unlink", variant="warning", id="btn-unlink")
yield Button("Cancel", variant="default", id="btn-cancel")
def on_button_pressed(self, event: Button.Pressed) -> None:
"""Handle button clicks."""
if event.button.id == "btn-cancel":
self.dismiss(None)
elif event.button.id == "btn-unlink":
result = uninstall_agent_config(self._agent)
self.dismiss(result)
def action_cancel(self) -> None:
"""Dismiss without action."""
self.dismiss(None)

View File

@@ -0,0 +1,299 @@
"""Hub management screens for FuzzForge TUI.
Provides modal dialogs for managing linked MCP hub repositories:
- HubManagerScreen: list, add, remove linked hubs
- LinkHubScreen: link a local directory as a hub
- CloneHubScreen: clone a git repo and link it (defaults to FuzzingLabs hub)
"""
from __future__ import annotations
from pathlib import Path
from rich.text import Text
from textual import work
from textual.app import ComposeResult
from textual.containers import Horizontal, Vertical
from textual.screen import ModalScreen
from textual.widgets import Button, DataTable, Input, Label, Static
from fuzzforge_cli.tui.helpers import (
FUZZFORGE_DEFAULT_HUB_NAME,
FUZZFORGE_DEFAULT_HUB_URL,
clone_hub,
link_hub,
load_hubs_registry,
scan_hub_for_servers,
unlink_hub,
)
class HubManagerScreen(ModalScreen[str | None]):
"""Modal screen for managing linked MCP hubs."""
BINDINGS = [("escape", "cancel", "Close")]
def compose(self) -> ComposeResult:
"""Compose the hub manager layout."""
with Vertical(id="hub-manager-dialog"):
yield Label("Hub Manager", classes="dialog-title")
yield DataTable(id="hubs-table")
yield Label("", id="hub-status")
with Horizontal(classes="dialog-buttons"):
yield Button(
"FuzzingLabs Hub",
variant="primary",
id="btn-clone-default",
)
yield Button("Link Path", variant="default", id="btn-link")
yield Button("Clone URL", variant="default", id="btn-clone")
yield Button("Remove", variant="primary", id="btn-remove")
yield Button("Close", variant="default", id="btn-close")
def on_mount(self) -> None:
"""Populate the hubs table on startup."""
self._refresh_hubs()
def _refresh_hubs(self) -> None:
"""Refresh the linked hubs table."""
table = self.query_one("#hubs-table", DataTable)
table.clear(columns=True)
table.add_columns("Name", "Path", "Servers", "Source")
table.cursor_type = "row"
registry = load_hubs_registry()
hubs = registry.get("hubs", [])
if not hubs:
table.add_row(
Text("No hubs linked", style="dim"),
Text("Press 'FuzzingLabs Hub' to get started", style="dim"),
"",
"",
)
return
for hub in hubs:
name = hub.get("name", "unknown")
path = hub.get("path", "")
git_url = hub.get("git_url", "")
is_default = hub.get("is_default", False)
hub_path = Path(path)
if hub_path.is_dir():
servers = scan_hub_for_servers(hub_path)
count = str(len(servers))
else:
count = Text("dir missing", style="yellow")
source = git_url or "local"
if is_default:
name_cell = Text(f"{name}", style="bold")
else:
name_cell = Text(name)
table.add_row(name_cell, path, count, source)
def on_button_pressed(self, event: Button.Pressed) -> None:
"""Route button actions."""
if event.button.id == "btn-close":
self.dismiss("refreshed")
elif event.button.id == "btn-clone-default":
self.app.push_screen(
CloneHubScreen(
FUZZFORGE_DEFAULT_HUB_URL,
FUZZFORGE_DEFAULT_HUB_NAME,
is_default=True,
),
callback=self._on_hub_action,
)
elif event.button.id == "btn-link":
self.app.push_screen(
LinkHubScreen(),
callback=self._on_hub_action,
)
elif event.button.id == "btn-clone":
self.app.push_screen(
CloneHubScreen(),
callback=self._on_hub_action,
)
elif event.button.id == "btn-remove":
self._remove_selected()
def _on_hub_action(self, result: str | None) -> None:
"""Handle result from a sub-screen."""
if result:
self.query_one("#hub-status", Label).update(result)
self.app.notify(result)
self._refresh_hubs()
def _remove_selected(self) -> None:
"""Remove the currently selected hub."""
table = self.query_one("#hubs-table", DataTable)
registry = load_hubs_registry()
hubs = registry.get("hubs", [])
if not hubs:
self.app.notify("No hubs to remove", severity="warning")
return
idx = table.cursor_row
if idx is None or idx < 0 or idx >= len(hubs):
self.app.notify("Select a hub to remove", severity="warning")
return
name = hubs[idx].get("name", "")
result = unlink_hub(name)
self.query_one("#hub-status", Label).update(result)
self._refresh_hubs()
self.app.notify(result)
def action_cancel(self) -> None:
"""Close the hub manager."""
self.dismiss("refreshed")
class LinkHubScreen(ModalScreen[str | None]):
"""Modal for linking a local directory as an MCP hub."""
BINDINGS = [("escape", "cancel", "Cancel")]
def compose(self) -> ComposeResult:
"""Compose the link dialog layout."""
with Vertical(id="link-dialog"):
yield Label("Link Local Hub", classes="dialog-title")
yield Label("Hub Name:", classes="field-label")
yield Input(placeholder="my-hub", id="name-input")
yield Label("Directory Path:", classes="field-label")
yield Input(placeholder="/path/to/hub-directory", id="path-input")
yield Label("", id="link-status")
with Horizontal(classes="dialog-buttons"):
yield Button("Link", variant="primary", id="btn-link")
yield Button("Cancel", variant="default", id="btn-cancel")
def on_button_pressed(self, event: Button.Pressed) -> None:
"""Handle button clicks."""
if event.button.id == "btn-cancel":
self.dismiss(None)
elif event.button.id == "btn-link":
self._do_link()
def _do_link(self) -> None:
"""Execute the link operation."""
name = self.query_one("#name-input", Input).value.strip()
path = self.query_one("#path-input", Input).value.strip()
if not name:
self.app.notify("Please enter a hub name", severity="warning")
return
if not path:
self.app.notify("Please enter a directory path", severity="warning")
return
result = link_hub(name, path)
self.dismiss(result)
def action_cancel(self) -> None:
"""Dismiss without action."""
self.dismiss(None)
class CloneHubScreen(ModalScreen[str | None]):
"""Modal for cloning a git hub repository and linking it.
When instantiated with *is_default=True* and FuzzingLabs URL,
provides a one-click setup for the standard security hub.
"""
BINDINGS = [("escape", "cancel", "Cancel")]
def __init__(
self,
default_url: str = "",
default_name: str = "",
is_default: bool = False,
) -> None:
super().__init__()
self._default_url = default_url
self._default_name = default_name
self._is_default = is_default
def compose(self) -> ComposeResult:
"""Compose the clone dialog layout."""
title = "Clone FuzzingLabs Hub" if self._is_default else "Clone Git Hub"
with Vertical(id="clone-dialog"):
yield Label(title, classes="dialog-title")
yield Label("Git URL:", classes="field-label")
yield Input(
value=self._default_url,
placeholder="git@github.com:org/repo.git",
id="url-input",
)
yield Label("Hub Name (optional):", classes="field-label")
yield Input(
value=self._default_name,
placeholder="auto-detect from URL",
id="name-input",
)
yield Static("", id="clone-status")
with Horizontal(classes="dialog-buttons"):
yield Button(
"Clone & Link",
variant="primary",
id="btn-clone",
)
yield Button("Cancel", variant="default", id="btn-cancel")
def on_button_pressed(self, event: Button.Pressed) -> None:
"""Handle button clicks."""
if event.button.id == "btn-cancel":
self.dismiss(None)
elif event.button.id == "btn-clone":
self._start_clone()
def _start_clone(self) -> None:
"""Validate input and start the async clone operation."""
url = self.query_one("#url-input", Input).value.strip()
if not url:
self.app.notify("Please enter a git URL", severity="warning")
return
self.query_one("#btn-clone", Button).disabled = True
self.query_one("#clone-status", Static).update("⏳ Cloning repository...")
self._do_clone(url)
@work(thread=True)
def _do_clone(self, url: str) -> None:
"""Clone the repo in a background thread."""
name_input = self.query_one("#name-input", Input).value.strip()
name = name_input or None
success, msg, path = clone_hub(url, name=name)
if success and path:
hub_name = name or path.name
link_result = link_hub(
hub_name,
path,
git_url=url,
is_default=self._is_default,
)
self.app.call_from_thread(self.dismiss, f"{link_result}")
else:
self.app.call_from_thread(self._on_clone_failed, msg)
def _on_clone_failed(self, msg: str) -> None:
"""Handle a failed clone — re-enable the button and show the error."""
self.query_one("#clone-status", Static).update(f"{msg}")
self.query_one("#btn-clone", Button).disabled = False
def action_cancel(self) -> None:
"""Dismiss without action."""
self.dismiss(None)