From a5cdfc1b40e2771f866f715e0b66551fa715d2ae Mon Sep 17 00:00:00 2001 From: zchuang185 Date: Tue, 25 Nov 2025 09:56:12 +0800 Subject: [PATCH 1/6] add the progressive read idle timout checker --- example/http_c++/http_client.cpp | 34 ++++++++++++++++++++ example/http_c++/http_server.cpp | 7 +++++ src/brpc/controller.cpp | 54 +++++++++++++++++++++++++++++++- src/brpc/controller.h | 25 +++++++++++++-- 4 files changed, 116 insertions(+), 4 deletions(-) diff --git a/example/http_c++/http_client.cpp b/example/http_c++/http_client.cpp index 23222dee9b..7df7461135 100644 --- a/example/http_c++/http_client.cpp +++ b/example/http_c++/http_client.cpp @@ -25,8 +25,11 @@ #include #include #include +#include "bthread/countdown_event.h" DEFINE_string(d, "", "POST this data to the http server"); +DEFINE_bool(progressive, false, "whether or not progressive read data from server"); +DEFINE_int32(progressive_read_timeout_ms, 5000, "progressive read data idle timeout in milliseconds"); DEFINE_string(load_balancer, "", "The algorithm for load balancing"); DEFINE_int32(timeout_ms, 2000, "RPC timeout in milliseconds"); DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)"); @@ -36,6 +39,25 @@ namespace brpc { DECLARE_bool(http_verbose); } +class PartDataReader: public brpc::ProgressiveReader { +public: + explicit PartDataReader(bthread::CountdownEvent* done): _done(done){} + + butil::Status OnReadOnePart(const void* data, size_t length) { + memcpy(_buffer, data, length); + LOG(INFO) << "data : " << _buffer << " size : " << length; + return butil::Status::OK(); + } + + void OnEndOfMessage(const butil::Status& status) { + _done->signal(); + LOG(INFO) << "progressive read data final status : " << status; + } +private: + char _buffer[1024]; + bthread::CountdownEvent* _done; +}; + int main(int argc, char* argv[]) { // Parse gflags. We recommend you to use gflags as well. GFLAGS_NAMESPACE::ParseCommandLineFlags(&argc, &argv, true); @@ -71,6 +93,11 @@ int main(int argc, char* argv[]) { cntl.request_attachment().append(FLAGS_d); } + if (FLAGS_progressive) { + cntl.set_progressive_read_timeout_ms(FLAGS_progressive_read_timeout_ms); + cntl.response_will_be_read_progressively(); + } + // Because `done'(last parameter) is NULL, this function waits until // the response comes back or error occurs(including timedout). channel.CallMethod(NULL, &cntl, NULL, NULL, NULL); @@ -78,6 +105,13 @@ int main(int argc, char* argv[]) { std::cerr << cntl.ErrorText() << std::endl; return -1; } + + if (FLAGS_progressive) { + bthread::CountdownEvent done(1); + cntl.ReadProgressiveAttachmentBy(new PartDataReader(&done)); + done.wait(); + LOG(INFO) << "wait client progressive read done safely"; + } // If -http_verbose is on, brpc already prints the response to stderr. if (!brpc::FLAGS_http_verbose) { std::cout << cntl.response_attachment() << std::endl; diff --git a/example/http_c++/http_server.cpp b/example/http_c++/http_server.cpp index 05c9a0ee4c..3da17ccc0c 100644 --- a/example/http_c++/http_server.cpp +++ b/example/http_c++/http_server.cpp @@ -31,6 +31,7 @@ DEFINE_int32(idle_timeout_s, -1, "Connection will be closed if there is no " DEFINE_string(certificate, "cert.pem", "Certificate file path to enable SSL"); DEFINE_string(private_key, "key.pem", "Private key file path to enable SSL"); DEFINE_string(ciphers, "", "Cipher suite used for SSL connections"); +DEFINE_bool(enable_progressive_timeout, false, "whether or not trigger progressive write attachement data timeout"); namespace example { @@ -104,6 +105,9 @@ class FileServiceImpl : public FileService { // sleep a while to send another part. bthread_usleep(10000); + if (FLAGS_enable_progressive_timout && i > 50) { + bthread_usleep(100000000UL); + } } return NULL; } @@ -194,6 +198,9 @@ class HttpSSEServiceImpl : public HttpSSEService { // sleep a while to send another part. bthread_usleep(10000 * 10); + if (FLAGS_enable_progressive_timout && i > 50) { + bthread_usleep(100000000UL); + } } return NULL; } diff --git a/src/brpc/controller.cpp b/src/brpc/controller.cpp index d4dbab951b..4c15385a0c 100644 --- a/src/brpc/controller.cpp +++ b/src/brpc/controller.cpp @@ -95,7 +95,8 @@ DEFINE_bool(graceful_quit_on_sigterm, false, "Register SIGTERM handle func to quit graceful"); DEFINE_bool(graceful_quit_on_sighup, false, "Register SIGHUP handle func to quit graceful"); - +DEFINE_bool(log_idle_progressive_read_close, false, + "Print log when an idle progressive read is closed"); const IdlNames idl_single_req_single_res = { "req", "res" }; const IdlNames idl_single_req_multi_res = { "req", "" }; const IdlNames idl_multi_req_single_res = { "", "res" }; @@ -331,6 +332,15 @@ void Controller::Call::Reset() { stream_user_data = NULL; } +void Controller::set_progressive_read_timeout_ms(int32_t progressive_read_timeout_ms){ + if(progressive_read_timeout_ms <= 0x7fffffff){ + _progressive_read_timeout_ms = progressive_read_timeout_ms; + } else { + _progressive_read_timeout_ms = 0x7fffffff; + LOG(WARNING) << "progressive_read_timeout_seconds is limited to 0x7fffffff"; + } +} + void Controller::set_timeout_ms(int64_t timeout_ms) { if (timeout_ms <= 0x7fffffff) { _timeout_ms = timeout_ms; @@ -1027,6 +1037,43 @@ void Controller::SubmitSpan() { _span = NULL; } +void* Controller::HandleIdleProgressiveReader(void* arg) { + auto* cntl = static_cast(arg); + const uint64_t CHECK_INTERVAL_US = 1000000UL; + auto log_idle = FLAGS_log_idle_progressive_read_close; + std::vector remove_socket_ids; + while (bthread_usleep(CHECK_INTERVAL_US) == 0) { + // TODO: this is not efficient for a lot of connections(>100K) + auto socketIds = cntl->_checking_progressive_read_fds; + int64_t progressive_read_timeout_us = cntl->_progressive_read_timeout_ms * 1000; + for (auto socket_id : socketIds){ + SocketUniquePtr s; + if (Socket::Address(socket_id, &s) == 0) { + auto cpuwide_time_us = butil::cpuwide_time_us(); + const int64_t last_active_us = s->last_active_time_us(); + if (cpuwide_time_us - last_active_us <= progressive_read_timeout_us) { + continue; + } + LOG_IF(INFO, log_idle) << "progressive read timeout socket id : " << socket_id + << " progressive read timeout us : " << progressive_read_timeout_us + << " progressive read idle duration : " << cpuwide_time_us - last_active_us; + if (s->parsing_context() != NULL) { + s->parsing_context()->Destroy(); + } + s->ReleaseReferenceIfIdle(0xffffffff); + remove_socket_ids.push_back(socket_id); + } else { + LOG(ERROR) << "not found the socket id : " << socket_id; + remove_socket_ids.push_back(socket_id); + } + } + for (auto remove_socket_id : remove_socket_ids) { + socketIds.erase(remove_socket_id); + } + } + return NULL; +} + void Controller::HandleSendFailed() { if (!FailedInline()) { SetFailed("Must be SetFailed() before calling HandleSendFailed()"); @@ -1179,6 +1226,11 @@ void Controller::IssueRPC(int64_t start_realtime_us) { // Tag the socket so that when the response comes back, the parser will // stop before reading all body. _current_call.sending_sock->read_will_be_progressive(_connection_type); + auto socket_id = _current_call.sending_sock->id(); + if (_progressive_read_timeout_ms > 0 && _checking_progressive_read_fds.seek(socket_id) == NULL) { + _checking_progressive_read_fds.insert(socket_id); + LOG(INFO) << "insert the progressive read fd : " << socket_id << " socket fds size : " << _checking_progressive_read_fds.size(); + } } // Handle authentication diff --git a/src/brpc/controller.h b/src/brpc/controller.h index 69d859ea8f..190b2fba57 100644 --- a/src/brpc/controller.h +++ b/src/brpc/controller.h @@ -47,7 +47,6 @@ #include "brpc/grpc.h" #include "brpc/kvmap.h" #include "brpc/rpc_dump.h" - // EAUTH is defined in MAC #ifndef EAUTH #define EAUTH ERPCAUTH @@ -163,7 +162,7 @@ friend void policy::ProcessThriftRequest(InputMessageBase*); uint64_t log_id; std::string request_id; }; - + static void* HandleIdleProgressiveReader(void* arg); public: Controller(); Controller(const Inheritable& parent_ctx); @@ -177,6 +176,9 @@ friend void policy::ProcessThriftRequest(InputMessageBase*); // Set/get timeout in milliseconds for the RPC call. Use // ChannelOptions.timeout_ms on unset. + void set_progressive_read_timeout_ms(int32_t progressive_read_timeout_ms); + int32_t progressive_read_timeout_ms() const { return _progressive_read_timeout_ms; } + void set_timeout_ms(int64_t timeout_ms); int64_t timeout_ms() const { return _timeout_ms; } @@ -323,7 +325,19 @@ friend void policy::ProcessThriftRequest(InputMessageBase*); // Make the RPC end when the HTTP response has complete headers and let // user read the remaining body by using ReadProgressiveAttachmentBy(). - void response_will_be_read_progressively() { add_flag(FLAGS_READ_PROGRESSIVELY); } + void response_will_be_read_progressively() { + if(has_flag(FLAGS_READ_PROGRESSIVELY) && _progressive_read_idle_tid > 0) { + return; + } + bthread_attr_t tmp = BTHREAD_ATTR_NORMAL; + tmp.tag = _bthread_tag; + if(bthread_start_background(&_progressive_read_idle_tid, &tmp, HandleIdleProgressiveReader, this) != 0){ + LOG(FATAL) << "Failed to start controller bthread id : " << _progressive_read_idle_tid; + } + LOG(INFO) << "Start Response progressive reader idle checker close idle_tid : " << _progressive_read_idle_tid + << " _bthread_tag : " << _bthread_tag; + add_flag(FLAGS_READ_PROGRESSIVELY); + } // Make the RPC end when the HTTP request has complete headers and let // user read the remaining body by using ReadProgressiveAttachmentBy(). void request_will_be_read_progressively() { add_flag(FLAGS_READ_PROGRESSIVELY); } @@ -837,6 +851,11 @@ friend void policy::ProcessThriftRequest(InputMessageBase*); int32_t _timeout_ms; int32_t _connect_timeout_ms; int32_t _backup_request_ms; + int32_t _progressive_read_timeout_ms; + butil::FlatSet _checking_progressive_read_fds; + bthread_t _progressive_read_idle_tid; + // Controller belongs to this tag + bthread_tag_t _bthread_tag = bthread_self_tag(); // Priority: `_backup_request_policy' > `_backup_request_ms'. BackupRequestPolicy* _backup_request_policy; // If this rpc call has retry/backup request,this var save the real timeout for current call From ae7bc0f0b7fd3e6f2b869c7ec4a7ac5f5c2674f0 Mon Sep 17 00:00:00 2001 From: zchuang185 Date: Tue, 25 Nov 2025 10:10:08 +0800 Subject: [PATCH 2/6] modify the bug for http c++ example --- example/http_c++/http_server.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/example/http_c++/http_server.cpp b/example/http_c++/http_server.cpp index 3da17ccc0c..3cc4c63f86 100644 --- a/example/http_c++/http_server.cpp +++ b/example/http_c++/http_server.cpp @@ -105,7 +105,7 @@ class FileServiceImpl : public FileService { // sleep a while to send another part. bthread_usleep(10000); - if (FLAGS_enable_progressive_timout && i > 50) { + if (FLAGS_enable_progressive_timeout && i > 50) { bthread_usleep(100000000UL); } } @@ -198,7 +198,7 @@ class HttpSSEServiceImpl : public HttpSSEService { // sleep a while to send another part. bthread_usleep(10000 * 10); - if (FLAGS_enable_progressive_timout && i > 50) { + if (FLAGS_enable_progressive_timeout && i > 50) { bthread_usleep(100000000UL); } } From f08f266fb265384ca89dfba3eb9edb3d27a4bde8 Mon Sep 17 00:00:00 2001 From: zchuang185 Date: Fri, 28 Nov 2025 16:41:10 +0800 Subject: [PATCH 3/6] change the timeout checker bthread to timer bthread --- src/brpc/controller.cpp | 129 ++++++++++++++++++++++++++++++---------- src/brpc/controller.h | 16 +---- 2 files changed, 101 insertions(+), 44 deletions(-) diff --git a/src/brpc/controller.cpp b/src/brpc/controller.cpp index 4c15385a0c..0c53970371 100644 --- a/src/brpc/controller.cpp +++ b/src/brpc/controller.cpp @@ -175,6 +175,61 @@ class IgnoreAllRead : public ProgressiveReader { void OnEndOfMessage(const butil::Status&) {} }; +class ProgressiveTimeoutRead : public ProgressiveReader { +public: + explicit ProgressiveTimeoutRead(Controller* cntl, ProgressiveReader* reader): + _cntl(cntl), _reader(reader), _timeout_id(0), _latest_add_timer_ms(0), _add_timer_delay_ms(1000) { + AddIdleReadTimeoutMonitor(); + } + butil::Status OnReadOnePart(const void* data, size_t length) { + auto status = _reader->OnReadOnePart(data, length); + AddIdleReadTimeoutMonitor(); + return status; + } + void OnEndOfMessage(const butil::Status& status) { + _reader->OnEndOfMessage(status); + if(_timeout_id > 0) { + bthread_timer_del(_timeout_id); + } + } +private: + void AddToTimer() { + if (_timeout_id > 0) { + bthread_timer_del(_timeout_id); + } + bthread_timer_add(&_timeout_id, + butil::milliseconds_from_now(_cntl->progressive_read_timeout_ms()), + Controller::HandleIdleProgressiveReader, + _cntl + ); + } + + void AddIdleReadTimeoutMonitor() { + if (_cntl->progressive_read_timeout_ms() <= 0) { + return; + } + if(_cntl->progressive_read_timeout_ms() < _add_timer_delay_ms) { + AddToTimer(); + return; + } + auto current_ms = butil::cpuwide_time_ms(); + if (current_ms - _latest_add_timer_ms < _add_timer_delay_ms) { + return; + } + _latest_add_timer_ms = current_ms; + AddToTimer(); + } + +private: + Controller* _cntl; + ProgressiveReader* _reader; + // Timer registered to trigger progressive timeout event + bthread_timer_t _timeout_id; + int64_t _latest_add_timer_ms; + // avoid frequently add timer for idle handler + int32_t _add_timer_delay_ms; +}; + static IgnoreAllRead* s_ignore_all_read = NULL; static pthread_once_t s_ignore_all_read_once = PTHREAD_ONCE_INIT; static void CreateIgnoreAllRead() { s_ignore_all_read = new IgnoreAllRead; } @@ -1037,41 +1092,52 @@ void Controller::SubmitSpan() { _span = NULL; } -void* Controller::HandleIdleProgressiveReader(void* arg) { +void Controller::HandleIdleProgressiveReader(void* arg) { + if(arg == nullptr){ + LOG(ERROR) << "Controller::HandleIdleProgressiveReader arg is null."; + return; + } auto* cntl = static_cast(arg); - const uint64_t CHECK_INTERVAL_US = 1000000UL; auto log_idle = FLAGS_log_idle_progressive_read_close; + int64_t progressive_read_timeout_us = cntl->_progressive_read_timeout_ms * 1000; std::vector remove_socket_ids; - while (bthread_usleep(CHECK_INTERVAL_US) == 0) { - // TODO: this is not efficient for a lot of connections(>100K) - auto socketIds = cntl->_checking_progressive_read_fds; - int64_t progressive_read_timeout_us = cntl->_progressive_read_timeout_ms * 1000; - for (auto socket_id : socketIds){ - SocketUniquePtr s; - if (Socket::Address(socket_id, &s) == 0) { - auto cpuwide_time_us = butil::cpuwide_time_us(); - const int64_t last_active_us = s->last_active_time_us(); - if (cpuwide_time_us - last_active_us <= progressive_read_timeout_us) { - continue; - } - LOG_IF(INFO, log_idle) << "progressive read timeout socket id : " << socket_id - << " progressive read timeout us : " << progressive_read_timeout_us - << " progressive read idle duration : " << cpuwide_time_us - last_active_us; - if (s->parsing_context() != NULL) { - s->parsing_context()->Destroy(); - } - s->ReleaseReferenceIfIdle(0xffffffff); - remove_socket_ids.push_back(socket_id); - } else { - LOG(ERROR) << "not found the socket id : " << socket_id; - remove_socket_ids.push_back(socket_id); + butil::AutoLock guard(cntl->_progressive_read_lock); + auto socketIds = cntl->_checking_progressive_read_fds; + for (auto socket_id : socketIds){ + SocketUniquePtr s; + if (Socket::Address(socket_id, &s) == 0) { + int64_t pre_idle_duration_us = 0; + int64_t idle_duration_us = butil::cpuwide_time_us() - s->last_active_time_us(); + while (progressive_read_timeout_us > idle_duration_us && idle_duration_us > pre_idle_duration_us) { + auto sleep_ms = (progressive_read_timeout_us - idle_duration_us) / 1000; + bthread_usleep(sleep_ms > 0 ? sleep_ms : 1); + pre_idle_duration_us = idle_duration_us; + idle_duration_us = butil::cpuwide_time_us() - s->last_active_time_us(); } - } - for (auto remove_socket_id : remove_socket_ids) { - socketIds.erase(remove_socket_id); + if (idle_duration_us <= pre_idle_duration_us) { + LOG_IF(INFO, log_idle) << "stop pgressive read timeout checking process!" + << " progressive_read_timeout_us : " << progressive_read_timeout_us + << " idle_duration_us : " << idle_duration_us + << " pre_idle_duration_us : " << pre_idle_duration_us; + return; + } + LOG_IF(INFO, log_idle) << "progressive read timeout socket id : " << socket_id + << " progressive read timeout us : " << progressive_read_timeout_us + << " progressive read idle duration : " << idle_duration_us; + if (s->parsing_context() != NULL) { + s->parsing_context()->Destroy(); + } + s->ReleaseReferenceIfIdle(0); + cntl->CloseConnection("progressive read timeout"); + remove_socket_ids.push_back(socket_id); + } else { + LOG(ERROR) << "not found the socket id : " << socket_id; + remove_socket_ids.push_back(socket_id); } } - return NULL; + for (auto remove_socket_id : remove_socket_ids) { + socketIds.erase(remove_socket_id); + } } void Controller::HandleSendFailed() { @@ -1227,9 +1293,9 @@ void Controller::IssueRPC(int64_t start_realtime_us) { // stop before reading all body. _current_call.sending_sock->read_will_be_progressive(_connection_type); auto socket_id = _current_call.sending_sock->id(); + butil::AutoLock guard(_progressive_read_lock); if (_progressive_read_timeout_ms > 0 && _checking_progressive_read_fds.seek(socket_id) == NULL) { _checking_progressive_read_fds.insert(socket_id); - LOG(INFO) << "insert the progressive read fd : " << socket_id << " socket fds size : " << _checking_progressive_read_fds.size(); } } @@ -1594,6 +1660,9 @@ void Controller::ReadProgressiveAttachmentBy(ProgressiveReader* r) { __FUNCTION__)); } add_flag(FLAGS_PROGRESSIVE_READER); + if (progressive_read_timeout_ms() > 0) { + return _rpa->ReadProgressiveAttachmentBy(new ProgressiveTimeoutRead(this, r)); + } return _rpa->ReadProgressiveAttachmentBy(r); } diff --git a/src/brpc/controller.h b/src/brpc/controller.h index 190b2fba57..8c82a41496 100644 --- a/src/brpc/controller.h +++ b/src/brpc/controller.h @@ -162,7 +162,7 @@ friend void policy::ProcessThriftRequest(InputMessageBase*); uint64_t log_id; std::string request_id; }; - static void* HandleIdleProgressiveReader(void* arg); + static void HandleIdleProgressiveReader(void* arg); public: Controller(); Controller(const Inheritable& parent_ctx); @@ -326,16 +326,6 @@ friend void policy::ProcessThriftRequest(InputMessageBase*); // Make the RPC end when the HTTP response has complete headers and let // user read the remaining body by using ReadProgressiveAttachmentBy(). void response_will_be_read_progressively() { - if(has_flag(FLAGS_READ_PROGRESSIVELY) && _progressive_read_idle_tid > 0) { - return; - } - bthread_attr_t tmp = BTHREAD_ATTR_NORMAL; - tmp.tag = _bthread_tag; - if(bthread_start_background(&_progressive_read_idle_tid, &tmp, HandleIdleProgressiveReader, this) != 0){ - LOG(FATAL) << "Failed to start controller bthread id : " << _progressive_read_idle_tid; - } - LOG(INFO) << "Start Response progressive reader idle checker close idle_tid : " << _progressive_read_idle_tid - << " _bthread_tag : " << _bthread_tag; add_flag(FLAGS_READ_PROGRESSIVELY); } // Make the RPC end when the HTTP request has complete headers and let @@ -853,9 +843,7 @@ friend void policy::ProcessThriftRequest(InputMessageBase*); int32_t _backup_request_ms; int32_t _progressive_read_timeout_ms; butil::FlatSet _checking_progressive_read_fds; - bthread_t _progressive_read_idle_tid; - // Controller belongs to this tag - bthread_tag_t _bthread_tag = bthread_self_tag(); + mutable butil::Lock _progressive_read_lock; // Priority: `_backup_request_policy' > `_backup_request_ms'. BackupRequestPolicy* _backup_request_policy; // If this rpc call has retry/backup request,this var save the real timeout for current call From 9bd8016bd084b02d01280957562dbd850e330133 Mon Sep 17 00:00:00 2001 From: zchuang185 Date: Wed, 3 Dec 2025 15:33:53 +0800 Subject: [PATCH 4/6] refine ProgressiveReadTimeoutReader class hold SocketId and read_timeout_ms fields --- src/brpc/controller.cpp | 117 ++++++++++++++------------ src/brpc/controller.h | 2 - src/brpc/policy/http_rpc_protocol.cpp | 1 + src/brpc/policy/http_rpc_protocol.h | 9 ++ src/brpc/progressive_reader.h | 3 + 5 files changed, 77 insertions(+), 55 deletions(-) diff --git a/src/brpc/controller.cpp b/src/brpc/controller.cpp index 09acbd133c..a6f026761b 100644 --- a/src/brpc/controller.cpp +++ b/src/brpc/controller.cpp @@ -175,10 +175,16 @@ class IgnoreAllRead : public ProgressiveReader { void OnEndOfMessage(const butil::Status&) {} }; -class ProgressiveTimeoutRead : public ProgressiveReader { +class ProgressiveTimeoutReader : public ProgressiveReader { public: - explicit ProgressiveTimeoutRead(Controller* cntl, ProgressiveReader* reader): - _cntl(cntl), _reader(reader), _timeout_id(0), _latest_add_timer_ms(0), _add_timer_delay_ms(1000) { + explicit ProgressiveTimeoutReader(SocketId id, int32_t read_timeout_ms, ProgressiveReader* reader): + _socket_id(id), + _read_timeout_ms(read_timeout_ms), + _reader(reader), + _timeout_id(0), + _latest_add_timer_ms(0), + _add_timer_delay_ms(1000), + _is_read_timeout(false) { AddIdleReadTimeoutMonitor(); } butil::Status OnReadOnePart(const void* data, size_t length) { @@ -187,28 +193,45 @@ class ProgressiveTimeoutRead : public ProgressiveReader { return status; } void OnEndOfMessage(const butil::Status& status) { - _reader->OnEndOfMessage(status); + if (_is_read_timeout) { + _reader->OnEndOfMessage(butil::Status(ECONNRESET, "The progressive read timeout")); + } else { + _reader->OnEndOfMessage(status); + } if(_timeout_id > 0) { bthread_timer_del(_timeout_id); } } + + SocketId GetSocketId() { + return _socket_id; + } + + int32_t read_timeout_ms() { + return _read_timeout_ms; + } + + void set_read_timeout(bool read_timeout = true) { + _is_read_timeout = read_timeout; + } + private: void AddToTimer() { if (_timeout_id > 0) { bthread_timer_del(_timeout_id); } bthread_timer_add(&_timeout_id, - butil::milliseconds_from_now(_cntl->progressive_read_timeout_ms()), + butil::milliseconds_from_now(_read_timeout_ms), Controller::HandleIdleProgressiveReader, - _cntl + this ); } void AddIdleReadTimeoutMonitor() { - if (_cntl->progressive_read_timeout_ms() <= 0) { + if (_read_timeout_ms <= 0) { return; } - if(_cntl->progressive_read_timeout_ms() < _add_timer_delay_ms) { + if(_read_timeout_ms < _add_timer_delay_ms) { AddToTimer(); return; } @@ -221,13 +244,15 @@ class ProgressiveTimeoutRead : public ProgressiveReader { } private: - Controller* _cntl; + SocketId _socket_id; + int32_t _read_timeout_ms; ProgressiveReader* _reader; // Timer registered to trigger progressive timeout event bthread_timer_t _timeout_id; int64_t _latest_add_timer_ms; // avoid frequently add timer for idle handler int32_t _add_timer_delay_ms; + bool _is_read_timeout; }; static IgnoreAllRead* s_ignore_all_read = NULL; @@ -1098,47 +1123,37 @@ void Controller::HandleIdleProgressiveReader(void* arg) { LOG(ERROR) << "Controller::HandleIdleProgressiveReader arg is null."; return; } - auto* cntl = static_cast(arg); + ProgressiveTimeoutReader* reader = static_cast(arg); + SocketUniquePtr s; + if (Socket::Address(reader->GetSocketId(), &s) != 0) { + LOG(ERROR) << "not found the socket id : " << reader->GetSocketId(); + return; + } auto log_idle = FLAGS_log_idle_progressive_read_close; - int64_t progressive_read_timeout_us = cntl->_progressive_read_timeout_ms * 1000; - std::vector remove_socket_ids; - butil::AutoLock guard(cntl->_progressive_read_lock); - auto socketIds = cntl->_checking_progressive_read_fds; - for (auto socket_id : socketIds){ - SocketUniquePtr s; - if (Socket::Address(socket_id, &s) == 0) { - int64_t pre_idle_duration_us = 0; - int64_t idle_duration_us = butil::cpuwide_time_us() - s->last_active_time_us(); - while (progressive_read_timeout_us > idle_duration_us && idle_duration_us > pre_idle_duration_us) { - auto sleep_ms = (progressive_read_timeout_us - idle_duration_us) / 1000; - bthread_usleep(sleep_ms > 0 ? sleep_ms : 1); - pre_idle_duration_us = idle_duration_us; - idle_duration_us = butil::cpuwide_time_us() - s->last_active_time_us(); - } - if (idle_duration_us <= pre_idle_duration_us) { - LOG_IF(INFO, log_idle) << "stop pgressive read timeout checking process!" - << " progressive_read_timeout_us : " << progressive_read_timeout_us - << " idle_duration_us : " << idle_duration_us - << " pre_idle_duration_us : " << pre_idle_duration_us; - return; - } - LOG_IF(INFO, log_idle) << "progressive read timeout socket id : " << socket_id - << " progressive read timeout us : " << progressive_read_timeout_us - << " progressive read idle duration : " << idle_duration_us; - if (s->parsing_context() != NULL) { - s->parsing_context()->Destroy(); - } - s->ReleaseReferenceIfIdle(0); - cntl->CloseConnection("progressive read timeout"); - remove_socket_ids.push_back(socket_id); - } else { - LOG(ERROR) << "not found the socket id : " << socket_id; - remove_socket_ids.push_back(socket_id); - } + int64_t progressive_read_timeout_us = reader->read_timeout_ms() * 1000; + int64_t pre_idle_duration_us = 0; + int64_t idle_duration_us = butil::cpuwide_time_us() - s->last_active_time_us(); + while (progressive_read_timeout_us > idle_duration_us && idle_duration_us > pre_idle_duration_us) { + auto sleep_ms = (progressive_read_timeout_us - idle_duration_us) / 1000; + bthread_usleep(sleep_ms > 0 ? sleep_ms : 1); + pre_idle_duration_us = idle_duration_us; + idle_duration_us = butil::cpuwide_time_us() - s->last_active_time_us(); + } + if (idle_duration_us <= pre_idle_duration_us) { + LOG_IF(INFO, log_idle) << "stop progressive read timeout checking process!" + << " progressive_read_timeout_us : " << progressive_read_timeout_us + << " idle_duration_us : " << idle_duration_us + << " pre_idle_duration_us : " << pre_idle_duration_us; + return; } - for (auto remove_socket_id : remove_socket_ids) { - socketIds.erase(remove_socket_id); + reader->set_read_timeout(); + LOG_IF(INFO, log_idle) << "progressive read timeout socket id : " << reader->GetSocketId() + << " progressive read timeout us : " << progressive_read_timeout_us + << " progressive read idle duration : " << idle_duration_us; + if (s->parsing_context() != NULL) { + s->parsing_context()->Destroy(); } + s->ReleaseReferenceIfIdle(0); } void Controller::HandleSendFailed() { @@ -1293,11 +1308,6 @@ void Controller::IssueRPC(int64_t start_realtime_us) { // Tag the socket so that when the response comes back, the parser will // stop before reading all body. _current_call.sending_sock->read_will_be_progressive(_connection_type); - auto socket_id = _current_call.sending_sock->id(); - butil::AutoLock guard(_progressive_read_lock); - if (_progressive_read_timeout_ms > 0 && _checking_progressive_read_fds.seek(socket_id) == NULL) { - _checking_progressive_read_fds.insert(socket_id); - } } // Handle authentication @@ -1662,7 +1672,8 @@ void Controller::ReadProgressiveAttachmentBy(ProgressiveReader* r) { } add_flag(FLAGS_PROGRESSIVE_READER); if (progressive_read_timeout_ms() > 0) { - return _rpa->ReadProgressiveAttachmentBy(new ProgressiveTimeoutRead(this, r)); + auto reader = new ProgressiveTimeoutReader(_rpa->GetSocketId(), _progressive_read_timeout_ms, r); + return _rpa->ReadProgressiveAttachmentBy(reader); } return _rpa->ReadProgressiveAttachmentBy(r); } diff --git a/src/brpc/controller.h b/src/brpc/controller.h index 8c82a41496..f2d9f01fd3 100644 --- a/src/brpc/controller.h +++ b/src/brpc/controller.h @@ -842,8 +842,6 @@ friend void policy::ProcessThriftRequest(InputMessageBase*); int32_t _connect_timeout_ms; int32_t _backup_request_ms; int32_t _progressive_read_timeout_ms; - butil::FlatSet _checking_progressive_read_fds; - mutable butil::Lock _progressive_read_lock; // Priority: `_backup_request_policy' > `_backup_request_ms'. BackupRequestPolicy* _backup_request_policy; // If this rpc call has retry/backup request,this var save the real timeout for current call diff --git a/src/brpc/policy/http_rpc_protocol.cpp b/src/brpc/policy/http_rpc_protocol.cpp index 872c2897cc..d4dea9efb9 100644 --- a/src/brpc/policy/http_rpc_protocol.cpp +++ b/src/brpc/policy/http_rpc_protocol.cpp @@ -1197,6 +1197,7 @@ ParseResult ParseHttpMessage(butil::IOBuf *source, Socket *socket, LOG(FATAL) << "Fail to new HttpContext"; return MakeParseError(PARSE_ERROR_NO_RESOURCE); } + http_imsg->SetSocketId(socket->id()); // Parsing http is costly, parsing an incomplete http message from the // beginning repeatedly should be avoided, otherwise the cost may reach // O(n^2) in the worst case. Save incomplete http messages in sockets diff --git a/src/brpc/policy/http_rpc_protocol.h b/src/brpc/policy/http_rpc_protocol.h index bc8bd06593..ee0c7924c6 100644 --- a/src/brpc/policy/http_rpc_protocol.h +++ b/src/brpc/policy/http_rpc_protocol.h @@ -87,6 +87,14 @@ class HttpContext : public ReadableProgressiveAttachment , public InputMessageBase , public HttpMessage { public: + SocketId GetSocketId() override { + return _socket_id; + } + + void SetSocketId(SocketId id) override { + _socket_id = id; + } + explicit HttpContext(bool read_body_progressively, HttpMethod request_method = HTTP_METHOD_GET) : InputMessageBase() @@ -122,6 +130,7 @@ class HttpContext : public ReadableProgressiveAttachment private: bool _is_stage2; + SocketId _socket_id; }; // Implement functions required in protocol.h diff --git a/src/brpc/progressive_reader.h b/src/brpc/progressive_reader.h index 6f54ae68a7..799d996fdf 100644 --- a/src/brpc/progressive_reader.h +++ b/src/brpc/progressive_reader.h @@ -20,6 +20,7 @@ #define BRPC_PROGRESSIVE_READER_H #include "brpc/shared_object.h" +#include "brpc/socket.h" namespace brpc { @@ -84,6 +85,8 @@ class ReadableProgressiveAttachment : public SharedObject { // Any error occurred should destroy the reader by calling r->Destroy(). // r->Destroy() should be guaranteed to be called once and only once. virtual void ReadProgressiveAttachmentBy(ProgressiveReader* r) = 0; + virtual SocketId GetSocketId() = 0; + virtual void SetSocketId(SocketId id) = 0; }; } // namespace brpc From 8830eab09fcb8b25162c06b0d8c16106e6edaf5c Mon Sep 17 00:00:00 2001 From: Chuang Zhang Date: Sat, 6 Dec 2025 00:10:51 +0800 Subject: [PATCH 5/6] Prog read timeout dev merge (#4) * change the timeout checker bthread to timer bthread * refine ProgressiveReadTimeoutReader class hold SocketId and read_timeout_ms fields * refine socektId access method, change HandleIdleProgressiveReader belong and logic --- src/brpc/controller.cpp | 112 +++++++++------------------- src/brpc/controller.h | 1 - src/brpc/policy/http_rpc_protocol.h | 5 +- src/brpc/progressive_reader.h | 1 - 4 files changed, 38 insertions(+), 81 deletions(-) diff --git a/src/brpc/controller.cpp b/src/brpc/controller.cpp index a6f026761b..84d979d5ae 100644 --- a/src/brpc/controller.cpp +++ b/src/brpc/controller.cpp @@ -182,16 +182,20 @@ class ProgressiveTimeoutReader : public ProgressiveReader { _read_timeout_ms(read_timeout_ms), _reader(reader), _timeout_id(0), - _latest_add_timer_ms(0), - _add_timer_delay_ms(1000), _is_read_timeout(false) { AddIdleReadTimeoutMonitor(); } + + ~ProgressiveTimeoutReader() { + if(_timeout_id > 0) { + bthread_timer_del(_timeout_id); + } + } + butil::Status OnReadOnePart(const void* data, size_t length) { - auto status = _reader->OnReadOnePart(data, length); - AddIdleReadTimeoutMonitor(); - return status; + return _reader->OnReadOnePart(data, length); } + void OnEndOfMessage(const butil::Status& status) { if (_is_read_timeout) { _reader->OnEndOfMessage(butil::Status(ECONNRESET, "The progressive read timeout")); @@ -200,47 +204,40 @@ class ProgressiveTimeoutReader : public ProgressiveReader { } if(_timeout_id > 0) { bthread_timer_del(_timeout_id); + _timeout_id = 0; } } - SocketId GetSocketId() { - return _socket_id; - } - - int32_t read_timeout_ms() { - return _read_timeout_ms; - } - - void set_read_timeout(bool read_timeout = true) { - _is_read_timeout = read_timeout; - } - private: - void AddToTimer() { - if (_timeout_id > 0) { - bthread_timer_del(_timeout_id); - } - bthread_timer_add(&_timeout_id, - butil::milliseconds_from_now(_read_timeout_ms), - Controller::HandleIdleProgressiveReader, - this - ); - } - - void AddIdleReadTimeoutMonitor() { - if (_read_timeout_ms <= 0) { + static void HandleIdleProgressiveReader(void* arg) { + if(arg == nullptr){ + LOG(ERROR) << "Controller::HandleIdleProgressiveReader arg is null."; return; } - if(_read_timeout_ms < _add_timer_delay_ms) { - AddToTimer(); + ProgressiveTimeoutReader* reader = static_cast(arg); + SocketUniquePtr s; + if (Socket::Address(reader->_socket_id, &s) != 0) { + LOG(ERROR) << "not found the socket id : " << reader->_socket_id; return; } - auto current_ms = butil::cpuwide_time_ms(); - if (current_ms - _latest_add_timer_ms < _add_timer_delay_ms) { + auto log_idle = FLAGS_log_idle_progressive_read_close; + reader->_is_read_timeout = true; + LOG_IF(INFO, log_idle) << "progressive read timeout socket id : " << reader->_socket_id + << " progressive read timeout us : " << reader->_read_timeout_ms; + if (s->parsing_context() != NULL) { + s->parsing_context()->Destroy(); + } + s->ReleaseReferenceIfIdle(0); + } + void AddIdleReadTimeoutMonitor() { + if (_read_timeout_ms <= 0) { return; } - _latest_add_timer_ms = current_ms; - AddToTimer(); + bthread_timer_add(&_timeout_id, + butil::milliseconds_from_now(_read_timeout_ms), + HandleIdleProgressiveReader, + this + ); } private: @@ -249,10 +246,7 @@ class ProgressiveTimeoutReader : public ProgressiveReader { ProgressiveReader* _reader; // Timer registered to trigger progressive timeout event bthread_timer_t _timeout_id; - int64_t _latest_add_timer_ms; - // avoid frequently add timer for idle handler - int32_t _add_timer_delay_ms; - bool _is_read_timeout; + butil::atomic _is_read_timeout; }; static IgnoreAllRead* s_ignore_all_read = NULL; @@ -341,6 +335,7 @@ void Controller::ResetPods() { _backup_request_ms = UNSET_MAGIC_NUM; _backup_request_policy = NULL; _connect_timeout_ms = UNSET_MAGIC_NUM; + _progressive_read_timeout_ms = UNSET_MAGIC_NUM; _real_timeout_ms = UNSET_MAGIC_NUM; _deadline_us = -1; _timeout_id = 0; @@ -1118,43 +1113,6 @@ void Controller::SubmitSpan() { _span = NULL; } -void Controller::HandleIdleProgressiveReader(void* arg) { - if(arg == nullptr){ - LOG(ERROR) << "Controller::HandleIdleProgressiveReader arg is null."; - return; - } - ProgressiveTimeoutReader* reader = static_cast(arg); - SocketUniquePtr s; - if (Socket::Address(reader->GetSocketId(), &s) != 0) { - LOG(ERROR) << "not found the socket id : " << reader->GetSocketId(); - return; - } - auto log_idle = FLAGS_log_idle_progressive_read_close; - int64_t progressive_read_timeout_us = reader->read_timeout_ms() * 1000; - int64_t pre_idle_duration_us = 0; - int64_t idle_duration_us = butil::cpuwide_time_us() - s->last_active_time_us(); - while (progressive_read_timeout_us > idle_duration_us && idle_duration_us > pre_idle_duration_us) { - auto sleep_ms = (progressive_read_timeout_us - idle_duration_us) / 1000; - bthread_usleep(sleep_ms > 0 ? sleep_ms : 1); - pre_idle_duration_us = idle_duration_us; - idle_duration_us = butil::cpuwide_time_us() - s->last_active_time_us(); - } - if (idle_duration_us <= pre_idle_duration_us) { - LOG_IF(INFO, log_idle) << "stop progressive read timeout checking process!" - << " progressive_read_timeout_us : " << progressive_read_timeout_us - << " idle_duration_us : " << idle_duration_us - << " pre_idle_duration_us : " << pre_idle_duration_us; - return; - } - reader->set_read_timeout(); - LOG_IF(INFO, log_idle) << "progressive read timeout socket id : " << reader->GetSocketId() - << " progressive read timeout us : " << progressive_read_timeout_us - << " progressive read idle duration : " << idle_duration_us; - if (s->parsing_context() != NULL) { - s->parsing_context()->Destroy(); - } - s->ReleaseReferenceIfIdle(0); -} void Controller::HandleSendFailed() { if (!FailedInline()) { diff --git a/src/brpc/controller.h b/src/brpc/controller.h index f2d9f01fd3..7861b4cb37 100644 --- a/src/brpc/controller.h +++ b/src/brpc/controller.h @@ -162,7 +162,6 @@ friend void policy::ProcessThriftRequest(InputMessageBase*); uint64_t log_id; std::string request_id; }; - static void HandleIdleProgressiveReader(void* arg); public: Controller(); Controller(const Inheritable& parent_ctx); diff --git a/src/brpc/policy/http_rpc_protocol.h b/src/brpc/policy/http_rpc_protocol.h index ee0c7924c6..2b2e9296ab 100644 --- a/src/brpc/policy/http_rpc_protocol.h +++ b/src/brpc/policy/http_rpc_protocol.h @@ -91,7 +91,7 @@ class HttpContext : public ReadableProgressiveAttachment return _socket_id; } - void SetSocketId(SocketId id) override { + void SetSocketId(SocketId id) { _socket_id = id; } @@ -99,7 +99,8 @@ class HttpContext : public ReadableProgressiveAttachment HttpMethod request_method = HTTP_METHOD_GET) : InputMessageBase() , HttpMessage(read_body_progressively, request_method) - , _is_stage2(false) { + , _is_stage2(false) + , _socket_id(0) { // add one ref for Destroy butil::intrusive_ptr(this).detach(); } diff --git a/src/brpc/progressive_reader.h b/src/brpc/progressive_reader.h index 799d996fdf..860068e2e6 100644 --- a/src/brpc/progressive_reader.h +++ b/src/brpc/progressive_reader.h @@ -86,7 +86,6 @@ class ReadableProgressiveAttachment : public SharedObject { // r->Destroy() should be guaranteed to be called once and only once. virtual void ReadProgressiveAttachmentBy(ProgressiveReader* r) = 0; virtual SocketId GetSocketId() = 0; - virtual void SetSocketId(SocketId id) = 0; }; } // namespace brpc From 6f301da89761400ee86e4c1ccbf00c214f04840d Mon Sep 17 00:00:00 2001 From: Chuang Zhang Date: Wed, 10 Dec 2025 09:33:51 +0800 Subject: [PATCH 6/6] Prog read timeout dev (#5) * change the timeout checker bthread to timer bthread * refine ProgressiveReadTimeoutReader class hold SocketId and read_timeout_ms fields * refine socektId access method, change HandleIdleProgressiveReader belong and logic --- src/brpc/controller.cpp | 2 +- src/brpc/errno.proto | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/brpc/controller.cpp b/src/brpc/controller.cpp index 84d979d5ae..f4a813c87f 100644 --- a/src/brpc/controller.cpp +++ b/src/brpc/controller.cpp @@ -198,7 +198,7 @@ class ProgressiveTimeoutReader : public ProgressiveReader { void OnEndOfMessage(const butil::Status& status) { if (_is_read_timeout) { - _reader->OnEndOfMessage(butil::Status(ECONNRESET, "The progressive read timeout")); + _reader->OnEndOfMessage(butil::Status(EPROGREADTIMEOUT, "The progressive read timeout")); } else { _reader->OnEndOfMessage(status); } diff --git a/src/brpc/errno.proto b/src/brpc/errno.proto index 26ffadc201..45e0c00568 100644 --- a/src/brpc/errno.proto +++ b/src/brpc/errno.proto @@ -41,7 +41,8 @@ enum Errno { ESSL = 1016; // SSL related error EH2RUNOUTSTREAMS = 1017; // The H2 socket was run out of streams EREJECT = 1018; // The Request is rejected - + EPROGREADTIMEOUT = 1019; // The Progressive read timeout + // Errno caused by server EINTERNAL = 2001; // Internal Server Error ERESPONSE = 2002; // Bad Response