diff --git a/build.sbt b/build.sbt index 9111a15f..a995c4b5 100644 --- a/build.sbt +++ b/build.sbt @@ -133,7 +133,8 @@ lazy val jvmProjects: Seq[ProjectReference] = Seq( agent, bedrock, netty, - test.jvm + test.jvm, + temporalExample ) lazy val jsProjects: Seq[ProjectReference] = Seq(core.js, uni.js, domTest, test.js) @@ -306,3 +307,25 @@ lazy val integrationTest = project ideSkipProject := false ) .dependsOn(bedrock, test.jvm % Test) + +val TEMPORAL_SDK_VERSION = "1.27.0" +val JACKSON_SCALA_VERSION = "2.14.2" + +lazy val temporalExample = project + .in(file("uni-temporal-example")) + .settings( + buildSettings, + noPublish, + name := "uni-temporal-example", + description := "Temporal workflow example for Scala 3 usability exploration", + ideSkipProject := false, + libraryDependencies ++= + Seq( + "io.temporal" % "temporal-sdk" % TEMPORAL_SDK_VERSION, + "io.temporal" % "temporal-testing" % TEMPORAL_SDK_VERSION % Test, + "com.fasterxml.jackson.module" %% "jackson-module-scala" % JACKSON_SCALA_VERSION, + // Redirect slf4j to airframe-log + "org.slf4j" % "slf4j-jdk14" % "2.0.17" + ) + ) + .dependsOn(uni.jvm, test.jvm % Test) diff --git a/plans/2026-04-04-temporal-example.md b/plans/2026-04-04-temporal-example.md new file mode 100644 index 00000000..8607885d --- /dev/null +++ b/plans/2026-04-04-temporal-example.md @@ -0,0 +1,54 @@ +# Temporal Example Project for Scala 3 + +## Objective + +Create a small Temporal example project inside `uni` to evaluate the Temporal workflow engine's usability, SDK ergonomics, and fit for durable workflow orchestration in the Scala 3 ecosystem. + +## Background + +Temporal is a workflow orchestration platform built for durability. Workflows survive crashes; activities are retried automatically. This exploration aims to understand: + +1. How Temporal's Java SDK feels in Scala 3 +2. How to write workflows and activities idiomatically +3. How retries, error handling, and observability work +4. Whether Temporal complements the lightweight `uni-task` approach + +## Module: `uni-temporal-example` + +A JVM-only example module (not published) that demonstrates: + +### Workflows + +1. **HelloWorkflow** — minimal "hello world" to understand the basics +2. **DataPipelineWorkflow** — multi-step pipeline: fetch → transform → store + +### Activities + +- `GreetingActivities` — simple greeting activity +- `DataActivities` — fetch, transform, and store with simulated failures for retry demo + +### Key Concepts Covered + +- Workflow interface + implementation separation (required by Temporal) +- Activity interface + implementation +- `ActivityOptions` with retry policy +- Test server (`TestWorkflowEnvironment`) for unit tests without a real server +- Workflow signals and queries (stretch goal) + +## Implementation Plan + +1. Add `uni-temporal-example` module to `build.sbt` +2. Add Temporal SDK dependencies: + - `io.temporal:temporal-sdk` + - `io.temporal:temporal-testing` (test scope) +3. Implement `HelloWorkflow` with `GreetingActivities` +4. Implement `DataPipelineWorkflow` with `DataActivities` +5. Write `UniTest`-based tests using `TestWorkflowEnvironment` +6. Add a `TemporalWorkerApp` entry point for running against a real local server + +## Findings (to be filled in) + +- SDK ergonomics: +- Test experience: +- Comparison to uni-task: +- Recommendation: diff --git a/uni-temporal-example/src/main/scala/wvlet/uni/temporal/example/DataPipeline.scala b/uni-temporal-example/src/main/scala/wvlet/uni/temporal/example/DataPipeline.scala new file mode 100644 index 00000000..f6cdbf8c --- /dev/null +++ b/uni-temporal-example/src/main/scala/wvlet/uni/temporal/example/DataPipeline.scala @@ -0,0 +1,34 @@ +package wvlet.uni.temporal.example + +import io.temporal.activity.{ActivityInterface, ActivityMethod} +import io.temporal.workflow.{WorkflowInterface, WorkflowMethod} + +/** Input/output data types for the pipeline. */ +case class PipelineInput(sourceId: String, rawData: String) +case class PipelineResult(sourceId: String, processedData: String, recordCount: Int) + +/** + * A multi-step data pipeline workflow. + * + * Demonstrates sequential activity chaining. Each step is independently retried on failure, and + * the workflow survives worker restarts — Temporal replays history to restore state. + */ +@WorkflowInterface +trait DataPipelineWorkflow: + @WorkflowMethod + def process(input: PipelineInput): PipelineResult + +/** Activities for the data pipeline. Each method is a separately-retried unit of work. */ +@ActivityInterface +trait DataActivities: + /** Fetch raw data from a source. Simulates transient failures to show retry behaviour. */ + @ActivityMethod + def fetchData(sourceId: String): String + + /** Transform/clean the raw data. */ + @ActivityMethod + def transformData(rawData: String): String + + /** Persist the result and return a record count. */ + @ActivityMethod + def storeData(sourceId: String, processedData: String): Int diff --git a/uni-temporal-example/src/main/scala/wvlet/uni/temporal/example/DataPipelineImpl.scala b/uni-temporal-example/src/main/scala/wvlet/uni/temporal/example/DataPipelineImpl.scala new file mode 100644 index 00000000..66e1fb62 --- /dev/null +++ b/uni-temporal-example/src/main/scala/wvlet/uni/temporal/example/DataPipelineImpl.scala @@ -0,0 +1,69 @@ +package wvlet.uni.temporal.example + +import io.temporal.activity.{Activity, ActivityOptions} +import io.temporal.common.RetryOptions +import io.temporal.workflow.Workflow +import wvlet.uni.log.LogSupport + +import java.time.Duration + +/** + * DataPipeline workflow implementation. + * + * Chaining activities sequentially: fetchData → transformData → storeData. Each activity is + * scheduled with explicit retry options demonstrating Temporal's built-in retry mechanism. + */ +class DataPipelineWorkflowImpl extends DataPipelineWorkflow with LogSupport: + + private val activities: DataActivities = Workflow.newActivityStub( + classOf[DataActivities], + ActivityOptions + .newBuilder() + // Each activity call must complete within 30s + .setStartToCloseTimeout(Duration.ofSeconds(30)) + .setRetryOptions( + RetryOptions + .newBuilder() + // Retry up to 3 times with 1-second initial interval + .setMaximumAttempts(3) + .setInitialInterval(Duration.ofSeconds(1)) + .setBackoffCoefficient(2.0) + .build() + ) + .build() + ) + + override def process(input: PipelineInput): PipelineResult = + // Step 1: fetch + val rawData = activities.fetchData(input.sourceId) + // Step 2: transform + val processedData = activities.transformData(rawData) + // Step 3: store + val recordCount = activities.storeData(input.sourceId, processedData) + PipelineResult(input.sourceId, processedData, recordCount) + +/** + * DataActivities implementation with a simulated transient failure on the first fetch attempt. + * + * Uses Temporal's ActivityExecutionContext to get the server-tracked attempt number, which is the + * recommended approach — it works correctly across different worker instances and survives + * restarts. + */ +class DataActivitiesImpl extends DataActivities with LogSupport: + + override def fetchData(sourceId: String): String = + val attempt = Activity.getExecutionContext().getInfo().getAttempt() + logger.info(s"fetchData attempt ${attempt} for source '${sourceId}'") + if attempt == 1 then + // Simulate a transient failure on the first attempt + throw RuntimeException(s"Transient network error fetching '${sourceId}' (attempt 1)") + s"raw-data-for-${sourceId}" + + override def transformData(rawData: String): String = + logger.info(s"Transforming: ${rawData}") + rawData.toUpperCase.replace("-", "_") + + override def storeData(sourceId: String, processedData: String): Int = + logger.info(s"Storing ${processedData} for source '${sourceId}'") + // Simulate storing: return a fake record count + processedData.length diff --git a/uni-temporal-example/src/main/scala/wvlet/uni/temporal/example/HelloWorkflow.scala b/uni-temporal-example/src/main/scala/wvlet/uni/temporal/example/HelloWorkflow.scala new file mode 100644 index 00000000..4f628c71 --- /dev/null +++ b/uni-temporal-example/src/main/scala/wvlet/uni/temporal/example/HelloWorkflow.scala @@ -0,0 +1,21 @@ +package wvlet.uni.temporal.example + +import io.temporal.activity.{ActivityInterface, ActivityMethod} +import io.temporal.workflow.{WorkflowInterface, WorkflowMethod} + +/** + * The simplest possible Temporal workflow: greet a user. + * + * Temporal requires workflow and activity interfaces to be annotated with @WorkflowInterface / + * @ActivityInterface. + * Implementations are registered separately with the worker. + */ +@WorkflowInterface +trait HelloWorkflow: + @WorkflowMethod + def sayHello(name: String): String + +@ActivityInterface +trait GreetingActivities: + @ActivityMethod + def composeGreeting(greeting: String, name: String): String diff --git a/uni-temporal-example/src/main/scala/wvlet/uni/temporal/example/HelloWorkflowImpl.scala b/uni-temporal-example/src/main/scala/wvlet/uni/temporal/example/HelloWorkflowImpl.scala new file mode 100644 index 00000000..18819900 --- /dev/null +++ b/uni-temporal-example/src/main/scala/wvlet/uni/temporal/example/HelloWorkflowImpl.scala @@ -0,0 +1,33 @@ +package wvlet.uni.temporal.example + +import io.temporal.activity.ActivityOptions +import io.temporal.workflow.Workflow +import wvlet.uni.log.LogSupport + +import java.time.Duration + +/** + * Workflow implementation. + * + * Workflow code must be deterministic — no I/O, random numbers, or wall-clock time. All side + * effects go through activities. Temporal replays workflow history on restarts; non-determinism + * breaks replay. + */ +class HelloWorkflowImpl extends HelloWorkflow with LogSupport: + + // Activity stub: calls are scheduled as Temporal activities, not direct method calls. + private val activities: GreetingActivities = Workflow.newActivityStub( + classOf[GreetingActivities], + ActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofSeconds(10)).build() + ) + + override def sayHello(name: String): String = activities.composeGreeting("Hello", name) + +/** + * Activity implementation. Activities are the side-effectful units — they do real work and can be + * retried independently. + */ +class GreetingActivitiesImpl extends GreetingActivities with LogSupport: + override def composeGreeting(greeting: String, name: String): String = + logger.info(s"Composing greeting for ${name}") + s"${greeting}, ${name}!" diff --git a/uni-temporal-example/src/main/scala/wvlet/uni/temporal/example/ScalaDataConverter.scala b/uni-temporal-example/src/main/scala/wvlet/uni/temporal/example/ScalaDataConverter.scala new file mode 100644 index 00000000..f64110d4 --- /dev/null +++ b/uni-temporal-example/src/main/scala/wvlet/uni/temporal/example/ScalaDataConverter.scala @@ -0,0 +1,27 @@ +package wvlet.uni.temporal.example + +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.scala.DefaultScalaModule +import io.temporal.common.converter.{ + DefaultDataConverter, + JacksonJsonPayloadConverter, + PayloadConverter +} + +/** + * Custom DataConverter that registers Jackson's Scala module so Scala 3 case classes can be + * serialized/deserialized by Temporal. + * + * Without this, Jackson cannot construct Scala case class instances because they lack a no-arg + * constructor. The Scala module teaches Jackson about Scala product types, Option, collections, + * etc. + */ +object ScalaDataConverter: + private val objectMapper: ObjectMapper = + val mapper = ObjectMapper() + mapper.registerModule(DefaultScalaModule) + mapper + + val converter: DefaultDataConverter = DefaultDataConverter + .newDefaultInstance() + .withPayloadConverterOverrides(JacksonJsonPayloadConverter(objectMapper)) diff --git a/uni-temporal-example/src/main/scala/wvlet/uni/temporal/example/TemporalWorkerApp.scala b/uni-temporal-example/src/main/scala/wvlet/uni/temporal/example/TemporalWorkerApp.scala new file mode 100644 index 00000000..d0ac3344 --- /dev/null +++ b/uni-temporal-example/src/main/scala/wvlet/uni/temporal/example/TemporalWorkerApp.scala @@ -0,0 +1,97 @@ +package wvlet.uni.temporal.example + +import io.temporal.client.{WorkflowClient, WorkflowClientOptions, WorkflowOptions} +import io.temporal.serviceclient.WorkflowServiceStubs +import io.temporal.worker.WorkerFactory +import wvlet.uni.log.LogSupport + +import java.util.UUID + +/** Shared task-queue name used by all workers and clients in this example. */ +object TemporalExample: + val TaskQueue = "uni-example-queue" + +/** + * Starts a Temporal worker connected to a locally-running Temporal dev server (`temporal server + * start-dev`). + * + * This is an entry point for manual exploration — run the Temporal dev server first, then run this + * app alongside `TemporalClientApp` to see workflows execute end-to-end. + * + * {{{ + * # Terminal 1 + * temporal server start-dev + * # Terminal 2 + * ./sbt "temporalExample/run" # starts worker + * }}} + */ +object TemporalWorkerApp extends App with LogSupport: + + logger.info("Connecting to local Temporal dev server...") + + val service = WorkflowServiceStubs.newLocalServiceStubs() + val clientOptions = WorkflowClientOptions + .newBuilder() + .setDataConverter(ScalaDataConverter.converter) + .build() + + val client = WorkflowClient.newInstance(service, clientOptions) + val factory = WorkerFactory.newInstance(client) + + val worker = factory.newWorker(TemporalExample.TaskQueue) + + // Register workflow and activity implementations + worker.registerWorkflowImplementationTypes(classOf[HelloWorkflowImpl]) + worker.registerWorkflowImplementationTypes(classOf[DataPipelineWorkflowImpl]) + worker.registerActivitiesImplementations(GreetingActivitiesImpl()) + worker.registerActivitiesImplementations(DataActivitiesImpl()) + + factory.start() + logger.info(s"Worker started on task queue '${TemporalExample.TaskQueue}'. Ctrl+C to stop.") + +/** + * Submits example workflows to the running worker. + * + * Run after `TemporalWorkerApp` is running. + */ +object TemporalClientApp extends App with LogSupport: + + val service = WorkflowServiceStubs.newLocalServiceStubs() + val clientOptions = WorkflowClientOptions + .newBuilder() + .setDataConverter(ScalaDataConverter.converter) + .build() + + val client = WorkflowClient.newInstance(service, clientOptions) + + // --- Hello workflow --- + val helloStub = client.newWorkflowStub( + classOf[HelloWorkflow], + WorkflowOptions + .newBuilder() + .setTaskQueue(TemporalExample.TaskQueue) + .setWorkflowId(s"hello-${UUID.randomUUID()}") + .build() + ) + + val greeting = helloStub.sayHello("Temporal") + logger.info(s"HelloWorkflow result: ${greeting}") + + // --- Data pipeline workflow --- + val pipelineStub = client.newWorkflowStub( + classOf[DataPipelineWorkflow], + WorkflowOptions + .newBuilder() + .setTaskQueue(TemporalExample.TaskQueue) + .setWorkflowId(s"pipeline-${UUID.randomUUID()}") + .build() + ) + + val result = pipelineStub.process(PipelineInput("sensor-42", "raw,data,stream")) + logger.info( + s"DataPipeline result: processed=${result.processedData}, records=${result.recordCount}" + ) + + service.shutdownNow() + +end TemporalClientApp diff --git a/uni-temporal-example/src/test/scala/wvlet/uni/temporal/example/DataPipelineWorkflowTest.scala b/uni-temporal-example/src/test/scala/wvlet/uni/temporal/example/DataPipelineWorkflowTest.scala new file mode 100644 index 00000000..42f1f7d9 --- /dev/null +++ b/uni-temporal-example/src/test/scala/wvlet/uni/temporal/example/DataPipelineWorkflowTest.scala @@ -0,0 +1,63 @@ +package wvlet.uni.temporal.example + +import io.temporal.client.WorkflowClientOptions +import io.temporal.client.WorkflowOptions +import io.temporal.testing.TestWorkflowEnvironment +import io.temporal.testing.TestEnvironmentOptions +import wvlet.uni.test.UniTest + +/** + * Tests for DataPipelineWorkflow including retry behaviour. + * + * Each test creates its own TestWorkflowEnvironment so state (e.g. attempt counters in + * DataActivitiesImpl) is isolated between test runs. + */ +class DataPipelineWorkflowTest extends UniTest: + + private def withPipelineStub(testCode: DataPipelineWorkflow => Unit): Unit = + val testEnv = TestWorkflowEnvironment.newInstance( + TestEnvironmentOptions + .newBuilder() + .setWorkflowClientOptions( + WorkflowClientOptions.newBuilder().setDataConverter(ScalaDataConverter.converter).build() + ) + .build() + ) + try + val worker = testEnv.newWorker(TemporalExample.TaskQueue) + worker.registerWorkflowImplementationTypes(classOf[DataPipelineWorkflowImpl]) + worker.registerActivitiesImplementations(DataActivitiesImpl()) + testEnv.start() + + val stub = testEnv + .getWorkflowClient + .newWorkflowStub( + classOf[DataPipelineWorkflow], + WorkflowOptions.newBuilder().setTaskQueue(TemporalExample.TaskQueue).build() + ) + testCode(stub) + finally + testEnv.close() + + test("DataPipelineWorkflow completes after transient fetch failure") { + withPipelineStub { stub => + val result = stub.process(PipelineInput("sensor-42", "raw,data,stream")) + + // fetchData fails once, retries, then the pipeline completes + result.sourceId shouldBe "sensor-42" + // transformData uppercases and replaces "-" with "_" + result.processedData shouldBe "RAW_DATA_FOR_SENSOR_42" + // storeData returns processedData.length + result.recordCount shouldBe "RAW_DATA_FOR_SENSOR_42".length + } + } + + test("PipelineResult captures sourceId") { + withPipelineStub { stub => + val result = stub.process(PipelineInput("device-99", "another,raw,payload")) + result.sourceId shouldBe "device-99" + (result.recordCount > 0) shouldBe true + } + } + +end DataPipelineWorkflowTest diff --git a/uni-temporal-example/src/test/scala/wvlet/uni/temporal/example/HelloWorkflowTest.scala b/uni-temporal-example/src/test/scala/wvlet/uni/temporal/example/HelloWorkflowTest.scala new file mode 100644 index 00000000..ba1cf7c0 --- /dev/null +++ b/uni-temporal-example/src/test/scala/wvlet/uni/temporal/example/HelloWorkflowTest.scala @@ -0,0 +1,47 @@ +package wvlet.uni.temporal.example + +import io.temporal.testing.TestWorkflowEnvironment +import io.temporal.client.WorkflowOptions +import wvlet.uni.test.UniTest + +/** + * Tests for HelloWorkflow using Temporal's in-process test environment. + * + * TestWorkflowEnvironment runs a full Temporal server in memory — no external process needed. This + * allows testing workflow + activity integration without a real Temporal cluster. + */ +class HelloWorkflowTest extends UniTest: + + private def withHelloEnv(testCode: TestWorkflowEnvironment => Unit): Unit = + val testEnv = TestWorkflowEnvironment.newInstance() + try + val worker = testEnv.newWorker(TemporalExample.TaskQueue) + worker.registerWorkflowImplementationTypes(classOf[HelloWorkflowImpl]) + worker.registerActivitiesImplementations(GreetingActivitiesImpl()) + testEnv.start() + testCode(testEnv) + finally + testEnv.close() + + private def newHelloStub(testEnv: TestWorkflowEnvironment): HelloWorkflow = testEnv + .getWorkflowClient + .newWorkflowStub( + classOf[HelloWorkflow], + WorkflowOptions.newBuilder().setTaskQueue(TemporalExample.TaskQueue).build() + ) + + test("HelloWorkflow returns greeting from activity") { + withHelloEnv { testEnv => + newHelloStub(testEnv).sayHello("World") shouldBe "Hello, World!" + } + } + + test("HelloWorkflow greets different names") { + withHelloEnv { testEnv => + // Each stub is bound to a single workflow execution, so a new stub is needed per call + for name <- Seq("Alice", "Bob", "Temporal") do + newHelloStub(testEnv).sayHello(name) shouldBe s"Hello, ${name}!" + } + } + +end HelloWorkflowTest