Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright 2018-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.redis.stream;

import java.util.List;

import org.springframework.data.redis.connection.stream.Record;

/**
* Listener interface to receive delivery of {@link Record records}.
*
* @author Taewan Kim
* @param <K> Stream key and Stream field type.
* @param <V> Stream value type.
*/
@FunctionalInterface
public interface BatchStreamListener<K, V extends Record<K, ?>> extends GenericStreamListener<List<V>> {

/**
* Callback invoked on receiving {@link Record records}.
*
* @param messages never {@literal null}.
*/
@Override
void onMessage(List<V> messages);

}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
* @author Christoph Strobl
* @author Su Ko
* @author Yumin Jung
* @author Taewan Kim
* @since 2.2
*/
class DefaultStreamMessageListenerContainer<K, V extends Record<K, ?>> implements StreamMessageListenerContainer<K, V> {
Expand Down Expand Up @@ -213,8 +214,17 @@ public Subscription register(StreamReadRequest<K> streamRequest, StreamListener<
return doRegister(getReadTask(streamRequest, listener));
}

@SuppressWarnings({ "unchecked", "rawtypes" })
private StreamPollTask<K, V> getReadTask(StreamReadRequest<K> streamRequest, StreamListener<K, V> listener) {
@Override
public Subscription registerBatch(StreamReadRequest<K> streamRequest, BatchStreamListener<K, V> listener) {

Assert.notNull(streamRequest, "StreamReadRequest must not be null");
Assert.notNull(listener, "BatchStreamListener must not be null");

return doRegister(getReadTask(streamRequest, listener));
}

@SuppressWarnings({ "unchecked", "rawtypes" })
private StreamPollTask<K, V> getReadTask(StreamReadRequest<K> streamRequest, GenericStreamListener<?> listener) {

Function<ReadOffset, List<ByteRecord>> readFunction = getReadFunction(streamRequest);
Function<ByteRecord, V> deserializerToUse = getDeserializer();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright 2018-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.redis.stream;

/**
* Common listener contract for stream message callbacks.
*
* @author Taewan Kim
* @param <T> message type.
*/
@FunctionalInterface
public interface GenericStreamListener<T> {

void onMessage(T message);

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,20 @@
* Listener interface to receive delivery of {@link Record messages}.
*
* @author Mark Paluch
* @author Taewan Kim
* @param <K> Stream key and Stream field type.
* @param <V> Stream value type.
* @since 2.2
*/
@FunctionalInterface
public interface StreamListener<K, V extends Record<K, ?>> {
public interface StreamListener<K, V extends Record<K, ?>> extends GenericStreamListener<V> {

/**
* Callback invoked on receiving a {@link Record}.
*
* @param message never {@literal null}.
*/
@Override
void onMessage(V message);

}
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@
* @author Christian Rest
* @author DongCheol Kim
* @author Su Ko
* @author Taewan Kim
* @param <K> Stream key and Stream field type.
* @param <V> Stream value type.
* @since 2.2
Expand Down Expand Up @@ -228,7 +229,45 @@ default Subscription receiveAutoAck(Consumer consumer, StreamOffset<K> streamOff
return register(StreamReadRequest.builder(streamOffset).consumer(consumer).autoAcknowledge(true).build(), listener);
}

/**
/**
* Register a batch listener for a Redis Stream.
*
* @param streamOffset the stream and its offset.
* @param listener the message listener.
* @return the subscription handle.
*/
default Subscription receiveBatch(StreamOffset<K> streamOffset, BatchStreamListener<K, V> listener) {
return registerBatch(StreamReadRequest.builder(streamOffset).build(), listener);
}

/**
* Register a batch listener using a consumer group with external acknowledge.
*
* @param consumer consumer group and identity.
* @param streamOffset the stream and its offset.
* @param listener the message listener.
* @return the subscription handle.
*/
default Subscription receiveBatch(Consumer consumer, StreamOffset<K> streamOffset, BatchStreamListener<K, V> listener) {
return registerBatch(StreamReadRequest.builder(streamOffset).consumer(consumer).autoAcknowledge(false).build(),
listener);
}

/**
* Register a batch listener with auto acknowledge.
*
* @param consumer consumer group and identity.
* @param streamOffset the stream and its offset.
* @param listener the message listener.
* @return the subscription handle.
*/
default Subscription receiveBatchAutoAck(Consumer consumer, StreamOffset<K> streamOffset, BatchStreamListener<K, V> listener) {
return registerBatch(StreamReadRequest.builder(streamOffset).consumer(consumer).autoAcknowledge(true).build(),
listener);
}


/**
* Register a new subscription for a Redis Stream. If the container is already
* {@link StreamMessageListenerContainer#isRunning() running} the {@link Subscription} will be added and started
* immediately, otherwise it'll be scheduled and started once the container is actually
Expand All @@ -250,7 +289,16 @@ default Subscription receiveAutoAck(Consumer consumer, StreamOffset<K> streamOff
*/
Subscription register(StreamReadRequest<K> streamRequest, StreamListener<K, V> listener);

/**
/**
* Register a batch listener subscription.
*
* @param streamRequest must not be {@literal null}.
* @param listener must not be {@literal null}.
* @return the subscription handle.
*/
Subscription registerBatch(StreamReadRequest<K> streamRequest, BatchStreamListener<K, V> listener);

/**
* Unregister a given {@link Subscription} from the container. This prevents the {@link Subscription} to be restarted
* in a potential {@link SmartLifecycle#stop() stop}/{@link SmartLifecycle#start() start} scenario. An
* {@link Subscription#isActive() active} {@link Subscription subcription} is {@link Subscription#cancel() cancelled}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package org.springframework.data.redis.stream;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
Expand All @@ -42,11 +43,12 @@
* {@link Task} that invokes a {@link BiFunction read function} to poll on a Redis Stream.
*
* @author Mark Paluch
* @author Taewan Kim
* @see 2.2
*/
class StreamPollTask<K, V extends Record<K, ?>> implements Task {

private final StreamListener<K, V> listener;
private final GenericStreamListener<?> listener;
private final ErrorHandler errorHandler;
private final Predicate<Throwable> cancelSubscriptionOnError;
private final Function<ReadOffset, List<ByteRecord>> readFunction;
Expand All @@ -57,7 +59,7 @@ class StreamPollTask<K, V extends Record<K, ?>> implements Task {

private volatile boolean isInEventLoop = false;

StreamPollTask(StreamReadRequest<K> streamRequest, StreamListener<K, V> listener, ErrorHandler errorHandler,
StreamPollTask(StreamReadRequest<K> streamRequest, GenericStreamListener<?> listener, ErrorHandler errorHandler,
TypeDescriptor targetType, Function<ReadOffset, List<ByteRecord>> readFunction,
Function<ByteRecord, V> deserializer) {

Expand Down Expand Up @@ -147,22 +149,37 @@ private List<ByteRecord> readRecords() {
return readFunction.apply(pollState.getCurrentReadOffset());
}

private void deserializeAndEmitRecords(List<ByteRecord> records) {
private void deserializeAndEmitRecords(List<ByteRecord> records) {

for (ByteRecord raw : records) {
if (listener instanceof StreamListener<?, ?>) {
emitIndividually(records);
return;
}

try {
if (listener instanceof BatchStreamListener<?, ?>) {
emitBatch(records);
return;
}

throw new IllegalStateException("Unsupported listener type %s".formatted(listener.getClass().getName()));
}

@SuppressWarnings("unchecked")
private void emitIndividually(List<ByteRecord> records) {

StreamListener<K, V> streamListener = (StreamListener<K, V>) listener;

for (ByteRecord raw : records) {
try {
pollState.updateReadOffset(raw.getId().getValue());
V record = convertRecord(raw);
listener.onMessage(record);
streamListener.onMessage(record);

} catch (RuntimeException ex) {

if (cancelSubscriptionOnError.test(ex)) {

cancel();
errorHandler.handleError(ex);

return;
}

Expand All @@ -171,6 +188,39 @@ private void deserializeAndEmitRecords(List<ByteRecord> records) {
}
}

@SuppressWarnings("unchecked")
private void emitBatch(List<ByteRecord> records) {

if (records.isEmpty()) {
return;
}

BatchStreamListener<K, V> batchStreamListener = (BatchStreamListener<K, V>) listener;

try {
List<V> converted = new ArrayList<>(records.size());

for (ByteRecord raw : records) {
converted.add(convertRecord(raw));
}

batchStreamListener.onMessage(converted);

ByteRecord last = records.get(records.size() - 1);
pollState.updateReadOffset(last.getId().getValue());

} catch (RuntimeException ex) {

if (cancelSubscriptionOnError.test(ex)) {
cancel();
errorHandler.handleError(ex);
return;
}

errorHandler.handleError(ex);
}
}

private V convertRecord(ByteRecord record) {

try {
Expand Down
Loading
Loading