I recently found a super nice example that shows how Kotlin Coroutines work. It provides the bare minimum of code but still shows “all” the building blocks necessary to understand the fundamentals. At work, I took the opportunity to discuss it with my team, and the response was overwhelming, even for me. It was a nice journey because I could also strengthen my knowledge about Kotlin Coroutines.
In this blog, I want to share with you what I did with my colleagues. Let’s dive in by showing you the “almost identical to original” code:
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.onEach

class UpdateDiskAfterSomeDependencyCollectUpdater(
    private val someDependency: SomeDependency,
    private val diskUpdater: DiskUpdater
) {
    // Suspends for as long as the flow is being collected.
    suspend fun update() {
        someDependency.aflow
            .onEach { diskUpdater.writeToDisk(it) }
            .collect()
    }
}

interface SomeDependency {
    val aflow: Flow<String>
}

interface DiskUpdater {
    fun writeToDisk(textToWrite: String)
}
There is only one class that we’re concerned with, and update() is the only function in it. Let me quickly explain what it does:
- It takes the Flow from SomeDependency
- Once this Flow emits, we call DiskUpdater.writeToDisk with the string provided by the Flow
- We start collecting the Flow by calling collect() on it
To test this class, we wrote a test in the following manner:
private val someDependencyUpdate = MutableSharedFlow<String>()

private val fakeSomeDependency = object : SomeDependency {
    override val aflow: Flow<String> = someDependencyUpdate
}

// Kept private so that writeToDiskCalled stays visible on the anonymous object type.
private val fakeDiskUpdater = object : DiskUpdater {
    var writeToDiskCalled = false
    override fun writeToDisk(textToWrite: String) {
        writeToDiskCalled = true
    }
}

@Test
fun `after emit of some dependency diskUpdater should write to disk`() = runTest {
    val updater = UpdateDiskAfterSomeDependencyCollectUpdater(
        someDependency = fakeSomeDependency,
        diskUpdater = fakeDiskUpdater
    )
    updater.update()
    someDependencyUpdate.emit("Saves this please")
    assert(fakeDiskUpdater.writeToDiskCalled)
}
We create a MutableSharedFlow as a fake for SomeDependency.aflow, fake both dependencies, create an instance of the Updater class, call update() on it, send a value to the MutableSharedFlow, and finally assert() that DiskUpdater.writeToDisk was called.
If you have already figured out what the problem with this test is, feel free to leave. If you don’t, or if you’re just curious, read on.
This test is stuck forever
Why is that? Let us look at what we are doing in our test. In addition to creating all that fake noise, we call runTest{} (which is the recommended way to run a test with coroutines), and later call the suspend function update().
Is this already the problem? Yes, it is!
What does suspend actually mean?
It means that the function can be paused and resumed at a later time.
Okay, nice phrase. But what exactly does this mean?
To me (and in “normal words”) it means this:
I will not execute the following code until I’m done with my work. While I’m doing this (my work), I will not block the thread so it can do other things in the meantime. As soon as I’m done with my work, I will inform the thread again so that it can resume at the point where I was called to continue executing the following code.
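To make that sentence a bit more tangible, here is a minimal sketch. It assumes kotlinx.coroutines is on the classpath and uses runBlocking (a single thread, like our test) instead of runTest; the function name suspendDemo and the log texts are made up for illustration. Coroutine A suspends in delay, and while it is paused the same thread runs coroutine B.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: records the order in which two coroutines run on one thread.
fun suspendDemo(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    val a = launch {
        log += "A: before delay"
        delay(100) // A is suspended here; the thread is free for other work
        log += "A: after delay"
    }
    val b = launch {
        log += "B: runs while A is suspended"
    }
    joinAll(a, b) // wait until both coroutines are done
    log
}
```

Calling suspendDemo() gives the entries in the order A (before delay), B, A (after delay): B is executed in the gap that the suspended A leaves open.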
With this in mind, we can explore our (stuck) test again.
We call the suspend function update(). This function calls another suspend function: collect(). Now, collect() says “I will not execute the following code until I’m done with my work”. What does “done with my work” mean in this context? It means that the Flow is either “done” (because it produces no more values), the coroutine is canceled, or the scope is canceled.
Look at the code above: none of these options apply. The Flow (which is actually a MutableSharedFlow) is not “done”, the coroutine is not canceled, and the scope (which is the TestScope provided by the runTest function) is not canceled. This is why our test is stuck.
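If you want to see the difference between a Flow that is “done” and one that is not, the following sketch may help. It assumes kotlinx.coroutines 1.6 or newer and uses runBlocking with a real timeout rather than runTest; the helper name completesWithin is hypothetical. It simply reports whether collect() returned before the timeout hit.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Hypothetical helper: true if collecting the flow finished within the timeout.
fun completesWithin(flow: Flow<String>, timeoutMillis: Long): Boolean = runBlocking {
    withTimeoutOrNull(timeoutMillis) {
        flow.collect { /* ignore the values */ }
        true // only reached when the flow is done
    } ?: false // the timeout canceled the collection
}

// A finite flow is done after its last value, a shared flow never is.
val finiteFlowFinishes = completesWithin(flowOf("a", "b"), 200)
val sharedFlowFinishes = completesWithin(MutableSharedFlow<String>(), 200)
```

Here finiteFlowFinishes is true and sharedFlowFinishes is false. In the sketch the timeout plays the role of “the coroutine is canceled”, which is exactly what is missing in our stuck test.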
The fix in words
Before we discuss potential solutions, let’s consider how to fix the problem. What do we need to do so that it doesn’t get stuck anymore?
As mentioned above, we have three options:
- Cancel “a” scope
- Cancel “a” coroutine
- “Finishing” the Flow
Canceling the scope is nonsense because this is our TestScope that we need to run the actual test. Since the Flow in the actual implementation is a never-ending Flow, I would suggest that we focus on canceling a coroutine. However, at the end of the blog, I will provide a solution for “finishing” the Flow.
To cancel a coroutine, we must call either job.cancel or scope.cancel. The latter will also cancel all of its child coroutines. However, in our case, both solutions are chasing our own tail. If we cancel one of them (the TestScope provided by the runTest function, or the job that is part of the TestScope), we cancel our whole test and therefore do not execute anything anymore, right?
What if we… create a new coroutine instead?
If we create a new coroutine and suspend that one instead of the coroutine provided by TestScope, would this work? Let’s find out…
The first attempt
To create a new coroutine, we can use one of the existing coroutine builders such as launch. The following code did not work, but it shows the direction we are heading:
val updater = UpdateDiskAfterSomeDependencyCollectUpdater(
    someDependency = fakeSomeDependency,
    diskUpdater = fakeDiskUpdater
)
launch {
    updater.update()
}
someDependencyUpdate.emit("Saves this please")
assert(fakeDiskUpdater.writeToDiskCalled)
Why does this not work? Because the new coroutine (the code inside launch) is executed too late. In fact, it is never executed, because the test fails earlier with an “Assertion failed” error.
Remember what I wrote above about suspending functions.
While I’m doing this (my work), I will not block the thread so it can do other things in the meantime.
Here, the code inside the runTest{} block is also a suspend function, but the thread (which is a single thread) never enters a so-called suspending point. Even emit() does not help: it is a suspend function, but with no subscriber on the MutableSharedFlow it returns immediately without suspending. Therefore this thread cannot “do other things in the meantime” (and thus cannot execute the updater.update() function). The test coroutine fails (with an Assertion failed error) before the thread has time to execute other coroutines.
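The “executed too late” part can be sketched in a few lines. This is only an illustration, assuming kotlinx.coroutines and runBlocking (single-threaded, like our test) instead of runTest; launchOrderDemo is a made-up name. The body of launch does not run until the surrounding coroutine reaches a suspending point, for which I use yield() here.

```kotlin
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical helper: was the launched body executed before / after a suspending point?
fun launchOrderDemo(): Pair<Boolean, Boolean> = runBlocking {
    var started = false
    launch { started = true }        // only scheduled, not executed yet
    val beforeSuspension = started   // still false: we never suspended
    yield()                          // suspending point: the thread runs the scheduled coroutine
    val afterSuspension = started    // now true
    beforeSuspension to afterSuspension
}
```

launchOrderDemo() returns (false, true). Our failing test is in the “before” situation when it reaches the assert.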
Thank God runTest{} has nice documentation that gives us some handy information about this case. It suggests using either job.join() or testScope.advanceUntilIdle() to suspend so that the new coroutine can be executed.
On the other hand, the first sentence in the KDoc of job.join() states:
Suspends the coroutine until this job is complete.
Complete? The newly created coroutine is a never-ending one. So, will it get stuck again? Yes, it will. testScope.advanceUntilIdle() does not save the test either: it only runs the tasks that are already scheduled, and the never-ending coroutine is still alive afterwards.
So we need another solution.
We need to wait until the coroutine has been executed, but not until it has been completed.
What about adding another suspend function that suspends the test coroutine so that the (single) thread gets the time to execute other (scheduled) coroutines, and then “jumps” back to execute the following code?
The second (failed) attempt
I heard that delay is a fantastic suspending function to wait for a certain amount of time and then resume executing the following code:
val updater = UpdateDiskAfterSomeDependencyCollectUpdater(
    someDependency = fakeSomeDependency,
    diskUpdater = fakeDiskUpdater
)
launch {
    updater.update()
}
delay(1)
someDependencyUpdate.emit("Saves this please")
assert(fakeDiskUpdater.writeToDiskCalled)
It works 🎉… well, at least the coroutine inside launch is executed.
But, the test is stuck again. Why so?
Before we continue, let us quickly examine what is happening here: how the thread “jumps” and why it got stuck again.
By default, code is executed from top to bottom. This happens here as well. But when it reaches the coroutine builder launch, the thread adds the code inside it to its “task list”. You can think of it like this: the thread has a list of tasks, and as soon as one task is done, it executes the next one, and so on. Execution continues, and we reach the delay, which is a suspend function and therefore a suspending point. As I wrote above about suspend functions, the thread can now do other things and will be informed as soon as the suspend function signals it. Instead of being idle, the thread looks in its task list and finds a task: the “call the updater.update() function” task. So it does that, and… finds another suspend function (the update() function itself), and thus another suspending point. Now the thread has time to do other things again. The delay informs the thread that it is done with delaying (suspending). So the thread jumps back to the delay and continues executing the code (calling someDependencyUpdate.emit and assert).
So, why are we stuck? Because the newly created coroutine is a child of the TestScope that runs the test coroutine. And that scope and that test coroutine can only finish when all of their child coroutines are canceled or finished. But remember, the Flow never finishes.
Wait, what? Did I just say that all child coroutines must be canceled? 💡
The final fix
The solution is to cancel the never-ending suspending coroutine as soon as we don’t need it anymore for our testing purposes:
val updater = UpdateDiskAfterSomeDependencyCollectUpdater(
    someDependency = fakeSomeDependency,
    diskUpdater = fakeDiskUpdater
)
val updateJob = launch {
    updater.update()
}
delay(1)
someDependencyUpdate.emit("Saves this please")
updateJob.cancel()
assert(fakeDiskUpdater.writeToDiskCalled)
Because we are explicitly canceling the coroutine, the TestScope and its test coroutine can finish as expected. The test is no longer stuck.
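The rule “a scope only finishes when all of its children are canceled or finished” can be tried out in isolation. The following is a rough sketch, assuming kotlinx.coroutines; it uses coroutineScope inside runBlocking as a stand-in for the TestScope, and scopeFinishes is a hypothetical name. The child never finishes on its own, so the scope only completes when we cancel it.

```kotlin
import kotlinx.coroutines.awaitCancellation
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Hypothetical helper: true if the scope finished within 200 ms.
fun scopeFinishes(cancelChild: Boolean): Boolean = runBlocking {
    withTimeoutOrNull(200) {
        coroutineScope<Unit> {
            // Stand-in for the never-ending collect(): suspends until it is canceled.
            val child = launch { awaitCancellation() }
            if (cancelChild) child.cancel()
        } // coroutineScope returns only after all of its children are done
        true
    } ?: false
}
```

scopeFinishes(cancelChild = false) returns false because the scope is still waiting for its child when the timeout fires, while scopeFinishes(cancelChild = true) returns true.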
Why did I choose this as a good example to show coroutines?
Because it shows what suspend functions are. They really only suspend or pause the current code and do not block the current thread. The thread can do other things in the meantime, for example executing other coroutines that can also call other suspend functions.
In this example, we can clearly see how the thread “jumps” from one point in the code to another, and then sometime later resumes (“jumps back”) to the starting point.
This is the reason why I think it is a nice example and wrote this blog.
Three more solutions — why not?
Below, I’ll show you three more solutions that don’t get stuck while executing the test. As you read, please keep in mind that I’m not recommending one over the other. They all have their advantages and disadvantages. One may be “better” or nicer to read than the other. But at the end of the day, you should decide which one fits your case best.
val updater = UpdateDiskAfterSomeDependencyCollectUpdater(
    someDependency = fakeSomeDependency,
    diskUpdater = fakeDiskUpdater
)
val job = launch(Dispatchers.Unconfined) {
    updater.update()
}
someDependencyUpdate.emit("Saves this please")
job.cancel()
assert(fakeDiskUpdater.writeToDiskCalled)
Here, we don’t need to call another suspend function (like the delay we used above). This is because the coroutine uses the Unconfined dispatcher. This means that the code is not added to the thread’s task list (and executed at a later time), but is executed immediately, up to its first suspending point.
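A small side-by-side comparison shows the difference. This is a sketch under the assumption that kotlinx.coroutines is available; it uses runBlocking instead of runTest, and unconfinedDemo is a made-up name. One coroutine is launched with the inherited dispatcher, the other with Dispatchers.Unconfined, and we check which body has already run without any suspending point in between.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: which launched body has already run right after launch returns?
fun unconfinedDemo(): Pair<Boolean, Boolean> = runBlocking {
    var defaultStarted = false
    var unconfinedStarted = false
    launch { defaultStarted = true }                            // goes to the task list
    launch(Dispatchers.Unconfined) { unconfinedStarted = true } // runs right away
    defaultStarted to unconfinedStarted
}
```

unconfinedDemo() returns (false, true): only the Unconfined coroutine was executed before the next line of the calling code.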
val updater = UpdateDiskAfterSomeDependencyCollectUpdater(
    someDependency = fakeSomeDependency,
    diskUpdater = fakeDiskUpdater
)
val updateJob = launch {
    updater.update()
}
val emitJob = launch {
    someDependencyUpdate.emit("Saves this please")
    updateJob.cancel()
}
emitJob.join()
assert(fakeDiskUpdater.writeToDiskCalled)
Want to make heavy use of coroutines? Use this code 🙂. The emitJob.join call will suspend the test coroutine, allowing the thread to work through its task list (which currently contains two tasks). First, it will execute the updater.update() coroutine. Since this is also a suspending function, the thread will execute its second task, calling someDependencyUpdate.emit and canceling the first coroutine. Since this (second) task has been completed (and the first one was canceled), it will jump back to the emitJob.join call and resume from there.
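To follow these jumps yourself, you could log each step. The sketch below is an assumption-laden stand-in for the test: it uses runBlocking instead of runTest, collects the MutableSharedFlow directly instead of going through the Updater class, and joinOrderDemo as well as the log texts are invented for this illustration.

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: records the order in which the three coroutines make progress.
fun joinOrderDemo(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    val updates = MutableSharedFlow<String>()
    val updateJob = launch {
        log += "collector subscribed"
        updates.collect { log += "collected: $it" } // never returns on its own
    }
    val emitJob = launch {
        log += "emitting"
        updates.emit("Saves this please")
        updateJob.cancel() // the collector is no longer needed
    }
    log += "test body suspends in join"
    emitJob.join() // suspending point: the thread works through the two tasks
    log += "test body resumed"
    log
}
```

The returned list starts with the test body suspending in join and ends with it being resumed; the collector and emitter entries sit in between, which is the “jumping” described above.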
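// Requires: kotlin.concurrent.thread, kotlin.coroutines.resume, kotlin.coroutines.suspendCoroutine
val updater = UpdateDiskAfterSomeDependencyCollectUpdater(
    someDependency = fakeSomeDependency,
    diskUpdater = fakeDiskUpdater
)
val updateJob = launch {
    updater.update()
}
// Suspend the test coroutine and resume it from a helper thread.
suspendCoroutine<Unit> { continuation ->
    thread {
        Thread.sleep(1)
        continuation.resume(Unit)
    }
}
someDependencyUpdate.emit("Saves this please")
updateJob.cancel()
assert(fakeDiskUpdater.writeToDiskCalled)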
This solution is basically the same as calling delay, but uses the most fundamental function, suspendCoroutine. It behaves like the delay solution, except that the wait happens in real time on a separate thread. The code inside suspendCoroutine is executed immediately and creates a new thread. The newly created thread sleeps for a millisecond and then resumes the test coroutine. Since “the work” is moved to another thread, the block returns and the test coroutine suspends here. This allows the thread to do other things (and execute the other coroutine).
The “finish the Flow” solution
As promised, the following shows a solution that finishes the Flow and does not deal with any child coroutines.
val someDependencyUpdate = MutableSharedFlow<String>(replay = 1)
val fakeSomeDependency = object : SomeDependency {
    override val aflow: Flow<String> = someDependencyUpdate.take(1)
}
val updater = UpdateDiskAfterSomeDependencyCollectUpdater(
    someDependency = fakeSomeDependency,
    diskUpdater = fakeDiskUpdater
)
someDependencyUpdate.emit("Saves this please")
updater.update()
assert(fakeDiskUpdater.writeToDiskCalled)
First, we add replay = 1 to the MutableSharedFlow. This means that any new collector will also receive the latest value of this SharedFlow. An alternative would be to use MutableStateFlow, which has the same behaviour in that case.
Second, we tell the Flow inside the SomeDependency that it should finish (actually, cancel is the correct wording for it) as soon as it receives its first item.
With these changes, the update function will still be suspended until the Flow is done. But since it finishes immediately, it will resume shortly after.
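Stripped of the fakes, the two ingredients look like this. It is a minimal sketch, assuming kotlinx.coroutines 1.6 or newer and runBlocking instead of runTest; takeOneDemo is a hypothetical name. The value is emitted before anyone collects, is kept by replay = 1, and take(1) ends the collection after that single value.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.runBlocking

// Hypothetical helper: collects a shared flow that is limited to one value.
fun takeOneDemo(): List<String> = runBlocking {
    val updates = MutableSharedFlow<String>(replay = 1)
    val finite: Flow<String> = updates.take(1)
    // No subscriber yet: the value lands in the replay cache and emit returns at once.
    updates.emit("Saves this please")
    val received = mutableListOf<String>()
    // The collector gets the replayed value, then take(1) finishes the flow.
    finite.collect { received += it }
    received
}
```

takeOneDemo() returns a list with the single entry "Saves this please", and, more importantly, it returns at all: collect() is no longer never-ending.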
I have also published all the code I posted here to my GitHub account in that repository. Feel free to check it out:
I hope you learned a bit or at least strengthened your coroutine knowledge. Personally, I couldn’t resist writing this blog and sharing it with the world. As mentioned above, I think it shows everything you need to know about the fundamentals around Kotlin Coroutines, while having only a bare minimum of required code.