fix: prevent stale sse token refresh

This commit is contained in:
zhom
2026-06-24 00:51:21 +04:00
parent 9edc154397
commit 94cccc3702
+53 -4
View File
@@ -29,24 +29,35 @@ pub enum SyncWorkItem {
Tombstone(String, String),
}
/// Where a subscription's sync token comes from, so reconnects can re-fetch a
/// fresh one (tokens are short-lived, ~15 min).
#[derive(Clone, Copy)]
enum TokenSource {
Cloud,
SelfHosted,
}
pub struct SyncSubscription {
client: Client,
base_url: String,
token: String,
source: TokenSource,
running: Arc<AtomicBool>,
work_tx: mpsc::UnboundedSender<SyncWorkItem>,
}
impl SyncSubscription {
pub fn new(
fn new(
base_url: String,
token: String,
source: TokenSource,
work_tx: mpsc::UnboundedSender<SyncWorkItem>,
) -> Self {
Self {
client: Client::new(),
base_url: base_url.trim_end_matches('/').to_string(),
token,
source,
running: Arc::new(AtomicBool::new(false)),
work_tx,
}
@@ -66,7 +77,7 @@ impl SyncSubscription {
let Some(token) = token else {
return Ok(None);
};
return Ok(Some(Self::new(url, token, work_tx)));
return Ok(Some(Self::new(url, token, TokenSource::Cloud, work_tx)));
}
// Fall back to self-hosted settings
@@ -88,7 +99,12 @@ impl SyncSubscription {
return Ok(None);
};
Ok(Some(Self::new(server_url, token, work_tx)))
Ok(Some(Self::new(
server_url,
token,
TokenSource::SelfHosted,
work_tx,
)))
}
pub fn is_running(&self) -> bool {
@@ -106,9 +122,10 @@ impl SyncSubscription {
let running = self.running.clone();
let base_url = self.base_url.clone();
let token = self.token.clone();
let source = self.source;
let work_tx = self.work_tx.clone();
let client = self.client.clone();
let mut token = self.token.clone();
tokio::spawn(async move {
while running.load(Ordering::SeqCst) {
@@ -126,6 +143,20 @@ impl SyncSubscription {
if running.load(Ordering::SeqCst) {
sleep(Duration::from_secs(1)).await;
// Refresh the sync token before reconnecting. The token may have
// expired while the stream was open (tokens last ~15 min); reusing
// the construction-time token otherwise produces an endless 401
// reconnect loop until the app is restarted (issue #440).
match Self::fetch_sync_token(source, &app_handle).await {
Ok(Some(fresh)) => token = fresh,
Ok(None) => {
log::info!("Sync token no longer available; stopping subscription");
break;
}
Err(e) => {
log::warn!("Failed to refresh sync token: {e}; retrying with the current token");
}
}
}
}
@@ -133,6 +164,24 @@ impl SyncSubscription {
});
}
/// Fetch a current sync token from the same source the subscription was
/// created from, so reconnects never reuse a stale (expired) token.
async fn fetch_sync_token(
source: TokenSource,
app_handle: &tauri::AppHandle,
) -> Result<Option<String>, String> {
match source {
TokenSource::Cloud => crate::cloud_auth::CLOUD_AUTH
.get_or_refresh_sync_token()
.await
.map_err(|e| format!("Failed to refresh cloud sync token: {e}")),
TokenSource::SelfHosted => SettingsManager::instance()
.get_sync_token(app_handle)
.await
.map_err(|e| format!("Failed to refresh self-hosted sync token: {e}")),
}
}
async fn connect_and_listen(
client: &Client,
base_url: &str,