From 98baafd88f39e1a175220af954ac14e5a1cd8765 Mon Sep 17 00:00:00 2001 From: Pascal Bach Date: Wed, 19 Nov 2025 11:59:36 +0100 Subject: [PATCH 1/2] Add support for Server Sent Events (SSE) This extension allows to handle SSE events by invoking a callback whenever an event is received. --- README.md | 5 +- cpr/CMakeLists.txt | 1 + cpr/session.cpp | 11 +- cpr/sse.cpp | 115 +++++++++++++++++ cpr/util.cpp | 5 + include/cpr/cpr.h | 1 + include/cpr/error.h | 128 +++++++++---------- include/cpr/session.h | 4 + include/cpr/sse.h | 101 +++++++++++++++ include/cpr/util.h | 2 + test/CMakeLists.txt | 1 + test/sse_tests.cpp | 290 ++++++++++++++++++++++++++++++++++++++++++ 12 files changed, 596 insertions(+), 68 deletions(-) create mode 100644 cpr/sse.cpp create mode 100644 include/cpr/sse.h create mode 100644 test/sse_tests.cpp diff --git a/README.md b/README.md index b55a26144..44cef1b93 100644 --- a/README.md +++ b/README.md @@ -47,7 +47,7 @@ And here's [less functional, more complicated code, without cpr](https://gist.gi ## Documentation -[![Documentation](https://img.shields.io/badge/docs-online-informational?style=for-the-badge&link=https://docs.libcpr.dev/)](https://docs.libcpr.dev/) +[![Documentation](https://img.shields.io/badge/docs-online-informational?style=for-the-badge&link=https://docs.libcpr.dev/)](https://docs.libcpr.dev/) You can find the latest documentation [here](https://docs.libcpr.dev/). It's a work in progress, but it should give you a better idea of how to use the library than the [tests](https://github.com/libcpr/cpr/tree/master/test) currently do. ## Features @@ -76,6 +76,7 @@ C++ Requests currently supports: * PATCH methods * Thread Safe access to [libCurl](https://curl.haxx.se/libcurl/c/threadsafe.html) * OpenSSL and WinSSL support for HTTPS requests +* Server Sent Events (SSE) handling ## Planned @@ -146,7 +147,7 @@ ctest -VV # -VV is optional since it enables verbose output ``` ### Bazel -Please refer to [hedronvision/bazel-make-cc-https-easy](https://github.com/hedronvision/bazel-make-cc-https-easy) or +Please refer to [hedronvision/bazel-make-cc-https-easy](https://github.com/hedronvision/bazel-make-cc-https-easy) or `cpr` can be added as an extension by adding the following lines to your bazel MODULE file (tested with Bazel 8). Edit the versions as needed. ```starlark diff --git a/cpr/CMakeLists.txt b/cpr/CMakeLists.txt index 3bdf67f0b..d25e65af2 100644 --- a/cpr/CMakeLists.txt +++ b/cpr/CMakeLists.txt @@ -19,6 +19,7 @@ add_library(cpr proxies.cpp proxyauth.cpp session.cpp + sse.cpp threadpool.cpp timeout.cpp unix_socket.cpp diff --git a/cpr/session.cpp b/cpr/session.cpp index 8c93c56c3..a62c8dd53 100644 --- a/cpr/session.cpp +++ b/cpr/session.cpp @@ -233,7 +233,7 @@ void Session::prepareCommon() { // Set Content: prepareBodyPayloadOrMultipart(); - if (!cbs_->writecb_.callback) { + if (!cbs_->writecb_.callback && !cbs_->ssecb_.callback) { curl_easy_setopt(curl_->handle, CURLOPT_WRITEFUNCTION, cpr::util::writeFunction); curl_easy_setopt(curl_->handle, CURLOPT_WRITEDATA, &response_string_); } @@ -322,6 +322,12 @@ void Session::SetWriteCallback(const WriteCallback& write) { curl_easy_setopt(curl_->handle, CURLOPT_WRITEDATA, &cbs_->writecb_); } +void Session::SetServerSentEventCallback(const ServerSentEventCallback& sse) { + curl_easy_setopt(curl_->handle, CURLOPT_WRITEFUNCTION, cpr::util::writeSSEFunction); + cbs_->ssecb_ = sse; + curl_easy_setopt(curl_->handle, CURLOPT_WRITEDATA, &cbs_->ssecb_); +} + void Session::SetProgressCallback(const ProgressCallback& progress) { cbs_->progresscb_ = progress; if (isCancellable) { @@ -529,7 +535,7 @@ void Session::SetSslOptions(const SslOptions& options) { } } #if SUPPORT_CURLOPT_SSLCERT_BLOB - else if(!options.cert_blob.empty()) { + else if (!options.cert_blob.empty()) { std::string cert_blob(options.cert_blob); curl_blob blob{}; // NOLINTNEXTLINE (readability-container-data-pointer) @@ -1079,6 +1085,7 @@ void Session::SetOption(const HeaderCallback& header) { SetHeaderCallback(header void Session::SetOption(const WriteCallback& write) { SetWriteCallback(write); } void Session::SetOption(const ProgressCallback& progress) { SetProgressCallback(progress); } void Session::SetOption(const DebugCallback& debug) { SetDebugCallback(debug); } +void Session::SetOption(const ServerSentEventCallback& sse) { SetServerSentEventCallback(sse); } void Session::SetOption(const Url& url) { SetUrl(url); } void Session::SetOption(const Parameters& parameters) { SetParameters(parameters); } void Session::SetOption(Parameters&& parameters) { SetParameters(std::move(parameters)); } diff --git a/cpr/sse.cpp b/cpr/sse.cpp new file mode 100644 index 000000000..146915726 --- /dev/null +++ b/cpr/sse.cpp @@ -0,0 +1,115 @@ +#include "cpr/sse.h" + +#include +#include +#include + +namespace cpr { + +bool ServerSentEventParser::parse(std::string_view data, const std::function& callback) { + // Append incoming data to buffer + buffer_.append(data); + + // Process complete lines + size_t pos = 0; + while ((pos = buffer_.find('\n')) != std::string::npos) { + std::string line = buffer_.substr(0, pos); + buffer_.erase(0, pos + 1); + + // Remove trailing \r if present (handles both \n and \r\n) + if (!line.empty() && line.back() == '\r') { + line.pop_back(); + } + + if (!processLine(line, callback)) { + return false; + } + } + + return true; +} + +void ServerSentEventParser::reset() { + buffer_.clear(); + current_event_ = ServerSentEvent(); +} + +bool ServerSentEventParser::processLine(const std::string& line, const std::function& callback) { + // Empty line means end of event + if (line.empty()) { + return dispatchEvent(callback); + } + + // Lines starting with ':' are comments, ignore them + if (line[0] == ':') { + return true; + } + + // Find the colon separator + size_t colon_pos = line.find(':'); + + std::string field; + std::string value; + + if (colon_pos == std::string::npos) { + // No colon, entire line is the field name + field = line; + value = ""; + } else { + field = line.substr(0, colon_pos); + // Skip the colon and optional leading space + size_t value_start = colon_pos + 1; + if (value_start < line.size() && line[value_start] == ' ') { + value_start++; + } + value = line.substr(value_start); + } + + // Process the field + if (field == "event") { + current_event_.event = value; + } else if (field == "data") { + // Multiple data fields are concatenated with newlines + if (!current_event_.data.empty()) { + current_event_.data += '\n'; + } + current_event_.data += value; + } else if (field == "id") { + // Only set id if the value doesn't contain null character + if (value.find('\0') == std::string::npos) { + current_event_.id = value; + } + } else if (field == "retry") { + // Parse retry value as integer + size_t retry_value = 0; + auto [ptr, ec] = std::from_chars(value.data(), value.data() + value.size(), retry_value); + if (ec == std::errc()) { + current_event_.retry = retry_value; + } + } + // Unknown fields are ignored per spec + + return true; +} + +bool ServerSentEventParser::dispatchEvent(const std::function& callback) { + // Don't dispatch if data is empty + if (current_event_.data.empty()) { + current_event_ = ServerSentEvent(); + return true; + } + + // Invoke callback with the current event + bool continue_parsing = callback(std::move(current_event_)); + + // Reset for next event (but keep event type as "message") + current_event_ = ServerSentEvent(); + + return continue_parsing; +} + +bool ServerSentEventCallback::handleData(std::string_view data) { + return parser_.parse(data, [this](ServerSentEvent&& event) { return (*this)(std::move(event)); }); +} + +} // namespace cpr diff --git a/cpr/util.cpp b/cpr/util.cpp index c622450ea..36a2b6750 100644 --- a/cpr/util.cpp +++ b/cpr/util.cpp @@ -152,6 +152,11 @@ size_t writeUserFunction(char* ptr, size_t size, size_t nmemb, const WriteCallba return (*write)({ptr, size}) ? size : 0; } +size_t writeSSEFunction(char* ptr, size_t size, size_t nmemb, ServerSentEventCallback* sse) { + size *= nmemb; + return sse->handleData({ptr, size}) ? size : 0; +} + int debugUserFunction(CURL* /*handle*/, curl_infotype type, char* data, size_t size, const DebugCallback* debug) { (*debug)(static_cast(type), std::string(data, size)); return 0; diff --git a/include/cpr/cpr.h b/include/cpr/cpr.h index 5c20f7ddb..a42058fbd 100644 --- a/include/cpr/cpr.h +++ b/include/cpr/cpr.h @@ -33,6 +33,7 @@ #include "cpr/resolve.h" #include "cpr/response.h" #include "cpr/session.h" +#include "cpr/sse.h" #include "cpr/ssl_ctx.h" #include "cpr/ssl_options.h" #include "cpr/status_codes.h" diff --git a/include/cpr/error.h b/include/cpr/error.h index 6f598f161..7613b8698 100644 --- a/include/cpr/error.h +++ b/include/cpr/error.h @@ -1,9 +1,9 @@ #ifndef CPR_ERROR_H #define CPR_ERROR_H -#include #include #include +#include #include "cpr/cprtypes.h" #include @@ -91,69 +91,69 @@ enum class ErrorCode { }; inline const std::unordered_map error_code_to_string_mapping = {{ErrorCode::OK, "OK"}, - {ErrorCode::UNSUPPORTED_PROTOCOL, "UNSUPPORTED_PROTOCOL"}, - {ErrorCode::FAILED_INIT, "FAILED_INIT"}, - {ErrorCode::URL_MALFORMAT, "URL_MALFORMAT"}, - {ErrorCode::NOT_BUILT_IN, "NOT_BUILT_IN"}, - {ErrorCode::COULDNT_RESOLVE_PROXY, "COULDNT_RESOLVE_PROXY"}, - {ErrorCode::COULDNT_RESOLVE_HOST, "COULDNT_RESOLVE_HOST"}, - {ErrorCode::COULDNT_CONNECT, "COULDNT_CONNECT"}, - {ErrorCode::WEIRD_SERVER_REPLY, "WEIRD_SERVER_REPLY"}, - {ErrorCode::REMOTE_ACCESS_DENIED, "REMOTE_ACCESS_DENIED"}, - {ErrorCode::HTTP2, "HTTP2"}, - {ErrorCode::PARTIAL_FILE, "PARTIAL_FILE"}, - {ErrorCode::QUOTE_ERROR, "QUOTE_ERROR"}, - {ErrorCode::HTTP_RETURNED_ERROR, "HTTP_RETURNED_ERROR"}, - {ErrorCode::WRITE_ERROR, "WRITE_ERROR"}, - {ErrorCode::UPLOAD_FAILED, "UPLOAD_FAILED"}, - {ErrorCode::READ_ERROR, "READ_ERROR"}, - {ErrorCode::OUT_OF_MEMORY, "OUT_OF_MEMORY"}, - {ErrorCode::OPERATION_TIMEDOUT, "OPERATION_TIMEDOUT"}, - {ErrorCode::RANGE_ERROR, "RANGE_ERROR"}, - {ErrorCode::HTTP_POST_ERROR, "HTTP_POST_ERROR"}, - {ErrorCode::SSL_CONNECT_ERROR, "SSL_CONNECT_ERROR"}, - {ErrorCode::BAD_DOWNLOAD_RESUME, "BAD_DOWNLOAD_RESUME"}, - {ErrorCode::FILE_COULDNT_READ_FILE, "FILE_COULDNT_READ_FILE"}, - {ErrorCode::FUNCTION_NOT_FOUND, "FUNCTION_NOT_FOUND"}, - {ErrorCode::ABORTED_BY_CALLBACK, "ABORTED_BY_CALLBACK"}, - {ErrorCode::BAD_FUNCTION_ARGUMENT, "BAD_FUNCTION_ARGUMENT"}, - {ErrorCode::INTERFACE_FAILED, "INTERFACE_FAILED"}, - {ErrorCode::TOO_MANY_REDIRECTS, "TOO_MANY_REDIRECTS"}, - {ErrorCode::UNKNOWN_OPTION, "UNKNOWN_OPTION"}, - {ErrorCode::SETOPT_OPTION_SYNTAX, "SETOPT_OPTION_SYNTAX"}, - {ErrorCode::GOT_NOTHING, "GOT_NOTHING"}, - {ErrorCode::SSL_ENGINE_NOTFOUND, "SSL_ENGINE_NOTFOUND"}, - {ErrorCode::SSL_ENGINE_SETFAILED, "SSL_ENGINE_SETFAILED"}, - {ErrorCode::SEND_ERROR, "SEND_ERROR"}, - {ErrorCode::RECV_ERROR, "RECV_ERROR"}, - {ErrorCode::SSL_CERTPROBLEM, "SSL_CERTPROBLEM"}, - {ErrorCode::SSL_CIPHER, "SSL_CIPHER"}, - {ErrorCode::PEER_FAILED_VERIFICATION, "PEER_FAILED_VERIFICATION"}, - {ErrorCode::BAD_CONTENT_ENCODING, "BAD_CONTENT_ENCODING"}, - {ErrorCode::FILESIZE_EXCEEDED, "FILESIZE_EXCEEDED"}, - {ErrorCode::USE_SSL_FAILED, "USE_SSL_FAILED"}, - {ErrorCode::SEND_FAIL_REWIND, "SEND_FAIL_REWIND"}, - {ErrorCode::SSL_ENGINE_INITFAILED, "SSL_ENGINE_INITFAILED"}, - {ErrorCode::LOGIN_DENIED, "LOGIN_DENIED"}, - {ErrorCode::SSL_CACERT_BADFILE, "SSL_CACERT_BADFILE"}, - {ErrorCode::SSL_SHUTDOWN_FAILED, "SSL_SHUTDOWN_FAILED"}, - {ErrorCode::AGAIN, "AGAIN"}, - {ErrorCode::SSL_CRL_BADFILE, "SSL_CRL_BADFILE"}, - {ErrorCode::SSL_ISSUER_ERROR, "SSL_ISSUER_ERROR"}, - {ErrorCode::CHUNK_FAILED, "CHUNK_FAILED"}, - {ErrorCode::NO_CONNECTION_AVAILABLE, "NO_CONNECTION_AVAILABLE"}, - {ErrorCode::SSL_PINNEDPUBKEYNOTMATCH, "SSL_PINNEDPUBKEYNOTMATCH"}, - {ErrorCode::SSL_INVALIDCERTSTATUS, "SSL_INVALIDCERTSTATUS"}, - {ErrorCode::HTTP2_STREAM, "HTTP2_STREAM"}, - {ErrorCode::RECURSIVE_API_CALL, "RECURSIVE_API_CALL"}, - {ErrorCode::AUTH_ERROR, "AUTH_ERROR"}, - {ErrorCode::HTTP3, "HTTP3"}, - {ErrorCode::QUIC_CONNECT_ERROR, "QUIC_CONNECT_ERROR"}, - {ErrorCode::PROXY, "PROXY"}, - {ErrorCode::SSL_CLIENTCERT, "SSL_CLIENTCERT"}, - {ErrorCode::UNRECOVERABLE_POLL, "UNRECOVERABLE_POLL"}, - {ErrorCode::TOO_LARGE, "TOO_LARGE"}, - {ErrorCode::UNKNOWN_ERROR, "UNKNOWN_ERROR"}}; + {ErrorCode::UNSUPPORTED_PROTOCOL, "UNSUPPORTED_PROTOCOL"}, + {ErrorCode::FAILED_INIT, "FAILED_INIT"}, + {ErrorCode::URL_MALFORMAT, "URL_MALFORMAT"}, + {ErrorCode::NOT_BUILT_IN, "NOT_BUILT_IN"}, + {ErrorCode::COULDNT_RESOLVE_PROXY, "COULDNT_RESOLVE_PROXY"}, + {ErrorCode::COULDNT_RESOLVE_HOST, "COULDNT_RESOLVE_HOST"}, + {ErrorCode::COULDNT_CONNECT, "COULDNT_CONNECT"}, + {ErrorCode::WEIRD_SERVER_REPLY, "WEIRD_SERVER_REPLY"}, + {ErrorCode::REMOTE_ACCESS_DENIED, "REMOTE_ACCESS_DENIED"}, + {ErrorCode::HTTP2, "HTTP2"}, + {ErrorCode::PARTIAL_FILE, "PARTIAL_FILE"}, + {ErrorCode::QUOTE_ERROR, "QUOTE_ERROR"}, + {ErrorCode::HTTP_RETURNED_ERROR, "HTTP_RETURNED_ERROR"}, + {ErrorCode::WRITE_ERROR, "WRITE_ERROR"}, + {ErrorCode::UPLOAD_FAILED, "UPLOAD_FAILED"}, + {ErrorCode::READ_ERROR, "READ_ERROR"}, + {ErrorCode::OUT_OF_MEMORY, "OUT_OF_MEMORY"}, + {ErrorCode::OPERATION_TIMEDOUT, "OPERATION_TIMEDOUT"}, + {ErrorCode::RANGE_ERROR, "RANGE_ERROR"}, + {ErrorCode::HTTP_POST_ERROR, "HTTP_POST_ERROR"}, + {ErrorCode::SSL_CONNECT_ERROR, "SSL_CONNECT_ERROR"}, + {ErrorCode::BAD_DOWNLOAD_RESUME, "BAD_DOWNLOAD_RESUME"}, + {ErrorCode::FILE_COULDNT_READ_FILE, "FILE_COULDNT_READ_FILE"}, + {ErrorCode::FUNCTION_NOT_FOUND, "FUNCTION_NOT_FOUND"}, + {ErrorCode::ABORTED_BY_CALLBACK, "ABORTED_BY_CALLBACK"}, + {ErrorCode::BAD_FUNCTION_ARGUMENT, "BAD_FUNCTION_ARGUMENT"}, + {ErrorCode::INTERFACE_FAILED, "INTERFACE_FAILED"}, + {ErrorCode::TOO_MANY_REDIRECTS, "TOO_MANY_REDIRECTS"}, + {ErrorCode::UNKNOWN_OPTION, "UNKNOWN_OPTION"}, + {ErrorCode::SETOPT_OPTION_SYNTAX, "SETOPT_OPTION_SYNTAX"}, + {ErrorCode::GOT_NOTHING, "GOT_NOTHING"}, + {ErrorCode::SSL_ENGINE_NOTFOUND, "SSL_ENGINE_NOTFOUND"}, + {ErrorCode::SSL_ENGINE_SETFAILED, "SSL_ENGINE_SETFAILED"}, + {ErrorCode::SEND_ERROR, "SEND_ERROR"}, + {ErrorCode::RECV_ERROR, "RECV_ERROR"}, + {ErrorCode::SSL_CERTPROBLEM, "SSL_CERTPROBLEM"}, + {ErrorCode::SSL_CIPHER, "SSL_CIPHER"}, + {ErrorCode::PEER_FAILED_VERIFICATION, "PEER_FAILED_VERIFICATION"}, + {ErrorCode::BAD_CONTENT_ENCODING, "BAD_CONTENT_ENCODING"}, + {ErrorCode::FILESIZE_EXCEEDED, "FILESIZE_EXCEEDED"}, + {ErrorCode::USE_SSL_FAILED, "USE_SSL_FAILED"}, + {ErrorCode::SEND_FAIL_REWIND, "SEND_FAIL_REWIND"}, + {ErrorCode::SSL_ENGINE_INITFAILED, "SSL_ENGINE_INITFAILED"}, + {ErrorCode::LOGIN_DENIED, "LOGIN_DENIED"}, + {ErrorCode::SSL_CACERT_BADFILE, "SSL_CACERT_BADFILE"}, + {ErrorCode::SSL_SHUTDOWN_FAILED, "SSL_SHUTDOWN_FAILED"}, + {ErrorCode::AGAIN, "AGAIN"}, + {ErrorCode::SSL_CRL_BADFILE, "SSL_CRL_BADFILE"}, + {ErrorCode::SSL_ISSUER_ERROR, "SSL_ISSUER_ERROR"}, + {ErrorCode::CHUNK_FAILED, "CHUNK_FAILED"}, + {ErrorCode::NO_CONNECTION_AVAILABLE, "NO_CONNECTION_AVAILABLE"}, + {ErrorCode::SSL_PINNEDPUBKEYNOTMATCH, "SSL_PINNEDPUBKEYNOTMATCH"}, + {ErrorCode::SSL_INVALIDCERTSTATUS, "SSL_INVALIDCERTSTATUS"}, + {ErrorCode::HTTP2_STREAM, "HTTP2_STREAM"}, + {ErrorCode::RECURSIVE_API_CALL, "RECURSIVE_API_CALL"}, + {ErrorCode::AUTH_ERROR, "AUTH_ERROR"}, + {ErrorCode::HTTP3, "HTTP3"}, + {ErrorCode::QUIC_CONNECT_ERROR, "QUIC_CONNECT_ERROR"}, + {ErrorCode::PROXY, "PROXY"}, + {ErrorCode::SSL_CLIENTCERT, "SSL_CLIENTCERT"}, + {ErrorCode::UNRECOVERABLE_POLL, "UNRECOVERABLE_POLL"}, + {ErrorCode::TOO_LARGE, "TOO_LARGE"}, + {ErrorCode::UNKNOWN_ERROR, "UNKNOWN_ERROR"}}; class Error { public: diff --git a/include/cpr/session.h b/include/cpr/session.h index 483b7ef2d..0781559f9 100644 --- a/include/cpr/session.h +++ b/include/cpr/session.h @@ -38,6 +38,7 @@ #include "cpr/reserve_size.h" #include "cpr/resolve.h" #include "cpr/response.h" +#include "cpr/sse.h" #include "cpr/ssl_options.h" #include "cpr/timeout.h" #include "cpr/unix_socket.h" @@ -103,6 +104,7 @@ class Session : public std::enable_shared_from_this { void SetWriteCallback(const WriteCallback& write); void SetProgressCallback(const ProgressCallback& progress); void SetDebugCallback(const DebugCallback& debug); + void SetServerSentEventCallback(const ServerSentEventCallback& sse); void SetVerbose(const Verbose& verbose); void SetInterface(const Interface& iface); void SetLocalPort(const LocalPort& local_port); @@ -165,6 +167,7 @@ class Session : public std::enable_shared_from_this { void SetOption(const WriteCallback& write); void SetOption(const ProgressCallback& progress); void SetOption(const DebugCallback& debug); + void SetOption(const ServerSentEventCallback& sse); void SetOption(const LowSpeed& low_speed); void SetOption(const VerifySsl& verify); void SetOption(const Verbose& verbose); @@ -276,6 +279,7 @@ class Session : public std::enable_shared_from_this { ProgressCallback progresscb_; DebugCallback debugcb_; CancellationCallback cancellationcb_; + ServerSentEventCallback ssecb_; }; std::unique_ptr cbs_{std::make_unique()}; diff --git a/include/cpr/sse.h b/include/cpr/sse.h new file mode 100644 index 000000000..bd6616330 --- /dev/null +++ b/include/cpr/sse.h @@ -0,0 +1,101 @@ +#ifndef CPR_SSE_H +#define CPR_SSE_H + +#include +#include +#include +#include +#include + +namespace cpr { + +/** + * Represents a Server-Sent Event (SSE) as defined in the HTML5 specification. + * https://html.spec.whatwg.org/multipage/server-sent-events.html + */ +struct ServerSentEvent { + /** + * The event ID. Can be used to track the last received event and resume from there. + */ + std::optional id; + + /** + * The event type. If not specified, defaults to "message". + */ + std::string event; + + /** + * The event data. Multiple data fields are concatenated with newlines. + */ + std::string data; + + /** + * The retry time in milliseconds. Used to set the reconnection time. + */ + std::optional retry; + + ServerSentEvent() : event("message") {} +}; + +/** + * Parser for Server-Sent Events (SSE) streams. + * This parser handles incoming SSE data according to the HTML5 specification. + */ +class ServerSentEventParser { + public: + ServerSentEventParser() = default; + + /** + * Parse incoming SSE data and invoke the callback for each complete event. + * @param data The incoming data chunk + * @param callback The callback to invoke for each parsed event + * @return true to continue receiving data, false to abort + */ + bool parse(std::string_view data, const std::function& callback); + + /** + * Reset the parser state. + */ + void reset(); + + private: + std::string buffer_; + ServerSentEvent current_event_; + + bool processLine(const std::string& line, const std::function& callback); + bool dispatchEvent(const std::function& callback); +}; + +/** + * Callback for handling Server-Sent Events. + * The callback receives each parsed SSE event and can return false to abort the connection. + */ +class ServerSentEventCallback { + public: + ServerSentEventCallback() = default; + // NOLINTNEXTLINE(google-explicit-constructor, hicpp-explicit-conversions) + ServerSentEventCallback(std::function p_callback, intptr_t p_userdata = 0) : userdata(p_userdata), callback(std::move(p_callback)) {} + + bool operator()(ServerSentEvent&& event) const { + if (!callback) { + return true; + } + return callback(std::move(event), userdata); + } + + /** + * Internal function used to handle raw data chunks and parse them into SSE events. + * This is called by the underlying write callback mechanism. + */ + bool handleData(std::string_view data); + + intptr_t userdata{}; + std::function callback; + + private: + ServerSentEventParser parser_; +}; + +} // namespace cpr + +#endif diff --git a/include/cpr/util.h b/include/cpr/util.h index 67a678121..4ffd02df6 100644 --- a/include/cpr/util.h +++ b/include/cpr/util.h @@ -10,6 +10,7 @@ #include "cpr/cookies.h" #include "cpr/cprtypes.h" #include "cpr/secure_string.h" +#include "cpr/sse.h" namespace cpr::util { @@ -20,6 +21,7 @@ size_t headerUserFunction(char* ptr, size_t size, size_t nmemb, const HeaderCall size_t writeFunction(char* ptr, size_t size, size_t nmemb, void* data); size_t writeFileFunction(char* ptr, size_t size, size_t nmemb, std::ofstream* file); size_t writeUserFunction(char* ptr, size_t size, size_t nmemb, const WriteCallback* write); +size_t writeSSEFunction(char* ptr, size_t size, size_t nmemb, ServerSentEventCallback* sse); template int progressUserFunction(const T* progress, cpr_pf_arg_t dltotal, cpr_pf_arg_t dlnow, cpr_pf_arg_t ultotal, cpr_pf_arg_t ulnow) { diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 14c3d0b90..3b624419c 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -71,6 +71,7 @@ add_cpr_test(singleton) add_cpr_test(threadpool) add_cpr_test(testUtils) add_cpr_test(connection_pool) +add_cpr_test(sse) if (ENABLE_SSL_TESTS) add_cpr_test(ssl) diff --git a/test/sse_tests.cpp b/test/sse_tests.cpp new file mode 100644 index 000000000..a63bdb294 --- /dev/null +++ b/test/sse_tests.cpp @@ -0,0 +1,290 @@ +#include + +#include +#include +#include +#include + +#include "cpr/cpr.h" +#include "cpr/sse.h" +#include "httpServer.hpp" + +using namespace cpr; + +static HttpServer* server = new HttpServer(); + +TEST(SSETests, SSEParserBasicTest) { + ServerSentEventParser parser; + std::vector events; + + std::string sse_data = "data: Hello World\n\n"; + + parser.parse(sse_data, [&events](ServerSentEvent&& event) { + events.push_back(std::move(event)); + return true; + }); + + ASSERT_EQ(events.size(), 1); + EXPECT_EQ(events[0].data, "Hello World"); + EXPECT_EQ(events[0].event, "message"); + EXPECT_FALSE(events[0].id.has_value()); + EXPECT_FALSE(events[0].retry.has_value()); +} + +TEST(SSETests, SSEParserMultilineDataTest) { + ServerSentEventParser parser; + std::vector events; + + std::string sse_data = "data: First line\ndata: Second line\n\n"; + + parser.parse(sse_data, [&events](ServerSentEvent&& event) { + events.push_back(std::move(event)); + return true; + }); + + ASSERT_EQ(events.size(), 1); + EXPECT_EQ(events[0].data, "First line\nSecond line"); + EXPECT_EQ(events[0].event, "message"); +} + +TEST(SSETests, SSEParserWithEventTypeTest) { + ServerSentEventParser parser; + std::vector events; + + std::string sse_data = "event: custom\ndata: Custom event data\n\n"; + + parser.parse(sse_data, [&events](ServerSentEvent&& event) { + events.push_back(std::move(event)); + return true; + }); + + ASSERT_EQ(events.size(), 1); + EXPECT_EQ(events[0].data, "Custom event data"); + EXPECT_EQ(events[0].event, "custom"); +} + +TEST(SSETests, SSEParserWithIdTest) { + ServerSentEventParser parser; + std::vector events; + + std::string sse_data = "id: 123\ndata: Event with ID\n\n"; + + parser.parse(sse_data, [&events](ServerSentEvent&& event) { + events.push_back(std::move(event)); + return true; + }); + + ASSERT_EQ(events.size(), 1); + EXPECT_EQ(events[0].data, "Event with ID"); + EXPECT_EQ(events[0].event, "message"); + ASSERT_TRUE(events[0].id.has_value()); + EXPECT_EQ(events[0].id.value(), "123"); +} + +TEST(SSETests, SSEParserWithRetryTest) { + ServerSentEventParser parser; + std::vector events; + + std::string sse_data = "retry: 5000\ndata: Event with retry\n\n"; + + parser.parse(sse_data, [&events](ServerSentEvent&& event) { + events.push_back(std::move(event)); + return true; + }); + + ASSERT_EQ(events.size(), 1); + EXPECT_EQ(events[0].data, "Event with retry"); + ASSERT_TRUE(events[0].retry.has_value()); + EXPECT_EQ(events[0].retry.value(), 5000); +} + +TEST(SSETests, SSEParserCompleteEventTest) { + ServerSentEventParser parser; + std::vector events; + + std::string sse_data = + "id: 42\n" + "event: update\n" + "retry: 3000\n" + "data: Complete event data\n" + "\n"; + + parser.parse(sse_data, [&events](ServerSentEvent&& event) { + events.push_back(std::move(event)); + return true; + }); + + ASSERT_EQ(events.size(), 1); + EXPECT_EQ(events[0].data, "Complete event data"); + EXPECT_EQ(events[0].event, "update"); + ASSERT_TRUE(events[0].id.has_value()); + EXPECT_EQ(events[0].id.value(), "42"); + ASSERT_TRUE(events[0].retry.has_value()); + EXPECT_EQ(events[0].retry.value(), 3000); +} + +TEST(SSETests, SSEParserMultipleEventsTest) { + ServerSentEventParser parser; + std::vector events; + + std::string sse_data = + "data: First event\n" + "\n" + "data: Second event\n" + "\n"; + + parser.parse(sse_data, [&events](ServerSentEvent&& event) { + events.push_back(std::move(event)); + return true; + }); + + ASSERT_EQ(events.size(), 2); + EXPECT_EQ(events[0].data, "First event"); + EXPECT_EQ(events[1].data, "Second event"); +} + +TEST(SSETests, SSEParserIgnoreCommentsTest) { + ServerSentEventParser parser; + std::vector events; + + std::string sse_data = + ": This is a comment\n" + "data: Event data\n" + ": Another comment\n" + "\n"; + + parser.parse(sse_data, [&events](ServerSentEvent&& event) { + events.push_back(std::move(event)); + return true; + }); + + ASSERT_EQ(events.size(), 1); + EXPECT_EQ(events[0].data, "Event data"); +} + +TEST(SSETests, SSEParserIgnoreEmptyDataTest) { + ServerSentEventParser parser; + std::vector events; + + std::string sse_data = + "event: test\n" + "\n"; // Event with no data should be ignored + + parser.parse(sse_data, [&events](ServerSentEvent&& event) { + events.push_back(std::move(event)); + return true; + }); + + ASSERT_EQ(events.size(), 0); +} + +TEST(SSETests, SSEParserChunkedDataTest) { + ServerSentEventParser parser; + std::vector events; + + // Simulate data arriving in chunks + parser.parse("data: Partial", [&events](ServerSentEvent&& event) { + events.push_back(std::move(event)); + return true; + }); + + EXPECT_EQ(events.size(), 0); // No complete event yet + + parser.parse(" event\n\n", [&events](ServerSentEvent&& event) { + events.push_back(std::move(event)); + return true; + }); + + ASSERT_EQ(events.size(), 1); + EXPECT_EQ(events[0].data, "Partial event"); +} + +TEST(SSETests, SSEParserCRLFTest) { + ServerSentEventParser parser; + std::vector events; + + std::string sse_data = "data: Windows line endings\r\n\r\n"; + + parser.parse(sse_data, [&events](ServerSentEvent&& event) { + events.push_back(std::move(event)); + return true; + }); + + ASSERT_EQ(events.size(), 1); + EXPECT_EQ(events[0].data, "Windows line endings"); +} + +TEST(SSETests, SSEParserFieldWithoutColonTest) { + ServerSentEventParser parser; + std::vector events; + + std::string sse_data = + "data: Some actual data\n" + "data\n" // Field without colon should have empty value, appended with newline + "\n"; + + parser.parse(sse_data, [&events](ServerSentEvent&& event) { + events.push_back(std::move(event)); + return true; + }); + + ASSERT_EQ(events.size(), 1); + EXPECT_EQ(events[0].data, "Some actual data\n"); +} + +TEST(SSETests, SSECallbackTest) { + ServerSentEventCallback callback( + [](ServerSentEvent&& /*event*/, intptr_t userdata) { + int* count = reinterpret_cast(userdata); + (*count)++; + return true; + }, + 0); + + EXPECT_TRUE(callback.callback); +} + +TEST(SSETests, SSECallbackHandleDataTest) { + std::vector events; + ServerSentEventCallback callback( + [&events](ServerSentEvent&& event, intptr_t /*userdata*/) { + events.push_back(std::move(event)); + return true; + }, + 0); + + std::string sse_data = "data: Test event\n\n"; + callback.handleData(sse_data); + + ASSERT_EQ(events.size(), 1); + EXPECT_EQ(events[0].data, "Test event"); +} + +TEST(SSETests, SSECallbackAbortTest) { + int event_count = 0; + ServerSentEventCallback callback( + [&event_count](ServerSentEvent&& /*event*/, intptr_t /*userdata*/) { + event_count++; + // Abort after second event + return event_count < 2; + }, + 0); + + std::string sse_data = + "data: First\n\n" + "data: Second\n\n" + "data: Third\n\n"; + + // First two events should be processed, third should cause abort + callback.handleData(sse_data); + + // The callback is called twice (returns true the first time, false the second) + // The parser stops after the callback returns false + EXPECT_EQ(event_count, 2); +} + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + ::testing::AddGlobalTestEnvironment(server); + return RUN_ALL_TESTS(); +} From 036329da80bae37e89bbc92b6ebf91a9a90974d0 Mon Sep 17 00:00:00 2001 From: Pascal Bach Date: Sat, 22 Nov 2025 23:29:23 +0100 Subject: [PATCH 2/2] Address lint errors --- .clang-tidy | 1 + cpr/sse.cpp | 15 ++++++++++----- cpr/util.cpp | 1 + include/cpr/sse.h | 5 +++-- 4 files changed, 15 insertions(+), 7 deletions(-) diff --git a/.clang-tidy b/.clang-tidy index 0db89ae0e..55570bf18 100644 --- a/.clang-tidy +++ b/.clang-tidy @@ -32,6 +32,7 @@ Checks: '*, -altera-id-dependent-backward-branch, -bugprone-easily-swappable-parameters, -modernize-return-braced-init-list, +-abseil-string-find-str-contains, -cppcoreguidelines-avoid-magic-numbers, -readability-magic-numbers, -cppcoreguidelines-avoid-do-while, diff --git a/cpr/sse.cpp b/cpr/sse.cpp index 146915726..c352fae35 100644 --- a/cpr/sse.cpp +++ b/cpr/sse.cpp @@ -1,8 +1,12 @@ #include "cpr/sse.h" -#include -#include #include +#include +#include +#include +#include +#include +#include namespace cpr { @@ -46,7 +50,7 @@ bool ServerSentEventParser::processLine(const std::string& line, const std::func } // Find the colon separator - size_t colon_pos = line.find(':'); + const size_t colon_pos = line.find(':'); std::string field; std::string value; @@ -82,7 +86,8 @@ bool ServerSentEventParser::processLine(const std::string& line, const std::func } else if (field == "retry") { // Parse retry value as integer size_t retry_value = 0; - auto [ptr, ec] = std::from_chars(value.data(), value.data() + value.size(), retry_value); + const std::string_view sv(value); + auto [ptr, ec] = std::from_chars(sv.begin(), sv.end(), retry_value); if (ec == std::errc()) { current_event_.retry = retry_value; } @@ -100,7 +105,7 @@ bool ServerSentEventParser::dispatchEvent(const std::function #include #include diff --git a/include/cpr/sse.h b/include/cpr/sse.h index bd6616330..2a137b361 100644 --- a/include/cpr/sse.h +++ b/include/cpr/sse.h @@ -3,6 +3,7 @@ #include #include +#include #include #include #include @@ -22,7 +23,7 @@ struct ServerSentEvent { /** * The event type. If not specified, defaults to "message". */ - std::string event; + std::string event{"message"}; /** * The event data. Multiple data fields are concatenated with newlines. @@ -34,7 +35,7 @@ struct ServerSentEvent { */ std::optional retry; - ServerSentEvent() : event("message") {} + ServerSentEvent() = default; }; /**