From 97eeb5a948c06f745be18b9f02e9577025030895 Mon Sep 17 00:00:00 2001 From: wwan13 Date: Thu, 19 Mar 2026 14:28:57 +0900 Subject: [PATCH] Add batch listener support for Redis StreamMessageListenerContainer. Signed-off-by: wwan13 --- .../redis/stream/BatchStreamListener.java | 40 +++++ ...DefaultStreamMessageListenerContainer.java | 14 +- .../redis/stream/GenericStreamListener.java | 29 +++ .../data/redis/stream/StreamListener.java | 4 +- .../StreamMessageListenerContainer.java | 52 +++++- .../data/redis/stream/StreamPollTask.java | 66 ++++++- ...sageListenerContainerIntegrationTests.java | 166 +++++++++++++++++- 7 files changed, 356 insertions(+), 15 deletions(-) create mode 100644 src/main/java/org/springframework/data/redis/stream/BatchStreamListener.java create mode 100644 src/main/java/org/springframework/data/redis/stream/GenericStreamListener.java diff --git a/src/main/java/org/springframework/data/redis/stream/BatchStreamListener.java b/src/main/java/org/springframework/data/redis/stream/BatchStreamListener.java new file mode 100644 index 0000000000..e15edcd5bf --- /dev/null +++ b/src/main/java/org/springframework/data/redis/stream/BatchStreamListener.java @@ -0,0 +1,40 @@ +/* + * Copyright 2018-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.redis.stream; + +import java.util.List; + +import org.springframework.data.redis.connection.stream.Record; + +/** + * Listener interface to receive delivery of {@link Record records}. + * + * @author Taewan Kim + * @param Stream key and Stream field type. + * @param Stream value type. + */ +@FunctionalInterface +public interface BatchStreamListener> extends GenericStreamListener> { + + /** + * Callback invoked on receiving {@link Record records}. + * + * @param messages never {@literal null}. + */ + @Override + void onMessage(List messages); + +} diff --git a/src/main/java/org/springframework/data/redis/stream/DefaultStreamMessageListenerContainer.java b/src/main/java/org/springframework/data/redis/stream/DefaultStreamMessageListenerContainer.java index 7fa607caa1..353892fc01 100644 --- a/src/main/java/org/springframework/data/redis/stream/DefaultStreamMessageListenerContainer.java +++ b/src/main/java/org/springframework/data/redis/stream/DefaultStreamMessageListenerContainer.java @@ -52,6 +52,7 @@ * @author Christoph Strobl * @author Su Ko * @author Yumin Jung + * @author Taewan Kim * @since 2.2 */ class DefaultStreamMessageListenerContainer> implements StreamMessageListenerContainer { @@ -213,8 +214,17 @@ public Subscription register(StreamReadRequest streamRequest, StreamListener< return doRegister(getReadTask(streamRequest, listener)); } - @SuppressWarnings({ "unchecked", "rawtypes" }) - private StreamPollTask getReadTask(StreamReadRequest streamRequest, StreamListener listener) { + @Override + public Subscription registerBatch(StreamReadRequest streamRequest, BatchStreamListener listener) { + + Assert.notNull(streamRequest, "StreamReadRequest must not be null"); + Assert.notNull(listener, "BatchStreamListener must not be null"); + + return doRegister(getReadTask(streamRequest, listener)); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + private StreamPollTask getReadTask(StreamReadRequest streamRequest, GenericStreamListener listener) { Function> readFunction = getReadFunction(streamRequest); Function deserializerToUse = getDeserializer(); diff --git a/src/main/java/org/springframework/data/redis/stream/GenericStreamListener.java b/src/main/java/org/springframework/data/redis/stream/GenericStreamListener.java new file mode 100644 index 0000000000..c9926bed5a --- /dev/null +++ b/src/main/java/org/springframework/data/redis/stream/GenericStreamListener.java @@ -0,0 +1,29 @@ +/* + * Copyright 2018-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.redis.stream; + +/** + * Common listener contract for stream message callbacks. + * + * @author Taewan Kim + * @param message type. + */ +@FunctionalInterface +public interface GenericStreamListener { + + void onMessage(T message); + +} diff --git a/src/main/java/org/springframework/data/redis/stream/StreamListener.java b/src/main/java/org/springframework/data/redis/stream/StreamListener.java index e0670f18a8..f5ba9c61f4 100644 --- a/src/main/java/org/springframework/data/redis/stream/StreamListener.java +++ b/src/main/java/org/springframework/data/redis/stream/StreamListener.java @@ -21,18 +21,20 @@ * Listener interface to receive delivery of {@link Record messages}. * * @author Mark Paluch + * @author Taewan Kim * @param Stream key and Stream field type. * @param Stream value type. * @since 2.2 */ @FunctionalInterface -public interface StreamListener> { +public interface StreamListener> extends GenericStreamListener { /** * Callback invoked on receiving a {@link Record}. * * @param message never {@literal null}. */ + @Override void onMessage(V message); } diff --git a/src/main/java/org/springframework/data/redis/stream/StreamMessageListenerContainer.java b/src/main/java/org/springframework/data/redis/stream/StreamMessageListenerContainer.java index bce323a88b..12f8109828 100644 --- a/src/main/java/org/springframework/data/redis/stream/StreamMessageListenerContainer.java +++ b/src/main/java/org/springframework/data/redis/stream/StreamMessageListenerContainer.java @@ -109,6 +109,7 @@ * @author Christian Rest * @author DongCheol Kim * @author Su Ko + * @author Taewan Kim * @param Stream key and Stream field type. * @param Stream value type. * @since 2.2 @@ -228,7 +229,45 @@ default Subscription receiveAutoAck(Consumer consumer, StreamOffset streamOff return register(StreamReadRequest.builder(streamOffset).consumer(consumer).autoAcknowledge(true).build(), listener); } - /** + /** + * Register a batch listener for a Redis Stream. + * + * @param streamOffset the stream and its offset. + * @param listener the message listener. + * @return the subscription handle. + */ + default Subscription receiveBatch(StreamOffset streamOffset, BatchStreamListener listener) { + return registerBatch(StreamReadRequest.builder(streamOffset).build(), listener); + } + + /** + * Register a batch listener using a consumer group with external acknowledge. + * + * @param consumer consumer group and identity. + * @param streamOffset the stream and its offset. + * @param listener the message listener. + * @return the subscription handle. + */ + default Subscription receiveBatch(Consumer consumer, StreamOffset streamOffset, BatchStreamListener listener) { + return registerBatch(StreamReadRequest.builder(streamOffset).consumer(consumer).autoAcknowledge(false).build(), + listener); + } + + /** + * Register a batch listener with auto acknowledge. + * + * @param consumer consumer group and identity. + * @param streamOffset the stream and its offset. + * @param listener the message listener. + * @return the subscription handle. + */ + default Subscription receiveBatchAutoAck(Consumer consumer, StreamOffset streamOffset, BatchStreamListener listener) { + return registerBatch(StreamReadRequest.builder(streamOffset).consumer(consumer).autoAcknowledge(true).build(), + listener); + } + + + /** * Register a new subscription for a Redis Stream. If the container is already * {@link StreamMessageListenerContainer#isRunning() running} the {@link Subscription} will be added and started * immediately, otherwise it'll be scheduled and started once the container is actually @@ -250,7 +289,16 @@ default Subscription receiveAutoAck(Consumer consumer, StreamOffset streamOff */ Subscription register(StreamReadRequest streamRequest, StreamListener listener); - /** + /** + * Register a batch listener subscription. + * + * @param streamRequest must not be {@literal null}. + * @param listener must not be {@literal null}. + * @return the subscription handle. + */ + Subscription registerBatch(StreamReadRequest streamRequest, BatchStreamListener listener); + + /** * Unregister a given {@link Subscription} from the container. This prevents the {@link Subscription} to be restarted * in a potential {@link SmartLifecycle#stop() stop}/{@link SmartLifecycle#start() start} scenario. An * {@link Subscription#isActive() active} {@link Subscription subcription} is {@link Subscription#cancel() cancelled} diff --git a/src/main/java/org/springframework/data/redis/stream/StreamPollTask.java b/src/main/java/org/springframework/data/redis/stream/StreamPollTask.java index 896eea6ca6..59c8edfa4c 100644 --- a/src/main/java/org/springframework/data/redis/stream/StreamPollTask.java +++ b/src/main/java/org/springframework/data/redis/stream/StreamPollTask.java @@ -16,6 +16,7 @@ package org.springframework.data.redis.stream; import java.time.Duration; +import java.util.ArrayList; import java.util.List; import java.util.Optional; import java.util.concurrent.CountDownLatch; @@ -42,11 +43,12 @@ * {@link Task} that invokes a {@link BiFunction read function} to poll on a Redis Stream. * * @author Mark Paluch + * @author Taewan Kim * @see 2.2 */ class StreamPollTask> implements Task { - private final StreamListener listener; + private final GenericStreamListener listener; private final ErrorHandler errorHandler; private final Predicate cancelSubscriptionOnError; private final Function> readFunction; @@ -57,7 +59,7 @@ class StreamPollTask> implements Task { private volatile boolean isInEventLoop = false; - StreamPollTask(StreamReadRequest streamRequest, StreamListener listener, ErrorHandler errorHandler, + StreamPollTask(StreamReadRequest streamRequest, GenericStreamListener listener, ErrorHandler errorHandler, TypeDescriptor targetType, Function> readFunction, Function deserializer) { @@ -147,22 +149,37 @@ private List readRecords() { return readFunction.apply(pollState.getCurrentReadOffset()); } - private void deserializeAndEmitRecords(List records) { + private void deserializeAndEmitRecords(List records) { - for (ByteRecord raw : records) { + if (listener instanceof StreamListener) { + emitIndividually(records); + return; + } - try { + if (listener instanceof BatchStreamListener) { + emitBatch(records); + return; + } + + throw new IllegalStateException("Unsupported listener type %s".formatted(listener.getClass().getName())); + } + + @SuppressWarnings("unchecked") + private void emitIndividually(List records) { + StreamListener streamListener = (StreamListener) listener; + + for (ByteRecord raw : records) { + try { pollState.updateReadOffset(raw.getId().getValue()); V record = convertRecord(raw); - listener.onMessage(record); + streamListener.onMessage(record); + } catch (RuntimeException ex) { if (cancelSubscriptionOnError.test(ex)) { - cancel(); errorHandler.handleError(ex); - return; } @@ -171,6 +188,39 @@ private void deserializeAndEmitRecords(List records) { } } + @SuppressWarnings("unchecked") + private void emitBatch(List records) { + + if (records.isEmpty()) { + return; + } + + BatchStreamListener batchStreamListener = (BatchStreamListener) listener; + + try { + List converted = new ArrayList<>(records.size()); + + for (ByteRecord raw : records) { + converted.add(convertRecord(raw)); + } + + batchStreamListener.onMessage(converted); + + ByteRecord last = records.get(records.size() - 1); + pollState.updateReadOffset(last.getId().getValue()); + + } catch (RuntimeException ex) { + + if (cancelSubscriptionOnError.test(ex)) { + cancel(); + errorHandler.handleError(ex); + return; + } + + errorHandler.handleError(ex); + } + } + private V convertRecord(ByteRecord record) { try { diff --git a/src/test/java/org/springframework/data/redis/stream/AbstractStreamMessageListenerContainerIntegrationTests.java b/src/test/java/org/springframework/data/redis/stream/AbstractStreamMessageListenerContainerIntegrationTests.java index b591112ae1..0f8a2e1e60 100644 --- a/src/test/java/org/springframework/data/redis/stream/AbstractStreamMessageListenerContainerIntegrationTests.java +++ b/src/test/java/org/springframework/data/redis/stream/AbstractStreamMessageListenerContainerIntegrationTests.java @@ -21,12 +21,15 @@ import io.lettuce.core.output.NestedMultiOutput; import java.time.Duration; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.awaitility.Awaitility; import org.junit.jupiter.api.BeforeEach; @@ -57,6 +60,7 @@ * @author Mark Paluch * @author Christoph Strobl * @author John Blum + * @author Taewan Kim */ @EnabledOnCommand("XREAD") @TestInstance(TestInstance.Lifecycle.PER_CLASS) @@ -108,7 +112,36 @@ void shouldReceiveMapMessages() throws InterruptedException { assertThat(subscription.isActive()).isFalse(); } - @Test // DATAREDIS-864 + @Test // GH-3190 + void shouldReceiveBatchMapMessages() throws InterruptedException { + + StreamMessageListenerContainer> container = StreamMessageListenerContainer + .create(connectionFactory, containerOptions); + BlockingQueue>> batches = new LinkedBlockingQueue<>(); + + container.start(); + Subscription subscription = container.receiveBatch(StreamOffset.create("my-stream", ReadOffset.from("0-0")), + batches::add); + + subscription.await(DEFAULT_TIMEOUT); + + redisTemplate.opsForStream().add("my-stream", Collections.singletonMap("key", "value1")); + redisTemplate.opsForStream().add("my-stream", Collections.singletonMap("key", "value2")); + redisTemplate.opsForStream().add("my-stream", Collections.singletonMap("key", "value3")); + + Awaitility.await().atMost(DEFAULT_TIMEOUT).untilAsserted(() -> { + List>> drained = new ArrayList<>(); + batches.drainTo(drained); + + assertThat(drained).isNotEmpty(); + assertThat(drained.stream().flatMap(List::stream).map(it -> it.getValue().get("key"))) + .contains("value1", "value2", "value3"); + }); + + cancelAwait(subscription); + } + + @Test // DATAREDIS-864 void shouldReceiveSimpleObjectHashRecords() throws InterruptedException { StreamMessageListenerContainerOptions> containerOptions = StreamMessageListenerContainerOptions @@ -183,6 +216,33 @@ void shouldReceiveMessagesInConsumerGroup() throws InterruptedException { cancelAwait(subscription); } + @Test // GH-3190 + void shouldReceiveBatchMessagesInConsumerGroup() throws InterruptedException { + + StreamMessageListenerContainer> container = StreamMessageListenerContainer + .create(connectionFactory, containerOptions); + BlockingQueue>> batches = new LinkedBlockingQueue<>(); + RecordId messageId = redisTemplate.opsForStream().add("my-stream", Collections.singletonMap("key", "value1")); + redisTemplate.opsForStream().createGroup("my-stream", ReadOffset.from(messageId), "my-group"); + + container.start(); + Subscription subscription = container.receiveBatch(Consumer.from("my-group", "my-consumer"), + StreamOffset.create("my-stream", ReadOffset.lastConsumed()), batches::add); + + subscription.await(DEFAULT_TIMEOUT); + + redisTemplate.opsForStream().add("my-stream", Collections.singletonMap("key", "value2")); + + List> messageBatch = batches.poll(1, TimeUnit.SECONDS); + assertThat(messageBatch).isNotNull(); + assertThat(messageBatch).isNotEmpty(); + assertThat(messageBatch.stream().map(it -> it.getValue().get("key"))).contains("value2"); + + assertThat(getNumberOfPending("my-stream", "my-group")).isOne(); + + cancelAwait(subscription); + } + @Test // DATAREDIS-1079 void shouldReceiveAndAckMessagesInConsumerGroup() throws InterruptedException { @@ -209,6 +269,33 @@ void shouldReceiveAndAckMessagesInConsumerGroup() throws InterruptedException { cancelAwait(subscription); } + @Test // GH-3190 + void shouldReceiveAndAutoAckBatchMessagesInConsumerGroup() throws InterruptedException { + + StreamMessageListenerContainer> container = StreamMessageListenerContainer + .create(connectionFactory, containerOptions); + BlockingQueue>> batches = new LinkedBlockingQueue<>(); + RecordId messageId = redisTemplate.opsForStream().add("my-stream", Collections.singletonMap("key", "value1")); + redisTemplate.opsForStream().createGroup("my-stream", ReadOffset.from(messageId), "my-group"); + + container.start(); + Subscription subscription = container.receiveBatchAutoAck(Consumer.from("my-group", "my-consumer"), + StreamOffset.create("my-stream", ReadOffset.lastConsumed()), batches::add); + + subscription.await(DEFAULT_TIMEOUT); + + redisTemplate.opsForStream().add("my-stream", Collections.singletonMap("key", "value2")); + + List> messageBatch = batches.poll(1, TimeUnit.SECONDS); + assertThat(messageBatch).isNotNull(); + assertThat(messageBatch).isNotEmpty(); + assertThat(messageBatch.stream().map(it -> it.getValue().get("key"))).contains("value2"); + + assertThat(getNumberOfPending("my-stream", "my-group")).isZero(); + + cancelAwait(subscription); + } + @Test // DATAREDIS-864 void shouldUseCustomErrorHandler() throws InterruptedException { @@ -339,7 +426,82 @@ void deserializationShouldContinueStreamRead() throws InterruptedException { cancelAwait(subscription); } - @Test // DATAREDIS-864 + @Test // GH-3190 + void emptyBatchPollShouldNotFailOrInvokeListener() throws InterruptedException { + + BlockingQueue failures = new LinkedBlockingQueue<>(); + AtomicInteger callbackInvocations = new AtomicInteger(); + + StreamMessageListenerContainerOptions> options = StreamMessageListenerContainerOptions + .builder().errorHandler(failures::add).pollTimeout(Duration.ofMillis(100)).build(); + StreamMessageListenerContainer> container = StreamMessageListenerContainer + .create(connectionFactory, options); + + container.start(); + Subscription subscription = container.receiveBatch(StreamOffset.create("my-stream", ReadOffset.from("0-0")), + messages -> callbackInvocations.incrementAndGet()); + + subscription.await(DEFAULT_TIMEOUT); + + Thread.sleep(250); + + assertThat(callbackInvocations).hasValue(0); + assertThat(failures).isEmpty(); + assertThat(subscription.isActive()).isTrue(); + + cancelAwait(subscription); + } + + @Test // GH-3190 + void batchListenerFailureShouldNotAdvanceOffsetWhenContinuingOnError() throws InterruptedException { + + BlockingQueue failures = new LinkedBlockingQueue<>(); + BlockingQueue>> received = new LinkedBlockingQueue<>(); + AtomicBoolean failOnce = new AtomicBoolean(true); + AtomicInteger callbackInvocations = new AtomicInteger(); + + StreamMessageListenerContainerOptions> options = StreamMessageListenerContainerOptions + .builder().batchSize(3).pollTimeout(Duration.ofMillis(100)).build(); + StreamMessageListenerContainer> container = StreamMessageListenerContainer + .create(connectionFactory, options); + + StreamReadRequest readRequest = StreamReadRequest + .builder(StreamOffset.create("my-stream", ReadOffset.from("0-0"))) // + .errorHandler(failures::add) // + .cancelOnError(it -> false) // + .build(); + + redisTemplate.opsForStream().add("my-stream", Collections.singletonMap("key", "value1")); + redisTemplate.opsForStream().add("my-stream", Collections.singletonMap("key", "value2")); + redisTemplate.opsForStream().add("my-stream", Collections.singletonMap("key", "value3")); + + container.start(); + Subscription subscription = container.registerBatch(readRequest, messages -> { + + callbackInvocations.incrementAndGet(); + + if (failOnce.compareAndSet(true, false)) { + throw new RuntimeException("boom"); + } + + received.add(messages); + }); + + subscription.await(DEFAULT_TIMEOUT); + + List> delivered = received.poll(2, TimeUnit.SECONDS); + assertThat(delivered).isNotNull(); + assertThat(delivered).hasSize(3); + assertThat(delivered.stream().map(it -> it.getValue().get("key"))) + .containsExactly("value1", "value2", "value3"); + assertThat(callbackInvocations.get()).isGreaterThanOrEqualTo(2); + assertThat(failures).isNotEmpty(); + assertThat(subscription.isActive()).isTrue(); + + cancelAwait(subscription); + } + + @Test // DATAREDIS-864 void cancelledStreamShouldNotReceiveMessages() throws InterruptedException { StreamMessageListenerContainer> container = StreamMessageListenerContainer