My attempt at explaining Kotlin Coroutines

Stefan M.
9 min read · Feb 27, 2023


I recently found a super nice example that shows how Kotlin Coroutines work. It provides the bare minimum of code but still shows “all” the building blocks necessary to understand the fundamentals. At work, I took the opportunity to discuss it with my team, and the response was overwhelming, even for me. It was a nice journey because I could also strengthen my knowledge about Kotlin Coroutines.

In this blog, I want to share with you what I did with my colleagues. Let’s dive in by showing you the “almost identical to original” code:

import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.onEach

class UpdateDiskAfterSomeDependencyCollectUpdater(
    private val someDependency: SomeDependency,
    private val diskUpdater: DiskUpdater
) {
    // Suspends for as long as the Flow keeps emitting values.
    suspend fun update() {
        someDependency.aflow
            .onEach { diskUpdater.writeToDisk(it) }
            .collect()
    }
}

interface SomeDependency {
    val aflow: Flow<String>
}

interface DiskUpdater {
    fun writeToDisk(textToWrite: String)
}

There is only one class that we’re concerned with, and the update() function is also the only function that exists within this class. Let me quickly explain what it does:

  • It takes the Flow from SomeDependency
  • Once this Flow emits, we call DiskUpdater.writeToDisk with the string provided by the Flow
  • We start collecting on the Flow by calling collect() on it

To test this class, we wrote a test in the following manner:

import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.test.runTest

private val someDependencyUpdate = MutableSharedFlow<String>()

private val fakeSomeDependency = object : SomeDependency {
    override val aflow: Flow<String> = someDependencyUpdate
}

// Must be private: a non-private property would have the type DiskUpdater,
// and writeToDiskCalled would not be visible to the test.
private val fakeDiskUpdater = object : DiskUpdater {
    var writeToDiskCalled = false
    override fun writeToDisk(textToWrite: String) {
        writeToDiskCalled = true
    }
}

@Test
fun `after emit of some dependency diskUpdater should write to disk`() = runTest {
    val updater = UpdateDiskAfterSomeDependencyCollectUpdater(
        someDependency = fakeSomeDependency,
        diskUpdater = fakeDiskUpdater
    )

    updater.update()

    someDependencyUpdate.emit("Saves this please")

    assert(fakeDiskUpdater.writeToDiskCalled)
}

We create a MutableSharedFlow as a fake for SomeDependency.aflow, fake both dependencies, create an instance of the Updater class, call update() on it, emit a value to the MutableSharedFlow and finally assert() that the DiskUpdater.writeToDisk function was called.

If you have already figured out what the problem with this test is, feel free to leave. If you haven’t, or if you’re just curious, read on.

This test is stuck forever

Why is that? Let us look at what we are doing in our test. In addition to creating all that fake noise, we call runTest{} (which is the recommended way to run a test with coroutines), and later call the suspend function update().

Is this already the problem? Yes, it is!

What does suspend actually mean?
It means that the function can be paused and resumed at a later time.

Okay, nice phrase. But what exactly does this mean?
To me (and in “normal words”) it means this:

I will not execute the following code until I’m done with my work. While I’m doing this (my work), I will not block the thread so it can do other things in the meantime. As soon as I’m done with my work, I will inform the thread again so that it can resume at the point where I was called to continue executing the following code.
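
To make this a bit more tangible, here is a minimal sketch that uses only the Kotlin standard library (no kotlinx.coroutines). The names doWork, paused, events and runSuspendDemo are made up for this illustration. Resuming the coroutine by hand is an assumption of the sketch; normally a library such as kotlinx.coroutines does that for you.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Hypothetical helpers for this sketch: a log of what happened, in order,
// and a place to keep the paused coroutine so that it can be resumed later.
val events = mutableListOf<String>()
var paused: Continuation<Unit>? = null

suspend fun doWork() {
    events += "work started"
    // Suspension point: remember where we are and give the thread back.
    suspendCoroutine<Unit> { continuation -> paused = continuation }
    events += "work resumed"
}

fun runSuspendDemo(): List<String> {
    events.clear()
    val body: suspend () -> Unit = {
        doWork()
        events += "code after doWork()"
    }
    // Starts the coroutine on the current thread, without any dispatcher.
    body.startCoroutine(Continuation<Unit>(EmptyCoroutineContext) { })
    // We get here although doWork() is not done yet: the thread was not blocked.
    events += "thread does other things"
    // Resume the paused coroutine; it continues right after the suspension point.
    paused?.resume(Unit)
    return events.toList()
}

fun main() {
    // Prints: work started, thread does other things, work resumed, code after doWork()
    runSuspendDemo().forEach(::println)
}
```

The interesting part is the order: “thread does other things” is logged between “work started” and “work resumed”, and “code after doWork()” only runs once the work has been resumed.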

With this in mind, we can explore our (stuck) test again.
We call the suspend function update(). This function calls another suspend function — collect(). Now, collect() says “I will not execute the following code until I’m done with my work”. What does “done with my work” mean in this context? It means that the Flow is either “done” (because it produces no more values), the coroutine is canceled, or the scope is canceled.

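Look at the code above: none of these options applies. The Flow (which is actually a MutableSharedFlow) is not “done”, the coroutine is not canceled, and the scope (which is the TestScope provided by the runTest function) is not canceled. This is why our test is stuck. (Depending on the version of kotlinx-coroutines-test, runTest eventually gives up and fails the test after its timeout, but the effect is the same: the code after update() never runs.)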
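
If you want to see the difference between a Flow that is “done” and one that never is, the following sketch might help. It is not part of the original test: collectReturns is a made-up helper, and I use runBlocking with withTimeoutOrNull here (instead of runTest) simply to cancel the collecting coroutine after a short while.

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Returns true if collect() came back on its own,
// false if it was still suspended when the timeout canceled it.
fun collectReturns(useSharedFlow: Boolean, timeoutMillis: Long = 200): Boolean = runBlocking {
    val flow = if (useSharedFlow) MutableSharedFlow<String>() else flowOf("a", "b")
    // withTimeoutOrNull cancels the block and returns null once the time is up.
    withTimeoutOrNull(timeoutMillis) {
        flow.collect()
        true
    } ?: false
}

fun main() {
    println("flowOf finished: ${collectReturns(useSharedFlow = false)}")           // true
    println("MutableSharedFlow finished: ${collectReturns(useSharedFlow = true)}") // false
}
```

A flowOf(...) has a fixed number of values, so collect() returns after the last one. A MutableSharedFlow never completes, so collect() only stops because the surrounding coroutine is canceled.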

The fix in words

Before we discuss potential solutions, let’s consider how to fix the problem. What do we need to do so that it doesn’t get stuck anymore?

As mentioned above, we have three options:

  • Cancel “a” scope
  • Cancel “a” coroutine
  • “Finishing” the Flow

Canceling the scope is nonsense because this is our TestScope that we need to run the actual test. Since the Flow in the actual implementation is a never-ending Flow I would suggest that we focus on canceling a coroutine. However, at the end of the blog, I will provide a solution for “finishing” the Flow.

To cancel a coroutine, we must call either job.cancel or scope.cancel. The former cancels that coroutine together with its children; the latter cancels every coroutine running in the scope. However, both solutions are, in our case, chasing our own tail. If we cancel one of them (the TestScope provided by the runTest function or the job that is part of the TestScope), we cancel our whole test, and therefore do not execute anything anymore, right?

What if we… create a new coroutine instead?

If we create a new coroutine and suspend that one instead of the coroutine provided by TestScope, would this work? Let’s find out…

The first attempt

To create a new coroutine, we can use one of the existing coroutine builders such as launch. The following code does not work, but it shows the direction we are heading:

val updater = UpdateDiskAfterSomeDependencyCollectUpdater(
    someDependency = fakeSomeDependency,
    diskUpdater = fakeDiskUpdater
)

launch {
    updater.update()
}

someDependencyUpdate.emit("Saves this please")

assert(fakeDiskUpdater.writeToDiskCalled)

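Why does this not work? Because the new coroutine (the code inside launch) is executed too late. launch only schedules the coroutine; it does not run it right away. When emit() is called, nobody is collecting yet, so the value is dropped and emit() returns immediately. In fact, the new coroutine will never be executed because the test fails before that with an “Assertion failed” error.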

Remember what I wrote above about suspending functions.

While I’m doing this (my work), I will not block the thread so it can do other things in the meantime.

Here, the code inside the runTest{} block is also a suspend function, but the thread (which is a single thread) never reaches a so-called suspension point. Therefore this thread cannot “do other things in the meantime” (and thus execute the updater.update() function). The test coroutine fails (with an “Assertion failed” error) before the thread has time to execute other coroutines.

Thank God runTest{} has nice documentation and gives us some handy information about this case. It suggests using either job.join() or testScope.advanceUntilIdle() to suspend so that the new coroutine can be executed.

On the other hand, the first sentence in the KDoc of job.join() states:

Suspends the coroutine until this job is complete.

Complete? The newly created coroutine is a never-ending one. So, will it get stuck again? With job.join(), yes, it will. testScope.advanceUntilIdle() would not get stuck, because it is not a suspend function: it runs the scheduled tasks until none are left and returns once the new coroutine is suspended in collect(). That would work too, but let us stay with suspend functions and look for another solution.

We need to wait until the coroutine has been started, but not until it has been completed.

What about adding another suspend function that suspends the test coroutine, so that the (single) thread gets the time to execute other (scheduled) coroutines and then “jumps” back to execute the following code?

The second (failed) attempt

I heard that delay is a fantastic suspending function to wait for a certain amount of time and then resume executing the following code:

val updater = UpdateDiskAfterSomeDependencyCollectUpdater(
    someDependency = fakeSomeDependency,
    diskUpdater = fakeDiskUpdater
)

launch {
    updater.update()
}

delay(1)
someDependencyUpdate.emit("Saves this please")

assert(fakeDiskUpdater.writeToDiskCalled)

It works 🎉… well, at least the coroutine inside launch is executed.
But, the test is stuck again. Why so?

Before we continue, let us quickly examine what is happening here: how the thread “jumps” and why it got stuck again.

By default, code is executed from top to bottom. This happens here as well. But when it reaches the coroutine builder launch, the thread adds the code inside it to its “task list”. You can think of it like this: the thread has a list of tasks, and as soon as one task is done, it executes the next one, and so on.

Execution continues, and we reach the delay, which is a suspend function and therefore a suspension point. As I wrote above about suspend functions, the thread can now do other things and will be informed as soon as the suspend function is done. Instead of being idle, the thread looks in its task list and finds a task: the “call the updater.update() function” task. So it does that, and finds another suspend function (the collect() inside update(), which has nothing to collect yet), and thus another suspension point. Now the thread has time to do other things again.

The delay informs the thread that it is done with delaying (inside runTest the delay uses virtual time, so nobody really waits). So the thread jumps back to the delay and continues executing the code: it calls someDependencyUpdate.emit, which hands the value over to the waiting collector, and then assert.
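
The “jumping” described above can be made visible with a small sketch that records the order in which things happen. It is not the original test: recordOrder is a made-up helper, awaitCancellation() stands in for the never-ending collect(), and I call runTest from a plain function, which I assume is fine on the JVM for a demonstration like this.

```kotlin
import kotlinx.coroutines.awaitCancellation
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.runTest

fun recordOrder(): List<String> {
    val events = mutableListOf<String>()
    runTest {
        events += "1: test body starts"
        val job = launch {
            events += "3: launched coroutine runs"
            awaitCancellation() // stand-in for the never-ending collect()
        }
        events += "2: launch returned, coroutine is only on the task list"
        delay(1) // suspension point: the thread picks the next task from its list
        events += "4: test body resumes after delay"
        job.cancel() // otherwise runTest would wait for the child coroutine
    }
    return events
}

fun main() {
    // Prints the four entries in the order 1, 2, 3, 4.
    recordOrder().forEach(::println)
}
```

Entry 3 sits in the middle of the code, yet it is logged after entry 2 and before entry 4. That is exactly the thread working through its task list while the test body is suspended in delay.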

So, why are we stuck? Because the newly created coroutine is a child of the TestScope that runs the test coroutine. And that scope and that test coroutine can only complete when all of their child coroutines are canceled or finished. But remember, the Flow never finishes.

Wait, what? Did I just say that all child coroutines must be canceled? 💡

The final fix

The solution is to cancel the never-ending suspending coroutine as soon as we don’t need it anymore for our testing purposes:

val updater = UpdateDiskAfterSomeDependencyCollectUpdater(
    someDependency = fakeSomeDependency,
    diskUpdater = fakeDiskUpdater
)

val updateJob = launch {
    updater.update()
}

delay(1)
someDependencyUpdate.emit("Saves this please")
updateJob.cancel()

assert(fakeDiskUpdater.writeToDiskCalled)

Because we explicitly cancel the coroutine, the TestScope and its test coroutine can finish as expected. The test is no longer stuck.

Why did I choose this as a good example to show coroutines?

Because it shows what suspend functions are. They really only suspend or pause the current code and do not block the current thread. The thread can do other things in the meantime, for example execute other coroutines that can also call other suspend functions.

In this example, we can clearly see how the thread “jumps” from one point in the code to another, and then sometime later resumes (“jumps back”) to the starting point.

This is the reason why I think it is a nice example and wrote this blog.

Three more solutions — why not?

Below, I’ll show you three more solutions that don’t get stuck while executing the test. As you read, please keep in mind that I’m not recommending one over the other. They all have their advantages and disadvantages. One may be “better” or nicer to read than the other. But at the end of the day, you should decide what fits your case best.

val updater = UpdateDiskAfterSomeDependencyCollectUpdater(
    someDependency = fakeSomeDependency,
    diskUpdater = fakeDiskUpdater
)

val job = launch(Dispatchers.Unconfined) {
    updater.update()
}

someDependencyUpdate.emit("Saves this please")
job.cancel()

assert(fakeDiskUpdater.writeToDiskCalled)

Here, we don’t need to call another suspend function (like the delay we used above). This is because the coroutine uses the Unconfined dispatcher. This means that the code is not added to the thread’s task list (to be executed at a later time), but is executed immediately, up to its first suspension point.
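
To compare both behaviours side by side, here is a rough sketch. Again, this is not the original test: startOrder is a made-up helper, awaitCancellation() stands in for the never-ending collect(), and calling runTest from a plain function is an assumption that holds on the JVM.

```kotlin
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.awaitCancellation
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.runTest

fun startOrder(unconfined: Boolean): List<String> {
    val events = mutableListOf<String>()
    // EmptyCoroutineContext means: keep the dispatcher of the test scope.
    val context: CoroutineContext =
        if (unconfined) Dispatchers.Unconfined else EmptyCoroutineContext
    runTest {
        val job = launch(context) {
            events += "coroutine started"
            awaitCancellation() // stand-in for the never-ending collect()
        }
        events += "launch returned"
        job.cancel()
    }
    return events
}

fun main() {
    println(startOrder(unconfined = true))  // [coroutine started, launch returned]
    println(startOrder(unconfined = false)) // [launch returned]
}
```

With Dispatchers.Unconfined the coroutine has already started by the time launch returns. Without it, the coroutine is only on the task list, and since the test body never suspends before job.cancel(), it is canceled before it ever runs.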

val updater = UpdateDiskAfterSomeDependencyCollectUpdater(
    someDependency = fakeSomeDependency,
    diskUpdater = fakeDiskUpdater
)

val updateJob = launch {
    updater.update()
}

val emitJob = launch {
    someDependencyUpdate.emit("Saves this please")
    updateJob.cancel()
}

emitJob.join()

assert(fakeDiskUpdater.writeToDiskCalled)

Want to make heavy use of coroutines? Use this code 🙂. The emitJob.join() call suspends the test coroutine, allowing the thread to work through its task list (which currently contains two tasks). First, it executes the updater.update() coroutine. Since that one suspends in collect(), the thread executes its second task, which calls someDependencyUpdate.emit and cancels the first coroutine. Since this (second) task has now completed (and the first one was canceled), the thread jumps back to emitJob.join() and resumes from there.

// Needs kotlin.concurrent.thread, kotlin.coroutines.resume
// and kotlin.coroutines.suspendCoroutine to be imported.
val updater = UpdateDiskAfterSomeDependencyCollectUpdater(
    someDependency = fakeSomeDependency,
    diskUpdater = fakeDiskUpdater
)

val updateJob = launch {
    updater.update()
}

suspendCoroutine<Unit> { continuation ->
    thread {
        Thread.sleep(1)
        continuation.resume(Unit)
    }
}
someDependencyUpdate.emit("Saves this please")
updateJob.cancel()

assert(fakeDiskUpdater.writeToDiskCalled)

This solution is basically the same as calling delay, but it uses the most fundamental function, suspendCoroutine. It behaves much like the delay solution, except that it waits one millisecond of real time on a separate thread, whereas delay inside runTest only advances virtual time. The code inside suspendCoroutine is executed immediately and creates a new thread. The newly created thread sleeps for a millisecond and then resumes the test coroutine. Since “the work” is moved to another thread, the block returns right away and the test coroutine suspends here. This allows the test thread to do other things (and execute the other coroutine).
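
If you want to play with suspendCoroutine in isolation, here is a small sketch that needs only the Kotlin standard library. The names sleepOnHelperThread and runHelperThreadDemo are made up, and starting the coroutine with startCoroutine (without any dispatcher) is an assumption of this sketch, not what runTest does.

```kotlin
import java.util.Collections
import java.util.concurrent.CountDownLatch
import kotlin.concurrent.thread
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Suspends the caller and resumes it from a helper thread after `millis` of real time.
suspend fun sleepOnHelperThread(millis: Long): Unit = suspendCoroutine { continuation ->
    thread {
        Thread.sleep(millis)
        continuation.resume(Unit)
    }
    // The block returns right away; from now on the caller is suspended.
}

fun runHelperThreadDemo(): List<String> {
    // Two threads write to the log, so it has to be thread-safe.
    val events: MutableList<String> = Collections.synchronizedList(mutableListOf())
    val finished = CountDownLatch(1)
    val body: suspend () -> Unit = {
        events.add("before suspension")
        sleepOnHelperThread(50)
        events.add("after resume")
    }
    // The completion callback fires once the whole coroutine is done.
    body.startCoroutine(Continuation<Unit>(EmptyCoroutineContext) { finished.countDown() })
    events.add("caller thread is free")
    finished.await() // block only so that the demo does not return too early
    return events.toList()
}

fun main() {
    runHelperThreadDemo().forEach(::println)
}
```

One difference to the test above: without a dispatcher, the code after the suspension point continues on the helper thread. Inside runTest, the test dispatcher brings the coroutine back to the test thread.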

The “finish the Flow” solution

As promised, the following shows a solution to finish the Flow and not deal with any child coroutines.

val someDependencyUpdate = MutableSharedFlow<String>(replay = 1)
val fakeSomeDependency = object : SomeDependency {
    override val aflow: Flow<String> = someDependencyUpdate.take(1)
}
val updater = UpdateDiskAfterSomeDependencyCollectUpdater(
    someDependency = fakeSomeDependency,
    diskUpdater = fakeDiskUpdater
)
someDependencyUpdate.emit("Saves this please")

updater.update()

assert(fakeDiskUpdater.writeToDiskCalled)

First, we add replay = 1 to the MutableSharedFlow. This means that any new collector will also receive the latest value of this SharedFlow. An alternative would be to use MutableStateFlow, which has the same behaviour in that case.

Second, we tell the Flow inside SomeDependency that it should finish (actually, cancel is the correct wording for it) as soon as it receives its first item.

With these changes, the update function will still be suspended until the Flow it collects is done. But since take(1) finishes that Flow right after the first (replayed) value, update() returns shortly after.
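
Stripped of the fakes, the same idea looks roughly like this. collectFirstReplayedValue is a made-up helper, and I use runBlocking here only to keep the sketch free of any test setup.

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.runBlocking

fun collectFirstReplayedValue(): List<String> = runBlocking {
    val received = mutableListOf<String>()
    val source = MutableSharedFlow<String>(replay = 1)

    // Nobody collects yet, but replay = 1 keeps the value for later collectors.
    source.emit("Saves this please")

    // take(1) ends the collection after the first value, so collect() returns.
    source.take(1)
        .onEach { received += it }
        .collect()

    received
}

fun main() {
    println(collectFirstReplayedValue()) // [Saves this please]
}
```

Without replay = 1 the emitted value would be gone before anyone collects, and without take(1) the collect() call would never return.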

I hope you learned a bit or at least strengthened your coroutine knowledge. Personally, I couldn’t resist writing this blog and sharing it with the world. As mentioned above, I think it shows everything you need to know about the fundamentals around Kotlin Coroutines, while having only a bare minimum of required code.
