refactor: check proxy validity via donut-proxy

This commit is contained in:
zhom
2026-03-16 15:48:00 +04:00
parent ccecd2a1e3
commit 76dd0d84e8
10 changed files with 386 additions and 122 deletions
+37 -6
View File
@@ -928,19 +928,33 @@ impl ProxyManager {
url
}
// Check if a proxy is valid by making HTTP requests through it
// Check if a proxy is valid by routing through a temporary local donut-proxy.
// This tests the exact same code path the browser uses, ensuring that if the
// check passes, the browser connection will work too.
pub async fn check_proxy_validity(
&self,
proxy_id: &str,
proxy_settings: &ProxySettings,
) -> Result<ProxyCheckResult, String> {
let proxy_url = Self::build_proxy_url(proxy_settings);
let upstream_url = Self::build_proxy_url(proxy_settings);
// Fetch public IP through the proxy using shared IP utilities
let ip = match ip_utils::fetch_public_ip(Some(&proxy_url)).await {
// Start a temporary local proxy that tunnels through the upstream
let proxy_config = crate::proxy_runner::start_proxy_process(Some(upstream_url), None)
.await
.map_err(|e| format!("Failed to start test proxy: {e}"))?;
let local_url = format!("http://127.0.0.1:{}", proxy_config.local_port.unwrap_or(0));
let proxy_id_clone = proxy_config.id.clone();
// Fetch public IP through the local proxy (same path the browser uses)
let ip_result = ip_utils::fetch_public_ip(Some(&local_url)).await;
// Stop the temporary proxy regardless of result
let _ = crate::proxy_runner::stop_proxy_process(&proxy_id_clone).await;
let ip = match ip_result {
Ok(ip) => ip,
Err(e) => {
// Save failed check result
let failed_result = ProxyCheckResult {
ip: String::new(),
city: None,
@@ -1054,9 +1068,10 @@ impl ProxyManager {
let proxy_type = obj
.get("type")
.or_else(|| obj.get("proxy_type"))
.or_else(|| obj.get("protocol"))
.and_then(|v| v.as_str())
.unwrap_or("http")
.to_string();
.to_lowercase();
let username = obj
.get("username")
@@ -3437,6 +3452,22 @@ mod tests {
let body2 = r#"{"ip": "1.2.3.4", "port": 1080, "proxy_type": "socks4"}"#;
let result2 = ProxyManager::parse_dynamic_proxy_json(body2).unwrap();
assert_eq!(result2.proxy_type, "socks4");
// "protocol" field alias
let body3 = r#"{"ip": "1.2.3.4", "port": 1080, "protocol": "socks5"}"#;
let result3 = ProxyManager::parse_dynamic_proxy_json(body3).unwrap();
assert_eq!(result3.proxy_type, "socks5");
}
#[test]
fn test_parse_dynamic_proxy_json_normalizes_case() {
    // Mixed-case protocol values must be normalized to lowercase by the
    // parser, regardless of which field alias ("type" or "protocol")
    // supplies them.
    let cases = [
        (r#"{"ip": "1.2.3.4", "port": 1080, "type": "SOCKS5"}"#, "socks5"),
        (r#"{"ip": "1.2.3.4", "port": 8080, "protocol": "HTTP"}"#, "http"),
    ];
    for (body, expected) in cases {
        let parsed = ProxyManager::parse_dynamic_proxy_json(body).unwrap();
        assert_eq!(parsed.proxy_type, expected);
    }
}
#[test]
+114 -102
View File
@@ -1062,131 +1062,143 @@ pub async fn run_proxy_server(config: ProxyConfig) -> Result<(), Box<dyn std::er
let matcher = bypass_matcher.clone();
tokio::task::spawn(async move {
// Read first bytes to detect CONNECT requests
// CONNECT requests need special handling for tunneling
// Use a larger buffer to ensure we can detect CONNECT even with partial reads
// Peek at first bytes to detect CONNECT requests.
// Use peek() first to wait for data without consuming it, avoiding race
// conditions where read() returns 0 on a fresh connection.
let mut peek_buffer = [0u8; 16];
match stream.read(&mut peek_buffer).await {
Ok(0) => {
log::error!("DEBUG: Connection closed immediately (0 bytes read)");
let peek_n = match tokio::time::timeout(
tokio::time::Duration::from_secs(30),
stream.peek(&mut peek_buffer),
)
.await
{
Ok(Ok(n)) if n > 0 => n,
_ => {
log::error!("DEBUG: Connection closed or timed out before receiving data");
return;
}
Ok(n) => {
// Check if this looks like a CONNECT request
// Be more lenient - check if the first bytes match "CONNECT" (case-insensitive)
let request_start_upper =
String::from_utf8_lossy(&peek_buffer[..n.min(7)]).to_uppercase();
let is_connect = request_start_upper.starts_with("CONNECT");
};
log::error!(
"DEBUG: Read {} bytes, starts with: {:?}, is_connect: {}",
n,
String::from_utf8_lossy(&peek_buffer[..n.min(20)]),
is_connect
);
// Now consume the peeked bytes
let n = match stream.read(&mut peek_buffer[..peek_n]).await {
Ok(n) if n > 0 => n,
_ => return,
};
if is_connect {
// Handle CONNECT request manually for tunneling
let mut full_request = Vec::with_capacity(4096);
full_request.extend_from_slice(&peek_buffer[..n]);
{
// Check if this looks like a CONNECT request
// Be more lenient - check if the first bytes match "CONNECT" (case-insensitive)
let request_start_upper =
String::from_utf8_lossy(&peek_buffer[..n.min(7)]).to_uppercase();
let is_connect = request_start_upper.starts_with("CONNECT");
// Read the rest of the CONNECT request until we have the full headers
// CONNECT requests end with \r\n\r\n (or \n\n)
let mut remaining = [0u8; 4096];
let mut total_read = n;
let max_reads = 100; // Prevent infinite loop
let mut reads = 0;
log::error!(
"DEBUG: Read {} bytes, starts with: {:?}, is_connect: {}",
n,
String::from_utf8_lossy(&peek_buffer[..n.min(20)]),
is_connect
);
loop {
if reads >= max_reads {
log::error!("DEBUG: Max reads reached, breaking");
break;
}
if is_connect {
// Handle CONNECT request manually for tunneling
let mut full_request = Vec::with_capacity(4096);
full_request.extend_from_slice(&peek_buffer[..n]);
match stream.read(&mut remaining).await {
Ok(0) => {
// Connection closed, but we might have a complete request
if full_request.ends_with(b"\r\n\r\n") || full_request.ends_with(b"\n\n") {
break;
}
// If we have some data, try to process it anyway
if total_read > 0 {
break;
}
return; // No data at all
// Read the rest of the CONNECT request until we have the full headers
// CONNECT requests end with \r\n\r\n (or \n\n)
let mut remaining = [0u8; 4096];
let mut total_read = n;
let max_reads = 100; // Prevent infinite loop
let mut reads = 0;
loop {
if reads >= max_reads {
log::error!("DEBUG: Max reads reached, breaking");
break;
}
match stream.read(&mut remaining).await {
Ok(0) => {
// Connection closed, but we might have a complete request
if full_request.ends_with(b"\r\n\r\n") || full_request.ends_with(b"\n\n") {
break;
}
Ok(m) => {
reads += 1;
total_read += m;
full_request.extend_from_slice(&remaining[..m]);
// If we have some data, try to process it anyway
if total_read > 0 {
break;
}
return; // No data at all
}
Ok(m) => {
reads += 1;
total_read += m;
full_request.extend_from_slice(&remaining[..m]);
// Check if we have complete headers
if full_request.ends_with(b"\r\n\r\n") || full_request.ends_with(b"\n\n") {
break;
}
// Check if we have complete headers
if full_request.ends_with(b"\r\n\r\n") || full_request.ends_with(b"\n\n") {
break;
}
// Also check if we have enough to parse (at least "CONNECT host:port HTTP/1.x")
if total_read >= 20 {
// Check if we have a newline that might indicate end of request line
if let Some(pos) = full_request.iter().position(|&b| b == b'\n') {
if pos < full_request.len() - 1 {
// We have at least the request line, check if we have headers
let request_str = String::from_utf8_lossy(&full_request);
if request_str.contains("\r\n\r\n") || request_str.contains("\n\n") {
break;
}
// Also check if we have enough to parse (at least "CONNECT host:port HTTP/1.x")
if total_read >= 20 {
// Check if we have a newline that might indicate end of request line
if let Some(pos) = full_request.iter().position(|&b| b == b'\n') {
if pos < full_request.len() - 1 {
// We have at least the request line, check if we have headers
let request_str = String::from_utf8_lossy(&full_request);
if request_str.contains("\r\n\r\n") || request_str.contains("\n\n") {
break;
}
}
}
}
Err(e) => {
log::error!("DEBUG: Error reading CONNECT request: {:?}", e);
// If we have some data, try to process it
if total_read > 0 {
break;
}
return;
}
Err(e) => {
log::error!("DEBUG: Error reading CONNECT request: {:?}", e);
// If we have some data, try to process it
if total_read > 0 {
break;
}
return;
}
}
// Handle CONNECT manually
log::error!(
"DEBUG: Handling CONNECT manually for: {}",
String::from_utf8_lossy(&full_request[..full_request.len().min(200)])
);
if let Err(e) =
handle_connect_from_buffer(stream, full_request, upstream, matcher).await
{
log::error!("Error handling CONNECT request: {:?}", e);
} else {
log::error!("DEBUG: CONNECT handled successfully");
}
return;
}
// Not CONNECT (or partial read) - reconstruct stream with consumed bytes prepended
// This is critical: we MUST prepend any bytes we consumed, even if < 7 bytes
// Handle CONNECT manually
log::error!(
"DEBUG: Non-CONNECT request, first {} bytes: {:?}",
n,
String::from_utf8_lossy(&peek_buffer[..n.min(50)])
"DEBUG: Handling CONNECT manually for: {}",
String::from_utf8_lossy(&full_request[..full_request.len().min(200)])
);
let prepended_bytes = peek_buffer[..n].to_vec();
let prepended_reader = PrependReader {
prepended: prepended_bytes,
prepended_pos: 0,
inner: stream,
};
let io = TokioIo::new(prepended_reader);
let service =
service_fn(move |req| handle_request(req, upstream.clone(), matcher.clone()));
if let Err(err) = http1::Builder::new().serve_connection(io, service).await {
log::error!("Error serving connection: {:?}", err);
if let Err(e) =
handle_connect_from_buffer(stream, full_request, upstream, matcher).await
{
log::error!("Error handling CONNECT request: {:?}", e);
} else {
log::error!("DEBUG: CONNECT handled successfully");
}
return;
}
Err(e) => {
log::error!("Error reading from connection: {:?}", e);
// Not CONNECT (or partial read) - reconstruct stream with consumed bytes prepended
// This is critical: we MUST prepend any bytes we consumed, even if < 7 bytes
log::error!(
"DEBUG: Non-CONNECT request, first {} bytes: {:?}",
n,
String::from_utf8_lossy(&peek_buffer[..n.min(50)])
);
let prepended_bytes = peek_buffer[..n].to_vec();
let prepended_reader = PrependReader {
prepended: prepended_bytes,
prepended_pos: 0,
inner: stream,
};
let io = TokioIo::new(prepended_reader);
let service =
service_fn(move |req| handle_request(req, upstream.clone(), matcher.clone()));
if let Err(err) = http1::Builder::new().serve_connection(io, service).await {
log::error!("Error serving connection: {:?}", err);
}
}
});
+221
View File
@@ -1121,3 +1121,224 @@ async fn test_no_bypass_rules_all_through_upstream(
Ok(())
}
/// Start a minimal SOCKS5 proxy that tunnels connections to the real destination.
/// Returns (port, JoinHandle).
///
/// Test-only mock: implements just enough of RFC 1928 to serve a single
/// no-auth CONNECT per client connection, then relays bytes in both
/// directions until either side closes. Not a general SOCKS5 server —
/// it assumes each protocol message arrives in one read (fine on
/// loopback), and supports only IPv4 (ATYP 0x01) and domain (ATYP 0x03)
/// addresses; IPv6 and other commands are rejected by dropping the
/// connection or replying with an error code.
async fn start_mock_socks5_server() -> (u16, tokio::task::JoinHandle<()>) {
    // Bind to an ephemeral port so parallel tests never collide.
    let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
    let port = listener.local_addr().unwrap().port();
    let handle = tokio::spawn(async move {
        // Accept loop: one spawned task per client connection.
        while let Ok((mut client, _)) = listener.accept().await {
            tokio::spawn(async move {
                use tokio::io::{AsyncReadExt, AsyncWriteExt};
                // SOCKS5 handshake: client sends version + methods
                // (VER NMETHODS METHODS...). Assumes the whole greeting
                // fits in a single read — acceptable for a loopback mock.
                let mut buf = [0u8; 256];
                let n = client.read(&mut buf).await.unwrap_or(0);
                if n < 2 || buf[0] != 0x05 {
                    // Not SOCKS5 (or connection closed) — drop silently.
                    return;
                }
                // Reply: version 5, no auth required
                client.write_all(&[0x05, 0x00]).await.ok();
                // Read connect request: VER CMD RSV ATYP DST.ADDR DST.PORT
                let n = client.read(&mut buf).await.unwrap_or(0);
                if n < 7 || buf[1] != 0x01 {
                    // Only CMD=0x01 (CONNECT) is supported; reply 0x07
                    // "command not supported" with a zeroed BND address.
                    client
                        .write_all(&[0x05, 0x07, 0x00, 0x01, 0, 0, 0, 0, 0, 0])
                        .await
                        .ok();
                    return;
                }
                // Decode the destination from the address type (ATYP) byte.
                let (target_host, target_port) = match buf[3] {
                    0x01 => {
                        // IPv4: 4 address bytes + 2 port bytes follow.
                        if n < 10 {
                            return;
                        }
                        let ip = format!("{}.{}.{}.{}", buf[4], buf[5], buf[6], buf[7]);
                        let port = u16::from_be_bytes([buf[8], buf[9]]);
                        (ip, port)
                    }
                    0x03 => {
                        // Domain: 1 length byte, then the name, then 2 port bytes.
                        let domain_len = buf[4] as usize;
                        if n < 5 + domain_len + 2 {
                            return;
                        }
                        let domain = String::from_utf8_lossy(&buf[5..5 + domain_len]).to_string();
                        let port = u16::from_be_bytes([buf[5 + domain_len], buf[6 + domain_len]]);
                        (domain, port)
                    }
                    // IPv6 (0x04) and anything else: unsupported in this mock.
                    _ => return,
                };
                // Connect to target
                let target =
                    match tokio::net::TcpStream::connect(format!("{}:{}", target_host, target_port)).await {
                        Ok(t) => t,
                        Err(_) => {
                            // 0x05 = connection refused; zeroed BND address.
                            client
                                .write_all(&[0x05, 0x05, 0x00, 0x01, 0, 0, 0, 0, 0, 0])
                                .await
                                .ok();
                            return;
                        }
                    };
                // Success reply (REP=0x00). BND.ADDR/PORT are nominal
                // (127.0.0.1:0) — clients don't need the real bound address.
                client
                    .write_all(&[0x05, 0x00, 0x00, 0x01, 127, 0, 0, 1, 0, 0])
                    .await
                    .ok();
                // Bidirectional relay: stop as soon as either direction
                // finishes (select! aborts the other copy), which closes
                // the tunnel when one peer disconnects.
                let (mut cr, mut cw) = tokio::io::split(client);
                let (mut tr, mut tw) = tokio::io::split(target);
                tokio::select! {
                    _ = tokio::io::copy(&mut cr, &mut tw) => {}
                    _ = tokio::io::copy(&mut tr, &mut cw) => {}
                }
            });
        }
    });
    // Give the accept loop a moment to start before handing out the port.
    sleep(Duration::from_millis(100)).await;
    (port, handle)
}
/// Test that a SOCKS5 upstream proxy works end-to-end through donut-proxy.
/// Starts a mock SOCKS5 server, a mock HTTP target server,
/// then routes requests through donut-proxy -> SOCKS5 -> target.
///
/// Covers both proxying modes:
/// 1. a plain absolute-form HTTP request, and
/// 2. a CONNECT tunnel (the path HTTPS browsing takes), which is retried
///    because the proxy's raw TCP handler can race with prior-connection
///    cleanup.
#[tokio::test]
#[serial]
async fn test_local_proxy_with_socks5_upstream(
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let binary_path = setup_test().await?;
    // Tracker ensures spawned proxies are cleaned up even on early return.
    let mut tracker = ProxyTestTracker::new(binary_path.clone());
    // Start a mock HTTP server as the final destination
    let (target_port, target_handle) = start_mock_http_server("SOCKS5-TARGET-RESPONSE").await;
    println!("Mock target HTTP server on port {target_port}");
    // Start a mock SOCKS5 proxy
    let (socks_port, socks_handle) = start_mock_socks5_server().await;
    println!("Mock SOCKS5 server on port {socks_port}");
    // Start donut-proxy with socks5 upstream
    let output = TestUtils::execute_command(
        &binary_path,
        &[
            "proxy",
            "start",
            "--host",
            "127.0.0.1",
            "--proxy-port",
            &socks_port.to_string(),
            "--type",
            "socks5",
        ],
    )
    .await?;
    if !output.status.success() {
        // Abort the mock servers before bailing so the test leaves no
        // background tasks running.
        let stderr = String::from_utf8_lossy(&output.stderr);
        let stdout = String::from_utf8_lossy(&output.stdout);
        target_handle.abort();
        socks_handle.abort();
        return Err(format!("Proxy start failed - stdout: {stdout}, stderr: {stderr}").into());
    }
    // The CLI prints the proxy config as JSON on stdout; extract the id
    // (for cleanup) and the local listening port (for connecting).
    let stdout = String::from_utf8(output.stdout)?;
    let config: Value = serde_json::from_str(&stdout)?;
    let proxy_id = config["id"].as_str().unwrap().to_string();
    let local_port = config["localPort"].as_u64().unwrap() as u16;
    tracker.track_proxy(proxy_id.clone());
    println!("donut-proxy started: id={proxy_id}, port={local_port}");
    // Give the spawned proxy process time to start listening.
    sleep(Duration::from_millis(500)).await;
    // Test 1: HTTP request through donut-proxy -> SOCKS5 -> target
    // Absolute-form request line, as a browser would send to an HTTP proxy.
    let mut stream = TcpStream::connect(("127.0.0.1", local_port)).await?;
    let request = format!(
        "GET http://127.0.0.1:{target_port}/ HTTP/1.1\r\nHost: 127.0.0.1:{target_port}\r\nConnection: close\r\n\r\n"
    );
    stream.write_all(request.as_bytes()).await?;
    let mut response = vec![0u8; 8192];
    let n = tokio::time::timeout(Duration::from_secs(10), stream.read(&mut response))
        .await
        .map_err(|_| "HTTP request through SOCKS5 timed out")?
        .map_err(|e| format!("Read error: {e}"))?;
    let response_str = String::from_utf8_lossy(&response[..n]);
    assert!(
        response_str.contains("SOCKS5-TARGET-RESPONSE"),
        "HTTP request should be tunneled through SOCKS5 to target, got: {}",
        &response_str[..response_str.len().min(500)]
    );
    println!("SOCKS5 HTTP proxy test passed");
    drop(stream);
    // Allow proxy to settle between tests
    sleep(Duration::from_millis(500)).await;
    // Test 2: CONNECT tunnel through donut-proxy -> SOCKS5 -> target
    // This is the critical path for HTTPS browsing.
    // The proxy's raw TCP handler can race with prior connection cleanup, so retry.
    let mut connect_ok = false;
    for attempt in 1..=5 {
        sleep(Duration::from_millis(200)).await;
        // Any failure inside an attempt falls through to the next retry.
        let Ok(mut stream) = TcpStream::connect(("127.0.0.1", local_port)).await else {
            continue;
        };
        // Disable Nagle so the small CONNECT request is sent immediately.
        let _ = stream.set_nodelay(true);
        let connect_req =
            format!("CONNECT 127.0.0.1:{target_port} HTTP/1.1\r\nHost: 127.0.0.1:{target_port}\r\n\r\n");
        if stream.write_all(connect_req.as_bytes()).await.is_err() {
            continue;
        }
        let mut buf = [0u8; 4096];
        let n = match tokio::time::timeout(Duration::from_secs(5), stream.read(&mut buf)).await {
            Ok(Ok(n)) if n > 0 => n,
            _ => {
                println!("CONNECT attempt {attempt}/5: empty response, retrying");
                continue;
            }
        };
        // Expect an HTTP "200 Connection Established"-style status.
        if !String::from_utf8_lossy(&buf[..n]).contains("200") {
            continue;
        }
        // Tunnel established — send HTTP through it
        // (origin-form request, since we now talk to the target directly).
        let inner_req =
            format!("GET / HTTP/1.1\r\nHost: 127.0.0.1:{target_port}\r\nConnection: close\r\n\r\n");
        if stream.write_all(inner_req.as_bytes()).await.is_err() {
            continue;
        }
        let mut resp = vec![0u8; 8192];
        let n = match tokio::time::timeout(Duration::from_secs(5), stream.read(&mut resp)).await {
            Ok(Ok(n)) if n > 0 => n,
            _ => continue,
        };
        if String::from_utf8_lossy(&resp[..n]).contains("SOCKS5-TARGET-RESPONSE") {
            connect_ok = true;
            println!("SOCKS5 CONNECT tunnel test passed (attempt {attempt})");
            break;
        }
    }
    assert!(connect_ok, "CONNECT tunnel through SOCKS5 should work");
    // Tear down in reverse order: tracked proxies first, then mock servers.
    tracker.cleanup_all().await;
    target_handle.abort();
    socks_handle.abort();
    Ok(())
}
+2 -2
View File
@@ -296,8 +296,8 @@
"formatTextHint": "Expects text like: host:port:username:password or protocol://user:pass@host:port",
"testUrl": "Test URL",
"testing": "Testing...",
"testSuccess": "Dynamic proxy resolved to {{host}}:{{port}}",
"testFailed": "Failed to fetch proxy: {{error}}",
"testSuccess": "Proxy working: {{host}}:{{port}}",
"testFailed": "Proxy test failed: {{error}}",
"fetchFailed": "Failed to fetch dynamic proxy: {{error}}"
},
"check": {
+2 -2
View File
@@ -296,8 +296,8 @@
"formatTextHint": "Espera texto como: host:port:username:password o protocol://user:pass@host:port",
"testUrl": "Probar URL",
"testing": "Probando...",
"testSuccess": "El proxy dinámico se resolvió a {{host}}:{{port}}",
"testFailed": "Error al obtener el proxy: {{error}}",
"testSuccess": "Proxy funcionando: {{host}}:{{port}}",
"testFailed": "Prueba de proxy fallida: {{error}}",
"fetchFailed": "Error al obtener el proxy dinámico: {{error}}"
},
"check": {
+2 -2
View File
@@ -296,8 +296,8 @@
"formatTextHint": "Attend du texte comme : host:port:username:password ou protocol://user:pass@host:port",
"testUrl": "Tester l'URL",
"testing": "Test en cours...",
"testSuccess": "Le proxy dynamique a été résolu en {{host}}:{{port}}",
"testFailed": "Échec de la récupération du proxy : {{error}}",
"testSuccess": "Proxy fonctionnel : {{host}}:{{port}}",
"testFailed": "Échec du test de proxy : {{error}}",
"fetchFailed": "Échec de la récupération du proxy dynamique : {{error}}"
},
"check": {
+2 -2
View File
@@ -296,8 +296,8 @@
"formatTextHint": "テキスト形式: host:port:username:password または protocol://user:pass@host:port",
"testUrl": "URLをテスト",
"testing": "テスト中...",
"testSuccess": "ダイナミックプロキシは {{host}}:{{port}} に解決されました",
"testFailed": "プロキシの取得に失敗しました: {{error}}",
"testSuccess": "プロキシ動作中: {{host}}:{{port}}",
"testFailed": "プロキシテスト失敗: {{error}}",
"fetchFailed": "ダイナミックプロキシの取得に失敗しました: {{error}}"
},
"check": {
+2 -2
View File
@@ -296,8 +296,8 @@
"formatTextHint": "Espera texto como: host:port:username:password ou protocol://user:pass@host:port",
"testUrl": "Testar URL",
"testing": "Testando...",
"testSuccess": "O proxy dinâmico foi resolvido para {{host}}:{{port}}",
"testFailed": "Falha ao obter o proxy: {{error}}",
"testSuccess": "Proxy funcionando: {{host}}:{{port}}",
"testFailed": "Falha no teste de proxy: {{error}}",
"fetchFailed": "Falha ao obter o proxy dinâmico: {{error}}"
},
"check": {
+2 -2
View File
@@ -296,8 +296,8 @@
"formatTextHint": "Ожидается текст вида: host:port:username:password или protocol://user:pass@host:port",
"testUrl": "Проверить URL",
"testing": "Проверка...",
"testSuccess": "Динамический прокси разрешён в {{host}}:{{port}}",
"testFailed": "Не удалось получить прокси: {{error}}",
"testSuccess": "Прокси работает: {{host}}:{{port}}",
"testFailed": "Тест прокси не пройден: {{error}}",
"fetchFailed": "Не удалось получить динамический прокси: {{error}}"
},
"check": {
+2 -2
View File
@@ -296,8 +296,8 @@
"formatTextHint": "期望文本格式: host:port:username:password 或 protocol://user:pass@host:port",
"testUrl": "测试URL",
"testing": "测试中...",
"testSuccess": "动态代理已解析为 {{host}}:{{port}}",
"testFailed": "获取代理失败: {{error}}",
"testSuccess": "代理正常运行: {{host}}:{{port}}",
"testFailed": "代理测试失败: {{error}}",
"fetchFailed": "获取动态代理失败: {{error}}"
},
"check": {