refactor: cleanup

This commit is contained in:
zhom
2026-05-05 22:33:43 +04:00
parent 904dda2bad
commit 34450ad06b
29 changed files with 1312 additions and 369 deletions
+83 -41
View File
@@ -16,7 +16,6 @@ use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf};
use tokio::net::TcpListener;
use tokio::net::TcpStream;
/// Combined read+write trait for tunnel target streams, allowing
@@ -1232,8 +1231,49 @@ pub async fn run_proxy_server(config: ProxyConfig) -> Result<(), Box<dyn std::er
log::error!("Attempting to bind proxy server to {}", bind_addr);
// Bind to the port
let listener = TcpListener::bind(bind_addr).await?;
// Bind to the port. Use SO_REUSEADDR so that a freshly-restarted worker
// can bind a port that the previous worker left in TIME_WAIT, and retry
// briefly to absorb transient races with the OS releasing the socket.
let listener = {
let mut attempts: u32 = 0;
loop {
let socket = tokio::net::TcpSocket::new_v4()?;
let _ = socket.set_reuseaddr(true);
match socket.bind(bind_addr) {
Ok(()) => match socket.listen(1024) {
Ok(l) => break l,
Err(e) if attempts < 5 => {
attempts += 1;
let delay = std::time::Duration::from_millis(200 * u64::from(attempts));
log::warn!(
"listen() on {} failed (attempt {}/5): {}, retrying in {}ms",
bind_addr,
attempts,
e,
delay.as_millis()
);
tokio::time::sleep(delay).await;
}
Err(e) => {
return Err(format!("Failed to listen on {bind_addr} after 5 attempts: {e}").into())
}
},
Err(e) if attempts < 5 => {
attempts += 1;
let delay = std::time::Duration::from_millis(200 * u64::from(attempts));
log::warn!(
"bind() on {} failed (attempt {}/5): {}, retrying in {}ms",
bind_addr,
attempts,
e,
delay.as_millis()
);
tokio::time::sleep(delay).await;
}
Err(e) => return Err(format!("Failed to bind {bind_addr} after 5 attempts: {e}").into()),
}
}
};
let actual_port = listener.local_addr()?.port();
log::error!("Successfully bound to port {}", actual_port);
@@ -1295,52 +1335,54 @@ pub async fn run_proxy_server(config: ProxyConfig) -> Result<(), Box<dyn std::er
loop {
interval.tick().await;
if let Some(tracker) = get_traffic_tracker() {
let (sent, recv, requests) = tracker.get_snapshot();
let current_bytes = sent + recv;
let time_since_activity = last_activity_time.elapsed();
let time_since_flush = last_flush_time.elapsed();
let has_traffic = current_bytes > 0 || requests > 0;
// Catch panics so a poisoned lock or unexpected error inside
// flush_to_disk doesn't abort the flush task and leave stats
// unwritten for the lifetime of the worker. The captured state
// is all Copy or atomic-assignment, so AssertUnwindSafe is sound.
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
if let Some(tracker) = get_traffic_tracker() {
let (sent, recv, requests) = tracker.get_snapshot();
let current_bytes = sent + recv;
let time_since_activity = last_activity_time.elapsed();
let time_since_flush = last_flush_time.elapsed();
let has_traffic = current_bytes > 0 || requests > 0;
// Determine flush frequency based on activity
// When active: flush every 5 seconds
// When idle: flush every 30 seconds
let desired_interval_secs =
if has_traffic || time_since_activity < std::time::Duration::from_secs(30) {
5u64
} else {
30u64
};
let desired_interval_secs =
if has_traffic || time_since_activity < std::time::Duration::from_secs(30) {
5u64
} else {
30u64
};
// Update interval if needed
if desired_interval_secs != current_interval_secs {
current_interval_secs = desired_interval_secs;
interval = tokio::time::interval(tokio::time::Duration::from_secs(desired_interval_secs));
}
if desired_interval_secs != current_interval_secs {
current_interval_secs = desired_interval_secs;
interval =
tokio::time::interval(tokio::time::Duration::from_secs(desired_interval_secs));
}
// Only flush if enough time has passed since last flush
let flush_interval = std::time::Duration::from_secs(desired_interval_secs);
let should_flush = time_since_flush >= flush_interval;
let flush_interval = std::time::Duration::from_secs(desired_interval_secs);
let should_flush = time_since_flush >= flush_interval;
if should_flush {
match tracker.flush_to_disk() {
Ok(Some((sent, recv))) => {
// Successful flush with data
last_flush_time = std::time::Instant::now();
if sent > 0 || recv > 0 {
last_activity_time = std::time::Instant::now();
if should_flush {
match tracker.flush_to_disk() {
Ok(Some((sent, recv))) => {
last_flush_time = std::time::Instant::now();
if sent > 0 || recv > 0 {
last_activity_time = std::time::Instant::now();
}
}
Ok(None) => {
last_flush_time = std::time::Instant::now();
}
Err(e) => {
log::error!("Failed to flush traffic stats: {}", e);
}
}
Ok(None) => {
// No data to flush - this is normal
last_flush_time = std::time::Instant::now();
}
Err(e) => {
log::error!("Failed to flush traffic stats: {}", e);
// Don't update flush time on error - retry sooner
}
}
}
}));
if let Err(panic) = result {
log::error!("Panic caught in proxy traffic flush task; continuing: {panic:?}");
}
}
});