diff --git a/CMakeLists.txt b/CMakeLists.txt index cb64b0cce..df9dde8ea 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -103,7 +103,7 @@ pkg_check_modules(LIB60870 IMPORTED_TARGET lib60870>=2.3.1) pkg_check_modules(LIBCONFIG IMPORTED_TARGET libconfig>=1.4.9) pkg_check_modules(MOSQUITTO IMPORTED_TARGET libmosquitto>=1.6.9) pkg_check_modules(MODBUS IMPORTED_TARGET libmodbus>=3.1.0) -pkg_check_modules(RDKAFKA IMPORTED_TARGET rdkafka>=1.5.0) +pkg_check_modules(RDKAFKAPP IMPORTED_TARGET rdkafka++>=1.5.0) pkg_check_modules(HIREDIS IMPORTED_TARGET hiredis>=1.0.0) pkg_check_modules(REDISPP IMPORTED_TARGET redis++>=1.2.0) pkg_check_modules(RABBITMQ_C IMPORTED_TARGET librabbitmq>=0.8.0) @@ -193,7 +193,7 @@ cmake_dependent_option(WITH_NODE_IEC60870 "Build with iec60870 node-types" cmake_dependent_option(WITH_NODE_IEC61850 "Build with iec61850 node-types" "${WITH_DEFAULTS}" "LIBIEC61850_FOUND; NOT WITHOUT_GPL" OFF) cmake_dependent_option(WITH_NODE_INFINIBAND "Build with infiniband node-type" "${WITH_DEFAULTS}" "IBVerbs_FOUND; RDMACM_FOUND" OFF) # Infiniband node-type is currenly broken cmake_dependent_option(WITH_NODE_INFLUXDB "Build with influxdb node-type" "${WITH_DEFAULTS}" "" OFF) -cmake_dependent_option(WITH_NODE_KAFKA "Build with kafka node-type" "${WITH_DEFAULTS}" "RDKAFKA_FOUND" OFF) +cmake_dependent_option(WITH_NODE_KAFKA "Build with kafka node-type" "${WITH_DEFAULTS}" "RDKAFKAPP_FOUND" OFF) cmake_dependent_option(WITH_NODE_LOOPBACK "Build with loopback node-type" "${WITH_DEFAULTS}" "" OFF) cmake_dependent_option(WITH_NODE_MODBUS "Build with modbus node-type" "${WITH_DEFAULTS}" "MODBUS_FOUND" OFF) cmake_dependent_option(WITH_NODE_MQTT "Build with mqtt node-type" "${WITH_DEFAULTS}" "MOSQUITTO_FOUND" OFF) @@ -309,7 +309,7 @@ add_feature_info(NODE_MODBUS WITH_NODE_MODBUS "Build with add_feature_info(NODE_MQTT WITH_NODE_MQTT "Build with mqtt node-type") add_feature_info(NODE_NANOMSG WITH_NODE_NANOMSG "Build with nanomsg node-type") add_feature_info(NODE_NGSI WITH_NODE_NGSI "Build with ngsi node-type") -add_feature_info(NODE_OPAL_AYSNC WITH_NODE_OPAL_ASYNC "Build with opal.async node-type") +add_feature_info(NODE_OPAL_ASYNC WITH_NODE_OPAL_ASYNC "Build with opal.async node-type") add_feature_info(NODE_OPAL_ORCHESTRA WITH_NODE_OPAL_ORCHESTRA "Build with opal.orchestra node-type") add_feature_info(NODE_OPENDSS WITH_NODE_OPENDSS "Build with opendss node-type") add_feature_info(NODE_REDIS WITH_NODE_REDIS "Build with redis node-type") diff --git a/doc/openapi/components/schemas/config/nodes/kafka.yaml b/doc/openapi/components/schemas/config/nodes/kafka.yaml index a86024f8f..56624e6e6 100644 --- a/doc/openapi/components/schemas/config/nodes/kafka.yaml +++ b/doc/openapi/components/schemas/config/nodes/kafka.yaml @@ -56,7 +56,7 @@ allOf: in: type: object properties: - consume: + topic: type: string description: The Kafka topic to which this node-type will subscribe for receiving messages. @@ -67,7 +67,7 @@ allOf: out: type: object properties: - produce: + topic: type: string description: The Kafka topic to which this node-type will publish messages. 
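Note on the schema change above: both direction-specific topic settings are now uniformly named `topic` (previously `in.consume` and `out.produce`). A minimal node configuration using the renamed settings could look like the following sketch (libconfig syntax as used in etc/examples; broker address and topic names are placeholders):

nodes = {
    kafka_node = {
        type = "kafka"

        server = "localhost:9092"
        protocol = "PLAINTEXT"

        in = {
            topic = "test-topic"
            group_id = "villas-node"
        }

        out = {
            topic = "test-topic"
        }
    }
}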
diff --git a/doc/openapi/components/schemas/config/nodes/opal_orchestra.yaml b/doc/openapi/components/schemas/config/nodes/opal_orchestra.yaml index 816bbee25..6fec75b63 100644 --- a/doc/openapi/components/schemas/config/nodes/opal_orchestra.yaml +++ b/doc/openapi/components/schemas/config/nodes/opal_orchestra.yaml @@ -59,12 +59,6 @@ allOf: description: >- If true, the DDF file provided in the 'ddf' setting will be overwritten with settings and signals from the VILLASnode configuration. - ddf_overwrite_only: - type: boolean - default: false - description: >- - If true, VILLASnode will overwrite the file provided in the 'ddf' setting, and terminate immediately afterwards. - rate: type: number default: 1 diff --git a/etc/examples/nodes/kafka.conf b/etc/examples/nodes/kafka.conf index 4f74d5be6..e17f2ba84 100644 --- a/etc/examples/nodes/kafka.conf +++ b/etc/examples/nodes/kafka.conf @@ -7,27 +7,39 @@ nodes = { format = "json.kafka" - server = "localhost:9094" - protocol = "SASL_SSL" + server = "localhost:9092" + protocol = "PLAINTEXT" client_id = "villas-node" in = { - consume = "test-topic" + topic = "test-topic" group_id = "villas-node" } out = { - produce = "test-topic" + topic = "test-topic" } + } - ssl = { - ca = "/etc/ssl/certs/ca.pem" - } + siggen = { + type = "signal" - sasl = { - mechanisms = "SCRAM-SHA-512" - username = "scram-sha-512-usr" - password = "scram-sha-512-pwd" - } + rate = 20 + values = 5 + signal = "mixed" } } + +paths = ( + { + in = "siggen" + out = "kafka_node" + }, + { + in = "kafka_node" + + hooks = [ + "print" + ] + } +) diff --git a/include/villas/node.hpp b/include/villas/node.hpp index f4c52a62e..0b7ab6858 100644 --- a/include/villas/node.hpp +++ b/include/villas/node.hpp @@ -7,6 +7,7 @@ #pragma once +#include <functional> #include #include @@ -106,6 +107,13 @@ class Node { virtual json_t *_readStatus() const { return nullptr; } + int parseCommon( + json_t *json, + std::function<Signal::Ptr(json_t *, NodeDirection::Direction)> + parse_signal = [](json_t *j, NodeDirection::Direction d) { + return Signal::fromJson(j); + }); + public: // Initialize node with default values Node(const uuid_t &id = {}, const std::string &name = ""); @@ -277,7 +285,6 @@ class Node { }; class NodeFactory : public villas::plugin::Plugin { - friend Node; protected: diff --git a/include/villas/node_direction.hpp b/include/villas/node_direction.hpp index ecd69b356..7ef96cacf 100644 --- a/include/villas/node_direction.hpp +++ b/include/villas/node_direction.hpp @@ -7,6 +7,8 @@ #pragma once +#include <functional> + #include #include @@ -50,7 +52,7 @@ class NodeDirection { NodeDirection(enum NodeDirection::Direction dir, Node *n); - int parse(json_t *json); + int parse(json_t *json, std::function<Signal::Ptr(json_t *)> parse_signal); void check(); int prepare(); int start(); diff --git a/include/villas/signal.hpp b/include/villas/signal.hpp index 71e90e629..c7fc48464 100644 --- a/include/villas/signal.hpp +++ b/include/villas/signal.hpp @@ -42,12 +42,14 @@ class Signal { // Parse signal description. int parse(json_t *json); - std::string toString(const union SignalData *d = nullptr) const; + virtual std::string toString(const union SignalData *d = nullptr) const; // Produce JSON representation of signal. - json_t *toJson() const; + virtual json_t *toJson() const; bool isNext(const Signal &sig); + + static Signal::Ptr fromJson(json_t *json); }; } // namespace node diff --git a/include/villas/signal_list.hpp b/include/villas/signal_list.hpp index 210a11bbb..ef6e08daf 100644 --- a/include/villas/signal_list.hpp +++ b/include/villas/signal_list.hpp @@ -7,7 +7,10 @@ #pragma once +#include <functional> #include +#include +#include <string_view> #include @@ -24,16 +27,10 @@ class SignalList : public std::vector<Signal::Ptr> { using Ptr = std::shared_ptr<SignalList>; SignalList() {} - + SignalList(json_t *json, std::function<Signal::Ptr(json_t *)> parse_signal = + Signal::fromJson); + SignalList(std::string_view dt); SignalList(unsigned len, enum SignalType fmt); - SignalList(const char *dt); - SignalList(json_t *json) { - int ret = parse(json); - if (ret) - throw RuntimeError("Failed to parse signal list"); - } - - int parse(json_t *json); Ptr clone(); @@ -42,9 +39,13 @@ class SignalList : public std::vector<Signal::Ptr> { json_t *toJson() const; - int getIndexByName(const std::string &name); - Signal::Ptr getByName(const std::string &name); + int getIndexByName(std::string_view name); + Signal::Ptr getByName(std::string_view name); Signal::Ptr getByIndex(unsigned idx); + + void + parse(json_t *json_signals, + std::function<Signal::Ptr(json_t *)> parse_signal = Signal::fromJson); }; } // namespace node diff --git a/lib/hooks/lua.cpp b/lib/hooks/lua.cpp index 0dd89a402..d33951e7c 100644 --- a/lib/hooks/lua.cpp +++ b/lib/hooks/lua.cpp @@ -416,15 +416,13 @@ LuaHook::LuaHook(Path *p, Node *n, int fl, int prio, bool en) LuaHook::~LuaHook() { lua_close(L); } void LuaHook::parseExpressions(json_t *json_sigs) { - int ret; size_t i; json_t *json_sig; + expressions.clear(); signalsExpressions->clear(); - ret = signalsExpressions->parse(json_sigs); - if (ret) - throw ConfigError(json_sigs, "node-config-hook-lua-signals", - "Setting 'signals' must be a list of dicts"); + + signalsExpressions->parse(json_sigs); // cppcheck-suppress unknownMacro json_array_foreach(json_sigs, i, json_sig) diff --git a/lib/node.cpp b/lib/node.cpp index b7453eb4b..f4dd16f1f 100644 --- a/lib/node.cpp +++ b/lib/node.cpp @@ -97,7 +97,12 @@ int Node::prepare() { return 0; } -int Node::parse(json_t *json) { +int Node::parse(json_t *json) { return parseCommon(json); } + +int Node::parseCommon( + json_t *json, + std::function<Signal::Ptr(json_t *, NodeDirection::Direction)> + parse_signal) { assert(state == State::INITIALIZED || state == State::PARSED || state == State::CHECKED); @@ -138,31 +143,37 @@ int Node::parse(json_t *json) { #endif // WITH_NETEM } - struct { + struct Direction { const char *str; - struct NodeDirection *dir; - } dirs[] = {{"in", &in}, {"out", &out}}; + struct NodeDirection *obj; + enum NodeDirection::Direction dir; + }; + std::vector<Direction> dirs = {{"in", &in, NodeDirection::Direction::IN}, + {"out", &out, NodeDirection::Direction::OUT}}; - const char *fields[] = {"signals", "builtin", "vectorize", "hooks"}; + std::vector<std::string> fields = {"signals", "builtin", "vectorize", + "hooks"}; - for (unsigned j = 0; j < ARRAY_LEN(dirs); j++) { - json_t *json_dir = json_object_get(json, dirs[j].str); + for (auto &dir : dirs) { + json_t *json_dir = json_object_get(json, dir.str); // Skip if direction is unused if (!json_dir) { json_dir = json_pack("{ s: b }", "enabled", 0); } - // Copy missing fields from main node config to direction config - for (unsigned i = 0; i < ARRAY_LEN(fields); i++) { - json_t *json_field_dir = json_object_get(json_dir, fields[i]); - json_t *json_field_node = json_object_get(json, fields[i]); + // Copy missing fields from main node config to direction config. + for (auto &field : fields) { + json_t *json_field_dir = json_object_get(json_dir, field.c_str()); + json_t *json_field_node = json_object_get(json, field.c_str()); - if (json_field_node && !json_field_dir) - json_object_set(json_dir, fields[i], json_field_node); + if (json_field_node && !json_field_dir) { + json_object_set(json_dir, field.c_str(), json_field_node); + } } - ret = dirs[j].dir->parse(json_dir); + ret = dir.obj->parse(json_dir, + [&](json_t *j) { return parse_signal(j, dir.dir); }); if (ret) return ret; } diff --git a/lib/node_compat.cpp b/lib/node_compat.cpp index 02fcd759c..48bb7a80e 100644 --- a/lib/node_compat.cpp +++ b/lib/node_compat.cpp @@ -74,7 +74,7 @@ int NodeCompat::prepare() { } int NodeCompat::parse(json_t *json) { - int ret = Node::parse(json); + int ret = Node::parseCommon(json); if (ret) return ret; diff --git a/lib/node_direction.cpp b/lib/node_direction.cpp index 39088b014..ff6d02473 100644 --- a/lib/node_direction.cpp +++ b/lib/node_direction.cpp @@ -22,7 +22,8 @@ NodeDirection::NodeDirection(enum NodeDirection::Direction dir, Node *n) : direction(dir), path(nullptr), node(n), enabled(1), builtin(1), vectorize(1), config(nullptr) {} -int NodeDirection::parse(json_t *json) { +int NodeDirection::parse(json_t *json, + std::function<Signal::Ptr(json_t *)> parse_signal) { int ret; json_error_t err; @@ -44,53 +45,10 @@ signals = std::make_shared<SignalList>(); if (!signals) throw MemoryAllocationError(); - } else if (json_is_object(json_signals) || json_is_array(json_signals)) { - signals = std::make_shared<SignalList>(); + } else if (json_signals) { + signals = std::make_shared<SignalList>(json_signals, parse_signal); if (!signals) throw MemoryAllocationError(); - - if (json_is_object(json_signals)) { - json_t *json_name, *json_signal = json_signals; - int count; - - ret = json_unpack_ex(json_signal, &err, 0, "{ s: i }", "count", &count); - if (ret) - throw ConfigError(json_signals, "node-config-node-signals", - "Invalid signal definition"); - - json_signals = json_array(); - for (int i = 0; i < count; i++) { - json_t *json_signal_copy = json_copy(json_signal); - - json_object_del(json_signal, "count"); - - // Append signal index - json_name = json_object_get(json_signal_copy, "name"); - if (json_name) { - const char *name = json_string_value(json_name); - char *name_new; - - int ret __attribute__((unused)); - ret = asprintf(&name_new, "%s%d", name, i); - - json_string_set(json_name, name_new); - } - - json_array_append_new(json_signals, json_signal_copy); - } - json_object_set_new(json, "signals", json_signals); - } - - ret = signals->parse(json_signals); - if (ret) - throw ConfigError(json_signals, "node-config-node-signals", - "Failed to parse signal definition"); - } else if (json_is_string(json_signals)) { - const char *dt = json_string_value(json_signals); - - signals = std::make_shared<SignalList>(dt); - if (!signals) - return -1; } else { signals = std::make_shared<SignalList>(DEFAULT_SAMPLE_LENGTH, SignalType::FLOAT); diff --git a/lib/nodes/CMakeLists.txt b/lib/nodes/CMakeLists.txt index 068ad9fba..efb6d7c02 100644 --- a/lib/nodes/CMakeLists.txt +++ b/lib/nodes/CMakeLists.txt @@ -123,7 +123,7 @@ endif() # Enable Kafka support if(WITH_NODE_KAFKA) list(APPEND NODE_SRC kafka.cpp) - list(APPEND LIBRARIES PkgConfig::RDKAFKA) + list(APPEND LIBRARIES PkgConfig::RDKAFKAPP) endif() # Enable Comedi support diff --git a/lib/nodes/api.cpp b/lib/nodes/api.cpp index 89b9dfe37..4bb1f9af6 100644 --- a/lib/nodes/api.cpp +++ b/lib/nodes/api.cpp @@ -88,7 +88,7 @@ int APINode::_write(struct Sample *smps[],
unsigned cnt) { } int APINode::parse(json_t *json) { - int ret = Node::parse(json); + int ret = Node::parseCommon(json); if (ret) return ret; diff --git a/lib/nodes/exec.cpp b/lib/nodes/exec.cpp index 6fe0cbde8..b94134e70 100644 --- a/lib/nodes/exec.cpp +++ b/lib/nodes/exec.cpp @@ -28,7 +28,7 @@ ExecNode::~ExecNode() { } int ExecNode::parse(json_t *json) { - int ret = Node::parse(json); + int ret = Node::parseCommon(json); if (ret) return ret; diff --git a/lib/nodes/fpga.cpp b/lib/nodes/fpga.cpp index 45f1e3f24..dcf3b1bb7 100644 --- a/lib/nodes/fpga.cpp +++ b/lib/nodes/fpga.cpp @@ -112,7 +112,7 @@ int FpgaNode::prepare() { int FpgaNode::stop() { return Node::stop(); } int FpgaNode::parse(json_t *json) { - int ret = Node::parse(json); + int ret = Node::parseCommon(json); if (ret) { return ret; } diff --git a/lib/nodes/iec60870.cpp b/lib/nodes/iec60870.cpp index e28f1d952..5b73647f7 100644 --- a/lib/nodes/iec60870.cpp +++ b/lib/nodes/iec60870.cpp @@ -677,7 +677,7 @@ SlaveNode::SlaveNode(const uuid_t &id, const std::string &name) SlaveNode::~SlaveNode() { destroySlave(); } int SlaveNode::parse(json_t *json) { - int ret = Node::parse(json); + int ret = Node::parseCommon(json); if (ret) return ret; diff --git a/lib/nodes/iec61850_goose.cpp b/lib/nodes/iec61850_goose.cpp index 45003a1c3..35d791a52 100644 --- a/lib/nodes/iec61850_goose.cpp +++ b/lib/nodes/iec61850_goose.cpp @@ -622,16 +622,15 @@ GooseNode::~GooseNode() { } int GooseNode::parse(json_t *json) { - int ret; - json_error_t err; - - ret = Node::parse(json); + int ret = Node::parseCommon(json); if (ret) return ret; json_t *json_keys = nullptr; json_t *json_in = nullptr; json_t *json_out = nullptr; + + json_error_t err; ret = json_unpack_ex(json, &err, 0, // "{ s:?o, s:?o, s:?o }", // "keys", &json_keys, // diff --git a/lib/nodes/kafka.cpp b/lib/nodes/kafka.cpp index 1c56e1e5f..bce209b8a 100644 --- a/lib/nodes/kafka.cpp +++ b/lib/nodes/kafka.cpp @@ -1,600 +1,449 @@ /* Node type: kafka. 
* * Author: Juan Pablo Noreña + * Author: Steffen Vogel * SPDX-FileCopyrightText: 2021 Universidad Nacional de Colombia + * SPDX-FileCopyrightText: 2025 OPAL-RT Germany GmbH * SPDX-License-Identifier: Apache-2.0 */ -#include +#include #include -#include +#include #include +#include +#include #include -#include +#include +#include #include using namespace villas; -using namespace villas::node; using namespace villas::utils; +using namespace villas::node; -// Each process has a list of clients for which a thread invokes the kafka loop -static struct List clients; -static pthread_t thread; static Logger logger; -static void kafka_logger_cb(const rd_kafka_t *rk, int level, const char *fac, - const char *buf) { - - switch (level) { - case LOG_EMERG: - case LOG_CRIT: - case LOG_ERR: - logger->error("{}: {}", fac, buf); - break; - - case LOG_ALERT: - case LOG_WARNING: - logger->warn("{}: {}", fac, buf); - break; - - case LOG_DEBUG: - logger->debug("{}: {}", fac, buf); - break; - - case LOG_NOTICE: - case LOG_INFO: - default: - logger->info("{}: {}", fac, buf); - break; - } -} - -static void kafka_message_cb(void *ctx, const rd_kafka_message_t *msg) { - int ret; - auto *n = (NodeCompat *)ctx; - auto *k = n->getData(); - struct Sample *smps[n->in.vectorize]; - - n->logger->debug("Received a message of {} bytes from broker {}", msg->len, - k->server); - - ret = sample_alloc_many(&k->pool, smps, n->in.vectorize); - if (ret <= 0) { - n->logger->warn("Pool underrun in consumer"); - return; - } - - ret = k->formatter->sscan((char *)msg->payload, msg->len, nullptr, smps, - n->in.vectorize); - if (ret < 0) { - n->logger->warn("Received an invalid message"); - n->logger->warn(" Payload: {}", (char *)msg->payload); - return; - } - - if (ret == 0) { - n->logger->debug("Skip empty message"); - sample_decref_many(smps, n->in.vectorize); - return; - } - - ret = queue_signalled_push_many(&k->queue, (void **)smps, n->in.vectorize); - if (ret < (int)n->in.vectorize) - n->logger->warn("Failed to enqueue samples"); -} - -static void *kafka_loop_thread(void *ctx) { - int ret; - - // Set the cancel type of this thread to async - ret = pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, nullptr); - if (ret != 0) - throw RuntimeError("Unable to set cancel type of Kafka communication " - "thread to asynchronous."); - - while (true) { - for (unsigned i = 0; i < list_length(&clients); i++) { - auto *n = (NodeCompat *)list_at(&clients, i); - auto *k = n->getData(); - - // Execute kafka loop for this client - if (k->consumer.client) { - rd_kafka_message_t *msg = - rd_kafka_consumer_poll(k->consumer.client, k->timeout * 1000); - if (msg) { - kafka_message_cb((void *)n, msg); - rd_kafka_message_destroy(msg); - } - } +class KafkaNode : public Node, public RdKafka::EventCb { + +protected: + // Settings. + std::chrono::milliseconds timeout; + std::string server; // Hostname/IP:Port address of the bootstrap server. + std::string protocol; // Security protocol. + std::string produce; // Producer topic. + std::string consume; // Consumer topic. + std::string client_id; // Client ID. + std::string group_id; // Group ID. + std::string ssl_ca; // SSL CA file. + + struct { + std::unique_ptr<RdKafka::Producer> client; + std::unique_ptr<RdKafka::Topic> topic; + } producer; + + struct { + std::unique_ptr<RdKafka::Consumer> client; + std::unique_ptr<RdKafka::Topic> topic; + std::unique_ptr<RdKafka::Queue> queue; + int eventFd; + } consumer; + + struct { + std::string mechanisms; // SASL mechanisms. + std::string username; // SASL username. + std::string password; // SASL password. + } sasl; + + std::unique_ptr<Format> formatter; + + int _read(struct Sample *smps[], unsigned cnt) override { + assert(consumer.client != nullptr); + + // The message returned by consume() is owned by the caller and must be + // freed; timeouts and broker errors are delivered as messages carrying + // a non-zero error code. + auto msg = std::unique_ptr<RdKafka::Message>( + consumer.client->consume(consumer.queue.get(), timeout.count())); + if (msg->err() != RdKafka::ErrorCode::ERR_NO_ERROR) + return 0; + + auto ret = formatter->sscan((char *)msg->payload(), msg->len(), nullptr, + smps, cnt); + if (ret < 0) { + logger->warn("Received an invalid message"); + logger->warn(" Payload: {}", (char *)msg->payload()); + return -1; } - - return nullptr; -} - -int villas::node::kafka_reverse(NodeCompat *n) { - auto *k = n->getData(); - - SWAP(k->produce, k->consume); - - return 0; -} - -int villas::node::kafka_init(NodeCompat *n) { - auto *k = n->getData(); - - // Default values - k->server = nullptr; - k->protocol = nullptr; - k->produce = nullptr; - k->consume = nullptr; - k->client_id = nullptr; - k->timeout = 1.0; - - k->consumer.client = nullptr; - k->consumer.group_id = nullptr; - k->producer.client = nullptr; - k->producer.topic = nullptr; - - k->sasl.mechanisms = nullptr; - k->sasl.username = nullptr; - k->sasl.password = nullptr; - - k->ssl.ca = nullptr; - - k->formatter = nullptr; - - return 0; -} - -int villas::node::kafka_parse(NodeCompat *n, json_t *json) { - int ret; - auto *k = n->getData(); - - const char *server; - const char *produce = nullptr; - const char *consume = nullptr; - const char *protocol; - const char *client_id = "villas-node"; - const char *group_id = nullptr; - - json_error_t err; - json_t *json_ssl = nullptr; - json_t *json_sasl = nullptr; - json_t *json_format = nullptr; - - ret = json_unpack_ex(json, &err, 0, - "{ s?: { s?: s }, s?: { s?: s, s?: s }, s?: o, s: s, " - "s?: F, s: s, s?: s, s?: o, s?: o }", - "out", "produce", &produce, "in", "consume", &consume, - "group_id", &group_id, "format", &json_format, "server", - &server, "timeout", &k->timeout, "protocol", &protocol, - "client_id", &client_id, "ssl", &json_ssl, "sasl", - &json_sasl); - if (ret) - throw ConfigError(json, err, "node-config-node-kafka"); - - k->server = strdup(server); - k->produce = produce ? strdup(produce) : nullptr; - k->consume = consume ? strdup(consume) : nullptr; - k->protocol = strdup(protocol); - k->client_id = strdup(client_id); - k->consumer.group_id = group_id ? strdup(group_id) : nullptr; - - if (strcmp(protocol, "SSL") && strcmp(protocol, "PLAINTEXT") && - strcmp(protocol, "SASL_SSL") && strcmp(protocol, "SASL_PLAINTEXT")) - throw ConfigError(json, "node-config-node-kafka-protocol", - "Invalid security protocol: {}", protocol); - - if (!k->produce && !k->consume) - throw ConfigError(json, "node-config-node-kafka", - "At least one topic has to be specified for node {}", - n->getName()); - - if (json_ssl) { - const char *ca; - - ret = json_unpack_ex(json_ssl, &err, 0, "{ s: s }", "ca", &ca); - if (ret) - throw ConfigError(json_ssl, err, "node-config-node-kafka-ssl", - "Failed to parse SSL configuration of node {}", - n->getName()); - - k->ssl.ca = strdup(ca); - } - - if (json_sasl) { - const char *mechanisms; - const char *username; - const char *password; - - ret = json_unpack_ex(json_sasl, &err, 0, "{ s: s, s: s, s: s }", - "mechanisms", &mechanisms, "username", &username, - "password", &password); - if (ret) - throw ConfigError(json_sasl, err, "node-config-node-kafka-sasl", - "Failed to parse SASL configuration"); - - k->sasl.mechanisms = strdup(mechanisms); - k->sasl.username = strdup(username); - k->sasl.password = strdup(password); - } - - // Format - if (k->formatter) - delete k->formatter; - k->formatter = json_format ? FormatFactory::make(json_format) - : FormatFactory::make("villas.binary"); - if (!k->formatter) - throw ConfigError(json_format, "node-config-node-kafka-format", - "Invalid format configuration"); - - return 0; -} -int villas::node::kafka_prepare(NodeCompat *n) { - int ret; - auto *k = n->getData(); - - k->formatter->start(n->getInputSignals(false), ~(int)SampleFlags::HAS_OFFSET); - - ret = pool_init(&k->pool, 1024, - SAMPLE_LENGTH(n->getInputSignals(false)->size())); - if (ret) + return ret; + } - ret = queue_signalled_init(&k->queue, 1024); - if (ret) - return ret; - - return 0; -} - -char *villas::node::kafka_print(NodeCompat *n) { - auto *k = n->getData(); - - char *buf = nullptr; + int _write(struct Sample *smps[], unsigned cnt) override { + assert(producer.client != nullptr); - strcatf(&buf, "bootstrap.server=%s, client.id=%s, security.protocol=%s", - k->server, k->client_id, k->protocol); + size_t wbytes; - // Only show if not default - if (k->produce) - strcatf(&buf, ", out.produce=%s", k->produce); + char data[DEFAULT_FORMAT_BUFFER_LENGTH]; - if (k->consume) - strcatf(&buf, ", in.consume=%s", k->consume); + auto ret = formatter->sprint(data, sizeof(data), &wbytes, smps, cnt); + if (ret < 0) + return ret; - return buf; -} + if (!produce.empty()) { + auto ret = producer.client->produce( + producer.topic.get(), RdKafka::Topic::PARTITION_UA, + RdKafka::Producer::RK_MSG_COPY, data, wbytes, NULL, 0, NULL); + if (ret != RdKafka::ErrorCode::ERR_NO_ERROR) { + logger->warn("Publish failed"); + return -abs(ret); + } + } else + logger->warn( + "No produce possible because no produce topic is configured"); -int villas::node::kafka_destroy(NodeCompat *n) { - int ret; - auto *k = n->getData(); + return cnt; + } - if (k->producer.client) - rd_kafka_destroy(k->producer.client); + int startProducer() { + std::string errstr; - if (k->consumer.client) - rd_kafka_destroy(k->consumer.client); + auto conf_prod = createCommonConf(); + if (!conf_prod) + throw MemoryAllocationError(); - if (k->formatter) - delete k->formatter; + producer.client = std::unique_ptr<RdKafka::Producer>( + RdKafka::Producer::create(conf_prod.get(), errstr)); + if (!producer.client) + throw RuntimeError("{}", errstr); - ret = pool_destroy(&k->pool); - if (ret) - return ret; + auto topic_conf = std::unique_ptr<RdKafka::Conf>( + RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC)); + if (!topic_conf) + throw MemoryAllocationError(); - ret = queue_signalled_destroy(&k->queue); - if (ret) - return ret; + auto cr = topic_conf->set("acks", "all", errstr); + if (cr != RdKafka::Conf::CONF_OK) + throw RuntimeError("{}", errstr); - if (k->produce) - free(k->produce); + producer.topic = std::unique_ptr<RdKafka::Topic>(RdKafka::Topic::create( + producer.client.get(), produce, topic_conf.get(), errstr)); + if (!producer.topic) + throw MemoryAllocationError(); - if (k->consume) - free(k->consume); + logger->info("Connected producer to bootstrap server {}", server); - if (k->protocol) - free(k->protocol); + return 0; + } - if (k->client_id) - free(k->client_id); + int startConsumer() { + std::string errstr; - free(k->server); + auto conf_cons = createCommonConf(); + if (!conf_cons) + throw MemoryAllocationError(); - return 0; -} + auto cr = conf_cons->set("group.id", group_id, errstr); + if (cr != RdKafka::Conf::CONF_OK) + throw RuntimeError("{}", errstr); -int villas::node::kafka_start(NodeCompat *n) { - int ret; - char errstr[1024]; - auto *k = n->getData(); + consumer.client = std::unique_ptr<RdKafka::Consumer>( + RdKafka::Consumer::create(conf_cons.get(), errstr)); + if (!consumer.client) + throw MemoryAllocationError(); - rd_kafka_conf_t *rdkconf = rd_kafka_conf_new(); - if (!rdkconf) - throw MemoryAllocationError(); + consumer.topic = std::unique_ptr<RdKafka::Topic>(RdKafka::Topic::create( + consumer.client.get(), consume, nullptr, errstr)); + if (!consumer.topic) + throw MemoryAllocationError(); - rd_kafka_conf_set_log_cb(rdkconf, kafka_logger_cb); + consumer.queue = std::unique_ptr<RdKafka::Queue>( + RdKafka::Queue::create(consumer.client.get())); + if (!consumer.queue) + throw MemoryAllocationError(); - ret = rd_kafka_conf_set(rdkconf, "client.id", k->client_id, errstr, - sizeof(errstr)); - if (ret != RD_KAFKA_CONF_OK) - goto kafka_config_error; + consumer.eventFd = eventfd(0, 0); - ret = rd_kafka_conf_set(rdkconf, "bootstrap.servers", k->server, errstr, - sizeof(errstr)); - if (ret != RD_KAFKA_CONF_OK) - goto kafka_config_error; + uint64_t incr = 1; + consumer.queue->io_event_enable(consumer.eventFd, &incr, sizeof(incr)); - ret = rd_kafka_conf_set(rdkconf, "security.protocol", k->protocol, errstr, - sizeof(errstr)); - if (ret != RD_KAFKA_CONF_OK) - goto kafka_config_error; + auto ec = consumer.client->start(consumer.topic.get(), 0, 0, + consumer.queue.get()); + if (ec != RdKafka::ErrorCode::ERR_NO_ERROR) + throw RuntimeError("Error subscribing to {} at {}: {}", consume, server, + RdKafka::err2str(ec)); - if (!strcmp(k->protocol, "SASL_SSL") || !strcmp(k->protocol, "SSL")) { - ret = rd_kafka_conf_set(rdkconf, "ssl.ca.location", k->ssl.ca, errstr, - sizeof(errstr)); - if (ret != RD_KAFKA_CONF_OK) - goto kafka_config_error; - } + logger->info("Subscribed consumer from bootstrap server {}", server); - if (!strcmp(k->protocol, "SASL_PLAINTEXT") || - !strcmp(k->protocol, "SASL_SSL")) { - ret = rd_kafka_conf_set(rdkconf, "sasl.mechanisms", k->sasl.mechanisms, - errstr, sizeof(errstr)); - if (ret != RD_KAFKA_CONF_OK) - goto kafka_config_error; - - ret = rd_kafka_conf_set(rdkconf, "sasl.username", k->sasl.username, errstr, - sizeof(errstr)); - if (ret != RD_KAFKA_CONF_OK) - goto kafka_config_error; - - ret = rd_kafka_conf_set(rdkconf, "sasl.password", k->sasl.password, errstr, - sizeof(errstr)); - if (ret != RD_KAFKA_CONF_OK) - goto kafka_config_error; + return 0; } - if (k->produce) { - // rd_kafka_new() will take ownership and free the passed rd_kafka_conf_t object, - // so we will need to create a copy first - rd_kafka_conf_t *rdkconf_prod = rd_kafka_conf_dup(rdkconf); - if (!rdkconf_prod) - throw MemoryAllocationError(); - - k->producer.client = - rd_kafka_new(RD_KAFKA_PRODUCER, rdkconf_prod, errstr, sizeof(errstr)); - if (!k->producer.client) - goto kafka_config_error; + std::unique_ptr<RdKafka::Conf> createCommonConf() { + std::string errstr; - rd_kafka_topic_conf_t *topic_conf = rd_kafka_topic_conf_new(); - if (!topic_conf) + auto conf = std::unique_ptr<RdKafka::Conf>( + RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL)); + if (!conf) throw MemoryAllocationError(); - ret = rd_kafka_topic_conf_set(topic_conf, "acks", "all", errstr, - sizeof(errstr)); - if (ret != RD_KAFKA_CONF_OK) - goto kafka_config_error; - - k->producer.topic = - rd_kafka_topic_new(k->producer.client, k->produce, topic_conf); - if (!k->producer.topic) - throw MemoryAllocationError(); + auto ret = conf->set("event_cb", this, errstr); + if (ret != RdKafka::Conf::CONF_OK) + throw RuntimeError("{}", errstr); - n->logger->info("Connected producer to bootstrap server {}", k->server); - } + ret = conf->set("client.id", client_id, errstr); + if (ret != RdKafka::Conf::CONF_OK) + throw RuntimeError("{}", errstr); - if (k->consume) { - // rd_kafka_new() will take ownership and free the passed rd_kafka_conf_t object, - // so we will need to create a copy first - rd_kafka_conf_t *rdkconf_cons = rd_kafka_conf_dup(rdkconf); - if (!rdkconf_cons) - throw MemoryAllocationError(); + ret = conf->set("bootstrap.servers", server, errstr); + if (ret != RdKafka::Conf::CONF_OK) + throw RuntimeError("{}", errstr); - rd_kafka_topic_partition_list_t *partitions = - rd_kafka_topic_partition_list_new(1); - if (!partitions) - throw MemoryAllocationError(); + ret = conf->set("security.protocol", protocol, errstr); + if (ret != RdKafka::Conf::CONF_OK) + throw RuntimeError("{}", errstr); - rd_kafka_topic_partition_t *partition = - rd_kafka_topic_partition_list_add(partitions, k->consume, 0); - if (!partition) - throw RuntimeError("Failed to add new partition"); + if (protocol == "SASL_SSL" || protocol == "SSL") { + ret = conf->set("ssl.ca.location", ssl_ca, errstr); + if (ret != RdKafka::Conf::CONF_OK) + throw RuntimeError("{}", errstr); + } - ret = rd_kafka_conf_set(rdkconf_cons, "group.id", k->consumer.group_id, - errstr, sizeof(errstr)); - if (ret != RD_KAFKA_CONF_OK) - goto kafka_config_error; + if (protocol == "SASL_PLAINTEXT" || protocol == "SASL_SSL") { + ret = conf->set("sasl.mechanisms", sasl.mechanisms, errstr); + if (ret != RdKafka::Conf::CONF_OK) + throw RuntimeError("{}", errstr); - k->consumer.client = - rd_kafka_new(RD_KAFKA_CONSUMER, rdkconf_cons, errstr, sizeof(errstr)); - if (!k->consumer.client) - throw MemoryAllocationError(); + ret = conf->set("sasl.username", sasl.username, errstr); + if (ret != RdKafka::Conf::CONF_OK) + throw RuntimeError("{}", errstr); - ret = rd_kafka_subscribe(k->consumer.client, partitions); - if (ret != RD_KAFKA_RESP_ERR_NO_ERROR) - throw RuntimeError("Error subscribing to {} at {}: {}", k->consume, - k->server, rd_kafka_err2str((rd_kafka_resp_err_t)ret)); + ret = conf->set("sasl.password", sasl.password, errstr); + if (ret != RdKafka::Conf::CONF_OK) + throw RuntimeError("{}", errstr); + } - n->logger->info("Subscribed consumer from bootstrap server {}", k->server); + return conf; } - // Add client to global list of kafka clients - // so that thread can call kafka loop for this client - list_push(&clients, n); - - rd_kafka_conf_destroy(rdkconf); +public: + KafkaNode(const uuid_t &id = {}, const std::string &name = "") + : Node(id, name), timeout(1000), client_id("villas-node"), producer({}), + consumer({.eventFd = -1}) {} - return 0; + virtual ~KafkaNode() {} -kafka_config_error: - rd_kafka_conf_destroy(rdkconf); + int prepare() override { + formatter->start(getInputSignals(false), ~(int)SampleFlags::HAS_OFFSET); - throw RuntimeError("{}", errstr); - - return -1; -} + return Node::prepare(); + } -int villas::node::kafka_stop(NodeCompat *n) { - int ret; - auto *k = n->getData(); + int parse(json_t *json) override { + int ret = Node::parseCommon(json); + if (ret) + return ret; + + const char *svr; + const char *prod = nullptr; + const char *cons = nullptr; + const char *proto; + const char *cid = nullptr; + const char *gid = nullptr; + double to = -1; + + json_t *json_ssl = nullptr; + json_t *json_sasl = nullptr; + json_t *json_format = nullptr; + + json_error_t err; + ret = json_unpack_ex(json, &err, 0, + "{ s?: { s?: s }, s?: { s?: s, s?: s }, s?: o, s: s, " + "s?: F, s: s, s?: s, s?: o, s?: o }", + "out", "topic", &prod, "in", "topic", &cons, + "group_id", &gid, "format", &json_format, "server", + &svr, "timeout", &to, "protocol", &proto, "client_id", + &cid, "ssl", &json_ssl, "sasl", &json_sasl); + if (ret) + throw ConfigError(json, err, "node-config-node-kafka"); - if (k->producer.client) { - ret = rd_kafka_flush(k->producer.client, k->timeout * 1000); - if (ret != RD_KAFKA_RESP_ERR_NO_ERROR) - n->logger->error("Failed to flush messages: {}", - rd_kafka_err2str((rd_kafka_resp_err_t)ret)); + server = svr; + protocol = proto; - /* If the output queue is still not empty there is an issue - * with producing messages to the clusters. */ - if (rd_kafka_outq_len(k->producer.client) > 0) - n->logger->warn("{} message(s) were not delivered", - rd_kafka_outq_len(k->producer.client)); - } + if (prod) + produce = prod; - // Unregister client from global kafka client list - // so that kafka loop is no longer invoked for this client - // important to do that before disconnecting from broker, otherwise, kafka thread will attempt to reconnect - list_remove_all(&clients, n); + if (cons) + consume = cons; - ret = queue_signalled_close(&k->queue); - if (ret) - return ret; + if (cid) + client_id = cid; - return 0; -} + if (gid) + group_id = gid; -int villas::node::kafka_type_start(villas::node::SuperNode *sn) { - int ret; + if (to >= 0) + timeout = std::chrono::milliseconds((int)(to * 1000)); - logger = Log::get("node:kafka"); + if (protocol != "SSL" && protocol != "PLAINTEXT" && + protocol != "SASL_SSL" && protocol != "SASL_PLAINTEXT") + throw ConfigError(json, "node-config-node-kafka-protocol", + "Invalid security protocol: {}", protocol); - ret = list_init(&clients); - if (ret) - goto kafka_error; + if (produce.empty() && consume.empty()) + throw ConfigError(json, "node-config-node-kafka", + "At least one topic has to be specified for node {}", + getName()); - // Start thread here to run kafka loop for registered clients - ret = pthread_create(&thread, nullptr, kafka_loop_thread, nullptr); - if (ret) - goto kafka_error; + if (json_ssl) { + const char *ca = nullptr; - return 0; + ret = json_unpack_ex(json_ssl, &err, 0, "{ s?: s }", "ca", &ca); + if (ret) + throw ConfigError(json_ssl, err, "node-config-node-kafka-ssl", + "Failed to parse SSL configuration of node {}", + getName()); -kafka_error: - logger->warn("Error initializing node type kafka"); + if (ca) + ssl_ca = ca; + } - return ret; -} + if (json_sasl) { + const char *mechanisms; + const char *username; + const char *password; + + ret = json_unpack_ex(json_sasl, &err, 0, "{ s: s, s: s, s: s }", + "mechanisms", &mechanisms, "username", &username, + "password", &password); + if (ret) + throw ConfigError(json_sasl, err, "node-config-node-kafka-sasl", + "Failed to parse SASL configuration"); + + sasl.mechanisms = mechanisms; + sasl.username = username; + sasl.password = password; + } -int villas::node::kafka_type_stop() { - int ret; + // Format + formatter = std::unique_ptr<Format>( + json_format ? FormatFactory::make(json_format) + : FormatFactory::make("villas.binary")); + if (!formatter) + throw ConfigError(json_format, "node-config-node-kafka-format", + "Invalid format configuration"); - // Stop thread here that executes kafka loop - ret = pthread_cancel(thread); - if (ret) - return ret; + return 0; + } - logger->debug( - "Called pthread_cancel() on kafka communication management thread."); + const std::string &getDetails() override { + details = fmt::format("server={}, client_id={}, protocol={}", server, + client_id, protocol); - ret = pthread_join(thread, nullptr); - if (ret) - goto kafka_error; + if (!produce.empty()) + details += fmt::format(", produce={}", produce); - // When this is called the list of clients should be empty - if (list_length(&clients) > 0) - throw RuntimeError( - "List of kafka clients contains elements at time of destruction. Call " - "node_stop for each kafka node before stopping node type!"); + if (!consume.empty()) + details += fmt::format(", consume={}", consume); - ret = list_destroy(&clients, nullptr, false); - if (ret) - goto kafka_error; + return details; + } - return 0; + int start() override { + if (!produce.empty()) { + auto ret = startProducer(); + if (ret) + return ret; + } -kafka_error: - logger->warn("Error stopping node type kafka"); + if (!consume.empty()) { + auto ret = startConsumer(); + if (ret) + return ret; + } - return ret; -} + int ret = Node::start(); + if (!ret) + state = State::STARTED; -int villas::node::kafka_read(NodeCompat *n, struct Sample *const smps[], - unsigned cnt) { - int pulled; - auto *k = n->getData(); - struct Sample *smpt[cnt]; + return ret; + } - pulled = queue_signalled_pull_many(&k->queue, (void **)smpt, cnt); + int stop() override { + int ret = Node::stop(); + if (ret) + return ret; + + if (producer.client) { + auto ret = producer.client->flush(timeout.count()); + if (ret != RdKafka::ErrorCode::ERR_NO_ERROR) + logger->error("Failed to flush messages: {}", RdKafka::err2str(ret)); + + // If the output queue is still not empty there is an issue + // with producing messages to the clusters.
+ if (producer.client->outq_len() > 0) + logger->warn("{} message(s) were not delivered", + producer.client->outq_len()); + } - sample_copy_many(smps, smpt, pulled); - sample_decref_many(smpt, pulled); + return 0; + } - return pulled; -} + int reverse() override { + SWAP(produce, consume); -int villas::node::kafka_write(NodeCompat *n, struct Sample *const smps[], - unsigned cnt) { - int ret; - auto *k = n->getData(); + return 0; + } - size_t wbytes; + std::vector getPollFDs() override { + if (consumer.eventFd >= 0) + return {consumer.eventFd}; - char data[DEFAULT_FORMAT_BUFFER_LENGTH]; + return {}; + } - ret = k->formatter->sprint(data, sizeof(data), &wbytes, smps, cnt); - if (ret < 0) - return ret; + void event_cb(RdKafka::Event &event) override { + switch (event.type()) { + case RdKafka::Event::EVENT_ERROR: + logger->error("Kafka error: {}", event.str()); + break; + + case RdKafka::Event::EVENT_STATS: + logger->info("Kafka stats: {}", event.str()); + break; + + case RdKafka::Event::EVENT_LOG: + switch (event.severity()) { + case RdKafka::Event::EVENT_SEVERITY_DEBUG: + logger->debug("{}", event.str()); + break; + + case RdKafka::Event::EVENT_SEVERITY_NOTICE: + case RdKafka::Event::EVENT_SEVERITY_INFO: + logger->info("{}", event.str()); + break; + + case RdKafka::Event::EVENT_SEVERITY_ALERT: + case RdKafka::Event::EVENT_SEVERITY_WARNING: + logger->warn("{}", event.str()); + break; + + case RdKafka::Event::EVENT_SEVERITY_ERROR: + case RdKafka::Event::EVENT_SEVERITY_CRITICAL: + case RdKafka::Event::EVENT_SEVERITY_EMERG: + logger->error("{}", event.str()); + break; + } - if (k->produce) { - ret = rd_kafka_produce(k->producer.topic, RD_KAFKA_PARTITION_UA, - RD_KAFKA_MSG_F_COPY, data, wbytes, NULL, 0, NULL); + break; - if (ret != RD_KAFKA_RESP_ERR_NO_ERROR) { - n->logger->warn("Publish failed"); - return -abs(ret); + default: + logger->info("Kafka event {}: {}", (int)event.type(), event.str()); + break; } - } else - n->logger->warn( - "No produce possible because no produce topic is configured"); - - return cnt; -} - -int villas::node::kafka_poll_fds(NodeCompat *n, int fds[]) { - auto *k = n->getData(); - - fds[0] = queue_signalled_fd(&k->queue); - - return 1; -} - -static NodeCompatType p; - -__attribute__((constructor(110))) static void register_plugin() { - p.name = "kafka"; - p.description = "Kafka event message streaming (rdkafka)"; - p.vectorize = 0; - p.size = sizeof(struct kafka); - p.type.start = kafka_type_start; - p.type.stop = kafka_type_stop; - p.destroy = kafka_destroy; - p.prepare = kafka_prepare; - p.parse = kafka_parse; - p.prepare = kafka_prepare; - p.print = kafka_print; - p.init = kafka_init; - p.destroy = kafka_destroy; - p.start = kafka_start; - p.stop = kafka_stop; - p.read = kafka_read; - p.write = kafka_write; - p.reverse = kafka_reverse; - p.poll_fds = kafka_poll_fds; - - static NodeCompatFactory ncp(&p); -} + } +}; + +// Register node +static char n[] = "kafka"; +static char d[] = "Kafka event message streaming (rdkafka)"; +static NodePlugin + p; diff --git a/lib/nodes/kafka_old.cpp b/lib/nodes/kafka_old.cpp new file mode 100644 index 000000000..1c56e1e5f --- /dev/null +++ b/lib/nodes/kafka_old.cpp @@ -0,0 +1,600 @@ +/* Node type: kafka. 
+ * + * Author: Juan Pablo Noreña + * SPDX-FileCopyrightText: 2021 Universidad Nacional de Colombia + * SPDX-License-Identifier: Apache-2.0 + */ + +#include + +#include +#include + +#include +#include +#include +#include + +using namespace villas; +using namespace villas::node; +using namespace villas::utils; + +// Each process has a list of clients for which a thread invokes the kafka loop +static struct List clients; +static pthread_t thread; +static Logger logger; + +static void kafka_logger_cb(const rd_kafka_t *rk, int level, const char *fac, + const char *buf) { + + switch (level) { + case LOG_EMERG: + case LOG_CRIT: + case LOG_ERR: + logger->error("{}: {}", fac, buf); + break; + + case LOG_ALERT: + case LOG_WARNING: + logger->warn("{}: {}", fac, buf); + break; + + case LOG_DEBUG: + logger->debug("{}: {}", fac, buf); + break; + + case LOG_NOTICE: + case LOG_INFO: + default: + logger->info("{}: {}", fac, buf); + break; + } +} + +static void kafka_message_cb(void *ctx, const rd_kafka_message_t *msg) { + int ret; + auto *n = (NodeCompat *)ctx; + auto *k = n->getData(); + struct Sample *smps[n->in.vectorize]; + + n->logger->debug("Received a message of {} bytes from broker {}", msg->len, + k->server); + + ret = sample_alloc_many(&k->pool, smps, n->in.vectorize); + if (ret <= 0) { + n->logger->warn("Pool underrun in consumer"); + return; + } + + ret = k->formatter->sscan((char *)msg->payload, msg->len, nullptr, smps, + n->in.vectorize); + if (ret < 0) { + n->logger->warn("Received an invalid message"); + n->logger->warn(" Payload: {}", (char *)msg->payload); + return; + } + + if (ret == 0) { + n->logger->debug("Skip empty message"); + sample_decref_many(smps, n->in.vectorize); + return; + } + + ret = queue_signalled_push_many(&k->queue, (void **)smps, n->in.vectorize); + if (ret < (int)n->in.vectorize) + n->logger->warn("Failed to enqueue samples"); +} + +static void *kafka_loop_thread(void *ctx) { + int ret; + + // Set the cancel type of this thread to async + ret = pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, nullptr); + if (ret != 0) + throw RuntimeError("Unable to set cancel type of Kafka communication " + "thread to asynchronous."); + + while (true) { + for (unsigned i = 0; i < list_length(&clients); i++) { + auto *n = (NodeCompat *)list_at(&clients, i); + auto *k = n->getData(); + + // Execute kafka loop for this client + if (k->consumer.client) { + rd_kafka_message_t *msg = + rd_kafka_consumer_poll(k->consumer.client, k->timeout * 1000); + if (msg) { + kafka_message_cb((void *)n, msg); + rd_kafka_message_destroy(msg); + } + } + } + } + + return nullptr; +} + +int villas::node::kafka_reverse(NodeCompat *n) { + auto *k = n->getData(); + + SWAP(k->produce, k->consume); + + return 0; +} + +int villas::node::kafka_init(NodeCompat *n) { + auto *k = n->getData(); + + // Default values + k->server = nullptr; + k->protocol = nullptr; + k->produce = nullptr; + k->consume = nullptr; + k->client_id = nullptr; + k->timeout = 1.0; + + k->consumer.client = nullptr; + k->consumer.group_id = nullptr; + k->producer.client = nullptr; + k->producer.topic = nullptr; + + k->sasl.mechanisms = nullptr; + k->sasl.username = nullptr; + k->sasl.password = nullptr; + + k->ssl.ca = nullptr; + + k->formatter = nullptr; + + return 0; +} + +int villas::node::kafka_parse(NodeCompat *n, json_t *json) { + int ret; + auto *k = n->getData(); + + const char *server; + const char *produce = nullptr; + const char *consume = nullptr; + const char *protocol; + const char *client_id = "villas-node"; + const char 
*group_id = nullptr; + + json_error_t err; + json_t *json_ssl = nullptr; + json_t *json_sasl = nullptr; + json_t *json_format = nullptr; + + ret = json_unpack_ex(json, &err, 0, + "{ s?: { s?: s }, s?: { s?: s, s?: s }, s?: o, s: s, " + "s?: F, s: s, s?: s, s?: o, s?: o }", + "out", "produce", &produce, "in", "consume", &consume, + "group_id", &group_id, "format", &json_format, "server", + &server, "timeout", &k->timeout, "protocol", &protocol, + "client_id", &client_id, "ssl", &json_ssl, "sasl", + &json_sasl); + if (ret) + throw ConfigError(json, err, "node-config-node-kafka"); + + k->server = strdup(server); + k->produce = produce ? strdup(produce) : nullptr; + k->consume = consume ? strdup(consume) : nullptr; + k->protocol = strdup(protocol); + k->client_id = strdup(client_id); + k->consumer.group_id = group_id ? strdup(group_id) : nullptr; + + if (strcmp(protocol, "SSL") && strcmp(protocol, "PLAINTEXT") && + strcmp(protocol, "SASL_SSL") && strcmp(protocol, "SASL_PLAINTEXT")) + throw ConfigError(json, "node-config-node-kafka-protocol", + "Invalid security protocol: {}", protocol); + + if (!k->produce && !k->consume) + throw ConfigError(json, "node-config-node-kafka", + "At least one topic has to be specified for node {}", + n->getName()); + + if (json_ssl) { + const char *ca; + + ret = json_unpack_ex(json_ssl, &err, 0, "{ s: s }", "ca", &ca); + if (ret) + throw ConfigError(json_ssl, err, "node-config-node-kafka-ssl", + "Failed to parse SSL configuration of node {}", + n->getName()); + + k->ssl.ca = strdup(ca); + } + + if (json_sasl) { + const char *mechanisms; + const char *username; + const char *password; + + ret = json_unpack_ex(json_sasl, &err, 0, "{ s: s, s: s, s: s }", + "mechanisms", &mechanisms, "username", &username, + "password", &password); + if (ret) + throw ConfigError(json_sasl, err, "node-config-node-kafka-sasl", + "Failed to parse SASL configuration"); + + k->sasl.mechanisms = strdup(mechanisms); + k->sasl.username = strdup(username); + k->sasl.password = strdup(password); + } + + // Format + if (k->formatter) + delete k->formatter; + k->formatter = json_format ? 
FormatFactory::make(json_format) + : FormatFactory::make("villas.binary"); + if (!k->formatter) + throw ConfigError(json_format, "node-config-node-kafka-format", + "Invalid format configuration"); + + return 0; +} + +int villas::node::kafka_prepare(NodeCompat *n) { + int ret; + auto *k = n->getData(); + + k->formatter->start(n->getInputSignals(false), ~(int)SampleFlags::HAS_OFFSET); + + ret = pool_init(&k->pool, 1024, + SAMPLE_LENGTH(n->getInputSignals(false)->size())); + if (ret) + return ret; + + ret = queue_signalled_init(&k->queue, 1024); + if (ret) + return ret; + + return 0; +} + +char *villas::node::kafka_print(NodeCompat *n) { + auto *k = n->getData(); + + char *buf = nullptr; + + strcatf(&buf, "bootstrap.server=%s, client.id=%s, security.protocol=%s", + k->server, k->client_id, k->protocol); + + // Only show if not default + if (k->produce) + strcatf(&buf, ", out.produce=%s", k->produce); + + if (k->consume) + strcatf(&buf, ", in.consume=%s", k->consume); + + return buf; +} + +int villas::node::kafka_destroy(NodeCompat *n) { + int ret; + auto *k = n->getData(); + + if (k->producer.client) + rd_kafka_destroy(k->producer.client); + + if (k->consumer.client) + rd_kafka_destroy(k->consumer.client); + + if (k->formatter) + delete k->formatter; + + ret = pool_destroy(&k->pool); + if (ret) + return ret; + + ret = queue_signalled_destroy(&k->queue); + if (ret) + return ret; + + if (k->produce) + free(k->produce); + + if (k->consume) + free(k->consume); + + if (k->protocol) + free(k->protocol); + + if (k->client_id) + free(k->client_id); + + free(k->server); + + return 0; +} + +int villas::node::kafka_start(NodeCompat *n) { + int ret; + char errstr[1024]; + auto *k = n->getData(); + + rd_kafka_conf_t *rdkconf = rd_kafka_conf_new(); + if (!rdkconf) + throw MemoryAllocationError(); + + rd_kafka_conf_set_log_cb(rdkconf, kafka_logger_cb); + + ret = rd_kafka_conf_set(rdkconf, "client.id", k->client_id, errstr, + sizeof(errstr)); + if (ret != RD_KAFKA_CONF_OK) + goto kafka_config_error; + + ret = rd_kafka_conf_set(rdkconf, "bootstrap.servers", k->server, errstr, + sizeof(errstr)); + if (ret != RD_KAFKA_CONF_OK) + goto kafka_config_error; + + ret = rd_kafka_conf_set(rdkconf, "security.protocol", k->protocol, errstr, + sizeof(errstr)); + if (ret != RD_KAFKA_CONF_OK) + goto kafka_config_error; + + if (!strcmp(k->protocol, "SASL_SSL") || !strcmp(k->protocol, "SSL")) { + ret = rd_kafka_conf_set(rdkconf, "ssl.ca.location", k->ssl.ca, errstr, + sizeof(errstr)); + if (ret != RD_KAFKA_CONF_OK) + goto kafka_config_error; + } + + if (!strcmp(k->protocol, "SASL_PLAINTEXT") || + !strcmp(k->protocol, "SASL_SSL")) { + ret = rd_kafka_conf_set(rdkconf, "sasl.mechanisms", k->sasl.mechanisms, + errstr, sizeof(errstr)); + if (ret != RD_KAFKA_CONF_OK) + goto kafka_config_error; + + ret = rd_kafka_conf_set(rdkconf, "sasl.username", k->sasl.username, errstr, + sizeof(errstr)); + if (ret != RD_KAFKA_CONF_OK) + goto kafka_config_error; + + ret = rd_kafka_conf_set(rdkconf, "sasl.password", k->sasl.password, errstr, + sizeof(errstr)); + if (ret != RD_KAFKA_CONF_OK) + goto kafka_config_error; + } + + if (k->produce) { + // rd_kafka_new() will take ownership and free the passed rd_kafka_conf_t object, + // so we will need to create a copy first + rd_kafka_conf_t *rdkconf_prod = rd_kafka_conf_dup(rdkconf); + if (!rdkconf_prod) + throw MemoryAllocationError(); + + k->producer.client = + rd_kafka_new(RD_KAFKA_PRODUCER, rdkconf_prod, errstr, sizeof(errstr)); + if (!k->producer.client) + goto kafka_config_error; + + 
rd_kafka_topic_conf_t *topic_conf = rd_kafka_topic_conf_new(); + if (!topic_conf) + throw MemoryAllocationError(); + + ret = rd_kafka_topic_conf_set(topic_conf, "acks", "all", errstr, + sizeof(errstr)); + if (ret != RD_KAFKA_CONF_OK) + goto kafka_config_error; + + k->producer.topic = + rd_kafka_topic_new(k->producer.client, k->produce, topic_conf); + if (!k->producer.topic) + throw MemoryAllocationError(); + + n->logger->info("Connected producer to bootstrap server {}", k->server); + } + + if (k->consume) { + // rd_kafka_new() will take ownership and free the passed rd_kafka_conf_t object, + // so we will need to create a copy first + rd_kafka_conf_t *rdkconf_cons = rd_kafka_conf_dup(rdkconf); + if (!rdkconf_cons) + throw MemoryAllocationError(); + + rd_kafka_topic_partition_list_t *partitions = + rd_kafka_topic_partition_list_new(1); + if (!partitions) + throw MemoryAllocationError(); + + rd_kafka_topic_partition_t *partition = + rd_kafka_topic_partition_list_add(partitions, k->consume, 0); + if (!partition) + throw RuntimeError("Failed to add new partition"); + + ret = rd_kafka_conf_set(rdkconf_cons, "group.id", k->consumer.group_id, + errstr, sizeof(errstr)); + if (ret != RD_KAFKA_CONF_OK) + goto kafka_config_error; + + k->consumer.client = + rd_kafka_new(RD_KAFKA_CONSUMER, rdkconf_cons, errstr, sizeof(errstr)); + if (!k->consumer.client) + throw MemoryAllocationError(); + + ret = rd_kafka_subscribe(k->consumer.client, partitions); + if (ret != RD_KAFKA_RESP_ERR_NO_ERROR) + throw RuntimeError("Error subscribing to {} at {}: {}", k->consume, + k->server, rd_kafka_err2str((rd_kafka_resp_err_t)ret)); + + n->logger->info("Subscribed consumer from bootstrap server {}", k->server); + } + + // Add client to global list of kafka clients + // so that thread can call kafka loop for this client + list_push(&clients, n); + + rd_kafka_conf_destroy(rdkconf); + + return 0; + +kafka_config_error: + rd_kafka_conf_destroy(rdkconf); + + throw RuntimeError("{}", errstr); + + return -1; +} + +int villas::node::kafka_stop(NodeCompat *n) { + int ret; + auto *k = n->getData(); + + if (k->producer.client) { + ret = rd_kafka_flush(k->producer.client, k->timeout * 1000); + if (ret != RD_KAFKA_RESP_ERR_NO_ERROR) + n->logger->error("Failed to flush messages: {}", + rd_kafka_err2str((rd_kafka_resp_err_t)ret)); + + /* If the output queue is still not empty there is an issue + * with producing messages to the clusters. 
*/ + if (rd_kafka_outq_len(k->producer.client) > 0) + n->logger->warn("{} message(s) were not delivered", + rd_kafka_outq_len(k->producer.client)); + } + + // Unregister client from global kafka client list + // so that kafka loop is no longer invoked for this client + // important to do that before disconnecting from broker, otherwise, kafka thread will attempt to reconnect + list_remove_all(&clients, n); + + ret = queue_signalled_close(&k->queue); + if (ret) + return ret; + + return 0; +} + +int villas::node::kafka_type_start(villas::node::SuperNode *sn) { + int ret; + + logger = Log::get("node:kafka"); + + ret = list_init(&clients); + if (ret) + goto kafka_error; + + // Start thread here to run kafka loop for registered clients + ret = pthread_create(&thread, nullptr, kafka_loop_thread, nullptr); + if (ret) + goto kafka_error; + + return 0; + +kafka_error: + logger->warn("Error initializing node type kafka"); + + return ret; +} + +int villas::node::kafka_type_stop() { + int ret; + + // Stop thread here that executes kafka loop + ret = pthread_cancel(thread); + if (ret) + return ret; + + logger->debug( + "Called pthread_cancel() on kafka communication management thread."); + + ret = pthread_join(thread, nullptr); + if (ret) + goto kafka_error; + + // When this is called the list of clients should be empty + if (list_length(&clients) > 0) + throw RuntimeError( + "List of kafka clients contains elements at time of destruction. Call " + "node_stop for each kafka node before stopping node type!"); + + ret = list_destroy(&clients, nullptr, false); + if (ret) + goto kafka_error; + + return 0; + +kafka_error: + logger->warn("Error stopping node type kafka"); + + return ret; +} + +int villas::node::kafka_read(NodeCompat *n, struct Sample *const smps[], + unsigned cnt) { + int pulled; + auto *k = n->getData(); + struct Sample *smpt[cnt]; + + pulled = queue_signalled_pull_many(&k->queue, (void **)smpt, cnt); + + sample_copy_many(smps, smpt, pulled); + sample_decref_many(smpt, pulled); + + return pulled; +} + +int villas::node::kafka_write(NodeCompat *n, struct Sample *const smps[], + unsigned cnt) { + int ret; + auto *k = n->getData(); + + size_t wbytes; + + char data[DEFAULT_FORMAT_BUFFER_LENGTH]; + + ret = k->formatter->sprint(data, sizeof(data), &wbytes, smps, cnt); + if (ret < 0) + return ret; + + if (k->produce) { + ret = rd_kafka_produce(k->producer.topic, RD_KAFKA_PARTITION_UA, + RD_KAFKA_MSG_F_COPY, data, wbytes, NULL, 0, NULL); + + if (ret != RD_KAFKA_RESP_ERR_NO_ERROR) { + n->logger->warn("Publish failed"); + return -abs(ret); + } + } else + n->logger->warn( + "No produce possible because no produce topic is configured"); + + return cnt; +} + +int villas::node::kafka_poll_fds(NodeCompat *n, int fds[]) { + auto *k = n->getData(); + + fds[0] = queue_signalled_fd(&k->queue); + + return 1; +} + +static NodeCompatType p; + +__attribute__((constructor(110))) static void register_plugin() { + p.name = "kafka"; + p.description = "Kafka event message streaming (rdkafka)"; + p.vectorize = 0; + p.size = sizeof(struct kafka); + p.type.start = kafka_type_start; + p.type.stop = kafka_type_stop; + p.destroy = kafka_destroy; + p.prepare = kafka_prepare; + p.parse = kafka_parse; + p.prepare = kafka_prepare; + p.print = kafka_print; + p.init = kafka_init; + p.destroy = kafka_destroy; + p.start = kafka_start; + p.stop = kafka_stop; + p.read = kafka_read; + p.write = kafka_write; + p.reverse = kafka_reverse; + p.poll_fds = kafka_poll_fds; + + static NodeCompatFactory ncp(&p); +} diff --git
a/lib/nodes/loopback.cpp b/lib/nodes/loopback.cpp index 5cd411afa..b48e0f119 100644 --- a/lib/nodes/loopback.cpp +++ b/lib/nodes/loopback.cpp @@ -94,11 +94,13 @@ const std::string &LoopbackNode::getDetails() { } int LoopbackNode::parse(json_t *json) { + int ret = Node::parseCommon(json); + if (ret) + return ret; + const char *mode_str = nullptr; json_error_t err; - int ret; - ret = json_unpack_ex(json, &err, 0, "{ s?: i, s?: s }", "queuelen", &queuelen, "mode", &mode_str); if (ret) @@ -120,7 +122,7 @@ int LoopbackNode::parse(json_t *json) { "Unknown mode '{}'", mode_str); } - return Node::parse(json); + return 0; } // Register node diff --git a/lib/nodes/modbus.cpp b/lib/nodes/modbus.cpp index 90ee4a5a5..d55a5b76d 100644 --- a/lib/nodes/modbus.cpp +++ b/lib/nodes/modbus.cpp @@ -818,7 +818,8 @@ unsigned int ModbusNode::parseMappings(std::vector &mappings, } int ModbusNode::parse(json_t *json) { - if (auto ret = Node::parse(json)) + int ret = Node::parseCommon(json); + if (ret) return ret; json_error_t err; diff --git a/lib/nodes/opal_async.cpp b/lib/nodes/opal_async.cpp index f3a5dcee6..4c04e2c44 100644 --- a/lib/nodes/opal_async.cpp +++ b/lib/nodes/opal_async.cpp @@ -139,12 +139,13 @@ void LogSink::sink_it_(const spdlog::details::log_msg &msg) { } int OpalAsyncNode::parse(json_t *json) { - int ret, rply = -1, id = -1; - - ret = Node::parse(json); + int ret = Node::parseCommon(json); if (ret) return ret; + int rply = -1; + int id = -1; + json_error_t err; ret = json_unpack_ex(json, &err, 0, "{ s: i, s?: { s?: b } }", "id", &id, "in", "reply", &rply); diff --git a/lib/nodes/opal_orchestra.cpp b/lib/nodes/opal_orchestra.cpp index cba9a456e..715f916f7 100644 --- a/lib/nodes/opal_orchestra.cpp +++ b/lib/nodes/opal_orchestra.cpp @@ -269,16 +269,7 @@ class OpalOrchestraNode : public Node { // Overwrite the data definition file (DDF). bool dataDefinitionFileOverwrite; - // Overwrite the data definition file (DDF) and terminate VILLASnode. - bool dataDefinitionFileWriteOnly; - int _read(struct Sample *smps[], unsigned cnt) override { - if (dataDefinitionFileWriteOnly) { - logger->warn("Stopping node after writing the DDF file"); - setState(State::STOPPING); - return -1; - } - assert(cnt == 1); if (!domain.synchronous) { @@ -314,12 +305,6 @@ class OpalOrchestraNode : public Node { } int _write(struct Sample *smps[], unsigned cnt) override { - if (dataDefinitionFileWriteOnly) { - logger->warn("Stopping node after writing the DDF file"); - setState(State::STOPPING); - return -1; - } - assert(cnt == 1); try { @@ -348,96 +333,93 @@ class OpalOrchestraNode : public Node { unsigned int key = 0) : Node(id, name), task(), connectionKey(key), status(nullptr), domain(), subscribeMappings(), publishMappings(), rate(1), connectTimeout(5), - skipWaitToGo(false), dataDefinitionFileOverwrite(false), - dataDefinitionFileWriteOnly(false) {} + skipWaitToGo(false), dataDefinitionFileOverwrite(false) {} - void parseSignals(json_t *json, SignalList::Ptr signals, DataSet &dataSet, - std::unordered_map, - OpalOrchestraMapping> &mappings) { - if (!json_is_array(json)) { - throw ConfigError(json, "node-config-node-opal-orchestra-signals", - "Signals must be an array"); - } + Signal::Ptr parseSignal(json_t *json_signal, NodeDirection::Direction dir) { + auto signal = Signal::fromJson(json_signal); - size_t i; - json_t *json_signal; - json_error_t err; + DataSet &dataSet = + dir == NodeDirection::Direction::IN ? 
diff --git a/lib/nodes/opal_async.cpp b/lib/nodes/opal_async.cpp
index f3a5dcee6..4c04e2c44 100644
--- a/lib/nodes/opal_async.cpp
+++ b/lib/nodes/opal_async.cpp
@@ -139,12 +139,13 @@ void LogSink::sink_it_(const spdlog::details::log_msg &msg) {
 }
 
 int OpalAsyncNode::parse(json_t *json) {
-  int ret, rply = -1, id = -1;
-
-  ret = Node::parse(json);
+  int ret = Node::parseCommon(json);
   if (ret)
     return ret;
 
+  int rply = -1;
+  int id = -1;
+
   json_error_t err;
   ret = json_unpack_ex(json, &err, 0, "{ s: i, s?: { s?: b } }", "id", &id,
                        "in", "reply", &rply);
diff --git a/lib/nodes/opal_orchestra.cpp b/lib/nodes/opal_orchestra.cpp
index cba9a456e..715f916f7 100644
--- a/lib/nodes/opal_orchestra.cpp
+++ b/lib/nodes/opal_orchestra.cpp
@@ -269,16 +269,7 @@ class OpalOrchestraNode : public Node {
   // Overwrite the data definition file (DDF).
   bool dataDefinitionFileOverwrite;
 
-  // Overwrite the data definition file (DDF) and terminate VILLASnode.
-  bool dataDefinitionFileWriteOnly;
-
   int _read(struct Sample *smps[], unsigned cnt) override {
-    if (dataDefinitionFileWriteOnly) {
-      logger->warn("Stopping node after writing the DDF file");
-      setState(State::STOPPING);
-      return -1;
-    }
-
     assert(cnt == 1);
 
     if (!domain.synchronous) {
@@ -314,12 +305,6 @@ class OpalOrchestraNode : public Node {
   }
 
   int _write(struct Sample *smps[], unsigned cnt) override {
-    if (dataDefinitionFileWriteOnly) {
-      logger->warn("Stopping node after writing the DDF file");
-      setState(State::STOPPING);
-      return -1;
-    }
-
     assert(cnt == 1);
 
     try {
@@ -348,96 +333,93 @@ class OpalOrchestraNode : public Node {
                     unsigned int key = 0)
       : Node(id, name), task(), connectionKey(key), status(nullptr), domain(),
         subscribeMappings(), publishMappings(), rate(1), connectTimeout(5),
-        skipWaitToGo(false), dataDefinitionFileOverwrite(false),
-        dataDefinitionFileWriteOnly(false) {}
+        skipWaitToGo(false), dataDefinitionFileOverwrite(false) {}
 
-  void parseSignals(json_t *json, SignalList::Ptr signals, DataSet &dataSet,
-                    std::unordered_map,
-                    OpalOrchestraMapping> &mappings) {
-    if (!json_is_array(json)) {
-      throw ConfigError(json, "node-config-node-opal-orchestra-signals",
-                        "Signals must be an array");
-    }
+  Signal::Ptr parseSignal(json_t *json_signal, NodeDirection::Direction dir) {
+    auto signal = Signal::fromJson(json_signal);
 
-    size_t i;
-    json_t *json_signal;
-    json_error_t err;
+    DataSet &dataSet = dir == NodeDirection::Direction::IN
+                           ? domain.publish
+                           : domain.subscribe;
+    std::unordered_map, OpalOrchestraMapping>
+        &mappings = dir == NodeDirection::Direction::IN ? publishMappings
+                                                        : subscribeMappings;
 
-    json_array_foreach(json, i, json_signal) {
-      auto signal = signals->getByIndex(i);
+    const char *nme = nullptr;
+    const char *typ = nullptr;
+    int oi = -1;
 
-      const char *nme = nullptr;
-      const char *typ = nullptr;
-      int oi = -1;
+    json_error_t err;
+    auto ret = json_unpack_ex(json_signal, &err, 0, "{ s?: s, s?: s, s?: i }",
+                              "orchestra_name", &nme, "orchestra_type", &typ,
+                              "orchestra_index", &oi);
+    if (ret) {
+      throw ConfigError(json_signal, err,
+                        "node-config-node-opal-orchestra-signals");
+    }
 
-      auto ret = json_unpack_ex(json_signal, &err, 0, "{ s?: s, s?: s, s?: i }",
-                                "orchestra_name", &nme, "orchestra_type", &typ,
-                                "orchestra_index", &oi);
-      if (ret) {
-        throw ConfigError(json_signal, err,
-                          "node-config-node-opal-orchestra-signals");
-      }
+    std::optional orchestraIdx;
 
-      std::optional orchestraIdx;
+    if (oi >= 0) {
+      orchestraIdx = oi;
+    }
 
-      if (oi >= 0) {
-        orchestraIdx = oi;
-      }
+    auto defaultValue =
+        signal->init.cast(signal->type, node::SignalType::FLOAT);
 
-      auto defaultValue =
-          signal->init.cast(signal->type, node::SignalType::FLOAT);
+    auto orchestraType = typ ? orchestra::signalTypeFromString(typ)
+                             : orchestra::toOrchestraSignalType(signal->type);
 
-      auto orchestraType = typ ? orchestra::signalTypeFromString(typ)
-                               : orchestra::toOrchestraSignalType(signal->type);
+    auto orchestraName = nme ? nme : signal->name;
 
-      auto orchestraName = nme ? nme : signal->name;
+    bool inserted = false;
+    auto item = dataSet.upsertItem(orchestraName, inserted);
 
-      bool inserted = false;
-      auto item = dataSet.upsertItem(orchestraName, inserted);
+    if (inserted) {
+      item->type = orchestraType;
+      item->defaultValue = defaultValue.f;
 
-      if (inserted) {
-        item->type = orchestraType;
-        item->defaultValue = defaultValue.f;
+      mappings.emplace(item, OpalOrchestraMapping(item, orchestraName));
+    }
 
-        mappings.emplace(item, OpalOrchestraMapping(item, orchestraName));
-      }
+    auto &mapping = mappings.at(item);
+    mapping.addSignal(signal, orchestraIdx);
 
-      auto &mapping = mappings.at(item);
-      mapping.addSignal(signal, orchestraIdx);
-    }
+    return signal;
   }
 
   int parse(json_t *json) override {
-    int ret = Node::parse(json);
-    if (ret)
-      return ret;
+    domain = Domain();
+    publishMappings.clear();
+    subscribeMappings.clear();
+
+    int reti = parseCommon(
+        json, [&](json_t *json_signal, NodeDirection::Direction dir) {
+          return parseSignal(json_signal, dir);
+        });
+    if (reti)
+      return reti;
 
+    int sw = -1;
+    int ow = -1;
+    int sy = -1;
+    int sts = -1;
     const char *dn = nullptr;
     const char *ddf = nullptr;
-    json_t *json_in_signals = nullptr;
-    json_t *json_out_signals = nullptr;
     json_t *json_connection = nullptr;
     json_t *json_connect_timeout = nullptr;
     json_t *json_flag_delay = nullptr;
     json_t *json_flag_delay_tool = nullptr;
-    int sw = -1;
-    int ow = -1;
-    int owo = -1;
-    int sy = -1;
-    int sts = -1;
-
     json_error_t err;
-    ret = json_unpack_ex(
+    auto ret = json_unpack_ex(
         json, &err, 0,
         "{ s: s, s?: b, s?: b, s?: o, s?: s, s?: o, s?: o, s?: o, s?: b, s?: "
-        "b, s?: b, s?: F, s?: { s?: o }, s?: { s?: o } }",
+        "b, s?: F }",
         "domain", &dn, "synchronous", &sy, "states", &sts, "connection",
         &json_connection, "ddf", &ddf, "connect_timeout",
         &json_connect_timeout, "flag_delay", &json_flag_delay,
         "flag_delay_tool", &json_flag_delay_tool, "skip_wait_to_go", &sw,
         "ddf_overwrite", &ow,
-        "ddf_overwrite_only", &owo, "rate", &rate, "in", "signals",
-        &json_in_signals, "out", "signals", &json_out_signals);
+        "rate", &rate);
     if (ret) {
       throw ConfigError(json, err, "node-config-node-opal-orchestra");
     }
@@ -464,10 +446,6 @@ class OpalOrchestraNode : public Node {
       dataDefinitionFileOverwrite = ow > 0;
     }
 
-    if (owo >= 0) {
-      dataDefinitionFileWriteOnly = owo > 0;
-    }
-
     if (json_connect_timeout) {
       connectTimeout = parse_duration(json_connect_timeout);
 
@@ -486,16 +464,6 @@ class OpalOrchestraNode : public Node {
       domain.connection = Connection::fromJson(json_connection);
     }
 
-    if (json_in_signals) {
-      parseSignals(json_in_signals, in.getSignals(false), domain.publish,
-                   publishMappings);
-    }
-
-    if (json_out_signals) {
-      parseSignals(json_out_signals, out.getSignals(false), domain.subscribe,
-                   subscribeMappings);
-    }
-
     return 0;
   }
 
@@ -531,8 +499,7 @@ class OpalOrchestraNode : public Node {
   int prepare() override {
     // Write DDF.
-    if (dataDefinitionFilename &&
-        (dataDefinitionFileOverwrite || dataDefinitionFileWriteOnly)) {
+    if (dataDefinitionFilename && dataDefinitionFileOverwrite) {
       // TODO: Possibly merge Orchestra domains from all nodes into one DDF.
       auto ddf = DataDefinitionFile();
 
@@ -541,10 +508,6 @@ class OpalOrchestraNode : public Node {
 
       logger->info("Wrote Orchestra Data Definition file (DDF) to '{}'",
                    *dataDefinitionFilename);
-
-      if (dataDefinitionFileWriteOnly) {
-        return Node::prepare();
-      }
     }
 
     logger->debug("Connecting to Orchestra framework: domain={}, ddf={}, "
@@ -555,11 +518,15 @@ class OpalOrchestraNode : public Node {
 
     RTConnectionLockGuard guard(connectionKey);
 
-    auto ret =
-        dataDefinitionFilename
-            ? RTConnectWithFile(dataDefinitionFilename->c_str(),
-                                domain.name.c_str(), connectTimeout.count())
-            : RTConnect(domain.name.c_str(), connectTimeout.count());
+    auto ret = RTSetSkipWaitToGoAtConnection(skipWaitToGo);
+    if (ret != RTAPI_SUCCESS) {
+      throw RTError(ret, "Failed to configure skip-wait-to-go behavior");
+    }
+
+    ret = dataDefinitionFilename
+              ? RTConnectWithFile(dataDefinitionFilename->c_str(),
+                                  domain.name.c_str(), connectTimeout.count())
+              : RTConnect(domain.name.c_str(), connectTimeout.count());
     if (ret != RTAPI_SUCCESS) {
       throw RTError(ret, "Failed to connect to Orchestra framework");
     }
@@ -585,11 +552,6 @@ class OpalOrchestraNode : public Node {
       }
     }
 
-    ret = RTSetSkipWaitToGoAtConnection(skipWaitToGo);
-    if (ret != RTAPI_SUCCESS) {
-      throw RTError(ret, "Failed to check ready to go");
-    }
-
     if (std::shared_ptr c =
            std::dynamic_pointer_cast(domain.connection)) {
       ret = RTSetupRefMemRemoteConnection(c->name.c_str(), c->pciIndex);
@@ -623,14 +585,8 @@ class OpalOrchestraNode : public Node {
   }
 
   int start() override {
-    if (dataDefinitionFileWriteOnly) {
-      return Node::start();
-    }
-
     RTConnectionLockGuard guard(connectionKey);
 
-    RTWaitReadyToGo();
-
     if (!domain.synchronous) {
       task.setRate(rate);
     }
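Note: with parseSignals() removed, per-signal Orchestra settings now arrive through the parse_signal callback that parseCommon() forwards to the signal-list parser, as shown above. A sketch of the hook for other node-types, assuming the callback signature from include/villas/node.hpp (the extra_setting key is illustrative, not part of this change):

    int ret = parseCommon(
        json, [&](json_t *json_signal, NodeDirection::Direction dir) {
          // Common attributes (name, type, init, ...) are still parsed here.
          auto signal = Signal::fromJson(json_signal);

          // Node-specific, per-signal settings can be unpacked in addition.
          int extra = 0;
          json_unpack(json_signal, "{ s?: i }", "extra_setting", &extra);

          return signal;
        });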
diff --git a/lib/nodes/opendss.cpp b/lib/nodes/opendss.cpp
index b9fcb107e..8757453aa 100644
--- a/lib/nodes/opendss.cpp
+++ b/lib/nodes/opendss.cpp
@@ -133,8 +133,7 @@ void OpenDSS::parseData(json_t *json, bool in) {
 }
 
 int OpenDSS::parse(json_t *json) {
-
-  int ret = Node::parse(json);
+  int ret = Node::parseCommon(json);
   if (ret)
     return ret;
diff --git a/lib/nodes/signal.cpp b/lib/nodes/signal.cpp
index d5ce3fb24..3713ee68d 100644
--- a/lib/nodes/signal.cpp
+++ b/lib/nodes/signal.cpp
@@ -195,15 +195,15 @@ int SignalNode::prepare() {
 }
 
 int SignalNode::parse(json_t *json) {
-  int r = -1, m = -1, ret = Node::parse(json);
+  int ret = Node::parseCommon(json);
   if (ret)
     return ret;
 
-  json_error_t err;
-
-  size_t i;
-  json_t *json_signals, *json_signal;
+  int r = -1;
+  int m = -1;
 
+  json_t *json_signals;
+  json_error_t err;
   ret = json_unpack_ex(json, &err, 0,
                        "{ s?: b, s?: i, s?: F, s?: b, s: { s: o } }",
                        "realtime", &r, "limit", &limit, "rate", &rate,
@@ -218,7 +218,11 @@ int SignalNode::parse(json_t *json) {
   monitor_missed = m != 0;
 
   signals.clear();
+
   unsigned j = 0;
+
+  size_t i;
+  json_t *json_signal;
   json_array_foreach(json_signals, i, json_signal) {
     auto sig = SignalNodeSignal(json_signal);
diff --git a/lib/nodes/webrtc.cpp b/lib/nodes/webrtc.cpp
index e89026817..e28943d96 100644
--- a/lib/nodes/webrtc.cpp
+++ b/lib/nodes/webrtc.cpp
@@ -48,7 +48,7 @@ WebRTCNode::~WebRTCNode() {
 }
 
 int WebRTCNode::parse(json_t *json) {
-  int ret = Node::parse(json);
+  int ret = Node::parseCommon(json);
   if (ret)
     return ret;
diff --git a/lib/signal.cpp b/lib/signal.cpp
index 875c2b33a..02cc3b405 100644
--- a/lib/signal.cpp
+++ b/lib/signal.cpp
@@ -123,3 +123,13 @@ bool Signal::isNext(const Signal &sig) {
 
   return isNextName(name, sig.name);
 }
+
+Signal::Ptr Signal::fromJson(json_t *json) {
+  auto signal = std::make_shared<Signal>();
+
+  auto ret = signal->parse(json);
+  if (ret)
+    return nullptr;
+
+  return signal;
+}
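Note: Signal::fromJson() turns the construct-then-parse two-step into a factory that returns nullptr on failure. A short usage sketch (the JSON literal is illustrative):

    json_t *json_signal = json_pack("{ s: s, s: s }",
                                    "name", "voltage", "type", "float");

    auto signal = villas::node::Signal::fromJson(json_signal);
    if (!signal)
      throw ConfigError(json_signal, "node-config-node-signal",
                        "Failed to parse signal definition");

    json_decref(json_signal);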
s: i }", "name", "signal", + "type", typ_str.c_str(), "count", len); - ret = sig->parse(json_signal); - if (ret) - return ret; + parse(json_signals); +} - push_back(sig); - } +SignalList::SignalList(std::string_view dt) { + json_t *json_signals = json_array(); - return 0; -} + int i = 0; + char *e; -SignalList::SignalList(unsigned len, enum SignalType typ) { - char name[32]; + auto *dtc = dt.data(); - for (unsigned i = 0; i < len; i++) { - snprintf(name, sizeof(name), "signal%u", i); + for (const char *t = dtc; *t; t = e + 1) { + auto len = strtoul(t, &e, 10); + if (t == e) + len = 1; - auto sig = std::make_shared(name, "", typ); - if (!sig) + auto name = fmt::format("signal_{}", i++); + + auto typ = signalTypeFromFormatString(*e); + if (typ == SignalType::INVALID) throw RuntimeError("Failed to create signal list"); - push_back(sig); + auto typ_str = signalTypeToString(typ); + + auto *json_signal = json_pack("{ s: s, s: s, s: i }", "name", name.c_str(), + "type", typ_str.c_str(), "count", len); + + json_array_append_new(json_signals, json_signal); } + + parse(json_signals); } -SignalList::SignalList(const char *dt) { - int len, i = 0; - char name[32], *e; - enum SignalType typ; +void SignalList::parse(json_t *json_signals, + std::function parse_signal) { + clear(); - for (const char *t = dt; *t; t = e + 1) { - len = strtoul(t, &e, 10); - if (t == e) - len = 1; + if (json_is_string(json_signals)) { + SignalList(json_string_value(json_signals)); + } else if (json_is_object(json_signals)) { + auto *json_tmp = json_signals; - typ = signalTypeFromFormatString(*e); - if (typ == SignalType::INVALID) - throw RuntimeError("Failed to create signal list"); + json_signals = json_array(); + json_array_append_new(json_signals, json_tmp); + } else if (!json_is_array(json_signals)) { + throw ConfigError(json_signals, "node-config-node-signals", + "Invalid signal list"); + } + + size_t i; + json_t *json_signal; + json_array_foreach(json_signals, i, json_signal) { + if (!json_is_object(json_signal)) { + throw ConfigError(json_signal, + "node-config-node-signal" + "Signal definitions must be a JSON object"); + } + + std::string baseName = "signal"; + bool appendIndex = false; - for (int j = 0; j < len; j++) { - snprintf(name, sizeof(name), "signal%d", i++); + int count = 1; + const char *nme = nullptr; + + int ret = json_unpack(json_signal, "{ s?: i, s?: s }", "count", &count, + "name", &nme); + if (ret) { + throw ConfigError(json_signal, "node-config-node-signal", + "Failed to parse signal definition"); + } + + if (count > 1) { + json_object_del(json_signal, "count"); + appendIndex = true; + } + + if (nme) { + baseName = nme; + } + + for (int j = 0; j < count; j++) { + if (appendIndex) { + auto name = fmt::format("{}_{}", baseName, j); + json_object_set_new(json_signal, "name", json_string(name.c_str())); + } - auto sig = std::make_shared(name, "", typ); - if (!sig) - throw RuntimeError("Failed to create signal list"); + auto signal = parse_signal(json_signal); + if (!signal) + throw ConfigError(json_signal, "node-config-node-signal", + "Failed to parse signal definition"); - push_back(sig); + push_back(signal); } } } @@ -122,7 +166,7 @@ json_t *SignalList::toJson() const { Signal::Ptr SignalList::getByIndex(unsigned idx) { return this->at(idx); } -int SignalList::getIndexByName(const std::string &name) { +int SignalList::getIndexByName(std::string_view name) { unsigned i = 0; for (auto s : *this) { if (name == s->name) @@ -134,7 +178,7 @@ int SignalList::getIndexByName(const std::string &name) { return -1; 
diff --git a/python/villas/node/villas_pb2.py b/python/villas/node/villas_pb2.py
index b764662c0..a3aa59453 100644
--- a/python/villas/node/villas_pb2.py
+++ b/python/villas/node/villas_pb2.py
@@ -22,6 +22,8 @@
 
 _sym_db = _symbol_database.Default()
 
 
+
+
 DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0cvillas.proto\x12\x0bvillas.node\"/\n\x07Message\x12$\n\x07samples\x18\x01 \x03(\x0b\x32\x13.villas.node.Sample\"\xfe\x01\n\x06Sample\x12,\n\x04type\x18\x01 \x02(\x0e\x32\x18.villas.node.Sample.Type:\x04\x44\x41TA\x12\x10\n\x08sequence\x18\x02 \x01(\x04\x12)\n\tts_origin\x18\x03 \x01(\x0b\x32\x16.villas.node.Timestamp\x12+\n\x0bts_received\x18\x04 \x01(\x0b\x32\x16.villas.node.Timestamp\x12\x11\n\tnew_frame\x18\x05 \x01(\x08\x12\"\n\x06values\x18\x64 \x03(\x0b\x32\x12.villas.node.Value\"%\n\x04Type\x12\x08\n\x04\x44\x41TA\x10\x01\x12\t\n\x05START\x10\x02\x12\x08\n\x04STOP\x10\x03\"&\n\tTimestamp\x12\x0b\n\x03sec\x18\x01 \x02(\r\x12\x0c\n\x04nsec\x18\x02 \x02(\r\"Z\n\x05Value\x12\x0b\n\x01\x66\x18\x01 \x01(\x01H\x00\x12\x0b\n\x01i\x18\x02 \x01(\x03H\x00\x12\x0b\n\x01\x62\x18\x03 \x01(\x08H\x00\x12!\n\x01z\x18\x04 \x01(\x0b\x32\x14.villas.node.ComplexH\x00\x42\x07\n\x05value\"%\n\x07\x43omplex\x12\x0c\n\x04real\x18\x01 \x02(\x02\x12\x0c\n\x04imag\x18\x02 \x02(\x02')
 _globals = globals()
diff --git a/tools/run-cppcheck.sh b/tools/run-cppcheck.sh
index ee84ab588..7267efa0d 100755
--- a/tools/run-cppcheck.sh
+++ b/tools/run-cppcheck.sh
@@ -8,7 +8,7 @@ SOURCE_DIR=${SCRIPT_DIR}/..
 BUILD_DIR=${SOURCE_DIR}/build
 
 # Generate compilation database
-cmake -S ${SOURCE_DIR} -B ${BUILD_DIR} -DCMAKE_EXPORT_COMPILE_COMMANDS=ON
+cmake -S ${SOURCE_DIR} -B ${BUILD_DIR}
 
 touch ${BUILD_DIR}/lib/formats/villas.pb-c.{c,h}