From b16763df3a7ceaef9e1edb2856dd0804f8bf0e45 Mon Sep 17 00:00:00 2001 From: "Taro L. Saito" Date: Sun, 5 Apr 2026 09:00:08 -0700 Subject: [PATCH 1/3] feature: Add Temporal workflow example project for Scala 3 usability exploration MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds a uni-temporal-example module with two example workflows: - HelloWorkflow: minimal workflow to demonstrate the basics - DataPipelineWorkflow: multi-step pipeline (fetch → transform → store) with retry handling for transient failures Includes ScalaDataConverter for Jackson/Scala 3 case class interop, and tests using Temporal's in-process TestWorkflowEnvironment. Co-Authored-By: Claude Opus 4.6 (1M context) --- build.sbt | 24 +++++- plans/2026-04-04-temporal-example.md | 54 ++++++++++++ .../uni/temporal/example/DataPipeline.scala | 34 ++++++++ .../temporal/example/DataPipelineImpl.scala | 73 ++++++++++++++++ .../uni/temporal/example/HelloWorkflow.scala | 21 +++++ .../temporal/example/HelloWorkflowImpl.scala | 33 +++++++ .../temporal/example/ScalaDataConverter.scala | 27 ++++++ .../temporal/example/TemporalWorkerApp.scala | 86 +++++++++++++++++++ .../example/DataPipelineWorkflowTest.scala | 75 ++++++++++++++++ .../temporal/example/HelloWorkflowTest.scala | 55 ++++++++++++ 10 files changed, 481 insertions(+), 1 deletion(-) create mode 100644 plans/2026-04-04-temporal-example.md create mode 100644 uni-temporal-example/src/main/scala/wvlet/uni/temporal/example/DataPipeline.scala create mode 100644 uni-temporal-example/src/main/scala/wvlet/uni/temporal/example/DataPipelineImpl.scala create mode 100644 uni-temporal-example/src/main/scala/wvlet/uni/temporal/example/HelloWorkflow.scala create mode 100644 uni-temporal-example/src/main/scala/wvlet/uni/temporal/example/HelloWorkflowImpl.scala create mode 100644 uni-temporal-example/src/main/scala/wvlet/uni/temporal/example/ScalaDataConverter.scala create mode 100644 uni-temporal-example/src/main/scala/wvlet/uni/temporal/example/TemporalWorkerApp.scala create mode 100644 uni-temporal-example/src/test/scala/wvlet/uni/temporal/example/DataPipelineWorkflowTest.scala create mode 100644 uni-temporal-example/src/test/scala/wvlet/uni/temporal/example/HelloWorkflowTest.scala diff --git a/build.sbt b/build.sbt index 9111a15f..11dd84b2 100644 --- a/build.sbt +++ b/build.sbt @@ -133,7 +133,8 @@ lazy val jvmProjects: Seq[ProjectReference] = Seq( agent, bedrock, netty, - test.jvm + test.jvm, + temporalExample ) lazy val jsProjects: Seq[ProjectReference] = Seq(core.js, uni.js, domTest, test.js) @@ -306,3 +307,24 @@ lazy val integrationTest = project ideSkipProject := false ) .dependsOn(bedrock, test.jvm % Test) + +val TEMPORAL_SDK_VERSION = "1.27.0" + +lazy val temporalExample = project + .in(file("uni-temporal-example")) + .settings( + buildSettings, + noPublish, + name := "uni-temporal-example", + description := "Temporal workflow example for Scala 3 usability exploration", + ideSkipProject := false, + libraryDependencies ++= + Seq( + "io.temporal" % "temporal-sdk" % TEMPORAL_SDK_VERSION, + "io.temporal" % "temporal-testing" % TEMPORAL_SDK_VERSION % Test, + "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.14.2", + // Redirect slf4j to airframe-log + "org.slf4j" % "slf4j-jdk14" % "2.0.17" + ) + ) + .dependsOn(uni.jvm, test.jvm % Test) diff --git a/plans/2026-04-04-temporal-example.md b/plans/2026-04-04-temporal-example.md new file mode 100644 index 00000000..8607885d --- /dev/null +++ b/plans/2026-04-04-temporal-example.md @@ -0,0 +1,54 @@ +# Temporal Example Project for Scala 3 + +## Objective + +Create a small Temporal example project inside `uni` to evaluate the Temporal workflow engine's usability, SDK ergonomics, and fit for durable workflow orchestration in the Scala 3 ecosystem. + +## Background + +Temporal is a workflow orchestration platform built for durability. Workflows survive crashes; activities are retried automatically. This exploration aims to understand: + +1. How Temporal's Java SDK feels in Scala 3 +2. How to write workflows and activities idiomatically +3. How retries, error handling, and observability work +4. Whether Temporal complements the lightweight `uni-task` approach + +## Module: `uni-temporal-example` + +A JVM-only example module (not published) that demonstrates: + +### Workflows + +1. **HelloWorkflow** — minimal "hello world" to understand the basics +2. **DataPipelineWorkflow** — multi-step pipeline: fetch → transform → store + +### Activities + +- `GreetingActivities` — simple greeting activity +- `DataActivities` — fetch, transform, and store with simulated failures for retry demo + +### Key Concepts Covered + +- Workflow interface + implementation separation (required by Temporal) +- Activity interface + implementation +- `ActivityOptions` with retry policy +- Test server (`TestWorkflowEnvironment`) for unit tests without a real server +- Workflow signals and queries (stretch goal) + +## Implementation Plan + +1. Add `uni-temporal-example` module to `build.sbt` +2. Add Temporal SDK dependencies: + - `io.temporal:temporal-sdk` + - `io.temporal:temporal-testing` (test scope) +3. Implement `HelloWorkflow` with `GreetingActivities` +4. Implement `DataPipelineWorkflow` with `DataActivities` +5. Write `UniTest`-based tests using `TestWorkflowEnvironment` +6. Add a `TemporalWorkerApp` entry point for running against a real local server + +## Findings (to be filled in) + +- SDK ergonomics: +- Test experience: +- Comparison to uni-task: +- Recommendation: diff --git a/uni-temporal-example/src/main/scala/wvlet/uni/temporal/example/DataPipeline.scala b/uni-temporal-example/src/main/scala/wvlet/uni/temporal/example/DataPipeline.scala new file mode 100644 index 00000000..f6cdbf8c --- /dev/null +++ b/uni-temporal-example/src/main/scala/wvlet/uni/temporal/example/DataPipeline.scala @@ -0,0 +1,34 @@ +package wvlet.uni.temporal.example + +import io.temporal.activity.{ActivityInterface, ActivityMethod} +import io.temporal.workflow.{WorkflowInterface, WorkflowMethod} + +/** Input/output data types for the pipeline. */ +case class PipelineInput(sourceId: String, rawData: String) +case class PipelineResult(sourceId: String, processedData: String, recordCount: Int) + +/** + * A multi-step data pipeline workflow. + * + * Demonstrates sequential activity chaining. Each step is independently retried on failure, and + * the workflow survives worker restarts — Temporal replays history to restore state. + */ +@WorkflowInterface +trait DataPipelineWorkflow: + @WorkflowMethod + def process(input: PipelineInput): PipelineResult + +/** Activities for the data pipeline. Each method is a separately-retried unit of work. */ +@ActivityInterface +trait DataActivities: + /** Fetch raw data from a source. Simulates transient failures to show retry behaviour. */ + @ActivityMethod + def fetchData(sourceId: String): String + + /** Transform/clean the raw data. */ + @ActivityMethod + def transformData(rawData: String): String + + /** Persist the result and return a record count. */ + @ActivityMethod + def storeData(sourceId: String, processedData: String): Int diff --git a/uni-temporal-example/src/main/scala/wvlet/uni/temporal/example/DataPipelineImpl.scala b/uni-temporal-example/src/main/scala/wvlet/uni/temporal/example/DataPipelineImpl.scala new file mode 100644 index 00000000..34226715 --- /dev/null +++ b/uni-temporal-example/src/main/scala/wvlet/uni/temporal/example/DataPipelineImpl.scala @@ -0,0 +1,73 @@ +package wvlet.uni.temporal.example + +import io.temporal.activity.ActivityOptions +import io.temporal.common.RetryOptions +import io.temporal.workflow.Workflow +import wvlet.uni.log.LogSupport + +import java.time.Duration +import java.util.concurrent.atomic.AtomicInteger + +/** + * DataPipeline workflow implementation. + * + * Chaining activities sequentially: fetchData → transformData → storeData. Each activity is + * scheduled with explicit retry options demonstrating Temporal's built-in retry mechanism. + */ +class DataPipelineWorkflowImpl extends DataPipelineWorkflow with LogSupport: + + private val activities: DataActivities = Workflow.newActivityStub( + classOf[DataActivities], + ActivityOptions + .newBuilder() + // Each activity call must complete within 30s + .setStartToCloseTimeout(Duration.ofSeconds(30)) + .setRetryOptions( + RetryOptions + .newBuilder() + // Retry up to 3 times with 1-second initial interval + .setMaximumAttempts(3) + .setInitialInterval(Duration.ofSeconds(1)) + .setBackoffCoefficient(2.0) + .build() + ) + .build() + ) + + override def process(input: PipelineInput): PipelineResult = + // Step 1: fetch + val rawData = activities.fetchData(input.sourceId) + // Step 2: transform + val processedData = activities.transformData(rawData) + // Step 3: store + val recordCount = activities.storeData(input.sourceId, processedData) + PipelineResult(input.sourceId, processedData, recordCount) + +/** + * DataActivities implementation with a simulated transient failure on the first fetch attempt. + * + * The static failCount tracks attempts across activity retries (the activity worker process is the + * same in tests). In production each activity runs in its own thread; Temporal supplies the + * attempt number via `ActivityExecutionContext`. + */ +class DataActivitiesImpl extends DataActivities with LogSupport: + + // Tracks how many times fetchData has been attempted (simulates transient failures) + private val fetchAttempts = AtomicInteger(0) + + override def fetchData(sourceId: String): String = + val attempt = fetchAttempts.incrementAndGet() + logger.info(s"fetchData attempt ${attempt} for source '${sourceId}'") + if attempt == 1 then + // Simulate a transient failure on the first attempt + throw RuntimeException(s"Transient network error fetching '${sourceId}' (attempt 1)") + s"raw-data-for-${sourceId}" + + override def transformData(rawData: String): String = + logger.info(s"Transforming: ${rawData}") + rawData.toUpperCase.replace("-", "_") + + override def storeData(sourceId: String, processedData: String): Int = + logger.info(s"Storing ${processedData} for source '${sourceId}'") + // Simulate storing: return a fake record count + processedData.length diff --git a/uni-temporal-example/src/main/scala/wvlet/uni/temporal/example/HelloWorkflow.scala b/uni-temporal-example/src/main/scala/wvlet/uni/temporal/example/HelloWorkflow.scala new file mode 100644 index 00000000..4f628c71 --- /dev/null +++ b/uni-temporal-example/src/main/scala/wvlet/uni/temporal/example/HelloWorkflow.scala @@ -0,0 +1,21 @@ +package wvlet.uni.temporal.example + +import io.temporal.activity.{ActivityInterface, ActivityMethod} +import io.temporal.workflow.{WorkflowInterface, WorkflowMethod} + +/** + * The simplest possible Temporal workflow: greet a user. + * + * Temporal requires workflow and activity interfaces to be annotated with @WorkflowInterface / + * @ActivityInterface. + * Implementations are registered separately with the worker. + */ +@WorkflowInterface +trait HelloWorkflow: + @WorkflowMethod + def sayHello(name: String): String + +@ActivityInterface +trait GreetingActivities: + @ActivityMethod + def composeGreeting(greeting: String, name: String): String diff --git a/uni-temporal-example/src/main/scala/wvlet/uni/temporal/example/HelloWorkflowImpl.scala b/uni-temporal-example/src/main/scala/wvlet/uni/temporal/example/HelloWorkflowImpl.scala new file mode 100644 index 00000000..18819900 --- /dev/null +++ b/uni-temporal-example/src/main/scala/wvlet/uni/temporal/example/HelloWorkflowImpl.scala @@ -0,0 +1,33 @@ +package wvlet.uni.temporal.example + +import io.temporal.activity.ActivityOptions +import io.temporal.workflow.Workflow +import wvlet.uni.log.LogSupport + +import java.time.Duration + +/** + * Workflow implementation. + * + * Workflow code must be deterministic — no I/O, random numbers, or wall-clock time. All side + * effects go through activities. Temporal replays workflow history on restarts; non-determinism + * breaks replay. + */ +class HelloWorkflowImpl extends HelloWorkflow with LogSupport: + + // Activity stub: calls are scheduled as Temporal activities, not direct method calls. + private val activities: GreetingActivities = Workflow.newActivityStub( + classOf[GreetingActivities], + ActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofSeconds(10)).build() + ) + + override def sayHello(name: String): String = activities.composeGreeting("Hello", name) + +/** + * Activity implementation. Activities are the side-effectful units — they do real work and can be + * retried independently. + */ +class GreetingActivitiesImpl extends GreetingActivities with LogSupport: + override def composeGreeting(greeting: String, name: String): String = + logger.info(s"Composing greeting for ${name}") + s"${greeting}, ${name}!" diff --git a/uni-temporal-example/src/main/scala/wvlet/uni/temporal/example/ScalaDataConverter.scala b/uni-temporal-example/src/main/scala/wvlet/uni/temporal/example/ScalaDataConverter.scala new file mode 100644 index 00000000..f64110d4 --- /dev/null +++ b/uni-temporal-example/src/main/scala/wvlet/uni/temporal/example/ScalaDataConverter.scala @@ -0,0 +1,27 @@ +package wvlet.uni.temporal.example + +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.scala.DefaultScalaModule +import io.temporal.common.converter.{ + DefaultDataConverter, + JacksonJsonPayloadConverter, + PayloadConverter +} + +/** + * Custom DataConverter that registers Jackson's Scala module so Scala 3 case classes can be + * serialized/deserialized by Temporal. + * + * Without this, Jackson cannot construct Scala case class instances because they lack a no-arg + * constructor. The Scala module teaches Jackson about Scala product types, Option, collections, + * etc. + */ +object ScalaDataConverter: + private val objectMapper: ObjectMapper = + val mapper = ObjectMapper() + mapper.registerModule(DefaultScalaModule) + mapper + + val converter: DefaultDataConverter = DefaultDataConverter + .newDefaultInstance() + .withPayloadConverterOverrides(JacksonJsonPayloadConverter(objectMapper)) diff --git a/uni-temporal-example/src/main/scala/wvlet/uni/temporal/example/TemporalWorkerApp.scala b/uni-temporal-example/src/main/scala/wvlet/uni/temporal/example/TemporalWorkerApp.scala new file mode 100644 index 00000000..d5ca5a91 --- /dev/null +++ b/uni-temporal-example/src/main/scala/wvlet/uni/temporal/example/TemporalWorkerApp.scala @@ -0,0 +1,86 @@ +package wvlet.uni.temporal.example + +import io.temporal.client.WorkflowClient +import io.temporal.client.WorkflowOptions +import io.temporal.serviceclient.WorkflowServiceStubs +import io.temporal.worker.WorkerFactory +import wvlet.uni.log.LogSupport + +/** Shared task-queue name used by all workers and clients in this example. */ +object TemporalExample: + val TaskQueue = "uni-example-queue" + +/** + * Starts a Temporal worker connected to a locally-running Temporal dev server (`temporal server + * start-dev`). + * + * This is an entry point for manual exploration — run the Temporal dev server first, then run this + * app alongside `TemporalClientApp` to see workflows execute end-to-end. + * + * {{{ + * # Terminal 1 + * temporal server start-dev + * # Terminal 2 + * ./sbt "temporalExample/run" # starts worker + * }}} + */ +object TemporalWorkerApp extends App with LogSupport: + + logger.info("Connecting to local Temporal dev server...") + + val service = WorkflowServiceStubs.newLocalServiceStubs() + val client = WorkflowClient.newInstance(service) + val factory = WorkerFactory.newInstance(client) + + val worker = factory.newWorker(TemporalExample.TaskQueue) + + // Register workflow and activity implementations + worker.registerWorkflowImplementationTypes(classOf[HelloWorkflowImpl]) + worker.registerWorkflowImplementationTypes(classOf[DataPipelineWorkflowImpl]) + worker.registerActivitiesImplementations(GreetingActivitiesImpl()) + worker.registerActivitiesImplementations(DataActivitiesImpl()) + + factory.start() + logger.info(s"Worker started on task queue '${TemporalExample.TaskQueue}'. Ctrl+C to stop.") + +/** + * Submits example workflows to the running worker. + * + * Run after `TemporalWorkerApp` is running. + */ +object TemporalClientApp extends App with LogSupport: + + val service = WorkflowServiceStubs.newLocalServiceStubs() + val client = WorkflowClient.newInstance(service) + + // --- Hello workflow --- + val helloStub = client.newWorkflowStub( + classOf[HelloWorkflow], + WorkflowOptions + .newBuilder() + .setTaskQueue(TemporalExample.TaskQueue) + .setWorkflowId("hello-workflow-1") + .build() + ) + + val greeting = helloStub.sayHello("Temporal") + logger.info(s"HelloWorkflow result: ${greeting}") + + // --- Data pipeline workflow --- + val pipelineStub = client.newWorkflowStub( + classOf[DataPipelineWorkflow], + WorkflowOptions + .newBuilder() + .setTaskQueue(TemporalExample.TaskQueue) + .setWorkflowId("pipeline-workflow-1") + .build() + ) + + val result = pipelineStub.process(PipelineInput("sensor-42", "raw,data,stream")) + logger.info( + s"DataPipeline result: processed=${result.processedData}, records=${result.recordCount}" + ) + + service.shutdownNow() + +end TemporalClientApp diff --git a/uni-temporal-example/src/test/scala/wvlet/uni/temporal/example/DataPipelineWorkflowTest.scala b/uni-temporal-example/src/test/scala/wvlet/uni/temporal/example/DataPipelineWorkflowTest.scala new file mode 100644 index 00000000..efaf4276 --- /dev/null +++ b/uni-temporal-example/src/test/scala/wvlet/uni/temporal/example/DataPipelineWorkflowTest.scala @@ -0,0 +1,75 @@ +package wvlet.uni.temporal.example + +import io.temporal.client.WorkflowClientOptions +import io.temporal.client.WorkflowOptions +import io.temporal.testing.TestWorkflowEnvironment +import io.temporal.testing.TestEnvironmentOptions +import wvlet.uni.test.UniTest + +/** + * Tests for DataPipelineWorkflow including retry behaviour. + * + * Each test creates its own TestWorkflowEnvironment so state (e.g. attempt counters in + * DataActivitiesImpl) is isolated between test runs. + */ +class DataPipelineWorkflowTest extends UniTest: + + private def newTestEnv(): TestWorkflowEnvironment = TestWorkflowEnvironment.newInstance( + TestEnvironmentOptions + .newBuilder() + .setWorkflowClientOptions( + WorkflowClientOptions.newBuilder().setDataConverter(ScalaDataConverter.converter).build() + ) + .build() + ) + + test("DataPipelineWorkflow completes after transient fetch failure") { + val testEnv = newTestEnv() + try + val worker = testEnv.newWorker(TemporalExample.TaskQueue) + worker.registerWorkflowImplementationTypes(classOf[DataPipelineWorkflowImpl]) + worker.registerActivitiesImplementations(DataActivitiesImpl()) + testEnv.start() + + val stub = testEnv + .getWorkflowClient + .newWorkflowStub( + classOf[DataPipelineWorkflow], + WorkflowOptions.newBuilder().setTaskQueue(TemporalExample.TaskQueue).build() + ) + + val result = stub.process(PipelineInput("sensor-42", "raw,data,stream")) + + // fetchData fails once, retries, then the pipeline completes + result.sourceId shouldBe "sensor-42" + // transformData uppercases and replaces "-" with "_" + result.processedData shouldBe "RAW_DATA_FOR_SENSOR_42" + // storeData returns processedData.length + result.recordCount shouldBe "RAW_DATA_FOR_SENSOR_42".length + finally + testEnv.close() + } + + test("PipelineResult captures sourceId") { + val testEnv = newTestEnv() + try + val worker = testEnv.newWorker(TemporalExample.TaskQueue) + worker.registerWorkflowImplementationTypes(classOf[DataPipelineWorkflowImpl]) + worker.registerActivitiesImplementations(DataActivitiesImpl()) + testEnv.start() + + val stub = testEnv + .getWorkflowClient + .newWorkflowStub( + classOf[DataPipelineWorkflow], + WorkflowOptions.newBuilder().setTaskQueue(TemporalExample.TaskQueue).build() + ) + + val result = stub.process(PipelineInput("device-99", "another,raw,payload")) + result.sourceId shouldBe "device-99" + (result.recordCount > 0) shouldBe true + finally + testEnv.close() + } + +end DataPipelineWorkflowTest diff --git a/uni-temporal-example/src/test/scala/wvlet/uni/temporal/example/HelloWorkflowTest.scala b/uni-temporal-example/src/test/scala/wvlet/uni/temporal/example/HelloWorkflowTest.scala new file mode 100644 index 00000000..a9fd9731 --- /dev/null +++ b/uni-temporal-example/src/test/scala/wvlet/uni/temporal/example/HelloWorkflowTest.scala @@ -0,0 +1,55 @@ +package wvlet.uni.temporal.example + +import io.temporal.testing.TestWorkflowEnvironment +import io.temporal.client.WorkflowOptions +import wvlet.uni.test.UniTest + +/** + * Tests for HelloWorkflow using Temporal's in-process test environment. + * + * TestWorkflowEnvironment runs a full Temporal server in memory — no external process needed. This + * allows testing workflow + activity integration without a real Temporal cluster. + */ +class HelloWorkflowTest extends UniTest: + + test("HelloWorkflow returns greeting from activity") { + val testEnv = TestWorkflowEnvironment.newInstance() + try + val worker = testEnv.newWorker(TemporalExample.TaskQueue) + worker.registerWorkflowImplementationTypes(classOf[HelloWorkflowImpl]) + worker.registerActivitiesImplementations(GreetingActivitiesImpl()) + testEnv.start() + + val stub = testEnv + .getWorkflowClient + .newWorkflowStub( + classOf[HelloWorkflow], + WorkflowOptions.newBuilder().setTaskQueue(TemporalExample.TaskQueue).build() + ) + + val result = stub.sayHello("World") + result shouldBe "Hello, World!" + finally + testEnv.close() + } + + test("HelloWorkflow greets different names") { + val testEnv = TestWorkflowEnvironment.newInstance() + try + val worker = testEnv.newWorker(TemporalExample.TaskQueue) + worker.registerWorkflowImplementationTypes(classOf[HelloWorkflowImpl]) + worker.registerActivitiesImplementations(GreetingActivitiesImpl()) + testEnv.start() + + val client = testEnv.getWorkflowClient + for name <- Seq("Alice", "Bob", "Temporal") do + val stub = client.newWorkflowStub( + classOf[HelloWorkflow], + WorkflowOptions.newBuilder().setTaskQueue(TemporalExample.TaskQueue).build() + ) + stub.sayHello(name) shouldBe s"Hello, ${name}!" + finally + testEnv.close() + } + +end HelloWorkflowTest From 60faeb70529ffe6bf01118d35c94a5f7dc5681c7 Mon Sep 17 00:00:00 2001 From: "Taro L. Saito" Date: Sun, 5 Apr 2026 09:03:00 -0700 Subject: [PATCH 2/3] fix: Wire ScalaDataConverter into worker/client apps, use unique workflow IDs Address code review findings: - TemporalWorkerApp and TemporalClientApp now use ScalaDataConverter for proper Scala case class serialization - Workflow IDs use UUID to avoid WorkflowExecutionAlreadyStarted on re-runs Co-Authored-By: Claude Opus 4.6 (1M context) --- .../temporal/example/TemporalWorkerApp.scala | 27 +++++++++++++------ 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/uni-temporal-example/src/main/scala/wvlet/uni/temporal/example/TemporalWorkerApp.scala b/uni-temporal-example/src/main/scala/wvlet/uni/temporal/example/TemporalWorkerApp.scala index d5ca5a91..d0ac3344 100644 --- a/uni-temporal-example/src/main/scala/wvlet/uni/temporal/example/TemporalWorkerApp.scala +++ b/uni-temporal-example/src/main/scala/wvlet/uni/temporal/example/TemporalWorkerApp.scala @@ -1,11 +1,12 @@ package wvlet.uni.temporal.example -import io.temporal.client.WorkflowClient -import io.temporal.client.WorkflowOptions +import io.temporal.client.{WorkflowClient, WorkflowClientOptions, WorkflowOptions} import io.temporal.serviceclient.WorkflowServiceStubs import io.temporal.worker.WorkerFactory import wvlet.uni.log.LogSupport +import java.util.UUID + /** Shared task-queue name used by all workers and clients in this example. */ object TemporalExample: val TaskQueue = "uni-example-queue" @@ -28,8 +29,13 @@ object TemporalWorkerApp extends App with LogSupport: logger.info("Connecting to local Temporal dev server...") - val service = WorkflowServiceStubs.newLocalServiceStubs() - val client = WorkflowClient.newInstance(service) + val service = WorkflowServiceStubs.newLocalServiceStubs() + val clientOptions = WorkflowClientOptions + .newBuilder() + .setDataConverter(ScalaDataConverter.converter) + .build() + + val client = WorkflowClient.newInstance(service, clientOptions) val factory = WorkerFactory.newInstance(client) val worker = factory.newWorker(TemporalExample.TaskQueue) @@ -50,8 +56,13 @@ object TemporalWorkerApp extends App with LogSupport: */ object TemporalClientApp extends App with LogSupport: - val service = WorkflowServiceStubs.newLocalServiceStubs() - val client = WorkflowClient.newInstance(service) + val service = WorkflowServiceStubs.newLocalServiceStubs() + val clientOptions = WorkflowClientOptions + .newBuilder() + .setDataConverter(ScalaDataConverter.converter) + .build() + + val client = WorkflowClient.newInstance(service, clientOptions) // --- Hello workflow --- val helloStub = client.newWorkflowStub( @@ -59,7 +70,7 @@ object TemporalClientApp extends App with LogSupport: WorkflowOptions .newBuilder() .setTaskQueue(TemporalExample.TaskQueue) - .setWorkflowId("hello-workflow-1") + .setWorkflowId(s"hello-${UUID.randomUUID()}") .build() ) @@ -72,7 +83,7 @@ object TemporalClientApp extends App with LogSupport: WorkflowOptions .newBuilder() .setTaskQueue(TemporalExample.TaskQueue) - .setWorkflowId("pipeline-workflow-1") + .setWorkflowId(s"pipeline-${UUID.randomUUID()}") .build() ) From 08a822f421437bf268500f757bd593f419b4363a Mon Sep 17 00:00:00 2001 From: "Taro L. Saito" Date: Sun, 5 Apr 2026 09:08:09 -0700 Subject: [PATCH 3/3] refactor: Address review feedback from Gemini and Codex - Use Activity.getExecutionContext() for attempt tracking instead of AtomicInteger - Extract JACKSON_SCALA_VERSION constant in build.sbt - Apply loan pattern to test fixtures to reduce boilerplate - Wire ScalaDataConverter into worker/client apps (already done, refined) Co-Authored-By: Claude Opus 4.6 (1M context) --- build.sbt | 11 +++-- .../temporal/example/DataPipelineImpl.scala | 14 ++---- .../example/DataPipelineWorkflowTest.scala | 46 +++++++------------ .../temporal/example/HelloWorkflowTest.scala | 44 ++++++++---------- 4 files changed, 46 insertions(+), 69 deletions(-) diff --git a/build.sbt b/build.sbt index 11dd84b2..a995c4b5 100644 --- a/build.sbt +++ b/build.sbt @@ -308,7 +308,8 @@ lazy val integrationTest = project ) .dependsOn(bedrock, test.jvm % Test) -val TEMPORAL_SDK_VERSION = "1.27.0" +val TEMPORAL_SDK_VERSION = "1.27.0" +val JACKSON_SCALA_VERSION = "2.14.2" lazy val temporalExample = project .in(file("uni-temporal-example")) @@ -320,11 +321,11 @@ lazy val temporalExample = project ideSkipProject := false, libraryDependencies ++= Seq( - "io.temporal" % "temporal-sdk" % TEMPORAL_SDK_VERSION, - "io.temporal" % "temporal-testing" % TEMPORAL_SDK_VERSION % Test, - "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.14.2", + "io.temporal" % "temporal-sdk" % TEMPORAL_SDK_VERSION, + "io.temporal" % "temporal-testing" % TEMPORAL_SDK_VERSION % Test, + "com.fasterxml.jackson.module" %% "jackson-module-scala" % JACKSON_SCALA_VERSION, // Redirect slf4j to airframe-log - "org.slf4j" % "slf4j-jdk14" % "2.0.17" + "org.slf4j" % "slf4j-jdk14" % "2.0.17" ) ) .dependsOn(uni.jvm, test.jvm % Test) diff --git a/uni-temporal-example/src/main/scala/wvlet/uni/temporal/example/DataPipelineImpl.scala b/uni-temporal-example/src/main/scala/wvlet/uni/temporal/example/DataPipelineImpl.scala index 34226715..66e1fb62 100644 --- a/uni-temporal-example/src/main/scala/wvlet/uni/temporal/example/DataPipelineImpl.scala +++ b/uni-temporal-example/src/main/scala/wvlet/uni/temporal/example/DataPipelineImpl.scala @@ -1,12 +1,11 @@ package wvlet.uni.temporal.example -import io.temporal.activity.ActivityOptions +import io.temporal.activity.{Activity, ActivityOptions} import io.temporal.common.RetryOptions import io.temporal.workflow.Workflow import wvlet.uni.log.LogSupport import java.time.Duration -import java.util.concurrent.atomic.AtomicInteger /** * DataPipeline workflow implementation. @@ -46,17 +45,14 @@ class DataPipelineWorkflowImpl extends DataPipelineWorkflow with LogSupport: /** * DataActivities implementation with a simulated transient failure on the first fetch attempt. * - * The static failCount tracks attempts across activity retries (the activity worker process is the - * same in tests). In production each activity runs in its own thread; Temporal supplies the - * attempt number via `ActivityExecutionContext`. + * Uses Temporal's ActivityExecutionContext to get the server-tracked attempt number, which is the + * recommended approach — it works correctly across different worker instances and survives + * restarts. */ class DataActivitiesImpl extends DataActivities with LogSupport: - // Tracks how many times fetchData has been attempted (simulates transient failures) - private val fetchAttempts = AtomicInteger(0) - override def fetchData(sourceId: String): String = - val attempt = fetchAttempts.incrementAndGet() + val attempt = Activity.getExecutionContext().getInfo().getAttempt() logger.info(s"fetchData attempt ${attempt} for source '${sourceId}'") if attempt == 1 then // Simulate a transient failure on the first attempt diff --git a/uni-temporal-example/src/test/scala/wvlet/uni/temporal/example/DataPipelineWorkflowTest.scala b/uni-temporal-example/src/test/scala/wvlet/uni/temporal/example/DataPipelineWorkflowTest.scala index efaf4276..42f1f7d9 100644 --- a/uni-temporal-example/src/test/scala/wvlet/uni/temporal/example/DataPipelineWorkflowTest.scala +++ b/uni-temporal-example/src/test/scala/wvlet/uni/temporal/example/DataPipelineWorkflowTest.scala @@ -14,17 +14,15 @@ import wvlet.uni.test.UniTest */ class DataPipelineWorkflowTest extends UniTest: - private def newTestEnv(): TestWorkflowEnvironment = TestWorkflowEnvironment.newInstance( - TestEnvironmentOptions - .newBuilder() - .setWorkflowClientOptions( - WorkflowClientOptions.newBuilder().setDataConverter(ScalaDataConverter.converter).build() - ) - .build() - ) - - test("DataPipelineWorkflow completes after transient fetch failure") { - val testEnv = newTestEnv() + private def withPipelineStub(testCode: DataPipelineWorkflow => Unit): Unit = + val testEnv = TestWorkflowEnvironment.newInstance( + TestEnvironmentOptions + .newBuilder() + .setWorkflowClientOptions( + WorkflowClientOptions.newBuilder().setDataConverter(ScalaDataConverter.converter).build() + ) + .build() + ) try val worker = testEnv.newWorker(TemporalExample.TaskQueue) worker.registerWorkflowImplementationTypes(classOf[DataPipelineWorkflowImpl]) @@ -37,7 +35,12 @@ class DataPipelineWorkflowTest extends UniTest: classOf[DataPipelineWorkflow], WorkflowOptions.newBuilder().setTaskQueue(TemporalExample.TaskQueue).build() ) + testCode(stub) + finally + testEnv.close() + test("DataPipelineWorkflow completes after transient fetch failure") { + withPipelineStub { stub => val result = stub.process(PipelineInput("sensor-42", "raw,data,stream")) // fetchData fails once, retries, then the pipeline completes @@ -46,30 +49,15 @@ class DataPipelineWorkflowTest extends UniTest: result.processedData shouldBe "RAW_DATA_FOR_SENSOR_42" // storeData returns processedData.length result.recordCount shouldBe "RAW_DATA_FOR_SENSOR_42".length - finally - testEnv.close() + } } test("PipelineResult captures sourceId") { - val testEnv = newTestEnv() - try - val worker = testEnv.newWorker(TemporalExample.TaskQueue) - worker.registerWorkflowImplementationTypes(classOf[DataPipelineWorkflowImpl]) - worker.registerActivitiesImplementations(DataActivitiesImpl()) - testEnv.start() - - val stub = testEnv - .getWorkflowClient - .newWorkflowStub( - classOf[DataPipelineWorkflow], - WorkflowOptions.newBuilder().setTaskQueue(TemporalExample.TaskQueue).build() - ) - + withPipelineStub { stub => val result = stub.process(PipelineInput("device-99", "another,raw,payload")) result.sourceId shouldBe "device-99" (result.recordCount > 0) shouldBe true - finally - testEnv.close() + } } end DataPipelineWorkflowTest diff --git a/uni-temporal-example/src/test/scala/wvlet/uni/temporal/example/HelloWorkflowTest.scala b/uni-temporal-example/src/test/scala/wvlet/uni/temporal/example/HelloWorkflowTest.scala index a9fd9731..ba1cf7c0 100644 --- a/uni-temporal-example/src/test/scala/wvlet/uni/temporal/example/HelloWorkflowTest.scala +++ b/uni-temporal-example/src/test/scala/wvlet/uni/temporal/example/HelloWorkflowTest.scala @@ -12,44 +12,36 @@ import wvlet.uni.test.UniTest */ class HelloWorkflowTest extends UniTest: - test("HelloWorkflow returns greeting from activity") { + private def withHelloEnv(testCode: TestWorkflowEnvironment => Unit): Unit = val testEnv = TestWorkflowEnvironment.newInstance() try val worker = testEnv.newWorker(TemporalExample.TaskQueue) worker.registerWorkflowImplementationTypes(classOf[HelloWorkflowImpl]) worker.registerActivitiesImplementations(GreetingActivitiesImpl()) testEnv.start() - - val stub = testEnv - .getWorkflowClient - .newWorkflowStub( - classOf[HelloWorkflow], - WorkflowOptions.newBuilder().setTaskQueue(TemporalExample.TaskQueue).build() - ) - - val result = stub.sayHello("World") - result shouldBe "Hello, World!" + testCode(testEnv) finally testEnv.close() + + private def newHelloStub(testEnv: TestWorkflowEnvironment): HelloWorkflow = testEnv + .getWorkflowClient + .newWorkflowStub( + classOf[HelloWorkflow], + WorkflowOptions.newBuilder().setTaskQueue(TemporalExample.TaskQueue).build() + ) + + test("HelloWorkflow returns greeting from activity") { + withHelloEnv { testEnv => + newHelloStub(testEnv).sayHello("World") shouldBe "Hello, World!" + } } test("HelloWorkflow greets different names") { - val testEnv = TestWorkflowEnvironment.newInstance() - try - val worker = testEnv.newWorker(TemporalExample.TaskQueue) - worker.registerWorkflowImplementationTypes(classOf[HelloWorkflowImpl]) - worker.registerActivitiesImplementations(GreetingActivitiesImpl()) - testEnv.start() - - val client = testEnv.getWorkflowClient + withHelloEnv { testEnv => + // Each stub is bound to a single workflow execution, so a new stub is needed per call for name <- Seq("Alice", "Bob", "Temporal") do - val stub = client.newWorkflowStub( - classOf[HelloWorkflow], - WorkflowOptions.newBuilder().setTaskQueue(TemporalExample.TaskQueue).build() - ) - stub.sayHello(name) shouldBe s"Hello, ${name}!" - finally - testEnv.close() + newHelloStub(testEnv).sayHello(name) shouldBe s"Hello, ${name}!" + } } end HelloWorkflowTest