11package kotlinx.coroutines.experimental.reactor
22
3- import kotlinx.coroutines.experimental.CancellableContinuation
4- import kotlinx.coroutines.experimental.CoroutineDispatcher
5- import kotlinx.coroutines.experimental.Delay
6- import kotlinx.coroutines.experimental.DisposableHandle
7- import kotlinx.coroutines.experimental.disposeOnCompletion
8- import reactor.core.Cancellation
3+ import kotlinx.coroutines.experimental.*
4+ import reactor.core.Disposable
95import reactor.core.scheduler.Scheduler
10- import reactor.core.scheduler.TimedScheduler
116import java.util.concurrent.TimeUnit
127import kotlin.coroutines.experimental.CoroutineContext
138
@@ -16,45 +11,31 @@ import kotlin.coroutines.experimental.CoroutineContext
1611 */
1712fun Scheduler.asCoroutineDispatcher () = SchedulerCoroutineDispatcher (this )
1813
19- /* *
20- * Converts an instance of [TimedScheduler] to an implementation of [CoroutineDispatcher]
21- * and provides native [delay][Delay.delay] support.
22- */
23- fun TimedScheduler.asCoroutineDispatcher () = TimedSchedulerCoroutineDispatcher (this )
24-
2514/* *
2615 * Implements [CoroutineDispatcher] on top of an arbitrary [Scheduler].
2716 * @param scheduler a scheduler.
2817 */
29- open class SchedulerCoroutineDispatcher (private val scheduler : Scheduler ) : CoroutineDispatcher() {
18+ open class SchedulerCoroutineDispatcher (private val scheduler : Scheduler ) : CoroutineDispatcher(), Delay {
3019 override fun dispatch (context : CoroutineContext , block : Runnable ) {
3120 scheduler.schedule(block)
3221 }
3322
34- override fun toString (): String = scheduler.toString()
35- override fun equals (other : Any? ): Boolean = other is SchedulerCoroutineDispatcher && other.scheduler == = scheduler
36- override fun hashCode (): Int = System .identityHashCode(scheduler)
37- }
38-
39- /* *
40- * Implements [CoroutineDispatcher] on top of an arbitrary [TimedScheduler].
41- * @param scheduler a timed scheduler.
42- */
43- open class TimedSchedulerCoroutineDispatcher (private val scheduler : TimedScheduler ) : SchedulerCoroutineDispatcher(scheduler), Delay {
44-
4523 override fun scheduleResumeAfterDelay (time : Long , unit : TimeUnit , continuation : CancellableContinuation <Unit >) {
4624 val disposable = scheduler.schedule({
4725 with (continuation) { resumeUndispatched(Unit ) }
4826 }, time, unit)
49-
5027 continuation.disposeOnCompletion(disposable.asDisposableHandle())
5128 }
5229
5330 override fun invokeOnTimeout (time : Long , unit : TimeUnit , block : Runnable ): DisposableHandle =
5431 scheduler.schedule(block, time, unit).asDisposableHandle()
32+
33+ override fun toString (): String = scheduler.toString()
34+ override fun equals (other : Any? ): Boolean = other is SchedulerCoroutineDispatcher && other.scheduler == = scheduler
35+ override fun hashCode (): Int = System .identityHashCode(scheduler)
5536}
5637
57- private fun Cancellation .asDisposableHandle (): DisposableHandle =
58- object : DisposableHandle {
59- override fun dispose () = this @asDisposableHandle.dispose()
60- }
38+ private fun Disposable .asDisposableHandle (): DisposableHandle =
39+ object : DisposableHandle {
40+ override fun dispose () = this @asDisposableHandle.dispose()
41+ }
0 commit comments