From 1b50cadf444323f363530a12362c35723b837bc5 Mon Sep 17 00:00:00 2001 From: Nitish Jain Date: Fri, 17 Apr 2026 11:04:54 +0000 Subject: [PATCH 01/14] CI tests for bridgehead --- .github/workflows/testing.yml | 21 +++++ bridgehead/tests/test_bridgehead.sh | 122 ++++++++++++++++++++++++++++ 2 files changed, 143 insertions(+) create mode 100755 bridgehead/tests/test_bridgehead.sh diff --git a/.github/workflows/testing.yml b/.github/workflows/testing.yml index 018a0fd4a2..f2a2756917 100644 --- a/.github/workflows/testing.yml +++ b/.github/workflows/testing.yml @@ -454,3 +454,24 @@ jobs: run: | cd clientlib/python poetry run pylint src + + bridgehead: + name: Bridgehead Integration Test + runs-on: ubuntu-24.04 + timeout-minutes: 20 + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-java@v4 + with: + distribution: 'temurin' + java-version: '21' + - name: Run Bridgehead Test + run: | + bash bridgehead/tests/test_bridgehead.sh + - name: Upload logs on failure + if: ${{ failure() }} + uses: actions/upload-artifact@v4 + with: + name: bridgehead-logs + path: bridgehead/var/log/ + diff --git a/bridgehead/tests/test_bridgehead.sh b/bridgehead/tests/test_bridgehead.sh new file mode 100755 index 0000000000..dbb9dc036d --- /dev/null +++ b/bridgehead/tests/test_bridgehead.sh @@ -0,0 +1,122 @@ +#!/bin/bash -e + +# Navigate to bridgehead directory +cd $(dirname $0)/.. + +echo "Cloning default site model..." +# Remove existing udmi_site_model if it exists to avoid conflicts +sudo rm -rf udmi_site_model +sudo rm -rf site + +git clone https://github.com/faucetsdn/udmi_site_model.git site +ln -s site udmi_site_model + +# Backup existing .env if it exists +if [ -f .env ]; then + echo "Backing up existing .env" + cp .env .env.bak +fi + +echo "Generating .env file..." +cat < .env +HOST_IP=127.0.0.1 +AUTH_USER=admin +AUTH_PASS=$(openssl rand -hex 16) +SERV_USER=service +SERV_PASS=$(openssl rand -hex 16) +INFLUXDB_TOKEN=$(openssl rand -hex 32) +INFLUX_USER=influx +INFLUX_PASSWORD=$(openssl rand -hex 16) +GRAFANA_USER=grafana +GRAFANA_PASSWORD=$(openssl rand -hex 16) +EOF + +echo "Starting services with docker-compose..." +docker compose up -d --build + +# Function to clean up on exit +function cleanup { + echo "Cleaning up..." + docker compose down + sudo rm -rf site udmi_site_model + + # Restore .env if backup exists + if [ -f .env.bak ]; then + echo "Restoring .env" + mv .env.bak .env + else + rm -f .env + fi +} +trap cleanup EXIT + +echo "Waiting for services to be healthy..." +# Poll for UDMIS readiness. We can check logs for "Started UDMIS" +# For now, using a simple sleep as a fallback, but polling is better. +# Let's try to poll for 60 seconds. +for i in {1..60}; do + if docker logs udmis 2>&1 | grep -q "Started UDMIS"; then + echo "UDMIS is ready." + break + fi + echo "Waiting for UDMIS... ($i/60)" + sleep 2 +done + +# Also check if validator is running +if ! docker ps | grep -q validator; then + echo "Validator container is not running!" + exit 1 +fi + +echo "Executing discovery sequence..." + +echo "Running registrar initial setup..." +docker exec validator bin/registrar site_model/ //mqtt/mosquitto -x -d +docker exec validator bin/registrar site_model/ //mqtt/mosquitto GAT-123 + +echo "Starting Pubber from source..." +# bin/pubber handles building if needed +# Run in background +../bin/pubber site //mqtt/localhost GAT-123 852649 & +PUBBER_PID=$! + +echo "Pubber started with PID $PUBBER_PID" + +# Wait a bit for pubber to connect and send some messages +sleep 10 + +echo "Running discovery script..." +docker exec validator /root/discovery.sh + +echo "Running registrar to check results..." +docker exec validator bin/registrar site_model/ //mqtt/mosquitto > registrar_output.txt + +echo "Stopping Pubber..." +kill $PUBBER_PID || true + +echo "Verifying output..." +cat registrar_output.txt + +# Expected summary: +# Summary: +# Device envelope: 1 +# Device extra: 6 +# Device proxy: 2 +# Device status: 4 +# Device validation: 1 +# Out of 4 total. + +failed=0 +grep -q "Device envelope: 1" registrar_output.txt || failed=1 +grep -q "Device extra: 6" registrar_output.txt || failed=1 +grep -q "Device proxy: 2" registrar_output.txt || failed=1 +grep -q "Device status: 4" registrar_output.txt || failed=1 +grep -q "Device validation: 1" registrar_output.txt || failed=1 + +if [ $failed -eq 0 ]; then + echo "Test PASSED" +else + echo "Test FAILED" + exit 1 +fi From b67c88a3216f9d699d9946c9e61196e7692fd482 Mon Sep 17 00:00:00 2001 From: Nitish Jain Date: Fri, 17 Apr 2026 13:24:40 +0000 Subject: [PATCH 02/14] CI tests for bridgehead --- bridgehead/tests/test_bridgehead.sh | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/bridgehead/tests/test_bridgehead.sh b/bridgehead/tests/test_bridgehead.sh index dbb9dc036d..b050787924 100755 --- a/bridgehead/tests/test_bridgehead.sh +++ b/bridgehead/tests/test_bridgehead.sh @@ -31,6 +31,9 @@ GRAFANA_USER=grafana GRAFANA_PASSWORD=$(openssl rand -hex 16) EOF +echo "Building Pubber from source..." +../pubber/bin/build + echo "Starting services with docker-compose..." docker compose up -d --build @@ -86,8 +89,8 @@ echo "Pubber started with PID $PUBBER_PID" # Wait a bit for pubber to connect and send some messages sleep 10 -echo "Running discovery script..." -docker exec validator /root/discovery.sh +echo "Running discovery script with GAT-123..." +docker exec validator /root/discovery.sh GAT-123 echo "Running registrar to check results..." docker exec validator bin/registrar site_model/ //mqtt/mosquitto > registrar_output.txt From 9fe99fbce53215cdc560bf1918c8de5922a0aa1c Mon Sep 17 00:00:00 2001 From: Nitish Jain Date: Fri, 17 Apr 2026 13:44:20 +0000 Subject: [PATCH 03/14] CI tests for bridgehead --- .github/workflows/testing.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/testing.yml b/.github/workflows/testing.yml index f2a2756917..080fa92fab 100644 --- a/.github/workflows/testing.yml +++ b/.github/workflows/testing.yml @@ -465,6 +465,8 @@ jobs: with: distribution: 'temurin' java-version: '21' + - name: Setup Base + run: bin/setup_base - name: Run Bridgehead Test run: | bash bridgehead/tests/test_bridgehead.sh From bee4e7f65c0364efa03fd1311bc53236a5dcb752 Mon Sep 17 00:00:00 2001 From: Nitish Jain Date: Fri, 17 Apr 2026 14:33:30 +0000 Subject: [PATCH 04/14] fixing CI tests --- bridgehead/tests/test_bridgehead.sh | 30 ++++++++++------------------- 1 file changed, 10 insertions(+), 20 deletions(-) diff --git a/bridgehead/tests/test_bridgehead.sh b/bridgehead/tests/test_bridgehead.sh index b050787924..ebb5d4e8db 100755 --- a/bridgehead/tests/test_bridgehead.sh +++ b/bridgehead/tests/test_bridgehead.sh @@ -6,10 +6,8 @@ cd $(dirname $0)/.. echo "Cloning default site model..." # Remove existing udmi_site_model if it exists to avoid conflicts sudo rm -rf udmi_site_model -sudo rm -rf site -git clone https://github.com/faucetsdn/udmi_site_model.git site -ln -s site udmi_site_model +git clone https://github.com/faucetsdn/udmi_site_model.git udmi_site_model # Backup existing .env if it exists if [ -f .env ]; then @@ -32,16 +30,21 @@ GRAFANA_PASSWORD=$(openssl rand -hex 16) EOF echo "Building Pubber from source..." +# Build pubber before starting services or running it, to avoid delays in test ../pubber/bin/build echo "Starting services with docker-compose..." docker compose up -d --build +# Fix permissions for files created by Docker in the mounted volume +echo "Fixing permissions for udmi_site_model..." +sudo chmod -R a+rw udmi_site_model + # Function to clean up on exit function cleanup { echo "Cleaning up..." docker compose down - sudo rm -rf site udmi_site_model + sudo rm -rf udmi_site_model # Restore .env if backup exists if [ -f .env.bak ]; then @@ -54,9 +57,6 @@ function cleanup { trap cleanup EXIT echo "Waiting for services to be healthy..." -# Poll for UDMIS readiness. We can check logs for "Started UDMIS" -# For now, using a simple sleep as a fallback, but polling is better. -# Let's try to poll for 60 seconds. for i in {1..60}; do if docker logs udmis 2>&1 | grep -q "Started UDMIS"; then echo "UDMIS is ready." @@ -78,10 +78,9 @@ echo "Running registrar initial setup..." docker exec validator bin/registrar site_model/ //mqtt/mosquitto -x -d docker exec validator bin/registrar site_model/ //mqtt/mosquitto GAT-123 -echo "Starting Pubber from source..." -# bin/pubber handles building if needed -# Run in background -../bin/pubber site //mqtt/localhost GAT-123 852649 & +echo "Starting Pubber..." +# Run in background, it should be already built +../bin/pubber udmi_site_model //mqtt/localhost GAT-123 852649 & PUBBER_PID=$! echo "Pubber started with PID $PUBBER_PID" @@ -101,15 +100,6 @@ kill $PUBBER_PID || true echo "Verifying output..." cat registrar_output.txt -# Expected summary: -# Summary: -# Device envelope: 1 -# Device extra: 6 -# Device proxy: 2 -# Device status: 4 -# Device validation: 1 -# Out of 4 total. - failed=0 grep -q "Device envelope: 1" registrar_output.txt || failed=1 grep -q "Device extra: 6" registrar_output.txt || failed=1 From e213e0e3d3418635aaf5c106c0a600949fcd19c9 Mon Sep 17 00:00:00 2001 From: Nitish Jain Date: Sat, 18 Apr 2026 09:08:13 +0530 Subject: [PATCH 05/14] Fix out-of-sync gencode changes causing test failures (#6) --- .gencode_hash.txt | 25 ++++--------------------- gencode/python/poetry.lock | 8 ++++---- 2 files changed, 8 insertions(+), 25 deletions(-) diff --git a/.gencode_hash.txt b/.gencode_hash.txt index bdeffb33f2..fe83115014 100644 --- a/.gencode_hash.txt +++ b/.gencode_hash.txt @@ -7,26 +7,15 @@ a65e8177ca59cd51c4a8ff63ecaa194897f7e22b82afb14708d63efbd7b96a84 gencode/docs/c 11b21f73b6a4065102968a4c09979639b8a7ea6efb20e40d52cd21b2a60167bb gencode/docs/configuration_pod.html b34c136cee32cb88f32a427ff400c3898ed49168f6dcaca1bc9ba65365bc5ae4 gencode/docs/configuration_pubber.html 1057fa40fb7a31a23bb2773d21c38cf4590a935bd8b5ea4218e695c6204f5dd9 gencode/docs/data_template.html -<<<<<<< HEAD -12ff7903f025238ffa15113777863dd775f083f0b45f6413db4f37381a257b72 gencode/docs/events.html +ea06d489d98f96f8ac0134388fb4172b1fca6d924aba895c6e3119c19b0c7dc0 gencode/docs/events.html 70e57ad6ef39330d958727ebf9dcd61ef6ea30e4c8653eac412bf1867fdb3a70 gencode/docs/events_alarmset.html -50210ca9a5bdb379982604bd3092e14010c12cd441229b9e37760b60e00cfd6c gencode/docs/events_discovery.html -e550539d52ce1f63b35247d425c038e5d5ba33f997eedd09e486a3bd9838c2c2 gencode/docs/events_mapping.html -======= -c653dc74b151455da760fa44795cb8dc4604fed8b4edfbc591d64f32c1d876a4 gencode/docs/events.html -70e57ad6ef39330d958727ebf9dcd61ef6ea30e4c8653eac412bf1867fdb3a70 gencode/docs/events_alarmset.html -ac0066b41b20a14a2394d8a57c43d0f998b7aa3167d305fc48a0e8ee5778c173 gencode/docs/events_discovery.html +feb4138c5acee9b3626e9c2e541711cec304c0e67c2999e6d713ee2e7144ef53 gencode/docs/events_discovery.html 808ad1cad37e9f4bf08ea3631162a663998ce60fe8935cbd0ca5e548c3b6df2a gencode/docs/events_mapping.html ->>>>>>> master eafcc3c48189f605f114cde051fea9d13fc5f1a3e395d64fb0a91cb53d4c9aeb gencode/docs/events_pointset.html cac253f57c5c92ef32e2a5f91b6cec8229e8db1dcffcc96a58f06da068e741e7 gencode/docs/events_system.html 151c1b62db35e84e51d5ff2a7464f61ced4d7fb0c7eb795715c245ee0a1b3436 gencode/docs/events_udmi.html 73dbe799e7943ec20ac58b544998e986a39539d4ef0cb4f5023e92e7634d3124 gencode/docs/events_validation.html -<<<<<<< HEAD -32c33ce98db575966879b1864d8af52d45aa3e1e1e3e13a7f98d046bebddc7a2 gencode/docs/metadata.html -======= -29fb67f6947d41de74b107193aee11bbe9086678654b0c5e5f5af8d8ba8e37a6 gencode/docs/metadata.html ->>>>>>> master +0af72961d68e952c511f3edff1fb9d7c94ba1aadfa31fbe89128f8dae7f9703c gencode/docs/metadata.html c86682715d348bd3dd971fa5bd925a8a3d0f3c2944c65a47c4b64fe1a5ccdea2 gencode/docs/monitoring.html 474ca16edc9f3cad2bb3ab40b6993cbced90263f762f66ee6cd246a6c4a0d18f gencode/docs/persistent_device.html e11595fd11477947a27461f8ef4fb6facb5f60e2abd6212193f7581ab123ff84 gencode/docs/properties.html @@ -105,16 +94,10 @@ d0858bd7ba306176f4e3c19eeb69ef2467afec0832b6917b72d3ab87469e4974 gencode/java/u 5a0cc53317592a868f7f23a67a7493bd571054bb25d9e28de51000ad6473dfeb gencode/java/udmi/schema/Level.java fd66ce1118548ec8ff3cb4e057ecc0a3614a1e67f0207b592024cd1214259d3f gencode/java/udmi/schema/LinkExternalsModel.java cbb7cadcff210f188d97bad8a1bb4e369518d974fb7f1eac73bad44199293cd9 gencode/java/udmi/schema/LinkExternalsSite.java -<<<<<<< HEAD +decba67ea48b05e1e360670369a20fc9dc4e8cf7507399a2bd23445201048310 gencode/java/udmi/schema/Links.java 9693964888943e245b333aba51a496dc422a21aff5508c77fc2250c1289dfa37 gencode/java/udmi/schema/LocalnetConfig.java e8f266eb69de40457838fe80e1a3befe9e52ce5443e8962653dba09fda61fe97 gencode/java/udmi/schema/LocalnetModel.java 990aefd408a48ea59562a43c79495a0ad55dbff907fd47e6fd154400dead50ea gencode/java/udmi/schema/LocalnetState.java -======= -decba67ea48b05e1e360670369a20fc9dc4e8cf7507399a2bd23445201048310 gencode/java/udmi/schema/Links.java -6b4c29a682d0697d5420efb9e7788ec7358d94435da288938007205b8669e180 gencode/java/udmi/schema/LocalnetConfig.java -563bd616d4eb4201237ad356a144de80a4d0d1713701049009d0dbdab9fc37be gencode/java/udmi/schema/LocalnetModel.java -b250808d1818cbe5366473cf814e9949c8c17dc4b5b2b281ba5845f3a696ba97 gencode/java/udmi/schema/LocalnetState.java ->>>>>>> master 9fd5185768bae32e69020e85e6af424f0e42af7dd4a2d69302395880f699d046 gencode/java/udmi/schema/Location.java ca575e906937fbd2137e552a5943a976a81eccfbcd5db607ebb4e70254c604db gencode/java/udmi/schema/MappingCommand.java 3feaec057c0f8f97a389ad437f297b0b6ff5dbb4c92d0f500e74aa2629991b90 gencode/java/udmi/schema/MappingConfig.java diff --git a/gencode/python/poetry.lock b/gencode/python/poetry.lock index d923e4628b..a71cb86302 100644 --- a/gencode/python/poetry.lock +++ b/gencode/python/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 2.3.3 and should not be changed by hand. +# This file is automatically @generated by Poetry 2.3.4 and should not be changed by hand. [[package]] name = "dataclasses-json" @@ -50,14 +50,14 @@ files = [ [[package]] name = "packaging" -version = "26.0" +version = "26.1" description = "Core utilities for Python packages" optional = false python-versions = ">=3.8" groups = ["main"] files = [ - {file = "packaging-26.0-py3-none-any.whl", hash = "sha256:b36f1fef9334a5588b4166f8bcd26a14e521f2b55e6b9de3aaa80d3ff7a37529"}, - {file = "packaging-26.0.tar.gz", hash = "sha256:00243ae351a257117b6a241061796684b084ed1c516a08c48a3f7e147a9d80b4"}, + {file = "packaging-26.1-py3-none-any.whl", hash = "sha256:5d9c0669c6285e491e0ced2eee587eaf67b670d94a19e94e3984a481aba6802f"}, + {file = "packaging-26.1.tar.gz", hash = "sha256:f042152b681c4bfac5cae2742a55e103d27ab2ec0f3d88037136b6bfe7c9c5de"}, ] [[package]] From b40ddf411c1153e5939030b14ae6f6598078f8d3 Mon Sep 17 00:00:00 2001 From: Nitish Jain Date: Sun, 19 Apr 2026 10:55:00 +0000 Subject: [PATCH 06/14] bridgehead CI test fixes --- .github/workflows/testing.yml | 2 +- bridgehead/tests/test_bridgehead.sh | 62 ++++++++++++++++------------- 2 files changed, 35 insertions(+), 29 deletions(-) diff --git a/.github/workflows/testing.yml b/.github/workflows/testing.yml index 080fa92fab..d7ffc25823 100644 --- a/.github/workflows/testing.yml +++ b/.github/workflows/testing.yml @@ -471,7 +471,7 @@ jobs: run: | bash bridgehead/tests/test_bridgehead.sh - name: Upload logs on failure - if: ${{ failure() }} + if: ${{ !cancelled() }} uses: actions/upload-artifact@v4 with: name: bridgehead-logs diff --git a/bridgehead/tests/test_bridgehead.sh b/bridgehead/tests/test_bridgehead.sh index ebb5d4e8db..da70547d01 100755 --- a/bridgehead/tests/test_bridgehead.sh +++ b/bridgehead/tests/test_bridgehead.sh @@ -9,6 +9,9 @@ sudo rm -rf udmi_site_model git clone https://github.com/faucetsdn/udmi_site_model.git udmi_site_model +mkdir -p udmi_site_model/out +chmod a+rwx udmi_site_model/out + # Backup existing .env if it exists if [ -f .env ]; then echo "Backing up existing .env" @@ -16,33 +19,38 @@ if [ -f .env ]; then fi echo "Generating .env file..." +HOST_IP=$(sudo hostname -I | awk '{print $1}') cat < .env -HOST_IP=127.0.0.1 -AUTH_USER=admin -AUTH_PASS=$(openssl rand -hex 16) -SERV_USER=service -SERV_PASS=$(openssl rand -hex 16) +HOST_IP=$HOST_IP +AUTH_USER=scrumptious +AUTH_PASS=aardvark +SERV_USER=rocket +SERV_PASS=monkey INFLUXDB_TOKEN=$(openssl rand -hex 32) -INFLUX_USER=influx -INFLUX_PASSWORD=$(openssl rand -hex 16) -GRAFANA_USER=grafana -GRAFANA_PASSWORD=$(openssl rand -hex 16) +INFLUX_USER=bridgehead +INFLUX_PASSWORD=password +GRAFANA_USER=bridgehead +GRAFANA_PASSWORD=password EOF -echo "Building Pubber from source..." -# Build pubber before starting services or running it, to avoid delays in test -../pubber/bin/build +echo "Building Pubber container from source..." +# Run from workspace root to make bin/container work correctly +(cd .. && bin/container pubber build --no-check) echo "Starting services with docker-compose..." docker compose up -d --build -# Fix permissions for files created by Docker in the mounted volume -echo "Fixing permissions for udmi_site_model..." -sudo chmod -R a+rw udmi_site_model - # Function to clean up on exit function cleanup { echo "Cleaning up..." + echo "Dumping UDMIS logs:" + docker logs udmis || true + echo "Dumping Validator logs:" + docker logs validator || true + + echo "Stopping Pubber container..." + docker stop pubber || true + echo "Stopping docker-compose services..." docker compose down sudo rm -rf udmi_site_model @@ -58,7 +66,7 @@ trap cleanup EXIT echo "Waiting for services to be healthy..." for i in {1..60}; do - if docker logs udmis 2>&1 | grep -q "Started UDMIS"; then + if docker logs udmis 2>&1 | grep -q "udmis running in the background"; then echo "UDMIS is ready." break fi @@ -66,9 +74,10 @@ for i in {1..60}; do sleep 2 done -# Also check if validator is running -if ! docker ps | grep -q validator; then - echo "Validator container is not running!" +if ! docker logs udmis 2>&1 | grep -q "udmis running in the background"; then + echo "UDMIS failed to become ready in time!" + echo "Dumping UDMIS logs:" + docker logs udmis || true exit 1 fi @@ -78,12 +87,12 @@ echo "Running registrar initial setup..." docker exec validator bin/registrar site_model/ //mqtt/mosquitto -x -d docker exec validator bin/registrar site_model/ //mqtt/mosquitto GAT-123 -echo "Starting Pubber..." -# Run in background, it should be already built -../bin/pubber udmi_site_model //mqtt/localhost GAT-123 852649 & -PUBBER_PID=$! +echo "Starting Pubber container in udminet network..." +# Use the locally built 'pubber' image and connect to 'udminet' +docker run -d --rm --name pubber --network udminet -v $(realpath udmi_site_model):/root/site_model pubber /bin/bash -c "tail -f /dev/null" -echo "Pubber started with PID $PUBBER_PID" +echo "Running Pubber inside container..." +docker exec -d pubber /bin/bash -c "bin/pubber site_model/ //mqtt/mosquitto GAT-123 852649" # Wait a bit for pubber to connect and send some messages sleep 10 @@ -94,9 +103,6 @@ docker exec validator /root/discovery.sh GAT-123 echo "Running registrar to check results..." docker exec validator bin/registrar site_model/ //mqtt/mosquitto > registrar_output.txt -echo "Stopping Pubber..." -kill $PUBBER_PID || true - echo "Verifying output..." cat registrar_output.txt From 879557d93a69feb1794ddeb04a6d321249baf6e0 Mon Sep 17 00:00:00 2001 From: Nitish Jain Date: Sun, 19 Apr 2026 13:10:52 +0000 Subject: [PATCH 07/14] bridgehead CI test fixes --- bridgehead/tests/test_bridgehead.sh | 45 ++++++++--------------------- 1 file changed, 12 insertions(+), 33 deletions(-) diff --git a/bridgehead/tests/test_bridgehead.sh b/bridgehead/tests/test_bridgehead.sh index da70547d01..56e4c90391 100755 --- a/bridgehead/tests/test_bridgehead.sh +++ b/bridgehead/tests/test_bridgehead.sh @@ -33,10 +33,6 @@ GRAFANA_USER=bridgehead GRAFANA_PASSWORD=password EOF -echo "Building Pubber container from source..." -# Run from workspace root to make bin/container work correctly -(cd .. && bin/container pubber build --no-check) - echo "Starting services with docker-compose..." docker compose up -d --build @@ -81,37 +77,20 @@ if ! docker logs udmis 2>&1 | grep -q "udmis running in the background"; then exit 1 fi -echo "Executing discovery sequence..." - -echo "Running registrar initial setup..." -docker exec validator bin/registrar site_model/ //mqtt/mosquitto -x -d -docker exec validator bin/registrar site_model/ //mqtt/mosquitto GAT-123 - -echo "Starting Pubber container in udminet network..." -# Use the locally built 'pubber' image and connect to 'udminet' -docker run -d --rm --name pubber --network udminet -v $(realpath udmi_site_model):/root/site_model pubber /bin/bash -c "tail -f /dev/null" - -echo "Running Pubber inside container..." -docker exec -d pubber /bin/bash -c "bin/pubber site_model/ //mqtt/mosquitto GAT-123 852649" - -# Wait a bit for pubber to connect and send some messages -sleep 10 - -echo "Running discovery script with GAT-123..." -docker exec validator /root/discovery.sh GAT-123 - -echo "Running registrar to check results..." -docker exec validator bin/registrar site_model/ //mqtt/mosquitto > registrar_output.txt - -echo "Verifying output..." -cat registrar_output.txt +# Function to run a command with retries +function run_with_retry { + local n=0 + until [ "$n" -ge 3 ] + do + "$@" && return 0 + n=$((n+1)) + echo "Command failed, retrying ($n/3)..." >&2 + sleep 5 + done + return 1 +} failed=0 -grep -q "Device envelope: 1" registrar_output.txt || failed=1 -grep -q "Device extra: 6" registrar_output.txt || failed=1 -grep -q "Device proxy: 2" registrar_output.txt || failed=1 -grep -q "Device status: 4" registrar_output.txt || failed=1 -grep -q "Device validation: 1" registrar_output.txt || failed=1 if [ $failed -eq 0 ]; then echo "Test PASSED" From 421064549cf2535b3e7fd15a8a335ba21f34ebe3 Mon Sep 17 00:00:00 2001 From: Nitish Jain Date: Sun, 19 Apr 2026 13:36:24 +0000 Subject: [PATCH 08/14] bridgehead CI test fixes --- .github/workflows/testing.yml | 4 +++- bridgehead/tests/test_bridgehead.sh | 4 ++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/.github/workflows/testing.yml b/.github/workflows/testing.yml index aefc2d57bd..9fd670856d 100644 --- a/.github/workflows/testing.yml +++ b/.github/workflows/testing.yml @@ -479,5 +479,7 @@ jobs: uses: actions/upload-artifact@v4 with: name: bridgehead-logs - path: bridgehead/var/log/ + path: | + bridgehead/var/mosquitto/log/ + bridgehead/var/tmp/udmis.log diff --git a/bridgehead/tests/test_bridgehead.sh b/bridgehead/tests/test_bridgehead.sh index 56e4c90391..f7bbeff308 100755 --- a/bridgehead/tests/test_bridgehead.sh +++ b/bridgehead/tests/test_bridgehead.sh @@ -44,8 +44,8 @@ function cleanup { echo "Dumping Validator logs:" docker logs validator || true - echo "Stopping Pubber container..." - docker stop pubber || true + # echo "Stopping Pubber container..." + # docker stop pubber || true echo "Stopping docker-compose services..." docker compose down sudo rm -rf udmi_site_model From 281f514897de1443fe4e0b1a8ca207916fe86fe4 Mon Sep 17 00:00:00 2001 From: Nitish Jain Date: Mon, 27 Apr 2026 17:55:16 +0000 Subject: [PATCH 09/14] changes for mosquitto ACLs --- bin/mosquctl_client | 13 +++++- bin/mosquctl_gateway | 43 +++++++++++++++++++ bin/start_mosquitto | 3 -- bin/test_mosquitto | 2 +- .../access/ImplicitIotAccessProvider.java | 20 ++++++++- .../service/support/ConnectionBroker.java | 4 ++ .../udmi/service/support/MosquittoBroker.java | 18 +++++++- 7 files changed, 94 insertions(+), 9 deletions(-) create mode 100755 bin/mosquctl_gateway diff --git a/bin/mosquctl_client b/bin/mosquctl_client index b0eac2d74d..b54e17ac8c 100755 --- a/bin/mosquctl_client +++ b/bin/mosquctl_client @@ -17,9 +17,20 @@ shift 2 source $UDMI_ROOT/etc/mosquitto_ctrl.sh client_user=$client_id +role_name="role_${client_id//\//_}" $MOSQUITTO_CTRL deleteClient $client_user || true +$MOSQUITTO_CTRL deleteRole $role_name || true + if [[ $client_pass != "--" ]]; then $MOSQUITTO_CTRL createClient $client_user -p $client_pass -c $client_id - $MOSQUITTO_CTRL addClientRole $client_user device + + $MOSQUITTO_CTRL createRole $role_name + $MOSQUITTO_CTRL addClientRole $client_user $role_name + + $MOSQUITTO_CTRL addRoleACL $role_name subscribePattern "$client_id/config" allow + $MOSQUITTO_CTRL addRoleACL $role_name subscribePattern "$client_id/commands" allow + $MOSQUITTO_CTRL addRoleACL $role_name subscribePattern "$client_id/errors" allow + $MOSQUITTO_CTRL addRoleACL $role_name publishClientSend "$client_id/events" allow + $MOSQUITTO_CTRL addRoleACL $role_name publishClientSend "$client_id/state" allow fi diff --git a/bin/mosquctl_gateway b/bin/mosquctl_gateway new file mode 100755 index 0000000000..232f73af36 --- /dev/null +++ b/bin/mosquctl_gateway @@ -0,0 +1,43 @@ +#!/bin/bash -e + +UDMI_ROOT=$(dirname $0)/.. +source $UDMI_ROOT/etc/shell_common.sh +source $UDMI_ROOT/etc/mosquitto_ctrl.sh + +if [[ $# != 3 ]]; then + echo Usage: $0 action gateway_id device_id + echo Actions: bind, unbind + false +fi + +action=$1 +gateway_id=$2 +device_id=$3 + +role_name="role_${gateway_id//\//_}" + +if [[ $action == "bind" ]]; then + echo Binding $device_id to gateway $gateway_id + # Create role if not exists (ignore error if exists) + $MOSQUITTO_CTRL createRole $role_name || true + # Add role to gateway client (ignore error if already added) + $MOSQUITTO_CTRL addClientRole $gateway_id $role_name || true + + # Add ACLs + $MOSQUITTO_CTRL addRoleACL $role_name subscribePattern "$device_id/config" allow + $MOSQUITTO_CTRL addRoleACL $role_name subscribePattern "$device_id/commands" allow + $MOSQUITTO_CTRL addRoleACL $role_name subscribePattern "$device_id/errors" allow + $MOSQUITTO_CTRL addRoleACL $role_name publishClientSend "$device_id/events" allow + $MOSQUITTO_CTRL addRoleACL $role_name publishClientSend "$device_id/state" allow +elif [[ $action == "unbind" ]]; then + echo Unbinding $device_id from gateway $gateway_id + # Remove ACLs + $MOSQUITTO_CTRL removeRoleACL $role_name subscribePattern "$device_id/config" || true + $MOSQUITTO_CTRL removeRoleACL $role_name subscribePattern "$device_id/commands" || true + $MOSQUITTO_CTRL removeRoleACL $role_name subscribePattern "$device_id/errors" || true + $MOSQUITTO_CTRL removeRoleACL $role_name publishClientSend "$device_id/events" || true + $MOSQUITTO_CTRL removeRoleACL $role_name publishClientSend "$device_id/state" || true +else + echo Unknown action: $action + false +fi diff --git a/bin/start_mosquitto b/bin/start_mosquitto index 16a309dc0a..01799c236a 100755 --- a/bin/start_mosquitto +++ b/bin/start_mosquitto @@ -62,9 +62,6 @@ else echo Completed mosquitto startup. fi -$MOSQUITTO_CTRL createRole device -$MOSQUITTO_CTRL addRoleACL device subscribePattern '/r/+/d/+/#' allow -$MOSQUITTO_CTRL addRoleACL device publishClientSend '/r/+/d/+/#' allow $MOSQUITTO_CTRL createRole service $MOSQUITTO_CTRL addRoleACL service subscribePattern '/r/+/d/+/#' allow $MOSQUITTO_CTRL addRoleACL service publishClientSend '/r/+/d/+/#' allow diff --git a/bin/test_mosquitto b/bin/test_mosquitto index 2ec4df34fb..78c43fae11 100755 --- a/bin/test_mosquitto +++ b/bin/test_mosquitto @@ -33,7 +33,7 @@ CLIENT_OPTS="-i $CLNT_ID -u $CLNT_USER -P $CLNT_PASS --cafile $CA_CERT --cert $C $MOSQUITTO_CTRL deleteClient $CLNT_USER $MOSQUITTO_CTRL createClient $CLNT_USER -p $CLNT_PASS -c $CLNT_ID -$MOSQUITTO_CTRL addClientRole $CLNT_USER device +$MOSQUITTO_CTRL addClientRole $CLNT_USER service killall mosquitto_sub || true diff --git a/udmis/src/main/java/com/google/bos/udmi/service/access/ImplicitIotAccessProvider.java b/udmis/src/main/java/com/google/bos/udmi/service/access/ImplicitIotAccessProvider.java index d43ef01f3d..24eee4b558 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/access/ImplicitIotAccessProvider.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/access/ImplicitIotAccessProvider.java @@ -109,8 +109,18 @@ public static String hashedDeviceId(String registryId, String deviceId) { private void bindDevicesToGateway(String registryId, String gatewayId, CloudModel cloudModel) { Set deviceIds = ImmutableSet.copyOf(cloudModel.gateway.proxy_ids); - deviceIds.forEach(deviceId -> - registryDeviceRef(registryId, deviceId).put(BOUND_TO_KEY, gatewayId)); + deviceIds.forEach(deviceId -> { + registryDeviceRef(registryId, deviceId).put(BOUND_TO_KEY, gatewayId); + broker.bindGateway(clientId(registryId, gatewayId), clientId(registryId, deviceId)); + }); + } + + private void unbindDevicesFromGateway(String registryId, String gatewayId, CloudModel cloudModel) { + Set deviceIds = ImmutableSet.copyOf(cloudModel.gateway.proxy_ids); + deviceIds.forEach(deviceId -> { + registryDeviceRef(registryId, deviceId).delete(BOUND_TO_KEY); + broker.unbindGateway(clientId(registryId, gatewayId), clientId(registryId, deviceId)); + }); } private void blockDevice(String registryId, String deviceId, CloudModel cloudModel) { @@ -158,9 +168,14 @@ private void createDevice(String registryId, String deviceId, CloudModel cloudMo private void deleteDevice(String registryId, String deviceId, CloudModel cloudModel) { DataRef properties = registryDeviceRef(registryId, deviceId); + String gatewayId = properties.get(BOUND_TO_KEY); properties.entries().keySet().forEach(properties::delete); registryDevicesRef(registryId).delete(deviceId); broker.authorize(clientId(registryId, deviceId), null); + + if (gatewayId != null) { + broker.unbindGateway(clientId(registryId, gatewayId), clientId(registryId, deviceId)); + } } private CloudModel getReply(String registryId, String deviceId, CloudModel request, @@ -339,6 +354,7 @@ public CloudModel modelDevice(String registryId, String deviceId, CloudModel clo case DELETE -> deleteDevice(registryId, deviceId, cloudModel); case MODIFY -> modifyDevice(registryId, deviceId, cloudModel); case BIND -> bindDevicesToGateway(registryId, deviceId, cloudModel); + case UNBIND -> unbindDevicesFromGateway(registryId, deviceId, cloudModel); case BLOCK -> blockDevice(registryId, deviceId, cloudModel); default -> throw new RuntimeException("Unknown device operation " + operation); } diff --git a/udmis/src/main/java/com/google/bos/udmi/service/support/ConnectionBroker.java b/udmis/src/main/java/com/google/bos/udmi/service/support/ConnectionBroker.java index 430174dcff..fb75ea41e4 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/support/ConnectionBroker.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/support/ConnectionBroker.java @@ -13,6 +13,10 @@ public interface ConnectionBroker { Future addEventListener(String clientPrefix, Consumer eventConsumer); + void bindGateway(String gatewayId, String deviceId); + + void unbindGateway(String gatewayId, String deviceId); + /** * Simple event for connection broker happenings. */ diff --git a/udmis/src/main/java/com/google/bos/udmi/service/support/MosquittoBroker.java b/udmis/src/main/java/com/google/bos/udmi/service/support/MosquittoBroker.java index 3aa4750328..894e5a9650 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/support/MosquittoBroker.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/support/MosquittoBroker.java @@ -26,6 +26,7 @@ public class MosquittoBroker extends ContainerBase implements ConnectionBroker { private static final String UDMI_ROOT = System.getenv("UDMI_ROOT"); private static final String MOSQUCTL_CLIENT_FMT = UDMI_ROOT + "/bin/mosquctl_client %s %s"; private static final String MOSQUCTL_LOG_FMT = UDMI_ROOT + "/bin/mosquctl_log %s"; + private static final String MOSQUCTL_GATEWAY_FMT = UDMI_ROOT + "/bin/mosquctl_gateway %s %s %s"; private static final long EXEC_TIMEOUT_SEC = 10; private static final String REVOKE_PASSWORD = "--"; private static final Pattern LOG_MATCHER = @@ -60,8 +61,7 @@ private void consumeStream(BufferedReader reader, Consumer consumer) { thread.start(); } - private void mosquctlClient(String clientId, String clientPass) { - String cmd = format(MOSQUCTL_CLIENT_FMT, clientId, clientPass); + private void executeCommand(String cmd) { synchronized (MosquittoBroker.class) { try { info("Executing command %s", cmd); @@ -77,6 +77,10 @@ private void mosquctlClient(String clientId, String clientPass) { } } + private void mosquctlClient(String clientId, String clientPass) { + executeCommand(format(MOSQUCTL_CLIENT_FMT, clientId, clientPass)); + } + private void mosquctlLog(String clientPrefix, Consumer eventConsumer) { String cmd = format(MOSQUCTL_LOG_FMT, clientPrefix); synchronized (MosquittoBroker.class) { @@ -148,4 +152,14 @@ public Future addEventListener(String clientPrefix, public void authorize(String clientId, String password) { mosquctlClient(clientId, ofNullable(password).orElse(REVOKE_PASSWORD)); } + + @Override + public void bindGateway(String gatewayId, String deviceId) { + executeCommand(format(MOSQUCTL_GATEWAY_FMT, "bind", gatewayId, deviceId)); + } + + @Override + public void unbindGateway(String gatewayId, String deviceId) { + executeCommand(format(MOSQUCTL_GATEWAY_FMT, "unbind", gatewayId, deviceId)); + } } From b9cc6d8fcbd65afa3a2f5d8edbd93d9469b8f15c Mon Sep 17 00:00:00 2001 From: Nitish Jain Date: Tue, 28 Apr 2026 05:53:23 +0000 Subject: [PATCH 10/14] changes for mosquitto ACLs --- .../bos/udmi/service/access/ImplicitIotAccessProvider.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/udmis/src/main/java/com/google/bos/udmi/service/access/ImplicitIotAccessProvider.java b/udmis/src/main/java/com/google/bos/udmi/service/access/ImplicitIotAccessProvider.java index 24eee4b558..b59244ed5d 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/access/ImplicitIotAccessProvider.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/access/ImplicitIotAccessProvider.java @@ -115,7 +115,8 @@ private void bindDevicesToGateway(String registryId, String gatewayId, CloudMode }); } - private void unbindDevicesFromGateway(String registryId, String gatewayId, CloudModel cloudModel) { + private void unbindDevicesFromGateway(String registryId, String gatewayId, + CloudModel cloudModel) { Set deviceIds = ImmutableSet.copyOf(cloudModel.gateway.proxy_ids); deviceIds.forEach(deviceId -> { registryDeviceRef(registryId, deviceId).delete(BOUND_TO_KEY); @@ -168,10 +169,10 @@ private void createDevice(String registryId, String deviceId, CloudModel cloudMo private void deleteDevice(String registryId, String deviceId, CloudModel cloudModel) { DataRef properties = registryDeviceRef(registryId, deviceId); - String gatewayId = properties.get(BOUND_TO_KEY); properties.entries().keySet().forEach(properties::delete); registryDevicesRef(registryId).delete(deviceId); broker.authorize(clientId(registryId, deviceId), null); + String gatewayId = properties.get(BOUND_TO_KEY); if (gatewayId != null) { broker.unbindGateway(clientId(registryId, gatewayId), clientId(registryId, deviceId)); From 9729a1609f601fae8be3fa45fb60840967c216d4 Mon Sep 17 00:00:00 2001 From: Nitish Jain Date: Tue, 28 Apr 2026 11:50:02 +0000 Subject: [PATCH 11/14] changes for mosquitto ACLs --- bin/mosquctl_client | 3 +++ bin/mosquctl_gateway | 2 ++ .../com/google/bos/udmi/service/support/MosquittoBroker.java | 1 + 3 files changed, 6 insertions(+) diff --git a/bin/mosquctl_client b/bin/mosquctl_client index b54e17ac8c..49d3ae7c09 100755 --- a/bin/mosquctl_client +++ b/bin/mosquctl_client @@ -33,4 +33,7 @@ if [[ $client_pass != "--" ]]; then $MOSQUITTO_CTRL addRoleACL $role_name subscribePattern "$client_id/errors" allow $MOSQUITTO_CTRL addRoleACL $role_name publishClientSend "$client_id/events" allow $MOSQUITTO_CTRL addRoleACL $role_name publishClientSend "$client_id/state" allow + echo "Device $client_id registered correctly." +else + echo "Device $client_id deleted correctly." fi diff --git a/bin/mosquctl_gateway b/bin/mosquctl_gateway index 232f73af36..c0a64958b5 100755 --- a/bin/mosquctl_gateway +++ b/bin/mosquctl_gateway @@ -29,6 +29,7 @@ if [[ $action == "bind" ]]; then $MOSQUITTO_CTRL addRoleACL $role_name subscribePattern "$device_id/errors" allow $MOSQUITTO_CTRL addRoleACL $role_name publishClientSend "$device_id/events" allow $MOSQUITTO_CTRL addRoleACL $role_name publishClientSend "$device_id/state" allow + echo "Binding successful for $device_id to gateway $gateway_id" elif [[ $action == "unbind" ]]; then echo Unbinding $device_id from gateway $gateway_id # Remove ACLs @@ -37,6 +38,7 @@ elif [[ $action == "unbind" ]]; then $MOSQUITTO_CTRL removeRoleACL $role_name subscribePattern "$device_id/errors" || true $MOSQUITTO_CTRL removeRoleACL $role_name publishClientSend "$device_id/events" || true $MOSQUITTO_CTRL removeRoleACL $role_name publishClientSend "$device_id/state" || true + echo "Unbinding successful for $device_id from gateway $gateway_id" else echo Unknown action: $action false diff --git a/udmis/src/main/java/com/google/bos/udmi/service/support/MosquittoBroker.java b/udmis/src/main/java/com/google/bos/udmi/service/support/MosquittoBroker.java index 894e5a9650..8264461074 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/support/MosquittoBroker.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/support/MosquittoBroker.java @@ -160,6 +160,7 @@ public void bindGateway(String gatewayId, String deviceId) { @Override public void unbindGateway(String gatewayId, String deviceId) { + info("Unbind device Id: %s from gateway Id: %s :%s", deviceId, gatewayId); executeCommand(format(MOSQUCTL_GATEWAY_FMT, "unbind", gatewayId, deviceId)); } } From c2503916f7444009011476703ce12a647b9526df Mon Sep 17 00:00:00 2001 From: Nitish Jain Date: Thu, 7 May 2026 08:28:40 +0000 Subject: [PATCH 12/14] mosquito ACL support changes --- bin/mosquctl_client | 2 +- bin/mosquctl_gateway | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/bin/mosquctl_client b/bin/mosquctl_client index 49d3ae7c09..6256091452 100755 --- a/bin/mosquctl_client +++ b/bin/mosquctl_client @@ -31,7 +31,7 @@ if [[ $client_pass != "--" ]]; then $MOSQUITTO_CTRL addRoleACL $role_name subscribePattern "$client_id/config" allow $MOSQUITTO_CTRL addRoleACL $role_name subscribePattern "$client_id/commands" allow $MOSQUITTO_CTRL addRoleACL $role_name subscribePattern "$client_id/errors" allow - $MOSQUITTO_CTRL addRoleACL $role_name publishClientSend "$client_id/events" allow + $MOSQUITTO_CTRL addRoleACL $role_name publishClientSend "$client_id/events/#" allow $MOSQUITTO_CTRL addRoleACL $role_name publishClientSend "$client_id/state" allow echo "Device $client_id registered correctly." else diff --git a/bin/mosquctl_gateway b/bin/mosquctl_gateway index c0a64958b5..fc6f28d3db 100755 --- a/bin/mosquctl_gateway +++ b/bin/mosquctl_gateway @@ -27,7 +27,7 @@ if [[ $action == "bind" ]]; then $MOSQUITTO_CTRL addRoleACL $role_name subscribePattern "$device_id/config" allow $MOSQUITTO_CTRL addRoleACL $role_name subscribePattern "$device_id/commands" allow $MOSQUITTO_CTRL addRoleACL $role_name subscribePattern "$device_id/errors" allow - $MOSQUITTO_CTRL addRoleACL $role_name publishClientSend "$device_id/events" allow + $MOSQUITTO_CTRL addRoleACL $role_name publishClientSend "$device_id/events/#" allow $MOSQUITTO_CTRL addRoleACL $role_name publishClientSend "$device_id/state" allow echo "Binding successful for $device_id to gateway $gateway_id" elif [[ $action == "unbind" ]]; then @@ -36,7 +36,7 @@ elif [[ $action == "unbind" ]]; then $MOSQUITTO_CTRL removeRoleACL $role_name subscribePattern "$device_id/config" || true $MOSQUITTO_CTRL removeRoleACL $role_name subscribePattern "$device_id/commands" || true $MOSQUITTO_CTRL removeRoleACL $role_name subscribePattern "$device_id/errors" || true - $MOSQUITTO_CTRL removeRoleACL $role_name publishClientSend "$device_id/events" || true + $MOSQUITTO_CTRL removeRoleACL $role_name publishClientSend "$device_id/events/#" || true $MOSQUITTO_CTRL removeRoleACL $role_name publishClientSend "$device_id/state" || true echo "Unbinding successful for $device_id from gateway $gateway_id" else From d668909d749e1fd7ad434a65af5cbd6df8589c89 Mon Sep 17 00:00:00 2001 From: Nitish Jain Date: Tue, 12 May 2026 07:54:23 +0000 Subject: [PATCH 13/14] changes for fetching numId --- .../service/bridge/MqttToPubSubBridge.java | 43 ++++++- .../bridge/MqttToPubSubBridgeTest.java | 120 +++++++++++++++++- 2 files changed, 158 insertions(+), 5 deletions(-) diff --git a/udmis/src/main/java/com/google/bos/udmi/service/bridge/MqttToPubSubBridge.java b/udmis/src/main/java/com/google/bos/udmi/service/bridge/MqttToPubSubBridge.java index 7f0a82539b..5c0a31c86b 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/bridge/MqttToPubSubBridge.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/bridge/MqttToPubSubBridge.java @@ -59,8 +59,11 @@ import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; +import com.google.bos.udmi.service.support.EtcdDataProvider; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.slf4j.Logger; +import udmi.schema.IotAccess; +import udmi.schema.IotAccess.IotProvider; import org.slf4j.LoggerFactory; /** @@ -100,6 +103,8 @@ public static void main(String[] args) { String mqttPassword = commandLine.getOptionValue("mqtt_password"); String mqttClientCertPath = commandLine.getOptionValue("mqtt_client_cert_path"); String mqttClientKeyPath = commandLine.getOptionValue("mqtt_client_key_path"); + String etcdTarget = commandLine.getOptionValue("etcd_target"); + String etcdOptions = commandLine.getOptionValue("etcd_options"); if (gcpProjectId == null || pubsubTopicId == null) { logger.error("gcp_project_id and pubsub_topic_id are required."); @@ -108,8 +113,18 @@ public static void main(String[] args) { Publisher publisher = null; IMqttClient mqttClient = null; + EtcdDataProvider etcdProvider = null; try { + if (etcdTarget != null) { + IotAccess iotAccess = new IotAccess(); + iotAccess.provider = IotProvider.ETCD; + iotAccess.project_id = etcdTarget; + iotAccess.options = etcdOptions; + etcdProvider = new EtcdDataProvider(iotAccess); + logger.info("EtcdDataProvider initialized for target: {}", etcdTarget); + } + // Initialize Pub/Sub Publisher ProjectTopicName topicName = ProjectTopicName.of(gcpProjectId, pubsubTopicId); publisher = Publisher.newBuilder(topicName).build(); @@ -138,7 +153,7 @@ public static void main(String[] args) { logger.info("Connected to MQTT broker."); // Set up MQTT Message Callback - setupBridge(mqttClient, publisher, mqttSubscriptionTopic); + setupBridge(mqttClient, publisher, mqttSubscriptionTopic, etcdProvider); // Keep the application running while (true) { @@ -158,6 +173,14 @@ public static void main(String[] args) { logger.error("An unexpected error occurred", e); } finally { // Shutdown + if (etcdProvider != null) { + try { + etcdProvider.shutdown(); + logger.info("EtcdDataProvider shut down."); + } catch (Exception e) { + logger.warn("Error shutting down EtcdDataProvider", e); + } + } if (mqttClient != null && mqttClient.isConnected()) { try { mqttClient.disconnect(); @@ -189,6 +212,8 @@ private static CommandLine parseArgs(String[] args) throws ParseException { options.addOption(null, "mqtt_password", true, "MQTT password for authentication."); options.addOption(null, "mqtt_client_cert_path", true, "Path to client certificate for TLS."); options.addOption(null, "mqtt_client_key_path", true, "Path to client private key for TLS."); + options.addOption(null, "etcd_target", true, "etcd endpoint URL."); + options.addOption(null, "etcd_options", true, "etcd provider options (comma-separated)."); options.addOption("h", "help", false, "Print usage info."); CommandLineParser parser = new DefaultParser(); @@ -212,7 +237,7 @@ private static CommandLine parseArgs(String[] args) throws ParseException { * @throws MqttException If an MQTT error occurs. */ public static void setupBridge(IMqttClient mqttClient, Publisher publisher, - String mqttSubscriptionTopic) throws MqttException { + String mqttSubscriptionTopic, EtcdDataProvider etcdProvider) throws MqttException { mqttClient.setCallback( new MqttCallback() { @Override @@ -245,6 +270,20 @@ public void messageArrived(String topic, MqttMessage message) { attributes.put("deviceId", deviceId); attributes.put("deviceRegistryId", registryId); + if (etcdProvider != null && !"unknown".equals(registryId) && !"unknown".equals(deviceId)) { + try { + String numId = etcdProvider.ref().registry(registryId).device(deviceId).get("num_id"); + if (numId != null) { + attributes.put("deviceNumId", numId); + logger.info("Found numId {} for device {}/{}", numId, registryId, deviceId); + } else { + logger.warn("numId not found in etcd for device {}/{}", registryId, deviceId); + } + } catch (Exception e) { + logger.warn("Error reading numId from etcd for device {}/{}", registryId, deviceId, e); + } + } + if (topicSuffix != null && topicSuffix.startsWith("events/")) { List parts = Splitter.on('/').splitToList(topicSuffix); if (parts.size() >= 2) { diff --git a/udmis/src/test/java/com/google/bos/udmi/service/bridge/MqttToPubSubBridgeTest.java b/udmis/src/test/java/com/google/bos/udmi/service/bridge/MqttToPubSubBridgeTest.java index e950b3034a..2744727476 100644 --- a/udmis/src/test/java/com/google/bos/udmi/service/bridge/MqttToPubSubBridgeTest.java +++ b/udmis/src/test/java/com/google/bos/udmi/service/bridge/MqttToPubSubBridgeTest.java @@ -14,6 +14,8 @@ import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.junit.jupiter.api.Test; +import com.google.bos.udmi.service.support.DataRef; +import com.google.bos.udmi.service.support.EtcdDataProvider; import org.mockito.ArgumentCaptor; class MqttToPubSubBridgeTest { @@ -31,7 +33,7 @@ void testSetupBridge() throws Exception { .thenReturn(ApiFutures.immediateFuture("msg-123")); // Call setupBridge - MqttToPubSubBridge.setupBridge(mockMqttClient, mockPublisher, testTopic); + MqttToPubSubBridge.setupBridge(mockMqttClient, mockPublisher, testTopic, null); // Verify subscription verify(mockMqttClient).subscribe(testTopic); @@ -69,7 +71,7 @@ void testSetupBridgeWithSubFolder() throws Exception { when(mockPublisher.publish(any(PubsubMessage.class))) .thenReturn(ApiFutures.immediateFuture("msg-123")); - MqttToPubSubBridge.setupBridge(mockMqttClient, mockPublisher, testTopic); + MqttToPubSubBridge.setupBridge(mockMqttClient, mockPublisher, testTopic, null); ArgumentCaptor callbackCaptor = ArgumentCaptor.forClass(MqttCallback.class); verify(mockMqttClient).setCallback(callbackCaptor.capture()); @@ -100,7 +102,7 @@ void testSetupBridgeUnrecognizedTopic() throws Exception { when(mockPublisher.publish(any(PubsubMessage.class))) .thenReturn(ApiFutures.immediateFuture("msg-123")); - MqttToPubSubBridge.setupBridge(mockMqttClient, mockPublisher, testTopic); + MqttToPubSubBridge.setupBridge(mockMqttClient, mockPublisher, testTopic, null); ArgumentCaptor callbackCaptor = ArgumentCaptor.forClass(MqttCallback.class); verify(mockMqttClient).setCallback(callbackCaptor.capture()); @@ -118,4 +120,116 @@ void testSetupBridgeUnrecognizedTopic() throws Exception { assertEquals("unknown", attributes.get("deviceId")); assertEquals("unknown", attributes.get("deviceRegistryId")); } + + @Test + void testSetupBridgeWithEtcd() throws Exception { + IMqttClient mockMqttClient = mock(IMqttClient.class); + Publisher mockPublisher = mock(Publisher.class); + EtcdDataProvider mockEtcdProvider = mock(EtcdDataProvider.class); + DataRef mockDataRef = mock(DataRef.class); + + String testTopic = "/r/my-registry/d/my-device/events"; + String payloadStr = "Hello World"; + final MqttMessage mqttMessage = new MqttMessage(payloadStr.getBytes()); + + when(mockPublisher.publish(any(PubsubMessage.class))) + .thenReturn(ApiFutures.immediateFuture("msg-123")); + + // Mock etcd provider to return a numId + when(mockEtcdProvider.ref()).thenReturn(mockDataRef); + when(mockDataRef.registry("my-registry")).thenReturn(mockDataRef); + when(mockDataRef.device("my-device")).thenReturn(mockDataRef); + when(mockDataRef.get("num_id")).thenReturn("123456"); + + MqttToPubSubBridge.setupBridge(mockMqttClient, mockPublisher, testTopic, mockEtcdProvider); + + ArgumentCaptor callbackCaptor = ArgumentCaptor.forClass(MqttCallback.class); + verify(mockMqttClient).setCallback(callbackCaptor.capture()); + MqttCallback callback = callbackCaptor.getValue(); + + callback.messageArrived(testTopic, mqttMessage); + + ArgumentCaptor pubsubMessageCaptor = + ArgumentCaptor.forClass(PubsubMessage.class); + verify(mockPublisher).publish(pubsubMessageCaptor.capture()); + + PubsubMessage pubsubMessage = pubsubMessageCaptor.getValue(); + Map attributes = pubsubMessage.getAttributesMap(); + assertEquals("123456", attributes.get("deviceNumId")); + } + + @Test + void testSetupBridgeWithEtcdNullResult() throws Exception { + IMqttClient mockMqttClient = mock(IMqttClient.class); + Publisher mockPublisher = mock(Publisher.class); + EtcdDataProvider mockEtcdProvider = mock(EtcdDataProvider.class); + DataRef mockDataRef = mock(DataRef.class); + + String testTopic = "/r/my-registry/d/my-device/events"; + String payloadStr = "Hello World"; + final MqttMessage mqttMessage = new MqttMessage(payloadStr.getBytes()); + + when(mockPublisher.publish(any(PubsubMessage.class))) + .thenReturn(ApiFutures.immediateFuture("msg-123")); + + // Mock etcd provider to return null for numId + when(mockEtcdProvider.ref()).thenReturn(mockDataRef); + when(mockDataRef.registry("my-registry")).thenReturn(mockDataRef); + when(mockDataRef.device("my-device")).thenReturn(mockDataRef); + when(mockDataRef.get("num_id")).thenReturn(null); + + MqttToPubSubBridge.setupBridge(mockMqttClient, mockPublisher, testTopic, mockEtcdProvider); + + ArgumentCaptor callbackCaptor = ArgumentCaptor.forClass(MqttCallback.class); + verify(mockMqttClient).setCallback(callbackCaptor.capture()); + MqttCallback callback = callbackCaptor.getValue(); + + callback.messageArrived(testTopic, mqttMessage); + + ArgumentCaptor pubsubMessageCaptor = + ArgumentCaptor.forClass(PubsubMessage.class); + verify(mockPublisher).publish(pubsubMessageCaptor.capture()); + + PubsubMessage pubsubMessage = pubsubMessageCaptor.getValue(); + Map attributes = pubsubMessage.getAttributesMap(); + org.junit.jupiter.api.Assertions.assertFalse(attributes.containsKey("deviceNumId")); + } + + @Test + void testSetupBridgeWithEtcdFailure() throws Exception { + IMqttClient mockMqttClient = mock(IMqttClient.class); + Publisher mockPublisher = mock(Publisher.class); + EtcdDataProvider mockEtcdProvider = mock(EtcdDataProvider.class); + DataRef mockDataRef = mock(DataRef.class); + + String testTopic = "/r/my-registry/d/my-device/events"; + String payloadStr = "Hello World"; + final MqttMessage mqttMessage = new MqttMessage(payloadStr.getBytes()); + + when(mockPublisher.publish(any(PubsubMessage.class))) + .thenReturn(ApiFutures.immediateFuture("msg-123")); + + // Mock etcd provider to throw exception + when(mockEtcdProvider.ref()).thenReturn(mockDataRef); + when(mockDataRef.registry("my-registry")).thenReturn(mockDataRef); + when(mockDataRef.device("my-device")).thenReturn(mockDataRef); + when(mockDataRef.get("num_id")).thenThrow(new RuntimeException("etcd error")); + + MqttToPubSubBridge.setupBridge(mockMqttClient, mockPublisher, testTopic, mockEtcdProvider); + + ArgumentCaptor callbackCaptor = ArgumentCaptor.forClass(MqttCallback.class); + verify(mockMqttClient).setCallback(callbackCaptor.capture()); + MqttCallback callback = callbackCaptor.getValue(); + + // This should not throw exception and message should still be published + callback.messageArrived(testTopic, mqttMessage); + + ArgumentCaptor pubsubMessageCaptor = + ArgumentCaptor.forClass(PubsubMessage.class); + verify(mockPublisher).publish(pubsubMessageCaptor.capture()); + + PubsubMessage pubsubMessage = pubsubMessageCaptor.getValue(); + Map attributes = pubsubMessage.getAttributesMap(); + org.junit.jupiter.api.Assertions.assertFalse(attributes.containsKey("deviceNumId")); + } } From 8828d7657d65897abade4bc738d621fc18c44fa3 Mon Sep 17 00:00:00 2001 From: Nitish Jain Date: Tue, 12 May 2026 08:44:30 +0000 Subject: [PATCH 14/14] changes for fetching numId --- .../service/bridge/MqttToPubSubBridge.java | 16 +++++--- .../bridge/MqttToPubSubBridgeTest.java | 40 +++++++++---------- 2 files changed, 31 insertions(+), 25 deletions(-) diff --git a/udmis/src/main/java/com/google/bos/udmi/service/bridge/MqttToPubSubBridge.java b/udmis/src/main/java/com/google/bos/udmi/service/bridge/MqttToPubSubBridge.java index 5c0a31c86b..58462636ba 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/bridge/MqttToPubSubBridge.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/bridge/MqttToPubSubBridge.java @@ -3,6 +3,7 @@ import static com.google.common.util.concurrent.MoreExecutors.directExecutor; import com.google.api.core.ApiFuture; +import com.google.bos.udmi.service.support.EtcdDataProvider; import com.google.cloud.pubsub.v1.Publisher; import com.google.common.base.Splitter; import com.google.protobuf.ByteString; @@ -59,12 +60,11 @@ import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; -import com.google.bos.udmi.service.support.EtcdDataProvider; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import udmi.schema.IotAccess; import udmi.schema.IotAccess.IotProvider; -import org.slf4j.LoggerFactory; /** * A bridge that subscribes to an MQTT topic and publishes messages to a Google Cloud Pub/Sub topic. @@ -270,9 +270,14 @@ public void messageArrived(String topic, MqttMessage message) { attributes.put("deviceId", deviceId); attributes.put("deviceRegistryId", registryId); - if (etcdProvider != null && !"unknown".equals(registryId) && !"unknown".equals(deviceId)) { + if (etcdProvider != null + && !"unknown".equals(registryId) + && !"unknown".equals(deviceId)) { try { - String numId = etcdProvider.ref().registry(registryId).device(deviceId).get("num_id"); + String numId = etcdProvider.ref() + .registry(registryId) + .device(deviceId) + .get("num_id"); if (numId != null) { attributes.put("deviceNumId", numId); logger.info("Found numId {} for device {}/{}", numId, registryId, deviceId); @@ -280,7 +285,8 @@ public void messageArrived(String topic, MqttMessage message) { logger.warn("numId not found in etcd for device {}/{}", registryId, deviceId); } } catch (Exception e) { - logger.warn("Error reading numId from etcd for device {}/{}", registryId, deviceId, e); + logger.warn("Error reading numId from etcd for device {}/{}", + registryId, deviceId, e); } } diff --git a/udmis/src/test/java/com/google/bos/udmi/service/bridge/MqttToPubSubBridgeTest.java b/udmis/src/test/java/com/google/bos/udmi/service/bridge/MqttToPubSubBridgeTest.java index 2744727476..c5e68a6c88 100644 --- a/udmis/src/test/java/com/google/bos/udmi/service/bridge/MqttToPubSubBridgeTest.java +++ b/udmis/src/test/java/com/google/bos/udmi/service/bridge/MqttToPubSubBridgeTest.java @@ -7,6 +7,8 @@ import static org.mockito.Mockito.when; import com.google.api.core.ApiFutures; +import com.google.bos.udmi.service.support.DataRef; +import com.google.bos.udmi.service.support.EtcdDataProvider; import com.google.cloud.pubsub.v1.Publisher; import com.google.pubsub.v1.PubsubMessage; import java.util.Map; @@ -14,8 +16,6 @@ import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.junit.jupiter.api.Test; -import com.google.bos.udmi.service.support.DataRef; -import com.google.bos.udmi.service.support.EtcdDataProvider; import org.mockito.ArgumentCaptor; class MqttToPubSubBridgeTest { @@ -123,13 +123,13 @@ void testSetupBridgeUnrecognizedTopic() throws Exception { @Test void testSetupBridgeWithEtcd() throws Exception { - IMqttClient mockMqttClient = mock(IMqttClient.class); - Publisher mockPublisher = mock(Publisher.class); - EtcdDataProvider mockEtcdProvider = mock(EtcdDataProvider.class); - DataRef mockDataRef = mock(DataRef.class); + final IMqttClient mockMqttClient = mock(IMqttClient.class); + final Publisher mockPublisher = mock(Publisher.class); + final EtcdDataProvider mockEtcdProvider = mock(EtcdDataProvider.class); + final DataRef mockDataRef = mock(DataRef.class); - String testTopic = "/r/my-registry/d/my-device/events"; - String payloadStr = "Hello World"; + final String testTopic = "/r/my-registry/d/my-device/events"; + final String payloadStr = "Hello World"; final MqttMessage mqttMessage = new MqttMessage(payloadStr.getBytes()); when(mockPublisher.publish(any(PubsubMessage.class))) @@ -160,13 +160,13 @@ void testSetupBridgeWithEtcd() throws Exception { @Test void testSetupBridgeWithEtcdNullResult() throws Exception { - IMqttClient mockMqttClient = mock(IMqttClient.class); - Publisher mockPublisher = mock(Publisher.class); - EtcdDataProvider mockEtcdProvider = mock(EtcdDataProvider.class); - DataRef mockDataRef = mock(DataRef.class); + final IMqttClient mockMqttClient = mock(IMqttClient.class); + final Publisher mockPublisher = mock(Publisher.class); + final EtcdDataProvider mockEtcdProvider = mock(EtcdDataProvider.class); + final DataRef mockDataRef = mock(DataRef.class); - String testTopic = "/r/my-registry/d/my-device/events"; - String payloadStr = "Hello World"; + final String testTopic = "/r/my-registry/d/my-device/events"; + final String payloadStr = "Hello World"; final MqttMessage mqttMessage = new MqttMessage(payloadStr.getBytes()); when(mockPublisher.publish(any(PubsubMessage.class))) @@ -197,13 +197,13 @@ void testSetupBridgeWithEtcdNullResult() throws Exception { @Test void testSetupBridgeWithEtcdFailure() throws Exception { - IMqttClient mockMqttClient = mock(IMqttClient.class); - Publisher mockPublisher = mock(Publisher.class); - EtcdDataProvider mockEtcdProvider = mock(EtcdDataProvider.class); - DataRef mockDataRef = mock(DataRef.class); + final IMqttClient mockMqttClient = mock(IMqttClient.class); + final Publisher mockPublisher = mock(Publisher.class); + final EtcdDataProvider mockEtcdProvider = mock(EtcdDataProvider.class); + final DataRef mockDataRef = mock(DataRef.class); - String testTopic = "/r/my-registry/d/my-device/events"; - String payloadStr = "Hello World"; + final String testTopic = "/r/my-registry/d/my-device/events"; + final String payloadStr = "Hello World"; final MqttMessage mqttMessage = new MqttMessage(payloadStr.getBytes()); when(mockPublisher.publish(any(PubsubMessage.class)))