Есть код простенького watchdog (часть избыточных телеодвижений - результат попытки разобраться):
class Watchdog: def __init__(self, timeout: float, callback: Callable[[], Any], name:str = None, log: Logger = None): if timeout <= 0: raise ValueError() self.timeout: Final = timeout self.callback: Final = callback self.event: Final = Event() self.task: Optional[Task] = None self.task_waiting: Optional[Task] = None self.wd_name: Optional[str] = name self.log = log self.make_call = False def is_running(self) -> bool: return (self.task is not None) def start(self): if not self.is_running(): self.task = create_task(self.wd_task(), name = self.wd_name) self.wd_name = self.task.get_name() if self.log!= None: self.log.debug(f'{self.wd_name} start. timeout:{self.timeout}') else: self.feed() if self.log!= None: self.log.debug(f'{self.wd_name} REstart. timeout:{self.timeout}') async def do_stop(self, msg=''): self.make_call = False if self.is_running(): if (self.task_waiting is not None) and (not self.task_waiting.done()) : self.task_waiting.cancel() self.task_waiting = None if (self.task is not None) and (not self.task.done()) : self.task.cancel() self.task = None if self.log!= None: self.log.debug(f'{self.wd_name} stop: {msg}') else: if self.log!= None: self.log.debug(f'{self.wd_name} trying to stop not running: {msg}') async def cancel(self): await self.do_stop(msg='CANCEL') async def stop(self): await self.do_stop(msg='STOP') def feed(self): if self.is_running(): self.event.set() if self.log!= None: self.log.debug(f'{self.wd_name}: FEED') async def wd_task(self): try: self.make_call = True while True: try: self.event.clear() self.task_waiting = create_task(self.event.wait()) #, name=f'{self.wd_name}_waiting') await asyncio.wait_for(self.task_waiting, self.timeout) continue except CancelledError: if self.log!= None: self.log.debug(f'{self.wd_name} cancel waiting') raise except TimeoutError: if self.log!= None: self.log.debug(f'{self.wd_name} timeout') # callback only once! if self.make_call: self.make_call = False if self.log!= None: self.log.debug(f'{self.wd_name} callback') await self.callback() finally: self.event.clear() if (self.task_waiting is not None) and (not self.task_waiting.done()): await self.task_waiting.cancel() self.task_waiting = None break finally: self.task = None
В тестовом примере он нормально отрабатывает.
async def do_callback(): log.debug('--------------------callback result') async def a_main(): wd = Watchdog(3, do_callback, name='test_wd', log=log) wd.start() for i in range(0,4): log.debug(f'loop:{i}') await asyncio.sleep(1.0) wd.feed() await asyncio.sleep(2.0) wd.stop() while True: await asyncio.sleep(1.0) if __name__ == "__main__": log.debug('aaaa') loop = asyncio.get_event_loop_policy().new_event_loop() asyncio.set_event_loop(loop) loop.create_task(a_main()) loop.run_forever()
Но в основном коде во время отработки wd.stop() он не пишет “…. cancel waiting” (т.е. не проходит CancelledError) и после выдержки соответствующей паузы происходит срабатывание по таймауту (пишет: “… timeout”). Т.е. task_waiting не заканселилась, а нормально доработала и вышла по таймауту.
asyncio.sleep(0) уже добавлял. shield не использую нигде. Подскажите, плиз, куда стоит посмотреть?