From 4ff80d6d725bd43002e309971c531ce918bb6272 Mon Sep 17 00:00:00 2001 From: zerone0x Date: Wed, 11 Mar 2026 16:37:05 +0800 Subject: [PATCH] fix: refresh OAuth2 tokens in long-running watch/subscribe loops The `+watch` and `+subscribe` commands obtained OAuth2 access tokens once at startup and passed them as static `&str` values into their pull loops. Since Google access tokens expire after 3600 seconds, both commands would exit with a 401 error after ~1 hour. Fix by calling `auth::get_token()` on each loop iteration instead of once before the loop. The underlying `yup_oauth2` authenticator uses cached credentials and only contacts the OAuth server when the current access token has expired, so this adds negligible overhead. Also refresh the token before cleanup operations that run after the potentially long-lived loop. Fixes #392 Co-Authored-By: Claude Opus 4.6 --- src/helpers/events/subscribe.rs | 23 +++++++++++++++++------ src/helpers/gmail/watch.rs | 29 ++++++++++++++++++++--------- 2 files changed, 37 insertions(+), 15 deletions(-) diff --git a/src/helpers/events/subscribe.rs b/src/helpers/events/subscribe.rs index dbe6047c..eb1a3963 100644 --- a/src/helpers/events/subscribe.rs +++ b/src/helpers/events/subscribe.rs @@ -248,25 +248,30 @@ pub(super) async fn handle_subscribe( }; // Pull loop - let result = pull_loop(&client, &pubsub_token, &pubsub_subscription, config.clone()).await; + let result = pull_loop(&client, &pubsub_subscription, config.clone()).await; // On exit, print reconnection info or cleanup if created_resources { if config.cleanup { eprintln!("\nCleaning up Pub/Sub resources..."); + // Refresh the token for cleanup — the original may have expired + // during the long-running pull loop. + let cleanup_token = auth::get_token(&[PUBSUB_SCOPE]) + .await + .unwrap_or_default(); // Delete Pub/Sub subscription let _ = client .delete(format!( "https://pubsub.googleapis.com/v1/{pubsub_subscription}" )) - .bearer_auth(&pubsub_token) + .bearer_auth(&cleanup_token) .send() .await; // Delete Pub/Sub topic if let Some(ref topic) = topic_name { let _ = client .delete(format!("https://pubsub.googleapis.com/v1/{topic}")) - .bearer_auth(&pubsub_token) + .bearer_auth(&cleanup_token) .send() .await; } @@ -301,12 +306,18 @@ pub(super) async fn handle_subscribe( /// Pulls messages from a Pub/Sub subscription in a loop. async fn pull_loop( client: &reqwest::Client, - token: &str, subscription: &str, config: SubscribeConfig, ) -> Result<(), GwsError> { let mut file_counter: u64 = 0; loop { + // Refresh token on each iteration to avoid expiry after ~1 hour. + // `get_token` uses cached credentials and only contacts the OAuth server + // when the current access token is expired, so this is inexpensive. + let token = auth::get_token(&[PUBSUB_SCOPE]) + .await + .map_err(|e| GwsError::Auth(format!("Failed to refresh Pub/Sub token: {e}")))?; + let pull_body = json!({ "maxMessages": config.max_messages, }); @@ -315,7 +326,7 @@ async fn pull_loop( .post(format!( "https://pubsub.googleapis.com/v1/{subscription}:pull" )) - .bearer_auth(token) + .bearer_auth(&token) .header("Content-Type", "application/json") .json(&pull_body) .timeout(std::time::Duration::from_secs(config.poll_interval.max(10))) @@ -382,7 +393,7 @@ async fn pull_loop( .post(format!( "https://pubsub.googleapis.com/v1/{subscription}:acknowledge" )) - .bearer_auth(token) + .bearer_auth(&token) .header("Content-Type", "application/json") .json(&ack_body) .send() diff --git a/src/helpers/gmail/watch.rs b/src/helpers/gmail/watch.rs index 86401799..526c0f7b 100644 --- a/src/helpers/gmail/watch.rs +++ b/src/helpers/gmail/watch.rs @@ -197,8 +197,6 @@ pub(super) async fn handle_watch( // Pull loop let result = watch_pull_loop( &client, - &pubsub_token, - &gmail_token, &pubsub_subscription, &mut last_history_id, config.clone(), @@ -210,18 +208,23 @@ pub(super) async fn handle_watch( if created_resources { if config.cleanup { eprintln!("\nCleaning up Pub/Sub resources..."); + // Refresh the token for cleanup — the original may have expired + // during the long-running pull loop. + let cleanup_token = auth::get_token(&[PUBSUB_SCOPE]) + .await + .unwrap_or_default(); let _ = client .delete(format!( "https://pubsub.googleapis.com/v1/{}", pubsub_subscription )) - .bearer_auth(&pubsub_token) + .bearer_auth(&cleanup_token) .send() .await; if let Some(ref topic) = topic_name { let _ = client .delete(format!("https://pubsub.googleapis.com/v1/{}", topic)) - .bearer_auth(&pubsub_token) + .bearer_auth(&cleanup_token) .send() .await; } @@ -246,20 +249,28 @@ pub(super) async fn handle_watch( /// Pull loop for Gmail watch — polls Pub/Sub, fetches messages via history API. async fn watch_pull_loop( client: &reqwest::Client, - pubsub_token: &str, - gmail_token: &str, subscription: &str, last_history_id: &mut u64, config: WatchConfig, sanitize_config: &crate::helpers::modelarmor::SanitizeConfig, ) -> Result<(), GwsError> { loop { + // Refresh tokens on each iteration to avoid expiry after ~1 hour. + // `get_token` uses cached credentials and only contacts the OAuth server + // when the current access token is expired, so this is inexpensive. + let pubsub_token = auth::get_token(&[PUBSUB_SCOPE]) + .await + .context("Failed to refresh Pub/Sub token")?; + let gmail_token = auth::get_token(&[GMAIL_SCOPE]) + .await + .context("Failed to refresh Gmail token")?; + let pull_body = json!({ "maxMessages": config.max_messages }); let pull_future = client .post(format!( "https://pubsub.googleapis.com/v1/{subscription}:pull" )) - .bearer_auth(pubsub_token) + .bearer_auth(&pubsub_token) .header("Content-Type", "application/json") .json(&pull_body) .timeout(std::time::Duration::from_secs(config.poll_interval.max(10))) @@ -297,7 +308,7 @@ async fn watch_pull_loop( // Fetch new messages via history API fetch_and_output_messages( client, - gmail_token, + &gmail_token, *last_history_id, &config.format, config.output_dir.as_ref(), @@ -317,7 +328,7 @@ async fn watch_pull_loop( .post(format!( "https://pubsub.googleapis.com/v1/{subscription}:acknowledge" )) - .bearer_auth(pubsub_token) + .bearer_auth(&pubsub_token) .header("Content-Type", "application/json") .json(&ack_body) .send()