refactor: proxy spawn

This commit is contained in:
zhom
2026-05-10 20:33:29 +04:00
parent 85e0072915
commit 722aaecbbe
2 changed files with 36 additions and 57 deletions
+17 -10
View File
@@ -82,9 +82,14 @@ fn build_proxy_url(
#[tokio::main(flavor = "multi_thread")]
async fn main() {
// Initialize logger to write to stderr (which will be redirected to file)
// Initialize logger to write to stderr (which will be redirected to file).
//
// Default filter is Info — Debug pulls in reqwest/hyper internals which
// make the per-worker log unreadable on a busy browser session and obscure
// the actual lines we care about (binds, accept errors, upstream failures).
// RUST_LOG=debug or RUST_LOG=donut_proxy=trace still works for deep dives.
env_logger::Builder::from_default_env()
.filter_level(log::LevelFilter::Debug)
.filter_level(log::LevelFilter::Info)
.format_timestamp_millis()
.init();
@@ -343,8 +348,11 @@ async fn main() {
// Set high priority so this process is killed last under resource pressure
set_high_priority();
log::error!("Proxy worker starting, looking for config id: {}", id);
log::error!("Process PID: {}", std::process::id());
log::info!(
"Proxy worker starting (pid {}, config id {})",
std::process::id(),
id
);
// Retry config loading to handle file system race condition on Windows
// where the config file may not be immediately visible after being written
@@ -352,7 +360,7 @@ async fn main() {
let mut attempts = 0;
loop {
if let Some(config) = get_proxy_config(id) {
log::error!(
log::info!(
"Found config: id={}, port={:?}, upstream={}",
config.id,
config.local_port,
@@ -369,20 +377,19 @@ async fn main() {
);
process::exit(1);
}
log::error!("Config {} not found yet, retrying ({}/10)...", id, attempts);
log::debug!("Config {} not found yet, retrying ({}/10)...", id, attempts);
std::thread::sleep(std::time::Duration::from_millis(50));
}
};
// Run the proxy server - this should never return (infinite loop)
log::error!("Starting proxy server for config id: {}", id);
log::info!("Starting proxy server for config id: {}", id);
if let Err(e) = run_proxy_server(config).await {
log::error!("Failed to run proxy server: {}", e);
log::error!("Error details: {:?}", e);
log::error!("Proxy server failed: {} ({:?})", e, e);
process::exit(1);
}
// This should never be reached - run_proxy_server has an infinite loop
log::error!("ERROR: Proxy server returned unexpectedly (this should never happen)");
log::error!("Proxy server returned unexpectedly (this should never happen)");
process::exit(1);
} else {
log::error!("Invalid action for proxy-worker. Use 'start'");
+19 -47
View File
@@ -918,8 +918,8 @@ async fn handle_http(
return Ok(response);
}
log::error!(
"DEBUG: Handling HTTP request: {} {} (host: {:?})",
log::trace!(
"Handling HTTP request: {} {} (host: {:?})",
req.method(),
req.uri(),
req.uri().host()
@@ -1182,7 +1182,7 @@ pub async fn handle_proxy_connection(
}
pub async fn run_proxy_server(config: ProxyConfig) -> Result<(), Box<dyn std::error::Error>> {
log::error!(
log::info!(
"Proxy worker starting, looking for config id: {}",
config.id
);
@@ -1196,7 +1196,7 @@ pub async fn run_proxy_server(config: ProxyConfig) -> Result<(), Box<dyn std::er
}
};
log::error!(
log::info!(
"Found config: id={}, port={:?}, upstream={}, profile_id={:?}",
config.id,
config.local_port,
@@ -1204,32 +1204,14 @@ pub async fn run_proxy_server(config: ProxyConfig) -> Result<(), Box<dyn std::er
config.profile_id
);
log::error!("Starting proxy server for config id: {}", config.id);
// Initialize traffic tracker with profile ID if available
// This can now be called multiple times to update the tracker
// Initialize traffic tracker with profile ID if available.
// This can be called multiple times to update the tracker.
init_traffic_tracker(config.id.clone(), config.profile_id.clone());
log::error!(
"Traffic tracker initialized for proxy: {} (profile_id: {:?})",
config.id,
config.profile_id
);
// Verify tracker was initialized correctly
if let Some(tracker) = crate::traffic_stats::get_traffic_tracker() {
log::error!(
"Tracker verified: proxy_id={}, profile_id={:?}",
tracker.proxy_id,
tracker.profile_id
);
} else {
log::error!("WARNING: Tracker was not initialized!");
}
// Determine the bind address
let bind_addr = SocketAddr::from(([127, 0, 0, 1], config.local_port.unwrap_or(0)));
log::error!("Attempting to bind proxy server to {}", bind_addr);
log::info!("Attempting to bind proxy server to {}", bind_addr);
// Bind to the port. Use SO_REUSEADDR so that a freshly-restarted worker
// can bind a port that the previous worker left in TIME_WAIT, and retry
@@ -1276,18 +1258,13 @@ pub async fn run_proxy_server(config: ProxyConfig) -> Result<(), Box<dyn std::er
};
let actual_port = listener.local_addr()?.port();
log::error!("Successfully bound to port {}", actual_port);
log::info!("Successfully bound to port {}", actual_port);
// Update config with actual port and local_url
let mut updated_config = config.clone();
updated_config.local_port = Some(actual_port);
updated_config.local_url = Some(format!("http://127.0.0.1:{}", actual_port));
// Save the updated config
log::error!(
"Saving updated config with local_url={:?}",
updated_config.local_url
);
if !crate::proxy_storage::update_proxy_config(&updated_config) {
log::error!("Failed to update proxy config");
return Err("Failed to update proxy config".into());
@@ -1299,12 +1276,11 @@ pub async fn run_proxy_server(config: ProxyConfig) -> Result<(), Box<dyn std::er
Some(updated_config.upstream_url.clone())
};
log::error!("Proxy server bound to 127.0.0.1:{}", actual_port);
log::error!(
log::info!(
"Proxy server listening on 127.0.0.1:{} (ready to accept connections)",
actual_port
);
log::error!("Proxy server entering accept loop - process should stay alive");
log::info!("Proxy server entering accept loop - process should stay alive");
// Start a background task to write lightweight session snapshots for real-time updates
// These are much smaller than full stats and can be written frequently (~100 bytes every 2 seconds)
@@ -1623,7 +1599,7 @@ async fn handle_connect_from_buffer(
.await?;
client_stream.flush().await?;
log::error!("DEBUG: Sent 200 Connection Established response, starting tunnel");
log::trace!("Sent 200 Connection Established response, starting tunnel");
// Now tunnel data bidirectionally with counting
// Wrap streams to count bytes transferred
@@ -1640,17 +1616,17 @@ async fn handle_connect_from_buffer(
let (mut client_read, mut client_write) = tokio::io::split(counting_client);
let (mut target_read, mut target_write) = tokio::io::split(counting_target);
log::error!("DEBUG: Starting bidirectional tunnel");
log::trace!("Starting bidirectional tunnel");
// Spawn two tasks to forward data in both directions
let client_to_target = tokio::spawn(async move {
let result = tokio::io::copy(&mut client_read, &mut target_write).await;
match result {
Ok(bytes) => {
log::error!("DEBUG: Tunneled {} bytes from client->target", bytes);
log::trace!("Tunneled {bytes} bytes from client->target");
}
Err(e) => {
log::error!("Error forwarding client->target: {:?}", e);
log::debug!("Error forwarding client->target: {e:?}");
}
}
});
@@ -1659,10 +1635,10 @@ async fn handle_connect_from_buffer(
let result = tokio::io::copy(&mut target_read, &mut client_write).await;
match result {
Ok(bytes) => {
log::error!("DEBUG: Tunneled {} bytes from target->client", bytes);
log::trace!("Tunneled {bytes} bytes from target->client");
}
Err(e) => {
log::error!("Error forwarding target->client: {:?}", e);
log::debug!("Error forwarding target->client: {e:?}");
}
}
});
@@ -1670,10 +1646,10 @@ async fn handle_connect_from_buffer(
// Wait for either direction to finish (connection closed)
tokio::select! {
_ = client_to_target => {
log::error!("DEBUG: Client->target tunnel closed");
log::trace!("Client->target tunnel closed");
}
_ = target_to_client => {
log::error!("DEBUG: Target->client tunnel closed");
log::trace!("Target->client tunnel closed");
}
}
@@ -1682,11 +1658,7 @@ async fn handle_connect_from_buffer(
client_read_counter.load(Ordering::Relaxed) + target_write_counter.load(Ordering::Relaxed);
let final_recv =
target_read_counter.load(Ordering::Relaxed) + client_write_counter.load(Ordering::Relaxed);
log::error!(
"DEBUG: Tunnel closed - sent: {} bytes, received: {} bytes",
final_sent,
final_recv
);
log::trace!("Tunnel closed - sent: {final_sent} bytes, received: {final_recv} bytes");
// Update domain-specific byte counts now that tunnel is complete
if let Some(tracker) = get_traffic_tracker() {