Tüketimi iptal etmek için basic_consume() iletilerini ve basic_cancel'i kullanıyorum, ancak bir sorun var. İşte Rabbitmq, pika python'da tüketimi hassas bir şekilde nasıl duraklatır ve devam ettirilir
_canceled listesine eklenirdef basic_consume(self, consumer_callback, queue='', no_ack=False,
exclusive=False, consumer_tag=None):
"""Sends the AMQP command Basic.Consume to the broker and binds messages
for the consumer_tag to the consumer callback. If you do not pass in
a consumer_tag, one will be automatically generated for you. Returns
the consumer tag.
For more information on basic_consume, see:
http://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.consume
:param method consumer_callback: The method to callback when consuming
:param queue: The queue to consume from
:type queue: str or unicode
:param bool no_ack: Tell the broker to not expect a response
:param bool exclusive: Don't allow other consumers on the queue
:param consumer_tag: Specify your own consumer tag
:type consumer_tag: str or unicode
:rtype: str
"""
self._validate_channel_and_callback(consumer_callback)
# If a consumer tag was not passed, create one
consumer_tag = consumer_tag or 'ctag%i.%s' % (self.channel_number,
uuid.uuid4().get_hex())
if consumer_tag in self._consumers or consumer_tag in self._cancelled:
raise exceptions.DuplicateConsumerTag(consumer_tag)
self._consumers[consumer_tag] = consumer_callback
self._pending[consumer_tag] = list()
self._rpc(spec.Basic.Consume(queue=queue,
consumer_tag=consumer_tag,
no_ack=no_ack,
exclusive=exclusive),
self._on_eventok,
[(spec.Basic.ConsumeOk,
{'consumer_tag': consumer_tag})])
return consumer_tag
def basic_cancel(self, callback=None, consumer_tag='', nowait=False):
"""This method cancels a consumer. This does not affect already
delivered messages, but it does mean the server will not send any more
messages for that consumer. The client may receive an arbitrary number
of messages in between sending the cancel method and receiving the
cancel-ok reply. It may also be sent from the server to the client in
the event of the consumer being unexpectedly cancelled (i.e. cancelled
for any reason other than the server receiving the corresponding
basic.cancel from the client). This allows clients to be notified of
the loss of consumers due to events such as queue deletion.
:param method callback: Method to call for a Basic.CancelOk response
:param str consumer_tag: Identifier for the consumer
:param bool nowait: Do not expect a Basic.CancelOk response
:raises: ValueError
"""
self._validate_channel_and_callback(callback)
if consumer_tag not in self.consumer_tags:
return
if callback:
if nowait is True:
raise ValueError('Can not pass a callback if nowait is True')
self.callbacks.add(self.channel_number,
spec.Basic.CancelOk,
callback)
self._cancelled.append(consumer_tag)
self._rpc(spec.Basic.Cancel(consumer_tag=consumer_tag,
nowait=nowait),
self._on_cancelok,
[(spec.Basic.CancelOk,
{'consumer_tag': consumer_tag})] if nowait is False else [])
ben tüketim consumer_tag iptal ediyorum her zaman görebileceğiniz gibi pika.channel
ait kodudur. Ve eğer bu etiketi basic_consume içinde tekrar kullanacaksam, duplicateConsumer istisnası yükseltilecektir. Her seferinde yeni bir customer_tag kullanabilirdim, ama aslında değilim. Er ya da geç üretilen etiket, bir öncekinin aynısı ile tam olarak eşleşeceğinden.Tüketimi pika'da incelikle nasıl durdurabilirim?
Pika modülünün ekli kodundan, istemci uygulamasının bir tüketici etiketi belirtmemesi durumunda bile, pika modülünün bir tane üreteceği ve onu tüketme isteğine dahil edeceği açıktır. – mike