refactor: reduce disk usage for proxy data sharing

This commit is contained in:
zhom
2025-12-11 22:26:00 +04:00
parent 55974d17be
commit cc74589243
3 changed files with 466 additions and 31 deletions
+2 -6
View File
@@ -249,12 +249,8 @@ async fn is_geoip_database_available() -> Result<bool, String> {
#[tauri::command]
async fn get_all_traffic_snapshots() -> Result<Vec<crate::traffic_stats::TrafficSnapshot>, String> {
Ok(
crate::traffic_stats::list_traffic_stats()
.into_iter()
.map(|s| s.to_snapshot())
.collect(),
)
// Use real-time snapshots that merge in-memory data with disk data
Ok(crate::traffic_stats::get_all_traffic_snapshots_realtime())
}
#[tauri::command]
+57 -15
View File
@@ -599,37 +599,79 @@ pub async fn run_proxy_server(config: ProxyConfig) -> Result<(), Box<dyn std::er
);
log::error!("Proxy server entering accept loop - process should stay alive");
// Start a background task to write lightweight session snapshots for real-time updates
// These are much smaller than full stats and can be written frequently (~100 bytes every 2 seconds)
if let Some(tracker) = get_traffic_tracker() {
let tracker_clone = tracker.clone();
tokio::spawn(async move {
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(2));
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
interval.tick().await;
// Write lightweight session snapshot (only current counters, ~100 bytes)
if let Err(e) = tracker_clone.write_session_snapshot() {
log::debug!("Failed to write session snapshot: {}", e);
}
}
});
}
// Start a background task to periodically flush traffic stats to disk
// Use adaptive flush frequency: every 5 seconds when active, every 30 seconds when idle
tokio::spawn(async move {
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(1));
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(5));
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
let mut last_activity_time = std::time::Instant::now();
let mut last_byte_count = 0u64;
let mut last_flush_time = std::time::Instant::now();
let mut current_interval_secs = 5u64;
loop {
interval.tick().await;
if let Some(tracker) = get_traffic_tracker() {
let (sent, recv, requests) = tracker.get_snapshot();
let current_bytes = sent + recv;
let bytes_changed = current_bytes != last_byte_count;
let time_since_activity = last_activity_time.elapsed();
let time_since_flush = last_flush_time.elapsed();
let has_traffic = current_bytes > 0 || requests > 0;
// Always flush if we have traffic, or if bytes changed, or if it's been less than 30s since activity
// This ensures traffic is always persisted, even during active periods
let should_flush =
has_traffic || bytes_changed || time_since_activity < std::time::Duration::from_secs(30);
// Determine flush frequency based on activity
// When active: flush every 5 seconds
// When idle: flush every 30 seconds
let desired_interval_secs =
if has_traffic || time_since_activity < std::time::Duration::from_secs(30) {
5u64
} else {
30u64
};
// Update interval if needed
if desired_interval_secs != current_interval_secs {
current_interval_secs = desired_interval_secs;
interval = tokio::time::interval(tokio::time::Duration::from_secs(desired_interval_secs));
}
// Only flush if enough time has passed since last flush
let flush_interval = std::time::Duration::from_secs(desired_interval_secs);
let should_flush = time_since_flush >= flush_interval;
if should_flush {
if let Err(e) = tracker.flush_to_disk() {
log::error!("Failed to flush traffic stats: {}", e);
} else {
// Update tracking state after successful flush
if has_traffic || bytes_changed {
last_activity_time = std::time::Instant::now();
match tracker.flush_to_disk() {
Ok(Some((sent, recv))) => {
// Successful flush with data
last_flush_time = std::time::Instant::now();
if sent > 0 || recv > 0 {
last_activity_time = std::time::Instant::now();
}
}
Ok(None) => {
// No data to flush - this is normal
last_flush_time = std::time::Instant::now();
}
Err(e) => {
log::error!("Failed to flush traffic stats: {}", e);
// Don't update flush time on error - retry sooner
}
// After flush, bytes are reset to 0, so update last_byte_count
last_byte_count = 0;
}
}
}
+407 -10
View File
@@ -175,6 +175,37 @@ impl TrafficStats {
});
}
/// Prune old data to prevent unbounded growth
/// Keeps only the last 7 days of bandwidth history and domain access history
pub fn prune_old_data(&mut self) {
    const RETENTION_SECONDS: u64 = 7 * 24 * 60 * 60; // 7 days
    let now = current_timestamp();
    let cutoff = now.saturating_sub(RETENTION_SECONDS);

    // Drop history entries that fell out of the retention window.
    self.bandwidth_history.retain(|dp| dp.timestamp >= cutoff);
    self
        .domain_access_history
        .retain(|dp| dp.timestamp >= cutoff);

    // Everything still in the access history is now within the window, so
    // no further timestamp filtering is needed here. Borrow the domain
    // names instead of cloning each one.
    let recent_domains: std::collections::HashSet<&str> = self
        .domain_access_history
        .iter()
        .map(|dp| dp.domain.as_str())
        .collect();

    // Keep domains that were accessed recently OR have high total traffic
    self.domains.retain(|domain, access| {
        recent_domains.contains(domain.as_str())
            || access.last_access >= cutoff
            || (access.bytes_sent + access.bytes_received) > 1_000_000 // Keep domains with >1MB traffic
    });
}
/// Record a request to a domain
pub fn record_request(&mut self, domain: &str, bytes_sent: u64, bytes_received: u64) {
let now = current_timestamp();
@@ -235,6 +266,61 @@ fn current_timestamp() -> u64 {
.as_secs()
}
/// File lock guard for preventing concurrent writes
///
/// Holding this guard keeps the lock-file handle open; the OS-level lock
/// (flock on Unix, LockFileEx region on Windows) is released when the
/// guard is dropped and the handle closes.
struct FileLockGuard {
    // Never read — exists only so the handle (and thus the lock) lives as
    // long as the guard.
    _file: std::fs::File,
}
/// Acquire a file lock for exclusive access
/// On Unix, uses flock; on Windows, uses file handles
fn acquire_file_lock(lock_path: &PathBuf) -> Result<FileLockGuard, Box<dyn std::error::Error>> {
use std::fs::OpenOptions;
let file = OpenOptions::new()
.create(true)
.write(true)
.truncate(false)
.open(lock_path)?;
#[cfg(unix)]
{
use std::os::unix::io::AsRawFd;
let fd = file.as_raw_fd();
unsafe {
if libc::flock(fd, libc::LOCK_EX | libc::LOCK_NB) != 0 {
return Err("Failed to acquire file lock".into());
}
}
}
#[cfg(windows)]
{
use std::os::windows::io::AsRawHandle;
use windows::Win32::Storage::FileSystem::LockFileEx;
use windows::Win32::Storage::FileSystem::LOCKFILE_EXCLUSIVE_LOCK;
use windows::Win32::Storage::FileSystem::LOCKFILE_FAIL_IMMEDIATELY;
let handle = file.as_raw_handle();
unsafe {
let mut overlapped = std::mem::zeroed();
if LockFileEx(
handle,
LOCKFILE_EXCLUSIVE_LOCK | LOCKFILE_FAIL_IMMEDIATELY,
0,
u32::MAX,
u32::MAX,
&mut overlapped,
)
.is_err()
{
return Err("Failed to acquire file lock".into());
}
}
}
Ok(FileLockGuard { _file: file })
}
/// Get the traffic stats storage directory
pub fn get_traffic_stats_dir() -> PathBuf {
let base_dirs = BaseDirs::new().expect("Failed to get base directories");
@@ -432,6 +518,17 @@ pub fn clear_all_traffic_stats() -> Result<(), Box<dyn std::error::Error>> {
Ok(())
}
/// Lightweight session snapshot for real-time updates (written frequently, separate from full stats)
///
/// Persisted as `<storage_key>.session.json` in the traffic stats
/// directory so that other processes can merge a worker's live counters
/// with its disk-flushed stats.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct SessionSnapshot {
    // Proxy instance that wrote the snapshot.
    proxy_id: String,
    // Profile the session belongs to, when one is configured.
    profile_id: Option<String>,
    // Unix timestamp (seconds) at which the snapshot was written.
    timestamp: u64,
    // Counters accumulated in the writing process since its last flush.
    bytes_sent: u64,
    bytes_received: u64,
    requests: u64,
}
/// Live bandwidth tracker for real-time stats collection in the proxy
/// This is designed to be used from within the proxy server
pub struct LiveTrafficTracker {
@@ -444,6 +541,7 @@ pub struct LiveTrafficTracker {
ips: RwLock<Vec<String>>,
#[allow(dead_code)]
session_start: u64,
last_session_write: std::sync::atomic::AtomicU64,
}
impl LiveTrafficTracker {
@@ -457,9 +555,46 @@ impl LiveTrafficTracker {
domain_stats: RwLock::new(HashMap::new()),
ips: RwLock::new(Vec::new()),
session_start: current_timestamp(),
last_session_write: std::sync::atomic::AtomicU64::new(0),
}
}
/// Write a lightweight session snapshot for real-time updates
/// This is much smaller than full stats and can be written frequently
///
/// Throttled to at most one write per second. The snapshot is written to
/// a temp file and renamed into place so readers never observe a partial
/// file.
///
/// # Errors
/// Fails if serialization or any filesystem operation fails.
pub fn write_session_snapshot(&self) -> Result<(), Box<dyn std::error::Error>> {
    let now = current_timestamp();
    let last_write = self.last_session_write.load(Ordering::Relaxed);
    // Only write if at least 1 second has passed (avoid excessive writes)
    if now.saturating_sub(last_write) < 1 {
        return Ok(());
    }

    let snapshot = SessionSnapshot {
        proxy_id: self.proxy_id.clone(),
        profile_id: self.profile_id.clone(),
        timestamp: now,
        bytes_sent: self.bytes_sent.load(Ordering::Relaxed),
        bytes_received: self.bytes_received.load(Ordering::Relaxed),
        requests: self.requests.load(Ordering::Relaxed),
    };

    let storage_key = self
        .profile_id
        .clone()
        .unwrap_or_else(|| self.proxy_id.clone());
    let session_file = get_traffic_stats_dir().join(format!("{}.session.json", storage_key));

    // Write atomically via a per-process temp file. Including the pid in
    // the temp name prevents two proxy processes that share a storage key
    // from renaming each other's half-written temp file into place.
    let temp_file = session_file.with_extension(format!("{}.tmp", std::process::id()));
    let content = serde_json::to_string(&snapshot)?;
    fs::write(&temp_file, content)?;
    fs::rename(&temp_file, &session_file)?;

    self.last_session_write.store(now, Ordering::Relaxed);
    Ok(())
}
pub fn add_bytes_sent(&self, bytes: u64) {
self.bytes_sent.fetch_add(bytes, Ordering::Relaxed);
}
@@ -509,10 +644,120 @@ impl LiveTrafficTracker {
)
}
/// Create a real-time snapshot that merges in-memory data with disk-stored data
/// This provides near real-time updates without waiting for disk flush
pub fn to_realtime_snapshot(&self) -> TrafficSnapshot {
    let now = current_timestamp();
    // Mini chart only needs the last 60 seconds of history.
    let cutoff = now.saturating_sub(60);

    // Counters accumulated since the last disk flush.
    let live_sent = self.bytes_sent.load(Ordering::Relaxed);
    let live_recv = self.bytes_received.load(Ordering::Relaxed);
    let live_requests = self.requests.load(Ordering::Relaxed);

    let storage_key = self
        .profile_id
        .clone()
        .unwrap_or_else(|| self.proxy_id.clone());

    match load_traffic_stats(&storage_key) {
        Some(stats) => {
            // Persisted history, restricted to the chart window.
            let mut recent_bandwidth: Vec<_> = stats
                .bandwidth_history
                .iter()
                .filter(|dp| dp.timestamp >= cutoff)
                .cloned()
                .collect();

            // Fold the not-yet-flushed counters into the current second's
            // data point, creating one if the tail isn't at `now`.
            if live_sent > 0 || live_recv > 0 {
                match recent_bandwidth.last_mut() {
                    Some(dp) if dp.timestamp == now => {
                        dp.bytes_sent += live_sent;
                        dp.bytes_received += live_recv;
                    }
                    _ => recent_bandwidth.push(BandwidthDataPoint {
                        timestamp: now,
                        bytes_sent: live_sent,
                        bytes_received: live_recv,
                    }),
                }
            }

            TrafficSnapshot {
                profile_id: self.profile_id.clone(),
                session_start: stats.session_start,
                last_update: now,
                total_bytes_sent: stats.total_bytes_sent + live_sent,
                total_bytes_received: stats.total_bytes_received + live_recv,
                total_requests: stats.total_requests + live_requests,
                current_bytes_sent: live_sent,
                current_bytes_received: live_recv,
                recent_bandwidth,
            }
        }
        None => {
            // Nothing persisted yet — the snapshot is purely in-memory.
            let recent_bandwidth = if live_sent > 0 || live_recv > 0 {
                vec![BandwidthDataPoint {
                    timestamp: now,
                    bytes_sent: live_sent,
                    bytes_received: live_recv,
                }]
            } else {
                Vec::new()
            };

            TrafficSnapshot {
                profile_id: self.profile_id.clone(),
                session_start: self.session_start,
                last_update: now,
                total_bytes_sent: live_sent,
                total_bytes_received: live_recv,
                total_requests: live_requests,
                current_bytes_sent: live_sent,
                current_bytes_received: live_recv,
                recent_bandwidth,
            }
        }
    }
}
/// Flush current stats to disk and return the delta
pub fn flush_to_disk(&self) -> Result<(u64, u64), Box<dyn std::error::Error>> {
let bytes_sent = self.bytes_sent.swap(0, Ordering::Relaxed);
let bytes_received = self.bytes_received.swap(0, Ordering::Relaxed);
/// Returns None if there's no new data to flush
pub fn flush_to_disk(&self) -> Result<Option<(u64, u64)>, Box<dyn std::error::Error>> {
let bytes_sent = self.bytes_sent.load(Ordering::Relaxed);
let bytes_received = self.bytes_received.load(Ordering::Relaxed);
// Check if there's any new data to flush
let has_domain_updates = {
let domain_map = self.domain_stats.read().ok();
domain_map.is_some_and(|dm| !dm.is_empty())
};
let has_ip_updates = {
let ips = self.ips.read().ok();
ips.is_some_and(|i| !i.is_empty())
};
// Only flush if there's meaningful new data (bytes or domain/IP updates)
if bytes_sent == 0 && bytes_received == 0 && !has_domain_updates && !has_ip_updates {
return Ok(None);
}
// Use profile_id as storage key if available, otherwise fall back to proxy_id
let storage_key = self
@@ -520,6 +765,10 @@ impl LiveTrafficTracker {
.clone()
.unwrap_or_else(|| self.proxy_id.clone());
// Use file locking to prevent concurrent writes from multiple proxy processes
let lock_path = get_traffic_stats_dir().join(format!("{}.lock", storage_key));
let _lock = acquire_file_lock(&lock_path)?;
// Load or create stats using the storage key
let mut stats = load_traffic_stats(&storage_key)
.unwrap_or_else(|| TrafficStats::new(self.proxy_id.clone(), self.profile_id.clone()));
@@ -532,8 +781,15 @@ impl LiveTrafficTracker {
// Update the proxy_id to current session (for debugging/tracking)
stats.proxy_id = self.proxy_id.clone();
// Prune old data before adding new data to keep file size manageable
stats.prune_old_data();
// Reset counters after reading
let sent = self.bytes_sent.swap(0, Ordering::Relaxed);
let received = self.bytes_received.swap(0, Ordering::Relaxed);
// Update bandwidth history
stats.record_bandwidth(bytes_sent, bytes_received);
stats.record_bandwidth(sent, received);
// Update domain stats
if let Ok(mut domain_map) = self.domain_stats.write() {
@@ -551,10 +807,10 @@ impl LiveTrafficTracker {
}
}
// Save to disk
// Save to disk (lock is still held)
save_traffic_stats(&stats)?;
Ok((bytes_sent, bytes_received))
Ok(Some((sent, received)))
}
}
@@ -601,11 +857,36 @@ pub struct FilteredTrafficStats {
/// Get traffic stats for a profile, filtered to a specific time period
/// seconds: number of seconds to include (0 = all time)
/// Merges in-memory data with disk data for real-time updates
pub fn get_traffic_stats_for_period(
profile_id: &str,
seconds: u64,
) -> Option<FilteredTrafficStats> {
let stats = load_traffic_stats(profile_id)?;
// Get in-memory data if available
let in_memory_sent = get_traffic_tracker()
.and_then(|t| {
if t.profile_id.as_deref() == Some(profile_id) {
Some(t.bytes_sent.load(Ordering::Relaxed))
} else {
None
}
})
.unwrap_or(0);
let in_memory_recv = get_traffic_tracker()
.and_then(|t| {
if t.profile_id.as_deref() == Some(profile_id) {
Some(t.bytes_received.load(Ordering::Relaxed))
} else {
None
}
})
.unwrap_or(0);
let mut stats = load_traffic_stats(profile_id)?;
// Merge in-memory counters with disk data for real-time totals
stats.total_bytes_sent += in_memory_sent;
stats.total_bytes_received += in_memory_recv;
let now = current_timestamp();
let cutoff = if seconds == 0 {
@@ -615,14 +896,39 @@ pub fn get_traffic_stats_for_period(
};
// Filter bandwidth history to requested period
let filtered_history: Vec<BandwidthDataPoint> = stats
let mut filtered_history: Vec<BandwidthDataPoint> = stats
.bandwidth_history
.iter()
.filter(|dp| dp.timestamp >= cutoff)
.cloned()
.collect();
// Calculate period totals for bandwidth
// Add current in-memory data point for real-time display
if (seconds == 0 || now.saturating_sub(seconds) <= now)
&& (in_memory_sent > 0 || in_memory_recv > 0)
{
// Check if we already have a data point for this second
if let Some(last) = filtered_history.last_mut() {
if last.timestamp == now {
last.bytes_sent += in_memory_sent;
last.bytes_received += in_memory_recv;
} else {
filtered_history.push(BandwidthDataPoint {
timestamp: now,
bytes_sent: in_memory_sent,
bytes_received: in_memory_recv,
});
}
} else {
filtered_history.push(BandwidthDataPoint {
timestamp: now,
bytes_sent: in_memory_sent,
bytes_received: in_memory_recv,
});
}
}
// Calculate period totals for bandwidth (includes in-memory data)
let period_bytes_sent: u64 = filtered_history.iter().map(|dp| dp.bytes_sent).sum();
let period_bytes_received: u64 = filtered_history.iter().map(|dp| dp.bytes_received).sum();
@@ -664,7 +970,7 @@ pub fn get_traffic_stats_for_period(
Some(FilteredTrafficStats {
profile_id: stats.profile_id,
session_start: stats.session_start,
last_update: stats.last_update,
last_update: now, // Use current time for real-time updates
total_bytes_sent: stats.total_bytes_sent,
total_bytes_received: stats.total_bytes_received,
total_requests: stats.total_requests,
@@ -678,11 +984,102 @@ pub fn get_traffic_stats_for_period(
}
/// Get lightweight traffic snapshot for a profile (for mini charts, only recent 60 seconds)
/// Merges in-memory data with disk data for real-time updates
pub fn get_traffic_snapshot_for_profile(profile_id: &str) -> Option<TrafficSnapshot> {
    // Prefer the live in-process tracker when it belongs to this profile;
    // otherwise fall back to whatever was persisted to disk.
    match get_traffic_tracker() {
        Some(tracker) if tracker.profile_id.as_deref() == Some(profile_id) => {
            Some(tracker.to_realtime_snapshot())
        }
        _ => load_traffic_stats(profile_id).map(|stats| stats.to_snapshot()),
    }
}
/// Load session snapshot from disk (written by proxy worker processes)
///
/// Returns `None` if the file does not exist or cannot be read/parsed.
/// The read itself handles a missing file, so a separate `exists()`
/// check would only add a TOCTOU window against the writer's rename.
fn load_session_snapshot(profile_id: &str) -> Option<SessionSnapshot> {
    let session_file = get_traffic_stats_dir().join(format!("{}.session.json", profile_id));
    let content = fs::read_to_string(&session_file).ok()?;
    serde_json::from_str::<SessionSnapshot>(&content).ok()
}
/// Get all traffic snapshots with real-time data merged
/// This provides near real-time updates by merging session snapshots with disk data
pub fn get_all_traffic_snapshots_realtime() -> Vec<TrafficSnapshot> {
    use std::collections::HashMap;

    // Session files are rewritten every ~2s by live proxies; one much older
    // than that was left behind by a stopped process, and merging its
    // "current" counters would show stale live traffic indefinitely.
    // NOTE(review): 15s threshold assumes the 2s writer cadence — confirm.
    const SESSION_STALE_SECS: u64 = 15;
    let now = current_timestamp();

    // Start with disk-stored stats, keyed by profile id (or proxy id).
    let mut snapshots: HashMap<String, TrafficSnapshot> = list_traffic_stats()
        .into_iter()
        .map(|s| {
            let key = s.profile_id.clone().unwrap_or_else(|| s.proxy_id.clone());
            (key, s.to_snapshot())
        })
        .collect();

    // Merge real-time data from the active tracker (if in same process).
    if let Some(tracker) = get_traffic_tracker() {
        let key = tracker
            .profile_id
            .clone()
            .unwrap_or_else(|| tracker.proxy_id.clone());
        snapshots.insert(key, tracker.to_realtime_snapshot());
    }

    // Merge session snapshots written by proxy worker processes.
    if let Ok(entries) = fs::read_dir(get_traffic_stats_dir()) {
        for entry in entries.flatten() {
            let path = entry.path();
            let Some(file_name) = path.file_name().and_then(|n| n.to_str()) else {
                continue;
            };
            let Some(profile_id) = file_name.strip_suffix(".session.json") else {
                continue;
            };
            let Some(session) = load_session_snapshot(profile_id) else {
                continue;
            };
            // Skip leftovers from proxies that are no longer running.
            if now.saturating_sub(session.timestamp) > SESSION_STALE_SECS {
                continue;
            }

            match snapshots.get_mut(profile_id) {
                Some(snapshot) => {
                    // Totals: take the larger of disk vs session (the session
                    // counters may not have been flushed yet).
                    snapshot.total_bytes_sent = snapshot.total_bytes_sent.max(session.bytes_sent);
                    snapshot.total_bytes_received =
                        snapshot.total_bytes_received.max(session.bytes_received);
                    snapshot.total_requests = snapshot.total_requests.max(session.requests);
                    snapshot.current_bytes_sent = session.bytes_sent;
                    snapshot.current_bytes_received = session.bytes_received;
                    snapshot.last_update = session.timestamp;
                }
                None => {
                    // No disk stats yet — build a snapshot from session data.
                    snapshots.insert(
                        profile_id.to_string(),
                        TrafficSnapshot {
                            profile_id: session.profile_id,
                            session_start: now.saturating_sub(60),
                            last_update: session.timestamp,
                            total_bytes_sent: session.bytes_sent,
                            total_bytes_received: session.bytes_received,
                            total_requests: session.requests,
                            current_bytes_sent: session.bytes_sent,
                            current_bytes_received: session.bytes_received,
                            recent_bandwidth: vec![],
                        },
                    );
                }
            }
        }
    }

    snapshots.into_values().collect()
}
#[cfg(test)]
mod tests {
use super::*;