Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 20 additions & 4 deletions transact/src/main/java/dev/dbos/transact/conductor/Conductor.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.net.http.WebSocket.Listener;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -295,6 +296,9 @@ void dispatchLoop() {
.buildAsync(
URI.create(url),
new WebSocket.Listener() {
List<CharSequence> textBuffer = new ArrayList<>();
CompletableFuture<?> messageCompleteFuture = new CompletableFuture<>();

@Override
public void onOpen(WebSocket webSocket) {
logger.debug("Opened connection to DBOS conductor");
Expand Down Expand Up @@ -335,13 +339,25 @@ public void onError(WebSocket webSocket, Throwable error) {
@Override
public CompletionStage<?> onText(
WebSocket webSocket, CharSequence data, boolean last) {
BaseMessage request;
textBuffer.add(data);
webSocket.request(1);

if (!last) {
return messageCompleteFuture;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can just return null in onText if we are not sending a response. I know the original code returned CompletableFuture.completedStage(null), but futher research indicates it would be more straighforward to return null.

}

String message = String.join("", textBuffer);
textBuffer = new ArrayList<>();
messageCompleteFuture.complete(null);
CompletionStage<?> cf = messageCompleteFuture;
messageCompleteFuture = new CompletableFuture<>();

BaseMessage request;
try {
request = JSONUtil.fromJson(data.toString(), BaseMessage.class);
request = JSONUtil.fromJson(message, BaseMessage.class);
} catch (Exception e) {
logger.error("Conductor JSON Parsing error", e);
return CompletableFuture.completedStage(null);
return cf;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return null (see previous comment)

}

String responseText;
Expand All @@ -350,7 +366,7 @@ public CompletionStage<?> onText(
responseText = JSONUtil.toJson(response);
} catch (Exception e) {
logger.error("Conductor Response error", e);
return CompletableFuture.completedStage(null);
return cf;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return null (see previous comment)

}

return webSocket
Expand Down