Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 15 additions & 5 deletions quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use quickwit_proto::types::NodeId;
use scheduling::{SourceToSchedule, SourceToScheduleType};
use serde::Serialize;
use tracing::{debug, info, warn};
use ulid::Ulid;

use crate::indexing_plan::PhysicalIndexingPlan;
use crate::indexing_scheduler::change_tracker::{NotifyChangeOnDrop, RebuildNotifier};
Expand Down Expand Up @@ -419,6 +420,11 @@ impl IndexingScheduler {
) {
debug!(new_physical_plan=?new_physical_plan, "apply physical indexing plan");
APPLY_PLAN_TOTAL.inc();
// The indexing plan ID is a monotonically increasing time based ID that's used as the
// publish token for indexers, which ensures indexing plans and shard acquisition are always
// informed by the most recent plan.
let indexing_plan_id = Ulid::new().to_string();
Comment thread
nadav-govari marked this conversation as resolved.

// Retiring and decommissioning indexers still receive the plan so they can gracefully shut
// down dropped pipelines; other states (initializing, decommissioned, failed) are skipped.
for indexer in self.indexer_pool.values().into_iter().filter(|indexer| {
Expand All @@ -431,20 +437,24 @@ impl IndexingScheduler {
.indexer(indexer.node_id.as_str())
.unwrap_or(&[])
.to_vec();

// We don't want to block on a slow indexer so we apply this change asynchronously.
// Bound the apply only for retiring/decommissioning indexers, so a slow or unreachable
// draining node can't hold the change-notification guard; ready indexers get no
// timeout.
// Retiring/decommissioning indexers are time-bound, so a slow or unreachable
// draining node can't hold the notify guard. Ready indexers get no timeout.
let apply_deadline = matches!(
indexer.ingester_status,
IngesterStatus::Retiring | IngesterStatus::Decommissioning
)
.then_some(APPLY_INDEXING_PLAN_TIMEOUT);

let notify_on_drop = notify_on_drop.clone();
let indexing_plan_id = indexing_plan_id.clone();
tokio::spawn(async move {
let client = indexer.client.clone();
let apply_plan_fut =
client.apply_indexing_plan(ApplyIndexingPlanRequest { indexing_tasks });
let apply_plan_fut = client.apply_indexing_plan(ApplyIndexingPlanRequest {
indexing_tasks,
indexing_plan_id,
});
let apply_result = match apply_deadline {
Some(timeout) => tokio::time::timeout(timeout, apply_plan_fut).await,
None => Ok(apply_plan_fut.await),
Expand Down
18 changes: 1 addition & 17 deletions quickwit/quickwit-indexing/src/actors/doc_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,7 @@ use tokio::runtime::Handle;
use super::vrl_processing::*;
use crate::actors::Indexer;
use crate::metrics::{PROCESSED_BYTES, PROCESSED_DOCS_TOTAL};
use crate::models::{
NewPublishLock, NewPublishToken, ProcessedDoc, ProcessedDocBatch, PublishLock, RawDocBatch,
};
use crate::models::{NewPublishLock, ProcessedDoc, ProcessedDocBatch, PublishLock, RawDocBatch};

const PLAIN_TEXT: &str = "plain_text";
pub(super) struct JsonDoc {
Expand Down Expand Up @@ -607,20 +605,6 @@ impl Handler<NewPublishLock> for DocProcessor {
}
}

#[async_trait]
impl Handler<NewPublishToken> for DocProcessor {
type Reply = ();

async fn handle(
&mut self,
message: NewPublishToken,
ctx: &ActorContext<Self>,
) -> Result<(), ActorExitStatus> {
ctx.send_message(&self.indexer_mailbox, message).await?;
Ok(())
}
}

#[cfg(test)]
mod tests {
use std::sync::Arc;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ impl Handler<IndexedSplitBatchBuilder> for IndexSerializer {
splits,
checkpoint_delta_opt: batch_builder.checkpoint_delta_opt,
publish_lock: batch_builder.publish_lock,
publish_token_opt: batch_builder.publish_token_opt,
merge_task_opt: None,
batch_parent_span: batch_builder.batch_parent_span,
};
Expand Down
27 changes: 2 additions & 25 deletions quickwit/quickwit-indexing/src/actors/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use quickwit_proto::indexing::{IndexingPipelineId, PipelineMetrics};
use quickwit_proto::metastore::{
LastDeleteOpstampRequest, MetastoreService, MetastoreServiceClient,
};
use quickwit_proto::types::{DocMappingUid, PublishToken};
use quickwit_proto::types::DocMappingUid;
use quickwit_query::get_quickwit_fastfield_normalizer_manager;
use serde::Serialize;
use tantivy::schema::Schema;
Expand All @@ -55,7 +55,7 @@ use super::cooperative_indexing::{CooperativeIndexingCycle, CooperativeIndexingP
use crate::metrics::SPLIT_BUILDERS;
use crate::models::{
CommitTrigger, EmptySplit, IndexedSplitBatchBuilder, IndexedSplitBuilder, NewPublishLock,
NewPublishToken, ProcessedDoc, ProcessedDocBatch, PublishLock,
ProcessedDoc, ProcessedDocBatch, PublishLock,
};

// Random partition ID used to gather partitions exceeding the maximum number of partitions.
Expand Down Expand Up @@ -93,7 +93,6 @@ struct IndexerState {
indexing_directory: TempDirectory,
indexing_settings: IndexingSettings,
publish_lock: PublishLock,
publish_token_opt: Option<PublishToken>,
schema: Schema,
doc_mapping_uid: DocMappingUid,
tokenizer_manager: TokenizerManager,
Expand Down Expand Up @@ -219,7 +218,6 @@ impl IndexerState {
source_delta: SourceCheckpointDelta::default(),
};
let publish_lock = self.publish_lock.clone();
let publish_token_opt = self.publish_token_opt.clone();

let split_builders_guard = GaugeGuard::new(&SPLIT_BUILDERS, 1.0);

Expand All @@ -231,7 +229,6 @@ impl IndexerState {
other_indexed_split_opt: None,
checkpoint_delta,
publish_lock,
publish_token_opt,
last_delete_opstamp,
memory_usage: GaugeGuard::new(&IN_FLIGHT_INDEX_WRITER, 0.0),
cooperative_indexing_period,
Expand Down Expand Up @@ -349,7 +346,6 @@ struct IndexingWorkbench {

checkpoint_delta: IndexCheckpointDelta,
publish_lock: PublishLock,
publish_token_opt: Option<PublishToken>,
// On workbench creation, we fetch from the metastore the last delete task opstamp.
// We use this value to set the `delete_opstamp` of the workbench splits.
last_delete_opstamp: u64,
Expand Down Expand Up @@ -513,21 +509,6 @@ impl Handler<NewPublishLock> for Indexer {
}
}

#[async_trait]
impl Handler<NewPublishToken> for Indexer {
type Reply = ();

async fn handle(
&mut self,
message: NewPublishToken,
_ctx: &ActorContext<Self>,
) -> Result<(), ActorExitStatus> {
let NewPublishToken(publish_token) = message;
self.indexer_state.publish_token_opt = Some(publish_token);
Ok(())
}
}

impl Indexer {
pub fn new(
pipeline_id: IndexingPipelineId,
Expand Down Expand Up @@ -563,7 +544,6 @@ impl Indexer {
indexing_directory,
indexing_settings,
publish_lock: PublishLock::default(),
publish_token_opt: None,
schema,
doc_mapping_uid: doc_mapper.doc_mapping_uid(),
tokenizer_manager: tokenizer_manager.tantivy_manager().clone(),
Expand Down Expand Up @@ -630,7 +610,6 @@ impl Indexer {
other_indexed_split_opt,
checkpoint_delta,
publish_lock,
publish_token_opt,
batch_parent_span,
memory_usage,
split_builders_guard,
Expand All @@ -656,7 +635,6 @@ impl Indexer {
index_uid: self.indexer_state.pipeline_id.index_uid.clone(),
checkpoint_delta,
publish_lock,
publish_token_opt,
batch_parent_span,
},
)
Expand All @@ -680,7 +658,6 @@ impl Indexer {
splits,
checkpoint_delta_opt: Some(checkpoint_delta),
publish_lock,
publish_token_opt,
commit_trigger,
batch_parent_span,
memory_usage,
Expand Down
13 changes: 12 additions & 1 deletion quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use crate::actors::uploader::UploaderType;
use crate::actors::{Publisher, Uploader};
use crate::merge_policy::MergePolicy;
use crate::metrics::{ACTOR_NAME, BACKPRESSURE_MICROS, INDEXING_PIPELINES};
use crate::models::IndexingStatistics;
use crate::models::{IndexingStatistics, SharedPublishToken};
use crate::source::{
AssignShards, Assignment, SourceActor, SourceRuntime, quickwit_supported_sources,
};
Expand Down Expand Up @@ -89,6 +89,10 @@ pub struct IndexingPipeline {
// requiring a respawn of the pipeline.
// We keep the list of shards here however, to reassign them after a respawn.
shard_ids: BTreeSet<ShardId>,
// Id of the last indexing plan assigned to this pipeline. Kept here, like `shard_ids`, so it
// can be re-sent to the source on respawn; the source adopts it as its publish token.
indexing_plan_id: String,
publish_token: SharedPublishToken,
_indexing_pipelines_gauge_guard: GaugeGuard,
}

Expand Down Expand Up @@ -137,6 +141,8 @@ impl IndexingPipeline {
..Default::default()
},
shard_ids: Default::default(),
indexing_plan_id: String::new(),
publish_token: SharedPublishToken::default(),
_indexing_pipelines_gauge_guard: indexing_pipelines_gauge_guard,
}
}
Expand Down Expand Up @@ -306,6 +312,7 @@ impl IndexingPipeline {
self.params.metastore.clone(),
Some(self.params.merge_planner_mailbox.clone()),
Some(source_mailbox.clone()),
self.publish_token.clone(),
);
let (publisher_mailbox, publisher_handle) = ctx
.spawn_actor()
Expand Down Expand Up @@ -390,6 +397,7 @@ impl IndexingPipeline {
storage_resolver: self.params.source_storage_resolver.clone(),
event_broker: self.params.event_broker.clone(),
indexing_setting: self.params.indexing_settings.clone(),
publish_token: self.publish_token.clone(),
};
let source = ctx
.protect_future(quickwit_supported_sources().load_source(source_runtime))
Expand All @@ -402,6 +410,7 @@ impl IndexingPipeline {
.spawn(actor_source);
let assign_shards_message = AssignShards(Assignment {
shard_ids: self.shard_ids.clone(),
indexing_plan_id: self.indexing_plan_id.clone(),
});
source_mailbox.send_message(assign_shards_message).await?;

Expand Down Expand Up @@ -496,6 +505,8 @@ impl Handler<AssignShards> for IndexingPipeline {
) -> Result<(), ActorExitStatus> {
self.shard_ids
.clone_from(&assign_shards_message.0.shard_ids);
self.indexing_plan_id
.clone_from(&assign_shards_message.0.indexing_plan_id);
// If the pipeline is running, we forward the message to its source.
// If it is not, it will be respawned soon, and the shards will be assigned afterward.
if let Some(handles) = &self.handles_opt {
Expand Down
Loading
Loading