Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@
*
* @see SimulatorEndpointAdapter
*/
public class SimulationFailedUnexpectedlyException extends DefaultMessage {
public class SimulationFailedUnexpectedlyExceptionMessage extends DefaultMessage {

public static final String EXCEPTION_TYPE = SimulationFailedUnexpectedlyException.class.getSimpleName() + ":Exception";
public static final String EXCEPTION_TYPE = SimulationFailedUnexpectedlyExceptionMessage.class.getSimpleName() + ":Exception";

public SimulationFailedUnexpectedlyException(Throwable e) {
public SimulationFailedUnexpectedlyExceptionMessage(Throwable e) {
super(e);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
import static java.util.Collections.emptyList;
import static java.util.Objects.nonNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.citrusframework.simulator.endpoint.SimulationFailedUnexpectedlyException.EXCEPTION_TYPE;
import static org.citrusframework.simulator.endpoint.SimulationFailedUnexpectedlyExceptionMessage.EXCEPTION_TYPE;
import static org.citrusframework.util.StringUtils.hasText;

public class SimulatorEndpointAdapter extends RequestDispatchingEndpointAdapter {
Expand Down Expand Up @@ -79,7 +79,7 @@ protected Message handleMessageInternal(Message message) {

private Message handleMessageWithCorrelation(Message request, CorrelationHandler handler) {
CompletableFuture<Message> responseFuture = new CompletableFuture<>();
handler.getScenarioEndpoint().add(request, responseFuture);
// handler.getScenarioEndpoint().add(request, responseFuture);

return awaitResponseOrThrowException(responseFuture, handler.getScenarioEndpoint().getName());
}
Expand All @@ -98,10 +98,13 @@ public Message dispatchMessage(Message message, String mappingName) {
scenario.getScenarioEndpoint().setName(scenarioName);

CompletableFuture<Message> responseFuture = new CompletableFuture<>();
scenario.getScenarioEndpoint().add(message, responseFuture);

try {
scenarioExecutorService.run(scenario, scenarioName, emptyList());
scenarioExecutorService.run(
scenario,
scenarioName,
emptyList(),
new ScenarioExecutorService.ExecutionRequestAndResponse(message, responseFuture)
);
} catch (Exception e) {
throw getResponseStatusException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,52 +16,46 @@

package org.citrusframework.simulator.scenario;

import com.google.common.annotations.VisibleForTesting;
import jakarta.annotation.Nonnull;
import lombok.extern.slf4j.Slf4j;
import org.citrusframework.context.TestContext;
import org.citrusframework.endpoint.AbstractEndpoint;
import org.citrusframework.message.Message;
import org.citrusframework.messaging.Consumer;
import org.citrusframework.messaging.Producer;
import org.citrusframework.simulator.endpoint.EndpointMessageHandler;
import org.citrusframework.simulator.endpoint.SimulationFailedUnexpectedlyException;
import org.citrusframework.simulator.endpoint.SimulationFailedUnexpectedlyExceptionMessage;
import org.citrusframework.simulator.exception.SimulatorException;

import java.util.Stack;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import org.citrusframework.simulator.service.ScenarioExecutorService;

import static java.lang.Thread.currentThread;
import static java.util.Objects.isNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.Objects.nonNull;
import static org.citrusframework.simulator.service.runner.DefaultScenarioExecutorService.REQUEST_RESPONSE_MAPPING_VARIABLE_NAME;

@Slf4j
public class ScenarioEndpoint extends AbstractEndpoint implements Producer, Consumer {

/**
* Internal im memory message channel
*/
private final LinkedBlockingQueue<Message> channel = new LinkedBlockingQueue<>();

/**
* Stack of response futures to complete
*/
private final Stack<CompletableFuture<Message>> responseFutures = new Stack<>();

/**
* Default constructor using endpoint configuration.
*
* @param endpointConfiguration
*/
public ScenarioEndpoint(ScenarioEndpointConfiguration endpointConfiguration) {
super(endpointConfiguration);
@VisibleForTesting
static final String NO_REQUEST_RESPONSE_MAPPING_IN_TEST_CONTEXT_MESSAGE = "No request-response mapping found in test context! This may happen if you're using the deprecated `ScenarioEndpoint#fail(Throwable)` API.";

@VisibleForTesting
static final String NO_RESPONSE_FUTURE_IN_TEST_CONTEXT_MESSAGE = "Failed to match response futures to test context! This may happen if you're using the deprecated `ScenarioEndpoint#fail(Throwable)` API.";

private static @Nonnull ScenarioExecutorService.ExecutionRequestAndResponse getExecutionRequestAndResponse(TestContext testContext) {
var requestResponseMapping = testContext.getVariables().get(REQUEST_RESPONSE_MAPPING_VARIABLE_NAME);

if (nonNull(requestResponseMapping) &&
requestResponseMapping instanceof ScenarioExecutorService.ExecutionRequestAndResponse executionRequestAndResponse) {
return executionRequestAndResponse;
}

throw new SimulatorException(NO_REQUEST_RESPONSE_MAPPING_IN_TEST_CONTEXT_MESSAGE);
}

/**
* Adds new message for direct message consumption.
*
* @param request
*/
public void add(Message request, CompletableFuture<Message> future) {
responseFutures.push(future);
channel.add(request);
public ScenarioEndpoint(ScenarioEndpointConfiguration endpointConfiguration) {
super(endpointConfiguration);
}

@Override
Expand All @@ -81,38 +75,43 @@ public Message receive(TestContext context) {

@Override
public Message receive(TestContext context, long timeout) {
try {
Message message = channel.poll(timeout, MILLISECONDS);
var message = pollMessageForExecution(context, timeout);
messageReceived(message, context);

return message;
}

if (isNull(message)) {
throw new SimulatorException("Failed to receive scenario inbound message");
}
@Override
public void send(Message message, TestContext testContext) {
messageSent(message, testContext);
completeNextResponseFuture(message, testContext);
}

messageReceived(message, context);
void fail(Throwable e, TestContext testContext) {
completeNextResponseFuture(new SimulationFailedUnexpectedlyExceptionMessage(e), testContext);
}

return message;
private Message pollMessageForExecution(TestContext testContext, long timeout) {
try {
return receiveNextMessageFromChannel(testContext);
} catch (InterruptedException e) {
currentThread().interrupt();
throw new SimulatorException(e);
}
}

@Override
public void send(Message message, TestContext context) {
messageSent(message, context);
completeNextResponseFuture(message);
private Message receiveNextMessageFromChannel(TestContext testContext) throws InterruptedException {
return getExecutionRequestAndResponse(testContext).requestMessage();
}

void fail(Throwable e) {
completeNextResponseFuture(new SimulationFailedUnexpectedlyException(e));
}
private void completeNextResponseFuture(Message message, TestContext testContext) {
var responseFuture = getExecutionRequestAndResponse(testContext).responseFuture();

private void completeNextResponseFuture(Message message) {
if (responseFutures.isEmpty()) {
throw new SimulatorException("Failed to process scenario response message - missing response consumer!");
} else {
responseFutures.pop().complete(message);
if (isNull(responseFuture)) {
throw new SimulatorException(NO_RESPONSE_FUTURE_IN_TEST_CONTEXT_MESSAGE);
}

responseFuture.complete(message);
}

private void messageSent(Message message, TestContext context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,9 @@ default String getName() {
default Void registerException(Throwable e) {
if (nonNull(getTestCaseRunner()) && getTestCaseRunner() instanceof DefaultTestCaseRunner defaultTestCaseRunner) {
defaultTestCaseRunner.getContext().addException(new CitrusRuntimeException(e));
getScenarioEndpoint().fail(e, defaultTestCaseRunner.getContext());
}

getScenarioEndpoint().fail(e);

return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
package org.citrusframework.simulator.service;

import jakarta.annotation.Nullable;
import org.citrusframework.message.Message;
import org.citrusframework.simulator.model.ScenarioParameter;
import org.citrusframework.simulator.scenario.SimulatorScenario;

import java.util.List;
import java.util.concurrent.CompletableFuture;

/**
* Service capable of executing test executables. It takes care on setting up the executable before execution. The given
Expand All @@ -40,7 +42,7 @@ public interface ScenarioExecutorService {
* @param scenarioParameters the list of parameters to pass to the scenario when starting
* @return the scenario execution id
*/
Long run(String name, @Nullable List<ScenarioParameter> scenarioParameters);
Long run(String name, @Nullable List<ScenarioParameter> scenarioParameters, ExecutionRequestAndResponse executionRequestAndResponse);

/**
* Starts a new scenario instance using the collection of supplied parameters.
Expand All @@ -50,5 +52,10 @@ public interface ScenarioExecutorService {
* @param scenarioParameters the list of parameters to pass to the scenario when starting
* @return the scenario execution id
*/
Long run(SimulatorScenario scenario, String name, @Nullable List<ScenarioParameter> scenarioParameters);
Long run(SimulatorScenario scenario, String name, @Nullable List<ScenarioParameter> scenarioParameters, ExecutionRequestAndResponse executionRequestAndResponse);

record ExecutionRequestAndResponse(@Nullable Message requestMessage, @Nullable CompletableFuture<Message> responseFuture) {

public static ExecutionRequestAndResponse NOOP_EXECUTION = new ExecutionRequestAndResponse(null, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;

import static java.util.concurrent.CompletableFuture.runAsync;
Expand Down Expand Up @@ -101,30 +102,30 @@ public void onApplicationEvent(ContextClosedEvent event) {
}

/**
* Overrides the {@link DefaultScenarioExecutorService#startScenario(Long, String, SimulatorScenario, List)} method
* Overrides the {@link DefaultScenarioExecutorService#startScenario(Long, String, SimulatorScenario, List, ExecutionRequestAndResponse)} method
* to execute the scenario asynchronously using the executor service.
*
* @param executionId the unique identifier for the scenario execution
* @param name the name of the scenario to start
* @param scenario the scenario instance to execute
* @param scenarioParameters the list of parameters to pass to the scenario when starting
* @param executionId the unique identifier for the scenario execution
* @param name the name of the scenario to start
* @param scenario the scenario instance to execute
* @param scenarioParameters the list of parameters to pass to the scenario when starting
*/
@Override
public void startScenario(Long executionId, String name, SimulatorScenario scenario, List<ScenarioParameter> scenarioParameters) {
startScenarioAsync(executionId, name, scenario, scenarioParameters);
public void startScenario(Long executionId, String name, SimulatorScenario scenario, List<ScenarioParameter> scenarioParameters, ExecutionRequestAndResponse executionRequestAndResponse) {
startScenarioAsync(executionId, name, scenario, scenarioParameters, executionRequestAndResponse);
}

/**
* Submits the scenario execution task to the executor service for asynchronous execution.
*
* @param executionId the unique identifier for the scenario execution
* @param name the name of the scenario to start
* @param scenario the scenario instance to execute
* @param scenarioParameters the list of parameters to pass to the scenario when starting
* @param executionId the unique identifier for the scenario execution
* @param name the name of the scenario to start
* @param scenario the scenario instance to execute
* @param scenarioParameters the list of parameters to pass to the scenario when starting
*/
private void startScenarioAsync(Long executionId, String name, SimulatorScenario scenario, List<ScenarioParameter> scenarioParameters) {
runAsync(() -> super.startScenario(executionId, name, scenario, scenarioParameters), executorService)
.exceptionally(scenario::registerException);
private void startScenarioAsync(Long executionId, String name, SimulatorScenario scenario, List<ScenarioParameter> scenarioParameters, ExecutionRequestAndResponse executionRequestAndResponse) {
runAsync(() -> super.startScenario(executionId, name, scenario, scenarioParameters, executionRequestAndResponse), executorService)
.exceptionally(scenario::registerException);
}

private void shutdownExecutor() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ public class DefaultScenarioExecutorService implements ScenarioExecutorService {

private static final Logger logger = LoggerFactory.getLogger(DefaultScenarioExecutorService.class);

public static final String REQUEST_RESPONSE_MAPPING_VARIABLE_NAME = "scenario-execution-message-to-response-future";

private final ApplicationContext applicationContext;
private final Citrus citrus;
private final ScenarioExecutionService scenarioExecutionService;
Expand All @@ -79,41 +81,44 @@ public DefaultScenarioExecutorService(ApplicationContext applicationContext, Cit
* the provided parameters. This method serves as an entry point for scenario execution, handling the entire
* lifecycle from scenario lookup to execution completion.
*
* @param name the name of the scenario to execute, used to look up the corresponding {@link SimulatorScenario} bean
* @param scenarioParameters a list of {@link ScenarioParameter}s to pass to the scenario, may be {@code null}
* @param name the name of the scenario to execute, used to look up the corresponding {@link SimulatorScenario} bean
* @param scenarioParameters a list of {@link ScenarioParameter}s to pass to the scenario, may be {@code null}
* @param executionRequestAndResponse mapping of request and response message
* @return the unique identifier of the scenario execution, used for tracking and management purposes
*/
@Override
public final Long run(String name, @Nullable List<ScenarioParameter> scenarioParameters) {
return run(applicationContext.getBean(name, SimulatorScenario.class), name, scenarioParameters);
public final Long run(String name, @Nullable List<ScenarioParameter> scenarioParameters, ExecutionRequestAndResponse executionRequestAndResponse) {
return run(applicationContext.getBean(name, SimulatorScenario.class), name, scenarioParameters, executionRequestAndResponse);
}

/**
* Executes the given {@link SimulatorScenario} with the provided name and parameters. This method orchestrates
* the scenario execution process, including pre-execution preparation, scenario execution, and post-execution
* cleanup, ensuring a consistent execution environment for each scenario.
*
* @param scenario the {@link SimulatorScenario} to execute
* @param name the name of the scenario, used for logging and tracking purposes
* @param scenarioParameters a list of {@link ScenarioParameter}s to pass to the scenario, may be {@code null}
* @param scenario the {@link SimulatorScenario} to execute
* @param name the name of the scenario, used for logging and tracking purposes
* @param scenarioParameters a list of {@link ScenarioParameter}s to pass to the scenario, may be {@code null}
* @param executionRequestAndResponse mapping of request and response message
* @return the unique identifier of the scenario execution
*/
@Override
public final Long run(SimulatorScenario scenario, String name, @Nullable List<ScenarioParameter> scenarioParameters) {
public final Long run(SimulatorScenario scenario, String name, @Nullable List<ScenarioParameter> scenarioParameters, ExecutionRequestAndResponse executionRequestAndResponse) {
ScenarioExecution scenarioExecution = scenarioExecutionService.createAndSaveExecutionScenario(name, scenarioParameters);

prepareBeforeExecution(scenario);

startScenario(scenarioExecution.getExecutionId(), name, scenario, scenarioParameters);
startScenario(scenarioExecution.getExecutionId(), name, scenario, scenarioParameters, executionRequestAndResponse);

return scenarioExecution.getExecutionId();
}

protected void startScenario(Long executionId, String name, SimulatorScenario scenario, List<ScenarioParameter> scenarioParameters) {
protected void startScenario(Long executionId, String name, SimulatorScenario scenario, List<ScenarioParameter> scenarioParameters, ExecutionRequestAndResponse executionRequestAndResponse) {
logger.info("Starting scenario : {}", name);

var context = createTestContext();
createAndRunScenarioRunner(context, executionId, name, scenario, scenarioParameters);
var testContext = createTestContext();
testContext.setVariable(REQUEST_RESPONSE_MAPPING_VARIABLE_NAME, executionRequestAndResponse);
createAndRunScenarioRunner(testContext, executionId, name, scenario, scenarioParameters);

logger.debug("Scenario completed: {}", name);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import static java.net.URLDecoder.decode;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Comparator.comparing;
import static org.citrusframework.simulator.service.ScenarioExecutorService.ExecutionRequestAndResponse.NOOP_EXECUTION;
import static org.citrusframework.simulator.web.rest.ScenarioResource.Scenario.ScenarioType.MESSAGE_TRIGGERED;
import static org.citrusframework.simulator.web.rest.ScenarioResource.Scenario.ScenarioType.STARTER;
import static org.citrusframework.simulator.web.util.PaginationUtil.createPage;
Expand Down Expand Up @@ -140,7 +141,7 @@ public Collection<ScenarioParameterDTO> getScenarioParameters(@PathVariable("sce
@PostMapping("scenarios/{scenarioName}/launch")
public Long launchScenario(@NotEmpty @PathVariable("scenarioName") String scenarioName, @RequestBody(required = false) List<ScenarioParameterDTO> scenarioParameters) {
logger.debug("REST request to launch Scenario '{}' with Parameters: {}", scenarioName, scenarioParameters);
return scenarioExecutorService.run(scenarioName, scenarioParameters.stream().map(scenarioParameterMapper::toEntity).toList());
return scenarioExecutorService.run(scenarioName, scenarioParameters.stream().map(scenarioParameterMapper::toEntity).toList(), NOOP_EXECUTION);
}

public record Scenario(String name, ScenarioResource.Scenario.ScenarioType type) {
Expand Down
Loading
Loading