From 94cccc370251952841d6dcbc7562d416da09468c Mon Sep 17 00:00:00 2001 From: zhom <2717306+zhom@users.noreply.github.com> Date: Wed, 24 Jun 2026 00:51:21 +0400 Subject: [PATCH] fix: prevent stale sse token refresh --- src-tauri/src/sync/subscription.rs | 57 +++++++++++++++++++++++++++--- 1 file changed, 53 insertions(+), 4 deletions(-) diff --git a/src-tauri/src/sync/subscription.rs b/src-tauri/src/sync/subscription.rs index 360d366..62df17a 100644 --- a/src-tauri/src/sync/subscription.rs +++ b/src-tauri/src/sync/subscription.rs @@ -29,24 +29,35 @@ pub enum SyncWorkItem { Tombstone(String, String), } +/// Where a subscription's sync token comes from, so reconnects can re-fetch a +/// fresh one (tokens are short-lived, ~15 min). +#[derive(Clone, Copy)] +enum TokenSource { + Cloud, + SelfHosted, +} + pub struct SyncSubscription { client: Client, base_url: String, token: String, + source: TokenSource, running: Arc, work_tx: mpsc::UnboundedSender, } impl SyncSubscription { - pub fn new( + fn new( base_url: String, token: String, + source: TokenSource, work_tx: mpsc::UnboundedSender, ) -> Self { Self { client: Client::new(), base_url: base_url.trim_end_matches('/').to_string(), token, + source, running: Arc::new(AtomicBool::new(false)), work_tx, } @@ -66,7 +77,7 @@ impl SyncSubscription { let Some(token) = token else { return Ok(None); }; - return Ok(Some(Self::new(url, token, work_tx))); + return Ok(Some(Self::new(url, token, TokenSource::Cloud, work_tx))); } // Fall back to self-hosted settings @@ -88,7 +99,12 @@ impl SyncSubscription { return Ok(None); }; - Ok(Some(Self::new(server_url, token, work_tx))) + Ok(Some(Self::new( + server_url, + token, + TokenSource::SelfHosted, + work_tx, + ))) } pub fn is_running(&self) -> bool { @@ -106,9 +122,10 @@ impl SyncSubscription { let running = self.running.clone(); let base_url = self.base_url.clone(); - let token = self.token.clone(); + let source = self.source; let work_tx = self.work_tx.clone(); let client = self.client.clone(); + let mut token = self.token.clone(); tokio::spawn(async move { while running.load(Ordering::SeqCst) { @@ -126,6 +143,20 @@ impl SyncSubscription { if running.load(Ordering::SeqCst) { sleep(Duration::from_secs(1)).await; + // Refresh the sync token before reconnecting. The token may have + // expired while the stream was open (tokens last ~15 min); reusing + // the construction-time token otherwise produces an endless 401 + // reconnect loop until the app is restarted (issue #440). + match Self::fetch_sync_token(source, &app_handle).await { + Ok(Some(fresh)) => token = fresh, + Ok(None) => { + log::info!("Sync token no longer available; stopping subscription"); + break; + } + Err(e) => { + log::warn!("Failed to refresh sync token: {e}; retrying with the current token"); + } + } } } @@ -133,6 +164,24 @@ impl SyncSubscription { }); } + /// Fetch a current sync token from the same source the subscription was + /// created from, so reconnects never reuse a stale (expired) token. + async fn fetch_sync_token( + source: TokenSource, + app_handle: &tauri::AppHandle, + ) -> Result, String> { + match source { + TokenSource::Cloud => crate::cloud_auth::CLOUD_AUTH + .get_or_refresh_sync_token() + .await + .map_err(|e| format!("Failed to refresh cloud sync token: {e}")), + TokenSource::SelfHosted => SettingsManager::instance() + .get_sync_token(app_handle) + .await + .map_err(|e| format!("Failed to refresh self-hosted sync token: {e}")), + } + } + async fn connect_and_listen( client: &Client, base_url: &str,