Форум сайта python.su
Доброго времени суток.
Есть код простенького watchdog (часть избыточных телеодвижений - результат попытки разобраться):
class Watchdog: def __init__(self, timeout: float, callback: Callable[[], Any], name:str = None, log: Logger = None): if timeout <= 0: raise ValueError() self.timeout: Final = timeout self.callback: Final = callback self.event: Final = Event() self.task: Optional[Task] = None self.task_waiting: Optional[Task] = None self.wd_name: Optional[str] = name self.log = log self.make_call = False def is_running(self) -> bool: return (self.task is not None) def start(self): if not self.is_running(): self.task = create_task(self.wd_task(), name = self.wd_name) self.wd_name = self.task.get_name() if self.log!= None: self.log.debug(f'{self.wd_name} start. timeout:{self.timeout}') else: self.feed() if self.log!= None: self.log.debug(f'{self.wd_name} REstart. timeout:{self.timeout}') async def do_stop(self, msg=''): self.make_call = False if self.is_running(): if (self.task_waiting is not None) and (not self.task_waiting.done()) : self.task_waiting.cancel() self.task_waiting = None if (self.task is not None) and (not self.task.done()) : self.task.cancel() self.task = None if self.log!= None: self.log.debug(f'{self.wd_name} stop: {msg}') else: if self.log!= None: self.log.debug(f'{self.wd_name} trying to stop not running: {msg}') async def cancel(self): await self.do_stop(msg='CANCEL') async def stop(self): await self.do_stop(msg='STOP') def feed(self): if self.is_running(): self.event.set() if self.log!= None: self.log.debug(f'{self.wd_name}: FEED') async def wd_task(self): try: self.make_call = True while True: try: self.event.clear() self.task_waiting = create_task(self.event.wait()) #, name=f'{self.wd_name}_waiting') await asyncio.wait_for(self.task_waiting, self.timeout) continue except CancelledError: if self.log!= None: self.log.debug(f'{self.wd_name} cancel waiting') raise except TimeoutError: if self.log!= None: self.log.debug(f'{self.wd_name} timeout') # callback only once! if self.make_call: self.make_call = False if self.log!= None: self.log.debug(f'{self.wd_name} callback') await self.callback() finally: self.event.clear() if (self.task_waiting is not None) and (not self.task_waiting.done()): await self.task_waiting.cancel() self.task_waiting = None break finally: self.task = None
async def do_callback(): log.debug('--------------------callback result') async def a_main(): wd = Watchdog(3, do_callback, name='test_wd', log=log) wd.start() for i in range(0,4): log.debug(f'loop:{i}') await asyncio.sleep(1.0) wd.feed() await asyncio.sleep(2.0) wd.stop() while True: await asyncio.sleep(1.0) if __name__ == "__main__": log.debug('aaaa') loop = asyncio.get_event_loop_policy().new_event_loop() asyncio.set_event_loop(loop) loop.create_task(a_main()) loop.run_forever()
Отредактировано artp (Апрель 7, 2023 12:29:57)
Офлайн
В общем, как-то странно срабатывает task.cancel() в комплекте с wait_for. Переделал whanchdog с использованием Future(). Примерно вот так:
def feed(self): if self.is_running(): if not self.future.done(): self.future.set_result(True) if self.log!= None: self.log.debug(f'{self.wd_name}: FEED') async def wd_task(self): self.make_call = False while True: self.future = Future() try: await asyncio.sleep(self.timeout) except CancelledError: if self.log!= None: self.log.debug(f'{self.wd_name} cancel waiting') break # raise if self.future.done() and self.future.result(): continue self.make_call = True if self.log!= None: self.log.debug(f'{self.wd_name} timeout') break # callback only once! if self.make_call: # not self.future.cancelled() and self.make_call = False if self.log!= None: self.log.debug(f'{self.wd_name} callback') await self.callback()
Отредактировано artp (Апрель 7, 2023 18:31:01)
Офлайн
В общем, при переходе на очередную версию питона (в моем случае на 3.11), полезно читать про нововведения. В конечном итоге переделал на диспетчер контекста asyncio.timeout()
Офлайн