diff --git a/iwf-idl b/iwf-idl index 5a96d62..b249c5e 160000 --- a/iwf-idl +++ b/iwf-idl @@ -1 +1 @@ -Subproject commit 5a96d629c6e337cdc8f363dc52a6965e02a1c0ec +Subproject commit b249c5e81756d75433acc947d1552af5efa0fb55 diff --git a/src/main/java/io/iworkflow/core/Client.java b/src/main/java/io/iworkflow/core/Client.java index 1dfd949..f923c8c 100644 --- a/src/main/java/io/iworkflow/core/Client.java +++ b/src/main/java/io/iworkflow/core/Client.java @@ -5,17 +5,7 @@ import io.iworkflow.core.exceptions.WorkflowAlreadyStartedException; import io.iworkflow.core.exceptions.WorkflowNotExistsException; import io.iworkflow.core.persistence.PersistenceOptions; -import io.iworkflow.gen.models.ErrorSubStatus; -import io.iworkflow.gen.models.KeyValue; -import io.iworkflow.gen.models.SearchAttribute; -import io.iworkflow.gen.models.SearchAttributeKeyAndType; -import io.iworkflow.gen.models.SearchAttributeValueType; -import io.iworkflow.gen.models.StateCompletionOutput; -import io.iworkflow.gen.models.WorkflowGetDataObjectsResponse; -import io.iworkflow.gen.models.WorkflowGetResponse; -import io.iworkflow.gen.models.WorkflowGetSearchAttributesResponse; -import io.iworkflow.gen.models.WorkflowSearchRequest; -import io.iworkflow.gen.models.WorkflowSearchResponse; +import io.iworkflow.gen.models.*; import io.iworkflow.gen.models.WorkflowStateOptions; import net.bytebuddy.ByteBuddy; import net.bytebuddy.implementation.MethodDelegation; @@ -25,11 +15,7 @@ import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; +import java.util.*; import java.util.stream.Collectors; import static io.iworkflow.core.WorkflowState.shouldSkipWaitUntil; @@ -505,6 +491,94 @@ public void signalWorkflow( signalWorkflow(workflowClass, workflowId, "", signalChannelName, signalValue); } + /** + * Send a single empty message to internalChannel + * + * @param workflowClass required + * @param workflowId required + * @param internalChannelName required + * @throws NoRunningWorkflowException if the workflow is not existing or not running + */ + public void publishToInternalChannel( + final Class workflowClass, + final String workflowId, + final String internalChannelName) { + publishToInternalChannel(workflowClass, workflowId, "", internalChannelName, null); + } + + /** + * Send a single message to internalChannel + * + * @param workflowClass required + * @param workflowId required + * @param internalChannelName required + * @param channelMessage optional, can be null. + * @throws NoRunningWorkflowException if the workflow is not existing or not running + */ + public void publishToInternalChannel( + final Class workflowClass, + final String workflowId, + final String internalChannelName, + final Object channelMessage) { + publishToInternalChannel(workflowClass, workflowId, "", internalChannelName, channelMessage); + } + + /** + * Send a single message to internalChannel + * + * @param workflowClass required + * @param workflowId required + * @param workflowRunId optional, can be empty + * @param internalChannelName required + * @param channelMessage optional, can be null. + * @throws NoRunningWorkflowException if the workflow is not existing or not running + */ + public void publishToInternalChannel( + final Class workflowClass, + final String workflowId, + final String workflowRunId, + final String internalChannelName, + final Object channelMessage) { + publishToInternalChannelBatch(workflowClass, workflowId, workflowRunId, internalChannelName, Arrays.asList(channelMessage)); + } + + /** + * Send a batch of messages to internalChannel + * + * @param workflowClass required + * @param workflowId required + * @param workflowRunId optional, can be empty + * @param internalChannelName required + * @param channelMessages messages in batch + * @throws NoRunningWorkflowException if the workflow is not existing or not running + */ + public void publishToInternalChannelBatch( + final Class workflowClass, + final String workflowId, + final String workflowRunId, + final String internalChannelName, + final List channelMessages) { + final String wfType = workflowClass.getSimpleName(); + + checkWorkflowTypeExists(wfType); + + final Class channelValueType = registry.getInternalChannelTypeStore(wfType).getType(internalChannelName); + + List rawMessages = new ArrayList<>(channelMessages.size()); + for (Object channelValue : channelMessages) { + if (channelValue != null && !channelValueType.isInstance(channelValue)) { + throw new IllegalArgumentException(String.format("message value is not of channel type %s", channelValueType.getName())); + } + rawMessages.add( + new InterStateChannelPublishing() + .channelName(internalChannelName) + .value(clientOptions.getObjectEncoder().encode(channelValue)) + ); + } + + unregisteredClient.publishToInternalChannel(workflowId, workflowRunId, rawMessages); + } + /** * @param workflowId required * @param workflowRunId optional, can be empty diff --git a/src/main/java/io/iworkflow/core/UnregisteredClient.java b/src/main/java/io/iworkflow/core/UnregisteredClient.java index 825339a..da44e5a 100644 --- a/src/main/java/io/iworkflow/core/UnregisteredClient.java +++ b/src/main/java/io/iworkflow/core/UnregisteredClient.java @@ -8,34 +8,7 @@ import io.iworkflow.core.validator.CronScheduleValidator; import io.iworkflow.gen.api.ApiClient; import io.iworkflow.gen.api.DefaultApi; -import io.iworkflow.gen.models.EncodedObject; -import io.iworkflow.gen.models.KeyValue; -import io.iworkflow.gen.models.PersistenceLoadingPolicy; -import io.iworkflow.gen.models.SearchAttribute; -import io.iworkflow.gen.models.SearchAttributeKeyAndType; -import io.iworkflow.gen.models.StateCompletionOutput; -import io.iworkflow.gen.models.WorkflowGetDataObjectsRequest; -import io.iworkflow.gen.models.WorkflowGetDataObjectsResponse; -import io.iworkflow.gen.models.WorkflowGetRequest; -import io.iworkflow.gen.models.WorkflowGetResponse; -import io.iworkflow.gen.models.WorkflowGetSearchAttributesRequest; -import io.iworkflow.gen.models.WorkflowGetSearchAttributesResponse; -import io.iworkflow.gen.models.WorkflowResetRequest; -import io.iworkflow.gen.models.WorkflowResetResponse; -import io.iworkflow.gen.models.WorkflowRpcRequest; -import io.iworkflow.gen.models.WorkflowRpcResponse; -import io.iworkflow.gen.models.WorkflowSearchRequest; -import io.iworkflow.gen.models.WorkflowSearchResponse; -import io.iworkflow.gen.models.WorkflowSetDataObjectsRequest; -import io.iworkflow.gen.models.WorkflowSetSearchAttributesRequest; -import io.iworkflow.gen.models.WorkflowSignalRequest; -import io.iworkflow.gen.models.WorkflowSkipTimerRequest; -import io.iworkflow.gen.models.WorkflowStartOptions; -import io.iworkflow.gen.models.WorkflowStartRequest; -import io.iworkflow.gen.models.WorkflowStartResponse; -import io.iworkflow.gen.models.WorkflowStatus; -import io.iworkflow.gen.models.WorkflowStopRequest; -import io.iworkflow.gen.models.WorkflowWaitForStateCompletionRequest; +import io.iworkflow.gen.models.*; import java.util.List; import java.util.Map; @@ -449,6 +422,23 @@ public void signalWorkflow( } } + public void publishToInternalChannel( + final String workflowId, + final String workflowRunId, + final List messages){ + + try { + defaultApi.apiV1WorkflowPublishToInternalChannelPost( + new PublishToInternalChannelRequest() + .messages(messages) + .workflowId(workflowId) + .workflowRunId(workflowRunId) + ); + } catch (FeignException.FeignClientException exp) { + throw IwfHttpException.fromFeignException(clientOptions.getObjectEncoder(), exp); + } + } + /** * @param workflowId workflowId * @param workflowRunId workflowRunId diff --git a/src/test/java/io/iworkflow/integ/InternalChannelTest.java b/src/test/java/io/iworkflow/integ/InternalChannelTest.java index 29017dd..d83c066 100644 --- a/src/test/java/io/iworkflow/integ/InternalChannelTest.java +++ b/src/test/java/io/iworkflow/integ/InternalChannelTest.java @@ -3,12 +3,14 @@ import io.iworkflow.core.Client; import io.iworkflow.core.ClientOptions; import io.iworkflow.integ.internalchannel.BasicInternalChannelWorkflow; +import io.iworkflow.integ.internalchannel.WaitingInternalChannelWorkflow; import io.iworkflow.spring.TestSingletonWorkerService; import io.iworkflow.spring.controller.WorkflowRegistry; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import java.util.Arrays; import java.util.concurrent.ExecutionException; public class InternalChannelTest { @@ -28,4 +30,16 @@ public void testBasicInternalWorkflow() throws InterruptedException { final Integer output = client.getSimpleWorkflowResultWithWait(Integer.class, wfId); Assertions.assertEquals(3, output); } + + @Test + public void testWaitingInternalWorkflow() throws InterruptedException { + final Client client = new Client(WorkflowRegistry.registry, ClientOptions.localDefault); + final String wfId = "waiting-internal-test-id" + System.currentTimeMillis() / 1000; + final Integer input = 1; + final String runId = client.startWorkflow( + WaitingInternalChannelWorkflow.class, wfId, 10, input); + client.publishToInternalChannelBatch(WaitingInternalChannelWorkflow.class, wfId, runId, WaitingInternalChannelWorkflow.INTER_STATE_CHANNEL_NAME, Arrays.asList(2, 3)); + final Integer output = client.getSimpleWorkflowResultWithWait(Integer.class, wfId); + Assertions.assertEquals(6, output); + } } diff --git a/src/test/java/io/iworkflow/integ/internalchannel/WaitingInternalChannelWorkflow.java b/src/test/java/io/iworkflow/integ/internalchannel/WaitingInternalChannelWorkflow.java new file mode 100644 index 0000000..62e2fe7 --- /dev/null +++ b/src/test/java/io/iworkflow/integ/internalchannel/WaitingInternalChannelWorkflow.java @@ -0,0 +1,29 @@ +package io.iworkflow.integ.internalchannel; + +import io.iworkflow.core.ObjectWorkflow; +import io.iworkflow.core.StateDef; +import io.iworkflow.core.communication.CommunicationMethodDef; +import io.iworkflow.core.communication.InternalChannelDef; +import org.springframework.stereotype.Component; + +import java.util.Arrays; +import java.util.List; + +@Component +public class WaitingInternalChannelWorkflow implements ObjectWorkflow { + public static final String INTER_STATE_CHANNEL_NAME = "test-inter-state-channel-1"; + + @Override + public List getCommunicationSchema() { + return Arrays.asList( + InternalChannelDef.create(Integer.class, INTER_STATE_CHANNEL_NAME) + ); + } + + @Override + public List getWorkflowStates() { + return Arrays.asList( + StateDef.startingState(new WaitingInternalChannelWorkflowState()) + ); + } +} diff --git a/src/test/java/io/iworkflow/integ/internalchannel/WaitingInternalChannelWorkflowState.java b/src/test/java/io/iworkflow/integ/internalchannel/WaitingInternalChannelWorkflowState.java new file mode 100644 index 0000000..94284db --- /dev/null +++ b/src/test/java/io/iworkflow/integ/internalchannel/WaitingInternalChannelWorkflowState.java @@ -0,0 +1,46 @@ +package io.iworkflow.integ.internalchannel; + +import io.iworkflow.core.Context; +import io.iworkflow.core.StateDecision; +import io.iworkflow.core.WorkflowState; +import io.iworkflow.core.command.CommandRequest; +import io.iworkflow.core.command.CommandResults; +import io.iworkflow.core.communication.Communication; +import io.iworkflow.core.communication.InternalChannelCommand; +import io.iworkflow.core.communication.InternalChannelCommandResult; +import io.iworkflow.core.persistence.Persistence; + +public class WaitingInternalChannelWorkflowState implements WorkflowState { + + @Override + public Class getInputType() { + return Integer.class; + } + + @Override + public CommandRequest waitUntil( + Context context, + Integer input, + Persistence persistence, + final Communication communication) { + return CommandRequest.forAllCommandCompleted( + InternalChannelCommand.create(WaitingInternalChannelWorkflow.INTER_STATE_CHANNEL_NAME), + InternalChannelCommand.create(WaitingInternalChannelWorkflow.INTER_STATE_CHANNEL_NAME) + ); + } + + @Override + public StateDecision execute( + Context context, + Integer input, + CommandResults commandResults, + Persistence persistence, + final Communication communication) { + final InternalChannelCommandResult result1 = commandResults.getAllInternalChannelCommandResult().get(0); + final InternalChannelCommandResult result2 = commandResults.getAllInternalChannelCommandResult().get(1); + + Integer output = input + (Integer) result1.getValue().get() + (Integer) result2.getValue().get(); + + return StateDecision.gracefulCompleteWorkflow(output); + } +}