feat: add network overview

This commit is contained in:
zhom
2025-11-30 15:04:48 +04:00
parent 01b3109dc1
commit cdba9aac33
20 changed files with 2416 additions and 48 deletions
+137 -3
View File
@@ -1,4 +1,5 @@
use crate::proxy_storage::ProxyConfig;
use crate::traffic_stats::{get_traffic_tracker, init_traffic_tracker};
use http_body_util::{BodyExt, Full};
use hyper::body::Bytes;
use hyper::server::conn::http1;
@@ -9,12 +10,79 @@ use std::convert::Infallible;
use std::io;
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf};
use tokio::net::TcpListener;
use tokio::net::TcpStream;
use url::Url;
/// Wrapper stream that counts bytes read and written
struct CountingStream<S> {
inner: S,
bytes_read: Arc<AtomicU64>,
bytes_written: Arc<AtomicU64>,
}
impl<S> CountingStream<S> {
fn new(inner: S) -> Self {
Self {
inner,
bytes_read: Arc::new(AtomicU64::new(0)),
bytes_written: Arc::new(AtomicU64::new(0)),
}
}
}
impl<S: AsyncRead + Unpin> AsyncRead for CountingStream<S> {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
let filled_before = buf.filled().len();
let result = Pin::new(&mut self.inner).poll_read(cx, buf);
if let Poll::Ready(Ok(())) = &result {
let bytes_read = buf.filled().len() - filled_before;
self
.bytes_read
.fetch_add(bytes_read as u64, Ordering::Relaxed);
// Update global tracker
if let Some(tracker) = get_traffic_tracker() {
tracker.add_bytes_received(bytes_read as u64);
}
}
result
}
}
impl<S: AsyncWrite + Unpin> AsyncWrite for CountingStream<S> {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
let result = Pin::new(&mut self.inner).poll_write(cx, buf);
if let Poll::Ready(Ok(n)) = &result {
self.bytes_written.fetch_add(*n as u64, Ordering::Relaxed);
// Update global tracker
if let Some(tracker) = get_traffic_tracker() {
tracker.add_bytes_sent(*n as u64);
}
}
result
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut self.inner).poll_flush(cx)
}
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut self.inner).poll_shutdown(cx)
}
}
// Wrapper to prepend consumed bytes to a stream
struct PrependReader {
prepended: Vec<u8>,
@@ -297,6 +365,13 @@ async fn handle_http(
// This is faster and more reliable than trying to use hyper-proxy with version conflicts
use reqwest::Client;
// Extract domain for traffic tracking
let domain = req
.uri()
.host()
.map(|h| h.to_string())
.unwrap_or_else(|| "unknown".to_string());
let client_builder = Client::builder();
let client = if let Some(ref upstream) = upstream_url {
if upstream == "DIRECT" {
@@ -370,6 +445,12 @@ async fn handle_http(
let headers = response.headers().clone();
let body = response.bytes().await.unwrap_or_default();
// Record request in traffic tracker
let response_size = body.len() as u64;
if let Some(tracker) = get_traffic_tracker() {
tracker.record_request(&domain, body_bytes.len() as u64, response_size);
}
let mut hyper_response = Response::new(Full::new(body));
*hyper_response.status_mut() = StatusCode::from_u16(status.as_u16()).unwrap();
@@ -449,6 +530,14 @@ pub async fn run_proxy_server(config: ProxyConfig) -> Result<(), Box<dyn std::er
log::error!("Starting proxy server for config id: {}", config.id);
// Initialize traffic tracker with profile ID if available
init_traffic_tracker(config.id.clone(), config.profile_id.clone());
log::error!(
"Traffic tracker initialized for proxy: {} (profile_id: {:?})",
config.id,
config.profile_id
);
// Determine the bind address
let bind_addr = SocketAddr::from(([127, 0, 0, 1], config.local_port.unwrap_or(0)));
@@ -488,6 +577,19 @@ pub async fn run_proxy_server(config: ProxyConfig) -> Result<(), Box<dyn std::er
);
log::error!("Proxy server entering accept loop - process should stay alive");
// Start a background task to periodically flush traffic stats to disk
tokio::spawn(async move {
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(1));
loop {
interval.tick().await;
if let Some(tracker) = get_traffic_tracker() {
if let Err(e) = tracker.flush_to_disk() {
log::error!("Failed to flush traffic stats: {}", e);
}
}
}
});
// Keep the runtime alive with an infinite loop
// This ensures the process doesn't exit even if there are no active connections
loop {
@@ -605,6 +707,12 @@ async fn handle_connect_from_buffer(
(target, 443)
};
// Record domain access in traffic tracker
let domain = target_host.to_string();
if let Some(tracker) = get_traffic_tracker() {
tracker.record_request(&domain, 0, 0);
}
// Connect to target (directly or via upstream proxy)
let target_stream = if upstream_url.is_none()
|| upstream_url
@@ -693,10 +801,20 @@ async fn handle_connect_from_buffer(
log::error!("DEBUG: Sent 200 Connection Established response, starting tunnel");
// Now tunnel data bidirectionally
// Now tunnel data bidirectionally with counting
// Wrap streams to count bytes transferred
let counting_client = CountingStream::new(client_stream);
let counting_target = CountingStream::new(target_stream);
// Get references for final stats
let client_read_counter = counting_client.bytes_read.clone();
let client_write_counter = counting_client.bytes_written.clone();
let target_read_counter = counting_target.bytes_read.clone();
let target_write_counter = counting_target.bytes_written.clone();
// Split streams for bidirectional copying
let (mut client_read, mut client_write) = tokio::io::split(client_stream);
let (mut target_read, mut target_write) = tokio::io::split(target_stream);
let (mut client_read, mut client_write) = tokio::io::split(counting_client);
let (mut target_read, mut target_write) = tokio::io::split(counting_target);
log::error!("DEBUG: Starting bidirectional tunnel");
@@ -735,5 +853,21 @@ async fn handle_connect_from_buffer(
}
}
// Log final byte counts and update domain stats
let final_sent =
client_read_counter.load(Ordering::Relaxed) + target_write_counter.load(Ordering::Relaxed);
let final_recv =
target_read_counter.load(Ordering::Relaxed) + client_write_counter.load(Ordering::Relaxed);
log::error!(
"DEBUG: Tunnel closed - sent: {} bytes, received: {} bytes",
final_sent,
final_recv
);
// Update domain-specific byte counts now that tunnel is complete
if let Some(tracker) = get_traffic_tracker() {
tracker.update_domain_bytes(&domain, final_sent, final_recv);
}
Ok(())
}