RabbitMQ を用いた開発のためのメモ

RabbitMQ についてどちらかといえば開発側寄りの視点で下記あたりのドキュメントを読んだメモ。

https://www.rabbitmq.com/documentation.html

メッセージの ack と reject

コンシューマーがメッセージを受信するとき、最終的に basic_ackbasic_reject で応答することでメッセージはキューから削除されます。

<?php
$channel->basic_consume(
    'hello',
    '',
    false, // $no_local
    false, // $no_ack
    false, // $exclusive
    false, // $nowait
    function (AMQPMessage $msg) use ($channel) {
        if (/* some */) {
            // 肯定応答
            $channel->basic_ack($msg->getDeliveryTag());
        } elseif (/* some */) {
            // 否定応答
            $channel->basic_reject($msg->getDeliveryTag(), false);
        }
    }
);

basic_ack だと単にメッセージは削除されます。

basic_reject は、第2引数($requeue)が false だと、もし設定されていればデッドレターへ、設定されていなければメッセージは削除されます。

basic_reject で、第2引数($requeue)が true ならメッセージは再ディスパッチされます。このとき、メッセージの redelivered プロパティが true になります。

<?php
$channel->basic_consume(
    'hello',
    '',
    false, // $no_local
    false, // $no_ack
    false, // $exclusive
    false, // $nowait
    function (AMQPMessage $msg) use ($channel) {
        if ($msg->get('redelivered')) { // 再ディスパッチなら true
            $channel->basic_ack($msg->getDeliveryTag());
        } else {
            // basic_reject で再ディスパッチ
            $channel->basic_reject($msg->getDeliveryTag(), true);
        }
    }
);

次のような状況でメッセージは再ディスパッチされます(redelivered プロパティが設定される)。

  • メッセージを basic_rejectbasic_nack$requeue = false で応答したとき
  • メッセージを basic_ackbasic_rejectbasic_nack せずにコンシューマーの接続が切れてチャンネルが閉じられたとき
  • マスターキューのノードが停止して RabbitMQ どのコンシューマーにメッセージを投げているかわからなくなったとき
  • 他にも?

Exchange の type

queue_declare で Exchange を宣言するときに指定する $type について。

default

事前に宣言されている空の名前の Exchange で、キューの作成時にキューと同じ名前のルーティングキーで自動的にバインドされる。

<?php
// キューを宣言
$channel->queue_declare('hello_queue');

// Exchange を明示的に宣言せずに $exchange は空で $routing_key にキューの名前を指定して送る
$channel->basic_publish($message, '', 'hello_queue');

direct

メッセージのルーティングキーに一致するバインディングキーでバインドされているキューに配信する。

<?php
// キューを宣言
$channel->queue_declare('hello_queue');

// Exchange を宣言してバインド
$channel->exchange_declare('hello_exchange', 'direct');
$channel->queue_bind('hello_queue', 'hello_exchange', 'hello_routing');

// メッセージは $exchange と $routing_key を指定して送る
$channel->basic_publish($message, 'hello_exchange', 'hello_routing');

複数のキューが同じバインディングキーでバインドしているときは、1つのメッセージが複数のキューに配信される。

fanout

ルーティングキーを無視して Exchange にバインドされているすべてのキューにメッセージを配信する。

<?php
// キューを宣言
$channel->queue_declare('hello_queue');

// Exchange を宣言してバインド
$channel->exchange_declare('hello_exchange', 'fanout');
$channel->queue_bind('hello_queue', 'hello_exchange');

// メッセージは $exchange だけ指定して送る
$channel->basic_publish($msg, 'hello_exchange');

// ルーティングキーは指定しても無視される
$channel->basic_publish($msg, 'hello_exchange', 'hello_routing');

topic

メッセージのルーティングキーにはドット区切りで複数の単語を指定する。

Exchange のバインディングキーもドット区切りだけれども *# のようなワイルドカードが使用できる(メッセージのルーティングキーにはワイルドカードは使用できない)。

  • * 任意の 1 つの単語
  • # 0 個以上の任意の単語
<?php
// キューを宣言
$channel->queue_declare('hello_queue');

// Exchange を作成してワイルドカードを含むバインディングキーでバインド
$channel->exchange_declare('hello_exchange', 'topic');
$channel->queue_bind('hello_queue', 'hello_exchange', 'aaa.*.zzz');

// ワイルドカードにマッチするルーティングキーを指定してメッセージをディスパッチ
$channel->basic_publish($msg, 'hello_exchange', 'aaa.xxx.zzz');

header

ルーティングキーを無視して、メッセージのヘッダーを元にルーティングする・・・よくわからん??

キューや Exchange やメッセージの永続化

キューや Exchange はデフォルトでは RabbitMQ を再起動すると削除されますが、queue_declareexchange_declare$durable = true を指定すると永続化され、再起動しても残るようになります。

<?php
$channel->queue_declare(
    'hello_queue',
    false, // $passive
    true,  // $durable
    false, // $exclusive
    false  // $auto_delete
);

$channel->exchange_declare(
    'hello_exchange',
    'direct',
    false, // $passive
    true,  // $durable
    false  // $auto_delete
);

なお、キューの中のメッセージも永続化する場合は、queue_declare でキューを永続化するように宣言しつつ、個々のメッセージにも次のように永続化を指定する必要があります。

<?php
$msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);

キューや Exchange の自動削除

queue_declareexchange_declare$auto_delete = true を指定すると、キューは接続しているコンシューマーがすべていなくなったときに、Exchange はバインドされているキューがすべて無くなったときに、自動的に削除されます。

キューはメッセージが残っていても削除されます。

<?php
$channel->queue_declare(
    'hello_queue',
    false, // $passive
    false, // $durable
    false, // $exclusive
    true   // $auto_delete
);

$channel->exchange_declare(
    'hello_exchange',
    'direct',
    false, // $passive
    false, // $durable
    true   // $auto_delete
);

なお、RabbitMQ を再起動すると(あたりまえですが)コンシューマーからの接続はいったんすべて切れるため、$auto_delete = true で自動削除を有効にしていると $durable = true で永続化を有効にしていても再起動で削除されます。$durable = true なら自動的に $auto_delete = false になるかと思いきやそんなことはありません。ので $durable = true なら $auto_delete = false にしないと意味が無さそうです。

排他キュー

queue_declare$exclusive = false を指定して宣言すると、その接続専用の排他的に利用できるキューが作成されます。排他キューを他の接続が consume しようとするとエラーになります。

<?php
$channel->queue_declare(
    'hello_queue',
    false, // $passive
    false, // $durable
    true,  // $exclusive
    false  // $auto_delete
);

排他キューはその接続専用なので $auto_delete の指定に関わらず接続が終われば削除されます。また、永続化されることもないし、ミラーリングが設定されていたとしてもミラーリングされません。

次のように匿名キューを用いたブロードキャストに利用できます。

<?php
// 匿名の排他キューを作成
list($queue_name) = $channel->queue_declare(
    '',
    false, // $passive
    false, // $durable
    true,  // $exclusive
    false  // $auto_delete
);

// Exchange を宣言してバインド
$channel->exchange_declare(
    'hello_broadcast',
    'fanout',
    false, // $passive
    true,  // $durable
    false  // $auto_delete
);
$channel->queue_bind($queue_name, 'hello_broadcast');

// 匿名キューを consume する
$channel->basic_consume($queue_name, '', false, false, false, false, function ($msg) {
    // ...
});

while ($channel->is_consuming()) {
    $channel->wait();
}
<?php
// メッセージをパブリッシュ
$channel->basic_publish($msg, 'hello_broadcast');

メッセージのプリフェッチ

RabbitMQ は複数のコンシューマーに対して単純にラウンドロビンでメッセージを配送します。また、コンシューマーが未ACKなメッセージを持っていたとしても構わず新しいメッセージをディスパッチするため、コンシューマーには複数の未ACKなメッセージが溜まることになります。

例えば 2 つのコンシューマーがいるとき、奇数番目のメッセージがすべて重く、偶数番目のメッセージがすべて軽い場合、重い奇数番目のメッセージが 1 つのコンシューマーに集中してしまい、もう片方のコンシューマーが空いているにも関わらず奇数番目のメッセージがなかなか処理されないことがある。

  • 1.重いメッセージ → コンシューマーA
  • 2.軽いメッセージ → コンシューマーB
  • 3.重いメッセージ → コンシューマーA ※「1.重いメッセージ」を処理中でもメッセージがディスパッチされる
  • 4.軽いメッセージ → コンシューマーB

次のように basic_qos$prefetch_count を指定すると、コンシューマーにディスパッチされている未 ACK なメッセージの上限が設定できる。1 を指定すれば、あるコンシューマーがなにかしらメッセージを処理中なら(=未ACKなメッセージがあるなら)コンシューマーへはメッセージが配信されません。

<?php
$channel->basic_qos(
    0,    // $prefetch_size
    1,    // $prefetch_count
    false // $a_global
);
  • 1.重いメッセージ → コンシューマーA
  • 2.軽いメッセージ → コンシューマーB
  • 3.重いメッセージ → コンシューマーB ※「コンシューマーA」は「1.重いメッセージ」が未ACKなので別のコンシューマーに送られる
  • 4.軽いメッセージ → ※「1.重いメッセージ」と「3.重いメッセージ」の先に終わった方に送られる

第3引数の $a_global はプリフェッチ数がどの範囲に適用されるかの指定ですが、AMQP 0-9-1 の仕様と RabbitMQ とで差があるようです。

$a_global AMQP 0-9-1 RabbitMQ
false チャンネル全体 コンシューマ個別
true 接続全体 チャンネル全体

$prefetch_size は RabbitMQ では実装されていないとのこと。

no_ack

basic_consume$no_ack = true にすると、コンシューマーにメッセージがディスパッチされた時点でメッセージがキューから削除されるため basic_ackbasic_reject は不要になります。

<?php
$channel->basic_consume(
    'hello',
    '',
    false, // $no_local
    true,  // $no_ack
    false, // $exclusive
    false, // $nowait
    function (AMQPMessage $msg) use ($channel) {
        // ...
    }
);

この方法は RabbitMQ と余分なやり取りがなくなるのでスループットが向上しますが、コンシューマーが突然死などするとメッセージが失われます。メッセージが削除されるのはコンシューマーのコールバックが呼ばれたときではなくコンシューマーにメッセージがディスパッチされたときなので、処理中の1つのメッセージが失われるだけではなく、そのコンシューマーにディスパッチしているすべてのメッセージが失われます。

なお、プリフェッチ数(basic_qos$prefetch_count)は「未ACKのメッセージ数」の上限なので、ACK不要な $no_ack = true だと効果ありません。

ところで $no_ack = true なコンシューマーを停止するときは、そのコンシューマーへのディスパッチを停止したうえでディスパッチ済のメッセージをすべて処理してから停止するのが良いと思うんですけど・・それどうすればいいんですかね?ちょっとわかりませんでした。それができないとコンシューマーの再起動時に確実にメッセージが失われそうな・・それでも構わないぐらいのケースで使うものですかね。

メッセージのTTL

メッセージの TTL はキューとメッセージの両方に指定できる。

# ポリシーで指定
rabbitmqctl set_policy my_ttl '.*' '{"message-ttl":60000}' --apply-to queues
<?php
// キュー宣言で指定
$args = new AMQPTable();
$args->set('x-message-ttl', 60000);
$channel->queue_declare(
    'hello_queue',
    false, // $passive
    true,  // $durable
    false, // $exclusive
    false, // $auto_delete
    false, // $nowait
    $args
);
<?php
// メッセージごとに指定
$msg = new AMQPMessage($data, [
    'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
    'expiration' => 60000,
]);

有効期限が切れたメッセージは後述のデッドレターに送られるかもしくは削除される。

メッセージが再ディスパッチされるとき(basic_reject$requeue = false など )、メッセージの元の有効期限は保持される。つまりメッセージがキューに入った時点で TTL を元に有効期限の日時が確定し、その後 basic_reject などで再ディスパッチされたとしても有効期限の日時が再計算されることはない。

TTL が 0 だとメッセージがパブリッシュされた後直ぐにコンシューマーに送られない限り、メッセージは期限切れになる。

キューとメッセージの両方で TTL が指定されている場合は、より小さい方が採用される。

キューのTTL

キューにも TTL を設定することができる。前述の「キューのポリシーや宣言で指定するメッセージの TTL」ではなく、キュー自体の TTL です。

キューに TTL を設定すると、コンシューマーが居ないなどで一定時間使用されていないキューを自動で削除できる。

# ポリシーで指定
rabbitmqctl set_policy my_expiry '.*' '{"expires":1800000}' --apply-to queues
<?php
// キュー宣言で指定
$args = new AMQPTable();
$args->set('x-expires', 1800000);
$channel->queue_declare(
    'hello_queue',
    false, // $passive = false,
    true,  // $durable = false,
    false, // $exclusive = false,
    false, // $auto_delete = true,
    false, // $nowait = false,
    $args
);

キューが TTL で削除されるとき、そのキューにメッセージが残っているとデッドレターに送られることもなくキューと一緒に削除される。

デッドレター

メッセージは次のいずれかでデッドレターになる。

  • basic_rejectbasic_nack$requeue = false で呼ばれたとき
  • メッセージが TTL により期限切れになったとき
  • キューの長さ制限を超えたとき
  • 他にも?

このときキューにデッドレターの宛先の Exchange が指定されていると、メッセージはその Exchange に 送られます(指定していなければメッセージは削除される)。

# ポリシーで指定
rabbitmqctl set_policy my_dead '.*' '{"dead-letter-exchange":"dead_exchange"},"dead-letter-routing-key":"dead_route"' --apply-to queues
<?php
// キュー宣言で指定
$args = new AMQPTable();
$args->set('x-dead-letter-exchange', 'dead_exchange');
$args->set('x-dead-letter-routing-key', 'dead_route');
$channel->queue_declare(
    'hello_queue',
    false, // $passive
    true,  // $durable
    false, // $exclusive
    false, // $auto_delete
    false, // $nowait
    $args
);

デッドレターメッセージのルーティング

キューに dead-letter-routing-key が設定されているならそのルーティングキーで dead-letter-exchange へ送られる。 設定されていないなら「そのメッセージがパブリッシュされたときの元のルーティングキー」で dead-letter-exchange へ送られる(ルーティングキーが空になるわけではない)。

デッドレターメッセージの書き換え

デッドレターメッセージにはそのメッセージの元の配送先のキューなどの情報がヘッダーに追加される。

<?php
$channel->basic_consume('dead_queue', '', false, true, false, false, function (AMQPMessage $msg) {
    $headers = $msg->get('application_headers');
    assert($headers instanceof AMQPTable);
    $data = $headers->getNativeData();
    var_dump(
        $data['x-first-death-exchange'],
        $data['x-first-death-queue'],
        $data['x-first-death-reason'],
        $data['x-death']
    );
});

元のメッセージに expiration が指定されていてもデッドレターメッセージでは除去される。その他のプロパティは元のメッセージのものがそのまま引き継がれる。

優先度付きキュー

キューの宣言時にそのキューがサポートする最大の優先度を x-max-priority で指定すると、そのキューは優先度付きキューとなる。

<?php
// キュー宣言で指定
$args = new AMQPTable();
$args->set('x-max-priority', 10);
$channel->queue_declare(
    'hello_queue',
    false, // $passive
    true,  // $durable
    false, // $exclusive
    false, // $auto_delete
    false, // $nowait
    $args
);

10 を指定すると 0 ~ 10 の合計 11 個の優先度が設けられることになる。x-max-priority は最大で 255 まで指定できるものの、優先度の数ごとに余分なリソースを食らうので(CPU・メモリ・ディスク)、大量のレベルは作成しないほうが良く、多くても 10 ぐらいまでが奨励される。

パブリッシャーはメッセージの priority プロパティでメッセージの優先度を指定する。大きいほうが優先度は高い。

<?php
$msg = new AMQPMessage($data, [
    'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
    'priority' => 5,
]);

優先度が指定されないメッセージは優先度 0 となる。キューの x-max-priority よりも大きい優先度がメッセージに指定されたときは最大値と同じとして扱われる。

basic_qos でメッセージのプリフェッチ数を制限していないと、コンシューマーがすべてビジーだとしてもメッセージが RabbitMQ へパブリッシュされた時点で直ちにコンシューマーへ配信されるため、優先度の意味がほとんど無くなるので注意(来たものから最速でコンシューマーにディスパッチされるため)。

キューの x-max-priority はポリシーでは定義できない。なぜなら TTL やキューの長さの制限は動的なものなのでキューの宣言後でもポリシーで変更できるが、キューの優先度の最大値はキューの宣言後に変更できないため。

HTTP API でパブリッシュ

Management plugin が有効になっていれば HTTP API でもメッセージをパブリッシュできる。

curl -u 'ore:pass' -H 'content-type:application/json' -XPOST "http://localhost:15672/api/exchanges/%2f/hello_exchange/publish" --data-raw '{
  "properties":{
    "delivery_mode": 2,
    "priority": 5
  },
  "routing_key":"hello_route",
  "payload":"hoge",
  "payload_encoding": "string"
}'

ただ、1回の接続で複数のメッセージを送る分には AMQP のバイナリプロトコルの方が圧倒的に早いし、1回パブリッシュするたびに接続し直す場合でも AMQP の方がちょっと早いぐらいなので、HTTP API でパブリッシュするメリットは特にありません。

メッセージをキューの後ろに回す

メッセージを basic_rejectbasic_nack$requeue = true を指定してメッセージが再キューイングされるとき、メッセージはキューの元の位置に入るため、キューの後ろに同じメッセージが詰まっていても同じメッセージが繰り返しディスパッチされます。

再キューイングのメッセージは後回しにして詰まっているメッセージを先に処理したいときは、次のようにメッセージを ACK した上で再パブリッシュすればできます。

<?php
$channel->basic_consume(
    'hello',
    '',
    false, // $no_local
    false, // $no_ack
    false, // $exclusive
    false, // $nowait
    function (AMQPMessage $msg) use ($channel) {
        // 同じメッセージを再作成して同じ宛先へ再パブリッシュ
        $new = new AMQPMessage($msg->body, $msg->get_properties());
        $channel->basic_publish($new, $msg->delivery_info['exchange'], $msg->delivery_info['routing_key']);
        $channel->basic_ack($msg->getDeliveryTag());
    }
);

注意すべき点として、メッセージやキューにメッセージの TTL が指定されている場合、普通に basic_reject で再キューイングされたときは一番最初にメッセージがキューに入ったときから起算して有効期限が適用されるのに対して、↑の方法だとメッセージを再作成するたびに有効期限が計算され直すため、いつまでもメッセージが有効期限切れにならないことがあります。

メッセージを遅延させる

メッセージをパブリッシュするときに指定した秒数だけコンシューマーへ配信するのを遅延させる方法。普通にはできませんが、rabbitmq-delayed-message-exchange プラグインを有効にすればできます。

次のようにインストールします。

# プラグインのインストールディレクトリを調べる
rabbitmqctl eval 'application:get_env(rabbit, plugins_dir).'
#=> {ok,"/opt/rabbitmq/plugins"}

# プラグインのインストール
curl -sL https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.8.0/rabbitmq_delayed_message_exchange-3.8.0.ez \
  -o /opt/rabbitmq/plugins/rabbitmq_delayed_message_exchange-3.8.0.ez

# プラグインの有効化
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

Exchange の typex-delayed-message を指定します。本来の typex-delayed-type 引数で指定します。

<?php
$args = new AMQPTable();
$args->set('x-delayed-type', 'direct');
$channel->exchange_declare(
    'hello',
    'x-delayed-message',
    false, // $passive = false,
    false, // $durable = false,
    true,  // $auto_delete = true,
    false, // $internal = false,
    false, // $nowait = false,
    $args
);

パブリッシャーは x-delay ヘッダーで遅延時間をミリ秒で指定します。

<?php
$msg = new AMQPMessage($data, [
    'application_headers' => new AMQPTable(['x-delay' => 1000]),
]);
$channel->basic_publish($msg, 'hello');

README によるとこのプラグインは実験的ではあるものの安定しており、README に記載の制限を理解しているならプロダクションで使っても良さそうとのことです。

メッセージを遅延させる(非プラグイン)

rabbitmq-delayed-message-exchange を使わずにやる方法。TTLとデッドレターを使えばできます。

<?php
$args = new AMQPTable();
// デッドレターの宛先として default exchange を指定する
$args->set('x-dead-letter-exchange', '');
// ルーティングキーは指定しないのでデッドレターは元のルーティングキーが維持される
//$args->set('x-dead-letter-routing-key', '');
$channel->queue_declare('delay',
    false, // $passive
    false, // $durable
    false, // $exclusive
    true,  // $auto_delete
    false, // $nowait
    $args
);

// fanout exchange でルーティングキーを無視して delay キューへ送る
$channel->exchange_declare('delay', 'fanout');
$channel->queue_bind('delay', 'delay');
<?php
// 遅延時間を expiration で指定する(ミリ秒)
$msg = new AMQPMessage($data, ['expiration' => 1000]);

// exchange に delay を、ルーティングキーに遅延して配信したいキュー名を指定
$channel->basic_publish($msg, 'delay', 'hello');

この例では default exchange と fanout exchange を用いて、シンプルに1つの遅延キュー(delay)で複数のキューへ配信できるようにしています。

もっと複雑なルーティングを利用したい場合は最終的な宛先となる Exchange ごとに遅延キューを設けるなどする必要があるかもしれません。

<?php
$args = new AMQPTable();
$args->set('x-dead-letter-exchange', 'hello');
$channel->queue_declare('hello.delay',
    false, // $passive
    false, // $durable
    false, // $exclusive
    true,  // $auto_delete
    false, // $nowait
    $args
);

$channel->exchange_declare('hello.delay', 'fanout');
$channel->queue_bind('hello.delay', 'hello.delay');
<?php
$msg = new AMQPMessage($data, ['expiration' => 1000]);
$channel->basic_publish($msg, 'hello.delay', 'aaa.bbb.ccc');