Skip to content

Commit bb7586b

Browse files
Merge pull request #1363 from ricardozanini/fix-forexecutor-async
Fix #1354 - Isolate variables in ForExecutor to avoid racing condition to overwrite loop variables
2 parents 861ca31 + 67a1689 commit bb7586b

7 files changed

Lines changed: 73 additions & 39 deletions

File tree

experimental/test/src/test/java/io/serverlessworkflow/fluent/test/ForEachFuncTest.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import io.serverlessworkflow.fluent.func.FuncWorkflowBuilder;
2626
import io.serverlessworkflow.impl.WorkflowApplication;
2727
import io.serverlessworkflow.impl.WorkflowModel;
28-
import io.serverlessworkflow.impl.events.InMemoryEvents;
28+
import io.serverlessworkflow.impl.lifecycle.TraceExecutionListener;
2929
import java.time.Duration;
3030
import java.util.Collection;
3131
import java.util.List;
@@ -35,13 +35,13 @@
3535

3636
public class ForEachFuncTest {
3737

38-
private static record Order(String id) {}
38+
private record Order(String id) {}
3939

40-
private static record EnhancedOrder(String id, int salary) {}
40+
private record EnhancedOrder(String id, int salary) {}
4141

42-
private static record OrdersPayload(List<Order> orders) {}
42+
private record OrdersPayload(List<Order> orders) {}
4343

44-
private static record OrderName(String id, String name) {}
44+
private record OrderName(String id, String name) {}
4545

4646
@Test
4747
void testForEachIteration() {
@@ -75,13 +75,14 @@ void testForEachEmit() {
7575
.build();
7676

7777
List<CloudEvent> publishedEvents = new CopyOnWriteArrayList<>();
78-
InMemoryEvents eventBroker = new InMemoryEvents();
78+
LaggedInMemoryEvents eventBroker = new LaggedInMemoryEvents();
7979
eventBroker.register(eventType, ce -> publishedEvents.add(ce));
8080

8181
try (WorkflowApplication app =
8282
WorkflowApplication.builder()
8383
.withEventConsumer(eventBroker)
8484
.withEventPublisher(eventBroker)
85+
.withListener(new TraceExecutionListener())
8586
.build()) {
8687
app.workflowDefinition(workflow)
8788
.instance(
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.fluent.test;
17+
18+
import io.cloudevents.CloudEvent;
19+
import io.serverlessworkflow.impl.events.InMemoryEvents;
20+
import java.util.concurrent.CompletableFuture;
21+
22+
public class LaggedInMemoryEvents extends InMemoryEvents {
23+
24+
@Override
25+
public CompletableFuture<Void> publish(CloudEvent ce) {
26+
27+
return super.publish(ce)
28+
.thenRun(
29+
() -> {
30+
try {
31+
Thread.sleep(50);
32+
} catch (InterruptedException e) {
33+
Thread.currentThread().interrupt();
34+
}
35+
});
36+
}
37+
}

impl/core/src/main/java/io/serverlessworkflow/impl/events/InMemoryEvents.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@
3030
*/
3131
public class InMemoryEvents extends AbstractTypeConsumer implements EventPublisher {
3232

33+
private final ExecutorServiceFactory serviceFactory;
34+
private final Map<String, Consumer<CloudEvent>> topicMap = new ConcurrentHashMap<>();
35+
private final AtomicReference<Consumer<CloudEvent>> allConsumerRef = new AtomicReference<>();
36+
3337
public InMemoryEvents() {
3438
this(new DefaultExecutorServiceFactory());
3539
}
@@ -38,12 +42,6 @@ public InMemoryEvents(ExecutorServiceFactory serviceFactory) {
3842
this.serviceFactory = serviceFactory;
3943
}
4044

41-
private ExecutorServiceFactory serviceFactory;
42-
43-
private Map<String, Consumer<CloudEvent>> topicMap = new ConcurrentHashMap<>();
44-
45-
private AtomicReference<Consumer<CloudEvent>> allConsumerRef = new AtomicReference<>();
46-
4745
@Override
4846
public void register(String topicName, Consumer<CloudEvent> consumer) {
4947
topicMap.put(topicName, consumer);

impl/core/src/main/java/io/serverlessworkflow/impl/executors/ForExecutor.java

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -75,23 +75,30 @@ protected ForExecutor(ForExecutorBuilder builder) {
7575
@Override
7676
protected CompletableFuture<WorkflowModel> internalExecute(
7777
WorkflowContext workflow, TaskContext taskContext) {
78-
Iterator<?> iter = collectionExpr.apply(workflow, taskContext, taskContext.input()).iterator();
79-
int i = 0;
80-
CompletableFuture<WorkflowModel> future =
81-
CompletableFuture.completedFuture(taskContext.input());
82-
while (iter.hasNext()) {
78+
return buildLoopFuture(
79+
workflow,
80+
taskContext,
81+
taskContext.input(),
82+
collectionExpr.apply(workflow, taskContext, taskContext.input()).iterator(),
83+
-1);
84+
}
85+
86+
private CompletableFuture<WorkflowModel> buildLoopFuture(
87+
WorkflowContext workflow,
88+
TaskContext taskContext,
89+
WorkflowModel input,
90+
Iterator<?> iter,
91+
int index) {
92+
final int newIndex = index + 1;
93+
if (iter.hasNext()) {
8394
taskContext.variables().put(task.getFor().getEach(), iter.next());
84-
taskContext.variables().put(task.getFor().getAt(), i++);
85-
if (whileExpr.map(w -> w.test(workflow, taskContext, taskContext.input())).orElse(true)) {
86-
future =
87-
future.thenCompose(
88-
input ->
89-
TaskExecutorHelper.processTaskList(
90-
taskExecutor, workflow, Optional.of(taskContext), input));
91-
} else {
92-
break;
95+
taskContext.variables().put(task.getFor().getAt(), newIndex);
96+
if (whileExpr.map(w -> w.test(workflow, taskContext, input)).orElse(true)) {
97+
return TaskExecutorHelper.processTaskList(
98+
taskExecutor, workflow, Optional.of(taskContext), input)
99+
.thenCompose(output -> buildLoopFuture(workflow, taskContext, output, iter, newIndex));
93100
}
94101
}
95-
return future;
102+
return CompletableFuture.completedFuture(input);
96103
}
97104
}

impl/test/src/test/java/io/serverlessworkflow/impl/test/TraceExecutionListener.java renamed to impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/TraceExecutionListener.java

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -13,19 +13,8 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package io.serverlessworkflow.impl.test;
16+
package io.serverlessworkflow.impl.lifecycle;
1717

18-
import io.serverlessworkflow.impl.lifecycle.TaskCompletedEvent;
19-
import io.serverlessworkflow.impl.lifecycle.TaskFailedEvent;
20-
import io.serverlessworkflow.impl.lifecycle.TaskRetriedEvent;
21-
import io.serverlessworkflow.impl.lifecycle.TaskStartedEvent;
22-
import io.serverlessworkflow.impl.lifecycle.WorkflowCompletedEvent;
23-
import io.serverlessworkflow.impl.lifecycle.WorkflowExecutionListener;
24-
import io.serverlessworkflow.impl.lifecycle.WorkflowFailedEvent;
25-
import io.serverlessworkflow.impl.lifecycle.WorkflowResumedEvent;
26-
import io.serverlessworkflow.impl.lifecycle.WorkflowStartedEvent;
27-
import io.serverlessworkflow.impl.lifecycle.WorkflowStatusEvent;
28-
import io.serverlessworkflow.impl.lifecycle.WorkflowSuspendedEvent;
2918
import org.slf4j.Logger;
3019
import org.slf4j.LoggerFactory;
3120

impl/test/src/test/java/io/serverlessworkflow/impl/test/DBGenerator.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import io.serverlessworkflow.impl.WorkflowApplication;
2121
import io.serverlessworkflow.impl.WorkflowDefinition;
2222
import io.serverlessworkflow.impl.WorkflowInstance;
23+
import io.serverlessworkflow.impl.lifecycle.TraceExecutionListener;
2324
import io.serverlessworkflow.impl.persistence.DefaultPersistenceInstanceHandlers;
2425
import io.serverlessworkflow.impl.persistence.PersistenceApplicationBuilder;
2526
import io.serverlessworkflow.impl.persistence.PersistenceInstanceHandlers;

impl/test/src/test/java/io/serverlessworkflow/impl/test/MvStorePersistenceTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import io.serverlessworkflow.impl.WorkflowDefinition;
2323
import io.serverlessworkflow.impl.WorkflowInstance;
2424
import io.serverlessworkflow.impl.WorkflowStatus;
25+
import io.serverlessworkflow.impl.lifecycle.TraceExecutionListener;
2526
import io.serverlessworkflow.impl.persistence.DefaultPersistenceInstanceHandlers;
2627
import io.serverlessworkflow.impl.persistence.PersistenceApplicationBuilder;
2728
import io.serverlessworkflow.impl.persistence.PersistenceInstanceHandlers;

0 commit comments

Comments
 (0)