Skip to content

Add async/reactive variants for AuthProvider and HttpRequestDecorator SPIs #1510

Description

@mcruzdev

Summary

Both AuthProvider.content(...) and WorkflowDecorator.decorate(...) (parent of HttpRequestDecorator) have synchronous contracts:

  • AuthProvider.content() returns String
  • WorkflowDecorator.decorate() returns void

This forces any implementation that needs I/O (e.g., calling an OIDC token endpoint) to block the calling thread. While AuthProvider.content() currently runs inside CompletableFuture.supplyAsync on a worker thread (safe today), HttpRequestDecorator.decorate() runs on the calling thread before the async dispatch — on a Vert.x event-loop thread this throws VertxException: Thread blocked.

Problem

HttpRequestDecorator (critical)

In HttpExecutor.apply() (line 78), decorators run synchronously on the calling thread:

Builder request = target.request();
requestDecorators.forEach(d -> d.decorate(request, workflow, taskContext)); // <-- calling thread
headersMap.ifPresent(...);
return CompletableFuture.supplyAsync(
    () -> requestFunction.apply(request, uri, workflow, taskContext, input),
    workflow.definition().application().executorService());

If a decorator calls a reactive/async API (e.g., Quarkus OidcClient.getTokens() which returns Uni<Tokens>), it must block with .await().indefinitely(). On a Vert.x event-loop thread (@NonBlocking), this throws VertxException: Thread blocked.

AuthProvider (safe today, fragile long-term)

AuthProvider.content() runs inside AbstractRequestExecutor.apply() (line 52), which is called from CompletableFuture.supplyAsync on the executor service thread — so blocking I/O is safe as long as the executor provides worker threads. However:

  • The API contract (String content(...)) is inherently synchronous — there's no way for an implementation to signal it needs async execution
  • The worker-thread guarantee depends on the injected ExecutorService — if the executor were changed to run on event-loop threads, implementations would break silently

Current Execution Flow

HttpExecutor.apply() — CALLING THREAD (may be Vert.x event loop)
│
├─ 1. Build WebTarget (URI + query params)                    [line 72-76]
├─ 2. Create Invocation.Builder                               [line 77]
├─ 3. Run HttpRequestDecorators (SYNC, on calling thread)     [line 78]  ← PROBLEM
├─ 4. Apply task headers                                      [line 79]
│
└─ 5. CompletableFuture.supplyAsync(...) — WORKER THREAD
       │
       └─ AbstractRequestExecutor.apply()
          ├─ 6a. AuthProvider.content() (SYNC, on worker)     [line 52]  ← safe today
          └─ 6b. invokeRequest()                              [line 53]

Proposal

Add default async methods to both interfaces, with the synchronous methods as fallback. This is fully backward compatible — existing implementations continue to work unchanged.

AuthProvider — add contentAsync

// impl/core/src/main/java/io/serverlessworkflow/impl/auth/AuthProvider.java
public interface AuthProvider {

  String scheme();

  String content(WorkflowContext workflow, TaskContext task, WorkflowModel model, URI uri);

  /**
   * Async variant of {@link #content}. Override this method when the token
   * acquisition involves I/O (e.g., calling an OIDC token endpoint).
   * <p>Default implementation delegates to the synchronous {@link #content} method.
   */
  default CompletableFuture<String> contentAsync(
      WorkflowContext workflow, TaskContext task, WorkflowModel model, URI uri) {
    return CompletableFuture.completedFuture(content(workflow, task, model, uri));
  }
}

WorkflowDecorator — add decorateAsync

// impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDecorator.java
public interface WorkflowDecorator<T> extends ServicePriority {

  void decorate(T decorated, WorkflowContext workflowContext, TaskContext taskContext);

  /**
   * Async variant of {@link #decorate}. Override this method when the
   * decoration involves I/O (e.g., fetching a token from an external provider).
   * <p>Default implementation delegates to the synchronous {@link #decorate} method.
   */
  default CompletableFuture<Void> decorateAsync(
      T decorated, WorkflowContext workflowContext, TaskContext taskContext) {
    decorate(decorated, workflowContext, taskContext);
    return CompletableFuture.completedFuture(null);
  }
}

Callsite Changes

HttpExecutor.apply() — chain decorators as async

// Before (line 78, sync, blocks calling thread):
requestDecorators.forEach(d -> d.decorate(request, workflow, taskContext));
return CompletableFuture.supplyAsync(
    () -> requestFunction.apply(request, uri, workflow, taskContext, input),
    workflow.definition().application().executorService());

// After (async, safe on any thread):
CompletableFuture<Void> decorateChain = CompletableFuture.completedFuture(null);
for (HttpRequestDecorator d : requestDecorators) {
    decorateChain = decorateChain.thenCompose(
        v -> d.decorateAsync(request, workflow, taskContext));
}
return decorateChain.thenComposeAsync(
    v -> {
        headersMap.ifPresent(h -> h.apply(workflow, taskContext, input).forEach(request::header));
        return CompletableFuture.supplyAsync(
            () -> requestFunction.apply(request, uri, workflow, taskContext, input),
            workflow.definition().application().executorService());
    },
    workflow.definition().application().executorService());

Note: The headers application (line 79) should also move inside the thenComposeAsync to maintain the correct ordering: decorators → headers → auth → request.

AbstractRequestExecutor.apply() — use contentAsync

// Before (line 52):
authProvider.ifPresent(auth -> addAuthHeader(auth, uri, request, workflow, task, model));

// After (already on worker thread, so .join() is safe):
authProvider.ifPresent(auth -> {
    String parameter = auth.contentAsync(workflow, task, model, uri).join();
    String scheme = auth.scheme();
    task.authorization(scheme, parameter);
    request.header(AuthUtils.AUTH_HEADER_NAME, AuthUtils.authHeaderValue(scheme, parameter));
});

Files to Modify

File Change
impl/core/.../auth/AuthProvider.java Add contentAsync() default method
impl/core/.../WorkflowDecorator.java Add decorateAsync() default method
impl/http/.../HttpExecutor.java Rewrite apply() to chain decorators via decorateAsync()
impl/http/.../AbstractRequestExecutor.java Use contentAsync().join() in addAuthHeader()

Backward Compatibility

  • Fully backward compatible: default methods delegate to the existing sync methods
  • Existing AuthProvider and HttpRequestDecorator implementations require no changes
  • New implementations can override contentAsync()/decorateAsync() to return truly async futures
  • The sync methods remain the primary implementation point for simple (non-I/O) cases
  • No new dependencies — CompletableFuture is in java.util.concurrent

Testing Plan

  1. Existing tests pass unchanged — default methods delegate to sync, so all current behavior is preserved

  2. New test: async decorator does not block calling thread

    • Create a decorator that returns a CompletableFuture completed on a different thread (simulating async I/O)
    • Run HttpExecutor.apply() from a thread that throws on blocking (simulating Vert.x event loop)
    • Assert the decorator completes without blocking
  3. New test: async AuthProvider returns token via future

    • Create an AuthProvider that overrides contentAsync() with a delayed future
    • Assert the token is correctly applied to the Authorization header
  4. New test: mixed sync and async decorators

    • Register both sync-only (only overrides decorate()) and async (overrides decorateAsync()) decorators
    • Assert both execute in priority order and all headers are applied

Context

This issue was identified during the Quarkus Flow OIDC integration (Phase 0 validation). Quarkus Flow uses Quarkus OidcClient which returns reactive types (Uni<Tokens>). The current workaround is to use AuthProviderFactory/AuthProvider (which runs on a worker thread via supplyAsync), avoiding HttpRequestDecorator entirely for auth concerns. This works but limits the decorator SPI's usefulness for any I/O-bound decoration.

Related issues

Metadata

Metadata

Assignees

Labels

No labels
No labels

Type

No type

Fields

No fields configured for issues without a type.

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions