refactor: add robust proxy lifecycle management"

This commit is contained in:
zhom
2026-06-23 22:10:16 +04:00
parent 4ac4c6e8a9
commit 4598b22af1
5 changed files with 280 additions and 29 deletions
+8
View File
@@ -409,6 +409,10 @@ impl BrowserRunner {
log::info!("Updated proxy PID mapping from temp (0) to actual PID: {process_id}");
}
// Persist the real browser PID so the detached proxy worker self-reaps
// when this browser dies, even after the GUI exits/restarts.
PROXY_MANAGER.set_browser_pid_for_profile(&updated_profile.id.to_string(), process_id);
// Save the updated profile (includes new fingerprint if randomize is enabled)
log::info!(
"Saving profile {} with camoufox_config fingerprint length: {}",
@@ -696,6 +700,10 @@ impl BrowserRunner {
log::info!("Updated proxy PID mapping from temp (0) to actual PID: {process_id}");
}
// Persist the real browser PID so the detached proxy worker self-reaps
// when this browser dies, even after the GUI exits/restarts.
PROXY_MANAGER.set_browser_pid_for_profile(&updated_profile.id.to_string(), process_id);
// Save the updated profile
log::info!(
"Saving profile {} with wayfern_config fingerprint length: {}",
+36
View File
@@ -1860,6 +1860,38 @@ impl ProxyManager {
}
}
/// Persist the real browser PID onto the worker's on-disk config so the
/// detached worker can self-terminate when that browser dies, independent of
/// the GUI being alive. Resolved via the profile→proxy_id map rather than the
/// PID-keyed `active_proxies` map: the latter uses a placeholder key 0 during
/// launch that collides across concurrent launches, which could tag a live
/// worker with the wrong (dead) PID and make it self-exit. Safe on the reuse
/// path — it simply rewrites `browser_pid` to the new live PID. A `browser_pid`
/// of 0 (launch failed to report a PID) is ignored so the worker never
/// self-exits against a bogus PID.
pub fn set_browser_pid_for_profile(&self, profile_id: &str, browser_pid: u32) {
if browser_pid == 0 {
return;
}
let proxy_id = {
let map = self.profile_active_proxy_ids.lock().unwrap();
match map.get(profile_id) {
Some(id) => id.clone(),
None => return, // No local worker for this profile — nothing to tag.
}
};
if let Some(mut cfg) = crate::proxy_storage::get_proxy_config(&proxy_id) {
cfg.browser_pid = Some(browser_pid);
if crate::proxy_storage::update_proxy_config(&cfg) {
log::info!(
"Recorded browser PID {browser_pid} on proxy config {proxy_id} for self-reaping"
);
} else {
log::warn!("Failed to persist browser_pid {browser_pid} to proxy config {proxy_id}");
}
}
}
// Clean up proxies for dead browser processes
// Only clean up orphaned config files where the proxy process itself is dead
pub async fn cleanup_dead_proxies(
@@ -2894,6 +2926,7 @@ mod tests {
bypass_rules: Vec::new(),
blocklist_file: None,
local_protocol: None,
browser_pid: None,
};
let dead_config = ProxyConfig {
id: dead_id.clone(),
@@ -2906,6 +2939,7 @@ mod tests {
bypass_rules: Vec::new(),
blocklist_file: None,
local_protocol: None,
browser_pid: None,
};
save_proxy_config(&live_config).unwrap();
@@ -2946,6 +2980,7 @@ mod tests {
bypass_rules: vec!["*.local".to_string(), "192.168.*".to_string()],
blocklist_file: None,
local_protocol: None,
browser_pid: None,
};
// Save
@@ -3265,6 +3300,7 @@ mod tests {
bypass_rules: Vec::new(),
blocklist_file: None,
local_protocol: None,
browser_pid: None,
};
save_proxy_config(&config).unwrap();
+205 -19
View File
@@ -7,13 +7,13 @@ use hyper::service::service_fn;
use hyper::{Method, Request, Response, StatusCode};
use hyper_util::rt::TokioIo;
use regex_lite::Regex;
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use std::convert::Infallible;
use std::io;
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::sync::{Arc, Mutex, OnceLock};
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf};
use tokio::net::TcpStream;
@@ -1260,7 +1260,16 @@ pub async fn handle_proxy_connection(
)
.await
{
log::warn!("CONNECT tunnel ended with error: {e}");
let msg = e.to_string();
if let Some(suppressed) = log_throttle(&msg) {
if suppressed > 0 {
log::warn!(
"CONNECT tunnel ended with error: {msg} ({suppressed} more suppressed in last 30s)"
);
} else {
log::warn!("CONNECT tunnel ended with error: {msg}");
}
}
}
return;
}
@@ -1479,6 +1488,48 @@ pub async fn run_proxy_server(config: ProxyConfig) -> Result<(), Box<dyn std::er
}
});
// Self-reaping supervisor. The worker is a detached process that outlives the
// GUI, so it cannot rely on the GUI's in-memory death-monitor (which is lost
// when the GUI restarts). Once the GUI records the browser PID this worker
// serves, poll it and exit when that browser is gone — never while it is
// alive, and never before a PID is recorded (covers the launch window and
// pre-upgrade configs lacking the field). A 2-miss debounce avoids exiting on
// a transient sysinfo false-negative under load / sleep-wake.
{
let watch_id = config.id.clone();
tokio::spawn(async move {
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(15));
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
let mut consecutive_misses: u32 = 0;
loop {
interval.tick().await;
match crate::proxy_storage::get_proxy_config(&watch_id) {
Some(cfg) => match cfg.browser_pid {
Some(bpid) if bpid != 0 => {
if crate::proxy_storage::is_process_running(bpid) {
consecutive_misses = 0;
} else {
consecutive_misses += 1;
if consecutive_misses >= 2 {
log::info!("Browser PID {bpid} for config {watch_id} is gone; worker exiting");
crate::proxy_storage::delete_proxy_config(&watch_id);
std::process::exit(0);
}
}
}
// No browser PID recorded yet (launch window / old config): keep running.
_ => consecutive_misses = 0,
},
// Our own config was removed (e.g. GUI stopped us): nothing to serve.
None => {
log::info!("Proxy config {watch_id} was removed; worker exiting");
std::process::exit(0);
}
}
}
});
}
let bypass_matcher = BypassMatcher::new(&config.bypass_rules);
let blocklist_matcher = if let Some(ref path) = config.blocklist_file {
match BlocklistMatcher::from_file(path) {
@@ -1492,20 +1543,37 @@ pub async fn run_proxy_server(config: ProxyConfig) -> Result<(), Box<dyn std::er
BlocklistMatcher::new()
};
// Bound concurrent connection handlers. A client retry-storm (e.g. a browser
// hammering CONNECT requests while DNS is failing) must not spawn unbounded
// tasks,
// each of which parks a Tokio blocking thread inside getaddrinfo — that is
// what exhausted the resolver pool and pegged the CPU on long-lived workers.
// A real browser never approaches this ceiling; waiting for a permit
// backpressures a storm instead of amplifying it.
let conn_semaphore = Arc::new(tokio::sync::Semaphore::new(MAX_CONCURRENT_CONNECTIONS));
// Keep the runtime alive with an infinite loop
// This ensures the process doesn't exit even if there are no active connections
loop {
match listener.accept().await {
Ok((stream, _peer_addr)) => {
// The semaphore is never closed, so acquire cannot fail.
let permit = conn_semaphore
.clone()
.acquire_owned()
.await
.expect("connection semaphore is never closed");
let upstream = upstream_url.clone();
let matcher = bypass_matcher.clone();
let blocker = blocklist_matcher.clone();
if serve_socks5 {
tokio::task::spawn(async move {
let _permit = permit;
crate::socks5_local::handle_socks5_connection(stream, upstream, matcher, blocker).await;
});
} else {
tokio::task::spawn(async move {
let _permit = permit;
handle_proxy_connection(stream, upstream, matcher, blocker).await;
});
}
@@ -1571,7 +1639,7 @@ async fn handle_connect_from_buffer(
tracker.record_request(&domain, 0, 0);
}
log::info!(
log::debug!(
"CONNECT {}:{} (upstream={})",
target_host,
target_port,
@@ -1601,6 +1669,137 @@ async fn handle_connect_from_buffer(
Ok(())
}
/// Upper bound on concurrent connection handlers per worker. A real browser
/// never holds anywhere near this many simultaneous tunnels; the cap stops a
/// client retry-storm from spawning unbounded tasks (each of which parks a
/// Tokio blocking thread inside getaddrinfo).
const MAX_CONCURRENT_CONNECTIONS: usize = 512;
/// Connect timeout for the direct (no-upstream) dial path. Bounds a wedged
/// `getaddrinfo` so a broken resolver can't park a blocking thread for the
/// full OS timeout.
const DIRECT_CONNECT_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
/// Per-host failure state (last failure instant, consecutive failure count) for
/// the direct dial path. Process-global — each worker is its own process.
fn direct_dial_failures() -> &'static Mutex<HashMap<String, (std::time::Instant, u32)>> {
static M: OnceLock<Mutex<HashMap<String, (std::time::Instant, u32)>>> = OnceLock::new();
M.get_or_init(|| Mutex::new(HashMap::new()))
}
/// If `host` is inside its failure backoff window, return the remaining time so
/// the caller can short-circuit without a fresh getaddrinfo/connect. Never
/// mutates state, so the window always expires and the path self-heals once
/// DNS recovers.
fn direct_backoff_remaining(host: &str) -> Option<std::time::Duration> {
let map = direct_dial_failures();
let guard = map.lock().unwrap();
let (last, fails) = guard.get(host).copied()?;
// Exponential window capped at 30s: 2, 4, 8, 16, 30, 30, ...
let window = std::time::Duration::from_secs((1u64 << fails.min(5)).min(30));
let elapsed = last.elapsed();
if elapsed < window {
Some(window - elapsed)
} else {
None
}
}
/// Record a direct-dial failure for `host`, growing its backoff window.
fn direct_backoff_record(host: &str) {
let map = direct_dial_failures();
let mut guard = map.lock().unwrap();
// Bound memory against a page that emits many distinct failing hosts.
if guard.len() > 2048 {
guard.retain(|_, (last, _)| last.elapsed() < std::time::Duration::from_secs(60));
}
let entry = guard
.entry(host.to_string())
.or_insert_with(|| (std::time::Instant::now(), 0));
entry.0 = std::time::Instant::now();
entry.1 = entry.1.saturating_add(1);
}
/// Clear `host`'s failure state after a successful dial.
fn direct_backoff_clear(host: &str) {
direct_dial_failures().lock().unwrap().remove(host);
}
/// Dial a target directly (no upstream) with a connect timeout and per-host
/// failure backoff. This is the server-side counterpart to the browser's
/// instant client-side retry: when a host's DNS/connect is failing (e.g. the
/// macOS resolver wedges after sleep/wake), repeated CONNECT requests
/// short-circuit
/// here instead of each spawning a fresh blocking getaddrinfo — which is what
/// let a retry-storm exhaust the blocking thread pool and peg the CPU.
async fn dial_direct(host: &str, port: u16) -> Result<TcpStream, Box<dyn std::error::Error>> {
if let Some(remaining) = direct_backoff_remaining(host) {
return Err(
format!(
"skipping direct dial to {host}: backing off ~{}s after repeated connect failures",
remaining.as_secs().max(1)
)
.into(),
);
}
match tokio::time::timeout(DIRECT_CONNECT_TIMEOUT, TcpStream::connect((host, port))).await {
Ok(Ok(stream)) => {
let _ = stream.set_nodelay(true);
direct_backoff_clear(host);
Ok(stream)
}
Ok(Err(e)) => {
direct_backoff_record(host);
Err(e.into())
}
Err(_) => {
direct_backoff_record(host);
Err(
format!(
"direct connect to {host}:{port} timed out after {}s",
DIRECT_CONNECT_TIMEOUT.as_secs()
)
.into(),
)
}
}
}
/// Rate-limit a repetitive log line keyed by `key`: returns `Some(suppressed)`
/// when the caller should emit (first time or after a 30s window, with the
/// count dropped since the last emit), or `None` to skip. Stops a connect/DNS
/// storm from writing the same WARN millions of times (the line that grew
/// worker logs to 100MB).
pub(crate) fn log_throttle(key: &str) -> Option<u64> {
fn throttle_map() -> &'static Mutex<HashMap<String, (std::time::Instant, u64)>> {
static M: OnceLock<Mutex<HashMap<String, (std::time::Instant, u64)>>> = OnceLock::new();
M.get_or_init(|| Mutex::new(HashMap::new()))
}
let map = throttle_map();
let mut guard = map.lock().unwrap();
if guard.len() > 2048 {
guard.retain(|_, (last, _)| last.elapsed() < std::time::Duration::from_secs(60));
}
let now = std::time::Instant::now();
match guard.get_mut(key) {
Some((last, suppressed)) => {
if now.duration_since(*last) >= std::time::Duration::from_secs(30) {
let dropped = *suppressed;
*last = now;
*suppressed = 0;
Some(dropped)
} else {
*suppressed += 1;
None
}
}
None => {
guard.insert(key.to_string(), (now, 0));
Some(0)
}
}
}
/// Establish a stream to `target_host:target_port`, either directly or through
/// the configured upstream proxy. Shared by the HTTP CONNECT path and the
/// local SOCKS5 server so every upstream type (direct, HTTP/HTTPS CONNECT,
@@ -1618,21 +1817,8 @@ pub(crate) async fn connect_to_target_via_upstream(
let _ = stream.set_nodelay(true);
};
let target_stream: BoxedAsyncStream = match upstream_url {
None => {
let s = TcpStream::connect((target_host, target_port)).await?;
configure_tcp(&s);
Box::new(s)
}
Some("DIRECT") => {
let s = TcpStream::connect((target_host, target_port)).await?;
configure_tcp(&s);
Box::new(s)
}
_ if should_bypass => {
let s = TcpStream::connect((target_host, target_port)).await?;
configure_tcp(&s);
Box::new(s)
}
None | Some("DIRECT") => Box::new(dial_direct(target_host, target_port).await?),
_ if should_bypass => Box::new(dial_direct(target_host, target_port).await?),
Some(upstream_url_str) => {
let upstream = Url::parse(upstream_url_str)?;
let scheme = upstream.scheme();
+8
View File
@@ -22,6 +22,13 @@ pub struct ProxyConfig {
/// `upstream_url`, which is the real upstream proxy/VPN this worker dials.
#[serde(default)]
pub local_protocol: Option<String>,
/// PID of the browser process this worker serves, recorded by the GUI after
/// launch. The detached worker watches this and self-terminates when the
/// browser dies, so it dies with its browser even if the GUI has exited or
/// restarted. `None` until launch completes (the worker keeps running while
/// it is `None`).
#[serde(default)]
pub browser_pid: Option<u32>,
}
impl ProxyConfig {
@@ -37,6 +44,7 @@ impl ProxyConfig {
bypass_rules: Vec::new(),
blocklist_file: None,
local_protocol: None,
browser_pid: None,
}
}
+23 -10
View File
@@ -243,7 +243,7 @@ async fn handle_connect(
tracker.record_request(&host, 0, 0);
}
log::info!(
log::debug!(
"SOCKS5 CONNECT {}:{} (upstream={})",
host,
port,
@@ -252,16 +252,29 @@ async fn handle_connect(
// Resolve to the target stream, logging and dropping the (non-Send) dial
// error inside the match arm so it is never held across the await below.
let target =
match connect_to_target_via_upstream(&host, port, upstream_url.as_deref(), &bypass_matcher)
.await
{
Ok(t) => Some(t),
Err(e) => {
log::warn!("SOCKS5 CONNECT to {host}:{port} failed: {e}");
None
let target = match connect_to_target_via_upstream(
&host,
port,
upstream_url.as_deref(),
&bypass_matcher,
)
.await
{
Ok(t) => Some(t),
Err(e) => {
let key = format!("socks5-connect:{host}:{port}");
if let Some(suppressed) = crate::proxy_server::log_throttle(&key) {
if suppressed > 0 {
log::warn!(
"SOCKS5 CONNECT to {host}:{port} failed: {e} ({suppressed} more suppressed in last 30s)"
);
} else {
log::warn!("SOCKS5 CONNECT to {host}:{port} failed: {e}");
}
}
};
None
}
};
let Some(target) = target else {
let _ = send_reply(&mut stream, REP_GENERAL_FAILURE, unspecified()).await;