AWS SDK for PHP の SQS::ReceiveMessage でシグナルを受けたときに long pooling を中断して終了したい

<?php
pcntl_async_signals(true);

$sig_handler = function ($signo) use (&$term){
    echo "signal:$signo\n";
    $term = true;
};

pcntl_signal(SIGHUP, $sig_handler);
pcntl_signal(SIGINT, $sig_handler);
pcntl_signal(SIGQUIT, $sig_handler);
pcntl_signal(SIGTERM, $sig_handler);

$sqs = new SqsClient([
    'region'  => 'ap-northeast-1',
    'version' => 'latest',
]);

while (!$term) {
    $messages = $sqs->receiveMessage([
        'QueueUrl'        => $queue,
        'WaitTimeSeconds' => 20,
    ]);

    // 何か処理する、途中でシグナルを受けても死なない
}

SQS からメッセージを受信し、何かの処理が完了するまではシグナルを受けてもプロセスを終了しないようにするためにシグナルハンドラを仕込んでいます。しかしこのコードでは WaitTimeSeconds で指定した long polling 中にもシグナルで終了しなくなるため、終了させたいときにサッと終了しません。

非同期シグナルが有効になっていればシグナルハンドラでの割り込みは利くし AWS SDK for PHP(の中の Guzzle) の中で呼ばれている curl_multi_select もシグナルで中断されるものの AWS SDK for PHP(の中の Guzzle)が自動的に curl_multi_select を再試行するため sqs::receiveMessage から抜けてこなくて、きっちり WaitTimeSeconds まで待たされます。

sqs::receiveMessage で待っているときは実質何もしていないのでシグナルを受けたらすぐにサッと終了してほしいです。終了が最大で20秒も遅延するのはちょっと長すぎます。

いくつか改善案を考えてみました。

シグナルを処理しない

PHP でシグナルを処理しようとするから終了が遅れてしまうのであってシグナルを処理しなければ即死するので、シグナルを処理しないことにします。

<?php
$sqs = new SqsClient([
    'region' => 'ap-northeast-1',
    'version' => 'latest',
]);

for (;;) {
    $messages = $sqs->receiveMessage([
        'QueueUrl'        => $queue,
        'WaitTimeSeconds' => 20,
    ]);

    // 何か処理する、途中でシグナルを受けて死んでもいいじゃない
}

いやそれで終わりならこんな記事書かんわ。

メッセージを受信した後、何か処理しているときにはできればシグナルでは即死せず、何かの処理が終わってから死んでほしいです。

処理中はシグナルをブロックする

シグナルハンドラは仕込まず、かつ、何か処理中はシグナルをブロックします。

<?php
$sqs = new SqsClient([
    'region' => 'ap-northeast-1',
    'version' => 'latest',
]);

for (;;) {
    $messages = $sqs->receiveMessage([
        'QueueUrl'        => $queue,
        'WaitTimeSeconds' => 20,
    ]);

    pcntl_sigprocmask(SIG_BLOCK, [SIGHUP, SIGINT, SIGQUIT, SIGTERM]);
    try {

        // 何か処理する、途中でシグナルを受けても死なない

    } finally {
        pcntl_sigprocmask(SIG_UNBLOCK, [SIGHUP, SIGINT, SIGQUIT, SIGTERM]);
    }
}

シグナルハンドラで exit する

シグナルハンドラで exit すれば普通に終了します。 何か処理中は死んでほしくないのでフラグ変数とかで分岐させます。

<?php
pcntl_async_signals(true);

$processing = false;

$sig_handler = function ($signo) use (&$term, &$processing){
    echo "signal:$signo\n";
    if (!$processing) {
        exit;
    }
    $term = true;
};

pcntl_signal(SIGHUP, $sig_handler);
pcntl_signal(SIGINT, $sig_handler);
pcntl_signal(SIGQUIT, $sig_handler);
pcntl_signal(SIGTERM, $sig_handler);

$sqs = new SqsClient([
    'region' => 'ap-northeast-1',
    'version' => 'latest',
]);

while (!$term) {
    $messages = $sqs->receiveMessage([
        'QueueUrl'        => $queue,
        'WaitTimeSeconds' => 20,
    ]);
    try {
        $processing = true;

        // 何か処理する、途中でシグナルを受けても死なない

    } finally {
        $processing = false;
    }
}

シグナルハンドラから例外を投げる

先ほどの例とあまり変わりませんが、シグナルハンドラから例外を投げれば無理やり SQS::ReceiveMessage を抜けることができます。 なお、飛んでくる例外は AWS SDK の例外クラスでラップされているので getPrevious で剥がす必要があります。

<?php
class SignalException extends RuntimeException {}

pcntl_async_signals(true);

$processing = false;

$sig_handler = function ($signo) use (&$term, &$processing){
    echo "signal:$signo\n";
    if (!$processing) {
        throw new SignalException("signal:$signo", $signo);
    }
    $term = true;
};

pcntl_signal(SIGHUP, $sig_handler);
pcntl_signal(SIGINT, $sig_handler);
pcntl_signal(SIGQUIT, $sig_handler);
pcntl_signal(SIGTERM, $sig_handler);

$sqs = new SqsClient([
    'region' => 'ap-northeast-1',
    'version' => 'latest',
]);

while (!$term) {
    try {
        $messages = $sqs->receiveMessage([
            'QueueUrl'        => $queue,
            'WaitTimeSeconds' => 20,
        ]);
    } catch (Throwable $ex) {
        if ($ex->getPrevious() instanceof SignalException) {
            break;
        }
        throw $ex;
    }
    try {
        $processing = true;

        // 何か処理する、途中でシグナルを受けても死なない

    } finally {
        $processing = false;
    }
}

シグナルハンドラからリクエストをキャンセルする → ダメ

ダメでした。WaitTimeSeconds の時間を待ったうえで wait から CancellationException が飛んでくるという動きになりました。

<?php
pcntl_async_signals(true);

$sig_handler = function ($signo) use (&$term, &$async){
    echo "signal:$signo\n";
    if ($async) {
        assert($async instanceof Promise);
        $async->cancel();
        $async = null;
    }
    $term = true;
};

pcntl_signal(SIGHUP, $sig_handler);
pcntl_signal(SIGINT, $sig_handler);
pcntl_signal(SIGQUIT, $sig_handler);
pcntl_signal(SIGTERM, $sig_handler);

$sqs = new SqsClient([
    'region' => 'ap-northeast-1',
    'version' => 'latest',
]);

while (!$term) {
    $async = $sqs->receiveMessageAsync([
        'QueueUrl'        => $queue,
        'WaitTimeSeconds' => 20,
    ]);
    try {
        $messages = $async->wait();
    } catch (CancellationException) {
        break;
    }
    $async = null;

    // 何か処理する、途中でシグナルを受けても死なない
}

素の Guzzle で requestAsync などを使うと大丈夫だったのですが・・・謎。