From cd969e0d716e46da8def7e26a9c0c443631c7308 Mon Sep 17 00:00:00 2001 From: Lucio Rossi Date: Fri, 26 Sep 2025 17:24:41 +0200 Subject: [PATCH 1/3] ver: 0.2.0 --- library.json | 2 +- library.properties | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/library.json b/library.json index 1fa6098..94d97ec 100644 --- a/library.json +++ b/library.json @@ -11,7 +11,7 @@ "url": "https://github.com/eigen-value", "maintainer": true }, - "version": "0.1.3", + "version": "0.2.0", "license": "MPL2.0", "frameworks": "arduino", "platforms": "*", diff --git a/library.properties b/library.properties index 8ebdfb6..1a36aeb 100644 --- a/library.properties +++ b/library.properties @@ -1,5 +1,5 @@ name=Arduino_RPClite -version=0.1.3 +version=0.2.0 author=Arduino, Lucio Rossi (eigen-value) maintainer=Arduino, Lucio Rossi (eigen-value) sentence=A MessagePack RPC library for Arduino From fb059ea76b53485772133aaf81e61c701bb96969 Mon Sep 17 00:00:00 2001 From: Lucio Rossi Date: Mon, 13 Oct 2025 17:40:23 +0200 Subject: [PATCH 2/3] feat: decoder.h can pick 1st available response anywhere in the buffer + tests --- examples/decoder_tests/decoder_tester.h | 30 +++ examples/decoder_tests/decoder_tests.ino | 230 ++++++++++++++++++++++- src/decoder.h | 101 +++++++--- 3 files changed, 331 insertions(+), 30 deletions(-) diff --git a/examples/decoder_tests/decoder_tester.h b/examples/decoder_tests/decoder_tester.h index 266064f..61c00b0 100644 --- a/examples/decoder_tests/decoder_tester.h +++ b/examples/decoder_tests/decoder_tester.h @@ -21,10 +21,40 @@ class DecoderTester { DecoderTester(RpcDecoder<>& _d): decoder(_d){} + void first_response_info() { + if (!decoder.response_queued()) { + Serial.println("No response queued"); + return; + } + Serial.println("-- First response info --"); + Serial.print("RESP OFFSET: "); + Serial.println(static_cast(decoder._response_offset)); + Serial.print("RESP SIZE: "); + Serial.println(static_cast(decoder._response_size)); + } + + size_t get_response_size() { + return decoder._response_size; + } + + size_t get_response_offset() { + return decoder._response_offset; + } + + template + bool get_response(const uint32_t msg_id, RType& result, RpcError& error) { + return decoder.get_response(msg_id, result, error); + } + void crop_bytes(size_t size, size_t offset){ decoder.consume(size, offset); } + void pop_first() { + uint8_t temp_buffer[512]; + decoder.pop_packet(temp_buffer, 512); + } + void print_raw_buf(){ Serial.print("Decoder raw buffer content: "); diff --git a/examples/decoder_tests/decoder_tests.ino b/examples/decoder_tests/decoder_tests.ino index b823bf9..46e92c0 100644 --- a/examples/decoder_tests/decoder_tests.ino +++ b/examples/decoder_tests/decoder_tests.ino @@ -48,7 +48,7 @@ void runDecoderTest(const char* label) { Serial.println("-- Done --\n"); } -void runDecoderConsumeTest(const char* label, size_t second_packet_sz) { +void runDecoderConsumeTest(const char* label, size_t expected_2nd_pack_size) { Serial.println(label); print_buf(); @@ -63,20 +63,184 @@ void runDecoderConsumeTest(const char* label, size_t second_packet_sz) { delay(50); } + dt.first_response_info(); + + while (!decoder.response_queued()) { + Serial.println("1st response not ready"); + decoder.decode(); + delay(50); + } + size_t pack_size = decoder.get_packet_size(); Serial.print("1st Packet size: "); Serial.println(pack_size); + dt.first_response_info(); + + if ((dt.get_response_offset()!=pack_size)||(dt.get_response_size()!=expected_2nd_pack_size)) { + Serial.println("ERROR parsing 1st response\n"); + return; + } + + Serial.print("Consuming 2nd packet of given size: "); + Serial.println(dt.get_response_size()); + + dt.crop_bytes(dt.get_response_size(), dt.get_response_offset()); + + dt.print_raw_buf(); + + Serial.println("-- Done --\n"); +} + +void runDecoderPopFirstTest(const char* label, size_t expected_2nd_pack_size) { + Serial.println(label); + + print_buf(); + DummyTransport dummy_transport(packer.data(), packer.size()); + RpcDecoder<> decoder(dummy_transport); + + DecoderTester dt(decoder); + + while (!decoder.packet_incoming()) { + Serial.println("Packet not ready"); + decoder.decode(); + delay(50); + } + + while (!decoder.response_queued()) { + Serial.println("1st response not ready"); + decoder.decode(); + delay(50); + } + + dt.first_response_info(); + + size_t pack_size = decoder.get_packet_size(); + Serial.print("Consuming 1st Packet of size: "); + Serial.println(pack_size); + dt.pop_first(); + dt.print_raw_buf(); + + dt.first_response_info(); + + if ((dt.get_response_offset()!=0)||(dt.get_response_size()!=expected_2nd_pack_size)) { + Serial.println("ERROR moving 1st response\n"); + return; + } + Serial.print("Consuming 2nd packet of given size: "); - Serial.println(second_packet_sz); + Serial.println(dt.get_response_size()); + + dt.crop_bytes(dt.get_response_size(), dt.get_response_offset()); + + dt.print_raw_buf(); + dt.first_response_info(); + + Serial.println("-- Done --\n"); +} + +void runDecoderGetResponseTest(const char* label, size_t expected_2nd_pack_size, int _id) { + Serial.println(label); + + print_buf(); + DummyTransport dummy_transport(packer.data(), packer.size()); + RpcDecoder<> decoder(dummy_transport); + + DecoderTester dt(decoder); + + while (!decoder.packet_incoming()) { + Serial.println("Packet not ready"); + decoder.decode(); + delay(50); + } + + dt.first_response_info(); + + while (!decoder.response_queued()) { + Serial.println("1st response not ready"); + decoder.decode(); + delay(50); + } + + size_t pack_size = decoder.get_packet_size(); + Serial.print("1st Packet size: "); + Serial.println(pack_size); + + dt.first_response_info(); + + if ((dt.get_response_offset()!=pack_size)||(dt.get_response_size()!=expected_2nd_pack_size)) { + Serial.println("ERROR parsing 1st response\n"); + return; + } + + Serial.print("Getting response (2nd packet) size: "); + Serial.println(dt.get_response_size()); + + int res; + RpcError _err; + dt.get_response(_id, res, _err); + + Serial.print("Result: "); + Serial.println(res); + + dt.print_raw_buf(); + + Serial.println("-- Done --\n"); +} + + +void runDecoderGetTopResponseTest(const char* label, size_t expected_size, int _id) { + Serial.println(label); + + print_buf(); + DummyTransport dummy_transport(packer.data(), packer.size()); + RpcDecoder<> decoder(dummy_transport); + + DecoderTester dt(decoder); + + while (!decoder.packet_incoming()) { + Serial.println("Packet not ready"); + decoder.decode(); + delay(50); + } - dt.crop_bytes(second_packet_sz, pack_size); + dt.first_response_info(); + + while (!decoder.response_queued()) { + Serial.println("1st response not ready"); + decoder.decode(); + delay(50); + } + + size_t pack_size = decoder.get_packet_size(); + Serial.print("1st Packet size: "); + Serial.println(pack_size); + + dt.first_response_info(); + + if ((dt.get_response_offset()!=0)||(dt.get_response_size()!=expected_size)) { + Serial.println("ERROR parsing 1st response\n"); + return; + } + + Serial.print("Getting response size: "); + Serial.println(dt.get_response_size()); + + int res; + RpcError _err; + dt.get_response(_id, res, _err); + + Serial.print("Result: "); + Serial.println(res); dt.print_raw_buf(); + dt.first_response_info(); + Serial.println("-- Done --\n"); } + void testNestedArrayRequest() { packer.clear(); MsgPack::arr_size_t outer_arr(3); @@ -166,6 +330,63 @@ void testMultipleRpcPackets() { } +// Multiple RPCs in one buffer. Pop the 1st request and then the 2nd response +void testPopRpcPackets() { + packer.clear(); + MsgPack::arr_size_t req_sz(4); + MsgPack::arr_size_t par_sz(2); + MsgPack::arr_size_t resp_sz(4); + MsgPack::object::nil_t nil; + + // 1st request + packer.serialize(req_sz, 0, 1, "sum", par_sz, 10, 20); + // 2nd response + packer.serialize(resp_sz, 1, 1, nil, 42); + // 3rd request + packer.serialize(req_sz, 0, 2, "echo", par_sz, "Hello", true); + + runDecoderPopFirstTest("== Test: Pop-first packet ==", 5); + +} + +// Multiple RPCs in one buffer. Get the response in the buffer +void testGetResponsePacket() { + packer.clear(); + MsgPack::arr_size_t req_sz(4); + MsgPack::arr_size_t par_sz(2); + MsgPack::arr_size_t resp_sz(4); + MsgPack::object::nil_t nil; + + // 1st request + packer.serialize(req_sz, 0, 1, "sum", par_sz, 10, 20); + // 2nd response + packer.serialize(resp_sz, 1, 1, nil, 101); + // 3rd request + packer.serialize(req_sz, 0, 2, "echo", par_sz, "Hello", true); + + runDecoderGetResponseTest("== Test: Get response packet ==", 5, 1); + +} + +// Multiple RPCs in one buffer. The response is top of the buffer +void testGetTopResponsePacket() { + packer.clear(); + MsgPack::arr_size_t req_sz(4); + MsgPack::arr_size_t par_sz(2); + MsgPack::arr_size_t resp_sz(4); + MsgPack::object::nil_t nil; + + // 1st response + packer.serialize(resp_sz, 1, 1, nil, 101); + // 2nd request + packer.serialize(req_sz, 0, 2, "echo", par_sz, "Hello", true); + // 3rd request + packer.serialize(req_sz, 0, 1, "sum", par_sz, 30, 30); + + runDecoderGetTopResponseTest("== Test: Get top response packet ==", 5, 1); + +} + // Binary parameter (e.g., binary blob) void testBinaryParam() { packer.clear(); @@ -225,6 +446,9 @@ void setup() { testDeepNestedStructure(); testArrayOfMapsResponse(); testMultipleRpcPackets(); + testPopRpcPackets(); + testGetResponsePacket(); + testGetTopResponsePacket(); testBinaryParam(); testExtensionParam(); testCombinedComplexBuffer(); diff --git a/src/decoder.h b/src/decoder.h index 883abeb..3636646 100644 --- a/src/decoder.h +++ b/src/decoder.h @@ -57,12 +57,12 @@ class RpcDecoder { template bool get_response(const uint32_t msg_id, RType& result, RpcError& error) { - if (!packet_incoming() || _packet_type!=RESP_MSG) return false; + if (!response_queued()) return false; MsgPack::Unpacker unpacker; unpacker.clear(); - if (!unpacker.feed(_raw_buffer, _packet_size)) return false; + if (!unpacker.feed(_raw_buffer + _response_offset, _response_size)) return false; MsgPack::arr_size_t resp_size; int resp_type; @@ -78,7 +78,7 @@ class RpcDecoder { // This should never happen error.code = PARSING_ERR; error.traceback = "Unexpected response type"; - discard(); + crop_response(true); return true; } @@ -86,7 +86,7 @@ class RpcDecoder { // This should never happen error.code = PARSING_ERR; error.traceback = "Unexpected RPC response size"; - discard(); + crop_response(true); return true; } @@ -95,20 +95,19 @@ class RpcDecoder { if (!unpacker.deserialize(nil, result)) { error.code = PARSING_ERR; error.traceback = "Result not parsable (check type)"; - discard(); + crop_response(true); return true; } } else { // RPC returned an error if (!unpacker.deserialize(error, nil)) { error.code = PARSING_ERR; error.traceback = "RPC Error not parsable (check type)"; - discard(); + crop_response(true); return true; } } - consume(_packet_size); - reset_packet(); + crop_response(false); return true; } @@ -188,37 +187,60 @@ class RpcDecoder { void parse_packet(){ - if (packet_incoming()){return;} + size_t offset = 0; + + if (packet_incoming()) { + if (response_queued()) { + return; + } + offset = _response_offset; + } size_t bytes_checked = 0; size_t container_size; int type; MsgPack::Unpacker unpacker; - while (bytes_checked < _bytes_stored){ + while (bytes_checked + offset < _bytes_stored){ bytes_checked++; unpacker.clear(); - if (!unpacker.feed(_raw_buffer, bytes_checked)) continue; + if (!unpacker.feed(_raw_buffer + offset, bytes_checked)) continue; if (unpackTypedArray(unpacker, container_size, type)) { if (type != CALL_MSG && type != RESP_MSG && type != NOTIFY_MSG) { - consume(bytes_checked); + consume(bytes_checked, offset); _discarded_packets++; break; // Not a valid RPC type (could be type=WRONG_MSG) } if ((type == CALL_MSG && container_size != REQUEST_SIZE) || (type == RESP_MSG && container_size != RESPONSE_SIZE) || (type == NOTIFY_MSG && container_size != NOTIFY_SIZE)) { - consume(bytes_checked); + consume(bytes_checked, offset); _discarded_packets++; break; // Not a valid RPC format } - _packet_type = type; - _packet_size = bytes_checked; + if (offset == 0) { // that's the first packet + _packet_type = type; + _packet_size = bytes_checked; + if (type == RESP_MSG) { // and it is for a client + _response_offset = 0; + _response_size = bytes_checked; + } else if (!response_queued()) { + _response_offset = bytes_checked; + _response_size = 0; + } + } else { + if (type == RESP_MSG) { // we have a response packet in the queue + _response_offset = offset; + _response_size = bytes_checked; + } else { // look further + _response_offset = offset + bytes_checked; + _response_size = 0; + } + } + break; - } else { - continue; } } @@ -227,6 +249,10 @@ class RpcDecoder { bool packet_incoming() const { return _packet_size >= MIN_RPC_BYTES; } + bool response_queued() const { + return (_response_offset < _bytes_stored) && (_response_size > 0); + } + int packet_type() const { return _packet_type; } size_t get_packet_size() const { return _packet_size;} @@ -243,6 +269,8 @@ class RpcDecoder { size_t _bytes_stored = 0; int _packet_type = NO_MSG; size_t _packet_size = 0; + size_t _response_offset = 0; + size_t _response_size = 0; uint32_t _msg_id = 0; uint32_t _discarded_packets = 0; @@ -275,9 +303,23 @@ class RpcDecoder { } reset_packet(); + if (_response_offset >= packet_size) { + _response_offset -= packet_size; + } return consume(packet_size); } + void crop_response(bool discard) { + consume(_response_size, _response_offset); + if (_response_offset==0) { // the response was in the first position + reset_packet(); + } + reset_response(); + if (discard) { + _discarded_packets++; + } + } + void discard() { consume(_packet_size); reset_packet(); @@ -289,18 +331,23 @@ class RpcDecoder { _packet_size = 0; } -size_t consume(size_t size, size_t offset = 0) { - // Boundary checks - if (offset + size > _bytes_stored || size == 0) return 0; - - size_t remaining_bytes = _bytes_stored - size; - for (size_t i=offset; i _bytes_stored || size == 0) return 0; + + size_t remaining_bytes = _bytes_stored - size; + for (size_t i=offset; i Date: Fri, 28 Nov 2025 12:05:44 +0100 Subject: [PATCH 3/3] impr: decoder code fix: decoder not consuming resp in buffer --- src/decoder.h | 74 +++++++++++++++++++++++---------------------------- 1 file changed, 33 insertions(+), 41 deletions(-) diff --git a/src/decoder.h b/src/decoder.h index 3636646..c969f50 100644 --- a/src/decoder.h +++ b/src/decoder.h @@ -15,11 +15,11 @@ #include "MsgPack.h" #include "transport.h" #include "rpclite_utils.h" +#include "error.h" using namespace RpcUtils::detail; #define MIN_RPC_BYTES 4 -#define CHUNK_SIZE 32 template class RpcDecoder { @@ -78,7 +78,9 @@ class RpcDecoder { // This should never happen error.code = PARSING_ERR; error.traceback = "Unexpected response type"; - crop_response(true); + consume(_response_size, _response_offset); + if (_response_offset == 0) reset_packet(); + _discarded_packets++; return true; } @@ -86,7 +88,9 @@ class RpcDecoder { // This should never happen error.code = PARSING_ERR; error.traceback = "Unexpected RPC response size"; - crop_response(true); + consume(_response_size, _response_offset); + if (_response_offset == 0) reset_packet(); + _discarded_packets++; return true; } @@ -95,19 +99,25 @@ class RpcDecoder { if (!unpacker.deserialize(nil, result)) { error.code = PARSING_ERR; error.traceback = "Result not parsable (check type)"; - crop_response(true); + consume(_response_size, _response_offset); + if (_response_offset == 0) reset_packet(); + _discarded_packets++; return true; } } else { // RPC returned an error if (!unpacker.deserialize(error, nil)) { error.code = PARSING_ERR; error.traceback = "RPC Error not parsable (check type)"; - crop_response(true); + consume(_response_size, _response_offset); + if (_response_offset == 0) reset_packet(); + _discarded_packets++; return true; } } - crop_response(false); + if (_response_offset == 0) reset_packet(); + consume(_response_size, _response_offset); + return true; } @@ -190,10 +200,8 @@ class RpcDecoder { size_t offset = 0; if (packet_incoming()) { - if (response_queued()) { - return; - } - offset = _response_offset; + if (response_queued()) return; // parsing complete + offset = _response_offset; // looking for a RESP } size_t bytes_checked = 0; @@ -220,24 +228,16 @@ class RpcDecoder { break; // Not a valid RPC format } - if (offset == 0) { // that's the first packet + if (offset == 0) { _packet_type = type; _packet_size = bytes_checked; - if (type == RESP_MSG) { // and it is for a client - _response_offset = 0; - _response_size = bytes_checked; - } else if (!response_queued()) { - _response_offset = bytes_checked; - _response_size = 0; - } + } + + if (type == RESP_MSG) { + _response_offset = offset; + _response_size = bytes_checked; // response queued } else { - if (type == RESP_MSG) { // we have a response packet in the queue - _response_offset = offset; - _response_size = bytes_checked; - } else { // look further - _response_offset = offset + bytes_checked; - _response_size = 0; - } + _response_offset = offset + bytes_checked; } break; @@ -250,7 +250,7 @@ class RpcDecoder { bool packet_incoming() const { return _packet_size >= MIN_RPC_BYTES; } bool response_queued() const { - return (_response_offset < _bytes_stored) && (_response_size > 0); + return _response_size > 0; } int packet_type() const { return _packet_type; } @@ -303,23 +303,9 @@ class RpcDecoder { } reset_packet(); - if (_response_offset >= packet_size) { - _response_offset -= packet_size; - } return consume(packet_size); } - void crop_response(bool discard) { - consume(_response_size, _response_offset); - if (_response_offset==0) { // the response was in the first position - reset_packet(); - } - reset_response(); - if (discard) { - _discarded_packets++; - } - } - void discard() { consume(_packet_size); reset_packet(); @@ -332,7 +318,7 @@ class RpcDecoder { } void reset_response() { - _response_offset = _bytes_stored; + _response_offset = 0; _response_size = 0; } @@ -345,6 +331,12 @@ class RpcDecoder { _raw_buffer[i] = _raw_buffer[i+size]; } + if (_response_offset >= offset + size) { + _response_offset -= size; + } else { + reset_response(); + } + _bytes_stored = remaining_bytes; return size; }