NodeJS で Async hooks を使うトランザクションの実用的な実装

前に書いた NodeJS で実験的な Async hooks を使って横断的なトランザクション をもう少し実用的な実装にしてみました。とは言っても NodeJS をプロダクション用に書くことが全くないのでホントに実用的になってるかどうかはわかりませんが。

// context.ts

import asyncHooks from 'async_hooks'

interface Pool<T extends Connection> {
    getConnection(): Promise<T>,
    end(): Promise<void>;
}

interface Connection {
    beginTransaction(): Promise<void>,
    commit(): Promise<void>,
    rollback(): Promise<void>,
    release(): void,
}

type PromiseValue<T> = T extends Promise<infer V> ? V : never;

type PoolConnection<TPool extends Pool<Connection>> = PromiseValue<ReturnType<TPool["getConnection"]>>;

export class Conext<
    TPool extends Pool<TConnection>,
    TConnection extends Connection = PoolConnection<TPool>
> {
    private readonly context: {[eid: string]: { conn: TConnection, trx: number }} = {};

    private asyncHooks: asyncHooks.AsyncHook;

    constructor(private pool: TPool) {
        this.asyncHooks = asyncHooks.createHook({
            init: (asyncId, type, triggerAsyncId, resource) => {
                if (this.context[triggerAsyncId]) {
                    this.context[asyncId] = this.context[triggerAsyncId];
                }
            },
            destroy: (asyncId) => {
                delete this.context[asyncId];
            }
        }).enable();
    }

    async connection<T>(callback: (conn: TConnection) => Promise<T>) {
        const eid = asyncHooks.executionAsyncId();
        if (this.context[eid]) {
            return await callback(this.context[eid].conn);
        }
        const conn = await this.pool.getConnection();
        try {
            const eid = asyncHooks.executionAsyncId();
            this.context[eid] = { conn: conn, trx: 0 };
            try {
                return await callback(conn);
            } finally {
                delete this.context[eid];
            }
        } finally {
            conn.release();
        }
    }

    async transaction<T>(callback: (conn: TConnection) => Promise<T>) {
        return await this.connection(async (conn) => {
            const eid = asyncHooks.executionAsyncId();
            if (this.context[eid].trx === 0) {
                this.context[eid].trx++;
                await conn.beginTransaction();
                try {
                    const ret = await callback(conn);
                    await conn.commit();
                    return ret;
                } catch (err) {
                    await conn.rollback();
                    throw err;
                } finally {
                    this.context[eid].trx--;
                }
            } else {
                this.context[eid].trx++;
                try {
                    return await callback(conn);
                } finally {
                    this.context[eid].trx--;
                }
            }
        });
    }

    async end() {
        try {
            return await this.pool.end();
        } finally {
            this.asyncHooks.disable();
        }
    }
}

ジェネリクスを用いて mysql や mariadb のパッケージに直接依存しないようにしました。

使い方

コネクションプールを作成し、それをコンストラクタに渡して Conext のインスタンスを作ります。 このインスタンスは例のようにパッケージスコープでエクスポートしてもいいですし、DIコンテナを使っているならDIコンテナに入れるなどしても良いと思います。

// db.ts

import mysql from 'mariadb'
import { Conext } from './conext'

const dbConfig: mysql.PoolConfig = {
    host: process.env.MYSQL_HOST,
    port: parseInt(process.env.MYSQL_PORT || ''),
    user: process.env.MYSQL_USER,
    password: process.env.MYSQL_PASSWORD,
    database: process.env.MYSQL_DATABASE,
}

const pool = mysql.createPool(dbConfig);

export default new Conext(pool);

実際に使ってみる例です。トランザクションが必要ないときは connection を、トランザクションが必要なときは transaction を使います。どちらの場合もそのスコープ、およびそのスコープから呼ばれた同期/非同期関数では同じ接続が使用されます。

// index.ts

import db from './db'

(async () => {

    const func1 = async () => {
        // このスコープの中では同じコネクションが使用される
        await db.connection(async (conn) => {
            const [{cid}] = await conn.query('select connection_id() as cid');
            console.log(`${cid} -> func1`);

            await conn.query('select sleep(1)');
            await func2();
        });
    };

    const func2 = async () => {
        // ネストされたトランザクションは一番外側でのみ begin/commit される
        await db.transaction(async (conn) => {
            const [{cid}] = await conn.query('select connection_id() as cid');
            console.log(`${cid} -> func2`);

            await conn.query("insert into t (name) values ('aaa')");
            await conn.query('select sleep(1)');
            await func3();
        });
    };

    const func3 = async () => {
        // このトランザクションは func2 の内側なので begin/commit されない
        await db.transaction(async (conn) => {
            const [{cid}] = await conn.query('select connection_id() as cid');
            console.log(`${cid} -> func3`);

            await conn.query("insert into t (name) values ('zzz')");
        });
    };

    try {
        // 5並行で func1 -> func2 -> func3 を実行
        // 5接続しか使用されず func1/func2/func3 では同じ接続が使用される
        await Promise.all(Array.from(Array(5)).map(async () => func1()));

        // 30並行でクエリを実行
        // プールの接続数のデフォが10なので10接続しか使用されない
        // 1クエリで1秒かかるので合計で3秒ぐらいかかる
        const start = new Date().getTime();
        const ids = await Promise.all(Array.from(Array(30)).map(async () => {
            return await db.connection(async (conn) => {
                await conn.query('select sleep(1)');
                const [{cid}] = await conn.query('select connection_id() as cid');
                return cid;
            });
        }));
        console.log(`connection_ids:`, new Set(ids))
        console.log(`duration: ${new Date().getTime() - start} ms`);

    } catch (err) {
        console.error(err);
    } finally {
        await db.end();
    }
})()

さいごに

NodeJS のサーバサイドだとどうしてもコンテキスト的なものを引き回す必要があるのだと思っていたのですが、Async hooks なんて便利なものが(Experimental だけど)出来ていたんですね。

Async hooks は 3 年ぐらい前に出てきたもののようです。NodeJS のトレンドは全然追えていませんが、NodeJS のフレームワークでは Async hooks がもっと活用されてくるようになってきたりするのでしょうか。