diff --git a/.github/workflows/agent_ci.yml b/.github/workflows/agent_ci.yml index 9272a78..0cdde55 100644 --- a/.github/workflows/agent_ci.yml +++ b/.github/workflows/agent_ci.yml @@ -4,12 +4,12 @@ on: push: branches: [ "main", "devel" ] paths: - - 'sources/agent/' + - 'sources/agent/**' - '.github/workflows/agent_ci.yml' pull_request: branches: [ "main", "devel" ] paths: - - 'sources/agent/' + - 'sources/agent/**' - '.github/workflows/agent_ci.yml' jobs: @@ -62,4 +62,5 @@ jobs: --inline-suppr \ --force \ -i sources/agent/build \ - sources/agent \ No newline at end of file + -i sources/agent/gen \ + sources/agent diff --git a/sources/agent/.gitignore b/sources/agent/.gitignore new file mode 100644 index 0000000..6892fd2 --- /dev/null +++ b/sources/agent/.gitignore @@ -0,0 +1,2 @@ +build/* +gen/* diff --git a/sources/agent/CMakeLists.txt b/sources/agent/CMakeLists.txt index 2ead23e..47adb01 100644 --- a/sources/agent/CMakeLists.txt +++ b/sources/agent/CMakeLists.txt @@ -20,19 +20,21 @@ find_package(Threads REQUIRED) # --- Proto / gRPC code generation --- set(PROTO_DIR ${CMAKE_SOURCE_DIR}/../proto) set(PROTO_FILE ${PROTO_DIR}/volta.proto) +set(GENERATED_DIR ${CMAKE_SOURCE_DIR}/gen) get_target_property(GRPC_CPP_PLUGIN gRPC::grpc_cpp_plugin LOCATION) -set(PROTO_SRC "${CMAKE_CURRENT_BINARY_DIR}/volta.pb.cc") -set(PROTO_HDR "${CMAKE_CURRENT_BINARY_DIR}/volta.pb.h") -set(GRPC_SRC "${CMAKE_CURRENT_BINARY_DIR}/volta.grpc.pb.cc") -set(GRPC_HDR "${CMAKE_CURRENT_BINARY_DIR}/volta.grpc.pb.h") +set(PROTO_SRC "${GENERATED_DIR}/volta.pb.cc") +set(PROTO_HDR "${GENERATED_DIR}/volta.pb.h") +set(GRPC_SRC "${GENERATED_DIR}/volta.grpc.pb.cc") +set(GRPC_HDR "${GENERATED_DIR}/volta.grpc.pb.h") add_custom_command( OUTPUT "${PROTO_SRC}" "${PROTO_HDR}" "${GRPC_SRC}" "${GRPC_HDR}" + COMMAND ${CMAKE_COMMAND} -E make_directory ${GENERATED_DIR} COMMAND protobuf::protoc - ARGS --cpp_out="${CMAKE_CURRENT_BINARY_DIR}" - --grpc_out="${CMAKE_CURRENT_BINARY_DIR}" + ARGS --cpp_out="${GENERATED_DIR}" + --grpc_out="${GENERATED_DIR}" --plugin=protoc-gen-grpc="${GRPC_CPP_PLUGIN}" -I "${PROTO_DIR}" "${PROTO_FILE}" @@ -42,7 +44,7 @@ add_custom_command( add_library(volta_proto STATIC ${PROTO_SRC} ${PROTO_HDR} ${GRPC_SRC} ${GRPC_HDR}) -target_include_directories(volta_proto PUBLIC ${CMAKE_CURRENT_BINARY_DIR}) +target_include_directories(volta_proto PUBLIC ${GENERATED_DIR}) target_link_libraries(volta_proto PUBLIC protobuf::libprotobuf @@ -164,4 +166,4 @@ if(GIT_EXE) PERMISSIONS OWNER_READ OWNER_WRITE OWNER_EXECUTE) message(STATUS "Git pre-commit hook installed") endif() -endif() \ No newline at end of file +endif() diff --git a/sources/agent/agent.example.conf b/sources/agent/agent.example.conf index a80c3bc..ae0a6b4 100644 --- a/sources/agent/agent.example.conf +++ b/sources/agent/agent.example.conf @@ -1 +1,7 @@ core_affinity = [2, "3-10"] + +# Collect CPU and GPU power metrics +metrics = [ + "METRIC_TYPE_CPU_POWER_PACKAGE", + "METRIC_TYPE_GPU_POWER", +] diff --git a/sources/agent/src/collectors/collector.h b/sources/agent/src/collectors/collector.h index a43c76a..bdcfa78 100644 --- a/sources/agent/src/collectors/collector.h +++ b/sources/agent/src/collectors/collector.h @@ -4,6 +4,7 @@ #include #include "metric.h" +#include "platform/hardware_info.h" namespace volta { namespace agent { @@ -15,9 +16,66 @@ class Collector { virtual std::vector Collect() = 0; + virtual bool IsSupported() = 0; + + virtual std::vector Satisfiable() = 0; + + virtual void SetRequestedMetrics( + const std::vector& metrics) = 0; + virtual bool Init() { return true; } }; +class CollectorRegistry { + public: + static CollectorRegistry& Instance() { + static CollectorRegistry instance; + return instance; + } + + void Register(std::unique_ptr collector) { + entries_.push_back(std::move(collector)); + } + + std::vector Resolve(const std::vector& desired) { + std::vector result; + for (auto& collector : entries_) { + if (!collector->IsSupported()) continue; + + std::vector active; + for (const auto& type : collector->Satisfiable()) { + if (std::find(desired.begin(), desired.end(), type) != desired.end()) { + active.push_back(type); + } + } + + if (!active.empty()) { + collector->SetRequestedMetrics(active); + result.push_back(collector.get()); + } + } + return result; + } + + private: + CollectorRegistry() = default; + std::vector> entries_; +}; + +template +class RegisteredCollector : public Collector { + static bool Register() { + std::unique_ptr c = std::make_unique(); + CollectorRegistry::Instance().Register(std::move(c)); + return true; + } + static inline bool registered_ = Register(); + + static constexpr std::integral_constant force_{}; + + public: +}; + } // namespace collectors } // namespace agent } // namespace volta diff --git a/sources/agent/src/collectors/nvml_collector.cc b/sources/agent/src/collectors/nvml_collector.cc index 4d6cddf..a0958e1 100644 --- a/sources/agent/src/collectors/nvml_collector.cc +++ b/sources/agent/src/collectors/nvml_collector.cc @@ -1,5 +1,6 @@ #include "collectors/nvml_collector.h" +#include #include #include @@ -7,9 +8,7 @@ namespace volta { namespace agent { namespace collectors { -NvmlCollector::NvmlCollector() { - // Initialization moved to Init() -} +NvmlCollector::NvmlCollector() = default; NvmlCollector::~NvmlCollector() { if (initialized_) { @@ -29,6 +28,7 @@ bool NvmlCollector::Init() { if (result != NVML_SUCCESS) { std::cerr << "Failed to get device handle: " << nvmlErrorString(result) << std::endl; + nvmlShutdown(); return false; } @@ -36,54 +36,109 @@ bool NvmlCollector::Init() { return true; } +bool NvmlCollector::IsSupported() { + nvmlReturn_t result = nvmlInit(); + if (result != NVML_SUCCESS) { + return false; + } + + unsigned int device_count = 0; + result = nvmlDeviceGetCount(&device_count); + if (result != NVML_SUCCESS || device_count == 0) { + nvmlShutdown(); + return false; + } + + nvmlDevice_t device; + result = nvmlDeviceGetHandleByIndex(0, &device); + nvmlShutdown(); + return result == NVML_SUCCESS; +} + +void NvmlCollector::SetRequestedMetrics( + const std::vector& metrics) { + requested_metrics_ = metrics; +} + std::vector NvmlCollector::Collect() { - if (!initialized_) return {}; + if (!initialized_ || requested_metrics_.empty()) return {}; std::vector metrics; unsigned int power_mw = 0; auto now = std::chrono::system_clock::now().time_since_epoch().count(); - nvmlReturn_t result = nvmlDeviceGetPowerUsage(device_handle_, &power_mw); - - if (result == NVML_SUCCESS) { - metrics.push_back({"gpu_0_power_watts", - static_cast(power_mw) / 1000.0, // mW -> W - now}); + auto needs = [&](v1::MetricType type) { + return std::find(requested_metrics_.begin(), requested_metrics_.end(), + type) != requested_metrics_.end(); + }; + + nvmlReturn_t result; + if (needs(v1::MetricType::METRIC_TYPE_GPU_POWER)) { + result = nvmlDeviceGetPowerUsage(device_handle_, &power_mw); + if (result == NVML_SUCCESS) { + metrics.push_back({v1::MetricType::METRIC_TYPE_GPU_POWER, + {.index = 0}, + static_cast(power_mw) / 1000.0, + now}); + } } - unsigned int temp_c = 0; - result = - nvmlDeviceGetTemperature(device_handle_, NVML_TEMPERATURE_GPU, &temp_c); - if (result == NVML_SUCCESS) { - metrics.push_back({"gpu_0_temp_celsius", static_cast(temp_c), now}); + if (needs(v1::MetricType::METRIC_TYPE_GPU_TEMPERATURE)) { + unsigned int temp_c = 0; + result = + nvmlDeviceGetTemperature(device_handle_, NVML_TEMPERATURE_GPU, &temp_c); + if (result == NVML_SUCCESS) { + metrics.push_back({v1::MetricType::METRIC_TYPE_GPU_TEMPERATURE, + {.index = 0}, + static_cast(temp_c), + now}); + } } - nvmlUtilization_t utilization; - result = nvmlDeviceGetUtilizationRates(device_handle_, &utilization); - - if (result == NVML_SUCCESS) { - metrics.push_back({"gpu_0_utilization_percent", - static_cast(utilization.gpu), now}); - - metrics.push_back({"gpu_0_memory_activity_percent", - static_cast(utilization.memory), now}); + if (needs(v1::MetricType::METRIC_TYPE_GPU_UTILIZATION) || + needs(v1::MetricType::METRIC_TYPE_GPU_SHARED_MEMORY_UTILIZATION)) { + nvmlUtilization_t utilization; + result = nvmlDeviceGetUtilizationRates(device_handle_, &utilization); + if (result == NVML_SUCCESS) { + if (needs(v1::MetricType::METRIC_TYPE_GPU_UTILIZATION)) { + metrics.push_back({v1::MetricType::METRIC_TYPE_GPU_UTILIZATION, + {.index = 0}, + static_cast(utilization.gpu), + now}); + } + if (needs(v1::MetricType::METRIC_TYPE_GPU_SHARED_MEMORY_UTILIZATION)) { + metrics.push_back( + {v1::MetricType::METRIC_TYPE_GPU_SHARED_MEMORY_UTILIZATION, + {.index = 0}, + static_cast(utilization.memory), + now}); + } + } } - nvmlMemory_t memory; - result = nvmlDeviceGetMemoryInfo(device_handle_, &memory); - - if (result == NVML_SUCCESS) { - metrics.push_back( - {"gpu_0_memory_total_bytes", static_cast(memory.total), now}); - metrics.push_back( - {"gpu_0_memory_used_bytes", static_cast(memory.used), now}); - double used_percent = (double)memory.used / memory.total * 100.0; - metrics.push_back({"gpu_0_memory_used_percent", used_percent, now}); + if (needs(v1::MetricType::METRIC_TYPE_GPU_VRAM_USED)) { + nvmlMemory_t memory; + result = nvmlDeviceGetMemoryInfo(device_handle_, &memory); + if (result == NVML_SUCCESS) { + metrics.push_back( + {v1::MetricType::METRIC_TYPE_GPU_VRAM_USED, + {.index = 0}, + static_cast(memory.used) / static_cast(memory.total), + now}); + } } return metrics; } +std::vector NvmlCollector::Satisfiable() { + return {v1::MetricType::METRIC_TYPE_GPU_VRAM_USED, + v1::MetricType::METRIC_TYPE_GPU_UTILIZATION, + v1::MetricType::METRIC_TYPE_GPU_SHARED_MEMORY_UTILIZATION, + v1::MetricType::METRIC_TYPE_GPU_TEMPERATURE, + v1::MetricType::METRIC_TYPE_GPU_POWER}; +} + } // namespace collectors } // namespace agent } // namespace volta diff --git a/sources/agent/src/collectors/nvml_collector.h b/sources/agent/src/collectors/nvml_collector.h index 7223e34..fb0bb4a 100644 --- a/sources/agent/src/collectors/nvml_collector.h +++ b/sources/agent/src/collectors/nvml_collector.h @@ -3,21 +3,31 @@ #include +#include + #include "collectors/collector.h" namespace volta { namespace agent { namespace collectors { -class NvmlCollector : public Collector { +class NvmlCollector : public RegisteredCollector { public: NvmlCollector(); ~NvmlCollector() override; + NvmlCollector(NvmlCollector&&) = default; + NvmlCollector& operator=(NvmlCollector&&) = default; + bool Init() override; std::vector Collect() override; + bool IsSupported() override; + std::vector Satisfiable() override; + void SetRequestedMetrics(const std::vector& metrics) override; private: + std::vector requested_metrics_; + nvmlDevice_t device_handle_; bool initialized_ = false; }; diff --git a/sources/agent/src/collectors/proc_stat_collector.cc b/sources/agent/src/collectors/proc_stat_collector.cc index e177a8b..9668a17 100644 --- a/sources/agent/src/collectors/proc_stat_collector.cc +++ b/sources/agent/src/collectors/proc_stat_collector.cc @@ -1,13 +1,30 @@ #include "collectors/proc_stat_collector.h" +#include #include +#include #include namespace volta { namespace agent { namespace collectors { +bool ProcStatCollector::Init() { return true; } + +void ProcStatCollector::SetRequestedMetrics( + const std::vector& metrics) { + requested_metrics_ = metrics; +} + std::vector ProcStatCollector::Collect() { + if (requested_metrics_.empty()) return {}; + + if (std::find(requested_metrics_.begin(), requested_metrics_.end(), + v1::MetricType::METRIC_TYPE_CPU_UTILIZATION) == + requested_metrics_.end()) { + return {}; + } + uint64_t current_total = 0; uint64_t current_idle = 0; @@ -17,14 +34,16 @@ std::vector ProcStatCollector::Collect() { uint64_t diff_idle = current_idle - prev_idle_; double usage_percent = 0.0; - if (diff_total > 0) + if (diff_total > 0) { usage_percent = (double)(diff_total - diff_idle) / diff_total * 100.0; + } prev_total_ = current_total; prev_idle_ = current_idle; Metric m; - m.name = "cpu_usage_total_percent"; + m.type = v1::MetricType::METRIC_TYPE_CPU_UTILIZATION; + m.devId = {}; m.value = usage_percent; m.timestamp = std::chrono::duration_cast( std::chrono::system_clock::now().time_since_epoch()) @@ -33,6 +52,14 @@ std::vector ProcStatCollector::Collect() { return {m}; } +bool ProcStatCollector::IsSupported() { + return std::filesystem::exists("/proc/stat"); +} + +std::vector ProcStatCollector::Satisfiable() { + return {v1::MetricType::METRIC_TYPE_CPU_UTILIZATION}; +} + void ProcStatCollector::ReadCpuStats(uint64_t& total, uint64_t& idle) { std::ifstream file("/proc/stat"); std::string line; @@ -52,4 +79,4 @@ void ProcStatCollector::ReadCpuStats(uint64_t& total, uint64_t& idle) { } // namespace collectors } // namespace agent -} // namespace volta \ No newline at end of file +} // namespace volta diff --git a/sources/agent/src/collectors/proc_stat_collector.h b/sources/agent/src/collectors/proc_stat_collector.h index 258143a..313f317 100644 --- a/sources/agent/src/collectors/proc_stat_collector.h +++ b/sources/agent/src/collectors/proc_stat_collector.h @@ -9,13 +9,18 @@ namespace volta { namespace agent { namespace collectors { -class ProcStatCollector : public Collector { +class ProcStatCollector : public RegisteredCollector { public: + bool Init() override; std::vector Collect() override; + bool IsSupported() override; + std::vector Satisfiable() override; + void SetRequestedMetrics(const std::vector& metrics) override; private: void ReadCpuStats(uint64_t& idle_time, uint64_t& total_time); + std::vector requested_metrics_; uint64_t prev_total_ = 0; uint64_t prev_idle_ = 0; }; diff --git a/sources/agent/src/collectors/ram_collector.cc b/sources/agent/src/collectors/ram_collector.cc index b135002..6cbd49c 100644 --- a/sources/agent/src/collectors/ram_collector.cc +++ b/sources/agent/src/collectors/ram_collector.cc @@ -1,6 +1,8 @@ #include "collectors/ram_collector.h" +#include #include +#include #include #include @@ -8,43 +10,80 @@ namespace volta { namespace agent { namespace collectors { +bool RamCollector::Init() { + initialized_ = true; + return true; +} + +void RamCollector::SetRequestedMetrics( + const std::vector& metrics) { + requested_metrics_ = metrics; +} + std::vector RamCollector::Collect() { + if (!initialized_ || requested_metrics_.empty()) return {}; + + bool needs_total = + std::find(requested_metrics_.begin(), requested_metrics_.end(), + v1::MetricType::METRIC_TYPE_RAM_TOTAL) != + requested_metrics_.end(); + bool needs_used = + std::find(requested_metrics_.begin(), requested_metrics_.end(), + v1::MetricType::METRIC_TYPE_RAM_USED) != + requested_metrics_.end(); + if (!needs_total && !needs_used) return {}; + uint64_t used = 0; uint64_t total = 0; - ReadStats(used, total); auto now = std::chrono::system_clock::now().time_since_epoch().count(); + std::vector metrics; + if (needs_total) { + metrics.push_back({v1::MetricType::METRIC_TYPE_RAM_TOTAL, + {.name = "ram"}, + (double)total, + now}); + } + if (needs_used) { + metrics.push_back({v1::MetricType::METRIC_TYPE_RAM_USED, + {.name = "ram"}, + (double)used, + now}); + } + return metrics; +} - return {{"ram_total_bytes", (double)total, now}, - {"ram_used_bytes", (double)used, now}, - {"ram_used_percent", (double)used / total * 100.0, now}}; +std::vector RamCollector::Satisfiable() { + return {v1::MetricType::METRIC_TYPE_RAM_TOTAL, + v1::MetricType::METRIC_TYPE_RAM_USED}; } void RamCollector::ReadStats(uint64_t& used, uint64_t& total) { std::ifstream file("/proc/meminfo"); std::string line, key; uint64_t value; - // "kB" std::string unit; - uint64_t available = 0; while (std::getline(file, line)) { std::istringstream iss(line); iss >> key >> value >> unit; - // kB to bytes - if (key == "MemTotal:") + if (key == "MemTotal:") { total = value * 1024; - else if (key == "MemAvailable:") + } else if (key == "MemAvailable:") { available = value * 1024; - + } if (total > 0 && available > 0) break; } used = total - available; } +bool RamCollector::IsSupported() { + return std::filesystem::exists("/proc/meminfo"); +} + } // namespace collectors } // namespace agent } // namespace volta diff --git a/sources/agent/src/collectors/ram_collector.h b/sources/agent/src/collectors/ram_collector.h index f634910..7a08379 100644 --- a/sources/agent/src/collectors/ram_collector.h +++ b/sources/agent/src/collectors/ram_collector.h @@ -7,13 +7,18 @@ namespace volta { namespace agent { namespace collectors { -class RamCollector : public Collector { +class RamCollector : public RegisteredCollector { public: + bool Init() override; std::vector Collect() override; + bool IsSupported() override; + std::vector Satisfiable() override; + void SetRequestedMetrics(const std::vector& metrics) override; private: void ReadStats(uint64_t& used, uint64_t& total); + std::vector requested_metrics_; bool initialized_ = false; }; diff --git a/sources/agent/src/collectors/rapl_collector.cc b/sources/agent/src/collectors/rapl_collector.cc index 6616659..b307688 100644 --- a/sources/agent/src/collectors/rapl_collector.cc +++ b/sources/agent/src/collectors/rapl_collector.cc @@ -12,74 +12,118 @@ namespace volta { namespace agent { namespace collectors { -RaplCollector::RaplCollector() { - OpenMSR(); - uint64_t readout = ReadMSR(0, MSR_RAPL::POWER_UNIT); - power_units_ = pow(0.5, (double)(readout & 0xf)); - energy_units_ = pow(0.5, (double)((readout >> 8) & 0x1f)); - time_units_ = pow(0.5, (double)((readout >> 16) & 0xf)); - readout = ReadMSR(0, MSR_RAPL::PKG::ENERGY_STATUS); - last_value = energy_units_ * readout; +RaplCollector::RaplCollector() = default; + +bool RaplCollector::Init() { + try { + OpenMSR(); + uint64_t readout = ReadMSR(0, MSR_RAPL::POWER_UNIT); + power_units_ = pow(0.5, (double)(readout & 0xf)); + energy_units_ = pow(0.5, (double)((readout >> 8) & 0x1f)); + time_units_ = pow(0.5, (double)((readout >> 16) & 0xf)); + readout = ReadMSR(0, MSR_RAPL::PKG::ENERGY_STATUS); + last_value = energy_units_ * readout; + initialized_ = true; + return true; + } catch (const MSR_Open_Exception&) { + return false; + } catch (const MSR_Read_Exception&) { + return false; + } +} + +bool RaplCollector::IsSupported() { + const std::filesystem::path cpu_base = "/dev/cpu"; + std::error_code ec; + if (!std::filesystem::exists(cpu_base, ec) || + !std::filesystem::is_directory(cpu_base, ec)) { + return false; + } + + for (const auto& entry : std::filesystem::directory_iterator(cpu_base)) { + if (!entry.is_directory()) continue; + + const auto msr_path = entry.path() / "msr"; + if (std::filesystem::exists(msr_path, ec) && + access(msr_path.c_str(), R_OK) == 0) { + return true; + } + } + + return false; +} + +void RaplCollector::SetRequestedMetrics( + const std::vector& metrics) { + requested_metrics_ = metrics; } std::vector RaplCollector::Collect() { - uint64_t readout; + if (!initialized_ || requested_metrics_.empty()) return {}; + if (std::find(requested_metrics_.begin(), requested_metrics_.end(), + v1::MetricType::METRIC_TYPE_CPU_POWER_PACKAGE) == + requested_metrics_.end()) { + return {}; + } + uint64_t readout; try { readout = ReadMSR(0, MSR_RAPL::PKG::ENERGY_STATUS); - } catch (const MSR_Read_Exception &e) { + } catch (const MSR_Read_Exception&) { return {}; } double value = energy_units_ * readout; - Metric m; - m.name = "cpu_energy_usage_total"; + m.type = v1::MetricType::METRIC_TYPE_CPU_POWER_PACKAGE; + m.devId = {}; m.value = value - last_value; - m.timestamp = std::chrono::duration_cast( - std::chrono::system_clock::now().time_since_epoch()) - .count(); + m.timestamp = std::chrono::system_clock::now().time_since_epoch().count(); last_value = value; return {m}; } +std::vector RaplCollector::Satisfiable() { + return {v1::MetricType::METRIC_TYPE_CPU_POWER_PACKAGE}; +} + uint64_t RaplCollector::ReadMSR(uint8_t core, uint32_t offset) { uint64_t data; if (core + 1 > MSR_files_.size()) { throw MSR_Read_Exception(); } - // c-like read for thread safety if (pread(MSR_files_[core], &data, sizeof data, offset) != sizeof data) { - return {}; + throw MSR_Read_Exception(); } - return data; } void RaplCollector::OpenMSR() { const std::filesystem::path cpu_base = "/dev/cpu"; - MSR_files_ = std::vector(); + MSR_files_.clear(); std::error_code ec; if (!std::filesystem::exists(cpu_base, ec)) { throw MSR_Open_Exception(); } std::vector> cpu_entries; - for (const auto &entry : std::filesystem::directory_iterator(cpu_base)) { + for (const auto& entry : std::filesystem::directory_iterator(cpu_base)) { if (!entry.is_directory()) continue; - const auto &dirname = entry.path().filename().string(); + const auto& dirname = entry.path().filename().string(); if (!std::ranges::all_of(dirname, ::isdigit)) continue; cpu_entries.emplace_back(std::stoi(dirname), entry.path()); } std::ranges::sort(cpu_entries); - - for (const auto &[id, path] : cpu_entries) { + for (const auto& [id, path] : cpu_entries) { int fd = open((path / "msr").c_str(), O_RDONLY); if (fd >= 0) { MSR_files_.push_back(fd); } } + if (MSR_files_.empty()) { + throw MSR_Open_Exception(); + } } void RaplCollector::CloseMSR(int fd) { close(fd); } @@ -88,7 +132,8 @@ RaplCollector::~RaplCollector() { for (auto file : MSR_files_) { CloseMSR(file); } -}; +} + } // namespace collectors } // namespace agent } // namespace volta diff --git a/sources/agent/src/collectors/rapl_collector.h b/sources/agent/src/collectors/rapl_collector.h index 4696516..583f299 100644 --- a/sources/agent/src/collectors/rapl_collector.h +++ b/sources/agent/src/collectors/rapl_collector.h @@ -7,23 +7,27 @@ namespace volta { namespace agent { namespace collectors { -class RaplCollector : public Collector { +class RaplCollector : public RegisteredCollector { public: RaplCollector(); - // ~RaplCollector() override; RaplCollector(const RaplCollector&) = delete; RaplCollector& operator=(const RaplCollector&) = delete; + bool Init() override; std::vector Collect() override; + bool IsSupported() override; + std::vector Satisfiable() override; + void SetRequestedMetrics(const std::vector& metrics) override; ~RaplCollector(); private: uint64_t ReadMSR(uint8_t core, uint32_t offset); void OpenMSR(); void CloseMSR(int fd); + std::vector requested_metrics_; bool initialized_ = false; - double power_units_, energy_units_, time_units_; + double power_units_ = 0, energy_units_ = 0, time_units_ = 0; std::vector MSR_files_; - double last_value; + double last_value = 0; class MSR_Read_Exception : std::exception {}; class MSR_Open_Exception : std::exception {}; diff --git a/sources/agent/src/config/config.h b/sources/agent/src/config/config.h index 06421b8..e0a07aa 100644 --- a/sources/agent/src/config/config.h +++ b/sources/agent/src/config/config.h @@ -1,6 +1,7 @@ #ifndef VOLTA_AGENT_CONFIG_CONFIG_H_ #define VOLTA_AGENT_CONFIG_CONFIG_H_ +#include #include #include @@ -13,35 +14,6 @@ namespace volta { namespace agent { namespace config { -// TODO: Metrics and Collector names into something that can be read with a -// string -namespace CollectorNames { -// CPU -static constexpr char const* kProcStat = "proc_stat"; -static constexpr char const* kCpuFreq = "cpu_freq"; -static constexpr char const* kRapl = "rapl"; -static constexpr char const* kZenPower = "zenpower"; -static constexpr char const* kPmu = "pmu"; - -// GPU -static constexpr char const* kNvml = "nvml"; -static constexpr char const* kDcgm = "dcgm"; -static constexpr char const* kRocm = "rocm"; -static constexpr char const* kLevelZero = "level_zero"; - -// RAM -static constexpr char const* kMemInfo = "mem_info"; -static constexpr char const* kVmStat = "vm_stat"; - -// Disc and Network (I/O) -static constexpr char const* kDiskStats = "disk_stats"; -static constexpr char const* kNetDev = "net_dev"; -} // namespace CollectorNames - -struct CollectorConfig { - bool enabled = false; - std::map metrics; -}; struct Config { void PrintCurrentAffinity() { @@ -62,7 +34,8 @@ struct Config { } std::cout << "\n"; } - + // TODO: move this initialization logic to the ConfigLoader's + // LoadDefaultConfig method static constexpr int32_t kDefaultIntervalMs = 500; static constexpr char const* kDefaultServerAddress = "localhost"; static constexpr uint16_t kDefaultServerPort = 50051; @@ -83,11 +56,9 @@ struct Config { std::chrono::milliseconds collection_interval = std::chrono::milliseconds(kDefaultIntervalMs); cpu_set_t core_affinity = kDefaultAffinity; - std::string server_address = kDefaultServerAddress; uint16_t server_port = kDefaultServerPort; - - std::map collectors; + std::vector requestedMetrics; }; } // namespace config diff --git a/sources/agent/src/config/config_loader.cc b/sources/agent/src/config/config_loader.cc index 16cf662..86b84a7 100644 --- a/sources/agent/src/config/config_loader.cc +++ b/sources/agent/src/config/config_loader.cc @@ -2,6 +2,7 @@ #include +#include #include #include #include @@ -12,6 +13,7 @@ #include "utils/utils.h" namespace utils = volta::agent::utils; +namespace v1 = volta::v1; namespace volta { namespace agent { @@ -22,14 +24,7 @@ std::filesystem::path ConfigLoader::kConfigFile = "agent.conf"; std::filesystem::path ConfigLoader::kUUIDFile = "agent.uuid"; std::set> ConfigLoader::kValidTopLevelKeys = { - "core_affinity", "interval", "server_address", "server_port", "collectors"}; - -std::map>, std::less<>> - ConfigLoader::kValidCollectors = { - {"cpu", {"proc_stat", "cpu_freq", "rapl", "zenpower", "pmu"}}, - {"gpu", {"nvml", "dcgm", "rocm", "level_zero"}}, - {"ram", {"mem_info", "vm_stat"}}, - {"io", {"disk_stats", "net_dev"}}}; + "core_affinity", "interval", "server_address", "server_port", "metrics"}; Config ConfigLoader::LoadConfig() { Config config = LoadDefaultConfig(); @@ -42,24 +37,44 @@ Config ConfigLoader::LoadDefaultConfig() { if (!LoadUUID(config)) CreateUUID(config); - CollectorConfig nvml_collector; - nvml_collector.enabled = true; - nvml_collector.metrics = { - {"gpu_utilization", true}, - {"memory_utilization", true}, - {"temperature", true}, + // request all metrics by default for now + config.requestedMetrics = { + v1::MetricType::METRIC_TYPE_UNSPECIFIED, + v1::MetricType::METRIC_TYPE_CPU_POWER_PACKAGE, + v1::MetricType::METRIC_TYPE_CPU_POWER_CORES, + v1::MetricType::METRIC_TYPE_CPU_CLOCK_SPEED, + v1::MetricType::METRIC_TYPE_CPU_UTILIZATION, + v1::MetricType::METRIC_TYPE_CPU_TEMPERATURE, + v1::MetricType::METRIC_TYPE_CPU_IOWAIT, + v1::MetricType::METRIC_TYPE_CPU_CACHE_HIT_RATIO, + v1::MetricType::METRIC_TYPE_CPU_ACTIVE_PROCESSES, + v1::MetricType::METRIC_TYPE_GPU_POWER, + v1::MetricType::METRIC_TYPE_GPU_CLOCK_SPEED, + v1::MetricType::METRIC_TYPE_GPU_UTILIZATION, + v1::MetricType::METRIC_TYPE_GPU_TEMPERATURE, + v1::MetricType::METRIC_TYPE_GPU_VRAM_USED, + v1::MetricType::METRIC_TYPE_GPU_PCIE_BANDWIDTH, + v1::MetricType::METRIC_TYPE_GPU_COMPUTE_UNIT_UTILIZATION, + v1::MetricType::METRIC_TYPE_GPU_SHARED_MEMORY_UTILIZATION, + v1::MetricType::METRIC_TYPE_GPU_REGISTER_UTILIZATION, + v1::MetricType::METRIC_TYPE_RAM_POWER, + v1::MetricType::METRIC_TYPE_RAM_TOTAL, + v1::MetricType::METRIC_TYPE_RAM_AVAILABLE, + v1::MetricType::METRIC_TYPE_RAM_USED, + v1::MetricType::METRIC_TYPE_RAM_CACHED, + v1::MetricType::METRIC_TYPE_SWAP_USED, + v1::MetricType::METRIC_TYPE_SWAP_ACTIVITY, + v1::MetricType::METRIC_TYPE_DISK_READ_THROUGHPUT, + v1::MetricType::METRIC_TYPE_DISK_WRITE_THROUGHPUT, + v1::MetricType::METRIC_TYPE_DISK_READ_IOPS, + v1::MetricType::METRIC_TYPE_DISK_WRITE_IOPS, + v1::MetricType::METRIC_TYPE_DISK_BUSY_TIME, + v1::MetricType::METRIC_TYPE_DISK_CAPACITY_USED, + v1::MetricType::METRIC_TYPE_NET_BYTES_RECEIVED, + v1::MetricType::METRIC_TYPE_NET_BYTES_SENT, + v1::MetricType::METRIC_TYPE_NET_PACKETS_RECEIVED, + v1::MetricType::METRIC_TYPE_NET_PACKETS_SENT, }; - config.collectors[CollectorNames::kNvml] = nvml_collector; - - CollectorConfig proc_stat_config; - proc_stat_config.enabled = true; - proc_stat_config.metrics["cpu_usage_percent"] = true; - config.collectors[CollectorNames::kProcStat] = proc_stat_config; - - CollectorConfig rapl_collector; - rapl_collector.enabled = true; - rapl_collector.metrics = {{"cpu_energy_usage_total", true}}; - config.collectors[CollectorNames::kRapl] = rapl_collector; return config; } @@ -97,7 +112,7 @@ void ConfigLoader::LoadConfigFile(Config& out_config) { LoadInterval(tbl, out_config); LoadServerAddress(tbl, out_config); LoadServerPort(tbl, out_config); - LoadCollectors(tbl, out_config); + LoadMetrics(tbl, out_config); CheckKeys(tbl); } catch (const toml::parse_error& err) { std::cerr << "Parsing Agent config failed: " << err.description() << " at " @@ -232,43 +247,70 @@ void ConfigLoader::LoadServerPort(toml::table& tbl, Config& out_config) { } } -void ConfigLoader::LoadCollectors(toml::table& tbl, Config& out_config) { - if (!tbl.contains("collectors")) return; +void ConfigLoader::LoadMetrics(toml::table& tbl, Config& out_config) { + if (!tbl.contains("metrics")) return; - auto collectors_node = tbl["collectors"].as_table(); + auto metrics_node = tbl["metrics"]; - for (auto&& [hardware_type, hardware_node] : *collectors_node) { - if (!kValidCollectors.contains(hardware_type.str())) { - std::cerr << "Invalid hardware type: " << hardware_type << std::endl; - continue; - } + std::vector metrics; - auto collectors = hardware_node.as_array(); - if (!collectors) { - std::cout << "Element " << hardware_type << " is not an array\n"; - continue; + auto append_metric = [&](v1::MetricType metric) { + if (metric == v1::MetricType::METRIC_TYPE_UNSPECIFIED) { + return; + } + if (std::find(metrics.begin(), metrics.end(), metric) == metrics.end()) { + metrics.push_back(metric); } + }; - CollectorConfig collector_config; + auto parse_metric_name = [&](const std::string& name, + v1::MetricType& metric) -> bool { + if (v1::MetricType_Parse(name, &metric)) return true; + if (v1::MetricType_Parse(std::string("METRIC_TYPE_") + name, &metric)) + return true; + return false; + }; - std::cout << hardware_type << std::endl; - for (auto&& collector : *collectors) { - if (auto str = collector.value()) { - const auto& collector_set = kValidCollectors[hardware_type.str()]; - if (!collector_set.contains(*str)) { - std::cout << "Invalid collector: " << *str - << ", for hardware: " << hardware_type << std::endl; + if (auto arr = metrics_node.as_array()) { + for (const auto& item : *arr) { + if (auto metric_num = item.value()) { + if (!v1::MetricType_IsValid(*metric_num)) { + std::cerr << "Invalid metric value: " << *metric_num << std::endl; continue; } + append_metric(static_cast(*metric_num)); + continue; + } - std::cout << *str << std::endl; - // TODO: Add metrics - - collector_config.enabled = !collector_config.metrics.empty(); - } else { - std::cerr << "Invalid type in " << hardware_type << " array\n"; + if (auto metric_name = item.value()) { + v1::MetricType metric; + if (!parse_metric_name(*metric_name, metric)) { + std::cerr << "Invalid metric name: " << *metric_name << std::endl; + continue; + } + append_metric(metric); + continue; } + + std::cerr << "Invalid metric entry type" << std::endl; + } + } else if (auto val = metrics_node.value()) { + v1::MetricType metric; + if (parse_metric_name(*val, metric)) { + append_metric(metric); + } else { + std::cerr << "Invalid metric name: " << *val << std::endl; } + } else { + std::cerr << "Invalid metrics value, use array of strings or numbers" + << std::endl; + return; + } + + if (!metrics.empty()) { + out_config.requestedMetrics = std::move(metrics); + std::cout << "Requested metrics updated to " + << out_config.requestedMetrics.size() << " entries" << std::endl; } } diff --git a/sources/agent/src/config/config_loader.h b/sources/agent/src/config/config_loader.h index 2014423..b738009 100644 --- a/sources/agent/src/config/config_loader.h +++ b/sources/agent/src/config/config_loader.h @@ -27,15 +27,12 @@ class ConfigLoader { static void LoadInterval(toml::table& tbl, Config& out_config); static void LoadServerAddress(toml::table& tbl, Config& out_config); static void LoadServerPort(toml::table& tbl, Config& out_config); - static void LoadCollectors(toml::table& tbl, Config& out_config); + static void LoadMetrics(toml::table& tbl, Config& out_config); static void CheckKeys(toml::table& tbl); static std::filesystem::path kConfigFile; static std::filesystem::path kUUIDFile; static std::set> kValidTopLevelKeys; - static std::map>, - std::less<>> - kValidCollectors; }; } // namespace config diff --git a/sources/agent/src/main.cc b/sources/agent/src/main.cc index caece51..1260a37 100644 --- a/sources/agent/src/main.cc +++ b/sources/agent/src/main.cc @@ -21,24 +21,14 @@ int main() { try { auto config = config::ConfigLoader::LoadConfig(); - platform::PlatformDetector detector; - auto hw = detector.Detect(); - detector.PrintDetectedInfo(hw); - - std::vector> active_collectors; - - active_collectors.push_back( - std::make_unique()); - active_collectors.push_back(std::make_unique()); - active_collectors.push_back(std::make_unique()); - for (const auto &gpu : hw.gpus) { - if (gpu.vendor == platform::GpuVendor::NVIDIA) { - auto nvml = std::make_unique(); - if (nvml->Init()) { - active_collectors.push_back(std::move(nvml)); - } - } - } + // platform::PlatformDetector detector; + // auto hw = detector.Detect(); + // detector.PrintDetectedInfo(hw); + + auto active_collectors = collectors::CollectorRegistry::Instance().Resolve( + config.requestedMetrics); + + std::cin.get(); Scheduler scheduler(config, std::move(active_collectors)); scheduler.Run(); diff --git a/sources/agent/src/metric.h b/sources/agent/src/metric.h index 04a4c2f..87235d9 100644 --- a/sources/agent/src/metric.h +++ b/sources/agent/src/metric.h @@ -2,13 +2,25 @@ #define VOLTA_AGENT_SRC_METRIC_H #include +#include #include +#include "volta.pb.h" + namespace volta { namespace agent { +struct DeviceId { + std::optional + uuid; // device UUID from NVML/ROCm, disk serial, etc. + std::optional name; // Human-readable: "sda", "eth0", "GPU 0" + std::optional index; // 0-based index (GPU slot, CPU core, etc.) + std::optional vendor; // "nvidia" | "amd" | "intel" +}; + struct Metric { - std::string name; + v1::MetricType type; + DeviceId devId; double value; int64_t timestamp; }; diff --git a/sources/agent/src/scheduler.cc b/sources/agent/src/scheduler.cc index d6c806a..5af7682 100644 --- a/sources/agent/src/scheduler.cc +++ b/sources/agent/src/scheduler.cc @@ -7,17 +7,17 @@ namespace volta { namespace agent { -Scheduler::Scheduler( - const config::Config& config, - std::vector>&& collectors) +Scheduler::Scheduler(const config::Config& config, + std::vector&& collectors) : config_(config), collectors_(std::move(collectors)) {} void Scheduler::Run() { + for (auto collector : collectors_) { + collector->Init(); + } std::cout << "[" << config_.uuid << "] Starting collection loop (Interval: " << config_.collection_interval.count() << "ms)..." << std::endl; - std::this_thread::sleep_for(config_.collection_interval); - while (true) { std::vector batch; for (const auto& collector : collectors_) { @@ -39,17 +39,19 @@ void Scheduler::PrintDashboard(const std::vector& metrics) { std::cout << " VOLTA AGENT v0.1 (POC) - ACTIVE MONITOR \n"; std::cout << "===============================================\n"; - std::cout << std::left << std::setw(30) << "METRIC NAME" << "VALUE\n"; + std::cout << std::left << std::setw(30) << "METRIC NAME" + << "VALUE\n"; std::cout << "-----------------------------------------------\n"; for (const auto& m : metrics) { - std::cout << std::left << std::setw(30) << m.name << std::fixed - << std::setprecision(2) << m.value << "\n"; + std::cout << std::left << std::setw(30) << v1::MetricType_Name(m.type) + << std::fixed << std::setprecision(2) << m.value << "\n"; } std::cout << "-----------------------------------------------\n"; std::cout << "Data points collected: " << metrics.size() << "\n"; - std::cout << "Press Ctrl+C to exit." << "\n"; + std::cout << "Press Ctrl+C to exit." + << "\n"; std::cout.flush(); } diff --git a/sources/agent/src/scheduler.h b/sources/agent/src/scheduler.h index 9caafee..99919ed 100644 --- a/sources/agent/src/scheduler.h +++ b/sources/agent/src/scheduler.h @@ -13,16 +13,15 @@ namespace agent { class Scheduler { public: - explicit Scheduler( - const config::Config& config, - std::vector>&& collectors); + explicit Scheduler(const config::Config& config, + std::vector&& collectors); void Run(); private: void PrintDashboard(const std::vector& metrics); - std::vector> collectors_; + std::vector collectors_; const config::Config& config_; }; diff --git a/sources/proto/volta.proto b/sources/proto/volta.proto index 1eee8fb..24b072e 100644 --- a/sources/proto/volta.proto +++ b/sources/proto/volta.proto @@ -4,6 +4,55 @@ import "google/protobuf/timestamp.proto"; package volta.v1; + +enum MetricType { + METRIC_TYPE_UNSPECIFIED = 0; + + // CPU — range [100, 199] + METRIC_TYPE_CPU_POWER_PACKAGE = 100; + METRIC_TYPE_CPU_POWER_CORES = 101; + METRIC_TYPE_CPU_CLOCK_SPEED = 102; + METRIC_TYPE_CPU_UTILIZATION = 103; + METRIC_TYPE_CPU_TEMPERATURE = 104; + METRIC_TYPE_CPU_IOWAIT = 105; + METRIC_TYPE_CPU_CACHE_HIT_RATIO = 106; + METRIC_TYPE_CPU_ACTIVE_PROCESSES = 107; + + // GPU — range [200, 299] + METRIC_TYPE_GPU_POWER = 200; + METRIC_TYPE_GPU_CLOCK_SPEED = 201; + METRIC_TYPE_GPU_UTILIZATION = 202; + METRIC_TYPE_GPU_TEMPERATURE = 203; + METRIC_TYPE_GPU_VRAM_USED = 204; + METRIC_TYPE_GPU_PCIE_BANDWIDTH = 205; + METRIC_TYPE_GPU_COMPUTE_UNIT_UTILIZATION = 206; + METRIC_TYPE_GPU_SHARED_MEMORY_UTILIZATION = 207; + METRIC_TYPE_GPU_REGISTER_UTILIZATION = 208; + + // RAM — range [300, 399] + METRIC_TYPE_RAM_POWER = 300; + METRIC_TYPE_RAM_TOTAL = 301; + METRIC_TYPE_RAM_AVAILABLE = 302; + METRIC_TYPE_RAM_USED = 303; + METRIC_TYPE_RAM_CACHED = 304; + METRIC_TYPE_SWAP_USED = 305; + METRIC_TYPE_SWAP_ACTIVITY = 306; + + // Disk — range [400, 499] + METRIC_TYPE_DISK_READ_THROUGHPUT = 400; + METRIC_TYPE_DISK_WRITE_THROUGHPUT = 401; + METRIC_TYPE_DISK_READ_IOPS = 402; + METRIC_TYPE_DISK_WRITE_IOPS = 403; + METRIC_TYPE_DISK_BUSY_TIME = 404; + METRIC_TYPE_DISK_CAPACITY_USED = 405; + + // Network — range [500, 599] + METRIC_TYPE_NET_BYTES_RECEIVED = 500; + METRIC_TYPE_NET_BYTES_SENT = 501; + METRIC_TYPE_NET_PACKETS_RECEIVED = 502; + METRIC_TYPE_NET_PACKETS_SENT = 503; +} + service VoltaCollector { rpc CollectMetrics(stream MetricBatch) returns (CollectResponse); }