From 7924e442456f496fcf2dc3eacfc5fcbe22743db2 Mon Sep 17 00:00:00 2001 From: AFredefon Date: Mon, 16 Mar 2026 02:09:04 +0100 Subject: [PATCH] feat(hub): volume mounts, get_agent_context convention, category filter --- fuzzforge-mcp/src/fuzzforge_mcp/tools/hub.py | 92 ++++++++++++++++++-- 1 file changed, 87 insertions(+), 5 deletions(-) diff --git a/fuzzforge-mcp/src/fuzzforge_mcp/tools/hub.py b/fuzzforge-mcp/src/fuzzforge_mcp/tools/hub.py index 33d724a..dbdf491 100644 --- a/fuzzforge-mcp/src/fuzzforge_mcp/tools/hub.py +++ b/fuzzforge-mcp/src/fuzzforge_mcp/tools/hub.py @@ -20,10 +20,41 @@ from fuzzforge_mcp.dependencies import get_project_path, get_settings, get_stora mcp: FastMCP = FastMCP() +# Name of the convention tool that hub servers can implement to provide +# rich usage context for AI agents (known issues, workflow tips, rules, etc.). +_AGENT_CONTEXT_TOOL = "get_agent_context" + # Global hub executor instance (lazy initialization) _hub_executor: HubExecutor | None = None +async def _fetch_agent_context( + executor: HubExecutor, + server_name: str, + tools: list[Any], +) -> str | None: + """Call get_agent_context if the server provides it. + + Returns the context string, or None if the server doesn't implement + the convention or the call fails. + """ + if not any(t.name == _AGENT_CONTEXT_TOOL for t in tools): + return None + try: + result = await executor.execute_tool( + identifier=f"hub:{server_name}:{_AGENT_CONTEXT_TOOL}", + arguments={}, + ) + if result.success and result.result: + content = result.result.get("content", []) + if content and isinstance(content, list): + text: str = content[0].get("text", "") + return text + except Exception: # noqa: BLE001, S110 - best-effort context fetch + pass + return None + + def _get_hub_executor() -> HubExecutor: """Get or create the hub executor instance. @@ -50,12 +81,15 @@ def _get_hub_executor() -> HubExecutor: @mcp.tool -async def list_hub_servers() -> dict[str, Any]: +async def list_hub_servers(category: str | None = None) -> dict[str, Any]: """List all registered MCP hub servers. Returns information about configured hub servers, including their connection type, status, and discovered tool count. + :param category: Optional category to filter by (e.g. "binary-analysis", + "web-security", "reconnaissance"). Only servers in this category + are returned. :return: Dictionary with list of hub servers. """ @@ -63,6 +97,9 @@ async def list_hub_servers() -> dict[str, Any]: executor = _get_hub_executor() servers = executor.list_servers() + if category: + servers = [s for s in servers if s.get("category") == category] + return { "servers": servers, "count": len(servers), @@ -93,7 +130,14 @@ async def discover_hub_tools(server_name: str | None = None) -> dict[str, Any]: if server_name: tools = await executor.discover_server_tools(server_name) - return { + + # Convention: auto-fetch agent context if server provides it. + agent_context = await _fetch_agent_context(executor, server_name, tools) + + # Hide the convention tool from the agent's tool list. + visible_tools = [t for t in tools if t.name != "get_agent_context"] + + result: dict[str, Any] = { "server": server_name, "tools": [ { @@ -102,15 +146,24 @@ async def discover_hub_tools(server_name: str | None = None) -> dict[str, Any]: "description": t.description, "parameters": [p.model_dump() for p in t.parameters], } - for t in tools + for t in visible_tools ], - "count": len(tools), + "count": len(visible_tools), } + if agent_context: + result["agent_context"] = agent_context + return result else: results = await executor.discover_all_tools() all_tools = [] + contexts: dict[str, str] = {} for server, tools in results.items(): + ctx = await _fetch_agent_context(executor, server, tools) + if ctx: + contexts[server] = ctx for tool in tools: + if tool.name == "get_agent_context": + continue all_tools.append({ "identifier": tool.identifier, "name": tool.name, @@ -119,11 +172,14 @@ async def discover_hub_tools(server_name: str | None = None) -> dict[str, Any]: "parameters": [p.model_dump() for p in tool.parameters], }) - return { + result = { "servers_discovered": len(results), "tools": all_tools, "count": len(all_tools), } + if contexts: + result["agent_contexts"] = contexts + return result except Exception as e: if isinstance(e, ToolError): @@ -183,6 +239,11 @@ async def execute_hub_tool( Always use /app/uploads/ or /app/samples/ when passing file paths to hub tools — do NOT use the host path. + Tool outputs are persisted to a writable shared volume: + - /app/output/ (writable — extraction results, reports, etc.) + Files written here survive container destruction and are available + to subsequent tool calls. The host path is .fuzzforge/output/. + """ try: executor = _get_hub_executor() @@ -191,6 +252,7 @@ async def execute_hub_tool( # Mounts the assets directory at the standard paths used by hub tools: # /app/uploads — binwalk, and other tools that use UPLOAD_DIR # /app/samples — yara, capa, and other tools that use SAMPLES_DIR + # /app/output — writable volume for tool outputs (persists across calls) extra_volumes: list[str] = [] try: storage = get_storage() @@ -202,6 +264,9 @@ async def execute_hub_tool( f"{assets_str}:/app/uploads:ro", f"{assets_str}:/app/samples:ro", ] + output_path = storage.get_project_output_path(project_path) + if output_path: + extra_volumes.append(f"{output_path!s}:/app/output:rw") except Exception: # noqa: BLE001 - never block tool execution due to asset injection failure extra_volumes = [] @@ -212,6 +277,20 @@ async def execute_hub_tool( extra_volumes=extra_volumes or None, ) + # Record execution history for list_executions / get_execution_results. + try: + storage = get_storage() + project_path = get_project_path() + storage.record_execution( + project_path=project_path, + server_name=result.server_name, + tool_name=result.tool_name, + arguments=arguments or {}, + result=result.to_dict(), + ) + except Exception: # noqa: BLE001, S110 - never fail the tool call due to recording issues + pass + return result.to_dict() except Exception as e: @@ -372,6 +451,9 @@ async def start_hub_server(server_name: str) -> dict[str, Any]: f"{assets_str}:/app/uploads:ro", f"{assets_str}:/app/samples:ro", ] + output_path = storage.get_project_output_path(project_path) + if output_path: + extra_volumes.append(f"{output_path!s}:/app/output:rw") except Exception: # noqa: BLE001 - never block server start due to asset injection failure extra_volumes = []