Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Esp32LocalServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ size_t Esp32LocalClient::write(const char *buffer, size_t length) {
return client.write((const uint8_t *)buffer, length);
}

size_t Esp32LocalClient::write(const __FlashStringHelper *buffer, size_t length) {
return client.write((const uint8_t *)buffer, length);
}
int Esp32LocalClient::peek() {
return client.peek();
}
Expand Down
1 change: 1 addition & 0 deletions Esp32LocalServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ namespace OTF {
void print(const char *data);
void print(const __FlashStringHelper *data);
size_t write(const char *buffer, size_t length);
size_t write(const __FlashStringHelper *buffer, size_t length);
int peek();
void setTimeout(int timeout);
void flush();
Expand Down
12 changes: 8 additions & 4 deletions Esp8266LocalServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,6 @@ size_t Esp8266LocalClient::readBytesUntil(char terminator, char *buffer, size_t
return client.readBytesUntil(terminator, buffer, length);
}

size_t Esp8266LocalClient::write(const char *buffer, size_t size) {
return client.write((const uint8_t *)buffer, size);
}

void Esp8266LocalClient::print(const char *data) {
client.print(data);
}
Expand All @@ -52,6 +48,14 @@ void Esp8266LocalClient::print(const __FlashStringHelper *data) {
client.print(data);
}

size_t Esp8266LocalClient::write(const char *buffer, size_t size) {
return client.write((const uint8_t *)buffer, size);
}

size_t Esp8266LocalClient::write(const __FlashStringHelper *buffer, size_t size) {
return client.write_P((const char*) buffer, size);
}

int Esp8266LocalClient::peek() {
return client.peek();
}
Expand Down
3 changes: 2 additions & 1 deletion Esp8266LocalServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ namespace OTF {
bool dataAvailable();
size_t readBytes(char *buffer, size_t length);
size_t readBytesUntil(char terminator, char *buffer, size_t length);
size_t write(const char *buffer, size_t length);
void print(const char *data);
void print(const __FlashStringHelper *data);
size_t write(const char *buffer, size_t length);
size_t write(const __FlashStringHelper *buffer, size_t length);
int peek();
void setTimeout(int timeout);
void flush();
Expand Down
75 changes: 75 additions & 0 deletions EspLocalServer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
#if defined(ESP8266) || defined(ESP32)
#include "EspLocalServer.h"

using namespace OTF;

EspLocalServer::EspLocalServer(uint16_t port) : server(port) {}

LocalClient *EspLocalServer::acceptClient() {
if (activeClient != nullptr) {
delete activeClient;
}

WiFiClient wiFiClient = server.available();
if (wiFiClient) {
activeClient = new EspLocalClient(wiFiClient);
} else {
activeClient = nullptr;
}
return activeClient;
}

void EspLocalServer::begin() {
server.begin();
}


EspLocalClient::EspLocalClient(WiFiClient client) {
this->client = client;
}

bool EspLocalClient::dataAvailable() {
return client.available();
}

size_t EspLocalClient::readBytes(char *buffer, size_t length) {
return client.readBytes(buffer, length);
}

size_t EspLocalClient::readBytesUntil(char terminator, char *buffer, size_t length) {
return client.readBytesUntil(terminator, buffer, length);
}

void EspLocalClient::print(const char *data) {
client.print(data);
}

void EspLocalClient::print(const __FlashStringHelper *data) {
client.print(data);
}

size_t EspLocalClient::write(const char *buffer, size_t size) {
return client.write(buffer, size);
}

size_t EspLocalClient::write(const __FlashStringHelper *const buffer, size_t size) {
return client.write_P((const char*) buffer, size);
}

int EspLocalClient::peek() {
return client.peek();
}

void EspLocalClient::setTimeout(int timeout) {
client.setTimeout(timeout);
}

void EspLocalClient::flush() {
client.flush();
}

void EspLocalClient::stop() {
client.stop();
}

#endif
51 changes: 51 additions & 0 deletions EspLocalServer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
#if defined(ESP8266) || defined(ESP32)
#ifndef OTF_ESPLOCALSERVER_H
#define OTF_ESPLOCALSERVER_H

#include "LocalServer.h"

#include <Arduino.h>
#if defined(ESP8266)
#include <ESP8266WiFi.h>
#else
#include <WiFi.h>
#endif

namespace OTF {
class EspLocalClient : public LocalClient {
friend class EspLocalServer;

private:
WiFiClient client;
EspLocalClient(WiFiClient client);

public:
bool dataAvailable();
size_t readBytes(char *buffer, size_t length);
size_t readBytesUntil(char terminator, char *buffer, size_t length);
void print(const char *data);
void print(const __FlashStringHelper *const data);
size_t write(const char *buffer, size_t size);
size_t write(const __FlashStringHelper *buffer, size_t size);
int peek();
void setTimeout(int timeout);
void flush();
void stop();
};


class EspLocalServer : public LocalServer {
private:
WiFiServer server;
EspLocalClient *activeClient = nullptr;

public:
EspLocalServer(uint16_t port);

LocalClient *acceptClient();
void begin();
};
}// namespace OTF

#endif
#endif
5 changes: 4 additions & 1 deletion LocalServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,14 @@ namespace OTF {
virtual void print(const char *data) = 0;

/** Prints a null-terminated string to the response stream. This method may be called multiple times before the stream is closed. */
virtual void print(const __FlashStringHelper *data) = 0;
virtual void print(const __FlashStringHelper *const data) = 0;

/** Writes `size` bytes from `buffer` to the response stream. */
virtual size_t write(const char *buffer, size_t size) = 0;

/** Writes `size` bytes from `buffer` to the response stream from program memory. */
virtual size_t write(const __FlashStringHelper *buffer, size_t size) = 0;

/** Returns the next character in the request stream (without advancing the stream), or returns -1 if no character is available. */
virtual int peek() = 0;

Expand Down
67 changes: 26 additions & 41 deletions OpenThingsFramework.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,42 +20,37 @@ OpenThingsFramework::OpenThingsFramework(uint16_t webServerPort, char *hdBuffer,
headerBufferSize = HEADERS_BUFFER_SIZE;
}
missingPageCallback = defaultMissingPageCallback;
localServer.begin();
};

void OpenThingsFramework::localServerBegin() {
localServer.begin();
}

OpenThingsFramework::OpenThingsFramework(uint16_t webServerPort, const String &webSocketHost, uint16_t webSocketPort,
const String &deviceKey, bool useSsl, char *hdBuffer, int hdBufferSize) : OpenThingsFramework(webServerPort, hdBuffer, hdBufferSize) {
setCloudStatus(UNABLE_TO_CONNECT);
DEBUG(Serial.println(F("Initializing websocket..."));)
webSocket = new WebsocketClient();

// Wrap the member function in a static function.
webSocket->onEvent([this](websockets::WebsocketsEvent event, String data) -> void {
webSocket->onEvent([this](WSEvent_t type, uint8_t *payload, size_t length) -> void {
DEBUG(Serial.printf((char *) F("Received websocket event of type %d\n"), event);)
webSocketEventCallback(event, data);
webSocketEventCallback(type, payload, length);
});

// Wrap the member function in a static function.
webSocket->onMessage([this](websockets::WebsocketsMessage message) -> void {
DEBUG(Serial.println(F("Received websocket message"));)
webSocketMessageCallback(message);
});

bool connected;
if (useSsl) {
DEBUG(Serial.println(F("Connecting to websocket with SSL"));)
// connected = webSocket->connectSecure(webSocketHost, webSocketPort, "/socket/v1?deviceKey=" + deviceKey);
// webSocket->connectSecure(webSocketHost, webSocketPort, "/socket/v1?deviceKey=" + deviceKey);
} else {
DEBUG(Serial.println(F("Connecting to websocket without SSL"));)
connected = webSocket->connect(webSocketHost, webSocketPort, "/socket/v1?deviceKey=" + deviceKey);
webSocket->connect(webSocketHost, webSocketPort, "/socket/v1?deviceKey=" + deviceKey);
}
DEBUG(Serial.println(F("Initialized websocket"));)

// Try to reconnect to the websocket if the connection is lost. The first time it is connecting has a timeout of 1 second/
// After that, it will try to reconnect every WEBSOCKET_RECONNECT_INTERVAL milliseconds.
webSocket->enableWebSocketReconnect(1000, WEBSOCKET_RECONNECT_INTERVAL);
// Try to reconnect to the websocket if the connection is lost.
webSocket->setReconnectInterval(WEBSOCKET_RECONNECT_INTERVAL);
// Ping the server every 15 seconds with a timeout of 5 seconds, and treat 1 missed ping as a lost connection.
webSocket->enableWebSocketHeartbeat(15000, 5000, 1);
webSocket->enableHeartbeat(15000, 5000, 1);
}

char *makeMapKey(StringBuilder *sb, HTTPMethod method, const char *path) {
Expand Down Expand Up @@ -101,8 +96,7 @@ void OpenThingsFramework::localServerLoop() {
// got new client data, reset wait_to to 0
wait_to = 0;


// Update the timeout for each data read to ensure that the total timeout is WIFI_CONNECTION_TIMEOUT.
// Update the timeout for each data read to ensure that the total timeout is WIFI_CONNECTION_TIMEOUT.
unsigned int timeout = millis()+WIFI_CONNECTION_TIMEOUT;


Expand All @@ -122,6 +116,7 @@ void OpenThingsFramework::localServerLoop() {
buffer[length++] = '\n';
if(read==1 && rc=='\r') { break; }
}

DEBUG(Serial.printf((char *) F("Finished reading data from client. Request line + headers were %d bytes\n"), length);)
buffer[length] = 0;

Expand Down Expand Up @@ -161,7 +156,7 @@ void OpenThingsFramework::localServerLoop() {

// Make response stream to client
Response res = Response();
res.enableStream([this](const char *buffer, size_t length, bool streaming) -> void {
res.enableStream([this](const char *buffer, size_t length, bool first_message) -> void {
localClient->write(buffer, length);
}, [this]() -> void {
localClient->flush();
Expand Down Expand Up @@ -202,9 +197,9 @@ void OpenThingsFramework::loop() {
}
}

void OpenThingsFramework::webSocketEventCallback(websockets::WebsocketsEvent event, String data) {
switch (event) {
case websockets::WebsocketsEvent::ConnectionClosed: {
void OpenThingsFramework::webSocketEventCallback(WSEvent_t type, uint8_t *payload, size_t length) {
switch (type) {
case WSEvent_DISCONNECTED: {
DEBUG(Serial.println(F("Websocket connection closed"));)
if (cloudStatus == CONNECTED) {
// Make sure the cloud status is only set to disconnected if it was previously connected.
Expand All @@ -213,35 +208,29 @@ void OpenThingsFramework::webSocketEventCallback(websockets::WebsocketsEvent eve
break;
}

case websockets::WebsocketsEvent::ConnectionOpened: {
case WSEvent_CONNECTED: {
DEBUG(Serial.println(F("Websocket connection opened"));)
setCloudStatus(CONNECTED);
break;
}

case websockets::WebsocketsEvent::GotPing: {
case WSEvent_PING: {
DEBUG(Serial.println(F("Received a ping from the server"));)
break;
}

case websockets::WebsocketsEvent::GotPong: {
case WSEvent_PONG: {
DEBUG(Serial.println(F("Received a pong from the server"));)
break;
}
}
}

void OpenThingsFramework::webSocketMessageCallback(websockets::WebsocketsMessage message) {
websockets::MessageType type = message.type();
switch (type) {
case websockets::MessageType::Text: {
case WSEvent_TEXT: {
#define PREFIX_LENGTH 5
#define ID_LENGTH 4
// Length of the prefix, request ID, carriage return, and line feed.
#define HEADER_LENGTH PREFIX_LENGTH + ID_LENGTH + 2

char *message_data = (char*) message.c_str();
size_t length = message.length();
char *message_data = (char*) payload;

if (strncmp_P(message_data, (char *) F("FWD: "), PREFIX_LENGTH) == 0) {
DEBUG(Serial.println(F("Message is a forwarded request."));)
Expand All @@ -252,9 +241,10 @@ void OpenThingsFramework::webSocketMessageCallback(websockets::WebsocketsMessage
Request request(&message_data[HEADER_LENGTH], length - HEADER_LENGTH, true);
Response res = Response();
// Make response stream to websocket
res.enableStream([this] (const char *buffer, size_t length, bool streaming) -> void {
res.enableStream([this] (const char *buffer, size_t length, bool first_message) -> void {
// If the websocket is not already streaming, start streaming.
if (!streaming) {
if (first_message) {
WS_DEBUG("Starting stream\n");
webSocket->stream();
}

Expand Down Expand Up @@ -290,12 +280,7 @@ void OpenThingsFramework::webSocketMessageCallback(websockets::WebsocketsMessage
}
break;
}

case websockets::MessageType::Ping:
case websockets::MessageType::Pong:
// These do not get forwarded to the message callback.
break;


default: {
DEBUG(Serial.printf((char *) F("Received unsupported websocket event of type %d\n"), type);)
break;
Expand Down
15 changes: 5 additions & 10 deletions OpenThingsFramework.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,8 @@
#include <Arduino.h>
#include "Websocket.h"

#if defined(ESP8266)
#include "Esp8266LocalServer.h"
#define LOCAL_SERVER_CLASS Esp8266LocalServer
#elif defined(ESP32)
#include "Esp32LocalServer.h"
#define LOCAL_SERVER_CLASS Esp32LocalServer
#endif
#include "EspLocalServer.h"
#define LOCAL_SERVER_CLASS EspLocalServer

// The size of the buffer to store the incoming request line and headers (does not include body). Larger requests will be discarded.
#define HEADERS_BUFFER_SIZE 1536
Expand Down Expand Up @@ -44,8 +39,7 @@ namespace OTF {
char *headerBuffer = NULL;
int headerBufferSize = 0;

void webSocketMessageCallback(websockets::WebsocketsMessage message);
void webSocketEventCallback(websockets::WebsocketsEvent event, String data);
void webSocketEventCallback(WSEvent_t type, uint8_t *payload, size_t length);

void fillResponse(const Request &req, Response &res);
void localServerLoop();
Expand Down Expand Up @@ -95,7 +89,8 @@ namespace OTF {
void onMissingPage(callback_t callback);

void loop();

void localServerBegin();

/** Returns the current status of the connection to the OpenThings Cloud server. */
CLOUD_STATUS getCloudStatus();

Expand Down
Loading