feat: synchronizer

This commit is contained in:
zhom
2026-03-15 18:00:04 +04:00
parent e72874142b
commit 5bea6a32e0
38 changed files with 3943 additions and 957 deletions
+30 -1
View File
@@ -127,6 +127,14 @@ impl SyncClient {
}
/// List the first page of objects stored under `prefix`.
///
/// Delegates to `list_page` without a continuation token, so only a single
/// page is returned; use `list_all` when every object under the prefix is needed.
pub async fn list(&self, prefix: &str) -> SyncResult<ListResponse> {
self.list_page(prefix, None).await
}
async fn list_page(
&self,
prefix: &str,
continuation_token: Option<String>,
) -> SyncResult<ListResponse> {
let response = self
.client
.post(self.url("list"))
@@ -134,7 +142,7 @@ impl SyncClient {
.json(&ListRequest {
prefix: prefix.to_string(),
max_keys: Some(1000),
continuation_token: None,
continuation_token,
})
.send()
.await
@@ -152,6 +160,27 @@ impl SyncClient {
.map_err(|e| SyncError::SerializationError(e.to_string()))
}
/// Collect every object stored under `prefix`, following continuation tokens
/// page by page until the listing is exhausted.
///
/// # Errors
///
/// Returns the first error produced by `list_page`; objects gathered from
/// earlier pages are discarded in that case.
pub async fn list_all(&self, prefix: &str) -> SyncResult<Vec<ListObject>> {
    let mut collected = Vec::new();
    let mut next_token: Option<String> = None;

    loop {
        let page = self.list_page(prefix, next_token).await?;
        collected.extend(page.objects);

        // Keep going only while the server reports more data AND hands us a
        // token to fetch it with; a truncated page without a token ends the
        // walk as well.
        match (page.is_truncated, page.next_continuation_token) {
            (true, Some(token)) => next_token = Some(token),
            _ => break,
        }
    }

    Ok(collected)
}
pub async fn upload_bytes(
&self,
presigned_url: &str,
+224 -8
View File
@@ -9,7 +9,7 @@ use crate::settings_manager::SettingsManager;
use chrono::{DateTime, Utc};
use std::collections::{HashMap, HashSet};
use std::fs;
use std::path::Path;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Instant;
@@ -49,6 +49,70 @@ fn is_critical_file(path: &str) -> bool {
.any(|pattern| path.contains(pattern))
}
/// Checkpoint all SQLite WAL files in a profile directory.
///
/// When a browser crashes or is killed, SQLite `-wal` files may still hold
/// committed transactions (e.g. cookies, login data) that were never written
/// back into the main database file. Since WAL files are excluded from sync,
/// they must be checkpointed into the main database files before the manifest
/// is generated, otherwise that data is lost.
///
/// This is best-effort: every failure is logged and the remaining databases
/// are still processed. Nothing is returned to the caller.
fn checkpoint_sqlite_wal_files(profile_dir: &Path) {
    /// Recursively collect every file under `dir` whose name ends in `-wal`.
    /// Directories that cannot be read are silently skipped.
    fn find_wal_files(dir: &Path, wal_files: &mut Vec<PathBuf>) {
        let Ok(entries) = fs::read_dir(dir) else {
            return;
        };
        for entry in entries.flatten() {
            let path = entry.path();
            if path.is_dir() {
                find_wal_files(&path, wal_files);
            } else if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
                if name.ends_with("-wal") {
                    wal_files.push(path);
                }
            }
        }
    }

    let mut wal_files = Vec::new();
    find_wal_files(profile_dir, &mut wal_files);

    for wal_path in &wal_files {
        // Only checkpoint non-empty WAL files
        let is_non_empty = fs::metadata(wal_path).map(|m| m.len() > 0).unwrap_or(false);
        if !is_non_empty {
            continue;
        }

        // Derive the main database name by stripping the "-wal" suffix from the
        // file name only. Working on the file name (instead of a lossy string
        // of the whole path) keeps non-UTF-8 parent directories intact and
        // avoids an `unwrap`. A file literally named "-wal" has no database
        // name and is skipped.
        let Some(db_name) = wal_path
            .file_name()
            .and_then(|n| n.to_str())
            .and_then(|n| n.strip_suffix("-wal"))
            .filter(|stem| !stem.is_empty())
        else {
            continue;
        };
        let db_path = wal_path.with_file_name(db_name);
        if !db_path.exists() {
            continue;
        }

        let conn = match rusqlite::Connection::open(&db_path) {
            Ok(conn) => conn,
            Err(e) => {
                log::warn!(
                    "Failed to open DB for WAL checkpoint {}: {}",
                    db_path.display(),
                    e
                );
                continue;
            }
        };

        // `PRAGMA wal_checkpoint` reports its outcome as a result row; the first
        // column ("busy") is non-zero when the checkpoint could not complete.
        // Read it so a blocked checkpoint is not reported as a success.
        let busy = conn.query_row("PRAGMA wal_checkpoint(TRUNCATE)", [], |row| {
            row.get::<_, i64>(0)
        });
        match busy {
            Ok(0) => {
                log::info!("Checkpointed WAL for: {}", db_name);
            }
            Ok(_) => {
                log::warn!(
                    "WAL checkpoint for {} was blocked (database busy); WAL not fully flushed",
                    db_path.display()
                );
            }
            Err(e) => {
                log::warn!("Failed to checkpoint WAL for {}: {}", db_path.display(), e);
            }
        }
    }
}
/// Resume state persisted to disk so interrupted syncs can continue
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
struct SyncResumeState {
@@ -362,6 +426,10 @@ impl SyncEngine {
))
})?;
// Checkpoint any SQLite WAL files to ensure all data is in the main DB
// before we generate the manifest (WAL files are excluded from sync)
checkpoint_sqlite_wal_files(&profile_dir);
// Load or create hash cache
let cache_path = get_cache_path(&profile_dir);
let mut hash_cache = HashCache::load(&cache_path);
@@ -488,9 +556,22 @@ impl SyncEngine {
.upload_profile_metadata(&profile_id, profile, &key_prefix)
.await?;
// If we recovered from an empty local state (downloaded everything from remote),
// regenerate the manifest from the actual files now on disk so we don't
// overwrite the remote manifest with an empty one.
let final_manifest = if local_manifest.files.is_empty() && !diff.files_to_download.is_empty() {
let mut new_cache = HashCache::load(&cache_path);
let mut regenerated = generate_manifest(&profile_id, &profile_dir, &mut new_cache)?;
new_cache.save(&cache_path)?;
regenerated.encrypted = encryption_key.is_some();
regenerated
} else {
let mut m = local_manifest;
m.encrypted = encryption_key.is_some();
m
};
// Upload manifest.json last for atomicity
let mut final_manifest = local_manifest;
final_manifest.encrypted = encryption_key.is_some();
self
.upload_manifest(&profile_id, &final_manifest, &key_prefix)
.await?;
@@ -2165,14 +2246,14 @@ impl SyncEngine {
) -> SyncResult<Vec<String>> {
log::info!("Checking for missing synced profiles...");
// List personal profiles from S3
let list_response = self.client.list("profiles/").await?;
// List all personal profiles from S3 (paginated)
let all_objects = self.client.list_all("profiles/").await?;
let mut downloaded: Vec<String> = Vec::new();
// Extract unique profile IDs with their key prefix
let mut profiles_to_check: HashMap<String, String> = HashMap::new();
for obj in list_response.objects {
for obj in all_objects {
if obj.key.starts_with("profiles/") && obj.key.ends_with("/manifest.json") {
if let Some(profile_id) = obj
.key
@@ -2189,8 +2270,8 @@ impl SyncEngine {
if let Some(team_id) = &auth.user.team_id {
let team_prefix = format!("teams/{}/", team_id);
let team_list_key = format!("{}profiles/", team_prefix);
if let Ok(team_list) = self.client.list(&team_list_key).await {
for obj in team_list.objects {
if let Ok(team_objects) = self.client.list_all(&team_list_key).await {
for obj in team_objects {
if obj.key.starts_with("profiles/") && obj.key.ends_with("/manifest.json") {
if let Some(profile_id) = obj
.key
@@ -3341,3 +3422,138 @@ pub async fn set_extension_group_sync_enabled(
Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Create a WAL-mode SQLite database at `db_path`, run every statement in
    /// `statements`, then leak the connection.
    ///
    /// Auto-checkpointing is disabled and the connection is leaked with
    /// `std::mem::forget` so its destructor never runs; the written data
    /// therefore stays in the `-wal` file, simulating a browser crash.
    fn create_db_with_pending_wal(db_path: &Path, statements: &[&str]) {
        let conn = rusqlite::Connection::open(db_path).unwrap();
        conn.pragma_update(None, "journal_mode", "WAL").unwrap();
        conn.pragma_update(None, "wal_autocheckpoint", "0").unwrap();
        for &sql in statements {
            conn.execute(sql, []).unwrap();
        }
        // Leak the connection to prevent auto-checkpoint on drop
        std::mem::forget(conn);
    }

    /// Size of the file at `path` in bytes, treating a missing file as empty.
    fn file_len_or_zero(path: &Path) -> u64 {
        fs::metadata(path).map(|m| m.len()).unwrap_or(0)
    }

    #[test]
    fn test_checkpoint_sqlite_wal_files() {
        let temp_dir = tempfile::TempDir::new().unwrap();
        let db_path = temp_dir.path().join("test.db");

        create_db_with_pending_wal(
            &db_path,
            &[
                "CREATE TABLE cookies (id INTEGER PRIMARY KEY, value TEXT)",
                "INSERT INTO cookies (value) VALUES ('session_token_123')",
            ],
        );

        // Verify WAL file exists and has data
        let wal_path = temp_dir.path().join("test.db-wal");
        assert!(wal_path.exists(), "WAL file should exist");
        let wal_size = fs::metadata(&wal_path).unwrap().len();
        assert!(wal_size > 0, "WAL file should be non-empty");

        // Run checkpoint
        checkpoint_sqlite_wal_files(temp_dir.path());

        // After checkpoint, WAL should be truncated (empty)
        assert_eq!(
            file_len_or_zero(&wal_path),
            0,
            "WAL should be truncated after checkpoint"
        );

        // Verify data is still accessible from the main database
        let conn = rusqlite::Connection::open(&db_path).unwrap();
        let value: String = conn
            .query_row("SELECT value FROM cookies WHERE id = 1", [], |row| {
                row.get(0)
            })
            .unwrap();
        assert_eq!(value, "session_token_123");
    }

    #[test]
    fn test_checkpoint_handles_missing_db() {
        let temp_dir = tempfile::TempDir::new().unwrap();

        // Create a WAL file without a corresponding database
        let wal_path = temp_dir.path().join("missing.db-wal");
        fs::write(&wal_path, b"fake wal data").unwrap();

        // Should not panic
        checkpoint_sqlite_wal_files(temp_dir.path());

        // The orphaned WAL must be left alone, and no database may be created
        // as a side effect of trying to open one.
        assert_eq!(fs::read(&wal_path).unwrap(), b"fake wal data");
        assert!(
            !temp_dir.path().join("missing.db").exists(),
            "no database should be created for an orphaned WAL"
        );
    }

    #[test]
    fn test_checkpoint_skips_empty_wal() {
        let temp_dir = tempfile::TempDir::new().unwrap();
        let db_path = temp_dir.path().join("test.db");

        // Create a database and close the connection normally, so no WAL data
        // is left pending.
        {
            let conn = rusqlite::Connection::open(&db_path).unwrap();
            conn.pragma_update(None, "journal_mode", "WAL").unwrap();
            conn
                .execute("CREATE TABLE t (id INTEGER PRIMARY KEY)", [])
                .unwrap();
        }

        // Create an empty WAL file
        let wal_path = temp_dir.path().join("test.db-wal");
        fs::write(&wal_path, b"").unwrap();

        // Should skip empty WAL without error
        checkpoint_sqlite_wal_files(temp_dir.path());

        // A skipped WAL is never touched: it is still present and still empty.
        assert!(wal_path.exists(), "empty WAL should be left in place");
        assert_eq!(fs::metadata(&wal_path).unwrap().len(), 0);
    }

    #[test]
    fn test_checkpoint_nested_directories() {
        let temp_dir = tempfile::TempDir::new().unwrap();
        let nested_dir = temp_dir.path().join("profile").join("Default");
        fs::create_dir_all(&nested_dir).unwrap();
        let db_path = nested_dir.join("Cookies");

        // Create a database with WAL data, leak connection to simulate crash
        create_db_with_pending_wal(
            &db_path,
            &[
                "CREATE TABLE cookies (host_key TEXT, name TEXT, value TEXT)",
                "INSERT INTO cookies VALUES ('.example.com', 'session', 'abc')",
            ],
        );

        let wal_path = nested_dir.join("Cookies-wal");
        assert!(wal_path.exists());
        assert!(
            fs::metadata(&wal_path).unwrap().len() > 0,
            "WAL file should be non-empty"
        );

        // Checkpoint from the top-level directory
        checkpoint_sqlite_wal_files(temp_dir.path());

        // Reading the rows back is not enough on its own: SQLite replays a
        // pending WAL when the database is opened, so the query below would
        // succeed even if the nested WAL had never been found. The truncated
        // WAL is what proves the recursive search reached it.
        assert_eq!(
            file_len_or_zero(&wal_path),
            0,
            "nested WAL should be truncated after checkpoint"
        );

        // Verify data is in the main database
        let conn = rusqlite::Connection::open(&db_path).unwrap();
        let count: i64 = conn
            .query_row("SELECT COUNT(*) FROM cookies", [], |row| row.get(0))
            .unwrap();
        assert_eq!(count, 1);
    }
}
+59
View File
@@ -408,6 +408,19 @@ pub fn compute_diff(local: &SyncManifest, remote: Option<&SyncManifest>) -> Mani
let remote_files: HashMap<&str, &ManifestFileEntry> =
remote.files.iter().map(|f| (f.path.as_str(), f)).collect();
// Safety: if local is empty but remote has files, always download from remote.
// This prevents data loss when profile data files are deleted but metadata
// survives — the newly generated manifest would have updated_at=NOW, which
// would appear "newer" and cause all remote files to be deleted.
if local.files.is_empty() && !remote.files.is_empty() {
log::info!(
"Local manifest is empty but remote has {} files — downloading from remote to recover",
remote.files.len()
);
diff.files_to_download = remote.files.clone();
return diff;
}
// Compare timestamps to determine direction
let local_updated = local.updated_at_datetime();
let remote_updated = remote.updated_at_datetime();
@@ -738,4 +751,50 @@ mod tests {
let deserialized: SyncManifest = serde_json::from_str(&serialized).unwrap();
assert!(deserialized.encrypted);
}
#[test]
fn test_compute_diff_empty_local_downloads_from_remote() {
    // Scenario: the local profile lost all of its data files while the remote
    // copy is intact. The freshly generated local manifest carries a "now"
    // timestamp and would look newer than the remote one, yet the diff must
    // still restore from remote instead of deleting the remote files.
    let remote_entry = |path: &str, size, hash: &str| ManifestFileEntry {
        path: path.to_string(),
        size,
        mtime: 1000,
        hash: hash.to_string(),
    };

    let empty_local = SyncManifest {
        version: 1,
        profile_id: "test".to_string(),
        generated_at: Utc::now().to_rfc3339(),
        // NOW — appears newer than remote
        updated_at: Utc::now().to_rfc3339(),
        exclude_globs: vec![],
        files: vec![],
        encrypted: false,
    };

    let old_timestamp = "2024-01-01T00:00:00Z";
    let populated_remote = SyncManifest {
        version: 1,
        profile_id: "test".to_string(),
        generated_at: old_timestamp.to_string(),
        updated_at: old_timestamp.to_string(),
        exclude_globs: vec![],
        files: vec![
            remote_entry("Cookies", 100, "abc"),
            remote_entry("Local State", 200, "def"),
        ],
        encrypted: false,
    };

    let plan = compute_diff(&empty_local, Some(&populated_remote));

    // Every remote file is scheduled for download; nothing is uploaded or
    // deleted on either side.
    assert_eq!(plan.files_to_download.len(), 2);
    assert!(plan.files_to_upload.is_empty());
    assert!(plan.files_to_delete_remote.is_empty());
    assert!(plan.files_to_delete_local.is_empty());
}
}
+101 -86
View File
@@ -396,97 +396,112 @@ impl SyncScheduler {
ready
};
// Mark all profiles as in-flight and filter out duplicates
let mut to_sync = Vec::new();
for profile_id in profiles_to_sync {
// Mark as in-flight to prevent duplicate syncs
{
let mut in_flight = self.in_flight_profiles.lock().await;
if in_flight.contains(&profile_id) {
log::debug!("Profile {} already in-flight, skipping", profile_id);
continue;
}
in_flight.insert(profile_id.clone());
}
log::info!("Executing queued sync for profile {}", profile_id);
let _ = events::emit(
"profile-sync-status",
serde_json::json!({
"profile_id": profile_id,
"status": "syncing"
}),
);
let profile_to_sync = {
let profile_manager = ProfileManager::instance();
profile_manager.list_profiles().ok().and_then(|profiles| {
profiles
.into_iter()
.find(|p| p.id.to_string() == profile_id && p.is_sync_enabled() && !p.is_cross_os())
})
};
let Some(profile) = profile_to_sync else {
// Remove from in-flight
let mut in_flight = self.in_flight_profiles.lock().await;
in_flight.remove(&profile_id);
let mut in_flight = self.in_flight_profiles.lock().await;
if in_flight.contains(&profile_id) {
log::debug!("Profile {} already in-flight, skipping", profile_id);
continue;
};
let result = match SyncEngine::create_from_settings(app_handle).await {
Ok(engine) => engine.sync_profile(app_handle, &profile).await,
Err(e) => {
log::error!("Failed to create sync engine: {}", e);
Err(super::types::SyncError::NotConfigured)
}
};
// Remove from in-flight and check if sync just completed
let sync_just_completed = {
let mut in_flight = self.in_flight_profiles.lock().await;
in_flight.remove(&profile_id);
// If this was the last in-flight profile and there are no pending profiles, sync just completed
in_flight.is_empty()
&& self.pending_profiles.lock().await.is_empty()
&& self.pending_proxies.lock().await.is_empty()
&& self.pending_groups.lock().await.is_empty()
&& self.pending_vpns.lock().await.is_empty()
&& self.pending_extensions.lock().await.is_empty()
&& self.pending_extension_groups.lock().await.is_empty()
};
match result {
Ok(()) => {
log::info!("Profile {} synced successfully", profile_id);
let _ = events::emit(
"profile-sync-status",
serde_json::json!({
"profile_id": profile_id,
"status": "synced"
}),
);
}
Err(e) => {
log::error!("Failed to sync profile {}: {}", profile_id, e);
let _ = events::emit(
"profile-sync-status",
serde_json::json!({
"profile_id": profile_id,
"status": "error",
"error": e.to_string()
}),
);
}
}
in_flight.insert(profile_id.clone());
to_sync.push(profile_id);
}
// Trigger cleanup after sync completes if this was the last profile
if sync_just_completed {
log::debug!("All profile syncs completed, triggering cleanup");
let registry = crate::downloaded_browsers_registry::DownloadedBrowsersRegistry::instance();
if let Err(e) = registry.cleanup_unused_binaries() {
log::warn!("Cleanup after sync failed: {e}");
} else {
log::debug!("Cleanup after sync completed successfully");
// Sync all profiles in parallel
let mut sync_set = tokio::task::JoinSet::new();
for profile_id in to_sync {
let app = app_handle.clone();
let in_flight = self.in_flight_profiles.clone();
sync_set.spawn(async move {
log::info!("Executing queued sync for profile {}", profile_id);
let _ = events::emit(
"profile-sync-status",
serde_json::json!({
"profile_id": profile_id,
"status": "syncing"
}),
);
let profile_to_sync = {
let profile_manager = ProfileManager::instance();
profile_manager.list_profiles().ok().and_then(|profiles| {
profiles
.into_iter()
.find(|p| p.id.to_string() == profile_id && p.is_sync_enabled() && !p.is_cross_os())
})
};
let Some(profile) = profile_to_sync else {
let mut inf = in_flight.lock().await;
inf.remove(&profile_id);
return;
};
let result = match SyncEngine::create_from_settings(&app).await {
Ok(engine) => engine.sync_profile(&app, &profile).await,
Err(e) => {
log::error!("Failed to create sync engine: {}", e);
Err(super::types::SyncError::NotConfigured)
}
};
{
let mut inf = in_flight.lock().await;
inf.remove(&profile_id);
}
match result {
Ok(()) => {
log::info!("Profile {} synced successfully", profile_id);
let _ = events::emit(
"profile-sync-status",
serde_json::json!({
"profile_id": profile_id,
"status": "synced"
}),
);
}
Err(e) => {
log::error!("Failed to sync profile {}: {}", profile_id, e);
let _ = events::emit(
"profile-sync-status",
serde_json::json!({
"profile_id": profile_id,
"status": "error",
"error": e.to_string()
}),
);
}
}
});
}
// Wait for all parallel syncs to finish
while let Some(result) = sync_set.join_next().await {
if let Err(e) = result {
log::error!("Profile sync task panicked: {e}");
}
}
// Trigger cleanup if everything is done
let all_done = {
let in_flight = self.in_flight_profiles.lock().await;
in_flight.is_empty()
&& self.pending_profiles.lock().await.is_empty()
&& self.pending_proxies.lock().await.is_empty()
&& self.pending_groups.lock().await.is_empty()
&& self.pending_vpns.lock().await.is_empty()
&& self.pending_extensions.lock().await.is_empty()
&& self.pending_extension_groups.lock().await.is_empty()
};
if all_done {
log::debug!("All profile syncs completed, triggering cleanup");
let registry = crate::downloaded_browsers_registry::DownloadedBrowsersRegistry::instance();
if let Err(e) = registry.cleanup_unused_binaries() {
log::warn!("Cleanup after sync failed: {e}");
} else {
log::debug!("Cleanup after sync completed successfully");
}
}
}