How Python Keeps Your Queues Thread-Safe
If you’ve ever done much work with multithreading in Python, you probably know that Python’s built-in queues are thread-safe.
But have you ever wondered how this all works behind the scenes?
In this article, we’ll take a quick peek beneath the hood to reveal exactly how Python keeps its queues thread-safe (hint: it’s not just the GIL!).
What is “Thread Safety” anyway?
Before we dive into the guts of the Python standard library, let’s do a quick recap on what thread safety is.
If you’re already a multithreading expert and just want to dive into the CPython source, feel free to skip this section!
A data structure (like a queue!) is considered “thread safe” if it can be accessed and manipulated by multiple threads while maintaining correct behavior. Of course, the natural follow-up question is: what exactly is it about working with multiple threads that leads to incorrect behavior?
Well, unlike in standard single-threaded workflows, threads can be swapped out and interrupted by the OS at any time, whether or not the swap is convenient for you. Moreover (outside of the Python GIL at least), multiple threads can potentially run concurrently on multiple CPUs.
As a result, a lot of the complexity of working with multiple threads comes from handling race conditions, i.e. behavior that changes depending on the (non-deterministic) order in which the program’s threads happen to execute.
But what does this actually mean? Personally, I find the easiest way to understand thread safety is by counterexample.
Consider the following toy problem. Suppose we’re processing a large sequence of text data that we need to fetch from an external API and place on a (very non-thread safe) queue for further processing. With just one thread, our setup might look like this:
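In rough code, that single-threaded setup might be sketched like the snippet below. Note that fetch_next_word() is a hypothetical stand-in for the external API call, and a plain list plays the role of our very non-thread-safe queue:

def fetch_next_word():
    ...  # hypothetical: call the external API and return the next word (or None when done)

queue = []  # a plain list standing in for our very non-thread-safe queue

while True:
    word = fetch_next_word()
    if word is None:
        break
    for char in word:        # write the word to the queue, one character at a time
        queue.append(char)
    queue.append(" ")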
To speed things up, we might decide to use multiple threads to fetch data from the external API, with each worker writing its results to the same shared queue.
Now we can fetch words from the API nearly twice as fast! Unfortunately, there’s no such thing as a free lunch.
Recall that the OS is liable to swap out our threads at any time. Let’s say that the OS decided to swap from Worker 1 to Worker 2 before Worker 1 was finished writing “world” to the queue. Then the letters of “world” could end up interleaved with whatever Worker 2 was writing at the same time: if Worker 2 was writing “dolor”, the queue might end up holding something like “wdoorllodr”.
Oh no! Instead of greeting the world, we’re now left with a meaningless alphabet soup!
Ending up with a random string isn’t even the worst issue we could face. Here, Worker 1 at least managed to get the letter “w” out before it was rudely interrupted by the OS. In general however, we can’t count on this. If the worker had only written some of the bytes needed to represent a specific character to the queue, we could end up with data that is too corrupt to be properly parsed as text!
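As a quick illustration of that byte-level corruption (this snippet isn’t part of the queue example above, just a demonstration of how a half-written UTF-8 character becomes unreadable):

encoded = "é".encode("utf-8")   # "é" is two bytes in UTF-8: b'\xc3\xa9'
partial = encoded[:1]           # suppose only the first byte made it onto the queue
partial.decode("utf-8")         # raises UnicodeDecodeError: the data can no longer be parsed as text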
The Solution: Use a Mutex!
So how do we get around this issue?
Well, the standard answer is to use a mutex, or lock. A mutex (short for “mutual exclusion”) is essentially a flag that one (and only one) thread can hold at a given point in time. Let’s add a mutex to the prior example and see how it helps us avoid the data corruption we saw above.
We’ll integrate the mutex by enforcing the following constraints in our code:
- Before any Worker writes data to the queue, it must “acquire” the mutex.
- If the Worker fails to acquire the mutex (i.e. because another worker acquired it first), it needs to wait for that worker to release the mutex and try to acquire it again later.
- When the worker with the mutex has finished writing an element to the queue, it will release the mutex to allow other threads to write to the queue.
In pseudocode, it might look something like this:
mutex = Mutex()

if data ready to write:
    try to acquire mutex
    while mutex not acquired:
        wait for mutex to be released, then try to acquire it again
    write data to queue
    release mutex
    fetch next item and repeat
Now, if Worker 2 tries to write “dolor” to the queue before Worker 1 has finished writing “world,” Worker 1 will still hold the mutex. So, when Worker 2 attempts to acquire the mutex, it will fail and wait before proceeding. This prevents Worker 2 from writing to the queue and corrupting Worker 1’s in-progress message. Only once Worker 1 has finished writing its message and released the mutex can Worker 2 finally acquire the mutex and write its data to the queue.
Thanks to this mutex setup, our two workers can now safely write to the queue without corrupting each other’s data!ᵃ
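In runnable (if simplified) Python, the same idea might look like the sketch below; the hard-coded word lists just stand in for data fetched from the API:

import threading

queue = []                  # our toy, very non-thread-safe queue
mutex = threading.Lock()    # a single flag shared by both workers

def worker(words):
    for word in words:              # "fetch" the next item
        with mutex:                 # acquire the mutex, or wait until it's released
            for char in word:       # write the data, one character at a time
                queue.append(char)
            queue.append(" ")
        # the mutex is released here, letting the other worker write

w1 = threading.Thread(target=worker, args=(["Hello", "world"],))
w2 = threading.Thread(target=worker, args=(["lorem", "ipsum", "dolor"],))
w1.start(); w2.start()
w1.join(); w2.join()

print("".join(queue))   # whole words may interleave, but the characters within a word never will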
The Notorious G.I.L. — Python’s Mega Lock
Of course, thread safety isn’t only an issue when working with queues. Once you move outside of a single-threaded context, all sorts of thorny issues can arise.
Python’s standard implementation (CPython) is no exception. Plenty of the code that underlies Python (in particular, critical memory management and garbage collection routines) is not naturally thread safe. So how does Python get around this issue?
With a mutex of course!
To keep the code underlying your Python programs thread safe, CPython uses a mutex known as the Global Interpreter Lock, or GIL for short. While we won’t dive into the details of the GIL here, essentially, the GIL ensures that only one Python bytecode instruction can run at any point in time. This prevents multiple Python instructions running in different threads from creating nasty race conditions.ᵇ
Back to Queues — Why isn’t the GIL enough?
You might be wondering whether we can just rely on the GIL to provide thread safety for Python queues. The answer is kind of, but it’s not that simple.
The GIL does make certain multithreading issues impossible. Take the example from above, in which one Worker was able to interrupt another Worker that was in the middle of writing data to the queue. The GIL would actually prevent this issue, since appending an item to a queue boils down to a single Python bytecode instruction (a call into the C code behind append), which another thread can’t interrupt partway through.
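If you’re curious, you can peek at the bytecode yourself with the dis module (the exact instruction names vary between Python versions, so the output is omitted here):

import dis
from collections import deque

def put(q, item):
    q.append(item)

# The disassembly shows a few instructions to load q, append, and item,
# followed by a single call instruction. The append itself happens inside
# that one call into C code, so another thread can't cut in halfway through it.
dis.dis(put)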
However, there are other race conditions we need to consider when dealing with a queue. Consider the following example:
Suppose we have one “producer” thread that writes data to a queue and two “consumer” threads that consume data from that queue for processing.
To be safe, we probably want each consumer to check whether the queue is empty before attempting to fetch data from it, otherwise we could get a nasty exception.
So, our pseudocode for one of the consumers might look like this:
while there is still data to process:
    check if queue is empty
    if not empty:
        data = queue.get()
        process_data(data)
Seems fine, no? Well, most of the time it will be, but consider the following unfortunate sequence of events:
1. Queue is empty (queue = [])
2. Producer writes “Hello” (queue = ["Hello"])
3. Consumer 1 checks if the queue is empty -> queue is not empty (queue = ["Hello"])
4. OS interrupts Consumer 1 and switches to Consumer 2 (queue = ["Hello"])
5. Consumer 2 checks if the queue is empty -> queue is not empty (queue = ["Hello"])
6. Since the queue was not empty in step 5, Consumer 2 proceeds to get an element from the queue (queue = [])
7. OS interrupts Consumer 2 and switches to Consumer 1 (queue = [])
8. Since the queue was not empty in step 3, Consumer 1 attempts to get an element from the queue (queue = [])
9. Error! Tried to get an element from an empty queue!
The core issue here is that we need multiple Python bytecode instructions to 1) check whether the Queue is empty and 2) fetch an element from the Queue. So even in the context of the GIL, the OS can interrupt our threads’ execution between the empty check and fetching the element, giving us a frustrating error!
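One crude fix is to drop the empty check entirely and just handle the error, sketched below with a bare deque standing in for the queue (the back-off delay and the process_data helper are arbitrary choices for illustration):

import time
from collections import deque

queue = deque()

def process_data(data):     # stand-in for real processing
    print(data)

def consume():
    while True:
        try:
            # popleft() raises IndexError if another consumer emptied the queue first
            data = queue.popleft()
        except IndexError:
            time.sleep(0.01)        # nothing to do yet; back off and retry
            continue
        process_data(data)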
Of course, you could just catch the error and wait, but is there a better way to solve this problem? To answer that, we’ll need to dive into the internals of the standard library Queue implementation.
How the Queue module solves its multithreading issues
So how does the Queue module actually work under the hood?
Internally, Python’s standard Queue implementation is built on top of the deque data structure from the collections module. When you instantiate a Queue
object in Python, Python actually just creates a new deque for you:
# cpython/Lib/queue.py
# (https://github.com/python/cpython/blob/3.11/Lib/queue.py)
from collections import deque
...
class Queue:
'''Create a queue object with a given maximum size.
If maxsize is <= 0, the queue size is infinite.
'''
def __init__(self, maxsize=0):
self.maxsize = maxsize
self._init(maxsize)
...
...
# Initialize the queue representation
def _init(self, maxsize):
self.queue = deque()
The deque is a container that can be used to implement both stacks and queues, since it allows pops and inserts from both ends. When you get
or put
an item on the Queue, you’re actually just using the append
and popleft
methods of the underlying deque:
# cpython/Lib/queue.py
# (https://github.com/python/cpython/blob/3.11/Lib/queue.py)
class Queue:
...
# Put a new item in the queue
def _put(self, item):
self.queue.append(item)
# Get an item from the queue
def _get(self):
return self.queue.popleft()
...
Critically, the append
and popleft
methods of the underlying deque container are thread safe, so we don’t run into any issues where two threads are trying to write data to the queue at the same time and corrupt each other. Per the standard library docs:
Deques support thread-safe, memory efficient appends and pops from either side of the deque with approximately the same O(1) performance in either direction.¹
Internally, the deque relies on the GIL to provide this thread safety guarantee.
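You can convince yourself of this with a quick experiment (the thread and item counts are arbitrary): several threads hammer append on the same deque, and nothing gets lost:

import threading
from collections import deque

d = deque()

def append_many(n):
    for i in range(n):
        d.append(i)    # atomic with respect to other threads' appends and pops

threads = [threading.Thread(target=append_many, args=(100_000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(len(d))  # always 400000: no appends were lost or corrupted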
Of course, this still doesn’t resolve the thread safety issue we noted above. If you try to call popleft
on an empty deque, you’ll get an IndexError
:
>>> from collections import deque
>>> d = deque()
>>> d.popleft()
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
IndexError: pop from an empty deque
How does the Queue solve this problem?
Well, the astute reader may have noticed that the _get
and _put
methods from above are not actually part of the public interface of the Queue class.ᶜ The public methods Queue.get()
and Queue.put()
are wrappers around the _get
and _put
methods that implement additional thread safety logic.
Let’s take a look at the source code for get()
:
# cpython/Lib/queue.py
# (https://github.com/python/cpython/blob/3.11/Lib/queue.py)
class Queue:
'''Create a queue object with a given maximum size.
If maxsize is <= 0, the queue size is infinite.
'''
def __init__(self, maxsize=0):
self.maxsize = maxsize
self._init(maxsize)
...
self.mutex = threading.Lock() # Init mutex
...
self.not_empty = threading.Condition(self.mutex)
...
def get(self, block=True, timeout=None):
...
with self.not_empty: # Step 1: Try to acquire mutex here
if not block:
if not self._qsize():
raise Empty
elif timeout is None:
while not self._qsize(): # Step 2
# Steps 3-5: Mutex released here to wait for an element.
# When an element is ready, mutex is reacquired and
# execution resumes.
self.not_empty.wait()
elif timeout < 0:
raise ValueError("'timeout' must be a non-negative number")
else:
endtime = time() + timeout
while not self._qsize():
remaining = endtime - time()
if remaining <= 0.0:
raise Empty
self.not_empty.wait(remaining)
item = self._get()
self.not_full.notify() # Step 6
return item
# Mutex released after exiting the with block
...
Sure enough, the queue module uses an additional mutex to protect the Queue!
The threading.Condition
syntax might look a bit confusing, but it’s really just a special way to interact with the underlying mutex. We can trace the path of execution as follows:
1. When you run with self.not_empty:, the current thread attempts to acquire self.mutex (or waits if it’s held by another thread).
2. The thread holding self.mutex proceeds to check if the queue is not empty. Note that no other threads can get or put to the queue at this time, since they would need to hold the mutex to do so.
3. If the queue is empty, the current thread stops at self.not_empty.wait() and releases the lock, potentially allowing other threads to put new elements on the queue.
4. When a new element is added to the queue, self.not_empty.notify() is called (see below), allowing threads that were waiting on this condition to proceed.
5. One of the waiting threads wakes up and reacquires the same mutex. Since we now know that the queue is not empty, we can proceed to pop an item from the queue. Note that no other threads can pop an item from the queue, as they would need to hold the mutex to proceed.
6. After we’ve gotten the item, the consumer thread notifies waiting threads that the queue is no longer full, returns the item, and releases the mutex.
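In practice, this is what lets a consumer simply call get() and block until data arrives, with no empty check and no IndexError. A minimal sketch (the one-second delay is arbitrary):

import threading
import time
from queue import Queue

q = Queue()

def consumer():
    item = q.get()       # blocks (via not_empty.wait()) until the producer puts something
    print(f"got {item!r}")

t = threading.Thread(target=consumer)
t.start()

time.sleep(1)            # the consumer waits patiently in the meantime
q.put("Hello")           # put() calls not_empty.notify(), waking the consumer
t.join()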
What about put?
Of course, get
isn’t the only queue method that runs into thread safety issues. The standard Queue
class supports an optional maxsize
argument that allows you to limit the number of elements in the queue at any point in time.
In exactly the same way that two threads might call get
on a potentially empty queue, we can get into trouble if two threads try to put
to a potentially full queue.
Luckily, we can rely on an almost identical pattern to prevent this behavior, with the not_empty
Condition replaced by not_full
:
# cpython/Lib/queue.py
# (https://github.com/python/cpython/blob/3.11/Lib/queue.py)
class Queue:
...
def __init__(self, maxsize=0):
self.maxsize = maxsize
self._init(maxsize)
...
self.mutex = threading.Lock()
...
# Based on same mutex as self.not_empty
self.not_full = threading.Condition(self.mutex)
def put(self, item, block=True, timeout=None):
...
with self.not_full: # Attempt to acquire mutex here
if self.maxsize > 0:
if not block:
if self._qsize() >= self.maxsize:
raise Full
elif timeout is None:
while self._qsize() >= self.maxsize:
# Wait here if the queue is full
self.not_full.wait()
elif timeout < 0:
raise ValueError("'timeout' must be a non-negative number")
else:
endtime = time() + timeout
while self._qsize() >= self.maxsize:
remaining = endtime - time()
if remaining <= 0.0:
raise Full
self.not_full.wait(remaining)
self._put(item)
self.unfinished_tasks += 1
# Notify waiting consumer threads here after
# the item has been put
self.not_empty.notify()
Critically, the get
and put
methods share the same mutex (self.mutex
). So, if one thread is attempting to put
an element on the Queue
, no other thread can attempt to get
from the queue (and vice versa). Only when the mutex is released (either while a thread waits on the not empty/not full condition or when the method returns) can other threads modify the queue. In addition to preventing get
from interfering with put
, this shared mutex allows for higher level coordination between threads using the join
API.ᵈ
So, the Queue module protects you from race conditions with a variant of the strategy introduced above — using a mutex to control when threads can or cannot take specific actions. By sharing a single mutex across the get
and put
operations, the Queue module coordinates action between threads working on different parts of the queue, which elegantly solves both the “get while empty” and the “put while full” issues.
Conclusion
Of course, this isn’t the only way the Queue module could have been written. In fact, this Queue
is slower than its underlying deque container since it locks down the entire queue whenever a get
or put
is in progress. Is there a way the queue module could have been written to get similar performance to the underlying deque?
Well, one perfectly valid approach would be to leave thread safety to the programmer. In this alternate implementation, the programmer (you) would decide how much locking you need based on the specifics of the problem. If you’re working on a task that doesn’t require the strict locking protocol implemented above, you’d likely see significantly better performance!
So why not leave it all to the programmer?
Well, I’d argue that requiring the programmer to implement multithreading safeguards for a simple Queue
would go against the philosophy of Python. Python has always sought to be a “batteries included” language, with a rich standard library that allows you to focus on the complexity of your specific task by taking away the burden of implementing common boilerplate.
Sometimes, this comes at the cost of reduced performance. In return however, you get increased readability (the code you write doesn’t need to worry about locking semantics), faster development (you don’t have to rewrite a locking protocol each time you use a queue), and less exposure to nasty bugs (the Python standard library has been extensively tested by millions of users, and will almost certainly be more reliable than a quick, homegrown locking solution).
Moreover, neglecting to offer robust multithreading support in the “reference implementation” of the Queue would increase the barrier to entry for programmers new to multithreading. The current approach offers programmers new to multithreading a simple “plug and play” interface to one of the most commonly used data structures for working with multiple threads.
Of course, if performance is critical for your use case and you don’t need all the locking from the standard Queue
class, you’re not out of options — you can always use the underlying deque container from the collections module! You’ll still get thread safe append
and popleft
, but your code will probably run faster.
The point here, however, is that there’s a tradeoff between user-friendliness and performance. Python chooses to prioritize a more user-friendly queue (this one gets the name “Queue”), while giving you the option to go lower level and optimize performance if need be.
Ever faced a tricky multithreading bug that you fixed with a mutex? If so, I’d love to hear about it — feel free to shoot me an email at mail@jonbleiberg.com or reply in the comments!
Notes
[a] Of course, the downside of this standard solution is that it’s slower. In particular, now both workers need to spend time acquiring the lock before taking any action. Moreover, while one worker writes to the queue, other workers may be sitting idle waiting for the lock to be released. Things can get even hairier if you need to rely on multiple locks. In that case, you risk running into a “deadlock,” in which Worker 1 needs a lock held by Worker 2 to proceed, but Worker 2 can’t release that lock until it acquires a lock held by Worker 1. For these reasons, “lock free” data structures are an active area of research.
[b] Of course, this added safety does have a downside. Since only one thread can execute a Python instruction at a time, multiple Python threads cannot actually run in parallel. This makes Python threads of limited use for CPU bound tasks like heavy duty mathematical number crunching. If you want truly parallel Python execution, you’ll need to start up multiple processes, which involves significantly more overhead than starting up multiple threads. It’s worth noting that other languages like Java and C++ do not have this restriction, allowing them to potentially schedule multiple threads on different CPUs and run them in parallel. There has been some talk about what it would take to replace the GIL (See PEP 703), but for now, Python multithreading is not a good choice for tackling CPU bound problems.
[c] A single underscore is the convention to denote a private method, even though no methods in Python are truly private.
[d] The join method allows threads to coordinate work by blocking until all tasks on the Queue are complete. For the sake of brevity, we won’t go into the join
method here, but the join
method uses the same mutex as get
and put
to ensure that no additional tasks are put
on the queue while join
unblocks. For more details, see the relevant standard library docs or the CPython source.
References
[1] https://docs.python.org/3.11/library/collections.html#collections.deque