Skip to content

Commit 08da14a

Browse files
committed
- fix error reporting for Schedulers backed by ScheduledExecutorService
- fix updating UncaughtExceptionReporter for Scheduler backed by ScheduledThreadPoolExecutor - fix updating UncaughtExceptionReporter for WrappedSchedulers - fix error reporting for wrapped schedulers
1 parent 2a585b7 commit 08da14a

File tree

8 files changed

+226
-95
lines changed

8 files changed

+226
-95
lines changed

monix-execution/jvm/src/main/scala/monix/execution/schedulers/AdaptedThreadPoolExecutor.scala renamed to monix-execution/jvm/src/main/scala/monix/execution/schedulers/AdaptedScheduledThreadPoolExecutor.scala

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,18 @@
1717

1818
package monix.execution.schedulers
1919

20+
import monix.execution.UncaughtExceptionReporter
21+
2022
import java.util.concurrent._
2123

2224
/** A mixin for adapting for the Java `ThreadPoolExecutor` implementation
2325
* to report errors using the default thread exception handler.
2426
*/
25-
private[schedulers] abstract class AdaptedThreadPoolExecutor(corePoolSize: Int, factory: ThreadFactory)
26-
extends ScheduledThreadPoolExecutor(corePoolSize, factory) {
27-
def reportFailure(t: Throwable): Unit
27+
private[schedulers] final class AdaptedScheduledThreadPoolExecutor(
28+
corePoolSize: Int,
29+
factory: ThreadFactory,
30+
reporter: UncaughtExceptionReporter,
31+
) extends ScheduledThreadPoolExecutor(corePoolSize, factory) {
2832

2933
override def afterExecute(r: Runnable, t: Throwable): Unit = {
3034
super.afterExecute(r, t)
@@ -45,6 +49,9 @@ private[schedulers] abstract class AdaptedThreadPoolExecutor(corePoolSize: Int,
4549
}
4650
}
4751

48-
if (exception ne null) reportFailure(exception)
52+
if (exception ne null) reporter.reportFailure(exception)
4953
}
54+
55+
def withUncaughtExceptionReporter(reporter: UncaughtExceptionReporter): AdaptedScheduledThreadPoolExecutor =
56+
new AdaptedScheduledThreadPoolExecutor(corePoolSize, factory, reporter)
5057
}

monix-execution/jvm/src/main/scala/monix/execution/schedulers/ExecutorScheduler.scala

Lines changed: 86 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,16 @@
1717

1818
package monix.execution.schedulers
1919

20-
import java.util.concurrent.{ExecutorService, ForkJoinPool, ScheduledExecutorService}
2120
import monix.execution.internal.forkJoin.{AdaptedForkJoinPool, DynamicWorkerThreadFactory, StandardWorkerThreadFactory}
2221
import monix.execution.internal.{InterceptRunnable, Platform, ScheduledExecutors}
23-
import monix.execution.{Cancelable, UncaughtExceptionReporter}
24-
import monix.execution.{Features, Scheduler}
22+
import monix.execution.{Cancelable, Features, Scheduler, UncaughtExceptionReporter}
23+
24+
import java.util.concurrent.{ExecutorService, ScheduledExecutorService}
2525
// Prevents conflict with the deprecated symbol
2626
import monix.execution.{ExecutionModel => ExecModel}
27-
import scala.concurrent.{ExecutionContext, Future, Promise, blocking}
27+
2828
import scala.concurrent.duration.TimeUnit
29+
import scala.concurrent.{ExecutionContext, Future, Promise, blocking}
2930
import scala.util.control.NonFatal
3031

3132
/** An [[ExecutorScheduler]] is a class for building a
@@ -54,9 +55,10 @@ abstract class ExecutorScheduler(e: ExecutorService, r: UncaughtExceptionReporte
5455
awaitOn.execute(new Runnable {
5556
override def run() =
5657
try blocking {
57-
p.success(e.awaitTermination(timeout, unit))
58-
()
59-
} catch {
58+
p.success(e.awaitTermination(timeout, unit))
59+
()
60+
}
61+
catch {
6062
case ex if NonFatal(ex) =>
6163
p.failure(ex); ()
6264
}
@@ -78,10 +80,9 @@ abstract class ExecutorScheduler(e: ExecutorService, r: UncaughtExceptionReporte
7880
}
7981

8082
object ExecutorScheduler {
81-
/** Builder for an [[ExecutorScheduler]], converting a
82-
* Java `ScheduledExecutorService`.
83+
/** Builder for an [[ExecutorScheduler]], converting a Java `ExecutorService`.
8384
*
84-
* @param service is the Java `ScheduledExecutorService` that will take
85+
* @param service is the Java `ExecutorService` that will take
8586
* care of scheduling and execution of all runnables.
8687
* @param reporter is the [[UncaughtExceptionReporter]] that logs uncaught exceptions.
8788
* @param executionModel is the preferred
@@ -91,22 +92,58 @@ object ExecutorScheduler {
9192
* provided `ExecutorService` implements, see the documentation
9293
* for [[monix.execution.Scheduler.features Scheduler.features]]
9394
*/
95+
@deprecated("Use ExecutorScheduler.fromExecutorService", "3.4.2-avs.6")
9496
def apply(
9597
service: ExecutorService,
9698
reporter: UncaughtExceptionReporter,
9799
executionModel: ExecModel,
98-
features: Features): ExecutorScheduler = {
99-
100-
// Implementations will inherit BatchingScheduler, so this is guaranteed
101-
val ft = features + Scheduler.BATCHING
100+
features: Features,
101+
): ExecutorScheduler =
102102
service match {
103-
case ref: ScheduledExecutorService =>
104-
new FromScheduledExecutor(ref, reporter, executionModel, ft)
105-
case _ =>
106-
val s = Defaults.scheduledExecutor
107-
new FromSimpleExecutor(s, service, reporter, executionModel, ft)
103+
case ref: AdaptedScheduledThreadPoolExecutor => scheduledThreadPool(ref, executionModel, features)
104+
case _ => fromExecutorService(service, reporter, executionModel, features)
108105
}
109-
}
106+
107+
/** Builder for an [[ExecutorScheduler]], converting a Java `ExecutorService`.
108+
*
109+
* @param service is the Java `ExecutorService` that will take
110+
* care of scheduling and execution of all runnables.
111+
* @param reporter is the [[UncaughtExceptionReporter]] that logs uncaught exceptions.
112+
* @param executionModel is the preferred
113+
* [[monix.execution.ExecutionModel ExecutionModel]], a guideline
114+
* for run-loops and producers of data.
115+
* @param features is the set of [[Features]] that the
116+
* provided `ExecutorService` implements, see the documentation
117+
* for [[monix.execution.Scheduler.features Scheduler.features]]
118+
*/
119+
def fromExecutorService(
120+
service: ExecutorService,
121+
reporter: UncaughtExceptionReporter,
122+
executionModel: ExecModel,
123+
features: Features,
124+
): ExecutorScheduler =
125+
new FromSimpleExecutor(
126+
scheduler = Defaults.scheduledExecutor,
127+
executor = service,
128+
reporter = reporter,
129+
executionModel = executionModel,
130+
features = withBatching(features)
131+
)
132+
133+
private[schedulers] def scheduledThreadPool(
134+
service: AdaptedScheduledThreadPoolExecutor,
135+
executionModel: ExecModel,
136+
features: Features,
137+
): ExecutorScheduler =
138+
new FromAdaptedThreadPoolExecutor(
139+
executor = service,
140+
executionModel = executionModel,
141+
features = withBatching(features)
142+
)
143+
144+
private def withBatching(features: Features): Features =
145+
// Implementations will inherit BatchingScheduler, so this is guaranteed
146+
features + Scheduler.BATCHING
110147

111148
/**
112149
* DEPRECATED — provided for binary backwards compatibility.
@@ -117,9 +154,10 @@ object ExecutorScheduler {
117154
def apply(
118155
service: ExecutorService,
119156
reporter: UncaughtExceptionReporter,
120-
executionModel: ExecModel): ExecutorScheduler = {
157+
executionModel: ExecModel
158+
): ExecutorScheduler = {
121159
// $COVERAGE-OFF$
122-
apply(service, reporter, executionModel, Features.empty)
160+
fromExecutorService(service, reporter, executionModel, Features.empty)
123161
// $COVERAGE-ON$
124162
}
125163

@@ -131,7 +169,8 @@ object ExecutorScheduler {
131169
parallelism: Int,
132170
daemonic: Boolean,
133171
reporter: UncaughtExceptionReporter,
134-
executionModel: ExecModel): ExecutorScheduler = {
172+
executionModel: ExecModel,
173+
): ExecutorScheduler = {
135174

136175
val handler = reporter.asJava
137176
val pool = new AdaptedForkJoinPool(
@@ -142,7 +181,7 @@ object ExecutorScheduler {
142181
asyncMode = true
143182
)
144183

145-
apply(pool, reporter, executionModel, Features.empty)
184+
fromExecutorService(pool, reporter, executionModel, Features.empty)
146185
}
147186

148187
/** Creates an [[ExecutorScheduler]] backed by a `ForkJoinPool`
@@ -154,7 +193,8 @@ object ExecutorScheduler {
154193
maxThreads: Int,
155194
daemonic: Boolean,
156195
reporter: UncaughtExceptionReporter,
157-
executionModel: ExecModel): ExecutorScheduler = {
196+
executionModel: ExecModel,
197+
): ExecutorScheduler = {
158198

159199
val exceptionHandler = reporter.asJava
160200
val pool = new AdaptedForkJoinPool(
@@ -165,7 +205,7 @@ object ExecutorScheduler {
165205
asyncMode = true
166206
)
167207

168-
apply(pool, reporter, executionModel, Features.empty)
208+
fromExecutorService(pool, reporter, executionModel, Features.empty)
169209
}
170210

171211
/** Converts a Java `ExecutorService`.
@@ -178,17 +218,18 @@ object ExecutorScheduler {
178218
private final class FromSimpleExecutor(
179219
scheduler: ScheduledExecutorService,
180220
executor: ExecutorService,
181-
r: UncaughtExceptionReporter,
221+
reporter: UncaughtExceptionReporter,
182222
override val executionModel: ExecModel,
183-
override val features: Features)
184-
extends ExecutorScheduler(executor, r) {
223+
override val features: Features
224+
) extends ExecutorScheduler(executor, reporter) {
185225

186226
@deprecated("Provided for backwards compatibility", "3.0.0")
187227
def this(
188228
scheduler: ScheduledExecutorService,
189229
executor: ExecutorService,
190230
r: UncaughtExceptionReporter,
191-
executionModel: ExecModel) = {
231+
executionModel: ExecModel,
232+
) = {
192233
// $COVERAGE-OFF$
193234
this(scheduler, executor, r, executionModel, Features.empty)
194235
// $COVERAGE-ON$
@@ -198,53 +239,47 @@ object ExecutorScheduler {
198239
ScheduledExecutors.scheduleOnce(this, scheduler)(initialDelay, unit, r)
199240

200241
override def withExecutionModel(em: ExecModel): SchedulerService =
201-
new FromSimpleExecutor(scheduler, executor, r, em, features)
242+
new FromSimpleExecutor(scheduler, executor, reporter, em, features)
202243

203244
override def withUncaughtExceptionReporter(r: UncaughtExceptionReporter): SchedulerService =
204245
new FromSimpleExecutor(scheduler, executor, r, executionModel, features)
205246
}
206247

207-
/** Converts a Java `ScheduledExecutorService`. */
208-
private final class FromScheduledExecutor(
209-
s: ScheduledExecutorService,
210-
r: UncaughtExceptionReporter,
248+
/** Implementation of ExecutorScheduler backed by Java `ScheduledExecutorService`. Assumes error reporting is done by
249+
* the underlying `ScheduledExecutorService`.
250+
*
251+
* Currently intended for use only with AdaptedThreadPoolExecutor.
252+
*/
253+
private final class FromAdaptedThreadPoolExecutor(
254+
executor: AdaptedScheduledThreadPoolExecutor,
211255
override val executionModel: ExecModel,
212-
override val features: Features)
213-
extends ExecutorScheduler(s, r) {
214-
215-
@deprecated("Provided for backwards compatibility", "3.0.0")
216-
def this(scheduler: ScheduledExecutorService, r: UncaughtExceptionReporter, executionModel: ExecModel) = {
217-
// $COVERAGE-OFF$
218-
this(scheduler, r, executionModel, Features.empty)
219-
// $COVERAGE-ON$
220-
}
221-
222-
override def executor: ScheduledExecutorService = s
256+
override val features: Features
257+
) extends ExecutorScheduler(executor, null) {
223258

224259
def scheduleOnce(initialDelay: Long, unit: TimeUnit, r: Runnable): Cancelable = {
225260
if (initialDelay <= 0) {
226261
execute(r)
227262
Cancelable.empty
228263
} else {
229-
val task = s.schedule(r, initialDelay, unit)
264+
val task = executor.schedule(r, initialDelay, unit)
230265
Cancelable(() => { task.cancel(true); () })
231266
}
232267
}
233268

234269
override def scheduleWithFixedDelay(initialDelay: Long, delay: Long, unit: TimeUnit, r: Runnable): Cancelable = {
235-
val task = s.scheduleWithFixedDelay(r, initialDelay, delay, unit)
270+
val task = executor.scheduleWithFixedDelay(r, initialDelay, delay, unit)
236271
Cancelable(() => { task.cancel(false); () })
237272
}
238273

239274
override def scheduleAtFixedRate(initialDelay: Long, period: Long, unit: TimeUnit, r: Runnable): Cancelable = {
240-
val task = s.scheduleAtFixedRate(r, initialDelay, period, unit)
275+
val task = executor.scheduleAtFixedRate(r, initialDelay, period, unit)
241276
Cancelable(() => { task.cancel(false); () })
242277
}
243278

244279
override def withExecutionModel(em: ExecModel): SchedulerService =
245-
new FromScheduledExecutor(s, r, em, features)
280+
new FromAdaptedThreadPoolExecutor(executor, em, features)
246281

247282
override def withUncaughtExceptionReporter(r: UncaughtExceptionReporter): SchedulerService =
248-
new FromScheduledExecutor(s, r, executionModel, features)
283+
new FromAdaptedThreadPoolExecutor(executor.withUncaughtExceptionReporter(r), executionModel, features)
249284
}
250285
}

monix-execution/jvm/src/main/scala/monix/execution/schedulers/SchedulerCompanionImpl.scala

Lines changed: 11 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ private[execution] class SchedulerCompanionImpl extends SchedulerCompanion {
112112
* @param reporter $reporter
113113
*/
114114
def apply(executor: ExecutorService, reporter: UncaughtExceptionReporter): SchedulerService =
115-
ExecutorScheduler(executor, reporter, ExecModel.Default, Features.empty)
115+
ExecutorScheduler.fromExecutorService(executor, reporter, ExecModel.Default, Features.empty)
116116

117117
/** [[monix.execution.Scheduler Scheduler]] builder that converts a
118118
* Java `ExecutorService` into a scheduler.
@@ -124,18 +124,16 @@ private[execution] class SchedulerCompanionImpl extends SchedulerCompanion {
124124
def apply(
125125
executor: ExecutorService,
126126
reporter: UncaughtExceptionReporter,
127-
executionModel: ExecModel): SchedulerService = {
128-
129-
ExecutorScheduler(executor, reporter, executionModel, Features.empty)
130-
}
127+
executionModel: ExecModel): SchedulerService =
128+
ExecutorScheduler.fromExecutorService(executor, reporter, executionModel, Features.empty)
131129

132130
/** [[monix.execution.Scheduler Scheduler]] builder that converts a
133131
* Java `ExecutorService` into a scheduler.
134132
*
135133
* @param executor $executorService
136134
*/
137135
def apply(executor: ExecutorService): SchedulerService =
138-
ExecutorScheduler(executor, UncaughtExceptionReporter.default, ExecModel.Default, Features.empty)
136+
ExecutorScheduler.fromExecutorService(executor, UncaughtExceptionReporter.default, ExecModel.Default, Features.empty)
139137

140138
/** [[monix.execution.Scheduler Scheduler]] builder that converts a
141139
* Java `ExecutorService` into a scheduler.
@@ -144,7 +142,7 @@ private[execution] class SchedulerCompanionImpl extends SchedulerCompanion {
144142
* @param executionModel $executionModel
145143
*/
146144
def apply(executor: ExecutorService, executionModel: ExecModel): SchedulerService =
147-
ExecutorScheduler(executor, UncaughtExceptionReporter.default, executionModel, Features.empty)
145+
ExecutorScheduler.fromExecutorService(executor, UncaughtExceptionReporter.default, executionModel, Features.empty)
148146

149147
/** [[monix.execution.Scheduler Scheduler]] builder - uses monix's
150148
* default `ScheduledExecutorService` for handling the scheduling of tasks.
@@ -298,7 +296,7 @@ private[execution] class SchedulerCompanionImpl extends SchedulerCompanion {
298296
new SynchronousQueue[Runnable](false),
299297
threadFactory)
300298

301-
ExecutorScheduler(executor, reporter, executionModel, Features.empty)
299+
ExecutorScheduler.fromExecutorService(executor, reporter, executionModel, Features.empty)
302300
}
303301

304302
/** Builds a [[Scheduler]] backed by an internal
@@ -340,7 +338,7 @@ private[execution] class SchedulerCompanionImpl extends SchedulerCompanion {
340338
new SynchronousQueue[Runnable](false),
341339
threadFactory)
342340

343-
ExecutorScheduler(executor, reporter, executionModel, Features.empty)
341+
ExecutorScheduler.fromExecutorService(executor, reporter, executionModel, Features.empty)
344342
}
345343

346344
/** Builds a [[monix.execution.Scheduler Scheduler]] that schedules and executes tasks on its own thread.
@@ -365,12 +363,9 @@ private[execution] class SchedulerCompanionImpl extends SchedulerCompanion {
365363
executionModel: ExecModel = ExecModel.Default): SchedulerService = {
366364

367365
val factory = ThreadFactoryBuilder(name, reporter, daemonic)
368-
val executor = new AdaptedThreadPoolExecutor(1, factory) {
369-
override def reportFailure(t: Throwable): Unit =
370-
reporter.reportFailure(t)
371-
}
366+
val executor = new AdaptedScheduledThreadPoolExecutor(1, factory, reporter)
372367

373-
ExecutorScheduler(executor, null, executionModel, Features.empty)
368+
ExecutorScheduler.scheduledThreadPool(executor, executionModel, Features.empty)
374369
}
375370

376371
/** Builds a [[monix.execution.Scheduler Scheduler]] with a fixed thread-pool.
@@ -394,12 +389,9 @@ private[execution] class SchedulerCompanionImpl extends SchedulerCompanion {
394389
executionModel: ExecModel = ExecModel.Default): SchedulerService = {
395390

396391
val factory = ThreadFactoryBuilder(name, reporter, daemonic)
397-
val executor = new AdaptedThreadPoolExecutor(poolSize, factory) {
398-
override def reportFailure(t: Throwable): Unit =
399-
reporter.reportFailure(t)
400-
}
392+
val executor = new AdaptedScheduledThreadPoolExecutor(poolSize, factory, reporter)
401393

402-
ExecutorScheduler(executor, null, executionModel, Features.empty)
394+
ExecutorScheduler.scheduledThreadPool(executor, executionModel, Features.empty)
403395
}
404396

405397
/** The explicit global `Scheduler`. Invoke `global` when you want

0 commit comments

Comments
 (0)