お疲れ様です。きざきまるおです。
今回はRabbitMQのメッセージ送受信をpythonで実装するサンプルを詳細します。
それではどうぞ。
RabbitMQインストール
こちらのURLに一通り手順が書いているので参考にしてみてください。
メッセージ送信
pythonでのメッセージ送信サンプルになります。
公式サイトのサンプルコードを一部変更した内容となっています。
まずはすべてのコードを載せてしまいます。
import pika
credentials = pika.PlainCredentials('test', 'test')
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.56.105', 5672, '/', credentials))
channel = connection.channel()
channel.exchange_declare(exchange='sample', exchange_type='fanout')
message = 'fanout sample'
channel.basic_publish(exchange='sample', routing_key='', body=message)
connection.close()
ブロックごと解説します。
import pika
こちらはRabbitMQのメッセージ送受信をするためのライブラリをインポートしてます。
pythonにこのライブラリを事前にインストールしておきましょう。
credentials = pika.PlainCredentials('test', 'test')
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.56.105', 5672, '/', credentials))
channel = connection.channel()
RabbitMQに接続してコネクションを張るコードです。
credentialの部分はRabbitMQにユーザーを追加している場合はないと接続エラーとなってしまいますので、デフォルトユーザー以外のユーザーを追加している場合は必ず入れましょう。
channel.exchange_declare(exchange='sample', exchange_type='fanout')
exchangeを作成しています。
RabbitMQの特性として、exchangeのタイプに応じて対応するqueueにメッセージを送信します。
サンプルではfanoutを指定しているので、exchangeに紐づくqueueすべてに同様のメッセージを送信する動きになります。
message = 'fanout sample'
channel.basic_publish(exchange='sample', routing_key='', body=message)
connection.close()
メッセージ送信です。
exchangeは作成したものを指定し、routing_keyはqueueの名前やタイプに応じた条件を入力するのですが、今回はfanoutなので指定せず、すべてに送信するようにしています。
最後にコネクションを切断するのを忘れないようにしましょう。
メッセージ受信
pythonでのメッセージ受信サンプルになります。
こちらも公式サイトのサンプルコードを一部変更した内容となっています。
すべてのコードを載せてしまいます。
import pika
credentials = pika.PlainCredentials('test', 'test')
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.56.105', 5672, '/', credentials))
channel = connection.channel()
channel.exchange_declare(exchange='sample', exchange_type='fanout')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='sample', queue=queue_name)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(f" [x] {body}")
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
前半の解説は割愛します。
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
受信側はqueueと紐づける必要があるので、queueを作成しています。
queueを空白にすることで自動で一意の文字列が設定されます。
exclusive=Trueを指定することでセッションを切断したときに自動でqueueが削除されるようになります。
後ほどexchangeとqueueを明示的に紐づける必要があるので、変数に自動でつけられたqueueの名前を格納しておきます。
channel.queue_bind(exchange='sample', queue=queue_name)
exchangeとqueueを紐づけています。
このようにすることでsampleという名前のexchangeにqueueが紐づけられ、fanoutで送られるメッセージが受信できるようになります。
def callback(ch, method, properties, body):
print(f" [x] {body}")
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
メッセージを受信した際のコールバック関数作成と受信開始をしています。
今更ですが、RabbitMQではメッセージ送信する人をパブリッシャー、受信する人をコンシューマーというので、関数名にconsumeが入っています。
それではまた。