diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncCallGrpcTaskBuilder.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncCallGrpcTaskBuilder.java new file mode 100644 index 000000000..6d3b377f0 --- /dev/null +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncCallGrpcTaskBuilder.java @@ -0,0 +1,40 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.func; + +import io.serverlessworkflow.api.types.CallGRPC; +import io.serverlessworkflow.api.types.GRPCArguments; +import io.serverlessworkflow.fluent.func.spi.ConditionalTaskBuilder; +import io.serverlessworkflow.fluent.func.spi.FuncTaskTransformations; +import io.serverlessworkflow.fluent.spec.TaskBaseBuilder; +import io.serverlessworkflow.fluent.spec.spi.CallGrpcTaskFluent; + +public class FuncCallGrpcTaskBuilder extends TaskBaseBuilder + implements CallGrpcTaskFluent, + FuncTaskTransformations, + ConditionalTaskBuilder { + + FuncCallGrpcTaskBuilder() { + final CallGRPC callGRPC = new CallGRPC(); + callGRPC.setWith(new GRPCArguments()); + super.setTask(callGRPC); + } + + @Override + public FuncCallGrpcTaskBuilder self() { + return this; + } +} diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncDoTaskBuilder.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncDoTaskBuilder.java index 176bc5690..8de4ff865 100644 --- a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncDoTaskBuilder.java +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncDoTaskBuilder.java @@ -104,6 +104,12 @@ public FuncDoTaskBuilder openapi( return this; } + @Override + public FuncDoTaskBuilder grpc(String name, Consumer itemsConfigurer) { + this.listBuilder().grpc(name, itemsConfigurer); + return this; + } + @Override public FuncDoTaskBuilder workflow(String name, Consumer itemsConfigurer) { this.listBuilder().workflow(name, itemsConfigurer); diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncTaskItemListBuilder.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncTaskItemListBuilder.java index 257a1f96b..07b32c5f8 100644 --- a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncTaskItemListBuilder.java +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncTaskItemListBuilder.java @@ -15,6 +15,7 @@ */ package io.serverlessworkflow.fluent.func; +import io.serverlessworkflow.api.types.CallGRPC; import io.serverlessworkflow.api.types.CallHTTP; import io.serverlessworkflow.api.types.CallOpenAPI; import io.serverlessworkflow.api.types.CallTask; @@ -168,6 +169,23 @@ public FuncTaskItemListBuilder openapi( return this.addTaskItem(new TaskItem(name, task)); } + @Override + public FuncTaskItemListBuilder grpc( + String name, Consumer itemsConfigurer) { + name = this.defaultNameAndRequireConfig(name, itemsConfigurer, TYPE_GRPC); + + final FuncCallGrpcTaskBuilder grpcTaskBuilder = new FuncCallGrpcTaskBuilder(); + itemsConfigurer.accept(grpcTaskBuilder); + + final CallGRPC callGRPC = grpcTaskBuilder.build(); + final CallTask callTask = new CallTask(); + callTask.setCallGRPC(callGRPC); + final Task task = new Task(); + task.setCallTask(callTask); + + return this.addTaskItem(new TaskItem(name, task)); + } + @Override public FuncTaskItemListBuilder workflow( String name, Consumer itemsConfigurer) { diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/configurers/FuncCallGrpcConfigurer.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/configurers/FuncCallGrpcConfigurer.java new file mode 100644 index 000000000..a3e552f50 --- /dev/null +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/configurers/FuncCallGrpcConfigurer.java @@ -0,0 +1,22 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.func.configurers; + +import io.serverlessworkflow.fluent.func.FuncCallGrpcTaskBuilder; +import java.util.function.Consumer; + +@FunctionalInterface +public interface FuncCallGrpcConfigurer extends Consumer {} diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncCallGrpcStep.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncCallGrpcStep.java new file mode 100644 index 000000000..daa93ad6f --- /dev/null +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncCallGrpcStep.java @@ -0,0 +1,93 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.func.dsl; + +import io.serverlessworkflow.fluent.func.FuncCallGrpcTaskBuilder; +import io.serverlessworkflow.fluent.func.FuncTaskItemListBuilder; +import io.serverlessworkflow.fluent.spec.configurers.AuthenticationConfigurer; +import io.serverlessworkflow.fluent.spec.spi.CallGrpcTaskFluent; +import java.util.ArrayList; +import java.util.List; +import java.util.function.Consumer; + +public class FuncCallGrpcStep extends Step { + + private final List>> steps = new ArrayList<>(); + + private String name; + + public FuncCallGrpcStep(String name) { + this.name = name; + } + + public FuncCallGrpcStep() {} + + public void setName(String name) { + this.name = name; + } + + public FuncCallGrpcStep proto(String uri) { + steps.add(b -> b.proto(uri)); + return this; + } + + public FuncCallGrpcStep proto(String uri, AuthenticationConfigurer authenticationConfigurer) { + steps.add(b -> b.proto(uri, authenticationConfigurer)); + return this; + } + + public FuncCallGrpcStep service(String name, String host) { + steps.add(b -> b.service(name, host)); + return this; + } + + public FuncCallGrpcStep service(String name, String host, int port) { + steps.add(b -> b.service(name, host, port)); + return this; + } + + public FuncCallGrpcStep method(String method) { + steps.add(b -> b.method(method)); + return this; + } + + public FuncCallGrpcStep argument(String name, Object value) { + steps.add(b -> b.argument(name, value)); + return this; + } + + public FuncCallGrpcStep arguments(java.util.Map arguments) { + steps.add(b -> b.arguments(arguments)); + return this; + } + + public FuncCallGrpcStep authentication(AuthenticationConfigurer authenticationConfigurer) { + steps.add(b -> b.authentication(authenticationConfigurer)); + return this; + } + + @Override + protected void configure(FuncTaskItemListBuilder list, Consumer post) { + list.grpc( + name, + builder -> { + for (Consumer> c : steps) { + c.accept(builder); + } + post.accept(builder); + }); + } +} diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncDSL.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncDSL.java index bcf3f20fb..160d1b084 100644 --- a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncDSL.java +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncDSL.java @@ -33,6 +33,7 @@ import io.serverlessworkflow.fluent.func.FuncSwitchTaskBuilder; import io.serverlessworkflow.fluent.func.FuncTaskItemListBuilder; import io.serverlessworkflow.fluent.func.FuncTryTaskBuilder; +import io.serverlessworkflow.fluent.func.configurers.FuncCallGrpcConfigurer; import io.serverlessworkflow.fluent.func.configurers.FuncCallHttpConfigurer; import io.serverlessworkflow.fluent.func.configurers.FuncCallOpenAPIConfigurer; import io.serverlessworkflow.fluent.func.configurers.FuncTaskConfigurer; @@ -1946,19 +1947,90 @@ public static FuncTaskConfigurer call(String name, FuncCallOpenAPIConfigurer con return list -> list.openapi(name, configurer); } + /** + * gRPC call using a fluent {@link FuncCallGrpcStep}. + * + *

This overload creates an unnamed gRPC call task. + * + *

{@code
+   * tasks(FuncDSL.call(FuncDSL.grpc()
+   *     .proto("proto/greeter.proto")
+   *     .service("Greeter", "localhost")
+   *     .method("SayHello")
+   *     .argument("name", "World")));
+   * }
+ * + * @param spec fluent gRPC spec built via {@link #grpc()} + * @return a {@link FuncTaskConfigurer} that adds a gRPC call task + */ + public static FuncTaskConfigurer call(FuncCallGrpcStep spec) { + return call(null, spec); + } + + /** + * gRPC call using a fluent {@link FuncCallGrpcStep} with an explicit task name. + * + *
{@code
+   * tasks(
+   * FuncDSL.call(
+   * "greet",
+   * FuncDSL.grpc()
+   *     .proto("proto/greeter.proto")
+   *     .service("Greeter", "localhost")
+   *     .method("SayHello")
+   *     .argument("name", "World"))
+   * );
+   * }
+ * + * @param name task name, or {@code null} for an anonymous task + * @param spec fluent gRPC spec built via {@link #grpc()} + * @return a {@link FuncTaskConfigurer} that adds a named gRPC call task + */ + public static FuncTaskConfigurer call(String name, FuncCallGrpcStep spec) { + Objects.requireNonNull(spec, "spec"); + spec.setName(name); + return spec; + } + + /** + * Low-level gRPC call entrypoint using a {@link FuncCallGrpcConfigurer}. + * + *

This overload creates an unnamed gRPC call task. + * + * @param configurer configurer that mutates the underlying gRPC call builder + * @return a {@link FuncTaskConfigurer} that adds a gRPC call task + */ + public static FuncTaskConfigurer call(FuncCallGrpcConfigurer configurer) { + return call(null, configurer); + } + + /** + * Low-level gRPC call entrypoint using a {@link FuncCallGrpcConfigurer}. + * + *

This overload allows assigning an explicit task name. + * + * @param name task name, or {@code null} for an anonymous task + * @param configurer configurer that mutates the underlying gRPC call builder + * @return a {@link FuncTaskConfigurer} that adds a gRPC call task + */ + public static FuncTaskConfigurer call(String name, FuncCallGrpcConfigurer configurer) { + Objects.requireNonNull(configurer, "configurer"); + return list -> list.grpc(name, configurer); + } + /** * Create a new OpenAPI specification to be used with {@link #call(FuncCallOpenAPIStep)}. * *

Typical usage: * *

{@code
-   * FuncDSL.call(openapi().document("https://petstore.swagger.io/v2/swagger.json", DSL.auth("openapi-auth")).operation("getPetById").parameter("id", 123));
+   * FuncDSL.call(
+   * openapi()
+   * .document("https://petstore.swagger.io/v2/swagger.json")
+   * .operation("getPetById"))
+   * );
    * }
* - *

The returned spec is a fluent builder that records operations (document, operation, - * parameters, authentication, etc.) and applies them to the underlying OpenAPI call task at build - * time. - * * @return a new {@link FuncCallOpenAPIStep} */ public static FuncCallOpenAPIStep openapi() { @@ -1977,7 +2049,37 @@ public static FuncCallOpenAPIStep openapi(String name) { } /** - * Create a new, empty HTTP specification to be used with {@link #call(FuncCallHttpStep)}. + * Create a new gRPC specification to be used with {@link #call(FuncCallGrpcStep)}. + * + *

Typical usage: + * + *

{@code
+   * FuncDSL.call(
+   * grpc()
+   * .proto("proto/greeter.proto")
+   *          .service("Greeter", "localhost")
+   *          .method("SayHello")
+   *          .argument("name", "World"));
+   * }
+ * + * @return a new {@link FuncCallGrpcStep} + */ + public static FuncCallGrpcStep grpc() { + return new FuncCallGrpcStep(); + } + + /** + * Named variant of {@link #grpc()}. + * + * @param name task name to be used when the spec is attached via {@link #call(FuncCallGrpcStep)} + * @return a new named {@link FuncCallGrpcStep} + */ + public static FuncCallGrpcStep grpc(String name) { + return new FuncCallGrpcStep(name); + } + + /** + * Create a new HTTP specification to be used with {@link #call(FuncCallHttpStep)}. * *

Typical usage: * diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/spi/CallGrpcFluent.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/spi/CallGrpcFluent.java new file mode 100644 index 000000000..977d0d834 --- /dev/null +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/spi/CallGrpcFluent.java @@ -0,0 +1,28 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.func.spi; + +import io.serverlessworkflow.fluent.spec.TaskBaseBuilder; +import java.util.function.Consumer; + +public interface CallGrpcFluent, LIST> { + + LIST grpc(String name, Consumer itemsConfigurer); + + default LIST grpc(Consumer itemsConfigurer) { + return this.grpc(null, itemsConfigurer); + } +} diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/spi/FuncDoFluent.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/spi/FuncDoFluent.java index 3fd13eb5e..3efcae1c1 100644 --- a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/spi/FuncDoFluent.java +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/spi/FuncDoFluent.java @@ -15,6 +15,7 @@ */ package io.serverlessworkflow.fluent.func.spi; +import io.serverlessworkflow.fluent.func.FuncCallGrpcTaskBuilder; import io.serverlessworkflow.fluent.func.FuncCallHttpTaskBuilder; import io.serverlessworkflow.fluent.func.FuncCallOpenAPITaskBuilder; import io.serverlessworkflow.fluent.func.FuncCallTaskBuilder; @@ -52,6 +53,7 @@ public interface FuncDoFluent> CallFnFluent, CallHttpFluent, CallOpenAPIFluent, + CallGrpcFluent, WorkflowFluent { default SELF subflow(String name, Consumer itemsConfigurer) { diff --git a/experimental/fluent/func/src/test/java/io/serverlessworkflow/fluent/func/FuncDSLTest.java b/experimental/fluent/func/src/test/java/io/serverlessworkflow/fluent/func/FuncDSLTest.java index 08c2aa003..1f518e99c 100644 --- a/experimental/fluent/func/src/test/java/io/serverlessworkflow/fluent/func/FuncDSLTest.java +++ b/experimental/fluent/func/src/test/java/io/serverlessworkflow/fluent/func/FuncDSLTest.java @@ -20,6 +20,7 @@ import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.emit; import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.function; import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.get; +import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.grpc; import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.http; import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.listen; import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.produced; @@ -31,6 +32,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; +import io.serverlessworkflow.api.types.CallGRPC; import io.serverlessworkflow.api.types.CallHTTP; import io.serverlessworkflow.api.types.Export; import io.serverlessworkflow.api.types.FlowDirectiveEnum; @@ -612,4 +614,58 @@ void subflow_named_with_workflow_spec_creates_named_run_task() { assertEquals("2.0.0", runWorkflow.getWorkflow().getVersion()); assertEquals(true, runWorkflow.isAwait(), "Await should be true"); } + + @Test + @DisplayName("grpc(name).proto().service().method().argument() builds CallGRPC task") + void grpc_step_builds_call_grpc_task() { + Workflow wf = + FuncWorkflowBuilder.workflow("grpc-dsl-test") + .tasks( + call( + "greet", + grpc("greet") + .proto("proto/greeter.proto") + .service("Greeter", "localhost") + .method("SayHello") + .argument("name", "World"))) + .build(); + + List items = wf.getDo(); + assertEquals(1, items.size()); + assertEquals("greet", items.get(0).getName()); + + Task t = items.get(0).getTask(); + assertNotNull(t.getCallTask(), "CallTask expected for gRPC call"); + assertInstanceOf(CallGRPC.class, t.getCallTask().get(), "Should be CallGRPC"); + + CallGRPC grpcArgs = (CallGRPC) t.getCallTask().get(); + assertNotNull(grpcArgs.getWith().getProto(), "Proto should be set"); + assertEquals("Greeter", grpcArgs.getWith().getService().getName()); + assertEquals("localhost", grpcArgs.getWith().getService().getHost()); + assertEquals("SayHello", grpcArgs.getWith().getMethod()); + assertEquals("World", grpcArgs.getWith().getArguments().getAdditionalProperties().get("name")); + } + + @Test + @DisplayName("call(grpc().proto().service().method().argument(...)) unnamed gRPC task") + void grpc_unnamed_call_builds_task() { + Workflow wf = + FuncWorkflowBuilder.workflow("grpc-dsl-unnamed") + .tasks( + call( + grpc() + .proto("proto/svc.proto") + .service("Svc", "host") + .method("Call") + .arguments(Map.of("k", "v")))) + .build(); + + List items = wf.getDo(); + assertEquals(1, items.size()); + + Task t = items.get(0).getTask(); + assertNotNull(t.getCallTask()); + assertInstanceOf(CallGRPC.class, t.getCallTask().get()); + assertEquals("Call", ((CallGRPC) t.getCallTask().get()).getWith().getMethod()); + } } diff --git a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/TaskItemListBuilder.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/TaskItemListBuilder.java index c7ea88670..d227718f3 100644 --- a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/TaskItemListBuilder.java +++ b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/TaskItemListBuilder.java @@ -18,6 +18,7 @@ import io.serverlessworkflow.api.types.CallTask; import io.serverlessworkflow.api.types.Task; import io.serverlessworkflow.api.types.TaskItem; +import io.serverlessworkflow.fluent.spec.configurers.CallGrpcConfigurer; import io.serverlessworkflow.fluent.spec.spi.DoFluent; import java.util.List; import java.util.function.Consumer; @@ -168,6 +169,11 @@ public TaskItemListBuilder grpc(String name, Consumer items return addTaskItem(new TaskItem(name, task)); } + public TaskItemListBuilder grpc(String name, CallGrpcConfigurer configurer) { + Consumer consumer = configurer; + return grpc(name, consumer); + } + @Override public TaskItemListBuilder workflow(String name, Consumer itemsConfigurer) { name = defaultNameAndRequireConfig(name, itemsConfigurer, TYPE_WORKFLOW); diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcBiDirectionalStreamingTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcBiDirectionalStreamingTest.java index bbde7cf0d..f4e426ef3 100644 --- a/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcBiDirectionalStreamingTest.java +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcBiDirectionalStreamingTest.java @@ -15,32 +15,43 @@ */ package io.serverlessworkflow.impl.test.grpc; +import static io.serverlessworkflow.api.WorkflowReader.readWorkflowFromClasspath; +import static io.serverlessworkflow.fluent.spec.dsl.DSL.*; +import static org.assertj.core.api.Assertions.assertThat; + import io.grpc.Server; import io.grpc.ServerBuilder; import io.serverlessworkflow.api.WorkflowReader; import io.serverlessworkflow.api.types.Workflow; +import io.serverlessworkflow.fluent.spec.WorkflowBuilder; import io.serverlessworkflow.impl.WorkflowApplication; import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowDefinitionId; import io.serverlessworkflow.impl.WorkflowModel; import io.serverlessworkflow.impl.test.grpc.handlers.ContributorBiDiStreamingHandler; import io.serverlessworkflow.impl.test.junit.DisabledIfProtocUnavailable; import java.io.IOException; import java.util.Collection; import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; @DisabledIfProtocUnavailable public class GrpcBiDirectionalStreamingTest { private static final int PORT_FOR_EXAMPLES = 5011; - private static WorkflowApplication app; - private static Server server; + private WorkflowApplication app; + private Server server; - @BeforeAll - static void setUpApp() throws IOException { + @BeforeEach + void setUp() throws IOException { server = ServerBuilder.forPort(PORT_FOR_EXAMPLES) .addService(new ContributorBiDiStreamingHandler()) @@ -51,8 +62,14 @@ static void setUpApp() throws IOException { } @AfterEach - void cleanup() throws InterruptedException { - server.shutdown().awaitTermination(); + void tearDown() throws InterruptedException { + if (server != null) { + server.shutdownNow(); + server.awaitTermination(10, TimeUnit.SECONDS); + } + if (app != null) { + app.close(); + } } @Test @@ -64,17 +81,60 @@ void grpcContributors() throws IOException { WorkflowDefinition workflowDefinition = app.workflowDefinition(workflow); - String filename = - getClass() - .getClassLoader() - .getResource("workflows-samples/grpc/proto/contributors.proto") - .getFile(); + String protoFilePath = + java.util.Objects.requireNonNull( + getClass() + .getClassLoader() + .getResource("workflows-samples/grpc/proto/contributors.proto")) + .toString(); WorkflowModel model = - workflowDefinition.instance(Map.of("protoFilePath", "file://" + filename)).start().join(); + workflowDefinition.instance(Map.of("protoFilePath", protoFilePath)).start().join(); Collection collection = model.asCollection(); Assertions.assertThat(collection).hasSize(5); } + + @ParameterizedTest(name = "{0}") + @MethodSource("contributorsBidiStreamSources") + void testContributorsBidiStreamDsl(String sourceName, Workflow workflow) throws IOException { + String protoFilePath = + java.util.Objects.requireNonNull( + getClass() + .getClassLoader() + .getResource("workflows-samples/grpc/proto/contributors.proto")) + .toString(); + + WorkflowModel model = + app.workflowDefinition(workflow) + .instance(Map.of("protoFilePath", protoFilePath)) + .start() + .join(); + + Collection collection = model.asCollection(); + + assertThat(collection).hasSize(5); + } + + private static Stream contributorsBidiStreamSources() throws IOException { + return Stream.of( + readWorkflowFromClasspath("workflows-samples/grpc/contributors-bidi-stream-call.yaml"), + contributorsBidiStreamWorkflow()) + .map(w -> Arguments.of(WorkflowDefinitionId.of(w).toString(), w)); + } + + private static Workflow contributorsBidiStreamWorkflow() { + return WorkflowBuilder.workflow("grpc-example", "test", "0.1.0") + .tasks( + doTasks( + call( + "greet", + grpc() + .proto("workflows-samples/grpc/proto/contributors.proto") + .service("BiDirectionalStreaming", "localhost", PORT_FOR_EXAMPLES) + .method("CreateContributor") + .argument("github", "dependabot[bot]")))) + .build(); + } } diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcClientStreamingTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcClientStreamingTest.java index f22c34d5a..0a0e98f9f 100644 --- a/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcClientStreamingTest.java +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcClientStreamingTest.java @@ -15,31 +15,42 @@ */ package io.serverlessworkflow.impl.test.grpc; +import static io.serverlessworkflow.api.WorkflowReader.readWorkflowFromClasspath; +import static io.serverlessworkflow.fluent.spec.dsl.DSL.*; +import static org.assertj.core.api.Assertions.assertThat; + import io.grpc.Server; import io.grpc.ServerBuilder; import io.serverlessworkflow.api.WorkflowReader; import io.serverlessworkflow.api.types.Workflow; +import io.serverlessworkflow.fluent.spec.WorkflowBuilder; import io.serverlessworkflow.impl.WorkflowApplication; import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowDefinitionId; import io.serverlessworkflow.impl.test.grpc.handlers.ContributorClientStreamingHandler; import io.serverlessworkflow.impl.test.junit.DisabledIfProtocUnavailable; import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; @DisabledIfProtocUnavailable public class GrpcClientStreamingTest { private static final int PORT_FOR_EXAMPLES = 5011; - private static WorkflowApplication app; - private static Server server; + private WorkflowApplication app; + private Server server; - @BeforeAll - static void setUpApp() throws IOException { + @BeforeEach + void setUp() throws IOException { server = ServerBuilder.forPort(PORT_FOR_EXAMPLES) .addService(new ContributorClientStreamingHandler()) @@ -50,8 +61,14 @@ static void setUpApp() throws IOException { } @AfterEach - void cleanup() throws InterruptedException { - server.shutdown().awaitTermination(); + void tearDown() throws InterruptedException { + if (server != null) { + server.shutdownNow(); + server.awaitTermination(10, TimeUnit.SECONDS); + } + if (app != null) { + app.close(); + } } @Test @@ -70,4 +87,37 @@ void grpcPerson() throws IOException { Assertions.assertThat(list).isNotEmpty(); } + + @ParameterizedTest(name = "{0}") + @MethodSource("contributorsClientStreamSources") + void testContributorsClientStreamDsl(String sourceName, Workflow workflow) { + List> list = + app.workflowDefinition(workflow).instance(Map.of()).start().join().asCollection().stream() + .map(m -> m.asMap().orElseThrow()) + .toList(); + + assertThat(list).isNotEmpty(); + } + + private static Stream contributorsClientStreamSources() throws IOException { + return Stream.of( + readWorkflowFromClasspath( + "workflows-samples/grpc/contributors-client-stream-call.yaml"), + contributorsClientStreamWorkflow()) + .map(w -> Arguments.of(WorkflowDefinitionId.of(w).toString(), w)); + } + + private static Workflow contributorsClientStreamWorkflow() { + return WorkflowBuilder.workflow("grpc-example", "test", "0.1.0") + .tasks( + doTasks( + call( + "greet", + grpc() + .proto("workflows-samples/grpc/proto/contributors.proto") + .service("ClientStreaming", "localhost", PORT_FOR_EXAMPLES) + .method("CreateContributor") + .argument("github", "dependabot[bot]")))) + .build(); + } } diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcServerStreamingTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcServerStreamingTest.java index 5f18ca670..9d871d507 100644 --- a/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcServerStreamingTest.java +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcServerStreamingTest.java @@ -15,32 +15,44 @@ */ package io.serverlessworkflow.impl.test.grpc; +import static io.serverlessworkflow.api.WorkflowReader.readWorkflowFromClasspath; +import static io.serverlessworkflow.fluent.spec.dsl.DSL.*; +import static org.assertj.core.api.Assertions.assertThat; + import io.grpc.Server; import io.grpc.ServerBuilder; import io.serverlessworkflow.api.WorkflowReader; import io.serverlessworkflow.api.types.Workflow; +import io.serverlessworkflow.fluent.spec.WorkflowBuilder; import io.serverlessworkflow.impl.WorkflowApplication; import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowDefinitionId; import io.serverlessworkflow.impl.WorkflowModel; import io.serverlessworkflow.impl.test.grpc.handlers.ContributorServerStreamingHandler; import io.serverlessworkflow.impl.test.junit.DisabledIfProtocUnavailable; import java.io.IOException; import java.util.Collection; import java.util.Map; +import java.util.Objects; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; @DisabledIfProtocUnavailable public class GrpcServerStreamingTest { private static final int PORT_FOR_EXAMPLES = 5011; - private static WorkflowApplication app; - private static Server server; + private WorkflowApplication app; + private Server server; - @BeforeAll - static void setUpApp() throws IOException { + @BeforeEach + void setUp() throws IOException { server = ServerBuilder.forPort(PORT_FOR_EXAMPLES) .addService(new ContributorServerStreamingHandler()) @@ -51,8 +63,14 @@ static void setUpApp() throws IOException { } @AfterEach - void cleanup() throws InterruptedException { - server.shutdown().awaitTermination(); + void tearDown() throws InterruptedException { + if (server != null) { + server.shutdownNow(); + server.awaitTermination(10, TimeUnit.SECONDS); + } + if (app != null) { + app.close(); + } } @Test @@ -64,17 +82,61 @@ void grpcContributors() throws IOException { WorkflowDefinition workflowDefinition = app.workflowDefinition(workflow); - String filename = - getClass() - .getClassLoader() - .getResource("workflows-samples/grpc/proto/contributors.proto") - .getFile(); + String protoFilePath = + Objects.requireNonNull( + getClass() + .getClassLoader() + .getResource("workflows-samples/grpc/proto/contributors.proto")) + .toString(); WorkflowModel model = - workflowDefinition.instance(Map.of("protoFilePath", "file://" + filename)).start().join(); + workflowDefinition.instance(Map.of("protoFilePath", protoFilePath)).start().join(); Collection collection = model.asCollection(); Assertions.assertThat(collection).hasSize(5); } + + @ParameterizedTest(name = "{0}") + @MethodSource("contributorsServerStreamSources") + void testContributorsServerStreamDsl(String sourceName, Workflow workflow) throws IOException { + String protoFilePath = + Objects.requireNonNull( + getClass() + .getClassLoader() + .getResource("workflows-samples/grpc/proto/contributors.proto")) + .toString(); + + WorkflowModel model = + app.workflowDefinition(workflow) + .instance(Map.of("protoFilePath", protoFilePath)) + .start() + .join(); + + Collection collection = model.asCollection(); + + assertThat(collection).hasSize(5); + } + + private static Stream contributorsServerStreamSources() throws IOException { + return Stream.of( + readWorkflowFromClasspath( + "workflows-samples/grpc/contributors-server-stream-call.yaml"), + contributorsServerStreamWorkflow()) + .map(w -> Arguments.of(WorkflowDefinitionId.of(w).toString(), w)); + } + + private static Workflow contributorsServerStreamWorkflow() { + return WorkflowBuilder.workflow("grpc-example", "test", "0.1.0") + .tasks( + doTasks( + call( + "greet", + grpc() + .proto("workflows-samples/grpc/proto/contributors.proto") + .service("ServerStreaming", "localhost", PORT_FOR_EXAMPLES) + .method("CreateContributor") + .argument("github", "dependabot[bot]")))) + .build(); + } } diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcUnaryArgsExprTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcUnaryArgsExprTest.java index 594b481e5..b2ea54884 100644 --- a/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcUnaryArgsExprTest.java +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcUnaryArgsExprTest.java @@ -15,30 +15,42 @@ */ package io.serverlessworkflow.impl.test.grpc; +import static io.serverlessworkflow.api.WorkflowReader.readWorkflowFromClasspath; +import static io.serverlessworkflow.fluent.spec.dsl.DSL.*; +import static org.assertj.core.api.Assertions.assertThat; + import io.grpc.Server; import io.grpc.ServerBuilder; import io.serverlessworkflow.api.WorkflowReader; import io.serverlessworkflow.api.types.Workflow; +import io.serverlessworkflow.fluent.spec.WorkflowBuilder; import io.serverlessworkflow.impl.WorkflowApplication; import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowDefinitionId; +import io.serverlessworkflow.impl.jackson.JsonUtils; import io.serverlessworkflow.impl.test.grpc.handlers.ContributorUnaryArgsExprHandler; import io.serverlessworkflow.impl.test.junit.DisabledIfProtocUnavailable; import java.io.IOException; import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; @DisabledIfProtocUnavailable public class GrpcUnaryArgsExprTest { private static final int PORT_FOR_EXAMPLES = 5011; - private static WorkflowApplication app; - private static Server server; + private WorkflowApplication app; + private Server server; - @BeforeAll - static void setUpApp() throws IOException { + @BeforeEach + void setUp() throws IOException { server = ServerBuilder.forPort(PORT_FOR_EXAMPLES) .addService(new ContributorUnaryArgsExprHandler()) @@ -49,8 +61,14 @@ static void setUpApp() throws IOException { } @AfterEach - void cleanup() throws InterruptedException { - server.shutdown().awaitTermination(); + void tearDown() throws InterruptedException { + if (server != null) { + server.shutdownNow(); + server.awaitTermination(10, TimeUnit.SECONDS); + } + if (app != null) { + app.close(); + } } @Test @@ -72,4 +90,40 @@ void grpcPerson() throws IOException { Assertions.assertThat(output).contains(Map.entry("message", "Success with bootable[origin]")); } + + @ParameterizedTest(name = "{0}") + @MethodSource("contributorsUnaryArgsExprSources") + void testContributorsUnaryArgsExprDsl(String sourceName, Workflow workflow) { + Map output = + app.workflowDefinition(workflow) + .instance(Map.of("github", "bootable[origin]")) + .start() + .thenApply( + model -> (Map) JsonUtils.toJavaValue(JsonUtils.modelToJson(model))) + .join(); + + assertThat(output).contains(Map.entry("message", "Success with bootable[origin]")); + } + + private static Stream contributorsUnaryArgsExprSources() throws IOException { + return Stream.of( + readWorkflowFromClasspath( + "workflows-samples/grpc/contributors-unary-args-expr-call.yaml"), + contributorsUnaryArgsExprWorkflow()) + .map(w -> Arguments.of(WorkflowDefinitionId.of(w).toString(), w)); + } + + private static Workflow contributorsUnaryArgsExprWorkflow() { + return WorkflowBuilder.workflow("grpc-example", "test", "0.1.0") + .tasks( + doTasks( + call( + "greet", + grpc() + .proto("workflows-samples/grpc/proto/contributors.proto") + .service("UnaryArgsExpr", "localhost", PORT_FOR_EXAMPLES) + .method("CreateContributor") + .argument("github", "${ .github }")))) + .build(); + } } diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcUnaryTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcUnaryTest.java index bce72a72e..498f7c951 100644 --- a/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcUnaryTest.java +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcUnaryTest.java @@ -15,30 +15,42 @@ */ package io.serverlessworkflow.impl.test.grpc; +import static io.serverlessworkflow.api.WorkflowReader.readWorkflowFromClasspath; +import static io.serverlessworkflow.fluent.spec.dsl.DSL.*; +import static org.assertj.core.api.Assertions.assertThat; + import io.grpc.Server; import io.grpc.ServerBuilder; import io.serverlessworkflow.api.WorkflowReader; import io.serverlessworkflow.api.types.Workflow; +import io.serverlessworkflow.fluent.spec.WorkflowBuilder; import io.serverlessworkflow.impl.WorkflowApplication; import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowDefinitionId; +import io.serverlessworkflow.impl.jackson.JsonUtils; import io.serverlessworkflow.impl.test.grpc.handlers.PersonUnaryHandler; import io.serverlessworkflow.impl.test.junit.DisabledIfProtocUnavailable; import java.io.IOException; import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; @DisabledIfProtocUnavailable public class GrpcUnaryTest { private static final int PORT_FOR_EXAMPLES = 5011; - private static WorkflowApplication app; - private static Server server; + private WorkflowApplication app; + private Server server; - @BeforeAll - static void setUpApp() throws IOException { + @BeforeEach + void setUp() throws IOException { server = ServerBuilder.forPort(PORT_FOR_EXAMPLES).addService(new PersonUnaryHandler()).build(); server.start(); @@ -46,8 +58,14 @@ static void setUpApp() throws IOException { } @AfterEach - void cleanup() throws InterruptedException { - server.shutdown().awaitTermination(); + void tearDown() throws InterruptedException { + if (server != null) { + server.shutdownNow(); + server.awaitTermination(10, TimeUnit.SECONDS); + } + if (app != null) { + app.close(); + } } @Test @@ -58,15 +76,16 @@ void grpcPerson() throws IOException { WorkflowDefinition workflowDefinition = app.workflowDefinition(workflow); - String filename = - getClass() - .getClassLoader() - .getResource("workflows-samples/grpc/proto/person.proto") - .getFile(); + String protoFilePath = + java.util.Objects.requireNonNull( + getClass() + .getClassLoader() + .getResource("workflows-samples/grpc/proto/person.proto")) + .toString(); Map output = workflowDefinition - .instance(Map.of("protoFilePath", "file://" + filename)) + .instance(Map.of("protoFilePath", protoFilePath)) .start() .join() .asMap() @@ -74,4 +93,45 @@ void grpcPerson() throws IOException { Assertions.assertThat(output).contains(Map.entry("name", "John Doe"), Map.entry("id", 891182)); } + + @ParameterizedTest(name = "{0}") + @MethodSource("getPersonCallSources") + void testGetPersonCallDsl(String sourceName, Workflow workflow) throws IOException { + String protoFilePath = + java.util.Objects.requireNonNull( + getClass() + .getClassLoader() + .getResource("workflows-samples/grpc/proto/person.proto")) + .toString(); + + Map output = + app.workflowDefinition(workflow) + .instance(Map.of("protoFilePath", protoFilePath)) + .start() + .thenApply( + model -> (Map) JsonUtils.toJavaValue(JsonUtils.modelToJson(model))) + .join(); + + assertThat(output).contains(Map.entry("name", "John Doe"), Map.entry("id", 891182)); + } + + private static Stream getPersonCallSources() throws IOException { + return Stream.of( + readWorkflowFromClasspath("workflows-samples/grpc/get-person-call.yaml"), + getPersonCallWorkflow()) + .map(w -> Arguments.of(WorkflowDefinitionId.of(w).toString(), w)); + } + + private static Workflow getPersonCallWorkflow() { + return WorkflowBuilder.workflow("grpc-example", "test", "0.1.0") + .tasks( + doTasks( + call( + "greet", + grpc() + .proto("workflows-samples/grpc/proto/person.proto") + .service("Person", "localhost", PORT_FOR_EXAMPLES) + .method("GetPerson")))) + .build(); + } }