diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 6ac90ad81..225849804 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -130,7 +130,7 @@ jobs: Import-Module $env:ChocolateyInstall\helpers\chocolateyProfile.psm1 refreshenv conan profile detect - conan install . --build=missing + conan install . --build=missing -o "&:opentelemetry_tracing=False" cmake . --preset conan-default -DVIAMCPPSDK_BUILD_EXAMPLES=ON cmake --build --preset=conan-release --target ALL_BUILD install -j 8 env: diff --git a/CMakeLists.txt b/CMakeLists.txt index 14839a080..9329b166c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -132,6 +132,15 @@ option(VIAMCPPSDK_SANITIZED_BUILD "Build with address and UB sanitizers (less pe # option(VIAMCPPSDK_CLANG_TIDY "Run the clang-tidy linter" OFF) + +# - `VIAMCPPSDK_OPENTELEMETRY_TRACING` +# +# When enabled, links against opentelemetry-cpp and compiles in +# W3C Trace Context propagation for all gRPC client and server calls. +# This is off by default and effectively conan-only, because apt packages are not generally available. 
+# +option(VIAMCPPSDK_OPENTELEMETRY_TRACING "Compile OpenTelemetry tracing into all gRPC calls" OFF) + # The following options are only defined if this project is not being included as a subproject if (CMAKE_CURRENT_SOURCE_DIR STREQUAL CMAKE_SOURCE_DIR) option(VIAMCPPSDK_BUILD_EXAMPLES "Build the example executables" ON) @@ -485,6 +494,10 @@ if(VIAMCPPSDK_GRPCXX_VERSION VERSION_LESS 1.43.0) set(VIAMCPPSDK_GRPCXX_NO_DIRECT_DIAL 1) endif() +if (VIAMCPPSDK_OPENTELEMETRY_TRACING) + find_package(opentelemetry-cpp CONFIG REQUIRED) +endif() + include(FetchContent) FetchContent_Declare( diff --git a/conanfile.py b/conanfile.py index 4fe56399c..914e705fb 100644 --- a/conanfile.py +++ b/conanfile.py @@ -1,4 +1,5 @@ from conan import ConanFile +from conan.errors import ConanInvalidConfiguration from conan.tools.cmake import CMake, CMakeDeps, CMakeToolchain, cmake_layout from conan.tools.build import valid_max_cppstd from conan.tools.files import load @@ -18,11 +19,13 @@ class ViamCppSdkRecipe(ConanFile): options = { "offline_proto_generation": [True, False], + "opentelemetry_tracing": [True, False], "shared": [True, False] } default_options = { "offline_proto_generation": True, + "opentelemetry_tracing": True, "shared": False } @@ -36,6 +39,14 @@ def configure(self): # Workaround an unfortunately long-standing boost/conan issue which breaks C++20 builds self.options["boost"].without_cobalt = True + if self.options.opentelemetry_tracing: + self.options["opentelemetry-cpp"].with_otlp_grpc = True + # Disable every HTTP-based exporter so libcurl never enters the + # dependency graph; we only use OTLP/gRPC. 
+ self.options["opentelemetry-cpp"].with_otlp_http = False + self.options["opentelemetry-cpp"].with_zipkin = False + self.options["opentelemetry-cpp"].with_elasticsearch = False + if self.options.shared: # See https://github.com/conan-io/conan-center-index/issues/25107 self.options["grpc"].secure = True @@ -46,6 +57,13 @@ def configure(self): for lib in ["grpc", "protobuf", "abseil"]: self.options[lib].shared = True + def validate(self): + if self.options.opentelemetry_tracing: + if not self.dependencies["opentelemetry-cpp"].options.with_otlp_grpc: + raise ConanInvalidConfiguration("opentelemetry_tracing option requires opentelemetry-cpp/*:with_otlp_grpc") + + + def _xtensor_requires(self): if valid_max_cppstd(self, 14, False): return 'xtensor/[>=0.24.3 <0.26.0]' @@ -71,6 +89,15 @@ def requirements(self): self.requires('protobuf/[>=3.17.1 <6.30.0]') self.requires(self._xtensor_requires(), transitive_headers=True) + if self.options.opentelemetry_tracing: + # Oldest maintained conan package and first version with proper CMake support + if self.settings.os == "Windows": + # v1.25+ builds opentelemetry_proto as a DLL on Windows without + # exporting its protobuf-generated globals, breaking consumer link. 
+ self.requires('opentelemetry-cpp/[>=1.21.0 <1.25.0]') + else: + self.requires('opentelemetry-cpp/[>=1.21.0]') + def build_requirements(self): if self.options.offline_proto_generation: self.tool_requires(self._grpc_requires()) @@ -83,6 +110,7 @@ def generate(self): tc = CMakeToolchain(self) tc.cache_variables["VIAMCPPSDK_OFFLINE_PROTO_GENERATION"] = self.options.offline_proto_generation + tc.cache_variables["VIAMCPPSDK_OPENTELEMETRY_TRACING"] = self.options.opentelemetry_tracing tc.cache_variables["VIAMCPPSDK_USE_DYNAMIC_PROTOS"] = True # We don't want to constrain these for conan builds because we @@ -130,6 +158,11 @@ def package_info(self): self.cpp_info.components["viamapi"].includedirs.append("include/viam/api") + if self.options.opentelemetry_tracing: + self.cpp_info.components["viamapi"].requires.append( + "opentelemetry-cpp::opentelemetry_proto" + ) + if self.options.shared: self.cpp_info.components["viamapi"].libs = ["viamapi"] else: @@ -156,6 +189,12 @@ def package_info(self): "viamapi", ]) + if self.options.opentelemetry_tracing: + self.cpp_info.components["viamsdk"].requires.extend([ + "opentelemetry-cpp::opentelemetry_trace", + "opentelemetry-cpp::opentelemetry_exporter_otlp_grpc", + ]) + self.cpp_info.components["viamsdk"].requires.extend([ "viam_rust_utils" ]) diff --git a/src/viam/api/CMakeLists.txt b/src/viam/api/CMakeLists.txt index 3bb8df79f..c61854587 100644 --- a/src/viam/api/CMakeLists.txt +++ b/src/viam/api/CMakeLists.txt @@ -365,12 +365,6 @@ target_sources(viamapi ${PROTO_GEN_DIR}/google/api/http.pb.cc ${PROTO_GEN_DIR}/google/api/httpbody.pb.cc ${PROTO_GEN_DIR}/google/rpc/status.pb.cc - ${PROTO_GEN_DIR}/opentelemetry/proto/common/v1/common.grpc.pb.cc - ${PROTO_GEN_DIR}/opentelemetry/proto/common/v1/common.pb.cc - ${PROTO_GEN_DIR}/opentelemetry/proto/resource/v1/resource.grpc.pb.cc - ${PROTO_GEN_DIR}/opentelemetry/proto/resource/v1/resource.pb.cc - ${PROTO_GEN_DIR}/opentelemetry/proto/trace/v1/trace.grpc.pb.cc - 
${PROTO_GEN_DIR}/opentelemetry/proto/trace/v1/trace.pb.cc ${PROTO_GEN_DIR}/module/v1/module.grpc.pb.cc ${PROTO_GEN_DIR}/module/v1/module.pb.cc ${PROTO_GEN_DIR}/robot/v1/robot.grpc.pb.cc @@ -507,6 +501,26 @@ target_link_libraries(viamapi PRIVATE Threads::Threads ) +# `robot.proto` imports `opentelemetry/proto/trace/v1/trace.proto`, so the +# OTel proto descriptors are part of viamapi's ABI. When tracing is enabled, +# get them from the system `opentelemetry-cpp::proto` library to avoid +# double-registering them alongside `opentelemetry-cpp::otlp_grpc_exporter` +# (which links the same library transitively into viamsdk). When tracing is +# disabled, compile our locally-generated copies into viamapi instead. +if (VIAMCPPSDK_OPENTELEMETRY_TRACING) + target_link_libraries(viamapi PUBLIC opentelemetry-cpp::proto) +else() + target_sources(viamapi + PRIVATE + ${PROTO_GEN_DIR}/opentelemetry/proto/common/v1/common.grpc.pb.cc + ${PROTO_GEN_DIR}/opentelemetry/proto/common/v1/common.pb.cc + ${PROTO_GEN_DIR}/opentelemetry/proto/resource/v1/resource.grpc.pb.cc + ${PROTO_GEN_DIR}/opentelemetry/proto/resource/v1/resource.pb.cc + ${PROTO_GEN_DIR}/opentelemetry/proto/trace/v1/trace.grpc.pb.cc + ${PROTO_GEN_DIR}/opentelemetry/proto/trace/v1/trace.pb.cc + ) +endif() + install( TARGETS viamapi EXPORT viamapi diff --git a/src/viam/sdk/CMakeLists.txt b/src/viam/sdk/CMakeLists.txt index fb4ec7ac0..5874e4c62 100644 --- a/src/viam/sdk/CMakeLists.txt +++ b/src/viam/sdk/CMakeLists.txt @@ -71,6 +71,8 @@ target_sources(viamsdk common/version_metadata.cpp common/world_state.cpp common/private/service_helper.cpp + tracing/private/span_guard.cpp + tracing/private/tracer.cpp components/arm.cpp components/audio_in.cpp components/audio_out.cpp @@ -306,6 +308,14 @@ target_link_libraries(viamsdk PRIVATE Threads::Threads ) +if (VIAMCPPSDK_OPENTELEMETRY_TRACING) + target_compile_definitions(viamsdk PRIVATE VIAMCPPSDK_OPENTELEMETRY_TRACING) + target_link_libraries(viamsdk + PRIVATE 
opentelemetry-cpp::trace + PRIVATE opentelemetry-cpp::exporter_otlp_grpc + ) +endif() + # if the `viam_rust_utils` target exists then we should use it. If not then # we're probably on a non-x86_64 windows build or some other platform without # automated `rust-utils` builds, so we should use the stubs instead. diff --git a/src/viam/sdk/common/client_helper.cpp b/src/viam/sdk/common/client_helper.cpp index f7e428bce..83405c4cb 100644 --- a/src/viam/sdk/common/client_helper.cpp +++ b/src/viam/sdk/common/client_helper.cpp @@ -8,6 +8,7 @@ #include #include #include +#include namespace viam { namespace sdk { @@ -46,6 +47,7 @@ ClientContext::ClientContext(const ViamChannel& channel) : ClientContext() { if (channel.auth_token().has_value()) { wrapped_context_->AddMetadata("authorization", "Bearer " + *channel.auth_token()); } + impl::inject_trace_context(wrapped_context_.get()); } ClientContext::~ClientContext() = default; diff --git a/src/viam/sdk/common/instance.cpp b/src/viam/sdk/common/instance.cpp index 06baf29f4..a9e91ff2a 100644 --- a/src/viam/sdk/common/instance.cpp +++ b/src/viam/sdk/common/instance.cpp @@ -5,6 +5,7 @@ #include #include #include +#include namespace viam { namespace sdk { @@ -28,6 +29,7 @@ Instance::Instance() { impl_ = std::make_unique(); impl_->registry.initialize(); impl_->log_mgr.init_logging(); + impl::Tracer::initialize_propagator(); } Instance::~Instance() { diff --git a/src/viam/sdk/common/instance.hpp b/src/viam/sdk/common/instance.hpp index de10ae896..979eab9b8 100644 --- a/src/viam/sdk/common/instance.hpp +++ b/src/viam/sdk/common/instance.hpp @@ -5,6 +5,10 @@ namespace viam { namespace sdk { +namespace impl { +class Tracer; +} // namespace impl + /// @brief Instance management for Viam C++ SDK applications. /// This is a single instance class which is responsible for global setup and teardown related to /// the SDK. 
An Instance must be constructed before doing anything else in a program, and it must @@ -29,6 +33,7 @@ class Instance { private: friend class Registry; friend class LogManager; + friend class impl::Tracer; struct Impl; std::unique_ptr impl_; diff --git a/src/viam/sdk/common/private/instance.hpp b/src/viam/sdk/common/private/instance.hpp index cb09480d9..dc4b176e8 100644 --- a/src/viam/sdk/common/private/instance.hpp +++ b/src/viam/sdk/common/private/instance.hpp @@ -3,6 +3,7 @@ #include #include #include +#include namespace viam { namespace sdk { @@ -10,6 +11,7 @@ namespace sdk { struct Instance::Impl { Registry registry; LogManager log_mgr; + impl::Tracer tracer; }; } // namespace sdk diff --git a/src/viam/sdk/common/private/service_helper.hpp b/src/viam/sdk/common/private/service_helper.hpp index 52cb4ea2c..799335ca2 100644 --- a/src/viam/sdk/common/private/service_helper.hpp +++ b/src/viam/sdk/common/private/service_helper.hpp @@ -9,6 +9,7 @@ #include #include +#include namespace viam { namespace sdk { @@ -30,6 +31,10 @@ class ServiceHelperBase { protected: explicit ServiceHelperBase(const char* method) noexcept : method_{method} {} + const char* method_name() const noexcept { + return method_; + } + private: const char* method_; }; @@ -56,7 +61,8 @@ class ServiceHelper : public ServiceHelperBase { return failNoResource(request_->name()); } const GrpcContextObserver::Enable enable{*context_}; - return invoke_(std::forward(callable), std::move(resource)); + impl::ServerSpanGuard span_guard{context_, method_name()}; + return span_guard.commit(invoke_(std::forward(callable), std::move(resource))); } catch (const std::exception& xcp) { return failStdException(xcp); } catch (...) 
{ diff --git a/src/viam/sdk/common/utils.cpp b/src/viam/sdk/common/utils.cpp index 15eaa7c62..1d30dec7e 100644 --- a/src/viam/sdk/common/utils.cpp +++ b/src/viam/sdk/common/utils.cpp @@ -179,16 +179,30 @@ boost::optional get_env(const char* var) { } bool is_env_var_true(const char* var) { + static constexpr const std::array truth_vals{ + {"true", "yes", "1", "TRUE", "YES"}}; + if (const auto& val = get_env(var)) { - for (const char* truth : {"true", "yes", "1", "TRUE", "YES"}) { - if (*val == truth) { - return true; - } - } + return std::any_of(truth_vals.begin(), truth_vals.end(), [&val](const char* truth) { + return *val == truth; + }); } return false; } +bool is_env_var_false(const char* var) { + static constexpr const std::array false_vals{ + {"false", "no", "0", "FALSE", "NO"}}; + + if (const auto& val = get_env(var)) { + return std::any_of(false_vals.begin(), false_vals.end(), [&val](const char* untruth) { + return *val == untruth; + }); + } else { + return true; + } +} + } // namespace sdk } // namespace viam diff --git a/src/viam/sdk/common/utils.hpp b/src/viam/sdk/common/utils.hpp index 747e91a55..7c8a7229e 100644 --- a/src/viam/sdk/common/utils.hpp +++ b/src/viam/sdk/common/utils.hpp @@ -125,5 +125,9 @@ boost::optional get_env(const char* var); /// "true", "yes", "1", "TRUE", or "YES" bool is_env_var_true(const char* var); +/// @brief Returns whether the environment variable named @param var is unset, or set and equal to +/// "false", "no", "0", "FALSE", or "NO" +bool is_env_var_false(const char* var); + } // namespace sdk } // namespace viam diff --git a/src/viam/sdk/components/private/board_server.cpp b/src/viam/sdk/components/private/board_server.cpp index 223b85efc..788b6a884 100644 --- a/src/viam/sdk/components/private/board_server.cpp +++ b/src/viam/sdk/components/private/board_server.cpp @@ -117,14 +117,17 @@ ::grpc::Status BoardServer::ReadAnalogReader( ::grpc::ServerContext* context, const ::viam::component::board::v1::ReadAnalogReaderRequest* 
request, ::viam::component::board::v1::ReadAnalogReaderResponse* response) { + ServerSpanGuard span_guard{context, "BoardServer::ReadAnalogReader"}; + if (!request) { - return ::grpc::Status(::grpc::StatusCode::INVALID_ARGUMENT, - "Called [Board::ReadAnalogReader] without a request"); + return span_guard.commit(::grpc::Status(::grpc::StatusCode::INVALID_ARGUMENT, + "Called [Board::ReadAnalogReader] without a request")); }; const std::shared_ptr rb = resource_manager()->resource(request->board_name()); if (!rb) { - return grpc::Status(grpc::UNKNOWN, "resource not found: " + request->board_name()); + return span_guard.commit( + grpc::Status(grpc::UNKNOWN, "resource not found: " + request->board_name())); } const std::shared_ptr board = std::dynamic_pointer_cast(rb); @@ -141,21 +144,24 @@ ::grpc::Status BoardServer::ReadAnalogReader( response->set_max_range(result.max_range); response->set_step_size(result.step_size); - return ::grpc::Status(); + return span_guard.commit(::grpc::Status()); } ::grpc::Status BoardServer::WriteAnalog( ::grpc::ServerContext* context, const ::viam::component::board::v1::WriteAnalogRequest* request, ::viam::component::board::v1::WriteAnalogResponse*) { + ServerSpanGuard span_guard{context, "BoardServer::WriteAnalog"}; + if (!request) { - return ::grpc::Status(::grpc::StatusCode::INVALID_ARGUMENT, - "Called [Board::WriteAnalog] without a request"); + return span_guard.commit(::grpc::Status(::grpc::StatusCode::INVALID_ARGUMENT, + "Called [Board::WriteAnalog] without a request")); }; const std::shared_ptr rb = resource_manager()->resource(request->name()); if (!rb) { - return grpc::Status(grpc::UNKNOWN, "resource not found: " + request->name()); + return span_guard.commit( + grpc::Status(grpc::UNKNOWN, "resource not found: " + request->name())); } const std::shared_ptr board = std::dynamic_pointer_cast(rb); @@ -168,21 +174,25 @@ ::grpc::Status BoardServer::WriteAnalog( const GrpcContextObserver::Enable enable{*context}; 
board->write_analog(request->pin(), request->value(), extra); - return ::grpc::Status(); + return span_guard.commit(::grpc::Status()); } ::grpc::Status BoardServer::GetDigitalInterruptValue( ::grpc::ServerContext* context, const ::viam::component::board::v1::GetDigitalInterruptValueRequest* request, ::viam::component::board::v1::GetDigitalInterruptValueResponse* response) { + ServerSpanGuard span_guard{context, "BoardServer::GetDigitalInterruptValue"}; + if (!request) { - return ::grpc::Status(::grpc::StatusCode::INVALID_ARGUMENT, - "Called [Board::GetDigitalInterruptValue] without a request"); + return span_guard.commit( + ::grpc::Status(::grpc::StatusCode::INVALID_ARGUMENT, + "Called [Board::GetDigitalInterruptValue] without a request")); }; const std::shared_ptr rb = resource_manager()->resource(request->board_name()); if (!rb) { - return grpc::Status(grpc::UNKNOWN, "resource not found: " + request->board_name()); + return span_guard.commit( + grpc::Status(grpc::UNKNOWN, "resource not found: " + request->board_name())); } const std::shared_ptr board = std::dynamic_pointer_cast(rb); @@ -197,7 +207,7 @@ ::grpc::Status BoardServer::GetDigitalInterruptValue( board->read_digital_interrupt(request->digital_interrupt_name(), extra); response->set_value(result); - return ::grpc::Status(); + return span_guard.commit(::grpc::Status()); } ::grpc::Status BoardServer::StreamTicks( diff --git a/src/viam/sdk/module/service.cpp b/src/viam/sdk/module/service.cpp index a9fe75189..6fca4a49e 100644 --- a/src/viam/sdk/module/service.cpp +++ b/src/viam/sdk/module/service.cpp @@ -44,6 +44,8 @@ #include #include #include +#include +#include namespace viam { namespace sdk { @@ -68,6 +70,8 @@ struct ModuleService::ServiceImpl : viam::module::v1::ModuleService::Service { ::grpc::Status AddResource(::grpc::ServerContext* ctx, const ::viam::module::v1::AddResourceRequest* request, ::viam::module::v1::AddResourceResponse*) override { + impl::ServerSpanGuard span_guard{ctx, "AddResource"}; + 
const viam::app::v1::ComponentConfig& proto = request->config(); const ResourceConfig cfg = from_proto(proto); const std::lock_guard lock(parent.lock_); @@ -81,23 +85,25 @@ struct ModuleService::ServiceImpl : viam::module::v1::ModuleService::Service { res = reg->construct_resource(deps, cfg); res->set_log_level(cfg.get_log_level()); } catch (const std::exception& exc) { - return grpc::Status(::grpc::INTERNAL, exc.what()); + return span_guard.commit(grpc::Status(::grpc::INTERNAL, exc.what())); } } try { parent.server_->add_resource(res, ctx->deadline()); } catch (const std::exception& exc) { - return grpc::Status(::grpc::INTERNAL, exc.what()); + return span_guard.commit(grpc::Status(::grpc::INTERNAL, exc.what())); } - return grpc::Status(); + return span_guard.commit(grpc::Status()); } ::grpc::Status ReconfigureResource( - ::grpc::ServerContext*, + ::grpc::ServerContext* ctx, const ::viam::module::v1::ReconfigureResourceRequest* request, ::viam::module::v1::ReconfigureResourceResponse*) override { + impl::ServerSpanGuard span_guard{ctx, "ReconfigureResource"}; + const viam::app::v1::ComponentConfig& proto = request->config(); ResourceConfig cfg = from_proto(proto); @@ -105,8 +111,8 @@ struct ModuleService::ServiceImpl : viam::module::v1::ModuleService::Service { auto resource_server = parent.server_->lookup_resource_server(cfg.api()); if (!resource_server) { - return grpc::Status(grpc::UNKNOWN, - "no rpc service for config: " + cfg.api().to_string()); + return span_guard.commit(grpc::Status( + grpc::UNKNOWN, "no rpc service for config: " + cfg.api().to_string())); } auto manager = resource_server->resource_manager(); @@ -116,9 +122,10 @@ struct ModuleService::ServiceImpl : viam::module::v1::ModuleService::Service { { const std::shared_ptr res = manager->resource(cfg.resource_name().name()); if (!res) { - return grpc::Status(grpc::UNKNOWN, - "unable to stop resource " + cfg.resource_name().name() + - " as it doesn't exist."); + return span_guard.commit( + 
grpc::Status(grpc::UNKNOWN, + "unable to stop resource " + cfg.resource_name().name() + + " as it doesn't exist.")); } if (auto stoppable = std::dynamic_pointer_cast(res)) { @@ -130,8 +137,8 @@ struct ModuleService::ServiceImpl : viam::module::v1::ModuleService::Service { Registry::get().lookup_model(cfg.api(), cfg.model()); if (!reg) { - return grpc::Status(::grpc::INTERNAL, - "Unable to rebuild resource: model registration not found"); + return span_guard.commit(grpc::Status( + ::grpc::INTERNAL, "Unable to rebuild resource: model registration not found")); } try { @@ -139,24 +146,27 @@ struct ModuleService::ServiceImpl : viam::module::v1::ModuleService::Service { return reg->construct_resource(deps, cfg); }); } catch (const std::exception& exc) { - return grpc::Status(::grpc::INTERNAL, exc.what()); + return span_guard.commit(grpc::Status(::grpc::INTERNAL, exc.what())); } - return grpc::Status(); + return span_guard.commit(grpc::Status()); } - ::grpc::Status ValidateConfig(::grpc::ServerContext*, + ::grpc::Status ValidateConfig(::grpc::ServerContext* ctx, const ::viam::module::v1::ValidateConfigRequest* request, ::viam::module::v1::ValidateConfigResponse* response) override { + impl::ServerSpanGuard span_guard{ctx, "ValidateConfig"}; + const viam::app::v1::ComponentConfig& proto = request->config(); ResourceConfig cfg = from_proto(proto); const std::shared_ptr reg = Registry::get().lookup_model(cfg.api(), cfg.model()); if (!reg) { - return grpc::Status(grpc::UNKNOWN, - "unable to validate resource " + cfg.resource_name().name() + - " as it hasn't been registered."); + return span_guard.commit( + grpc::Status(grpc::UNKNOWN, + "unable to validate resource " + cfg.resource_name().name() + + " as it hasn't been registered.")); } try { const std::vector implicit_deps = reg->validate(cfg); @@ -164,26 +174,30 @@ struct ModuleService::ServiceImpl : viam::module::v1::ModuleService::Service { response->add_dependencies(dep); } } catch (const std::exception& err) { - return 
grpc::Status(grpc::UNKNOWN, - "validation failure in resource " + cfg.name() + ": " + err.what()); + return span_guard.commit(grpc::Status( + grpc::UNKNOWN, + "validation failure in resource " + cfg.name() + ": " + err.what())); } - return grpc::Status(); + return span_guard.commit(grpc::Status()); } - ::grpc::Status RemoveResource(::grpc::ServerContext*, + ::grpc::Status RemoveResource(::grpc::ServerContext* ctx, const ::viam::module::v1::RemoveResourceRequest* request, ::viam::module::v1::RemoveResourceResponse*) override { + impl::ServerSpanGuard span_guard{ctx, "RemoveResource"}; + auto name = Name::from_string(request->name()); auto resource_server = parent.server_->lookup_resource_server(name.api()); if (!resource_server) { - return grpc::Status(grpc::UNKNOWN, "no grpc service for " + name.api().to_string()); + return span_guard.commit( + grpc::Status(grpc::UNKNOWN, "no grpc service for " + name.api().to_string())); } const std::shared_ptr manager = resource_server->resource_manager(); const std::shared_ptr res = manager->resource(name.name()); if (!res) { - return grpc::Status( + return span_guard.commit(grpc::Status( grpc::UNKNOWN, - "unable to remove resource " + name.to_string() + " as it doesn't exist."); + "unable to remove resource " + name.to_string() + " as it doesn't exist.")); } if (auto stoppable = std::dynamic_pointer_cast(res)) { @@ -191,7 +205,7 @@ struct ModuleService::ServiceImpl : viam::module::v1::ModuleService::Service { } manager->remove(name); - return grpc::Status(); + return span_guard.commit(grpc::Status()); } ::grpc::Status Ready(::grpc::ServerContext*, @@ -210,6 +224,7 @@ struct ModuleService::ServiceImpl : viam::module::v1::ModuleService::Service { .set_reconnect_every_interval(std::chrono::seconds{1}); parent.parent_ = RobotClient::at_local_socket(parent.parent_addr_, opts); parent.parent_->connect_logging(); + parent.parent_->connect_tracing(); } } @@ -289,6 +304,8 @@ ModuleService::~ModuleService() { VIAM_SDK_LOG(info) << 
"Shutting down gracefully."; server_->shutdown(); + impl::Tracer::get().shutdown_provider(); + if (parent_) { try { parent_.reset(); diff --git a/src/viam/sdk/robot/client.cpp b/src/viam/sdk/robot/client.cpp index 67e9697ab..21ca1c936 100644 --- a/src/viam/sdk/robot/client.cpp +++ b/src/viam/sdk/robot/client.cpp @@ -30,6 +30,7 @@ #include #include #include +#include namespace viam { namespace sdk { @@ -97,6 +98,7 @@ struct RobotClient::impl { boost::log::core::get()->remove_sink(log_sink); LogManager::get().enable_console_logging(); } + sdk::impl::Tracer::get().shutdown_provider(); } template @@ -336,6 +338,19 @@ void RobotClient::log(const std::string& name, } } +bool RobotClient::send_traces(const robot::v1::SendTracesRequest* req) { + if (!impl_) { + return false; + } + robot::v1::SendTracesResponse resp; + ClientContext ctx; + return impl_->stub->SendTraces(ctx, *req, &resp).ok(); +} + +void RobotClient::connect_tracing() { + sdk::impl::Tracer::get().initialize_provider(this); +} + std::shared_ptr RobotClient::with_channel(ViamChannel channel, const Options& options) { auto robot = std::make_shared(std::move(channel)); diff --git a/src/viam/sdk/robot/client.hpp b/src/viam/sdk/robot/client.hpp index 519569e56..8b5e0dfdb 100644 --- a/src/viam/sdk/robot/client.hpp +++ b/src/viam/sdk/robot/client.hpp @@ -25,6 +25,7 @@ namespace v1 { class FrameSystemConfig; class Operation; +class SendTracesRequest; } // namespace v1 } // namespace robot @@ -33,7 +34,8 @@ namespace sdk { namespace impl { struct LogBackend; -} +class ParentSendTracesExporter; +} // namespace impl /// @defgroup Robot Classes related to a Robot representation. @@ -186,12 +188,16 @@ class RobotClient { private: friend class ModuleService; friend struct impl::LogBackend; + friend class impl::ParentSendTracesExporter; void log(const std::string& name, const std::string& level, const std::string& message, time_pt time); + // Ships a batch of OTLP traces to the parent. Returns true on success. 
+ bool send_traces(const robot::v1::SendTracesRequest* req); + // Makes this RobotClient manage logging by sending logs over grpc to viam-server. // This is private and only ever called by ModuleService; in other words it is only called when // viam-server is running a Viam C++ SDK application as a module. @@ -199,6 +205,10 @@ class RobotClient { // re-enabled on destruction. void connect_logging(); + // Installs the SDK tracer provider so spans flow back to the parent over this connection. + // Only called by ModuleService when running as a module. + void connect_tracing(); + void refresh_every(); void check_connection(); diff --git a/src/viam/sdk/tracing/private/span_guard.cpp b/src/viam/sdk/tracing/private/span_guard.cpp new file mode 100644 index 000000000..92f4eba50 --- /dev/null +++ b/src/viam/sdk/tracing/private/span_guard.cpp @@ -0,0 +1,148 @@ +#include + +#include +#include + +#ifdef VIAMCPPSDK_OPENTELEMETRY_TRACING + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace otel_ctx = opentelemetry::context; +namespace otel_prop = opentelemetry::context::propagation; +namespace otel_trace = opentelemetry::trace; + +namespace viam { +namespace sdk { +namespace impl { + +namespace { + +constexpr const char* k_instrumentation_scope = "viam-cpp-sdk"; + +// Carrier for reading W3C trace context from incoming gRPC request metadata (server side). 
+class GrpcServerCarrier : public otel_prop::TextMapCarrier { + public: + explicit GrpcServerCarrier(const grpc::ServerContext& ctx) noexcept : ctx_(ctx) {} + + opentelemetry::nostd::string_view Get( + opentelemetry::nostd::string_view key) const noexcept override { + const auto& metadata = ctx_.client_metadata(); + const auto it = metadata.find(grpc::string_ref{key.data(), key.size()}); + if (it != metadata.end()) { + return {it->second.data(), it->second.size()}; + } + return {}; + } + + void Set(opentelemetry::nostd::string_view, + opentelemetry::nostd::string_view) noexcept override {} + + private: + const grpc::ServerContext& ctx_; +}; + +// Carrier for writing W3C trace context into outgoing gRPC request metadata (client side). +class GrpcClientCarrier : public otel_prop::TextMapCarrier { + public: + explicit GrpcClientCarrier(grpc::ClientContext* ctx) noexcept : ctx_(ctx) {} + + opentelemetry::nostd::string_view Get( + opentelemetry::nostd::string_view) const noexcept override { + return {}; + } + + void Set(opentelemetry::nostd::string_view key, + opentelemetry::nostd::string_view value) noexcept override { + ctx_->AddMetadata(std::string(key), std::string(value)); + } + + private: + grpc::ClientContext* ctx_; +}; + +} // namespace + +opentelemetry::nostd::shared_ptr ServerSpanGuard::start_span_( + const GrpcServerContext* ctx, const char* method) noexcept { + auto tracer = otel_trace::Provider::GetTracerProvider()->GetTracer(k_instrumentation_scope); + + otel_trace::StartSpanOptions opts; + opts.kind = otel_trace::SpanKind::kServer; + + if (ctx) { + GrpcServerCarrier carrier{*ctx}; + auto current_ctx = otel_ctx::RuntimeContext::GetCurrent(); + const auto extracted = otel_prop::GlobalTextMapPropagator::GetGlobalPropagator()->Extract( + carrier, current_ctx); + opts.parent = otel_trace::GetSpan(extracted)->GetContext(); + } + + auto span = tracer->StartSpan(method, opts); + span->SetAttribute("rpc.system", "grpc"); + return span; +} + +// Constructing 
`scope_` makes the span active on the current thread. +ServerSpanGuard::ServerSpanGuard(const GrpcServerContext* ctx, const char* method) noexcept + : span_(start_span_(ctx, method)), scope_(span_) {} + +ServerSpanGuard::~ServerSpanGuard() noexcept { + if (!committed_) { + span_->SetStatus(otel_trace::StatusCode::kError, "handler threw an exception"); + } + span_->End(); +} + +::grpc::Status ServerSpanGuard::commit(::grpc::Status status) noexcept { + committed_ = true; + if (status.error_code() == ::grpc::StatusCode::OK) { + span_->SetStatus(otel_trace::StatusCode::kOk); + } else { + span_->SetStatus(otel_trace::StatusCode::kError); + span_->SetAttribute("rpc.grpc.status_code", static_cast(status.error_code())); + } + return status; +} + +void inject_trace_context(GrpcClientContext* ctx) noexcept { + GrpcClientCarrier carrier{ctx}; + otel_prop::GlobalTextMapPropagator::GetGlobalPropagator()->Inject( + carrier, otel_ctx::RuntimeContext::GetCurrent()); +} + +} // namespace impl +} // namespace sdk +} // namespace viam + +#else // VIAMCPPSDK_OPENTELEMETRY_TRACING — provide no-op implementations + +namespace viam { +namespace sdk { +namespace impl { + +ServerSpanGuard::ServerSpanGuard(const GrpcServerContext*, const char*) noexcept {} + +ServerSpanGuard::~ServerSpanGuard() noexcept = default; + +::grpc::Status ServerSpanGuard::commit( // NOLINT(readability-convert-member-functions-to-static) + ::grpc::Status status) noexcept { + return status; +} + +void inject_trace_context(GrpcClientContext*) noexcept {} + +} // namespace impl +} // namespace sdk +} // namespace viam + +#endif // VIAMCPPSDK_OPENTELEMETRY_TRACING diff --git a/src/viam/sdk/tracing/private/span_guard.hpp b/src/viam/sdk/tracing/private/span_guard.hpp new file mode 100644 index 000000000..a95ede7af --- /dev/null +++ b/src/viam/sdk/tracing/private/span_guard.hpp @@ -0,0 +1,58 @@ +#pragma once + +#include + +#include + +#ifdef VIAMCPPSDK_OPENTELEMETRY_TRACING +#include +#include +#endif + +namespace viam { 
+namespace sdk { +namespace impl { + +/// @brief RAII guard that creates a server-side OpenTelemetry span for the duration of a gRPC +/// handler invocation. Extracts W3C trace context from the incoming gRPC request metadata and +/// starts a child span (or a new root span if no traceparent header is present). When +/// destroyed, ends the span, marking it as an error if commit() was never called. +/// +/// If OpenTelemetry tracing is not compiled in, or no tracer provider has been configured, +/// uses a no-op implementation. +/// +/// @note Instances must be created and destroyed on the same thread (the gRPC handler thread). +class ServerSpanGuard { + public: + explicit ServerSpanGuard(const GrpcServerContext* ctx, const char* method) noexcept; + ~ServerSpanGuard() noexcept; + + /// @brief Record the final gRPC status before destruction and return it unchanged. + ::grpc::Status commit(::grpc::Status status) noexcept; + + ServerSpanGuard(const ServerSpanGuard&) = delete; + ServerSpanGuard& operator=(const ServerSpanGuard&) = delete; + +#ifdef VIAMCPPSDK_OPENTELEMETRY_TRACING + private: + // Builds the span without activating it; `scope_`, initialized next, makes it active. + static opentelemetry::nostd::shared_ptr start_span_( + const GrpcServerContext* ctx, const char* method) noexcept; + + opentelemetry::nostd::shared_ptr span_; + opentelemetry::trace::Scope scope_; + bool committed_ = false; +#endif +}; + +/// @brief Inject the currently-active OpenTelemetry trace context as W3C @c traceparent / +/// @c tracestate metadata entries into an outgoing gRPC client context. +/// +/// This propagates trace context from any server span that is currently active on the calling +/// thread (e.g., one created by @c ServerSpanGuard) to downstream gRPC calls. It is a no-op +/// when no span is active or when OpenTelemetry tracing is not compiled in. 
+void inject_trace_context(GrpcClientContext* ctx) noexcept; + +} // namespace impl +} // namespace sdk +} // namespace viam diff --git a/src/viam/sdk/tracing/private/tracer.cpp b/src/viam/sdk/tracing/private/tracer.cpp new file mode 100644 index 000000000..dc6c932ee --- /dev/null +++ b/src/viam/sdk/tracing/private/tracer.cpp @@ -0,0 +1,173 @@ +#include + +#include +#include +#include + +#ifdef VIAMCPPSDK_OPENTELEMETRY_TRACING + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +namespace otel_common = opentelemetry::sdk::common; +namespace otel_prop = opentelemetry::context::propagation; +namespace otel_trace = opentelemetry::trace; +namespace otel_otlp = opentelemetry::exporter::otlp; +namespace otel_sdk_resource = opentelemetry::sdk::resource; +namespace otel_sdk_trace = opentelemetry::sdk::trace; + +namespace viam { +namespace sdk { +namespace impl { + +namespace { + +constexpr const char* k_instrumentation_scope = "viam-cpp-sdk"; // also sent as `service.name` + +} // namespace + +// Ships OTLP-encoded spans to the parent process via RobotClient::send_traces.
+class ParentSendTracesExporter final : public otel_sdk_trace::SpanExporter { + public: + explicit ParentSendTracesExporter(RobotClient* client) noexcept : client_(client) {} + + std::unique_ptr MakeRecordable() noexcept override { + return std::unique_ptr(new otel_otlp::OtlpRecordable()); + } + + otel_common::ExportResult Export( + const opentelemetry::nostd::span>& + spans) noexcept override { + if (is_shutdown_.load(std::memory_order_acquire)) { /* set by Shutdown(): refuse further batches */ + return otel_common::ExportResult::kFailure; + } + if (spans.empty()) { /* nothing to send, so skip the RPC */ + return otel_common::ExportResult::kSuccess; + } + + opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest otlp_req; + otel_otlp::OtlpRecordableUtils::PopulateRequest(spans, &otlp_req); + + viam::robot::v1::SendTracesRequest req; + req.mutable_resource_spans()->Swap(otlp_req.mutable_resource_spans()); /* hand the encoded spans to the Viam request */ + /* NOTE(review): client_ is dereferenced unchecked — assumes a non-null client; confirm at call sites. */ + return client_->send_traces(&req) ? otel_common::ExportResult::kSuccess + : otel_common::ExportResult::kFailure; + } + + bool ForceFlush(std::chrono::microseconds /*timeout*/) noexcept override { + return true; /* nothing is buffered here: Export() hands each batch to send_traces before returning */ + } + + bool Shutdown(std::chrono::microseconds /*timeout*/) noexcept override { + is_shutdown_.store(true, std::memory_order_release); /* pairs with the acquire load in Export() */ + return true; + } + + private: + RobotClient* client_; /* raw pointer: this class never deletes it */ + std::atomic is_shutdown_{false}; +}; + +void Tracer::initialize_propagator() noexcept { + otel_prop::GlobalTextMapPropagator::SetGlobalPropagator( + opentelemetry::nostd::shared_ptr( + new opentelemetry::trace::propagation::HttpTraceContext())); +} + +Tracer::Tracer() = default; +Tracer::~Tracer() { + shutdown_provider(); +} + +Tracer& Tracer::get() { + return Instance::current(Instance::Creation::open_existing).impl_->tracer; +} + +void Tracer::initialize_provider(RobotClient* client) noexcept { + shutdown_provider(); /* tear down any provider left from an earlier call */ + + if (is_env_var_false("VIAM_MODULE_TRACING")) { /* presumably an explicit opt-out; confirm is_env_var_false semantics */ + return; + } + + auto exporter = + std::unique_ptr(new ParentSendTracesExporter(client)); + + auto processor = otel_sdk_trace::BatchSpanProcessorFactory::Create(
std::move(exporter), otel_sdk_trace::BatchSpanProcessorOptions{}); /* default-constructed batching options */ + + auto resource = otel_sdk_resource::Resource::Create({ + {"service.name", std::string{k_instrumentation_scope}}, + }); + + sdk_provider_ = std::shared_ptr( + otel_sdk_trace::TracerProviderFactory::Create(std::move(processor), resource)); + + std::shared_ptr base_provider = sdk_provider_; + otel_trace::Provider::SetTracerProvider( + opentelemetry::nostd::shared_ptr(std::move(base_provider))); /* install it as the global tracer provider */ +} + +void Tracer::shutdown_provider() noexcept { + if (!sdk_provider_) { /* no provider installed: nothing to undo */ + return; + } + sdk_provider_->Shutdown(); + otel_trace::Provider::SetTracerProvider( + opentelemetry::nostd::shared_ptr( + new otel_trace::NoopTracerProvider())); /* swap the global provider back to a no-op one */ + sdk_provider_.reset(); +} + +} // namespace impl +} // namespace sdk +} // namespace viam + +#else // VIAMCPPSDK_OPENTELEMETRY_TRACING — provide no-op implementations + +namespace viam { +namespace sdk { + +class RobotClient; + +namespace impl { + +void Tracer::initialize_propagator() noexcept {} + +Tracer::Tracer() = default; +Tracer::~Tracer() = default; + +Tracer& Tracer::get() { + return Instance::current(Instance::Creation::open_existing).impl_->tracer; +} + +void Tracer::initialize_provider(RobotClient*) noexcept {} +void Tracer::shutdown_provider() noexcept {} + +} // namespace impl +} // namespace sdk +} // namespace viam + +#endif // VIAMCPPSDK_OPENTELEMETRY_TRACING diff --git a/src/viam/sdk/tracing/private/tracer.hpp b/src/viam/sdk/tracing/private/tracer.hpp new file mode 100644 index 000000000..65932ba95 --- /dev/null +++ b/src/viam/sdk/tracing/private/tracer.hpp @@ -0,0 +1,46 @@ +#pragma once + +#ifdef VIAMCPPSDK_OPENTELEMETRY_TRACING +#include + +#include +#endif + +namespace viam { +namespace sdk { + +class RobotClient; + +namespace impl { + +/// @brief Holds the SDK-side OpenTelemetry tracer provider. Spans go nowhere until +/// @c initialize_provider installs an exporter that ships them to the parent.
+class Tracer { + public: + Tracer(); + ~Tracer(); + + Tracer(const Tracer&) = delete; + Tracer& operator=(const Tracer&) = delete; + + /// @brief Returns the @c Tracer owned by the current @c Instance. + static Tracer& get(); + + /// @brief Install the W3C Trace Context propagator. Called once during @c Instance + /// construction. + static void initialize_propagator() noexcept; + + /// @brief Install an exporter sourced from @p client. The caller must keep @p client alive + /// until @c shutdown_provider returns. Any previously installed provider is shut down first. + void initialize_provider(RobotClient* client) noexcept; + void shutdown_provider() noexcept; ///< Shuts down and uninstalls the provider, if any. + +#ifdef VIAMCPPSDK_OPENTELEMETRY_TRACING + private: + std::shared_ptr sdk_provider_; // null while no provider is installed +#endif +}; + +} // namespace impl +} // namespace sdk +} // namespace viam