mirror of
https://github.com/zhom/donutbrowser.git
synced 2026-06-08 07:53:57 +02:00
refactor: cleanup
This commit is contained in:
+609
-70
@@ -7,11 +7,188 @@ use crate::profile::types::{BrowserProfile, SyncMode};
|
||||
use crate::profile::ProfileManager;
|
||||
use crate::settings_manager::SettingsManager;
|
||||
use chrono::{DateTime, Utc};
|
||||
use std::collections::HashMap;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::fs;
|
||||
use std::path::Path;
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::Semaphore;
|
||||
use std::time::Instant;
|
||||
use tokio::sync::{Mutex as TokioMutex, Semaphore};
|
||||
|
||||
/// Upload/download concurrency limit
|
||||
const SYNC_CONCURRENCY: usize = 32;
|
||||
|
||||
/// Max retries for individual file uploads/downloads
|
||||
const MAX_FILE_RETRIES: u32 = 3;
|
||||
|
||||
/// Critical file patterns — if any of these fail to upload/download, the sync is aborted.
|
||||
const CRITICAL_FILE_PATTERNS: &[&str] = &[
|
||||
"Cookies",
|
||||
"Login Data",
|
||||
"Local Storage",
|
||||
"Local State",
|
||||
"Preferences",
|
||||
"Secure Preferences",
|
||||
"Web Data",
|
||||
"Extension Cookies",
|
||||
// Firefox/Camoufox equivalents
|
||||
"cookies.sqlite",
|
||||
"key4.db",
|
||||
"logins.json",
|
||||
"cert9.db",
|
||||
"places.sqlite",
|
||||
"formhistory.sqlite",
|
||||
"permissions.sqlite",
|
||||
"prefs.js",
|
||||
"storage.sqlite",
|
||||
];
|
||||
|
||||
fn is_critical_file(path: &str) -> bool {
|
||||
CRITICAL_FILE_PATTERNS
|
||||
.iter()
|
||||
.any(|pattern| path.contains(pattern))
|
||||
}
|
||||
|
||||
/// Resume state persisted to disk so interrupted syncs can continue
|
||||
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
|
||||
struct SyncResumeState {
|
||||
profile_id: String,
|
||||
direction: String,
|
||||
started_at: String,
|
||||
completed_files: HashSet<String>,
|
||||
}
|
||||
|
||||
impl SyncResumeState {
|
||||
fn path(profile_dir: &Path) -> std::path::PathBuf {
|
||||
profile_dir.join(".donut-sync").join("resume-state.json")
|
||||
}
|
||||
|
||||
fn load(profile_dir: &Path) -> Option<Self> {
|
||||
let path = Self::path(profile_dir);
|
||||
let content = fs::read_to_string(&path).ok()?;
|
||||
let state: Self = serde_json::from_str(&content).ok()?;
|
||||
// Discard if older than 12 hours (presigned URLs expire in 1h but files may still be there)
|
||||
if let Ok(started) = DateTime::parse_from_rfc3339(&state.started_at) {
|
||||
let age = Utc::now() - started.with_timezone(&Utc);
|
||||
if age.num_hours() > 12 {
|
||||
let _ = fs::remove_file(&path);
|
||||
return None;
|
||||
}
|
||||
}
|
||||
Some(state)
|
||||
}
|
||||
|
||||
fn save(&self, profile_dir: &Path) -> SyncResult<()> {
|
||||
let path = Self::path(profile_dir);
|
||||
if let Some(parent) = path.parent() {
|
||||
fs::create_dir_all(parent)
|
||||
.map_err(|e| SyncError::IoError(format!("Failed to create resume state dir: {e}")))?;
|
||||
}
|
||||
let json = serde_json::to_string(self).map_err(|e| {
|
||||
SyncError::SerializationError(format!("Failed to serialize resume state: {e}"))
|
||||
})?;
|
||||
fs::write(&path, json)
|
||||
.map_err(|e| SyncError::IoError(format!("Failed to write resume state: {e}")))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn delete(profile_dir: &Path) {
|
||||
let path = Self::path(profile_dir);
|
||||
let _ = fs::remove_file(&path);
|
||||
}
|
||||
}
|
||||
|
||||
/// Tracks live sync progress and emits throttled events to the frontend
|
||||
struct SyncProgressTracker {
|
||||
profile_id: String,
|
||||
profile_name: String,
|
||||
phase: String,
|
||||
total_files: u64,
|
||||
total_bytes: u64,
|
||||
completed_files: AtomicU64,
|
||||
completed_bytes: AtomicU64,
|
||||
failed_count: AtomicU64,
|
||||
start_time: Instant,
|
||||
last_emit: TokioMutex<Instant>,
|
||||
}
|
||||
|
||||
impl SyncProgressTracker {
|
||||
fn new(
|
||||
profile_id: String,
|
||||
profile_name: String,
|
||||
phase: &str,
|
||||
total_files: u64,
|
||||
total_bytes: u64,
|
||||
) -> Self {
|
||||
Self {
|
||||
profile_id,
|
||||
profile_name,
|
||||
phase: phase.to_string(),
|
||||
total_files,
|
||||
total_bytes,
|
||||
completed_files: AtomicU64::new(0),
|
||||
completed_bytes: AtomicU64::new(0),
|
||||
failed_count: AtomicU64::new(0),
|
||||
start_time: Instant::now(),
|
||||
last_emit: TokioMutex::new(Instant::now() - std::time::Duration::from_secs(1)),
|
||||
}
|
||||
}
|
||||
|
||||
fn record_success(&self, bytes: u64) {
|
||||
self.completed_files.fetch_add(1, Ordering::Relaxed);
|
||||
self.completed_bytes.fetch_add(bytes, Ordering::Relaxed);
|
||||
self.maybe_emit();
|
||||
}
|
||||
|
||||
fn record_failure(&self) {
|
||||
self.completed_files.fetch_add(1, Ordering::Relaxed);
|
||||
self.failed_count.fetch_add(1, Ordering::Relaxed);
|
||||
self.maybe_emit();
|
||||
}
|
||||
|
||||
fn maybe_emit(&self) {
|
||||
let Ok(mut last) = self.last_emit.try_lock() else {
|
||||
return;
|
||||
};
|
||||
if last.elapsed().as_millis() < 250 {
|
||||
return;
|
||||
}
|
||||
*last = Instant::now();
|
||||
self.emit_progress();
|
||||
}
|
||||
|
||||
fn emit_final(&self) {
|
||||
self.emit_progress();
|
||||
}
|
||||
|
||||
fn emit_progress(&self) {
|
||||
let completed_bytes = self.completed_bytes.load(Ordering::Relaxed);
|
||||
let elapsed = self.start_time.elapsed().as_secs_f64().max(0.1);
|
||||
let speed = (completed_bytes as f64 / elapsed) as u64;
|
||||
let remaining_bytes = self.total_bytes.saturating_sub(completed_bytes);
|
||||
let eta = if speed > 0 {
|
||||
remaining_bytes / speed
|
||||
} else {
|
||||
0
|
||||
};
|
||||
|
||||
let _ = events::emit(
|
||||
"profile-sync-progress",
|
||||
serde_json::json!({
|
||||
"profile_id": self.profile_id,
|
||||
"profile_name": self.profile_name,
|
||||
"phase": self.phase,
|
||||
"completed_files": self.completed_files.load(Ordering::Relaxed),
|
||||
"total_files": self.total_files,
|
||||
"completed_bytes": completed_bytes,
|
||||
"total_bytes": self.total_bytes,
|
||||
"speed_bytes_per_sec": speed,
|
||||
"eta_seconds": eta,
|
||||
"failed_count": self.failed_count.load(Ordering::Relaxed),
|
||||
}),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/// Check if sync is configured (cloud or self-hosted)
|
||||
pub fn is_sync_configured() -> bool {
|
||||
@@ -108,6 +285,29 @@ impl SyncEngine {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Skip if profile is currently running locally
|
||||
if profile.process_id.is_some() {
|
||||
log::info!(
|
||||
"Skipping sync for running profile: {} ({})",
|
||||
profile.name,
|
||||
profile.id
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Skip if profile is locked by another team member
|
||||
if crate::team_lock::TEAM_LOCK
|
||||
.is_locked_by_another(&profile.id.to_string())
|
||||
.await
|
||||
{
|
||||
log::info!(
|
||||
"Skipping sync for profile locked by another team member: {} ({})",
|
||||
profile.name,
|
||||
profile.id
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Derive encryption key if encrypted sync
|
||||
let encryption_key = if profile.is_encrypted_sync() {
|
||||
let password = encryption::load_e2e_password()
|
||||
@@ -149,6 +349,7 @@ impl SyncEngine {
|
||||
"profile-sync-status",
|
||||
serde_json::json!({
|
||||
"profile_id": profile_id,
|
||||
"profile_name": profile.name,
|
||||
"status": "syncing"
|
||||
}),
|
||||
);
|
||||
@@ -202,6 +403,7 @@ impl SyncEngine {
|
||||
"profile-sync-status",
|
||||
serde_json::json!({
|
||||
"profile_id": profile_id,
|
||||
"profile_name": profile.name,
|
||||
"status": "synced"
|
||||
}),
|
||||
);
|
||||
@@ -228,6 +430,7 @@ impl SyncEngine {
|
||||
"profile-sync-progress",
|
||||
serde_json::json!({
|
||||
"profile_id": profile_id,
|
||||
"profile_name": profile.name,
|
||||
"phase": "started",
|
||||
"total_files": total_files,
|
||||
"total_bytes": upload_bytes + download_bytes
|
||||
@@ -240,6 +443,7 @@ impl SyncEngine {
|
||||
.upload_profile_files(
|
||||
app_handle,
|
||||
&profile_id,
|
||||
&profile.name,
|
||||
&profile_dir,
|
||||
&diff.files_to_upload,
|
||||
encryption_key.as_ref(),
|
||||
@@ -254,6 +458,7 @@ impl SyncEngine {
|
||||
.download_profile_files(
|
||||
app_handle,
|
||||
&profile_id,
|
||||
&profile.name,
|
||||
&profile_dir,
|
||||
&diff.files_to_download,
|
||||
encryption_key.as_ref(),
|
||||
@@ -290,6 +495,9 @@ impl SyncEngine {
|
||||
.upload_manifest(&profile_id, &final_manifest, &key_prefix)
|
||||
.await?;
|
||||
|
||||
// Sync completed successfully — clean up resume state
|
||||
SyncResumeState::delete(&profile_dir);
|
||||
|
||||
// Sync associated proxy, group, and VPN
|
||||
if let Some(proxy_id) = &profile.proxy_id {
|
||||
let _ = self.sync_proxy(proxy_id, Some(app_handle)).await;
|
||||
@@ -316,6 +524,7 @@ impl SyncEngine {
|
||||
"profile-sync-status",
|
||||
serde_json::json!({
|
||||
"profile_id": profile_id,
|
||||
"profile_name": profile.name,
|
||||
"status": "synced"
|
||||
}),
|
||||
);
|
||||
@@ -389,10 +598,12 @@ impl SyncEngine {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn upload_profile_files(
|
||||
&self,
|
||||
_app_handle: &tauri::AppHandle,
|
||||
profile_id: &str,
|
||||
profile_name: &str,
|
||||
profile_dir: &Path,
|
||||
files: &[super::manifest::ManifestFileEntry],
|
||||
encryption_key: Option<&[u8; 32]>,
|
||||
@@ -402,10 +613,53 @@ impl SyncEngine {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
log::info!("Uploading {} files for profile {}", files.len(), profile_id);
|
||||
// Load resume state to skip already-uploaded files
|
||||
let mut resume_state = SyncResumeState::load(profile_dir)
|
||||
.filter(|s| s.profile_id == profile_id && s.direction == "upload");
|
||||
|
||||
let already_done: HashSet<String> = resume_state
|
||||
.as_ref()
|
||||
.map(|s| s.completed_files.clone())
|
||||
.unwrap_or_default();
|
||||
|
||||
let files_to_process: Vec<_> = files
|
||||
.iter()
|
||||
.filter(|f| !already_done.contains(&f.path))
|
||||
.collect();
|
||||
let skipped = files.len() - files_to_process.len();
|
||||
|
||||
if skipped > 0 {
|
||||
log::info!(
|
||||
"Resume: skipping {} already-uploaded files, processing {} remaining for profile {}",
|
||||
skipped,
|
||||
files_to_process.len(),
|
||||
profile_id
|
||||
);
|
||||
}
|
||||
|
||||
log::info!(
|
||||
"Uploading {} files for profile {}",
|
||||
files_to_process.len(),
|
||||
profile_id
|
||||
);
|
||||
|
||||
if files_to_process.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Initialize resume state if not resuming
|
||||
if resume_state.is_none() {
|
||||
resume_state = Some(SyncResumeState {
|
||||
profile_id: profile_id.to_string(),
|
||||
direction: "upload".to_string(),
|
||||
started_at: Utc::now().to_rfc3339(),
|
||||
completed_files: HashSet::new(),
|
||||
});
|
||||
}
|
||||
let resume_state = Arc::new(TokioMutex::new(resume_state.unwrap()));
|
||||
|
||||
// Get batch presigned URLs
|
||||
let items: Vec<(String, Option<String>)> = files
|
||||
let items: Vec<(String, Option<String>)> = files_to_process
|
||||
.iter()
|
||||
.map(|f| {
|
||||
let key = format!("{}profiles/{}/files/{}", key_prefix, profile_id, f.path);
|
||||
@@ -425,28 +679,70 @@ impl SyncEngine {
|
||||
.map(|item| (item.key, item.url))
|
||||
.collect();
|
||||
|
||||
// Upload with bounded concurrency
|
||||
let semaphore = Arc::new(Semaphore::new(8));
|
||||
let total_bytes: u64 = files.iter().map(|f| f.size).sum();
|
||||
let already_bytes: u64 = files
|
||||
.iter()
|
||||
.filter(|f| already_done.contains(&f.path))
|
||||
.map(|f| f.size)
|
||||
.sum();
|
||||
|
||||
let tracker = Arc::new(SyncProgressTracker::new(
|
||||
profile_id.to_string(),
|
||||
profile_name.to_string(),
|
||||
"uploading",
|
||||
files.len() as u64,
|
||||
total_bytes,
|
||||
));
|
||||
// Pre-populate tracker with resumed progress
|
||||
tracker
|
||||
.completed_files
|
||||
.store(skipped as u64, Ordering::Relaxed);
|
||||
tracker
|
||||
.completed_bytes
|
||||
.store(already_bytes, Ordering::Relaxed);
|
||||
tracker.emit_final();
|
||||
|
||||
let semaphore = Arc::new(Semaphore::new(SYNC_CONCURRENCY));
|
||||
let client = self.client.clone();
|
||||
let profile_dir = profile_dir.to_path_buf();
|
||||
let profile_id = profile_id.to_string();
|
||||
let profile_id_owned = profile_id.to_string();
|
||||
let enc_key = encryption_key.copied();
|
||||
|
||||
let mut handles = Vec::new();
|
||||
type FileResult = Result<String, (String, String, bool)>;
|
||||
let mut handles: Vec<tokio::task::JoinHandle<FileResult>> = Vec::new();
|
||||
|
||||
for file in files {
|
||||
// Counter for batching resume state saves
|
||||
let save_counter = Arc::new(AtomicU64::new(0));
|
||||
|
||||
for file in &files_to_process {
|
||||
let sem = semaphore.clone();
|
||||
let file_path = profile_dir.join(&file.path);
|
||||
let remote_key = format!("{}profiles/{}/files/{}", key_prefix, profile_id, file.path);
|
||||
let relative_path = file.path.clone();
|
||||
let file_size = file.size;
|
||||
let remote_key = format!(
|
||||
"{}profiles/{}/files/{}",
|
||||
key_prefix, profile_id_owned, file.path
|
||||
);
|
||||
let url = url_map.get(&remote_key).cloned();
|
||||
let critical = is_critical_file(&file.path);
|
||||
|
||||
if url.is_none() {
|
||||
log::warn!("No presigned URL for {}", remote_key);
|
||||
if critical {
|
||||
return Err(SyncError::NetworkError(format!(
|
||||
"No presigned URL for critical file: {}",
|
||||
file.path
|
||||
)));
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
let url = url.unwrap();
|
||||
let client = client.clone();
|
||||
let tracker = tracker.clone();
|
||||
let resume_state = resume_state.clone();
|
||||
let save_counter = save_counter.clone();
|
||||
let profile_dir_clone = profile_dir.clone();
|
||||
let content_type = mime_guess::from_path(&file.path)
|
||||
.first()
|
||||
.map(|m| m.to_string());
|
||||
@@ -456,9 +752,16 @@ impl SyncEngine {
|
||||
|
||||
let data = match fs::read(&file_path) {
|
||||
Ok(d) => d,
|
||||
Err(e) if e.kind() == std::io::ErrorKind::NotFound && !critical => {
|
||||
log::debug!("File disappeared, skipping: {}", file_path.display());
|
||||
tracker.record_success(0);
|
||||
return Ok(relative_path);
|
||||
}
|
||||
Err(e) => {
|
||||
log::warn!("Failed to read {}: {}", file_path.display(), e);
|
||||
return;
|
||||
let msg = format!("Failed to read {}: {}", file_path.display(), e);
|
||||
log::warn!("{}", msg);
|
||||
tracker.record_failure();
|
||||
return Err((relative_path, msg, critical));
|
||||
}
|
||||
};
|
||||
|
||||
@@ -466,44 +769,113 @@ impl SyncEngine {
|
||||
match encryption::encrypt_bytes(key, &data) {
|
||||
Ok(encrypted) => encrypted,
|
||||
Err(e) => {
|
||||
log::warn!("Failed to encrypt {}: {}", file_path.display(), e);
|
||||
return;
|
||||
let msg = format!("Failed to encrypt {}: {}", file_path.display(), e);
|
||||
log::warn!("{}", msg);
|
||||
tracker.record_failure();
|
||||
return Err((relative_path, msg, critical));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
data
|
||||
};
|
||||
|
||||
if let Err(e) = client
|
||||
.upload_bytes(&url, &upload_data, content_type.as_deref())
|
||||
.await
|
||||
{
|
||||
log::warn!("Failed to upload {}: {}", file_path.display(), e);
|
||||
// Retry loop for network uploads
|
||||
let mut last_err = String::new();
|
||||
for attempt in 0..MAX_FILE_RETRIES {
|
||||
match client
|
||||
.upload_bytes(&url, &upload_data, content_type.as_deref())
|
||||
.await
|
||||
{
|
||||
Ok(()) => {
|
||||
tracker.record_success(file_size);
|
||||
|
||||
// Record in resume state, save periodically
|
||||
{
|
||||
let mut state = resume_state.lock().await;
|
||||
state.completed_files.insert(relative_path.clone());
|
||||
let count = save_counter.fetch_add(1, Ordering::Relaxed);
|
||||
if count.is_multiple_of(50) {
|
||||
let _ = state.save(&profile_dir_clone);
|
||||
}
|
||||
}
|
||||
|
||||
return Ok(relative_path);
|
||||
}
|
||||
Err(e) => {
|
||||
last_err = format!("{}", e);
|
||||
if attempt < MAX_FILE_RETRIES - 1 {
|
||||
log::debug!(
|
||||
"Retry {}/{} for {}: {}",
|
||||
attempt + 1,
|
||||
MAX_FILE_RETRIES,
|
||||
relative_path,
|
||||
last_err
|
||||
);
|
||||
tokio::time::sleep(std::time::Duration::from_millis(500 * (attempt as u64 + 1)))
|
||||
.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let msg = format!(
|
||||
"Failed to upload {} after {} retries: {}",
|
||||
relative_path, MAX_FILE_RETRIES, last_err
|
||||
);
|
||||
log::warn!("{}", msg);
|
||||
tracker.record_failure();
|
||||
Err((relative_path, msg, critical))
|
||||
}));
|
||||
}
|
||||
|
||||
// Collect results
|
||||
let mut critical_failures = Vec::new();
|
||||
let mut non_critical_failures = Vec::new();
|
||||
|
||||
for handle in handles {
|
||||
let _ = handle.await;
|
||||
match handle.await {
|
||||
Ok(Ok(_)) => {}
|
||||
Ok(Err((path, msg, true))) => critical_failures.push((path, msg)),
|
||||
Ok(Err((path, msg, false))) => non_critical_failures.push((path, msg)),
|
||||
Err(e) => {
|
||||
log::warn!("Upload task panicked: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let _ = events::emit(
|
||||
"profile-sync-progress",
|
||||
serde_json::json!({
|
||||
"profile_id": profile_id,
|
||||
"phase": "upload",
|
||||
"done": files.len(),
|
||||
"total": files.len()
|
||||
}),
|
||||
);
|
||||
// Final resume state save
|
||||
{
|
||||
let state = resume_state.lock().await;
|
||||
let _ = state.save(&profile_dir);
|
||||
}
|
||||
|
||||
tracker.emit_final();
|
||||
|
||||
if !non_critical_failures.is_empty() {
|
||||
log::warn!(
|
||||
"Upload completed with {} non-critical failures for profile {}",
|
||||
non_critical_failures.len(),
|
||||
profile_id_owned
|
||||
);
|
||||
}
|
||||
|
||||
if !critical_failures.is_empty() {
|
||||
let file_list: Vec<&str> = critical_failures.iter().map(|(p, _)| p.as_str()).collect();
|
||||
return Err(SyncError::IoError(format!(
|
||||
"Critical files failed to upload: {}. Sync aborted to prevent data loss.",
|
||||
file_list.join(", ")
|
||||
)));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn download_profile_files(
|
||||
&self,
|
||||
_app_handle: &tauri::AppHandle,
|
||||
profile_id: &str,
|
||||
profile_name: &str,
|
||||
profile_dir: &Path,
|
||||
files: &[super::manifest::ManifestFileEntry],
|
||||
encryption_key: Option<&[u8; 32]>,
|
||||
@@ -513,14 +885,53 @@ impl SyncEngine {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Load resume state to skip already-downloaded files
|
||||
let mut resume_state = SyncResumeState::load(profile_dir)
|
||||
.filter(|s| s.profile_id == profile_id && s.direction == "download");
|
||||
|
||||
let already_done: HashSet<String> = resume_state
|
||||
.as_ref()
|
||||
.map(|s| s.completed_files.clone())
|
||||
.unwrap_or_default();
|
||||
|
||||
let files_to_process: Vec<_> = files
|
||||
.iter()
|
||||
.filter(|f| !already_done.contains(&f.path))
|
||||
.collect();
|
||||
let skipped = files.len() - files_to_process.len();
|
||||
|
||||
if skipped > 0 {
|
||||
log::info!(
|
||||
"Resume: skipping {} already-downloaded files, processing {} remaining for profile {}",
|
||||
skipped,
|
||||
files_to_process.len(),
|
||||
profile_id
|
||||
);
|
||||
}
|
||||
|
||||
log::info!(
|
||||
"Downloading {} files for profile {}",
|
||||
files.len(),
|
||||
files_to_process.len(),
|
||||
profile_id
|
||||
);
|
||||
|
||||
if files_to_process.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Initialize resume state if not resuming
|
||||
if resume_state.is_none() {
|
||||
resume_state = Some(SyncResumeState {
|
||||
profile_id: profile_id.to_string(),
|
||||
direction: "download".to_string(),
|
||||
started_at: Utc::now().to_rfc3339(),
|
||||
completed_files: HashSet::new(),
|
||||
});
|
||||
}
|
||||
let resume_state = Arc::new(TokioMutex::new(resume_state.unwrap()));
|
||||
|
||||
// Get batch presigned URLs
|
||||
let keys: Vec<String> = files
|
||||
let keys: Vec<String> = files_to_process
|
||||
.iter()
|
||||
.map(|f| format!("{}profiles/{}/files/{}", key_prefix, profile_id, f.path))
|
||||
.collect();
|
||||
@@ -534,73 +945,178 @@ impl SyncEngine {
|
||||
.map(|item| (item.key, item.url))
|
||||
.collect();
|
||||
|
||||
// Download with bounded concurrency
|
||||
let semaphore = Arc::new(Semaphore::new(8));
|
||||
let total_bytes: u64 = files.iter().map(|f| f.size).sum();
|
||||
let already_bytes: u64 = files
|
||||
.iter()
|
||||
.filter(|f| already_done.contains(&f.path))
|
||||
.map(|f| f.size)
|
||||
.sum();
|
||||
|
||||
let tracker = Arc::new(SyncProgressTracker::new(
|
||||
profile_id.to_string(),
|
||||
profile_name.to_string(),
|
||||
"downloading",
|
||||
files.len() as u64,
|
||||
total_bytes,
|
||||
));
|
||||
tracker
|
||||
.completed_files
|
||||
.store(skipped as u64, Ordering::Relaxed);
|
||||
tracker
|
||||
.completed_bytes
|
||||
.store(already_bytes, Ordering::Relaxed);
|
||||
tracker.emit_final();
|
||||
|
||||
let semaphore = Arc::new(Semaphore::new(SYNC_CONCURRENCY));
|
||||
let client = self.client.clone();
|
||||
let profile_dir = profile_dir.to_path_buf();
|
||||
let profile_id = profile_id.to_string();
|
||||
let profile_id_owned = profile_id.to_string();
|
||||
let enc_key = encryption_key.copied();
|
||||
|
||||
let mut handles = Vec::new();
|
||||
type FileResult = Result<String, (String, String, bool)>;
|
||||
let mut handles: Vec<tokio::task::JoinHandle<FileResult>> = Vec::new();
|
||||
|
||||
for file in files {
|
||||
let save_counter = Arc::new(AtomicU64::new(0));
|
||||
|
||||
for file in &files_to_process {
|
||||
let sem = semaphore.clone();
|
||||
let file_path = profile_dir.join(&file.path);
|
||||
let remote_key = format!("{}profiles/{}/files/{}", key_prefix, profile_id, file.path);
|
||||
let relative_path = file.path.clone();
|
||||
let file_size = file.size;
|
||||
let remote_key = format!(
|
||||
"{}profiles/{}/files/{}",
|
||||
key_prefix, profile_id_owned, file.path
|
||||
);
|
||||
let url = url_map.get(&remote_key).cloned();
|
||||
let critical = is_critical_file(&file.path);
|
||||
|
||||
if url.is_none() {
|
||||
log::warn!("No presigned URL for {}", remote_key);
|
||||
if critical {
|
||||
return Err(SyncError::NetworkError(format!(
|
||||
"No presigned URL for critical file: {}",
|
||||
file.path
|
||||
)));
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
let url = url.unwrap();
|
||||
let client = client.clone();
|
||||
let tracker = tracker.clone();
|
||||
let resume_state = resume_state.clone();
|
||||
let save_counter = save_counter.clone();
|
||||
let profile_dir_clone = profile_dir.clone();
|
||||
|
||||
handles.push(tokio::spawn(async move {
|
||||
let _permit = sem.acquire().await.unwrap();
|
||||
|
||||
match client.download_bytes(&url).await {
|
||||
Ok(data) => {
|
||||
let write_data = if let Some(ref key) = enc_key {
|
||||
match encryption::decrypt_bytes(key, &data) {
|
||||
Ok(decrypted) => decrypted,
|
||||
Err(e) => {
|
||||
log::warn!("Failed to decrypt {}, skipping: {}", remote_key, e);
|
||||
return;
|
||||
// Retry loop for network downloads
|
||||
let mut last_err = String::new();
|
||||
for attempt in 0..MAX_FILE_RETRIES {
|
||||
match client.download_bytes(&url).await {
|
||||
Ok(data) => {
|
||||
let write_data = if let Some(ref key) = enc_key {
|
||||
match encryption::decrypt_bytes(key, &data) {
|
||||
Ok(decrypted) => decrypted,
|
||||
Err(e) => {
|
||||
let msg = format!("Failed to decrypt {}: {}", relative_path, e);
|
||||
log::warn!("{}", msg);
|
||||
tracker.record_failure();
|
||||
return Err((relative_path, msg, critical));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
data
|
||||
};
|
||||
|
||||
if let Some(parent) = file_path.parent() {
|
||||
let _ = fs::create_dir_all(parent);
|
||||
}
|
||||
if let Err(e) = fs::write(&file_path, &write_data) {
|
||||
let msg = format!("Failed to write {}: {}", file_path.display(), e);
|
||||
log::warn!("{}", msg);
|
||||
tracker.record_failure();
|
||||
return Err((relative_path, msg, critical));
|
||||
}
|
||||
|
||||
tracker.record_success(file_size);
|
||||
|
||||
{
|
||||
let mut state = resume_state.lock().await;
|
||||
state.completed_files.insert(relative_path.clone());
|
||||
let count = save_counter.fetch_add(1, Ordering::Relaxed);
|
||||
if count.is_multiple_of(50) {
|
||||
let _ = state.save(&profile_dir_clone);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
data
|
||||
};
|
||||
|
||||
if let Some(parent) = file_path.parent() {
|
||||
let _ = fs::create_dir_all(parent);
|
||||
return Ok(relative_path);
|
||||
}
|
||||
if let Err(e) = fs::write(&file_path, &write_data) {
|
||||
log::warn!("Failed to write {}: {}", file_path.display(), e);
|
||||
Err(e) => {
|
||||
last_err = format!("{}", e);
|
||||
if attempt < MAX_FILE_RETRIES - 1 {
|
||||
log::debug!(
|
||||
"Retry {}/{} for {}: {}",
|
||||
attempt + 1,
|
||||
MAX_FILE_RETRIES,
|
||||
relative_path,
|
||||
last_err
|
||||
);
|
||||
tokio::time::sleep(std::time::Duration::from_millis(500 * (attempt as u64 + 1)))
|
||||
.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
log::warn!("Failed to download {}: {}", remote_key, e);
|
||||
}
|
||||
}
|
||||
|
||||
let msg = format!(
|
||||
"Failed to download {} after {} retries: {}",
|
||||
relative_path, MAX_FILE_RETRIES, last_err
|
||||
);
|
||||
log::warn!("{}", msg);
|
||||
tracker.record_failure();
|
||||
Err((relative_path, msg, critical))
|
||||
}));
|
||||
}
|
||||
|
||||
let mut critical_failures = Vec::new();
|
||||
let mut non_critical_failures = Vec::new();
|
||||
|
||||
for handle in handles {
|
||||
let _ = handle.await;
|
||||
match handle.await {
|
||||
Ok(Ok(_)) => {}
|
||||
Ok(Err((path, msg, true))) => critical_failures.push((path, msg)),
|
||||
Ok(Err((path, msg, false))) => non_critical_failures.push((path, msg)),
|
||||
Err(e) => {
|
||||
log::warn!("Download task panicked: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let _ = events::emit(
|
||||
"profile-sync-progress",
|
||||
serde_json::json!({
|
||||
"profile_id": profile_id,
|
||||
"phase": "download",
|
||||
"done": files.len(),
|
||||
"total": files.len()
|
||||
}),
|
||||
);
|
||||
// Final resume state save
|
||||
{
|
||||
let state = resume_state.lock().await;
|
||||
let _ = state.save(&profile_dir);
|
||||
}
|
||||
|
||||
tracker.emit_final();
|
||||
|
||||
if !non_critical_failures.is_empty() {
|
||||
log::warn!(
|
||||
"Download completed with {} non-critical failures for profile {}",
|
||||
non_critical_failures.len(),
|
||||
profile_id_owned
|
||||
);
|
||||
}
|
||||
|
||||
if !critical_failures.is_empty() {
|
||||
let file_list: Vec<&str> = critical_failures.iter().map(|(p, _)| p.as_str()).collect();
|
||||
return Err(SyncError::IoError(format!(
|
||||
"Critical files failed to download: {}. Sync aborted to prevent data loss.",
|
||||
file_list.join(", ")
|
||||
)));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -1531,6 +2047,7 @@ impl SyncEngine {
|
||||
"profile-sync-status",
|
||||
serde_json::json!({
|
||||
"profile_id": profile_id,
|
||||
"profile_name": profile.name,
|
||||
"status": "synced"
|
||||
}),
|
||||
);
|
||||
@@ -1599,6 +2116,7 @@ impl SyncEngine {
|
||||
.download_profile_files(
|
||||
app_handle,
|
||||
profile_id,
|
||||
&profile.name,
|
||||
&profile_dir,
|
||||
&manifest.files,
|
||||
encryption_key.as_ref(),
|
||||
@@ -1631,6 +2149,7 @@ impl SyncEngine {
|
||||
"profile-sync-status",
|
||||
serde_json::json!({
|
||||
"profile_id": profile_id,
|
||||
"profile_name": profile.name,
|
||||
"status": "synced"
|
||||
}),
|
||||
);
|
||||
@@ -2063,6 +2582,7 @@ pub async fn set_profile_sync_mode(
|
||||
"profile-sync-status",
|
||||
serde_json::json!({
|
||||
"profile_id": profile_id,
|
||||
"profile_name": profile.name,
|
||||
"status": "error",
|
||||
"error": "Sync server not configured. Please configure sync settings first."
|
||||
}),
|
||||
@@ -2078,6 +2598,7 @@ pub async fn set_profile_sync_mode(
|
||||
"profile-sync-status",
|
||||
serde_json::json!({
|
||||
"profile_id": profile_id,
|
||||
"profile_name": profile.name,
|
||||
"status": "error",
|
||||
"error": "Sync token not configured. Please configure sync settings first."
|
||||
}),
|
||||
@@ -2135,6 +2656,7 @@ pub async fn set_profile_sync_mode(
|
||||
"profile-sync-status",
|
||||
serde_json::json!({
|
||||
"profile_id": profile_id,
|
||||
"profile_name": profile.name,
|
||||
"status": if is_running { "waiting" } else { "syncing" }
|
||||
}),
|
||||
);
|
||||
@@ -2197,6 +2719,7 @@ pub async fn set_profile_sync_mode(
|
||||
"profile-sync-status",
|
||||
serde_json::json!({
|
||||
"profile_id": profile_id,
|
||||
"profile_name": profile.name,
|
||||
"status": "disabled"
|
||||
}),
|
||||
);
|
||||
@@ -2250,6 +2773,7 @@ pub async fn request_profile_sync(
|
||||
"profile-sync-status",
|
||||
serde_json::json!({
|
||||
"profile_id": profile_id,
|
||||
"profile_name": profile.name,
|
||||
"status": if is_running { "waiting" } else { "syncing" }
|
||||
}),
|
||||
);
|
||||
@@ -2624,7 +3148,9 @@ pub async fn enable_sync_for_all_entities(app_handle: tauri::AppHandle) -> Resul
|
||||
let proxies = crate::proxy_manager::PROXY_MANAGER.get_stored_proxies();
|
||||
for proxy in &proxies {
|
||||
if !proxy.sync_enabled && !proxy.is_cloud_managed {
|
||||
set_proxy_sync_enabled(app_handle.clone(), proxy.id.clone(), true).await?;
|
||||
if let Err(e) = set_proxy_sync_enabled(app_handle.clone(), proxy.id.clone(), true).await {
|
||||
log::warn!("Failed to enable sync for proxy {}: {e}", proxy.id);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -2638,7 +3164,9 @@ pub async fn enable_sync_for_all_entities(app_handle: tauri::AppHandle) -> Resul
|
||||
};
|
||||
for group in &groups {
|
||||
if !group.sync_enabled {
|
||||
set_group_sync_enabled(app_handle.clone(), group.id.clone(), true).await?;
|
||||
if let Err(e) = set_group_sync_enabled(app_handle.clone(), group.id.clone(), true).await {
|
||||
log::warn!("Failed to enable sync for group {}: {e}", group.id);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -2653,7 +3181,9 @@ pub async fn enable_sync_for_all_entities(app_handle: tauri::AppHandle) -> Resul
|
||||
};
|
||||
for config in &configs {
|
||||
if !config.sync_enabled {
|
||||
set_vpn_sync_enabled(app_handle.clone(), config.id.clone(), true).await?;
|
||||
if let Err(e) = set_vpn_sync_enabled(app_handle.clone(), config.id.clone(), true).await {
|
||||
log::warn!("Failed to enable sync for VPN {}: {e}", config.id);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -2667,7 +3197,9 @@ pub async fn enable_sync_for_all_entities(app_handle: tauri::AppHandle) -> Resul
|
||||
};
|
||||
for ext in &exts {
|
||||
if !ext.sync_enabled {
|
||||
set_extension_sync_enabled(app_handle.clone(), ext.id.clone(), true).await?;
|
||||
if let Err(e) = set_extension_sync_enabled(app_handle.clone(), ext.id.clone(), true).await {
|
||||
log::warn!("Failed to enable sync for extension {}: {e}", ext.id);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -2681,7 +3213,14 @@ pub async fn enable_sync_for_all_entities(app_handle: tauri::AppHandle) -> Resul
|
||||
};
|
||||
for group in &groups {
|
||||
if !group.sync_enabled {
|
||||
set_extension_group_sync_enabled(app_handle.clone(), group.id.clone(), true).await?;
|
||||
if let Err(e) =
|
||||
set_extension_group_sync_enabled(app_handle.clone(), group.id.clone(), true).await
|
||||
{
|
||||
log::warn!(
|
||||
"Failed to enable sync for extension group {}: {e}",
|
||||
group.id
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user