Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions transact/src/main/java/dev/dbos/transact/AlertHandler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package dev.dbos.transact;

import java.util.Map;

@FunctionalInterface
public interface AlertHandler {
void invoke(String name, String message, Map<String, String> metadata);
}
36 changes: 33 additions & 3 deletions transact/src/main/java/dev/dbos/transact/DBOS.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,15 @@
import dev.dbos.transact.migrations.MigrationManager;
import dev.dbos.transact.tempworkflows.InternalWorkflowsService;
import dev.dbos.transact.tempworkflows.InternalWorkflowsServiceImpl;
import dev.dbos.transact.workflow.*;
import dev.dbos.transact.workflow.ForkOptions;
import dev.dbos.transact.workflow.ListWorkflowsInput;
import dev.dbos.transact.workflow.Queue;
import dev.dbos.transact.workflow.StepInfo;
import dev.dbos.transact.workflow.StepOptions;
import dev.dbos.transact.workflow.Workflow;
import dev.dbos.transact.workflow.WorkflowClassName;
import dev.dbos.transact.workflow.WorkflowHandle;
import dev.dbos.transact.workflow.WorkflowStatus;

import java.io.IOException;
import java.io.InputStream;
Expand Down Expand Up @@ -79,6 +87,7 @@ public static class Instance {
private final WorkflowRegistry workflowRegistry = new WorkflowRegistry();
private final QueueRegistry queueRegistry = new QueueRegistry();
private final Set<DBOSLifecycleListener> lifecycleRegistry = ConcurrentHashMap.newKeySet();
private AlertHandler alertHandler;

private DBOSConfig config;

Expand Down Expand Up @@ -183,6 +192,14 @@ void clearRegistry() {
registerInternals();
}

public void registerAlertHandler(AlertHandler handler) {
if (dbosExecutor.get() != null) {
throw new IllegalStateException("Cannot set alert handler after DBOS is launched");
}

this.alertHandler = handler;
}

// package private methods for test purposes
@Nullable DBOSExecutor getDbosExecutor() {
return dbosExecutor.get();
Expand Down Expand Up @@ -216,10 +233,11 @@ public void launch() {
if (dbosExecutor.compareAndSet(null, executor)) {
executor.start(
this,
new HashSet<DBOSLifecycleListener>(this.lifecycleRegistry),
new HashSet<>(this.lifecycleRegistry),
workflowRegistry.getWorkflowSnapshot(),
workflowRegistry.getInstanceSnapshot(),
queueRegistry.getSnapshot());
queueRegistry.getSnapshot(),
alertHandler);
}
}
}
Expand Down Expand Up @@ -299,6 +317,18 @@ public static void registerLifecycleListener(@NonNull DBOSLifecycleListener list
ensureInstance().registerLifecycleListener(listener);
}

/**
* Registers an {@link AlertHandler} to handle alerts generated by DBOS. This method must be
* called before DBOS is launched; attempting to register an alert handler after launch will
* result in an {@link IllegalStateException}.
*
* @param handler the {@link AlertHandler} instance to register; must not be null
* @throws IllegalStateException if called after DBOS has been launched
*/
public static void registerAlertHandler(AlertHandler handler) {
ensureInstance().registerAlertHandler(handler);
}

/**
* Reinitializes the singleton instance of DBOS with config. For use in tests that reinitialize
* DBOS @DBOSConfig config dbos configuration
Expand Down
65 changes: 54 additions & 11 deletions transact/src/main/java/dev/dbos/transact/conductor/Conductor.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,34 @@
package dev.dbos.transact.conductor;

import dev.dbos.transact.conductor.protocol.*;
import dev.dbos.transact.conductor.protocol.AlertRequest;
import dev.dbos.transact.conductor.protocol.BaseMessage;
import dev.dbos.transact.conductor.protocol.BaseResponse;
import dev.dbos.transact.conductor.protocol.CancelRequest;
import dev.dbos.transact.conductor.protocol.DeleteRequest;
import dev.dbos.transact.conductor.protocol.ExecutorInfoResponse;
import dev.dbos.transact.conductor.protocol.ExistPendingWorkflowsRequest;
import dev.dbos.transact.conductor.protocol.ExistPendingWorkflowsResponse;
import dev.dbos.transact.conductor.protocol.ExportWorkflowRequest;
import dev.dbos.transact.conductor.protocol.ExportWorkflowResponse;
import dev.dbos.transact.conductor.protocol.ForkWorkflowRequest;
import dev.dbos.transact.conductor.protocol.ForkWorkflowResponse;
import dev.dbos.transact.conductor.protocol.GetMetricsRequest;
import dev.dbos.transact.conductor.protocol.GetMetricsResponse;
import dev.dbos.transact.conductor.protocol.GetWorkflowRequest;
import dev.dbos.transact.conductor.protocol.GetWorkflowResponse;
import dev.dbos.transact.conductor.protocol.ImportWorkflowRequest;
import dev.dbos.transact.conductor.protocol.ListQueuedWorkflowsRequest;
import dev.dbos.transact.conductor.protocol.ListStepsRequest;
import dev.dbos.transact.conductor.protocol.ListStepsResponse;
import dev.dbos.transact.conductor.protocol.ListWorkflowsRequest;
import dev.dbos.transact.conductor.protocol.MessageType;
import dev.dbos.transact.conductor.protocol.RecoveryRequest;
import dev.dbos.transact.conductor.protocol.RestartRequest;
import dev.dbos.transact.conductor.protocol.ResumeRequest;
import dev.dbos.transact.conductor.protocol.RetentionRequest;
import dev.dbos.transact.conductor.protocol.SuccessResponse;
import dev.dbos.transact.conductor.protocol.WorkflowOutputsResponse;
import dev.dbos.transact.conductor.protocol.WorkflowsOutput;
import dev.dbos.transact.database.SystemDatabase;
import dev.dbos.transact.execution.DBOSExecutor;
import dev.dbos.transact.json.JSONUtil;
Expand Down Expand Up @@ -78,22 +106,23 @@ public class Conductor implements AutoCloseable {
static {
Map<MessageType, BiFunction<Conductor, BaseMessage, CompletableFuture<BaseResponse>>> map =
new java.util.EnumMap<>(MessageType.class);
map.put(MessageType.EXECUTOR_INFO, Conductor::handleExecutorInfo);
map.put(MessageType.RECOVERY, Conductor::handleRecovery);
map.put(MessageType.ALERT, Conductor::handleAlert);
map.put(MessageType.CANCEL, Conductor::handleCancel);
map.put(MessageType.DELETE, Conductor::handleDelete);
map.put(MessageType.RESUME, Conductor::handleResume);
map.put(MessageType.RESTART, Conductor::handleRestart);
map.put(MessageType.EXECUTOR_INFO, Conductor::handleExecutorInfo);
map.put(MessageType.EXIST_PENDING_WORKFLOWS, Conductor::handleExistPendingWorkflows);
map.put(MessageType.EXPORT_WORKFLOW, Conductor::handleExportWorkflow);
map.put(MessageType.FORK_WORKFLOW, Conductor::handleFork);
map.put(MessageType.LIST_WORKFLOWS, Conductor::handleListWorkflows);
map.put(MessageType.GET_METRICS, Conductor::handleGetMetrics);
map.put(MessageType.GET_WORKFLOW, Conductor::handleGetWorkflow);
map.put(MessageType.IMPORT_WORKFLOW, Conductor::handleImportWorkflow);
map.put(MessageType.LIST_QUEUED_WORKFLOWS, Conductor::handleListQueuedWorkflows);
map.put(MessageType.LIST_STEPS, Conductor::handleListSteps);
map.put(MessageType.EXIST_PENDING_WORKFLOWS, Conductor::handleExistPendingWorkflows);
map.put(MessageType.GET_WORKFLOW, Conductor::handleGetWorkflow);
map.put(MessageType.LIST_WORKFLOWS, Conductor::handleListWorkflows);
map.put(MessageType.RECOVERY, Conductor::handleRecovery);
map.put(MessageType.RESTART, Conductor::handleRestart);
map.put(MessageType.RESUME, Conductor::handleResume);
map.put(MessageType.RETENTION, Conductor::handleRetention);
map.put(MessageType.GET_METRICS, Conductor::handleGetMetrics);
map.put(MessageType.IMPORT_WORKFLOW, Conductor::handleImportWorkflow);
map.put(MessageType.EXPORT_WORKFLOW, Conductor::handleExportWorkflow);

dispatchMap = Collections.unmodifiableMap(map);
}
Expand Down Expand Up @@ -1008,4 +1037,18 @@ static String serializeExportedWorkflows(List<ExportedWorkflow> workflows) throw

return Base64.getEncoder().encodeToString(out.toByteArray());
}

static CompletableFuture<BaseResponse> handleAlert(Conductor conductor, BaseMessage message) {
return CompletableFuture.supplyAsync(
() -> {
AlertRequest request = (AlertRequest) message;
try {
conductor.dbosExecutor.fireAlertHandler(
request.name, request.message, request.metadata);
return new SuccessResponse(request, true);
} catch (Exception e) {
return new SuccessResponse(request, e);
}
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package dev.dbos.transact.conductor.protocol;

import java.util.Map;

public class AlertRequest extends BaseMessage {
public String name;
public String message;
public Map<String, String> metadata;

public AlertRequest() {}

public AlertRequest(String requestId, String name, String message, Map<String, String> metadata) {
this.type = MessageType.ALERT.getValue();
this.request_id = requestId;
this.name = name;
this.message = message;
this.metadata = metadata;
}
}
Original file line number Diff line number Diff line change
@@ -1,29 +1,32 @@
package dev.dbos.transact.conductor.protocol;

import com.fasterxml.jackson.annotation.*;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;

@JsonTypeInfo(
use = JsonTypeInfo.Id.NAME,
include = JsonTypeInfo.As.EXISTING_PROPERTY,
property = "type",
visible = true)
@JsonSubTypes({
@JsonSubTypes.Type(value = ExecutorInfoRequest.class, name = "executor_info"),
@JsonSubTypes.Type(value = RecoveryRequest.class, name = "recovery"),
@JsonSubTypes.Type(value = AlertRequest.class, name = "alert"),
@JsonSubTypes.Type(value = CancelRequest.class, name = "cancel"),
@JsonSubTypes.Type(value = DeleteRequest.class, name = "delete"),
@JsonSubTypes.Type(value = ResumeRequest.class, name = "resume"),
@JsonSubTypes.Type(value = RestartRequest.class, name = "restart"),
@JsonSubTypes.Type(value = ExecutorInfoRequest.class, name = "executor_info"),
@JsonSubTypes.Type(value = ExistPendingWorkflowsRequest.class, name = "exist_pending_workflows"),
@JsonSubTypes.Type(value = ExportWorkflowRequest.class, name = "export_workflow"),
@JsonSubTypes.Type(value = ForkWorkflowRequest.class, name = "fork_workflow"),
@JsonSubTypes.Type(value = ListWorkflowsRequest.class, name = "list_workflows"),
@JsonSubTypes.Type(value = ListQueuedWorkflowsRequest.class, name = "list_queued_workflows"),
@JsonSubTypes.Type(value = GetMetricsRequest.class, name = "get_metrics"),
@JsonSubTypes.Type(value = GetWorkflowRequest.class, name = "get_workflow"),
@JsonSubTypes.Type(value = ExistPendingWorkflowsRequest.class, name = "exist_pending_workflows"),
@JsonSubTypes.Type(value = ImportWorkflowRequest.class, name = "import_workflow"),
@JsonSubTypes.Type(value = ListQueuedWorkflowsRequest.class, name = "list_queued_workflows"),
@JsonSubTypes.Type(value = ListStepsRequest.class, name = "list_steps"),
@JsonSubTypes.Type(value = ListWorkflowsRequest.class, name = "list_workflows"),
@JsonSubTypes.Type(value = RecoveryRequest.class, name = "recovery"),
@JsonSubTypes.Type(value = RestartRequest.class, name = "restart"),
@JsonSubTypes.Type(value = ResumeRequest.class, name = "resume"),
@JsonSubTypes.Type(value = RetentionRequest.class, name = "retention"),
@JsonSubTypes.Type(value = GetMetricsRequest.class, name = "get_metrics"),
@JsonSubTypes.Type(value = ExportWorkflowRequest.class, name = "export_workflow"),
@JsonSubTypes.Type(value = ImportWorkflowRequest.class, name = "import_workflow"),
})
@JsonIgnoreProperties(ignoreUnknown = true)
public abstract class BaseMessage {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,23 @@
package dev.dbos.transact.conductor.protocol;

public enum MessageType {
EXECUTOR_INFO("executor_info"),
RECOVERY("recovery"),
ALERT("alert"),
CANCEL("cancel"),
DELETE("delete"),
LIST_WORKFLOWS("list_workflows"),
LIST_QUEUED_WORKFLOWS("list_queued_workflows"),
RESUME("resume"),
RESTART("restart"),
GET_WORKFLOW("get_workflow"),
EXECUTOR_INFO("executor_info"),
EXIST_PENDING_WORKFLOWS("exist_pending_workflows"),
LIST_STEPS("list_steps"),
EXPORT_WORKFLOW("export_workflow"),
FORK_WORKFLOW("fork_workflow"),
RETENTION("retention"),
GET_METRICS("get_metrics"),
EXPORT_WORKFLOW("export_workflow"),
IMPORT_WORKFLOW("import_workflow");
GET_WORKFLOW("get_workflow"),
IMPORT_WORKFLOW("import_workflow"),
LIST_QUEUED_WORKFLOWS("list_queued_workflows"),
LIST_STEPS("list_steps"),
LIST_WORKFLOWS("list_workflows"),
RECOVERY("recovery"),
RESTART("restart"),
RESUME("resume"),
RETENTION("retention");

private final String value;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package dev.dbos.transact.execution;

import dev.dbos.transact.AlertHandler;
import dev.dbos.transact.Constants;
import dev.dbos.transact.DBOS;
import dev.dbos.transact.StartWorkflowOptions;
Expand All @@ -15,7 +16,11 @@
import dev.dbos.transact.database.Result;
import dev.dbos.transact.database.SystemDatabase;
import dev.dbos.transact.database.WorkflowInitResult;
import dev.dbos.transact.exceptions.*;
import dev.dbos.transact.exceptions.DBOSAwaitedWorkflowCancelledException;
import dev.dbos.transact.exceptions.DBOSNonExistentWorkflowException;
import dev.dbos.transact.exceptions.DBOSWorkflowCancelledException;
import dev.dbos.transact.exceptions.DBOSWorkflowExecutionConflictException;
import dev.dbos.transact.exceptions.DBOSWorkflowFunctionNotFoundException;
import dev.dbos.transact.internal.AppVersionComputer;
import dev.dbos.transact.internal.DBOSInvocationHandler;
import dev.dbos.transact.internal.Invocation;
Expand Down Expand Up @@ -51,7 +56,13 @@
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
Expand Down Expand Up @@ -85,6 +96,7 @@ public class DBOSExecutor implements AutoCloseable {
private Conductor conductor;
private ExecutorService executorService;
private ScheduledExecutorService timeoutScheduler;
private AlertHandler alertHandler;
private final AtomicBoolean isRunning = new AtomicBoolean(false);

public DBOSExecutor(DBOSConfig config) {
Expand Down Expand Up @@ -119,7 +131,8 @@ public void start(
Set<DBOSLifecycleListener> listenerSet,
Map<String, RegisteredWorkflow> workflowMap,
Map<String, RegisteredWorkflowInstance> instanceMap,
List<Queue> queues) {
List<Queue> queues,
AlertHandler alertHandler) {

if (isRunning.compareAndSet(false, true)) {
logger.info("DBOS Executor starting");
Expand All @@ -128,6 +141,7 @@ public void start(
this.instanceMap = Collections.unmodifiableMap(instanceMap);
this.queues = Collections.unmodifiableList(queues);
this.listeners = listenerSet;
this.alertHandler = alertHandler;

if (this.appVersion == null || this.appVersion.isEmpty()) {
List<Class<?>> registeredClasses =
Expand Down Expand Up @@ -331,6 +345,18 @@ public Optional<Queue> getQueue(String queueName) {
return Optional.empty();
}

public void fireAlertHandler(String name, String message, Map<String, String> metadata) {
if (alertHandler != null) {
alertHandler.invoke(name, message, metadata);
} else {
logger.warn(
"No AlertHandler configured; dropping alert. name='{}', message='{}', metadata={}",
name,
message,
metadata);
}
}

WorkflowHandle<?, ?> recoverWorkflow(GetPendingWorkflowsOutput output) {
Objects.requireNonNull(output, "output must not be null");
String workflowId = Objects.requireNonNull(output.workflowId(), "workflowId must not be null");
Expand Down
Loading