Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,18 @@

package monix.execution.schedulers

import monix.execution.UncaughtExceptionReporter

import java.util.concurrent._

/** A mixin for adapting for the Java `ThreadPoolExecutor` implementation
* to report errors using the default thread exception handler.
*/
private[schedulers] abstract class AdaptedThreadPoolExecutor(corePoolSize: Int, factory: ThreadFactory)
extends ScheduledThreadPoolExecutor(corePoolSize, factory) {
def reportFailure(t: Throwable): Unit
private[schedulers] final class AdaptedScheduledThreadPoolExecutor(
corePoolSize: Int,
factory: ThreadFactory,
reporter: UncaughtExceptionReporter,
) extends ScheduledThreadPoolExecutor(corePoolSize, factory) {

override def afterExecute(r: Runnable, t: Throwable): Unit = {
super.afterExecute(r, t)
Expand All @@ -45,6 +49,9 @@ private[schedulers] abstract class AdaptedThreadPoolExecutor(corePoolSize: Int,
}
}

if (exception ne null) reportFailure(exception)
if (exception ne null) reporter.reportFailure(exception)
}

def withUncaughtExceptionReporter(reporter: UncaughtExceptionReporter): AdaptedScheduledThreadPoolExecutor =
new AdaptedScheduledThreadPoolExecutor(corePoolSize, factory, reporter)
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,31 @@

package monix.execution.schedulers

import java.util.concurrent.{ExecutorService, ForkJoinPool, ScheduledExecutorService}
import monix.execution.internal.forkJoin.{AdaptedForkJoinPool, DynamicWorkerThreadFactory, StandardWorkerThreadFactory}
import monix.execution.internal.{InterceptRunnable, Platform, ScheduledExecutors}
import monix.execution.{Cancelable, UncaughtExceptionReporter}
import monix.execution.{Features, Scheduler}
import monix.execution.{Cancelable, Features, Scheduler, UncaughtExceptionReporter}

import java.util.concurrent.{ExecutorService, ScheduledExecutorService}
// Prevents conflict with the deprecated symbol
import monix.execution.{ExecutionModel => ExecModel}
import scala.concurrent.{ExecutionContext, Future, Promise, blocking}

import scala.concurrent.duration.TimeUnit
import scala.concurrent.{ExecutionContext, Future, Promise, blocking}
import scala.util.control.NonFatal

/** An [[ExecutorScheduler]] is a class for building a
* [[monix.execution.schedulers.SchedulerService SchedulerService]]
* out of a Java `ExecutorService`.
*/
abstract class ExecutorScheduler(e: ExecutorService, r: UncaughtExceptionReporter)
abstract class ExecutorScheduler(e: ExecutorService, r: UncaughtExceptionReporter, additionalFeatures: Features)
extends SchedulerService with ReferenceScheduler with BatchingScheduler {

private val allFeatures: Features = additionalFeatures + Scheduler.BATCHING

// for backwards compatibility
def this(e: ExecutorService, r: UncaughtExceptionReporter) =
this(e, r, Features.empty)

/** Returns the underlying `ExecutorService` reference. */
def executor: ExecutorService = e

Expand All @@ -54,9 +61,10 @@ abstract class ExecutorScheduler(e: ExecutorService, r: UncaughtExceptionReporte
awaitOn.execute(new Runnable {
override def run() =
try blocking {
p.success(e.awaitTermination(timeout, unit))
()
} catch {
p.success(e.awaitTermination(timeout, unit))
()
}
catch {
case ex if NonFatal(ex) =>
p.failure(ex); ()
}
Expand All @@ -75,13 +83,14 @@ abstract class ExecutorScheduler(e: ExecutorService, r: UncaughtExceptionReporte
throw new NotImplementedError("ExecutorService.withUncaughtExceptionReporter")
// $COVERAGE-ON$
}

override def features: Features = allFeatures
}

object ExecutorScheduler {
/** Builder for an [[ExecutorScheduler]], converting a
* Java `ScheduledExecutorService`.
/** Builder for an [[ExecutorScheduler]], converting a Java `ExecutorService`.
*
* @param service is the Java `ScheduledExecutorService` that will take
* @param service is the Java `ExecutorService` that will take
* care of scheduling and execution of all runnables.
* @param reporter is the [[UncaughtExceptionReporter]] that logs uncaught exceptions.
* @param executionModel is the preferred
Expand All @@ -91,22 +100,54 @@ object ExecutorScheduler {
* provided `ExecutorService` implements, see the documentation
* for [[monix.execution.Scheduler.features Scheduler.features]]
*/
@deprecated("Use ExecutorScheduler.fromExecutorService", "3.4.2-avs.6")
def apply(
service: ExecutorService,
reporter: UncaughtExceptionReporter,
executionModel: ExecModel,
features: Features): ExecutorScheduler = {

// Implementations will inherit BatchingScheduler, so this is guaranteed
val ft = features + Scheduler.BATCHING
features: Features,
): ExecutorScheduler =
service match {
case ref: ScheduledExecutorService =>
new FromScheduledExecutor(ref, reporter, executionModel, ft)
case _ =>
val s = Defaults.scheduledExecutor
new FromSimpleExecutor(s, service, reporter, executionModel, ft)
case ref: AdaptedScheduledThreadPoolExecutor => scheduledThreadPool(ref, executionModel, features)
case _ => fromExecutorService(service, reporter, executionModel, features)
}
}

/** Builder for an [[ExecutorScheduler]], converting a Java `ExecutorService`.
*
* @param service is the Java `ExecutorService` that will take
* care of scheduling and execution of all runnables.
* @param reporter is the [[UncaughtExceptionReporter]] that logs uncaught exceptions.
* @param executionModel is the preferred
* [[monix.execution.ExecutionModel ExecutionModel]], a guideline
* for run-loops and producers of data.
* @param features is the set of [[Features]] that the
* provided `ExecutorService` implements, see the documentation
* for [[monix.execution.Scheduler.features Scheduler.features]]
*/
def fromExecutorService(
service: ExecutorService,
reporter: UncaughtExceptionReporter,
executionModel: ExecModel,
features: Features,
): ExecutorScheduler =
new FromSimpleExecutor(
scheduler = Defaults.scheduledExecutor,
executor = service,
reporter = reporter,
executionModel = executionModel,
additionalFeatures = features,
)

private[schedulers] def scheduledThreadPool(
service: AdaptedScheduledThreadPoolExecutor,
executionModel: ExecModel,
features: Features,
): ExecutorScheduler =
new FromAdaptedThreadPoolExecutor(
executor = service,
executionModel = executionModel,
additionalFeatures = features,
)

/**
* DEPRECATED — provided for binary backwards compatibility.
Expand All @@ -117,9 +158,10 @@ object ExecutorScheduler {
def apply(
service: ExecutorService,
reporter: UncaughtExceptionReporter,
executionModel: ExecModel): ExecutorScheduler = {
executionModel: ExecModel
): ExecutorScheduler = {
// $COVERAGE-OFF$
apply(service, reporter, executionModel, Features.empty)
fromExecutorService(service, reporter, executionModel, Features.empty)
// $COVERAGE-ON$
}

Expand All @@ -131,7 +173,8 @@ object ExecutorScheduler {
parallelism: Int,
daemonic: Boolean,
reporter: UncaughtExceptionReporter,
executionModel: ExecModel): ExecutorScheduler = {
executionModel: ExecModel,
): ExecutorScheduler = {

val handler = reporter.asJava
val pool = new AdaptedForkJoinPool(
Expand All @@ -142,7 +185,7 @@ object ExecutorScheduler {
asyncMode = true
)

apply(pool, reporter, executionModel, Features.empty)
fromExecutorService(pool, reporter, executionModel, Features.empty)
}

/** Creates an [[ExecutorScheduler]] backed by a `ForkJoinPool`
Expand All @@ -154,7 +197,8 @@ object ExecutorScheduler {
maxThreads: Int,
daemonic: Boolean,
reporter: UncaughtExceptionReporter,
executionModel: ExecModel): ExecutorScheduler = {
executionModel: ExecModel,
): ExecutorScheduler = {

val exceptionHandler = reporter.asJava
val pool = new AdaptedForkJoinPool(
Expand All @@ -165,7 +209,7 @@ object ExecutorScheduler {
asyncMode = true
)

apply(pool, reporter, executionModel, Features.empty)
fromExecutorService(pool, reporter, executionModel, Features.empty)
}

/** Converts a Java `ExecutorService`.
Expand All @@ -178,17 +222,18 @@ object ExecutorScheduler {
private final class FromSimpleExecutor(
scheduler: ScheduledExecutorService,
executor: ExecutorService,
r: UncaughtExceptionReporter,
reporter: UncaughtExceptionReporter,
override val executionModel: ExecModel,
override val features: Features)
extends ExecutorScheduler(executor, r) {
additionalFeatures: Features,
) extends ExecutorScheduler(executor, reporter, additionalFeatures) {

@deprecated("Provided for backwards compatibility", "3.0.0")
def this(
scheduler: ScheduledExecutorService,
executor: ExecutorService,
r: UncaughtExceptionReporter,
executionModel: ExecModel) = {
executionModel: ExecModel,
) = {
// $COVERAGE-OFF$
this(scheduler, executor, r, executionModel, Features.empty)
// $COVERAGE-ON$
Expand All @@ -198,53 +243,47 @@ object ExecutorScheduler {
ScheduledExecutors.scheduleOnce(this, scheduler)(initialDelay, unit, r)

override def withExecutionModel(em: ExecModel): SchedulerService =
new FromSimpleExecutor(scheduler, executor, r, em, features)
new FromSimpleExecutor(scheduler, executor, reporter, em, additionalFeatures)

override def withUncaughtExceptionReporter(r: UncaughtExceptionReporter): SchedulerService =
new FromSimpleExecutor(scheduler, executor, r, executionModel, features)
new FromSimpleExecutor(scheduler, executor, r, executionModel, additionalFeatures)
}

/** Converts a Java `ScheduledExecutorService`. */
private final class FromScheduledExecutor(
s: ScheduledExecutorService,
r: UncaughtExceptionReporter,
/** Implementation of ExecutorScheduler backed by Java `ScheduledExecutorService`. Assumes error reporting is done by
* the underlying `ScheduledExecutorService`.
*
* Currently intended for use only with AdaptedThreadPoolExecutor.
*/
private final class FromAdaptedThreadPoolExecutor(
executor: AdaptedScheduledThreadPoolExecutor,
override val executionModel: ExecModel,
override val features: Features)
extends ExecutorScheduler(s, r) {

@deprecated("Provided for backwards compatibility", "3.0.0")
def this(scheduler: ScheduledExecutorService, r: UncaughtExceptionReporter, executionModel: ExecModel) = {
// $COVERAGE-OFF$
this(scheduler, r, executionModel, Features.empty)
// $COVERAGE-ON$
}

override def executor: ScheduledExecutorService = s
additionalFeatures: Features,
) extends ExecutorScheduler(executor, null, additionalFeatures) {

def scheduleOnce(initialDelay: Long, unit: TimeUnit, r: Runnable): Cancelable = {
if (initialDelay <= 0) {
execute(r)
Cancelable.empty
} else {
val task = s.schedule(r, initialDelay, unit)
val task = executor.schedule(r, initialDelay, unit)
Cancelable(() => { task.cancel(true); () })
}
}

override def scheduleWithFixedDelay(initialDelay: Long, delay: Long, unit: TimeUnit, r: Runnable): Cancelable = {
val task = s.scheduleWithFixedDelay(r, initialDelay, delay, unit)
val task = executor.scheduleWithFixedDelay(r, initialDelay, delay, unit)
Cancelable(() => { task.cancel(false); () })
}

override def scheduleAtFixedRate(initialDelay: Long, period: Long, unit: TimeUnit, r: Runnable): Cancelable = {
val task = s.scheduleAtFixedRate(r, initialDelay, period, unit)
val task = executor.scheduleAtFixedRate(r, initialDelay, period, unit)
Cancelable(() => { task.cancel(false); () })
}

override def withExecutionModel(em: ExecModel): SchedulerService =
new FromScheduledExecutor(s, r, em, features)
new FromAdaptedThreadPoolExecutor(executor, em, additionalFeatures)

override def withUncaughtExceptionReporter(r: UncaughtExceptionReporter): SchedulerService =
new FromScheduledExecutor(s, r, executionModel, features)
new FromAdaptedThreadPoolExecutor(executor.withUncaughtExceptionReporter(r), executionModel, additionalFeatures)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ private[execution] class SchedulerCompanionImpl extends SchedulerCompanion {
* @param reporter $reporter
*/
def apply(executor: ExecutorService, reporter: UncaughtExceptionReporter): SchedulerService =
ExecutorScheduler(executor, reporter, ExecModel.Default, Features.empty)
ExecutorScheduler.fromExecutorService(executor, reporter, ExecModel.Default, Features.empty)

/** [[monix.execution.Scheduler Scheduler]] builder that converts a
* Java `ExecutorService` into a scheduler.
Expand All @@ -124,18 +124,16 @@ private[execution] class SchedulerCompanionImpl extends SchedulerCompanion {
def apply(
executor: ExecutorService,
reporter: UncaughtExceptionReporter,
executionModel: ExecModel): SchedulerService = {

ExecutorScheduler(executor, reporter, executionModel, Features.empty)
}
executionModel: ExecModel): SchedulerService =
ExecutorScheduler.fromExecutorService(executor, reporter, executionModel, Features.empty)

/** [[monix.execution.Scheduler Scheduler]] builder that converts a
* Java `ExecutorService` into a scheduler.
*
* @param executor $executorService
*/
def apply(executor: ExecutorService): SchedulerService =
ExecutorScheduler(executor, UncaughtExceptionReporter.default, ExecModel.Default, Features.empty)
ExecutorScheduler.fromExecutorService(executor, UncaughtExceptionReporter.default, ExecModel.Default, Features.empty)

/** [[monix.execution.Scheduler Scheduler]] builder that converts a
* Java `ExecutorService` into a scheduler.
Expand All @@ -144,7 +142,7 @@ private[execution] class SchedulerCompanionImpl extends SchedulerCompanion {
* @param executionModel $executionModel
*/
def apply(executor: ExecutorService, executionModel: ExecModel): SchedulerService =
ExecutorScheduler(executor, UncaughtExceptionReporter.default, executionModel, Features.empty)
ExecutorScheduler.fromExecutorService(executor, UncaughtExceptionReporter.default, executionModel, Features.empty)

/** [[monix.execution.Scheduler Scheduler]] builder - uses monix's
* default `ScheduledExecutorService` for handling the scheduling of tasks.
Expand Down Expand Up @@ -298,7 +296,7 @@ private[execution] class SchedulerCompanionImpl extends SchedulerCompanion {
new SynchronousQueue[Runnable](false),
threadFactory)

ExecutorScheduler(executor, reporter, executionModel, Features.empty)
ExecutorScheduler.fromExecutorService(executor, reporter, executionModel, Features.empty)
}

/** Builds a [[Scheduler]] backed by an internal
Expand Down Expand Up @@ -340,7 +338,7 @@ private[execution] class SchedulerCompanionImpl extends SchedulerCompanion {
new SynchronousQueue[Runnable](false),
threadFactory)

ExecutorScheduler(executor, reporter, executionModel, Features.empty)
ExecutorScheduler.fromExecutorService(executor, reporter, executionModel, Features.empty)
}

/** Builds a [[monix.execution.Scheduler Scheduler]] that schedules and executes tasks on its own thread.
Expand All @@ -365,12 +363,9 @@ private[execution] class SchedulerCompanionImpl extends SchedulerCompanion {
executionModel: ExecModel = ExecModel.Default): SchedulerService = {

val factory = ThreadFactoryBuilder(name, reporter, daemonic)
val executor = new AdaptedThreadPoolExecutor(1, factory) {
override def reportFailure(t: Throwable): Unit =
reporter.reportFailure(t)
}
val executor = new AdaptedScheduledThreadPoolExecutor(1, factory, reporter)

ExecutorScheduler(executor, null, executionModel, Features.empty)
ExecutorScheduler.scheduledThreadPool(executor, executionModel, Features.empty)
}

/** Builds a [[monix.execution.Scheduler Scheduler]] with a fixed thread-pool.
Expand All @@ -394,12 +389,9 @@ private[execution] class SchedulerCompanionImpl extends SchedulerCompanion {
executionModel: ExecModel = ExecModel.Default): SchedulerService = {

val factory = ThreadFactoryBuilder(name, reporter, daemonic)
val executor = new AdaptedThreadPoolExecutor(poolSize, factory) {
override def reportFailure(t: Throwable): Unit =
reporter.reportFailure(t)
}
val executor = new AdaptedScheduledThreadPoolExecutor(poolSize, factory, reporter)

ExecutorScheduler(executor, null, executionModel, Features.empty)
ExecutorScheduler.scheduledThreadPool(executor, executionModel, Features.empty)
}

/** The explicit global `Scheduler`. Invoke `global` when you want
Expand Down
Loading
Loading