refactor: sync cleanup

This commit is contained in:
zhom
2026-05-10 21:38:24 +04:00
parent 722aaecbbe
commit b5f000849f
5 changed files with 223 additions and 93 deletions
+67 -42
View File
@@ -1479,103 +1479,128 @@ impl McpServer {
.unwrap_or("<none>");
log::info!("[mcp] tools/call name={tool_name} profile_id={profile_id}");
let started = std::time::Instant::now();
let result = self.dispatch_tool_call(tool_name, &arguments).await;
let elapsed_ms = started.elapsed().as_millis();
match &result {
Ok(_) => {
log::info!(
"[mcp] tools/call name={tool_name} profile_id={profile_id} -> ok ({elapsed_ms} ms)"
);
}
Err(e) => {
log::warn!(
"[mcp] tools/call name={tool_name} profile_id={profile_id} -> error code={} msg={:?} ({elapsed_ms} ms)",
e.code,
e.message
);
}
}
result
}
async fn dispatch_tool_call(
&self,
tool_name: &str,
arguments: &serde_json::Value,
) -> Result<serde_json::Value, McpError> {
match tool_name {
"list_profiles" => self.handle_list_profiles().await,
"get_profile" => self.handle_get_profile(&arguments).await,
"get_profile" => self.handle_get_profile(arguments).await,
"run_profile" => {
Self::require_paid_subscription("Browser automation").await?;
self.handle_run_profile(&arguments).await
self.handle_run_profile(arguments).await
}
"kill_profile" => self.handle_kill_profile(&arguments).await,
"create_profile" => self.handle_create_profile(&arguments).await,
"update_profile" => self.handle_update_profile(&arguments).await,
"delete_profile" => self.handle_delete_profile(&arguments).await,
"kill_profile" => self.handle_kill_profile(arguments).await,
"create_profile" => self.handle_create_profile(arguments).await,
"update_profile" => self.handle_update_profile(arguments).await,
"delete_profile" => self.handle_delete_profile(arguments).await,
"list_tags" => self.handle_list_tags().await,
"list_proxies" => self.handle_list_proxies().await,
"get_profile_status" => self.handle_get_profile_status(&arguments).await,
"get_profile_status" => self.handle_get_profile_status(arguments).await,
// Group management
"list_groups" => self.handle_list_groups().await,
"get_group" => self.handle_get_group(&arguments).await,
"create_group" => self.handle_create_group(&arguments).await,
"update_group" => self.handle_update_group(&arguments).await,
"delete_group" => self.handle_delete_group(&arguments).await,
"assign_profiles_to_group" => self.handle_assign_profiles_to_group(&arguments).await,
"get_group" => self.handle_get_group(arguments).await,
"create_group" => self.handle_create_group(arguments).await,
"update_group" => self.handle_update_group(arguments).await,
"delete_group" => self.handle_delete_group(arguments).await,
"assign_profiles_to_group" => self.handle_assign_profiles_to_group(arguments).await,
// Full proxy management
"get_proxy" => self.handle_get_proxy(&arguments).await,
"create_proxy" => self.handle_create_proxy(&arguments).await,
"update_proxy" => self.handle_update_proxy(&arguments).await,
"delete_proxy" => self.handle_delete_proxy(&arguments).await,
"get_proxy" => self.handle_get_proxy(arguments).await,
"create_proxy" => self.handle_create_proxy(arguments).await,
"update_proxy" => self.handle_update_proxy(arguments).await,
"delete_proxy" => self.handle_delete_proxy(arguments).await,
// Proxy import/export
"export_proxies" => self.handle_export_proxies(&arguments).await,
"import_proxies" => self.handle_import_proxies(&arguments).await,
"export_proxies" => self.handle_export_proxies(arguments).await,
"import_proxies" => self.handle_import_proxies(arguments).await,
// VPN management
"import_vpn" => self.handle_import_vpn(&arguments).await,
"import_vpn" => self.handle_import_vpn(arguments).await,
"list_vpn_configs" => self.handle_list_vpn_configs().await,
"delete_vpn" => self.handle_delete_vpn(&arguments).await,
"connect_vpn" => self.handle_connect_vpn(&arguments).await,
"disconnect_vpn" => self.handle_disconnect_vpn(&arguments).await,
"get_vpn_status" => self.handle_get_vpn_status(&arguments).await,
"delete_vpn" => self.handle_delete_vpn(arguments).await,
"connect_vpn" => self.handle_connect_vpn(arguments).await,
"disconnect_vpn" => self.handle_disconnect_vpn(arguments).await,
"get_vpn_status" => self.handle_get_vpn_status(arguments).await,
// Fingerprint management
"get_profile_fingerprint" => self.handle_get_profile_fingerprint(&arguments).await,
"update_profile_fingerprint" => self.handle_update_profile_fingerprint(&arguments).await,
"get_profile_fingerprint" => self.handle_get_profile_fingerprint(arguments).await,
"update_profile_fingerprint" => self.handle_update_profile_fingerprint(arguments).await,
"update_profile_proxy_bypass_rules" => {
self
.handle_update_profile_proxy_bypass_rules(&arguments)
.handle_update_profile_proxy_bypass_rules(arguments)
.await
}
// DNS blocklist management
"update_profile_dns_blocklist" => self.handle_update_profile_dns_blocklist(&arguments).await,
"update_profile_dns_blocklist" => self.handle_update_profile_dns_blocklist(arguments).await,
"get_dns_blocklist_status" => self.handle_get_dns_blocklist_status().await,
// Extension management
"list_extensions" => self.handle_list_extensions().await,
"list_extension_groups" => self.handle_list_extension_groups().await,
"create_extension_group" => self.handle_create_extension_group(&arguments).await,
"delete_extension" => self.handle_delete_extension_mcp(&arguments).await,
"delete_extension_group" => self.handle_delete_extension_group_mcp(&arguments).await,
"create_extension_group" => self.handle_create_extension_group(arguments).await,
"delete_extension" => self.handle_delete_extension_mcp(arguments).await,
"delete_extension_group" => self.handle_delete_extension_group_mcp(arguments).await,
"assign_extension_group_to_profile" => {
self
.handle_assign_extension_group_to_profile(&arguments)
.handle_assign_extension_group_to_profile(arguments)
.await
}
// Team lock tools
"get_team_locks" => self.handle_get_team_locks().await,
"get_team_lock_status" => self.handle_get_team_lock_status(&arguments).await,
"get_team_lock_status" => self.handle_get_team_lock_status(arguments).await,
// Synchronizer tools
"start_sync_session" => {
Self::require_paid_subscription("Synchronizer").await?;
self.handle_start_sync_session(&arguments).await
self.handle_start_sync_session(arguments).await
}
"stop_sync_session" => self.handle_stop_sync_session(&arguments).await,
"stop_sync_session" => self.handle_stop_sync_session(arguments).await,
"get_sync_sessions" => self.handle_get_sync_sessions().await,
"remove_sync_follower" => self.handle_remove_sync_follower(&arguments).await,
"remove_sync_follower" => self.handle_remove_sync_follower(arguments).await,
// Browser interaction tools (require paid subscription)
"navigate" => {
Self::require_paid_subscription("Browser automation").await?;
self.handle_navigate(&arguments).await
self.handle_navigate(arguments).await
}
"screenshot" => {
Self::require_paid_subscription("Browser automation").await?;
self.handle_screenshot(&arguments).await
self.handle_screenshot(arguments).await
}
"evaluate_javascript" => {
Self::require_paid_subscription("Browser automation").await?;
self.handle_evaluate_javascript(&arguments).await
self.handle_evaluate_javascript(arguments).await
}
"click_element" => {
Self::require_paid_subscription("Browser automation").await?;
self.handle_click_element(&arguments).await
self.handle_click_element(arguments).await
}
"type_text" => {
Self::require_paid_subscription("Browser automation").await?;
self.handle_type_text(&arguments).await
self.handle_type_text(arguments).await
}
"get_page_content" => {
Self::require_paid_subscription("Browser automation").await?;
self.handle_get_page_content(&arguments).await
self.handle_get_page_content(arguments).await
}
"get_page_info" => {
Self::require_paid_subscription("Browser automation").await?;
self.handle_get_page_info(&arguments).await
self.handle_get_page_info(arguments).await
}
_ => Err(McpError {
code: -32602,
+30 -8
View File
@@ -645,7 +645,7 @@ impl ProfileManager {
pub fn assign_profiles_to_group(
&self,
app_handle: &tauri::AppHandle,
_app_handle: &tauri::AppHandle,
profile_ids: Vec<String>,
group_id: Option<String>,
) -> Result<(), Box<dyn std::error::Error>> {
@@ -674,10 +674,8 @@ impl ProfileManager {
if profile.is_sync_enabled() {
if let Some(ref new_group_id) = group_id {
let group_id_clone = new_group_id.clone();
let app_handle_clone = app_handle.clone();
tauri::async_runtime::spawn(async move {
let _ =
crate::sync::enable_group_sync_if_needed(&group_id_clone, &app_handle_clone).await;
let _ = crate::sync::enable_group_sync_if_needed(&group_id_clone).await;
if let Some(scheduler) = crate::sync::get_global_scheduler() {
scheduler.queue_group_sync(group_id_clone).await;
}
@@ -1124,7 +1122,7 @@ impl ProfileManager {
pub async fn update_profile_proxy(
&self,
app_handle: tauri::AppHandle,
_app_handle: tauri::AppHandle,
profile_id: &str,
proxy_id: Option<String>,
) -> Result<BrowserProfile, Box<dyn std::error::Error + Send + Sync>> {
@@ -1165,7 +1163,7 @@ impl ProfileManager {
// Auto-enable sync for new proxy if profile has sync enabled
if profile.is_sync_enabled() {
if let Some(ref new_proxy_id) = proxy_id {
let _ = crate::sync::enable_proxy_sync_if_needed(new_proxy_id, &app_handle).await;
let _ = crate::sync::enable_proxy_sync_if_needed(new_proxy_id).await;
if let Some(scheduler) = crate::sync::get_global_scheduler() {
scheduler.queue_proxy_sync(new_proxy_id.clone()).await;
}
@@ -1242,7 +1240,7 @@ impl ProfileManager {
})?;
// Update VPN and clear proxy (mutual exclusion)
profile.vpn_id = vpn_id;
profile.vpn_id = vpn_id.clone();
profile.proxy_id = None;
self
@@ -1251,6 +1249,16 @@ impl ProfileManager {
format!("Failed to save profile: {e}").into()
})?;
// Auto-enable sync for the new VPN if profile has sync enabled.
if profile.is_sync_enabled() {
if let Some(ref new_vpn_id) = vpn_id {
let _ = crate::sync::enable_vpn_sync_if_needed(new_vpn_id).await;
if let Some(scheduler) = crate::sync::get_global_scheduler() {
scheduler.queue_vpn_sync(new_vpn_id.clone()).await;
}
}
}
if let Err(e) = events::emit("profile-updated", &profile) {
log::warn!("Warning: Failed to emit profile update event: {e}");
}
@@ -1275,9 +1283,23 @@ impl ProfileManager {
.find(|p| p.id == profile_uuid)
.ok_or_else(|| format!("Profile with ID '{profile_id}' not found"))?;
profile.extension_group_id = extension_group_id;
profile.extension_group_id = extension_group_id.clone();
self.save_profile(&profile)?;
// Auto-enable sync for the new extension group if profile has sync
// enabled. The helper is sync internally; we fire-and-forget through
// the async runtime so any I/O doesn't block this caller.
if profile.is_sync_enabled() {
if let Some(new_group_id) = extension_group_id {
tauri::async_runtime::spawn(async move {
let _ = crate::sync::enable_extension_group_sync_if_needed(&new_group_id).await;
if let Some(scheduler) = crate::sync::get_global_scheduler() {
scheduler.queue_extension_group_sync(new_group_id).await;
}
});
}
}
if let Err(e) = events::emit("profile-updated", &profile) {
log::warn!("Failed to emit profile update event: {e}");
}
+36
View File
@@ -830,6 +830,42 @@ impl ProxyManager {
Ok(updated_proxy)
}
/// Update the in-memory `sync_enabled` / `last_sync` fields of a stored
/// proxy and persist the change to disk. Returns the updated proxy or
/// `Err` if the proxy isn't found / is cloud-managed.
///
/// This is the canonical write path for sync-state changes — direct
/// `fs::write` from a sync command would leave the in-memory cache
/// (`stored_proxies`) stale, and the next `get_stored_proxies()` would
/// return the old `sync_enabled`, breaking the UI toggle.
pub fn set_stored_proxy_sync_state(
&self,
proxy_id: &str,
sync_enabled: bool,
last_sync: Option<u64>,
) -> Result<StoredProxy, String> {
let updated_proxy = {
let mut stored_proxies = self.stored_proxies.lock().unwrap();
let proxy = stored_proxies
.get_mut(proxy_id)
.ok_or_else(|| format!("Proxy with ID '{proxy_id}' not found"))?;
if proxy.is_cloud_managed {
return Err("Cannot modify sync for a cloud-managed proxy".to_string());
}
proxy.sync_enabled = sync_enabled;
proxy.last_sync = last_sync;
proxy.clone()
};
self
.save_proxy(&updated_proxy)
.map_err(|e| format!("Failed to save proxy: {e}"))?;
Ok(updated_proxy)
}
// Delete a stored proxy
pub fn delete_stored_proxy(
&self,
+82 -36
View File
@@ -2742,10 +2742,7 @@ pub fn is_group_used_by_synced_profile(group_id: &str) -> bool {
}
/// Enable sync for proxy if not already enabled
pub async fn enable_proxy_sync_if_needed(
proxy_id: &str,
_app_handle: &tauri::AppHandle,
) -> Result<(), String> {
pub async fn enable_proxy_sync_if_needed(proxy_id: &str) -> Result<(), String> {
let proxy_manager = &crate::proxy_manager::PROXY_MANAGER;
let proxies = proxy_manager.get_stored_proxies();
let proxy = proxies
@@ -2754,15 +2751,7 @@ pub async fn enable_proxy_sync_if_needed(
.ok_or_else(|| format!("Proxy with ID '{proxy_id}' not found"))?;
if !proxy.sync_enabled {
let mut updated_proxy = proxy.clone();
updated_proxy.sync_enabled = true;
let proxy_file = proxy_manager.get_proxy_file_path(&proxy.id);
let json = serde_json::to_string_pretty(&updated_proxy)
.map_err(|e| format!("Failed to serialize proxy: {e}"))?;
std::fs::write(&proxy_file, &json)
.map_err(|e| format!("Failed to update proxy file {}: {e}", proxy_file.display()))?;
proxy_manager.set_stored_proxy_sync_state(proxy_id, true, proxy.last_sync)?;
let _ = events::emit("stored-proxies-changed", ());
log::info!("Auto-enabled sync for proxy {}", proxy_id);
}
@@ -2783,10 +2772,7 @@ pub fn is_vpn_used_by_synced_profile(vpn_id: &str) -> bool {
}
/// Enable sync for VPN if not already enabled
pub async fn enable_vpn_sync_if_needed(
vpn_id: &str,
_app_handle: &tauri::AppHandle,
) -> Result<(), String> {
pub async fn enable_vpn_sync_if_needed(vpn_id: &str) -> Result<(), String> {
let vpn = {
let storage = crate::vpn::VPN_STORAGE.lock().unwrap();
storage
@@ -2808,10 +2794,7 @@ pub async fn enable_vpn_sync_if_needed(
}
/// Enable sync for group if not already enabled
pub async fn enable_group_sync_if_needed(
group_id: &str,
_app_handle: &tauri::AppHandle,
) -> Result<(), String> {
pub async fn enable_group_sync_if_needed(group_id: &str) -> Result<(), String> {
let group = {
let group_manager = crate::group_manager::GROUP_MANAGER.lock().unwrap();
let groups = group_manager.get_all_groups().unwrap_or_default();
@@ -2840,6 +2823,66 @@ pub async fn enable_group_sync_if_needed(
Ok(())
}
/// Enable sync for extension group (and its member extensions) if not
/// already enabled. Mirrors the proxy/vpn/group helpers — call from any
/// site where a synced profile gains an `extension_group_id`.
pub async fn enable_extension_group_sync_if_needed(extension_group_id: &str) -> Result<(), String> {
let (group_already_synced, extension_ids) = {
let manager = crate::extension_manager::EXTENSION_MANAGER.lock().unwrap();
let group = manager
.get_group(extension_group_id)
.map_err(|e| format!("Extension group with ID '{extension_group_id}' not found: {e}"))?;
(group.sync_enabled, group.extension_ids.clone())
};
if !group_already_synced {
let mut updated_group = {
let manager = crate::extension_manager::EXTENSION_MANAGER.lock().unwrap();
manager
.get_group(extension_group_id)
.map_err(|e| format!("Failed to load extension group: {e}"))?
};
updated_group.sync_enabled = true;
{
let manager = crate::extension_manager::EXTENSION_MANAGER.lock().unwrap();
manager
.update_group_internal(&updated_group)
.map_err(|e| format!("Failed to update extension group sync: {e}"))?;
}
let _ = events::emit("extensions-changed", ());
log::info!(
"Auto-enabled sync for extension group {}",
extension_group_id
);
}
// Cascade to every extension referenced by the group so the other device
// has the actual extension binaries when it pulls the group.
for ext_id in extension_ids {
let already_synced = {
let manager = crate::extension_manager::EXTENSION_MANAGER.lock().unwrap();
manager
.get_extension(&ext_id)
.ok()
.map(|e| e.sync_enabled)
.unwrap_or(true)
};
if !already_synced {
let manager = crate::extension_manager::EXTENSION_MANAGER.lock().unwrap();
if let Ok(mut ext) = manager.get_extension(&ext_id) {
ext.sync_enabled = true;
if let Err(e) = manager.update_extension_internal(&ext) {
log::warn!("Failed to auto-enable sync for extension {}: {e}", ext_id);
} else {
log::info!("Auto-enabled sync for extension {}", ext_id);
}
}
}
}
Ok(())
}
#[tauri::command]
pub async fn set_profile_sync_mode(
app_handle: tauri::AppHandle,
@@ -2968,26 +3011,39 @@ pub async fn set_profile_sync_mode(
.await;
if let Some(ref proxy_id) = profile.proxy_id {
if let Err(e) = enable_proxy_sync_if_needed(proxy_id, &app_handle).await {
if let Err(e) = enable_proxy_sync_if_needed(proxy_id).await {
log::warn!("Failed to enable sync for proxy {}: {}", proxy_id, e);
} else {
scheduler.queue_proxy_sync(proxy_id.clone()).await;
}
}
if let Some(ref group_id) = profile.group_id {
if let Err(e) = enable_group_sync_if_needed(group_id, &app_handle).await {
if let Err(e) = enable_group_sync_if_needed(group_id).await {
log::warn!("Failed to enable sync for group {}: {}", group_id, e);
} else {
scheduler.queue_group_sync(group_id.clone()).await;
}
}
if let Some(ref vpn_id) = profile.vpn_id {
if let Err(e) = enable_vpn_sync_if_needed(vpn_id, &app_handle).await {
if let Err(e) = enable_vpn_sync_if_needed(vpn_id).await {
log::warn!("Failed to enable sync for VPN {}: {}", vpn_id, e);
} else {
scheduler.queue_vpn_sync(vpn_id.clone()).await;
}
}
if let Some(ref ext_group_id) = profile.extension_group_id {
if let Err(e) = enable_extension_group_sync_if_needed(ext_group_id).await {
log::warn!(
"Failed to enable sync for extension group {}: {}",
ext_group_id,
e
);
} else {
scheduler
.queue_extension_group_sync(ext_group_id.clone())
.await;
}
}
} else {
log::warn!("Scheduler not initialized, sync will not start");
}
@@ -3165,18 +3221,8 @@ pub async fn set_proxy_sync_enabled(
}
}
let mut updated_proxy = proxy.clone();
updated_proxy.sync_enabled = enabled;
if !enabled {
updated_proxy.last_sync = None;
}
let proxy_file = proxy_manager.get_proxy_file_path(&proxy.id);
let json = serde_json::to_string_pretty(&updated_proxy)
.map_err(|e| format!("Failed to serialize proxy: {e}"))?;
std::fs::write(&proxy_file, &json)
.map_err(|e| format!("Failed to update proxy file {}: {e}", proxy_file.display()))?;
let new_last_sync = if enabled { proxy.last_sync } else { None };
proxy_manager.set_stored_proxy_sync_state(&proxy_id, enabled, new_last_sync)?;
let _ = events::emit("stored-proxies-changed", ());
+8 -7
View File
@@ -9,13 +9,14 @@ pub mod types;
pub use client::SyncClient;
pub use encryption::{check_has_e2e_password, delete_e2e_password, set_e2e_password};
pub use engine::{
enable_group_sync_if_needed, enable_proxy_sync_if_needed, enable_sync_for_all_entities,
enable_vpn_sync_if_needed, get_unsynced_entity_counts, is_group_in_use_by_synced_profile,
is_group_used_by_synced_profile, is_proxy_in_use_by_synced_profile,
is_proxy_used_by_synced_profile, is_sync_configured, is_vpn_in_use_by_synced_profile,
is_vpn_used_by_synced_profile, request_profile_sync, set_extension_group_sync_enabled,
set_extension_sync_enabled, set_group_sync_enabled, set_profile_sync_mode,
set_proxy_sync_enabled, set_vpn_sync_enabled, sync_profile, trigger_sync_for_profile, SyncEngine,
enable_extension_group_sync_if_needed, enable_group_sync_if_needed, enable_proxy_sync_if_needed,
enable_sync_for_all_entities, enable_vpn_sync_if_needed, get_unsynced_entity_counts,
is_group_in_use_by_synced_profile, is_group_used_by_synced_profile,
is_proxy_in_use_by_synced_profile, is_proxy_used_by_synced_profile, is_sync_configured,
is_vpn_in_use_by_synced_profile, is_vpn_used_by_synced_profile, request_profile_sync,
set_extension_group_sync_enabled, set_extension_sync_enabled, set_group_sync_enabled,
set_profile_sync_mode, set_proxy_sync_enabled, set_vpn_sync_enabled, sync_profile,
trigger_sync_for_profile, SyncEngine,
};
pub use manifest::{compute_diff, generate_manifest, HashCache, ManifestDiff, SyncManifest};
pub use scheduler::{get_global_scheduler, set_global_scheduler, SyncScheduler};