Только начал работать с rabbit (использую kombu), и столкнулся с проблемой при чтении сообщений из очереди.
Логика такова, что прежде, чем удалять сообщения из очереди, нужно провести ряд проверок и ковертаций в полученных данных, и только в случае успеха, можно грохнуть очередь.
Но я вижу, что позволяющий перейти к следующему сообщению метод подтверждения ack() сразу удаляет сообщение.
Делаю так.
class RabbitMQProcessConsumer(ConsumerMixin): def __init__( self, host, user, password, exchange, queue, vhost, insist, entity): self.host = host self.user = user self.password = password self.exchange_name = exchange self.queue_name = queue self.vhost = vhost self.insist = insist self.logger = logging.getLogger('graylog') def get_consumers(self, Consumer, channel): return [Consumer( queues=[self.queue], on_message=self.on_request, # callback prefetch_count=1, )] def __enter__(self): self.connection = Connection( hostname=self.host, userid=self.user, password=self.password, virtual_host=self.vhost, insist=self.insist ) try: self.connection.connect() except (OSError, Exception) as e: self.logger.log(**{ 'msg': str(e), 'level': logging.ERROR, }) raise RabbitMQError(e) self.exchange = Exchange( name=self.exchange_name, type='topic', durable=True ) self.queue = Queue( name=self.queue_name, exchange=self.exchange, routing_key=self.queue_name ) return self def on_request(self, message): print(str(message.body)) # как бы так не удалять message.ack() def __exit__(self, exc_type, exc_val, exc_tb): self.connection.release()
Конечно, можно запихнуть какой-то обработчик между получение body и ack() конкретного сообщения, но хотелось бы обработать/проверить их все сразу, т.к. логика у них однообразная и к тому же процессинг затратен.
Но возможно существует стандартный метод прочитать все (на данный момент времени) сообщения из очереди, а только потом, при желании удалить их?
Заранее спасибо за ответы и возможные советы.