-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Open
Labels
Description
Possible issue with consuming the data from MutableSharedFlow.
Reproduction conditions:
Cancelling the scope in which data is produced and collected and then launching emission and consuming on the new scope, but on the same global instance of MutableSharedFlow. After one or several restarts emitted items are no longer delivered to the collector.
Below is the minimal example to reproduce the issue. However, reproduction of this issue is greatly affected by the work load performed (or time spent) in the collector.
Coroutines versions the issue is tried to be reproduced on: 1.10.2, 1.8.0
Environment: JVM, Android.
Note: correctness of counter values themselves is not important in that example.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
var counter = 0L
val mutableSharedFlow = MutableSharedFlow<Long>()
var cs: CoroutineScope? = null
fun start() {
cs?.cancel()
cs = CoroutineScope(Dispatchers.IO + SupervisorJob())
mutableSharedFlow.onEach {
delay(50)
println(it)
}.launchIn(cs!!)
cs!!.launch {
while (currentCoroutineContext().isActive) {
mutableSharedFlow.emit(counter)
counter++
}
}
}
suspend fun main() {
while (true) {
start()
println("restart, counter = $counter")
delay(3000)
}
}
Output below:
....
47
48
49
50
51
52
53
54
restart, counter = 56
restart, counter = 56
restart, counter = 56
restart, counter = 56
restart, counter = 56
... // proper behavior (emissions happening) is never restored after this point
kirillolenev-dm