diff --git a/dev-support/checkstyle.xml b/dev-support/checkstyle.xml
index db4954fb49..f7e168b029 100644
--- a/dev-support/checkstyle.xml
+++ b/dev-support/checkstyle.xml
@@ -60,6 +60,10 @@
+
+
+
+
diff --git a/ratis-client/dev-support/findbugsExcludeFile.xml b/ratis-client/dev-support/findbugsExcludeFile.xml
index 3a808c4486..ddeb10b40c 100644
--- a/ratis-client/dev-support/findbugsExcludeFile.xml
+++ b/ratis-client/dev-support/findbugsExcludeFile.xml
@@ -27,6 +27,10 @@
+
+
+
+
@@ -39,4 +43,4 @@
-
\ No newline at end of file
+
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClientRpc.java b/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClientRpc.java
index a9bcd9d58a..6f94709b0d 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClientRpc.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClientRpc.java
@@ -18,9 +18,11 @@
package org.apache.ratis.client;
+import org.apache.ratis.datastream.DataStreamObserver;
import org.apache.ratis.protocol.DataStreamReply;
import org.apache.ratis.protocol.DataStreamRequest;
import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.ReferenceCountedObject;
import java.io.Closeable;
import java.util.concurrent.CompletableFuture;
@@ -36,4 +38,17 @@ default CompletableFuture streamAsync(DataStreamRequest request
throw new UnsupportedOperationException(getClass() + " does not support "
+ JavaUtils.getCurrentStackTraceElement().getMethodName());
}
+
+ /**
+ * Async call to send a request to receive a stream of intermediate replies and a final reply.
+ *
+ * @param request the request
+ * @param replyHandler to handle intermediate replies
+ * @return a future the final reply
+ */
+ default CompletableFuture streamAsync(DataStreamRequest request,
+ DataStreamObserver> replyHandler) {
+ throw new UnsupportedOperationException(getClass() + " does not support "
+ + JavaUtils.getCurrentStackTraceElement().getMethodName());
+ }
}
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamApi.java b/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamApi.java
index 9e5e2438cb..7152d35ba2 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamApi.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamApi.java
@@ -50,4 +50,12 @@ default DataStreamOutput stream() {
/** Create a stream by providing a customized header message and route table. */
DataStreamOutput stream(ByteBuffer headerMessage, RoutingTable routingTable);
+
+ /** Create a stream to read data. */
+ default DataStreamInput streamReadOnly() {
+ return streamReadOnly(null);
+ }
+
+ /** Create a stream by providing a customized header message for reading data. */
+ DataStreamInput streamReadOnly(ByteBuffer headerMessage);
}
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamInput.java b/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamInput.java
new file mode 100644
index 0000000000..e50033405f
--- /dev/null
+++ b/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamInput.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.client.api;
+
+import org.apache.ratis.protocol.DataStreamReply;
+import org.apache.ratis.util.ReferenceCountedObject;
+
+import java.io.Closeable;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * An asynchronous input stream supporting zero buffer copying.
+ */
+public interface DataStreamInput extends Closeable {
+ /**
+ * Read the next chunk in the stream asynchronously.
+ * The caller owns the returned {@link DataStreamReply} which is a {@link ReferenceCountedObject}.
+ * It must call {@link ReferenceCountedObject#release()} after consuming it.
+ *
+ * @return a future of the reference-counted reply.
+ */
+ CompletableFuture> readAsync();
+}
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
index 3c055009f1..bc8334bf98 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
@@ -23,6 +23,7 @@
import org.apache.ratis.client.DataStreamClientRpc;
import org.apache.ratis.client.DataStreamOutputRpc;
import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.client.api.DataStreamInput;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.datastream.impl.DataStreamPacketByteBuffer;
import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
@@ -265,6 +266,12 @@ public DataStreamOutputRpc stream(ByteBuffer headerMessage, RoutingTable routing
.build());
}
+ @Override
+ public DataStreamInput streamReadOnly(ByteBuffer headerMessage) {
+ return new DataStreamInputImpl(dataStreamClientRpc,
+ newBuilder(headerMessage).setType(RaftClientRequest.readRequestType()).build());
+ }
+
private RaftClientRequest.Builder newBuilder(ByteBuffer headerMessage) {
final Message message = headerMessage == null ? null : Message.valueOf(headerMessage);
return RaftClientRequest.newBuilder()
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamInputImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamInputImpl.java
new file mode 100644
index 0000000000..c3e4f3420d
--- /dev/null
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamInputImpl.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.client.impl;
+
+import org.apache.ratis.client.DataStreamClientRpc;
+import org.apache.ratis.client.api.DataStreamInput;
+import org.apache.ratis.datastream.DataStreamObserver;
+import org.apache.ratis.datastream.impl.DataStreamRequestByteBuffer;
+import org.apache.ratis.io.StandardWriteOption;
+import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.DataStreamReply;
+import org.apache.ratis.protocol.DataStreamRequestHeader;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.ReferenceCountedObject;
+
+import java.io.EOFException;
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+
+final class DataStreamInputImpl implements DataStreamInput,
+ DataStreamObserver> {
+ private final RaftClientRequest header;
+ private final ClientId clientId;
+ private final Queue> replies = new LinkedList<>();
+ private final Queue>> pendingReads = new LinkedList<>();
+
+ /*
+ * null : the stream is open.
+ * AlreadyClosedException: the stream is closed.
+ * Other exception : the stream is failed.
+ */
+ private Throwable readException;
+
+ DataStreamInputImpl(DataStreamClientRpc dataStreamClientRpc, RaftClientRequest request) {
+ this.header = request;
+ this.clientId = request.getClientId();
+ final ByteBuffer buffer = ClientProtoUtils.toRaftClientRequestProtoByteBuffer(header);
+ final DataStreamRequestHeader h = new DataStreamRequestHeader(clientId, Type.STREAM_HEADER,
+ header.getCallId(), 0, buffer.remaining(), StandardWriteOption.FLUSH, StandardWriteOption.CLOSE);
+ dataStreamClientRpc.streamAsync(new DataStreamRequestByteBuffer(h, buffer), this)
+ .whenComplete(asWhenCompleteBiConsumer());
+ }
+
+ @Override
+ public synchronized void onNext(ReferenceCountedObject reply) {
+ if (readException != null) {
+ return;
+ }
+
+ reply.retain();
+ for (CompletableFuture> pending;
+ (pending = pendingReads.poll()) != null; ) {
+ if (pending.complete(reply)) {
+ return;
+ }
+ }
+ replies.add(reply);
+ }
+
+ @Override
+ public synchronized void onError(Throwable throwable) {
+ // An error case, release the replies
+ releaseReplies();
+ if (readException == null) {
+ readException = throwable;
+ failPendingReads();
+ }
+ }
+
+ @Override
+ public synchronized void onCompleted() {
+ // Not an error case, do not release the replies
+ if (readException == null) {
+ // No more onNext(), the pending reads cannot be completed.
+ readException = new EOFException(clientId + ": end of stream, request=" + header);
+ failPendingReads();
+ }
+ }
+
+ private void releaseReplies() {
+ for (ReferenceCountedObject reply; (reply = replies.poll()) != null; ) {
+ reply.release();
+ }
+ }
+
+ private void failPendingReads() {
+ Objects.requireNonNull(readException, "readException == null");
+ for (CompletableFuture> p; (p = pendingReads.poll()) != null; ) {
+ p.completeExceptionally(readException);
+ }
+ }
+
+ @Override
+ public synchronized CompletableFuture> readAsync() {
+ final ReferenceCountedObject reply = replies.poll();
+ if (reply != null) {
+ return CompletableFuture.completedFuture(reply);
+ }
+ if (readException != null) {
+ return JavaUtils.completeExceptionally(readException);
+ }
+ final CompletableFuture> f =
+ new CompletableFuture<>();
+ pendingReads.add(f);
+ return f;
+ }
+
+ @Override
+ public synchronized void close() {
+ // When close() is called the first time, we set
+ // (1) release all replies
+ // (2) set readException to AlreadyClosedException
+ // (3) complete all pendingReads, and
+ releaseReplies();
+ if (readException == null) {
+ readException = new AlreadyClosedException(clientId + ": stream already closed, request=" + header);
+ failPendingReads();
+ }
+ }
+}
diff --git a/ratis-common/dev-support/findbugsExcludeFile.xml b/ratis-common/dev-support/findbugsExcludeFile.xml
index 882f08b7fa..cec157b527 100644
--- a/ratis-common/dev-support/findbugsExcludeFile.xml
+++ b/ratis-common/dev-support/findbugsExcludeFile.xml
@@ -19,6 +19,10 @@
+
+
+
+
@@ -43,6 +47,10 @@
+
+
+
+
@@ -111,4 +119,4 @@
-
\ No newline at end of file
+
diff --git a/ratis-common/src/main/java/org/apache/ratis/datastream/DataStreamObserver.java b/ratis-common/src/main/java/org/apache/ratis/datastream/DataStreamObserver.java
new file mode 100644
index 0000000000..92f62b7518
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/datastream/DataStreamObserver.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.datastream;
+
+import java.util.function.BiConsumer;
+
+/** An interface similar to gRPC {@link org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver}. */
+public interface DataStreamObserver {
+ void onNext(V value);
+
+ void onCompleted();
+
+ void onError(Throwable throwable);
+
+ default BiConsumer asWhenCompleteBiConsumer() {
+ return (v, throwable) -> {
+ if (throwable != null) {
+ onError(throwable);
+ } else {
+ onCompleted();
+ }
+ };
+ }
+}
diff --git a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketByteBuf.java b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketByteBuf.java
index 76400ff803..e6ceeb93df 100644
--- a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketByteBuf.java
+++ b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketByteBuf.java
@@ -30,7 +30,7 @@
*
* This class is immutable.
*/
-class DataStreamPacketByteBuf extends DataStreamPacketImpl {
+public class DataStreamPacketByteBuf extends DataStreamPacketImpl {
private final AtomicReference buf;
DataStreamPacketByteBuf(ClientId clientId, Type type, long streamId, long streamOffset, ByteBuf buf) {
@@ -48,7 +48,8 @@ final ByteBuf getBuf() {
@Override
public final long getDataLength() {
- return getBuf().readableBytes();
+ final ByteBuf got = buf.get();
+ return got != null ? got.readableBytes() : -1;
}
public final ByteBuf slice() {
diff --git a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamReplyByteBuf.java b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamReplyByteBuf.java
index 2477b089b7..6ce707dc17 100644
--- a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamReplyByteBuf.java
+++ b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamReplyByteBuf.java
@@ -118,4 +118,10 @@ static ByteBuffer copy(ByteBuf buf) {
buf.readBytes(bytes);
return ByteBuffer.wrap(bytes);
}
+
+ public static void release(DataStreamReply reply) {
+ if (reply instanceof DataStreamReplyByteBuf) {
+ ((DataStreamReplyByteBuf) reply).release();
+ }
+ }
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReply.java b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReply.java
index 459aee363c..3d4f53d378 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReply.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReply.java
@@ -30,4 +30,4 @@ public interface DataStreamReply extends DataStreamPacket {
/** @return the commit information when the reply is created. */
Collection getCommitInfos();
-}
\ No newline at end of file
+}
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
index 2c5015b123..9f8cbecc8b 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
@@ -169,6 +169,22 @@ static void encodeDataStreamReply(DataStreamReplyByteBuf reply, Consumer