From 4598b22af1ab39c40827a27e02ea5aed1eaa18f8 Mon Sep 17 00:00:00 2001 From: zhom <2717306+zhom@users.noreply.github.com> Date: Tue, 23 Jun 2026 22:10:16 +0400 Subject: [PATCH] refactor: add robust proxy lifecycle management --- src-tauri/src/browser_runner.rs | 8 ++ src-tauri/src/proxy_manager.rs | 36 +++++ src-tauri/src/proxy_server.rs | 224 +++++++++++++++++++++++++++++--- src-tauri/src/proxy_storage.rs | 8 ++ src-tauri/src/socks5_local.rs | 33 +++-- 5 files changed, 280 insertions(+), 29 deletions(-) diff --git a/src-tauri/src/browser_runner.rs b/src-tauri/src/browser_runner.rs index 8b21829..aec04f8 100644 --- a/src-tauri/src/browser_runner.rs +++ b/src-tauri/src/browser_runner.rs @@ -409,6 +409,10 @@ impl BrowserRunner { log::info!("Updated proxy PID mapping from temp (0) to actual PID: {process_id}"); } + // Persist the real browser PID so the detached proxy worker self-reaps + // when this browser dies, even after the GUI exits/restarts. + PROXY_MANAGER.set_browser_pid_for_profile(&updated_profile.id.to_string(), process_id); + // Save the updated profile (includes new fingerprint if randomize is enabled) log::info!( "Saving profile {} with camoufox_config fingerprint length: {}", @@ -696,6 +700,10 @@ impl BrowserRunner { log::info!("Updated proxy PID mapping from temp (0) to actual PID: {process_id}"); } + // Persist the real browser PID so the detached proxy worker self-reaps + // when this browser dies, even after the GUI exits/restarts. 
+ PROXY_MANAGER.set_browser_pid_for_profile(&updated_profile.id.to_string(), process_id); + // Save the updated profile log::info!( "Saving profile {} with wayfern_config fingerprint length: {}", diff --git a/src-tauri/src/proxy_manager.rs b/src-tauri/src/proxy_manager.rs index 59404a8..9e4c4ec 100644 --- a/src-tauri/src/proxy_manager.rs +++ b/src-tauri/src/proxy_manager.rs @@ -1860,6 +1860,38 @@ impl ProxyManager { } } + /// Persist the real browser PID onto the worker's on-disk config so the + /// detached worker can self-terminate when that browser dies, independent of + /// the GUI being alive. Resolved via the profile→proxy_id map rather than the + /// PID-keyed `active_proxies` map: the latter uses a placeholder key 0 during + /// launch that collides across concurrent launches, which could tag a live + /// worker with the wrong (dead) PID and make it self-exit. Safe on the reuse + /// path — it simply rewrites `browser_pid` to the new live PID. A `browser_pid` + /// of 0 (launch failed to report a PID) is ignored so the worker never + /// self-exits against a bogus PID. + pub fn set_browser_pid_for_profile(&self, profile_id: &str, browser_pid: u32) { + if browser_pid == 0 { + return; + } + let proxy_id = { + let map = self.profile_active_proxy_ids.lock().unwrap(); + match map.get(profile_id) { + Some(id) => id.clone(), + None => return, // No local worker for this profile — nothing to tag. 
+ } + }; + if let Some(mut cfg) = crate::proxy_storage::get_proxy_config(&proxy_id) { + cfg.browser_pid = Some(browser_pid); + if crate::proxy_storage::update_proxy_config(&cfg) { + log::info!( + "Recorded browser PID {browser_pid} on proxy config {proxy_id} for self-reaping" + ); + } else { + log::warn!("Failed to persist browser_pid {browser_pid} to proxy config {proxy_id}"); + } + } + } + // Clean up proxies for dead browser processes // Only clean up orphaned config files where the proxy process itself is dead pub async fn cleanup_dead_proxies( @@ -2894,6 +2926,7 @@ mod tests { bypass_rules: Vec::new(), blocklist_file: None, local_protocol: None, + browser_pid: None, }; let dead_config = ProxyConfig { id: dead_id.clone(), @@ -2906,6 +2939,7 @@ mod tests { bypass_rules: Vec::new(), blocklist_file: None, local_protocol: None, + browser_pid: None, }; save_proxy_config(&live_config).unwrap(); @@ -2946,6 +2980,7 @@ mod tests { bypass_rules: vec!["*.local".to_string(), "192.168.*".to_string()], blocklist_file: None, local_protocol: None, + browser_pid: None, }; // Save @@ -3265,6 +3300,7 @@ mod tests { bypass_rules: Vec::new(), blocklist_file: None, local_protocol: None, + browser_pid: None, }; save_proxy_config(&config).unwrap(); diff --git a/src-tauri/src/proxy_server.rs b/src-tauri/src/proxy_server.rs index 003f8cb..b519abc 100644 --- a/src-tauri/src/proxy_server.rs +++ b/src-tauri/src/proxy_server.rs @@ -7,13 +7,13 @@ use hyper::service::service_fn; use hyper::{Method, Request, Response, StatusCode}; use hyper_util::rt::TokioIo; use regex_lite::Regex; -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::convert::Infallible; use std::io; use std::net::SocketAddr; use std::pin::Pin; use std::sync::atomic::{AtomicU64, Ordering}; -use std::sync::Arc; +use std::sync::{Arc, Mutex, OnceLock}; use std::task::{Context, Poll}; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf}; use tokio::net::TcpStream; @@ -1260,7 
+1260,16 @@ pub async fn handle_proxy_connection( ) .await { - log::warn!("CONNECT tunnel ended with error: {e}"); + let msg = e.to_string(); + if let Some(suppressed) = log_throttle(&msg) { + if suppressed > 0 { + log::warn!( + "CONNECT tunnel ended with error: {msg} ({suppressed} more suppressed in last 30s)" + ); + } else { + log::warn!("CONNECT tunnel ended with error: {msg}"); + } + } } return; } @@ -1479,6 +1488,48 @@ pub async fn run_proxy_server(config: ProxyConfig) -> Result<(), Box match cfg.browser_pid { + Some(bpid) if bpid != 0 => { + if crate::proxy_storage::is_process_running(bpid) { + consecutive_misses = 0; + } else { + consecutive_misses += 1; + if consecutive_misses >= 2 { + log::info!("Browser PID {bpid} for config {watch_id} is gone; worker exiting"); + crate::proxy_storage::delete_proxy_config(&watch_id); + std::process::exit(0); + } + } + } + // No browser PID recorded yet (launch window / old config): keep running. + _ => consecutive_misses = 0, + }, + // Our own config was removed (e.g. GUI stopped us): nothing to serve. + None => { + log::info!("Proxy config {watch_id} was removed; worker exiting"); + std::process::exit(0); + } + } + } + }); + } + let bypass_matcher = BypassMatcher::new(&config.bypass_rules); let blocklist_matcher = if let Some(ref path) = config.blocklist_file { match BlocklistMatcher::from_file(path) { @@ -1492,20 +1543,37 @@ pub async fn run_proxy_server(config: ProxyConfig) -> Result<(), Box { + // The semaphore is never closed, so acquire cannot fail. 
+ let permit = conn_semaphore + .clone() + .acquire_owned() + .await + .expect("connection semaphore is never closed"); let upstream = upstream_url.clone(); let matcher = bypass_matcher.clone(); let blocker = blocklist_matcher.clone(); if serve_socks5 { tokio::task::spawn(async move { + let _permit = permit; crate::socks5_local::handle_socks5_connection(stream, upstream, matcher, blocker).await; }); } else { tokio::task::spawn(async move { + let _permit = permit; handle_proxy_connection(stream, upstream, matcher, blocker).await; }); } @@ -1571,7 +1639,7 @@ async fn handle_connect_from_buffer( tracker.record_request(&domain, 0, 0); } - log::info!( + log::debug!( "CONNECT {}:{} (upstream={})", target_host, target_port, @@ -1601,6 +1669,137 @@ async fn handle_connect_from_buffer( Ok(()) } +/// Upper bound on concurrent connection handlers per worker. A real browser +/// never holds anywhere near this many simultaneous tunnels; the cap stops a +/// client retry-storm from spawning unbounded tasks (each of which parks a +/// Tokio blocking thread inside getaddrinfo). +const MAX_CONCURRENT_CONNECTIONS: usize = 512; + +/// Connect timeout for the direct (no-upstream) dial path. Bounds a wedged +/// `getaddrinfo` so a broken resolver can't park a blocking thread for the +/// full OS timeout. +const DIRECT_CONNECT_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10); + +/// Per-host failure state (last failure instant, consecutive failure count) for +/// the direct dial path. Process-global — each worker is its own process. +fn direct_dial_failures() -> &'static Mutex> { + static M: OnceLock>> = OnceLock::new(); + M.get_or_init(|| Mutex::new(HashMap::new())) +} + +/// If `host` is inside its failure backoff window, return the remaining time so +/// the caller can short-circuit without a fresh getaddrinfo/connect. Never +/// mutates state, so the window always expires and the path self-heals once +/// DNS recovers. 
+fn direct_backoff_remaining(host: &str) -> Option { + let map = direct_dial_failures(); + let guard = map.lock().unwrap(); + let (last, fails) = guard.get(host).copied()?; + // Exponential window capped at 30s: 2, 4, 8, 16, 30, 30, ... + let window = std::time::Duration::from_secs((1u64 << fails.min(5)).min(30)); + let elapsed = last.elapsed(); + if elapsed < window { + Some(window - elapsed) + } else { + None + } +} + +/// Record a direct-dial failure for `host`, growing its backoff window. +fn direct_backoff_record(host: &str) { + let map = direct_dial_failures(); + let mut guard = map.lock().unwrap(); + // Bound memory against a page that emits many distinct failing hosts. + if guard.len() > 2048 { + guard.retain(|_, (last, _)| last.elapsed() < std::time::Duration::from_secs(60)); + } + let entry = guard + .entry(host.to_string()) + .or_insert_with(|| (std::time::Instant::now(), 0)); + entry.0 = std::time::Instant::now(); + entry.1 = entry.1.saturating_add(1); +} + +/// Clear `host`'s failure state after a successful dial. +fn direct_backoff_clear(host: &str) { + direct_dial_failures().lock().unwrap().remove(host); +} + +/// Dial a target directly (no upstream) with a connect timeout and per-host +/// failure backoff. This is the server-side counterpart to the browser's +/// instant client-side retry: when a host's DNS/connect is failing (e.g. the +/// macOS resolver wedges after sleep/wake), repeated CONNECT requests +/// short-circuit +/// here instead of each spawning a fresh blocking getaddrinfo — which is what +/// let a retry-storm exhaust the blocking thread pool and peg the CPU. 
+async fn dial_direct(host: &str, port: u16) -> Result> { + if let Some(remaining) = direct_backoff_remaining(host) { + return Err( + format!( + "skipping direct dial to {host}: backing off ~{}s after repeated connect failures", + remaining.as_secs().max(1) + ) + .into(), + ); + } + match tokio::time::timeout(DIRECT_CONNECT_TIMEOUT, TcpStream::connect((host, port))).await { + Ok(Ok(stream)) => { + let _ = stream.set_nodelay(true); + direct_backoff_clear(host); + Ok(stream) + } + Ok(Err(e)) => { + direct_backoff_record(host); + Err(e.into()) + } + Err(_) => { + direct_backoff_record(host); + Err( + format!( + "direct connect to {host}:{port} timed out after {}s", + DIRECT_CONNECT_TIMEOUT.as_secs() + ) + .into(), + ) + } + } +} + +/// Rate-limit a repetitive log line keyed by `key`: returns `Some(suppressed)` +/// when the caller should emit (first time or after a 30s window, with the +/// count dropped since the last emit), or `None` to skip. Stops a connect/DNS +/// storm from writing the same WARN millions of times (the line that grew +/// worker logs to 100MB). +pub(crate) fn log_throttle(key: &str) -> Option { + fn throttle_map() -> &'static Mutex> { + static M: OnceLock>> = OnceLock::new(); + M.get_or_init(|| Mutex::new(HashMap::new())) + } + let map = throttle_map(); + let mut guard = map.lock().unwrap(); + if guard.len() > 2048 { + guard.retain(|_, (last, _)| last.elapsed() < std::time::Duration::from_secs(60)); + } + let now = std::time::Instant::now(); + match guard.get_mut(key) { + Some((last, suppressed)) => { + if now.duration_since(*last) >= std::time::Duration::from_secs(30) { + let dropped = *suppressed; + *last = now; + *suppressed = 0; + Some(dropped) + } else { + *suppressed += 1; + None + } + } + None => { + guard.insert(key.to_string(), (now, 0)); + Some(0) + } + } +} + /// Establish a stream to `target_host:target_port`, either directly or through /// the configured upstream proxy. 
Shared by the HTTP CONNECT path and the /// local SOCKS5 server so every upstream type (direct, HTTP/HTTPS CONNECT, @@ -1618,21 +1817,8 @@ pub(crate) async fn connect_to_target_via_upstream( let _ = stream.set_nodelay(true); }; let target_stream: BoxedAsyncStream = match upstream_url { - None => { - let s = TcpStream::connect((target_host, target_port)).await?; - configure_tcp(&s); - Box::new(s) - } - Some("DIRECT") => { - let s = TcpStream::connect((target_host, target_port)).await?; - configure_tcp(&s); - Box::new(s) - } - _ if should_bypass => { - let s = TcpStream::connect((target_host, target_port)).await?; - configure_tcp(&s); - Box::new(s) - } + None | Some("DIRECT") => Box::new(dial_direct(target_host, target_port).await?), + _ if should_bypass => Box::new(dial_direct(target_host, target_port).await?), Some(upstream_url_str) => { let upstream = Url::parse(upstream_url_str)?; let scheme = upstream.scheme(); diff --git a/src-tauri/src/proxy_storage.rs b/src-tauri/src/proxy_storage.rs index f4e5073..fff84e6 100644 --- a/src-tauri/src/proxy_storage.rs +++ b/src-tauri/src/proxy_storage.rs @@ -22,6 +22,13 @@ pub struct ProxyConfig { /// `upstream_url`, which is the real upstream proxy/VPN this worker dials. #[serde(default)] pub local_protocol: Option, + /// PID of the browser process this worker serves, recorded by the GUI after + /// launch. The detached worker watches this and self-terminates when the + /// browser dies, so it dies with its browser even if the GUI has exited or + /// restarted. `None` until launch completes (the worker keeps running while + /// it is `None`). 
+ #[serde(default)] + pub browser_pid: Option, } impl ProxyConfig { @@ -37,6 +44,7 @@ impl ProxyConfig { bypass_rules: Vec::new(), blocklist_file: None, local_protocol: None, + browser_pid: None, } } diff --git a/src-tauri/src/socks5_local.rs b/src-tauri/src/socks5_local.rs index 1dfd154..4055e4f 100644 --- a/src-tauri/src/socks5_local.rs +++ b/src-tauri/src/socks5_local.rs @@ -243,7 +243,7 @@ async fn handle_connect( tracker.record_request(&host, 0, 0); } - log::info!( + log::debug!( "SOCKS5 CONNECT {}:{} (upstream={})", host, port, @@ -252,16 +252,29 @@ async fn handle_connect( // Resolve to the target stream, logging and dropping the (non-Send) dial // error inside the match arm so it is never held across the await below. - let target = - match connect_to_target_via_upstream(&host, port, upstream_url.as_deref(), &bypass_matcher) - .await - { - Ok(t) => Some(t), - Err(e) => { - log::warn!("SOCKS5 CONNECT to {host}:{port} failed: {e}"); - None + let target = match connect_to_target_via_upstream( + &host, + port, + upstream_url.as_deref(), + &bypass_matcher, + ) + .await + { + Ok(t) => Some(t), + Err(e) => { + let key = format!("socks5-connect:{host}:{port}"); + if let Some(suppressed) = crate::proxy_server::log_throttle(&key) { + if suppressed > 0 { + log::warn!( + "SOCKS5 CONNECT to {host}:{port} failed: {e} ({suppressed} more suppressed in last 30s)" + ); + } else { + log::warn!("SOCKS5 CONNECT to {host}:{port} failed: {e}"); + } } - }; + None + } + }; let Some(target) = target else { let _ = send_reply(&mut stream, REP_GENERAL_FAILURE, unspecified()).await;