The introduction of the async and await keywords in Python 3.5 is a continuation of years of progress and evolution in Python’s async networking support. For a lot of folks, asyncio[1] represents something brand new to Python. Really, though, async networking has been available in the standard library since the version 1.5.2 release in 1999. The form it took then was primitive and unwieldy in comparison to what’s now available but it formed the foundation on which asyncio is built today.

My goal is to take you through that journey from 1999 to today. In the process I’ll cover the progression of async networking support in Python and how event loops like asyncio power the new Python syntax. This will be a deep dive, but it should provide you with a first-principles understanding of how async, await, and asyncio work today.

Non-Blocking I/O

The foundation for async networking in most languages, Python included, comes from the operating system. In Unix-like environments, such as Linux, I/O resources, including network sockets, are modeled as files. By default, all files start in blocking mode, which means that a call to read or write will block the caller until the operation is complete. As an example, I’ll use a modified snippet from the Python standard library documentation for the socket module.

import socket

# Create an internet socket in TCP mode
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# Bind the socket to a local address and port
s.bind(('127.0.0.1', 8888))
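# Start listening for connections. The argument is the backlog: how many
# not-yet-accepted connections the system queues before refusing new ones.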
s.listen(1)

# Get a client connection.
# This is the first part of the process that noticeably blocks. If there are
# no clients attempting to connect then the process hangs at this line of code.
# Once a client connects then the function returns and the process continues.
conn, addr = s.accept()

while 1:
    # Read up to 1024 bytes from the connection.
    # This is the next part of the code that may block if there is no data
    # available to read.
    data = conn.recv(1024)
    if not data: break
    # Send the input back as an echo.
    # Sending, like receiving, may block depending on the state of the buffers.
    conn.sendall(data)

conn.close()

The above example creates a listening socket that accepts client connections, reads all client input, and then echoes back the input. Each interaction with the socket is potentially blocking in the default mode. "Blocking" in this context refers to waiting for a resource to be available in order to use it. For example, the process cannot accept a new connection if a client is not trying to connect so it waits, or blocks, until there is one. Likewise, the process cannot receive bytes from a client if the client hasn’t sent any so it blocks until bytes are available.

The default, blocking nature of sockets can make them difficult to use in concurrent situations. The above code, for example, can only manage one client. The classic approach to adding concurrency for blocking sockets is to use additional threads or child processes. An example of this can be found in the standard library SocketServer[2] module.
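
As a rough sketch of the thread-per-client approach, the echo server can be rebuilt on socketserver, the Python 3 spelling of that module. The EchoHandler name, the use of port 0 to let the operating system pick a free port, and the in-process test client are all choices made for this illustration rather than anything taken from the module's documentation.

```python
import socket
import socketserver
import threading

class EchoHandler(socketserver.BaseRequestHandler):
    # handle() runs in its own thread for every client, so the blocking
    # recv/sendall calls only ever stall that one client's thread.
    def handle(self):
        while True:
            data = self.request.recv(1024)
            if not data:
                break
            self.request.sendall(data)

# Port 0 asks the operating system for any free port (illustration only).
server = socketserver.ThreadingTCPServer(('127.0.0.1', 0), EchoHandler)
server.daemon_threads = True
threading.Thread(target=server.serve_forever, daemon=True).start()

host, port = server.server_address
with socket.create_connection((host, port)) as client:
    client.sendall(b'hello')
    reply = client.recv(1024)

server.shutdown()
server.server_close()
print(reply)
```

Each connection costs a thread here, which is the trade-off that motivates the non-blocking approach.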

Another approach to concurrency is leveraging a socket’s "non-blocking" mode.

import socket

# Create an internet socket in TCP mode
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# Disable blocking mode
s.setblocking(0)
# Bind the socket to a local address and port
s.bind(('127.0.0.1', 8888))
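# Start listening for connections. The argument is the backlog: how many
# not-yet-accepted connections the system queues before refusing new ones.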
s.listen(1)

# Get a client connection.
# If no client is currently attempting to connect then this call immediately
# raises an exception
conn, addr = s.accept()
# New sockets start in blocking mode, so even a connection returned by accept()
# must be set to non-blocking explicitly.
conn.setblocking(0)
while 1:
    # Read up to 1024 bytes from the connection.
    # If the client has not written any data to the connection then this call
    # immediately raises an exception.
    data = conn.recv(1024)
    if not data: break
    # Send the input back as an echo.
    # In addition to immediately raising an exception, this call could send
    # only part of the data and then raise an exception.
    conn.sendall(data)

conn.close()

Putting a socket in non-blocking mode prevents read and write calls from blocking. The way it does this, though, is by raising an exception if any socket call would result in blocking. The result is an explicit signal that a socket is not ready which allows a program to stash the operation and do something else until that socket is ready. This is how a program can manage concurrent I/O on a single thread.

However, the only signal that a program gets when using non-blocking sockets is the negative condition: the socket is not ready. There’s no socket function to request its status so implementing a program that can effectively and optimally manage concurrent I/O requires continually attempting to complete any outstanding I/O operation until it succeeds. Practically, writing a program that handles this directly is more complicated than it’s worth.
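
To make the retry problem concrete, here is a minimal sketch that uses a connected socket pair in place of a real client and server. In Python 3 the "would block" condition surfaces as BlockingIOError. The try_recv helper and the retry cap are hypothetical names and values chosen for the illustration.

```python
import socket
import time

def try_recv(sock, nbytes=1024):
    """Return the bytes read, or None if the read would have blocked."""
    try:
        return sock.recv(nbytes)
    except BlockingIOError:
        return None

# A connected pair of sockets stands in for a client and a server.
left, right = socket.socketpair()
left.setblocking(False)
right.setblocking(False)

# Nothing has been written yet, so the read cannot complete.
first_attempt = try_recv(left)

right.sendall(b'ping')

# Without a readiness notification, all the program can do is keep trying.
second_attempt = None
for _ in range(1000):
    second_attempt = try_recv(left)
    if second_attempt is not None:
        break
    time.sleep(0.001)

left.close()
right.close()
print(first_attempt, second_attempt)
```

The loop at the end is the wasteful part: it either burns CPU or adds latency, depending on how long it sleeps between attempts.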

To account for this difficulty, there exists a suite of system calls that provide notifications when a file is ready for reading or writing. These tools have evolved over time and include select[3], poll[4], kqueue[5], and epoll[6]. Each of these has its own history and performance characteristics but they all present a similar enough API to generalize their description. These tools work by monitoring specific files and offering a polling-based API for getting a list of monitored files that are ready for I/O. To illustrate, I’ll show a modified example from the asyncore[7] module that was originally introduced into the standard library in 1999.

import select

# Lists of socket objects that the program wants to either read from or write to.
want_read = []
want_write = []

def poll(timeout=0.0):
    # This will block up to timeout seconds for any of the requested sockets
    # to become ready. If none are ready by the timeout then it returns with
    # empty results.
    (r, w, _) = select.select(want_read, want_write, [], timeout)
    for x in r:
        # x is a read-ready socket
        pass
    for x in w:
        # x is a write-ready socket
        pass

The polling APIs bridge the gap between blocking and non-blocking. The whole application can block for a configurable amount of time until one or more sockets are ready. It can then operate on any sockets as soon as they are available. In between socket operations the application can perform other logic or go idle again while waiting on more sockets to be ready.
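
A small, self-contained sketch of that behavior, again using a socket pair as a stand-in for real network peers. The variable names are illustrative only.

```python
import select
import socket

a, b = socket.socketpair()

# Nothing has been written to b, so a is not read-ready. A timeout of 0
# makes select return immediately instead of waiting.
readable_before, _, _ = select.select([a], [], [], 0)

# A fresh socket has room in its send buffer, so b is write-ready at once.
_, writable, _ = select.select([], [b], [], 0)

b.sendall(b'x')

# Now wait up to one second for a to become read-ready.
readable_after, _, _ = select.select([a], [], [], 1.0)

print(readable_before, writable == [b], readable_after == [a])
a.close()
b.close()
```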

Repeated calls to the poll API in a loop are what async networking frameworks refer to as an event loop, engine, or reactor. The asyncio event loop is more complex than the above example but the technology that drives it and the principles of its operation are exactly the same.

The challenge from here is building a program that coordinates between read/write needs and the polling API. Code written for polling is no longer sequential. Generally, low level async networking is organized as a series of callbacks that execute when I/O readiness is detected. For example, here is an echo server written with asyncore, which was removed from the standard library in Python 3.12 and so needs an older interpreter to run:

import asyncore
import socket

class echosrv(asyncore.dispatcher):
    def __init__(self):
        asyncore.dispatcher.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.bind(('127.0.0.1', 8080))
        self.listen(1)
        self.connections = []
    def handle_accept(self):
        # accept() returns None when no connection could be completed
        pair = self.accept()
        if pair is not None:
            self.handle_accepted(*pair)
    def handle_accepted(self, conn, addr):
        self.connections.append(echo(conn))

class echo(asyncore.dispatcher):
    def __init__(self, sock):
        asyncore.dispatcher.__init__(self, sock)
        # recv() returns bytes in Python 3, so the buffer must be bytes too
        self.buffer = b''

    def handle_connect(self):
        pass

    def handle_close(self):
        self.close()

    def handle_read(self):
        self.buffer = self.buffer + self.recv(8192)

    def writable(self):
        return (len(self.buffer) > 0)

    def handle_write(self):
        sent = self.send(self.buffer)
        self.buffer = self.buffer[sent:]

c = echosrv()
asyncore.loop()

In this example, asyncore is doing a lot of heavy lifting through its custom implementation of the socket methods such as recv and send. Internally, asyncore is catching and handling exceptions related to operations on an unready socket. It is also managing an event loop and optionally registering the socket with a polling API based on methods like writable. For example, on each pass of the event loop asyncore calls writable. If it returns true, which happens when the echo server has something in its outgoing buffer, then asyncore registers the socket with the polling API to alert when the underlying file is ready for writes. When the polling API returns a signal for write-readiness then asyncore calls handle_write as a callback.

This callback driven process from 1999 was, for several years, the only way to write async networking code in Python. The first, and longest running, attempt to build on this practice was the Twisted[8] project in 2001. Twisted formalized the callback pattern using "deferreds"[9], a form of promise or future[10], in addition to implementing support for a large number of web protocols like HTTP.

To this day, callbacks remain an integral behavior in Python’s async networking support but you don’t interact with them in this way. A major problem with callback style implementations is that they break the standard programming model of the language. A program is no longer written and executed top to bottom. Managing complex application state through a series of callbacks is challenging and requires different coding practices to maximize success. The key to what makes async networking what it is today is the additional syntax that Python has added over the years. New keywords and expressions provide support for easier state management and a more familiar programming style.

The Progression Of Pause

Consider this example async/await code:

async def get_something():
    something = await get_something_from_network()
    return something

This is effectively pseudo-code but the expected behavior is likely clear. The function needs to get something from the network and uses the await keyword to get it. Once it has a result, the function returns whatever it got.

The expectation in an async world is that the awaited call to get_something_from_network does not result in blocking if its result is not immediately available. To put this another way, if the result of get_something_from_network would block then Python should pause this function and work on something else until the result is ready. When the result is ready then Python should resume the function until it returns or would block again.

Being able to pause a Python function when it calls await and only resume it when the value it’s waiting for is available is the core feature that enables synchronous looking but actually async networking code. The pause and resume features of Python have evolved over multiple iterations and years. The following sections detail the progression of pause/resume and how it became async/await.

Iterators And Generators

The first concept of pausing came into Python with the introduction of custom iterators. Iterators were formalized by PEP-234[11] which established the iterator protocol. This allowed for new types to become compatible with for and in statements by implementing the __iter__ and next methods (next was renamed to __next__ in Python 3). For example, an iterator that produces a fixed number of fibonacci sequence values could now be built like this:

class fibit:
    def __init__(self, limit):
        self.count = 0
        self.limit = limit
        self.a = 0
        self.b = 1
    def __iter__(self):
        return self
    def __next__(self):
        if self.count >= self.limit:
            raise StopIteration
        self.count = self.count + 1
        value = self.b
        self.a, self.b = self.b, self.a + self.b
        return value
    # Python 2 looks for next() instead of __next__()
    next = __next__

for x in fibit(5):
    print(x) # 1, 1, 2, 3, 5

Of course, fixed size fibonacci could be written before custom iterators. In fact, an arguably simpler implementation would be a function that produces a list:

def fib(limit):
    results = []
    x = 0
    a = 0
    b = 1
    while x < limit:
        results.append(b)
        a,b = b,a+b
        x = x+1
    return results

for x in fib(5):
    print(x) # 1, 1, 2, 3, 5

The downside to this approach, though, is that the logic to compute the set to be iterated over is eager and blocking. Large values of limit will result in the program noticeably hanging until the entire list has been computed. The iterator version of fibonacci operates on a fixed amount of memory and has no startup cost other than establishing the initial state. From there, it only has to compute a single value on each call to next.

Iterators bring in support for infinite and/or lazy loaded sequences. From an async networking perspective, iterators introduce the critical concepts of "pause" and "resume" for functions. That is, the iterator version of fibonacci can be considered a resumable form of the function version. They both produce the same outputs but the iterator can be consumed when and as needed. The major downside of the iterator form is that it requires a developer to break down the function logic into discrete steps. If any step relies on state from a previous step, such as the fibonacci example, then that state must be maintained within the instance. The state between steps of a real-world problem is often complex and the more complex the state then the more difficult the management in the iterator form. In a large way, iterator state management mirrors the complexity of callback driven, low level async programming.
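
As a sketch of the infinite, lazy case, here is a variant of the iterator with the limit removed. The FibForever name is hypothetical, and itertools.islice is used only to take a bounded slice from the endless sequence.

```python
import itertools

class FibForever:
    """Iterator over the fibonacci sequence with no upper limit."""
    def __init__(self):
        self.a = 0
        self.b = 1
    def __iter__(self):
        return self
    def __next__(self):
        # All state needed to resume lives on the instance.
        value = self.b
        self.a, self.b = self.b, self.a + self.b
        return value

numbers = FibForever()
# Each islice call resumes the same iterator where the last one stopped.
first_five = list(itertools.islice(numbers, 5))
next_three = list(itertools.islice(numbers, 3))
print(first_five)  # [1, 1, 2, 3, 5]
print(next_three)  # [8, 13, 21]
```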

To complement iterators and make the state management easier, PEP-255[12] formalized the concept of a generator. A generator is a special kind of function definition that is reorganized into an iterator when it is called. Conceptually, generators are a function with one or more explicit pause and resume points. To indicate the pause/resume points, the yield keyword was introduced. To illustrate, here is a generator form of a limited fibonacci:

def fib(limit):
    x = 0
    a = 0
    b = 1
    while x < limit:
        yield b
        a,b = b,a+b
        x = x+1

for x in fib(5):
    print(x) # 1, 1, 2, 3, 5

This version is nearly identical to the classic function from earlier except that it yields values rather than returning. All of the state management is the same as the classic function version but the generator form also receives all the same benefits as the manually written iterator. In effect, the above generator is identical in function and performance to the manually written iterator.
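
To see the pause and resume points in action, the sketch below adds a hypothetical events list to the generator and records when each piece of code actually runs.

```python
events = []

def fib(limit):
    x = 0
    a = 0
    b = 1
    while x < limit:
        events.append('computed %d' % b)
        yield b  # the function pauses here until the caller asks again
        a, b = b, a + b
        x = x + 1

gen = fib(3)
# Calling the generator function runs none of its body yet.
events.append('created')
first = next(gen)
events.append('caller got %d' % first)
# Draining the generator resumes it until it finishes.
rest = list(gen)

is_own_iterator = iter(gen) is gen
print(events)
print(rest)  # [1, 2]
```

The recorded order shows that nothing is computed until the caller asks for a value, and that the caller's code runs in between the generator's steps.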

From an async networking perspective, being able to pause a function is fairly critical. That pause is where the async portion of the work is intended to happen. The function yields to wait for some resource and is resumed when that resource is ready. However, generators provide a one-way communication with the caller. That is, generators can emit values but cannot accept values from a caller other than when they are initialized. This makes generators one step closer but still sub-optimal for async networking use cases.

The Twisted project found a clever workaround[13] for this by yielding mutable objects and using a few layers of decorators to manage resuming the generator when the yielded object contained a value. It required a bit of boilerplate and adherence to a set of strict rules on how to interact with the async object but a synchronous looking async networking experience was possible even with simple generators.

Generator Coroutines

PEP-342[14] addressed the lack of built-in bidirectional communication between generators and their callers. The PEP identified generators as being close to, but not quite, coroutines[15]. A coroutine in the context of the PEP is a function that can be paused and for which a value may be injected on resumption. After the modifications, generators remained iterators but gained some new methods. Notably, generators now have send, throw, and close methods that inject different values or conditions into the pause point.

As a trivial illustration of the bidirectional communication feature, here is a coroutine that adds five to any value it receives:

def add_five():
    v = 0
    while True:
        v = yield v+5

coro = add_five()
# Coroutines are often "primed" before they are used.
# This allows them to execute their code up to the first
# yield point where the outside caller can then inject a value.
coro.send(None)
print(coro.send(1)) # 6
print(coro.send(5)) # 10

With coroutines, the yield statement became an expression that returns a value. The value it returns is whatever the caller provides in the send method. Additionally, a caller can use throw with an exception and that exception will be raised at the yield point.
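
Here is a small sketch of throw and close alongside send. The resettable_total coroutine is a made-up example, and using ValueError as a "reset" signal is an arbitrary choice for the illustration.

```python
def resettable_total():
    total = 0
    while True:
        try:
            value = yield total
        except ValueError:
            # An exception injected with throw() surfaces at the yield point.
            total = 0
        else:
            total += value

coro = resettable_total()
coro.send(None)                      # prime: run to the first yield
a = coro.send(2)                     # 2
b = coro.send(3)                     # 5
c = coro.throw(ValueError('reset'))  # handled inside, next yield gives 0
coro.close()                         # raises GeneratorExit at the yield

finished = False
try:
    coro.send(1)
except StopIteration:
    # A closed coroutine cannot be resumed.
    finished = True
print(a, b, c, finished)
```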

One of the explicit and primary purposes for introducing the coroutine feature was to account for async networking. The PEP offers this as an example of what the authors envisioned:

data = (yield nonblocking_read(my_socket, nbytes))

In this hypothetical example, the result of nonblocking_read is returned to the coroutine caller when it invokes send. The coroutine then expects that the caller will inject the final value of the read back into the coroutine with another call to send. This simplifies the coroutine’s interaction with the result of the yield and standardizes how all Python code can push information into a coroutine.

Resolving the return value of nonblocking_read to a concrete value is handled by some kind of coordinator that interacts with callback based networking. I’ve intentionally not written, so far, about how that system actually resolves that final value to inject into the paused function. I do cover that later, though.

Sub-Generator Delegation

There’s one more syntax change to consider in the evolution leading to async and await: sub-generator delegation. Let’s focus, first, on the problem.

The introduction of coroutines offered this line of code as an example of what the authors intended to be possible:

data = (yield nonblocking_read(my_socket, nbytes))

If nonblocking_read returns some special object that a higher level coordinator can handle then the coordinator can inject the resulting value back into the coroutine using the send method. However, it’s quite common for code to have multiple layers of functions on top of low level networking code. A more complex example might look like this:

class foo:
    def __init__(self, bar):
        self.bar = bar

def get_foo():
    response = yield get_foo_http()
def get_foo_http():
    url = 'https://example.com/foo'
    body = yield http_client_get(url)
    js = json.loads(body)
    yield foo(js['bar'])
def http_client_get(url):
    # create a socket
    buffer = ''
    eof = False
    while not eof:
        data, eof = yield nonblocking_read(my_socket, 1024)
        buffer = buffer + data
    yield buffer

The issue with this kind of code is that yield is not automatically recursive. When get_foo yields the generator created by get_foo_http, it doesn’t automatically execute any logic within get_foo_http.

coro = get_foo()
print(coro) # <generator object get_foo at 0x7f40674b2820>
next_value = coro.send(None)
print(next_value) # <generator object get_foo_http at 0x7f1249bb1870>
another_value = next_value.send(None)
print(another_value) # <generator object http_client_get at 0x7f60afed9870>

This behavior creates complications for systems that intend to manage or coordinate coroutines such as async networking event loops. Practically, an event loop that coordinates coroutine execution must maintain a directed graph that connects every coroutine to the other coroutine that yielded it. When the coordinator reaches a leaf node, it can take the resulting, concrete value and inject it back into the parent coroutine’s yield point. In many ways, the coordinator becomes responsible for managing a call stack. This isn’t an impossible task. The Twisted project, again as an early leader in async Python, implemented this in effect. It could be much easier to manage, though.
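
A minimal sketch of such a coordinator is below. It is not how Twisted or any real framework does it. It assumes the convention used in the example above, where a coroutine's last yielded non-generator value counts as its result, and the leaf, middle, top, and run names are all hypothetical.

```python
import types

def leaf():
    # Stand-in for real I/O: yields a concrete, non-generator value.
    yield 'payload'

def middle():
    data = yield leaf()
    yield data.upper()

def top():
    result = yield middle()
    yield 'top saw ' + result

def run(coro):
    # The coordinator keeps its own call stack of paused generators.
    stack = [coro]
    to_send = None
    while True:
        yielded = stack[-1].send(to_send)
        if isinstance(yielded, types.GeneratorType):
            # A nested coroutine: descend into it before resuming the parent.
            stack.append(yielded)
            to_send = None
        else:
            # A concrete value: treat it as the innermost coroutine's result
            # and hand it to the parent on the next pass.
            stack.pop().close()
            if not stack:
                return yielded
            to_send = yielded

final = run(top())
print(final)  # top saw PAYLOAD
```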

PEP-380[16] defines the yield from expression. This is distinct from yield because it automatically triggers the unravelling of nested coroutines and also enables the use of return with a value to create a more natural expression of setting the coroutine’s final value. The example from above can now be revised to:

class foo:
    def __init__(self, bar):
        self.bar = bar

def get_foo():
    response = yield from get_foo_http()
def get_foo_http():
    url = 'https://example.com/foo'
    body = yield from http_client_get(url)
    js = json.loads(body)
    return foo(js['bar'])
def http_client_get(url):
    # create a socket
    buffer = ''
    eof = False
    while not eof:
        data, eof = yield from nonblocking_read(my_socket, 1024)
        buffer = buffer + data
    return buffer

print(get_foo().send(None)) # <__main__.foo object at 0x7f7c4f736690>

The automatic unravelling of nested coroutines is called "sub-generator delegation". When the top-level coroutine yields from another, the outside caller is unaware of it. Calls to send and throw automatically target the active coroutine regardless of how deeply nested it is.
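
A runnable sketch of delegation, using two hypothetical coroutines named inner and outer. It also shows where a coroutine's return value ends up: on the StopIteration exception raised when the outermost coroutine finishes.

```python
def inner():
    received = yield 'inner is waiting'
    return received * 2

def outer():
    # yield from runs inner() and evaluates to inner's return value.
    value = yield from inner()
    return value + 1

coro = outer()
# The caller only holds a reference to outer, yet the value it sees was
# yielded by inner.
first = coro.send(None)

try:
    # This send is routed to inner's paused yield expression.
    coro.send(20)
except StopIteration as stop:
    # outer's return value travels on the exception.
    final = stop.value
print(first, final)  # inner is waiting 41
```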

This is the first language change on the list that is available exclusively in Python 3. The yield from syntax is already somewhat obsolete for network related code but the concept of sub-generator delegation is present in async/await.

Futures

Coroutines and sub-generator delegation go a long way towards making async networking both possible and easier to implement. However, an async system can’t be built on coroutines alone. If every coroutine yielded from another coroutine then it’s turtles all the way down[17]. There has to be some final point where a coroutine offers up a non-coroutine value. In an async networking context, that non-coroutine value is the thing attached to the lower level callbacks. Futures are this non-coroutine value.

The concept of a future also goes by the names promise and deferred. It represents a future value that is not yet available. Most implementations, including the various Python implementations, perform two main functions: recording the final state of the value and executing callbacks when the state is finalized. For example, here’s a partial sample of the asyncio interface for futures:

class Future:
    def result(self):
        """Return the value if the future is resolved.
        Otherwise, raise a relevant exception."""
    def set_result(self, result):
        """Mark as done and set the value."""
    def set_exception(self, exc):
        """Mark as done and set the exception value."""
    def done(self):
        """Return true if done."""
    def add_done_callback(self, callback):
        """Add a callback to execute when the future is marked as done.
        The callback receives the future as an argument."""
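
To make those two functions concrete, here is a minimal sketch of a future with the same method names. It is not the asyncio implementation. In particular, it calls callbacks immediately rather than scheduling them on an event loop, and the SketchFuture name and its RuntimeError are assumptions of this example.

```python
class SketchFuture:
    def __init__(self):
        self._done = False
        self._result = None
        self._exception = None
        self._callbacks = []

    def done(self):
        return self._done

    def result(self):
        if not self._done:
            raise RuntimeError('result is not ready')
        if self._exception is not None:
            raise self._exception
        return self._result

    def _finish(self):
        # Record the final state first, then notify everyone waiting on it.
        self._done = True
        callbacks, self._callbacks = self._callbacks, []
        for callback in callbacks:
            callback(self)

    def set_result(self, result):
        self._result = result
        self._finish()

    def set_exception(self, exc):
        self._exception = exc
        self._finish()

    def add_done_callback(self, callback):
        if self._done:
            callback(self)
        else:
            self._callbacks.append(callback)

seen = []
fut = SketchFuture()
fut.add_done_callback(lambda f: seen.append(f.result()))
fut.set_result(42)
print(fut.done(), seen)  # True [42]
```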

To illustrate how futures bridge between socket callbacks and coroutines, let’s bring back the last yield from example:

class foo:
    def __init__(self, bar):
        self.bar = bar

def get_foo():
    response = yield from get_foo_http()
def get_foo_http():
    url = 'https://example.com/foo'
    body = yield from http_client_get(url)
    js = json.loads(body)
    return foo(js['bar'])
def http_client_get(url):
    # create a socket
    buffer = ''
    eof = False
    while not eof:
        data, eof = yield from nonblocking_read(my_socket, 1024)
        buffer = buffer + data
    return buffer

print(get_foo().send(None)) # <__main__.foo object at 0x7f7c4f736690>

I used this example to illustrate the outcome of sub-generator delegation but it’s technically incorrect. The output at the end would not actually be a foo object. In this example, the nonblocking_read function wraps the low level, callback based networking code. When it runs it creates a future, adds the given socket to the polling API to be notified on read-readiness, and sets the read-readiness callback to the future’s set_result method. It then yields the future to the caller. The actual output at the end of the example should be:

print(get_foo().send(None)) # <Future pending>

From here, an event loop or other type of coordinator would use the future’s callback feature to connect the value back into the coroutine. For example:

coro = get_foo()
fut = coro.send(None)
fut.add_done_callback(lambda f: coro.send(f.result()))

I’ve omitted some important qualities like error handling or even continuing to handle more futures if they are yielded. Fundamentally, though, this is how async frameworks bridge between callbacks and coroutines. Some set of code wraps the callback system and returns futures. The coroutines yield from a stack of any depth that eventually results in a future being emitted. The loop or engine is built to detect futures and adapt their callbacks into pushing values back into coroutines.
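
Here is a sketch that fills in one of those gaps: continuing to handle futures until the coroutine finishes. TinyFuture, fake_read, and drive are hypothetical stand-ins, and resolving the futures by hand takes the place of real socket callbacks. Error handling is still left out.

```python
class TinyFuture:
    """Just enough of a future for this sketch."""
    def __init__(self):
        self._callbacks = []
        self._result = None
    def result(self):
        return self._result
    def add_done_callback(self, callback):
        self._callbacks.append(callback)
    def set_result(self, result):
        self._result = result
        for callback in self._callbacks:
            callback(self)

pending = []

def fake_read():
    # Stand-in for nonblocking_read: hand a future to whoever drives us.
    fut = TinyFuture()
    pending.append(fut)
    return (yield fut)

def fetch_two():
    first = yield from fake_read()
    second = yield from fake_read()
    return first + second

def drive(coro, on_done):
    def step(value=None):
        try:
            fut = coro.send(value)
        except StopIteration as stop:
            on_done(stop.value)
            return
        # Resume the coroutine with the result once the future resolves.
        fut.add_done_callback(lambda f: step(f.result()))
    step()

results = []
drive(fetch_two(), results.append)
# Pretend the socket callbacks fire, one read at a time.
pending[0].set_result('ab')
pending[1].set_result('cd')
print(results)  # ['abcd']
```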

Async, Await, And New Coroutines

Finally, we’ve come to PEP-492[18] which is the one that introduced the async and await keywords. It also introduced a new coroutine object that is distinct from the generator coroutines. Generally, the concepts introduced by PEP-492 are refinements of everything that led up to here.

Before the async keyword, functions would implicitly convert to generators and coroutines when they included the yield keyword in the body. The async keyword signifies that a function definition is always a coroutine and cannot be anything else.

async def coroutine():
    return True

def generator_coroutine():
    yield True

The await keyword replaced both yield and yield from in new style coroutines. It also formalized the relationship between await, sub-generator delegation, and what the PEP refers to as "future-like" objects.

async def coroutine():
    await nonblocking_read(my_socket, 1024)

The await keyword always performs delegation when the awaited object is another coroutine. It yields a value for any non-coroutine object with a __await__ method. The new __await__ protocol effectively encapsulates futures. As a result, a coroutine delegates to any sub-coroutines until a future-like object is encountered. Any future-like objects are yielded so that an event loop can operate on them.
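
The sketch below drives a pair of async coroutines by hand to show that behavior. Marker is a hypothetical future-like object whose __await__ method is a generator, and no event loop is involved.

```python
class Marker:
    """Hypothetical future-like object: anything with an __await__ method."""
    def __init__(self, name):
        self.name = name
    def __await__(self):
        # The yielded object surfaces to whoever is driving the coroutine.
        received = yield self
        # The returned value becomes the result of the await expression.
        return received

async def inner():
    return await Marker('socket-read')

async def outer():
    # Awaiting another coroutine delegates to it.
    value = await inner()
    return value.upper()

coro = outer()
# The Marker created two levels down comes out of the top-level send.
surfaced = coro.send(None)

try:
    coro.send('data')
except StopIteration as stop:
    final = stop.value
print(surfaced.name, final)  # socket-read DATA
```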

PEP-492 introduced more and nuanced changes to the language but it largely formalized the major concepts from previous iterations: syntax for functions with pause/resume support, bidirectional communication with coroutines, sub-coroutine delegation, and futures to bridge between coroutines and callbacks.

The language will likely continue to evolve but these are the core elements that support async networking. From here, it’s all about the system that manages all those coroutines and network callbacks.

Event Loops

The syntax for async Python has a long history. Syntax, though, does not make a system asynchronous. The real asynchronicity comes from an event loop that interacts with the operating system’s non-blocking I/O and connects I/O callbacks with the async syntax.

To get deeper into how event loops work, let’s build one using all the new features of Python 3.5.

Selectors

To get started, we need a loop that continually checks for read or write ready sockets. As of Python 3.4 the standard library contains an easier to use abstraction over interacting with the operating system’s non-blocking I/O called selectors[19]. Here’s how we might implement a loop using the selectors module.

import selectors

class EventLoop:
    def __init__(self, selector=None, select_timeout=1):
        self._selector = selector or selectors.DefaultSelector()
        self._select_timeout = select_timeout
    def _check_sockets(self):
        for key, event_mask in self._selector.select(self._select_timeout):
            if event_mask & selectors.EVENT_READ:
                # Socket is read-ready
                pass
            if event_mask & selectors.EVENT_WRITE:
                # Socket is write-ready
                pass
    def run(self):
        while True:
            self._check_sockets()

EventLoop().run()

This code sets up the scaffolding for handling non-blocking I/O events. The selectors module offers DefaultSelector, an alias for the most efficient selector class available on the system the code is running on. An instance of it has a select method that calls the polling API and returns any I/O ready sockets. The select timeout value indicates how long the select call should wait for the API to return results. In this case I’ve set it to default to one second which means each call to select will block for at most one second before it returns empty results.

The polling APIs that the selectors module uses require sockets to be registered in order to be monitored for readiness. The next step is to add a way to register and unregister sockets.

class EventLoop:
    # ...
    def add_reader(self, socket, read_callback):
        key = None
        try:
            key = self._selector.get_key(socket)
        except KeyError:
            pass
        if key is None:
            # Socket was not previously registered. Create a new registration.
            self._selector.register(socket, selectors.EVENT_READ, (read_callback, None, socket))
            return None
        # Socket was previously registered. Add read events to the registration.
        _, write_callback, _ = key.data
        self._selector.modify(socket, selectors.EVENT_READ | key.events, (read_callback, write_callback, socket))
    def add_writer(self, socket, write_callback):
        key = None
        try:
            key = self._selector.get_key(socket)
        except KeyError:
            pass
        if key is None:
            # Socket was not previously registered. Create a new registration.
            self._selector.register(socket, selectors.EVENT_WRITE, (None, write_callback, socket))
            return None
        # Socket was previously registered. Add write events to the registration.
        read_callback, _, _ = key.data
        self._selector.modify(socket, selectors.EVENT_WRITE | key.events, (read_callback, write_callback, socket))
    def remove_reader(self, socket):
        try:
            key = self._selector.get_key(socket)
        except KeyError:
            return None
        _, write_callback, socket = key.data
        new_mask = key.events & ~selectors.EVENT_READ
        if not new_mask:
            # No more events left to monitor
            self._selector.unregister(socket)
            return None
        self._selector.modify(socket, new_mask, (None, write_callback, socket))
    def remove_writer(self, socket):
        try:
            key = self._selector.get_key(socket)
        except KeyError:
            return None
        read_callback, _, _ = key.data
        new_mask = key.events & ~selectors.EVENT_WRITE
        if not new_mask:
            self._selector.unregister(socket)
            return None
        self._selector.modify(socket, new_mask, (read_callback, None, socket))

One of the core concepts this introduces is the selector key. The selectors module creates a unique identifier for each socket that it uses to track registrations. The key also contains a data field that can hold any arbitrary input that we give it. The data field can be used to store the read and write callbacks within the selector key so that our event loop doesn’t have to maintain that state itself. For example, here is how selector polling would make use of the key data:

class EventLoop:
    # ...
    def _check_sockets(self):
        for key, event_mask in self._selector.select(self._select_timeout):
            read_callback, write_callback, socket = key.data
            if event_mask & selectors.EVENT_READ:
                read_callback(socket)
            if event_mask & selectors.EVENT_WRITE:
                write_callback(socket)

The loop remains agnostic of what any particular callback does and focuses exclusively on bridging between non-blocking I/O events and other callbacks in the system.
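To see the key data round trip in isolation, here is a minimal sketch that uses the selectors module directly rather than our EventLoop. It assumes a local socket pair as a stand-in for a real network connection, and the on_read callback and received list are names I made up for the illustration.

```python
import selectors
import socket

# A connected pair of local sockets stands in for a real network connection.
left, right = socket.socketpair()
left.setblocking(False)
right.setblocking(False)

def on_read(sock):
    # Hypothetical read callback: return whatever bytes are available.
    return sock.recv(1024)

selector = selectors.DefaultSelector()
# The third argument is the opaque data payload stored on the selector key.
key = selector.register(right, selectors.EVENT_READ, (on_read, None, right))

# Make the registered socket readable.
left.send(b'ping')

received = []
for ready_key, event_mask in selector.select(timeout=1):
    # The same tuple we registered comes back on the key.
    read_callback, write_callback, sock = ready_key.data
    if event_mask & selectors.EVENT_READ:
        received.append(read_callback(sock))

selector.unregister(right)
selector.close()
left.close()
right.close()
```

The selector never inspects the tuple. It only hands it back with each ready event, which is what lets the loop stay unaware of what the callbacks do.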

At this point, we should have enough built that we can actually write some async networking code using callbacks. Let’s implement an async HTTP request to test it.

import socket

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setblocking(0)
s.connect_ex(('example.com', 80))
buffer = b'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n'
loop = EventLoop()

def handle_read(s):
    read = s.recv(1024)
    if not read:
        # An empty read means the server closed the connection.
        loop.remove_reader(s)
        s.close()
        return
    print(read.decode("utf-8"))

def handle_write(s):
    global buffer
    count = s.send(buffer)
    buffer = buffer[count:]
    if not buffer:
        loop.remove_writer(s)


loop.add_reader(s, handle_read)
loop.add_writer(s, handle_write)
loop.run()
# HTTP/1.0 200 OK
# ...

It’s not exactly ready to power a major project, but in fewer than 100 lines of code we have a functioning event loop that enables callback-driven, async networking.

Coroutines And Futures

With an event loop in place, the next thing to support is running a coroutine as part of the loop. Event loops are usually built to manage and schedule multiple coroutines at a time. For the sake of simplicity, we’ll implement support for only one.

class EventLoop:
    # ...
    def run(self, coro):
        while True:
            try:
                coro.send(None)
            except StopIteration:
                return
            self._check_sockets()

async def hello():
    print('hello')

EventLoop().run(hello())
# hello

This rudimentary support for coroutines accepts a single coroutine in the call to run and the loop runs until that coroutine completes. The hello coroutine doesn’t await anything so it is immediately executed to completion and the loop exits after printing "hello".

We’ll implement something that the coroutine can await soon. First, though, we need to add support in the loop for the await protocol. Coroutines defined with async are designed such that the only values that ever come out of them when calling send are "future-like" objects. In async Python, something is future-like if it implements the __await__ method. Nearly everything about a future is up to the event loop implementation but there is one specific behavior that every future must implement. The __await__ method must return an iterator that ends with the value that the future should resolve to. Here’s an example future that implements the future-like protocol:

class Future:
    def __init__(self):
        self._value = None
        self._done = False

    def set_value(self, value):
        self._value = value
        self._done = True

    def get_value(self):
        return self._value

    def is_done(self):
        return self._done

    def __await__(self):
        while not self._done:
            yield self
        return self._value

This future implementation is a fairly limited container for a value that is set at some future point in time. The __await__ method yields the future itself until the value is resolved and then finally returns the future’s value. Real futures, like asyncio.Future[20], will have much more functionality to cover all the possible conditions that async code might encounter, including specific use cases for the values that __await__ yields.
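To make the protocol concrete, here is a small sketch that drives a coroutine by hand, with no event loop at all. It repeats a trimmed copy of the Future class so that it runs on its own. The wait_for coroutine is a name I made up for the illustration.

```python
class Future:
    # Trimmed copy of the Future above so this sketch runs on its own.
    def __init__(self):
        self._value = None
        self._done = False

    def set_value(self, value):
        self._value = value
        self._done = True

    def __await__(self):
        while not self._done:
            yield self
        return self._value

async def wait_for(future):
    # Hypothetical coroutine: await the future and transform its value.
    value = await future
    return value * 2

future = Future()
coro = wait_for(future)

# The first send(None) runs the coroutine up to the await, where the
# future's __await__ generator yields the future itself.
emitted = coro.send(None)

# The future is still unresolved, so resuming just yields it again.
emitted_again = coro.send(None)

# Once the future has a value, the next send(None) lets __await__ return it.
# The coroutine then finishes and its return value rides on StopIteration.
future.set_value(21)
try:
    coro.send(None)
except StopIteration as stop:
    result = stop.value

print(result)
```

Everything an event loop does with a coroutine is some arrangement of these send calls.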

Now that we have a future for our event loop defined we can add support for it.

class EventLoop:
    # ...
    def run(self, coro):
        current_future = None
        try:
            # Run the main coroutine until the first future-like is returned
            current_future = coro.send(None)
        except StopIteration:
            return
        while True:
            try:
                if current_future.is_done():
                    # Note that we don't pass an explicit value to send().
                    current_future = coro.send(None)
            except StopIteration:
                return
            self._check_sockets()

async def hello():
    print('hello')

EventLoop().run(hello())
# hello

This new version starts the coroutine by priming it with send(None). This executes the coroutine code until a future-like value is awaited. The loop captures this initial future, assumes that it is an instance of our Future type, and then checks its status on each iteration. If the future is ready then the coroutine is unpaused using send(None). Sending None instead of the future’s value works because send at that point in time actually targets the future’s __await__ generator and not the coroutine currently awaiting it. We don’t need to send a value because the future returns its own value at the final iteration which happens on send(None).

If the future is not yet ready on an iteration of the loop then the coroutine remains paused while the loop checks the I/O polling API. This technique for driving coroutines means that a coroutine will not resume until the loop iteration after its future is resolved. I chose this technique because it’s the easiest to demonstrate and because this behavior loosely matches the asyncio event loop. However, there are alternative approaches. For example, you can use callbacks on the future itself to eagerly drive the value back into a coroutine as soon as the future’s value is resolved. Neither technique is necessarily better than the other.
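As a rough sketch of the callback-driven alternative, the future below keeps a list of callbacks and invokes them as soon as its value is set. CallbackFuture, add_done_callback, and drive are names I chose for the illustration and are not part of the loop we are building.

```python
class CallbackFuture:
    """Hypothetical variant of our Future that notifies callbacks when resolved."""
    def __init__(self):
        self._value = None
        self._done = False
        self._callbacks = []

    def add_done_callback(self, callback):
        # Run immediately if already resolved, otherwise remember for later.
        if self._done:
            callback(self)
        else:
            self._callbacks.append(callback)

    def set_value(self, value):
        self._value = value
        self._done = True
        callbacks, self._callbacks = self._callbacks, []
        for callback in callbacks:
            callback(self)

    def __await__(self):
        while not self._done:
            yield self
        return self._value

def drive(coro, results):
    """Advance the coroutine one step and resume it when its future resolves."""
    try:
        future = coro.send(None)
    except StopIteration as stop:
        results.append(stop.value)
        return
    future.add_done_callback(lambda _: drive(coro, results))

async def add_one(future):
    return (await future) + 1

results = []
pending = CallbackFuture()
drive(add_one(pending), results)
before = list(results)   # nothing has completed yet
pending.set_value(41)    # resolving the future resumes the coroutine right away
```

Here the coroutine finishes inside the call to set_value instead of waiting for the next loop iteration. The trade-off is that resolving a future can now run arbitrary coroutine code on the caller’s stack.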

Network Wrapped In Futures

Now that we have a future type and the loop supports handling future values, the next step is to wrap any I/O operations that we want to support with code that bridges between the I/O callbacks and a future. This will enable our coroutines to await network operations.

import socket

class Socket:
    def __init__(self, loop, family=socket.AF_INET, type=socket.SOCK_STREAM, proto=0, fileno=None):
        # This is identical to the socket.socket() constructor except that it
        # also requires an event loop to be given.
        self._loop = loop
        self._socket = socket.socket(family, type, proto, fileno)
        self._socket.setblocking(0)
    def connect(self, address):
        result = Future()
        self._socket.connect_ex(address)
        result.set_value(None) # Assume the connection works for now
        return result
    def recv(self, size):
        result = Future()
        def resolve(s):
            result.set_value(self._socket.recv(size))
            self._loop.remove_reader(s)
        self._loop.add_reader(self._socket, resolve)
        return result
    def send(self, buffer):
        result = Future()
        def resolve(s):
            result.set_value(self._socket.send(buffer))
            self._loop.remove_writer(s)
        self._loop.add_writer(self._socket, resolve)
        return result

Certainly, there are quite a few more socket methods, potential exceptions, and edge cases to cover. This minimal socket, though, adapts the lower level socket methods by returning futures. Those futures are resolved at some later point when the event loop executes the read or write callbacks. Note that the methods are not defined using async. This is because the methods return future-like objects directly and should not be wrapped in a coroutine.
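One of those edge cases is connect, which above resolves its future without knowing whether the connection succeeded. A common approach is to wait until the socket becomes writable and then read the SO_ERROR socket option. The sketch below shows the idea using selectors directly against a local listener. The finish_connect helper is a hypothetical name, and wiring it into our Socket class is left out.

```python
import os
import selectors
import socket

def finish_connect(sock):
    """Hypothetical helper: raise OSError if the pending connect failed."""
    err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
    if err != 0:
        raise OSError(err, os.strerror(err))

# A local listener so the sketch does not depend on an external host.
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(('127.0.0.1', 0))  # port 0 asks the OS for any free port
server.listen(1)

client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.setblocking(False)
# On a non-blocking socket this typically returns an "in progress" error
# code right away instead of waiting for the handshake to finish.
code = client.connect_ex(server.getsockname())

# The connect attempt has finished once the socket reports writable.
selector = selectors.DefaultSelector()
selector.register(client, selectors.EVENT_WRITE)
events = selector.select(timeout=5)
selector.unregister(client)
selector.close()

finish_connect(client)  # raises if the connection was refused or failed
connected_to = client.getpeername()

client.close()
server.close()
```

In our Socket class, the equivalent would be to register a writer callback in connect and resolve the future from inside that callback after the SO_ERROR check.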

Now we can finally write async/await for our network code:

loop = EventLoop()

async def hello():
    s = Socket(loop, socket.AF_INET, socket.SOCK_STREAM)
    await s.connect(('example.com', 80))
    buffer = b'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n'
    while buffer:
        count = await s.send(buffer)
        buffer = buffer[count:]
    result = await s.recv(1024)
    print(result.decode('utf-8'))

loop.run(hello())
# HTTP/1.0 200 OK
# ...

It took a lot to get to this point but this is effectively how systems like asyncio implement their async networking features. The loop manages coroutines and intercepts any future-like objects they emit. Coroutines stay paused until their future-likes are resolved. While paused, the loop checks the I/O status of any pending network operations and executes their callbacks when ready. The network primitives are wrapped to return futures that are integrated with the event loop’s socket registration methods. And the whole system works in a cycle of coroutines to network primitives, to unresolved futures, to I/O polling, to callbacks that resolve futures, and finally back to coroutines.
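If you want to watch that cycle without depending on a remote server, here is a condensed sketch that only supports reads and uses a local socket pair. MiniLoop is a hypothetical, stripped-down stand-in for our EventLoop. Unlike our run method, it also returns the coroutine’s result, which is my own addition to make the outcome easy to inspect.

```python
import selectors
import socket

class Future:
    # Trimmed copy of our Future so this sketch runs on its own.
    def __init__(self):
        self._value = None
        self._done = False
    def set_value(self, value):
        self._value = value
        self._done = True
    def is_done(self):
        return self._done
    def __await__(self):
        while not self._done:
            yield self
        return self._value

class MiniLoop:
    """Hypothetical read-only condensation of the event loop."""
    def __init__(self):
        self._selector = selectors.DefaultSelector()

    def recv(self, sock, size):
        # Network primitive wrapped in a future.
        result = Future()
        def resolve(ready_sock):
            self._selector.unregister(ready_sock)
            result.set_value(ready_sock.recv(size))
        self._selector.register(sock, selectors.EVENT_READ, resolve)
        return result

    def run(self, coro):
        try:
            current_future = coro.send(None)
        except StopIteration as stop:
            return stop.value
        while True:
            if current_future.is_done():
                try:
                    current_future = coro.send(None)
                except StopIteration as stop:
                    return stop.value
            # I/O polling: run the callback stored on each ready key.
            for key, _ in self._selector.select(timeout=0.1):
                key.data(key.fileobj)

left, right = socket.socketpair()
right.setblocking(False)
loop = MiniLoop()

async def read_greeting():
    data = await loop.recv(right, 1024)
    return data.decode('utf-8')

left.send(b'hello')
greeting = loop.run(read_greeting())
left.close()
right.close()
```

The coroutine awaits a future, the loop polls the selector, the callback resolves the future, and the next iteration resumes the coroutine.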

Time Travel

Almost as a bonus, event loops commonly offer time based features in addition to networking. For example, asyncio offers the sleep coroutine and the event loop’s call_later method. These time based operations leverage futures but require integrated support in the event loop. Time support ends up looking like a parallel to network support except with a custom polling system.

import time

class TimeAction:
    def __init__(self, callback, when):
        self.callback = callback
        self.when = when

class EventLoop:
    def __init__(self, selector=None, select_timeout=1):
        self._selector = selector or selectors.DefaultSelector()
        self._select_timeout = select_timeout
        self._timers = []
    # ...
    def sleep(self, seconds):
        when = time.monotonic() + seconds
        result = Future()
        action = TimeAction(lambda: result.set_value(None), when)
        self._timers.append(action)
        return result
    def run(self, coro):
        current_future = None
        try:
            current_future = coro.send(None)
        except StopIteration:
            return
        while True:
            remove_timers = []
            for offset, timer in enumerate(self._timers):
                if timer.when <= time.monotonic():
                    remove_timers.append(offset)
                    timer.callback()
            for offset in reversed(remove_timers):
                self._timers.pop(offset)
            try:
                if current_future.is_done():
                    current_future = coro.send(None)
            except StopIteration:
                return
            self._check_sockets()

loop = EventLoop()
async def hello():
    s = Socket(loop, socket.AF_INET, socket.SOCK_STREAM)
    await s.connect(('example.com', 80))
    buffer = b'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n'
    while buffer:
        count = await s.send(buffer)
        buffer = buffer[count:]
    result = await s.recv(1024)
    await loop.sleep(5)
    print(result.decode('utf-8'))

loop.run(hello())
# <wait 5 seconds>
# HTTP/1.0 200 OK
# ...

Calls to the sleep method are given back a future that resolves after the sleep duration so that the caller can await it. The resolution of the future is scheduled by calculating the target time in the future, combining it with a callback that resolves the future, and adding the two values as a pair in a list. Each iteration of the event loop then checks if any of the scheduled time operations are ready and invokes their callback if they are. Finally, it removes any completed time operations from the list.

Event loops will often have a more optimized version of this logic that includes, for example, keeping the list of actions sorted by time. This allows for more efficient short circuiting and removal from the list. Conceptually, though, this is how an event loop like asyncio provides time based features alongside its network features.
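As a sketch of what that optimization could look like, the timer queue below keeps its actions in a heap ordered by target time so that checking for due timers can stop at the first one that is not ready. TimerQueue and its method names are my own inventions for the illustration and are not taken from asyncio.

```python
import heapq
import itertools
import time

class TimerQueue:
    """Hypothetical heap-ordered replacement for the plain list of timers."""
    def __init__(self):
        self._heap = []
        # Tie-breaker so two timers with equal times never compare callbacks.
        self._counter = itertools.count()

    def call_at(self, when, callback):
        heapq.heappush(self._heap, (when, next(self._counter), callback))

    def run_due(self, now=None):
        """Invoke every callback whose time has passed; return how many ran."""
        now = time.monotonic() if now is None else now
        ran = 0
        # The earliest timer is always at index 0, so stop at the first
        # one that is not due yet.
        while self._heap and self._heap[0][0] <= now:
            _, _, callback = heapq.heappop(self._heap)
            callback()
            ran += 1
        return ran

    def time_until_next(self, now=None):
        """Seconds until the next timer is due, or None if there are none."""
        if not self._heap:
            return None
        now = time.monotonic() if now is None else now
        return max(0.0, self._heap[0][0] - now)

fired = []
timers = TimerQueue()
timers.call_at(3.0, lambda: fired.append('c'))
timers.call_at(1.0, lambda: fired.append('a'))
timers.call_at(2.0, lambda: fired.append('b'))

ran = timers.run_due(now=2.5)               # runs 'a' then 'b', leaves 'c'
remaining = timers.time_until_next(now=2.5)
```

A value like the one from time_until_next could also be passed as the selector timeout so that the loop wakes up in time for the next timer instead of waiting for a fixed polling interval.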

Where To Go From Here?

At this point, we’ve both gone through the history of async syntax in Python and built a working, toy version of asyncio.

If you’ve made it this far then I sincerely hope that you found the content useful. If I’ve succeeded in my goal then you should have a mental framework for how async Python works in enough detail that you can look at a coroutine and roughly imagine its journey through an event loop. If you want to know more past this point then I recommend that you start reading through the Twisted and asyncio source code.

If you choose to dig deeper into the code, I recommend looking for and considering a few items:

  • What are Tasks and how do they relate to futures and coroutines?

  • Why does asyncio resume coroutines at the next loop iteration after a future is resolved instead of immediately? What would be the consequences of doing it differently?

  • How are threads and multiprocessing adapted to work with async code?

  • If Python didn’t have the GIL, how could multi-threaded coroutine scheduling work?

I also recommend reading into some of the alternative models for how async networking could have been implemented in Python. This would include projects like eventlet[21] and gevent[22] that implemented all the same concepts but tried to hide the async nature through monkey patching[23]. From there, maybe branch out into other languages that support async. They will generally implement the same concepts I’ve presented here but they often have different or unique expressions of how code becomes async.

The Complete Event Loop Code

For convenience, here is the final and complete version of the event loop code that is presented in parts throughout the Event Loops section. If you have an environment with Python 3.5 or later then you should be able to copy, paste, and execute the file.

import selectors
import socket
import time

class Future:
    def __init__(self):
        self._value = None
        self._done = False

    def set_value(self, value):
        self._value = value
        self._done = True

    def get_value(self):
        return self._value

    def is_done(self):
        return self._done

    def __await__(self):
        while not self._done:
            yield self
        return self._value

class TimeAction:
    def __init__(self, callback, when):
        self.callback = callback
        self.when = when

class EventLoop:
    def __init__(self, selector=None, select_timeout=1):
        self._selector = selector or selectors.DefaultSelector()
        self._select_timeout = select_timeout
        self._timers = []
    def _check_sockets(self):
        for key, event_mask in self._selector.select(self._select_timeout):
            read_callback, write_callback, socket = key.data
            if event_mask & selectors.EVENT_READ:
                read_callback(socket)
            if event_mask & selectors.EVENT_WRITE:
                write_callback(socket)
    def sleep(self, seconds):
        when = time.monotonic() + seconds
        result = Future()
        action = TimeAction(lambda: result.set_value(None), when)
        self._timers.append(action)
        return result
    def add_reader(self, socket, read_callback):
        key = None
        try:
            key = self._selector.get_key(socket)
        except KeyError:
            pass
        if key is None:
            # Socket was not previously registered. Create a new registration.
            self._selector.register(socket, selectors.EVENT_READ, (read_callback, None, socket))
            return None
        # Socket was previously registered. Add read events to the registration.
        _, write_callback, _ = key.data
        self._selector.modify(socket, selectors.EVENT_READ | key.events, (read_callback, write_callback, socket))
    def add_writer(self, socket, write_callback):
        key = None
        try:
            key = self._selector.get_key(socket)
        except KeyError:
            pass
        if key is None:
            # Socket was not previously registered. Create a new registration.
            self._selector.register(socket, selectors.EVENT_WRITE, (None, write_callback, socket))
            return None
        # Socket was previously registered. Add write events to the registration.
        read_callback, _, _ = key.data
        self._selector.modify(socket, selectors.EVENT_WRITE | key.events, (read_callback, write_callback, socket))
    def remove_reader(self, socket):
        try:
            key = self._selector.get_key(socket)
        except KeyError:
            return None
        _, write_callback, socket = key.data
        new_mask = key.events & ~selectors.EVENT_READ
        if not new_mask:
            # No more events left to monitor
            self._selector.unregister(socket)
            return None
        self._selector.modify(socket, new_mask, (None, write_callback, socket))
    def remove_writer(self, socket):
        try:
            key = self._selector.get_key(socket)
        except KeyError:
            return None
        read_callback, _, _ = key.data
        new_mask = key.events & ~selectors.EVENT_WRITE
        if not new_mask:
            self._selector.unregister(socket)
            return None
        self._selector.modify(socket, new_mask, (read_callback, None, socket))
    def run(self, coro):
        current_future = None
        try:
            current_future = coro.send(None)
        except StopIteration:
            return
        while True:
            remove_timers = []
            for offset, timer in enumerate(self._timers):
                if timer.when <= time.monotonic():
                    remove_timers.append(offset)
                    timer.callback()
            for offset in reversed(remove_timers):
                self._timers.pop(offset)
            try:
                if current_future.is_done():
                    current_future = coro.send(None)
            except StopIteration:
                return
            self._check_sockets()

class Socket:
    def __init__(self, loop, family=socket.AF_INET, type=socket.SOCK_STREAM, proto=0, fileno=None):
        self._loop = loop
        self._socket = socket.socket(family, type, proto, fileno)
        self._socket.setblocking(0)
    def connect(self, address):
        result = Future()
        self._socket.connect_ex(address)
        result.set_value(None)
        return result
    def recv(self, size):
        result = Future()
        def resolve(s):
            result.set_value(self._socket.recv(size))
            self._loop.remove_reader(s)
        self._loop.add_reader(self._socket, resolve)
        return result
    def send(self, buffer):
        result = Future()
        def resolve(s):
            count = self._socket.send(buffer)
            result.set_value(count)
            self._loop.remove_writer(s)
        self._loop.add_writer(self._socket, resolve)
        return result

loop = EventLoop()
async def hello():
    s = Socket(loop, socket.AF_INET, socket.SOCK_STREAM)
    await s.connect(('example.com', 80))
    buffer = b'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n'
    while buffer:
        count = await s.send(buffer)
        buffer = buffer[count:]
    result = await s.recv(1024)
    await loop.sleep(5)
    print(result.decode('utf-8'))

loop.run(hello())
# <wait 5 seconds>
# HTTP/1.0 200 OK
# ...