Найти - Пользователи
Полная версия: Как оседлать asyncio?
Начало » Python для новичков » Как оседлать asyncio?
1
SergeyChmutov
Привте форумчане!
Что-то не получается оседлать asyncio в связке с pyramid. С asyncio знаком ровно пол дня, поэтому вероятно есть какая-то тонкость с event loop. Вот суть проблемы:
Необходимо проверять доступность ресурсов. Доступ к ресурсам может быть как по HTTP так и сокеты. Опять же список ресурсов может быть очень большим, поэтому и решил их опрашивать ассинхронно.

Есть вьюха на пирамиде к которой я обращаюсь:
     def collection_get(self) -> dict:
        plugins = self.db.query(Plugin.filename, Plugin.urlcheck).all()
        urls = [plugin.urlcheck for plugin in plugins]
        loop = asyncio.get_event_loop()
        status = loop.run_until_complete(get_url_status(urls))
        return status

И есть к ней контроллер, который и проводит опрос ресурсов:
 import asyncio
import requests
async def get_url_status(urls: list) -> dict:
    resource_status = await asyncio.gather(*[get_resource_status(url) for url in urls])
    return {url: status for url, status in resource_status}
async def get_resource_status(url: str) -> tuple:
    req = requests.request('OPTIONS', url)
    return url, req.status_code

При попытке обратиться к этой вьюхе выхватываю ошибку: RuntimeError: There is no current event loop in thread ‘waitress’.

С трейслогом:
 2019-08-06 11:28:12,620 ERROR [web_if.exceptions.controllers.exceptions:22][waitress]   File ".../views/settings/status.py", line 43, in collection_get
    loop = asyncio.get_event_loop()
2019-08-06 11:28:12,620 ERROR [web_if.exceptions.controllers.exceptions:22][waitress]   File "/usr/lib/python3.5/asyncio/events.py", line 671, in get_event_loop
    return get_event_loop_policy().get_event_loop()
2019-08-06 11:28:12,620 ERROR [web_if.exceptions.controllers.exceptions:22][waitress]   File "/usr/lib/python3.5/asyncio/events.py", line 583, in get_event_loop
2019-08-06 11:28:12,620 ERROR [web_if.exceptions.controllers.exceptions:22][waitress] RuntimeError: There is no current event loop in thread 'waitress'.


При этом, если я контроллер запущу, как отдельный скрипт, все работает как-надо:
 if __name__ == '__main__':
    urls = [...]
    loop = asyncio.get_event_loop()
    status = loop.run_until_complete(get_uri_status(urls))
SergeyChmutov
Кажется разобрался.
Дело в том, что первый вызов asyncio.get_event_loop() создаёт новый цикл событий только в главном потоке, а в любом другом выбрасывает исключение. А так как он работает в связке с пирамидой, то вероятно работает не в главном потоке.
И в этом случае необходимо самому создавать новый цикл событий:
 loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
JOHN_16
Возможно, я не вникал.
Но вот точно что я увидел - что никакой асинхронности в итоге у Вас нету. Дело в том что в корутине get_resource_status вы нигде не делаете await и используете синхронный запрос (requests.request). То есть просто не существует места где бы выполнение приостановилось в ожидании IO операции и переключилось на выполнение другой корутины. Используйте например клиентскую часть aiohttp что бы делать асинхронные http запросы.
SergeyChmutov
JOHN_16
Возможно, я не вникал.Но вот точно что я увидел - что никакой асинхронности в итоге у Вас нету. Дело в том что в корутине get_resource_status вы нигде не делаете await и используете синхронный запрос (requests.request). То есть просто не существует места где бы выполнение приостановилось в ожидании IO операции и переключилось на выполнение другой корутины. Используйте например клиентскую часть aiohttp что бы делать асинхронные http запросы.
Я так понял, что здесь
 resource_status = await asyncio.gather(*[get_resource_status(url) for url in urls])
как раз я передаю элементы списка и не жду результат выполнения каждого элемента

Про aiohttp я думал, но если правильно посмотрел он не поддерживает подключение через сокеты, которые мне тоже надо проверять. Хотя у него есть поддержка веб-сокетов, возможно этой подойдет. Но предполагаю, в дальнейшем могут добавиться еще протоколы.

Насколько понял в корутине мне необходимо создавать еще одну корутину, которая выполняет именно запрос по http?
  import asyncio
import requests
async def get_url_status(urls: list) -> dict:
    resource_status = await asyncio.gather(*[get_resource_status(url) for url in urls])
    return {url: status for url, status in resource_status}
async def get_resource_status(url: str) -> tuple:
    req = await http_request(url)
    return url, req.status_code
async def http_request(url):
    return requests.request('OPTIONS', url)

Хотя, как мне кажется это эквивалентно.
JOHN_16
Вы явно не понимаете как в целом работает асинхронность через asyncio. Вам надо восполнить пробелы в базовых знаниях. Иначе это все не заработает.
У Вас сейчас типичная ошибка новичка, которую буквально на днях сделал мой коллега - Вы придумываете как должно работать, на основании какого то вашего опыта, вместо того что бы понимать как оно фактически работает.
SergeyChmutov
Про aiohttp я думал, но если правильно посмотрел он не поддерживает подключение через сокеты, которые мне тоже надо проверять.
SergeyChmutov
Но предполагаю, в дальнейшем могут добавиться еще протоколы.
А вы хотите найти 1 комбайн который делает все? Ну возможно такой и есть. Но в целом можно для каждого протокола/группы протоколов иметь свой обработчик.
SergeyChmutov
Хотя у него есть поддержка веб-сокетов, возможно этой подойдет
Сокеты и веб-сокеты все таки разные вещи в вашем аспекте.
SergeyChmutov
JOHN_16
Вы явно не понимаете как в целом работает асинхронность через asyncio.
Да, вы абсолютно правы, хоть я и нашел достаточно много материала по этой теме, но что-то не совсем пойму, как необходимо орагизовывать код. Статьи, которые находи были по разным версиям asyncio и везде по разному реализуется код.

Ниже я написал, то что понял. И если await asyncio.sleep(10) останавливает сопрограмму на 10 секунд, то вероятно этот код работает ассинхронно т.к. при увличении количества задач event loop не увеличивает время выполнения всего кода на пропорциональное количество секунд задержки.

 async def get_resource_status(name: str, resource: str) -> tuple:
    req = requests.request('OPTIONS', resource)
    await asyncio.sleep(10)
    return name, req.status_code
def poll(plugins: dict):
    loop = asyncio.new_event_loop()  # создаем новый поток
    asyncio.set_event_loop(loop)
    for prot, url in plugins.items():
        loop.create_task(get_resource_status(prot, url))
    pending = asyncio.Task.all_tasks(loop=loop)
    resource_status = asyncio.gather(*pending)
    status = loop.run_until_complete(resource_status)
    loop.close()
This is a "lo-fi" version of our main content. To view the full version with more information, formatting and images, please click here.
Powered by DjangoBB