Programação Assíncrona com Asyncio

PythonDay Campina Grande 2017

Allisson Azevedo

Allisson Azevedo

allissonazevedo.com

youtube.com/user/allissonazevedo

github.com/allisson

twitter.com/allisson

linkedin.com/in/allisson/

allisson.github.io/slides/

allisson@gmail.com

The C10K problem

Como lidar com 10k conexões simultâneas

http://www.kegel.com/c10k.html

Concorrência

Não confundir com paralelismo

Asynchronous I/O

O exemplo do garçom

Programação síncrona


import time
import requests
from github import REPOS, ACCESS_TOKEN

# Baseline: request every repository URL one after another and time the run.
start = time.time()
for repo_url in REPOS:
    # requests.get() returns only once the HTTP response has arrived, so the
    # next URL is not requested until this one is finished.
    reply = requests.get(repo_url, params={'access_token': ACCESS_TOKEN})
    response = reply.json()
    repo_info = {key: response[key] for key in ('name', 'full_name', 'stargazers_count')}
    print(repo_info)
end = time.time()
elapsed = end - start
print('Tempo de execução={:.2f} segundos'.format(elapsed))
                

Problemas com programação síncrona

Uma requisição http por vez

Concorrência usando threads


import time
import threading
import queue
import requests
from github import REPOS, ACCESS_TOKEN


def grab_data_from_queue():
    """Worker loop: take repository URLs from the shared queue ``q`` and print a summary of each.

    For every URL the JSON response is fetched with ``requests`` and a dict
    with the ``name``, ``full_name`` and ``stargazers_count`` fields is
    printed. The loop ends as soon as the queue reports that it is empty.

    Raises:
        Whatever ``requests.get``/``.json()`` raise, or ``KeyError`` when the
        response lacks one of the expected fields. The item is still marked
        as done before the exception propagates.
    """
    while True:
        try:
            # Checking q.empty() and then calling q.get() is racy: another
            # worker can take the last item in between, leaving this thread
            # blocked in get() forever. A non-blocking get avoids that.
            repo_url = q.get_nowait()
        except queue.Empty:
            break
        try:
            response = requests.get(repo_url, params={'access_token': ACCESS_TOKEN}).json()
            repo_info = {
                'name': response['name'],
                'full_name': response['full_name'],
                'stargazers_count': response['stargazers_count']
            }
            print(repo_info)
        finally:
            # Acknowledge the item even when the request fails; q.join() in
            # the main thread counts on one task_done() per item taken.
            q.task_done()


# Spread the requests over a fixed number of worker threads and time the run.
max_threads = 5
start = time.time()

# The queue is filled completely before any worker thread is started.
q = queue.Queue()
for repo_url in REPOS:
    q.put(repo_url)

for _ in range(max_threads):
    threading.Thread(target=grab_data_from_queue).start()

# Wait until every queued URL has been marked done by a worker.
q.join()
end = time.time()
elapsed = end - start
print('Tempo de execução={:.2f} segundos'.format(elapsed))
                

Problemas com threads

Global Interpreter Lock (GIL)

Consumo de recursos

Concorrência usando processos


import time
import multiprocessing
import requests
from github import REPOS, ACCESS_TOKEN


def grab_data_from_queue():
    """Worker loop: take repository URLs from the shared queue ``q`` and print a summary of each.

    For every URL the JSON response is fetched with ``requests`` and a dict
    with the ``name``, ``full_name`` and ``stargazers_count`` fields is
    printed. The loop ends as soon as the queue reports that it is empty.

    Raises:
        Whatever ``requests.get``/``.json()`` raise, or ``KeyError`` when the
        response lacks one of the expected fields. The item is still marked
        as done before the exception propagates.
    """
    # Imported here because this script does not import ``queue`` at module level.
    from queue import Empty

    while True:
        try:
            # Checking q.empty() and then calling q.get() is racy: another
            # worker can take the last item in between, leaving this process
            # blocked in get() forever. A non-blocking get avoids that.
            repo_url = q.get_nowait()
        except Empty:
            break
        try:
            response = requests.get(repo_url, params={'access_token': ACCESS_TOKEN}).json()
            repo_info = {
                'name': response['name'],
                'full_name': response['full_name'],
                'stargazers_count': response['stargazers_count']
            }
            print(repo_info)
        finally:
            # Acknowledge the item even when the request fails; q.join() in
            # the parent counts on one task_done() per item taken.
            q.task_done()


# Spread the requests over a fixed number of worker processes and time the run.
workers = 5
start = time.time()

# The joinable queue is filled completely before any worker process is started.
q = multiprocessing.JoinableQueue()
for repo_url in REPOS:
    q.put(repo_url)

for _ in range(workers):
    multiprocessing.Process(target=grab_data_from_queue).start()

# Wait until every queued URL has been marked done by a worker.
q.join()
end = time.time()
elapsed = end - start
print('Tempo de execução={:.2f} segundos'.format(elapsed))
                

Problemas com processos

Consumo de recursos

Concorrência usando concurrent.futures


import time
from concurrent import futures
import requests
from github import REPOS, ACCESS_TOKEN


def get_repo_info(repo_url):
    """Fetch the JSON document at ``repo_url`` and print a three-field summary.

    The printed dict holds the ``name``, ``full_name`` and
    ``stargazers_count`` values of the response. ``ACCESS_TOKEN`` is sent as
    the ``access_token`` query parameter. Nothing is returned.
    """
    reply = requests.get(repo_url, params={'access_token': ACCESS_TOKEN})
    payload = reply.json()
    wanted = ('name', 'full_name', 'stargazers_count')
    print({key: payload[key] for key in wanted})


# Run get_repo_info over all repositories on a thread pool and time the run.
workers = 5
start = time.time()

with futures.ThreadPoolExecutor(workers) as executor:
    # An exception raised inside a worker only surfaces when its result is
    # retrieved from the iterator returned by executor.map(). Draining the
    # iterator makes a failed request fail the script instead of being
    # silently discarded.
    list(executor.map(get_repo_info, REPOS))

end = time.time()
print('Tempo de execução={:.2f} segundos'.format(end - start))
                

Problemas com concurrent.futures

ThreadPoolExecutor - usa threads

ProcessPoolExecutor - usa processos

Asynchronous I/O com Python

Twisted

Tornado

Eventlet

Gevent

Asyncio

Asyncio

Tulip

PEP-3156

Python 3.4+ (a sintaxe async/await usada nos exemplos requer Python 3.5+)

Hello World


import asyncio


async def hello_world():
    """Print the fixed greeting; the coroutine returns nothing."""
    greeting = 'Hello World!'
    print(greeting)

# Obtain the current event loop and run the coroutine on it until it finishes.
loop = asyncio.get_event_loop()
greeting_coro = hello_world()
loop.run_until_complete(greeting_coro)
                

Hello World com Tasks


import asyncio


async def hello_world(name):
    """Print a greeting addressed to ``name``; the coroutine returns nothing."""
    greeting = 'Hello World, {}!'.format(name)
    print(greeting)

# Schedule one greeting per name, then run the loop until all of them finish.
loop = asyncio.get_event_loop()
tasks = [
    asyncio.ensure_future(hello_world(name))
    for name in ('fulano', 'cicrano', 'beltrano')
]
loop.run_until_complete(asyncio.wait(tasks))
                

Concorrência usando Asyncio


import time
import asyncio
import aiohttp
from github import REPOS, ACCESS_TOKEN


async def get_repo_info(repo_url):
    """Fetch the JSON document at ``repo_url`` with aiohttp and print a three-field summary.

    A new ``ClientSession`` is opened for the call. The printed dict holds the
    ``name``, ``full_name`` and ``stargazers_count`` values of the response.
    Nothing is returned.
    """
    query = {'access_token': ACCESS_TOKEN}
    async with aiohttp.ClientSession() as session, session.get(repo_url, params=query) as response:
        response_data = await response.json()
        wanted = ('name', 'full_name', 'stargazers_count')
        print({key: response_data[key] for key in wanted})

# Schedule one task per repository, run them all on the event loop and time the run.
start = time.time()
loop = asyncio.get_event_loop()
tasks = [asyncio.ensure_future(get_repo_info(repo_url)) for repo_url in REPOS]
loop.run_until_complete(asyncio.wait(tasks))
end = time.time()
elapsed = end - start
print('Tempo de execução={:.2f} segundos'.format(elapsed))
                

aio libs

https://github.com/aio-libs

https://github.com/python/asyncio/wiki/ThirdParty

paco


import time
import paco
import aiohttp
from github import REPOS, ACCESS_TOKEN


async def get_repo_info(repo_url):
    """Request ``repo_url`` through a fresh aiohttp session and print a summary dict.

    The summary contains the ``name``, ``full_name`` and ``stargazers_count``
    fields of the JSON response. The coroutine returns nothing.
    """
    auth_params = {'access_token': ACCESS_TOKEN}
    summary_fields = ('name', 'full_name', 'stargazers_count')
    async with aiohttp.ClientSession() as session:
        async with session.get(repo_url, params=auth_params) as response:
            response_data = await response.json()
            repo_info = {field: response_data[field] for field in summary_fields}
            print(repo_info)

# Build one coroutine per repository and let paco run them, timing the whole run.
start = time.time()
tasks = []
for repo_url in REPOS:
    tasks.append(get_repo_info(repo_url))
# NOTE(review): limit=10 presumably caps how many coroutines run at once; confirm in the paco docs.
waiter = paco.wait(tasks, limit=10)
paco.run(waiter)
end = time.time()
elapsed = end - start
print('Tempo de execução={:.2f} segundos'.format(elapsed))
                

aiohttp


from aiohttp import web


async def handle(request):
    """Answer any request with a fixed JSON greeting; ``request`` is not inspected."""
    body = {'message': 'Hello World'}
    return web.json_response(body)

# Create the application, route GET / to handle() and serve on 127.0.0.1:8080.
app = web.Application()
app.router.add_get('/', handle)
bind = {'host': '127.0.0.1', 'port': 8080}
web.run_app(app, **bind)
                

sanic


from sanic import Sanic
from sanic.response import json

# Recent Sanic releases refuse to create an application without a name, so one
# is passed explicitly; older releases accept the same positional argument.
app = Sanic('hello_world')


@app.route('/')
async def test(request):
    """Answer requests for ``/`` with a fixed JSON greeting; ``request`` is not inspected."""
    body = {'message': 'Hello World'}
    return json(body)

# Start the server only when the file is executed directly, not when imported.
if __name__ == '__main__':
    bind = {'host': '127.0.0.1', 'port': 8080}
    app.run(**bind)
                

aioredis


import asyncio
import aioredis

# Event loop shared by this script: main() passes it to aioredis.create_redis()
# and the last line of the script runs main() on it.
loop = asyncio.get_event_loop()


async def main():
    """Round-trip one value through Redis and print what is read back.

    Connects to ``localhost:6379`` using the module-level event loop ``loop``,
    stores ``'hello world'`` under ``'key'``, reads the key back, prints the
    result and closes the connection.
    """
    redis = await aioredis.create_redis(('localhost', 6379), loop=loop)
    try:
        await redis.set('key', 'hello world')
        val = await redis.get('key')
        print(val)
    finally:
        # Close the connection even when set/get raises, so it is not leaked.
        redis.close()
        await redis.wait_closed()

# Run main() on the shared loop until it completes.
main_coro = main()
loop.run_until_complete(main_coro)
                

aiomcache


import asyncio
import aiomcache

# Event loop shared by this script: main() passes it to aiomcache.Client()
# and the last line of the script runs main() on it.
loop = asyncio.get_event_loop()


async def main():
    """Store a value in memcached at 127.0.0.1:11211 and print what is read back."""
    client = aiomcache.Client('127.0.0.1', 11211, loop=loop)
    cache_key = b'key'
    await client.set(cache_key, b'hello world')
    print(await client.get(cache_key))

# Run main() on the shared loop until it completes.
main_coro = main()
loop.run_until_complete(main_coro)
                

aiopg


import asyncio
import aiopg
from speakers import SPEAKERS

# Connection string (key=value pairs) handed to aiopg.create_pool() in get_pool().
dsn = 'dbname=pythonday user=pythonday password=pythonday host=127.0.0.1'


async def get_pool():
    """Create and return a new aiopg connection pool for ``dsn``.

    Every call builds a separate pool; nothing is cached here.
    """
    pool = await aiopg.create_pool(dsn)
    return pool


async def create_table():
    """Drop and recreate the ``speakers`` table, then insert one row per entry of ``SPEAKERS``.

    A pool is obtained from ``get_pool()`` for the duration of the call and
    closed again before returning.
    """
    pool = await get_pool()
    try:
        async with pool.acquire() as conn:
            async with conn.cursor() as cur:
                await cur.execute('DROP TABLE IF EXISTS speakers')
                await cur.execute('CREATE TABLE speakers (id serial PRIMARY KEY, name varchar(255))')
                for speaker in SPEAKERS:
                    # The name is passed as a query parameter, not formatted into the SQL text.
                    await cur.execute('INSERT INTO speakers (name) VALUES (%s)', (speaker,))
    finally:
        # get_pool() builds a new pool on every call, so this function owns
        # the pool and has to close it; otherwise it is leaked.
        pool.close()
        await pool.wait_closed()


async def get_speakers():
    """Return every row of the ``speakers`` table as a list of ``{'id', 'name'}`` dicts.

    A pool is obtained from ``get_pool()`` for the duration of the call and
    closed again before returning.
    """
    speakers = []
    pool = await get_pool()
    try:
        async with pool.acquire() as conn:
            async with conn.cursor() as cur:
                await cur.execute('SELECT * FROM speakers')
                async for row in cur:
                    # Column order follows the table definition: id first, then name.
                    speakers.append({'id': row[0], 'name': row[1]})
    finally:
        # get_pool() builds a new pool on every call, so this function owns
        # the pool and has to close it; otherwise it is leaked.
        pool.close()
        await pool.wait_closed()
    return speakers


async def main():
    """Rebuild the speakers table, then print each stored speaker on its own line."""
    await create_table()
    for speaker in await get_speakers():
        print(speaker)

# Obtain the current event loop and run main() on it until it finishes.
loop = asyncio.get_event_loop()
main_coro = main()
loop.run_until_complete(main_coro)
                

aiopg sqlalchemy


import asyncio
from aiopg.sa import create_engine
import sqlalchemy as sa
from speakers import SPEAKERS

# SQLAlchemy description of the speakers table; create_table() and
# get_speakers() use it to build their INSERT and SELECT statements.
metadata = sa.MetaData()
speaker_columns = (
    sa.Column('id', sa.Integer, primary_key=True),
    sa.Column('name', sa.String(255)),
)
speakers_table = sa.Table('speakers', metadata, *speaker_columns)


async def get_engine():
    """Create and return a new aiopg.sa engine for the ``pythonday`` database.

    Every call builds a separate engine; nothing is cached here.
    """
    engine = await create_engine(
        user='pythonday',
        database='pythonday',
        host='127.0.0.1',
        password='pythonday',
    )
    return engine


async def create_table():
    """Drop and recreate the ``speakers`` table, then insert one row per entry of ``SPEAKERS``.

    An engine is obtained from ``get_engine()`` for the duration of the call
    and closed again before returning.
    """
    engine = await get_engine()
    try:
        async with engine.acquire() as conn:
            await conn.execute('DROP TABLE IF EXISTS speakers')
            await conn.execute('CREATE TABLE speakers (id serial PRIMARY KEY, name varchar(255))')
            for speaker in SPEAKERS:
                await conn.execute(speakers_table.insert().values(name=speaker))
    finally:
        # get_engine() builds a new engine on every call, so this function
        # owns the engine and has to close it; otherwise it is leaked.
        engine.close()
        await engine.wait_closed()


async def get_speakers():
    """Return every row of the ``speakers`` table as a list of ``{'id', 'name'}`` dicts.

    An engine is obtained from ``get_engine()`` for the duration of the call
    and closed again before returning.
    """
    speakers = []
    engine = await get_engine()
    try:
        async with engine.acquire() as conn:
            async for row in conn.execute(speakers_table.select()):
                speakers.append({'id': row.id, 'name': row.name})
    finally:
        # get_engine() builds a new engine on every call, so this function
        # owns the engine and has to close it; otherwise it is leaked.
        engine.close()
        await engine.wait_closed()
    return speakers


async def main():
    """Rebuild the speakers table, then print each stored speaker on its own line."""
    await create_table()
    for speaker in await get_speakers():
        print(speaker)

# Obtain the current event loop and run main() on it until it finishes.
loop = asyncio.get_event_loop()
main_coro = main()
loop.run_until_complete(main_coro)
                

pytest-asyncio


import pytest
import aiohttp
from github import REPOS, ACCESS_TOKEN


async def get_repo_info(repo_url):
    """Fetch the JSON document at ``repo_url`` with aiohttp and return a three-field summary.

    Returns:
        A dict with the ``name``, ``full_name`` and ``stargazers_count``
        values taken from the response.
    """
    query = {'access_token': ACCESS_TOKEN}
    async with aiohttp.ClientSession() as session, session.get(repo_url, params=query) as response:
        response_data = await response.json()
        wanted = ('name', 'full_name', 'stargazers_count')
        return {key: response_data[key] for key in wanted}


@pytest.mark.asyncio
async def test_get_repo_info_1():
    """Coroutine-style test: await get_repo_info() and check that the summary keys exist."""
    repo_info = await get_repo_info(REPOS[0])
    for key in ('name', 'full_name', 'stargazers_count'):
        assert key in repo_info


def test_get_repo_info_2(event_loop):
    """Plain-function test: run get_repo_info() on the ``event_loop`` fixture and check the keys."""
    pending = get_repo_info(REPOS[0])
    repo_info = event_loop.run_until_complete(pending)
    for key in ('name', 'full_name', 'stargazers_count'):
        assert key in repo_info
                

Perguntas?

Obrigado!