diff --git a/.bazelrc b/.bazelrc index 10361b9b..9c36d24f 100644 --- a/.bazelrc +++ b/.bazelrc @@ -29,6 +29,7 @@ test --cxxopt=-Wno-deprecated-declarations coverage --features=coverage coverage --combined_report=lcov coverage --cache_test_results=no +coverage --test_tag_filters=-no-coverage # Common Lifecycle Toolchain flags for build (do not use it in case of system toolchains!) build:toolchain_common --incompatible_strict_action_env diff --git a/src/launch_manager_daemon/BUILD b/src/launch_manager_daemon/BUILD index b6b2e6f4..0327f2b3 100644 --- a/src/launch_manager_daemon/BUILD +++ b/src/launch_manager_daemon/BUILD @@ -49,6 +49,7 @@ cc_binary_with_common_opts( "//src/launch_manager_daemon/common:identifier_hash", "//src/launch_manager_daemon/common:lifecycle_error", "//src/launch_manager_daemon/common:log", + "//src/launch_manager_daemon/common/concurrency:mpmc_concurrent_queue", "//src/launch_manager_daemon/health_monitor_lib:health_monitor", "//src/launch_manager_daemon/process_state_client_lib:process_state_client", "@flatbuffers", @@ -83,6 +84,7 @@ cc_library( "//src/launch_manager_daemon/common:lifecycle_error", "//src/launch_manager_daemon/common:log", "//src/launch_manager_daemon/health_monitor_lib:health_monitor", + "//src/launch_manager_daemon/common/concurrency:mpmc_concurrent_queue", "@flatbuffers", "@score_baselibs//score/mw/log", "@score_baselibs//score/result", diff --git a/src/launch_manager_daemon/common/BUILD b/src/launch_manager_daemon/common/BUILD index d65e43a9..3310be47 100644 --- a/src/launch_manager_daemon/common/BUILD +++ b/src/launch_manager_daemon/common/BUILD @@ -57,21 +57,15 @@ cc_library( cc_library( name = "osal", - srcs = select({ + srcs = glob(["src/internal/osal/posix/*.cpp"]) + select({ "@platforms//os:qnx": glob( [ - "src/internal/osal/**/*.cpp", - ], - exclude = [ - "src/internal/osal/linux/**", + "src/internal/osal/qnx/*.cpp", ], ), "@platforms//os:linux": glob( [ - "src/internal/osal/**/*.cpp", - ], - exclude = [ - 
"src/internal/osal/qnx/**", + "src/internal/osal/linux/*.cpp", ], ), }), diff --git a/src/launch_manager_daemon/common/concurrency/BUILD b/src/launch_manager_daemon/common/concurrency/BUILD new file mode 100644 index 00000000..d9b6d540 --- /dev/null +++ b/src/launch_manager_daemon/common/concurrency/BUILD @@ -0,0 +1,89 @@ +# ******************************************************************************* +# Copyright (c) 2025 Contributors to the Eclipse Foundation +# +# See the NOTICE file(s) distributed with this work for additional +# information regarding copyright ownership. +# +# This program and the accompanying materials are made available under the +# terms of the Apache License Version 2.0 which is available at +# https://www.apache.org/licenses/LICENSE-2.0 +# +# SPDX-License-Identifier: Apache-2.0 +# ******************************************************************************* +load("@rules_cc//cc:defs.bzl", "cc_test") +load("//config:common_cc.bzl", "cc_library_with_common_opts") + +cc_library( + name = "helgrind_annotations", + hdrs = ["helgrind_annotations.hpp"], + strip_include_prefix = "//src/launch_manager_daemon/common", + visibility = ["//src:__subpackages__"], +) + +cc_library( + name = "mpmc_concurrent_queue", + hdrs = [ + "concurrency_error_domain.hpp", + "mpmc_concurrent_queue.hpp", + ], + linkopts = [ + "-l:libatomic.a", + ], + strip_include_prefix = "//src/launch_manager_daemon/common", + visibility = ["//src:__subpackages__"], + deps = [ + ":helgrind_annotations", + "//src/launch_manager_daemon/common:osal", + "@score_baselibs//score/language/futurecpp", + ], +) + +cc_test( + name = "mpmc_concurrent_queue_test", + srcs = ["mpmc_concurrent_queue_test.cpp"], + visibility = ["//tests:__subpackages__"], + deps = [ + ":mpmc_concurrent_queue", + "@googletest//:gtest_main", + ], +) + +# Run with: +# bazel test --run_under="valgrind --tool=helgrind --suppressions=src/launch_manager_daemon/common/concurrency/helgrind.supp --error-exitcode=1" 
//src/launch_manager_daemon/common/concurrency:mpmc_concurrent_queue_helgrind_test --config=host --test_output=all
+# Note: This is using your host packages so you need to install valgrind.
+# Two tests intentionally exercise semaphore failure paths (EINTR / EAGAIN).
+# Those expected PthAPIerror reports are silenced via helgrind.supp.
+cc_test(
+    name = "mpmc_concurrent_queue_helgrind_test",
+    srcs = ["mpmc_concurrent_queue_test.cpp"],
+    data = ["helgrind.supp"],
+    tags = [
+        "helgrind",
+        "manual",
+    ],
+    visibility = ["//tests:__subpackages__"],
+    deps = [
+        ":mpmc_concurrent_queue",
+        "@googletest//:gtest_main",
+    ],
+)
+
+cc_test(
+    name = "mpmc_concurrent_queue_tsan_test",
+    srcs = ["mpmc_concurrent_queue_test.cpp"],
+    copts = [
+        "-fsanitize=thread",
+        "-O0",
+        "-g",
+    ],
+    linkopts = ["-fsanitize=thread"],
+    tags = [
+        "no-coverage",  # coverage + tsan might cause problems
+        "tsan",
+    ],
+    visibility = ["//tests:__subpackages__"],
+    deps = [
+        ":mpmc_concurrent_queue",
+        "@googletest//:gtest_main",
+    ],
+)
diff --git a/src/launch_manager_daemon/common/concurrency/concurrency_error_domain.hpp b/src/launch_manager_daemon/common/concurrency/concurrency_error_domain.hpp
new file mode 100644
index 00000000..b9e0b49f
--- /dev/null
+++ b/src/launch_manager_daemon/common/concurrency/concurrency_error_domain.hpp
@@ -0,0 +1,52 @@
+/********************************************************************************
+ * Copyright (c) 2025 Contributors to the Eclipse Foundation
+ *
+ * See the NOTICE file(s) distributed with this work for additional
+ * information regarding copyright ownership.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Apache License Version 2.0 which is available at
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ ********************************************************************************/
+
+#ifndef CONCURRENCY_ERROR_DOMAIN_HPP_INCLUDED
+#define CONCURRENCY_ERROR_DOMAIN_HPP_INCLUDED
+
+#include <cstdint>
+#include <ostream>
+
+namespace score::lcm::internal
+{
+
+enum class ConcurrencyErrc : std::uint8_t
+{
+    /// @brief An OS call returned an error.
+    kOsError = 1,
+
+    /// @brief The container has overflowed.
+    kOverflow = 2,
+
+    /// @brief The container has stopped.
+    kStopped = 3,
+
+    /// @brief A timeout was triggered.
+    kTimeout = 4,
+};
+
+inline std::ostream& operator<<(std::ostream& os, ConcurrencyErrc errc) noexcept
+{
+    switch (errc)
+    {
+        case ConcurrencyErrc::kOsError: return os << "kOsError";
+        case ConcurrencyErrc::kOverflow: return os << "kOverflow";
+        case ConcurrencyErrc::kStopped: return os << "kStopped";
+        case ConcurrencyErrc::kTimeout: return os << "kTimeout";
+        default: return os << static_cast<int>(errc);
+    }
+}
+
+} // namespace score::lcm::internal
+
+#endif // CONCURRENCY_ERROR_DOMAIN_HPP_INCLUDED
diff --git a/src/launch_manager_daemon/common/concurrency/helgrind.supp b/src/launch_manager_daemon/common/concurrency/helgrind.supp
new file mode 100644
index 00000000..b007296c
--- /dev/null
+++ b/src/launch_manager_daemon/common/concurrency/helgrind.supp
@@ -0,0 +1,42 @@
+# *******************************************************************************
+# Copyright (c) 2025 Contributors to the Eclipse Foundation
+#
+# See the NOTICE file(s) distributed with this work for additional
+# information regarding copyright ownership.
+# +# This program and the accompanying materials are made available under the +# terms of the Apache License Version 2.0 which is available at +# https://www.apache.org/licenses/LICENSE-2.0 +# +# SPDX-License-Identifier: Apache-2.0 +# ******************************************************************************* + +# Helgrind suppression file + + +# --- mpmc_concurrent_queue_helgrind_test ---- + +# tests intentionally exercise semaphore failure paths and therefore +# trigger Helgrind's PthAPIerror reports +# +# * MPMCConcurrentQueueTest_Basic.PopReturnsNulloptOnSemaphoreWaitFailure +# Sends SIGUSR1 to the consumer so sem_wait() returns -1 / EINTR and +# the kFail path in Semaphore::wait() is taken. +{ + mpmc_queue_sem_wait_eintr_intentional + Helgrind:PthAPIerror + obj:*/vgpreload_helgrind-*.so + fun:_ZN5score3lcm8internal4osal9Semaphore4waitEv + ... +} + +# * MPMCConcurrentQueueTest_Timeout.PushWithTimeoutReturnsFalseWhenFull +# Fills the queue so sem_trywait() inside Semaphore::timedWait() +# returns -1 / EAGAIN until the timeout expires. +{ + mpmc_queue_sem_trywait_eagain_intentional + Helgrind:PthAPIerror + obj:*/vgpreload_helgrind-*.so + fun:_ZN5score3lcm8internal4osal9Semaphore9timedWaitENSt6chrono8durationIlSt5ratioILl1ELl1000EEEE + ... +} diff --git a/src/launch_manager_daemon/common/concurrency/helgrind_annotations.hpp b/src/launch_manager_daemon/common/concurrency/helgrind_annotations.hpp new file mode 100644 index 00000000..4bc0c537 --- /dev/null +++ b/src/launch_manager_daemon/common/concurrency/helgrind_annotations.hpp @@ -0,0 +1,29 @@ +/******************************************************************************** + * Copyright (c) 2025 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. 
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Apache License Version 2.0 which is available at
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ ********************************************************************************/
+
+#ifndef HELGRIND_ANNOTATIONS_HPP_INCLUDED
+#define HELGRIND_ANNOTATIONS_HPP_INCLUDED
+
+// When built with --config=host the system compiler finds valgrind/helgrind.h
+// in its default include paths and the real annotations are active. Under any
+// other config the header is absent and the macros expand to no-ops.
+#if __has_include(<valgrind/helgrind.h>)
+#include <valgrind/helgrind.h>
+#else
+// NOLINTNEXTLINE(cppcoreguidelines-macro-usage)
+#define ANNOTATE_HAPPENS_BEFORE(obj) do {} while (false)
+// NOLINTNEXTLINE(cppcoreguidelines-macro-usage)
+#define ANNOTATE_HAPPENS_AFTER(obj) do {} while (false)
+#endif
+
+#endif // HELGRIND_ANNOTATIONS_HPP_INCLUDED
diff --git a/src/launch_manager_daemon/common/concurrency/mpmc_concurrent_queue.hpp b/src/launch_manager_daemon/common/concurrency/mpmc_concurrent_queue.hpp
new file mode 100644
index 00000000..1a95de69
--- /dev/null
+++ b/src/launch_manager_daemon/common/concurrency/mpmc_concurrent_queue.hpp
@@ -0,0 +1,298 @@
+/********************************************************************************
+ * Copyright (c) 2025 Contributors to the Eclipse Foundation
+ *
+ * See the NOTICE file(s) distributed with this work for additional
+ * information regarding copyright ownership.
+ * + * This program and the accompanying materials are made available under the + * terms of the Apache License Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +#ifndef MPMC_CONCURRENT_QUEUE_HPP_INCLUDED +#define MPMC_CONCURRENT_QUEUE_HPP_INCLUDED + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "concurrency_error_domain.hpp" +#include +#include +#include +#include + +namespace score::lcm::internal +{ +// this is based on https://github.com/rigtorp/MPMCQueue + +/// @brief Lock-free MPMC ring buffer with semaphore-based blocking. +/// Producers and consumers each atomically claim independent slots via +/// fetch_add, so multiple producers and multiple consumers run concurrently. +/// @warning T must be default-constructible. +template +class MPMCConcurrentQueue +{ + static_assert(Capacity > 0U, "Capacity must be at least 1"); + + static_assert(Capacity <= std::numeric_limits::max(), + "Capacity exceeds uint32_t range used by the semaphore"); + + static_assert(std::is_default_constructible_v, "T must be default-constructible for in-place slot storage"); + + static_assert(std::is_nothrow_destructible_v, + "T must be nothrow-destructible to allow consume_slot to be noexcept"); + + static_assert(std::is_nothrow_move_constructible_v, + "T must be nothrow-move-constructible to wrap into std::optional in pop()"); + + static_assert(std::is_move_assignable_v || std::is_copy_assignable_v, + "T must be move-assignable or copy-assignable to be stored via push()"); + + // optimization to work out the turns + static_assert((Capacity & (Capacity - 1U)) == 0U, "Capacity must be a power of 2"); + + /// @brief Cache line size used to prevent false sharing between m_tail, + /// m_head, and m_stopped. 
+ /// Defined as a literal rather than std::hardware_destructive_interference_size to avoid + /// -Winterference-size: that value is not ABI-stable across compiler versions and tuning flags. + /// 64 bytes is the cache line size on all targeted x86_64 and aarch64 platforms. + constexpr static std::size_t CacheLineSize = 64U; + + /// @brief A single ring-buffer entry pairing a sequenced turn counter with + /// its stored item. + /// + /// @detail The slots are not aligned to cachelines as in our usecase we + /// will mostly store pointers here, so we don't want to increase + /// the memory usage that much. + struct Slot + { + /// @brief Increasing counter that alternates ownership + /// between producers (even) and consumers (odd). + std::atomic turn{0}; + + /// @brief The stored item. + T item{}; + }; + + public: + MPMCConcurrentQueue() + { + // ignore sem_init error, the subsequent sem_* will fail + static_cast(m_items.init(0U, false)); + static_cast(m_spaces.init(static_cast(Capacity), false)); + } + + ~MPMCConcurrentQueue() noexcept + { + // not much we can do + static_cast(m_spaces.deinit()); + static_cast(m_items.deinit()); + } + + MPMCConcurrentQueue(const MPMCConcurrentQueue&) = delete; + MPMCConcurrentQueue& operator=(const MPMCConcurrentQueue&) = delete; + MPMCConcurrentQueue(MPMCConcurrentQueue&&) = delete; + MPMCConcurrentQueue& operator=(MPMCConcurrentQueue&&) = delete; + + /// @brief Blocks until a slot is free, then writes the item into the queue. + /// @details Producers claim slots via fetch_add on m_tail, and sleep inside + /// m_spaces.wait() when all slots are occupied. + /// The turn counter ensures a slot cannot be written until the + /// previous consumer has finished reading it. + /// @param timeout Maximum time to wait for a free slot. Zero means wait forever. + /// @return Success if item was pushed, Error otherwise. + /// Note: If the push returns false, the object is still valid for + /// the user. 
+ [[nodiscard]] score::cpp::expected_blank push( + T&& item, std::chrono::milliseconds timeout = std::chrono::milliseconds{0}) + { + return push_impl(std::move(item), timeout); + } + + /// @brief Blocks until a slot is free, then writes the item into the queue. + /// @details Producers claim slots via fetch_add on m_tail, and sleep inside + /// m_spaces.wait() when all slots are occupied. + /// The turn counter ensures a slot cannot be written until the + /// previous consumer has finished reading it. + /// @param timeout Maximum time to wait for a free slot. Zero means wait forever. + /// @return Success if item was pushed, Error otherwise. + [[nodiscard]] score::cpp::expected_blank push( + const T& item, std::chrono::milliseconds timeout = std::chrono::milliseconds{0}) + { + return push_impl(item, timeout); + } + + /// @brief Signals all blocked pop() callers to return with a stopped error. + [[nodiscard]] score::cpp::expected_blank stop() noexcept + { + m_stopped.store(true, std::memory_order_release); + + // signal to consumers and publishers to wakeup + if (m_items.post() != osal::OsalReturnType::kSuccess) + { + return score::cpp::make_unexpected(ConcurrencyErrc::kOsError); + } + + if (m_spaces.post() != osal::OsalReturnType::kSuccess) + { + return score::cpp::make_unexpected(ConcurrencyErrc::kOsError); + } + + return score::cpp::blank{}; + } + + /// @brief Blocks until an item is available or stop() is called. + /// @details Consumers claim slots via fetch_add on m_head and sleep + /// inside m_items.wait() when the queue is empty. + /// When stopped returns std::nullopt. + /// @param timeout Maximum time to wait for an item. Zero means wait forever. + /// @return The next item, or error. + [[nodiscard]] score::cpp::expected pop( + std::chrono::milliseconds timeout = std::chrono::milliseconds{0}) + { + const auto wait_result = + (timeout == std::chrono::milliseconds{0}) ? 
m_items.wait() : m_items.timedWait(timeout); + + if(wait_result == osal::OsalReturnType::kTimeout) + { + return score::cpp::make_unexpected(ConcurrencyErrc::kTimeout); + } + else if (wait_result != osal::OsalReturnType::kSuccess) + { + return score::cpp::make_unexpected(ConcurrencyErrc::kOsError); + } + + if (m_stopped.load(std::memory_order_acquire)) + { + static_cast(m_items.post()); + return score::cpp::make_unexpected(ConcurrencyErrc::kStopped); + } + + T item = consume_slot(m_head.fetch_add(1, std::memory_order_relaxed)); + + if (m_spaces.post() != osal::OsalReturnType::kSuccess) + { + return score::cpp::make_unexpected(ConcurrencyErrc::kOsError); + } + + return item; + } + + private: + /// @brief Spins until the slot at tail is ready, moves the item out, and + /// releases the slot. + T consume_slot(std::size_t tail) noexcept + { + auto& slot = m_slots[tail & (Capacity - 1U)]; + + // odd turns belong to consumer, +1 relative to the producer's even turn + const auto expected_turn = ((tail / Capacity) * 2) + 1; + + while (slot.turn.load(std::memory_order_acquire) != expected_turn) + { + std::this_thread::yield(); + // small spin, only fires if a prior producer claimed this slot but + // was preempted before completing its write and a full turn + // happened, should rarely happen + } + ANNOTATE_HAPPENS_AFTER(&slot.turn); + + T item = std::move_if_noexcept(slot.item); + + ANNOTATE_HAPPENS_BEFORE(&slot.turn); + + // release store signals that the slot is now free for the next producer. + slot.turn.store(expected_turn + 1, std::memory_order_release); + + return item; + } + + template + [[nodiscard]] score::cpp::expected_blank push_impl(U&& item, std::chrono::milliseconds timeout) + { + const auto wait_result = + (timeout == std::chrono::milliseconds{0}) ? 
m_spaces.wait() : m_spaces.timedWait(timeout); + + if(wait_result == osal::OsalReturnType::kTimeout) + { + return score::cpp::make_unexpected(ConcurrencyErrc::kTimeout); + } + else if (wait_result != osal::OsalReturnType::kSuccess) + { + return score::cpp::make_unexpected(ConcurrencyErrc::kOsError); + } + + if (m_stopped.load(std::memory_order_acquire)) + { + // chain-wake the next blocked producer then discard the item + static_cast(m_spaces.post()); + return score::cpp::make_unexpected(ConcurrencyErrc::kStopped); + } + + const auto tail = m_tail.fetch_add(1, std::memory_order_relaxed); + auto& slot = m_slots[tail & (Capacity - 1U)]; + + // even turns belong to producers, odd turns to consumers + // each lap of the ring increments the expected turn by 2 + const auto expected_turn = (tail / Capacity) * 2; + + while (slot.turn.load(std::memory_order_acquire) != expected_turn) + { + std::this_thread::yield(); + // small spin, only fires if a prior producer claimed this slot but + // was preempted before completing its write and a full turn + // happened, should rarely happen + } + ANNOTATE_HAPPENS_AFTER(&slot.turn); + + slot.item = std::forward(item); + + ANNOTATE_HAPPENS_BEFORE(&slot.turn); + slot.turn.store(expected_turn + 1, std::memory_order_release); + + if (m_items.post() != osal::OsalReturnType::kSuccess) + { + return score::cpp::make_unexpected(ConcurrencyErrc::kOsError); + } + + return score::cpp::blank{}; + } + + /// @brief Underlying storage. + std::array m_slots; + + /// @brief The front of the queue; claimed by consumers via fetch_add in pop. + /// @details Aligned so that m_head and m_tail do not share a cache line. + alignas(CacheLineSize) std::atomic m_head{0}; + + /// @brief The back of the queue; claimed by producers via fetch_add in push_impl. + /// @details Aligned so that m_head and m_tail do not share a cache line. 
+ alignas(CacheLineSize) std::atomic m_tail{0}; + + /// @brief Set to true by stop(); causes push() to return false and pop() to + /// return std::nullopt instead of blocking. + /// @details Aligned on its own cache line so that the single stop() write + /// does not cause false sharing with m_tail updates in push_impl(). + alignas(CacheLineSize) std::atomic m_stopped{false}; + + /// @brief Counts items currently in the queue; consumers block on this when + /// the queue is empty. + osal::Semaphore m_items{}; + + /// @brief Counts empty slots available; producers block on this when the + /// queue is full. + osal::Semaphore m_spaces{}; +}; + +} // namespace score::lcm::internal + +#endif // MPMC_CONCURRENT_QUEUE_HPP_INCLUDED diff --git a/src/launch_manager_daemon/common/concurrency/mpmc_concurrent_queue_test.cpp b/src/launch_manager_daemon/common/concurrency/mpmc_concurrent_queue_test.cpp new file mode 100644 index 00000000..361a1137 --- /dev/null +++ b/src/launch_manager_daemon/common/concurrency/mpmc_concurrent_queue_test.cpp @@ -0,0 +1,337 @@ +/******************************************************************************** + * Copyright (c) 2025 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. 
+ * + * This program and the accompanying materials are made available under the + * terms of the Apache License Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace score::lcm::internal; + +class MPMCConcurrentQueueTest_Basic : public ::testing::Test +{ + protected: + MPMCConcurrentQueue queue_; +}; + +TEST_F(MPMCConcurrentQueueTest_Basic, PushAndPopSingleItem) +{ + RecordProperty("Description", "Verify that a single pushed item is returned by pop with its value intact."); + ASSERT_TRUE(queue_.push(42)); + auto result = queue_.pop(); + ASSERT_TRUE(result.has_value()); + EXPECT_EQ(*result, 42); +} + +TEST_F(MPMCConcurrentQueueTest_Basic, PushAndPopPreservesOrder) +{ + RecordProperty("Description", "Verify that items are dequeued in FIFO order."); + for (int i = 0; i < 5; ++i) + { + ASSERT_TRUE(queue_.push(i)); + } + for (int i = 0; i < 5; ++i) + { + auto result = queue_.pop(); + ASSERT_TRUE(result.has_value()); + EXPECT_EQ(*result, i); + } +} + +TEST_F(MPMCConcurrentQueueTest_Basic, LvaluePushCopiesItem) +{ + RecordProperty("Description", "Verify that pushing an lvalue copies the item, leaving the source unchanged."); + int value = 42; + ASSERT_TRUE(queue_.push(value)); + EXPECT_EQ(value, 42); + auto result = queue_.pop(); + ASSERT_TRUE(result.has_value()); + EXPECT_EQ(*result, 42); +} + +TEST_F(MPMCConcurrentQueueTest_Basic, RvaluePushWorksWithMoveOnlyType) +{ + RecordProperty("Description", + "Verify that rvalue push works with a move-only type and moves the item into the queue."); + MPMCConcurrentQueue, 4> queue; + auto item = std::make_unique(99); + ASSERT_TRUE(queue.push(std::move(item))); + EXPECT_EQ(item, nullptr); + auto result = queue.pop(); + 
ASSERT_TRUE(result.has_value()); + ASSERT_NE(*result, nullptr); + EXPECT_EQ(**result, 99); +} + +TEST_F(MPMCConcurrentQueueTest_Basic, PopReturnsNulloptOnSemaphoreWaitFailure) +{ + RecordProperty("Description", + "Verify that pop returns nullopt when the internal semaphore wait is " + "interrupted by a signal (sem_wait returns EINTR, triggering the kFail path)."); + + // Install a no-op handler without SA_RESTART so sem_wait is NOT restarted + // after signal delivery and returns -1 with EINTR instead. + struct sigaction sa{}; + sa.sa_handler = [](int) {}; + sigemptyset(&sa.sa_mask); + sa.sa_flags = 0; + sigaction(SIGUSR1, &sa, nullptr); + + score::cpp::expected result = score::cpp::make_unexpected(ConcurrencyErrc::kOsError); + std::atomic tid_ready{false}; + + std::thread consumer([&] { + tid_ready.store(true, std::memory_order_release); + result = queue_.pop(); + }); + + // Obtain the pthread_t from the owning thread to avoid a cross-thread + // write to consumer_tid that Helgrind would flag as a data race. + pthread_t consumer_tid = consumer.native_handle(); + + while (!tid_ready.load(std::memory_order_acquire)) + { + std::this_thread::yield(); + } + // Allow the consumer to reach sem_wait before delivering the signal. 
+ std::this_thread::sleep_for(std::chrono::milliseconds(20)); + pthread_kill(consumer_tid, SIGUSR1); + consumer.join(); + + EXPECT_FALSE(result.has_value()); + + signal(SIGUSR1, SIG_DFL); +} + +class MPMCConcurrentQueueTest_Timeout : public ::testing::Test +{ + protected: + MPMCConcurrentQueue queue4_; +}; + +TEST_F(MPMCConcurrentQueueTest_Timeout, PushWithTimeoutSucceedsWhenSlotAvailable) +{ + RecordProperty("Description", "Verify that push with a non-zero timeout succeeds when a slot is free."); + EXPECT_TRUE(queue4_.push(1, std::chrono::milliseconds{100})); + auto result = queue4_.pop(); + ASSERT_TRUE(result.has_value()); + EXPECT_EQ(*result, 1); +} + +TEST_F(MPMCConcurrentQueueTest_Timeout, PushWithTimeoutReturnsFalseWhenFull) +{ + RecordProperty( + "Description", + "Verify that push with a non-zero timeout returns false when the queue is full and the timeout expires."); + for (int i = 0; i < 4; ++i) + { + ASSERT_TRUE(queue4_.push(i)); + } + EXPECT_FALSE(queue4_.push(99, std::chrono::milliseconds{20})); +} + +class MPMCConcurrentQueueTest_Stop : public ::testing::Test +{ + protected: + MPMCConcurrentQueue queue_; +}; + +TEST_F(MPMCConcurrentQueueTest_Stop, PushReturnsFalseAfterStop) +{ + RecordProperty("Description", "Verify that push returns false once stop() has been called."); + auto res = queue_.stop(); + ASSERT_TRUE(res.has_value()); + EXPECT_FALSE(queue_.push(1)); +} + +TEST_F(MPMCConcurrentQueueTest_Stop, PopReturnsNulloptWhenStoppedAndEmpty) +{ + RecordProperty("Description", "Verify that pop returns nullopt immediately when the queue is stopped and empty."); + auto res = queue_.stop(); + ASSERT_TRUE(res.has_value()); + EXPECT_FALSE(queue_.pop().has_value()); +} + +TEST_F(MPMCConcurrentQueueTest_Stop, ItemsAreDroppedAfterStop) +{ + RecordProperty("Description", "Verify that pop() returns nullopt after stop(), dropping any remaining items."); + ASSERT_TRUE(queue_.push(1)); + ASSERT_TRUE(queue_.push(2)); + ASSERT_TRUE(queue_.push(3)); + auto res = 
queue_.stop(); + ASSERT_TRUE(res.has_value()); + + EXPECT_FALSE(queue_.pop().has_value()); +} + +class MPMCConcurrentQueueTest_Blocking : public ::testing::Test +{ + protected: + MPMCConcurrentQueue queue4_; + MPMCConcurrentQueue queue8_; +}; + +TEST_F(MPMCConcurrentQueueTest_Blocking, PopBlocksUntilItemAvailable) +{ + RecordProperty("Description", "Verify that pop blocks on an empty queue until a producer pushes an item."); + score::cpp::expected result = score::cpp::make_unexpected(ConcurrencyErrc::kOsError); + + std::thread consumer([&] { + result = queue8_.pop(); + }); + + EXPECT_FALSE(result.has_value()); + + std::ignore = queue8_.push(7); + consumer.join(); + + ASSERT_TRUE(result.has_value()); + EXPECT_EQ(*result, 7); +} + +TEST_F(MPMCConcurrentQueueTest_Blocking, PushBlocksWhenFull) +{ + RecordProperty("Description", "Verify that push blocks when the queue is full until a consumer pops an item."); + for (int i = 0; i < 4; ++i) + { + ASSERT_TRUE(queue4_.push(i)); + } + + std::atomic push_completed{false}; + score::cpp::expected_blank pushed = score::cpp::make_unexpected(ConcurrencyErrc::kOsError); + std::thread producer([&] { + pushed = queue4_.push(99); + push_completed.store(true, std::memory_order_release); + }); + + EXPECT_FALSE(push_completed.load(std::memory_order_acquire)); + + std::ignore = queue4_.pop(); + producer.join(); + + EXPECT_TRUE(pushed.has_value()); +} + +TEST_F(MPMCConcurrentQueueTest_Blocking, StopUnblocksBlockedConsumer) +{ + RecordProperty("Description", "Verify that stop() unblocks a consumer thread waiting on an empty queue."); + score::cpp::expected result = score::cpp::make_unexpected(ConcurrencyErrc::kOsError); + + std::thread consumer([&] { + result = queue8_.pop(); + }); + + auto res = queue8_.stop(); + ASSERT_TRUE(res.has_value()); + consumer.join(); + + EXPECT_FALSE(result.has_value()); +} + +TEST_F(MPMCConcurrentQueueTest_Blocking, StopUnblocksBlockedProducer) +{ + RecordProperty("Description", "Verify that stop() unblocks a 
producer thread waiting on a full queue."); + for (int i = 0; i < 4; ++i) + { + ASSERT_TRUE(queue4_.push(i)); + } + + score::cpp::expected_blank pushed = score::cpp::make_unexpected(ConcurrencyErrc::kOsError); + std::thread producer([&] { + pushed = queue4_.push(99); + }); + + auto res = queue4_.stop(); + ASSERT_TRUE(res.has_value()); + producer.join(); + + EXPECT_FALSE(pushed); +} + +class MPMCConcurrentQueueTest_MPMC : public ::testing::Test +{ + protected: + static constexpr std::size_t kCapacity = 64U; + static constexpr int kItemsPerProducer = 50; + static constexpr std::size_t kNumProducers = 4U; + static constexpr std::size_t kNumConsumers = 4U; + static constexpr int kTotalItems = static_cast(kItemsPerProducer * kNumProducers); + + MPMCConcurrentQueue queue_; +}; + +TEST_F(MPMCConcurrentQueueTest_MPMC, AllItemsDelivered) +{ + RecordProperty("Description", + "Verify that all items pushed by multiple concurrent producers are received " + "by multiple concurrent consumers without loss or duplication."); + std::atomic received_count{0}; + + auto producer_fn = [this](int value) { + for (int i = 0; i < kItemsPerProducer; ++i) + { + EXPECT_TRUE(queue_.push(value)); + } + }; + + auto consumer_fn = [this, &received_count] { + while (true) + { + auto item = queue_.pop(); + if (!item.has_value()) + { + break; + } + received_count.fetch_add(1, std::memory_order_relaxed); + } + }; + + std::vector producers; + for (std::size_t i = 0U; i < kNumProducers; ++i) + { + producers.emplace_back(producer_fn, static_cast(i)); + } + + std::vector consumers; + for (std::size_t i = 0U; i < kNumConsumers; ++i) + { + consumers.emplace_back(consumer_fn); + } + + for (auto& t : producers) + { + t.join(); + } + + while (received_count.load(std::memory_order_acquire) < kTotalItems) + { + std::this_thread::yield(); + } + auto res = queue_.stop(); + ASSERT_TRUE(res.has_value()); + + for (auto& t : consumers) + { + t.join(); + } + + EXPECT_EQ(received_count.load(), kTotalItems); +} diff --git 
a/src/launch_manager_daemon/health_monitor_lib/src/score/lcm/saf/supervision/ProcessStateTracker.cpp b/src/launch_manager_daemon/health_monitor_lib/src/score/lcm/saf/supervision/ProcessStateTracker.cpp index 8f159eb5..dbd24fb7 100644 --- a/src/launch_manager_daemon/health_monitor_lib/src/score/lcm/saf/supervision/ProcessStateTracker.cpp +++ b/src/launch_manager_daemon/health_monitor_lib/src/score/lcm/saf/supervision/ProcessStateTracker.cpp @@ -158,7 +158,7 @@ void ProcessStateTracker::updateProcessState(const ifexm::ProcessState& f_proces } // coverity[autosar_cpp14_a8_5_2_violation:FALSE] type auto shall not be initialized with {} AUTOSAR.8.5.3A - const auto& it = std::find(k_refProcesses.begin(), k_refProcesses.end(), &f_processIdentifier_r); + const auto it = std::find(k_refProcesses.begin(), k_refProcesses.end(), &f_processIdentifier_r); if (it != k_refProcesses.end()) { const bool processActive{isProcessActive(f_state)}; diff --git a/src/launch_manager_daemon/src/process_group_manager/graph.cpp b/src/launch_manager_daemon/src/process_group_manager/graph.cpp index 5866c0b5..ba0c4314 100644 --- a/src/launch_manager_daemon/src/process_group_manager/graph.cpp +++ b/src/launch_manager_daemon/src/process_group_manager/graph.cpp @@ -15,16 +15,19 @@ #include -#include +#include #include #include -#include +#include -namespace score { +namespace score +{ -namespace lcm { +namespace lcm +{ -namespace internal { +namespace internal +{ Graph::Graph(uint32_t max_num_nodes, ProcessGroupManager* pgm) : pg_index_(0U), @@ -42,7 +45,8 @@ Graph::Graph(uint32_t max_num_nodes, ProcessGroupManager* pgm) pending_state_(""), event_(ControlClientCode::kNotSet), cancel_message_(), - request_start_time_() { + request_start_time_() +{ LM_LOG_DEBUG() << "Creating graph with" << max_num_nodes << "nodes"; nodes_.reserve(max_num_nodes); last_state_manager_.process_index_ = 0xFFFFU; // an invalid state manager @@ -50,15 +54,16 @@ Graph::Graph(uint32_t max_num_nodes, ProcessGroupManager* pgm) 
cancel_message_.request_or_response_ = ControlClientCode::kNotSet; } -Graph::~Graph() { +Graph::~Graph() +{ nodes_.clear(); LM_LOG_DEBUG() << "Graph destroyed"; } -void Graph::initProcessGroupNodes( IdentifierHash pg_name, uint32_t num_processes, uint32_t index ) +void Graph::initProcessGroupNodes(IdentifierHash pg_name, uint32_t num_processes, uint32_t index) { - pg_index_ = index; - off_state_ = pgm_->getConfigurationManager()->getNameOfOffState(pg_name); + pg_index_ = index; + off_state_ = pgm_->getConfigurationManager()->getNameOfOffState(pg_name); requested_state_.pg_state_name_ = off_state_; requested_state_.pg_name_ = pg_name; @@ -67,15 +72,18 @@ void Graph::initProcessGroupNodes( IdentifierHash pg_name, uint32_t num_processe createProcessInfoNodes(num_processes); - if (nodes_.size() == num_processes) { + if (nodes_.size() == num_processes) + { createSuccessorLists(pg_name); } } -inline void Graph::createProcessInfoNodes(uint32_t num_processes) { +inline void Graph::createProcessInfoNodes(uint32_t num_processes) +{ nodes_.reserve(num_processes); // Reserve space for efficiency - for (uint32_t process_id = 0U; process_id < num_processes; ++process_id) { + for (uint32_t process_id = 0U; process_id < num_processes; ++process_id) + { LM_LOG_DEBUG() << "Creating process node with id:" << process_id; nodes_.push_back(std::make_shared()); nodes_.back()->initNode(this, process_id); @@ -83,23 +91,25 @@ inline void Graph::createProcessInfoNodes(uint32_t num_processes) { LM_LOG_DEBUG() << "Created" << nodes_.size() << "process nodes"; } -inline void Graph::createSuccessorLists(IdentifierHash pg_name) { +inline void Graph::createSuccessorLists(IdentifierHash pg_name) +{ LM_LOG_DEBUG() << "Creating successor lists for process group" << pg_name; // Now create the successor lists for each process in this process group - for (auto& node : nodes_) { + for (auto& node : nodes_) + { // If the other process has a dependency on this one, put it on the correct list auto 
node_index = node->getNodeIndex(); const DependencyList* dep_list = pgm_->getConfigurationManager()->getOsProcessDependencies(pg_name, node_index).value_or(nullptr); - if( dep_list ) + if (dep_list) { - for( const Dependency& dep : *dep_list ) + for (const Dependency& dep : *dep_list) { - if( dep.os_process_index_ < nodes_.size() ) + if (dep.os_process_index_ < nodes_.size()) { - nodes_[dep.os_process_index_]->addSuccessorNode( node, dep.process_state_ ); + nodes_[dep.os_process_index_]->addSuccessorNode(node, dep.process_state_); LM_LOG_DEBUG() << "Added successor node dependency:" << dep.os_process_index_ << "->" << node_index; } } @@ -107,31 +117,37 @@ inline void Graph::createSuccessorLists(IdentifierHash pg_name) { } } -void Graph::setState(GraphState new_state) { +void Graph::setState(GraphState new_state) +{ GraphState old_state = getState(); // Notice that this is a private method and by design the states can't be out of range - //if( old_state > GraphState::kUndefinedState || + // if( old_state > GraphState::kUndefinedState || // new_state > GraphState::kUndefinedState ) //{ // LM_LOG_ERROR() << "Incorrect state transition:" << static_cast( old_state ) << "to" // << static_cast( new_state ); //} - //else + // else { score::cpp::span line{state_results[static_cast(new_state)]}; GraphState target_state = new_state; - while (old_state != target_state) { - // coverity[autosar_cpp14_a5_2_5_violation:FALSE] Line is an array of graphstates from state_results. There are no nullptrs inside state_results so a indexing without a check is allowed. - target_state = line.data()[static_cast(old_state)]; // score::cpp::span does not implement operator[] + while (old_state != target_state) + { + // coverity[autosar_cpp14_a5_2_5_violation:FALSE] Line is an array of graphstates from state_results. There + // are no nullptrs inside state_results so a indexing without a check is allowed. 
+ target_state = + line.data()[static_cast(old_state)]; // score::cpp::span does not implement operator[] - if (state_.compare_exchange_strong(old_state, target_state)) { + if (state_.compare_exchange_strong(old_state, target_state)) + { LM_LOG_DEBUG() << "Graph::setState changes from" << toString(old_state) << "to" - << toString(target_state) << "for PG" << pg_index_ << "(" - << requested_state_.pg_name_ << ")"; + << toString(target_state) << "for PG" << pg_index_ << "(" << requested_state_.pg_name_ + << ")"; old_state = target_state; - if (new_state == GraphState::kSuccess) { + if (new_state == GraphState::kSuccess) + { // get state transition end time stamp auto request_end_time = std::chrono::steady_clock::now(); @@ -147,7 +163,8 @@ void Graph::setState(GraphState new_state) { } } -bool Graph::queueHeadNodes(bool start) { +bool Graph::queueHeadNodes(bool start) +{ // Count the number of nodes in this graph starting_ = start; @@ -156,18 +173,22 @@ bool Graph::queueHeadNodes(bool start) { nodes_to_execute_.store(executing_nodes); nodes_in_flight_.store(0); - if (executing_nodes > 0U) { + if (executing_nodes > 0U) + { queueHeadNodesForExecution(); } return (nodes_in_flight_ > 0); } -inline uint32_t Graph::countExecutableNodes(bool start) { +inline uint32_t Graph::countExecutableNodes(bool start) +{ uint32_t executable_nodes = 0U; - for (const auto& node : nodes_) { - if (node->constructGraphNode(start)) { + for (const auto& node : nodes_) + { + if (node->constructGraphNode(start)) + { ++executable_nodes; } } @@ -175,53 +196,74 @@ inline uint32_t Graph::countExecutableNodes(bool start) { return executable_nodes; } -inline void Graph::queueHeadNodesForExecution() { - for (const auto& node : nodes_) { - if (node->isHeadNode()) { +inline void Graph::queueHeadNodesForExecution() +{ + for (const auto& node : nodes_) + { + if (node->isHeadNode()) + { tryQueueNode(node); } } } -inline void Graph::tryQueueNode(const std::shared_ptr& node) { - while 
(GraphState::kInTransition == getState()) { - if (pgm_->getWorkerJobs()->addJobToQueue(node)) { +inline void Graph::tryQueueNode(const std::shared_ptr& node) +{ + while (GraphState::kInTransition == getState()) + { + auto push_res = pgm_->getWorkerJobs()->push(node, kMaxQueueDelay); + if (push_res) + { markNodeInFlight(); break; - } else { - LM_LOG_WARN() << "Failed to add job to queue. Queue may be full or wait time too short."; - // Retry mechanism: continues looping until the job is queued successfully + } + else if (push_res.error() == ConcurrencyErrc::kTimeout) + { + continue; + } + else + { + LM_LOG_WARN() << "Failed to queue node for execution " << push_res.error(); + break; } } } -void Graph::queueStopJobs(const std::vector* process_index_list) { +void Graph::queueStopJobs(const std::vector* process_index_list) +{ // p is not nullptr - guaranteed by caller. // First mark all processes as being not in the requested state - for (auto node : nodes_) { + for (auto node : nodes_) + { node->markRequested(false); } // Then go through the processes in the requested state and mark them true - for (uint32_t index : *process_index_list) { - if (index < nodes_.size()) { + for (uint32_t index : *process_index_list) + { + if (index < nodes_.size()) + { nodes_[index]->markRequested(true); } } - if (!queueHeadNodes(false)) { + if (!queueHeadNodes(false)) + { queueStartJobs(); } } -void Graph::queueStartJobs() { - if (!queueHeadNodes(true)) { +void Graph::queueStartJobs() +{ + if (!queueHeadNodes(true)) + { setState(GraphState::kSuccess); // nothing to do, done nothing, success! 
setPendingEvent(ControlClientCode::kSetStateSuccess); } } -bool Graph::startTransition(ProcessGroupStateID pg_state) { +bool Graph::startTransition(ProcessGroupStateID pg_state) +{ IdentifierHash old_state_name; { std::lock_guard lock(requested_state_mutex_); @@ -231,12 +273,13 @@ bool Graph::startTransition(ProcessGroupStateID pg_state) { const std::vector* process_index_list = pgm_->getConfigurationManager()->getProcessIndexesList(requested_state_).value_or(nullptr); - if (nullptr != process_index_list) { + if (nullptr != process_index_list) + { setState(GraphState::kInTransition); - if( GraphState::kInTransition == getState() ) + if (GraphState::kInTransition == getState()) { - queueStopJobs( process_index_list ); + queueStopJobs(process_index_list); return true; } } @@ -247,12 +290,14 @@ bool Graph::startTransition(ProcessGroupStateID pg_state) { return false; } -bool Graph::startInitialTransition(ProcessGroupStateID pg_state) { +bool Graph::startInitialTransition(ProcessGroupStateID pg_state) +{ is_initial_state_transition_ = true; setRequestStartTime(); bool result = startTransition(pg_state); - if (!result) { + if (!result) + { is_initial_state_transition_ = false; pgm_->setInitialStateTransitionResult(ControlClientCode::kInitialMachineStateFailed); } @@ -260,7 +305,8 @@ bool Graph::startInitialTransition(ProcessGroupStateID pg_state) { return result; } -bool Graph::startTransitionToOffState() { +bool Graph::startTransitionToOffState() +{ // Guaranteed to transition to all off even if there // is no configured "Off" state. 
setRequestStartTime(); @@ -273,104 +319,133 @@ bool Graph::startTransitionToOffState() { if (GraphState::kInTransition == getState()) { std::vector empty_list{}; - queueStopJobs( &empty_list ); + queueStopJobs(&empty_list); result = true; } return result; } -void Graph::nodeExecuted() { +void Graph::nodeExecuted() +{ GraphState current_state = getState(); - if (current_state == GraphState::kInTransition) { + if (current_state == GraphState::kInTransition) + { handleTransitionExecution(); - } else { + } + else + { handleNonTransitionExecution(current_state); } } -inline void Graph::handleTransitionExecution() { - if (nodes_to_execute_.load() > 0U) { +inline void Graph::handleTransitionExecution() +{ + if (nodes_to_execute_.load() > 0U) + { --nodes_in_flight_; - if (0U == --nodes_to_execute_) { - if (starting_) { - if (is_initial_state_transition_) { + if (0U == --nodes_to_execute_) + { + if (starting_) + { + if (is_initial_state_transition_) + { is_initial_state_transition_ = false; pgm_->setInitialStateTransitionResult(ControlClientCode::kInitialMachineStateSuccess); - // RULECHECKER_comment(1, 3, check_c_style_cast, "This is the definition provided by the OS and does a C-style cast.", true) - LM_LOG_DEBUG() - << "clock() at successful initial state transition:" - // coverity[cert_err33_c_violation:INTENTIONAL] Does not matter if clock() gives a weird value in debug messages. - << (static_cast(clock()) / (static_cast(CLOCKS_PER_SEC) / 1000.0)) << "ms"; + // RULECHECKER_comment(1, 3, check_c_style_cast, "This is the definition provided by the OS and does + // a C-style cast.", true) + LM_LOG_DEBUG() << "clock() at successful initial state transition:" + // coverity[cert_err33_c_violation:INTENTIONAL] Does not matter if clock() gives a + // weird value in debug messages. 
+ << (static_cast(clock()) / (static_cast(CLOCKS_PER_SEC) / 1000.0)) + << "ms"; } setState(GraphState::kSuccess); setPendingEvent(ControlClientCode::kSetStateSuccess); - } else { + } + else + { queueStartJobs(); } } } } -inline void Graph::handleNonTransitionExecution(GraphState current_state) { - if (0 >= --nodes_in_flight_) { - if (is_initial_state_transition_) { +inline void Graph::handleNonTransitionExecution(GraphState current_state) +{ + if (0 >= --nodes_in_flight_) + { + if (is_initial_state_transition_) + { is_initial_state_transition_ = false; pgm_->setInitialStateTransitionResult(ControlClientCode::kInitialMachineStateFailed); - // RULECHECKER_comment(1, 3, check_c_style_cast, "This is the definition provided by the OS and does a C-style cast.", true) - LM_LOG_FATAL() - << "clock() at failed initial state transition:" - // coverity[cert_err33_c_violation:INTENTIONAL] Does not matter if clock() gives a weird value in debug messages. - << (static_cast(clock()) / (static_cast(CLOCKS_PER_SEC) / 1000.0)) << "ms"; + // RULECHECKER_comment(1, 3, check_c_style_cast, "This is the definition provided by the OS and does a + // C-style cast.", true) + LM_LOG_FATAL() << "clock() at failed initial state transition:" + // coverity[cert_err33_c_violation:INTENTIONAL] Does not matter if clock() gives a weird + // value in debug messages. 
+ << (static_cast(clock()) / (static_cast(CLOCKS_PER_SEC) / 1000.0)) << "ms"; } setState(GraphState::kUndefinedState); - if (current_state == GraphState::kAborting) { + if (current_state == GraphState::kAborting) + { setPendingEvent(abort_code_); - } else { + } + else + { ControlClientChannel::nudgeControlClientHandler(); } } } -void Graph::abort(uint32_t code, ControlClientCode reason) { - if (getState() < GraphState::kAborting) { +void Graph::abort(uint32_t code, ControlClientCode reason) +{ + if (getState() < GraphState::kAborting) + { setState(GraphState::kAborting); last_execution_error_.store(code); abort_code_.store(reason); } } -void Graph::cancel() { +void Graph::cancel() +{ setState(GraphState::kCancelled); - if (getState() == GraphState::kCancelled) { + if (getState() == GraphState::kCancelled) + { setPendingEvent(ControlClientCode::kSetStateCancelled); } - if (0 == nodes_in_flight_) { - if (is_initial_state_transition_) { + if (0 == nodes_in_flight_) + { + if (is_initial_state_transition_) + { is_initial_state_transition_ = false; - pgm_->setInitialStateTransitionResult( ControlClientCode::kInitialMachineStateFailed ); + pgm_->setInitialStateTransitionResult(ControlClientCode::kInitialMachineStateFailed); // Some may argue that not finishing MachineGF.Startup state transition, is a critical problem. - // Essentially, controller SM is requesting MachineGF.Startup transition, on an action list assigned to its initial state. - // RULECHECKER_comment(1, 3, check_c_style_cast, "This is the definition provided by the OS and does a C-style cast.", true) - LM_LOG_DEBUG() - << "clock() at canceled initial state transition:" - // coverity[cert_err33_c_violation:INTENTIONAL] Does not matter if clock() gives a weird value in debug messages. - << (static_cast(clock()) / (static_cast(CLOCKS_PER_SEC) / 1000.0)) << "ms"; + // Essentially, controller SM is requesting MachineGF.Startup transition, on an action list assigned to its + // initial state. 
RULECHECKER_comment(1, 3, check_c_style_cast, "This is the definition provided by the OS + // and does a C-style cast.", true) + LM_LOG_DEBUG() << "clock() at canceled initial state transition:" + // coverity[cert_err33_c_violation:INTENTIONAL] Does not matter if clock() gives a weird + // value in debug messages. + << (static_cast(clock()) / (static_cast(CLOCKS_PER_SEC) / 1000.0)) << "ms"; } setState(GraphState::kUndefinedState); } } -void Graph::setStateManager(ControlClientID& control_client_id) { +void Graph::setStateManager(ControlClientID& control_client_id) +{ ControlClientCode code = getPendingEvent(); - if (code != ControlClientCode::kNotSet) { + if (code != ControlClientCode::kNotSet) + { cancel_message_.process_group_state_ = requested_state_; cancel_message_.originating_control_client_ = last_state_manager_; cancel_message_.request_or_response_ = code; @@ -379,67 +454,82 @@ void Graph::setStateManager(ControlClientID& control_client_id) { last_state_manager_ = control_client_id; } -std::shared_ptr Graph::getProcessInfoNode(uint32_t process_index) { +std::shared_ptr Graph::getProcessInfoNode(uint32_t process_index) +{ std::shared_ptr node{}; - if (process_index < nodes_.size()) { + if (process_index < nodes_.size()) + { node = nodes_[process_index]; } return node; } -ProcessGroupManager* Graph::getProcessGroupManager() { +ProcessGroupManager* Graph::getProcessGroupManager() +{ return pgm_; } -IdentifierHash Graph::getProcessGroupName() { +IdentifierHash Graph::getProcessGroupName() +{ return requested_state_.pg_name_; } -bool Graph::isStarting() const { +bool Graph::isStarting() const +{ return starting_; } -void Graph::markNodeInFlight() { +void Graph::markNodeInFlight() +{ ++nodes_in_flight_; } -GraphState Graph::getState() const { +GraphState Graph::getState() const +{ return state_.load(); } -IdentifierHash Graph::getProcessGroupState() { +IdentifierHash Graph::getProcessGroupState() +{ std::lock_guard lock(requested_state_mutex_); return 
requested_state_.pg_state_name_; } -uint32_t Graph::getProcessGroupIndex() { +uint32_t Graph::getProcessGroupIndex() +{ return pg_index_; } -NodeList& Graph::getNodes() { +NodeList& Graph::getNodes() +{ return nodes_; } -ControlClientID Graph::getStateManager() { +ControlClientID Graph::getStateManager() +{ return last_state_manager_; } -uint32_t Graph::getLastExecutionError() { +uint32_t Graph::getLastExecutionError() +{ return last_execution_error_.load(); } -void Graph::setLastExecutionError(uint32_t code) { +void Graph::setLastExecutionError(uint32_t code) +{ last_execution_error_.store(code); } -IdentifierHash Graph::setPendingState(IdentifierHash new_state) { +IdentifierHash Graph::setPendingState(IdentifierHash new_state) +{ IdentifierHash result_state = pending_state_; pending_state_ = new_state; - if (new_state != result_state) { + if (new_state != result_state) + { LM_LOG_DEBUG() << "Pending state for process group" << requested_state_.pg_name_ << "changed from" << result_state << "to" << pending_state_; } @@ -447,29 +537,36 @@ IdentifierHash Graph::setPendingState(IdentifierHash new_state) { return result_state; } -IdentifierHash Graph::getPendingState() { +IdentifierHash Graph::getPendingState() +{ return pending_state_; } -ControlClientCode Graph::getPendingEvent() { +ControlClientCode Graph::getPendingEvent() +{ return event_.load(); } -void Graph::clearPendingEvent(ControlClientCode expected) { +void Graph::clearPendingEvent(ControlClientCode expected) +{ (void)event_.compare_exchange_strong(expected, ControlClientCode::kNotSet); } -void Graph::setPendingEvent(ControlClientCode event) { +void Graph::setPendingEvent(ControlClientCode event) +{ event_.store(event); ControlClientChannel::nudgeControlClientHandler(); } -ControlClientMessage& Graph::getCancelMessage() { +ControlClientMessage& Graph::getCancelMessage() +{ return cancel_message_; } -const char* Graph::toString(GraphState state) { - switch (state) { +const char* Graph::toString(GraphState 
state) +{ + switch (state) + { case GraphState::kAborting: return "kAborting"; @@ -490,16 +587,18 @@ const char* Graph::toString(GraphState state) { } } -void Graph::setRequestStartTime() { +void Graph::setRequestStartTime() +{ request_start_time_ = std::chrono::steady_clock::now(); } -std::chrono::time_point Graph::getRequestStartTime() { +std::chrono::time_point Graph::getRequestStartTime() +{ return request_start_time_; } -} // namespace lcm - } // namespace internal +} // namespace lcm + } // namespace score diff --git a/src/launch_manager_daemon/src/process_group_manager/jobqueue.cpp b/src/launch_manager_daemon/src/process_group_manager/jobqueue.cpp deleted file mode 100644 index 8bad7e9d..00000000 --- a/src/launch_manager_daemon/src/process_group_manager/jobqueue.cpp +++ /dev/null @@ -1,90 +0,0 @@ -/******************************************************************************** - * Copyright (c) 2025 Contributors to the Eclipse Foundation - * - * See the NOTICE file(s) distributed with this work for additional - * information regarding copyright ownership. 
- * - * This program and the accompanying materials are made available under the - * terms of the Apache License Version 2.0 which is available at - * https://www.apache.org/licenses/LICENSE-2.0 - * - * SPDX-License-Identifier: Apache-2.0 - ********************************************************************************/ - -#include -#include -#include -#include "jobqueue.hpp" - -namespace score { - -namespace lcm { - -namespace internal { - -template -JobQueue::JobQueue(std::size_t capacity) - : num_items_(), num_spaces_(), in_index_(0), out_index_(0), capacity_(capacity), the_items_(capacity) { - num_items_.init(0U, false); - num_spaces_.init(static_cast(capacity & 0xFFFFFFFFUL), false); -} - -template -JobQueue::~JobQueue() { - num_spaces_.deinit(); - num_items_.deinit(); -} - -template -std::shared_ptr JobQueue::getJobFromQueue() { - std::shared_ptr result; - - if (osal::OsalReturnType::kSuccess == num_items_.wait() && is_running_) { - std::size_t index = static_cast(out_index_.fetch_add(1U, std::memory_order_relaxed)) % capacity_; - - result = std::atomic_load_explicit(&the_items_[index], std::memory_order_acquire); - num_spaces_.post(); - } - - return result; -} - -template -bool JobQueue::addJobToQueue(std::shared_ptr job) { - bool result = false; - - if (osal::OsalReturnType::kSuccess == num_spaces_.timedWait(score::lcm::internal::kMaxQueueDelay)) { - std::size_t index = static_cast(in_index_.fetch_add(1U, std::memory_order_relaxed)) % capacity_; - - std::atomic_store_explicit(&the_items_[index], job, std::memory_order_release); - num_items_.post(); - result = true; - } - - return result; -} - -template -void JobQueue::stopQueue(std::size_t nr_threads) { - is_running_ = false; - - // Wake up all threads servicing this queue. - // We do this by calling post as many times as we have threads. 
- for (std::size_t i = 0; i < nr_threads; ++i) { - num_items_.post(); - } -} - -template -bool JobQueue::isRunning() const { - return is_running_; -} - -class ProcessInfoNode; -template class JobQueue; - -} // namespace lcm - -} // namespace internal - -} // namespace score diff --git a/src/launch_manager_daemon/src/process_group_manager/jobqueue.hpp b/src/launch_manager_daemon/src/process_group_manager/jobqueue.hpp deleted file mode 100644 index bc5aac50..00000000 --- a/src/launch_manager_daemon/src/process_group_manager/jobqueue.hpp +++ /dev/null @@ -1,108 +0,0 @@ -/******************************************************************************** - * Copyright (c) 2025 Contributors to the Eclipse Foundation - * - * See the NOTICE file(s) distributed with this work for additional - * information regarding copyright ownership. - * - * This program and the accompanying materials are made available under the - * terms of the Apache License Version 2.0 which is available at - * https://www.apache.org/licenses/LICENSE-2.0 - * - * SPDX-License-Identifier: Apache-2.0 - ********************************************************************************/ - - -#ifndef JOB_QUEUE_HPP_INCLUDED -#define JOB_QUEUE_HPP_INCLUDED - -#include -#include - -#include -#include - -namespace score { - -namespace lcm { - -namespace internal { - -/// @brief A thread-safe job queue for managing and distributing jobs to worker threads. -/// The JobQueue class allows multiple threads to enqueue jobs, which are then executed -/// by multiple worker threads. It ensures thread safety while handling job pointers. -/// @tparam T The type of the items in the job queue (pointers to T). -template -class JobQueue final { - public: - /// @brief Constructs a JobQueue with a specified capacity. - /// Initializes the job queue to handle a maximum number of items defined by the capacity. - /// @param capacity The maximum number of items the queue can hold. 
- explicit JobQueue(std::size_t capacity); - - ///@brief Destructor that stops and destroy the timeout thread, ensuring no resources are left dangling. - ~JobQueue(); - - // Rule of five - /// @brief Copy constructor is deleted to prevent copying. - JobQueue(const JobQueue&) = delete; - - /// @brief Copy assignment operator is deleted to prevent copying. - JobQueue& operator=(const JobQueue&) = delete; - - /// @brief Move constructor is deleted to prevent moving. - JobQueue(JobQueue&&) = delete; - - /// @brief Move assignment operator is deleted to prevent moving. - JobQueue& operator=(JobQueue&&) = delete; - - /// @brief Adds a job to the queue, waiting for space to become available if necessary. - /// This operation is thread-safe and will wait up to kMaxQueueDelay milliseconds for space to become available. - /// @param job The item to add to the queue. - /// @return true if the job was successfully added, false if the operation timed out. - bool addJobToQueue(std::shared_ptr job); - - /// @brief Retrieves a job from the queue, waiting if necessary until a job is available. - /// This operation is thread-safe and will wait up to kMaxQueueDelay milliseconds for a job to become available. - /// @return A pointer to the job, or nullptr if the operation timed out. - std::shared_ptr getJobFromQueue(); - - /// @brief Stops the job queue, unblocking any waiting threads. - /// This method sets the running state to false and posts to the item semaphore - /// to unblock any threads waiting to retrieve jobs. - /// @param nr_threads The number of threads to unblock. - void stopQueue(std::size_t nr_threads); - - /// @brief Checks if the job queue is currently running. 
- /// @return - bool isRunning() const; - - private: - /// @brief Semaphore used to wait for an item to remove from the queue - osal::Semaphore num_items_; - - /// @brief Semaphore used to wait for space to place an item in the queue - osal::Semaphore num_spaces_; - - /// @brief Index of the next free space in the queue - std::atomic_uint32_t in_index_{0U}; - - /// @brief Index of the next available item in the queue - std::atomic_uint32_t out_index_{0U}; - - /// @brief The maximum capacity of the queue. - std::size_t capacity_; - - /// @brief Array of items that forms the queue - std::vector> the_items_{}; - - /// @brief Atomic flag indicating whether the queue is running - std::atomic_bool is_running_{true}; -}; - -} // namespace lcm - -} // namespace internal - -} // namespace score - -#endif /// JOB_QUEUE_HPP_INCLUDED diff --git a/src/launch_manager_daemon/src/process_group_manager/processgroupmanager.cpp b/src/launch_manager_daemon/src/process_group_manager/processgroupmanager.cpp index 520e6ef9..5c9a1b74 100644 --- a/src/launch_manager_daemon/src/process_group_manager/processgroupmanager.cpp +++ b/src/launch_manager_daemon/src/process_group_manager/processgroupmanager.cpp @@ -20,13 +20,7 @@ #include #include -namespace score -{ - -namespace lcm -{ - -namespace internal +namespace score::lcm::internal { using namespace score::lcm::internal::osal; @@ -238,8 +232,8 @@ inline void ProcessGroupManager::createProcessComponentsObjects() LM_LOG_DEBUG() << "Creating Safe Process Map with" << total_processes_ << "entries"; process_map_ = std::make_shared(total_processes_); - LM_LOG_DEBUG() << "Creating job queue with" << total_processes_ << "entries"; - worker_jobs_ = std::make_shared>(total_processes_); + LM_LOG_DEBUG() << "Creating job queue with capacity" << static_cast(ProcessLimits::kMaxProcesses); + worker_jobs_ = std::make_shared(); LM_LOG_DEBUG() << "Creating worker threads..."; worker_threads_ = std::make_unique>( @@ -364,7 +358,7 @@ inline void 
ProcessGroupManager::allProcessGroupsOff() if (!waitForStateCompletion(process_groups_, GraphState::kInTransition, 1000)) { LM_LOG_ERROR() << "NOTE: Transition to Off state timed out"; - worker_jobs_->stopQueue(static_cast(ProcessLimits::kNumWorkerThreads)); + worker_threads_->stop(); for (auto& pg : process_groups_) { for (auto& node : pg->getNodes()) @@ -704,7 +698,7 @@ inline void ProcessGroupManager::processGroupHandler(Graph& pg) } } - if (GraphState::kUndefinedState == pg.getState()) + if (GraphState::kUndefinedState == pg.getState()) { // at the moment graph is not running... // i.e. it is not in kInTransition, kAborting or kCancelled state @@ -785,13 +779,9 @@ std::shared_ptr ProcessGroupManager::getProcessMap() return process_map_; } -std::shared_ptr> ProcessGroupManager::getWorkerJobs() +std::shared_ptr ProcessGroupManager::getWorkerJobs() { return worker_jobs_; } -} // namespace internal - -} // namespace lcm - -} // namespace score +} // namespace score::lcm::internal diff --git a/src/launch_manager_daemon/src/process_group_manager/processgroupmanager.hpp b/src/launch_manager_daemon/src/process_group_manager/processgroupmanager.hpp index fd72bcb6..2143c9b0 100644 --- a/src/launch_manager_daemon/src/process_group_manager/processgroupmanager.hpp +++ b/src/launch_manager_daemon/src/process_group_manager/processgroupmanager.hpp @@ -24,7 +24,7 @@ #include #include #include -#include +#include #include #include #include @@ -32,12 +32,10 @@ #include #include #include +#include -namespace score { -namespace lcm { - -namespace internal { +namespace score::lcm::internal { /// @brief ProcessGroupManager provides the core functionality of LCM. /// Software that is deployed to the machine, should be managed through Process Groups. @@ -52,6 +50,8 @@ namespace internal { /// Interaction with OSAL to discover when processes terminated in an unexpected way. 
/// Fulfilling PG State transitions requests from SM, as well as informing SM about unexpected problems (for example process crashes). class ProcessGroupManager final { + using WorkerQueue = MPMCConcurrentQueue, + static_cast(ProcessLimits::kMaxProcesses)>; public: /// @brief Constructs a new ProcessGroupManager object. /// @@ -118,8 +118,8 @@ class ProcessGroupManager final { std::shared_ptr getProcessMap(); /// @brief Gets the job queue for worker threads. - /// @return Shared pointer to the JobQueue object for ProcessInfoNode jobs. - std::shared_ptr> getWorkerJobs(); + /// @return Shared pointer to the MpmcQueue object for ProcessInfoNode jobs. + std::shared_ptr getWorkerJobs(); /// @brief Calls QueuePosixProcess method of psn data member /// @details Writes via IPC the latest Process State change, so that PHM can be informed about it. @@ -271,7 +271,7 @@ class ProcessGroupManager final { std::unique_ptr> worker_threads_; /// @brief Shared pointer to the job queue for ProcessInfoNode jobs. - std::shared_ptr> worker_jobs_; + std::shared_ptr worker_jobs_; /// @brief Total number of processes. 
/// @deprecated there is no reason to store the total number of processes in the class @@ -304,10 +304,7 @@ class ProcessGroupManager final { std::shared_ptr recovery_client_{}; }; -} // namespace lcm - -} // namespace internal +} // namespace score::lcm::internal -} // namespace score #endif /// PROCESSGROUPMANAGER_HPP_INCLUDED diff --git a/src/launch_manager_daemon/src/process_group_manager/processinfonode.cpp b/src/launch_manager_daemon/src/process_group_manager/processinfonode.cpp index 57f2adb0..ffd9628b 100644 --- a/src/launch_manager_daemon/src/process_group_manager/processinfonode.cpp +++ b/src/launch_manager_daemon/src/process_group_manager/processinfonode.cpp @@ -11,20 +11,25 @@ * SPDX-License-Identifier: Apache-2.0 ********************************************************************************/ -#include -#include +#include "processinfonode.hpp" #include "graph.hpp" #include "processgroupmanager.hpp" -#include "processinfonode.hpp" +#include +#include -namespace score { +namespace score +{ -namespace lcm { +namespace lcm +{ -namespace internal { +namespace internal +{ -void ProcessInfoNode::initNode(Graph* graph, uint32_t index) { - if (graph) { +void ProcessInfoNode::initNode(Graph* graph, uint32_t index) +{ + if (graph) + { IdentifierHash pg = graph->getProcessGroupName(); graph_ = graph; process_index_ = index; @@ -38,28 +43,36 @@ void ProcessInfoNode::initNode(Graph* graph, uint32_t index) { start_dependencies_ = 0U; auto cfg_mgr = graph_->getProcessGroupManager()->getConfigurationManager(); config_ = cfg_mgr->getOsProcessConfiguration(pg, index).value_or(nullptr); - if (config_) { - if (osal::CommsType::kLaunchManager == config_->startup_config_.comms_type_) { + if (config_) + { + if (osal::CommsType::kLaunchManager == config_->startup_config_.comms_type_) + { graph_->getProcessGroupManager()->setLaunchManagerConfiguration(config_); } dependency_list_ = cfg_mgr->getOsProcessDependencies(pg, process_index_).value_or(nullptr); - if (dependency_list_) 
{ + if (dependency_list_) + { start_dependencies_ = static_cast(dependency_list_->size() & 0xFFFFFFFFUL); LM_LOG_DEBUG() << "Process" << process_index_ << "has" << start_dependencies_ << "start dependencies"; } - } else { + } + else + { LM_LOG_ERROR() << "No configuration for process" << process_index_ << "of process group" << pg; } } } -bool ProcessInfoNode::constructGraphNode(bool starting) { +bool ProcessInfoNode::constructGraphNode(bool starting) +{ bool included = false; - if (!starting) { + if (!starting) + { std::ptrdiff_t count = - std::count_if(dependent_on_running_.begin(), dependent_on_running_.end(), - [](auto& d) -> bool { return d->process_state_ == score::lcm::ProcessState::kRunning; }); + std::count_if(dependent_on_running_.begin(), dependent_on_running_.end(), [](auto& d) -> bool { + return d->process_state_ == score::lcm::ProcessState::kRunning; + }); if (count > 0L) stop_dependencies_ = static_cast(count & 0xFFFFFFFFL); @@ -71,9 +84,12 @@ bool ProcessInfoNode::constructGraphNode(bool starting) { dependencies_ = stop_dependencies_; // A stop node should be inserted for processes not in the idle or terminated state where: // The process is not listed in the requested state - included = - !((getState() == score::lcm::ProcessState::kIdle) || (getState() == score::lcm::ProcessState::kTerminated)) && !in_requested_state_; - } else { + included = !((getState() == score::lcm::ProcessState::kIdle) || + (getState() == score::lcm::ProcessState::kTerminated)) && + !in_requested_state_; + } + else + { LM_LOG_DEBUG() << "Start Dependencies:" << start_dependencies_; dependencies_ = start_dependencies_; // The process should be started (node inserted) if @@ -83,9 +99,12 @@ bool ProcessInfoNode::constructGraphNode(bool starting) { // Go through the predecessors nodes to check if those are already in the ExecutionState // that has been configured as part of the execution dependency - if (dependency_list_) { - for (const auto& dep : *dependency_list_) { - if 
(dep.process_state_ == graph_->getNodes()[dep.os_process_index_]->getState()) { + if (dependency_list_) + { + for (const auto& dep : *dependency_list_) + { + if (dep.process_state_ == graph_->getNodes()[dep.os_process_index_]->getState()) + { --dependencies_; } } @@ -97,36 +116,50 @@ bool ProcessInfoNode::constructGraphNode(bool starting) { return included; } -void ProcessInfoNode::addSuccessorNode(std::shared_ptr& successor_node, score::lcm::ProcessState dependency) { - if (dependency == score::lcm::ProcessState::kTerminated) { +void ProcessInfoNode::addSuccessorNode(std::shared_ptr& successor_node, + score::lcm::ProcessState dependency) +{ + if (dependency == score::lcm::ProcessState::kTerminated) + { LM_LOG_DEBUG() << "Adding kTerminated for process" << process_index_ << ":" << successor_node->process_index_; dependent_on_terminating_.push_back(successor_node); - } else if (dependency == score::lcm::ProcessState::kRunning) { + } + else if (dependency == score::lcm::ProcessState::kRunning) + { dependent_on_running_.push_back(successor_node); LM_LOG_DEBUG() << "Adding kRunning successor for process" << process_index_ << ":" << successor_node->process_index_; - } else { + } + else + { // all other dependency types are forbidden! 
LM_LOG_ERROR() << "Invalid dependency type for process" << process_index_ << ":" << static_cast(dependency); } } -bool ProcessInfoNode::setState(score::lcm::ProcessState new_state) { +bool ProcessInfoNode::setState(score::lcm::ProcessState new_state) +{ bool success = true; score::lcm::ProcessState old_state = getState(); if (score::lcm::ProcessState::kTerminated == new_state || - (new_state == score::lcm::ProcessState::kIdle && old_state == score::lcm::ProcessState::kTerminated)) { + (new_state == score::lcm::ProcessState::kIdle && old_state == score::lcm::ProcessState::kTerminated)) + { process_state_.store(new_state); - } else if (new_state >= old_state) { + } + else if (new_state >= old_state) + { success = process_state_.compare_exchange_strong(old_state, new_state); - } else { + } + else + { success = false; } if (success && config_->startup_config_.comms_type_ != osal::CommsType::kNoComms && - score::lcm::ProcessState::kIdle != new_state) { + score::lcm::ProcessState::kIdle != new_state) + { // for a reporting process, report a process state change to PHM score::lcm::PosixProcess process_info; process_info.id = config_->process_id_; @@ -138,21 +171,24 @@ bool ProcessInfoNode::setState(score::lcm::ProcessState new_state) { // b) &p.systemClockTimeStamp points outside the accessible address space, but it does not static_cast(clock_gettime(CLOCK_MONOTONIC, &process_info.systemClockTimestamp)); // Note that we ignore the return value from QueuePosixProcess. - // An error would indicate that PHM is not reading values fast enough from the shared memory; the buffer over-run - // should be visible at the PHM side and handled there. If PHM is not responding do we need to handle this? - // If PHM terminates state manager will be informed in any case. + // An error would indicate that PHM is not reading values fast enough from the shared memory; the buffer + // over-run should be visible at the PHM side and handled there. 
If PHM is not responding do we need to handle + // this? If PHM terminates state manager will be informed in any case. static_cast(graph_->getProcessGroupManager()->queuePosixProcess(process_info)); } return success; } -void ProcessInfoNode::queueTerminationSuccessorJobs() { +void ProcessInfoNode::queueTerminationSuccessorJobs() +{ auto processJob = [&](std::shared_ptr successor_node) { - if (successor_node->is_included_ && successor_node->dependencies_ > 0U && - --successor_node->dependencies_ == 0U) { - while (graph_->getState() == GraphState::kInTransition) { - if (graph_->getProcessGroupManager()->getWorkerJobs()->addJobToQueue(successor_node)) { + if (successor_node->is_included_ && successor_node->dependencies_ > 0U && --successor_node->dependencies_ == 0U) + { + while (graph_->getState() == GraphState::kInTransition) + { + if (graph_->getProcessGroupManager()->getWorkerJobs()->push(successor_node, kMaxQueueDelay)) + { graph_->markNodeInFlight(); break; } @@ -160,91 +196,114 @@ void ProcessInfoNode::queueTerminationSuccessorJobs() { } }; - if (graph_->isStarting()) { - for (auto& successor_node : dependent_on_terminating_) { + if (graph_->isStarting()) + { + for (auto& successor_node : dependent_on_terminating_) + { processJob(successor_node); } - } else if (dependency_list_) // Successors given by our dependencies + } + else if (dependency_list_) // Successors given by our dependencies { - for (const auto& dependency : *dependency_list_) { + for (const auto& dependency : *dependency_list_) + { auto successorNode = graph_->getProcessInfoNode(dependency.os_process_index_); - if (successorNode->getState() != score::lcm::ProcessState::kTerminated) { + if (successorNode->getState() != score::lcm::ProcessState::kTerminated) + { processJob(successorNode); } } } } -void ProcessInfoNode::unexpectedTermination() { +void ProcessInfoNode::unexpectedTermination() +{ LM_LOG_WARN() << "unexpected termination of process" << process_index_ << "pid" << pid_ << "(" << 
config_->startup_config_.short_name_ << ")"; uint32_t execution_error_code = config_->pgm_config_.execution_error_code_; auto graph_state = graph_->getState(); - if (GraphState::kSuccess == graph_state) { + if (GraphState::kSuccess == graph_state) + { // We were in a defined state, this error needs to be reported to SM graph_->abort(execution_error_code, ControlClientCode::kFailedUnexpectedTermination); - } else if (score::lcm::ProcessState::kStarting == getState()) { + } + else if (score::lcm::ProcessState::kStarting == getState()) + { // for graph in any other state, the error will be found elsewhere. But if the graph is in // transition, and the process status is not yet kRunning, we may want to post on the semaphore // to save a little waiting time. auto sync = sync_; // take a copy as the pointer otherwise may become invalidated - if (sync) { + if (sync) + { // note that we ignore the return code. The semaphore operation may fail because it could // be destroyed by another thread static_cast(sync->send_sync_.post()); } - } else if (GraphState::kInTransition == graph_state) { + } + else if (GraphState::kInTransition == graph_state) + { // process has started, but graph is still in transition graph_->abort(execution_error_code, ControlClientCode::kFailedUnexpectedTerminationOnEnter); } } -void ProcessInfoNode::terminated(int32_t process_status) { - LM_LOG_DEBUG() << "Child process" << process_index_ << "of" << graph_->getProcessGroupName() << "pid" - << pid_ << "(" << config_->startup_config_.short_name_ << ") for node" << this - << "terminated with status" << process_status; +void ProcessInfoNode::terminated(int32_t process_status) +{ + LM_LOG_DEBUG() << "Child process" << process_index_ << "of" << graph_->getProcessGroupName() << "pid" << pid_ << "(" + << config_->startup_config_.short_name_ << ") for node" << this << "terminated with status" + << process_status; status_ = process_status; - if (!config_->pgm_config_.is_self_terminating_ || (process_status != 
0)) { + if (!config_->pgm_config_.is_self_terminating_ || (process_status != 0)) + { // fudge the status if the process is not self-terminating but has still exited // with zero status: - if (0 == status_) { + if (0 == status_) + { status_ = -1; } - if (graph_->isStarting()) { + if (graph_->isStarting()) + { unexpectedTermination(); } } static_cast(setState(score::lcm::ProcessState::kTerminated)); // Cannot fail by design - if (control_client_channel_) { + if (control_client_channel_) + { control_client_channel_->releaseParentMapping(); control_client_channel_.reset(); } // Handle the situation where the graph is stalled waiting for a process to terminate - if (config_->pgm_config_.is_self_terminating_ && dependent_on_terminating_.size()) { + if (config_->pgm_config_.is_self_terminating_ && dependent_on_terminating_.size()) + { queueTerminationSuccessorJobs(); } // handle the situation where a worker thread is waiting for a process to terminate - if (has_semaphore_.exchange(false)) { + if (has_semaphore_.exchange(false)) + { terminator_.post(); } } -void ProcessInfoNode::startProcess() { +void ProcessInfoNode::startProcess() +{ LM_LOG_DEBUG() << "Starting process" << process_index_ << "(" << config_->startup_config_.short_name_ << ") from executable" << config_->startup_config_.executable_path_; restart_counter_ = config_->pgm_config_.number_of_restart_attempts; - do { + do + { status_ = 0; - if (setState(score::lcm::ProcessState::kIdle)) { + if (setState(score::lcm::ProcessState::kIdle)) + { uint32_t execution_error_code = config_->pgm_config_.execution_error_code_; auto pg_mgr = graph_->getProcessGroupManager(); pid_ = 0; status_ = 0; static_cast(setState(score::lcm::ProcessState::kStarting)); // Cannot fail by design - if (osal::CommsType::kLaunchManager == config_->startup_config_.comms_type_) { + if (osal::CommsType::kLaunchManager == config_->startup_config_.comms_type_) + { // Don't start launch manager, we're already running LM_LOG_DEBUG() << "Found myself 
(" << config_->startup_config_.argv_[0U] << ") in a process group to start, not starting, reporting kRunning"; @@ -255,15 +314,19 @@ void ProcessInfoNode::startProcess() { } if (osal::OsalReturnType::kSuccess == - pg_mgr->getProcessInterface()->startProcess(&pid_, &sync_, &config_->startup_config_)) { + pg_mgr->getProcessInterface()->startProcess(&pid_, &sync_, &config_->startup_config_)) + { LM_LOG_DEBUG() << "startProcess pid" << pid_ << "received for process:" << config_->startup_config_.short_name_; - if (osal::CommsType::kControlClient == config_->startup_config_.comms_type_) { + if (osal::CommsType::kControlClient == config_->startup_config_.comms_type_) + { setupControlClientChannel(); } handleProcessStarted(execution_error_code); - } else { + } + else + { setState(score::lcm::ProcessState::kTerminated); graph_->abort(execution_error_code, ControlClientCode::kSetStateFailed); } @@ -274,61 +337,76 @@ void ProcessInfoNode::startProcess() { << config_->startup_config_.short_name_ << ") done"; } -inline void ProcessInfoNode::setupControlClientChannel() { +inline void ProcessInfoNode::setupControlClientChannel() +{ // Make sure we store the control_client_channel before waiting for kRunning - std::atomic_store(&control_client_channel_,ControlClientChannel::getControlClientChannel(sync_)); + std::atomic_store(&control_client_channel_, ControlClientChannel::getControlClientChannel(sync_)); - if (control_client_channel_) { // Put it at the front of the list if it's not there already + if (control_client_channel_) + { // Put it at the front of the list if it's not there already auto node0 = graph_->getNodes()[0U]; - if (this != node0.get()) { + if (this != node0.get()) + { std::atomic_store(&next_state_manager_, node0->next_state_manager_); std::atomic_store(&node0->next_state_manager_, graph_->getNodes()[process_index_]); } } } -void ProcessInfoNode::handleProcessStillStarting(uint32_t execution_error_code) { - if (graph_->getState() == GraphState::kInTransition) { 
+void ProcessInfoNode::handleProcessStillStarting(uint32_t execution_error_code) +{ + if (graph_->getState() == GraphState::kInTransition) + { if (((osal::CommsType::kNoComms == config_->startup_config_.comms_type_) || (graph_->getProcessGroupManager()->getProcessInterface()->waitForkRunning( sync_, config_->pgm_config_.startup_timeout_ms_) == osal::OsalReturnType::kSuccess)) && - (0 == status_)) { + (0 == status_)) + { handleProcessRunning(execution_error_code); - } else // process is reporting, result is kFail or status is not zero (indicating the process has exited badly) + } + else // process is reporting, result is kFail or status is not zero (indicating the process has exited badly) { LM_LOG_WARN() << "Got kRunning timeout for process" << process_index_ << "(" << config_->startup_config_.short_name_ << ")"; ControlClientCode errcode = status_ ? ControlClientCode::kFailedUnexpectedTerminationOnEnter : ControlClientCode::kSetStateFailed; terminateProcess(); - if (0U == restart_counter_) { + if (0U == restart_counter_) + { graph_->abort(execution_error_code, errcode); } } } } -void ProcessInfoNode::handleProcessAlreadyTerminated(uint32_t execution_error_code) { - if ((0 != status_) || (osal::CommsType::kNoComms != config_->startup_config_.comms_type_)) { +void ProcessInfoNode::handleProcessAlreadyTerminated(uint32_t execution_error_code) +{ + if ((0 != status_) || (osal::CommsType::kNoComms != config_->startup_config_.comms_type_)) + { // Error. 
To get a legal terminated before kRunning the process must be self-terminating, non-reporting // and to have exited with zero status LM_LOG_WARN() << "Got process termination before kRunning for pid" << pid_ << "(" << config_->startup_config_.short_name_ << ") process" << process_index_ << "of group" << graph_->getProcessGroupName(); - //This will cause the graph to fail, we must report to SM (unless we have restart attempts left) - if (0U == restart_counter_) { + // This will cause the graph to fail, we must report to SM (unless we have restart attempts left) + if (0U == restart_counter_) + { graph_->abort(execution_error_code, ControlClientCode::kFailedUnexpectedTerminationOnEnter); } - } else { + } + else + { // case of a self-terminating, non-reporting process exiting nicely before we've had a chance to put an // entry in the map queueTerminationSuccessorJobs(); } } -void ProcessInfoNode::handleProcessStarted(uint32_t execution_error_code) { - switch (graph_->getProcessGroupManager()->getProcessMap()->insertIfNotTerminated(pid_, this)) { +void ProcessInfoNode::handleProcessStarted(uint32_t execution_error_code) +{ + switch (graph_->getProcessGroupManager()->getProcessMap()->insertIfNotTerminated(pid_, this)) + { case 0: // Normal case, entry was put in the map, process still running handleProcessStillStarting(execution_error_code); break; @@ -345,12 +423,16 @@ void ProcessInfoNode::handleProcessStarted(uint32_t execution_error_code) { } } -void ProcessInfoNode::handleProcessRunning(uint32_t execution_error_code) { - if (osal::CommsType::kNoComms == config_->startup_config_.comms_type_) { +void ProcessInfoNode::handleProcessRunning(uint32_t execution_error_code) +{ + if (osal::CommsType::kNoComms == config_->startup_config_.comms_type_) + { LM_LOG_DEBUG() << "Considered kRunning for Non Reporting Process pid" << pid_ << "(" << config_->startup_config_.short_name_ << ") process" << process_index_ << "of group" << graph_->getProcessGroupName(); - } else { + } + 
else + { LM_LOG_DEBUG() << "Got kRunning for pid" << pid_ << "(" << config_->startup_config_.short_name_ << ") process" << process_index_ << "of group" << graph_->getProcessGroupName(); } @@ -359,9 +441,12 @@ void ProcessInfoNode::handleProcessRunning(uint32_t execution_error_code) { // Therefore, a process in the terminated state is a new error not related to process // starting (and so not eligible for a restart), or it's OK because its a self- // terminating process. - if (setState(score::lcm::ProcessState::kRunning) || (config_->pgm_config_.is_self_terminating_ && (0 == status_))) { + if (setState(score::lcm::ProcessState::kRunning) || (config_->pgm_config_.is_self_terminating_ && (0 == status_))) + { processSuccessorNodes(); - } else if (restart_counter_ == 0U) { + } + else if (restart_counter_ == 0U) + { graph_->abort(execution_error_code, ControlClientCode::kSetStateFailed); } @@ -369,60 +454,85 @@ void ProcessInfoNode::handleProcessRunning(uint32_t execution_error_code) { // OSHandler thread, when terminated() is called } -void ProcessInfoNode::processSuccessorNodes() { - for (auto& successor_node : dependent_on_running_) { - if (successor_node->is_included_ && successor_node->dependencies_ > 0U) { +void ProcessInfoNode::processSuccessorNodes() +{ + for (auto& successor_node : dependent_on_running_) + { + if (successor_node->is_included_ && successor_node->dependencies_ > 0U) + { checkForEmptyDependencies(successor_node); } } } -void ProcessInfoNode::checkForEmptyDependencies(std::shared_ptr& successor_node) { - if (0U == --successor_node->dependencies_) { - while (graph_->getState() == GraphState::kInTransition) { - if (graph_->getProcessGroupManager()->getWorkerJobs()->addJobToQueue(successor_node)) { +void ProcessInfoNode::checkForEmptyDependencies(std::shared_ptr& successor_node) +{ + if (0U == --successor_node->dependencies_) + { + while (graph_->getState() == GraphState::kInTransition) + { + auto push_res = 
graph_->getProcessGroupManager()->getWorkerJobs()->push(successor_node, kMaxQueueDelay); + if (push_res) + { graph_->markNodeInFlight(); break; } + else if (push_res.error() == ConcurrencyErrc::kTimeout) + { + continue; + } + else + { + break; + } } } } -void ProcessInfoNode::terminateProcess() { +void ProcessInfoNode::terminateProcess() +{ LM_LOG_DEBUG() << "terminating process" << process_index_ << "(" << config_->startup_config_.short_name_ << ")"; - if (setState(score::lcm::ProcessState::kTerminating)) { - if (osal::CommsType::kLaunchManager == config_->startup_config_.comms_type_) { + if (setState(score::lcm::ProcessState::kTerminating)) + { + if (osal::CommsType::kLaunchManager == config_->startup_config_.comms_type_) + { LM_LOG_DEBUG() << "Found myself (" << config_->startup_config_.argv_[0U] << ") in a process group to terminate, not terminating, reporting kTerminated"; static_cast(setState(score::lcm::ProcessState::kTerminated)); // Cannot fail by design - } else { + } + else + { handleTerminationProcess(); } } - if (!graph_->isStarting() || (0 == status_)) { + if (!graph_->isStarting() || (0 == status_)) + { queueTerminationSuccessorJobs(); } - LM_LOG_DEBUG() << "terminateProcess for" << graph_->getProcessGroupName() << "process" << process_index_ - << "(" << config_->startup_config_.short_name_ << ") done"; + LM_LOG_DEBUG() << "terminateProcess for" << graph_->getProcessGroupName() << "process" << process_index_ << "(" + << config_->startup_config_.short_name_ << ") done"; } -inline void ProcessInfoNode::handleTerminationProcess() { +inline void ProcessInfoNode::handleTerminationProcess() +{ auto pg_mgr = graph_->getProcessGroupManager(); terminator_.init(0U, false); has_semaphore_.store(true); - LM_LOG_DEBUG() << "Requesting termination of process" << process_index_ << "of" - << graph_->getProcessGroupName() << "pid" << pid_ << "(" - << config_->startup_config_.short_name_ << ")"; + LM_LOG_DEBUG() << "Requesting termination of process" << process_index_ 
<< "of" << graph_->getProcessGroupName() + << "pid" << pid_ << "(" << config_->startup_config_.short_name_ << ")"; - //handle request termination + // handle request termination if ((pg_mgr->getProcessInterface()->requestTermination(pid_) == osal::OsalReturnType::kFail) || - (terminator_.timedWait(config_->pgm_config_.termination_timeout_ms_) == osal::OsalReturnType::kSuccess)) { + (terminator_.timedWait(config_->pgm_config_.termination_timeout_ms_) == osal::OsalReturnType::kSuccess)) + { LM_LOG_DEBUG() << "Queuing jobs after regular termination of process wait" << process_index_ << "(" << config_->startup_config_.short_name_ << ")"; - } else { - //handle forced termination + } + else + { + // handle forced termination handleForcedTermination(); } @@ -430,65 +540,80 @@ inline void ProcessInfoNode::handleTerminationProcess() { terminator_.deinit(); } -inline void ProcessInfoNode::handleForcedTermination() { +inline void ProcessInfoNode::handleForcedTermination() +{ LM_LOG_WARN() << "Process" << process_index_ << "(" << config_->startup_config_.short_name_ << ") did not respond to SIGTERM, sending SIGKILL"; while ((osal::OsalReturnType::kSuccess == graph_->getProcessGroupManager()->getProcessInterface()->forceTermination(pid_)) && (graph_->getState() == GraphState::kInTransition) && - (terminator_.timedWait(score::lcm::internal::kMaxSigKillDelay) != osal::OsalReturnType::kSuccess)) { + (terminator_.timedWait(score::lcm::internal::kMaxSigKillDelay) != osal::OsalReturnType::kSuccess)) + { LM_LOG_FATAL() << "Process" << process_index_ << "(" << config_->startup_config_.short_name_ << ") did not respond to SIGKILL!!"; } } -void ProcessInfoNode::doWork() { - if (graph_->getState() == GraphState::kInTransition) { - if (graph_->isStarting()) { +void ProcessInfoNode::doWork() +{ + if (graph_->getState() == GraphState::kInTransition) + { + if (graph_->isStarting()) + { startProcess(); - } else { + } + else + { terminateProcess(); } } graph_->nodeExecuted(); } -std::shared_ptr 
ProcessInfoNode::getNextStateManager() { +std::shared_ptr ProcessInfoNode::getNextStateManager() +{ // Remove dead state managers from the list on the fly - while (std::atomic_load(&next_state_manager_) && !std::atomic_load(&next_state_manager_)->control_client_channel_) { + while (std::atomic_load(&next_state_manager_) && !std::atomic_load(&next_state_manager_)->control_client_channel_) + { std::atomic_store(&next_state_manager_, next_state_manager_->next_state_manager_); } return std::atomic_load(&next_state_manager_); } -osal::ProcessID ProcessInfoNode::getPid() const { +osal::ProcessID ProcessInfoNode::getPid() const +{ return pid_; } -score::lcm::ProcessState ProcessInfoNode::getState() const { +score::lcm::ProcessState ProcessInfoNode::getState() const +{ return process_state_.load(); } -void ProcessInfoNode::markRequested(bool requested) { +void ProcessInfoNode::markRequested(bool requested) +{ in_requested_state_ = requested; } -bool ProcessInfoNode::isHeadNode() const { +bool ProcessInfoNode::isHeadNode() const +{ return is_head_node_; } -uint32_t ProcessInfoNode::getNodeIndex() const { +uint32_t ProcessInfoNode::getNodeIndex() const +{ return process_index_; } -ControlClientChannelP ProcessInfoNode::getControlClientChannel() { +ControlClientChannelP ProcessInfoNode::getControlClientChannel() +{ return std::atomic_load(&control_client_channel_); } -} // namespace lcm - } // namespace internal +} // namespace lcm + } // namespace score diff --git a/src/launch_manager_daemon/src/process_group_manager/workerthread.cpp b/src/launch_manager_daemon/src/process_group_manager/workerthread.cpp index fa0249b2..5937647c 100644 --- a/src/launch_manager_daemon/src/process_group_manager/workerthread.cpp +++ b/src/launch_manager_daemon/src/process_group_manager/workerthread.cpp @@ -11,53 +11,74 @@ * SPDX-License-Identifier: Apache-2.0 ********************************************************************************/ -#include "processinfonode.hpp" #include 
"workerthread.hpp" +#include "processinfonode.hpp" -namespace score { +namespace score +{ -namespace lcm { +namespace lcm +{ -namespace internal { +namespace internal +{ template -WorkerThread::WorkerThread(std::shared_ptr> queue, uint32_t num_threads) - : the_job_queue_(queue), num_threads_(num_threads) { - worker_threads_.reserve(num_threads_); +WorkerThread::WorkerThread(std::shared_ptr queue, uint32_t num_threads) : the_job_queue_(queue) +{ + worker_threads_.reserve(num_threads); - for (uint32_t i = 0U; i < num_threads_; ++i) { + for (uint32_t i = 0U; i < num_threads; ++i) + { static_cast(i); worker_threads_.emplace_back(std::make_unique(&WorkerThread::run, this)); } } template -WorkerThread::~WorkerThread() { - the_job_queue_->stopQueue(num_threads_); +WorkerThread::~WorkerThread() +{ + stop(); - for (auto& thread : worker_threads_) { - if (thread->joinable()) { + for (auto& thread : worker_threads_) + { + if (thread->joinable()) + { thread->join(); } } } template -void WorkerThread::run() { - while (the_job_queue_->isRunning()) { - auto job = the_job_queue_->getJobFromQueue(); +void WorkerThread::stop() +{ + static_cast(the_job_queue_->stop()); +} - if (job) { - job->doWork(); +template +void WorkerThread::run() +{ + while (true) + { + auto job = the_job_queue_->pop(); + if (!job) + { + if (job.error() == ConcurrencyErrc::kStopped) + { + break; + } + LM_LOG_ERROR() << "Got an error getting a job: " << job.error(); + continue; } + (*job)->doWork(); } } // Explicit instantiation for ProcessInfoNode template class WorkerThread; -} // namespace lcm - } // namespace internal +} // namespace lcm + } // namespace score diff --git a/src/launch_manager_daemon/src/process_group_manager/workerthread.hpp b/src/launch_manager_daemon/src/process_group_manager/workerthread.hpp index e8a12d55..12a7cbc7 100644 --- a/src/launch_manager_daemon/src/process_group_manager/workerthread.hpp +++ b/src/launch_manager_daemon/src/process_group_manager/workerthread.hpp @@ -11,37 +11,36 
@@ * SPDX-License-Identifier: Apache-2.0 ********************************************************************************/ - #ifndef WORKER_THREAD_HPP_INCLUDED #define WORKER_THREAD_HPP_INCLUDED -#include +#include +#include #include #include #include -#include - -namespace score { -namespace lcm { - -namespace internal { +namespace score::lcm::internal +{ /// @brief Templated worker thread pool for executing jobs from a queue. /// This class manages a pool of worker threads that continuously retrieve and execute jobs -/// from a JobQueue until the pool is destructed or stopped. -/// @tparam T The type of items stored in the JobQueue. +/// from an MPMCConcurrentQueue until the pool is stopped or destructed. +/// @tparam T The type of items stored in the queue (as std::shared_ptr). template -class WorkerThread final { - public: +class WorkerThread final +{ + using Queue = MPMCConcurrentQueue, static_cast(ProcessLimits::kMaxProcesses)>; + + public: /// @brief Constructs a WorkerThread pool with the specified number of threads. /// - /// @param queue The JobQueue from which threads will take work items. + /// @param queue The MpmcQueue from which threads will take work items. /// @param num_threads Number of threads in the pool. - WorkerThread(std::shared_ptr> queue, uint32_t num_threads); + WorkerThread(std::shared_ptr queue, uint32_t num_threads); /// @brief Destructor. - /// Ensures all threads exit gracefully by setting is_running_ to false and joining each thread. + /// Requests stop and joins all worker threads. ~WorkerThread(); // Rule of five @@ -57,25 +56,22 @@ class WorkerThread final { /// @brief Move assignment operator is deleted to prevent moving. WorkerThread& operator=(WorkerThread&&) = delete; - private: + /// @brief Requests all worker threads to stop. + /// Calls stop() on the queue, which unblocks all threads waiting in pop(). + void stop(); + + private: /// @brief Entry point for each worker thread. 
- /// Threads continuously retrieve and execute jobs from the_job_queue_ until is_running_ becomes false. + /// Blocks on pop() until a job arrives or the queue is stopped, then executes the job. void run(); /// @brief The queue from which each thread takes work. - std::shared_ptr> the_job_queue_{}; - - /// @brief Number of threads in the pool. Necessary to store this from the constructor to the destructor. - uint32_t num_threads_{}; + std::shared_ptr the_job_queue_{}; /// @brief Vector of worker threads. std::vector> worker_threads_{}; }; -} // namespace lcm - -} // namespace internal - -} // namespace score +} // namespace score::lcm::internal #endif // WORKER_THREAD_HPP_INCLUDED