From 96732e5a8ed62b81519d84f0934271a51d41a059 Mon Sep 17 00:00:00 2001 From: elsaidi Date: Thu, 14 May 2026 10:09:43 +0000 Subject: [PATCH 01/22] refactor: replace custom mosquctl scripts with native mosquitto_ctrl integration for dynamic security management --- bin/mosquctl_client | 39 ------------------------------------ bin/mosquctl_gateway | 47 -------------------------------------------- bin/mosquctl_log | 21 -------------------- 3 files changed, 107 deletions(-) delete mode 100755 bin/mosquctl_client delete mode 100755 bin/mosquctl_gateway delete mode 100755 bin/mosquctl_log diff --git a/bin/mosquctl_client b/bin/mosquctl_client deleted file mode 100755 index 6256091452..0000000000 --- a/bin/mosquctl_client +++ /dev/null @@ -1,39 +0,0 @@ -#!/bin/bash -e - -UDMI_ROOT=$(dirname $0)/.. -cd $UDMI_ROOT - -source $UDMI_ROOT/etc/shell_common.sh - -if [[ $# != 2 ]]; then - echo Usage: $0 client_id client_pass - false -fi - -client_id=$1 -client_pass=$2 -shift 2 - -source $UDMI_ROOT/etc/mosquitto_ctrl.sh - -client_user=$client_id -role_name="role_${client_id//\//_}" - -$MOSQUITTO_CTRL deleteClient $client_user || true -$MOSQUITTO_CTRL deleteRole $role_name || true - -if [[ $client_pass != "--" ]]; then - $MOSQUITTO_CTRL createClient $client_user -p $client_pass -c $client_id - - $MOSQUITTO_CTRL createRole $role_name - $MOSQUITTO_CTRL addClientRole $client_user $role_name - - $MOSQUITTO_CTRL addRoleACL $role_name subscribePattern "$client_id/config" allow - $MOSQUITTO_CTRL addRoleACL $role_name subscribePattern "$client_id/commands" allow - $MOSQUITTO_CTRL addRoleACL $role_name subscribePattern "$client_id/errors" allow - $MOSQUITTO_CTRL addRoleACL $role_name publishClientSend "$client_id/events/#" allow - $MOSQUITTO_CTRL addRoleACL $role_name publishClientSend "$client_id/state" allow - echo "Device $client_id registered correctly." -else - echo "Device $client_id deleted correctly." -fi diff --git a/bin/mosquctl_gateway b/bin/mosquctl_gateway deleted file mode 100755 index 463d3bd7f2..0000000000 --- a/bin/mosquctl_gateway +++ /dev/null @@ -1,47 +0,0 @@ -#!/bin/bash -e - -UDMI_ROOT=$(dirname $0)/.. -source $UDMI_ROOT/etc/shell_common.sh -source $UDMI_ROOT/etc/mosquitto_ctrl.sh - -if [[ $# != 3 ]]; then - echo Usage: $0 action gateway_id device_id - echo Actions: bind, unbind - false -fi - -action=$1 -gateway_id=$2 -device_id=$3 - -role_name="role_${gateway_id//\//_}" - -if [[ $action == "bind" ]]; then - echo Binding $device_id to gateway $gateway_id - # Create role if not exists (ignore error if exists) - $MOSQUITTO_CTRL createRole $role_name || true - # Add role to gateway client (ignore error if already added) - $MOSQUITTO_CTRL addClientRole $gateway_id $role_name || true - - # Add ACLs - $MOSQUITTO_CTRL addRoleACL $role_name subscribePattern "$device_id/config" allow - $MOSQUITTO_CTRL addRoleACL $role_name subscribePattern "$device_id/commands" allow - $MOSQUITTO_CTRL addRoleACL $role_name subscribePattern "$device_id/errors" allow - $MOSQUITTO_CTRL addRoleACL $role_name publishClientSend "$device_id/events/#" allow - $MOSQUITTO_CTRL addRoleACL $role_name publishClientSend "$device_id/state" allow - $MOSQUITTO_CTRL addRoleACL $role_name publishClientSend "$device_id/attach" allow - echo "Binding successful for $device_id to gateway $gateway_id" -elif [[ $action == "unbind" ]]; then - echo Unbinding $device_id from gateway $gateway_id - # Remove ACLs - $MOSQUITTO_CTRL removeRoleACL $role_name subscribePattern "$device_id/config" || true - $MOSQUITTO_CTRL removeRoleACL $role_name subscribePattern "$device_id/commands" || true - $MOSQUITTO_CTRL removeRoleACL $role_name subscribePattern "$device_id/errors" || true - $MOSQUITTO_CTRL removeRoleACL $role_name publishClientSend "$device_id/events/#" || true - $MOSQUITTO_CTRL removeRoleACL $role_name publishClientSend "$device_id/state" || true - $MOSQUITTO_CTRL removeRoleACL $role_name publishClientSend "$device_id/attach" || true - echo "Unbinding successful for $device_id from gateway $gateway_id" -else - echo Unknown action: $action - false -fi diff --git a/bin/mosquctl_log b/bin/mosquctl_log deleted file mode 100755 index b692e3868d..0000000000 --- a/bin/mosquctl_log +++ /dev/null @@ -1,21 +0,0 @@ -#!/bin/bash -e - -UDMI_ROOT=$(dirname $0)/.. -source $UDMI_ROOT/etc/shell_common.sh -cd $UDMI_ROOT - -LOG_FILE=/var/log/mosquitto/mosquitto.log - -if [[ $# != 1 ]]; then - echo Usage: $0 client_id_prefix - false -fi - -prefix=$1 -shift - -echo $(date +%s): Starting MONITOR Client $prefix of $LOG_FILE - -tail -f $LOG_FILE | stdbuf -oL egrep " (from|to|as|Client) $prefix" - -echo $(date +%s): Finished MONITOR Client $prefix of $LOG_FILE From 4f37c04e7b861f6f8c267172594e88993fc0b9d8 Mon Sep 17 00:00:00 2001 From: elsaidi Date: Thu, 14 May 2026 10:10:02 +0000 Subject: [PATCH 02/22] refactor: replace shell-scripted mosquctl calls with direct mosquitto_ctrl dynsec commands in MosquittoBroker --- bin/test_mosquitto | 16 +- .../access/ImplicitIotAccessProvider.java | 30 +-- .../udmi/service/support/MosquittoBroker.java | 196 ++++++++++++++++-- 3 files changed, 206 insertions(+), 36 deletions(-) diff --git a/bin/test_mosquitto b/bin/test_mosquitto index 78c43fae11..fa049d140c 100755 --- a/bin/test_mosquitto +++ b/bin/test_mosquitto @@ -56,7 +56,17 @@ fgrep Racket out/mosquitto.sub || fail did not find expected message hash_pass=$(sha256sum < $site_path/devices/$device_id/rsa_private.pkcs8) dev_pass=${hash_pass:0:8} dev_id=/r/$registry_id/d/$device_id -bin/mosquctl_client $dev_id $dev_pass +$MOSQUITTO_CTRL deleteClient $dev_id || true +role_name="role_${dev_id//\//_}" +$MOSQUITTO_CTRL deleteRole $role_name || true +$MOSQUITTO_CTRL createClient $dev_id -p $dev_pass -c $dev_id +$MOSQUITTO_CTRL createRole $role_name +$MOSQUITTO_CTRL addClientRole $dev_id $role_name +$MOSQUITTO_CTRL addRoleACL $role_name subscribePattern "$dev_id/config" allow +$MOSQUITTO_CTRL addRoleACL $role_name subscribePattern "$dev_id/commands" allow +$MOSQUITTO_CTRL addRoleACL $role_name subscribePattern "$dev_id/errors" allow +$MOSQUITTO_CTRL addRoleACL $role_name publishClientSend "$dev_id/events/#" allow +$MOSQUITTO_CTRL addRoleACL $role_name publishClientSend "$dev_id/state" allow sleep 1 @@ -120,7 +130,9 @@ fi echo Cleanup subscribers and clients... killall mosquitto_sub || true -bin/mosquctl_client $dev_id -- +$MOSQUITTO_CTRL deleteClient $dev_id || true +role_name="role_${dev_id//\//_}" +$MOSQUITTO_CTRL deleteRole $role_name || true echo Received messages: cat out/mosquitto.sub | cut -c -120 diff --git a/udmis/src/main/java/com/google/bos/udmi/service/access/ImplicitIotAccessProvider.java b/udmis/src/main/java/com/google/bos/udmi/service/access/ImplicitIotAccessProvider.java index 50788538b0..eab6f9102b 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/access/ImplicitIotAccessProvider.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/access/ImplicitIotAccessProvider.java @@ -140,13 +140,26 @@ public ImplicitIotAccessProvider(IotAccess iotAccess) { brokerHost = ofNullable(endpointConfig.hostname).orElse("localhost"); brokerPort = ofNullable(endpointConfig.port).map(Object::toString).orElse("8883"); } else { + endpointConfig = new EndpointConfiguration(); brokerUser = options.get(BROKER_USER_KEY); brokerPass = options.get(BROKER_PASS_KEY); brokerHost = ofNullable(options.get(BROKER_HOST_KEY)).orElse("localhost"); brokerPort = ofNullable(options.get(BROKER_PORT_KEY)).orElse("8883"); + + endpointConfig.hostname = brokerHost; + endpointConfig.port = Integer.parseInt(brokerPort); + endpointConfig.transport = "1883".equals(brokerPort) ? Transport.TCP : Transport.SSL; + endpointConfig.client_id = clientId; + + if (isPublishEnabled()) { + endpointConfig.auth_provider = new Auth_provider(); + endpointConfig.auth_provider.basic = new Basic(); + endpointConfig.auth_provider.basic.username = brokerUser; + endpointConfig.auth_provider.basic.password = brokerPass; + } } - broker = new MosquittoBroker(this); + broker = new MosquittoBroker(this, endpointConfig); connLogger = broker.addEventListener(CLIENT_PREFIX, this::brokerHandler); } @@ -349,20 +362,7 @@ public void activate() { private void connectMqttClient() { info("Initializing SimpleMqttPipe for ImplicitIotAccessProvider"); try { - if (endpointConfig == null) { - endpointConfig = new EndpointConfiguration(); - endpointConfig.hostname = brokerHost; - endpointConfig.port = Integer.parseInt(brokerPort); - endpointConfig.transport = "1883".equals(brokerPort) ? Transport.TCP : Transport.SSL; - endpointConfig.client_id = clientId; - - if (isPublishEnabled()) { - endpointConfig.auth_provider = new Auth_provider(); - endpointConfig.auth_provider.basic = new Basic(); - endpointConfig.auth_provider.basic.username = brokerUser; - endpointConfig.auth_provider.basic.password = brokerPass; - } - } + if (endpointConfig.send_id == null) { endpointConfig.send_id = "implicit"; diff --git a/udmis/src/main/java/com/google/bos/udmi/service/support/MosquittoBroker.java b/udmis/src/main/java/com/google/bos/udmi/service/support/MosquittoBroker.java index 8264461074..d287244c37 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/support/MosquittoBroker.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/support/MosquittoBroker.java @@ -10,23 +10,29 @@ import com.google.bos.udmi.service.pod.ContainerBase; import java.io.BufferedReader; +import java.util.ArrayList; import java.util.Date; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.regex.Matcher; import java.util.regex.Pattern; +import udmi.schema.EndpointConfiguration; + +/** + * NOTE: The topic structure used in ACLs in this file is hardcoded to use the deviceId + * directly (e.g., deviceId/config, deviceId/commands, etc.) instead of using properties + * from the endpoint configuration. + */ /** * Provider that links directly to a mosquitto broker. */ public class MosquittoBroker extends ContainerBase implements ConnectionBroker { - private static final String UDMI_ROOT = System.getenv("UDMI_ROOT"); - private static final String MOSQUCTL_CLIENT_FMT = UDMI_ROOT + "/bin/mosquctl_client %s %s"; - private static final String MOSQUCTL_LOG_FMT = UDMI_ROOT + "/bin/mosquctl_log %s"; - private static final String MOSQUCTL_GATEWAY_FMT = UDMI_ROOT + "/bin/mosquctl_gateway %s %s %s"; + private static final long EXEC_TIMEOUT_SEC = 10; private static final String REVOKE_PASSWORD = "--"; private static final Pattern LOG_MATCHER = @@ -35,9 +41,38 @@ public class MosquittoBroker extends ContainerBase implements ConnectionBroker { Pattern.compile("\\(([d01,qr ]+), m([0-9]+), '(\\S+)', .*\\)"); private static final Pattern PUBACK_MATCHER = Pattern.compile("\\((m|Mid: )([0-9]+), \\S+\\)"); private final ContainerBase container; + private final EndpointConfiguration endpointConfig; - public MosquittoBroker(ContainerBase container) { + public MosquittoBroker(ContainerBase container, EndpointConfiguration endpointConfig) { this.container = container; + this.endpointConfig = endpointConfig; + } + + private List buildCommandPrefix() { + List cmd = new ArrayList<>(); + cmd.add("mosquitto_ctrl"); + + if (endpointConfig.hostname != null) { + cmd.add("-h"); + cmd.add(endpointConfig.hostname); + } + if (endpointConfig.port != null) { + cmd.add("-p"); + cmd.add(endpointConfig.port.toString()); + } + if (endpointConfig.auth_provider != null && endpointConfig.auth_provider.basic != null) { + if (endpointConfig.auth_provider.basic.username != null) { + cmd.add("-u"); + cmd.add(endpointConfig.auth_provider.basic.username); + } + if (endpointConfig.auth_provider.basic.password != null) { + cmd.add("-P"); + cmd.add(endpointConfig.auth_provider.basic.password); + } + } + + cmd.add("dynsec"); + return cmd; } private void consumeLogs(String clientPrefix, Consumer eventConsumer) { @@ -61,38 +96,130 @@ private void consumeStream(BufferedReader reader, Consumer consumer) { thread.start(); } - private void executeCommand(String cmd) { + private void executeCommand(List cmd) { synchronized (MosquittoBroker.class) { try { - info("Executing command %s", cmd); - Process exec = Runtime.getRuntime().exec(cmd); + info("Executing command %s", String.join(" ", cmd)); + ProcessBuilder pb = new ProcessBuilder(cmd); + Process exec = pb.start(); exec.waitFor(EXEC_TIMEOUT_SEC, TimeUnit.SECONDS); exec.errorReader().lines().forEach(container::info); exec.inputReader().lines().forEach(container::info); int exitValue = exec.exitValue(); checkState(exitValue == 0, "exit return code " + exitValue); } catch (Exception e) { - throw new RuntimeException("While executing " + cmd, e); + throw new RuntimeException("While executing " + String.join(" ", cmd), e); } } } private void mosquctlClient(String clientId, String clientPass) { - executeCommand(format(MOSQUCTL_CLIENT_FMT, clientId, clientPass)); + String clientUser = clientId; + String roleName = "role_" + clientId.replace("/", "_"); + + deleteClient(clientUser); + deleteRole(roleName); + + if (!"--".equals(clientPass)) { + createClient(clientUser, clientPass, clientId); + createRole(roleName); + addClientRole(clientUser, roleName); + + addRoleAcl(roleName, "subscribePattern", clientId + "/config", "allow"); + addRoleAcl(roleName, "subscribePattern", clientId + "/commands", "allow"); + addRoleAcl(roleName, "subscribePattern", clientId + "/errors", "allow"); + addRoleAcl(roleName, "publishClientSend", clientId + "/events/#", "allow"); + addRoleAcl(roleName, "publishClientSend", clientId + "/state", "allow"); + + info("Device %s registered correctly.", clientId); + } else { + info("Device %s deleted correctly.", clientId); + } + } + + private void addRoleAcl(String roleName, String type, String pattern, String allow) { + List cmd = new ArrayList<>(buildCommandPrefix()); + cmd.add("addRoleACL"); + cmd.add(roleName); + cmd.add(type); + cmd.add(pattern); + cmd.add(allow); + executeCommand(cmd); + } + + private void deleteClient(String clientUser) { + List cmd = new ArrayList<>(buildCommandPrefix()); + cmd.add("deleteClient"); + cmd.add(clientUser); + try { + executeCommand(cmd); + } catch (Exception e) { + warn("Ignore error deleting client: " + e.getMessage()); + } + } + + private void deleteRole(String roleName) { + List cmd = new ArrayList<>(buildCommandPrefix()); + cmd.add("deleteRole"); + cmd.add(roleName); + try { + executeCommand(cmd); + } catch (Exception e) { + warn("Ignore error deleting role: " + e.getMessage()); + } + } + + private void createClient(String clientUser, String clientPass, String clientId) { + List cmd = new ArrayList<>(buildCommandPrefix()); + cmd.add("createClient"); + cmd.add(clientUser); + cmd.add("-p"); + cmd.add(clientPass); + cmd.add("-c"); + cmd.add(clientId); + executeCommand(cmd); + } + + private void createRole(String roleName) { + List cmd = new ArrayList<>(buildCommandPrefix()); + cmd.add("createRole"); + cmd.add(roleName); + try { + executeCommand(cmd); + } catch (Exception e) { + warn("Ignore error creating role: " + e.getMessage()); + } + } + + private void addClientRole(String clientUser, String roleName) { + List cmd = new ArrayList<>(buildCommandPrefix()); + cmd.add("addClientRole"); + cmd.add(clientUser); + cmd.add(roleName); + try { + executeCommand(cmd); + } catch (Exception e) { + warn("Ignore error adding client role: " + e.getMessage()); + } } private void mosquctlLog(String clientPrefix, Consumer eventConsumer) { - String cmd = format(MOSQUCTL_LOG_FMT, clientPrefix); synchronized (MosquittoBroker.class) { try { - info("Starting log consumer %s", cmd); - Process exec = Runtime.getRuntime().exec(cmd); + info("Starting log consumer for prefix %s", clientPrefix); + ProcessBuilder pb = new ProcessBuilder("tail", "-f", "/var/log/mosquitto/mosquitto.log"); + Process exec = pb.start(); consumeStream(exec.errorReader(), line -> warn("log error: " + line)); - consumeStream(exec.inputReader(), line -> ifNotNullThen(parseLogLine(line), eventConsumer)); + consumeStream(exec.inputReader(), line -> { + BrokerEvent event = parseLogLine(line); + if (event != null && event.clientId != null && event.clientId.startsWith(clientPrefix)) { + eventConsumer.accept(event); + } + }); } catch (Exception e) { - throw new RuntimeException("While executing " + cmd, e); + throw new RuntimeException("While starting log consumer", e); } finally { - info("Completed log consumer"); + info("Completed log consumer setup"); } } } @@ -155,12 +282,43 @@ public void authorize(String clientId, String password) { @Override public void bindGateway(String gatewayId, String deviceId) { - executeCommand(format(MOSQUCTL_GATEWAY_FMT, "bind", gatewayId, deviceId)); + String roleName = "role_" + gatewayId.replace("/", "_"); + + createRole(roleName); + addClientRole(gatewayId, roleName); + + // add ACLs + addRoleAcl(roleName, "subscribePattern", deviceId + "/config", "allow"); + addRoleAcl(roleName, "subscribePattern", deviceId + "/commands", "allow"); + addRoleAcl(roleName, "subscribePattern", deviceId + "/errors", "allow"); + addRoleAcl(roleName, "publishClientSend", deviceId + "/events/#", "allow"); + addRoleAcl(roleName, "publishClientSend", deviceId + "/state", "allow"); + addRoleAcl(roleName, "publishClientSend", deviceId + "/attach", "allow"); } @Override public void unbindGateway(String gatewayId, String deviceId) { - info("Unbind device Id: %s from gateway Id: %s :%s", deviceId, gatewayId); - executeCommand(format(MOSQUCTL_GATEWAY_FMT, "unbind", gatewayId, deviceId)); + info("Unbind device Id: %s from gateway Id: %s", deviceId, gatewayId); + String roleName = "role_" + gatewayId.replace("/", "_"); + + removeRoleAcl(roleName, "subscribePattern", deviceId + "/config"); + removeRoleAcl(roleName, "subscribePattern", deviceId + "/commands"); + removeRoleAcl(roleName, "subscribePattern", deviceId + "/errors"); + removeRoleAcl(roleName, "publishClientSend", deviceId + "/events/#"); + removeRoleAcl(roleName, "publishClientSend", deviceId + "/state"); + removeRoleAcl(roleName, "publishClientSend", deviceId + "/attach"); + } + + private void removeRoleAcl(String roleName, String type, String pattern) { + List cmd = new ArrayList<>(buildCommandPrefix()); + cmd.add("removeRoleACL"); + cmd.add(roleName); + cmd.add(type); + cmd.add(pattern); + try { + executeCommand(cmd); + } catch (Exception e) { + warn("Ignore error removing role ACL: " + e.getMessage()); + } } } From c15c71f0fee1dbab760877308bd8d80f18e00728 Mon Sep 17 00:00:00 2001 From: elsaidi Date: Thu, 14 May 2026 12:02:11 +0000 Subject: [PATCH 03/22] add keys to endpoint config --- .gencode_hash.txt | 16 +- gencode/docs/configuration_endpoint.html | 102 ++ gencode/docs/configuration_execution.html | 246 +++ gencode/docs/configuration_pod.html | 1451 ++++++++++++++--- gencode/docs/configuration_pubber.html | 123 ++ gencode/docs/persistent_device.html | 123 ++ .../udmi/schema/EndpointConfiguration.java | 45 +- gencode/presentation/presentation.json | 42 + .../udmi/schema/configuration_endpoint.py | 3 + misc/bambi/presentation.gld.json | 42 + schema/configuration_endpoint.json | 24 + udmis/etc/local_pod.json | 3 + .../udmi/service/support/MosquittoBroker.java | 13 + 13 files changed, 2026 insertions(+), 207 deletions(-) diff --git a/.gencode_hash.txt b/.gencode_hash.txt index d22e124ce3..c48e946626 100644 --- a/.gencode_hash.txt +++ b/.gencode_hash.txt @@ -2,10 +2,10 @@ c21266e8de23b6e8acfac16a177db782375bf71df470c343c3f8eb8b822ef56a gencode/docs/commands_discovery.html 7bd462fd9b31a1a35b27f069194051161c484600161d7ef3fa7a4f8efa4e65fb gencode/docs/commands_mapping.html a65e8177ca59cd51c4a8ff63ecaa194897f7e22b82afb14708d63efbd7b96a84 gencode/docs/config.html -0bb798c8db6be37266946569a096e2f0b7520ee54d65e7e8d37675ef17820f14 gencode/docs/configuration_endpoint.html -7caf6f29bff7c358536444c0d8502169f708bcc6f32a3d0a8aae155708b2cd04 gencode/docs/configuration_execution.html -640ad5670f31f7c32f3e8d29e8f4f54b1f95c121521e1b6efc2fb943814d6918 gencode/docs/configuration_pod.html -b34c136cee32cb88f32a427ff400c3898ed49168f6dcaca1bc9ba65365bc5ae4 gencode/docs/configuration_pubber.html +4d2edf1c19b274e5273370f79a8e21c5e8009c1b1e2529d83a33392ddec46db6 gencode/docs/configuration_endpoint.html +8b644bcc965a4fee3fa66dc55abde2079adcef621b0d49aa44ae83185617071e gencode/docs/configuration_execution.html +5fe25ff7a6ac8ed09476f9e24f56e686ba35268d694ebc20968222a207445400 gencode/docs/configuration_pod.html +cf01efc05e3fcabb18786af43f7f93d7780b8156b91827e34b34698c4bcea5b8 gencode/docs/configuration_pubber.html 1057fa40fb7a31a23bb2773d21c38cf4590a935bd8b5ea4218e695c6204f5dd9 gencode/docs/data_template.html ffd9325c940b8e832a608a595c8ccc8903b935e899f42219ddc3c79ec65f6202 gencode/docs/events.html 70e57ad6ef39330d958727ebf9dcd61ef6ea30e4c8653eac412bf1867fdb3a70 gencode/docs/events_alarmset.html @@ -17,7 +17,7 @@ cac253f57c5c92ef32e2a5f91b6cec8229e8db1dcffcc96a58f06da068e741e7 gencode/docs/e 73dbe799e7943ec20ac58b544998e986a39539d4ef0cb4f5023e92e7634d3124 gencode/docs/events_validation.html cebf265b0c3d3a6e9c9e8c4e6c723ac36bc088dfabbc789e775388c3d2ec83d0 gencode/docs/metadata.html c86682715d348bd3dd971fa5bd925a8a3d0f3c2944c65a47c4b64fe1a5ccdea2 gencode/docs/monitoring.html -474ca16edc9f3cad2bb3ab40b6993cbced90263f762f66ee6cd246a6c4a0d18f gencode/docs/persistent_device.html +841b43bad8a76268435bf545cf7b8a57b1ba478a9cd244b3cd97aca536b1b360 gencode/docs/persistent_device.html e11595fd11477947a27461f8ef4fb6facb5f60e2abd6212193f7581ab123ff84 gencode/docs/properties.html c006d0f46c8f007caa90ac76c713cead907669c14c09f4a288fac5b25afe05dd gencode/docs/query_cloud.html 6f2cd8163a129667beb79f297f193ecd14d40c4f1ac06570db13d912ab98fd3f gencode/docs/readme.md @@ -70,7 +70,7 @@ ff98f706c967bbef7be35d826f5abad822c89d2fa981c4d8e6cffbb106d3740c gencode/java/u 1a04079116f4032b17108e873719bbb2c60c19f392136e7c906122199b472227 gencode/java/udmi/schema/DiscoveryModel.java b41f59d0c1aa74bb9abbdc7525d726f45a4ba8df3866c2dd40458601ffab60fb gencode/java/udmi/schema/DiscoveryState.java 7019b8a1522261a69d708e2e7725b8bc44510c5c80f5c056543af2b7728bfa42 gencode/java/udmi/schema/Electricity.java -a9ab0b95408ee04fc98cbc2363bead24c67e64e279e6088e0dd3e4b97154c65d gencode/java/udmi/schema/EndpointConfiguration.java +722263ceca9acb28c7b39d2133e2eede16c6f037d4325c7587a4563c80d77f2b gencode/java/udmi/schema/EndpointConfiguration.java d2e7afd6e1a9250480144c114ee8877afb9b0dd7048495fe96a821d9e6c80475 gencode/java/udmi/schema/Entry.java d4bfd2c997937cd1b0f9b1c73ed46b0133b2b264d367ca9001b7d84ecd598611 gencode/java/udmi/schema/Enumerations.java 11ccad9b1ed4a4745e7bf321adc4bb7f684e9bfa90877314094533b9749512b7 gencode/java/udmi/schema/Envelope.java @@ -157,7 +157,7 @@ a671f5341703d03200c3a4b4df41f109e587e3723292321998595da60b03e4a2 gencode/java/u 8cc9c88554fc6c9d7d6eba6884279eab3160b4e72d8edfef8155e69ec61c1eec gencode/java/udmi/schema/ValidationState.java eadc72e31b4796273479967303513b16563af0f946d1e1c7eba1748f9b133d40 gencode/java/udmi/schema/ValidationSummary.java 11f8dab5296d41e86cd623a4ed27b972ae673b141907cb913397d4eb53880c59 gencode/java/udmi/schema/Water.java -0d863349462e546bbfad54e75032170d9967f4ed95aa6bd900f36398fba012a4 gencode/presentation/presentation.json +dd8497a1183a85ae281a54b02bbf360f9ecb95176d1efb588804c63e585c0d79 gencode/presentation/presentation.json 4cf98cbd132cde0cc8813ac35cf3712cb46014154c817c04ad2902c268cdd8fe gencode/python/pyproject.toml a98b84029d33a421872a08f7bfb1bd2f23f8dc3bebc8d7a1c2a7f1c8596510bf gencode/python/udmi/schema/__init__.py f9d90861e568b27445bef241f04cce64cc44731c95c8bd9e3f65cef79d42dab0 gencode/python/udmi/schema/_base.py @@ -187,7 +187,7 @@ b65e7e9152629a9175f62ed706701d02814d62d8f3e9c0b46c2ba49ad8b51077 gencode/python faf4cdb1687868fadc411c144d57a5c596af790e43df4d7ddec5ee3fd10ba4bd gencode/python/udmi/schema/config_system.py ddd6e974840a19aa75e2f322e140676f71e0715f69088b88dfea76f0625ab035 gencode/python/udmi/schema/config_system_testing.py f187d5acbf931e97365af4283ee676786b7f26294db720b1c8535ab50387a6c1 gencode/python/udmi/schema/config_udmi.py -dea1560526d0e24431679ba7385bd5ae2fd6a0750c69c4548e2b12084b0467ff gencode/python/udmi/schema/configuration_endpoint.py +d1cf489cbf937f801213f1425508afcc37c548e08f1da6ac755f9282bd84992d gencode/python/udmi/schema/configuration_endpoint.py 8adbf9647a6c6666559837d1963d41e0fd7e9c9193bc73f40066ae3f97ce378c gencode/python/udmi/schema/configuration_execution.py c0853ff1838a11291b53a40c43a6cdaff17951a34b3d7f3de10d9fcc7b44b79d gencode/python/udmi/schema/configuration_pod.py 327db1ca7c7a8e4d8ff0eb40b183163c314a5b3bc56c25bd17492fec9c9a811f gencode/python/udmi/schema/configuration_pod_base.py diff --git a/gencode/docs/configuration_endpoint.html b/gencode/docs/configuration_endpoint.html index 50f5297af3..f52acb6d0c 100644 --- a/gencode/docs/configuration_endpoint.html +++ b/gencode/docs/configuration_endpoint.html @@ -1024,6 +1024,108 @@

+ + + + +
+
+
+

+ +

+
+ +
+
+ + Type: string
+

Full path to the CA certificate file

+
+ + + + + + +
+
+
+
+
+
+
+

+ +

+
+ +
+
+ + Type: string
+

Full path to the client certificate file

+
+ + + + + + +
+
+
+
+
+
+
+

+ +

+
+ +
+
+ + Type: string
+

Full path to the client private key file

+
+ + + + + +
diff --git a/gencode/docs/configuration_execution.html b/gencode/docs/configuration_execution.html index facf8a4a4b..6adec99c9c 100644 --- a/gencode/docs/configuration_execution.html +++ b/gencode/docs/configuration_execution.html @@ -1686,6 +1686,129 @@

+ + + + +
+
+
+

+ +

+
+ +
+
+ + Type: string
+

Full path to the CA certificate file

+
+ + + + + + +
+
+
+
+
+
+
+

+ +

+
+ +
+
+ + Type: string
+

Full path to the client certificate file

+
+ + + + + + +
+
+
+
+
+
+
+

+ +

+
+ +
+
+ + Type: string
+

Full path to the client private key file

+
+ + + + + +
@@ -2962,6 +3085,129 @@

+ + + + +
+
+
+

+ +

+
+ +
+
+ + Type: string
+

Full path to the CA certificate file

+
+ + + + + + +
+
+
+
+
+
+
+

+ +

+
+ +
+
+ + Type: string
+

Full path to the client certificate file

+
+ + + + + + +
+
+
+
+
+
+
+

+ +

+
+ +
+
+ + Type: string
+

Full path to the client private key file

+
+ + + + + +
diff --git a/gencode/docs/configuration_pod.html b/gencode/docs/configuration_pod.html index 0b9c2370e7..7c49c7f82b 100644 --- a/gencode/docs/configuration_pod.html +++ b/gencode/docs/configuration_pod.html @@ -1403,6 +1403,129 @@

+ + + + +
+
+
+

+ +

+
+ +
+
+ + Type: string
+

Full path to the CA certificate file

+
+ + + + + + +
+
+
+
+
+
+
+

+ +

+
+ +
+
+ + Type: string
+

Full path to the client certificate file

+
+ + + + + + +
+
+
+
+
+
+
+

+ +

+
+ +
+
+ + Type: string
+

Full path to the client private key file

+
+ + + + + +
@@ -2916,6 +3039,150 @@

+ + + + +
+
+
+

+ +

+
+ +
+
+ + Type: string
+

Full path to the CA certificate file

+
+ + + + + + +
+
+
+
+
+
+
+

+ +

+
+ +
+
+ + Type: string
+

Full path to the client certificate file

+
+ + + + + + +
+
+
+
+
+
+
+

+ +

+
+ +
+
+ + Type: string
+

Full path to the client private key file

+
+ + + + + +
@@ -4444,18 +4711,18 @@

-
+
-
+

- +

-
+
Type: stringFormat: date-time
-

The timestamp of the endpoint generation

+ ca_file
Type: string
+

Full path to the CA certificate file

-
-
Example:
-
"2019-01-17T14:02:29.364Z"
-
-
-
-
-
-
-
-
- - + -
+
-
+

- +

-
+
Type: object
- - - - - - - -
-
-
-

-

Type: string
+

Full path to the client certificate file

+
+ + + + + + +
+
+
+
+
+
+
+

+ +

+
+ +
+
+ + Type: string
+

Full path to the client private key file

+
+ + + + + + +
+
+
+
+
+
+
+

+ +

+
+ +
+
+ + Type: stringFormat: date-time
+

The timestamp of the endpoint generation

+
+ + + + + +
+
Example:
+
"2019-01-17T14:02:29.364Z"
+
+
+
+
+
+
+
+
+
+
+
+ + + +
+
+
+

+ +

+
+ +
+
+ + Type: object
+ + + + + + + +
+
+
+

+

@@ -6257,18 +6668,18 @@

-
+
-
+

- +

-
+
Type: stringFormat: date-time
-

The timestamp of the endpoint generation

+ ca_file
Type: string
+

Full path to the CA certificate file

-
-
Example:
-
"2019-01-17T14:02:29.364Z"
-
-
-
-
-
-
+
-
+
-
+

- +

-
+

Endpoint Configuration

Type: object
-

Parameters to define a message endpoint

+ from + + + + cert_file
Type: string
+

Full path to the client certificate file

- - No Additional Properties -
+
+
+
+
+
-
+

- +

-
+
Type: string
-

Friendly name for this flow (debugging and diagnostics)

+ key_file
Type: string
+

Full path to the client private key file

@@ -6421,18 +6833,18 @@

-
+
-
+

- +

-
+
Type: enum (of string)
-
-

Must be one of:

-
  • "local"
  • "pubsub"
  • "file"
  • "trace"
  • "mqtt"
-
+ generation
Type: stringFormat: date-time
+

The timestamp of the endpoint generation

+
- +
+
Example:
+
"2019-01-17T14:02:29.364Z"
+
+
-
+
+
+
+
+
-
+

- +

-
-
+
+
+ +

Endpoint Configuration

Type: object
+

Parameters to define a message endpoint

+
+ + No Additional Properties + + + + + + +
+
+
+

+ +

+
+ +
+
+ + Type: string
+

Friendly name for this flow (debugging and diagnostics)

+
+ + + + + + +
+
+
+
+
+
+
+

+ +

+
+ +
+
+ + Type: enum (of string)
+
+

Must be one of:

+
  • "local"
  • "pubsub"
  • "file"
  • "trace"
  • "mqtt"
+
+ + + + + + +
+
+
+
+
+
+
+

+ +

+
+ +
+
+
+
+
+
+
+
+
+

+ +

+
+ +
+
+ + Type: string
+

Full path to the CA certificate file

+
+ + + + + + +
+
+
+
+
+
+
+

+ +

+
+ +
+
+ + Type: string
+

Full path to the client certificate file

+
+ + + + + + +
+
+
+
+
+
+
+

+ +

+
+ +
+
+ + Type: string
+

Full path to the client private key file

+
+ + + + + +
@@ -9648,9 +10389,206 @@

-
+
+
+ + Type: object
+ No Additional Properties + + + + + + +
+
+
+

+ +

+
+ +
+
+ + Type: string
+ + + + + + + +
+
+
+
+
+
+
+

+ +

+
+ +
+
+ + Type: string
+ + + + + + + +
+
+
+
+
+
+
+
+
+
+
+

+ +

+
+ +
Type: object
+ jwt
Type: object
No Additional Properties @@ -9696,18 +10634,18 @@

-
+
-
+

- +

-
+
Type: string
+ audience
Type: string
@@ -9764,18 +10702,26 @@

-
+
+
+
+
+

+
+
+
+
-
+

- +

-
+
Type: string
- + ca_file
Type: string
+

Full path to the CA certificate file

+
@@ -9832,22 +10765,18 @@

-
-
-
-
-
+
-
+

- +

-
+
Type: object
- No Additional Properties + cert_file
Type: string
+

Full path to the client certificate file

+
-
+
+
+
+
+
-
+

- +

-
+
Type: string
- + key_file
Type: string
+

Full path to the client private key file

+
-
-
-
-
-
-
-
-
@@ -11981,6 +12887,171 @@

+ + + + +
+
+
+

+ +

+
+ +
+
+ + Type: string
+

Full path to the CA certificate file

+
+ + + + + + +
+
+
+
+
+
+
+

+ +

+
+ +
+
+ + Type: string
+

Full path to the client certificate file

+
+ + + + + + +
+
+
+
+
+
+
+

+ +

+
+ +
+
+ + Type: string
+

Full path to the client private key file

+
+ + + + + +
diff --git a/gencode/docs/configuration_pubber.html b/gencode/docs/configuration_pubber.html index b1ede1ae25..78c841a05c 100644 --- a/gencode/docs/configuration_pubber.html +++ b/gencode/docs/configuration_pubber.html @@ -1252,6 +1252,129 @@

+ + + + +
+
+
+

+ +

+
+ +
+
+ + Type: string
+

Full path to the CA certificate file

+
+ + + + + + +
+
+
+
+
+
+
+

+ +

+
+ +
+
+ + Type: string
+

Full path to the client certificate file

+
+ + + + + + +
+
+
+
+
+
+
+

+ +

+
+ +
+
+ + Type: string
+

Full path to the client private key file

+
+ + + + + +
diff --git a/gencode/docs/persistent_device.html b/gencode/docs/persistent_device.html index 15d4d7f913..2020efb17c 100644 --- a/gencode/docs/persistent_device.html +++ b/gencode/docs/persistent_device.html @@ -1252,6 +1252,129 @@

+ + + + +
+
+
+

+ +

+
+ +
+
+ + Type: string
+

Full path to the CA certificate file

+
+ + + + + + +
+
+
+
+
+
+
+

+ +

+
+ +
+
+ + Type: string
+

Full path to the client certificate file

+
+ + + + + + +
+
+
+
+
+
+
+

+ +

+
+ +
+
+ + Type: string
+

Full path to the client private key file

+
+ + + + + +
diff --git a/gencode/java/udmi/schema/EndpointConfiguration.java b/gencode/java/udmi/schema/EndpointConfiguration.java index ebfba2835a..972fc7f29b 100644 --- a/gencode/java/udmi/schema/EndpointConfiguration.java +++ b/gencode/java/udmi/schema/EndpointConfiguration.java @@ -43,6 +43,9 @@ "keyBytes", "algorithm", "auth_provider", + "ca_file", + "cert_file", + "key_file", "generation" }) public class EndpointConfiguration { @@ -173,6 +176,27 @@ public class EndpointConfiguration { public String algorithm; @JsonProperty("auth_provider") public Auth_provider auth_provider; + /** + * Full path to the CA certificate file + * + */ + @JsonProperty("ca_file") + @JsonPropertyDescription("Full path to the CA certificate file") + public String ca_file; + /** + * Full path to the client certificate file + * + */ + @JsonProperty("cert_file") + @JsonPropertyDescription("Full path to the client certificate file") + public String cert_file; + /** + * Full path to the client private key file + * + */ + @JsonProperty("key_file") + @JsonPropertyDescription("Full path to the client private key file") + public String key_file; /** * The timestamp of the endpoint generation * @@ -184,30 +208,33 @@ public class EndpointConfiguration { @Override public int hashCode() { int result = 1; - result = ((result* 31)+((this.generation == null)? 0 :this.generation.hashCode())); result = ((result* 31)+((this.keyBytes == null)? 0 :this.keyBytes.hashCode())); + result = ((result* 31)+((this.ca_file == null)? 0 :this.ca_file.hashCode())); + result = ((result* 31)+((this.key_file == null)? 0 :this.key_file.hashCode())); + result = ((result* 31)+((this.cert_file == null)? 0 :this.cert_file.hashCode())); result = ((result* 31)+((this.side_id == null)? 0 :this.side_id.hashCode())); - result = ((result* 31)+((this.transport == null)? 0 :this.transport.hashCode())); - result = ((result* 31)+((this.publish_delay_sec == null)? 0 :this.publish_delay_sec.hashCode())); result = ((result* 31)+((this.error == null)? 0 :this.error.hashCode())); - result = ((result* 31)+((this.config_sync_sec == null)? 0 :this.config_sync_sec.hashCode())); result = ((result* 31)+((this.deviceId == null)? 0 :this.deviceId.hashCode())); result = ((result* 31)+((this.client_id == null)? 0 :this.client_id.hashCode())); result = ((result* 31)+((this.enabled == null)? 0 :this.enabled.hashCode())); result = ((result* 31)+((this.capacity == null)? 0 :this.capacity.hashCode())); - result = ((result* 31)+((this.send_id == null)? 0 :this.send_id.hashCode())); result = ((result* 31)+((this.protocol == null)? 0 :this.protocol.hashCode())); result = ((result* 31)+((this.hostname == null)? 0 :this.hostname.hashCode())); result = ((result* 31)+((this.payload == null)? 0 :this.payload.hashCode())); - result = ((result* 31)+((this.port == null)? 0 :this.port.hashCode())); result = ((result* 31)+((this.topic_prefix == null)? 0 :this.topic_prefix.hashCode())); + result = ((result* 31)+((this.recv_id == null)? 0 :this.recv_id.hashCode())); + result = ((result* 31)+((this.algorithm == null)? 0 :this.algorithm.hashCode())); + result = ((result* 31)+((this.generation == null)? 0 :this.generation.hashCode())); + result = ((result* 31)+((this.transport == null)? 0 :this.transport.hashCode())); + result = ((result* 31)+((this.publish_delay_sec == null)? 0 :this.publish_delay_sec.hashCode())); + result = ((result* 31)+((this.config_sync_sec == null)? 0 :this.config_sync_sec.hashCode())); + result = ((result* 31)+((this.send_id == null)? 0 :this.send_id.hashCode())); + result = ((result* 31)+((this.port == null)? 0 :this.port.hashCode())); result = ((result* 31)+((this.name == null)? 0 :this.name.hashCode())); result = ((result* 31)+((this.periodic_sec == null)? 0 :this.periodic_sec.hashCode())); result = ((result* 31)+((this.noConfigAck == null)? 0 :this.noConfigAck.hashCode())); - result = ((result* 31)+((this.recv_id == null)? 0 :this.recv_id.hashCode())); result = ((result* 31)+((this.gatewayId == null)? 0 :this.gatewayId.hashCode())); result = ((result* 31)+((this.auth_provider == null)? 0 :this.auth_provider.hashCode())); - result = ((result* 31)+((this.algorithm == null)? 0 :this.algorithm.hashCode())); return result; } @@ -220,7 +247,7 @@ public boolean equals(Object other) { return false; } EndpointConfiguration rhs = ((EndpointConfiguration) other); - return (((((((((((((((((((((((((this.generation == rhs.generation)||((this.generation!= null)&&this.generation.equals(rhs.generation)))&&((this.keyBytes == rhs.keyBytes)||((this.keyBytes!= null)&&this.keyBytes.equals(rhs.keyBytes))))&&((this.side_id == rhs.side_id)||((this.side_id!= null)&&this.side_id.equals(rhs.side_id))))&&((this.transport == rhs.transport)||((this.transport!= null)&&this.transport.equals(rhs.transport))))&&((this.publish_delay_sec == rhs.publish_delay_sec)||((this.publish_delay_sec!= null)&&this.publish_delay_sec.equals(rhs.publish_delay_sec))))&&((this.error == rhs.error)||((this.error!= null)&&this.error.equals(rhs.error))))&&((this.config_sync_sec == rhs.config_sync_sec)||((this.config_sync_sec!= null)&&this.config_sync_sec.equals(rhs.config_sync_sec))))&&((this.deviceId == rhs.deviceId)||((this.deviceId!= null)&&this.deviceId.equals(rhs.deviceId))))&&((this.client_id == rhs.client_id)||((this.client_id!= null)&&this.client_id.equals(rhs.client_id))))&&((this.enabled == rhs.enabled)||((this.enabled!= null)&&this.enabled.equals(rhs.enabled))))&&((this.capacity == rhs.capacity)||((this.capacity!= null)&&this.capacity.equals(rhs.capacity))))&&((this.send_id == rhs.send_id)||((this.send_id!= null)&&this.send_id.equals(rhs.send_id))))&&((this.protocol == rhs.protocol)||((this.protocol!= null)&&this.protocol.equals(rhs.protocol))))&&((this.hostname == rhs.hostname)||((this.hostname!= null)&&this.hostname.equals(rhs.hostname))))&&((this.payload == rhs.payload)||((this.payload!= null)&&this.payload.equals(rhs.payload))))&&((this.port == rhs.port)||((this.port!= null)&&this.port.equals(rhs.port))))&&((this.topic_prefix == rhs.topic_prefix)||((this.topic_prefix!= null)&&this.topic_prefix.equals(rhs.topic_prefix))))&&((this.name == rhs.name)||((this.name!= null)&&this.name.equals(rhs.name))))&&((this.periodic_sec == rhs.periodic_sec)||((this.periodic_sec!= null)&&this.periodic_sec.equals(rhs.periodic_sec))))&&((this.noConfigAck == rhs.noConfigAck)||((this.noConfigAck!= null)&&this.noConfigAck.equals(rhs.noConfigAck))))&&((this.recv_id == rhs.recv_id)||((this.recv_id!= null)&&this.recv_id.equals(rhs.recv_id))))&&((this.gatewayId == rhs.gatewayId)||((this.gatewayId!= null)&&this.gatewayId.equals(rhs.gatewayId))))&&((this.auth_provider == rhs.auth_provider)||((this.auth_provider!= null)&&this.auth_provider.equals(rhs.auth_provider))))&&((this.algorithm == rhs.algorithm)||((this.algorithm!= null)&&this.algorithm.equals(rhs.algorithm)))); + return ((((((((((((((((((((((((((((this.keyBytes == rhs.keyBytes)||((this.keyBytes!= null)&&this.keyBytes.equals(rhs.keyBytes)))&&((this.ca_file == rhs.ca_file)||((this.ca_file!= null)&&this.ca_file.equals(rhs.ca_file))))&&((this.key_file == rhs.key_file)||((this.key_file!= null)&&this.key_file.equals(rhs.key_file))))&&((this.cert_file == rhs.cert_file)||((this.cert_file!= null)&&this.cert_file.equals(rhs.cert_file))))&&((this.side_id == rhs.side_id)||((this.side_id!= null)&&this.side_id.equals(rhs.side_id))))&&((this.error == rhs.error)||((this.error!= null)&&this.error.equals(rhs.error))))&&((this.deviceId == rhs.deviceId)||((this.deviceId!= null)&&this.deviceId.equals(rhs.deviceId))))&&((this.client_id == rhs.client_id)||((this.client_id!= null)&&this.client_id.equals(rhs.client_id))))&&((this.enabled == rhs.enabled)||((this.enabled!= null)&&this.enabled.equals(rhs.enabled))))&&((this.capacity == rhs.capacity)||((this.capacity!= null)&&this.capacity.equals(rhs.capacity))))&&((this.protocol == rhs.protocol)||((this.protocol!= null)&&this.protocol.equals(rhs.protocol))))&&((this.hostname == rhs.hostname)||((this.hostname!= null)&&this.hostname.equals(rhs.hostname))))&&((this.payload == rhs.payload)||((this.payload!= null)&&this.payload.equals(rhs.payload))))&&((this.topic_prefix == rhs.topic_prefix)||((this.topic_prefix!= null)&&this.topic_prefix.equals(rhs.topic_prefix))))&&((this.recv_id == rhs.recv_id)||((this.recv_id!= null)&&this.recv_id.equals(rhs.recv_id))))&&((this.algorithm == rhs.algorithm)||((this.algorithm!= null)&&this.algorithm.equals(rhs.algorithm))))&&((this.generation == rhs.generation)||((this.generation!= null)&&this.generation.equals(rhs.generation))))&&((this.transport == rhs.transport)||((this.transport!= null)&&this.transport.equals(rhs.transport))))&&((this.publish_delay_sec == rhs.publish_delay_sec)||((this.publish_delay_sec!= null)&&this.publish_delay_sec.equals(rhs.publish_delay_sec))))&&((this.config_sync_sec == rhs.config_sync_sec)||((this.config_sync_sec!= null)&&this.config_sync_sec.equals(rhs.config_sync_sec))))&&((this.send_id == rhs.send_id)||((this.send_id!= null)&&this.send_id.equals(rhs.send_id))))&&((this.port == rhs.port)||((this.port!= null)&&this.port.equals(rhs.port))))&&((this.name == rhs.name)||((this.name!= null)&&this.name.equals(rhs.name))))&&((this.periodic_sec == rhs.periodic_sec)||((this.periodic_sec!= null)&&this.periodic_sec.equals(rhs.periodic_sec))))&&((this.noConfigAck == rhs.noConfigAck)||((this.noConfigAck!= null)&&this.noConfigAck.equals(rhs.noConfigAck))))&&((this.gatewayId == rhs.gatewayId)||((this.gatewayId!= null)&&this.gatewayId.equals(rhs.gatewayId))))&&((this.auth_provider == rhs.auth_provider)||((this.auth_provider!= null)&&this.auth_provider.equals(rhs.auth_provider)))); } public enum Protocol { diff --git a/gencode/presentation/presentation.json b/gencode/presentation/presentation.json index 3fbe159045..f960d84fa6 100644 --- a/gencode/presentation/presentation.json +++ b/gencode/presentation/presentation.json @@ -440,6 +440,27 @@ "section": "cloud_iot_config", "type": "string" }, + "reflector_endpoint.ca_file": { + "display": "show", + "style": "bold", + "section": "cloud_iot_config", + "type": "string", + "description": "Full path to the CA certificate file" + }, + "reflector_endpoint.cert_file": { + "display": "show", + "style": "bold", + "section": "cloud_iot_config", + "type": "string", + "description": "Full path to the client certificate file" + }, + "reflector_endpoint.key_file": { + "display": "show", + "style": "bold", + "section": "cloud_iot_config", + "type": "string", + "description": "Full path to the client private key file" + }, "reflector_endpoint.generation": { "display": "show", "style": "bold", @@ -592,6 +613,27 @@ "section": "cloud_iot_config", "type": "string" }, + "device_endpoint.ca_file": { + "display": "show", + "style": "bold", + "section": "cloud_iot_config", + "type": "string", + "description": "Full path to the CA certificate file" + }, + "device_endpoint.cert_file": { + "display": "show", + "style": "bold", + "section": "cloud_iot_config", + "type": "string", + "description": "Full path to the client certificate file" + }, + "device_endpoint.key_file": { + "display": "show", + "style": "bold", + "section": "cloud_iot_config", + "type": "string", + "description": "Full path to the client private key file" + }, "device_endpoint.generation": { "display": "show", "style": "bold", diff --git a/gencode/python/udmi/schema/configuration_endpoint.py b/gencode/python/udmi/schema/configuration_endpoint.py index 9c1d5d3d77..954286596b 100644 --- a/gencode/python/udmi/schema/configuration_endpoint.py +++ b/gencode/python/udmi/schema/configuration_endpoint.py @@ -69,4 +69,7 @@ class EndpointConfiguration(DataModel): keyBytes: Optional[Any] = None algorithm: Optional[str] = None auth_provider: Optional[AuthProvider] = None + ca_file: Optional[str] = None + cert_file: Optional[str] = None + key_file: Optional[str] = None generation: Optional[str] = None diff --git a/misc/bambi/presentation.gld.json b/misc/bambi/presentation.gld.json index 3fbe159045..f960d84fa6 100644 --- a/misc/bambi/presentation.gld.json +++ b/misc/bambi/presentation.gld.json @@ -440,6 +440,27 @@ "section": "cloud_iot_config", "type": "string" }, + "reflector_endpoint.ca_file": { + "display": "show", + "style": "bold", + "section": "cloud_iot_config", + "type": "string", + "description": "Full path to the CA certificate file" + }, + "reflector_endpoint.cert_file": { + "display": "show", + "style": "bold", + "section": "cloud_iot_config", + "type": "string", + "description": "Full path to the client certificate file" + }, + "reflector_endpoint.key_file": { + "display": "show", + "style": "bold", + "section": "cloud_iot_config", + "type": "string", + "description": "Full path to the client private key file" + }, "reflector_endpoint.generation": { "display": "show", "style": "bold", @@ -592,6 +613,27 @@ "section": "cloud_iot_config", "type": "string" }, + "device_endpoint.ca_file": { + "display": "show", + "style": "bold", + "section": "cloud_iot_config", + "type": "string", + "description": "Full path to the CA certificate file" + }, + "device_endpoint.cert_file": { + "display": "show", + "style": "bold", + "section": "cloud_iot_config", + "type": "string", + "description": "Full path to the client certificate file" + }, + "device_endpoint.key_file": { + "display": "show", + "style": "bold", + "section": "cloud_iot_config", + "type": "string", + "description": "Full path to the client private key file" + }, "device_endpoint.generation": { "display": "show", "style": "bold", diff --git a/schema/configuration_endpoint.json b/schema/configuration_endpoint.json index fe8dcb8700..9397e59d14 100644 --- a/schema/configuration_endpoint.json +++ b/schema/configuration_endpoint.json @@ -224,6 +224,30 @@ } } }, + "ca_file": { + "description": "Full path to the CA certificate file", + "type": "string", + "$presentation": { + "display": "show", + "style": "bold" + } + }, + "cert_file": { + "description": "Full path to the client certificate file", + "type": "string", + "$presentation": { + "display": "show", + "style": "bold" + } + }, + "key_file": { + "description": "Full path to the client private key file", + "type": "string", + "$presentation": { + "display": "show", + "style": "bold" + } + }, "generation": { "description": "The timestamp of the endpoint generation", "type": "string", diff --git a/udmis/etc/local_pod.json b/udmis/etc/local_pod.json index 2ec9a19c1e..89ab2c5ff8 100644 --- a/udmis/etc/local_pod.json +++ b/udmis/etc/local_pod.json @@ -51,6 +51,9 @@ "transport": "ssl", "hostname": "localhost", "port": 8883, + "ca_file": "/etc/mosquitto/certs/ca.crt", + "cert_file": "/etc/mosquitto/certs/rsa_private.crt", + "key_file": "/etc/mosquitto/certs/rsa_private.pem", "auth_provider": { "basic": { "username": "rocket", diff --git a/udmis/src/main/java/com/google/bos/udmi/service/support/MosquittoBroker.java b/udmis/src/main/java/com/google/bos/udmi/service/support/MosquittoBroker.java index d287244c37..2d4da51ef5 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/support/MosquittoBroker.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/support/MosquittoBroker.java @@ -71,6 +71,19 @@ private List buildCommandPrefix() { } } + if (endpointConfig.ca_file != null) { + cmd.add("--cafile"); + cmd.add(endpointConfig.ca_file); + } + if (endpointConfig.cert_file != null) { + cmd.add("--cert"); + cmd.add(endpointConfig.cert_file); + } + if (endpointConfig.key_file != null) { + cmd.add("--key"); + cmd.add(endpointConfig.key_file); + } + cmd.add("dynsec"); return cmd; } From 40c44fcdf383d47c4b0bae552c504e3bf3af89ee Mon Sep 17 00:00:00 2001 From: elsaidi Date: Thu, 14 May 2026 12:20:40 +0000 Subject: [PATCH 04/22] change username for iot_access --- udmis/etc/local_pod.json | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/udmis/etc/local_pod.json b/udmis/etc/local_pod.json index 89ab2c5ff8..18df1d57fa 100644 --- a/udmis/etc/local_pod.json +++ b/udmis/etc/local_pod.json @@ -56,8 +56,8 @@ "key_file": "/etc/mosquitto/certs/rsa_private.pem", "auth_provider": { "basic": { - "username": "rocket", - "password": "monkey" + "username": "scrumptious", + "password": "aardvark" } } } From 2a6922a8d2cc15f3d4f2109a8ae66c79c58a9fb9 Mon Sep 17 00:00:00 2001 From: elsaidi Date: Thu, 14 May 2026 13:38:08 +0000 Subject: [PATCH 05/22] fix: improve error logging and context for ImplicitIotAccessProvider operations --- .../access/ImplicitIotAccessProvider.java | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/udmis/src/main/java/com/google/bos/udmi/service/access/ImplicitIotAccessProvider.java b/udmis/src/main/java/com/google/bos/udmi/service/access/ImplicitIotAccessProvider.java index eab6f9102b..162a1be3ec 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/access/ImplicitIotAccessProvider.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/access/ImplicitIotAccessProvider.java @@ -371,7 +371,7 @@ private void connectMqttClient() { mqttPipe = new SimpleMqttPipe(endpointConfig); info("Initialized SimpleMqttPipe"); } catch (Exception e) { - error("Failed to initialize SimpleMqttPipe: " + friendlyStackTrace(e)); + error("Failed to initialize SimpleMqttPipe connecting to broker %s:%s: %s", brokerHost, brokerPort, friendlyStackTrace(e)); } } @@ -498,6 +498,7 @@ public CloudModel modelDevice(String registryId, String deviceId, CloudModel clo } return getReply(registryId, deviceId, cloudModel, deleteNumId); } catch (Exception e) { + error("Error during modelDevice %s for %s/%s: %s", operation, registryId, deviceId, friendlyStackTrace(e)); throw new RuntimeException(format("While %sing %s/%s", operation, registryId, deviceId), e); } } @@ -527,11 +528,16 @@ public void saveState(String registryId, String deviceId, String stateBlob) { @Override public void sendCommandBase(Envelope baseEnvelope, SubFolder folder, String message) { - Envelope envelope = deepCopy(baseEnvelope); - envelope.subFolder = folder; - envelope.subType = SubType.COMMANDS; - envelope.source = IotProvider.IMPLICIT.value(); - reflect.getDispatcher().withEnvelope(envelope).publish(asMap(message)); + try { + Envelope envelope = deepCopy(baseEnvelope); + envelope.subFolder = folder; + envelope.subType = SubType.COMMANDS; + envelope.source = IotProvider.IMPLICIT.value(); + reflect.getDispatcher().withEnvelope(envelope).publish(asMap(message)); + } catch (Exception e) { + error("Failed to send command for %s/%s: %s", baseEnvelope.deviceRegistryId, baseEnvelope.deviceId, friendlyStackTrace(e)); + throw new RuntimeException("While sending command for " + baseEnvelope.deviceRegistryId + "/" + baseEnvelope.deviceId, e); + } } @Override From 7b8c4ea225fc232dad1077f11aca38c70369745d Mon Sep 17 00:00:00 2001 From: elsaidi Date: Thu, 14 May 2026 14:39:32 +0000 Subject: [PATCH 06/22] fix: grant service role to Mosquitto admin user to allow direct MQTT config publishing --- bin/start_mosquitto | 1 + 1 file changed, 1 insertion(+) diff --git a/bin/start_mosquitto b/bin/start_mosquitto index d7c60b9677..e611ad5dad 100755 --- a/bin/start_mosquitto +++ b/bin/start_mosquitto @@ -73,6 +73,7 @@ fi $MOSQUITTO_CTRL createRole service $MOSQUITTO_CTRL addRoleACL service subscribePattern '/r/+/d/+/#' allow $MOSQUITTO_CTRL addRoleACL service publishClientSend '/r/+/d/+/#' allow +$MOSQUITTO_CTRL addClientRole $AUTH_USER service clients=$($MOSQUITTO_CTRL listClients) if [[ $clients =~ ${AUTH_USER} ]]; then From a012228ce0e9ef75b9423cc38a861fbf1e1a8fea Mon Sep 17 00:00:00 2001 From: elsaidi Date: Thu, 14 May 2026 15:29:49 +0000 Subject: [PATCH 07/22] checkstyle --- .../service/access/ImplicitIotAccessProvider.java | 12 ++++++++---- .../bos/udmi/service/support/MosquittoBroker.java | 10 +++------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/udmis/src/main/java/com/google/bos/udmi/service/access/ImplicitIotAccessProvider.java b/udmis/src/main/java/com/google/bos/udmi/service/access/ImplicitIotAccessProvider.java index 162a1be3ec..4b40887f65 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/access/ImplicitIotAccessProvider.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/access/ImplicitIotAccessProvider.java @@ -371,7 +371,8 @@ private void connectMqttClient() { mqttPipe = new SimpleMqttPipe(endpointConfig); info("Initialized SimpleMqttPipe"); } catch (Exception e) { - error("Failed to initialize SimpleMqttPipe connecting to broker %s:%s: %s", brokerHost, brokerPort, friendlyStackTrace(e)); + error("Failed to initialize SimpleMqttPipe connecting to broker %s:%s: %s", + brokerHost, brokerPort, friendlyStackTrace(e)); } } @@ -498,7 +499,8 @@ public CloudModel modelDevice(String registryId, String deviceId, CloudModel clo } return getReply(registryId, deviceId, cloudModel, deleteNumId); } catch (Exception e) { - error("Error during modelDevice %s for %s/%s: %s", operation, registryId, deviceId, friendlyStackTrace(e)); + error("Error during modelDevice %s for %s/%s: %s", + operation, registryId, deviceId, friendlyStackTrace(e)); throw new RuntimeException(format("While %sing %s/%s", operation, registryId, deviceId), e); } } @@ -535,8 +537,10 @@ public void sendCommandBase(Envelope baseEnvelope, SubFolder folder, String mess envelope.source = IotProvider.IMPLICIT.value(); reflect.getDispatcher().withEnvelope(envelope).publish(asMap(message)); } catch (Exception e) { - error("Failed to send command for %s/%s: %s", baseEnvelope.deviceRegistryId, baseEnvelope.deviceId, friendlyStackTrace(e)); - throw new RuntimeException("While sending command for " + baseEnvelope.deviceRegistryId + "/" + baseEnvelope.deviceId, e); + error("Failed to send command for %s/%s: %s", + baseEnvelope.deviceRegistryId, baseEnvelope.deviceId, friendlyStackTrace(e)); + throw new RuntimeException("While sending command for " + + baseEnvelope.deviceRegistryId + "/" + baseEnvelope.deviceId, e); } } diff --git a/udmis/src/main/java/com/google/bos/udmi/service/support/MosquittoBroker.java b/udmis/src/main/java/com/google/bos/udmi/service/support/MosquittoBroker.java index 2d4da51ef5..3ec4e29605 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/support/MosquittoBroker.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/support/MosquittoBroker.java @@ -4,8 +4,6 @@ import static com.google.udmi.util.GeneralUtils.catchToElse; import static com.google.udmi.util.GeneralUtils.catchToNull; import static com.google.udmi.util.GeneralUtils.friendlyStackTrace; -import static com.google.udmi.util.GeneralUtils.ifNotNullThen; -import static java.lang.String.format; import static java.util.Optional.ofNullable; import com.google.bos.udmi.service.pod.ContainerBase; @@ -22,14 +20,12 @@ import udmi.schema.EndpointConfiguration; /** - * NOTE: The topic structure used in ACLs in this file is hardcoded to use the deviceId + * Provider that links directly to a mosquitto broker. + * + *

NOTE: The topic structure used in ACLs in this file is hardcoded to use the deviceId * directly (e.g., deviceId/config, deviceId/commands, etc.) instead of using properties * from the endpoint configuration. */ - -/** - * Provider that links directly to a mosquitto broker. - */ public class MosquittoBroker extends ContainerBase implements ConnectionBroker { From 63a50707b748126397e7405b44de8a2e5f82c87b Mon Sep 17 00:00:00 2001 From: elsaidi Date: Thu, 14 May 2026 15:38:16 +0000 Subject: [PATCH 08/22] quality --- .../access/ImplicitIotAccessProvider.java | 1 + .../service/support/ConnectionBroker.java | 2 ++ .../udmi/service/support/MosquittoBroker.java | 32 ++++++++++++++++--- 3 files changed, 31 insertions(+), 4 deletions(-) diff --git a/udmis/src/main/java/com/google/bos/udmi/service/access/ImplicitIotAccessProvider.java b/udmis/src/main/java/com/google/bos/udmi/service/access/ImplicitIotAccessProvider.java index 4b40887f65..4682551a25 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/access/ImplicitIotAccessProvider.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/access/ImplicitIotAccessProvider.java @@ -554,6 +554,7 @@ public void shutdown() { } }); connLogger.cancel(true); + broker.shutdown(); super.shutdown(); } diff --git a/udmis/src/main/java/com/google/bos/udmi/service/support/ConnectionBroker.java b/udmis/src/main/java/com/google/bos/udmi/service/support/ConnectionBroker.java index fb75ea41e4..ffacb8f4a9 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/support/ConnectionBroker.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/support/ConnectionBroker.java @@ -17,6 +17,8 @@ public interface ConnectionBroker { void unbindGateway(String gatewayId, String deviceId); + void shutdown(); + /** * Simple event for connection broker happenings. */ diff --git a/udmis/src/main/java/com/google/bos/udmi/service/support/MosquittoBroker.java b/udmis/src/main/java/com/google/bos/udmi/service/support/MosquittoBroker.java index 3ec4e29605..72edb71099 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/support/MosquittoBroker.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/support/MosquittoBroker.java @@ -38,6 +38,7 @@ public class MosquittoBroker extends ContainerBase implements ConnectionBroker { private static final Pattern PUBACK_MATCHER = Pattern.compile("\\((m|Mid: )([0-9]+), \\S+\\)"); private final ContainerBase container; private final EndpointConfiguration endpointConfig; + private Process tailProcess; public MosquittoBroker(ContainerBase container, EndpointConfiguration endpointConfig) { this.container = container; @@ -110,10 +111,21 @@ private void executeCommand(List cmd) { try { info("Executing command %s", String.join(" ", cmd)); ProcessBuilder pb = new ProcessBuilder(cmd); + pb.redirectErrorStream(true); // Merge stderr into stdout Process exec = pb.start(); - exec.waitFor(EXEC_TIMEOUT_SEC, TimeUnit.SECONDS); - exec.errorReader().lines().forEach(container::info); - exec.inputReader().lines().forEach(container::info); + + // Read output asynchronously to prevent buffer locks + CompletableFuture outputHandler = CompletableFuture.runAsync(() -> + exec.inputReader().lines().forEach(container::info) + ); + + // Enforce timeout + if (!exec.waitFor(EXEC_TIMEOUT_SEC, TimeUnit.SECONDS)) { + exec.destroyForcibly(); + throw new RuntimeException("Command timed out: " + String.join(" ", cmd)); + } + + outputHandler.join(); int exitValue = exec.exitValue(); checkState(exitValue == 0, "exit return code " + exitValue); } catch (Exception e) { @@ -217,7 +229,11 @@ private void mosquctlLog(String clientPrefix, Consumer eventConsume try { info("Starting log consumer for prefix %s", clientPrefix); ProcessBuilder pb = new ProcessBuilder("tail", "-f", "/var/log/mosquitto/mosquitto.log"); - Process exec = pb.start(); + if (tailProcess != null) { + tailProcess.destroy(); + } + tailProcess = pb.start(); + Process exec = tailProcess; consumeStream(exec.errorReader(), line -> warn("log error: " + line)); consumeStream(exec.inputReader(), line -> { BrokerEvent event = parseLogLine(line); @@ -330,4 +346,12 @@ private void removeRoleAcl(String roleName, String type, String pattern) { warn("Ignore error removing role ACL: " + e.getMessage()); } } + + @Override + public void shutdown() { + if (tailProcess != null) { + tailProcess.destroy(); + } + super.shutdown(); + } } From 7ee27a236f753cca3e6bd8986f35a94483dd005b Mon Sep 17 00:00:00 2001 From: elsaidi Date: Thu, 14 May 2026 15:41:19 +0000 Subject: [PATCH 09/22] gate logging because an alterntaive solution is needed for a remote broker --- .../access/ImplicitIotAccessProvider.java | 5 +++- .../udmi/service/support/MosquittoBroker.java | 28 ++++++++++++++++++- 2 files changed, 31 insertions(+), 2 deletions(-) diff --git a/udmis/src/main/java/com/google/bos/udmi/service/access/ImplicitIotAccessProvider.java b/udmis/src/main/java/com/google/bos/udmi/service/access/ImplicitIotAccessProvider.java index 4682551a25..4fb26a8e87 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/access/ImplicitIotAccessProvider.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/access/ImplicitIotAccessProvider.java @@ -76,11 +76,13 @@ *

  • use_password: Sets the password for all devices to the specified value. * This is used when authentication is handled by an external proxy, and Mosquitto * still needs to enforce ACLs based on username.
  • + *
  • disable_logging: If set to true, disables tailing the mosquitto log file.
  • * */ public class ImplicitIotAccessProvider extends IotAccessBase { private static final String CONFIG_VER_KEY = "config_ver"; + private static final String DISABLE_LOGGING_KEY = "disable_logging"; private static final String USE_PASSWORD_KEY = "use_password"; private static final String BROKER_USER_KEY = "broker_user"; private static final String BROKER_PASS_KEY = "broker_pass"; @@ -159,7 +161,8 @@ public ImplicitIotAccessProvider(IotAccess iotAccess) { } } - broker = new MosquittoBroker(this, endpointConfig); + boolean disableLogging = TRUE_OPTION.equals(options.get(DISABLE_LOGGING_KEY)); + broker = new MosquittoBroker(this, endpointConfig, disableLogging); connLogger = broker.addEventListener(CLIENT_PREFIX, this::brokerHandler); } diff --git a/udmis/src/main/java/com/google/bos/udmi/service/support/MosquittoBroker.java b/udmis/src/main/java/com/google/bos/udmi/service/support/MosquittoBroker.java index 72edb71099..7dbfea345e 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/support/MosquittoBroker.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/support/MosquittoBroker.java @@ -8,6 +8,7 @@ import com.google.bos.udmi.service.pod.ContainerBase; import java.io.BufferedReader; +import java.io.File; import java.util.ArrayList; import java.util.Date; import java.util.List; @@ -31,6 +32,7 @@ public class MosquittoBroker extends ContainerBase implements ConnectionBroker { private static final long EXEC_TIMEOUT_SEC = 10; private static final String REVOKE_PASSWORD = "--"; + private static final String MOSQUITTO_LOG_PATH = "/var/log/mosquitto/mosquitto.log"; private static final Pattern LOG_MATCHER = Pattern.compile("([0-9]+): (\\S+) (\\S+) (\\S+) (\\S+) (.*)"); private static final Pattern PUBLISH_MATCHER = @@ -38,11 +40,31 @@ public class MosquittoBroker extends ContainerBase implements ConnectionBroker { private static final Pattern PUBACK_MATCHER = Pattern.compile("\\((m|Mid: )([0-9]+), \\S+\\)"); private final ContainerBase container; private final EndpointConfiguration endpointConfig; + private final boolean disableLogging; private Process tailProcess; + /** + * Create a new broker connection provider. + */ public MosquittoBroker(ContainerBase container, EndpointConfiguration endpointConfig) { + this(container, endpointConfig, false); + } + + /** + * Create a new broker connection provider with logging controls. + */ + public MosquittoBroker(ContainerBase container, EndpointConfiguration endpointConfig, + boolean disableLogging) { this.container = container; this.endpointConfig = endpointConfig; + this.disableLogging = disableLogging; + if (!disableLogging) { + File logFile = new File(MOSQUITTO_LOG_PATH); + if (!logFile.canRead()) { + throw new RuntimeException( + "Mosquitto log file is not readable: " + logFile.getAbsolutePath()); + } + } } private List buildCommandPrefix() { @@ -225,10 +247,14 @@ private void addClientRole(String clientUser, String roleName) { } private void mosquctlLog(String clientPrefix, Consumer eventConsumer) { + if (disableLogging) { + info("Mosquitto logging disabled, skipping log consumer for prefix %s", clientPrefix); + return; + } synchronized (MosquittoBroker.class) { try { info("Starting log consumer for prefix %s", clientPrefix); - ProcessBuilder pb = new ProcessBuilder("tail", "-f", "/var/log/mosquitto/mosquitto.log"); + ProcessBuilder pb = new ProcessBuilder("tail", "-f", MOSQUITTO_LOG_PATH); if (tailProcess != null) { tailProcess.destroy(); } From 870efe246c5113f490b448abac017d0b6991f401 Mon Sep 17 00:00:00 2001 From: elsaidi Date: Fri, 15 May 2026 10:52:18 +0000 Subject: [PATCH 10/22] fix bad transport and revert bambi hanges --- misc/bambi/presentation.gld.json | 42 -------------- .../udmi/service/support/MosquittoBroker.java | 57 +++++++++++++------ 2 files changed, 41 insertions(+), 58 deletions(-) diff --git a/misc/bambi/presentation.gld.json b/misc/bambi/presentation.gld.json index f960d84fa6..3fbe159045 100644 --- a/misc/bambi/presentation.gld.json +++ b/misc/bambi/presentation.gld.json @@ -440,27 +440,6 @@ "section": "cloud_iot_config", "type": "string" }, - "reflector_endpoint.ca_file": { - "display": "show", - "style": "bold", - "section": "cloud_iot_config", - "type": "string", - "description": "Full path to the CA certificate file" - }, - "reflector_endpoint.cert_file": { - "display": "show", - "style": "bold", - "section": "cloud_iot_config", - "type": "string", - "description": "Full path to the client certificate file" - }, - "reflector_endpoint.key_file": { - "display": "show", - "style": "bold", - "section": "cloud_iot_config", - "type": "string", - "description": "Full path to the client private key file" - }, "reflector_endpoint.generation": { "display": "show", "style": "bold", @@ -613,27 +592,6 @@ "section": "cloud_iot_config", "type": "string" }, - "device_endpoint.ca_file": { - "display": "show", - "style": "bold", - "section": "cloud_iot_config", - "type": "string", - "description": "Full path to the CA certificate file" - }, - "device_endpoint.cert_file": { - "display": "show", - "style": "bold", - "section": "cloud_iot_config", - "type": "string", - "description": "Full path to the client certificate file" - }, - "device_endpoint.key_file": { - "display": "show", - "style": "bold", - "section": "cloud_iot_config", - "type": "string", - "description": "Full path to the client private key file" - }, "device_endpoint.generation": { "display": "show", "style": "bold", diff --git a/udmis/src/main/java/com/google/bos/udmi/service/support/MosquittoBroker.java b/udmis/src/main/java/com/google/bos/udmi/service/support/MosquittoBroker.java index 7dbfea345e..05e900338d 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/support/MosquittoBroker.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/support/MosquittoBroker.java @@ -19,6 +19,7 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import udmi.schema.EndpointConfiguration; +import udmi.schema.EndpointConfiguration.Transport; /** * Provider that links directly to a mosquitto broker. @@ -33,6 +34,9 @@ public class MosquittoBroker extends ContainerBase implements ConnectionBroker { private static final long EXEC_TIMEOUT_SEC = 10; private static final String REVOKE_PASSWORD = "--"; private static final String MOSQUITTO_LOG_PATH = "/var/log/mosquitto/mosquitto.log"; + private static final String DEFAULT_CA_FILE = "/etc/mosquitto/certs/ca.crt"; + private static final String DEFAULT_CERT_FILE = "/etc/mosquitto/certs/rsa_private.crt"; + private static final String DEFAULT_KEY_FILE = "/etc/mosquitto/certs/rsa_private.pem"; private static final Pattern LOG_MATCHER = Pattern.compile("([0-9]+): (\\S+) (\\S+) (\\S+) (\\S+) (.*)"); private static final Pattern PUBLISH_MATCHER = @@ -90,17 +94,18 @@ private List buildCommandPrefix() { } } - if (endpointConfig.ca_file != null) { + boolean useSsl = endpointConfig.transport == Transport.SSL + || (endpointConfig.port != null && endpointConfig.port == 8883) + || endpointConfig.ca_file != null; + + if (useSsl) { cmd.add("--cafile"); - cmd.add(endpointConfig.ca_file); - } - if (endpointConfig.cert_file != null) { + cmd.add(ofNullable(endpointConfig.ca_file).orElse(DEFAULT_CA_FILE)); cmd.add("--cert"); - cmd.add(endpointConfig.cert_file); - } - if (endpointConfig.key_file != null) { + cmd.add(ofNullable(endpointConfig.cert_file).orElse(DEFAULT_CERT_FILE)); cmd.add("--key"); - cmd.add(endpointConfig.key_file); + cmd.add(ofNullable(endpointConfig.key_file).orElse(DEFAULT_KEY_FILE)); + cmd.add("--insecure"); } cmd.add("dynsec"); @@ -160,11 +165,17 @@ private void mosquctlClient(String clientId, String clientPass) { String clientUser = clientId; String roleName = "role_" + clientId.replace("/", "_"); - deleteClient(clientUser); - deleteRole(roleName); - - if (!"--".equals(clientPass)) { - createClient(clientUser, clientPass, clientId); + if ("--".equals(clientPass)) { + deleteClient(clientUser); + deleteRole(roleName); + info("Device %s deleted correctly.", clientId); + } else { + try { + createClient(clientUser, clientPass, clientId); + } catch (Exception e) { + warn("Client likely exists, updating password for %s: %s", clientUser, e.getMessage()); + setClientPassword(clientUser, clientPass); + } createRole(roleName); addClientRole(clientUser, roleName); @@ -175,8 +186,18 @@ private void mosquctlClient(String clientId, String clientPass) { addRoleAcl(roleName, "publishClientSend", clientId + "/state", "allow"); info("Device %s registered correctly.", clientId); - } else { - info("Device %s deleted correctly.", clientId); + } + } + + private void setClientPassword(String clientUser, String clientPass) { + List cmd = new ArrayList<>(buildCommandPrefix()); + cmd.add("setClientPassword"); + cmd.add(clientUser); + cmd.add(clientPass); + try { + executeCommand(cmd); + } catch (Exception e) { + warn("Ignore error setting client password: " + e.getMessage()); } } @@ -187,7 +208,11 @@ private void addRoleAcl(String roleName, String type, String pattern, String all cmd.add(type); cmd.add(pattern); cmd.add(allow); - executeCommand(cmd); + try { + executeCommand(cmd); + } catch (Exception e) { + warn("Ignore error adding role ACL: " + e.getMessage()); + } } private void deleteClient(String clientUser) { From 8c584231b3838b08b0bf813b949ecaf5696016e4 Mon Sep 17 00:00:00 2001 From: elsaidi Date: Fri, 15 May 2026 14:20:18 +0000 Subject: [PATCH 11/22] refactor: wrap source processing logic in a null check for PubSub message handling --- .../com/google/udmi/util/PubSubReflector.java | 24 +++++++++---------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/validator/src/main/java/com/google/udmi/util/PubSubReflector.java b/validator/src/main/java/com/google/udmi/util/PubSubReflector.java index 72993fe934..b6fda173e1 100644 --- a/validator/src/main/java/com/google/udmi/util/PubSubReflector.java +++ b/validator/src/main/java/com/google/udmi/util/PubSubReflector.java @@ -351,19 +351,17 @@ public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) { String topic = format("/devices/%s/%s%s", attributes.get(DEVICE_ID_KEY), attributes.get(CATEGORY_PROPERTY_KEY), suffix); String messageSource = attributes.remove(SOURCE_KEY); - if (messageSource == null) { - return; - } - - Object dstSource = messageBundle.message.remove(SOURCE_KEY); - String[] source = messageSource.split(SOURCE_SEPARATOR_REGEX, 3); - if (source.length == 1) { - attributes.put(SOURCE_KEY, source[0]); - } else if (source.length == 2 && source[0].isEmpty()) { - topic += messageSource; - ifNotNullThen(dstSource, () -> messageBundle.message.put(SOURCE_KEY, source[1])); - } else { - System.err.println("Discarding message with malformed source: " + messageSource); + if (messageSource != null) { + Object dstSource = messageBundle.message.remove(SOURCE_KEY); + String[] source = messageSource.split(SOURCE_SEPARATOR_REGEX, 3); + if (source.length == 1) { + attributes.put(SOURCE_KEY, source[0]); + } else if (source.length == 2 && source[0].isEmpty()) { + topic += messageSource; + ifNotNullThen(dstSource, () -> messageBundle.message.put(SOURCE_KEY, source[1])); + } else { + System.err.println("Discarding message with malformed source: " + messageSource); + } } ofNullable(messageHandlers.get(deviceRegistryId)).orElse(defaultMessageHandler) .accept(topic, stringify(messageBundle.message)); From 07a10a73001d840b0d6ffc553e72c5784a3535ee Mon Sep 17 00:00:00 2001 From: elsaidi Date: Fri, 15 May 2026 15:21:44 +0000 Subject: [PATCH 12/22] unify MQTT topic parsing logic to support both implicit and legacy formats in SimpleMqttPipe and fix bad message reflection --- .../service/bridge/MqttToPubSubBridge.java | 35 ++------ .../messaging/impl/SimpleMqttPipe.java | 86 +++++++++++++------ .../bridge/MqttToPubSubBridgeTest.java | 38 +++++++- .../messaging/impl/SimpleMqttPipeTest.java | 12 +++ 4 files changed, 116 insertions(+), 55 deletions(-) diff --git a/udmis/src/main/java/com/google/bos/udmi/service/bridge/MqttToPubSubBridge.java b/udmis/src/main/java/com/google/bos/udmi/service/bridge/MqttToPubSubBridge.java index 7f0a82539b..f25bb4bc6a 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/bridge/MqttToPubSubBridge.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/bridge/MqttToPubSubBridge.java @@ -3,8 +3,8 @@ import static com.google.common.util.concurrent.MoreExecutors.directExecutor; import com.google.api.core.ApiFuture; +import com.google.bos.udmi.service.messaging.impl.SimpleMqttPipe; import com.google.cloud.pubsub.v1.Publisher; -import com.google.common.base.Splitter; import com.google.protobuf.ByteString; import com.google.pubsub.v1.ProjectTopicName; import com.google.pubsub.v1.PubsubMessage; @@ -25,8 +25,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; -import java.util.regex.Matcher; -import java.util.regex.Pattern; import javax.net.ssl.KeyManagerFactory; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLSocketFactory; @@ -68,7 +66,6 @@ */ public final class MqttToPubSubBridge { - private static final Pattern TOPIC_PATTERN = Pattern.compile("/r/([^/]+)/d/([^/]+)/?(.*)"); private static final Logger logger = LoggerFactory.getLogger(MqttToPubSubBridge.class); /** @@ -227,30 +224,16 @@ public void messageArrived(String topic, MqttMessage message) { logger.info( "MQTT Message Received - Topic: {}, Payload Length: {}", topic, payload.length); - Matcher matcher = TOPIC_PATTERN.matcher(topic); - String registryId = "unknown"; - String deviceId = "unknown"; - String topicSuffix = ""; - if (matcher.matches()) { - registryId = matcher.group(1); - deviceId = matcher.group(2); - topicSuffix = matcher.group(3); - } else { - logger.warn("Could not parse registry/device from topic: {}", topic); + Map attributes; + try { + attributes = new HashMap<>(SimpleMqttPipe.parseEnvelopeTopic(topic)); + } catch (Exception e) { + logger.warn("Could not parse envelope from topic: {}", topic, e); + attributes = new HashMap<>(); } - - // Prepare Pub/Sub message - Map attributes = new HashMap<>(); attributes.put("mqttTopic", topic); - attributes.put("deviceId", deviceId); - attributes.put("deviceRegistryId", registryId); - - if (topicSuffix != null && topicSuffix.startsWith("events/")) { - List parts = Splitter.on('/').splitToList(topicSuffix); - if (parts.size() >= 2) { - attributes.put("subFolder", parts.get(1)); - } - } + attributes.putIfAbsent("deviceId", "unknown"); + attributes.putIfAbsent("deviceRegistryId", "unknown"); ByteString data = ByteString.copyFrom(payload); PubsubMessage.Builder pubsubMessageBuilder = diff --git a/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/SimpleMqttPipe.java b/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/SimpleMqttPipe.java index 5aa7c59b9e..b85a4c6073 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/SimpleMqttPipe.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/SimpleMqttPipe.java @@ -56,6 +56,8 @@ public class SimpleMqttPipe extends MessageBase { private static final String BROKER_URL_FORMAT = "%s://%s:%s"; private static final long RECONNECT_SEC = 10; private static final int DEFAULT_PORT = 8883; + private static final String LEGACY_TOPIC_PREFIX = "/devices/"; + private static final String IMPLICIT_TOPIC_PREFIX = "/r/"; private static final Envelope EXCEPTION_ENVELOPE = makeExceptionEnvelope(); private static final String SUB_BASE_FORMAT = "/r/+/d/+/%s"; private static final String SSL_SECRETS_DIR = System.getenv("SSL_SECRETS_DIR"); @@ -120,35 +122,65 @@ private static Envelope makeExceptionEnvelope() { return envelope; } - static Map parseEnvelopeTopic(String topic) { + public static boolean isLegacyTopic(String topic) { + return topic != null && topic.startsWith(LEGACY_TOPIC_PREFIX); + } + + private static Map parseLegacyTopic(String topic) { + String cleanTopic = topic.startsWith("/") ? topic.substring(1) : topic; + String[] parts = cleanTopic.split("/"); + Envelope envelope = new Envelope(); + if (parts.length >= 2) { + envelope.deviceId = nullAsNull(parts[1]); + } + if (parts.length >= 3) { + envelope.subType = convertSubType(parts[2]); + } + if (parts.length >= 4) { + envelope.subFolder = convertSubFolder(parts[3]); + } + return toStringMap(envelope); + } + + private static Map parseImplicitTopic(String topic) { + // 0/1/2 /3/4 /5 [/6 [/7 ]] + // /r/REGISTRY/d/DEVICE/TYPE[/FOLDER[/GATEWAY]] + String[] parts = topic.split("/", 12); + if (parts.length < 6 || parts.length > 10) { + throw new RuntimeException("Unexpected topic length: " + topic); + } + Envelope envelope = new Envelope(); + checkState(Strings.isNullOrEmpty(parts[0]), "non-empty prefix"); + checkState("r".equals(parts[1]), "expected registries"); + envelope.deviceRegistryId = nullAsNull(parts[2]); + checkState("d".equals(parts[3]), "expected devices"); + envelope.deviceId = nullAsNull(parts[4]); + int base = parts[5].equals(SEND_CHANNEL_DESIGNATOR) ? 2 : 0; + if (base > 0) { + envelope.source = parts[6]; + } + envelope.subType = convertSubType(parts[base + 5]); + if (parts.length > base + 6) { + envelope.subFolder = convertSubFolder(parts[base + 6]); + } + if (parts.length > base + 7) { + envelope.gatewayId = nullAsNull(parts[base + 7]); + } + if (parts.length > base + 8) { + throw new RuntimeException("Unrecognized extra topic arguments: " + parts[base + 8]); + } + return toStringMap(envelope); + } + + public static Map parseEnvelopeTopic(String topic) { try { - // 0/1/2 /3/4 /5 [/6 [/7 ]] - // /r/REGISTRY/d/DEVICE/TYPE[/FOLDER[/GATEWAY]] - String[] parts = topic.split("/", 12); - if (parts.length < 6 || parts.length > 10) { - throw new RuntimeException("Unexpected topic length: " + topic); - } - Envelope envelope = new Envelope(); - checkState(Strings.isNullOrEmpty(parts[0]), "non-empty prefix"); - checkState("r".equals(parts[1]), "expected registries"); - envelope.deviceRegistryId = nullAsNull(parts[2]); - checkState("d".equals(parts[3]), "expected devices"); - envelope.deviceId = nullAsNull(parts[4]); - int base = parts[5].equals(SEND_CHANNEL_DESIGNATOR) ? 2 : 0; - if (base > 0) { - envelope.source = parts[6]; - } - envelope.subType = convertSubType(parts[base + 5]); - if (parts.length > base + 6) { - envelope.subFolder = convertSubFolder(parts[base + 6]); - } - if (parts.length > base + 7) { - envelope.gatewayId = nullAsNull(parts[base + 7]); - } - if (parts.length > base + 8) { - throw new RuntimeException("Unrecognized extra topic arguments: " + parts[base + 8]); + if (isLegacyTopic(topic)) { + return parseLegacyTopic(topic); + } else if (topic != null && topic.startsWith(IMPLICIT_TOPIC_PREFIX)) { + return parseImplicitTopic(topic); + } else { + throw new IllegalArgumentException("Unrecognized topic structure: " + topic); } - return toStringMap(envelope); } catch (Exception e) { throw new RuntimeException("While parsing envelope topic " + topic, e); } diff --git a/udmis/src/test/java/com/google/bos/udmi/service/bridge/MqttToPubSubBridgeTest.java b/udmis/src/test/java/com/google/bos/udmi/service/bridge/MqttToPubSubBridgeTest.java index e950b3034a..2c4da14c5c 100644 --- a/udmis/src/test/java/com/google/bos/udmi/service/bridge/MqttToPubSubBridgeTest.java +++ b/udmis/src/test/java/com/google/bos/udmi/service/bridge/MqttToPubSubBridgeTest.java @@ -56,13 +56,14 @@ void testSetupBridge() throws Exception { assertEquals(testTopic, attributes.get("mqttTopic")); assertEquals("my-device", attributes.get("deviceId")); assertEquals("my-registry", attributes.get("deviceRegistryId")); + assertEquals("events", attributes.get("subType")); } @Test void testSetupBridgeWithSubFolder() throws Exception { IMqttClient mockMqttClient = mock(IMqttClient.class); Publisher mockPublisher = mock(Publisher.class); - String testTopic = "/r/my-registry/d/my-device/events/subfolder_name"; + String testTopic = "/r/my-registry/d/my-device/events/pointset"; String payloadStr = "Hello World"; final MqttMessage mqttMessage = new MqttMessage(payloadStr.getBytes()); @@ -86,7 +87,40 @@ void testSetupBridgeWithSubFolder() throws Exception { assertEquals(testTopic, attributes.get("mqttTopic")); assertEquals("my-device", attributes.get("deviceId")); assertEquals("my-registry", attributes.get("deviceRegistryId")); - assertEquals("subfolder_name", attributes.get("subFolder")); + assertEquals("pointset", attributes.get("subFolder")); + assertEquals("events", attributes.get("subType")); + } + + @Test + void testSetupBridgeLegacyTopic() throws Exception { + IMqttClient mockMqttClient = mock(IMqttClient.class); + Publisher mockPublisher = mock(Publisher.class); + String testTopic = "/devices/my-device/events/pointset"; + String payloadStr = "Hello World"; + final MqttMessage mqttMessage = new MqttMessage(payloadStr.getBytes()); + + when(mockPublisher.publish(any(PubsubMessage.class))) + .thenReturn(ApiFutures.immediateFuture("msg-123")); + + MqttToPubSubBridge.setupBridge(mockMqttClient, mockPublisher, testTopic); + + ArgumentCaptor callbackCaptor = ArgumentCaptor.forClass(MqttCallback.class); + verify(mockMqttClient).setCallback(callbackCaptor.capture()); + MqttCallback callback = callbackCaptor.getValue(); + + callback.messageArrived(testTopic, mqttMessage); + + ArgumentCaptor pubsubMessageCaptor = + ArgumentCaptor.forClass(PubsubMessage.class); + verify(mockPublisher).publish(pubsubMessageCaptor.capture()); + + PubsubMessage pubsubMessage = pubsubMessageCaptor.getValue(); + Map attributes = pubsubMessage.getAttributesMap(); + assertEquals(testTopic, attributes.get("mqttTopic")); + assertEquals("my-device", attributes.get("deviceId")); + assertEquals("unknown", attributes.get("deviceRegistryId")); + assertEquals("pointset", attributes.get("subFolder")); + assertEquals("events", attributes.get("subType")); } @Test diff --git a/udmis/src/test/java/com/google/bos/udmi/service/messaging/impl/SimpleMqttPipeTest.java b/udmis/src/test/java/com/google/bos/udmi/service/messaging/impl/SimpleMqttPipeTest.java index effa195e3e..3f714fcd4f 100644 --- a/udmis/src/test/java/com/google/bos/udmi/service/messaging/impl/SimpleMqttPipeTest.java +++ b/udmis/src/test/java/com/google/bos/udmi/service/messaging/impl/SimpleMqttPipeTest.java @@ -100,5 +100,17 @@ public void staticEnvelopeTopic() { assertEquals("g", channelState.get(GATEWAY_ID_KEY)); assertEquals("control", channelState.get(SOURCE_KEY)); assertEquals(6, channelState.keySet().size()); + + Map legacyBasic = parseEnvelopeTopic("/devices/my-dev/events"); + assertEquals("my-dev", legacyBasic.get(DEVICE_ID_KEY)); + assertEquals("events", legacyBasic.get(SUBTYPE_PROPERTY_KEY)); + assertNull(legacyBasic.get(SUBFOLDER_PROPERTY_KEY)); + assertEquals(2, legacyBasic.keySet().size()); + + Map legacyPointset = parseEnvelopeTopic("/devices/my-dev/events/pointset"); + assertEquals("my-dev", legacyPointset.get(DEVICE_ID_KEY)); + assertEquals("events", legacyPointset.get(SUBTYPE_PROPERTY_KEY)); + assertEquals("pointset", legacyPointset.get(SUBFOLDER_PROPERTY_KEY)); + assertEquals(3, legacyPointset.keySet().size()); } } \ No newline at end of file From 19e5a3fc1d786993dc594483f2a64a8686304e0c Mon Sep 17 00:00:00 2001 From: elsaidi Date: Fri, 15 May 2026 21:13:48 +0000 Subject: [PATCH 13/22] bugs and style --- .../service/bridge/MqttToPubSubBridge.java | 1 - .../messaging/impl/SimpleMqttPipe.java | 30 +++++-- .../udmi/service/support/MosquittoBroker.java | 78 +++++++++++-------- 3 files changed, 68 insertions(+), 41 deletions(-) diff --git a/udmis/src/main/java/com/google/bos/udmi/service/bridge/MqttToPubSubBridge.java b/udmis/src/main/java/com/google/bos/udmi/service/bridge/MqttToPubSubBridge.java index f25bb4bc6a..3d9de79cb3 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/bridge/MqttToPubSubBridge.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/bridge/MqttToPubSubBridge.java @@ -22,7 +22,6 @@ import java.security.cert.CertificateFactory; import java.security.cert.X509Certificate; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; import javax.net.ssl.KeyManagerFactory; diff --git a/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/SimpleMqttPipe.java b/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/SimpleMqttPipe.java index b85a4c6073..8040d12903 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/SimpleMqttPipe.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/SimpleMqttPipe.java @@ -127,27 +127,34 @@ public static boolean isLegacyTopic(String topic) { } private static Map parseLegacyTopic(String topic) { + if (topic == null) { + throw new IllegalArgumentException("Topic cannot be null"); + } String cleanTopic = topic.startsWith("/") ? topic.substring(1) : topic; String[] parts = cleanTopic.split("/"); Envelope envelope = new Envelope(); - if (parts.length >= 2) { + if (parts.length >= 2 && !parts[1].isEmpty()) { envelope.deviceId = nullAsNull(parts[1]); } - if (parts.length >= 3) { + if (parts.length >= 3 && !parts[2].isEmpty()) { envelope.subType = convertSubType(parts[2]); } - if (parts.length >= 4) { + if (parts.length >= 4 && !parts[3].isEmpty()) { + // TODO: technically the subfolder is the remainder including all slashes until the end. envelope.subFolder = convertSubFolder(parts[3]); } return toStringMap(envelope); } private static Map parseImplicitTopic(String topic) { + if (topic == null) { + throw new IllegalArgumentException("Topic cannot be null"); + } // 0/1/2 /3/4 /5 [/6 [/7 ]] // /r/REGISTRY/d/DEVICE/TYPE[/FOLDER[/GATEWAY]] String[] parts = topic.split("/", 12); if (parts.length < 6 || parts.length > 10) { - throw new RuntimeException("Unexpected topic length: " + topic); + throw new IllegalArgumentException("Unexpected topic length: " + topic); } Envelope envelope = new Envelope(); checkState(Strings.isNullOrEmpty(parts[0]), "non-empty prefix"); @@ -156,22 +163,31 @@ private static Map parseImplicitTopic(String topic) { checkState("d".equals(parts[3]), "expected devices"); envelope.deviceId = nullAsNull(parts[4]); int base = parts[5].equals(SEND_CHANNEL_DESIGNATOR) ? 2 : 0; + if (parts.length < base + 6) { + throw new IllegalArgumentException("Unexpected topic length for implicit topic: " + topic); + } if (base > 0) { envelope.source = parts[6]; } envelope.subType = convertSubType(parts[base + 5]); - if (parts.length > base + 6) { + if (parts.length > base + 6 && !parts[base + 6].isEmpty()) { envelope.subFolder = convertSubFolder(parts[base + 6]); } - if (parts.length > base + 7) { + if (parts.length > base + 7 && !parts[base + 7].isEmpty()) { envelope.gatewayId = nullAsNull(parts[base + 7]); } if (parts.length > base + 8) { - throw new RuntimeException("Unrecognized extra topic arguments: " + parts[base + 8]); + throw new IllegalArgumentException("Unrecognized extra topic arguments: " + parts[base + 8]); } return toStringMap(envelope); } + /** + * Parse an MQTT envelope topic (either legacy or implicit format) into attributes map. + * + * @param topic The MQTT topic string + * @return Map of envelope attributes + */ public static Map parseEnvelopeTopic(String topic) { try { if (isLegacyTopic(topic)) { diff --git a/udmis/src/main/java/com/google/bos/udmi/service/support/MosquittoBroker.java b/udmis/src/main/java/com/google/bos/udmi/service/support/MosquittoBroker.java index 05e900338d..ea30dad219 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/support/MosquittoBroker.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/support/MosquittoBroker.java @@ -33,7 +33,7 @@ public class MosquittoBroker extends ContainerBase implements ConnectionBroker { private static final long EXEC_TIMEOUT_SEC = 10; private static final String REVOKE_PASSWORD = "--"; - private static final String MOSQUITTO_LOG_PATH = "/var/log/mosquitto/mosquitto.log"; + private static final String DEFAULT_MOSQUITTO_LOG_PATH = "/var/log/mosquitto/mosquitto.log"; private static final String DEFAULT_CA_FILE = "/etc/mosquitto/certs/ca.crt"; private static final String DEFAULT_CERT_FILE = "/etc/mosquitto/certs/rsa_private.crt"; private static final String DEFAULT_KEY_FILE = "/etc/mosquitto/certs/rsa_private.pem"; @@ -45,6 +45,7 @@ public class MosquittoBroker extends ContainerBase implements ConnectionBroker { private final ContainerBase container; private final EndpointConfiguration endpointConfig; private final boolean disableLogging; + private final Object tailLock = new Object(); private Process tailProcess; /** @@ -63,7 +64,7 @@ public MosquittoBroker(ContainerBase container, EndpointConfiguration endpointCo this.endpointConfig = endpointConfig; this.disableLogging = disableLogging; if (!disableLogging) { - File logFile = new File(MOSQUITTO_LOG_PATH); + File logFile = new File(getMosquittoLogPath()); if (!logFile.canRead()) { throw new RuntimeException( "Mosquitto log file is not readable: " + logFile.getAbsolutePath()); @@ -71,9 +72,17 @@ public MosquittoBroker(ContainerBase container, EndpointConfiguration endpointCo } } + private String getMosquittoLogPath() { + return ofNullable(System.getenv("MOSQUITTO_LOG_PATH")).orElse(DEFAULT_MOSQUITTO_LOG_PATH); + } + + private String getMosquittoCtrlPath() { + return ofNullable(System.getenv("MOSQUITTO_CTRL_PATH")).orElse("mosquitto_ctrl"); + } + private List buildCommandPrefix() { List cmd = new ArrayList<>(); - cmd.add("mosquitto_ctrl"); + cmd.add(getMosquittoCtrlPath()); if (endpointConfig.hostname != null) { cmd.add("-h"); @@ -100,11 +109,14 @@ private List buildCommandPrefix() { if (useSsl) { cmd.add("--cafile"); - cmd.add(ofNullable(endpointConfig.ca_file).orElse(DEFAULT_CA_FILE)); + cmd.add(ofNullable(endpointConfig.ca_file).orElseGet(() -> + ofNullable(System.getenv("MOSQUITTO_CA_FILE")).orElse(DEFAULT_CA_FILE))); cmd.add("--cert"); - cmd.add(ofNullable(endpointConfig.cert_file).orElse(DEFAULT_CERT_FILE)); + cmd.add(ofNullable(endpointConfig.cert_file).orElseGet(() -> + ofNullable(System.getenv("MOSQUITTO_CERT_FILE")).orElse(DEFAULT_CERT_FILE))); cmd.add("--key"); - cmd.add(ofNullable(endpointConfig.key_file).orElse(DEFAULT_KEY_FILE)); + cmd.add(ofNullable(endpointConfig.key_file).orElseGet(() -> + ofNullable(System.getenv("MOSQUITTO_KEY_FILE")).orElse(DEFAULT_KEY_FILE))); cmd.add("--insecure"); } @@ -149,10 +161,15 @@ private void executeCommand(List cmd) { // Enforce timeout if (!exec.waitFor(EXEC_TIMEOUT_SEC, TimeUnit.SECONDS)) { exec.destroyForcibly(); + outputHandler.cancel(true); throw new RuntimeException("Command timed out: " + String.join(" ", cmd)); } - outputHandler.join(); + try { + outputHandler.join(); + } catch (Exception e) { + warn("Output handler interrupted or cancelled: " + e.getMessage()); + } int exitValue = exec.exitValue(); checkState(exitValue == 0, "exit return code " + exitValue); } catch (Exception e) { @@ -176,7 +193,11 @@ private void mosquctlClient(String clientId, String clientPass) { warn("Client likely exists, updating password for %s: %s", clientUser, e.getMessage()); setClientPassword(clientUser, clientPass); } - createRole(roleName); + try { + createRole(roleName); + } catch (Exception e) { + warn("Ignore error creating role: " + e.getMessage()); + } addClientRole(clientUser, roleName); addRoleAcl(roleName, "subscribePattern", clientId + "/config", "allow"); @@ -194,11 +215,7 @@ private void setClientPassword(String clientUser, String clientPass) { cmd.add("setClientPassword"); cmd.add(clientUser); cmd.add(clientPass); - try { - executeCommand(cmd); - } catch (Exception e) { - warn("Ignore error setting client password: " + e.getMessage()); - } + executeCommand(cmd); } private void addRoleAcl(String roleName, String type, String pattern, String allow) { @@ -208,11 +225,7 @@ private void addRoleAcl(String roleName, String type, String pattern, String all cmd.add(type); cmd.add(pattern); cmd.add(allow); - try { - executeCommand(cmd); - } catch (Exception e) { - warn("Ignore error adding role ACL: " + e.getMessage()); - } + executeCommand(cmd); } private void deleteClient(String clientUser) { @@ -252,11 +265,7 @@ private void createRole(String roleName) { List cmd = new ArrayList<>(buildCommandPrefix()); cmd.add("createRole"); cmd.add(roleName); - try { - executeCommand(cmd); - } catch (Exception e) { - warn("Ignore error creating role: " + e.getMessage()); - } + executeCommand(cmd); } private void addClientRole(String clientUser, String roleName) { @@ -264,11 +273,7 @@ private void addClientRole(String clientUser, String roleName) { cmd.add("addClientRole"); cmd.add(clientUser); cmd.add(roleName); - try { - executeCommand(cmd); - } catch (Exception e) { - warn("Ignore error adding client role: " + e.getMessage()); - } + executeCommand(cmd); } private void mosquctlLog(String clientPrefix, Consumer eventConsumer) { @@ -276,10 +281,10 @@ private void mosquctlLog(String clientPrefix, Consumer eventConsume info("Mosquitto logging disabled, skipping log consumer for prefix %s", clientPrefix); return; } - synchronized (MosquittoBroker.class) { + synchronized (tailLock) { try { info("Starting log consumer for prefix %s", clientPrefix); - ProcessBuilder pb = new ProcessBuilder("tail", "-f", MOSQUITTO_LOG_PATH); + ProcessBuilder pb = new ProcessBuilder("tail", "-f", getMosquittoLogPath()); if (tailProcess != null) { tailProcess.destroy(); } @@ -360,7 +365,11 @@ public void authorize(String clientId, String password) { public void bindGateway(String gatewayId, String deviceId) { String roleName = "role_" + gatewayId.replace("/", "_"); - createRole(roleName); + try { + createRole(roleName); + } catch (Exception e) { + warn("Ignore error creating role: " + e.getMessage()); + } addClientRole(gatewayId, roleName); // add ACLs @@ -400,8 +409,11 @@ private void removeRoleAcl(String roleName, String type, String pattern) { @Override public void shutdown() { - if (tailProcess != null) { - tailProcess.destroy(); + synchronized (tailLock) { + if (tailProcess != null) { + tailProcess.destroy(); + tailProcess = null; + } } super.shutdown(); } From 8e6a28adb6a5e5fea4c11b17f8a43a24ddeca388 Mon Sep 17 00:00:00 2001 From: elsaidi Date: Sat, 16 May 2026 18:50:00 +0000 Subject: [PATCH 14/22] refactor: require explicit SSL certificate configuration for Mosquitto and MqttPipe and update CertManager constructor --- .../com/google/udmi/util/CertManager.java | 20 +++++++++++++++++++ udmis/etc/local_pod.json | 3 +++ .../messaging/impl/SimpleMqttPipe.java | 17 +++++++++++----- .../udmi/service/support/MosquittoBroker.java | 18 ++++++++--------- .../google/daq/mqtt/registrar/Registrar.java | 2 +- 5 files changed, 45 insertions(+), 15 deletions(-) diff --git a/common/src/main/java/com/google/udmi/util/CertManager.java b/common/src/main/java/com/google/udmi/util/CertManager.java index afb34a93e9..d29bbd4405 100644 --- a/common/src/main/java/com/google/udmi/util/CertManager.java +++ b/common/src/main/java/com/google/udmi/util/CertManager.java @@ -88,6 +88,26 @@ public CertManager(File caCrtFile, File clientDir, Transport transport, } } + public CertManager(File caCrtFile, File crtFile, File keyFile, Transport transport, + String passString, Consumer logging) { + this.caCrtFile = caCrtFile; + this.crtFile = crtFile; + this.keyFile = keyFile; + this.transport = transport; + this.caCertificate = null; + this.clientCertificate = null; + this.clientPrivateKey = null; + + if (Transport.SSL.equals(transport)) { + this.password = passString == null ? new char[0] : passString.toCharArray(); + logging.accept("CA cert file: " + caCrtFile); + logging.accept("Device cert file: " + crtFile); + logging.accept("Private key file: " + keyFile); + } else { + this.password = null; + } + } + public CertManager(String caCertificate, String clientCertificate, String clientPrivateKey, Transport transport, String passString) { caCrtFile = null; diff --git a/udmis/etc/local_pod.json b/udmis/etc/local_pod.json index 18df1d57fa..f0ecf17c80 100644 --- a/udmis/etc/local_pod.json +++ b/udmis/etc/local_pod.json @@ -4,6 +4,9 @@ "protocol": "mqtt", "transport": "ssl", "hostname": "localhost", + "ca_file": "/etc/mosquitto/certs/ca.crt", + "cert_file": "/etc/mosquitto/certs/rsa_private.crt", + "key_file": "/etc/mosquitto/certs/rsa_private.pem", "auth_provider": { "basic": { "username": "rocket", diff --git a/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/SimpleMqttPipe.java b/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/SimpleMqttPipe.java index 8040d12903..e7b576da9d 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/SimpleMqttPipe.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/SimpleMqttPipe.java @@ -60,7 +60,6 @@ public class SimpleMqttPipe extends MessageBase { private static final String IMPLICIT_TOPIC_PREFIX = "/r/"; private static final Envelope EXCEPTION_ENVELOPE = makeExceptionEnvelope(); private static final String SUB_BASE_FORMAT = "/r/+/d/+/%s"; - private static final String SSL_SECRETS_DIR = System.getenv("SSL_SECRETS_DIR"); private static final String DEFAULT_NAMESPACE = "default"; private static final long CONNECT_TIMEOUT_SEC = 10; private final String autoId = format("mqtt-%08x", (long) (Math.random() * 0x100000000L)); @@ -91,10 +90,18 @@ public SimpleMqttPipe(EndpointConfiguration config) { (publishMessages && sendId.startsWith(SEND_CHANNEL_PREFIX)) ? ("/" + sendId) : ""; clientId = ofNullable(config.client_id).orElse(autoId); - File secretsDir = ifTrueGet(isNotEmpty(SSL_SECRETS_DIR), () -> new File(SSL_SECRETS_DIR)); - certManager = ifNotNullGet(secretsDir, - secrets -> new CertManager(new File(secrets, CertManager.CA_CERT_FILE), secrets, - endpoint.transport, endpoint.auth_provider.basic.password, this::info)); + boolean useSsl = Transport.SSL.equals(endpoint.transport) + || (endpoint.port != null && endpoint.port == 8883); + if (useSsl) { + checkState(isNotEmpty(endpoint.ca_file), "Missing required ca_file in endpoint configuration for SSL connection"); + checkState(isNotEmpty(endpoint.cert_file), "Missing required cert_file in endpoint configuration for SSL connection"); + checkState(isNotEmpty(endpoint.key_file), "Missing required key_file in endpoint configuration for SSL connection"); + String pass = ifNotNullGet(endpoint.auth_provider, p -> ifNotNullGet(p.basic, b -> b.password)); + certManager = new CertManager(new File(endpoint.ca_file), new File(endpoint.cert_file), + new File(endpoint.key_file), endpoint.transport, pass, this::info); + } else { + certManager = null; + } mqttClient = createMqttClient(); tryConnect(false); scheduledFuture = Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay( diff --git a/udmis/src/main/java/com/google/bos/udmi/service/support/MosquittoBroker.java b/udmis/src/main/java/com/google/bos/udmi/service/support/MosquittoBroker.java index ea30dad219..1e7fe79a78 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/support/MosquittoBroker.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/support/MosquittoBroker.java @@ -34,9 +34,6 @@ public class MosquittoBroker extends ContainerBase implements ConnectionBroker { private static final long EXEC_TIMEOUT_SEC = 10; private static final String REVOKE_PASSWORD = "--"; private static final String DEFAULT_MOSQUITTO_LOG_PATH = "/var/log/mosquitto/mosquitto.log"; - private static final String DEFAULT_CA_FILE = "/etc/mosquitto/certs/ca.crt"; - private static final String DEFAULT_CERT_FILE = "/etc/mosquitto/certs/rsa_private.crt"; - private static final String DEFAULT_KEY_FILE = "/etc/mosquitto/certs/rsa_private.pem"; private static final Pattern LOG_MATCHER = Pattern.compile("([0-9]+): (\\S+) (\\S+) (\\S+) (\\S+) (.*)"); private static final Pattern PUBLISH_MATCHER = @@ -108,15 +105,18 @@ private List buildCommandPrefix() { || endpointConfig.ca_file != null; if (useSsl) { + checkState(endpointConfig.ca_file != null && !endpointConfig.ca_file.isEmpty(), + "Missing required ca_file in endpoint configuration for SSL connection"); + checkState(endpointConfig.cert_file != null && !endpointConfig.cert_file.isEmpty(), + "Missing required cert_file in endpoint configuration for SSL connection"); + checkState(endpointConfig.key_file != null && !endpointConfig.key_file.isEmpty(), + "Missing required key_file in endpoint configuration for SSL connection"); cmd.add("--cafile"); - cmd.add(ofNullable(endpointConfig.ca_file).orElseGet(() -> - ofNullable(System.getenv("MOSQUITTO_CA_FILE")).orElse(DEFAULT_CA_FILE))); + cmd.add(endpointConfig.ca_file); cmd.add("--cert"); - cmd.add(ofNullable(endpointConfig.cert_file).orElseGet(() -> - ofNullable(System.getenv("MOSQUITTO_CERT_FILE")).orElse(DEFAULT_CERT_FILE))); + cmd.add(endpointConfig.cert_file); cmd.add("--key"); - cmd.add(ofNullable(endpointConfig.key_file).orElseGet(() -> - ofNullable(System.getenv("MOSQUITTO_KEY_FILE")).orElse(DEFAULT_KEY_FILE))); + cmd.add(endpointConfig.key_file); cmd.add("--insecure"); } diff --git a/validator/src/main/java/com/google/daq/mqtt/registrar/Registrar.java b/validator/src/main/java/com/google/daq/mqtt/registrar/Registrar.java index 4bfd5813be..3424b167a2 100644 --- a/validator/src/main/java/com/google/daq/mqtt/registrar/Registrar.java +++ b/validator/src/main/java/com/google/daq/mqtt/registrar/Registrar.java @@ -1293,7 +1293,7 @@ private void bindGatewayDevices(Map localDevices) { }); System.err.printf("Waiting for device binding...%n"); - dynamicTerminate(); + dynamicTerminate(gatewayBindings.size()); Duration between = Duration.between(start, Instant.now()); double seconds = between.getSeconds() + between.getNano() / 1e9; From 13c062237f97c610c945dcaa1c3c05a292202927 Mon Sep 17 00:00:00 2001 From: elsaidi Date: Sat, 16 May 2026 19:11:48 +0000 Subject: [PATCH 15/22] feat: track gateway bound devices using a dedicated collection and fix model retrieval logic --- .../access/ImplicitIotAccessProvider.java | 20 +++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/udmis/src/main/java/com/google/bos/udmi/service/access/ImplicitIotAccessProvider.java b/udmis/src/main/java/com/google/bos/udmi/service/access/ImplicitIotAccessProvider.java index 4fb26a8e87..16ecedf552 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/access/ImplicitIotAccessProvider.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/access/ImplicitIotAccessProvider.java @@ -178,6 +178,7 @@ private void bindDevicesToGateway(String registryId, String gatewayId, CloudMode Set deviceIds = ImmutableSet.copyOf(cloudModel.gateway.proxy_ids); deviceIds.forEach(deviceId -> { registryDeviceRef(registryId, deviceId).put(BOUND_TO_KEY, gatewayId); + gatewayBoundRef(registryId, gatewayId).put(deviceId, "bound"); broker.bindGateway(clientId(registryId, gatewayId), clientId(registryId, deviceId)); }); } @@ -187,6 +188,7 @@ private void unbindDevicesFromGateway(String registryId, String gatewayId, Set deviceIds = ImmutableSet.copyOf(cloudModel.gateway.proxy_ids); deviceIds.forEach(deviceId -> { registryDeviceRef(registryId, deviceId).delete(BOUND_TO_KEY); + gatewayBoundRef(registryId, gatewayId).delete(deviceId); broker.unbindGateway(clientId(registryId, gatewayId), clientId(registryId, deviceId)); }); } @@ -236,12 +238,13 @@ private void createDevice(String registryId, String deviceId, CloudModel cloudMo private void deleteDevice(String registryId, String deviceId, CloudModel cloudModel) { DataRef properties = registryDeviceRef(registryId, deviceId); + String gatewayId = properties.get(BOUND_TO_KEY); properties.entries().keySet().forEach(properties::delete); registryDevicesRef(registryId).delete(deviceId); broker.authorize(clientId(registryId, deviceId), null); - String gatewayId = properties.get(BOUND_TO_KEY); if (gatewayId != null) { + gatewayBoundRef(registryId, gatewayId).delete(deviceId); broker.unbindGateway(clientId(registryId, gatewayId), clientId(registryId, deviceId)); } } @@ -275,6 +278,10 @@ private DataRef registryDevicesRef(String registryId) { return database.ref().registry(registryId).collection(DEVICES_ACTIVE); } + private DataRef gatewayBoundRef(String registryId, String gatewayId) { + return database.ref().registry(registryId).device(gatewayId).collection("bound_devices"); + } + private void sendConfigUpdate(String registryId, String deviceId, String config) { if (isPublishEnabled()) { publishMqtt(registryId, deviceId, config); @@ -396,7 +403,6 @@ public Entry fetchConfig(String registryId, String deviceId) { @Override public CloudModel fetchDevice(String registryId, String deviceId) { - touchDeviceEntry(registryId, deviceId); Map properties = registryDeviceRef(registryId, deviceId).entries(); if (properties == null) { return null; @@ -422,9 +428,11 @@ public CloudModel fetchDevice(String registryId, String deviceId) { cloudModel.password = properties.get(AUTH_PASSWORD_PROPERTY); - cloudModel.gateway = new GatewayModel(); - cloudModel.gateway.proxy_ids = - listBoundDevices(registryId, deviceId).keySet().stream().toList(); + if (GATEWAY.toString().equals(properties.get(RESOURCE_TYPE_PROPERTY))) { + cloudModel.gateway = new GatewayModel(); + cloudModel.gateway.proxy_ids = + listBoundDevices(registryId, deviceId).keySet().stream().toList(); + } cloudModel.operation = READ; return cloudModel; } @@ -469,7 +477,7 @@ public CloudModel listDevices(String registryId, Consumer progress) { } private Map listBoundDevices(String registryId, String gatewayId) { - Set deviceIds = registryDevicesRef(registryId).entries().keySet(); + Set deviceIds = gatewayBoundRef(registryId, gatewayId).entries().keySet(); Map devices = deviceIds.stream().filter(deviceId -> { String boundTo = registryDeviceRef(registryId, deviceId).get(BOUND_TO_KEY); return gatewayId.equals(boundTo); From bf4e9c5a92ed7339d2cb9e7d2aaf0918a8734bd7 Mon Sep 17 00:00:00 2001 From: elsaidi Date: Sat, 16 May 2026 21:16:09 +0000 Subject: [PATCH 16/22] perf: optimize device listing in ImplicitIotAccessProvider using parallel batch fetching --- .../access/ImplicitIotAccessProvider.java | 38 +++++++++++++++++-- 1 file changed, 35 insertions(+), 3 deletions(-) diff --git a/udmis/src/main/java/com/google/bos/udmi/service/access/ImplicitIotAccessProvider.java b/udmis/src/main/java/com/google/bos/udmi/service/access/ImplicitIotAccessProvider.java index 16ecedf552..6ab67b8d98 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/access/ImplicitIotAccessProvider.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/access/ImplicitIotAccessProvider.java @@ -106,6 +106,7 @@ public class ImplicitIotAccessProvider extends IotAccessBase { private static final String CONFIG_SUFFIX = "/config"; private static final String METADATA_STR_KEY = "metadata_str"; private static final String RESOURCE_TYPE_PROPERTY = "resource_type"; + private static final int DEVICE_FETCH_BATCH_SIZE = 100; private final boolean enabled; private final String usePassword; private final ConnectionBroker broker; @@ -468,14 +469,45 @@ public boolean isEnabled() { @Override public CloudModel listDevices(String registryId, Consumer progress) { Map entries = registryDevicesRef(registryId).entries(); - ifNotNullThen(progress, p -> p.accept(format("Fetched %d devices.", entries.size()))); + List deviceIds = entries.keySet().stream().toList(); + int total = deviceIds.size(); + ifNotNullThen(progress, p -> p.accept(format("Fetching %d devices...", total))); + Map deviceIdsMap = new ConcurrentHashMap<>(); + for (int i = 0; i < total; i += DEVICE_FETCH_BATCH_SIZE) { + List batch = deviceIds.subList(i, Math.min(i + DEVICE_FETCH_BATCH_SIZE, total)); + batch.parallelStream().forEach(id -> { + CloudModel partial = fetchDevicePartial(registryId, id); + if (partial != null) { + deviceIdsMap.put(id, partial); + } + }); + int currentCount = Math.min(i + DEVICE_FETCH_BATCH_SIZE, total); + ifNotNullThen(progress, p -> p.accept(format("Fetched %d devices...", currentCount))); + } CloudModel cloudModel = new CloudModel(); - cloudModel.device_ids = entries.keySet().stream().collect( - Collectors.toMap(id -> id, id -> fetchDevice(registryId, id))); + cloudModel.device_ids = deviceIdsMap; cloudModel.operation = READ; return cloudModel; } + private CloudModel fetchDevicePartial(String registryId, String deviceId) { + Map properties = registryDeviceRef(registryId, deviceId).entries(); + if (properties == null) { + return null; + } + CloudModel cloudModel = new CloudModel(); + cloudModel.num_id = properties.get(NUM_ID_PROPERTY); + String authType = properties.get(AUTH_TYPE_PROPERTY); + if (authType != null) { + cloudModel.auth_type = CloudModel.Auth_type.fromValue(authType); + } + cloudModel.resource_type = ofNullable(properties.get(RESOURCE_TYPE_PROPERTY)) + .map(Resource_type::fromValue).orElse(DIRECT); + cloudModel.blocked = "true".equals(properties.get(BLOCKED_PROPERTY)) ? true : null; + cloudModel.updated_time = JsonUtil.getDate(properties.get(CREATED_AT_PROPERTY)); + return cloudModel; + } + private Map listBoundDevices(String registryId, String gatewayId) { Set deviceIds = gatewayBoundRef(registryId, gatewayId).entries().keySet(); Map devices = deviceIds.stream().filter(deviceId -> { From 4950c50041f3bd7763a1f063fe29c86b13196b20 Mon Sep 17 00:00:00 2001 From: elsaidi Date: Sat, 16 May 2026 21:43:16 +0000 Subject: [PATCH 17/22] do not create client roles for proxy devices --- .../bos/udmi/service/access/ImplicitIotAccessProvider.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/udmis/src/main/java/com/google/bos/udmi/service/access/ImplicitIotAccessProvider.java b/udmis/src/main/java/com/google/bos/udmi/service/access/ImplicitIotAccessProvider.java index 6ab67b8d98..227c3d1968 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/access/ImplicitIotAccessProvider.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/access/ImplicitIotAccessProvider.java @@ -266,7 +266,10 @@ private DataRef mungeDevice(String registryId, String deviceId, Map properties.put(key, value), () -> properties.delete(key))); if (map.containsKey(AUTH_PASSWORD_PROPERTY)) { - broker.authorize(clientId(registryId, deviceId), map.get(AUTH_PASSWORD_PROPERTY)); + boolean isAuthorized = properties.get(AUTH_KEY_PROPERTY) != null + || properties.get(AUTH_TYPE_PROPERTY) != null; + String password = isAuthorized ? map.get(AUTH_PASSWORD_PROPERTY) : null; + broker.authorize(clientId(registryId, deviceId), password); } return properties; } From 8011f38fcaf7d223d220487ed205b38d36dde777 Mon Sep 17 00:00:00 2001 From: elsaidi Date: Sat, 16 May 2026 22:03:51 +0000 Subject: [PATCH 18/22] use etcd transactions to batch requests together --- .../access/ImplicitIotAccessProvider.java | 10 +- .../bos/udmi/service/support/DataRef.java | 3 + .../service/support/EtcdDataProvider.java | 30 ++++ .../access/ImplicitIotAccessProviderTest.java | 161 ++++++++++++++++++ 4 files changed, 202 insertions(+), 2 deletions(-) create mode 100644 udmis/src/test/java/com/google/bos/udmi/service/access/ImplicitIotAccessProviderTest.java diff --git a/udmis/src/main/java/com/google/bos/udmi/service/access/ImplicitIotAccessProvider.java b/udmis/src/main/java/com/google/bos/udmi/service/access/ImplicitIotAccessProvider.java index 227c3d1968..d7108e61ac 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/access/ImplicitIotAccessProvider.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/access/ImplicitIotAccessProvider.java @@ -262,8 +262,14 @@ private CloudModel getReply(String registryId, String deviceId, CloudModel reque private DataRef mungeDevice(String registryId, String deviceId, Map map) { DataRef properties = registryDeviceRef(registryId, deviceId); - map.forEach((key, value) -> - ifNotNullThen(value, v -> properties.put(key, value), () -> properties.delete(key))); + Map puts = map.entrySet().stream() + .filter(e -> e.getValue() != null) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + Set deletes = map.entrySet().stream() + .filter(e -> e.getValue() == null) + .map(Map.Entry::getKey) + .collect(Collectors.toSet()); + properties.update(puts, deletes); if (map.containsKey(AUTH_PASSWORD_PROPERTY)) { boolean isAuthorized = properties.get(AUTH_KEY_PROPERTY) != null diff --git a/udmis/src/main/java/com/google/bos/udmi/service/support/DataRef.java b/udmis/src/main/java/com/google/bos/udmi/service/support/DataRef.java index 3ee67f6340..9d5b36fcb8 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/support/DataRef.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/support/DataRef.java @@ -3,6 +3,7 @@ import static java.lang.String.format; import java.util.Map; +import java.util.Set; /** * Container reference class for a database entry. @@ -43,6 +44,8 @@ public DataRef device(String deviceId) { public abstract void put(String key, String value); + public abstract void update(Map puts, Set deletes); + /** * Add a registry specification. */ diff --git a/udmis/src/main/java/com/google/bos/udmi/service/support/EtcdDataProvider.java b/udmis/src/main/java/com/google/bos/udmi/service/support/EtcdDataProvider.java index 5d4ff36fb9..4dd0602fd2 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/support/EtcdDataProvider.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/support/EtcdDataProvider.java @@ -17,7 +17,11 @@ import io.etcd.jetcd.cluster.Member; import io.etcd.jetcd.kv.GetResponse; import io.etcd.jetcd.options.GetOption; +import io.etcd.jetcd.op.Op; +import io.etcd.jetcd.options.DeleteOption; +import io.etcd.jetcd.options.PutOption; import java.net.URI; +import java.util.ArrayList; import java.time.Duration; import java.time.Instant; import java.util.List; @@ -270,6 +274,32 @@ public AutoCloseable lock() { public void put(String key, String value) { putKey(getKeyPath(key), value); } + + @Override + public void update(Map puts, Set deletes) { + try { + List ops = new ArrayList<>(); + if (puts != null) { + puts.forEach((key, value) -> { + if (value != null) { + ops.add(Op.put(bytes(getKeyPath(key)), bytes(value), PutOption.DEFAULT)); + } + }); + } + if (deletes != null) { + deletes.forEach(key -> { + if (key != null) { + ops.add(Op.delete(bytes(getKeyPath(key)), DeleteOption.DEFAULT)); + } + }); + } + if (!ops.isEmpty()) { + kvClient.txn().Then(ops.toArray(new Op[0])).commit().get(QUERY_TIMEOUT_SEC, TimeUnit.SECONDS); + } + } catch (Exception e) { + throw new RuntimeException("While executing batch update on " + getKeyPath(""), e); + } + } } private class LockCloser implements AutoCloseable { diff --git a/udmis/src/test/java/com/google/bos/udmi/service/access/ImplicitIotAccessProviderTest.java b/udmis/src/test/java/com/google/bos/udmi/service/access/ImplicitIotAccessProviderTest.java new file mode 100644 index 0000000000..ec8e4aec3c --- /dev/null +++ b/udmis/src/test/java/com/google/bos/udmi/service/access/ImplicitIotAccessProviderTest.java @@ -0,0 +1,161 @@ +package com.google.bos.udmi.service.access; + +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.isNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.google.bos.udmi.service.core.ReflectProcessor; +import com.google.bos.udmi.service.pod.UdmiServicePod; +import com.google.bos.udmi.service.support.ConnectionBroker; +import com.google.bos.udmi.service.support.DataRef; +import com.google.bos.udmi.service.support.IotDataProvider; +import java.lang.reflect.Field; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import udmi.schema.CloudModel; +import udmi.schema.CloudModel.Auth_type; +import udmi.schema.CloudModel.ModelOperation; +import udmi.schema.Credential; +import udmi.schema.Credential.Key_format; +import udmi.schema.IotAccess; + +class ImplicitIotAccessProviderTest { + + private static final String TEST_REGISTRY = "test-reg"; + private static final String TEST_DEVICE = "test-dev"; + private static final String TEST_PASSWORD = "supersecret"; + private static final String CLIENT_ID = "/r/test-reg/d/test-dev"; + + private Map store; + private ImplicitIotAccessProvider provider; + private ConnectionBroker mockBroker; + + @BeforeEach + void setUp() throws Exception { + UdmiServicePod.resetForTest(); + store = new HashMap<>(); + IotDataProvider mockDatabase = mock(IotDataProvider.class); + when(mockDatabase.ref()).thenAnswer(inv -> new FakeDataRef(store)); + UdmiServicePod.putComponent("database", () -> mockDatabase); + + ReflectProcessor mockReflect = mock(ReflectProcessor.class); + UdmiServicePod.putComponent("reflect", () -> mockReflect); + + IotAccess iotAccess = new IotAccess(); + iotAccess.options = "enable, use_password=" + TEST_PASSWORD + ", disable_logging=true"; + provider = new ImplicitIotAccessProvider(iotAccess); + provider.activate(); + + mockBroker = mock(ConnectionBroker.class); + Field brokerField = ImplicitIotAccessProvider.class.getDeclaredField("broker"); + brokerField.setAccessible(true); + brokerField.set(provider, mockBroker); + } + + @AfterEach + void tearDown() { + if (provider != null) { + provider.shutdown(); + } + UdmiServicePod.resetForTest(); + } + + @Test + void testAuthorizeWithCredentials() { + CloudModel cloudModel = new CloudModel(); + cloudModel.operation = ModelOperation.CREATE; + Credential credential = new Credential(); + credential.key_format = Key_format.RS_256; + credential.key_data = "fake_key_data"; + cloudModel.credentials = List.of(credential); + + provider.modelDevice(TEST_REGISTRY, TEST_DEVICE, cloudModel, null); + + verify(mockBroker).authorize(eq(CLIENT_ID), eq(TEST_PASSWORD)); + } + + @Test + void testAuthorizeWithAuthType() { + CloudModel cloudModel = new CloudModel(); + cloudModel.operation = ModelOperation.CREATE; + cloudModel.auth_type = Auth_type.RS_256; + + provider.modelDevice(TEST_REGISTRY, TEST_DEVICE, cloudModel, null); + + verify(mockBroker).authorize(eq(CLIENT_ID), eq(TEST_PASSWORD)); + } + + @Test + void testDoNotAuthorizeWithoutCredentialsOrAuthType() { + CloudModel cloudModel = new CloudModel(); + cloudModel.operation = ModelOperation.CREATE; + + provider.modelDevice(TEST_REGISTRY, TEST_DEVICE, cloudModel, null); + + verify(mockBroker).authorize(eq(CLIENT_ID), isNull()); + } + + class FakeDataRef extends DataRef { + private final Map data; + + public FakeDataRef(Map data) { + this.data = data; + } + + private String getKeyPath(String key) { + return (registryId != null ? "r/" + registryId : "") + + (deviceId != null ? "/d/" + deviceId : "") + + (collection != null ? "/c/" + collection : "") + + ":" + key; + } + + @Override + public void delete(String key) { + data.remove(getKeyPath(key)); + } + + @Override + public Map entries() { + String prefix = getKeyPath(""); + Map res = new HashMap<>(); + for (Map.Entry entry : data.entrySet()) { + if (entry.getKey().startsWith(prefix)) { + res.put(entry.getKey().substring(prefix.length()), entry.getValue()); + } + } + return res; + } + + @Override + public String get(String key) { + return data.get(getKeyPath(key)); + } + + @Override + public AutoCloseable lock() { + return () -> {}; + } + + @Override + public void put(String key, String value) { + data.put(getKeyPath(key), value); + } + + @Override + public void update(Map puts, Set deletes) { + if (puts != null) { + puts.forEach(this::put); + } + if (deletes != null) { + deletes.forEach(this::delete); + } + } + } +} From 0b5896b48d1cb515283c82911ece1598338e2847 Mon Sep 17 00:00:00 2001 From: elsaidi Date: Sat, 16 May 2026 22:28:18 +0000 Subject: [PATCH 19/22] don't delete roles for proxy devices from mosquitto too (because they dont exist there) --- .../udmi/service/access/ImplicitIotAccessProvider.java | 9 ++++++--- .../service/access/ImplicitIotAccessProviderTest.java | 3 ++- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/udmis/src/main/java/com/google/bos/udmi/service/access/ImplicitIotAccessProvider.java b/udmis/src/main/java/com/google/bos/udmi/service/access/ImplicitIotAccessProvider.java index d7108e61ac..d5c6e902ce 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/access/ImplicitIotAccessProvider.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/access/ImplicitIotAccessProvider.java @@ -242,7 +242,9 @@ private void deleteDevice(String registryId, String deviceId, CloudModel cloudMo String gatewayId = properties.get(BOUND_TO_KEY); properties.entries().keySet().forEach(properties::delete); registryDevicesRef(registryId).delete(deviceId); - broker.authorize(clientId(registryId, deviceId), null); + if (gatewayId == null) { + broker.authorize(clientId(registryId, deviceId), null); + } if (gatewayId != null) { gatewayBoundRef(registryId, gatewayId).delete(deviceId); @@ -274,8 +276,9 @@ private DataRef mungeDevice(String registryId, String deviceId, Map Date: Sun, 17 May 2026 19:25:28 +0000 Subject: [PATCH 20/22] feat: add progress reporting for gateway bind operations and increase execution threads while cleaning up redundant role logic in MosquittoBroker --- .../access/ImplicitIotAccessProvider.java | 21 +++++++++++++++---- .../service/messaging/impl/MessageBase.java | 2 +- .../udmi/service/support/MosquittoBroker.java | 7 ------- 3 files changed, 18 insertions(+), 12 deletions(-) diff --git a/udmis/src/main/java/com/google/bos/udmi/service/access/ImplicitIotAccessProvider.java b/udmis/src/main/java/com/google/bos/udmi/service/access/ImplicitIotAccessProvider.java index d5c6e902ce..8a011db7d5 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/access/ImplicitIotAccessProvider.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/access/ImplicitIotAccessProvider.java @@ -50,6 +50,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.stream.Collectors; import udmi.schema.Auth_provider; @@ -175,9 +176,15 @@ public static String hashedDeviceId(String registryId, String deviceId) { return String.valueOf(Math.abs(Objects.hash(registryId, deviceId))); } - private void bindDevicesToGateway(String registryId, String gatewayId, CloudModel cloudModel) { + private void bindDevicesToGateway(String registryId, String gatewayId, CloudModel cloudModel, Consumer progress) { Set deviceIds = ImmutableSet.copyOf(cloudModel.gateway.proxy_ids); + AtomicInteger count = new AtomicInteger(); + int total = deviceIds.size(); deviceIds.forEach(deviceId -> { + int current = count.incrementAndGet(); + if (current % 50 == 0 && progress != null) { + progress.accept(format("Binding %d/%d devices to %s...", current, total, gatewayId)); + } registryDeviceRef(registryId, deviceId).put(BOUND_TO_KEY, gatewayId); gatewayBoundRef(registryId, gatewayId).put(deviceId, "bound"); broker.bindGateway(clientId(registryId, gatewayId), clientId(registryId, deviceId)); @@ -185,9 +192,15 @@ private void bindDevicesToGateway(String registryId, String gatewayId, CloudMode } private void unbindDevicesFromGateway(String registryId, String gatewayId, - CloudModel cloudModel) { + CloudModel cloudModel, Consumer progress) { Set deviceIds = ImmutableSet.copyOf(cloudModel.gateway.proxy_ids); + AtomicInteger count = new AtomicInteger(); + int total = deviceIds.size(); deviceIds.forEach(deviceId -> { + int current = count.incrementAndGet(); + if (current % 50 == 0 && progress != null) { + progress.accept(format("Unbinding %d/%d devices from %s...", current, total, gatewayId)); + } registryDeviceRef(registryId, deviceId).delete(BOUND_TO_KEY); gatewayBoundRef(registryId, gatewayId).delete(deviceId); broker.unbindGateway(clientId(registryId, gatewayId), clientId(registryId, deviceId)); @@ -547,8 +560,8 @@ public CloudModel modelDevice(String registryId, String deviceId, CloudModel clo case UPDATE -> updateDevice(registryId, deviceId, cloudModel); case DELETE -> deleteDevice(registryId, deviceId, cloudModel); case MODIFY -> modifyDevice(registryId, deviceId, cloudModel); - case BIND -> bindDevicesToGateway(registryId, deviceId, cloudModel); - case UNBIND -> unbindDevicesFromGateway(registryId, deviceId, cloudModel); + case BIND -> bindDevicesToGateway(registryId, deviceId, cloudModel, progress); + case UNBIND -> unbindDevicesFromGateway(registryId, deviceId, cloudModel, progress); case BLOCK -> blockDevice(registryId, deviceId, cloudModel); default -> throw new RuntimeException("Unknown device operation " + operation); } diff --git a/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/MessageBase.java b/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/MessageBase.java index 9c64c318ea..13d7b82360 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/MessageBase.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/MessageBase.java @@ -62,7 +62,7 @@ public abstract class MessageBase extends ContainerBase implements MessagePipe { public static final String INVALID_ENVELOPE_KEY = "invalid"; - public static final int EXECUTION_THREADS = 4; + public static final int EXECUTION_THREADS = 32; public static final String ERROR_MESSAGE_MARKER = "error-mark"; public static final String PUBLISH_STATS = "publish"; public static final String RECEIVE_STATS = "receive"; diff --git a/udmis/src/main/java/com/google/bos/udmi/service/support/MosquittoBroker.java b/udmis/src/main/java/com/google/bos/udmi/service/support/MosquittoBroker.java index 1e7fe79a78..a6ea4aaf0c 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/support/MosquittoBroker.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/support/MosquittoBroker.java @@ -365,13 +365,6 @@ public void authorize(String clientId, String password) { public void bindGateway(String gatewayId, String deviceId) { String roleName = "role_" + gatewayId.replace("/", "_"); - try { - createRole(roleName); - } catch (Exception e) { - warn("Ignore error creating role: " + e.getMessage()); - } - addClientRole(gatewayId, roleName); - // add ACLs addRoleAcl(roleName, "subscribePattern", deviceId + "/config", "allow"); addRoleAcl(roleName, "subscribePattern", deviceId + "/commands", "allow"); From f0ab84edb0cc71080634b8a7d5549250c7db4d98 Mon Sep 17 00:00:00 2001 From: elsaidi Date: Sun, 17 May 2026 20:30:27 +0000 Subject: [PATCH 21/22] : increase message processing capacity and concurrency while removing synchronization in mosquitto commabd execution --- .../service/messaging/impl/MessageBase.java | 4 +- .../udmi/service/support/MosquittoBroker.java | 50 +++++++++---------- 2 files changed, 26 insertions(+), 28 deletions(-) diff --git a/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/MessageBase.java b/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/MessageBase.java index 13d7b82360..1cdfa3e03e 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/MessageBase.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/MessageBase.java @@ -62,7 +62,7 @@ public abstract class MessageBase extends ContainerBase implements MessagePipe { public static final String INVALID_ENVELOPE_KEY = "invalid"; - public static final int EXECUTION_THREADS = 32; + public static final int EXECUTION_THREADS = 100; public static final String ERROR_MESSAGE_MARKER = "error-mark"; public static final String PUBLISH_STATS = "publish"; public static final String RECEIVE_STATS = "receive"; @@ -73,7 +73,7 @@ public abstract class MessageBase extends ContainerBase implements MessagePipe { private static final Set HANDLED_QUEUES = new HashSet<>(); private static final long DEFAULT_POLL_TIME_SEC = 1; private static final long AWAIT_TERMINATION_SEC = 10; - private static final int DEFAULT_CAPACITY = 1000; + private static final int DEFAULT_CAPACITY = 2000; protected final int queueCapacity; protected final long publishDelaySec; private final ExecutorService executor = Executors.newFixedThreadPool(EXECUTION_THREADS); diff --git a/udmis/src/main/java/com/google/bos/udmi/service/support/MosquittoBroker.java b/udmis/src/main/java/com/google/bos/udmi/service/support/MosquittoBroker.java index a6ea4aaf0c..2844eda431 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/support/MosquittoBroker.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/support/MosquittoBroker.java @@ -146,35 +146,33 @@ private void consumeStream(BufferedReader reader, Consumer consumer) { } private void executeCommand(List cmd) { - synchronized (MosquittoBroker.class) { - try { - info("Executing command %s", String.join(" ", cmd)); - ProcessBuilder pb = new ProcessBuilder(cmd); - pb.redirectErrorStream(true); // Merge stderr into stdout - Process exec = pb.start(); - - // Read output asynchronously to prevent buffer locks - CompletableFuture outputHandler = CompletableFuture.runAsync(() -> - exec.inputReader().lines().forEach(container::info) - ); - - // Enforce timeout - if (!exec.waitFor(EXEC_TIMEOUT_SEC, TimeUnit.SECONDS)) { - exec.destroyForcibly(); - outputHandler.cancel(true); - throw new RuntimeException("Command timed out: " + String.join(" ", cmd)); - } + try { + info("Executing command %s", String.join(" ", cmd)); + ProcessBuilder pb = new ProcessBuilder(cmd); + pb.redirectErrorStream(true); // Merge stderr into stdout + Process exec = pb.start(); + + // Read output asynchronously to prevent buffer locks + CompletableFuture outputHandler = CompletableFuture.runAsync(() -> + exec.inputReader().lines().forEach(container::info) + ); + + // Enforce timeout + if (!exec.waitFor(EXEC_TIMEOUT_SEC, TimeUnit.SECONDS)) { + exec.destroyForcibly(); + outputHandler.cancel(true); + throw new RuntimeException("Command timed out: " + String.join(" ", cmd)); + } - try { - outputHandler.join(); - } catch (Exception e) { - warn("Output handler interrupted or cancelled: " + e.getMessage()); - } - int exitValue = exec.exitValue(); - checkState(exitValue == 0, "exit return code " + exitValue); + try { + outputHandler.join(); } catch (Exception e) { - throw new RuntimeException("While executing " + String.join(" ", cmd), e); + warn("Output handler interrupted or cancelled: " + e.getMessage()); } + int exitValue = exec.exitValue(); + checkState(exitValue == 0, "exit return code " + exitValue); + } catch (Exception e) { + throw new RuntimeException("While executing " + String.join(" ", cmd), e); } } From 0ddc3b05a0c143e5504e0335da303169b2ad3d61 Mon Sep 17 00:00:00 2001 From: elsaidi Date: Mon, 18 May 2026 14:25:33 +0000 Subject: [PATCH 22/22] STYLE --- .../service/access/ImplicitIotAccessProvider.java | 3 ++- .../udmi/service/messaging/impl/SimpleMqttPipe.java | 12 ++++++++---- .../bos/udmi/service/support/EtcdDataProvider.java | 7 ++++--- .../access/ImplicitIotAccessProviderTest.java | 10 +++++----- 4 files changed, 19 insertions(+), 13 deletions(-) diff --git a/udmis/src/main/java/com/google/bos/udmi/service/access/ImplicitIotAccessProvider.java b/udmis/src/main/java/com/google/bos/udmi/service/access/ImplicitIotAccessProvider.java index 8a011db7d5..c0bbc04f27 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/access/ImplicitIotAccessProvider.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/access/ImplicitIotAccessProvider.java @@ -176,7 +176,8 @@ public static String hashedDeviceId(String registryId, String deviceId) { return String.valueOf(Math.abs(Objects.hash(registryId, deviceId))); } - private void bindDevicesToGateway(String registryId, String gatewayId, CloudModel cloudModel, Consumer progress) { + private void bindDevicesToGateway( + String registryId, String gatewayId, CloudModel cloudModel, Consumer progress) { Set deviceIds = ImmutableSet.copyOf(cloudModel.gateway.proxy_ids); AtomicInteger count = new AtomicInteger(); int total = deviceIds.size(); diff --git a/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/SimpleMqttPipe.java b/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/SimpleMqttPipe.java index e7b576da9d..9b8cd79ff8 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/SimpleMqttPipe.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/SimpleMqttPipe.java @@ -93,10 +93,14 @@ public SimpleMqttPipe(EndpointConfiguration config) { boolean useSsl = Transport.SSL.equals(endpoint.transport) || (endpoint.port != null && endpoint.port == 8883); if (useSsl) { - checkState(isNotEmpty(endpoint.ca_file), "Missing required ca_file in endpoint configuration for SSL connection"); - checkState(isNotEmpty(endpoint.cert_file), "Missing required cert_file in endpoint configuration for SSL connection"); - checkState(isNotEmpty(endpoint.key_file), "Missing required key_file in endpoint configuration for SSL connection"); - String pass = ifNotNullGet(endpoint.auth_provider, p -> ifNotNullGet(p.basic, b -> b.password)); + checkState(isNotEmpty(endpoint.ca_file), + "Missing required ca_file in endpoint configuration for SSL connection"); + checkState(isNotEmpty(endpoint.cert_file), + "Missing required cert_file in endpoint configuration for SSL connection"); + checkState(isNotEmpty(endpoint.key_file), + "Missing required key_file in endpoint configuration for SSL connection"); + String pass = ifNotNullGet(endpoint.auth_provider, + p -> ifNotNullGet(p.basic, b -> b.password)); certManager = new CertManager(new File(endpoint.ca_file), new File(endpoint.cert_file), new File(endpoint.key_file), endpoint.transport, pass, this::info); } else { diff --git a/udmis/src/main/java/com/google/bos/udmi/service/support/EtcdDataProvider.java b/udmis/src/main/java/com/google/bos/udmi/service/support/EtcdDataProvider.java index 4dd0602fd2..79e6c0885a 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/support/EtcdDataProvider.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/support/EtcdDataProvider.java @@ -16,14 +16,14 @@ import io.etcd.jetcd.Lock; import io.etcd.jetcd.cluster.Member; import io.etcd.jetcd.kv.GetResponse; -import io.etcd.jetcd.options.GetOption; import io.etcd.jetcd.op.Op; import io.etcd.jetcd.options.DeleteOption; +import io.etcd.jetcd.options.GetOption; import io.etcd.jetcd.options.PutOption; import java.net.URI; -import java.util.ArrayList; import java.time.Duration; import java.time.Instant; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Set; @@ -294,7 +294,8 @@ public void update(Map puts, Set deletes) { }); } if (!ops.isEmpty()) { - kvClient.txn().Then(ops.toArray(new Op[0])).commit().get(QUERY_TIMEOUT_SEC, TimeUnit.SECONDS); + kvClient.txn().Then(ops.toArray(new Op[0])).commit() + .get(QUERY_TIMEOUT_SEC, TimeUnit.SECONDS); } } catch (Exception e) { throw new RuntimeException("While executing batch update on " + getKeyPath(""), e); diff --git a/udmis/src/test/java/com/google/bos/udmi/service/access/ImplicitIotAccessProviderTest.java b/udmis/src/test/java/com/google/bos/udmi/service/access/ImplicitIotAccessProviderTest.java index a597750aaa..597cb639d9 100644 --- a/udmis/src/test/java/com/google/bos/udmi/service/access/ImplicitIotAccessProviderTest.java +++ b/udmis/src/test/java/com/google/bos/udmi/service/access/ImplicitIotAccessProviderTest.java @@ -1,7 +1,6 @@ package com.google.bos.udmi.service.access; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.ArgumentMatchers.isNull; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; @@ -111,10 +110,11 @@ public FakeDataRef(Map data) { } private String getKeyPath(String key) { - return (registryId != null ? "r/" + registryId : "") + - (deviceId != null ? "/d/" + deviceId : "") + - (collection != null ? "/c/" + collection : "") + - ":" + key; + return (registryId != null ? "r/" + registryId : "") + + (deviceId != null ? "/d/" + deviceId : "") + + (collection != null ? "/c/" + collection : "") + + ":" + + key; } @Override