Neurohazard

RabbitMQ pika.exceptions.ConnectionClosed

wpadmin~September 5, 2018 /Software Engineering

Error message

pika.exceptions.ConnectionClosed: (-1, "error(10054, '')")

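Details

As described in the previous post (link), if the callback registered through basic_consume in RabbitMQ/pika runs some other long task (for example a Nessus scan), the connection may be lost, and pika.exceptions.ConnectionClosed is raised when the callback returns.

One way to avoid this by design is to run the long task asynchronously.
But what if, for some special reason, the task has to run synchronously?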

Solution

According to this Stack Overflow answer:
https://stackoverflow.com/questions/37321089/rabbitmq-pika-exceptions-connectionclosed/37528066

This is because you are keeping the main thread waiting, and because of this pika cannot handle incoming messages; in this case it cannot respond to the heartbeat until the subprocess is done. This causes RabbitMQ to think that the client is dead and forces a disconnection.

If you want this to work with heartbeats (which is recommend) you need to periodically call connection.process_data_events. This can be done by adding a loop that checks if the thread is done, and every 30s or so call process_data_events until the thread is done.

In short: call connection.process_data_events() roughly every 30 s while the long task runs, so that pika can answer heartbeats (documentation ref.).
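The loop described in the quoted answer can be sketched roughly as follows. This is a minimal illustration, not code from the answer: run_with_heartbeat, poll_seconds and heartbeat_seconds are hypothetical names, and connection is only assumed to expose process_data_events(), as pika's BlockingConnection does. The sketch targets Python 3 (it uses time.monotonic).

```python
import threading
import time


def run_with_heartbeat(connection, task, poll_seconds=1.0, heartbeat_seconds=30.0):
    """Run task() in a worker thread while the caller services the connection.

    `connection` is assumed to provide process_data_events(); the helper
    itself and its parameter names are illustrative only.
    """
    result = {}

    def worker():
        try:
            result["value"] = task()
        except Exception as exc:  # hand the error back to the calling thread
            result["error"] = exc

    thread = threading.Thread(target=worker)
    thread.start()

    last_beat = time.monotonic()
    while thread.is_alive():
        # Wait briefly for the worker instead of busy-looping.
        thread.join(timeout=poll_seconds)
        if time.monotonic() - last_beat >= heartbeat_seconds:
            # Only the thread that owns the connection touches it.
            connection.process_data_events()
            last_beat = time.monotonic()

    if "error" in result:
        raise result["error"]
    return result["value"]
```

Inside a consumer callback this might be called as run_with_heartbeat(connection, lambda: do_scan(body)), where do_scan stands for whatever long task is being run. The worker thread never touches the connection, so all pika calls stay on the consuming thread.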

Consumer

The test code is as follows:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Time    : 2018/9/5 19:59
# Author  : BLKStone
# Site    : http://wp.blkstone.me
# File    : rabbitmq_long_wait_exception.py
# Software:  PyCharm

from rabbitmq_config import RabbitMQClient
import sys
import time

# Alternative:
# use the py3 print function in py2
# from __future__ import print_function
# print('.', end='')
# sys.stdout.write seems more portable, though (it runs on both py2 and py3)


def print_dot(total_seconds, dot_seconds=10, line_seconds=100):
    # total_seconds: total time to wait
    # dot_seconds: number of seconds each '.' represents
    # line_seconds: number of seconds each output line represents
    for i in range(total_seconds):
        if (i+1) % dot_seconds == 0:
            sys.stdout.write('.')
        if (i+1) % line_seconds == 0:
            sys.stdout.write('\n')
        time.sleep(1)


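def keep_heartbeat(rabbitmq_channel, total_seconds, dot_seconds=10, line_seconds=100):
    # Same wait loop as print_dot, but lets pika process I/O every 30 seconds
    # so that heartbeats are answered while the callback is still running.
    # Note: _connection is a private attribute of the channel object.
    connection = rabbitmq_channel._connection

    for i in range(total_seconds):
        if (i+1) % dot_seconds == 0:
            sys.stdout.write('.')
        if (i+1) % line_seconds == 0:
            sys.stdout.write('\n')
        if (i+1) % 30 == 0:
            connection.process_data_events()
        time.sleep(1)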



def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    # print_dot(600)
    keep_heartbeat(rabbitmq_channel=ch,total_seconds=600)



def demo_consume():
    rabbit_client = RabbitMQClient()
    channel, connection = rabbit_client.get_channel_and_connection()
    channel.queue_declare(queue=rabbit_client.task_queue_name, durable=rabbit_client.durable)
    # pika 0.x signature; pika 1.0+ uses basic_consume(queue, on_message_callback, auto_ack=...)
    channel.basic_consume(callback, queue=rabbit_client.task_queue_name, no_ack=rabbit_client.no_ack)

    # connection.is_open
    # print(dir(channel))
    # connection.process_data_events()
    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()


if __name__ == '__main__':
    demo_consume()

For comparison, the callback in the first case is shown below. It never calls connection.process_data_events(), so the exception is raised.

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    print_dot(600)

The other case calls connection.process_data_events() periodically:

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    keep_heartbeat(rabbitmq_channel=ch,total_seconds=600)
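keep_heartbeat only sleeps, so it does not show how real synchronous work fits in. One possible shape, assuming the long task can be split into steps that each finish well within the heartbeat timeout, is sketched below. run_steps_with_heartbeat, steps, interval and clock are hypothetical names introduced for illustration (Python 3); connection is again only assumed to expose process_data_events().

```python
import time


def run_steps_with_heartbeat(connection, steps, interval=30.0, clock=time.monotonic):
    """Run callables one after another, servicing the connection in between.

    Everything stays on the calling thread. `clock` is injectable so the
    timing logic can be exercised without real waiting.
    """
    results = []
    last_beat = clock()
    for step in steps:
        results.append(step())
        if clock() - last_beat >= interval:
            # Give pika a chance to read and answer heartbeat frames.
            connection.process_data_events()
            last_beat = clock()
    return results
```

This only helps if no single step blocks for longer than the broker is willing to wait; a step that cannot be split would still need the worker-thread approach from the Stack Overflow answer.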

Producer

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2018/9/2 16:42
# @Author  : BLKStone
# @Site    : http://wp.blkstone.me
# @File    : demo_rabbitmq_producer.py
# @Software: PyCharm

from rabbitmq_config import RabbitMQClient


def sender():
    rabbit_client = RabbitMQClient()
    channel, connection = rabbit_client.get_channel_and_connection()
    json_string = '{"task_id": 100001,"task_name": "scan-201808","target": "127.0.0.1/24, 127.0.0.2/24"}'

    # channel.queue_declare(queue=rabbit_client.task_queue_name)
    channel.basic_publish(exchange=rabbit_client.exchange,
                          routing_key=rabbit_client.task_queue_name, body=json_string)
    print(" [x] Sent {info}".format(info=json_string))
    connection.close()

if __name__ == '__main__':
    sender()
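Both scripts import RabbitMQClient from rabbitmq_config, which the post does not show. A hypothetical stand-in, reconstructed only from the attributes the scripts use, might look like the sketch below. Host, port, credentials, queue name and the other default values are assumptions, not the author's configuration.

```python
class RabbitMQClient(object):
    """Hypothetical stand-in for the rabbitmq_config module (values assumed)."""

    def __init__(self, host="localhost", port=5672,
                 username="guest", password="guest"):
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        self.exchange = ""                   # default exchange routes by queue name
        self.task_queue_name = "task_queue"  # assumed queue name
        self.durable = True
        self.no_ack = True                   # the demo callback never acks

    def get_channel_and_connection(self):
        # Imported here so the settings above can be inspected without pika.
        import pika

        credentials = pika.PlainCredentials(self.username, self.password)
        parameters = pika.ConnectionParameters(
            host=self.host, port=self.port, credentials=credentials)
        connection = pika.BlockingConnection(parameters)
        channel = connection.channel()
        return channel, connection
```

With the default exchange, the routing_key passed to basic_publish is simply the queue name, which matches how the producer above publishes.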

References

1 Solving the pika.exceptions.ConnectionClosed problem
https://blog.csdn.net/csdn_am/article/details/79894662

2 RabbitMQ pika.exceptions.ConnectionClosed
https://stackoverflow.com/questions/37321089/rabbitmq-pika-exceptions-connectionclosed/37528066

3 Other code examples for handling RabbitMQ/pika exceptions, for reference
https://programtalk.com/python-examples/pika.exceptions.ConnectionClosed/

4 RabbitMQ+Pika
https://thief.one/2017/04/06/RabbitMQ/
