diff --git a/src/helpers/events/subscribe.rs b/src/helpers/events/subscribe.rs index dbe6047c..eb1a3963 100644 --- a/src/helpers/events/subscribe.rs +++ b/src/helpers/events/subscribe.rs @@ -248,25 +248,30 @@ pub(super) async fn handle_subscribe( }; // Pull loop - let result = pull_loop(&client, &pubsub_token, &pubsub_subscription, config.clone()).await; + let result = pull_loop(&client, &pubsub_subscription, config.clone()).await; // On exit, print reconnection info or cleanup if created_resources { if config.cleanup { eprintln!("\nCleaning up Pub/Sub resources..."); + // Refresh the token for cleanup — the original may have expired + // during the long-running pull loop. + let cleanup_token = auth::get_token(&[PUBSUB_SCOPE]) + .await + .unwrap_or_default(); // Delete Pub/Sub subscription let _ = client .delete(format!( "https://pubsub.googleapis.com/v1/{pubsub_subscription}" )) - .bearer_auth(&pubsub_token) + .bearer_auth(&cleanup_token) .send() .await; // Delete Pub/Sub topic if let Some(ref topic) = topic_name { let _ = client .delete(format!("https://pubsub.googleapis.com/v1/{topic}")) - .bearer_auth(&pubsub_token) + .bearer_auth(&cleanup_token) .send() .await; } @@ -301,12 +306,18 @@ pub(super) async fn handle_subscribe( /// Pulls messages from a Pub/Sub subscription in a loop. async fn pull_loop( client: &reqwest::Client, - token: &str, subscription: &str, config: SubscribeConfig, ) -> Result<(), GwsError> { let mut file_counter: u64 = 0; loop { + // Refresh token on each iteration to avoid expiry after ~1 hour. + // `get_token` uses cached credentials and only contacts the OAuth server + // when the current access token is expired, so this is inexpensive. + let token = auth::get_token(&[PUBSUB_SCOPE]) + .await + .map_err(|e| GwsError::Auth(format!("Failed to refresh Pub/Sub token: {e}")))?; + let pull_body = json!({ "maxMessages": config.max_messages, }); @@ -315,7 +326,7 @@ async fn pull_loop( .post(format!( "https://pubsub.googleapis.com/v1/{subscription}:pull" )) - .bearer_auth(token) + .bearer_auth(&token) .header("Content-Type", "application/json") .json(&pull_body) .timeout(std::time::Duration::from_secs(config.poll_interval.max(10))) @@ -382,7 +393,7 @@ async fn pull_loop( .post(format!( "https://pubsub.googleapis.com/v1/{subscription}:acknowledge" )) - .bearer_auth(token) + .bearer_auth(&token) .header("Content-Type", "application/json") .json(&ack_body) .send() diff --git a/src/helpers/gmail/watch.rs b/src/helpers/gmail/watch.rs index 86401799..526c0f7b 100644 --- a/src/helpers/gmail/watch.rs +++ b/src/helpers/gmail/watch.rs @@ -197,8 +197,6 @@ pub(super) async fn handle_watch( // Pull loop let result = watch_pull_loop( &client, - &pubsub_token, - &gmail_token, &pubsub_subscription, &mut last_history_id, config.clone(), @@ -210,18 +208,23 @@ pub(super) async fn handle_watch( if created_resources { if config.cleanup { eprintln!("\nCleaning up Pub/Sub resources..."); + // Refresh the token for cleanup — the original may have expired + // during the long-running pull loop. + let cleanup_token = auth::get_token(&[PUBSUB_SCOPE]) + .await + .unwrap_or_default(); let _ = client .delete(format!( "https://pubsub.googleapis.com/v1/{}", pubsub_subscription )) - .bearer_auth(&pubsub_token) + .bearer_auth(&cleanup_token) .send() .await; if let Some(ref topic) = topic_name { let _ = client .delete(format!("https://pubsub.googleapis.com/v1/{}", topic)) - .bearer_auth(&pubsub_token) + .bearer_auth(&cleanup_token) .send() .await; } @@ -246,20 +249,28 @@ pub(super) async fn handle_watch( /// Pull loop for Gmail watch — polls Pub/Sub, fetches messages via history API. async fn watch_pull_loop( client: &reqwest::Client, - pubsub_token: &str, - gmail_token: &str, subscription: &str, last_history_id: &mut u64, config: WatchConfig, sanitize_config: &crate::helpers::modelarmor::SanitizeConfig, ) -> Result<(), GwsError> { loop { + // Refresh tokens on each iteration to avoid expiry after ~1 hour. + // `get_token` uses cached credentials and only contacts the OAuth server + // when the current access token is expired, so this is inexpensive. + let pubsub_token = auth::get_token(&[PUBSUB_SCOPE]) + .await + .context("Failed to refresh Pub/Sub token")?; + let gmail_token = auth::get_token(&[GMAIL_SCOPE]) + .await + .context("Failed to refresh Gmail token")?; + let pull_body = json!({ "maxMessages": config.max_messages }); let pull_future = client .post(format!( "https://pubsub.googleapis.com/v1/{subscription}:pull" )) - .bearer_auth(pubsub_token) + .bearer_auth(&pubsub_token) .header("Content-Type", "application/json") .json(&pull_body) .timeout(std::time::Duration::from_secs(config.poll_interval.max(10))) @@ -297,7 +308,7 @@ async fn watch_pull_loop( // Fetch new messages via history API fetch_and_output_messages( client, - gmail_token, + &gmail_token, *last_history_id, &config.format, config.output_dir.as_ref(), @@ -317,7 +328,7 @@ async fn watch_pull_loop( .post(format!( "https://pubsub.googleapis.com/v1/{subscription}:acknowledge" )) - .bearer_auth(pubsub_token) + .bearer_auth(&pubsub_token) .header("Content-Type", "application/json") .json(&ack_body) .send()