Skip to content

Commit 825c678

Browse files
authored
Merge pull request #394 from OlegGalizin/Params
Parameters in clickhouse-cpp
2 parents 8ad0fd4 + 4f83683 commit 825c678

File tree

6 files changed

+271
-4
lines changed

6 files changed

+271
-4
lines changed

clickhouse/base/wire_format.cpp

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
#include <assert.h>
12
#include "wire_format.h"
23

34
#include "input.h"
@@ -6,6 +7,7 @@
67
#include "../exceptions.h"
78

89
#include <stdexcept>
10+
#include <algorithm>
911

1012
namespace {
1113
constexpr int MAX_VARINT_BYTES = 10;
@@ -99,4 +101,77 @@ bool WireFormat::SkipString(InputStream& input) {
99101
return false;
100102
}
101103

104+
inline const char* find_quoted_chars(const char* start, const char* end)
105+
{
106+
static constexpr char quoted_chars[] = {'\0', '\b', '\t', '\n', '\'', '\\'};
107+
const auto first = std::find_first_of(start, end, std::begin(quoted_chars), std::end(quoted_chars));
108+
109+
return (first == end) ? nullptr : first;
102110
}
111+
112+
void WireFormat::WriteQuotedString(OutputStream& output, std::string_view value) {
113+
auto size = value.size();
114+
const char* start = value.data();
115+
const char* end = start + size;
116+
const char* quoted_char = find_quoted_chars(start, end);
117+
if (quoted_char == nullptr) {
118+
WriteVarint64(output, size + 2);
119+
WriteAll(output, "'", 1);
120+
WriteAll(output, start, size);
121+
WriteAll(output, "'", 1);
122+
return;
123+
}
124+
125+
// calculate quoted chars count
126+
int quoted_count = 1;
127+
const char* next_quoted_char = quoted_char + 1;
128+
while ((next_quoted_char = find_quoted_chars(next_quoted_char, end))) {
129+
quoted_count++;
130+
next_quoted_char++;
131+
}
132+
WriteVarint64(output, size + 2 + 3 * quoted_count); // length
133+
134+
WriteAll(output, "'", 1);
135+
136+
do {
137+
auto write_size = quoted_char - start;
138+
WriteAll(output, start, write_size);
139+
WriteAll(output, "\\", 1);
140+
char c = quoted_char[0];
141+
switch (c) {
142+
case '\0':
143+
WriteAll(output, "x00", 3);
144+
break;
145+
case '\b':
146+
WriteAll(output, "x08", 3);
147+
break;
148+
case '\t':
149+
WriteAll(output, R"(\\t)", 3);
150+
break;
151+
case '\n':
152+
WriteAll(output, R"(\\n)", 3);
153+
break;
154+
case '\'':
155+
WriteAll(output, "x27", 3);
156+
break;
157+
case '\\':
158+
WriteAll(output, R"(\\\)", 3);
159+
break;
160+
default:
161+
break;
162+
}
163+
start = quoted_char + 1;
164+
quoted_char = find_quoted_chars(start, end);
165+
} while (quoted_char);
166+
167+
WriteAll(output, start, end - start);
168+
WriteAll(output, "'", 1);
169+
}
170+
171+
void WireFormat::WriteParamNullRepresentation(OutputStream& output) {
172+
const std::string NULL_REPRESENTATION(R"('\\N')");
173+
WriteVarint64(output, NULL_REPRESENTATION.size());
174+
WriteAll(output, NULL_REPRESENTATION.data(), NULL_REPRESENTATION.size());
175+
}
176+
177+
} // namespace clickhouse

clickhouse/base/wire_format.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ class WireFormat {
2222
static void WriteFixed(OutputStream& output, const T& value);
2323
static void WriteBytes(OutputStream& output, const void* buf, size_t len);
2424
static void WriteString(OutputStream& output, std::string_view value);
25+
static void WriteQuotedString(OutputStream& output, std::string_view value);
26+
static void WriteParamNullRepresentation(OutputStream& output);
2527
static void WriteUInt64(OutputStream& output, const uint64_t value);
2628
static void WriteVarint64(OutputStream& output, uint64_t value);
2729

clickhouse/client.cpp

Lines changed: 51 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,13 @@
3838
#define DBMS_MIN_REVISION_WITH_DISTRIBUTED_DEPTH 54448
3939
#define DBMS_MIN_REVISION_WITH_INITIAL_QUERY_START_TIME 54449
4040
#define DBMS_MIN_REVISION_WITH_INCREMENTAL_PROFILE_EVENTS 54451
41+
#define DBMS_MIN_REVISION_WITH_PARALLEL_REPLICAS 54453
42+
#define DBMS_MIN_REVISION_WITH_CUSTOM_SERIALIZATION 54454 // Client can get some fields in JSon format
43+
#define DBMS_MIN_PROTOCOL_VERSION_WITH_ADDENDUM 54458 // send quota key after handshake
44+
#define DBMS_MIN_PROTOCOL_REVISION_WITH_QUOTA_KEY 54458 // the same
45+
#define DBMS_MIN_PROTOCOL_VERSION_WITH_PARAMETERS 54459
4146

42-
#define DMBS_PROTOCOL_REVISION DBMS_MIN_REVISION_WITH_INCREMENTAL_PROFILE_EVENTS
47+
#define DMBS_PROTOCOL_REVISION DBMS_MIN_PROTOCOL_VERSION_WITH_PARAMETERS
4348

4449
namespace clickhouse {
4550

@@ -433,6 +438,11 @@ bool Client::Impl::Handshake() {
433438
if (!ReceiveHello()) {
434439
return false;
435440
}
441+
442+
if (server_info_.revision >= DBMS_MIN_PROTOCOL_VERSION_WITH_ADDENDUM) {
443+
WireFormat::WriteString(*output_, std::string());
444+
}
445+
436446
return true;
437447
}
438448

@@ -502,7 +512,7 @@ bool Client::Impl::ReceivePacket(uint64_t* server_packet) {
502512
return false;
503513
}
504514
}
505-
if constexpr (DMBS_PROTOCOL_REVISION >= DBMS_MIN_REVISION_WITH_CLIENT_WRITE_INFO)
515+
if (server_info_.revision >= DBMS_MIN_REVISION_WITH_CLIENT_WRITE_INFO)
506516
{
507517
if (!WireFormat::ReadUInt64(*input_, &info.written_rows)) {
508518
return false;
@@ -589,7 +599,7 @@ bool Client::Impl::ReceivePacket(uint64_t* server_packet) {
589599

590600
bool Client::Impl::ReadBlock(InputStream& input, Block* block) {
591601
// Additional information about block.
592-
if constexpr (DMBS_PROTOCOL_REVISION >= DBMS_MIN_REVISION_WITH_BLOCK_INFO) {
602+
if (server_info_.revision >= DBMS_MIN_REVISION_WITH_BLOCK_INFO) {
593603
uint64_t num;
594604
BlockInfo info;
595605

@@ -635,6 +645,16 @@ bool Client::Impl::ReadBlock(InputStream& input, Block* block) {
635645
if (!WireFormat::ReadString(input, &type)) {
636646
return false;
637647
}
648+
649+
if (server_info_.revision >= DBMS_MIN_REVISION_WITH_CUSTOM_SERIALIZATION) {
650+
uint8_t custom_format_len;
651+
if (!WireFormat::ReadFixed(input, &custom_format_len)) {
652+
return false;
653+
}
654+
if (custom_format_len > 0) {
655+
throw UnimplementedError(std::string("unsupported custom serialization"));
656+
}
657+
}
638658

639659
if (ColumnRef col = CreateColumnByType(type, create_column_settings)) {
640660
if (num_rows && !col->Load(&input, num_rows)) {
@@ -653,7 +673,7 @@ bool Client::Impl::ReadBlock(InputStream& input, Block* block) {
653673
bool Client::Impl::ReceiveData() {
654674
Block block;
655675

656-
if constexpr (DMBS_PROTOCOL_REVISION >= DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES) {
676+
if (server_info_.revision >= DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES) {
657677
if (!WireFormat::SkipString(*input_)) {
658678
return false;
659679
}
@@ -793,6 +813,12 @@ void Client::Impl::SendQuery(const Query& query) {
793813
throw UnimplementedError(std::string("Can't send open telemetry tracing context to a server, server version is too old"));
794814
}
795815
}
816+
if (server_info_.revision >= DBMS_MIN_REVISION_WITH_PARALLEL_REPLICAS) {
817+
// replica dont supported by client
818+
WireFormat::WriteUInt64(*output_, 0);
819+
WireFormat::WriteUInt64(*output_, 0);
820+
WireFormat::WriteUInt64(*output_, 0);
821+
}
796822
}
797823

798824
/// Per query settings
@@ -817,6 +843,22 @@ void Client::Impl::SendQuery(const Query& query) {
817843
WireFormat::WriteUInt64(*output_, Stages::Complete);
818844
WireFormat::WriteUInt64(*output_, compression_);
819845
WireFormat::WriteString(*output_, query.GetText());
846+
847+
//Send params after query text
848+
if (server_info_.revision >= DBMS_MIN_PROTOCOL_VERSION_WITH_PARAMETERS) {
849+
for(const auto& [name, value] : query.GetParams()) {
850+
// params is like query settings
851+
WireFormat::WriteString(*output_, name);
852+
const uint64_t Custom = 2;
853+
WireFormat::WriteVarint64(*output_, Custom);
854+
if (value)
855+
WireFormat::WriteQuotedString(*output_, *value);
856+
else
857+
WireFormat::WriteParamNullRepresentation(*output_);
858+
}
859+
WireFormat::WriteString(*output_, std::string()); // empty string after last param
860+
}
861+
820862
// Send empty block as marker of
821863
// end of data
822864
SendData(Block());
@@ -842,6 +884,11 @@ void Client::Impl::WriteBlock(const Block& block, OutputStream& output) {
842884
WireFormat::WriteString(output, bi.Name());
843885
WireFormat::WriteString(output, bi.Type()->GetName());
844886

887+
if (server_info_.revision >= DBMS_MIN_REVISION_WITH_CUSTOM_SERIALIZATION) {
888+
// TODO: custom serialization
889+
WireFormat::WriteFixed<uint8_t>(output, 0);
890+
}
891+
845892
// Empty columns are not serialized and occupy exactly 0 bytes.
846893
// ref https://github.com/ClickHouse/ClickHouse/blob/39b37a3240f74f4871c8c1679910e065af6bea19/src/Formats/NativeWriter.cpp#L163
847894
const bool containsData = block.GetRowCount() > 0;

clickhouse/query.h

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ struct QuerySettingsField {
2626
};
2727

2828
using QuerySettings = std::unordered_map<std::string, QuerySettingsField>;
29+
using QueryParamValue = std::optional<std::string>;
30+
using QueryParams = std::unordered_map<std::string, QueryParamValue>;
2931

3032
struct Profile {
3133
uint64_t rows = 0;
@@ -115,6 +117,18 @@ class Query : public QueryEvents {
115117
return *this;
116118
}
117119

120+
inline const QueryParams& GetParams() const { return query_params_; }
121+
122+
inline Query& SetParams(QueryParams query_params) {
123+
query_params_ = std::move(query_params);
124+
return *this;
125+
}
126+
127+
inline Query& SetParam(const std::string& name, const QueryParamValue& value) {
128+
query_params_[name] = value;
129+
return *this;
130+
}
131+
118132
inline const std::optional<open_telemetry::TracingContext>& GetTracingContext() const {
119133
return tracing_context_;
120134
}
@@ -219,6 +233,7 @@ class Query : public QueryEvents {
219233
const std::string query_id_;
220234
std::optional<open_telemetry::TracingContext> tracing_context_;
221235
QuerySettings query_settings_;
236+
QueryParams query_params_;
222237
ExceptionCallback exception_cb_;
223238
ProgressCallback progress_cb_;
224239
SelectCallback select_cb_;

tests/simple/main.cpp

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,81 @@ inline void GenericExample(Client& client) {
234234
client.Execute("DROP TEMPORARY TABLE test_client");
235235
}
236236

237+
inline void ParamExample(Client& client) {
238+
/// Create a table.
239+
client.Execute("CREATE TEMPORARY TABLE IF NOT EXISTS test_client (id UInt64, name String)");
240+
241+
{
242+
Query query("insert into test_client values ({id: UInt64}, {name: String})");
243+
244+
query.SetParam("id", "1").SetParam("name", "NAME");
245+
client.Execute(query);
246+
247+
query.SetParam("id", "123").SetParam("name", "FromParam");
248+
client.Execute(query);
249+
250+
const char FirstPrintable = ' ';
251+
char test_str1[FirstPrintable * 2 + 1];
252+
for (unsigned int i = 0; i < FirstPrintable; i++) {
253+
test_str1[i * 2] = 'A';
254+
test_str1[i * 2 + 1] = i;
255+
}
256+
test_str1[int(FirstPrintable * 2)] = 'A';
257+
258+
query.SetParam("id", "333").SetParam("name", std::string(test_str1, FirstPrintable * 2 + 1));
259+
client.Execute(query);
260+
261+
const char LastPrintable = 127;
262+
unsigned char big_string[LastPrintable - FirstPrintable];
263+
for (unsigned int i = 0; i < sizeof(big_string); i++) big_string[i] = i + FirstPrintable;
264+
query.SetParam("id", "444").SetParam("name", std::string((char*)big_string, sizeof(big_string)));
265+
client.Execute(query);
266+
267+
query.SetParam("id", "555")
268+
.SetParam("name", "utf8Русский");
269+
client.Execute(query);
270+
}
271+
272+
/// Select values inserted in the previous step.
273+
Query query ("SELECT id, name, length(name) FROM test_client where id > {a: Int32}");
274+
query.SetParam("a", "4");
275+
SelectCallback cb([](const Block& block)
276+
{
277+
std::cout << PrettyPrintBlock{block} << std::endl;
278+
});
279+
query.OnData(cb);
280+
client.Select(query);
281+
/// Delete table.
282+
client.Execute("DROP TEMPORARY TABLE test_client");
283+
}
284+
285+
inline void ParamNullExample(Client& client) {
286+
client.Execute("CREATE TEMPORARY TABLE IF NOT EXISTS test_client (id UInt64, name Nullable(String))");
287+
288+
Query query("insert into test_client values ({id: UInt64}, {name: Nullable(String)})");
289+
290+
query.SetParam("id", "123").SetParam("name", QueryParamValue());
291+
client.Execute(query);
292+
293+
query.SetParam("id", "456").SetParam("name", "String Value");
294+
client.Execute(query);
295+
296+
client.Select("SELECT id, name FROM test_client", [](const Block& block) {
297+
for (size_t c = 0; c < block.GetRowCount(); ++c) {
298+
std::cerr << block[0]->As<ColumnUInt64>()->At(c) << " ";
299+
300+
auto col_string = block[1]->As<ColumnNullable>();
301+
if (col_string->IsNull(c)) {
302+
std::cerr << "\\N\n";
303+
} else {
304+
std::cerr << col_string->Nested()->As<ColumnString>()->At(c) << "\n";
305+
}
306+
}
307+
});
308+
309+
client.Execute("DROP TEMPORARY TABLE test_client");
310+
}
311+
237312
inline void NullableExample(Client& client) {
238313
/// Create a table.
239314
client.Execute("CREATE TEMPORARY TABLE IF NOT EXISTS test_client (id Nullable(UInt64), date Nullable(Date))");
@@ -478,6 +553,8 @@ inline void IPExample(Client &client) {
478553
}
479554

480555
static void RunTests(Client& client) {
556+
ParamExample(client);
557+
ParamNullExample(client);
481558
ArrayExample(client);
482559
CancelableExample(client);
483560
DateExample(client);

0 commit comments

Comments
 (0)