From a30644fbdccce891b59215f5cecf3fceebc23592 Mon Sep 17 00:00:00 2001 From: guptapratykshh Date: Sat, 17 Jan 2026 15:31:39 +0530 Subject: [PATCH 1/4] fix: ensure Topic#publish1 returns accurate result during concurrent closure --- .../src/main/scala/fs2/concurrent/Topic.scala | 17 +++++++++++++++-- .../test/scala/fs2/concurrent/TopicSuite.scala | 2 +- 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/core/shared/src/main/scala/fs2/concurrent/Topic.scala b/core/shared/src/main/scala/fs2/concurrent/Topic.scala index b069bcfe6f..896bbd39f1 100644 --- a/core/shared/src/main/scala/fs2/concurrent/Topic.scala +++ b/core/shared/src/main/scala/fs2/concurrent/Topic.scala @@ -164,8 +164,21 @@ object Topic { case State.Closed() => Topic.closed.pure[F] case State.Active(subs, _) => - foreach(subs)(_.send(a).void) - .as(Topic.rightUnit) + subs.foldLeft(F.pure(Topic.rightUnit)) { case (acc, (_, chan)) => + acc.flatMap { + case Left(Topic.Closed) => Topic.closed.pure[F] + case Right(_) => + chan.send(a).flatMap { + case Right(_) => Topic.rightUnit.pure[F] + case Left(_) => + // Channel send failed, check if topic was closed + state.get.map { + case State.Closed() => Topic.closed + case State.Active(_, _) => Topic.rightUnit + } + } + } + } } def subscribeAwait(maxQueued: Int): Resource[F, Stream[F, A]] = diff --git a/core/shared/src/test/scala/fs2/concurrent/TopicSuite.scala b/core/shared/src/test/scala/fs2/concurrent/TopicSuite.scala index f2d889b92d..c037c32277 100644 --- a/core/shared/src/test/scala/fs2/concurrent/TopicSuite.scala +++ b/core/shared/src/test/scala/fs2/concurrent/TopicSuite.scala @@ -218,7 +218,7 @@ class TopicSuite extends Fs2Suite { // https://github.com/typelevel/fs2/issues/3644 test( - "when publish1 returns success, subscribers must receive the event, even if the publish1 races with close".fail + "when publish1 returns success, subscribers must receive the event, even if the publish1 races with close" ) { val check: IO[Unit] = Topic[IO, String] From 29e4b62871b35cdf356ca536386743e2df1cb9e0 Mon Sep 17 00:00:00 2001 From: guptapratykshh Date: Mon, 2 Feb 2026 09:34:43 +0530 Subject: [PATCH 2/4] Fix race condition between Topic.publish1 and Topic.close --- .../src/main/scala/fs2/concurrent/Topic.scala | 76 ++++++++++++------- 1 file changed, 49 insertions(+), 27 deletions(-) diff --git a/core/shared/src/main/scala/fs2/concurrent/Topic.scala b/core/shared/src/main/scala/fs2/concurrent/Topic.scala index 896bbd39f1..7e2c5ecf5e 100644 --- a/core/shared/src/main/scala/fs2/concurrent/Topic.scala +++ b/core/shared/src/main/scala/fs2/concurrent/Topic.scala @@ -152,33 +152,45 @@ object Topic { ( F.ref(State.initial[F, A]), SignallingRef[F, Int](0), + F.deferred[Unit], F.deferred[Unit] - ).mapN { case (state, subscriberCount, signalClosure) => + ).mapN { case (state, subscriberCount, signalClosure, publishersFinished) => new Topic[F, A] { def foreach[B](lm: LongMap[B])(f: B => F[Unit]) = - lm.foldLeft(F.unit) { case (op, (_, b)) => op >> f(b) } + lm.foldLeft(F.unit) { case (op, (_, b)) => f(b) >> op } def publish1(a: A): F[Either[Topic.Closed, Unit]] = - state.get.flatMap { - case State.Closed() => - Topic.closed.pure[F] - case State.Active(subs, _) => - subs.foldLeft(F.pure(Topic.rightUnit)) { case (acc, (_, chan)) => - acc.flatMap { - case Left(Topic.Closed) => Topic.closed.pure[F] - case Right(_) => - chan.send(a).flatMap { - case Right(_) => Topic.rightUnit.pure[F] - case Left(_) => - // Channel send failed, check if topic was closed - state.get.map { - case State.Closed() => Topic.closed - case State.Active(_, _) => Topic.rightUnit - } + state.flatModify { + case s @ State.Active(subs, _, n, false) => + val inc = n + 1 + val newState = s.copy(publishing = inc) + + val sends = subs.foldLeft(F.pure(true)) { case (acc, (_, chan)) => + chan.send(a).map(_.isRight).map2(acc)(_ && _) + } + + val action = sends.flatMap { allSucceeded => + state.flatModify { + case s @ State.Active(subs, _, n, closing) => + val dec = n - 1 + if (dec == 0 && closing) { + val closeAction = foreach(subs)(_.close.void) + (State.Closed(), closeAction >> publishersFinished.complete(()).void) + } else { + (s.copy(publishing = dec), F.unit) } + case s @ State.Closed() => (s, F.unit) + }.map { _ => + if (allSucceeded) Topic.rightUnit else Topic.closed } } + (newState, action) + + case s @ State.Active(_, _, _, true) => + (s, Topic.closed.pure[F]) + case s @ State.Closed() => + (s, Topic.closed.pure[F]) } def subscribeAwait(maxQueued: Int): Resource[F, Stream[F, A]] = @@ -194,18 +206,20 @@ object Topic { def subscribeAwaitImpl(chan: Channel[F, A]): Resource[F, Stream[F, A]] = { val subscribe: F[Option[Long]] = state.flatModify { - case State.Active(subs, nextId) => - val newState = State.Active(subs.updated(nextId, chan), nextId + 1) + case s @ State.Active(subs, nextId, _, false) => + val newState = s.copy(subscribers = subs.updated(nextId, chan), nextId = nextId + 1) val action = subscriberCount.update(_ + 1) val result = Some(nextId) newState -> action.as(result) + case s @ State.Active(_, _, _, true) => + s -> F.pure(None) case closed @ State.Closed() => closed -> F.pure(None) } def unsubscribe(id: Long): F[Unit] = state.flatModify { - case State.Active(subs, nextId) => + case s @ State.Active(subs, nextId, _, _) => // _After_ we remove the bounded channel for this // subscriber, we need to drain it to unblock to // publish loop which might have already enqueued @@ -215,7 +229,7 @@ object Topic { chan.close >> chan.stream.compile.drain } - State.Active(subs - id, nextId) -> (drainChannel *> subscriberCount.update(_ - 1)) + s.copy(subscribers = subs - id) -> (drainChannel *> subscriberCount.update(_ - 1)) case closed @ State.Closed() => closed -> F.unit @@ -249,9 +263,15 @@ object Topic { def close: F[Either[Topic.Closed, Unit]] = state.flatModify { - case State.Active(subs, _) => - val action = foreach(subs)(_.close.void) *> signalClosure.complete(()) - (State.Closed(), action.as(Topic.rightUnit)) + case s @ State.Active(subs, _, n, false) => + if (n == 0) { + val action = foreach(subs)(_.close.void) *> signalClosure.complete(()) + (State.Closed(), (action >> publishersFinished.complete(())).as(Topic.rightUnit)) + } else { + (s.copy(closing = true), publishersFinished.get.as(Topic.rightUnit)) + } + case s @ State.Active(_, _, _, true) => + (s, publishersFinished.get.as(Topic.rightUnit)) case closed @ State.Closed() => (closed, Topic.closed.pure[F]) } @@ -266,13 +286,15 @@ object Topic { private object State { case class Active[F[_], A]( subscribers: LongMap[Channel[F, A]], - nextId: Long + nextId: Long, + publishing: Long, + closing: Boolean ) extends State[F, A] case class Closed[F[_], A]() extends State[F, A] def initial[F[_], A]: State[F, A] = - Active(LongMap.empty, 1L) + Active(LongMap.empty, 1L, 0L, false) } private final val closed: Either[Closed, Unit] = Left(Closed) From 20a44ae7b585bdf661f3ada5ffd03ab5dd8b8d14 Mon Sep 17 00:00:00 2001 From: guptapratykshh Date: Mon, 2 Feb 2026 09:53:16 +0530 Subject: [PATCH 3/4] Apply scalafmt --- .../src/main/scala/fs2/concurrent/Topic.scala | 28 ++++++++++--------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/core/shared/src/main/scala/fs2/concurrent/Topic.scala b/core/shared/src/main/scala/fs2/concurrent/Topic.scala index 7e2c5ecf5e..3a4d84c548 100644 --- a/core/shared/src/main/scala/fs2/concurrent/Topic.scala +++ b/core/shared/src/main/scala/fs2/concurrent/Topic.scala @@ -171,19 +171,21 @@ object Topic { } val action = sends.flatMap { allSucceeded => - state.flatModify { - case s @ State.Active(subs, _, n, closing) => - val dec = n - 1 - if (dec == 0 && closing) { - val closeAction = foreach(subs)(_.close.void) - (State.Closed(), closeAction >> publishersFinished.complete(()).void) - } else { - (s.copy(publishing = dec), F.unit) - } - case s @ State.Closed() => (s, F.unit) - }.map { _ => - if (allSucceeded) Topic.rightUnit else Topic.closed - } + state + .flatModify { + case s @ State.Active(subs, _, n, closing) => + val dec = n - 1 + if (dec == 0 && closing) { + val closeAction = foreach(subs)(_.close.void) + (State.Closed(), closeAction >> publishersFinished.complete(()).void) + } else { + (s.copy(publishing = dec), F.unit) + } + case s @ State.Closed() => (s, F.unit) + } + .map { _ => + if (allSucceeded) Topic.rightUnit else Topic.closed + } } (newState, action) From eb774c0b7b71d5718662d84108a581afd2603a74 Mon Sep 17 00:00:00 2001 From: guptapratykshh Date: Mon, 2 Feb 2026 10:09:38 +0530 Subject: [PATCH 4/4] Fix unused variable warning in Scala 2.12 --- core/shared/src/main/scala/fs2/concurrent/Topic.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/shared/src/main/scala/fs2/concurrent/Topic.scala b/core/shared/src/main/scala/fs2/concurrent/Topic.scala index 3a4d84c548..0b17437adb 100644 --- a/core/shared/src/main/scala/fs2/concurrent/Topic.scala +++ b/core/shared/src/main/scala/fs2/concurrent/Topic.scala @@ -221,7 +221,7 @@ object Topic { def unsubscribe(id: Long): F[Unit] = state.flatModify { - case s @ State.Active(subs, nextId, _, _) => + case s @ State.Active(subs, _, _, _) => // _After_ we remove the bounded channel for this // subscriber, we need to drain it to unblock to // publish loop which might have already enqueued