PythonDay Campina Grande 2017
Allisson Azevedo
Como lidar com 10k conexões simultâneas
Asynchronous I/O
O exemplo do garçom
import time
import requests
from github import REPOS, ACCESS_TOKEN
start = time.time()
for repo_url in REPOS:
response = requests.get(repo_url, params={'access_token': ACCESS_TOKEN}).json()
repo_info = {
'name': response['name'],
'full_name': response['full_name'],
'stargazers_count': response['stargazers_count']
}
print(repo_info)
end = time.time()
print('Tempo de execução={:.2f} segundos'.format(end - start))
Uma requisição http por vez
import time
import threading
import queue
import requests
from github import REPOS, ACCESS_TOKEN
def grab_data_from_queue():
while not q.empty():
repo_url = q.get()
response = requests.get(repo_url, params={'access_token': ACCESS_TOKEN}).json()
repo_info = {
'name': response['name'],
'full_name': response['full_name'],
'stargazers_count': response['stargazers_count']
}
print(repo_info)
q.task_done()
max_threads = 5
start = time.time()
q = queue.Queue()
for repo_url in REPOS:
q.put(repo_url)
for i in range(max_threads):
thread = threading.Thread(target=grab_data_from_queue)
thread.start()
q.join()
end = time.time()
print('Tempo de execução={:.2f} segundos'.format(end - start))
Consumo de recursos
import time
import multiprocessing
import requests
from github import REPOS, ACCESS_TOKEN
def grab_data_from_queue():
while not q.empty():
repo_url = q.get()
response = requests.get(repo_url, params={'access_token': ACCESS_TOKEN}).json()
repo_info = {
'name': response['name'],
'full_name': response['full_name'],
'stargazers_count': response['stargazers_count']
}
print(repo_info)
q.task_done()
workers = 5
start = time.time()
q = multiprocessing.JoinableQueue()
for repo_url in REPOS:
q.put(repo_url)
for i in range(workers):
process = multiprocessing.Process(target=grab_data_from_queue)
process.start()
q.join()
end = time.time()
print('Tempo de execução={:.2f} segundos'.format(end - start))
Consumo de recursos
import time
from concurrent import futures
import requests
from github import REPOS, ACCESS_TOKEN
def get_repo_info(repo_url):
response = requests.get(repo_url, params={'access_token': ACCESS_TOKEN}).json()
repo_info = {
'name': response['name'],
'full_name': response['full_name'],
'stargazers_count': response['stargazers_count']
}
print(repo_info)
workers = 5
start = time.time()
with futures.ThreadPoolExecutor(workers) as executor:
executor.map(get_repo_info, REPOS)
end = time.time()
print('Tempo de execução={:.2f} segundos'.format(end - start))
ThreadPoolExecutor - usa threads
ProcessPoolExecutor - usa process
Python 3.4+
import asyncio
async def hello_world():
print('Hello World!')
loop = asyncio.get_event_loop()
loop.run_until_complete(hello_world())
import asyncio
async def hello_world(name):
print('Hello World, {}!'.format(name))
loop = asyncio.get_event_loop()
tasks = []
for name in ('fulano', 'cicrano', 'beltrano'):
task = asyncio.ensure_future(hello_world(name))
tasks.append(task)
loop.run_until_complete(asyncio.wait(tasks))
import time
import asyncio
import aiohttp
from github import REPOS, ACCESS_TOKEN
async def get_repo_info(repo_url):
async with aiohttp.ClientSession() as session:
async with session.get(repo_url, params={'access_token': ACCESS_TOKEN}) as response:
response_data = await response.json()
repo_info = {
'name': response_data['name'],
'full_name': response_data['full_name'],
'stargazers_count': response_data['stargazers_count']
}
print(repo_info)
start = time.time()
loop = asyncio.get_event_loop()
tasks = []
for repo_url in REPOS:
task = asyncio.ensure_future(get_repo_info(repo_url))
tasks.append(task)
loop.run_until_complete(asyncio.wait(tasks))
end = time.time()
print('Tempo de execução={:.2f} segundos'.format(end - start))
import time
import paco
import aiohttp
from github import REPOS, ACCESS_TOKEN
async def get_repo_info(repo_url):
async with aiohttp.ClientSession() as session:
async with session.get(repo_url, params={'access_token': ACCESS_TOKEN}) as response:
response_data = await response.json()
repo_info = {
'name': response_data['name'],
'full_name': response_data['full_name'],
'stargazers_count': response_data['stargazers_count']
}
print(repo_info)
start = time.time()
tasks = [get_repo_info(repo_url) for repo_url in REPOS]
paco.run(paco.wait(tasks, limit=10))
end = time.time()
print('Tempo de execução={:.2f} segundos'.format(end - start))
from aiohttp import web
async def handle(request):
return web.json_response({'message': 'Hello World'})
app = web.Application()
app.router.add_get('/', handle)
web.run_app(app, host='127.0.0.1', port=8080)
from sanic import Sanic
from sanic.response import json
app = Sanic()
@app.route('/')
async def test(request):
return json({'message': 'Hello World'})
if __name__ == '__main__':
app.run(host='127.0.0.1', port=8080)
import asyncio
import aioredis
loop = asyncio.get_event_loop()
async def main():
redis = await aioredis.create_redis(('localhost', 6379), loop=loop)
await redis.set('key', 'hello world')
val = await redis.get('key')
print(val)
redis.close()
await redis.wait_closed()
loop.run_until_complete(main())
import asyncio
import aiomcache
loop = asyncio.get_event_loop()
async def main():
mc = aiomcache.Client('127.0.0.1', 11211, loop=loop)
await mc.set(b'key', b'hello world')
value = await mc.get(b'key')
print(value)
loop.run_until_complete(main())
import asyncio
import aiopg
from speakers import SPEAKERS
dsn = 'dbname=pythonday user=pythonday password=pythonday host=127.0.0.1'
async def get_pool():
return await aiopg.create_pool(dsn)
async def create_table():
pool = await get_pool()
async with pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute('DROP TABLE IF EXISTS speakers')
await cur.execute('CREATE TABLE speakers (id serial PRIMARY KEY, name varchar(255))')
for speaker in SPEAKERS:
await cur.execute('INSERT INTO speakers (name) VALUES (%s)', (speaker,))
async def get_speakers():
speakers = []
pool = await get_pool()
async with pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute('SELECT * FROM speakers')
async for row in cur:
speakers.append({'id': row[0], 'name': row[1]})
return speakers
async def main():
await create_table()
speakers = await get_speakers()
for speaker in speakers:
print(speaker)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
import asyncio
from aiopg.sa import create_engine
import sqlalchemy as sa
from speakers import SPEAKERS
metadata = sa.MetaData()
speakers_table = sa.Table(
'speakers',
metadata,
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('name', sa.String(255))
)
async def get_engine():
return await create_engine(
user='pythonday', database='pythonday', host='127.0.0.1', password='pythonday'
)
async def create_table():
engine = await get_engine()
async with engine.acquire() as conn:
await conn.execute('DROP TABLE IF EXISTS speakers')
await conn.execute('CREATE TABLE speakers (id serial PRIMARY KEY, name varchar(255))')
for speaker in SPEAKERS:
await conn.execute(speakers_table.insert().values(name=speaker))
async def get_speakers():
speakers = []
engine = await get_engine()
async with engine.acquire() as conn:
async for row in conn.execute(speakers_table.select()):
speakers.append({'id': row.id, 'name': row.name})
return speakers
async def main():
await create_table()
speakers = await get_speakers()
for speaker in speakers:
print(speaker)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
import pytest
import aiohttp
from github import REPOS, ACCESS_TOKEN
async def get_repo_info(repo_url):
async with aiohttp.ClientSession() as session:
async with session.get(repo_url, params={'access_token': ACCESS_TOKEN}) as response:
response_data = await response.json()
return {
'name': response_data['name'],
'full_name': response_data['full_name'],
'stargazers_count': response_data['stargazers_count']
}
@pytest.mark.asyncio
async def test_get_repo_info_1():
repo_info = await get_repo_info(REPOS[0])
assert 'name' in repo_info
assert 'full_name' in repo_info
assert 'stargazers_count' in repo_info
def test_get_repo_info_2(event_loop):
repo_info = event_loop.run_until_complete(get_repo_info(REPOS[0]))
assert 'name' in repo_info
assert 'full_name' in repo_info
assert 'stargazers_count' in repo_info