From ae0d727c222e6e7a60b57907f53564046ec123e6 Mon Sep 17 00:00:00 2001 From: fjtirado Date: Tue, 30 Jun 2026 17:25:57 +0200 Subject: [PATCH 1/2] [Fix #1489] Serializing Java defined workflows Signed-off-by: fjtirado --- experimental/fluent/func/pom.xml | 31 ++++- .../fluent/func/FuncTaskItemListBuilder.java | 5 + .../fluent/func/FuncDSLSerializationTest.java | 115 ++++++++++++++++++ .../func/src/test/resources/logback_test.xml | 19 +++ experimental/fluent/jackson/pom.xml | 32 +++++ .../jackson/CallJavaFunctionDeserializer.java | 78 ++++++++++++ .../jackson/CallJavaFunctionSerializer.java | 46 +++++++ .../jackson/CallTaskFunctionSerializer.java | 28 +++++ .../jackson/FuncJacksonModule.java | 33 +++++ .../jackson/FuncUnionCustomizer.java | 31 +++++ .../com.fasterxml.jackson.databind.Module | 1 + ...lessworkflow.serialization.UnionCustomizer | 1 + experimental/fluent/pom.xml | 1 + experimental/pom.xml | 5 + .../api/reflection/func/ReflectionUtils.java | 57 +++++++-- .../api/types/func/CallTaskJava.java | 8 ++ .../fluent/spec/BaseWorkflowBuilder.java | 2 +- .../fluent/spec/WorkflowBuilderTest.java | 2 +- .../serialization/DeserializeHelper.java | 51 ++++++-- .../serialization/UnionCustomizer.java | 23 ++++ 20 files changed, 544 insertions(+), 25 deletions(-) create mode 100644 experimental/fluent/func/src/test/java/io/serverlessworkflow/fluent/func/FuncDSLSerializationTest.java create mode 100644 experimental/fluent/func/src/test/resources/logback_test.xml create mode 100644 experimental/fluent/jackson/pom.xml create mode 100644 experimental/fluent/jackson/src/main/java/io/serverlessworkflow/fluent/func/serialization/jackson/CallJavaFunctionDeserializer.java create mode 100644 experimental/fluent/jackson/src/main/java/io/serverlessworkflow/fluent/func/serialization/jackson/CallJavaFunctionSerializer.java create mode 100644 experimental/fluent/jackson/src/main/java/io/serverlessworkflow/fluent/func/serialization/jackson/CallTaskFunctionSerializer.java create mode 100644 experimental/fluent/jackson/src/main/java/io/serverlessworkflow/fluent/func/serialization/jackson/FuncJacksonModule.java create mode 100644 experimental/fluent/jackson/src/main/java/io/serverlessworkflow/fluent/func/serialization/jackson/FuncUnionCustomizer.java create mode 100644 experimental/fluent/jackson/src/main/resources/META-INF/services/com.fasterxml.jackson.databind.Module create mode 100644 experimental/fluent/jackson/src/main/resources/META-INF/services/io.serverlessworkflow.serialization.UnionCustomizer create mode 100644 serialization/src/main/java/io/serverlessworkflow/serialization/UnionCustomizer.java diff --git a/experimental/fluent/func/pom.xml b/experimental/fluent/func/pom.xml index dcf2aa96f..f405a8ab4 100644 --- a/experimental/fluent/func/pom.xml +++ b/experimental/fluent/func/pom.xml @@ -33,17 +33,42 @@ io.cloudevents cloudevents-core - org.junit.jupiter - junit-jupiter-api + junit-jupiter-engine + test + + + org.junit.jupiter + junit-jupiter-params + test + + + org.assertj + assertj-core test org.mockito mockito-core - ${version.org.mockito} test + + io.serverlessworkflow + serverlessworkflow-api + test + + + ch.qos.logback + logback-classic + test + + + + io.serverlessworkflow + serverlessworkflow-experimental-fluent-serialization-jackson + test + + \ No newline at end of file diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncTaskItemListBuilder.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncTaskItemListBuilder.java index 257a1f96b..302d2b89e 100644 --- a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncTaskItemListBuilder.java +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncTaskItemListBuilder.java @@ -26,6 +26,7 @@ import io.serverlessworkflow.fluent.spec.WaitTaskBuilder; import io.serverlessworkflow.fluent.spec.WorkflowTaskBuilder; import java.util.List; +import java.util.Map; import java.util.function.Consumer; public class FuncTaskItemListBuilder extends BaseTaskItemListBuilder @@ -77,6 +78,10 @@ public FuncTaskItemListBuilder set(String name, String expr) { return this.set(name, s -> s.expr(expr)); } + public FuncTaskItemListBuilder set(String name, Map map) { + return this.set(name, s -> s.expr(map)); + } + @Override public FuncTaskItemListBuilder emit(String name, Consumer itemsConfigurer) { name = this.defaultNameAndRequireConfig(name, itemsConfigurer, TYPE_EMIT); diff --git a/experimental/fluent/func/src/test/java/io/serverlessworkflow/fluent/func/FuncDSLSerializationTest.java b/experimental/fluent/func/src/test/java/io/serverlessworkflow/fluent/func/FuncDSLSerializationTest.java new file mode 100644 index 000000000..e414eeaaa --- /dev/null +++ b/experimental/fluent/func/src/test/java/io/serverlessworkflow/fluent/func/FuncDSLSerializationTest.java @@ -0,0 +1,115 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.func; + +import static io.serverlessworkflow.api.WorkflowReader.readWorkflow; +import static io.serverlessworkflow.api.WorkflowWriter.workflowAsBytes; +import static io.serverlessworkflow.api.WorkflowWriter.workflowAsString; +import static io.serverlessworkflow.api.WorkflowWriter.writeWorkflow; +import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.function; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +import io.serverlessworkflow.api.WorkflowFormat; +import io.serverlessworkflow.api.types.Workflow; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.Map; +import java.util.stream.Stream; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class FuncDSLSerializationTest { + + private static final Logger logger = LoggerFactory.getLogger(FuncDSLSerializationTest.class); + + @ParameterizedTest + @MethodSource("workflows") + public void testSpecFeaturesParsing(Workflow workflow) throws IOException { + assertWorkflow(workflow); + assertWorkflowEquals(workflow, writeAndReadInMemory(workflow)); + } + + static Stream workflows() { + return Stream.of( + FuncWorkflowBuilder.workflow("waitCompletable") + .tasks(function(FuncDSLSerializationTest::inc)) + .build(), + FuncWorkflowBuilder.workflow("hello") + .tasks(t -> t.set("sayHelloWorld", b -> b.expr(Map.of("result", "hello world!")))) + .build()); + } + + private static Workflow writeAndReadInMemory(Workflow workflow) throws IOException { + byte[] bytes; + try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { + writeWorkflow(out, workflow, WorkflowFormat.JSON); + bytes = out.toByteArray(); + } + logger.debug("Serialized json is " + new String(bytes)); + try (ByteArrayInputStream in = new ByteArrayInputStream(bytes)) { + return readWorkflow(in, WorkflowFormat.JSON); + } + } + + private static void assertWorkflow(Workflow workflow) { + assertNotNull(workflow); + assertNotNull(workflow.getDocument()); + assertNotNull(workflow.getDo()); + } + + private static void assertWorkflowEquals(Workflow workflow, Workflow other) throws IOException { + assertThat(workflowAsString(workflow, WorkflowFormat.YAML)) + .isEqualTo(workflowAsString(other, WorkflowFormat.YAML)); + assertThat(workflowAsBytes(workflow, WorkflowFormat.JSON)) + .isEqualTo(workflowAsBytes(other, WorkflowFormat.JSON)); + } + + private static Integer inc(Integer quantity) { + return quantity++; + } + + // private static class MyFunction implements SerializableFunction { + // + // private static final long serialVersionUID = 1L; + // + // @Override + // public Integer apply(Integer arg0) { + // return arg0++; + // } + // } + // + // public static void main(String[] args) throws JsonProcessingException { + // System.out.println( + // WorkflowFormat.JSON + // .mapper() + // .writeValueAsString( + // new CallJava.CallJavaFunction( + // FuncDSLSerializationTest::inc, + // Optional.of(Integer.class), + // Optional.of(Integer.class)))); + // + // System.out.println( + // WorkflowFormat.JSON + // .mapper() + // .writeValueAsString( + // new CallJava.CallJavaFunction( + // new MyFunction(), Optional.of(Integer.class), Optional.of(Integer.class)))); + // } +} diff --git a/experimental/fluent/func/src/test/resources/logback_test.xml b/experimental/fluent/func/src/test/resources/logback_test.xml new file mode 100644 index 000000000..c3b6bcb5b --- /dev/null +++ b/experimental/fluent/func/src/test/resources/logback_test.xml @@ -0,0 +1,19 @@ + + + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + + + + + + + \ No newline at end of file diff --git a/experimental/fluent/jackson/pom.xml b/experimental/fluent/jackson/pom.xml new file mode 100644 index 000000000..363d1d99b --- /dev/null +++ b/experimental/fluent/jackson/pom.xml @@ -0,0 +1,32 @@ + + 4.0.0 + + io.serverlessworkflow + serverlessworkflow-experimental-fluent + 8.0.0-SNAPSHOT + + serverlessworkflow-experimental-fluent-serialization-jackson + Serverless Workflow :: Experimental :: Fluent :: Serialization:: Jackson + + + com.fasterxml.jackson.dataformat + jackson-dataformat-yaml + + + io.serverlessworkflow + serverlessworkflow-experimental-types + + + io.serverlessworkflow + serverlessworkflow-serialization + + + io.serverlessworkflow + serverlessworkflow-impl-json + + + io.serverlessworkflow + serverlessworkflow-api + + + \ No newline at end of file diff --git a/experimental/fluent/jackson/src/main/java/io/serverlessworkflow/fluent/func/serialization/jackson/CallJavaFunctionDeserializer.java b/experimental/fluent/jackson/src/main/java/io/serverlessworkflow/fluent/func/serialization/jackson/CallJavaFunctionDeserializer.java new file mode 100644 index 000000000..a9d96bc86 --- /dev/null +++ b/experimental/fluent/jackson/src/main/java/io/serverlessworkflow/fluent/func/serialization/jackson/CallJavaFunctionDeserializer.java @@ -0,0 +1,78 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.func.serialization.jackson; + +import com.fasterxml.jackson.core.JsonParseException; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.TreeNode; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonDeserializer; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.fasterxml.jackson.databind.node.TextNode; +import io.serverlessworkflow.api.reflection.func.ReflectionUtils; +import io.serverlessworkflow.api.types.func.CallJava; +import io.serverlessworkflow.api.types.func.CallJava.CallJavaFunction; +import java.io.IOException; +import java.lang.invoke.SerializedLambda; +import java.lang.reflect.Method; +import java.util.Optional; +import java.util.function.Function; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CallJavaFunctionDeserializer extends JsonDeserializer { + + private static Logger logger = LoggerFactory.getLogger(CallJavaFunctionDeserializer.class); + + @Override + public CallJavaFunction deserialize(JsonParser p, DeserializationContext ctxt) + throws IOException { + + TreeNode content = p.readValueAsTree(); + TreeNode callNode = content.get("call"); + if (!(callNode instanceof TextNode textNode) || !textNode.asText().equals("Java")) { + throw new JsonParseException( + "Expecting a call property which value shoud be Java but instead it was " + callNode); + } + TreeNode functionNode = content.get("function"); + if (functionNode == null) { + throw new JsonParseException("A CallJava function should have a function property"); + } + if (functionNode instanceof ObjectNode objectNode) { + SerializedLambda serializedLambda = ctxt.readTreeAsValue(objectNode, SerializedLambda.class); + try { + Method readResolve = SerializedLambda.class.getDeclaredMethod("readResolve"); + readResolve.setAccessible(true); + + return new CallJavaFunction( + (Function) readResolve.invoke(serializedLambda), + Optional.of(ReflectionUtils.inputType(serializedLambda)), + Optional.of(ReflectionUtils.outputType(serializedLambda))); + } catch (ReflectiveOperationException e) { + throw new IOException( + "Reflection exception while converting SerializedLamda " + + serializedLambda + + "into a funcion"); + } + + } else { + logger.error( + "function property: {} , does not contain enough information to be properly unmarshall, using an function that does nothing", + functionNode); + return new CallJavaFunction<>(v -> v, Optional.empty(), Optional.empty()); + } + } +} diff --git a/experimental/fluent/jackson/src/main/java/io/serverlessworkflow/fluent/func/serialization/jackson/CallJavaFunctionSerializer.java b/experimental/fluent/jackson/src/main/java/io/serverlessworkflow/fluent/func/serialization/jackson/CallJavaFunctionSerializer.java new file mode 100644 index 000000000..f6deb0f93 --- /dev/null +++ b/experimental/fluent/jackson/src/main/java/io/serverlessworkflow/fluent/func/serialization/jackson/CallJavaFunctionSerializer.java @@ -0,0 +1,46 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.func.serialization.jackson; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonSerializer; +import com.fasterxml.jackson.databind.SerializerProvider; +import io.serverlessworkflow.api.reflection.func.ReflectionUtils; +import io.serverlessworkflow.api.types.func.CallJava; +import io.serverlessworkflow.api.types.func.CallJava.CallJavaFunction; +import java.io.IOException; +import java.lang.invoke.SerializedLambda; +import java.util.Optional; + +public class CallJavaFunctionSerializer extends JsonSerializer { + + @Override + public void serialize(CallJavaFunction value, JsonGenerator gen, SerializerProvider serializers) + throws IOException { + gen.writeStartObject(); + gen.writeStringField("call", "Java"); + gen.writeObjectFieldStart("with"); + Optional serializedLambda = + ReflectionUtils.getSerializedLambda(value.function()); + if (serializedLambda.isPresent()) { + gen.writeObjectField("function", serializedLambda.orElse(null)); + } else { + gen.writeStringField("function", value.function().toString()); + } + gen.writeEndObject(); + gen.writeEndObject(); + } +} diff --git a/experimental/fluent/jackson/src/main/java/io/serverlessworkflow/fluent/func/serialization/jackson/CallTaskFunctionSerializer.java b/experimental/fluent/jackson/src/main/java/io/serverlessworkflow/fluent/func/serialization/jackson/CallTaskFunctionSerializer.java new file mode 100644 index 000000000..88904b46d --- /dev/null +++ b/experimental/fluent/jackson/src/main/java/io/serverlessworkflow/fluent/func/serialization/jackson/CallTaskFunctionSerializer.java @@ -0,0 +1,28 @@ +package io.serverlessworkflow.fluent.func.serialization.jackson; + +import java.io.IOException; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonSerializer; +import com.fasterxml.jackson.databind.SerializerProvider; + +import io.serverlessworkflow.api.types.CallTask; +import io.serverlessworkflow.api.types.func.CallJava; +import io.serverlessworkflow.api.types.func.CallTaskJava; +import io.serverlessworkflow.api.types.jackson.CallTaskSerializer; + +public class CallTaskFunctionSerializer extends CallTaskSerializer { + + @Override + public void serialize(CallTask value, JsonGenerator gen, SerializerProvider serializers) throws IOException { + + if (value instanceof CallTaskJava javaCall) { + + javaCall.getCallJava(); + + } else { + super.serialize(value, gen, serializers); + } + } + +} diff --git a/experimental/fluent/jackson/src/main/java/io/serverlessworkflow/fluent/func/serialization/jackson/FuncJacksonModule.java b/experimental/fluent/jackson/src/main/java/io/serverlessworkflow/fluent/func/serialization/jackson/FuncJacksonModule.java new file mode 100644 index 000000000..63ea3ea9a --- /dev/null +++ b/experimental/fluent/jackson/src/main/java/io/serverlessworkflow/fluent/func/serialization/jackson/FuncJacksonModule.java @@ -0,0 +1,33 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.func.serialization.jackson; + +import com.fasterxml.jackson.databind.module.SimpleModule; + +import io.serverlessworkflow.api.types.CallTask; +import io.serverlessworkflow.api.types.func.CallJava; + +public class FuncJacksonModule extends SimpleModule { + + private static final long serialVersionUID = 1L; + + public void setupModule(com.fasterxml.jackson.databind.Module.SetupContext context) { + super.addSerializer(CallTask.class, new CallTaskFunctionSerializer()); +// super.addDeserializer(CallJava.CallJavaFunction.class, new CallJavaFunctionDeserializer()); + // super.addSerializer(CallJava.CallJavaFunction.class, new CallJavaFunctionSerializer()); + super.setupModule(context); + } +} diff --git a/experimental/fluent/jackson/src/main/java/io/serverlessworkflow/fluent/func/serialization/jackson/FuncUnionCustomizer.java b/experimental/fluent/jackson/src/main/java/io/serverlessworkflow/fluent/func/serialization/jackson/FuncUnionCustomizer.java new file mode 100644 index 000000000..2aacf5c18 --- /dev/null +++ b/experimental/fluent/jackson/src/main/java/io/serverlessworkflow/fluent/func/serialization/jackson/FuncUnionCustomizer.java @@ -0,0 +1,31 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.func.serialization.jackson; + +import io.serverlessworkflow.api.types.CallTask; +import io.serverlessworkflow.api.types.func.CallJava.CallJavaFunction; +import io.serverlessworkflow.serialization.UnionCustomizer; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +public class FuncUnionCustomizer implements UnionCustomizer { + + @Override + public Map, Collection>> additionalClasses() { + return Map.of(CallTask.class, List.of(CallJavaFunction.class)); + } +} diff --git a/experimental/fluent/jackson/src/main/resources/META-INF/services/com.fasterxml.jackson.databind.Module b/experimental/fluent/jackson/src/main/resources/META-INF/services/com.fasterxml.jackson.databind.Module new file mode 100644 index 000000000..3ae3675c6 --- /dev/null +++ b/experimental/fluent/jackson/src/main/resources/META-INF/services/com.fasterxml.jackson.databind.Module @@ -0,0 +1 @@ +io.serverlessworkflow.fluent.func.serialization.jackson.FuncJacksonModule \ No newline at end of file diff --git a/experimental/fluent/jackson/src/main/resources/META-INF/services/io.serverlessworkflow.serialization.UnionCustomizer b/experimental/fluent/jackson/src/main/resources/META-INF/services/io.serverlessworkflow.serialization.UnionCustomizer new file mode 100644 index 000000000..1f872175c --- /dev/null +++ b/experimental/fluent/jackson/src/main/resources/META-INF/services/io.serverlessworkflow.serialization.UnionCustomizer @@ -0,0 +1 @@ +io.serverlessworkflow.fluent.func.serialization.jackson.FuncUnionCustomizer \ No newline at end of file diff --git a/experimental/fluent/pom.xml b/experimental/fluent/pom.xml index 56bd2f5aa..15f289354 100644 --- a/experimental/fluent/pom.xml +++ b/experimental/fluent/pom.xml @@ -60,5 +60,6 @@ func + jackson \ No newline at end of file diff --git a/experimental/pom.xml b/experimental/pom.xml index 54dd0e206..1fda5a72a 100644 --- a/experimental/pom.xml +++ b/experimental/pom.xml @@ -41,6 +41,11 @@ serverlessworkflow-experimental-fluent-func ${project.version} + + io.serverlessworkflow + serverlessworkflow-experimental-fluent-serialization-jackson + ${project.version} + diff --git a/experimental/types/src/main/java/io/serverlessworkflow/api/reflection/func/ReflectionUtils.java b/experimental/types/src/main/java/io/serverlessworkflow/api/reflection/func/ReflectionUtils.java index 6b30fcdb2..2366b3716 100644 --- a/experimental/types/src/main/java/io/serverlessworkflow/api/reflection/func/ReflectionUtils.java +++ b/experimental/types/src/main/java/io/serverlessworkflow/api/reflection/func/ReflectionUtils.java @@ -19,8 +19,12 @@ import io.serverlessworkflow.api.types.func.FilterFunction; import java.lang.invoke.MethodType; import java.lang.invoke.SerializedLambda; +import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; +import java.util.Optional; import java.util.function.Function; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Specially used by {@link Function} parameters in the Java Function. @@ -29,6 +33,8 @@ */ public final class ReflectionUtils { + private static final Logger logger = LoggerFactory.getLogger(ReflectionUtils.class); + private ReflectionUtils() {} @SuppressWarnings("unchecked") @@ -68,7 +74,7 @@ public static Class inferInputType(SerializableConsumer fn) { @SuppressWarnings("unchecked") public static Class inferResultType(Object fn) { - return (Class) inferMethodType(fn).returnType(); + return (Class) inferOutputType(inferMethodType(fn)); } /** @@ -78,21 +84,52 @@ public static Class inferResultType(Object fn) { * @param lambdaParamIndex The index of the payload parameter in the interface's apply method */ public static Class inferInputTypeFromAny(Object fn, int lambdaParamIndex) { - return inferMethodType(fn).parameterArray()[lambdaParamIndex]; + return inferInputType(inferMethodType(fn), lambdaParamIndex); } - private static MethodType inferMethodType(Object fn) { + public static Optional getSerializedLambda(Object fn) { try { - Method m = fn.getClass().getDeclaredMethod("writeReplace"); - m.setAccessible(true); - SerializedLambda sl = (SerializedLambda) m.invoke(fn); + return Optional.of(serializedLambda(fn)); + } catch (ReflectiveOperationException ex) { + logger.debug("Error resolving serialized lambda for {}", fn, ex); + return Optional.empty(); + } + } - ClassLoader cl = fn.getClass().getClassLoader(); + private static SerializedLambda serializedLambda(Object fn) + throws NoSuchMethodException, IllegalAccessException, InvocationTargetException { + Method m = fn.getClass().getDeclaredMethod("writeReplace"); + m.setAccessible(true); + return (SerializedLambda) m.invoke(fn); + } + + public static Class outputType(SerializedLambda lambda) { + return inferOutputType(inferMethodType(lambda)); + } - // getInstantiatedMethodType() provides the exact generic signature resolved - // by the compiler, completely bypassing captured variables and method kind switches! + public static Class inputType(SerializedLambda lambda) { + return inferInputType(inferMethodType(lambda), 0); + } - return MethodType.fromMethodDescriptorString(sl.getInstantiatedMethodType(), cl); + private static Class inferInputType(MethodType type, int index) { + return type.parameterType(index); + } + + private static Class inferOutputType(MethodType type) { + return type.returnType(); + } + + private static MethodType inferMethodType(SerializedLambda sl) { + // getInstantiatedMethodType() provides the exact generic signature resolved + // by the compiler, completely bypassing captured variables and method kind switches! + return MethodType.fromMethodDescriptorString( + sl.getInstantiatedMethodType(), sl.getClass().getClassLoader()); + } + + private static MethodType inferMethodType(Object fn) { + try { + SerializedLambda sl = serializedLambda(fn); + return inferMethodType(sl); } catch (ReflectiveOperationException ex) { throw new IllegalStateException( "Cannot infer type from lambda. Pass Class or use a method reference.", ex); diff --git a/experimental/types/src/main/java/io/serverlessworkflow/api/types/func/CallTaskJava.java b/experimental/types/src/main/java/io/serverlessworkflow/api/types/func/CallTaskJava.java index cde6281f2..3f53f90f1 100644 --- a/experimental/types/src/main/java/io/serverlessworkflow/api/types/func/CallTaskJava.java +++ b/experimental/types/src/main/java/io/serverlessworkflow/api/types/func/CallTaskJava.java @@ -15,12 +15,15 @@ */ package io.serverlessworkflow.api.types.func; +import io.serverlessworkflow.annotations.OneOfSetter; import io.serverlessworkflow.api.types.CallTask; public class CallTaskJava extends CallTask { private CallJava callJava; + public CallTaskJava() {} + public CallTaskJava(CallJava callJava) { this.callJava = callJava; } @@ -29,6 +32,11 @@ public CallJava getCallJava() { return callJava; } + @OneOfSetter(CallJava.class) + public void setCallJava(CallJava callJava) { + this.callJava = callJava; + } + @Override public Object get() { return callJava != null ? callJava : super.get(); diff --git a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/BaseWorkflowBuilder.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/BaseWorkflowBuilder.java index 0d305d8a8..8d1f16be9 100644 --- a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/BaseWorkflowBuilder.java +++ b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/BaseWorkflowBuilder.java @@ -37,7 +37,7 @@ public abstract class BaseWorkflowBuilder< public static final String DSL = "1.0.0"; public static final String DEFAULT_VERSION = "0.0.1"; - public static final String DEFAULT_NAMESPACE = "org.acme"; + public static final String DEFAULT_NAMESPACE = "org-acme"; protected final Workflow workflow; private final Document document; diff --git a/fluent/spec/src/test/java/io/serverlessworkflow/fluent/spec/WorkflowBuilderTest.java b/fluent/spec/src/test/java/io/serverlessworkflow/fluent/spec/WorkflowBuilderTest.java index b316de7fe..c275762b8 100644 --- a/fluent/spec/src/test/java/io/serverlessworkflow/fluent/spec/WorkflowBuilderTest.java +++ b/fluent/spec/src/test/java/io/serverlessworkflow/fluent/spec/WorkflowBuilderTest.java @@ -76,7 +76,7 @@ void testWorkflowDocumentDefaults() { assertNotNull(wf, "Workflow should not be null"); Document doc = wf.getDocument(); assertNotNull(doc, "Document should not be null"); - assertEquals("org.acme", doc.getNamespace(), "Default namespace should be org.acme"); + assertEquals("org-acme", doc.getNamespace(), "Default namespace should be org.acme"); assertEquals("0.0.1", doc.getVersion(), "Default version should be 0.0.1"); assertEquals("1.0.0", doc.getDsl(), "DSL version should be set to 1.0.0"); assertNotNull(doc.getName(), "Name should be auto-generated"); diff --git a/serialization/src/main/java/io/serverlessworkflow/serialization/DeserializeHelper.java b/serialization/src/main/java/io/serverlessworkflow/serialization/DeserializeHelper.java index 041474d91..aa826f5f6 100644 --- a/serialization/src/main/java/io/serverlessworkflow/serialization/DeserializeHelper.java +++ b/serialization/src/main/java/io/serverlessworkflow/serialization/DeserializeHelper.java @@ -26,30 +26,44 @@ import java.lang.reflect.Method; import java.util.ArrayList; import java.util.Collection; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.ServiceLoader; +import java.util.ServiceLoader.Provider; +import java.util.stream.Collectors; public class DeserializeHelper { + private static Map, Collection>> additionalClasses = + ServiceLoader.load(UnionCustomizer.class).stream() + .map(Provider::get) + .map(UnionCustomizer::additionalClasses) + .flatMap(map -> map.entrySet().stream()) + .collect(Collectors.toMap(Entry::getKey, Entry::getValue)); + public static T deserializeOneOf( JsonParser p, Class targetClass, Collection> oneOfTypes) throws IOException { TreeNode node = p.readValueAsTree(); try { T result = targetClass.getDeclaredConstructor().newInstance(); - Collection exceptions = new ArrayList<>(); - for (Class oneOfType : oneOfTypes) { - try { - assingIt(p, result, node, targetClass, oneOfType); - break; - } catch (IOException | ConstraintViolationException | InvocationTargetException ex) { - exceptions.add(ex); - } + Collection> possibleTypes; + Collection> additional = additionalClasses.get(targetClass); + if (additional != null && !additional.isEmpty()) { + possibleTypes = new HashSet>(oneOfTypes); + possibleTypes.addAll(additional); + } else { + possibleTypes = oneOfTypes; } - if (exceptions.size() == oneOfTypes.size()) { + Collection exceptions = + deserializeOneOf(result, node, p, targetClass, possibleTypes); + if (exceptions.size() == possibleTypes.size()) { JsonMappingException ex = new JsonMappingException( p, String.format( "Error deserializing class %s, all oneOf alternatives %s has failed ", - targetClass, oneOfTypes)); + targetClass, possibleTypes)); exceptions.forEach(ex::addSuppressed); throw ex; } @@ -59,6 +73,23 @@ public static T deserializeOneOf( } } + private static Collection deserializeOneOf( + T result, TreeNode node, JsonParser p, Class targetClass, Collection> oneOfTypes) + throws ReflectiveOperationException { + Collection exceptions = new ArrayList<>(); + for (Class oneOfType : oneOfTypes) { + try { + assingIt(p, result, node, targetClass, oneOfType); + break; + } catch (IOException | ConstraintViolationException ex) { + exceptions.add(ex); + } catch (InvocationTargetException ex) { + exceptions.add(ex.getCause()); + } + } + return exceptions; + } + private static void assingIt( JsonParser p, T result, TreeNode node, Class targetClass, Class type) throws JsonProcessingException, ReflectiveOperationException { diff --git a/serialization/src/main/java/io/serverlessworkflow/serialization/UnionCustomizer.java b/serialization/src/main/java/io/serverlessworkflow/serialization/UnionCustomizer.java new file mode 100644 index 000000000..b9f9155fe --- /dev/null +++ b/serialization/src/main/java/io/serverlessworkflow/serialization/UnionCustomizer.java @@ -0,0 +1,23 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.serialization; + +import java.util.Collection; +import java.util.Map; + +public interface UnionCustomizer { + Map, Collection>> additionalClasses(); +} From 307fdd4cd53d8351349fea2a15fd3f6bc9194952 Mon Sep 17 00:00:00 2001 From: fjtirado Date: Wed, 1 Jul 2026 21:52:22 +0200 Subject: [PATCH 2/2] [Fix #1489] Alternative approach Signed-off-by: fjtirado --- .../api/WorkflowWriter.java | 8 +- experimental/fluent/func/pom.xml | 31 +---- .../fluent/func/FuncCallTaskBuilder.java | 34 +++--- .../fluent/func/FuncForTaskBuilder.java | 9 +- .../fluent/func/FuncForkTaskBuilder.java | 5 +- .../fluent/func/dsl/FuncDSL.java | 2 +- .../fluent/func/FuncDSLSerializationTest.java | 115 ------------------ .../func/src/test/resources/logback_test.xml | 19 --- experimental/fluent/jackson/pom.xml | 6 +- .../jackson/CallJavaDeserializer.java | 75 ++++++++++++ .../jackson/CallJavaFunctionDeserializer.java | 78 ------------ .../jackson/CallTaskFunctionSerializer.java | 28 ----- .../jackson/FuncJacksonModule.java | 67 +++++++++- .../jackson/FuncUnionCustomizer.java | 31 ----- ...va => SerializableFunctionSerializer.java} | 18 +-- .../jackson/SerializedLambdaDeserializer.java | 95 +++++++++++++++ .../jackson/SerializedLambdaWriter.java | 56 +++++++++ ...lessworkflow.serialization.UnionCustomizer | 1 - .../func/JavaForExecutorBuilder.java | 8 +- .../func/CallJavaContextFunctionTest.java | 5 +- .../impl/executors/func/CallTest.java | 37 +++--- .../func/ForTaskFunctionRegressionTest.java | 59 +-------- experimental/test/pom.xml | 20 +++ .../fluent/test/ForEachFuncTest.java | 11 +- .../fluent/test/FuncDSLSerializationTest.java | 107 ++++++++++++++++ .../fluent/test/TestSerializationUtils.java | 48 ++++++++ .../test/src/test/resources/logback.xml | 15 +++ .../api/reflection/func/ReflectionUtils.java | 29 +++-- .../api/types/func/CallJava.java | 56 ++++++++- .../api/types/func/CallTaskJava.java | 44 ------- .../api/types/func/FilterPredicate.java | 3 +- .../api/types/func/ForTaskFunction.java | 28 +---- .../api/types/func/LoopFunction.java | 3 +- .../api/types/func/LoopFunctionIndex.java | 4 +- .../api/types/func/LoopPredicate.java | 3 +- .../api/types/func/LoopPredicateIndex.java | 4 +- .../types/func/LoopPredicateIndexContext.java | 3 +- .../types/func/LoopPredicateIndexFilter.java | 3 +- .../serialization/DeserializeHelper.java | 51 ++------ .../serialization/UnionCustomizer.java | 23 ---- 40 files changed, 667 insertions(+), 575 deletions(-) delete mode 100644 experimental/fluent/func/src/test/java/io/serverlessworkflow/fluent/func/FuncDSLSerializationTest.java delete mode 100644 experimental/fluent/func/src/test/resources/logback_test.xml create mode 100644 experimental/fluent/jackson/src/main/java/io/serverlessworkflow/fluent/func/serialization/jackson/CallJavaDeserializer.java delete mode 100644 experimental/fluent/jackson/src/main/java/io/serverlessworkflow/fluent/func/serialization/jackson/CallJavaFunctionDeserializer.java delete mode 100644 experimental/fluent/jackson/src/main/java/io/serverlessworkflow/fluent/func/serialization/jackson/CallTaskFunctionSerializer.java delete mode 100644 experimental/fluent/jackson/src/main/java/io/serverlessworkflow/fluent/func/serialization/jackson/FuncUnionCustomizer.java rename experimental/fluent/jackson/src/main/java/io/serverlessworkflow/fluent/func/serialization/jackson/{CallJavaFunctionSerializer.java => SerializableFunctionSerializer.java} (60%) create mode 100644 experimental/fluent/jackson/src/main/java/io/serverlessworkflow/fluent/func/serialization/jackson/SerializedLambdaDeserializer.java create mode 100644 experimental/fluent/jackson/src/main/java/io/serverlessworkflow/fluent/func/serialization/jackson/SerializedLambdaWriter.java delete mode 100644 experimental/fluent/jackson/src/main/resources/META-INF/services/io.serverlessworkflow.serialization.UnionCustomizer create mode 100644 experimental/test/src/test/java/io/serverlessworkflow/fluent/test/FuncDSLSerializationTest.java create mode 100644 experimental/test/src/test/java/io/serverlessworkflow/fluent/test/TestSerializationUtils.java create mode 100644 experimental/test/src/test/resources/logback.xml delete mode 100644 experimental/types/src/main/java/io/serverlessworkflow/api/types/func/CallTaskJava.java delete mode 100644 serialization/src/main/java/io/serverlessworkflow/serialization/UnionCustomizer.java diff --git a/api/src/main/java/io/serverlessworkflow/api/WorkflowWriter.java b/api/src/main/java/io/serverlessworkflow/api/WorkflowWriter.java index 3285cff43..7af3d0290 100644 --- a/api/src/main/java/io/serverlessworkflow/api/WorkflowWriter.java +++ b/api/src/main/java/io/serverlessworkflow/api/WorkflowWriter.java @@ -22,6 +22,8 @@ import java.io.Writer; import java.nio.file.Files; import java.nio.file.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Utility class for writing Serverless Workflow definitions to various outputs and formats. @@ -32,6 +34,8 @@ */ public class WorkflowWriter { + private static final Logger logger = LoggerFactory.getLogger(WorkflowWriter.class); + /** * Writes a {@link Workflow} to the given {@link OutputStream} in the specified format. * @@ -95,7 +99,9 @@ public static void writeWorkflow(Path output, Workflow workflow, WorkflowFormat */ public static String workflowAsString(Workflow workflow, WorkflowFormat format) throws IOException { - return format.mapper().writeValueAsString(workflow); + String result = format.mapper().writeValueAsString(workflow); + logger.trace("Workflow is {}", result); + return result; } /** diff --git a/experimental/fluent/func/pom.xml b/experimental/fluent/func/pom.xml index f405a8ab4..dcf2aa96f 100644 --- a/experimental/fluent/func/pom.xml +++ b/experimental/fluent/func/pom.xml @@ -33,42 +33,17 @@ io.cloudevents cloudevents-core + org.junit.jupiter - junit-jupiter-engine - test - - - org.junit.jupiter - junit-jupiter-params - test - - - org.assertj - assertj-core + junit-jupiter-api test org.mockito mockito-core + ${version.org.mockito} test - - io.serverlessworkflow - serverlessworkflow-api - test - - - ch.qos.logback - logback-classic - test - - - - io.serverlessworkflow - serverlessworkflow-experimental-fluent-serialization-jackson - test - - \ No newline at end of file diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncCallTaskBuilder.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncCallTaskBuilder.java index 31bf37b46..3e92be04a 100644 --- a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncCallTaskBuilder.java +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncCallTaskBuilder.java @@ -15,8 +15,8 @@ */ package io.serverlessworkflow.fluent.func; +import io.serverlessworkflow.api.types.CallTask; import io.serverlessworkflow.api.types.func.CallJava; -import io.serverlessworkflow.api.types.func.CallTaskJava; import io.serverlessworkflow.api.types.func.ContextFunction; import io.serverlessworkflow.api.types.func.FilterFunction; import io.serverlessworkflow.fluent.func.spi.ConditionalTaskBuilder; @@ -29,12 +29,9 @@ public class FuncCallTaskBuilder extends TaskBaseBuilder implements FuncTaskTransformations, ConditionalTaskBuilder { - private CallTaskJava callTaskJava; + private CallTask callTaskJava; - FuncCallTaskBuilder() { - callTaskJava = new CallTaskJava(new CallJava() {}); - super.setTask(callTaskJava.getCallJava()); - } + FuncCallTaskBuilder() {} @Override protected FuncCallTaskBuilder self() { @@ -51,8 +48,9 @@ public FuncCallTaskBuilder function(Function function, Class arg public FuncCallTaskBuilder function( Function function, Class argClass, Class returnClass) { - this.callTaskJava = new CallTaskJava(CallJava.function(function, argClass, returnClass)); - super.setTask(this.callTaskJava.getCallJava()); + this.callTaskJava = + new CallTask().withCallFunction(CallJava.function(function, argClass, returnClass)); + super.setTask(this.callTaskJava.getCallFunction()); return this; } @@ -66,8 +64,9 @@ public FuncCallTaskBuilder function(ContextFunction function, Class public FuncCallTaskBuilder function( ContextFunction function, Class argClass, Class returnClass) { - this.callTaskJava = new CallTaskJava(CallJava.function(function, argClass, returnClass)); - super.setTask(this.callTaskJava.getCallJava()); + this.callTaskJava = + new CallTask().withCallFunction(CallJava.function(function, argClass, returnClass)); + super.setTask(this.callTaskJava.getCallFunction()); return this; } @@ -81,26 +80,27 @@ public FuncCallTaskBuilder function(FilterFunction function, Class< public FuncCallTaskBuilder function( FilterFunction function, Class argClass, Class outputClass) { - this.callTaskJava = new CallTaskJava(CallJava.function(function, argClass, outputClass)); - super.setTask(this.callTaskJava.getCallJava()); + this.callTaskJava = + new CallTask().withCallFunction(CallJava.function(function, argClass, outputClass)); + super.setTask(this.callTaskJava.getCallFunction()); return this; } /** Accept a side-effect Consumer; engine should pass input through unchanged. */ public FuncCallTaskBuilder consumer(Consumer consumer) { - this.callTaskJava = new CallTaskJava(CallJava.consumer(consumer)); - super.setTask(this.callTaskJava.getCallJava()); + this.callTaskJava = new CallTask().withCallFunction(CallJava.consumer(consumer)); + super.setTask(this.callTaskJava.getCallFunction()); return this; } /** Accept a Consumer with explicit input type hint. */ public FuncCallTaskBuilder consumer(Consumer consumer, Class argClass) { - this.callTaskJava = new CallTaskJava(CallJava.consumer(consumer, argClass)); - super.setTask(this.callTaskJava.getCallJava()); + this.callTaskJava = new CallTask().withCallFunction(CallJava.consumer(consumer, argClass)); + super.setTask(this.callTaskJava.getCallFunction()); return this; } - public CallTaskJava build() { + public CallTask build() { return this.callTaskJava; } } diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncForTaskBuilder.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncForTaskBuilder.java index 3e0a59d60..6a5d62e32 100644 --- a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncForTaskBuilder.java +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncForTaskBuilder.java @@ -15,11 +15,11 @@ */ package io.serverlessworkflow.fluent.func; +import io.serverlessworkflow.api.types.CallTask; import io.serverlessworkflow.api.types.ForTaskConfiguration; import io.serverlessworkflow.api.types.Task; import io.serverlessworkflow.api.types.TaskItem; import io.serverlessworkflow.api.types.func.CallJava; -import io.serverlessworkflow.api.types.func.CallTaskJava; import io.serverlessworkflow.api.types.func.ForTaskFunction; import io.serverlessworkflow.api.types.func.LoopFunction; import io.serverlessworkflow.api.types.func.LoopPredicate; @@ -84,9 +84,10 @@ public FuncForTaskBuilder tasks(String name, LoopFunction fun name, new Task() .withCallTask( - new CallTaskJava( - CallJava.loopFunction( - function, this.forTaskFunction.getFor().getEach()))))); + new CallTask() + .withCallFunction( + CallJava.loopFunction( + function, this.forTaskFunction.getFor().getEach()))))); return this; } diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncForkTaskBuilder.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncForkTaskBuilder.java index f7f87e62d..9d43116ce 100644 --- a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncForkTaskBuilder.java +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncForkTaskBuilder.java @@ -15,10 +15,10 @@ */ package io.serverlessworkflow.fluent.func; +import io.serverlessworkflow.api.types.CallTask; import io.serverlessworkflow.api.types.Task; import io.serverlessworkflow.api.types.TaskItem; import io.serverlessworkflow.api.types.func.CallJava; -import io.serverlessworkflow.api.types.func.CallTaskJava; import io.serverlessworkflow.fluent.func.spi.ConditionalTaskBuilder; import io.serverlessworkflow.fluent.func.spi.FuncTaskTransformations; import io.serverlessworkflow.fluent.spec.AbstractForkTaskBuilder; @@ -61,7 +61,8 @@ public FuncForkTaskBuilder branch( this.defaultBranchName(name, this.currentOffset()), new Task() .withCallTask( - new CallTaskJava(CallJava.function(function, argParam, returnClass))))); + new CallTask() + .withCallFunction(CallJava.function(function, argParam, returnClass))))); return this; } diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncDSL.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncDSL.java index bcf3f20fb..6a552be05 100644 --- a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncDSL.java +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncDSL.java @@ -1599,7 +1599,7 @@ public static FuncTaskConfigurer forEach( } public static FuncTaskConfigurer forEachItem( - SerializableFunction> collection, Function function) { + SerializableFunction> collection, SerializableFunction function) { return forEachItem(null, collection, function); } diff --git a/experimental/fluent/func/src/test/java/io/serverlessworkflow/fluent/func/FuncDSLSerializationTest.java b/experimental/fluent/func/src/test/java/io/serverlessworkflow/fluent/func/FuncDSLSerializationTest.java deleted file mode 100644 index e414eeaaa..000000000 --- a/experimental/fluent/func/src/test/java/io/serverlessworkflow/fluent/func/FuncDSLSerializationTest.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Copyright 2020-Present The Serverless Workflow Specification Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.serverlessworkflow.fluent.func; - -import static io.serverlessworkflow.api.WorkflowReader.readWorkflow; -import static io.serverlessworkflow.api.WorkflowWriter.workflowAsBytes; -import static io.serverlessworkflow.api.WorkflowWriter.workflowAsString; -import static io.serverlessworkflow.api.WorkflowWriter.writeWorkflow; -import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.function; -import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.jupiter.api.Assertions.assertNotNull; - -import io.serverlessworkflow.api.WorkflowFormat; -import io.serverlessworkflow.api.types.Workflow; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.util.Map; -import java.util.stream.Stream; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.MethodSource; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class FuncDSLSerializationTest { - - private static final Logger logger = LoggerFactory.getLogger(FuncDSLSerializationTest.class); - - @ParameterizedTest - @MethodSource("workflows") - public void testSpecFeaturesParsing(Workflow workflow) throws IOException { - assertWorkflow(workflow); - assertWorkflowEquals(workflow, writeAndReadInMemory(workflow)); - } - - static Stream workflows() { - return Stream.of( - FuncWorkflowBuilder.workflow("waitCompletable") - .tasks(function(FuncDSLSerializationTest::inc)) - .build(), - FuncWorkflowBuilder.workflow("hello") - .tasks(t -> t.set("sayHelloWorld", b -> b.expr(Map.of("result", "hello world!")))) - .build()); - } - - private static Workflow writeAndReadInMemory(Workflow workflow) throws IOException { - byte[] bytes; - try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { - writeWorkflow(out, workflow, WorkflowFormat.JSON); - bytes = out.toByteArray(); - } - logger.debug("Serialized json is " + new String(bytes)); - try (ByteArrayInputStream in = new ByteArrayInputStream(bytes)) { - return readWorkflow(in, WorkflowFormat.JSON); - } - } - - private static void assertWorkflow(Workflow workflow) { - assertNotNull(workflow); - assertNotNull(workflow.getDocument()); - assertNotNull(workflow.getDo()); - } - - private static void assertWorkflowEquals(Workflow workflow, Workflow other) throws IOException { - assertThat(workflowAsString(workflow, WorkflowFormat.YAML)) - .isEqualTo(workflowAsString(other, WorkflowFormat.YAML)); - assertThat(workflowAsBytes(workflow, WorkflowFormat.JSON)) - .isEqualTo(workflowAsBytes(other, WorkflowFormat.JSON)); - } - - private static Integer inc(Integer quantity) { - return quantity++; - } - - // private static class MyFunction implements SerializableFunction { - // - // private static final long serialVersionUID = 1L; - // - // @Override - // public Integer apply(Integer arg0) { - // return arg0++; - // } - // } - // - // public static void main(String[] args) throws JsonProcessingException { - // System.out.println( - // WorkflowFormat.JSON - // .mapper() - // .writeValueAsString( - // new CallJava.CallJavaFunction( - // FuncDSLSerializationTest::inc, - // Optional.of(Integer.class), - // Optional.of(Integer.class)))); - // - // System.out.println( - // WorkflowFormat.JSON - // .mapper() - // .writeValueAsString( - // new CallJava.CallJavaFunction( - // new MyFunction(), Optional.of(Integer.class), Optional.of(Integer.class)))); - // } -} diff --git a/experimental/fluent/func/src/test/resources/logback_test.xml b/experimental/fluent/func/src/test/resources/logback_test.xml deleted file mode 100644 index c3b6bcb5b..000000000 --- a/experimental/fluent/func/src/test/resources/logback_test.xml +++ /dev/null @@ -1,19 +0,0 @@ - - - - - - %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n - - - - - - - - - - - - - \ No newline at end of file diff --git a/experimental/fluent/jackson/pom.xml b/experimental/fluent/jackson/pom.xml index 363d1d99b..10b764f89 100644 --- a/experimental/fluent/jackson/pom.xml +++ b/experimental/fluent/jackson/pom.xml @@ -8,10 +8,14 @@ serverlessworkflow-experimental-fluent-serialization-jackson Serverless Workflow :: Experimental :: Fluent :: Serialization:: Jackson - + com.fasterxml.jackson.dataformat jackson-dataformat-yaml + + com.fasterxml.jackson.datatype + jackson-datatype-jdk8 + io.serverlessworkflow serverlessworkflow-experimental-types diff --git a/experimental/fluent/jackson/src/main/java/io/serverlessworkflow/fluent/func/serialization/jackson/CallJavaDeserializer.java b/experimental/fluent/jackson/src/main/java/io/serverlessworkflow/fluent/func/serialization/jackson/CallJavaDeserializer.java new file mode 100644 index 000000000..8e9757307 --- /dev/null +++ b/experimental/fluent/jackson/src/main/java/io/serverlessworkflow/fluent/func/serialization/jackson/CallJavaDeserializer.java @@ -0,0 +1,75 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.func.serialization.jackson; + +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonDeserializer; +import com.fasterxml.jackson.databind.JsonMappingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.deser.ResolvableDeserializer; +import io.serverlessworkflow.api.types.CallFunction; +import io.serverlessworkflow.api.types.FunctionArguments; +import io.serverlessworkflow.api.types.func.CallJava; +import java.io.IOException; +import java.lang.invoke.SerializedLambda; +import java.util.Map; + +public class CallJavaDeserializer extends JsonDeserializer + implements ResolvableDeserializer { + + private JsonDeserializer defaultDeserializer; + + public CallJavaDeserializer(JsonDeserializer defaultDeserializer) { + this.defaultDeserializer = defaultDeserializer; + } + + @Override + public T deserialize(JsonParser p, DeserializationContext ctxt) throws IOException { + CallFunction original = (CallFunction) defaultDeserializer.deserialize(p, ctxt); + if (original.getCall().equals(CallJava.JAVA_CALL_KEY)) { + FunctionArguments args = original.getWith(); + if (args != null) { + Map props = args.getAdditionalProperties(); + try { + return (T) + CallJava.fromFunctionProperties( + props, + unmarshall((ObjectMapper) p.getCodec(), props.get(CallJava.FUNCTION_NAME_KEY))); + } catch (ReflectiveOperationException e) { + throw new IOException("Error unmarshalling java call with args " + args, e); + } + } + } + return (T) original; + } + + private SerializedLambda unmarshall(ObjectMapper mapper, Object funcNode) throws IOException { + if (funcNode instanceof Map) { + return mapper.convertValue(funcNode, SerializedLambda.class); + } else { + throw new IOException( + "Expecting function object to be a Map, but it was " + + funcNode + + " Please check if you are using serializable lambdas in your workflow definition"); + } + } + + @Override + public void resolve(DeserializationContext ctxt) throws JsonMappingException { + ((ResolvableDeserializer) defaultDeserializer).resolve(ctxt); + } +} diff --git a/experimental/fluent/jackson/src/main/java/io/serverlessworkflow/fluent/func/serialization/jackson/CallJavaFunctionDeserializer.java b/experimental/fluent/jackson/src/main/java/io/serverlessworkflow/fluent/func/serialization/jackson/CallJavaFunctionDeserializer.java deleted file mode 100644 index a9d96bc86..000000000 --- a/experimental/fluent/jackson/src/main/java/io/serverlessworkflow/fluent/func/serialization/jackson/CallJavaFunctionDeserializer.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Copyright 2020-Present The Serverless Workflow Specification Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.serverlessworkflow.fluent.func.serialization.jackson; - -import com.fasterxml.jackson.core.JsonParseException; -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.core.TreeNode; -import com.fasterxml.jackson.databind.DeserializationContext; -import com.fasterxml.jackson.databind.JsonDeserializer; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.fasterxml.jackson.databind.node.TextNode; -import io.serverlessworkflow.api.reflection.func.ReflectionUtils; -import io.serverlessworkflow.api.types.func.CallJava; -import io.serverlessworkflow.api.types.func.CallJava.CallJavaFunction; -import java.io.IOException; -import java.lang.invoke.SerializedLambda; -import java.lang.reflect.Method; -import java.util.Optional; -import java.util.function.Function; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class CallJavaFunctionDeserializer extends JsonDeserializer { - - private static Logger logger = LoggerFactory.getLogger(CallJavaFunctionDeserializer.class); - - @Override - public CallJavaFunction deserialize(JsonParser p, DeserializationContext ctxt) - throws IOException { - - TreeNode content = p.readValueAsTree(); - TreeNode callNode = content.get("call"); - if (!(callNode instanceof TextNode textNode) || !textNode.asText().equals("Java")) { - throw new JsonParseException( - "Expecting a call property which value shoud be Java but instead it was " + callNode); - } - TreeNode functionNode = content.get("function"); - if (functionNode == null) { - throw new JsonParseException("A CallJava function should have a function property"); - } - if (functionNode instanceof ObjectNode objectNode) { - SerializedLambda serializedLambda = ctxt.readTreeAsValue(objectNode, SerializedLambda.class); - try { - Method readResolve = SerializedLambda.class.getDeclaredMethod("readResolve"); - readResolve.setAccessible(true); - - return new CallJavaFunction( - (Function) readResolve.invoke(serializedLambda), - Optional.of(ReflectionUtils.inputType(serializedLambda)), - Optional.of(ReflectionUtils.outputType(serializedLambda))); - } catch (ReflectiveOperationException e) { - throw new IOException( - "Reflection exception while converting SerializedLamda " - + serializedLambda - + "into a funcion"); - } - - } else { - logger.error( - "function property: {} , does not contain enough information to be properly unmarshall, using an function that does nothing", - functionNode); - return new CallJavaFunction<>(v -> v, Optional.empty(), Optional.empty()); - } - } -} diff --git a/experimental/fluent/jackson/src/main/java/io/serverlessworkflow/fluent/func/serialization/jackson/CallTaskFunctionSerializer.java b/experimental/fluent/jackson/src/main/java/io/serverlessworkflow/fluent/func/serialization/jackson/CallTaskFunctionSerializer.java deleted file mode 100644 index 88904b46d..000000000 --- a/experimental/fluent/jackson/src/main/java/io/serverlessworkflow/fluent/func/serialization/jackson/CallTaskFunctionSerializer.java +++ /dev/null @@ -1,28 +0,0 @@ -package io.serverlessworkflow.fluent.func.serialization.jackson; - -import java.io.IOException; - -import com.fasterxml.jackson.core.JsonGenerator; -import com.fasterxml.jackson.databind.JsonSerializer; -import com.fasterxml.jackson.databind.SerializerProvider; - -import io.serverlessworkflow.api.types.CallTask; -import io.serverlessworkflow.api.types.func.CallJava; -import io.serverlessworkflow.api.types.func.CallTaskJava; -import io.serverlessworkflow.api.types.jackson.CallTaskSerializer; - -public class CallTaskFunctionSerializer extends CallTaskSerializer { - - @Override - public void serialize(CallTask value, JsonGenerator gen, SerializerProvider serializers) throws IOException { - - if (value instanceof CallTaskJava javaCall) { - - javaCall.getCallJava(); - - } else { - super.serialize(value, gen, serializers); - } - } - -} diff --git a/experimental/fluent/jackson/src/main/java/io/serverlessworkflow/fluent/func/serialization/jackson/FuncJacksonModule.java b/experimental/fluent/jackson/src/main/java/io/serverlessworkflow/fluent/func/serialization/jackson/FuncJacksonModule.java index 63ea3ea9a..07f9ebcb4 100644 --- a/experimental/fluent/jackson/src/main/java/io/serverlessworkflow/fluent/func/serialization/jackson/FuncJacksonModule.java +++ b/experimental/fluent/jackson/src/main/java/io/serverlessworkflow/fluent/func/serialization/jackson/FuncJacksonModule.java @@ -15,19 +15,74 @@ */ package io.serverlessworkflow.fluent.func.serialization.jackson; +import com.fasterxml.jackson.databind.BeanDescription; +import com.fasterxml.jackson.databind.DeserializationConfig; +import com.fasterxml.jackson.databind.JsonDeserializer; +import com.fasterxml.jackson.databind.SerializationConfig; +import com.fasterxml.jackson.databind.deser.BeanDeserializerModifier; import com.fasterxml.jackson.databind.module.SimpleModule; - -import io.serverlessworkflow.api.types.CallTask; -import io.serverlessworkflow.api.types.func.CallJava; +import com.fasterxml.jackson.databind.ser.BeanPropertyWriter; +import com.fasterxml.jackson.databind.ser.BeanSerializerModifier; +import io.serverlessworkflow.api.reflection.func.SerializableConsumer; +import io.serverlessworkflow.api.reflection.func.SerializableFunction; +import io.serverlessworkflow.api.reflection.func.SerializablePredicate; +import io.serverlessworkflow.api.types.CallFunction; +import io.serverlessworkflow.api.types.func.ContextFunction; +import io.serverlessworkflow.api.types.func.FilterFunction; +import io.serverlessworkflow.api.types.func.LoopFunction; +import io.serverlessworkflow.api.types.func.LoopFunctionIndex; +import io.serverlessworkflow.api.types.func.LoopPredicate; +import io.serverlessworkflow.api.types.func.LoopPredicateIndex; +import io.serverlessworkflow.api.types.func.LoopPredicateIndexContext; +import io.serverlessworkflow.api.types.func.LoopPredicateIndexFilter; +import java.lang.invoke.SerializedLambda; +import java.util.List; public class FuncJacksonModule extends SimpleModule { private static final long serialVersionUID = 1L; public void setupModule(com.fasterxml.jackson.databind.Module.SetupContext context) { - super.addSerializer(CallTask.class, new CallTaskFunctionSerializer()); -// super.addDeserializer(CallJava.CallJavaFunction.class, new CallJavaFunctionDeserializer()); - // super.addSerializer(CallJava.CallJavaFunction.class, new CallJavaFunctionSerializer()); + SerializableFunctionSerializer serializer = new SerializableFunctionSerializer(); + super.addSerializer(SerializableFunction.class, serializer); + super.addSerializer(SerializablePredicate.class, serializer); + super.addSerializer(SerializableConsumer.class, serializer); + super.addSerializer(ContextFunction.class, serializer); + super.addSerializer(FilterFunction.class, serializer); + super.addSerializer(LoopFunction.class, serializer); + super.addSerializer(LoopFunctionIndex.class, serializer); + super.addSerializer(LoopPredicate.class, serializer); + super.addSerializer(LoopPredicateIndex.class, serializer); + super.addSerializer(LoopPredicateIndexContext.class, serializer); + super.addSerializer(LoopPredicateIndexFilter.class, serializer); + super.addDeserializer(SerializedLambda.class, new SerializedLambdaDeserializer()); + super.setDeserializerModifier( + new BeanDeserializerModifier() { + @Override + public JsonDeserializer modifyDeserializer( + DeserializationConfig config, + BeanDescription beanDesc, + JsonDeserializer deserializer) { + if (beanDesc.getBeanClass().equals(CallFunction.class)) { + return new CallJavaDeserializer<>(deserializer); + } + return deserializer; + } + }); + super.setSerializerModifier( + new BeanSerializerModifier() { + @Override + public List changeProperties( + SerializationConfig config, + BeanDescription beanDesc, + List beanProperties) { + if (beanDesc.getBeanClass().equals(SerializedLambda.class)) { + beanProperties.add(new SerializedLambdaWriter(beanProperties.get(0))); + } + + return beanProperties; + } + }); super.setupModule(context); } } diff --git a/experimental/fluent/jackson/src/main/java/io/serverlessworkflow/fluent/func/serialization/jackson/FuncUnionCustomizer.java b/experimental/fluent/jackson/src/main/java/io/serverlessworkflow/fluent/func/serialization/jackson/FuncUnionCustomizer.java deleted file mode 100644 index 2aacf5c18..000000000 --- a/experimental/fluent/jackson/src/main/java/io/serverlessworkflow/fluent/func/serialization/jackson/FuncUnionCustomizer.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Copyright 2020-Present The Serverless Workflow Specification Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.serverlessworkflow.fluent.func.serialization.jackson; - -import io.serverlessworkflow.api.types.CallTask; -import io.serverlessworkflow.api.types.func.CallJava.CallJavaFunction; -import io.serverlessworkflow.serialization.UnionCustomizer; -import java.util.Collection; -import java.util.List; -import java.util.Map; - -public class FuncUnionCustomizer implements UnionCustomizer { - - @Override - public Map, Collection>> additionalClasses() { - return Map.of(CallTask.class, List.of(CallJavaFunction.class)); - } -} diff --git a/experimental/fluent/jackson/src/main/java/io/serverlessworkflow/fluent/func/serialization/jackson/CallJavaFunctionSerializer.java b/experimental/fluent/jackson/src/main/java/io/serverlessworkflow/fluent/func/serialization/jackson/SerializableFunctionSerializer.java similarity index 60% rename from experimental/fluent/jackson/src/main/java/io/serverlessworkflow/fluent/func/serialization/jackson/CallJavaFunctionSerializer.java rename to experimental/fluent/jackson/src/main/java/io/serverlessworkflow/fluent/func/serialization/jackson/SerializableFunctionSerializer.java index f6deb0f93..8e90f5427 100644 --- a/experimental/fluent/jackson/src/main/java/io/serverlessworkflow/fluent/func/serialization/jackson/CallJavaFunctionSerializer.java +++ b/experimental/fluent/jackson/src/main/java/io/serverlessworkflow/fluent/func/serialization/jackson/SerializableFunctionSerializer.java @@ -19,28 +19,20 @@ import com.fasterxml.jackson.databind.JsonSerializer; import com.fasterxml.jackson.databind.SerializerProvider; import io.serverlessworkflow.api.reflection.func.ReflectionUtils; -import io.serverlessworkflow.api.types.func.CallJava; -import io.serverlessworkflow.api.types.func.CallJava.CallJavaFunction; import java.io.IOException; import java.lang.invoke.SerializedLambda; import java.util.Optional; -public class CallJavaFunctionSerializer extends JsonSerializer { +public class SerializableFunctionSerializer extends JsonSerializer { @Override - public void serialize(CallJavaFunction value, JsonGenerator gen, SerializerProvider serializers) + public void serialize(Object value, JsonGenerator gen, SerializerProvider serializers) throws IOException { - gen.writeStartObject(); - gen.writeStringField("call", "Java"); - gen.writeObjectFieldStart("with"); - Optional serializedLambda = - ReflectionUtils.getSerializedLambda(value.function()); + Optional serializedLambda = ReflectionUtils.serializedFromFuntion(value); if (serializedLambda.isPresent()) { - gen.writeObjectField("function", serializedLambda.orElse(null)); + gen.writeObject(serializedLambda.orElseThrow()); } else { - gen.writeStringField("function", value.function().toString()); + gen.writeString(value.toString()); } - gen.writeEndObject(); - gen.writeEndObject(); } } diff --git a/experimental/fluent/jackson/src/main/java/io/serverlessworkflow/fluent/func/serialization/jackson/SerializedLambdaDeserializer.java b/experimental/fluent/jackson/src/main/java/io/serverlessworkflow/fluent/func/serialization/jackson/SerializedLambdaDeserializer.java new file mode 100644 index 000000000..6e796dca7 --- /dev/null +++ b/experimental/fluent/jackson/src/main/java/io/serverlessworkflow/fluent/func/serialization/jackson/SerializedLambdaDeserializer.java @@ -0,0 +1,95 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.func.serialization.jackson; + +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.TreeNode; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonDeserializer; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import io.serverlessworkflow.api.reflection.func.ReflectionUtils; +import java.io.IOException; +import java.lang.invoke.SerializedLambda; + +public class SerializedLambdaDeserializer extends JsonDeserializer { + + static final String CAPTURING_CLASS = "capturingClass"; + static final String FUNCTIONAL_CLASS = "functionalInterfaceClass"; + static final String FUNCTIONAL_METHOD_NAME = "functionalInterfaceMethodName"; + static final String FUNCTIONAL_METHOD_SIGNATURE = "functionalInterfaceMethodSignature"; + static final String METHOD_KIND = "implMethodKind"; + static final String METHOD_CLASS = "implClass"; + static final String METHOD_NAME = "implMethodName"; + static final String METHOD_SIGNATURE = "implMethodSignature"; + static final String METHOD_TYPE = "instantiatedMethodType"; + static final String CAPTURED_ARGS = "capturedArgs"; + static final String TYPE_CAPTURE_ARG = "type"; + static final String DATA_CAPTURE_ARG = "data"; + + @Override + public SerializedLambda deserialize(JsonParser p, DeserializationContext ctxt) + throws IOException { + TreeNode tree = p.readValueAsTree(); + + if (tree instanceof ObjectNode node) { + try { + return new SerializedLambda( + ReflectionUtils.loadCapturingClass(node.get(CAPTURING_CLASS).asText()), + node.get(FUNCTIONAL_CLASS).asText(), + node.get(FUNCTIONAL_METHOD_NAME).asText(), + node.get(FUNCTIONAL_METHOD_SIGNATURE).asText(), + node.get(METHOD_KIND).asInt(), + node.get(METHOD_CLASS).asText(), + node.get(METHOD_NAME).asText(), + node.get(METHOD_SIGNATURE).asText(), + node.get(METHOD_TYPE).asText(), + fromArray(ctxt, (ArrayNode) node.get(CAPTURED_ARGS))); + } catch (ReflectiveOperationException ex) { + throw new IOException("Error unmarshalling SerializedLambda " + node, ex); + } + } else { + throw new IOException( + "Node " + + tree + + " is not an object and therefore cannot be converted into SerializedLambda"); + } + } + + private Object[] fromArray(DeserializationContext ctxt, ArrayNode node) + throws IOException, ReflectiveOperationException { + if (node == null) { + return new Object[0]; + } else { + Object[] result = new Object[node.size()]; + for (int i = 0; i < result.length; i++) { + ObjectNode objectNode = (ObjectNode) node.get(i); + JsonNode type = objectNode.get(TYPE_CAPTURE_ARG); + if (type != null && type.isTextual()) { + Object value = + ctxt.readTreeAsValue( + objectNode.get(DATA_CAPTURE_ARG), ReflectionUtils.loadClass(type.asText())); + result[i] = + value instanceof SerializedLambda sl + ? ReflectionUtils.functionFromSerialized(sl) + : value; + } + } + return result; + } + } +} diff --git a/experimental/fluent/jackson/src/main/java/io/serverlessworkflow/fluent/func/serialization/jackson/SerializedLambdaWriter.java b/experimental/fluent/jackson/src/main/java/io/serverlessworkflow/fluent/func/serialization/jackson/SerializedLambdaWriter.java new file mode 100644 index 000000000..2a07fbd81 --- /dev/null +++ b/experimental/fluent/jackson/src/main/java/io/serverlessworkflow/fluent/func/serialization/jackson/SerializedLambdaWriter.java @@ -0,0 +1,56 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.func.serialization.jackson; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.ser.BeanPropertyWriter; +import io.serverlessworkflow.api.reflection.func.ReflectionUtils; +import java.io.IOException; +import java.lang.invoke.SerializedLambda; + +public class SerializedLambdaWriter extends BeanPropertyWriter { + + private static final long serialVersionUID = 1L; + + public SerializedLambdaWriter(BeanPropertyWriter base) { + super(base); + } + + @Override + public void serializeAsField(Object bean, JsonGenerator gen, SerializerProvider prov) + throws IOException { + SerializedLambda sl = (SerializedLambda) bean; + int size = sl.getCapturedArgCount(); + if (size > 0) { + gen.writeArrayFieldStart(SerializedLambdaDeserializer.CAPTURED_ARGS); + for (int i = 0; i < size; i++) { + Object obj = sl.getCapturedArg(i); + gen.writeStartObject(); + if (obj != null) { + gen.writeStringField( + SerializedLambdaDeserializer.TYPE_CAPTURE_ARG, + ReflectionUtils.serializedFromFuntion(obj) + .map(v -> SerializedLambda.class.getName()) + .orElse(obj.getClass().getName())); + gen.writeObjectField(SerializedLambdaDeserializer.DATA_CAPTURE_ARG, obj); + } + gen.writeEndObject(); + } + gen.writeEndArray(); + } + } +} diff --git a/experimental/fluent/jackson/src/main/resources/META-INF/services/io.serverlessworkflow.serialization.UnionCustomizer b/experimental/fluent/jackson/src/main/resources/META-INF/services/io.serverlessworkflow.serialization.UnionCustomizer deleted file mode 100644 index 1f872175c..000000000 --- a/experimental/fluent/jackson/src/main/resources/META-INF/services/io.serverlessworkflow.serialization.UnionCustomizer +++ /dev/null @@ -1 +0,0 @@ -io.serverlessworkflow.fluent.func.serialization.jackson.FuncUnionCustomizer \ No newline at end of file diff --git a/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaForExecutorBuilder.java b/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaForExecutorBuilder.java index 8b1c23b3c..59e9caf2f 100644 --- a/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaForExecutorBuilder.java +++ b/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaForExecutorBuilder.java @@ -41,7 +41,7 @@ protected JavaForExecutorBuilder( protected Optional buildWhileFilter() { if (task instanceof ForTaskFunction taskFunctions) { final LoopPredicateIndexFilter whilePred = taskFunctions.getWhilePredicate(); - Optional> whileClass = taskFunctions.getWhileClass(); + Optional> whileClass = taskFunctions.whileClass(); String varName = task.getFor().getEach(); String indexName = task.getFor().getAt(); if (whilePred != null) { @@ -70,13 +70,13 @@ protected WorkflowValueResolver> buildCollectionFilter() { private Object collectionFilterObject(ForTaskFunction taskFunctions) { return taskFunctions - .getForClass() + .forClass() .map(forClass -> typedCollectionFunction(taskFunctions, forClass)) - .orElse(taskFunctions.getCollection()); + .orElse(taskFunctions.collection()); } @SuppressWarnings({"rawtypes", "unchecked"}) private Object typedCollectionFunction(ForTaskFunction taskFunctions, Class forClass) { - return new TypedFunction(taskFunctions.getCollection(), forClass); + return new TypedFunction(taskFunctions.collection(), forClass); } } diff --git a/experimental/lambda/src/test/java/io/serverless/workflow/impl/executors/func/CallJavaContextFunctionTest.java b/experimental/lambda/src/test/java/io/serverless/workflow/impl/executors/func/CallJavaContextFunctionTest.java index 3a90da348..a7d46b94a 100644 --- a/experimental/lambda/src/test/java/io/serverless/workflow/impl/executors/func/CallJavaContextFunctionTest.java +++ b/experimental/lambda/src/test/java/io/serverless/workflow/impl/executors/func/CallJavaContextFunctionTest.java @@ -17,12 +17,12 @@ import static org.assertj.core.api.Assertions.assertThat; +import io.serverlessworkflow.api.types.CallTask; import io.serverlessworkflow.api.types.Document; import io.serverlessworkflow.api.types.Task; import io.serverlessworkflow.api.types.TaskItem; import io.serverlessworkflow.api.types.Workflow; import io.serverlessworkflow.api.types.func.CallJava; -import io.serverlessworkflow.api.types.func.CallTaskJava; import io.serverlessworkflow.api.types.func.ContextFunction; import io.serverlessworkflow.impl.WorkflowApplication; import java.util.List; @@ -57,7 +57,8 @@ void testJavaContextFunction_simple() throws InterruptedException, ExecutionExce "javaContextCall", new Task() .withCallTask( - new CallTaskJava(CallJava.function(ctxFn, Person.class)))))); + new CallTask() + .withCallFunction(CallJava.function(ctxFn, Person.class)))))); var out = app.workflowDefinition(workflow) diff --git a/experimental/lambda/src/test/java/io/serverless/workflow/impl/executors/func/CallTest.java b/experimental/lambda/src/test/java/io/serverless/workflow/impl/executors/func/CallTest.java index 5427b5c28..9fb8ece52 100644 --- a/experimental/lambda/src/test/java/io/serverless/workflow/impl/executors/func/CallTest.java +++ b/experimental/lambda/src/test/java/io/serverless/workflow/impl/executors/func/CallTest.java @@ -17,6 +17,7 @@ import static org.assertj.core.api.Assertions.assertThat; +import io.serverlessworkflow.api.types.CallTask; import io.serverlessworkflow.api.types.Document; import io.serverlessworkflow.api.types.FlowDirective; import io.serverlessworkflow.api.types.FlowDirectiveEnum; @@ -28,7 +29,6 @@ import io.serverlessworkflow.api.types.TaskMetadata; import io.serverlessworkflow.api.types.Workflow; import io.serverlessworkflow.api.types.func.CallJava; -import io.serverlessworkflow.api.types.func.CallTaskJava; import io.serverlessworkflow.api.types.func.ForTaskFunction; import io.serverlessworkflow.api.types.func.SwitchCasePredicate; import io.serverlessworkflow.api.types.func.TaskMetadataKeys; @@ -72,7 +72,8 @@ private void internalJavaFunctionTest(Function function, Class c "javaCall", new Task() .withCallTask( - new CallTaskJava(CallJava.function(function, clazz)))))); + new CallTask() + .withCallFunction(CallJava.function(function, clazz)))))); assertThat( app.workflowDefinition(workflow) @@ -109,10 +110,11 @@ void testForLoop() throws InterruptedException, ExecutionException { "javaCall", new Task() .withCallTask( - new CallTaskJava( - CallJava.loopFunction( - CallTest::sum, - forConfig.getEach())))))))))); + new CallTask() + .withCallFunction( + CallJava.loopFunction( + CallTest::sum, + forConfig.getEach())))))))))); assertThat( app.workflowDefinition(workflow) @@ -152,7 +154,9 @@ void testSwitch() throws InterruptedException, ExecutionException { new TaskItem( "java", new Task() - .withCallTask(new CallTaskJava(CallJava.function(CallTest::zero)))))); + .withCallTask( + new CallTask() + .withCallFunction(CallJava.function(CallTest::zero)))))); WorkflowDefinition definition = app.workflowDefinition(workflow); assertThat(definition.instance(3).start().get().asNumber().orElseThrow()).isEqualTo(3); @@ -173,9 +177,11 @@ void testIf() throws InterruptedException, ExecutionException { "java", new Task() .withCallTask( - new CallTaskJava( - withPredicate( - CallJava.function(CallTest::zero), CallTest::isOdd)))))); + new CallTask() + .withCallFunction( + withPredicate( + CallJava.function(CallTest::zero), + CallTest::isOdd)))))); WorkflowDefinition definition = app.workflowDefinition(workflow); assertThat(definition.instance(3).start().get().asNumber().orElseThrow()).isEqualTo(0); assertThat(definition.instance(4).start().get().asNumber().orElseThrow()).isEqualTo(4); @@ -195,11 +201,12 @@ void testIfWithModel() throws InterruptedException, ExecutionException { "java", new Task() .withCallTask( - new CallTaskJava( - withPredicate( - CallJava.function( - CallTest::zeroWithModel, WorkflowModel.class), - CallTest::isOdd)))))); + new CallTask() + .withCallFunction( + withPredicate( + CallJava.function( + CallTest::zeroWithModel, WorkflowModel.class), + CallTest::isOdd)))))); WorkflowDefinition definition = app.workflowDefinition(workflow); assertThat(definition.instance(3).start().get().asNumber().orElseThrow()).isEqualTo(0); assertThat(definition.instance(4).start().get().asNumber().orElseThrow()).isEqualTo(4); diff --git a/experimental/lambda/src/test/java/io/serverless/workflow/impl/executors/func/ForTaskFunctionRegressionTest.java b/experimental/lambda/src/test/java/io/serverless/workflow/impl/executors/func/ForTaskFunctionRegressionTest.java index e000f4987..2bf377312 100644 --- a/experimental/lambda/src/test/java/io/serverless/workflow/impl/executors/func/ForTaskFunctionRegressionTest.java +++ b/experimental/lambda/src/test/java/io/serverless/workflow/impl/executors/func/ForTaskFunctionRegressionTest.java @@ -17,20 +17,15 @@ import static org.assertj.core.api.Assertions.assertThat; +import io.serverlessworkflow.api.types.CallTask; import io.serverlessworkflow.api.types.Document; import io.serverlessworkflow.api.types.ForTaskConfiguration; import io.serverlessworkflow.api.types.Task; import io.serverlessworkflow.api.types.TaskItem; import io.serverlessworkflow.api.types.Workflow; import io.serverlessworkflow.api.types.func.CallJava; -import io.serverlessworkflow.api.types.func.CallTaskJava; import io.serverlessworkflow.api.types.func.ForTaskFunction; import io.serverlessworkflow.impl.WorkflowApplication; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.lang.reflect.Field; import java.util.Collection; import java.util.List; import java.util.concurrent.ExecutionException; @@ -38,30 +33,6 @@ class ForTaskFunctionRegressionTest { - @Test - void initializesOptionalFieldsAsEmpty() { - ForTaskFunction taskFunction = new ForTaskFunction(); - - assertThat(taskFunction.getWhileClass()).isNotNull().isEmpty(); - assertThat(taskFunction.getItemClass()).isNotNull().isEmpty(); - assertThat(taskFunction.getForClass()).isNotNull().isEmpty(); - } - - @Test - void optionalFieldsSurviveJavaSerializationRoundTrip() throws Exception { - ForTaskFunction taskFunction = new ForTaskFunction(); - clearField(taskFunction, "whileClass"); - clearField(taskFunction, "itemClass"); - clearField(taskFunction, "forClass"); - clearField(taskFunction, "collection"); - - ForTaskFunction copy = roundTrip(taskFunction); - - assertThat(copy.getWhileClass()).isNotNull().isEmpty(); - assertThat(copy.getItemClass()).isNotNull().isEmpty(); - assertThat(copy.getForClass()).isNotNull().isEmpty(); - } - @Test void forLoopWithExplicitCollectionClassExecutesSuccessfully() throws InterruptedException, ExecutionException { @@ -90,33 +61,15 @@ void forLoopWithExplicitCollectionClassExecutesSuccessfully() "javaCall", new Task() .withCallTask( - new CallTaskJava( - CallJava.loopFunction( - CallTest::sum, - forConfig.getEach())))))))))); + new CallTask() + .withCallFunction( + CallJava.loopFunction( + CallTest::sum, + forConfig.getEach())))))))))); var result = app.workflowDefinition(workflow).instance(List.of(2, 4, 6)).start().get(); assertThat(result.asNumber().orElseThrow()).isEqualTo(12); } } - - private static ForTaskFunction roundTrip(ForTaskFunction taskFunction) throws Exception { - ByteArrayOutputStream output = new ByteArrayOutputStream(); - try (ObjectOutputStream oos = new ObjectOutputStream(output)) { - oos.writeObject(taskFunction); - } - - try (ObjectInputStream ois = - new ObjectInputStream(new ByteArrayInputStream(output.toByteArray()))) { - return (ForTaskFunction) ois.readObject(); - } - } - - private static void clearField(Object target, String fieldName) - throws ReflectiveOperationException { - Field field = target.getClass().getDeclaredField(fieldName); - field.setAccessible(true); - field.set(target, null); - } } diff --git a/experimental/test/pom.xml b/experimental/test/pom.xml index 85f3d96c1..c730ab334 100644 --- a/experimental/test/pom.xml +++ b/experimental/test/pom.xml @@ -97,5 +97,25 @@ ${version.org.glassfish.jersey} test + + io.serverlessworkflow + serverlessworkflow-api + test + + + ch.qos.logback + logback-classic + test + + + org.junit.jupiter + junit-jupiter-params + test + + + io.serverlessworkflow + serverlessworkflow-experimental-fluent-serialization-jackson + test + \ No newline at end of file diff --git a/experimental/test/src/test/java/io/serverlessworkflow/fluent/test/ForEachFuncTest.java b/experimental/test/src/test/java/io/serverlessworkflow/fluent/test/ForEachFuncTest.java index 01737f035..cea746e57 100644 --- a/experimental/test/src/test/java/io/serverlessworkflow/fluent/test/ForEachFuncTest.java +++ b/experimental/test/src/test/java/io/serverlessworkflow/fluent/test/ForEachFuncTest.java @@ -16,6 +16,7 @@ package io.serverlessworkflow.fluent.test; import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.*; +import static io.serverlessworkflow.fluent.test.TestSerializationUtils.writeAndReadInMemory; import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; @@ -26,6 +27,7 @@ import io.serverlessworkflow.impl.WorkflowApplication; import io.serverlessworkflow.impl.WorkflowModel; import io.serverlessworkflow.impl.lifecycle.TraceExecutionListener; +import java.io.IOException; import java.time.Duration; import java.util.Collection; import java.util.List; @@ -44,12 +46,13 @@ private record OrdersPayload(List orders) {} private record OrderName(String id, String name) {} @Test - void testForEachIteration() { + void testForEachIteration() throws IOException { Workflow workflow = - FuncWorkflowBuilder.workflow("foreach-workflow") - .tasks(forEachItem(OrdersPayload::orders, ForEachFuncTest::enhace)) - .build(); + writeAndReadInMemory( + FuncWorkflowBuilder.workflow("foreach-workflow") + .tasks(forEachItem(OrdersPayload::orders, ForEachFuncTest::enhace)) + .build()); try (WorkflowApplication app = WorkflowApplication.builder().build()) { OrdersPayload input = diff --git a/experimental/test/src/test/java/io/serverlessworkflow/fluent/test/FuncDSLSerializationTest.java b/experimental/test/src/test/java/io/serverlessworkflow/fluent/test/FuncDSLSerializationTest.java new file mode 100644 index 000000000..c8d753f49 --- /dev/null +++ b/experimental/test/src/test/java/io/serverlessworkflow/fluent/test/FuncDSLSerializationTest.java @@ -0,0 +1,107 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.test; + +import static io.serverlessworkflow.api.WorkflowWriter.workflowAsBytes; +import static io.serverlessworkflow.api.WorkflowWriter.workflowAsString; +import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.function; +import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.withContext; +import static io.serverlessworkflow.fluent.test.TestSerializationUtils.writeAndReadInMemory; +import static org.assertj.core.api.Assertions.assertThat; + +import io.serverlessworkflow.api.WorkflowFormat; +import io.serverlessworkflow.api.types.Workflow; +import io.serverlessworkflow.fluent.func.FuncWorkflowBuilder; +import io.serverlessworkflow.impl.WorkflowApplication; +import io.serverlessworkflow.impl.WorkflowContextData; +import io.serverlessworkflow.impl.WorkflowDefinition; +import java.io.IOException; +import java.util.Map; +import java.util.function.Consumer; +import java.util.stream.Stream; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +public class FuncDSLSerializationTest { + + @ParameterizedTest + @MethodSource("workflows") + public void testSpecFeaturesParsing(Workflow workflow, Consumer assertion) + throws IOException { + Workflow otherWorkflow = writeAndReadInMemory(workflow); + assertWorkflowEquals(workflow, otherWorkflow); + try (WorkflowApplication application = WorkflowApplication.builder().build()) { + assertion.accept(application.workflowDefinition(otherWorkflow)); + } + } + + static Stream workflows() { + final int QUANTITY = 3; + return Stream.of( + Arguments.of( + FuncWorkflowBuilder.workflow("hello") + .tasks(t -> t.set("sayHelloWorld", b -> b.expr(Map.of("result", "hello world!")))) + .build(), + new CheckResult(Map.of(), Map.of("result", "hello world!"))), + Arguments.of( + FuncWorkflowBuilder.workflow("inc") + .tasks(function(FuncDSLSerializationTest::inc)) + .build(), + new CheckResult(1, 2)), + Arguments.of( + FuncWorkflowBuilder.workflow("incContext") + .tasks(withContext(FuncDSLSerializationTest::incContext)) + .build(), + new CheckResult(1, 3)), + Arguments.of( + FuncWorkflowBuilder.workflow("incLambda") + .tasks(function((Integer number) -> number + QUANTITY)) + .build(), + new CheckResult(1, 4))); + } + + private static class CheckResult implements Consumer { + + private final Object input; + private final Object output; + + public CheckResult(Object input, Object output) { + this.input = input; + this.output = output; + } + + @Override + public void accept(WorkflowDefinition t) { + assertThat(t.instance(this.input).start().join().asJavaObject()).isEqualTo(this.output); + } + } + + private static void assertWorkflowEquals(Workflow workflow, Workflow other) throws IOException { + assertThat(workflowAsString(workflow, WorkflowFormat.YAML)) + .isEqualTo(workflowAsString(other, WorkflowFormat.YAML)); + assertThat(workflowAsBytes(workflow, WorkflowFormat.JSON)) + .isEqualTo(workflowAsBytes(other, WorkflowFormat.JSON)); + } + + private static Integer inc(Integer quantity) { + return quantity + 1; + } + + private static Integer incContext(Integer quantity, WorkflowContextData workflowContext) { + return quantity + 2; + } +} diff --git a/experimental/test/src/test/java/io/serverlessworkflow/fluent/test/TestSerializationUtils.java b/experimental/test/src/test/java/io/serverlessworkflow/fluent/test/TestSerializationUtils.java new file mode 100644 index 000000000..b0dc16b0b --- /dev/null +++ b/experimental/test/src/test/java/io/serverlessworkflow/fluent/test/TestSerializationUtils.java @@ -0,0 +1,48 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.test; + +import static io.serverlessworkflow.api.WorkflowReader.readWorkflow; +import static io.serverlessworkflow.api.WorkflowWriter.writeWorkflow; + +import io.serverlessworkflow.api.WorkflowFormat; +import io.serverlessworkflow.api.types.Workflow; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class TestSerializationUtils { + + private static final Logger logger = LoggerFactory.getLogger(TestSerializationUtils.class); + + private TestSerializationUtils() {} + + static Workflow writeAndReadInMemory(Workflow workflow) throws IOException { + byte[] bytes; + try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { + writeWorkflow(out, workflow, WorkflowFormat.YAML); + bytes = out.toByteArray(); + } + if (logger.isDebugEnabled()) { + logger.debug("Workflow string representation is {}", new String(bytes)); + } + try (ByteArrayInputStream in = new ByteArrayInputStream(bytes)) { + return readWorkflow(in, WorkflowFormat.YAML); + } + } +} diff --git a/experimental/test/src/test/resources/logback.xml b/experimental/test/src/test/resources/logback.xml new file mode 100644 index 000000000..cafd295ff --- /dev/null +++ b/experimental/test/src/test/resources/logback.xml @@ -0,0 +1,15 @@ + + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + + + + \ No newline at end of file diff --git a/experimental/types/src/main/java/io/serverlessworkflow/api/reflection/func/ReflectionUtils.java b/experimental/types/src/main/java/io/serverlessworkflow/api/reflection/func/ReflectionUtils.java index 2366b3716..dddf35ff7 100644 --- a/experimental/types/src/main/java/io/serverlessworkflow/api/reflection/func/ReflectionUtils.java +++ b/experimental/types/src/main/java/io/serverlessworkflow/api/reflection/func/ReflectionUtils.java @@ -87,7 +87,7 @@ public static Class inferInputTypeFromAny(Object fn, int lambdaParamIndex) { return inferInputType(inferMethodType(fn), lambdaParamIndex); } - public static Optional getSerializedLambda(Object fn) { + public static Optional serializedFromFuntion(Object fn) { try { return Optional.of(serializedLambda(fn)); } catch (ReflectiveOperationException ex) { @@ -96,6 +96,23 @@ public static Optional getSerializedLambda(Object fn) { } } + public static Class loadCapturingClass(String capturingClass) throws ClassNotFoundException { + return loadClass(capturingClass.replace('/', '.')); + } + + public static Class loadClass(String className) throws ClassNotFoundException { + return Thread.currentThread().getContextClassLoader().loadClass(className); + } + + public static Object functionFromSerialized(SerializedLambda sl) + throws ReflectiveOperationException { + Method deserializeMethod = + loadCapturingClass(sl.getCapturingClass()) + .getDeclaredMethod("$deserializeLambda$", SerializedLambda.class); + deserializeMethod.setAccessible(true); + return deserializeMethod.invoke(null, sl); + } + private static SerializedLambda serializedLambda(Object fn) throws NoSuchMethodException, IllegalAccessException, InvocationTargetException { Method m = fn.getClass().getDeclaredMethod("writeReplace"); @@ -103,14 +120,6 @@ private static SerializedLambda serializedLambda(Object fn) return (SerializedLambda) m.invoke(fn); } - public static Class outputType(SerializedLambda lambda) { - return inferOutputType(inferMethodType(lambda)); - } - - public static Class inputType(SerializedLambda lambda) { - return inferInputType(inferMethodType(lambda), 0); - } - private static Class inferInputType(MethodType type, int index) { return type.parameterType(index); } @@ -119,7 +128,7 @@ private static Class inferOutputType(MethodType type) { return type.returnType(); } - private static MethodType inferMethodType(SerializedLambda sl) { + public static MethodType inferMethodType(SerializedLambda sl) { // getInstantiatedMethodType() provides the exact generic signature resolved // by the compiler, completely bypassing captured variables and method kind switches! return MethodType.fromMethodDescriptorString( diff --git a/experimental/types/src/main/java/io/serverlessworkflow/api/types/func/CallJava.java b/experimental/types/src/main/java/io/serverlessworkflow/api/types/func/CallJava.java index e441ecfb8..bc9e419c6 100644 --- a/experimental/types/src/main/java/io/serverlessworkflow/api/types/func/CallJava.java +++ b/experimental/types/src/main/java/io/serverlessworkflow/api/types/func/CallJava.java @@ -15,14 +15,23 @@ */ package io.serverlessworkflow.api.types.func; -import io.serverlessworkflow.api.types.TaskBase; +import io.serverlessworkflow.api.reflection.func.ReflectionUtils; +import io.serverlessworkflow.api.types.CallFunction; +import io.serverlessworkflow.api.types.FunctionArguments; +import java.lang.invoke.MethodType; +import java.lang.invoke.SerializedLambda; +import java.util.Map; import java.util.Optional; import java.util.function.Consumer; import java.util.function.Function; -public abstract class CallJava extends TaskBase { +public abstract class CallJava extends CallFunction { private static final long serialVersionUID = 1L; + public static final String JAVA_CALL_KEY = "Java"; + public static final String FUNCTION_NAME_KEY = "function"; + private static final String VAR_NAME_KEY = "varName"; + private static final String INDEX_NAME_KEY = "index"; private final Optional> inputClass; @@ -31,6 +40,7 @@ protected CallJava() { } protected CallJava(Optional> inputClass) { + super.setCall(JAVA_CALL_KEY); this.inputClass = inputClass; } @@ -92,6 +102,35 @@ public static CallJava function( function, Optional.ofNullable(inputClass), Optional.ofNullable(outputClass)); } + @FunctionalInterface + public interface SerializedLambdaUnmarshaller { + SerializedLambda apply(Object serializedFunction) throws ReflectiveOperationException; + } + + public static CallJava fromFunctionProperties( + Map props, SerializedLambda sl) throws ReflectiveOperationException { + Object obj = ReflectionUtils.functionFromSerialized(sl); + MethodType methodType = ReflectionUtils.inferMethodType(sl); + Optional> input = Optional.of(methodType.parameterType(0)); + Optional> output = Optional.of(methodType.returnType()); + if (obj instanceof ContextFunction fn) { + return new CallJavaContextFunction(fn, input, output); + } else if (obj instanceof FilterFunction fn) { + return new CallJavaFilterFunction(fn, input, output); + } else if (obj instanceof LoopFunction loop) { + return new CallJavaLoopFunction(loop, (String) props.get(VAR_NAME_KEY)); + } else if (obj instanceof LoopFunctionIndex loop) { + return new CallJavaLoopFunctionIndex( + loop, (String) props.get(VAR_NAME_KEY), (String) props.get(INDEX_NAME_KEY)); + } else if (obj instanceof Function fn) { + return new CallJavaFunction(fn, input, output); + } else if (obj instanceof Consumer consumer) { + return new CallJavaConsumer(consumer, input); + } else { + throw new UnsupportedOperationException("Unrecognized function " + obj); + } + } + public static class CallJavaConsumer extends CallJava { private static final long serialVersionUID = 1L; private final Consumer consumer; @@ -115,6 +154,7 @@ public CallJavaFunction( Function function, Optional> inputClass, Optional> outputClass) { super(inputClass, outputClass); this.function = function; + this.withWith(new FunctionArguments().withAdditionalProperty(FUNCTION_NAME_KEY, function)); } public Function function() { @@ -132,6 +172,7 @@ public CallJavaContextFunction( Optional> outputClass) { super(inputClass, outputClass); this.function = function; + this.withWith(new FunctionArguments().withAdditionalProperty(FUNCTION_NAME_KEY, function)); } public ContextFunction function() { @@ -149,6 +190,7 @@ public CallJavaFilterFunction( Optional> outputClass) { super(inputClass, outputClass); this.function = function; + this.withWith(new FunctionArguments().withAdditionalProperty(FUNCTION_NAME_KEY, function)); } public FilterFunction function() { @@ -163,9 +205,12 @@ public static class CallJavaLoopFunction extends CallAbstractJavaFuncti private String varName; public CallJavaLoopFunction(LoopFunction function, String varName) { - this.function = function; this.varName = varName; + this.withWith( + new FunctionArguments() + .withAdditionalProperty(FUNCTION_NAME_KEY, function) + .withAdditionalProperty(VAR_NAME_KEY, varName)); } public LoopFunction function() { @@ -189,6 +234,11 @@ public CallJavaLoopFunctionIndex( this.function = function; this.varName = varName; this.indexName = indexName; + this.withWith( + new FunctionArguments() + .withAdditionalProperty(FUNCTION_NAME_KEY, function) + .withAdditionalProperty(VAR_NAME_KEY, varName) + .withAdditionalProperty(INDEX_NAME_KEY, indexName)); } public LoopFunctionIndex function() { diff --git a/experimental/types/src/main/java/io/serverlessworkflow/api/types/func/CallTaskJava.java b/experimental/types/src/main/java/io/serverlessworkflow/api/types/func/CallTaskJava.java deleted file mode 100644 index 3f53f90f1..000000000 --- a/experimental/types/src/main/java/io/serverlessworkflow/api/types/func/CallTaskJava.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Copyright 2020-Present The Serverless Workflow Specification Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.serverlessworkflow.api.types.func; - -import io.serverlessworkflow.annotations.OneOfSetter; -import io.serverlessworkflow.api.types.CallTask; - -public class CallTaskJava extends CallTask { - - private CallJava callJava; - - public CallTaskJava() {} - - public CallTaskJava(CallJava callJava) { - this.callJava = callJava; - } - - public CallJava getCallJava() { - return callJava; - } - - @OneOfSetter(CallJava.class) - public void setCallJava(CallJava callJava) { - this.callJava = callJava; - } - - @Override - public Object get() { - return callJava != null ? callJava : super.get(); - } -} diff --git a/experimental/types/src/main/java/io/serverlessworkflow/api/types/func/FilterPredicate.java b/experimental/types/src/main/java/io/serverlessworkflow/api/types/func/FilterPredicate.java index d81b3ac48..cdbf6f57d 100644 --- a/experimental/types/src/main/java/io/serverlessworkflow/api/types/func/FilterPredicate.java +++ b/experimental/types/src/main/java/io/serverlessworkflow/api/types/func/FilterPredicate.java @@ -17,8 +17,9 @@ import io.serverlessworkflow.impl.TaskContextData; import io.serverlessworkflow.impl.WorkflowContextData; +import java.io.Serializable; @FunctionalInterface -public interface FilterPredicate { +public interface FilterPredicate extends Serializable { boolean test(T value, WorkflowContextData workflow, TaskContextData task); } diff --git a/experimental/types/src/main/java/io/serverlessworkflow/api/types/func/ForTaskFunction.java b/experimental/types/src/main/java/io/serverlessworkflow/api/types/func/ForTaskFunction.java index d6932f2e3..56c80f2c9 100644 --- a/experimental/types/src/main/java/io/serverlessworkflow/api/types/func/ForTaskFunction.java +++ b/experimental/types/src/main/java/io/serverlessworkflow/api/types/func/ForTaskFunction.java @@ -16,8 +16,6 @@ package io.serverlessworkflow.api.types.func; import io.serverlessworkflow.api.types.ForTask; -import java.io.IOException; -import java.io.ObjectInputStream; import java.util.Collection; import java.util.Optional; import java.util.function.Function; @@ -134,37 +132,19 @@ public ForTaskFunction withCollection( return whilePredicate; } - public Optional> getWhileClass() { + public Optional> whileClass() { return whileClass; } - public Optional> getForClass() { + public Optional> forClass() { return forClass; } - public Optional> getItemClass() { + public Optional> itemClass() { return itemClass; } - public Function> getCollection() { + public Function> collection() { return collection; } - - private void normalizeOptionalFields() { - if (whileClass == null) { - whileClass = Optional.empty(); - } - if (itemClass == null) { - itemClass = Optional.empty(); - } - if (forClass == null) { - forClass = Optional.empty(); - } - } - - private void readObject(ObjectInputStream input) throws IOException, ClassNotFoundException { - input.defaultReadObject(); - // Preserve compatibility with older serialized instances that may have null optionals. - normalizeOptionalFields(); - } } diff --git a/experimental/types/src/main/java/io/serverlessworkflow/api/types/func/LoopFunction.java b/experimental/types/src/main/java/io/serverlessworkflow/api/types/func/LoopFunction.java index 094cff1c1..93950614e 100644 --- a/experimental/types/src/main/java/io/serverlessworkflow/api/types/func/LoopFunction.java +++ b/experimental/types/src/main/java/io/serverlessworkflow/api/types/func/LoopFunction.java @@ -15,7 +15,8 @@ */ package io.serverlessworkflow.api.types.func; +import java.io.Serializable; import java.util.function.BiFunction; @FunctionalInterface -public interface LoopFunction extends BiFunction {} +public interface LoopFunction extends Serializable, BiFunction {} diff --git a/experimental/types/src/main/java/io/serverlessworkflow/api/types/func/LoopFunctionIndex.java b/experimental/types/src/main/java/io/serverlessworkflow/api/types/func/LoopFunctionIndex.java index b5831d098..2405e36be 100644 --- a/experimental/types/src/main/java/io/serverlessworkflow/api/types/func/LoopFunctionIndex.java +++ b/experimental/types/src/main/java/io/serverlessworkflow/api/types/func/LoopFunctionIndex.java @@ -15,7 +15,9 @@ */ package io.serverlessworkflow.api.types.func; +import java.io.Serializable; + @FunctionalInterface -public interface LoopFunctionIndex { +public interface LoopFunctionIndex extends Serializable { R apply(T model, V item, Integer index); } diff --git a/experimental/types/src/main/java/io/serverlessworkflow/api/types/func/LoopPredicate.java b/experimental/types/src/main/java/io/serverlessworkflow/api/types/func/LoopPredicate.java index 38b6b3041..8695740eb 100644 --- a/experimental/types/src/main/java/io/serverlessworkflow/api/types/func/LoopPredicate.java +++ b/experimental/types/src/main/java/io/serverlessworkflow/api/types/func/LoopPredicate.java @@ -15,7 +15,8 @@ */ package io.serverlessworkflow.api.types.func; +import java.io.Serializable; import java.util.function.BiPredicate; @FunctionalInterface -public interface LoopPredicate extends BiPredicate {} +public interface LoopPredicate extends Serializable, BiPredicate {} diff --git a/experimental/types/src/main/java/io/serverlessworkflow/api/types/func/LoopPredicateIndex.java b/experimental/types/src/main/java/io/serverlessworkflow/api/types/func/LoopPredicateIndex.java index 0e0a39a0d..09d724f50 100644 --- a/experimental/types/src/main/java/io/serverlessworkflow/api/types/func/LoopPredicateIndex.java +++ b/experimental/types/src/main/java/io/serverlessworkflow/api/types/func/LoopPredicateIndex.java @@ -15,7 +15,9 @@ */ package io.serverlessworkflow.api.types.func; +import java.io.Serializable; + @FunctionalInterface -public interface LoopPredicateIndex { +public interface LoopPredicateIndex extends Serializable { boolean test(T model, V item, Integer index); } diff --git a/experimental/types/src/main/java/io/serverlessworkflow/api/types/func/LoopPredicateIndexContext.java b/experimental/types/src/main/java/io/serverlessworkflow/api/types/func/LoopPredicateIndexContext.java index 470bfbca5..26ddabb6a 100644 --- a/experimental/types/src/main/java/io/serverlessworkflow/api/types/func/LoopPredicateIndexContext.java +++ b/experimental/types/src/main/java/io/serverlessworkflow/api/types/func/LoopPredicateIndexContext.java @@ -16,8 +16,9 @@ package io.serverlessworkflow.api.types.func; import io.serverlessworkflow.impl.WorkflowContextData; +import java.io.Serializable; @FunctionalInterface -public interface LoopPredicateIndexContext { +public interface LoopPredicateIndexContext extends Serializable { boolean test(T model, V item, Integer index, WorkflowContextData context); } diff --git a/experimental/types/src/main/java/io/serverlessworkflow/api/types/func/LoopPredicateIndexFilter.java b/experimental/types/src/main/java/io/serverlessworkflow/api/types/func/LoopPredicateIndexFilter.java index 808719c50..8cd22d673 100644 --- a/experimental/types/src/main/java/io/serverlessworkflow/api/types/func/LoopPredicateIndexFilter.java +++ b/experimental/types/src/main/java/io/serverlessworkflow/api/types/func/LoopPredicateIndexFilter.java @@ -17,8 +17,9 @@ import io.serverlessworkflow.impl.TaskContextData; import io.serverlessworkflow.impl.WorkflowContextData; +import java.io.Serializable; @FunctionalInterface -public interface LoopPredicateIndexFilter { +public interface LoopPredicateIndexFilter extends Serializable { boolean test(T model, V item, Integer index, WorkflowContextData workflow, TaskContextData task); } diff --git a/serialization/src/main/java/io/serverlessworkflow/serialization/DeserializeHelper.java b/serialization/src/main/java/io/serverlessworkflow/serialization/DeserializeHelper.java index aa826f5f6..041474d91 100644 --- a/serialization/src/main/java/io/serverlessworkflow/serialization/DeserializeHelper.java +++ b/serialization/src/main/java/io/serverlessworkflow/serialization/DeserializeHelper.java @@ -26,44 +26,30 @@ import java.lang.reflect.Method; import java.util.ArrayList; import java.util.Collection; -import java.util.HashSet; -import java.util.Map; -import java.util.Map.Entry; -import java.util.ServiceLoader; -import java.util.ServiceLoader.Provider; -import java.util.stream.Collectors; public class DeserializeHelper { - private static Map, Collection>> additionalClasses = - ServiceLoader.load(UnionCustomizer.class).stream() - .map(Provider::get) - .map(UnionCustomizer::additionalClasses) - .flatMap(map -> map.entrySet().stream()) - .collect(Collectors.toMap(Entry::getKey, Entry::getValue)); - public static T deserializeOneOf( JsonParser p, Class targetClass, Collection> oneOfTypes) throws IOException { TreeNode node = p.readValueAsTree(); try { T result = targetClass.getDeclaredConstructor().newInstance(); - Collection> possibleTypes; - Collection> additional = additionalClasses.get(targetClass); - if (additional != null && !additional.isEmpty()) { - possibleTypes = new HashSet>(oneOfTypes); - possibleTypes.addAll(additional); - } else { - possibleTypes = oneOfTypes; + Collection exceptions = new ArrayList<>(); + for (Class oneOfType : oneOfTypes) { + try { + assingIt(p, result, node, targetClass, oneOfType); + break; + } catch (IOException | ConstraintViolationException | InvocationTargetException ex) { + exceptions.add(ex); + } } - Collection exceptions = - deserializeOneOf(result, node, p, targetClass, possibleTypes); - if (exceptions.size() == possibleTypes.size()) { + if (exceptions.size() == oneOfTypes.size()) { JsonMappingException ex = new JsonMappingException( p, String.format( "Error deserializing class %s, all oneOf alternatives %s has failed ", - targetClass, possibleTypes)); + targetClass, oneOfTypes)); exceptions.forEach(ex::addSuppressed); throw ex; } @@ -73,23 +59,6 @@ public static T deserializeOneOf( } } - private static Collection deserializeOneOf( - T result, TreeNode node, JsonParser p, Class targetClass, Collection> oneOfTypes) - throws ReflectiveOperationException { - Collection exceptions = new ArrayList<>(); - for (Class oneOfType : oneOfTypes) { - try { - assingIt(p, result, node, targetClass, oneOfType); - break; - } catch (IOException | ConstraintViolationException ex) { - exceptions.add(ex); - } catch (InvocationTargetException ex) { - exceptions.add(ex.getCause()); - } - } - return exceptions; - } - private static void assingIt( JsonParser p, T result, TreeNode node, Class targetClass, Class type) throws JsonProcessingException, ReflectiveOperationException { diff --git a/serialization/src/main/java/io/serverlessworkflow/serialization/UnionCustomizer.java b/serialization/src/main/java/io/serverlessworkflow/serialization/UnionCustomizer.java deleted file mode 100644 index b9f9155fe..000000000 --- a/serialization/src/main/java/io/serverlessworkflow/serialization/UnionCustomizer.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Copyright 2020-Present The Serverless Workflow Specification Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.serverlessworkflow.serialization; - -import java.util.Collection; -import java.util.Map; - -public interface UnionCustomizer { - Map, Collection>> additionalClasses(); -}