Skip to content

Commit 2ddb163

Browse files
authored
Support DistinctColumns pushdown at the datashard side (required for #24390) (#30183)
1 parent 58fb875 commit 2ddb163

File tree

3 files changed

+78
-33
lines changed

3 files changed

+78
-33
lines changed

ydb/core/protos/kqp.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -877,6 +877,7 @@ message TReadVectorTopK {
877877
optional Ydb.Table.VectorIndexSettings Settings = 2;
878878
optional string TargetVector = 3;
879879
optional uint32 Limit = 4;
880+
repeated uint32 DistinctColumns = 5;
880881
}
881882

882883
message TKqpStreamLookupSettings {

ydb/core/tx/datashard/datashard__read_iterator.cpp

Lines changed: 37 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,12 @@ using namespace NTabletFlatExecutor;
2626
struct TReadIteratorVectorTopItem {
2727
TOwnedCellVec Row;
2828
double Distance = 0;
29+
TString UniqueKey;
2930

30-
// Constructor of a top-K entry. This span is a diff rendering: gutter lines ending in
// '-' mark the pre-commit text that was removed, gutter lines ending in '+' mark the
// text added by this commit.
// Pre-commit signature (removed): took only the row cells and the distance.
TReadIteratorVectorTopItem(TArrayRef<const TCell> cells, double distance):
31+
// Post-commit signature (added): additionally takes the serialized distinct-columns key
// by rvalue reference so it can be moved into the entry without another copy.
TReadIteratorVectorTopItem(TArrayRef<const TCell> cells, double distance, TString&& uniqueKey):
3132
// The cells are wrapped in a TOwnedCellVec and stored as Row.
Row(TOwnedCellVec(cells)),
32-
Distance(distance) {
33+
Distance(distance),
34+
// Kept on the entry so TReadIteratorVectorTop::AddRow can erase this key from its
// UniqueKeys set when the entry is evicted from the heap.
UniqueKey(std::move(uniqueKey)) {
3335
}
3436

3537
bool operator<(const TReadIteratorVectorTopItem& rhs) const {
@@ -42,25 +44,48 @@ struct TReadIteratorVectorTop {
4244
ui32 Limit = 0;
4345
TString Target;
4446
std::unique_ptr<NKMeans::IClusters> KMeans;
47+
std::vector<ui32> DistinctColumns;
48+
49+
std::unordered_set<TString> UniqueKeys;
4550
std::vector<TReadIteratorVectorTopItem> Rows;
4651
ui64 TotalReadRows = 0;
4752
ui64 TotalReadBytes = 0;
4853

4954
// Feeds one scanned row into the top-K accumulator.
// Steps, in order: update the read counters, optionally reject the row when its
// DistinctColumns key is already held, reject it when the embedding cell has an
// unexpected format, compute its distance to Target, then keep it in the Rows heap
// (at most Limit entries) if there is room or it is closer than Rows.front().
// This span is a diff rendering: gutter lines ending in '-' precede removed pre-commit
// lines, gutter lines ending in '+' precede lines added by this commit.
void AddRow(TConstArrayRef<TCell> cells) {
// The counters are updated before any early return, so rows later skipped as
// duplicates or as malformed are still counted as read.
55+
TotalReadRows++;
56+
TotalReadBytes += EstimateSize(cells);
57+
// Stays empty when no DistinctColumns are configured.
TString serializedKey;
58+
if (DistinctColumns.size()) {
59+
TVector<TCell> key;
60+
// DistinctColumns holds indexes into `cells`; collect those cells as the dedup key.
for (auto colIdx: DistinctColumns) {
61+
key.push_back(cells.at(colIdx));
62+
}
63+
serializedKey = TSerializedCellVec::Serialize(key);
64+
// UniqueKeys mirrors the keys of the entries currently held in Rows (inserted on
// admission, erased on eviction below), so a hit means this key is already held.
// The first-seen row for a key wins; the distance of the duplicate is not compared.
if (UniqueKeys.contains(serializedKey)) {
65+
return;
66+
}
67+
}
5068
const auto embedding = cells.at(Column).AsBuf();
5169
// Rows whose embedding is not in the format KMeans expects are skipped.
if (!KMeans->IsExpectedFormat(embedding)) {
5270
return;
5371
}
// Pre-commit position of the counter updates (removed): previously only rows that
// passed the format check were counted.
54-
TotalReadRows++;
55-
TotalReadBytes += EstimateSize(cells);
5672
double distance = KMeans->CalcDistance(embedding, Target);
5773
// Heap not yet full: always admit the row.
if (Rows.size() < Limit) {
58-
Rows.emplace_back(cells, distance);
74+
// The key is copied into the set first because serializedKey is moved into the
// new entry on the next statement.
if (DistinctColumns.size()) {
75+
UniqueKeys.insert(serializedKey);
76+
}
77+
Rows.emplace_back(cells, distance, std::move(serializedKey));
5978
std::push_heap(Rows.begin(), Rows.end());
6079
// Heap full: admit only a row strictly closer than the heap top.
// NOTE(review): this relies on operator< ordering entries so that Rows.front() is
// the largest-distance entry — operator< body is not visible here; confirm.
} else if (distance < Rows.front().Distance) {
61-
Rows.emplace_back(cells, distance);
80+
if (DistinctColumns.size()) {
81+
UniqueKeys.insert(serializedKey);
82+
}
83+
Rows.emplace_back(cells, distance, std::move(serializedKey));
6284
std::push_heap(Rows.begin(), Rows.end());
6385
// pop_heap moves the heap top to Rows.back(); that entry is the one evicted.
std::pop_heap(Rows.begin(), Rows.end());
86+
// Forget the evicted entry's key so a later row with the same key can be admitted.
if (DistinctColumns.size()) {
87+
UniqueKeys.erase(Rows.back().UniqueKey);
88+
}
6489
Rows.pop_back();
6590
}
6691
}
@@ -2221,6 +2246,12 @@ class TDataShard::TReadOperation : public TOperation, public IReadOperation {
22212246
error = "Target vector has invalid format";
22222247
}
22232248
}
2249+
for (auto& colIdx: topK.GetDistinctColumns()) {
2250+
if (colIdx >= record.ColumnsSize()) {
2251+
error = TStringBuilder() << "Too large unique column index: " << colIdx;
2252+
}
2253+
topState->DistinctColumns.push_back(colIdx);
2254+
}
22242255
if (error != "") {
22252256
SetStatusError(Result->Record, Ydb::StatusIds::BAD_REQUEST, TStringBuilder()
22262257
<< error << " (shard# " << Self->TabletID() << " node# " << ctx.SelfID.NodeId() << " state# " << DatashardStateName(Self->State) << ")");

ydb/core/tx/datashard/datashard_ut_read_iterator.cpp

Lines changed: 40 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -5840,6 +5840,7 @@ Y_UNIT_TEST_SUITE(DataShardReadIteratorVectorTopK) {
58405840
Y_UNIT_TEST(BadRequest) {
58415841
TTestHelper helper;
58425842
TVector<TShardedTableOptions::TColumn> columns = {
5843+
{"parent", "Uint32", true, false},
58435844
{"key", "Uint32", true, false},
58445845
{"emb", "String", false, false}};
58455846
helper.CreateCustomTable("table-vector", columns);
@@ -5848,9 +5849,10 @@ Y_UNIT_TEST_SUITE(DataShardReadIteratorVectorTopK) {
58485849
auto request1 = helper.GetBaseReadRequest("table-vector", 1, NKikimrDataEvents::FORMAT_CELLVEC);
58495850
AddRangeQuery<ui32>(*request1, { 1 }, true, { 20 }, true);
58505851
auto topK = request1->Record.MutableVectorTopK();
5851-
topK->SetColumn(1);
5852+
topK->SetColumn(2);
58525853
topK->SetTargetVector("\xE4\x16\x02");
58535854
topK->SetLimit(3);
5855+
topK->AddDistinctColumns(1);
58545856
auto idx = topK->MutableSettings();
58555857
idx->set_metric(Ydb::Table::VectorIndexSettings::DISTANCE_COSINE);
58565858
idx->set_vector_type(Ydb::Table::VectorIndexSettings::VECTOR_TYPE_UINT8);
@@ -5866,7 +5868,7 @@ Y_UNIT_TEST_SUITE(DataShardReadIteratorVectorTopK) {
58665868
UNIT_ASSERT(readResult1->Record.GetStatus().GetIssues(0).message().Contains("limit is 0"));
58675869

58685870
request1 = createRequest();
5869-
request1->Record.MutableVectorTopK()->SetColumn(2);
5871+
request1->Record.MutableVectorTopK()->SetColumn(3);
58705872
readResult1 = helper.SendRead("table-vector", request1.release());
58715873
UNIT_ASSERT_VALUES_EQUAL(readResult1->Record.GetStatus().GetCode(), Ydb::StatusIds::BAD_REQUEST);
58725874
UNIT_ASSERT_VALUES_EQUAL(readResult1->Record.GetStatus().IssuesSize(), 1);
@@ -5892,57 +5894,68 @@ Y_UNIT_TEST_SUITE(DataShardReadIteratorVectorTopK) {
58925894
UNIT_ASSERT_VALUES_EQUAL(readResult1->Record.GetStatus().GetCode(), Ydb::StatusIds::BAD_REQUEST);
58935895
UNIT_ASSERT_VALUES_EQUAL(readResult1->Record.GetStatus().IssuesSize(), 1);
58945896
UNIT_ASSERT(readResult1->Record.GetStatus().GetIssues(0).message().Contains("either distance or similarity"));
5897+
5898+
request1 = createRequest();
5899+
request1->Record.MutableVectorTopK()->AddDistinctColumns(3);
5900+
readResult1 = helper.SendRead("table-vector", request1.release());
5901+
UNIT_ASSERT_VALUES_EQUAL(readResult1->Record.GetStatus().GetCode(), Ydb::StatusIds::BAD_REQUEST);
5902+
UNIT_ASSERT_VALUES_EQUAL(readResult1->Record.GetStatus().IssuesSize(), 1);
5903+
UNIT_ASSERT(readResult1->Record.GetStatus().GetIssues(0).message().Contains("Too large unique column index"));
58955904
}
58965905

58975906
Y_UNIT_TEST_TWIN(Simple, Batch) {
58985907
TTestHelper helper;
58995908
TVector<TShardedTableOptions::TColumn> columns = {
5909+
{"parent", "Uint32", true, false},
59005910
{"key", "Uint32", true, false},
59015911
{"emb", "String", false, false}};
59025912
helper.CreateCustomTable("table-vector", columns);
59035913

59045914
// Insert initial data
5905-
ExecSQL(helper.Server, helper.Sender, R"(UPSERT INTO `/Root/table-vector` (key, emb) VALUES
5906-
( 1, "\x00\xFF\x02"),
5907-
( 2, "\x10\xF0\x02"),
5908-
( 3, "\x20\xE0\x02"),
5909-
( 4, "\x30\xD0\x02"),
5910-
( 5, "\x40\xC0\x02"),
5911-
( 6, "\x50\xB0\x02"),
5912-
( 7, "\x60\xA0\x02"),
5913-
( 8, "\x70\x90\x02"),
5914-
( 9, "\x80\x80\x02"),
5915-
(10, "\x90\x70\x02"),
5916-
(11, "\xA0\x60\x02"),
5917-
(12, "\xB0\x50\x02"),
5918-
(13, "\xC0\x40\x02"),
5919-
(14, "\xD0\x30\x02"),
5920-
(15, "\xE0\x20\x02"),
5921-
(16, "\xF0\x10\x02"),
5922-
(17, "\xFF\x00\x02");)");
5915+
ExecSQL(helper.Server, helper.Sender, R"(UPSERT INTO `/Root/table-vector` (parent, key, emb) VALUES
5916+
(1, 1, "\x00\xFF\x02"),
5917+
(1, 2, "\x10\xF0\x02"),
5918+
(1, 3, "\x20\xE0\x02"),
5919+
(1, 4, "\x30\xD0\x02"),
5920+
(1, 5, "\x40\xC0\x02"),
5921+
(1, 6, "\x50\xB0\x02"),
5922+
(1, 7, "\x60\xA0\x02"),
5923+
(1, 8, "\x70\x90\x02"),
5924+
(1, 9, "\x80\x80\x02"),
5925+
(1, 10, "\x90\x70\x02"),
5926+
(1, 11, "\xA0\x60\x02"),
5927+
(1, 12, "\xB0\x50\x02"),
5928+
(1, 13, "\xC0\x40\x02"),
5929+
(1, 14, "\xD0\x30\x02"),
5930+
(1, 15, "\xE0\x20\x02"),
5931+
(1, 16, "\xF0\x10\x02"),
5932+
(2, 16, "\xF0\x10\x02"),
5933+
(1, 17, "\xFF\x00\x02");)");
59235934

59245935
auto request1 = helper.GetBaseReadRequest("table-vector", 1, NKikimrDataEvents::FORMAT_CELLVEC);
59255936
if (Batch) {
59265937
request1->Record.SetHints(TEvDataShard::TEvRead::HINT_BATCH);
59275938
}
5928-
AddRangeQuery<ui32>(*request1, { 1 }, true, { 20 }, true);
5939+
AddRangeQuery<ui32>(*request1, { 1, 1 }, true, { 2, 20 }, true);
59295940
auto topK = request1->Record.MutableVectorTopK();
5930-
topK->SetColumn(1);
5941+
topK->SetColumn(2);
59315942
topK->SetTargetVector("\xE4\x16\x02");
59325943
topK->SetLimit(3);
5944+
topK->AddDistinctColumns(1);
59335945
auto idx = topK->MutableSettings();
59345946
idx->set_metric(Ydb::Table::VectorIndexSettings::DISTANCE_COSINE);
59355947
idx->set_vector_type(Ydb::Table::VectorIndexSettings::VECTOR_TYPE_UINT8);
59365948
idx->set_vector_dimension(2);
59375949
auto readResult1 = helper.SendRead("table-vector", request1.release());
59385950
UNIT_ASSERT(readResult1->Record.GetFinished());
5939-
UNIT_ASSERT(readResult1->Record.GetStats().GetRows() == 17);
5940-
UNIT_ASSERT(readResult1->Record.GetStats().GetBytes() == 544);
5951+
UNIT_ASSERT_C(readResult1->Record.GetStats().GetRows() == 18, TStringBuilder() << "Rows: " << readResult1->Record.GetStats().GetRows());
5952+
UNIT_ASSERT_C(readResult1->Record.GetStats().GetBytes() == 864, TStringBuilder() << "Bytes: " << readResult1->Record.GetStats().GetBytes());
59415953
CheckResult(helper.Tables.at("table-vector").UserTable, *readResult1, {
5942-
{TCell::Make(16), TCell("\xF0\x10\x02", 3)},
5943-
{TCell::Make(15), TCell("\xE0\x20\x02", 3)},
5944-
{TCell::Make(17), TCell("\xFF\x00\x02", 3)},
5954+
{TCell::Make(1), TCell::Make(16), TCell("\xF0\x10\x02", 3)},
5955+
{TCell::Make(1), TCell::Make(15), TCell("\xE0\x20\x02", 3)},
5956+
{TCell::Make(1), TCell::Make(17), TCell("\xFF\x00\x02", 3)},
59455957
}, {
5958+
NScheme::TTypeInfo(NScheme::NTypeIds::Uint32),
59465959
NScheme::TTypeInfo(NScheme::NTypeIds::Uint32),
59475960
NScheme::TTypeInfo(NScheme::NTypeIds::String)
59485961
});

0 commit comments

Comments
 (0)