diff --git a/udmis/src/main/java/com/google/bos/udmi/service/bridge/MqttToPubSubBridge.java b/udmis/src/main/java/com/google/bos/udmi/service/bridge/MqttToPubSubBridge.java index 7f0a82539b..58462636ba 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/bridge/MqttToPubSubBridge.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/bridge/MqttToPubSubBridge.java @@ -3,6 +3,7 @@ import static com.google.common.util.concurrent.MoreExecutors.directExecutor; import com.google.api.core.ApiFuture; +import com.google.bos.udmi.service.support.EtcdDataProvider; import com.google.cloud.pubsub.v1.Publisher; import com.google.common.base.Splitter; import com.google.protobuf.ByteString; @@ -62,6 +63,8 @@ import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import udmi.schema.IotAccess; +import udmi.schema.IotAccess.IotProvider; /** * A bridge that subscribes to an MQTT topic and publishes messages to a Google Cloud Pub/Sub topic. @@ -100,6 +103,8 @@ public static void main(String[] args) { String mqttPassword = commandLine.getOptionValue("mqtt_password"); String mqttClientCertPath = commandLine.getOptionValue("mqtt_client_cert_path"); String mqttClientKeyPath = commandLine.getOptionValue("mqtt_client_key_path"); + String etcdTarget = commandLine.getOptionValue("etcd_target"); + String etcdOptions = commandLine.getOptionValue("etcd_options"); if (gcpProjectId == null || pubsubTopicId == null) { logger.error("gcp_project_id and pubsub_topic_id are required."); @@ -108,8 +113,18 @@ public static void main(String[] args) { Publisher publisher = null; IMqttClient mqttClient = null; + EtcdDataProvider etcdProvider = null; try { + if (etcdTarget != null) { + IotAccess iotAccess = new IotAccess(); + iotAccess.provider = IotProvider.ETCD; + iotAccess.project_id = etcdTarget; + iotAccess.options = etcdOptions; + etcdProvider = new EtcdDataProvider(iotAccess); + logger.info("EtcdDataProvider initialized for target: {}", etcdTarget); + } + // Initialize Pub/Sub Publisher ProjectTopicName topicName = ProjectTopicName.of(gcpProjectId, pubsubTopicId); publisher = Publisher.newBuilder(topicName).build(); @@ -138,7 +153,7 @@ public static void main(String[] args) { logger.info("Connected to MQTT broker."); // Set up MQTT Message Callback - setupBridge(mqttClient, publisher, mqttSubscriptionTopic); + setupBridge(mqttClient, publisher, mqttSubscriptionTopic, etcdProvider); // Keep the application running while (true) { @@ -158,6 +173,14 @@ public static void main(String[] args) { logger.error("An unexpected error occurred", e); } finally { // Shutdown + if (etcdProvider != null) { + try { + etcdProvider.shutdown(); + logger.info("EtcdDataProvider shut down."); + } catch (Exception e) { + logger.warn("Error shutting down EtcdDataProvider", e); + } + } if (mqttClient != null && mqttClient.isConnected()) { try { mqttClient.disconnect(); @@ -189,6 +212,8 @@ private static CommandLine parseArgs(String[] args) throws ParseException { options.addOption(null, "mqtt_password", true, "MQTT password for authentication."); options.addOption(null, "mqtt_client_cert_path", true, "Path to client certificate for TLS."); options.addOption(null, "mqtt_client_key_path", true, "Path to client private key for TLS."); + options.addOption(null, "etcd_target", true, "etcd endpoint URL."); + options.addOption(null, "etcd_options", true, "etcd provider options (comma-separated)."); options.addOption("h", "help", false, "Print usage info."); CommandLineParser parser = new DefaultParser(); @@ -212,7 +237,7 @@ private static CommandLine parseArgs(String[] args) throws ParseException { * @throws MqttException If an MQTT error occurs. */ public static void setupBridge(IMqttClient mqttClient, Publisher publisher, - String mqttSubscriptionTopic) throws MqttException { + String mqttSubscriptionTopic, EtcdDataProvider etcdProvider) throws MqttException { mqttClient.setCallback( new MqttCallback() { @Override @@ -245,6 +270,26 @@ public void messageArrived(String topic, MqttMessage message) { attributes.put("deviceId", deviceId); attributes.put("deviceRegistryId", registryId); + if (etcdProvider != null + && !"unknown".equals(registryId) + && !"unknown".equals(deviceId)) { + try { + String numId = etcdProvider.ref() + .registry(registryId) + .device(deviceId) + .get("num_id"); + if (numId != null) { + attributes.put("deviceNumId", numId); + logger.info("Found numId {} for device {}/{}", numId, registryId, deviceId); + } else { + logger.warn("numId not found in etcd for device {}/{}", registryId, deviceId); + } + } catch (Exception e) { + logger.warn("Error reading numId from etcd for device {}/{}", + registryId, deviceId, e); + } + } + if (topicSuffix != null && topicSuffix.startsWith("events/")) { List parts = Splitter.on('/').splitToList(topicSuffix); if (parts.size() >= 2) { diff --git a/udmis/src/test/java/com/google/bos/udmi/service/bridge/MqttToPubSubBridgeTest.java b/udmis/src/test/java/com/google/bos/udmi/service/bridge/MqttToPubSubBridgeTest.java index e950b3034a..c5e68a6c88 100644 --- a/udmis/src/test/java/com/google/bos/udmi/service/bridge/MqttToPubSubBridgeTest.java +++ b/udmis/src/test/java/com/google/bos/udmi/service/bridge/MqttToPubSubBridgeTest.java @@ -7,6 +7,8 @@ import static org.mockito.Mockito.when; import com.google.api.core.ApiFutures; +import com.google.bos.udmi.service.support.DataRef; +import com.google.bos.udmi.service.support.EtcdDataProvider; import com.google.cloud.pubsub.v1.Publisher; import com.google.pubsub.v1.PubsubMessage; import java.util.Map; @@ -31,7 +33,7 @@ void testSetupBridge() throws Exception { .thenReturn(ApiFutures.immediateFuture("msg-123")); // Call setupBridge - MqttToPubSubBridge.setupBridge(mockMqttClient, mockPublisher, testTopic); + MqttToPubSubBridge.setupBridge(mockMqttClient, mockPublisher, testTopic, null); // Verify subscription verify(mockMqttClient).subscribe(testTopic); @@ -69,7 +71,7 @@ void testSetupBridgeWithSubFolder() throws Exception { when(mockPublisher.publish(any(PubsubMessage.class))) .thenReturn(ApiFutures.immediateFuture("msg-123")); - MqttToPubSubBridge.setupBridge(mockMqttClient, mockPublisher, testTopic); + MqttToPubSubBridge.setupBridge(mockMqttClient, mockPublisher, testTopic, null); ArgumentCaptor callbackCaptor = ArgumentCaptor.forClass(MqttCallback.class); verify(mockMqttClient).setCallback(callbackCaptor.capture()); @@ -100,7 +102,7 @@ void testSetupBridgeUnrecognizedTopic() throws Exception { when(mockPublisher.publish(any(PubsubMessage.class))) .thenReturn(ApiFutures.immediateFuture("msg-123")); - MqttToPubSubBridge.setupBridge(mockMqttClient, mockPublisher, testTopic); + MqttToPubSubBridge.setupBridge(mockMqttClient, mockPublisher, testTopic, null); ArgumentCaptor callbackCaptor = ArgumentCaptor.forClass(MqttCallback.class); verify(mockMqttClient).setCallback(callbackCaptor.capture()); @@ -118,4 +120,116 @@ void testSetupBridgeUnrecognizedTopic() throws Exception { assertEquals("unknown", attributes.get("deviceId")); assertEquals("unknown", attributes.get("deviceRegistryId")); } + + @Test + void testSetupBridgeWithEtcd() throws Exception { + final IMqttClient mockMqttClient = mock(IMqttClient.class); + final Publisher mockPublisher = mock(Publisher.class); + final EtcdDataProvider mockEtcdProvider = mock(EtcdDataProvider.class); + final DataRef mockDataRef = mock(DataRef.class); + + final String testTopic = "/r/my-registry/d/my-device/events"; + final String payloadStr = "Hello World"; + final MqttMessage mqttMessage = new MqttMessage(payloadStr.getBytes()); + + when(mockPublisher.publish(any(PubsubMessage.class))) + .thenReturn(ApiFutures.immediateFuture("msg-123")); + + // Mock etcd provider to return a numId + when(mockEtcdProvider.ref()).thenReturn(mockDataRef); + when(mockDataRef.registry("my-registry")).thenReturn(mockDataRef); + when(mockDataRef.device("my-device")).thenReturn(mockDataRef); + when(mockDataRef.get("num_id")).thenReturn("123456"); + + MqttToPubSubBridge.setupBridge(mockMqttClient, mockPublisher, testTopic, mockEtcdProvider); + + ArgumentCaptor callbackCaptor = ArgumentCaptor.forClass(MqttCallback.class); + verify(mockMqttClient).setCallback(callbackCaptor.capture()); + MqttCallback callback = callbackCaptor.getValue(); + + callback.messageArrived(testTopic, mqttMessage); + + ArgumentCaptor pubsubMessageCaptor = + ArgumentCaptor.forClass(PubsubMessage.class); + verify(mockPublisher).publish(pubsubMessageCaptor.capture()); + + PubsubMessage pubsubMessage = pubsubMessageCaptor.getValue(); + Map attributes = pubsubMessage.getAttributesMap(); + assertEquals("123456", attributes.get("deviceNumId")); + } + + @Test + void testSetupBridgeWithEtcdNullResult() throws Exception { + final IMqttClient mockMqttClient = mock(IMqttClient.class); + final Publisher mockPublisher = mock(Publisher.class); + final EtcdDataProvider mockEtcdProvider = mock(EtcdDataProvider.class); + final DataRef mockDataRef = mock(DataRef.class); + + final String testTopic = "/r/my-registry/d/my-device/events"; + final String payloadStr = "Hello World"; + final MqttMessage mqttMessage = new MqttMessage(payloadStr.getBytes()); + + when(mockPublisher.publish(any(PubsubMessage.class))) + .thenReturn(ApiFutures.immediateFuture("msg-123")); + + // Mock etcd provider to return null for numId + when(mockEtcdProvider.ref()).thenReturn(mockDataRef); + when(mockDataRef.registry("my-registry")).thenReturn(mockDataRef); + when(mockDataRef.device("my-device")).thenReturn(mockDataRef); + when(mockDataRef.get("num_id")).thenReturn(null); + + MqttToPubSubBridge.setupBridge(mockMqttClient, mockPublisher, testTopic, mockEtcdProvider); + + ArgumentCaptor callbackCaptor = ArgumentCaptor.forClass(MqttCallback.class); + verify(mockMqttClient).setCallback(callbackCaptor.capture()); + MqttCallback callback = callbackCaptor.getValue(); + + callback.messageArrived(testTopic, mqttMessage); + + ArgumentCaptor pubsubMessageCaptor = + ArgumentCaptor.forClass(PubsubMessage.class); + verify(mockPublisher).publish(pubsubMessageCaptor.capture()); + + PubsubMessage pubsubMessage = pubsubMessageCaptor.getValue(); + Map attributes = pubsubMessage.getAttributesMap(); + org.junit.jupiter.api.Assertions.assertFalse(attributes.containsKey("deviceNumId")); + } + + @Test + void testSetupBridgeWithEtcdFailure() throws Exception { + final IMqttClient mockMqttClient = mock(IMqttClient.class); + final Publisher mockPublisher = mock(Publisher.class); + final EtcdDataProvider mockEtcdProvider = mock(EtcdDataProvider.class); + final DataRef mockDataRef = mock(DataRef.class); + + final String testTopic = "/r/my-registry/d/my-device/events"; + final String payloadStr = "Hello World"; + final MqttMessage mqttMessage = new MqttMessage(payloadStr.getBytes()); + + when(mockPublisher.publish(any(PubsubMessage.class))) + .thenReturn(ApiFutures.immediateFuture("msg-123")); + + // Mock etcd provider to throw exception + when(mockEtcdProvider.ref()).thenReturn(mockDataRef); + when(mockDataRef.registry("my-registry")).thenReturn(mockDataRef); + when(mockDataRef.device("my-device")).thenReturn(mockDataRef); + when(mockDataRef.get("num_id")).thenThrow(new RuntimeException("etcd error")); + + MqttToPubSubBridge.setupBridge(mockMqttClient, mockPublisher, testTopic, mockEtcdProvider); + + ArgumentCaptor callbackCaptor = ArgumentCaptor.forClass(MqttCallback.class); + verify(mockMqttClient).setCallback(callbackCaptor.capture()); + MqttCallback callback = callbackCaptor.getValue(); + + // This should not throw exception and message should still be published + callback.messageArrived(testTopic, mqttMessage); + + ArgumentCaptor pubsubMessageCaptor = + ArgumentCaptor.forClass(PubsubMessage.class); + verify(mockPublisher).publish(pubsubMessageCaptor.capture()); + + PubsubMessage pubsubMessage = pubsubMessageCaptor.getValue(); + Map attributes = pubsubMessage.getAttributesMap(); + org.junit.jupiter.api.Assertions.assertFalse(attributes.containsKey("deviceNumId")); + } }