Structured concurrency explained - Part 2: Exceptions and Cancellations

Error handling is an important part to understand in the workflow of structured concurrency. In this post, we will dive deep into this and uncover some of the most surprising and misleading parts of Kotlin coroutines!
Walid Lezzar
March 23, 2022
kotlin, coroutines, concurrency, structured-concurrency, exceptions

Introduction

In part 1 of this series, we introduced the concept of structured concurrency. We showed how it immediately makes concurrent programs safer, more correct, and easier to reason about. If you missed this part, I definitely recommend checking it out!

To achieve this safety in every situation, including when things fail, structured concurrency is very strict and opinionated about the way exceptions are handled and propagated throughout coroutine job hierarchies. This behavior can seem counterintuitive and confusing if not understood correctly. This is precisely what we will cover in part 2 of the series. More specifically, we will go over the following topics:

  1. Exception handling inside a coroutine scope
  2. The confusing part!
  3. Using supervisorScope when child failures are not fatal
  4. Cancellation and blocking code

Ready? Let’s go!

Exception handling inside a coroutine scope

Let’s start our journey with this simple example: We want to create a simple pipeline between a Producer coroutine and a Consumer coroutine exchanging data through a channel. Both coroutines run concurrently within a coroutine scope:

  • The Producer continuously calls the fetchDataFromRemote() function to download some data from a remote service before piping it out to a channel.
  • The Consumer continuously polls data from the channel and prints it on the screen.

    import kotlinx.coroutines.*
    import kotlinx.coroutines.channels.Channel

    // coroutineScope is a suspending function, so runPipeline must be one too.
    suspend fun runPipeline() {
        coroutineScope {
            val channel = Channel<String>()

            launch(CoroutineName("Producer")) {
                while (true) {
                    val data = fetchDataFromRemote()
                    channel.send(data)
                }
            }

            launch(CoroutineName("Consumer")) {
                for (data in channel) {
                    println(data)
                }
            }
        }
    }

In the setup above, our coroutine job hierarchy would look like the following:

[Figure: Coroutines job hierarchy describing example 1]

Now, let’s imagine that the fetchDataFromRemote() call throws an exception (e.g. a connectivity error).

    suspend fun runPipeline() {
        coroutineScope {
            val channel = Channel<String>()

            launch(CoroutineName("Producer")) {
                while (true) {
                    // Boom!! Throws an exception!!!
                    val data = fetchDataFromRemote()
                    channel.send(data)
                }
            }

            launch(CoroutineName("Consumer")) {
                for (data in channel) {
                    println(data)
                }
            }
        }
    }

This would cause the Producer coroutine to fail. What do you think would happen then? Will the exception go unnoticed as we did not log it anywhere? Will we end up with the Consumer coroutine running alone forever, desperately waiting for some data to arrive in the channel?

Fortunately, none of those bad things would happen thanks to structured concurrency: The parent job will notice the Producer coroutine’s failure and will immediately take care of cancelling the other children within the scope (to prevent them from staying active forever) before bubbling the exception up to the outside.

Let’s review this workflow step by step (an animation below will further summarize it):

  1. It all starts with the fetchDataFromRemote() call throwing an exception, causing the Producer coroutine to fail.
  2. The parent job notices the failure of one of its children and, consequently, marks itself as failed and cancels all its other children that are still running.
  3. The child coroutines that are cancelled will throw a CancellationException on their next call to a suspending function (or immediately if they are already suspended). That’s how they trigger their exit.
  4. Once all child coroutines are stopped, the original exception is forwarded out of the coroutineScope call.

These steps are summarized by the animation below (see footnote 1):

This is how structured concurrency saves us from leaking exceptions and coroutines when failures occur!
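
The workflow above can be reproduced with a small runnable sketch. Everything specific to it is an assumption made for illustration: the fetchDataFromRemote stub (which here takes an attempt number and fails on the third call), the IOException it throws, and the events list used to record what happens. It assumes the kotlinx-coroutines-core library is on the classpath.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import java.io.IOException

// Hypothetical stand-in for the remote call: succeeds twice, then fails.
suspend fun fetchDataFromRemote(attempt: Int): String {
    delay(10)
    if (attempt > 2) throw IOException("connection lost")
    return "item-$attempt"
}

suspend fun runPipeline(events: MutableList<String>) {
    coroutineScope {
        val channel = Channel<String>()

        launch(CoroutineName("Producer")) {
            var attempt = 0
            while (true) channel.send(fetchDataFromRemote(++attempt))
        }

        launch(CoroutineName("Consumer")) {
            try {
                for (data in channel) events += "received $data"
            } catch (e: CancellationException) {
                // Step 3: the cancelled sibling exits through a CancellationException.
                events += "consumer cancelled"
                throw e // rethrow so that cancellation completes normally
            }
        }
    }
}

fun main() = runBlocking {
    val events = mutableListOf<String>()
    try {
        runPipeline(events)
    } catch (e: IOException) {
        // Step 4: the original exception leaves the coroutineScope call.
        events += "caught ${e.message}"
    }
    events.forEach(::println)
}
```

Run as is, the sketch should record two received items, then the Consumer's cancellation, and finally the IOException caught by the caller of runPipeline.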

The confusing part

The error handling behavior we saw above is desirable in most scenarios. But sometimes it can be extremely confusing, especially when we want to handle the exceptions of the child coroutines ourselves.

Let’s demonstrate this using a slightly different example: We want to fetch and print the GitHub profile of a user on the screen. We decide that failures are not fatal: if we fail to fetch the GitHub profile of the user (e.g. because of a connectivity error), we just want to print an error message on the screen and prevent the exception from bubbling up.

    coroutineScope {
        val profile = async(CoroutineName("Fetcher")) {
            // This call might fail!!
            fetchGithubProfile("wlezzar")
        }

        try {
            println(profile.await())
        } catch (err: Exception) {
            println("Failed to fetch data from github: $err")
        }
    }

The associated job hierarchy would look like this:

[Figure: Coroutines job hierarchy describing example 2]

We would think that catching the exception during the .await() call would safely prevent the exception from bubbling up. But, unfortunately, that’s not the case! In fact, if you run the program above and simulate a failure in the fetchGithubProfile call, the whole program will crash, and the exception will propagate outside of the coroutineScope. Why is that?

Again, the explanation is the same as in the previous example: by the time we catch the exception at the .await() call, it is already too late. The associated child coroutine (spun up using async) has already failed, because we did not catch the exception inside the async { ... } block. Thus, the same mechanics as before kick in: the parent job notices its child's failure, marks itself as failed, cancels the remaining child coroutines, and bubbles the exception up. The try ... catch around the .await() only prevents us from exiting immediately and gives us a chance to react to the failure (e.g. to clean up some resources).
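
To observe this behavior, here is a minimal runnable sketch. The fetchGithubProfile stub (which always fails with an IOException) and the log list are assumptions made for illustration, not the real implementation. The catch block mirrors the example above, so it deliberately catches every Exception.

```kotlin
import kotlinx.coroutines.*
import java.io.IOException

// Hypothetical stand-in for the real call; it always fails here.
suspend fun fetchGithubProfile(user: String): String {
    delay(10)
    throw IOException("cannot reach github for $user")
}

suspend fun printProfile(log: MutableList<String>) {
    coroutineScope {
        val profile = async(CoroutineName("Fetcher")) { fetchGithubProfile("wlezzar") }
        try {
            log += profile.await()
        } catch (err: Exception) {
            // We do get here, but the scope has already been marked as failed.
            log += "caught around await"
        }
    }
}

fun main() = runBlocking {
    val log = mutableListOf<String>()
    try {
        printProfile(log)
    } catch (err: IOException) {
        // The failure still escapes the coroutineScope call.
        log += "coroutineScope rethrew: ${err.message}"
    }
    log.forEach(::println)
}
```

Both messages should be recorded: the catch around await runs, and the IOException still reaches the caller of printProfile.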

To prevent this scenario from happening, we have two options:

  1. Either we catch the exception inside the async block itself. This would prevent the child coroutine from failing in the first place.
  2. Or we could use supervisorScope instead of coroutineScope to ignore child coroutine failures.
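
The first option could look like the sketch below. The fetchGithubProfile stub and the describeProfile helper are hypothetical names introduced for illustration. The sketch catches IOException only, on the assumption that this is how the fetch fails; catching a broader type inside a coroutine risks swallowing CancellationException as well.

```kotlin
import kotlinx.coroutines.*
import java.io.IOException

// Hypothetical stand-in for the real call; it always fails here.
suspend fun fetchGithubProfile(user: String): String {
    delay(10)
    throw IOException("cannot reach github for $user")
}

suspend fun describeProfile(user: String): String = coroutineScope {
    val profile = async(CoroutineName("Fetcher")) {
        try {
            fetchGithubProfile(user)
        } catch (err: IOException) {
            // Handled inside the child: the coroutine completes normally,
            // so the parent job never sees a failure.
            "Failed to fetch data from github: ${err.message}"
        }
    }
    profile.await()
}

fun main() = runBlocking {
    println(describeProfile("wlezzar"))
}
```

Because the child coroutine returns a value instead of failing, coroutineScope completes normally and nothing bubbles up.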

Using supervisorScope when child failures are not fatal

Sometimes, like in the last example, child coroutine failures are not fatal and should not cause the entire job hierarchy to fail. Instead, we would like to react to those failures in our own way. This is exactly what supervisorScope is for.

supervisorScope is a slightly modified version of coroutineScope. Both have the same purpose of creating a new coroutine scope with an explicit lifetime in which we can spin up child coroutines. The difference between the two is that supervisorScope uses a SupervisorJob as the parent job to hold the child coroutines, whereas coroutineScope uses a regular Job.

SupervisorJob is a special implementation of Job that ignores child failures. When those occur, it does not cancel the other child coroutines and does not bubble up the exception.

Here is how it can be used in the previous example:

    supervisorScope {
        val profile = async(CoroutineName("Fetcher")) {
            fetchGithubProfile("wlezzar")
        }

        try {
            println(profile.await())
        } catch (err: Exception) {
            println("Failed to fetch profile data: $err")
        }
    }

Now, when a failure happens in the fetchGithubProfile call, this will still cause the child coroutine to fail but the parent job (SupervisorJob) will ignore this failure, and the supervisorScope will not bubble it up to the outside.

One critical point to keep in mind: whenever we use a supervisor job as a parent in a scope, it becomes very important to catch the exceptions of child coroutines ourselves, otherwise they would be lost!

The way we handle a child coroutine’s exception when using a supervisor job as a parent depends on whether this coroutine was spun up using launch or async:

  • Failures of coroutines spun up using async are caught by wrapping the .await() inside a try...catch.
  • On the other hand, failures of coroutines spun up using launch can be handled using coroutine exception handlers.
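
For the launch case, a minimal sketch could look like the following. The coroutine names, the log list, and the runIndependentTasks helper are assumptions made for illustration. The handler is passed in the context of the child that may fail.

```kotlin
import kotlinx.coroutines.*

suspend fun runIndependentTasks(log: MutableList<String>) {
    supervisorScope {
        // Invoked for uncaught exceptions of children started with launch.
        val handler = CoroutineExceptionHandler { context, err ->
            log += "${context[CoroutineName]?.name} failed: ${err.message}"
        }

        launch(CoroutineName("Flaky") + handler) {
            throw IllegalStateException("boom")
        }

        launch(CoroutineName("Steady")) {
            delay(50)
            log += "Steady finished" // the sibling is not cancelled
        }
    }
}

fun main() = runBlocking {
    val log = mutableListOf<String>()
    runIndependentTasks(log)
    log.forEach(::println)
}
```

The failure of the first child is reported through the handler, while the second child keeps running to completion and supervisorScope returns normally.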

Cancellation and blocking code

When failures occur, the parent job is responsible for triggering the cancellation of its child coroutines. However, cancellation is cooperative! This means that when coroutines are asked to cancel, they need to cooperate by listening to those cancellation signals and reacting appropriately (e.g. by exiting). You cannot force them to! A coroutine that does so is said to be cooperative to cancellation, or cancellable.

In the Kotlin coroutines world, you rarely need to worry about making your coroutines cooperative because all the suspending functions provided by kotlinx.coroutines (like delay, consuming from a channel, etc.) are already cancellable. Thus, your code will usually be cancellable right away without any extra effort.

However, when running blocking code inside coroutines, you do need to worry about making it cooperative to cancellation.
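
As a small illustration, the sketch below cancels an infinite loop that does nothing special to support cancellation; it simply calls delay. The cancelTicker helper and the timings are arbitrary choices made for this example.

```kotlin
import kotlinx.coroutines.*

suspend fun cancelTicker(): Int = coroutineScope {
    var ticks = 0
    val job = launch {
        while (true) {
            delay(20) // throws CancellationException once the job is cancelled
            ticks++
        }
    }
    delay(110)          // let the loop run for a while
    job.cancelAndJoin() // returns promptly: delay reacts to the cancellation
    ticks
}

fun main() = runBlocking {
    println("ticks before cancellation: ${cancelTicker()}")
}
```

The exact number of ticks depends on timing, but the call returns shortly after the cancellation request even though the loop condition is always true.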

Here is a hypothetical example:

    // Blocking code
    fun someSlowBlockingCode() {
        // Do some computation
    }

    val job = launch(Dispatchers.IO) {
        someSlowBlockingCode()
    }

    job.cancel()
    job.join() // Will hang until the blocking code is done!

The call to someSlowBlockingCode() is not coroutine-aware. It never suspends and never reacts to cancellation. Consequently, cancelling the associated job will not interrupt the blocking function. Instead, it will keep running until it is done.

How can we fix this?

There are several approaches, depending on whether or not you own the implementation of the blocking code.

One approach to making blocking code cooperative to cancellation is to wrap it inside a runInterruptible block, which is specifically built for that purpose.

    val job = launch(Dispatchers.IO) {
        runInterruptible {
            someSlowBlockingCode()
        }
    }

    job.cancel()
    job.join()

When the coroutine is cancelled, runInterruptible interrupts the underlying thread before throwing a CancellationException to the outside. However, for this to work, the blocking call underneath must itself respond to thread interrupts, which is not guaranteed! This is usually the case when:

  • The thread is blocked on Object.wait(), Thread.sleep(), or Thread.join().
  • The thread is blocked in one of the java.util.concurrent structures, most of which are interruptible.
  • The thread is blocked in I/O done through Java NIO interruptible channels (but not the classic java.io streams).

You can find more information around thread interrupts in the Thread.interrupt() javadoc.
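
Here is a minimal sketch of the first case, using Thread.sleep() as the blocking call. The cancelBlockingSleep helper and the durations are assumptions chosen for illustration.

```kotlin
import kotlinx.coroutines.*
import kotlin.system.measureTimeMillis

// Returns how long it took to start, cancel, and join the blocking job.
suspend fun cancelBlockingSleep(): Long = coroutineScope {
    measureTimeMillis {
        val job = launch(Dispatchers.IO) {
            runInterruptible {
                Thread.sleep(10_000) // blocking, but responds to Thread.interrupt()
            }
        }
        delay(100)          // give the blocking call time to start
        job.cancelAndJoin() // interrupts the sleeping thread, then waits for the job
    }
}

fun main() = runBlocking {
    println("Cancelled after ${cancelBlockingSleep()} ms")
}
```

The elapsed time should be far below the ten seconds of the sleep, since the interrupt wakes the thread up. Replacing Thread.sleep() with a call that ignores interrupts would make the join wait for the full duration again.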

Despite not being a bulletproof solution, the runInterruptible function is still very useful in many situations, especially when you don’t have control over the implementation of the underlying blocking code.

Otherwise, if you do own the blocking code’s implementation, another way to make it cooperative to cancellations is by periodically watching the value of the isActive property of the enclosing coroutine scope. When this value is false, it means that the enclosing scope has been cancelled, and the blocking code should exit. Here is an example:

    fun someSlowBlockingCode(scope: CoroutineScope) {
        while (scope.isActive) {
            // Do some short-lived blocking operation
        }
    }

    val job = launch(Dispatchers.IO) {
        // Pass the coroutine's own scope so the loop can observe cancellation
        someSlowBlockingCode(this)
    }

    job.cancel()
    job.join()

Conclusion

Error handling in the context of coroutines is certainly one of the most challenging parts to understand and to master. Hopefully, this post gave you some clarity on its inner workings and the reasons why it was designed this way.

So far we have only seen how to create coroutine scopes inside suspend functions using coroutineScope and supervisorScope. In the third and last part of this series, we will discover another useful way of creating independent scopes that are not tied to the lifetime of a function call. This can be achieved using the CoroutineScope factory function. We will also talk about context elements and how they allow us to easily pass data to child coroutines. Stay tuned!


  1. The names used to express the statuses of the jobs in the different steps are for clarity and illustration purposes. Please check the documentation of Job for the actual state machine and status names.