Python Brasil 2018
import requests
# Fetch the first page of the people listing and print each person's name.
url = 'https://swapi.co/api/people/'
response = requests.get(url)
for person in response.json()['results']:
    # The original line ended with an unbalanced ")" and did not parse.
    print(person['name'])
import asyncio
import aiohttp
async def get_people(session):
    """Request the people listing through ``session`` and print every name.

    ``session`` is used as ``session.get(url)`` in an ``async with`` block,
    and the response's ``json()`` coroutine is awaited to decode the body.
    """
    endpoint = 'https://swapi.co/api/people/'
    async with session.get(endpoint) as resp:
        payload = await resp.json()
        # Names are printed one per line, in the order the API returned them.
        for name in (entry['name'] for entry in payload['results']):
            print(name)
async def main():
    """Open an ``aiohttp.ClientSession`` and run ``get_people`` with it."""
    async with aiohttp.ClientSession() as http_session:
        await get_people(http_session)
# Obtain the asyncio event loop and block until main() has finished.
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
import requests
import status
from requests.exceptions import ConnectionError, Timeout
from .exceptions import ClientConnectionError, ClientError, ServerError
class API:
    """Small HTTP client for the people endpoints, built on ``requests``.

    All requests go through one ``requests.Session``. Transport failures and
    responses that ``status`` classifies as client or server errors are
    converted into this package's own exception types.
    """

    def __init__(self, timeout=5, api_root_url='https://swapi.co/api'):
        """Create the client.

        Args:
            timeout: Value passed as ``timeout`` to every request.
            api_root_url: Base URL that endpoint paths are appended to.
                Trailing slashes are removed so the paths join cleanly.
        """
        self.api_root_url = api_root_url.rstrip('/')
        self.session = requests.Session()
        self.timeout = timeout

    def _make_request(self, method, url, **kwargs):
        """Send one request through the session and return the response.

        Args:
            method: HTTP method name; it is lower-cased and looked up as an
                attribute of the session (e.g. ``'GET'`` -> ``session.get``).
            url: Full URL to request.
            **kwargs: Extra keyword arguments forwarded to the session method.

        Returns:
            The response object returned by the session method.

        Raises:
            ClientConnectionError: On ``ConnectionError`` or ``Timeout``.
            ClientError: When ``status.is_client_error`` accepts the status code.
            ServerError: When ``status.is_server_error`` accepts the status code.
        """
        request_method = getattr(self.session, method.lower())
        try:
            response = request_method(url, timeout=self.timeout, **kwargs)
        except (ConnectionError, Timeout) as exc:
            # Chain explicitly so the underlying requests error is kept as __cause__.
            raise ClientConnectionError(exc) from exc

        if status.is_client_error(code=response.status_code):
            raise ClientError(response)
        if status.is_server_error(code=response.status_code):
            raise ServerError(response)
        return response

    def people(self, page=1):
        """Return the decoded JSON body of one page of the people listing.

        Args:
            page: Page number sent as the ``page`` query parameter.
        """
        url = '{}/people/'.format(self.api_root_url)
        params = {'page': page}
        response = self._make_request('GET', url, params=params)
        return response.json()

    def people_detail(self, pk):
        """Return the decoded JSON body for the person identified by ``pk``."""
        url = '{}/people/{}/'.format(self.api_root_url, pk)
        response = self._make_request('GET', url)
        return response.json()
from requests_example.api import API
from requests_example.exceptions import ClientError
# Module-level client used by the example functions in this script.
api = API()
def get_people():
    """Print the name of every person on pages 1 through 9 of the listing."""
    for page_number in range(1, 10):
        listing = api.people(page=page_number)
        for entry in listing['results']:
            print(entry['name'])
def get_luke():
    """Fetch the person with primary key 1 and print the decoded payload."""
    print(api.people_detail(1))
def get_404():
    """Request person 9999 and, if a ``ClientError`` is raised, report it.

    The handler reads the response from the first exception argument and
    prints its status code and body text.
    """
    try:
        api.people_detail(9999)
    except ClientError as error:
        failed = error.args[0]
        report = 'ClientError, status_code={}, content={}'.format(failed.status_code, failed.text)
        print(report)
# Run each example in turn: the paged listing, a single record, then the error case.
get_people()
get_luke()
get_404()
from simple_rest_client.resource import AsyncResource, Resource
# Action table for the people resource: action name -> HTTP method and URL.
# NOTE(review): the '{}' in the 'retrieve' URL is presumably filled in by
# simple_rest_client with the positional argument - confirm against its docs.
people_actions = {
    'list': dict(method='GET', url='people'),
    'retrieve': dict(method='GET', url='people/{}'),
    'schema': dict(method='GET', url='people/schema'),
}
class PeopleAsyncResource(AsyncResource):
    """``AsyncResource`` subclass whose actions are the ``people_actions`` table."""

    actions = people_actions
class PeopleResource(Resource):
    """``Resource`` subclass whose actions are the ``people_actions`` table."""

    actions = people_actions
from simple_rest_client.api import API
from . import resources
def get_api_instance(api_root_url='https://swapi.co/api/', timeout=5):
    """Build an ``API`` with ``PeopleResource`` registered under the name 'people'.

    Args:
        api_root_url: Root URL handed to the ``API`` constructor.
        timeout: Timeout handed to the ``API`` constructor.

    Returns:
        The configured ``API`` instance.
    """
    client = API(timeout=timeout, api_root_url=api_root_url)
    client.add_resource(resource_class=resources.PeopleResource, resource_name='people')
    return client
def get_async_api_instance(api_root_url='https://swapi.co/api/', timeout=5):
    """Build an ``API`` with ``PeopleAsyncResource`` registered under the name 'people'.

    Args:
        api_root_url: Root URL handed to the ``API`` constructor.
        timeout: Timeout handed to the ``API`` constructor.

    Returns:
        The configured ``API`` instance.
    """
    client = API(timeout=timeout, api_root_url=api_root_url)
    client.add_resource(resource_class=resources.PeopleAsyncResource, resource_name='people')
    return client
from simple_rest_client.exceptions import ClientError
from simple_rest_client_example.api import get_api_instance
# Module-level client (default root URL and timeout) used by the examples in this script.
api = get_api_instance()
def get_people():
    """Print the name of every person on pages 1 through 9 of the listing."""
    for page_number in range(1, 10):
        listing = api.people.list(params={'page': page_number})
        for entry in listing.body['results']:
            print(entry['name'])
def get_luke():
    """Retrieve the person with id 1 and print the response body."""
    print(api.people.retrieve(1).body)
def get_404():
    """Retrieve person 9999 and, if a ``ClientError`` is raised, print its details.

    The report shows the exception's ``message`` and the repr of its ``response``.
    """
    try:
        api.people.retrieve(9999)
    except ClientError as error:
        report = 'ClientError, message={}, response={!r}'.format(error.message, error.response)
        print(report)
# Run each example in turn: the paged listing, a single record, then the error case.
get_people()
get_luke()
get_404()
import asyncio
from simple_rest_client.exceptions import ClientError
from simple_rest_client_example.api import get_async_api_instance
# Module-level async client (default root URL and timeout) used by the coroutines in this script.
api = get_async_api_instance()
async def _get_people(page):
    """Fetch one page of the people listing and print each person's name.

    Args:
        page: Page number sent as the ``page`` query parameter.
    """
    listing = await api.people.list(params={'page': page})
    for entry in listing.body['results']:
        print(entry['name'])
async def get_people():
    """Fetch pages 1 through 9 of the people listing with ``asyncio.gather``."""
    # Every page coroutine is created up front, then all are awaited together.
    await asyncio.gather(*(_get_people(page_number) for page_number in range(1, 10)))
async def get_luke():
    """Retrieve the person with id 1 and print the response body."""
    luke = await api.people.retrieve(1)
    print(luke.body)
async def get_404():
    """Retrieve person 9999 and, if a ``ClientError`` is raised, print its details.

    The report shows the exception's ``message`` and the repr of its ``response``.
    """
    try:
        await api.people.retrieve(9999)
    except ClientError as error:
        report = 'ClientError, message={}, response={!r}'.format(error.message, error.response)
        print(report)
async def main():
    """Run the three example coroutines one after another."""
    # Alternative: asyncio.gather(get_people(), get_luke(), get_404()) would
    # schedule all three concurrently instead of sequentially.
    for example in (get_people, get_luke, get_404):
        await example()
# Obtain the asyncio event loop and block until main() has finished.
loop = asyncio.get_event_loop()
loop.run_until_complete(main())