use super::engine::SyncEngine; use super::subscription::SyncWorkItem; use crate::events; use crate::profile::ProfileManager; use std::collections::{HashMap, HashSet}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::{Duration, Instant}; use tokio::sync::mpsc; use tokio::sync::Mutex; use tokio::time::sleep; static GLOBAL_SCHEDULER: std::sync::Mutex>> = std::sync::Mutex::new(None); pub fn get_global_scheduler() -> Option> { GLOBAL_SCHEDULER.lock().ok().and_then(|g| g.clone()) } pub fn set_global_scheduler(scheduler: Arc) { if let Ok(mut g) = GLOBAL_SCHEDULER.lock() { *g = Some(scheduler); } } #[derive(Debug, Clone)] struct ProfileStopTime { #[allow(dead_code)] stopped_at: Instant, queued: bool, } pub struct SyncScheduler { running: Arc, pending_profiles: Arc>>, pending_proxies: Arc>>, pending_groups: Arc>>, pending_vpns: Arc>>, pending_extensions: Arc>>, pending_extension_groups: Arc>>, pending_tombstones: Arc>>, running_profiles: Arc>>, in_flight_profiles: Arc>>, } impl Default for SyncScheduler { fn default() -> Self { Self::new() } } impl SyncScheduler { pub fn new() -> Self { Self { running: Arc::new(AtomicBool::new(false)), pending_profiles: Arc::new(Mutex::new(HashMap::new())), pending_proxies: Arc::new(Mutex::new(HashSet::new())), pending_groups: Arc::new(Mutex::new(HashSet::new())), pending_vpns: Arc::new(Mutex::new(HashSet::new())), pending_extensions: Arc::new(Mutex::new(HashSet::new())), pending_extension_groups: Arc::new(Mutex::new(HashSet::new())), pending_tombstones: Arc::new(Mutex::new(Vec::new())), running_profiles: Arc::new(Mutex::new(HashSet::new())), in_flight_profiles: Arc::new(Mutex::new(HashSet::new())), } } pub fn is_running(&self) -> bool { self.running.load(Ordering::SeqCst) } pub fn stop(&self) { self.running.store(false, Ordering::SeqCst); } /// Check if any sync operation is currently in progress pub async fn is_sync_in_progress(&self) -> bool { let in_flight = self.in_flight_profiles.lock().await; if !in_flight.is_empty() { return true; } drop(in_flight); let pending_profiles = self.pending_profiles.lock().await; if !pending_profiles.is_empty() { return true; } drop(pending_profiles); let pending_proxies = self.pending_proxies.lock().await; if !pending_proxies.is_empty() { return true; } drop(pending_proxies); let pending_groups = self.pending_groups.lock().await; if !pending_groups.is_empty() { return true; } drop(pending_groups); let pending_vpns = self.pending_vpns.lock().await; if !pending_vpns.is_empty() { return true; } drop(pending_vpns); let pending_extensions = self.pending_extensions.lock().await; if !pending_extensions.is_empty() { return true; } drop(pending_extensions); let pending_extension_groups = self.pending_extension_groups.lock().await; if !pending_extension_groups.is_empty() { return true; } drop(pending_extension_groups); let pending_tombstones = self.pending_tombstones.lock().await; if !pending_tombstones.is_empty() { return true; } false } pub async fn mark_profile_running(&self, profile_id: &str) { let mut running = self.running_profiles.lock().await; running.insert(profile_id.to_string()); log::debug!("Marked profile {} as running", profile_id); } pub async fn mark_profile_stopped(&self, profile_id: &str) { let mut running = self.running_profiles.lock().await; running.remove(profile_id); log::debug!("Marked profile {} as stopped", profile_id); let mut pending = self.pending_profiles.lock().await; if pending.contains_key(profile_id) { // Set stopped_at to past so it syncs immediately pending.insert( profile_id.to_string(), ProfileStopTime { stopped_at: Instant::now() - Duration::from_secs(3), queued: true, }, ); log::debug!( "Profile {} has pending sync, will execute immediately", profile_id ); } } pub async fn is_profile_running(&self, profile_id: &str) -> bool { // Check our internal tracking (authoritative — immediately updated by mark_profile_stopped) let running = self.running_profiles.lock().await; if running.contains(profile_id) { return true; } drop(running); // Check if locked by another device (profile in use remotely) if crate::team_lock::PROFILE_LOCK .is_locked_by_another(profile_id) .await { log::debug!( "Profile {} is locked on another device, treating as running", profile_id ); return true; } false } pub async fn queue_profile_sync(&self, profile_id: String) { self.queue_profile_sync_internal(profile_id).await; } pub async fn queue_profile_sync_immediate(&self, profile_id: String) { self.queue_profile_sync_internal(profile_id).await; } async fn queue_profile_sync_internal(&self, profile_id: String) { let is_running = self.is_profile_running(&profile_id).await; let mut pending = self.pending_profiles.lock().await; if is_running { // Profile is running - queue for after it stops pending.insert( profile_id.clone(), ProfileStopTime { stopped_at: Instant::now(), queued: true, }, ); log::debug!( "Profile {} is running, queued sync for after stop", profile_id ); } else { // Profile is not running - sync immediately (set stopped_at to past) pending.insert( profile_id.clone(), ProfileStopTime { stopped_at: Instant::now() - Duration::from_secs(3), queued: true, }, ); log::debug!("Profile {} queued for immediate sync", profile_id); } } pub async fn queue_proxy_sync(&self, proxy_id: String) { let mut pending = self.pending_proxies.lock().await; pending.insert(proxy_id); } pub async fn queue_vpn_sync(&self, vpn_id: String) { let mut pending = self.pending_vpns.lock().await; pending.insert(vpn_id); } pub async fn queue_group_sync(&self, group_id: String) { let mut pending = self.pending_groups.lock().await; pending.insert(group_id); } pub async fn queue_extension_sync(&self, extension_id: String) { let mut pending = self.pending_extensions.lock().await; pending.insert(extension_id); } pub async fn queue_extension_group_sync(&self, extension_group_id: String) { let mut pending = self.pending_extension_groups.lock().await; pending.insert(extension_group_id); } pub async fn queue_tombstone(&self, entity_type: String, entity_id: String) { let mut pending = self.pending_tombstones.lock().await; if !pending .iter() .any(|(t, i)| t == &entity_type && i == &entity_id) { pending.push((entity_type, entity_id)); } } pub async fn sync_all_enabled_profiles(&self, _app_handle: &tauri::AppHandle) { log::info!("Starting initial sync for all enabled profiles..."); let profiles = { let profile_manager = ProfileManager::instance(); match profile_manager.list_profiles() { Ok(p) => p, Err(e) => { log::error!("Failed to list profiles for initial sync: {e}"); return; } } }; let sync_enabled_profiles: Vec<_> = profiles .into_iter() .filter(|p| p.is_sync_enabled()) .collect(); if sync_enabled_profiles.is_empty() { log::debug!("No sync-enabled profiles found"); return; } log::info!( "Found {} sync-enabled profiles, queueing for sync", sync_enabled_profiles.len() ); for profile in sync_enabled_profiles { let profile_id = profile.id.to_string(); let is_running = profile.process_id.is_some(); let is_team_locked = crate::team_lock::TEAM_LOCK .is_locked_by_another(&profile_id) .await; let should_wait = is_running || is_team_locked; // Track running state in the scheduler if is_running { self.mark_profile_running(&profile_id).await; } if should_wait { log::info!( "Profile '{}' is {} — will sync after it becomes available", profile.name, if is_running { "running locally" } else { "locked by a team member" } ); } // Emit initial status let _ = events::emit( "profile-sync-status", serde_json::json!({ "profile_id": profile_id, "status": if should_wait { "waiting" } else { "syncing" } }), ); // Queue for sync — running profiles will be deferred by the scheduler self.queue_profile_sync_immediate(profile_id).await; } } pub async fn start( self: Arc, app_handle: tauri::AppHandle, mut work_rx: mpsc::UnboundedReceiver, ) { if self.running.swap(true, Ordering::SeqCst) { return; } let scheduler = self.clone(); let app_handle_clone = app_handle.clone(); tokio::spawn(async move { while scheduler.running.load(Ordering::SeqCst) { tokio::select! { Some(work_item) = work_rx.recv() => { match work_item { SyncWorkItem::Profile(id) => scheduler.queue_profile_sync(id).await, SyncWorkItem::Proxy(id) => scheduler.queue_proxy_sync(id).await, SyncWorkItem::Group(id) => scheduler.queue_group_sync(id).await, SyncWorkItem::Vpn(id) => scheduler.queue_vpn_sync(id).await, SyncWorkItem::Extension(id) => scheduler.queue_extension_sync(id).await, SyncWorkItem::ExtensionGroup(id) => scheduler.queue_extension_group_sync(id).await, SyncWorkItem::Tombstone(entity_type, entity_id) => { scheduler.queue_tombstone(entity_type, entity_id).await } } } _ = sleep(Duration::from_millis(2000)) => { scheduler.process_pending(&app_handle_clone).await; } } } log::info!("Sync scheduler stopped"); }); } async fn process_pending(&self, app_handle: &tauri::AppHandle) { self.process_pending_profiles(app_handle).await; self.process_pending_proxies(app_handle).await; self.process_pending_groups(app_handle).await; self.process_pending_vpns(app_handle).await; self.process_pending_extensions(app_handle).await; self.process_pending_extension_groups(app_handle).await; self.process_pending_tombstones(app_handle).await; } async fn process_pending_profiles(&self, app_handle: &tauri::AppHandle) { let profiles_to_sync: Vec = { let mut pending = self.pending_profiles.lock().await; let running = self.running_profiles.lock().await; let in_flight = self.in_flight_profiles.lock().await; // Sync immediately if not running and not in-flight (no delay check) let ready: Vec = pending .iter() .filter(|(id, stop_time)| { !running.contains(*id) && !in_flight.contains(*id) && stop_time.queued }) .map(|(id, _)| id.clone()) .collect(); for id in &ready { pending.remove(id); } ready }; // Mark all profiles as in-flight and filter out duplicates let mut to_sync = Vec::new(); for profile_id in profiles_to_sync { let mut in_flight = self.in_flight_profiles.lock().await; if in_flight.contains(&profile_id) { log::debug!("Profile {} already in-flight, skipping", profile_id); continue; } in_flight.insert(profile_id.clone()); to_sync.push(profile_id); } // Sync all profiles in parallel let mut sync_set = tokio::task::JoinSet::new(); for profile_id in to_sync { let app = app_handle.clone(); let in_flight = self.in_flight_profiles.clone(); sync_set.spawn(async move { log::info!("Executing queued sync for profile {}", profile_id); let _ = events::emit( "profile-sync-status", serde_json::json!({ "profile_id": profile_id, "status": "syncing" }), ); let profile_to_sync = { let profile_manager = ProfileManager::instance(); profile_manager.list_profiles().ok().and_then(|profiles| { profiles .into_iter() .find(|p| p.id.to_string() == profile_id && p.is_sync_enabled()) }) }; let Some(profile) = profile_to_sync else { let mut inf = in_flight.lock().await; inf.remove(&profile_id); return; }; let result = match SyncEngine::create_from_settings(&app).await { Ok(engine) => engine.sync_profile(&app, &profile).await, Err(e) => { log::error!("Failed to create sync engine: {}", e); Err(super::types::SyncError::NotConfigured) } }; { let mut inf = in_flight.lock().await; inf.remove(&profile_id); } match result { Ok(()) => { log::info!("Profile {} synced successfully", profile_id); let _ = events::emit( "profile-sync-status", serde_json::json!({ "profile_id": profile_id, "status": "synced" }), ); } Err(e) => { log::error!("Failed to sync profile {}: {}", profile_id, e); let _ = events::emit( "profile-sync-status", serde_json::json!({ "profile_id": profile_id, "status": "error", "error": e.to_string() }), ); } } }); } // Wait for all parallel syncs to finish (only if we actually spawned any) if !sync_set.is_empty() { while let Some(result) = sync_set.join_next().await { if let Err(e) = result { log::error!("Profile sync task panicked: {e}"); } } } } async fn process_pending_proxies(&self, app_handle: &tauri::AppHandle) { let proxies_to_sync: Vec = { let mut pending = self.pending_proxies.lock().await; let list: Vec = pending.drain().collect(); list }; if proxies_to_sync.is_empty() { return; } match SyncEngine::create_from_settings(app_handle).await { Ok(engine) => { for proxy_id in proxies_to_sync { log::info!("Syncing proxy {}", proxy_id); let _ = events::emit( "proxy-sync-status", serde_json::json!({ "id": proxy_id, "status": "syncing" }), ); match engine .sync_proxy_by_id_with_handle(&proxy_id, app_handle) .await { Ok(()) => { let _ = events::emit( "proxy-sync-status", serde_json::json!({ "id": proxy_id, "status": "synced" }), ); } Err(e) => { log::error!("Failed to sync proxy {}: {}", proxy_id, e); let _ = events::emit( "proxy-sync-status", serde_json::json!({ "id": proxy_id, "status": "error", "error": e.to_string() }), ); } } } // Check if all sync work is complete after proxies finish } Err(e) => { log::error!("Failed to create sync engine: {}", e); } } } async fn process_pending_groups(&self, app_handle: &tauri::AppHandle) { let groups_to_sync: Vec = { let mut pending = self.pending_groups.lock().await; let list: Vec = pending.drain().collect(); list }; if groups_to_sync.is_empty() { return; } match SyncEngine::create_from_settings(app_handle).await { Ok(engine) => { for group_id in groups_to_sync { log::info!("Syncing group {}", group_id); let _ = events::emit( "group-sync-status", serde_json::json!({ "id": group_id, "status": "syncing" }), ); match engine .sync_group_by_id_with_handle(&group_id, app_handle) .await { Ok(()) => { let _ = events::emit( "group-sync-status", serde_json::json!({ "id": group_id, "status": "synced" }), ); } Err(e) => { log::error!("Failed to sync group {}: {}", group_id, e); let _ = events::emit( "group-sync-status", serde_json::json!({ "id": group_id, "status": "error", "error": e.to_string() }), ); } } } // Check if all sync work is complete after groups finish } Err(e) => { log::error!("Failed to create sync engine: {}", e); } } } async fn process_pending_vpns(&self, app_handle: &tauri::AppHandle) { let vpns_to_sync: Vec = { let mut pending = self.pending_vpns.lock().await; let list: Vec = pending.drain().collect(); list }; if vpns_to_sync.is_empty() { return; } match SyncEngine::create_from_settings(app_handle).await { Ok(engine) => { for vpn_id in vpns_to_sync { log::info!("Syncing VPN {}", vpn_id); let _ = events::emit( "vpn-sync-status", serde_json::json!({ "id": vpn_id, "status": "syncing" }), ); match engine.sync_vpn_by_id_with_handle(&vpn_id, app_handle).await { Ok(()) => { let _ = events::emit( "vpn-sync-status", serde_json::json!({ "id": vpn_id, "status": "synced" }), ); } Err(e) => { log::error!("Failed to sync VPN {}: {}", vpn_id, e); let _ = events::emit( "vpn-sync-status", serde_json::json!({ "id": vpn_id, "status": "error", "error": e.to_string() }), ); } } } } Err(e) => { log::error!("Failed to create sync engine: {}", e); } } } async fn process_pending_extensions(&self, app_handle: &tauri::AppHandle) { let extensions_to_sync: Vec = { let mut pending = self.pending_extensions.lock().await; let list: Vec = pending.drain().collect(); list }; if extensions_to_sync.is_empty() { return; } match SyncEngine::create_from_settings(app_handle).await { Ok(engine) => { for ext_id in extensions_to_sync { log::info!("Syncing extension {}", ext_id); if let Err(e) = engine .sync_extension_by_id_with_handle(&ext_id, app_handle) .await { log::error!("Failed to sync extension {}: {}", ext_id, e); } } } Err(e) => { log::error!("Failed to create sync engine: {}", e); } } } async fn process_pending_extension_groups(&self, app_handle: &tauri::AppHandle) { let groups_to_sync: Vec = { let mut pending = self.pending_extension_groups.lock().await; let list: Vec = pending.drain().collect(); list }; if groups_to_sync.is_empty() { return; } match SyncEngine::create_from_settings(app_handle).await { Ok(engine) => { for group_id in groups_to_sync { log::info!("Syncing extension group {}", group_id); if let Err(e) = engine .sync_extension_group_by_id_with_handle(&group_id, app_handle) .await { log::error!("Failed to sync extension group {}: {}", group_id, e); } } } Err(e) => { log::error!("Failed to create sync engine: {}", e); } } } async fn process_pending_tombstones(&self, _app_handle: &tauri::AppHandle) { let tombstones: Vec<(String, String)> = { let mut pending = self.pending_tombstones.lock().await; std::mem::take(&mut *pending) }; if tombstones.is_empty() { return; } for (entity_type, entity_id) in tombstones { log::info!("Processing tombstone for {} {}", entity_type, entity_id); match entity_type.as_str() { "profile" => { let profile_manager = ProfileManager::instance(); let has_profile = { if let Ok(profiles) = profile_manager.list_profiles() { let profile_uuid = uuid::Uuid::parse_str(&entity_id).ok(); profile_uuid.is_some_and(|uuid| profiles.iter().any(|p| p.id == uuid)) } else { false } }; if has_profile { log::info!( "Profile {} was deleted remotely, deleting locally", entity_id ); if let Err(e) = profile_manager.delete_profile_local_only(&entity_id) { log::warn!("Failed to delete tombstoned profile {}: {}", entity_id, e); } } } "proxy" => { let proxy_manager = &crate::proxy_manager::PROXY_MANAGER; let proxies = proxy_manager.get_stored_proxies(); if let Some(proxy) = proxies.iter().find(|p| p.id == entity_id) { if proxy.sync_enabled { log::info!("Proxy {} was deleted remotely, deleting locally", entity_id); let proxy_file = proxy_manager.get_proxy_file_path(&entity_id); if proxy_file.exists() { let _ = std::fs::remove_file(&proxy_file); } proxy_manager.remove_from_memory(&entity_id); let _ = events::emit("stored-proxies-changed", ()); } } } "group" => { let group_manager = crate::group_manager::GROUP_MANAGER.lock().unwrap(); let groups = group_manager.get_all_groups().unwrap_or_default(); if let Some(group) = groups.iter().find(|g| g.id == entity_id) { if group.sync_enabled { log::info!("Group {} was deleted remotely, deleting locally", entity_id); let _ = group_manager.delete_group_internal(&entity_id); let _ = events::emit("groups-changed", ()); } } } "vpn" => { let storage = crate::vpn::VPN_STORAGE.lock().unwrap(); if let Ok(vpn) = storage.load_config(&entity_id) { if vpn.sync_enabled { log::info!("VPN {} was deleted remotely, deleting locally", entity_id); let _ = storage.delete_config(&entity_id); let _ = events::emit("vpn-configs-changed", ()); } } } "extension" => { let manager = crate::extension_manager::EXTENSION_MANAGER.lock().unwrap(); if let Ok(ext) = manager.get_extension(&entity_id) { if ext.sync_enabled { log::info!( "Extension {} was deleted remotely, deleting locally", entity_id ); let _ = manager.delete_extension_internal(&entity_id); let _ = events::emit("extensions-changed", ()); } } } "extension_group" => { let manager = crate::extension_manager::EXTENSION_MANAGER.lock().unwrap(); if let Ok(group) = manager.get_group(&entity_id) { if group.sync_enabled { log::info!( "Extension group {} was deleted remotely, deleting locally", entity_id ); let _ = manager.delete_group_internal(&entity_id); let _ = events::emit("extensions-changed", ()); } } } _ => {} } } } }