From 399022017c906fa5c843fbe0fd59509a141eac8d Mon Sep 17 00:00:00 2001 From: Gareth Lloyd Date: Thu, 29 Jun 2023 16:29:37 +0100 Subject: [PATCH 1/3] WIP --- src/interactive.cpp | 10 ++- src/utils/utils.cpp | 145 +++++++++++++++++++++++++++----------------- src/utils/utils.hpp | 8 +++ 3 files changed, 106 insertions(+), 57 deletions(-) diff --git a/src/interactive.cpp b/src/interactive.cpp index 1a6688f9..980e4984 100644 --- a/src/interactive.cpp +++ b/src/interactive.cpp @@ -113,12 +113,16 @@ int Run(const utils::bolt::Config &bolt_config, const std::string &history, bool } try { - auto ret = query::ExecuteQuery(session.get(), query->query); - if (ret.records.size() > 0) { + auto ret = query::QueryResult{}; + auto handler = build_handler(ret, session.get()); + auto wall_time = query::ExecuteQueryEx(session.get(), query->query, std::move(handler)); + ret.wall_time = wall_time; + if (!ret.records.empty()) { + // HERE Output(ret.header, ret.records, output_opts, csv_opts); } std::string summary; - if (ret.records.size() == 0) { + if (ret.records.empty()) { summary = "Empty set"; } else if (ret.records.size() == 1) { summary = std::to_string(ret.records.size()) + " row in set"; diff --git a/src/utils/utils.cpp b/src/utils/utils.cpp index 2c4ef5c5..abf22dba 100644 --- a/src/utils/utils.cpp +++ b/src/utils/utils.cpp @@ -524,6 +524,80 @@ std::map ParseNotifications(const mg_value *mg_notific double ParseFloat(const mg_value *mg_val_float) { return mg_value_float(mg_val_float); } +auto build_handler_impl(query::QueryResult &ret, mg_session *session) { + enum states { + START, + RECEIVING, + DONE, + }; + return [session, state = states::START, &ret](int status, mg_result *result) mutable { + switch (status) { + case 0: + state = states::DONE; + break; + case 1: + break; + default: + if (mg_session_status(session) == MG_SESSION_BAD) { + throw utils::ClientFatalException(mg_session_error(session)); + } else { + throw utils::ClientQueryException(mg_session_error(session)); + } + } + + switch (state) { + case states::START: { + const mg_list *header = mg_result_columns(result); + for (uint32_t i = 0; i < mg_list_size(header); ++i) { + const mg_value *field = mg_list_at(header, i); + if (mg_value_get_type(field) == MG_VALUE_TYPE_STRING) { + ret.header.emplace_back(mg_string_data(mg_value_string(field)), mg_string_size(mg_value_string(field))); + } else { + std::stringstream field_stream; + utils::PrintValue(field_stream, field); + ret.header.push_back(field_stream.str()); + } + } + } + state = states::RECEIVING; + [[fallthrough]]; + case states::RECEIVING: + ret.records.push_back(mg_memory::MakeCustomUnique(mg_list_copy(mg_result_row(result)))); + if (!ret.records.back()) { + std::cerr << "out of memory"; + std::abort(); + } + // track stats + break; + case states::DONE: + // build stats + const mg_map *summary = mg_result_summary(result); + if (summary && mg_map_size(summary) > 0) { + { + std::map execution_info; + for (auto key : {"cost_estimate", "parsing_time", "planning_time", "plan_execution_time"}) { + if (const mg_value *info = mg_map_at(summary, key); info) { + execution_info.emplace(key, ParseFloat(info)); + } + } + if (!execution_info.empty()) { + ret.execution_info = execution_info; + } + } + + if (const mg_value *mg_stats = mg_map_at(summary, "stats"); mg_stats) { + ret.stats.emplace(ParseStats(mg_stats)); + } + if (const mg_value *mg_notifications = mg_map_at(summary, "notifications"); mg_notifications) { + ret.notification.emplace(ParseNotifications(mg_notifications)); + } + } + return false; + } + return true; + }; +} + } // namespace namespace console { @@ -850,7 +924,13 @@ void PrintQueryInfo(const Query &query) { std::cout << "line: " << query.line_number << " index: " << query.index << " query: " << query.query << std::endl; } -QueryResult ExecuteQuery(mg_session *session, const std::string &query) { +auto build_handler(query::QueryResult &ret, mg_session *session) -> std::function{ + return build_handler_impl(ret,session); +} + + +std::chrono::duration ExecuteQueryEx(mg_session *session, const std::string &query, + std::function &&result_handler) { int status = mg_session_run(session, query.c_str(), nullptr, nullptr, nullptr, nullptr); auto start = std::chrono::system_clock::now(); if (status != 0) { @@ -881,62 +961,13 @@ QueryResult ExecuteQuery(mg_session *session, const std::string &query) { } } - QueryResult ret; mg_result *result; - while ((status = mg_session_fetch(session, &result)) == 1) { - ret.records.push_back(mg_memory::MakeCustomUnique(mg_list_copy(mg_result_row(result)))); - if (!ret.records.back()) { - std::cerr << "out of memory"; - std::abort(); - } - } - if (status != 0) { - if (mg_session_status(session) == MG_SESSION_BAD) { - throw utils::ClientFatalException(mg_session_error(session)); - } else { - throw utils::ClientQueryException(mg_session_error(session)); - } - } - { - const mg_list *header = mg_result_columns(result); - for (uint32_t i = 0; i < mg_list_size(header); ++i) { - const mg_value *field = mg_list_at(header, i); - if (mg_value_get_type(field) == MG_VALUE_TYPE_STRING) { - ret.header.push_back( - std::string(mg_string_data(mg_value_string(field)), mg_string_size(mg_value_string(field)))); - } else { - std::stringstream field_stream; - utils::PrintValue(field_stream, field); - ret.header.push_back(field_stream.str()); - } - } + while (true) { + status = mg_session_fetch(session, &result); + if (!result_handler(status, result)) break; } - - const mg_map *summary = mg_result_summary(result); - if (summary && mg_map_size(summary) > 0) { - { - std::map execution_info; - for (auto key : {"cost_estimate", "parsing_time", "planning_time", "plan_execution_time"}) { - if (const mg_value *info = mg_map_at(summary, key); info) { - execution_info.emplace(key, ParseFloat(info)); - } - } - if (!execution_info.empty()) { - ret.execution_info = execution_info; - } - } - - if (const mg_value *mg_stats = mg_map_at(summary, "stats"); mg_stats) { - ret.stats.emplace(ParseStats(mg_stats)); - } - if (const mg_value *mg_notifications = mg_map_at(summary, "notifications"); mg_notifications) { - ret.notification.emplace(ParseNotifications(mg_notifications)); - } - } - - ret.wall_time = std::chrono::system_clock::now() - start; - return ret; + return std::chrono::system_clock::now() - start; } void PrintBatchesInfo(const std::vector &batches) { @@ -989,6 +1020,12 @@ BatchResult ExecuteBatch(mg_session *session, const Batch &batch) { } return BatchResult{.is_executed = true}; } +QueryResult ExecuteQuery(mg_session *session, const std::string &query) { + auto ret = query::QueryResult{}; + auto handler = build_handler(ret, session); + ret.wall_time = ExecuteQueryEx(session, query, std::move(handler)); + return ret; +} } // namespace query diff --git a/src/utils/utils.hpp b/src/utils/utils.hpp index 2ed8f7e4..76cbf8ce 100644 --- a/src/utils/utils.hpp +++ b/src/utils/utils.hpp @@ -29,6 +29,9 @@ #endif /* _WIN32 */ #include "mgclient.h" + +#include + #include "replxx.h" #include "query_type.hpp" @@ -321,6 +324,11 @@ struct BatchResult { // The extra part is perserved for the next GetQuery call std::optional GetQuery(Replxx *replxx_instance, bool collect_info = false); + +auto build_handler(query::QueryResult &ret, mg_session *session) -> std::function; + +std::chrono::duration ExecuteQueryEx(mg_session *session, const std::string &query, std::function &&result_handler); + QueryResult ExecuteQuery(mg_session *session, const std::string &query); BatchResult ExecuteBatch(mg_session *session, const Batch &batch); From 2080e70cfddf894c3d309b3ec21d5c6af216ca1f Mon Sep 17 00:00:00 2001 From: Gareth Lloyd Date: Thu, 29 Jun 2023 18:12:18 +0100 Subject: [PATCH 2/3] WIP --- src/utils/utils.cpp | 113 ++++++++++++++++++++++++-------------------- 1 file changed, 62 insertions(+), 51 deletions(-) diff --git a/src/utils/utils.cpp b/src/utils/utils.cpp index abf22dba..de5af76b 100644 --- a/src/utils/utils.cpp +++ b/src/utils/utils.cpp @@ -524,6 +524,50 @@ std::map ParseNotifications(const mg_value *mg_notific double ParseFloat(const mg_value *mg_val_float) { return mg_value_float(mg_val_float); } +void process_headers(query::QueryResult &ret, const mg_list *header) { + for (uint32_t i = 0; i < mg_list_size(header); ++i) { + const mg_value *field = mg_list_at(header, i); + if (mg_value_get_type(field) == MG_VALUE_TYPE_STRING) { + ret.header.emplace_back(mg_string_data(mg_value_string(field)), mg_string_size(mg_value_string(field))); + } else { + std::stringstream field_stream; + utils::PrintValue(field_stream, field); + ret.header.push_back(field_stream.str()); + } + } +} + +void process_row(query::QueryResult &ret, const mg_list *row) { + ret.records.push_back(mg_memory::MakeCustomUnique(mg_list_copy(row))); + if (!ret.records.back()) { + std::cerr << "out of memory"; + abort(); + } +} + +void process_summary(query::QueryResult &ret, const mg_map *summary) { + if (summary && mg_map_size(summary) > 0) { + { + std::map execution_info; + for (auto key : {"cost_estimate", "parsing_time", "planning_time", "plan_execution_time"}) { + if (const mg_value *info = mg_map_at(summary, key); info) { + execution_info.emplace(key, ParseFloat(info)); + } + } + if (!execution_info.empty()) { + ret.execution_info = execution_info; + } + } + + if (const mg_value *mg_stats = mg_map_at(summary, "stats"); mg_stats) { + ret.stats.emplace(ParseStats(mg_stats)); + } + if (const mg_value *mg_notifications = mg_map_at(summary, "notifications"); mg_notifications) { + ret.notification.emplace(ParseNotifications(mg_notifications)); + } + } +} + auto build_handler_impl(query::QueryResult &ret, mg_session *session) { enum states { START, @@ -548,53 +592,21 @@ auto build_handler_impl(query::QueryResult &ret, mg_session *session) { switch (state) { case states::START: { const mg_list *header = mg_result_columns(result); - for (uint32_t i = 0; i < mg_list_size(header); ++i) { - const mg_value *field = mg_list_at(header, i); - if (mg_value_get_type(field) == MG_VALUE_TYPE_STRING) { - ret.header.emplace_back(mg_string_data(mg_value_string(field)), mg_string_size(mg_value_string(field))); - } else { - std::stringstream field_stream; - utils::PrintValue(field_stream, field); - ret.header.push_back(field_stream.str()); - } - } - } + process_headers(ret, header); state = states::RECEIVING; [[fallthrough]]; - case states::RECEIVING: - ret.records.push_back(mg_memory::MakeCustomUnique(mg_list_copy(mg_result_row(result)))); - if (!ret.records.back()) { - std::cerr << "out of memory"; - std::abort(); - } - // track stats - break; - case states::DONE: - // build stats + } + case states::RECEIVING: { + const mg_list *row = mg_result_row(result); + process_row(ret, row); + return true; + } + case states::DONE: { const mg_map *summary = mg_result_summary(result); - if (summary && mg_map_size(summary) > 0) { - { - std::map execution_info; - for (auto key : {"cost_estimate", "parsing_time", "planning_time", "plan_execution_time"}) { - if (const mg_value *info = mg_map_at(summary, key); info) { - execution_info.emplace(key, ParseFloat(info)); - } - } - if (!execution_info.empty()) { - ret.execution_info = execution_info; - } - } - - if (const mg_value *mg_stats = mg_map_at(summary, "stats"); mg_stats) { - ret.stats.emplace(ParseStats(mg_stats)); - } - if (const mg_value *mg_notifications = mg_map_at(summary, "notifications"); mg_notifications) { - ret.notification.emplace(ParseNotifications(mg_notifications)); - } - } + process_summary(ret, summary); return false; + } } - return true; }; } @@ -924,13 +936,12 @@ void PrintQueryInfo(const Query &query) { std::cout << "line: " << query.line_number << " index: " << query.index << " query: " << query.query << std::endl; } -auto build_handler(query::QueryResult &ret, mg_session *session) -> std::function{ - return build_handler_impl(ret,session); +auto build_handler(query::QueryResult &ret, mg_session *session) -> std::function { + return build_handler_impl(ret, session); } - std::chrono::duration ExecuteQueryEx(mg_session *session, const std::string &query, - std::function &&result_handler) { + std::function &&result_handler) { int status = mg_session_run(session, query.c_str(), nullptr, nullptr, nullptr, nullptr); auto start = std::chrono::system_clock::now(); if (status != 0) { @@ -967,7 +978,7 @@ std::chrono::duration ExecuteQueryEx(mg_session *session, const std::str status = mg_session_fetch(session, &result); if (!result_handler(status, result)) break; } - return std::chrono::system_clock::now() - start; + return std::chrono::system_clock::now() - start; } void PrintBatchesInfo(const std::vector &batches) { @@ -1021,10 +1032,10 @@ BatchResult ExecuteBatch(mg_session *session, const Batch &batch) { return BatchResult{.is_executed = true}; } QueryResult ExecuteQuery(mg_session *session, const std::string &query) { - auto ret = query::QueryResult{}; - auto handler = build_handler(ret, session); - ret.wall_time = ExecuteQueryEx(session, query, std::move(handler)); - return ret; + auto ret = query::QueryResult{}; + auto handler = build_handler(ret, session); + ret.wall_time = ExecuteQueryEx(session, query, std::move(handler)); + return ret; } } // namespace query From 017e8fc98ec0b98cc4b2bc68b8d24b22eadafd39 Mon Sep 17 00:00:00 2001 From: Gareth Lloyd Date: Tue, 4 Jul 2023 16:56:57 +0100 Subject: [PATCH 3/3] WIP --- src/interactive.cpp | 17 +++- src/utils/utils.cpp | 187 ++++++++++++++++++++++++-------------------- src/utils/utils.hpp | 16 +++- 3 files changed, 130 insertions(+), 90 deletions(-) diff --git a/src/interactive.cpp b/src/interactive.cpp index 980e4984..27cf74d5 100644 --- a/src/interactive.cpp +++ b/src/interactive.cpp @@ -25,6 +25,15 @@ namespace mode::interactive { using namespace std::string_literals; + +//struct InteractiveQueryProcessor : query::QueryProcessor { +// void process_header(const mg_list *header) override {} +// void process_row(const mg_list *row) override {} +// void process_summary(const mg_map *summary) override {} +//}; + + + int Run(const utils::bolt::Config &bolt_config, const std::string &history, bool no_history, bool verbose_execution_info, const format::CsvOptions &csv_opts, const format::OutputOptions &output_opts) { Replxx *replxx_instance = InitAndSetupReplxx(); @@ -113,10 +122,10 @@ int Run(const utils::bolt::Config &bolt_config, const std::string &history, bool } try { - auto ret = query::QueryResult{}; - auto handler = build_handler(ret, session.get()); - auto wall_time = query::ExecuteQueryEx(session.get(), query->query, std::move(handler)); - ret.wall_time = wall_time; + //TODO: CSV Processor and CSV do not need to wait for all results +// auto processor = InteractiveQueryProcessor{}; + auto ret = query::ExecuteQuery/*Ex*/(session.get(), query->query); + if (!ret.records.empty()) { // HERE Output(ret.header, ret.records, output_opts, csv_opts); diff --git a/src/utils/utils.cpp b/src/utils/utils.cpp index de5af76b..5ae6ac0a 100644 --- a/src/utils/utils.cpp +++ b/src/utils/utils.cpp @@ -524,91 +524,75 @@ std::map ParseNotifications(const mg_value *mg_notific double ParseFloat(const mg_value *mg_val_float) { return mg_value_float(mg_val_float); } -void process_headers(query::QueryResult &ret, const mg_list *header) { - for (uint32_t i = 0; i < mg_list_size(header); ++i) { - const mg_value *field = mg_list_at(header, i); - if (mg_value_get_type(field) == MG_VALUE_TYPE_STRING) { - ret.header.emplace_back(mg_string_data(mg_value_string(field)), mg_string_size(mg_value_string(field))); - } else { - std::stringstream field_stream; - utils::PrintValue(field_stream, field); - ret.header.push_back(field_stream.str()); - } - } -} -void process_row(query::QueryResult &ret, const mg_list *row) { - ret.records.push_back(mg_memory::MakeCustomUnique(mg_list_copy(row))); - if (!ret.records.back()) { - std::cerr << "out of memory"; - abort(); - } -} -void process_summary(query::QueryResult &ret, const mg_map *summary) { - if (summary && mg_map_size(summary) > 0) { - { - std::map execution_info; - for (auto key : {"cost_estimate", "parsing_time", "planning_time", "plan_execution_time"}) { - if (const mg_value *info = mg_map_at(summary, key); info) { - execution_info.emplace(key, ParseFloat(info)); - } - } - if (!execution_info.empty()) { - ret.execution_info = execution_info; + +struct QueryResultProcessor : query::QueryProcessor { + QueryResultProcessor() : start{std::chrono::system_clock::now()} {} + + void process_header(mg_list const *header) override { + for (uint32_t i = 0; i < mg_list_size(header); ++i) { + const mg_value *field = mg_list_at(header, i); + if (mg_value_get_type(field) == MG_VALUE_TYPE_STRING) { + result.header.emplace_back(mg_string_data(mg_value_string(field)), mg_string_size(mg_value_string(field))); + } else { + std::stringstream field_stream; + utils::PrintValue(field_stream, field); + result.header.push_back(field_stream.str()); } } + } - if (const mg_value *mg_stats = mg_map_at(summary, "stats"); mg_stats) { - ret.stats.emplace(ParseStats(mg_stats)); - } - if (const mg_value *mg_notifications = mg_map_at(summary, "notifications"); mg_notifications) { - ret.notification.emplace(ParseNotifications(mg_notifications)); + void process_row(mg_list const *row) override { + result.records.push_back(mg_memory::MakeCustomUnique(mg_list_copy(row))); + if (!result.records.back()) { + std::cerr << "out of memory"; + abort(); } } -} -auto build_handler_impl(query::QueryResult &ret, mg_session *session) { - enum states { - START, - RECEIVING, - DONE, - }; - return [session, state = states::START, &ret](int status, mg_result *result) mutable { - switch (status) { - case 0: - state = states::DONE; - break; - case 1: - break; - default: - if (mg_session_status(session) == MG_SESSION_BAD) { - throw utils::ClientFatalException(mg_session_error(session)); - } else { - throw utils::ClientQueryException(mg_session_error(session)); + void process_summary(mg_map const *summary) override { + if (summary && mg_map_size(summary) > 0) { + { + std::map execution_info; + for (auto key : {"cost_estimate", "parsing_time", "planning_time", "plan_execution_time"}) { + if (const mg_value *info = mg_map_at(summary, key); info) { + execution_info.emplace(key, ParseFloat(info)); + } + } + if (!execution_info.empty()) { + result.execution_info = execution_info; } - } - - switch (state) { - case states::START: { - const mg_list *header = mg_result_columns(result); - process_headers(ret, header); - state = states::RECEIVING; - [[fallthrough]]; } - case states::RECEIVING: { - const mg_list *row = mg_result_row(result); - process_row(ret, row); - return true; + + if (const mg_value *mg_stats = mg_map_at(summary, "stats"); mg_stats) { + result.stats.emplace(ParseStats(mg_stats)); } - case states::DONE: { - const mg_map *summary = mg_result_summary(result); - process_summary(ret, summary); - return false; + if (const mg_value *mg_notifications = mg_map_at(summary, "notifications"); mg_notifications) { + result.notification.emplace(ParseNotifications(mg_notifications)); } } - }; -} + } + + auto finish() -> query::QueryResult { + result.wall_time = std::chrono::system_clock::now() - start; + return std::move(result); + } + + void process_fatal() override { + std::cout << "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!" << std::endl; + } + void process_query_error() override { + std::cout << "$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$" << std::endl; + } + + private: + + query::QueryResult result; + std::chrono::time_point start; +}; + + } // namespace @@ -936,14 +920,8 @@ void PrintQueryInfo(const Query &query) { std::cout << "line: " << query.line_number << " index: " << query.index << " query: " << query.query << std::endl; } -auto build_handler(query::QueryResult &ret, mg_session *session) -> std::function { - return build_handler_impl(ret, session); -} - -std::chrono::duration ExecuteQueryEx(mg_session *session, const std::string &query, - std::function &&result_handler) { +void ExecuteQueryEx(mg_session *session, const std::string &query, QueryProcessor & processor) { int status = mg_session_run(session, query.c_str(), nullptr, nullptr, nullptr, nullptr); - auto start = std::chrono::system_clock::now(); if (status != 0) { if (mg_session_status(session) == MG_SESSION_BAD) { throw utils::ClientFatalException(mg_session_error(session)); @@ -972,13 +950,55 @@ std::chrono::duration ExecuteQueryEx(mg_session *session, const std::str } } + enum states { + START, + RECEIVING, + DONE, + }; + + auto result_handler = [session, state = states::START, &processor](int status, mg_result *result) mutable { + switch (status) { + case 0: + state = states::DONE; + break; + case 1: + break; + default: + if (mg_session_status(session) == MG_SESSION_BAD) { + processor.process_fatal(); + throw utils::ClientFatalException(mg_session_error(session)); + } else { + processor.process_query_error(); + throw utils::ClientQueryException(mg_session_error(session)); + } + } + + switch (state) { + case states::START: { + const mg_list *header = mg_result_columns(result); + processor.process_header(header); + state = states::RECEIVING; + [[fallthrough]]; + } + case states::RECEIVING: { + const mg_list *row = mg_result_row(result); + processor.process_row(row); + return true; + } + case states::DONE: { + const mg_map *summary = mg_result_summary(result); + processor.process_summary(summary); + return false; + } + } + }; + mg_result *result; while (true) { status = mg_session_fetch(session, &result); if (!result_handler(status, result)) break; } - return std::chrono::system_clock::now() - start; } void PrintBatchesInfo(const std::vector &batches) { @@ -1032,10 +1052,9 @@ BatchResult ExecuteBatch(mg_session *session, const Batch &batch) { return BatchResult{.is_executed = true}; } QueryResult ExecuteQuery(mg_session *session, const std::string &query) { - auto ret = query::QueryResult{}; - auto handler = build_handler(ret, session); - ret.wall_time = ExecuteQueryEx(session, query, std::move(handler)); - return ret; + auto processor = QueryResultProcessor{}; + ExecuteQueryEx(session, query, processor); + return processor.finish(); } } // namespace query diff --git a/src/utils/utils.hpp b/src/utils/utils.hpp index 76cbf8ce..cc1093a8 100644 --- a/src/utils/utils.hpp +++ b/src/utils/utils.hpp @@ -325,9 +325,21 @@ struct BatchResult { std::optional GetQuery(Replxx *replxx_instance, bool collect_info = false); -auto build_handler(query::QueryResult &ret, mg_session *session) -> std::function; +//auto build_handler(query::QueryResult &ret, mg_session *session) -> std::function; -std::chrono::duration ExecuteQueryEx(mg_session *session, const std::string &query, std::function &&result_handler); +struct QueryProcessor{ + virtual void process_header(mg_list const *header) = 0; + + virtual void process_row(mg_list const *row) =0; + + virtual void process_summary(mg_map const *summary)= 0; + + virtual void process_fatal() = 0; + virtual void process_query_error() = 0; + +}; + +void ExecuteQueryEx(mg_session *session, const std::string &query, QueryProcessor & processor); QueryResult ExecuteQuery(mg_session *session, const std::string &query); BatchResult ExecuteBatch(mg_session *session, const Batch &batch);