diff --git a/.clang-tidy b/.clang-tidy index 0db89ae0e..55570bf18 100644 --- a/.clang-tidy +++ b/.clang-tidy @@ -32,6 +32,7 @@ Checks: '*, -altera-id-dependent-backward-branch, -bugprone-easily-swappable-parameters, -modernize-return-braced-init-list, +-abseil-string-find-str-contains, -cppcoreguidelines-avoid-magic-numbers, -readability-magic-numbers, -cppcoreguidelines-avoid-do-while, diff --git a/README.md b/README.md index 644dcdb0f..c82534f7e 100644 --- a/README.md +++ b/README.md @@ -47,7 +47,7 @@ And here's [less functional, more complicated code, without cpr](https://gist.gi ## Documentation -[![Documentation](https://img.shields.io/badge/docs-online-informational?style=for-the-badge&link=https://docs.libcpr.dev/)](https://docs.libcpr.dev/) +[![Documentation](https://img.shields.io/badge/docs-online-informational?style=for-the-badge&link=https://docs.libcpr.dev/)](https://docs.libcpr.dev/) You can find the latest documentation [here](https://docs.libcpr.dev/). It's a work in progress, but it should give you a better idea of how to use the library than the [tests](https://github.com/libcpr/cpr/tree/master/test) currently do. ## Features @@ -76,6 +76,7 @@ C++ Requests currently supports: * PATCH methods * Thread Safe access to [libCurl](https://curl.haxx.se/libcurl/c/threadsafe.html) * OpenSSL and WinSSL support for HTTPS requests +* Server Sent Events (SSE) handling ## Planned @@ -146,7 +147,7 @@ ctest -VV # -VV is optional since it enables verbose output ``` ### Bazel -Please refer to [hedronvision/bazel-make-cc-https-easy](https://github.com/hedronvision/bazel-make-cc-https-easy) or +Please refer to [hedronvision/bazel-make-cc-https-easy](https://github.com/hedronvision/bazel-make-cc-https-easy) or `cpr` can be added as an extension by adding the following lines to your bazel MODULE file (tested with Bazel 8). Edit the versions as needed. ```starlark diff --git a/cpr/CMakeLists.txt b/cpr/CMakeLists.txt index 3bdf67f0b..d25e65af2 100644 --- a/cpr/CMakeLists.txt +++ b/cpr/CMakeLists.txt @@ -19,6 +19,7 @@ add_library(cpr proxies.cpp proxyauth.cpp session.cpp + sse.cpp threadpool.cpp timeout.cpp unix_socket.cpp diff --git a/cpr/session.cpp b/cpr/session.cpp index 8c93c56c3..a62c8dd53 100644 --- a/cpr/session.cpp +++ b/cpr/session.cpp @@ -233,7 +233,7 @@ void Session::prepareCommon() { // Set Content: prepareBodyPayloadOrMultipart(); - if (!cbs_->writecb_.callback) { + if (!cbs_->writecb_.callback && !cbs_->ssecb_.callback) { curl_easy_setopt(curl_->handle, CURLOPT_WRITEFUNCTION, cpr::util::writeFunction); curl_easy_setopt(curl_->handle, CURLOPT_WRITEDATA, &response_string_); } @@ -322,6 +322,12 @@ void Session::SetWriteCallback(const WriteCallback& write) { curl_easy_setopt(curl_->handle, CURLOPT_WRITEDATA, &cbs_->writecb_); } +void Session::SetServerSentEventCallback(const ServerSentEventCallback& sse) { + curl_easy_setopt(curl_->handle, CURLOPT_WRITEFUNCTION, cpr::util::writeSSEFunction); + cbs_->ssecb_ = sse; + curl_easy_setopt(curl_->handle, CURLOPT_WRITEDATA, &cbs_->ssecb_); +} + void Session::SetProgressCallback(const ProgressCallback& progress) { cbs_->progresscb_ = progress; if (isCancellable) { @@ -529,7 +535,7 @@ void Session::SetSslOptions(const SslOptions& options) { } } #if SUPPORT_CURLOPT_SSLCERT_BLOB - else if(!options.cert_blob.empty()) { + else if (!options.cert_blob.empty()) { std::string cert_blob(options.cert_blob); curl_blob blob{}; // NOLINTNEXTLINE (readability-container-data-pointer) @@ -1079,6 +1085,7 @@ void Session::SetOption(const HeaderCallback& header) { SetHeaderCallback(header void Session::SetOption(const WriteCallback& write) { SetWriteCallback(write); } void Session::SetOption(const ProgressCallback& progress) { SetProgressCallback(progress); } void Session::SetOption(const DebugCallback& debug) { SetDebugCallback(debug); } +void Session::SetOption(const ServerSentEventCallback& sse) { SetServerSentEventCallback(sse); } void Session::SetOption(const Url& url) { SetUrl(url); } void Session::SetOption(const Parameters& parameters) { SetParameters(parameters); } void Session::SetOption(Parameters&& parameters) { SetParameters(std::move(parameters)); } diff --git a/cpr/sse.cpp b/cpr/sse.cpp new file mode 100644 index 000000000..c352fae35 --- /dev/null +++ b/cpr/sse.cpp @@ -0,0 +1,120 @@ +#include "cpr/sse.h" + +#include +#include +#include +#include +#include +#include +#include + +namespace cpr { + +bool ServerSentEventParser::parse(std::string_view data, const std::function& callback) { + // Append incoming data to buffer + buffer_.append(data); + + // Process complete lines + size_t pos = 0; + while ((pos = buffer_.find('\n')) != std::string::npos) { + std::string line = buffer_.substr(0, pos); + buffer_.erase(0, pos + 1); + + // Remove trailing \r if present (handles both \n and \r\n) + if (!line.empty() && line.back() == '\r') { + line.pop_back(); + } + + if (!processLine(line, callback)) { + return false; + } + } + + return true; +} + +void ServerSentEventParser::reset() { + buffer_.clear(); + current_event_ = ServerSentEvent(); +} + +bool ServerSentEventParser::processLine(const std::string& line, const std::function& callback) { + // Empty line means end of event + if (line.empty()) { + return dispatchEvent(callback); + } + + // Lines starting with ':' are comments, ignore them + if (line[0] == ':') { + return true; + } + + // Find the colon separator + const size_t colon_pos = line.find(':'); + + std::string field; + std::string value; + + if (colon_pos == std::string::npos) { + // No colon, entire line is the field name + field = line; + value = ""; + } else { + field = line.substr(0, colon_pos); + // Skip the colon and optional leading space + size_t value_start = colon_pos + 1; + if (value_start < line.size() && line[value_start] == ' ') { + value_start++; + } + value = line.substr(value_start); + } + + // Process the field + if (field == "event") { + current_event_.event = value; + } else if (field == "data") { + // Multiple data fields are concatenated with newlines + if (!current_event_.data.empty()) { + current_event_.data += '\n'; + } + current_event_.data += value; + } else if (field == "id") { + // Only set id if the value doesn't contain null character + if (value.find('\0') == std::string::npos) { + current_event_.id = value; + } + } else if (field == "retry") { + // Parse retry value as integer + size_t retry_value = 0; + const std::string_view sv(value); + auto [ptr, ec] = std::from_chars(sv.begin(), sv.end(), retry_value); + if (ec == std::errc()) { + current_event_.retry = retry_value; + } + } + // Unknown fields are ignored per spec + + return true; +} + +bool ServerSentEventParser::dispatchEvent(const std::function& callback) { + // Don't dispatch if data is empty + if (current_event_.data.empty()) { + current_event_ = ServerSentEvent(); + return true; + } + + // Invoke callback with the current event + const bool continue_parsing = callback(std::move(current_event_)); + + // Reset for next event (but keep event type as "message") + current_event_ = ServerSentEvent(); + + return continue_parsing; +} + +bool ServerSentEventCallback::handleData(std::string_view data) { + return parser_.parse(data, [this](ServerSentEvent&& event) { return (*this)(std::move(event)); }); +} + +} // namespace cpr diff --git a/cpr/util.cpp b/cpr/util.cpp index c622450ea..8bf5426a2 100644 --- a/cpr/util.cpp +++ b/cpr/util.cpp @@ -4,6 +4,7 @@ #include "cpr/cprtypes.h" #include "cpr/curlholder.h" #include "cpr/secure_string.h" +#include "cpr/sse.h" #include #include #include @@ -152,6 +153,11 @@ size_t writeUserFunction(char* ptr, size_t size, size_t nmemb, const WriteCallba return (*write)({ptr, size}) ? size : 0; } +size_t writeSSEFunction(char* ptr, size_t size, size_t nmemb, ServerSentEventCallback* sse) { + size *= nmemb; + return sse->handleData({ptr, size}) ? size : 0; +} + int debugUserFunction(CURL* /*handle*/, curl_infotype type, char* data, size_t size, const DebugCallback* debug) { (*debug)(static_cast(type), std::string(data, size)); return 0; diff --git a/include/cpr/cpr.h b/include/cpr/cpr.h index 5c20f7ddb..a42058fbd 100644 --- a/include/cpr/cpr.h +++ b/include/cpr/cpr.h @@ -33,6 +33,7 @@ #include "cpr/resolve.h" #include "cpr/response.h" #include "cpr/session.h" +#include "cpr/sse.h" #include "cpr/ssl_ctx.h" #include "cpr/ssl_options.h" #include "cpr/status_codes.h" diff --git a/include/cpr/error.h b/include/cpr/error.h index 738d91040..0548ba9e4 100644 --- a/include/cpr/error.h +++ b/include/cpr/error.h @@ -1,9 +1,9 @@ #ifndef CPR_ERROR_H #define CPR_ERROR_H -#include #include #include +#include #include "cpr/cprtypes.h" #include diff --git a/include/cpr/session.h b/include/cpr/session.h index 483b7ef2d..0781559f9 100644 --- a/include/cpr/session.h +++ b/include/cpr/session.h @@ -38,6 +38,7 @@ #include "cpr/reserve_size.h" #include "cpr/resolve.h" #include "cpr/response.h" +#include "cpr/sse.h" #include "cpr/ssl_options.h" #include "cpr/timeout.h" #include "cpr/unix_socket.h" @@ -103,6 +104,7 @@ class Session : public std::enable_shared_from_this { void SetWriteCallback(const WriteCallback& write); void SetProgressCallback(const ProgressCallback& progress); void SetDebugCallback(const DebugCallback& debug); + void SetServerSentEventCallback(const ServerSentEventCallback& sse); void SetVerbose(const Verbose& verbose); void SetInterface(const Interface& iface); void SetLocalPort(const LocalPort& local_port); @@ -165,6 +167,7 @@ class Session : public std::enable_shared_from_this { void SetOption(const WriteCallback& write); void SetOption(const ProgressCallback& progress); void SetOption(const DebugCallback& debug); + void SetOption(const ServerSentEventCallback& sse); void SetOption(const LowSpeed& low_speed); void SetOption(const VerifySsl& verify); void SetOption(const Verbose& verbose); @@ -276,6 +279,7 @@ class Session : public std::enable_shared_from_this { ProgressCallback progresscb_; DebugCallback debugcb_; CancellationCallback cancellationcb_; + ServerSentEventCallback ssecb_; }; std::unique_ptr cbs_{std::make_unique()}; diff --git a/include/cpr/sse.h b/include/cpr/sse.h new file mode 100644 index 000000000..2a137b361 --- /dev/null +++ b/include/cpr/sse.h @@ -0,0 +1,102 @@ +#ifndef CPR_SSE_H +#define CPR_SSE_H + +#include +#include +#include +#include +#include +#include + +namespace cpr { + +/** + * Represents a Server-Sent Event (SSE) as defined in the HTML5 specification. + * https://html.spec.whatwg.org/multipage/server-sent-events.html + */ +struct ServerSentEvent { + /** + * The event ID. Can be used to track the last received event and resume from there. + */ + std::optional id; + + /** + * The event type. If not specified, defaults to "message". + */ + std::string event{"message"}; + + /** + * The event data. Multiple data fields are concatenated with newlines. + */ + std::string data; + + /** + * The retry time in milliseconds. Used to set the reconnection time. + */ + std::optional retry; + + ServerSentEvent() = default; +}; + +/** + * Parser for Server-Sent Events (SSE) streams. + * This parser handles incoming SSE data according to the HTML5 specification. + */ +class ServerSentEventParser { + public: + ServerSentEventParser() = default; + + /** + * Parse incoming SSE data and invoke the callback for each complete event. + * @param data The incoming data chunk + * @param callback The callback to invoke for each parsed event + * @return true to continue receiving data, false to abort + */ + bool parse(std::string_view data, const std::function& callback); + + /** + * Reset the parser state. + */ + void reset(); + + private: + std::string buffer_; + ServerSentEvent current_event_; + + bool processLine(const std::string& line, const std::function& callback); + bool dispatchEvent(const std::function& callback); +}; + +/** + * Callback for handling Server-Sent Events. + * The callback receives each parsed SSE event and can return false to abort the connection. + */ +class ServerSentEventCallback { + public: + ServerSentEventCallback() = default; + // NOLINTNEXTLINE(google-explicit-constructor, hicpp-explicit-conversions) + ServerSentEventCallback(std::function p_callback, intptr_t p_userdata = 0) : userdata(p_userdata), callback(std::move(p_callback)) {} + + bool operator()(ServerSentEvent&& event) const { + if (!callback) { + return true; + } + return callback(std::move(event), userdata); + } + + /** + * Internal function used to handle raw data chunks and parse them into SSE events. + * This is called by the underlying write callback mechanism. + */ + bool handleData(std::string_view data); + + intptr_t userdata{}; + std::function callback; + + private: + ServerSentEventParser parser_; +}; + +} // namespace cpr + +#endif diff --git a/include/cpr/util.h b/include/cpr/util.h index 67a678121..4ffd02df6 100644 --- a/include/cpr/util.h +++ b/include/cpr/util.h @@ -10,6 +10,7 @@ #include "cpr/cookies.h" #include "cpr/cprtypes.h" #include "cpr/secure_string.h" +#include "cpr/sse.h" namespace cpr::util { @@ -20,6 +21,7 @@ size_t headerUserFunction(char* ptr, size_t size, size_t nmemb, const HeaderCall size_t writeFunction(char* ptr, size_t size, size_t nmemb, void* data); size_t writeFileFunction(char* ptr, size_t size, size_t nmemb, std::ofstream* file); size_t writeUserFunction(char* ptr, size_t size, size_t nmemb, const WriteCallback* write); +size_t writeSSEFunction(char* ptr, size_t size, size_t nmemb, ServerSentEventCallback* sse); template int progressUserFunction(const T* progress, cpr_pf_arg_t dltotal, cpr_pf_arg_t dlnow, cpr_pf_arg_t ultotal, cpr_pf_arg_t ulnow) { diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 14c3d0b90..3b624419c 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -71,6 +71,7 @@ add_cpr_test(singleton) add_cpr_test(threadpool) add_cpr_test(testUtils) add_cpr_test(connection_pool) +add_cpr_test(sse) if (ENABLE_SSL_TESTS) add_cpr_test(ssl) diff --git a/test/sse_tests.cpp b/test/sse_tests.cpp new file mode 100644 index 000000000..a63bdb294 --- /dev/null +++ b/test/sse_tests.cpp @@ -0,0 +1,290 @@ +#include + +#include +#include +#include +#include + +#include "cpr/cpr.h" +#include "cpr/sse.h" +#include "httpServer.hpp" + +using namespace cpr; + +static HttpServer* server = new HttpServer(); + +TEST(SSETests, SSEParserBasicTest) { + ServerSentEventParser parser; + std::vector events; + + std::string sse_data = "data: Hello World\n\n"; + + parser.parse(sse_data, [&events](ServerSentEvent&& event) { + events.push_back(std::move(event)); + return true; + }); + + ASSERT_EQ(events.size(), 1); + EXPECT_EQ(events[0].data, "Hello World"); + EXPECT_EQ(events[0].event, "message"); + EXPECT_FALSE(events[0].id.has_value()); + EXPECT_FALSE(events[0].retry.has_value()); +} + +TEST(SSETests, SSEParserMultilineDataTest) { + ServerSentEventParser parser; + std::vector events; + + std::string sse_data = "data: First line\ndata: Second line\n\n"; + + parser.parse(sse_data, [&events](ServerSentEvent&& event) { + events.push_back(std::move(event)); + return true; + }); + + ASSERT_EQ(events.size(), 1); + EXPECT_EQ(events[0].data, "First line\nSecond line"); + EXPECT_EQ(events[0].event, "message"); +} + +TEST(SSETests, SSEParserWithEventTypeTest) { + ServerSentEventParser parser; + std::vector events; + + std::string sse_data = "event: custom\ndata: Custom event data\n\n"; + + parser.parse(sse_data, [&events](ServerSentEvent&& event) { + events.push_back(std::move(event)); + return true; + }); + + ASSERT_EQ(events.size(), 1); + EXPECT_EQ(events[0].data, "Custom event data"); + EXPECT_EQ(events[0].event, "custom"); +} + +TEST(SSETests, SSEParserWithIdTest) { + ServerSentEventParser parser; + std::vector events; + + std::string sse_data = "id: 123\ndata: Event with ID\n\n"; + + parser.parse(sse_data, [&events](ServerSentEvent&& event) { + events.push_back(std::move(event)); + return true; + }); + + ASSERT_EQ(events.size(), 1); + EXPECT_EQ(events[0].data, "Event with ID"); + EXPECT_EQ(events[0].event, "message"); + ASSERT_TRUE(events[0].id.has_value()); + EXPECT_EQ(events[0].id.value(), "123"); +} + +TEST(SSETests, SSEParserWithRetryTest) { + ServerSentEventParser parser; + std::vector events; + + std::string sse_data = "retry: 5000\ndata: Event with retry\n\n"; + + parser.parse(sse_data, [&events](ServerSentEvent&& event) { + events.push_back(std::move(event)); + return true; + }); + + ASSERT_EQ(events.size(), 1); + EXPECT_EQ(events[0].data, "Event with retry"); + ASSERT_TRUE(events[0].retry.has_value()); + EXPECT_EQ(events[0].retry.value(), 5000); +} + +TEST(SSETests, SSEParserCompleteEventTest) { + ServerSentEventParser parser; + std::vector events; + + std::string sse_data = + "id: 42\n" + "event: update\n" + "retry: 3000\n" + "data: Complete event data\n" + "\n"; + + parser.parse(sse_data, [&events](ServerSentEvent&& event) { + events.push_back(std::move(event)); + return true; + }); + + ASSERT_EQ(events.size(), 1); + EXPECT_EQ(events[0].data, "Complete event data"); + EXPECT_EQ(events[0].event, "update"); + ASSERT_TRUE(events[0].id.has_value()); + EXPECT_EQ(events[0].id.value(), "42"); + ASSERT_TRUE(events[0].retry.has_value()); + EXPECT_EQ(events[0].retry.value(), 3000); +} + +TEST(SSETests, SSEParserMultipleEventsTest) { + ServerSentEventParser parser; + std::vector events; + + std::string sse_data = + "data: First event\n" + "\n" + "data: Second event\n" + "\n"; + + parser.parse(sse_data, [&events](ServerSentEvent&& event) { + events.push_back(std::move(event)); + return true; + }); + + ASSERT_EQ(events.size(), 2); + EXPECT_EQ(events[0].data, "First event"); + EXPECT_EQ(events[1].data, "Second event"); +} + +TEST(SSETests, SSEParserIgnoreCommentsTest) { + ServerSentEventParser parser; + std::vector events; + + std::string sse_data = + ": This is a comment\n" + "data: Event data\n" + ": Another comment\n" + "\n"; + + parser.parse(sse_data, [&events](ServerSentEvent&& event) { + events.push_back(std::move(event)); + return true; + }); + + ASSERT_EQ(events.size(), 1); + EXPECT_EQ(events[0].data, "Event data"); +} + +TEST(SSETests, SSEParserIgnoreEmptyDataTest) { + ServerSentEventParser parser; + std::vector events; + + std::string sse_data = + "event: test\n" + "\n"; // Event with no data should be ignored + + parser.parse(sse_data, [&events](ServerSentEvent&& event) { + events.push_back(std::move(event)); + return true; + }); + + ASSERT_EQ(events.size(), 0); +} + +TEST(SSETests, SSEParserChunkedDataTest) { + ServerSentEventParser parser; + std::vector events; + + // Simulate data arriving in chunks + parser.parse("data: Partial", [&events](ServerSentEvent&& event) { + events.push_back(std::move(event)); + return true; + }); + + EXPECT_EQ(events.size(), 0); // No complete event yet + + parser.parse(" event\n\n", [&events](ServerSentEvent&& event) { + events.push_back(std::move(event)); + return true; + }); + + ASSERT_EQ(events.size(), 1); + EXPECT_EQ(events[0].data, "Partial event"); +} + +TEST(SSETests, SSEParserCRLFTest) { + ServerSentEventParser parser; + std::vector events; + + std::string sse_data = "data: Windows line endings\r\n\r\n"; + + parser.parse(sse_data, [&events](ServerSentEvent&& event) { + events.push_back(std::move(event)); + return true; + }); + + ASSERT_EQ(events.size(), 1); + EXPECT_EQ(events[0].data, "Windows line endings"); +} + +TEST(SSETests, SSEParserFieldWithoutColonTest) { + ServerSentEventParser parser; + std::vector events; + + std::string sse_data = + "data: Some actual data\n" + "data\n" // Field without colon should have empty value, appended with newline + "\n"; + + parser.parse(sse_data, [&events](ServerSentEvent&& event) { + events.push_back(std::move(event)); + return true; + }); + + ASSERT_EQ(events.size(), 1); + EXPECT_EQ(events[0].data, "Some actual data\n"); +} + +TEST(SSETests, SSECallbackTest) { + ServerSentEventCallback callback( + [](ServerSentEvent&& /*event*/, intptr_t userdata) { + int* count = reinterpret_cast(userdata); + (*count)++; + return true; + }, + 0); + + EXPECT_TRUE(callback.callback); +} + +TEST(SSETests, SSECallbackHandleDataTest) { + std::vector events; + ServerSentEventCallback callback( + [&events](ServerSentEvent&& event, intptr_t /*userdata*/) { + events.push_back(std::move(event)); + return true; + }, + 0); + + std::string sse_data = "data: Test event\n\n"; + callback.handleData(sse_data); + + ASSERT_EQ(events.size(), 1); + EXPECT_EQ(events[0].data, "Test event"); +} + +TEST(SSETests, SSECallbackAbortTest) { + int event_count = 0; + ServerSentEventCallback callback( + [&event_count](ServerSentEvent&& /*event*/, intptr_t /*userdata*/) { + event_count++; + // Abort after second event + return event_count < 2; + }, + 0); + + std::string sse_data = + "data: First\n\n" + "data: Second\n\n" + "data: Third\n\n"; + + // First two events should be processed, third should cause abort + callback.handleData(sse_data); + + // The callback is called twice (returns true the first time, false the second) + // The parser stops after the callback returns false + EXPECT_EQ(event_count, 2); +} + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + ::testing::AddGlobalTestEnvironment(server); + return RUN_ALL_TESTS(); +}