Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 64 additions & 5 deletions transact/src/main/java/dev/dbos/transact/DBOS.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import dev.dbos.transact.workflow.ForkOptions;
import dev.dbos.transact.workflow.ListWorkflowsInput;
import dev.dbos.transact.workflow.Queue;
import dev.dbos.transact.workflow.SerializationStrategy;
import dev.dbos.transact.workflow.StepInfo;
import dev.dbos.transact.workflow.StepOptions;
import dev.dbos.transact.workflow.Workflow;
Expand Down Expand Up @@ -144,7 +145,13 @@ private void registerClassWorkflows(

String name = wfTag.name().isEmpty() ? method.getName() : wfTag.name();
workflowRegistry.register(
className, name, target, instanceName, method, wfTag.maxRecoveryAttempts());
className,
name,
target,
instanceName,
method,
wfTag.maxRecoveryAttempts(),
wfTag.serializationStrategy());
return name;
}

Expand Down Expand Up @@ -545,6 +552,17 @@ public static <T, E extends Exception> T getResult(@NonNull String workflowId) t
return executor("getWorkflowStatus").getWorkflowStatus(workflowId);
}

/**
* Get the serialization format of the current workflow context.
*
* @return the serialization format name (e.g., "portable_json", "java_jackson"), or null if not
* in a workflow context or using default serialization
*/
public static @Nullable SerializationStrategy getSerialization() {
var ctx = DBOSContextHolder.get();
return ctx != null ? ctx.getSerialization() : null;
}

/**
* Send a message to a workflow
*
Expand All @@ -558,8 +576,33 @@ public static void send(
@NonNull Object message,
@NonNull String topic,
@Nullable String idempotencyKey) {
send(destinationId, message, topic, idempotencyKey, null);
}

/**
* Send a message to a workflow with serialization strategy
*
* @param destinationId recipient of the message
* @param message message to be sent
* @param topic topic to which the message is send
* @param idempotencyKey optional idempotency key for exactly-once send
* @param serialization serialization strategy to use (null for default)
*/
public static void send(
@NonNull String destinationId,
@NonNull Object message,
@NonNull String topic,
@Nullable String idempotencyKey,
@Nullable SerializationStrategy serialization) {
if (serialization == null) serialization = SerializationStrategy.DEFAULT;
executor("send")
.send(destinationId, message, topic, instance().internalWorkflowsService, idempotencyKey);
.send(
destinationId,
message,
topic,
instance().internalWorkflowsService,
idempotencyKey,
serialization);
}

/**
Expand All @@ -571,7 +614,7 @@ public static void send(
*/
public static void send(
@NonNull String destinationId, @NonNull Object message, @NonNull String topic) {
DBOS.send(destinationId, message, topic, null);
DBOS.send(destinationId, message, topic, null, null);
}

/**
Expand All @@ -586,13 +629,29 @@ public static void send(
}

/**
* Call within a workflow to publish a key value pair
* Call within a workflow to publish a key value pair. Uses the workflow's serialization format.
*
* @param key identifier for published data
* @param value data that is published
*/
public static void setEvent(@NonNull String key, @NonNull Object value) {
executor("setEvent").setEvent(key, value);
setEvent(key, value, null);
}

/**
* Call within a workflow to publish a key value pair with a specific serialization strategy.
*
* @param key identifier for published data
* @param value data that is published
* @param serialization serialization strategy to use (null to use workflow's default)
*/
public static void setEvent(
@NonNull String key, @NonNull Object value, @Nullable SerializationStrategy serialization) {
// If no explicit serialization specified, use the workflow context's serialization
if (serialization == null) {
serialization = getSerialization();
}
executor("setEvent").setEvent(key, value, serialization);
}

/**
Expand Down
Loading