RabbitMQのメッセージ送受信をpythonで実装してみる

本サイトは広告収入およびアフィリエイト収益を受けております。

スポンサーリンク
スポンサーリンク

お疲れ様です。きざきまるおです。

今回はRabbitMQのメッセージ送受信をpythonで実装するサンプルを詳細します。

それではどうぞ。

RabbitMQインストール

こちらのURLに一通り手順が書いているので参考にしてみてください。

UbuntuにRabbitMQをインストールする方法/手順について
お疲れ様です。きざきまるおです。 今回はUbuntuにRabbitMQをインストールする方法/手順についてまとめてこうと思います。今更MQ?と思う方もいると思いますが、AWSのSQSなどシステムが疎結合になるにつれ、メッセージサーバーの役割
新しいタブで開く)

メッセージ送信

pythonでのメッセージ送信サンプルになります。
公式サイトのサンプルコードを一部変更した内容となっています。
まずはすべてのコードを載せてしまいます。

import pika

credentials = pika.PlainCredentials('test', 'test')

connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.56.105', 5672, '/', credentials))

channel = connection.channel()

channel.exchange_declare(exchange='sample', exchange_type='fanout')

message = 'fanout sample'

channel.basic_publish(exchange='sample', routing_key='', body=message)

connection.close()

ブロックごと解説します。

import pika

こちらはRabbitMQのメッセージ送受信をするためのライブラリをインポートしてます。
pythonにこのライブラリを事前にインストールしておきましょう。

credentials = pika.PlainCredentials('test', 'test')

connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.56.105', 5672, '/', credentials))

channel = connection.channel()

RabbitMQに接続してコネクションを張るコードです。
credentialの部分はRabbitMQにユーザーを追加している場合はないと接続エラーとなってしまいますので、デフォルトユーザー以外のユーザーを追加している場合は必ず入れましょう。

channel.exchange_declare(exchange='sample', exchange_type='fanout')

exchangeを作成しています。
RabbitMQの特性として、exchangeのタイプに応じて対応するqueueにメッセージを送信します。
サンプルではfanoutを指定しているので、exchangeに紐づくqueueすべてに同様のメッセージを送信する動きになります。

message = 'fanout sample'

channel.basic_publish(exchange='sample', routing_key='', body=message)

connection.close()

メッセージ送信です。
exchangeは作成したものを指定し、routing_keyはqueueの名前やタイプに応じた条件を入力するのですが、今回はfanoutなので指定せず、すべてに送信するようにしています。

最後にコネクションを切断するのを忘れないようにしましょう。

メッセージ受信

pythonでのメッセージ受信サンプルになります。
こちらも公式サイトのサンプルコードを一部変更した内容となっています。
すべてのコードを載せてしまいます。

import pika

credentials = pika.PlainCredentials('test', 'test')
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.56.105', 5672, '/', credentials))

channel = connection.channel()

channel.exchange_declare(exchange='sample', exchange_type='fanout')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='sample', queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(f" [x] {body}")

channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()

前半の解説は割愛します。

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

受信側はqueueと紐づける必要があるので、queueを作成しています。
queueを空白にすることで自動で一意の文字列が設定されます。
exclusive=Trueを指定することでセッションを切断したときに自動でqueueが削除されるようになります。

後ほどexchangeとqueueを明示的に紐づける必要があるので、変数に自動でつけられたqueueの名前を格納しておきます。

channel.queue_bind(exchange='sample', queue=queue_name)

exchangeとqueueを紐づけています。
このようにすることでsampleという名前のexchangeにqueueが紐づけられ、fanoutで送られるメッセージが受信できるようになります。

def callback(ch, method, properties, body):
    print(f" [x] {body}")

channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()

メッセージを受信した際のコールバック関数作成と受信開始をしています。
今更ですが、RabbitMQではメッセージ送信する人をパブリッシャー、受信する人をコンシューマーというので、関数名にconsumeが入っています。

それではまた。

タイトルとURLをコピーしました