Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ public abstract class IotAccessBase extends ContainerBase implements IotAccessPr
protected static final String EMPTY_JSON = "{}";
private static final long REGISTRY_COMMAND_BACKOFF_SEC = 60;
private static final Map<Entry<String, String>, Instant> BACKOFF_MAP = new ConcurrentHashMap<>();
private static final Map<Entry<String, String>, Instant> ACTIVE_CONNECTIONS =
new ConcurrentHashMap<>();
private static final Duration ACTIVE_THRESHOLD = Duration.ofMinutes(30);
private static final long CONFIG_UPDATE_BACKOFF_MS = 1000;
private static final int CONFIG_UPDATE_MAX_RETRIES = 10;
private static final Duration REGISTRY_REFRESH = Duration.ofMinutes(10);
Expand Down Expand Up @@ -197,8 +200,15 @@ public void activate() {
}
}

/**
* Return the set of currently active connections.
*/
public Set<Entry<String, String>> getActiveConnections() {
return BACKOFF_MAP.keySet();
Instant cutoff = Instant.now().minus(ACTIVE_THRESHOLD);
return ACTIVE_CONNECTIONS.entrySet().stream()
.filter(entry -> entry.getValue().isAfter(cutoff))
.map(Entry::getKey)
.collect(Collectors.toSet());
}

/**
Expand Down Expand Up @@ -307,6 +317,7 @@ public final void sendCommand(Envelope envelope, SubFolder folder, String messag

public void setProviderAffinity(String registryId, String deviceId, String providerId) {
registryBackoffClear(registryId, deviceId);
ACTIVE_CONNECTIONS.put(getBackoffKey(registryId, deviceId), Instant.now());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public void udmiConfigHandler(UdmiConfig config) {
message.payload = encodeBase64(stringify(config));
message.subType = SubType.CONFIG;
message.subFolder = SubFolder.UDMI;
iotAccess.getActiveConnections().forEach(entry -> {
iotAccess.getActiveConnections().parallelStream().forEach(entry -> {
debug("Propagate UdmiConfig to " + entry);
iotAccess.sendCommand(makeEntryEnvelope(entry), SubFolder.UDMI, stringify(message));
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -546,7 +546,9 @@ protected void errorHandler(Throwable throwable) {
receiveStats.update();
System.err.printf("Received mqtt client error: %s at %s%n",
throwable.getMessage(), getTimestamp());
close();
if (active) {
close();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,7 @@ private void sendMessage(String mqttTopic, byte[] mqttMessage) throws Exception

@Override
public synchronized void close() {
shutdown = true;
try {
LOG.debug(format("Shutting down executor %x", publisherExecutor.hashCode()));
ifNotNullThen(tickler, () -> tickler.cancel(false));
Expand Down Expand Up @@ -591,12 +592,12 @@ private long refreshJwtFuture() {
}

private void disconnectMqtt() throws MqttException {
long quiesceTimeout = TOKEN_EXPIRATION.toMillis() / 8;
long quiesceTimeoutMs = 5000;
try {
mqttClient.disconnect(quiesceTimeout);
mqttClient.disconnect(quiesceTimeoutMs);
} catch (Exception e) {
LOG.error("Graceful disconnect failed, forcing disconnect: " + e.getMessage());
mqttClient.disconnectForcibly(quiesceTimeout);
mqttClient.disconnectForcibly(quiesceTimeoutMs);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -505,8 +505,12 @@ private static void updateValidationState() {
validationState.timestamp = cleanDate();
JsonUtil.writeFile(validationState, getSequencerStateFile());
String validationString = stringify(validationState);
ifNotNullThen(client,
() -> client.publish(getDeviceId(), VALIDATION_STATE_TOPIC, validationString));
try {
ifNotNullThen(client,
() -> client.publish(getDeviceId(), VALIDATION_STATE_TOPIC, validationString));
} catch (Exception e) {
System.err.println("Error publishing validation state: " + Common.getExceptionMessage(e));
}
}

static File getSequencerStateFile() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,9 +265,11 @@ private void processRaw() {
SubFolder kind = getTargetFacetKind(target);
SequenceBase.activePrimary = getTargetPrimary(target);
Set<String> targetFacets = getTargetFacets(kind);
for (String targetFacet : targetFacets) {
SequenceBase.activeFacet = ifNotNullGet(kind, f -> new SimpleEntry<>(f, targetFacet));
runCount += runOneTarget(request);
if (targetFacets != null) {
for (String targetFacet : targetFacets) {
SequenceBase.activeFacet = ifNotNullGet(kind, f -> new SimpleEntry<>(f, targetFacet));
runCount += runOneTarget(request);
}
}
}
}
Expand Down
Loading