Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
7a7207b
Make MergeSchedulerService optional (#6266)
nadav-govari Apr 6, 2026
2862e87
Create compaction planner service stub; wire up serving layer (#6269)
nadav-govari Apr 9, 2026
19300bc
Scaffold Compactor supervisor and pipeline (#6282)
nadav-govari Apr 13, 2026
40b307c
Spawn merge pipeline from new compactor (#6284)
nadav-govari Apr 13, 2026
ecab99b
Wire up compactor service (#6291)
nadav-govari Apr 13, 2026
f35053e
Implement compactor pipeline update logic (#6297)
nadav-govari Apr 13, 2026
597a81e
Merge planner part 1: Metastore (#6305)
nadav-govari Apr 16, 2026
7fcb4ce
Merge planner part 2: gRPCs (#6310)
nadav-govari Apr 16, 2026
48d499c
Make ReportStatus gRPC from compactors (#6311)
nadav-govari Apr 16, 2026
1e44a30
Remove merge code from indexers; Renaming and cleanup (#6346)
nadav-govari May 12, 2026
d3a780c
Add metrics (#6350)
nadav-govari May 12, 2026
a289fa9
Improvements from live testing (#6404)
nadav-govari May 12, 2026
8056371
Wire up shared indexing split cache between indexers and compactors (…
nadav-govari May 20, 2026
d611865
Score merge operations like before; restructure MergeOperation (#6412)
nadav-govari May 20, 2026
8ff8744
Performance improvements: multipart upload, parallel gets, spans (#6451)
nadav-govari May 20, 2026
e2e5d6a
Exclude known split ids; double merge concurrency (#6452)
nadav-govari May 20, 2026
efaba73
Revert to using existing merge flow when standalone compactors isnt e…
nadav-govari May 20, 2026
1d5baf5
Comments from all PRs
nadav-govari May 20, 2026
78ba1a4
Merge branch nadav/feature-split-merges into main
nadav-govari May 21, 2026
62176e6
Optionally disable metrics merge pipelines like logs
nadav-govari May 21, 2026
82cb6fe
lints
nadav-govari May 21, 2026
1ee8565
Merge branch 'main' into nadav/merge-feature-split-merges
nadav-govari May 21, 2026
e60210f
lint
nadav-govari May 21, 2026
ce46988
Merge branch 'main' into nadav/merge-feature-split-merges
nadav-govari May 22, 2026
7054e1d
Rework merge task for both compactor and indexer merge flows (#6464)
nadav-govari May 29, 2026
a5b1b01
Make standalone compactors config top-level (#6471)
nadav-govari May 31, 2026
bdb04be
type conv
nadav-govari May 31, 2026
82fa2be
Merge branch 'main' into nadav/merge-feature-split-merges
nadav-govari Jun 22, 2026
3220d84
lints
nadav-govari Jun 22, 2026
d6a3fed
Decommission compactors gracefully (#6543)
nadav-govari Jun 25, 2026
11aced9
Ensure all indexers broadcast standalone compactors before starting m…
nadav-govari Jun 25, 2026
0391a21
Merge branch 'main' into nadav/merge-feature-split-merges
nadav-govari Jun 25, 2026
beb33fb
Fix a bunch of nits
guilload Jun 25, 2026
95ee771
Fix `test_qw_env_vars_expansion`
guilload Jun 25, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 8 additions & 4 deletions quickwit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ members = [
"quickwit-codegen",
"quickwit-codegen/example",
"quickwit-common",
"quickwit-compaction",
"quickwit-config",
"quickwit-control-plane",
"quickwit-datafusion",
Expand All @@ -26,9 +27,9 @@ members = [
"quickwit-lambda-server",
"quickwit-macros",
"quickwit-metastore",
"quickwit-metrics-inventory",
"quickwit-metrics",
"quickwit-telemetry-exporters",
"quickwit-metrics-inventory",

# Disabling metastore-utils from the quickwit projects to ease build/deps.
# We can reenable it when we need it.
Expand Down Expand Up @@ -57,6 +58,7 @@ default-members = [
"quickwit-codegen",
"quickwit-codegen/example",
"quickwit-common",
"quickwit-compaction",
"quickwit-config",
"quickwit-control-plane",
"quickwit-datetime",
Expand All @@ -72,9 +74,8 @@ default-members = [
"quickwit-lambda-server",
"quickwit-macros",
"quickwit-metastore",
"quickwit-metrics",
"quickwit-telemetry-exporters",
"quickwit-metrics-inventory",
"quickwit-metrics",
"quickwit-opentelemetry",
"quickwit-parquet-engine",
"quickwit-proto",
Expand All @@ -83,6 +84,7 @@ default-members = [
"quickwit-search",
"quickwit-serve",
"quickwit-storage",
"quickwit-telemetry-exporters",
"quickwit-transport",
]

Expand Down Expand Up @@ -243,6 +245,7 @@ rustls = "0.23"
rustls-pemfile = "2.2"
sea-query = { version = "0.32" }
sea-query-binder = { version = "0.7", features = [
"postgres-array",
"runtime-tokio-rustls",
"sqlx-postgres",
] }
Expand Down Expand Up @@ -375,6 +378,7 @@ quickwit-cluster = { path = "quickwit-cluster" }
quickwit-codegen = { path = "quickwit-codegen" }
quickwit-codegen-example = { path = "quickwit-codegen/example" }
quickwit-common = { path = "quickwit-common" }
quickwit-compaction = { path = "quickwit-compaction" }
quickwit-config = { path = "quickwit-config" }
quickwit-control-plane = { path = "quickwit-control-plane" }
quickwit-datafusion = { path = "quickwit-datafusion" }
Expand All @@ -394,7 +398,6 @@ quickwit-lambda-server = { path = "quickwit-lambda-server" }
quickwit-macros = { path = "quickwit-macros" }
quickwit-metastore = { path = "quickwit-metastore" }
quickwit-metrics = { path = "quickwit-metrics" }
quickwit-telemetry-exporters = { path = "quickwit-telemetry-exporters" }
quickwit-metrics-inventory = { path = "quickwit-metrics-inventory" }
quickwit-opentelemetry = { path = "quickwit-opentelemetry" }
quickwit-parquet-engine = { path = "quickwit-parquet-engine" }
Expand All @@ -404,6 +407,7 @@ quickwit-rest-client = { path = "quickwit-rest-client" }
quickwit-search = { path = "quickwit-search" }
quickwit-serve = { path = "quickwit-serve" }
quickwit-storage = { path = "quickwit-storage" }
quickwit-telemetry-exporters = { path = "quickwit-telemetry-exporters" }
quickwit-transport = { path = "quickwit-transport" }

tantivy = { git = "https://github.com/quickwit-oss/tantivy/", rev = "1e859fd", default-features = false, features = [
Expand Down
22 changes: 15 additions & 7 deletions quickwit/quickwit-cli/src/tool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::io::{IsTerminal, Stdout, Write, stdout};
use std::num::NonZeroUsize;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::{env, fmt, io};

Expand All @@ -35,11 +36,11 @@ use quickwit_config::{
TransformConfig, VecSourceParams,
};
use quickwit_index_management::{IndexService, clear_cache_directory};
use quickwit_indexing::BoxedPipelineHandle;
use quickwit_indexing::actors::{IndexingService, MergePipeline, MergeSchedulerService};
use quickwit_indexing::models::{
DetachIndexingPipeline, DetachMergePipeline, IndexingStatistics, SpawnPipeline,
};
use quickwit_indexing::{BoxedPipelineHandle, IndexingSplitCache};
use quickwit_ingest::IngesterPool;
use quickwit_metastore::IndexMetadataResponseExt;
use quickwit_proto::indexing::CpuCapacity;
Expand All @@ -54,12 +55,12 @@ use quickwit_serve::{
use quickwit_storage::{BundleStorage, Storage};
use quickwit_transport::ChannelFactory;
use thousands::Separable;
use tracing::{debug, info};
use tracing::debug;

use crate::checklist::{GREEN_COLOR, RED_COLOR};
use crate::{
THROUGHPUT_WINDOW_SIZE, config_cli_arg, get_resolvers, load_node_config, run_index_checklist,
start_actor_runtimes,
THROUGHPUT_WINDOW_SIZE, config_cli_arg, get_resolvers, info, load_node_config,
run_index_checklist, start_actor_runtimes,
};

pub fn build_tool_command() -> Command {
Expand Down Expand Up @@ -220,8 +221,8 @@ pub enum ToolCliCommand {
GarbageCollect(GarbageCollectIndexArgs),
LocalIngest(LocalIngestDocsArgs),
LocalSearch(LocalSearchArgs),
Merge(MergeArgs),
ExtractSplit(ExtractSplitArgs),
Merge(MergeArgs),
}

impl ToolCliCommand {
Expand Down Expand Up @@ -447,6 +448,8 @@ pub async fn local_ingest_docs_cli(args: LocalIngestDocsArgs) -> anyhow::Result<
)?;
let universe = Universe::new();
let merge_scheduler_service_mailbox = universe.get_or_spawn_one();
let split_cache =
Arc::new(IndexingSplitCache::from_config(&indexer_config, &config.data_dir_path).await?);
let indexing_server = IndexingService::new(
config.node_id.clone(),
config.data_dir_path.clone(),
Expand All @@ -455,10 +458,11 @@ pub async fn local_ingest_docs_cli(args: LocalIngestDocsArgs) -> anyhow::Result<
cluster,
metastore,
None,
merge_scheduler_service_mailbox,
Some(merge_scheduler_service_mailbox),
IngesterPool::default(),
storage_resolver,
EventBroker::default(),
split_cache,
)
.await?;
let (indexing_server_mailbox, indexing_server_handle) =
Expand Down Expand Up @@ -492,11 +496,13 @@ pub async fn local_ingest_docs_cli(args: LocalIngestDocsArgs) -> anyhow::Result<
let statistics =
start_statistics_reporting_loop(indexing_pipeline_handle, args.input_path_opt.is_none())
.await?;

merge_pipeline_handle
.mailbox()
.ask(quickwit_indexing::FinishPendingMergesAndShutdownPipeline)
.await?;
merge_pipeline_handle.join().await;

// Shutdown the indexing server.
universe
.send_exit_with_success(&indexing_server_mailbox)
Expand Down Expand Up @@ -592,10 +598,11 @@ pub async fn merge_cli(args: MergeArgs) -> anyhow::Result<()> {
cluster,
metastore,
None,
merge_scheduler_service,
Some(merge_scheduler_service),
IngesterPool::default(),
storage_resolver,
EventBroker::default(),
Arc::new(IndexingSplitCache::no_caching()),
)
.await?;
let (indexing_service_mailbox, indexing_service_handle) =
Expand Down Expand Up @@ -1005,6 +1012,7 @@ async fn create_empty_cluster(config: &NodeConfig) -> anyhow::Result<Cluster> {
indexing_cpu_capacity: CpuCapacity::zero(),
ingester_status: IngesterStatus::default(),
availability_zone: None,
enable_standalone_compactors: false,
};
let channel_factory = ChannelFactory::for_grpc(&config.grpc_config)?;
let cluster = Cluster::join(
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-cli/tests/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -730,7 +730,7 @@ async fn test_garbage_collect_cli_no_grace() {
let index_path = test_env.indexes_dir_path.join(&test_env.index_id);
assert_eq!(index_path.try_exists().unwrap(), true);

let split_ids = vec![splits_metadata[0].split_id().to_string()];
let split_ids = vec![splits_metadata[0].split_id.clone()];
let metastore = refresh_metastore(metastore).await.unwrap();
let mark_for_deletion_request =
MarkSplitsForDeletionRequest::new(index_uid.clone(), split_ids.clone());
Expand Down
23 changes: 22 additions & 1 deletion quickwit/quickwit-cluster/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use crate::grpc_gossip::spawn_catchup_callback_task;
use crate::member::{
AVAILABILITY_ZONE_KEY, ClusterMember, ENABLED_SERVICES_KEY, GRPC_ADVERTISE_ADDR_KEY,
NodeStateExt, PIPELINE_METRICS_PREFIX, READINESS_KEY, READINESS_VALUE_NOT_READY,
READINESS_VALUE_READY,
READINESS_VALUE_READY, STANDALONE_COMPACTORS_KEY,
};
use crate::metrics::spawn_metrics_task;
use crate::{ClusterChangeStream, ClusterNode};
Expand Down Expand Up @@ -224,6 +224,10 @@ impl Cluster {
if let Some(az) = &self_node.availability_zone {
initial_key_values.push((AVAILABILITY_ZONE_KEY.to_string(), az.clone()));
}
initial_key_values.push((
STANDALONE_COMPACTORS_KEY.to_string(),
self_node.enable_standalone_compactors.to_string(),
));
let chitchat_handle =
spawn_chitchat(chitchat_config, initial_key_values, transport).await?;

Expand Down Expand Up @@ -275,6 +279,16 @@ impl Cluster {
.collect()
}

pub async fn live_nodes(&self) -> Vec<ClusterNode> {
self.inner
.read()
.await
.live_nodes
.values()
.cloned()
.collect()
}

/// Returns a stream of changes affecting the set of ready nodes in the cluster.
///
/// Replays currently-ready nodes as `Add` events before future changes, under the write lock,
Expand Down Expand Up @@ -339,6 +353,12 @@ impl Cluster {
.await
}

#[cfg(any(test, feature = "testsuite"))]
pub async fn set_self_enable_standalone_compactors(&self, enable: bool) {
self.set_self_key_value(STANDALONE_COMPACTORS_KEY, enable)
.await;
}

/// Sets a key-value pair on the cluster node's state.
pub async fn set_self_key_value(&self, key: impl Display, value: impl Display) {
self.chitchat()
Expand Down Expand Up @@ -716,6 +736,7 @@ pub async fn create_cluster_for_test_with_id(
indexing_cpu_capacity: PIPELINE_FULL_CAPACITY,
ingester_status: IngesterStatus::default(),
availability_zone: None,
enable_standalone_compactors: false,
};
let failure_detector_config = create_failure_detector_config_for_test();
let cluster = Cluster::join(
Expand Down
9 changes: 7 additions & 2 deletions quickwit/quickwit-cluster/src/grpc_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,9 @@ impl ClusterService for Cluster {
#[cfg(test)]
mod tests {
use super::*;
use crate::member::{ENABLED_SERVICES_KEY, GRPC_ADVERTISE_ADDR_KEY, READINESS_KEY};
use crate::member::{
ENABLED_SERVICES_KEY, GRPC_ADVERTISE_ADDR_KEY, READINESS_KEY, STANDALONE_COMPACTORS_KEY,
};
use crate::{ChitchatTransport, create_cluster_for_test};

#[tokio::test]
Expand Down Expand Up @@ -161,7 +163,7 @@ mod tests {
.key_values
.sort_unstable_by(|left, right| left.key.cmp(&right.key));

assert_eq!(node_state.key_values.len(), 4);
assert_eq!(node_state.key_values.len(), 5);
assert_eq!(node_state.key_values[0].key, ENABLED_SERVICES_KEY);
assert_eq!(node_state.key_values[0].value, "indexer");

Expand All @@ -172,5 +174,8 @@ mod tests {

assert_eq!(node_state.key_values[3].key, READINESS_KEY);
assert_eq!(node_state.key_values[3].value, "READY");

assert_eq!(node_state.key_values[4].key, STANDALONE_COMPACTORS_KEY);
assert_eq!(node_state.key_values[4].value, "false");
}
}
1 change: 1 addition & 0 deletions quickwit/quickwit-cluster/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ pub async fn start_cluster_service(node_config: &NodeConfig) -> anyhow::Result<C
indexing_cpu_capacity,
ingester_status: IngesterStatus::default(),
availability_zone: node_config.availability_zone.clone(),
enable_standalone_compactors: node_config.enable_standalone_compactors,
};
let failure_detector_config = FailureDetectorConfig {
dead_node_grace_period: Duration::from_secs(2 * 60 * 60), // 2 hours
Expand Down
Loading
Loading