Skip to content

Commit 9099a94

Browse files
committed
Add remote step execution support
Resolves #5024
1 parent 3e327eb commit 9099a94

File tree

9 files changed

+528
-0
lines changed

9 files changed

+528
-0
lines changed
Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
/*
2+
* Copyright 2025-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.batch.integration.remote;
17+
18+
import org.springframework.batch.core.job.JobExecution;
19+
import org.springframework.batch.core.step.StepExecution;
20+
import org.springframework.batch.core.repository.JobRepository;
21+
import org.springframework.batch.core.step.AbstractStep;
22+
import org.springframework.batch.infrastructure.poller.DirectPoller;
23+
import org.springframework.batch.infrastructure.poller.Poller;
24+
import org.springframework.batch.integration.partition.StepExecutionRequest;
25+
import org.springframework.integration.core.MessagingTemplate;
26+
import org.springframework.messaging.MessageChannel;
27+
28+
import java.util.concurrent.Callable;
29+
import java.util.concurrent.Future;
30+
import java.util.concurrent.TimeUnit;
31+
32+
import org.apache.commons.logging.Log;
33+
import org.apache.commons.logging.LogFactory;
34+
35+
/**
36+
* A {@link org.springframework.batch.core.step.Step} implementation that delegates the
37+
* execution to a remote worker step through messaging.
38+
* <p>
39+
* The remote worker step must be listening to the same message channel to receive step
40+
* execution requests.
41+
* <p>
42+
* The step execution is created locally and sent to the remote worker step which will
43+
* update its status and context in the job repository. The {@link RemoteStep} will poll
44+
* the job repository to check for step completion.
45+
*
46+
* @author Mahmoud Ben Hassine
47+
* @since 6.0.0
48+
*/
49+
public class RemoteStep extends AbstractStep {
50+
51+
private static final Log logger = LogFactory.getLog(RemoteStep.class.getName());
52+
53+
private final String remoteStepName;
54+
55+
private final MessagingTemplate messagingTemplate;
56+
57+
private MessageChannel messageChannel;
58+
59+
private long pollInterval = 10000;
60+
61+
private long timeout = -1;
62+
63+
/**
64+
* Create a new {@link RemoteStep} instance.
65+
* @param name the name of this step
66+
* @param remoteStepName the name of the remote worker step to execute
67+
* @param jobRepository the job repository to use
68+
* @param messagingTemplate the messaging template to use to send step execution
69+
* requests
70+
*/
71+
public RemoteStep(String name, String remoteStepName, JobRepository jobRepository,
72+
MessagingTemplate messagingTemplate) {
73+
super(jobRepository);
74+
setName(name);
75+
this.remoteStepName = remoteStepName;
76+
this.messagingTemplate = messagingTemplate;
77+
this.messageChannel = this.messagingTemplate.getDefaultDestination();
78+
}
79+
80+
public void setTimeout(long timeout) {
81+
this.timeout = timeout;
82+
}
83+
84+
public void setPollInterval(long pollInterval) {
85+
this.pollInterval = pollInterval;
86+
}
87+
88+
public void setMessageChannel(MessageChannel messageChannel) {
89+
this.messageChannel = messageChannel;
90+
}
91+
92+
@Override
93+
protected void doExecute(StepExecution stepExecution) throws Exception {
94+
// create a step execution for the remote worker step
95+
JobExecution jobExecution = stepExecution.getJobExecution();
96+
StepExecution workerStepExecution = getJobRepository().createStepExecution(this.remoteStepName, jobExecution);
97+
98+
// pass the same context to the remote worker step
99+
workerStepExecution.setExecutionContext(stepExecution.getExecutionContext());
100+
getJobRepository().update(workerStepExecution);
101+
getJobRepository().updateExecutionContext(workerStepExecution);
102+
103+
// send step execution request and wait for the remote step to finish
104+
StepExecutionRequest stepExecutionRequest = new StepExecutionRequest(this.remoteStepName, jobExecution.getId(),
105+
workerStepExecution.getId());
106+
this.messagingTemplate.convertAndSend(this.messageChannel, stepExecutionRequest);
107+
StepExecution updatedWorkerExecution = pollRemoteStep(workerStepExecution);
108+
109+
// upgrade status and context based on the remote step
110+
stepExecution.setExecutionContext(updatedWorkerExecution.getExecutionContext());
111+
stepExecution.upgradeStatus(updatedWorkerExecution.getStatus());
112+
stepExecution.setExitStatus(updatedWorkerExecution.getExitStatus());
113+
}
114+
115+
private StepExecution pollRemoteStep(StepExecution workerStepExecution) throws Exception {
116+
Poller<StepExecution> poller = new DirectPoller<>(this.pollInterval);
117+
Callable<StepExecution> callable = () -> {
118+
StepExecution updatedExecution = getJobRepository().getStepExecution(workerStepExecution.getId());
119+
if (updatedExecution != null && updatedExecution.getStatus().isRunning()) {
120+
logger.info("Waiting for remote step to finish");
121+
return null;
122+
}
123+
else {
124+
return updatedExecution;
125+
}
126+
};
127+
Future<StepExecution> executionFuture = poller.poll(callable);
128+
if (this.timeout >= 0) {
129+
return executionFuture.get(this.timeout, TimeUnit.MILLISECONDS);
130+
}
131+
else {
132+
return executionFuture.get();
133+
}
134+
}
135+
136+
}

spring-batch-samples/README.md

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -617,6 +617,31 @@ $>docker run --name mongodb --rm -d -p 27017:27017 mongo
617617
Once MongoDB is up and running, run the `org.springframework.batch.samples.mongodb.MongoDBSampleApp`
618618
class without any argument to start the sample.
619619

620+
### Remote step sample
621+
622+
This sample shows how to configure and run a remote step using Spring Integration. The sample consists of a manager application
623+
that launches a job with a remote step, and a worker application that executes the remote step.
624+
625+
First, you need to start the shared job repository database:
626+
627+
```
628+
$>cd spring-batch-samples/src/main/resources/org/springframework/batch/samples/remotestep
629+
$>docker-compose up -d
630+
```
631+
632+
Then, you need to start the worker application. You can do this by running the `org.springframework.batch.samples.remotestep.WorkerConfiguration` class without any argument.
633+
634+
Once the worker is up and running, you can start the manager application by running the `org.springframework.batch.samples.remotestep.ManagerConfiguration` class without any argument.
635+
636+
You should see the manager application waiting for the worker step to finish, and the worker application processing the remote step.
637+
638+
Once the remote step is finished, the manager application will complete the job.
639+
640+
You can stop the docker container running the database by executing:
641+
642+
```
643+
$>docker-compose down
644+
```
620645
### PetClinic sample
621646

622647
This sample uses the [PetClinic Spring application](https://github.com/spring-projects/spring-petclinic) to show how to use

spring-batch-samples/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,11 @@
202202
<artifactId>mongodb-driver-sync</artifactId>
203203
<version>${mongodb-driver.version}</version>
204204
</dependency>
205+
<dependency>
206+
<groupId>org.postgresql</groupId>
207+
<artifactId>postgresql</artifactId>
208+
<version>${postgresql.version}</version>
209+
</dependency>
205210
<dependency>
206211
<groupId>io.prometheus</groupId>
207212
<artifactId>prometheus-metrics-exporter-pushgateway</artifactId>
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Copyright 2025-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.batch.samples.remotestep;
17+
18+
import javax.sql.DataSource;
19+
import jakarta.jms.JMSException;
20+
21+
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
22+
import org.postgresql.ds.PGSimpleDataSource;
23+
24+
import org.springframework.beans.factory.annotation.Value;
25+
import org.springframework.context.annotation.Bean;
26+
import org.springframework.context.annotation.Configuration;
27+
import org.springframework.context.annotation.PropertySource;
28+
import org.springframework.core.env.Environment;
29+
import org.springframework.jdbc.support.JdbcTransactionManager;
30+
31+
@Configuration
32+
@PropertySource("classpath:org/springframework/batch/samples/remotestep/remote-step-sample.properties")
33+
public class InfrastructureConfiguration {
34+
35+
/*
36+
* Data source configuration
37+
*/
38+
@Bean
39+
public DataSource dataSource(Environment environment) {
40+
PGSimpleDataSource dataSource = new PGSimpleDataSource();
41+
dataSource.setUrl(environment.getProperty("spring.datasource.url"));
42+
dataSource.setUser(environment.getProperty("spring.datasource.username"));
43+
dataSource.setPassword(environment.getProperty("spring.datasource.password"));
44+
return dataSource;
45+
}
46+
47+
@Bean
48+
public JdbcTransactionManager transactionManager(DataSource dataSource) {
49+
return new JdbcTransactionManager(dataSource);
50+
}
51+
52+
/*
53+
* Broker configuration
54+
*/
55+
@Bean
56+
public ActiveMQConnectionFactory connectionFactory(@Value("${broker.url}") String brokerUrl) throws JMSException {
57+
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
58+
connectionFactory.setBrokerURL(brokerUrl);
59+
return connectionFactory;
60+
}
61+
62+
}
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
/*
2+
* Copyright 2025-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.batch.samples.remotestep;
17+
18+
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
19+
import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ;
20+
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
21+
22+
import org.springframework.batch.core.configuration.annotation.EnableJdbcJobRepository;
23+
import org.springframework.batch.core.job.Job;
24+
import org.springframework.batch.core.job.JobExecution;
25+
import org.springframework.batch.core.job.parameters.JobParameters;
26+
import org.springframework.batch.core.launch.JobOperator;
27+
import org.springframework.batch.core.step.Step;
28+
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
29+
import org.springframework.batch.core.job.builder.JobBuilder;
30+
import org.springframework.batch.core.repository.JobRepository;
31+
import org.springframework.batch.integration.remote.RemoteStep;
32+
import org.springframework.beans.factory.BeanFactory;
33+
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
34+
import org.springframework.context.annotation.Bean;
35+
import org.springframework.context.annotation.Configuration;
36+
import org.springframework.context.annotation.Import;
37+
import org.springframework.integration.channel.DirectChannel;
38+
import org.springframework.integration.config.EnableIntegration;
39+
import org.springframework.integration.core.MessagingTemplate;
40+
import org.springframework.integration.dsl.IntegrationFlow;
41+
import org.springframework.integration.dsl.StandardIntegrationFlow;
42+
import org.springframework.integration.jms.dsl.Jms;
43+
import org.springframework.messaging.MessageChannel;
44+
45+
@Configuration
46+
@EnableBatchProcessing
47+
@EnableJdbcJobRepository
48+
@EnableIntegration
49+
@Import(value = { InfrastructureConfiguration.class })
50+
public class ManagerConfiguration {
51+
52+
/*
53+
* Configure outbound flow (requests going to workers)
54+
*/
55+
@Bean
56+
public DirectChannel requests() {
57+
return new DirectChannel();
58+
}
59+
60+
@Bean
61+
public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory, BeanFactory beanFactory) {
62+
StandardIntegrationFlow integrationFlow = IntegrationFlow.from(requests())
63+
.handle(Jms.outboundAdapter(connectionFactory).destination("requests"))
64+
.get();
65+
return integrationFlow;
66+
}
67+
68+
@Bean
69+
public MessagingTemplate messagingTemplate(MessageChannel requests) {
70+
MessagingTemplate messagingTemplate = new MessagingTemplate();
71+
messagingTemplate.setDefaultDestination(requests);
72+
messagingTemplate.setReceiveTimeout(60000);
73+
messagingTemplate.setSendTimeout(10000);
74+
return messagingTemplate;
75+
}
76+
77+
@Bean
78+
public Step step(MessagingTemplate messagingTemplate, JobRepository jobRepository) {
79+
return new RemoteStep("step", "workerStep", jobRepository, messagingTemplate);
80+
}
81+
82+
@Bean
83+
public Job job(JobRepository jobRepository, Step step) {
84+
return new JobBuilder("job", jobRepository).start(step).build();
85+
}
86+
87+
public static void main(String[] args) throws Exception {
88+
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(ManagerConfiguration.class);
89+
org.apache.activemq.artemis.core.config.Configuration configuration = new ConfigurationImpl()
90+
.addAcceptorConfiguration("jms", "tcp://localhost:61617")
91+
.setPersistenceEnabled(false)
92+
.setSecurityEnabled(false)
93+
.setJMXManagementEnabled(false)
94+
.setJournalDatasync(false);
95+
96+
EmbeddedActiveMQ brokerService = new EmbeddedActiveMQ().setConfiguration(configuration).start();
97+
StandardIntegrationFlow integrationFlow = context.getBean(StandardIntegrationFlow.class);
98+
integrationFlow.start();
99+
100+
JobOperator jobOperator = context.getBean(JobOperator.class);
101+
Job job = context.getBean(Job.class);
102+
JobExecution jobExecution = jobOperator.start(job, new JobParameters());
103+
System.out.println("jobExecution = " + jobExecution);
104+
integrationFlow.stop();
105+
brokerService.stop();
106+
}
107+
108+
}

0 commit comments

Comments
 (0)