RabbitMQ についてどちらかといえば開発側寄りの視点で下記あたりのドキュメントを読んだメモ。
メッセージの ack と reject
コンシューマーがメッセージを受信するとき、最終的に basic_ack
や basic_reject
で応答することでメッセージはキューから削除されます。
<?php $channel->basic_consume( 'hello', '', false, // $no_local false, // $no_ack false, // $exclusive false, // $nowait function (AMQPMessage $msg) use ($channel) { if (/* some */) { // 肯定応答 $channel->basic_ack($msg->getDeliveryTag()); } elseif (/* some */) { // 否定応答 $channel->basic_reject($msg->getDeliveryTag(), false); } } );
basic_ack
だと単にメッセージは削除されます。
basic_reject
は、第2引数($requeue
)が false
だと、もし設定されていればデッドレターへ、設定されていなければメッセージは削除されます。
basic_reject
で、第2引数($requeue
)が true
ならメッセージは再ディスパッチされます。このとき、メッセージの redelivered
プロパティが true
になります。
<?php $channel->basic_consume( 'hello', '', false, // $no_local false, // $no_ack false, // $exclusive false, // $nowait function (AMQPMessage $msg) use ($channel) { if ($msg->get('redelivered')) { // 再ディスパッチなら true $channel->basic_ack($msg->getDeliveryTag()); } else { // basic_reject で再ディスパッチ $channel->basic_reject($msg->getDeliveryTag(), true); } } );
次のような状況でメッセージは再ディスパッチされます(redelivered
プロパティが設定される)。
- メッセージを
basic_reject
やbasic_nack
で$requeue = false
で応答したとき - メッセージを
basic_ack
やbasic_reject
やbasic_nack
せずにコンシューマーの接続が切れてチャンネルが閉じられたとき - マスターキューのノードが停止して RabbitMQ どのコンシューマーにメッセージを投げているかわからなくなったとき
- 他にも?
Exchange の type
queue_declare
で Exchange を宣言するときに指定する $type
について。
default
事前に宣言されている空の名前の Exchange で、キューの作成時にキューと同じ名前のルーティングキーで自動的にバインドされる。
<?php // キューを宣言 $channel->queue_declare('hello_queue'); // Exchange を明示的に宣言せずに $exchange は空で $routing_key にキューの名前を指定して送る $channel->basic_publish($message, '', 'hello_queue');
direct
メッセージのルーティングキーに一致するバインディングキーでバインドされているキューに配信する。
<?php // キューを宣言 $channel->queue_declare('hello_queue'); // Exchange を宣言してバインド $channel->exchange_declare('hello_exchange', 'direct'); $channel->queue_bind('hello_queue', 'hello_exchange', 'hello_routing'); // メッセージは $exchange と $routing_key を指定して送る $channel->basic_publish($message, 'hello_exchange', 'hello_routing');
複数のキューが同じバインディングキーでバインドしているときは、1つのメッセージが複数のキューに配信される。
fanout
ルーティングキーを無視して Exchange にバインドされているすべてのキューにメッセージを配信する。
<?php // キューを宣言 $channel->queue_declare('hello_queue'); // Exchange を宣言してバインド $channel->exchange_declare('hello_exchange', 'fanout'); $channel->queue_bind('hello_queue', 'hello_exchange'); // メッセージは $exchange だけ指定して送る $channel->basic_publish($msg, 'hello_exchange'); // ルーティングキーは指定しても無視される $channel->basic_publish($msg, 'hello_exchange', 'hello_routing');
topic
メッセージのルーティングキーにはドット区切りで複数の単語を指定する。
Exchange のバインディングキーもドット区切りだけれども *
や #
のようなワイルドカードが使用できる(メッセージのルーティングキーにはワイルドカードは使用できない)。
*
任意の 1 つの単語#
0 個以上の任意の単語
<?php // キューを宣言 $channel->queue_declare('hello_queue'); // Exchange を作成してワイルドカードを含むバインディングキーでバインド $channel->exchange_declare('hello_exchange', 'topic'); $channel->queue_bind('hello_queue', 'hello_exchange', 'aaa.*.zzz'); // ワイルドカードにマッチするルーティングキーを指定してメッセージをディスパッチ $channel->basic_publish($msg, 'hello_exchange', 'aaa.xxx.zzz');
header
ルーティングキーを無視して、メッセージのヘッダーを元にルーティングする・・・よくわからん??
キューや Exchange やメッセージの永続化
キューや Exchange はデフォルトでは RabbitMQ を再起動すると削除されますが、queue_declare
や exchange_declare
で $durable = true
を指定すると永続化され、再起動しても残るようになります。
<?php $channel->queue_declare( 'hello_queue', false, // $passive true, // $durable false, // $exclusive false // $auto_delete ); $channel->exchange_declare( 'hello_exchange', 'direct', false, // $passive true, // $durable false // $auto_delete );
なお、キューの中のメッセージも永続化する場合は、queue_declare
でキューを永続化するように宣言しつつ、個々のメッセージにも次のように永続化を指定する必要があります。
<?php $msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
キューや Exchange の自動削除
queue_declare
や exchange_declare
で $auto_delete = true
を指定すると、キューは接続しているコンシューマーがすべていなくなったときに、Exchange はバインドされているキューがすべて無くなったときに、自動的に削除されます。
キューはメッセージが残っていても削除されます。
<?php $channel->queue_declare( 'hello_queue', false, // $passive false, // $durable false, // $exclusive true // $auto_delete ); $channel->exchange_declare( 'hello_exchange', 'direct', false, // $passive false, // $durable true // $auto_delete );
なお、RabbitMQ を再起動すると(あたりまえですが)コンシューマーからの接続はいったんすべて切れるため、$auto_delete = true
で自動削除を有効にしていると $durable = true
で永続化を有効にしていても再起動で削除されます。$durable = true
なら自動的に $auto_delete = false
になるかと思いきやそんなことはありません。ので $durable = true
なら $auto_delete = false
にしないと意味が無さそうです。
排他キュー
queue_declare
で $exclusive = false
を指定して宣言すると、その接続専用の排他的に利用できるキューが作成されます。排他キューを他の接続が consume
しようとするとエラーになります。
<?php $channel->queue_declare( 'hello_queue', false, // $passive false, // $durable true, // $exclusive false // $auto_delete );
排他キューはその接続専用なので $auto_delete
の指定に関わらず接続が終われば削除されます。また、永続化されることもないし、ミラーリングが設定されていたとしてもミラーリングされません。
次のように匿名キューを用いたブロードキャストに利用できます。
<?php // 匿名の排他キューを作成 list($queue_name) = $channel->queue_declare( '', false, // $passive false, // $durable true, // $exclusive false // $auto_delete ); // Exchange を宣言してバインド $channel->exchange_declare( 'hello_broadcast', 'fanout', false, // $passive true, // $durable false // $auto_delete ); $channel->queue_bind($queue_name, 'hello_broadcast'); // 匿名キューを consume する $channel->basic_consume($queue_name, '', false, false, false, false, function ($msg) { // ... }); while ($channel->is_consuming()) { $channel->wait(); }
<?php // メッセージをパブリッシュ $channel->basic_publish($msg, 'hello_broadcast');
メッセージのプリフェッチ
RabbitMQ は複数のコンシューマーに対して単純にラウンドロビンでメッセージを配送します。また、コンシューマーが未ACKなメッセージを持っていたとしても構わず新しいメッセージをディスパッチするため、コンシューマーには複数の未ACKなメッセージが溜まることになります。
例えば 2 つのコンシューマーがいるとき、奇数番目のメッセージがすべて重く、偶数番目のメッセージがすべて軽い場合、重い奇数番目のメッセージが 1 つのコンシューマーに集中してしまい、もう片方のコンシューマーが空いているにも関わらず奇数番目のメッセージがなかなか処理されないことがある。
- 1.重いメッセージ → コンシューマーA
- 2.軽いメッセージ → コンシューマーB
- 3.重いメッセージ → コンシューマーA ※「1.重いメッセージ」を処理中でもメッセージがディスパッチされる
- 4.軽いメッセージ → コンシューマーB
次のように basic_qos
で $prefetch_count
を指定すると、コンシューマーにディスパッチされている未 ACK なメッセージの上限が設定できる。1 を指定すれば、あるコンシューマーがなにかしらメッセージを処理中なら(=未ACKなメッセージがあるなら)コンシューマーへはメッセージが配信されません。
<?php $channel->basic_qos( 0, // $prefetch_size 1, // $prefetch_count false // $a_global );
- 1.重いメッセージ → コンシューマーA
- 2.軽いメッセージ → コンシューマーB
- 3.重いメッセージ → コンシューマーB ※「コンシューマーA」は「1.重いメッセージ」が未ACKなので別のコンシューマーに送られる
- 4.軽いメッセージ → ※「1.重いメッセージ」と「3.重いメッセージ」の先に終わった方に送られる
第3引数の $a_global
はプリフェッチ数がどの範囲に適用されるかの指定ですが、AMQP 0-9-1 の仕様と RabbitMQ とで差があるようです。
$a_global |
AMQP 0-9-1 | RabbitMQ |
---|---|---|
false | チャンネル全体 | コンシューマ個別 |
true | 接続全体 | チャンネル全体 |
$prefetch_size
は RabbitMQ では実装されていないとのこと。
no_ack
basic_consume
で $no_ack = true
にすると、コンシューマーにメッセージがディスパッチされた時点でメッセージがキューから削除されるため basic_ack
や basic_reject
は不要になります。
<?php $channel->basic_consume( 'hello', '', false, // $no_local true, // $no_ack false, // $exclusive false, // $nowait function (AMQPMessage $msg) use ($channel) { // ... } );
この方法は RabbitMQ と余分なやり取りがなくなるのでスループットが向上しますが、コンシューマーが突然死などするとメッセージが失われます。メッセージが削除されるのはコンシューマーのコールバックが呼ばれたときではなくコンシューマーにメッセージがディスパッチされたときなので、処理中の1つのメッセージが失われるだけではなく、そのコンシューマーにディスパッチしているすべてのメッセージが失われます。
なお、プリフェッチ数(basic_qos
の $prefetch_count
)は「未ACKのメッセージ数」の上限なので、ACK不要な $no_ack = true
だと効果ありません。
ところで $no_ack = true
なコンシューマーを停止するときは、そのコンシューマーへのディスパッチを停止したうえでディスパッチ済のメッセージをすべて処理してから停止するのが良いと思うんですけど・・それどうすればいいんですかね?ちょっとわかりませんでした。それができないとコンシューマーの再起動時に確実にメッセージが失われそうな・・それでも構わないぐらいのケースで使うものですかね。
メッセージのTTL
メッセージの TTL はキューとメッセージの両方に指定できる。
# ポリシーで指定 rabbitmqctl set_policy my_ttl '.*' '{"message-ttl":60000}' --apply-to queues
<?php // キュー宣言で指定 $args = new AMQPTable(); $args->set('x-message-ttl', 60000); $channel->queue_declare( 'hello_queue', false, // $passive true, // $durable false, // $exclusive false, // $auto_delete false, // $nowait $args );
<?php // メッセージごとに指定 $msg = new AMQPMessage($data, [ 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, 'expiration' => 60000, ]);
有効期限が切れたメッセージは後述のデッドレターに送られるかもしくは削除される。
メッセージが再ディスパッチされるとき(basic_reject
の $requeue = false
など )、メッセージの元の有効期限は保持される。つまりメッセージがキューに入った時点で TTL を元に有効期限の日時が確定し、その後 basic_reject
などで再ディスパッチされたとしても有効期限の日時が再計算されることはない。
TTL が 0 だとメッセージがパブリッシュされた後直ぐにコンシューマーに送られない限り、メッセージは期限切れになる。
キューとメッセージの両方で TTL が指定されている場合は、より小さい方が採用される。
キューのTTL
キューにも TTL を設定することができる。前述の「キューのポリシーや宣言で指定するメッセージの TTL」ではなく、キュー自体の TTL です。
キューに TTL を設定すると、コンシューマーが居ないなどで一定時間使用されていないキューを自動で削除できる。
# ポリシーで指定 rabbitmqctl set_policy my_expiry '.*' '{"expires":1800000}' --apply-to queues
<?php // キュー宣言で指定 $args = new AMQPTable(); $args->set('x-expires', 1800000); $channel->queue_declare( 'hello_queue', false, // $passive = false, true, // $durable = false, false, // $exclusive = false, false, // $auto_delete = true, false, // $nowait = false, $args );
キューが TTL で削除されるとき、そのキューにメッセージが残っているとデッドレターに送られることもなくキューと一緒に削除される。
デッドレター
メッセージは次のいずれかでデッドレターになる。
basic_reject
やbasic_nack
が$requeue = false
で呼ばれたとき- メッセージが TTL により期限切れになったとき
- キューの長さ制限を超えたとき
- 他にも?
このときキューにデッドレターの宛先の Exchange が指定されていると、メッセージはその Exchange に 送られます(指定していなければメッセージは削除される)。
# ポリシーで指定 rabbitmqctl set_policy my_dead '.*' '{"dead-letter-exchange":"dead_exchange"},"dead-letter-routing-key":"dead_route"' --apply-to queues
<?php // キュー宣言で指定 $args = new AMQPTable(); $args->set('x-dead-letter-exchange', 'dead_exchange'); $args->set('x-dead-letter-routing-key', 'dead_route'); $channel->queue_declare( 'hello_queue', false, // $passive true, // $durable false, // $exclusive false, // $auto_delete false, // $nowait $args );
デッドレターメッセージのルーティング
キューに dead-letter-routing-key
が設定されているならそのルーティングキーで dead-letter-exchange
へ送られる。
設定されていないなら「そのメッセージがパブリッシュされたときの元のルーティングキー」で dead-letter-exchange
へ送られる(ルーティングキーが空になるわけではない)。
デッドレターメッセージの書き換え
デッドレターメッセージにはそのメッセージの元の配送先のキューなどの情報がヘッダーに追加される。
<?php $channel->basic_consume('dead_queue', '', false, true, false, false, function (AMQPMessage $msg) { $headers = $msg->get('application_headers'); assert($headers instanceof AMQPTable); $data = $headers->getNativeData(); var_dump( $data['x-first-death-exchange'], $data['x-first-death-queue'], $data['x-first-death-reason'], $data['x-death'] ); });
元のメッセージに expiration
が指定されていてもデッドレターメッセージでは除去される。その他のプロパティは元のメッセージのものがそのまま引き継がれる。
優先度付きキュー
キューの宣言時にそのキューがサポートする最大の優先度を x-max-priority
で指定すると、そのキューは優先度付きキューとなる。
<?php // キュー宣言で指定 $args = new AMQPTable(); $args->set('x-max-priority', 10); $channel->queue_declare( 'hello_queue', false, // $passive true, // $durable false, // $exclusive false, // $auto_delete false, // $nowait $args );
10
を指定すると 0 ~ 10 の合計 11 個の優先度が設けられることになる。x-max-priority
は最大で 255 まで指定できるものの、優先度の数ごとに余分なリソースを食らうので(CPU・メモリ・ディスク)、大量のレベルは作成しないほうが良く、多くても 10 ぐらいまでが奨励される。
パブリッシャーはメッセージの priority
プロパティでメッセージの優先度を指定する。大きいほうが優先度は高い。
<?php $msg = new AMQPMessage($data, [ 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, 'priority' => 5, ]);
優先度が指定されないメッセージは優先度 0
となる。キューの x-max-priority
よりも大きい優先度がメッセージに指定されたときは最大値と同じとして扱われる。
basic_qos
でメッセージのプリフェッチ数を制限していないと、コンシューマーがすべてビジーだとしてもメッセージが RabbitMQ へパブリッシュされた時点で直ちにコンシューマーへ配信されるため、優先度の意味がほとんど無くなるので注意(来たものから最速でコンシューマーにディスパッチされるため)。
キューの x-max-priority
はポリシーでは定義できない。なぜなら TTL やキューの長さの制限は動的なものなのでキューの宣言後でもポリシーで変更できるが、キューの優先度の最大値はキューの宣言後に変更できないため。
HTTP API でパブリッシュ
Management plugin が有効になっていれば HTTP API でもメッセージをパブリッシュできる。
curl -u 'ore:pass' -H 'content-type:application/json' -XPOST "http://localhost:15672/api/exchanges/%2f/hello_exchange/publish" --data-raw '{ "properties":{ "delivery_mode": 2, "priority": 5 }, "routing_key":"hello_route", "payload":"hoge", "payload_encoding": "string" }'
ただ、1回の接続で複数のメッセージを送る分には AMQP のバイナリプロトコルの方が圧倒的に早いし、1回パブリッシュするたびに接続し直す場合でも AMQP の方がちょっと早いぐらいなので、HTTP API でパブリッシュするメリットは特にありません。
メッセージをキューの後ろに回す
メッセージを basic_reject
や basic_nack
で $requeue = true
を指定してメッセージが再キューイングされるとき、メッセージはキューの元の位置に入るため、キューの後ろに同じメッセージが詰まっていても同じメッセージが繰り返しディスパッチされます。
再キューイングのメッセージは後回しにして詰まっているメッセージを先に処理したいときは、次のようにメッセージを ACK した上で再パブリッシュすればできます。
<?php $channel->basic_consume( 'hello', '', false, // $no_local false, // $no_ack false, // $exclusive false, // $nowait function (AMQPMessage $msg) use ($channel) { // 同じメッセージを再作成して同じ宛先へ再パブリッシュ $new = new AMQPMessage($msg->body, $msg->get_properties()); $channel->basic_publish($new, $msg->delivery_info['exchange'], $msg->delivery_info['routing_key']); $channel->basic_ack($msg->getDeliveryTag()); } );
注意すべき点として、メッセージやキューにメッセージの TTL が指定されている場合、普通に basic_reject
で再キューイングされたときは一番最初にメッセージがキューに入ったときから起算して有効期限が適用されるのに対して、↑の方法だとメッセージを再作成するたびに有効期限が計算され直すため、いつまでもメッセージが有効期限切れにならないことがあります。
メッセージを遅延させる
メッセージをパブリッシュするときに指定した秒数だけコンシューマーへ配信するのを遅延させる方法。普通にはできませんが、rabbitmq-delayed-message-exchange プラグインを有効にすればできます。
次のようにインストールします。
# プラグインのインストールディレクトリを調べる rabbitmqctl eval 'application:get_env(rabbit, plugins_dir).' #=> {ok,"/opt/rabbitmq/plugins"} # プラグインのインストール curl -sL https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.8.0/rabbitmq_delayed_message_exchange-3.8.0.ez \ -o /opt/rabbitmq/plugins/rabbitmq_delayed_message_exchange-3.8.0.ez # プラグインの有効化 rabbitmq-plugins enable rabbitmq_delayed_message_exchange
Exchange の type
で x-delayed-message
を指定します。本来の type
は x-delayed-type
引数で指定します。
<?php $args = new AMQPTable(); $args->set('x-delayed-type', 'direct'); $channel->exchange_declare( 'hello', 'x-delayed-message', false, // $passive = false, false, // $durable = false, true, // $auto_delete = true, false, // $internal = false, false, // $nowait = false, $args );
パブリッシャーは x-delay
ヘッダーで遅延時間をミリ秒で指定します。
<?php $msg = new AMQPMessage($data, [ 'application_headers' => new AMQPTable(['x-delay' => 1000]), ]); $channel->basic_publish($msg, 'hello');
README によるとこのプラグインは実験的ではあるものの安定しており、README に記載の制限を理解しているならプロダクションで使っても良さそうとのことです。
メッセージを遅延させる(非プラグイン)
rabbitmq-delayed-message-exchange
を使わずにやる方法。TTLとデッドレターを使えばできます。
<?php $args = new AMQPTable(); // デッドレターの宛先として default exchange を指定する $args->set('x-dead-letter-exchange', ''); // ルーティングキーは指定しないのでデッドレターは元のルーティングキーが維持される //$args->set('x-dead-letter-routing-key', ''); $channel->queue_declare('delay', false, // $passive false, // $durable false, // $exclusive true, // $auto_delete false, // $nowait $args ); // fanout exchange でルーティングキーを無視して delay キューへ送る $channel->exchange_declare('delay', 'fanout'); $channel->queue_bind('delay', 'delay');
<?php // 遅延時間を expiration で指定する(ミリ秒) $msg = new AMQPMessage($data, ['expiration' => 1000]); // exchange に delay を、ルーティングキーに遅延して配信したいキュー名を指定 $channel->basic_publish($msg, 'delay', 'hello');
この例では default exchange と fanout exchange を用いて、シンプルに1つの遅延キュー(delay
)で複数のキューへ配信できるようにしています。
もっと複雑なルーティングを利用したい場合は最終的な宛先となる Exchange ごとに遅延キューを設けるなどする必要があるかもしれません。
<?php $args = new AMQPTable(); $args->set('x-dead-letter-exchange', 'hello'); $channel->queue_declare('hello.delay', false, // $passive false, // $durable false, // $exclusive true, // $auto_delete false, // $nowait $args ); $channel->exchange_declare('hello.delay', 'fanout'); $channel->queue_bind('hello.delay', 'hello.delay');
<?php $msg = new AMQPMessage($data, ['expiration' => 1000]); $channel->basic_publish($msg, 'hello.delay', 'aaa.bbb.ccc');