From e5d3018b783b5528277e70f4a54d88664567f1e2 Mon Sep 17 00:00:00 2001 From: MaciejKaszynski <161459353+MaciejKaszynski@users.noreply.github.com> Date: Tue, 28 Apr 2026 15:13:37 +0100 Subject: [PATCH 01/13] Adding new mpmc queue * Adding new queue --- src/launch_manager_daemon/BUILD | 2 + src/launch_manager_daemon/common/BUILD | 12 +- .../saf/supervision/ProcessStateTracker.cpp | 2 +- .../src/concurrency/BUILD | 76 +++++ .../src/concurrency/helgrind_annotations.hpp | 29 ++ .../src/concurrency/mpmc_concurrent_queue.hpp | 253 +++++++++++++++++ .../mpmc_concurrent_queue_test.cpp | 260 ++++++++++++++++++ .../src/process_group_manager/graph.cpp | 9 +- .../src/process_group_manager/jobqueue.cpp | 90 ------ .../src/process_group_manager/jobqueue.hpp | 108 -------- .../processgroupmanager.cpp | 24 +- .../processgroupmanager.hpp | 21 +- .../process_group_manager/processinfonode.cpp | 8 +- .../process_group_manager/workerthread.cpp | 24 +- .../process_group_manager/workerthread.hpp | 48 ++-- 15 files changed, 683 insertions(+), 283 deletions(-) create mode 100644 src/launch_manager_daemon/src/concurrency/BUILD create mode 100644 src/launch_manager_daemon/src/concurrency/helgrind_annotations.hpp create mode 100644 src/launch_manager_daemon/src/concurrency/mpmc_concurrent_queue.hpp create mode 100644 src/launch_manager_daemon/src/concurrency/mpmc_concurrent_queue_test.cpp delete mode 100644 src/launch_manager_daemon/src/process_group_manager/jobqueue.cpp delete mode 100644 src/launch_manager_daemon/src/process_group_manager/jobqueue.hpp diff --git a/src/launch_manager_daemon/BUILD b/src/launch_manager_daemon/BUILD index b6b2e6f4..9d460fad 100644 --- a/src/launch_manager_daemon/BUILD +++ b/src/launch_manager_daemon/BUILD @@ -51,6 +51,7 @@ cc_binary_with_common_opts( "//src/launch_manager_daemon/common:log", "//src/launch_manager_daemon/health_monitor_lib:health_monitor", "//src/launch_manager_daemon/process_state_client_lib:process_state_client", + 
"//src/launch_manager_daemon/src/concurrency:mpmc_concurrent_queue", "@flatbuffers", "@score_baselibs//score/mw/log", "@score_baselibs//score/result", @@ -83,6 +84,7 @@ cc_library( "//src/launch_manager_daemon/common:lifecycle_error", "//src/launch_manager_daemon/common:log", "//src/launch_manager_daemon/health_monitor_lib:health_monitor", + "//src/launch_manager_daemon/src/concurrency:mpmc_concurrent_queue", "@flatbuffers", "@score_baselibs//score/mw/log", "@score_baselibs//score/result", diff --git a/src/launch_manager_daemon/common/BUILD b/src/launch_manager_daemon/common/BUILD index d65e43a9..3310be47 100644 --- a/src/launch_manager_daemon/common/BUILD +++ b/src/launch_manager_daemon/common/BUILD @@ -57,21 +57,15 @@ cc_library( cc_library( name = "osal", - srcs = select({ + srcs = glob(["src/internal/osal/posix/*.cpp"]) + select({ "@platforms//os:qnx": glob( [ - "src/internal/osal/**/*.cpp", - ], - exclude = [ - "src/internal/osal/linux/**", + "src/internal/osal/qnx/*.cpp", ], ), "@platforms//os:linux": glob( [ - "src/internal/osal/**/*.cpp", - ], - exclude = [ - "src/internal/osal/qnx/**", + "src/internal/osal/linux/*.cpp", ], ), }), diff --git a/src/launch_manager_daemon/health_monitor_lib/src/score/lcm/saf/supervision/ProcessStateTracker.cpp b/src/launch_manager_daemon/health_monitor_lib/src/score/lcm/saf/supervision/ProcessStateTracker.cpp index 8f159eb5..dbd24fb7 100644 --- a/src/launch_manager_daemon/health_monitor_lib/src/score/lcm/saf/supervision/ProcessStateTracker.cpp +++ b/src/launch_manager_daemon/health_monitor_lib/src/score/lcm/saf/supervision/ProcessStateTracker.cpp @@ -158,7 +158,7 @@ void ProcessStateTracker::updateProcessState(const ifexm::ProcessState& f_proces } // coverity[autosar_cpp14_a8_5_2_violation:FALSE] type auto shall not be initialized with {} AUTOSAR.8.5.3A - const auto& it = std::find(k_refProcesses.begin(), k_refProcesses.end(), &f_processIdentifier_r); + const auto it = std::find(k_refProcesses.begin(), k_refProcesses.end(), 
&f_processIdentifier_r); if (it != k_refProcesses.end()) { const bool processActive{isProcessActive(f_state)}; diff --git a/src/launch_manager_daemon/src/concurrency/BUILD b/src/launch_manager_daemon/src/concurrency/BUILD new file mode 100644 index 00000000..b68f6b38 --- /dev/null +++ b/src/launch_manager_daemon/src/concurrency/BUILD @@ -0,0 +1,76 @@ +# ******************************************************************************* +# Copyright (c) 2025 Contributors to the Eclipse Foundation +# +# See the NOTICE file(s) distributed with this work for additional +# information regarding copyright ownership. +# +# This program and the accompanying materials are made available under the +# terms of the Apache License Version 2.0 which is available at +# https://www.apache.org/licenses/LICENSE-2.0 +# +# SPDX-License-Identifier: Apache-2.0 +# ******************************************************************************* +load("@rules_cc//cc:defs.bzl", "cc_test") +load("//config:common_cc.bzl", "cc_library_with_common_opts") + +cc_library( + name = "helgrind_annotations", + hdrs = ["helgrind_annotations.hpp"], + strip_include_prefix = "//src/launch_manager_daemon/src", + visibility = ["//src:__subpackages__"], +) + +cc_library( + name = "mpmc_concurrent_queue", + hdrs = ["mpmc_concurrent_queue.hpp"], + strip_include_prefix = "//src/launch_manager_daemon/src", + visibility = ["//src:__subpackages__"], + deps = [ + ":helgrind_annotations", + "//src/launch_manager_daemon/common:osal", + ], +) + +cc_test( + name = "mpmc_concurrent_queue_test", + srcs = ["mpmc_concurrent_queue_test.cpp"], + visibility = ["//tests:__subpackages__"], + deps = [ + ":mpmc_concurrent_queue", + "@googletest//:gtest_main", + ], +) + +# Run with: +# bazel test --run_under="valgrind --tool=helgrind" //src/launch_manager_daemon/src/concurrency:mpmc_concurrent_queue_helgrind_test --config=host --test_output=all +# Note: This is using your host pacakges so need to install valgrind +cc_test( + name = 
"mpmc_concurrent_queue_helgrind_test", + srcs = ["mpmc_concurrent_queue_test.cpp"], + tags = [ + "helgrind", + "manual", + ], + visibility = ["//tests:__subpackages__"], + deps = [ + ":mpmc_concurrent_queue", + "@googletest//:gtest_main", + ], +) + +cc_test( + name = "mpmc_concurrent_queue_tsan_test", + srcs = ["mpmc_concurrent_queue_test.cpp"], + copts = [ + "-fsanitize=thread", + "-O0", + "-g", + ], + linkopts = ["-fsanitize=thread"], + tags = ["tsan"], + visibility = ["//tests:__subpackages__"], + deps = [ + ":mpmc_concurrent_queue", + "@googletest//:gtest_main", + ], +) diff --git a/src/launch_manager_daemon/src/concurrency/helgrind_annotations.hpp b/src/launch_manager_daemon/src/concurrency/helgrind_annotations.hpp new file mode 100644 index 00000000..4bc0c537 --- /dev/null +++ b/src/launch_manager_daemon/src/concurrency/helgrind_annotations.hpp @@ -0,0 +1,29 @@ +/******************************************************************************** + * Copyright (c) 2025 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +#ifndef HELGRIND_ANNOTATIONS_HPP_INCLUDED +#define HELGRIND_ANNOTATIONS_HPP_INCLUDED + +// When built with --config=host the system compiler finds valgrind/helgrind.h +// in its default include paths and the real annotations are active. Under any +// other config the header is absent and the macros expand to no-ops. 
+#if __has_include() +#include +#else +// NOLINTNEXTLINE(cppcoreguidelines-macro-usage) +#define ANNOTATE_HAPPENS_BEFORE(obj) do {} while (false) +// NOLINTNEXTLINE(cppcoreguidelines-macro-usage) +#define ANNOTATE_HAPPENS_AFTER(obj) do {} while (false) +#endif + +#endif // HELGRIND_ANNOTATIONS_HPP_INCLUDED diff --git a/src/launch_manager_daemon/src/concurrency/mpmc_concurrent_queue.hpp b/src/launch_manager_daemon/src/concurrency/mpmc_concurrent_queue.hpp new file mode 100644 index 00000000..ff2a2767 --- /dev/null +++ b/src/launch_manager_daemon/src/concurrency/mpmc_concurrent_queue.hpp @@ -0,0 +1,253 @@ +/******************************************************************************** + * Copyright (c) 2025 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +#ifndef MPMC_CONCURRENT_QUEUE_HPP_INCLUDED +#define MPMC_CONCURRENT_QUEUE_HPP_INCLUDED + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +namespace score::lcm::internal +{ +// this is based on https://github.com/rigtorp/MPMCQueue + +/// @brief Lock-free MPMC ring buffer with semaphore-based blocking. +/// Producers and consumers each atomically claim independent slots via +/// fetch_add, so multiple producers and multiple consumers run concurrently. +/// @warning T must be default-constructible. 
+template +class MPMCConcurrentQueue +{ + static_assert(Capacity > 0U, "Capacity must be at least 1"); + + static_assert(Capacity <= std::numeric_limits::max(), + "Capacity exceeds uint32_t range used by the semaphore"); + + static_assert(std::is_default_constructible_v, + "T must be default-constructible for in-place slot storage"); + + // optimization to work out the turns + static_assert((Capacity & (Capacity - 1U)) == 0U, "Capacity must be a power of 2"); + + /// @brief Cache line size used to prevent false sharing between m_head, + /// m_tail, and m_stopped. + /// Defined as a literal rather than std::hardware_destructive_interference_size to avoid + /// -Winterference-size: that value is not ABI-stable across compiler versions and tuning flags. + /// 64 bytes is the cache line size on all targeted x86_64 and aarch64 platforms. + constexpr static std::size_t CacheLineSize = 64U; + + /// @brief A single ring-buffer entry pairing a sequenced turn counter with + /// its stored item. + /// + /// @detail The slots are not aligned to cachelines as in our usecase we + /// will mostly store pointers here, so we don't want to increase + /// the memory usage that much. + struct Slot + { + /// @brief Increasing counter that alternates ownership + /// between producers (even) and consumers (odd). + std::atomic turn{0}; + + /// @brief The stored item. + T item{}; + }; + + public: + MPMCConcurrentQueue() + { + static_cast(m_items.init(0U, false)); + static_cast(m_spaces.init(static_cast(Capacity), false)); + } + + ~MPMCConcurrentQueue() + { + static_cast(m_spaces.deinit()); + static_cast(m_items.deinit()); + } + + MPMCConcurrentQueue(const MPMCConcurrentQueue&) = delete; + MPMCConcurrentQueue& operator=(const MPMCConcurrentQueue&) = delete; + MPMCConcurrentQueue(MPMCConcurrentQueue&&) = delete; + MPMCConcurrentQueue& operator=(MPMCConcurrentQueue&&) = delete; + + /// @brief Blocks until a slot is free, then writes the item into the queue. 
+ /// @detail Producers claim slots via fetch_add on m_head, and sleep inside + /// m_spaces.wait() when all slots are occupied. + /// The turn counter ensures a slot cannot be written until the + /// previous consumer has finished reading it. + /// @param timeout Maximum time to wait for a free slot. Zero means wait forever. + /// @return true if the item was pushed, false if stop() was called or the + /// timeout expired before a slot became available (item is not enqueued). + /// Note: If the push returns false, the object is still valid for + /// the user. + [[nodiscard]] bool push(T&& item, std::chrono::milliseconds timeout = std::chrono::milliseconds{0}) + { + return push_impl(std::move(item), timeout); + } + + /// @brief Blocks until a slot is free, then writes the item into the queue. + /// @detail Producers claim slots via fetch_add on m_head, and sleep inside + /// m_spaces.wait() when all slots are occupied. + /// The turn counter ensures a slot cannot be written until the + /// previous consumer has finished reading it. + /// @param timeout Maximum time to wait for a free slot. Zero means wait forever. + /// @return true if the item was pushed, false if stop() was called or the + /// timeout expired before a slot became available (item is not enqueued). + [[nodiscard]] bool push(const T& item, std::chrono::milliseconds timeout = std::chrono::milliseconds{0}) + { + return push_impl(item, timeout); + } + + /// @brief Signals all blocked pop() callers to return std::nullopt. + void stop() + { + m_stopped.store(true, std::memory_order_relaxed); + + // signal to consumers and publishers to wakeup + static_cast(m_items.post()); + static_cast(m_spaces.post()); + } + + /// @brief Blocks until an item is available or stop() is called. + /// @detail Consumers claim slots via fetch_add on m_tail and sleep + /// inside m_items.wait() when the queue is empty. + /// When stopped returns std::nullopt. + /// @return The next item, or std::nullopt if stop() was called. 
+ [[nodiscard]] std::optional pop() + { + if (m_items.wait() != osal::OsalReturnType::kSuccess) + { + return std::nullopt; + } + + if (m_stopped.load(std::memory_order_relaxed)) + { + static_cast(m_items.post()); + return std::nullopt; + } + + return consume_slot(m_tail.fetch_add(1, std::memory_order_relaxed)); + } + + private: + /// @brief Spins until the slot at tail is ready, moves the item out, and + /// releases the slot. + T consume_slot(std::size_t tail) + { + auto& slot = m_slots[tail & (Capacity - 1U)]; + + // odd turns belong to consumer, +1 relative to the producer's even turn + const auto expected_turn = ((tail / Capacity) * 2) + 1; + + while (slot.turn.load(std::memory_order_acquire) != expected_turn) + { + // small spin, only fires if a prior producer claimed this slot but + // was preempted before completing its write and a full turn + // happened, should rarely happen + } + ANNOTATE_HAPPENS_AFTER(&slot.turn); + + T item = std::move_if_noexcept(slot.item); + + ANNOTATE_HAPPENS_BEFORE(&slot.turn); + // release store signals that the slot is now free for the next producer. + slot.turn.store(expected_turn + 1, std::memory_order_release); + + static_cast(m_spaces.post()); + return item; + } + + template + [[nodiscard]] bool push_impl(U&& item, std::chrono::milliseconds timeout) + { + const auto wait_result = (timeout == std::chrono::milliseconds{0}) + ? 
m_spaces.wait() + : m_spaces.timedWait(timeout); + + if (wait_result != osal::OsalReturnType::kSuccess) + { + return false; + } + + if (m_stopped.load(std::memory_order_relaxed)) + { + // chain-wake the next blocked producer then discard the item + static_cast(m_spaces.post()); + return false; + } + + const auto head = m_head.fetch_add(1, std::memory_order_relaxed); + auto& slot = m_slots[head & (Capacity - 1U)]; + + // even turns belong to producers, odd turns to consumers + // each lap of the ring increments the expected turn by 2 + const auto expected_turn = (head / Capacity) * 2; + + while (slot.turn.load(std::memory_order_acquire) != expected_turn) + { + // small spin, only fires if a prior producer claimed this slot but + // was preempted before completing its write and a full turn + // happened, should rarely happen + } + ANNOTATE_HAPPENS_AFTER(&slot.turn); + + slot.item = std::forward(item); + + ANNOTATE_HAPPENS_BEFORE(&slot.turn); + slot.turn.store(expected_turn + 1, std::memory_order_release); + + static_cast(m_items.post()); + return true; + + } + + /// @brief Underlying storage. + std::array m_slots; + + /// @brief The front of the queue. + /// @detail Aligned so that m_head and m_tail do not share a cache line. + alignas(CacheLineSize) std::atomic m_head{0}; + + /// @brief The back of the queue. + /// @detail Aligned so that m_head and m_tail do not share a cache line. + alignas(CacheLineSize) std::atomic m_tail{0}; + + /// @brief Set to true by stop(); causes push() to return false and pop() to + /// return std::nullopt instead of blocking. + /// @detail Aligned on its own cache line so that the single stop() write + /// does not cause false sharing with m_tail updates in pop(). + alignas(CacheLineSize) std::atomic m_stopped{false}; + + /// @brief Counts items currently in the queue; consumers block on this when + /// the queue is empty. 
+ osal::Semaphore m_items{}; + + /// @brief Counts empty slots available; producers block on this when the + /// queue is full. + osal::Semaphore m_spaces{}; +}; + +} // namespace score::lcm::internal + +#endif // MPMC_CONCURRENT_QUEUE_HPP_INCLUDED diff --git a/src/launch_manager_daemon/src/concurrency/mpmc_concurrent_queue_test.cpp b/src/launch_manager_daemon/src/concurrency/mpmc_concurrent_queue_test.cpp new file mode 100644 index 00000000..a5646f9d --- /dev/null +++ b/src/launch_manager_daemon/src/concurrency/mpmc_concurrent_queue_test.cpp @@ -0,0 +1,260 @@ +/******************************************************************************** + * Copyright (c) 2025 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace score::lcm::internal; + +class MPMCConcurrentQueueTest_Basic : public ::testing::Test +{ + protected: + MPMCConcurrentQueue queue_; +}; + +TEST_F(MPMCConcurrentQueueTest_Basic, PushAndPopSingleItem) +{ + RecordProperty("Description", "Verify that a single pushed item is returned by pop with its value intact."); + ASSERT_TRUE(queue_.push(42)); + auto result = queue_.pop(); + ASSERT_TRUE(result.has_value()); + EXPECT_EQ(*result, 42); +} + +TEST_F(MPMCConcurrentQueueTest_Basic, PushAndPopPreservesOrder) +{ + RecordProperty("Description", "Verify that items are dequeued in FIFO order."); + for (int i = 0; i < 5; ++i) + { + ASSERT_TRUE(queue_.push(i)); + } + for (int i = 0; i < 5; ++i) + { + auto result = queue_.pop(); + 
ASSERT_TRUE(result.has_value()); + EXPECT_EQ(*result, i); + } +} + +TEST_F(MPMCConcurrentQueueTest_Basic, LvaluePushCopiesItem) +{ + RecordProperty("Description", "Verify that pushing an lvalue copies the item, leaving the source unchanged."); + int value = 42; + ASSERT_TRUE(queue_.push(value)); + EXPECT_EQ(value, 42); + auto result = queue_.pop(); + ASSERT_TRUE(result.has_value()); + EXPECT_EQ(*result, 42); +} + +TEST_F(MPMCConcurrentQueueTest_Basic, RvaluePushWorksWithMoveOnlyType) +{ + RecordProperty("Description", + "Verify that rvalue push works with a move-only type and moves the item into the queue."); + MPMCConcurrentQueue, 4> queue; + auto item = std::make_unique(99); + ASSERT_TRUE(queue.push(std::move(item))); + EXPECT_EQ(item, nullptr); + auto result = queue.pop(); + ASSERT_TRUE(result.has_value()); + ASSERT_NE(*result, nullptr); + EXPECT_EQ(**result, 99); +} + +class MPMCConcurrentQueueTest_Stop : public ::testing::Test +{ + protected: + MPMCConcurrentQueue queue_; +}; + +TEST_F(MPMCConcurrentQueueTest_Stop, PushReturnsFalseAfterStop) +{ + RecordProperty("Description", "Verify that push returns false once stop() has been called."); + queue_.stop(); + EXPECT_FALSE(queue_.push(1)); +} + +TEST_F(MPMCConcurrentQueueTest_Stop, PopReturnsNulloptWhenStoppedAndEmpty) +{ + RecordProperty("Description", "Verify that pop returns nullopt immediately when the queue is stopped and empty."); + queue_.stop(); + EXPECT_FALSE(queue_.pop().has_value()); +} + +TEST_F(MPMCConcurrentQueueTest_Stop, ItemsAreDroppedAfterStop) +{ + RecordProperty("Description", "Verify that pop() returns nullopt after stop(), dropping any remaining items."); + ASSERT_TRUE(queue_.push(1)); + ASSERT_TRUE(queue_.push(2)); + ASSERT_TRUE(queue_.push(3)); + queue_.stop(); + + EXPECT_FALSE(queue_.pop().has_value()); +} + +class MPMCConcurrentQueueTest_Blocking : public ::testing::Test +{ + protected: + MPMCConcurrentQueue queue4_; + MPMCConcurrentQueue queue8_; +}; + 
+TEST_F(MPMCConcurrentQueueTest_Blocking, PopBlocksUntilItemAvailable) +{ + RecordProperty("Description", "Verify that pop blocks on an empty queue until a producer pushes an item."); + std::optional result; + + std::thread consumer([&] { + result = queue8_.pop(); + }); + + EXPECT_FALSE(result.has_value()); + + std::ignore = queue8_.push(7); + consumer.join(); + + ASSERT_TRUE(result.has_value()); + EXPECT_EQ(*result, 7); +} + +TEST_F(MPMCConcurrentQueueTest_Blocking, PushBlocksWhenFull) +{ + RecordProperty("Description", "Verify that push blocks when the queue is full until a consumer pops an item."); + for (int i = 0; i < 4; ++i) + { + ASSERT_TRUE(queue4_.push(i)); + } + + std::atomic pushed{false}; + std::thread producer([&] { + pushed.store(queue4_.push(99), std::memory_order_release); + }); + + EXPECT_FALSE(pushed.load(std::memory_order_acquire)); + + std::ignore = queue4_.pop(); + producer.join(); + + EXPECT_TRUE(pushed.load(std::memory_order_acquire)); +} + +TEST_F(MPMCConcurrentQueueTest_Blocking, StopUnblocksBlockedConsumer) +{ + RecordProperty("Description", "Verify that stop() unblocks a consumer thread waiting on an empty queue."); + std::optional result; + + std::thread consumer([&] { + result = queue8_.pop(); + }); + + queue8_.stop(); + consumer.join(); + + EXPECT_FALSE(result.has_value()); +} + +TEST_F(MPMCConcurrentQueueTest_Blocking, StopUnblocksBlockedProducer) +{ + RecordProperty("Description", "Verify that stop() unblocks a producer thread waiting on a full queue."); + for (int i = 0; i < 4; ++i) + { + ASSERT_TRUE(queue4_.push(i)); + } + + bool pushed = true; + std::thread producer([&] { + pushed = queue4_.push(99); + }); + + queue4_.stop(); + producer.join(); + + EXPECT_FALSE(pushed); +} + +class MPMCConcurrentQueueTest_MPMC : public ::testing::Test +{ + protected: + static constexpr std::size_t kCapacity = 64U; + static constexpr int kItemsPerProducer = 50; + static constexpr std::size_t kNumProducers = 4U; + static constexpr std::size_t 
kNumConsumers = 4U; + static constexpr int kTotalItems = static_cast(kItemsPerProducer * kNumProducers); + + MPMCConcurrentQueue queue_; +}; + +TEST_F(MPMCConcurrentQueueTest_MPMC, AllItemsDelivered) +{ + RecordProperty("Description", + "Verify that all items pushed by multiple concurrent producers are received " + "by multiple concurrent consumers without loss or duplication."); + std::atomic received_count{0}; + + auto producer_fn = [this](int value) { + for (int i = 0; i < kItemsPerProducer; ++i) + { + EXPECT_TRUE(queue_.push(value)); + } + }; + + auto consumer_fn = [this, &received_count] { + while (true) + { + auto item = queue_.pop(); + if (!item.has_value()) + { + break; + } + received_count.fetch_add(1, std::memory_order_relaxed); + } + }; + + std::vector producers; + for (std::size_t i = 0U; i < kNumProducers; ++i) + { + producers.emplace_back(producer_fn, static_cast(i)); + } + + std::vector consumers; + for (std::size_t i = 0U; i < kNumConsumers; ++i) + { + consumers.emplace_back(consumer_fn); + } + + for (auto& t : producers) + { + t.join(); + } + + while (received_count.load(std::memory_order_acquire) < kTotalItems) + { + std::this_thread::yield(); + } + queue_.stop(); + + for (auto& t : consumers) + { + t.join(); + } + + EXPECT_EQ(received_count.load(), kTotalItems); +} diff --git a/src/launch_manager_daemon/src/process_group_manager/graph.cpp b/src/launch_manager_daemon/src/process_group_manager/graph.cpp index 5866c0b5..a8b6b464 100644 --- a/src/launch_manager_daemon/src/process_group_manager/graph.cpp +++ b/src/launch_manager_daemon/src/process_group_manager/graph.cpp @@ -185,12 +185,11 @@ inline void Graph::queueHeadNodesForExecution() { inline void Graph::tryQueueNode(const std::shared_ptr& node) { while (GraphState::kInTransition == getState()) { - if (pgm_->getWorkerJobs()->addJobToQueue(node)) { + if (pgm_->getWorkerJobs()->push(node, kMaxQueueDelay)) { markNodeInFlight(); - break; - } else { - LM_LOG_WARN() << "Failed to add job to queue. 
Queue may be full or wait time too short."; - // Retry mechanism: continues looping until the job is queued successfully + } + else{ + LM_LOG_WARN() << "Failed to queue node for execution"; } } } diff --git a/src/launch_manager_daemon/src/process_group_manager/jobqueue.cpp b/src/launch_manager_daemon/src/process_group_manager/jobqueue.cpp deleted file mode 100644 index 8bad7e9d..00000000 --- a/src/launch_manager_daemon/src/process_group_manager/jobqueue.cpp +++ /dev/null @@ -1,90 +0,0 @@ -/******************************************************************************** - * Copyright (c) 2025 Contributors to the Eclipse Foundation - * - * See the NOTICE file(s) distributed with this work for additional - * information regarding copyright ownership. - * - * This program and the accompanying materials are made available under the - * terms of the Apache License Version 2.0 which is available at - * https://www.apache.org/licenses/LICENSE-2.0 - * - * SPDX-License-Identifier: Apache-2.0 - ********************************************************************************/ - -#include -#include -#include -#include "jobqueue.hpp" - -namespace score { - -namespace lcm { - -namespace internal { - -template -JobQueue::JobQueue(std::size_t capacity) - : num_items_(), num_spaces_(), in_index_(0), out_index_(0), capacity_(capacity), the_items_(capacity) { - num_items_.init(0U, false); - num_spaces_.init(static_cast(capacity & 0xFFFFFFFFUL), false); -} - -template -JobQueue::~JobQueue() { - num_spaces_.deinit(); - num_items_.deinit(); -} - -template -std::shared_ptr JobQueue::getJobFromQueue() { - std::shared_ptr result; - - if (osal::OsalReturnType::kSuccess == num_items_.wait() && is_running_) { - std::size_t index = static_cast(out_index_.fetch_add(1U, std::memory_order_relaxed)) % capacity_; - - result = std::atomic_load_explicit(&the_items_[index], std::memory_order_acquire); - num_spaces_.post(); - } - - return result; -} - -template -bool 
JobQueue::addJobToQueue(std::shared_ptr job) { - bool result = false; - - if (osal::OsalReturnType::kSuccess == num_spaces_.timedWait(score::lcm::internal::kMaxQueueDelay)) { - std::size_t index = static_cast(in_index_.fetch_add(1U, std::memory_order_relaxed)) % capacity_; - - std::atomic_store_explicit(&the_items_[index], job, std::memory_order_release); - num_items_.post(); - result = true; - } - - return result; -} - -template -void JobQueue::stopQueue(std::size_t nr_threads) { - is_running_ = false; - - // Wake up all threads servicing this queue. - // We do this by calling post as many times as we have threads. - for (std::size_t i = 0; i < nr_threads; ++i) { - num_items_.post(); - } -} - -template -bool JobQueue::isRunning() const { - return is_running_; -} - -class ProcessInfoNode; -template class JobQueue; - -} // namespace lcm - -} // namespace internal - -} // namespace score diff --git a/src/launch_manager_daemon/src/process_group_manager/jobqueue.hpp b/src/launch_manager_daemon/src/process_group_manager/jobqueue.hpp deleted file mode 100644 index bc5aac50..00000000 --- a/src/launch_manager_daemon/src/process_group_manager/jobqueue.hpp +++ /dev/null @@ -1,108 +0,0 @@ -/******************************************************************************** - * Copyright (c) 2025 Contributors to the Eclipse Foundation - * - * See the NOTICE file(s) distributed with this work for additional - * information regarding copyright ownership. 
- * - * This program and the accompanying materials are made available under the - * terms of the Apache License Version 2.0 which is available at - * https://www.apache.org/licenses/LICENSE-2.0 - * - * SPDX-License-Identifier: Apache-2.0 - ********************************************************************************/ - - -#ifndef JOB_QUEUE_HPP_INCLUDED -#define JOB_QUEUE_HPP_INCLUDED - -#include -#include - -#include -#include - -namespace score { - -namespace lcm { - -namespace internal { - -/// @brief A thread-safe job queue for managing and distributing jobs to worker threads. -/// The JobQueue class allows multiple threads to enqueue jobs, which are then executed -/// by multiple worker threads. It ensures thread safety while handling job pointers. -/// @tparam T The type of the items in the job queue (pointers to T). -template -class JobQueue final { - public: - /// @brief Constructs a JobQueue with a specified capacity. - /// Initializes the job queue to handle a maximum number of items defined by the capacity. - /// @param capacity The maximum number of items the queue can hold. - explicit JobQueue(std::size_t capacity); - - ///@brief Destructor that stops and destroy the timeout thread, ensuring no resources are left dangling. - ~JobQueue(); - - // Rule of five - /// @brief Copy constructor is deleted to prevent copying. - JobQueue(const JobQueue&) = delete; - - /// @brief Copy assignment operator is deleted to prevent copying. - JobQueue& operator=(const JobQueue&) = delete; - - /// @brief Move constructor is deleted to prevent moving. - JobQueue(JobQueue&&) = delete; - - /// @brief Move assignment operator is deleted to prevent moving. - JobQueue& operator=(JobQueue&&) = delete; - - /// @brief Adds a job to the queue, waiting for space to become available if necessary. - /// This operation is thread-safe and will wait up to kMaxQueueDelay milliseconds for space to become available. - /// @param job The item to add to the queue. 
- /// @return true if the job was successfully added, false if the operation timed out. - bool addJobToQueue(std::shared_ptr job); - - /// @brief Retrieves a job from the queue, waiting if necessary until a job is available. - /// This operation is thread-safe and will wait up to kMaxQueueDelay milliseconds for a job to become available. - /// @return A pointer to the job, or nullptr if the operation timed out. - std::shared_ptr getJobFromQueue(); - - /// @brief Stops the job queue, unblocking any waiting threads. - /// This method sets the running state to false and posts to the item semaphore - /// to unblock any threads waiting to retrieve jobs. - /// @param nr_threads The number of threads to unblock. - void stopQueue(std::size_t nr_threads); - - /// @brief Checks if the job queue is currently running. - /// @return - bool isRunning() const; - - private: - /// @brief Semaphore used to wait for an item to remove from the queue - osal::Semaphore num_items_; - - /// @brief Semaphore used to wait for space to place an item in the queue - osal::Semaphore num_spaces_; - - /// @brief Index of the next free space in the queue - std::atomic_uint32_t in_index_{0U}; - - /// @brief Index of the next available item in the queue - std::atomic_uint32_t out_index_{0U}; - - /// @brief The maximum capacity of the queue. 
- std::size_t capacity_; - - /// @brief Array of items that forms the queue - std::vector> the_items_{}; - - /// @brief Atomic flag indicating whether the queue is running - std::atomic_bool is_running_{true}; -}; - -} // namespace lcm - -} // namespace internal - -} // namespace score - -#endif /// JOB_QUEUE_HPP_INCLUDED diff --git a/src/launch_manager_daemon/src/process_group_manager/processgroupmanager.cpp b/src/launch_manager_daemon/src/process_group_manager/processgroupmanager.cpp index 520e6ef9..5c9a1b74 100644 --- a/src/launch_manager_daemon/src/process_group_manager/processgroupmanager.cpp +++ b/src/launch_manager_daemon/src/process_group_manager/processgroupmanager.cpp @@ -20,13 +20,7 @@ #include #include -namespace score -{ - -namespace lcm -{ - -namespace internal +namespace score::lcm::internal { using namespace score::lcm::internal::osal; @@ -238,8 +232,8 @@ inline void ProcessGroupManager::createProcessComponentsObjects() LM_LOG_DEBUG() << "Creating Safe Process Map with" << total_processes_ << "entries"; process_map_ = std::make_shared(total_processes_); - LM_LOG_DEBUG() << "Creating job queue with" << total_processes_ << "entries"; - worker_jobs_ = std::make_shared>(total_processes_); + LM_LOG_DEBUG() << "Creating job queue with capacity" << static_cast(ProcessLimits::kMaxProcesses); + worker_jobs_ = std::make_shared(); LM_LOG_DEBUG() << "Creating worker threads..."; worker_threads_ = std::make_unique>( @@ -364,7 +358,7 @@ inline void ProcessGroupManager::allProcessGroupsOff() if (!waitForStateCompletion(process_groups_, GraphState::kInTransition, 1000)) { LM_LOG_ERROR() << "NOTE: Transition to Off state timed out"; - worker_jobs_->stopQueue(static_cast(ProcessLimits::kNumWorkerThreads)); + worker_threads_->stop(); for (auto& pg : process_groups_) { for (auto& node : pg->getNodes()) @@ -704,7 +698,7 @@ inline void ProcessGroupManager::processGroupHandler(Graph& pg) } } - if (GraphState::kUndefinedState == pg.getState()) + if 
(GraphState::kUndefinedState == pg.getState()) { // at the moment graph is not running... // i.e. it is not in kInTransition, kAborting or kCancelled state @@ -785,13 +779,9 @@ std::shared_ptr ProcessGroupManager::getProcessMap() return process_map_; } -std::shared_ptr> ProcessGroupManager::getWorkerJobs() +std::shared_ptr ProcessGroupManager::getWorkerJobs() { return worker_jobs_; } -} // namespace internal - -} // namespace lcm - -} // namespace score +} // namespace score::lcm::internal diff --git a/src/launch_manager_daemon/src/process_group_manager/processgroupmanager.hpp b/src/launch_manager_daemon/src/process_group_manager/processgroupmanager.hpp index fd72bcb6..2143c9b0 100644 --- a/src/launch_manager_daemon/src/process_group_manager/processgroupmanager.hpp +++ b/src/launch_manager_daemon/src/process_group_manager/processgroupmanager.hpp @@ -24,7 +24,7 @@ #include #include #include -#include +#include #include #include #include @@ -32,12 +32,10 @@ #include #include #include +#include -namespace score { -namespace lcm { - -namespace internal { +namespace score::lcm::internal { /// @brief ProcessGroupManager provides the core functionality of LCM. /// Software that is deployed to the machine, should be managed through Process Groups. @@ -52,6 +50,8 @@ namespace internal { /// Interaction with OSAL to discover when processes terminated in an unexpected way. /// Fulfilling PG State transitions requests from SM, as well as informing SM about unexpected problems (for example process crashes). class ProcessGroupManager final { + using WorkerQueue = MPMCConcurrentQueue, + static_cast(ProcessLimits::kMaxProcesses)>; public: /// @brief Constructs a new ProcessGroupManager object. /// @@ -118,8 +118,8 @@ class ProcessGroupManager final { std::shared_ptr getProcessMap(); /// @brief Gets the job queue for worker threads. - /// @return Shared pointer to the JobQueue object for ProcessInfoNode jobs. 
- std::shared_ptr> getWorkerJobs(); + /// @return Shared pointer to the MpmcQueue object for ProcessInfoNode jobs. + std::shared_ptr getWorkerJobs(); /// @brief Calls QueuePosixProcess method of psn data member /// @details Writes via IPC the latest Process State change, so that PHM can be informed about it. @@ -271,7 +271,7 @@ class ProcessGroupManager final { std::unique_ptr> worker_threads_; /// @brief Shared pointer to the job queue for ProcessInfoNode jobs. - std::shared_ptr> worker_jobs_; + std::shared_ptr worker_jobs_; /// @brief Total number of processes. /// @deprecated there is no reason to store the total number of processes in the class @@ -304,10 +304,7 @@ class ProcessGroupManager final { std::shared_ptr recovery_client_{}; }; -} // namespace lcm - -} // namespace internal +} // namespace score::lcm::internal -} // namespace score #endif /// PROCESSGROUPMANAGER_HPP_INCLUDED diff --git a/src/launch_manager_daemon/src/process_group_manager/processinfonode.cpp b/src/launch_manager_daemon/src/process_group_manager/processinfonode.cpp index 57f2adb0..8402a5a3 100644 --- a/src/launch_manager_daemon/src/process_group_manager/processinfonode.cpp +++ b/src/launch_manager_daemon/src/process_group_manager/processinfonode.cpp @@ -152,9 +152,8 @@ void ProcessInfoNode::queueTerminationSuccessorJobs() { if (successor_node->is_included_ && successor_node->dependencies_ > 0U && --successor_node->dependencies_ == 0U) { while (graph_->getState() == GraphState::kInTransition) { - if (graph_->getProcessGroupManager()->getWorkerJobs()->addJobToQueue(successor_node)) { + if(graph_->getProcessGroupManager()->getWorkerJobs()->push(successor_node, kMaxQueueDelay)){ graph_->markNodeInFlight(); - break; } } } @@ -379,10 +378,9 @@ void ProcessInfoNode::processSuccessorNodes() { void ProcessInfoNode::checkForEmptyDependencies(std::shared_ptr& successor_node) { if (0U == --successor_node->dependencies_) { - while (graph_->getState() == GraphState::kInTransition) { - if 
(graph_->getProcessGroupManager()->getWorkerJobs()->addJobToQueue(successor_node)) { + if (graph_->getState() == GraphState::kInTransition) { + if(graph_->getProcessGroupManager()->getWorkerJobs()->push(successor_node)){ graph_->markNodeInFlight(); - break; } } } diff --git a/src/launch_manager_daemon/src/process_group_manager/workerthread.cpp b/src/launch_manager_daemon/src/process_group_manager/workerthread.cpp index fa0249b2..760cc667 100644 --- a/src/launch_manager_daemon/src/process_group_manager/workerthread.cpp +++ b/src/launch_manager_daemon/src/process_group_manager/workerthread.cpp @@ -21,11 +21,11 @@ namespace lcm { namespace internal { template -WorkerThread::WorkerThread(std::shared_ptr> queue, uint32_t num_threads) - : the_job_queue_(queue), num_threads_(num_threads) { - worker_threads_.reserve(num_threads_); +WorkerThread::WorkerThread(std::shared_ptr queue, uint32_t num_threads) + : the_job_queue_(queue) { + worker_threads_.reserve(num_threads); - for (uint32_t i = 0U; i < num_threads_; ++i) { + for (uint32_t i = 0U; i < num_threads; ++i) { static_cast(i); worker_threads_.emplace_back(std::make_unique(&WorkerThread::run, this)); } @@ -33,7 +33,7 @@ WorkerThread::WorkerThread(std::shared_ptr> queue, uint32_t num_t template WorkerThread::~WorkerThread() { - the_job_queue_->stopQueue(num_threads_); + stop(); for (auto& thread : worker_threads_) { if (thread->joinable()) { @@ -43,12 +43,16 @@ WorkerThread::~WorkerThread() { } template -void WorkerThread::run() { - while (the_job_queue_->isRunning()) { - auto job = the_job_queue_->getJobFromQueue(); +void WorkerThread::stop() { + the_job_queue_->stop(); +} - if (job) { - job->doWork(); +template +void WorkerThread::run() { + while (auto job = the_job_queue_->pop()) { + if(*job) + { + (*job)->doWork(); } } } diff --git a/src/launch_manager_daemon/src/process_group_manager/workerthread.hpp b/src/launch_manager_daemon/src/process_group_manager/workerthread.hpp index e8a12d55..12a7cbc7 100644 --- 
a/src/launch_manager_daemon/src/process_group_manager/workerthread.hpp +++ b/src/launch_manager_daemon/src/process_group_manager/workerthread.hpp @@ -11,37 +11,36 @@ * SPDX-License-Identifier: Apache-2.0 ********************************************************************************/ - #ifndef WORKER_THREAD_HPP_INCLUDED #define WORKER_THREAD_HPP_INCLUDED -#include +#include +#include #include #include #include -#include - -namespace score { -namespace lcm { - -namespace internal { +namespace score::lcm::internal +{ /// @brief Templated worker thread pool for executing jobs from a queue. /// This class manages a pool of worker threads that continuously retrieve and execute jobs -/// from a JobQueue until the pool is destructed or stopped. -/// @tparam T The type of items stored in the JobQueue. +/// from an MPMCConcurrentQueue until the pool is stopped or destructed. +/// @tparam T The type of items stored in the queue (as std::shared_ptr). template -class WorkerThread final { - public: +class WorkerThread final +{ + using Queue = MPMCConcurrentQueue, static_cast(ProcessLimits::kMaxProcesses)>; + + public: /// @brief Constructs a WorkerThread pool with the specified number of threads. /// - /// @param queue The JobQueue from which threads will take work items. + /// @param queue The MpmcQueue from which threads will take work items. /// @param num_threads Number of threads in the pool. - WorkerThread(std::shared_ptr> queue, uint32_t num_threads); + WorkerThread(std::shared_ptr queue, uint32_t num_threads); /// @brief Destructor. - /// Ensures all threads exit gracefully by setting is_running_ to false and joining each thread. + /// Requests stop and joins all worker threads. ~WorkerThread(); // Rule of five @@ -57,25 +56,22 @@ class WorkerThread final { /// @brief Move assignment operator is deleted to prevent moving. WorkerThread& operator=(WorkerThread&&) = delete; - private: + /// @brief Requests all worker threads to stop. 
+ /// Calls stop() on the queue, which unblocks all threads waiting in pop(). + void stop(); + + private: /// @brief Entry point for each worker thread. - /// Threads continuously retrieve and execute jobs from the_job_queue_ until is_running_ becomes false. + /// Blocks on pop() until a job arrives or the queue is stopped, then executes the job. void run(); /// @brief The queue from which each thread takes work. - std::shared_ptr> the_job_queue_{}; - - /// @brief Number of threads in the pool. Necessary to store this from the constructor to the destructor. - uint32_t num_threads_{}; + std::shared_ptr the_job_queue_{}; /// @brief Vector of worker threads. std::vector> worker_threads_{}; }; -} // namespace lcm - -} // namespace internal - -} // namespace score +} // namespace score::lcm::internal #endif // WORKER_THREAD_HPP_INCLUDED From 3031a8d023cb16c388ae5826820536b076f2b6f0 Mon Sep 17 00:00:00 2001 From: Maciej Kaszynski Date: Tue, 28 Apr 2026 16:08:29 +0100 Subject: [PATCH 02/13] Fixing missing breaks --- .../src/process_group_manager/graph.cpp | 1 + .../src/process_group_manager/processinfonode.cpp | 6 ++++-- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/src/launch_manager_daemon/src/process_group_manager/graph.cpp b/src/launch_manager_daemon/src/process_group_manager/graph.cpp index a8b6b464..e0ac70de 100644 --- a/src/launch_manager_daemon/src/process_group_manager/graph.cpp +++ b/src/launch_manager_daemon/src/process_group_manager/graph.cpp @@ -187,6 +187,7 @@ inline void Graph::tryQueueNode(const std::shared_ptr& node) { while (GraphState::kInTransition == getState()) { if (pgm_->getWorkerJobs()->push(node, kMaxQueueDelay)) { markNodeInFlight(); + break; } else{ LM_LOG_WARN() << "Failed to queue node for execution"; diff --git a/src/launch_manager_daemon/src/process_group_manager/processinfonode.cpp b/src/launch_manager_daemon/src/process_group_manager/processinfonode.cpp index 8402a5a3..fab5606b 100644 --- 
a/src/launch_manager_daemon/src/process_group_manager/processinfonode.cpp +++ b/src/launch_manager_daemon/src/process_group_manager/processinfonode.cpp @@ -154,6 +154,7 @@ void ProcessInfoNode::queueTerminationSuccessorJobs() { while (graph_->getState() == GraphState::kInTransition) { if(graph_->getProcessGroupManager()->getWorkerJobs()->push(successor_node, kMaxQueueDelay)){ graph_->markNodeInFlight(); + break; } } } @@ -378,9 +379,10 @@ void ProcessInfoNode::processSuccessorNodes() { void ProcessInfoNode::checkForEmptyDependencies(std::shared_ptr& successor_node) { if (0U == --successor_node->dependencies_) { - if (graph_->getState() == GraphState::kInTransition) { - if(graph_->getProcessGroupManager()->getWorkerJobs()->push(successor_node)){ + while (graph_->getState() == GraphState::kInTransition) { + if(graph_->getProcessGroupManager()->getWorkerJobs()->push(successor_node, kMaxQueueDelay)){ graph_->markNodeInFlight(); + break; } } } From f6922e48ce44472d65e078ddcbad2e6871f960ce Mon Sep 17 00:00:00 2001 From: Maciej Kaszynski Date: Tue, 28 Apr 2026 16:23:19 +0100 Subject: [PATCH 03/13] Adding a no-coverage tag --- .bazelrc | 1 + src/launch_manager_daemon/src/concurrency/BUILD | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/.bazelrc b/.bazelrc index 10361b9b..9c36d24f 100644 --- a/.bazelrc +++ b/.bazelrc @@ -29,6 +29,7 @@ test --cxxopt=-Wno-deprecated-declarations coverage --features=coverage coverage --combined_report=lcov coverage --cache_test_results=no +coverage --test_tag_filters=-no-coverage # Common Lifecycle Toolchain flags for build (do not use it in case of system toolchains!) 
build:toolchain_common --incompatible_strict_action_env diff --git a/src/launch_manager_daemon/src/concurrency/BUILD b/src/launch_manager_daemon/src/concurrency/BUILD index b68f6b38..ba8f4555 100644 --- a/src/launch_manager_daemon/src/concurrency/BUILD +++ b/src/launch_manager_daemon/src/concurrency/BUILD @@ -67,7 +67,7 @@ cc_test( "-g", ], linkopts = ["-fsanitize=thread"], - tags = ["tsan"], + tags = ["tsan", "no-coverage"], visibility = ["//tests:__subpackages__"], deps = [ ":mpmc_concurrent_queue", From 50f3cecc30a7cbcaae23d3cb72c8278e8fabb80c Mon Sep 17 00:00:00 2001 From: Maciej Kaszynski Date: Wed, 29 Apr 2026 09:04:13 +0100 Subject: [PATCH 04/13] Renaming head and tail to fit standard --- .../src/concurrency/mpmc_concurrent_queue.hpp | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/src/launch_manager_daemon/src/concurrency/mpmc_concurrent_queue.hpp b/src/launch_manager_daemon/src/concurrency/mpmc_concurrent_queue.hpp index ff2a2767..ad8ce09e 100644 --- a/src/launch_manager_daemon/src/concurrency/mpmc_concurrent_queue.hpp +++ b/src/launch_manager_daemon/src/concurrency/mpmc_concurrent_queue.hpp @@ -50,8 +50,8 @@ class MPMCConcurrentQueue // optimization to work out the turns static_assert((Capacity & (Capacity - 1U)) == 0U, "Capacity must be a power of 2"); - /// @brief Cache line size used to prevent false sharing between m_head, - /// m_tail, and m_stopped. + /// @brief Cache line size used to prevent false sharing between m_tail, + /// m_head, and m_stopped. /// Defined as a literal rather than std::hardware_destructive_interference_size to avoid /// -Winterference-size: that value is not ABI-stable across compiler versions and tuning flags. /// 64 bytes is the cache line size on all targeted x86_64 and aarch64 platforms. @@ -92,7 +92,7 @@ class MPMCConcurrentQueue MPMCConcurrentQueue& operator=(MPMCConcurrentQueue&&) = delete; /// @brief Blocks until a slot is free, then writes the item into the queue. 
- /// @detail Producers claim slots via fetch_add on m_head, and sleep inside + /// @detail Producers claim slots via fetch_add on m_tail, and sleep inside /// m_spaces.wait() when all slots are occupied. /// The turn counter ensures a slot cannot be written until the /// previous consumer has finished reading it. @@ -107,7 +107,7 @@ class MPMCConcurrentQueue } /// @brief Blocks until a slot is free, then writes the item into the queue. - /// @detail Producers claim slots via fetch_add on m_head, and sleep inside + /// @detail Producers claim slots via fetch_add on m_tail, and sleep inside /// m_spaces.wait() when all slots are occupied. /// The turn counter ensures a slot cannot be written until the /// previous consumer has finished reading it. @@ -130,7 +130,7 @@ class MPMCConcurrentQueue } /// @brief Blocks until an item is available or stop() is called. - /// @detail Consumers claim slots via fetch_add on m_tail and sleep + /// @detail Consumers claim slots via fetch_add on m_head and sleep /// inside m_items.wait() when the queue is empty. /// When stopped returns std::nullopt. /// @return The next item, or std::nullopt if stop() was called. 
@@ -147,7 +147,7 @@ class MPMCConcurrentQueue return std::nullopt; } - return consume_slot(m_tail.fetch_add(1, std::memory_order_relaxed)); + return consume_slot(m_head.fetch_add(1, std::memory_order_relaxed)); } private: @@ -197,12 +197,12 @@ class MPMCConcurrentQueue return false; } - const auto head = m_head.fetch_add(1, std::memory_order_relaxed); - auto& slot = m_slots[head & (Capacity - 1U)]; + const auto tail = m_tail.fetch_add(1, std::memory_order_relaxed); + auto& slot = m_slots[tail & (Capacity - 1U)]; // even turns belong to producers, odd turns to consumers // each lap of the ring increments the expected turn by 2 - const auto expected_turn = (head / Capacity) * 2; + const auto expected_turn = (tail / Capacity) * 2; while (slot.turn.load(std::memory_order_acquire) != expected_turn) { @@ -225,18 +225,18 @@ class MPMCConcurrentQueue /// @brief Underlying storage. std::array m_slots; - /// @brief The front of the queue. + /// @brief The front of the queue; claimed by consumers via fetch_add in pop. /// @detail Aligned so that m_head and m_tail do not share a cache line. alignas(CacheLineSize) std::atomic m_head{0}; - /// @brief The back of the queue. + /// @brief The back of the queue; claimed by producers via fetch_add in push_impl. /// @detail Aligned so that m_head and m_tail do not share a cache line. alignas(CacheLineSize) std::atomic m_tail{0}; /// @brief Set to true by stop(); causes push() to return false and pop() to /// return std::nullopt instead of blocking. /// @detail Aligned on its own cache line so that the single stop() write - /// does not cause false sharing with m_tail updates in pop(). + /// does not cause false sharing with m_tail updates in push_impl(). 
alignas(CacheLineSize) std::atomic m_stopped{false}; /// @brief Counts items currently in the queue; consumers block on this when From d35b1ce8cd2b61f16c6736e6837b02c0fb319bee Mon Sep 17 00:00:00 2001 From: Maciej Kaszynski Date: Wed, 29 Apr 2026 11:17:46 +0100 Subject: [PATCH 05/13] Getting 100% coverage --- .../mpmc_concurrent_queue_test.cpp | 67 +++++++++++++++++++ 1 file changed, 67 insertions(+) diff --git a/src/launch_manager_daemon/src/concurrency/mpmc_concurrent_queue_test.cpp b/src/launch_manager_daemon/src/concurrency/mpmc_concurrent_queue_test.cpp index a5646f9d..8fb7240f 100644 --- a/src/launch_manager_daemon/src/concurrency/mpmc_concurrent_queue_test.cpp +++ b/src/launch_manager_daemon/src/concurrency/mpmc_concurrent_queue_test.cpp @@ -16,8 +16,10 @@ #include #include #include +#include #include #include +#include #include #include #include @@ -79,6 +81,71 @@ TEST_F(MPMCConcurrentQueueTest_Basic, RvaluePushWorksWithMoveOnlyType) EXPECT_EQ(**result, 99); } +TEST_F(MPMCConcurrentQueueTest_Basic, PopReturnsNulloptOnSemaphoreWaitFailure) +{ + RecordProperty("Description", + "Verify that pop returns nullopt when the internal semaphore wait is " + "interrupted by a signal (sem_wait returns EINTR, triggering the kFail path)."); + + // Install a no-op handler without SA_RESTART so sem_wait is NOT restarted + // after signal delivery and returns -1 with EINTR instead. + struct sigaction sa{}; + sa.sa_handler = [](int) {}; + sigemptyset(&sa.sa_mask); + sa.sa_flags = 0; + sigaction(SIGUSR1, &sa, nullptr); + + std::optional result; + std::atomic tid_ready{false}; + + std::thread consumer([&] { + tid_ready.store(true, std::memory_order_release); + result = queue_.pop(); + }); + + // Obtain the pthread_t from the owning thread to avoid a cross-thread + // write to consumer_tid that Helgrind would flag as a data race. 
+ pthread_t consumer_tid = consumer.native_handle(); + + while (!tid_ready.load(std::memory_order_acquire)) + { + std::this_thread::yield(); + } + // Allow the consumer to reach sem_wait before delivering the signal. + std::this_thread::sleep_for(std::chrono::milliseconds(20)); + pthread_kill(consumer_tid, SIGUSR1); + consumer.join(); + + EXPECT_FALSE(result.has_value()); + + signal(SIGUSR1, SIG_DFL); +} + +class MPMCConcurrentQueueTest_Timeout : public ::testing::Test +{ + protected: + MPMCConcurrentQueue queue4_; +}; + +TEST_F(MPMCConcurrentQueueTest_Timeout, PushWithTimeoutSucceedsWhenSlotAvailable) +{ + RecordProperty("Description", "Verify that push with a non-zero timeout succeeds when a slot is free."); + EXPECT_TRUE(queue4_.push(1, std::chrono::milliseconds{100})); + auto result = queue4_.pop(); + ASSERT_TRUE(result.has_value()); + EXPECT_EQ(*result, 1); +} + +TEST_F(MPMCConcurrentQueueTest_Timeout, PushWithTimeoutReturnsFalseWhenFull) +{ + RecordProperty("Description", "Verify that push with a non-zero timeout returns false when the queue is full and the timeout expires."); + for (int i = 0; i < 4; ++i) + { + ASSERT_TRUE(queue4_.push(i)); + } + EXPECT_FALSE(queue4_.push(99, std::chrono::milliseconds{20})); +} + class MPMCConcurrentQueueTest_Stop : public ::testing::Test { protected: From 02e8856efb7e7a7cf788d0137794104f8d6c639c Mon Sep 17 00:00:00 2001 From: Maciej Kaszynski Date: Wed, 29 Apr 2026 15:32:26 +0100 Subject: [PATCH 06/13] Adding results --- .../src/concurrency/BUILD | 21 ++- .../concurrency/concurrency_error_domain.hpp | 68 +++++++++ .../src/concurrency/mpmc_concurrent_queue.hpp | 132 +++++++++++------- .../mpmc_concurrent_queue_test.cpp | 35 +++-- 4 files changed, 191 insertions(+), 65 deletions(-) create mode 100644 src/launch_manager_daemon/src/concurrency/concurrency_error_domain.hpp diff --git a/src/launch_manager_daemon/src/concurrency/BUILD b/src/launch_manager_daemon/src/concurrency/BUILD index ba8f4555..6162fd93 100644 --- 
a/src/launch_manager_daemon/src/concurrency/BUILD +++ b/src/launch_manager_daemon/src/concurrency/BUILD @@ -22,12 +22,19 @@ cc_library( cc_library( name = "mpmc_concurrent_queue", - hdrs = ["mpmc_concurrent_queue.hpp"], + hdrs = ["mpmc_concurrent_queue.hpp", + "concurrency_error_domain.hpp" + ], + + linkopts = [ + "-latomic", + ], strip_include_prefix = "//src/launch_manager_daemon/src", visibility = ["//src:__subpackages__"], deps = [ ":helgrind_annotations", "//src/launch_manager_daemon/common:osal", + "@score_baselibs//score/result", ], ) @@ -42,11 +49,14 @@ cc_test( ) # Run with: -# bazel test --run_under="valgrind --tool=helgrind" //src/launch_manager_daemon/src/concurrency:mpmc_concurrent_queue_helgrind_test --config=host --test_output=all -# Note: This is using your host pacakges so need to install valgrind +# bazel test --run_under="valgrind --tool=helgrind --suppressions=src/launch_manager_daemon/src/concurrency/helgrind.supp --error-exitcode=1" //src/launch_manager_daemon/src/concurrency:mpmc_concurrent_queue_helgrind_test --config=host --test_output=all +# Note: This is using your host packages so you need to install valgrind. +# Two tests intentionally exercise semaphore failure paths (EINTR / EAGAIN). +# Those expected PthAPIerror reports are silenced via helgrind.supp. 
cc_test( name = "mpmc_concurrent_queue_helgrind_test", srcs = ["mpmc_concurrent_queue_test.cpp"], + data = ["helgrind.supp"], tags = [ "helgrind", "manual", @@ -67,7 +77,10 @@ cc_test( "-g", ], linkopts = ["-fsanitize=thread"], - tags = ["tsan", "no-coverage"], + tags = [ + "no-coverage", # coverage + tsan might cause problems + "tsan", + ], visibility = ["//tests:__subpackages__"], deps = [ ":mpmc_concurrent_queue", diff --git a/src/launch_manager_daemon/src/concurrency/concurrency_error_domain.hpp b/src/launch_manager_daemon/src/concurrency/concurrency_error_domain.hpp new file mode 100644 index 00000000..b5d8dfab --- /dev/null +++ b/src/launch_manager_daemon/src/concurrency/concurrency_error_domain.hpp @@ -0,0 +1,68 @@ +/******************************************************************************** + * Copyright (c) 2026 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +#ifndef CONCURRENCY_ERROR_DOMAIN_HPP_INCLUDED +#define CONCURRENCY_ERROR_DOMAIN_HPP_INCLUDED + +#include "score/result/result.h" + +#include + +namespace score::lcm::internal +{ + +enum class ConcurrencyErrc : score::result::ErrorCode +{ + /// @brief An OS call returned an error. + kOsError = 1, + + /// @brief The container has overflowed. + kOverflow = 2, + + /// @brief The container has been stopped. 
+ kStopped = 3, + + kTimeout = 4, +}; + +class ConcurrencyErrorDomain final : public score::result::ErrorDomain +{ + [[nodiscard]] std::string_view MessageFor(const score::result::ErrorCode& code) const noexcept override + { + switch (static_cast(code)) + { + case ConcurrencyErrc::kOsError: + return "Failed to initialize semaphore"; + + case ConcurrencyErrc::kOverflow: + return "The container has overflowed"; + + case ConcurrencyErrc::kStopped: + return "The container has been stopped"; + + default: + return "Unknown concurrency error"; + } + } +}; + +constexpr ConcurrencyErrorDomain g_ConcurrencyErrorDomain{}; + +constexpr score::result::Error MakeError(ConcurrencyErrc code, const std::string_view user_message = "") noexcept +{ + return score::result::Error{static_cast(code), g_ConcurrencyErrorDomain, user_message}; +} + +} // namespace score::lcm::internal + +#endif // CONCURRENCY_ERROR_DOMAIN_HPP_INCLUDED diff --git a/src/launch_manager_daemon/src/concurrency/mpmc_concurrent_queue.hpp b/src/launch_manager_daemon/src/concurrency/mpmc_concurrent_queue.hpp index ad8ce09e..89597224 100644 --- a/src/launch_manager_daemon/src/concurrency/mpmc_concurrent_queue.hpp +++ b/src/launch_manager_daemon/src/concurrency/mpmc_concurrent_queue.hpp @@ -20,13 +20,14 @@ #include #include #include -#include #include #include -#include -#include +#include "concurrency_error_domain.hpp" +#include "score/result/result.h" #include +#include +#include namespace score::lcm::internal { @@ -44,8 +45,13 @@ class MPMCConcurrentQueue static_assert(Capacity <= std::numeric_limits::max(), "Capacity exceeds uint32_t range used by the semaphore"); - static_assert(std::is_default_constructible_v, - "T must be default-constructible for in-place slot storage"); + static_assert(std::is_default_constructible_v, "T must be default-constructible for in-place slot storage"); + + static_assert(std::is_nothrow_destructible_v, + "T must be nothrow-destructible to allow consume_slot to be noexcept"); + + 
static_assert(std::is_nothrow_move_constructible_v, + "T must be nothrow-move-constructible to wrap into std::optional in pop()"); // optimization to work out the turns static_assert((Capacity & (Capacity - 1U)) == 0U, "Capacity must be a power of 2"); @@ -76,12 +82,14 @@ class MPMCConcurrentQueue public: MPMCConcurrentQueue() { + // ignore sem_init error, the subsequent sem_* will fail static_cast(m_items.init(0U, false)); static_cast(m_spaces.init(static_cast(Capacity), false)); } - ~MPMCConcurrentQueue() + ~MPMCConcurrentQueue() noexcept { + // not much we can do static_cast(m_spaces.deinit()); static_cast(m_items.deinit()); } @@ -92,68 +100,90 @@ class MPMCConcurrentQueue MPMCConcurrentQueue& operator=(MPMCConcurrentQueue&&) = delete; /// @brief Blocks until a slot is free, then writes the item into the queue. - /// @detail Producers claim slots via fetch_add on m_tail, and sleep inside - /// m_spaces.wait() when all slots are occupied. - /// The turn counter ensures a slot cannot be written until the - /// previous consumer has finished reading it. + /// @details Producers claim slots via fetch_add on m_tail, and sleep inside + /// m_spaces.wait() when all slots are occupied. + /// The turn counter ensures a slot cannot be written until the + /// previous consumer has finished reading it. /// @param timeout Maximum time to wait for a free slot. Zero means wait forever. - /// @return true if the item was pushed, false if stop() was called or the - /// timeout expired before a slot became available (item is not enqueued). + /// @return Success if item was pushed, Error otherwise. /// Note: If the push returns false, the object is still valid for /// the user. 
- [[nodiscard]] bool push(T&& item, std::chrono::milliseconds timeout = std::chrono::milliseconds{0}) + [[nodiscard]] score::Result push(T&& item, std::chrono::milliseconds timeout = std::chrono::milliseconds{0}) { return push_impl(std::move(item), timeout); } /// @brief Blocks until a slot is free, then writes the item into the queue. - /// @detail Producers claim slots via fetch_add on m_tail, and sleep inside - /// m_spaces.wait() when all slots are occupied. - /// The turn counter ensures a slot cannot be written until the - /// previous consumer has finished reading it. + /// @details Producers claim slots via fetch_add on m_tail, and sleep inside + /// m_spaces.wait() when all slots are occupied. + /// The turn counter ensures a slot cannot be written until the + /// previous consumer has finished reading it. /// @param timeout Maximum time to wait for a free slot. Zero means wait forever. /// @return true if the item was pushed, false if stop() was called or the /// timeout expired before a slot became available (item is not enqueued). - [[nodiscard]] bool push(const T& item, std::chrono::milliseconds timeout = std::chrono::milliseconds{0}) + [[nodiscard]] score::Result push(const T& item, + std::chrono::milliseconds timeout = std::chrono::milliseconds{0}) { return push_impl(item, timeout); } - /// @brief Signals all blocked pop() callers to return std::nullopt. - void stop() + /// @brief Signals all blocked pop() callers to return with a stopped error. 
+ [[nodiscard]] score::Result stop() noexcept { m_stopped.store(true, std::memory_order_relaxed); // signal to consumers and publishers to wakeup - static_cast(m_items.post()); - static_cast(m_spaces.post()); + if (m_items.post() != osal::OsalReturnType::kSuccess) + { + return score::MakeUnexpected(ConcurrencyErrc::kOsError); + } + + if (m_spaces.post() != osal::OsalReturnType::kSuccess) + { + return score::MakeUnexpected(ConcurrencyErrc::kOsError); + } + + return {}; } /// @brief Blocks until an item is available or stop() is called. - /// @detail Consumers claim slots via fetch_add on m_head and sleep - /// inside m_items.wait() when the queue is empty. - /// When stopped returns std::nullopt. - /// @return The next item, or std::nullopt if stop() was called. - [[nodiscard]] std::optional pop() + /// @details Consumers claim slots via fetch_add on m_head and sleep + /// inside m_items.wait() when the queue is empty. + /// When stopped returns std::nullopt. + /// @return The next item, or error. + [[nodiscard]] score::Result pop() { - if (m_items.wait() != osal::OsalReturnType::kSuccess) + auto wait_result = m_items.wait(); + + if(wait_result == osal::OsalReturnType::kTimeout) + { + return score::MakeUnexpected(ConcurrencyErrc::kTimeout); + } + else if (wait_result != osal::OsalReturnType::kSuccess) { - return std::nullopt; + return score::MakeUnexpected(ConcurrencyErrc::kOsError); } if (m_stopped.load(std::memory_order_relaxed)) { static_cast(m_items.post()); - return std::nullopt; + return score::MakeUnexpected(ConcurrencyErrc::kStopped); + } + + T item = consume_slot(m_head.fetch_add(1, std::memory_order_relaxed)); + + if (m_spaces.post() != osal::OsalReturnType::kSuccess) + { + return score::MakeUnexpected(ConcurrencyErrc::kOsError); } - return consume_slot(m_head.fetch_add(1, std::memory_order_relaxed)); + return item; } private: /// @brief Spins until the slot at tail is ready, moves the item out, and /// releases the slot. 
- T consume_slot(std::size_t tail) + T consume_slot(std::size_t tail) noexcept { auto& slot = m_slots[tail & (Capacity - 1U)]; @@ -171,30 +201,33 @@ class MPMCConcurrentQueue T item = std::move_if_noexcept(slot.item); ANNOTATE_HAPPENS_BEFORE(&slot.turn); + // release store signals that the slot is now free for the next producer. slot.turn.store(expected_turn + 1, std::memory_order_release); - static_cast(m_spaces.post()); return item; } - - template - [[nodiscard]] bool push_impl(U&& item, std::chrono::milliseconds timeout) + + template + [[nodiscard]] score::Result push_impl(U&& item, std::chrono::milliseconds timeout) { - const auto wait_result = (timeout == std::chrono::milliseconds{0}) - ? m_spaces.wait() - : m_spaces.timedWait(timeout); + const auto wait_result = + (timeout == std::chrono::milliseconds{0}) ? m_spaces.wait() : m_spaces.timedWait(timeout); - if (wait_result != osal::OsalReturnType::kSuccess) + if(wait_result == osal::OsalReturnType::kTimeout) { - return false; + return score::MakeUnexpected(ConcurrencyErrc::kTimeout); + } + else if (wait_result != osal::OsalReturnType::kSuccess) + { + return score::MakeUnexpected(ConcurrencyErrc::kOsError); } if (m_stopped.load(std::memory_order_relaxed)) { // chain-wake the next blocked producer then discard the item static_cast(m_spaces.post()); - return false; + return score::MakeUnexpected(ConcurrencyErrc::kStopped); } const auto tail = m_tail.fetch_add(1, std::memory_order_relaxed); @@ -217,26 +250,29 @@ class MPMCConcurrentQueue ANNOTATE_HAPPENS_BEFORE(&slot.turn); slot.turn.store(expected_turn + 1, std::memory_order_release); - static_cast(m_items.post()); - return true; + if (m_items.post() != osal::OsalReturnType::kSuccess) + { + return score::MakeUnexpected(ConcurrencyErrc::kOsError); + } + return {}; } /// @brief Underlying storage. std::array m_slots; /// @brief The front of the queue; claimed by consumers via fetch_add in pop. 
- /// @detail Aligned so that m_head and m_tail do not share a cache line. + /// @details Aligned so that m_head and m_tail do not share a cache line. alignas(CacheLineSize) std::atomic m_head{0}; /// @brief The back of the queue; claimed by producers via fetch_add in push_impl. - /// @detail Aligned so that m_head and m_tail do not share a cache line. + /// @details Aligned so that m_head and m_tail do not share a cache line. alignas(CacheLineSize) std::atomic m_tail{0}; /// @brief Set to true by stop(); causes push() to return false and pop() to /// return std::nullopt instead of blocking. - /// @detail Aligned on its own cache line so that the single stop() write - /// does not cause false sharing with m_tail updates in push_impl(). + /// @details Aligned on its own cache line so that the single stop() write + /// does not cause false sharing with m_tail updates in push_impl(). alignas(CacheLineSize) std::atomic m_stopped{false}; /// @brief Counts items currently in the queue; consumers block on this when diff --git a/src/launch_manager_daemon/src/concurrency/mpmc_concurrent_queue_test.cpp b/src/launch_manager_daemon/src/concurrency/mpmc_concurrent_queue_test.cpp index 8fb7240f..124e9072 100644 --- a/src/launch_manager_daemon/src/concurrency/mpmc_concurrent_queue_test.cpp +++ b/src/launch_manager_daemon/src/concurrency/mpmc_concurrent_queue_test.cpp @@ -14,12 +14,12 @@ #include #include +#include #include #include #include #include #include -#include #include #include #include @@ -95,7 +95,7 @@ TEST_F(MPMCConcurrentQueueTest_Basic, PopReturnsNulloptOnSemaphoreWaitFailure) sa.sa_flags = 0; sigaction(SIGUSR1, &sa, nullptr); - std::optional result; + score::Result result; std::atomic tid_ready{false}; std::thread consumer([&] { @@ -138,7 +138,9 @@ TEST_F(MPMCConcurrentQueueTest_Timeout, PushWithTimeoutSucceedsWhenSlotAvailable TEST_F(MPMCConcurrentQueueTest_Timeout, PushWithTimeoutReturnsFalseWhenFull) { - RecordProperty("Description", "Verify that push with a 
non-zero timeout returns false when the queue is full and the timeout expires."); + RecordProperty( + "Description", + "Verify that push with a non-zero timeout returns false when the queue is full and the timeout expires."); for (int i = 0; i < 4; ++i) { ASSERT_TRUE(queue4_.push(i)); @@ -155,14 +157,16 @@ class MPMCConcurrentQueueTest_Stop : public ::testing::Test TEST_F(MPMCConcurrentQueueTest_Stop, PushReturnsFalseAfterStop) { RecordProperty("Description", "Verify that push returns false once stop() has been called."); - queue_.stop(); + auto res = queue_.stop(); + ASSERT_TRUE(res.has_value()); EXPECT_FALSE(queue_.push(1)); } TEST_F(MPMCConcurrentQueueTest_Stop, PopReturnsNulloptWhenStoppedAndEmpty) { RecordProperty("Description", "Verify that pop returns nullopt immediately when the queue is stopped and empty."); - queue_.stop(); + auto res = queue_.stop(); + ASSERT_TRUE(res.has_value()); EXPECT_FALSE(queue_.pop().has_value()); } @@ -172,7 +176,8 @@ TEST_F(MPMCConcurrentQueueTest_Stop, ItemsAreDroppedAfterStop) ASSERT_TRUE(queue_.push(1)); ASSERT_TRUE(queue_.push(2)); ASSERT_TRUE(queue_.push(3)); - queue_.stop(); + auto res = queue_.stop(); + ASSERT_TRUE(res.has_value()); EXPECT_FALSE(queue_.pop().has_value()); } @@ -187,7 +192,7 @@ class MPMCConcurrentQueueTest_Blocking : public ::testing::Test TEST_F(MPMCConcurrentQueueTest_Blocking, PopBlocksUntilItemAvailable) { RecordProperty("Description", "Verify that pop blocks on an empty queue until a producer pushes an item."); - std::optional result; + score::Result result = score::MakeUnexpected(ConcurrencyErrc::kOsError); std::thread consumer([&] { result = queue8_.pop(); @@ -210,7 +215,8 @@ TEST_F(MPMCConcurrentQueueTest_Blocking, PushBlocksWhenFull) ASSERT_TRUE(queue4_.push(i)); } - std::atomic pushed{false}; + std::atomic> pushed{}; + pushed.store(score::MakeUnexpected(ConcurrencyErrc::kOsError)); std::thread producer([&] { pushed.store(queue4_.push(99), std::memory_order_release); }); @@ -226,13 +232,14 @@ 
TEST_F(MPMCConcurrentQueueTest_Blocking, PushBlocksWhenFull) TEST_F(MPMCConcurrentQueueTest_Blocking, StopUnblocksBlockedConsumer) { RecordProperty("Description", "Verify that stop() unblocks a consumer thread waiting on an empty queue."); - std::optional result; + score::Result result; std::thread consumer([&] { result = queue8_.pop(); }); - queue8_.stop(); + auto res = queue8_.stop(); + ASSERT_TRUE(res.has_value()); consumer.join(); EXPECT_FALSE(result.has_value()); @@ -246,12 +253,13 @@ TEST_F(MPMCConcurrentQueueTest_Blocking, StopUnblocksBlockedProducer) ASSERT_TRUE(queue4_.push(i)); } - bool pushed = true; + score::Result pushed{}; std::thread producer([&] { pushed = queue4_.push(99); }); - queue4_.stop(); + auto res = queue4_.stop(); + ASSERT_TRUE(res.has_value()); producer.join(); EXPECT_FALSE(pushed); @@ -316,7 +324,8 @@ TEST_F(MPMCConcurrentQueueTest_MPMC, AllItemsDelivered) { std::this_thread::yield(); } - queue_.stop(); + auto res = queue_.stop(); + ASSERT_TRUE(res.has_value()); for (auto& t : consumers) { From a6cb5ccd3afd99b044136a2a8df4e1b21ff0cad1 Mon Sep 17 00:00:00 2001 From: Maciej Kaszynski Date: Wed, 29 Apr 2026 15:59:01 +0100 Subject: [PATCH 07/13] Fixing infinite while errors --- .../src/concurrency/BUILD | 4 - .../src/concurrency/helgrind.supp | 42 ++ .../src/concurrency/mpmc_concurrent_queue.hpp | 6 +- .../src/process_group_manager/graph.cpp | 343 ++++++++++------ .../process_group_manager/processinfonode.cpp | 371 ++++++++++++------ .../process_group_manager/workerthread.cpp | 57 ++- 6 files changed, 553 insertions(+), 270 deletions(-) create mode 100644 src/launch_manager_daemon/src/concurrency/helgrind.supp diff --git a/src/launch_manager_daemon/src/concurrency/BUILD b/src/launch_manager_daemon/src/concurrency/BUILD index 6162fd93..4edeeca7 100644 --- a/src/launch_manager_daemon/src/concurrency/BUILD +++ b/src/launch_manager_daemon/src/concurrency/BUILD @@ -25,10 +25,6 @@ cc_library( hdrs = ["mpmc_concurrent_queue.hpp", 
"concurrency_error_domain.hpp" ], - - linkopts = [ - "-latomic", - ], strip_include_prefix = "//src/launch_manager_daemon/src", visibility = ["//src:__subpackages__"], deps = [ diff --git a/src/launch_manager_daemon/src/concurrency/helgrind.supp b/src/launch_manager_daemon/src/concurrency/helgrind.supp new file mode 100644 index 00000000..b007296c --- /dev/null +++ b/src/launch_manager_daemon/src/concurrency/helgrind.supp @@ -0,0 +1,42 @@ +# ******************************************************************************* +# Copyright (c) 2025 Contributors to the Eclipse Foundation +# +# See the NOTICE file(s) distributed with this work for additional +# information regarding copyright ownership. +# +# This program and the accompanying materials are made available under the +# terms of the Apache License Version 2.0 which is available at +# https://www.apache.org/licenses/LICENSE-2.0 +# +# SPDX-License-Identifier: Apache-2.0 +# ******************************************************************************* + +# Helgrind suppression file + + +# --- mpmc_concurrent_queue_helgrind_test ---- + +# tests intentionally exercise semaphore failure paths and therefore +# trigger Helgrind's PthAPIerror reports +# +# * MPMCConcurrentQueueTest_Basic.PopReturnsNulloptOnSemaphoreWaitFailure +# Sends SIGUSR1 to the consumer so sem_wait() returns -1 / EINTR and +# the kFail path in Semaphore::wait() is taken. +{ + mpmc_queue_sem_wait_eintr_intentional + Helgrind:PthAPIerror + obj:*/vgpreload_helgrind-*.so + fun:_ZN5score3lcm8internal4osal9Semaphore4waitEv + ... +} + +# * MPMCConcurrentQueueTest_Timeout.PushWithTimeoutReturnsFalseWhenFull +# Fills the queue so sem_trywait() inside Semaphore::timedWait() +# returns -1 / EAGAIN until the timeout expires. +{ + mpmc_queue_sem_trywait_eagain_intentional + Helgrind:PthAPIerror + obj:*/vgpreload_helgrind-*.so + fun:_ZN5score3lcm8internal4osal9Semaphore9timedWaitENSt6chrono8durationIlSt5ratioILl1ELl1000EEEE + ... 
+} diff --git a/src/launch_manager_daemon/src/concurrency/mpmc_concurrent_queue.hpp b/src/launch_manager_daemon/src/concurrency/mpmc_concurrent_queue.hpp index 89597224..204b62f6 100644 --- a/src/launch_manager_daemon/src/concurrency/mpmc_concurrent_queue.hpp +++ b/src/launch_manager_daemon/src/concurrency/mpmc_concurrent_queue.hpp @@ -150,10 +150,12 @@ class MPMCConcurrentQueue /// @details Consumers claim slots via fetch_add on m_head and sleep /// inside m_items.wait() when the queue is empty. /// When stopped returns std::nullopt. + /// @param timeout Maximum time to wait for an item. Zero means wait forever. /// @return The next item, or error. - [[nodiscard]] score::Result pop() + [[nodiscard]] score::Result pop(std::chrono::milliseconds timeout = std::chrono::milliseconds{0}) { - auto wait_result = m_items.wait(); + const auto wait_result = + (timeout == std::chrono::milliseconds{0}) ? m_items.wait() : m_items.timedWait(timeout); if(wait_result == osal::OsalReturnType::kTimeout) { diff --git a/src/launch_manager_daemon/src/process_group_manager/graph.cpp b/src/launch_manager_daemon/src/process_group_manager/graph.cpp index e0ac70de..ba0c4314 100644 --- a/src/launch_manager_daemon/src/process_group_manager/graph.cpp +++ b/src/launch_manager_daemon/src/process_group_manager/graph.cpp @@ -15,16 +15,19 @@ #include -#include +#include #include #include -#include +#include -namespace score { +namespace score +{ -namespace lcm { +namespace lcm +{ -namespace internal { +namespace internal +{ Graph::Graph(uint32_t max_num_nodes, ProcessGroupManager* pgm) : pg_index_(0U), @@ -42,7 +45,8 @@ Graph::Graph(uint32_t max_num_nodes, ProcessGroupManager* pgm) pending_state_(""), event_(ControlClientCode::kNotSet), cancel_message_(), - request_start_time_() { + request_start_time_() +{ LM_LOG_DEBUG() << "Creating graph with" << max_num_nodes << "nodes"; nodes_.reserve(max_num_nodes); last_state_manager_.process_index_ = 0xFFFFU; // an invalid state manager @@ -50,15 +54,16 @@ 
Graph::Graph(uint32_t max_num_nodes, ProcessGroupManager* pgm) cancel_message_.request_or_response_ = ControlClientCode::kNotSet; } -Graph::~Graph() { +Graph::~Graph() +{ nodes_.clear(); LM_LOG_DEBUG() << "Graph destroyed"; } -void Graph::initProcessGroupNodes( IdentifierHash pg_name, uint32_t num_processes, uint32_t index ) +void Graph::initProcessGroupNodes(IdentifierHash pg_name, uint32_t num_processes, uint32_t index) { - pg_index_ = index; - off_state_ = pgm_->getConfigurationManager()->getNameOfOffState(pg_name); + pg_index_ = index; + off_state_ = pgm_->getConfigurationManager()->getNameOfOffState(pg_name); requested_state_.pg_state_name_ = off_state_; requested_state_.pg_name_ = pg_name; @@ -67,15 +72,18 @@ void Graph::initProcessGroupNodes( IdentifierHash pg_name, uint32_t num_processe createProcessInfoNodes(num_processes); - if (nodes_.size() == num_processes) { + if (nodes_.size() == num_processes) + { createSuccessorLists(pg_name); } } -inline void Graph::createProcessInfoNodes(uint32_t num_processes) { +inline void Graph::createProcessInfoNodes(uint32_t num_processes) +{ nodes_.reserve(num_processes); // Reserve space for efficiency - for (uint32_t process_id = 0U; process_id < num_processes; ++process_id) { + for (uint32_t process_id = 0U; process_id < num_processes; ++process_id) + { LM_LOG_DEBUG() << "Creating process node with id:" << process_id; nodes_.push_back(std::make_shared()); nodes_.back()->initNode(this, process_id); @@ -83,23 +91,25 @@ inline void Graph::createProcessInfoNodes(uint32_t num_processes) { LM_LOG_DEBUG() << "Created" << nodes_.size() << "process nodes"; } -inline void Graph::createSuccessorLists(IdentifierHash pg_name) { +inline void Graph::createSuccessorLists(IdentifierHash pg_name) +{ LM_LOG_DEBUG() << "Creating successor lists for process group" << pg_name; // Now create the successor lists for each process in this process group - for (auto& node : nodes_) { + for (auto& node : nodes_) + { // If the other process has a 
dependency on this one, put it on the correct list auto node_index = node->getNodeIndex(); const DependencyList* dep_list = pgm_->getConfigurationManager()->getOsProcessDependencies(pg_name, node_index).value_or(nullptr); - if( dep_list ) + if (dep_list) { - for( const Dependency& dep : *dep_list ) + for (const Dependency& dep : *dep_list) { - if( dep.os_process_index_ < nodes_.size() ) + if (dep.os_process_index_ < nodes_.size()) { - nodes_[dep.os_process_index_]->addSuccessorNode( node, dep.process_state_ ); + nodes_[dep.os_process_index_]->addSuccessorNode(node, dep.process_state_); LM_LOG_DEBUG() << "Added successor node dependency:" << dep.os_process_index_ << "->" << node_index; } } @@ -107,31 +117,37 @@ inline void Graph::createSuccessorLists(IdentifierHash pg_name) { } } -void Graph::setState(GraphState new_state) { +void Graph::setState(GraphState new_state) +{ GraphState old_state = getState(); // Notice that this is a private method and by design the states can't be out of range - //if( old_state > GraphState::kUndefinedState || + // if( old_state > GraphState::kUndefinedState || // new_state > GraphState::kUndefinedState ) //{ // LM_LOG_ERROR() << "Incorrect state transition:" << static_cast( old_state ) << "to" // << static_cast( new_state ); //} - //else + // else { score::cpp::span line{state_results[static_cast(new_state)]}; GraphState target_state = new_state; - while (old_state != target_state) { - // coverity[autosar_cpp14_a5_2_5_violation:FALSE] Line is an array of graphstates from state_results. There are no nullptrs inside state_results so a indexing without a check is allowed. - target_state = line.data()[static_cast(old_state)]; // score::cpp::span does not implement operator[] + while (old_state != target_state) + { + // coverity[autosar_cpp14_a5_2_5_violation:FALSE] Line is an array of graphstates from state_results. There + // are no nullptrs inside state_results so indexing without a check is allowed. 
+ target_state = + line.data()[static_cast(old_state)]; // score::cpp::span does not implement operator[] - if (state_.compare_exchange_strong(old_state, target_state)) { + if (state_.compare_exchange_strong(old_state, target_state)) + { LM_LOG_DEBUG() << "Graph::setState changes from" << toString(old_state) << "to" - << toString(target_state) << "for PG" << pg_index_ << "(" - << requested_state_.pg_name_ << ")"; + << toString(target_state) << "for PG" << pg_index_ << "(" << requested_state_.pg_name_ + << ")"; old_state = target_state; - if (new_state == GraphState::kSuccess) { + if (new_state == GraphState::kSuccess) + { // get state transition end time stamp auto request_end_time = std::chrono::steady_clock::now(); @@ -147,7 +163,8 @@ void Graph::setState(GraphState new_state) { } } -bool Graph::queueHeadNodes(bool start) { +bool Graph::queueHeadNodes(bool start) +{ // Count the number of nodes in this graph starting_ = start; @@ -156,18 +173,22 @@ bool Graph::queueHeadNodes(bool start) { nodes_to_execute_.store(executing_nodes); nodes_in_flight_.store(0); - if (executing_nodes > 0U) { + if (executing_nodes > 0U) + { queueHeadNodesForExecution(); } return (nodes_in_flight_ > 0); } -inline uint32_t Graph::countExecutableNodes(bool start) { +inline uint32_t Graph::countExecutableNodes(bool start) +{ uint32_t executable_nodes = 0U; - for (const auto& node : nodes_) { - if (node->constructGraphNode(start)) { + for (const auto& node : nodes_) + { + if (node->constructGraphNode(start)) + { ++executable_nodes; } } @@ -175,53 +196,74 @@ inline uint32_t Graph::countExecutableNodes(bool start) { return executable_nodes; } -inline void Graph::queueHeadNodesForExecution() { - for (const auto& node : nodes_) { - if (node->isHeadNode()) { +inline void Graph::queueHeadNodesForExecution() +{ + for (const auto& node : nodes_) + { + if (node->isHeadNode()) + { tryQueueNode(node); } } } -inline void Graph::tryQueueNode(const std::shared_ptr& node) { - while 
(GraphState::kInTransition == getState()) { - if (pgm_->getWorkerJobs()->push(node, kMaxQueueDelay)) { +inline void Graph::tryQueueNode(const std::shared_ptr& node) +{ + while (GraphState::kInTransition == getState()) + { + auto push_res = pgm_->getWorkerJobs()->push(node, kMaxQueueDelay); + if (push_res) + { markNodeInFlight(); break; } - else{ - LM_LOG_WARN() << "Failed to queue node for execution"; + else if (push_res.error() == ConcurrencyErrc::kTimeout) + { + continue; + } + else + { + LM_LOG_WARN() << "Failed to queue node for execution " << push_res.error(); + break; } } } -void Graph::queueStopJobs(const std::vector* process_index_list) { +void Graph::queueStopJobs(const std::vector* process_index_list) +{ // p is not nullptr - guaranteed by caller. // First mark all processes as being not in the requested state - for (auto node : nodes_) { + for (auto node : nodes_) + { node->markRequested(false); } // Then go through the processes in the requested state and mark them true - for (uint32_t index : *process_index_list) { - if (index < nodes_.size()) { + for (uint32_t index : *process_index_list) + { + if (index < nodes_.size()) + { nodes_[index]->markRequested(true); } } - if (!queueHeadNodes(false)) { + if (!queueHeadNodes(false)) + { queueStartJobs(); } } -void Graph::queueStartJobs() { - if (!queueHeadNodes(true)) { +void Graph::queueStartJobs() +{ + if (!queueHeadNodes(true)) + { setState(GraphState::kSuccess); // nothing to do, done nothing, success! 
setPendingEvent(ControlClientCode::kSetStateSuccess); } } -bool Graph::startTransition(ProcessGroupStateID pg_state) { +bool Graph::startTransition(ProcessGroupStateID pg_state) +{ IdentifierHash old_state_name; { std::lock_guard lock(requested_state_mutex_); @@ -231,12 +273,13 @@ bool Graph::startTransition(ProcessGroupStateID pg_state) { const std::vector* process_index_list = pgm_->getConfigurationManager()->getProcessIndexesList(requested_state_).value_or(nullptr); - if (nullptr != process_index_list) { + if (nullptr != process_index_list) + { setState(GraphState::kInTransition); - if( GraphState::kInTransition == getState() ) + if (GraphState::kInTransition == getState()) { - queueStopJobs( process_index_list ); + queueStopJobs(process_index_list); return true; } } @@ -247,12 +290,14 @@ bool Graph::startTransition(ProcessGroupStateID pg_state) { return false; } -bool Graph::startInitialTransition(ProcessGroupStateID pg_state) { +bool Graph::startInitialTransition(ProcessGroupStateID pg_state) +{ is_initial_state_transition_ = true; setRequestStartTime(); bool result = startTransition(pg_state); - if (!result) { + if (!result) + { is_initial_state_transition_ = false; pgm_->setInitialStateTransitionResult(ControlClientCode::kInitialMachineStateFailed); } @@ -260,7 +305,8 @@ bool Graph::startInitialTransition(ProcessGroupStateID pg_state) { return result; } -bool Graph::startTransitionToOffState() { +bool Graph::startTransitionToOffState() +{ // Guaranteed to transition to all off even if there // is no configured "Off" state. 
setRequestStartTime(); @@ -273,104 +319,133 @@ bool Graph::startTransitionToOffState() { if (GraphState::kInTransition == getState()) { std::vector empty_list{}; - queueStopJobs( &empty_list ); + queueStopJobs(&empty_list); result = true; } return result; } -void Graph::nodeExecuted() { +void Graph::nodeExecuted() +{ GraphState current_state = getState(); - if (current_state == GraphState::kInTransition) { + if (current_state == GraphState::kInTransition) + { handleTransitionExecution(); - } else { + } + else + { handleNonTransitionExecution(current_state); } } -inline void Graph::handleTransitionExecution() { - if (nodes_to_execute_.load() > 0U) { +inline void Graph::handleTransitionExecution() +{ + if (nodes_to_execute_.load() > 0U) + { --nodes_in_flight_; - if (0U == --nodes_to_execute_) { - if (starting_) { - if (is_initial_state_transition_) { + if (0U == --nodes_to_execute_) + { + if (starting_) + { + if (is_initial_state_transition_) + { is_initial_state_transition_ = false; pgm_->setInitialStateTransitionResult(ControlClientCode::kInitialMachineStateSuccess); - // RULECHECKER_comment(1, 3, check_c_style_cast, "This is the definition provided by the OS and does a C-style cast.", true) - LM_LOG_DEBUG() - << "clock() at successful initial state transition:" - // coverity[cert_err33_c_violation:INTENTIONAL] Does not matter if clock() gives a weird value in debug messages. - << (static_cast(clock()) / (static_cast(CLOCKS_PER_SEC) / 1000.0)) << "ms"; + // RULECHECKER_comment(1, 3, check_c_style_cast, "This is the definition provided by the OS and does + // a C-style cast.", true) + LM_LOG_DEBUG() << "clock() at successful initial state transition:" + // coverity[cert_err33_c_violation:INTENTIONAL] Does not matter if clock() gives a + // weird value in debug messages. 
+ << (static_cast(clock()) / (static_cast(CLOCKS_PER_SEC) / 1000.0)) + << "ms"; } setState(GraphState::kSuccess); setPendingEvent(ControlClientCode::kSetStateSuccess); - } else { + } + else + { queueStartJobs(); } } } } -inline void Graph::handleNonTransitionExecution(GraphState current_state) { - if (0 >= --nodes_in_flight_) { - if (is_initial_state_transition_) { +inline void Graph::handleNonTransitionExecution(GraphState current_state) +{ + if (0 >= --nodes_in_flight_) + { + if (is_initial_state_transition_) + { is_initial_state_transition_ = false; pgm_->setInitialStateTransitionResult(ControlClientCode::kInitialMachineStateFailed); - // RULECHECKER_comment(1, 3, check_c_style_cast, "This is the definition provided by the OS and does a C-style cast.", true) - LM_LOG_FATAL() - << "clock() at failed initial state transition:" - // coverity[cert_err33_c_violation:INTENTIONAL] Does not matter if clock() gives a weird value in debug messages. - << (static_cast(clock()) / (static_cast(CLOCKS_PER_SEC) / 1000.0)) << "ms"; + // RULECHECKER_comment(1, 3, check_c_style_cast, "This is the definition provided by the OS and does a + // C-style cast.", true) + LM_LOG_FATAL() << "clock() at failed initial state transition:" + // coverity[cert_err33_c_violation:INTENTIONAL] Does not matter if clock() gives a weird + // value in debug messages. 
+ << (static_cast(clock()) / (static_cast(CLOCKS_PER_SEC) / 1000.0)) << "ms"; } setState(GraphState::kUndefinedState); - if (current_state == GraphState::kAborting) { + if (current_state == GraphState::kAborting) + { setPendingEvent(abort_code_); - } else { + } + else + { ControlClientChannel::nudgeControlClientHandler(); } } } -void Graph::abort(uint32_t code, ControlClientCode reason) { - if (getState() < GraphState::kAborting) { +void Graph::abort(uint32_t code, ControlClientCode reason) +{ + if (getState() < GraphState::kAborting) + { setState(GraphState::kAborting); last_execution_error_.store(code); abort_code_.store(reason); } } -void Graph::cancel() { +void Graph::cancel() +{ setState(GraphState::kCancelled); - if (getState() == GraphState::kCancelled) { + if (getState() == GraphState::kCancelled) + { setPendingEvent(ControlClientCode::kSetStateCancelled); } - if (0 == nodes_in_flight_) { - if (is_initial_state_transition_) { + if (0 == nodes_in_flight_) + { + if (is_initial_state_transition_) + { is_initial_state_transition_ = false; - pgm_->setInitialStateTransitionResult( ControlClientCode::kInitialMachineStateFailed ); + pgm_->setInitialStateTransitionResult(ControlClientCode::kInitialMachineStateFailed); // Some may argue that not finishing MachineGF.Startup state transition, is a critical problem. - // Essentially, controller SM is requesting MachineGF.Startup transition, on an action list assigned to its initial state. - // RULECHECKER_comment(1, 3, check_c_style_cast, "This is the definition provided by the OS and does a C-style cast.", true) - LM_LOG_DEBUG() - << "clock() at canceled initial state transition:" - // coverity[cert_err33_c_violation:INTENTIONAL] Does not matter if clock() gives a weird value in debug messages. - << (static_cast(clock()) / (static_cast(CLOCKS_PER_SEC) / 1000.0)) << "ms"; + // Essentially, controller SM is requesting MachineGF.Startup transition, on an action list assigned to its + // initial state. 
RULECHECKER_comment(1, 3, check_c_style_cast, "This is the definition provided by the OS + // and does a C-style cast.", true) + LM_LOG_DEBUG() << "clock() at canceled initial state transition:" + // coverity[cert_err33_c_violation:INTENTIONAL] Does not matter if clock() gives a weird + // value in debug messages. + << (static_cast(clock()) / (static_cast(CLOCKS_PER_SEC) / 1000.0)) << "ms"; } setState(GraphState::kUndefinedState); } } -void Graph::setStateManager(ControlClientID& control_client_id) { +void Graph::setStateManager(ControlClientID& control_client_id) +{ ControlClientCode code = getPendingEvent(); - if (code != ControlClientCode::kNotSet) { + if (code != ControlClientCode::kNotSet) + { cancel_message_.process_group_state_ = requested_state_; cancel_message_.originating_control_client_ = last_state_manager_; cancel_message_.request_or_response_ = code; @@ -379,67 +454,82 @@ void Graph::setStateManager(ControlClientID& control_client_id) { last_state_manager_ = control_client_id; } -std::shared_ptr Graph::getProcessInfoNode(uint32_t process_index) { +std::shared_ptr Graph::getProcessInfoNode(uint32_t process_index) +{ std::shared_ptr node{}; - if (process_index < nodes_.size()) { + if (process_index < nodes_.size()) + { node = nodes_[process_index]; } return node; } -ProcessGroupManager* Graph::getProcessGroupManager() { +ProcessGroupManager* Graph::getProcessGroupManager() +{ return pgm_; } -IdentifierHash Graph::getProcessGroupName() { +IdentifierHash Graph::getProcessGroupName() +{ return requested_state_.pg_name_; } -bool Graph::isStarting() const { +bool Graph::isStarting() const +{ return starting_; } -void Graph::markNodeInFlight() { +void Graph::markNodeInFlight() +{ ++nodes_in_flight_; } -GraphState Graph::getState() const { +GraphState Graph::getState() const +{ return state_.load(); } -IdentifierHash Graph::getProcessGroupState() { +IdentifierHash Graph::getProcessGroupState() +{ std::lock_guard lock(requested_state_mutex_); return 
requested_state_.pg_state_name_; } -uint32_t Graph::getProcessGroupIndex() { +uint32_t Graph::getProcessGroupIndex() +{ return pg_index_; } -NodeList& Graph::getNodes() { +NodeList& Graph::getNodes() +{ return nodes_; } -ControlClientID Graph::getStateManager() { +ControlClientID Graph::getStateManager() +{ return last_state_manager_; } -uint32_t Graph::getLastExecutionError() { +uint32_t Graph::getLastExecutionError() +{ return last_execution_error_.load(); } -void Graph::setLastExecutionError(uint32_t code) { +void Graph::setLastExecutionError(uint32_t code) +{ last_execution_error_.store(code); } -IdentifierHash Graph::setPendingState(IdentifierHash new_state) { +IdentifierHash Graph::setPendingState(IdentifierHash new_state) +{ IdentifierHash result_state = pending_state_; pending_state_ = new_state; - if (new_state != result_state) { + if (new_state != result_state) + { LM_LOG_DEBUG() << "Pending state for process group" << requested_state_.pg_name_ << "changed from" << result_state << "to" << pending_state_; } @@ -447,29 +537,36 @@ IdentifierHash Graph::setPendingState(IdentifierHash new_state) { return result_state; } -IdentifierHash Graph::getPendingState() { +IdentifierHash Graph::getPendingState() +{ return pending_state_; } -ControlClientCode Graph::getPendingEvent() { +ControlClientCode Graph::getPendingEvent() +{ return event_.load(); } -void Graph::clearPendingEvent(ControlClientCode expected) { +void Graph::clearPendingEvent(ControlClientCode expected) +{ (void)event_.compare_exchange_strong(expected, ControlClientCode::kNotSet); } -void Graph::setPendingEvent(ControlClientCode event) { +void Graph::setPendingEvent(ControlClientCode event) +{ event_.store(event); ControlClientChannel::nudgeControlClientHandler(); } -ControlClientMessage& Graph::getCancelMessage() { +ControlClientMessage& Graph::getCancelMessage() +{ return cancel_message_; } -const char* Graph::toString(GraphState state) { - switch (state) { +const char* Graph::toString(GraphState 
state) +{ + switch (state) + { case GraphState::kAborting: return "kAborting"; @@ -490,16 +587,18 @@ const char* Graph::toString(GraphState state) { } } -void Graph::setRequestStartTime() { +void Graph::setRequestStartTime() +{ request_start_time_ = std::chrono::steady_clock::now(); } -std::chrono::time_point Graph::getRequestStartTime() { +std::chrono::time_point Graph::getRequestStartTime() +{ return request_start_time_; } -} // namespace lcm - } // namespace internal +} // namespace lcm + } // namespace score diff --git a/src/launch_manager_daemon/src/process_group_manager/processinfonode.cpp b/src/launch_manager_daemon/src/process_group_manager/processinfonode.cpp index fab5606b..7ffa078d 100644 --- a/src/launch_manager_daemon/src/process_group_manager/processinfonode.cpp +++ b/src/launch_manager_daemon/src/process_group_manager/processinfonode.cpp @@ -11,20 +11,25 @@ * SPDX-License-Identifier: Apache-2.0 ********************************************************************************/ -#include -#include +#include "processinfonode.hpp" #include "graph.hpp" #include "processgroupmanager.hpp" -#include "processinfonode.hpp" +#include +#include -namespace score { +namespace score +{ -namespace lcm { +namespace lcm +{ -namespace internal { +namespace internal +{ -void ProcessInfoNode::initNode(Graph* graph, uint32_t index) { - if (graph) { +void ProcessInfoNode::initNode(Graph* graph, uint32_t index) +{ + if (graph) + { IdentifierHash pg = graph->getProcessGroupName(); graph_ = graph; process_index_ = index; @@ -38,28 +43,36 @@ void ProcessInfoNode::initNode(Graph* graph, uint32_t index) { start_dependencies_ = 0U; auto cfg_mgr = graph_->getProcessGroupManager()->getConfigurationManager(); config_ = cfg_mgr->getOsProcessConfiguration(pg, index).value_or(nullptr); - if (config_) { - if (osal::CommsType::kLaunchManager == config_->startup_config_.comms_type_) { + if (config_) + { + if (osal::CommsType::kLaunchManager == config_->startup_config_.comms_type_) + { 
graph_->getProcessGroupManager()->setLaunchManagerConfiguration(config_); } dependency_list_ = cfg_mgr->getOsProcessDependencies(pg, process_index_).value_or(nullptr); - if (dependency_list_) { + if (dependency_list_) + { start_dependencies_ = static_cast(dependency_list_->size() & 0xFFFFFFFFUL); LM_LOG_DEBUG() << "Process" << process_index_ << "has" << start_dependencies_ << "start dependencies"; } - } else { + } + else + { LM_LOG_ERROR() << "No configuration for process" << process_index_ << "of process group" << pg; } } } -bool ProcessInfoNode::constructGraphNode(bool starting) { +bool ProcessInfoNode::constructGraphNode(bool starting) +{ bool included = false; - if (!starting) { + if (!starting) + { std::ptrdiff_t count = - std::count_if(dependent_on_running_.begin(), dependent_on_running_.end(), - [](auto& d) -> bool { return d->process_state_ == score::lcm::ProcessState::kRunning; }); + std::count_if(dependent_on_running_.begin(), dependent_on_running_.end(), [](auto& d) -> bool { + return d->process_state_ == score::lcm::ProcessState::kRunning; + }); if (count > 0L) stop_dependencies_ = static_cast(count & 0xFFFFFFFFL); @@ -71,9 +84,12 @@ bool ProcessInfoNode::constructGraphNode(bool starting) { dependencies_ = stop_dependencies_; // A stop node should be inserted for processes not in the idle or terminated state where: // The process is not listed in the requested state - included = - !((getState() == score::lcm::ProcessState::kIdle) || (getState() == score::lcm::ProcessState::kTerminated)) && !in_requested_state_; - } else { + included = !((getState() == score::lcm::ProcessState::kIdle) || + (getState() == score::lcm::ProcessState::kTerminated)) && + !in_requested_state_; + } + else + { LM_LOG_DEBUG() << "Start Dependencies:" << start_dependencies_; dependencies_ = start_dependencies_; // The process should be started (node inserted) if @@ -83,9 +99,12 @@ bool ProcessInfoNode::constructGraphNode(bool starting) { // Go through the predecessors nodes to 
check if those are already in the ExecutionState // that has been configured as part of the execution dependency - if (dependency_list_) { - for (const auto& dep : *dependency_list_) { - if (dep.process_state_ == graph_->getNodes()[dep.os_process_index_]->getState()) { + if (dependency_list_) + { + for (const auto& dep : *dependency_list_) + { + if (dep.process_state_ == graph_->getNodes()[dep.os_process_index_]->getState()) + { --dependencies_; } } @@ -97,36 +116,50 @@ bool ProcessInfoNode::constructGraphNode(bool starting) { return included; } -void ProcessInfoNode::addSuccessorNode(std::shared_ptr& successor_node, score::lcm::ProcessState dependency) { - if (dependency == score::lcm::ProcessState::kTerminated) { +void ProcessInfoNode::addSuccessorNode(std::shared_ptr& successor_node, + score::lcm::ProcessState dependency) +{ + if (dependency == score::lcm::ProcessState::kTerminated) + { LM_LOG_DEBUG() << "Adding kTerminated for process" << process_index_ << ":" << successor_node->process_index_; dependent_on_terminating_.push_back(successor_node); - } else if (dependency == score::lcm::ProcessState::kRunning) { + } + else if (dependency == score::lcm::ProcessState::kRunning) + { dependent_on_running_.push_back(successor_node); LM_LOG_DEBUG() << "Adding kRunning successor for process" << process_index_ << ":" << successor_node->process_index_; - } else { + } + else + { // all other dependency types are forbidden! 
LM_LOG_ERROR() << "Invalid dependency type for process" << process_index_ << ":" << static_cast(dependency); } } -bool ProcessInfoNode::setState(score::lcm::ProcessState new_state) { +bool ProcessInfoNode::setState(score::lcm::ProcessState new_state) +{ bool success = true; score::lcm::ProcessState old_state = getState(); if (score::lcm::ProcessState::kTerminated == new_state || - (new_state == score::lcm::ProcessState::kIdle && old_state == score::lcm::ProcessState::kTerminated)) { + (new_state == score::lcm::ProcessState::kIdle && old_state == score::lcm::ProcessState::kTerminated)) + { process_state_.store(new_state); - } else if (new_state >= old_state) { + } + else if (new_state >= old_state) + { success = process_state_.compare_exchange_strong(old_state, new_state); - } else { + } + else + { success = false; } if (success && config_->startup_config_.comms_type_ != osal::CommsType::kNoComms && - score::lcm::ProcessState::kIdle != new_state) { + score::lcm::ProcessState::kIdle != new_state) + { // for a reporting process, report a process state change to PHM score::lcm::PosixProcess process_info; process_info.id = config_->process_id_; @@ -138,21 +171,24 @@ bool ProcessInfoNode::setState(score::lcm::ProcessState new_state) { // b) &p.systemClockTimeStamp points outside the accessible address space, but it does not static_cast(clock_gettime(CLOCK_MONOTONIC, &process_info.systemClockTimestamp)); // Note that we ignore the return value from QueuePosixProcess. - // An error would indicate that PHM is not reading values fast enough from the shared memory; the buffer over-run - // should be visible at the PHM side and handled there. If PHM is not responding do we need to handle this? - // If PHM terminates state manager will be informed in any case. + // An error would indicate that PHM is not reading values fast enough from the shared memory; the buffer + // over-run should be visible at the PHM side and handled there. 
If PHM is not responding do we need to handle + // this? If PHM terminates state manager will be informed in any case. static_cast(graph_->getProcessGroupManager()->queuePosixProcess(process_info)); } return success; } -void ProcessInfoNode::queueTerminationSuccessorJobs() { +void ProcessInfoNode::queueTerminationSuccessorJobs() +{ auto processJob = [&](std::shared_ptr successor_node) { - if (successor_node->is_included_ && successor_node->dependencies_ > 0U && - --successor_node->dependencies_ == 0U) { - while (graph_->getState() == GraphState::kInTransition) { - if(graph_->getProcessGroupManager()->getWorkerJobs()->push(successor_node, kMaxQueueDelay)){ + if (successor_node->is_included_ && successor_node->dependencies_ > 0U && --successor_node->dependencies_ == 0U) + { + while (graph_->getState() == GraphState::kInTransition) + { + if (graph_->getProcessGroupManager()->getWorkerJobs()->push(successor_node, kMaxQueueDelay)) + { graph_->markNodeInFlight(); break; } @@ -160,91 +196,114 @@ void ProcessInfoNode::queueTerminationSuccessorJobs() { } }; - if (graph_->isStarting()) { - for (auto& successor_node : dependent_on_terminating_) { + if (graph_->isStarting()) + { + for (auto& successor_node : dependent_on_terminating_) + { processJob(successor_node); } - } else if (dependency_list_) // Successors given by our dependencies + } + else if (dependency_list_) // Successors given by our dependencies { - for (const auto& dependency : *dependency_list_) { + for (const auto& dependency : *dependency_list_) + { auto successorNode = graph_->getProcessInfoNode(dependency.os_process_index_); - if (successorNode->getState() != score::lcm::ProcessState::kTerminated) { + if (successorNode->getState() != score::lcm::ProcessState::kTerminated) + { processJob(successorNode); } } } } -void ProcessInfoNode::unexpectedTermination() { +void ProcessInfoNode::unexpectedTermination() +{ LM_LOG_WARN() << "unexpected termination of process" << process_index_ << "pid" << pid_ << "(" << 
config_->startup_config_.short_name_ << ")"; uint32_t execution_error_code = config_->pgm_config_.execution_error_code_; auto graph_state = graph_->getState(); - if (GraphState::kSuccess == graph_state) { + if (GraphState::kSuccess == graph_state) + { // We were in a defined state, this error needs to be reported to SM graph_->abort(execution_error_code, ControlClientCode::kFailedUnexpectedTermination); - } else if (score::lcm::ProcessState::kStarting == getState()) { + } + else if (score::lcm::ProcessState::kStarting == getState()) + { // for graph in any other state, the error will be found elsewhere. But if the graph is in // transition, and the process status is not yet kRunning, we may want to post on the semaphore // to save a little waiting time. auto sync = sync_; // take a copy as the pointer otherwise may become invalidated - if (sync) { + if (sync) + { // note that we ignore the return code. The semaphore operation may fail because it could // be destroyed by another thread static_cast(sync->send_sync_.post()); } - } else if (GraphState::kInTransition == graph_state) { + } + else if (GraphState::kInTransition == graph_state) + { // process has started, but graph is still in transition graph_->abort(execution_error_code, ControlClientCode::kFailedUnexpectedTerminationOnEnter); } } -void ProcessInfoNode::terminated(int32_t process_status) { - LM_LOG_DEBUG() << "Child process" << process_index_ << "of" << graph_->getProcessGroupName() << "pid" - << pid_ << "(" << config_->startup_config_.short_name_ << ") for node" << this - << "terminated with status" << process_status; +void ProcessInfoNode::terminated(int32_t process_status) +{ + LM_LOG_DEBUG() << "Child process" << process_index_ << "of" << graph_->getProcessGroupName() << "pid" << pid_ << "(" + << config_->startup_config_.short_name_ << ") for node" << this << "terminated with status" + << process_status; status_ = process_status; - if (!config_->pgm_config_.is_self_terminating_ || (process_status != 
0)) { + if (!config_->pgm_config_.is_self_terminating_ || (process_status != 0)) + { // fudge the status if the process is not self-terminating but has still exited // with zero status: - if (0 == status_) { + if (0 == status_) + { status_ = -1; } - if (graph_->isStarting()) { + if (graph_->isStarting()) + { unexpectedTermination(); } } static_cast(setState(score::lcm::ProcessState::kTerminated)); // Cannot fail by design - if (control_client_channel_) { + if (control_client_channel_) + { control_client_channel_->releaseParentMapping(); control_client_channel_.reset(); } // Handle the situation where the graph is stalled waiting for a process to terminate - if (config_->pgm_config_.is_self_terminating_ && dependent_on_terminating_.size()) { + if (config_->pgm_config_.is_self_terminating_ && dependent_on_terminating_.size()) + { queueTerminationSuccessorJobs(); } // handle the situation where a worker thread is waiting for a process to terminate - if (has_semaphore_.exchange(false)) { + if (has_semaphore_.exchange(false)) + { terminator_.post(); } } -void ProcessInfoNode::startProcess() { +void ProcessInfoNode::startProcess() +{ LM_LOG_DEBUG() << "Starting process" << process_index_ << "(" << config_->startup_config_.short_name_ << ") from executable" << config_->startup_config_.executable_path_; restart_counter_ = config_->pgm_config_.number_of_restart_attempts; - do { + do + { status_ = 0; - if (setState(score::lcm::ProcessState::kIdle)) { + if (setState(score::lcm::ProcessState::kIdle)) + { uint32_t execution_error_code = config_->pgm_config_.execution_error_code_; auto pg_mgr = graph_->getProcessGroupManager(); pid_ = 0; status_ = 0; static_cast(setState(score::lcm::ProcessState::kStarting)); // Cannot fail by design - if (osal::CommsType::kLaunchManager == config_->startup_config_.comms_type_) { + if (osal::CommsType::kLaunchManager == config_->startup_config_.comms_type_) + { // Don't start launch manager, we're already running LM_LOG_DEBUG() << "Found myself 
(" << config_->startup_config_.argv_[0U] << ") in a process group to start, not starting, reporting kRunning"; @@ -255,15 +314,19 @@ void ProcessInfoNode::startProcess() { } if (osal::OsalReturnType::kSuccess == - pg_mgr->getProcessInterface()->startProcess(&pid_, &sync_, &config_->startup_config_)) { + pg_mgr->getProcessInterface()->startProcess(&pid_, &sync_, &config_->startup_config_)) + { LM_LOG_DEBUG() << "startProcess pid" << pid_ << "received for process:" << config_->startup_config_.short_name_; - if (osal::CommsType::kControlClient == config_->startup_config_.comms_type_) { + if (osal::CommsType::kControlClient == config_->startup_config_.comms_type_) + { setupControlClientChannel(); } handleProcessStarted(execution_error_code); - } else { + } + else + { setState(score::lcm::ProcessState::kTerminated); graph_->abort(execution_error_code, ControlClientCode::kSetStateFailed); } @@ -274,61 +337,76 @@ void ProcessInfoNode::startProcess() { << config_->startup_config_.short_name_ << ") done"; } -inline void ProcessInfoNode::setupControlClientChannel() { +inline void ProcessInfoNode::setupControlClientChannel() +{ // Make sure we store the control_client_channel before waiting for kRunning - std::atomic_store(&control_client_channel_,ControlClientChannel::getControlClientChannel(sync_)); + std::atomic_store(&control_client_channel_, ControlClientChannel::getControlClientChannel(sync_)); - if (control_client_channel_) { // Put it at the front of the list if it's not there already + if (control_client_channel_) + { // Put it at the front of the list if it's not there already auto node0 = graph_->getNodes()[0U]; - if (this != node0.get()) { + if (this != node0.get()) + { std::atomic_store(&next_state_manager_, node0->next_state_manager_); std::atomic_store(&node0->next_state_manager_, graph_->getNodes()[process_index_]); } } } -void ProcessInfoNode::handleProcessStillStarting(uint32_t execution_error_code) { - if (graph_->getState() == GraphState::kInTransition) { 
+void ProcessInfoNode::handleProcessStillStarting(uint32_t execution_error_code) +{ + if (graph_->getState() == GraphState::kInTransition) + { if (((osal::CommsType::kNoComms == config_->startup_config_.comms_type_) || (graph_->getProcessGroupManager()->getProcessInterface()->waitForkRunning( sync_, config_->pgm_config_.startup_timeout_ms_) == osal::OsalReturnType::kSuccess)) && - (0 == status_)) { + (0 == status_)) + { handleProcessRunning(execution_error_code); - } else // process is reporting, result is kFail or status is not zero (indicating the process has exited badly) + } + else // process is reporting, result is kFail or status is not zero (indicating the process has exited badly) { LM_LOG_WARN() << "Got kRunning timeout for process" << process_index_ << "(" << config_->startup_config_.short_name_ << ")"; ControlClientCode errcode = status_ ? ControlClientCode::kFailedUnexpectedTerminationOnEnter : ControlClientCode::kSetStateFailed; terminateProcess(); - if (0U == restart_counter_) { + if (0U == restart_counter_) + { graph_->abort(execution_error_code, errcode); } } } } -void ProcessInfoNode::handleProcessAlreadyTerminated(uint32_t execution_error_code) { - if ((0 != status_) || (osal::CommsType::kNoComms != config_->startup_config_.comms_type_)) { +void ProcessInfoNode::handleProcessAlreadyTerminated(uint32_t execution_error_code) +{ + if ((0 != status_) || (osal::CommsType::kNoComms != config_->startup_config_.comms_type_)) + { // Error. 
To get a legal terminated before kRunning the process must be self-terminating, non-reporting // and to have exited with zero status LM_LOG_WARN() << "Got process termination before kRunning for pid" << pid_ << "(" << config_->startup_config_.short_name_ << ") process" << process_index_ << "of group" << graph_->getProcessGroupName(); - //This will cause the graph to fail, we must report to SM (unless we have restart attempts left) - if (0U == restart_counter_) { + // This will cause the graph to fail, we must report to SM (unless we have restart attempts left) + if (0U == restart_counter_) + { graph_->abort(execution_error_code, ControlClientCode::kFailedUnexpectedTerminationOnEnter); } - } else { + } + else + { // case of a self-terminating, non-reporting process exiting nicely before we've had a chance to put an // entry in the map queueTerminationSuccessorJobs(); } } -void ProcessInfoNode::handleProcessStarted(uint32_t execution_error_code) { - switch (graph_->getProcessGroupManager()->getProcessMap()->insertIfNotTerminated(pid_, this)) { +void ProcessInfoNode::handleProcessStarted(uint32_t execution_error_code) +{ + switch (graph_->getProcessGroupManager()->getProcessMap()->insertIfNotTerminated(pid_, this)) + { case 0: // Normal case, entry was put in the map, process still running handleProcessStillStarting(execution_error_code); break; @@ -345,12 +423,16 @@ void ProcessInfoNode::handleProcessStarted(uint32_t execution_error_code) { } } -void ProcessInfoNode::handleProcessRunning(uint32_t execution_error_code) { - if (osal::CommsType::kNoComms == config_->startup_config_.comms_type_) { +void ProcessInfoNode::handleProcessRunning(uint32_t execution_error_code) +{ + if (osal::CommsType::kNoComms == config_->startup_config_.comms_type_) + { LM_LOG_DEBUG() << "Considered kRunning for Non Reporting Process pid" << pid_ << "(" << config_->startup_config_.short_name_ << ") process" << process_index_ << "of group" << graph_->getProcessGroupName(); - } else { + } + 
else + { LM_LOG_DEBUG() << "Got kRunning for pid" << pid_ << "(" << config_->startup_config_.short_name_ << ") process" << process_index_ << "of group" << graph_->getProcessGroupName(); } @@ -359,9 +441,12 @@ void ProcessInfoNode::handleProcessRunning(uint32_t execution_error_code) { // Therefore, a process in the terminated state is a new error not related to process // starting (and so not eligible for a restart), or it's OK because its a self- // terminating process. - if (setState(score::lcm::ProcessState::kRunning) || (config_->pgm_config_.is_self_terminating_ && (0 == status_))) { + if (setState(score::lcm::ProcessState::kRunning) || (config_->pgm_config_.is_self_terminating_ && (0 == status_))) + { processSuccessorNodes(); - } else if (restart_counter_ == 0U) { + } + else if (restart_counter_ == 0U) + { graph_->abort(execution_error_code, ControlClientCode::kSetStateFailed); } @@ -369,60 +454,81 @@ void ProcessInfoNode::handleProcessRunning(uint32_t execution_error_code) { // OSHandler thread, when terminated() is called } -void ProcessInfoNode::processSuccessorNodes() { - for (auto& successor_node : dependent_on_running_) { - if (successor_node->is_included_ && successor_node->dependencies_ > 0U) { +void ProcessInfoNode::processSuccessorNodes() +{ + for (auto& successor_node : dependent_on_running_) + { + if (successor_node->is_included_ && successor_node->dependencies_ > 0U) + { checkForEmptyDependencies(successor_node); } } } -void ProcessInfoNode::checkForEmptyDependencies(std::shared_ptr& successor_node) { - if (0U == --successor_node->dependencies_) { - while (graph_->getState() == GraphState::kInTransition) { - if(graph_->getProcessGroupManager()->getWorkerJobs()->push(successor_node, kMaxQueueDelay)){ +void ProcessInfoNode::checkForEmptyDependencies(std::shared_ptr& successor_node) +{ + if (0U == --successor_node->dependencies_) + { + while (graph_->getState() == GraphState::kInTransition) + { + auto push_res = 
graph_->getProcessGroupManager()->getWorkerJobs()->push(successor_node, kMaxQueueDelay); + if (push_res) + { graph_->markNodeInFlight(); break; } + else + { + break; + } } } } -void ProcessInfoNode::terminateProcess() { +void ProcessInfoNode::terminateProcess() +{ LM_LOG_DEBUG() << "terminating process" << process_index_ << "(" << config_->startup_config_.short_name_ << ")"; - if (setState(score::lcm::ProcessState::kTerminating)) { - if (osal::CommsType::kLaunchManager == config_->startup_config_.comms_type_) { + if (setState(score::lcm::ProcessState::kTerminating)) + { + if (osal::CommsType::kLaunchManager == config_->startup_config_.comms_type_) + { LM_LOG_DEBUG() << "Found myself (" << config_->startup_config_.argv_[0U] << ") in a process group to terminate, not terminating, reporting kTerminated"; static_cast(setState(score::lcm::ProcessState::kTerminated)); // Cannot fail by design - } else { + } + else + { handleTerminationProcess(); } } - if (!graph_->isStarting() || (0 == status_)) { + if (!graph_->isStarting() || (0 == status_)) + { queueTerminationSuccessorJobs(); } - LM_LOG_DEBUG() << "terminateProcess for" << graph_->getProcessGroupName() << "process" << process_index_ - << "(" << config_->startup_config_.short_name_ << ") done"; + LM_LOG_DEBUG() << "terminateProcess for" << graph_->getProcessGroupName() << "process" << process_index_ << "(" + << config_->startup_config_.short_name_ << ") done"; } -inline void ProcessInfoNode::handleTerminationProcess() { +inline void ProcessInfoNode::handleTerminationProcess() +{ auto pg_mgr = graph_->getProcessGroupManager(); terminator_.init(0U, false); has_semaphore_.store(true); - LM_LOG_DEBUG() << "Requesting termination of process" << process_index_ << "of" - << graph_->getProcessGroupName() << "pid" << pid_ << "(" - << config_->startup_config_.short_name_ << ")"; + LM_LOG_DEBUG() << "Requesting termination of process" << process_index_ << "of" << graph_->getProcessGroupName() + << "pid" << pid_ << "(" << 
config_->startup_config_.short_name_ << ")"; - //handle request termination + // handle request termination if ((pg_mgr->getProcessInterface()->requestTermination(pid_) == osal::OsalReturnType::kFail) || - (terminator_.timedWait(config_->pgm_config_.termination_timeout_ms_) == osal::OsalReturnType::kSuccess)) { + (terminator_.timedWait(config_->pgm_config_.termination_timeout_ms_) == osal::OsalReturnType::kSuccess)) + { LM_LOG_DEBUG() << "Queuing jobs after regular termination of process wait" << process_index_ << "(" << config_->startup_config_.short_name_ << ")"; - } else { - //handle forced termination + } + else + { + // handle forced termination handleForcedTermination(); } @@ -430,65 +536,80 @@ inline void ProcessInfoNode::handleTerminationProcess() { terminator_.deinit(); } -inline void ProcessInfoNode::handleForcedTermination() { +inline void ProcessInfoNode::handleForcedTermination() +{ LM_LOG_WARN() << "Process" << process_index_ << "(" << config_->startup_config_.short_name_ << ") did not respond to SIGTERM, sending SIGKILL"; while ((osal::OsalReturnType::kSuccess == graph_->getProcessGroupManager()->getProcessInterface()->forceTermination(pid_)) && (graph_->getState() == GraphState::kInTransition) && - (terminator_.timedWait(score::lcm::internal::kMaxSigKillDelay) != osal::OsalReturnType::kSuccess)) { + (terminator_.timedWait(score::lcm::internal::kMaxSigKillDelay) != osal::OsalReturnType::kSuccess)) + { LM_LOG_FATAL() << "Process" << process_index_ << "(" << config_->startup_config_.short_name_ << ") did not respond to SIGKILL!!"; } } -void ProcessInfoNode::doWork() { - if (graph_->getState() == GraphState::kInTransition) { - if (graph_->isStarting()) { +void ProcessInfoNode::doWork() +{ + if (graph_->getState() == GraphState::kInTransition) + { + if (graph_->isStarting()) + { startProcess(); - } else { + } + else + { terminateProcess(); } } graph_->nodeExecuted(); } -std::shared_ptr ProcessInfoNode::getNextStateManager() { +std::shared_ptr 
ProcessInfoNode::getNextStateManager() +{ // Remove dead state managers from the list on the fly - while (std::atomic_load(&next_state_manager_) && !std::atomic_load(&next_state_manager_)->control_client_channel_) { + while (std::atomic_load(&next_state_manager_) && !std::atomic_load(&next_state_manager_)->control_client_channel_) + { std::atomic_store(&next_state_manager_, next_state_manager_->next_state_manager_); } return std::atomic_load(&next_state_manager_); } -osal::ProcessID ProcessInfoNode::getPid() const { +osal::ProcessID ProcessInfoNode::getPid() const +{ return pid_; } -score::lcm::ProcessState ProcessInfoNode::getState() const { +score::lcm::ProcessState ProcessInfoNode::getState() const +{ return process_state_.load(); } -void ProcessInfoNode::markRequested(bool requested) { +void ProcessInfoNode::markRequested(bool requested) +{ in_requested_state_ = requested; } -bool ProcessInfoNode::isHeadNode() const { +bool ProcessInfoNode::isHeadNode() const +{ return is_head_node_; } -uint32_t ProcessInfoNode::getNodeIndex() const { +uint32_t ProcessInfoNode::getNodeIndex() const +{ return process_index_; } -ControlClientChannelP ProcessInfoNode::getControlClientChannel() { +ControlClientChannelP ProcessInfoNode::getControlClientChannel() +{ return std::atomic_load(&control_client_channel_); } -} // namespace lcm - } // namespace internal +} // namespace lcm + } // namespace score diff --git a/src/launch_manager_daemon/src/process_group_manager/workerthread.cpp b/src/launch_manager_daemon/src/process_group_manager/workerthread.cpp index 760cc667..ea435ee6 100644 --- a/src/launch_manager_daemon/src/process_group_manager/workerthread.cpp +++ b/src/launch_manager_daemon/src/process_group_manager/workerthread.cpp @@ -11,57 +11,80 @@ * SPDX-License-Identifier: Apache-2.0 ********************************************************************************/ -#include "processinfonode.hpp" #include "workerthread.hpp" +#include "processinfonode.hpp" -namespace score { 
+namespace score +{ -namespace lcm { +namespace lcm +{ -namespace internal { +namespace internal +{ template -WorkerThread::WorkerThread(std::shared_ptr queue, uint32_t num_threads) - : the_job_queue_(queue) { +WorkerThread::WorkerThread(std::shared_ptr queue, uint32_t num_threads) : the_job_queue_(queue) +{ worker_threads_.reserve(num_threads); - for (uint32_t i = 0U; i < num_threads; ++i) { + for (uint32_t i = 0U; i < num_threads; ++i) + { static_cast(i); worker_threads_.emplace_back(std::make_unique(&WorkerThread::run, this)); } } template -WorkerThread::~WorkerThread() { +WorkerThread::~WorkerThread() +{ stop(); - for (auto& thread : worker_threads_) { - if (thread->joinable()) { + for (auto& thread : worker_threads_) + { + if (thread->joinable()) + { thread->join(); } } } template -void WorkerThread::stop() { - the_job_queue_->stop(); +void WorkerThread::stop() +{ + static_cast(the_job_queue_->stop()); } template -void WorkerThread::run() { - while (auto job = the_job_queue_->pop()) { - if(*job) +void WorkerThread::run() +{ + while (auto job = the_job_queue_->pop(kMaxQueueDelay)) + { + if (job) { (*job)->doWork(); } + else if (job.error() == ConcurrencyErrc::kStopped) + { + break; + } + else if(job.error() == ConcurrencyErrc::kTimeout) + { + continue; + } + else + { + LM_LOG_ERROR() << "Got an error getting a job: " << job.error(); + continue; + } } } // Explicit instantiation for ProcessInfoNode template class WorkerThread; -} // namespace lcm - } // namespace internal +} // namespace lcm + } // namespace score From 55d687c8bac9ff9cade7320560ddfd670a359919 Mon Sep 17 00:00:00 2001 From: Maciej Kaszynski Date: Wed, 29 Apr 2026 16:06:18 +0100 Subject: [PATCH 08/13] Statically linking libatomic --- src/launch_manager_daemon/src/concurrency/BUILD | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/launch_manager_daemon/src/concurrency/BUILD b/src/launch_manager_daemon/src/concurrency/BUILD index 4edeeca7..66c71f06 100644 --- 
a/src/launch_manager_daemon/src/concurrency/BUILD +++ b/src/launch_manager_daemon/src/concurrency/BUILD @@ -22,8 +22,12 @@ cc_library( cc_library( name = "mpmc_concurrent_queue", - hdrs = ["mpmc_concurrent_queue.hpp", - "concurrency_error_domain.hpp" + hdrs = [ + "concurrency_error_domain.hpp", + "mpmc_concurrent_queue.hpp", + ], + linkopts = [ + "-l:libatomic.a", ], strip_include_prefix = "//src/launch_manager_daemon/src", visibility = ["//src:__subpackages__"], From c3bcb7178846fe0bd50aaed9294137c69c28c298 Mon Sep 17 00:00:00 2001 From: Maciej Kaszynski Date: Thu, 30 Apr 2026 08:35:13 +0100 Subject: [PATCH 09/13] Adding some more coverage --- .../src/concurrency/BUILD | 10 +++++ .../concurrency/concurrency_error_domain.hpp | 8 +++- .../concurrency_error_domain_test.cpp | 42 +++++++++++++++++++ .../process_group_manager/workerthread.cpp | 6 +-- 4 files changed, 59 insertions(+), 7 deletions(-) create mode 100644 src/launch_manager_daemon/src/concurrency/concurrency_error_domain_test.cpp diff --git a/src/launch_manager_daemon/src/concurrency/BUILD b/src/launch_manager_daemon/src/concurrency/BUILD index 66c71f06..5dd9da3c 100644 --- a/src/launch_manager_daemon/src/concurrency/BUILD +++ b/src/launch_manager_daemon/src/concurrency/BUILD @@ -38,6 +38,16 @@ cc_library( ], ) +cc_test( + name = "concurrency_error_domain_test", + srcs = ["concurrency_error_domain_test.cpp"], + visibility = ["//tests:__subpackages__"], + deps = [ + ":mpmc_concurrent_queue", + "@googletest//:gtest_main", + ], +) + cc_test( name = "mpmc_concurrent_queue_test", srcs = ["mpmc_concurrent_queue_test.cpp"], diff --git a/src/launch_manager_daemon/src/concurrency/concurrency_error_domain.hpp b/src/launch_manager_daemon/src/concurrency/concurrency_error_domain.hpp index b5d8dfab..37d4c2ad 100644 --- a/src/launch_manager_daemon/src/concurrency/concurrency_error_domain.hpp +++ b/src/launch_manager_daemon/src/concurrency/concurrency_error_domain.hpp @@ -29,9 +29,10 @@ enum class ConcurrencyErrc : 
score::result::ErrorCode // @brief The container has overflowed. kOverflow = 2, - // @brief The container has overflowed. + // @brief The container has stopped. kStopped = 3, + // @brief A timeout was triggered. kTimeout = 4, }; @@ -42,7 +43,7 @@ class ConcurrencyErrorDomain final : public score::result::ErrorDomain switch (static_cast(code)) { case ConcurrencyErrc::kOsError: - return "Failed to initialize semaphore"; + return "An OS call returned an error"; case ConcurrencyErrc::kOverflow: return "The container has overflowed"; @@ -50,6 +51,9 @@ class ConcurrencyErrorDomain final : public score::result::ErrorDomain case ConcurrencyErrc::kStopped: return "The container has been stopped"; + case ConcurrencyErrc::kTimeout: + return "A timer was triggered"; + default: return "Unknown concurrency error"; } diff --git a/src/launch_manager_daemon/src/concurrency/concurrency_error_domain_test.cpp b/src/launch_manager_daemon/src/concurrency/concurrency_error_domain_test.cpp new file mode 100644 index 00000000..d2fd1b79 --- /dev/null +++ b/src/launch_manager_daemon/src/concurrency/concurrency_error_domain_test.cpp @@ -0,0 +1,42 @@ +/******************************************************************************** + * Copyright (c) 2026 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. 
+ * + * This program and the accompanying materials are made available under the + * terms of the Apache License Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +#include + +#include + +using namespace score::lcm::internal; + +class ConcurrencyErrorDomainMessageForTest : public ::testing::TestWithParam +{ +}; + +TEST_P(ConcurrencyErrorDomainMessageForTest, CoversAllBranches) +{ + const score::result::ErrorDomain& domain = g_ConcurrencyErrorDomain; + static_cast(domain.MessageFor(GetParam())); +} + +INSTANTIATE_TEST_SUITE_P(AllCodes, + ConcurrencyErrorDomainMessageForTest, + ::testing::Values(static_cast(ConcurrencyErrc::kOsError), + static_cast(ConcurrencyErrc::kOverflow), + static_cast(ConcurrencyErrc::kStopped), + static_cast(ConcurrencyErrc::kTimeout), + 99)); // default branch + +TEST(ConcurrencyErrorDomainTest, MakeError) +{ + static_cast(MakeError(ConcurrencyErrc::kOsError)); + static_cast(MakeError(ConcurrencyErrc::kOsError, "detail")); +} diff --git a/src/launch_manager_daemon/src/process_group_manager/workerthread.cpp b/src/launch_manager_daemon/src/process_group_manager/workerthread.cpp index ea435ee6..fcbfd9d2 100644 --- a/src/launch_manager_daemon/src/process_group_manager/workerthread.cpp +++ b/src/launch_manager_daemon/src/process_group_manager/workerthread.cpp @@ -58,7 +58,7 @@ void WorkerThread::stop() template void WorkerThread::run() { - while (auto job = the_job_queue_->pop(kMaxQueueDelay)) + while (auto job = the_job_queue_->pop()) { if (job) { @@ -68,10 +68,6 @@ void WorkerThread::run() { break; } - else if(job.error() == ConcurrencyErrc::kTimeout) - { - continue; - } else { LM_LOG_ERROR() << "Got an error getting a job: " << job.error(); From b5b78c6c7426b81d41eb7abc229084f182af0877 Mon Sep 17 00:00:00 2001 From: Maciej Kaszynski Date: Thu, 30 Apr 2026 10:07:09 +0100 Subject: [PATCH 
10/13] Fixing review comments --- .../src/concurrency/mpmc_concurrent_queue.hpp | 12 +++++++----- .../process_group_manager/processinfonode.cpp | 4 ++++ .../src/process_group_manager/workerthread.cpp | 18 ++++++++---------- 3 files changed, 19 insertions(+), 15 deletions(-) diff --git a/src/launch_manager_daemon/src/concurrency/mpmc_concurrent_queue.hpp b/src/launch_manager_daemon/src/concurrency/mpmc_concurrent_queue.hpp index 204b62f6..0f8f01d1 100644 --- a/src/launch_manager_daemon/src/concurrency/mpmc_concurrent_queue.hpp +++ b/src/launch_manager_daemon/src/concurrency/mpmc_concurrent_queue.hpp @@ -53,6 +53,9 @@ class MPMCConcurrentQueue static_assert(std::is_nothrow_move_constructible_v, "T must be nothrow-move-constructible to wrap into std::optional in pop()"); + static_assert(std::is_move_assignable_v || std::is_copy_assignable_v, + "T must be move-assignable or copy-assignable to be stored via push()"); + // optimization to work out the turns static_assert((Capacity & (Capacity - 1U)) == 0U, "Capacity must be a power of 2"); @@ -119,8 +122,7 @@ class MPMCConcurrentQueue /// The turn counter ensures a slot cannot be written until the /// previous consumer has finished reading it. /// @param timeout Maximum time to wait for a free slot. Zero means wait forever. - /// @return true if the item was pushed, false if stop() was called or the - /// timeout expired before a slot became available (item is not enqueued). + /// @return Success if item was pushed, Error otherwise. [[nodiscard]] score::Result push(const T& item, std::chrono::milliseconds timeout = std::chrono::milliseconds{0}) { @@ -130,7 +132,7 @@ class MPMCConcurrentQueue /// @brief Signals all blocked pop() callers to return with a stopped error. 
[[nodiscard]] score::Result stop() noexcept { - m_stopped.store(true, std::memory_order_relaxed); + m_stopped.store(true, std::memory_order_release); // signal to consumers and publishers to wakeup if (m_items.post() != osal::OsalReturnType::kSuccess) @@ -166,7 +168,7 @@ class MPMCConcurrentQueue return score::MakeUnexpected(ConcurrencyErrc::kOsError); } - if (m_stopped.load(std::memory_order_relaxed)) + if (m_stopped.load(std::memory_order_acquire)) { static_cast(m_items.post()); return score::MakeUnexpected(ConcurrencyErrc::kStopped); @@ -225,7 +227,7 @@ class MPMCConcurrentQueue return score::MakeUnexpected(ConcurrencyErrc::kOsError); } - if (m_stopped.load(std::memory_order_relaxed)) + if (m_stopped.load(std::memory_order_acquire)) { // chain-wake the next blocked producer then discard the item static_cast(m_spaces.post()); diff --git a/src/launch_manager_daemon/src/process_group_manager/processinfonode.cpp b/src/launch_manager_daemon/src/process_group_manager/processinfonode.cpp index 7ffa078d..ffd9628b 100644 --- a/src/launch_manager_daemon/src/process_group_manager/processinfonode.cpp +++ b/src/launch_manager_daemon/src/process_group_manager/processinfonode.cpp @@ -477,6 +477,10 @@ void ProcessInfoNode::checkForEmptyDependencies(std::shared_ptr graph_->markNodeInFlight(); break; } + else if (push_res.error() == ConcurrencyErrc::kTimeout) + { + continue; + } else { break; diff --git a/src/launch_manager_daemon/src/process_group_manager/workerthread.cpp b/src/launch_manager_daemon/src/process_group_manager/workerthread.cpp index fcbfd9d2..5937647c 100644 --- a/src/launch_manager_daemon/src/process_group_manager/workerthread.cpp +++ b/src/launch_manager_daemon/src/process_group_manager/workerthread.cpp @@ -58,21 +58,19 @@ void WorkerThread::stop() template void WorkerThread::run() { - while (auto job = the_job_queue_->pop()) + while (true) { - if (job) - { - (*job)->doWork(); - } - else if (job.error() == ConcurrencyErrc::kStopped) - { - break; - } - else + 
auto job = the_job_queue_->pop(); + if (!job) { + if (job.error() == ConcurrencyErrc::kStopped) + { + break; + } LM_LOG_ERROR() << "Got an error getting a job: " << job.error(); continue; } + (*job)->doWork(); } } From c2ee6d4efb44e1b1a67f694471b93f532dde99e4 Mon Sep 17 00:00:00 2001 From: Maciej Kaszynski Date: Mon, 4 May 2026 12:02:26 +0100 Subject: [PATCH 11/13] Making queue return slimmer --- .../src/concurrency/BUILD | 12 +----- .../concurrency/concurrency_error_domain.hpp | 40 +++++------------- .../concurrency_error_domain_test.cpp | 42 ------------------- .../src/concurrency/mpmc_concurrent_queue.hpp | 40 +++++++++--------- .../mpmc_concurrent_queue_test.cpp | 19 +++++---- 5 files changed, 42 insertions(+), 111 deletions(-) delete mode 100644 src/launch_manager_daemon/src/concurrency/concurrency_error_domain_test.cpp diff --git a/src/launch_manager_daemon/src/concurrency/BUILD b/src/launch_manager_daemon/src/concurrency/BUILD index 5dd9da3c..ea7299d6 100644 --- a/src/launch_manager_daemon/src/concurrency/BUILD +++ b/src/launch_manager_daemon/src/concurrency/BUILD @@ -34,17 +34,7 @@ cc_library( deps = [ ":helgrind_annotations", "//src/launch_manager_daemon/common:osal", - "@score_baselibs//score/result", - ], -) - -cc_test( - name = "concurrency_error_domain_test", - srcs = ["concurrency_error_domain_test.cpp"], - visibility = ["//tests:__subpackages__"], - deps = [ - ":mpmc_concurrent_queue", - "@googletest//:gtest_main", + "@score_baselibs//score/language/futurecpp", ], ) diff --git a/src/launch_manager_daemon/src/concurrency/concurrency_error_domain.hpp b/src/launch_manager_daemon/src/concurrency/concurrency_error_domain.hpp index 37d4c2ad..b9e0b49f 100644 --- a/src/launch_manager_daemon/src/concurrency/concurrency_error_domain.hpp +++ b/src/launch_manager_daemon/src/concurrency/concurrency_error_domain.hpp @@ -14,14 +14,13 @@ #ifndef CONCURRENCY_ERROR_DOMAIN_HPP_INCLUDED #define CONCURRENCY_ERROR_DOMAIN_HPP_INCLUDED -#include "score/result/result.h" - 
-#include +#include +#include namespace score::lcm::internal { -enum class ConcurrencyErrc : score::result::ErrorCode +enum class ConcurrencyErrc : std::uint8_t { /// @brief An OS call returned an error. kOsError = 1, @@ -36,35 +35,16 @@ enum class ConcurrencyErrc : score::result::ErrorCode kTimeout = 4, }; -class ConcurrencyErrorDomain final : public score::result::ErrorDomain +inline std::ostream& operator<<(std::ostream& os, ConcurrencyErrc errc) noexcept { - [[nodiscard]] std::string_view MessageFor(const score::result::ErrorCode& code) const noexcept override + switch (errc) { - switch (static_cast(code)) - { - case ConcurrencyErrc::kOsError: - return "An OS call returned an error"; - - case ConcurrencyErrc::kOverflow: - return "The container has overflowed"; - - case ConcurrencyErrc::kStopped: - return "The container has been stopped"; - - case ConcurrencyErrc::kTimeout: - return "A timer was triggered"; - - default: - return "Unknown concurrency error"; - } + case ConcurrencyErrc::kOsError: return os << "kOsError"; + case ConcurrencyErrc::kOverflow: return os << "kOverflow"; + case ConcurrencyErrc::kStopped: return os << "kStopped"; + case ConcurrencyErrc::kTimeout: return os << "kTimeout"; + default: return os << static_cast(errc); } -}; - -constexpr ConcurrencyErrorDomain g_ConcurrencyErrorDomain{}; - -constexpr score::result::Error MakeError(ConcurrencyErrc code, const std::string_view user_message = "") noexcept -{ - return score::result::Error{static_cast(code), g_ConcurrencyErrorDomain, user_message}; } } // namespace score::lcm::internal diff --git a/src/launch_manager_daemon/src/concurrency/concurrency_error_domain_test.cpp b/src/launch_manager_daemon/src/concurrency/concurrency_error_domain_test.cpp deleted file mode 100644 index d2fd1b79..00000000 --- a/src/launch_manager_daemon/src/concurrency/concurrency_error_domain_test.cpp +++ /dev/null @@ -1,42 +0,0 @@ -/******************************************************************************** - * 
Copyright (c) 2026 Contributors to the Eclipse Foundation - * - * See the NOTICE file(s) distributed with this work for additional - * information regarding copyright ownership. - * - * This program and the accompanying materials are made available under the - * terms of the Apache License Version 2.0 which is available at - * https://www.apache.org/licenses/LICENSE-2.0 - * - * SPDX-License-Identifier: Apache-2.0 - ********************************************************************************/ - -#include - -#include - -using namespace score::lcm::internal; - -class ConcurrencyErrorDomainMessageForTest : public ::testing::TestWithParam -{ -}; - -TEST_P(ConcurrencyErrorDomainMessageForTest, CoversAllBranches) -{ - const score::result::ErrorDomain& domain = g_ConcurrencyErrorDomain; - static_cast(domain.MessageFor(GetParam())); -} - -INSTANTIATE_TEST_SUITE_P(AllCodes, - ConcurrencyErrorDomainMessageForTest, - ::testing::Values(static_cast(ConcurrencyErrc::kOsError), - static_cast(ConcurrencyErrc::kOverflow), - static_cast(ConcurrencyErrc::kStopped), - static_cast(ConcurrencyErrc::kTimeout), - 99)); // default branch - -TEST(ConcurrencyErrorDomainTest, MakeError) -{ - static_cast(MakeError(ConcurrencyErrc::kOsError)); - static_cast(MakeError(ConcurrencyErrc::kOsError, "detail")); -} diff --git a/src/launch_manager_daemon/src/concurrency/mpmc_concurrent_queue.hpp b/src/launch_manager_daemon/src/concurrency/mpmc_concurrent_queue.hpp index 0f8f01d1..ddb9a103 100644 --- a/src/launch_manager_daemon/src/concurrency/mpmc_concurrent_queue.hpp +++ b/src/launch_manager_daemon/src/concurrency/mpmc_concurrent_queue.hpp @@ -24,7 +24,7 @@ #include #include "concurrency_error_domain.hpp" -#include "score/result/result.h" +#include #include #include #include @@ -111,7 +111,8 @@ class MPMCConcurrentQueue /// @return Success if item was pushed, Error otherwise. /// Note: If the push returns false, the object is still valid for /// the user. 
- [[nodiscard]] score::Result push(T&& item, std::chrono::milliseconds timeout = std::chrono::milliseconds{0}) + [[nodiscard]] score::cpp::expected_blank push( + T&& item, std::chrono::milliseconds timeout = std::chrono::milliseconds{0}) { return push_impl(std::move(item), timeout); } @@ -123,29 +124,29 @@ class MPMCConcurrentQueue /// previous consumer has finished reading it. /// @param timeout Maximum time to wait for a free slot. Zero means wait forever. /// @return Success if item was pushed, Error otherwise. - [[nodiscard]] score::Result push(const T& item, - std::chrono::milliseconds timeout = std::chrono::milliseconds{0}) + [[nodiscard]] score::cpp::expected_blank push( + const T& item, std::chrono::milliseconds timeout = std::chrono::milliseconds{0}) { return push_impl(item, timeout); } /// @brief Signals all blocked pop() callers to return with a stopped error. - [[nodiscard]] score::Result stop() noexcept + [[nodiscard]] score::cpp::expected_blank stop() noexcept { m_stopped.store(true, std::memory_order_release); // signal to consumers and publishers to wakeup if (m_items.post() != osal::OsalReturnType::kSuccess) { - return score::MakeUnexpected(ConcurrencyErrc::kOsError); + return score::cpp::make_unexpected(ConcurrencyErrc::kOsError); } if (m_spaces.post() != osal::OsalReturnType::kSuccess) { - return score::MakeUnexpected(ConcurrencyErrc::kOsError); + return score::cpp::make_unexpected(ConcurrencyErrc::kOsError); } - return {}; + return score::cpp::blank{}; } /// @brief Blocks until an item is available or stop() is called. @@ -154,31 +155,32 @@ class MPMCConcurrentQueue /// When stopped returns std::nullopt. /// @param timeout Maximum time to wait for an item. Zero means wait forever. /// @return The next item, or error. 
- [[nodiscard]] score::Result pop(std::chrono::milliseconds timeout = std::chrono::milliseconds{0}) + [[nodiscard]] score::cpp::expected pop( + std::chrono::milliseconds timeout = std::chrono::milliseconds{0}) { const auto wait_result = (timeout == std::chrono::milliseconds{0}) ? m_items.wait() : m_items.timedWait(timeout); if(wait_result == osal::OsalReturnType::kTimeout) { - return score::MakeUnexpected(ConcurrencyErrc::kTimeout); + return score::cpp::make_unexpected(ConcurrencyErrc::kTimeout); } else if (wait_result != osal::OsalReturnType::kSuccess) { - return score::MakeUnexpected(ConcurrencyErrc::kOsError); + return score::cpp::make_unexpected(ConcurrencyErrc::kOsError); } if (m_stopped.load(std::memory_order_acquire)) { static_cast(m_items.post()); - return score::MakeUnexpected(ConcurrencyErrc::kStopped); + return score::cpp::make_unexpected(ConcurrencyErrc::kStopped); } T item = consume_slot(m_head.fetch_add(1, std::memory_order_relaxed)); if (m_spaces.post() != osal::OsalReturnType::kSuccess) { - return score::MakeUnexpected(ConcurrencyErrc::kOsError); + return score::cpp::make_unexpected(ConcurrencyErrc::kOsError); } return item; @@ -213,25 +215,25 @@ class MPMCConcurrentQueue } template - [[nodiscard]] score::Result push_impl(U&& item, std::chrono::milliseconds timeout) + [[nodiscard]] score::cpp::expected_blank push_impl(U&& item, std::chrono::milliseconds timeout) { const auto wait_result = (timeout == std::chrono::milliseconds{0}) ? 
m_spaces.wait() : m_spaces.timedWait(timeout); if(wait_result == osal::OsalReturnType::kTimeout) { - return score::MakeUnexpected(ConcurrencyErrc::kTimeout); + return score::cpp::make_unexpected(ConcurrencyErrc::kTimeout); } else if (wait_result != osal::OsalReturnType::kSuccess) { - return score::MakeUnexpected(ConcurrencyErrc::kOsError); + return score::cpp::make_unexpected(ConcurrencyErrc::kOsError); } if (m_stopped.load(std::memory_order_acquire)) { // chain-wake the next blocked producer then discard the item static_cast(m_spaces.post()); - return score::MakeUnexpected(ConcurrencyErrc::kStopped); + return score::cpp::make_unexpected(ConcurrencyErrc::kStopped); } const auto tail = m_tail.fetch_add(1, std::memory_order_relaxed); @@ -256,10 +258,10 @@ class MPMCConcurrentQueue if (m_items.post() != osal::OsalReturnType::kSuccess) { - return score::MakeUnexpected(ConcurrencyErrc::kOsError); + return score::cpp::make_unexpected(ConcurrencyErrc::kOsError); } - return {}; + return score::cpp::blank{}; } /// @brief Underlying storage. 
diff --git a/src/launch_manager_daemon/src/concurrency/mpmc_concurrent_queue_test.cpp b/src/launch_manager_daemon/src/concurrency/mpmc_concurrent_queue_test.cpp index 124e9072..361a1137 100644 --- a/src/launch_manager_daemon/src/concurrency/mpmc_concurrent_queue_test.cpp +++ b/src/launch_manager_daemon/src/concurrency/mpmc_concurrent_queue_test.cpp @@ -95,7 +95,7 @@ TEST_F(MPMCConcurrentQueueTest_Basic, PopReturnsNulloptOnSemaphoreWaitFailure) sa.sa_flags = 0; sigaction(SIGUSR1, &sa, nullptr); - score::Result result; + score::cpp::expected result = score::cpp::make_unexpected(ConcurrencyErrc::kOsError); std::atomic tid_ready{false}; std::thread consumer([&] { @@ -192,7 +192,7 @@ class MPMCConcurrentQueueTest_Blocking : public ::testing::Test TEST_F(MPMCConcurrentQueueTest_Blocking, PopBlocksUntilItemAvailable) { RecordProperty("Description", "Verify that pop blocks on an empty queue until a producer pushes an item."); - score::Result result = score::MakeUnexpected(ConcurrencyErrc::kOsError); + score::cpp::expected result = score::cpp::make_unexpected(ConcurrencyErrc::kOsError); std::thread consumer([&] { result = queue8_.pop(); @@ -215,24 +215,25 @@ TEST_F(MPMCConcurrentQueueTest_Blocking, PushBlocksWhenFull) ASSERT_TRUE(queue4_.push(i)); } - std::atomic> pushed{}; - pushed.store(score::MakeUnexpected(ConcurrencyErrc::kOsError)); + std::atomic push_completed{false}; + score::cpp::expected_blank pushed = score::cpp::make_unexpected(ConcurrencyErrc::kOsError); std::thread producer([&] { - pushed.store(queue4_.push(99), std::memory_order_release); + pushed = queue4_.push(99); + push_completed.store(true, std::memory_order_release); }); - EXPECT_FALSE(pushed.load(std::memory_order_acquire)); + EXPECT_FALSE(push_completed.load(std::memory_order_acquire)); std::ignore = queue4_.pop(); producer.join(); - EXPECT_TRUE(pushed.load(std::memory_order_acquire)); + EXPECT_TRUE(pushed.has_value()); } TEST_F(MPMCConcurrentQueueTest_Blocking, StopUnblocksBlockedConsumer) { 
RecordProperty("Description", "Verify that stop() unblocks a consumer thread waiting on an empty queue."); - score::Result result; + score::cpp::expected result = score::cpp::make_unexpected(ConcurrencyErrc::kOsError); std::thread consumer([&] { result = queue8_.pop(); @@ -253,7 +254,7 @@ TEST_F(MPMCConcurrentQueueTest_Blocking, StopUnblocksBlockedProducer) ASSERT_TRUE(queue4_.push(i)); } - score::Result pushed{}; + score::cpp::expected_blank pushed = score::cpp::make_unexpected(ConcurrencyErrc::kOsError); std::thread producer([&] { pushed = queue4_.push(99); }); From c3f8907a2d62ce6dc486b9e08ff2545d4f50b559 Mon Sep 17 00:00:00 2001 From: Maciej Kaszynski Date: Mon, 4 May 2026 12:07:25 +0100 Subject: [PATCH 12/13] Moving folder --- src/launch_manager_daemon/BUILD | 4 ++-- src/launch_manager_daemon/{src => common}/concurrency/BUILD | 6 +++--- .../concurrency/concurrency_error_domain.hpp | 0 .../{src => common}/concurrency/helgrind.supp | 0 .../{src => common}/concurrency/helgrind_annotations.hpp | 0 .../{src => common}/concurrency/mpmc_concurrent_queue.hpp | 0 .../concurrency/mpmc_concurrent_queue_test.cpp | 0 7 files changed, 5 insertions(+), 5 deletions(-) rename src/launch_manager_daemon/{src => common}/concurrency/BUILD (88%) rename src/launch_manager_daemon/{src => common}/concurrency/concurrency_error_domain.hpp (100%) rename src/launch_manager_daemon/{src => common}/concurrency/helgrind.supp (100%) rename src/launch_manager_daemon/{src => common}/concurrency/helgrind_annotations.hpp (100%) rename src/launch_manager_daemon/{src => common}/concurrency/mpmc_concurrent_queue.hpp (100%) rename src/launch_manager_daemon/{src => common}/concurrency/mpmc_concurrent_queue_test.cpp (100%) diff --git a/src/launch_manager_daemon/BUILD b/src/launch_manager_daemon/BUILD index 9d460fad..0327f2b3 100644 --- a/src/launch_manager_daemon/BUILD +++ b/src/launch_manager_daemon/BUILD @@ -49,9 +49,9 @@ cc_binary_with_common_opts( 
"//src/launch_manager_daemon/common:identifier_hash", "//src/launch_manager_daemon/common:lifecycle_error", "//src/launch_manager_daemon/common:log", + "//src/launch_manager_daemon/common/concurrency:mpmc_concurrent_queue", "//src/launch_manager_daemon/health_monitor_lib:health_monitor", "//src/launch_manager_daemon/process_state_client_lib:process_state_client", - "//src/launch_manager_daemon/src/concurrency:mpmc_concurrent_queue", "@flatbuffers", "@score_baselibs//score/mw/log", "@score_baselibs//score/result", @@ -84,7 +84,7 @@ cc_library( "//src/launch_manager_daemon/common:lifecycle_error", "//src/launch_manager_daemon/common:log", "//src/launch_manager_daemon/health_monitor_lib:health_monitor", - "//src/launch_manager_daemon/src/concurrency:mpmc_concurrent_queue", + "//src/launch_manager_daemon/common/concurrency:mpmc_concurrent_queue", "@flatbuffers", "@score_baselibs//score/mw/log", "@score_baselibs//score/result", diff --git a/src/launch_manager_daemon/src/concurrency/BUILD b/src/launch_manager_daemon/common/concurrency/BUILD similarity index 88% rename from src/launch_manager_daemon/src/concurrency/BUILD rename to src/launch_manager_daemon/common/concurrency/BUILD index ea7299d6..d9b6d540 100644 --- a/src/launch_manager_daemon/src/concurrency/BUILD +++ b/src/launch_manager_daemon/common/concurrency/BUILD @@ -16,7 +16,7 @@ load("//config:common_cc.bzl", "cc_library_with_common_opts") cc_library( name = "helgrind_annotations", hdrs = ["helgrind_annotations.hpp"], - strip_include_prefix = "//src/launch_manager_daemon/src", + strip_include_prefix = "//src/launch_manager_daemon/common", visibility = ["//src:__subpackages__"], ) @@ -29,7 +29,7 @@ cc_library( linkopts = [ "-l:libatomic.a", ], - strip_include_prefix = "//src/launch_manager_daemon/src", + strip_include_prefix = "//src/launch_manager_daemon/common", visibility = ["//src:__subpackages__"], deps = [ ":helgrind_annotations", @@ -49,7 +49,7 @@ cc_test( ) # Run with: -# bazel test --run_under="valgrind 
--tool=helgrind --suppressions=src/launch_manager_daemon/src/concurrency/helgrind.supp --error-exitcode=1" //src/launch_manager_daemon/src/concurrency:mpmc_concurrent_queue_helgrind_test --config=host --test_output=all +# bazel test --run_under="valgrind --tool=helgrind --suppressions=src/launch_manager_daemon/common/concurrency/helgrind.supp --error-exitcode=1" //src/launch_manager_daemon/common/concurrency:mpmc_concurrent_queue_helgrind_test --config=host --test_output=all # Note: This is using your host packages so you need to install valgrind. # Two tests intentionally exercise semaphore failure paths (EINTR / EAGAIN). # Those expected PthAPIerror reports are silenced via helgrind.supp. diff --git a/src/launch_manager_daemon/src/concurrency/concurrency_error_domain.hpp b/src/launch_manager_daemon/common/concurrency/concurrency_error_domain.hpp similarity index 100% rename from src/launch_manager_daemon/src/concurrency/concurrency_error_domain.hpp rename to src/launch_manager_daemon/common/concurrency/concurrency_error_domain.hpp diff --git a/src/launch_manager_daemon/src/concurrency/helgrind.supp b/src/launch_manager_daemon/common/concurrency/helgrind.supp similarity index 100% rename from src/launch_manager_daemon/src/concurrency/helgrind.supp rename to src/launch_manager_daemon/common/concurrency/helgrind.supp diff --git a/src/launch_manager_daemon/src/concurrency/helgrind_annotations.hpp b/src/launch_manager_daemon/common/concurrency/helgrind_annotations.hpp similarity index 100% rename from src/launch_manager_daemon/src/concurrency/helgrind_annotations.hpp rename to src/launch_manager_daemon/common/concurrency/helgrind_annotations.hpp diff --git a/src/launch_manager_daemon/src/concurrency/mpmc_concurrent_queue.hpp b/src/launch_manager_daemon/common/concurrency/mpmc_concurrent_queue.hpp similarity index 100% rename from src/launch_manager_daemon/src/concurrency/mpmc_concurrent_queue.hpp rename to 
src/launch_manager_daemon/common/concurrency/mpmc_concurrent_queue.hpp diff --git a/src/launch_manager_daemon/src/concurrency/mpmc_concurrent_queue_test.cpp b/src/launch_manager_daemon/common/concurrency/mpmc_concurrent_queue_test.cpp similarity index 100% rename from src/launch_manager_daemon/src/concurrency/mpmc_concurrent_queue_test.cpp rename to src/launch_manager_daemon/common/concurrency/mpmc_concurrent_queue_test.cpp From 3958efec14ccc501361100bd5b6287c439b33d79 Mon Sep 17 00:00:00 2001 From: Maciej Kaszynski Date: Mon, 4 May 2026 14:39:31 +0100 Subject: [PATCH 13/13] Adding yields --- .../common/concurrency/mpmc_concurrent_queue.hpp | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/launch_manager_daemon/common/concurrency/mpmc_concurrent_queue.hpp b/src/launch_manager_daemon/common/concurrency/mpmc_concurrent_queue.hpp index ddb9a103..1a95de69 100644 --- a/src/launch_manager_daemon/common/concurrency/mpmc_concurrent_queue.hpp +++ b/src/launch_manager_daemon/common/concurrency/mpmc_concurrent_queue.hpp @@ -22,6 +22,7 @@ #include #include #include +#include #include "concurrency_error_domain.hpp" #include @@ -198,6 +199,7 @@ class MPMCConcurrentQueue while (slot.turn.load(std::memory_order_acquire) != expected_turn) { + std::this_thread::yield(); // small spin, only fires if a prior producer claimed this slot but // was preempted before completing its write and a full turn // happened, should rarely happen @@ -245,6 +247,7 @@ class MPMCConcurrentQueue while (slot.turn.load(std::memory_order_acquire) != expected_turn) { + std::this_thread::yield(); // small spin, only fires if a prior producer claimed this slot but // was preempted before completing its write and a full turn // happened, should rarely happen