How to Prevent Race Conditions in Coroutines

Thanks to structured concurrency, there are a lot of concerns that we don’t have to handle manually when working with Kotlin coroutines. For example, parent coroutines automatically wait for their children to complete before completing themselves. And when a parent coroutine is cancelled, its children are automatically cancelled, too.

But when it comes to shared mutable state - mutable data that multiple coroutines could update at the same time - there’s nothing magical in the design of coroutines to ensure that the state is updated properly.

To demonstrate this, let’s create 100,000 random orders for a bakery.

import kotlin.random.Random

// A fixed seed makes the generated orders reproducible between runs.
private val random = Random(seed = 546)
private val menu = listOf("🎂", "🥧", "🧁")

val orders = List(100_000) { menu.random(random) }

We’ll also make an enum class for the baked goods, to indicate the price of each. And we’ll need a bake() function to create the baked good.

enum class BakedGood(val cost: Int) {
    CAKE(15), PIE(10), CUPCAKE(5)
}

fun bake(item: String): BakedGood = when (item) {
    "🎂"  -> BakedGood.CAKE
    "🥧"  -> BakedGood.PIE
    "🧁"  -> BakedGood.CUPCAKE
    else -> throw IllegalArgumentException()
}

Now all we need is a coroutine to actually do the baking! We’ll just create a single coroutine that loops over all the orders, and bakes each one, adding the cost to its total revenue for the day.

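import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun main() {
    var total = 0

    // runBlocking() returns only after the baker coroutine has finished.
    runBlocking(Dispatchers.Default) {
        val baker = launch {
            orders.forEach { item ->
                val good = bake(item)
                total += good.cost
            }
        }
    }

    println("Total income: $${String.format("%,d", total)}")
}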

When there’s a single baker coroutine updating the total, everything works just fine. Thanks to our random seed of 546 above, the revenue for the day lands at exactly $1,000,000.

Total income: $1,000,000

But what happens when we split up the orders across two different coroutines? For example, let’s give the first baker the first half of the orders, and create a second baker coroutine to handle the second half.

val baker = launch {
    orders.take(50_000).forEach { item ->
        val good = bake(item)
        total += good.cost
    }
}

val baker2 = launch {
    orders.drop(50_000).forEach { item ->
        val good = bake(item)
        total += good.cost
    }
}

When we run this, we’ll likely get a number much lower than $1,000,000.

Why’s that?

Well, the expression total += good.cost is the same as total = total + good.cost. So to evaluate that expression, Kotlin first reads the value of total, then adds the value of good.cost to the value it read, and finally writes the result back to total.

But since these coroutines are running in parallel, it’s possible that the other coroutine updated the value between the time when total was read and when it was assigned. That update is lost, stomped out by the update that uses the previous value.
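To make the lost update concrete, here’s a small sketch that replays one unlucky interleaving by hand on a single thread. The function name simulateLostUpdate and the starting value of 50 are illustrative choices, not part of the bakery code.

```kotlin
// Replays, step by step, an interleaving in which two coroutines
// both read `total` before either one writes its result back.
fun simulateLostUpdate(): Int {
    var total = 50

    val readByA = total      // Coroutine A reads 50
    val readByB = total      // Coroutine B also reads 50

    total = readByA + 5      // A writes 55
    total = readByB + 10     // B writes 60, computed from its stale read

    return total
}

fun main() {
    println(simulateLostUpdate()) // prints 60, not 65: A's update was lost
}
```

Had the two updates run one after the other, the result would have been 65.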

[Figure: race condition diagram. Coroutine A and Coroutine B both read total = 50. A computes 50 + 5 = 55 and B computes 50 + 10 = 60 from the same stale value, so whichever write lands last overwrites the other.]

This is called a race condition. Even though Kotlin doesn’t automatically prevent race conditions, it does offer several tools to prevent them.

Let’s check ’em out!

Atomics

Atomics were introduced to Kotlin’s standard library in version 2.1.20, in the kotlin.concurrent.atomics package. At the time I’m writing this, they’re still considered experimental, so you’ll need to opt into them with the @OptIn(ExperimentalAtomicApi::class) annotation.

Atomics work great for simple values. If all you’re doing is updating a counter like we’re doing in our bakery code, they can be a very natural fit. To use them, just call the corresponding constructor. Since the total variable has a type of Int, I’ll update it to use an AtomicInt.

var total = AtomicInt(0)

With this change, the total variable no longer has an Int type, but an AtomicInt type. It wraps the underlying Int value, so when we want to update that value, we can use functions like fetchAndAdd() or compareAndSet(). But total itself no longer needs to change, so we can replace var with val.

val total = AtomicInt(0)

To read the underlying value, we can just call load().

println("Total income: $${String.format("%,d", total.load())}")
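Here’s a quick sketch of how those functions behave on a standalone AtomicInt, assuming Kotlin 2.1.20 or later. The function name atomicDemo and the starting value of 50 are arbitrary.

```kotlin
import kotlin.concurrent.atomics.AtomicInt
import kotlin.concurrent.atomics.ExperimentalAtomicApi

@OptIn(ExperimentalAtomicApi::class)
fun atomicDemo(): Int {
    val total = AtomicInt(50)

    // fetchAndAdd() adds to the value and returns what it held beforehand.
    val previous = total.fetchAndAdd(5)
    println("previous = $previous, now = ${total.load()}") // previous = 50, now = 55

    // compareAndSet() only writes when the current value matches the expected one.
    val first = total.compareAndSet(50, 60)  // false: the value is 55, not 50
    val second = total.compareAndSet(55, 60) // true: 55 matches, so 60 is stored
    println("first = $first, second = $second")

    return total.load() // 60
}

fun main() {
    println(atomicDemo())
}
```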

One really cool thing about atomic numbers is that they include an implementation of plusAssign. So the expression total += good.cost still works! We just need to make sure we’ve got the right import. If you’re using IntelliJ IDEA or Android Studio, it should show up automatically.

import kotlin.concurrent.atomics.plusAssign

With these changes, updates to the underlying value of total will be atomic. In other words, the expression total += good.cost is treated as a single operation, so when we run this, we’ll see $1,000,000 each time.
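Putting the pieces together, a self-contained version might look like the sketch below. It assumes Kotlin 2.1.20 or later with kotlinx-coroutines-core on the classpath. To keep it standalone, it sums a plain list of costs rather than the bakery orders, and the function name sumAtomically is hypothetical.

```kotlin
import kotlin.concurrent.atomics.AtomicInt
import kotlin.concurrent.atomics.ExperimentalAtomicApi
import kotlin.concurrent.atomics.plusAssign
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

@OptIn(ExperimentalAtomicApi::class)
fun sumAtomically(costs: List<Int>): Int {
    val total = AtomicInt(0)

    // runBlocking() returns only after both child coroutines have finished.
    runBlocking(Dispatchers.Default) {
        val half = costs.size / 2
        launch { costs.take(half).forEach { cost -> total += cost } }
        launch { costs.drop(half).forEach { cost -> total += cost } }
    }

    return total.load()
}

fun main() {
    // 100,000 items at a cost of 5 each
    println(sumAtomically(List(100_000) { 5 })) // 500000
}
```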

StateFlow

There are other types that support atomic operations, too. StateFlow is one of them. In fact, the changes required to use a StateFlow are similar to those we had to make to use AtomicInt.

To start with, we can change total to a MutableStateFlow.

val total = MutableStateFlow(0)

As with atomics, there are a few functions for StateFlow that support atomic operations (they’re technically extension functions on MutableStateFlow). Let’s wrap our critical section - the part that updates the total - with a call to the update() function.

val baker = launch {
    orders.take(50_000).forEach { item ->
        val good = bake(item)
        total.update { it + good.cost }
    }
}

val baker2 = launch {
    orders.drop(50_000).forEach { item ->
        val good = bake(item)
        total.update { it + good.cost }
    }
}

And since the StateFlow wraps the underlying value, we’ll need to extract it with the value property.

println("Total income: $${String.format("%,d", total.value)}")

Although this technically works, the whole point of a Flow is to have its values collected over time. In this code, we don’t have that need, so I wouldn’t recommend it here. But if you’re doing this kind of work inside a ViewModel in a Compose app, for example, StateFlow will be the natural choice.
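To see why update() holds up under contention, it can help to sketch the retry loop behind it. The retryingUpdate() function below is a hypothetical stand-in that I wrote to illustrate the idea, built on MutableStateFlow’s compareAndSet(). It isn’t meant as a copy of the library’s source, and in real code you’d just call update().

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper that mimics the idea behind update():
// read, compute, and retry if another coroutine changed the value in between.
fun <T> MutableStateFlow<T>.retryingUpdate(transform: (T) -> T) {
    while (true) {
        val current = value
        val next = transform(current)
        // compareAndSet() writes `next` only if the value still equals `current`.
        if (compareAndSet(current, next)) return
    }
}

fun sumWithStateFlow(costs: List<Int>): Int {
    val total = MutableStateFlow(0)

    runBlocking(Dispatchers.Default) {
        val half = costs.size / 2
        launch { costs.take(half).forEach { cost -> total.retryingUpdate { it + cost } } }
        launch { costs.drop(half).forEach { cost -> total.retryingUpdate { it + cost } } }
    }

    return total.value
}

fun main() {
    println(sumWithStateFlow(List(100_000) { 5 })) // 500000
}
```

Because a failed compareAndSet() simply triggers another attempt, the transform lambda can run more than once, so it should be free of side effects.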

Mutex: Mutual Exclusion

When I was in elementary school, only one student was allowed to leave the classroom at a time. The teacher had a hall pass that she would give me whenever I needed to go to the library, for example. While I was out with the hall pass, no other students from my class could leave. When I came back, I returned the hall pass, which the teacher could then give to the next student who needed something outside the classroom.

The idea behind the hall pass is mutual exclusion, or mutex for short. If I was in the hall, then it excluded any other student from being in the hall.

The kotlinx.coroutines library includes a type called Mutex, which grants a hall pass - or a lock, as we call it - to a coroutine. We can wrap updates to our total variable with a mutex in order to make sure only one coroutine at a time can touch it.

To use a mutex, we can just call its factory function like this:

var total = 0
val mutex = Mutex()

Then, inside our coroutines, we wrap the total update with a lock and unlock.

mutex.lock()
total += good.cost
mutex.unlock()

Once a coroutine has locked the mutex, any other coroutines that try to lock it will suspend, waiting for the mutex to become unlocked again. This prevents simultaneous updates.

The expression total += good.cost is low risk, but it’s a good idea to make sure the mutex is unlocked even if an exception is thrown. The result would look like this.

mutex.lock()
try {
    total += good.cost
} finally {
    mutex.unlock()
}

That’s a lot to write! Thankfully, Mutex includes a function named withLock() that handles all this for us.

mutex.withLock { total += good.cost }

Much better!

Mutexes are powerful, but you need to use them carefully, especially when calling the lock() and unlock() functions directly. If we forget to release a lock, the app could get stuck.
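For reference, here’s a self-contained sketch of the withLock() approach, assuming kotlinx-coroutines-core is on the classpath. It sums a plain list of costs instead of the bakery orders so that it stands on its own, and the function name sumWithMutex is hypothetical.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

fun sumWithMutex(costs: List<Int>): Int {
    var total = 0
    val mutex = Mutex()

    runBlocking(Dispatchers.Default) {
        val half = costs.size / 2
        launch {
            costs.take(half).forEach { cost ->
                // withLock() acquires the lock, runs the block, and always releases the lock.
                mutex.withLock { total += cost }
            }
        }
        launch {
            costs.drop(half).forEach { cost ->
                mutex.withLock { total += cost }
            }
        }
    }

    return total
}

fun main() {
    println(sumWithMutex(List(100_000) { 5 })) // 500000
}
```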

Confinement: limitedParallelism

Any given thread can only do one thing at a time. So, as long as all updates to a variable happen on a single thread, we won’t get race conditions. Likewise, a coroutine can only do one thing at a time, so we can similarly prevent race conditions by running all updates through a single coroutine.

That’s the idea behind confinement - we restrict a variable’s access to something that can only run one instruction at a time.

To use confinement in Kotlin, one approach is to create a dispatcher that has a single thread.

val synchronized = newSingleThreadContext("synchronized")

Then we can wrap our critical section with that coroutine context.

withContext(synchronized) { total += good.cost }

The problem here is that newSingleThreadContext() requires you to manually close it to release the native resources. That’s why it’s marked as a delicate API.
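If you do go this route, one way to make sure the dispatcher gets closed is to wrap it in use(). The sketch below assumes the JVM, where the returned dispatcher implements Closeable, and it opts in to both markers in case your kotlinx.coroutines version requires them. The function name sumOnSingleThread is hypothetical.

```kotlin
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.launch
import kotlinx.coroutines.newSingleThreadContext
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

@OptIn(DelicateCoroutinesApi::class, ExperimentalCoroutinesApi::class)
fun sumOnSingleThread(costs: List<Int>): Int {
    var total = 0

    // use() closes the dispatcher, and with it the thread, even if the block throws.
    newSingleThreadContext("synchronized").use { confined ->
        runBlocking(Dispatchers.Default) {
            val half = costs.size / 2
            launch {
                costs.take(half).forEach { cost ->
                    // Every update to total runs on the one confined thread.
                    withContext(confined) { total += cost }
                }
            }
            launch {
                costs.drop(half).forEach { cost ->
                    withContext(confined) { total += cost }
                }
            }
        }
    }

    return total
}

fun main() {
    println(sumOnSingleThread(List(10_000) { 5 })) // 50000
}
```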

Instead, most Kotlin developers use limitedParallelism(1). This ensures the dispatcher only allows a single coroutine to run on it at any given moment.

val synchronized = Dispatchers.Default.limitedParallelism(1)

The use site remains the same - just wrap the critical section with a call to withContext(synchronized).

Although limitedParallelism() doesn’t guarantee confinement to a particular thread, it does guarantee that only the specified number of coroutines (in the code above, we passed 1) will run on that dispatcher at the same time, so it has the same effect.

I’ll admit this approach is growing on me. For a long time, I preferred a mutex, because its purpose is so clear - there are many reasons we might change the context using withContext(), but when you see a mutex.withLock(), it’s obvious why it’s there.

Still, the limitedParallelism() approach is effective, and there’s no risk of forgetting to release a lock. For many Kotlin developers, this is their go-to solution for managing state across coroutines.
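Here’s a self-contained sketch of the limitedParallelism(1) approach, assuming a reasonably recent kotlinx-coroutines-core (older versions may flag limitedParallelism() as experimental). As before, it sums a plain list of costs, and the names sumConfined and confined are hypothetical.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

fun sumConfined(costs: List<Int>): Int {
    var total = 0
    // A view of Dispatchers.Default that runs at most one coroutine at a time.
    val confined = Dispatchers.Default.limitedParallelism(1)

    runBlocking(Dispatchers.Default) {
        val half = costs.size / 2
        launch {
            costs.take(half).forEach { cost ->
                // Only the update itself hops onto the confined dispatcher.
                withContext(confined) { total += cost }
            }
        }
        launch {
            costs.drop(half).forEach { cost ->
                withContext(confined) { total += cost }
            }
        }
    }

    return total
}

fun main() {
    println(sumConfined(List(100_000) { 5 })) // 500000
}
```

Unlike the single-thread dispatcher, there’s nothing to close here, since the view borrows threads from Dispatchers.Default.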

Confinement: Actors

As I mentioned earlier, as long as only a single coroutine can update the state, there’s no risk of race conditions, because a single coroutine can only do one thing at a time.

A common way to confine state to a single coroutine is with an actor.

The current implementation of the Actor API is marked as obsolete, but it’s been stable. I haven’t seen any progress on newer implementations yet. So, as long as you’re willing to update your actor code when a new implementation is released, they’re worth a look.

With an actor, all state changes happen as the result of messages that are sent to it. For example, here are a few messages - one to increase the total, and one to print it out.

sealed interface TotalMessage {
    data class Increase(val amount: Int) : TotalMessage
    data object Print : TotalMessage
}

With the messages in place, we can create an actor like this.

val total = actor<TotalMessage> {
    var amount = 0

    for (message in channel) {
        when (message) {
            is TotalMessage.Increase -> amount += message.amount
            is TotalMessage.Print    -> println("Total income: $${String.format("%,d", amount)}")
        }
    }
}

This creates a coroutine that runs the actor’s block of code, which loops over messages. The coroutine suspends, waiting for a new message to arrive. When it does, it resumes, and performs the work required for that message.

As you might recall, whenever we create a coroutine in Kotlin, we have to create it on a CoroutineScope, and the actor() function is no exception to that rule. It’s actually an extension function on CoroutineScope, so the call to actor() above only works when it’s nested inside a block of code where the receiver object is a CoroutineScope, such as inside a runBlocking() lambda. (See the full code listing below.)

To get this actor to update its amount, we need to send it the Increase message. Let’s update our baker coroutines to do that.

val baker = launch {
    orders.take(50_000).forEach { item ->
        val good = bake(item)
        total.send(TotalMessage.Increase(good.cost))
    }
}

And when we’re ready to print the total, we can send the Print message.

total.send(TotalMessage.Print)

To put all of this in context, here’s the full main() function that makes this all work.

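// The explicit Unit return type is needed because close() on the last line returns a Boolean,
// and main() must return Unit to be recognized as the program's entry point.
@OptIn(ObsoleteCoroutinesApi::class)
fun main(): Unit = runBlocking(Dispatchers.Default) {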
    val total = actor<TotalMessage> {
        var amount = 0

        for (message in channel) {
            when (message) {
                is TotalMessage.Increase -> amount += message.amount
                is TotalMessage.Print    -> println("Total income: $${String.format("%,d", amount)}")
            }
        }
    }

    coroutineScope {
        val baker = launch {
            orders.take(50_000).forEach { item ->
                val good = bake(item)
                total.send(TotalMessage.Increase(good.cost))
            }
        }

        val baker2 = launch {
            orders.drop(50_000).forEach { item ->
                val good = bake(item)
                total.send(TotalMessage.Increase(good.cost))
            }
        }
    }

    total.send(TotalMessage.Print)
    total.close()
}

You might have also noticed that I’m calling close() on the actor at the end. Because of structured concurrency, parent coroutines wait for their children to complete before they complete themselves. That means the coroutine created by runBlocking will not complete until the actor’s coroutine completes. And the actor’s coroutine won’t complete until we close the channel that it’s using. So without that line, the program will hang.

On the whole, I’m not a big fan of actors. Compare this code to the atomics approach we used earlier - there’s just a lot more code involved here.

In cases where I’m already using channels and message-passing - for example, if I’m building out a workflow using coroutines - then an actor could fit in nicely. But for most general-purpose use cases, I tend to choose one of the simpler options.
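If the obsolete status of actor() is a concern, the same single-coroutine confinement can be sketched with a plain Channel and an ordinary coroutine. To be clear, this is a substitute technique rather than the actor API itself, and the names sumViaChannel and increases are hypothetical.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun sumViaChannel(costs: List<Int>): Int = runBlocking(Dispatchers.Default) {
    val increases = Channel<Int>()

    // The only coroutine that ever touches `amount`.
    val grandTotal = async {
        var amount = 0
        for (cost in increases) amount += cost // loop ends once the channel is closed
        amount
    }

    // coroutineScope() returns only after both senders have finished.
    coroutineScope {
        val half = costs.size / 2
        launch { costs.take(half).forEach { cost -> increases.send(cost) } }
        launch { costs.drop(half).forEach { cost -> increases.send(cost) } }
    }

    increases.close()  // lets the summing coroutine finish its loop
    grandTotal.await()
}

fun main() {
    println(sumViaChannel(List(100_000) { 5 })) // 500000
}
```

Returning the result through async() also removes the need for a separate Print message.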

Avoiding Shared Mutable State

In the example code we’ve been using, we’ve been calculating the revenue for a bakery. That’s all our project does. It’s not a long-running app with user data that changes over time. It just calculates a number.

In cases like these, it’s often possible to avoid shared mutable state entirely! For example, here’s a version of the code that has no mutable variables at all.

fun main() {
    val total = runBlocking(Dispatchers.Default) {
        val subtotal1 = async {
            orders.take(50_000).sumOf { item -> bake(item).cost }
        }

        val subtotal2 = async {
            orders.drop(50_000).sumOf { item -> bake(item).cost }
        }

        subtotal1.await() + subtotal2.await()
    }

    println("Total income: $${String.format("%,d", total)}")
}

Rather than each coroutine adding to a shared total, it’s only responsible for adding up the total of its own work. Then, once both coroutines have completed their work, we add up the totals from each coroutine to get the grand total.

Since subtotal1.await() + subtotal2.await() is the last expression of runBlocking’s lambda, that’s what runBlocking returns, so we simply assign it directly to the total variable.
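The same idea extends beyond two coroutines. Here’s a sketch that splits the work into any number of chunks and combines the subtotals with awaitAll(). The function name parallelSum and its workers parameter are hypothetical, and it sums a plain list of costs so that it stands on its own.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.runBlocking

fun parallelSum(costs: List<Int>, workers: Int): Int = runBlocking(Dispatchers.Default) {
    require(workers > 0) { "workers must be positive" }

    // Round up so that every item lands in one of at most `workers` chunks.
    val chunkSize = ((costs.size + workers - 1) / workers).coerceAtLeast(1)

    costs.chunked(chunkSize)
        .map { chunk -> async { chunk.sum() } } // each coroutine sums only its own chunk
        .awaitAll()                             // wait for every subtotal
        .sum()                                  // combine them into the grand total
}

fun main() {
    println(parallelSum(List(100_000) { 5 }, workers = 4)) // 500000
}
```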

Which Should You Choose?

That’s a round-up of common approaches to sharing state across coroutines in Kotlin. With so many options to choose from, how can you know which is the best for your use case?

  • If you can avoid sharing mutable state entirely, that’s usually the best way to go. It completely eliminates the risk of race conditions.
  • When that’s not possible, I tend to go with the simplest option that works. For basic types like numbers, atomics can be a great fit. If my app needs to react to state changes as they occur, StateFlow will be the obvious choice.
  • For more complex updates, I’m fine with either mutexes or limitedParallelism(1).
  • And as I mentioned, I’m not a big fan of actors, but some developers absolutely love them.

So even though coroutines don’t automatically prevent race conditions, Kotlin gives us several great options to deal with them.