Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion iwf-idl
106 changes: 90 additions & 16 deletions src/main/java/io/iworkflow/core/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,7 @@
import io.iworkflow.core.exceptions.WorkflowAlreadyStartedException;
import io.iworkflow.core.exceptions.WorkflowNotExistsException;
import io.iworkflow.core.persistence.PersistenceOptions;
import io.iworkflow.gen.models.ErrorSubStatus;
import io.iworkflow.gen.models.KeyValue;
import io.iworkflow.gen.models.SearchAttribute;
import io.iworkflow.gen.models.SearchAttributeKeyAndType;
import io.iworkflow.gen.models.SearchAttributeValueType;
import io.iworkflow.gen.models.StateCompletionOutput;
import io.iworkflow.gen.models.WorkflowGetDataObjectsResponse;
import io.iworkflow.gen.models.WorkflowGetResponse;
import io.iworkflow.gen.models.WorkflowGetSearchAttributesResponse;
import io.iworkflow.gen.models.WorkflowSearchRequest;
import io.iworkflow.gen.models.WorkflowSearchResponse;
import io.iworkflow.gen.models.*;
import io.iworkflow.gen.models.WorkflowStateOptions;
import net.bytebuddy.ByteBuddy;
import net.bytebuddy.implementation.MethodDelegation;
Expand All @@ -25,11 +15,7 @@

import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.*;
import java.util.stream.Collectors;

import static io.iworkflow.core.WorkflowState.shouldSkipWaitUntil;
Expand Down Expand Up @@ -505,6 +491,94 @@ public void signalWorkflow(
signalWorkflow(workflowClass, workflowId, "", signalChannelName, signalValue);
}

/**
* Send a single empty message to internalChannel
*
* @param workflowClass required
* @param workflowId required
* @param internalChannelName required
* @throws NoRunningWorkflowException if the workflow is not existing or not running
*/
public void publishToInternalChannel(
final Class<? extends ObjectWorkflow> workflowClass,
final String workflowId,
final String internalChannelName) {
publishToInternalChannel(workflowClass, workflowId, "", internalChannelName, null);
}

/**
* Send a single message to internalChannel
*
* @param workflowClass required
* @param workflowId required
* @param internalChannelName required
* @param channelMessage optional, can be null.
* @throws NoRunningWorkflowException if the workflow is not existing or not running
*/
public void publishToInternalChannel(
final Class<? extends ObjectWorkflow> workflowClass,
final String workflowId,
final String internalChannelName,
final Object channelMessage) {
publishToInternalChannel(workflowClass, workflowId, "", internalChannelName, channelMessage);
}

/**
* Send a single message to internalChannel
*
* @param workflowClass required
* @param workflowId required
* @param workflowRunId optional, can be empty
* @param internalChannelName required
* @param channelMessage optional, can be null.
* @throws NoRunningWorkflowException if the workflow is not existing or not running
*/
public void publishToInternalChannel(
final Class<? extends ObjectWorkflow> workflowClass,
final String workflowId,
final String workflowRunId,
final String internalChannelName,
final Object channelMessage) {
publishToInternalChannelBatch(workflowClass, workflowId, workflowRunId, internalChannelName, Arrays.asList(channelMessage));
}

/**
* Send a batch of messages to internalChannel
*
* @param workflowClass required
* @param workflowId required
* @param workflowRunId optional, can be empty
* @param internalChannelName required
* @param channelMessages messages in batch
* @throws NoRunningWorkflowException if the workflow is not existing or not running
*/
public void publishToInternalChannelBatch(
final Class<? extends ObjectWorkflow> workflowClass,
final String workflowId,
final String workflowRunId,
final String internalChannelName,
final List<Object> channelMessages) {
final String wfType = workflowClass.getSimpleName();

checkWorkflowTypeExists(wfType);

final Class<?> channelValueType = registry.getInternalChannelTypeStore(wfType).getType(internalChannelName);

List<InterStateChannelPublishing> rawMessages = new ArrayList<>(channelMessages.size());
for (Object channelValue : channelMessages) {
if (channelValue != null && !channelValueType.isInstance(channelValue)) {
throw new IllegalArgumentException(String.format("message value is not of channel type %s", channelValueType.getName()));
}
rawMessages.add(
new InterStateChannelPublishing()
.channelName(internalChannelName)
.value(clientOptions.getObjectEncoder().encode(channelValue))
);
}

unregisteredClient.publishToInternalChannel(workflowId, workflowRunId, rawMessages);
}

/**
* @param workflowId required
* @param workflowRunId optional, can be empty
Expand Down
46 changes: 18 additions & 28 deletions src/main/java/io/iworkflow/core/UnregisteredClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,34 +8,7 @@
import io.iworkflow.core.validator.CronScheduleValidator;
import io.iworkflow.gen.api.ApiClient;
import io.iworkflow.gen.api.DefaultApi;
import io.iworkflow.gen.models.EncodedObject;
import io.iworkflow.gen.models.KeyValue;
import io.iworkflow.gen.models.PersistenceLoadingPolicy;
import io.iworkflow.gen.models.SearchAttribute;
import io.iworkflow.gen.models.SearchAttributeKeyAndType;
import io.iworkflow.gen.models.StateCompletionOutput;
import io.iworkflow.gen.models.WorkflowGetDataObjectsRequest;
import io.iworkflow.gen.models.WorkflowGetDataObjectsResponse;
import io.iworkflow.gen.models.WorkflowGetRequest;
import io.iworkflow.gen.models.WorkflowGetResponse;
import io.iworkflow.gen.models.WorkflowGetSearchAttributesRequest;
import io.iworkflow.gen.models.WorkflowGetSearchAttributesResponse;
import io.iworkflow.gen.models.WorkflowResetRequest;
import io.iworkflow.gen.models.WorkflowResetResponse;
import io.iworkflow.gen.models.WorkflowRpcRequest;
import io.iworkflow.gen.models.WorkflowRpcResponse;
import io.iworkflow.gen.models.WorkflowSearchRequest;
import io.iworkflow.gen.models.WorkflowSearchResponse;
import io.iworkflow.gen.models.WorkflowSetDataObjectsRequest;
import io.iworkflow.gen.models.WorkflowSetSearchAttributesRequest;
import io.iworkflow.gen.models.WorkflowSignalRequest;
import io.iworkflow.gen.models.WorkflowSkipTimerRequest;
import io.iworkflow.gen.models.WorkflowStartOptions;
import io.iworkflow.gen.models.WorkflowStartRequest;
import io.iworkflow.gen.models.WorkflowStartResponse;
import io.iworkflow.gen.models.WorkflowStatus;
import io.iworkflow.gen.models.WorkflowStopRequest;
import io.iworkflow.gen.models.WorkflowWaitForStateCompletionRequest;
import io.iworkflow.gen.models.*;

import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -449,6 +422,23 @@ public void signalWorkflow(
}
}

public void publishToInternalChannel(
final String workflowId,
final String workflowRunId,
final List<InterStateChannelPublishing> messages){

try {
defaultApi.apiV1WorkflowPublishToInternalChannelPost(
new PublishToInternalChannelRequest()
.messages(messages)
.workflowId(workflowId)
.workflowRunId(workflowRunId)
);
} catch (FeignException.FeignClientException exp) {
throw IwfHttpException.fromFeignException(clientOptions.getObjectEncoder(), exp);
}
}

/**
* @param workflowId workflowId
* @param workflowRunId workflowRunId
Expand Down
14 changes: 14 additions & 0 deletions src/test/java/io/iworkflow/integ/InternalChannelTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@
import io.iworkflow.core.Client;
import io.iworkflow.core.ClientOptions;
import io.iworkflow.integ.internalchannel.BasicInternalChannelWorkflow;
import io.iworkflow.integ.internalchannel.WaitingInternalChannelWorkflow;
import io.iworkflow.spring.TestSingletonWorkerService;
import io.iworkflow.spring.controller.WorkflowRegistry;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.Arrays;
import java.util.concurrent.ExecutionException;

public class InternalChannelTest {
Expand All @@ -28,4 +30,16 @@ public void testBasicInternalWorkflow() throws InterruptedException {
final Integer output = client.getSimpleWorkflowResultWithWait(Integer.class, wfId);
Assertions.assertEquals(3, output);
}

@Test
public void testWaitingInternalWorkflow() throws InterruptedException {
final Client client = new Client(WorkflowRegistry.registry, ClientOptions.localDefault);
final String wfId = "waiting-internal-test-id" + System.currentTimeMillis() / 1000;
final Integer input = 1;
final String runId = client.startWorkflow(
WaitingInternalChannelWorkflow.class, wfId, 10, input);
client.publishToInternalChannelBatch(WaitingInternalChannelWorkflow.class, wfId, runId, WaitingInternalChannelWorkflow.INTER_STATE_CHANNEL_NAME, Arrays.asList(2, 3));
final Integer output = client.getSimpleWorkflowResultWithWait(Integer.class, wfId);
Assertions.assertEquals(6, output);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package io.iworkflow.integ.internalchannel;

import io.iworkflow.core.ObjectWorkflow;
import io.iworkflow.core.StateDef;
import io.iworkflow.core.communication.CommunicationMethodDef;
import io.iworkflow.core.communication.InternalChannelDef;
import org.springframework.stereotype.Component;

import java.util.Arrays;
import java.util.List;

@Component
public class WaitingInternalChannelWorkflow implements ObjectWorkflow {
public static final String INTER_STATE_CHANNEL_NAME = "test-inter-state-channel-1";

@Override
public List<CommunicationMethodDef> getCommunicationSchema() {
return Arrays.asList(
InternalChannelDef.create(Integer.class, INTER_STATE_CHANNEL_NAME)
);
}

@Override
public List<StateDef> getWorkflowStates() {
return Arrays.asList(
StateDef.startingState(new WaitingInternalChannelWorkflowState())
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package io.iworkflow.integ.internalchannel;

import io.iworkflow.core.Context;
import io.iworkflow.core.StateDecision;
import io.iworkflow.core.WorkflowState;
import io.iworkflow.core.command.CommandRequest;
import io.iworkflow.core.command.CommandResults;
import io.iworkflow.core.communication.Communication;
import io.iworkflow.core.communication.InternalChannelCommand;
import io.iworkflow.core.communication.InternalChannelCommandResult;
import io.iworkflow.core.persistence.Persistence;

public class WaitingInternalChannelWorkflowState implements WorkflowState<Integer> {

@Override
public Class<Integer> getInputType() {
return Integer.class;
}

@Override
public CommandRequest waitUntil(
Context context,
Integer input,
Persistence persistence,
final Communication communication) {
return CommandRequest.forAllCommandCompleted(
InternalChannelCommand.create(WaitingInternalChannelWorkflow.INTER_STATE_CHANNEL_NAME),
InternalChannelCommand.create(WaitingInternalChannelWorkflow.INTER_STATE_CHANNEL_NAME)
);
}

@Override
public StateDecision execute(
Context context,
Integer input,
CommandResults commandResults,
Persistence persistence,
final Communication communication) {
final InternalChannelCommandResult result1 = commandResults.getAllInternalChannelCommandResult().get(0);
final InternalChannelCommandResult result2 = commandResults.getAllInternalChannelCommandResult().get(1);

Integer output = input + (Integer) result1.getValue().get() + (Integer) result2.getValue().get();

return StateDecision.gracefulCompleteWorkflow(output);
}
}
Loading