RabbitMQ pika.exceptions.ConnectionClosed
Error message
pika.exceptions.ConnectionClosed: (-1, "error(10054, '')")
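Discussion
As described in the earlier post (link), if the callback passed to basic_consume in RabbitMQ/pika performs some other long-running task (for example, a Nessus scan), the connection may be lost, and a pika.exceptions.ConnectionClosed exception is raised when the callback returns.
By design, one way to avoid this is to run the long-running task asynchronously.
But what if, for some special reason, the task has to run synchronously?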
Solution
According to this Stack Overflow answer:
https://stackoverflow.com/questions/37321089/rabbitmq-pika-exceptions-connectionclosed/37528066
This is because you are keeping the main thread waiting, and because of this pika cannot handle incoming messages; in this case it cannot respond to the heartbeat until the subprocess is done. This causes RabbitMQ to think that the client is dead and forces a disconnection.
If you want this to work with heartbeats (which is recommend) you need to periodically call connection.process_data_events. This can be done by adding a loop that checks if the thread is done, and every 30s or so call process_data_events until the thread is done.
In other words, connection.process_data_events() has to be called roughly every 30 seconds while the long-running task is in progress.
documentation ref.
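The loop described in the quoted answer (wait for a worker thread and service the connection in the meantime) can be sketched roughly as follows. This is only an illustration, not code from the original answer: run_with_heartbeat, poll_seconds and heartbeat_seconds are hypothetical names, and the only thing assumed about connection is that it has a process_data_events() method, as pika's BlockingConnection does.

```python
import threading
import time


def run_with_heartbeat(connection, task, poll_seconds=1.0, heartbeat_seconds=30.0):
    """Run task() in a worker thread while servicing the connection.

    Hypothetical helper: `connection` only needs a process_data_events()
    method. Returns whatever task() returned, or re-raises its exception.
    """
    result = {}

    def worker():
        # The worker must not touch the connection or channel itself.
        try:
            result['value'] = task()
        except Exception as exc:
            result['error'] = exc

    thread = threading.Thread(target=worker)
    thread.daemon = True
    thread.start()

    last_beat = time.time()
    while thread.is_alive():
        # Wait a little for the worker, then check whether it is time
        # to let pika process incoming frames (including heartbeats).
        thread.join(poll_seconds)
        if time.time() - last_beat >= heartbeat_seconds:
            connection.process_data_events()
            last_beat = time.time()

    if 'error' in result:
        raise result['error']
    return result.get('value')
```

Inside a consumer callback this would be called with the channel's connection and a function wrapping the slow job. Because pika connections are not thread-safe, process_data_events() is still called from the thread that owns the connection, and only the slow job itself runs in the worker thread.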
Consumer
The test code is as follows.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Time : 2018/9/5 19:59
# Author : BLKStone
# Site : http://wp.blkstone.me
# File : rabbitmq_long_wait_exception.py
# Software: PyCharm
from rabbitmq_config import RabbitMQClient
import sys
import time


# Alternative: use the py3 print function under py2
# from __future__ import print_function
# print('.', end='')
# Using sys.stdout.write feels more portable, though (runs on both py2 and py3)
def print_dot(total_seconds, dot_seconds=10, line_seconds=100):
    # total_seconds: total time to wait
    # dot_seconds: how many seconds each '.' represents
    # line_seconds: how many seconds each output line represents
    for i in range(total_seconds):
        if (i+1) % dot_seconds == 0:
            sys.stdout.write('.')
        if (i+1) % line_seconds == 0:
            sys.stdout.write('\n')
        time.sleep(1)


def keep_heartbeat(rabbitmq_channel, total_seconds, dot_seconds=10, line_seconds=100):
    # Same as print_dot, but lets pika process pending frames every 30 seconds
    connection = rabbitmq_channel._connection
    for i in range(total_seconds):
        if (i+1) % dot_seconds == 0:
            sys.stdout.write('.')
        if (i+1) % line_seconds == 0:
            sys.stdout.write('\n')
        if (i+1) % 30 == 0:
            connection.process_data_events()
        time.sleep(1)


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    # print_dot(600)
    keep_heartbeat(rabbitmq_channel=ch, total_seconds=600)


def demo_consume():
    rabbit_client = RabbitMQClient()
    channel, connection = rabbit_client.get_channel_and_connection()
    channel.queue_declare(queue=rabbit_client.task_queue_name, durable=rabbit_client.durable)
    # pika 0.x signature; pika 1.0+ uses basic_consume(queue, on_message_callback, auto_ack=...)
    channel.basic_consume(callback, queue=rabbit_client.task_queue_name, no_ack=rabbit_client.no_ack)
    # connection.is_open
    # print(dir(channel))
    # connection.process_data_events()
    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()


if __name__ == '__main__':
    demo_consume()
For comparison, in the first case the callback is as follows. It never calls connection.process_data_events(), so the exception is raised.
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    print_dot(600)
In the second case, the callback calls connection.process_data_events() periodically, and the connection stays open.
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    keep_heartbeat(rabbitmq_channel=ch, total_seconds=600)
Producer
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2018/9/2 16:42
# @Author : BLKStone
# @Site : http://wp.blkstone.me
# @File : demo_rabbitmq_producer.py
# @Software: PyCharm
from rabbitmq_config import RabbitMQClient


def sender():
    rabbit_client = RabbitMQClient()
    channel, connection = rabbit_client.get_channel_and_connection()
    json_string = '{"task_id": 100001,"task_name": "scan-201808","target": "127.0.0.1/24, 127.0.0.2/24"}'
    # channel.queue_declare(queue=rabbit_client.task_queue_name)
    channel.basic_publish(exchange=rabbit_client.exchange,
                          routing_key=rabbit_client.task_queue_name, body=json_string)
    print(" [x] Sent {info}".format(info=json_string))
    connection.close()


if __name__ == '__main__':
    sender()
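Both scripts import RabbitMQClient from rabbitmq_config, which is not shown in this post. For readers who want to run them, a stand-in might look like the sketch below. Everything in it is an assumption inferred from how the scripts use the class: the attribute names come from the code above, while the default values (localhost, a queue called task_queue, the default exchange, no_ack=True because the callbacks never acknowledge messages) are guesses, not the author's actual configuration.

```python
class RabbitMQClient(object):
    """Hypothetical stand-in for the rabbitmq_config module used above."""

    def __init__(self, host='localhost', task_queue_name='task_queue',
                 exchange='', durable=True, no_ack=True):
        self.host = host
        self.task_queue_name = task_queue_name
        # '' is the default exchange, which routes a message to the
        # queue whose name equals the routing key.
        self.exchange = exchange
        self.durable = durable
        self.no_ack = no_ack

    def get_channel_and_connection(self):
        # Imported here so the class can be constructed and inspected
        # without pika installed.
        import pika
        connection = pika.BlockingConnection(
            pika.ConnectionParameters(host=self.host))
        channel = connection.channel()
        return channel, connection
```

Saved as rabbitmq_config.py next to the two scripts, this should be enough for the imports to resolve. A real configuration would probably also carry credentials, a port and a virtual host.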
References
1 Solving the pika.exceptions.ConnectionClosed problem
https://blog.csdn.net/csdn_am/article/details/79894662
2 RabbitMQ pika.exceptions.ConnectionClosed
https://stackoverflow.com/questions/37321089/rabbitmq-pika-exceptions-connectionclosed/37528066
3 More code examples for handling RabbitMQ/pika exceptions, for reference
https://programtalk.com/python-examples/pika.exceptions.ConnectionClosed/
4 RabbitMQ+Pika
https://thief.one/2017/04/06/RabbitMQ/