Skip to content

[bugfix] ensure the generated data can be returned in time when in multi flux which have dependency#45

Merged
tomsun28 merged 1 commit intoa2ap:mainfrom
flycloud-hz:fix-multi-flux-async
Oct 13, 2025
Merged

[bugfix] ensure the generated data can be returned in time when in multi flux which have dependency#45
tomsun28 merged 1 commit intoa2ap:mainfrom
flycloud-hz:fix-multi-flux-async

Conversation

@flycloud-hz
Copy link
Copy Markdown
Contributor

@flycloud-hz flycloud-hz commented Sep 10, 2025

When dealing with multi-level Flux connections with dependencies, concurrent execution is required to ensure the generated data can be returned as quickly as possible

What's changed?

@PostMapping(value = "/a2a/server4", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<JSONRPCResponse> dispatchStream4(JSONRPCRequest request) {
    Sinks.Many<JSONRPCResponse> sink2 = Sinks.many().multicast().onBackpressureBuffer();
    Mono<Void> mono = Mono.fromRunnable(() -> {
        log.info("xxx fromRunnable mono-1");
        sink2.tryEmitNext(createResponse("mono-1"));
    }).then(Mono.delay(Duration.ofMillis(1000))).then(Mono.fromRunnable(() -> {
        log.info("xxx fromRunnable mono-2");
        sink2.tryEmitNext(createResponse("mono-2"));
    })).then(Mono.delay(Duration.ofMillis(1000))).then(Mono.fromRunnable(() -> {
        log.info("xxx fromRunnable mono-3");
        sink2.tryEmitNext(createResponse("mono-3"));
    })).then(Mono.delay(Duration.ofMillis(1000))).then(Mono.fromRunnable(() -> {
        log.info("xxx fromRunnable mono complete");
        sink2.tryEmitComplete();
    })).then();
    Flux<JSONRPCResponse> flux2 = sink2.asFlux().timeout(Duration.ofSeconds(10));

    // option-wrong:
    //        return mono.thenMany(flux2)
    //                .doOnNext(event -> log.info("xxx doOnNext {}", event))
    //                .map(event -> {
    //                    log.info("xxx map {}", event);
    //                    return event;
    //                });

    // option-right
    return Flux.merge(mono.thenMany(Mono.empty()), flux2).doOnNext(event -> log.info("xxx doOnNext {}", event))
            .map(event -> {
                log.info("xxx map {}", event);
                return event;
            });
}

The above code describes the returned two different results about the multiple flux which have dependency.

  1. in "option-wrong" way
image

The data is returned in one time almost.

  1. in "option-right" way
image

The data is returned in time.

The data which is emitted by sink should be returned as quickly as possible, so use flux.merge instead of thenMany.

Checklist

  • I have read the Contributing Guide
  • I have written the necessary doc or comment.
  • I have added the necessary unit tests and all cases have passed.

Any Else Note

When dealing with multi-level Flux connections with dependencies, concurrent execution is required to ensure the generated data can be returned as quickly as possible
@flycloud-hz flycloud-hz changed the title [bugfix] ensure the generated data can be returned in time when in multi flux have dependency [bugfix] ensure the generated data can be returned in time when in multi flux which have dependency Sep 10, 2025
@flycloud-hz
Copy link
Copy Markdown
Contributor Author

@tomsun28 Sorry to bother you, but could you please help review this?

Copy link
Copy Markdown
Member

@tomsun28 tomsun28 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@flycloud-hz Sorry for slow response, LGTM 👍

@tomsun28 tomsun28 merged commit 6ca4df2 into a2ap:main Oct 13, 2025
1 check passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants