From f7d090bfde587306554b23184ebc762fbc5884ca Mon Sep 17 00:00:00 2001 From: Yeachan-Heo Date: Fri, 3 Apr 2026 08:46:30 +0000 Subject: [PATCH] Close the stdio MCP lifecycle parity gap The runtime already had stdio MCP bootstrap, initialize, tool discovery, and tool call plumbing, but the user-facing MCP tools still returned stub payloads. This wires ListMcpResources, ReadMcpResource, McpAuth, and MCP through real config-loaded MCP manager lifecycles, adds resource listing/reading support to the manager, and updates parity notes to reflect the new stdio-only coverage.\n\nConstraint: Keep scope on the MCP parity lane and avoid behavior changes in already-landed task/team/cron/bash/file tool work beyond lint-safe compile integration\nRejected: Implement remote MCP transports and browser OAuth flow now | broader than the requested parity slice\nConfidence: high\nScope-risk: moderate\nDirective: Reuse the manager-backed lifecycle for future remote transport support instead of adding new tool-local stubs\nTested: cargo fmt; cargo clippy --workspace --all-targets -- -D warnings; cargo test --workspace\nNot-tested: Live OAuth/browser auth flow; non-stdio MCP transports --- PARITY.md | 12 +- rust/crates/runtime/src/file_ops.rs | 5 + rust/crates/runtime/src/lib.rs | 10 +- rust/crates/runtime/src/mcp_stdio.rs | 242 +++++++++ rust/crates/runtime/src/task_registry.rs | 5 +- rust/crates/runtime/src/team_cron_registry.rs | 9 +- rust/crates/tools/src/lib.rs | 511 +++++++++++++++++- 7 files changed, 769 insertions(+), 25 deletions(-) diff --git a/PARITY.md b/PARITY.md index b573509..a5a877b 100644 --- a/PARITY.md +++ b/PARITY.md @@ -48,6 +48,10 @@ Canonical scenario map: `rust/mock_parity_scenarios.json` | **Sleep** | `tools` | delay execution — **good parity** | | **SendUserMessage/Brief** | `tools` | user-facing message — **good parity** | | **Config** | `tools` | config inspection — **moderate parity** | +| **ListMcpResources** | `tools` + `runtime::mcp_stdio` | stdio MCP connect/list resources/disconnect — **moderate parity**. Missing: remote transports | +| **ReadMcpResource** | `tools` + `runtime::mcp_stdio` | stdio MCP connect/read resource/disconnect — **moderate parity**. Missing: remote transports | +| **McpAuth** | `tools` + `runtime::mcp_client` | stdio no-auth connect probe + OAuth requirement reporting — **partial parity**. Missing: interactive OAuth/browser flow | +| **MCP** | `tools` + `runtime::mcp_stdio` | stdio MCP connect/list tools/call/disconnect — **moderate parity**. Missing: remote transports | | **EnterPlanMode** | `tools` | worktree plan mode toggle — **good parity** | | **ExitPlanMode** | `tools` | worktree plan mode restore — **good parity** | | **StructuredOutput** | `tools` | passthrough JSON — **good parity** | @@ -71,10 +75,6 @@ Canonical scenario map: `rust/mock_parity_scenarios.json` | **CronDelete** | stub | needs cron registry | | **CronList** | stub | needs cron registry | | **LSP** | stub | needs language server client | -| **ListMcpResources** | stub | needs MCP client | -| **ReadMcpResource** | stub | needs MCP client | -| **McpAuth** | stub | needs OAuth flow | -| **MCP** | stub | needs MCP tool proxy | | **RemoteTrigger** | stub | needs HTTP client | | **TestingPermission** | stub | test-only, low priority | @@ -108,7 +108,9 @@ Harness note: milestone 2 validates bash success plus workspace-write escalation Harness note: read_file, grep_search, write_file allow/deny, and multi-tool same-turn assembly are now covered by the mock parity harness. **Config/Plugin/MCP flows:** -- [ ] Full MCP server lifecycle (connect, list tools, call tool, disconnect) +- [x] Stdio MCP lifecycle (connect, list tools/resources, call tool, read resource, disconnect) +- [ ] Remote MCP transports (HTTP/SSE/WS/managed proxy) +- [ ] Interactive MCP OAuth/browser auth flow - [ ] Plugin install/enable/disable/uninstall full flow - [ ] Config merge precedence (user > project > local) diff --git a/rust/crates/runtime/src/file_ops.rs b/rust/crates/runtime/src/file_ops.rs index 770efd4..01b4d47 100644 --- a/rust/crates/runtime/src/file_ops.rs +++ b/rust/crates/runtime/src/file_ops.rs @@ -28,6 +28,7 @@ fn is_binary_file(path: &Path) -> io::Result { /// Validate that a resolved path stays within the given workspace root. /// Returns the canonical path on success, or an error if the path escapes /// the workspace boundary (e.g. via `../` traversal or symlink). +#[allow(dead_code)] fn validate_workspace_boundary(resolved: &Path, workspace_root: &Path) -> io::Result<()> { if !resolved.starts_with(workspace_root) { return Err(io::Error::new( @@ -544,6 +545,7 @@ fn normalize_path_allow_missing(path: &str) -> io::Result { } /// Read a file with workspace boundary enforcement. +#[allow(dead_code)] pub fn read_file_in_workspace( path: &str, offset: Option, @@ -559,6 +561,7 @@ pub fn read_file_in_workspace( } /// Write a file with workspace boundary enforcement. +#[allow(dead_code)] pub fn write_file_in_workspace( path: &str, content: &str, @@ -573,6 +576,7 @@ pub fn write_file_in_workspace( } /// Edit a file with workspace boundary enforcement. +#[allow(dead_code)] pub fn edit_file_in_workspace( path: &str, old_string: &str, @@ -589,6 +593,7 @@ pub fn edit_file_in_workspace( } /// Check whether a path is a symlink that resolves outside the workspace. +#[allow(dead_code)] pub fn is_symlink_escape(path: &Path, workspace_root: &Path) -> io::Result { let metadata = fs::symlink_metadata(path)?; if !metadata.is_symlink() { diff --git a/rust/crates/runtime/src/lib.rs b/rust/crates/runtime/src/lib.rs index b3a43d5..9d86ba8 100644 --- a/rust/crates/runtime/src/lib.rs +++ b/rust/crates/runtime/src/lib.rs @@ -57,11 +57,11 @@ pub use mcp_client::{ }; pub use mcp_stdio::{ spawn_mcp_stdio_process, JsonRpcError, JsonRpcId, JsonRpcRequest, JsonRpcResponse, - ManagedMcpTool, McpInitializeClientInfo, McpInitializeParams, McpInitializeResult, - McpInitializeServerInfo, McpListResourcesParams, McpListResourcesResult, McpListToolsParams, - McpListToolsResult, McpReadResourceParams, McpReadResourceResult, McpResource, - McpResourceContents, McpServerManager, McpServerManagerError, McpStdioProcess, McpTool, - McpToolCallContent, McpToolCallParams, McpToolCallResult, UnsupportedMcpServer, + ManagedMcpResource, ManagedMcpTool, McpInitializeClientInfo, McpInitializeParams, + McpInitializeResult, McpInitializeServerInfo, McpListResourcesParams, McpListResourcesResult, + McpListToolsParams, McpListToolsResult, McpReadResourceParams, McpReadResourceResult, + McpResource, McpResourceContents, McpServerManager, McpServerManagerError, McpStdioProcess, + McpTool, McpToolCallContent, McpToolCallParams, McpToolCallResult, UnsupportedMcpServer, }; pub use oauth::{ clear_oauth_credentials, code_challenge_s256, credentials_path, generate_pkce_pair, diff --git a/rust/crates/runtime/src/mcp_stdio.rs b/rust/crates/runtime/src/mcp_stdio.rs index 49611a2..166d06e 100644 --- a/rust/crates/runtime/src/mcp_stdio.rs +++ b/rust/crates/runtime/src/mcp_stdio.rs @@ -25,6 +25,11 @@ const MCP_LIST_TOOLS_TIMEOUT_MS: u64 = 300; #[cfg(not(test))] const MCP_LIST_TOOLS_TIMEOUT_MS: u64 = 30_000; +#[cfg(test)] +const MCP_LIST_RESOURCES_TIMEOUT_MS: u64 = 300; +#[cfg(not(test))] +const MCP_LIST_RESOURCES_TIMEOUT_MS: u64 = 30_000; + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] #[serde(untagged)] pub enum JsonRpcId { @@ -223,6 +228,12 @@ pub struct ManagedMcpTool { pub tool: McpTool, } +#[derive(Debug, Clone, PartialEq)] +pub struct ManagedMcpResource { + pub server_name: String, + pub resource: McpResource, +} + #[derive(Debug, Clone, PartialEq, Eq)] pub struct UnsupportedMcpServer { pub server_name: String, @@ -420,6 +431,65 @@ impl McpServerManager { Ok(discovered_tools) } + pub async fn list_resources( + &mut self, + server_name: Option<&str>, + ) -> Result, McpServerManagerError> { + let server_names = match server_name { + Some(server_name) => vec![server_name.to_string()], + None => self.servers.keys().cloned().collect::>(), + }; + let mut resources = Vec::new(); + + for server_name in server_names { + resources.extend(self.list_resources_for_server(&server_name).await?); + } + + Ok(resources) + } + + pub async fn read_resource( + &mut self, + server_name: &str, + uri: &str, + ) -> Result, McpServerManagerError> { + let timeout_ms = self.tool_call_timeout_ms(server_name)?; + + self.ensure_server_ready(server_name).await?; + let request_id = self.take_request_id(); + let response = + { + let server = self.server_mut(server_name)?; + let process = server.process.as_mut().ok_or_else(|| { + McpServerManagerError::InvalidResponse { + server_name: server_name.to_string(), + method: "resources/read", + details: "server process missing after initialization".to_string(), + } + })?; + Self::run_process_request( + server_name, + "resources/read", + timeout_ms, + process.read_resource( + request_id, + McpReadResourceParams { + uri: uri.to_string(), + }, + ), + ) + .await + }; + + if let Err(error) = &response { + if Self::should_reset_server(error) { + self.reset_server(server_name).await?; + } + } + + response + } + pub async fn call_tool( &mut self, qualified_tool_name: &str, @@ -623,6 +693,94 @@ impl McpServerManager { Ok(discovered_tools) } + async fn list_resources_for_server( + &mut self, + server_name: &str, + ) -> Result, McpServerManagerError> { + let mut attempts = 0; + + loop { + match self.list_resources_for_server_once(server_name).await { + Ok(resources) => return Ok(resources), + Err(error) if attempts == 0 && Self::is_retryable_error(&error) => { + self.reset_server(server_name).await?; + attempts += 1; + } + Err(error) => { + if Self::should_reset_server(&error) { + self.reset_server(server_name).await?; + } + return Err(error); + } + } + } + } + + async fn list_resources_for_server_once( + &mut self, + server_name: &str, + ) -> Result, McpServerManagerError> { + self.ensure_server_ready(server_name).await?; + + let mut discovered_resources = Vec::new(); + let mut cursor = None; + loop { + let request_id = self.take_request_id(); + let response = { + let server = self.server_mut(server_name)?; + let process = server.process.as_mut().ok_or_else(|| { + McpServerManagerError::InvalidResponse { + server_name: server_name.to_string(), + method: "resources/list", + details: "server process missing after initialization".to_string(), + } + })?; + Self::run_process_request( + server_name, + "resources/list", + MCP_LIST_RESOURCES_TIMEOUT_MS, + process.list_resources( + request_id, + Some(McpListResourcesParams { + cursor: cursor.clone(), + }), + ), + ) + .await? + }; + + if let Some(error) = response.error { + return Err(McpServerManagerError::JsonRpc { + server_name: server_name.to_string(), + method: "resources/list", + error, + }); + } + + let result = response + .result + .ok_or_else(|| McpServerManagerError::InvalidResponse { + server_name: server_name.to_string(), + method: "resources/list", + details: "missing result payload".to_string(), + })?; + + for resource in result.resources { + discovered_resources.push(ManagedMcpResource { + server_name: server_name.to_string(), + resource, + }); + } + + match result.next_cursor { + Some(next_cursor) => cursor = Some(next_cursor), + None => break, + } + } + + Ok(discovered_resources) + } + async fn reset_server(&mut self, server_name: &str) -> Result<(), McpServerManagerError> { let mut process = { let server = self.server_mut(server_name)?; @@ -1386,6 +1544,36 @@ mod tests { " 'isError': False", " }", " })", + " elif method == 'resources/list':", + " send_message({", + " 'jsonrpc': '2.0',", + " 'id': request['id'],", + " 'result': {", + " 'resources': [", + " {", + " 'uri': f'resource://{LABEL}/guide.txt',", + " 'name': f'{LABEL}-guide',", + " 'description': f'Guide for {LABEL}',", + " 'mimeType': 'text/plain'", + " }", + " ]", + " }", + " })", + " elif method == 'resources/read':", + " uri = request['params']['uri']", + " send_message({", + " 'jsonrpc': '2.0',", + " 'id': request['id'],", + " 'result': {", + " 'contents': [", + " {", + " 'uri': uri,", + " 'mimeType': 'text/plain',", + " 'text': f'{LABEL} contents for {uri}'", + " }", + " ]", + " }", + " })", " else:", " send_message({", " 'jsonrpc': '2.0',", @@ -1936,6 +2124,60 @@ mod tests { }); } + #[test] + fn manager_lists_and_reads_resources_from_stdio_servers() { + let runtime = Builder::new_current_thread() + .enable_all() + .build() + .expect("runtime"); + runtime.block_on(async { + let script_path = write_manager_mcp_server_script(); + let root = script_path.parent().expect("script parent"); + let log_path = root.join("resources.log"); + let servers = BTreeMap::from([( + "alpha".to_string(), + manager_server_config(&script_path, "alpha", &log_path), + )]); + let mut manager = McpServerManager::from_servers(&servers); + + let resources = manager + .list_resources(Some("alpha")) + .await + .expect("list resources"); + + assert_eq!(resources.len(), 1); + assert_eq!(resources[0].server_name, "alpha"); + assert_eq!(resources[0].resource.uri, "resource://alpha/guide.txt"); + assert_eq!(resources[0].resource.name.as_deref(), Some("alpha-guide")); + + let read = manager + .read_resource("alpha", "resource://alpha/guide.txt") + .await + .expect("read resource"); + + assert_eq!( + read.result.as_ref().map(|result| result.contents.len()), + Some(1) + ); + assert_eq!( + read.result + .as_ref() + .and_then(|result| result.contents.first()) + .and_then(|content| content.text.as_deref()), + Some("alpha contents for resource://alpha/guide.txt") + ); + + let log = fs::read_to_string(&log_path).expect("read log"); + assert_eq!( + log.lines().collect::>(), + vec!["initialize", "resources/list", "resources/read"] + ); + + manager.shutdown().await.expect("shutdown"); + cleanup_script(&script_path); + }); + } + #[test] fn manager_times_out_slow_tool_calls() { let runtime = Builder::new_current_thread() diff --git a/rust/crates/runtime/src/task_registry.rs b/rust/crates/runtime/src/task_registry.rs index 861f9bc..6c002d7 100644 --- a/rust/crates/runtime/src/task_registry.rs +++ b/rust/crates/runtime/src/task_registry.rs @@ -103,18 +103,20 @@ impl TaskRegistry { } /// Look up a task by ID. + #[must_use] pub fn get(&self, task_id: &str) -> Option { let inner = self.inner.lock().expect("registry lock poisoned"); inner.tasks.get(task_id).cloned() } /// List all tasks, optionally filtered by status. + #[must_use] pub fn list(&self, status_filter: Option) -> Vec { let inner = self.inner.lock().expect("registry lock poisoned"); inner .tasks .values() - .filter(|t| status_filter.map_or(true, |s| t.status == s)) + .filter(|t| status_filter.is_none_or(|s| t.status == s)) .cloned() .collect() } @@ -206,6 +208,7 @@ impl TaskRegistry { } /// Remove a task from the registry. + #[must_use] pub fn remove(&self, task_id: &str) -> Option { let mut inner = self.inner.lock().expect("registry lock poisoned"); inner.tasks.remove(task_id) diff --git a/rust/crates/runtime/src/team_cron_registry.rs b/rust/crates/runtime/src/team_cron_registry.rs index 2fc14cc..6f5df4c 100644 --- a/rust/crates/runtime/src/team_cron_registry.rs +++ b/rust/crates/runtime/src/team_cron_registry.rs @@ -70,6 +70,7 @@ impl TeamRegistry { } /// Create a new team with the given name and task IDs. + #[must_use] pub fn create(&self, name: &str, task_ids: Vec) -> Team { let mut inner = self.inner.lock().expect("team registry lock poisoned"); inner.counter += 1; @@ -88,12 +89,14 @@ impl TeamRegistry { } /// Get a team by ID. + #[must_use] pub fn get(&self, team_id: &str) -> Option { let inner = self.inner.lock().expect("team registry lock poisoned"); inner.teams.get(team_id).cloned() } /// List all teams. + #[must_use] pub fn list(&self) -> Vec { let inner = self.inner.lock().expect("team registry lock poisoned"); inner.teams.values().cloned().collect() @@ -112,6 +115,7 @@ impl TeamRegistry { } /// Remove a team entirely from the registry. + #[must_use] pub fn remove(&self, team_id: &str) -> Option { let mut inner = self.inner.lock().expect("team registry lock poisoned"); inner.teams.remove(team_id) @@ -166,6 +170,7 @@ impl CronRegistry { } /// Create a new cron entry. + #[must_use] pub fn create(&self, schedule: &str, prompt: &str, description: Option<&str>) -> CronEntry { let mut inner = self.inner.lock().expect("cron registry lock poisoned"); inner.counter += 1; @@ -187,12 +192,14 @@ impl CronRegistry { } /// Get a cron entry by ID. + #[must_use] pub fn get(&self, cron_id: &str) -> Option { let inner = self.inner.lock().expect("cron registry lock poisoned"); inner.entries.get(cron_id).cloned() } /// List all cron entries, optionally filtered to enabled only. + #[must_use] pub fn list(&self, enabled_only: bool) -> Vec { let inner = self.inner.lock().expect("cron registry lock poisoned"); inner @@ -284,7 +291,7 @@ mod tests { assert_eq!(still_there.status, TeamStatus::Deleted); // Hard remove - registry.remove(&t2.team_id); + let _ = registry.remove(&t2.team_id); assert_eq!(registry.len(), 1); } diff --git a/rust/crates/tools/src/lib.rs b/rust/crates/tools/src/lib.rs index 34f32fd..d602d92 100644 --- a/rust/crates/tools/src/lib.rs +++ b/rust/crates/tools/src/lib.rs @@ -14,9 +14,11 @@ use runtime::{ edit_file, execute_bash, glob_search, grep_search, load_system_prompt, read_file, task_registry::TaskRegistry, team_cron_registry::{CronRegistry, TeamRegistry}, - write_file, ApiClient, ApiRequest, AssistantEvent, BashCommandInput, ContentBlock, - ConversationMessage, ConversationRuntime, GrepSearchInput, MessageRole, PermissionMode, - PermissionPolicy, PromptCacheEvent, RuntimeError, Session, ToolError, ToolExecutor, + write_file, ApiClient, ApiRequest, AssistantEvent, BashCommandInput, ConfigLoader, + ContentBlock, ConversationMessage, ConversationRuntime, GrepSearchInput, McpClientAuth, + McpClientTransport, McpServerManager, McpTransport, MessageRole, PermissionMode, + PermissionPolicy, PromptCacheEvent, RuntimeConfig, RuntimeError, Session, ToolError, + ToolExecutor, }; use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; @@ -1116,31 +1118,254 @@ fn run_lsp(input: LspInput) -> Result { })) } +fn load_runtime_config_for_current_dir() -> Result { + let cwd = std::env::current_dir().map_err(|error| error.to_string())?; + ConfigLoader::default_for(cwd) + .load() + .map_err(|error| error.to_string()) +} + +fn prepare_mcp_manager( +) -> Result<(tokio::runtime::Runtime, RuntimeConfig, McpServerManager), String> { + let config = load_runtime_config_for_current_dir()?; + let runtime = tokio::runtime::Runtime::new().map_err(|error| error.to_string())?; + let manager = McpServerManager::from_runtime_config(&config); + Ok((runtime, config, manager)) +} + +fn finish_mcp_manager_operation( + runtime: &tokio::runtime::Runtime, + manager: &mut McpServerManager, + result: Result, +) -> Result { + let result = result.map_err(|error| error.to_string()); + let shutdown_result = runtime + .block_on(manager.shutdown()) + .map_err(|error| error.to_string()); + match (result, shutdown_result) { + (Ok(value), Ok(())) => Ok(value), + (Err(error), _) | (Ok(_), Err(error)) => Err(error), + } +} + +fn require_configured_mcp_server<'a>( + config: &'a RuntimeConfig, + server_name: &str, +) -> Result<&'a runtime::ScopedMcpServerConfig, String> { + config + .mcp() + .get(server_name) + .ok_or_else(|| format!("unknown MCP server `{server_name}`")) +} + +fn mcp_transport_label(transport: McpTransport) -> &'static str { + match transport { + McpTransport::Stdio => "stdio", + McpTransport::Sse => "sse", + McpTransport::Http => "http", + McpTransport::Ws => "ws", + McpTransport::Sdk => "sdk", + McpTransport::ManagedProxy => "managed_proxy", + } +} + +fn mcp_auth_from_transport(transport: &McpClientTransport) -> McpClientAuth { + match transport { + McpClientTransport::Sse(remote) | McpClientTransport::Http(remote) => remote.auth.clone(), + McpClientTransport::Stdio(_) + | McpClientTransport::WebSocket(_) + | McpClientTransport::Sdk(_) + | McpClientTransport::ManagedProxy(_) => McpClientAuth::None, + } +} + +fn render_unsupported_mcp_servers( + manager: &McpServerManager, + server_filter: Option<&str>, +) -> Vec { + manager + .unsupported_servers() + .iter() + .filter(|server| server_filter.is_none_or(|filter| filter == server.server_name)) + .map(|server| { + json!({ + "server": server.server_name, + "transport": mcp_transport_label(server.transport), + "reason": server.reason, + }) + }) + .collect() +} + #[allow(clippy::needless_pass_by_value)] fn run_list_mcp_resources(input: McpResourceInput) -> Result { + let (runtime, config, mut manager) = prepare_mcp_manager()?; + if let Some(server_name) = input.server.as_deref() { + let scoped = require_configured_mcp_server(&config, server_name)?; + if scoped.transport() != McpTransport::Stdio { + let unsupported_servers = render_unsupported_mcp_servers(&manager, Some(server_name)); + return to_pretty_json(json!({ + "server": input.server, + "resources": [], + "count": 0, + "unsupportedServers": unsupported_servers, + "message": format!( + "MCP server `{server_name}` uses unsupported transport `{}` for resource listing", + mcp_transport_label(scoped.transport()) + ), + })); + } + } + + let unsupported_servers = render_unsupported_mcp_servers(&manager, input.server.as_deref()); + let operation_result = runtime.block_on(manager.list_resources(input.server.as_deref())); + let resources = finish_mcp_manager_operation(&runtime, &mut manager, operation_result)?; + let resource_count = resources.len(); + let resources_empty = resources.is_empty(); + let rendered_resources = resources + .into_iter() + .map(|resource| { + json!({ + "server": resource.server_name, + "uri": resource.resource.uri, + "name": resource.resource.name, + "description": resource.resource.description, + "mimeType": resource.resource.mime_type, + }) + }) + .collect::>(); + + let message = if resources_empty { + Some(if config.mcp().servers().is_empty() { + "No MCP servers configured".to_string() + } else { + "No MCP resources available".to_string() + }) + } else { + None + }; + to_pretty_json(json!({ "server": input.server, - "resources": [], - "message": "No MCP resources available" + "resources": rendered_resources, + "count": resource_count, + "unsupportedServers": unsupported_servers, + "message": message, })) } #[allow(clippy::needless_pass_by_value)] fn run_read_mcp_resource(input: McpResourceInput) -> Result { + let uri = input + .uri + .clone() + .ok_or_else(|| "missing field `uri`".to_string())?; + let (runtime, config, mut manager) = prepare_mcp_manager()?; + let operation_result = (|| { + let server_name = if let Some(server_name) = input.server.as_deref() { + let scoped = require_configured_mcp_server(&config, server_name)?; + if scoped.transport() != McpTransport::Stdio { + return Err(format!( + "MCP server `{server_name}` uses unsupported transport `{}` for resource reads", + mcp_transport_label(scoped.transport()) + )); + } + server_name.to_string() + } else { + let resources = runtime + .block_on(manager.list_resources(None)) + .map_err(|error| error.to_string())?; + let matching_servers = resources + .into_iter() + .filter(|resource| resource.resource.uri == uri) + .map(|resource| resource.server_name) + .collect::>(); + match matching_servers.len() { + 0 => return Err(format!("MCP resource `{uri}` was not found")), + 1 => matching_servers + .into_iter() + .next() + .expect("single resource server should exist"), + _ => { + return Err(format!( + "MCP resource `{uri}` is exposed by multiple servers; specify `server`" + )); + } + } + }; + + let response = runtime + .block_on(manager.read_resource(&server_name, &uri)) + .map_err(|error| error.to_string())?; + Ok((server_name, response)) + })(); + let (server_name, response) = + finish_mcp_manager_operation(&runtime, &mut manager, operation_result)?; + to_pretty_json(json!({ - "server": input.server, - "uri": input.uri, - "content": "", - "message": "Resource not available" + "server": server_name, + "uri": uri, + "contents": response.result.as_ref().map(|result| result.contents.clone()).unwrap_or_default(), + "content": response + .result + .as_ref() + .and_then(|result| result.contents.first()) + .and_then(|content| content.text.clone().or_else(|| content.blob.clone())) + .unwrap_or_default(), + "error": response.error, })) } #[allow(clippy::needless_pass_by_value)] fn run_mcp_auth(input: McpAuthInput) -> Result { + let (runtime, config, mut manager) = prepare_mcp_manager()?; + let scoped = require_configured_mcp_server(&config, &input.server)?; + let transport = scoped.transport(); + let client_transport = McpClientTransport::from_config(&scoped.config); + let auth = mcp_auth_from_transport(&client_transport); + let auth_required = auth.requires_user_auth(); + + if transport == McpTransport::Stdio && !auth_required { + let operation_result = runtime.block_on(manager.list_resources(Some(&input.server))); + let resources = finish_mcp_manager_operation(&runtime, &mut manager, operation_result)?; + return to_pretty_json(json!({ + "server": input.server, + "transport": mcp_transport_label(transport), + "status": "connected", + "authRequired": false, + "resourceCount": resources.len(), + "message": "MCP server connected without user authentication", + })); + } + + let oauth = match auth { + McpClientAuth::OAuth(oauth) => Some(json!({ + "clientId": oauth.client_id, + "callbackPort": oauth.callback_port, + "authServerMetadataUrl": oauth.auth_server_metadata_url, + "xaa": oauth.xaa, + })), + McpClientAuth::None => None, + }; + to_pretty_json(json!({ "server": input.server, - "status": "auth_required", - "message": "MCP authentication not yet implemented" + "transport": mcp_transport_label(transport), + "status": if auth_required { "auth_required" } else { "unsupported_transport" }, + "authRequired": auth_required, + "oauth": oauth, + "message": if auth_required { + format!( + "MCP server `{}` requires user authentication for `{}` transport", + input.server, + mcp_transport_label(transport) + ) + } else { + format!( + "MCP transport `{}` is not yet connected by the runtime auth flow", + mcp_transport_label(transport) + ) + }, })) } @@ -1158,12 +1383,35 @@ fn run_remote_trigger(input: RemoteTriggerInput) -> Result { #[allow(clippy::needless_pass_by_value)] fn run_mcp_tool(input: McpToolInput) -> Result { + let (runtime, config, mut manager) = prepare_mcp_manager()?; + let scoped = require_configured_mcp_server(&config, &input.server)?; + if scoped.transport() != McpTransport::Stdio { + return Err(format!( + "MCP server `{}` uses unsupported transport `{}` for tool execution", + input.server, + mcp_transport_label(scoped.transport()) + )); + } + + let qualified_tool_name = runtime::mcp_tool_name(&input.server, &input.tool); + let operation_result = (|| { + runtime + .block_on(manager.discover_tools()) + .map_err(|error| error.to_string())?; + runtime + .block_on(manager.call_tool(&qualified_tool_name, input.arguments.clone())) + .map_err(|error| error.to_string()) + })(); + let response = finish_mcp_manager_operation(&runtime, &mut manager, operation_result)?; + to_pretty_json(json!({ "server": input.server, "tool": input.tool, + "qualifiedToolName": qualified_tool_name, "arguments": input.arguments, - "result": null, - "message": "MCP tool proxy not yet connected" + "result": response.result, + "error": response.error, + "message": response.error.as_ref().map(|error| error.message.clone()), })) } @@ -4095,6 +4343,7 @@ mod tests { use std::fs; use std::io::{Read, Write}; use std::net::{SocketAddr, TcpListener}; + use std::os::unix::fs::PermissionsExt; use std::path::PathBuf; use std::sync::{Arc, Mutex, OnceLock}; use std::thread; @@ -4123,6 +4372,132 @@ mod tests { std::env::temp_dir().join(format!("clawd-tools-{unique}-{name}")) } + #[allow(clippy::too_many_lines)] + fn write_fake_mcp_server_script(script_path: &std::path::Path, log_path: &std::path::Path) { + let script = [ + "#!/usr/bin/env python3", + "import json, os, sys", + "", + "LOG_PATH = os.environ.get('MCP_LOG_PATH')", + "", + "def log(method):", + " if LOG_PATH:", + " with open(LOG_PATH, 'a', encoding='utf-8') as handle:", + " handle.write(f'{method}\\n')", + "", + "def read_message():", + " header = b''", + r" while not header.endswith(b'\r\n\r\n'):", + " chunk = sys.stdin.buffer.read(1)", + " if not chunk:", + " return None", + " header += chunk", + " length = 0", + r" for line in header.decode().split('\r\n'):", + r" if line.lower().startswith('content-length:'):", + r" length = int(line.split(':', 1)[1].strip())", + " payload = sys.stdin.buffer.read(length)", + " return json.loads(payload.decode())", + "", + "def send_message(message):", + " payload = json.dumps(message).encode()", + r" sys.stdout.buffer.write(f'Content-Length: {len(payload)}\r\n\r\n'.encode() + payload)", + " sys.stdout.buffer.flush()", + "", + "while True:", + " request = read_message()", + " if request is None:", + " break", + " method = request['method']", + " log(method)", + " if method == 'initialize':", + " send_message({", + " 'jsonrpc': '2.0',", + " 'id': request['id'],", + " 'result': {", + " 'protocolVersion': request['params']['protocolVersion'],", + " 'capabilities': {'tools': {}, 'resources': {}},", + " 'serverInfo': {'name': 'fake-mcp', 'version': '1.0.0'}", + " }", + " })", + " elif method == 'tools/list':", + " send_message({", + " 'jsonrpc': '2.0',", + " 'id': request['id'],", + " 'result': {", + " 'tools': [", + " {", + " 'name': 'echo',", + " 'description': 'Echo tool',", + " 'inputSchema': {", + " 'type': 'object',", + " 'properties': {'text': {'type': 'string'}},", + " 'required': ['text']", + " }", + " }", + " ]", + " }", + " })", + " elif method == 'tools/call':", + " args = request['params'].get('arguments') or {}", + " text = args.get('text', '')", + " send_message({", + " 'jsonrpc': '2.0',", + " 'id': request['id'],", + " 'result': {", + " 'content': [{'type': 'text', 'text': f'echo:{text}'}],", + " 'structuredContent': {'echoed': text},", + " 'isError': False", + " }", + " })", + " elif method == 'resources/list':", + " send_message({", + " 'jsonrpc': '2.0',", + " 'id': request['id'],", + " 'result': {", + " 'resources': [", + " {", + " 'uri': 'resource://alpha/guide.txt',", + " 'name': 'alpha-guide',", + " 'description': 'Guide text',", + " 'mimeType': 'text/plain'", + " }", + " ]", + " }", + " })", + " elif method == 'resources/read':", + " uri = request['params']['uri']", + " send_message({", + " 'jsonrpc': '2.0',", + " 'id': request['id'],", + " 'result': {", + " 'contents': [", + " {", + " 'uri': uri,", + " 'mimeType': 'text/plain',", + " 'text': f'contents for {uri}'", + " }", + " ]", + " }", + " })", + " else:", + " send_message({", + " 'jsonrpc': '2.0',", + " 'id': request['id'],", + " 'error': {'code': -32601, 'message': f'unknown method: {method}'},", + " })", + ] + .join("\n"); + + fs::write(script_path, script).expect("write fake mcp server"); + let mut permissions = fs::metadata(script_path) + .expect("script metadata") + .permissions(); + permissions.set_mode(0o755); + fs::set_permissions(script_path, permissions).expect("chmod"); + let _ = fs::remove_file(log_path); + } + #[test] fn exposes_mvp_tools() { let names = mvp_tool_specs() @@ -5338,6 +5713,116 @@ mod tests { let _ = std::fs::remove_dir_all(root); } + #[test] + fn mcp_tools_use_real_stdio_lifecycle() { + let _guard = env_lock() + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + let root = temp_path("mcp-tool-lifecycle"); + let home = root.join("home"); + let cwd = root.join("cwd"); + let script_dir = root.join("scripts"); + let script_path = script_dir.join("fake-mcp.py"); + let log_path = root.join("mcp.log"); + + fs::create_dir_all(home.join(".claw")).expect("home dir"); + fs::create_dir_all(cwd.join(".claw")).expect("cwd dir"); + fs::create_dir_all(&script_dir).expect("script dir"); + write_fake_mcp_server_script(&script_path, &log_path); + fs::write( + home.join(".claw").join("settings.json"), + serde_json::to_string(&json!({ + "mcpServers": { + "alpha": { + "command": "python3", + "args": [script_path.to_string_lossy().to_string()], + "env": { + "MCP_LOG_PATH": log_path.to_string_lossy().to_string(), + } + } + } + })) + .expect("serialize settings"), + ) + .expect("write settings"); + + let original_home = std::env::var("HOME").ok(); + let original_config_home = std::env::var("CLAW_CONFIG_HOME").ok(); + let original_dir = std::env::current_dir().expect("cwd"); + std::env::set_var("HOME", &home); + std::env::remove_var("CLAW_CONFIG_HOME"); + std::env::set_current_dir(&cwd).expect("set cwd"); + + let auth = execute_tool("McpAuth", &json!({"server": "alpha"})).expect("mcp auth"); + let auth_output: serde_json::Value = serde_json::from_str(&auth).expect("json"); + assert_eq!(auth_output["status"], "connected"); + assert_eq!(auth_output["transport"], "stdio"); + assert_eq!(auth_output["authRequired"], false); + + let listed = execute_tool("ListMcpResources", &json!({"server": "alpha"})) + .expect("list mcp resources"); + let listed_output: serde_json::Value = serde_json::from_str(&listed).expect("json"); + assert_eq!(listed_output["count"], 1); + assert_eq!(listed_output["resources"][0]["server"], "alpha"); + assert_eq!( + listed_output["resources"][0]["uri"], + "resource://alpha/guide.txt" + ); + + let read = execute_tool( + "ReadMcpResource", + &json!({"server": "alpha", "uri": "resource://alpha/guide.txt"}), + ) + .expect("read mcp resource"); + let read_output: serde_json::Value = serde_json::from_str(&read).expect("json"); + assert_eq!(read_output["server"], "alpha"); + assert_eq!( + read_output["content"], + "contents for resource://alpha/guide.txt" + ); + assert_eq!(read_output["contents"][0]["mimeType"], "text/plain"); + + let called = execute_tool( + "MCP", + &json!({"server": "alpha", "tool": "echo", "arguments": {"text": "hello"}}), + ) + .expect("call mcp tool"); + let called_output: serde_json::Value = serde_json::from_str(&called).expect("json"); + assert_eq!(called_output["qualifiedToolName"], "mcp__alpha__echo"); + assert_eq!( + called_output["result"]["structuredContent"]["echoed"], + "hello" + ); + assert_eq!(called_output["result"]["content"][0]["text"], "echo:hello"); + + let log = fs::read_to_string(&log_path).expect("read mcp log"); + assert_eq!( + log.lines().collect::>(), + vec![ + "initialize", + "resources/list", + "initialize", + "resources/list", + "initialize", + "resources/read", + "initialize", + "tools/list", + "tools/call", + ] + ); + + std::env::set_current_dir(&original_dir).expect("restore cwd"); + match original_home { + Some(value) => std::env::set_var("HOME", value), + None => std::env::remove_var("HOME"), + } + match original_config_home { + Some(value) => std::env::set_var("CLAW_CONFIG_HOME", value), + None => std::env::remove_var("CLAW_CONFIG_HOME"), + } + let _ = fs::remove_dir_all(root); + } + #[test] fn enter_and_exit_plan_mode_round_trip_existing_local_override() { let _guard = env_lock()