diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs index c4b01ba2f3e..160b9b12406 100644 --- a/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs +++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs @@ -35,6 +35,7 @@ use quickwit_proto::types::NodeId; use scheduling::{SourceToSchedule, SourceToScheduleType}; use serde::Serialize; use tracing::{debug, info, warn}; +use ulid::Ulid; use crate::indexing_plan::PhysicalIndexingPlan; use crate::indexing_scheduler::change_tracker::{NotifyChangeOnDrop, RebuildNotifier}; @@ -419,6 +420,11 @@ impl IndexingScheduler { ) { debug!(new_physical_plan=?new_physical_plan, "apply physical indexing plan"); APPLY_PLAN_TOTAL.inc(); + // The indexing plan ID is a monotonically increasing time based ID that's used as the + // publish token for indexers, which ensures indexing plans and shard acquisition are always + // informed by the most recent plan. + let indexing_plan_id = Ulid::new().to_string(); + // Retiring and decommissioning indexers still receive the plan so they can gracefully shut // down dropped pipelines; other states (initializing, decommissioned, failed) are skipped. for indexer in self.indexer_pool.values().into_iter().filter(|indexer| { @@ -431,20 +437,24 @@ impl IndexingScheduler { .indexer(indexer.node_id.as_str()) .unwrap_or(&[]) .to_vec(); + // We don't want to block on a slow indexer so we apply this change asynchronously. - // Bound the apply only for retiring/decommissioning indexers, so a slow or unreachable - // draining node can't hold the change-notification guard; ready indexers get no - // timeout. + // Retiring/decommissioning indexers are time-bound, so a slow or unreachable + // draining node can't hold the notify guard. Ready indexers get no timeout. let apply_deadline = matches!( indexer.ingester_status, IngesterStatus::Retiring | IngesterStatus::Decommissioning ) .then_some(APPLY_INDEXING_PLAN_TIMEOUT); + let notify_on_drop = notify_on_drop.clone(); + let indexing_plan_id = indexing_plan_id.clone(); tokio::spawn(async move { let client = indexer.client.clone(); - let apply_plan_fut = - client.apply_indexing_plan(ApplyIndexingPlanRequest { indexing_tasks }); + let apply_plan_fut = client.apply_indexing_plan(ApplyIndexingPlanRequest { + indexing_tasks, + indexing_plan_id, + }); let apply_result = match apply_deadline { Some(timeout) => tokio::time::timeout(timeout, apply_plan_fut).await, None => Ok(apply_plan_fut.await), diff --git a/quickwit/quickwit-indexing/src/actors/doc_processor.rs b/quickwit/quickwit-indexing/src/actors/doc_processor.rs index 559d392afba..dfc9dc70abb 100644 --- a/quickwit/quickwit-indexing/src/actors/doc_processor.rs +++ b/quickwit/quickwit-indexing/src/actors/doc_processor.rs @@ -41,9 +41,7 @@ use tokio::runtime::Handle; use super::vrl_processing::*; use crate::actors::Indexer; use crate::metrics::{PROCESSED_BYTES, PROCESSED_DOCS_TOTAL}; -use crate::models::{ - NewPublishLock, NewPublishToken, ProcessedDoc, ProcessedDocBatch, PublishLock, RawDocBatch, -}; +use crate::models::{NewPublishLock, ProcessedDoc, ProcessedDocBatch, PublishLock, RawDocBatch}; const PLAIN_TEXT: &str = "plain_text"; pub(super) struct JsonDoc { @@ -607,20 +605,6 @@ impl Handler for DocProcessor { } } -#[async_trait] -impl Handler for DocProcessor { - type Reply = (); - - async fn handle( - &mut self, - message: NewPublishToken, - ctx: &ActorContext, - ) -> Result<(), ActorExitStatus> { - ctx.send_message(&self.indexer_mailbox, message).await?; - Ok(()) - } -} - #[cfg(test)] mod tests { use std::sync::Arc; diff --git a/quickwit/quickwit-indexing/src/actors/index_serializer.rs b/quickwit/quickwit-indexing/src/actors/index_serializer.rs index 2bc79cea5b2..086eb73f08b 100644 --- a/quickwit/quickwit-indexing/src/actors/index_serializer.rs +++ b/quickwit/quickwit-indexing/src/actors/index_serializer.rs @@ -89,7 +89,6 @@ impl Handler for IndexSerializer { splits, checkpoint_delta_opt: batch_builder.checkpoint_delta_opt, publish_lock: batch_builder.publish_lock, - publish_token_opt: batch_builder.publish_token_opt, merge_task_opt: None, batch_parent_span: batch_builder.batch_parent_span, }; diff --git a/quickwit/quickwit-indexing/src/actors/indexer.rs b/quickwit/quickwit-indexing/src/actors/indexer.rs index da36d6b7994..4ec8f54dfbf 100644 --- a/quickwit/quickwit-indexing/src/actors/indexer.rs +++ b/quickwit/quickwit-indexing/src/actors/indexer.rs @@ -38,7 +38,7 @@ use quickwit_proto::indexing::{IndexingPipelineId, PipelineMetrics}; use quickwit_proto::metastore::{ LastDeleteOpstampRequest, MetastoreService, MetastoreServiceClient, }; -use quickwit_proto::types::{DocMappingUid, PublishToken}; +use quickwit_proto::types::DocMappingUid; use quickwit_query::get_quickwit_fastfield_normalizer_manager; use serde::Serialize; use tantivy::schema::Schema; @@ -55,7 +55,7 @@ use super::cooperative_indexing::{CooperativeIndexingCycle, CooperativeIndexingP use crate::metrics::SPLIT_BUILDERS; use crate::models::{ CommitTrigger, EmptySplit, IndexedSplitBatchBuilder, IndexedSplitBuilder, NewPublishLock, - NewPublishToken, ProcessedDoc, ProcessedDocBatch, PublishLock, + ProcessedDoc, ProcessedDocBatch, PublishLock, }; // Random partition ID used to gather partitions exceeding the maximum number of partitions. @@ -93,7 +93,6 @@ struct IndexerState { indexing_directory: TempDirectory, indexing_settings: IndexingSettings, publish_lock: PublishLock, - publish_token_opt: Option, schema: Schema, doc_mapping_uid: DocMappingUid, tokenizer_manager: TokenizerManager, @@ -219,7 +218,6 @@ impl IndexerState { source_delta: SourceCheckpointDelta::default(), }; let publish_lock = self.publish_lock.clone(); - let publish_token_opt = self.publish_token_opt.clone(); let split_builders_guard = GaugeGuard::new(&SPLIT_BUILDERS, 1.0); @@ -231,7 +229,6 @@ impl IndexerState { other_indexed_split_opt: None, checkpoint_delta, publish_lock, - publish_token_opt, last_delete_opstamp, memory_usage: GaugeGuard::new(&IN_FLIGHT_INDEX_WRITER, 0.0), cooperative_indexing_period, @@ -349,7 +346,6 @@ struct IndexingWorkbench { checkpoint_delta: IndexCheckpointDelta, publish_lock: PublishLock, - publish_token_opt: Option, // On workbench creation, we fetch from the metastore the last delete task opstamp. // We use this value to set the `delete_opstamp` of the workbench splits. last_delete_opstamp: u64, @@ -513,21 +509,6 @@ impl Handler for Indexer { } } -#[async_trait] -impl Handler for Indexer { - type Reply = (); - - async fn handle( - &mut self, - message: NewPublishToken, - _ctx: &ActorContext, - ) -> Result<(), ActorExitStatus> { - let NewPublishToken(publish_token) = message; - self.indexer_state.publish_token_opt = Some(publish_token); - Ok(()) - } -} - impl Indexer { pub fn new( pipeline_id: IndexingPipelineId, @@ -563,7 +544,6 @@ impl Indexer { indexing_directory, indexing_settings, publish_lock: PublishLock::default(), - publish_token_opt: None, schema, doc_mapping_uid: doc_mapper.doc_mapping_uid(), tokenizer_manager: tokenizer_manager.tantivy_manager().clone(), @@ -630,7 +610,6 @@ impl Indexer { other_indexed_split_opt, checkpoint_delta, publish_lock, - publish_token_opt, batch_parent_span, memory_usage, split_builders_guard, @@ -656,7 +635,6 @@ impl Indexer { index_uid: self.indexer_state.pipeline_id.index_uid.clone(), checkpoint_delta, publish_lock, - publish_token_opt, batch_parent_span, }, ) @@ -680,7 +658,6 @@ impl Indexer { splits, checkpoint_delta_opt: Some(checkpoint_delta), publish_lock, - publish_token_opt, commit_trigger, batch_parent_span, memory_usage, diff --git a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs index 9d6287245ac..234a1f90813 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs @@ -46,7 +46,7 @@ use crate::actors::uploader::UploaderType; use crate::actors::{Publisher, Uploader}; use crate::merge_policy::MergePolicy; use crate::metrics::{ACTOR_NAME, BACKPRESSURE_MICROS, INDEXING_PIPELINES}; -use crate::models::IndexingStatistics; +use crate::models::{IndexingStatistics, SharedPublishToken}; use crate::source::{ AssignShards, Assignment, SourceActor, SourceRuntime, quickwit_supported_sources, }; @@ -89,6 +89,10 @@ pub struct IndexingPipeline { // requiring a respawn of the pipeline. // We keep the list of shards here however, to reassign them after a respawn. shard_ids: BTreeSet, + // Id of the last indexing plan assigned to this pipeline. Kept here, like `shard_ids`, so it + // can be re-sent to the source on respawn; the source adopts it as its publish token. + indexing_plan_id: String, + publish_token: SharedPublishToken, _indexing_pipelines_gauge_guard: GaugeGuard, } @@ -137,6 +141,8 @@ impl IndexingPipeline { ..Default::default() }, shard_ids: Default::default(), + indexing_plan_id: String::new(), + publish_token: SharedPublishToken::default(), _indexing_pipelines_gauge_guard: indexing_pipelines_gauge_guard, } } @@ -306,6 +312,7 @@ impl IndexingPipeline { self.params.metastore.clone(), Some(self.params.merge_planner_mailbox.clone()), Some(source_mailbox.clone()), + self.publish_token.clone(), ); let (publisher_mailbox, publisher_handle) = ctx .spawn_actor() @@ -390,6 +397,7 @@ impl IndexingPipeline { storage_resolver: self.params.source_storage_resolver.clone(), event_broker: self.params.event_broker.clone(), indexing_setting: self.params.indexing_settings.clone(), + publish_token: self.publish_token.clone(), }; let source = ctx .protect_future(quickwit_supported_sources().load_source(source_runtime)) @@ -402,6 +410,7 @@ impl IndexingPipeline { .spawn(actor_source); let assign_shards_message = AssignShards(Assignment { shard_ids: self.shard_ids.clone(), + indexing_plan_id: self.indexing_plan_id.clone(), }); source_mailbox.send_message(assign_shards_message).await?; @@ -496,6 +505,8 @@ impl Handler for IndexingPipeline { ) -> Result<(), ActorExitStatus> { self.shard_ids .clone_from(&assign_shards_message.0.shard_ids); + self.indexing_plan_id + .clone_from(&assign_shards_message.0.indexing_plan_id); // If the pipeline is running, we forward the message to its source. // If it is not, it will be respawned soon, and the shards will be assigned afterward. if let Some(handles) = &self.handles_opt { diff --git a/quickwit/quickwit-indexing/src/actors/indexing_service.rs b/quickwit/quickwit-indexing/src/actors/indexing_service.rs index 57197a8fc58..e17265b93f1 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_service.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_service.rs @@ -52,7 +52,7 @@ use quickwit_proto::metastore::{ ListIndexesMetadataRequest, ListSplitsRequest, MetastoreResult, MetastoreService, MetastoreServiceClient, }; -use quickwit_proto::types::{IndexId, IndexUid, NodeId, PipelineUid, ShardId}; +use quickwit_proto::types::{IndexId, IndexUid, NodeId, PipelineUid, PublishToken, ShardId}; use quickwit_storage::StorageResolver; use serde::{Deserialize, Serialize}; use time::OffsetDateTime; @@ -110,6 +110,7 @@ pub struct IndexingService { pub(crate) ingester_pool: IngesterPool, pub(crate) storage_resolver: StorageResolver, indexing_pipelines: HashMap, + latest_indexing_plan_id: PublishToken, counters: IndexingServiceCounters, local_split_store: Arc, pub(crate) max_concurrent_split_uploads: usize, @@ -182,6 +183,7 @@ impl IndexingService { storage_resolver, local_split_store: Arc::new(local_split_store), indexing_pipelines: Default::default(), + latest_indexing_plan_id: String::new(), counters: Default::default(), max_concurrent_split_uploads: indexer_config.max_concurrent_split_uploads, #[cfg(feature = "metrics")] @@ -760,8 +762,8 @@ impl IndexingService { /// or not. /// /// If a pipeline actor has failed, this function just logs an error. - async fn assign_shards_to_pipelines(&mut self, tasks: &[IndexingTask]) { - for task in tasks { + async fn assign_shards_to_pipelines(&mut self, plan_request: &ApplyIndexingPlanRequest) { + for task in &plan_request.indexing_tasks { if task.shard_ids.is_empty() { continue; } @@ -771,6 +773,7 @@ impl IndexingService { }; let assignment = Assignment { shard_ids: task.shard_ids.iter().cloned().collect(), + indexing_plan_id: plan_request.indexing_plan_id.clone(), }; let message = AssignShards(assignment); @@ -785,10 +788,21 @@ impl IndexingService { /// - Starting the pipelines that are not running. async fn apply_indexing_plan( &mut self, - tasks: &[IndexingTask], + plan_request: ApplyIndexingPlanRequest, ctx: &ActorContext, ) -> Result<(), IndexingError> { - let pipeline_diff = self.compute_pipeline_diff(tasks); + // Plan ids are ULIDs + if plan_request.indexing_plan_id < self.latest_indexing_plan_id { + info!( + dropped_plan_id = %plan_request.indexing_plan_id, + latest_plan_id = %self.latest_indexing_plan_id, + "ignoring stale indexing plan" + ); + return Ok(()); + } + self.latest_indexing_plan_id = plan_request.indexing_plan_id.clone(); + + let pipeline_diff = self.compute_pipeline_diff(&plan_request.indexing_tasks); if !pipeline_diff.pipelines_to_shutdown.is_empty() { self.shutdown_pipelines(&pipeline_diff.pipelines_to_shutdown) @@ -801,7 +815,7 @@ impl IndexingService { .spawn_pipelines(&pipeline_diff.pipelines_to_spawn, ctx) .await?; } - self.assign_shards_to_pipelines(tasks).await; + self.assign_shards_to_pipelines(&plan_request).await; self.update_chitchat_running_plan().await; if !spawn_pipeline_failures.is_empty() { @@ -1135,7 +1149,7 @@ impl Handler for IndexingService { ctx: &ActorContext, ) -> Result { Ok(self - .apply_indexing_plan(&plan_request.indexing_tasks, ctx) + .apply_indexing_plan(plan_request, ctx) .await .map(|_| ApplyIndexingPlanResponse {})) } @@ -1465,7 +1479,10 @@ mod tests { }, ]; indexing_service - .ask_for_res(ApplyIndexingPlanRequest { indexing_tasks }) + .ask_for_res(ApplyIndexingPlanRequest { + indexing_tasks, + indexing_plan_id: "01ARZ3NDEKTSV4RRFFQ69G5FAV".to_string(), + }) .await .unwrap(); assert_eq!( @@ -1531,6 +1548,7 @@ mod tests { indexing_service .ask_for_res(ApplyIndexingPlanRequest { indexing_tasks: indexing_tasks.clone(), + indexing_plan_id: "01ARZ3NDEKTSV4RRFFQ69G5FAV".to_string(), }) .await .unwrap(); @@ -1587,6 +1605,7 @@ mod tests { indexing_service .ask_for_res(ApplyIndexingPlanRequest { indexing_tasks: indexing_tasks.clone(), + indexing_plan_id: "01ARZ3NDEKTSV4RRFFQ69G5FAV".to_string(), }) .await .unwrap(); @@ -1646,6 +1665,7 @@ mod tests { indexing_service .ask_for_res(ApplyIndexingPlanRequest { indexing_tasks: indexing_tasks.clone(), + indexing_plan_id: "01ARZ3NDEKTSV4RRFFQ69G5FAV".to_string(), }) .await .unwrap(); @@ -1665,6 +1685,7 @@ mod tests { indexing_service .ask_for_res(ApplyIndexingPlanRequest { indexing_tasks: Vec::new(), + indexing_plan_id: "01ARZ3NDEKTSV4RRFFQ69G5FAV".to_string(), }) .await .unwrap(); @@ -1676,6 +1697,110 @@ mod tests { universe.assert_quit().await; } + #[tokio::test] + async fn test_indexing_service_drops_superseded_plan() { + quickwit_common::setup_logging_for_tests(); + let transport = ChitchatTransport::default(); + let cluster = create_cluster_for_test(Vec::new(), &["indexer"], &transport, true) + .await + .unwrap(); + let metastore = metastore_for_test(); + + let index_id = append_random_suffix("test-plan-gate"); + let index_uri = format!("ram:///indexes/{index_id}"); + let index_config = IndexConfig::for_test(&index_id, &index_uri); + let create_index_request = + CreateIndexRequest::try_from_index_config(&index_config).unwrap(); + let index_uid: IndexUid = metastore + .create_index(create_index_request) + .await + .unwrap() + .index_uid() + .clone(); + + let source_config = SourceConfig { + source_id: "test-plan-gate--source".to_string(), + num_pipelines: NonZeroUsize::MIN, + enabled: true, + source_params: SourceParams::void(), + transform_config: None, + input_format: SourceInputFormat::Json, + }; + let add_source_request = + AddSourceRequest::try_from_source_config(index_uid.clone(), &source_config).unwrap(); + metastore.add_source(add_source_request).await.unwrap(); + + let universe = Universe::new(); + let temp_dir = tempfile::tempdir().unwrap(); + let (indexing_service, indexing_service_handle) = spawn_indexing_service_for_test( + temp_dir.path(), + &universe, + metastore.clone(), + cluster.clone(), + ) + .await; + + let params_fingerprint = + indexing_pipeline_params_fingerprint(&index_config, &source_config); + let task = |pipeline_uid: u128| IndexingTask { + index_uid: Some(index_uid.clone()), + source_id: source_config.source_id.clone(), + shard_ids: Vec::new(), + pipeline_uid: Some(PipelineUid::for_test(pipeline_uid)), + params_fingerprint, + }; + + indexing_service + .ask_for_res(ApplyIndexingPlanRequest { + indexing_tasks: vec![task(0), task(1)], + indexing_plan_id: "01ARZ3NDEKTSV4RRFFQ69G5F50".to_string(), + }) + .await + .unwrap(); + assert_eq!( + indexing_service_handle + .observe() + .await + .num_running_pipelines, + 2 + ); + + // A superseded (older id) plan that would drop a pipeline is ignored. + indexing_service + .ask_for_res(ApplyIndexingPlanRequest { + indexing_tasks: vec![task(0)], + indexing_plan_id: "01ARZ3NDEKTSV4RRFFQ69G5F40".to_string(), + }) + .await + .unwrap(); + assert_eq!( + indexing_service_handle + .observe() + .await + .num_running_pipelines, + 2 + ); + + // A newer plan applies, dropping the second pipeline. + indexing_service + .ask_for_res(ApplyIndexingPlanRequest { + indexing_tasks: vec![task(0)], + indexing_plan_id: "01ARZ3NDEKTSV4RRFFQ69G5F60".to_string(), + }) + .await + .unwrap(); + assert_eq!( + indexing_service_handle + .observe() + .await + .num_running_pipelines, + 1 + ); + + indexing_service_handle.quit().await; + universe.assert_quit().await; + } + #[tokio::test] async fn test_indexing_service_shutdown_merge_pipeline_when_no_indexing_pipeline() { quickwit_common::setup_logging_for_tests(); @@ -2072,6 +2197,7 @@ mod tests { params_fingerprint: 0, }, ], + indexing_plan_id: "01ARZ3NDEKTSV4RRFFQ69G5FAV".to_string(), }) .await .unwrap(); diff --git a/quickwit/quickwit-indexing/src/actors/log_publisher_impl.rs b/quickwit/quickwit-indexing/src/actors/log_publisher_impl.rs index 02a4bca1661..ddffb45231b 100644 --- a/quickwit/quickwit-indexing/src/actors/log_publisher_impl.rs +++ b/quickwit/quickwit-indexing/src/actors/log_publisher_impl.rs @@ -15,7 +15,6 @@ //! `Handler` and `Handler` implementations //! for `Publisher`, specific to the logs/traces pipeline. -use anyhow::Context; use async_trait::async_trait; use fail::fail_point; use quickwit_actors::{ActorContext, ActorExitStatus, Handler}; @@ -23,7 +22,8 @@ use quickwit_proto::metastore::{MetastoreService, PublishSplitsRequest}; use tracing::{info, instrument}; use crate::actors::publisher::{ - DisconnectMergePlanner, Publisher, serialize_checkpoint_delta, suggest_truncate, + DisconnectMergePlanner, Publisher, publish_with_retry, serialize_checkpoint_delta, + suggest_truncate, }; use crate::models::{NewSplits, SplitsUpdate}; @@ -71,7 +71,6 @@ impl Handler for Publisher { replaced_split_ids, checkpoint_delta_opt, publish_lock, - publish_token_opt, .. } = split_update; @@ -81,16 +80,20 @@ impl Handler for Publisher { .map(|split| split.split_id.clone()) .collect(); if let Some(_guard) = publish_lock.acquire().await { - let publish_splits_request = PublishSplitsRequest { - index_uid: Some(index_uid), - staged_split_ids: split_ids.clone(), - replaced_split_ids: replaced_split_ids.clone(), - index_checkpoint_delta_json_opt, - publish_token_opt: publish_token_opt.clone(), - }; - ctx.protect_future(self.metastore.publish_splits(publish_splits_request)) - .await - .context("failed to publish splits")?; + publish_with_retry(ctx, "publish splits", || { + // Move the request construction in the closure so that fresh values are captured + // on each retry, such as the publish token updating + let metastore = self.metastore.clone(); + let publish_splits_request = PublishSplitsRequest { + index_uid: Some(index_uid.clone()), + staged_split_ids: split_ids.clone(), + replaced_split_ids: replaced_split_ids.clone(), + index_checkpoint_delta_json_opt: index_checkpoint_delta_json_opt.clone(), + publish_token_opt: self.publish_token.load().as_deref().cloned(), + }; + async move { metastore.publish_splits(publish_splits_request).await } + }) + .await?; } else { info!( split_ids=?split_ids, @@ -123,18 +126,20 @@ impl Handler for Publisher { #[cfg(test)] mod tests { - use quickwit_actors::{QueueCapacity, Universe}; + use quickwit_actors::{ActorExitStatus, QueueCapacity, Universe}; use quickwit_metastore::checkpoint::{ IndexCheckpointDelta, PartitionId, SourceCheckpoint, SourceCheckpointDelta, }; use quickwit_metastore::{PublishSplitsRequestExt, SplitMetadata}; - use quickwit_proto::metastore::{EmptyResponse, MetastoreServiceClient, MockMetastoreService}; + use quickwit_proto::metastore::{ + EmptyResponse, MetastoreError, MetastoreServiceClient, MockMetastoreService, + }; use quickwit_proto::types::{IndexUid, Position}; use tracing::Span; use super::PUBLISHER_NAME; use crate::actors::publisher::Publisher; - use crate::models::{PublishLock, SplitsUpdate}; + use crate::models::{PublishLock, SharedPublishToken, SplitsUpdate}; use crate::source::SuggestTruncate; #[tokio::test] @@ -168,6 +173,7 @@ mod tests { MetastoreServiceClient::from_mock(mock_metastore), Some(merge_planner_mailbox), Some(source_mailbox), + SharedPublishToken::default(), ); let (publisher_mailbox, publisher_handle) = universe.spawn_builder().spawn(publisher); @@ -185,7 +191,6 @@ mod tests { source_delta: SourceCheckpointDelta::from_range(1..3), }), publish_lock: PublishLock::default(), - publish_token_opt: None, merge_task: None, parent_span: tracing::Span::none(), }) @@ -248,6 +253,7 @@ mod tests { MetastoreServiceClient::from_mock(mock_metastore), Some(merge_planner_mailbox), Some(source_mailbox), + SharedPublishToken::default(), ); let (publisher_mailbox, publisher_handle) = universe.spawn_builder().spawn(publisher); @@ -262,7 +268,6 @@ mod tests { source_delta: SourceCheckpointDelta::from_range(1..3), }), publish_lock: PublishLock::default(), - publish_token_opt: None, merge_task: None, parent_span: tracing::Span::none(), }) @@ -314,6 +319,7 @@ mod tests { MetastoreServiceClient::from_mock(mock_metastore), Some(merge_planner_mailbox), None, + SharedPublishToken::default(), ); let (publisher_mailbox, publisher_handle) = universe.spawn_builder().spawn(publisher); publisher_mailbox @@ -326,7 +332,6 @@ mod tests { replaced_split_ids: vec!["split1".to_string(), "split2".to_string()], checkpoint_delta_opt: None, publish_lock: PublishLock::default(), - publish_token_opt: None, merge_task: None, parent_span: Span::none(), }) @@ -356,6 +361,7 @@ mod tests { MetastoreServiceClient::from_mock(mock_metastore), Some(merge_planner_mailbox), None, + SharedPublishToken::default(), ); let (publisher_mailbox, publisher_handle) = universe.spawn_builder().spawn(publisher); @@ -369,7 +375,6 @@ mod tests { replaced_split_ids: Vec::new(), checkpoint_delta_opt: None, publish_lock, - publish_token_opt: None, merge_task: None, parent_span: Span::none(), }) @@ -383,4 +388,90 @@ mod tests { assert!(merger_messages.is_empty()); universe.assert_quit().await; } + + #[tokio::test] + async fn test_publisher_retries_then_succeeds_on_retryable_error() { + let universe = Universe::with_accelerated_time(); + let index_uid: IndexUid = IndexUid::for_test("index", 1); + let mut mock_metastore = MockMetastoreService::new(); + let mut attempt = 0; + mock_metastore + .expect_publish_splits() + .times(2) + .returning(move |_| { + attempt += 1; + if attempt == 1 { + Err(MetastoreError::InvalidArgument { + message: "failed to apply checkpoint delta: invalid publish token" + .to_string(), + }) + } else { + Ok(EmptyResponse {}) + } + }); + let publisher = Publisher::new( + PUBLISHER_NAME, + QueueCapacity::Bounded(1), + MetastoreServiceClient::from_mock(mock_metastore), + None, + None, + SharedPublishToken::default(), + ); + let (publisher_mailbox, publisher_handle) = universe.spawn_builder().spawn(publisher); + publisher_mailbox + .send_message(SplitsUpdate { + index_uid, + new_splits: vec![SplitMetadata { + split_id: "split".to_string(), + ..Default::default() + }], + replaced_split_ids: Vec::new(), + checkpoint_delta_opt: None, + publish_lock: PublishLock::default(), + merge_task: None, + parent_span: Span::none(), + }) + .await + .unwrap(); + let observation = publisher_handle.process_pending_and_observe().await.state; + assert_eq!(observation.num_published_splits, 1); + universe.assert_quit().await; + } + + #[tokio::test] + async fn test_publisher_does_not_retry_non_retryable_error() { + let universe = Universe::with_accelerated_time(); + let index_uid: IndexUid = IndexUid::for_test("index", 1); + let mut mock_metastore = MockMetastoreService::new(); + mock_metastore + .expect_publish_splits() + .times(1) + .returning(|_| Err(MetastoreError::Timeout("publish timed out".to_string()))); + let publisher = Publisher::new( + PUBLISHER_NAME, + QueueCapacity::Bounded(1), + MetastoreServiceClient::from_mock(mock_metastore), + None, + None, + SharedPublishToken::default(), + ); + let (publisher_mailbox, publisher_handle) = universe.spawn_builder().spawn(publisher); + publisher_mailbox + .send_message(SplitsUpdate { + index_uid, + new_splits: vec![SplitMetadata { + split_id: "split".to_string(), + ..Default::default() + }], + replaced_split_ids: Vec::new(), + checkpoint_delta_opt: None, + publish_lock: PublishLock::default(), + merge_task: None, + parent_span: Span::none(), + }) + .await + .unwrap(); + let (exit_status, _) = publisher_handle.join().await; + assert!(matches!(exit_status, ActorExitStatus::Failure(_))); + } } diff --git a/quickwit/quickwit-indexing/src/actors/merge_executor.rs b/quickwit/quickwit-indexing/src/actors/merge_executor.rs index d07c6e8046d..965b47bd70a 100644 --- a/quickwit/quickwit-indexing/src/actors/merge_executor.rs +++ b/quickwit/quickwit-indexing/src/actors/merge_executor.rs @@ -150,7 +150,6 @@ impl Handler for MergeExecutor { splits: vec![indexed_split], checkpoint_delta_opt: Default::default(), publish_lock: PublishLock::default(), - publish_token_opt: None, batch_parent_span: merge_task.merge_parent_span.clone(), merge_task_opt: Some(merge_task), }, diff --git a/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs b/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs index 53ece0a09fa..5169307b472 100644 --- a/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs +++ b/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs @@ -46,7 +46,7 @@ use crate::actors::publisher::DisconnectMergePlanner; use crate::actors::{MergeSchedulerService, Publisher, Uploader, UploaderType}; use crate::merge_policy::MergePolicy; use crate::metrics::{ACTOR_NAME, BACKPRESSURE_MICROS, ONGOING_MERGE_OPERATIONS}; -use crate::models::MergeStatistics; +use crate::models::{MergeStatistics, SharedPublishToken}; use crate::split_store::IndexingSplitStore; /// Spawning a merge pipeline puts a lot of pressure on the metastore so @@ -270,6 +270,7 @@ impl MergePipeline { self.params.metastore.clone(), Some(self.merge_planner_mailbox.clone()), None, + SharedPublishToken::default(), ); let (merge_publisher_mailbox, merge_publisher_handle) = ctx .spawn_actor() diff --git a/quickwit/quickwit-indexing/src/actors/packager.rs b/quickwit/quickwit-indexing/src/actors/packager.rs index 9f691d285b6..f370a42e9c3 100644 --- a/quickwit/quickwit-indexing/src/actors/packager.rs +++ b/quickwit/quickwit-indexing/src/actors/packager.rs @@ -152,7 +152,6 @@ impl Handler for Packager { packaged_splits, batch.checkpoint_delta_opt, batch.publish_lock, - batch.publish_token_opt, batch.merge_task_opt, batch.batch_parent_span, ), @@ -569,7 +568,6 @@ mod tests { splits: vec![indexed_split], checkpoint_delta_opt: IndexCheckpointDelta::for_test("source_id", 10..20).into(), publish_lock: PublishLock::default(), - publish_token_opt: None, merge_task_opt: None, batch_parent_span: Span::none(), }) diff --git a/quickwit/quickwit-indexing/src/actors/parquet_pipeline/parquet_doc_processor.rs b/quickwit/quickwit-indexing/src/actors/parquet_pipeline/parquet_doc_processor.rs index b8f9ea2b32e..74532cdef8d 100644 --- a/quickwit/quickwit-indexing/src/actors/parquet_pipeline/parquet_doc_processor.rs +++ b/quickwit/quickwit-indexing/src/actors/parquet_pipeline/parquet_doc_processor.rs @@ -31,7 +31,7 @@ use tokio::runtime::Handle; use tracing::{debug, info, instrument}; use super::{ParquetIndexer, ProcessedParquetBatch}; -use crate::models::{NewPublishLock, NewPublishToken, PublishLock, RawDocBatch}; +use crate::models::{NewPublishLock, PublishLock, RawDocBatch}; /// Arrow IPC stream continuation marker (4 bytes of 0xFF). const ARROW_IPC_CONTINUATION_MARKER: [u8; 4] = [0xFF, 0xFF, 0xFF, 0xFF]; @@ -323,21 +323,6 @@ impl Handler for ParquetDocProcessor { } } -#[async_trait] -impl Handler for ParquetDocProcessor { - type Reply = (); - - async fn handle( - &mut self, - message: NewPublishToken, - ctx: &ActorContext, - ) -> Result<(), ActorExitStatus> { - ctx.send_message(&self.indexer_mailbox, message).await?; - - Ok(()) - } -} - #[cfg(test)] mod tests { use std::sync::atomic::Ordering; diff --git a/quickwit/quickwit-indexing/src/actors/parquet_pipeline/parquet_e2e_test.rs b/quickwit/quickwit-indexing/src/actors/parquet_pipeline/parquet_e2e_test.rs index 986ab31fd2d..44b09282d1a 100644 --- a/quickwit/quickwit-indexing/src/actors/parquet_pipeline/parquet_e2e_test.rs +++ b/quickwit/quickwit-indexing/src/actors/parquet_pipeline/parquet_e2e_test.rs @@ -43,7 +43,7 @@ use crate::actors::sequencer::Sequencer; use crate::actors::{ ParquetDocProcessor, ParquetIndexer, ParquetPackager, ParquetUploader, Publisher, UploaderType, }; -use crate::models::RawDocBatch; +use crate::models::{RawDocBatch, SharedPublishToken}; // ============================================================================= // Helpers @@ -166,6 +166,7 @@ async fn test_metrics_pipeline_e2e() { metastore_client.clone(), None, None, + SharedPublishToken::default(), ); let (publisher_mailbox, publisher_handle) = universe.spawn_builder().spawn(publisher); @@ -516,6 +517,7 @@ async fn test_sketch_pipeline_e2e() { metastore_client.clone(), None, None, + SharedPublishToken::default(), ); let (publisher_mailbox, publisher_handle) = universe.spawn_builder().spawn(publisher); diff --git a/quickwit/quickwit-indexing/src/actors/parquet_pipeline/parquet_indexer.rs b/quickwit/quickwit-indexing/src/actors/parquet_pipeline/parquet_indexer.rs index 2d64f54b9b7..cf9f109aac1 100644 --- a/quickwit/quickwit-indexing/src/actors/parquet_pipeline/parquet_indexer.rs +++ b/quickwit/quickwit-indexing/src/actors/parquet_pipeline/parquet_indexer.rs @@ -33,7 +33,7 @@ use quickwit_doc_mapper::{ArrowRowContext, RoutingExpr}; use quickwit_metastore::checkpoint::{IndexCheckpointDelta, SourceCheckpointDelta}; use quickwit_parquet_engine::index::{ParquetBatchAccumulator, ParquetIndexingConfig}; use quickwit_parquet_engine::split::ParquetSplitMetadata; -use quickwit_proto::types::{IndexUid, PublishToken, SourceId}; +use quickwit_proto::types::{IndexUid, SourceId}; use serde::Serialize; use tokio::runtime::Handle; use tracing::{debug, info, info_span, warn}; @@ -43,7 +43,7 @@ use super::ProcessedParquetBatch; use super::parquet_merge_messages::ParquetMergeTask; use super::parquet_packager::{ParquetBatchForPackager, ParquetPackager, PartitionedRecordBatch}; use crate::actors::indexer::OTHER_PARTITION_ID; -use crate::models::{NewPublishLock, NewPublishToken, PublishLock}; +use crate::models::{NewPublishLock, PublishLock}; /// Default commit timeout for ParquetIndexer (60 seconds). // TODO: read from index config commit_timeout_secs. @@ -118,8 +118,6 @@ pub struct ParquetSplitBatch { pub checkpoint_delta_opt: Option, /// Publish lock for coordinating with sources. pub publish_lock: PublishLock, - /// Optional publish token. - pub publish_token_opt: Option, /// Split IDs being replaced by this batch (non-empty for merges). /// Empty for the ingest path. pub replaced_split_ids: Vec, @@ -176,8 +174,6 @@ pub struct ParquetIndexer { checkpoint_delta: SourceCheckpointDelta, /// Publish lock for coordinating with sources. publish_lock: PublishLock, - /// Optional publish token. - publish_token_opt: Option, /// Observability counters. counters: ParquetIndexerCounters, /// Current workbench ID for tracing. @@ -270,7 +266,6 @@ impl ParquetIndexer { accumulator_config, checkpoint_delta: SourceCheckpointDelta::default(), publish_lock: PublishLock::default(), - publish_token_opt: None, counters, workbench_id: Ulid::new(), packager_mailbox, @@ -557,7 +552,6 @@ impl ParquetIndexer { index_uid: self.index_uid.clone(), checkpoint_delta: self.make_index_checkpoint_delta(), publish_lock: self.publish_lock.clone(), - publish_token_opt: self.publish_token_opt.clone(), }; if batch_for_packager.batches.is_empty() @@ -695,21 +689,6 @@ impl Handler for ParquetIndexer { } } -#[async_trait] -impl Handler for ParquetIndexer { - type Reply = (); - - async fn handle( - &mut self, - message: NewPublishToken, - _ctx: &ActorContext, - ) -> Result<(), ActorExitStatus> { - let NewPublishToken(publish_token) = message; - self.publish_token_opt = Some(publish_token); - Ok(()) - } -} - #[async_trait] impl Handler for ParquetIndexer { type Reply = (); diff --git a/quickwit/quickwit-indexing/src/actors/parquet_pipeline/parquet_merge_executor.rs b/quickwit/quickwit-indexing/src/actors/parquet_pipeline/parquet_merge_executor.rs index ffe27b09a0e..659da70a85d 100644 --- a/quickwit/quickwit-indexing/src/actors/parquet_pipeline/parquet_merge_executor.rs +++ b/quickwit/quickwit-indexing/src/actors/parquet_pipeline/parquet_merge_executor.rs @@ -374,7 +374,6 @@ impl Handler for ParquetMergeExecutor { output_dir, checkpoint_delta_opt: None, publish_lock: PublishLock::default(), - publish_token_opt: None, replaced_split_ids, _scratch_directory_opt: Some(scratch.scratch_directory), _merge_task_opt: Some(ParquetMergeTask { @@ -460,7 +459,6 @@ impl Handler for ParquetMergeExecutor { output_dir, checkpoint_delta_opt: None, publish_lock: PublishLock::default(), - publish_token_opt: None, replaced_split_ids, _scratch_directory_opt: Some(scratch.scratch_directory), _merge_task_opt: Some(ParquetMergeTask { diff --git a/quickwit/quickwit-indexing/src/actors/parquet_pipeline/parquet_merge_pipeline.rs b/quickwit/quickwit-indexing/src/actors/parquet_pipeline/parquet_merge_pipeline.rs index 6de3f5f5983..a918ce5b1e7 100644 --- a/quickwit/quickwit-indexing/src/actors/parquet_pipeline/parquet_merge_pipeline.rs +++ b/quickwit/quickwit-indexing/src/actors/parquet_pipeline/parquet_merge_pipeline.rs @@ -57,7 +57,7 @@ use crate::actors::pipeline_shared::wait_duration_before_retry; use crate::actors::publisher::DisconnectMergePlanner; use crate::actors::{MergeSchedulerService, Publisher, Sequencer, UploaderType}; use crate::metrics::ONGOING_MERGE_OPERATIONS; -use crate::models::MergeStatistics; +use crate::models::{MergeStatistics, SharedPublishToken}; /// Limits concurrent Parquet merge pipeline spawns to avoid overwhelming the /// metastore. This is a separate semaphore from the Tantivy merge pipeline's. @@ -302,6 +302,7 @@ impl ParquetMergePipeline { self.params.metastore.clone(), None, // No Tantivy planner None, // No source + SharedPublishToken::default(), ) .set_parquet_merge_planner_mailbox(self.merge_planner_mailbox.clone()); diff --git a/quickwit/quickwit-indexing/src/actors/parquet_pipeline/parquet_merge_pipeline_crash_test.rs b/quickwit/quickwit-indexing/src/actors/parquet_pipeline/parquet_merge_pipeline_crash_test.rs index 96215fe1799..c9a926ab1f7 100644 --- a/quickwit/quickwit-indexing/src/actors/parquet_pipeline/parquet_merge_pipeline_crash_test.rs +++ b/quickwit/quickwit-indexing/src/actors/parquet_pipeline/parquet_merge_pipeline_crash_test.rs @@ -138,12 +138,14 @@ async fn test_merge_pipeline_crash_and_restart() { let final_publish_done = Arc::new(AtomicBool::new(false)); let final_publish_clone = final_publish_done.clone(); + // `publish_with_retry` retries each publish up to 3×; the second logical publish must fail on + // all 3 attempts (calls 1-3) to actually crash the pipeline rather than be masked by a retry. mock_metastore .expect_publish_metrics_splits() .returning(move |request| { let call_num = publish_call_clone.fetch_add(1, Ordering::SeqCst); - if call_num == 1 { - // Fail on the second publish to trigger pipeline restart. + if (1..=3).contains(&call_num) { + // Second publish: fail every retry attempt to trigger pipeline restart. return Err(quickwit_proto::metastore::MetastoreError::Internal { message: "injected failure for crash test".to_string(), cause: "test".to_string(), @@ -153,8 +155,8 @@ async fn test_merge_pipeline_crash_and_restart() { .lock() .unwrap() .extend(request.replaced_split_ids.clone()); - // Signal completion after a post-restart publish. - if call_num >= 2 { + // Signal completion on the first post-restart publish (call 4, after the 3 failures). + if call_num >= 4 { final_publish_clone.store(true, Ordering::SeqCst); } Ok(EmptyResponse {}) diff --git a/quickwit/quickwit-indexing/src/actors/parquet_pipeline/parquet_merge_pipeline_trace_conformance_test.rs b/quickwit/quickwit-indexing/src/actors/parquet_pipeline/parquet_merge_pipeline_trace_conformance_test.rs index 0fefdb6a16b..02d6bd4c76b 100644 --- a/quickwit/quickwit-indexing/src/actors/parquet_pipeline/parquet_merge_pipeline_trace_conformance_test.rs +++ b/quickwit/quickwit-indexing/src/actors/parquet_pipeline/parquet_merge_pipeline_trace_conformance_test.rs @@ -524,7 +524,13 @@ fn build_mock_metastore(tracker: Arc) -> MetastoreServiceCli let n = publish_clone .publish_call_count .fetch_add(1, Ordering::SeqCst); - if Some(n) == publish_clone.fail_publish_at_call { + // `publish_with_retry` retries each logical publish up to 3× on a retryable error + // (Internal is retryable). To actually crash the pipeline the injected failure must + // span all 3 attempts (calls k..=k+2) so the retry budget is exhausted rather than + // masked by a successful retry. + if let Some(k) = publish_clone.fail_publish_at_call + && (k..=k + 2).contains(&n) + { // Failed publish: do NOT promote staged → published. return Err(quickwit_proto::metastore::MetastoreError::Internal { message: "injected failure for trace conformance test".to_string(), @@ -545,9 +551,11 @@ fn build_mock_metastore(tracker: Arc) -> MetastoreServiceCli published.remove(replaced_id); replaced_history.insert(replaced_id.clone(), ()); } + // The first successful publish after the exhausted retries (call k+3) marks + // completion. With no injected failure (None), the first publish (n >= 0) marks it. if n >= publish_clone .fail_publish_at_call - .map(|x| x + 1) + .map(|x| x + 3) .unwrap_or(0) { publish_clone.publish_done.store(true, Ordering::SeqCst); diff --git a/quickwit/quickwit-indexing/src/actors/parquet_pipeline/parquet_packager.rs b/quickwit/quickwit-indexing/src/actors/parquet_pipeline/parquet_packager.rs index 6e1b6866c50..df0d94e2a46 100644 --- a/quickwit/quickwit-indexing/src/actors/parquet_pipeline/parquet_packager.rs +++ b/quickwit/quickwit-indexing/src/actors/parquet_pipeline/parquet_packager.rs @@ -30,7 +30,7 @@ use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Mailbox, Qu use quickwit_common::runtimes::RuntimeType; use quickwit_metastore::checkpoint::IndexCheckpointDelta; use quickwit_parquet_engine::storage::ParquetSplitWriter; -use quickwit_proto::types::{IndexUid, PublishToken}; +use quickwit_proto::types::IndexUid; use serde::Serialize; use tokio::runtime::Handle; use tracing::{info, warn}; @@ -64,8 +64,6 @@ pub struct ParquetBatchForPackager { pub checkpoint_delta: IndexCheckpointDelta, /// Publish lock for coordination. pub publish_lock: PublishLock, - /// Optional publish token. - pub publish_token_opt: Option, } /// Counters for ParquetPackager observability. @@ -182,7 +180,6 @@ impl Handler for ParquetPackager { index_uid, checkpoint_delta, publish_lock, - publish_token_opt, } = batch_for_packager; let output_dir = self.split_writer.base_path().clone(); @@ -235,7 +232,6 @@ impl Handler for ParquetPackager { output_dir, checkpoint_delta_opt: Some(checkpoint_delta), publish_lock, - publish_token_opt, replaced_split_ids: Vec::new(), _scratch_directory_opt: None, _merge_task_opt: None, @@ -349,7 +345,6 @@ mod tests { source_delta: SourceCheckpointDelta::from_range(0..10), }, publish_lock: PublishLock::default(), - publish_token_opt: None, }; packager_mailbox @@ -388,7 +383,6 @@ mod tests { source_delta: SourceCheckpointDelta::from_range(0..10), }, publish_lock: PublishLock::default(), - publish_token_opt: None, }; packager_mailbox @@ -431,7 +425,6 @@ mod tests { source_delta: SourceCheckpointDelta::from_range(0..30), }, publish_lock: PublishLock::default(), - publish_token_opt: None, }; packager_mailbox @@ -476,7 +469,6 @@ mod tests { source_delta: SourceCheckpointDelta::from_range(0..30), }, publish_lock: PublishLock::default(), - publish_token_opt: None, }; packager_mailbox diff --git a/quickwit/quickwit-indexing/src/actors/parquet_pipeline/parquet_splits_update.rs b/quickwit/quickwit-indexing/src/actors/parquet_pipeline/parquet_splits_update.rs index b4ac6d197d0..af6fd2e7088 100644 --- a/quickwit/quickwit-indexing/src/actors/parquet_pipeline/parquet_splits_update.rs +++ b/quickwit/quickwit-indexing/src/actors/parquet_pipeline/parquet_splits_update.rs @@ -19,7 +19,7 @@ use std::fmt; use itertools::Itertools; use quickwit_metastore::checkpoint::IndexCheckpointDelta; use quickwit_parquet_engine::split::ParquetSplitMetadata; -use quickwit_proto::types::{IndexUid, PublishToken}; +use quickwit_proto::types::IndexUid; use tracing::Span; use super::parquet_merge_messages::ParquetMergeTask; @@ -40,8 +40,6 @@ pub struct ParquetSplitsUpdate { pub checkpoint_delta_opt: Option, /// Publish lock for coordination. pub publish_lock: PublishLock, - /// Optional publish token. - pub publish_token_opt: Option, /// Parent span for tracing. pub parent_span: Span, /// Merge task — held until the publisher drops this message, ensuring the diff --git a/quickwit/quickwit-indexing/src/actors/parquet_pipeline/parquet_uploader.rs b/quickwit/quickwit-indexing/src/actors/parquet_pipeline/parquet_uploader.rs index f3bf904e1b7..620d4720f4c 100644 --- a/quickwit/quickwit-indexing/src/actors/parquet_pipeline/parquet_uploader.rs +++ b/quickwit/quickwit-indexing/src/actors/parquet_pipeline/parquet_uploader.rs @@ -199,7 +199,6 @@ impl Handler for ParquetUploader { replaced_split_ids: batch.replaced_split_ids, checkpoint_delta_opt: batch.checkpoint_delta_opt, publish_lock: batch.publish_lock, - publish_token_opt: batch.publish_token_opt, parent_span: tracing::Span::current(), _merge_task_opt: batch._merge_task_opt, }; @@ -237,7 +236,6 @@ impl Handler for ParquetUploader { let output_dir = batch.output_dir; let checkpoint_delta_opt = batch.checkpoint_delta_opt; let publish_lock = batch.publish_lock; - let publish_token_opt = batch.publish_token_opt; let mut splits = batch.splits; let replaced_split_ids = batch.replaced_split_ids; let merge_task_opt = batch._merge_task_opt; @@ -378,7 +376,6 @@ impl Handler for ParquetUploader { replaced_split_ids, checkpoint_delta_opt, publish_lock, - publish_token_opt, parent_span: Span::current(), _merge_task_opt: merge_task_opt, }; @@ -499,7 +496,6 @@ mod tests { output_dir: temp_dir.path().to_path_buf(), checkpoint_delta_opt: Some(checkpoint_delta), publish_lock: PublishLock::default(), - publish_token_opt: None, replaced_split_ids: Vec::new(), _scratch_directory_opt: None, _merge_task_opt: None, @@ -598,7 +594,6 @@ mod tests { output_dir: temp_dir.path().to_path_buf(), checkpoint_delta_opt: Some(checkpoint_delta), publish_lock: PublishLock::default(), - publish_token_opt: None, replaced_split_ids: Vec::new(), _scratch_directory_opt: None, _merge_task_opt: None, @@ -678,7 +673,6 @@ mod tests { output_dir: temp_dir.path().to_path_buf(), checkpoint_delta_opt: Some(checkpoint_delta), publish_lock: PublishLock::default(), - publish_token_opt: None, replaced_split_ids: Vec::new(), _scratch_directory_opt: None, _merge_task_opt: None, @@ -754,7 +748,6 @@ mod tests { output_dir: temp_dir.path().to_path_buf(), checkpoint_delta_opt: Some(checkpoint_delta), publish_lock: PublishLock::default(), - publish_token_opt: None, replaced_split_ids: Vec::new(), _scratch_directory_opt: None, _merge_task_opt: None, diff --git a/quickwit/quickwit-indexing/src/actors/parquet_pipeline/pipeline.rs b/quickwit/quickwit-indexing/src/actors/parquet_pipeline/pipeline.rs index b0b7f869d40..d0ed12c2c7a 100644 --- a/quickwit/quickwit-indexing/src/actors/parquet_pipeline/pipeline.rs +++ b/quickwit/quickwit-indexing/src/actors/parquet_pipeline/pipeline.rs @@ -51,7 +51,7 @@ use crate::actors::pipeline_shared::{ use crate::actors::sequencer::Sequencer; use crate::actors::{Publisher, UploaderType}; use crate::metrics::INDEXING_PIPELINES; -use crate::models::IndexingStatistics; +use crate::models::{IndexingStatistics, SharedPublishToken}; use crate::source::{ AssignShards, Assignment, SourceActor, SourceRuntime, quickwit_supported_sources, }; @@ -114,6 +114,10 @@ pub struct MetricsPipeline { handles_opt: Option, kill_switch: KillSwitch, shard_ids: BTreeSet, + // Id of the last indexing plan assigned to this pipeline. Kept here, like `shard_ids`, so it + // can be re-sent to the source on respawn; the source adopts it as its publish token. + indexing_plan_id: String, + publish_token: SharedPublishToken, _indexing_pipelines_gauge_guard: GaugeGuard, } @@ -163,6 +167,8 @@ impl MetricsPipeline { ..Default::default() }, shard_ids: Default::default(), + indexing_plan_id: String::new(), + publish_token: SharedPublishToken::default(), _indexing_pipelines_gauge_guard: indexing_pipelines_gauge_guard, } } @@ -332,6 +338,7 @@ impl MetricsPipeline { self.params.metastore.clone(), None, Some(source_mailbox.clone()), + self.publish_token.clone(), ); if let Some(planner_mailbox) = &self.params.parquet_merge_planner_mailbox_opt { publisher = publisher.set_parquet_merge_planner_mailbox(planner_mailbox.clone()); @@ -435,6 +442,7 @@ impl MetricsPipeline { storage_resolver: self.params.source_storage_resolver.clone(), event_broker: self.params.event_broker.clone(), indexing_setting: self.params.indexing_settings.clone(), + publish_token: self.publish_token.clone(), }; let source = ctx .protect_future(quickwit_supported_sources().load_source(source_runtime)) @@ -447,6 +455,7 @@ impl MetricsPipeline { .spawn(actor_source); let assign_shards_message = AssignShards(Assignment { shard_ids: self.shard_ids.clone(), + indexing_plan_id: self.indexing_plan_id.clone(), }); source_mailbox.send_message(assign_shards_message).await?; @@ -539,6 +548,8 @@ impl Handler for MetricsPipeline { ) -> Result<(), ActorExitStatus> { self.shard_ids .clone_from(&assign_shards_message.0.shard_ids); + self.indexing_plan_id + .clone_from(&assign_shards_message.0.indexing_plan_id); if let Some(handles) = &self.handles_opt { info!( shard_ids=?assign_shards_message.0.shard_ids, diff --git a/quickwit/quickwit-indexing/src/actors/parquet_pipeline/publisher_impl.rs b/quickwit/quickwit-indexing/src/actors/parquet_pipeline/publisher_impl.rs index 8ae01f06c68..5e3b5381275 100644 --- a/quickwit/quickwit-indexing/src/actors/parquet_pipeline/publisher_impl.rs +++ b/quickwit/quickwit-indexing/src/actors/parquet_pipeline/publisher_impl.rs @@ -15,7 +15,6 @@ //! `Handler` implementation for `Publisher`, //! specific to the metrics pipeline. -use anyhow::Context; use async_trait::async_trait; use quickwit_actors::{ActorContext, ActorExitStatus, Handler}; use quickwit_dst::events::merge_pipeline::{MergePipelineEvent, record_merge_pipeline_event}; @@ -25,7 +24,9 @@ use quickwit_proto::metastore::{ use tracing::{info, instrument}; use super::ParquetSplitsUpdate; -use crate::actors::publisher::{Publisher, serialize_checkpoint_delta, suggest_truncate}; +use crate::actors::publisher::{ + Publisher, publish_with_retry, serialize_checkpoint_delta, suggest_truncate, +}; pub(crate) const METRICS_PUBLISHER_NAME: &str = "ParquetPublisher"; @@ -45,7 +46,6 @@ impl Handler for Publisher { replaced_split_ids, checkpoint_delta_opt, publish_lock, - publish_token_opt, _merge_task_opt, .. } = split_update; @@ -57,27 +57,31 @@ impl Handler for Publisher { .collect(); if let Some(_guard) = publish_lock.acquire().await { if quickwit_common::is_sketches_index(&index_uid.index_id) { - let publish_request = PublishSketchSplitsRequest { - index_uid: Some(index_uid.clone()), - staged_split_ids: split_ids.clone(), - replaced_split_ids: replaced_split_ids.clone(), - index_checkpoint_delta_json_opt, - publish_token_opt: publish_token_opt.clone(), - }; - ctx.protect_future(self.metastore.publish_sketch_splits(publish_request)) - .await - .context("failed to publish sketch splits")?; + publish_with_retry(ctx, "publish sketch splits", || { + let metastore = self.metastore.clone(); + let publish_request = PublishSketchSplitsRequest { + index_uid: Some(index_uid.clone()), + staged_split_ids: split_ids.clone(), + replaced_split_ids: replaced_split_ids.clone(), + index_checkpoint_delta_json_opt: index_checkpoint_delta_json_opt.clone(), + publish_token_opt: self.publish_token.load().as_deref().cloned(), + }; + async move { metastore.publish_sketch_splits(publish_request).await } + }) + .await?; } else { - let publish_request = PublishMetricsSplitsRequest { - index_uid: Some(index_uid.clone()), - staged_split_ids: split_ids.clone(), - replaced_split_ids: replaced_split_ids.clone(), - index_checkpoint_delta_json_opt, - publish_token_opt: publish_token_opt.clone(), - }; - ctx.protect_future(self.metastore.publish_metrics_splits(publish_request)) - .await - .context("failed to publish metrics splits")?; + publish_with_retry(ctx, "publish metrics splits", || { + let metastore = self.metastore.clone(); + let publish_request = PublishMetricsSplitsRequest { + index_uid: Some(index_uid.clone()), + staged_split_ids: split_ids.clone(), + replaced_split_ids: replaced_split_ids.clone(), + index_checkpoint_delta_json_opt: index_checkpoint_delta_json_opt.clone(), + publish_token_opt: self.publish_token.load().as_deref().cloned(), + }; + async move { metastore.publish_metrics_splits(publish_request).await } + }) + .await?; } } else { info!( @@ -165,7 +169,7 @@ mod tests { use super::{METRICS_PUBLISHER_NAME, ParquetSplitsUpdate}; use crate::actors::publisher::Publisher; - use crate::models::PublishLock; + use crate::models::{PublishLock, SharedPublishToken}; fn create_test_metrics_split_metadata(index_uid: &str, split_id: &str) -> ParquetSplitMetadata { ParquetSplitMetadata::metrics_builder() @@ -200,6 +204,7 @@ mod tests { MetastoreServiceClient::from_mock(mock_metastore), None, None, + SharedPublishToken::default(), ); let (publisher_mailbox, publisher_handle) = universe.spawn_builder().spawn(publisher); @@ -215,7 +220,6 @@ mod tests { source_delta: SourceCheckpointDelta::from_range(0..10), }), publish_lock: PublishLock::default(), - publish_token_opt: None, parent_span: Span::none(), _merge_task_opt: None, }; @@ -252,6 +256,7 @@ mod tests { MetastoreServiceClient::from_mock(mock_metastore), None, None, + SharedPublishToken::default(), ); let (publisher_mailbox, publisher_handle) = universe.spawn_builder().spawn(publisher); @@ -264,7 +269,6 @@ mod tests { source_delta: SourceCheckpointDelta::from_range(0..1), }), publish_lock: PublishLock::default(), - publish_token_opt: None, parent_span: Span::none(), _merge_task_opt: None, }; @@ -292,6 +296,7 @@ mod tests { MetastoreServiceClient::from_mock(mock_metastore), None, None, + SharedPublishToken::default(), ); let (publisher_mailbox, publisher_handle) = universe.spawn_builder().spawn(publisher); @@ -310,7 +315,6 @@ mod tests { source_delta: SourceCheckpointDelta::from_range(0..10), }), publish_lock, - publish_token_opt: None, parent_span: Span::none(), _merge_task_opt: None, }; diff --git a/quickwit/quickwit-indexing/src/actors/publisher.rs b/quickwit/quickwit-indexing/src/actors/publisher.rs index 3de412bb400..fb07a5a720d 100644 --- a/quickwit/quickwit-indexing/src/actors/publisher.rs +++ b/quickwit/quickwit-indexing/src/actors/publisher.rs @@ -12,14 +12,18 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::time::Duration; + use anyhow::Context; use async_trait::async_trait; -use quickwit_actors::{Actor, ActorContext, Mailbox, QueueCapacity}; +use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Mailbox, QueueCapacity}; use quickwit_metastore::checkpoint::IndexCheckpointDelta; -use quickwit_proto::metastore::MetastoreServiceClient; +use quickwit_proto::metastore::{MetastoreError, MetastoreResult, MetastoreServiceClient}; use serde::Serialize; +use tracing::warn; use crate::actors::MergePlanner; +use crate::models::SharedPublishToken; use crate::source::{SourceActor, SuggestTruncate}; #[derive(Clone, Debug, Default, Serialize)] @@ -44,6 +48,7 @@ pub struct Publisher { pub(crate) parquet_merge_planner_mailbox_opt: Option>, pub(crate) source_mailbox_opt: Option>, + pub(crate) publish_token: SharedPublishToken, pub(crate) counters: PublisherCounters, } @@ -54,6 +59,7 @@ impl Publisher { metastore: MetastoreServiceClient, merge_planner_mailbox_opt: Option>, source_mailbox_opt: Option>, + publish_token: SharedPublishToken, ) -> Publisher { Publisher { name, @@ -63,6 +69,7 @@ impl Publisher { #[cfg(feature = "metrics")] parquet_merge_planner_mailbox_opt: None, source_mailbox_opt, + publish_token, counters: PublisherCounters::default(), } } @@ -107,6 +114,47 @@ pub(crate) async fn suggest_truncate( } } +pub(crate) async fn publish_with_retry( + ctx: &ActorContext, + operation_name: &str, + mut publish: F, +) -> Result<(), ActorExitStatus> +where + F: FnMut() -> Fut, + Fut: Future>, +{ + for retry_delay in [ + Some(Duration::from_millis(100)), + Some(Duration::from_millis(250)), + None, + ] { + let Err(error) = ctx.protect_future(publish()).await else { + return Ok(()); + }; + let retryable = matches!( + error, + MetastoreError::InvalidArgument { .. } + | MetastoreError::Unavailable(_) + | MetastoreError::TooManyRequests + | MetastoreError::Connection { .. } + | MetastoreError::Internal { .. } + ); + match retry_delay { + Some(retry_delay) if retryable => { + warn!(%error, operation = operation_name, "metastore publish failed, retrying"); + ctx.protect_future(tokio::time::sleep(retry_delay)).await; + } + _ => { + warn!(%error, operation = operation_name, retryable, "metastore publish failed, giving up after 3 tries"); + return Err(anyhow::Error::from(error) + .context(format!("failed to {operation_name}")) + .into()); + } + } + } + unreachable!("retry loop returns on the final attempt") +} + #[async_trait] impl Actor for Publisher { type ObservableState = PublisherCounters; diff --git a/quickwit/quickwit-indexing/src/actors/uploader.rs b/quickwit/quickwit-indexing/src/actors/uploader.rs index 0b2901f26d5..fc299dc27db 100644 --- a/quickwit/quickwit-indexing/src/actors/uploader.rs +++ b/quickwit/quickwit-indexing/src/actors/uploader.rs @@ -31,7 +31,7 @@ use quickwit_metastore::{SplitMetadata, StageSplitsRequestExt}; use quickwit_metrics::{gauge, label_values}; use quickwit_proto::metastore::{MetastoreService, MetastoreServiceClient, StageSplitsRequest}; use quickwit_proto::search::{ReportSplit, ReportSplitsRequest}; -use quickwit_proto::types::{IndexUid, PublishToken}; +use quickwit_proto::types::IndexUid; use quickwit_storage::SplitPayloadBuilder; use serde::Serialize; use tokio::sync::oneshot::Sender; @@ -389,7 +389,6 @@ impl Handler for Uploader { packaged_splits_and_metadata, batch.checkpoint_delta_opt, batch.publish_lock, - batch.publish_token_opt, batch.merge_task_opt, batch.batch_parent_span, ); @@ -438,7 +437,6 @@ impl Handler for Uploader { replaced_split_ids: Vec::new(), checkpoint_delta_opt: Some(empty_split.checkpoint_delta), publish_lock: empty_split.publish_lock, - publish_token_opt: empty_split.publish_token_opt, merge_task: None, parent_span: empty_split.batch_parent_span, }; @@ -453,7 +451,6 @@ fn make_publish_operation( packaged_splits_and_metadatas: Vec<(PackagedSplit, SplitMetadata)>, checkpoint_delta_opt: Option, publish_lock: PublishLock, - publish_token_opt: Option, merge_task: Option, parent_span: Span, ) -> SplitsUpdate { @@ -471,7 +468,6 @@ fn make_publish_operation( replaced_split_ids: Vec::from_iter(replaced_split_ids), checkpoint_delta_opt, publish_lock, - publish_token_opt, merge_task, parent_span, } @@ -599,7 +595,6 @@ mod tests { checkpoint_delta_opt, PublishLock::default(), None, - None, Span::none(), )) .await?; @@ -743,7 +738,6 @@ mod tests { None, PublishLock::default(), None, - None, Span::none(), )) .await?; @@ -860,7 +854,6 @@ mod tests { checkpoint_delta_opt, PublishLock::default(), None, - None, Span::none(), )) .await?; @@ -912,7 +905,6 @@ mod tests { index_uid: IndexUid::new_with_random_ulid("test-index"), checkpoint_delta, publish_lock: PublishLock::default(), - publish_token_opt: None, batch_parent_span: Span::none(), }) .await?; @@ -1042,7 +1034,6 @@ mod tests { checkpoint_delta_opt, PublishLock::default(), None, - None, Span::none(), )) .await?; diff --git a/quickwit/quickwit-indexing/src/models/indexed_split.rs b/quickwit/quickwit-indexing/src/models/indexed_split.rs index 03728fe2f6a..ea6233e7d2a 100644 --- a/quickwit/quickwit-indexing/src/models/indexed_split.rs +++ b/quickwit/quickwit-indexing/src/models/indexed_split.rs @@ -20,7 +20,7 @@ use quickwit_common::temp_dir::TempDirectory; use quickwit_metastore::checkpoint::IndexCheckpointDelta; use quickwit_metrics::GaugeGuard; use quickwit_proto::indexing::IndexingPipelineId; -use quickwit_proto::types::{DocMappingUid, IndexUid, PublishToken}; +use quickwit_proto::types::{DocMappingUid, IndexUid}; use tantivy::IndexBuilder; use tantivy::directory::MmapDirectory; use tracing::{Span, instrument}; @@ -155,7 +155,6 @@ pub struct IndexedSplitBatch { pub splits: Vec, pub checkpoint_delta_opt: Option, pub publish_lock: PublishLock, - pub publish_token_opt: Option, /// A [`MergeTask`] tracked by either the `MergePlanner` or the `DeleteTaskPlanner` /// in the `MergePipeline` or `DeleteTaskPipeline`. /// See planners docs to understand the usage. @@ -179,7 +178,6 @@ pub struct IndexedSplitBatchBuilder { pub splits: Vec, pub checkpoint_delta_opt: Option, pub publish_lock: PublishLock, - pub publish_token_opt: Option, pub commit_trigger: CommitTrigger, pub batch_parent_span: Span, pub memory_usage: GaugeGuard, @@ -192,6 +190,5 @@ pub struct EmptySplit { pub index_uid: IndexUid, pub checkpoint_delta: IndexCheckpointDelta, pub publish_lock: PublishLock, - pub publish_token_opt: Option, pub batch_parent_span: Span, } diff --git a/quickwit/quickwit-indexing/src/models/mod.rs b/quickwit/quickwit-indexing/src/models/mod.rs index 9dfdfde1594..8d9ccf72596 100644 --- a/quickwit/quickwit-indexing/src/models/mod.rs +++ b/quickwit/quickwit-indexing/src/models/mod.rs @@ -28,6 +28,9 @@ mod raw_doc_batch; mod shard_positions; mod split_attrs; +use std::sync::Arc; + +use arc_swap::ArcSwapOption; pub use indexed_split::{ CommitTrigger, EmptySplit, IndexedSplit, IndexedSplitBatch, IndexedSplitBatchBuilder, IndexedSplitBuilder, @@ -49,5 +52,7 @@ pub(crate) use shard_positions::LocalShardPositionsUpdate; pub use shard_positions::ShardPositionsService; pub use split_attrs::{SplitAttrs, create_split_metadata}; -#[derive(Debug)] -pub struct NewPublishToken(pub PublishToken); +/// Shared, live publish token owned by an indexing pipeline. The source writes it on reset and +/// shard re-acquisition; the publisher reads it at publish time. `None` means no token (merge +/// pipelines, or sources without a publish token such as file/kafka). +pub type SharedPublishToken = Arc>; diff --git a/quickwit/quickwit-indexing/src/models/packaged_split.rs b/quickwit/quickwit-indexing/src/models/packaged_split.rs index db9b4abae39..91c5fa7a53a 100644 --- a/quickwit/quickwit-indexing/src/models/packaged_split.rs +++ b/quickwit/quickwit-indexing/src/models/packaged_split.rs @@ -19,7 +19,7 @@ use std::path::PathBuf; use itertools::Itertools; use quickwit_common::temp_dir::TempDirectory; use quickwit_metastore::checkpoint::IndexCheckpointDelta; -use quickwit_proto::types::{IndexUid, PublishToken, SplitId}; +use quickwit_proto::types::{IndexUid, SplitId}; use tracing::Span; use crate::merge_policy::MergeTask; @@ -60,7 +60,6 @@ pub struct PackagedSplitBatch { pub splits: Vec, pub checkpoint_delta_opt: Option, pub publish_lock: PublishLock, - pub publish_token_opt: Option, /// A [`MergeTask`] tracked by either the `MergePlanner` or the `DeleteTaskPlanner` /// in the `MergePipeline` or `DeleteTaskPipeline`. /// See planners docs to understand the usage. @@ -78,7 +77,6 @@ impl PackagedSplitBatch { splits: Vec, checkpoint_delta_opt: Option, publish_lock: PublishLock, - publish_token_opt: Option, merge_task_opt: Option, batch_parent_span: Span, ) -> Self { @@ -94,7 +92,6 @@ impl PackagedSplitBatch { splits, checkpoint_delta_opt, publish_lock, - publish_token_opt, merge_task_opt, batch_parent_span, } diff --git a/quickwit/quickwit-indexing/src/models/publisher_message.rs b/quickwit/quickwit-indexing/src/models/publisher_message.rs index 13182a8f76a..5700a48bc04 100644 --- a/quickwit/quickwit-indexing/src/models/publisher_message.rs +++ b/quickwit/quickwit-indexing/src/models/publisher_message.rs @@ -17,7 +17,7 @@ use std::fmt; use itertools::Itertools; use quickwit_metastore::SplitMetadata; use quickwit_metastore::checkpoint::IndexCheckpointDelta; -use quickwit_proto::types::{IndexUid, PublishToken}; +use quickwit_proto::types::IndexUid; use tracing::Span; use crate::merge_policy::MergeTask; @@ -29,7 +29,6 @@ pub struct SplitsUpdate { pub replaced_split_ids: Vec, pub checkpoint_delta_opt: Option, pub publish_lock: PublishLock, - pub publish_token_opt: Option, /// A [`MergeTask`] tracked by either the `MergePlanner` or the `DeleteTaskPlanner` /// in the `MergePipeline` or `DeleteTaskPipeline`. /// See planners docs to understand the usage. diff --git a/quickwit/quickwit-indexing/src/source/ingest/mod.rs b/quickwit/quickwit-indexing/src/source/ingest/mod.rs index cbf3d70d976..d1f52432ccb 100644 --- a/quickwit/quickwit-indexing/src/source/ingest/mod.rs +++ b/quickwit/quickwit-indexing/src/source/ingest/mod.rs @@ -14,6 +14,7 @@ use std::collections::BTreeSet; use std::fmt; +use std::sync::Arc; use std::time::Duration; use anyhow::Context; @@ -46,10 +47,10 @@ use tracing::{debug, error, info, warn}; use ulid::Ulid; use super::{ - BATCH_NUM_BYTES_LIMIT, BatchBuilder, EMIT_BATCHES_TIMEOUT, Source, SourceContext, + Assignment, BATCH_NUM_BYTES_LIMIT, BatchBuilder, EMIT_BATCHES_TIMEOUT, Source, SourceContext, SourceRuntime, SourceSink, TypedSourceFactory, }; -use crate::models::{LocalShardPositionsUpdate, NewPublishLock, NewPublishToken, PublishLock}; +use crate::models::{LocalShardPositionsUpdate, NewPublishLock, PublishLock, SharedPublishToken}; pub struct IngestSourceFactory; @@ -101,10 +102,20 @@ impl ClientId { pipeline_uid, } } +} + +trait PublishTokenExt { + fn resolve(node_id: &str, indexing_plan_id: &str) -> Self; +} - fn new_publish_token(&self) -> String { - let ulid = if cfg!(test) { Ulid::nil() } else { Ulid::new() }; - format!("{self}/{ulid}") +impl PublishTokenExt for PublishToken { + fn resolve(node_id: &str, indexing_plan_id: &str) -> PublishToken { + if indexing_plan_id.is_empty() { + let ulid = if cfg!(test) { Ulid::nil() } else { Ulid::new() }; + format!("{node_id}/{ulid}") + } else { + format!("{indexing_plan_id}-{node_id}") + } } } @@ -142,7 +153,7 @@ pub struct IngestSource { assigned_shards: FnvHashMap, fetch_stream: MultiFetchStream, publish_lock: PublishLock, - publish_token: PublishToken, + publish_token: SharedPublishToken, event_broker: EventBroker, } @@ -176,9 +187,10 @@ impl IngestSource { retry_params, ); // We start as dead. The first reset with a non-empty list of shards will create an alive - // publish lock. + // publish lock. The publish token is left empty until then: the first reset adopts the + // indexing plan id carried by the assignment. let publish_lock = PublishLock::dead(); - let publish_token = client_id.new_publish_token(); + let publish_token = source_runtime.publish_token.clone(); Ok(IngestSource { client_id, @@ -388,7 +400,7 @@ impl IngestSource { /// Ongoing work and splits traveling through the pipeline will be dropped. /// /// After this method has returned we are guaranteed to have the following post condition: - /// - a alive publish lock / non-empty publish token + /// - an alive publish lock /// - all currently assigned shards included in the `new_assigned_shard_ids` set. async fn reset_if_needed( &mut self, @@ -439,13 +451,9 @@ impl IngestSource { self.fetch_stream.reset(); self.publish_lock.kill().await; self.publish_lock = PublishLock::default(); - self.publish_token = self.client_id.new_publish_token(); source_sink .send_publish_lock(NewPublishLock(self.publish_lock.clone()), ctx) .await?; - source_sink - .send_publish_token(NewPublishToken(self.publish_token.clone()), ctx) - .await?; Ok(()) } } @@ -504,10 +512,14 @@ impl Source for IngestSource { async fn assign_shards( &mut self, - new_assigned_shard_ids: BTreeSet, + assignment: Assignment, source_sink: &SourceSink, ctx: &SourceContext, ) -> anyhow::Result<()> { + let Assignment { + shard_ids: new_assigned_shard_ids, + indexing_plan_id, + } = assignment; self.reset_if_needed(&new_assigned_shard_ids, source_sink, ctx) .await?; @@ -525,27 +537,34 @@ impl Source for IngestSource { return Ok(()); } - let added_shard_ids: Vec = new_assigned_shard_ids - .into_iter() - .filter(|shard_id| !self.assigned_shards.contains_key(shard_id)) - .collect(); - + // Publish tokens are stored per shard in the metastore, but are managed per indexing + // pipeline. Whenever a new shard assignment arrives, all the assigned shards need to + // be re-acquired so that they all have the same publish token. + let (added_shard_ids, renewed_shard_ids): (Vec<&ShardId>, Vec<&ShardId>) = + new_assigned_shard_ids + .iter() + .partition(|shard_id| !self.assigned_shards.contains_key(shard_id)); assert!(!added_shard_ids.is_empty()); info!(added_shards=?added_shard_ids, "adding shards assignment"); + info!(renewed_shards=?renewed_shard_ids, "renewing publish token for shards"); + let publish_token = + PublishToken::resolve(self.client_id.node_id.as_str(), &indexing_plan_id); + let shard_ids_to_acquire: Vec = new_assigned_shard_ids.into_iter().collect(); let acquire_shards_request = AcquireShardsRequest { index_uid: Some(self.client_id.source_uid.index_uid.clone()), source_id: self.client_id.source_uid.source_id.clone(), - shard_ids: added_shard_ids.clone(), - publish_token: self.publish_token.clone(), + shard_ids: shard_ids_to_acquire.clone(), + publish_token: publish_token.clone(), }; let acquire_shards_response: AcquireShardsResponse = ctx .protect_future(self.metastore.acquire_shards(acquire_shards_request)) .await .context("failed to acquire shards")?; + self.publish_token.store(Some(Arc::new(publish_token))); - if acquire_shards_response.acquired_shards.len() != added_shard_ids.len() { - let missing_shards = added_shard_ids + if acquire_shards_response.acquired_shards.len() != shard_ids_to_acquire.len() { + let missing_shards = shard_ids_to_acquire .iter() .filter(|shard_id| { !acquire_shards_response @@ -556,15 +575,20 @@ impl Source for IngestSource { .collect::>(); // This can happen if the shards have been deleted by the control plane, after building // the plan and before the apply terminated. See #4888. - info!(missing_shards=?missing_shards, "failed to acquire all assigned shards"); + warn!(missing_shards=?missing_shards, "failed to acquire all assigned shards"); } let mut truncate_up_to_positions = Vec::with_capacity(acquire_shards_response.acquired_shards.len()); for acquired_shard in acquire_shards_response.acquired_shards { - let index_uid = acquired_shard.index_uid().clone(); let shard_id = acquired_shard.shard_id().clone(); + if self.assigned_shards.contains_key(&shard_id) { + // we re-acquired these shards to update their publish token; we don't want to + // resubscribe here, which would cause an error + continue; + } + let index_uid = acquired_shard.index_uid().clone(); let mut current_position_inclusive = acquired_shard.publish_position_inclusive(); let leader_id: NodeId = NodeId::from_str(&acquired_shard.leader_id); let follower_id_opt: Option = @@ -650,7 +674,7 @@ impl Source for IngestSource { json!({ "client_id": self.client_id.to_string(), "assigned_shards": assigned_shards, - "publish_token": self.publish_token, + "publish_token": self.publish_token.load().as_deref(), }) } } @@ -730,24 +754,38 @@ mod tests { mock_metastore .expect_acquire_shards() .once() - .withf(|request| request.shard_ids == [ShardId::from(1)]) + .withf(|request| request.shard_ids == [ShardId::from(0), ShardId::from(1)]) .returning(|request| { assert_eq!(request.index_uid(), &("test-index", 0)); assert_eq!(request.source_id, "test-source"); let response = AcquireShardsResponse { - acquired_shards: vec![Shard { - leader_id: "test-ingester-0".to_string(), - follower_id: None, - index_uid: Some(IndexUid::for_test("test-index", 0)), - source_id: "test-source".to_string(), - shard_id: Some(ShardId::from(1)), - shard_state: ShardState::Open as i32, - doc_mapping_uid: Some(DocMappingUid::default()), - publish_position_inclusive: Some(Position::offset(11u64)), - publish_token: Some(publish_token.to_string()), - update_timestamp: 1724158996, - }], + acquired_shards: vec![ + Shard { + leader_id: "test-ingester-0".to_string(), + follower_id: None, + index_uid: Some(IndexUid::for_test("test-index", 0)), + source_id: "test-source".to_string(), + shard_id: Some(ShardId::from(0)), + shard_state: ShardState::Open as i32, + doc_mapping_uid: Some(DocMappingUid::default()), + publish_position_inclusive: Some(Position::offset(10u64)), + publish_token: Some(publish_token.to_string()), + update_timestamp: 1724158996, + }, + Shard { + leader_id: "test-ingester-0".to_string(), + follower_id: None, + index_uid: Some(IndexUid::for_test("test-index", 0)), + source_id: "test-source".to_string(), + shard_id: Some(ShardId::from(1)), + shard_state: ShardState::Open as i32, + doc_mapping_uid: Some(DocMappingUid::default()), + publish_position_inclusive: Some(Position::offset(11u64)), + publish_token: Some(publish_token.to_string()), + update_timestamp: 1724158996, + }, + ], }; Ok(response) }); @@ -943,6 +981,7 @@ mod tests { storage_resolver: StorageResolver::for_test(), event_broker, indexing_setting: IndexingSettings::default(), + publish_token: SharedPublishToken::default(), }; let retry_params = RetryParams::no_retries(); let mut source = IngestSource::try_new(source_runtime, retry_params) @@ -963,21 +1002,38 @@ mod tests { let shard_ids: BTreeSet = once(0).map(ShardId::from).collect(); let publish_lock = source.publish_lock.clone(); source - .assign_shards(shard_ids, &source_sink, &ctx) + .assign_shards( + Assignment { + shard_ids, + indexing_plan_id: "01ARZ3NDEKTSV4RRFFQ69G5FAV".to_string(), + }, + &source_sink, + &ctx, + ) .await .unwrap(); assert_eq!(sequence_rx.recv().await.unwrap(), 1); assert!(!publish_lock.is_alive()); assert!(source.publish_lock.is_alive()); - assert!(!source.publish_token.is_empty()); + assert_eq!( + source.publish_token.load_full().unwrap().as_str(), + "01ARZ3NDEKTSV4RRFFQ69G5FAV-test-node" + ); // We assign [0,1] (previously [0]). This should just add the shard 1. // The stream does not need to be reset. let shard_ids: BTreeSet = (0..2).map(ShardId::from).collect(); let publish_lock = source.publish_lock.clone(); source - .assign_shards(shard_ids, &source_sink, &ctx) + .assign_shards( + Assignment { + shard_ids, + indexing_plan_id: "01ARZ3NDEKTSV4RRFFQ69G5FAV".to_string(), + }, + &source_sink, + &ctx, + ) .await .unwrap(); assert_eq!(sequence_rx.recv().await.unwrap(), 2); @@ -990,7 +1046,14 @@ mod tests { let shard_ids: BTreeSet = (1..3).map(ShardId::from).collect(); let publish_lock = source.publish_lock.clone(); source - .assign_shards(shard_ids, &source_sink, &ctx) + .assign_shards( + Assignment { + shard_ids, + indexing_plan_id: "01ARZ3NDEKTSV4RRFFQ69G5FAV".to_string(), + }, + &source_sink, + &ctx, + ) .await .unwrap(); @@ -1006,13 +1069,10 @@ mod tests { .unwrap(); assert_ne!(&source.publish_lock, &publish_lock); - // assert!(publish_token != source.publish_token); - - let NewPublishToken(publish_token) = doc_processor_inbox - .recv_typed_message::() - .await - .unwrap(); - assert_eq!(source.publish_token, publish_token); + assert_eq!( + source.publish_token.load_full().unwrap().as_str(), + "01ARZ3NDEKTSV4RRFFQ69G5FAV-test-node" + ); assert_eq!(source.assigned_shards.len(), 2); @@ -1040,6 +1100,17 @@ mod tests { time::sleep(Duration::from_millis(1)).await; } + #[test] + fn test_publish_token_resolve() { + let with_plan = PublishToken::resolve("test-node", "01ARZ3NDEKTSV4RRFFQ69G5FAV"); + assert_eq!(with_plan, "01ARZ3NDEKTSV4RRFFQ69G5FAV-test-node"); + + let fallback = PublishToken::resolve("test-node", ""); + assert!(!fallback.is_empty()); + assert!(fallback.contains('/')); + assert!(fallback.starts_with("test-node/")); + } + #[tokio::test] async fn test_ingest_source_assign_shards_all_eof() { // In this test, we check that if all assigned shards are originally marked as EOF in the @@ -1149,6 +1220,7 @@ mod tests { storage_resolver: StorageResolver::for_test(), event_broker, indexing_setting: IndexingSettings::default(), + publish_token: SharedPublishToken::default(), }; let retry_params = RetryParams::for_test(); let mut source = IngestSource::try_new(source_runtime, retry_params) @@ -1169,7 +1241,14 @@ mod tests { BTreeSet::from_iter([ShardId::from(1), ShardId::from(2)]); source - .assign_shards(shard_ids, &source_sink, &ctx) + .assign_shards( + Assignment { + shard_ids, + indexing_plan_id: "01ARZ3NDEKTSV4RRFFQ69G5FAV".to_string(), + }, + &source_sink, + &ctx, + ) .await .unwrap(); @@ -1316,6 +1395,7 @@ mod tests { storage_resolver: StorageResolver::for_test(), event_broker, indexing_setting: IndexingSettings::default(), + publish_token: SharedPublishToken::default(), }; let retry_params = RetryParams::for_test(); let mut source = IngestSource::try_new(source_runtime, retry_params) @@ -1340,7 +1420,14 @@ mod tests { // In this scenario, the indexer will only be able to acquire shard 1. source - .assign_shards(shard_ids, &source_sink, &ctx) + .assign_shards( + Assignment { + shard_ids, + indexing_plan_id: "01ARZ3NDEKTSV4RRFFQ69G5FAV".to_string(), + }, + &source_sink, + &ctx, + ) .await .unwrap(); @@ -1383,6 +1470,7 @@ mod tests { storage_resolver: StorageResolver::for_test(), event_broker, indexing_setting: IndexingSettings::default(), + publish_token: SharedPublishToken::default(), }; let retry_params = RetryParams::for_test(); let mut source = IngestSource::try_new(source_runtime, retry_params) @@ -1546,6 +1634,12 @@ mod tests { let ingester_pool = IngesterPool::default(); let event_broker = EventBroker::default(); + // A representative non-empty publish token (the source normally holds one); `emit_batches` + // never reads it, so it stays out of the way of what this test exercises. + let publish_token = SharedPublishToken::default(); + publish_token.store(Some(Arc::new( + "01ARZ3NDEKTSV4RRFFQ69G5FAV-test-node".to_string(), + ))); let source_runtime = SourceRuntime { pipeline_id, source_config, @@ -1555,6 +1649,7 @@ mod tests { storage_resolver: StorageResolver::for_test(), event_broker, indexing_setting: IndexingSettings::default(), + publish_token, }; let retry_params = RetryParams::for_test(); let mut source = IngestSource::try_new(source_runtime, retry_params) @@ -1698,6 +1793,7 @@ mod tests { storage_resolver: StorageResolver::for_test(), event_broker, indexing_setting: IndexingSettings::default(), + publish_token: SharedPublishToken::default(), }; let retry_params = RetryParams::for_test(); let mut source = IngestSource::try_new(source_runtime, retry_params) @@ -1716,7 +1812,14 @@ mod tests { let shard_ids: BTreeSet = BTreeSet::from_iter([ShardId::from(1)]); source - .assign_shards(shard_ids, &source_sink, &ctx) + .assign_shards( + Assignment { + shard_ids, + indexing_plan_id: "01ARZ3NDEKTSV4RRFFQ69G5FAV".to_string(), + }, + &source_sink, + &ctx, + ) .await .unwrap(); @@ -1854,6 +1957,7 @@ mod tests { storage_resolver: StorageResolver::for_test(), event_broker, indexing_setting: IndexingSettings::default(), + publish_token: SharedPublishToken::default(), }; let retry_params = RetryParams::for_test(); let mut source = IngestSource::try_new(source_runtime, retry_params) @@ -1986,6 +2090,7 @@ mod tests { storage_resolver: StorageResolver::for_test(), event_broker: event_broker.clone(), indexing_setting: IndexingSettings::default(), + publish_token: SharedPublishToken::default(), }; let retry_params = RetryParams::for_test(); let mut source = IngestSource::try_new(source_runtime, retry_params) @@ -2011,7 +2116,14 @@ mod tests { }); source - .assign_shards(shard_ids, &source_sink, &ctx) + .assign_shards( + Assignment { + shard_ids, + indexing_plan_id: "01ARZ3NDEKTSV4RRFFQ69G5FAV".to_string(), + }, + &source_sink, + &ctx, + ) .await .unwrap(); diff --git a/quickwit/quickwit-indexing/src/source/mod.rs b/quickwit/quickwit-indexing/src/source/mod.rs index 8b4db0844ad..c33f46efece 100644 --- a/quickwit/quickwit-indexing/src/source/mod.rs +++ b/quickwit/quickwit-indexing/src/source/mod.rs @@ -123,7 +123,7 @@ pub use void_source::{VoidSource, VoidSourceFactory}; use self::doc_file_reader::dir_and_filename; use self::stdin_source::StdinSourceFactory; -use crate::models::RawDocBatch; +use crate::models::{RawDocBatch, SharedPublishToken}; use crate::source::ingest::IngestSourceFactory; use crate::source::ingest_api_source::IngestApiSourceFactory; @@ -166,6 +166,7 @@ pub struct SourceRuntime { pub storage_resolver: StorageResolver, pub event_broker: EventBroker, pub indexing_setting: IndexingSettings, + pub publish_token: SharedPublishToken, } impl SourceRuntime { @@ -266,7 +267,7 @@ pub trait Source: Send + 'static { /// plane. async fn assign_shards( &mut self, - _shard_ids: BTreeSet, + _assignment: Assignment, _source_sink: &SourceSink, _ctx: &SourceContext, ) -> anyhow::Result<()> { @@ -337,6 +338,8 @@ struct Loop; #[derive(Debug)] pub struct Assignment { pub shard_ids: BTreeSet, + /// ULID of the originating indexing plan, used as the publish token when (re)acquiring shards. + pub indexing_plan_id: String, } #[derive(Debug)] @@ -402,9 +405,9 @@ impl Handler for SourceActor { assign_shards_message: AssignShards, ctx: &SourceContext, ) -> Result<(), ActorExitStatus> { - let AssignShards(Assignment { shard_ids }) = assign_shards_message; + let AssignShards(assignment) = assign_shards_message; self.source - .assign_shards(shard_ids, &self.source_sink, ctx) + .assign_shards(assignment, &self.source_sink, ctx) .await?; Ok(()) } @@ -631,6 +634,7 @@ mod tests { storage_resolver: StorageResolver::for_test(), event_broker: EventBroker::default(), indexing_setting: IndexingSettings::default(), + publish_token: SharedPublishToken::default(), } } diff --git a/quickwit/quickwit-indexing/src/source/queue_sources/coordinator.rs b/quickwit/quickwit-indexing/src/source/queue_sources/coordinator.rs index f241d04bb57..a8a10d7e3fb 100644 --- a/quickwit/quickwit-indexing/src/source/queue_sources/coordinator.rs +++ b/quickwit/quickwit-indexing/src/source/queue_sources/coordinator.rs @@ -34,7 +34,7 @@ use super::local_state::QueueLocalState; use super::message::{MessageType, PreProcessingError, ReadyMessage}; use super::shared_state::{QueueSharedState, checkpoint_messages}; use super::visibility::{VisibilitySettings, spawn_visibility_task}; -use crate::models::{NewPublishLock, NewPublishToken, PublishLock}; +use crate::models::{NewPublishLock, PublishLock}; use crate::source::{SourceContext, SourceRuntime, SourceSink}; /// Maximum duration that the `emit_batches()` callback can wait for @@ -95,6 +95,10 @@ impl QueueCoordinator { shard_max_count: Option, shard_pruning_interval: Duration, ) -> Self { + let publish_token = Ulid::new().to_string(); + source_runtime + .publish_token + .store(Some(Arc::new(publish_token.clone()))); Self { shared_state: QueueSharedState::new( source_runtime.metastore, @@ -116,7 +120,7 @@ impl QueueCoordinator { observable_state: QueueCoordinatorObservableState::default(), message_type, publish_lock: PublishLock::default(), - publish_token: Ulid::new().to_string(), + publish_token, visibility_settings: VisibilitySettings::from_commit_timeout( source_runtime.indexing_setting.commit_timeout_secs, ), @@ -154,9 +158,6 @@ impl QueueCoordinator { source_sink .send_publish_lock(NewPublishLock(publish_lock), ctx) .await?; - source_sink - .send_publish_token(NewPublishToken(self.publish_token.clone()), ctx) - .await?; Ok(()) } diff --git a/quickwit/quickwit-indexing/src/source/source_sink.rs b/quickwit/quickwit-indexing/src/source/source_sink.rs index 40dba6f6e82..760cb6d0bef 100644 --- a/quickwit/quickwit-indexing/src/source/source_sink.rs +++ b/quickwit/quickwit-indexing/src/source/source_sink.rs @@ -23,23 +23,19 @@ use async_trait::async_trait; use quickwit_actors::{Actor, Command, DeferableReplyHandler, Mailbox, SendError}; use super::SourceContext; -use crate::models::{NewPublishLock, NewPublishToken, RawDocBatch}; +use crate::models::{NewPublishLock, RawDocBatch}; /// Internal trait used to type-erase the concrete `Mailbox`. #[async_trait] trait SourceSinkTrait: Send + Sync + 'static { async fn send_raw_doc_batch(&self, batch: RawDocBatch) -> Result<(), SendError>; async fn send_publish_lock(&self, lock: NewPublishLock) -> Result<(), SendError>; - async fn send_publish_token(&self, token: NewPublishToken) -> Result<(), SendError>; async fn send_exit_with_success(&self) -> Result<(), SendError>; } #[async_trait] impl SourceSinkTrait for Mailbox -where A: Actor - + DeferableReplyHandler - + DeferableReplyHandler - + DeferableReplyHandler +where A: Actor + DeferableReplyHandler + DeferableReplyHandler { async fn send_raw_doc_batch(&self, batch: RawDocBatch) -> Result<(), SendError> { self.send_message(batch).await?; @@ -51,11 +47,6 @@ where A: Actor Ok(()) } - async fn send_publish_token(&self, token: NewPublishToken) -> Result<(), SendError> { - self.send_message(token).await?; - Ok(()) - } - async fn send_exit_with_success(&self) -> Result<(), SendError> { self.send_message(Command::ExitWithSuccess).await?; Ok(()) @@ -103,15 +94,6 @@ impl SourceSink { self.inner.send_publish_lock(lock).await } - pub async fn send_publish_token( - &self, - token: NewPublishToken, - ctx: &SourceContext, - ) -> Result<(), SendError> { - let _guard = ctx.protect_zone(); - self.inner.send_publish_token(token).await - } - pub async fn send_exit_with_success(&self, ctx: &SourceContext) -> Result<(), SendError> { let _guard = ctx.protect_zone(); self.inner.send_exit_with_success().await diff --git a/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs b/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs index 861c0bde62f..d8f9f84ca5d 100644 --- a/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs +++ b/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs @@ -31,6 +31,7 @@ use quickwit_indexing::actors::{ PublisherCounters, Uploader, UploaderCounters, UploaderType, }; use quickwit_indexing::merge_policy::merge_policy_from_settings; +use quickwit_indexing::models::SharedPublishToken; use quickwit_indexing::{IndexingSplitStore, SplitsUpdateMailbox}; use quickwit_metastore::IndexMetadataResponseExt; use quickwit_proto::indexing::MergePipelineId; @@ -167,6 +168,7 @@ impl DeleteTaskPipeline { self.metastore.clone(), None, None, + SharedPublishToken::default(), ); let (publisher_mailbox, publisher_supervisor_handler) = ctx.spawn_actor().supervise(publisher); diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/shards.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/shards.rs index c30a27ea101..51c405ab368 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/shards.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/shards.rs @@ -25,7 +25,7 @@ use quickwit_proto::metastore::{ }; use quickwit_proto::types::{IndexUid, Position, PublishToken, ShardId, SourceId, queue_id}; use time::OffsetDateTime; -use tracing::{info, warn}; +use tracing::{error, info, warn}; use crate::checkpoint::{PartitionId, SourceCheckpoint, SourceCheckpointDelta}; use crate::file_backed::MutationOccurred; @@ -51,6 +51,23 @@ impl fmt::Debug for Shards { } } +/// Whether a shard recording `existing_token` can be acquired by a pipeline presenting +/// `presented_token`. Acquisition between ULIDs is monotonic (newer-or-equal wins). A legacy +/// (`'/'`-containing, pre-ULID) presented token always wins, so a rolling upgrade can still hand a +/// shard to an old indexer; a missing or legacy recorded token loses to any ULID. +fn can_acquire_shard(existing_token: &str, presented_token: &str) -> bool { + // An old indexer presenting a legacy token keeps the pre-upgrade overwrite behavior. + if presented_token.contains('/') { + return true; + } + // A missing or legacy recorded token loses to any ULID. + if existing_token.is_empty() || existing_token.contains('/') { + return true; + } + // Both are ULIDs: acquire only if ours is newer-or-equal. + presented_token >= existing_token +} + impl Shards { pub(super) fn empty(index_uid: IndexUid, source_id: SourceId) -> Self { Self { @@ -164,6 +181,17 @@ impl Shards { for shard_id in &request.shard_ids { if let Some(shard) = self.shards.get_mut(shard_id) { + if !can_acquire_shard(shard.publish_token(), &request.publish_token) { + error!( + index_uid=%self.index_uid, + source_id=%self.source_id, + %shard_id, + existing_publish_token=%shard.publish_token(), + publish_token=%request.publish_token, + "cannot acquire shard held by a more recent publish token" + ); + continue; + } if shard.publish_token() != request.publish_token { shard.publish_token = Some(request.publish_token.clone()); mutation_occurred = true; @@ -535,6 +563,27 @@ mod tests { ); } + #[test] + fn test_can_acquire_shard() { + const OLDER: &str = "01000000000000000000000000"; + const NEWER: &str = "02000000000000000000000000"; + const LEGACY: &str = "indexer/node/index:0/source/01000000000000000000000000"; + + // No token recorded yet: free to acquire. + assert!(can_acquire_shard("", NEWER)); + // A legacy (pre-ULID) recorded token is always superseded by a ULID. + assert!(can_acquire_shard(LEGACY, NEWER)); + // A newer ULID supersedes an older one. + assert!(can_acquire_shard(OLDER, NEWER)); + // The same ULID re-acquires (e.g. after a local respawn). + assert!(can_acquire_shard(NEWER, NEWER)); + // An older ULID cannot steal a shard owned by a newer one. + assert!(!can_acquire_shard(NEWER, OLDER)); + // A legacy presented token always wins, so a rolling upgrade can move a shard from a new + // indexer back to an old one. + assert!(can_acquire_shard(NEWER, LEGACY)); + } + #[test] fn test_delete_shards() { let index_uid = IndexUid::for_test("test-index", 0); diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs index 43bd995e1ba..2c64b4794b8 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs @@ -1436,6 +1436,11 @@ impl MetastoreService for PostgresqlMetastore { .bind(&request.publish_token) .fetch_all(&self.connection_pool) .await?; + + if pg_shards.len() != request.shard_ids.len() { + warn_on_unacquired_shards(&request, &pg_shards); + } + let acquired_shards = pg_shards .into_iter() .map(|pg_shard| pg_shard.into()) @@ -2953,6 +2958,28 @@ impl PostgresqlMetastore { } } +/// Best-effort diagnostics for the acquire error path: logs the shards from `request` that were not +/// acquired — those absent from `acquired_pg_shards` because a more recent publish token owns them, +/// or because they no longer exist. Does not touch the database. +fn warn_on_unacquired_shards(request: &AcquireShardsRequest, acquired_pg_shards: &[PgShard]) { + let not_acquired_shard_ids: Vec<&ShardId> = request + .shard_ids + .iter() + .filter(|shard_id| { + !acquired_pg_shards + .iter() + .any(|pg_shard| &pg_shard.shard_id == *shard_id) + }) + .collect(); + warn!( + index_uid=%request.index_uid(), + source_id=%request.source_id, + shard_ids=?not_acquired_shard_ids, + publish_token=%request.publish_token, + "could not acquire shards: held by a more recent publish token, or no longer present" + ); +} + async fn open_or_fetch_shard<'e>( executor: impl Executor<'e, Database = Postgres> + Clone, subrequest: &OpenShardSubrequest, diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/queries/shards/acquire.sql b/quickwit/quickwit-metastore/src/metastore/postgres/queries/shards/acquire.sql index 740235a3851..e23a0b21a0a 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/queries/shards/acquire.sql +++ b/quickwit/quickwit-metastore/src/metastore/postgres/queries/shards/acquire.sql @@ -6,5 +6,13 @@ WHERE index_uid = $1 AND source_id = $2 AND shard_id = ANY ($3) + -- Acquisition is monotonic between ULIDs; a legacy presented token keeps pre-upgrade behavior. + AND ( + $4 LIKE '%/%' -- presented token is legacy (pre-ULID): always takes, for rolling upgrades + OR publish_token IS NULL -- never acquired: free to take + OR publish_token = '' -- empty placeholder: free to take + OR publish_token LIKE '%/%' -- recorded token is legacy: superseded by any ULID + OR $4 >= publish_token -- both are ULIDs: take only if ours is newer-or-equal + ) RETURNING * diff --git a/quickwit/quickwit-metastore/src/tests/shard.rs b/quickwit/quickwit-metastore/src/tests/shard.rs index f781a2e24ab..bf32042d6ea 100644 --- a/quickwit/quickwit-metastore/src/tests/shard.rs +++ b/quickwit/quickwit-metastore/src/tests/shard.rs @@ -229,6 +229,18 @@ pub async fn test_metastore_acquire_shards< ) .await; + // Publish tokens are the ULID of the indexing plan that minted them; ULIDs are time-ordered, so + // lexicographic order is chronological order. A token containing '/' is the legacy pre-ULID + // format: it loses to a ULID when recorded, but always wins when presented, so a rolling + // upgrade can still hand a shard back to an old indexer. + const OLDER_TOKEN: &str = "01000000000000000000000000"; + const TOKEN: &str = "02000000000000000000000000"; + const NEWER_TOKEN: &str = "03000000000000000000000000"; + const LEGACY_TOKEN: &str = + "indexer/test-node/test-index:0/test-source/01000000000000000000000000"; + + // Shard 1 owned by `TOKEN`, shard 2 unowned, shard 3 owned by a legacy token, shard 4 owned by + // `TOKEN`. let shards = vec![ Shard { index_uid: Some(test_index.index_uid.clone()), @@ -239,7 +251,7 @@ pub async fn test_metastore_acquire_shards< follower_id: Some("test-ingester-bar".to_string()), doc_mapping_uid: Some(DocMappingUid::default()), publish_position_inclusive: Some(Position::Beginning), - publish_token: Some("test-publish-token-foo".to_string()), + publish_token: Some(TOKEN.to_string()), update_timestamp: 1724158996, }, Shard { @@ -251,7 +263,7 @@ pub async fn test_metastore_acquire_shards< follower_id: Some("test-ingester-qux".to_string()), doc_mapping_uid: Some(DocMappingUid::default()), publish_position_inclusive: Some(Position::Beginning), - publish_token: Some("test-publish-token-bar".to_string()), + publish_token: None, update_timestamp: 1724158996, }, Shard { @@ -263,7 +275,7 @@ pub async fn test_metastore_acquire_shards< follower_id: Some("test-ingester-baz".to_string()), doc_mapping_uid: Some(DocMappingUid::default()), publish_position_inclusive: Some(Position::Beginning), - publish_token: None, + publish_token: Some(LEGACY_TOKEN.to_string()), update_timestamp: 1724158996, }, Shard { @@ -275,7 +287,7 @@ pub async fn test_metastore_acquire_shards< follower_id: Some("test-ingester-tux".to_string()), doc_mapping_uid: Some(DocMappingUid::default()), publish_position_inclusive: Some(Position::Beginning), - publish_token: None, + publish_token: Some(TOKEN.to_string()), update_timestamp: 1724158996, }, ]; @@ -283,27 +295,31 @@ pub async fn test_metastore_acquire_shards< .insert_shards(&test_index.index_uid, &test_index.source_id, shards) .await; - // Test acquire shards. - let acquire_shards_request = AcquireShardsRequest { - index_uid: Some(test_index.index_uid.clone()), - source_id: test_index.source_id.clone(), - shard_ids: vec![ - ShardId::from(1), - ShardId::from(2), - ShardId::from(3), - ShardId::from(666), - ], // shard 666 does not exist - publish_token: "test-publish-token-foo".to_string(), - }; - let mut acquire_shards_response = metastore - .acquire_shards(acquire_shards_request) + // A token ranking below the recorded one — and a non-existent shard — are refused: both are + // omitted from the response and the recorded token is left untouched. + let acquire_shards_response = metastore + .acquire_shards(AcquireShardsRequest { + index_uid: Some(test_index.index_uid.clone()), + source_id: test_index.source_id.clone(), + shard_ids: vec![ShardId::from(1), ShardId::from(666)], + publish_token: OLDER_TOKEN.to_string(), + }) .await .unwrap(); + assert!(acquire_shards_response.acquired_shards.is_empty()); - acquire_shards_response - .acquired_shards - .sort_unstable_by(|left, right| left.shard_id().cmp(right.shard_id())); - + // The same token re-acquires successfully (idempotent, e.g. after a local respawn); the full + // shard is returned. + let acquire_shards_response = metastore + .acquire_shards(AcquireShardsRequest { + index_uid: Some(test_index.index_uid.clone()), + source_id: test_index.source_id.clone(), + shard_ids: vec![ShardId::from(1)], + publish_token: TOKEN.to_string(), + }) + .await + .unwrap(); + assert_eq!(acquire_shards_response.acquired_shards.len(), 1); let shard = &acquire_shards_response.acquired_shards[0]; assert_eq!(shard.index_uid(), &test_index.index_uid); assert_eq!(shard.source_id, test_index.source_id); @@ -312,27 +328,72 @@ pub async fn test_metastore_acquire_shards< assert_eq!(shard.leader_id, "test-ingester-foo"); assert_eq!(shard.follower_id(), "test-ingester-bar"); assert_eq!(shard.publish_position_inclusive(), Position::Beginning); - assert_eq!(shard.publish_token(), "test-publish-token-foo"); + assert_eq!(shard.publish_token(), TOKEN); - let shard = &acquire_shards_response.acquired_shards[1]; - assert_eq!(shard.index_uid(), &test_index.index_uid); - assert_eq!(shard.source_id, test_index.source_id); - assert_eq!(shard.shard_id(), ShardId::from(2)); - assert_eq!(shard.shard_state(), ShardState::Open); - assert_eq!(shard.leader_id, "test-ingester-bar"); - assert_eq!(shard.follower_id(), "test-ingester-qux"); - assert_eq!(shard.publish_position_inclusive(), Position::Beginning); - assert_eq!(shard.publish_token(), "test-publish-token-foo"); + // A strictly newer ULID takes the shard over. + let acquire_shards_response = metastore + .acquire_shards(AcquireShardsRequest { + index_uid: Some(test_index.index_uid.clone()), + source_id: test_index.source_id.clone(), + shard_ids: vec![ShardId::from(1)], + publish_token: NEWER_TOKEN.to_string(), + }) + .await + .unwrap(); + assert_eq!(acquire_shards_response.acquired_shards.len(), 1); + assert_eq!( + acquire_shards_response.acquired_shards[0].publish_token(), + NEWER_TOKEN + ); - let shard = &acquire_shards_response.acquired_shards[2]; - assert_eq!(shard.index_uid(), &test_index.index_uid); - assert_eq!(shard.source_id, test_index.source_id); - assert_eq!(shard.shard_id(), ShardId::from(3)); - assert_eq!(shard.shard_state(), ShardState::Open); - assert_eq!(shard.leader_id, "test-ingester-qux"); - assert_eq!(shard.follower_id(), "test-ingester-baz"); - assert_eq!(shard.publish_position_inclusive(), Position::Beginning); - assert_eq!(shard.publish_token(), "test-publish-token-foo"); + // An unowned shard can be acquired by any ULID. + let acquire_shards_response = metastore + .acquire_shards(AcquireShardsRequest { + index_uid: Some(test_index.index_uid.clone()), + source_id: test_index.source_id.clone(), + shard_ids: vec![ShardId::from(2)], + publish_token: TOKEN.to_string(), + }) + .await + .unwrap(); + assert_eq!(acquire_shards_response.acquired_shards.len(), 1); + assert_eq!( + acquire_shards_response.acquired_shards[0].publish_token(), + TOKEN + ); + + // A ULID supersedes a recorded legacy token. + let acquire_shards_response = metastore + .acquire_shards(AcquireShardsRequest { + index_uid: Some(test_index.index_uid.clone()), + source_id: test_index.source_id.clone(), + shard_ids: vec![ShardId::from(3)], + publish_token: TOKEN.to_string(), + }) + .await + .unwrap(); + assert_eq!(acquire_shards_response.acquired_shards.len(), 1); + assert_eq!( + acquire_shards_response.acquired_shards[0].publish_token(), + TOKEN + ); + + // Shard 4 is owned by a ULID, yet a legacy token reclaims it: the rolling-upgrade hand-back, + // where the control plane moves a shard from a new indexer back to an old one. + let acquire_shards_response = metastore + .acquire_shards(AcquireShardsRequest { + index_uid: Some(test_index.index_uid.clone()), + source_id: test_index.source_id.clone(), + shard_ids: vec![ShardId::from(4)], + publish_token: LEGACY_TOKEN.to_string(), + }) + .await + .unwrap(); + assert_eq!(acquire_shards_response.acquired_shards.len(), 1); + assert_eq!( + acquire_shards_response.acquired_shards[0].publish_token(), + LEGACY_TOKEN + ); cleanup_index(&mut metastore, test_index.index_uid).await; } diff --git a/quickwit/quickwit-proto/protos/quickwit/indexing.proto b/quickwit/quickwit-proto/protos/quickwit/indexing.proto index a4c28f46829..00a2e299add 100644 --- a/quickwit/quickwit-proto/protos/quickwit/indexing.proto +++ b/quickwit/quickwit-proto/protos/quickwit/indexing.proto @@ -26,6 +26,11 @@ service IndexingService { message ApplyIndexingPlanRequest { repeated IndexingTask indexing_tasks = 1; + // Identifier of the indexing plan, minted by the control plane as a ULID when the plan is + // applied. Indexers use it as the publish token for the shards they acquire: since ULIDs are + // monotonic, `AcquireShards` only succeeds for a token greater than or equal to the one already + // recorded, so a stale plan can never steal a shard from a more recent one. + string indexing_plan_id = 2; } message PipelineUid { diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.indexing.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.indexing.rs index dc89720854a..986d1f4c156 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.indexing.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.indexing.rs @@ -4,6 +4,12 @@ pub struct ApplyIndexingPlanRequest { #[prost(message, repeated, tag = "1")] pub indexing_tasks: ::prost::alloc::vec::Vec, + /// Identifier of the indexing plan, minted by the control plane as a ULID when the plan is + /// applied. Indexers use it as the publish token for the shards they acquire: since ULIDs are + /// monotonic, `AcquireShards` only succeeds for a token greater than or equal to the one already + /// recorded, so a stale plan can never steal a shard from a more recent one. + #[prost(string, tag = "2")] + pub indexing_plan_id: ::prost::alloc::string::String, } #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] #[derive(Clone, PartialEq, ::prost::Message)] diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs index 118cd01edb9..a9109c1ad11 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs @@ -455,7 +455,7 @@ pub struct RootResourceStats { /// the first phase (running aggregation, and identifying the doc address of the top-k hits we should return) /// and the second phase (fetch documents). /// - /// If there are no top-k hits, the second phase . + /// If there are no top-k hits, the second phase is skipped. #[prost(uint64, tag = "8")] pub root_wall_time_microsecs: u64, }