Grafana Loki を素振りしたメモ

[Loki]Grafana Loki を素振りしたメモ

Grafana Lokiちょっと前に v1.0.0 がリリースされて GA になっていた ので素振りしました。

試したバージョンは v1.2.0 です。Kubernetes とかはよくわからないのでスルーしてます。

概要

Grafana Loki は Grafana Labs で作られているログストレージ&クエリエンジンです。単体だとログを貯めるしかできませんが Grafana でデータソースとして指定して貯めたログを閲覧できます。

Loki へログを送るには Promtail というエージェントを使います(他にもいくつか公式でサポートされている方法はあります)。

似たようなプロダクトでは「EFKスタック」として知られる ElasticSearch/Fluentd/Kibana が有名です。

Grafana と Loki

Grafana と Loki を Docker でサクッと実行します。

version: '3.7'

services:
  grafana:
    container_name: grafana
    image: grafana/grafana
    ports:
      - '3000:3000'

  loki:
    container_name: loki
    image: grafana/loki
    ports:
      - '3100:3100'

Promtail

Promtail を適当な Apache が動いている Linux サーバにインストールして実行します。下記から最新版の URL を調べてきて、

https://github.com/grafana/loki/releases

適当なディレクトリにダウンロードします。

curl -O -L https://github.com/grafana/loki/releases/download/v1.2.0/promtail-linux-amd64.zip
unzip promtail-linux-amd64.zip
chmod +x promtail-linux-amd64

Promtail の設定ファイル promtail.yml を作成します。

server:
  # 9080 ポートで HTTP でリッスンする
  # ブラウザで開くとログのディスカバリの状態が見れます
  http_listen_port: 9080

  # grpc のリッスンポート、0 ならランダムに決定される・・・これなにに使っているの?
  grpc_listen_port: 0

clients:
  # Loki の URL を指定
  - url: http://localhost:3100/loki/api/v1/push

# ログのディスカバリの設定
scrape_configs:

  # Apache のアクセスログを収集
  - job_name: access_log
    static_configs:
      - targets:
          # targets は localhost または自ホスト名のどちらかを指定する必要がある
          # とドキュメントに書いてた気がするけど何でも通る? というかこれ指定できる意味あるの?
          - localhost
        labels:
          # __path__ ラベルでファイル名を指定する、ワイルドカードも指定可能
          __path__: /var/log/httpd/access_log

設定ファイルを指定して開始します。

./promtail-linux-amd64 -config.file ./promtail.yml

docker-compose で立ち上げていた Grafana にログインしてデータソースに Loki を http://loki:3100 のような URL で追加します。 上手く追加できれば、Explore の画面から Aapche のアクセスログが閲覧できます。

Apache のログをラベル付け

先程の設定ではログに filename ラベルで /var/log/httpd/access_log という値が付与されます。static_configs でログファイルを収集すればこのラベルは自動的に付与されます。__path__ にはワイルドカードが指定できますが filename は実際のファイル名になるので、複数のログにマッチしてもこのラベルで区別できます。

pipeline_stages でログの内容に基づいてもっと細かな制御ができます。

scrape_configs:
  - job_name: access_log
    static_configs:
      - targets:
          - localhost
        labels:
          __path__: /var/log/httpd/access_log
    pipeline_stages:
      - regex:
          # 正規表現で名前付きキャプチャ
          expression: |-
            ^(?P<addr>\S+)\s+\S+\s+\S+\s+\[(?P<time>.*?)\]\s+"(?P<method>\S+)\s+(?P<path>\S+)\s+\S+"\s+(?P<status>\S+)\s+
      - labels:
          # ↑でキャプチャした名前・値でラベルを付与
          addr:
          method:
          path:
          status:
      - timestamp:
          # ↑でキャプチャした値でログのタイムスタンプを上書き、デフォはログが収集された時間です
          source: time
          # golang での日時のフォーマット指定
          format: '02/Jan/2006:15:04:05 -0700'

この例では regex で正規表現で名前付きキャプチャした値を元にラベルを付与し、さらにログに出力されている日時からタイムスタンプを抽出しています。

次のように Apache のログをステータスコードやパスでフィルタして見たりできます。

f:id:ngyuki:20200103231223p:plain

pipeline_stages では他にもいろいろ利用可能です。詳細は下記にまとまっています。

https://github.com/grafana/loki/blob/v1.2.0/docs/clients/promtail/pipelines.md

systemd journal ログを収集

Promtail は systemd の journal ログも収集できます。scrape_configsjournal で指定します。

scrape_configs:
  # systemd の journal からログを収集
  - job_name: systemd
    journal:
      # Promtail の起動時にどこまで過去のログを読むかを指定
      max_age: '1h'
      # journal ログのディレクトリ
      path: /run/log/journal
    relabel_configs:
      # session-12345.scope のようなユニットは除外
      - source_labels: [__journal__systemd_unit]
        regex: ^session-\d+.scope$
        action: drop
      # ユニット名でラベルを付ける
      - source_labels: [__journal__systemd_unit]
        target_label: systemd

収集したログには __journal__systemd_unit のようなラベルが付与されます。先頭が __ のラベルは Loki へ送信する際に除去されるため、これらのラベルはそのままではラベルとして残りません。

↑の例では relabel_configs で、ラベルの値が特定のパターンに一致するログを除外したり、ユニット名も Loki へ送信するためにラベルを付け直したりしています。

Prometheus でも relabel_configs という設定がありますが、だいたい同じように使えます。

次のように、systemd のユニットごとのログが見れたりします。

f:id:ngyuki:20200103231252p:plain

なお、下記によるとラベルは元の journal のエントリのフィールド名を小文字化したものにプレフィックス __journal_ を付与して付けられるようです。

https://github.com/grafana/loki/blob/v1.2.0/pkg/promtail/targets/journaltarget.go#L307

どのようなフィールドがあるかは journalctl -o verbose とか journalctl -o json-pretty とかで見られます。

syslog

Promtail は syslog サーバとしてリッスンして syslog プロトコルでログを受信することもできます・・・のですが、最新の v1.2.0 だと未サポートで master のバージョンじゃないとまだ syslog は使えませんでした。きっと次のバージョンでは syslog も使えるようになります。

metrics stage

pipeline_stagesmetrics stage を使うと、ログを Loki に送信するのではなく、パターンにマッチしたログの出現回数をカウントし、Prometheus から参照可能な形式で /metrics で公開できます。

scrape_configs:
  - job_name: access_log
    static_configs:
      - targets:
          - localhost
        labels:
          __path__: /var/log/httpd/access_log
    pipeline_stages:
      - regex:
          expression: |-
            ^(?P<addr>\S+)\s+\S+\s+\S+\s+\[(?P<time>.*?)\]\s+"(?P<method>\S+)\s+(?P<path>\S+)\s+\S+"\s+(?P<status>\S+)\s+
      - timestamp:
          source: time
          format: '02/Jan/2006:15:04:05 -0700'
      - metrics:
          # log_lines_total というメトリクス名で公開
          log_lines_total:
            # カウンターとして公開、他にも Gauge や Histogram が指定可能
            type: Counter
            description: total number of log lines
            # パイプラインで time というフィールドがある時だけ処理する
            source: time
            config:
              # メトリクス値をインクリメントする
              # Counter なら inc 以外に add も指定できる(source の値で加算される)
              action: inc

この設定で、下記のようなメトリクスが Promtail の HTTP エンドポイントの /metrics で公開されます。

# HELP promtail_custom_log_lines_total total number of log lines
# TYPE promtail_custom_log_lines_total counter
promtail_custom_log_lines_total{filename="/var/log/httpd/access_log"} 11

例えば、ステータスコードごとの統計などをメトリクスとして Prometheus へ入れたりできます。Loki でもログのクエリで似たような集計はできると思いますが、この方法の方が圧倒的に負荷は小さそうですね。

relabel_configs と pipeline_stages

ログに対して正規表現などでアレコレする方法として relabel_configspipeline_stages の2種類ありますが次のような違いがあります。

relabel_configs はログにラベルを条件にラベルやラベルの値を置換したりラベルを付け外ししたりログをドロップしたりできます。要するにラベルにたいして作用し(ドロップもまあ全部のラベルを引っ剥がすと思えば)、ログの内容には手を出しません。

pipeline_stages はログの内容を条件にいろいろできます(ログの内容を変更したり、ラベルを変更したり、タイムスタンプを変更したり)。また、match でラベルも条件にできます。

ログを HTTP で送信

Loki へログを HTTP で直接登録できます。通常であれば Protobuf を使うようなのですが Content-Type: application/json を指定すれば JSON でも遅れます。

curl -H 'Content-Type: application/json' -XPOST -s 'http://localhost:3100/loki/api/v1/push' --data-raw '{
  "streams": [
    {
      "stream": {
        "ore": "are"
      },
      "values": [
          [ "1577433513448137047" , "this is log" ]
      ]
    }
  ]
}'

"stream" の部分でラベルを、"values" のとこでナノ秒単位のタイムスタンプとログのメッセージを指定します。

さいごに

ElasticSearch と比べると Loki はログのラベルのみにインデックスを付けるため軽量で運用が簡単とのことです。ただし、ログのテキストの内容に基づいたクエリは検索範囲内のすべてのログをロードする必要があるためその種のクエリは重くなります。

Promtail は fluentd と比べるとできることが少なそうですが、ワンバイナリで低依存でインストールできるのでとりあえず入れる分には気が楽そうです(というか fluentd 多機能すぎる・・Ruby がバコーンと入るのもどうかと、比べるなら fluentbit の方かな?)。

強いて言えば、Promtail の pipeline_stages のデバッグが辛いです。fluentd なら宛先を stdout にすることで試行錯誤もやりやすかったと思うんですけど、Promtail でもデバッグログを有効にすればパイプラインの途中経過が見られるようなのですが見難すぎる・・pipeline_stages の途中にデバッグステージみたいなのを挟んでピンポイントでログのメッセージ、ラベル、キャプチャなどを標準出力に出したりできないものですかね。

ただ、どうせ Prometheus のために Grafana を立てているならとりあえず promtail も入れておいて pipeline_stages で凝ったことはせずに timestamp だけログの内容から抽出するように設定するぐらいで始めるのでも良いかも。

systemd のテンプレートユニットで同じプロセスを複数立ち上げる

systemd で同じプロセスを複数立ち上げる方法。Supervisord なら numprocs を指定するだけですけど systemd には相当する設定項目がありません。

ので、テンプレートユニットを使って、1つのユニットファイルを元に複数のプロセスを立ち上げます。

worker@.service

[Unit]
After=network.target

[Service]
ExecStart=/path/to/ore-no-worker
Restart=always

[Install]
WantedBy=multi-user.target

ただ、これだとプロセスの数だけ複数回コマンドを打つ必要があって煩雑ですね。

systemctl start worker@1.service && systemctl enable worker@1.service
systemctl start worker@2.service && systemctl enable worker@2.service
systemctl start worker@3.service && systemctl enable worker@3.service
systemctl start worker@4.service && systemctl enable worker@4.service
systemctl start worker@5.service && systemctl enable worker@5.service
systemctl start worker@6.service && systemctl enable worker@6.service
systemctl start worker@7.service && systemctl enable worker@7.service
systemctl start worker@8.service && systemctl enable worker@8.service

次のようにターゲットユニットを使えばスッキリします。

worker@.service

[Unit]
After=network.target
PartOf=worker.target

[Service]
ExecStart=/path/to/ore-no-worker
Restart=always

[Install]

worker.target

[Unit]
Wants=worker@1.service
Wants=worker@2.service
Wants=worker@3.service
Wants=worker@4.service
Wants=worker@5.service
Wants=worker@6.service
Wants=worker@7.service
Wants=worker@8.service

[Install]
WantedBy=multi-user.target

次のようにまとめて管理できます。

# 開始
systemctl start worker.target

# リスタート
systemctl restart worker.target

# 停止
systemctl stop worker.target

# 自動起動
systemctl enable worker.target

プロセス数が変わったとき(worker.targetWants が増減したとき)、systemctl restart だけだと反映されないので停止→開始する必要があります。

systemctl daemon-reload
systemctl stop worker.target
systemctl start worker.target

systemd で Requires/Wants/BindsTo/PartOf を設定したサービスの開始や停止時の伝播のマトリクス

改めて man を読んでもよくわからなかったので表に整理したメモ。

ユニット A のファイルで Requires B のように指定したときの動作。

操作 Requires Wants BindsTo PartOf
start A start B start B start B -
stop A - - - -
restart A - - - -
exit A - - - -
failed A - - - -
start B - - - -
stop B stop A - stop A stop A
restart B restart A - restart A restart A
exit B - - stop A -
failed B - - stop A -

操作 の内容はそれぞれ次の通り。

  • start
    • systemctl start で開始
  • stop
    • systemctl stop で停止
  • restart
    • systemctl restart でリスタート
  • exit
    • サービスが正常開始後に終了コード 0 で停止
  • failed
    • サービスが正常開始後に終了コード 1 で停止

あと、ユニット A で After B を指定してて start A により開始した B が failed になったとき、Requires や BindsTo の場合は A の開始がブロックされる。これは Requires や Wants の違いとしてよく知られているものだけど↑の表にどう書くと良いか思いつかなかったので書いてません。

RabbitMQ を用いた開発のためのメモ

RabbitMQ についてどちらかといえば開発側寄りの視点で下記あたりのドキュメントを読んだメモ。

https://www.rabbitmq.com/documentation.html

メッセージの ack と reject

コンシューマーがメッセージを受信するとき、最終的に basic_ackbasic_reject で応答することでメッセージはキューから削除されます。

<?php
$channel->basic_consume(
    'hello',
    '',
    false, // $no_local
    false, // $no_ack
    false, // $exclusive
    false, // $nowait
    function (AMQPMessage $msg) use ($channel) {
        if (/* some */) {
            // 肯定応答
            $channel->basic_ack($msg->getDeliveryTag());
        } elseif (/* some */) {
            // 否定応答
            $channel->basic_reject($msg->getDeliveryTag(), false);
        }
    }
);

basic_ack だと単にメッセージは削除されます。

basic_reject は、第2引数($requeue)が false だと、もし設定されていればデッドレターへ、設定されていなければメッセージは削除されます。

basic_reject で、第2引数($requeue)が true ならメッセージは再ディスパッチされます。このとき、メッセージの redelivered プロパティが true になります。

<?php
$channel->basic_consume(
    'hello',
    '',
    false, // $no_local
    false, // $no_ack
    false, // $exclusive
    false, // $nowait
    function (AMQPMessage $msg) use ($channel) {
        if ($msg->get('redelivered')) { // 再ディスパッチなら true
            $channel->basic_ack($msg->getDeliveryTag());
        } else {
            // basic_reject で再ディスパッチ
            $channel->basic_reject($msg->getDeliveryTag(), true);
        }
    }
);

次のような状況でメッセージは再ディスパッチされます(redelivered プロパティが設定される)。

  • メッセージを basic_rejectbasic_nack$requeue = false で応答したとき
  • メッセージを basic_ackbasic_rejectbasic_nack せずにコンシューマーの接続が切れてチャンネルが閉じられたとき
  • マスターキューのノードが停止して RabbitMQ どのコンシューマーにメッセージを投げているかわからなくなったとき
  • 他にも?

Exchange の type

queue_declare で Exchange を宣言するときに指定する $type について。

default

事前に宣言されている空の名前の Exchange で、キューの作成時にキューと同じ名前のルーティングキーで自動的にバインドされる。

<?php
// キューを宣言
$channel->queue_declare('hello_queue');

// Exchange を明示的に宣言せずに $exchange は空で $routing_key にキューの名前を指定して送る
$channel->basic_publish($message, '', 'hello_queue');

direct

メッセージのルーティングキーに一致するバインディングキーでバインドされているキューに配信する。

<?php
// キューを宣言
$channel->queue_declare('hello_queue');

// Exchange を宣言してバインド
$channel->exchange_declare('hello_exchange', 'direct');
$channel->queue_bind('hello_queue', 'hello_exchange', 'hello_routing');

// メッセージは $exchange と $routing_key を指定して送る
$channel->basic_publish($message, 'hello_exchange', 'hello_routing');

複数のキューが同じバインディングキーでバインドしているときは、1つのメッセージが複数のキューに配信される。

fanout

ルーティングキーを無視して Exchange にバインドされているすべてのキューにメッセージを配信する。

<?php
// キューを宣言
$channel->queue_declare('hello_queue');

// Exchange を宣言してバインド
$channel->exchange_declare('hello_exchange', 'fanout');
$channel->queue_bind('hello_queue', 'hello_exchange');

// メッセージは $exchange だけ指定して送る
$channel->basic_publish($msg, 'hello_exchange');

// ルーティングキーは指定しても無視される
$channel->basic_publish($msg, 'hello_exchange', 'hello_routing');

topic

メッセージのルーティングキーにはドット区切りで複数の単語を指定する。

Exchange のバインディングキーもドット区切りだけれども *# のようなワイルドカードが使用できる(メッセージのルーティングキーにはワイルドカードは使用できない)。

  • * 任意の 1 つの単語
  • # 0 個以上の任意の単語
<?php
// キューを宣言
$channel->queue_declare('hello_queue');

// Exchange を作成してワイルドカードを含むバインディングキーでバインド
$channel->exchange_declare('hello_exchange', 'topic');
$channel->queue_bind('hello_queue', 'hello_exchange', 'aaa.*.zzz');

// ワイルドカードにマッチするルーティングキーを指定してメッセージをディスパッチ
$channel->basic_publish($msg, 'hello_exchange', 'aaa.xxx.zzz');

header

ルーティングキーを無視して、メッセージのヘッダーを元にルーティングする・・・よくわからん??

キューや Exchange やメッセージの永続化

キューや Exchange はデフォルトでは RabbitMQ を再起動すると削除されますが、queue_declareexchange_declare$durable = true を指定すると永続化され、再起動しても残るようになります。

<?php
$channel->queue_declare(
    'hello_queue',
    false, // $passive
    true,  // $durable
    false, // $exclusive
    false  // $auto_delete
);

$channel->exchange_declare(
    'hello_exchange',
    'direct',
    false, // $passive
    true,  // $durable
    false  // $auto_delete
);

なお、キューの中のメッセージも永続化する場合は、queue_declare でキューを永続化するように宣言しつつ、個々のメッセージにも次のように永続化を指定する必要があります。

<?php
$msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);

キューや Exchange の自動削除

queue_declareexchange_declare$auto_delete = true を指定すると、キューは接続しているコンシューマーがすべていなくなったときに、Exchange はバインドされているキューがすべて無くなったときに、自動的に削除されます。

キューはメッセージが残っていても削除されます。

<?php
$channel->queue_declare(
    'hello_queue',
    false, // $passive
    false, // $durable
    false, // $exclusive
    true   // $auto_delete
);

$channel->exchange_declare(
    'hello_exchange',
    'direct',
    false, // $passive
    false, // $durable
    true   // $auto_delete
);

なお、RabbitMQ を再起動すると(あたりまえですが)コンシューマーからの接続はいったんすべて切れるため、$auto_delete = true で自動削除を有効にしていると $durable = true で永続化を有効にしていても再起動で削除されます。$durable = true なら自動的に $auto_delete = false になるかと思いきやそんなことはありません。ので $durable = true なら $auto_delete = false にしないと意味が無さそうです。

排他キュー

queue_declare$exclusive = false を指定して宣言すると、その接続専用の排他的に利用できるキューが作成されます。排他キューを他の接続が consume しようとするとエラーになります。

<?php
$channel->queue_declare(
    'hello_queue',
    false, // $passive
    false, // $durable
    true,  // $exclusive
    false  // $auto_delete
);

排他キューはその接続専用なので $auto_delete の指定に関わらず接続が終われば削除されます。また、永続化されることもないし、ミラーリングが設定されていたとしてもミラーリングされません。

次のように匿名キューを用いたブロードキャストに利用できます。

<?php
// 匿名の排他キューを作成
list($queue_name) = $channel->queue_declare(
    '',
    false, // $passive
    false, // $durable
    true,  // $exclusive
    false  // $auto_delete
);

// Exchange を宣言してバインド
$channel->exchange_declare(
    'hello_broadcast',
    'fanout',
    false, // $passive
    true,  // $durable
    false  // $auto_delete
);
$channel->queue_bind($queue_name, 'hello_broadcast');

// 匿名キューを consume する
$channel->basic_consume($queue_name, '', false, false, false, false, function ($msg) {
    // ...
});

while ($channel->is_consuming()) {
    $channel->wait();
}
<?php
// メッセージをパブリッシュ
$channel->basic_publish($msg, 'hello_broadcast');

メッセージのプリフェッチ

RabbitMQ は複数のコンシューマーに対して単純にラウンドロビンでメッセージを配送します。また、コンシューマーが未ACKなメッセージを持っていたとしても構わず新しいメッセージをディスパッチするため、コンシューマーには複数の未ACKなメッセージが溜まることになります。

例えば 2 つのコンシューマーがいるとき、奇数番目のメッセージがすべて重く、偶数番目のメッセージがすべて軽い場合、重い奇数番目のメッセージが 1 つのコンシューマーに集中してしまい、もう片方のコンシューマーが空いているにも関わらず奇数番目のメッセージがなかなか処理されないことがある。

  • 1.重いメッセージ → コンシューマーA
  • 2.軽いメッセージ → コンシューマーB
  • 3.重いメッセージ → コンシューマーA ※「1.重いメッセージ」を処理中でもメッセージがディスパッチされる
  • 4.軽いメッセージ → コンシューマーB

次のように basic_qos$prefetch_count を指定すると、コンシューマーにディスパッチされている未 ACK なメッセージの上限が設定できる。1 を指定すれば、あるコンシューマーがなにかしらメッセージを処理中なら(=未ACKなメッセージがあるなら)コンシューマーへはメッセージが配信されません。

<?php
$channel->basic_qos(
    0,    // $prefetch_size
    1,    // $prefetch_count
    false // $a_global
);
  • 1.重いメッセージ → コンシューマーA
  • 2.軽いメッセージ → コンシューマーB
  • 3.重いメッセージ → コンシューマーB ※「コンシューマーA」は「1.重いメッセージ」が未ACKなので別のコンシューマーに送られる
  • 4.軽いメッセージ → ※「1.重いメッセージ」と「3.重いメッセージ」の先に終わった方に送られる

第3引数の $a_global はプリフェッチ数がどの範囲に適用されるかの指定ですが、AMQP 0-9-1 の仕様と RabbitMQ とで差があるようです。

$a_global AMQP 0-9-1 RabbitMQ
false チャンネル全体 コンシューマ個別
true 接続全体 チャンネル全体

$prefetch_size は RabbitMQ では実装されていないとのこと。

no_ack

basic_consume$no_ack = true にすると、コンシューマーにメッセージがディスパッチされた時点でメッセージがキューから削除されるため basic_ackbasic_reject は不要になります。

<?php
$channel->basic_consume(
    'hello',
    '',
    false, // $no_local
    true,  // $no_ack
    false, // $exclusive
    false, // $nowait
    function (AMQPMessage $msg) use ($channel) {
        // ...
    }
);

この方法は RabbitMQ と余分なやり取りがなくなるのでスループットが向上しますが、コンシューマーが突然死などするとメッセージが失われます。メッセージが削除されるのはコンシューマーのコールバックが呼ばれたときではなくコンシューマーにメッセージがディスパッチされたときなので、処理中の1つのメッセージが失われるだけではなく、そのコンシューマーにディスパッチしているすべてのメッセージが失われます。

なお、プリフェッチ数(basic_qos$prefetch_count)は「未ACKのメッセージ数」の上限なので、ACK不要な $no_ack = true だと効果ありません。

ところで $no_ack = true なコンシューマーを停止するときは、そのコンシューマーへのディスパッチを停止したうえでディスパッチ済のメッセージをすべて処理してから停止するのが良いと思うんですけど・・それどうすればいいんですかね?ちょっとわかりませんでした。それができないとコンシューマーの再起動時に確実にメッセージが失われそうな・・それでも構わないぐらいのケースで使うものですかね。

メッセージのTTL

メッセージの TTL はキューとメッセージの両方に指定できる。

# ポリシーで指定
rabbitmqctl set_policy my_ttl '.*' '{"message-ttl":60000}' --apply-to queues
<?php
// キュー宣言で指定
$args = new AMQPTable();
$args->set('x-message-ttl', 60000);
$channel->queue_declare(
    'hello_queue',
    false, // $passive
    true,  // $durable
    false, // $exclusive
    false, // $auto_delete
    false, // $nowait
    $args
);
<?php
// メッセージごとに指定
$msg = new AMQPMessage($data, [
    'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
    'expiration' => 60000,
]);

有効期限が切れたメッセージは後述のデッドレターに送られるかもしくは削除される。

メッセージが再ディスパッチされるとき(basic_reject$requeue = false など )、メッセージの元の有効期限は保持される。つまりメッセージがキューに入った時点で TTL を元に有効期限の日時が確定し、その後 basic_reject などで再ディスパッチされたとしても有効期限の日時が再計算されることはない。

TTL が 0 だとメッセージがパブリッシュされた後直ぐにコンシューマーに送られない限り、メッセージは期限切れになる。

キューとメッセージの両方で TTL が指定されている場合は、より小さい方が採用される。

キューのTTL

キューにも TTL を設定することができる。前述の「キューのポリシーや宣言で指定するメッセージの TTL」ではなく、キュー自体の TTL です。

キューに TTL を設定すると、コンシューマーが居ないなどで一定時間使用されていないキューを自動で削除できる。

# ポリシーで指定
rabbitmqctl set_policy my_expiry '.*' '{"expires":1800000}' --apply-to queues
<?php
// キュー宣言で指定
$args = new AMQPTable();
$args->set('x-expires', 1800000);
$channel->queue_declare(
    'hello_queue',
    false, // $passive = false,
    true,  // $durable = false,
    false, // $exclusive = false,
    false, // $auto_delete = true,
    false, // $nowait = false,
    $args
);

キューが TTL で削除されるとき、そのキューにメッセージが残っているとデッドレターに送られることもなくキューと一緒に削除される。

デッドレター

メッセージは次のいずれかでデッドレターになる。

  • basic_rejectbasic_nack$requeue = false で呼ばれたとき
  • メッセージが TTL により期限切れになったとき
  • キューの長さ制限を超えたとき
  • 他にも?

このときキューにデッドレターの宛先の Exchange が指定されていると、メッセージはその Exchange に 送られます(指定していなければメッセージは削除される)。

# ポリシーで指定
rabbitmqctl set_policy my_dead '.*' '{"dead-letter-exchange":"dead_exchange"},"dead-letter-routing-key":"dead_route"' --apply-to queues
<?php
// キュー宣言で指定
$args = new AMQPTable();
$args->set('x-dead-letter-exchange', 'dead_exchange');
$args->set('x-dead-letter-routing-key', 'dead_route');
$channel->queue_declare(
    'hello_queue',
    false, // $passive
    true,  // $durable
    false, // $exclusive
    false, // $auto_delete
    false, // $nowait
    $args
);

デッドレターメッセージのルーティング

キューに dead-letter-routing-key が設定されているならそのルーティングキーで dead-letter-exchange へ送られる。 設定されていないなら「そのメッセージがパブリッシュされたときの元のルーティングキー」で dead-letter-exchange へ送られる(ルーティングキーが空になるわけではない)。

デッドレターメッセージの書き換え

デッドレターメッセージにはそのメッセージの元の配送先のキューなどの情報がヘッダーに追加される。

<?php
$channel->basic_consume('dead_queue', '', false, true, false, false, function (AMQPMessage $msg) {
    $headers = $msg->get('application_headers');
    assert($headers instanceof AMQPTable);
    $data = $headers->getNativeData();
    var_dump(
        $data['x-first-death-exchange'],
        $data['x-first-death-queue'],
        $data['x-first-death-reason'],
        $data['x-death']
    );
});

元のメッセージに expiration が指定されていてもデッドレターメッセージでは除去される。その他のプロパティは元のメッセージのものがそのまま引き継がれる。

優先度付きキュー

キューの宣言時にそのキューがサポートする最大の優先度を x-max-priority で指定すると、そのキューは優先度付きキューとなる。

<?php
// キュー宣言で指定
$args = new AMQPTable();
$args->set('x-max-priority', 10);
$channel->queue_declare(
    'hello_queue',
    false, // $passive
    true,  // $durable
    false, // $exclusive
    false, // $auto_delete
    false, // $nowait
    $args
);

10 を指定すると 0 ~ 10 の合計 11 個の優先度が設けられることになる。x-max-priority は最大で 255 まで指定できるものの、優先度の数ごとに余分なリソースを食らうので(CPU・メモリ・ディスク)、大量のレベルは作成しないほうが良く、多くても 10 ぐらいまでが奨励される。

パブリッシャーはメッセージの priority プロパティでメッセージの優先度を指定する。大きいほうが優先度は高い。

<?php
$msg = new AMQPMessage($data, [
    'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
    'priority' => 5,
]);

優先度が指定されないメッセージは優先度 0 となる。キューの x-max-priority よりも大きい優先度がメッセージに指定されたときは最大値と同じとして扱われる。

basic_qos でメッセージのプリフェッチ数を制限していないと、コンシューマーがすべてビジーだとしてもメッセージが RabbitMQ へパブリッシュされた時点で直ちにコンシューマーへ配信されるため、優先度の意味がほとんど無くなるので注意(来たものから最速でコンシューマーにディスパッチされるため)。

キューの x-max-priority はポリシーでは定義できない。なぜなら TTL やキューの長さの制限は動的なものなのでキューの宣言後でもポリシーで変更できるが、キューの優先度の最大値はキューの宣言後に変更できないため。

HTTP API でパブリッシュ

Management plugin が有効になっていれば HTTP API でもメッセージをパブリッシュできる。

curl -u 'ore:pass' -H 'content-type:application/json' -XPOST "http://localhost:15672/api/exchanges/%2f/hello_exchange/publish" --data-raw '{
  "properties":{
    "delivery_mode": 2,
    "priority": 5
  },
  "routing_key":"hello_route",
  "payload":"hoge",
  "payload_encoding": "string"
}'

ただ、1回の接続で複数のメッセージを送る分には AMQP のバイナリプロトコルの方が圧倒的に早いし、1回パブリッシュするたびに接続し直す場合でも AMQP の方がちょっと早いぐらいなので、HTTP API でパブリッシュするメリットは特にありません。

メッセージをキューの後ろに回す

メッセージを basic_rejectbasic_nack$requeue = true を指定してメッセージが再キューイングされるとき、メッセージはキューの元の位置に入るため、キューの後ろに同じメッセージが詰まっていても同じメッセージが繰り返しディスパッチされます。

再キューイングのメッセージは後回しにして詰まっているメッセージを先に処理したいときは、次のようにメッセージを ACK した上で再パブリッシュすればできます。

<?php
$channel->basic_consume(
    'hello',
    '',
    false, // $no_local
    false, // $no_ack
    false, // $exclusive
    false, // $nowait
    function (AMQPMessage $msg) use ($channel) {
        // 同じメッセージを再作成して同じ宛先へ再パブリッシュ
        $new = new AMQPMessage($msg->body, $msg->get_properties());
        $channel->basic_publish($new, $msg->delivery_info['exchange'], $msg->delivery_info['routing_key']);
        $channel->basic_ack($msg->getDeliveryTag());
    }
);

注意すべき点として、メッセージやキューにメッセージの TTL が指定されている場合、普通に basic_reject で再キューイングされたときは一番最初にメッセージがキューに入ったときから起算して有効期限が適用されるのに対して、↑の方法だとメッセージを再作成するたびに有効期限が計算され直すため、いつまでもメッセージが有効期限切れにならないことがあります。

メッセージを遅延させる

メッセージをパブリッシュするときに指定した秒数だけコンシューマーへ配信するのを遅延させる方法。普通にはできませんが、rabbitmq-delayed-message-exchange プラグインを有効にすればできます。

次のようにインストールします。

# プラグインのインストールディレクトリを調べる
rabbitmqctl eval 'application:get_env(rabbit, plugins_dir).'
#=> {ok,"/opt/rabbitmq/plugins"}

# プラグインのインストール
curl -sL https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.8.0/rabbitmq_delayed_message_exchange-3.8.0.ez \
  -o /opt/rabbitmq/plugins/rabbitmq_delayed_message_exchange-3.8.0.ez

# プラグインの有効化
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

Exchange の typex-delayed-message を指定します。本来の typex-delayed-type 引数で指定します。

<?php
$args = new AMQPTable();
$args->set('x-delayed-type', 'direct');
$channel->exchange_declare(
    'hello',
    'x-delayed-message',
    false, // $passive = false,
    false, // $durable = false,
    true,  // $auto_delete = true,
    false, // $internal = false,
    false, // $nowait = false,
    $args
);

パブリッシャーは x-delay ヘッダーで遅延時間をミリ秒で指定します。

<?php
$msg = new AMQPMessage($data, [
    'application_headers' => new AMQPTable(['x-delay' => 1000]),
]);
$channel->basic_publish($msg, 'hello');

README によるとこのプラグインは実験的ではあるものの安定しており、README に記載の制限を理解しているならプロダクションで使っても良さそうとのことです。

メッセージを遅延させる(非プラグイン)

rabbitmq-delayed-message-exchange を使わずにやる方法。TTLとデッドレターを使えばできます。

<?php
$args = new AMQPTable();
// デッドレターの宛先として default exchange を指定する
$args->set('x-dead-letter-exchange', '');
// ルーティングキーは指定しないのでデッドレターは元のルーティングキーが維持される
//$args->set('x-dead-letter-routing-key', '');
$channel->queue_declare('delay',
    false, // $passive
    false, // $durable
    false, // $exclusive
    true,  // $auto_delete
    false, // $nowait
    $args
);

// fanout exchange でルーティングキーを無視して delay キューへ送る
$channel->exchange_declare('delay', 'fanout');
$channel->queue_bind('delay', 'delay');
<?php
// 遅延時間を expiration で指定する(ミリ秒)
$msg = new AMQPMessage($data, ['expiration' => 1000]);

// exchange に delay を、ルーティングキーに遅延して配信したいキュー名を指定
$channel->basic_publish($msg, 'delay', 'hello');

この例では default exchange と fanout exchange を用いて、シンプルに1つの遅延キュー(delay)で複数のキューへ配信できるようにしています。

もっと複雑なルーティングを利用したい場合は最終的な宛先となる Exchange ごとに遅延キューを設けるなどする必要があるかもしれません。

<?php
$args = new AMQPTable();
$args->set('x-dead-letter-exchange', 'hello');
$channel->queue_declare('hello.delay',
    false, // $passive
    false, // $durable
    false, // $exclusive
    true,  // $auto_delete
    false, // $nowait
    $args
);

$channel->exchange_declare('hello.delay', 'fanout');
$channel->queue_bind('hello.delay', 'hello.delay');
<?php
$msg = new AMQPMessage($data, ['expiration' => 1000]);
$channel->basic_publish($msg, 'hello.delay', 'aaa.bbb.ccc');

RabbitMQ のクラスタリングのメモ

RabbitMQ のクラスタリングについて下記あたりのドキュメントを読んだメモ。

なおクラスタリングとは別にインターネット経由のような遅い回線を用いたメッセージの複製にフェデレーションというのもあるけれども、そっちは DR とかの用途のようなのでスルー。

ざっくり

  • キュー以外のすべてのデータや状態(vhost/exchange/user/permission)はクラスタのすべてのノードに自動的に複製される
  • キューは単一のノードに配置することもできるし複数のノードに複製することもできる
    • クライアントが接続したノードに存在しないキューでもクライアントからは透過的に利用できる
  • リーダーノードのようなものはなくクラスタのすべてのノードは同等のピア
    • ただしキューにはマスターキューが1つだけあってその他はミラーリングされたキュー
  • rabbitmq-diagnosticsrabbitmqctl などのコマンドはどのノードで実行してもクラスタのすべてのノードを利用できる
    • Management UI も同様
    • rabbitmq-diagnostics environment などの一部のコマンドは接続したノードのみが対象になる

クラスタの形成方法

クラスタの形成はいくつか方法がありますが RabbitMQ がネイティブでサポートしている方法だけ試しました。

他にも AWS/Kubernetes/Consul/etcd などを用いたオートスケーリングに対応した方法もあります(いずれもプラグインによるサポート)。

rabbitmqctl で手動

rabbitmqctl コマンドで手動でクラスタへジョインする。

rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@mq01
rabbitmqctl start_app

設定ファイルにクラスタノードをリスト

設定ファイルにノードの一覧を記述すれば、ノードが空のデータディレクトリから起動したときに自動的にクラスタを形成することができる。

cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config
cluster_formation.classic_config.nodes.1 = rabbit@mq01
cluster_formation.classic_config.nodes.2 = rabbit@mq02
cluster_formation.classic_config.nodes.3 = rabbit@mq03

ノードが起動したとき、まずは設定ファイルに羅列されたピアへ接続を試みる。接続に成功すればそのピアのクラスタにジョインする。もしどのピアとも接続できなければ自身がクラスタの最初のノードとして起動する。

複数のノードを同時に起動すると、それら複数のノードが自身を最初のノードとして起動してしまい、クラスタが形成されない可能性がある。それを回避するために設定ファイルやDNSベースのディスカバリではブート後にピアを探しに行く前にランダムな遅延時間が入る(5〜60秒の範囲)。

DNS ベースディスカバリ

設定ファイルに A または AAAA レコードの名前を記載する。

cluster_formation.peer_discovery_backend = rabbit_peer_discovery_dns
cluster_formation.dns.hostname = discovery.local

この名前をルックアップして得られた IP アドレスのリストを逆ルックアップしてホスト名を得て、そのホスト名にプレフィックス rabbit@ を付与してノード名とする。

例えば、次のようにDNSレコードが登録されています。

mq01.local. IN A 192.0.2.101
mq02.local. IN A 192.0.2.102
mq03.local. IN A 192.0.2.103

discovery.local. IN A 192.0.2.101
discovery.local. IN A 192.0.2.102
discovery.local. IN A 192.0.2.103

.101.0.2.192.in-addr.arpa. IN PTR mq01.local.
.102.0.2.192.in-addr.arpa. IN PTR mq02.local.
.103.0.2.192.in-addr.arpa. IN PTR mq03.local.

クラスタの再起動

クラスタのノードを再起動したとき、オンラインなノードに対して30秒10回のタイムアウトでそのピアとの通信を試みる。接続できればそのピアから同期して正常に起動する。ピアが使用可能にならないなら再起動したノードはあきらめて停止する。ただし、ノードをシャットダウンしたときにオンラインな他のピアが無かった場合、そのノードは再起動時に既知のピアとの同期を施行せずに起動する。クラスタの最初の1台として起動する。

つまり、クラスタ全体を停止した場合、最後に停止したノードだけが単体で正常に起動させることができて、その他のノードは起動後に300秒以内にクラスタのいずれかのピアと接続できなければならない。

もしクラスタの最後にシャットダウンしたノードが起動できないときは(例えば停電などですべてのノードが同時に停止するとすべてのノードが自分が最後だと思わないので、最後にシャットダウンしたノードが存在しなくなる)、別のノードで rabbitmqctl force_boot してから RabbitMQ を開始すれば他のピアとの同期を施行せずに起動させることができる。

systemctl stop rabbitmq-server
rabbitmqctl force_boot
systemctl start rabbitmq-server

「30秒10回」の期間は下記の設定で変更できる。

mnesia_table_loading_retry_timeout = 30000
mnesia_table_loading_retry_limit = 10

ノードのリセット後のクラスタへの再ジョイン

ノードがオフラインになっている間にノード名が変更されたり rabbitmqctl reset などでデータディレクトリがリセットされると、ノードを開始しても以前のクラスタには再ジョインできなくなる。 そのようなノードは rabbitmqctl forget_cluster_node <node> でいったんクラスタから削除した上で、クラスタへ再ジョインする必要がある。

# 既存のノードで
rabbitmqctl forget_cluster_node rabbit@mq03

# リセットしたノードで
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@mq01
rabbitmqctl start_app

キューのミラーリング

キューのミラーリングは1つのマスターと複数のミラーで構成されており、キューに対する操作はまずマスターに適用された後にミラーに伝達される(FIFOを保証するため)。

ミラーリングはポリシーを用いて有効化する。ミラーリングには複数のモードがあって ha-mode ポリシーキーで指定する。

exactly モードはキューのレプリカ数(マスターとミラーの合計数)を ha-params で指定する。ha-params: 1 はマスターのみでミラー無しを意味する。キューのマスターが故障すると自動的に他のミラーがマスターに昇格する。クラスタのノードがレプリカ数よりも少ない場合、すべてのノードにミラーされる。クラスタのノードがレプリカ数よりも多い場合、ミラーのいずれかのノードが故障すると別のノードが新たにミラーになる。

all モードはすべてのノードにミラーする。新しいノードが追加されるとそのノードにもミラーリングされる。

nodes モード は ha-params で指定したノードのリストにミラーされる。指定したノードのいずれかが実際のクラスタのメンバーに無くてもエラーにはならず、キューが宣言されたときにどのノードもクラスタに存在しなければ、クライアントが接続されているノードにキューが作成される。

レプリカ数はどれぐらい?

all モードですべてのノードにミラーすると、ネットワークI/O、ディスクI/O、ディスク領域、などでノードに負荷が大きいので、クラスタノードのクオラム数(いわゆる過半数)をレプリカ数にするのがおすすめ。

もしくは、要件によってはキューのメッセージは一過性のものだったり時間にセンシティブだったりするので、一部のキューでは少数のミラーやそもそもミラーリングしないようにすることもありうる。

キューマスターの位置

キューマスターが配置されるノードは下記のいずれかで指定できる。上にあるものほど優先される。

  • キュー宣言時の x-queue-master-locator
  • ポリシーキー queue-master-locator
  • 設定ファイルの queue_master_locator

以下のいずれかが指定できる。

  • min-masters
    • マスターキューが最も少ないノード
  • client-local
    • キューを宣言するクライアントが接続したノード
  • random
    • ランダム

設定ファイルの queue_master_locator のデフォルトが client-local なのでなにも指定しなければ client-local になります。なので最初に接続したプロセスが必要なキューを全部一括で宣言すると、そのプロセスが接続したノードにマスターキューが偏ります。

マスターキューの障害

ミラーリングされたキューからメッセージを取得しているコンシューマがいるとき、そのミラーのマスターが配置されているノードの障害によってキューがフェイルオーバーしても、コンシューマーにはなにも通知されずに自動的に新しいマスターから透過的にメッセージが配信されるようになります。

もちろん、コンシューマーが接続しているノード自身で停止するとその時点で接続が切れます。コンシューマーが接続しているノードとは別のノードにマスターがあって、そのノードが障害でとまったときの話です。

このとき、マスターキューがフェイルオーバーするとどのメッセージがコンシューマーに送信済かわからなくなるため、フェイルオーバー後にすべての ACK されていないメッセージが redelivered フラグを付けて再配信されます。そのためコンシューマーは同じメッセージを再度受信する可能性があります。

basic_consumex-cancel-on-ha-failover: true を指定すると、マスターキューのノードが停止してフェイルオーバーしたときに、キューの basic_consume がキャンセルされてコンシューマーに通知されるため、それを以てコンシューマーはフェイルオーバーしたことを知ることができる。

php-amqplib ならコンシューマーのコードは次のようになります。

$args = new AMQPTable();
$args->set('x-cancel-on-ha-failover', true);
$channel->basic_consume(
    'hello_queue',
    '',
    false, // $no_local = false,
    false, // $no_ack = false,
    false, // $exclusive = false,
    false, // $nowait = false,
    function (AMQPMessage $msg) use ($channel) {
        // ...
    },
    null,
    $args
);

次のような例外として通知されます。

PhpAmqpLib\Exception\AMQPBasicCancelException: Channel was canceled

ポリシーの変更による再構成

キューのポリシーが変更されたとき、なるべく既存のミラーを維持するように構成が自動で変更される。

nodes モードでポリシーを変更したとき、新しい ha-params のリストに以前のマスターが存在しない場合、ミラーのいずれかの同期が完了するのを待ってから既存のマスターキューが削除され、同期済のミラーが新しいマスターになる。

新しいノードのミラーへの同期

クラスタに新しいノードが追加されてそのノードにミラーが追加されたとき、デフォルトの動作ではミラーのキューは空となり既存のメッセージはそのミラーには存在しない。それ以降に新しくパブリッシュされたメッセージのみがそのミラーには含まれる。マスターキューの既存のメッセージがすべて ACK されて削除されると、ミラーキューは完全に同期された状態になる。

rabbitmqctl sync_queue $queue_name などで手動で同期することもできるが、同期中はキューが応答しなくなる。

ポリシーキー ha-sync-mode で新しいミラーが追加されたときにマスターキューにある既存のメッセージを同期するかどうか指定できる。

  • ha-sync-mode: manual
    • 新しいミラーは既存のメッセージは同期せずに新しくパブリッシュされたメッセージのみを受信する
    • これがデフォルト
  • ha-sync-mode: automatic
    • 新しいミラーには既存のメッセージが同期される
    • キューの同期中はキューへのアクセスがブロックされる
    • キューが十分小さく収まるならこれでも良いこともある

完全には同期されていないノードをマスターの停止や障害時のフェイルオーバー先にするかどうかを、以下のポリシーキーで制御できる。

  • ha-promote-on-shutdown
    • マスターキューのノードをシャットダウンするとき
    • when-synced がデフォルト
  • ha-promote-on-failure
    • マスターキューのノードが障害などで利用不可能になったとき
    • always がデフォルト

指定できる値は次の通り。

  • when-synced
    • 完全に同期されているノードのみをフェイルオーバー先に選択する
    • 完全に同期しているノードがなければマスターキューのノードを復帰させるまでそのキューは利用できなくなる
    • もしマスターキューのノードが復帰不可能だとキューを再作成するしかなくなる
  • always
    • 完全に同期されていないノードでもフェイルオーバー先にする

バッチ同期

ミラーリングキューの同期は最大で ha-sync-batch-size で指定した数のメッセージがバッチで送信される。

ノード間の通信が平均のメッセージサイズから求められるバッチ処理の最大サイズのトラフィックに耐えられるか確認しておく必要がある。もしそのトラフィックが net_ticktime よりも長い時間を要するのであればそれが原因でネットワーク分断になってしまうかもしれない。

ポリシーでキューにミラーリングを設定。

rabbitmqctl set_policy ha '^(?!amq\.).*' '{"ha-mode":"exactly","ha-params":2,"queue-master-locator":"min-masters"}' --apply-to queues

正規表現で amq. から始まるキューを除外しています。キューの宣言時に名前を指定せずに匿名キューになったといに amq. から始まる名前に付けられるよです。つまりこのように指定すれば匿名キューはミラーリングから除外されます。がしかし・・そもそも匿名キューは排他キューとしてしか使うことはなさそうなので、もともとミラーリングされることは無いわけで、正規表現は ^ とかですべてのキューを対象にするので良いような気もします。

キューのミラーリング関係の情報を表示(ポリシー、マスターノード、ミラーノード)

rabbitmqctl list_queues name policy pid slave_pids -q
# name            policy  pid                     slave_pids
# hello_queue     ha      <rabbit@mq01.1.2712.0>  [<rabbit@mq02.3.1372.0>]
# hoge_queue      ha      <rabbit@mq01.1.2725.0>  [<rabbit@mq03.2.1134.0>]

ハートビート

RabbitMQ では、RabbitMQ のノード間の通信と、RabbitMQ とクライアント接続の、2種類のハートビート交換が行われている。

ノード間接続(Net Tick Time)

クラスタのノードのすべてのピア間で定期的に Tick message の交換が行われる。net_ticktime 設定値の 1/4 の頻度で交換され、net_ticktime の期間ずっと通信がなければそのピアはダウンしたとみなされる。

net_ticktime の値は advanced.config で次のように指定できる。デフォルトは 60 です。

[
  {kernel, [{net_ticktime,  120}]}
].

クライアント接続のハートビート

RabbitMQ とクライアントとの間でもハートビートが交換されており、タイムアウト時間は heartbeat で指定可能でデフォルトは 60 秒。タイムアウト時間の 1/2 の秒数の頻度でハートビートの交換は行われる。

ネットワーク分断

クラスタのノード間の接続が失われてからその接続が復活したとき、双方が互いに相手方がダウンしていたと判断しているとネットワーク分断によるスプリットブレインになる。

ネットワーク分断が発生しているかは rabbitmqctl cluster_status コマンドで partitions が空かどうかで判断できる。分断が発生していないなら空です。

正常な状態

rabbitmqctl cluster_status
#=> Cluster status of node rabbit@mq01 ...
#=> [{nodes,[{disc,[rabbit@mq01,rabbit@mq02,rabbit@mq03]}]},
#=>  {running_nodes,[rabbit@mq03,rabbit@mq02,rabbit@mq01]},
#=>  {cluster_name,<<"rabbit@mq01">>},
#=>  {partitions,[]},
#=>  {alarms,[{rabbit@mq03,[]},{rabbit@mq02,[]},{rabbit@mq01,[]}]}]

cluster partition handling

設定 cluster_partition_handling でネットワーク分断発生時にどうするかが指定できる。

  • ignore
  • pause-minority
  • pause-if-all-down
  • autoheal

ignore ならネットワーク分断が発生してもそれぞれのパーティションは独立して稼働し、他方のパーティションのノードがクラッシュしたと判斷する。ミラーリングキューはそれぞれのパーティションにマスターが配置され、両方が独立して動作する。ネットワーク接続が回復した後も分断はそのままで、パーティションごとに独立して動作する。

pause-minority は、ネットワーク分断が発生した時点でクラスタ全体のノード数の過半数に満たないパーティションは自動的に一時停止し、接続が回復した時点で自動的に開始する。これはスプリットブレインは発生しないので矛盾なくネットワーク分断から回復できる。

pause_if_all_down は追加のパラメータで複数のノードを指定し、ネットワーク分断が発生したときは指定された複数のノードのいずれにも到達できないノードを自動的に一時停止する。例えば2つのラックに2台ずつの合計4台のクラスタで、ラック間の通信に障害が発生したときに、どちらのラックのノードを優先するかを設定したりできる。pause-minority とは異なり、リストされた複数のノードが分断されると接続が回復したときにスプリットブレインになる。追加のパラメータ cluster_partition_handling.pause_if_all_down.recover でこのときにどうすうるかを指定できる(ignore または autoheal)。

autoheal は、ネットワーク分断が発生した時点では ignore と同様になにも起こらず、接続が回復した時点で下記の方法でパーティションをいずれか 1 つ選択し、それ以外のパーティションのノードを再起動する。

  • 最も多くのクライアントが接続されているパーティション
  • ↑が引き分けなら、ノードが最も多いパーティション
  • ↑が引き分けなら、ランダムに 1 つのパーティションを選択

どのモードを選ぶべき?

  • ignore
    • ネットワークはとても信頼できる
    • すべてのノードがラックにあって、スイッチで接続されていて、そのスイッチは外部へのルートでもある
    • クラスタの一部で障害が発生したときにクラスタ全体がシャットダウンするリスクを負いたくない
    • 2ノードクラスタである
  • pause-minority
    • ネットワークの信頼性が高くない
    • 複数のデータセンタにまたがってクラスタ化されていて一度に1つのデータセンタのみに障害が発生すると想定している
      • これらのデータセンタにはEC2のアベイラビリティーゾーンのように相互に直接かつ信頼性の高い接続が必要
  • autoheal
    • ネットワークが信頼できない可能性がある
    • データの整合性よりもサービスの継続性を重視する
    • 2ノードクラスタである

スプリットブレインからの復旧

ネットワーク分断が発生した後に接続が回復し、それぞれのパーティションが独立に動作するスプリットブレインになった場合、分断したパーティションから一番信頼できるものを選択し、それ以外のパーティションのノードをすべて再起動すると復旧できる。

参考

RabbitMQ メモ

だいぶ前に書いていたメモを発掘。

これらの書いていたものなので投稿日を改ざんしてます。

設定ファイル

https://www.rabbitmq.com/configure.html

RabbitMQ 3.7.0 より前は rabbitmq.config というファイル名で Erlang 用の設定の書式が使われていたのだけど 3.7.0 以降では rabbitmq.conf というファイル名の普通の設定ファイルっぽい書式のものも使うことができて、こちらの方が奨励されています。

設定ファイルの場所は CentOS とかで RPM でインストールすれば /etc/rabbitmq/rabbitmq.conf になります。

rabbitmq.conf の形式では記述できる内容に制限があるため、追加で advanced.config という名前の設定ファイルを設けることもできます。このファイルはErlang 用の設定の書式を使用して記述します。

チューニング

実運用での RabbitMQ の設定値、sysctl やら ulimit やらのチューニングについて下記にまとまっています。

https://www.rabbitmq.com/networking.html

また、下記によるとファイルディスクリプタの上限数 ulimit -n は少なくとも 65536 を奨励するもよう。

https://www.rabbitmq.com/install-rpm.html#kernel-resource-limits

がしかし、例では 64000 を指定しているのだけど・・?

[Service]
LimitNOFILE=64000

なお CentOS7 に入れてみたところデフォルトは 32768 になっていました。

/usr/lib/systemd/system/rabbitmq-server.service

# To override LimitNOFILE, create the following file:
#
# /etc/systemd/system/rabbitmq-server.service.d/limits.conf
#
# with the following content:
#
# [Service]
# LimitNOFILE=65536

LimitNOFILE=32768

モニタリング

https://www.rabbitmq.com/monitoring.html

# クラスタ全体のメトリクスを JSON で得る
curl -u ore:pass http://localhost:15672/api/overview

# 特定のノードのノード固有メトリクスを JSON で得る
curl -u ore:pass http://localhost:15672/api/nodes/rabbit@mq01

# すべてのノードのノード固有メトリクスを JSON で得る
curl -u ore:pass http://localhost:15672/api/nodes

# キュー固有のメトリクスを得る
curl -u ore:pass http://localhost:15672/api/queues/%2F/hoge_queue

ヘルスチェック

https://www.rabbitmq.com/monitoring.html#health-checks

プロダクションチェックリスト

https://www.rabbitmq.com/production-checklist.html

シングルテナントなら vhost はデフォルトの / で問題ない、マルチテナントならテナントごとに個別の vhost を使うのが良い。

デフォルトのユーザー guest はローカルホストからしか使用できないものの実稼働環境では削除して別のユーザーを作るべき。アプリケーションごとにユーザーを設けると良い。

固定IPを持つ多くのクライアントがあるなら「x509証明書」や「ソースIPアドレス範囲」で認証すると良いかも。

RabbitMQ が使用可能なメモリの 40% を超えてメモリを使用するとそれ以上のメッセージをパブリッシャーから受けなくなる。RabbitMQ をメモリを使いすぎると OS のスワッピングによってパフォーマンスに影響があるため、vm_memory_high_watermark でパーセンテージは調整できるけれども基本的にはデフォルト値のままで、変更するにしても 0.40~0.66 に留めるべき。

disk_free_limit はデフォルトだと 50MB でこれは実稼働には小さすぎるかもしれないので、

disk_free_limit.mem_relative = 1.0
disk_free_limit.mem_relative = 1.5
disk_free_limit.mem_relative = 2.0

とかにすると良いかもしれない。

メモリとディスクのアラーム

https://www.rabbitmq.com/alarms.html

メモリ使用量が設定された制限値以上になったとき、または、ディスクの空き容量が設定された制限値を下回ったとき、パブリッシャの通信が一時的にブロックされる。

パブリッシャからは単に遅延しているだけに見える。いずれかひとつのノードでアラームになるとクラスタのすべてのノードで同じ状態になる?

https://www.rabbitmq.com/memory.html https://www.rabbitmq.com/disk-alarms.html

仮想ホスト

https://www.rabbitmq.com/vhosts.html

RabbitMQ は仮想ホストごとに、接続、エクスチェンジ、キュー、バインディング、アクセス許可、ポリシー、などなどがある。

クライアントはどの仮想ホストに接続するかを指定して接続する。仮想ホストをまたがるような相互運用はクライアントが複数の接続を用いて、ある仮想ホストから取得したメッセージを別の仮想ホストに再発行するような形でやる必要がある。

仮想ホストの作成や削除には結構な時間がかかるため、ループの中で大量に作成するときとかはタイムアウト時間に気をつける必要がある。

認証、認可、アクセス制御

https://www.rabbitmq.com/access-control.html

サーバが最初に開始したときに次の通りデータベースが作成去れる。

  • デフォルトの仮想ホスト /
  • デフォルトのユーザー guest とパスワード guest

guest ユーザーはローカルホストからしか接続できません。

プラグイン

プラグインは次のようなコマンドで有効にできる

rabbitmq-plugins enable <plugin-name>

--offline を付ければ RabbitMQ には接続せずに設定ファイル enabled_plugins だけが更新される。このファイルは次のような形式でプラグイン名がリストされる。

[rabbitmq_management,rabbitmq_management_agent,rabbitmq_shovel].

このファイルには依存関係はリストされないもよう? このファイルを更新したときは RabbitMQ の再起動が必要。

マネジメントプラグイン

https://www.rabbitmq.com/management.html

設定で Path Prefix が指定できる

HTTP API での Publishing/Consuming は奨励はされないのだけど、Publishing については長寿命の接続が適切ではない状況では役立つとのこと。

その他のおもしろそうなプラグイン。

  • rabbitmq_random_exchange
    • ロードバランスのためにルーティングするキューをランダムで決定する
  • rabbitmq_recent_history_exchange
    • Exchange の最後の20メッセージを追跡する
    • チャットの履歴みたいな使い方ができる
  • rabbitmq_sharding
    • キューの FIFO を犠牲にしてシャーディングをする
  • rabbitmq_web_dispatch
    • 勝手に有効になる?

jmeter を master(Windows)で slave(Docker)で実行する

Windows 側は Ctrl-R で次のように jmeter を開始します。 -Jremote_hosts にはスレーブである Docker のホストをカンマ区切りで指定します。

jmeter -Jserver.rmi.ssl.disable=true -Jremote_hosts=192.0.2.123

Docker で次のように jmeter を実行します。イメージは Dockerhub で見つけてきた justb4/jmeter を使っています。気になる人は自前でビルドしても良いでしょう。

-Djava.rmi.server.hostname にはこの Docker ホストのアドレスを指定します。↑の -Jremote_hosts と同じになると思います。

docker run --rm --name jmeter -p 1099:1099 -p 60000:60000 justb4/jmeter \
    -s -n -j /dev/stdout \
    -Jserver.rmi.ssl.disable=true \
    -Jserver.rmi.localport=60000 \
    -Djava.rmi.server.hostname=192.0.2.123

試行錯誤の跡

最初適当にググって下記でできると思ったもののダメでした。

docker run --rm --name jmeter -p 1099:1099 justb4/jmeter \
    -s -n -j /dev/stdout \
    -Jserver.rmi.ssl.disable=true \
    -Djava.rmi.server.hostname=192.0.2.123

-Djava.rmi.server.hostname はリッスンアドレスなの? なら 0.0.0.0 で良いのではと思って見るも、やっぱりダメでした。

docker run --rm --name jmeter -p 1099:1099 justb4/jmeter \
    -s -n -j /dev/stdout \
    -Jserver.rmi.ssl.disable=true \
    -Djava.rmi.server.hostname=0.0.0.0

docker jmeter あたりでググると下記が見つかりました。

うーん? どうも java.rmi.server.hostname は接続しにきたクライアントが接続する先のアドレスなので Windows から見た Docker ホストのIPアドレスを指定する必要があるっぽい(最初の例が正しい)。

それと、1099 以外のポートも使われているようで、tcpdump してみると 60000 より大きいランダムなポートが使用されていました。-Jserver.rmi.localport でポートを固定してそのポートも晒すようにします。

参考