Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 17 additions & 6 deletions src/helpers/events/subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,25 +248,30 @@ pub(super) async fn handle_subscribe(
};

// Pull loop
let result = pull_loop(&client, &pubsub_token, &pubsub_subscription, config.clone()).await;
let result = pull_loop(&client, &pubsub_subscription, config.clone()).await;

// On exit, print reconnection info or cleanup
if created_resources {
if config.cleanup {
eprintln!("\nCleaning up Pub/Sub resources...");
// Refresh the token for cleanup — the original may have expired
// during the long-running pull loop.
let cleanup_token = auth::get_token(&[PUBSUB_SCOPE])
.await
.unwrap_or_default();
// Delete Pub/Sub subscription
let _ = client
.delete(format!(
"https://pubsub.googleapis.com/v1/{pubsub_subscription}"
))
.bearer_auth(&pubsub_token)
.bearer_auth(&cleanup_token)
.send()
.await;
// Delete Pub/Sub topic
if let Some(ref topic) = topic_name {
let _ = client
.delete(format!("https://pubsub.googleapis.com/v1/{topic}"))
.bearer_auth(&pubsub_token)
.bearer_auth(&cleanup_token)
.send()
.await;
}
Expand Down Expand Up @@ -301,12 +306,18 @@ pub(super) async fn handle_subscribe(
/// Pulls messages from a Pub/Sub subscription in a loop.
async fn pull_loop(
client: &reqwest::Client,
token: &str,
subscription: &str,
config: SubscribeConfig,
) -> Result<(), GwsError> {
let mut file_counter: u64 = 0;
loop {
// Refresh token on each iteration to avoid expiry after ~1 hour.
// `get_token` uses cached credentials and only contacts the OAuth server
// when the current access token is expired, so this is inexpensive.
let token = auth::get_token(&[PUBSUB_SCOPE])
.await
.map_err(|e| GwsError::Auth(format!("Failed to refresh Pub/Sub token: {e}")))?;

let pull_body = json!({
"maxMessages": config.max_messages,
});
Expand All @@ -315,7 +326,7 @@ async fn pull_loop(
.post(format!(
"https://pubsub.googleapis.com/v1/{subscription}:pull"
))
.bearer_auth(token)
.bearer_auth(&token)
.header("Content-Type", "application/json")
.json(&pull_body)
.timeout(std::time::Duration::from_secs(config.poll_interval.max(10)))
Expand Down Expand Up @@ -382,7 +393,7 @@ async fn pull_loop(
.post(format!(
"https://pubsub.googleapis.com/v1/{subscription}:acknowledge"
))
.bearer_auth(token)
.bearer_auth(&token)
.header("Content-Type", "application/json")
.json(&ack_body)
.send()
Expand Down
29 changes: 20 additions & 9 deletions src/helpers/gmail/watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,6 @@ pub(super) async fn handle_watch(
// Pull loop
let result = watch_pull_loop(
&client,
&pubsub_token,
&gmail_token,
&pubsub_subscription,
&mut last_history_id,
config.clone(),
Expand All @@ -210,18 +208,23 @@ pub(super) async fn handle_watch(
if created_resources {
if config.cleanup {
eprintln!("\nCleaning up Pub/Sub resources...");
// Refresh the token for cleanup — the original may have expired
// during the long-running pull loop.
let cleanup_token = auth::get_token(&[PUBSUB_SCOPE])
.await
.unwrap_or_default();
let _ = client
.delete(format!(
"https://pubsub.googleapis.com/v1/{}",
pubsub_subscription
))
.bearer_auth(&pubsub_token)
.bearer_auth(&cleanup_token)
.send()
.await;
if let Some(ref topic) = topic_name {
let _ = client
.delete(format!("https://pubsub.googleapis.com/v1/{}", topic))
.bearer_auth(&pubsub_token)
.bearer_auth(&cleanup_token)
.send()
.await;
}
Expand All @@ -246,20 +249,28 @@ pub(super) async fn handle_watch(
/// Pull loop for Gmail watch — polls Pub/Sub, fetches messages via history API.
async fn watch_pull_loop(
client: &reqwest::Client,
pubsub_token: &str,
gmail_token: &str,
subscription: &str,
last_history_id: &mut u64,
config: WatchConfig,
sanitize_config: &crate::helpers::modelarmor::SanitizeConfig,
) -> Result<(), GwsError> {
loop {
// Refresh tokens on each iteration to avoid expiry after ~1 hour.
// `get_token` uses cached credentials and only contacts the OAuth server
// when the current access token is expired, so this is inexpensive.
let pubsub_token = auth::get_token(&[PUBSUB_SCOPE])
.await
.context("Failed to refresh Pub/Sub token")?;
let gmail_token = auth::get_token(&[GMAIL_SCOPE])
.await
.context("Failed to refresh Gmail token")?;

let pull_body = json!({ "maxMessages": config.max_messages });
let pull_future = client
.post(format!(
"https://pubsub.googleapis.com/v1/{subscription}:pull"
))
.bearer_auth(pubsub_token)
.bearer_auth(&pubsub_token)
.header("Content-Type", "application/json")
.json(&pull_body)
.timeout(std::time::Duration::from_secs(config.poll_interval.max(10)))
Expand Down Expand Up @@ -297,7 +308,7 @@ async fn watch_pull_loop(
// Fetch new messages via history API
fetch_and_output_messages(
client,
gmail_token,
&gmail_token,
*last_history_id,
&config.format,
config.output_dir.as_ref(),
Expand All @@ -317,7 +328,7 @@ async fn watch_pull_loop(
.post(format!(
"https://pubsub.googleapis.com/v1/{subscription}:acknowledge"
))
.bearer_auth(pubsub_token)
.bearer_auth(&pubsub_token)
.header("Content-Type", "application/json")
.json(&ack_body)
.send()
Expand Down
Loading