diff --git a/.clang-format b/.clang-format new file mode 100644 index 0000000..8450693 --- /dev/null +++ b/.clang-format @@ -0,0 +1,11 @@ +BasedOnStyle: LLVM +ColumnLimit: 100 +BinPackArguments: false +BinPackParameters: false +AllowAllArgumentsOnNextLine: false +AlignAfterOpenBracket: BlockIndent +UseTab: ForIndentation +IndentWidth: 4 +TabWidth: 4 +ContinuationIndentWidth: 4 +AllowShortFunctionsOnASingleLine: None diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 0000000..d89c76d --- /dev/null +++ b/.editorconfig @@ -0,0 +1,11 @@ +root = true + +[*] +end_of_line = lf +insert_final_newline = true +charset = utf-8 + +[*.{c,cc,cpp,h,hpp,ino}] +indent_style = tab +indent_size = tab +tab_width = 4 diff --git a/.gitignore b/.gitignore index 78f49b6..6346d5c 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,3 @@ .venv build/ build_prev_runner/ -.vscode \ No newline at end of file diff --git a/.vscode/bin/clang-format b/.vscode/bin/clang-format new file mode 100755 index 0000000..0df371f --- /dev/null +++ b/.vscode/bin/clang-format @@ -0,0 +1,19 @@ +#!/usr/bin/env bash + +set -euo pipefail + +if command -v clang-format >/dev/null 2>&1; then + exec clang-format "$@" +fi + +_home_dir="${HOME:-}" +if [ -n "$_home_dir" ]; then + _candidate="$(ls -1d "$_home_dir"/.vscode/extensions/ms-vscode.cpptools-*-linux-x64/LLVM/bin/clang-format 2>/dev/null | tail -n 1 || true)" + if [ -n "$_candidate" ] && [ -x "$_candidate" ]; then + exec "$_candidate" "$@" + fi +fi + +echo "clang-format executable not found." >&2 +echo "Install clang-format system-wide or install/update ms-vscode.cpptools." >&2 +exit 127 diff --git a/.vscode/extensions.json b/.vscode/extensions.json new file mode 100644 index 0000000..f814711 --- /dev/null +++ b/.vscode/extensions.json @@ -0,0 +1,9 @@ +{ + "recommendations": [ + "pioarduino.pioarduino-ide", + "xaver.clang-format" + ], + "unwantedRecommendations": [ + "ms-vscode.cpptools-extension-pack" + ] +} diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..24368c8 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,30 @@ +{ + "files.associations": { + "*.ino": "cpp" + }, + "editor.defaultFormatter": "xaver.clang-format", + "C_Cpp.formatting": "Disabled", + "clang-format.style": "file", + "clang-format.executable": "${workspaceRoot}/.vscode/bin/clang-format", + "[cpp]": { + "editor.defaultFormatter": "xaver.clang-format", + "editor.detectIndentation": false, + "editor.insertSpaces": false, + "editor.tabSize": 4, + "editor.formatOnSave": true + }, + "[c]": { + "editor.defaultFormatter": "xaver.clang-format", + "editor.detectIndentation": false, + "editor.insertSpaces": false, + "editor.tabSize": 4, + "editor.formatOnSave": true + }, + "[arduino]": { + "editor.defaultFormatter": "xaver.clang-format", + "editor.detectIndentation": false, + "editor.insertSpaces": false, + "editor.tabSize": 4, + "editor.formatOnSave": true + } +} diff --git a/.vscode/tasks.json b/.vscode/tasks.json new file mode 100644 index 0000000..20e66d5 --- /dev/null +++ b/.vscode/tasks.json @@ -0,0 +1,12 @@ +{ + "version": "2.0.0", + "tasks": [ + { + "label": "Format Firmware Sources", + "type": "shell", + "command": "bash ${workspaceFolder}/scripts/format_cpp.sh", + "group": "build", + "problemMatcher": [] + } + ] +} diff --git a/README.md b/README.md index 68b710b..81099d8 100644 --- a/README.md +++ b/README.md @@ -151,6 +151,13 @@ Combine `pressureCallback` and `dropCallback` to monitor noisy publishers, and w ## Tests Unity tests run under PlatformIO: plug in an ESP32 dev board and execute `pio test -e esp32dev` from the repo root. The suite covers overflow policies, subscription caps, payload validation, and graceful shutdown. +## Formatting Baseline + +This repository follows the firmware formatting baseline from `esptoolkit-template`: +- `.clang-format` is the source of truth for C/C++/INO layout. +- `.editorconfig` enforces tabs (`tab_width = 4`), LF endings, and final newline. +- Format all tracked firmware sources with `bash scripts/format_cpp.sh`. + ## License MIT — see [LICENSE.md](LICENSE.md). diff --git a/examples/basic_usage/basic_usage.ino b/examples/basic_usage/basic_usage.ino index 125d9bb..12a189d 100644 --- a/examples/basic_usage/basic_usage.ino +++ b/examples/basic_usage/basic_usage.ino @@ -2,72 +2,73 @@ #include enum class DemoEvent : uint16_t { - NetworkGotIP, + NetworkGotIP, }; struct NetworkGotIpPayload { - uint32_t sequence; - char address[16]; + uint32_t sequence; + char address[16]; }; ESPEventBus eventBus; -void onNetworkGotIp(void* payload, void*) { - auto* info = static_cast(payload); - Serial.printf("[callback] seq=%u ip=%s\n", info->sequence, info->address); +void onNetworkGotIp(void *payload, void *) { + auto *info = static_cast(payload); + Serial.printf("[callback] seq=%u ip=%s\n", info->sequence, info->address); } -void networkSimulatorTask(void* pv) { - static NetworkGotIpPayload payload{}; - uint8_t octet = 100; +void networkSimulatorTask(void *pv) { + static NetworkGotIpPayload payload{}; + uint8_t octet = 100; - for (;;) { - payload.sequence++; - snprintf(payload.address, sizeof(payload.address), "192.168.1.%u", octet++); - if (octet > 200) { - octet = 100; - } + for (;;) { + payload.sequence++; + snprintf(payload.address, sizeof(payload.address), "192.168.1.%u", octet++); + if (octet > 200) { + octet = 100; + } - eventBus.post(DemoEvent::NetworkGotIP, &payload, portMAX_DELAY); - vTaskDelay(pdMS_TO_TICKS(1500)); - } + eventBus.post(DemoEvent::NetworkGotIP, &payload, portMAX_DELAY); + vTaskDelay(pdMS_TO_TICKS(1500)); + } } -void blockingConsumerTask(void* pv) { - for (;;) { - auto* payload = static_cast( - eventBus.waitFor(DemoEvent::NetworkGotIP, portMAX_DELAY)); - if (payload) { - Serial.printf("[waitFor] seq=%u ip=%s\n", payload->sequence, payload->address); - } - } +void blockingConsumerTask(void *pv) { + for (;;) { + auto *payload = static_cast( + eventBus.waitFor(DemoEvent::NetworkGotIP, portMAX_DELAY) + ); + if (payload) { + Serial.printf("[waitFor] seq=%u ip=%s\n", payload->sequence, payload->address); + } + } } void setup() { - Serial.begin(115200); - while (!Serial) { - delay(10); - } + Serial.begin(115200); + while (!Serial) { + delay(10); + } - if (!eventBus.init()) { - Serial.println("Failed to init ESPEventBus"); - return; - } + if (!eventBus.init()) { + Serial.println("Failed to init ESPEventBus"); + return; + } - eventBus.subscribe(DemoEvent::NetworkGotIP, onNetworkGotIp); + eventBus.subscribe(DemoEvent::NetworkGotIP, onNetworkGotIp); - xTaskCreatePinnedToCore(networkSimulatorTask, "network-sim", 2048, nullptr, 4, nullptr, 0); - xTaskCreatePinnedToCore(blockingConsumerTask, "network-wait", 2048, nullptr, 4, nullptr, 1); + xTaskCreatePinnedToCore(networkSimulatorTask, "network-sim", 2048, nullptr, 4, nullptr, 0); + xTaskCreatePinnedToCore(blockingConsumerTask, "network-wait", 2048, nullptr, 4, nullptr, 1); } void loop() { - if (Serial.available() > 0) { - const int ch = Serial.read(); - if ((ch == 'x' || ch == 'X') && eventBus.isInitialized()) { - Serial.println("[ESPEventBus] deinitializing by user request"); - eventBus.deinit(); - } - } + if (Serial.available() > 0) { + const int ch = Serial.read(); + if ((ch == 'x' || ch == 'X') && eventBus.isInitialized()) { + Serial.println("[ESPEventBus] deinitializing by user request"); + eventBus.deinit(); + } + } - vTaskDelay(pdMS_TO_TICKS(1000)); + vTaskDelay(pdMS_TO_TICKS(1000)); } diff --git a/examples/overflow_monitor/overflow_monitor.ino b/examples/overflow_monitor/overflow_monitor.ino index a75cfae..31389bc 100644 --- a/examples/overflow_monitor/overflow_monitor.ino +++ b/examples/overflow_monitor/overflow_monitor.ino @@ -2,12 +2,12 @@ #include enum class SensorEvent : uint16_t { - Sample, + Sample, }; struct SensorPayload { - uint32_t sequence; - float reading; + uint32_t sequence; + float reading; }; ESPEventBus eventBus; @@ -15,68 +15,73 @@ static SensorPayload pool[4]; static size_t nextSlot = 0; struct PoolContext { - SensorPayload* pool; - size_t size; + SensorPayload *pool; + size_t size; }; -PoolContext poolCtx{ pool, sizeof(pool) / sizeof(pool[0]) }; +PoolContext poolCtx{pool, sizeof(pool) / sizeof(pool[0])}; -bool payloadValidator(EventBusId, void* payload, void* userArg) { - auto* ctx = static_cast(userArg); - for (size_t i = 0; i < ctx->size; ++i) { - if (payload == &ctx->pool[i]) { - return true; - } - } - return false; +bool payloadValidator(EventBusId, void *payload, void *userArg) { + auto *ctx = static_cast(userArg); + for (size_t i = 0; i < ctx->size; ++i) { + if (payload == &ctx->pool[i]) { + return true; + } + } + return false; } -void pressureCallback(UBaseType_t queued, UBaseType_t capacity, void*) { - Serial.printf("[ESPEventBus] queue pressure: %u/%u entries in use\n", queued, capacity); +void pressureCallback(UBaseType_t queued, UBaseType_t capacity, void *) { + Serial.printf("[ESPEventBus] queue pressure: %u/%u entries in use\n", queued, capacity); } -void dropCallback(EventBusId id, void*, void*) { - Serial.printf("[ESPEventBus] dropped payload for event %u (queue stayed full)\n", static_cast(id)); +void dropCallback(EventBusId id, void *, void *) { + Serial.printf( + "[ESPEventBus] dropped payload for event %u (queue stayed full)\n", + static_cast(id) + ); } -void sensorConsumer(void* payload, void*) { - auto* info = static_cast(payload); - Serial.printf("[ESPEventBus] seq=%u reading=%.3f\n", info->sequence, info->reading); - vTaskDelay(pdMS_TO_TICKS(80)); // Pretend to do slow work so we can exercise pressure/drop callbacks +void sensorConsumer(void *payload, void *) { + auto *info = static_cast(payload); + Serial.printf("[ESPEventBus] seq=%u reading=%.3f\n", info->sequence, info->reading); + vTaskDelay( + pdMS_TO_TICKS(80) + ); // Pretend to do slow work so we can exercise pressure/drop callbacks } void setup() { - Serial.begin(115200); - while (!Serial) { - delay(10); - } - - EventBusConfig cfg{}; - cfg.queueLength = 4; - cfg.overflowPolicy = EventBusOverflowPolicy::DropOldest; - cfg.pressureThresholdPercent = 75; - cfg.pressureCallback = pressureCallback; - cfg.dropCallback = dropCallback; - cfg.payloadValidator = payloadValidator; - cfg.payloadValidatorArg = &poolCtx; - - if (!eventBus.init(cfg)) { - Serial.println("Failed to init ESPEventBus"); - return; - } - - eventBus.subscribe(SensorEvent::Sample, sensorConsumer); + Serial.begin(115200); + while (!Serial) { + delay(10); + } + + EventBusConfig cfg{}; + cfg.queueLength = 4; + cfg.overflowPolicy = EventBusOverflowPolicy::DropOldest; + cfg.pressureThresholdPercent = 75; + cfg.pressureCallback = pressureCallback; + cfg.dropCallback = dropCallback; + cfg.payloadValidator = payloadValidator; + cfg.payloadValidatorArg = &poolCtx; + + if (!eventBus.init(cfg)) { + Serial.println("Failed to init ESPEventBus"); + return; + } + + eventBus.subscribe(SensorEvent::Sample, sensorConsumer); } void loop() { - auto& slot = pool[nextSlot]; - slot.sequence++; - slot.reading = analogReadMilliVolts(34) / 1000.0f; + auto &slot = pool[nextSlot]; + slot.sequence++; + slot.reading = analogReadMilliVolts(34) / 1000.0f; - if (!eventBus.post(SensorEvent::Sample, &slot, 0)) { - Serial.println("[ESPEventBus] queue is saturated; payload rejected"); - } + if (!eventBus.post(SensorEvent::Sample, &slot, 0)) { + Serial.println("[ESPEventBus] queue is saturated; payload rejected"); + } - nextSlot = (nextSlot + 1) % (sizeof(pool) / sizeof(pool[0])); - delay(10); + nextSlot = (nextSlot + 1) % (sizeof(pool) / sizeof(pool[0])); + delay(10); } diff --git a/examples/request_response/request_response.ino b/examples/request_response/request_response.ino index bc14062..7dabe48 100644 --- a/examples/request_response/request_response.ino +++ b/examples/request_response/request_response.ino @@ -2,73 +2,77 @@ #include enum class DemoEvent : uint16_t { - Command, - Response, + Command, + Response, }; struct CommandPayload { - uint32_t id; - const char* command; + uint32_t id; + const char *command; }; struct ResponsePayload { - uint32_t id; - String result; + uint32_t id; + String result; }; ESPEventBus eventBus; static ResponsePayload sharedResponse{}; -void workerCallback(void* payload, void*) { - auto* cmd = static_cast(payload); - sharedResponse.id = cmd->id; - sharedResponse.result = String("Ack for ") + cmd->command; - delay(15); // Simulate doing work before replying - eventBus.post(DemoEvent::Response, &sharedResponse, portMAX_DELAY); +void workerCallback(void *payload, void *) { + auto *cmd = static_cast(payload); + sharedResponse.id = cmd->id; + sharedResponse.result = String("Ack for ") + cmd->command; + delay(15); // Simulate doing work before replying + eventBus.post(DemoEvent::Response, &sharedResponse, portMAX_DELAY); } -void requestTask(void*) { - CommandPayload cmd{}; - uint32_t sequence = 0; +void requestTask(void *) { + CommandPayload cmd{}; + uint32_t sequence = 0; - for (;;) { - cmd.id = ++sequence; - cmd.command = (sequence % 2 == 0) ? "set-mode=eco" : "status?"; - eventBus.post(DemoEvent::Command, &cmd, portMAX_DELAY); + for (;;) { + cmd.id = ++sequence; + cmd.command = (sequence % 2 == 0) ? "set-mode=eco" : "status?"; + eventBus.post(DemoEvent::Command, &cmd, portMAX_DELAY); - auto* response = static_cast( - eventBus.waitFor(DemoEvent::Response, pdMS_TO_TICKS(100))); - if (response) { - Serial.printf("[requestTask] got response id=%u body=%s\n", - response->id, response->result.c_str()); - } else { - Serial.println("[requestTask] timed out waiting for response"); - } + auto *response = static_cast( + eventBus.waitFor(DemoEvent::Response, pdMS_TO_TICKS(100)) + ); + if (response) { + Serial.printf( + "[requestTask] got response id=%u body=%s\n", + response->id, + response->result.c_str() + ); + } else { + Serial.println("[requestTask] timed out waiting for response"); + } - vTaskDelay(pdMS_TO_TICKS(500)); - } + vTaskDelay(pdMS_TO_TICKS(500)); + } } void setup() { - Serial.begin(115200); - while (!Serial) { - delay(10); - } + Serial.begin(115200); + while (!Serial) { + delay(10); + } - EventBusConfig cfg{}; - cfg.queueLength = 8; - cfg.priority = 4; - cfg.stackSize = 4096; + EventBusConfig cfg{}; + cfg.queueLength = 8; + cfg.priority = 4; + cfg.stackSize = 4096; - if (!eventBus.init(cfg)) { - Serial.println("Failed to init ESPEventBus"); - return; - } + if (!eventBus.init(cfg)) { + Serial.println("Failed to init ESPEventBus"); + return; + } - eventBus.subscribe(DemoEvent::Command, workerCallback); - xTaskCreatePinnedToCore(requestTask, "request-task", 4096, nullptr, 3, nullptr, 1); + eventBus.subscribe(DemoEvent::Command, workerCallback); + xTaskCreatePinnedToCore(requestTask, "request-task", 4096, nullptr, 3, nullptr, 1); } void loop() { - vTaskDelay(pdMS_TO_TICKS(1000)); + vTaskDelay(pdMS_TO_TICKS(1000)); } diff --git a/scripts/format_cpp.sh b/scripts/format_cpp.sh new file mode 100755 index 0000000..7d17b04 --- /dev/null +++ b/scripts/format_cpp.sh @@ -0,0 +1,24 @@ +#!/usr/bin/env bash + +set -euo pipefail + +_repo_root="$(git rev-parse --show-toplevel 2>/dev/null || pwd)" +_clang_format="${_repo_root}/.vscode/bin/clang-format" + +if [ ! -x "${_clang_format}" ]; then + echo "clang-format wrapper not found: ${_clang_format}" >&2 + exit 1 +fi + +mapfile -d '' _format_files < <( + git -C "${_repo_root}" ls-files -z -- '*.c' '*.cc' '*.cpp' '*.h' '*.hpp' '*.ino' +) + +if [ "${#_format_files[@]}" -eq 0 ]; then + echo "No tracked C/C++/INO files found to format." + exit 0 +fi + +"${_clang_format}" -i --style=file "${_format_files[@]}" + +echo "Formatted ${#_format_files[@]} files." diff --git a/src/esp_eventbus/eventbus.cpp b/src/esp_eventbus/eventbus.cpp index 14f6b83..e82f588 100644 --- a/src/esp_eventbus/eventbus.cpp +++ b/src/esp_eventbus/eventbus.cpp @@ -6,616 +6,636 @@ ESPEventBus::ESPEventBus() = default; ESPEventBus::~ESPEventBus() { - deinit(); -} - -bool ESPEventBus::init(const EventBusConfig& config) { - if (queue_ || task_ || subMutex_) { - deinit(); - } - - EventBusConfig sanitized = config; - if (sanitized.queueLength == 0 || sanitized.stackSize == 0) { - return false; - } - - if (sanitized.pressureThresholdPercent > 100) { - sanitized.pressureThresholdPercent = 100; - } - - config_ = sanitized; - stopEventPending_ = false; - resetKernelStorage(); - resetSubscriptions(config_.usePSRAMBuffers); - resetWaiters(config_.usePSRAMBuffers); - - if (!createKernelMutex()) { - return false; - } - - if (!createKernelQueue()) { - vSemaphoreDelete(subMutex_); - subMutex_ = nullptr; - resetKernelStorage(); - return false; - } - - running_ = true; - const char* taskName = (config_.taskName && config_.taskName[0] != '\0') ? config_.taskName : "ESPEventBus"; - if (!createWorkerTask(taskName)) { - running_ = false; - vQueueDelete(queue_); - queue_ = nullptr; - vSemaphoreDelete(subMutex_); - subMutex_ = nullptr; - resetKernelStorage(); - return false; - } - - return true; + deinit(); +} + +bool ESPEventBus::init(const EventBusConfig &config) { + if (queue_ || task_ || subMutex_) { + deinit(); + } + + EventBusConfig sanitized = config; + if (sanitized.queueLength == 0 || sanitized.stackSize == 0) { + return false; + } + + if (sanitized.pressureThresholdPercent > 100) { + sanitized.pressureThresholdPercent = 100; + } + + config_ = sanitized; + stopEventPending_ = false; + resetKernelStorage(); + resetSubscriptions(config_.usePSRAMBuffers); + resetWaiters(config_.usePSRAMBuffers); + + if (!createKernelMutex()) { + return false; + } + + if (!createKernelQueue()) { + vSemaphoreDelete(subMutex_); + subMutex_ = nullptr; + resetKernelStorage(); + return false; + } + + running_ = true; + const char *taskName = + (config_.taskName && config_.taskName[0] != '\0') ? config_.taskName : "ESPEventBus"; + if (!createWorkerTask(taskName)) { + running_ = false; + vQueueDelete(queue_); + queue_ = nullptr; + vSemaphoreDelete(subMutex_); + subMutex_ = nullptr; + resetKernelStorage(); + return false; + } + + return true; } void ESPEventBus::deinit() { - stopTask(); - - if (subMutex_ && xSemaphoreTake(subMutex_, portMAX_DELAY) == pdTRUE) { - clearWaiters(); - xSemaphoreGive(subMutex_); - } else { - clearWaiters(); - } - - if (queue_) { - vQueueDelete(queue_); - queue_ = nullptr; - } - - if (subMutex_) { - vSemaphoreDelete(subMutex_); - subMutex_ = nullptr; - } - - resetKernelStorage(); - resetSubscriptions(false); - resetWaiters(false); - nextSubId_ = 0; - stopEventPending_ = false; - config_ = EventBusConfig{}; - task_ = nullptr; + stopTask(); + + if (subMutex_ && xSemaphoreTake(subMutex_, portMAX_DELAY) == pdTRUE) { + clearWaiters(); + xSemaphoreGive(subMutex_); + } else { + clearWaiters(); + } + + if (queue_) { + vQueueDelete(queue_); + queue_ = nullptr; + } + + if (subMutex_) { + vSemaphoreDelete(subMutex_); + subMutex_ = nullptr; + } + + resetKernelStorage(); + resetSubscriptions(false); + resetWaiters(false); + nextSubId_ = 0; + stopEventPending_ = false; + config_ = EventBusConfig{}; + task_ = nullptr; } bool ESPEventBus::isInitialized() const { - return queue_ != nullptr && subMutex_ != nullptr && task_ != nullptr && running_; + return queue_ != nullptr && subMutex_ != nullptr && task_ != nullptr && running_; } -bool ESPEventBus::post(EventBusId id, void* payload, TickType_t timeout) { - if (!queue_) { - return false; - } +bool ESPEventBus::post(EventBusId id, void *payload, TickType_t timeout) { + if (!queue_) { + return false; + } - if (!validatePayload(id, payload)) { - return false; - } + if (!validatePayload(id, payload)) { + return false; + } - QueuedEvent ev{ id, payload, false }; - return enqueueFromTask(ev, timeout); + QueuedEvent ev{id, payload, false}; + return enqueueFromTask(ev, timeout); } -bool ESPEventBus::postFromISR(EventBusId id, void* payload, BaseType_t* higherPriorityTaskWoken) { - if (!queue_) { - return false; - } +bool ESPEventBus::postFromISR(EventBusId id, void *payload, BaseType_t *higherPriorityTaskWoken) { + if (!queue_) { + return false; + } - if (!validatePayload(id, payload)) { - return false; - } + if (!validatePayload(id, payload)) { + return false; + } - QueuedEvent ev{ id, payload, false }; - return enqueueFromISR(ev, higherPriorityTaskWoken); + QueuedEvent ev{id, payload, false}; + return enqueueFromISR(ev, higherPriorityTaskWoken); } -EventBusSub ESPEventBus::subscribe(EventBusId id, - EventCallbackFn cb, - void* userArg, - bool oneshot) { - if (!cb) { - return 0; - } +EventBusSub ESPEventBus::subscribe(EventBusId id, EventCallbackFn cb, void *userArg, bool oneshot) { + if (!cb) { + return 0; + } - return subscribe(id, EventCallback(cb), userArg, oneshot); + return subscribe(id, EventCallback(cb), userArg, oneshot); } -EventBusSub ESPEventBus::subscribe(EventBusId id, - EventCallback cb, - void* userArg, - bool oneshot) { - if (!cb || !subMutex_) { - return 0; - } +EventBusSub ESPEventBus::subscribe(EventBusId id, EventCallback cb, void *userArg, bool oneshot) { + if (!cb || !subMutex_) { + return 0; + } - if (xSemaphoreTake(subMutex_, portMAX_DELAY) != pdTRUE) { - return 0; - } + if (xSemaphoreTake(subMutex_, portMAX_DELAY) != pdTRUE) { + return 0; + } - if (config_.maxSubscriptions != 0) { - size_t activeCount = std::count_if(subs_.begin(), subs_.end(), [](const Subscription& sub) { return sub.active; }); - if (activeCount >= config_.maxSubscriptions) { - xSemaphoreGive(subMutex_); - return 0; - } - } + if (config_.maxSubscriptions != 0) { + size_t activeCount = std::count_if(subs_.begin(), subs_.end(), [](const Subscription &sub) { + return sub.active; + }); + if (activeCount >= config_.maxSubscriptions) { + xSemaphoreGive(subMutex_); + return 0; + } + } - EventBusSub subId = ++nextSubId_; - subs_.push_back(Subscription{ subId, id, std::move(cb), userArg, oneshot, true }); - xSemaphoreGive(subMutex_); - return subId; + EventBusSub subId = ++nextSubId_; + subs_.push_back(Subscription{subId, id, std::move(cb), userArg, oneshot, true}); + xSemaphoreGive(subMutex_); + return subId; } void ESPEventBus::unsubscribe(EventBusSub subId) { - if (!subId || !subMutex_) { - return; - } - - if (xSemaphoreTake(subMutex_, portMAX_DELAY) != pdTRUE) { - return; - } - - for (auto& sub : subs_) { - if (sub.subId == subId) { - sub.active = false; - break; - } - } - - compactSubscriptionsLocked(); - xSemaphoreGive(subMutex_); -} - -void* ESPEventBus::waitFor(EventBusId id, TickType_t timeout) { - if (!queue_ || !subMutex_) { - return nullptr; - } - - const TaskHandle_t callerTask = currentTaskHandle(); - if (!callerTask || (task_ && callerTask == task_)) { - return nullptr; - } - - QueueHandle_t responseQueue = nullptr; - if (xSemaphoreTake(subMutex_, portMAX_DELAY) != pdTRUE) { - return nullptr; - } - - WaiterContext* waiter = findWaiterLocked(callerTask, id); - if (!waiter) { - waiters_.push_back(WaiterContext{ callerTask, id, nullptr, nullptr, nullptr }); - waiter = &waiters_.back(); - if (!createWaiterQueue(*waiter)) { - waiters_.pop_back(); - xSemaphoreGive(subMutex_); - return nullptr; - } - } - - responseQueue = waiter->queue; - if (!responseQueue) { - xSemaphoreGive(subMutex_); - return nullptr; - } - - (void)xQueueReset(responseQueue); - xSemaphoreGive(subMutex_); - - void* payload = nullptr; - BaseType_t res = xQueueReceive(responseQueue, &payload, timeout); - - if (res != pdTRUE) { - return nullptr; - } - return payload; -} - -void ESPEventBus::taskEntry(void* arg) { - auto* instance = static_cast(arg); - instance->taskLoop(); + if (!subId || !subMutex_) { + return; + } + + if (xSemaphoreTake(subMutex_, portMAX_DELAY) != pdTRUE) { + return; + } + + for (auto &sub : subs_) { + if (sub.subId == subId) { + sub.active = false; + break; + } + } + + compactSubscriptionsLocked(); + xSemaphoreGive(subMutex_); +} + +void *ESPEventBus::waitFor(EventBusId id, TickType_t timeout) { + if (!queue_ || !subMutex_) { + return nullptr; + } + + const TaskHandle_t callerTask = currentTaskHandle(); + if (!callerTask || (task_ && callerTask == task_)) { + return nullptr; + } + + QueueHandle_t responseQueue = nullptr; + if (xSemaphoreTake(subMutex_, portMAX_DELAY) != pdTRUE) { + return nullptr; + } + + WaiterContext *waiter = findWaiterLocked(callerTask, id); + if (!waiter) { + waiters_.push_back(WaiterContext{callerTask, id, nullptr, nullptr, nullptr}); + waiter = &waiters_.back(); + if (!createWaiterQueue(*waiter)) { + waiters_.pop_back(); + xSemaphoreGive(subMutex_); + return nullptr; + } + } + + responseQueue = waiter->queue; + if (!responseQueue) { + xSemaphoreGive(subMutex_); + return nullptr; + } + + (void)xQueueReset(responseQueue); + xSemaphoreGive(subMutex_); + + void *payload = nullptr; + BaseType_t res = xQueueReceive(responseQueue, &payload, timeout); + + if (res != pdTRUE) { + return nullptr; + } + return payload; +} + +void ESPEventBus::taskEntry(void *arg) { + auto *instance = static_cast(arg); + instance->taskLoop(); } void ESPEventBus::taskLoop() { - if (!queue_) { - running_ = false; - stopEventPending_ = false; - task_ = nullptr; - return; - } - task_ = currentTaskHandle(); - - { - // Scope dynamic containers so they release memory before the task self-deletes. - EventBusVector snapshot{ EventBusAllocator(config_.usePSRAMBuffers) }; - snapshot.reserve(4); - - QueuedEvent ev; - - while (running_) { - if (xQueueReceive(queue_, &ev, portMAX_DELAY) != pdTRUE) { - continue; - } - - if (ev.stop) { - stopEventPending_ = false; - break; - } - - snapshot.clear(); - - if (xSemaphoreTake(subMutex_, portMAX_DELAY) == pdTRUE) { - bool needsCompact = false; - for (auto& sub : subs_) { - if (!sub.active) { - needsCompact = true; - continue; - } - - if (sub.eventId == ev.eventId) { - snapshot.push_back(sub); - if (sub.oneshot) { - sub.active = false; - needsCompact = true; - } - } - } - - if (needsCompact) { - compactSubscriptionsLocked(); - } - - for (auto& waiter : waiters_) { - if (waiter.eventId != ev.eventId || !waiter.queue) { - continue; - } - (void)xQueueSend(waiter.queue, &ev.payload, 0); - } - - xSemaphoreGive(subMutex_); - } - - for (auto& sub : snapshot) { - if (sub.cb) { - sub.cb(ev.payload, sub.userArg); - } - } - } - } - - running_ = false; - task_ = nullptr; - vTaskDelete(nullptr); + if (!queue_) { + running_ = false; + stopEventPending_ = false; + task_ = nullptr; + return; + } + task_ = currentTaskHandle(); + + { + // Scope dynamic containers so they release memory before the task self-deletes. + EventBusVector snapshot{ + EventBusAllocator(config_.usePSRAMBuffers) + }; + snapshot.reserve(4); + + QueuedEvent ev; + + while (running_) { + if (xQueueReceive(queue_, &ev, portMAX_DELAY) != pdTRUE) { + continue; + } + + if (ev.stop) { + stopEventPending_ = false; + break; + } + + snapshot.clear(); + + if (xSemaphoreTake(subMutex_, portMAX_DELAY) == pdTRUE) { + bool needsCompact = false; + for (auto &sub : subs_) { + if (!sub.active) { + needsCompact = true; + continue; + } + + if (sub.eventId == ev.eventId) { + snapshot.push_back(sub); + if (sub.oneshot) { + sub.active = false; + needsCompact = true; + } + } + } + + if (needsCompact) { + compactSubscriptionsLocked(); + } + + for (auto &waiter : waiters_) { + if (waiter.eventId != ev.eventId || !waiter.queue) { + continue; + } + (void)xQueueSend(waiter.queue, &ev.payload, 0); + } + + xSemaphoreGive(subMutex_); + } + + for (auto &sub : snapshot) { + if (sub.cb) { + sub.cb(ev.payload, sub.userArg); + } + } + } + } + + running_ = false; + task_ = nullptr; + vTaskDelete(nullptr); } void ESPEventBus::stopTask() { - if (!task_) { - running_ = false; - stopEventPending_ = false; - task_ = nullptr; - return; - } - - if (queue_ && !stopEventPending_) { - QueuedEvent stopEvent{}; - stopEvent.stop = true; - while (xQueueSend(queue_, &stopEvent, pdMS_TO_TICKS(10)) != pdPASS) { - vTaskDelay(pdMS_TO_TICKS(1)); - } - stopEventPending_ = true; - } - - TickType_t start = xTaskGetTickCount(); - while (task_ && (xTaskGetTickCount() - start) <= pdMS_TO_TICKS(3000)) { - vTaskDelay(pdMS_TO_TICKS(10)); - } - if (task_) { - vTaskDelete(task_); - task_ = nullptr; - } - task_ = nullptr; - running_ = false; - stopEventPending_ = false; + if (!task_) { + running_ = false; + stopEventPending_ = false; + task_ = nullptr; + return; + } + + if (queue_ && !stopEventPending_) { + QueuedEvent stopEvent{}; + stopEvent.stop = true; + while (xQueueSend(queue_, &stopEvent, pdMS_TO_TICKS(10)) != pdPASS) { + vTaskDelay(pdMS_TO_TICKS(1)); + } + stopEventPending_ = true; + } + + TickType_t start = xTaskGetTickCount(); + while (task_ && (xTaskGetTickCount() - start) <= pdMS_TO_TICKS(3000)) { + vTaskDelay(pdMS_TO_TICKS(10)); + } + if (task_) { + vTaskDelete(task_); + task_ = nullptr; + } + task_ = nullptr; + running_ = false; + stopEventPending_ = false; } void ESPEventBus::compactSubscriptionsLocked() { - subs_.erase( - std::remove_if(subs_.begin(), subs_.end(), - [](const Subscription& sub) { return !sub.active; }), - subs_.end()); -} - -bool ESPEventBus::enqueueFromTask(const QueuedEvent& ev, TickType_t timeout) { - TickType_t waitTicks = timeout; - if (config_.overflowPolicy != EventBusOverflowPolicy::Block) { - waitTicks = 0; - } - - if (xQueueSend(queue_, &ev, waitTicks) == pdTRUE) { - emitPressureMetricFromTask(); - return true; - } - - if (config_.overflowPolicy == EventBusOverflowPolicy::Block) { - notifyDrop(ev.eventId, ev.payload); - return false; - } - - return handleOverflowFromTask(ev); -} - -bool ESPEventBus::enqueueFromISR(const QueuedEvent& ev, BaseType_t* higherPriorityTaskWoken) { - BaseType_t localWoken = pdFALSE; - BaseType_t res = xQueueSendFromISR(queue_, &ev, &localWoken); - if (res == pdPASS) { - propagateYieldFromISR(localWoken, higherPriorityTaskWoken); - return true; - } - - if (config_.overflowPolicy == EventBusOverflowPolicy::Block) { - notifyDrop(ev.eventId, ev.payload); - propagateYieldFromISR(localWoken, higherPriorityTaskWoken); - return false; - } - - bool ok = handleOverflowFromISR(ev, &localWoken); - propagateYieldFromISR(localWoken, higherPriorityTaskWoken); - return ok; -} - -bool ESPEventBus::handleOverflowFromTask(const QueuedEvent& ev) { - switch (config_.overflowPolicy) { - case EventBusOverflowPolicy::DropNewest: - notifyDrop(ev.eventId, ev.payload); - return false; - case EventBusOverflowPolicy::DropOldest: { - QueuedEvent dropped{}; - if (xQueueReceive(queue_, &dropped, 0) == pdTRUE) { - notifyDrop(dropped.eventId, dropped.payload); - if (xQueueSend(queue_, &ev, 0) == pdTRUE) { - emitPressureMetricFromTask(); - return true; - } - } - notifyDrop(ev.eventId, ev.payload); - return false; - } - case EventBusOverflowPolicy::Block: - default: - notifyDrop(ev.eventId, ev.payload); - return false; - } -} - -bool ESPEventBus::handleOverflowFromISR(const QueuedEvent& ev, BaseType_t* localWokenAggregate) { - switch (config_.overflowPolicy) { - case EventBusOverflowPolicy::DropNewest: - notifyDrop(ev.eventId, ev.payload); - return false; - case EventBusOverflowPolicy::DropOldest: { - QueuedEvent dropped{}; - BaseType_t localWoken = pdFALSE; - if (xQueueReceiveFromISR(queue_, &dropped, &localWoken) == pdTRUE) { - if (localWokenAggregate) { - if (localWoken == pdTRUE) { - *localWokenAggregate = pdTRUE; - } - } - notifyDrop(dropped.eventId, dropped.payload); - BaseType_t sendWoken = pdFALSE; - if (xQueueSendFromISR(queue_, &ev, &sendWoken) == pdPASS) { - if (sendWoken == pdTRUE) { - localWoken = pdTRUE; - } - if (localWokenAggregate) { - if (localWoken == pdTRUE || sendWoken == pdTRUE) { - *localWokenAggregate = pdTRUE; - } - } - return true; - } - } - notifyDrop(ev.eventId, ev.payload); - return false; - } - case EventBusOverflowPolicy::Block: - default: - notifyDrop(ev.eventId, ev.payload); - return false; - } + subs_.erase( + std::remove_if( + subs_.begin(), + subs_.end(), + [](const Subscription &sub) { return !sub.active; } + ), + subs_.end() + ); +} + +bool ESPEventBus::enqueueFromTask(const QueuedEvent &ev, TickType_t timeout) { + TickType_t waitTicks = timeout; + if (config_.overflowPolicy != EventBusOverflowPolicy::Block) { + waitTicks = 0; + } + + if (xQueueSend(queue_, &ev, waitTicks) == pdTRUE) { + emitPressureMetricFromTask(); + return true; + } + + if (config_.overflowPolicy == EventBusOverflowPolicy::Block) { + notifyDrop(ev.eventId, ev.payload); + return false; + } + + return handleOverflowFromTask(ev); +} + +bool ESPEventBus::enqueueFromISR(const QueuedEvent &ev, BaseType_t *higherPriorityTaskWoken) { + BaseType_t localWoken = pdFALSE; + BaseType_t res = xQueueSendFromISR(queue_, &ev, &localWoken); + if (res == pdPASS) { + propagateYieldFromISR(localWoken, higherPriorityTaskWoken); + return true; + } + + if (config_.overflowPolicy == EventBusOverflowPolicy::Block) { + notifyDrop(ev.eventId, ev.payload); + propagateYieldFromISR(localWoken, higherPriorityTaskWoken); + return false; + } + + bool ok = handleOverflowFromISR(ev, &localWoken); + propagateYieldFromISR(localWoken, higherPriorityTaskWoken); + return ok; +} + +bool ESPEventBus::handleOverflowFromTask(const QueuedEvent &ev) { + switch (config_.overflowPolicy) { + case EventBusOverflowPolicy::DropNewest: + notifyDrop(ev.eventId, ev.payload); + return false; + case EventBusOverflowPolicy::DropOldest: { + QueuedEvent dropped{}; + if (xQueueReceive(queue_, &dropped, 0) == pdTRUE) { + notifyDrop(dropped.eventId, dropped.payload); + if (xQueueSend(queue_, &ev, 0) == pdTRUE) { + emitPressureMetricFromTask(); + return true; + } + } + notifyDrop(ev.eventId, ev.payload); + return false; + } + case EventBusOverflowPolicy::Block: + default: + notifyDrop(ev.eventId, ev.payload); + return false; + } +} + +bool ESPEventBus::handleOverflowFromISR(const QueuedEvent &ev, BaseType_t *localWokenAggregate) { + switch (config_.overflowPolicy) { + case EventBusOverflowPolicy::DropNewest: + notifyDrop(ev.eventId, ev.payload); + return false; + case EventBusOverflowPolicy::DropOldest: { + QueuedEvent dropped{}; + BaseType_t localWoken = pdFALSE; + if (xQueueReceiveFromISR(queue_, &dropped, &localWoken) == pdTRUE) { + if (localWokenAggregate) { + if (localWoken == pdTRUE) { + *localWokenAggregate = pdTRUE; + } + } + notifyDrop(dropped.eventId, dropped.payload); + BaseType_t sendWoken = pdFALSE; + if (xQueueSendFromISR(queue_, &ev, &sendWoken) == pdPASS) { + if (sendWoken == pdTRUE) { + localWoken = pdTRUE; + } + if (localWokenAggregate) { + if (localWoken == pdTRUE || sendWoken == pdTRUE) { + *localWokenAggregate = pdTRUE; + } + } + return true; + } + } + notifyDrop(ev.eventId, ev.payload); + return false; + } + case EventBusOverflowPolicy::Block: + default: + notifyDrop(ev.eventId, ev.payload); + return false; + } } void ESPEventBus::emitPressureMetricFromTask() { - if (!config_.pressureCallback || !queue_ || config_.queueLength == 0 || config_.pressureThresholdPercent == 0) { - return; - } - - UBaseType_t queued = uxQueueMessagesWaiting(queue_); - uint32_t percent = (queued * 100U) / config_.queueLength; - if (percent >= config_.pressureThresholdPercent) { - config_.pressureCallback(queued, config_.queueLength, config_.pressureUserArg); - } -} - -void ESPEventBus::notifyDrop(EventBusId id, void* payload) { - if (config_.dropCallback) { - config_.dropCallback(id, payload, config_.dropUserArg); - } -} - -bool ESPEventBus::validatePayload(EventBusId id, void* payload) const { - if (!config_.payloadValidator) { - return true; - } - return config_.payloadValidator(id, payload, config_.payloadValidatorArg); -} - -void ESPEventBus::propagateYieldFromISR(BaseType_t localWoken, BaseType_t* higherPriorityTaskWoken) { - if (higherPriorityTaskWoken) { - if (localWoken == pdTRUE) { - *higherPriorityTaskWoken = pdTRUE; - } - } else if (localWoken == pdTRUE) { + if (!config_.pressureCallback || !queue_ || config_.queueLength == 0 || + config_.pressureThresholdPercent == 0) { + return; + } + + UBaseType_t queued = uxQueueMessagesWaiting(queue_); + uint32_t percent = (queued * 100U) / config_.queueLength; + if (percent >= config_.pressureThresholdPercent) { + config_.pressureCallback(queued, config_.queueLength, config_.pressureUserArg); + } +} + +void ESPEventBus::notifyDrop(EventBusId id, void *payload) { + if (config_.dropCallback) { + config_.dropCallback(id, payload, config_.dropUserArg); + } +} + +bool ESPEventBus::validatePayload(EventBusId id, void *payload) const { + if (!config_.payloadValidator) { + return true; + } + return config_.payloadValidator(id, payload, config_.payloadValidatorArg); +} + +void ESPEventBus::propagateYieldFromISR( + BaseType_t localWoken, BaseType_t *higherPriorityTaskWoken +) { + if (higherPriorityTaskWoken) { + if (localWoken == pdTRUE) { + *higherPriorityTaskWoken = pdTRUE; + } + } else if (localWoken == pdTRUE) { #if defined(portYIELD_FROM_ISR) - portYIELD_FROM_ISR(); + portYIELD_FROM_ISR(); #endif - } + } } #if defined(INCLUDE_xTaskGetCurrentTaskHandle) && (INCLUDE_xTaskGetCurrentTaskHandle == 1) TaskHandle_t ESPEventBus::currentTaskHandle() { - return xTaskGetCurrentTaskHandle(); + return xTaskGetCurrentTaskHandle(); } #else -extern "C" void* volatile pxCurrentTCB; +extern "C" void *volatile pxCurrentTCB; TaskHandle_t ESPEventBus::currentTaskHandle() { - return reinterpret_cast(pxCurrentTCB); + return reinterpret_cast(pxCurrentTCB); } #endif bool ESPEventBus::createKernelMutex() { #if defined(configSUPPORT_STATIC_ALLOCATION) && (configSUPPORT_STATIC_ALLOCATION == 1) - if (config_.usePSRAMBuffers) { - mutexStorage_ = allocateKernelStorage(sizeof(StaticSemaphore_t), true); - if (mutexStorage_) { - subMutex_ = xSemaphoreCreateMutexStatic(static_cast(mutexStorage_)); - if (subMutex_) { - return true; - } - freeKernelStorage(mutexStorage_); - mutexStorage_ = nullptr; - } - } + if (config_.usePSRAMBuffers) { + mutexStorage_ = allocateKernelStorage(sizeof(StaticSemaphore_t), true); + if (mutexStorage_) { + subMutex_ = + xSemaphoreCreateMutexStatic(static_cast(mutexStorage_)); + if (subMutex_) { + return true; + } + freeKernelStorage(mutexStorage_); + mutexStorage_ = nullptr; + } + } #endif - subMutex_ = xSemaphoreCreateMutex(); - return subMutex_ != nullptr; + subMutex_ = xSemaphoreCreateMutex(); + return subMutex_ != nullptr; } bool ESPEventBus::createKernelQueue() { #if defined(configSUPPORT_STATIC_ALLOCATION) && (configSUPPORT_STATIC_ALLOCATION == 1) - if (config_.usePSRAMBuffers) { - queueStorage_ = allocateKernelStorage(static_cast(config_.queueLength) * sizeof(QueuedEvent), true); - queueControlStorage_ = allocateKernelStorage(sizeof(StaticQueue_t), true); - if (queueStorage_ && queueControlStorage_) { - queue_ = xQueueCreateStatic( - config_.queueLength, - sizeof(QueuedEvent), - static_cast(queueStorage_), - static_cast(queueControlStorage_)); - if (queue_) { - return true; - } - } - freeKernelStorage(queueStorage_); - freeKernelStorage(queueControlStorage_); - queueStorage_ = nullptr; - queueControlStorage_ = nullptr; - } + if (config_.usePSRAMBuffers) { + queueStorage_ = allocateKernelStorage( + static_cast(config_.queueLength) * sizeof(QueuedEvent), + true + ); + queueControlStorage_ = allocateKernelStorage(sizeof(StaticQueue_t), true); + if (queueStorage_ && queueControlStorage_) { + queue_ = xQueueCreateStatic( + config_.queueLength, + sizeof(QueuedEvent), + static_cast(queueStorage_), + static_cast(queueControlStorage_) + ); + if (queue_) { + return true; + } + } + freeKernelStorage(queueStorage_); + freeKernelStorage(queueControlStorage_); + queueStorage_ = nullptr; + queueControlStorage_ = nullptr; + } #endif - queue_ = xQueueCreate(config_.queueLength, sizeof(QueuedEvent)); - return queue_ != nullptr; + queue_ = xQueueCreate(config_.queueLength, sizeof(QueuedEvent)); + return queue_ != nullptr; } -bool ESPEventBus::createWorkerTask(const char* taskName) { - const char* resolvedTaskName = (taskName && taskName[0] != '\0') ? taskName : "ESPEventBus"; - task_ = nullptr; - const BaseType_t created = xTaskCreatePinnedToCore( - &ESPEventBus::taskEntry, resolvedTaskName, config_.stackSize, this, config_.priority, &task_, config_.coreId); - return created == pdPASS && task_ != nullptr; +bool ESPEventBus::createWorkerTask(const char *taskName) { + const char *resolvedTaskName = (taskName && taskName[0] != '\0') ? taskName : "ESPEventBus"; + task_ = nullptr; + const BaseType_t created = xTaskCreatePinnedToCore( + &ESPEventBus::taskEntry, + resolvedTaskName, + config_.stackSize, + this, + config_.priority, + &task_, + config_.coreId + ); + return created == pdPASS && task_ != nullptr; } void ESPEventBus::resetSubscriptions(bool usePSRAMBuffers) { - using SubscriptionVector = EventBusVector; - subs_.~SubscriptionVector(); - new (&subs_) SubscriptionVector{ EventBusAllocator(usePSRAMBuffers) }; + using SubscriptionVector = EventBusVector; + subs_.~SubscriptionVector(); + new (&subs_) SubscriptionVector{EventBusAllocator(usePSRAMBuffers)}; } void ESPEventBus::clearWaiters() { - for (auto& waiter : waiters_) { - if (waiter.queue) { - void* nullPayload = nullptr; - (void)xQueueReset(waiter.queue); - (void)xQueueSend(waiter.queue, &nullPayload, 0); - vQueueDelete(waiter.queue); - waiter.queue = nullptr; - } - freeKernelStorage(waiter.queueStorage); - freeKernelStorage(waiter.queueControlStorage); - waiter.queueStorage = nullptr; - waiter.queueControlStorage = nullptr; - } - waiters_.clear(); + for (auto &waiter : waiters_) { + if (waiter.queue) { + void *nullPayload = nullptr; + (void)xQueueReset(waiter.queue); + (void)xQueueSend(waiter.queue, &nullPayload, 0); + vQueueDelete(waiter.queue); + waiter.queue = nullptr; + } + freeKernelStorage(waiter.queueStorage); + freeKernelStorage(waiter.queueControlStorage); + waiter.queueStorage = nullptr; + waiter.queueControlStorage = nullptr; + } + waiters_.clear(); } void ESPEventBus::resetWaiters(bool usePSRAMBuffers) { - using WaiterVector = EventBusVector; - waiters_.~WaiterVector(); - new (&waiters_) WaiterVector{ EventBusAllocator(usePSRAMBuffers) }; + using WaiterVector = EventBusVector; + waiters_.~WaiterVector(); + new (&waiters_) WaiterVector{EventBusAllocator(usePSRAMBuffers)}; } -ESPEventBus::WaiterContext* ESPEventBus::findWaiterLocked(TaskHandle_t ownerTask, EventBusId eventId) { - for (auto& waiter : waiters_) { - if (waiter.ownerTask == ownerTask && waiter.eventId == eventId) { - return &waiter; - } - } - return nullptr; +ESPEventBus::WaiterContext * +ESPEventBus::findWaiterLocked(TaskHandle_t ownerTask, EventBusId eventId) { + for (auto &waiter : waiters_) { + if (waiter.ownerTask == ownerTask && waiter.eventId == eventId) { + return &waiter; + } + } + return nullptr; } -bool ESPEventBus::createWaiterQueue(WaiterContext& waiter) { +bool ESPEventBus::createWaiterQueue(WaiterContext &waiter) { #if defined(configSUPPORT_STATIC_ALLOCATION) && (configSUPPORT_STATIC_ALLOCATION == 1) - if (config_.usePSRAMBuffers) { - waiter.queueStorage = allocateKernelStorage(sizeof(void*), true); - waiter.queueControlStorage = allocateKernelStorage(sizeof(StaticQueue_t), true); - if (waiter.queueStorage && waiter.queueControlStorage) { - waiter.queue = xQueueCreateStatic( - 1, - sizeof(void*), - static_cast(waiter.queueStorage), - static_cast(waiter.queueControlStorage)); - } - if (waiter.queue) { - return true; - } - freeKernelStorage(waiter.queueStorage); - freeKernelStorage(waiter.queueControlStorage); - waiter.queueStorage = nullptr; - waiter.queueControlStorage = nullptr; - } + if (config_.usePSRAMBuffers) { + waiter.queueStorage = allocateKernelStorage(sizeof(void *), true); + waiter.queueControlStorage = allocateKernelStorage(sizeof(StaticQueue_t), true); + if (waiter.queueStorage && waiter.queueControlStorage) { + waiter.queue = xQueueCreateStatic( + 1, + sizeof(void *), + static_cast(waiter.queueStorage), + static_cast(waiter.queueControlStorage) + ); + } + if (waiter.queue) { + return true; + } + freeKernelStorage(waiter.queueStorage); + freeKernelStorage(waiter.queueControlStorage); + waiter.queueStorage = nullptr; + waiter.queueControlStorage = nullptr; + } #endif - waiter.queue = xQueueCreate(1, sizeof(void*)); - return waiter.queue != nullptr; + waiter.queue = xQueueCreate(1, sizeof(void *)); + return waiter.queue != nullptr; } void ESPEventBus::resetKernelStorage() { - freeKernelStorage(mutexStorage_); - freeKernelStorage(queueStorage_); - freeKernelStorage(queueControlStorage_); - freeKernelStorage(taskStackStorage_); - freeKernelStorage(taskControlStorage_); - mutexStorage_ = nullptr; - queueStorage_ = nullptr; - queueControlStorage_ = nullptr; - taskStackStorage_ = nullptr; - taskControlStorage_ = nullptr; + freeKernelStorage(mutexStorage_); + freeKernelStorage(queueStorage_); + freeKernelStorage(queueControlStorage_); + freeKernelStorage(taskStackStorage_); + freeKernelStorage(taskControlStorage_); + mutexStorage_ = nullptr; + queueStorage_ = nullptr; + queueControlStorage_ = nullptr; + taskStackStorage_ = nullptr; + taskControlStorage_ = nullptr; } size_t ESPEventBus::taskStackWords(uint32_t stackSizeBytes) { - const size_t bytes = static_cast(stackSizeBytes); - const size_t wordSize = sizeof(StackType_t); - return (bytes + (wordSize - 1U)) / wordSize; + const size_t bytes = static_cast(stackSizeBytes); + const size_t wordSize = sizeof(StackType_t); + return (bytes + (wordSize - 1U)) / wordSize; } -void* ESPEventBus::allocateKernelStorage(size_t bytes, bool usePSRAMBuffers) { - return eventbus_allocator_detail::allocate(bytes, usePSRAMBuffers); +void *ESPEventBus::allocateKernelStorage(size_t bytes, bool usePSRAMBuffers) { + return eventbus_allocator_detail::allocate(bytes, usePSRAMBuffers); } -void ESPEventBus::freeKernelStorage(void* ptr) { - if (ptr) { - eventbus_allocator_detail::deallocate(ptr); - } +void ESPEventBus::freeKernelStorage(void *ptr) { + if (ptr) { + eventbus_allocator_detail::deallocate(ptr); + } } diff --git a/src/esp_eventbus/eventbus.h b/src/esp_eventbus/eventbus.h index 2a3f671..4711d20 100644 --- a/src/esp_eventbus/eventbus.h +++ b/src/esp_eventbus/eventbus.h @@ -16,160 +16,149 @@ using EventBusId = uint16_t; using EventBusSub = uint32_t; -using EventCallbackFn = void (*)(void* payload, void* userArg); -using EventCallback = std::function; +using EventCallbackFn = void (*)(void *payload, void *userArg); +using EventCallback = std::function; enum class EventBusOverflowPolicy : uint8_t { - Block, - DropNewest, - DropOldest, + Block, + DropNewest, + DropOldest, }; -using EventBusQueuePressureFn = void (*)(UBaseType_t queued, UBaseType_t capacity, void* userArg); -using EventBusDropFn = void (*)(EventBusId id, void* payload, void* userArg); -using EventBusPayloadValidatorFn = bool (*)(EventBusId id, void* payload, void* userArg); +using EventBusQueuePressureFn = void (*)(UBaseType_t queued, UBaseType_t capacity, void *userArg); +using EventBusDropFn = void (*)(EventBusId id, void *payload, void *userArg); +using EventBusPayloadValidatorFn = bool (*)(EventBusId id, void *payload, void *userArg); struct EventBusConfig { - uint16_t queueLength = 16; - UBaseType_t priority = 5; - uint32_t stackSize = 4096 * sizeof(StackType_t); - BaseType_t coreId = tskNO_AFFINITY; - const char* taskName = "ESPEventBus"; - uint16_t maxSubscriptions = 0; // 0 => unlimited - bool usePSRAMBuffers = false; - EventBusOverflowPolicy overflowPolicy = EventBusOverflowPolicy::Block; - uint8_t pressureThresholdPercent = 90; // Percentage (1-100) before invoking pressure callback - EventBusQueuePressureFn pressureCallback = nullptr; - void* pressureUserArg = nullptr; - EventBusDropFn dropCallback = nullptr; - void* dropUserArg = nullptr; - EventBusPayloadValidatorFn payloadValidator = nullptr; - void* payloadValidatorArg = nullptr; + uint16_t queueLength = 16; + UBaseType_t priority = 5; + uint32_t stackSize = 4096 * sizeof(StackType_t); + BaseType_t coreId = tskNO_AFFINITY; + const char *taskName = "ESPEventBus"; + uint16_t maxSubscriptions = 0; // 0 => unlimited + bool usePSRAMBuffers = false; + EventBusOverflowPolicy overflowPolicy = EventBusOverflowPolicy::Block; + uint8_t pressureThresholdPercent = 90; // Percentage (1-100) before invoking pressure callback + EventBusQueuePressureFn pressureCallback = nullptr; + void *pressureUserArg = nullptr; + EventBusDropFn dropCallback = nullptr; + void *dropUserArg = nullptr; + EventBusPayloadValidatorFn payloadValidator = nullptr; + void *payloadValidatorArg = nullptr; }; class ESPEventBus { public: - ESPEventBus(); - ~ESPEventBus(); + ESPEventBus(); + ~ESPEventBus(); - ESPEventBus(const ESPEventBus&) = delete; - ESPEventBus& operator=(const ESPEventBus&) = delete; + ESPEventBus(const ESPEventBus &) = delete; + ESPEventBus &operator=(const ESPEventBus &) = delete; - bool init(const EventBusConfig& config = EventBusConfig{}); - void deinit(); - bool isInitialized() const; + bool init(const EventBusConfig &config = EventBusConfig{}); + void deinit(); + bool isInitialized() const; - bool post(EventBusId id, void* payload, TickType_t timeout = 0); + bool post(EventBusId id, void *payload, TickType_t timeout = 0); - template - bool post(Id id, void* payload, TickType_t timeout = 0) { - return post(static_cast(id), payload, timeout); - } + template bool post(Id id, void *payload, TickType_t timeout = 0) { + return post(static_cast(id), payload, timeout); + } - bool postFromISR(EventBusId id, void* payload, BaseType_t* higherPriorityTaskWoken = nullptr); + bool postFromISR(EventBusId id, void *payload, BaseType_t *higherPriorityTaskWoken = nullptr); - template - bool postFromISR(Id id, void* payload, BaseType_t* higherPriorityTaskWoken = nullptr) { - return postFromISR(static_cast(id), payload, higherPriorityTaskWoken); - } + template + bool postFromISR(Id id, void *payload, BaseType_t *higherPriorityTaskWoken = nullptr) { + return postFromISR(static_cast(id), payload, higherPriorityTaskWoken); + } - EventBusSub subscribe(EventBusId id, - EventCallbackFn cb, - void* userArg = nullptr, - bool oneshot = false); + EventBusSub + subscribe(EventBusId id, EventCallbackFn cb, void *userArg = nullptr, bool oneshot = false); - EventBusSub subscribe(EventBusId id, - EventCallback cb, - void* userArg = nullptr, - bool oneshot = false); + EventBusSub + subscribe(EventBusId id, EventCallback cb, void *userArg = nullptr, bool oneshot = false); - template - EventBusSub subscribe(Id id, - EventCallbackFn cb, - void* userArg = nullptr, - bool oneshot = false) { - return subscribe(static_cast(id), cb, userArg, oneshot); - } + template + EventBusSub + subscribe(Id id, EventCallbackFn cb, void *userArg = nullptr, bool oneshot = false) { + return subscribe(static_cast(id), cb, userArg, oneshot); + } - template - EventBusSub subscribe(Id id, - EventCallback cb, - void* userArg = nullptr, - bool oneshot = false) { - return subscribe(static_cast(id), std::move(cb), userArg, oneshot); - } + template + EventBusSub subscribe(Id id, EventCallback cb, void *userArg = nullptr, bool oneshot = false) { + return subscribe(static_cast(id), std::move(cb), userArg, oneshot); + } - void unsubscribe(EventBusSub subId); + void unsubscribe(EventBusSub subId); - void* waitFor(EventBusId id, TickType_t timeout = portMAX_DELAY); + void *waitFor(EventBusId id, TickType_t timeout = portMAX_DELAY); - template - void* waitFor(Id id, TickType_t timeout = portMAX_DELAY) { - return waitFor(static_cast(id), timeout); - } + template void *waitFor(Id id, TickType_t timeout = portMAX_DELAY) { + return waitFor(static_cast(id), timeout); + } private: - struct Subscription { - EventBusSub subId = 0; - EventBusId eventId = 0; - EventCallback cb; - void* userArg = nullptr; - bool oneshot = false; - bool active = false; - }; - - struct QueuedEvent { - EventBusId eventId = 0; - void* payload = nullptr; - bool stop = false; - }; - - struct WaiterContext { - TaskHandle_t ownerTask = nullptr; - EventBusId eventId = 0; - QueueHandle_t queue = nullptr; - void* queueStorage = nullptr; - void* queueControlStorage = nullptr; - }; - - static void taskEntry(void* arg); - void taskLoop(); - void stopTask(); - void compactSubscriptionsLocked(); - void clearWaiters(); - void resetWaiters(bool usePSRAMBuffers); - WaiterContext* findWaiterLocked(TaskHandle_t ownerTask, EventBusId eventId); - bool createWaiterQueue(WaiterContext& waiter); - bool enqueueFromTask(const QueuedEvent& ev, TickType_t timeout); - bool enqueueFromISR(const QueuedEvent& ev, BaseType_t* higherPriorityTaskWoken); - bool handleOverflowFromTask(const QueuedEvent& ev); - bool handleOverflowFromISR(const QueuedEvent& ev, BaseType_t* localWoken); - void emitPressureMetricFromTask(); - void notifyDrop(EventBusId id, void* payload); - bool validatePayload(EventBusId id, void* payload) const; - void propagateYieldFromISR(BaseType_t localWoken, BaseType_t* higherPriorityTaskWoken); - static TaskHandle_t currentTaskHandle(); - bool createKernelMutex(); - bool createKernelQueue(); - bool createWorkerTask(const char* taskName); - void resetSubscriptions(bool usePSRAMBuffers); - void resetKernelStorage(); - static size_t taskStackWords(uint32_t stackSizeBytes); - static void* allocateKernelStorage(size_t bytes, bool usePSRAMBuffers); - static void freeKernelStorage(void* ptr); - - QueueHandle_t queue_ = nullptr; - TaskHandle_t task_ = nullptr; - SemaphoreHandle_t subMutex_ = nullptr; - EventBusVector subs_; - EventBusVector waiters_; - EventBusSub nextSubId_ = 0; - EventBusConfig config_{}; - bool running_ = false; - bool stopEventPending_ = false; - void* mutexStorage_ = nullptr; - void* queueStorage_ = nullptr; - void* queueControlStorage_ = nullptr; - void* taskStackStorage_ = nullptr; - void* taskControlStorage_ = nullptr; + struct Subscription { + EventBusSub subId = 0; + EventBusId eventId = 0; + EventCallback cb; + void *userArg = nullptr; + bool oneshot = false; + bool active = false; + }; + + struct QueuedEvent { + EventBusId eventId = 0; + void *payload = nullptr; + bool stop = false; + }; + + struct WaiterContext { + TaskHandle_t ownerTask = nullptr; + EventBusId eventId = 0; + QueueHandle_t queue = nullptr; + void *queueStorage = nullptr; + void *queueControlStorage = nullptr; + }; + + static void taskEntry(void *arg); + void taskLoop(); + void stopTask(); + void compactSubscriptionsLocked(); + void clearWaiters(); + void resetWaiters(bool usePSRAMBuffers); + WaiterContext *findWaiterLocked(TaskHandle_t ownerTask, EventBusId eventId); + bool createWaiterQueue(WaiterContext &waiter); + bool enqueueFromTask(const QueuedEvent &ev, TickType_t timeout); + bool enqueueFromISR(const QueuedEvent &ev, BaseType_t *higherPriorityTaskWoken); + bool handleOverflowFromTask(const QueuedEvent &ev); + bool handleOverflowFromISR(const QueuedEvent &ev, BaseType_t *localWoken); + void emitPressureMetricFromTask(); + void notifyDrop(EventBusId id, void *payload); + bool validatePayload(EventBusId id, void *payload) const; + void propagateYieldFromISR(BaseType_t localWoken, BaseType_t *higherPriorityTaskWoken); + static TaskHandle_t currentTaskHandle(); + bool createKernelMutex(); + bool createKernelQueue(); + bool createWorkerTask(const char *taskName); + void resetSubscriptions(bool usePSRAMBuffers); + void resetKernelStorage(); + static size_t taskStackWords(uint32_t stackSizeBytes); + static void *allocateKernelStorage(size_t bytes, bool usePSRAMBuffers); + static void freeKernelStorage(void *ptr); + + QueueHandle_t queue_ = nullptr; + TaskHandle_t task_ = nullptr; + SemaphoreHandle_t subMutex_ = nullptr; + EventBusVector subs_; + EventBusVector waiters_; + EventBusSub nextSubId_ = 0; + EventBusConfig config_{}; + bool running_ = false; + bool stopEventPending_ = false; + void *mutexStorage_ = nullptr; + void *queueStorage_ = nullptr; + void *queueControlStorage_ = nullptr; + void *taskStackStorage_ = nullptr; + void *taskControlStorage_ = nullptr; }; diff --git a/src/esp_eventbus/eventbus_allocator.h b/src/esp_eventbus/eventbus_allocator.h index 8269e9d..76eab1c 100644 --- a/src/esp_eventbus/eventbus_allocator.h +++ b/src/esp_eventbus/eventbus_allocator.h @@ -16,74 +16,72 @@ #include namespace eventbus_allocator_detail { -inline void* allocate(size_t bytes, bool usePSRAMBuffers) noexcept { +inline void *allocate(size_t bytes, bool usePSRAMBuffers) noexcept { #if ESP_EVENTBUS_HAS_BUFFER_MANAGER - return ESPBufferManager::allocate(bytes, usePSRAMBuffers); + return ESPBufferManager::allocate(bytes, usePSRAMBuffers); #else - (void)usePSRAMBuffers; - return std::malloc(bytes); + (void)usePSRAMBuffers; + return std::malloc(bytes); #endif } -inline void deallocate(void* ptr) noexcept { +inline void deallocate(void *ptr) noexcept { #if ESP_EVENTBUS_HAS_BUFFER_MANAGER - ESPBufferManager::deallocate(ptr); + ESPBufferManager::deallocate(ptr); #else - std::free(ptr); + std::free(ptr); #endif } -} // namespace eventbus_allocator_detail +} // namespace eventbus_allocator_detail -template -class EventBusAllocator { +template class EventBusAllocator { public: - using value_type = T; - - EventBusAllocator() noexcept = default; - explicit EventBusAllocator(bool usePSRAMBuffers) noexcept : usePSRAMBuffers_(usePSRAMBuffers) {} - - template - EventBusAllocator(const EventBusAllocator& other) noexcept : usePSRAMBuffers_(other.usePSRAMBuffers()) {} - - T* allocate(size_t count) { - if (count == 0) { - return nullptr; - } - if (count > (std::numeric_limits::max() / sizeof(T))) { - std::abort(); - } - - void* mem = eventbus_allocator_detail::allocate(count * sizeof(T), usePSRAMBuffers_); - if (!mem) { - std::abort(); - } - return static_cast(mem); - } - - void deallocate(T* ptr, size_t) noexcept { - eventbus_allocator_detail::deallocate(ptr); - } - - bool usePSRAMBuffers() const noexcept { - return usePSRAMBuffers_; - } - - template - bool operator==(const EventBusAllocator& other) const noexcept { - return usePSRAMBuffers_ == other.usePSRAMBuffers(); - } - - template - bool operator!=(const EventBusAllocator& other) const noexcept { - return !(*this == other); - } + using value_type = T; + + EventBusAllocator() noexcept = default; + explicit EventBusAllocator(bool usePSRAMBuffers) noexcept : usePSRAMBuffers_(usePSRAMBuffers) { + } + + template + EventBusAllocator(const EventBusAllocator &other) noexcept + : usePSRAMBuffers_(other.usePSRAMBuffers()) { + } + + T *allocate(size_t count) { + if (count == 0) { + return nullptr; + } + if (count > (std::numeric_limits::max() / sizeof(T))) { + std::abort(); + } + + void *mem = eventbus_allocator_detail::allocate(count * sizeof(T), usePSRAMBuffers_); + if (!mem) { + std::abort(); + } + return static_cast(mem); + } + + void deallocate(T *ptr, size_t) noexcept { + eventbus_allocator_detail::deallocate(ptr); + } + + bool usePSRAMBuffers() const noexcept { + return usePSRAMBuffers_; + } + + template bool operator==(const EventBusAllocator &other) const noexcept { + return usePSRAMBuffers_ == other.usePSRAMBuffers(); + } + + template bool operator!=(const EventBusAllocator &other) const noexcept { + return !(*this == other); + } private: - template - friend class EventBusAllocator; + template friend class EventBusAllocator; - bool usePSRAMBuffers_ = false; + bool usePSRAMBuffers_ = false; }; -template -using EventBusVector = std::vector>; +template using EventBusVector = std::vector>; diff --git a/test/test_eventbus/test_eventbus.cpp b/test/test_eventbus/test_eventbus.cpp index 5ca7b11..d5b5bdb 100644 --- a/test/test_eventbus/test_eventbus.cpp +++ b/test/test_eventbus/test_eventbus.cpp @@ -6,280 +6,282 @@ #include "freertos/task.h" enum class TestEvent : uint16_t { - FastTick = 1, - SlowTick, + FastTick = 1, + SlowTick, }; -static void noopCallback(void*, void*) { +static void noopCallback(void *, void *) { } struct DropContext { - volatile int received = 0; - volatile int dropped = 0; + volatile int received = 0; + volatile int dropped = 0; }; struct CounterContext { - volatile int callbacks = 0; + volatile int callbacks = 0; }; -static void slowSubscriber(void*, void* userArg) { - auto* ctx = static_cast(userArg); - if (ctx) { - ctx->received++; - } - vTaskDelay(pdMS_TO_TICKS(25)); +static void slowSubscriber(void *, void *userArg) { + auto *ctx = static_cast(userArg); + if (ctx) { + ctx->received++; + } + vTaskDelay(pdMS_TO_TICKS(25)); } -static void dropCallback(EventBusId, void*, void* userArg) { - auto* ctx = static_cast(userArg); - if (ctx) { - ctx->dropped++; - } +static void dropCallback(EventBusId, void *, void *userArg) { + auto *ctx = static_cast(userArg); + if (ctx) { + ctx->dropped++; + } } -static void counterCallback(void*, void* userArg) { - auto* ctx = static_cast(userArg); - if (ctx) { - ctx->callbacks++; - } +static void counterCallback(void *, void *userArg) { + auto *ctx = static_cast(userArg); + if (ctx) { + ctx->callbacks++; + } } struct PressureContext { - volatile bool triggered = false; + volatile bool triggered = false; }; -static void pressureCallback(UBaseType_t, UBaseType_t, void* userArg) { - auto* ctx = static_cast(userArg); - if (ctx) { - ctx->triggered = true; - } +static void pressureCallback(UBaseType_t, UBaseType_t, void *userArg) { + auto *ctx = static_cast(userArg); + if (ctx) { + ctx->triggered = true; + } } struct TestPayload { - int value; + int value; }; struct PoolContext { - TestPayload* pool; - size_t size; + TestPayload *pool; + size_t size; }; -static bool poolValidator(EventBusId, void* payload, void* userArg) { - auto* ctx = static_cast(userArg); - if (!ctx) { - return true; - } - for (size_t i = 0; i < ctx->size; ++i) { - if (payload == &ctx->pool[i]) { - return true; - } - } - return false; +static bool poolValidator(EventBusId, void *payload, void *userArg) { + auto *ctx = static_cast(userArg); + if (!ctx) { + return true; + } + for (size_t i = 0; i < ctx->size; ++i) { + if (payload == &ctx->pool[i]) { + return true; + } + } + return false; } struct ProducerContext { - ESPEventBus* bus = nullptr; - TestPayload payload{}; - volatile bool stop = false; - TaskHandle_t handle = nullptr; + ESPEventBus *bus = nullptr; + TestPayload payload{}; + volatile bool stop = false; + TaskHandle_t handle = nullptr; }; -static void producerTask(void* arg) { - auto* ctx = static_cast(arg); - while (!ctx->stop) { - if (ctx->bus) { - ctx->bus->post(TestEvent::FastTick, &ctx->payload, 0); - } - vTaskDelay(pdMS_TO_TICKS(1)); - } - ctx->handle = nullptr; - vTaskDelete(nullptr); +static void producerTask(void *arg) { + auto *ctx = static_cast(arg); + while (!ctx->stop) { + if (ctx->bus) { + ctx->bus->post(TestEvent::FastTick, &ctx->payload, 0); + } + vTaskDelay(pdMS_TO_TICKS(1)); + } + ctx->handle = nullptr; + vTaskDelete(nullptr); } void test_subscription_cap_limits_handles() { - ESPEventBus bus; - EventBusConfig cfg{}; - cfg.maxSubscriptions = 2; - TEST_ASSERT_TRUE(bus.init(cfg)); + ESPEventBus bus; + EventBusConfig cfg{}; + cfg.maxSubscriptions = 2; + TEST_ASSERT_TRUE(bus.init(cfg)); - auto subA = bus.subscribe(TestEvent::FastTick, noopCallback); - auto subB = bus.subscribe(TestEvent::FastTick, noopCallback); - auto subC = bus.subscribe(TestEvent::FastTick, noopCallback); + auto subA = bus.subscribe(TestEvent::FastTick, noopCallback); + auto subB = bus.subscribe(TestEvent::FastTick, noopCallback); + auto subC = bus.subscribe(TestEvent::FastTick, noopCallback); - TEST_ASSERT_NOT_EQUAL(0U, subA); - TEST_ASSERT_NOT_EQUAL(0U, subB); - TEST_ASSERT_EQUAL_UINT32(0, subC); + TEST_ASSERT_NOT_EQUAL(0U, subA); + TEST_ASSERT_NOT_EQUAL(0U, subB); + TEST_ASSERT_EQUAL_UINT32(0, subC); - bus.deinit(); + bus.deinit(); } void test_payload_validator_rejects_unknown_payloads() { - ESPEventBus bus; - TestPayload pool[2]{}; - PoolContext ctx{ pool, 2 }; + ESPEventBus bus; + TestPayload pool[2]{}; + PoolContext ctx{pool, 2}; - EventBusConfig cfg{}; - cfg.payloadValidator = poolValidator; - cfg.payloadValidatorArg = &ctx; - TEST_ASSERT_TRUE(bus.init(cfg)); + EventBusConfig cfg{}; + cfg.payloadValidator = poolValidator; + cfg.payloadValidatorArg = &ctx; + TEST_ASSERT_TRUE(bus.init(cfg)); - TEST_ASSERT_TRUE(bus.post(TestEvent::FastTick, &pool[0], portMAX_DELAY)); - TestPayload notOwned{}; - TEST_ASSERT_FALSE(bus.post(TestEvent::FastTick, ¬Owned, 0)); + TEST_ASSERT_TRUE(bus.post(TestEvent::FastTick, &pool[0], portMAX_DELAY)); + TestPayload notOwned{}; + TEST_ASSERT_FALSE(bus.post(TestEvent::FastTick, ¬Owned, 0)); - vTaskDelay(pdMS_TO_TICKS(20)); - bus.deinit(); + vTaskDelay(pdMS_TO_TICKS(20)); + bus.deinit(); } void test_drop_oldest_policy_discards_backlog() { - ESPEventBus bus; - DropContext ctx{}; - TestPayload scratch[8]{}; - - EventBusConfig cfg{}; - cfg.queueLength = 2; - cfg.overflowPolicy = EventBusOverflowPolicy::DropOldest; - cfg.dropCallback = dropCallback; - cfg.dropUserArg = &ctx; - TEST_ASSERT_TRUE(bus.init(cfg)); - - bus.subscribe(TestEvent::FastTick, slowSubscriber, &ctx); - for (int i = 0; i < 8; ++i) { - bus.post(TestEvent::FastTick, &scratch[i], 0); - } - - vTaskDelay(pdMS_TO_TICKS(200)); - TEST_ASSERT_TRUE(ctx.dropped > 0); - TEST_ASSERT_TRUE(ctx.received > 0); - - bus.deinit(); + ESPEventBus bus; + DropContext ctx{}; + TestPayload scratch[8]{}; + + EventBusConfig cfg{}; + cfg.queueLength = 2; + cfg.overflowPolicy = EventBusOverflowPolicy::DropOldest; + cfg.dropCallback = dropCallback; + cfg.dropUserArg = &ctx; + TEST_ASSERT_TRUE(bus.init(cfg)); + + bus.subscribe(TestEvent::FastTick, slowSubscriber, &ctx); + for (int i = 0; i < 8; ++i) { + bus.post(TestEvent::FastTick, &scratch[i], 0); + } + + vTaskDelay(pdMS_TO_TICKS(200)); + TEST_ASSERT_TRUE(ctx.dropped > 0); + TEST_ASSERT_TRUE(ctx.received > 0); + + bus.deinit(); } void test_drop_newest_policy_discards_incoming_event() { - ESPEventBus bus; - DropContext ctx{}; - TestPayload scratch[6]{}; - - EventBusConfig cfg{}; - cfg.queueLength = 1; - cfg.overflowPolicy = EventBusOverflowPolicy::DropNewest; - cfg.dropCallback = dropCallback; - cfg.dropUserArg = &ctx; - TEST_ASSERT_TRUE(bus.init(cfg)); - - bus.subscribe(TestEvent::FastTick, slowSubscriber, &ctx); - for (int i = 0; i < 6; ++i) { - bus.post(TestEvent::FastTick, &scratch[i], 0); - } - - vTaskDelay(pdMS_TO_TICKS(200)); - TEST_ASSERT_TRUE(ctx.dropped > 0); - TEST_ASSERT_TRUE(ctx.received < 6); - - bus.deinit(); + ESPEventBus bus; + DropContext ctx{}; + TestPayload scratch[6]{}; + + EventBusConfig cfg{}; + cfg.queueLength = 1; + cfg.overflowPolicy = EventBusOverflowPolicy::DropNewest; + cfg.dropCallback = dropCallback; + cfg.dropUserArg = &ctx; + TEST_ASSERT_TRUE(bus.init(cfg)); + + bus.subscribe(TestEvent::FastTick, slowSubscriber, &ctx); + for (int i = 0; i < 6; ++i) { + bus.post(TestEvent::FastTick, &scratch[i], 0); + } + + vTaskDelay(pdMS_TO_TICKS(200)); + TEST_ASSERT_TRUE(ctx.dropped > 0); + TEST_ASSERT_TRUE(ctx.received < 6); + + bus.deinit(); } void test_pressure_callback_triggers_on_high_usage() { - ESPEventBus bus; - DropContext dropCtx{}; - PressureContext pressureCtx{}; - - EventBusConfig cfg{}; - cfg.queueLength = 3; - cfg.overflowPolicy = EventBusOverflowPolicy::DropOldest; - cfg.pressureThresholdPercent = 50; - cfg.pressureCallback = pressureCallback; - cfg.pressureUserArg = &pressureCtx; - cfg.dropCallback = dropCallback; - cfg.dropUserArg = &dropCtx; - - TEST_ASSERT_TRUE(bus.init(cfg)); - bus.subscribe(TestEvent::FastTick, slowSubscriber, &dropCtx); - - TestPayload scratch[5]{}; - for (int i = 0; i < 5; ++i) { - bus.post(TestEvent::FastTick, &scratch[i], 0); - } - - vTaskDelay(pdMS_TO_TICKS(150)); - TEST_ASSERT_TRUE(pressureCtx.triggered); - - bus.deinit(); + ESPEventBus bus; + DropContext dropCtx{}; + PressureContext pressureCtx{}; + + EventBusConfig cfg{}; + cfg.queueLength = 3; + cfg.overflowPolicy = EventBusOverflowPolicy::DropOldest; + cfg.pressureThresholdPercent = 50; + cfg.pressureCallback = pressureCallback; + cfg.pressureUserArg = &pressureCtx; + cfg.dropCallback = dropCallback; + cfg.dropUserArg = &dropCtx; + + TEST_ASSERT_TRUE(bus.init(cfg)); + bus.subscribe(TestEvent::FastTick, slowSubscriber, &dropCtx); + + TestPayload scratch[5]{}; + for (int i = 0; i < 5; ++i) { + bus.post(TestEvent::FastTick, &scratch[i], 0); + } + + vTaskDelay(pdMS_TO_TICKS(150)); + TEST_ASSERT_TRUE(pressureCtx.triggered); + + bus.deinit(); } void test_deinit_completes_when_queue_is_busy() { - ESPEventBus bus; - DropContext ctx{}; - - EventBusConfig cfg{}; - cfg.queueLength = 1; - cfg.overflowPolicy = EventBusOverflowPolicy::Block; - TEST_ASSERT_TRUE(bus.init(cfg)); - - bus.subscribe(TestEvent::FastTick, slowSubscriber, &ctx); - - ProducerContext producer{}; - producer.bus = &bus; - TEST_ASSERT_EQUAL(pdPASS, xTaskCreatePinnedToCore( - producerTask, "producer", 2048, &producer, 4, &producer.handle, 1)); - - vTaskDelay(pdMS_TO_TICKS(100)); - TickType_t start = xTaskGetTickCount(); - bus.deinit(); - TickType_t elapsed = xTaskGetTickCount() - start; - TEST_ASSERT_LESS_THAN_UINT32(pdMS_TO_TICKS(1000), elapsed); - - producer.stop = true; - while (producer.handle != nullptr) { - vTaskDelay(pdMS_TO_TICKS(10)); - } + ESPEventBus bus; + DropContext ctx{}; + + EventBusConfig cfg{}; + cfg.queueLength = 1; + cfg.overflowPolicy = EventBusOverflowPolicy::Block; + TEST_ASSERT_TRUE(bus.init(cfg)); + + bus.subscribe(TestEvent::FastTick, slowSubscriber, &ctx); + + ProducerContext producer{}; + producer.bus = &bus; + TEST_ASSERT_EQUAL( + pdPASS, + xTaskCreatePinnedToCore(producerTask, "producer", 2048, &producer, 4, &producer.handle, 1) + ); + + vTaskDelay(pdMS_TO_TICKS(100)); + TickType_t start = xTaskGetTickCount(); + bus.deinit(); + TickType_t elapsed = xTaskGetTickCount() - start; + TEST_ASSERT_LESS_THAN_UINT32(pdMS_TO_TICKS(1000), elapsed); + + producer.stop = true; + while (producer.handle != nullptr) { + vTaskDelay(pdMS_TO_TICKS(10)); + } } void test_deinit_is_safe_before_init_and_idempotent() { - ESPEventBus bus; - TestPayload payload{}; + ESPEventBus bus; + TestPayload payload{}; - TEST_ASSERT_FALSE(bus.isInitialized()); + TEST_ASSERT_FALSE(bus.isInitialized()); - bus.deinit(); - TEST_ASSERT_FALSE(bus.isInitialized()); - TEST_ASSERT_FALSE(bus.post(TestEvent::FastTick, &payload, 0)); + bus.deinit(); + TEST_ASSERT_FALSE(bus.isInitialized()); + TEST_ASSERT_FALSE(bus.post(TestEvent::FastTick, &payload, 0)); - bus.deinit(); - TEST_ASSERT_FALSE(bus.isInitialized()); - TEST_ASSERT_FALSE(bus.post(TestEvent::FastTick, &payload, 0)); + bus.deinit(); + TEST_ASSERT_FALSE(bus.isInitialized()); + TEST_ASSERT_FALSE(bus.post(TestEvent::FastTick, &payload, 0)); } void test_reinit_clears_subscriptions_and_restores_bus() { - ESPEventBus bus; - CounterContext ctx{}; - TestPayload payload{}; - - TEST_ASSERT_TRUE(bus.init()); - TEST_ASSERT_TRUE(bus.isInitialized()); - TEST_ASSERT_NOT_EQUAL(0U, bus.subscribe(TestEvent::FastTick, counterCallback, &ctx)); - TEST_ASSERT_TRUE(bus.post(TestEvent::FastTick, &payload, portMAX_DELAY)); - vTaskDelay(pdMS_TO_TICKS(30)); - TEST_ASSERT_TRUE(ctx.callbacks > 0); - - bus.deinit(); - TEST_ASSERT_FALSE(bus.isInitialized()); - - const int callbacksAfterFirstRun = ctx.callbacks; - TEST_ASSERT_TRUE(bus.init()); - TEST_ASSERT_TRUE(bus.isInitialized()); - TEST_ASSERT_TRUE(bus.post(TestEvent::FastTick, &payload, portMAX_DELAY)); - vTaskDelay(pdMS_TO_TICKS(30)); - TEST_ASSERT_EQUAL_INT(callbacksAfterFirstRun, ctx.callbacks); - - TEST_ASSERT_NOT_EQUAL(0U, bus.subscribe(TestEvent::FastTick, counterCallback, &ctx)); - TEST_ASSERT_TRUE(bus.post(TestEvent::FastTick, &payload, portMAX_DELAY)); - vTaskDelay(pdMS_TO_TICKS(30)); - TEST_ASSERT_TRUE(ctx.callbacks > callbacksAfterFirstRun); - - bus.deinit(); - bus.deinit(); - TEST_ASSERT_FALSE(bus.isInitialized()); + ESPEventBus bus; + CounterContext ctx{}; + TestPayload payload{}; + + TEST_ASSERT_TRUE(bus.init()); + TEST_ASSERT_TRUE(bus.isInitialized()); + TEST_ASSERT_NOT_EQUAL(0U, bus.subscribe(TestEvent::FastTick, counterCallback, &ctx)); + TEST_ASSERT_TRUE(bus.post(TestEvent::FastTick, &payload, portMAX_DELAY)); + vTaskDelay(pdMS_TO_TICKS(30)); + TEST_ASSERT_TRUE(ctx.callbacks > 0); + + bus.deinit(); + TEST_ASSERT_FALSE(bus.isInitialized()); + + const int callbacksAfterFirstRun = ctx.callbacks; + TEST_ASSERT_TRUE(bus.init()); + TEST_ASSERT_TRUE(bus.isInitialized()); + TEST_ASSERT_TRUE(bus.post(TestEvent::FastTick, &payload, portMAX_DELAY)); + vTaskDelay(pdMS_TO_TICKS(30)); + TEST_ASSERT_EQUAL_INT(callbacksAfterFirstRun, ctx.callbacks); + + TEST_ASSERT_NOT_EQUAL(0U, bus.subscribe(TestEvent::FastTick, counterCallback, &ctx)); + TEST_ASSERT_TRUE(bus.post(TestEvent::FastTick, &payload, portMAX_DELAY)); + vTaskDelay(pdMS_TO_TICKS(30)); + TEST_ASSERT_TRUE(ctx.callbacks > callbacksAfterFirstRun); + + bus.deinit(); + bus.deinit(); + TEST_ASSERT_FALSE(bus.isInitialized()); } void setUp() { @@ -289,19 +291,19 @@ void tearDown() { } void setup() { - delay(2000); // Give the serial monitor time to settle - UNITY_BEGIN(); - RUN_TEST(test_subscription_cap_limits_handles); - RUN_TEST(test_payload_validator_rejects_unknown_payloads); - RUN_TEST(test_drop_oldest_policy_discards_backlog); - RUN_TEST(test_drop_newest_policy_discards_incoming_event); - RUN_TEST(test_pressure_callback_triggers_on_high_usage); - RUN_TEST(test_deinit_completes_when_queue_is_busy); - RUN_TEST(test_deinit_is_safe_before_init_and_idempotent); - RUN_TEST(test_reinit_clears_subscriptions_and_restores_bus); - UNITY_END(); + delay(2000); // Give the serial monitor time to settle + UNITY_BEGIN(); + RUN_TEST(test_subscription_cap_limits_handles); + RUN_TEST(test_payload_validator_rejects_unknown_payloads); + RUN_TEST(test_drop_oldest_policy_discards_backlog); + RUN_TEST(test_drop_newest_policy_discards_incoming_event); + RUN_TEST(test_pressure_callback_triggers_on_high_usage); + RUN_TEST(test_deinit_completes_when_queue_is_busy); + RUN_TEST(test_deinit_is_safe_before_init_and_idempotent); + RUN_TEST(test_reinit_clears_subscriptions_and_restores_bus); + UNITY_END(); } void loop() { - vTaskDelay(pdMS_TO_TICKS(1000)); + vTaskDelay(pdMS_TO_TICKS(1000)); }