Neurohazard
暮雲煙月,皓首窮經;森羅萬象,如是我聞。

Python 调用 RabbitMQ 案例

wpadmin~September 2, 2018 /Software Engineering

Python RabbitMQ demo

参考资料

RabbitMQ “Hello World!” Python
https://www.rabbitmq.com/tutorials/tutorial-one-python.html

pika documentation
https://pika.readthedocs.io/en/0.10.0/intro.html

简要说明

原始需求大概如下:
有一个 Python 项目 和 Java 项目希望进行对接。
希望由 Java 调度 Python 进行 新建一个 扫描任务, Python 调用 Nessus 扫描,完成扫描后将结果回传给 Java 。

(还是有一些问题, Python 部分在同步处理扫描任务,有必要修改成异步模式)
异步模式的一个思路
Task Creator 只负责新建任务
Task Checker 不断遍历 Nessus 任务队列的元素,如果任务状态为完成,就回传给 Java, 如果任务状态为未完成,就重新添加到 Nessus 任务队列的末尾。

这里仅展示部分 RabbitMQ 相关的框架性代码。

rabbitmq_config.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2018/9/2 18:21
# @Author  : BLKStone
# @Site    : http://wp.blkstone.me
# @File    : rabbitmq_config.py
# @Software: PyCharm

import pika

# 说明
# 1 运行 demo_rabbitmq_consumer.py
# python demo_rabbitmq_consumer.py
# 2 运行 demo_java_receiver_simulator.py
# python demo_java_receiver_simulator.py
# 3 运行 demo_rabbitmq_producer.py
# python demo_rabbitmq_producer.py

# RabbitMQ 客户端
# 对 pika 的部分参数进行进一步封装
class RabbitMQClient(object):
    def __init__(self):
        # amqp://username:password@host:port/<virtual_host>[?query-string]
        self.rabbitmq_schema = 'amqp://admin:admin@localhost/vhost'
        self.task_queue_name = 'q_vhost_task'
        self.result_queue_name = 'q_vhost_result'

        # Consumer 配置项
        self.durable = True
        self.no_ack = True

        # Producer 配置项
        self.exchange = ''
        self.routing_key = ''

    # 获取 channel 和 connection 对象
    def get_channel_and_connection(self):
        parameters = pika.connection.URLParameters(self.rabbitmq_schema)
        connection = pika.BlockingConnection(parameters)
        channel = connection.channel()
        return channel, connection

demo_rabbitmq_consumer.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2018/9/2 14:49
# @Author  : BLKStone
# @Site    : http://wp.blkstone.me
# @File    : demo_rabbitmq_consumer.py
# @Software: PyCharm

import json
import requests

from rabbitmq_config import RabbitMQClient


# 下载远程文件(FastDFS)到本地
# https://www.zhihu.com/question/41132103
def download_from_FastDFS(url):
    r = requests.get(url)
    file_name = url.split('/')[-1]

    print("FastDFS URI: {uri}".format(uri=url))
    print("Local FileName: {path}".format(path=file_name))

    with open(file_name, "wb") as f:
        f.write(r.content)
    f.close()


# 将 json 字符串输出
def print_json_string(json_string):
    json_object = json.loads(json_string)
    for k in json_object:
        if type(json_object[k]) == type(u""):
            val = json_object[k].encode('gbk').strip()
        else:
            val = str(json_object[k])
        print("{key} : {value}".format(key=k, value=val))


# 发送消息给另一 (Java 监听的) 队列
def send_to_java(message_payload):
    rabbit_client = RabbitMQClient()
    channel, connection = rabbit_client.get_channel_and_connection()
    channel.basic_publish(exchange=rabbit_client.exchange, routing_key=rabbit_client.result_queue_name,
                          body=message_payload)
    print(" [x] [Python] Sent {info}".format(info=message_payload))
    connection.close()


# basic_consume 回调函数
def callback(ch, method, properties, body):
    print(" [x] [Python] Received %r" % body)
    # print_json_string(body)
    message = '{"task_id": 100001,' \
              '"task_name": "scan-201808",' \
              '"start_time": "1535883121",' \
              '"end_time": "1535883165"}'
    send_to_java(message)


def main_consume():
    rabbit_client = RabbitMQClient()
    channel, connection = rabbit_client.get_channel_and_connection()
    channel.queue_declare(queue=rabbit_client.task_queue_name,durable=rabbit_client.durable)
    channel.basic_consume(callback, queue=rabbit_client.task_queue_name,no_ack=rabbit_client.no_ack)

    print(' [*] [Python] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()


if __name__ == '__main__':
    main_consume()

demo_java_receiver_simulator.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2018/9/2 15:12
# @Author  : BLKStone
# @Site    : http://wp.blkstone.me
# @File    : demo_java_receiver_simulator.py
# @Software: PyCharm

from rabbitmq_config import RabbitMQClient


def java_callback(ch, method, properties, body):
    print(" [x] [Java] Received %r" % body)

# 模拟 Java 监听 结果(result)队列
def java_consume():
    rabbit_client = RabbitMQClient()
    channel, connection = rabbit_client.get_channel_and_connection()
    channel.queue_declare(queue=rabbit_client.result_queue_name,durable=rabbit_client.durable)
    channel.basic_consume(java_callback, queue=rabbit_client.result_queue_name,no_ack=rabbit_client.no_ack)

    print(' [*] [Java] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()


if __name__ == '__main__':
    java_consume()

demo_rabbitmq_producer.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2018/9/2 16:42
# @Author  : BLKStone
# @Site    : http://wp.blkstone.me
# @File    : demo_rabbitmq_producer.py
# @Software: PyCharm

from rabbitmq_config import RabbitMQClient


def sender():
    rabbit_client = RabbitMQClient()
    channel, connection = rabbit_client.get_channel_and_connection()
    json_string = '{"task_id": 100001,"task_name": "scan-201808","target": "127.0.0.1/24, 127.0.0.2/24"}'

    # channel.queue_declare(queue=rabbit_client.task_queue_name)
    channel.basic_publish(exchange=rabbit_client.exchange,
                          routing_key=rabbit_client.task_queue_name, body=json_string)
    print(" [x] Sent {info}".format(info=json_string))
    connection.close()

if __name__ == '__main__':
    sender()


其他补充

关于 no_ack
no_ack=True 时, 消费者收到消息后无需确认。
no_ack=False 时, 消费者收到消息后需要手动确认(ack),消息才会从队列中消失。

Leave a Reply

Your email address will not be published. Required fields are marked *