2424#define MQTT_SUB_TOPIC_FMT " $aws/things/%s/shadow/update/delta"
2525#define MQTT_PUB_TOPIC_FMT " %s/sensors"
2626
27- #define MQTT_MESSAGE_BUFFERS 4
28-
2927#define NETWORK_CONN_FLAG 1 << 0
3028#define NETWORK_DISCONN_FLAG 1 << 1
3129#define BROKER_CONN_FLAG 1 << 2
@@ -56,12 +54,7 @@ static volatile uint16_t target_seconds = 0;
5654static volatile uint16_t data_frequency = 0 ;
5755static unsigned long last_data_time = 0 ;
5856
59- static char topic_buffer[MQTT_MESSAGE_BUFFERS][MQTT_TOPIC_MAX_LENGTH + 1 ] =
60- {" " , " " , " " , " " };
61- static volatile uint16_t message_length[MQTT_MESSAGE_BUFFERS] = {0 , 0 , 0 , 0 };
62- static volatile int32_t message_id[MQTT_MESSAGE_BUFFERS] = {-1 , -1 , -1 , -1 };
63- static volatile uint8_t message_head_index = 0 ;
64- static volatile uint8_t message_tail_index = 0 ;
57+ static volatile uint16_t awaiting_messages = 0 ;
6558
6659static const char heartbeat_message[] = " {\" type\" : \" heartbeat\" }" ;
6760
@@ -100,12 +93,7 @@ void disconnectedFromBroker(void) { event_flags |= BROKER_DISCONN_FLAG; }
10093void receivedMessage (const char *topic,
10194 const uint16_t msg_length,
10295 const int32_t msg_id) {
103-
104- memcpy (topic_buffer[message_head_index], topic, MQTT_TOPIC_MAX_LENGTH);
105- message_length[message_head_index] = msg_length;
106- message_id[message_head_index] = msg_id;
107-
108- message_head_index = (message_head_index + 1 ) % MQTT_MESSAGE_BUFFERS;
96+ awaiting_messages++;
10997}
11098
11199void connectMqtt () {
@@ -141,8 +129,9 @@ void startStreamTimer() {
141129
142130void stopStreamTimer () { TCA0.SINGLE .CTRLA = 0 ; }
143131
132+ static StaticJsonDocument<800 > doc;
133+
144134void decodeMessage (const char *message) {
145- StaticJsonDocument<800 > doc;
146135 DeserializationError error = deserializeJson (doc, message);
147136
148137 if (error) {
@@ -221,11 +210,11 @@ void decodeMessage(const char *message) {
221210}
222211
223212void printHelp () {
224- Log.rawf (" \n Available Commands\n "
225- " -----------------------\n "
226- " help\t\t Print this message\n "
213+ Log.rawf (" \r\ n Available Commands\r \n"
214+ " -----------------------\r\ n "
215+ " help\t\t Print this message\r\ n "
227216 " loglevel=level\t Set the log level. Available levels are debug, "
228- " info, warn, error\n "
217+ " info, warn, error\r\ n "
229218 " -----------------------\r\n " );
230219}
231220
@@ -322,7 +311,6 @@ void setup() {
322311unsigned long timeLastCellToggle = millis() + 500 ;
323312
324313void loop () {
325-
326314 // See if there are any messages for the command handler
327315 if (Serial3.available ()) {
328316 String extractedString = Serial3.readStringUntil (' \n ' );
@@ -378,7 +366,7 @@ void loop() {
378366
379367 Log.info (" Connected to MQTT broker, subscribing to topics!\r\n " );
380368
381- MqttClient.subscribe (mqtt_sub_topic, MqttQoS::AT_LEAST_ONCE );
369+ MqttClient.subscribe (mqtt_sub_topic);
382370
383371 break ;
384372 default :
@@ -415,27 +403,37 @@ void loop() {
415403 }
416404
417405 event_flags &= ~BROKER_DISCONN_FLAG;
418- } else if (message_head_index != message_tail_index ) {
406+ } else if (awaiting_messages > 0 ) {
419407
420408 switch (state) {
421409 case CONNECTED_TO_BROKER:
422410 case STREAMING_DATA: {
423411
424- // Extra space for termination
425- char message[message_length[message_tail_index] + 16 ] = " " ;
412+ char message[400 ] = " " ;
426413
427- if (!MqttClient.readMessage (topic_buffer[message_tail_index],
428- (uint8_t *)message,
429- sizeof (message),
430- message_id[message_tail_index])) {
414+ const bool message_read_successfully = MqttClient.readMessage (
415+ mqtt_sub_topic, (uint8_t *)message, sizeof (message));
431416
417+ cli ();
418+ awaiting_messages--;
419+ sei ();
420+
421+ if (message_read_successfully) {
422+ decodeMessage (message);
423+ } else {
432424 Log.error (" Failed to read message\r\n " );
433- }
434425
435- decodeMessage (message);
426+ if (awaiting_messages > 0 ) {
427+ Log.errorf (" Lost %d message(s) due to messages coming in "
428+ " too quickly to process\r\n " ,
429+ awaiting_messages);
436430
437- message_tail_index =
438- (message_tail_index + 1 ) % MQTT_MESSAGE_BUFFERS;
431+ MqttClient.clearMessages (mqtt_sub_topic, awaiting_messages);
432+ cli ();
433+ awaiting_messages = 0 ;
434+ sei ();
435+ }
436+ }
439437
440438 } break ;
441439
@@ -452,6 +450,7 @@ void loop() {
452450
453451 Log.info (" Sending hearbeat" );
454452 MqttClient.publish (mqtt_pub_topic, heartbeat_message);
453+
455454 last_heartbeat_time = millis ();
456455
457456 break ;
0 commit comments