David Beazley - Python Concurrency From the Ground Up: LIVE! - PyCon 2015

By: PyCon 2015


Uploaded on 04/11/2015

"Speaker: David Beazley

There are currently three popular approaches to Python concurrency: threads, event loops, and coroutines. Each is shrouded by various degrees of mystery and peril. In this talk, all three approaches will be deconstructed and explained in an epic ground-up live coding battle.

Slides can be found at: https://speakerdeck.com/pycon2015 and https://github.com/PyCon/2015-slides"

Comments (12):

By anonymous    2017-09-20

Short answer:

yield from is the old way to wait for a coroutine.

await is the modern way to wait for a coroutine.

Detailed answer:

Python has generators, a special kind of function that produces a sequence of results instead of a single value. Python 3.3 added the yield from expression, which allows one generator to delegate part of its operations to another generator.

Python 3.4 added the asyncio module to the standard library. It allows us to write clear and understandable asynchronous code. While asyncio's coroutines could technically have been implemented in different ways, in asyncio they were implemented using generators (there is an excellent video that shows how generators can be used to implement coroutines). @asyncio.coroutine was the way to make a coroutine from a generator, and yield from was the way to wait for a coroutine; both are just implementation details.

That is how yield from came to be used for two "different things".

Starting with Python 3.5 (see PEP 492), coroutines got new syntax. Now you can define a coroutine with async def and wait for it with an await expression. It's not only shorter to write, it also makes it clearer that we are working with coroutines.

If you're using Python 3.5+, you can forget about using yield from for anything except generators and use await for coroutines.
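To make the two uses concrete, here is a minimal sketch that puts generator delegation with yield from next to a native coroutine with await. The function names (inner, outer, fetch_answer) are made up for illustration, and asyncio.run assumes Python 3.7 or newer.

```python
import asyncio


def inner():
    # A plain generator producing two values.
    yield 1
    yield 2


def outer():
    # `yield from` delegates iteration to another generator.
    yield 0
    yield from inner()


async def fetch_answer():
    # A native coroutine; asyncio.sleep(0) just hands control back once.
    await asyncio.sleep(0)
    return 42


async def main():
    # `await` waits for another coroutine and receives its return value.
    return await fetch_answer()


delegated = list(outer())
answer = asyncio.run(main())
print(delegated, answer)
```

The first half never touches an event loop, while the second half cannot run without one. That is the distinction the new syntax makes visible.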

Original Thread

By anonymous    2017-09-20

Does each task get launched on separate thread?

No, asyncio usually runs in a single thread.

How asyncio package enables bar, to be an async task, with these keywords, under the hood?

When you define a function with async def, calling it returns a coroutine object that works much like a generator: it can be executed "step by step" (generators expose this through the __next__() method, coroutines through send()). await is a yield (actually a yield from) point where the execution flow returns to the global event loop, which manages the execution of all coroutines.

This simple example shows how you can switch between the execution flows of different generators:

def task(i):
    yield 1
    print('task {}: step 1'.format(i))
    yield 2
    print('task {}: step 2'.format(i))


tasks = [
    task(1),
    task(2),
    task(3),
]


def execute_tasks(tasks):
    i = 0
    finished = []
    while True:
        # start executing tasks:
        try:
            tasks[i].__next__()
        except StopIteration:
            finished.append(i)
        # check if any task unfinished:
        if len(finished) == len(tasks):
            return
        # move to next unfinished task:
        while True:
            i += 1
            if i > len(tasks) - 1:
                i = 0
            if i not in finished:
                break


if __name__ == '__main__':
    execute_tasks(tasks)

Output:

task 1: step 1
task 2: step 1
task 3: step 1
task 1: step 2
task 2: step 2
task 3: step 2

asyncio is of course much more complex and lets you do much more.
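For comparison, the same interleaving can be sketched with asyncio itself. This is only an illustration: await asyncio.sleep(0) plays the role of the bare yield, the lines are collected in a list (log) rather than printed directly, and the ordering shown relies on the default event loop running ready tasks in FIFO order.

```python
import asyncio

log = []  # collected instead of printed so the order is easy to inspect


async def task(i):
    for step in (1, 2):
        await asyncio.sleep(0)  # hand control back to the event loop
        log.append('task {}: step {}'.format(i, step))


async def main():
    # gather() wraps each coroutine in a Task and waits for all of them.
    await asyncio.gather(task(1), task(2), task(3))


asyncio.run(main())
print('\n'.join(log))
```

Here the event loop does the bookkeeping that execute_tasks did by hand: it keeps track of which tasks are unfinished and resumes them in turn.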

Probably the best explanation I have seen of how to implement coroutines using generators is this PyCon 2015 video: David Beazley - Python Concurrency From the Ground Up: LIVE! (source code). You should definitely watch it if you're going to implement this.

But I advise you to use asyncio instead: it already exists, so there's no need to invent your own.

Original Thread

By anonymous    2017-10-22

@wvxvw I really think it contains everything needed to see how to use asyncio. If you want something deeper, to understand how asyncio itself works, I can advise you to watch this video: https://www.youtube.com/watch?v=MCs5OvhV9S4 It's more complex, but it shows how an (abstract) event loop is organized and how it manages things.

Original Thread

By anonymous    2018-01-07

The code you provided is not concurrent in either case. That is because of this line:

async def asleep(n):
    time.sleep(n)   # blocking method

time.sleep freezes the asyncio event loop, making it unable to process other coroutines that are running at the same time. You should never do this when using asyncio (if you want to run some blocking code inside a coroutine, you should run it in another thread using loop.run_in_executor, as shown here).
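As a rough sketch of the run_in_executor route, the snippet below pushes two blocking calls onto the loop's default thread pool so they overlap instead of freezing the loop. blocking_io is a hypothetical stand-in for whatever blocking function you have, and asyncio.get_running_loop assumes Python 3.7 or newer.

```python
import asyncio
import time


def blocking_io(n):
    # Stand-in for any blocking call (hypothetical helper).
    time.sleep(n)
    return n


async def main():
    loop = asyncio.get_running_loop()
    start = time.monotonic()
    # Passing None selects the loop's default ThreadPoolExecutor.
    results = await asyncio.gather(
        loop.run_in_executor(None, blocking_io, 0.2),
        loop.run_in_executor(None, blocking_io, 0.2),
    )
    return results, time.monotonic() - start


results, elapsed = asyncio.run(main())
print(results, round(elapsed, 1))
```

The two 0.2-second sleeps run in separate worker threads, so the total should be close to 0.2 seconds rather than 0.4.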

To make your coroutine work properly, you should use the special asyncio version of sleep, which doesn't block the event loop:

async def asleep(n):
    await asyncio.sleep(n)   # non-blocking: yields control to the event loop

Compare running the version you posted with this version (both in case1): 6 seconds vs. 2 seconds.


Once we create tasks from coroutines using ensure_future, these coroutines start running concurrently:

tasks = [asyncio.ensure_future(c) for c in coros]  # this runs all coroutines concurrently

Using asyncio.gather is an alternative way to run coroutines concurrently:

await asyncio.gather(*coros)  # this runs all coroutines concurrently
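The difference is easy to measure. The sketch below (with short 0.1-second sleeps, and helper names timed, sequential and concurrent that are invented for this example) awaits three sleeps one after another and then runs three of them through asyncio.gather.

```python
import asyncio
import time


async def asleep(n):
    await asyncio.sleep(n)  # non-blocking sleep


async def timed(coro):
    # Measure how long it takes to await the given coroutine.
    start = time.monotonic()
    await coro
    return time.monotonic() - start


async def sequential():
    for _ in range(3):
        await asleep(0.1)  # each sleep finishes before the next one starts


async def concurrent():
    await asyncio.gather(*(asleep(0.1) for _ in range(3)))


async def main():
    return await timed(sequential()), await timed(concurrent())


seq_time, conc_time = asyncio.run(main())
print('sequential: {:.2f}s, concurrent: {:.2f}s'.format(seq_time, conc_time))
```

The sequential variant should take roughly three times as long as the gathered one, because awaiting a coroutine directly does not start the others.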

The way asyncio works in detail may seem pretty complex. The best explanation I've seen is David Beazley - Python Concurrency From the Ground Up. Note that you usually don't need to understand the low-level details to use asyncio in practice.

Original Thread

By anonymous    2018-01-07

This seems like a pretty straightforward use case for asyncio. I wouldn't consider using asyncio as "using a particular library" since socket programming paired with asyncio's event loop is pretty low-level and the concept is very transparent if you have experience with other languages and just want to see how async programming works in Python.

You can use this async chat as an example: https://gist.github.com/gregvish/7665915

Essentially, you create a non-blocking socket, see standard library reference on socket.setblocking(0):

https://docs.python.org/3/library/socket.html#socket.socket.setblocking
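A small sketch of what non-blocking mode changes in practice: recv raises BlockingIOError instead of waiting, and select reports when the socket is actually ready. socket.socketpair is used here only so the demo needs no server; a real chat server would use accepted connections instead.

```python
import select
import socket

# socketpair() gives two connected sockets, so no server is needed for the demo.
a, b = socket.socketpair()
a.setblocking(False)

try:
    a.recv(1024)          # nothing has been sent yet
    would_block = False
except BlockingIOError:   # raised instead of waiting for data
    would_block = True

b.sendall(b'hello')
# select() blocks until `a` is readable (or the 1-second timeout passes).
readable, _, _ = select.select([a], [], [], 1.0)
data = a.recv(1024) if readable else None

a.close()
b.close()
print(would_block, data)
```

An event loop is essentially this pattern repeated: try the operation, and if it would block, wait in select for readiness before trying again.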

I'd also suggest this amazing session by David Beazley as a must-see for async Python programming. He explains the concurrency concepts in Python using sockets, exactly what you need: https://www.youtube.com/watch?v=MCs5OvhV9S4

Original Thread

By anonymous    2018-01-29

You have two options for running things in parallel in Python: either use the multiprocessing (docs) library, or write the parallel code in Cython and release the GIL. The latter is significantly more work and, generally speaking, less applicable.

Python threads are limited by the Global Interpreter Lock (GIL). I won't go into detail here, as you will find more than enough information about it online. In short, the GIL, as the name suggests, is a global lock within the CPython interpreter that ensures multiple threads do not simultaneously modify objects that live within that interpreter. This is why, for instance, Cython programs can run code in parallel: they can release the GIL around code that does not touch Python objects.


As to your code, one problem is that you're running both the number crunching (binarize) and the socket.send inside the GIL, which makes them run strictly serially. The queue is also connected very strangely, and there is a NameError, but let's leave those aside.

With the caveats already pointed out by Jeremy Friesner in mind, I suggest you re-structure the code in the following manner: you have two processes (not threads) one for binarising the data and the other for sending data. In addition to those, there is also the parent process that started both children, and a queue connecting child 1 to child 2.

  • Subprocess-1 does number crunching and produces crunched data into a queue
  • Subprocess-2 consumes data from a queue and does socket.send

In code, the setup would look something like:

from multiprocessing import Process, Queue

work_queue = Queue()
p1 = Process(target=binarize, args=(100, work_queue))
p2 = Process(target=send_data, args=(ip, port, work_queue))
p1.start()
p2.start()
p1.join()
p2.join()

binarize can remain as it is in your code, except that instead of a yield at the end, you add elements to the queue:

import array

def binarize(num_of_chunks, q):
    '''Put chunks of binary data on the queue. In reality it wouldn't be the same data.'''

    ordered_col_list = ('col1', 'col2')
    columns = dict.fromkeys(ordered_col_list)
    for i in range(num_of_chunks):
        # tobytes() replaces tostring(), which was removed in Python 3.9
        columns['col1'] = array.array('i', range(10000)).tobytes()
        columns['col2'] = array.array('f', [float(num) for num in range(10000)]).tobytes()
        data = b''.join((columns[col_name] for col_name in ordered_col_list))
        q.put(data)

send_data should just be the while loop from the bottom of your code, with the connection open/close functionality:

import socket
from queue import Empty
from time import sleep

def send_data(ip, port, q):
    s = socket.create_connection((ip, port))
    while True:
        try:
            binary_chunk = q.get(False)  # non-blocking get
        except Empty:
            # the queue is empty for now; wait a bit and retry
            sleep(0.1)
            continue
        else:
            s.sendall(binary_chunk)
            s.recv(1000)  # Get confirmation
    # maybe remember to close the socket before killing the process

Now you have two (three, actually, if you count the parent) processes that are processing data independently. You can force the two processes to synchronise their operations by setting the maxsize of the queue to a single element. The operation of these two separate processes is also easy to monitor from the process manager on your computer: top (Linux), Activity Monitor (OS X), or Task Manager (Windows).


Finally, Python 3 comes with the option of using co-routines, which are neither processes nor threads, but something else entirely. Co-routines are pretty cool from a CS point of view, but a bit of a head scratcher at first. There are plenty of resources to learn from though, like this post on Medium and this talk by David Beazley.


Even more generally, you might want to look into the producer/consumer pattern, if you are not already familiar with it.
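For reference, here is a bare-bones sketch of the producer/consumer pattern. To keep it short and easy to run it uses threads and queue.Queue; the put/get shape is the same with multiprocessing.Process and multiprocessing.Queue, although the shared results list would then have to be replaced by something that crosses process boundaries. The SENTINEL value used to signal the end of the stream is an assumption of this example, not something from the original code.

```python
import queue
import threading

SENTINEL = None  # marks the end of the stream (an assumption of this sketch)


def producer(q, n):
    for i in range(n):
        q.put(i * i)       # blocks while the queue is full
    q.put(SENTINEL)


def consumer(q, out):
    while True:
        item = q.get()     # blocks until an item is available
        if item is SENTINEL:
            break
        out.append(item)


results = []
q = queue.Queue(maxsize=1)  # maxsize=1 forces the two sides to run in lockstep
threads = [
    threading.Thread(target=producer, args=(q, 5)),
    threading.Thread(target=consumer, args=(q, results)),
]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(results)
```

Because get blocks until data arrives, the consumer needs no polling loop with sleep, and the sentinel gives it a clean way to exit.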

Original Thread

By anonymous    2018-02-12

@AmitTripathi If you mean asyncio, I read a bunch of tutorials - e.g. [this one](https://medium.freecodecamp.org/a-guide-to-asynchronous-programming-in-python-with-asyncio-232e2afa44f6) and [this one](http://www.giantflyingsaucer.com/blog/?p=5557) for the basics, [this tutorial](https://hackernoon.com/asyncio-for-the-working-python-developer-5c468e6e2e8e) and [this reference](https://pymotw.com/3/asyncio/) more in-depth, and [this article](https://snarky.ca/how-the-heck-does-async-await-work-in-python-3-5/) and [this video](https://www.youtube.com/watch?v=MCs5OvhV9S4) for event loop details.

Original Thread

By anonymous    2018-02-12

The line:

a_stack(words).send('hello')

does two things:

  1. create and run a new generator
  2. send the string hello into the generator

The created generator waits for an item to arrive and then, once resumed, does something with the item. And that is the problem: you never resume the generator. You throw it away and create a new one, and proceed to use it in the same manner. To fix it, your sending code should do something like:

coro = a_stack(words)
coro.send('hello')
log(words)
coro.send('world')
log(words)

But there is another problem. Before actually appending to the stack, a_stack defers its execution to another iterator, which never stops yielding. One way to code a_sleep that fixes the problem is:

@coroutine
def a_sleep(count):
    t0 = time()
    while time() - t0 < count:
        yield 'notyet'

Then you need either a scheduler or at least a more resilient version of send, which can actually deal with a task deferring its execution. A simple (and very inefficient) one could look like this:

def sync_send(c, v):
    while True:
        ret = c.send(v)
        if ret != 'notyet':
            return ret

After replacing coro.send('hello') with sync_send(coro, 'hello'), the expected output is displayed.

A real scheduler would never busy-loop; it would be instructed by sleep and by other potentially blocking calls, such as reads from files or sockets, which IO/timing events it must wait for. After the appropriate event arrived, it would wake up the correct task. This is the core of what asyncio does.
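To illustrate the non-busy-looping idea, here is a minimal sketch of a generator scheduler that only understands timers. It is not how asyncio is written. The ('sleep', delay) request format and the names sleeper and run are invented for this example: tasks yield a request, and the scheduler sleeps until the earliest deadline instead of spinning.

```python
import heapq
import time


def sleeper(name, delay, log):
    # The task asks the scheduler to wake it after `delay` seconds.
    yield ('sleep', delay)
    log.append(name)


def run(tasks):
    ready = list(tasks)       # tasks that can run right now
    sleeping = []             # heap of (wake_time, sequence, task)
    seq = 0                   # tie-breaker so generators are never compared
    while ready or sleeping:
        if not ready:
            wake_time, _, task = heapq.heappop(sleeping)
            # Block until the earliest deadline instead of spinning.
            time.sleep(max(0.0, wake_time - time.monotonic()))
            ready.append(task)
        task = ready.pop(0)
        try:
            op, delay = next(task)
        except StopIteration:
            continue          # the task has finished
        if op == 'sleep':
            heapq.heappush(sleeping, (time.monotonic() + delay, seq, task))
            seq += 1


log = []
run([sleeper('slow', 0.2, log), sleeper('fast', 0.1, log)])
print(log)
```

The task with the shorter delay finishes first even though it was started second, and the process is idle while both are waiting.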

To learn more about how generators and yield from are used as a core abstraction for asynchronous programming, I recommend the excellent lecture Python Concurrency From the Ground Up by Dave Beazley. In the lecture Dave implements a coroutine scheduler in front of a live audience, showing the design of his curio library.

Original Thread

By anonymous    2018-03-05

Your coro desugaring is conceptually correct, but slightly incomplete.

await doesn't suspend unconditionally, but only if it encounters a blocking call. How does it know that a call is blocking? This is decided by the awaitable object. Let's say you're awaiting read on a socket. The implementation of read could be desugared to:

def read(sock, n):
    # sock must be in non-blocking mode
    try:
        return sock.recv(n)
    except BlockingIOError:  # EWOULDBLOCK / EAGAIN
        event_loop.add_reader(sock.fileno(), current_task())
        return SUSPEND

(In real asyncio the equivalent code returns a Future, but the concept is the same.) When appropriately decorated, this is something you can await. Now, when your coroutine contains:

data = await read(sock, 1024)

It desugars into something close to:

data = read(sock, 1024)
if data is SUSPEND:
    return SUSPEND
self.pos += 1
self.parts[self.pos](...)

People familiar with generators tend to describe the above in terms of yield from which does the suspension automatically.

The suspension chain continues all the way up to the event loop, which notices that the coroutine is suspended, removes it from the runnable set, and goes on to execute coroutines that are runnable, if any. If no coroutines are runnable, the loop waits in select() until either a file descriptor a coroutine is interested in becomes ready for IO or a timeout expires. (The event loop maintains a file-descriptor-to-coroutine mapping.)

In the above example, once select() tells the event loop that sock is readable, it will re-add coro to the runnable set, so it will be continued from the point of suspension.

In other words:

  1. Everything happens in the same thread by default.

  2. The event loop is responsible for scheduling the coroutines and waking them up when whatever they were waiting for (typically an IO call that would normally block, or a timeout) becomes ready.
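The Future-based variant can be sketched with real asyncio calls. This is an illustration under assumptions, not asyncio's own implementation: the read helper is hypothetical, it always registers a reader rather than trying recv first, and loop.add_reader requires a selector-based event loop (the default on Unix).

```python
import asyncio
import socket


def read(loop, sock, n):
    # Returns a Future that the event loop resolves once `sock` is readable.
    fut = loop.create_future()

    def on_readable():
        loop.remove_reader(sock.fileno())
        fut.set_result(sock.recv(n))

    loop.add_reader(sock.fileno(), on_readable)
    return fut


async def main():
    loop = asyncio.get_running_loop()
    a, b = socket.socketpair()
    a.setblocking(False)
    # Send a little later, so the await below really has to suspend.
    loop.call_later(0.05, b.sendall, b'ping')
    data = await read(loop, a, 1024)
    a.close()
    b.close()
    return data


data = asyncio.run(main())
print(data)
```

While main is suspended on the Future, the loop sits in its selector; the timer callback writes to the other end, the socket becomes readable, and resolving the Future puts main back in the runnable set.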

For insight into coroutine-driving event loops, I recommend this talk by Dave Beazley, where he demonstrates coding an event loop from scratch in front of a live audience.

Original Thread

By anonymous    2018-05-14

I wrote a simple Echo Server with co-routines, which would yield the fd along with the interested action like this yield fd, 'read', yield fd, 'write' etc and then the Event Loop would register the select accordingly.

This is similar to how Dave Beazley's curio works. To learn more about the concept, see this lecture where he builds an event loop from the very basics. (He uses the pre-3.5 yield from syntax, but it works exactly the same as await.)
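A stripped-down sketch of that style of loop, in the spirit of the lecture but not copied from it: tasks yield a (reason, socket) pair and the loop parks them until select reports the socket ready. The names echo_once and run are invented here, and socket.socketpair stands in for a real client connection.

```python
import select
import socket
from collections import deque


def echo_once(sock, log):
    yield ('recv', sock)           # suspend until sock is readable
    data = sock.recv(1024)
    yield ('send', sock)           # suspend until sock is writable
    sock.sendall(data.upper())
    log.append(data)


def run(tasks):
    ready = deque(tasks)
    recv_wait, send_wait = {}, {}  # socket -> suspended task
    while ready or recv_wait or send_wait:
        while not ready:
            # Nothing runnable: block in select() until some socket is ready.
            can_recv, can_send, _ = select.select(
                list(recv_wait), list(send_wait), [])
            ready.extend(recv_wait.pop(s) for s in can_recv)
            ready.extend(send_wait.pop(s) for s in can_send)
        task = ready.popleft()
        try:
            why, sock = next(task)
        except StopIteration:
            continue               # the task has finished
        (recv_wait if why == 'recv' else send_wait)[sock] = task


log = []
server, client = socket.socketpair()
client.sendall(b'hello')
run([echo_once(server, log)])
reply = client.recv(1024)
server.close()
client.close()
print(log, reply)
```

The two dictionaries are the file-descriptor-to-task mapping: a task lives in exactly one place at a time, either the ready queue or one of the wait maps.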

As you discovered, asyncio works a bit differently, although the principle is still similar.

Now I am just trying to understand how await actually works. It doesn't seem to yield fds, and string corresponding to the action like the above example, instead it gives you a Future object. So what exactly happens under-the-hood? How is it communicating with the Event Loop?

The short version is that blocking coroutines use a global variable (through asyncio.get_event_loop()) to fetch the event loop. The event loop has methods that schedule callbacks to be invoked when an interesting event occurs. asyncio.sleep calls loop.call_later to ensure its resumption when the timeout elapses.

The Future that is yielded is just a convenient way for the event loop to be notified of the result once it's ready, so that it can correctly resume the Task (coroutine driven by the event loop) that was awaiting the blocking operation, while also handling exceptions and cancellation. See Task.__step for the gory details.
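A sleep built from these two pieces can be sketched as follows. my_sleep is a hypothetical re-implementation for illustration only; the real asyncio.sleep additionally takes care of cancellation.

```python
import asyncio
import time


def my_sleep(delay):
    # Hypothetical re-implementation, for illustration only.
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    # When the timer fires, resolving the Future wakes the awaiting Task.
    loop.call_later(delay, fut.set_result, None)
    return fut


async def main():
    start = time.monotonic()
    await my_sleep(0.1)
    return time.monotonic() - start


elapsed = asyncio.run(main())
print('{:.2f}'.format(elapsed))
```

The coroutine itself never talks to the selector: it only awaits a Future, and the callback scheduled with call_later is what connects the timer to the Task's resumption.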

timerfd_create was just taken as an example because I couldn't understand how timers can be implemented in an Event Loop.

Timers are implemented so that the event loop keeps track of both file descriptors and timeouts, and issues a select that terminates when the earliest timeout elapses. Dave's lecture linked above demonstrates the concept succinctly.

Original Thread
