Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ All notable changes to this project will be documented in this file.
- Disambiguated the README and `examples/basic_usage` subscription callback to avoid overload ambiguity on Arduino.
- Worker task creation now uses native FreeRTOS `xTaskCreatePinnedToCore(...)` and keeps the same non-caps runtime path for broad ESP32 compatibility.
- Hardened subscription-storage teardown/reset so allocator transitions are safe when toggling `usePSRAMBuffers` across lifecycles.
- Reworked `waitFor` to reuse persistent per-task/per-event waiter queues instead of creating/deleting a queue+subscription on every call, preventing high memory churn in tight loops.

## [1.0.0] - 2025-11-19
### Added
Expand Down
7 changes: 4 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ An asynchronous, FreeRTOS-native event bus for ESP32 projects. Producers post pa
- Thread-safe and ISR-safe posting (ISRs can queue events without waking the worker unless needed).
- Unlimited subscriptions per event with optional user data and one-shot semantics.
- `std::function` callback support so you can bind private member methods or use capturing lambdas.
- Per-task `waitFor` helper implemented with short-lived queues so any task can await the next payload.
- Per-task/per-event `waitFor` helper with persistent waiter queues to avoid per-call heap churn in tight loops.
- Queue overflow policies, pressure callbacks, and payload validation hooks for defensive firmware.
- Optional `usePSRAMBuffers` toggle to route subscription/fan-out buffers and (when static FreeRTOS allocation is enabled) queue/mutex storage through `ESPBufferManager` with safe fallback.

Expand Down Expand Up @@ -80,7 +80,7 @@ eventBus.subscribe(AppEvent::NetworkGotIP,
);
```

Need to suspend the caller until a payload arrives? `waitFor` creates a one-shot subscription and blocks on a temporary queue:
Need to suspend the caller until a payload arrives? `waitFor` blocks on a reusable waiter queue owned by the calling task/event pair:

```cpp
auto* payload = static_cast<NetworkGotIpPayload*>(eventBus.waitFor(AppEvent::NetworkGotIP, pdMS_TO_TICKS(1000)));
Expand All @@ -97,6 +97,7 @@ Explore the sketches under `examples/`:
## Gotchas
- The bus only stores the pointer you supply; keep payloads alive for as long as subscribers need them or use pools/ref-counted buffers.
- `waitFor` refuses to run on the ESPEventBus worker task. Enable `INCLUDE_xTaskGetCurrentTaskHandle=1` (set by default on ESP-IDF/Arduino) so the guard can detect misuse.
- `waitFor` reuses one waiter queue per `(task, event)` pair. Multiple tasks can wait on the same event concurrently, but avoid overlapping waits for the same `(task, event)` pair.
- `postFromISR` behaves like `post` but still runs callbacks on the worker. Long callbacks block the worker and delay other subscribers.
- Overflow policies that drop events fire user callbacks in the posting context—keep those callbacks short and ISR-safe where applicable.

Expand All @@ -108,7 +109,7 @@ Explore the sketches under `examples/`:
- `EventBusSub subscribe(Id id, EventCallbackFn cb, void* userArg = nullptr, bool oneshot = false)` – register C-style callbacks; returns `0` on failure.
- `EventBusSub subscribe(Id id, EventCallback cb, void* userArg = nullptr, bool oneshot = false)` – register `std::function` callbacks (bind/captures).
- `void unsubscribe(EventBusSub subId)` – deactivate one subscription.
- `void* waitFor(Id id, TickType_t timeout = portMAX_DELAY)` – block the caller on a temporary queue and return the payload pointer or `nullptr` on timeout.
- `void* waitFor(Id id, TickType_t timeout = portMAX_DELAY)` – block the caller on a reusable task/event waiter queue and return the payload pointer or `nullptr` on timeout.

`EventBusConfig` exposes guardrails so multiple components can safely share one bus:

Expand Down
137 changes: 93 additions & 44 deletions src/esp_eventbus/eventbus.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ bool ESPEventBus::init(const EventBusConfig& config) {
stopEventPending_ = false;
resetKernelStorage();
resetSubscriptions(config_.usePSRAMBuffers);
resetWaiters(config_.usePSRAMBuffers);

if (!createKernelMutex()) {
return false;
Expand Down Expand Up @@ -57,6 +58,13 @@ bool ESPEventBus::init(const EventBusConfig& config) {
void ESPEventBus::deinit() {
stopTask();

if (subMutex_ && xSemaphoreTake(subMutex_, portMAX_DELAY) == pdTRUE) {
clearWaiters();
xSemaphoreGive(subMutex_);
} else {
clearWaiters();
}

if (queue_) {
vQueueDelete(queue_);
queue_ = nullptr;
Expand All @@ -69,6 +77,7 @@ void ESPEventBus::deinit() {

resetKernelStorage();
resetSubscriptions(false);
resetWaiters(false);
nextSubId_ = 0;
stopEventPending_ = false;
config_ = EventBusConfig{};
Expand Down Expand Up @@ -163,59 +172,43 @@ void ESPEventBus::unsubscribe(EventBusSub subId) {
}

void* ESPEventBus::waitFor(EventBusId id, TickType_t timeout) {
if (!queue_) {
if (!queue_ || !subMutex_) {
return nullptr;
}

if (task_ && currentTaskHandle() == task_) {
const TaskHandle_t callerTask = currentTaskHandle();
if (!callerTask || (task_ && callerTask == task_)) {
return nullptr;
}

QueueHandle_t responseQueue = nullptr;
void* responseQueueStorage = nullptr;
void* responseQueueControlStorage = nullptr;
#if defined(configSUPPORT_STATIC_ALLOCATION) && (configSUPPORT_STATIC_ALLOCATION == 1)
if (config_.usePSRAMBuffers) {
responseQueueStorage = allocateKernelStorage(sizeof(void*), true);
responseQueueControlStorage = allocateKernelStorage(sizeof(StaticQueue_t), true);
if (responseQueueStorage && responseQueueControlStorage) {
responseQueue = xQueueCreateStatic(
1,
sizeof(void*),
static_cast<uint8_t*>(responseQueueStorage),
static_cast<StaticQueue_t*>(responseQueueControlStorage));
}
if (!responseQueue) {
freeKernelStorage(responseQueueStorage);
freeKernelStorage(responseQueueControlStorage);
responseQueueStorage = nullptr;
responseQueueControlStorage = nullptr;
}
if (xSemaphoreTake(subMutex_, portMAX_DELAY) != pdTRUE) {
return nullptr;
}
#endif
if (!responseQueue) {
responseQueue = xQueueCreate(1, sizeof(void*));

WaiterContext* waiter = findWaiterLocked(callerTask, id);
if (!waiter) {
waiters_.push_back(WaiterContext{ callerTask, id, nullptr, nullptr, nullptr });
waiter = &waiters_.back();
if (!createWaiterQueue(*waiter)) {
waiters_.pop_back();
xSemaphoreGive(subMutex_);
return nullptr;
}
}

responseQueue = waiter->queue;
if (!responseQueue) {
xSemaphoreGive(subMutex_);
return nullptr;
}

EventBusSub sid = subscribe(id, &ESPEventBus::waiterCallback, responseQueue, true);
if (!sid) {
vQueueDelete(responseQueue);
freeKernelStorage(responseQueueStorage);
freeKernelStorage(responseQueueControlStorage);
return nullptr;
}
(void)xQueueReset(responseQueue);
xSemaphoreGive(subMutex_);

void* payload = nullptr;
BaseType_t res = xQueueReceive(responseQueue, &payload, timeout);

unsubscribe(sid);
vQueueDelete(responseQueue);
freeKernelStorage(responseQueueStorage);
freeKernelStorage(responseQueueControlStorage);

if (res != pdTRUE) {
return nullptr;
}
Expand Down Expand Up @@ -276,6 +269,13 @@ void ESPEventBus::taskLoop() {
compactSubscriptionsLocked();
}

for (auto& waiter : waiters_) {
if (waiter.eventId != ev.eventId || !waiter.queue) {
continue;
}
(void)xQueueSend(waiter.queue, &ev.payload, 0);
}

xSemaphoreGive(subMutex_);
}

Expand Down Expand Up @@ -329,14 +329,6 @@ void ESPEventBus::compactSubscriptionsLocked() {
subs_.end());
}

void ESPEventBus::waiterCallback(void* payload, void* userArg) {
QueueHandle_t queue = reinterpret_cast<QueueHandle_t>(userArg);
if (!queue) {
return;
}
(void)xQueueSend(queue, &payload, 0);
}

bool ESPEventBus::enqueueFromTask(const QueuedEvent& ev, TickType_t timeout) {
TickType_t waitTicks = timeout;
if (config_.overflowPolicy != EventBusOverflowPolicy::Block) {
Expand Down Expand Up @@ -542,6 +534,63 @@ void ESPEventBus::resetSubscriptions(bool usePSRAMBuffers) {
new (&subs_) SubscriptionVector{ EventBusAllocator<Subscription>(usePSRAMBuffers) };
}

void ESPEventBus::clearWaiters() {
for (auto& waiter : waiters_) {
if (waiter.queue) {
void* nullPayload = nullptr;
(void)xQueueReset(waiter.queue);
(void)xQueueSend(waiter.queue, &nullPayload, 0);
vQueueDelete(waiter.queue);
waiter.queue = nullptr;
}
freeKernelStorage(waiter.queueStorage);
freeKernelStorage(waiter.queueControlStorage);
waiter.queueStorage = nullptr;
waiter.queueControlStorage = nullptr;
}
waiters_.clear();
}

void ESPEventBus::resetWaiters(bool usePSRAMBuffers) {
using WaiterVector = EventBusVector<WaiterContext>;
waiters_.~WaiterVector();
new (&waiters_) WaiterVector{ EventBusAllocator<WaiterContext>(usePSRAMBuffers) };
}

ESPEventBus::WaiterContext* ESPEventBus::findWaiterLocked(TaskHandle_t ownerTask, EventBusId eventId) {
for (auto& waiter : waiters_) {
if (waiter.ownerTask == ownerTask && waiter.eventId == eventId) {
return &waiter;
}
}
return nullptr;
}

bool ESPEventBus::createWaiterQueue(WaiterContext& waiter) {
#if defined(configSUPPORT_STATIC_ALLOCATION) && (configSUPPORT_STATIC_ALLOCATION == 1)
if (config_.usePSRAMBuffers) {
waiter.queueStorage = allocateKernelStorage(sizeof(void*), true);
waiter.queueControlStorage = allocateKernelStorage(sizeof(StaticQueue_t), true);
if (waiter.queueStorage && waiter.queueControlStorage) {
waiter.queue = xQueueCreateStatic(
1,
sizeof(void*),
static_cast<uint8_t*>(waiter.queueStorage),
static_cast<StaticQueue_t*>(waiter.queueControlStorage));
}
if (waiter.queue) {
return true;
}
freeKernelStorage(waiter.queueStorage);
freeKernelStorage(waiter.queueControlStorage);
waiter.queueStorage = nullptr;
waiter.queueControlStorage = nullptr;
}
#endif
waiter.queue = xQueueCreate(1, sizeof(void*));
return waiter.queue != nullptr;
}

void ESPEventBus::resetKernelStorage() {
freeKernelStorage(mutexStorage_);
freeKernelStorage(queueStorage_);
Expand Down
14 changes: 13 additions & 1 deletion src/esp_eventbus/eventbus.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,11 +124,22 @@ class ESPEventBus {
bool stop = false;
};

struct WaiterContext {
TaskHandle_t ownerTask = nullptr;
EventBusId eventId = 0;
QueueHandle_t queue = nullptr;
void* queueStorage = nullptr;
void* queueControlStorage = nullptr;
};

static void taskEntry(void* arg);
void taskLoop();
void stopTask();
void compactSubscriptionsLocked();
static void waiterCallback(void* payload, void* userArg);
void clearWaiters();
void resetWaiters(bool usePSRAMBuffers);
WaiterContext* findWaiterLocked(TaskHandle_t ownerTask, EventBusId eventId);
bool createWaiterQueue(WaiterContext& waiter);
bool enqueueFromTask(const QueuedEvent& ev, TickType_t timeout);
bool enqueueFromISR(const QueuedEvent& ev, BaseType_t* higherPriorityTaskWoken);
bool handleOverflowFromTask(const QueuedEvent& ev);
Expand All @@ -151,6 +162,7 @@ class ESPEventBus {
TaskHandle_t task_ = nullptr;
SemaphoreHandle_t subMutex_ = nullptr;
EventBusVector<Subscription> subs_;
EventBusVector<WaiterContext> waiters_;
EventBusSub nextSubId_ = 0;
EventBusConfig config_{};
bool running_ = false;
Expand Down