refactor: bound proxy connection

This commit is contained in:
zhom
2026-06-23 23:20:24 +04:00
parent f8791a9ec5
commit 50d2834634
+50 -12
View File
@@ -374,7 +374,12 @@ async fn connect_via_http_proxy(
) -> Result<TcpStream, Box<dyn std::error::Error>> {
let proxy_host = upstream.host_str().unwrap_or("127.0.0.1");
let proxy_port = upstream.port().unwrap_or(8080);
let mut stream = TcpStream::connect((proxy_host, proxy_port)).await?;
let mut stream = tokio::time::timeout(
UPSTREAM_DIAL_TIMEOUT,
TcpStream::connect((proxy_host, proxy_port)),
)
.await
.map_err(|_| format!("upstream proxy connect to {proxy_host}:{proxy_port} timed out"))??;
// Add proxy authentication if provided
let mut connect_req = format!(
@@ -394,7 +399,9 @@ async fn connect_via_http_proxy(
stream.write_all(connect_req.as_bytes()).await?;
let mut buffer = [0u8; 4096];
let n = stream.read(&mut buffer).await?;
let n = tokio::time::timeout(UPSTREAM_DIAL_TIMEOUT, stream.read(&mut buffer))
.await
.map_err(|_| "upstream proxy CONNECT response timed out")??;
let response = String::from_utf8_lossy(&buffer[..n]);
if response.starts_with("HTTP/1.1 200") || response.starts_with("HTTP/1.0 200") {
@@ -501,7 +508,9 @@ async fn connect_via_socks(
is_socks5: bool,
auth: Option<(&str, &str)>,
) -> Result<TcpStream, Box<dyn std::error::Error>> {
let stream = TcpStream::connect(socks_addr).await?;
let stream = tokio::time::timeout(UPSTREAM_DIAL_TIMEOUT, TcpStream::connect(socks_addr))
.await
.map_err(|_| format!("SOCKS upstream connect to {socks_addr} timed out"))??;
if is_socks5 {
// SOCKS5 connection using async_socks5
@@ -541,18 +550,26 @@ async fn connect_via_socks(
let label = format!("{socks_addr}->{target_host}:{target_port}");
let logged = SocksHandshakeLogger::new(stream, label);
let mut buffered = tokio::io::BufStream::new(logged);
let handshake = connect(&mut buffered, target, auth_info).await;
let handshake = tokio::time::timeout(
UPSTREAM_DIAL_TIMEOUT,
connect(&mut buffered, target, auth_info),
)
.await;
// Unwrap the layered stream: BufStream → SocksHandshakeLogger → TcpStream
let stream = buffered.into_inner().into_inner();
match handshake {
Ok(_) => {
Ok(Ok(_)) => {
log::trace!("[socks-handshake] handshake completed ok");
Ok(stream)
}
Err(e) => {
Ok(Err(e)) => {
log::trace!("[socks-handshake] handshake failed: {:?}", e);
Err(e.into())
}
Err(_) => {
log::trace!("[socks-handshake] handshake timed out");
Err("SOCKS5 upstream handshake timed out".into())
}
}
} else {
let mut stream = stream;
@@ -1680,6 +1697,14 @@ const MAX_CONCURRENT_CONNECTIONS: usize = 512;
/// full OS timeout.
const DIRECT_CONNECT_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
/// Overall timeout for dialing an UPSTREAM proxy (TCP connect + CONNECT/SOCKS/SS
/// handshake). Without it, an upstream that accepts TCP but stalls before
/// replying hangs the worker task forever and holds a connection slot; under
/// load (e.g. two profiles sharing one proxy) the slots exhaust and the browser
/// sees `ERR_PROXY_CONNECTION_FAILED` until the profile is restarted (issue
/// #439). A bounded dial fails fast and releases the slot.
const UPSTREAM_DIAL_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(20);
/// Per-host failure state (last failure instant, consecutive failure count) for
/// the direct dial path. Process-global — each worker is its own process.
fn direct_dial_failures() -> &'static Mutex<HashMap<String, (std::time::Instant, u32)>> {
@@ -1827,7 +1852,14 @@ pub(crate) async fn connect_to_target_via_upstream(
"http" | "https" => {
let proxy_host = upstream.host_str().unwrap_or("127.0.0.1");
let proxy_port = upstream.port().unwrap_or(8080);
let mut proxy_stream = TcpStream::connect((proxy_host, proxy_port)).await?;
let mut proxy_stream = tokio::time::timeout(
UPSTREAM_DIAL_TIMEOUT,
TcpStream::connect((proxy_host, proxy_port)),
)
.await
.map_err(|_| {
format!("upstream proxy connect to {proxy_host}:{proxy_port} timed out")
})??;
configure_tcp(&proxy_stream);
let mut connect_req = format!(
@@ -1847,7 +1879,9 @@ pub(crate) async fn connect_to_target_via_upstream(
proxy_stream.write_all(connect_req.as_bytes()).await?;
let mut buffer = [0u8; 4096];
let n = proxy_stream.read(&mut buffer).await?;
let n = tokio::time::timeout(UPSTREAM_DIAL_TIMEOUT, proxy_stream.read(&mut buffer))
.await
.map_err(|_| "upstream proxy CONNECT response timed out")??;
let response_full = String::from_utf8_lossy(&buffer[..n]).to_string();
let status_line = response_full.lines().next().unwrap_or("").to_string();
@@ -1946,12 +1980,16 @@ pub(crate) async fn connect_to_target_via_upstream(
let target_addr =
shadowsocks::relay::Address::DomainNameAddress(target_host.to_string(), target_port);
let stream = shadowsocks::relay::tcprelay::proxy_stream::ProxyClientStream::connect(
context,
&svr_cfg,
target_addr,
let stream = tokio::time::timeout(
UPSTREAM_DIAL_TIMEOUT,
shadowsocks::relay::tcprelay::proxy_stream::ProxyClientStream::connect(
context,
&svr_cfg,
target_addr,
),
)
.await
.map_err(|_| "Shadowsocks connection timed out".to_string())?
.map_err(|e| format!("Shadowsocks connection failed: {e}"))?;
Box::new(stream)