Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 13 additions & 12 deletions benchmarks/distributed_work_stealing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <atomic>
#include <chrono>
#include <cstdint>
#include <thread>
#include <vector>

Expand All @@ -25,8 +26,8 @@ static void BM_WorkStealing_LocalOnly(benchmark::State& state) {
for (size_t i = 0; i < num_tasks; ++i) {
cluster.submitToNode(0, [&completed]() {
// Simulate work
volatile int sum = 0;
for (int j = 0; j < 100; ++j) {
volatile int32_t sum = 0;
for (int32_t j = 0; j < 100; ++j) {
sum += j;
}
completed++;
Expand Down Expand Up @@ -63,8 +64,8 @@ static void BM_WorkStealing_TwoNodes_100us(benchmark::State& state) {
// Submit tasks to both nodes (round-robin)
for (size_t i = 0; i < num_tasks; ++i) {
cluster.submitToNode(i % 2, [&completed]() {
volatile int sum = 0;
for (int j = 0; j < 100; ++j) {
volatile int32_t sum = 0;
for (int32_t j = 0; j < 100; ++j) {
sum += j;
}
completed++;
Expand Down Expand Up @@ -100,8 +101,8 @@ static void BM_WorkStealing_TwoNodes_500us(benchmark::State& state) {

for (size_t i = 0; i < num_tasks; ++i) {
cluster.submitToNode(i % 2, [&completed]() {
volatile int sum = 0;
for (int j = 0; j < 100; ++j) {
volatile int32_t sum = 0;
for (int32_t j = 0; j < 100; ++j) {
sum += j;
}
completed++;
Expand Down Expand Up @@ -136,8 +137,8 @@ static void BM_WorkStealing_TwoNodes_1ms(benchmark::State& state) {

for (size_t i = 0; i < num_tasks; ++i) {
cluster.submitToNode(i % 2, [&completed]() {
volatile int sum = 0;
for (int j = 0; j < 100; ++j) {
volatile int32_t sum = 0;
for (int32_t j = 0; j < 100; ++j) {
sum += j;
}
completed++;
Expand Down Expand Up @@ -171,8 +172,8 @@ static void BM_LoadBalancing_Imbalanced(benchmark::State& state) {
// Submit all tasks to node 0 (creates imbalance)
for (size_t i = 0; i < num_tasks; ++i) {
cluster.submitToNode(0, [&completed]() {
volatile int sum = 0;
for (int j = 0; j < 100; ++j) {
volatile int32_t sum = 0;
for (int32_t j = 0; j < 100; ++j) {
sum += j;
}
completed++;
Expand Down Expand Up @@ -266,8 +267,8 @@ static void BM_AgentAffinity_Registered(benchmark::State& state) {
std::vector<std::string> agents = {"agent_A", "agent_B", "agent_C", "agent_D"};
for (size_t i = 0; i < num_tasks; ++i) {
cluster.submit(agents[i % 4], [&completed]() {
volatile int sum = 0;
for (int j = 0; j < 100; ++j) {
volatile int32_t sum = 0;
for (int32_t j = 0; j < 100; ++j) {
sum += j;
}
completed++;
Expand Down
27 changes: 14 additions & 13 deletions benchmarks/hierarchy_performance.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

#include <atomic>
#include <chrono>
#include <cstdint>
#include <memory>
#include <thread>
#include <vector>
Expand Down Expand Up @@ -84,7 +85,7 @@ static void BM_4LayerHierarchy_MessageFlow(benchmark::State& state) {
module2->setMessageBus(&bus);

std::vector<std::shared_ptr<TaskAgent>> tasks;
for (int i = 1; i <= 6; ++i) {
for (int32_t i = 1; i <= 6; ++i) {
auto task = std::make_shared<TaskAgent>("task" + std::to_string(i));
bus.registerAgent(task->getAgentId(), task);
task->setMessageBus(&bus);
Expand All @@ -107,20 +108,20 @@ BENCHMARK(BM_4LayerHierarchy_MessageFlow);

// Benchmark: Task Agent Registry Lookup Scalability
static void BM_TaskAgent_RegistryLookup(benchmark::State& state) {
int num_agents = state.range(0);
int32_t num_agents = static_cast<int32_t>(state.range(0));

MessageBus bus;
std::vector<std::shared_ptr<TaskAgent>> agents;

// Register N task agents
for (int i = 0; i < num_agents; ++i) {
for (int32_t i = 0; i < num_agents; ++i) {
auto agent = std::make_shared<TaskAgent>("task" + std::to_string(i));
bus.registerAgent(agent->getAgentId(), agent);
agents.push_back(agent);
}

// Benchmark lookup operations
int lookup_idx = 0;
int32_t lookup_idx = 0;
for (auto _ : state) {
std::string agent_id = "task" + std::to_string(lookup_idx % num_agents);
benchmark::DoNotOptimize(bus.hasAgent(agent_id));
Expand All @@ -134,15 +135,15 @@ BENCHMARK(BM_TaskAgent_RegistryLookup)->Range(10, 1000);

// Benchmark: Scheduler Submission Rate (Work-Stealing)
static void BM_Scheduler_SubmissionRate(benchmark::State& state) {
const int num_workers = state.range(0);
const int32_t num_workers = static_cast<int32_t>(state.range(0));

WorkStealingScheduler scheduler(num_workers);
scheduler.start();

// Warmup: get workers ready
for (int i = 0; i < num_workers; ++i) {
for (int32_t i = 0; i < num_workers; ++i) {
scheduler.submit([]() {
volatile int x = 42;
volatile int32_t x = 42;
(void)x;
});
}
Expand All @@ -151,8 +152,8 @@ static void BM_Scheduler_SubmissionRate(benchmark::State& state) {
// Benchmark submission rate
for (auto _ : state) {
scheduler.submit([]() {
volatile int sum = 0;
for (int j = 0; j < 10; ++j) {
volatile int32_t sum = 0;
for (int32_t j = 0; j < 10; ++j) {
sum += j;
}
});
Expand Down Expand Up @@ -189,15 +190,15 @@ BENCHMARK(BM_Agent_MessageProcessing);

// Benchmark: Component Leadership with Multiple Modules
static void BM_ComponentLead_MultiModule(benchmark::State& state) {
const int num_modules = state.range(0);
const int32_t num_modules = static_cast<int32_t>(state.range(0));

MessageBus bus;
auto comp_lead = std::make_shared<ComponentLeadAgent>("comp_lead");
bus.registerAgent(comp_lead->getAgentId(), comp_lead);
comp_lead->setMessageBus(&bus);

std::vector<std::shared_ptr<ModuleLeadAgent>> modules;
for (int i = 0; i < num_modules; ++i) {
for (int32_t i = 0; i < num_modules; ++i) {
auto module = std::make_shared<ModuleLeadAgent>("module" + std::to_string(i));
bus.registerAgent(module->getAgentId(), module);
module->setMessageBus(&bus);
Expand All @@ -219,15 +220,15 @@ BENCHMARK(BM_ComponentLead_MultiModule)->Range(2, 16);

// Benchmark: Module Leadership with Multiple Tasks
static void BM_ModuleLead_MultiTask(benchmark::State& state) {
const int num_tasks = state.range(0);
const int32_t num_tasks = static_cast<int32_t>(state.range(0));

MessageBus bus;
auto module = std::make_shared<ModuleLeadAgent>("module");
bus.registerAgent(module->getAgentId(), module);
module->setMessageBus(&bus);

std::vector<std::shared_ptr<TaskAgent>> tasks;
for (int i = 0; i < num_tasks; ++i) {
for (int32_t i = 0; i < num_tasks; ++i) {
auto task = std::make_shared<TaskAgent>("task" + std::to_string(i));
bus.registerAgent(task->getAgentId(), task);
task->setMessageBus(&bus);
Expand Down
27 changes: 14 additions & 13 deletions benchmarks/message_bus_performance.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "core/message_bus.hpp"

#include <atomic>
#include <cstdint>
#include <memory>
#include <thread>
#include <vector>
Expand Down Expand Up @@ -46,12 +47,12 @@ BENCHMARK(BM_MessageRouting_SingleAgent);

// Benchmark: Routing to many agents (fan-out)
static void BM_MessageRouting_FanOut(benchmark::State& state) {
int num_agents = state.range(0);
int32_t num_agents = static_cast<int32_t>(state.range(0));

MessageBus bus;
std::vector<std::shared_ptr<TaskAgent>> agents;

for (int i = 0; i < num_agents; ++i) {
for (int32_t i = 0; i < num_agents; ++i) {
auto agent = std::make_shared<TaskAgent>("agent-" + std::to_string(i));
bus.registerAgent(agent->getAgentId(), agent);
agent->setMessageBus(&bus);
Expand Down Expand Up @@ -89,21 +90,21 @@ BENCHMARK(BM_AgentRegistration);

// Benchmark: Agent unregistration overhead
static void BM_AgentUnregistration(benchmark::State& state) {
int num_agents = 1000;
int32_t num_agents = 1000;

for (auto _ : state) {
state.PauseTiming();
MessageBus bus;
std::vector<std::shared_ptr<TaskAgent>> agents;

for (int i = 0; i < num_agents; ++i) {
for (int32_t i = 0; i < num_agents; ++i) {
auto agent = std::make_shared<TaskAgent>("agent-" + std::to_string(i));
bus.registerAgent(agent->getAgentId(), agent);
agents.push_back(agent);
}
state.ResumeTiming();

for (int i = 0; i < num_agents; ++i) {
for (int32_t i = 0; i < num_agents; ++i) {
bus.unregisterAgent("agent-" + std::to_string(i));
}
}
Expand All @@ -114,12 +115,12 @@ BENCHMARK(BM_AgentUnregistration);

// Benchmark: Agent lookup (hasAgent)
static void BM_AgentLookup(benchmark::State& state) {
int num_agents = state.range(0);
int32_t num_agents = static_cast<int32_t>(state.range(0));

MessageBus bus;
std::vector<std::shared_ptr<TaskAgent>> agents;

for (int i = 0; i < num_agents; ++i) {
for (int32_t i = 0; i < num_agents; ++i) {
auto agent = std::make_shared<TaskAgent>("agent-" + std::to_string(i));
bus.registerAgent(agent->getAgentId(), agent);
agents.push_back(agent);
Expand All @@ -138,12 +139,12 @@ BENCHMARK(BM_AgentLookup)->Range(8, 1024);

// Benchmark: List all agents
static void BM_ListAgents(benchmark::State& state) {
int num_agents = state.range(0);
int32_t num_agents = static_cast<int32_t>(state.range(0));

MessageBus bus;
std::vector<std::shared_ptr<TaskAgent>> agents;

for (int i = 0; i < num_agents; ++i) {
for (int32_t i = 0; i < num_agents; ++i) {
auto agent = std::make_shared<TaskAgent>("agent-" + std::to_string(i));
bus.registerAgent(agent->getAgentId(), agent);
agents.push_back(agent);
Expand Down Expand Up @@ -179,7 +180,7 @@ BENCHMARK(BM_ConcurrentRouting)->ThreadRange(1, 8);

// Benchmark: Message routing with payload (varying sizes)
static void BM_MessageRouting_WithPayload(benchmark::State& state) {
int payload_size = state.range(0); // Size in bytes
int32_t payload_size = static_cast<int32_t>(state.range(0)); // Size in bytes

MessageBus bus;
auto agent = std::make_shared<TaskAgent>("test-agent");
Expand Down Expand Up @@ -226,12 +227,12 @@ BENCHMARK(BM_MessageRoundTrip);

// Benchmark: Broadcast to multiple agents
static void BM_MessageBroadcast(benchmark::State& state) {
int num_agents = state.range(0);
int32_t num_agents = static_cast<int32_t>(state.range(0));

MessageBus bus;
std::vector<std::shared_ptr<TaskAgent>> agents;

for (int i = 0; i < num_agents; ++i) {
for (int32_t i = 0; i < num_agents; ++i) {
auto agent = std::make_shared<TaskAgent>("agent-" + std::to_string(i));
bus.registerAgent(agent->getAgentId(), agent);
agent->setMessageBus(&bus);
Expand All @@ -240,7 +241,7 @@ static void BM_MessageBroadcast(benchmark::State& state) {

for (auto _ : state) {
// Broadcast: send message to all agents
for (int i = 0; i < num_agents; ++i) {
for (int32_t i = 0; i < num_agents; ++i) {
auto msg = KeystoneMessage::create("broadcaster", "agent-" + std::to_string(i), "broadcast");
bus.routeMessage(msg);
}
Expand Down
26 changes: 14 additions & 12 deletions benchmarks/message_pool_performance.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
#include "core/message.hpp"
#include "core/message_pool.hpp"

#include <cstdint>

#include <benchmark/benchmark.h>

using namespace keystone::core;
Expand All @@ -32,7 +34,7 @@ BENCHMARK(BM_MessageCreation_NoPooing);
*/
static void BM_MessageCreation_WithPooling(benchmark::State& state) {
// Warmup pool
for (int i = 0; i < 100; ++i) {
for (int32_t i = 0; i < 100; ++i) {
auto msg = MessagePool::acquire();
MessagePool::release(std::move(msg));
}
Expand All @@ -56,14 +58,14 @@ BENCHMARK(BM_MessageCreation_WithPooling);
* Burst pattern: Create many messages then destroy them (no pooling)
*/
static void BM_MessageBurst_NoPooling(benchmark::State& state) {
const int burst_size = state.range(0);
const int32_t burst_size = static_cast<int32_t>(state.range(0));

for (auto _ : state) {
std::vector<KeystoneMessage> messages;
messages.reserve(burst_size);
messages.reserve(static_cast<size_t>(burst_size));

// Create burst
for (int i = 0; i < burst_size; ++i) {
for (int32_t i = 0; i < burst_size; ++i) {
messages.push_back(KeystoneMessage::create("sender", "receiver", "cmd"));
}

Expand All @@ -78,21 +80,21 @@ BENCHMARK(BM_MessageBurst_NoPooling)->Arg(10)->Arg(100)->Arg(1000);
* Burst pattern: Acquire many messages then release them (with pooling)
*/
static void BM_MessageBurst_WithPooling(benchmark::State& state) {
const int burst_size = state.range(0);
const int32_t burst_size = static_cast<int32_t>(state.range(0));

// Warmup pool
MessagePool::clear();
for (int i = 0; i < burst_size; ++i) {
for (int32_t i = 0; i < burst_size; ++i) {
auto msg = MessagePool::acquire();
MessagePool::release(std::move(msg));
}

for (auto _ : state) {
std::vector<KeystoneMessage> messages;
messages.reserve(burst_size);
messages.reserve(static_cast<size_t>(burst_size));

// Acquire burst
for (int i = 0; i < burst_size; ++i) {
for (int32_t i = 0; i < burst_size; ++i) {
messages.push_back(MessagePool::acquire());
}

Expand Down Expand Up @@ -124,7 +126,7 @@ BENCHMARK(BM_SteadyState_NoPooling);
*/
static void BM_SteadyState_WithPooling(benchmark::State& state) {
// Warmup
for (int i = 0; i < 10; ++i) {
for (int32_t i = 0; i < 10; ++i) {
auto msg = MessagePool::acquire();
MessagePool::release(std::move(msg));
}
Expand All @@ -147,7 +149,7 @@ BENCHMARK(BM_SteadyState_WithPooling);
*/
static void BM_PoolStatistics(benchmark::State& state) {
// Warmup
for (int i = 0; i < 100; ++i) {
for (int32_t i = 0; i < 100; ++i) {
auto msg = MessagePool::acquire();
MessagePool::release(std::move(msg));
}
Expand All @@ -169,7 +171,7 @@ static void BM_PoolHitRate(benchmark::State& state) {

// Build up pool
std::vector<KeystoneMessage> warmup;
for (int i = 0; i < 100; ++i) {
for (int32_t i = 0; i < 100; ++i) {
warmup.push_back(MessagePool::acquire());
}
for (auto& msg : warmup) {
Expand Down Expand Up @@ -201,7 +203,7 @@ static void BM_ThreadLocalPooling(benchmark::State& state) {
}

// Warmup per-thread pool
for (int i = 0; i < 10; ++i) {
for (int32_t i = 0; i < 10; ++i) {
auto msg = MessagePool::acquire();
MessagePool::release(std::move(msg));
}
Expand Down
Loading
Loading