Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;

import com.google.api.core.ApiFuture;
import com.google.bos.udmi.service.support.EtcdDataProvider;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.common.base.Splitter;
import com.google.protobuf.ByteString;
Expand Down Expand Up @@ -62,6 +63,8 @@
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import udmi.schema.IotAccess;
import udmi.schema.IotAccess.IotProvider;

/**
* A bridge that subscribes to an MQTT topic and publishes messages to a Google Cloud Pub/Sub topic.
Expand Down Expand Up @@ -100,6 +103,8 @@ public static void main(String[] args) {
String mqttPassword = commandLine.getOptionValue("mqtt_password");
String mqttClientCertPath = commandLine.getOptionValue("mqtt_client_cert_path");
String mqttClientKeyPath = commandLine.getOptionValue("mqtt_client_key_path");
String etcdTarget = commandLine.getOptionValue("etcd_target");
String etcdOptions = commandLine.getOptionValue("etcd_options");

if (gcpProjectId == null || pubsubTopicId == null) {
logger.error("gcp_project_id and pubsub_topic_id are required.");
Expand All @@ -108,8 +113,18 @@ public static void main(String[] args) {

Publisher publisher = null;
IMqttClient mqttClient = null;
EtcdDataProvider etcdProvider = null;

try {
if (etcdTarget != null) {
IotAccess iotAccess = new IotAccess();
iotAccess.provider = IotProvider.ETCD;
iotAccess.project_id = etcdTarget;
iotAccess.options = etcdOptions;
etcdProvider = new EtcdDataProvider(iotAccess);
logger.info("EtcdDataProvider initialized for target: {}", etcdTarget);
}

// Initialize Pub/Sub Publisher
ProjectTopicName topicName = ProjectTopicName.of(gcpProjectId, pubsubTopicId);
publisher = Publisher.newBuilder(topicName).build();
Expand Down Expand Up @@ -138,7 +153,7 @@ public static void main(String[] args) {
logger.info("Connected to MQTT broker.");

// Set up MQTT Message Callback
setupBridge(mqttClient, publisher, mqttSubscriptionTopic);
setupBridge(mqttClient, publisher, mqttSubscriptionTopic, etcdProvider);

// Keep the application running
while (true) {
Expand All @@ -158,6 +173,14 @@ public static void main(String[] args) {
logger.error("An unexpected error occurred", e);
} finally {
// Shutdown
if (etcdProvider != null) {
try {
etcdProvider.shutdown();
logger.info("EtcdDataProvider shut down.");
} catch (Exception e) {
logger.warn("Error shutting down EtcdDataProvider", e);
}
}
if (mqttClient != null && mqttClient.isConnected()) {
try {
mqttClient.disconnect();
Expand Down Expand Up @@ -189,6 +212,8 @@ private static CommandLine parseArgs(String[] args) throws ParseException {
options.addOption(null, "mqtt_password", true, "MQTT password for authentication.");
options.addOption(null, "mqtt_client_cert_path", true, "Path to client certificate for TLS.");
options.addOption(null, "mqtt_client_key_path", true, "Path to client private key for TLS.");
options.addOption(null, "etcd_target", true, "etcd endpoint URL.");
options.addOption(null, "etcd_options", true, "etcd provider options (comma-separated).");
options.addOption("h", "help", false, "Print usage info.");

CommandLineParser parser = new DefaultParser();
Expand All @@ -212,7 +237,7 @@ private static CommandLine parseArgs(String[] args) throws ParseException {
* @throws MqttException If an MQTT error occurs.
*/
public static void setupBridge(IMqttClient mqttClient, Publisher publisher,
String mqttSubscriptionTopic) throws MqttException {
String mqttSubscriptionTopic, EtcdDataProvider etcdProvider) throws MqttException {
mqttClient.setCallback(
new MqttCallback() {
@Override
Expand Down Expand Up @@ -245,6 +270,26 @@ public void messageArrived(String topic, MqttMessage message) {
attributes.put("deviceId", deviceId);
attributes.put("deviceRegistryId", registryId);

if (etcdProvider != null
&& !"unknown".equals(registryId)
&& !"unknown".equals(deviceId)) {
try {
String numId = etcdProvider.ref()
.registry(registryId)
.device(deviceId)
.get("num_id");
if (numId != null) {
attributes.put("deviceNumId", numId);
logger.info("Found numId {} for device {}/{}", numId, registryId, deviceId);
} else {
logger.warn("numId not found in etcd for device {}/{}", registryId, deviceId);
}
} catch (Exception e) {
logger.warn("Error reading numId from etcd for device {}/{}",
registryId, deviceId, e);
}
}

if (topicSuffix != null && topicSuffix.startsWith("events/")) {
List<String> parts = Splitter.on('/').splitToList(topicSuffix);
if (parts.size() >= 2) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import static org.mockito.Mockito.when;

import com.google.api.core.ApiFutures;
import com.google.bos.udmi.service.support.DataRef;
import com.google.bos.udmi.service.support.EtcdDataProvider;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.pubsub.v1.PubsubMessage;
import java.util.Map;
Expand All @@ -31,7 +33,7 @@ void testSetupBridge() throws Exception {
.thenReturn(ApiFutures.immediateFuture("msg-123"));

// Call setupBridge
MqttToPubSubBridge.setupBridge(mockMqttClient, mockPublisher, testTopic);
MqttToPubSubBridge.setupBridge(mockMqttClient, mockPublisher, testTopic, null);

// Verify subscription
verify(mockMqttClient).subscribe(testTopic);
Expand Down Expand Up @@ -69,7 +71,7 @@ void testSetupBridgeWithSubFolder() throws Exception {
when(mockPublisher.publish(any(PubsubMessage.class)))
.thenReturn(ApiFutures.immediateFuture("msg-123"));

MqttToPubSubBridge.setupBridge(mockMqttClient, mockPublisher, testTopic);
MqttToPubSubBridge.setupBridge(mockMqttClient, mockPublisher, testTopic, null);

ArgumentCaptor<MqttCallback> callbackCaptor = ArgumentCaptor.forClass(MqttCallback.class);
verify(mockMqttClient).setCallback(callbackCaptor.capture());
Expand Down Expand Up @@ -100,7 +102,7 @@ void testSetupBridgeUnrecognizedTopic() throws Exception {
when(mockPublisher.publish(any(PubsubMessage.class)))
.thenReturn(ApiFutures.immediateFuture("msg-123"));

MqttToPubSubBridge.setupBridge(mockMqttClient, mockPublisher, testTopic);
MqttToPubSubBridge.setupBridge(mockMqttClient, mockPublisher, testTopic, null);

ArgumentCaptor<MqttCallback> callbackCaptor = ArgumentCaptor.forClass(MqttCallback.class);
verify(mockMqttClient).setCallback(callbackCaptor.capture());
Expand All @@ -118,4 +120,116 @@ void testSetupBridgeUnrecognizedTopic() throws Exception {
assertEquals("unknown", attributes.get("deviceId"));
assertEquals("unknown", attributes.get("deviceRegistryId"));
}

@Test
void testSetupBridgeWithEtcd() throws Exception {
final IMqttClient mockMqttClient = mock(IMqttClient.class);
final Publisher mockPublisher = mock(Publisher.class);
final EtcdDataProvider mockEtcdProvider = mock(EtcdDataProvider.class);
final DataRef mockDataRef = mock(DataRef.class);

final String testTopic = "/r/my-registry/d/my-device/events";
final String payloadStr = "Hello World";
final MqttMessage mqttMessage = new MqttMessage(payloadStr.getBytes());

when(mockPublisher.publish(any(PubsubMessage.class)))
.thenReturn(ApiFutures.immediateFuture("msg-123"));

// Mock etcd provider to return a numId
when(mockEtcdProvider.ref()).thenReturn(mockDataRef);
when(mockDataRef.registry("my-registry")).thenReturn(mockDataRef);
when(mockDataRef.device("my-device")).thenReturn(mockDataRef);
when(mockDataRef.get("num_id")).thenReturn("123456");

MqttToPubSubBridge.setupBridge(mockMqttClient, mockPublisher, testTopic, mockEtcdProvider);

ArgumentCaptor<MqttCallback> callbackCaptor = ArgumentCaptor.forClass(MqttCallback.class);
verify(mockMqttClient).setCallback(callbackCaptor.capture());
MqttCallback callback = callbackCaptor.getValue();

callback.messageArrived(testTopic, mqttMessage);

ArgumentCaptor<PubsubMessage> pubsubMessageCaptor =
ArgumentCaptor.forClass(PubsubMessage.class);
verify(mockPublisher).publish(pubsubMessageCaptor.capture());

PubsubMessage pubsubMessage = pubsubMessageCaptor.getValue();
Map<String, String> attributes = pubsubMessage.getAttributesMap();
assertEquals("123456", attributes.get("deviceNumId"));
}

@Test
void testSetupBridgeWithEtcdNullResult() throws Exception {
final IMqttClient mockMqttClient = mock(IMqttClient.class);
final Publisher mockPublisher = mock(Publisher.class);
final EtcdDataProvider mockEtcdProvider = mock(EtcdDataProvider.class);
final DataRef mockDataRef = mock(DataRef.class);

final String testTopic = "/r/my-registry/d/my-device/events";
final String payloadStr = "Hello World";
final MqttMessage mqttMessage = new MqttMessage(payloadStr.getBytes());

when(mockPublisher.publish(any(PubsubMessage.class)))
.thenReturn(ApiFutures.immediateFuture("msg-123"));

// Mock etcd provider to return null for numId
when(mockEtcdProvider.ref()).thenReturn(mockDataRef);
when(mockDataRef.registry("my-registry")).thenReturn(mockDataRef);
when(mockDataRef.device("my-device")).thenReturn(mockDataRef);
when(mockDataRef.get("num_id")).thenReturn(null);

MqttToPubSubBridge.setupBridge(mockMqttClient, mockPublisher, testTopic, mockEtcdProvider);

ArgumentCaptor<MqttCallback> callbackCaptor = ArgumentCaptor.forClass(MqttCallback.class);
verify(mockMqttClient).setCallback(callbackCaptor.capture());
MqttCallback callback = callbackCaptor.getValue();

callback.messageArrived(testTopic, mqttMessage);

ArgumentCaptor<PubsubMessage> pubsubMessageCaptor =
ArgumentCaptor.forClass(PubsubMessage.class);
verify(mockPublisher).publish(pubsubMessageCaptor.capture());

PubsubMessage pubsubMessage = pubsubMessageCaptor.getValue();
Map<String, String> attributes = pubsubMessage.getAttributesMap();
org.junit.jupiter.api.Assertions.assertFalse(attributes.containsKey("deviceNumId"));
}

@Test
void testSetupBridgeWithEtcdFailure() throws Exception {
final IMqttClient mockMqttClient = mock(IMqttClient.class);
final Publisher mockPublisher = mock(Publisher.class);
final EtcdDataProvider mockEtcdProvider = mock(EtcdDataProvider.class);
final DataRef mockDataRef = mock(DataRef.class);

final String testTopic = "/r/my-registry/d/my-device/events";
final String payloadStr = "Hello World";
final MqttMessage mqttMessage = new MqttMessage(payloadStr.getBytes());

when(mockPublisher.publish(any(PubsubMessage.class)))
.thenReturn(ApiFutures.immediateFuture("msg-123"));

// Mock etcd provider to throw exception
when(mockEtcdProvider.ref()).thenReturn(mockDataRef);
when(mockDataRef.registry("my-registry")).thenReturn(mockDataRef);
when(mockDataRef.device("my-device")).thenReturn(mockDataRef);
when(mockDataRef.get("num_id")).thenThrow(new RuntimeException("etcd error"));

MqttToPubSubBridge.setupBridge(mockMqttClient, mockPublisher, testTopic, mockEtcdProvider);

ArgumentCaptor<MqttCallback> callbackCaptor = ArgumentCaptor.forClass(MqttCallback.class);
verify(mockMqttClient).setCallback(callbackCaptor.capture());
MqttCallback callback = callbackCaptor.getValue();

// This should not throw exception and message should still be published
callback.messageArrived(testTopic, mqttMessage);

ArgumentCaptor<PubsubMessage> pubsubMessageCaptor =
ArgumentCaptor.forClass(PubsubMessage.class);
verify(mockPublisher).publish(pubsubMessageCaptor.capture());

PubsubMessage pubsubMessage = pubsubMessageCaptor.getValue();
Map<String, String> attributes = pubsubMessage.getAttributesMap();
org.junit.jupiter.api.Assertions.assertFalse(attributes.containsKey("deviceNumId"));
}
}