Skip to content

Commit a970269

Browse files
authored
Adding support for instances object (#1028)
Some callable tasks might require to share internal implementation resources per workflow instance (for example a Python embedded interpreter or a l4cj agenticscope instance) Signed-off-by: fjtirado <ftirados@redhat.com>
1 parent b573e61 commit a970269

File tree

1 file changed

+13
-2
lines changed

1 file changed

+13
-2
lines changed

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import java.util.concurrent.atomic.AtomicReference;
3535
import java.util.concurrent.locks.Lock;
3636
import java.util.concurrent.locks.ReentrantLock;
37+
import java.util.function.Supplier;
3738

3839
public class WorkflowMutableInstance implements WorkflowInstance {
3940

@@ -47,6 +48,8 @@ public class WorkflowMutableInstance implements WorkflowInstance {
4748
protected AtomicReference<CompletableFuture<WorkflowModel>> futureRef = new AtomicReference<>();
4849
protected Instant completedAt;
4950

51+
protected final Map<String, Object> additionalObjects = new ConcurrentHashMap<String, Object>();
52+
5053
private Lock statusLock = new ReentrantLock();
5154
private Map<CompletableFuture<TaskContext>, TaskContext> suspended;
5255

@@ -84,14 +87,18 @@ protected final CompletableFuture<WorkflowModel> startExecution(Runnable runnabl
8487
.inputFilter()
8588
.map(f -> f.apply(workflowContext, null, input))
8689
.orElse(input))
87-
.whenComplete(this::whenFailed)
90+
.whenComplete(this::whenCompleted)
8891
.thenApply(this::whenSuccess);
8992
futureRef.set(future);
9093
return future;
9194
}
9295

93-
private void whenFailed(WorkflowModel result, Throwable ex) {
96+
private void whenCompleted(WorkflowModel result, Throwable ex) {
9497
completedAt = Instant.now();
98+
additionalObjects.values().stream()
99+
.filter(AutoCloseable.class::isInstance)
100+
.map(AutoCloseable.class::cast)
101+
.forEach(WorkflowUtils::safeClose);
95102
if (ex != null) {
96103
handleException(ex instanceof CompletionException ? ex = ex.getCause() : ex);
97104
}
@@ -278,5 +285,9 @@ public boolean cancel() {
278285
}
279286
}
280287

288+
public <T> T additionalObject(String key, Supplier<T> supplier) {
289+
return (T) additionalObjects.computeIfAbsent(key, k -> supplier.get());
290+
}
291+
281292
public void restoreContext(WorkflowContext workflow, TaskContext context) {}
282293
}

0 commit comments

Comments
 (0)