RabbitMQ のクラスタリングのメモ

RabbitMQ のクラスタリングについて下記あたりのドキュメントを読んだメモ。

なおクラスタリングとは別にインターネット経由のような遅い回線を用いたメッセージの複製にフェデレーションというのもあるけれども、そっちは DR とかの用途のようなのでスルー。

ざっくり

  • キュー以外のすべてのデータや状態(vhost/exchange/user/permission)はクラスタのすべてのノードに自動的に複製される
  • キューは単一のノードに配置することもできるし複数のノードに複製することもできる
    • クライアントが接続したノードに存在しないキューでもクライアントからは透過的に利用できる
  • リーダーノードのようなものはなくクラスタのすべてのノードは同等のピア
    • ただしキューにはマスターキューが1つだけあってその他はミラーリングされたキュー
  • rabbitmq-diagnosticsrabbitmqctl などのコマンドはどのノードで実行してもクラスタのすべてのノードを利用できる
    • Management UI も同様
    • rabbitmq-diagnostics environment などの一部のコマンドは接続したノードのみが対象になる

クラスタの形成方法

クラスタの形成はいくつか方法がありますが RabbitMQ がネイティブでサポートしている方法だけ試しました。

他にも AWS/Kubernetes/Consul/etcd などを用いたオートスケーリングに対応した方法もあります(いずれもプラグインによるサポート)。

rabbitmqctl で手動

rabbitmqctl コマンドで手動でクラスタへジョインする。

rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@mq01
rabbitmqctl start_app

設定ファイルにクラスタノードをリスト

設定ファイルにノードの一覧を記述すれば、ノードが空のデータディレクトリから起動したときに自動的にクラスタを形成することができる。

cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config
cluster_formation.classic_config.nodes.1 = rabbit@mq01
cluster_formation.classic_config.nodes.2 = rabbit@mq02
cluster_formation.classic_config.nodes.3 = rabbit@mq03

ノードが起動したとき、まずは設定ファイルに羅列されたピアへ接続を試みる。接続に成功すればそのピアのクラスタにジョインする。もしどのピアとも接続できなければ自身がクラスタの最初のノードとして起動する。

複数のノードを同時に起動すると、それら複数のノードが自身を最初のノードとして起動してしまい、クラスタが形成されない可能性がある。それを回避するために設定ファイルやDNSベースのディスカバリではブート後にピアを探しに行く前にランダムな遅延時間が入る(5〜60秒の範囲)。

DNS ベースディスカバリ

設定ファイルに A または AAAA レコードの名前を記載する。

cluster_formation.peer_discovery_backend = rabbit_peer_discovery_dns
cluster_formation.dns.hostname = discovery.local

この名前をルックアップして得られた IP アドレスのリストを逆ルックアップしてホスト名を得て、そのホスト名にプレフィックス rabbit@ を付与してノード名とする。

例えば、次のようにDNSレコードが登録されています。

mq01.local. IN A 192.0.2.101
mq02.local. IN A 192.0.2.102
mq03.local. IN A 192.0.2.103

discovery.local. IN A 192.0.2.101
discovery.local. IN A 192.0.2.102
discovery.local. IN A 192.0.2.103

.101.0.2.192.in-addr.arpa. IN PTR mq01.local.
.102.0.2.192.in-addr.arpa. IN PTR mq02.local.
.103.0.2.192.in-addr.arpa. IN PTR mq03.local.

クラスタの再起動

クラスタのノードを再起動したとき、オンラインなノードに対して30秒10回のタイムアウトでそのピアとの通信を試みる。接続できればそのピアから同期して正常に起動する。ピアが使用可能にならないなら再起動したノードはあきらめて停止する。ただし、ノードをシャットダウンしたときにオンラインな他のピアが無かった場合、そのノードは再起動時に既知のピアとの同期を施行せずに起動する。クラスタの最初の1台として起動する。

つまり、クラスタ全体を停止した場合、最後に停止したノードだけが単体で正常に起動させることができて、その他のノードは起動後に300秒以内にクラスタのいずれかのピアと接続できなければならない。

もしクラスタの最後にシャットダウンしたノードが起動できないときは(例えば停電などですべてのノードが同時に停止するとすべてのノードが自分が最後だと思わないので、最後にシャットダウンしたノードが存在しなくなる)、別のノードで rabbitmqctl force_boot してから RabbitMQ を開始すれば他のピアとの同期を施行せずに起動させることができる。

systemctl stop rabbitmq-server
rabbitmqctl force_boot
systemctl start rabbitmq-server

「30秒10回」の期間は下記の設定で変更できる。

mnesia_table_loading_retry_timeout = 30000
mnesia_table_loading_retry_limit = 10

ノードのリセット後のクラスタへの再ジョイン

ノードがオフラインになっている間にノード名が変更されたり rabbitmqctl reset などでデータディレクトリがリセットされると、ノードを開始しても以前のクラスタには再ジョインできなくなる。 そのようなノードは rabbitmqctl forget_cluster_node <node> でいったんクラスタから削除した上で、クラスタへ再ジョインする必要がある。

# 既存のノードで
rabbitmqctl forget_cluster_node rabbit@mq03

# リセットしたノードで
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@mq01
rabbitmqctl start_app

キューのミラーリング

キューのミラーリングは1つのマスターと複数のミラーで構成されており、キューに対する操作はまずマスターに適用された後にミラーに伝達される(FIFOを保証するため)。

ミラーリングはポリシーを用いて有効化する。ミラーリングには複数のモードがあって ha-mode ポリシーキーで指定する。

exactly モードはキューのレプリカ数(マスターとミラーの合計数)を ha-params で指定する。ha-params: 1 はマスターのみでミラー無しを意味する。キューのマスターが故障すると自動的に他のミラーがマスターに昇格する。クラスタのノードがレプリカ数よりも少ない場合、すべてのノードにミラーされる。クラスタのノードがレプリカ数よりも多い場合、ミラーのいずれかのノードが故障すると別のノードが新たにミラーになる。

all モードはすべてのノードにミラーする。新しいノードが追加されるとそのノードにもミラーリングされる。

nodes モード は ha-params で指定したノードのリストにミラーされる。指定したノードのいずれかが実際のクラスタのメンバーに無くてもエラーにはならず、キューが宣言されたときにどのノードもクラスタに存在しなければ、クライアントが接続されているノードにキューが作成される。

レプリカ数はどれぐらい?

all モードですべてのノードにミラーすると、ネットワークI/O、ディスクI/O、ディスク領域、などでノードに負荷が大きいので、クラスタノードのクオラム数(いわゆる過半数)をレプリカ数にするのがおすすめ。

もしくは、要件によってはキューのメッセージは一過性のものだったり時間にセンシティブだったりするので、一部のキューでは少数のミラーやそもそもミラーリングしないようにすることもありうる。

キューマスターの位置

キューマスターが配置されるノードは下記のいずれかで指定できる。上にあるものほど優先される。

  • キュー宣言時の x-queue-master-locator
  • ポリシーキー queue-master-locator
  • 設定ファイルの queue_master_locator

以下のいずれかが指定できる。

  • min-masters
    • マスターキューが最も少ないノード
  • client-local
    • キューを宣言するクライアントが接続したノード
  • random
    • ランダム

設定ファイルの queue_master_locator のデフォルトが client-local なのでなにも指定しなければ client-local になります。なので最初に接続したプロセスが必要なキューを全部一括で宣言すると、そのプロセスが接続したノードにマスターキューが偏ります。

マスターキューの障害

ミラーリングされたキューからメッセージを取得しているコンシューマがいるとき、そのミラーのマスターが配置されているノードの障害によってキューがフェイルオーバーしても、コンシューマーにはなにも通知されずに自動的に新しいマスターから透過的にメッセージが配信されるようになります。

もちろん、コンシューマーが接続しているノード自身で停止するとその時点で接続が切れます。コンシューマーが接続しているノードとは別のノードにマスターがあって、そのノードが障害でとまったときの話です。

このとき、マスターキューがフェイルオーバーするとどのメッセージがコンシューマーに送信済かわからなくなるため、フェイルオーバー後にすべての ACK されていないメッセージが redelivered フラグを付けて再配信されます。そのためコンシューマーは同じメッセージを再度受信する可能性があります。

basic_consumex-cancel-on-ha-failover: true を指定すると、マスターキューのノードが停止してフェイルオーバーしたときに、キューの basic_consume がキャンセルされてコンシューマーに通知されるため、それを以てコンシューマーはフェイルオーバーしたことを知ることができる。

php-amqplib ならコンシューマーのコードは次のようになります。

$args = new AMQPTable();
$args->set('x-cancel-on-ha-failover', true);
$channel->basic_consume(
    'hello_queue',
    '',
    false, // $no_local = false,
    false, // $no_ack = false,
    false, // $exclusive = false,
    false, // $nowait = false,
    function (AMQPMessage $msg) use ($channel) {
        // ...
    },
    null,
    $args
);

次のような例外として通知されます。

PhpAmqpLib\Exception\AMQPBasicCancelException: Channel was canceled

ポリシーの変更による再構成

キューのポリシーが変更されたとき、なるべく既存のミラーを維持するように構成が自動で変更される。

nodes モードでポリシーを変更したとき、新しい ha-params のリストに以前のマスターが存在しない場合、ミラーのいずれかの同期が完了するのを待ってから既存のマスターキューが削除され、同期済のミラーが新しいマスターになる。

新しいノードのミラーへの同期

クラスタに新しいノードが追加されてそのノードにミラーが追加されたとき、デフォルトの動作ではミラーのキューは空となり既存のメッセージはそのミラーには存在しない。それ以降に新しくパブリッシュされたメッセージのみがそのミラーには含まれる。マスターキューの既存のメッセージがすべて ACK されて削除されると、ミラーキューは完全に同期された状態になる。

rabbitmqctl sync_queue $queue_name などで手動で同期することもできるが、同期中はキューが応答しなくなる。

ポリシーキー ha-sync-mode で新しいミラーが追加されたときにマスターキューにある既存のメッセージを同期するかどうか指定できる。

  • ha-sync-mode: manual
    • 新しいミラーは既存のメッセージは同期せずに新しくパブリッシュされたメッセージのみを受信する
    • これがデフォルト
  • ha-sync-mode: automatic
    • 新しいミラーには既存のメッセージが同期される
    • キューの同期中はキューへのアクセスがブロックされる
    • キューが十分小さく収まるならこれでも良いこともある

完全には同期されていないノードをマスターの停止や障害時のフェイルオーバー先にするかどうかを、以下のポリシーキーで制御できる。

  • ha-promote-on-shutdown
    • マスターキューのノードをシャットダウンするとき
    • when-synced がデフォルト
  • ha-promote-on-failure
    • マスターキューのノードが障害などで利用不可能になったとき
    • always がデフォルト

指定できる値は次の通り。

  • when-synced
    • 完全に同期されているノードのみをフェイルオーバー先に選択する
    • 完全に同期しているノードがなければマスターキューのノードを復帰させるまでそのキューは利用できなくなる
    • もしマスターキューのノードが復帰不可能だとキューを再作成するしかなくなる
  • always
    • 完全に同期されていないノードでもフェイルオーバー先にする

バッチ同期

ミラーリングキューの同期は最大で ha-sync-batch-size で指定した数のメッセージがバッチで送信される。

ノード間の通信が平均のメッセージサイズから求められるバッチ処理の最大サイズのトラフィックに耐えられるか確認しておく必要がある。もしそのトラフィックが net_ticktime よりも長い時間を要するのであればそれが原因でネットワーク分断になってしまうかもしれない。

ポリシーでキューにミラーリングを設定。

rabbitmqctl set_policy ha '^(?!amq\.).*' '{"ha-mode":"exactly","ha-params":2,"queue-master-locator":"min-masters"}' --apply-to queues

正規表現で amq. から始まるキューを除外しています。キューの宣言時に名前を指定せずに匿名キューになったといに amq. から始まる名前に付けられるよです。つまりこのように指定すれば匿名キューはミラーリングから除外されます。がしかし・・そもそも匿名キューは排他キューとしてしか使うことはなさそうなので、もともとミラーリングされることは無いわけで、正規表現は ^ とかですべてのキューを対象にするので良いような気もします。

キューのミラーリング関係の情報を表示(ポリシー、マスターノード、ミラーノード)

rabbitmqctl list_queues name policy pid slave_pids -q
# name            policy  pid                     slave_pids
# hello_queue     ha      <rabbit@mq01.1.2712.0>  [<rabbit@mq02.3.1372.0>]
# hoge_queue      ha      <rabbit@mq01.1.2725.0>  [<rabbit@mq03.2.1134.0>]

ハートビート

RabbitMQ では、RabbitMQ のノード間の通信と、RabbitMQ とクライアント接続の、2種類のハートビート交換が行われている。

ノード間接続(Net Tick Time)

クラスタのノードのすべてのピア間で定期的に Tick message の交換が行われる。net_ticktime 設定値の 1/4 の頻度で交換され、net_ticktime の期間ずっと通信がなければそのピアはダウンしたとみなされる。

net_ticktime の値は advanced.config で次のように指定できる。デフォルトは 60 です。

[
  {kernel, [{net_ticktime,  120}]}
].

クライアント接続のハートビート

RabbitMQ とクライアントとの間でもハートビートが交換されており、タイムアウト時間は heartbeat で指定可能でデフォルトは 60 秒。タイムアウト時間の 1/2 の秒数の頻度でハートビートの交換は行われる。

ネットワーク分断

クラスタのノード間の接続が失われてからその接続が復活したとき、双方が互いに相手方がダウンしていたと判断しているとネットワーク分断によるスプリットブレインになる。

ネットワーク分断が発生しているかは rabbitmqctl cluster_status コマンドで partitions が空かどうかで判断できる。分断が発生していないなら空です。

正常な状態

rabbitmqctl cluster_status
#=> Cluster status of node rabbit@mq01 ...
#=> [{nodes,[{disc,[rabbit@mq01,rabbit@mq02,rabbit@mq03]}]},
#=>  {running_nodes,[rabbit@mq03,rabbit@mq02,rabbit@mq01]},
#=>  {cluster_name,<<"rabbit@mq01">>},
#=>  {partitions,[]},
#=>  {alarms,[{rabbit@mq03,[]},{rabbit@mq02,[]},{rabbit@mq01,[]}]}]

cluster partition handling

設定 cluster_partition_handling でネットワーク分断発生時にどうするかが指定できる。

  • ignore
  • pause-minority
  • pause-if-all-down
  • autoheal

ignore ならネットワーク分断が発生してもそれぞれのパーティションは独立して稼働し、他方のパーティションのノードがクラッシュしたと判斷する。ミラーリングキューはそれぞれのパーティションにマスターが配置され、両方が独立して動作する。ネットワーク接続が回復した後も分断はそのままで、パーティションごとに独立して動作する。

pause-minority は、ネットワーク分断が発生した時点でクラスタ全体のノード数の過半数に満たないパーティションは自動的に一時停止し、接続が回復した時点で自動的に開始する。これはスプリットブレインは発生しないので矛盾なくネットワーク分断から回復できる。

pause_if_all_down は追加のパラメータで複数のノードを指定し、ネットワーク分断が発生したときは指定された複数のノードのいずれにも到達できないノードを自動的に一時停止する。例えば2つのラックに2台ずつの合計4台のクラスタで、ラック間の通信に障害が発生したときに、どちらのラックのノードを優先するかを設定したりできる。pause-minority とは異なり、リストされた複数のノードが分断されると接続が回復したときにスプリットブレインになる。追加のパラメータ cluster_partition_handling.pause_if_all_down.recover でこのときにどうすうるかを指定できる(ignore または autoheal)。

autoheal は、ネットワーク分断が発生した時点では ignore と同様になにも起こらず、接続が回復した時点で下記の方法でパーティションをいずれか 1 つ選択し、それ以外のパーティションのノードを再起動する。

  • 最も多くのクライアントが接続されているパーティション
  • ↑が引き分けなら、ノードが最も多いパーティション
  • ↑が引き分けなら、ランダムに 1 つのパーティションを選択

どのモードを選ぶべき?

  • ignore
    • ネットワークはとても信頼できる
    • すべてのノードがラックにあって、スイッチで接続されていて、そのスイッチは外部へのルートでもある
    • クラスタの一部で障害が発生したときにクラスタ全体がシャットダウンするリスクを負いたくない
    • 2ノードクラスタである
  • pause-minority
    • ネットワークの信頼性が高くない
    • 複数のデータセンタにまたがってクラスタ化されていて一度に1つのデータセンタのみに障害が発生すると想定している
      • これらのデータセンタにはEC2のアベイラビリティーゾーンのように相互に直接かつ信頼性の高い接続が必要
  • autoheal
    • ネットワークが信頼できない可能性がある
    • データの整合性よりもサービスの継続性を重視する
    • 2ノードクラスタである

スプリットブレインからの復旧

ネットワーク分断が発生した後に接続が回復し、それぞれのパーティションが独立に動作するスプリットブレインになった場合、分断したパーティションから一番信頼できるものを選択し、それ以外のパーティションのノードをすべて再起動すると復旧できる。

参考