diff --git a/.gencode_hash.txt b/.gencode_hash.txt index d22e124ce3..c48e946626 100644 --- a/.gencode_hash.txt +++ b/.gencode_hash.txt @@ -2,10 +2,10 @@ c21266e8de23b6e8acfac16a177db782375bf71df470c343c3f8eb8b822ef56a gencode/docs/commands_discovery.html 7bd462fd9b31a1a35b27f069194051161c484600161d7ef3fa7a4f8efa4e65fb gencode/docs/commands_mapping.html a65e8177ca59cd51c4a8ff63ecaa194897f7e22b82afb14708d63efbd7b96a84 gencode/docs/config.html -0bb798c8db6be37266946569a096e2f0b7520ee54d65e7e8d37675ef17820f14 gencode/docs/configuration_endpoint.html -7caf6f29bff7c358536444c0d8502169f708bcc6f32a3d0a8aae155708b2cd04 gencode/docs/configuration_execution.html -640ad5670f31f7c32f3e8d29e8f4f54b1f95c121521e1b6efc2fb943814d6918 gencode/docs/configuration_pod.html -b34c136cee32cb88f32a427ff400c3898ed49168f6dcaca1bc9ba65365bc5ae4 gencode/docs/configuration_pubber.html +4d2edf1c19b274e5273370f79a8e21c5e8009c1b1e2529d83a33392ddec46db6 gencode/docs/configuration_endpoint.html +8b644bcc965a4fee3fa66dc55abde2079adcef621b0d49aa44ae83185617071e gencode/docs/configuration_execution.html +5fe25ff7a6ac8ed09476f9e24f56e686ba35268d694ebc20968222a207445400 gencode/docs/configuration_pod.html +cf01efc05e3fcabb18786af43f7f93d7780b8156b91827e34b34698c4bcea5b8 gencode/docs/configuration_pubber.html 1057fa40fb7a31a23bb2773d21c38cf4590a935bd8b5ea4218e695c6204f5dd9 gencode/docs/data_template.html ffd9325c940b8e832a608a595c8ccc8903b935e899f42219ddc3c79ec65f6202 gencode/docs/events.html 70e57ad6ef39330d958727ebf9dcd61ef6ea30e4c8653eac412bf1867fdb3a70 gencode/docs/events_alarmset.html @@ -17,7 +17,7 @@ cac253f57c5c92ef32e2a5f91b6cec8229e8db1dcffcc96a58f06da068e741e7 gencode/docs/e 73dbe799e7943ec20ac58b544998e986a39539d4ef0cb4f5023e92e7634d3124 gencode/docs/events_validation.html cebf265b0c3d3a6e9c9e8c4e6c723ac36bc088dfabbc789e775388c3d2ec83d0 gencode/docs/metadata.html c86682715d348bd3dd971fa5bd925a8a3d0f3c2944c65a47c4b64fe1a5ccdea2 gencode/docs/monitoring.html -474ca16edc9f3cad2bb3ab40b6993cbced90263f762f66ee6cd246a6c4a0d18f gencode/docs/persistent_device.html +841b43bad8a76268435bf545cf7b8a57b1ba478a9cd244b3cd97aca536b1b360 gencode/docs/persistent_device.html e11595fd11477947a27461f8ef4fb6facb5f60e2abd6212193f7581ab123ff84 gencode/docs/properties.html c006d0f46c8f007caa90ac76c713cead907669c14c09f4a288fac5b25afe05dd gencode/docs/query_cloud.html 6f2cd8163a129667beb79f297f193ecd14d40c4f1ac06570db13d912ab98fd3f gencode/docs/readme.md @@ -70,7 +70,7 @@ ff98f706c967bbef7be35d826f5abad822c89d2fa981c4d8e6cffbb106d3740c gencode/java/u 1a04079116f4032b17108e873719bbb2c60c19f392136e7c906122199b472227 gencode/java/udmi/schema/DiscoveryModel.java b41f59d0c1aa74bb9abbdc7525d726f45a4ba8df3866c2dd40458601ffab60fb gencode/java/udmi/schema/DiscoveryState.java 7019b8a1522261a69d708e2e7725b8bc44510c5c80f5c056543af2b7728bfa42 gencode/java/udmi/schema/Electricity.java -a9ab0b95408ee04fc98cbc2363bead24c67e64e279e6088e0dd3e4b97154c65d gencode/java/udmi/schema/EndpointConfiguration.java +722263ceca9acb28c7b39d2133e2eede16c6f037d4325c7587a4563c80d77f2b gencode/java/udmi/schema/EndpointConfiguration.java d2e7afd6e1a9250480144c114ee8877afb9b0dd7048495fe96a821d9e6c80475 gencode/java/udmi/schema/Entry.java d4bfd2c997937cd1b0f9b1c73ed46b0133b2b264d367ca9001b7d84ecd598611 gencode/java/udmi/schema/Enumerations.java 11ccad9b1ed4a4745e7bf321adc4bb7f684e9bfa90877314094533b9749512b7 gencode/java/udmi/schema/Envelope.java @@ -157,7 +157,7 @@ a671f5341703d03200c3a4b4df41f109e587e3723292321998595da60b03e4a2 gencode/java/u 8cc9c88554fc6c9d7d6eba6884279eab3160b4e72d8edfef8155e69ec61c1eec gencode/java/udmi/schema/ValidationState.java eadc72e31b4796273479967303513b16563af0f946d1e1c7eba1748f9b133d40 gencode/java/udmi/schema/ValidationSummary.java 11f8dab5296d41e86cd623a4ed27b972ae673b141907cb913397d4eb53880c59 gencode/java/udmi/schema/Water.java -0d863349462e546bbfad54e75032170d9967f4ed95aa6bd900f36398fba012a4 gencode/presentation/presentation.json +dd8497a1183a85ae281a54b02bbf360f9ecb95176d1efb588804c63e585c0d79 gencode/presentation/presentation.json 4cf98cbd132cde0cc8813ac35cf3712cb46014154c817c04ad2902c268cdd8fe gencode/python/pyproject.toml a98b84029d33a421872a08f7bfb1bd2f23f8dc3bebc8d7a1c2a7f1c8596510bf gencode/python/udmi/schema/__init__.py f9d90861e568b27445bef241f04cce64cc44731c95c8bd9e3f65cef79d42dab0 gencode/python/udmi/schema/_base.py @@ -187,7 +187,7 @@ b65e7e9152629a9175f62ed706701d02814d62d8f3e9c0b46c2ba49ad8b51077 gencode/python faf4cdb1687868fadc411c144d57a5c596af790e43df4d7ddec5ee3fd10ba4bd gencode/python/udmi/schema/config_system.py ddd6e974840a19aa75e2f322e140676f71e0715f69088b88dfea76f0625ab035 gencode/python/udmi/schema/config_system_testing.py f187d5acbf931e97365af4283ee676786b7f26294db720b1c8535ab50387a6c1 gencode/python/udmi/schema/config_udmi.py -dea1560526d0e24431679ba7385bd5ae2fd6a0750c69c4548e2b12084b0467ff gencode/python/udmi/schema/configuration_endpoint.py +d1cf489cbf937f801213f1425508afcc37c548e08f1da6ac755f9282bd84992d gencode/python/udmi/schema/configuration_endpoint.py 8adbf9647a6c6666559837d1963d41e0fd7e9c9193bc73f40066ae3f97ce378c gencode/python/udmi/schema/configuration_execution.py c0853ff1838a11291b53a40c43a6cdaff17951a34b3d7f3de10d9fcc7b44b79d gencode/python/udmi/schema/configuration_pod.py 327db1ca7c7a8e4d8ff0eb40b183163c314a5b3bc56c25bd17492fec9c9a811f gencode/python/udmi/schema/configuration_pod_base.py diff --git a/bin/mosquctl_client b/bin/mosquctl_client deleted file mode 100755 index 6256091452..0000000000 --- a/bin/mosquctl_client +++ /dev/null @@ -1,39 +0,0 @@ -#!/bin/bash -e - -UDMI_ROOT=$(dirname $0)/.. -cd $UDMI_ROOT - -source $UDMI_ROOT/etc/shell_common.sh - -if [[ $# != 2 ]]; then - echo Usage: $0 client_id client_pass - false -fi - -client_id=$1 -client_pass=$2 -shift 2 - -source $UDMI_ROOT/etc/mosquitto_ctrl.sh - -client_user=$client_id -role_name="role_${client_id//\//_}" - -$MOSQUITTO_CTRL deleteClient $client_user || true -$MOSQUITTO_CTRL deleteRole $role_name || true - -if [[ $client_pass != "--" ]]; then - $MOSQUITTO_CTRL createClient $client_user -p $client_pass -c $client_id - - $MOSQUITTO_CTRL createRole $role_name - $MOSQUITTO_CTRL addClientRole $client_user $role_name - - $MOSQUITTO_CTRL addRoleACL $role_name subscribePattern "$client_id/config" allow - $MOSQUITTO_CTRL addRoleACL $role_name subscribePattern "$client_id/commands" allow - $MOSQUITTO_CTRL addRoleACL $role_name subscribePattern "$client_id/errors" allow - $MOSQUITTO_CTRL addRoleACL $role_name publishClientSend "$client_id/events/#" allow - $MOSQUITTO_CTRL addRoleACL $role_name publishClientSend "$client_id/state" allow - echo "Device $client_id registered correctly." -else - echo "Device $client_id deleted correctly." -fi diff --git a/bin/mosquctl_gateway b/bin/mosquctl_gateway deleted file mode 100755 index 463d3bd7f2..0000000000 --- a/bin/mosquctl_gateway +++ /dev/null @@ -1,47 +0,0 @@ -#!/bin/bash -e - -UDMI_ROOT=$(dirname $0)/.. -source $UDMI_ROOT/etc/shell_common.sh -source $UDMI_ROOT/etc/mosquitto_ctrl.sh - -if [[ $# != 3 ]]; then - echo Usage: $0 action gateway_id device_id - echo Actions: bind, unbind - false -fi - -action=$1 -gateway_id=$2 -device_id=$3 - -role_name="role_${gateway_id//\//_}" - -if [[ $action == "bind" ]]; then - echo Binding $device_id to gateway $gateway_id - # Create role if not exists (ignore error if exists) - $MOSQUITTO_CTRL createRole $role_name || true - # Add role to gateway client (ignore error if already added) - $MOSQUITTO_CTRL addClientRole $gateway_id $role_name || true - - # Add ACLs - $MOSQUITTO_CTRL addRoleACL $role_name subscribePattern "$device_id/config" allow - $MOSQUITTO_CTRL addRoleACL $role_name subscribePattern "$device_id/commands" allow - $MOSQUITTO_CTRL addRoleACL $role_name subscribePattern "$device_id/errors" allow - $MOSQUITTO_CTRL addRoleACL $role_name publishClientSend "$device_id/events/#" allow - $MOSQUITTO_CTRL addRoleACL $role_name publishClientSend "$device_id/state" allow - $MOSQUITTO_CTRL addRoleACL $role_name publishClientSend "$device_id/attach" allow - echo "Binding successful for $device_id to gateway $gateway_id" -elif [[ $action == "unbind" ]]; then - echo Unbinding $device_id from gateway $gateway_id - # Remove ACLs - $MOSQUITTO_CTRL removeRoleACL $role_name subscribePattern "$device_id/config" || true - $MOSQUITTO_CTRL removeRoleACL $role_name subscribePattern "$device_id/commands" || true - $MOSQUITTO_CTRL removeRoleACL $role_name subscribePattern "$device_id/errors" || true - $MOSQUITTO_CTRL removeRoleACL $role_name publishClientSend "$device_id/events/#" || true - $MOSQUITTO_CTRL removeRoleACL $role_name publishClientSend "$device_id/state" || true - $MOSQUITTO_CTRL removeRoleACL $role_name publishClientSend "$device_id/attach" || true - echo "Unbinding successful for $device_id from gateway $gateway_id" -else - echo Unknown action: $action - false -fi diff --git a/bin/mosquctl_log b/bin/mosquctl_log deleted file mode 100755 index b692e3868d..0000000000 --- a/bin/mosquctl_log +++ /dev/null @@ -1,21 +0,0 @@ -#!/bin/bash -e - -UDMI_ROOT=$(dirname $0)/.. -source $UDMI_ROOT/etc/shell_common.sh -cd $UDMI_ROOT - -LOG_FILE=/var/log/mosquitto/mosquitto.log - -if [[ $# != 1 ]]; then - echo Usage: $0 client_id_prefix - false -fi - -prefix=$1 -shift - -echo $(date +%s): Starting MONITOR Client $prefix of $LOG_FILE - -tail -f $LOG_FILE | stdbuf -oL egrep " (from|to|as|Client) $prefix" - -echo $(date +%s): Finished MONITOR Client $prefix of $LOG_FILE diff --git a/bin/start_mosquitto b/bin/start_mosquitto index d7c60b9677..e611ad5dad 100755 --- a/bin/start_mosquitto +++ b/bin/start_mosquitto @@ -73,6 +73,7 @@ fi $MOSQUITTO_CTRL createRole service $MOSQUITTO_CTRL addRoleACL service subscribePattern '/r/+/d/+/#' allow $MOSQUITTO_CTRL addRoleACL service publishClientSend '/r/+/d/+/#' allow +$MOSQUITTO_CTRL addClientRole $AUTH_USER service clients=$($MOSQUITTO_CTRL listClients) if [[ $clients =~ ${AUTH_USER} ]]; then diff --git a/bin/test_mosquitto b/bin/test_mosquitto index 78c43fae11..fa049d140c 100755 --- a/bin/test_mosquitto +++ b/bin/test_mosquitto @@ -56,7 +56,17 @@ fgrep Racket out/mosquitto.sub || fail did not find expected message hash_pass=$(sha256sum < $site_path/devices/$device_id/rsa_private.pkcs8) dev_pass=${hash_pass:0:8} dev_id=/r/$registry_id/d/$device_id -bin/mosquctl_client $dev_id $dev_pass +$MOSQUITTO_CTRL deleteClient $dev_id || true +role_name="role_${dev_id//\//_}" +$MOSQUITTO_CTRL deleteRole $role_name || true +$MOSQUITTO_CTRL createClient $dev_id -p $dev_pass -c $dev_id +$MOSQUITTO_CTRL createRole $role_name +$MOSQUITTO_CTRL addClientRole $dev_id $role_name +$MOSQUITTO_CTRL addRoleACL $role_name subscribePattern "$dev_id/config" allow +$MOSQUITTO_CTRL addRoleACL $role_name subscribePattern "$dev_id/commands" allow +$MOSQUITTO_CTRL addRoleACL $role_name subscribePattern "$dev_id/errors" allow +$MOSQUITTO_CTRL addRoleACL $role_name publishClientSend "$dev_id/events/#" allow +$MOSQUITTO_CTRL addRoleACL $role_name publishClientSend "$dev_id/state" allow sleep 1 @@ -120,7 +130,9 @@ fi echo Cleanup subscribers and clients... killall mosquitto_sub || true -bin/mosquctl_client $dev_id -- +$MOSQUITTO_CTRL deleteClient $dev_id || true +role_name="role_${dev_id//\//_}" +$MOSQUITTO_CTRL deleteRole $role_name || true echo Received messages: cat out/mosquitto.sub | cut -c -120 diff --git a/common/src/main/java/com/google/udmi/util/CertManager.java b/common/src/main/java/com/google/udmi/util/CertManager.java index afb34a93e9..d29bbd4405 100644 --- a/common/src/main/java/com/google/udmi/util/CertManager.java +++ b/common/src/main/java/com/google/udmi/util/CertManager.java @@ -88,6 +88,26 @@ public CertManager(File caCrtFile, File clientDir, Transport transport, } } + public CertManager(File caCrtFile, File crtFile, File keyFile, Transport transport, + String passString, Consumer logging) { + this.caCrtFile = caCrtFile; + this.crtFile = crtFile; + this.keyFile = keyFile; + this.transport = transport; + this.caCertificate = null; + this.clientCertificate = null; + this.clientPrivateKey = null; + + if (Transport.SSL.equals(transport)) { + this.password = passString == null ? new char[0] : passString.toCharArray(); + logging.accept("CA cert file: " + caCrtFile); + logging.accept("Device cert file: " + crtFile); + logging.accept("Private key file: " + keyFile); + } else { + this.password = null; + } + } + public CertManager(String caCertificate, String clientCertificate, String clientPrivateKey, Transport transport, String passString) { caCrtFile = null; diff --git a/gencode/docs/configuration_endpoint.html b/gencode/docs/configuration_endpoint.html index 50f5297af3..f52acb6d0c 100644 --- a/gencode/docs/configuration_endpoint.html +++ b/gencode/docs/configuration_endpoint.html @@ -1024,6 +1024,108 @@

+ + + + +
+
+
+

+ +

+
+ +
+
+ + Type: string
+

Full path to the CA certificate file

+
+ + + + + + +
+
+
+
+
+
+
+

+ +

+
+ +
+
+ + Type: string
+

Full path to the client certificate file

+
+ + + + + + +
+
+
+
+
+
+
+

+ +

+
+ +
+
+ + Type: string
+

Full path to the client private key file

+
+ + + + + +
diff --git a/gencode/docs/configuration_execution.html b/gencode/docs/configuration_execution.html index facf8a4a4b..6adec99c9c 100644 --- a/gencode/docs/configuration_execution.html +++ b/gencode/docs/configuration_execution.html @@ -1686,6 +1686,129 @@

+ + + + +
+
+
+

+ +

+
+ +
+
+ + Type: string
+

Full path to the CA certificate file

+
+ + + + + + +
+
+
+
+
+
+
+

+ +

+
+ +
+
+ + Type: string
+

Full path to the client certificate file

+
+ + + + + + +
+
+
+
+
+
+
+

+ +

+
+ +
+
+ + Type: string
+

Full path to the client private key file

+
+ + + + + +
@@ -2962,6 +3085,129 @@

+ + + + +
+
+
+

+ +

+
+ +
+
+ + Type: string
+

Full path to the CA certificate file

+
+ + + + + + +
+
+
+
+
+
+
+

+ +

+
+ +
+
+ + Type: string
+

Full path to the client certificate file

+
+ + + + + + +
+
+
+
+
+
+
+

+ +

+
+ +
+
+ + Type: string
+

Full path to the client private key file

+
+ + + + + +
diff --git a/gencode/docs/configuration_pod.html b/gencode/docs/configuration_pod.html index 0b9c2370e7..7c49c7f82b 100644 --- a/gencode/docs/configuration_pod.html +++ b/gencode/docs/configuration_pod.html @@ -1403,6 +1403,129 @@

+ + + + +
+
+
+

+ +

+
+ +
+
+ + Type: string
+

Full path to the CA certificate file

+
+ + + + + + +
+
+
+
+
+
+
+

+ +

+
+ +
+
+ + Type: string
+

Full path to the client certificate file

+
+ + + + + + +
+
+
+
+
+
+
+

+ +

+
+ +
+
+ + Type: string
+

Full path to the client private key file

+
+ + + + + +
@@ -2916,6 +3039,150 @@

+ + + + +
+
+
+

+ +

+
+ +
+
+ + Type: string
+

Full path to the CA certificate file

+
+ + + + + + +
+
+
+
+
+
+
+

+ +

+
+ +
+
+ + Type: string
+

Full path to the client certificate file

+
+ + + + + + +
+
+
+
+
+
+
+

+ +

+
+ +
+
+ + Type: string
+

Full path to the client private key file

+
+ + + + + +
@@ -4444,18 +4711,18 @@

-
+
-
+

- +

-
+
Type: stringFormat: date-time
-

The timestamp of the endpoint generation

+ ca_file
Type: string
+

Full path to the CA certificate file

-
-
Example:
-
"2019-01-17T14:02:29.364Z"
-
-
-
-
-
-
-
-
- - + -
+
-
+

- +

-
+
Type: object
- - - - - - - -
-
-
-

-

Type: string
+

Full path to the client certificate file

+
+ + + + + + +
+
+
+
+
+
+
+

+ +

+
+ +
+
+ + Type: string
+

Full path to the client private key file

+
+ + + + + + +
+
+
+
+
+
+
+

+ +

+
+ +
+
+ + Type: stringFormat: date-time
+

The timestamp of the endpoint generation

+
+ + + + + +
+
Example:
+
"2019-01-17T14:02:29.364Z"
+
+
+
+
+
+
+
+
+
+
+
+ + + +
+
+
+

+ +

+
+ +
+
+ + Type: object
+ + + + + + + +
+
+
+

+

@@ -6257,18 +6668,18 @@

-
+
-
+

- +

-
+
Type: stringFormat: date-time
-

The timestamp of the endpoint generation

+ ca_file
Type: string
+

Full path to the CA certificate file

-
-
Example:
-
"2019-01-17T14:02:29.364Z"
-
-
-
-
-
-
+
-
+
-
+

- +

-
+

Endpoint Configuration

Type: object
-

Parameters to define a message endpoint

+ from + + + + cert_file
Type: string
+

Full path to the client certificate file

- - No Additional Properties -
+
+
+
+
+
-
+

- +

-
+
Type: string
-

Friendly name for this flow (debugging and diagnostics)

+ key_file
Type: string
+

Full path to the client private key file

@@ -6421,18 +6833,18 @@

-
+
-
+

- +

-
+
Type: enum (of string)
-
-

Must be one of:

-
  • "local"
  • "pubsub"
  • "file"
  • "trace"
  • "mqtt"
-
+ generation
Type: stringFormat: date-time
+

The timestamp of the endpoint generation

+
- +
+
Example:
+
"2019-01-17T14:02:29.364Z"
+
+
-
+
+
+
+
+
-
+

- +

-
-
+
+
+ +

Endpoint Configuration

Type: object
+

Parameters to define a message endpoint

+
+ + No Additional Properties + + + + + + +
+
+
+

+ +

+
+ +
+
+ + Type: string
+

Friendly name for this flow (debugging and diagnostics)

+
+ + + + + + +
+
+
+
+
+
+
+

+ +

+
+ +
+
+ + Type: enum (of string)
+
+

Must be one of:

+
  • "local"
  • "pubsub"
  • "file"
  • "trace"
  • "mqtt"
+
+ + + + + + +
+
+
+
+
+
+
+

+ +

+
+ +
+
+
+
+
+
+
+
+
+

+ +

+
+ +
+
+ + Type: string
+

Full path to the CA certificate file

+
+ + + + + + +
+
+
+
+
+
+
+

+ +

+
+ +
+
+ + Type: string
+

Full path to the client certificate file

+
+ + + + + + +
+
+
+
+
+
+
+

+ +

+
+ +
+
+ + Type: string
+

Full path to the client private key file

+
+ + + + + +
@@ -9648,9 +10389,206 @@

-
+
+
+ + Type: object
+ No Additional Properties + + + + + + +
+
+
+

+ +

+
+ +
+
+ + Type: string
+ + + + + + + +
+
+
+
+
+
+
+

+ +

+
+ +
+
+ + Type: string
+ + + + + + + +
+
+
+
+
+
+
+
+
+
+
+

+ +

+
+ +
Type: object
+ jwt
Type: object
No Additional Properties @@ -9696,18 +10634,18 @@

-
+
-
+

- +

-
+
Type: string
+ audience
Type: string
@@ -9764,18 +10702,26 @@

-
+
+
+
+
+

+
+
+
+
-
+

- +

-
+
Type: string
- + ca_file
Type: string
+

Full path to the CA certificate file

+
@@ -9832,22 +10765,18 @@

-
-
-
-
-
+
-
+

- +

-
+
Type: object
- No Additional Properties + cert_file
Type: string
+

Full path to the client certificate file

+
-
+
+
+
+
+
-
+

- +

-
+
Type: string
- + key_file
Type: string
+

Full path to the client private key file

+
-
-
-
-
-
-
-
-
@@ -11981,6 +12887,171 @@

+ + + + +
+
+
+

+ +

+
+ +
+
+ + Type: string
+

Full path to the CA certificate file

+
+ + + + + + +
+
+
+
+
+
+
+

+ +

+
+ +
+
+ + Type: string
+

Full path to the client certificate file

+
+ + + + + + +
+
+
+
+
+
+
+

+ +

+
+ +
+
+ + Type: string
+

Full path to the client private key file

+
+ + + + + +
diff --git a/gencode/docs/configuration_pubber.html b/gencode/docs/configuration_pubber.html index b1ede1ae25..78c841a05c 100644 --- a/gencode/docs/configuration_pubber.html +++ b/gencode/docs/configuration_pubber.html @@ -1252,6 +1252,129 @@

+ + + + +
+
+
+

+ +

+
+ +
+
+ + Type: string
+

Full path to the CA certificate file

+
+ + + + + + +
+
+
+
+
+
+
+

+ +

+
+ +
+
+ + Type: string
+

Full path to the client certificate file

+
+ + + + + + +
+
+
+
+
+
+
+

+ +

+
+ +
+
+ + Type: string
+

Full path to the client private key file

+
+ + + + + +
diff --git a/gencode/docs/persistent_device.html b/gencode/docs/persistent_device.html index 15d4d7f913..2020efb17c 100644 --- a/gencode/docs/persistent_device.html +++ b/gencode/docs/persistent_device.html @@ -1252,6 +1252,129 @@

+ + + + +
+
+
+

+ +

+
+ +
+
+ + Type: string
+

Full path to the CA certificate file

+
+ + + + + + +
+
+
+
+
+
+
+

+ +

+
+ +
+
+ + Type: string
+

Full path to the client certificate file

+
+ + + + + + +
+
+
+
+
+
+
+

+ +

+
+ +
+
+ + Type: string
+

Full path to the client private key file

+
+ + + + + +
diff --git a/gencode/java/udmi/schema/EndpointConfiguration.java b/gencode/java/udmi/schema/EndpointConfiguration.java index ebfba2835a..972fc7f29b 100644 --- a/gencode/java/udmi/schema/EndpointConfiguration.java +++ b/gencode/java/udmi/schema/EndpointConfiguration.java @@ -43,6 +43,9 @@ "keyBytes", "algorithm", "auth_provider", + "ca_file", + "cert_file", + "key_file", "generation" }) public class EndpointConfiguration { @@ -173,6 +176,27 @@ public class EndpointConfiguration { public String algorithm; @JsonProperty("auth_provider") public Auth_provider auth_provider; + /** + * Full path to the CA certificate file + * + */ + @JsonProperty("ca_file") + @JsonPropertyDescription("Full path to the CA certificate file") + public String ca_file; + /** + * Full path to the client certificate file + * + */ + @JsonProperty("cert_file") + @JsonPropertyDescription("Full path to the client certificate file") + public String cert_file; + /** + * Full path to the client private key file + * + */ + @JsonProperty("key_file") + @JsonPropertyDescription("Full path to the client private key file") + public String key_file; /** * The timestamp of the endpoint generation * @@ -184,30 +208,33 @@ public class EndpointConfiguration { @Override public int hashCode() { int result = 1; - result = ((result* 31)+((this.generation == null)? 0 :this.generation.hashCode())); result = ((result* 31)+((this.keyBytes == null)? 0 :this.keyBytes.hashCode())); + result = ((result* 31)+((this.ca_file == null)? 0 :this.ca_file.hashCode())); + result = ((result* 31)+((this.key_file == null)? 0 :this.key_file.hashCode())); + result = ((result* 31)+((this.cert_file == null)? 0 :this.cert_file.hashCode())); result = ((result* 31)+((this.side_id == null)? 0 :this.side_id.hashCode())); - result = ((result* 31)+((this.transport == null)? 0 :this.transport.hashCode())); - result = ((result* 31)+((this.publish_delay_sec == null)? 0 :this.publish_delay_sec.hashCode())); result = ((result* 31)+((this.error == null)? 0 :this.error.hashCode())); - result = ((result* 31)+((this.config_sync_sec == null)? 0 :this.config_sync_sec.hashCode())); result = ((result* 31)+((this.deviceId == null)? 0 :this.deviceId.hashCode())); result = ((result* 31)+((this.client_id == null)? 0 :this.client_id.hashCode())); result = ((result* 31)+((this.enabled == null)? 0 :this.enabled.hashCode())); result = ((result* 31)+((this.capacity == null)? 0 :this.capacity.hashCode())); - result = ((result* 31)+((this.send_id == null)? 0 :this.send_id.hashCode())); result = ((result* 31)+((this.protocol == null)? 0 :this.protocol.hashCode())); result = ((result* 31)+((this.hostname == null)? 0 :this.hostname.hashCode())); result = ((result* 31)+((this.payload == null)? 0 :this.payload.hashCode())); - result = ((result* 31)+((this.port == null)? 0 :this.port.hashCode())); result = ((result* 31)+((this.topic_prefix == null)? 0 :this.topic_prefix.hashCode())); + result = ((result* 31)+((this.recv_id == null)? 0 :this.recv_id.hashCode())); + result = ((result* 31)+((this.algorithm == null)? 0 :this.algorithm.hashCode())); + result = ((result* 31)+((this.generation == null)? 0 :this.generation.hashCode())); + result = ((result* 31)+((this.transport == null)? 0 :this.transport.hashCode())); + result = ((result* 31)+((this.publish_delay_sec == null)? 0 :this.publish_delay_sec.hashCode())); + result = ((result* 31)+((this.config_sync_sec == null)? 0 :this.config_sync_sec.hashCode())); + result = ((result* 31)+((this.send_id == null)? 0 :this.send_id.hashCode())); + result = ((result* 31)+((this.port == null)? 0 :this.port.hashCode())); result = ((result* 31)+((this.name == null)? 0 :this.name.hashCode())); result = ((result* 31)+((this.periodic_sec == null)? 0 :this.periodic_sec.hashCode())); result = ((result* 31)+((this.noConfigAck == null)? 0 :this.noConfigAck.hashCode())); - result = ((result* 31)+((this.recv_id == null)? 0 :this.recv_id.hashCode())); result = ((result* 31)+((this.gatewayId == null)? 0 :this.gatewayId.hashCode())); result = ((result* 31)+((this.auth_provider == null)? 0 :this.auth_provider.hashCode())); - result = ((result* 31)+((this.algorithm == null)? 0 :this.algorithm.hashCode())); return result; } @@ -220,7 +247,7 @@ public boolean equals(Object other) { return false; } EndpointConfiguration rhs = ((EndpointConfiguration) other); - return (((((((((((((((((((((((((this.generation == rhs.generation)||((this.generation!= null)&&this.generation.equals(rhs.generation)))&&((this.keyBytes == rhs.keyBytes)||((this.keyBytes!= null)&&this.keyBytes.equals(rhs.keyBytes))))&&((this.side_id == rhs.side_id)||((this.side_id!= null)&&this.side_id.equals(rhs.side_id))))&&((this.transport == rhs.transport)||((this.transport!= null)&&this.transport.equals(rhs.transport))))&&((this.publish_delay_sec == rhs.publish_delay_sec)||((this.publish_delay_sec!= null)&&this.publish_delay_sec.equals(rhs.publish_delay_sec))))&&((this.error == rhs.error)||((this.error!= null)&&this.error.equals(rhs.error))))&&((this.config_sync_sec == rhs.config_sync_sec)||((this.config_sync_sec!= null)&&this.config_sync_sec.equals(rhs.config_sync_sec))))&&((this.deviceId == rhs.deviceId)||((this.deviceId!= null)&&this.deviceId.equals(rhs.deviceId))))&&((this.client_id == rhs.client_id)||((this.client_id!= null)&&this.client_id.equals(rhs.client_id))))&&((this.enabled == rhs.enabled)||((this.enabled!= null)&&this.enabled.equals(rhs.enabled))))&&((this.capacity == rhs.capacity)||((this.capacity!= null)&&this.capacity.equals(rhs.capacity))))&&((this.send_id == rhs.send_id)||((this.send_id!= null)&&this.send_id.equals(rhs.send_id))))&&((this.protocol == rhs.protocol)||((this.protocol!= null)&&this.protocol.equals(rhs.protocol))))&&((this.hostname == rhs.hostname)||((this.hostname!= null)&&this.hostname.equals(rhs.hostname))))&&((this.payload == rhs.payload)||((this.payload!= null)&&this.payload.equals(rhs.payload))))&&((this.port == rhs.port)||((this.port!= null)&&this.port.equals(rhs.port))))&&((this.topic_prefix == rhs.topic_prefix)||((this.topic_prefix!= null)&&this.topic_prefix.equals(rhs.topic_prefix))))&&((this.name == rhs.name)||((this.name!= null)&&this.name.equals(rhs.name))))&&((this.periodic_sec == rhs.periodic_sec)||((this.periodic_sec!= null)&&this.periodic_sec.equals(rhs.periodic_sec))))&&((this.noConfigAck == rhs.noConfigAck)||((this.noConfigAck!= null)&&this.noConfigAck.equals(rhs.noConfigAck))))&&((this.recv_id == rhs.recv_id)||((this.recv_id!= null)&&this.recv_id.equals(rhs.recv_id))))&&((this.gatewayId == rhs.gatewayId)||((this.gatewayId!= null)&&this.gatewayId.equals(rhs.gatewayId))))&&((this.auth_provider == rhs.auth_provider)||((this.auth_provider!= null)&&this.auth_provider.equals(rhs.auth_provider))))&&((this.algorithm == rhs.algorithm)||((this.algorithm!= null)&&this.algorithm.equals(rhs.algorithm)))); + return ((((((((((((((((((((((((((((this.keyBytes == rhs.keyBytes)||((this.keyBytes!= null)&&this.keyBytes.equals(rhs.keyBytes)))&&((this.ca_file == rhs.ca_file)||((this.ca_file!= null)&&this.ca_file.equals(rhs.ca_file))))&&((this.key_file == rhs.key_file)||((this.key_file!= null)&&this.key_file.equals(rhs.key_file))))&&((this.cert_file == rhs.cert_file)||((this.cert_file!= null)&&this.cert_file.equals(rhs.cert_file))))&&((this.side_id == rhs.side_id)||((this.side_id!= null)&&this.side_id.equals(rhs.side_id))))&&((this.error == rhs.error)||((this.error!= null)&&this.error.equals(rhs.error))))&&((this.deviceId == rhs.deviceId)||((this.deviceId!= null)&&this.deviceId.equals(rhs.deviceId))))&&((this.client_id == rhs.client_id)||((this.client_id!= null)&&this.client_id.equals(rhs.client_id))))&&((this.enabled == rhs.enabled)||((this.enabled!= null)&&this.enabled.equals(rhs.enabled))))&&((this.capacity == rhs.capacity)||((this.capacity!= null)&&this.capacity.equals(rhs.capacity))))&&((this.protocol == rhs.protocol)||((this.protocol!= null)&&this.protocol.equals(rhs.protocol))))&&((this.hostname == rhs.hostname)||((this.hostname!= null)&&this.hostname.equals(rhs.hostname))))&&((this.payload == rhs.payload)||((this.payload!= null)&&this.payload.equals(rhs.payload))))&&((this.topic_prefix == rhs.topic_prefix)||((this.topic_prefix!= null)&&this.topic_prefix.equals(rhs.topic_prefix))))&&((this.recv_id == rhs.recv_id)||((this.recv_id!= null)&&this.recv_id.equals(rhs.recv_id))))&&((this.algorithm == rhs.algorithm)||((this.algorithm!= null)&&this.algorithm.equals(rhs.algorithm))))&&((this.generation == rhs.generation)||((this.generation!= null)&&this.generation.equals(rhs.generation))))&&((this.transport == rhs.transport)||((this.transport!= null)&&this.transport.equals(rhs.transport))))&&((this.publish_delay_sec == rhs.publish_delay_sec)||((this.publish_delay_sec!= null)&&this.publish_delay_sec.equals(rhs.publish_delay_sec))))&&((this.config_sync_sec == rhs.config_sync_sec)||((this.config_sync_sec!= null)&&this.config_sync_sec.equals(rhs.config_sync_sec))))&&((this.send_id == rhs.send_id)||((this.send_id!= null)&&this.send_id.equals(rhs.send_id))))&&((this.port == rhs.port)||((this.port!= null)&&this.port.equals(rhs.port))))&&((this.name == rhs.name)||((this.name!= null)&&this.name.equals(rhs.name))))&&((this.periodic_sec == rhs.periodic_sec)||((this.periodic_sec!= null)&&this.periodic_sec.equals(rhs.periodic_sec))))&&((this.noConfigAck == rhs.noConfigAck)||((this.noConfigAck!= null)&&this.noConfigAck.equals(rhs.noConfigAck))))&&((this.gatewayId == rhs.gatewayId)||((this.gatewayId!= null)&&this.gatewayId.equals(rhs.gatewayId))))&&((this.auth_provider == rhs.auth_provider)||((this.auth_provider!= null)&&this.auth_provider.equals(rhs.auth_provider)))); } public enum Protocol { diff --git a/gencode/presentation/presentation.json b/gencode/presentation/presentation.json index 3fbe159045..f960d84fa6 100644 --- a/gencode/presentation/presentation.json +++ b/gencode/presentation/presentation.json @@ -440,6 +440,27 @@ "section": "cloud_iot_config", "type": "string" }, + "reflector_endpoint.ca_file": { + "display": "show", + "style": "bold", + "section": "cloud_iot_config", + "type": "string", + "description": "Full path to the CA certificate file" + }, + "reflector_endpoint.cert_file": { + "display": "show", + "style": "bold", + "section": "cloud_iot_config", + "type": "string", + "description": "Full path to the client certificate file" + }, + "reflector_endpoint.key_file": { + "display": "show", + "style": "bold", + "section": "cloud_iot_config", + "type": "string", + "description": "Full path to the client private key file" + }, "reflector_endpoint.generation": { "display": "show", "style": "bold", @@ -592,6 +613,27 @@ "section": "cloud_iot_config", "type": "string" }, + "device_endpoint.ca_file": { + "display": "show", + "style": "bold", + "section": "cloud_iot_config", + "type": "string", + "description": "Full path to the CA certificate file" + }, + "device_endpoint.cert_file": { + "display": "show", + "style": "bold", + "section": "cloud_iot_config", + "type": "string", + "description": "Full path to the client certificate file" + }, + "device_endpoint.key_file": { + "display": "show", + "style": "bold", + "section": "cloud_iot_config", + "type": "string", + "description": "Full path to the client private key file" + }, "device_endpoint.generation": { "display": "show", "style": "bold", diff --git a/gencode/python/udmi/schema/configuration_endpoint.py b/gencode/python/udmi/schema/configuration_endpoint.py index 9c1d5d3d77..954286596b 100644 --- a/gencode/python/udmi/schema/configuration_endpoint.py +++ b/gencode/python/udmi/schema/configuration_endpoint.py @@ -69,4 +69,7 @@ class EndpointConfiguration(DataModel): keyBytes: Optional[Any] = None algorithm: Optional[str] = None auth_provider: Optional[AuthProvider] = None + ca_file: Optional[str] = None + cert_file: Optional[str] = None + key_file: Optional[str] = None generation: Optional[str] = None diff --git a/schema/configuration_endpoint.json b/schema/configuration_endpoint.json index fe8dcb8700..9397e59d14 100644 --- a/schema/configuration_endpoint.json +++ b/schema/configuration_endpoint.json @@ -224,6 +224,30 @@ } } }, + "ca_file": { + "description": "Full path to the CA certificate file", + "type": "string", + "$presentation": { + "display": "show", + "style": "bold" + } + }, + "cert_file": { + "description": "Full path to the client certificate file", + "type": "string", + "$presentation": { + "display": "show", + "style": "bold" + } + }, + "key_file": { + "description": "Full path to the client private key file", + "type": "string", + "$presentation": { + "display": "show", + "style": "bold" + } + }, "generation": { "description": "The timestamp of the endpoint generation", "type": "string", diff --git a/udmis/etc/local_pod.json b/udmis/etc/local_pod.json index 2ec9a19c1e..f0ecf17c80 100644 --- a/udmis/etc/local_pod.json +++ b/udmis/etc/local_pod.json @@ -4,6 +4,9 @@ "protocol": "mqtt", "transport": "ssl", "hostname": "localhost", + "ca_file": "/etc/mosquitto/certs/ca.crt", + "cert_file": "/etc/mosquitto/certs/rsa_private.crt", + "key_file": "/etc/mosquitto/certs/rsa_private.pem", "auth_provider": { "basic": { "username": "rocket", @@ -51,10 +54,13 @@ "transport": "ssl", "hostname": "localhost", "port": 8883, + "ca_file": "/etc/mosquitto/certs/ca.crt", + "cert_file": "/etc/mosquitto/certs/rsa_private.crt", + "key_file": "/etc/mosquitto/certs/rsa_private.pem", "auth_provider": { "basic": { - "username": "rocket", - "password": "monkey" + "username": "scrumptious", + "password": "aardvark" } } } diff --git a/udmis/src/main/java/com/google/bos/udmi/service/access/ImplicitIotAccessProvider.java b/udmis/src/main/java/com/google/bos/udmi/service/access/ImplicitIotAccessProvider.java index 50788538b0..c0bbc04f27 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/access/ImplicitIotAccessProvider.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/access/ImplicitIotAccessProvider.java @@ -50,6 +50,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.stream.Collectors; import udmi.schema.Auth_provider; @@ -76,11 +77,13 @@ *
  • use_password: Sets the password for all devices to the specified value. * This is used when authentication is handled by an external proxy, and Mosquitto * still needs to enforce ACLs based on username.
  • + *
  • disable_logging: If set to true, disables tailing the mosquitto log file.
  • * */ public class ImplicitIotAccessProvider extends IotAccessBase { private static final String CONFIG_VER_KEY = "config_ver"; + private static final String DISABLE_LOGGING_KEY = "disable_logging"; private static final String USE_PASSWORD_KEY = "use_password"; private static final String BROKER_USER_KEY = "broker_user"; private static final String BROKER_PASS_KEY = "broker_pass"; @@ -104,6 +107,7 @@ public class ImplicitIotAccessProvider extends IotAccessBase { private static final String CONFIG_SUFFIX = "/config"; private static final String METADATA_STR_KEY = "metadata_str"; private static final String RESOURCE_TYPE_PROPERTY = "resource_type"; + private static final int DEVICE_FETCH_BATCH_SIZE = 100; private final boolean enabled; private final String usePassword; private final ConnectionBroker broker; @@ -140,13 +144,27 @@ public ImplicitIotAccessProvider(IotAccess iotAccess) { brokerHost = ofNullable(endpointConfig.hostname).orElse("localhost"); brokerPort = ofNullable(endpointConfig.port).map(Object::toString).orElse("8883"); } else { + endpointConfig = new EndpointConfiguration(); brokerUser = options.get(BROKER_USER_KEY); brokerPass = options.get(BROKER_PASS_KEY); brokerHost = ofNullable(options.get(BROKER_HOST_KEY)).orElse("localhost"); brokerPort = ofNullable(options.get(BROKER_PORT_KEY)).orElse("8883"); + + endpointConfig.hostname = brokerHost; + endpointConfig.port = Integer.parseInt(brokerPort); + endpointConfig.transport = "1883".equals(brokerPort) ? Transport.TCP : Transport.SSL; + endpointConfig.client_id = clientId; + + if (isPublishEnabled()) { + endpointConfig.auth_provider = new Auth_provider(); + endpointConfig.auth_provider.basic = new Basic(); + endpointConfig.auth_provider.basic.username = brokerUser; + endpointConfig.auth_provider.basic.password = brokerPass; + } } - broker = new MosquittoBroker(this); + boolean disableLogging = TRUE_OPTION.equals(options.get(DISABLE_LOGGING_KEY)); + broker = new MosquittoBroker(this, endpointConfig, disableLogging); connLogger = broker.addEventListener(CLIENT_PREFIX, this::brokerHandler); } @@ -158,19 +176,34 @@ public static String hashedDeviceId(String registryId, String deviceId) { return String.valueOf(Math.abs(Objects.hash(registryId, deviceId))); } - private void bindDevicesToGateway(String registryId, String gatewayId, CloudModel cloudModel) { + private void bindDevicesToGateway( + String registryId, String gatewayId, CloudModel cloudModel, Consumer progress) { Set deviceIds = ImmutableSet.copyOf(cloudModel.gateway.proxy_ids); + AtomicInteger count = new AtomicInteger(); + int total = deviceIds.size(); deviceIds.forEach(deviceId -> { + int current = count.incrementAndGet(); + if (current % 50 == 0 && progress != null) { + progress.accept(format("Binding %d/%d devices to %s...", current, total, gatewayId)); + } registryDeviceRef(registryId, deviceId).put(BOUND_TO_KEY, gatewayId); + gatewayBoundRef(registryId, gatewayId).put(deviceId, "bound"); broker.bindGateway(clientId(registryId, gatewayId), clientId(registryId, deviceId)); }); } private void unbindDevicesFromGateway(String registryId, String gatewayId, - CloudModel cloudModel) { + CloudModel cloudModel, Consumer progress) { Set deviceIds = ImmutableSet.copyOf(cloudModel.gateway.proxy_ids); + AtomicInteger count = new AtomicInteger(); + int total = deviceIds.size(); deviceIds.forEach(deviceId -> { + int current = count.incrementAndGet(); + if (current % 50 == 0 && progress != null) { + progress.accept(format("Unbinding %d/%d devices from %s...", current, total, gatewayId)); + } registryDeviceRef(registryId, deviceId).delete(BOUND_TO_KEY); + gatewayBoundRef(registryId, gatewayId).delete(deviceId); broker.unbindGateway(clientId(registryId, gatewayId), clientId(registryId, deviceId)); }); } @@ -220,12 +253,15 @@ private void createDevice(String registryId, String deviceId, CloudModel cloudMo private void deleteDevice(String registryId, String deviceId, CloudModel cloudModel) { DataRef properties = registryDeviceRef(registryId, deviceId); + String gatewayId = properties.get(BOUND_TO_KEY); properties.entries().keySet().forEach(properties::delete); registryDevicesRef(registryId).delete(deviceId); - broker.authorize(clientId(registryId, deviceId), null); - String gatewayId = properties.get(BOUND_TO_KEY); + if (gatewayId == null) { + broker.authorize(clientId(registryId, deviceId), null); + } if (gatewayId != null) { + gatewayBoundRef(registryId, gatewayId).delete(deviceId); broker.unbindGateway(clientId(registryId, gatewayId), clientId(registryId, deviceId)); } } @@ -242,11 +278,21 @@ private CloudModel getReply(String registryId, String deviceId, CloudModel reque private DataRef mungeDevice(String registryId, String deviceId, Map map) { DataRef properties = registryDeviceRef(registryId, deviceId); - map.forEach((key, value) -> - ifNotNullThen(value, v -> properties.put(key, value), () -> properties.delete(key))); + Map puts = map.entrySet().stream() + .filter(e -> e.getValue() != null) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + Set deletes = map.entrySet().stream() + .filter(e -> e.getValue() == null) + .map(Map.Entry::getKey) + .collect(Collectors.toSet()); + properties.update(puts, deletes); if (map.containsKey(AUTH_PASSWORD_PROPERTY)) { - broker.authorize(clientId(registryId, deviceId), map.get(AUTH_PASSWORD_PROPERTY)); + boolean isAuthorized = properties.get(AUTH_KEY_PROPERTY) != null + || properties.get(AUTH_TYPE_PROPERTY) != null; + if (isAuthorized) { + broker.authorize(clientId(registryId, deviceId), map.get(AUTH_PASSWORD_PROPERTY)); + } } return properties; } @@ -259,6 +305,10 @@ private DataRef registryDevicesRef(String registryId) { return database.ref().registry(registryId).collection(DEVICES_ACTIVE); } + private DataRef gatewayBoundRef(String registryId, String gatewayId) { + return database.ref().registry(registryId).device(gatewayId).collection("bound_devices"); + } + private void sendConfigUpdate(String registryId, String deviceId, String config) { if (isPublishEnabled()) { publishMqtt(registryId, deviceId, config); @@ -349,20 +399,7 @@ public void activate() { private void connectMqttClient() { info("Initializing SimpleMqttPipe for ImplicitIotAccessProvider"); try { - if (endpointConfig == null) { - endpointConfig = new EndpointConfiguration(); - endpointConfig.hostname = brokerHost; - endpointConfig.port = Integer.parseInt(brokerPort); - endpointConfig.transport = "1883".equals(brokerPort) ? Transport.TCP : Transport.SSL; - endpointConfig.client_id = clientId; - - if (isPublishEnabled()) { - endpointConfig.auth_provider = new Auth_provider(); - endpointConfig.auth_provider.basic = new Basic(); - endpointConfig.auth_provider.basic.username = brokerUser; - endpointConfig.auth_provider.basic.password = brokerPass; - } - } + if (endpointConfig.send_id == null) { endpointConfig.send_id = "implicit"; @@ -371,7 +408,8 @@ private void connectMqttClient() { mqttPipe = new SimpleMqttPipe(endpointConfig); info("Initialized SimpleMqttPipe"); } catch (Exception e) { - error("Failed to initialize SimpleMqttPipe: " + friendlyStackTrace(e)); + error("Failed to initialize SimpleMqttPipe connecting to broker %s:%s: %s", + brokerHost, brokerPort, friendlyStackTrace(e)); } } @@ -392,7 +430,6 @@ public Entry fetchConfig(String registryId, String deviceId) { @Override public CloudModel fetchDevice(String registryId, String deviceId) { - touchDeviceEntry(registryId, deviceId); Map properties = registryDeviceRef(registryId, deviceId).entries(); if (properties == null) { return null; @@ -418,9 +455,11 @@ public CloudModel fetchDevice(String registryId, String deviceId) { cloudModel.password = properties.get(AUTH_PASSWORD_PROPERTY); - cloudModel.gateway = new GatewayModel(); - cloudModel.gateway.proxy_ids = - listBoundDevices(registryId, deviceId).keySet().stream().toList(); + if (GATEWAY.toString().equals(properties.get(RESOURCE_TYPE_PROPERTY))) { + cloudModel.gateway = new GatewayModel(); + cloudModel.gateway.proxy_ids = + listBoundDevices(registryId, deviceId).keySet().stream().toList(); + } cloudModel.operation = READ; return cloudModel; } @@ -456,16 +495,47 @@ public boolean isEnabled() { @Override public CloudModel listDevices(String registryId, Consumer progress) { Map entries = registryDevicesRef(registryId).entries(); - ifNotNullThen(progress, p -> p.accept(format("Fetched %d devices.", entries.size()))); + List deviceIds = entries.keySet().stream().toList(); + int total = deviceIds.size(); + ifNotNullThen(progress, p -> p.accept(format("Fetching %d devices...", total))); + Map deviceIdsMap = new ConcurrentHashMap<>(); + for (int i = 0; i < total; i += DEVICE_FETCH_BATCH_SIZE) { + List batch = deviceIds.subList(i, Math.min(i + DEVICE_FETCH_BATCH_SIZE, total)); + batch.parallelStream().forEach(id -> { + CloudModel partial = fetchDevicePartial(registryId, id); + if (partial != null) { + deviceIdsMap.put(id, partial); + } + }); + int currentCount = Math.min(i + DEVICE_FETCH_BATCH_SIZE, total); + ifNotNullThen(progress, p -> p.accept(format("Fetched %d devices...", currentCount))); + } CloudModel cloudModel = new CloudModel(); - cloudModel.device_ids = entries.keySet().stream().collect( - Collectors.toMap(id -> id, id -> fetchDevice(registryId, id))); + cloudModel.device_ids = deviceIdsMap; cloudModel.operation = READ; return cloudModel; } + private CloudModel fetchDevicePartial(String registryId, String deviceId) { + Map properties = registryDeviceRef(registryId, deviceId).entries(); + if (properties == null) { + return null; + } + CloudModel cloudModel = new CloudModel(); + cloudModel.num_id = properties.get(NUM_ID_PROPERTY); + String authType = properties.get(AUTH_TYPE_PROPERTY); + if (authType != null) { + cloudModel.auth_type = CloudModel.Auth_type.fromValue(authType); + } + cloudModel.resource_type = ofNullable(properties.get(RESOURCE_TYPE_PROPERTY)) + .map(Resource_type::fromValue).orElse(DIRECT); + cloudModel.blocked = "true".equals(properties.get(BLOCKED_PROPERTY)) ? true : null; + cloudModel.updated_time = JsonUtil.getDate(properties.get(CREATED_AT_PROPERTY)); + return cloudModel; + } + private Map listBoundDevices(String registryId, String gatewayId) { - Set deviceIds = registryDevicesRef(registryId).entries().keySet(); + Set deviceIds = gatewayBoundRef(registryId, gatewayId).entries().keySet(); Map devices = deviceIds.stream().filter(deviceId -> { String boundTo = registryDeviceRef(registryId, deviceId).get(BOUND_TO_KEY); return gatewayId.equals(boundTo); @@ -491,13 +561,15 @@ public CloudModel modelDevice(String registryId, String deviceId, CloudModel clo case UPDATE -> updateDevice(registryId, deviceId, cloudModel); case DELETE -> deleteDevice(registryId, deviceId, cloudModel); case MODIFY -> modifyDevice(registryId, deviceId, cloudModel); - case BIND -> bindDevicesToGateway(registryId, deviceId, cloudModel); - case UNBIND -> unbindDevicesFromGateway(registryId, deviceId, cloudModel); + case BIND -> bindDevicesToGateway(registryId, deviceId, cloudModel, progress); + case UNBIND -> unbindDevicesFromGateway(registryId, deviceId, cloudModel, progress); case BLOCK -> blockDevice(registryId, deviceId, cloudModel); default -> throw new RuntimeException("Unknown device operation " + operation); } return getReply(registryId, deviceId, cloudModel, deleteNumId); } catch (Exception e) { + error("Error during modelDevice %s for %s/%s: %s", + operation, registryId, deviceId, friendlyStackTrace(e)); throw new RuntimeException(format("While %sing %s/%s", operation, registryId, deviceId), e); } } @@ -527,11 +599,18 @@ public void saveState(String registryId, String deviceId, String stateBlob) { @Override public void sendCommandBase(Envelope baseEnvelope, SubFolder folder, String message) { - Envelope envelope = deepCopy(baseEnvelope); - envelope.subFolder = folder; - envelope.subType = SubType.COMMANDS; - envelope.source = IotProvider.IMPLICIT.value(); - reflect.getDispatcher().withEnvelope(envelope).publish(asMap(message)); + try { + Envelope envelope = deepCopy(baseEnvelope); + envelope.subFolder = folder; + envelope.subType = SubType.COMMANDS; + envelope.source = IotProvider.IMPLICIT.value(); + reflect.getDispatcher().withEnvelope(envelope).publish(asMap(message)); + } catch (Exception e) { + error("Failed to send command for %s/%s: %s", + baseEnvelope.deviceRegistryId, baseEnvelope.deviceId, friendlyStackTrace(e)); + throw new RuntimeException("While sending command for " + + baseEnvelope.deviceRegistryId + "/" + baseEnvelope.deviceId, e); + } } @Override @@ -544,6 +623,7 @@ public void shutdown() { } }); connLogger.cancel(true); + broker.shutdown(); super.shutdown(); } diff --git a/udmis/src/main/java/com/google/bos/udmi/service/bridge/MqttToPubSubBridge.java b/udmis/src/main/java/com/google/bos/udmi/service/bridge/MqttToPubSubBridge.java index 7f0a82539b..3d9de79cb3 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/bridge/MqttToPubSubBridge.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/bridge/MqttToPubSubBridge.java @@ -3,8 +3,8 @@ import static com.google.common.util.concurrent.MoreExecutors.directExecutor; import com.google.api.core.ApiFuture; +import com.google.bos.udmi.service.messaging.impl.SimpleMqttPipe; import com.google.cloud.pubsub.v1.Publisher; -import com.google.common.base.Splitter; import com.google.protobuf.ByteString; import com.google.pubsub.v1.ProjectTopicName; import com.google.pubsub.v1.PubsubMessage; @@ -22,11 +22,8 @@ import java.security.cert.CertificateFactory; import java.security.cert.X509Certificate; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; -import java.util.regex.Matcher; -import java.util.regex.Pattern; import javax.net.ssl.KeyManagerFactory; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLSocketFactory; @@ -68,7 +65,6 @@ */ public final class MqttToPubSubBridge { - private static final Pattern TOPIC_PATTERN = Pattern.compile("/r/([^/]+)/d/([^/]+)/?(.*)"); private static final Logger logger = LoggerFactory.getLogger(MqttToPubSubBridge.class); /** @@ -227,30 +223,16 @@ public void messageArrived(String topic, MqttMessage message) { logger.info( "MQTT Message Received - Topic: {}, Payload Length: {}", topic, payload.length); - Matcher matcher = TOPIC_PATTERN.matcher(topic); - String registryId = "unknown"; - String deviceId = "unknown"; - String topicSuffix = ""; - if (matcher.matches()) { - registryId = matcher.group(1); - deviceId = matcher.group(2); - topicSuffix = matcher.group(3); - } else { - logger.warn("Could not parse registry/device from topic: {}", topic); + Map attributes; + try { + attributes = new HashMap<>(SimpleMqttPipe.parseEnvelopeTopic(topic)); + } catch (Exception e) { + logger.warn("Could not parse envelope from topic: {}", topic, e); + attributes = new HashMap<>(); } - - // Prepare Pub/Sub message - Map attributes = new HashMap<>(); attributes.put("mqttTopic", topic); - attributes.put("deviceId", deviceId); - attributes.put("deviceRegistryId", registryId); - - if (topicSuffix != null && topicSuffix.startsWith("events/")) { - List parts = Splitter.on('/').splitToList(topicSuffix); - if (parts.size() >= 2) { - attributes.put("subFolder", parts.get(1)); - } - } + attributes.putIfAbsent("deviceId", "unknown"); + attributes.putIfAbsent("deviceRegistryId", "unknown"); ByteString data = ByteString.copyFrom(payload); PubsubMessage.Builder pubsubMessageBuilder = diff --git a/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/MessageBase.java b/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/MessageBase.java index 9c64c318ea..1cdfa3e03e 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/MessageBase.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/MessageBase.java @@ -62,7 +62,7 @@ public abstract class MessageBase extends ContainerBase implements MessagePipe { public static final String INVALID_ENVELOPE_KEY = "invalid"; - public static final int EXECUTION_THREADS = 4; + public static final int EXECUTION_THREADS = 100; public static final String ERROR_MESSAGE_MARKER = "error-mark"; public static final String PUBLISH_STATS = "publish"; public static final String RECEIVE_STATS = "receive"; @@ -73,7 +73,7 @@ public abstract class MessageBase extends ContainerBase implements MessagePipe { private static final Set HANDLED_QUEUES = new HashSet<>(); private static final long DEFAULT_POLL_TIME_SEC = 1; private static final long AWAIT_TERMINATION_SEC = 10; - private static final int DEFAULT_CAPACITY = 1000; + private static final int DEFAULT_CAPACITY = 2000; protected final int queueCapacity; protected final long publishDelaySec; private final ExecutorService executor = Executors.newFixedThreadPool(EXECUTION_THREADS); diff --git a/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/SimpleMqttPipe.java b/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/SimpleMqttPipe.java index 5aa7c59b9e..9b8cd79ff8 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/SimpleMqttPipe.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/SimpleMqttPipe.java @@ -56,9 +56,10 @@ public class SimpleMqttPipe extends MessageBase { private static final String BROKER_URL_FORMAT = "%s://%s:%s"; private static final long RECONNECT_SEC = 10; private static final int DEFAULT_PORT = 8883; + private static final String LEGACY_TOPIC_PREFIX = "/devices/"; + private static final String IMPLICIT_TOPIC_PREFIX = "/r/"; private static final Envelope EXCEPTION_ENVELOPE = makeExceptionEnvelope(); private static final String SUB_BASE_FORMAT = "/r/+/d/+/%s"; - private static final String SSL_SECRETS_DIR = System.getenv("SSL_SECRETS_DIR"); private static final String DEFAULT_NAMESPACE = "default"; private static final long CONNECT_TIMEOUT_SEC = 10; private final String autoId = format("mqtt-%08x", (long) (Math.random() * 0x100000000L)); @@ -89,10 +90,22 @@ public SimpleMqttPipe(EndpointConfiguration config) { (publishMessages && sendId.startsWith(SEND_CHANNEL_PREFIX)) ? ("/" + sendId) : ""; clientId = ofNullable(config.client_id).orElse(autoId); - File secretsDir = ifTrueGet(isNotEmpty(SSL_SECRETS_DIR), () -> new File(SSL_SECRETS_DIR)); - certManager = ifNotNullGet(secretsDir, - secrets -> new CertManager(new File(secrets, CertManager.CA_CERT_FILE), secrets, - endpoint.transport, endpoint.auth_provider.basic.password, this::info)); + boolean useSsl = Transport.SSL.equals(endpoint.transport) + || (endpoint.port != null && endpoint.port == 8883); + if (useSsl) { + checkState(isNotEmpty(endpoint.ca_file), + "Missing required ca_file in endpoint configuration for SSL connection"); + checkState(isNotEmpty(endpoint.cert_file), + "Missing required cert_file in endpoint configuration for SSL connection"); + checkState(isNotEmpty(endpoint.key_file), + "Missing required key_file in endpoint configuration for SSL connection"); + String pass = ifNotNullGet(endpoint.auth_provider, + p -> ifNotNullGet(p.basic, b -> b.password)); + certManager = new CertManager(new File(endpoint.ca_file), new File(endpoint.cert_file), + new File(endpoint.key_file), endpoint.transport, pass, this::info); + } else { + certManager = null; + } mqttClient = createMqttClient(); tryConnect(false); scheduledFuture = Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay( @@ -120,35 +133,81 @@ private static Envelope makeExceptionEnvelope() { return envelope; } - static Map parseEnvelopeTopic(String topic) { + public static boolean isLegacyTopic(String topic) { + return topic != null && topic.startsWith(LEGACY_TOPIC_PREFIX); + } + + private static Map parseLegacyTopic(String topic) { + if (topic == null) { + throw new IllegalArgumentException("Topic cannot be null"); + } + String cleanTopic = topic.startsWith("/") ? topic.substring(1) : topic; + String[] parts = cleanTopic.split("/"); + Envelope envelope = new Envelope(); + if (parts.length >= 2 && !parts[1].isEmpty()) { + envelope.deviceId = nullAsNull(parts[1]); + } + if (parts.length >= 3 && !parts[2].isEmpty()) { + envelope.subType = convertSubType(parts[2]); + } + if (parts.length >= 4 && !parts[3].isEmpty()) { + // TODO: technically the subfolder is the remainder including all slashes until the end. + envelope.subFolder = convertSubFolder(parts[3]); + } + return toStringMap(envelope); + } + + private static Map parseImplicitTopic(String topic) { + if (topic == null) { + throw new IllegalArgumentException("Topic cannot be null"); + } + // 0/1/2 /3/4 /5 [/6 [/7 ]] + // /r/REGISTRY/d/DEVICE/TYPE[/FOLDER[/GATEWAY]] + String[] parts = topic.split("/", 12); + if (parts.length < 6 || parts.length > 10) { + throw new IllegalArgumentException("Unexpected topic length: " + topic); + } + Envelope envelope = new Envelope(); + checkState(Strings.isNullOrEmpty(parts[0]), "non-empty prefix"); + checkState("r".equals(parts[1]), "expected registries"); + envelope.deviceRegistryId = nullAsNull(parts[2]); + checkState("d".equals(parts[3]), "expected devices"); + envelope.deviceId = nullAsNull(parts[4]); + int base = parts[5].equals(SEND_CHANNEL_DESIGNATOR) ? 2 : 0; + if (parts.length < base + 6) { + throw new IllegalArgumentException("Unexpected topic length for implicit topic: " + topic); + } + if (base > 0) { + envelope.source = parts[6]; + } + envelope.subType = convertSubType(parts[base + 5]); + if (parts.length > base + 6 && !parts[base + 6].isEmpty()) { + envelope.subFolder = convertSubFolder(parts[base + 6]); + } + if (parts.length > base + 7 && !parts[base + 7].isEmpty()) { + envelope.gatewayId = nullAsNull(parts[base + 7]); + } + if (parts.length > base + 8) { + throw new IllegalArgumentException("Unrecognized extra topic arguments: " + parts[base + 8]); + } + return toStringMap(envelope); + } + + /** + * Parse an MQTT envelope topic (either legacy or implicit format) into attributes map. + * + * @param topic The MQTT topic string + * @return Map of envelope attributes + */ + public static Map parseEnvelopeTopic(String topic) { try { - // 0/1/2 /3/4 /5 [/6 [/7 ]] - // /r/REGISTRY/d/DEVICE/TYPE[/FOLDER[/GATEWAY]] - String[] parts = topic.split("/", 12); - if (parts.length < 6 || parts.length > 10) { - throw new RuntimeException("Unexpected topic length: " + topic); - } - Envelope envelope = new Envelope(); - checkState(Strings.isNullOrEmpty(parts[0]), "non-empty prefix"); - checkState("r".equals(parts[1]), "expected registries"); - envelope.deviceRegistryId = nullAsNull(parts[2]); - checkState("d".equals(parts[3]), "expected devices"); - envelope.deviceId = nullAsNull(parts[4]); - int base = parts[5].equals(SEND_CHANNEL_DESIGNATOR) ? 2 : 0; - if (base > 0) { - envelope.source = parts[6]; - } - envelope.subType = convertSubType(parts[base + 5]); - if (parts.length > base + 6) { - envelope.subFolder = convertSubFolder(parts[base + 6]); - } - if (parts.length > base + 7) { - envelope.gatewayId = nullAsNull(parts[base + 7]); - } - if (parts.length > base + 8) { - throw new RuntimeException("Unrecognized extra topic arguments: " + parts[base + 8]); + if (isLegacyTopic(topic)) { + return parseLegacyTopic(topic); + } else if (topic != null && topic.startsWith(IMPLICIT_TOPIC_PREFIX)) { + return parseImplicitTopic(topic); + } else { + throw new IllegalArgumentException("Unrecognized topic structure: " + topic); } - return toStringMap(envelope); } catch (Exception e) { throw new RuntimeException("While parsing envelope topic " + topic, e); } diff --git a/udmis/src/main/java/com/google/bos/udmi/service/support/ConnectionBroker.java b/udmis/src/main/java/com/google/bos/udmi/service/support/ConnectionBroker.java index fb75ea41e4..ffacb8f4a9 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/support/ConnectionBroker.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/support/ConnectionBroker.java @@ -17,6 +17,8 @@ public interface ConnectionBroker { void unbindGateway(String gatewayId, String deviceId); + void shutdown(); + /** * Simple event for connection broker happenings. */ diff --git a/udmis/src/main/java/com/google/bos/udmi/service/support/DataRef.java b/udmis/src/main/java/com/google/bos/udmi/service/support/DataRef.java index 3ee67f6340..9d5b36fcb8 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/support/DataRef.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/support/DataRef.java @@ -3,6 +3,7 @@ import static java.lang.String.format; import java.util.Map; +import java.util.Set; /** * Container reference class for a database entry. @@ -43,6 +44,8 @@ public DataRef device(String deviceId) { public abstract void put(String key, String value); + public abstract void update(Map puts, Set deletes); + /** * Add a registry specification. */ diff --git a/udmis/src/main/java/com/google/bos/udmi/service/support/EtcdDataProvider.java b/udmis/src/main/java/com/google/bos/udmi/service/support/EtcdDataProvider.java index 5d4ff36fb9..79e6c0885a 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/support/EtcdDataProvider.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/support/EtcdDataProvider.java @@ -16,10 +16,14 @@ import io.etcd.jetcd.Lock; import io.etcd.jetcd.cluster.Member; import io.etcd.jetcd.kv.GetResponse; +import io.etcd.jetcd.op.Op; +import io.etcd.jetcd.options.DeleteOption; import io.etcd.jetcd.options.GetOption; +import io.etcd.jetcd.options.PutOption; import java.net.URI; import java.time.Duration; import java.time.Instant; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Set; @@ -270,6 +274,33 @@ public AutoCloseable lock() { public void put(String key, String value) { putKey(getKeyPath(key), value); } + + @Override + public void update(Map puts, Set deletes) { + try { + List ops = new ArrayList<>(); + if (puts != null) { + puts.forEach((key, value) -> { + if (value != null) { + ops.add(Op.put(bytes(getKeyPath(key)), bytes(value), PutOption.DEFAULT)); + } + }); + } + if (deletes != null) { + deletes.forEach(key -> { + if (key != null) { + ops.add(Op.delete(bytes(getKeyPath(key)), DeleteOption.DEFAULT)); + } + }); + } + if (!ops.isEmpty()) { + kvClient.txn().Then(ops.toArray(new Op[0])).commit() + .get(QUERY_TIMEOUT_SEC, TimeUnit.SECONDS); + } + } catch (Exception e) { + throw new RuntimeException("While executing batch update on " + getKeyPath(""), e); + } + } } private class LockCloser implements AutoCloseable { diff --git a/udmis/src/main/java/com/google/bos/udmi/service/support/MosquittoBroker.java b/udmis/src/main/java/com/google/bos/udmi/service/support/MosquittoBroker.java index 8264461074..2844eda431 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/support/MosquittoBroker.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/support/MosquittoBroker.java @@ -4,40 +4,124 @@ import static com.google.udmi.util.GeneralUtils.catchToElse; import static com.google.udmi.util.GeneralUtils.catchToNull; import static com.google.udmi.util.GeneralUtils.friendlyStackTrace; -import static com.google.udmi.util.GeneralUtils.ifNotNullThen; -import static java.lang.String.format; import static java.util.Optional.ofNullable; import com.google.bos.udmi.service.pod.ContainerBase; import java.io.BufferedReader; +import java.io.File; +import java.util.ArrayList; import java.util.Date; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.regex.Matcher; import java.util.regex.Pattern; +import udmi.schema.EndpointConfiguration; +import udmi.schema.EndpointConfiguration.Transport; /** * Provider that links directly to a mosquitto broker. + * + *

    NOTE: The topic structure used in ACLs in this file is hardcoded to use the deviceId + * directly (e.g., deviceId/config, deviceId/commands, etc.) instead of using properties + * from the endpoint configuration. */ public class MosquittoBroker extends ContainerBase implements ConnectionBroker { - private static final String UDMI_ROOT = System.getenv("UDMI_ROOT"); - private static final String MOSQUCTL_CLIENT_FMT = UDMI_ROOT + "/bin/mosquctl_client %s %s"; - private static final String MOSQUCTL_LOG_FMT = UDMI_ROOT + "/bin/mosquctl_log %s"; - private static final String MOSQUCTL_GATEWAY_FMT = UDMI_ROOT + "/bin/mosquctl_gateway %s %s %s"; + private static final long EXEC_TIMEOUT_SEC = 10; private static final String REVOKE_PASSWORD = "--"; + private static final String DEFAULT_MOSQUITTO_LOG_PATH = "/var/log/mosquitto/mosquitto.log"; private static final Pattern LOG_MATCHER = Pattern.compile("([0-9]+): (\\S+) (\\S+) (\\S+) (\\S+) (.*)"); private static final Pattern PUBLISH_MATCHER = Pattern.compile("\\(([d01,qr ]+), m([0-9]+), '(\\S+)', .*\\)"); private static final Pattern PUBACK_MATCHER = Pattern.compile("\\((m|Mid: )([0-9]+), \\S+\\)"); private final ContainerBase container; + private final EndpointConfiguration endpointConfig; + private final boolean disableLogging; + private final Object tailLock = new Object(); + private Process tailProcess; + + /** + * Create a new broker connection provider. + */ + public MosquittoBroker(ContainerBase container, EndpointConfiguration endpointConfig) { + this(container, endpointConfig, false); + } - public MosquittoBroker(ContainerBase container) { + /** + * Create a new broker connection provider with logging controls. + */ + public MosquittoBroker(ContainerBase container, EndpointConfiguration endpointConfig, + boolean disableLogging) { this.container = container; + this.endpointConfig = endpointConfig; + this.disableLogging = disableLogging; + if (!disableLogging) { + File logFile = new File(getMosquittoLogPath()); + if (!logFile.canRead()) { + throw new RuntimeException( + "Mosquitto log file is not readable: " + logFile.getAbsolutePath()); + } + } + } + + private String getMosquittoLogPath() { + return ofNullable(System.getenv("MOSQUITTO_LOG_PATH")).orElse(DEFAULT_MOSQUITTO_LOG_PATH); + } + + private String getMosquittoCtrlPath() { + return ofNullable(System.getenv("MOSQUITTO_CTRL_PATH")).orElse("mosquitto_ctrl"); + } + + private List buildCommandPrefix() { + List cmd = new ArrayList<>(); + cmd.add(getMosquittoCtrlPath()); + + if (endpointConfig.hostname != null) { + cmd.add("-h"); + cmd.add(endpointConfig.hostname); + } + if (endpointConfig.port != null) { + cmd.add("-p"); + cmd.add(endpointConfig.port.toString()); + } + if (endpointConfig.auth_provider != null && endpointConfig.auth_provider.basic != null) { + if (endpointConfig.auth_provider.basic.username != null) { + cmd.add("-u"); + cmd.add(endpointConfig.auth_provider.basic.username); + } + if (endpointConfig.auth_provider.basic.password != null) { + cmd.add("-P"); + cmd.add(endpointConfig.auth_provider.basic.password); + } + } + + boolean useSsl = endpointConfig.transport == Transport.SSL + || (endpointConfig.port != null && endpointConfig.port == 8883) + || endpointConfig.ca_file != null; + + if (useSsl) { + checkState(endpointConfig.ca_file != null && !endpointConfig.ca_file.isEmpty(), + "Missing required ca_file in endpoint configuration for SSL connection"); + checkState(endpointConfig.cert_file != null && !endpointConfig.cert_file.isEmpty(), + "Missing required cert_file in endpoint configuration for SSL connection"); + checkState(endpointConfig.key_file != null && !endpointConfig.key_file.isEmpty(), + "Missing required key_file in endpoint configuration for SSL connection"); + cmd.add("--cafile"); + cmd.add(endpointConfig.ca_file); + cmd.add("--cert"); + cmd.add(endpointConfig.cert_file); + cmd.add("--key"); + cmd.add(endpointConfig.key_file); + cmd.add("--insecure"); + } + + cmd.add("dynsec"); + return cmd; } private void consumeLogs(String clientPrefix, Consumer eventConsumer) { @@ -61,38 +145,160 @@ private void consumeStream(BufferedReader reader, Consumer consumer) { thread.start(); } - private void executeCommand(String cmd) { - synchronized (MosquittoBroker.class) { + private void executeCommand(List cmd) { + try { + info("Executing command %s", String.join(" ", cmd)); + ProcessBuilder pb = new ProcessBuilder(cmd); + pb.redirectErrorStream(true); // Merge stderr into stdout + Process exec = pb.start(); + + // Read output asynchronously to prevent buffer locks + CompletableFuture outputHandler = CompletableFuture.runAsync(() -> + exec.inputReader().lines().forEach(container::info) + ); + + // Enforce timeout + if (!exec.waitFor(EXEC_TIMEOUT_SEC, TimeUnit.SECONDS)) { + exec.destroyForcibly(); + outputHandler.cancel(true); + throw new RuntimeException("Command timed out: " + String.join(" ", cmd)); + } + try { - info("Executing command %s", cmd); - Process exec = Runtime.getRuntime().exec(cmd); - exec.waitFor(EXEC_TIMEOUT_SEC, TimeUnit.SECONDS); - exec.errorReader().lines().forEach(container::info); - exec.inputReader().lines().forEach(container::info); - int exitValue = exec.exitValue(); - checkState(exitValue == 0, "exit return code " + exitValue); + outputHandler.join(); } catch (Exception e) { - throw new RuntimeException("While executing " + cmd, e); + warn("Output handler interrupted or cancelled: " + e.getMessage()); } + int exitValue = exec.exitValue(); + checkState(exitValue == 0, "exit return code " + exitValue); + } catch (Exception e) { + throw new RuntimeException("While executing " + String.join(" ", cmd), e); } } private void mosquctlClient(String clientId, String clientPass) { - executeCommand(format(MOSQUCTL_CLIENT_FMT, clientId, clientPass)); + String clientUser = clientId; + String roleName = "role_" + clientId.replace("/", "_"); + + if ("--".equals(clientPass)) { + deleteClient(clientUser); + deleteRole(roleName); + info("Device %s deleted correctly.", clientId); + } else { + try { + createClient(clientUser, clientPass, clientId); + } catch (Exception e) { + warn("Client likely exists, updating password for %s: %s", clientUser, e.getMessage()); + setClientPassword(clientUser, clientPass); + } + try { + createRole(roleName); + } catch (Exception e) { + warn("Ignore error creating role: " + e.getMessage()); + } + addClientRole(clientUser, roleName); + + addRoleAcl(roleName, "subscribePattern", clientId + "/config", "allow"); + addRoleAcl(roleName, "subscribePattern", clientId + "/commands", "allow"); + addRoleAcl(roleName, "subscribePattern", clientId + "/errors", "allow"); + addRoleAcl(roleName, "publishClientSend", clientId + "/events/#", "allow"); + addRoleAcl(roleName, "publishClientSend", clientId + "/state", "allow"); + + info("Device %s registered correctly.", clientId); + } + } + + private void setClientPassword(String clientUser, String clientPass) { + List cmd = new ArrayList<>(buildCommandPrefix()); + cmd.add("setClientPassword"); + cmd.add(clientUser); + cmd.add(clientPass); + executeCommand(cmd); + } + + private void addRoleAcl(String roleName, String type, String pattern, String allow) { + List cmd = new ArrayList<>(buildCommandPrefix()); + cmd.add("addRoleACL"); + cmd.add(roleName); + cmd.add(type); + cmd.add(pattern); + cmd.add(allow); + executeCommand(cmd); + } + + private void deleteClient(String clientUser) { + List cmd = new ArrayList<>(buildCommandPrefix()); + cmd.add("deleteClient"); + cmd.add(clientUser); + try { + executeCommand(cmd); + } catch (Exception e) { + warn("Ignore error deleting client: " + e.getMessage()); + } + } + + private void deleteRole(String roleName) { + List cmd = new ArrayList<>(buildCommandPrefix()); + cmd.add("deleteRole"); + cmd.add(roleName); + try { + executeCommand(cmd); + } catch (Exception e) { + warn("Ignore error deleting role: " + e.getMessage()); + } + } + + private void createClient(String clientUser, String clientPass, String clientId) { + List cmd = new ArrayList<>(buildCommandPrefix()); + cmd.add("createClient"); + cmd.add(clientUser); + cmd.add("-p"); + cmd.add(clientPass); + cmd.add("-c"); + cmd.add(clientId); + executeCommand(cmd); + } + + private void createRole(String roleName) { + List cmd = new ArrayList<>(buildCommandPrefix()); + cmd.add("createRole"); + cmd.add(roleName); + executeCommand(cmd); + } + + private void addClientRole(String clientUser, String roleName) { + List cmd = new ArrayList<>(buildCommandPrefix()); + cmd.add("addClientRole"); + cmd.add(clientUser); + cmd.add(roleName); + executeCommand(cmd); } private void mosquctlLog(String clientPrefix, Consumer eventConsumer) { - String cmd = format(MOSQUCTL_LOG_FMT, clientPrefix); - synchronized (MosquittoBroker.class) { + if (disableLogging) { + info("Mosquitto logging disabled, skipping log consumer for prefix %s", clientPrefix); + return; + } + synchronized (tailLock) { try { - info("Starting log consumer %s", cmd); - Process exec = Runtime.getRuntime().exec(cmd); + info("Starting log consumer for prefix %s", clientPrefix); + ProcessBuilder pb = new ProcessBuilder("tail", "-f", getMosquittoLogPath()); + if (tailProcess != null) { + tailProcess.destroy(); + } + tailProcess = pb.start(); + Process exec = tailProcess; consumeStream(exec.errorReader(), line -> warn("log error: " + line)); - consumeStream(exec.inputReader(), line -> ifNotNullThen(parseLogLine(line), eventConsumer)); + consumeStream(exec.inputReader(), line -> { + BrokerEvent event = parseLogLine(line); + if (event != null && event.clientId != null && event.clientId.startsWith(clientPrefix)) { + eventConsumer.accept(event); + } + }); } catch (Exception e) { - throw new RuntimeException("While executing " + cmd, e); + throw new RuntimeException("While starting log consumer", e); } finally { - info("Completed log consumer"); + info("Completed log consumer setup"); } } } @@ -155,12 +361,51 @@ public void authorize(String clientId, String password) { @Override public void bindGateway(String gatewayId, String deviceId) { - executeCommand(format(MOSQUCTL_GATEWAY_FMT, "bind", gatewayId, deviceId)); + String roleName = "role_" + gatewayId.replace("/", "_"); + + // add ACLs + addRoleAcl(roleName, "subscribePattern", deviceId + "/config", "allow"); + addRoleAcl(roleName, "subscribePattern", deviceId + "/commands", "allow"); + addRoleAcl(roleName, "subscribePattern", deviceId + "/errors", "allow"); + addRoleAcl(roleName, "publishClientSend", deviceId + "/events/#", "allow"); + addRoleAcl(roleName, "publishClientSend", deviceId + "/state", "allow"); + addRoleAcl(roleName, "publishClientSend", deviceId + "/attach", "allow"); } @Override public void unbindGateway(String gatewayId, String deviceId) { - info("Unbind device Id: %s from gateway Id: %s :%s", deviceId, gatewayId); - executeCommand(format(MOSQUCTL_GATEWAY_FMT, "unbind", gatewayId, deviceId)); + info("Unbind device Id: %s from gateway Id: %s", deviceId, gatewayId); + String roleName = "role_" + gatewayId.replace("/", "_"); + + removeRoleAcl(roleName, "subscribePattern", deviceId + "/config"); + removeRoleAcl(roleName, "subscribePattern", deviceId + "/commands"); + removeRoleAcl(roleName, "subscribePattern", deviceId + "/errors"); + removeRoleAcl(roleName, "publishClientSend", deviceId + "/events/#"); + removeRoleAcl(roleName, "publishClientSend", deviceId + "/state"); + removeRoleAcl(roleName, "publishClientSend", deviceId + "/attach"); + } + + private void removeRoleAcl(String roleName, String type, String pattern) { + List cmd = new ArrayList<>(buildCommandPrefix()); + cmd.add("removeRoleACL"); + cmd.add(roleName); + cmd.add(type); + cmd.add(pattern); + try { + executeCommand(cmd); + } catch (Exception e) { + warn("Ignore error removing role ACL: " + e.getMessage()); + } + } + + @Override + public void shutdown() { + synchronized (tailLock) { + if (tailProcess != null) { + tailProcess.destroy(); + tailProcess = null; + } + } + super.shutdown(); } } diff --git a/udmis/src/test/java/com/google/bos/udmi/service/access/ImplicitIotAccessProviderTest.java b/udmis/src/test/java/com/google/bos/udmi/service/access/ImplicitIotAccessProviderTest.java new file mode 100644 index 0000000000..597cb639d9 --- /dev/null +++ b/udmis/src/test/java/com/google/bos/udmi/service/access/ImplicitIotAccessProviderTest.java @@ -0,0 +1,162 @@ +package com.google.bos.udmi.service.access; + +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.when; + +import com.google.bos.udmi.service.core.ReflectProcessor; +import com.google.bos.udmi.service.pod.UdmiServicePod; +import com.google.bos.udmi.service.support.ConnectionBroker; +import com.google.bos.udmi.service.support.DataRef; +import com.google.bos.udmi.service.support.IotDataProvider; +import java.lang.reflect.Field; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import udmi.schema.CloudModel; +import udmi.schema.CloudModel.Auth_type; +import udmi.schema.CloudModel.ModelOperation; +import udmi.schema.Credential; +import udmi.schema.Credential.Key_format; +import udmi.schema.IotAccess; + +class ImplicitIotAccessProviderTest { + + private static final String TEST_REGISTRY = "test-reg"; + private static final String TEST_DEVICE = "test-dev"; + private static final String TEST_PASSWORD = "supersecret"; + private static final String CLIENT_ID = "/r/test-reg/d/test-dev"; + + private Map store; + private ImplicitIotAccessProvider provider; + private ConnectionBroker mockBroker; + + @BeforeEach + void setUp() throws Exception { + UdmiServicePod.resetForTest(); + store = new HashMap<>(); + IotDataProvider mockDatabase = mock(IotDataProvider.class); + when(mockDatabase.ref()).thenAnswer(inv -> new FakeDataRef(store)); + UdmiServicePod.putComponent("database", () -> mockDatabase); + + ReflectProcessor mockReflect = mock(ReflectProcessor.class); + UdmiServicePod.putComponent("reflect", () -> mockReflect); + + IotAccess iotAccess = new IotAccess(); + iotAccess.options = "enable, use_password=" + TEST_PASSWORD + ", disable_logging=true"; + provider = new ImplicitIotAccessProvider(iotAccess); + provider.activate(); + + mockBroker = mock(ConnectionBroker.class); + Field brokerField = ImplicitIotAccessProvider.class.getDeclaredField("broker"); + brokerField.setAccessible(true); + brokerField.set(provider, mockBroker); + } + + @AfterEach + void tearDown() { + if (provider != null) { + provider.shutdown(); + } + UdmiServicePod.resetForTest(); + } + + @Test + void testAuthorizeWithCredentials() { + CloudModel cloudModel = new CloudModel(); + cloudModel.operation = ModelOperation.CREATE; + Credential credential = new Credential(); + credential.key_format = Key_format.RS_256; + credential.key_data = "fake_key_data"; + cloudModel.credentials = List.of(credential); + + provider.modelDevice(TEST_REGISTRY, TEST_DEVICE, cloudModel, null); + + verify(mockBroker).authorize(eq(CLIENT_ID), eq(TEST_PASSWORD)); + } + + @Test + void testAuthorizeWithAuthType() { + CloudModel cloudModel = new CloudModel(); + cloudModel.operation = ModelOperation.CREATE; + cloudModel.auth_type = Auth_type.RS_256; + + provider.modelDevice(TEST_REGISTRY, TEST_DEVICE, cloudModel, null); + + verify(mockBroker).authorize(eq(CLIENT_ID), eq(TEST_PASSWORD)); + } + + @Test + void testDoNotAuthorizeWithoutCredentialsOrAuthType() { + CloudModel cloudModel = new CloudModel(); + cloudModel.operation = ModelOperation.CREATE; + + provider.modelDevice(TEST_REGISTRY, TEST_DEVICE, cloudModel, null); + + verifyNoInteractions(mockBroker); + } + + class FakeDataRef extends DataRef { + private final Map data; + + public FakeDataRef(Map data) { + this.data = data; + } + + private String getKeyPath(String key) { + return (registryId != null ? "r/" + registryId : "") + + (deviceId != null ? "/d/" + deviceId : "") + + (collection != null ? "/c/" + collection : "") + + ":" + + key; + } + + @Override + public void delete(String key) { + data.remove(getKeyPath(key)); + } + + @Override + public Map entries() { + String prefix = getKeyPath(""); + Map res = new HashMap<>(); + for (Map.Entry entry : data.entrySet()) { + if (entry.getKey().startsWith(prefix)) { + res.put(entry.getKey().substring(prefix.length()), entry.getValue()); + } + } + return res; + } + + @Override + public String get(String key) { + return data.get(getKeyPath(key)); + } + + @Override + public AutoCloseable lock() { + return () -> {}; + } + + @Override + public void put(String key, String value) { + data.put(getKeyPath(key), value); + } + + @Override + public void update(Map puts, Set deletes) { + if (puts != null) { + puts.forEach(this::put); + } + if (deletes != null) { + deletes.forEach(this::delete); + } + } + } +} diff --git a/udmis/src/test/java/com/google/bos/udmi/service/bridge/MqttToPubSubBridgeTest.java b/udmis/src/test/java/com/google/bos/udmi/service/bridge/MqttToPubSubBridgeTest.java index e950b3034a..2c4da14c5c 100644 --- a/udmis/src/test/java/com/google/bos/udmi/service/bridge/MqttToPubSubBridgeTest.java +++ b/udmis/src/test/java/com/google/bos/udmi/service/bridge/MqttToPubSubBridgeTest.java @@ -56,13 +56,14 @@ void testSetupBridge() throws Exception { assertEquals(testTopic, attributes.get("mqttTopic")); assertEquals("my-device", attributes.get("deviceId")); assertEquals("my-registry", attributes.get("deviceRegistryId")); + assertEquals("events", attributes.get("subType")); } @Test void testSetupBridgeWithSubFolder() throws Exception { IMqttClient mockMqttClient = mock(IMqttClient.class); Publisher mockPublisher = mock(Publisher.class); - String testTopic = "/r/my-registry/d/my-device/events/subfolder_name"; + String testTopic = "/r/my-registry/d/my-device/events/pointset"; String payloadStr = "Hello World"; final MqttMessage mqttMessage = new MqttMessage(payloadStr.getBytes()); @@ -86,7 +87,40 @@ void testSetupBridgeWithSubFolder() throws Exception { assertEquals(testTopic, attributes.get("mqttTopic")); assertEquals("my-device", attributes.get("deviceId")); assertEquals("my-registry", attributes.get("deviceRegistryId")); - assertEquals("subfolder_name", attributes.get("subFolder")); + assertEquals("pointset", attributes.get("subFolder")); + assertEquals("events", attributes.get("subType")); + } + + @Test + void testSetupBridgeLegacyTopic() throws Exception { + IMqttClient mockMqttClient = mock(IMqttClient.class); + Publisher mockPublisher = mock(Publisher.class); + String testTopic = "/devices/my-device/events/pointset"; + String payloadStr = "Hello World"; + final MqttMessage mqttMessage = new MqttMessage(payloadStr.getBytes()); + + when(mockPublisher.publish(any(PubsubMessage.class))) + .thenReturn(ApiFutures.immediateFuture("msg-123")); + + MqttToPubSubBridge.setupBridge(mockMqttClient, mockPublisher, testTopic); + + ArgumentCaptor callbackCaptor = ArgumentCaptor.forClass(MqttCallback.class); + verify(mockMqttClient).setCallback(callbackCaptor.capture()); + MqttCallback callback = callbackCaptor.getValue(); + + callback.messageArrived(testTopic, mqttMessage); + + ArgumentCaptor pubsubMessageCaptor = + ArgumentCaptor.forClass(PubsubMessage.class); + verify(mockPublisher).publish(pubsubMessageCaptor.capture()); + + PubsubMessage pubsubMessage = pubsubMessageCaptor.getValue(); + Map attributes = pubsubMessage.getAttributesMap(); + assertEquals(testTopic, attributes.get("mqttTopic")); + assertEquals("my-device", attributes.get("deviceId")); + assertEquals("unknown", attributes.get("deviceRegistryId")); + assertEquals("pointset", attributes.get("subFolder")); + assertEquals("events", attributes.get("subType")); } @Test diff --git a/udmis/src/test/java/com/google/bos/udmi/service/messaging/impl/SimpleMqttPipeTest.java b/udmis/src/test/java/com/google/bos/udmi/service/messaging/impl/SimpleMqttPipeTest.java index effa195e3e..3f714fcd4f 100644 --- a/udmis/src/test/java/com/google/bos/udmi/service/messaging/impl/SimpleMqttPipeTest.java +++ b/udmis/src/test/java/com/google/bos/udmi/service/messaging/impl/SimpleMqttPipeTest.java @@ -100,5 +100,17 @@ public void staticEnvelopeTopic() { assertEquals("g", channelState.get(GATEWAY_ID_KEY)); assertEquals("control", channelState.get(SOURCE_KEY)); assertEquals(6, channelState.keySet().size()); + + Map legacyBasic = parseEnvelopeTopic("/devices/my-dev/events"); + assertEquals("my-dev", legacyBasic.get(DEVICE_ID_KEY)); + assertEquals("events", legacyBasic.get(SUBTYPE_PROPERTY_KEY)); + assertNull(legacyBasic.get(SUBFOLDER_PROPERTY_KEY)); + assertEquals(2, legacyBasic.keySet().size()); + + Map legacyPointset = parseEnvelopeTopic("/devices/my-dev/events/pointset"); + assertEquals("my-dev", legacyPointset.get(DEVICE_ID_KEY)); + assertEquals("events", legacyPointset.get(SUBTYPE_PROPERTY_KEY)); + assertEquals("pointset", legacyPointset.get(SUBFOLDER_PROPERTY_KEY)); + assertEquals(3, legacyPointset.keySet().size()); } } \ No newline at end of file diff --git a/validator/src/main/java/com/google/daq/mqtt/registrar/Registrar.java b/validator/src/main/java/com/google/daq/mqtt/registrar/Registrar.java index 4bfd5813be..3424b167a2 100644 --- a/validator/src/main/java/com/google/daq/mqtt/registrar/Registrar.java +++ b/validator/src/main/java/com/google/daq/mqtt/registrar/Registrar.java @@ -1293,7 +1293,7 @@ private void bindGatewayDevices(Map localDevices) { }); System.err.printf("Waiting for device binding...%n"); - dynamicTerminate(); + dynamicTerminate(gatewayBindings.size()); Duration between = Duration.between(start, Instant.now()); double seconds = between.getSeconds() + between.getNano() / 1e9; diff --git a/validator/src/main/java/com/google/udmi/util/PubSubReflector.java b/validator/src/main/java/com/google/udmi/util/PubSubReflector.java index 72993fe934..b6fda173e1 100644 --- a/validator/src/main/java/com/google/udmi/util/PubSubReflector.java +++ b/validator/src/main/java/com/google/udmi/util/PubSubReflector.java @@ -351,19 +351,17 @@ public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) { String topic = format("/devices/%s/%s%s", attributes.get(DEVICE_ID_KEY), attributes.get(CATEGORY_PROPERTY_KEY), suffix); String messageSource = attributes.remove(SOURCE_KEY); - if (messageSource == null) { - return; - } - - Object dstSource = messageBundle.message.remove(SOURCE_KEY); - String[] source = messageSource.split(SOURCE_SEPARATOR_REGEX, 3); - if (source.length == 1) { - attributes.put(SOURCE_KEY, source[0]); - } else if (source.length == 2 && source[0].isEmpty()) { - topic += messageSource; - ifNotNullThen(dstSource, () -> messageBundle.message.put(SOURCE_KEY, source[1])); - } else { - System.err.println("Discarding message with malformed source: " + messageSource); + if (messageSource != null) { + Object dstSource = messageBundle.message.remove(SOURCE_KEY); + String[] source = messageSource.split(SOURCE_SEPARATOR_REGEX, 3); + if (source.length == 1) { + attributes.put(SOURCE_KEY, source[0]); + } else if (source.length == 2 && source[0].isEmpty()) { + topic += messageSource; + ifNotNullThen(dstSource, () -> messageBundle.message.put(SOURCE_KEY, source[1])); + } else { + System.err.println("Discarding message with malformed source: " + messageSource); + } } ofNullable(messageHandlers.get(deviceRegistryId)).orElse(defaultMessageHandler) .accept(topic, stringify(messageBundle.message));