Tracks

書いてみたい方はココを参照下さい。

Resque で学ぶジョブキューイング B!


hatak <id.hatak@gmail.com>


こんにちは。hatak (@hisashi) です。

Perl でジョブキューというと TheSchwartzQudo などの名前が挙がるかと思いますが、今回はバックエンドに Redis を利用したジョブキュー "Resque" を紹介します。

Resque はバックグラウンドジョブを処理するためのライブラリで、Github を始め大規模なサイトでも利用されています。もともとは Rubyライブラリですが、Ruby 以外の様々な言語でも実装されています。今回紹介するのはその Perl 実装のモジュールです。

試してみる

クライアントで入力した文字列をワーカーが表示するだけのシンプルなプログラムを作ってみます。

まずは Redis を利用できるようにしておく必要があります。今回はローカルの Redis を利用しますが、異なるホストで動作している場合は適宜ホスト名やポート番号を変更してください。

client.pl
#!/usr/bin/env perl
use strict;
use warnings;

use Resque;
use Log::Minimal;

my $resque = Resque->new(redis => '127.0.0.1:6379');

print 'input text: ';
while (my $text = <STDIN>){
    chomp $text;
    if ($text) {    # if defined
        $resque->push('echo' => {
                class => 'My::Echo',
                args => [ $text ],
            });
        infof('enqueue: %s', $text);
    }
    print 'input text: ';
}
worker.pl
#!/usr/bin/env perl
use strict;
use warnings;

use Resque;
use Log::Minimal;

my $worker = Resque->new(redis => '127.0.0.1:6379')->worker;
$worker->add_queue('echo');
infof('### start worker');
$worker->work;
infof('### stop worker');

また、ワーカーから実行する処理をモジュールとして作成しておきます。実行時には perform メソッドが呼び出されるので、ここに処理を記述します。

lib/MyTask/Echo.pm
package MyTask::Echo;
use strict;
use v5.10;

use Log::Minimal;

sub perform {
    my $job = shift;
    infof('dequeue: %s', ddf($job->args));
    say $job->args->[0];
}

1;

今回のサンプルで利用している依存モジュールもインストールしておきます。

さて、実行してみましょう。

ターミナルを複数用意し、一方で worker.pl を実行します。

$ ./worker.pl
2012-12-12T12:12:02 [INFO] ### start worker at ./worker.pl line 10

そして、もう一方では client.pl を実行し、何かテキストを入力します。

$ ./client.pl
input text: hello
2012-12-12T12:12:07 [INFO] enqueue: hello at ./client.pl line 18

すると、入力したテキストが worker.pl を実行していたターミナルに表示されます。

$ ./worker.pl
2012-12-12T12:12:02 [INFO] ### start worker at ./worker.pl line 10
2012-12-12T12:12:12 [INFO] dequeue: ['hello'] at /Users/hatak/work/jq/lib/MyTask/Echo.pm line 9
hello

worker.pl を複数立ち上げた場合、クライアントから送信したテキストはいずれかのワーカープロセスで処理され表示されます。もちろん、複数のクライアントでも動作します。簡単ですね。

仕組みを見てみる

Resque を利用したジョブキューの処理構成を図に示します。

+----------+  Resque   +-------+  Resque   +----------+
| client 1 | --------> |       | <-------- | worker 1 |
+----------+           |       |           +----------+
                       | Redis |
+----------+  Resque   |       |  Resque   +----------+
| client 2 | --------> |       | <-------- | worker 2 |
+----------+           +-------+           +----------+

クライアントからジョブが Redis のキューに格納されます。これを、それぞれのワーカープロセスが読み取り、ジョブを処理をする形です。

サンプルコードには時刻が分かるようにログ出力を入れましたが、クライアントからのジョブが実行されるまで数秒遅延することがわかります。

$resque->push('echo' => {
        class => 'MyTask::Echo',
        args => [ $text ],
    });

今回の例では、"echo" という名前のキューに、実行するクラス名 "MyTask::Echo" が引数と共にキューイングされます。

ワーカーは指定されたキューをチェックし、ジョブが入っていれば取り出して指定されたクラスを require し、perform メソッドを実行します。

複数のワーカープロセスが存在している状態でも、Redis のアトミック操作を利用しているため二重には処理されません。

Redis の中身をのぞいてみましょう。

redis-cli でのぞいてみると、次のようなキーがあることがわかります。

127.0.0.1:6379> keys "resque:*"
 1) "resque:worker:sakura.local:25712:echo:started"
 2) "resque:worker:sakura.local:25888:echo:started"
 3) "resque:stat:processed:sakura.local:25888:echo"
 4) "resque:queues"
 5) "resque:failed"
 6) "resque:queue:echo"
 7) "resque:stat:failed"
 8) "resque:stat:processed:sakura.local:25712:echo"
 9) "resque:stat:processed"
10) "resque:workers"
11) "resque:stat:failed:sakura.local:25888:echo"

格納されている Resque に関連するデータは全てテキストデータです。キュー本体も JSON でシリアライズされたジョブが リスト型で格納されているだけですので、簡単にチェックすることができます。

共通

resque:queues
[set型] キューのリスト

resque:workers
[set型] ワーカープロセスのリスト

resque:failed
[list型] 失敗したジョブの結果 (JSON) のリスト

resque:stat:processed
[string型] 取り出した (dequeue) ジョブの総数

resque:stat:failed
[string型] 処理に失敗したジョブの総数

キュー

resque:queue:<キュー名>
[list型] ジョブ (JSON) のリストが格納されているキュー本体、

ワーカープロセス

resque:worker:<ホスト名>:<ポート番号>:<キュー名>:started
[string型] ワーカープロセスの開始時刻

resque:stat:processed:<ホスト名>:<ポート番号>:<キュー名>
[string型] ワーカーが取り出した (dequeue) ジョブの数

resque:stat:failed:<ホスト名>:<ポート番号>:<キュー名>
[string型] ワーカーが処理に失敗したジョブの数

ワーカーは "resque:queue:<キュー名>" からジョブを取り出し、処理します。processed のカウントは取り出した時点で処理の成否にかかわらずインクリメントされます。

処理が失敗した場合は "resque:failed" のリストに JSON シリアライズされて追加され、failed のカウントがインクリメントされます。

これらのデータは、キューやワーカーの状態によってキーの存在自体が変化します。

例えば、ワーカーごとのキーはワーカープロセスを終了すると削除されるので、ワーカーを止めたりして動きを見てみると面白いです。

まとめ

Redis を利用したジョブキュー "Resque" を紹介しました。バックエンドに採用されている Redis のデータも簡単に見られるので、ジョブキューの動きを学ぶこともできます。

ちなみに、シンプルに実装された例もあります。基本的な動きはこちらでも同じです。

ジョブキューもデータストアも、特性をうまく活かして使い分けたいと思いました。