refactor: allow use without external sleep

This commit is contained in:
zhom
2026-03-14 11:29:13 +04:00
parent 6a88887a6c
commit 90563ea6f5
+231 -51
View File
@@ -834,7 +834,7 @@ impl McpServer {
// Browser interaction tools
McpTool {
name: "navigate".to_string(),
description: "Navigate a running browser profile to a URL".to_string(),
description: "Navigate a running browser profile to a URL. Waits for the page to fully load before returning.".to_string(),
input_schema: serde_json::json!({
"type": "object",
"properties": {
@@ -881,7 +881,7 @@ impl McpServer {
McpTool {
name: "evaluate_javascript".to_string(),
description:
"Execute JavaScript in the context of the current page and return the result. Works with both static and dynamically-generated content."
"Execute JavaScript in the context of the current page and return the result. Works with both static and dynamically-generated content. Set wait_for_load=true if the script triggers navigation (e.g., form.submit())."
.to_string(),
input_schema: serde_json::json!({
"type": "object",
@@ -897,6 +897,10 @@ impl McpServer {
"await_promise": {
"type": "boolean",
"description": "Whether to await the result if it's a Promise (default: false)"
},
"wait_for_load": {
"type": "boolean",
"description": "Wait for page load after execution, use when the script triggers navigation like form.submit() (default: false)"
}
},
"required": ["profile_id", "expression"]
@@ -904,7 +908,7 @@ impl McpServer {
},
McpTool {
name: "click_element".to_string(),
description: "Click on an element identified by a CSS selector".to_string(),
description: "Click on an element identified by a CSS selector. If the click triggers a page navigation, waits for the new page to load before returning.".to_string(),
input_schema: serde_json::json!({
"type": "object",
"properties": {
@@ -2638,19 +2642,28 @@ impl McpServer {
let profile_path = profile.get_profile_data_path(&profiles_dir);
let profile_path_str = profile_path.to_string_lossy();
let port = if profile.browser == "wayfern" {
crate::wayfern_manager::WayfernManager::instance()
.get_cdp_port(&profile_path_str)
.await
} else if profile.browser == "camoufox" {
crate::camoufox_manager::CamoufoxManager::instance()
.get_cdp_port(&profile_path_str)
.await
} else {
None
};
// Retry a few times — port info may not be stored yet right after launch
for attempt in 0..10 {
if attempt > 0 {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
let port = if profile.browser == "wayfern" {
crate::wayfern_manager::WayfernManager::instance()
.get_cdp_port(&profile_path_str)
.await
} else if profile.browser == "camoufox" {
crate::camoufox_manager::CamoufoxManager::instance()
.get_cdp_port(&profile_path_str)
.await
} else {
None
};
if let Some(p) = port {
return Ok(p);
}
}
port.ok_or_else(|| McpError {
Err(McpError {
code: -32000,
message: format!(
"No CDP connection available for profile '{}'. Make sure the browser is running.",
@@ -2662,31 +2675,47 @@ impl McpServer {
async fn get_cdp_ws_url(&self, port: u16) -> Result<String, McpError> {
let url = format!("http://127.0.0.1:{port}/json");
let client = reqwest::Client::new();
let resp = client
.get(&url)
.timeout(std::time::Duration::from_secs(5))
.send()
.await
.map_err(|e| McpError {
code: -32000,
message: format!("Failed to connect to browser CDP endpoint: {e}"),
})?;
let targets: Vec<serde_json::Value> = resp.json().await.map_err(|e| McpError {
// Retry connecting to CDP endpoint — Wayfern closes the debugging port
// briefly after launch for anti-detection and reopens it after ~30s.
let max_attempts = 45;
let mut last_err = String::new();
for attempt in 0..max_attempts {
if attempt > 0 {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
match client
.get(&url)
.timeout(std::time::Duration::from_secs(3))
.send()
.await
{
Ok(resp) => match resp.json::<Vec<serde_json::Value>>().await {
Ok(targets) => {
if let Some(ws_url) = targets
.iter()
.find(|t| t.get("type").and_then(|v| v.as_str()) == Some("page"))
.and_then(|t| t.get("webSocketDebuggerUrl"))
.and_then(|v| v.as_str())
{
return Ok(ws_url.to_string());
}
last_err = "No page target found in browser".to_string();
}
Err(e) => {
last_err = format!("Failed to parse CDP targets: {e}");
}
},
Err(e) => {
last_err = format!("Failed to connect to browser CDP endpoint: {e}");
}
}
}
Err(McpError {
code: -32000,
message: format!("Failed to parse CDP targets: {e}"),
})?;
targets
.iter()
.find(|t| t.get("type").and_then(|v| v.as_str()) == Some("page"))
.and_then(|t| t.get("webSocketDebuggerUrl"))
.and_then(|v| v.as_str())
.map(|s| s.to_string())
.ok_or_else(|| McpError {
code: -32000,
message: "No page target found in browser".to_string(),
})
message: last_err,
})
}
async fn send_cdp(
@@ -2753,6 +2782,140 @@ impl McpServer {
})
}
/// Send a CDP command and wait for the page to finish loading.
///
/// Uses a single WebSocket connection to: enable Page events, send the command,
/// wait for the command response, then wait for `Page.loadEventFired`.
///
/// # Arguments
/// * `ws_url` - WebSocket debugger URL of the target to connect to.
/// * `method` - CDP method to invoke (e.g. `Page.navigate`).
/// * `params` - JSON parameters passed with `method`.
/// * `timeout_secs` - Time budget in seconds. It is applied once to the
///   `Page.enable` handshake and once more, starting after the command is
///   sent, to the wait for the command response plus the load event.
///
/// # Returns
/// The `result` member of the command response (`{}` when absent). If the
/// load event does not arrive within the budget but the command response
/// did, the response is still returned.
///
/// # Errors
/// Returns an `McpError` (code `-32000`) when the WebSocket cannot be opened,
/// a send fails, the socket errors or closes during the handshake, the
/// handshake times out, the command response contains an `error` member, or
/// no command response was received before the wait ended.
async fn send_cdp_and_wait_for_load(
  &self,
  ws_url: &str,
  method: &str,
  params: serde_json::Value,
  timeout_secs: u64,
) -> Result<serde_json::Value, McpError> {
  use futures_util::sink::SinkExt;
  use futures_util::stream::StreamExt;
  use tokio_tungstenite::connect_async;
  use tokio_tungstenite::tungstenite::Message;

  let budget = tokio::time::Duration::from_secs(timeout_secs);

  let (mut ws_stream, _) = connect_async(ws_url).await.map_err(|e| McpError {
    code: -32000,
    message: format!("Failed to connect to CDP WebSocket: {e}"),
  })?;

  // Enable Page domain events so we receive loadEventFired
  let enable_cmd = serde_json::json!({
    "id": 1,
    "method": "Page.enable",
    "params": {}
  });
  ws_stream
    .send(Message::Text(enable_cmd.to_string().into()))
    .await
    .map_err(|e| McpError {
      code: -32000,
      message: format!("Failed to send Page.enable: {e}"),
    })?;

  // Wait for Page.enable response. The wait is bounded so that a browser
  // which never answers cannot block the caller indefinitely.
  let enable_deadline = tokio::time::Instant::now() + budget;
  loop {
    let msg = tokio::time::timeout_at(enable_deadline, ws_stream.next())
      .await
      .map_err(|_| McpError {
        code: -32000,
        message: "Timed out waiting for Page.enable response".to_string(),
      })?
      .ok_or_else(|| McpError {
        code: -32000,
        message: "WebSocket closed waiting for Page.enable response".to_string(),
      })?
      .map_err(|e| McpError {
        code: -32000,
        message: format!("CDP WebSocket error: {e}"),
      })?;

    if let Message::Text(text) = msg {
      // Unparsable frames become `Null` and simply fail the id check.
      let resp: serde_json::Value = serde_json::from_str(text.as_str()).unwrap_or_default();
      if resp.get("id") == Some(&serde_json::json!(1)) {
        break;
      }
    }
  }

  // Send the actual command (e.g., Page.navigate)
  let command = serde_json::json!({
    "id": 2,
    "method": method,
    "params": params
  });
  ws_stream
    .send(Message::Text(command.to_string().into()))
    .await
    .map_err(|e| McpError {
      code: -32000,
      message: format!("Failed to send CDP command: {e}"),
    })?;

  // Wait for command response and then for Page.loadEventFired
  let mut command_result = None;
  let mut load_fired = false;
  let deadline = tokio::time::Instant::now() + budget;

  loop {
    let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
    if remaining.is_zero() {
      // Timed out waiting for load — return the command result if we have it
      break;
    }

    let msg = match tokio::time::timeout(remaining, ws_stream.next()).await {
      Ok(Some(Ok(msg))) => msg,
      Ok(Some(Err(e))) => {
        return Err(McpError {
          code: -32000,
          message: format!("CDP WebSocket error: {e}"),
        });
      }
      Ok(None) => break, // stream ended
      Err(_) => break,   // timeout
    };

    if let Message::Text(text) = msg {
      let response: serde_json::Value = serde_json::from_str(text.as_str()).unwrap_or_default();

      // Check for command response
      if response.get("id") == Some(&serde_json::json!(2)) {
        if let Some(error) = response.get("error") {
          return Err(McpError {
            code: -32000,
            message: format!("CDP error: {error}"),
          });
        }
        command_result = Some(
          response
            .get("result")
            .cloned()
            .unwrap_or(serde_json::json!({})),
        );
      }

      // Check for Page.loadEventFired — page is fully loaded
      if response.get("method").and_then(|m| m.as_str()) == Some("Page.loadEventFired") {
        load_fired = true;
      }

      // A load event can be delivered before the command response (for
      // example when a load was already in progress). Keep reading until the
      // response has been captured too, so a successful command is not
      // reported as "No response received from CDP".
      if load_fired && command_result.is_some() {
        break;
      }
    }
  }

  // Disable Page domain events (best effort; the connection is dropped next)
  let disable_cmd = serde_json::json!({
    "id": 3,
    "method": "Page.disable",
    "params": {}
  });
  let _ = ws_stream
    .send(Message::Text(disable_cmd.to_string().into()))
    .await;

  command_result.ok_or_else(|| McpError {
    code: -32000,
    message: "No response received from CDP".to_string(),
  })
}
fn get_running_profile(&self, profile_id: &str) -> Result<BrowserProfile, McpError> {
let profiles = ProfileManager::instance()
.list_profiles()
@@ -2812,7 +2975,12 @@ impl McpServer {
let ws_url = self.get_cdp_ws_url(cdp_port).await?;
self
.send_cdp(&ws_url, "Page.navigate", serde_json::json!({ "url": url }))
.send_cdp_and_wait_for_load(
&ws_url,
"Page.navigate",
serde_json::json!({ "url": url }),
30,
)
.await?;
Ok(serde_json::json!({
@@ -2911,22 +3079,30 @@ impl McpServer {
.get("await_promise")
.and_then(|v| v.as_bool())
.unwrap_or(false);
let wait_for_load = arguments
.get("wait_for_load")
.and_then(|v| v.as_bool())
.unwrap_or(false);
let profile = self.get_running_profile(profile_id)?;
let cdp_port = self.get_cdp_port_for_profile(&profile).await?;
let ws_url = self.get_cdp_ws_url(cdp_port).await?;
let result = self
.send_cdp(
&ws_url,
"Runtime.evaluate",
serde_json::json!({
"expression": expression,
"returnByValue": true,
"awaitPromise": await_promise,
}),
)
.await?;
let cdp_params = serde_json::json!({
"expression": expression,
"returnByValue": true,
"awaitPromise": await_promise,
});
let result = if wait_for_load {
self
.send_cdp_and_wait_for_load(&ws_url, "Runtime.evaluate", cdp_params, 30)
.await?
} else {
self
.send_cdp(&ws_url, "Runtime.evaluate", cdp_params)
.await?
};
let value = if let Some(exception) = result.get("exceptionDetails") {
let text = exception
@@ -2989,14 +3165,18 @@ impl McpServer {
selector_escaped, selector_escaped
);
// Use send_cdp_and_wait_for_load: if the click triggers navigation,
// we wait for the new page to load. If it does not, no load event arrives,
// so the call returns only once the 10s timeout has expired.
let result = self
.send_cdp(
.send_cdp_and_wait_for_load(
&ws_url,
"Runtime.evaluate",
serde_json::json!({
"expression": js,
"returnByValue": true,
}),
10,
)
.await?;