From e664d0ef0d8434938bccdc88e21a79eb6aef3ec4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Bul?= Date: Mon, 30 Mar 2026 14:58:35 +0200 Subject: [PATCH 1/6] - fix error reporting for Schedulers backed by ScheduledExecutorService - fix updating UncaughtExceptionReporter for Scheduler backed by ScheduledThreadPoolExecutor - fix updating UncaughtExceptionReporter for WrappedSchedulers - fix error reporting for wrapped schedulers --- ... AdaptedScheduledThreadPoolExecutor.scala} | 15 +- .../schedulers/ExecutorScheduler.scala | 137 +++++++++++------- .../schedulers/SchedulerCompanionImpl.scala | 30 ++-- .../monix/execution/FeaturesJVMSuite.scala | 21 ++- .../schedulers/ExecutorSchedulerSuite.scala | 70 +++++++-- .../ScheduledExecutorToSchedulerSuite.scala | 2 +- .../schedulers/ReferenceScheduler.scala | 8 +- .../UncaughtExceptionReporterSuite.scala | 38 ++++- 8 files changed, 226 insertions(+), 95 deletions(-) rename monix-execution/jvm/src/main/scala/monix/execution/schedulers/{AdaptedThreadPoolExecutor.scala => AdaptedScheduledThreadPoolExecutor.scala} (73%) diff --git a/monix-execution/jvm/src/main/scala/monix/execution/schedulers/AdaptedThreadPoolExecutor.scala b/monix-execution/jvm/src/main/scala/monix/execution/schedulers/AdaptedScheduledThreadPoolExecutor.scala similarity index 73% rename from monix-execution/jvm/src/main/scala/monix/execution/schedulers/AdaptedThreadPoolExecutor.scala rename to monix-execution/jvm/src/main/scala/monix/execution/schedulers/AdaptedScheduledThreadPoolExecutor.scala index 7bde71c5a..57f7cdffc 100644 --- a/monix-execution/jvm/src/main/scala/monix/execution/schedulers/AdaptedThreadPoolExecutor.scala +++ b/monix-execution/jvm/src/main/scala/monix/execution/schedulers/AdaptedScheduledThreadPoolExecutor.scala @@ -17,14 +17,18 @@ package monix.execution.schedulers +import monix.execution.UncaughtExceptionReporter + import java.util.concurrent._ /** A mixin for adapting for the Java `ThreadPoolExecutor` implementation * to report errors using the default thread exception handler. */ -private[schedulers] abstract class AdaptedThreadPoolExecutor(corePoolSize: Int, factory: ThreadFactory) - extends ScheduledThreadPoolExecutor(corePoolSize, factory) { - def reportFailure(t: Throwable): Unit +private[schedulers] final class AdaptedScheduledThreadPoolExecutor( + corePoolSize: Int, + factory: ThreadFactory, + reporter: UncaughtExceptionReporter, +) extends ScheduledThreadPoolExecutor(corePoolSize, factory) { override def afterExecute(r: Runnable, t: Throwable): Unit = { super.afterExecute(r, t) @@ -45,6 +49,9 @@ private[schedulers] abstract class AdaptedThreadPoolExecutor(corePoolSize: Int, } } - if (exception ne null) reportFailure(exception) + if (exception ne null) reporter.reportFailure(exception) } + + def withUncaughtExceptionReporter(reporter: UncaughtExceptionReporter): AdaptedScheduledThreadPoolExecutor = + new AdaptedScheduledThreadPoolExecutor(corePoolSize, factory, reporter) } diff --git a/monix-execution/jvm/src/main/scala/monix/execution/schedulers/ExecutorScheduler.scala b/monix-execution/jvm/src/main/scala/monix/execution/schedulers/ExecutorScheduler.scala index 51ea65143..dc23e2573 100644 --- a/monix-execution/jvm/src/main/scala/monix/execution/schedulers/ExecutorScheduler.scala +++ b/monix-execution/jvm/src/main/scala/monix/execution/schedulers/ExecutorScheduler.scala @@ -17,15 +17,16 @@ package monix.execution.schedulers -import java.util.concurrent.{ExecutorService, ForkJoinPool, ScheduledExecutorService} import monix.execution.internal.forkJoin.{AdaptedForkJoinPool, DynamicWorkerThreadFactory, StandardWorkerThreadFactory} import monix.execution.internal.{InterceptRunnable, Platform, ScheduledExecutors} -import monix.execution.{Cancelable, UncaughtExceptionReporter} -import monix.execution.{Features, Scheduler} +import monix.execution.{Cancelable, Features, Scheduler, UncaughtExceptionReporter} + +import java.util.concurrent.{ExecutorService, ScheduledExecutorService} // Prevents conflict with the deprecated symbol import monix.execution.{ExecutionModel => ExecModel} -import scala.concurrent.{ExecutionContext, Future, Promise, blocking} + import scala.concurrent.duration.TimeUnit +import scala.concurrent.{ExecutionContext, Future, Promise, blocking} import scala.util.control.NonFatal /** An [[ExecutorScheduler]] is a class for building a @@ -54,9 +55,10 @@ abstract class ExecutorScheduler(e: ExecutorService, r: UncaughtExceptionReporte awaitOn.execute(new Runnable { override def run() = try blocking { - p.success(e.awaitTermination(timeout, unit)) - () - } catch { + p.success(e.awaitTermination(timeout, unit)) + () + } + catch { case ex if NonFatal(ex) => p.failure(ex); () } @@ -78,10 +80,9 @@ abstract class ExecutorScheduler(e: ExecutorService, r: UncaughtExceptionReporte } object ExecutorScheduler { - /** Builder for an [[ExecutorScheduler]], converting a - * Java `ScheduledExecutorService`. + /** Builder for an [[ExecutorScheduler]], converting a Java `ExecutorService`. * - * @param service is the Java `ScheduledExecutorService` that will take + * @param service is the Java `ExecutorService` that will take * care of scheduling and execution of all runnables. * @param reporter is the [[UncaughtExceptionReporter]] that logs uncaught exceptions. * @param executionModel is the preferred @@ -91,22 +92,58 @@ object ExecutorScheduler { * provided `ExecutorService` implements, see the documentation * for [[monix.execution.Scheduler.features Scheduler.features]] */ + @deprecated("Use ExecutorScheduler.fromExecutorService", "3.4.2-avs.6") def apply( service: ExecutorService, reporter: UncaughtExceptionReporter, executionModel: ExecModel, - features: Features): ExecutorScheduler = { - - // Implementations will inherit BatchingScheduler, so this is guaranteed - val ft = features + Scheduler.BATCHING + features: Features, + ): ExecutorScheduler = service match { - case ref: ScheduledExecutorService => - new FromScheduledExecutor(ref, reporter, executionModel, ft) - case _ => - val s = Defaults.scheduledExecutor - new FromSimpleExecutor(s, service, reporter, executionModel, ft) + case ref: AdaptedScheduledThreadPoolExecutor => scheduledThreadPool(ref, executionModel, features) + case _ => fromExecutorService(service, reporter, executionModel, features) } - } + + /** Builder for an [[ExecutorScheduler]], converting a Java `ExecutorService`. + * + * @param service is the Java `ExecutorService` that will take + * care of scheduling and execution of all runnables. + * @param reporter is the [[UncaughtExceptionReporter]] that logs uncaught exceptions. + * @param executionModel is the preferred + * [[monix.execution.ExecutionModel ExecutionModel]], a guideline + * for run-loops and producers of data. + * @param features is the set of [[Features]] that the + * provided `ExecutorService` implements, see the documentation + * for [[monix.execution.Scheduler.features Scheduler.features]] + */ + def fromExecutorService( + service: ExecutorService, + reporter: UncaughtExceptionReporter, + executionModel: ExecModel, + features: Features, + ): ExecutorScheduler = + new FromSimpleExecutor( + scheduler = Defaults.scheduledExecutor, + executor = service, + reporter = reporter, + executionModel = executionModel, + features = withBatching(features) + ) + + private[schedulers] def scheduledThreadPool( + service: AdaptedScheduledThreadPoolExecutor, + executionModel: ExecModel, + features: Features, + ): ExecutorScheduler = + new FromAdaptedThreadPoolExecutor( + executor = service, + executionModel = executionModel, + features = withBatching(features) + ) + + private def withBatching(features: Features): Features = + // Implementations will inherit BatchingScheduler, so this is guaranteed + features + Scheduler.BATCHING /** * DEPRECATED — provided for binary backwards compatibility. @@ -117,9 +154,10 @@ object ExecutorScheduler { def apply( service: ExecutorService, reporter: UncaughtExceptionReporter, - executionModel: ExecModel): ExecutorScheduler = { + executionModel: ExecModel + ): ExecutorScheduler = { // $COVERAGE-OFF$ - apply(service, reporter, executionModel, Features.empty) + fromExecutorService(service, reporter, executionModel, Features.empty) // $COVERAGE-ON$ } @@ -131,7 +169,8 @@ object ExecutorScheduler { parallelism: Int, daemonic: Boolean, reporter: UncaughtExceptionReporter, - executionModel: ExecModel): ExecutorScheduler = { + executionModel: ExecModel, + ): ExecutorScheduler = { val handler = reporter.asJava val pool = new AdaptedForkJoinPool( @@ -142,7 +181,7 @@ object ExecutorScheduler { asyncMode = true ) - apply(pool, reporter, executionModel, Features.empty) + fromExecutorService(pool, reporter, executionModel, Features.empty) } /** Creates an [[ExecutorScheduler]] backed by a `ForkJoinPool` @@ -154,7 +193,8 @@ object ExecutorScheduler { maxThreads: Int, daemonic: Boolean, reporter: UncaughtExceptionReporter, - executionModel: ExecModel): ExecutorScheduler = { + executionModel: ExecModel, + ): ExecutorScheduler = { val exceptionHandler = reporter.asJava val pool = new AdaptedForkJoinPool( @@ -165,7 +205,7 @@ object ExecutorScheduler { asyncMode = true ) - apply(pool, reporter, executionModel, Features.empty) + fromExecutorService(pool, reporter, executionModel, Features.empty) } /** Converts a Java `ExecutorService`. @@ -178,17 +218,18 @@ object ExecutorScheduler { private final class FromSimpleExecutor( scheduler: ScheduledExecutorService, executor: ExecutorService, - r: UncaughtExceptionReporter, + reporter: UncaughtExceptionReporter, override val executionModel: ExecModel, - override val features: Features) - extends ExecutorScheduler(executor, r) { + override val features: Features + ) extends ExecutorScheduler(executor, reporter) { @deprecated("Provided for backwards compatibility", "3.0.0") def this( scheduler: ScheduledExecutorService, executor: ExecutorService, r: UncaughtExceptionReporter, - executionModel: ExecModel) = { + executionModel: ExecModel, + ) = { // $COVERAGE-OFF$ this(scheduler, executor, r, executionModel, Features.empty) // $COVERAGE-ON$ @@ -198,53 +239,47 @@ object ExecutorScheduler { ScheduledExecutors.scheduleOnce(this, scheduler)(initialDelay, unit, r) override def withExecutionModel(em: ExecModel): SchedulerService = - new FromSimpleExecutor(scheduler, executor, r, em, features) + new FromSimpleExecutor(scheduler, executor, reporter, em, features) override def withUncaughtExceptionReporter(r: UncaughtExceptionReporter): SchedulerService = new FromSimpleExecutor(scheduler, executor, r, executionModel, features) } - /** Converts a Java `ScheduledExecutorService`. */ - private final class FromScheduledExecutor( - s: ScheduledExecutorService, - r: UncaughtExceptionReporter, + /** Implementation of ExecutorScheduler backed by Java `ScheduledExecutorService`. Assumes error reporting is done by + * the underlying `ScheduledExecutorService`. + * + * Currently intended for use only with AdaptedThreadPoolExecutor. + */ + private final class FromAdaptedThreadPoolExecutor( + executor: AdaptedScheduledThreadPoolExecutor, override val executionModel: ExecModel, - override val features: Features) - extends ExecutorScheduler(s, r) { - - @deprecated("Provided for backwards compatibility", "3.0.0") - def this(scheduler: ScheduledExecutorService, r: UncaughtExceptionReporter, executionModel: ExecModel) = { - // $COVERAGE-OFF$ - this(scheduler, r, executionModel, Features.empty) - // $COVERAGE-ON$ - } - - override def executor: ScheduledExecutorService = s + override val features: Features + ) extends ExecutorScheduler(executor, null) { def scheduleOnce(initialDelay: Long, unit: TimeUnit, r: Runnable): Cancelable = { if (initialDelay <= 0) { execute(r) Cancelable.empty } else { - val task = s.schedule(r, initialDelay, unit) + val task = executor.schedule(r, initialDelay, unit) Cancelable(() => { task.cancel(true); () }) } } override def scheduleWithFixedDelay(initialDelay: Long, delay: Long, unit: TimeUnit, r: Runnable): Cancelable = { - val task = s.scheduleWithFixedDelay(r, initialDelay, delay, unit) + val task = executor.scheduleWithFixedDelay(r, initialDelay, delay, unit) Cancelable(() => { task.cancel(false); () }) } override def scheduleAtFixedRate(initialDelay: Long, period: Long, unit: TimeUnit, r: Runnable): Cancelable = { - val task = s.scheduleAtFixedRate(r, initialDelay, period, unit) + val task = executor.scheduleAtFixedRate(r, initialDelay, period, unit) Cancelable(() => { task.cancel(false); () }) } override def withExecutionModel(em: ExecModel): SchedulerService = - new FromScheduledExecutor(s, r, em, features) + new FromAdaptedThreadPoolExecutor(executor, em, features) override def withUncaughtExceptionReporter(r: UncaughtExceptionReporter): SchedulerService = - new FromScheduledExecutor(s, r, executionModel, features) + new FromAdaptedThreadPoolExecutor(executor.withUncaughtExceptionReporter(r), executionModel, features) } } diff --git a/monix-execution/jvm/src/main/scala/monix/execution/schedulers/SchedulerCompanionImpl.scala b/monix-execution/jvm/src/main/scala/monix/execution/schedulers/SchedulerCompanionImpl.scala index 3cff896e5..05d9be11b 100644 --- a/monix-execution/jvm/src/main/scala/monix/execution/schedulers/SchedulerCompanionImpl.scala +++ b/monix-execution/jvm/src/main/scala/monix/execution/schedulers/SchedulerCompanionImpl.scala @@ -112,7 +112,7 @@ private[execution] class SchedulerCompanionImpl extends SchedulerCompanion { * @param reporter $reporter */ def apply(executor: ExecutorService, reporter: UncaughtExceptionReporter): SchedulerService = - ExecutorScheduler(executor, reporter, ExecModel.Default, Features.empty) + ExecutorScheduler.fromExecutorService(executor, reporter, ExecModel.Default, Features.empty) /** [[monix.execution.Scheduler Scheduler]] builder that converts a * Java `ExecutorService` into a scheduler. @@ -124,10 +124,8 @@ private[execution] class SchedulerCompanionImpl extends SchedulerCompanion { def apply( executor: ExecutorService, reporter: UncaughtExceptionReporter, - executionModel: ExecModel): SchedulerService = { - - ExecutorScheduler(executor, reporter, executionModel, Features.empty) - } + executionModel: ExecModel): SchedulerService = + ExecutorScheduler.fromExecutorService(executor, reporter, executionModel, Features.empty) /** [[monix.execution.Scheduler Scheduler]] builder that converts a * Java `ExecutorService` into a scheduler. @@ -135,7 +133,7 @@ private[execution] class SchedulerCompanionImpl extends SchedulerCompanion { * @param executor $executorService */ def apply(executor: ExecutorService): SchedulerService = - ExecutorScheduler(executor, UncaughtExceptionReporter.default, ExecModel.Default, Features.empty) + ExecutorScheduler.fromExecutorService(executor, UncaughtExceptionReporter.default, ExecModel.Default, Features.empty) /** [[monix.execution.Scheduler Scheduler]] builder that converts a * Java `ExecutorService` into a scheduler. @@ -144,7 +142,7 @@ private[execution] class SchedulerCompanionImpl extends SchedulerCompanion { * @param executionModel $executionModel */ def apply(executor: ExecutorService, executionModel: ExecModel): SchedulerService = - ExecutorScheduler(executor, UncaughtExceptionReporter.default, executionModel, Features.empty) + ExecutorScheduler.fromExecutorService(executor, UncaughtExceptionReporter.default, executionModel, Features.empty) /** [[monix.execution.Scheduler Scheduler]] builder - uses monix's * default `ScheduledExecutorService` for handling the scheduling of tasks. @@ -298,7 +296,7 @@ private[execution] class SchedulerCompanionImpl extends SchedulerCompanion { new SynchronousQueue[Runnable](false), threadFactory) - ExecutorScheduler(executor, reporter, executionModel, Features.empty) + ExecutorScheduler.fromExecutorService(executor, reporter, executionModel, Features.empty) } /** Builds a [[Scheduler]] backed by an internal @@ -340,7 +338,7 @@ private[execution] class SchedulerCompanionImpl extends SchedulerCompanion { new SynchronousQueue[Runnable](false), threadFactory) - ExecutorScheduler(executor, reporter, executionModel, Features.empty) + ExecutorScheduler.fromExecutorService(executor, reporter, executionModel, Features.empty) } /** Builds a [[monix.execution.Scheduler Scheduler]] that schedules and executes tasks on its own thread. @@ -365,12 +363,9 @@ private[execution] class SchedulerCompanionImpl extends SchedulerCompanion { executionModel: ExecModel = ExecModel.Default): SchedulerService = { val factory = ThreadFactoryBuilder(name, reporter, daemonic) - val executor = new AdaptedThreadPoolExecutor(1, factory) { - override def reportFailure(t: Throwable): Unit = - reporter.reportFailure(t) - } + val executor = new AdaptedScheduledThreadPoolExecutor(1, factory, reporter) - ExecutorScheduler(executor, null, executionModel, Features.empty) + ExecutorScheduler.scheduledThreadPool(executor, executionModel, Features.empty) } /** Builds a [[monix.execution.Scheduler Scheduler]] with a fixed thread-pool. @@ -394,12 +389,9 @@ private[execution] class SchedulerCompanionImpl extends SchedulerCompanion { executionModel: ExecModel = ExecModel.Default): SchedulerService = { val factory = ThreadFactoryBuilder(name, reporter, daemonic) - val executor = new AdaptedThreadPoolExecutor(poolSize, factory) { - override def reportFailure(t: Throwable): Unit = - reporter.reportFailure(t) - } + val executor = new AdaptedScheduledThreadPoolExecutor(poolSize, factory, reporter) - ExecutorScheduler(executor, null, executionModel, Features.empty) + ExecutorScheduler.scheduledThreadPool(executor, executionModel, Features.empty) } /** The explicit global `Scheduler`. Invoke `global` when you want diff --git a/monix-execution/jvm/src/test/scala/monix/execution/FeaturesJVMSuite.scala b/monix-execution/jvm/src/test/scala/monix/execution/FeaturesJVMSuite.scala index dec28e227..134f25617 100644 --- a/monix-execution/jvm/src/test/scala/monix/execution/FeaturesJVMSuite.scala +++ b/monix-execution/jvm/src/test/scala/monix/execution/FeaturesJVMSuite.scala @@ -70,7 +70,12 @@ object FeaturesJVMSuite extends SimpleTestSuite with Checkers { test("ExecutorScheduler(Executor") { val ref = { val ec = Executors.newSingleThreadExecutor() - ExecutorScheduler(ec, UncaughtExceptionReporter.default, ExecutionModel.Default, Features.empty) + ExecutorScheduler.fromExecutorService( + ec, + UncaughtExceptionReporter.default, + ExecutionModel.Default, + Features.empty + ) } try { assert(ref.features.contains(Scheduler.BATCHING)) @@ -83,7 +88,12 @@ object FeaturesJVMSuite extends SimpleTestSuite with Checkers { test("ExecutorScheduler(ScheduledExecutor") { val ref = { val ec = Executors.newSingleThreadScheduledExecutor() - ExecutorScheduler(ec, UncaughtExceptionReporter.default, ExecutionModel.Default, Features.empty) + ExecutorScheduler.fromExecutorService( + ec, + UncaughtExceptionReporter.default, + ExecutionModel.Default, + Features.empty + ) } try { assert(ref.features.contains(Scheduler.BATCHING)) @@ -96,7 +106,12 @@ object FeaturesJVMSuite extends SimpleTestSuite with Checkers { test("ExecutorScheduler(ScheduledExecutor)") { val ref = { val ec = Executors.newSingleThreadScheduledExecutor() - ExecutorScheduler(ec, UncaughtExceptionReporter.default, ExecutionModel.Default, Features.empty) + ExecutorScheduler.fromExecutorService( + ec, + UncaughtExceptionReporter.default, + ExecutionModel.Default, + Features.empty + ) } try { assert(ref.features.contains(Scheduler.BATCHING)) diff --git a/monix-execution/jvm/src/test/scala/monix/execution/schedulers/ExecutorSchedulerSuite.scala b/monix-execution/jvm/src/test/scala/monix/execution/schedulers/ExecutorSchedulerSuite.scala index 56280681a..b98800a61 100644 --- a/monix-execution/jvm/src/test/scala/monix/execution/schedulers/ExecutorSchedulerSuite.scala +++ b/monix-execution/jvm/src/test/scala/monix/execution/schedulers/ExecutorSchedulerSuite.scala @@ -17,16 +17,15 @@ package monix.execution.schedulers -import java.util.concurrent.{CountDownLatch, TimeUnit, TimeoutException} - import minitest.TestSuite import monix.execution.ExecutionModel.{AlwaysAsyncExecution, Default => DefaultExecutionModel} import monix.execution.cancelables.SingleAssignCancelable import monix.execution.exceptions.DummyException import monix.execution.{Cancelable, Scheduler, UncaughtExceptionReporter} +import java.util.concurrent.{CountDownLatch, Executors, TimeUnit, TimeoutException} import scala.concurrent.duration._ -import scala.concurrent.{blocking, Await, Promise} +import scala.concurrent.{Await, Promise, blocking} abstract class ExecutorSchedulerSuite extends TestSuite[SchedulerService] { self => var lastReportedFailure = null: Throwable @@ -101,7 +100,8 @@ abstract class ExecutorSchedulerSuite extends TestSuite[SchedulerService] { self } else if (value < 4) { value += 1 } - }) + } + ) assert(Await.result(p.future, 5.second) == 4) } @@ -124,7 +124,8 @@ abstract class ExecutorSchedulerSuite extends TestSuite[SchedulerService] { self } else if (value < 4) { value += 1 } - }) + } + ) assert(Await.result(p.future, 5.second) == 4) } @@ -188,14 +189,60 @@ abstract class ExecutorSchedulerSuite extends TestSuite[SchedulerService] { self scheduler.scheduleOnce( 1, TimeUnit.MILLISECONDS, - new Runnable { - override def run() = - throw ex - }) + () => throw ex, + ) + + assert(latch.await(15, TimeUnit.MINUTES), "lastReportedFailureLatch.await") + self.synchronized(assertEquals(lastReportedFailure, ex)) + } finally { + self.synchronized { + lastReportedFailure = null + lastReportedFailureLatch = null + } + } + } + + test("reports errors on scheduleAtFixedRate") { scheduler => + val latch = new CountDownLatch(1) + self.synchronized { + lastReportedFailure = null + lastReportedFailureLatch = latch + } + + val ex = DummyException("dummy") + val schedule = scheduler.scheduleAtFixedRate(0.seconds, 1.second) { + throw ex + } + try { assert(latch.await(15, TimeUnit.MINUTES), "lastReportedFailureLatch.await") self.synchronized(assertEquals(lastReportedFailure, ex)) } finally { + schedule.cancel() + self.synchronized { + lastReportedFailure = null + lastReportedFailureLatch = null + } + } + } + + test("reports errors on scheduleWithFixedDelay") { scheduler => + val latch = new CountDownLatch(1) + self.synchronized { + lastReportedFailure = null + lastReportedFailureLatch = latch + } + + val ex = DummyException("dummy") + val schedule = scheduler.scheduleWithFixedDelay(0.seconds, 1.second) { + throw ex + } + + try { + assert(latch.await(15, TimeUnit.MINUTES), "lastReportedFailureLatch.await") + self.synchronized(assertEquals(lastReportedFailure, ex)) + } finally { + schedule.cancel() self.synchronized { lastReportedFailure = null lastReportedFailureLatch = null @@ -256,3 +303,8 @@ object IOSchedulerSuite extends ExecutorSchedulerSuite { def setup(): SchedulerService = monix.execution.Scheduler.io("monix-tests-io", reporter = testsReporter) } + +object ScheduledExecutorSuite extends ExecutorSchedulerSuite { + def setup(): SchedulerService = + monix.execution.Scheduler(Executors.newSingleThreadScheduledExecutor(), reporter = testsReporter) +} diff --git a/monix-execution/jvm/src/test/scala/monix/execution/schedulers/ScheduledExecutorToSchedulerSuite.scala b/monix-execution/jvm/src/test/scala/monix/execution/schedulers/ScheduledExecutorToSchedulerSuite.scala index c8e549f41..74f0de54f 100644 --- a/monix-execution/jvm/src/test/scala/monix/execution/schedulers/ScheduledExecutorToSchedulerSuite.scala +++ b/monix-execution/jvm/src/test/scala/monix/execution/schedulers/ScheduledExecutorToSchedulerSuite.scala @@ -41,7 +41,7 @@ object ScheduledExecutorToSchedulerSuite extends TestSuite[ExecutorScheduler] { daemonic = true )) - ExecutorScheduler(executor, reporter, ExecModel.Default, Features.empty) + ExecutorScheduler.fromExecutorService(executor, reporter, ExecModel.Default, Features.empty) } override def tearDown(scheduler: ExecutorScheduler): Unit = { diff --git a/monix-execution/shared/src/main/scala/monix/execution/schedulers/ReferenceScheduler.scala b/monix-execution/shared/src/main/scala/monix/execution/schedulers/ReferenceScheduler.scala index 12041af74..9bcd4234a 100644 --- a/monix-execution/shared/src/main/scala/monix/execution/schedulers/ReferenceScheduler.scala +++ b/monix-execution/shared/src/main/scala/monix/execution/schedulers/ReferenceScheduler.scala @@ -115,15 +115,15 @@ object ReferenceScheduler { private[this] val reporterRef = if (reporter eq null) s else reporter override def execute(runnable: Runnable): Unit = - s.execute(InterceptRunnable(runnable, reporter)) + s.execute(InterceptRunnable(runnable, reporterRef)) override def reportFailure(t: Throwable): Unit = reporterRef.reportFailure(t) override def scheduleOnce(initialDelay: Long, unit: TimeUnit, r: Runnable): Cancelable = - s.scheduleOnce(initialDelay, unit, r) + s.scheduleOnce(initialDelay, unit, InterceptRunnable(r, reporterRef)) override def scheduleWithFixedDelay(initialDelay: Long, delay: Long, unit: TimeUnit, r: Runnable): Cancelable = - s.scheduleWithFixedDelay(initialDelay, delay, unit, r) + s.scheduleWithFixedDelay(initialDelay, delay, unit, InterceptRunnable(r, reporterRef)) override def scheduleAtFixedRate(initialDelay: Long, period: Long, unit: TimeUnit, r: Runnable): Cancelable = - s.scheduleAtFixedRate(initialDelay, period, unit, r) + s.scheduleAtFixedRate(initialDelay, period, unit, InterceptRunnable(r, reporterRef)) override def clockRealTime(unit: TimeUnit): Long = s.clockRealTime(unit) override def clockMonotonic(unit: TimeUnit): Long = diff --git a/monix-execution/shared/src/test/scala/monix/execution/schedulers/UncaughtExceptionReporterSuite.scala b/monix-execution/shared/src/test/scala/monix/execution/schedulers/UncaughtExceptionReporterSuite.scala index 5def437c8..4b5498da0 100644 --- a/monix-execution/shared/src/test/scala/monix/execution/schedulers/UncaughtExceptionReporterSuite.scala +++ b/monix-execution/shared/src/test/scala/monix/execution/schedulers/UncaughtExceptionReporterSuite.scala @@ -17,11 +17,12 @@ package monix.execution.schedulers -import scala.concurrent.{ExecutionContext, Promise} -import scala.concurrent.duration._ import minitest.TestSuite import monix.execution.{ExecutionModel, FutureUtils, Scheduler, UncaughtExceptionReporter} +import scala.concurrent.duration._ +import scala.concurrent.{ExecutionContext, Promise} + class UncaughtExceptionReporterBaseSuite extends TestSuite[Promise[Throwable]] { protected val immediateEC = TrampolineExecutionContext.immediate @@ -41,13 +42,42 @@ class UncaughtExceptionReporterBaseSuite extends TestSuite[Promise[Throwable]] { def testReports(name: String)(f: UncaughtExceptionReporter => Scheduler) = { testAsync(name) { p => f(reporter(p)).execute(throwRunnable) - FutureUtils.timeout(p.future.collect { case Dummy => }(immediateEC), 500.millis)(Scheduler.global) + assertExceptionCaught(p) } testAsync(name + ".withUncaughtExceptionReporter") { p => f(UncaughtExceptionReporter.default).withUncaughtExceptionReporter(reporter(p)).execute(throwRunnable) - FutureUtils.timeout(p.future.collect { case Dummy => }(immediateEC), 500.millis)(Scheduler.global) + assertExceptionCaught(p) + } + + testAsync(name + ".withUncaughtExceptionReporter + scheduleOnce") { p => + f(UncaughtExceptionReporter.default) + .withUncaughtExceptionReporter(reporter(p)) + .scheduleOnce(1.milli)(throwRunnable.run()) + assertExceptionCaught(p) } + + testAsync(name + ".withUncaughtExceptionReporter + scheduleAtFixedRate") { p => + val schedule = f(UncaughtExceptionReporter.default) + .withUncaughtExceptionReporter(reporter(p)) + .scheduleAtFixedRate(0.millis, 1.second)(throwRunnable.run()) + val result = assertExceptionCaught(p) + result.onComplete(_ => schedule.cancel())(immediateEC) + result + } + + testAsync(name + ".withUncaughtExceptionReporter + scheduleWithFixedDelay") { p => + val schedule = f(UncaughtExceptionReporter.default) + .withUncaughtExceptionReporter(reporter(p)) + .scheduleWithFixedDelay(0.millis, 1.second)(throwRunnable.run()) + val result = assertExceptionCaught(p) + result.onComplete(_ => schedule.cancel())(immediateEC) + result + } + } + + private def assertExceptionCaught(p: Promise[Throwable]) = { + FutureUtils.timeout(p.future.collect { case Dummy => }(immediateEC), 500.millis)(Scheduler.global) } } From ee75b3bdc252b96f996be708d12f9c6ca7066426 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Bul?= Date: Mon, 30 Mar 2026 16:16:44 +0200 Subject: [PATCH 2/6] update MimaFilters --- project/MimaFilters.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/project/MimaFilters.scala b/project/MimaFilters.scala index 48a1ae9f5..0c99819a4 100644 --- a/project/MimaFilters.scala +++ b/project/MimaFilters.scala @@ -105,6 +105,8 @@ object MimaFilters { // TrampolineExecutionContext signature tweaks (internal API) exclude[IncompatibleMethTypeProblem]("monix.execution.schedulers.TrampolineExecutionContext#JVMNormalTrampoline.startLoop"), exclude[IncompatibleMethTypeProblem]("monix.execution.schedulers.TrampolineExecutionContext#JVMOptimalTrampoline.startLoop"), - exclude[IncompatibleMethTypeProblem]("monix.execution.schedulers.TrampolineExecutionContext.this") + exclude[IncompatibleMethTypeProblem]("monix.execution.schedulers.TrampolineExecutionContext.this"), + exclude[MissingClassProblem]("monix.execution.schedulers.AdaptedThreadPoolExecutor"), + exclude[MissingClassProblem]("monix.execution.schedulers.ExecutorScheduler$FromScheduledExecutor"), ) } From 227c7b740529cf493ba0d24d1659624e0448c826 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Bul?= Date: Mon, 30 Mar 2026 21:06:26 +0200 Subject: [PATCH 3/6] DRY BATCHING --- .../schedulers/ExecutorScheduler.scala | 28 +++++++++---------- 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/monix-execution/jvm/src/main/scala/monix/execution/schedulers/ExecutorScheduler.scala b/monix-execution/jvm/src/main/scala/monix/execution/schedulers/ExecutorScheduler.scala index dc23e2573..da5575f93 100644 --- a/monix-execution/jvm/src/main/scala/monix/execution/schedulers/ExecutorScheduler.scala +++ b/monix-execution/jvm/src/main/scala/monix/execution/schedulers/ExecutorScheduler.scala @@ -33,9 +33,11 @@ import scala.util.control.NonFatal * [[monix.execution.schedulers.SchedulerService SchedulerService]] * out of a Java `ExecutorService`. */ -abstract class ExecutorScheduler(e: ExecutorService, r: UncaughtExceptionReporter) +abstract class ExecutorScheduler(e: ExecutorService, r: UncaughtExceptionReporter, additionalFeatures: Features) extends SchedulerService with ReferenceScheduler with BatchingScheduler { + override final val features: Features = additionalFeatures + Scheduler.BATCHING + /** Returns the underlying `ExecutorService` reference. */ def executor: ExecutorService = e @@ -127,7 +129,7 @@ object ExecutorScheduler { executor = service, reporter = reporter, executionModel = executionModel, - features = withBatching(features) + additionalFeatures = features, ) private[schedulers] def scheduledThreadPool( @@ -138,13 +140,9 @@ object ExecutorScheduler { new FromAdaptedThreadPoolExecutor( executor = service, executionModel = executionModel, - features = withBatching(features) + additionalFeatures = features, ) - private def withBatching(features: Features): Features = - // Implementations will inherit BatchingScheduler, so this is guaranteed - features + Scheduler.BATCHING - /** * DEPRECATED — provided for binary backwards compatibility. * @@ -220,8 +218,8 @@ object ExecutorScheduler { executor: ExecutorService, reporter: UncaughtExceptionReporter, override val executionModel: ExecModel, - override val features: Features - ) extends ExecutorScheduler(executor, reporter) { + additionalFeatures: Features, + ) extends ExecutorScheduler(executor, reporter, additionalFeatures) { @deprecated("Provided for backwards compatibility", "3.0.0") def this( @@ -239,10 +237,10 @@ object ExecutorScheduler { ScheduledExecutors.scheduleOnce(this, scheduler)(initialDelay, unit, r) override def withExecutionModel(em: ExecModel): SchedulerService = - new FromSimpleExecutor(scheduler, executor, reporter, em, features) + new FromSimpleExecutor(scheduler, executor, reporter, em, additionalFeatures) override def withUncaughtExceptionReporter(r: UncaughtExceptionReporter): SchedulerService = - new FromSimpleExecutor(scheduler, executor, r, executionModel, features) + new FromSimpleExecutor(scheduler, executor, r, executionModel, additionalFeatures) } /** Implementation of ExecutorScheduler backed by Java `ScheduledExecutorService`. Assumes error reporting is done by @@ -253,8 +251,8 @@ object ExecutorScheduler { private final class FromAdaptedThreadPoolExecutor( executor: AdaptedScheduledThreadPoolExecutor, override val executionModel: ExecModel, - override val features: Features - ) extends ExecutorScheduler(executor, null) { + additionalFeatures: Features, + ) extends ExecutorScheduler(executor, null, additionalFeatures) { def scheduleOnce(initialDelay: Long, unit: TimeUnit, r: Runnable): Cancelable = { if (initialDelay <= 0) { @@ -277,9 +275,9 @@ object ExecutorScheduler { } override def withExecutionModel(em: ExecModel): SchedulerService = - new FromAdaptedThreadPoolExecutor(executor, em, features) + new FromAdaptedThreadPoolExecutor(executor, em, additionalFeatures) override def withUncaughtExceptionReporter(r: UncaughtExceptionReporter): SchedulerService = - new FromAdaptedThreadPoolExecutor(executor.withUncaughtExceptionReporter(r), executionModel, features) + new FromAdaptedThreadPoolExecutor(executor.withUncaughtExceptionReporter(r), executionModel, additionalFeatures) } } From c90c9032e4690c7ab8835897d40cbef869a9a55e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Bul?= Date: Mon, 30 Mar 2026 21:12:05 +0200 Subject: [PATCH 4/6] let the wrapped scheduler create InterceptRunnable when no UncaughtExceptionReporter override is present --- .../monix/execution/schedulers/ReferenceScheduler.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/monix-execution/shared/src/main/scala/monix/execution/schedulers/ReferenceScheduler.scala b/monix-execution/shared/src/main/scala/monix/execution/schedulers/ReferenceScheduler.scala index 9bcd4234a..147f10aed 100644 --- a/monix-execution/shared/src/main/scala/monix/execution/schedulers/ReferenceScheduler.scala +++ b/monix-execution/shared/src/main/scala/monix/execution/schedulers/ReferenceScheduler.scala @@ -115,15 +115,15 @@ object ReferenceScheduler { private[this] val reporterRef = if (reporter eq null) s else reporter override def execute(runnable: Runnable): Unit = - s.execute(InterceptRunnable(runnable, reporterRef)) + s.execute(InterceptRunnable(runnable, reporter)) override def reportFailure(t: Throwable): Unit = reporterRef.reportFailure(t) override def scheduleOnce(initialDelay: Long, unit: TimeUnit, r: Runnable): Cancelable = - s.scheduleOnce(initialDelay, unit, InterceptRunnable(r, reporterRef)) + s.scheduleOnce(initialDelay, unit, InterceptRunnable(r, reporter)) override def scheduleWithFixedDelay(initialDelay: Long, delay: Long, unit: TimeUnit, r: Runnable): Cancelable = - s.scheduleWithFixedDelay(initialDelay, delay, unit, InterceptRunnable(r, reporterRef)) + s.scheduleWithFixedDelay(initialDelay, delay, unit, InterceptRunnable(r, reporter)) override def scheduleAtFixedRate(initialDelay: Long, period: Long, unit: TimeUnit, r: Runnable): Cancelable = - s.scheduleAtFixedRate(initialDelay, period, unit, InterceptRunnable(r, reporterRef)) + s.scheduleAtFixedRate(initialDelay, period, unit, InterceptRunnable(r, reporter)) override def clockRealTime(unit: TimeUnit): Long = s.clockRealTime(unit) override def clockMonotonic(unit: TimeUnit): Long = From 441ed9fb0e6c4a2e8cfbe8f0a268d02ed22f292b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Bul?= Date: Mon, 30 Mar 2026 21:26:28 +0200 Subject: [PATCH 5/6] fix Mima issues --- .../monix/execution/schedulers/ExecutorScheduler.scala | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/monix-execution/jvm/src/main/scala/monix/execution/schedulers/ExecutorScheduler.scala b/monix-execution/jvm/src/main/scala/monix/execution/schedulers/ExecutorScheduler.scala index da5575f93..fb9c97eb3 100644 --- a/monix-execution/jvm/src/main/scala/monix/execution/schedulers/ExecutorScheduler.scala +++ b/monix-execution/jvm/src/main/scala/monix/execution/schedulers/ExecutorScheduler.scala @@ -36,7 +36,11 @@ import scala.util.control.NonFatal abstract class ExecutorScheduler(e: ExecutorService, r: UncaughtExceptionReporter, additionalFeatures: Features) extends SchedulerService with ReferenceScheduler with BatchingScheduler { - override final val features: Features = additionalFeatures + Scheduler.BATCHING + private val allFeatures: Features = additionalFeatures + Scheduler.BATCHING + + // for backwards compatibility + def this(e: ExecutorService, r: UncaughtExceptionReporter) = + this(e, r, Features.empty) /** Returns the underlying `ExecutorService` reference. */ def executor: ExecutorService = e @@ -79,6 +83,8 @@ abstract class ExecutorScheduler(e: ExecutorService, r: UncaughtExceptionReporte throw new NotImplementedError("ExecutorService.withUncaughtExceptionReporter") // $COVERAGE-ON$ } + + override def features: Features = allFeatures } object ExecutorScheduler { From f064b4aec77cc94d9a02817950a929456f38325c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Bul?= Date: Tue, 31 Mar 2026 09:57:36 +0200 Subject: [PATCH 6/6] DRY tests --- .../schedulers/ExecutorSchedulerSuite.scala | 106 +++++++----------- 1 file changed, 38 insertions(+), 68 deletions(-) diff --git a/monix-execution/jvm/src/test/scala/monix/execution/schedulers/ExecutorSchedulerSuite.scala b/monix-execution/jvm/src/test/scala/monix/execution/schedulers/ExecutorSchedulerSuite.scala index b98800a61..318f9f81a 100644 --- a/monix-execution/jvm/src/test/scala/monix/execution/schedulers/ExecutorSchedulerSuite.scala +++ b/monix-execution/jvm/src/test/scala/monix/execution/schedulers/ExecutorSchedulerSuite.scala @@ -21,6 +21,7 @@ import minitest.TestSuite import monix.execution.ExecutionModel.{AlwaysAsyncExecution, Default => DefaultExecutionModel} import monix.execution.cancelables.SingleAssignCancelable import monix.execution.exceptions.DummyException +import monix.execution.schedulers.ExecutorSchedulerSuite.TestException import monix.execution.{Cancelable, Scheduler, UncaughtExceptionReporter} import java.util.concurrent.{CountDownLatch, Executors, TimeUnit, TimeoutException} @@ -152,101 +153,66 @@ abstract class ExecutorSchedulerSuite extends TestSuite[SchedulerService] { self } test("reports errors on execute") { scheduler => - val latch = new CountDownLatch(1) - self.synchronized { - lastReportedFailure = null - lastReportedFailureLatch = latch - } + val latch = setupReporterLatch() try { - val ex = DummyException("dummy") - - scheduler.execute(new Runnable { - override def run() = - throw ex - }) + scheduler.execute(() => throw TestException) - assert(latch.await(15, TimeUnit.MINUTES), "lastReportedFailureLatch.await") - self.synchronized(assertEquals(lastReportedFailure, ex)) + assertTestExceptionCaught(latch) } finally { - self.synchronized { - lastReportedFailure = null - lastReportedFailureLatch = null - } + clearReporterLatch() } } test("reports errors on scheduleOnce") { scheduler => - val latch = new CountDownLatch(1) - self.synchronized { - lastReportedFailure = null - lastReportedFailureLatch = latch - } - - try { - val ex = DummyException("dummy") + testScheduledErrorReporting( + scheduleFailure = () => scheduler.scheduleOnce(1.milli)(throw TestException) + ) + } - scheduler.scheduleOnce( - 1, - TimeUnit.MILLISECONDS, - () => throw ex, - ) + test("reports errors on scheduleAtFixedRate") { scheduler => + testScheduledErrorReporting( + scheduleFailure = () => scheduler.scheduleAtFixedRate(0.seconds, 1.second)(throw TestException) + ) + } - assert(latch.await(15, TimeUnit.MINUTES), "lastReportedFailureLatch.await") - self.synchronized(assertEquals(lastReportedFailure, ex)) - } finally { - self.synchronized { - lastReportedFailure = null - lastReportedFailureLatch = null - } - } + test("reports errors on scheduleWithFixedDelay") { scheduler => + testScheduledErrorReporting( + scheduleFailure = () => scheduler.scheduleWithFixedDelay(0.seconds, 1.second)(throw TestException), + ) } - test("reports errors on scheduleAtFixedRate") { scheduler => - val latch = new CountDownLatch(1) - self.synchronized { - lastReportedFailure = null - lastReportedFailureLatch = latch - } + private def testScheduledErrorReporting(scheduleFailure: () => Cancelable): Unit = { + val latch = setupReporterLatch() - val ex = DummyException("dummy") - val schedule = scheduler.scheduleAtFixedRate(0.seconds, 1.second) { - throw ex - } + val schedule = scheduleFailure() try { - assert(latch.await(15, TimeUnit.MINUTES), "lastReportedFailureLatch.await") - self.synchronized(assertEquals(lastReportedFailure, ex)) + assertTestExceptionCaught(latch) } finally { schedule.cancel() - self.synchronized { - lastReportedFailure = null - lastReportedFailureLatch = null - } + clearReporterLatch() } } - test("reports errors on scheduleWithFixedDelay") { scheduler => + private def setupReporterLatch() = { val latch = new CountDownLatch(1) self.synchronized { lastReportedFailure = null lastReportedFailureLatch = latch } + latch + } - val ex = DummyException("dummy") - val schedule = scheduler.scheduleWithFixedDelay(0.seconds, 1.second) { - throw ex - } + private def assertTestExceptionCaught(latch: CountDownLatch): Unit = { + assert(latch.await(15, TimeUnit.MINUTES), "lastReportedFailureLatch.await") + self.synchronized(assertEquals(lastReportedFailure, TestException)) + } - try { - assert(latch.await(15, TimeUnit.MINUTES), "lastReportedFailureLatch.await") - self.synchronized(assertEquals(lastReportedFailure, ex)) - } finally { - schedule.cancel() - self.synchronized { - lastReportedFailure = null - lastReportedFailureLatch = null - } + private def clearReporterLatch(): Unit = { + self.synchronized { + lastReportedFailure = null + lastReportedFailureLatch = null } } @@ -254,6 +220,10 @@ abstract class ExecutorSchedulerSuite extends TestSuite[SchedulerService] { self new Runnable { def run() = f } } +object ExecutorSchedulerSuite { + private val TestException = DummyException("dummy") +} + object ComputationSchedulerSuite extends ExecutorSchedulerSuite { def setup(): SchedulerService = monix.execution.Scheduler