diff --git a/example/http_c++/http_client.cpp b/example/http_c++/http_client.cpp index 23222dee9b..7df7461135 100644 --- a/example/http_c++/http_client.cpp +++ b/example/http_c++/http_client.cpp @@ -25,8 +25,11 @@ #include #include #include +#include "bthread/countdown_event.h" DEFINE_string(d, "", "POST this data to the http server"); +DEFINE_bool(progressive, false, "whether or not progressive read data from server"); +DEFINE_int32(progressive_read_timeout_ms, 5000, "progressive read data idle timeout in milliseconds"); DEFINE_string(load_balancer, "", "The algorithm for load balancing"); DEFINE_int32(timeout_ms, 2000, "RPC timeout in milliseconds"); DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)"); @@ -36,6 +39,25 @@ namespace brpc { DECLARE_bool(http_verbose); } +class PartDataReader: public brpc::ProgressiveReader { +public: + explicit PartDataReader(bthread::CountdownEvent* done): _done(done){} + + butil::Status OnReadOnePart(const void* data, size_t length) { + memcpy(_buffer, data, length); + LOG(INFO) << "data : " << _buffer << " size : " << length; + return butil::Status::OK(); + } + + void OnEndOfMessage(const butil::Status& status) { + _done->signal(); + LOG(INFO) << "progressive read data final status : " << status; + } +private: + char _buffer[1024]; + bthread::CountdownEvent* _done; +}; + int main(int argc, char* argv[]) { // Parse gflags. We recommend you to use gflags as well. GFLAGS_NAMESPACE::ParseCommandLineFlags(&argc, &argv, true); @@ -71,6 +93,11 @@ int main(int argc, char* argv[]) { cntl.request_attachment().append(FLAGS_d); } + if (FLAGS_progressive) { + cntl.set_progressive_read_timeout_ms(FLAGS_progressive_read_timeout_ms); + cntl.response_will_be_read_progressively(); + } + // Because `done'(last parameter) is NULL, this function waits until // the response comes back or error occurs(including timedout). channel.CallMethod(NULL, &cntl, NULL, NULL, NULL); @@ -78,6 +105,13 @@ int main(int argc, char* argv[]) { std::cerr << cntl.ErrorText() << std::endl; return -1; } + + if (FLAGS_progressive) { + bthread::CountdownEvent done(1); + cntl.ReadProgressiveAttachmentBy(new PartDataReader(&done)); + done.wait(); + LOG(INFO) << "wait client progressive read done safely"; + } // If -http_verbose is on, brpc already prints the response to stderr. if (!brpc::FLAGS_http_verbose) { std::cout << cntl.response_attachment() << std::endl; diff --git a/example/http_c++/http_server.cpp b/example/http_c++/http_server.cpp index 05c9a0ee4c..3cc4c63f86 100644 --- a/example/http_c++/http_server.cpp +++ b/example/http_c++/http_server.cpp @@ -31,6 +31,7 @@ DEFINE_int32(idle_timeout_s, -1, "Connection will be closed if there is no " DEFINE_string(certificate, "cert.pem", "Certificate file path to enable SSL"); DEFINE_string(private_key, "key.pem", "Private key file path to enable SSL"); DEFINE_string(ciphers, "", "Cipher suite used for SSL connections"); +DEFINE_bool(enable_progressive_timeout, false, "whether or not trigger progressive write attachement data timeout"); namespace example { @@ -104,6 +105,9 @@ class FileServiceImpl : public FileService { // sleep a while to send another part. bthread_usleep(10000); + if (FLAGS_enable_progressive_timeout && i > 50) { + bthread_usleep(100000000UL); + } } return NULL; } @@ -194,6 +198,9 @@ class HttpSSEServiceImpl : public HttpSSEService { // sleep a while to send another part. bthread_usleep(10000 * 10); + if (FLAGS_enable_progressive_timeout && i > 50) { + bthread_usleep(100000000UL); + } } return NULL; } diff --git a/src/brpc/controller.cpp b/src/brpc/controller.cpp index b30a13476e..f4a813c87f 100644 --- a/src/brpc/controller.cpp +++ b/src/brpc/controller.cpp @@ -95,7 +95,8 @@ DEFINE_bool(graceful_quit_on_sigterm, false, "Register SIGTERM handle func to quit graceful"); DEFINE_bool(graceful_quit_on_sighup, false, "Register SIGHUP handle func to quit graceful"); - +DEFINE_bool(log_idle_progressive_read_close, false, + "Print log when an idle progressive read is closed"); const IdlNames idl_single_req_single_res = { "req", "res" }; const IdlNames idl_single_req_multi_res = { "req", "" }; const IdlNames idl_multi_req_single_res = { "", "res" }; @@ -174,6 +175,80 @@ class IgnoreAllRead : public ProgressiveReader { void OnEndOfMessage(const butil::Status&) {} }; +class ProgressiveTimeoutReader : public ProgressiveReader { +public: + explicit ProgressiveTimeoutReader(SocketId id, int32_t read_timeout_ms, ProgressiveReader* reader): + _socket_id(id), + _read_timeout_ms(read_timeout_ms), + _reader(reader), + _timeout_id(0), + _is_read_timeout(false) { + AddIdleReadTimeoutMonitor(); + } + + ~ProgressiveTimeoutReader() { + if(_timeout_id > 0) { + bthread_timer_del(_timeout_id); + } + } + + butil::Status OnReadOnePart(const void* data, size_t length) { + return _reader->OnReadOnePart(data, length); + } + + void OnEndOfMessage(const butil::Status& status) { + if (_is_read_timeout) { + _reader->OnEndOfMessage(butil::Status(EPROGREADTIMEOUT, "The progressive read timeout")); + } else { + _reader->OnEndOfMessage(status); + } + if(_timeout_id > 0) { + bthread_timer_del(_timeout_id); + _timeout_id = 0; + } + } + +private: + static void HandleIdleProgressiveReader(void* arg) { + if(arg == nullptr){ + LOG(ERROR) << "Controller::HandleIdleProgressiveReader arg is null."; + return; + } + ProgressiveTimeoutReader* reader = static_cast(arg); + SocketUniquePtr s; + if (Socket::Address(reader->_socket_id, &s) != 0) { + LOG(ERROR) << "not found the socket id : " << reader->_socket_id; + return; + } + auto log_idle = FLAGS_log_idle_progressive_read_close; + reader->_is_read_timeout = true; + LOG_IF(INFO, log_idle) << "progressive read timeout socket id : " << reader->_socket_id + << " progressive read timeout us : " << reader->_read_timeout_ms; + if (s->parsing_context() != NULL) { + s->parsing_context()->Destroy(); + } + s->ReleaseReferenceIfIdle(0); + } + void AddIdleReadTimeoutMonitor() { + if (_read_timeout_ms <= 0) { + return; + } + bthread_timer_add(&_timeout_id, + butil::milliseconds_from_now(_read_timeout_ms), + HandleIdleProgressiveReader, + this + ); + } + +private: + SocketId _socket_id; + int32_t _read_timeout_ms; + ProgressiveReader* _reader; + // Timer registered to trigger progressive timeout event + bthread_timer_t _timeout_id; + butil::atomic _is_read_timeout; +}; + static IgnoreAllRead* s_ignore_all_read = NULL; static pthread_once_t s_ignore_all_read_once = PTHREAD_ONCE_INIT; static void CreateIgnoreAllRead() { s_ignore_all_read = new IgnoreAllRead; } @@ -260,6 +335,7 @@ void Controller::ResetPods() { _backup_request_ms = UNSET_MAGIC_NUM; _backup_request_policy = NULL; _connect_timeout_ms = UNSET_MAGIC_NUM; + _progressive_read_timeout_ms = UNSET_MAGIC_NUM; _real_timeout_ms = UNSET_MAGIC_NUM; _deadline_us = -1; _timeout_id = 0; @@ -331,6 +407,15 @@ void Controller::Call::Reset() { stream_user_data = NULL; } +void Controller::set_progressive_read_timeout_ms(int32_t progressive_read_timeout_ms){ + if(progressive_read_timeout_ms <= 0x7fffffff){ + _progressive_read_timeout_ms = progressive_read_timeout_ms; + } else { + _progressive_read_timeout_ms = 0x7fffffff; + LOG(WARNING) << "progressive_read_timeout_seconds is limited to 0x7fffffff"; + } +} + void Controller::set_timeout_ms(int64_t timeout_ms) { if (timeout_ms <= 0x7fffffff) { _timeout_ms = timeout_ms; @@ -1028,6 +1113,7 @@ void Controller::SubmitSpan() { _span = NULL; } + void Controller::HandleSendFailed() { if (!FailedInline()) { SetFailed("Must be SetFailed() before calling HandleSendFailed()"); @@ -1543,6 +1629,10 @@ void Controller::ReadProgressiveAttachmentBy(ProgressiveReader* r) { __FUNCTION__)); } add_flag(FLAGS_PROGRESSIVE_READER); + if (progressive_read_timeout_ms() > 0) { + auto reader = new ProgressiveTimeoutReader(_rpa->GetSocketId(), _progressive_read_timeout_ms, r); + return _rpa->ReadProgressiveAttachmentBy(reader); + } return _rpa->ReadProgressiveAttachmentBy(r); } diff --git a/src/brpc/controller.h b/src/brpc/controller.h index 69d859ea8f..7861b4cb37 100644 --- a/src/brpc/controller.h +++ b/src/brpc/controller.h @@ -47,7 +47,6 @@ #include "brpc/grpc.h" #include "brpc/kvmap.h" #include "brpc/rpc_dump.h" - // EAUTH is defined in MAC #ifndef EAUTH #define EAUTH ERPCAUTH @@ -163,7 +162,6 @@ friend void policy::ProcessThriftRequest(InputMessageBase*); uint64_t log_id; std::string request_id; }; - public: Controller(); Controller(const Inheritable& parent_ctx); @@ -177,6 +175,9 @@ friend void policy::ProcessThriftRequest(InputMessageBase*); // Set/get timeout in milliseconds for the RPC call. Use // ChannelOptions.timeout_ms on unset. + void set_progressive_read_timeout_ms(int32_t progressive_read_timeout_ms); + int32_t progressive_read_timeout_ms() const { return _progressive_read_timeout_ms; } + void set_timeout_ms(int64_t timeout_ms); int64_t timeout_ms() const { return _timeout_ms; } @@ -323,7 +324,9 @@ friend void policy::ProcessThriftRequest(InputMessageBase*); // Make the RPC end when the HTTP response has complete headers and let // user read the remaining body by using ReadProgressiveAttachmentBy(). - void response_will_be_read_progressively() { add_flag(FLAGS_READ_PROGRESSIVELY); } + void response_will_be_read_progressively() { + add_flag(FLAGS_READ_PROGRESSIVELY); + } // Make the RPC end when the HTTP request has complete headers and let // user read the remaining body by using ReadProgressiveAttachmentBy(). void request_will_be_read_progressively() { add_flag(FLAGS_READ_PROGRESSIVELY); } @@ -837,6 +840,7 @@ friend void policy::ProcessThriftRequest(InputMessageBase*); int32_t _timeout_ms; int32_t _connect_timeout_ms; int32_t _backup_request_ms; + int32_t _progressive_read_timeout_ms; // Priority: `_backup_request_policy' > `_backup_request_ms'. BackupRequestPolicy* _backup_request_policy; // If this rpc call has retry/backup request,this var save the real timeout for current call diff --git a/src/brpc/errno.proto b/src/brpc/errno.proto index 26ffadc201..45e0c00568 100644 --- a/src/brpc/errno.proto +++ b/src/brpc/errno.proto @@ -41,7 +41,8 @@ enum Errno { ESSL = 1016; // SSL related error EH2RUNOUTSTREAMS = 1017; // The H2 socket was run out of streams EREJECT = 1018; // The Request is rejected - + EPROGREADTIMEOUT = 1019; // The Progressive read timeout + // Errno caused by server EINTERNAL = 2001; // Internal Server Error ERESPONSE = 2002; // Bad Response diff --git a/src/brpc/policy/http_rpc_protocol.cpp b/src/brpc/policy/http_rpc_protocol.cpp index 872c2897cc..d4dea9efb9 100644 --- a/src/brpc/policy/http_rpc_protocol.cpp +++ b/src/brpc/policy/http_rpc_protocol.cpp @@ -1197,6 +1197,7 @@ ParseResult ParseHttpMessage(butil::IOBuf *source, Socket *socket, LOG(FATAL) << "Fail to new HttpContext"; return MakeParseError(PARSE_ERROR_NO_RESOURCE); } + http_imsg->SetSocketId(socket->id()); // Parsing http is costly, parsing an incomplete http message from the // beginning repeatedly should be avoided, otherwise the cost may reach // O(n^2) in the worst case. Save incomplete http messages in sockets diff --git a/src/brpc/policy/http_rpc_protocol.h b/src/brpc/policy/http_rpc_protocol.h index bc8bd06593..2b2e9296ab 100644 --- a/src/brpc/policy/http_rpc_protocol.h +++ b/src/brpc/policy/http_rpc_protocol.h @@ -87,11 +87,20 @@ class HttpContext : public ReadableProgressiveAttachment , public InputMessageBase , public HttpMessage { public: + SocketId GetSocketId() override { + return _socket_id; + } + + void SetSocketId(SocketId id) { + _socket_id = id; + } + explicit HttpContext(bool read_body_progressively, HttpMethod request_method = HTTP_METHOD_GET) : InputMessageBase() , HttpMessage(read_body_progressively, request_method) - , _is_stage2(false) { + , _is_stage2(false) + , _socket_id(0) { // add one ref for Destroy butil::intrusive_ptr(this).detach(); } @@ -122,6 +131,7 @@ class HttpContext : public ReadableProgressiveAttachment private: bool _is_stage2; + SocketId _socket_id; }; // Implement functions required in protocol.h diff --git a/src/brpc/progressive_reader.h b/src/brpc/progressive_reader.h index 6f54ae68a7..860068e2e6 100644 --- a/src/brpc/progressive_reader.h +++ b/src/brpc/progressive_reader.h @@ -20,6 +20,7 @@ #define BRPC_PROGRESSIVE_READER_H #include "brpc/shared_object.h" +#include "brpc/socket.h" namespace brpc { @@ -84,6 +85,7 @@ class ReadableProgressiveAttachment : public SharedObject { // Any error occurred should destroy the reader by calling r->Destroy(). // r->Destroy() should be guaranteed to be called once and only once. virtual void ReadProgressiveAttachmentBy(ProgressiveReader* r) = 0; + virtual SocketId GetSocketId() = 0; }; } // namespace brpc