Skip to content

Add collect, flatMap and mapMulti methods to MessageStream API#4627

Open
hjohn wants to merge 2 commits into
mainfrom
feature/ms-flatmap
Open

Add collect, flatMap and mapMulti methods to MessageStream API#4627
hjohn wants to merge 2 commits into
mainfrom
feature/ms-flatmap

Conversation

@hjohn

@hjohn hjohn commented Jun 3, 2026

Copy link
Copy Markdown
Contributor

The addition of these methods further complete the MessageStream API and can avoid some of awkward conactWith / reduce situations.

This PR also contains a 2nd commit that fixes a merge error; FetchResult.Value no longer needs to accept null values now that IgnoreEntriesMessageStream no longer produces null elements (it just skips them internally which was fixed in a previous PR)

@hjohn hjohn requested a review from a team as a code owner June 3, 2026 06:56
@hjohn hjohn requested review from MateuszNaKodach, hatzlj and jangalinski and removed request for a team June 3, 2026 06:56
@hjohn hjohn self-assigned this Jun 3, 2026
@hjohn hjohn added the Priority 2: Should High priority. Ideally, these issues are part of the release they’re assigned to. label Jun 3, 2026
@hjohn hjohn added this to the Release 5.2.0 milestone Jun 3, 2026
@hjohn hjohn force-pushed the feature/ms-flatmap branch from ff65cd5 to d8dac04 Compare June 3, 2026 07:44
* {@code N}, cannot be {@code null}
* @return a stream that is the concatenation of all inner streams produced by {@code mapper}
*/
default <N extends Message> MessageStream<N> flatMap(

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@SInCE tag needed on MessageStream.flatMap, mapMulti, and collect?

* @see #reduce(Object, BiFunction)
*/
default <C> CompletableFuture<C> collect(Supplier<C> containerSupplier, BiConsumer<? super C, M> accumulator) {
return reduce(containerSupplier.get(), (container, entry) -> {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

collect doesn't Objects.requireNonNull its containerSupplier/accumulator (unlike flatMap/mapMulti)

}

@Override
protected FetchResult<Entry<N>> fetchNext() {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fetchNext isn't synchronized here, but it is in FlatMappedMessageStream and ConcatenatingMessageStream. This isn't a bug: fetchNext is only ever called from AbstractMessageStream.next(), which is synchronized, so by the time we get here the this lock is already held and buffer is safe.

The only thing that bugs me is that the safety depends on a caller in the base class, which isn't visible when you're reading this file, and the three sibling classes don't agree on whether to mark it. Could you either add synchronized to match the others, or leave a one line note saying it relies on next() already holding the lock? I think this will help other readers

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we'd best make it consistent by removing synchronized there as well; the caller code will ensure it is synchronized (if it needs that), and it may not be locking on the object itself but using a normal local or another object to synchronize on. In that case adding an explicit synchronized on the methods would create a second lock.

I can add a line to fetchNext that the implementation doesn't need be thread-safe (but that is really the default) as abstract message stream deals with that.

* <p>
* This is the {@code MessageStream} equivalent of {@link java.util.stream.Stream#mapMulti(BiConsumer)}. Unlike
* {@link FlatMappedMessageStream}, the mapper is invoked synchronously: all output entries for a given input are
* pushed to the consumer during the same {@code fetchNext} call. This makes it more efficient for the common

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe one line javadoc hint (or on the buffer field) telling callers: if a single input can expand into a large or unbounded number of outputs, prefer flatMap, which streams them lazily, because mapMulti buffers all outputs for one input in memory at once. That way it is a little bit easier for a high volume caller to pick the right solution

* This method inspects the provided {@code delegate} in a non-blocking manner and translates its state into a
* corresponding {@code FetchResult}:
* <ul>
* <li>If {@link MessageStream#hasNextAvailable()} returns {@code true}, this method

@laura-devriendt-lemon laura-devriendt-lemon Jun 11, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is a doc line needed for this adding of these methods? A one-line note in docs/reference-guide/modules/release-notes/pages/minor-releases.adoc for 5.2.0 maybe?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if this is interesting enough to deserve a mention in the release notes

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

me neither just thought to flag so you could decide maybe together with steven or so

).get(10, TimeUnit.SECONDS);
}

@SuppressWarnings("unchecked")

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the cast and @SuppressWarnings here can go away I think if you make the helper generic, like private List<? extends Message> collect(MessageStream stream) and call collect(c.stream). Giving the type a name lets ArrayList::new and List::add match up on their own, so there's nothing to cast. Same two-overload shape that reduce already uses. Not a blocker, just reads cleaner.

@laura-devriendt-lemon laura-devriendt-lemon left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just some suggestions

@hjohn hjohn force-pushed the feature/ms-flatmap branch from d8dac04 to a98d75f Compare June 11, 2026 10:01
@sonarqubecloud

Copy link
Copy Markdown

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Priority 2: Should High priority. Ideally, these issues are part of the release they’re assigned to.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants