diff --git a/services/sensorhub-service-mqtt-hivemq/src/main/java/org/sensorhub/impl/service/hivemq/WebSocketProxy.java b/services/sensorhub-service-mqtt-hivemq/src/main/java/org/sensorhub/impl/service/hivemq/WebSocketProxy.java index 62f71be36..71dc933b1 100644 --- a/services/sensorhub-service-mqtt-hivemq/src/main/java/org/sensorhub/impl/service/hivemq/WebSocketProxy.java +++ b/services/sensorhub-service-mqtt-hivemq/src/main/java/org/sensorhub/impl/service/hivemq/WebSocketProxy.java @@ -21,6 +21,11 @@ import java.nio.channels.AsynchronousCloseException; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; +import java.util.Arrays; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; + import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.api.StatusCode; import org.eclipse.jetty.websocket.api.WebSocketListener; @@ -45,6 +50,9 @@ public class WebSocketProxy implements WebSocketListener ByteBuffer socketReadBuffer; CompletionHandler socketReadHandler; CompletionHandler socketWriteHandler; + + private final Queue writeQueue = new ConcurrentLinkedQueue<>(); + private final AtomicBoolean writeInProgress = new AtomicBoolean(false); WebSocketProxy(InetSocketAddress mqttHost, Logger logger) @@ -143,13 +151,63 @@ public void onWebSocketClose(int statusCode, String reason) @Override - public void onWebSocketBinary(byte[] payload, int offset, int len) - { - if (mqttSocket != null && mqttSocket.isOpen()) - mqttSocket.write(ByteBuffer.wrap(payload)); + public void onWebSocketBinary(byte[] payload, int offset, int len) { + if (mqttSocket == null || !mqttSocket.isOpen()) + return; + + ByteBuffer buf = ByteBuffer.wrap(Arrays.copyOfRange(payload, offset, offset + len)); + + writeQueue.add(buf); + tryWrite(); + } + + private void tryWrite() { + if (!writeInProgress.compareAndSet(false, true)) + return; + + ByteBuffer first = writeQueue.poll(); + if (first == null) { + writeInProgress.set(false); + return; + } + + // Wrapper so the handler can mutate the current buffer + class State { + ByteBuffer current = first; + } + + State state = new State(); + + mqttSocket.write(state.current, null, new CompletionHandler() { + @Override + public void completed(Integer result, Void attachment) { + // Write the remaining bytes + if (state.current.hasRemaining()) { + mqttSocket.write(state.current, null, this); + return; + } + + // Get next buffer if finished + ByteBuffer next = writeQueue.poll(); + if (next == null) { + writeInProgress.set(false); + return; + } + + state.current = next; + mqttSocket.write(state.current, null, this); + } + + @Override + public void failed(Throwable exc, Void attachment) { + writeInProgress.set(false); + log.error("Error writing to MQTT TCP socket", exc); + } + }); } + @Override public void onWebSocketText(String message) {