diff --git a/src/interactive.cpp b/src/interactive.cpp index 1a6688f9..27cf74d5 100644 --- a/src/interactive.cpp +++ b/src/interactive.cpp @@ -25,6 +25,15 @@ namespace mode::interactive { using namespace std::string_literals; + +//struct InteractiveQueryProcessor : query::QueryProcessor { +// void process_header(const mg_list *header) override {} +// void process_row(const mg_list *row) override {} +// void process_summary(const mg_map *summary) override {} +//}; + + + int Run(const utils::bolt::Config &bolt_config, const std::string &history, bool no_history, bool verbose_execution_info, const format::CsvOptions &csv_opts, const format::OutputOptions &output_opts) { Replxx *replxx_instance = InitAndSetupReplxx(); @@ -113,12 +122,16 @@ int Run(const utils::bolt::Config &bolt_config, const std::string &history, bool } try { - auto ret = query::ExecuteQuery(session.get(), query->query); - if (ret.records.size() > 0) { + //TODO: CSV Processor and CSV do not need to wait for all results +// auto processor = InteractiveQueryProcessor{}; + auto ret = query::ExecuteQuery/*Ex*/(session.get(), query->query); + + if (!ret.records.empty()) { + // HERE Output(ret.header, ret.records, output_opts, csv_opts); } std::string summary; - if (ret.records.size() == 0) { + if (ret.records.empty()) { summary = "Empty set"; } else if (ret.records.size() == 1) { summary = std::to_string(ret.records.size()) + " row in set"; diff --git a/src/utils/utils.cpp b/src/utils/utils.cpp index 2c4ef5c5..5ae6ac0a 100644 --- a/src/utils/utils.cpp +++ b/src/utils/utils.cpp @@ -524,6 +524,76 @@ std::map ParseNotifications(const mg_value *mg_notific double ParseFloat(const mg_value *mg_val_float) { return mg_value_float(mg_val_float); } + + + +struct QueryResultProcessor : query::QueryProcessor { + QueryResultProcessor() : start{std::chrono::system_clock::now()} {} + + void process_header(mg_list const *header) override { + for (uint32_t i = 0; i < mg_list_size(header); ++i) { + const mg_value *field = mg_list_at(header, i); + if (mg_value_get_type(field) == MG_VALUE_TYPE_STRING) { + result.header.emplace_back(mg_string_data(mg_value_string(field)), mg_string_size(mg_value_string(field))); + } else { + std::stringstream field_stream; + utils::PrintValue(field_stream, field); + result.header.push_back(field_stream.str()); + } + } + } + + void process_row(mg_list const *row) override { + result.records.push_back(mg_memory::MakeCustomUnique(mg_list_copy(row))); + if (!result.records.back()) { + std::cerr << "out of memory"; + abort(); + } + } + + void process_summary(mg_map const *summary) override { + if (summary && mg_map_size(summary) > 0) { + { + std::map execution_info; + for (auto key : {"cost_estimate", "parsing_time", "planning_time", "plan_execution_time"}) { + if (const mg_value *info = mg_map_at(summary, key); info) { + execution_info.emplace(key, ParseFloat(info)); + } + } + if (!execution_info.empty()) { + result.execution_info = execution_info; + } + } + + if (const mg_value *mg_stats = mg_map_at(summary, "stats"); mg_stats) { + result.stats.emplace(ParseStats(mg_stats)); + } + if (const mg_value *mg_notifications = mg_map_at(summary, "notifications"); mg_notifications) { + result.notification.emplace(ParseNotifications(mg_notifications)); + } + } + } + + auto finish() -> query::QueryResult { + result.wall_time = std::chrono::system_clock::now() - start; + return std::move(result); + } + + void process_fatal() override { + std::cout << "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!" << std::endl; + } + void process_query_error() override { + std::cout << "$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$" << std::endl; + } + + private: + + query::QueryResult result; + std::chrono::time_point start; +}; + + + } // namespace namespace console { @@ -850,9 +920,8 @@ void PrintQueryInfo(const Query &query) { std::cout << "line: " << query.line_number << " index: " << query.index << " query: " << query.query << std::endl; } -QueryResult ExecuteQuery(mg_session *session, const std::string &query) { +void ExecuteQueryEx(mg_session *session, const std::string &query, QueryProcessor & processor) { int status = mg_session_run(session, query.c_str(), nullptr, nullptr, nullptr, nullptr); - auto start = std::chrono::system_clock::now(); if (status != 0) { if (mg_session_status(session) == MG_SESSION_BAD) { throw utils::ClientFatalException(mg_session_error(session)); @@ -881,62 +950,55 @@ QueryResult ExecuteQuery(mg_session *session, const std::string &query) { } } - QueryResult ret; - mg_result *result; - while ((status = mg_session_fetch(session, &result)) == 1) { - ret.records.push_back(mg_memory::MakeCustomUnique(mg_list_copy(mg_result_row(result)))); - if (!ret.records.back()) { - std::cerr << "out of memory"; - std::abort(); - } - } - if (status != 0) { - if (mg_session_status(session) == MG_SESSION_BAD) { - throw utils::ClientFatalException(mg_session_error(session)); - } else { - throw utils::ClientQueryException(mg_session_error(session)); - } - } + enum states { + START, + RECEIVING, + DONE, + }; - { - const mg_list *header = mg_result_columns(result); - for (uint32_t i = 0; i < mg_list_size(header); ++i) { - const mg_value *field = mg_list_at(header, i); - if (mg_value_get_type(field) == MG_VALUE_TYPE_STRING) { - ret.header.push_back( - std::string(mg_string_data(mg_value_string(field)), mg_string_size(mg_value_string(field)))); - } else { - std::stringstream field_stream; - utils::PrintValue(field_stream, field); - ret.header.push_back(field_stream.str()); - } + auto result_handler = [session, state = states::START, &processor](int status, mg_result *result) mutable { + switch (status) { + case 0: + state = states::DONE; + break; + case 1: + break; + default: + if (mg_session_status(session) == MG_SESSION_BAD) { + processor.process_fatal(); + throw utils::ClientFatalException(mg_session_error(session)); + } else { + processor.process_query_error(); + throw utils::ClientQueryException(mg_session_error(session)); + } } - } - const mg_map *summary = mg_result_summary(result); - if (summary && mg_map_size(summary) > 0) { - { - std::map execution_info; - for (auto key : {"cost_estimate", "parsing_time", "planning_time", "plan_execution_time"}) { - if (const mg_value *info = mg_map_at(summary, key); info) { - execution_info.emplace(key, ParseFloat(info)); - } + switch (state) { + case states::START: { + const mg_list *header = mg_result_columns(result); + processor.process_header(header); + state = states::RECEIVING; + [[fallthrough]]; } - if (!execution_info.empty()) { - ret.execution_info = execution_info; + case states::RECEIVING: { + const mg_list *row = mg_result_row(result); + processor.process_row(row); + return true; + } + case states::DONE: { + const mg_map *summary = mg_result_summary(result); + processor.process_summary(summary); + return false; } } + }; - if (const mg_value *mg_stats = mg_map_at(summary, "stats"); mg_stats) { - ret.stats.emplace(ParseStats(mg_stats)); - } - if (const mg_value *mg_notifications = mg_map_at(summary, "notifications"); mg_notifications) { - ret.notification.emplace(ParseNotifications(mg_notifications)); - } - } + mg_result *result; - ret.wall_time = std::chrono::system_clock::now() - start; - return ret; + while (true) { + status = mg_session_fetch(session, &result); + if (!result_handler(status, result)) break; + } } void PrintBatchesInfo(const std::vector &batches) { @@ -989,6 +1051,11 @@ BatchResult ExecuteBatch(mg_session *session, const Batch &batch) { } return BatchResult{.is_executed = true}; } +QueryResult ExecuteQuery(mg_session *session, const std::string &query) { + auto processor = QueryResultProcessor{}; + ExecuteQueryEx(session, query, processor); + return processor.finish(); +} } // namespace query diff --git a/src/utils/utils.hpp b/src/utils/utils.hpp index 2ed8f7e4..cc1093a8 100644 --- a/src/utils/utils.hpp +++ b/src/utils/utils.hpp @@ -29,6 +29,9 @@ #endif /* _WIN32 */ #include "mgclient.h" + +#include + #include "replxx.h" #include "query_type.hpp" @@ -321,6 +324,23 @@ struct BatchResult { // The extra part is perserved for the next GetQuery call std::optional GetQuery(Replxx *replxx_instance, bool collect_info = false); + +//auto build_handler(query::QueryResult &ret, mg_session *session) -> std::function; + +struct QueryProcessor{ + virtual void process_header(mg_list const *header) = 0; + + virtual void process_row(mg_list const *row) =0; + + virtual void process_summary(mg_map const *summary)= 0; + + virtual void process_fatal() = 0; + virtual void process_query_error() = 0; + +}; + +void ExecuteQueryEx(mg_session *session, const std::string &query, QueryProcessor & processor); + QueryResult ExecuteQuery(mg_session *session, const std::string &query); BatchResult ExecuteBatch(mg_session *session, const Batch &batch);