diff --git a/rust/lance/src/io/exec/fts.rs b/rust/lance/src/io/exec/fts.rs index 76db9300d0f..009acfffc96 100644 --- a/rust/lance/src/io/exec/fts.rs +++ b/rust/lance/src/io/exec/fts.rs @@ -94,6 +94,7 @@ async fn open_fts_segments( .await } +#[allow(clippy::too_many_arguments)] async fn search_segments( indices: &[Arc], tokens: Arc, @@ -102,6 +103,7 @@ async fn search_segments( pre_filter: Arc, metrics: Arc, base_scorer: Arc, + parallelism: usize, ) -> Result<(Vec, Vec)> { let limit = params.limit.unwrap_or(usize::MAX); let mut candidates = std::collections::BinaryHeap::new(); @@ -128,7 +130,7 @@ async fn search_segments( } }) .collect::>(); - let searches = stream::iter(searches).buffer_unordered(get_num_compute_intensive_cpus()); + let searches = stream::iter(searches).buffer_unordered(parallelism); let mut searches = searches; while let Some((doc_ids, scores)) = searches.try_next().await? { @@ -432,6 +434,7 @@ impl ExecutionPlan for MatchQueryExec { partition: usize, context: Arc, ) -> DataFusionResult { + let target_partitions = context.session_config().target_partitions(); let query = self.query.clone(); let params = self.params.clone(); let ds = self.dataset.clone(); @@ -439,6 +442,9 @@ impl ExecutionPlan for MatchQueryExec { let preset_base_scorer = self.base_scorer.clone(); let preset_segments = self.preset_segments.clone(); let metrics = Arc::new(FtsIndexMetrics::new(&self.metrics, partition)); + let parallelism = get_num_compute_intensive_cpus() + .min(target_partitions) + .max(1); let column = query.column.ok_or(DataFusionError::Execution(format!( "column not set for MatchQuery {}", query.terms @@ -515,6 +521,7 @@ impl ExecutionPlan for MatchQueryExec { pre_filter, metrics.clone(), base_scorer, + parallelism, ) .await?; scores.iter_mut().for_each(|s| { @@ -1316,6 +1323,10 @@ impl ExecutionPlan for PhraseQueryExec { partition: usize, context: Arc, ) -> DataFusionResult { + let target_partitions = context.session_config().target_partitions(); + let parallelism = get_num_compute_intensive_cpus() + .min(target_partitions) + .max(1); let query = self.query.clone(); let params = self.params.clone(); let ds = self.dataset.clone(); @@ -1385,6 +1396,7 @@ impl ExecutionPlan for PhraseQueryExec { pre_filter, metrics.clone(), base_scorer, + parallelism, ) .await?; metrics.baseline_metrics.record_output(doc_ids.len()); diff --git a/rust/lance/src/io/exec/knn.rs b/rust/lance/src/io/exec/knn.rs index 48bd9fb4196..5340da4fbe5 100644 --- a/rust/lance/src/io/exec/knn.rs +++ b/rust/lance/src/io/exec/knn.rs @@ -510,6 +510,7 @@ impl ExecutionPlan for KNNVectorDistanceExec { partition: usize, context: Arc, ) -> DataFusionResult { + let target_partitions = context.session_config().target_partitions(); let input_stream = self.input.execute(partition, context)?; if self.is_batch { let stream = stream::once(Self::execute_batch( @@ -572,7 +573,11 @@ impl ExecutionPlan for KNNVectorDistanceExec { .map_err(|e| DataFusionError::ArrowError(Box::new(e), None)) } }) - .buffer_unordered(get_num_compute_intensive_cpus()); + .buffer_unordered( + get_num_compute_intensive_cpus() + .min(target_partitions) + .max(1), + ); let stream = stream.map(move |batch| { let poll = baseline.record_poll(std::task::Poll::Ready(Some(batch))); @@ -2633,6 +2638,64 @@ mod tests { assert_eq!(expected, results[0]); } + /// Verify that KNNVectorDistanceExec with target_partitions=1 produces the same + /// row count as the default context. Regression guard for the parallelism cap. + #[tokio::test] + async fn test_knn_vector_distance_respects_target_partitions() { + use arrow_array::UInt64Array; + use datafusion::execution::context::{SessionConfig, TaskContext}; + + let dim: usize = 16; + let n_batches = 10; + let batch_size = 50; + let schema = Arc::new(ArrowSchema::new(vec![ + ArrowField::new(ROW_ID, DataType::UInt64, false), + ArrowField::new( + "vector", + DataType::FixedSizeList( + Arc::new(ArrowField::new("item", DataType::Float32, true)), + dim as i32, + ), + true, + ), + ])); + + let batches: Vec = (0..n_batches) + .map(|i| { + RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(UInt64Array::from_iter_values( + ((i * batch_size) as u64)..((i + 1) as u64 * batch_size as u64), + )), + Arc::new( + FixedSizeListArray::try_new_from_values( + generate_random_array(dim * batch_size), + dim as i32, + ) + .unwrap(), + ), + ], + ) + .unwrap() + }) + .collect(); + + let input: Arc = Arc::new(TestingExec::new(batches)); + let query_vec = Arc::new(generate_random_array(dim)) as ArrayRef; + let exec = + KNNVectorDistanceExec::try_new(input, "vector", query_vec, DistanceType::L2).unwrap(); + + let low_ctx = Arc::new( + TaskContext::default() + .with_session_config(SessionConfig::default().with_target_partitions(1)), + ); + let stream = exec.execute(0, low_ctx).unwrap(); + let batches = stream.try_collect::>().await.unwrap(); + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!(total_rows, n_batches * batch_size); + } + #[test] fn test_create_knn_flat() { let dim: usize = 128;