[FEA] Multi-node Out of Core Streaming KMeans API #2066
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@cpp/src/cluster/detail/kmeans_mg.cuh`:
- Around line 188-197: The variable has_data is declared after it's used in the
if-block; move the declaration bool has_data = (n_local > 0); to before the if
statement that uses it (the block containing RAFT_LOG_WARN and
streaming_batch_size assignment) or replace uses of has_data with the expression
(n_local > 0) directly; ensure you only have one definition of has_data (no
shadowing) so functions/conditions like the if (data_on_device && has_data &&
streaming_batch_size < max_part_rows) see a valid variable.
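As a minimal sketch of the ordering fix described in this comment, the function below declares `has_data` once, before the first condition that reads it. The function name `resolve_streaming_batch_size` and the choice to raise the batch size to `max_part_rows` are assumptions made for illustration; they are not taken from `kmeans_mg.cuh`.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for the batch-size logic named in the review comment.
// Only the variable names come from the comment; the body is an assumption.
std::int64_t resolve_streaming_batch_size(bool data_on_device,
                                          std::int64_t n_local,
                                          std::int64_t max_part_rows,
                                          std::int64_t streaming_batch_size)
{
  assert(n_local >= 0 && max_part_rows >= 0);

  // Single definition, placed before its first use, so nothing shadows it.
  const bool has_data = (n_local > 0);

  if (data_on_device && has_data && streaming_batch_size < max_part_rows) {
    // Assumption: the real block logs via RAFT_LOG_WARN and then reassigns
    // streaming_batch_size; the value assigned here is illustrative only.
    streaming_batch_size = max_part_rows;
  }
  return streaming_batch_size;
}
```

With this ordering, a rank that holds no rows (`n_local == 0`) simply skips the adjustment instead of reading an undeclared variable.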
📒 Files selected for processing (2)
- cpp/src/cluster/detail/kmeans_mg.cuh
- cpp/tests/cluster/kmeans_mg.cu
💤 Files with no reviewable changes (1)
- cpp/tests/cluster/kmeans_mg.cu
/ok to test 089e970
viclafargue left a comment
Thanks for working on this @tarang-jain. Unifying the implementations is a good step for maintainability, but I’d like to raise a few concerns around initialization.
This PR changes regular MG KMeans initialization semantics in ways that should be discussed explicitly:
- The truly distributed KMeans++ initialization is dropped. This limits how far centroid initialization can scale to what the root rank allows. This may be an acceptable change, since the simpler init may need fewer communication steps, but it would need to be discussed.
- `init_size` is documented as a host-only parameter, but regular KMeans now uses it or defaults to `min(3 * n_clusters, global_n)`.
- Some NCCL communication patterns (left from the multi-GPU Batched KMeans PR) should be revisited:
  - Array init: Redundant broadcast from root. All ranks should normally already be correctly initialized with the user-defined centroids.
  - Random init: Points are randomly sampled on each rank, then merged with an allreduce operation, and finally broadcast again from root. We should remove the unnecessary broadcast.
  - KMeans++: If we keep the current root-rank implementation, the communication pattern should be adjusted. Only the root rank needs the full initialization sample, so gather-to-root would be a better fit than a dense allreduce over the full sampled matrix on every rank. The resulting centroids can then be broadcast.
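To make the gather-versus-allreduce point concrete, here is a host-side simulation in plain C++ with no NCCL calls. It assumes the dense merge works by having each rank contribute a zero-padded, full-size buffer that is summed across ranks; that mechanism, and all function names below, are assumptions for illustration rather than the code in this PR.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Each inner vector holds one rank's sampled values (flattened rows).
using RankSamples = std::vector<std::vector<float>>;

// Total number of sampled values across all ranks.
std::size_t total_elems(const RankSamples& per_rank)
{
  std::size_t total = 0;
  for (const auto& s : per_rank) total += s.size();
  return total;
}

// Dense pattern: every rank supplies a full-size buffer that is zero outside
// its own slot, and the buffers are summed element-wise (allreduce-style).
std::vector<float> merge_by_dense_sum(const RankSamples& per_rank)
{
  const std::size_t total = total_elems(per_rank);
  std::vector<float> merged(total, 0.0f);
  std::size_t offset = 0;
  for (const auto& s : per_rank) {
    std::vector<float> padded(total, 0.0f);
    for (std::size_t i = 0; i < s.size(); ++i) padded[offset + i] = s[i];
    for (std::size_t i = 0; i < total; ++i) merged[i] += padded[i];
    offset += s.size();
  }
  return merged;
}

// Gather pattern: root concatenates only what each rank actually sampled.
std::vector<float> merge_by_gather(const RankSamples& per_rank)
{
  std::vector<float> root;
  for (const auto& s : per_rank) root.insert(root.end(), s.begin(), s.end());
  return root;
}

// Size of the buffer one rank has to supply under each pattern.
std::size_t dense_send_elems(const RankSamples& per_rank) { return total_elems(per_rank); }
std::size_t gather_send_elems(const RankSamples& per_rank, std::size_t rank)
{
  assert(rank < per_rank.size());
  return per_rank[rank].size();
}
```

Both merges leave the root with the same sample, but in the dense pattern every rank supplies and receives a buffer the size of the whole sample, while in the gather pattern each rank supplies only its own rows.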
    void fit(
      raft::resources const& handle,
      const cuvs::cluster::kmeans::params& params,
      const std::vector<raft::device_matrix_view<const float, int>>& X_parts,
Please expose the new API in cpp/include/cuvs/cluster/kmeans.hpp.
Yeah, we need to find a good way to do that. The problem is that it would look identical to the single-matrix, single-GPU API. If a user passes an SG handle to this API, it would go through this MG impl.
I added them in the ::mg namespace.
    if (n_weights == 0) { continue; }

    auto d_part_wt = raft::make_device_scalar<DataT>(dev_res, DataT{0});
    cuvs::cluster::kmeans::detail::weightSum(dev_res, weights, d_part_wt.view());
Partition weights are validated per partition, not globally. weightSum throws an error when the partial sum is <= 0, but we should instead fail when the global sum (after reduce operation) is <= 0.
I intentionally removed the global assertion. Earlier we were doing a stream sync to bring the sum to host and validate on host. I don't think syncing the whole stream just for an assertion is worth the perf impact.
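The difference between the two checks in this thread can be shown with a small host-only sketch. It does not call `weightSum` or any cuVS function; the two helper names are hypothetical, and the per-partition rule is modeled only on the behavior described above (reject when a partial sum is <= 0).

```cpp
#include <cassert>
#include <numeric>
#include <vector>

using WeightParts = std::vector<std::vector<float>>;

// Per-partition rule: reject as soon as one non-empty partition sums to <= 0.
bool passes_per_partition_check(const WeightParts& parts)
{
  for (const auto& w : parts) {
    if (w.empty()) continue;  // mirrors the `n_weights == 0` skip in the diff
    if (std::accumulate(w.begin(), w.end(), 0.0f) <= 0.0f) return false;
  }
  return true;
}

// Global rule: reduce all partial sums first, reject only if the total is <= 0.
bool passes_global_check(const WeightParts& parts)
{
  float total = 0.0f;
  for (const auto& w : parts) total += std::accumulate(w.begin(), w.end(), 0.0f);
  return total > 0.0f;
}
```

A partition whose weights are all zero fails the first check even when other partitions carry positive weight, which is the case the reviewer raises; the sketch says nothing about the synchronization cost the author is concerned about.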
Merge after #2015 and #2017
Allows a stream of input matrices per worker, which are further batched using the `streaming_batch_size` parameter. Reasoning: we should be able to supply dask partitions (on host) directly without having to concatenate them into one consolidated matrix.

As a part of this PR, we also unify the multi-GPU implementations into one (earlier the out of core implementation was separate).
Tests: We get rid of the separate out of core test file. The single MG test unit now covers both out of core and on-device matrices.
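One way to picture the batching described here is a plan that walks each partition in turn and cuts it into chunks of at most `streaming_batch_size` rows, never merging rows across partitions. The sketch below is an assumption about that scheme, not the PR's implementation; `BatchSpan` and `plan_batches` are hypothetical names.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// One batch: which partition it comes from, where it starts, how many rows.
struct BatchSpan {
  std::size_t part;
  std::int64_t row_offset;
  std::int64_t n_rows;
};

// Split every partition into consecutive batches of at most
// streaming_batch_size rows. Empty partitions produce no batches.
std::vector<BatchSpan> plan_batches(const std::vector<std::int64_t>& part_rows,
                                    std::int64_t streaming_batch_size)
{
  assert(streaming_batch_size > 0);
  std::vector<BatchSpan> plan;
  for (std::size_t p = 0; p < part_rows.size(); ++p) {
    for (std::int64_t off = 0; off < part_rows[p]; off += streaming_batch_size) {
      const std::int64_t n = std::min(streaming_batch_size, part_rows[p] - off);
      plan.push_back({p, off, n});
    }
  }
  return plan;
}
```

Because batches never span two partitions, the partitions can stay as separate host buffers, which matches the stated goal of avoiding a concatenated matrix.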