diff --git a/include/bitcoin/network/beast.hpp b/include/bitcoin/network/beast.hpp index 27dac5c54..06c25611d 100644 --- a/include/bitcoin/network/beast.hpp +++ b/include/bitcoin/network/beast.hpp @@ -24,6 +24,10 @@ #include #include #include + +// The default is 4096, which hits the variant http_body. +#define BOOST_BEAST_FILE_BUFFER_SIZE 1024 + #include #include #include diff --git a/include/bitcoin/network/channels/channel_rpc.hpp b/include/bitcoin/network/channels/channel_rpc.hpp index 9120e1569..3715ca1e1 100644 --- a/include/bitcoin/network/channels/channel_rpc.hpp +++ b/include/bitcoin/network/channels/channel_rpc.hpp @@ -64,6 +64,9 @@ class channel_rpc result_handler&& handler) NOEXCEPT; inline void send_result(rpc::value_t&& result, size_t size_hint, result_handler&& handler) NOEXCEPT; + inline void send_notification(const rpc::string_t& method, + rpc::params_t&& notification, size_t size_hint, + result_handler&& handler) NOEXCEPT; /// Resume reading from the socket (requires strand). inline void resume() NOEXCEPT override; @@ -71,9 +74,21 @@ class channel_rpc protected: /// Serialize and write response to client (requires strand). /// Completion handler is always invoked on the channel strand. - inline void send(rpc::response_t&& message, size_t size_hint, + template + inline void send(Message&& message, size_t size_hint, result_handler&& handler) NOEXCEPT; + /// Size and assign response_buffer_ (value type is json-rpc::json). + template + inline rpc::message_ptr assign_message(Message&& message, + size_t size_hint) NOEXCEPT; + + /// Handle send completion, invokes receive(). + template + inline void handle_send(const code& ec, size_t bytes, + const rpc::message_cptr& message, + const result_handler& handler) NOEXCEPT; + /// Stranded handler invoked from stop(). inline void stopping(const code& ec) NOEXCEPT override; @@ -83,10 +98,6 @@ class channel_rpc /// Override to dispatch request to subscribers by requested method. virtual inline void dispatch(const rpc::request_cptr& request) NOEXCEPT; - /// Size and assign response_buffer_ (value type is json-rpc::json). - virtual inline rpc::response_ptr assign_message(rpc::response_t&& message, - size_t size_hint) NOEXCEPT; - /// Must call after successful message handling if no stop. virtual inline void receive() NOEXCEPT; @@ -94,11 +105,6 @@ class channel_rpc virtual inline void handle_receive(const code& ec, size_t bytes, const rpc::request_cptr& request) NOEXCEPT; - /// Handle send completion, invokes receive(). - virtual inline void handle_send(const code& ec, size_t bytes, - const rpc::response_cptr& response, - const result_handler& handler) NOEXCEPT; - private: // These are protected by strand. rpc::version version_; diff --git a/include/bitcoin/network/impl/channels/channel_rpc.ipp b/include/bitcoin/network/impl/channels/channel_rpc.ipp index 9f3f4a6b8..2c7d6308c 100644 --- a/include/bitcoin/network/impl/channels/channel_rpc.ipp +++ b/include/bitcoin/network/impl/channels/channel_rpc.ipp @@ -140,40 +140,15 @@ inline http::flat_buffer& CLASS::request_buffer() NOEXCEPT // Send. // ---------------------------------------------------------------------------- -TEMPLATE -void CLASS::send_code(const code& ec, result_handler&& handler) NOEXCEPT -{ - send_error({ .code = ec.value(), .message = ec.message() }, - std::move(handler)); -} - -TEMPLATE -void CLASS::send_error(rpc::result_t&& error, - result_handler&& handler) NOEXCEPT -{ - BC_ASSERT(stranded()); - const auto hint = two * error.message.size(); - send({ .jsonrpc = version_, .id = identity_, .error = std::move(error) }, - hint, std::move(handler)); -} - -TEMPLATE -void CLASS::send_result(rpc::value_t&& result, size_t size_hint, - result_handler&& handler) NOEXCEPT -{ - BC_ASSERT(stranded()); - send({ .jsonrpc = version_, .id = identity_, .result = std::move(result) }, - size_hint, std::move(handler)); -} - // protected TEMPLATE -inline void CLASS::send(rpc::response_t&& model, size_t size_hint, +template +inline void CLASS::send(Message&& model, size_t size_hint, result_handler&& handler) NOEXCEPT { BC_ASSERT(stranded()); const auto out = assign_message(std::move(model), size_hint); - count_handler complete = std::bind(&CLASS::handle_send, + count_handler complete = std::bind(&CLASS::handle_send, shared_from_base(), _1, _2, out, std::move(handler)); if (!out) @@ -187,8 +162,24 @@ inline void CLASS::send(rpc::response_t&& model, size_t size_hint, // protected TEMPLATE +template +inline rpc::message_ptr CLASS::assign_message(Message&& message, + size_t size_hint) NOEXCEPT +{ + BC_ASSERT(stranded()); + response_buffer_->max_size(size_hint); + const auto ptr = system::to_shared>(); + ptr->message = std::move(message); + ptr->buffer = response_buffer_; + return ptr; +} + +// protected +TEMPLATE +template inline void CLASS::handle_send(const code& ec, size_t bytes, - const rpc::response_cptr& response, const result_handler& handler) NOEXCEPT + const rpc::message_cptr& message, + const result_handler& handler) NOEXCEPT { BC_ASSERT(stranded()); if (ec) stop(ec); @@ -196,24 +187,71 @@ inline void CLASS::handle_send(const code& ec, size_t bytes, // Typically a noop, but handshake may pause channel here. handler(ec); - LOGA("Rpc response: (" << bytes << ") bytes [" << endpoint() << "] " - << response->message.error.value_or(rpc::result_t{}).message); + if constexpr (is_same_type) + { + LOGA("Rpc response: (" << bytes << ") bytes [" << endpoint() << "] " + << message->message.error.value_or(rpc::result_t{}).message); - // Continue read loop (does not unpause or restart channel). - receive(); + // Continue the read loop (does not unpause or restart). + receive(); + } + else + { + LOGA("Rpc notification: (" << bytes << ") bytes [" << endpoint() << "] " + << message->message.method); + } } -// private TEMPLATE -inline rpc::response_ptr CLASS::assign_message(rpc::response_t&& message, - size_t size_hint) NOEXCEPT +inline void CLASS::send_code(const code& ec, result_handler&& handler) NOEXCEPT +{ + send_error( + { + .code = ec.value(), + .message = ec.message() + }, + std::move(handler)); +} + +TEMPLATE +inline void CLASS::send_error(rpc::result_t&& error, + result_handler&& handler) NOEXCEPT { BC_ASSERT(stranded()); - response_buffer_->max_size(size_hint); - const auto ptr = system::to_shared(); - ptr->message = std::move(message); - ptr->buffer = response_buffer_; - return ptr; + const auto hint = two * error.message.size(); + send(rpc::response_t + { + .jsonrpc = version_, + .id = identity_, + .error = std::move(error) + }, hint, std::move(handler)); +} + +TEMPLATE +inline void CLASS::send_result(rpc::value_t&& result, size_t size_hint, + result_handler&& handler) NOEXCEPT +{ + BC_ASSERT(stranded()); + send(rpc::response_t + { + .jsonrpc = version_, + .id = identity_, + .result = std::move(result) + }, size_hint, std::move(handler)); +} + +TEMPLATE +inline void CLASS::send_notification(const rpc::string_t& method, + rpc::params_t&& notification, size_t size_hint, + result_handler&& handler) NOEXCEPT +{ + BC_ASSERT(stranded()); + send(rpc::request_t + { + .jsonrpc = version_, + .method = method, + .params = std::move(notification) + }, size_hint, std::move(handler)); } BC_POP_WARNING() diff --git a/include/bitcoin/network/impl/protocols/protocol_rpc.ipp b/include/bitcoin/network/impl/protocols/protocol_rpc.ipp index 7ed48681b..3ba1a3f55 100644 --- a/include/bitcoin/network/impl/protocols/protocol_rpc.ipp +++ b/include/bitcoin/network/impl/protocols/protocol_rpc.ipp @@ -69,6 +69,15 @@ inline void CLASS::send_result(rpc::value_t&& result, size_t size_hint, channel_->send_result(std::move(result), size_hint, std::move(handler)); } +TEMPLATE +inline void CLASS::send_notification(const rpc::string_t& method, + rpc::params_t&& notification, size_t size_hint, + result_handler&& handler) NOEXCEPT +{ + channel_->send_notification(method, std::move(notification), size_hint, + std::move(handler)); +} + } // namespace network } // namespace libbitcoin diff --git a/include/bitcoin/network/messages/http_body.hpp b/include/bitcoin/network/messages/http_body.hpp index 45377ab7c..a3fc52230 100644 --- a/include/bitcoin/network/messages/http_body.hpp +++ b/include/bitcoin/network/messages/http_body.hpp @@ -51,6 +51,8 @@ using body_reader = std::variant rpc::reader >; +// TODO: file_writer is eating 4k stack for each type. +// BOOST_BEAST_FILE_BUFFER_SIZE set to 1024 in beast.hpp. using empty_writer = http::empty_body::writer; using data_writer = http::chunk_body::writer; using file_writer = http::file_body::writer; @@ -60,15 +62,15 @@ using string_writer = http::string_body::writer; using json_writer = http::json_body::writer; using body_writer = std::variant < - std::monostate, - empty_writer, - data_writer, - file_writer, - span_writer, - buffer_writer, - string_writer, - json_writer, - rpc::writer + std::monostate, // 1 byte + empty_writer, // 1 byte + data_writer, // 8 bytes + file_writer, // 1,040 bytes! (4,112 bytes by default) + span_writer, // 8 bytes + buffer_writer, // 16 bytes + string_writer, // 8 bytes + json_writer, // 136 bytes! + rpc::writer // 144 bytes! >; using empty_value = http::empty_body::value_type; diff --git a/include/bitcoin/network/messages/rpc/body.hpp b/include/bitcoin/network/messages/rpc/body.hpp index 099c41b48..8f66643a4 100644 --- a/include/bitcoin/network/messages/rpc/body.hpp +++ b/include/bitcoin/network/messages/rpc/body.hpp @@ -114,6 +114,14 @@ using response_cptr = std::shared_ptr; using response_ptr = std::shared_ptr; using writer = response_body::writer; +// Allows request to be sent (for notifications). +template +using message_value = typename body::value_type; +template +using message_ptr = std::shared_ptr>; +template +using message_cptr = std::shared_ptr>; + } // namespace rpc } // namespace network } // namespace libbitcoin diff --git a/include/bitcoin/network/net/proxy.hpp b/include/bitcoin/network/net/proxy.hpp index f153df22b..f4bcb9c79 100644 --- a/include/bitcoin/network/net/proxy.hpp +++ b/include/bitcoin/network/net/proxy.hpp @@ -143,6 +143,10 @@ class BCT_API proxy virtual void write(rpc::response& response, count_handler&& handler) NOEXCEPT; + /// Write rpc notification (request) to the socket (json buffer in body). + virtual void write(rpc::request& notification, + count_handler&& handler) NOEXCEPT; + /// WS (generic). /// ----------------------------------------------------------------------- diff --git a/include/bitcoin/network/net/socket.hpp b/include/bitcoin/network/net/socket.hpp index 5f5e5f62b..1a135fcbd 100644 --- a/include/bitcoin/network/net/socket.hpp +++ b/include/bitcoin/network/net/socket.hpp @@ -132,6 +132,10 @@ class BCT_API socket virtual void rpc_write(rpc::response& response, count_handler&& handler) NOEXCEPT; + /// Write rpc notification to the socket, handler posted to socket strand. + virtual void rpc_notify(rpc::request& notification, + count_handler&& handler) NOEXCEPT; + /// WS (generic). /// ----------------------------------------------------------------------- @@ -230,7 +234,7 @@ class BCT_API socket } rpc::request& value; - rpc::reader reader; + rpc::request_body::reader reader; http::flat_buffer& buffer; }; @@ -245,7 +249,21 @@ class BCT_API socket } rpc::response& value; - rpc::writer writer; + rpc::response_body::writer writer; + }; + + struct notify_rpc + { + typedef std::shared_ptr ptr; + using out_buffer = rpc::writer::out_buffer; + + notify_rpc(rpc::request& request) NOEXCEPT + : value{ request }, writer{ value } + { + } + + rpc::request& value; + rpc::request_body::writer writer; }; // do @@ -276,6 +294,8 @@ class BCT_API socket const count_handler& handler) NOEXCEPT; void do_rpc_write(boost_code ec, size_t total, const write_rpc::ptr& out, const count_handler& handler) NOEXCEPT; + void do_rpc_notify(boost_code ec, size_t total, const notify_rpc::ptr& out, + const count_handler& handler) NOEXCEPT; // ws (generic) void do_ws_read(ref out, @@ -322,6 +342,8 @@ class BCT_API socket const read_rpc::ptr& in, const count_handler& handler) NOEXCEPT; void handle_rpc_write(boost_code ec, size_t size, size_t total, const write_rpc::ptr& out, const count_handler& handler) NOEXCEPT; + void handle_rpc_notify(boost_code ec, size_t size, size_t total, + const notify_rpc::ptr& out, const count_handler& handler) NOEXCEPT; // ws (generic) void handle_ws_read(const boost_code& ec, size_t size, diff --git a/include/bitcoin/network/protocols/protocol_rpc.hpp b/include/bitcoin/network/protocols/protocol_rpc.hpp index 932f53de7..c43725dfc 100644 --- a/include/bitcoin/network/protocols/protocol_rpc.hpp +++ b/include/bitcoin/network/protocols/protocol_rpc.hpp @@ -61,6 +61,9 @@ class protocol_rpc result_handler&& handler) NOEXCEPT; virtual inline void send_result(rpc::value_t&& result, size_t size_hint, result_handler&& handler) NOEXCEPT; + virtual inline void send_notification(const rpc::string_t& method, + rpc::params_t&& notification, size_t size_hint, + result_handler&& handler) NOEXCEPT; /// Default noop completion handler. virtual inline void complete(const code&) NOEXCEPT {}; diff --git a/src/messages/rpc/body.cpp b/src/messages/rpc/body.cpp index 3a3f9f430..9656bffc2 100644 --- a/src/messages/rpc/body.cpp +++ b/src/messages/rpc/body.cpp @@ -35,7 +35,7 @@ BC_PUSH_WARNING(NO_THROW_IN_NOEXCEPT) BC_PUSH_WARNING(NO_UNGUARDED_POINTERS) BC_PUSH_WARNING(NO_POINTER_ARITHMETIC) -// rpc::body::reader +// rpc::body::reader // ---------------------------------------------------------------------------- template <> @@ -140,6 +140,9 @@ finish(boost_code& ec) NOEXCEPT } } +// rpc::body::reader (unused) +// ---------------------------------------------------------------------------- + template <> size_t body::reader:: put(const buffer_type&, boost_code&) NOEXCEPT @@ -163,7 +166,7 @@ done() const NOEXCEPT return {}; } -// rpc::body::writer +// rpc::body::writer // ---------------------------------------------------------------------------- template <> @@ -223,28 +226,64 @@ done() const NOEXCEPT return base::writer::done() && (!terminate_ || set_terminator_); } +// rpc::body::writer +// ---------------------------------------------------------------------------- + template <> void body::writer:: -init(boost_code&) NOEXCEPT +init(boost_code& ec) NOEXCEPT { - BC_ASSERT(false); + base::writer::init(ec); + if (ec) return; + + try + { + boost::json::value_from(value_.message, value_.model); + } + catch (const boost::system::system_error& e) + { + // Primary exception type for parsing operations. + ec = e.code(); + return; + } + catch (...) + { + ec = code{ error::jsonrpc_writer_exception }; + return; + } + + set_terminator_ = false; + serializer_.reset(&value_.model); } template <> body::writer::out_buffer body::writer:: -get(boost_code&) NOEXCEPT +get(boost_code& ec) NOEXCEPT { - BC_ASSERT(false); - return {}; + auto out = base::writer::done() ? out_buffer{} : base::writer::get(ec); + if (ec) return out; + + // Override json reader !more so terminator can be added. + if (out.has_value()) + { + out.value().second = true; + return out; + } + + // Add terminator and signal done. + set_terminator_ = true; + using namespace boost::asio; + static constexpr auto line = '\n'; + return out_buffer{ std::make_pair(buffer(&line, sizeof(line)), false) }; } template <> bool body::writer:: done() const NOEXCEPT { - BC_ASSERT(false); - return {}; + // Done is redundant with !out.second, but provides a cleaner interface. + return base::writer::done() && (!terminate_ || set_terminator_); } BC_POP_WARNING() diff --git a/src/net/proxy.cpp b/src/net/proxy.cpp index d9f0defec..bc3b7c633 100644 --- a/src/net/proxy.cpp +++ b/src/net/proxy.cpp @@ -215,10 +215,16 @@ void proxy::read(http::flat_buffer& buffer, rpc::request& request, void proxy::write(rpc::response& response, count_handler&& handler) NOEXCEPT { - // TODO: compose? + // TODO: compose (potentially full duplex). socket_->rpc_write(response, std::move(handler)); } +void proxy::write(rpc::request& notification, count_handler&& handler) NOEXCEPT +{ + // TODO: compose (full duplex). + socket_->rpc_notify(notification, std::move(handler)); +} + // WS (generic). // ---------------------------------------------------------------------------- @@ -230,6 +236,7 @@ void proxy::ws_read(http::flat_buffer& out, count_handler&& handler) NOEXCEPT void proxy::ws_write(const asio::const_buffer& in, bool binary, count_handler&& handler) NOEXCEPT { + // TODO: compose (potentially full duplex). socket_->ws_write(in, binary, std::move(handler)); } diff --git a/src/net/socket_rpc.cpp b/src/net/socket_rpc.cpp index 30a877188..343c18a17 100644 --- a/src/net/socket_rpc.cpp +++ b/src/net/socket_rpc.cpp @@ -178,6 +178,70 @@ void socket::handle_rpc_write(boost_code ec, size_t size, size_t total, do_rpc_write(ec, total, out, handler); } +/// RPC Notify. +// ---------------------------------------------------------------------------- +// This is identical to 'RPC Write' apart from request and notify_rpc types. + +void socket::rpc_notify(rpc::request& notification, + count_handler&& handler) NOEXCEPT +{ + boost_code ec{}; + const auto out = emplace_shared(notification); + out->writer.init(ec); + + // Dispatch success or fail, for handler invoke on strand. + boost::asio::dispatch(strand_, + std::bind(&socket::do_rpc_notify, + shared_from_this(), ec, zero, out, std::move(handler))); +} + +void socket::do_rpc_notify(boost_code ec, size_t total, + const notify_rpc::ptr& out, const count_handler& handler) NOEXCEPT +{ + BC_ASSERT(stranded()); + + const auto buffer = ec ? notify_rpc::out_buffer{} : out->writer.get(ec); + if (ec) + { + // Json serializer emits rpc, http and json codes. + const auto code = error::rpc_to_error_code(ec); + if (code == error::unknown) logx("rpc-notify", ec); + handler(code, total); + return; + } + + BC_ASSERT(buffer.has_value()); + + // Internally this may compose multiple async_write_some to consume buffer. + // Writes one buffer from writer, must still iterator until writer is done. + VARIANT_DISPATCH_FUNCTION(boost::asio::async_write, get_tcp(), + buffer.value().first, + std::bind(&socket::handle_rpc_notify, + shared_from_this(), _1, _2, total, out, handler)); +} + +void socket::handle_rpc_notify(boost_code ec, size_t size, size_t total, + const notify_rpc::ptr& out, const count_handler& handler) NOEXCEPT +{ + BC_ASSERT(stranded()); + + total = ceilinged_add(total, size); + if (error::asio_is_canceled(ec)) + { + handler(error::channel_stopped, total); + return; + } + + if (!ec && out->writer.done()) + { + handler(error::success, total); + return; + } + + // Handle error condition or incomplete message. + do_rpc_notify(ec, total, out, handler); +} + BC_POP_WARNING() BC_POP_WARNING() BC_POP_WARNING()