Close the stdio MCP lifecycle parity gap

The runtime already had stdio MCP bootstrap, initialize, tool discovery, and tool call plumbing, but the user-facing MCP tools still returned stub payloads. This wires ListMcpResources, ReadMcpResource, McpAuth, and MCP through real config-loaded MCP manager lifecycles, adds resource listing/reading support to the manager, and updates parity notes to reflect the new stdio-only coverage.\n\nConstraint: Keep scope on the MCP parity lane and avoid behavior changes in already-landed task/team/cron/bash/file tool work beyond lint-safe compile integration\nRejected: Implement remote MCP transports and browser OAuth flow now | broader than the requested parity slice\nConfidence: high\nScope-risk: moderate\nDirective: Reuse the manager-backed lifecycle for future remote transport support instead of adding new tool-local stubs\nTested: cargo fmt; cargo clippy --workspace --all-targets -- -D warnings; cargo test --workspace\nNot-tested: Live OAuth/browser auth flow; non-stdio MCP transports
This commit is contained in:
Yeachan-Heo
2026-04-03 08:46:30 +00:00
parent 49653fe02e
commit f7d090bfde
7 changed files with 769 additions and 25 deletions
+5
View File
@@ -28,6 +28,7 @@ fn is_binary_file(path: &Path) -> io::Result<bool> {
/// Validate that a resolved path stays within the given workspace root.
/// Returns the canonical path on success, or an error if the path escapes
/// the workspace boundary (e.g. via `../` traversal or symlink).
#[allow(dead_code)]
fn validate_workspace_boundary(resolved: &Path, workspace_root: &Path) -> io::Result<()> {
if !resolved.starts_with(workspace_root) {
return Err(io::Error::new(
@@ -544,6 +545,7 @@ fn normalize_path_allow_missing(path: &str) -> io::Result<PathBuf> {
}
/// Read a file with workspace boundary enforcement.
#[allow(dead_code)]
pub fn read_file_in_workspace(
path: &str,
offset: Option<usize>,
@@ -559,6 +561,7 @@ pub fn read_file_in_workspace(
}
/// Write a file with workspace boundary enforcement.
#[allow(dead_code)]
pub fn write_file_in_workspace(
path: &str,
content: &str,
@@ -573,6 +576,7 @@ pub fn write_file_in_workspace(
}
/// Edit a file with workspace boundary enforcement.
#[allow(dead_code)]
pub fn edit_file_in_workspace(
path: &str,
old_string: &str,
@@ -589,6 +593,7 @@ pub fn edit_file_in_workspace(
}
/// Check whether a path is a symlink that resolves outside the workspace.
#[allow(dead_code)]
pub fn is_symlink_escape(path: &Path, workspace_root: &Path) -> io::Result<bool> {
let metadata = fs::symlink_metadata(path)?;
if !metadata.is_symlink() {
+5 -5
View File
@@ -57,11 +57,11 @@ pub use mcp_client::{
};
pub use mcp_stdio::{
spawn_mcp_stdio_process, JsonRpcError, JsonRpcId, JsonRpcRequest, JsonRpcResponse,
ManagedMcpTool, McpInitializeClientInfo, McpInitializeParams, McpInitializeResult,
McpInitializeServerInfo, McpListResourcesParams, McpListResourcesResult, McpListToolsParams,
McpListToolsResult, McpReadResourceParams, McpReadResourceResult, McpResource,
McpResourceContents, McpServerManager, McpServerManagerError, McpStdioProcess, McpTool,
McpToolCallContent, McpToolCallParams, McpToolCallResult, UnsupportedMcpServer,
ManagedMcpResource, ManagedMcpTool, McpInitializeClientInfo, McpInitializeParams,
McpInitializeResult, McpInitializeServerInfo, McpListResourcesParams, McpListResourcesResult,
McpListToolsParams, McpListToolsResult, McpReadResourceParams, McpReadResourceResult,
McpResource, McpResourceContents, McpServerManager, McpServerManagerError, McpStdioProcess,
McpTool, McpToolCallContent, McpToolCallParams, McpToolCallResult, UnsupportedMcpServer,
};
pub use oauth::{
clear_oauth_credentials, code_challenge_s256, credentials_path, generate_pkce_pair,
+242
View File
@@ -25,6 +25,11 @@ const MCP_LIST_TOOLS_TIMEOUT_MS: u64 = 300;
#[cfg(not(test))]
const MCP_LIST_TOOLS_TIMEOUT_MS: u64 = 30_000;
#[cfg(test)]
const MCP_LIST_RESOURCES_TIMEOUT_MS: u64 = 300;
#[cfg(not(test))]
const MCP_LIST_RESOURCES_TIMEOUT_MS: u64 = 30_000;
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(untagged)]
pub enum JsonRpcId {
@@ -223,6 +228,12 @@ pub struct ManagedMcpTool {
pub tool: McpTool,
}
#[derive(Debug, Clone, PartialEq)]
pub struct ManagedMcpResource {
pub server_name: String,
pub resource: McpResource,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UnsupportedMcpServer {
pub server_name: String,
@@ -420,6 +431,65 @@ impl McpServerManager {
Ok(discovered_tools)
}
pub async fn list_resources(
&mut self,
server_name: Option<&str>,
) -> Result<Vec<ManagedMcpResource>, McpServerManagerError> {
let server_names = match server_name {
Some(server_name) => vec![server_name.to_string()],
None => self.servers.keys().cloned().collect::<Vec<_>>(),
};
let mut resources = Vec::new();
for server_name in server_names {
resources.extend(self.list_resources_for_server(&server_name).await?);
}
Ok(resources)
}
pub async fn read_resource(
&mut self,
server_name: &str,
uri: &str,
) -> Result<JsonRpcResponse<McpReadResourceResult>, McpServerManagerError> {
let timeout_ms = self.tool_call_timeout_ms(server_name)?;
self.ensure_server_ready(server_name).await?;
let request_id = self.take_request_id();
let response =
{
let server = self.server_mut(server_name)?;
let process = server.process.as_mut().ok_or_else(|| {
McpServerManagerError::InvalidResponse {
server_name: server_name.to_string(),
method: "resources/read",
details: "server process missing after initialization".to_string(),
}
})?;
Self::run_process_request(
server_name,
"resources/read",
timeout_ms,
process.read_resource(
request_id,
McpReadResourceParams {
uri: uri.to_string(),
},
),
)
.await
};
if let Err(error) = &response {
if Self::should_reset_server(error) {
self.reset_server(server_name).await?;
}
}
response
}
pub async fn call_tool(
&mut self,
qualified_tool_name: &str,
@@ -623,6 +693,94 @@ impl McpServerManager {
Ok(discovered_tools)
}
async fn list_resources_for_server(
&mut self,
server_name: &str,
) -> Result<Vec<ManagedMcpResource>, McpServerManagerError> {
let mut attempts = 0;
loop {
match self.list_resources_for_server_once(server_name).await {
Ok(resources) => return Ok(resources),
Err(error) if attempts == 0 && Self::is_retryable_error(&error) => {
self.reset_server(server_name).await?;
attempts += 1;
}
Err(error) => {
if Self::should_reset_server(&error) {
self.reset_server(server_name).await?;
}
return Err(error);
}
}
}
}
async fn list_resources_for_server_once(
&mut self,
server_name: &str,
) -> Result<Vec<ManagedMcpResource>, McpServerManagerError> {
self.ensure_server_ready(server_name).await?;
let mut discovered_resources = Vec::new();
let mut cursor = None;
loop {
let request_id = self.take_request_id();
let response = {
let server = self.server_mut(server_name)?;
let process = server.process.as_mut().ok_or_else(|| {
McpServerManagerError::InvalidResponse {
server_name: server_name.to_string(),
method: "resources/list",
details: "server process missing after initialization".to_string(),
}
})?;
Self::run_process_request(
server_name,
"resources/list",
MCP_LIST_RESOURCES_TIMEOUT_MS,
process.list_resources(
request_id,
Some(McpListResourcesParams {
cursor: cursor.clone(),
}),
),
)
.await?
};
if let Some(error) = response.error {
return Err(McpServerManagerError::JsonRpc {
server_name: server_name.to_string(),
method: "resources/list",
error,
});
}
let result = response
.result
.ok_or_else(|| McpServerManagerError::InvalidResponse {
server_name: server_name.to_string(),
method: "resources/list",
details: "missing result payload".to_string(),
})?;
for resource in result.resources {
discovered_resources.push(ManagedMcpResource {
server_name: server_name.to_string(),
resource,
});
}
match result.next_cursor {
Some(next_cursor) => cursor = Some(next_cursor),
None => break,
}
}
Ok(discovered_resources)
}
async fn reset_server(&mut self, server_name: &str) -> Result<(), McpServerManagerError> {
let mut process = {
let server = self.server_mut(server_name)?;
@@ -1386,6 +1544,36 @@ mod tests {
" 'isError': False",
" }",
" })",
" elif method == 'resources/list':",
" send_message({",
" 'jsonrpc': '2.0',",
" 'id': request['id'],",
" 'result': {",
" 'resources': [",
" {",
" 'uri': f'resource://{LABEL}/guide.txt',",
" 'name': f'{LABEL}-guide',",
" 'description': f'Guide for {LABEL}',",
" 'mimeType': 'text/plain'",
" }",
" ]",
" }",
" })",
" elif method == 'resources/read':",
" uri = request['params']['uri']",
" send_message({",
" 'jsonrpc': '2.0',",
" 'id': request['id'],",
" 'result': {",
" 'contents': [",
" {",
" 'uri': uri,",
" 'mimeType': 'text/plain',",
" 'text': f'{LABEL} contents for {uri}'",
" }",
" ]",
" }",
" })",
" else:",
" send_message({",
" 'jsonrpc': '2.0',",
@@ -1936,6 +2124,60 @@ mod tests {
});
}
#[test]
fn manager_lists_and_reads_resources_from_stdio_servers() {
let runtime = Builder::new_current_thread()
.enable_all()
.build()
.expect("runtime");
runtime.block_on(async {
let script_path = write_manager_mcp_server_script();
let root = script_path.parent().expect("script parent");
let log_path = root.join("resources.log");
let servers = BTreeMap::from([(
"alpha".to_string(),
manager_server_config(&script_path, "alpha", &log_path),
)]);
let mut manager = McpServerManager::from_servers(&servers);
let resources = manager
.list_resources(Some("alpha"))
.await
.expect("list resources");
assert_eq!(resources.len(), 1);
assert_eq!(resources[0].server_name, "alpha");
assert_eq!(resources[0].resource.uri, "resource://alpha/guide.txt");
assert_eq!(resources[0].resource.name.as_deref(), Some("alpha-guide"));
let read = manager
.read_resource("alpha", "resource://alpha/guide.txt")
.await
.expect("read resource");
assert_eq!(
read.result.as_ref().map(|result| result.contents.len()),
Some(1)
);
assert_eq!(
read.result
.as_ref()
.and_then(|result| result.contents.first())
.and_then(|content| content.text.as_deref()),
Some("alpha contents for resource://alpha/guide.txt")
);
let log = fs::read_to_string(&log_path).expect("read log");
assert_eq!(
log.lines().collect::<Vec<_>>(),
vec!["initialize", "resources/list", "resources/read"]
);
manager.shutdown().await.expect("shutdown");
cleanup_script(&script_path);
});
}
#[test]
fn manager_times_out_slow_tool_calls() {
let runtime = Builder::new_current_thread()
+4 -1
View File
@@ -103,18 +103,20 @@ impl TaskRegistry {
}
/// Look up a task by ID.
#[must_use]
pub fn get(&self, task_id: &str) -> Option<Task> {
let inner = self.inner.lock().expect("registry lock poisoned");
inner.tasks.get(task_id).cloned()
}
/// List all tasks, optionally filtered by status.
#[must_use]
pub fn list(&self, status_filter: Option<TaskStatus>) -> Vec<Task> {
let inner = self.inner.lock().expect("registry lock poisoned");
inner
.tasks
.values()
.filter(|t| status_filter.map_or(true, |s| t.status == s))
.filter(|t| status_filter.is_none_or(|s| t.status == s))
.cloned()
.collect()
}
@@ -206,6 +208,7 @@ impl TaskRegistry {
}
/// Remove a task from the registry.
#[must_use]
pub fn remove(&self, task_id: &str) -> Option<Task> {
let mut inner = self.inner.lock().expect("registry lock poisoned");
inner.tasks.remove(task_id)
@@ -70,6 +70,7 @@ impl TeamRegistry {
}
/// Create a new team with the given name and task IDs.
#[must_use]
pub fn create(&self, name: &str, task_ids: Vec<String>) -> Team {
let mut inner = self.inner.lock().expect("team registry lock poisoned");
inner.counter += 1;
@@ -88,12 +89,14 @@ impl TeamRegistry {
}
/// Get a team by ID.
#[must_use]
pub fn get(&self, team_id: &str) -> Option<Team> {
let inner = self.inner.lock().expect("team registry lock poisoned");
inner.teams.get(team_id).cloned()
}
/// List all teams.
#[must_use]
pub fn list(&self) -> Vec<Team> {
let inner = self.inner.lock().expect("team registry lock poisoned");
inner.teams.values().cloned().collect()
@@ -112,6 +115,7 @@ impl TeamRegistry {
}
/// Remove a team entirely from the registry.
#[must_use]
pub fn remove(&self, team_id: &str) -> Option<Team> {
let mut inner = self.inner.lock().expect("team registry lock poisoned");
inner.teams.remove(team_id)
@@ -166,6 +170,7 @@ impl CronRegistry {
}
/// Create a new cron entry.
#[must_use]
pub fn create(&self, schedule: &str, prompt: &str, description: Option<&str>) -> CronEntry {
let mut inner = self.inner.lock().expect("cron registry lock poisoned");
inner.counter += 1;
@@ -187,12 +192,14 @@ impl CronRegistry {
}
/// Get a cron entry by ID.
#[must_use]
pub fn get(&self, cron_id: &str) -> Option<CronEntry> {
let inner = self.inner.lock().expect("cron registry lock poisoned");
inner.entries.get(cron_id).cloned()
}
/// List all cron entries, optionally filtered to enabled only.
#[must_use]
pub fn list(&self, enabled_only: bool) -> Vec<CronEntry> {
let inner = self.inner.lock().expect("cron registry lock poisoned");
inner
@@ -284,7 +291,7 @@ mod tests {
assert_eq!(still_there.status, TeamStatus::Deleted);
// Hard remove
registry.remove(&t2.team_id);
let _ = registry.remove(&t2.team_id);
assert_eq!(registry.len(), 1);
}