@@ -462,6 +462,8 @@ private record ReactorExchangeResponseFunction(
462462 @ Nullable ReactiveAdapter returnTypeAdapter ,
463463 boolean blockForOptional , @ Nullable Duration blockTimeout ) implements ResponseFunction {
464464
465+ private static final String COROUTINES_FLOW_CLASS_NAME = "kotlinx.coroutines.flow.Flow" ;
466+
465467 @ Override
466468 public @ Nullable Object execute (HttpRequestValues requestValues ) {
467469
@@ -491,14 +493,16 @@ public static ResponseFunction create(ReactorHttpExchangeAdapter client, Method
491493 MethodParameter returnParam = new MethodParameter (method , -1 );
492494 Class <?> returnType = returnParam .getParameterType ();
493495 boolean isSuspending = KotlinDetector .isSuspendingFunction (method );
496+ boolean hasFlowReturnType = COROUTINES_FLOW_CLASS_NAME .equals (returnType .getName ());
497+ boolean isUnwrapped = isSuspending && !hasFlowReturnType ;
494498 if (isSuspending ) {
495- returnType = Mono .class ;
499+ returnType = ( hasFlowReturnType ? Flux . class : Mono .class ) ;
496500 }
497501
498502 ReactiveAdapter reactiveAdapter = client .getReactiveAdapterRegistry ().getAdapter (returnType );
499503
500504 MethodParameter actualParam = (reactiveAdapter != null ? returnParam .nested () : returnParam .nestedIfOptional ());
501- Class <?> actualType = isSuspending ? actualParam .getParameterType () : actualParam .getNestedParameterType ();
505+ Class <?> actualType = isUnwrapped ? actualParam .getParameterType () : actualParam .getNestedParameterType ();
502506
503507 Function <HttpRequestValues , Publisher <?>> responseFunction ;
504508 if (ClassUtils .isVoidType (actualType )) {
@@ -511,18 +515,18 @@ else if (actualType.equals(HttpHeaders.class)) {
511515 responseFunction = client ::exchangeForHeadersMono ;
512516 }
513517 else if (actualType .equals (ResponseEntity .class )) {
514- MethodParameter bodyParam = isSuspending ? actualParam : actualParam .nested ();
518+ MethodParameter bodyParam = isUnwrapped ? actualParam : actualParam .nested ();
515519 Class <?> bodyType = bodyParam .getNestedParameterType ();
516520 if (bodyType .equals (Void .class )) {
517521 responseFunction = client ::exchangeForBodilessEntityMono ;
518522 }
519523 else {
520524 ReactiveAdapter bodyAdapter = client .getReactiveAdapterRegistry ().getAdapter (bodyType );
521- responseFunction = initResponseEntityFunction (client , bodyParam , bodyAdapter , isSuspending );
525+ responseFunction = initResponseEntityFunction (client , bodyParam , bodyAdapter , isUnwrapped );
522526 }
523527 }
524528 else {
525- responseFunction = initBodyFunction (client , actualParam , reactiveAdapter , isSuspending );
529+ responseFunction = initBodyFunction (client , actualParam , reactiveAdapter , isUnwrapped );
526530 }
527531
528532 return new ReactorExchangeResponseFunction (
@@ -532,7 +536,7 @@ else if (actualType.equals(ResponseEntity.class)) {
532536 @ SuppressWarnings ("ConstantConditions" )
533537 private static Function <HttpRequestValues , Publisher <?>> initResponseEntityFunction (
534538 ReactorHttpExchangeAdapter client , MethodParameter methodParam ,
535- @ Nullable ReactiveAdapter reactiveAdapter , boolean isSuspending ) {
539+ @ Nullable ReactiveAdapter reactiveAdapter , boolean isUnwrapped ) {
536540
537541 if (reactiveAdapter == null ) {
538542 return request -> client .exchangeForEntityMono (
@@ -543,7 +547,7 @@ private static Function<HttpRequestValues, Publisher<?>> initResponseEntityFunct
543547 "ResponseEntity body must be a concrete value or a multi-value Publisher" );
544548
545549 ParameterizedTypeReference <?> bodyType =
546- ParameterizedTypeReference .forType (isSuspending ? methodParam .nested ().getGenericParameterType () :
550+ ParameterizedTypeReference .forType (isUnwrapped ? methodParam .nested ().getGenericParameterType () :
547551 methodParam .nested ().getNestedGenericParameterType ());
548552
549553 // Shortcut for Flux
0 commit comments