diff --git a/Esp32LocalServer.cpp b/Esp32LocalServer.cpp index 5a68a61..7192d6f 100644 --- a/Esp32LocalServer.cpp +++ b/Esp32LocalServer.cpp @@ -52,6 +52,9 @@ size_t Esp32LocalClient::write(const char *buffer, size_t length) { return client.write((const uint8_t *)buffer, length); } +size_t Esp32LocalClient::write(const __FlashStringHelper *buffer, size_t length) { + return client.write((const uint8_t *)buffer, length); +} int Esp32LocalClient::peek() { return client.peek(); } diff --git a/Esp32LocalServer.h b/Esp32LocalServer.h index 399dfc9..fe24a06 100644 --- a/Esp32LocalServer.h +++ b/Esp32LocalServer.h @@ -22,6 +22,7 @@ namespace OTF { void print(const char *data); void print(const __FlashStringHelper *data); size_t write(const char *buffer, size_t length); + size_t write(const __FlashStringHelper *buffer, size_t length); int peek(); void setTimeout(int timeout); void flush(); diff --git a/Esp8266LocalServer.cpp b/Esp8266LocalServer.cpp index 359075c..9b5bce0 100644 --- a/Esp8266LocalServer.cpp +++ b/Esp8266LocalServer.cpp @@ -40,10 +40,6 @@ size_t Esp8266LocalClient::readBytesUntil(char terminator, char *buffer, size_t return client.readBytesUntil(terminator, buffer, length); } -size_t Esp8266LocalClient::write(const char *buffer, size_t size) { - return client.write((const uint8_t *)buffer, size); -} - void Esp8266LocalClient::print(const char *data) { client.print(data); } @@ -52,6 +48,14 @@ void Esp8266LocalClient::print(const __FlashStringHelper *data) { client.print(data); } +size_t Esp8266LocalClient::write(const char *buffer, size_t size) { + return client.write((const uint8_t *)buffer, size); +} + +size_t Esp8266LocalClient::write(const __FlashStringHelper *buffer, size_t size) { + return client.write_P((const char*) buffer, size); +} + int Esp8266LocalClient::peek() { return client.peek(); } diff --git a/Esp8266LocalServer.h b/Esp8266LocalServer.h index ee1deea..271c4ff 100644 --- a/Esp8266LocalServer.h +++ b/Esp8266LocalServer.h @@ -19,9 +19,10 @@ namespace OTF { bool dataAvailable(); size_t readBytes(char *buffer, size_t length); size_t readBytesUntil(char terminator, char *buffer, size_t length); - size_t write(const char *buffer, size_t length); void print(const char *data); void print(const __FlashStringHelper *data); + size_t write(const char *buffer, size_t length); + size_t write(const __FlashStringHelper *buffer, size_t length); int peek(); void setTimeout(int timeout); void flush(); diff --git a/EspLocalServer.cpp b/EspLocalServer.cpp new file mode 100644 index 0000000..c3f1369 --- /dev/null +++ b/EspLocalServer.cpp @@ -0,0 +1,75 @@ +#if defined(ESP8266) || defined(ESP32) +#include "EspLocalServer.h" + +using namespace OTF; + +EspLocalServer::EspLocalServer(uint16_t port) : server(port) {} + +LocalClient *EspLocalServer::acceptClient() { + if (activeClient != nullptr) { + delete activeClient; + } + + WiFiClient wiFiClient = server.available(); + if (wiFiClient) { + activeClient = new EspLocalClient(wiFiClient); + } else { + activeClient = nullptr; + } + return activeClient; +} + +void EspLocalServer::begin() { + server.begin(); +} + + +EspLocalClient::EspLocalClient(WiFiClient client) { + this->client = client; +} + +bool EspLocalClient::dataAvailable() { + return client.available(); +} + +size_t EspLocalClient::readBytes(char *buffer, size_t length) { + return client.readBytes(buffer, length); +} + +size_t EspLocalClient::readBytesUntil(char terminator, char *buffer, size_t length) { + return client.readBytesUntil(terminator, buffer, length); +} + +void EspLocalClient::print(const char *data) { + client.print(data); +} + +void EspLocalClient::print(const __FlashStringHelper *data) { + client.print(data); +} + +size_t EspLocalClient::write(const char *buffer, size_t size) { + return client.write(buffer, size); +} + +size_t EspLocalClient::write(const __FlashStringHelper *const buffer, size_t size) { + return client.write_P((const char*) buffer, size); +} + +int EspLocalClient::peek() { + return client.peek(); +} + +void EspLocalClient::setTimeout(int timeout) { + client.setTimeout(timeout); +} + +void EspLocalClient::flush() { + client.flush(); +} + +void EspLocalClient::stop() { + client.stop(); +} + +#endif diff --git a/EspLocalServer.h b/EspLocalServer.h new file mode 100644 index 0000000..8ac263e --- /dev/null +++ b/EspLocalServer.h @@ -0,0 +1,51 @@ +#if defined(ESP8266) || defined(ESP32) +#ifndef OTF_ESPLOCALSERVER_H +#define OTF_ESPLOCALSERVER_H + +#include "LocalServer.h" + +#include +#if defined(ESP8266) + #include +#else + #include +#endif + +namespace OTF { + class EspLocalClient : public LocalClient { + friend class EspLocalServer; + + private: + WiFiClient client; + EspLocalClient(WiFiClient client); + + public: + bool dataAvailable(); + size_t readBytes(char *buffer, size_t length); + size_t readBytesUntil(char terminator, char *buffer, size_t length); + void print(const char *data); + void print(const __FlashStringHelper *const data); + size_t write(const char *buffer, size_t size); + size_t write(const __FlashStringHelper *buffer, size_t size); + int peek(); + void setTimeout(int timeout); + void flush(); + void stop(); + }; + + + class EspLocalServer : public LocalServer { + private: + WiFiServer server; + EspLocalClient *activeClient = nullptr; + + public: + EspLocalServer(uint16_t port); + + LocalClient *acceptClient(); + void begin(); + }; +}// namespace OTF + +#endif +#endif diff --git a/LocalServer.h b/LocalServer.h index 055122e..89db984 100644 --- a/LocalServer.h +++ b/LocalServer.h @@ -25,11 +25,14 @@ namespace OTF { virtual void print(const char *data) = 0; /** Prints a null-terminated string to the response stream. This method may be called multiple times before the stream is closed. */ - virtual void print(const __FlashStringHelper *data) = 0; + virtual void print(const __FlashStringHelper *const data) = 0; /** Writes `size` bytes from `buffer` to the response stream. */ virtual size_t write(const char *buffer, size_t size) = 0; + /** Writes `size` bytes from `buffer` to the response stream from program memory. */ + virtual size_t write(const __FlashStringHelper *buffer, size_t size) = 0; + /** Returns the next character in the request stream (without advancing the stream), or returns -1 if no character is available. */ virtual int peek() = 0; diff --git a/OpenThingsFramework.cpp b/OpenThingsFramework.cpp index bdc3cc6..861cac6 100755 --- a/OpenThingsFramework.cpp +++ b/OpenThingsFramework.cpp @@ -20,9 +20,12 @@ OpenThingsFramework::OpenThingsFramework(uint16_t webServerPort, char *hdBuffer, headerBufferSize = HEADERS_BUFFER_SIZE; } missingPageCallback = defaultMissingPageCallback; - localServer.begin(); }; +void OpenThingsFramework::localServerBegin() { + localServer.begin(); +} + OpenThingsFramework::OpenThingsFramework(uint16_t webServerPort, const String &webSocketHost, uint16_t webSocketPort, const String &deviceKey, bool useSsl, char *hdBuffer, int hdBufferSize) : OpenThingsFramework(webServerPort, hdBuffer, hdBufferSize) { setCloudStatus(UNABLE_TO_CONNECT); @@ -30,32 +33,24 @@ OpenThingsFramework::OpenThingsFramework(uint16_t webServerPort, const String &w webSocket = new WebsocketClient(); // Wrap the member function in a static function. - webSocket->onEvent([this](websockets::WebsocketsEvent event, String data) -> void { + webSocket->onEvent([this](WSEvent_t type, uint8_t *payload, size_t length) -> void { DEBUG(Serial.printf((char *) F("Received websocket event of type %d\n"), event);) - webSocketEventCallback(event, data); + webSocketEventCallback(type, payload, length); }); - // Wrap the member function in a static function. - webSocket->onMessage([this](websockets::WebsocketsMessage message) -> void { - DEBUG(Serial.println(F("Received websocket message"));) - webSocketMessageCallback(message); - }); - - bool connected; if (useSsl) { DEBUG(Serial.println(F("Connecting to websocket with SSL"));) - // connected = webSocket->connectSecure(webSocketHost, webSocketPort, "/socket/v1?deviceKey=" + deviceKey); + // webSocket->connectSecure(webSocketHost, webSocketPort, "/socket/v1?deviceKey=" + deviceKey); } else { DEBUG(Serial.println(F("Connecting to websocket without SSL"));) - connected = webSocket->connect(webSocketHost, webSocketPort, "/socket/v1?deviceKey=" + deviceKey); + webSocket->connect(webSocketHost, webSocketPort, "/socket/v1?deviceKey=" + deviceKey); } DEBUG(Serial.println(F("Initialized websocket"));) - // Try to reconnect to the websocket if the connection is lost. The first time it is connecting has a timeout of 1 second/ - // After that, it will try to reconnect every WEBSOCKET_RECONNECT_INTERVAL milliseconds. - webSocket->enableWebSocketReconnect(1000, WEBSOCKET_RECONNECT_INTERVAL); + // Try to reconnect to the websocket if the connection is lost. + webSocket->setReconnectInterval(WEBSOCKET_RECONNECT_INTERVAL); // Ping the server every 15 seconds with a timeout of 5 seconds, and treat 1 missed ping as a lost connection. - webSocket->enableWebSocketHeartbeat(15000, 5000, 1); + webSocket->enableHeartbeat(15000, 5000, 1); } char *makeMapKey(StringBuilder *sb, HTTPMethod method, const char *path) { @@ -101,8 +96,7 @@ void OpenThingsFramework::localServerLoop() { // got new client data, reset wait_to to 0 wait_to = 0; - - // Update the timeout for each data read to ensure that the total timeout is WIFI_CONNECTION_TIMEOUT. + // Update the timeout for each data read to ensure that the total timeout is WIFI_CONNECTION_TIMEOUT. unsigned int timeout = millis()+WIFI_CONNECTION_TIMEOUT; @@ -122,6 +116,7 @@ void OpenThingsFramework::localServerLoop() { buffer[length++] = '\n'; if(read==1 && rc=='\r') { break; } } + DEBUG(Serial.printf((char *) F("Finished reading data from client. Request line + headers were %d bytes\n"), length);) buffer[length] = 0; @@ -161,7 +156,7 @@ void OpenThingsFramework::localServerLoop() { // Make response stream to client Response res = Response(); - res.enableStream([this](const char *buffer, size_t length, bool streaming) -> void { + res.enableStream([this](const char *buffer, size_t length, bool first_message) -> void { localClient->write(buffer, length); }, [this]() -> void { localClient->flush(); @@ -202,9 +197,9 @@ void OpenThingsFramework::loop() { } } -void OpenThingsFramework::webSocketEventCallback(websockets::WebsocketsEvent event, String data) { - switch (event) { - case websockets::WebsocketsEvent::ConnectionClosed: { +void OpenThingsFramework::webSocketEventCallback(WSEvent_t type, uint8_t *payload, size_t length) { + switch (type) { + case WSEvent_DISCONNECTED: { DEBUG(Serial.println(F("Websocket connection closed"));) if (cloudStatus == CONNECTED) { // Make sure the cloud status is only set to disconnected if it was previously connected. @@ -213,35 +208,29 @@ void OpenThingsFramework::webSocketEventCallback(websockets::WebsocketsEvent eve break; } - case websockets::WebsocketsEvent::ConnectionOpened: { + case WSEvent_CONNECTED: { DEBUG(Serial.println(F("Websocket connection opened"));) setCloudStatus(CONNECTED); break; } - case websockets::WebsocketsEvent::GotPing: { + case WSEvent_PING: { DEBUG(Serial.println(F("Received a ping from the server"));) break; } - case websockets::WebsocketsEvent::GotPong: { + case WSEvent_PONG: { DEBUG(Serial.println(F("Received a pong from the server"));) break; } - } -} -void OpenThingsFramework::webSocketMessageCallback(websockets::WebsocketsMessage message) { - websockets::MessageType type = message.type(); - switch (type) { - case websockets::MessageType::Text: { + case WSEvent_TEXT: { #define PREFIX_LENGTH 5 #define ID_LENGTH 4 // Length of the prefix, request ID, carriage return, and line feed. #define HEADER_LENGTH PREFIX_LENGTH + ID_LENGTH + 2 - char *message_data = (char*) message.c_str(); - size_t length = message.length(); + char *message_data = (char*) payload; if (strncmp_P(message_data, (char *) F("FWD: "), PREFIX_LENGTH) == 0) { DEBUG(Serial.println(F("Message is a forwarded request."));) @@ -252,9 +241,10 @@ void OpenThingsFramework::webSocketMessageCallback(websockets::WebsocketsMessage Request request(&message_data[HEADER_LENGTH], length - HEADER_LENGTH, true); Response res = Response(); // Make response stream to websocket - res.enableStream([this] (const char *buffer, size_t length, bool streaming) -> void { + res.enableStream([this] (const char *buffer, size_t length, bool first_message) -> void { // If the websocket is not already streaming, start streaming. - if (!streaming) { + if (first_message) { + WS_DEBUG("Starting stream\n"); webSocket->stream(); } @@ -290,12 +280,7 @@ void OpenThingsFramework::webSocketMessageCallback(websockets::WebsocketsMessage } break; } - - case websockets::MessageType::Ping: - case websockets::MessageType::Pong: - // These do not get forwarded to the message callback. - break; - + default: { DEBUG(Serial.printf((char *) F("Received unsupported websocket event of type %d\n"), type);) break; diff --git a/OpenThingsFramework.h b/OpenThingsFramework.h index beed154..d4c7e0a 100644 --- a/OpenThingsFramework.h +++ b/OpenThingsFramework.h @@ -7,13 +7,8 @@ #include #include "Websocket.h" -#if defined(ESP8266) - #include "Esp8266LocalServer.h" - #define LOCAL_SERVER_CLASS Esp8266LocalServer -#elif defined(ESP32) - #include "Esp32LocalServer.h" - #define LOCAL_SERVER_CLASS Esp32LocalServer -#endif +#include "EspLocalServer.h" +#define LOCAL_SERVER_CLASS EspLocalServer // The size of the buffer to store the incoming request line and headers (does not include body). Larger requests will be discarded. #define HEADERS_BUFFER_SIZE 1536 @@ -44,8 +39,7 @@ namespace OTF { char *headerBuffer = NULL; int headerBufferSize = 0; - void webSocketMessageCallback(websockets::WebsocketsMessage message); - void webSocketEventCallback(websockets::WebsocketsEvent event, String data); + void webSocketEventCallback(WSEvent_t type, uint8_t *payload, size_t length); void fillResponse(const Request &req, Response &res); void localServerLoop(); @@ -95,7 +89,8 @@ namespace OTF { void onMissingPage(callback_t callback); void loop(); - + void localServerBegin(); + /** Returns the current status of the connection to the OpenThings Cloud server. */ CLOUD_STATUS getCloudStatus(); diff --git a/StringBuilder.cpp b/StringBuilder.cpp index 8dc5b45..abe0cde 100644 --- a/StringBuilder.cpp +++ b/StringBuilder.cpp @@ -8,7 +8,7 @@ StringBuilder::StringBuilder(size_t maxLength) { } StringBuilder::~StringBuilder() { - delete buffer; + delete[] buffer; } void StringBuilder::bprintf(char *format, va_list args) { @@ -20,9 +20,10 @@ void StringBuilder::bprintf(char *format, va_list args) { size_t res = vsnprintf(&buffer[length], maxLength - length, format, args); - if (stream_write && ((res >= maxLength) || (length + res >= maxLength))) { + if (streaming && ((res >= maxLength) || (length + res >= maxLength))) { // If in streaming mode flush the buffer and continue writing if the data doesn't fit. stream_write(buffer, length, streaming); + first_message = false; stream_flush(); clear(); res = vsnprintf(&buffer[length], maxLength - length, format, args); @@ -75,8 +76,9 @@ size_t StringBuilder::_write(const char *data, size_t data_length, bool use_pgm) // If the buffer is full, flush it and continue writing. if (write_length == 0) { - if (stream_write) { + if (streaming) { stream_write(buffer, length, streaming); + first_message = false; stream_flush(); clear(); } else { @@ -111,6 +113,8 @@ size_t StringBuilder::write_P(const __FlashStringHelper *const data, size_t data } void StringBuilder::enableStream(stream_write_t write, stream_flush_t flush, stream_end_t end) { + streaming = true; + first_message = true; stream_write = write; stream_flush = flush; stream_end = end; diff --git a/StringBuilder.hpp b/StringBuilder.hpp index ab883c3..7aa2880 100644 --- a/StringBuilder.hpp +++ b/StringBuilder.hpp @@ -25,6 +25,7 @@ namespace OTF { stream_flush_t stream_flush = nullptr; stream_end_t stream_end = nullptr; bool streaming = false; + bool first_message = true; /** * Internal write function diff --git a/Websocket.cpp b/Websocket.cpp index 5cf42af..98ecd03 100644 --- a/Websocket.cpp +++ b/Websocket.cpp @@ -1,102 +1,80 @@ #include "Websocket.h" -void WebsocketClient::enableWebSocketHeartbeat(unsigned long interval, unsigned long timeout, unsigned long maxMissed) { - webSocketHeartbeatInterval = interval; - webSocketHeartbeatTimeout = timeout; - webSocketHeartbeatMaxMissed = maxMissed; - webSocketHeartbeatEnabled = true; +void WebsocketClient::enableHeartbeat(unsigned long interval, unsigned long timeout, uint8_t maxMissed) { + WebSocketsClient::enableHeartbeat(interval, timeout, maxMissed); } -void WebsocketClient::disableWebSocketHeartbeat() { - webSocketHeartbeatEnabled = false; +void WebsocketClient::disableHeartbeat() { + WebSocketsClient::disableHeartbeat(); } -void WebsocketClient::enableWebSocketReconnect(unsigned long firstInterval, unsigned long interval) { - webSocketFirstReconnectInterval = firstInterval; - webSocketReconnectInterval = interval; - webSocketReconnectEnabled = true; +void WebsocketClient::setReconnectInterval(unsigned long interval) { + WebSocketsClient::setReconnectInterval(interval); } -void WebsocketClient::disableWebSocketReconnect() { - webSocketReconnectEnabled = false; +void WebsocketClient::poll() { + WebSocketsClient::loop(); } -void WebsocketClient::poll() { - websockets::WebsocketsClient::poll(); - if (webSocketHeartbeatEnabled && available()) { - if (!webSocketHeartbeatInProgress && (millis() - webSocketHeartbeatLastSent > webSocketHeartbeatInterval)) { - if (webSocketHeartbeatMissed >= webSocketHeartbeatMaxMissed) { - // Too many missed heartbeats, close the connection - WS_DEBUG("Too many missed heartbeats, closing connection\n"); - webSocketReconnectLastAttempt = 0; - webSocketHeartbeatMissed = 0; - websockets::WebsocketsClient::close(); - return; - } - - WS_DEBUG("Sending ping\n"); - ping(); - webSocketHeartbeatLastSent = millis(); - webSocketHeartbeatInProgress = true; - } +void WebsocketClient::onEvent(WebSocketEventCallback callback) { + WS_DEBUG("Setting event callback\n"); + this->eventCallback = callback; +} - if (webSocketHeartbeatInProgress && (millis() - webSocketHeartbeatLastSent > webSocketHeartbeatTimeout)) { - // Heartbeat timeout - WS_DEBUG("Heartbeat timeout\n"); - webSocketHeartbeatMissed++; - webSocketHeartbeatInProgress = false; - return; - } +void WebsocketClient::connect(WSInterfaceString host, int port, WSInterfaceString path) { + WS_DEBUG("Connecting to ws://%s:%d%s\n", host.c_str(), port, path.c_str()); + WebSocketsClient::begin(host, port, path); +} + +void WebsocketClient::connectSecure(WSInterfaceString host, int port, WSInterfaceString path) { + WebSocketsClient::beginSSL(host.c_str(), port, path.c_str()); +} + +bool WebsocketClient::stream() { + if (isStreaming) { + WS_DEBUG("Already streaming\n"); + return false; } - if (webSocketReconnectEnabled && webSocketShouldReconnect && !available()) { - if (millis() - webSocketReconnectLastAttempt > (webSocketReconnectFirstAttempt ? webSocketFirstReconnectInterval : webSocketReconnectInterval)) { - WS_DEBUG("Reconnecting...\n"); - // Attempt to reconnect - if (isSecure) { - websockets::WebsocketsClient::connectSecure(host, port, path); - } else { - websockets::WebsocketsClient::connect(host, port, path); - } - - WS_DEBUG("Reconnect attempt complete\n"); - WS_DEBUG("Connection status: %d\n", websockets::WebsocketsClient::available()); - webSocketReconnectLastAttempt = millis(); - } + if (clientIsConnected(&_client)) { + isStreaming = sendFrame(&_client, WSop_text, (uint8_t *)"", 0, false, false); + } else { + isStreaming = false; } - websockets::WebsocketsClient::poll(); + return isStreaming; } -void WebsocketClient::onEvent(websockets::PartialEventCallback callback) { - WS_DEBUG("Setting event callback\n"); - this->eventCallback = [callback](WebsocketsClient &, websockets::WebsocketsEvent event, websockets::WSInterfaceString data) { - callback(event, data); - }; +bool WebsocketClient::send(uint8_t *payload, size_t length, bool headerToPayload) { + WS_DEBUG("Sending message of length %d\n", length); + + if (length == 0) { + length = strlen((const char *) payload); + } + + if (clientIsConnected(&_client)) { + if (isStreaming) { + return sendFrame(&_client, WSop_continuation, payload, length, false, headerToPayload); + } else { + return sendFrame(&_client, WSop_text, payload, length, true, headerToPayload); + } + } + + return false; } -bool WebsocketClient::connect(websockets::WSInterfaceString host, int port, websockets::WSInterfaceString path) { - WS_DEBUG("Connecting to ws://%s:%d%s\n", host.c_str(), port, path.c_str()); - this->host = host; - this->port = port; - this->path = path; - webSocketReconnectFirstAttempt = true; - webSocketShouldReconnect = true; - webSocketHeartbeatMissed = 0; - webSocketHeartbeatInProgress = false; - isSecure = false; - return websockets::WebsocketsClient::connect(host, port, path); +bool WebsocketClient::send(const char *payload, size_t length, bool headerToPayload) { + return send((uint8_t *) payload, length, headerToPayload); } -bool WebsocketClient::connectSecure(websockets::WSInterfaceString host, int port, websockets::WSInterfaceString path) { - WS_DEBUG("Connecting to wss://%s:%d%s\n", host.c_str(), port, path.c_str()); - this->host = host; - this->port = port; - this->path = path; - webSocketReconnectFirstAttempt = true; - webSocketShouldReconnect = true; - webSocketHeartbeatMissed = 0; - webSocketHeartbeatInProgress = false; - isSecure = true; - return websockets::WebsocketsClient::connectSecure(host, port, path); +bool WebsocketClient::end() { + if (!isStreaming) { + return true; + } + + WS_DEBUG("Ending stream\n"); + + bool res = sendFrame(&_client, WSop_continuation, (uint8_t *)"", 0, true, false); + isStreaming = !res; + return res; } \ No newline at end of file diff --git a/Websocket.h b/Websocket.h index 860b937..92a72ae 100644 --- a/Websocket.h +++ b/Websocket.h @@ -1,50 +1,72 @@ #ifndef WEBSOCKET_H #define WEBSOCKET_H -#include +#include -#ifdef DEBUG -#define WS_DEBUG(...) Serial.print("Websocket: "); Serial.printf(__VA_ARGS__) +// #define ENABLE_DEBUG +#ifdef ENABLE_DEBUG +#define WS_DEBUG(...) \ + Serial.print("Websocket: "); \ + Serial.printf(__VA_ARGS__) #else #define WS_DEBUG(...) #endif -class WebsocketClient : public websockets::WebsocketsClient { +typedef String WSInterfaceString; + +typedef enum { + WSEvent_ERROR, + WSEvent_DISCONNECTED, + WSEvent_CONNECTED, + WSEvent_TEXT, + WSEvent_BIN, + // WStype_FRAGMENT_TEXT_START, + // WStype_FRAGMENT_BIN_START, + // WStype_FRAGMENT, + // WStype_FRAGMENT_FIN, + WSEvent_PING, + WSEvent_PONG, +} WSEvent_t; + +typedef std::function WebSocketEventCallback; + +class WebsocketClient : protected WebSocketsClient { public: - WebsocketClient() : websockets::WebsocketsClient() { + WebsocketClient() : WebSocketsClient() { // Set up a callback to handle incoming pings - websockets::WebsocketsClient::onEvent([this](websockets::WebsocketsEvent event, websockets::WSInterfaceString message) { - switch (event) { - case websockets::WebsocketsEvent::GotPing: - // Respond to the ping - WS_DEBUG("Ponged a ping\n"); - pong(message); + WebSocketsClient::onEvent([this](WStype_t type, uint8_t *payload, size_t length) { + switch (type) { + case WStype_ERROR: + WS_DEBUG("Error!\n"); + _callback(WSEvent_ERROR, payload, length); break; - case websockets::WebsocketsEvent::GotPong: - WS_DEBUG("Received a pong\n"); - if (webSocketHeartbeatEnabled) { - // If heartbeat is enabled, reset the missed coun and set the heartbeat in progress flag to false - webSocketHeartbeatMissed = 0; - webSocketHeartbeatInProgress = false; - } + case WStype_DISCONNECTED: + WS_DEBUG("Disconnected!\n"); + _callback(WSEvent_DISCONNECTED, payload, length); break; - case websockets::WebsocketsEvent::ConnectionOpened: - WS_DEBUG("Connection opened\n"); - // Mark the first attempt to reconnect as false, so it will use the slower reconnect interval - webSocketReconnectFirstAttempt = false; + case WStype_CONNECTED: { + WS_DEBUG("Connected to url: %s\n", payload); + _callback(WSEvent_CONNECTED, payload, length); + } break; + case WStype_TEXT: + WS_DEBUG("get text: %s\n", payload); + _callback(WSEvent_TEXT, payload, length); break; - case websockets::WebsocketsEvent::ConnectionClosed: - WS_DEBUG("Connection closed\n"); - // If the connection was closed, set the heartbeat in progress flag to false - if (webSocketHeartbeatEnabled) { - webSocketHeartbeatMissed = 0; - webSocketHeartbeatInProgress = false; - } + case WStype_BIN: + WS_DEBUG("get binary length: %u\n", length); + _callback(WSEvent_BIN, payload, length); + break; + case WStype_PING: + WS_DEBUG("get ping\n"); + _callback(WSEvent_PING, payload, length); + break; + case WStype_PONG: + WS_DEBUG("get pong\n"); + _callback(WSEvent_PONG, payload, length); + break; + default: + WS_DEBUG("Unknown event type: %d\n", type); break; - } - - if (eventCallback) { - eventCallback(*this, event, message); } }); } @@ -55,10 +77,8 @@ class WebsocketClient : public websockets::WebsocketsClient { * @param host String containing the host name or IP address of the server * @param port Port number to connect to * @param path Path to connect to on the server - * @return true Connection was successful - * @return false Connection was unsuccessful */ - bool connect(websockets::WSInterfaceString host, int port, websockets::WSInterfaceString path); + void connect(WSInterfaceString host, int port, WSInterfaceString path); /** * @brief Connect to a websocket server using a secure connection @@ -66,10 +86,8 @@ class WebsocketClient : public websockets::WebsocketsClient { * @param host String containing the host name or IP address of the server * @param port Port number to connect to * @param path Path to connect to on the server - * @return true Connection was successful - * @return false Connection was unsuccessful */ - bool connectSecure(websockets::WSInterfaceString host, int port, websockets::WSInterfaceString path); + void connectSecure(WSInterfaceString host, int port, WSInterfaceString path); /** * @brief Close the connection to the websocket server @@ -77,10 +95,7 @@ class WebsocketClient : public websockets::WebsocketsClient { */ void close() { WS_DEBUG("Closing connection\n"); - webSocketShouldReconnect = false; - webSocketHeartbeatMissed = 0; - webSocketHeartbeatInProgress = false; - websockets::WebsocketsClient::close(); + WebSocketsClient::disconnect(); } /** @@ -90,27 +105,20 @@ class WebsocketClient : public websockets::WebsocketsClient { * @param timeout Time in milliseconds to wait for a response to the heartbeat * @param maxMissed Maximum number of missed heartbeats before closing the connection */ - void enableWebSocketHeartbeat(unsigned long interval, unsigned long timeout, unsigned long maxMissed); + void enableHeartbeat(unsigned long interval, unsigned long timeout, uint8_t maxMissed); /** * @brief Disable the heartbeat * */ - void disableWebSocketHeartbeat(); + void disableHeartbeat(); /** - * @brief Enable automatic reconnection to the websocket server + * @brief Sets the interval between reconnection attempts * - * @param firstInterval Time in milliseconds to wait before the first successful connection attempt * @param interval Time in milliseconds between reconnection attempts */ - void enableWebSocketReconnect(unsigned long firstInterval, unsigned long interval); - - /** - * @brief Disable automatic reconnection - * - */ - void disableWebSocketReconnect(); + void setReconnectInterval(unsigned long interval); /** * @brief Poll the websocket connection @@ -123,30 +131,76 @@ class WebsocketClient : public websockets::WebsocketsClient { * * @param callback Function to run when an event occurs */ - void onEvent(websockets::PartialEventCallback callback); + void onEvent(WebSocketEventCallback callback); + + /** + * @brief Enable streaming mode + * @return true Streaming mode enabled + * @return false Streaming mode not enabled + */ + bool stream(); + + /** + * @brief Send a text message to the server + * @param payload Data to send + * @param length Length of the data to send + * @param headerToPayload bool (see sendFrame for more details) + * @return true Message was successful + * @return false Message was unsuccessful + */ + bool send(uint8_t *payload, size_t length, bool headerToPayload = false); + + /** + * @brief Send a text message to the server + * @param payload Data to send + * @param length Length of the data to send + * @param headerToPayload bool (see sendFrame for more details) + * @return true Message was successful + * @return false Message was unsuccessful + */ + bool send(const char *payload, size_t length, bool headerToPayload = false); + + /** + * @brief End the stream + * @return true Stream ended + * @return false Stream failed to end + */ + bool end(); private: - unsigned int webSocketHeartbeatInterval = 0; - unsigned int webSocketHeartbeatTimeout = 0; - unsigned long webSocketHeartbeatLastSent = 0; - unsigned int webSocketHeartbeatMissed = 0; - unsigned int webSocketHeartbeatMaxMissed = 0; - bool webSocketHeartbeatInProgress = false; - bool webSocketHeartbeatEnabled = false; - - unsigned int webSocketReconnectInterval = 0; - unsigned int webSocketFirstReconnectInterval = 0; - unsigned long webSocketReconnectLastAttempt = 0; - bool webSocketReconnectFirstAttempt = true; - bool webSocketReconnectEnabled = false; - bool webSocketShouldReconnect = false; - websockets::WSInterfaceString host; - int port; - websockets::WSInterfaceString path; - - websockets::EventCallback eventCallback = nullptr; + // unsigned int webSocketHeartbeatInterval = 0; + // unsigned int webSocketHeartbeatTimeout = 0; + // unsigned long webSocketHeartbeatLastSent = 0; + // unsigned int webSocketHeartbeatMissed = 0; + // unsigned int webSocketHeartbeatMaxMissed = 0; + // bool webSocketHeartbeatInProgress = false; + // bool webSocketHeartbeatEnabled = false; + + // unsigned int webSocketReconnectInterval = 0; + // unsigned int webSocketFirstReconnectInterval = 0; + // unsigned long webSocketReconnectLastAttempt = 0; + // bool webSocketReconnectFirstAttempt = true; + // bool webSocketReconnectEnabled = false; + // bool webSocketShouldReconnect = false; + + bool enableReconnect = false; + unsigned long reconnectInterval = 0; + + WSInterfaceString host; + int port; + WSInterfaceString path; + + WebSocketEventCallback eventCallback = nullptr; + + void _callback(WSEvent_t type, uint8_t * payload, size_t length) { + if (eventCallback) { + eventCallback(type, payload, length); + } + } bool isSecure = false; + + bool isStreaming = false; }; #endif \ No newline at end of file diff --git a/library.json b/library.json index 1f870bf..6bdd84d 100644 --- a/library.json +++ b/library.json @@ -4,6 +4,6 @@ "author": "rayshobby", "description": "OpenThings Framework Library", "dependencies": { - "ArduinoWebsockets": "gilmaimon/ArduinoWebsockets@^0.5.4" + "WebSockets": "links2004/WebSockets@^2.4.2" } } \ No newline at end of file