diff --git a/monix-execution/jvm/src/main/scala/monix/execution/schedulers/AdaptedThreadPoolExecutor.scala b/monix-execution/jvm/src/main/scala/monix/execution/schedulers/AdaptedScheduledThreadPoolExecutor.scala similarity index 73% rename from monix-execution/jvm/src/main/scala/monix/execution/schedulers/AdaptedThreadPoolExecutor.scala rename to monix-execution/jvm/src/main/scala/monix/execution/schedulers/AdaptedScheduledThreadPoolExecutor.scala index 7bde71c5a..57f7cdffc 100644 --- a/monix-execution/jvm/src/main/scala/monix/execution/schedulers/AdaptedThreadPoolExecutor.scala +++ b/monix-execution/jvm/src/main/scala/monix/execution/schedulers/AdaptedScheduledThreadPoolExecutor.scala @@ -17,14 +17,18 @@ package monix.execution.schedulers +import monix.execution.UncaughtExceptionReporter + import java.util.concurrent._ /** A mixin for adapting for the Java `ThreadPoolExecutor` implementation * to report errors using the default thread exception handler. */ -private[schedulers] abstract class AdaptedThreadPoolExecutor(corePoolSize: Int, factory: ThreadFactory) - extends ScheduledThreadPoolExecutor(corePoolSize, factory) { - def reportFailure(t: Throwable): Unit +private[schedulers] final class AdaptedScheduledThreadPoolExecutor( + corePoolSize: Int, + factory: ThreadFactory, + reporter: UncaughtExceptionReporter, +) extends ScheduledThreadPoolExecutor(corePoolSize, factory) { override def afterExecute(r: Runnable, t: Throwable): Unit = { super.afterExecute(r, t) @@ -45,6 +49,9 @@ private[schedulers] abstract class AdaptedThreadPoolExecutor(corePoolSize: Int, } } - if (exception ne null) reportFailure(exception) + if (exception ne null) reporter.reportFailure(exception) } + + def withUncaughtExceptionReporter(reporter: UncaughtExceptionReporter): AdaptedScheduledThreadPoolExecutor = + new AdaptedScheduledThreadPoolExecutor(corePoolSize, factory, reporter) } diff --git a/monix-execution/jvm/src/main/scala/monix/execution/schedulers/ExecutorScheduler.scala b/monix-execution/jvm/src/main/scala/monix/execution/schedulers/ExecutorScheduler.scala index 51ea65143..fb9c97eb3 100644 --- a/monix-execution/jvm/src/main/scala/monix/execution/schedulers/ExecutorScheduler.scala +++ b/monix-execution/jvm/src/main/scala/monix/execution/schedulers/ExecutorScheduler.scala @@ -17,24 +17,31 @@ package monix.execution.schedulers -import java.util.concurrent.{ExecutorService, ForkJoinPool, ScheduledExecutorService} import monix.execution.internal.forkJoin.{AdaptedForkJoinPool, DynamicWorkerThreadFactory, StandardWorkerThreadFactory} import monix.execution.internal.{InterceptRunnable, Platform, ScheduledExecutors} -import monix.execution.{Cancelable, UncaughtExceptionReporter} -import monix.execution.{Features, Scheduler} +import monix.execution.{Cancelable, Features, Scheduler, UncaughtExceptionReporter} + +import java.util.concurrent.{ExecutorService, ScheduledExecutorService} // Prevents conflict with the deprecated symbol import monix.execution.{ExecutionModel => ExecModel} -import scala.concurrent.{ExecutionContext, Future, Promise, blocking} + import scala.concurrent.duration.TimeUnit +import scala.concurrent.{ExecutionContext, Future, Promise, blocking} import scala.util.control.NonFatal /** An [[ExecutorScheduler]] is a class for building a * [[monix.execution.schedulers.SchedulerService SchedulerService]] * out of a Java `ExecutorService`. */ -abstract class ExecutorScheduler(e: ExecutorService, r: UncaughtExceptionReporter) +abstract class ExecutorScheduler(e: ExecutorService, r: UncaughtExceptionReporter, additionalFeatures: Features) extends SchedulerService with ReferenceScheduler with BatchingScheduler { + private val allFeatures: Features = additionalFeatures + Scheduler.BATCHING + + // for backwards compatibility + def this(e: ExecutorService, r: UncaughtExceptionReporter) = + this(e, r, Features.empty) + /** Returns the underlying `ExecutorService` reference. */ def executor: ExecutorService = e @@ -54,9 +61,10 @@ abstract class ExecutorScheduler(e: ExecutorService, r: UncaughtExceptionReporte awaitOn.execute(new Runnable { override def run() = try blocking { - p.success(e.awaitTermination(timeout, unit)) - () - } catch { + p.success(e.awaitTermination(timeout, unit)) + () + } + catch { case ex if NonFatal(ex) => p.failure(ex); () } @@ -75,13 +83,14 @@ abstract class ExecutorScheduler(e: ExecutorService, r: UncaughtExceptionReporte throw new NotImplementedError("ExecutorService.withUncaughtExceptionReporter") // $COVERAGE-ON$ } + + override def features: Features = allFeatures } object ExecutorScheduler { - /** Builder for an [[ExecutorScheduler]], converting a - * Java `ScheduledExecutorService`. + /** Builder for an [[ExecutorScheduler]], converting a Java `ExecutorService`. * - * @param service is the Java `ScheduledExecutorService` that will take + * @param service is the Java `ExecutorService` that will take * care of scheduling and execution of all runnables. * @param reporter is the [[UncaughtExceptionReporter]] that logs uncaught exceptions. * @param executionModel is the preferred @@ -91,22 +100,54 @@ object ExecutorScheduler { * provided `ExecutorService` implements, see the documentation * for [[monix.execution.Scheduler.features Scheduler.features]] */ + @deprecated("Use ExecutorScheduler.fromExecutorService", "3.4.2-avs.6") def apply( service: ExecutorService, reporter: UncaughtExceptionReporter, executionModel: ExecModel, - features: Features): ExecutorScheduler = { - - // Implementations will inherit BatchingScheduler, so this is guaranteed - val ft = features + Scheduler.BATCHING + features: Features, + ): ExecutorScheduler = service match { - case ref: ScheduledExecutorService => - new FromScheduledExecutor(ref, reporter, executionModel, ft) - case _ => - val s = Defaults.scheduledExecutor - new FromSimpleExecutor(s, service, reporter, executionModel, ft) + case ref: AdaptedScheduledThreadPoolExecutor => scheduledThreadPool(ref, executionModel, features) + case _ => fromExecutorService(service, reporter, executionModel, features) } - } + + /** Builder for an [[ExecutorScheduler]], converting a Java `ExecutorService`. + * + * @param service is the Java `ExecutorService` that will take + * care of scheduling and execution of all runnables. + * @param reporter is the [[UncaughtExceptionReporter]] that logs uncaught exceptions. + * @param executionModel is the preferred + * [[monix.execution.ExecutionModel ExecutionModel]], a guideline + * for run-loops and producers of data. + * @param features is the set of [[Features]] that the + * provided `ExecutorService` implements, see the documentation + * for [[monix.execution.Scheduler.features Scheduler.features]] + */ + def fromExecutorService( + service: ExecutorService, + reporter: UncaughtExceptionReporter, + executionModel: ExecModel, + features: Features, + ): ExecutorScheduler = + new FromSimpleExecutor( + scheduler = Defaults.scheduledExecutor, + executor = service, + reporter = reporter, + executionModel = executionModel, + additionalFeatures = features, + ) + + private[schedulers] def scheduledThreadPool( + service: AdaptedScheduledThreadPoolExecutor, + executionModel: ExecModel, + features: Features, + ): ExecutorScheduler = + new FromAdaptedThreadPoolExecutor( + executor = service, + executionModel = executionModel, + additionalFeatures = features, + ) /** * DEPRECATED — provided for binary backwards compatibility. @@ -117,9 +158,10 @@ object ExecutorScheduler { def apply( service: ExecutorService, reporter: UncaughtExceptionReporter, - executionModel: ExecModel): ExecutorScheduler = { + executionModel: ExecModel + ): ExecutorScheduler = { // $COVERAGE-OFF$ - apply(service, reporter, executionModel, Features.empty) + fromExecutorService(service, reporter, executionModel, Features.empty) // $COVERAGE-ON$ } @@ -131,7 +173,8 @@ object ExecutorScheduler { parallelism: Int, daemonic: Boolean, reporter: UncaughtExceptionReporter, - executionModel: ExecModel): ExecutorScheduler = { + executionModel: ExecModel, + ): ExecutorScheduler = { val handler = reporter.asJava val pool = new AdaptedForkJoinPool( @@ -142,7 +185,7 @@ object ExecutorScheduler { asyncMode = true ) - apply(pool, reporter, executionModel, Features.empty) + fromExecutorService(pool, reporter, executionModel, Features.empty) } /** Creates an [[ExecutorScheduler]] backed by a `ForkJoinPool` @@ -154,7 +197,8 @@ object ExecutorScheduler { maxThreads: Int, daemonic: Boolean, reporter: UncaughtExceptionReporter, - executionModel: ExecModel): ExecutorScheduler = { + executionModel: ExecModel, + ): ExecutorScheduler = { val exceptionHandler = reporter.asJava val pool = new AdaptedForkJoinPool( @@ -165,7 +209,7 @@ object ExecutorScheduler { asyncMode = true ) - apply(pool, reporter, executionModel, Features.empty) + fromExecutorService(pool, reporter, executionModel, Features.empty) } /** Converts a Java `ExecutorService`. @@ -178,17 +222,18 @@ object ExecutorScheduler { private final class FromSimpleExecutor( scheduler: ScheduledExecutorService, executor: ExecutorService, - r: UncaughtExceptionReporter, + reporter: UncaughtExceptionReporter, override val executionModel: ExecModel, - override val features: Features) - extends ExecutorScheduler(executor, r) { + additionalFeatures: Features, + ) extends ExecutorScheduler(executor, reporter, additionalFeatures) { @deprecated("Provided for backwards compatibility", "3.0.0") def this( scheduler: ScheduledExecutorService, executor: ExecutorService, r: UncaughtExceptionReporter, - executionModel: ExecModel) = { + executionModel: ExecModel, + ) = { // $COVERAGE-OFF$ this(scheduler, executor, r, executionModel, Features.empty) // $COVERAGE-ON$ @@ -198,53 +243,47 @@ object ExecutorScheduler { ScheduledExecutors.scheduleOnce(this, scheduler)(initialDelay, unit, r) override def withExecutionModel(em: ExecModel): SchedulerService = - new FromSimpleExecutor(scheduler, executor, r, em, features) + new FromSimpleExecutor(scheduler, executor, reporter, em, additionalFeatures) override def withUncaughtExceptionReporter(r: UncaughtExceptionReporter): SchedulerService = - new FromSimpleExecutor(scheduler, executor, r, executionModel, features) + new FromSimpleExecutor(scheduler, executor, r, executionModel, additionalFeatures) } - /** Converts a Java `ScheduledExecutorService`. */ - private final class FromScheduledExecutor( - s: ScheduledExecutorService, - r: UncaughtExceptionReporter, + /** Implementation of ExecutorScheduler backed by Java `ScheduledExecutorService`. Assumes error reporting is done by + * the underlying `ScheduledExecutorService`. + * + * Currently intended for use only with AdaptedThreadPoolExecutor. + */ + private final class FromAdaptedThreadPoolExecutor( + executor: AdaptedScheduledThreadPoolExecutor, override val executionModel: ExecModel, - override val features: Features) - extends ExecutorScheduler(s, r) { - - @deprecated("Provided for backwards compatibility", "3.0.0") - def this(scheduler: ScheduledExecutorService, r: UncaughtExceptionReporter, executionModel: ExecModel) = { - // $COVERAGE-OFF$ - this(scheduler, r, executionModel, Features.empty) - // $COVERAGE-ON$ - } - - override def executor: ScheduledExecutorService = s + additionalFeatures: Features, + ) extends ExecutorScheduler(executor, null, additionalFeatures) { def scheduleOnce(initialDelay: Long, unit: TimeUnit, r: Runnable): Cancelable = { if (initialDelay <= 0) { execute(r) Cancelable.empty } else { - val task = s.schedule(r, initialDelay, unit) + val task = executor.schedule(r, initialDelay, unit) Cancelable(() => { task.cancel(true); () }) } } override def scheduleWithFixedDelay(initialDelay: Long, delay: Long, unit: TimeUnit, r: Runnable): Cancelable = { - val task = s.scheduleWithFixedDelay(r, initialDelay, delay, unit) + val task = executor.scheduleWithFixedDelay(r, initialDelay, delay, unit) Cancelable(() => { task.cancel(false); () }) } override def scheduleAtFixedRate(initialDelay: Long, period: Long, unit: TimeUnit, r: Runnable): Cancelable = { - val task = s.scheduleAtFixedRate(r, initialDelay, period, unit) + val task = executor.scheduleAtFixedRate(r, initialDelay, period, unit) Cancelable(() => { task.cancel(false); () }) } override def withExecutionModel(em: ExecModel): SchedulerService = - new FromScheduledExecutor(s, r, em, features) + new FromAdaptedThreadPoolExecutor(executor, em, additionalFeatures) override def withUncaughtExceptionReporter(r: UncaughtExceptionReporter): SchedulerService = - new FromScheduledExecutor(s, r, executionModel, features) + new FromAdaptedThreadPoolExecutor(executor.withUncaughtExceptionReporter(r), executionModel, additionalFeatures) } } diff --git a/monix-execution/jvm/src/main/scala/monix/execution/schedulers/SchedulerCompanionImpl.scala b/monix-execution/jvm/src/main/scala/monix/execution/schedulers/SchedulerCompanionImpl.scala index 3cff896e5..05d9be11b 100644 --- a/monix-execution/jvm/src/main/scala/monix/execution/schedulers/SchedulerCompanionImpl.scala +++ b/monix-execution/jvm/src/main/scala/monix/execution/schedulers/SchedulerCompanionImpl.scala @@ -112,7 +112,7 @@ private[execution] class SchedulerCompanionImpl extends SchedulerCompanion { * @param reporter $reporter */ def apply(executor: ExecutorService, reporter: UncaughtExceptionReporter): SchedulerService = - ExecutorScheduler(executor, reporter, ExecModel.Default, Features.empty) + ExecutorScheduler.fromExecutorService(executor, reporter, ExecModel.Default, Features.empty) /** [[monix.execution.Scheduler Scheduler]] builder that converts a * Java `ExecutorService` into a scheduler. @@ -124,10 +124,8 @@ private[execution] class SchedulerCompanionImpl extends SchedulerCompanion { def apply( executor: ExecutorService, reporter: UncaughtExceptionReporter, - executionModel: ExecModel): SchedulerService = { - - ExecutorScheduler(executor, reporter, executionModel, Features.empty) - } + executionModel: ExecModel): SchedulerService = + ExecutorScheduler.fromExecutorService(executor, reporter, executionModel, Features.empty) /** [[monix.execution.Scheduler Scheduler]] builder that converts a * Java `ExecutorService` into a scheduler. @@ -135,7 +133,7 @@ private[execution] class SchedulerCompanionImpl extends SchedulerCompanion { * @param executor $executorService */ def apply(executor: ExecutorService): SchedulerService = - ExecutorScheduler(executor, UncaughtExceptionReporter.default, ExecModel.Default, Features.empty) + ExecutorScheduler.fromExecutorService(executor, UncaughtExceptionReporter.default, ExecModel.Default, Features.empty) /** [[monix.execution.Scheduler Scheduler]] builder that converts a * Java `ExecutorService` into a scheduler. @@ -144,7 +142,7 @@ private[execution] class SchedulerCompanionImpl extends SchedulerCompanion { * @param executionModel $executionModel */ def apply(executor: ExecutorService, executionModel: ExecModel): SchedulerService = - ExecutorScheduler(executor, UncaughtExceptionReporter.default, executionModel, Features.empty) + ExecutorScheduler.fromExecutorService(executor, UncaughtExceptionReporter.default, executionModel, Features.empty) /** [[monix.execution.Scheduler Scheduler]] builder - uses monix's * default `ScheduledExecutorService` for handling the scheduling of tasks. @@ -298,7 +296,7 @@ private[execution] class SchedulerCompanionImpl extends SchedulerCompanion { new SynchronousQueue[Runnable](false), threadFactory) - ExecutorScheduler(executor, reporter, executionModel, Features.empty) + ExecutorScheduler.fromExecutorService(executor, reporter, executionModel, Features.empty) } /** Builds a [[Scheduler]] backed by an internal @@ -340,7 +338,7 @@ private[execution] class SchedulerCompanionImpl extends SchedulerCompanion { new SynchronousQueue[Runnable](false), threadFactory) - ExecutorScheduler(executor, reporter, executionModel, Features.empty) + ExecutorScheduler.fromExecutorService(executor, reporter, executionModel, Features.empty) } /** Builds a [[monix.execution.Scheduler Scheduler]] that schedules and executes tasks on its own thread. @@ -365,12 +363,9 @@ private[execution] class SchedulerCompanionImpl extends SchedulerCompanion { executionModel: ExecModel = ExecModel.Default): SchedulerService = { val factory = ThreadFactoryBuilder(name, reporter, daemonic) - val executor = new AdaptedThreadPoolExecutor(1, factory) { - override def reportFailure(t: Throwable): Unit = - reporter.reportFailure(t) - } + val executor = new AdaptedScheduledThreadPoolExecutor(1, factory, reporter) - ExecutorScheduler(executor, null, executionModel, Features.empty) + ExecutorScheduler.scheduledThreadPool(executor, executionModel, Features.empty) } /** Builds a [[monix.execution.Scheduler Scheduler]] with a fixed thread-pool. @@ -394,12 +389,9 @@ private[execution] class SchedulerCompanionImpl extends SchedulerCompanion { executionModel: ExecModel = ExecModel.Default): SchedulerService = { val factory = ThreadFactoryBuilder(name, reporter, daemonic) - val executor = new AdaptedThreadPoolExecutor(poolSize, factory) { - override def reportFailure(t: Throwable): Unit = - reporter.reportFailure(t) - } + val executor = new AdaptedScheduledThreadPoolExecutor(poolSize, factory, reporter) - ExecutorScheduler(executor, null, executionModel, Features.empty) + ExecutorScheduler.scheduledThreadPool(executor, executionModel, Features.empty) } /** The explicit global `Scheduler`. Invoke `global` when you want diff --git a/monix-execution/jvm/src/test/scala/monix/execution/FeaturesJVMSuite.scala b/monix-execution/jvm/src/test/scala/monix/execution/FeaturesJVMSuite.scala index dec28e227..134f25617 100644 --- a/monix-execution/jvm/src/test/scala/monix/execution/FeaturesJVMSuite.scala +++ b/monix-execution/jvm/src/test/scala/monix/execution/FeaturesJVMSuite.scala @@ -70,7 +70,12 @@ object FeaturesJVMSuite extends SimpleTestSuite with Checkers { test("ExecutorScheduler(Executor") { val ref = { val ec = Executors.newSingleThreadExecutor() - ExecutorScheduler(ec, UncaughtExceptionReporter.default, ExecutionModel.Default, Features.empty) + ExecutorScheduler.fromExecutorService( + ec, + UncaughtExceptionReporter.default, + ExecutionModel.Default, + Features.empty + ) } try { assert(ref.features.contains(Scheduler.BATCHING)) @@ -83,7 +88,12 @@ object FeaturesJVMSuite extends SimpleTestSuite with Checkers { test("ExecutorScheduler(ScheduledExecutor") { val ref = { val ec = Executors.newSingleThreadScheduledExecutor() - ExecutorScheduler(ec, UncaughtExceptionReporter.default, ExecutionModel.Default, Features.empty) + ExecutorScheduler.fromExecutorService( + ec, + UncaughtExceptionReporter.default, + ExecutionModel.Default, + Features.empty + ) } try { assert(ref.features.contains(Scheduler.BATCHING)) @@ -96,7 +106,12 @@ object FeaturesJVMSuite extends SimpleTestSuite with Checkers { test("ExecutorScheduler(ScheduledExecutor)") { val ref = { val ec = Executors.newSingleThreadScheduledExecutor() - ExecutorScheduler(ec, UncaughtExceptionReporter.default, ExecutionModel.Default, Features.empty) + ExecutorScheduler.fromExecutorService( + ec, + UncaughtExceptionReporter.default, + ExecutionModel.Default, + Features.empty + ) } try { assert(ref.features.contains(Scheduler.BATCHING)) diff --git a/monix-execution/jvm/src/test/scala/monix/execution/schedulers/ExecutorSchedulerSuite.scala b/monix-execution/jvm/src/test/scala/monix/execution/schedulers/ExecutorSchedulerSuite.scala index 56280681a..318f9f81a 100644 --- a/monix-execution/jvm/src/test/scala/monix/execution/schedulers/ExecutorSchedulerSuite.scala +++ b/monix-execution/jvm/src/test/scala/monix/execution/schedulers/ExecutorSchedulerSuite.scala @@ -17,16 +17,16 @@ package monix.execution.schedulers -import java.util.concurrent.{CountDownLatch, TimeUnit, TimeoutException} - import minitest.TestSuite import monix.execution.ExecutionModel.{AlwaysAsyncExecution, Default => DefaultExecutionModel} import monix.execution.cancelables.SingleAssignCancelable import monix.execution.exceptions.DummyException +import monix.execution.schedulers.ExecutorSchedulerSuite.TestException import monix.execution.{Cancelable, Scheduler, UncaughtExceptionReporter} +import java.util.concurrent.{CountDownLatch, Executors, TimeUnit, TimeoutException} import scala.concurrent.duration._ -import scala.concurrent.{blocking, Await, Promise} +import scala.concurrent.{Await, Promise, blocking} abstract class ExecutorSchedulerSuite extends TestSuite[SchedulerService] { self => var lastReportedFailure = null: Throwable @@ -101,7 +101,8 @@ abstract class ExecutorSchedulerSuite extends TestSuite[SchedulerService] { self } else if (value < 4) { value += 1 } - }) + } + ) assert(Await.result(p.future, 5.second) == 4) } @@ -124,7 +125,8 @@ abstract class ExecutorSchedulerSuite extends TestSuite[SchedulerService] { self } else if (value < 4) { value += 1 } - }) + } + ) assert(Await.result(p.future, 5.second) == 4) } @@ -151,55 +153,66 @@ abstract class ExecutorSchedulerSuite extends TestSuite[SchedulerService] { self } test("reports errors on execute") { scheduler => - val latch = new CountDownLatch(1) - self.synchronized { - lastReportedFailure = null - lastReportedFailureLatch = latch - } + val latch = setupReporterLatch() try { - val ex = DummyException("dummy") - - scheduler.execute(new Runnable { - override def run() = - throw ex - }) + scheduler.execute(() => throw TestException) - assert(latch.await(15, TimeUnit.MINUTES), "lastReportedFailureLatch.await") - self.synchronized(assertEquals(lastReportedFailure, ex)) + assertTestExceptionCaught(latch) } finally { - self.synchronized { - lastReportedFailure = null - lastReportedFailureLatch = null - } + clearReporterLatch() } } test("reports errors on scheduleOnce") { scheduler => + testScheduledErrorReporting( + scheduleFailure = () => scheduler.scheduleOnce(1.milli)(throw TestException) + ) + } + + test("reports errors on scheduleAtFixedRate") { scheduler => + testScheduledErrorReporting( + scheduleFailure = () => scheduler.scheduleAtFixedRate(0.seconds, 1.second)(throw TestException) + ) + } + + test("reports errors on scheduleWithFixedDelay") { scheduler => + testScheduledErrorReporting( + scheduleFailure = () => scheduler.scheduleWithFixedDelay(0.seconds, 1.second)(throw TestException), + ) + } + + private def testScheduledErrorReporting(scheduleFailure: () => Cancelable): Unit = { + val latch = setupReporterLatch() + + val schedule = scheduleFailure() + + try { + assertTestExceptionCaught(latch) + } finally { + schedule.cancel() + clearReporterLatch() + } + } + + private def setupReporterLatch() = { val latch = new CountDownLatch(1) self.synchronized { lastReportedFailure = null lastReportedFailureLatch = latch } + latch + } - try { - val ex = DummyException("dummy") - - scheduler.scheduleOnce( - 1, - TimeUnit.MILLISECONDS, - new Runnable { - override def run() = - throw ex - }) - - assert(latch.await(15, TimeUnit.MINUTES), "lastReportedFailureLatch.await") - self.synchronized(assertEquals(lastReportedFailure, ex)) - } finally { - self.synchronized { - lastReportedFailure = null - lastReportedFailureLatch = null - } + private def assertTestExceptionCaught(latch: CountDownLatch): Unit = { + assert(latch.await(15, TimeUnit.MINUTES), "lastReportedFailureLatch.await") + self.synchronized(assertEquals(lastReportedFailure, TestException)) + } + + private def clearReporterLatch(): Unit = { + self.synchronized { + lastReportedFailure = null + lastReportedFailureLatch = null } } @@ -207,6 +220,10 @@ abstract class ExecutorSchedulerSuite extends TestSuite[SchedulerService] { self new Runnable { def run() = f } } +object ExecutorSchedulerSuite { + private val TestException = DummyException("dummy") +} + object ComputationSchedulerSuite extends ExecutorSchedulerSuite { def setup(): SchedulerService = monix.execution.Scheduler @@ -256,3 +273,8 @@ object IOSchedulerSuite extends ExecutorSchedulerSuite { def setup(): SchedulerService = monix.execution.Scheduler.io("monix-tests-io", reporter = testsReporter) } + +object ScheduledExecutorSuite extends ExecutorSchedulerSuite { + def setup(): SchedulerService = + monix.execution.Scheduler(Executors.newSingleThreadScheduledExecutor(), reporter = testsReporter) +} diff --git a/monix-execution/jvm/src/test/scala/monix/execution/schedulers/ScheduledExecutorToSchedulerSuite.scala b/monix-execution/jvm/src/test/scala/monix/execution/schedulers/ScheduledExecutorToSchedulerSuite.scala index c8e549f41..74f0de54f 100644 --- a/monix-execution/jvm/src/test/scala/monix/execution/schedulers/ScheduledExecutorToSchedulerSuite.scala +++ b/monix-execution/jvm/src/test/scala/monix/execution/schedulers/ScheduledExecutorToSchedulerSuite.scala @@ -41,7 +41,7 @@ object ScheduledExecutorToSchedulerSuite extends TestSuite[ExecutorScheduler] { daemonic = true )) - ExecutorScheduler(executor, reporter, ExecModel.Default, Features.empty) + ExecutorScheduler.fromExecutorService(executor, reporter, ExecModel.Default, Features.empty) } override def tearDown(scheduler: ExecutorScheduler): Unit = { diff --git a/monix-execution/shared/src/main/scala/monix/execution/schedulers/ReferenceScheduler.scala b/monix-execution/shared/src/main/scala/monix/execution/schedulers/ReferenceScheduler.scala index 12041af74..147f10aed 100644 --- a/monix-execution/shared/src/main/scala/monix/execution/schedulers/ReferenceScheduler.scala +++ b/monix-execution/shared/src/main/scala/monix/execution/schedulers/ReferenceScheduler.scala @@ -119,11 +119,11 @@ object ReferenceScheduler { override def reportFailure(t: Throwable): Unit = reporterRef.reportFailure(t) override def scheduleOnce(initialDelay: Long, unit: TimeUnit, r: Runnable): Cancelable = - s.scheduleOnce(initialDelay, unit, r) + s.scheduleOnce(initialDelay, unit, InterceptRunnable(r, reporter)) override def scheduleWithFixedDelay(initialDelay: Long, delay: Long, unit: TimeUnit, r: Runnable): Cancelable = - s.scheduleWithFixedDelay(initialDelay, delay, unit, r) + s.scheduleWithFixedDelay(initialDelay, delay, unit, InterceptRunnable(r, reporter)) override def scheduleAtFixedRate(initialDelay: Long, period: Long, unit: TimeUnit, r: Runnable): Cancelable = - s.scheduleAtFixedRate(initialDelay, period, unit, r) + s.scheduleAtFixedRate(initialDelay, period, unit, InterceptRunnable(r, reporter)) override def clockRealTime(unit: TimeUnit): Long = s.clockRealTime(unit) override def clockMonotonic(unit: TimeUnit): Long = diff --git a/monix-execution/shared/src/test/scala/monix/execution/schedulers/UncaughtExceptionReporterSuite.scala b/monix-execution/shared/src/test/scala/monix/execution/schedulers/UncaughtExceptionReporterSuite.scala index 5def437c8..4b5498da0 100644 --- a/monix-execution/shared/src/test/scala/monix/execution/schedulers/UncaughtExceptionReporterSuite.scala +++ b/monix-execution/shared/src/test/scala/monix/execution/schedulers/UncaughtExceptionReporterSuite.scala @@ -17,11 +17,12 @@ package monix.execution.schedulers -import scala.concurrent.{ExecutionContext, Promise} -import scala.concurrent.duration._ import minitest.TestSuite import monix.execution.{ExecutionModel, FutureUtils, Scheduler, UncaughtExceptionReporter} +import scala.concurrent.duration._ +import scala.concurrent.{ExecutionContext, Promise} + class UncaughtExceptionReporterBaseSuite extends TestSuite[Promise[Throwable]] { protected val immediateEC = TrampolineExecutionContext.immediate @@ -41,13 +42,42 @@ class UncaughtExceptionReporterBaseSuite extends TestSuite[Promise[Throwable]] { def testReports(name: String)(f: UncaughtExceptionReporter => Scheduler) = { testAsync(name) { p => f(reporter(p)).execute(throwRunnable) - FutureUtils.timeout(p.future.collect { case Dummy => }(immediateEC), 500.millis)(Scheduler.global) + assertExceptionCaught(p) } testAsync(name + ".withUncaughtExceptionReporter") { p => f(UncaughtExceptionReporter.default).withUncaughtExceptionReporter(reporter(p)).execute(throwRunnable) - FutureUtils.timeout(p.future.collect { case Dummy => }(immediateEC), 500.millis)(Scheduler.global) + assertExceptionCaught(p) + } + + testAsync(name + ".withUncaughtExceptionReporter + scheduleOnce") { p => + f(UncaughtExceptionReporter.default) + .withUncaughtExceptionReporter(reporter(p)) + .scheduleOnce(1.milli)(throwRunnable.run()) + assertExceptionCaught(p) } + + testAsync(name + ".withUncaughtExceptionReporter + scheduleAtFixedRate") { p => + val schedule = f(UncaughtExceptionReporter.default) + .withUncaughtExceptionReporter(reporter(p)) + .scheduleAtFixedRate(0.millis, 1.second)(throwRunnable.run()) + val result = assertExceptionCaught(p) + result.onComplete(_ => schedule.cancel())(immediateEC) + result + } + + testAsync(name + ".withUncaughtExceptionReporter + scheduleWithFixedDelay") { p => + val schedule = f(UncaughtExceptionReporter.default) + .withUncaughtExceptionReporter(reporter(p)) + .scheduleWithFixedDelay(0.millis, 1.second)(throwRunnable.run()) + val result = assertExceptionCaught(p) + result.onComplete(_ => schedule.cancel())(immediateEC) + result + } + } + + private def assertExceptionCaught(p: Promise[Throwable]) = { + FutureUtils.timeout(p.future.collect { case Dummy => }(immediateEC), 500.millis)(Scheduler.global) } } diff --git a/project/MimaFilters.scala b/project/MimaFilters.scala index 48a1ae9f5..0c99819a4 100644 --- a/project/MimaFilters.scala +++ b/project/MimaFilters.scala @@ -105,6 +105,8 @@ object MimaFilters { // TrampolineExecutionContext signature tweaks (internal API) exclude[IncompatibleMethTypeProblem]("monix.execution.schedulers.TrampolineExecutionContext#JVMNormalTrampoline.startLoop"), exclude[IncompatibleMethTypeProblem]("monix.execution.schedulers.TrampolineExecutionContext#JVMOptimalTrampoline.startLoop"), - exclude[IncompatibleMethTypeProblem]("monix.execution.schedulers.TrampolineExecutionContext.this") + exclude[IncompatibleMethTypeProblem]("monix.execution.schedulers.TrampolineExecutionContext.this"), + exclude[MissingClassProblem]("monix.execution.schedulers.AdaptedThreadPoolExecutor"), + exclude[MissingClassProblem]("monix.execution.schedulers.ExecutorScheduler$FromScheduledExecutor"), ) }