diff --git a/src-tauri/src/lib.rs b/src-tauri/src/lib.rs index 035e3e7..0d2551f 100644 --- a/src-tauri/src/lib.rs +++ b/src-tauri/src/lib.rs @@ -249,12 +249,8 @@ async fn is_geoip_database_available() -> Result { #[tauri::command] async fn get_all_traffic_snapshots() -> Result, String> { - Ok( - crate::traffic_stats::list_traffic_stats() - .into_iter() - .map(|s| s.to_snapshot()) - .collect(), - ) + // Use real-time snapshots that merge in-memory data with disk data + Ok(crate::traffic_stats::get_all_traffic_snapshots_realtime()) } #[tauri::command] diff --git a/src-tauri/src/proxy_server.rs b/src-tauri/src/proxy_server.rs index 2f15b5f..6e3ce4a 100644 --- a/src-tauri/src/proxy_server.rs +++ b/src-tauri/src/proxy_server.rs @@ -599,37 +599,79 @@ pub async fn run_proxy_server(config: ProxyConfig) -> Result<(), Box 0 || requests > 0; - // Always flush if we have traffic, or if bytes changed, or if it's been less than 30s since activity - // This ensures traffic is always persisted, even during active periods - let should_flush = - has_traffic || bytes_changed || time_since_activity < std::time::Duration::from_secs(30); + // Determine flush frequency based on activity + // When active: flush every 5 seconds + // When idle: flush every 30 seconds + let desired_interval_secs = + if has_traffic || time_since_activity < std::time::Duration::from_secs(30) { + 5u64 + } else { + 30u64 + }; + + // Update interval if needed + if desired_interval_secs != current_interval_secs { + current_interval_secs = desired_interval_secs; + interval = tokio::time::interval(tokio::time::Duration::from_secs(desired_interval_secs)); + } + + // Only flush if enough time has passed since last flush + let flush_interval = std::time::Duration::from_secs(desired_interval_secs); + let should_flush = time_since_flush >= flush_interval; if should_flush { - if let Err(e) = tracker.flush_to_disk() { - log::error!("Failed to flush traffic stats: {}", e); - } else { - // Update tracking state after successful flush - if has_traffic || bytes_changed { - last_activity_time = std::time::Instant::now(); + match tracker.flush_to_disk() { + Ok(Some((sent, recv))) => { + // Successful flush with data + last_flush_time = std::time::Instant::now(); + if sent > 0 || recv > 0 { + last_activity_time = std::time::Instant::now(); + } + } + Ok(None) => { + // No data to flush - this is normal + last_flush_time = std::time::Instant::now(); + } + Err(e) => { + log::error!("Failed to flush traffic stats: {}", e); + // Don't update flush time on error - retry sooner } - // After flush, bytes are reset to 0, so update last_byte_count - last_byte_count = 0; } } } diff --git a/src-tauri/src/traffic_stats.rs b/src-tauri/src/traffic_stats.rs index e9ceb36..bda48be 100644 --- a/src-tauri/src/traffic_stats.rs +++ b/src-tauri/src/traffic_stats.rs @@ -175,6 +175,37 @@ impl TrafficStats { }); } + /// Prune old data to prevent unbounded growth + /// Keeps only the last 7 days of bandwidth history and domain access history + pub fn prune_old_data(&mut self) { + const RETENTION_SECONDS: u64 = 7 * 24 * 60 * 60; // 7 days + let now = current_timestamp(); + let cutoff = now.saturating_sub(RETENTION_SECONDS); + + // Prune bandwidth history + self.bandwidth_history.retain(|dp| dp.timestamp >= cutoff); + + // Prune domain access history + self + .domain_access_history + .retain(|dp| dp.timestamp >= cutoff); + + // Remove domains that haven't been accessed recently and have no recent history + let recent_domains: std::collections::HashSet = self + .domain_access_history + .iter() + .filter(|dp| dp.timestamp >= cutoff) + .map(|dp| dp.domain.clone()) + .collect(); + + // Keep domains that were accessed recently OR have high total traffic + self.domains.retain(|domain, access| { + recent_domains.contains(domain) + || access.last_access >= cutoff + || (access.bytes_sent + access.bytes_received) > 1_000_000 // Keep domains with >1MB traffic + }); + } + /// Record a request to a domain pub fn record_request(&mut self, domain: &str, bytes_sent: u64, bytes_received: u64) { let now = current_timestamp(); @@ -235,6 +266,61 @@ fn current_timestamp() -> u64 { .as_secs() } +/// File lock guard for preventing concurrent writes +struct FileLockGuard { + _file: std::fs::File, +} + +/// Acquire a file lock for exclusive access +/// On Unix, uses flock; on Windows, uses file handles +fn acquire_file_lock(lock_path: &PathBuf) -> Result> { + use std::fs::OpenOptions; + + let file = OpenOptions::new() + .create(true) + .write(true) + .truncate(false) + .open(lock_path)?; + + #[cfg(unix)] + { + use std::os::unix::io::AsRawFd; + let fd = file.as_raw_fd(); + unsafe { + if libc::flock(fd, libc::LOCK_EX | libc::LOCK_NB) != 0 { + return Err("Failed to acquire file lock".into()); + } + } + } + + #[cfg(windows)] + { + use std::os::windows::io::AsRawHandle; + use windows::Win32::Storage::FileSystem::LockFileEx; + use windows::Win32::Storage::FileSystem::LOCKFILE_EXCLUSIVE_LOCK; + use windows::Win32::Storage::FileSystem::LOCKFILE_FAIL_IMMEDIATELY; + + let handle = file.as_raw_handle(); + unsafe { + let mut overlapped = std::mem::zeroed(); + if LockFileEx( + handle, + LOCKFILE_EXCLUSIVE_LOCK | LOCKFILE_FAIL_IMMEDIATELY, + 0, + u32::MAX, + u32::MAX, + &mut overlapped, + ) + .is_err() + { + return Err("Failed to acquire file lock".into()); + } + } + } + + Ok(FileLockGuard { _file: file }) +} + /// Get the traffic stats storage directory pub fn get_traffic_stats_dir() -> PathBuf { let base_dirs = BaseDirs::new().expect("Failed to get base directories"); @@ -432,6 +518,17 @@ pub fn clear_all_traffic_stats() -> Result<(), Box> { Ok(()) } +/// Lightweight session snapshot for real-time updates (written frequently, separate from full stats) +#[derive(Debug, Clone, Serialize, Deserialize)] +struct SessionSnapshot { + proxy_id: String, + profile_id: Option, + timestamp: u64, + bytes_sent: u64, + bytes_received: u64, + requests: u64, +} + /// Live bandwidth tracker for real-time stats collection in the proxy /// This is designed to be used from within the proxy server pub struct LiveTrafficTracker { @@ -444,6 +541,7 @@ pub struct LiveTrafficTracker { ips: RwLock>, #[allow(dead_code)] session_start: u64, + last_session_write: std::sync::atomic::AtomicU64, } impl LiveTrafficTracker { @@ -457,9 +555,46 @@ impl LiveTrafficTracker { domain_stats: RwLock::new(HashMap::new()), ips: RwLock::new(Vec::new()), session_start: current_timestamp(), + last_session_write: std::sync::atomic::AtomicU64::new(0), } } + /// Write a lightweight session snapshot for real-time updates + /// This is much smaller than full stats and can be written frequently + pub fn write_session_snapshot(&self) -> Result<(), Box> { + let now = current_timestamp(); + let last_write = self.last_session_write.load(Ordering::Relaxed); + + // Only write if at least 1 second has passed (avoid excessive writes) + if now.saturating_sub(last_write) < 1 { + return Ok(()); + } + + let snapshot = SessionSnapshot { + proxy_id: self.proxy_id.clone(), + profile_id: self.profile_id.clone(), + timestamp: now, + bytes_sent: self.bytes_sent.load(Ordering::Relaxed), + bytes_received: self.bytes_received.load(Ordering::Relaxed), + requests: self.requests.load(Ordering::Relaxed), + }; + + let storage_key = self + .profile_id + .clone() + .unwrap_or_else(|| self.proxy_id.clone()); + let session_file = get_traffic_stats_dir().join(format!("{}.session.json", storage_key)); + + // Write atomically using a temp file + let temp_file = session_file.with_extension("tmp"); + let content = serde_json::to_string(&snapshot)?; + fs::write(&temp_file, content)?; + fs::rename(&temp_file, &session_file)?; + + self.last_session_write.store(now, Ordering::Relaxed); + Ok(()) + } + pub fn add_bytes_sent(&self, bytes: u64) { self.bytes_sent.fetch_add(bytes, Ordering::Relaxed); } @@ -509,10 +644,120 @@ impl LiveTrafficTracker { ) } + /// Create a real-time snapshot that merges in-memory data with disk-stored data + /// This provides near real-time updates without waiting for disk flush + pub fn to_realtime_snapshot(&self) -> TrafficSnapshot { + let now = current_timestamp(); + let cutoff = now.saturating_sub(60); // Last 60 seconds for mini chart + + // Get in-memory counters (not yet flushed to disk) + let in_memory_sent = self.bytes_sent.load(Ordering::Relaxed); + let in_memory_recv = self.bytes_received.load(Ordering::Relaxed); + let in_memory_requests = self.requests.load(Ordering::Relaxed); + + // Load disk-stored stats + let storage_key = self + .profile_id + .clone() + .unwrap_or_else(|| self.proxy_id.clone()); + let disk_stats = load_traffic_stats(&storage_key); + + if let Some(stats) = disk_stats { + // Merge in-memory data with disk data + let total_sent = stats.total_bytes_sent + in_memory_sent; + let total_recv = stats.total_bytes_received + in_memory_recv; + let total_requests = stats.total_requests + in_memory_requests; + + // Get current bandwidth from in-memory counters (most recent) + // For the chart, we'll use disk data + current in-memory data point + let mut recent_bandwidth = stats + .bandwidth_history + .iter() + .filter(|dp| dp.timestamp >= cutoff) + .cloned() + .collect::>(); + + // Add current second's data if we have in-memory traffic + if in_memory_sent > 0 || in_memory_recv > 0 { + // Check if we already have a data point for this second + if let Some(last) = recent_bandwidth.last_mut() { + if last.timestamp == now { + last.bytes_sent += in_memory_sent; + last.bytes_received += in_memory_recv; + } else { + recent_bandwidth.push(BandwidthDataPoint { + timestamp: now, + bytes_sent: in_memory_sent, + bytes_received: in_memory_recv, + }); + } + } else { + recent_bandwidth.push(BandwidthDataPoint { + timestamp: now, + bytes_sent: in_memory_sent, + bytes_received: in_memory_recv, + }); + } + } + + TrafficSnapshot { + profile_id: self.profile_id.clone(), + session_start: stats.session_start, + last_update: now, + total_bytes_sent: total_sent, + total_bytes_received: total_recv, + total_requests, + current_bytes_sent: in_memory_sent, + current_bytes_received: in_memory_recv, + recent_bandwidth, + } + } else { + // No disk data yet, use only in-memory data + let recent_bandwidth = if in_memory_sent > 0 || in_memory_recv > 0 { + vec![BandwidthDataPoint { + timestamp: now, + bytes_sent: in_memory_sent, + bytes_received: in_memory_recv, + }] + } else { + Vec::new() + }; + + TrafficSnapshot { + profile_id: self.profile_id.clone(), + session_start: self.session_start, + last_update: now, + total_bytes_sent: in_memory_sent, + total_bytes_received: in_memory_recv, + total_requests: in_memory_requests, + current_bytes_sent: in_memory_sent, + current_bytes_received: in_memory_recv, + recent_bandwidth, + } + } + } + /// Flush current stats to disk and return the delta - pub fn flush_to_disk(&self) -> Result<(u64, u64), Box> { - let bytes_sent = self.bytes_sent.swap(0, Ordering::Relaxed); - let bytes_received = self.bytes_received.swap(0, Ordering::Relaxed); + /// Returns None if there's no new data to flush + pub fn flush_to_disk(&self) -> Result, Box> { + let bytes_sent = self.bytes_sent.load(Ordering::Relaxed); + let bytes_received = self.bytes_received.load(Ordering::Relaxed); + + // Check if there's any new data to flush + let has_domain_updates = { + let domain_map = self.domain_stats.read().ok(); + domain_map.is_some_and(|dm| !dm.is_empty()) + }; + + let has_ip_updates = { + let ips = self.ips.read().ok(); + ips.is_some_and(|i| !i.is_empty()) + }; + + // Only flush if there's meaningful new data (bytes or domain/IP updates) + if bytes_sent == 0 && bytes_received == 0 && !has_domain_updates && !has_ip_updates { + return Ok(None); + } // Use profile_id as storage key if available, otherwise fall back to proxy_id let storage_key = self @@ -520,6 +765,10 @@ impl LiveTrafficTracker { .clone() .unwrap_or_else(|| self.proxy_id.clone()); + // Use file locking to prevent concurrent writes from multiple proxy processes + let lock_path = get_traffic_stats_dir().join(format!("{}.lock", storage_key)); + let _lock = acquire_file_lock(&lock_path)?; + // Load or create stats using the storage key let mut stats = load_traffic_stats(&storage_key) .unwrap_or_else(|| TrafficStats::new(self.proxy_id.clone(), self.profile_id.clone())); @@ -532,8 +781,15 @@ impl LiveTrafficTracker { // Update the proxy_id to current session (for debugging/tracking) stats.proxy_id = self.proxy_id.clone(); + // Prune old data before adding new data to keep file size manageable + stats.prune_old_data(); + + // Reset counters after reading + let sent = self.bytes_sent.swap(0, Ordering::Relaxed); + let received = self.bytes_received.swap(0, Ordering::Relaxed); + // Update bandwidth history - stats.record_bandwidth(bytes_sent, bytes_received); + stats.record_bandwidth(sent, received); // Update domain stats if let Ok(mut domain_map) = self.domain_stats.write() { @@ -551,10 +807,10 @@ impl LiveTrafficTracker { } } - // Save to disk + // Save to disk (lock is still held) save_traffic_stats(&stats)?; - Ok((bytes_sent, bytes_received)) + Ok(Some((sent, received))) } } @@ -601,11 +857,36 @@ pub struct FilteredTrafficStats { /// Get traffic stats for a profile, filtered to a specific time period /// seconds: number of seconds to include (0 = all time) +/// Merges in-memory data with disk data for real-time updates pub fn get_traffic_stats_for_period( profile_id: &str, seconds: u64, ) -> Option { - let stats = load_traffic_stats(profile_id)?; + // Get in-memory data if available + let in_memory_sent = get_traffic_tracker() + .and_then(|t| { + if t.profile_id.as_deref() == Some(profile_id) { + Some(t.bytes_sent.load(Ordering::Relaxed)) + } else { + None + } + }) + .unwrap_or(0); + let in_memory_recv = get_traffic_tracker() + .and_then(|t| { + if t.profile_id.as_deref() == Some(profile_id) { + Some(t.bytes_received.load(Ordering::Relaxed)) + } else { + None + } + }) + .unwrap_or(0); + + let mut stats = load_traffic_stats(profile_id)?; + + // Merge in-memory counters with disk data for real-time totals + stats.total_bytes_sent += in_memory_sent; + stats.total_bytes_received += in_memory_recv; let now = current_timestamp(); let cutoff = if seconds == 0 { @@ -615,14 +896,39 @@ pub fn get_traffic_stats_for_period( }; // Filter bandwidth history to requested period - let filtered_history: Vec = stats + let mut filtered_history: Vec = stats .bandwidth_history .iter() .filter(|dp| dp.timestamp >= cutoff) .cloned() .collect(); - // Calculate period totals for bandwidth + // Add current in-memory data point for real-time display + if (seconds == 0 || now.saturating_sub(seconds) <= now) + && (in_memory_sent > 0 || in_memory_recv > 0) + { + // Check if we already have a data point for this second + if let Some(last) = filtered_history.last_mut() { + if last.timestamp == now { + last.bytes_sent += in_memory_sent; + last.bytes_received += in_memory_recv; + } else { + filtered_history.push(BandwidthDataPoint { + timestamp: now, + bytes_sent: in_memory_sent, + bytes_received: in_memory_recv, + }); + } + } else { + filtered_history.push(BandwidthDataPoint { + timestamp: now, + bytes_sent: in_memory_sent, + bytes_received: in_memory_recv, + }); + } + } + + // Calculate period totals for bandwidth (includes in-memory data) let period_bytes_sent: u64 = filtered_history.iter().map(|dp| dp.bytes_sent).sum(); let period_bytes_received: u64 = filtered_history.iter().map(|dp| dp.bytes_received).sum(); @@ -664,7 +970,7 @@ pub fn get_traffic_stats_for_period( Some(FilteredTrafficStats { profile_id: stats.profile_id, session_start: stats.session_start, - last_update: stats.last_update, + last_update: now, // Use current time for real-time updates total_bytes_sent: stats.total_bytes_sent, total_bytes_received: stats.total_bytes_received, total_requests: stats.total_requests, @@ -678,11 +984,102 @@ pub fn get_traffic_stats_for_period( } /// Get lightweight traffic snapshot for a profile (for mini charts, only recent 60 seconds) +/// Merges in-memory data with disk data for real-time updates pub fn get_traffic_snapshot_for_profile(profile_id: &str) -> Option { + // First try to get real-time data from active tracker + if let Some(tracker) = get_traffic_tracker() { + let tracker_profile_id = tracker.profile_id.as_deref(); + if tracker_profile_id == Some(profile_id) { + return Some(tracker.to_realtime_snapshot()); + } + } + + // Fall back to disk data let stats = load_traffic_stats(profile_id)?; Some(stats.to_snapshot()) } +/// Load session snapshot from disk (written by proxy worker processes) +fn load_session_snapshot(profile_id: &str) -> Option { + let session_file = get_traffic_stats_dir().join(format!("{}.session.json", profile_id)); + if !session_file.exists() { + return None; + } + + let content = fs::read_to_string(&session_file).ok()?; + serde_json::from_str::(&content).ok() +} + +/// Get all traffic snapshots with real-time data merged +/// This provides near real-time updates by merging session snapshots with disk data +pub fn get_all_traffic_snapshots_realtime() -> Vec { + use std::collections::HashMap; + + // Start with disk-stored stats + let mut snapshots: HashMap = list_traffic_stats() + .into_iter() + .map(|s| { + let key = s.profile_id.clone().unwrap_or_else(|| s.proxy_id.clone()); + (key, s.to_snapshot()) + }) + .collect(); + + // Try to merge in real-time data from active tracker (if in same process) + if let Some(tracker) = get_traffic_tracker() { + let key = tracker + .profile_id + .clone() + .unwrap_or_else(|| tracker.proxy_id.clone()); + let realtime_snapshot = tracker.to_realtime_snapshot(); + snapshots.insert(key, realtime_snapshot); + } + + // Also merge session snapshots from proxy worker processes + let storage_dir = get_traffic_stats_dir(); + if let Ok(entries) = fs::read_dir(&storage_dir) { + for entry in entries.flatten() { + let path = entry.path(); + if let Some(file_name) = path.file_name().and_then(|n| n.to_str()) { + if file_name.ends_with(".session.json") { + if let Some(profile_id) = file_name.strip_suffix(".session.json") { + if let Some(session) = load_session_snapshot(profile_id) { + // Merge session data with disk snapshot + if let Some(snapshot) = snapshots.get_mut(profile_id) { + // Update totals with session data + snapshot.total_bytes_sent = snapshot.total_bytes_sent.max(session.bytes_sent); + snapshot.total_bytes_received = + snapshot.total_bytes_received.max(session.bytes_received); + snapshot.total_requests = snapshot.total_requests.max(session.requests); + snapshot.current_bytes_sent = session.bytes_sent; + snapshot.current_bytes_received = session.bytes_received; + snapshot.last_update = session.timestamp; + } else { + // Create new snapshot from session data + snapshots.insert( + profile_id.to_string(), + TrafficSnapshot { + profile_id: session.profile_id, + session_start: current_timestamp().saturating_sub(60), + last_update: session.timestamp, + total_bytes_sent: session.bytes_sent, + total_bytes_received: session.bytes_received, + total_requests: session.requests, + current_bytes_sent: session.bytes_sent, + current_bytes_received: session.bytes_received, + recent_bandwidth: vec![], + }, + ); + } + } + } + } + } + } + } + + snapshots.into_values().collect() +} + #[cfg(test)] mod tests { use super::*;