mirror of
https://github.com/zhom/donutbrowser.git
synced 2026-05-08 11:24:53 +02:00
feat: extension management
This commit is contained in:
@@ -1013,6 +1013,347 @@ impl SyncEngine {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Extension sync
|
||||
|
||||
async fn sync_extension(
|
||||
&self,
|
||||
ext_id: &str,
|
||||
app_handle: Option<&tauri::AppHandle>,
|
||||
) -> SyncResult<()> {
|
||||
let local_ext = {
|
||||
let manager = crate::extension_manager::EXTENSION_MANAGER.lock().unwrap();
|
||||
manager.get_extension(ext_id).ok()
|
||||
};
|
||||
|
||||
let remote_key = format!("extensions/{}.json", ext_id);
|
||||
let stat = self.client.stat(&remote_key).await?;
|
||||
|
||||
match (local_ext, stat.exists) {
|
||||
(Some(ext), true) => {
|
||||
let local_updated = ext.last_sync.unwrap_or(0);
|
||||
let remote_updated: DateTime<Utc> = stat
|
||||
.last_modified
|
||||
.as_ref()
|
||||
.and_then(|s| DateTime::parse_from_rfc3339(s).ok())
|
||||
.map(|dt| dt.with_timezone(&Utc))
|
||||
.unwrap_or_else(Utc::now);
|
||||
let remote_ts = remote_updated.timestamp() as u64;
|
||||
|
||||
if remote_ts > local_updated {
|
||||
self.download_extension(ext_id, app_handle).await?;
|
||||
} else if local_updated > remote_ts {
|
||||
self.upload_extension(&ext).await?;
|
||||
}
|
||||
}
|
||||
(Some(ext), false) => {
|
||||
self.upload_extension(&ext).await?;
|
||||
}
|
||||
(None, true) => {
|
||||
self.download_extension(ext_id, app_handle).await?;
|
||||
}
|
||||
(None, false) => {
|
||||
log::debug!("Extension {} not found locally or remotely", ext_id);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn upload_extension(&self, ext: &crate::extension_manager::Extension) -> SyncResult<()> {
|
||||
let now = std::time::SystemTime::now()
|
||||
.duration_since(std::time::UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_secs();
|
||||
|
||||
let mut updated_ext = ext.clone();
|
||||
updated_ext.last_sync = Some(now);
|
||||
|
||||
let json = serde_json::to_string_pretty(&updated_ext)
|
||||
.map_err(|e| SyncError::SerializationError(format!("Failed to serialize extension: {e}")))?;
|
||||
|
||||
let remote_key = format!("extensions/{}.json", ext.id);
|
||||
let presign = self
|
||||
.client
|
||||
.presign_upload(&remote_key, Some("application/json"))
|
||||
.await?;
|
||||
self
|
||||
.client
|
||||
.upload_bytes(&presign.url, json.as_bytes(), Some("application/json"))
|
||||
.await?;
|
||||
|
||||
// Also upload the extension file data
|
||||
let file_path = {
|
||||
let manager = crate::extension_manager::EXTENSION_MANAGER.lock().unwrap();
|
||||
let file_dir = manager.get_file_dir_public(&ext.id);
|
||||
file_dir.join(&ext.file_name)
|
||||
};
|
||||
|
||||
if file_path.exists() {
|
||||
let file_data = fs::read(&file_path).map_err(|e| {
|
||||
SyncError::IoError(format!(
|
||||
"Failed to read extension file {}: {e}",
|
||||
file_path.display()
|
||||
))
|
||||
})?;
|
||||
|
||||
let file_remote_key = format!("extensions/{}/file/{}", ext.id, ext.file_name);
|
||||
let file_presign = self
|
||||
.client
|
||||
.presign_upload(&file_remote_key, Some("application/octet-stream"))
|
||||
.await?;
|
||||
self
|
||||
.client
|
||||
.upload_bytes(
|
||||
&file_presign.url,
|
||||
&file_data,
|
||||
Some("application/octet-stream"),
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
// Update local extension with new last_sync
|
||||
{
|
||||
let manager = crate::extension_manager::EXTENSION_MANAGER.lock().unwrap();
|
||||
if let Err(e) = manager.update_extension_internal(&updated_ext) {
|
||||
log::warn!("Failed to update extension last_sync: {}", e);
|
||||
}
|
||||
}
|
||||
|
||||
log::info!("Extension {} uploaded", ext.id);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn download_extension(
|
||||
&self,
|
||||
ext_id: &str,
|
||||
app_handle: Option<&tauri::AppHandle>,
|
||||
) -> SyncResult<()> {
|
||||
let remote_key = format!("extensions/{}.json", ext_id);
|
||||
let presign = self.client.presign_download(&remote_key).await?;
|
||||
let data = self.client.download_bytes(&presign.url).await?;
|
||||
|
||||
let mut ext: crate::extension_manager::Extension = serde_json::from_slice(&data)
|
||||
.map_err(|e| SyncError::SerializationError(format!("Failed to parse extension JSON: {e}")))?;
|
||||
|
||||
ext.last_sync = Some(
|
||||
std::time::SystemTime::now()
|
||||
.duration_since(std::time::UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_secs(),
|
||||
);
|
||||
ext.sync_enabled = true;
|
||||
|
||||
// Download the extension file
|
||||
let file_remote_key = format!("extensions/{}/file/{}", ext.id, ext.file_name);
|
||||
let file_stat = self.client.stat(&file_remote_key).await?;
|
||||
if file_stat.exists {
|
||||
let file_presign = self.client.presign_download(&file_remote_key).await?;
|
||||
let file_data = self.client.download_bytes(&file_presign.url).await?;
|
||||
|
||||
let manager = crate::extension_manager::EXTENSION_MANAGER.lock().unwrap();
|
||||
let file_dir = manager.get_file_dir_public(&ext.id);
|
||||
drop(manager);
|
||||
|
||||
fs::create_dir_all(&file_dir).map_err(|e| {
|
||||
SyncError::IoError(format!(
|
||||
"Failed to create extension file dir {}: {e}",
|
||||
file_dir.display()
|
||||
))
|
||||
})?;
|
||||
let file_path = file_dir.join(&ext.file_name);
|
||||
fs::write(&file_path, &file_data).map_err(|e| {
|
||||
SyncError::IoError(format!(
|
||||
"Failed to write extension file {}: {e}",
|
||||
file_path.display()
|
||||
))
|
||||
})?;
|
||||
}
|
||||
|
||||
// Save or update local extension
|
||||
{
|
||||
let manager = crate::extension_manager::EXTENSION_MANAGER.lock().unwrap();
|
||||
if let Err(e) = manager.upsert_extension_internal(&ext) {
|
||||
log::warn!("Failed to save downloaded extension: {}", e);
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(_handle) = app_handle {
|
||||
let _ = events::emit("extensions-changed", ());
|
||||
}
|
||||
|
||||
log::info!("Extension {} downloaded", ext_id);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn sync_extension_by_id_with_handle(
|
||||
&self,
|
||||
ext_id: &str,
|
||||
app_handle: &tauri::AppHandle,
|
||||
) -> SyncResult<()> {
|
||||
self.sync_extension(ext_id, Some(app_handle)).await
|
||||
}
|
||||
|
||||
pub async fn delete_extension(&self, ext_id: &str) -> SyncResult<()> {
|
||||
let remote_key = format!("extensions/{}.json", ext_id);
|
||||
let file_prefix = format!("extensions/{}/file/", ext_id);
|
||||
let tombstone_key = format!("tombstones/extensions/{}.json", ext_id);
|
||||
|
||||
// Delete metadata
|
||||
self
|
||||
.client
|
||||
.delete(&remote_key, Some(&tombstone_key))
|
||||
.await?;
|
||||
|
||||
// Delete file data
|
||||
let _ = self.client.delete_prefix(&file_prefix, None).await;
|
||||
|
||||
log::info!("Extension {} deleted from sync", ext_id);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Extension group sync
|
||||
|
||||
async fn sync_extension_group(
|
||||
&self,
|
||||
group_id: &str,
|
||||
app_handle: Option<&tauri::AppHandle>,
|
||||
) -> SyncResult<()> {
|
||||
let local_group = {
|
||||
let manager = crate::extension_manager::EXTENSION_MANAGER.lock().unwrap();
|
||||
manager.get_group(group_id).ok()
|
||||
};
|
||||
|
||||
let remote_key = format!("extension_groups/{}.json", group_id);
|
||||
let stat = self.client.stat(&remote_key).await?;
|
||||
|
||||
match (local_group, stat.exists) {
|
||||
(Some(group), true) => {
|
||||
let local_updated = group.last_sync.unwrap_or(0);
|
||||
let remote_updated: DateTime<Utc> = stat
|
||||
.last_modified
|
||||
.as_ref()
|
||||
.and_then(|s| DateTime::parse_from_rfc3339(s).ok())
|
||||
.map(|dt| dt.with_timezone(&Utc))
|
||||
.unwrap_or_else(Utc::now);
|
||||
let remote_ts = remote_updated.timestamp() as u64;
|
||||
|
||||
if remote_ts > local_updated {
|
||||
self.download_extension_group(group_id, app_handle).await?;
|
||||
} else if local_updated > remote_ts {
|
||||
self.upload_extension_group(&group).await?;
|
||||
}
|
||||
}
|
||||
(Some(group), false) => {
|
||||
self.upload_extension_group(&group).await?;
|
||||
}
|
||||
(None, true) => {
|
||||
self.download_extension_group(group_id, app_handle).await?;
|
||||
}
|
||||
(None, false) => {
|
||||
log::debug!("Extension group {} not found locally or remotely", group_id);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn upload_extension_group(
|
||||
&self,
|
||||
group: &crate::extension_manager::ExtensionGroup,
|
||||
) -> SyncResult<()> {
|
||||
let now = std::time::SystemTime::now()
|
||||
.duration_since(std::time::UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_secs();
|
||||
|
||||
let mut updated_group = group.clone();
|
||||
updated_group.last_sync = Some(now);
|
||||
|
||||
let json = serde_json::to_string_pretty(&updated_group).map_err(|e| {
|
||||
SyncError::SerializationError(format!("Failed to serialize extension group: {e}"))
|
||||
})?;
|
||||
|
||||
let remote_key = format!("extension_groups/{}.json", group.id);
|
||||
let presign = self
|
||||
.client
|
||||
.presign_upload(&remote_key, Some("application/json"))
|
||||
.await?;
|
||||
self
|
||||
.client
|
||||
.upload_bytes(&presign.url, json.as_bytes(), Some("application/json"))
|
||||
.await?;
|
||||
|
||||
// Update local group with new last_sync
|
||||
{
|
||||
let manager = crate::extension_manager::EXTENSION_MANAGER.lock().unwrap();
|
||||
if let Err(e) = manager.update_group_internal(&updated_group) {
|
||||
log::warn!("Failed to update extension group last_sync: {}", e);
|
||||
}
|
||||
}
|
||||
|
||||
log::info!("Extension group {} uploaded", group.id);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn download_extension_group(
|
||||
&self,
|
||||
group_id: &str,
|
||||
app_handle: Option<&tauri::AppHandle>,
|
||||
) -> SyncResult<()> {
|
||||
let remote_key = format!("extension_groups/{}.json", group_id);
|
||||
let presign = self.client.presign_download(&remote_key).await?;
|
||||
let data = self.client.download_bytes(&presign.url).await?;
|
||||
|
||||
let mut group: crate::extension_manager::ExtensionGroup = serde_json::from_slice(&data)
|
||||
.map_err(|e| {
|
||||
SyncError::SerializationError(format!("Failed to parse extension group JSON: {e}"))
|
||||
})?;
|
||||
|
||||
group.last_sync = Some(
|
||||
std::time::SystemTime::now()
|
||||
.duration_since(std::time::UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_secs(),
|
||||
);
|
||||
group.sync_enabled = true;
|
||||
|
||||
// Save or update local group
|
||||
{
|
||||
let manager = crate::extension_manager::EXTENSION_MANAGER.lock().unwrap();
|
||||
if let Err(e) = manager.upsert_group_internal(&group) {
|
||||
log::warn!("Failed to save downloaded extension group: {}", e);
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(_handle) = app_handle {
|
||||
let _ = events::emit("extensions-changed", ());
|
||||
}
|
||||
|
||||
log::info!("Extension group {} downloaded", group_id);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn sync_extension_group_by_id_with_handle(
|
||||
&self,
|
||||
group_id: &str,
|
||||
app_handle: &tauri::AppHandle,
|
||||
) -> SyncResult<()> {
|
||||
self.sync_extension_group(group_id, Some(app_handle)).await
|
||||
}
|
||||
|
||||
pub async fn delete_extension_group(&self, group_id: &str) -> SyncResult<()> {
|
||||
let remote_key = format!("extension_groups/{}.json", group_id);
|
||||
let tombstone_key = format!("tombstones/extension_groups/{}.json", group_id);
|
||||
|
||||
self
|
||||
.client
|
||||
.delete(&remote_key, Some(&tombstone_key))
|
||||
.await?;
|
||||
|
||||
log::info!("Extension group {} deleted from sync", group_id);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Download a profile from S3 if it exists remotely but not locally
|
||||
pub async fn download_profile_if_missing(
|
||||
&self,
|
||||
@@ -2093,6 +2434,8 @@ pub struct UnsyncedEntityCounts {
|
||||
pub proxies: usize,
|
||||
pub groups: usize,
|
||||
pub vpns: usize,
|
||||
pub extensions: usize,
|
||||
pub extension_groups: usize,
|
||||
}
|
||||
|
||||
#[tauri::command]
|
||||
@@ -2121,10 +2464,28 @@ pub fn get_unsynced_entity_counts() -> Result<UnsyncedEntityCounts, String> {
|
||||
configs.iter().filter(|c| !c.sync_enabled).count()
|
||||
};
|
||||
|
||||
let extension_count = {
|
||||
let em = crate::extension_manager::EXTENSION_MANAGER.lock().unwrap();
|
||||
let exts = em
|
||||
.list_extensions()
|
||||
.map_err(|e| format!("Failed to list extensions: {e}"))?;
|
||||
exts.iter().filter(|e| !e.sync_enabled).count()
|
||||
};
|
||||
|
||||
let extension_group_count = {
|
||||
let em = crate::extension_manager::EXTENSION_MANAGER.lock().unwrap();
|
||||
let groups = em
|
||||
.list_groups()
|
||||
.map_err(|e| format!("Failed to list extension groups: {e}"))?;
|
||||
groups.iter().filter(|g| !g.sync_enabled).count()
|
||||
};
|
||||
|
||||
Ok(UnsyncedEntityCounts {
|
||||
proxies: proxy_count,
|
||||
groups: group_count,
|
||||
vpns: vpn_count,
|
||||
extensions: extension_count,
|
||||
extension_groups: extension_group_count,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -2169,5 +2530,147 @@ pub async fn enable_sync_for_all_entities(app_handle: tauri::AppHandle) -> Resul
|
||||
}
|
||||
}
|
||||
|
||||
// Enable sync for all unsynced extensions
|
||||
{
|
||||
let exts = {
|
||||
let em = crate::extension_manager::EXTENSION_MANAGER.lock().unwrap();
|
||||
em.list_extensions()
|
||||
.map_err(|e| format!("Failed to list extensions: {e}"))?
|
||||
};
|
||||
for ext in &exts {
|
||||
if !ext.sync_enabled {
|
||||
set_extension_sync_enabled(app_handle.clone(), ext.id.clone(), true).await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Enable sync for all unsynced extension groups
|
||||
{
|
||||
let groups = {
|
||||
let em = crate::extension_manager::EXTENSION_MANAGER.lock().unwrap();
|
||||
em.list_groups()
|
||||
.map_err(|e| format!("Failed to list extension groups: {e}"))?
|
||||
};
|
||||
for group in &groups {
|
||||
if !group.sync_enabled {
|
||||
set_extension_group_sync_enabled(app_handle.clone(), group.id.clone(), true).await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tauri::command]
|
||||
pub async fn set_extension_sync_enabled(
|
||||
app_handle: tauri::AppHandle,
|
||||
extension_id: String,
|
||||
enabled: bool,
|
||||
) -> Result<(), String> {
|
||||
let ext = {
|
||||
let manager = crate::extension_manager::EXTENSION_MANAGER.lock().unwrap();
|
||||
manager
|
||||
.get_extension(&extension_id)
|
||||
.map_err(|e| format!("Extension with ID '{extension_id}' not found: {e}"))?
|
||||
};
|
||||
|
||||
if enabled {
|
||||
let cloud_logged_in = crate::cloud_auth::CLOUD_AUTH.is_logged_in().await;
|
||||
if !cloud_logged_in {
|
||||
let manager = SettingsManager::instance();
|
||||
let settings = manager
|
||||
.load_settings()
|
||||
.map_err(|e| format!("Failed to load settings: {e}"))?;
|
||||
if settings.sync_server_url.is_none() {
|
||||
return Err(
|
||||
"Sync server not configured. Please configure sync settings first.".to_string(),
|
||||
);
|
||||
}
|
||||
let token = manager.get_sync_token(&app_handle).await.ok().flatten();
|
||||
if token.is_none() {
|
||||
return Err("Sync token not configured. Please configure sync settings first.".to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let mut updated_ext = ext;
|
||||
updated_ext.sync_enabled = enabled;
|
||||
if !enabled {
|
||||
updated_ext.last_sync = None;
|
||||
}
|
||||
|
||||
{
|
||||
let manager = crate::extension_manager::EXTENSION_MANAGER.lock().unwrap();
|
||||
manager
|
||||
.update_extension_internal(&updated_ext)
|
||||
.map_err(|e| format!("Failed to update extension sync: {e}"))?;
|
||||
}
|
||||
|
||||
let _ = events::emit("extensions-changed", ());
|
||||
|
||||
if enabled {
|
||||
if let Some(scheduler) = super::get_global_scheduler() {
|
||||
scheduler.queue_extension_sync(extension_id).await;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tauri::command]
|
||||
pub async fn set_extension_group_sync_enabled(
|
||||
app_handle: tauri::AppHandle,
|
||||
extension_group_id: String,
|
||||
enabled: bool,
|
||||
) -> Result<(), String> {
|
||||
let group = {
|
||||
let manager = crate::extension_manager::EXTENSION_MANAGER.lock().unwrap();
|
||||
manager
|
||||
.get_group(&extension_group_id)
|
||||
.map_err(|e| format!("Extension group with ID '{extension_group_id}' not found: {e}"))?
|
||||
};
|
||||
|
||||
if enabled {
|
||||
let cloud_logged_in = crate::cloud_auth::CLOUD_AUTH.is_logged_in().await;
|
||||
if !cloud_logged_in {
|
||||
let manager = SettingsManager::instance();
|
||||
let settings = manager
|
||||
.load_settings()
|
||||
.map_err(|e| format!("Failed to load settings: {e}"))?;
|
||||
if settings.sync_server_url.is_none() {
|
||||
return Err(
|
||||
"Sync server not configured. Please configure sync settings first.".to_string(),
|
||||
);
|
||||
}
|
||||
let token = manager.get_sync_token(&app_handle).await.ok().flatten();
|
||||
if token.is_none() {
|
||||
return Err("Sync token not configured. Please configure sync settings first.".to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let mut updated_group = group;
|
||||
updated_group.sync_enabled = enabled;
|
||||
if !enabled {
|
||||
updated_group.last_sync = None;
|
||||
}
|
||||
|
||||
{
|
||||
let manager = crate::extension_manager::EXTENSION_MANAGER.lock().unwrap();
|
||||
manager
|
||||
.update_group_internal(&updated_group)
|
||||
.map_err(|e| format!("Failed to update extension group sync: {e}"))?;
|
||||
}
|
||||
|
||||
let _ = events::emit("extensions-changed", ());
|
||||
|
||||
if enabled {
|
||||
if let Some(scheduler) = super::get_global_scheduler() {
|
||||
scheduler
|
||||
.queue_extension_group_sync(extension_group_id)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -13,9 +13,9 @@ pub use engine::{
|
||||
enable_vpn_sync_if_needed, get_unsynced_entity_counts, is_group_in_use_by_synced_profile,
|
||||
is_group_used_by_synced_profile, is_proxy_in_use_by_synced_profile,
|
||||
is_proxy_used_by_synced_profile, is_sync_configured, is_vpn_in_use_by_synced_profile,
|
||||
is_vpn_used_by_synced_profile, request_profile_sync, set_group_sync_enabled,
|
||||
set_profile_sync_mode, set_proxy_sync_enabled, set_vpn_sync_enabled, sync_profile,
|
||||
trigger_sync_for_profile, SyncEngine,
|
||||
is_vpn_used_by_synced_profile, request_profile_sync, set_extension_group_sync_enabled,
|
||||
set_extension_sync_enabled, set_group_sync_enabled, set_profile_sync_mode,
|
||||
set_proxy_sync_enabled, set_vpn_sync_enabled, sync_profile, trigger_sync_for_profile, SyncEngine,
|
||||
};
|
||||
pub use manifest::{compute_diff, generate_manifest, HashCache, ManifestDiff, SyncManifest};
|
||||
pub use scheduler::{get_global_scheduler, set_global_scheduler, SyncScheduler};
|
||||
|
||||
@@ -35,6 +35,8 @@ pub struct SyncScheduler {
|
||||
pending_proxies: Arc<Mutex<HashSet<String>>>,
|
||||
pending_groups: Arc<Mutex<HashSet<String>>>,
|
||||
pending_vpns: Arc<Mutex<HashSet<String>>>,
|
||||
pending_extensions: Arc<Mutex<HashSet<String>>>,
|
||||
pending_extension_groups: Arc<Mutex<HashSet<String>>>,
|
||||
pending_tombstones: Arc<Mutex<Vec<(String, String)>>>,
|
||||
running_profiles: Arc<Mutex<HashSet<String>>>,
|
||||
in_flight_profiles: Arc<Mutex<HashSet<String>>>,
|
||||
@@ -54,6 +56,8 @@ impl SyncScheduler {
|
||||
pending_proxies: Arc::new(Mutex::new(HashSet::new())),
|
||||
pending_groups: Arc::new(Mutex::new(HashSet::new())),
|
||||
pending_vpns: Arc::new(Mutex::new(HashSet::new())),
|
||||
pending_extensions: Arc::new(Mutex::new(HashSet::new())),
|
||||
pending_extension_groups: Arc::new(Mutex::new(HashSet::new())),
|
||||
pending_tombstones: Arc::new(Mutex::new(Vec::new())),
|
||||
running_profiles: Arc::new(Mutex::new(HashSet::new())),
|
||||
in_flight_profiles: Arc::new(Mutex::new(HashSet::new())),
|
||||
@@ -100,6 +104,18 @@ impl SyncScheduler {
|
||||
}
|
||||
drop(pending_vpns);
|
||||
|
||||
let pending_extensions = self.pending_extensions.lock().await;
|
||||
if !pending_extensions.is_empty() {
|
||||
return true;
|
||||
}
|
||||
drop(pending_extensions);
|
||||
|
||||
let pending_extension_groups = self.pending_extension_groups.lock().await;
|
||||
if !pending_extension_groups.is_empty() {
|
||||
return true;
|
||||
}
|
||||
drop(pending_extension_groups);
|
||||
|
||||
let pending_tombstones = self.pending_tombstones.lock().await;
|
||||
if !pending_tombstones.is_empty() {
|
||||
return true;
|
||||
@@ -208,6 +224,16 @@ impl SyncScheduler {
|
||||
pending.insert(group_id);
|
||||
}
|
||||
|
||||
pub async fn queue_extension_sync(&self, extension_id: String) {
|
||||
let mut pending = self.pending_extensions.lock().await;
|
||||
pending.insert(extension_id);
|
||||
}
|
||||
|
||||
pub async fn queue_extension_group_sync(&self, extension_group_id: String) {
|
||||
let mut pending = self.pending_extension_groups.lock().await;
|
||||
pending.insert(extension_group_id);
|
||||
}
|
||||
|
||||
pub async fn queue_tombstone(&self, entity_type: String, entity_id: String) {
|
||||
let mut pending = self.pending_tombstones.lock().await;
|
||||
if !pending
|
||||
@@ -234,7 +260,7 @@ impl SyncScheduler {
|
||||
|
||||
let sync_enabled_profiles: Vec<_> = profiles
|
||||
.into_iter()
|
||||
.filter(|p| p.is_sync_enabled())
|
||||
.filter(|p| p.is_sync_enabled() && !p.is_cross_os())
|
||||
.collect();
|
||||
|
||||
if sync_enabled_profiles.is_empty() {
|
||||
@@ -286,6 +312,8 @@ impl SyncScheduler {
|
||||
SyncWorkItem::Proxy(id) => scheduler.queue_proxy_sync(id).await,
|
||||
SyncWorkItem::Group(id) => scheduler.queue_group_sync(id).await,
|
||||
SyncWorkItem::Vpn(id) => scheduler.queue_vpn_sync(id).await,
|
||||
SyncWorkItem::Extension(id) => scheduler.queue_extension_sync(id).await,
|
||||
SyncWorkItem::ExtensionGroup(id) => scheduler.queue_extension_group_sync(id).await,
|
||||
SyncWorkItem::Tombstone(entity_type, entity_id) => {
|
||||
scheduler.queue_tombstone(entity_type, entity_id).await
|
||||
}
|
||||
@@ -306,6 +334,8 @@ impl SyncScheduler {
|
||||
self.process_pending_proxies(app_handle).await;
|
||||
self.process_pending_groups(app_handle).await;
|
||||
self.process_pending_vpns(app_handle).await;
|
||||
self.process_pending_extensions(app_handle).await;
|
||||
self.process_pending_extension_groups(app_handle).await;
|
||||
self.process_pending_tombstones(app_handle).await;
|
||||
}
|
||||
|
||||
@@ -356,7 +386,7 @@ impl SyncScheduler {
|
||||
profile_manager.list_profiles().ok().and_then(|profiles| {
|
||||
profiles
|
||||
.into_iter()
|
||||
.find(|p| p.id.to_string() == profile_id && p.is_sync_enabled())
|
||||
.find(|p| p.id.to_string() == profile_id && p.is_sync_enabled() && !p.is_cross_os())
|
||||
})
|
||||
};
|
||||
|
||||
@@ -385,6 +415,8 @@ impl SyncScheduler {
|
||||
&& self.pending_proxies.lock().await.is_empty()
|
||||
&& self.pending_groups.lock().await.is_empty()
|
||||
&& self.pending_vpns.lock().await.is_empty()
|
||||
&& self.pending_extensions.lock().await.is_empty()
|
||||
&& self.pending_extension_groups.lock().await.is_empty()
|
||||
};
|
||||
|
||||
match result {
|
||||
@@ -618,6 +650,82 @@ impl SyncScheduler {
|
||||
}
|
||||
}
|
||||
|
||||
async fn process_pending_extensions(&self, app_handle: &tauri::AppHandle) {
|
||||
let extensions_to_sync: Vec<String> = {
|
||||
let mut pending = self.pending_extensions.lock().await;
|
||||
let list: Vec<String> = pending.drain().collect();
|
||||
list
|
||||
};
|
||||
|
||||
if extensions_to_sync.is_empty() {
|
||||
return;
|
||||
}
|
||||
|
||||
match SyncEngine::create_from_settings(app_handle).await {
|
||||
Ok(engine) => {
|
||||
for ext_id in extensions_to_sync {
|
||||
log::info!("Syncing extension {}", ext_id);
|
||||
if let Err(e) = engine
|
||||
.sync_extension_by_id_with_handle(&ext_id, app_handle)
|
||||
.await
|
||||
{
|
||||
log::error!("Failed to sync extension {}: {}", ext_id, e);
|
||||
}
|
||||
}
|
||||
|
||||
if !self.is_sync_in_progress().await {
|
||||
log::debug!("All syncs completed after extension sync, triggering cleanup");
|
||||
let registry =
|
||||
crate::downloaded_browsers_registry::DownloadedBrowsersRegistry::instance();
|
||||
if let Err(e) = registry.cleanup_unused_binaries() {
|
||||
log::warn!("Cleanup after sync failed: {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
log::error!("Failed to create sync engine: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn process_pending_extension_groups(&self, app_handle: &tauri::AppHandle) {
|
||||
let groups_to_sync: Vec<String> = {
|
||||
let mut pending = self.pending_extension_groups.lock().await;
|
||||
let list: Vec<String> = pending.drain().collect();
|
||||
list
|
||||
};
|
||||
|
||||
if groups_to_sync.is_empty() {
|
||||
return;
|
||||
}
|
||||
|
||||
match SyncEngine::create_from_settings(app_handle).await {
|
||||
Ok(engine) => {
|
||||
for group_id in groups_to_sync {
|
||||
log::info!("Syncing extension group {}", group_id);
|
||||
if let Err(e) = engine
|
||||
.sync_extension_group_by_id_with_handle(&group_id, app_handle)
|
||||
.await
|
||||
{
|
||||
log::error!("Failed to sync extension group {}: {}", group_id, e);
|
||||
}
|
||||
}
|
||||
|
||||
if !self.is_sync_in_progress().await {
|
||||
log::debug!("All syncs completed after extension group sync, triggering cleanup");
|
||||
let registry =
|
||||
crate::downloaded_browsers_registry::DownloadedBrowsersRegistry::instance();
|
||||
if let Err(e) = registry.cleanup_unused_binaries() {
|
||||
log::warn!("Cleanup after sync failed: {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
log::error!("Failed to create sync engine: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn process_pending_tombstones(&self, _app_handle: &tauri::AppHandle) {
|
||||
let tombstones: Vec<(String, String)> = {
|
||||
let mut pending = self.pending_tombstones.lock().await;
|
||||
@@ -695,6 +803,32 @@ impl SyncScheduler {
|
||||
}
|
||||
}
|
||||
}
|
||||
"extension" => {
|
||||
let manager = crate::extension_manager::EXTENSION_MANAGER.lock().unwrap();
|
||||
if let Ok(ext) = manager.get_extension(&entity_id) {
|
||||
if ext.sync_enabled {
|
||||
log::info!(
|
||||
"Extension {} was deleted remotely, deleting locally",
|
||||
entity_id
|
||||
);
|
||||
let _ = manager.delete_extension_internal(&entity_id);
|
||||
let _ = events::emit("extensions-changed", ());
|
||||
}
|
||||
}
|
||||
}
|
||||
"extension_group" => {
|
||||
let manager = crate::extension_manager::EXTENSION_MANAGER.lock().unwrap();
|
||||
if let Ok(group) = manager.get_group(&entity_id) {
|
||||
if group.sync_enabled {
|
||||
log::info!(
|
||||
"Extension group {} was deleted remotely, deleting locally",
|
||||
entity_id
|
||||
);
|
||||
let _ = manager.delete_group_internal(&entity_id);
|
||||
let _ = events::emit("extensions-changed", ());
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -24,6 +24,8 @@ pub enum SyncWorkItem {
|
||||
Proxy(String),
|
||||
Group(String),
|
||||
Vpn(String),
|
||||
Extension(String),
|
||||
ExtensionGroup(String),
|
||||
Tombstone(String, String),
|
||||
}
|
||||
|
||||
@@ -235,6 +237,16 @@ impl SyncSubscription {
|
||||
.strip_prefix("vpns/")
|
||||
.and_then(|s| s.strip_suffix(".json"))
|
||||
.map(|s| SyncWorkItem::Vpn(s.to_string()))
|
||||
} else if key.starts_with("extensions/") {
|
||||
key
|
||||
.strip_prefix("extensions/")
|
||||
.and_then(|s| s.strip_suffix(".json"))
|
||||
.map(|s| SyncWorkItem::Extension(s.to_string()))
|
||||
} else if key.starts_with("extension_groups/") {
|
||||
key
|
||||
.strip_prefix("extension_groups/")
|
||||
.and_then(|s| s.strip_suffix(".json"))
|
||||
.map(|s| SyncWorkItem::ExtensionGroup(s.to_string()))
|
||||
} else if key.starts_with("tombstones/") {
|
||||
key.strip_prefix("tombstones/").and_then(|rest| {
|
||||
if rest.starts_with("profiles/") {
|
||||
@@ -257,6 +269,16 @@ impl SyncSubscription {
|
||||
.strip_prefix("vpns/")
|
||||
.and_then(|s| s.strip_suffix(".json"))
|
||||
.map(|id| SyncWorkItem::Tombstone("vpn".to_string(), id.to_string()))
|
||||
} else if rest.starts_with("extensions/") {
|
||||
rest
|
||||
.strip_prefix("extensions/")
|
||||
.and_then(|s| s.strip_suffix(".json"))
|
||||
.map(|id| SyncWorkItem::Tombstone("extension".to_string(), id.to_string()))
|
||||
} else if rest.starts_with("extension_groups/") {
|
||||
rest
|
||||
.strip_prefix("extension_groups/")
|
||||
.and_then(|s| s.strip_suffix(".json"))
|
||||
.map(|id| SyncWorkItem::Tombstone("extension_group".to_string(), id.to_string()))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user