rename: FuzzForge → SecPipe

Rename the entire project from FuzzForge to SecPipe:
- Python packages: fuzzforge_cli → secpipe_cli, fuzzforge_common → secpipe_common,
  fuzzforge_mcp → secpipe_mcp, fuzzforge_tests → secpipe_tests
- Directories: fuzzforge-cli → secpipe-cli, fuzzforge-common → secpipe-common,
  fuzzforge-mcp → secpipe-mcp, fuzzforge-tests → secpipe-tests
- Environment variables: FUZZFORGE_* → SECPIPE_*
- MCP server name: SecPipe MCP Server
- CI workflows, Makefile, Dockerfile, hub-config, NOTICE updated
- Fix mcp-server.yml to use uvicorn secpipe_mcp.application:app
This commit is contained in:
AFredefon
2026-04-09 04:10:46 +02:00
parent bbf864e88b
commit be009a4094
120 changed files with 866 additions and 800 deletions
+1
View File
@@ -0,0 +1 @@
"""TODO."""
+12
View File
@@ -0,0 +1,12 @@
"""TODO."""
from secpipe_cli.application import application
def main() -> None:
"""TODO."""
application()
if __name__ == "__main__":
main()
@@ -0,0 +1,70 @@
"""SecPipe CLI application."""
from pathlib import Path
from typing import Annotated
from secpipe_mcp.storage import LocalStorage # type: ignore[import-untyped]
from typer import Context as TyperContext
from typer import Option, Typer
from secpipe_cli.commands import mcp, projects
from secpipe_cli.context import Context
application: Typer = Typer(
name="secpipe",
help="SecPipe AI - Security research orchestration platform.",
)
@application.callback()
def main(
project_path: Annotated[
Path,
Option(
"--project",
"-p",
envvar="SECPIPE_PROJECT__DEFAULT_PATH",
help="Path to the SecPipe project directory.",
),
] = Path.cwd(),
storage_path: Annotated[
Path,
Option(
"--storage",
envvar="SECPIPE_STORAGE__PATH",
help="Path to the storage directory.",
),
] = Path.cwd() / ".secpipe" / "storage",
context: TyperContext = None, # type: ignore[assignment]
) -> None:
"""SecPipe AI - Security research orchestration platform.
Discover and execute MCP hub tools for security research.
"""
storage = LocalStorage(base_path=storage_path)
context.obj = Context(
storage=storage,
project_path=project_path,
)
application.add_typer(mcp.application)
application.add_typer(projects.application)
@application.command(
name="ui",
help="Launch the SecPipe terminal interface.",
)
def launch_ui() -> None:
"""Launch the interactive SecPipe TUI dashboard.
Provides a visual dashboard showing AI agent connection status
and hub server availability, with wizards for setup and configuration.
"""
from secpipe_cli.tui.app import SecPipeApp
SecPipeApp().run()
@@ -0,0 +1 @@
"""TODO."""
+510
View File
@@ -0,0 +1,510 @@
"""MCP server configuration commands for SecPipe CLI.
This module provides commands for setting up MCP server connections
with various AI agents (VS Code Copilot, Claude Code, etc.).
"""
from __future__ import annotations
import json
import os
import sys
from enum import StrEnum
from pathlib import Path
from typing import Annotated, Any
from rich.console import Console
from rich.panel import Panel
from rich.syntax import Syntax
from rich.table import Table
from typer import Argument, Context, Option, Typer
application: Typer = Typer(
name="mcp",
help="MCP server configuration commands.",
)
class AIAgent(StrEnum):
"""Supported AI agents."""
COPILOT = "copilot" # GitHub Copilot in VS Code
CLAUDE_DESKTOP = "claude-desktop" # Claude Desktop app
CLAUDE_CODE = "claude-code" # Claude Code CLI (terminal)
def _get_copilot_mcp_path() -> Path:
"""Get the GitHub Copilot MCP configuration file path.
GitHub Copilot uses VS Code's mcp.json for MCP servers.
:returns: Path to the mcp.json file.
"""
if sys.platform == "darwin":
return Path.home() / "Library" / "Application Support" / "Code" / "User" / "mcp.json"
if sys.platform == "win32":
return Path(os.environ.get("APPDATA", "")) / "Code" / "User" / "mcp.json"
# Linux
return Path.home() / ".config" / "Code" / "User" / "mcp.json"
def _get_claude_desktop_mcp_path() -> Path:
"""Get the Claude Desktop MCP configuration file path.
:returns: Path to the claude_desktop_config.json file.
"""
if sys.platform == "darwin":
return Path.home() / "Library" / "Application Support" / "Claude" / "claude_desktop_config.json"
if sys.platform == "win32":
return Path(os.environ.get("APPDATA", "")) / "Claude" / "claude_desktop_config.json"
# Linux
return Path.home() / ".config" / "Claude" / "claude_desktop_config.json"
def _get_claude_code_mcp_path(project_path: Path | None = None) -> Path:
"""Get the Claude Code MCP configuration file path.
Claude Code uses .mcp.json in the project root for project-scoped servers.
:param project_path: Project directory path. If None, uses current directory.
:returns: Path to the .mcp.json file.
"""
if project_path:
return project_path / ".mcp.json"
return Path.cwd() / ".mcp.json"
def _get_claude_code_user_mcp_path() -> Path:
"""Get the Claude Code user-scoped MCP configuration file path.
:returns: Path to ~/.claude.json file.
"""
return Path.home() / ".claude.json"
def _detect_podman_socket() -> str:
"""Auto-detect the Podman socket path.
:returns: Path to the Podman socket.
"""
uid = os.getuid()
socket_paths = [
f"/run/user/{uid}/podman/podman.sock",
"/run/podman/podman.sock",
"/var/run/podman/podman.sock",
]
for path in socket_paths:
if Path(path).exists():
return path
# Default to user socket
return f"/run/user/{uid}/podman/podman.sock"
def _detect_docker_socket() -> str:
"""Auto-detect the Docker socket path.
:returns: Path to the Docker socket.
"""
socket_paths: list[Path] = [
Path("/var/run/docker.sock"),
Path.home() / ".docker" / "run" / "docker.sock",
]
for path in socket_paths:
if path.exists():
return str(path)
return "/var/run/docker.sock"
def _find_secpipe_root() -> Path:
"""Find the SecPipe installation root.
:returns: Path to secpipe-oss directory.
"""
# Check environment variable override first
env_root = os.environ.get("SECPIPE_ROOT")
if env_root:
return Path(env_root).resolve()
# Walk up from cwd to find a secpipe root (hub-config.json is the marker)
for parent in [Path.cwd(), *Path.cwd().parents]:
if (parent / "hub-config.json").is_file():
return parent
# Fall back to __file__-based search (dev install inside secpipe-oss)
current = Path(__file__).resolve()
for parent in current.parents:
if (parent / "secpipe-mcp").is_dir():
return parent
return Path.cwd()
def _generate_mcp_config(
secpipe_root: Path,
engine_type: str,
engine_socket: str,
) -> dict[str, Any]:
"""Generate MCP server configuration.
:param secpipe_root: Path to secpipe-oss installation.
:param engine_type: Container engine type (podman or docker).
:param engine_socket: Container engine socket path.
:returns: MCP configuration dictionary.
"""
venv_python = secpipe_root / ".venv" / "bin" / "python"
# Use uv run if no venv, otherwise use venv python directly
if venv_python.exists():
command = str(venv_python)
args = ["-m", "secpipe_mcp"]
else:
command = "uv"
args = ["--directory", str(secpipe_root), "run", "secpipe-mcp"]
# User-global storage paths for SecPipe containers.
# Kept under ~/.secpipe so images are built once and shared across
# all workspaces — regardless of where `secpipe mcp install` is run.
# Override with SECPIPE_USER_DIR for isolated testing.
user_dir_env = os.environ.get("SECPIPE_USER_DIR")
secpipe_home = Path(user_dir_env).resolve() if user_dir_env else Path.home() / ".secpipe"
graphroot = secpipe_home / "containers" / "storage"
runroot = secpipe_home / "containers" / "run"
return {
"type": "stdio",
"command": command,
"args": args,
"cwd": str(secpipe_root),
"env": {
"SECPIPE_ENGINE__TYPE": engine_type,
"SECPIPE_ENGINE__GRAPHROOT": str(graphroot),
"SECPIPE_ENGINE__RUNROOT": str(runroot),
"SECPIPE_HUB__ENABLED": "true",
"SECPIPE_HUB__CONFIG_PATH": str(secpipe_root / "hub-config.json"),
},
}
@application.command(
help="Show current MCP configuration status.",
name="status",
)
def status(context: Context) -> None:
"""Show MCP configuration status for all supported agents.
:param context: Typer context.
"""
console = Console()
table = Table(title="MCP Configuration Status")
table.add_column("Agent", style="cyan")
table.add_column("Config Path")
table.add_column("Status")
table.add_column("SecPipe Configured")
secpipe_root = _find_secpipe_root()
agents = [
("GitHub Copilot", _get_copilot_mcp_path(), "servers"),
("Claude Desktop", _get_claude_desktop_mcp_path(), "mcpServers"),
("Claude Code", _get_claude_code_user_mcp_path(), "mcpServers"),
]
for name, config_path, servers_key in agents:
if config_path.exists():
try:
config = json.loads(config_path.read_text())
servers = config.get(servers_key, {})
has_secpipe = "secpipe" in servers
table.add_row(
name,
str(config_path),
"[green]✓ Exists[/green]",
"[green]✓ Yes[/green]" if has_secpipe else "[yellow]✗ No[/yellow]",
)
except json.JSONDecodeError:
table.add_row(
name,
str(config_path),
"[red]✗ Invalid JSON[/red]",
"[dim]-[/dim]",
)
else:
table.add_row(
name,
str(config_path),
"[dim]Not found[/dim]",
"[dim]-[/dim]",
)
console.print(table)
# Show detected environment
console.print()
console.print("[bold]Detected Environment:[/bold]")
console.print(f" SecPipe Root: {_find_secpipe_root()}")
console.print(f" Podman Socket: {_detect_podman_socket()}")
console.print(f" Docker Socket: {_detect_docker_socket()}")
@application.command(
help="Generate MCP configuration for an AI agent.",
name="generate",
)
def generate(
context: Context,
agent: Annotated[
AIAgent,
Argument(
help="AI agent to generate config for (copilot, claude-desktop, or claude-code).",
),
],
engine: Annotated[
str,
Option(
"--engine",
"-e",
help="Container engine (docker or podman).",
),
] = "docker",
) -> None:
"""Generate MCP configuration and print to stdout.
:param context: Typer context.
:param agent: Target AI agent.
:param engine: Container engine type.
"""
console = Console()
secpipe_root = _find_secpipe_root()
# Detect socket
if engine == "podman":
socket = _detect_podman_socket()
else:
socket = _detect_docker_socket()
# Generate config
server_config = _generate_mcp_config(
secpipe_root=secpipe_root,
engine_type=engine,
engine_socket=socket,
)
# Format based on agent
if agent == AIAgent.COPILOT:
full_config = {"servers": {"secpipe": server_config}}
else: # Claude Desktop or Claude Code
full_config = {"mcpServers": {"secpipe": server_config}}
config_json = json.dumps(full_config, indent=4)
console.print(Panel(
Syntax(config_json, "json", theme="monokai"),
title=f"MCP Configuration for {agent.value}",
))
# Show where to save it
if agent == AIAgent.COPILOT:
config_path = _get_copilot_mcp_path()
elif agent == AIAgent.CLAUDE_CODE:
config_path = _get_claude_code_mcp_path(secpipe_root)
else: # Claude Desktop
config_path = _get_claude_desktop_mcp_path()
console.print()
console.print(f"[bold]Save to:[/bold] {config_path}")
console.print()
console.print("[dim]Or run 'secpipe mcp install' to install automatically.[/dim]")
@application.command(
help="Install MCP configuration for an AI agent.",
name="install",
)
def install(
context: Context,
agent: Annotated[
AIAgent,
Argument(
help="AI agent to install config for (copilot, claude-desktop, or claude-code).",
),
],
engine: Annotated[
str,
Option(
"--engine",
"-e",
help="Container engine (docker or podman).",
),
] = "docker",
force: Annotated[
bool,
Option(
"--force",
"-f",
help="Overwrite existing secpipe configuration.",
),
] = False,
) -> None:
"""Install MCP configuration for the specified AI agent.
This will create or update the MCP configuration file, adding the
secpipe server configuration.
:param context: Typer context.
:param agent: Target AI agent.
:param engine: Container engine type.
:param force: Overwrite existing configuration.
"""
console = Console()
secpipe_root = _find_secpipe_root()
# Determine config path
if agent == AIAgent.COPILOT:
config_path = _get_copilot_mcp_path()
servers_key = "servers"
elif agent == AIAgent.CLAUDE_CODE:
config_path = _get_claude_code_user_mcp_path()
servers_key = "mcpServers"
else: # Claude Desktop
config_path = _get_claude_desktop_mcp_path()
servers_key = "mcpServers"
# Detect socket
if engine == "podman":
socket = _detect_podman_socket()
else:
socket = _detect_docker_socket()
# Generate server config
server_config = _generate_mcp_config(
secpipe_root=secpipe_root,
engine_type=engine,
engine_socket=socket,
)
# Load existing config or create new
if config_path.exists():
try:
existing_config = json.loads(config_path.read_text())
except json.JSONDecodeError:
console.print(f"[red]Error: Invalid JSON in {config_path}[/red]")
console.print("[dim]Please fix the file manually or delete it.[/dim]")
raise SystemExit(1)
# Check if secpipe already exists
servers = existing_config.get(servers_key, {})
if "secpipe" in servers and not force:
console.print("[yellow]SecPipe is already configured.[/yellow]")
console.print("[dim]Use --force to overwrite existing configuration.[/dim]")
raise SystemExit(1)
# Add/update secpipe
if servers_key not in existing_config:
existing_config[servers_key] = {}
existing_config[servers_key]["secpipe"] = server_config
full_config = existing_config
else:
# Create new config
config_path.parent.mkdir(parents=True, exist_ok=True)
full_config = {servers_key: {"secpipe": server_config}}
# Write config
config_path.write_text(json.dumps(full_config, indent=4))
console.print(f"[green]✓ Installed SecPipe MCP configuration for {agent.value}[/green]")
console.print()
console.print(f"[bold]Configuration file:[/bold] {config_path}")
console.print()
console.print("[bold]Settings:[/bold]")
console.print(f" Engine: {engine}")
console.print(f" Socket: {socket}")
console.print(f" Hub Config: {secpipe_root / 'hub-config.json'}")
console.print()
console.print("[bold]Next steps:[/bold]")
if agent == AIAgent.COPILOT:
console.print(" 1. Restart VS Code")
console.print(" 2. Open Copilot Chat and look for SecPipe tools")
elif agent == AIAgent.CLAUDE_CODE:
console.print(" 1. Run 'claude' from any directory")
console.print(" 2. SecPipe tools will be available")
else: # Claude Desktop
console.print(" 1. Restart Claude Desktop")
console.print(" 2. The secpipe MCP server will be available")
@application.command(
help="Remove MCP configuration for an AI agent.",
name="uninstall",
)
def uninstall(
context: Context,
agent: Annotated[
AIAgent,
Argument(
help="AI agent to remove config from (copilot, claude-desktop, or claude-code).",
),
],
) -> None:
"""Remove SecPipe MCP configuration from the specified AI agent.
:param context: Typer context.
:param agent: Target AI agent.
"""
console = Console()
secpipe_root = _find_secpipe_root()
# Determine config path
if agent == AIAgent.COPILOT:
config_path = _get_copilot_mcp_path()
servers_key = "servers"
elif agent == AIAgent.CLAUDE_CODE:
config_path = _get_claude_code_user_mcp_path()
servers_key = "mcpServers"
else: # Claude Desktop
config_path = _get_claude_desktop_mcp_path()
servers_key = "mcpServers"
if not config_path.exists():
console.print(f"[yellow]Configuration file not found: {config_path}[/yellow]")
return
try:
config = json.loads(config_path.read_text())
except json.JSONDecodeError:
console.print(f"[red]Error: Invalid JSON in {config_path}[/red]")
raise SystemExit(1)
servers = config.get(servers_key, {})
if "secpipe" not in servers:
console.print("[yellow]SecPipe is not configured.[/yellow]")
return
# Remove secpipe
del servers["secpipe"]
# Write back
config_path.write_text(json.dumps(config, indent=4))
console.print(f"[green]✓ Removed SecPipe MCP configuration from {agent.value}[/green]")
console.print()
console.print("[dim]Restart your AI agent for changes to take effect.[/dim]")
@@ -0,0 +1,186 @@
"""Project management commands for SecPipe CLI."""
from pathlib import Path
from typing import Annotated
from rich.console import Console
from rich.table import Table
from typer import Argument, Context, Option, Typer
from secpipe_cli.context import get_project_path, get_storage
application: Typer = Typer(
name="project",
help="Project management commands.",
)
@application.command(
help="Initialize a new SecPipe project.",
name="init",
)
def init_project(
context: Context,
path: Annotated[
Path | None,
Argument(
help="Path to initialize the project in. Defaults to current directory.",
),
] = None,
) -> None:
"""Initialize a new SecPipe project.
Creates the necessary storage directories for the project.
:param context: Typer context.
:param path: Path to initialize (defaults to current directory).
"""
storage = get_storage(context)
project_path = path or get_project_path(context)
storage_path = storage.init_project(project_path)
console = Console()
console.print(f"[green]✓[/green] Project initialized at {project_path}")
console.print(f" Storage: {storage_path}")
@application.command(
help="Set project assets.",
name="assets",
)
def set_assets(
context: Context,
assets_path: Annotated[
Path,
Argument(
help="Path to assets file or directory.",
),
],
) -> None:
"""Set the initial assets for the project.
:param context: Typer context.
:param assets_path: Path to assets.
"""
storage = get_storage(context)
project_path = get_project_path(context)
stored_path = storage.set_project_assets(project_path, assets_path)
console = Console()
console.print(f"[green]✓[/green] Assets stored from {assets_path}")
console.print(f" Location: {stored_path}")
@application.command(
help="Show project information.",
name="info",
)
def show_info(
context: Context,
) -> None:
"""Show information about the current project.
:param context: Typer context.
"""
storage = get_storage(context)
project_path = get_project_path(context)
executions = storage.list_executions(project_path)
assets_path = storage.get_project_assets_path(project_path)
console = Console()
table = Table(title=f"Project: {project_path.name}")
table.add_column("Property", style="cyan")
table.add_column("Value")
table.add_row("Path", str(project_path))
table.add_row("Has Assets", "Yes" if assets_path else "No")
table.add_row("Assets Path", str(assets_path) if assets_path else "-")
table.add_row("Executions", str(len(executions)))
console.print(table)
@application.command(
help="List all executions.",
name="executions",
)
def list_executions(
context: Context,
) -> None:
"""List all executions for the project.
:param context: Typer context.
"""
storage = get_storage(context)
project_path = get_project_path(context)
executions = storage.list_executions(project_path)
console = Console()
if not executions:
console.print("[yellow]No executions found.[/yellow]")
return
table = Table(title="Executions")
table.add_column("ID", style="cyan")
table.add_column("Has Results")
for exec_id in executions:
has_results = storage.get_execution_results(project_path, exec_id) is not None
table.add_row(exec_id, "" if has_results else "-")
console.print(table)
@application.command(
help="Get execution results.",
name="results",
)
def get_results(
context: Context,
execution_id: Annotated[
str,
Argument(
help="Execution ID to get results for.",
),
],
extract_to: Annotated[
Path | None,
Option(
"--extract",
"-x",
help="Extract results to this directory.",
),
] = None,
) -> None:
"""Get results for a specific execution.
:param context: Typer context.
:param execution_id: Execution ID.
:param extract_to: Optional directory to extract to.
"""
storage = get_storage(context)
project_path = get_project_path(context)
results_path = storage.get_execution_results(project_path, execution_id)
console = Console()
if results_path is None:
console.print(f"[red]✗[/red] No results found for execution {execution_id}")
return
console.print(f"[green]✓[/green] Results: {results_path}")
if extract_to:
extracted = storage.extract_results(results_path, extract_to)
console.print(f" Extracted to: {extracted}")
+64
View File
@@ -0,0 +1,64 @@
"""SecPipe CLI context management."""
from __future__ import annotations
from pathlib import Path
from typing import TYPE_CHECKING, cast
from secpipe_mcp.storage import LocalStorage # type: ignore[import-untyped]
if TYPE_CHECKING:
from typer import Context as TyperContext
class Context:
"""CLI context holding the storage instance and settings."""
_storage: LocalStorage
_project_path: Path
def __init__(self, storage: LocalStorage, project_path: Path) -> None:
"""Initialize an instance of the class.
:param storage: SecPipe local storage instance.
:param project_path: Path to the current project.
"""
self._storage = storage
self._project_path = project_path
def get_storage(self) -> LocalStorage:
"""Get the storage instance.
:return: LocalStorage instance.
"""
return self._storage
def get_project_path(self) -> Path:
"""Get the current project path.
:return: Project path.
"""
return self._project_path
def get_storage(context: TyperContext) -> LocalStorage:
"""Get storage from Typer context.
:param context: Typer context.
:return: LocalStorage instance.
"""
return cast("Context", context.obj).get_storage()
def get_project_path(context: TyperContext) -> Path:
"""Get project path from Typer context.
:param context: Typer context.
:return: Project path.
"""
return cast("Context", context.obj).get_project_path()
@@ -0,0 +1 @@
"""SecPipe terminal user interface."""
+562
View File
@@ -0,0 +1,562 @@
"""SecPipe TUI application.
Main terminal user interface for SecPipe, providing a dashboard
with AI agent connection status, hub server availability, and
hub management capabilities.
"""
from __future__ import annotations
from collections import defaultdict
from pathlib import Path
from typing import TYPE_CHECKING, Any
from rich.text import Text
from textual import events, work
from textual.app import App, ComposeResult
from textual.binding import Binding
from textual.containers import Horizontal, Vertical, VerticalScroll
from textual.message import Message
from textual.widgets import Button, DataTable, Footer, Header
from secpipe_cli.tui.helpers import (
check_agent_status,
check_hub_image,
find_secpipe_root,
get_agent_configs,
load_hub_config,
)
if TYPE_CHECKING:
from secpipe_cli.commands.mcp import AIAgent
# Agent config entries stored alongside their linked status for row mapping
_AgentRow = tuple[str, "AIAgent", Path, str, bool]
class SingleClickDataTable(DataTable[Any]):
"""DataTable subclass that also fires ``RowClicked`` on a single mouse click.
Textual's built-in ``RowSelected`` only fires on Enter or on a second click
of an already-highlighted row. ``RowClicked`` fires on every first click,
enabling single-click-to-act UX without requiring Enter.
"""
class RowClicked(Message):
"""Fired on every single mouse click on a data row."""
def __init__(self, data_table: SingleClickDataTable, cursor_row: int) -> None:
self.data_table = data_table
self.cursor_row = cursor_row
super().__init__()
@property
def control(self) -> SingleClickDataTable:
"""Return the data table that fired this event."""
return self.data_table
async def _on_click(self, event: events.Click) -> None:
"""Forward to parent, then post RowClicked on every mouse click.
The hub table is handled exclusively via RowClicked. RowSelected is
intentionally NOT used for the hub table to avoid double-dispatch.
"""
await super()._on_click(event)
meta = event.style.meta
if meta and "row" in meta and self.cursor_type == "row":
row_index: int = int(meta["row"])
if row_index >= 0:
self.post_message(SingleClickDataTable.RowClicked(self, row_index))
class SecPipeApp(App[None]):
"""SecPipe AI terminal user interface."""
TITLE = "SecPipe AI"
SUB_TITLE = "Security Research Orchestration"
CSS = """
Screen {
background: $surface;
}
#main {
height: 1fr;
margin: 1 2;
}
.panel {
width: 1fr;
border: round #4699fc;
padding: 1 2;
margin: 0 0 1 0;
}
#hub-panel {
height: 12;
}
#hub-table {
height: 1fr;
}
#agents-panel {
height: auto;
}
.panel-title {
text-style: bold;
color: #4699fc;
text-align: left;
margin-bottom: 1;
}
#hub-title-bar {
height: auto;
align: center middle;
margin: 0 0 1 0;
}
#btn-hub-manager {
min-width: 40;
margin-right: 2;
}
#btn-fuzzinglabs-hub {
min-width: 30;
}
#agents-table {
height: auto;
}
/* Modal screens */
AgentSetupScreen, AgentUnlinkScreen,
HubManagerScreen, LinkHubScreen, CloneHubScreen,
BuildImageScreen, BuildLogScreen {
align: center middle;
}
#setup-dialog, #unlink-dialog {
width: 56;
height: auto;
max-height: 80%;
border: thick #4699fc;
background: $surface;
padding: 2 3;
overflow-y: auto;
}
#hub-manager-dialog {
width: 100;
height: auto;
max-height: 85%;
border: thick #4699fc;
background: $surface;
padding: 2 3;
overflow-y: auto;
}
#link-dialog, #clone-dialog {
width: 72;
height: auto;
max-height: 80%;
border: thick #4699fc;
background: $surface;
padding: 2 3;
overflow-y: auto;
}
#build-dialog {
width: 72;
height: auto;
max-height: 80%;
border: thick #4699fc;
background: $surface;
padding: 2 3;
}
#confirm-text {
margin: 1 0 2 0;
}
#build-log {
height: 30;
border: round $panel;
margin: 1 0;
}
#build-subtitle {
color: $text-muted;
margin-bottom: 1;
}
#build-status {
height: 1;
margin-top: 1;
}
.dialog-title {
text-style: bold;
text-align: center;
color: #4699fc;
margin-bottom: 1;
}
.field-label {
margin-top: 1;
text-style: bold;
}
RadioSet {
height: auto;
margin: 0 0 1 2;
}
Input {
margin: 0 0 1 0;
}
.dialog-buttons {
layout: horizontal;
height: 3;
align: center middle;
margin-top: 1;
}
.dialog-buttons Button {
margin: 0 1;
min-width: 14;
}
"""
BINDINGS = [
Binding("q", "quit", "Quit"),
Binding("h", "manage_hubs", "Hub Manager"),
Binding("r", "refresh", "Refresh"),
Binding("enter", "select_row", "Select", show=False),
]
def compose(self) -> ComposeResult:
"""Compose the dashboard layout."""
yield Header()
with VerticalScroll(id="main"):
with Vertical(id="hub-panel", classes="panel"):
yield SingleClickDataTable(id="hub-table")
with Horizontal(id="hub-title-bar"):
yield Button(
"Hub Manager (h)",
variant="primary",
id="btn-hub-manager",
)
yield Button(
"FuzzingLabs Hub",
variant="primary",
id="btn-fuzzinglabs-hub",
)
with Vertical(id="agents-panel", classes="panel"):
yield DataTable(id="agents-table")
yield Footer()
def on_mount(self) -> None:
"""Populate tables on startup."""
self._agent_rows: list[_AgentRow] = []
self._hub_rows: list[tuple[str, str, str, bool] | None] = []
# Background build tracking
self._active_builds: dict[str, object] = {} # image -> Popen
self._build_logs: dict[str, list[str]] = {} # image -> log lines
self._build_results: dict[str, bool] = {} # image -> success
self.query_one("#hub-panel").border_title = "Hub Servers [dim](click ✗ Not built to build)[/dim]"
self.query_one("#agents-panel").border_title = "AI Agents"
self._refresh_agents()
self._refresh_hub()
def _refresh_agents(self) -> None:
"""Refresh the AI agents status table."""
table = self.query_one("#agents-table", DataTable)
table.clear(columns=True)
table.add_columns("Agent", "Status", "Config Path")
table.cursor_type = "row"
self._agent_rows = []
for display_name, agent, config_path, servers_key in get_agent_configs():
is_linked, status_text = check_agent_status(config_path, servers_key)
if is_linked:
status_cell = Text(f"{status_text}", style="green")
else:
status_cell = Text(f"{status_text}", style="red")
table.add_row(display_name, status_cell, str(config_path))
self._agent_rows.append(
(display_name, agent, config_path, servers_key, is_linked)
)
def _refresh_hub(self) -> None:
"""Refresh the hub servers table, grouped by source hub."""
self._hub_rows = []
table = self.query_one("#hub-table", SingleClickDataTable)
table.clear(columns=True)
table.add_columns("Server", "Image", "Hub", "Status")
table.cursor_type = "row"
try:
secpipe_root = find_secpipe_root()
hub_config = load_hub_config(secpipe_root)
except Exception:
table.add_row(
Text("Error loading config", style="red"), "", "", ""
)
return
servers = hub_config.get("servers", [])
if not servers:
table.add_row(
Text("No servers — press h", style="dim"), "", "", ""
)
return
# Group servers by source hub
groups: dict[str, list[dict[str, Any]]] = defaultdict(list)
for server in servers:
source = server.get("source_hub", "manual")
groups[source].append(server)
for hub_name, hub_servers in groups.items():
ready_count = 0
total = len(hub_servers)
statuses: list[tuple[dict[str, Any], bool, str]] = []
for server in hub_servers:
enabled = server.get("enabled", True)
if not enabled:
statuses.append((server, False, "Disabled"))
else:
is_ready, status_text = check_hub_image(
server.get("image", "")
)
if is_ready:
ready_count += 1
statuses.append((server, is_ready, status_text))
# Group header row
if hub_name == "manual":
header = Text(
f"▼ 📦 Local config ({ready_count}/{total} ready)",
style="bold",
)
else:
header = Text(
f"▼ 🔗 {hub_name} ({ready_count}/{total} ready)",
style="bold",
)
table.add_row(header, "", "", "")
self._hub_rows.append(None) # group header — not selectable
# Tool rows
for server, is_ready, status_text in statuses:
name = server.get("name", "unknown")
image = server.get("image", "unknown")
enabled = server.get("enabled", True)
if image in getattr(self, "_active_builds", {}):
status_cell = Text("⏳ Building…", style="yellow")
elif not enabled:
status_cell = Text("Disabled", style="dim")
elif is_ready:
status_cell = Text("✓ Ready", style="green")
else:
status_cell = Text(f"{status_text}", style="red dim")
table.add_row(
f" {name}",
Text(image, style="dim"),
hub_name,
status_cell,
)
self._hub_rows.append((name, image, hub_name, is_ready))
def on_data_table_row_selected(self, event: DataTable.RowSelected) -> None:
"""Handle Enter-key row selection (agents table only).
Hub table uses RowClicked exclusively — wiring it to RowSelected too
would cause a double push on every click since Textual 8 fires
RowSelected on ALL clicks, not just second-click-on-same-row.
"""
if event.data_table.id == "agents-table":
self._handle_agent_row(event.cursor_row)
def on_single_click_data_table_row_clicked(
self, event: SingleClickDataTable.RowClicked
) -> None:
"""Handle single mouse-click on a hub table row."""
if event.data_table.id == "hub-table":
self._handle_hub_row(event.cursor_row)
def _handle_agent_row(self, idx: int) -> None:
"""Open agent setup/unlink for the selected agent row."""
if idx < 0 or idx >= len(self._agent_rows):
return
display_name, agent, _config_path, _servers_key, is_linked = self._agent_rows[idx]
if is_linked:
from secpipe_cli.tui.screens.agent_setup import AgentUnlinkScreen
self.push_screen(
AgentUnlinkScreen(agent, display_name),
callback=self._on_agent_changed,
)
else:
from secpipe_cli.tui.screens.agent_setup import AgentSetupScreen
self.push_screen(
AgentSetupScreen(agent, display_name),
callback=self._on_agent_changed,
)
def _handle_hub_row(self, idx: int) -> None:
"""Handle a click on a hub table row."""
# Guard: never push two build dialogs at once (double-click protection)
if getattr(self, "_build_dialog_open", False):
return
if idx < 0 or idx >= len(self._hub_rows):
return
row_data = self._hub_rows[idx]
if row_data is None:
return # group header row — ignore
server_name, image, hub_name, is_ready = row_data
# If a build is already running, open the live log viewer
if image in self._active_builds:
from secpipe_cli.tui.screens.build_log import BuildLogScreen
self._build_dialog_open = True
self.push_screen(
BuildLogScreen(image),
callback=lambda _: setattr(self, "_build_dialog_open", False),
)
return
if is_ready:
self.notify(f"{image} is already built ✓", severity="information")
return
if hub_name == "manual":
self.notify("Manual servers must be built outside SecPipe")
return
from secpipe_cli.tui.screens.build_image import BuildImageScreen
self._build_dialog_open = True
def _on_build_dialog_done(result: bool | None) -> None:
self._build_dialog_open = False
if result is not None:
self._on_build_confirmed(result, server_name, image, hub_name)
self.push_screen(
BuildImageScreen(server_name, image, hub_name),
callback=_on_build_dialog_done,
)
def _on_build_confirmed(self, confirmed: bool, server_name: str, image: str, hub_name: str) -> None:
"""Start a background build if the user confirmed."""
if not confirmed:
return
self._build_logs[image] = []
self._build_results.pop(image, None)
self._active_builds[image] = True # mark as pending so ⏳ shows immediately
self._refresh_hub() # show ⏳ Building… immediately
self._run_build(server_name, image, hub_name)
@work(thread=True)
def _run_build(self, server_name: str, image: str, hub_name: str) -> None:
"""Build a Docker/Podman image in a background thread."""
from secpipe_cli.tui.helpers import build_image, find_dockerfile_for_server
logs = self._build_logs.setdefault(image, [])
dockerfile = find_dockerfile_for_server(server_name, hub_name)
if dockerfile is None:
logs.append(f"ERROR: Dockerfile not found for '{server_name}' in hub '{hub_name}'")
self._build_results[image] = False
self._active_builds.pop(image, None)
self.call_from_thread(self._on_build_done, image, success=False)
return
logs.append(f"Building {image} from {dockerfile.parent}")
logs.append("")
try:
proc = build_image(image, dockerfile)
except FileNotFoundError as exc:
logs.append(f"ERROR: {exc}")
self._build_results[image] = False
self._active_builds.pop(image, None)
self.call_from_thread(self._on_build_done, image, success=False)
return
self._active_builds[image] = proc # replace pending marker with actual process
self.call_from_thread(self._refresh_hub) # show ⏳ in table
if proc.stdout is None:
return
for line in proc.stdout:
logs.append(line.rstrip())
proc.wait()
self._active_builds.pop(image, None)
success = proc.returncode == 0
self._build_results[image] = success
self.call_from_thread(self._on_build_done, image, success=success)
def _on_build_done(self, image: str, *, success: bool) -> None:
"""Handle completion of a background build on the main thread."""
self._refresh_hub()
if success:
self.notify(f"{image} built successfully", severity="information")
else:
self.notify(f"{image} build failed — click row for log", severity="error")
def on_button_pressed(self, event: Button.Pressed) -> None:
"""Handle button presses."""
if event.button.id == "btn-hub-manager":
self.action_manage_hubs()
elif event.button.id == "btn-fuzzinglabs-hub":
self.action_add_fuzzinglabs_hub()
def action_add_fuzzinglabs_hub(self) -> None:
"""Open the clone dialog pre-filled with the FuzzingLabs hub URL."""
from secpipe_cli.tui.screens.hub_manager import CloneHubScreen
self.push_screen(
CloneHubScreen(
default_url="https://github.com/FuzzingLabs/mcp-security-hub",
default_name="mcp-security-hub",
is_default=True,
),
callback=self._on_hub_changed,
)
def action_manage_hubs(self) -> None:
"""Open the hub manager."""
from secpipe_cli.tui.screens.hub_manager import HubManagerScreen
self.push_screen(HubManagerScreen(), callback=self._on_hub_changed)
def _on_agent_changed(self, result: str | None) -> None:
"""Handle agent setup/unlink completion."""
if result:
self.notify(result)
self._refresh_agents()
def _on_hub_changed(self, result: str | None) -> None:
"""Handle hub manager completion — refresh the hub table."""
self._refresh_hub()
def action_refresh(self) -> None:
"""Refresh all status panels."""
self._refresh_agents()
self._refresh_hub()
self.notify("Status refreshed")
+687
View File
@@ -0,0 +1,687 @@
"""Shared helpers for SecPipe TUI and CLI.
Provides utility functions for checking AI agent configuration status,
hub server image availability, installing/removing MCP configurations,
and managing linked MCP hub repositories.
"""
from __future__ import annotations
import contextlib
import json
import os
import subprocess
from pathlib import Path
from typing import Any
from secpipe_cli.commands.mcp import (
AIAgent,
_detect_docker_socket,
_detect_podman_socket,
_find_secpipe_root,
_generate_mcp_config,
_get_claude_code_user_mcp_path,
_get_claude_desktop_mcp_path,
_get_copilot_mcp_path,
)
# --- Hub Management Constants ---
SECPIPE_DEFAULT_HUB_URL = "git@github.com:FuzzingLabs/mcp-security-hub.git"
SECPIPE_DEFAULT_HUB_NAME = "mcp-security-hub"
def get_secpipe_user_dir() -> Path:
"""Return the user-global ``~/.secpipe/`` directory.
Stores data that is shared across all workspaces: cloned hub
repositories, the hub registry, container storage (graphroot/runroot),
and the hub workspace volume.
Override with the ``SECPIPE_USER_DIR`` environment variable to
redirect all user-global data to a custom path — useful for testing
a fresh install without touching the real ``~/.secpipe/``.
:return: ``Path.home() / ".secpipe"`` or ``$SECPIPE_USER_DIR``
"""
env_dir = os.environ.get("SECPIPE_USER_DIR")
if env_dir:
return Path(env_dir).resolve()
return Path.home() / ".secpipe"
def get_secpipe_dir() -> Path:
"""Return the project-local ``.secpipe/`` directory.
Stores data that is specific to the current workspace: fuzzing
results and project artifacts. Similar to how ``.git/`` scopes
version-control data to a single project.
:return: ``Path.cwd() / ".secpipe"``
"""
return Path.cwd() / ".secpipe"
# Categories that typically need NET_RAW capability for network access
_NET_RAW_CATEGORIES = {"reconnaissance", "web-security"}
# Directories to skip when scanning a hub for MCP tool Dockerfiles
_SCAN_SKIP_DIRS = {
".git",
".github",
"scripts",
"tests",
"examples",
"meta",
"__pycache__",
"node_modules",
".venv",
}
def get_agent_configs() -> list[tuple[str, AIAgent, Path, str]]:
"""Return agent display configs with resolved paths.
Each tuple contains:
- Display name
- AIAgent enum value
- Config file path
- Servers JSON key
:return: List of agent configuration tuples.
"""
return [
("GitHub Copilot", AIAgent.COPILOT, _get_copilot_mcp_path(), "servers"),
("Claude Desktop", AIAgent.CLAUDE_DESKTOP, _get_claude_desktop_mcp_path(), "mcpServers"),
("Claude Code", AIAgent.CLAUDE_CODE, _get_claude_code_user_mcp_path(), "mcpServers"),
]
def check_agent_status(config_path: Path, servers_key: str) -> tuple[bool, str]:
"""Check whether an AI agent has SecPipe configured.
:param config_path: Path to the agent's MCP config file.
:param servers_key: JSON key for the servers dict (e.g. "servers" or "mcpServers").
:return: Tuple of (is_linked, status_description).
"""
if not config_path.exists():
return False, "Not configured"
try:
config = json.loads(config_path.read_text())
servers = config.get(servers_key, {})
if "secpipe" in servers:
return True, "Linked"
return False, "Config exists, not linked"
except json.JSONDecodeError:
return False, "Invalid config file"
def check_hub_image(image: str) -> tuple[bool, str]:
"""Check whether a container image exists locally.
Respects the ``SECPIPE_ENGINE__TYPE`` environment variable so that
Podman users see the correct build status instead of always "Not built".
:param image: Image name (e.g. "semgrep-mcp:latest").
:return: Tuple of (is_ready, status_description).
"""
engine = os.environ.get("SECPIPE_ENGINE__TYPE", "docker").lower()
cmd = "podman" if engine == "podman" else "docker"
try:
result = subprocess.run(
[cmd, "image", "inspect", image],
check=False, capture_output=True,
text=True,
timeout=5,
)
if result.returncode == 0:
return True, "Ready"
return False, "Not built"
except subprocess.TimeoutExpired:
return False, "Timeout"
except FileNotFoundError:
return False, f"{cmd} not found"
def load_hub_config(secpipe_root: Path) -> dict[str, Any]:
"""Load hub-config.json from the SecPipe root.
:param secpipe_root: Path to secpipe-oss directory.
:return: Parsed hub configuration dict, empty dict on error.
"""
config_path = secpipe_root / "hub-config.json"
if not config_path.exists():
return {}
try:
data: dict[str, Any] = json.loads(config_path.read_text())
return data
except json.JSONDecodeError:
return {}
def find_secpipe_root() -> Path:
"""Find the SecPipe installation root directory.
:return: Path to the secpipe-oss directory.
"""
return _find_secpipe_root()
def install_agent_config(agent: AIAgent, engine: str, force: bool = False) -> str:
"""Install SecPipe MCP configuration for an AI agent.
:param agent: Target AI agent.
:param engine: Container engine type ("docker" or "podman").
:param force: Overwrite existing configuration.
:return: Result message string.
"""
secpipe_root = _find_secpipe_root()
if agent == AIAgent.COPILOT:
config_path = _get_copilot_mcp_path()
servers_key = "servers"
elif agent == AIAgent.CLAUDE_CODE:
config_path = _get_claude_code_user_mcp_path()
servers_key = "mcpServers"
else:
config_path = _get_claude_desktop_mcp_path()
servers_key = "mcpServers"
socket = _detect_docker_socket() if engine == "docker" else _detect_podman_socket()
server_config = _generate_mcp_config(
secpipe_root=secpipe_root,
engine_type=engine,
engine_socket=socket,
)
if config_path.exists():
try:
existing = json.loads(config_path.read_text())
except json.JSONDecodeError:
return f"Error: Invalid JSON in {config_path}"
servers = existing.get(servers_key, {})
if "secpipe" in servers and not force:
return "Already configured (use force to overwrite)"
if servers_key not in existing:
existing[servers_key] = {}
existing[servers_key]["secpipe"] = server_config
full_config = existing
else:
config_path.parent.mkdir(parents=True, exist_ok=True)
full_config = {servers_key: {"secpipe": server_config}}
config_path.write_text(json.dumps(full_config, indent=4))
return f"Installed SecPipe for {agent.value}"
def uninstall_agent_config(agent: AIAgent) -> str:
"""Remove SecPipe MCP configuration from an AI agent.
:param agent: Target AI agent.
:return: Result message string.
"""
if agent == AIAgent.COPILOT:
config_path = _get_copilot_mcp_path()
servers_key = "servers"
elif agent == AIAgent.CLAUDE_CODE:
config_path = _get_claude_code_user_mcp_path()
servers_key = "mcpServers"
else:
config_path = _get_claude_desktop_mcp_path()
servers_key = "mcpServers"
if not config_path.exists():
return "Configuration file not found"
try:
config = json.loads(config_path.read_text())
except json.JSONDecodeError:
return "Error: Invalid JSON in config file"
servers = config.get(servers_key, {})
if "secpipe" not in servers:
return "SecPipe is not configured for this agent"
del servers["secpipe"]
config_path.write_text(json.dumps(config, indent=4))
return f"Removed SecPipe from {agent.value}"
# ---------------------------------------------------------------------------
# Hub Management
# ---------------------------------------------------------------------------
def get_hubs_registry_path() -> Path:
"""Return path to the hubs registry file (``~/.secpipe/hubs.json``).
Stored in the user-global directory so the registry is shared across
all workspaces.
:return: Path to the registry JSON file.
"""
return get_secpipe_user_dir() / "hubs.json"
def get_default_hubs_dir() -> Path:
"""Return default directory for cloned hubs (``~/.secpipe/hubs/``).
Stored in the user-global directory so hubs are cloned once and
reused in every workspace.
:return: Path to the default hubs directory.
"""
return get_secpipe_user_dir() / "hubs"
def _discover_hub_dirs() -> list[Path]:
"""Scan known hub directories for cloned repos.
Checks both the current global location (``~/.secpipe/hubs/``) and the
legacy workspace-local location (``<cwd>/.secpipe/hubs/``) so that hubs
cloned before the global-dir migration are still found.
:return: List of hub directory paths (each is a direct child with a ``.git``
sub-directory).
"""
candidates: list[Path] = []
for base in (get_secpipe_user_dir() / "hubs", get_secpipe_dir() / "hubs"):
if base.is_dir():
candidates.extend(
entry for entry in base.iterdir()
if entry.is_dir() and (entry / ".git").is_dir()
)
return candidates
def load_hubs_registry() -> dict[str, Any]:
"""Load the hubs registry from disk.
If the registry file does not exist, auto-recovers it by scanning known hub
directories and rebuilding entries for any discovered hubs. This handles
the migration from the old workspace-local ``<cwd>/.secpipe/hubs.json``
path to the global ``~/.secpipe/hubs.json`` path, as well as any case
where the registry was lost.
:return: Registry dict with ``hubs`` key containing a list of hub entries.
"""
path = get_hubs_registry_path()
if path.exists():
try:
data: dict[str, Any] = json.loads(path.read_text())
return data
except (json.JSONDecodeError, OSError):
pass
# Registry missing — attempt to rebuild from discovered hub directories.
discovered = _discover_hub_dirs()
if not discovered:
return {"hubs": []}
hubs: list[dict[str, Any]] = []
for hub_dir in discovered:
name = hub_dir.name
# Try to read the git remote URL
git_url: str = ""
try:
import subprocess as _sp
r = _sp.run(
["git", "-C", str(hub_dir), "remote", "get-url", "origin"],
check=False, capture_output=True, text=True, timeout=5,
)
if r.returncode == 0:
git_url = r.stdout.strip()
except Exception: # noqa: S110 - git URL is optional, failure is acceptable
pass
hubs.append({
"name": name,
"path": str(hub_dir),
"git_url": git_url,
"is_default": name == SECPIPE_DEFAULT_HUB_NAME,
})
registry: dict[str, Any] = {"hubs": hubs}
# Persist so we don't re-scan on every load
with contextlib.suppress(OSError):
save_hubs_registry(registry)
return registry
def save_hubs_registry(registry: dict[str, Any]) -> None:
"""Save the hubs registry to disk.
:param registry: Registry dict to persist.
"""
path = get_hubs_registry_path()
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(json.dumps(registry, indent=2))
def scan_hub_for_servers(hub_path: Path) -> list[dict[str, Any]]:
"""Scan a hub directory for MCP tool Dockerfiles.
Looks for the ``category/tool-name/Dockerfile`` pattern and generates
a server configuration entry for each discovered tool.
:param hub_path: Root directory of the hub repository.
:return: Sorted list of server configuration dicts.
"""
servers: list[dict[str, Any]] = []
if not hub_path.is_dir():
return servers
for dockerfile in sorted(hub_path.rglob("Dockerfile")):
rel = dockerfile.relative_to(hub_path)
parts = rel.parts
# Expected layout: category/tool-name/Dockerfile (exactly 3 parts)
if len(parts) != 3:
continue
category, tool_name, _ = parts
if category in _SCAN_SKIP_DIRS:
continue
capabilities: list[str] = []
if category in _NET_RAW_CATEGORIES:
capabilities = ["NET_RAW"]
servers.append(
{
"name": tool_name,
"description": f"{tool_name}{category}",
"type": "docker",
"image": f"{tool_name}:latest",
"category": category,
"capabilities": capabilities,
"volumes": [f"{get_secpipe_user_dir()}/hub/workspace:/data"],
"enabled": True,
}
)
return servers
def link_hub(
name: str,
path: str | Path,
git_url: str | None = None,
is_default: bool = False,
) -> str:
"""Link a hub directory and add its servers to hub-config.json.
:param name: Display name for the hub.
:param path: Local directory path containing the hub.
:param git_url: Optional git remote URL (for tracking).
:param is_default: Whether this is the default FuzzingLabs hub.
:return: Result message string.
"""
hub_path = Path(path).resolve()
if not hub_path.is_dir():
return f"Error: directory not found: {hub_path}"
# Update registry
registry = load_hubs_registry()
hubs = registry.get("hubs", [])
# Remove existing entry with same name
hubs = [h for h in hubs if h.get("name") != name]
hubs.append(
{
"name": name,
"path": str(hub_path),
"git_url": git_url,
"is_default": is_default,
}
)
registry["hubs"] = hubs
save_hubs_registry(registry)
# Scan and update hub-config.json
scanned = scan_hub_for_servers(hub_path)
if not scanned:
return f"Linked '{name}' (0 servers found)"
try:
added = _merge_servers_into_hub_config(name, scanned)
except Exception as exc:
return f"Linked '{name}' but config update failed: {exc}"
return f"Linked '{name}'{added} new servers added ({len(scanned)} scanned)"
def unlink_hub(name: str) -> str:
"""Unlink a hub and remove its servers from hub-config.json.
:param name: Name of the hub to unlink.
:return: Result message string.
"""
registry = load_hubs_registry()
hubs = registry.get("hubs", [])
if not any(h.get("name") == name for h in hubs):
return f"Hub '{name}' is not linked"
hubs = [h for h in hubs if h.get("name") != name]
registry["hubs"] = hubs
save_hubs_registry(registry)
try:
removed = _remove_hub_servers_from_config(name)
except Exception:
removed = 0
return f"Unlinked '{name}'{removed} server(s) removed"
def clone_hub(
git_url: str,
dest: Path | None = None,
name: str | None = None,
) -> tuple[bool, str, Path | None]:
"""Clone a git hub repository.
If the destination already exists and is a git repo, pulls instead.
:param git_url: Git remote URL to clone.
:param dest: Destination directory (auto-derived from URL if *None*).
:param name: Hub name (auto-derived from URL if *None*).
:return: Tuple of ``(success, message, clone_path)``.
"""
if name is None:
name = git_url.rstrip("/").split("/")[-1]
name = name.removesuffix(".git")
if dest is None:
dest = get_default_hubs_dir() / name
if dest.exists():
if (dest / ".git").is_dir():
try:
result = subprocess.run(
["git", "-C", str(dest), "pull"],
check=False, capture_output=True,
text=True,
timeout=120,
)
if result.returncode == 0:
return True, f"Updated existing clone at {dest}", dest
return False, f"Git pull failed: {result.stderr.strip()}", None
except subprocess.TimeoutExpired:
return False, "Git pull timed out", None
except FileNotFoundError:
return False, "Git not found", None
return False, f"Directory already exists (not a git repo): {dest}", None
dest.parent.mkdir(parents=True, exist_ok=True)
try:
result = subprocess.run(
["git", "clone", git_url, str(dest)],
check=False, capture_output=True,
text=True,
timeout=300,
)
if result.returncode == 0:
return True, f"Cloned to {dest}", dest
return False, f"Git clone failed: {result.stderr.strip()}", None
except subprocess.TimeoutExpired:
return False, "Git clone timed out (5 min limit)", None
except FileNotFoundError:
return False, "Git not found on PATH", None
def _merge_servers_into_hub_config(
hub_name: str,
servers: list[dict[str, Any]],
) -> int:
"""Merge scanned servers into hub-config.json.
Only adds servers whose name does not already exist in the config.
New entries are tagged with ``source_hub`` for later removal.
:param hub_name: Name of the source hub (used for tagging).
:param servers: List of server dicts from :func:`scan_hub_for_servers`.
:return: Number of newly added servers.
"""
secpipe_root = find_secpipe_root()
config_path = secpipe_root / "hub-config.json"
if config_path.exists():
try:
config = json.loads(config_path.read_text())
except json.JSONDecodeError:
config = {"servers": [], "default_timeout": 300, "cache_tools": True}
else:
config = {"servers": [], "default_timeout": 300, "cache_tools": True}
existing = config.get("servers", [])
existing_names = {s.get("name") for s in existing}
added = 0
for server in servers:
if server["name"] not in existing_names:
server["source_hub"] = hub_name
existing.append(server)
existing_names.add(server["name"])
added += 1
config["servers"] = existing
config_path.write_text(json.dumps(config, indent=2))
return added
def _remove_hub_servers_from_config(hub_name: str) -> int:
"""Remove servers belonging to a hub from hub-config.json.
Only removes servers tagged with the given ``source_hub`` value.
Manually-added servers (without a tag) are preserved.
:param hub_name: Name of the hub whose servers should be removed.
:return: Number of servers removed.
"""
secpipe_root = find_secpipe_root()
config_path = secpipe_root / "hub-config.json"
if not config_path.exists():
return 0
try:
config = json.loads(config_path.read_text())
except json.JSONDecodeError:
return 0
existing = config.get("servers", [])
before = len(existing)
config["servers"] = [s for s in existing if s.get("source_hub") != hub_name]
after = len(config["servers"])
config_path.write_text(json.dumps(config, indent=2))
return before - after
def find_dockerfile_for_server(server_name: str, hub_name: str) -> Path | None:
"""Find the Dockerfile for a hub server tool.
Looks up the hub path from the registry, then scans for
``category/<server_name>/Dockerfile``.
:param server_name: Tool name (e.g. ``"nmap-mcp"``).
:param hub_name: Hub name as stored in the registry.
:return: Absolute path to the Dockerfile, or ``None`` if not found.
"""
registry = load_hubs_registry()
hub_entry = next(
(h for h in registry.get("hubs", []) if h.get("name") == hub_name),
None,
)
if not hub_entry:
return None
hub_path = Path(hub_entry["path"])
for dockerfile in hub_path.rglob("Dockerfile"):
rel = dockerfile.relative_to(hub_path)
parts = rel.parts
if len(parts) == 3 and parts[1] == server_name:
return dockerfile
return None
def build_image(
image: str,
dockerfile: Path,
*,
engine: str | None = None,
) -> subprocess.Popen[str]:
"""Start a non-blocking ``docker/podman build`` subprocess.
Returns the running :class:`subprocess.Popen` object so the caller
can stream ``stdout`` / ``stderr`` lines incrementally.
:param image: Image tag (e.g. ``"nmap-mcp:latest"``).
:param dockerfile: Path to the ``Dockerfile``.
:param engine: ``"docker"`` or ``"podman"`` (auto-detected if ``None``).
:return: Running subprocess with merged stdout+stderr.
"""
if engine is None:
engine = os.environ.get("SECPIPE_ENGINE__TYPE", "docker").lower()
engine = "podman" if engine == "podman" else "docker"
context_dir = str(dockerfile.parent)
return subprocess.Popen(
[engine, "build", "-t", image, context_dir],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
)
@@ -0,0 +1 @@
"""TUI screens for SecPipe."""
@@ -0,0 +1,96 @@
"""Agent setup and unlink modal screens for SecPipe TUI.
Provides context-aware modals that receive the target agent directly
from the dashboard row selection — no redundant agent picker needed.
"""
from __future__ import annotations
from textual.app import ComposeResult
from textual.containers import Horizontal, Vertical
from textual.screen import ModalScreen
from textual.widgets import Button, Label, RadioButton, RadioSet
from secpipe_cli.commands.mcp import AIAgent
from secpipe_cli.tui.helpers import install_agent_config, uninstall_agent_config
class AgentSetupScreen(ModalScreen[str | None]):
"""Modal for linking a specific agent — only asks for engine choice."""
BINDINGS = [("escape", "cancel", "Cancel")]
def __init__(self, agent: AIAgent, display_name: str) -> None:
super().__init__()
self._agent = agent
self._display_name = display_name
def compose(self) -> ComposeResult:
"""Compose the setup dialog layout."""
with Vertical(id="setup-dialog"):
yield Label(f"Setup {self._display_name}", classes="dialog-title")
yield Label("Container Engine:", classes="field-label")
yield RadioSet(
RadioButton("Docker", value=True),
RadioButton("Podman"),
id="engine-select",
)
with Horizontal(classes="dialog-buttons"):
yield Button("Install", variant="primary", id="btn-install")
yield Button("Cancel", variant="default", id="btn-cancel")
def on_button_pressed(self, event: Button.Pressed) -> None:
"""Handle button clicks."""
if event.button.id == "btn-cancel":
self.dismiss(None)
elif event.button.id == "btn-install":
self._do_install()
def action_cancel(self) -> None:
"""Dismiss the dialog without action."""
self.dismiss(None)
def _do_install(self) -> None:
"""Execute the installation."""
engine_set = self.query_one("#engine-select", RadioSet)
engine = "docker" if engine_set.pressed_index <= 0 else "podman"
result = install_agent_config(self._agent, engine, force=True)
self.dismiss(result)
class AgentUnlinkScreen(ModalScreen[str | None]):
"""Confirmation modal for unlinking a specific agent."""
BINDINGS = [("escape", "cancel", "Cancel")]
def __init__(self, agent: AIAgent, display_name: str) -> None:
super().__init__()
self._agent = agent
self._display_name = display_name
def compose(self) -> ComposeResult:
"""Compose the unlink confirmation layout."""
with Vertical(id="unlink-dialog"):
yield Label(f"Unlink {self._display_name}?", classes="dialog-title")
yield Label(
f"This will remove the SecPipe MCP configuration from {self._display_name}.",
)
with Horizontal(classes="dialog-buttons"):
yield Button("Unlink", variant="warning", id="btn-unlink")
yield Button("Cancel", variant="default", id="btn-cancel")
def on_button_pressed(self, event: Button.Pressed) -> None:
"""Handle button clicks."""
if event.button.id == "btn-cancel":
self.dismiss(None)
elif event.button.id == "btn-unlink":
result = uninstall_agent_config(self._agent)
self.dismiss(result)
def action_cancel(self) -> None:
"""Dismiss without action."""
self.dismiss(None)
@@ -0,0 +1,58 @@
"""Build-image confirm dialog for SecPipe TUI.
Simple modal that asks the user to confirm before starting a background
build. The actual build is managed by the app so the user is never
locked on this screen.
"""
from __future__ import annotations
from textual.app import ComposeResult
from textual.containers import Horizontal, Vertical
from textual.screen import ModalScreen
from textual.widgets import Button, Label
class _NoFocusButton(Button):
can_focus = False
class BuildImageScreen(ModalScreen[bool]):
"""Quick confirmation before starting a background Docker/Podman build."""
BINDINGS = [("escape", "cancel", "Cancel")]
def __init__(self, server_name: str, image: str, hub_name: str) -> None:
super().__init__()
self._server_name = server_name
self._image = image
self._hub_name = hub_name
def compose(self) -> ComposeResult:
"""Build the confirmation dialog UI."""
with Vertical(id="build-dialog"):
yield Label(f"Build {self._image}", classes="dialog-title")
yield Label(
f"Hub: {self._hub_name} • Tool: {self._server_name}",
id="build-subtitle",
)
yield Label(
"The image will be built in the background.\n"
"You'll receive a notification when it's done.",
id="confirm-text",
)
with Horizontal(classes="dialog-buttons"):
yield _NoFocusButton("Build", variant="primary", id="btn-build")
yield _NoFocusButton("Cancel", variant="default", id="btn-cancel")
def on_button_pressed(self, event: Button.Pressed) -> None:
"""Handle Build or Cancel button clicks."""
if event.button.id == "btn-build":
self.dismiss(result=True)
elif event.button.id == "btn-cancel":
self.dismiss(result=False)
def action_cancel(self) -> None:
"""Dismiss the dialog when Escape is pressed."""
self.dismiss(result=False)
@@ -0,0 +1,80 @@
"""Build-log viewer screen for SecPipe TUI.
Shows live output of a background build started by the app. Polls the
app's ``_build_logs`` buffer every 500 ms so the user can pop this screen
open at any time while the build is running and see up-to-date output.
"""
from __future__ import annotations
from typing import Any
from textual.app import ComposeResult
from textual.containers import Horizontal, Vertical
from textual.screen import ModalScreen
from textual.widgets import Button, Label, Log
class _NoFocusButton(Button):
can_focus = False
class BuildLogScreen(ModalScreen[None]):
"""Live log viewer for a background build job managed by the app."""
BINDINGS = [("escape", "close", "Close")]
def __init__(self, image: str) -> None:
super().__init__()
self._image = image
self._last_line: int = 0
def compose(self) -> ComposeResult:
"""Build the log viewer UI."""
with Vertical(id="build-dialog"):
yield Label(f"Build log — {self._image}", classes="dialog-title")
yield Label("", id="build-status")
yield Log(id="build-log", auto_scroll=True)
with Horizontal(classes="dialog-buttons"):
yield _NoFocusButton("Close", variant="default", id="btn-close")
def on_mount(self) -> None:
"""Initialize log polling when the screen is mounted."""
self._flush_log()
self.set_interval(0.5, self._poll_log)
def _flush_log(self) -> None:
"""Write any new lines since the last flush."""
logs: list[str] = getattr(self.app, "_build_logs", {}).get(self._image, [])
log_widget = self.query_one("#build-log", Log)
new_lines = logs[self._last_line :]
for line in new_lines:
log_widget.write_line(line)
self._last_line += len(new_lines)
active: dict[str, Any] = getattr(self.app, "_active_builds", {})
status = self.query_one("#build-status", Label)
if self._image in active:
status.update("[yellow]⏳ Building…[/yellow]")
else:
# Build is done — check if we have a result stored
results: dict[str, Any] = getattr(self.app, "_build_results", {})
if self._image in results:
if results[self._image]:
status.update(f"[green]✓ {self._image} built successfully[/green]")
else:
status.update(f"[red]✗ {self._image} build failed[/red]")
def _poll_log(self) -> None:
"""Poll for new log lines periodically."""
self._flush_log()
def on_button_pressed(self, event: Button.Pressed) -> None:
"""Handle Close button click."""
if event.button.id == "btn-close":
self.dismiss(None)
def action_close(self) -> None:
"""Dismiss the dialog when Escape is pressed."""
self.dismiss(None)
@@ -0,0 +1,301 @@
"""Hub management screens for SecPipe TUI.
Provides modal dialogs for managing linked MCP hub repositories:
- HubManagerScreen: list, add, remove linked hubs
- LinkHubScreen: link a local directory as a hub
- CloneHubScreen: clone a git repo and link it (defaults to FuzzingLabs hub)
"""
from __future__ import annotations
from pathlib import Path
from rich.text import Text
from textual import work
from textual.app import ComposeResult
from textual.containers import Horizontal, Vertical
from textual.screen import ModalScreen
from textual.widgets import Button, DataTable, Input, Label, Static
from secpipe_cli.tui.helpers import (
SECPIPE_DEFAULT_HUB_NAME,
SECPIPE_DEFAULT_HUB_URL,
clone_hub,
link_hub,
load_hubs_registry,
scan_hub_for_servers,
unlink_hub,
)
class HubManagerScreen(ModalScreen[str | None]):
"""Modal screen for managing linked MCP hubs."""
BINDINGS = [("escape", "cancel", "Close")]
def compose(self) -> ComposeResult:
"""Compose the hub manager layout."""
with Vertical(id="hub-manager-dialog"):
yield Label("Hub Manager", classes="dialog-title")
yield DataTable(id="hubs-table")
yield Label("", id="hub-status")
with Horizontal(classes="dialog-buttons"):
yield Button(
"FuzzingLabs Hub",
variant="primary",
id="btn-clone-default",
)
yield Button("Link Path", variant="default", id="btn-link")
yield Button("Clone URL", variant="default", id="btn-clone")
yield Button("Remove", variant="primary", id="btn-remove")
yield Button("Close", variant="default", id="btn-close")
def on_mount(self) -> None:
"""Populate the hubs table on startup."""
self._refresh_hubs()
def _refresh_hubs(self) -> None:
"""Refresh the linked hubs table."""
table = self.query_one("#hubs-table", DataTable)
table.clear(columns=True)
table.add_columns("Name", "Path", "Servers", "Source")
table.cursor_type = "row"
registry = load_hubs_registry()
hubs = registry.get("hubs", [])
if not hubs:
table.add_row(
Text("No hubs linked", style="dim"),
Text("Press 'FuzzingLabs Hub' to get started", style="dim"),
"",
"",
)
return
for hub in hubs:
name = hub.get("name", "unknown")
path = hub.get("path", "")
git_url = hub.get("git_url", "")
is_default = hub.get("is_default", False)
hub_path = Path(path)
count: str | Text
if hub_path.is_dir():
servers = scan_hub_for_servers(hub_path)
count = str(len(servers))
else:
count = Text("dir missing", style="yellow")
source = git_url or "local"
name_cell: str | Text
if is_default:
name_cell = Text(f"{name}", style="bold")
else:
name_cell = name
table.add_row(name_cell, path, count, source)
def on_button_pressed(self, event: Button.Pressed) -> None:
"""Route button actions."""
if event.button.id == "btn-close":
self.dismiss("refreshed")
elif event.button.id == "btn-clone-default":
self.app.push_screen(
CloneHubScreen(
SECPIPE_DEFAULT_HUB_URL,
SECPIPE_DEFAULT_HUB_NAME,
is_default=True,
),
callback=self._on_hub_action,
)
elif event.button.id == "btn-link":
self.app.push_screen(
LinkHubScreen(),
callback=self._on_hub_action,
)
elif event.button.id == "btn-clone":
self.app.push_screen(
CloneHubScreen(),
callback=self._on_hub_action,
)
elif event.button.id == "btn-remove":
self._remove_selected()
def _on_hub_action(self, result: str | None) -> None:
"""Handle result from a sub-screen."""
if result:
self.query_one("#hub-status", Label).update(result)
self.app.notify(result)
self._refresh_hubs()
def _remove_selected(self) -> None:
"""Remove the currently selected hub."""
table = self.query_one("#hubs-table", DataTable)
registry = load_hubs_registry()
hubs = registry.get("hubs", [])
if not hubs:
self.app.notify("No hubs to remove", severity="warning")
return
idx = table.cursor_row
if idx is None or idx < 0 or idx >= len(hubs):
self.app.notify("Select a hub to remove", severity="warning")
return
name = hubs[idx].get("name", "")
result = unlink_hub(name)
self.query_one("#hub-status", Label).update(result)
self._refresh_hubs()
self.app.notify(result)
def action_cancel(self) -> None:
"""Close the hub manager."""
self.dismiss("refreshed")
class LinkHubScreen(ModalScreen[str | None]):
"""Modal for linking a local directory as an MCP hub."""
BINDINGS = [("escape", "cancel", "Cancel")]
def compose(self) -> ComposeResult:
"""Compose the link dialog layout."""
with Vertical(id="link-dialog"):
yield Label("Link Local Hub", classes="dialog-title")
yield Label("Hub Name:", classes="field-label")
yield Input(placeholder="my-hub", id="name-input")
yield Label("Directory Path:", classes="field-label")
yield Input(placeholder="/path/to/hub-directory", id="path-input")
yield Label("", id="link-status")
with Horizontal(classes="dialog-buttons"):
yield Button("Link", variant="primary", id="btn-link")
yield Button("Cancel", variant="default", id="btn-cancel")
def on_button_pressed(self, event: Button.Pressed) -> None:
"""Handle button clicks."""
if event.button.id == "btn-cancel":
self.dismiss(None)
elif event.button.id == "btn-link":
self._do_link()
def _do_link(self) -> None:
"""Execute the link operation."""
name = self.query_one("#name-input", Input).value.strip()
path = self.query_one("#path-input", Input).value.strip()
if not name:
self.app.notify("Please enter a hub name", severity="warning")
return
if not path:
self.app.notify("Please enter a directory path", severity="warning")
return
result = link_hub(name, path)
self.dismiss(result)
def action_cancel(self) -> None:
"""Dismiss without action."""
self.dismiss(None)
class CloneHubScreen(ModalScreen[str | None]):
"""Modal for cloning a git hub repository and linking it.
When instantiated with *is_default=True* and FuzzingLabs URL,
provides a one-click setup for the standard security hub.
"""
BINDINGS = [("escape", "cancel", "Cancel")]
def __init__(
self,
default_url: str = "",
default_name: str = "",
is_default: bool = False,
) -> None:
super().__init__()
self._default_url = default_url
self._default_name = default_name
self._is_default = is_default
def compose(self) -> ComposeResult:
"""Compose the clone dialog layout."""
title = "Clone FuzzingLabs Hub" if self._is_default else "Clone Git Hub"
with Vertical(id="clone-dialog"):
yield Label(title, classes="dialog-title")
yield Label("Git URL:", classes="field-label")
yield Input(
value=self._default_url,
placeholder="git@github.com:org/repo.git",
id="url-input",
)
yield Label("Hub Name (optional):", classes="field-label")
yield Input(
value=self._default_name,
placeholder="auto-detect from URL",
id="name-input",
)
yield Static("", id="clone-status")
with Horizontal(classes="dialog-buttons"):
yield Button(
"Clone & Link",
variant="primary",
id="btn-clone",
)
yield Button("Cancel", variant="default", id="btn-cancel")
def on_button_pressed(self, event: Button.Pressed) -> None:
"""Handle button clicks."""
if event.button.id == "btn-cancel":
self.dismiss(None)
elif event.button.id == "btn-clone":
self._start_clone()
def _start_clone(self) -> None:
"""Validate input and start the async clone operation."""
url = self.query_one("#url-input", Input).value.strip()
if not url:
self.app.notify("Please enter a git URL", severity="warning")
return
self.query_one("#btn-clone", Button).disabled = True
self.query_one("#clone-status", Static).update("⏳ Cloning repository...")
self._do_clone(url)
@work(thread=True)
def _do_clone(self, url: str) -> None:
"""Clone the repo in a background thread."""
name_input = self.query_one("#name-input", Input).value.strip()
name = name_input or None
success, msg, path = clone_hub(url, name=name)
if success and path:
hub_name = name or path.name
link_result = link_hub(
hub_name,
path,
git_url=url,
is_default=self._is_default,
)
self.app.call_from_thread(self.dismiss, f"{link_result}")
else:
self.app.call_from_thread(self._on_clone_failed, msg)
def _on_clone_failed(self, msg: str) -> None:
"""Handle a failed clone — re-enable the button and show the error."""
self.query_one("#clone-status", Static).update(f"{msg}")
self.query_one("#btn-clone", Button).disabled = False
def action_cancel(self) -> None:
"""Dismiss without action."""
self.dismiss(None)
+18
View File
@@ -0,0 +1,18 @@
"""CLI utility functions."""
from rich.console import Console
from rich.table import Table
from typer import Exit
def on_error(message: str) -> None:
"""Display an error message and exit.
:param message: Error message to display.
"""
table = Table()
table.add_column("Error")
table.add_row(message)
Console().print(table)
raise Exit(code=1)