Форум сайта python.su
Добрый день.
Делаю проектик на Python и у меня возникли некоторые вопросы как правильно организовать программу.
Можно сказать, что я новичок в Python.
Суть в следующем. Есть задача, которая работает с файлами на диске, читает их, обрабатывает данные и отправляет в сеть по UDP. Делает она это периодически N раз в секунду. Через MQTT приходят команды типа остановить, читать определенный файл и т.д.
Буду использовать библиотеку asyncio. Полного понимания как с ней работать пока нет. Как асинхронно получать данные через MQTT я разобрался.
А вот с остальным пока не понятно. Вопрос именно как организовать структуру программы, где какие таски (в терминологии asyncio) использовать, как передавать команды из MQTT таску который работает с файлами и как организовать периодическую его работу.
Т.е. n-ное количество раз в секунду считывает данные из файла, обрабатывает их отправляет по сети. Асинхронно может прилететь команда на остановку, запуск или чтение другого файла.
Офлайн
Для начала предложу наверное одну из самых типовых схем, не претендующую на оптимальную и тп. Я просто предложу какое нибудь решение, а остальные участники форума подтянутся позже.
Вся ваша программа это event loop из asyncio, в которую помещаются таски через run_until_complete, и которая работает бесконечно по средством loop.run_forever().
Предположим что внутри вашей программы есть “текущие настройки”, в которых имя считываемого файла и тп.
Воспользоваться очередью asyncio.Queue для взаимодействия между тасками.
Запускать будут такие таски:
Таска1: считывает через MQTT команды и правит текущие настройки
Таска2: берет задачу из очереди и обрабатывает ее (считывает файл и отправляет по сети)
Таска3: в цикле с asyncio.sleep() периодически формирует сообщение-задачу на основе текущих настроек (имя файла и прочие параметры для обработчика очереди) которую посылает в очередь
Офлайн
Спасибо за подсказку, дало пищу для размышления. 2 дня ковырял документацию и родил такой набросок.
import asyncio from time import time def timer_gen(): t0 = time() while True: yield round(time()-t0, 3) async def queue_putter(queue, timer): i=0 while True: await asyncio.sleep(1) print (f'{next(timer)}: put item {i}') await queue.put(i) i += 1 async def main_task(val, timer): for i in range(3): print(f'{next(timer)}: main task {val, i}') await asyncio.sleep(0.2) async def manager(): queue = asyncio.Queue(maxsize=10) timer = timer_gen() asyncio.create_task(queue_putter(queue,timer)) print (f'{next(timer)}: start') get = asyncio.create_task(queue.get()) pending = {get} mt = None while True: done, pending = await asyncio.wait(pending, return_when = asyncio.FIRST_COMPLETED) print(f'{next(timer)}: wait compleete') if get in done: res = get.result() print(f' - receiced {res}') get = asyncio.create_task(queue.get()) if res%3 == 0: # для примера for t in pending: t.cancel() pending=set() if res%2 == 0: # для примера mt = asyncio.create_task(main_task(res, timer)) pending.add(mt) pending.add(get) if mt in done: print (f' - main_task is done') if __name__ == "__main__": asyncio.run(manager())
Отредактировано iZvezdin (Июнь 13, 2019 13:12:17)
Офлайн