Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
6a2198f
feat(tdigest): implement TDIGEST.TRIMMED_MEAN command
chakkk309 Dec 25, 2025
7799524
Merge branch 'unstable' into feat-implement-TDIGEST.TRIMMED_MEAN-command
chakkk309 Dec 25, 2025
4394482
Merge branch 'unstable' into feat-implement-TDIGEST.TRIMMED_MEAN-command
chakkk309 Jan 15, 2026
178b1ee
tdigest: refine trimmed mean implementation and tests
chakkk309 Jan 18, 2026
8b2bb67
Merge branch 'unstable' into feat-implement-TDIGEST.TRIMMED_MEAN-command
chakkk309 Jan 19, 2026
cf5451a
Merge branch 'unstable' into feat-implement-TDIGEST.TRIMMED_MEAN-command
chakkk309 Mar 4, 2026
3f01bd6
refactor(tdigest): move TRIMMED_MEAN quantile validation to parse step
chakkk309 Mar 8, 2026
af7b44f
fix(tdigest): fix TRIMMED_MEAN algorithm and parameter validation
chakkk309 Mar 8, 2026
a7f773f
test(tdigest): improve TRIMMED_MEAN test coverage and precision
chakkk309 Mar 8, 2026
75c60a7
test(tdigest): extract TRIMMED_MEAN error messages as constants
chakkk309 Mar 8, 2026
f9ca36d
Merge branch 'unstable' into feat-implement-TDIGEST.TRIMMED_MEAN-command
chakkk309 Mar 8, 2026
7d02463
test(tdigest): extract TRIMMED_MEAN error messages as constants
chakkk309 Mar 9, 2026
1cf77d3
Merge remote-tracking branch 'fork/feat-implement-TDIGEST.TRIMMED_MEA…
chakkk309 Mar 9, 2026
8a3e9b1
Update src/types/redis_tdigest.cc
chakkk309 Mar 9, 2026
fedf4b8
fix(tdigest): handle non-finite quantile inputs
chakkk309 Mar 9, 2026
6afbab9
Merge branch 'unstable' into feat-implement-TDIGEST.TRIMMED_MEAN-command
chakkk309 Mar 11, 2026
6fb6a9e
chore(types): clarify trimmed mean overlap logic
chakkk309 Mar 12, 2026
f5b60b0
chore(tests): remove redundant tdigest NaN assertion
chakkk309 Mar 12, 2026
49cdccb
fix(commands): align tdigest trimmed mean errors with redis
chakkk309 Mar 12, 2026
9b7ac7c
fix(tdigest): handle nan quantile inputs
chakkk309 Mar 12, 2026
592558d
style(commands): simplify tdigest trimmed mean errors
chakkk309 Mar 12, 2026
cfccf80
Merge branch 'unstable' into feat-implement-TDIGEST.TRIMMED_MEAN-command
chakkk309 Mar 12, 2026
5960a92
Merge branch 'unstable' into feat-implement-TDIGEST.TRIMMED_MEAN-command
chakkk309 Mar 18, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions src/commands/cmd_tdigest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@
constexpr auto kInfoObservations = "Observations";
constexpr auto kInfoTotalCompressions = "Total compressions";
constexpr auto kNan = "nan";

constexpr const char *errParseLowCutQuantile = "error parsing low_cut_percentile";
constexpr const char *errParseHighCutQuantile = "error parsing high_cut_percentile";
constexpr const char *errCutQuantileRange = "low_cut_percentile and high_cut_percentile should be in [0,1]";
constexpr const char *errLowCutQuantileLess = "low_cut_percentile should be lower than high_cut_percentile";
} // namespace

class CommandTDigestCreate : public Commander {
Expand Down Expand Up @@ -492,6 +497,67 @@
TDigestMergeOptions options_;
};

class CommandTDigestTrimmedMean : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {

Check failure on line 502 in src/commands/cmd_tdigest.cc

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Correct these functions so that no function in "redis::CommandTDigestTrimmedMean::Parse" hides a function in "redis::Commander::Parse".

See more on https://sonarcloud.io/project/issues?id=apache_kvrocks&issues=AZy4BIDrvOvPLz4Gtlsi&open=AZy4BIDrvOvPLz4Gtlsi&pullRequest=3312
if (args.size() != 4) {
return {Status::RedisParseErr, errWrongNumOfArguments};
}

key_name_ = args[1];

auto low_cut_quantile = ParseFloat(args[2]);
if (!low_cut_quantile || std::isnan(*low_cut_quantile)) {
return {Status::RedisParseErr, errParseLowCutQuantile};
}
low_cut_quantile_ = *low_cut_quantile;

auto high_cut_quantile = ParseFloat(args[3]);
if (!high_cut_quantile || std::isnan(*high_cut_quantile)) {
return {Status::RedisParseErr, errParseHighCutQuantile};
}
high_cut_quantile_ = *high_cut_quantile;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check the validation of high_cut_quantile and low_cut_quantile.
The parameter validation should be done in the earliest step rather than in the command processing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review! I've moved the validation to Parse stage and removed the duplicate checks in the TDigestTrimmedMean method.


if (!std::isfinite(low_cut_quantile_) || low_cut_quantile_ < 0.0 || low_cut_quantile_ > 1.0) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using a string validation before numeric validation maybe a better way to avoid the unstable comparison of float numbers.
The string must be ^(0(?:\.\d*)?)|(1(?:\.0*))$. Please double confirm my regex.
We could also use comparing with delta to do this, but from pure literal text would be more stable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tested Redis's actual behavior and found that it accepts numeric forms like .5 and +0.5 for this command. If we add a strict string-level regex here, would that make it more restrictive than Redis?

To preserve Redis compatibility, I kept numeric parsing and aligned the validation with Redis. Could you please let me know if this approach would be acceptable? 👀

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your update.

In this case we could keep the float number validation.

Maybe we could add a decimal parser in future.😊

return {Status::RedisParseErr, errCutQuantileRange};
}
if (!std::isfinite(high_cut_quantile_) || high_cut_quantile_ < 0.0 || high_cut_quantile_ > 1.0) {
return {Status::RedisParseErr, errCutQuantileRange};
}
if (low_cut_quantile_ >= high_cut_quantile_) {
return {Status::RedisParseErr, errLowCutQuantileLess};
}
Comment on lines +509 to +529
Copy link

Copilot AI Mar 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The quantile validation doesn’t handle NaN: ParseFloat("nan") succeeds, but comparisons like < 0.0 / > 1.0 will be false, and DoubleCompare with NaN will route to the "low cut quantile must be less..." error (or even allow invalid values through in other cases). Explicitly reject NaN (and ideally non-finite values) for both low_cut_quantile_ and high_cut_quantile_ so invalid inputs consistently return the intended range errors.

Copilot uses AI. Check for mistakes.

return Status::OK();
}

Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
TDigest tdigest(srv->storage, conn->GetNamespace());
TDigestTrimmedMeanResult result;

auto s = tdigest.TrimmedMean(ctx, key_name_, low_cut_quantile_, high_cut_quantile_, &result);
if (!s.ok()) {

Check warning on line 539 in src/commands/cmd_tdigest.cc

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Use the init-statement to declare "s" inside the if statement.

See more on https://sonarcloud.io/project/issues?id=apache_kvrocks&issues=AZy4BIDrvOvPLz4Gtlsj&open=AZy4BIDrvOvPLz4Gtlsj&pullRequest=3312
if (s.IsNotFound()) {
return {Status::RedisExecErr, errKeyNotFound};
}
return {Status::RedisExecErr, s.ToString()};
}

if (!result.mean.has_value()) {
*output = redis::BulkString(kNan);
} else {
*output = redis::BulkString(util::Float2String(*result.mean));
}

return Status::OK();
}

private:
std::string key_name_;
double low_cut_quantile_;
double high_cut_quantile_;
};

std::vector<CommandKeyRange> GetMergeKeyRange(const std::vector<std::string> &args) {
auto numkeys = ParseInt<int>(args[2], 10).ValueOr(0);
return {{1, 1, 1}, {3, 2 + numkeys, 1}};
Expand All @@ -507,6 +573,7 @@
MakeCmdAttr<CommandTDigestByRevRank>("tdigest.byrevrank", -3, "read-only", 1, 1, 1),
MakeCmdAttr<CommandTDigestByRank>("tdigest.byrank", -3, "read-only", 1, 1, 1),
MakeCmdAttr<CommandTDigestQuantile>("tdigest.quantile", -3, "read-only", 1, 1, 1),
MakeCmdAttr<CommandTDigestTrimmedMean>("tdigest.trimmed_mean", 4, "read-only", 1, 1, 1),
MakeCmdAttr<CommandTDigestReset>("tdigest.reset", 2, "write", 1, 1, 1),
MakeCmdAttr<CommandTDigestMerge>("tdigest.merge", -4, "write", GetMergeKeyRange));
} // namespace redis
36 changes: 36 additions & 0 deletions src/types/redis_tdigest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -759,6 +759,42 @@ rocksdb::Status TDigest::applyNewCentroids(ObserverOrUniquePtr<rocksdb::WriteBat
return rocksdb::Status::OK();
}

rocksdb::Status TDigest::TrimmedMean(engine::Context& ctx, const Slice& digest_name, double low_cut_quantile,
double high_cut_quantile, TDigestTrimmedMeanResult* result) {
auto ns_key = AppendNamespacePrefix(digest_name);
TDigestMetadata metadata;

{
LockGuard guard(storage_->GetLockManager(), ns_key);
if (auto status = getMetaDataByNsKey(ctx, ns_key, &metadata); !status.ok()) {
return status;
}

if (metadata.total_observations == 0) {
result->mean.reset();
return rocksdb::Status::OK();
}

if (auto status = mergeNodes(ctx, ns_key, &metadata); !status.ok()) {
return status;
}
}

// Dump centroids and create DummyCentroids wrapper for TDigest algorithm
std::vector<Centroid> centroids;
if (auto status = dumpCentroids(ctx, ns_key, metadata, &centroids); !status.ok()) {
return status;
}
auto dump_centroids = DummyCentroids<false>(metadata, centroids);
auto trimmed_mean_result = TDigestTrimmedMean(dump_centroids, low_cut_quantile, high_cut_quantile);
if (!trimmed_mean_result) {
return rocksdb::Status::InvalidArgument(trimmed_mean_result.Msg());
}

result->mean = *trimmed_mean_result;
return rocksdb::Status::OK();
}

std::string TDigest::internalSegmentGuardPrefixKey(const TDigestMetadata& metadata, const std::string& ns_key,
SegmentType seg) const {
std::string prefix_key;
Expand Down
6 changes: 6 additions & 0 deletions src/types/redis_tdigest.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ struct TDigestQuantitleResult {
std::optional<std::vector<double>> quantiles;
};

struct TDigestTrimmedMeanResult {
std::optional<double> mean;
};

class TDigest : public SubKeyScanner {
public:
using Slice = rocksdb::Slice;
Expand Down Expand Up @@ -85,6 +89,8 @@ class TDigest : public SubKeyScanner {
std::vector<double>* result);
rocksdb::Status ByRank(engine::Context& ctx, const Slice& digest_name, const std::vector<int>& inputs,
std::vector<double>* result);
rocksdb::Status TrimmedMean(engine::Context& ctx, const Slice& digest_name, double low_cut_quantile,
double high_cut_quantile, TDigestTrimmedMeanResult* result);
rocksdb::Status GetMetaData(engine::Context& context, const Slice& digest_name, TDigestMetadata* metadata);

private:
Expand Down
43 changes: 43 additions & 0 deletions src/types/tdigest.h
Original file line number Diff line number Diff line change
Expand Up @@ -309,3 +309,46 @@
}
return Status::OK();
}

template <typename TD>
inline StatusOr<double> TDigestTrimmedMean(TD&& td, double low_cut_quantile, double high_cut_quantile) {

Check failure on line 314 in src/types/tdigest.h

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

"std::forward" is never called on this forwarding reference argument.

See more on https://sonarcloud.io/project/issues?id=apache_kvrocks&issues=AZy4BIBvvOvPLz4Gtlsh&open=AZy4BIBvvOvPLz4Gtlsh&pullRequest=3312
if (td.Size() == 0) {
return std::numeric_limits<double>::quiet_NaN();
}

const double total_weight = td.TotalWeight();
const double leftmost_weight = std::floor(total_weight * low_cut_quantile);
const double rightmost_weight = std::ceil(total_weight * high_cut_quantile);

double count_done = 0.0;
double trimmed_sum = 0.0;
double trimmed_count = 0.0;

auto iter = td.Begin();
while (iter->Valid()) {
auto centroid = GET_OR_RET(iter->GetCentroid());
const double n_weight = centroid.weight;
double count_add = n_weight;

// Keep only the portion of this centroid that overlaps with the trimmed rank range.
count_add -= std::min(std::max(0.0, leftmost_weight - count_done), count_add);
count_add = std::min(std::max(0.0, rightmost_weight - count_done), count_add);
Comment on lines +334 to +335
Copy link
Member

@LindaSummer LindaSummer Mar 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we have a comment on this? It may be hard to understand at first glance.

Copy link
Contributor Author

@chakkk309 chakkk309 Mar 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, will add it.


count_done += n_weight;

trimmed_sum += centroid.mean * count_add;
trimmed_count += count_add;

if (count_done >= rightmost_weight) {
break;
}

iter->Next();
}

if (trimmed_count == 0.0) {
return std::numeric_limits<double>::quiet_NaN();
}

return trimmed_sum / trimmed_count;
}
78 changes: 78 additions & 0 deletions tests/cppunit/types/tdigest_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -524,3 +524,81 @@ TEST_F(RedisTDigestTest, ByRank_And_ByRevRank) {
EXPECT_EQ(result[0], 1.0) << "Rank 0 should be minimum";
EXPECT_TRUE(std::isinf(result[3])) << "Rank >= total_weight should be infinity";
}

TEST_F(RedisTDigestTest, TrimmedMean) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add cases for invalid arguments and more unordered and complex inputs.

std::string test_digest_name = "test_digest_trimmed_mean" + std::to_string(util::GetTimeStampMS());
bool exists = false;
auto status = tdigest_->Create(*ctx_, test_digest_name, {100}, &exists);
ASSERT_FALSE(exists);
ASSERT_TRUE(status.ok());

std::vector<double> values = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
status = tdigest_->Add(*ctx_, test_digest_name, values);
ASSERT_TRUE(status.ok()) << status.ToString();

redis::TDigestTrimmedMeanResult result;
status = tdigest_->TrimmedMean(*ctx_, test_digest_name, 0.1, 0.9, &result);
ASSERT_TRUE(status.ok()) << status.ToString();
ASSERT_TRUE(result.mean.has_value());
EXPECT_NEAR(*result.mean, 5.5, 0.01);

status = tdigest_->TrimmedMean(*ctx_, test_digest_name, 0.0, 1.0, &result);
ASSERT_TRUE(status.ok()) << status.ToString();
ASSERT_TRUE(result.mean.has_value());
EXPECT_NEAR(*result.mean, 5.5, 0.01);

status = tdigest_->TrimmedMean(*ctx_, test_digest_name, 0.25, 0.75, &result);
ASSERT_TRUE(status.ok()) << status.ToString();
ASSERT_TRUE(result.mean.has_value());
EXPECT_NEAR(*result.mean, 5.5, 0.01);
}

TEST_F(RedisTDigestTest, TrimmedMeanEmptyDigest) {
std::string test_digest_name = "test_digest_trimmed_mean_empty" + std::to_string(util::GetTimeStampMS());
bool exists = false;
auto status = tdigest_->Create(*ctx_, test_digest_name, {100}, &exists);
ASSERT_FALSE(exists);
ASSERT_TRUE(status.ok());

redis::TDigestTrimmedMeanResult result;
status = tdigest_->TrimmedMean(*ctx_, test_digest_name, 0.1, 0.9, &result);
ASSERT_TRUE(status.ok()) << status.ToString();
ASSERT_FALSE(result.mean.has_value());
}

TEST_F(RedisTDigestTest, TrimmedMeanUnorderedInput) {
std::string test_digest_name = "test_digest_trimmed_mean_unordered" + std::to_string(util::GetTimeStampMS());
bool exists = false;
auto status = tdigest_->Create(*ctx_, test_digest_name, {100}, &exists);
ASSERT_FALSE(exists);
ASSERT_TRUE(status.ok());

std::vector<double> values = {5, 2, 8, 1, 9, 3, 7, 4, 6, 10};
status = tdigest_->Add(*ctx_, test_digest_name, values);
ASSERT_TRUE(status.ok()) << status.ToString();

redis::TDigestTrimmedMeanResult result;
status = tdigest_->TrimmedMean(*ctx_, test_digest_name, 0.1, 0.9, &result);
ASSERT_TRUE(status.ok()) << status.ToString();
ASSERT_TRUE(result.mean.has_value());
EXPECT_NEAR(*result.mean, 5.5, 0.01);
}

TEST_F(RedisTDigestTest, TrimmedMeanComplexInput) {
std::string test_digest_name = "test_digest_trimmed_mean_complex" + std::to_string(util::GetTimeStampMS());
bool exists = false;
auto status = tdigest_->Create(*ctx_, test_digest_name, {100}, &exists);
ASSERT_FALSE(exists);
ASSERT_TRUE(status.ok());

std::vector<double> values = {-10, 5, -3, 5, 0, 5, 3, -5, 10, -10};
status = tdigest_->Add(*ctx_, test_digest_name, values);
ASSERT_TRUE(status.ok()) << status.ToString();

redis::TDigestTrimmedMeanResult result;
status = tdigest_->TrimmedMean(*ctx_, test_digest_name, 0.2, 0.8, &result);
ASSERT_TRUE(status.ok()) << status.ToString();
ASSERT_TRUE(result.mean.has_value());
ASSERT_FALSE(std::isnan(*result.mean));
EXPECT_NEAR(*result.mean, 5.0 / 6.0, 0.01);
}
Loading
Loading