Форум сайта python.su
Дорброго времени суток.
Только начал работать с rabbit (использую kombu), и столкнулся с проблемой при чтении сообщений из очереди.
Логика такова, что прежде, чем удалять сообщения из очереди, нужно провести ряд проверок и ковертаций в полученных данных, и только в случае успеха, можно грохнуть очередь.
Но я вижу, что позволяющий перейти к следующему сообщению метод подтверждения ack() сразу удаляет сообщение.
Делаю так.
class RabbitMQProcessConsumer(ConsumerMixin): def __init__( self, host, user, password, exchange, queue, vhost, insist, entity): self.host = host self.user = user self.password = password self.exchange_name = exchange self.queue_name = queue self.vhost = vhost self.insist = insist self.logger = logging.getLogger('graylog') def get_consumers(self, Consumer, channel): return [Consumer( queues=[self.queue], on_message=self.on_request, # callback prefetch_count=1, )] def __enter__(self): self.connection = Connection( hostname=self.host, userid=self.user, password=self.password, virtual_host=self.vhost, insist=self.insist ) try: self.connection.connect() except (OSError, Exception) as e: self.logger.log(**{ 'msg': str(e), 'level': logging.ERROR, }) raise RabbitMQError(e) self.exchange = Exchange( name=self.exchange_name, type='topic', durable=True ) self.queue = Queue( name=self.queue_name, exchange=self.exchange, routing_key=self.queue_name ) return self def on_request(self, message): print(str(message.body)) # как бы так не удалять message.ack() def __exit__(self, exc_type, exc_val, exc_tb): self.connection.release()
Офлайн