読者です 読者をやめる 読者になる 読者になる

mysql_use_result と Server Sent Events でデータをストリーム的にブラウザに返す

PHP MySQL JavaScript

まず下記のような手順で 1000 万件のレコードを持つテーブルを作ります。

$ mysql test -e "
  create table t (
    id int not null primary key,
    str text not null
  )
"

$ seq 10000000 |
  mysql test -e "
    load data local infile '/dev/stdin' into table t (id)
    set str = floor(rand()*10000000)
  "

このテーブルを PDO で普通に SELECT すると・・・

<?php
$db = new \PDO(
    "mysql:unix_socket=/var/lib/mysql/mysql.sock;dbname=test;charset=utf8",
    "test", "pass", [
        \PDO::ATTR_ERRMODE => \PDO::ERRMODE_EXCEPTION,
        \PDO::ATTR_DEFAULT_FETCH_MODE => \PDO::FETCH_ASSOC,
        \PDO::ATTR_EMULATE_PREPARES => 0,
    ]
);

$sql = "select * from t order by id";
$stmt = $db->query($sql);

foreach ($stmt as $row) {
    $data = json_encode($row, JSON_UNESCAPED_UNICODE);
    echo "$data\n";
    flush();
}

メモリ 1G の swap なしの VM で実行すると Out of memory になってしまいました。

$ php pdo.php
Killed

$ sudo tail -n 2 /var/log/messages
Jun 15 23:10:21 ore-no-server kernel: Out of memory: Kill process 28028 (php) score 707 or sacrifice child
Jun 15 23:10:21 ore-no-server kernel: Killed process 28028, UID 500, (php) total-vm:944336kB, anon-rss:719964kB, file-rss:112kB

PDO::MYSQL_ATTR_USE_BUFFERED_QUERY に 0 を指定すると大丈夫です。

<?php
$db = new \PDO(
    "mysql:unix_socket=/var/lib/mysql/mysql.sock;dbname=test;charset=utf8",
    "test", "pass", [
        \PDO::ATTR_ERRMODE => \PDO::ERRMODE_EXCEPTION,
        \PDO::ATTR_DEFAULT_FETCH_MODE => \PDO::FETCH_ASSOC,
        \PDO::ATTR_EMULATE_PREPARES => 0,
        \PDO::MYSQL_ATTR_USE_BUFFERED_QUERY => 0,
    ]
);

$sql = "select * from t order by id";
$stmt = $db->query($sql);

foreach ($stmt as $row) {
    $data = json_encode($row, JSON_UNESCAPED_UNICODE);
    echo "$data\n";
    flush();
}
$ php pdo.php
{"id":1,"str":"5998599"}
{"id":2,"str":"7155699"}
 :
 :
 :
{"id":9999999,"str":"3498003"}
{"id":10000000,"str":"3795234"}

ソースコードは見ていませんが、おそらく mysql_store_resultmysql_use_result の違いでしょう。

mysql_store_result だと query の時点ですべての結果がクライアントに送信されるため、クライアント側で 1000 万件のレコードのためのバッファを確保しようとします。mysql_use_result なら query の時点ですべての結果が送信されることはなく、fetch で適当なサイズの結果が逐次送信されるため、クライアント側に巨大なバッファが必要ありません。

また、mysql_store_result ならすべての結果をクライアントに送信しきるまで query が応答を返さないのに対して、mysql_use_result なら最初の1行(あるいは max_allowed_packet に達するまでの行数?)が送信されれば query が応答を返すため、最初の1行目が表示されるまでの応答がとても早くなります。

ただし、select * from t order by str などとインデックスの無い列をソートの条件にすると、最初の1行目を取得するためにテーブルを全行走査する必要があるのでとても遅いです。応答を速くするためにはいわゆる ORDER BY 狙いのインデックスが必要です。

また、mysql_use_result だと同時に複数のクエリを開くことができません。

<?php
$db = new \PDO(
    "mysql:unix_socket=/var/lib/mysql/mysql.sock;dbname=test;charset=utf8",
    "test", "pass", [
        \PDO::ATTR_ERRMODE => \PDO::ERRMODE_EXCEPTION,
        \PDO::ATTR_DEFAULT_FETCH_MODE => \PDO::FETCH_ASSOC,
        \PDO::ATTR_EMULATE_PREPARES => 0,
        \PDO::MYSQL_ATTR_USE_BUFFERED_QUERY => 0,
    ]
);

$sql = "select * from t order by id limit 1";
$stmt1 = $db->query($sql);

$sql = "select * from t order by id desc limit 1";
$stmt2 = $db->query($sql);
// SQLSTATE[HY000]: General error: 2014 Cannot execute queries while other unbuffered queries are active.  Consider using PDOStatement::fetchAll().  Alternatively, if your code is only ever going to run against mysql, you may enable query buffering by setting the PDO::MYSQL_ATTR_USE_BUFFERED_QUERY attribute.

var_dump($stmt1->fetch());
var_dump($stmt2->fetch());

mysql_store_result なら複数同時に開いて並列に fetch することができます。

<?php
$db = new \PDO(
    "mysql:unix_socket=/var/lib/mysql/mysql.sock;dbname=test;charset=utf8",
    "test", "pass", [
        \PDO::ATTR_ERRMODE => \PDO::ERRMODE_EXCEPTION,
        \PDO::ATTR_DEFAULT_FETCH_MODE => \PDO::FETCH_ASSOC,
        \PDO::ATTR_EMULATE_PREPARES => 0,
    ]
);

$sql = "select * from t order by id limit 1";
$stmt1 = $db->query($sql);

$sql = "select * from t order by id desc limit 1";
$stmt2 = $db->query($sql);
// SQLSTATE[HY000]: General error: 2014 Cannot execute queries while other unbuffered queries are active.  Consider using PDOStatement::fetchAll().  Alternatively, if your code is only ever going to run against mysql, you may enable query buffering by setting the PDO::MYSQL_ATTR_USE_BUFFERED_QUERY attribute.

var_dump($stmt1->fetch());
var_dump($stmt2->fetch());
/*
array(2) {
  'id' =>
  int(1)
  'str' =>
  string(7) "5182048"
}
array(2) {
  'id' =>
  int(10000000)
  'str' =>
  string(7) "2880349"
}
*/

また、次のように mysql_use_result で途中で結果の fetch を中断したとしても、対象テーブルの走査が完了するまで次のクエリを実行できないため、結果セットのクローズ $stmt->closeCursor() でかなり待たされます。

<?php
$db = new \PDO(
    "mysql:unix_socket=/var/lib/mysql/mysql.sock;dbname=test;charset=utf8",
    "test", "pass", [
        \PDO::ATTR_ERRMODE => \PDO::ERRMODE_EXCEPTION,
        \PDO::ATTR_DEFAULT_FETCH_MODE => \PDO::FETCH_ASSOC,
        \PDO::ATTR_EMULATE_PREPARES => 0,
        \PDO::MYSQL_ATTR_USE_BUFFERED_QUERY => 0,
    ]
);

$sql = "select * from t order by id";
$stmt = $db->query($sql);
var_dump($stmt->fetch());
$stmt->closeCursor();
$stmt = null;

$sql = "select * from t order by id limit 1";
$stmt = $db->query($sql);
var_dump($stmt->fetch());

さらに、ストレージエンジンが MyISAM だと結果をすべて fetch しきるまでそのテーブルへの書き込みがロックされます。InnoDB は大丈夫だったと思います。


mysql_use_resultEventSource を使って Web サーバからブラウザにストリーム的に 1000 万件のレコードを返してみます。

stream.php

<?php
set_time_limit(0);

while (ob_get_level()) {
    ob_end_clean();
}

$db = new \PDO(
    "mysql:unix_socket=/var/lib/mysql/mysql.sock;dbname=test;charset=utf8",
    "test", "pass", [
        \PDO::ATTR_ERRMODE => \PDO::ERRMODE_EXCEPTION,
        \PDO::ATTR_DEFAULT_FETCH_MODE => \PDO::FETCH_ASSOC,
        \PDO::ATTR_EMULATE_PREPARES => 0,
        \PDO::MYSQL_ATTR_USE_BUFFERED_QUERY => 0,
    ]
);

$sql = "select * from t order by id";
$stmt = $db->query($sql);

header("Content-type: text/event-stream; charset=utf-8");

foreach ($stmt as $row) {
    if (connection_aborted()) {
        break;
    }
    $data = json_encode($row, JSON_UNESCAPED_UNICODE);
    echo "data: $data\n\n";
    flush();
    usleep(100);
}

index.html

<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8" />
    <title>example</title>
</head>
<body>

<button id="go">GO</button>
<span id="time"></span>
<pre id="pre"></pre>

<script src="//code.jquery.com/jquery-2.1.1.min.js"></script>
<script>
$(function(){
    var $time = $('#time');
    var $elem = $('#pre');
    var start;

    $('#go').on('click', function(){
        start = new Date();
        $time.text("0 sec");

        new EventSource("stream.php").onmessage = function(e){
            $("<div></div>").text(e.data).prependTo($elem);
            var $children = $elem.children();
            if ($children.size() > 100) {
                $children.last().remove();
            }
            $time.text(parseInt((new Date() - start) / 1000) + ' sec');
        }
    })
})
</script>

</body>
</html>

index.html をブラウザで開いて GO ボタンをクリックすると、データがだら~っと流れてきます。