Structured concurrency explained - Part 1: Introduction

This post explains structured concurrency in an intuitive way and shows how it simplifies concurrent programs in practice!
Walid Lezzar
February 13, 2022
kotlin, coroutines, concurrency, structured-concurrency

Introduction

Structured concurrency is a concept you need to understand to take full advantage of Kotlin coroutines.

Today’s post is the first of a series about structured concurrency: an introduction that takes on the challenge of explaining the concepts behind it in a straightforward and intuitive manner. It aims to trigger the “Aha!” moment that will make you say: why doesn’t every single concurrency library use structured concurrency?! Stay tuned ;)

Unstructured concurrency

To understand something and build a good intuition around the subject, it is often helpful to start from the problems it tries to solve in the first place. So let’s consider a simple example program that does not use structured concurrency and see what problems can arise.

The following code creates three concurrent tasks that run in the background in separate threads. Each task’s job is to fetch a bunch of data from a remote service using the fetchData function and add the resulting number to the global sum.

val sum = AtomicInteger(0)

// Create 3 concurrent tasks that compute the total sum in parallel
repeat(3) {
    CompletableFuture.runAsync {
        val data: Int = fetchData()
        sum.addAndGet(data)
    }
}

// Do something useful in the meantime while the sum is being computed
Thread.sleep(2000)

// And then use the sum.
println("The final sum is: $sum")

You should have already spotted a few problems in the snippet above:

  • We never wait for the created tasks (futures) to finish their job before using the final sum. The fetchData call might take longer than expected in some of the futures, and we end up using an incomplete sum without being aware.
  • What if one of the futures fails? The fetchData() call might raise an exception because of a networking problem. The future’s failure will go unnoticed and will never be propagated to the outside. In other words, we can leak exceptions, which once again leaves our sum incorrect without our being aware of it.
  • Last but not least: what if one of the futures gets stuck indefinitely and never ends? Let’s say the fetchData call hangs and never returns because of a deadlock or a bug in its implementation. In this case, our future and the thread hosting it will keep running in the background forever, consuming unnecessary resources. We will likely never be aware of it and will have no way of shutting it down. In other words, we can leak threads!
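
To make the second problem concrete, here is a minimal sketch of a leaked exception. failingFetch is a hypothetical stand-in for fetchData that always throws; everything else comes from the JDK.

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ExecutionException

// Hypothetical stand-in for fetchData() that always fails.
fun failingFetch(): Int = throw IllegalStateException("network error")

// Starts a future that fails and reports what the caller can observe:
// whether the future failed, and the cause once we explicitly ask for it.
fun observeSilentFailure(): Pair<Boolean, Throwable?> {
    val future = CompletableFuture.runAsync { failingFetch() }

    // No exception reaches the caller by itself: the failure stays
    // inside the future until somebody explicitly asks for its result.
    val cause = try {
        future.get()
        null
    } catch (e: ExecutionException) {
        e.cause
    }
    return future.isCompletedExceptionally to cause
}
```

Calling observeSilentFailure() should report a failed future whose cause is the IllegalStateException, yet nothing would have surfaced in the calling code if we had not called get() ourselves.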

We can try to fix the problems above by adding some boilerplate code to keep track of the futures in a list, wait for them one by one to finish, and forward any exception that would be thrown.

val sum = AtomicInteger(0)

// A list to keep track of the created futures
val futures = ArrayList<Future<Void>>()

repeat(3) {
    val future = CompletableFuture.runAsync {
        val data: Int = fetchData()
        sum.addAndGet(data)
    }

    // Add the future to the list
    futures.add(future)
}

// Do something useful in the meantime in parallel
Thread.sleep(2000)

// Wait for all the futures and propagate exceptions
for (future in futures) {
    future.get()
}

// And then use the sum.
println("The final sum is: $sum")

What we did here is actually “Making our concurrency more structured”: Instead of launching the background futures in the air and losing sight of them, we keep track of them in a list and wait for their completion. That is exactly what we mean by structured concurrency: The start and the end of the concurrent paths are clear and explicit.

However, not only is this boilerplate code cumbersome to write, it is also far from being perfect:

  • What if the first future encountered by our for loop throws an exception? In that case, the for loop stops iterating and propagates the exception to the outside without waiting for the remaining futures. So if one of them hangs or takes an unreasonably long time to finish, we would still potentially be leaking threads! To solve this edge case, we need to catch the exception and cancel the remaining futures that are still running before propagating the error to the outside. That requires much more involved boilerplate code than what we wrote.
  • We didn’t handle user cancellations. What if we want to enable the user to cancel the sum computation in the middle? Doing so would require even more boilerplate code to listen for cancellation signals and to propagate them to the futures we launched one by one.
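
To give an idea of what that extra boilerplate could look like, here is a rough sketch of the first point only. failFastAll is a hypothetical helper, not something the JDK provides: it fails as soon as any future fails and cancels the ones still pending. Two caveats apply. It still ignores user cancellation, and according to the CompletableFuture documentation, cancel(true) does not interrupt the thread running the task, so a hanging fetchData call would keep its thread busy anyway.

```kotlin
import java.util.concurrent.CompletableFuture

// Completes with all results, or fails as soon as any future fails,
// cancelling the futures that are still pending.
fun failFastAll(futures: List<CompletableFuture<Int>>): CompletableFuture<List<Int>> {
    val result = CompletableFuture<List<Int>>()

    futures.forEach { future ->
        future.whenComplete { _, error ->
            // completeExceptionally returns true only for the first failure.
            if (error != null && result.completeExceptionally(error)) {
                futures.forEach { it.cancel(true) }
            }
        }
    }

    // Runs only when every future completed normally.
    CompletableFuture.allOf(*futures.toTypedArray()).thenRun {
        result.complete(futures.map { it.join() })
    }
    return result
}
```

Even this version is far from complete, which is the point: every new edge case adds another layer of bookkeeping.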

In summary, correctly structuring concurrency is complex and requires a lot of boilerplate code to make it bug-free and 100% safe in all cases.

The good news is: structured concurrency is baked into the design of the Coroutines library, and we do not have to write any boilerplate code to use it and take advantage of it! So let’s check this out!

Structured concurrency with Kotlin Coroutines

Let’s rewrite our code using coroutines and see what that looks like:

suspend fun distributedSum() {
    val sum = AtomicInteger(0)

    // The coroutine scope acts like a parent that keeps track of
    // all the child coroutines created inside it
    coroutineScope {
        // Create 3 coroutines that compute the total sum concurrently
        repeat(3) {
            launch {
                val data: Int = fetchDataAsync()
                sum.addAndGet(data)
            }
        }
    }

    println("The final sum is: $sum")
}

This short and extremely simple code is already free from all the problems mentioned in the previous section: It cannot leak exceptions or coroutines, it will never print an incomplete sum or a wrong one, and cancellation is already possible!
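
If you want to try this yourself, here is a self-contained variant. It assumes kotlinx.coroutines is on the classpath; fakeFetch is a hypothetical stand-in for fetchDataAsync, and the function returns the sum instead of printing it so that the result is easy to check.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for fetchDataAsync(): waits a bit, then returns a number.
suspend fun fakeFetch(id: Int): Int {
    delay(100L * id)
    return id
}

suspend fun distributedSumOf(ids: List<Int>): Int {
    val sum = AtomicInteger(0)
    coroutineScope {
        ids.forEach { id ->
            launch { sum.addAndGet(fakeFetch(id)) }
        }
    }
    // Reached only after every child coroutine has completed.
    return sum.get()
}
```

Calling runBlocking { distributedSumOf(listOf(1, 2, 3)) } should return the complete sum, even though the three fetches take different amounts of time.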

Feels like magic? Let’s unpack it step by step to understand how.

coroutineScope { // this: CoroutineScope (receiver)
    ...
}

We start by calling the coroutineScope function and giving it a Kotlin lambda inside which we can create any number of background coroutines using launch. We can use launch inside this lambda because its receiver [1] is a CoroutineScope instance, and launch is defined as an extension function on CoroutineScope.

coroutineScope { // this: CoroutineScope
    repeat(3) {
        // equivalent to: this.launch
        launch {
            ...
        }
    }
}

Any background coroutine created within this block is attached as a child to this CoroutineScope instance which keeps track of its completion and failure. It is similar to what we tried to achieve by manually keeping track of the completable futures in a list. But instead of a list, the CoroutineScope uses a fancier hierarchical data structure called Job. We will dig deeper into this data structure later on in this post.
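
We can peek at this bookkeeping through the public Job API. The sketch below (countTrackedChildren is a name made up for this example) launches a few coroutines and asks the scope's Job how many children it currently tracks.

```kotlin
import kotlinx.coroutines.*

// Returns how many children the scope's Job tracks right after launching them.
suspend fun countTrackedChildren(n: Int): Int = coroutineScope {
    repeat(n) {
        launch { delay(100) } // placeholder work that keeps the child alive
    }
    // coroutineContext[Job] is the Job owned by this coroutineScope block.
    coroutineContext[Job]!!.children.count()
}
```

Nothing in our code stores the launched coroutines anywhere; the scope's Job already knows about them.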

Here are the goodies that come with this setup:

  • We don’t need to keep track of the background coroutines ourselves; the CoroutineScope already does, and the coroutineScope call will suspend until all of them complete or fail. Thus, we know for sure that after this call, all our background coroutines are done, and we can then confidently use the computed sum.
  • If any of the background coroutines fails for any reason, the exception is caught by the coroutineScope and propagated to the outside. But first, it takes care of canceling the remaining coroutines that are still running [2], thus making sure that no background coroutine is still pending after the exception is propagated.
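
Here is a small sketch of the second point. The delays and the exception are placeholders for real fetches, and sumWithOneFailure is a hypothetical name: one child fails after 50 ms while its sibling is still busy.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.atomic.AtomicBoolean

// Launches a slow child and a failing child in the same scope.
// `siblingCancelled` is flipped if the slow child observes cancellation.
suspend fun sumWithOneFailure(siblingCancelled: AtomicBoolean) {
    coroutineScope {
        launch {
            try {
                delay(60_000) // hypothetical slow fetch
            } catch (e: CancellationException) {
                siblingCancelled.set(true)
                throw e // always rethrow cancellation
            }
        }
        launch {
            delay(50)
            throw IllegalStateException("fetch failed")
        }
    }
}
```

The caller should see the IllegalStateException thrown from sumWithOneFailure itself, and by that time the slow sibling has already been cancelled rather than left running for a minute.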

Our background coroutines now have a clear and explicit lifetime defined by the coroutineScope block beyond which it is impossible to leak any coroutines or exceptions. That’s the core idea of structured concurrency: “every time our control splits into multiple concurrent paths, we make sure they join up again” [3]. As a result, our program instantly becomes safer and easier to reason about.
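
The same lifetime rule is what makes cancellation work without extra code. In the sketch below, a timeout plays the role of the user's cancellation signal (that choice, like the name cancelledWorkers, is only an assumption for the example): when it fires, every child inside the scope is cancelled before the function returns.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.atomic.AtomicInteger

// Runs `workers` long-running children, gives up after `timeoutMs`,
// and returns how many children were stopped before finishing.
suspend fun cancelledWorkers(workers: Int, timeoutMs: Long): Int {
    val stopped = AtomicInteger(0)
    withTimeoutOrNull(timeoutMs) {
        coroutineScope {
            repeat(workers) {
                launch {
                    try {
                        delay(60_000) // hypothetical slow fetch
                    } finally {
                        stopped.incrementAndGet()
                    }
                }
            }
        }
    }
    return stopped.get()
}
```

We never pass a cancellation signal to the children one by one; cancelling the outer block is enough.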

The CoroutineScope and the Jobs hierarchy

Let’s take a closer look at the signature of the launch coroutine builder function:

public fun CoroutineScope.launch(...): Job

The launch function requires a CoroutineScope to be usable. But why is that?

The reason is to prevent background coroutines from being created in the air without a parent CoroutineScope instance to keep track of them. The same holds for the other builders that start concurrent coroutines, such as async, and this is how Kotlin enforces structured concurrency.
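
In practice, this leaves two common ways to write a function that starts coroutines. The sketch below illustrates both; loadBoth and startWorker are hypothetical names, and the values 1 and 2 are placeholders for real work.

```kotlin
import kotlinx.coroutines.*

// Option 1: a suspend function opens its own scope and returns only
// after the coroutines started inside it are done.
suspend fun loadBoth(): Int = coroutineScope {
    val a = async { 1 }
    val b = async { 2 }
    a.await() + b.await()
}

// Option 2: an extension on CoroutineScope starts a coroutine that
// the caller's scope keeps tracking after this function returns.
fun CoroutineScope.startWorker(): Job = launch { delay(10) }
```

Either way, there is always a scope that owns the new coroutine. A plain function with no CoroutineScope at hand simply cannot call launch or async.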

The CoroutineScope instance keeps track of its children coroutines using a hierarchical data structure called Job. You can think of a job as a simple tree that:

  • Stores children Job instances to keep track of them (which themselves can have other nested children Job instances).
  • And has a status (active, completed, etc.).
[Figure: Structured concurrency - Job hierarchy]
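
The status part can be observed through the isActive, isCompleted and isCancelled properties of a Job. The following sketch (JobState, snapshot and observeJobStates are helper names invented for this example) records them for a job that finishes normally and for one that gets cancelled.

```kotlin
import kotlinx.coroutines.*

// Snapshot of the public state flags exposed by a Job.
data class JobState(val active: Boolean, val completed: Boolean, val cancelled: Boolean)

fun Job.snapshot() = JobState(isActive, isCompleted, isCancelled)

suspend fun observeJobStates(): List<JobState> = coroutineScope {
    val finishing = launch { delay(50) }
    val cancelling = launch { delay(60_000) }

    val states = mutableListOf(finishing.snapshot()) // still running
    finishing.join()
    states += finishing.snapshot()                   // completed normally
    cancelling.cancelAndJoin()
    states += cancelling.snapshot()                  // completed by cancellation
    states
}
```

Note that a cancelled job also counts as completed: cancellation is just one of the ways a job can reach its final state.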

Coroutine builder functions (like launch) also create their own CoroutineScope instance as the receiver of their lambda block, making it possible to create nested coroutines! That enables building complex hierarchies of concurrently running background coroutines without ever leaking a single one of them.

coroutineScope { // this: CoroutineScope
    launch(CoroutineName("Coroutine A")) { // this: CoroutineScope (nested CoroutineScope with its own Job)
        launch(CoroutineName("Coroutine A.1")) { ... }
        launch(CoroutineName("Coroutine A.2")) { ... }
    }

    launch(CoroutineName("Coroutine B")) { ... }
}

The code above would create the following hierarchy:

[Figure: Structured concurrency - Complex hierarchy]

The coroutineScope call will suspend and not complete until Coroutine A and Coroutine B are done. Coroutine A, on the other hand, will not complete until Coroutine A.1 and Coroutine A.2 are done. If Coroutine A.1 fails for some reason, then Coroutine A will fail as a whole, propagating the exception to the parent job, causing coroutineScope itself to fail. Any coroutine still running in this hierarchy will get canceled during the process.
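
The failure scenario described above can be reproduced with a short sketch. The bodies of the coroutines are placeholders: A.1 fails after 50 ms, while A.2 and B just wait and record their name if they get cancelled. nestedFailure is a hypothetical name.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.ConcurrentLinkedQueue

// Returns the names of the cancelled coroutines and the error seen by the caller.
suspend fun nestedFailure(): Pair<List<String>, Throwable?> {
    val cancelled = ConcurrentLinkedQueue<String>()

    // Waits for a long time and records its name if it gets cancelled.
    suspend fun waitAndRecord(name: String) {
        try {
            delay(60_000)
        } catch (e: CancellationException) {
            cancelled.add(name)
            throw e
        }
    }

    val error: Throwable? = try {
        coroutineScope {
            launch(CoroutineName("Coroutine A")) {
                launch(CoroutineName("Coroutine A.1")) {
                    delay(50)
                    throw IllegalStateException("A.1 failed")
                }
                launch(CoroutineName("Coroutine A.2")) { waitAndRecord("A.2") }
            }
            launch(CoroutineName("Coroutine B")) { waitAndRecord("B") }
        }
        null
    } catch (e: IllegalStateException) {
        e
    }
    return cancelled.sorted() to error
}
```

The failure of A.1 travels up through Coroutine A to the scope, and on its way it cancels both A.2 (a sibling) and B (a cousin), before the exception finally reaches the caller of coroutineScope.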

Difference between CoroutineScope and CoroutineContext

One aspect that confuses many Kotliners is the difference between a CoroutineContext and a CoroutineScope.

If you check the source code of CoroutineScope, you will find the following:

public interface CoroutineScope {
    public val coroutineContext: CoroutineContext
}

Why do we need a CoroutineScope then? Why not just use CoroutineContext instead? The difference between both concepts as outlined by Roman Elizarov is in their intended purpose.

A CoroutineContext is simply a data structure holding zero or more context elements. It works much like a HashMap: a container that coroutines and their schedulers can query at any point in time to retrieve useful information such as the Dispatcher, the coroutine’s name, etc.
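
The map-like behavior is easy to see in code. In this sketch, describe and renamed are hypothetical helpers; the lookups themselves use the standard keys (CoroutineName, Job, and ContinuationInterceptor, which is the key dispatchers are stored under).

```kotlin
import kotlinx.coroutines.*
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext

// Summarizes a few well-known elements of a context.
fun describe(context: CoroutineContext): String {
    val name = context[CoroutineName]?.name ?: "unnamed"
    val hasJob = context[Job] != null
    // Dispatchers are stored under the ContinuationInterceptor key.
    val hasDispatcher = context[ContinuationInterceptor] != null
    return "name=$name, job=$hasJob, dispatcher=$hasDispatcher"
}

// Elements are combined with +; a later element replaces
// an earlier one that has the same key.
fun renamed(context: CoroutineContext, newName: String): CoroutineContext =
    context + CoroutineName(newName)
```

For example, describe(Dispatchers.Default + CoroutineName("worker")) finds a name and a dispatcher but no Job, because nobody put one in that context.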

On the other hand, a CoroutineScope is like a “manager and a tracker for coroutines”. It is the entity that can create background coroutines using coroutine builder functions defined as extension functions on this interface (like launch).

To fulfill its job of creating such coroutines and keeping track of them, the CoroutineScope basically needs two things:

  • A Job instance to keep track of the children coroutines (to enforce structured concurrency).
  • Some optional coroutine context elements that we want to propagate to all children coroutines created inside this scope (e.g. the Dispatcher).

It turns out that both the requirements above can be satisfied by having a single coroutineContext property:

  • The coroutineContext property can store the Job instance that will serve as a parent to the children coroutines created by this CoroutineScope. All the functions that instantiate a CoroutineScope will create such a Job and put it there.
  • It will also hold any additional context element that we want to propagate to the children coroutines.
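
Both points can be checked with the CoroutineScope() factory function, which adds a Job to the context when none is given. The sketch below uses made-up names (ScopeReport, inspectScope, "parent-scope") and Dispatchers.Default only to stay runnable outside of any UI framework.

```kotlin
import kotlinx.coroutines.*

data class ScopeReport(
    val scopeHasJob: Boolean,
    val childName: String?,
    val childIsTracked: Boolean,
)

fun inspectScope(): ScopeReport {
    // No Job is passed in; the CoroutineScope() factory adds one.
    val scope = CoroutineScope(Dispatchers.Default + CoroutineName("parent-scope"))
    val scopeJob = scope.coroutineContext[Job]

    var childName: String? = null
    val child = scope.launch {
        // The name was never passed to launch: it is inherited from the scope.
        childName = coroutineContext[CoroutineName]?.name
        delay(100)
    }
    // The child's Job shows up among the children of the scope's Job.
    val tracked = scopeJob?.children?.contains(child) == true

    runBlocking { child.join() }
    scope.cancel()
    return ScopeReport(scopeJob != null, childName, tracked)
}
```

So a single coroutineContext property carries both the parent Job and the elements that every child inherits.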

Roman Elizarov explains this concept clearly in his article about the topic. I highly encourage you to read it as well as his other writings about Kotlin ;).

Conclusion

Structured concurrency makes your concurrency-intensive programs safer and more understandable. It frees you from all the boilerplate code that you would otherwise need to write to synchronize your background tasks and make sure you are not leaking any of them.

The next topic of this series will be error handling and cancellation, perhaps one of the trickiest aspects of structured concurrency. Stay tuned if you don’t want to miss out on that one ;)


  1. The lambda receiver is the instance referred to by the this keyword inside the lambda’s block.
  2. This behavior can be overridden using supervisor jobs. We will talk more about this in the second part of this series, on error handling and cancellation.
  3. Quote taken from “Notes on structured concurrency, or: Go statement considered harmful”.