Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions dev-support/checkstyle.xml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@
</module>

<module name="SuppressWarningsFilter"/>
<module name="SuppressionSingleFilter">
<property name="checks" value="FileLength"/>
<property name="files" value="RaftServerImpl.java"/>
</module>

<!-- Checks that a package-info.java file exists for each package. -->
<!-- See http://checkstyle.sf.net/config_javadoc.html#JavadocPackage -->
Expand Down
6 changes: 5 additions & 1 deletion ratis-client/dev-support/findbugsExcludeFile.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@
<Class name="org.apache.ratis.client.impl.DataStreamClientImpl$DataStreamOutputImpl" />
<Bug pattern="EI_EXPOSE_REP" />
</Match>
<Match>
<Class name="org.apache.ratis.client.impl.DataStreamInputImpl" />
<Bug pattern="EI_EXPOSE_REP" />
</Match>
<Match>
<Class name="org.apache.ratis.client.impl.RaftClientRpcWithProxy" />
<Bug pattern="EI_EXPOSE_REP" />
Expand All @@ -39,4 +43,4 @@
<Class name="org.apache.ratis.client.retry.ClientRetryEvent" />
<Bug pattern="EI_EXPOSE_REP2" />
</Match>
</FindBugsFilter>
</FindBugsFilter>
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@

package org.apache.ratis.client;

import org.apache.ratis.datastream.DataStreamObserver;
import org.apache.ratis.protocol.DataStreamReply;
import org.apache.ratis.protocol.DataStreamRequest;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.ReferenceCountedObject;

import java.io.Closeable;
import java.util.concurrent.CompletableFuture;
Expand All @@ -36,4 +38,17 @@ default CompletableFuture<DataStreamReply> streamAsync(DataStreamRequest request
throw new UnsupportedOperationException(getClass() + " does not support "
+ JavaUtils.getCurrentStackTraceElement().getMethodName());
}

/**
* Async call to send a request to receive a stream of intermediate replies and a final reply.
*
* @param request the request
* @param replyHandler to handle intermediate replies
* @return a future the final reply
*/
default CompletableFuture<DataStreamReply> streamAsync(DataStreamRequest request,
DataStreamObserver<ReferenceCountedObject<DataStreamReply>> replyHandler) {
throw new UnsupportedOperationException(getClass() + " does not support "
+ JavaUtils.getCurrentStackTraceElement().getMethodName());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,12 @@ default DataStreamOutput stream() {

/** Create a stream by providing a customized header message and route table. */
DataStreamOutput stream(ByteBuffer headerMessage, RoutingTable routingTable);

/** Create a stream to read data. */
default DataStreamInput streamReadOnly() {
return streamReadOnly(null);
}

/** Create a stream by providing a customized header message for reading data. */
DataStreamInput streamReadOnly(ByteBuffer headerMessage);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ratis.client.api;

import org.apache.ratis.protocol.DataStreamReply;
import org.apache.ratis.util.ReferenceCountedObject;

import java.io.Closeable;
import java.util.concurrent.CompletableFuture;

/**
* An asynchronous input stream supporting zero buffer copying.
*/
public interface DataStreamInput extends Closeable {
/**
* Read the next chunk in the stream asynchronously.
* The caller owns the returned {@link DataStreamReply} which is a {@link ReferenceCountedObject}.
* It must call {@link ReferenceCountedObject#release()} after consuming it.
*
* @return a future of the reference-counted reply.
*/
CompletableFuture<ReferenceCountedObject<DataStreamReply>> readAsync();
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.ratis.client.DataStreamClientRpc;
import org.apache.ratis.client.DataStreamOutputRpc;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.api.DataStreamInput;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.datastream.impl.DataStreamPacketByteBuffer;
import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
Expand Down Expand Up @@ -265,6 +266,12 @@ public DataStreamOutputRpc stream(ByteBuffer headerMessage, RoutingTable routing
.build());
}

@Override
public DataStreamInput streamReadOnly(ByteBuffer headerMessage) {
return new DataStreamInputImpl(dataStreamClientRpc,
newBuilder(headerMessage).setType(RaftClientRequest.readRequestType()).build());
}

private RaftClientRequest.Builder newBuilder(ByteBuffer headerMessage) {
final Message message = headerMessage == null ? null : Message.valueOf(headerMessage);
return RaftClientRequest.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ratis.client.impl;

import org.apache.ratis.client.DataStreamClientRpc;
import org.apache.ratis.client.api.DataStreamInput;
import org.apache.ratis.datastream.DataStreamObserver;
import org.apache.ratis.datastream.impl.DataStreamRequestByteBuffer;
import org.apache.ratis.io.StandardWriteOption;
import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.DataStreamReply;
import org.apache.ratis.protocol.DataStreamRequestHeader;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.ReferenceCountedObject;

import java.io.EOFException;
import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

final class DataStreamInputImpl implements DataStreamInput,
DataStreamObserver<ReferenceCountedObject<DataStreamReply>> {
private final RaftClientRequest header;
private final ClientId clientId;
private final Queue<ReferenceCountedObject<DataStreamReply>> replies = new LinkedList<>();
private final Queue<CompletableFuture<ReferenceCountedObject<DataStreamReply>>> pendingReads = new LinkedList<>();

/*
* null : the stream is open.
* AlreadyClosedException: the stream is closed.
* Other exception : the stream is failed.
*/
private Throwable readException;

DataStreamInputImpl(DataStreamClientRpc dataStreamClientRpc, RaftClientRequest request) {
this.header = request;
this.clientId = request.getClientId();
final ByteBuffer buffer = ClientProtoUtils.toRaftClientRequestProtoByteBuffer(header);
final DataStreamRequestHeader h = new DataStreamRequestHeader(clientId, Type.STREAM_HEADER,
header.getCallId(), 0, buffer.remaining(), StandardWriteOption.FLUSH, StandardWriteOption.CLOSE);
dataStreamClientRpc.streamAsync(new DataStreamRequestByteBuffer(h, buffer), this)
.whenComplete(asWhenCompleteBiConsumer());
}

@Override
public synchronized void onNext(ReferenceCountedObject<DataStreamReply> reply) {
if (readException != null) {
return;
}

reply.retain();
for (CompletableFuture<ReferenceCountedObject<DataStreamReply>> pending;
(pending = pendingReads.poll()) != null; ) {
if (pending.complete(reply)) {
return;
}
}
replies.add(reply);
}

@Override
public synchronized void onError(Throwable throwable) {
// An error case, release the replies
releaseReplies();
if (readException == null) {
readException = throwable;
failPendingReads();
}
}

@Override
public synchronized void onCompleted() {
// Not an error case, do not release the replies
if (readException == null) {
// No more onNext(), the pending reads cannot be completed.
readException = new EOFException(clientId + ": end of stream, request=" + header);
failPendingReads();
}
}

private void releaseReplies() {
for (ReferenceCountedObject<DataStreamReply> reply; (reply = replies.poll()) != null; ) {
reply.release();
}
}

private void failPendingReads() {
Objects.requireNonNull(readException, "readException == null");
for (CompletableFuture<ReferenceCountedObject<DataStreamReply>> p; (p = pendingReads.poll()) != null; ) {
p.completeExceptionally(readException);
}
}

@Override
public synchronized CompletableFuture<ReferenceCountedObject<DataStreamReply>> readAsync() {
final ReferenceCountedObject<DataStreamReply> reply = replies.poll();
if (reply != null) {
return CompletableFuture.completedFuture(reply);
}
if (readException != null) {
return JavaUtils.completeExceptionally(readException);
}
final CompletableFuture<ReferenceCountedObject<DataStreamReply>> f =
new CompletableFuture<>();
pendingReads.add(f);
return f;
}

@Override
public synchronized void close() {
// When close() is called the first time, we set
// (1) release all replies
// (2) set readException to AlreadyClosedException
// (3) complete all pendingReads, and
releaseReplies();
if (readException == null) {
readException = new AlreadyClosedException(clientId + ": stream already closed, request=" + header);
failPendingReads();
}
}
}
10 changes: 9 additions & 1 deletion ratis-common/dev-support/findbugsExcludeFile.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@
<Class name="org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer$Builder" />
<Bug pattern="EI_EXPOSE_REP2" />
</Match>
<Match>
<Class name="org.apache.ratis.datastream.impl.DataStreamReplyByteBuf$Builder" />
<Bug pattern="EI_EXPOSE_REP2" />
</Match>
<Match>
<Class name="org.apache.ratis.protocol.GroupInfoReply" />
<Bug pattern="EI_EXPOSE_REP2" />
Expand All @@ -43,6 +47,10 @@
<Class name="org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer" />
<Bug pattern="EI_EXPOSE_REP" />
</Match>
<Match>
<Class name="org.apache.ratis.datastream.impl.DataStreamReplyByteBuf" />
<Bug pattern="EI_EXPOSE_REP" />
</Match>
<Match>
<Class name="org.apache.ratis.datastream.impl.DataStreamRequestByteBuffer" />
<Bug pattern="EI_EXPOSE_REP" />
Expand Down Expand Up @@ -111,4 +119,4 @@
<Class name="org.apache.ratis.util.TimeDuration$Abbreviation" />
<Bug pattern="EI_EXPOSE_REP" />
</Match>
</FindBugsFilter>
</FindBugsFilter>
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ratis.datastream;

import java.util.function.BiConsumer;

/** An interface similar to gRPC {@link org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver}. */
public interface DataStreamObserver<V> {
void onNext(V value);

void onCompleted();

void onError(Throwable throwable);

default <T> BiConsumer<T, Throwable> asWhenCompleteBiConsumer() {
return (v, throwable) -> {
if (throwable != null) {
onError(throwable);
} else {
onCompleted();
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
* <p>
* This class is immutable.
*/
class DataStreamPacketByteBuf extends DataStreamPacketImpl {
public class DataStreamPacketByteBuf extends DataStreamPacketImpl {
private final AtomicReference<ByteBuf> buf;

DataStreamPacketByteBuf(ClientId clientId, Type type, long streamId, long streamOffset, ByteBuf buf) {
Expand All @@ -48,7 +48,8 @@ final ByteBuf getBuf() {

@Override
public final long getDataLength() {
return getBuf().readableBytes();
final ByteBuf got = buf.get();
return got != null ? got.readableBytes() : -1;
}

public final ByteBuf slice() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,4 +118,10 @@ static ByteBuffer copy(ByteBuf buf) {
buf.readBytes(bytes);
return ByteBuffer.wrap(bytes);
}

public static void release(DataStreamReply reply) {
if (reply instanceof DataStreamReplyByteBuf) {
((DataStreamReplyByteBuf) reply).release();
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about else? Is that possible other instances are passed here? If no maybe add an assertion, otherwise handle other instances?

@peterxcli peterxcli Jun 19, 2026

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are two classes implement the DataStreamReply, which is DataStreamReplyByteBuf and DataStreamReplyByteBuffer.
And only DataStreamReplyByteBuf need to release the Netty byteBuffer reference count

public final void release() {
final ByteBuf got = buf.getAndSet(null);
if (got != null) {
got.release();
}
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,4 @@ public interface DataStreamReply extends DataStreamPacket {

/** @return the commit information when the reply is created. */
Collection<CommitInfoProto> getCommitInfos();
}
}
Loading
Loading