Skip to content

Commit a371d24

Browse files
committed
Add support for SEDA style with Spring Integration message channels
Resolves #4719
1 parent 0bcb1bf commit a371d24

File tree

4 files changed

+250
-0
lines changed

4 files changed

+250
-0
lines changed
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
* Copyright 2025-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.batch.integration.support;
18+
19+
import org.jspecify.annotations.Nullable;
20+
21+
import org.springframework.batch.infrastructure.item.ItemReader;
22+
import org.springframework.integration.core.MessagingTemplate;
23+
import org.springframework.messaging.MessageChannel;
24+
import org.springframework.util.Assert;
25+
26+
/**
27+
* An {@link ItemReader} implementation that receives items as messages from a message
28+
* channel using a {@link MessagingTemplate}. This item reader enables SEDA (Staged
29+
* Event-Driven Architecture) patterns in Spring Batch jobs by decoupling item production
30+
* from item consumption through messaging.
31+
*
32+
* @param <T> the type of items to be read
33+
* @author Mahmoud Ben Hassine
34+
* @since 6.0.0
35+
*/
36+
public class MessageChannelItemReader<T> implements ItemReader<T> {
37+
38+
private final MessagingTemplate messagingTemplate;
39+
40+
private final Class<T> targetType;
41+
42+
private MessageChannel messageChannel;
43+
44+
/**
45+
* Create a new {@link MessageChannelItemReader} instance. Messages will be read from
46+
* the default destination of the provided {@link MessagingTemplate} which must not be
47+
* null.
48+
* @param messagingTemplate the messaging template to use for reading messages
49+
* @param targetType the target type of items to convert messages to
50+
*/
51+
public MessageChannelItemReader(MessagingTemplate messagingTemplate, Class<T> targetType) {
52+
this.targetType = targetType;
53+
this.messagingTemplate = messagingTemplate;
54+
MessageChannel defaultDestination = messagingTemplate.getDefaultDestination();
55+
Assert.notNull(defaultDestination, "MessagingTemplate must have a default destination configured");
56+
this.messageChannel = defaultDestination;
57+
}
58+
59+
/**
60+
* Create a new {@link MessageChannelItemReader} instance. Messages will be read from
61+
* the provided target message channel.
62+
* @param messagingTemplate the messaging template to use for receiving messages
63+
* @param messageChannel the message channel to read messages from
64+
* @param targetType the target type of items to convert messages to
65+
*/
66+
public MessageChannelItemReader(MessagingTemplate messagingTemplate, MessageChannel messageChannel,
67+
Class<T> targetType) {
68+
this.messagingTemplate = messagingTemplate;
69+
this.messageChannel = messageChannel;
70+
this.targetType = targetType;
71+
}
72+
73+
/**
74+
* Set the source message channel.
75+
* @param messageChannel the message channel to read messages from
76+
*/
77+
public void setMessageChannel(MessageChannel messageChannel) {
78+
this.messageChannel = messageChannel;
79+
}
80+
81+
@Override
82+
public @Nullable T read() throws Exception {
83+
return this.messagingTemplate.receiveAndConvert(this.messageChannel, this.targetType);
84+
}
85+
86+
}
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/*
2+
* Copyright 2025-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.batch.integration.support;
18+
19+
import java.util.function.Consumer;
20+
21+
import org.springframework.batch.infrastructure.item.Chunk;
22+
import org.springframework.batch.infrastructure.item.ItemWriter;
23+
import org.springframework.integration.core.MessagingTemplate;
24+
import org.springframework.messaging.MessageChannel;
25+
import org.springframework.messaging.support.GenericMessage;
26+
import org.springframework.util.Assert;
27+
28+
/**
29+
* An {@link ItemWriter} implementation that sends items as messages to a message channel
30+
* using a {@link MessagingTemplate}. This item writer enables SEDA (Staged Event-Driven
31+
* Architecture) patterns in Spring Batch jobs by decoupling item production from item
32+
* consumption through messaging.
33+
*
34+
* @param <T> the type of items to be written
35+
* @author Mahmoud Ben Hassine
36+
* @since 6.0.0
37+
*/
38+
public class MessageChannelItemWriter<T> implements ItemWriter<T> {
39+
40+
private final MessagingTemplate messagingTemplate;
41+
42+
private MessageChannel messageChannel;
43+
44+
/**
45+
* Create a new {@link MessageChannelItemWriter} instance. Messages will be sent to
46+
* the default destination of the provided {@link MessagingTemplate} which must not be
47+
* null.
48+
* @param messagingGateway the messaging template to use for sending messages
49+
*/
50+
public MessageChannelItemWriter(MessagingTemplate messagingGateway) {
51+
this.messagingTemplate = messagingGateway;
52+
MessageChannel defaultDestination = messagingGateway.getDefaultDestination();
53+
Assert.notNull(defaultDestination, "MessagingTemplate must have a default destination configured");
54+
this.messageChannel = defaultDestination;
55+
}
56+
57+
/**
58+
* Create a new {@link MessageChannelItemWriter} instance. Messages will be sent to
59+
* the provided message channel.
60+
* @param messagingTemplate the messaging template to use for sending messages
61+
* @param messageChannel the message channel to send messages to
62+
*/
63+
public MessageChannelItemWriter(MessagingTemplate messagingTemplate, MessageChannel messageChannel) {
64+
this.messagingTemplate = messagingTemplate;
65+
this.messageChannel = messageChannel;
66+
}
67+
68+
/**
69+
* Set the target message channel.
70+
* @param messageChannel the message channel to send messages to
71+
*/
72+
public void setMessageChannel(MessageChannel messageChannel) {
73+
this.messageChannel = messageChannel;
74+
}
75+
76+
@Override
77+
public void write(Chunk<? extends T> items) throws Exception {
78+
if (!items.isEmpty()) {
79+
items.forEach((Consumer<T>) t -> this.messagingTemplate.send(this.messageChannel, new GenericMessage<>(t)));
80+
}
81+
}
82+
83+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package org.springframework.batch.integration.support;
2+
3+
import org.junit.jupiter.api.Assertions;
4+
import org.junit.jupiter.api.Test;
5+
import org.mockito.Mockito;
6+
7+
import org.springframework.integration.core.MessagingTemplate;
8+
import org.springframework.messaging.MessageChannel;
9+
10+
import static org.mockito.Mockito.mock;
11+
12+
class MessageChannelItemReaderTests {
13+
14+
@Test
15+
void testRead() throws Exception {
16+
// given
17+
MessageChannel messageChannel = mock();
18+
MessagingTemplate messagingTemplate = mock();
19+
Mockito.when(messagingTemplate.receiveAndConvert(messageChannel, String.class)).thenReturn("Foo");
20+
MessageChannelItemReader<String> reader = new MessageChannelItemReader<>(messagingTemplate, messageChannel,
21+
String.class);
22+
23+
// when
24+
String item = reader.read();
25+
26+
// then
27+
Mockito.verify(messagingTemplate, Mockito.times(1)).receiveAndConvert(messageChannel, String.class);
28+
Assertions.assertEquals("Foo", item);
29+
}
30+
31+
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Copyright 2025-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.batch.integration.support;
17+
18+
import org.junit.jupiter.api.Test;
19+
import org.mockito.Mockito;
20+
21+
import org.springframework.batch.infrastructure.item.Chunk;
22+
import org.springframework.integration.core.MessagingTemplate;
23+
import org.springframework.messaging.MessageChannel;
24+
25+
import static org.mockito.ArgumentMatchers.eq;
26+
import static org.mockito.Mockito.mock;
27+
28+
/**
29+
* Tests for {@link MessageChannelItemWriter}.
30+
*
31+
* @author Mahmoud Ben Hassine
32+
*/
33+
class MessageChannelItemWriterTests {
34+
35+
@Test
36+
void testWrite() throws Exception {
37+
// given
38+
MessageChannel messageChannel = mock();
39+
MessagingTemplate messagingTemplate = mock();
40+
MessageChannelItemWriter<String> itemWriter = new MessageChannelItemWriter<>(messagingTemplate, messageChannel);
41+
Chunk<String> items = Chunk.of("foo", "bar");
42+
43+
// when
44+
itemWriter.write(items);
45+
46+
// then
47+
Mockito.verify(messagingTemplate, Mockito.times(2)).send(eq(messageChannel), Mockito.any());
48+
}
49+
50+
}

0 commit comments

Comments
 (0)