Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 112 additions & 0 deletions tests/unit/test_agent_core.cpp
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
#include "agents/agent_core.hpp"
#include "concurrency/logger.hpp"
#include "core/config.hpp"
#include "core/message.hpp"
#include "core/message_bus.hpp"

#include <algorithm>
#include <string>
#include <thread>
#include <vector>

#include <gtest/gtest.h>
#include <spdlog/sinks/ringbuffer_sink.h>
#include <spdlog/spdlog.h>

using namespace keystone;
using namespace keystone::agents;
Expand Down Expand Up @@ -518,3 +524,109 @@ TEST_F(AgentCoreTest, FairnessDoesNotLoseMessages) {
EXPECT_EQ(normal_count, 5) << "All 5 NORMAL messages should be received";
EXPECT_EQ(commands.size(), 15u) << "Total of 15 messages should be received";
}

// ---------------------------------------------------------------------------
// Logger output assertions for backpressure events
// ---------------------------------------------------------------------------

namespace {

/// Attach a ringbuffer sink to the "keystone" spdlog logger (after
/// Logger::init()), run @p fn, then return all formatted log lines captured.
std::vector<std::string> captureLogLines(std::function<void()> fn) {
keystone::concurrency::Logger::init(spdlog::level::trace);

auto logger = spdlog::get("keystone");
auto sink = std::make_shared<spdlog::sinks::ringbuffer_sink_mt>(256);
sink->set_level(spdlog::level::trace);
logger->sinks().push_back(sink);

fn();

// Flush to ensure all records are in the buffer
logger->flush();

// Remove the test sink before returning
auto& sinks = logger->sinks();
sinks.erase(std::remove(sinks.begin(), sinks.end(), sink), sinks.end());

return sink->last_formatted();
}

bool anyLineContains(const std::vector<std::string>& lines, const std::string& substr) {
for (const auto& line : lines) {
if (line.find(substr) != std::string::npos) {
return true;
}
}
return false;
}

} // namespace

// Test: backpressure WARN is logged when inbox is full
TEST(AgentCoreLogTest, BackpressureInboxFullLogsWarn) {
// Must shutdown any existing logger so we get a fresh one with our sink
keystone::concurrency::Logger::shutdown();

auto agent = std::make_shared<TestAgent>("log_test_agent");

auto lines = captureLogLines([&]() {
// Fill queue beyond max to trigger backpressure log
size_t max_size = keystone::core::Config::AGENT_MAX_QUEUE_SIZE;
for (size_t i = 0; i < max_size + 20; ++i) {
auto msg = keystone::core::KeystoneMessage::create(
"sender", agent->getAgentId(), "msg_" + std::to_string(i));
msg.priority = keystone::core::Priority::NORMAL;
agent->receiveMessage(msg);
}
});

EXPECT_TRUE(anyLineContains(lines, "[BACKPRESSURE]"))
<< "Expected [BACKPRESSURE] warning in log output";
EXPECT_TRUE(anyLineContains(lines, "inbox full"))
<< "Expected 'inbox full' phrase in log output";
EXPECT_TRUE(anyLineContains(lines, "log_test_agent"))
<< "Expected agent id in backpressure log line";

keystone::concurrency::Logger::shutdown();
}

// Test: backpressure recovery INFO is logged when queue drains below low watermark
TEST(AgentCoreLogTest, BackpressureRecoveryLogsInfo) {
keystone::concurrency::Logger::shutdown();

auto agent = std::make_shared<TestAgent>("recovery_log_agent");

auto lines = captureLogLines([&]() {
size_t max_size = keystone::core::Config::AGENT_MAX_QUEUE_SIZE;

// Fill to trigger backpressure
for (size_t i = 0; i < max_size + 20; ++i) {
auto msg = keystone::core::KeystoneMessage::create(
"sender", agent->getAgentId(), "fill_" + std::to_string(i));
msg.priority = keystone::core::Priority::NORMAL;
agent->receiveMessage(msg);
}

// Drain below low watermark so recovery log fires
size_t low_watermark =
static_cast<size_t>(max_size * keystone::core::Config::AGENT_QUEUE_LOW_WATERMARK_PERCENT);
size_t drain_count = max_size - low_watermark + 20;
for (size_t i = 0; i < drain_count; ++i) {
agent->getMessage();
}

// Send one more message — this triggers the recovery CAS log
auto msg = keystone::core::KeystoneMessage::create("sender", agent->getAgentId(), "trigger");
msg.priority = keystone::core::Priority::NORMAL;
agent->receiveMessage(msg);
});

EXPECT_TRUE(anyLineContains(lines, "recovered"))
<< "Expected 'recovered' in log output after backpressure clears";
EXPECT_TRUE(anyLineContains(lines, "accepting messages"))
<< "Expected 'accepting messages' phrase in recovery log line";

keystone::concurrency::Logger::shutdown();
}
101 changes: 101 additions & 0 deletions tests/unit/test_thread_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,21 @@
* @brief Unit tests for ThreadPool
*/

#include "concurrency/logger.hpp"
#include "concurrency/task.hpp"
#include "concurrency/thread_pool.hpp"

#include <algorithm>
#include <atomic>
#include <chrono>
#include <stdexcept>
#include <string>
#include <thread>
#include <vector>

#include <gtest/gtest.h>
#include <spdlog/sinks/ringbuffer_sink.h>
#include <spdlog/spdlog.h>

using namespace keystone::concurrency;

Expand Down Expand Up @@ -222,3 +229,97 @@ TEST(ThreadPoolTest, DestructorShutdown) {
// After scope, all work should be done
EXPECT_EQ(counter.load(), 5);
}

// ---------------------------------------------------------------------------
// Logger output assertions for worker exception events
// ---------------------------------------------------------------------------

namespace {

/// Capture all spdlog "keystone" lines produced while @p fn runs.
std::vector<std::string> captureThreadPoolLogLines(std::function<void()> fn) {
Logger::init(spdlog::level::trace);

auto logger = spdlog::get("keystone");
auto sink = std::make_shared<spdlog::sinks::ringbuffer_sink_mt>(256);
sink->set_level(spdlog::level::trace);
logger->sinks().push_back(sink);

fn();

logger->flush();

auto& sinks = logger->sinks();
sinks.erase(std::remove(sinks.begin(), sinks.end(), sink), sinks.end());

return sink->last_formatted();
}

bool anyLineContains(const std::vector<std::string>& lines, const std::string& substr) {
for (const auto& line : lines) {
if (line.find(substr) != std::string::npos) {
return true;
}
}
return false;
}

} // namespace

// Test: std::exception thrown in worker is logged at error level
TEST(ThreadPoolLogTest, WorkerStdExceptionIsLogged) {
Logger::shutdown();

std::vector<std::string> lines;
{
ThreadPool pool(1);

lines = captureThreadPoolLogLines([&]() {
std::atomic<bool> done{false};
pool.submit([&done]() {
done.store(true);
throw std::runtime_error("worker-boom");
});
// Wait for the task to execute and the exception to be caught/logged
for (int i = 0; i < 50 && !done.load(); ++i) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
// Give the catch block a moment to emit the log line
std::this_thread::sleep_for(std::chrono::milliseconds(50));
});
}

EXPECT_TRUE(anyLineContains(lines, "worker-boom"))
<< "Expected exception message in log output";
EXPECT_TRUE(anyLineContains(lines, "Exception in worker"))
<< "Expected 'Exception in worker' prefix in log output";

Logger::shutdown();
}

// Test: unknown exception thrown in worker is logged at error level
TEST(ThreadPoolLogTest, WorkerUnknownExceptionIsLogged) {
Logger::shutdown();

std::vector<std::string> lines;
{
ThreadPool pool(1);

lines = captureThreadPoolLogLines([&]() {
std::atomic<bool> done{false};
pool.submit([&done]() {
done.store(true);
throw 42; // non-std::exception
});
for (int i = 0; i < 50 && !done.load(); ++i) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
std::this_thread::sleep_for(std::chrono::milliseconds(50));
});
}

EXPECT_TRUE(anyLineContains(lines, "Unknown exception"))
<< "Expected 'Unknown exception' in log output for non-std throw";

Logger::shutdown();
}
Loading