diff --git a/transact/src/main/java/dev/dbos/transact/conductor/Conductor.java b/transact/src/main/java/dev/dbos/transact/conductor/Conductor.java index 6d3491d5..47fbdeaf 100644 --- a/transact/src/main/java/dev/dbos/transact/conductor/Conductor.java +++ b/transact/src/main/java/dev/dbos/transact/conductor/Conductor.java @@ -18,6 +18,7 @@ import java.net.http.WebSocket.Listener; import java.nio.ByteBuffer; import java.time.Duration; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; @@ -295,6 +296,9 @@ void dispatchLoop() { .buildAsync( URI.create(url), new WebSocket.Listener() { + List textBuffer = new ArrayList<>(); + CompletableFuture messageCompleteFuture = new CompletableFuture<>(); + @Override public void onOpen(WebSocket webSocket) { logger.debug("Opened connection to DBOS conductor"); @@ -335,13 +339,25 @@ public void onError(WebSocket webSocket, Throwable error) { @Override public CompletionStage onText( WebSocket webSocket, CharSequence data, boolean last) { - BaseMessage request; + textBuffer.add(data); webSocket.request(1); + + if (!last) { + return messageCompleteFuture; + } + + String message = String.join("", textBuffer); + textBuffer = new ArrayList<>(); + messageCompleteFuture.complete(null); + CompletionStage cf = messageCompleteFuture; + messageCompleteFuture = new CompletableFuture<>(); + + BaseMessage request; try { - request = JSONUtil.fromJson(data.toString(), BaseMessage.class); + request = JSONUtil.fromJson(message, BaseMessage.class); } catch (Exception e) { logger.error("Conductor JSON Parsing error", e); - return CompletableFuture.completedStage(null); + return cf; } String responseText; @@ -350,7 +366,7 @@ public CompletionStage onText( responseText = JSONUtil.toJson(response); } catch (Exception e) { logger.error("Conductor Response error", e); - return CompletableFuture.completedStage(null); + return cf; } return webSocket