diff --git a/CHANGELOG.md b/CHANGELOG.md index 93665f2..c1bd57b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ All notable changes to this project will be documented in this file. - Disambiguated the README and `examples/basic_usage` subscription callback to avoid overload ambiguity on Arduino. - Worker task creation now uses native FreeRTOS `xTaskCreatePinnedToCore(...)` and keeps the same non-caps runtime path for broad ESP32 compatibility. - Hardened subscription-storage teardown/reset so allocator transitions are safe when toggling `usePSRAMBuffers` across lifecycles. +- Reworked `waitFor` to reuse persistent per-task/per-event waiter queues instead of creating/deleting a queue+subscription on every call, preventing high memory churn in tight loops. ## [1.0.0] - 2025-11-19 ### Added diff --git a/README.md b/README.md index dab77f2..68b710b 100644 --- a/README.md +++ b/README.md @@ -13,7 +13,7 @@ An asynchronous, FreeRTOS-native event bus for ESP32 projects. Producers post pa - Thread-safe and ISR-safe posting (ISRs can queue events without waking the worker unless needed). - Unlimited subscriptions per event with optional user data and one-shot semantics. - `std::function` callback support so you can bind private member methods or use capturing lambdas. -- Per-task `waitFor` helper implemented with short-lived queues so any task can await the next payload. +- Per-task/per-event `waitFor` helper with persistent waiter queues to avoid per-call heap churn in tight loops. - Queue overflow policies, pressure callbacks, and payload validation hooks for defensive firmware. - Optional `usePSRAMBuffers` toggle to route subscription/fan-out buffers and (when static FreeRTOS allocation is enabled) queue/mutex storage through `ESPBufferManager` with safe fallback. @@ -80,7 +80,7 @@ eventBus.subscribe(AppEvent::NetworkGotIP, ); ``` -Need to suspend the caller until a payload arrives? `waitFor` creates a one-shot subscription and blocks on a temporary queue: +Need to suspend the caller until a payload arrives? `waitFor` blocks on a reusable waiter queue owned by the calling task/event pair: ```cpp auto* payload = static_cast(eventBus.waitFor(AppEvent::NetworkGotIP, pdMS_TO_TICKS(1000))); @@ -97,6 +97,7 @@ Explore the sketches under `examples/`: ## Gotchas - The bus only stores the pointer you supply; keep payloads alive for as long as subscribers need them or use pools/ref-counted buffers. - `waitFor` refuses to run on the ESPEventBus worker task. Enable `INCLUDE_xTaskGetCurrentTaskHandle=1` (set by default on ESP-IDF/Arduino) so the guard can detect misuse. +- `waitFor` reuses one waiter queue per `(task, event)` pair. Multiple tasks can wait on the same event concurrently, but avoid overlapping waits for the same `(task, event)` pair. - `postFromISR` behaves like `post` but still runs callbacks on the worker. Long callbacks block the worker and delay other subscribers. - Overflow policies that drop events fire user callbacks in the posting context—keep those callbacks short and ISR-safe where applicable. @@ -108,7 +109,7 @@ Explore the sketches under `examples/`: - `EventBusSub subscribe(Id id, EventCallbackFn cb, void* userArg = nullptr, bool oneshot = false)` – register C-style callbacks; returns `0` on failure. - `EventBusSub subscribe(Id id, EventCallback cb, void* userArg = nullptr, bool oneshot = false)` – register `std::function` callbacks (bind/captures). - `void unsubscribe(EventBusSub subId)` – deactivate one subscription. -- `void* waitFor(Id id, TickType_t timeout = portMAX_DELAY)` – block the caller on a temporary queue and return the payload pointer or `nullptr` on timeout. +- `void* waitFor(Id id, TickType_t timeout = portMAX_DELAY)` – block the caller on a reusable task/event waiter queue and return the payload pointer or `nullptr` on timeout. `EventBusConfig` exposes guardrails so multiple components can safely share one bus: diff --git a/src/esp_eventbus/eventbus.cpp b/src/esp_eventbus/eventbus.cpp index 20a7ee4..14f6b83 100644 --- a/src/esp_eventbus/eventbus.cpp +++ b/src/esp_eventbus/eventbus.cpp @@ -27,6 +27,7 @@ bool ESPEventBus::init(const EventBusConfig& config) { stopEventPending_ = false; resetKernelStorage(); resetSubscriptions(config_.usePSRAMBuffers); + resetWaiters(config_.usePSRAMBuffers); if (!createKernelMutex()) { return false; @@ -57,6 +58,13 @@ bool ESPEventBus::init(const EventBusConfig& config) { void ESPEventBus::deinit() { stopTask(); + if (subMutex_ && xSemaphoreTake(subMutex_, portMAX_DELAY) == pdTRUE) { + clearWaiters(); + xSemaphoreGive(subMutex_); + } else { + clearWaiters(); + } + if (queue_) { vQueueDelete(queue_); queue_ = nullptr; @@ -69,6 +77,7 @@ void ESPEventBus::deinit() { resetKernelStorage(); resetSubscriptions(false); + resetWaiters(false); nextSubId_ = 0; stopEventPending_ = false; config_ = EventBusConfig{}; @@ -163,59 +172,43 @@ void ESPEventBus::unsubscribe(EventBusSub subId) { } void* ESPEventBus::waitFor(EventBusId id, TickType_t timeout) { - if (!queue_) { + if (!queue_ || !subMutex_) { return nullptr; } - if (task_ && currentTaskHandle() == task_) { + const TaskHandle_t callerTask = currentTaskHandle(); + if (!callerTask || (task_ && callerTask == task_)) { return nullptr; } QueueHandle_t responseQueue = nullptr; - void* responseQueueStorage = nullptr; - void* responseQueueControlStorage = nullptr; -#if defined(configSUPPORT_STATIC_ALLOCATION) && (configSUPPORT_STATIC_ALLOCATION == 1) - if (config_.usePSRAMBuffers) { - responseQueueStorage = allocateKernelStorage(sizeof(void*), true); - responseQueueControlStorage = allocateKernelStorage(sizeof(StaticQueue_t), true); - if (responseQueueStorage && responseQueueControlStorage) { - responseQueue = xQueueCreateStatic( - 1, - sizeof(void*), - static_cast(responseQueueStorage), - static_cast(responseQueueControlStorage)); - } - if (!responseQueue) { - freeKernelStorage(responseQueueStorage); - freeKernelStorage(responseQueueControlStorage); - responseQueueStorage = nullptr; - responseQueueControlStorage = nullptr; - } + if (xSemaphoreTake(subMutex_, portMAX_DELAY) != pdTRUE) { + return nullptr; } -#endif - if (!responseQueue) { - responseQueue = xQueueCreate(1, sizeof(void*)); + + WaiterContext* waiter = findWaiterLocked(callerTask, id); + if (!waiter) { + waiters_.push_back(WaiterContext{ callerTask, id, nullptr, nullptr, nullptr }); + waiter = &waiters_.back(); + if (!createWaiterQueue(*waiter)) { + waiters_.pop_back(); + xSemaphoreGive(subMutex_); + return nullptr; + } } + + responseQueue = waiter->queue; if (!responseQueue) { + xSemaphoreGive(subMutex_); return nullptr; } - EventBusSub sid = subscribe(id, &ESPEventBus::waiterCallback, responseQueue, true); - if (!sid) { - vQueueDelete(responseQueue); - freeKernelStorage(responseQueueStorage); - freeKernelStorage(responseQueueControlStorage); - return nullptr; - } + (void)xQueueReset(responseQueue); + xSemaphoreGive(subMutex_); void* payload = nullptr; BaseType_t res = xQueueReceive(responseQueue, &payload, timeout); - unsubscribe(sid); - vQueueDelete(responseQueue); - freeKernelStorage(responseQueueStorage); - freeKernelStorage(responseQueueControlStorage); - if (res != pdTRUE) { return nullptr; } @@ -276,6 +269,13 @@ void ESPEventBus::taskLoop() { compactSubscriptionsLocked(); } + for (auto& waiter : waiters_) { + if (waiter.eventId != ev.eventId || !waiter.queue) { + continue; + } + (void)xQueueSend(waiter.queue, &ev.payload, 0); + } + xSemaphoreGive(subMutex_); } @@ -329,14 +329,6 @@ void ESPEventBus::compactSubscriptionsLocked() { subs_.end()); } -void ESPEventBus::waiterCallback(void* payload, void* userArg) { - QueueHandle_t queue = reinterpret_cast(userArg); - if (!queue) { - return; - } - (void)xQueueSend(queue, &payload, 0); -} - bool ESPEventBus::enqueueFromTask(const QueuedEvent& ev, TickType_t timeout) { TickType_t waitTicks = timeout; if (config_.overflowPolicy != EventBusOverflowPolicy::Block) { @@ -542,6 +534,63 @@ void ESPEventBus::resetSubscriptions(bool usePSRAMBuffers) { new (&subs_) SubscriptionVector{ EventBusAllocator(usePSRAMBuffers) }; } +void ESPEventBus::clearWaiters() { + for (auto& waiter : waiters_) { + if (waiter.queue) { + void* nullPayload = nullptr; + (void)xQueueReset(waiter.queue); + (void)xQueueSend(waiter.queue, &nullPayload, 0); + vQueueDelete(waiter.queue); + waiter.queue = nullptr; + } + freeKernelStorage(waiter.queueStorage); + freeKernelStorage(waiter.queueControlStorage); + waiter.queueStorage = nullptr; + waiter.queueControlStorage = nullptr; + } + waiters_.clear(); +} + +void ESPEventBus::resetWaiters(bool usePSRAMBuffers) { + using WaiterVector = EventBusVector; + waiters_.~WaiterVector(); + new (&waiters_) WaiterVector{ EventBusAllocator(usePSRAMBuffers) }; +} + +ESPEventBus::WaiterContext* ESPEventBus::findWaiterLocked(TaskHandle_t ownerTask, EventBusId eventId) { + for (auto& waiter : waiters_) { + if (waiter.ownerTask == ownerTask && waiter.eventId == eventId) { + return &waiter; + } + } + return nullptr; +} + +bool ESPEventBus::createWaiterQueue(WaiterContext& waiter) { +#if defined(configSUPPORT_STATIC_ALLOCATION) && (configSUPPORT_STATIC_ALLOCATION == 1) + if (config_.usePSRAMBuffers) { + waiter.queueStorage = allocateKernelStorage(sizeof(void*), true); + waiter.queueControlStorage = allocateKernelStorage(sizeof(StaticQueue_t), true); + if (waiter.queueStorage && waiter.queueControlStorage) { + waiter.queue = xQueueCreateStatic( + 1, + sizeof(void*), + static_cast(waiter.queueStorage), + static_cast(waiter.queueControlStorage)); + } + if (waiter.queue) { + return true; + } + freeKernelStorage(waiter.queueStorage); + freeKernelStorage(waiter.queueControlStorage); + waiter.queueStorage = nullptr; + waiter.queueControlStorage = nullptr; + } +#endif + waiter.queue = xQueueCreate(1, sizeof(void*)); + return waiter.queue != nullptr; +} + void ESPEventBus::resetKernelStorage() { freeKernelStorage(mutexStorage_); freeKernelStorage(queueStorage_); diff --git a/src/esp_eventbus/eventbus.h b/src/esp_eventbus/eventbus.h index 04687e8..2a3f671 100644 --- a/src/esp_eventbus/eventbus.h +++ b/src/esp_eventbus/eventbus.h @@ -124,11 +124,22 @@ class ESPEventBus { bool stop = false; }; + struct WaiterContext { + TaskHandle_t ownerTask = nullptr; + EventBusId eventId = 0; + QueueHandle_t queue = nullptr; + void* queueStorage = nullptr; + void* queueControlStorage = nullptr; + }; + static void taskEntry(void* arg); void taskLoop(); void stopTask(); void compactSubscriptionsLocked(); - static void waiterCallback(void* payload, void* userArg); + void clearWaiters(); + void resetWaiters(bool usePSRAMBuffers); + WaiterContext* findWaiterLocked(TaskHandle_t ownerTask, EventBusId eventId); + bool createWaiterQueue(WaiterContext& waiter); bool enqueueFromTask(const QueuedEvent& ev, TickType_t timeout); bool enqueueFromISR(const QueuedEvent& ev, BaseType_t* higherPriorityTaskWoken); bool handleOverflowFromTask(const QueuedEvent& ev); @@ -151,6 +162,7 @@ class ESPEventBus { TaskHandle_t task_ = nullptr; SemaphoreHandle_t subMutex_ = nullptr; EventBusVector subs_; + EventBusVector waiters_; EventBusSub nextSubId_ = 0; EventBusConfig config_{}; bool running_ = false;