From 4283b9e4435ff51f5cac60cb5293962ef411dbe8 Mon Sep 17 00:00:00 2001 From: Trevor Pering Date: Fri, 1 May 2026 06:08:47 -0700 Subject: [PATCH 1/2] Suggested fixes --- .../com/google/bos/iot/core/proxy/IotReflectorClient.java | 4 +++- .../java/com/google/bos/iot/core/proxy/MqttPublisher.java | 7 ++++--- .../java/com/google/daq/mqtt/sequencer/SequenceBase.java | 8 ++++++-- .../com/google/daq/mqtt/sequencer/SequenceRunner.java | 8 +++++--- 4 files changed, 18 insertions(+), 9 deletions(-) diff --git a/validator/src/main/java/com/google/bos/iot/core/proxy/IotReflectorClient.java b/validator/src/main/java/com/google/bos/iot/core/proxy/IotReflectorClient.java index 288b20baee..8add28cd25 100644 --- a/validator/src/main/java/com/google/bos/iot/core/proxy/IotReflectorClient.java +++ b/validator/src/main/java/com/google/bos/iot/core/proxy/IotReflectorClient.java @@ -546,7 +546,9 @@ protected void errorHandler(Throwable throwable) { receiveStats.update(); System.err.printf("Received mqtt client error: %s at %s%n", throwable.getMessage(), getTimestamp()); - close(); + if (active) { + close(); + } } @Override diff --git a/validator/src/main/java/com/google/bos/iot/core/proxy/MqttPublisher.java b/validator/src/main/java/com/google/bos/iot/core/proxy/MqttPublisher.java index f0feb89e7e..2ff53afa9a 100644 --- a/validator/src/main/java/com/google/bos/iot/core/proxy/MqttPublisher.java +++ b/validator/src/main/java/com/google/bos/iot/core/proxy/MqttPublisher.java @@ -437,6 +437,7 @@ private void sendMessage(String mqttTopic, byte[] mqttMessage) throws Exception @Override public synchronized void close() { + shutdown = true; try { LOG.debug(format("Shutting down executor %x", publisherExecutor.hashCode())); ifNotNullThen(tickler, () -> tickler.cancel(false)); @@ -591,12 +592,12 @@ private long refreshJwtFuture() { } private void disconnectMqtt() throws MqttException { - long quiesceTimeout = TOKEN_EXPIRATION.toMillis() / 8; + long quiesceTimeoutMs = 5000; try { - mqttClient.disconnect(quiesceTimeout); + mqttClient.disconnect(quiesceTimeoutMs); } catch (Exception e) { LOG.error("Graceful disconnect failed, forcing disconnect: " + e.getMessage()); - mqttClient.disconnectForcibly(quiesceTimeout); + mqttClient.disconnectForcibly(quiesceTimeoutMs); } } diff --git a/validator/src/main/java/com/google/daq/mqtt/sequencer/SequenceBase.java b/validator/src/main/java/com/google/daq/mqtt/sequencer/SequenceBase.java index ac3aa05aa5..dc66903d67 100644 --- a/validator/src/main/java/com/google/daq/mqtt/sequencer/SequenceBase.java +++ b/validator/src/main/java/com/google/daq/mqtt/sequencer/SequenceBase.java @@ -505,8 +505,12 @@ private static void updateValidationState() { validationState.timestamp = cleanDate(); JsonUtil.writeFile(validationState, getSequencerStateFile()); String validationString = stringify(validationState); - ifNotNullThen(client, - () -> client.publish(getDeviceId(), VALIDATION_STATE_TOPIC, validationString)); + try { + ifNotNullThen(client, + () -> client.publish(getDeviceId(), VALIDATION_STATE_TOPIC, validationString)); + } catch (Exception e) { + System.err.println("Error publishing validation state: " + Common.getExceptionMessage(e)); + } } static File getSequencerStateFile() { diff --git a/validator/src/main/java/com/google/daq/mqtt/sequencer/SequenceRunner.java b/validator/src/main/java/com/google/daq/mqtt/sequencer/SequenceRunner.java index 75528f2f0e..e71cc86dda 100644 --- a/validator/src/main/java/com/google/daq/mqtt/sequencer/SequenceRunner.java +++ b/validator/src/main/java/com/google/daq/mqtt/sequencer/SequenceRunner.java @@ -265,9 +265,11 @@ private void processRaw() { SubFolder kind = getTargetFacetKind(target); SequenceBase.activePrimary = getTargetPrimary(target); Set targetFacets = getTargetFacets(kind); - for (String targetFacet : targetFacets) { - SequenceBase.activeFacet = ifNotNullGet(kind, f -> new SimpleEntry<>(f, targetFacet)); - runCount += runOneTarget(request); + if (targetFacets != null) { + for (String targetFacet : targetFacets) { + SequenceBase.activeFacet = ifNotNullGet(kind, f -> new SimpleEntry<>(f, targetFacet)); + runCount += runOneTarget(request); + } } } } From c2d2a6a14e3e92937fc61f44489fca2170118f08 Mon Sep 17 00:00:00 2001 From: Trevor Pering Date: Fri, 1 May 2026 09:30:27 -0700 Subject: [PATCH 2/2] Blocking pool fixes --- .../bos/udmi/service/access/IotAccessBase.java | 13 ++++++++++++- .../bos/udmi/service/core/ControlProcessor.java | 2 +- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/udmis/src/main/java/com/google/bos/udmi/service/access/IotAccessBase.java b/udmis/src/main/java/com/google/bos/udmi/service/access/IotAccessBase.java index deb758f3ef..01f2ed6d1b 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/access/IotAccessBase.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/access/IotAccessBase.java @@ -44,6 +44,9 @@ public abstract class IotAccessBase extends ContainerBase implements IotAccessPr protected static final String EMPTY_JSON = "{}"; private static final long REGISTRY_COMMAND_BACKOFF_SEC = 60; private static final Map, Instant> BACKOFF_MAP = new ConcurrentHashMap<>(); + private static final Map, Instant> ACTIVE_CONNECTIONS = + new ConcurrentHashMap<>(); + private static final Duration ACTIVE_THRESHOLD = Duration.ofMinutes(30); private static final long CONFIG_UPDATE_BACKOFF_MS = 1000; private static final int CONFIG_UPDATE_MAX_RETRIES = 10; private static final Duration REGISTRY_REFRESH = Duration.ofMinutes(10); @@ -197,8 +200,15 @@ public void activate() { } } + /** + * Return the set of currently active connections. + */ public Set> getActiveConnections() { - return BACKOFF_MAP.keySet(); + Instant cutoff = Instant.now().minus(ACTIVE_THRESHOLD); + return ACTIVE_CONNECTIONS.entrySet().stream() + .filter(entry -> entry.getValue().isAfter(cutoff)) + .map(Entry::getKey) + .collect(Collectors.toSet()); } /** @@ -307,6 +317,7 @@ public final void sendCommand(Envelope envelope, SubFolder folder, String messag public void setProviderAffinity(String registryId, String deviceId, String providerId) { registryBackoffClear(registryId, deviceId); + ACTIVE_CONNECTIONS.put(getBackoffKey(registryId, deviceId), Instant.now()); } /** diff --git a/udmis/src/main/java/com/google/bos/udmi/service/core/ControlProcessor.java b/udmis/src/main/java/com/google/bos/udmi/service/core/ControlProcessor.java index 061c8cb75e..12048e231c 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/core/ControlProcessor.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/core/ControlProcessor.java @@ -53,7 +53,7 @@ public void udmiConfigHandler(UdmiConfig config) { message.payload = encodeBase64(stringify(config)); message.subType = SubType.CONFIG; message.subFolder = SubFolder.UDMI; - iotAccess.getActiveConnections().forEach(entry -> { + iotAccess.getActiveConnections().parallelStream().forEach(entry -> { debug("Propagate UdmiConfig to " + entry); iotAccess.sendCommand(makeEntryEnvelope(entry), SubFolder.UDMI, stringify(message)); });