Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions include/bitcoin/network/beast.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@
#include <optional>
#include <memory>
#include <utility>

// The default is 4096, which hits the variant http_body.
#define BOOST_BEAST_FILE_BUFFER_SIZE 1024

#include <boost/beast/core.hpp>
#include <boost/beast/http.hpp>
#include <boost/beast/websocket.hpp>
Expand Down
26 changes: 16 additions & 10 deletions include/bitcoin/network/channels/channel_rpc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,16 +64,31 @@ class channel_rpc
result_handler&& handler) NOEXCEPT;
inline void send_result(rpc::value_t&& result, size_t size_hint,
result_handler&& handler) NOEXCEPT;
inline void send_notification(const rpc::string_t& method,
rpc::params_t&& notification, size_t size_hint,
result_handler&& handler) NOEXCEPT;

/// Resume reading from the socket (requires strand).
inline void resume() NOEXCEPT override;

protected:
/// Serialize and write response to client (requires strand).
/// Completion handler is always invoked on the channel strand.
inline void send(rpc::response_t&& message, size_t size_hint,
template <typename Message>
inline void send(Message&& message, size_t size_hint,
result_handler&& handler) NOEXCEPT;

/// Size and assign response_buffer_ (value type is json-rpc::json).
template <typename Message>
inline rpc::message_ptr<Message> assign_message(Message&& message,
size_t size_hint) NOEXCEPT;

/// Handle send<response> completion, invokes receive().
template <typename Message>
inline void handle_send(const code& ec, size_t bytes,
const rpc::message_cptr<Message>& message,
const result_handler& handler) NOEXCEPT;

/// Stranded handler invoked from stop().
inline void stopping(const code& ec) NOEXCEPT override;

Expand All @@ -83,22 +98,13 @@ class channel_rpc
/// Override to dispatch request to subscribers by requested method.
virtual inline void dispatch(const rpc::request_cptr& request) NOEXCEPT;

/// Size and assign response_buffer_ (value type is json-rpc::json).
virtual inline rpc::response_ptr assign_message(rpc::response_t&& message,
size_t size_hint) NOEXCEPT;

/// Must call after successful message handling if no stop.
virtual inline void receive() NOEXCEPT;

/// Handle incoming messages.
virtual inline void handle_receive(const code& ec, size_t bytes,
const rpc::request_cptr& request) NOEXCEPT;

/// Handle send completion, invokes receive().
virtual inline void handle_send(const code& ec, size_t bytes,
const rpc::response_cptr& response,
const result_handler& handler) NOEXCEPT;

private:
// These are protected by strand.
rpc::version version_;
Expand Down
120 changes: 79 additions & 41 deletions include/bitcoin/network/impl/channels/channel_rpc.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -140,40 +140,15 @@ inline http::flat_buffer& CLASS::request_buffer() NOEXCEPT
// Send.
// ----------------------------------------------------------------------------

TEMPLATE
void CLASS::send_code(const code& ec, result_handler&& handler) NOEXCEPT
{
send_error({ .code = ec.value(), .message = ec.message() },
std::move(handler));
}

TEMPLATE
void CLASS::send_error(rpc::result_t&& error,
result_handler&& handler) NOEXCEPT
{
BC_ASSERT(stranded());
const auto hint = two * error.message.size();
send({ .jsonrpc = version_, .id = identity_, .error = std::move(error) },
hint, std::move(handler));
}

TEMPLATE
void CLASS::send_result(rpc::value_t&& result, size_t size_hint,
result_handler&& handler) NOEXCEPT
{
BC_ASSERT(stranded());
send({ .jsonrpc = version_, .id = identity_, .result = std::move(result) },
size_hint, std::move(handler));
}

// protected
TEMPLATE
inline void CLASS::send(rpc::response_t&& model, size_t size_hint,
template <typename Message>
inline void CLASS::send(Message&& model, size_t size_hint,
result_handler&& handler) NOEXCEPT
{
BC_ASSERT(stranded());
const auto out = assign_message(std::move(model), size_hint);
count_handler complete = std::bind(&CLASS::handle_send,
count_handler complete = std::bind(&CLASS::handle_send<Message>,
shared_from_base<CLASS>(), _1, _2, out, std::move(handler));

if (!out)
Expand All @@ -187,33 +162,96 @@ inline void CLASS::send(rpc::response_t&& model, size_t size_hint,

// protected
TEMPLATE
template <typename Message>
inline rpc::message_ptr<Message> CLASS::assign_message(Message&& message,
size_t size_hint) NOEXCEPT
{
BC_ASSERT(stranded());
response_buffer_->max_size(size_hint);
const auto ptr = system::to_shared<rpc::message_value<Message>>();
ptr->message = std::move(message);
ptr->buffer = response_buffer_;
return ptr;
}

// protected
TEMPLATE
template <typename Message>
inline void CLASS::handle_send(const code& ec, size_t bytes,
const rpc::response_cptr& response, const result_handler& handler) NOEXCEPT
const rpc::message_cptr<Message>& message,
const result_handler& handler) NOEXCEPT
{
BC_ASSERT(stranded());
if (ec) stop(ec);

// Typically a noop, but handshake may pause channel here.
handler(ec);

LOGA("Rpc response: (" << bytes << ") bytes [" << endpoint() << "] "
<< response->message.error.value_or(rpc::result_t{}).message);
if constexpr (is_same_type<Message, rpc::response_t>)
{
LOGA("Rpc response: (" << bytes << ") bytes [" << endpoint() << "] "
<< message->message.error.value_or(rpc::result_t{}).message);

// Continue read loop (does not unpause or restart channel).
receive();
// Continue the read loop (does not unpause or restart).
receive();
}
else
{
LOGA("Rpc notification: (" << bytes << ") bytes [" << endpoint() << "] "
<< message->message.method);
}
}

// private
TEMPLATE
inline rpc::response_ptr CLASS::assign_message(rpc::response_t&& message,
size_t size_hint) NOEXCEPT
inline void CLASS::send_code(const code& ec, result_handler&& handler) NOEXCEPT
{
send_error(
{
.code = ec.value(),
.message = ec.message()
},
std::move(handler));
}

TEMPLATE
inline void CLASS::send_error(rpc::result_t&& error,
result_handler&& handler) NOEXCEPT
{
BC_ASSERT(stranded());
response_buffer_->max_size(size_hint);
const auto ptr = system::to_shared<rpc::response>();
ptr->message = std::move(message);
ptr->buffer = response_buffer_;
return ptr;
const auto hint = two * error.message.size();
send(rpc::response_t
{
.jsonrpc = version_,
.id = identity_,
.error = std::move(error)
}, hint, std::move(handler));
}

TEMPLATE
inline void CLASS::send_result(rpc::value_t&& result, size_t size_hint,
result_handler&& handler) NOEXCEPT
{
BC_ASSERT(stranded());
send(rpc::response_t
{
.jsonrpc = version_,
.id = identity_,
.result = std::move(result)
}, size_hint, std::move(handler));
}

TEMPLATE
inline void CLASS::send_notification(const rpc::string_t& method,
rpc::params_t&& notification, size_t size_hint,
result_handler&& handler) NOEXCEPT
{
BC_ASSERT(stranded());
send(rpc::request_t
{
.jsonrpc = version_,
.method = method,
.params = std::move(notification)
}, size_hint, std::move(handler));
}

BC_POP_WARNING()
Expand Down
9 changes: 9 additions & 0 deletions include/bitcoin/network/impl/protocols/protocol_rpc.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,15 @@ inline void CLASS::send_result(rpc::value_t&& result, size_t size_hint,
channel_->send_result(std::move(result), size_hint, std::move(handler));
}

TEMPLATE
inline void CLASS::send_notification(const rpc::string_t& method,
rpc::params_t&& notification, size_t size_hint,
result_handler&& handler) NOEXCEPT
{
channel_->send_notification(method, std::move(notification), size_hint,
std::move(handler));
}

} // namespace network
} // namespace libbitcoin

Expand Down
20 changes: 11 additions & 9 deletions include/bitcoin/network/messages/http_body.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ using body_reader = std::variant
rpc::reader
>;

// TODO: file_writer is eating 4k stack for each type.
// BOOST_BEAST_FILE_BUFFER_SIZE set to 1024 in beast.hpp.
using empty_writer = http::empty_body::writer;
using data_writer = http::chunk_body::writer;
using file_writer = http::file_body::writer;
Expand All @@ -60,15 +62,15 @@ using string_writer = http::string_body::writer;
using json_writer = http::json_body::writer;
using body_writer = std::variant
<
std::monostate,
empty_writer,
data_writer,
file_writer,
span_writer,
buffer_writer,
string_writer,
json_writer,
rpc::writer
std::monostate, // 1 byte
empty_writer, // 1 byte
data_writer, // 8 bytes
file_writer, // 1,040 bytes! (4,112 bytes by default)
span_writer, // 8 bytes
buffer_writer, // 16 bytes
string_writer, // 8 bytes
json_writer, // 136 bytes!
rpc::writer // 144 bytes!
>;

using empty_value = http::empty_body::value_type;
Expand Down
8 changes: 8 additions & 0 deletions include/bitcoin/network/messages/rpc/body.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,14 @@ using response_cptr = std::shared_ptr<const response>;
using response_ptr = std::shared_ptr<response>;
using writer = response_body::writer;

// Allows request to be sent (for notifications).
template <typename Message>
using message_value = typename body<Message>::value_type;
template <typename Message>
using message_ptr = std::shared_ptr<message_value<Message>>;
template <typename Message>
using message_cptr = std::shared_ptr<const message_value<Message>>;

} // namespace rpc
} // namespace network
} // namespace libbitcoin
Expand Down
4 changes: 4 additions & 0 deletions include/bitcoin/network/net/proxy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,10 @@ class BCT_API proxy
virtual void write(rpc::response& response,
count_handler&& handler) NOEXCEPT;

/// Write rpc notification (request) to the socket (json buffer in body).
virtual void write(rpc::request& notification,
count_handler&& handler) NOEXCEPT;

/// WS (generic).
/// -----------------------------------------------------------------------

Expand Down
26 changes: 24 additions & 2 deletions include/bitcoin/network/net/socket.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,10 @@ class BCT_API socket
virtual void rpc_write(rpc::response& response,
count_handler&& handler) NOEXCEPT;

/// Write rpc notification to the socket, handler posted to socket strand.
virtual void rpc_notify(rpc::request& notification,
count_handler&& handler) NOEXCEPT;

/// WS (generic).
/// -----------------------------------------------------------------------

Expand Down Expand Up @@ -230,7 +234,7 @@ class BCT_API socket
}

rpc::request& value;
rpc::reader reader;
rpc::request_body::reader reader;
http::flat_buffer& buffer;
};

Expand All @@ -245,7 +249,21 @@ class BCT_API socket
}

rpc::response& value;
rpc::writer writer;
rpc::response_body::writer writer;
};

struct notify_rpc
{
typedef std::shared_ptr<notify_rpc> ptr;
using out_buffer = rpc::writer::out_buffer;

notify_rpc(rpc::request& request) NOEXCEPT
: value{ request }, writer{ value }
{
}

rpc::request& value;
rpc::request_body::writer writer;
};

// do
Expand Down Expand Up @@ -276,6 +294,8 @@ class BCT_API socket
const count_handler& handler) NOEXCEPT;
void do_rpc_write(boost_code ec, size_t total, const write_rpc::ptr& out,
const count_handler& handler) NOEXCEPT;
void do_rpc_notify(boost_code ec, size_t total, const notify_rpc::ptr& out,
const count_handler& handler) NOEXCEPT;

// ws (generic)
void do_ws_read(ref<http::flat_buffer> out,
Expand Down Expand Up @@ -322,6 +342,8 @@ class BCT_API socket
const read_rpc::ptr& in, const count_handler& handler) NOEXCEPT;
void handle_rpc_write(boost_code ec, size_t size, size_t total,
const write_rpc::ptr& out, const count_handler& handler) NOEXCEPT;
void handle_rpc_notify(boost_code ec, size_t size, size_t total,
const notify_rpc::ptr& out, const count_handler& handler) NOEXCEPT;

// ws (generic)
void handle_ws_read(const boost_code& ec, size_t size,
Expand Down
3 changes: 3 additions & 0 deletions include/bitcoin/network/protocols/protocol_rpc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ class protocol_rpc
result_handler&& handler) NOEXCEPT;
virtual inline void send_result(rpc::value_t&& result, size_t size_hint,
result_handler&& handler) NOEXCEPT;
virtual inline void send_notification(const rpc::string_t& method,
rpc::params_t&& notification, size_t size_hint,
result_handler&& handler) NOEXCEPT;

/// Default noop completion handler.
virtual inline void complete(const code&) NOEXCEPT {};
Expand Down
Loading
Loading