feat: teams plan

This commit is contained in:
zhom
2026-03-02 15:49:26 +04:00
parent 9822ad4e3f
commit acd572ed23
30 changed files with 1223 additions and 200 deletions
+7
View File
@@ -1305,6 +1305,11 @@ async fn run_profile(
return Err(StatusCode::BAD_REQUEST);
}
// Team lock check
crate::team_lock::acquire_team_lock_if_needed(profile)
.await
.map_err(|_| StatusCode::CONFLICT)?;
// Generate a random port for remote debugging
let remote_debugging_port = rand::random::<u16>().saturating_add(9000).max(9000);
@@ -1399,6 +1404,8 @@ async fn kill_profile(
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
crate::team_lock::release_team_lock_if_needed(profile).await;
Ok(StatusCode::NO_CONTENT)
}
+2
View File
@@ -525,6 +525,8 @@ mod tests {
ephemeral: false,
extension_group_id: None,
proxy_bypass_rules: Vec::new(),
created_by_id: None,
created_by_email: None,
}
}
+6
View File
@@ -2536,6 +2536,9 @@ pub async fn launch_browser_profile(
));
}
// Team lock check: if profile is sync-enabled and user is on a team, acquire lock
crate::team_lock::acquire_team_lock_if_needed(&profile).await?;
let browser_runner = BrowserRunner::instance();
// Store the internal proxy settings for passing to launch_browser
@@ -2740,6 +2743,9 @@ pub async fn kill_browser_profile(
profile.id
);
// Release team lock if applicable
crate::team_lock::release_team_lock_if_needed(&profile).await;
// Auto-update non-running profiles and cleanup unused binaries
let browser_for_update = profile.browser.clone();
let app_handle_for_update = app_handle.clone();
+31 -1
View File
@@ -39,6 +39,12 @@ pub struct CloudUser {
pub proxy_bandwidth_used_mb: i64,
#[serde(rename = "proxyBandwidthExtraMb", default)]
pub proxy_bandwidth_extra_mb: i64,
#[serde(rename = "teamId", default)]
pub team_id: Option<String>,
#[serde(rename = "teamName", default)]
pub team_name: Option<String>,
#[serde(rename = "teamRole", default)]
pub team_role: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -249,7 +255,7 @@ impl CloudAuthManager {
Self::encrypt_and_store(&path, b"DBCAT", token)
}
fn load_access_token() -> Result<Option<String>, String> {
pub(crate) fn load_access_token() -> Result<Option<String>, String> {
let path = Self::get_settings_dir().join("cloud_access_token.dat");
Self::decrypt_from_file(&path, b"DBCAT")
}
@@ -572,6 +578,9 @@ impl CloudAuthManager {
}
pub async fn logout(&self) -> Result<(), String> {
// Disconnect team lock manager
crate::team_lock::TEAM_LOCK.disconnect().await;
// Try to call the logout API (best-effort)
if let Ok(Some(access_token)) = Self::load_access_token() {
let refresh_token = Self::load_refresh_token().ok().flatten();
@@ -637,6 +646,13 @@ impl CloudAuthManager {
}
}
pub async fn is_on_team_plan(&self) -> bool {
if let Some(state) = self.get_user().await {
return state.user.team_id.is_some();
}
false
}
pub async fn get_user(&self) -> Option<CloudAuthState> {
let state = self.state.lock().await;
state.clone()
@@ -935,6 +951,13 @@ impl CloudAuthManager {
log::debug!("Failed to refresh cloud profile: {e}");
}
// Reconnect team lock manager if needed
if let Some(auth_state) = CLOUD_AUTH.get_user().await {
if let Some(tid) = &auth_state.user.team_id {
crate::team_lock::TEAM_LOCK.connect(tid).await;
}
}
// Sync cloud proxy credentials
CLOUD_AUTH.sync_cloud_proxy().await;
@@ -978,6 +1001,13 @@ pub async fn cloud_verify_otp(
// Sync cloud proxy after login
CLOUD_AUTH.sync_cloud_proxy().await;
// Connect team lock manager if on a team plan
if state.user.team_id.is_some() {
if let Some(tid) = &state.user.team_id {
crate::team_lock::TEAM_LOCK.connect(tid).await;
}
}
let _ = crate::events::emit_empty("cloud-auth-changed");
let _ = &app_handle;
+2
View File
@@ -275,6 +275,8 @@ mod tests {
ephemeral,
extension_group_id: None,
proxy_bypass_rules: Vec::new(),
created_by_id: None,
created_by_email: None,
}
}
+6 -1
View File
@@ -50,6 +50,7 @@ pub mod daemon_ws;
pub mod events;
mod mcp_server;
mod tag_manager;
mod team_lock;
mod version_updater;
pub mod vpn;
pub mod vpn_worker_runner;
@@ -1466,7 +1467,10 @@ pub fn run() {
cloud_auth::cloud_get_states,
cloud_auth::cloud_get_cities,
cloud_auth::create_cloud_location_proxy,
cloud_auth::restart_sync_service
cloud_auth::restart_sync_service,
// Team lock commands
team_lock::get_team_locks,
team_lock::get_team_lock_status,
])
.build(tauri::generate_context!())
.expect("error while building tauri application")
@@ -1509,6 +1513,7 @@ mod tests {
"update_extension",
"set_extension_sync_enabled",
"set_extension_group_sync_enabled",
"get_team_lock_status",
];
// Extract command names from the generate_handler! macro in this file
+86 -2
View File
@@ -809,6 +809,30 @@ impl McpServer {
"required": ["profile_id"]
}),
},
// Team lock tools
McpTool {
name: "get_team_locks".to_string(),
description: "List all active team profile locks. Requires team plan.".to_string(),
input_schema: serde_json::json!({
"type": "object",
"properties": {},
"required": []
}),
},
McpTool {
name: "get_team_lock_status".to_string(),
description: "Check if a profile is locked by a team member. Requires team plan.".to_string(),
input_schema: serde_json::json!({
"type": "object",
"properties": {
"profile_id": {
"type": "string",
"description": "The UUID of the profile to check"
}
},
"required": ["profile_id"]
}),
},
]
}
@@ -926,6 +950,9 @@ impl McpServer {
.handle_assign_extension_group_to_profile(&arguments)
.await
}
// Team lock tools
"get_team_locks" => self.handle_get_team_locks().await,
"get_team_lock_status" => self.handle_get_team_lock_status(&arguments).await,
_ => Err(McpError {
code: -32602,
message: format!("Unknown tool: {tool_name}"),
@@ -1040,6 +1067,14 @@ impl McpServer {
});
}
// Team lock check
crate::team_lock::acquire_team_lock_if_needed(profile)
.await
.map_err(|e| McpError {
code: -32000,
message: e,
})?;
// Get app handle to launch
let inner = self.inner.lock().await;
let app_handle = inner.app_handle.as_ref().ok_or_else(|| McpError {
@@ -1121,6 +1156,8 @@ impl McpServer {
message: format!("Failed to kill browser: {e}"),
})?;
crate::team_lock::release_team_lock_if_needed(profile).await;
Ok(serde_json::json!({
"content": [{
"type": "text",
@@ -2388,6 +2425,50 @@ impl McpServer {
})?;
Ok(serde_json::to_value(profile).unwrap())
}
async fn handle_get_team_locks(&self) -> Result<serde_json::Value, McpError> {
if !CLOUD_AUTH.is_on_team_plan().await {
return Err(McpError {
code: -32000,
message: "Team features require an active team plan".to_string(),
});
}
let locks = crate::team_lock::TEAM_LOCK.get_locks().await;
Ok(serde_json::json!({
"content": [{
"type": "text",
"text": serde_json::to_string_pretty(&locks).unwrap_or_default()
}]
}))
}
async fn handle_get_team_lock_status(
&self,
arguments: &serde_json::Value,
) -> Result<serde_json::Value, McpError> {
if !CLOUD_AUTH.is_on_team_plan().await {
return Err(McpError {
code: -32000,
message: "Team features require an active team plan".to_string(),
});
}
let profile_id = arguments
.get("profile_id")
.and_then(|v| v.as_str())
.ok_or_else(|| McpError {
code: -32602,
message: "Missing profile_id".to_string(),
})?;
let lock_status = crate::team_lock::TEAM_LOCK
.get_lock_status(profile_id)
.await;
Ok(serde_json::json!({
"content": [{
"type": "text",
"text": serde_json::to_string_pretty(&lock_status).unwrap_or_default()
}]
}))
}
}
lazy_static::lazy_static! {
@@ -2403,8 +2484,8 @@ mod tests {
let server = McpServer::new();
let tools = server.get_tools();
// Should have at least 32 tools (26 + 6 extension tools)
assert!(tools.len() >= 32);
// Should have at least 34 tools (26 + 6 extension tools + 2 team lock tools)
assert!(tools.len() >= 34);
// Check tool names
let tool_names: Vec<&str> = tools.iter().map(|t| t.name.as_str()).collect();
@@ -2448,6 +2529,9 @@ mod tests {
assert!(tool_names.contains(&"delete_extension"));
assert!(tool_names.contains(&"delete_extension_group"));
assert!(tool_names.contains(&"assign_extension_group_to_profile"));
// Team lock tools
assert!(tool_names.contains(&"get_team_locks"));
assert!(tool_names.contains(&"get_team_lock_status"));
}
#[test]
+8
View File
@@ -179,6 +179,8 @@ impl ProfileManager {
ephemeral: false,
extension_group_id: None,
proxy_bypass_rules: Vec::new(),
created_by_id: None,
created_by_email: None,
};
match self
@@ -298,6 +300,8 @@ impl ProfileManager {
ephemeral: false,
extension_group_id: None,
proxy_bypass_rules: Vec::new(),
created_by_id: None,
created_by_email: None,
};
match self
@@ -349,6 +353,8 @@ impl ProfileManager {
ephemeral,
extension_group_id: None,
proxy_bypass_rules: Vec::new(),
created_by_id: None,
created_by_email: None,
};
// Save profile info
@@ -903,6 +909,8 @@ impl ProfileManager {
ephemeral: false,
extension_group_id: source.extension_group_id,
proxy_bypass_rules: source.proxy_bypass_rules,
created_by_id: None,
created_by_email: None,
};
self.save_profile(&new_profile)?;
+4
View File
@@ -61,6 +61,10 @@ pub struct BrowserProfile {
pub extension_group_id: Option<String>,
#[serde(default)]
pub proxy_bypass_rules: Vec<String>,
#[serde(default)]
pub created_by_id: Option<String>,
#[serde(default)]
pub created_by_email: Option<String>,
}
pub fn default_release_type() -> String {
+2
View File
@@ -561,6 +561,8 @@ impl ProfileImporter {
ephemeral: false,
extension_group_id: None,
proxy_bypass_rules: Vec::new(),
created_by_id: None,
created_by_email: None,
};
// Save the profile metadata
+149 -28
View File
@@ -67,6 +67,23 @@ impl SyncEngine {
Ok(Self::new(server_url, token))
}
/// Get the key prefix for team profiles. Returns empty string for personal profiles.
async fn get_team_key_prefix(profile: &BrowserProfile) -> String {
if profile.created_by_id.is_some() {
if let Some(auth) = crate::cloud_auth::CLOUD_AUTH.get_user().await {
if let Some(team_id) = &auth.user.team_id {
return format!("teams/{}/", team_id);
}
}
}
String::new()
}
/// Check if this is a self-hosted sync (no cloud login).
async fn is_self_hosted_sync() -> bool {
!crate::cloud_auth::CLOUD_AUTH.is_logged_in().await
}
pub async fn sync_profile(
&self,
app_handle: &tauri::AppHandle,
@@ -81,6 +98,16 @@ impl SyncEngine {
return Ok(());
}
// Skip team profiles for self-hosted sync
if Self::is_self_hosted_sync().await && profile.created_by_id.is_some() {
log::info!(
"Skipping team profile for self-hosted sync: {} ({})",
profile.name,
profile.id
);
return Ok(());
}
// Derive encryption key if encrypted sync
let encryption_key = if profile.is_encrypted_sync() {
let password = encryption::load_e2e_password()
@@ -104,10 +131,18 @@ impl SyncEngine {
let profile_dir = profiles_dir.join(profile.id.to_string());
let profile_id = profile.id.to_string();
// Determine team key prefix for team profiles
let key_prefix = Self::get_team_key_prefix(profile).await;
log::info!(
"Starting delta sync for profile: {} ({})",
"Starting delta sync for profile: {} ({}){}",
profile.name,
profile_id
profile_id,
if key_prefix.is_empty() {
String::new()
} else {
format!(" [team prefix: {}]", key_prefix)
}
);
let _ = events::emit(
@@ -155,7 +190,7 @@ impl SyncEngine {
hash_cache.save(&cache_path)?;
// Try to download remote manifest
let remote_manifest_key = format!("profiles/{}/manifest.json", profile_id);
let remote_manifest_key = format!("{}profiles/{}/manifest.json", key_prefix, profile_id);
let remote_manifest = self.download_manifest(&remote_manifest_key).await?;
// Compute diff
@@ -173,6 +208,13 @@ impl SyncEngine {
return Ok(());
}
let upload_bytes: u64 = diff.files_to_upload.iter().map(|f| f.size).sum();
let download_bytes: u64 = diff.files_to_download.iter().map(|f| f.size).sum();
let total_files = diff.files_to_upload.len()
+ diff.files_to_download.len()
+ diff.files_to_delete_local.len()
+ diff.files_to_delete_remote.len();
log::info!(
"Profile {} diff: {} to upload, {} to download, {} to delete local, {} to delete remote",
profile_id,
@@ -182,6 +224,16 @@ impl SyncEngine {
diff.files_to_delete_remote.len()
);
let _ = events::emit(
"profile-sync-progress",
serde_json::json!({
"profile_id": profile_id,
"phase": "started",
"total_files": total_files,
"total_bytes": upload_bytes + download_bytes
}),
);
// Perform uploads
if !diff.files_to_upload.is_empty() {
self
@@ -191,6 +243,7 @@ impl SyncEngine {
&profile_dir,
&diff.files_to_upload,
encryption_key.as_ref(),
&key_prefix,
)
.await?;
}
@@ -204,6 +257,7 @@ impl SyncEngine {
&profile_dir,
&diff.files_to_download,
encryption_key.as_ref(),
&key_prefix,
)
.await?;
}
@@ -219,18 +273,22 @@ impl SyncEngine {
// Delete remote files that don't exist locally (when local is newer)
for path in &diff.files_to_delete_remote {
let remote_key = format!("profiles/{}/files/{}", profile_id, path);
let remote_key = format!("{}profiles/{}/files/{}", key_prefix, profile_id, path);
let _ = self.client.delete(&remote_key, None).await;
log::debug!("Deleted remote file: {}", path);
}
// Upload metadata.json (sanitized profile)
self.upload_profile_metadata(&profile_id, profile).await?;
self
.upload_profile_metadata(&profile_id, profile, &key_prefix)
.await?;
// Upload manifest.json last for atomicity
let mut final_manifest = local_manifest;
final_manifest.encrypted = encryption_key.is_some();
self.upload_manifest(&profile_id, &final_manifest).await?;
self
.upload_manifest(&profile_id, &final_manifest, &key_prefix)
.await?;
// Sync associated proxy, group, and VPN
if let Some(proxy_id) = &profile.proxy_id {
@@ -281,11 +339,16 @@ impl SyncEngine {
Ok(Some(manifest))
}
async fn upload_manifest(&self, profile_id: &str, manifest: &SyncManifest) -> SyncResult<()> {
async fn upload_manifest(
&self,
profile_id: &str,
manifest: &SyncManifest,
key_prefix: &str,
) -> SyncResult<()> {
let json = serde_json::to_string_pretty(manifest)
.map_err(|e| SyncError::SerializationError(format!("Failed to serialize manifest: {e}")))?;
let remote_key = format!("profiles/{}/manifest.json", profile_id);
let remote_key = format!("{}profiles/{}/manifest.json", key_prefix, profile_id);
let presign = self
.client
.presign_upload(&remote_key, Some("application/json"))
@@ -303,6 +366,7 @@ impl SyncEngine {
&self,
profile_id: &str,
profile: &BrowserProfile,
key_prefix: &str,
) -> SyncResult<()> {
let mut sanitized = profile.clone();
sanitized.process_id = None;
@@ -311,7 +375,7 @@ impl SyncEngine {
let json = serde_json::to_string_pretty(&sanitized)
.map_err(|e| SyncError::SerializationError(format!("Failed to serialize profile: {e}")))?;
let remote_key = format!("profiles/{}/metadata.json", profile_id);
let remote_key = format!("{}profiles/{}/metadata.json", key_prefix, profile_id);
let presign = self
.client
.presign_upload(&remote_key, Some("application/json"))
@@ -332,6 +396,7 @@ impl SyncEngine {
profile_dir: &Path,
files: &[super::manifest::ManifestFileEntry],
encryption_key: Option<&[u8; 32]>,
key_prefix: &str,
) -> SyncResult<()> {
if files.is_empty() {
return Ok(());
@@ -343,7 +408,7 @@ impl SyncEngine {
let items: Vec<(String, Option<String>)> = files
.iter()
.map(|f| {
let key = format!("profiles/{}/files/{}", profile_id, f.path);
let key = format!("{}profiles/{}/files/{}", key_prefix, profile_id, f.path);
let content_type = mime_guess::from_path(&f.path)
.first()
.map(|m| m.to_string());
@@ -372,7 +437,7 @@ impl SyncEngine {
for file in files {
let sem = semaphore.clone();
let file_path = profile_dir.join(&file.path);
let remote_key = format!("profiles/{}/files/{}", profile_id, file.path);
let remote_key = format!("{}profiles/{}/files/{}", key_prefix, profile_id, file.path);
let url = url_map.get(&remote_key).cloned();
if url.is_none() {
@@ -442,6 +507,7 @@ impl SyncEngine {
profile_dir: &Path,
files: &[super::manifest::ManifestFileEntry],
encryption_key: Option<&[u8; 32]>,
key_prefix: &str,
) -> SyncResult<()> {
if files.is_empty() {
return Ok(());
@@ -456,7 +522,7 @@ impl SyncEngine {
// Get batch presigned URLs
let keys: Vec<String> = files
.iter()
.map(|f| format!("profiles/{}/files/{}", profile_id, f.path))
.map(|f| format!("{}profiles/{}/files/{}", key_prefix, profile_id, f.path))
.collect();
let batch_response = self.client.presign_download_batch(keys).await?;
@@ -480,7 +546,7 @@ impl SyncEngine {
for file in files {
let sem = semaphore.clone();
let file_path = profile_dir.join(&file.path);
let remote_key = format!("profiles/{}/files/{}", profile_id, file.path);
let remote_key = format!("{}profiles/{}/files/{}", key_prefix, profile_id, file.path);
let url = url_map.get(&remote_key).cloned();
if url.is_none() {
@@ -845,6 +911,26 @@ impl SyncEngine {
profile_id,
result.deleted_count
);
// Also delete from team path if user is on a team
if let Some(auth) = crate::cloud_auth::CLOUD_AUTH.get_user().await {
if let Some(team_id) = &auth.user.team_id {
let team_prefix = format!("teams/{}/profiles/{}/", team_id, profile_id);
let team_tombstone = format!("teams/{}/tombstones/profiles/{}.json", team_id, profile_id);
let team_result = self
.client
.delete_prefix(&team_prefix, Some(&team_tombstone))
.await?;
if team_result.deleted_count > 0 {
log::info!(
"Profile {} deleted from team sync ({} objects removed)",
profile_id,
team_result.deleted_count
);
}
}
}
Ok(())
}
@@ -1359,6 +1445,7 @@ impl SyncEngine {
&self,
app_handle: &tauri::AppHandle,
profile_id: &str,
key_prefix: &str,
) -> SyncResult<bool> {
let profile_manager = ProfileManager::instance();
let profiles_dir = profile_manager.get_profiles_dir();
@@ -1380,7 +1467,7 @@ impl SyncEngine {
}
// Check if profile exists remotely
let manifest_key = format!("profiles/{}/manifest.json", profile_id);
let manifest_key = format!("{}profiles/{}/manifest.json", key_prefix, profile_id);
let stat = self.client.stat(&manifest_key).await?;
if !stat.exists {
@@ -1394,7 +1481,7 @@ impl SyncEngine {
);
// Download metadata.json first to get profile info
let metadata_key = format!("profiles/{}/metadata.json", profile_id);
let metadata_key = format!("{}profiles/{}/metadata.json", key_prefix, profile_id);
let metadata_stat = self.client.stat(&metadata_key).await?;
if !metadata_stat.exists {
@@ -1515,6 +1602,7 @@ impl SyncEngine {
&profile_dir,
&manifest.files,
encryption_key.as_ref(),
key_prefix,
)
.await?;
}
@@ -1558,13 +1646,13 @@ impl SyncEngine {
) -> SyncResult<Vec<String>> {
log::info!("Checking for missing synced profiles...");
// List all profiles from S3
// List personal profiles from S3
let list_response = self.client.list("profiles/").await?;
let mut downloaded: Vec<String> = Vec::new();
// Extract unique profile IDs from the list
let mut profile_ids: std::collections::HashSet<String> = std::collections::HashSet::new();
// Extract unique profile IDs with their key prefix
let mut profiles_to_check: HashMap<String, String> = HashMap::new();
for obj in list_response.objects {
if obj.key.starts_with("profiles/") && obj.key.ends_with("/manifest.json") {
if let Some(profile_id) = obj
@@ -1572,24 +1660,45 @@ impl SyncEngine {
.strip_prefix("profiles/")
.and_then(|s| s.strip_suffix("/manifest.json"))
{
profile_ids.insert(profile_id.to_string());
profiles_to_check.insert(profile_id.to_string(), String::new());
}
}
}
// Also list team profiles if user is on a team
if let Some(auth) = crate::cloud_auth::CLOUD_AUTH.get_user().await {
if let Some(team_id) = &auth.user.team_id {
let team_prefix = format!("teams/{}/", team_id);
let team_list_key = format!("{}profiles/", team_prefix);
if let Ok(team_list) = self.client.list(&team_list_key).await {
for obj in team_list.objects {
if obj.key.starts_with("profiles/") && obj.key.ends_with("/manifest.json") {
if let Some(profile_id) = obj
.key
.strip_prefix("profiles/")
.and_then(|s| s.strip_suffix("/manifest.json"))
{
profiles_to_check.insert(profile_id.to_string(), team_prefix.clone());
}
}
}
}
}
}
log::info!(
"Found {} profiles in remote storage, checking for missing ones...",
profile_ids.len()
profiles_to_check.len()
);
// For each remote profile, check if it exists locally and download if missing
for profile_id in profile_ids {
for (profile_id, key_prefix) in &profiles_to_check {
match self
.download_profile_if_missing(app_handle, &profile_id)
.download_profile_if_missing(app_handle, profile_id, key_prefix)
.await
{
Ok(true) => {
downloaded.push(profile_id);
downloaded.push(profile_id.clone());
}
Ok(false) => {
// Profile exists locally or doesn't exist remotely, skip
@@ -1613,17 +1722,28 @@ impl SyncEngine {
// Refresh metadata for local cross-OS profiles (propagate renames, tags, notes from originating device)
let profile_manager = ProfileManager::instance();
// Collect cross-OS profiles before async operations to avoid holding non-Send Result across await
let cross_os_profiles: Vec<(String, SyncMode)> = profile_manager
let cross_os_profiles: Vec<(String, SyncMode, Option<String>)> = profile_manager
.list_profiles()
.unwrap_or_default()
.iter()
.filter(|p| p.is_cross_os() && p.is_sync_enabled())
.map(|p| (p.id.to_string(), p.sync_mode))
.map(|p| (p.id.to_string(), p.sync_mode, p.created_by_id.clone()))
.collect();
if !cross_os_profiles.is_empty() {
for (pid, sync_mode) in &cross_os_profiles {
let metadata_key = format!("profiles/{}/metadata.json", pid);
let team_prefix = if let Some(auth) = crate::cloud_auth::CLOUD_AUTH.get_user().await {
auth.user.team_id.map(|tid| format!("teams/{}/", tid))
} else {
None
};
for (pid, sync_mode, created_by_id) in &cross_os_profiles {
let kp = if created_by_id.is_some() {
team_prefix.as_deref().unwrap_or("")
} else {
""
};
let metadata_key = format!("{}profiles/{}/metadata.json", kp, pid);
match self.client.stat(&metadata_key).await {
Ok(stat) if stat.exists => match self.client.presign_download(&metadata_key).await {
Ok(presign) => match self.client.download_bytes(&presign.url).await {
@@ -1981,7 +2101,8 @@ pub async fn set_profile_sync_mode(
let mode_switched = old_mode != SyncMode::Disabled && enabling && old_mode != new_mode;
if mode_switched {
if let Ok(engine) = SyncEngine::create_from_settings(&app_handle).await {
let manifest_key = format!("profiles/{}/manifest.json", profile_id);
let key_prefix = SyncEngine::get_team_key_prefix(&profile).await;
let manifest_key = format!("{}profiles/{}/manifest.json", key_prefix, profile_id);
let _ = engine.client.delete(&manifest_key, None).await;
log::info!(
"Deleted remote manifest for profile {} due to sync mode change ({:?} -> {:?})",
+16 -1
View File
@@ -208,8 +208,21 @@ impl SyncSubscription {
data_line.and_then(|data| serde_json::from_str(data).ok())
}
fn strip_team_prefix(key: &str) -> &str {
if key.starts_with("teams/") {
if let Some(rest) = key.find('/').and_then(|first_slash| {
key[first_slash + 1..]
.find('/')
.map(|second_slash| first_slash + 1 + second_slash + 1)
}) {
return &key[rest..];
}
}
key
}
fn handle_event(event: &SubscribeEvent, work_tx: &mpsc::UnboundedSender<SyncWorkItem>) {
let Some(key) = &event.key else {
let Some(raw_key) = &event.key else {
return;
};
@@ -217,6 +230,8 @@ impl SyncSubscription {
return;
}
let key = Self::strip_team_prefix(raw_key);
let work_item = if key.starts_with("profiles/") {
key
.strip_prefix("profiles/")
+335
View File
@@ -0,0 +1,335 @@
use lazy_static::lazy_static;
use reqwest::Client;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use tokio::sync::{Mutex, RwLock};
use tokio::task::JoinHandle;
use crate::cloud_auth::{CloudAuthManager, CLOUD_API_URL, CLOUD_AUTH};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProfileLockInfo {
#[serde(rename = "profileId")]
pub profile_id: String,
#[serde(rename = "lockedBy")]
pub locked_by: String,
#[serde(rename = "lockedByEmail")]
pub locked_by_email: String,
#[serde(rename = "lockedAt")]
pub locked_at: String,
#[serde(rename = "expiresAt", default)]
pub expires_at: Option<String>,
}
#[derive(Debug, Deserialize)]
#[allow(dead_code)]
struct AcquireLockResponse {
success: bool,
#[serde(rename = "lockedBy")]
locked_by: Option<String>,
#[serde(rename = "lockedByEmail")]
locked_by_email: Option<String>,
}
pub struct TeamLockManager {
locks: RwLock<HashMap<String, ProfileLockInfo>>,
heartbeat_handle: Mutex<Option<JoinHandle<()>>>,
connected_team_id: Mutex<Option<String>>,
}
lazy_static! {
pub static ref TEAM_LOCK: TeamLockManager = TeamLockManager::new();
}
impl TeamLockManager {
fn new() -> Self {
Self {
locks: RwLock::new(HashMap::new()),
heartbeat_handle: Mutex::new(None),
connected_team_id: Mutex::new(None),
}
}
pub async fn connect(&self, team_id: &str) {
log::info!("Connecting team lock manager for team: {team_id}");
{
let mut tid = self.connected_team_id.lock().await;
*tid = Some(team_id.to_string());
}
if let Err(e) = self.fetch_initial_locks(team_id).await {
log::warn!("Failed to fetch initial locks: {e}");
}
self.start_heartbeat_loop().await;
}
pub async fn disconnect(&self) {
log::info!("Disconnecting team lock manager");
{
let mut handle = self.heartbeat_handle.lock().await;
if let Some(h) = handle.take() {
h.abort();
}
}
{
let mut locks = self.locks.write().await;
locks.clear();
}
{
let mut tid = self.connected_team_id.lock().await;
*tid = None;
}
}
pub async fn acquire_lock(&self, profile_id: &str) -> Result<(), String> {
let team_id = self.get_team_id().await?;
let client = Client::new();
let access_token =
CloudAuthManager::load_access_token()?.ok_or_else(|| "Not logged in".to_string())?;
let url = format!("{CLOUD_API_URL}/api/teams/{team_id}/locks");
let response = client
.post(&url)
.header("Authorization", format!("Bearer {access_token}"))
.json(&serde_json::json!({ "profileId": profile_id }))
.send()
.await
.map_err(|e| format!("Failed to acquire lock: {e}"))?;
if !response.status().is_success() {
let status = response.status();
let body = response.text().await.unwrap_or_default();
return Err(format!("Lock acquisition failed ({status}): {body}"));
}
let result: AcquireLockResponse = response
.json()
.await
.map_err(|e| format!("Failed to parse lock response: {e}"))?;
if !result.success {
let email = result
.locked_by_email
.unwrap_or_else(|| "another user".to_string());
return Err(format!("Profile is in use by {email}"));
}
// Update local cache
if let Some(user) = CLOUD_AUTH.get_user().await {
let mut locks = self.locks.write().await;
locks.insert(
profile_id.to_string(),
ProfileLockInfo {
profile_id: profile_id.to_string(),
locked_by: user.user.id.clone(),
locked_by_email: user.user.email.clone(),
locked_at: chrono::Utc::now().to_rfc3339(),
expires_at: None,
},
);
}
let _ = crate::events::emit(
"team-lock-acquired",
serde_json::json!({ "profileId": profile_id }),
);
Ok(())
}
pub async fn release_lock(&self, profile_id: &str) -> Result<(), String> {
let team_id = self.get_team_id().await?;
let client = Client::new();
let access_token =
CloudAuthManager::load_access_token()?.ok_or_else(|| "Not logged in".to_string())?;
let url = format!("{CLOUD_API_URL}/api/teams/{team_id}/locks/{profile_id}");
let _ = client
.delete(&url)
.header("Authorization", format!("Bearer {access_token}"))
.send()
.await;
{
let mut locks = self.locks.write().await;
locks.remove(profile_id);
}
let _ = crate::events::emit(
"team-lock-released",
serde_json::json!({ "profileId": profile_id }),
);
Ok(())
}
pub async fn get_locks(&self) -> Vec<ProfileLockInfo> {
let locks = self.locks.read().await;
locks.values().cloned().collect()
}
pub async fn get_lock_status(&self, profile_id: &str) -> Option<ProfileLockInfo> {
let locks = self.locks.read().await;
locks.get(profile_id).cloned()
}
pub async fn is_locked_by_another(&self, profile_id: &str) -> bool {
let locks = self.locks.read().await;
if let Some(lock) = locks.get(profile_id) {
if let Some(user) = CLOUD_AUTH.get_user().await {
return lock.locked_by != user.user.id;
}
}
false
}
async fn fetch_initial_locks(&self, team_id: &str) -> Result<(), String> {
let client = Client::new();
let access_token =
CloudAuthManager::load_access_token()?.ok_or_else(|| "Not logged in".to_string())?;
let url = format!("{CLOUD_API_URL}/api/teams/{team_id}/locks");
let response = client
.get(&url)
.header("Authorization", format!("Bearer {access_token}"))
.send()
.await
.map_err(|e| format!("Failed to fetch locks: {e}"))?;
if !response.status().is_success() {
return Err("Failed to fetch locks".to_string());
}
let lock_list: Vec<ProfileLockInfo> = response
.json()
.await
.map_err(|e| format!("Failed to parse locks: {e}"))?;
let mut locks = self.locks.write().await;
locks.clear();
for lock in lock_list {
locks.insert(lock.profile_id.clone(), lock);
}
Ok(())
}
async fn start_heartbeat_loop(&self) {
let mut handle = self.heartbeat_handle.lock().await;
if let Some(h) = handle.take() {
h.abort();
}
let h = tokio::spawn(async move {
loop {
tokio::time::sleep(std::time::Duration::from_secs(30)).await;
let team_id = match TEAM_LOCK.get_team_id().await {
Ok(id) => id,
Err(_) => break,
};
let held_locks: Vec<String> = {
let locks = TEAM_LOCK.locks.read().await;
if let Some(user) = CLOUD_AUTH.get_user().await {
locks
.values()
.filter(|l| l.locked_by == user.user.id)
.map(|l| l.profile_id.clone())
.collect()
} else {
vec![]
}
};
for profile_id in held_locks {
let client = Client::new();
if let Ok(Some(token)) = CloudAuthManager::load_access_token() {
let url = format!("{CLOUD_API_URL}/api/teams/{team_id}/locks/{profile_id}/heartbeat");
let _ = client
.post(&url)
.header("Authorization", format!("Bearer {token}"))
.send()
.await;
}
}
// Refresh lock state from server
if let Err(e) = TEAM_LOCK.fetch_initial_locks(&team_id).await {
log::debug!("Failed to refresh locks: {e}");
}
}
});
*handle = Some(h);
}
async fn get_team_id(&self) -> Result<String, String> {
let tid = self.connected_team_id.lock().await;
tid
.clone()
.ok_or_else(|| "Not connected to a team".to_string())
}
}
/// Acquire team lock if profile is sync-enabled and user is on a team.
/// Returns Ok(()) if lock acquired or not applicable, Err with message if locked by another.
pub async fn acquire_team_lock_if_needed(
profile: &crate::profile::BrowserProfile,
) -> Result<(), String> {
if !profile.is_sync_enabled() {
return Ok(());
}
if !CLOUD_AUTH.is_on_team_plan().await {
return Ok(());
}
if TEAM_LOCK
.is_locked_by_another(&profile.id.to_string())
.await
{
if let Some(lock) = TEAM_LOCK.get_lock_status(&profile.id.to_string()).await {
return Err(format!("Profile is in use by {}", lock.locked_by_email));
}
return Err("Profile is in use by another team member".to_string());
}
TEAM_LOCK.acquire_lock(&profile.id.to_string()).await
}
/// Release team lock if profile is sync-enabled and user is on a team.
/// Logs warnings on failure but does not return errors.
pub async fn release_team_lock_if_needed(profile: &crate::profile::BrowserProfile) {
if !profile.is_sync_enabled() {
return;
}
if !CLOUD_AUTH.is_on_team_plan().await {
return;
}
if let Err(e) = TEAM_LOCK.release_lock(&profile.id.to_string()).await {
log::warn!(
"Failed to release team lock for profile {}: {e}",
profile.id
);
}
}
// --- Tauri commands ---
#[tauri::command]
pub async fn get_team_locks() -> Result<Vec<ProfileLockInfo>, String> {
Ok(TEAM_LOCK.get_locks().await)
}
#[tauri::command]
pub async fn get_team_lock_status(profile_id: String) -> Result<Option<ProfileLockInfo>, String> {
Ok(TEAM_LOCK.get_lock_status(&profile_id).await)
}