From b7f8d9c79dcb87b682e04b05e1becf5c1bf43952 Mon Sep 17 00:00:00 2001 From: Jean-Pierre Fortune Date: Wed, 8 Oct 2025 18:00:44 +0200 Subject: [PATCH 1/3] feat: add thread safety to ExecutorService and improve logging readability - Introduced `std::mutex` for thread-safe operations in `ExecutorService`. - Enhanced logging for enums by adding `operator<<` to format internal events, monitoring states, and plugin event types. --- .../service/ObservableLocalReaderAdapter.hpp | 37 ++++++++++++++++++ include/keyple/core/service/PluginEvent.hpp | 13 +++++-- .../core/service/cpp/ExecutorService.hpp | 6 +++ src/main/MonitoringState.cpp | 8 ++-- src/main/WaitForCardInsertionStateAdapter.cpp | 2 +- .../WaitForCardProcessingStateAdapter.cpp | 2 +- src/main/WaitForCardRemovalStateAdapter.cpp | 2 +- src/main/WaitForStartDetectStateAdapter.cpp | 2 +- src/main/cpp/ExecutorService.cpp | 39 +++++++++++++++++-- 9 files changed, 97 insertions(+), 14 deletions(-) diff --git a/include/keyple/core/service/ObservableLocalReaderAdapter.hpp b/include/keyple/core/service/ObservableLocalReaderAdapter.hpp index 883e6dd..defbe6b 100644 --- a/include/keyple/core/service/ObservableLocalReaderAdapter.hpp +++ b/include/keyple/core/service/ObservableLocalReaderAdapter.hpp @@ -469,6 +469,43 @@ class KEYPLESERVICE_API ObservableLocalReaderAdapter cardSelectionResponses); }; +/** + * Operator << for InternalEvent enum to enable readable logging. + * + * @param os The output stream. + * @param event The internal event. + * @return The output stream. + */ +inline std::ostream& +operator<<(std::ostream& os, + const ObservableLocalReaderAdapter::InternalEvent event) +{ + switch (event) { + case ObservableLocalReaderAdapter::InternalEvent::CARD_INSERTED: + os << "CARD_INSERTED"; + break; + case ObservableLocalReaderAdapter::InternalEvent::CARD_REMOVED: + os << "CARD_REMOVED"; + break; + case ObservableLocalReaderAdapter::InternalEvent::CARD_PROCESSED: + os << "CARD_PROCESSED"; + break; + case ObservableLocalReaderAdapter::InternalEvent::START_DETECT: + os << "START_DETECT"; + break; + case ObservableLocalReaderAdapter::InternalEvent::STOP_DETECT: + os << "STOP_DETECT"; + break; + case ObservableLocalReaderAdapter::InternalEvent::TIME_OUT: + os << "TIME_OUT"; + break; + default: + os << "UNKNOWN_EVENT(" << static_cast(event) << ")"; + break; + } + return os; +} + } /* namespace service */ } /* namespace core */ } /* namespace keyple */ diff --git a/include/keyple/core/service/PluginEvent.hpp b/include/keyple/core/service/PluginEvent.hpp index 6a2bcee..ae09794 100644 --- a/include/keyple/core/service/PluginEvent.hpp +++ b/include/keyple/core/service/PluginEvent.hpp @@ -93,20 +93,27 @@ class KEYPLESERVICE_API PluginEvent { virtual Type getType() const = 0; /** + * Operator << for PluginEvent::Type enum to enable readable logging. * + * @param os The output stream. + * @param t The event type. + * @return The output stream. */ friend std::ostream& operator<<(std::ostream& os, const Type t) { switch (t) { case Type::READER_CONNECTED: - os << "TYPE = READER_CONNECTED"; + os << "READER_CONNECTED"; break; case Type::READER_DISCONNECTED: - os << "TYPE = READER_DISCONNECTED"; + os << "READER_DISCONNECTED"; break; case Type::UNAVAILABLE: - os << "TYPE = UNAVAILABLE"; + os << "UNAVAILABLE"; + break; + default: + os << "UNKNOWN_TYPE(" << static_cast(t) << ")"; break; } diff --git a/include/keyple/core/service/cpp/ExecutorService.hpp b/include/keyple/core/service/cpp/ExecutorService.hpp index eae19b1..af420be 100644 --- a/include/keyple/core/service/cpp/ExecutorService.hpp +++ b/include/keyple/core/service/cpp/ExecutorService.hpp @@ -14,6 +14,7 @@ #pragma once #include +#include #include #include #include @@ -71,6 +72,11 @@ class KEYPLESERVICE_API ExecutorService final { */ std::vector> mPool; + /** + * + */ + std::mutex mMutex; + /** * */ diff --git a/src/main/MonitoringState.cpp b/src/main/MonitoringState.cpp index 3d9cf70..c5f0659 100644 --- a/src/main/MonitoringState.cpp +++ b/src/main/MonitoringState.cpp @@ -22,16 +22,16 @@ operator<<(std::ostream& os, const MonitoringState ms) { switch (ms) { case MonitoringState::WAIT_FOR_START_DETECTION: - os << "MONITORING_STATE = WAIT_FOR_START_DETECTION"; + os << "WAIT_FOR_START_DETECTION"; break; case MonitoringState::WAIT_FOR_CARD_PROCESSING: - os << "MONITORING_STATE = WAIT_FOR_CARD_PROCESSING"; + os << "WAIT_FOR_CARD_PROCESSING"; break; case MonitoringState::WAIT_FOR_CARD_REMOVAL: - os << "MONITORING_STATE = WAIT_FOR_CARD_REMOVAL"; + os << "WAIT_FOR_CARD_REMOVAL"; break; case MonitoringState::WAIT_FOR_CARD_INSERTION: - os << "MONITORING_STATE = WAIT_FOR_CARD_INSERTION"; + os << "WAIT_FOR_CARD_INSERTION"; break; } diff --git a/src/main/WaitForCardInsertionStateAdapter.cpp b/src/main/WaitForCardInsertionStateAdapter.cpp index bd8dd05..bfd57fb 100644 --- a/src/main/WaitForCardInsertionStateAdapter.cpp +++ b/src/main/WaitForCardInsertionStateAdapter.cpp @@ -42,8 +42,8 @@ WaitForCardInsertionStateAdapter::onEvent(const InternalEvent event) { mLogger->trace( "Internal event [%] received for reader [%] in current state [%]\n", - getReader()->getName(), event, + getReader()->getName(), getMonitoringState()); /* Process InternalEvent */ diff --git a/src/main/WaitForCardProcessingStateAdapter.cpp b/src/main/WaitForCardProcessingStateAdapter.cpp index 3f25677..2c697e3 100644 --- a/src/main/WaitForCardProcessingStateAdapter.cpp +++ b/src/main/WaitForCardProcessingStateAdapter.cpp @@ -44,8 +44,8 @@ WaitForCardProcessingStateAdapter::onEvent(const InternalEvent event) { mLogger->trace( "Internal event [%] received for reader [%] in current state [%]\n", - getReader()->getName(), event, + getReader()->getName(), getMonitoringState()); /* Process InternalEvent */ diff --git a/src/main/WaitForCardRemovalStateAdapter.cpp b/src/main/WaitForCardRemovalStateAdapter.cpp index d326108..8cbf5ad 100644 --- a/src/main/WaitForCardRemovalStateAdapter.cpp +++ b/src/main/WaitForCardRemovalStateAdapter.cpp @@ -42,8 +42,8 @@ WaitForCardRemovalStateAdapter::onEvent(const InternalEvent event) { mLogger->trace( "Internal event [%] received for reader [%] in current state [%]\n", - getReader()->getName(), event, + getReader()->getName(), getMonitoringState()); /* Process InternalEvent */ diff --git a/src/main/WaitForStartDetectStateAdapter.cpp b/src/main/WaitForStartDetectStateAdapter.cpp index 2f7826c..41b268a 100644 --- a/src/main/WaitForStartDetectStateAdapter.cpp +++ b/src/main/WaitForStartDetectStateAdapter.cpp @@ -42,8 +42,8 @@ WaitForStartDetectStateAdapter::onEvent(const InternalEvent event) { mLogger->trace( "Internal event [%] received for reader [%] in current state [%]\n", - getReader()->getName(), event, + getReader()->getName(), getMonitoringState()); /* Process InternalEvent */ diff --git a/src/main/cpp/ExecutorService.cpp b/src/main/cpp/ExecutorService.cpp index 721ab80..3be98be 100644 --- a/src/main/cpp/ExecutorService.cpp +++ b/src/main/cpp/ExecutorService.cpp @@ -48,13 +48,26 @@ ExecutorService::run() /* Emulates a SingleThreadExecutor (e.g. only one thread at a time) */ while (mRunning) { - if (mPool.size()) { + std::shared_ptr job; + + { + std::lock_guard lock(mMutex); + if (mPool.size() > 0) { + job = mPool[0]; + } + } + + if (job != nullptr) { /* Start first service and wait until completion */ - std::shared_ptr job = mPool[0]; job->run(); /* Remove from vector */ - mPool.erase(mPool.begin()); + { + std::lock_guard lock(mMutex); + mPool.erase(mPool.begin()); + } + + job = nullptr; } Thread::sleep(100); @@ -66,12 +79,32 @@ ExecutorService::run() void ExecutorService::execute(std::shared_ptr job) { + std::lock_guard lock(mMutex); mPool.push_back(job); } std::shared_ptr ExecutorService::submit(std::shared_ptr job) { + std::lock_guard lock(mMutex); + + /* + * Limit the queue to maximum 2 jobs: + * - 1 job currently executing (mPool[0]) + * - 1 job waiting to be executed (mPool[1]) + * + * When submitting a new job: + * - If pool is empty or has only 1 job → add the new job + * - If pool has 2+ jobs → replace all waiting jobs with the new one + * + * This prevents accumulation of obsolete jobs when state transitions + * happen faster than job execution. + */ + if (mPool.size() >= 2) { + /* Remove all waiting jobs (keep only the running one at index 0) */ + mPool.erase(mPool.begin() + 1, mPool.end()); + } + mPool.push_back(job); return mPool.back(); From c1df1e81138e4188eef1986866a6db3476bcb1bd Mon Sep 17 00:00:00 2001 From: Jean-Pierre Fortune Date: Thu, 9 Oct 2025 09:39:17 +0200 Subject: [PATCH 2/3] refactor: simplify job execution and cancellation logic in ExecutorService - Removed redundant mutex locks in `execute` and `submit` methods. - Optimized job execution loop to skip canceled jobs and directly access the pool. --- src/main/cpp/ExecutorService.cpp | 42 +++++--------------------------- src/main/cpp/Job.cpp | 4 +-- 2 files changed, 8 insertions(+), 38 deletions(-) diff --git a/src/main/cpp/ExecutorService.cpp b/src/main/cpp/ExecutorService.cpp index 3be98be..d611fc7 100644 --- a/src/main/cpp/ExecutorService.cpp +++ b/src/main/cpp/ExecutorService.cpp @@ -48,26 +48,16 @@ ExecutorService::run() /* Emulates a SingleThreadExecutor (e.g. only one thread at a time) */ while (mRunning) { - std::shared_ptr job; - - { - std::lock_guard lock(mMutex); - if (mPool.size() > 0) { - job = mPool[0]; - } - } - - if (job != nullptr) { + if (mPool.size()) { /* Start first service and wait until completion */ - job->run(); + std::shared_ptr job = mPool[0]; - /* Remove from vector */ - { - std::lock_guard lock(mMutex); - mPool.erase(mPool.begin()); + if(!job->isCancelled()) { + job->run(); } - job = nullptr; + /* Remove from vector */ + mPool.erase(mPool.begin()); } Thread::sleep(100); @@ -79,32 +69,12 @@ ExecutorService::run() void ExecutorService::execute(std::shared_ptr job) { - std::lock_guard lock(mMutex); mPool.push_back(job); } std::shared_ptr ExecutorService::submit(std::shared_ptr job) { - std::lock_guard lock(mMutex); - - /* - * Limit the queue to maximum 2 jobs: - * - 1 job currently executing (mPool[0]) - * - 1 job waiting to be executed (mPool[1]) - * - * When submitting a new job: - * - If pool is empty or has only 1 job → add the new job - * - If pool has 2+ jobs → replace all waiting jobs with the new one - * - * This prevents accumulation of obsolete jobs when state transitions - * happen faster than job execution. - */ - if (mPool.size() >= 2) { - /* Remove all waiting jobs (keep only the running one at index 0) */ - mPool.erase(mPool.begin() + 1, mPool.end()); - } - mPool.push_back(job); return mPool.back(); diff --git a/src/main/cpp/Job.cpp b/src/main/cpp/Job.cpp index da2349a..de2958f 100644 --- a/src/main/cpp/Job.cpp +++ b/src/main/cpp/Job.cpp @@ -39,12 +39,12 @@ Job::cancel(const bool mayInterruptIfRunning) "Unsupported value for mayInterruptIfRunning (true)"); } + mCancelled = true; + if (!isAlive()) { return false; } - mCancelled = true; - return true; } From bd4882f7e707599ba2631c48d0da9b6605bbcc13 Mon Sep 17 00:00:00 2001 From: Jean-Pierre Fortune Date: Thu, 9 Oct 2025 09:54:47 +0200 Subject: [PATCH 3/3] refactor: remove unused mutex from ExecutorService --- include/keyple/core/service/cpp/ExecutorService.hpp | 6 ------ 1 file changed, 6 deletions(-) diff --git a/include/keyple/core/service/cpp/ExecutorService.hpp b/include/keyple/core/service/cpp/ExecutorService.hpp index af420be..eae19b1 100644 --- a/include/keyple/core/service/cpp/ExecutorService.hpp +++ b/include/keyple/core/service/cpp/ExecutorService.hpp @@ -14,7 +14,6 @@ #pragma once #include -#include #include #include #include @@ -72,11 +71,6 @@ class KEYPLESERVICE_API ExecutorService final { */ std::vector> mPool; - /** - * - */ - std::mutex mMutex; - /** * */