We are working on a project that uses RabbitMQ, accessed through Pika, as the backbone message broker. A publisher running on one computer publishes a message every 0.5 seconds, and a consumer running on another system on the same network consumes the messages. Unfortunately, after the publisher and consumer have been running continuously for more than 10 minutes, publishing starts to lag: publishing a single message to the broker occasionally takes a couple of seconds (on a couple of occasions, as long as 30 seconds). I have isolated the issue using the small publish and subscribe scripts below.

Observations:

  1. The latency is not observed when the publisher and subscriber run on the same system.
  2. The latency is not observed when channel.confirm_delivery() is not called.
  3. The issue occurs regardless of hardware specs. We tried it on several systems, including an HP OMEN with a Core i7 CPU.
  4. Network connectivity is good, and we were able to reproduce the issue on different networks at different locations.

Publisher code:

```python
import json
import time

import pika

PUBLISH_FREQUENCY = 2.0  # messages per second
PUBLISH_TIMEOUT = 0.5  # seconds; report any publish that takes longer than this
MQ_IP_ADDRESS = "x.x.x.x"  # MQ IP ADDRESS (elided in the original post)
EXCHANGE = "test3"
EXCHANGE_TYPE = "topic"
ROUTING_KEY = "ag.ack"
QUEUE = ROUTING_KEY

feedback = {
    "task": {
        "start_time": 1655203648.916807,
        "id": 135,
        "end_time": None,
        "marker_scanned_a": None,
        "name_user": "Z_Z",
        "actual_duration": None,
        "status_work": "Progress",
        "message": None,
    },
    "consumer_identifier": "W1",
    "worker_availability": "In_progress",
    "worker_battery_parameter": {"battery_capacity": 1.0, "temp": 12.586999893188477},
}

params = pika.ConnectionParameters(
    heartbeat=60, blocked_connection_timeout=60, host=MQ_IP_ADDRESS
)
print('Connecting to MQ at IP ' + MQ_IP_ADDRESS + ' ' + ROUTING_KEY)
connection = pika.BlockingConnection(params)
channel = connection.channel()
# channel.queue_declare(queue=QUEUE)
channel.exchange_declare(exchange=EXCHANGE, exchange_type=EXCHANGE_TYPE)
# channel.queue_bind(exchange=EXCHANGE, queue=QUEUE, routing_key=ROUTING_KEY)

# Enable publisher confirms: each basic_publish now blocks until the broker confirms it.
channel.confirm_delivery()
print('Connected to MQ at IP ' + MQ_IP_ADDRESS + ' ' + ROUTING_KEY)

count = 0
while True:
    feedback['task']['id'] = count
    message = json.dumps(feedback)
    count = count + 1

    # Measure how long a single (synchronously confirmed) publish takes.
    start_time = time.time()
    channel.basic_publish(exchange=EXCHANGE, routing_key=ROUTING_KEY, body=message)
    end_time = time.time()
    duration = end_time - start_time
    if duration > PUBLISH_TIMEOUT:
        print('Time taken to send the message is ' + str(duration)
              + '. For message number ' + str(count) + ' . Resetting the count to zero.')
        count = 0

    time.sleep(1 / PUBLISH_FREQUENCY)

channel.close()
```

Consumer code:

```python
import json
import time

import pika

MQ_IP_ADDRESS = '127.0.0.1'
EXCHANGE = "test3"
EXCHANGE_TYPE = "topic"
ROUTING_KEY = "ag.ack"
QUEUE = ROUTING_KEY

params = pika.ConnectionParameters(
    heartbeat=600, blocked_connection_timeout=300, host=MQ_IP_ADDRESS
)

# Try to connect once; on failure, wait 20 s and retry a single time.
try:
    connection = pika.BlockingConnection(params)
    print('connected')
except Exception as e:
    print(e)
    time.sleep(20)
    print("waiting to reconnect")
    connection = pika.BlockingConnection(params)
# connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

channel = connection.channel()
channel.exchange_declare(exchange=EXCHANGE, exchange_type=EXCHANGE_TYPE)
channel.queue_declare(queue=QUEUE, durable=False)
channel.queue_bind(exchange=EXCHANGE, queue=QUEUE, routing_key=ROUTING_KEY)
print(' Dummy AOS receiver...')
print('[*] Waiting for messages from Agent. To exit press CTRL+C')


def callback(ch, method, properties, body):
    payload = body.decode()
    message = json.loads(payload)
    msg_counter = message["task"]["id"]
    time.sleep(0.5)  # simulate 0.5 s of processing per message
    ch.basic_ack(delivery_tag=method.delivery_tag)
    print('received ' + str(msg_counter))


channel.basic_consume(queue=QUEUE, on_message_callback=callback)
channel.start_consuming()
```

Best Answer


Any time you report an issue with any software, not just RabbitMQ, you must report the versions of the software you are using. In this case, I'm assuming that you are using the following versions, since they are the most recent:

  • Erlang 25.0.2
  • RabbitMQ 3.10.5
  • Pika 1.2.1
  • Python 3.10.5

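Your Python code uses time.sleep, which blocks Pika's I/O loop: while the script is sleeping, Pika cannot read from or write to the socket. You should switch to using the connection.sleep method (BlockingConnection.sleep) instead.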
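A minimal sketch of the pacing change, assuming Pika 1.x: the publish loop takes the sleep function as a parameter, so the same code can be driven by BlockingConnection.sleep against a live broker or by fakes in a test. The names paced_publish and run_publisher are hypothetical, and run_publisher is only defined here, not called.

```python
import json
from typing import Callable, Iterable


def paced_publish(
    publish: Callable[[bytes], None],
    sleep: Callable[[float], None],
    messages: Iterable[dict],
    interval: float,
) -> int:
    """Publish each message as JSON, then wait `interval` seconds using `sleep`.

    With Pika's BlockingConnection, pass connection.sleep (not time.sleep) so
    that Pika keeps processing socket I/O while the loop is waiting.
    """
    sent = 0
    for msg in messages:
        publish(json.dumps(msg).encode())
        sent += 1
        sleep(interval)
    return sent


def run_publisher(host: str, exchange: str = "test3", routing_key: str = "ag.ack") -> None:
    """Hypothetical wiring against a live broker (requires `pip install pika`)."""
    import itertools

    import pika

    connection = pika.BlockingConnection(pika.ConnectionParameters(host=host))
    channel = connection.channel()
    channel.exchange_declare(exchange=exchange, exchange_type="topic")
    channel.confirm_delivery()

    def publish(body: bytes) -> None:
        channel.basic_publish(exchange=exchange, routing_key=routing_key, body=body)

    messages = ({"task": {"id": i}} for i in itertools.count())
    # connection.sleep processes Pika's data events during each 0.5 s pause,
    # unlike time.sleep, which leaves the socket unserviced.
    paced_publish(publish, connection.sleep, messages, interval=0.5)
```

This only changes how the loop waits; with BlockingConnection and confirm_delivery(), each basic_publish is still confirmed synchronously.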

You also set the heartbeat to 600 seconds in the consumer, which is not necessary.

Finally, by using BlockingConnection and confirm_delivery(), your code is using synchronous publisher confirmations. Instead, you should use SelectConnection and process confirmations asynchronously. Keep a dict of outstanding confirmations, and mark those messages as confirmed when the confirmation callback is called. Note that RabbitMQ may confirm messages in batches.
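The bookkeeping described above can be sketched in plain Python. ConfirmTracker and its methods are hypothetical names, not part of Pika. The sketch assumes that delivery tags on a confirm-mode channel start at 1 and increase by one per publish, and that an ack or nack with `multiple` set covers every outstanding tag up to and including `delivery_tag` (this is how batched confirms arrive). The commented wiring follows the shape of Pika 1.x's `confirm_delivery(ack_nack_callback=...)` on a SelectConnection channel, but treat it as an assumption to check against the Pika example.

```python
from typing import Dict, List


class ConfirmTracker:
    """Track outstanding publisher confirms by delivery tag (sketch)."""

    def __init__(self) -> None:
        self._next_tag = 1  # confirm-mode delivery tags start at 1
        self.outstanding: Dict[int, str] = {}  # delivery_tag -> message id
        self.acked: List[str] = []
        self.nacked: List[str] = []

    def on_publish(self, message_id: str) -> int:
        """Record a message right after basic_publish; returns its delivery tag."""
        tag = self._next_tag
        self.outstanding[tag] = message_id
        self._next_tag += 1
        return tag

    def on_confirm(self, kind: str, delivery_tag: int, multiple: bool) -> None:
        """kind is 'ack' or 'nack'; multiple=True confirms all tags <= delivery_tag."""
        if multiple:
            tags = sorted(t for t in self.outstanding if t <= delivery_tag)
        else:
            tags = [delivery_tag] if delivery_tag in self.outstanding else []
        target = self.acked if kind == "ack" else self.nacked
        for tag in tags:
            target.append(self.outstanding.pop(tag))


# Hypothetical Pika 1.x (SelectConnection) wiring, not executed here:
#   channel.confirm_delivery(ack_nack_callback=on_delivery_confirmation)
#
#   def on_delivery_confirmation(method_frame):
#       kind = method_frame.method.NAME.split('.')[1].lower()  # 'ack' or 'nack'
#       tracker.on_confirm(kind, method_frame.method.delivery_tag,
#                          method_frame.method.multiple)
```

Because the publisher never blocks waiting for an individual confirm, a slow or batched confirmation only delays bookkeeping, not the next publish.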

Please see this example.

This is an interesting enough situation that if you'd like further assistance, share your complete, runnable code on GitHub and post a message in the Pika discussions and we can continue there.


NOTE: the RabbitMQ team monitors the rabbitmq-users mailing list and only sometimes answers questions on StackOverflow.