Python實現RabbitMQ中6種消息模型

RabbitMQ與Redis對比

​ RabbitMQ是一種比較流行的消息中間件,之前我一直使用redis作為消息中間件,但是生產環境比較推薦RabbitMQ來替代Redis,所以我去查詢了一些RabbitMQ的資料。相比於Redis,RabbitMQ優點很多,比如:

  1. 具有消息消費確認機制
  2. 隊列,消息,都可以選擇是否持久化,粒度更小、更靈活。
  3. 可以實現負載均衡

RabbitMQ應用場景

  1. 異步處理:比如用戶註冊時的確認郵件、短信等交由rabbitMQ進行異步處理
  2. 應用解耦:比如收發消息雙方可以使用消息隊列,具有一定的緩衝功能
  3. 流量削峰:一般應用於秒殺活動,可以控制用戶人數,也可以降低流量
  4. 日誌處理:將info、warning、error等不同的記錄分開存儲

RabbitMQ消息模型

​ 這裡使用 Pythonpika 這個庫來實現RabbitMQ中常見的6種消息模型。沒有的可以先安裝:

<code>pip install pika/<code>

1.單生產單消費模型:即完成基本的一對一消息轉發。

Python實現RabbitMQ中6種消息模型

<code># 生產者代碼
import pika


credentials = pika.PlainCredentials('chuan', '123')  # mq用戶名和密碼,沒有則需要自己創建
# 虛擬隊列需要指定參數 virtual_host,如果是默認的可以不填。
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost',
                                                               port=5672,
                                                               virtual_host='/',
                                                               credentials=credentials))

# 建立rabbit協議的通道
channel = connection.channel()
# 聲明消息隊列,消息將在這個隊列傳遞,如不存在,則創建。durable指定隊列是否持久化
channel.queue_declare(queue='python-test', durable=False)

# message不能直接發送給queue,需經exchange到達queue,此處使用以空字符串標識的默認的exchange
# 向隊列插入數值 routing_key是隊列名
channel.basic_publish(exchange='',
                      routing_key='python-test',
                      body='Hello world!2')
# 關閉與rabbitmq server的連接
connection.close()/<code>
<code># 消費者代碼
import pika

credentials = pika.PlainCredentials('chuan', '123')
# BlockingConnection:同步模式
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost',
                                                               port=5672,
                                                               virtual_host='/',
                                                            credentials=credentials))
channel = connection.channel()
# 申明消息隊列。當不確定生產者和消費者哪個先啟動時,可以兩邊重複聲明消息隊列。
channel.queue_declare(queue='python-test', durable=False)
# 定義一個回調函數來處理消息隊列中的消息,這裡是打印出來
def callback(ch, method, properties, body):
    # 手動發送確認消息
    ch.basic_ack(delivery_tag=method.delivery_tag)
    print(body.decode())
    # 告訴生產者,消費者已收到消息

# 告訴rabbitmq,用callback來接收消息
# 默認情況下是要對消息進行確認的,以防止消息丟失。
# 此處將auto_ack明確指明為True,不對消息進行確認。
channel.basic_consume('python-test',
                      on_message_callback=callback)
                      # auto_ack=True)  # 自動發送確認消息
# 開始接收信息,並進入阻塞狀態,隊列裡有信息才會調用callback進行處理
channel.start_consuming()/<code>

2.消息分發模型:多個收聽者監聽一個隊列。

Python實現RabbitMQ中6種消息模型

<code># 生產者代碼
import pika


credentials = pika.PlainCredentials('chuan', '123')  # mq用戶名和密碼
# 虛擬隊列需要指定參數 virtual_host,如果是默認的可以不填。
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost',
                                                               port=5672,
                                                               virtual_host='/',
                                                               credentials=credentials))

# 建立rabbit協議的通道
channel = connection.channel()
# 聲明消息隊列,消息將在這個隊列傳遞,如不存在,則創建。durable指定隊列是否持久化。確保沒有確認的消息不會丟失
channel.queue_declare(queue='rabbitmqtest', durable=True)

# message不能直接發送給queue,需經exchange到達queue,此處使用以空字符串標識的默認的exchange
# 向隊列插入數值 routing_key是隊列名
# basic_publish的properties參數指定message的屬性。此處delivery_mode=2指明message為持久的
for i in range(10):
    channel.basic_publish(exchange='',
                          routing_key='python-test',
                          body='Hello world!%s' % i,
                          properties=pika.BasicProperties(delivery_mode=2))
# 關閉與rabbitmq server的連接
connection.close()/<code>
<code># 消費者代碼,consume1與consume2
import pika
import time

credentials = pika.PlainCredentials('chuan', '123')
# BlockingConnection:同步模式
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost',
                                                               port=5672,
                                                               virtual_host='/',
                                                               credentials=credentials))
channel = connection.channel()
# 申明消息隊列。當不確定生產者和消費者哪個先啟動時,可以兩邊重複聲明消息隊列。
channel.queue_declare(queue='rabbitmqtest', durable=True)
# 定義一個回調函數來處理消息隊列中的消息,這裡是打印出來
def callback(ch, method, properties, body):
    # 手動發送確認消息
    time.sleep(10)
    print(body.decode())
    # 告訴生產者,消費者已收到消息
    ch.basic_ack(delivery_tag=method.delivery_tag)

# 如果該消費者的channel上未確認的消息數達到了prefetch_count數,則不向該消費者發送消息
channel.basic_qos(prefetch_count=1)
# 告訴rabbitmq,用callback來接收消息
# 默認情況下是要對消息進行確認的,以防止消息丟失。
# 此處將no_ack明確指明為True,不對消息進行確認。
channel.basic_consume('python-test',
                      on_message_callback=callback)
                      # auto_ack=True)  # 自動發送確認消息
# 開始接收信息,並進入阻塞狀態,隊列裡有信息才會調用callback進行處理
channel.start_consuming()/<code>

3.fanout消息訂閱模式:生產者將消息發送到Exchange,Exchange再轉發到與之綁定的Queue中,每個消費者再到自己的Queue中取消息。

Python實現RabbitMQ中6種消息模型

<code># 生產者代碼
import pika


credentials = pika.PlainCredentials('chuan', '123')  # mq用戶名和密碼
# 虛擬隊列需要指定參數 virtual_host,如果是默認的可以不填。
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost',
                                                               port=5672,
                                                               virtual_host='/',
                                                               credentials=credentials))
# 建立rabbit協議的通道
channel = connection.channel()
# fanout: 所有綁定到此exchange的queue都可以接收消息(實時廣播)
# direct: 通過routingKey和exchange決定的那一組的queue可以接收消息(有選擇接受)
# topic: 所有符合routingKey(此時可以是一個表達式)的routingKey所bind的queue可以接收消息(更細緻的過濾)
channel.exchange_declare('logs', exchange_type='fanout')


#因為是fanout廣播類型的exchange,這裡無需指定routing_key
for i in range(10):
    channel.basic_publish(exchange='logs',
                          routing_key='',
                          body='Hello world!%s' % i)

# 關閉與rabbitmq server的連接
connection.close()/<code>
<code>import pika

credentials = pika.PlainCredentials('chuan', '123')
# BlockingConnection:同步模式
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost',
                                                               port=5672,
                                                               virtual_host='/',
                                                               credentials=credentials))
channel = connection.channel()

#作為好的習慣,在producer和consumer中分別聲明一次以保證所要使用的exchange存在
channel.exchange_declare(exchange='logs',
                         exchange_type='fanout')

# 隨機生成一個新的空的queue,將exclusive置為True,這樣在consumer從RabbitMQ斷開後會刪除該queue
# 是排他的。
result = channel.queue_declare('', exclusive=True)

# 用於獲取臨時queue的name
queue_name = result.method.queue

# exchange與queue之間的關係成為binding
# binding告訴exchange將message發送該哪些queue
channel.queue_bind(exchange='logs',
                   queue=queue_name)

# 定義一個回調函數來處理消息隊列中的消息,這裡是打印出來
def callback(ch, method, properties, body):
    # 手動發送確認消息
    print(body.decode())
    # 告訴生產者,消費者已收到消息
    #ch.basic_ack(delivery_tag=method.delivery_tag)

# 如果該消費者的channel上未確認的消息數達到了prefetch_count數,則不向該消費者發送消息
channel.basic_qos(prefetch_count=1)
# 告訴rabbitmq,用callback來接收消息
# 默認情況下是要對消息進行確認的,以防止消息丟失。
# 此處將no_ack明確指明為True,不對消息進行確認。
channel.basic_consume(queue=queue_name,
                      on_message_callback=callback,
                      auto_ack=True)  # 自動發送確認消息
# 開始接收信息,並進入阻塞狀態,隊列裡有信息才會調用callback進行處理
channel.start_consuming()/<code>

4.direct路由模式:此時生產者發送消息時需要指定RoutingKey,即路由Key,Exchange接收到消息時轉發到與RoutingKey相匹配的隊列中。

Python實現RabbitMQ中6種消息模型

<code># 生產者代碼,測試命令可以使用:python produce.py error 404error
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 聲明一個名為direct_logs的direct類型的exchange
# direct類型的exchange
channel.exchange_declare(exchange='direct_logs',
                         exchange_type='direct')

# 從命令行獲取basic_publish的配置參數
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'

# 向名為direct_logs的exchage按照設置的routing_key發送message
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)

print(" [x] Sent %r:%r" % (severity, message))
connection.close()/<code>
<code># 消費者代碼,測試可以使用:python consume.py error
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 聲明一個名為direct_logs類型為direct的exchange
# 同時在producer和consumer中聲明exchage或queue是個好習慣,以保證其存在
channel.exchange_declare(exchange='direct_logs',
                         exchange_type='direct')

result = channel.queue_declare('', exclusive=True)
queue_name = result.method.queue

# 從命令行獲取參數:routing_key
severities = sys.argv[1:]
if not severities:
    print(sys.stderr, "Usage: %s [info] [warning] [error]" % (sys.argv[0],))
    sys.exit(1)

for severity in severities:
    # exchange和queue之間的binding可接受routing_key參數
    # fanout類型的exchange直接忽略該參數。direct類型的exchange精確匹配該關鍵字進行message路由
    # 一個消費者可以綁定多個routing_key
    # Exchange就是根據這個RoutingKey和當前Exchange所有綁定的BindingKey做匹配,
    # 如果滿足要求,就往BindingKey所綁定的Queue發送消息
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body,))


channel.basic_consume(queue=queue_name,
                      on_message_callback=callback,
                      auto_ack=True)

channel.start_consuming()/<code> 

5.topic匹配模式:更細緻的分組,允許在RoutingKey中使用匹配符。

  • *:匹配一個單詞
  • #:匹配0個或多個單詞
Python實現RabbitMQ中6種消息模型

<code># 生產者代碼,基本不變,只需將exchange_type改為topic(測試:python produce.py rabbitmq.red 
# red color is my favorite
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 聲明一個名為direct_logs的direct類型的exchange
# direct類型的exchange
channel.exchange_declare(exchange='topic_logs',
                         exchange_type='topic')

# 從命令行獲取basic_publish的配置參數
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'

# 向名為direct_logs的exchange按照設置的routing_key發送message
channel.basic_publish(exchange='topic_logs',
                      routing_key=severity,
                      body=message)

print(" [x] Sent %r:%r" % (severity, message))
connection.close()/<code>
<code># 消費者代碼,(測試:python consume.py *.red)
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 聲明一個名為direct_logs類型為direct的exchange
# 同時在producer和consumer中聲明exchage或queue是個好習慣,以保證其存在
channel.exchange_declare(exchange='topic_logs',
                         exchange_type='topic')

result = channel.queue_declare('', exclusive=True)
queue_name = result.method.queue

# 從命令行獲取參數:routing_key
severities = sys.argv[1:]
if not severities:
    print(sys.stderr, "Usage: %s [info] [warning] [error]" % (sys.argv[0],))
    sys.exit(1)

for severity in severities:
    # exchange和queue之間的binding可接受routing_key參數
    # fanout類型的exchange直接忽略該參數。direct類型的exchange精確匹配該關鍵字進行message路由
    # 一個消費者可以綁定多個routing_key
    # Exchange就是根據這個RoutingKey和當前Exchange所有綁定的BindingKey做匹配,
    # 如果滿足要求,就往BindingKey所綁定的Queue發送消息
    channel.queue_bind(exchange='topic_logs',
                       queue=queue_name,
                       routing_key=severity)

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body,))


channel.basic_consume(queue=queue_name,
                      on_message_callback=callback,
                      auto_ack=True)

channel.start_consuming()/<code> 

6.RPC遠程過程調用:客戶端與服務器之間是完全解耦的,即兩端既是消息的發送者也是接受者。

Python實現RabbitMQ中6種消息模型

<code># 生產者代碼
import pika
import uuid


# 在一個類中封裝了connection建立、queue聲明、consumer配置、回調函數等
class FibonacciRpcClient(object):
    def __init__(self):
        # 建立到RabbitMQ Server的connection
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

        self.channel = self.connection.channel()

        # 聲明一個臨時的回調隊列
        result = self.channel.queue_declare('', exclusive=True)
        self._queue = result.method.queue

        # 此處client既是producer又是consumer,因此要配置consume參數
        # 這裡的指明從client自己創建的臨時隊列中接收消息
        # 並使用on_response函數處理消息
        # 不對消息進行確認
        self.channel.basic_consume(queue=self._queue,
                                   on_message_callback=self.on_response,
                                   auto_ack=True)
        self.response = None
        self.corr_id = None

    # 定義回調函數
    # 比較類的corr_id屬性與props中corr_id屬性的值
    # 若相同則response屬性為接收到的message
    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        # 初始化response和corr_id屬性
        self.corr_id = str(uuid.uuid4())

        # 使用默認exchange向server中定義的rpc_queue發送消息
        # 在properties中指定replay_to屬性和correlation_id屬性用於告知遠程server
        # correlation_id屬性用於匹配request和response
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',
                                   properties=pika.BasicProperties(
                                       reply_to=self._queue,
                                       correlation_id=self.corr_id,
                                   ),
                                   # message需為字符串
                                   body=str(n))

        while self.response is None:
            self.connection.process_data_events()

        return int(self.response)


# 生成類的實例
fibonacci_rpc = FibonacciRpcClient()

print(" [x] Requesting fib(30)")
# 調用實例的call方法
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)/<code> 
<code># 消費者代碼,這裡以生成斐波那契數列為例
import pika

# 建立到達RabbitMQ Server的connection
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 聲明一個名為rpc_queue的queue
channel.queue_declare(queue='rpc_queue')

# 計算指定數字的斐波那契數
def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)

# 回調函數,從queue接收到message後調用該函數進行處理
def on_request(ch, method, props, body):
    # 由message獲取要計算斐波那契數的數字
    n = int(body)
    print(" [.] fib(%s)" % n)
    # 調用fib函數獲得計算結果
    response = fib(n)

    # exchage為空字符串則將message發送個到routing_key指定的queue
    # 這裡queue為回調函數參數props中reply_ro指定的queue
    # 要發送的message為計算所得的斐波那契數
    # properties中correlation_id指定為回調函數參數props中co的rrelation_id
    # 最後對消息進行確認
    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id=props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)


# 只有consumer已經處理並確認了上一條message時queue才分派新的message給它
channel.basic_qos(prefetch_count=1)

# 設置consumeer參數,即從哪個queue獲取消息使用哪個函數進行處理,是否對消息進行確認
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)

print(" [x] Awaiting RPC requests")

# 開始接收並處理消息
channel.start_consuming()/<code>


分享到:


相關文章: