The introduction of the async and await keywords in Python 3.5 is a
continuation of years of progress and evolution in Python’s async networking
support. For a lot of folks, asyncio[1] represents something brand
new to Python. Really, though, async networking has been available in the
standard library since the version 1.5.2 release in 1999. The form it took then
was primitive and unwieldy compared to what's available now, but it formed the
foundation on which asyncio is built today.
My goal is to take you through that journey from 1999 to today. In the process I’ll cover the progression of async networking support in Python and how event loops like asyncio power the new Python syntax. This will be a deep dive but should provide you with a first principles understanding of how async, await, and asyncio work today.
Non-Blocking I/O
The foundation for async networking in most languages, Python included, comes from the operating system. In Unix-like environments, such as Linux, I/O resources are modeled as files, and that includes network sockets. By default, all I/O files start in blocking mode, which means that a call to read or write will block the caller until the operation is complete. As an example, I'll use a modified snippet from the Python standard library documentation for the socket module.
import socket

# Create an internet socket in TCP mode
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# Bind the socket to a local address and port
s.bind(('127.0.0.1', 8888))
# Start accepting new connections, one at a time
s.listen(1)
# Get a client connection.
# This is the first part of the process that noticeably blocks. If there are
# no clients attempting to connect then the process hangs at this line of code.
# Once a client connects then the function returns and the process continues.
conn, addr = s.accept()
while 1:
    # Read up to 1024 bytes from the connection.
    # This is the next part of the code that may block if there is no data
    # available to read.
    data = conn.recv(1024)
    if not data: break
    # Send the input back as an echo.
    # Sending, like receiving, may block depending on the state of the buffers.
    conn.sendall(data)
conn.close()
The above example creates a listening socket that accepts client connections, reads all client input, and then echoes back the input. Each interaction with the socket is potentially blocking in the default mode. "Blocking" in this context refers to waiting for a resource to be available in order to use it. For example, the process cannot accept a new connection if a client is not trying to connect, so it waits, or blocks, until there is one. Likewise, the process cannot receive bytes from a client that hasn't sent any, so it blocks until bytes are available.
The default, blocking nature of sockets can make them difficult to use in concurrent situations. The above code, for example, can only manage one client. The classic approach to adding concurrency for blocking sockets is to use additional threads or child processes. An example of this can be found in the standard library SocketServer[2] module.
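For instance, a thread-per-connection echo server built on that module (shown here with its Python 3 name, socketserver) might look roughly like this:
import socketserver

class EchoHandler(socketserver.StreamRequestHandler):
    def handle(self):
        # Each connection is handled in its own thread, so this blocking
        # read loop only stalls the thread serving this one client.
        while True:
            data = self.rfile.readline()
            if not data:
                break
            self.wfile.write(data)

server = socketserver.ThreadingTCPServer(('127.0.0.1', 8888), EchoHandler)
server.serve_forever()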
Another approach to concurrency is leveraging a socket’s "non-blocking" mode.
import socket

# Create an internet socket in TCP mode
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# Disable blocking mode
s.setblocking(0)
# Bind the socket to a local address and port
s.bind(('127.0.0.1', 8888))
# Start accepting new connections, one at a time
s.listen(1)
# Get a client connection.
# If no client is currently attempting to connect then this call immediately
# raises an exception
conn, addr = s.accept()
# All sockets start in blocking mode, so even a connection received from a
# non-blocking listener must be set to non-blocking explicitly.
conn.setblocking(0)
while 1:
    # Read up to 1024 bytes from the connection.
    # If the client has not written any data to the connection then this call
    # immediately raises an exception.
    data = conn.recv(1024)
    if not data: break
    # Send the input back as an echo.
    # In addition to immediately raising an exception, this call could send
    # only part of the data and then raise an exception.
    conn.sendall(data)
conn.close()
Putting a socket in non-blocking mode prevents read and write calls from blocking. The way it does this, though, is by raising an exception if any socket call would result in blocking. The result is an explicit signal that a socket is not ready which allows a program to stash the operation and do something else until that socket is ready. This is how a program can manage concurrent I/O on a single thread.
However, the only signal that a program gets when using non-blocking sockets is the negative condition: the socket is not ready. There’s no socket function to request its status so implementing a program that can effectively and optimally manage concurrent I/O requires continually attempting to complete any outstanding I/O operation until it succeeds. Practically, writing a program that handles this directly is more complicated than it’s worth.
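Without a readiness signal, the best a program can do is catch the failure and retry. Here's a sketch of that pattern; BlockingIOError is the exception Python 3 raises in this situation (older code checked socket.error for errno.EWOULDBLOCK), and do_other_work is a hypothetical stand-in for any other pending work:
def recv_eventually(conn, size, do_other_work):
    # Keep attempting the read until the socket is ready.
    while True:
        try:
            return conn.recv(size)
        except BlockingIOError:
            # Not ready yet. Do something else and try again later.
            do_other_work()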
To account for this difficulty, there exists a suite of system calls that provide notifications when a file is ready for reading or writing. These tools have evolved over time and include select[3], poll[4], kqueue[5], and epoll[6]. Each of these has their own history and performance characteristics but they all present a similar enough API to generalize their description. These tools work by monitoring specific files and offering a polling based API for getting a list of monitored files that are ready for I/O. To illustrate, I’ll show a modified example from the asyncore[7] module that was originally introduced into the standard library in 1999.
import select

# Lists of socket objects that the program wants to either read from or write to.
want_read = []
want_write = []

def poll(timeout=0.0):
    # This will block up to timeout seconds for any of the requested sockets
    # to become ready. If none are ready by the timeout then it returns with
    # empty results.
    r, w, _ = select.select(want_read, want_write, [], timeout)
    for x in r:
        pass  # x is a read-ready socket
    for x in w:
        pass  # x is a write-ready socket
The polling APIs bridge the gap between blocking and non-blocking. The whole application can block for a configurable amount of time until one or more sockets are ready. It can then operate on any sockets as soon as they are available. In between socket operations the application can perform other logic or go idle again while waiting on more sockets to be ready.
Repeated calls to the poll API in a loop are what async networking frameworks refer to as an event loop, engine, or reactor. The asyncio event loop is more complex than the above example but the technology that drives it and the principles of its operation are exactly the same.
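In its simplest form, that's nothing more than wrapping the poll function above in an infinite loop:
while True:
    # Each pass dispatches any sockets that became ready in the last second.
    poll(timeout=1.0)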
The challenge from here is building a program that coordinates between read/write needs and the polling API. Code written for polling is no longer sequential. Generally, low level async networking is organized as a series of callbacks that execute when I/O readiness is detected. For example, here is an echo server written with asyncore:
import asyncore
import socket

class echosrv(asyncore.dispatcher):
    def __init__(self):
        asyncore.dispatcher.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.bind(('127.0.0.1', 8080))
        self.listen(1)
        self.connections = []
    def handle_accept(self):
        conn, addr = self.accept()
        self.handle_accepted(conn, addr)
    def handle_accepted(self, conn, addr):
        self.connections.append(echo(conn))

class echo(asyncore.dispatcher):
    def __init__(self, sock):
        asyncore.dispatcher.__init__(self, sock)
        self.buffer = b''
    def handle_connect(self):
        pass
    def handle_close(self):
        self.close()
    def handle_read(self):
        self.buffer = self.buffer + self.recv(8192)
    def writable(self):
        return (len(self.buffer) > 0)
    def handle_write(self):
        sent = self.send(self.buffer)
        self.buffer = self.buffer[sent:]

c = echosrv()
asyncore.loop()
In this example, asyncore is doing a lot of heavy lifting through its custom
implementation of the socket methods such as recv and send. Internally,
asyncore is catching and handling exceptions related to operations on an unready
socket. It is also managing an event loop and optionally registering the socket
with a polling API based on attributes like the writable property. For
example, on each pass of the event loop asyncore is checking writable. If it
is true, which happens when the echo server has something in its outgoing
buffer, then asyncore registers the socket with the polling API to alert when
the underlying file is ready for writes. When the polling API returns a signal
for write-readiness then asyncore calls handle_write as a callback.
This callback driven process from 1999 was, for several years, the only way to write async networking code in Python. The first, and longest running, attempt to build on this practice was the Twisted[8] project in 2001. Twisted formalized the callback pattern using "deferreds"[9], a form of promise or future[10], in addition to implementing support for a large number of web protocols like HTTP.
To this day, callbacks remain an integral behavior in Python’s async networking support but you don’t interact with them in this way. A major problem with callback style implementations is that they break the standard programming model of the language. A program is no longer written and executed top to bottom. Managing complex application state through a series of callbacks is challenging and requires different coding practices to maximize success. The key to what makes async networking what it is today is the additional syntax that Python has added over the years. New keywords and expressions provide support for easier state management and a more familiar programming style.
The Progression Of Pause
Consider this example async/await code:
async def get_something():
    something = await get_something_from_network()
    return something
This is effectively pseudo-code but the expected behavior is likely clear. The function needs to get something from the network and uses the await keyword to get it. Once it has a result, the function returns whatever it got.
The expectation in an async world is that the awaited call to
get_something_from_network does not result in blocking if its result is not
immediately available. To put this another way, if the result of
get_something_from_network would block then Python should pause this function
and work on something else until the result is ready. When the result is ready
then Python should resume the function until it returns or would block again.
Being able to pause a Python function when it calls await and only resume it
when the value it’s waiting for is available is the core feature that enables
synchronous looking but actually async networking code. The pause and resume
features of Python have evolved over multiple iterations and years. The
following sections detail the progression of pause/resume and how it became
async/await.
Iterators And Generators
The first concept of pausing came into Python with the introduction of custom
iterators. Iterators were formalized by PEP-234[11] which established
the iterator protocol. This allowed for new types to become compatible with
for and in statements by implementing the __iter__ and next methods (next was renamed to __next__ in Python 3).
For example, an iterator that produces a fixed number of fibonacci sequence
values could now be built like this:
class fibit:
    def __init__(self, limit):
        self.count = 0
        self.limit = limit
        self.a = 0
        self.b = 1
    def __iter__(self):
        return self
    def next(self):
        if self.count >= self.limit:
            raise StopIteration
        self.count = self.count + 1
        value = self.b
        self.a, self.b = self.b, self.a + self.b
        return value
    # Python 3 renamed the iterator protocol method to __next__.
    __next__ = next

for x in fibit(5):
    print(x)  # 1, 1, 2, 3, 5
Of course, fixed size fibonacci could be written before custom iterators. In fact, an arguably simpler implementation would be as a function that produces a list:
def fib(limit):
    results = []
    x = 0
    a = 0
    b = 1
    while x < limit:
        results.append(b)
        a, b = b, a + b
        x = x + 1
    return results

for x in fib(5):
    print(x)  # 1, 1, 2, 3, 5
The downside to this approach, though, is that the logic to compute the set to
be iterated over is eager and blocking. Large values of limit will result in
the program noticeably hanging until the entire list has been computed. The
iterator version of fibonacci operates on a fixed amount of memory and has no
startup cost other than establishing the initial state. From there, it only has
to compute a single value on each call to next.
Iterators bring in support for infinite and/or lazy loaded sequences. From an async networking perspective, iterators introduce the critical concepts of "pause" and "resume" for functions. That is, the iterator version of fibonacci can be considered a resumable form of the function version. They both produce the same outputs but the iterator can be consumed when and as needed. The major downside of the iterator form is that it requires a developer to break down the function logic into discrete steps. If any step relies on state from a previous step, such as the fibonacci example, then that state must be maintained within the instance. The state between steps of a real-world problem is often complex and the more complex the state then the more difficult the management in the iterator form. In a large way, iterator state management mirrors the complexity of callback driven, low level async programming.
To complement iterators and make the state management easier,
PEP-255[12] formalized the concept of a generator. A generator is a
special kind of function definition that produces an iterator when it
is called. Conceptually, generators are functions with one or more explicit
pause and resume points. To indicate the pause/resume points, the yield
keyword was introduced. To illustrate, here is a generator form of a limited
fibonacci:
def fib(limit):
    x = 0
    a = 0
    b = 1
    while x < limit:
        yield b
        a, b = b, a + b
        x = x + 1

for x in fib(5):
    print(x)  # 1, 1, 2, 3, 5
This version is nearly identical to the classic function from earlier except that it yields values rather than returning. All of the state management is the same as the classic function version but the generator form also receives all the same benefits as the manually written iterator. In effect, the above generator is identical in function and performance to the manually written iterator.
From an async networking perspective, being able to pause a function is fairly critical. That pause is where the async portion of the work is intended to happen. The function yields to wait for some resource and is resumed when that resource is ready. However, generators provide only one-way communication with the caller. That is, generators can emit values but cannot accept values from a caller other than when they are initialized. This makes generators one step closer but still sub-optimal for async networking use cases.
The Twisted project found a clever workaround[13] for this by yielding mutable objects and using a few layers of decorators to manage resuming the generator when the yielded object contained a value. It required a bit of boilerplate and adherence to a set of strict rules on how to interact with the async object but a synchronous looking async networking experience was possible even with simple generators.
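As a toy illustration of that idea, a driver can resume the generator after filling in the mutable object it yielded. The names here are invented, and Twisted's real machinery was considerably more involved:
class waitForValue:
    # A mutable placeholder the generator yields and later reads from.
    def __init__(self):
        self.value = None

def driver(gen):
    # Advance the generator, filling in each placeholder before resuming it.
    for placeholder in gen:
        # In a real framework the value would arrive from an I/O callback;
        # here it's resolved immediately to show the control flow.
        placeholder.value = 42

def task():
    result = waitForValue()
    yield result
    # By the time the generator resumes, the placeholder holds a value.
    print(result.value)  # 42

driver(task())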
Generator Coroutines
PEP-342[14] addressed the lack of built-in bidirectional communication
between generators and their callers. The PEP identified generators as being
close to, but not quite, coroutines[15]. A coroutine in the context of
the PEP is a function that can be paused and for which a value may be injected
on resumption. After the modifications, generators remained iterators but gained
some new methods. Notably, generators now have send, throw, and close
methods that inject different values or conditions into the pause point.
As a trivial illustration of the bidirectional communication feature, here is a coroutine that adds five to any value it receives:
def add_five():
    v = 0
    while True:
        v = yield v + 5

coro = add_five()
# Coroutines are often "primed" before they are used.
# This allows them to execute their code up to the first
# yield point where the outside caller can then inject a value.
coro.send(None)
print(coro.send(1))  # 6
print(coro.send(5))  # 10
With coroutines, the yield statement became an expression that returns a
value. The value it returns is whatever the caller provides in the send
method. Additionally, a caller can use throw with an exception and that
exception will be raised at the yield point.
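Here's a small demonstration of throw: the exception surfaces at the paused yield point, where the coroutine can catch it like any other exception and keep running:
def resilient():
    while True:
        try:
            value = yield
            print('got', value)
        except ValueError:
            print('recovered from ValueError')

coro = resilient()
coro.send(None)         # prime to the first yield
coro.send(1)            # got 1
coro.throw(ValueError)  # recovered from ValueError
coro.send(2)            # got 2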
One of the explicit and primary purposes for introducing the coroutine feature was to account for async networking. The PEP offers this as an example of what the authors envisioned:
data = (yield nonblocking_read(my_socket, nbytes))
In this hypothetical example, the result of nonblocking_read is returned to
the coroutine caller when it invokes send. The coroutine then expects that the
caller will inject the final value of the read back into the coroutine with
another call to send. This simplifies the coroutine's interaction with the
result of the yield and standardizes how all Python code can push information
into a coroutine.
Resolving the return value of nonblocking_read to a concrete value is handled
by some kind of coordinator that interacts with callback based networking. I’ve
intentionally not written, so far, about how that system actually resolves that
final value to inject into the paused function. I do cover that later, though.
Sub-Generator Delegation
There's one more syntax change to consider in the evolution leading to async and await: sub-generator delegation. Let's focus, first, on the problem.
The introduction of coroutines offered this line of code as an example of what the authors intended to be possible:
data = (yield nonblocking_read(my_socket, nbytes))
If nonblocking_read returns some special object that a higher level
coordinator can handle then the coordinator can inject the resulting value
back into the coroutine using the send method. However, it’s quite common for
code to have multiple layers of functions on top of low level networking code.
A more complex example might look like this:
class foo:
    def __init__(self, bar):
        self.bar = bar

def get_foo():
    response = yield get_foo_http()

def get_foo_http():
    url = 'https://example.com/foo'
    body = yield http_client_get(url)
    js = json.loads(body)
    yield foo(js['bar'])

def http_client_get(url):
    # create a socket
    buffer = ''
    eof = False
    while not eof:
        data, eof = yield nonblocking_read(my_socket, 1024)
        buffer = buffer + data
    yield buffer
The issue with this kind of code is that yield is not automatically recursive.
When get_foo yields from get_foo_http it doesn’t automatically execute any
logic within get_foo_http.
coro = get_foo()
print(coro) # <generator object get_foo at 0x7f40674b2820>
next_value = coro.send(None)
print(next_value) # <generator object get_foo_http at 0x7f1249bb1870>
another_value = next_value.send(None)
print(another_value) # <generator object http_client_get at 0x7f60afed9870>
This behavior creates complications for systems that intend to manage or coordinate coroutines such as async networking event loops. Practically, an event loop that coordinates coroutine execution must maintain a directed graph that connects every coroutine to the other coroutine that yielded it. As the coordinator reaches a leaf node then it can take the resulting, concrete value and inject it back into the parent coroutine’s yield point. In many ways, the coordinator becomes responsible for managing a call stack. This isn’t an impossible task. The Twisted project, again as an early leader in async Python, implemented this in effect. It could be much easier to manage, though.
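To make that bookkeeping concrete, here is a toy trampoline under the convention used in the example above: every generator yields either a nested generator to descend into or a concrete value that doubles as its final result. This is invented for illustration, but it's roughly the stack management that yield from later absorbed into the language:
import types

def trampoline(gen):
    stack = [gen]
    value = None
    while stack:
        try:
            yielded = stack[-1].send(value)
        except StopIteration:
            stack.pop()
            value = None
            continue
        if isinstance(yielded, types.GeneratorType):
            # Descend into the nested generator.
            stack.append(yielded)
            value = None
        else:
            # A concrete value: treat it as the final result and inject it
            # into the parent generator's yield point.
            stack.pop()
            value = yielded
    return value

def inner():
    yield 41  # pre-"yield from" style: the last yield is the final value

def outer():
    value = yield inner()
    yield value + 1

print(trampoline(outer()))  # 42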
PEP-380[16] defines the yield from expression. This is distinct from
yield because it automatically triggers the unravelling of nested coroutines and
also enables the use of return with a value to create a more natural
expression of setting the coroutine’s final value. The example from above can
now be revised to:
class foo:
    def __init__(self, bar):
        self.bar = bar

def get_foo():
    response = yield from get_foo_http()

def get_foo_http():
    url = 'https://example.com/foo'
    body = yield from http_client_get(url)
    js = json.loads(body)
    return foo(js['bar'])

def http_client_get(url):
    # create a socket
    buffer = ''
    eof = False
    while not eof:
        data, eof = yield from nonblocking_read(my_socket, 1024)
        buffer = buffer + data
    return buffer

print(get_foo().send(None))  # <__main__.foo object at 0x7f7c4f736690>
The automatic unravelling of nested coroutines is called "sub-generator
delegation". When the top-level coroutine yields from another then the outside
caller is unaware. The scope of send and throw automatically target the
active coroutine regardless of how deeply nested it was called.
This is the first language change on the list that is available exclusively in
Python 3. The yield from syntax is already somewhat obsolete for network
related code but the concept of sub-generator delegation is present in
async/await.
Futures
Coroutines and sub-generator delegation go a long way towards making async networking both possible and easier to implement. However, an async system can’t be built on coroutines alone. If every coroutine yielded from another coroutine then it’s turtles all the way down[17]. There has to be some final point where a coroutine offers up a non-coroutine value. In an async networking context, that non-coroutine value is the thing attached to the lower level callbacks. Futures are this non-coroutine value.
The concept of a future also goes by the names promise and deferred. It represents a future value that is not yet available. Most implementations, including the various Python implementations, perform two main functions: recording the final state of the value and executing callbacks when the state is finalized. For example, here’s a partial sample of the asyncio interface for futures:
class Future:
    def result(self):
        # If the future is resolved then return the value.
        # Otherwise, raise a relevant exception.
        ...
    def set_result(self, result):
        # Mark as done and set the value
        ...
    def set_exception(self, exc):
        # Mark as done and set exception value
        ...
    def done(self):
        # Return true if done
        ...
    def add_done_callback(self, callback):
        # Add a callback to execute when the future is marked as done.
        # The callback receives the future as an argument.
        ...
To illustrate how futures bridge between socket callbacks and coroutines, let’s bring back the last yield from example:
class foo:
    def __init__(self, bar):
        self.bar = bar

def get_foo():
    response = yield from get_foo_http()

def get_foo_http():
    url = 'https://example.com/foo'
    body = yield from http_client_get(url)
    js = json.loads(body)
    return foo(js['bar'])

def http_client_get(url):
    # create a socket
    buffer = ''
    eof = False
    while not eof:
        data, eof = yield from nonblocking_read(my_socket, 1024)
        buffer = buffer + data
    return buffer

print(get_foo().send(None))  # <__main__.foo object at 0x7f7c4f736690>
I used this example to illustrate the outcome of sub-generator delegation but
it’s technically incorrect. The output at the end would not actually be a foo
object. In this example, the nonblocking_read function wraps the low level,
callback based networking code. When it runs it creates a future, adds the given
socket to the polling API to be notified on read-readiness, and sets the
read-readiness callback to the future’s set_result method. It then yields the
future to the caller. The actual output at the end of the example should be:
print(get_foo().send(None)) # <Future pending>
From here, an event loop or other type of coordinator would use the future’s callback feature to connect the value back into the coroutine. For example:
coro = get_foo()
fut = coro.send(None)
fut.add_done_callback(lambda f: coro.send(f.result()))
I’ve omitted some important qualities like error handling or even continuing to handle more futures if they are yielded. Fundamentally, though, this is how async frameworks bridge between callbacks and coroutines. Some set of code wraps the callback system and returns futures. The coroutines yield from a stack of any depth that eventually results in a future being emitted. The loop or engine is built to detect futures and adapt their callbacks into pushing values back into coroutines.
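A slightly fuller sketch of that bridge, still ignoring error handling, drives the coroutine in a series of steps: each resolved future's callback pushes its value back in and captures the next future the coroutine emits.
def step(coro, value=None):
    # Advance the coroutine and, when it emits another future, schedule the
    # next step for the moment that future resolves.
    try:
        fut = coro.send(value)
    except StopIteration:
        return
    fut.add_done_callback(lambda f: step(coro, f.result()))

step(get_foo())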
Async, Await, And New Coroutines
Finally, we've come to PEP-492[18], which is the one that introduced the async and await keywords. It also introduced a new coroutine object that is distinct from the generator coroutines. Generally, the concepts introduced by PEP-492 are refinements of everything that led up to this point.
Before the async keyword, functions would implicitly convert to generators and
coroutines when they included the yield keyword in the body. The async
keyword signifies that a function definition is always a coroutine and cannot be
anything else.
async def coroutine():
    return True

def generator_coroutine():
    yield True
The await keyword replaced both yield and yield from in new style coroutines.
It also formalized the relationship between await, sub-generator delegation, and
what the PEP refers to as "future-like" objects.
async def coroutine():
    await nonblocking_read(my_socket, 1024)
The await keyword always performs delegation when the awaited object is
another coroutine. It yields a value for any non-coroutine object with a
__await__ method. The new __await__ protocol effectively encapsulates
futures. As a result, a coroutine delegates to any sub-coroutines until a
future-like object is encountered. Any future-like objects are yielded so that
an event loop can operate on them.
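To see the protocol in isolation, here is a toy, hand-driven example of an already-resolved future-like object. The unreachable yield statement is a trick that marks __await__ as a generator function so that its return value becomes the result of the await expression:
class Ready:
    def __init__(self, value):
        self.value = value
    def __await__(self):
        # An already-resolved future-like: yield nothing, return the value.
        return self.value
        yield  # never reached; only marks this method as a generator

async def demo():
    print(await Ready(42))  # 42

try:
    demo().send(None)  # drive the coroutine by hand; it finishes in one step
except StopIteration:
    pass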
PEP-492 introduced other, more nuanced changes to the language, but it largely formalized the major concepts from previous iterations: syntax for functions with pause/resume support, bidirectional communication with coroutines, sub-coroutine delegation, and futures to bridge between coroutines and callbacks.
The language will likely continue to evolve but these are the core elements that support async networking. From here, it’s all about the system that manages all those coroutines and network callbacks.
Event Loops
The syntax for async Python has a long history. Syntax, though, does not make a system asynchronous. The real asynchronicity comes from an event loop that interacts with the operating system’s non-blocking I/O and connects I/O callbacks with the async syntax.
To get deeper into how event loops work, let’s build one using all the new features of Python 3.5.
Selectors
To get started, we need a loop that continually checks for read- or write-ready sockets. As of Python 3.4 the standard library contains an easier to use abstraction over the operating system's non-blocking I/O called selectors[19]. Here's how we might implement a loop using the selectors module.
import selectors

class EventLoop:
    def __init__(self, selector=None, select_timeout=1):
        self._selector = selector or selectors.DefaultSelector()
        self._select_timeout = select_timeout
    def _check_sockets(self):
        for key, event_mask in self._selector.select(self._select_timeout):
            if event_mask & selectors.EVENT_READ:
                # Socket is read-ready
                pass
            if event_mask & selectors.EVENT_WRITE:
                # Socket is write-ready
                pass
    def run(self):
        while True:
            self._check_sockets()

EventLoop().run()
This code sets up the scaffolding for handling non-blocking I/O events. The
selectors module offers a DefaultSelector function that returns the optimal
polling API for the system the code is running on. The selector it returns has
a select method that calls the polling API and returns any I/O ready sockets.
The select timeout value indicates how long the select call should wait for the
API to return results. In this case I’ve set it to default to one second which
means each call to select will block for at most one second before it returns
empty results.
The polling APIs that the selectors module uses require sockets to be registered in order to be monitored for readiness. The next step is to add a way to register and unregister sockets.
class EventLoop:
    # ...
    def add_reader(self, socket, read_callback):
        key = None
        try:
            key = self._selector.get_key(socket)
        except KeyError:
            pass
        if key is None:
            # Socket was not previously registered. Create a new registration.
            self._selector.register(socket, selectors.EVENT_READ, (read_callback, None, socket))
            return None
        # Socket was previously registered. Add read events to the registration.
        _, write_callback, _ = key.data
        self._selector.modify(socket, selectors.EVENT_READ | key.events, (read_callback, write_callback, socket))
    def add_writer(self, socket, write_callback):
        key = None
        try:
            key = self._selector.get_key(socket)
        except KeyError:
            pass
        if key is None:
            # Socket was not previously registered. Create a new registration.
            self._selector.register(socket, selectors.EVENT_WRITE, (None, write_callback, socket))
            return None
        # Socket was previously registered. Add write events to the registration.
        read_callback, _, _ = key.data
        self._selector.modify(socket, selectors.EVENT_WRITE | key.events, (read_callback, write_callback, socket))
    def remove_reader(self, socket):
        try:
            key = self._selector.get_key(socket)
        except KeyError:
            return None
        _, write_callback, socket = key.data
        new_mask = key.events & ~selectors.EVENT_READ
        if not new_mask:
            # No more events left to monitor
            self._selector.unregister(socket)
            return None
        self._selector.modify(socket, new_mask, (None, write_callback, socket))
    def remove_writer(self, socket):
        try:
            key = self._selector.get_key(socket)
        except KeyError:
            return None
        read_callback, _, _ = key.data
        new_mask = key.events & ~selectors.EVENT_WRITE
        if not new_mask:
            self._selector.unregister(socket)
            return None
        self._selector.modify(socket, new_mask, (read_callback, None, socket))
One of the core concepts this introduces is the selector key. The selectors
module creates a unique identifier for each socket that it uses to track
registrations. The key also contains a data field that can hold any arbitrary
input that we give it. The data field can be used to store the read and write
callbacks within the selector key so that our event loop doesn’t have to
maintain that state itself. For example, here is how selector polling would make
use of the key data:
class EventLoop:
    # ...
    def _check_sockets(self):
        for key, event_mask in self._selector.select(self._select_timeout):
            read_callback, write_callback, socket = key.data
            if event_mask & selectors.EVENT_READ:
                read_callback(socket)
            if event_mask & selectors.EVENT_WRITE:
                write_callback(socket)
The loop remains agnostic of what any particular callback does and focuses exclusively on bridging between non-blocking I/O events and other callbacks in the system.
At this point, we should have enough built that we can actually write some async networking code using callbacks. Let’s implement an async HTTP request to test it.
import socket

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setblocking(0)
s.connect_ex(('example.com', 80))
buffer = b'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n'
loop = EventLoop()

def handle_read(s):
    read = s.recv(1024)
    print(read.decode("utf-8"))
    if not read:
        loop.remove_reader(s)
        s.close()

def handle_write(s):
    global buffer
    count = s.send(buffer)
    buffer = buffer[count:]
    if not buffer:
        loop.remove_writer(s)

loop.add_reader(s, handle_read)
loop.add_writer(s, handle_write)
loop.run()
# HTTP/1.0 200 OK
# ...
It’s not exactly ready to power a major project but in less than 100 lines of code we have a functioning event loop that enables callback driven, async networking.
Coroutines And Futures
With an event loop in place, the next thing to support is running a coroutine as part of the loop. Event loops are usually built to manage and schedule multiple coroutines at a time. For the sake of simplicity, we'll implement support for only one.
class EventLoop:
    # ...
    def run(self, coro):
        while True:
            try:
                coro.send(None)
            except StopIteration:
                return
            self._check_sockets()

async def hello():
    print('hello')

EventLoop().run(hello())
# hello
This rudimentary support for coroutines accepts a single coroutine in the call
to run and the loop runs until that coroutine completes. The hello coroutine
doesn’t await anything so it is immediately executed to completion and the loop
exits after printing "hello".
We’ll implement something that the coroutine can await soon. First, though, we
need to add support in the loop for the await protocol. Coroutines defined
with async are designed such that the only values that ever come out of them
when calling send are "future-like" objects. In async Python, something is
future-like if it implements the __await__ method. Nearly everything about a
future is up to the event loop implementation but there is one specific behavior
that every future must implement. The __await__ method must return an
iterator that ends with the value that the future should resolve to. Here’s an
example future that implements the future-like protocol:
class Future:
    def __init__(self):
        self._value = None
        self._done = False
    def set_value(self, value):
        self._value = value
        self._done = True
    def get_value(self):
        return self._value
    def is_done(self):
        return self._done
    def __await__(self):
        while not self._done:
            yield self
        return self._value
This future implementation is a fairly limited container for a value that is set
at some future point in time. The __await__ method yields itself until the
value is resolved and then returns the future's value. Real futures, like
asyncio.Future[20], will have much more functionality to cover all
the possible conditions that async code might encounter, including specific use
cases for the values that __await__ yields.
Now that we have a future for our event loop defined we can add support for it.
class EventLoop:
    # ...
    def run(self, coro):
        current_future = None
        try:
            # Run the main coroutine until the first future-like is returned
            current_future = coro.send(None)
        except StopIteration:
            return
        while True:
            try:
                if current_future.is_done():
                    # Note that we don't pass an explicit value to send().
                    current_future = coro.send(None)
            except StopIteration:
                return
            self._check_sockets()

async def hello():
    print('hello')

EventLoop().run(hello())
# hello
This new version starts the coroutine by priming it with send(None). This
executes the coroutine code until a future-like value is awaited. The loop
captures this initial future, assumes that it is an instance of our Future type,
and then checks its status on each iteration. If the future is ready then the
coroutine is unpaused using send(None). Sending None instead of the future’s
value works because send at that point in time actually targets the future’s
__await__ generator and not the coroutine currently awaiting it. We don’t
need to send a value because the future returns its own value at the final
iteration which happens on send(None).
If the future is not yet ready on an iteration of the loop then the coroutine remains paused while the loop checks the I/O polling API. This technique for driving coroutines means that a coroutine will not resume until the loop iteration after its future is resolved. I chose this technique because it's the easiest to demonstrate and because this behavior loosely matches the asyncio event loop. However, there are alternative approaches. For example, you can use callbacks on the future itself to eagerly drive the value back into a coroutine as soon as the future's value is resolved. Neither technique is necessarily better than the other.
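A sketch of that eager alternative might look like the following. It assumes the future exposes an add_done_callback-style hook, which the minimal Future above doesn't have. Unlike the earlier generator-based sketch, no value is passed to send because the future's __await__ generator supplies its own value on resumption:
def step(coro):
    # Advance the coroutine to its next future and resume it as soon as that
    # future resolves rather than waiting for the next loop pass.
    try:
        fut = coro.send(None)
    except StopIteration:
        return
    fut.add_done_callback(lambda f: step(coro))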
Network Wrapped In Futures
Now that we have a future type and the loop supports handling future values, the next step is to wrap any I/O operations that we want to support with code that bridges between the I/O callbacks and a future. This will enable our coroutines to await network operations.
import socket

class Socket:
    def __init__(self, loop, family=socket.AF_INET, type=socket.SOCK_STREAM, proto=0, fileno=None):
        # This is identical to the socket.socket() constructor except that it
        # also requires an event loop to be given.
        self._loop = loop
        self._socket = socket.socket(family, type, proto, fileno)
        self._socket.setblocking(0)
    def connect(self, address):
        result = Future()
        self._socket.connect_ex(address)
        result.set_value(None)  # Assume the connection works for now
        return result
    def recv(self, size):
        result = Future()
        def resolve(s):
            result.set_value(self._socket.recv(size))
            self._loop.remove_reader(s)
        self._loop.add_reader(self._socket, resolve)
        return result
    def send(self, buffer):
        result = Future()
        def resolve(s):
            result.set_value(self._socket.send(buffer))
            self._loop.remove_writer(s)
        self._loop.add_writer(self._socket, resolve)
        return result
Certainly, there are quite a few more socket methods, potential exceptions, and
edge cases to cover. This minimal socket, though, adapts the lower level socket
methods by returning futures. Those futures are resolved at some later point
when the event loop executes the read or write callbacks. Note that the methods
are not defined using async. This is because the methods return future-like
objects directly and should not be wrapped in a coroutine.
Now we can finally write async/await for our network code:
loop = EventLoop()

async def hello():
    s = Socket(loop, socket.AF_INET, socket.SOCK_STREAM)
    await s.connect(('example.com', 80))
    buffer = b'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n'
    while buffer:
        count = await s.send(buffer)
        buffer = buffer[count:]
    result = await s.recv(1024)
    print(result.decode('utf-8'))

loop.run(hello())
# HTTP/1.0 200 OK
# ...
It took a lot to get to this point but this is effectively how systems like asyncio implement their async networking features. The loop manages coroutines and intercepts any future-like objects they emit. Coroutines stay paused until their future-likes are resolved. While paused, the loop checks the I/O status of any pending network operations and executes their callbacks when ready. The network primitives are wrapped to return futures that are integrated with the event loop’s socket registration methods. And the whole system works in a cycle of coroutines to network primitives, to unresolved futures, to I/O polling, to callbacks that resolve futures, and finally back to coroutines.
Time Travel
Almost as a bonus, event loops commonly offer time based features in addition
to networking. For example, the asyncio event loop offers methods like sleep
and call_later. These time based operations leverage futures but require
integrated support in the event loop. Time support ends up looking like a
parallel to network support except with a custom polling system.
import time

class TimeAction:
    def __init__(self, callback, when):
        self.callback = callback
        self.when = when

class EventLoop:
    def __init__(self, selector=None, select_timeout=1):
        self._selector = selector or selectors.DefaultSelector()
        self._select_timeout = select_timeout
        self._timers = []
    # ...
    def sleep(self, seconds):
        when = time.monotonic() + seconds
        result = Future()
        action = TimeAction(lambda: result.set_value(None), when)
        self._timers.append(action)
        return result
    def run(self, coro):
        current_future = None
        try:
            current_future = coro.send(None)
        except StopIteration:
            return
        while True:
            remove_timers = []
            for offset, timer in enumerate(self._timers):
                if timer.when <= time.monotonic():
                    remove_timers.append(offset)
                    timer.callback()
            for offset in reversed(remove_timers):
                self._timers.pop(offset)
            try:
                if current_future.is_done():
                    current_future = coro.send(None)
            except StopIteration:
                return
            self._check_sockets()
loop = EventLoop()

async def hello():
    s = Socket(loop, socket.AF_INET, socket.SOCK_STREAM)
    await s.connect(('example.com', 80))
    buffer = b'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n'
    while buffer:
        count = await s.send(buffer)
        buffer = buffer[count:]
    result = await s.recv(1024)
    await loop.sleep(5)
    print(result.decode('utf-8'))

loop.run(hello())
# <wait 5 seconds>
# HTTP/1.0 200 OK
# ...
Calls to the sleep method are given back a future that resolves after the
sleep duration so that the caller can await it. The resolution of the future is
scheduled by calculating the target time in the future, combining it with a
callback that resolves the future, and adding the two values as a pair in a
list. Each iteration of the event loop then checks if any of the scheduled time
operations are ready and invokes their callback if they are. Finally, it removes
any completed time operations from the list.
Event loops will often have a more optimized version of this logic that includes, for example, keeping the list of actions sorted by time. This allows for more efficient short circuiting and removal from the list. Conceptually, though, this is how an event loop like asyncio provides time based features alongside its network features.
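For illustration, here is a sketch of that optimization using the standard library heapq module, which is roughly the approach asyncio takes with its scheduled timer handles. The counter breaks ties between timers with identical deadlines, since callbacks themselves aren't comparable:
import heapq
import itertools
import time

_counter = itertools.count()
timers = []  # a heap of (when, tiebreaker, callback) entries

def call_later(delay, callback):
    heapq.heappush(timers, (time.monotonic() + delay, next(_counter), callback))

def run_due_timers():
    # Only the earliest deadline needs checking; stop at the first timer
    # that isn't due yet instead of scanning the whole list.
    now = time.monotonic()
    while timers and timers[0][0] <= now:
        _, _, callback = heapq.heappop(timers)
        callback()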
Where To Go From Here?
At this point, we’ve both gone through the history of async syntax in Python and built a working, toy version of asyncio.
If you’ve made it this far then I sincerely hope that you found the content useful. If I’ve succeeded in my goal then you should have a mental framework for how async Python works in enough detail that you can look at a coroutine and roughly imagine its journey through an event loop. If you want to know more past this point then I recommend that you start reading through the Twisted and asyncio source code.
If you choose to dig deeper into the code, I recommend looking for and considering a few items:
What are Tasks and how do they relate to futures and coroutines?
Why does asyncio resume coroutines at the next loop iteration after a future is resolved instead of immediately? What would be the consequences of doing it differently?
How are threads and multi-processing adapted to work with async code?
If Python didn’t have the GIL, how could multi-threaded coroutine scheduling work?
I also recommend reading into some of the alternative models for how async networking could have been implemented in Python. This would include projects like eventlet[21] and gevent[22] that implemented all the same concepts but tried to hide the async nature through monkey patching[23]. From there, maybe branch out into other languages that support async. They will generally implement the same concepts I’ve presented here but they often have different or unique expressions of how code becomes async.
The Complete Event Loop Code
For convenience, here is the final and complete version of the event loop code that was presented in parts throughout the Event Loops section. If you have a Python 3.5 environment then you should be able to copy, paste, and execute the file.
import selectors
import socket
import time

class Future:
    def __init__(self):
        self._value = None
        self._done = False
    def set_value(self, value):
        self._value = value
        self._done = True
    def get_value(self):
        return self._value
    def is_done(self):
        return self._done
    def __await__(self):
        while not self._done:
            yield self
        return self._value

class TimeAction:
    def __init__(self, callback, when):
        self.callback = callback
        self.when = when

class EventLoop:
    def __init__(self, selector=None, select_timeout=1):
        self._selector = selector or selectors.DefaultSelector()
        self._select_timeout = select_timeout
        self._timers = []
    def _check_sockets(self):
        for key, event_mask in self._selector.select(self._select_timeout):
            read_callback, write_callback, socket = key.data
            if event_mask & selectors.EVENT_READ:
                read_callback(socket)
            if event_mask & selectors.EVENT_WRITE:
                write_callback(socket)
    def sleep(self, seconds):
        when = time.monotonic() + seconds
        result = Future()
        action = TimeAction(lambda: result.set_value(None), when)
        self._timers.append(action)
        return result
    def add_reader(self, socket, read_callback):
        key = None
        try:
            key = self._selector.get_key(socket)
        except KeyError:
            pass
        if key is None:
            # Socket was not previously registered. Create a new registration.
            self._selector.register(socket, selectors.EVENT_READ, (read_callback, None, socket))
            return None
        # Socket was previously registered. Add read events to the registration.
        _, write_callback, _ = key.data
        self._selector.modify(socket, selectors.EVENT_READ | key.events, (read_callback, write_callback, socket))
    def add_writer(self, socket, write_callback):
        key = None
        try:
            key = self._selector.get_key(socket)
        except KeyError:
            pass
        if key is None:
            # Socket was not previously registered. Create a new registration.
            self._selector.register(socket, selectors.EVENT_WRITE, (None, write_callback, socket))
            return None
        # Socket was previously registered. Add write events to the registration.
        read_callback, _, _ = key.data
        self._selector.modify(socket, selectors.EVENT_WRITE | key.events, (read_callback, write_callback, socket))
    def remove_reader(self, socket):
        try:
            key = self._selector.get_key(socket)
        except KeyError:
            return None
        _, write_callback, socket = key.data
        new_mask = key.events & ~selectors.EVENT_READ
        if not new_mask:
            # No more events left to monitor
            self._selector.unregister(socket)
            return None
        self._selector.modify(socket, new_mask, (None, write_callback, socket))
    def remove_writer(self, socket):
        try:
            key = self._selector.get_key(socket)
        except KeyError:
            return None
        read_callback, _, _ = key.data
        new_mask = key.events & ~selectors.EVENT_WRITE
        if not new_mask:
            self._selector.unregister(socket)
            return None
        self._selector.modify(socket, new_mask, (read_callback, None, socket))
    def run(self, coro):
        current_future = None
        try:
            current_future = coro.send(None)
        except StopIteration:
            return
        while True:
            remove_timers = []
            for offset, timer in enumerate(self._timers):
                if timer.when <= time.monotonic():
                    remove_timers.append(offset)
                    timer.callback()
            for offset in reversed(remove_timers):
                self._timers.pop(offset)
            try:
                if current_future.is_done():
                    current_future = coro.send(None)
            except StopIteration:
                return
            self._check_sockets()

class Socket:
    def __init__(self, loop, family=socket.AF_INET, type=socket.SOCK_STREAM, proto=0, fileno=None):
        self._loop = loop
        self._socket = socket.socket(family, type, proto, fileno)
        self._socket.setblocking(0)
    def connect(self, address):
        result = Future()
        self._socket.connect_ex(address)
        result.set_value(None)
        return result
    def recv(self, size):
        result = Future()
        def resolve(s):
            result.set_value(self._socket.recv(size))
            self._loop.remove_reader(s)
        self._loop.add_reader(self._socket, resolve)
        return result
    def send(self, buffer):
        result = Future()
        def resolve(s):
            count = self._socket.send(buffer)
            result.set_value(count)
            self._loop.remove_writer(s)
        self._loop.add_writer(self._socket, resolve)
        return result

loop = EventLoop()

async def hello():
    s = Socket(loop, socket.AF_INET, socket.SOCK_STREAM)
    await s.connect(('example.com', 80))
    buffer = b'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n'
    while buffer:
        count = await s.send(buffer)
        buffer = buffer[count:]
    result = await s.recv(1024)
    await loop.sleep(5)
    print(result.decode('utf-8'))

loop.run(hello())
# <wait 5 seconds>
# HTTP/1.0 200 OK
# ...