diff --git a/transact/src/main/java/dev/dbos/transact/AlertHandler.java b/transact/src/main/java/dev/dbos/transact/AlertHandler.java new file mode 100644 index 00000000..5c7fde96 --- /dev/null +++ b/transact/src/main/java/dev/dbos/transact/AlertHandler.java @@ -0,0 +1,8 @@ +package dev.dbos.transact; + +import java.util.Map; + +@FunctionalInterface +public interface AlertHandler { + void invoke(String name, String message, Map metadata); +} diff --git a/transact/src/main/java/dev/dbos/transact/DBOS.java b/transact/src/main/java/dev/dbos/transact/DBOS.java index 68b71a9c..70cccf9d 100644 --- a/transact/src/main/java/dev/dbos/transact/DBOS.java +++ b/transact/src/main/java/dev/dbos/transact/DBOS.java @@ -16,7 +16,15 @@ import dev.dbos.transact.migrations.MigrationManager; import dev.dbos.transact.tempworkflows.InternalWorkflowsService; import dev.dbos.transact.tempworkflows.InternalWorkflowsServiceImpl; -import dev.dbos.transact.workflow.*; +import dev.dbos.transact.workflow.ForkOptions; +import dev.dbos.transact.workflow.ListWorkflowsInput; +import dev.dbos.transact.workflow.Queue; +import dev.dbos.transact.workflow.StepInfo; +import dev.dbos.transact.workflow.StepOptions; +import dev.dbos.transact.workflow.Workflow; +import dev.dbos.transact.workflow.WorkflowClassName; +import dev.dbos.transact.workflow.WorkflowHandle; +import dev.dbos.transact.workflow.WorkflowStatus; import java.io.IOException; import java.io.InputStream; @@ -79,6 +87,7 @@ public static class Instance { private final WorkflowRegistry workflowRegistry = new WorkflowRegistry(); private final QueueRegistry queueRegistry = new QueueRegistry(); private final Set lifecycleRegistry = ConcurrentHashMap.newKeySet(); + private AlertHandler alertHandler; private DBOSConfig config; @@ -183,6 +192,14 @@ void clearRegistry() { registerInternals(); } + public void registerAlertHandler(AlertHandler handler) { + if (dbosExecutor.get() != null) { + throw new IllegalStateException("Cannot set alert handler after DBOS is launched"); + } + + this.alertHandler = handler; + } + // package private methods for test purposes @Nullable DBOSExecutor getDbosExecutor() { return dbosExecutor.get(); @@ -216,10 +233,11 @@ public void launch() { if (dbosExecutor.compareAndSet(null, executor)) { executor.start( this, - new HashSet(this.lifecycleRegistry), + new HashSet<>(this.lifecycleRegistry), workflowRegistry.getWorkflowSnapshot(), workflowRegistry.getInstanceSnapshot(), - queueRegistry.getSnapshot()); + queueRegistry.getSnapshot(), + alertHandler); } } } @@ -299,6 +317,18 @@ public static void registerLifecycleListener(@NonNull DBOSLifecycleListener list ensureInstance().registerLifecycleListener(listener); } + /** + * Registers an {@link AlertHandler} to handle alerts generated by DBOS. This method must be + * called before DBOS is launched; attempting to register an alert handler after launch will + * result in an {@link IllegalStateException}. + * + * @param handler the {@link AlertHandler} instance to register; must not be null + * @throws IllegalStateException if called after DBOS has been launched + */ + public static void registerAlertHandler(AlertHandler handler) { + ensureInstance().registerAlertHandler(handler); + } + /** * Reinitializes the singleton instance of DBOS with config. For use in tests that reinitialize * DBOS @DBOSConfig config dbos configuration diff --git a/transact/src/main/java/dev/dbos/transact/conductor/Conductor.java b/transact/src/main/java/dev/dbos/transact/conductor/Conductor.java index 8b4f76f1..32bb6af4 100644 --- a/transact/src/main/java/dev/dbos/transact/conductor/Conductor.java +++ b/transact/src/main/java/dev/dbos/transact/conductor/Conductor.java @@ -1,6 +1,34 @@ package dev.dbos.transact.conductor; -import dev.dbos.transact.conductor.protocol.*; +import dev.dbos.transact.conductor.protocol.AlertRequest; +import dev.dbos.transact.conductor.protocol.BaseMessage; +import dev.dbos.transact.conductor.protocol.BaseResponse; +import dev.dbos.transact.conductor.protocol.CancelRequest; +import dev.dbos.transact.conductor.protocol.DeleteRequest; +import dev.dbos.transact.conductor.protocol.ExecutorInfoResponse; +import dev.dbos.transact.conductor.protocol.ExistPendingWorkflowsRequest; +import dev.dbos.transact.conductor.protocol.ExistPendingWorkflowsResponse; +import dev.dbos.transact.conductor.protocol.ExportWorkflowRequest; +import dev.dbos.transact.conductor.protocol.ExportWorkflowResponse; +import dev.dbos.transact.conductor.protocol.ForkWorkflowRequest; +import dev.dbos.transact.conductor.protocol.ForkWorkflowResponse; +import dev.dbos.transact.conductor.protocol.GetMetricsRequest; +import dev.dbos.transact.conductor.protocol.GetMetricsResponse; +import dev.dbos.transact.conductor.protocol.GetWorkflowRequest; +import dev.dbos.transact.conductor.protocol.GetWorkflowResponse; +import dev.dbos.transact.conductor.protocol.ImportWorkflowRequest; +import dev.dbos.transact.conductor.protocol.ListQueuedWorkflowsRequest; +import dev.dbos.transact.conductor.protocol.ListStepsRequest; +import dev.dbos.transact.conductor.protocol.ListStepsResponse; +import dev.dbos.transact.conductor.protocol.ListWorkflowsRequest; +import dev.dbos.transact.conductor.protocol.MessageType; +import dev.dbos.transact.conductor.protocol.RecoveryRequest; +import dev.dbos.transact.conductor.protocol.RestartRequest; +import dev.dbos.transact.conductor.protocol.ResumeRequest; +import dev.dbos.transact.conductor.protocol.RetentionRequest; +import dev.dbos.transact.conductor.protocol.SuccessResponse; +import dev.dbos.transact.conductor.protocol.WorkflowOutputsResponse; +import dev.dbos.transact.conductor.protocol.WorkflowsOutput; import dev.dbos.transact.database.SystemDatabase; import dev.dbos.transact.execution.DBOSExecutor; import dev.dbos.transact.json.JSONUtil; @@ -78,22 +106,23 @@ public class Conductor implements AutoCloseable { static { Map>> map = new java.util.EnumMap<>(MessageType.class); - map.put(MessageType.EXECUTOR_INFO, Conductor::handleExecutorInfo); - map.put(MessageType.RECOVERY, Conductor::handleRecovery); + map.put(MessageType.ALERT, Conductor::handleAlert); map.put(MessageType.CANCEL, Conductor::handleCancel); map.put(MessageType.DELETE, Conductor::handleDelete); - map.put(MessageType.RESUME, Conductor::handleResume); - map.put(MessageType.RESTART, Conductor::handleRestart); + map.put(MessageType.EXECUTOR_INFO, Conductor::handleExecutorInfo); + map.put(MessageType.EXIST_PENDING_WORKFLOWS, Conductor::handleExistPendingWorkflows); + map.put(MessageType.EXPORT_WORKFLOW, Conductor::handleExportWorkflow); map.put(MessageType.FORK_WORKFLOW, Conductor::handleFork); - map.put(MessageType.LIST_WORKFLOWS, Conductor::handleListWorkflows); + map.put(MessageType.GET_METRICS, Conductor::handleGetMetrics); + map.put(MessageType.GET_WORKFLOW, Conductor::handleGetWorkflow); + map.put(MessageType.IMPORT_WORKFLOW, Conductor::handleImportWorkflow); map.put(MessageType.LIST_QUEUED_WORKFLOWS, Conductor::handleListQueuedWorkflows); map.put(MessageType.LIST_STEPS, Conductor::handleListSteps); - map.put(MessageType.EXIST_PENDING_WORKFLOWS, Conductor::handleExistPendingWorkflows); - map.put(MessageType.GET_WORKFLOW, Conductor::handleGetWorkflow); + map.put(MessageType.LIST_WORKFLOWS, Conductor::handleListWorkflows); + map.put(MessageType.RECOVERY, Conductor::handleRecovery); + map.put(MessageType.RESTART, Conductor::handleRestart); + map.put(MessageType.RESUME, Conductor::handleResume); map.put(MessageType.RETENTION, Conductor::handleRetention); - map.put(MessageType.GET_METRICS, Conductor::handleGetMetrics); - map.put(MessageType.IMPORT_WORKFLOW, Conductor::handleImportWorkflow); - map.put(MessageType.EXPORT_WORKFLOW, Conductor::handleExportWorkflow); dispatchMap = Collections.unmodifiableMap(map); } @@ -1008,4 +1037,18 @@ static String serializeExportedWorkflows(List workflows) throw return Base64.getEncoder().encodeToString(out.toByteArray()); } + + static CompletableFuture handleAlert(Conductor conductor, BaseMessage message) { + return CompletableFuture.supplyAsync( + () -> { + AlertRequest request = (AlertRequest) message; + try { + conductor.dbosExecutor.fireAlertHandler( + request.name, request.message, request.metadata); + return new SuccessResponse(request, true); + } catch (Exception e) { + return new SuccessResponse(request, e); + } + }); + } } diff --git a/transact/src/main/java/dev/dbos/transact/conductor/protocol/AlertRequest.java b/transact/src/main/java/dev/dbos/transact/conductor/protocol/AlertRequest.java new file mode 100644 index 00000000..e87e5cd0 --- /dev/null +++ b/transact/src/main/java/dev/dbos/transact/conductor/protocol/AlertRequest.java @@ -0,0 +1,19 @@ +package dev.dbos.transact.conductor.protocol; + +import java.util.Map; + +public class AlertRequest extends BaseMessage { + public String name; + public String message; + public Map metadata; + + public AlertRequest() {} + + public AlertRequest(String requestId, String name, String message, Map metadata) { + this.type = MessageType.ALERT.getValue(); + this.request_id = requestId; + this.name = name; + this.message = message; + this.metadata = metadata; + } +} diff --git a/transact/src/main/java/dev/dbos/transact/conductor/protocol/BaseMessage.java b/transact/src/main/java/dev/dbos/transact/conductor/protocol/BaseMessage.java index e4175781..a7a371da 100644 --- a/transact/src/main/java/dev/dbos/transact/conductor/protocol/BaseMessage.java +++ b/transact/src/main/java/dev/dbos/transact/conductor/protocol/BaseMessage.java @@ -1,6 +1,8 @@ package dev.dbos.transact.conductor.protocol; -import com.fasterxml.jackson.annotation.*; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; @JsonTypeInfo( use = JsonTypeInfo.Id.NAME, @@ -8,22 +10,23 @@ property = "type", visible = true) @JsonSubTypes({ - @JsonSubTypes.Type(value = ExecutorInfoRequest.class, name = "executor_info"), - @JsonSubTypes.Type(value = RecoveryRequest.class, name = "recovery"), + @JsonSubTypes.Type(value = AlertRequest.class, name = "alert"), @JsonSubTypes.Type(value = CancelRequest.class, name = "cancel"), @JsonSubTypes.Type(value = DeleteRequest.class, name = "delete"), - @JsonSubTypes.Type(value = ResumeRequest.class, name = "resume"), - @JsonSubTypes.Type(value = RestartRequest.class, name = "restart"), + @JsonSubTypes.Type(value = ExecutorInfoRequest.class, name = "executor_info"), + @JsonSubTypes.Type(value = ExistPendingWorkflowsRequest.class, name = "exist_pending_workflows"), + @JsonSubTypes.Type(value = ExportWorkflowRequest.class, name = "export_workflow"), @JsonSubTypes.Type(value = ForkWorkflowRequest.class, name = "fork_workflow"), - @JsonSubTypes.Type(value = ListWorkflowsRequest.class, name = "list_workflows"), - @JsonSubTypes.Type(value = ListQueuedWorkflowsRequest.class, name = "list_queued_workflows"), + @JsonSubTypes.Type(value = GetMetricsRequest.class, name = "get_metrics"), @JsonSubTypes.Type(value = GetWorkflowRequest.class, name = "get_workflow"), - @JsonSubTypes.Type(value = ExistPendingWorkflowsRequest.class, name = "exist_pending_workflows"), + @JsonSubTypes.Type(value = ImportWorkflowRequest.class, name = "import_workflow"), + @JsonSubTypes.Type(value = ListQueuedWorkflowsRequest.class, name = "list_queued_workflows"), @JsonSubTypes.Type(value = ListStepsRequest.class, name = "list_steps"), + @JsonSubTypes.Type(value = ListWorkflowsRequest.class, name = "list_workflows"), + @JsonSubTypes.Type(value = RecoveryRequest.class, name = "recovery"), + @JsonSubTypes.Type(value = RestartRequest.class, name = "restart"), + @JsonSubTypes.Type(value = ResumeRequest.class, name = "resume"), @JsonSubTypes.Type(value = RetentionRequest.class, name = "retention"), - @JsonSubTypes.Type(value = GetMetricsRequest.class, name = "get_metrics"), - @JsonSubTypes.Type(value = ExportWorkflowRequest.class, name = "export_workflow"), - @JsonSubTypes.Type(value = ImportWorkflowRequest.class, name = "import_workflow"), }) @JsonIgnoreProperties(ignoreUnknown = true) public abstract class BaseMessage { diff --git a/transact/src/main/java/dev/dbos/transact/conductor/protocol/MessageType.java b/transact/src/main/java/dev/dbos/transact/conductor/protocol/MessageType.java index 508a4316..3efea793 100644 --- a/transact/src/main/java/dev/dbos/transact/conductor/protocol/MessageType.java +++ b/transact/src/main/java/dev/dbos/transact/conductor/protocol/MessageType.java @@ -1,22 +1,23 @@ package dev.dbos.transact.conductor.protocol; public enum MessageType { - EXECUTOR_INFO("executor_info"), - RECOVERY("recovery"), + ALERT("alert"), CANCEL("cancel"), DELETE("delete"), - LIST_WORKFLOWS("list_workflows"), - LIST_QUEUED_WORKFLOWS("list_queued_workflows"), - RESUME("resume"), - RESTART("restart"), - GET_WORKFLOW("get_workflow"), + EXECUTOR_INFO("executor_info"), EXIST_PENDING_WORKFLOWS("exist_pending_workflows"), - LIST_STEPS("list_steps"), + EXPORT_WORKFLOW("export_workflow"), FORK_WORKFLOW("fork_workflow"), - RETENTION("retention"), GET_METRICS("get_metrics"), - EXPORT_WORKFLOW("export_workflow"), - IMPORT_WORKFLOW("import_workflow"); + GET_WORKFLOW("get_workflow"), + IMPORT_WORKFLOW("import_workflow"), + LIST_QUEUED_WORKFLOWS("list_queued_workflows"), + LIST_STEPS("list_steps"), + LIST_WORKFLOWS("list_workflows"), + RECOVERY("recovery"), + RESTART("restart"), + RESUME("resume"), + RETENTION("retention"); private final String value; diff --git a/transact/src/main/java/dev/dbos/transact/execution/DBOSExecutor.java b/transact/src/main/java/dev/dbos/transact/execution/DBOSExecutor.java index 3319b546..5053a753 100644 --- a/transact/src/main/java/dev/dbos/transact/execution/DBOSExecutor.java +++ b/transact/src/main/java/dev/dbos/transact/execution/DBOSExecutor.java @@ -1,5 +1,6 @@ package dev.dbos.transact.execution; +import dev.dbos.transact.AlertHandler; import dev.dbos.transact.Constants; import dev.dbos.transact.DBOS; import dev.dbos.transact.StartWorkflowOptions; @@ -15,7 +16,11 @@ import dev.dbos.transact.database.Result; import dev.dbos.transact.database.SystemDatabase; import dev.dbos.transact.database.WorkflowInitResult; -import dev.dbos.transact.exceptions.*; +import dev.dbos.transact.exceptions.DBOSAwaitedWorkflowCancelledException; +import dev.dbos.transact.exceptions.DBOSNonExistentWorkflowException; +import dev.dbos.transact.exceptions.DBOSWorkflowCancelledException; +import dev.dbos.transact.exceptions.DBOSWorkflowExecutionConflictException; +import dev.dbos.transact.exceptions.DBOSWorkflowFunctionNotFoundException; import dev.dbos.transact.internal.AppVersionComputer; import dev.dbos.transact.internal.DBOSInvocationHandler; import dev.dbos.transact.internal.Invocation; @@ -51,7 +56,13 @@ import java.util.Optional; import java.util.Set; import java.util.UUID; -import java.util.concurrent.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; @@ -85,6 +96,7 @@ public class DBOSExecutor implements AutoCloseable { private Conductor conductor; private ExecutorService executorService; private ScheduledExecutorService timeoutScheduler; + private AlertHandler alertHandler; private final AtomicBoolean isRunning = new AtomicBoolean(false); public DBOSExecutor(DBOSConfig config) { @@ -119,7 +131,8 @@ public void start( Set listenerSet, Map workflowMap, Map instanceMap, - List queues) { + List queues, + AlertHandler alertHandler) { if (isRunning.compareAndSet(false, true)) { logger.info("DBOS Executor starting"); @@ -128,6 +141,7 @@ public void start( this.instanceMap = Collections.unmodifiableMap(instanceMap); this.queues = Collections.unmodifiableList(queues); this.listeners = listenerSet; + this.alertHandler = alertHandler; if (this.appVersion == null || this.appVersion.isEmpty()) { List> registeredClasses = @@ -331,6 +345,18 @@ public Optional getQueue(String queueName) { return Optional.empty(); } + public void fireAlertHandler(String name, String message, Map metadata) { + if (alertHandler != null) { + alertHandler.invoke(name, message, metadata); + } else { + logger.warn( + "No AlertHandler configured; dropping alert. name='{}', message='{}', metadata={}", + name, + message, + metadata); + } + } + WorkflowHandle recoverWorkflow(GetPendingWorkflowsOutput output) { Objects.requireNonNull(output, "output must not be null"); String workflowId = Objects.requireNonNull(output.workflowId(), "workflowId must not be null"); diff --git a/transact/src/test/java/dev/dbos/transact/conductor/ConductorTest.java b/transact/src/test/java/dev/dbos/transact/conductor/ConductorTest.java index 9fd167ac..c3273d51 100644 --- a/transact/src/test/java/dev/dbos/transact/conductor/ConductorTest.java +++ b/transact/src/test/java/dev/dbos/transact/conductor/ConductorTest.java @@ -1,13 +1,21 @@ package dev.dbos.transact.conductor; -import static org.junit.jupiter.api.Assertions.*; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import dev.dbos.transact.DBOS; import dev.dbos.transact.conductor.TestWebSocketServer.WebSocketTestListener; @@ -280,12 +288,12 @@ public void send(MessageType type, String requestId, Map fields, throws Exception { logger.debug("sending {}", type.getValue()); - Map message = new LinkedHashMap<>(); - message.put("type", Objects.requireNonNull(type).getValue()); - message.put("request_id", Objects.requireNonNull(requestId)); - message.putAll(fields); + Map msg = new LinkedHashMap<>(); + msg.put("type", Objects.requireNonNull(type).getValue()); + msg.put("request_id", Objects.requireNonNull(requestId)); + msg.putAll(fields); - String json = ConductorTest.mapper.writeValueAsString(message); + String json = ConductorTest.mapper.writeValueAsString(msg); if (chunkSize > 0) { sendFragmented(json, chunkSize); } else { @@ -352,12 +360,12 @@ public void onWebsocketMessage(WebSocket conn, Framedata frame) { // Create a large list of steps to exceed 32KB List steps = new ArrayList<>(); for (int i = 0; i < 200; i++) { - var builder = new StringBuilder(1024); - builder.append("output_%d_".formatted(i)); + var stringBuilder = new StringBuilder(1024); + stringBuilder.append("output_%d_".formatted(i)); for (int j = 0; j < 1024; j++) { - builder.append(characters.charAt(random.nextInt(characters.length()))); + stringBuilder.append(characters.charAt(random.nextInt(characters.length()))); } - steps.add(new StepInfo(i, "function" + i, builder.toString(), null, null, null, null)); + steps.add(new StepInfo(i, "function" + i, stringBuilder.toString(), null, null, null, null)); } when(mockExec.listWorkflowSteps("large-wf")).thenReturn(steps); @@ -826,7 +834,7 @@ public void canForkThrow() throws Exception { public void canListWorkflows() throws Exception { MessageListener listener = new MessageListener(); testServer.setListener(listener); - List statuses = new ArrayList(); + List statuses = new ArrayList<>(); statuses.add( new WorkflowStatusBuilder("wf-1") .status(WorkflowState.PENDING) @@ -900,7 +908,7 @@ public void canListWorkflows() throws Exception { public void canListQueuedWorkflows() throws Exception { MessageListener listener = new MessageListener(); testServer.setListener(listener); - List statuses = new ArrayList(); + List statuses = new ArrayList<>(); statuses.add( new WorkflowStatusBuilder("wf-1") .status(WorkflowState.PENDING) @@ -1018,7 +1026,7 @@ public void canExistPendingWorkflows() throws Exception { String executorId = "exec-id"; String appVersion = "app-version"; - List outputs = new ArrayList(); + List outputs = new ArrayList<>(); outputs.add(new GetPendingWorkflowsOutput("wf-1", null)); outputs.add(new GetPendingWorkflowsOutput("wf-2", "queue")); @@ -1051,7 +1059,7 @@ public void canExistPendingWorkflowsFalse() throws Exception { String executorId = "exec-id"; String appVersion = "app-version"; - List outputs = new ArrayList(); + List outputs = new ArrayList<>(); when(mockDB.getPendingWorkflows(executorId, appVersion)).thenReturn(outputs); try (Conductor conductor = builder.build()) { @@ -1086,7 +1094,7 @@ public void canListSteps() throws Exception { testServer.setListener(listener); String workflowId = "workflow-id-1"; - List steps = new ArrayList(); + List steps = new ArrayList<>(); steps.add(new StepInfo(0, "function1", null, null, null, null, null)); steps.add(new StepInfo(1, "function2", null, null, null, null, null)); steps.add(new StepInfo(2, "function3", null, null, null, null, null)); @@ -1613,4 +1621,104 @@ private static List createTestExportedWorkflows() { return workflows; } + + @RetryingTest(3) + public void canAlert() throws Exception { + + MessageListener listener = new MessageListener(); + testServer.setListener(listener); + + try (Conductor conductor = builder.build()) { + conductor.start(); + assertTrue(listener.openLatch.await(5, TimeUnit.SECONDS), "open latch timed out"); + + Map metadata = Map.of("one", "1", "two", "2", "three", "3"); + Map message = + Map.of( + "name", + "name-value", + "message", + "message-value", + "metadata", + metadata, + "unknown-field", + "unknown-field-value"); + listener.send(MessageType.ALERT, "12345", message); + + assertTrue(listener.messageLatch.await(1, TimeUnit.SECONDS), "message latch timed out"); + + ArgumentCaptor nameCaptor = ArgumentCaptor.forClass(String.class); + ArgumentCaptor messageCaptor = ArgumentCaptor.forClass(String.class); + @SuppressWarnings("unchecked") + ArgumentCaptor> metadataCaptor = + ArgumentCaptor.forClass((Class>) (Class) Map.class); + + verify(mockExec) + .fireAlertHandler( + nameCaptor.capture(), messageCaptor.capture(), metadataCaptor.capture()); + + assertEquals("name-value", nameCaptor.getValue()); + assertEquals("message-value", messageCaptor.getValue()); + assertEquals(metadata, metadataCaptor.getValue()); + + JsonNode jsonNode = mapper.readTree(listener.message); + assertNotNull(jsonNode); + assertEquals("alert", jsonNode.get("type").asText()); + assertEquals("12345", jsonNode.get("request_id").asText()); + assertNull(jsonNode.get("error_message")); + assertTrue(jsonNode.get("success").asBoolean()); + } + } + + @RetryingTest(3) + public void canAlertThrows() throws Exception { + MessageListener listener = new MessageListener(); + testServer.setListener(listener); + + String errorMessage = "canAlertThrows error"; + doThrow(new RuntimeException(errorMessage)) + .when(mockExec) + .fireAlertHandler(anyString(), anyString(), any()); + + try (Conductor conductor = builder.build()) { + conductor.start(); + assertTrue(listener.openLatch.await(5, TimeUnit.SECONDS), "open latch timed out"); + + Map metadata = Map.of("one", "1", "two", "2", "three", "3"); + Map message = + Map.of( + "name", + "name-value", + "message", + "message-value", + "metadata", + metadata, + "unknown-field", + "unknown-field-value"); + listener.send(MessageType.ALERT, "12345", message); + + assertTrue(listener.messageLatch.await(1, TimeUnit.SECONDS), "message latch timed out"); + + ArgumentCaptor nameCaptor = ArgumentCaptor.forClass(String.class); + ArgumentCaptor messageCaptor = ArgumentCaptor.forClass(String.class); + @SuppressWarnings("unchecked") + ArgumentCaptor> metadataCaptor = + ArgumentCaptor.forClass((Class>) (Class) Map.class); + + verify(mockExec) + .fireAlertHandler( + nameCaptor.capture(), messageCaptor.capture(), metadataCaptor.capture()); + + assertEquals("name-value", nameCaptor.getValue()); + assertEquals("message-value", messageCaptor.getValue()); + assertEquals(metadata, metadataCaptor.getValue()); + + JsonNode jsonNode = mapper.readTree(listener.message); + assertNotNull(jsonNode); + assertEquals("alert", jsonNode.get("type").asText()); + assertEquals("12345", jsonNode.get("request_id").asText()); + assertEquals(errorMessage, jsonNode.get("error_message").asText()); + assertFalse(jsonNode.get("success").asBoolean()); + } + } }