Skip to content

Commit 74d1864

Browse files
authored
Merge pull request #7 from datastax/371
371 - Added request timeout to graph query options
2 parents 51147b3 + d7691bf commit 74d1864

13 files changed

Lines changed: 242 additions & 19 deletions

File tree

cpp-driver

examples/graph/graph.c

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929

3030
#include <stdarg.h>
3131
#include <stdio.h>
32+
#include <time.h>
3233
#include <stdlib.h>
3334
#include <string.h>
3435

@@ -191,8 +192,9 @@ cass_bool_t create_graph(CassSession* session, const char* name) {
191192
dse_graph_object_finish(values);
192193

193194
if (execute_graph_query(session,
194-
"system.graph(name).drop();" \
195-
"system.graph(name).ifNotExists().create()",
195+
"graph = system.graph(name);" \
196+
"if (graph.exists()) graph.drop();" \
197+
"graph.create();",
196198
NULL, values, NULL)) {
197199
for (i = 0; i < 10; ++i) {
198200
DseGraphResultSet* resultset;

include/dse.h

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -361,6 +361,23 @@ DSE_EXPORT CassError
361361
dse_graph_options_set_graph_name_n(DseGraphOptions* options,
362362
const char* name, size_t name_length);
363363

364+
/**
365+
* Set the request timeout used by graph queries. Only use this if you want
366+
* graph queries to wait less than the server's default timeout (defined in
367+
* "dse.yaml")
368+
*
369+
* <b>Default:</b> 0 (wait for the coordinator to response or timeout)
370+
*
371+
* @public @memberof DseGraphOptions
372+
*
373+
* @param[in] options
374+
* @param[in] timeout_ms
375+
* @return CASS_OK if successful, otherwise an error occurred.
376+
*/
377+
DSE_EXPORT CassError
378+
dse_graph_options_set_request_timeout(DseGraphOptions* options,
379+
cass_int64_t timeout_ms);
380+
364381
/***********************************************************************************
365382
*
366383
* Graph Statement

src/graph.cpp

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
#include "external_types.hpp"
88

9+
#include <serialization.hpp> // cass::encode_int64()
910
#include <assert.h>
1011

1112
static const DseGraphResult* find_member(const DseGraphResult* result,
@@ -77,6 +78,13 @@ CassError dse_graph_options_set_graph_name_n(DseGraphOptions* options,
7778
return CASS_OK;
7879
}
7980

81+
CassError dse_graph_options_set_request_timeout(DseGraphOptions* options,
82+
cass_int64_t timeout_ms) {
83+
if (timeout_ms < 0) return CASS_ERROR_LIB_BAD_PARAMS;
84+
options->set_request_timeout_ms(timeout_ms);
85+
return CASS_OK;
86+
}
87+
8088
DseGraphStatement* dse_graph_statement_new(const char* query,
8189
const DseGraphOptions* options) {
8290
return dse_graph_statement_new_n(query, strlen(query),
@@ -534,6 +542,23 @@ const DseGraphResult* dse_graph_result_element(const DseGraphResult* result,
534542

535543
namespace dse {
536544

545+
void GraphOptions::set_request_timeout_ms(int64_t timeout_ms) {
546+
request_timeout_ms_ = timeout_ms;
547+
if (timeout_ms > 0) {
548+
std::string value(sizeof(timeout_ms), 0);
549+
cass::encode_int64(&value[0], timeout_ms);
550+
cass_custom_payload_set_n(payload_,
551+
DSE_GRAPH_REQUEST_TIMEOUT,
552+
sizeof(DSE_GRAPH_REQUEST_TIMEOUT) - 1,
553+
reinterpret_cast<const cass_byte_t *>(value.data()),
554+
value.size());
555+
} else {
556+
cass_custom_payload_remove_n(payload_,
557+
DSE_GRAPH_REQUEST_TIMEOUT,
558+
sizeof(DSE_GRAPH_REQUEST_TIMEOUT) - 1);
559+
}
560+
}
561+
537562
const GraphResult* GraphResultSet::next() {
538563
if (cass_iterator_next(rows_)) {
539564
const CassRow* row = cass_iterator_get_row(rows_);

src/graph.hpp

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#define DSE_GRAPH_OPTION_LANGUAGE_KEY "graph-language"
2020
#define DSE_GRAPH_OPTION_SOURCE_KEY "graph-source"
2121
#define DSE_GRAPH_OPTION_NAME_KEY "graph-name"
22+
#define DSE_GRAPH_REQUEST_TIMEOUT "request-timeout"
2223

2324
#define DSE_GRAPH_DEFAULT_LANGUAGE "gremlin-groovy"
2425
#define DSE_GRAPH_DEFAULT_SOURCE "g"
@@ -31,7 +32,8 @@ class GraphStatement;
3132
class GraphOptions {
3233
public:
3334
GraphOptions()
34-
: payload_(cass_custom_payload_new()) {
35+
: payload_(cass_custom_payload_new())
36+
, request_timeout_ms_(0) {
3537
set_graph_language(DSE_GRAPH_DEFAULT_LANGUAGE);
3638
set_graph_source(DSE_GRAPH_DEFAULT_SOURCE);
3739
}
@@ -60,8 +62,13 @@ class GraphOptions {
6062
reinterpret_cast<const cass_byte_t*>(graph_name.data()), graph_name.size());
6163
}
6264

65+
int64_t request_timeout_ms() const { return request_timeout_ms_; }
66+
67+
void set_request_timeout_ms(int64_t timeout_ms);
68+
6369
private:
6470
CassCustomPayload* payload_;
71+
int64_t request_timeout_ms_;
6572
};
6673

6774
class GraphWriter : private rapidjson::Writer<rapidjson::StringBuffer> {
@@ -151,9 +158,13 @@ class GraphStatement {
151158
, wrapped_(cass_statement_new_n(query, length, 0)) {
152159
if (options != NULL) {
153160
cass_statement_set_custom_payload(wrapped_, options->payload());
161+
cass_statement_set_request_timeout(wrapped_,
162+
static_cast<cass_uint64_t>(options->request_timeout_ms()));
154163
} else {
155164
GraphOptions default_options;
156165
cass_statement_set_custom_payload(wrapped_, default_options.payload());
166+
cass_statement_set_request_timeout(wrapped_,
167+
static_cast<cass_uint64_t>(default_options.request_timeout_ms()));
157168
}
158169
}
159170

tests/src/integration/dse_integration.cpp

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@
55

66
#define GRAPH_CREATE \
77
"system.graph(name).option('graph.replication_config').set(replication)" \
8-
".option('graph.system_replication_config').set(replication).ifNotExists().create();"
8+
".option('graph.system_replication_config').set(replication)" \
9+
".option('graph.traversal_sources.g.evaluation_timeout').set(duration)" \
10+
".ifNotExists().create();"
911

1012
#define GRAPH_ALLOW_SCANS \
1113
"schema.config().option('graph.allow_scan').set('true')"
@@ -38,11 +40,13 @@ void DseIntegration::connect() {
3840
}
3941

4042
void DseIntegration::create_graph(const std::string& graph_name,
41-
const std::string& replication_strategy) {
43+
const std::string& replication_strategy,
44+
const std::string& duration) {
4245
// Create the graph statement using the pre-determined replication config
4346
test::driver::DseGraphObject graph_object;
4447
graph_object.add<std::string>("name", graph_name);
4548
graph_object.add<std::string>("replication", replication_strategy);
49+
graph_object.add<std::string>("duration", duration);
4650
test::driver::DseGraphStatement graph_statement(GRAPH_CREATE);
4751
graph_statement.bind(graph_object);
4852
CHECK_FAILURE;
@@ -60,6 +64,6 @@ void DseIntegration::create_graph(const std::string& graph_name,
6064
CHECK_FAILURE;
6165
}
6266

63-
void DseIntegration::create_graph() {
64-
create_graph(test_name_, replication_strategy_);
67+
void DseIntegration::create_graph(const std::string& duration /*= "PT30S"*/) {
68+
create_graph(test_name_, replication_strategy_, duration);
6569
}

tests/src/integration/dse_integration.hpp

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,19 +57,23 @@ class DseIntegration : public Integration {
5757
* Create the graph using the specified replication strategy
5858
*
5959
* @param graph_name Name of the graph to create
60-
* @param graph_strategy Replication strategy to apply to graph
60+
* @param replication_strategy Replication strategy to apply to graph
61+
* @param duration Maximum duration to wait for a traversal to evaluate
6162
* @see replication_factor_
6263
*/
6364
void DseIntegration::create_graph(const std::string& graph_name,
64-
const std::string& replication_strategy);
65+
const std::string& replication_strategy,
66+
const std::string& duration);
6567

6668
/**
6769
* Create the graph using the test name and default replication strategy
6870
*
71+
* @param duration Maximum duration to wait for a traversal to evaluate
72+
* (default: PT30S; 30 seconds)
6973
* @see test_name_
7074
* @see replication_strategy_
7175
*/
72-
void DseIntegration::create_graph();
76+
void DseIntegration::create_graph(const std::string& duration = "PT30S");
7377
};
7478

7579
#endif //__DSE_INTEGRATION_HPP__

tests/src/integration/integration.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,8 @@ Integration::Integration()
4444
, is_ccm_start_requested_(true)
4545
, is_session_requested_(true)
4646
, dse_workload_(CCM::DSE_WORKLOAD_CASSANDRA)
47-
, create_keyspace_query_("") {
47+
, create_keyspace_query_("")
48+
, start_time_(0ull) {
4849
// Get the name of the test and the case/suite it belongs to
4950
const testing::TestInfo* test_information = testing::UnitTest::GetInstance()->current_test_info();
5051
test_name_ = test_information->name();

tests/src/integration/integration.hpp

Lines changed: 53 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,36 @@ class Integration : public testing::Test {
218218
*/
219219
std::string format_string(const char* format, ...) const;
220220

221+
/**
222+
* Calculate the elapsed time in milliseconds
223+
*
224+
* @return Elapsed time in milliseconds
225+
*/
226+
inline uint64_t elapsed_time() {
227+
if (start_time_ > 0) {
228+
return (uv_hrtime() - start_time_) / 1000000UL;
229+
}
230+
return 0;
231+
}
232+
233+
/**
234+
* Start the timer to calculate the elapsed time
235+
*/
236+
inline void start_timer() {
237+
start_time_ = uv_hrtime();
238+
}
239+
240+
/**
241+
* Stop the timer - Calculate the elapsed time and reset the timer
242+
*
243+
* @return Elapsed time in milliseconds
244+
*/
245+
inline uint64_t stop_timer() {
246+
uint64_t duration = elapsed_time();
247+
start_time_ = 0ull;
248+
return duration;
249+
}
250+
221251
protected:
222252
/**
223253
* Get the current working directory
@@ -228,6 +258,18 @@ class Integration : public testing::Test {
228258
return Utils::cwd();
229259
}
230260

261+
/**
262+
* Determine if a string contains another string
263+
*
264+
* @param input String being evaluated
265+
* @param search String to find
266+
* @return True if string is contained in other string; false otherwise
267+
*/
268+
inline static bool contains(const std::string& input,
269+
const std::string& search) {
270+
return Utils::contains(input, search);
271+
}
272+
231273
/**
232274
* Split a string into an array/vector
233275
*
@@ -247,7 +289,7 @@ class Integration : public testing::Test {
247289
* @return True if file exists; false otherwise
248290
*/
249291
inline static bool file_exists(const std::string& filename) {
250-
return test::Utils::file_exists(filename);
292+
return Utils::file_exists(filename);
251293
}
252294

253295
/**
@@ -259,7 +301,7 @@ class Integration : public testing::Test {
259301
*/
260302
inline static std::string implode(const std::vector<std::string>& elements,
261303
const char delimiter = ' ') {
262-
return test::Utils::implode(elements, delimiter);
304+
return Utils::implode(elements, delimiter);
263305
}
264306

265307
/**
@@ -268,7 +310,7 @@ class Integration : public testing::Test {
268310
* @param path Directory/Path to create
269311
*/
270312
inline static void mkdir(const std::string& path) {
271-
test::Utils::mkdir(path);
313+
Utils::mkdir(path);
272314
}
273315

274316
/**
@@ -277,7 +319,7 @@ class Integration : public testing::Test {
277319
* @param milliseconds Time in milliseconds to sleep
278320
*/
279321
inline static void msleep(unsigned int milliseconds) {
280-
test::Utils::msleep(milliseconds);
322+
Utils::msleep(milliseconds);
281323
}
282324

283325
/**
@@ -290,7 +332,7 @@ class Integration : public testing::Test {
290332
*/
291333
inline static std::string replace_all(const std::string& input,
292334
const std::string& from, const std::string& to) {
293-
return test::Utils::replace_all(input, from, to);
335+
return Utils::replace_all(input, from, to);
294336
}
295337

296338
/**
@@ -299,7 +341,7 @@ class Integration : public testing::Test {
299341
* @param input String to convert to lowercase
300342
*/
301343
inline static std::string to_lower(const std::string& input) {
302-
return test::Utils::to_lower(input);
344+
return Utils::to_lower(input);
303345
}
304346

305347
/**
@@ -309,14 +351,18 @@ class Integration : public testing::Test {
309351
* @return Trimmed string
310352
*/
311353
inline static std::string trim(const std::string& input) {
312-
return test::Utils::trim(input);
354+
return Utils::trim(input);
313355
}
314356

315357
private:
316358
/**
317359
* Keyspace creation query (generated via SetUp)
318360
*/
319361
std::string create_keyspace_query_;
362+
/**
363+
* High-resolution real time when the timer was started (in nanoseconds)
364+
*/
365+
uint64_t start_time_;
320366
};
321367

322368
#endif //__INTEGRATION_HPP__

tests/src/integration/objects/dse_graph_options.hpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,15 @@ class DseGraphOptions : public Object< ::DseGraphOptions, dse_graph_options_free
6666
void set_name(const std::string& name) {
6767
ASSERT_EQ(CASS_OK, dse_graph_options_set_graph_name(get(), name.c_str()));
6868
}
69+
70+
/**
71+
* Set the graph timeout to use when applied to a DSE graph statement
72+
*
73+
* @param name Graph timeout (in milliseconds) to apply
74+
*/
75+
void set_timeout(cass_int64_t timemout) {
76+
ASSERT_EQ(CASS_OK, dse_graph_options_set_request_timeout(get(), timemout));
77+
}
6978
};
7079

7180
} // namespace driver

0 commit comments

Comments
 (0)