AnyEvent::RabbitMQ - メッセージキュー…使いこなしてますか?

cooldaemon
2011-12-04

前置き

こんにちは。cooldaemon と申します。みなさん、意識は高まっていますか? 私は上々です。今回は拙作の AnyEvent::RabbitMQ というモジュールを紹介いたします。

AnyEvent::RabbitMQ は、AMQP ブローカーと非同期にメッセージを送受信するための AMQP クライアントです。AMQP とは、メッセージを扱うミドルウェアのオープンな標準仕様です。AMQP の正確で詳しい内容は、AMQP の公式サイトGoogle をご確認ください。

本当の事を言うと、名前は AnyEvent::AMQP::Client でも良かったのですが、私は、AMQP ブローカー実装の一つ RabbitMQ を常用しており、RabbitMQ 以外の AMQP ブローカーを利用する機会も必要もなく、また、接続性を保証する事もしたくなかったので、あえて名前に AMQP を含めずに RabbitMQ を含めました。

で、結局、何ができるの?

率直に申し上げると、RabbitMQ + AnyEvent::RabbitMQ は、Gearman、TheSchwartz、Qudo、Q4M 等のライバルです。

元々、個人的趣味で細々と作成していたのですが、仕事で下記の条件を満たした Web API を作成する必要があり、試しに利用してみたところ、かなり満足の行く結果を得られましたので、今回は、その事例を中心に紹介いたします。

  • 秒間 200 件程度の HTTP リクエストを捌く必要がある
  • 外部システムとの接続の都合で、一つの HTTP リクエストは、平均 5 秒は接続し続ける必要がある
  • 既存の API と同一のドメインを利用する必要がある

同時接続 1000 程度の HTTP リクエストなら、社内に転がっている 1U サーバ*1一台で十分捌けるだろうし、機器故障に備えて二重化しても二台で十分だろうという予測の元、下記の構成でシステム設計を行いました。

[様々な HTTP クライアント] <-> [Apache * N台] <-> [RabbitMQ * 2台] <-> [Perl 製の Worker * 2台]

Apache と Worker の間の RabbitMQ の役割は、ここの 6.RPC の図の通りです。

HTTP クライアント

HTTP クライアントは、お客様のシステムとなるため、工夫の余地はありません。

Apache * N台

Apache も、既存の API とドメインを揃える必要があるので、既存システムの Apache をそのまま流用しました。よって、サーバ台数も既存システムそのままです。

RabbitMQ * 2台

Apache と RabbitMQ を接続するため、Apache に届いた HTTP リクエストを RabbitMQ の Queue に追加する mod を C で書こうとしたのですが、RabbitMQ に HTTP インターフェースを追加するプラグインを見つけたので、届いた HTTP リクエストを Queue に追加する RabbitMQ プラグインを Erlang で書き、mod_proxy_balancer で接続しました。

実は、RabbitMQ 上には、HTTP リクエストを貯めるキュー以外にも、お客様にメールを送る為のキュー、障害時のトレースログを保存するためのキュー、後々の為に 商用DB に履歴を保存する為のキューなどが存在しているのですが、今回は説明の範囲外とさせてください。

また、RabbitMQ のインストールや運用方法も、今回の説明から除外させて頂きます。詳しくは、RabbitMQ 公式サイトのドキュメントページをご確認ください。

少しだけ補足させて頂くと、ここの 3.Publish/Subscribe の図に示されているテクニックを使うと、簡単にリクエストやレスポンスのログを貯めておく事ができます。

Perl 製の Worker * 2台

実は、フルスクラッチでシステム構築を行う必要があり、AMQP クライアント が存在する言語であれば何を選択しても良かったのですが、AnyEvent::RabbitMQ が非同期かつ複数 Channel に対応*2している事*3KumoFS を非同期に利用するためのクライアント Cache::Memcached::AnyEvent が存在していた事、外部システムとの連携のために非同期 HTTP 通信を高速に行える AnyEvent-Curl が存在していた事が決め手となり Perl を採用しました。

実案件では、物理サーバ一台につき CPU の数だけ DaemonTools 配下で Worker を起動しています。これは、ここの 2.Work queues の図の通りで、これから紹介する Worker を何も考えずに複数起動するだけで実現できます。

やっと本題

Perl で記述した HTTPD Worker の例は、下記の通りです。 まずは、ソースコード全体を俯瞰してください。

#!/usr/bin/env perl

use strict;
use warnings;

use Coro;
use Net::RabbitFoot;
use JSON::XS;
use Readonly;
use Data::Dumper;

Readonly my @MQ_CONNECT_ARGS => (
    host  => '10.0.0.10',
    port  => 5672,
    user  => 'guest',
    pass  => 'guest',
    vhost => '/',
);

Readonly my $PREFETCH => 5;

$| = 1;

$SIG{HUP} = $SIG{INT} = $SIG{TERM} = sub {
    warn "Trapped SIGNAL.\n";
    $Coro::main->ready;
};

my $json = JSON::XS->new()->utf8();

my $rf = eval {
    Net::RabbitFoot->new()->load_xml_spec()->connect(
        @MQ_CONNECT_ARGS,
        (map {
            'on_' . $_ => failure_handler('connecn.on_' . $_)
        } qw(close read_failure)),
    );
};
die $@ if $@;

eval {
    my $ch = $rf->open_channel(
        on_close => failure_handler('channel.on_close'),
    );
    $ch->qos(prefetch_count => $PREFETCH);
    $ch->consume(
        queue      => 'http',
        no_ack     => 0,
        on_consume => unblock_sub {
            work($json, $ch, shift);
        }
    );
};
if ($@) {
    $rf->close;
    die $@;
};

schedule;
$rf->close;
exit;

sub failure_handler {
    my ($event) = @_;
    return unblock_sub {
        warn Dumper({$event => \@_});
        $Coro::main->ready;
    };
}

sub work {
    my ($json, $ch, $request,) = @_;

    my $response = make_http_response(@_);

    $ch->publish(
        routing_key => $request->{header}->reply_to,
        header      => {
            app_id  => $response->{code},
            headers => $response->{headers},
        },
        body        => $response->{body},
        on_return   => failure_handler('channel.on_return'),
    );

    $ch->ack(
        delivery_tag => $request->{deliver}->method_frame->delivery_tag,
    );

    return;
}

sub make_http_response {
    my ($json, $ch, $request,) = @_;

    warn Dumper({headers => $request->{header}->headers});

    my $request_body = $json->decode($request->{body}->payload);

    warn Dumper({body => $request_body});

    return {
        code    => 200,
        headers => {
            'Content-Type' => 'text/plain; charset=UTF-8',
        },
        body    => q{Merry X'mas},
    };
}
AnyEvent::RabbitMQ は、どこ行った?

ごめんなさい。今回は、AnyEvent::RabbitMQ を Coro で包んだ拙作の Net::RabbitFoot を使用しました。処理順を保証する為に発生する AnyEvent 系モジュールのコールバックサブルーチンのネストの嵐を見ると、私は精神的苦痛を感じるので、AnyEvent 系モジュールは、継続やモナドで包み、床下配線として利用する事にしています。

もし仮に AnyEvent::RabbitMQ を直接使う*4と、極端な話、connect 時に on_success に指定したコールバックサブルーチン内に全ての処理を記述するか、コードを AnyEvent->condvar だらけにか選択する事になります。

main スレッドについて

main スレッドは、下記の仕事をしています。

  • RabbitMQ へ接続して Channel を開く
  • キューに貯まったメッセージを 5 個ずつ捌く事を RabbitMQ へ通知
  • キューの監視を行うサブルーチンを登録
  • シグナルや障害が発生するまで待つ

では、順番に説明して行きます。

*** RabbitMQ へ接続して Channel を開く

スレッド毎に AMQP ブローカーと接続する行為は、リソースの無駄遣いであるため、AMQP には、一つの接続を Channel で分割する仕様が定義されています。通常は、各スレッド毎に Channel を開く事でスレッドセーフを保証します。

今回は、継続による疑似スレッドという事もあり、同時に実行されるスレッドは常に一つである事から、キューの監視で使用している Channel を、各スレッド内のメッセージ送信時にも使い回しています*5。もし Coro ではなく ithread を使う際には、スレッド毎に Channel を開く必要があります。

*** キューに貯まったメッセージを 5 個ずつ捌く事を RabbitMQ へ通知

qos で設定した prefetch_count がそれです。ここに 5 と指定すると、この Channel の処理中のメッセージが 5 個になるまで、メッセージが届く度に on_consume の unblock_sub が呼ばれます。つまり、ここに指定した値が並列度となります。

*** キューの監視を行うサブルーチンを登録

Coro から利用する場合は、必ず unblock_sub を指定してください。イベントループからコールバックサブルーチンが呼ばれるので、このコールバックサブルーチン内で Coro::rouse_cb を利用する際に具合が悪いのです。

no_ack に 0 を指定しているので、Ack しないとメッセージは消費されませんが、ここは 1 にして Ack 不要にしても構いません。

*** シグナルや障害が発生するまで待つ

schedule しているだけです。つまり、初期化処理後の main スレッドの主な仕事は、大人しく静かに寝ている事です。$Coro::main->ready が呼ばれるという事は、この Worker の終了を意味します。

on_consume により呼ばれる子スレッドについて

on_consume で登録されているコールバックサブルーチンは、work を呼んでいるだけなので、work の処理を見ていきます。work の処理は下記の通りです。

  • HTTP Request Header を標準エラーに出力
  • HTTP Request Body を標準エラーに出力
  • HTTP Response を返信用のキューに送信
  • メッセージを Ack する

では、順番に説明して行きます。

*** HTTP Request Header を標準エラーに出力

on_consume に指定されたコールバックサブルーチンの第一引数はハッシュとなっており、header と body というキーを持ちます。メッセージを送る publish の引数と比較して頂けると解り易いのですが、$request の header と body には、拙作の rabbitmq-http-proxy が Publish 時に指定した header と body が入っています。

*** HTTP Request Body を標準エラーに出力

rabbitmq-http-proxy の癖で、GET/POST されたパラメータは JSON 化されて body にセットされています。そこで、JSON::XS で decode しています。

*** HTTP Response を返信用のキューに送信

これまた rabbitmq-http-proxy の癖で、header の app_id に HTTP Status Code を指定しています。headers が配列のリファレンスではなく、ハッシュのリファレンスな所も、どうにかしたい所です。実案件では、headers に CGI.pm で作成した Cookie 等の HTTP Headers を設定しています。

*** メッセージを Ack する

前述しましたが、Consume 時の no_ack に 0 を指定しているので、メッセージを消費する為に Ack が必要です。Ack しない場合、Consume している Channel が閉じるか Cancel するまで、この Worker の別スレッドや、他の Worker が同一のメッセージを受け取る事ができません。

以上で Worker の説明終わりです。簡単でしたか?難しかったでしたか?簡単だと思って頂ければ成功なのですが…。

まとめ

国内外の AMQP と RabbitMQ の認知度の都合上、AnyEvent::RabbitMQ に対する意見や要望、機能追加や障害対応のパッチは、全て日本国外から頂いており、日本国内では、多分、全く利用されていません。

今回、例として扱った案件は、オープンソースだけで対応できる仮想の案件となりますが、クレジットカード決済のサービスを提供する際に発生した実案件が元となっています。故に AnyEvent::RabbitMQ や Net::RabbitFoot は、実際にミッションクリティカルな現場で利用されているものであり、日本国内のみなさんにとっても有益なモジュールであるという確信をもって紹介させて頂きました。

もしよろしければ、RabbitMQ 共々、ご利用頂ければ幸いです。

ちなみに、実案件で構築したシステムは、他にも多くの CPAN モジュールに依存した形で構築されており、この場をお借りして、Perl コミュニティにお礼申し上げます。いつも、良質なコードを提供してくれて、ありがとうございます。みなさんが良いクリスマスを迎えられますように!

*1: Core2Duo 3GH/Memory 2G 程度
*2: 対応していないクライアントも多いのです
*3: 自画自賛で恐縮です
*4: 海外ユーザの方々は、AnyEvent::RabbitMQ を直接使っているようです。Coro に依存したくないのでしょうね
*5: 実案件では、念のため、各スレッド毎に Channel を開いています