Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import java.io.Writer;
import java.nio.file.Files;
import java.nio.file.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Utility class for writing Serverless Workflow definitions to various outputs and formats.
Expand All @@ -32,6 +34,8 @@
*/
public class WorkflowWriter {

private static final Logger logger = LoggerFactory.getLogger(WorkflowWriter.class);

/**
* Writes a {@link Workflow} to the given {@link OutputStream} in the specified format.
*
Expand Down Expand Up @@ -95,7 +99,9 @@ public static void writeWorkflow(Path output, Workflow workflow, WorkflowFormat
*/
public static String workflowAsString(Workflow workflow, WorkflowFormat format)
throws IOException {
return format.mapper().writeValueAsString(workflow);
String result = format.mapper().writeValueAsString(workflow);
logger.trace("Workflow is {}", result);
return result;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
*/
package io.serverlessworkflow.fluent.func;

import io.serverlessworkflow.api.types.CallTask;
import io.serverlessworkflow.api.types.func.CallJava;
import io.serverlessworkflow.api.types.func.CallTaskJava;
import io.serverlessworkflow.api.types.func.ContextFunction;
import io.serverlessworkflow.api.types.func.FilterFunction;
import io.serverlessworkflow.fluent.func.spi.ConditionalTaskBuilder;
Expand All @@ -29,12 +29,9 @@ public class FuncCallTaskBuilder extends TaskBaseBuilder<FuncCallTaskBuilder>
implements FuncTaskTransformations<FuncCallTaskBuilder>,
ConditionalTaskBuilder<FuncCallTaskBuilder> {

private CallTaskJava callTaskJava;
private CallTask callTaskJava;

FuncCallTaskBuilder() {
callTaskJava = new CallTaskJava(new CallJava() {});
super.setTask(callTaskJava.getCallJava());
}
FuncCallTaskBuilder() {}

@Override
protected FuncCallTaskBuilder self() {
Expand All @@ -51,8 +48,9 @@ public <T, V> FuncCallTaskBuilder function(Function<T, V> function, Class<T> arg

public <T, V> FuncCallTaskBuilder function(
Function<T, V> function, Class<T> argClass, Class<V> returnClass) {
this.callTaskJava = new CallTaskJava(CallJava.function(function, argClass, returnClass));
super.setTask(this.callTaskJava.getCallJava());
this.callTaskJava =
new CallTask().withCallFunction(CallJava.function(function, argClass, returnClass));
super.setTask(this.callTaskJava.getCallFunction());
return this;
}

Expand All @@ -66,8 +64,9 @@ public <T, V> FuncCallTaskBuilder function(ContextFunction<T, V> function, Class

public <T, V> FuncCallTaskBuilder function(
ContextFunction<T, V> function, Class<T> argClass, Class<V> returnClass) {
this.callTaskJava = new CallTaskJava(CallJava.function(function, argClass, returnClass));
super.setTask(this.callTaskJava.getCallJava());
this.callTaskJava =
new CallTask().withCallFunction(CallJava.function(function, argClass, returnClass));
super.setTask(this.callTaskJava.getCallFunction());
return this;
}

Expand All @@ -81,26 +80,27 @@ public <T, V> FuncCallTaskBuilder function(FilterFunction<T, V> function, Class<

public <T, V> FuncCallTaskBuilder function(
FilterFunction<T, V> function, Class<T> argClass, Class<V> outputClass) {
this.callTaskJava = new CallTaskJava(CallJava.function(function, argClass, outputClass));
super.setTask(this.callTaskJava.getCallJava());
this.callTaskJava =
new CallTask().withCallFunction(CallJava.function(function, argClass, outputClass));
super.setTask(this.callTaskJava.getCallFunction());
return this;
}

/** Accept a side-effect Consumer; engine should pass input through unchanged. */
public <T> FuncCallTaskBuilder consumer(Consumer<T> consumer) {
this.callTaskJava = new CallTaskJava(CallJava.consumer(consumer));
super.setTask(this.callTaskJava.getCallJava());
this.callTaskJava = new CallTask().withCallFunction(CallJava.consumer(consumer));
super.setTask(this.callTaskJava.getCallFunction());
return this;
}

/** Accept a Consumer with explicit input type hint. */
public <T> FuncCallTaskBuilder consumer(Consumer<T> consumer, Class<T> argClass) {
this.callTaskJava = new CallTaskJava(CallJava.consumer(consumer, argClass));
super.setTask(this.callTaskJava.getCallJava());
this.callTaskJava = new CallTask().withCallFunction(CallJava.consumer(consumer, argClass));
super.setTask(this.callTaskJava.getCallFunction());
return this;
}

public CallTaskJava build() {
public CallTask build() {
return this.callTaskJava;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@
*/
package io.serverlessworkflow.fluent.func;

import io.serverlessworkflow.api.types.CallTask;
import io.serverlessworkflow.api.types.ForTaskConfiguration;
import io.serverlessworkflow.api.types.Task;
import io.serverlessworkflow.api.types.TaskItem;
import io.serverlessworkflow.api.types.func.CallJava;
import io.serverlessworkflow.api.types.func.CallTaskJava;
import io.serverlessworkflow.api.types.func.ForTaskFunction;
import io.serverlessworkflow.api.types.func.LoopFunction;
import io.serverlessworkflow.api.types.func.LoopPredicate;
Expand Down Expand Up @@ -84,9 +84,10 @@ public <T, V, R> FuncForTaskBuilder tasks(String name, LoopFunction<T, V, R> fun
name,
new Task()
.withCallTask(
new CallTaskJava(
CallJava.loopFunction(
function, this.forTaskFunction.getFor().getEach())))));
new CallTask()
.withCallFunction(
CallJava.loopFunction(
function, this.forTaskFunction.getFor().getEach())))));
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
*/
package io.serverlessworkflow.fluent.func;

import io.serverlessworkflow.api.types.CallTask;
import io.serverlessworkflow.api.types.Task;
import io.serverlessworkflow.api.types.TaskItem;
import io.serverlessworkflow.api.types.func.CallJava;
import io.serverlessworkflow.api.types.func.CallTaskJava;
import io.serverlessworkflow.fluent.func.spi.ConditionalTaskBuilder;
import io.serverlessworkflow.fluent.func.spi.FuncTaskTransformations;
import io.serverlessworkflow.fluent.spec.AbstractForkTaskBuilder;
Expand Down Expand Up @@ -61,7 +61,8 @@ public <T, V> FuncForkTaskBuilder branch(
this.defaultBranchName(name, this.currentOffset()),
new Task()
.withCallTask(
new CallTaskJava(CallJava.function(function, argParam, returnClass)))));
new CallTask()
.withCallFunction(CallJava.function(function, argParam, returnClass)))));
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.serverlessworkflow.fluent.spec.WaitTaskBuilder;
import io.serverlessworkflow.fluent.spec.WorkflowTaskBuilder;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class FuncTaskItemListBuilder extends BaseTaskItemListBuilder<FuncTaskItemListBuilder>
Expand Down Expand Up @@ -77,6 +78,10 @@ public FuncTaskItemListBuilder set(String name, String expr) {
return this.set(name, s -> s.expr(expr));
}

public FuncTaskItemListBuilder set(String name, Map<String, Object> map) {
return this.set(name, s -> s.expr(map));
}

@Override
public FuncTaskItemListBuilder emit(String name, Consumer<FuncEmitTaskBuilder> itemsConfigurer) {
name = this.defaultNameAndRequireConfig(name, itemsConfigurer, TYPE_EMIT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1599,7 +1599,7 @@ public static <T, V> FuncTaskConfigurer forEach(
}

public static <T, V> FuncTaskConfigurer forEachItem(
SerializableFunction<T, Collection<V>> collection, Function<V, ?> function) {
SerializableFunction<T, Collection<V>> collection, SerializableFunction<V, ?> function) {
return forEachItem(null, collection, function);
}

Expand Down
36 changes: 36 additions & 0 deletions experimental/fluent/jackson/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>io.serverlessworkflow</groupId>
<artifactId>serverlessworkflow-experimental-fluent</artifactId>
<version>8.0.0-SNAPSHOT</version>
</parent>
<artifactId>serverlessworkflow-experimental-fluent-serialization-jackson</artifactId>
<name>Serverless Workflow :: Experimental :: Fluent :: Serialization:: Jackson</name>
<dependencies>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-yaml</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jdk8</artifactId>
</dependency>
<dependency>
<groupId>io.serverlessworkflow</groupId>
<artifactId>serverlessworkflow-experimental-types</artifactId>
</dependency>
<dependency>
<groupId>io.serverlessworkflow</groupId>
<artifactId>serverlessworkflow-serialization</artifactId>
</dependency>
<dependency>
<groupId>io.serverlessworkflow</groupId>
<artifactId>serverlessworkflow-impl-json</artifactId>
</dependency>
<dependency>
<groupId>io.serverlessworkflow</groupId>
<artifactId>serverlessworkflow-api</artifactId>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.fluent.func.serialization.jackson;

import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.deser.ResolvableDeserializer;
import io.serverlessworkflow.api.types.CallFunction;
import io.serverlessworkflow.api.types.FunctionArguments;
import io.serverlessworkflow.api.types.func.CallJava;
import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.util.Map;

public class CallJavaDeserializer<T extends CallFunction> extends JsonDeserializer<T>
implements ResolvableDeserializer {

private JsonDeserializer<?> defaultDeserializer;

public CallJavaDeserializer(JsonDeserializer<?> defaultDeserializer) {
this.defaultDeserializer = defaultDeserializer;
}

@Override
public T deserialize(JsonParser p, DeserializationContext ctxt) throws IOException {
CallFunction original = (CallFunction) defaultDeserializer.deserialize(p, ctxt);
if (original.getCall().equals(CallJava.JAVA_CALL_KEY)) {
FunctionArguments args = original.getWith();
if (args != null) {
Map<String, Object> props = args.getAdditionalProperties();
try {
return (T)
CallJava.fromFunctionProperties(
props,
unmarshall((ObjectMapper) p.getCodec(), props.get(CallJava.FUNCTION_NAME_KEY)));
} catch (ReflectiveOperationException e) {
throw new IOException("Error unmarshalling java call with args " + args, e);
}
}
}
return (T) original;
}

private SerializedLambda unmarshall(ObjectMapper mapper, Object funcNode) throws IOException {
if (funcNode instanceof Map) {
return mapper.convertValue(funcNode, SerializedLambda.class);
} else {
throw new IOException(
"Expecting function object to be a Map, but it was "
+ funcNode
+ " Please check if you are using serializable lambdas in your workflow definition");
}
}

@Override
public void resolve(DeserializationContext ctxt) throws JsonMappingException {
((ResolvableDeserializer) defaultDeserializer).resolve(ctxt);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.fluent.func.serialization.jackson;

import com.fasterxml.jackson.databind.BeanDescription;
import com.fasterxml.jackson.databind.DeserializationConfig;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.SerializationConfig;
import com.fasterxml.jackson.databind.deser.BeanDeserializerModifier;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.databind.ser.BeanPropertyWriter;
import com.fasterxml.jackson.databind.ser.BeanSerializerModifier;
import io.serverlessworkflow.api.reflection.func.SerializableConsumer;
import io.serverlessworkflow.api.reflection.func.SerializableFunction;
import io.serverlessworkflow.api.reflection.func.SerializablePredicate;
import io.serverlessworkflow.api.types.CallFunction;
import io.serverlessworkflow.api.types.func.ContextFunction;
import io.serverlessworkflow.api.types.func.FilterFunction;
import io.serverlessworkflow.api.types.func.LoopFunction;
import io.serverlessworkflow.api.types.func.LoopFunctionIndex;
import io.serverlessworkflow.api.types.func.LoopPredicate;
import io.serverlessworkflow.api.types.func.LoopPredicateIndex;
import io.serverlessworkflow.api.types.func.LoopPredicateIndexContext;
import io.serverlessworkflow.api.types.func.LoopPredicateIndexFilter;
import java.lang.invoke.SerializedLambda;
import java.util.List;

public class FuncJacksonModule extends SimpleModule {

private static final long serialVersionUID = 1L;

public void setupModule(com.fasterxml.jackson.databind.Module.SetupContext context) {
SerializableFunctionSerializer serializer = new SerializableFunctionSerializer();
super.addSerializer(SerializableFunction.class, serializer);
super.addSerializer(SerializablePredicate.class, serializer);
super.addSerializer(SerializableConsumer.class, serializer);
super.addSerializer(ContextFunction.class, serializer);
super.addSerializer(FilterFunction.class, serializer);
super.addSerializer(LoopFunction.class, serializer);
super.addSerializer(LoopFunctionIndex.class, serializer);
super.addSerializer(LoopPredicate.class, serializer);
super.addSerializer(LoopPredicateIndex.class, serializer);
super.addSerializer(LoopPredicateIndexContext.class, serializer);
super.addSerializer(LoopPredicateIndexFilter.class, serializer);
super.addDeserializer(SerializedLambda.class, new SerializedLambdaDeserializer());
super.setDeserializerModifier(
new BeanDeserializerModifier() {
@Override
public JsonDeserializer<?> modifyDeserializer(
DeserializationConfig config,
BeanDescription beanDesc,
JsonDeserializer<?> deserializer) {
if (beanDesc.getBeanClass().equals(CallFunction.class)) {
return new CallJavaDeserializer<>(deserializer);
}
return deserializer;
}
});
super.setSerializerModifier(
new BeanSerializerModifier() {
@Override
public List<BeanPropertyWriter> changeProperties(
SerializationConfig config,
BeanDescription beanDesc,
List<BeanPropertyWriter> beanProperties) {
if (beanDesc.getBeanClass().equals(SerializedLambda.class)) {
beanProperties.add(new SerializedLambdaWriter(beanProperties.get(0)));
}

return beanProperties;
}
});
super.setupModule(context);
}
}
Loading
Loading