Coroを使って並行処理B!

こんにちわ。『層・圏・トポス』読者の会からの刺客、id:hirataraです。

「並行処理」って言葉はなんだか魅力的ですよね! そこで、今日はCoroを使った並行処理を紹介します。なお、Coroはコアモジュールではありませんので、使ってみたい場合にはCPANからインストールして下さい。

スレッドを作る

Coroでは、asyncによって新しいスレッドを作ることができます。ただし、asyncでスレッドを作っても、何もしなければ他のスレッドに勝手に処理が移ることはありません。asyncで作成した別のスレッドに制御を移すには、明示的な操作が必要となります。ここではcedeを使って、asyncブロックへ処理を移しましょう。

use strict;
use warnings;
use Coro;

async {
    print "Another thread\n";
};

print "main thread\n";
cede;
print "main thread, again.\n";

__END__

[実行結果]
main thread
Another thread
main thread, again.

cedeを呼んだタイミングでasyncブロックに制御が移り、ブロックを抜けた後にcedeの次の行に制御が戻って来ています。

並行処理させる

次に、いくつかのスレッドを並行処理をさせてみましょう。Coroでの並行処理は、あるスレッドの待ち時間に他のスレッドが動いてくれる、ってのがミソとなります。ここではCoro::Timer::sleepを使ってその原理を見てみます。

use strict;
use warnings;
use Coro;
use Coro::Timer;

my @coros;
for my $sec (1 .. 3){
    push @coros, async {
        print "wait $sec sec.\n";
        Coro::Timer::sleep $sec;
        print "after $sec sec.\n";
    } ;
}
$_->join for @coros;

__END__

[実行結果]
wait 1 sec.
wait 2 sec.
wait 3 sec.
after 1 sec.
after 2 sec.
after 3 sec.

1秒かかる処理、2秒かかる処理、3秒かかる処理を実行してますので、並行処理でなければ6秒かかるはずです。しかし、Coro::Timer::sleepを使っているため、これらのスレッドは並行動作し、このプログラムは3秒で終了します。

実例: URLのダウンロード

sleepするだけでは面白くないので、もう少し具体的な例を見ておきましょう。Coro::LWPを使うと、LWPがCoroを使ってなるべくブロックしないように動くようになります。言い換えると、LWPが、待ち時間になると他のスレッドに処理を移すことで並行動作するようになるということです。

use strict;
use warnings;
use Coro;
use Coro::LWP;
use LWP::UserAgent;

my @coros;
for my $url (
    'http://d.hatena.ne.jp/hiratara/',
    'http://www.google.co.jp/',
    'http://www.yahoo.co.jp/',
){
    push @coros, async {
        my $ua = LWP::UserAgent->new;
        print "load $url\n";
        my $res = $ua->get( $url );
        print "got $url: ", $res->title, "\n";
    };
}
$_->join for @coros;

__END__

[実行結果]
load http://d.hatena.ne.jp/hiratara/
load http://www.google.co.jp/
load http://www.yahoo.co.jp/
got http://www.google.co.jp/: Google
got http://www.yahoo.co.jp/: Yahoo! JAPAN
got http://d.hatena.ne.jp/hiratara/: a geek born in Tomakomai

全てのコンテンツの読み込みが同時にスタートし、並行して読み込みが行われていることがわかると思います。use Coro::LWPの部分をコメントにして動かした場合と結果を比較すると、より並行動作していることがはっきりするでしょう。

なお、非同期でHTTPアクセスを行うモジュールとしては、Coroの作者が作ったAnyEvent::HTTPというモジュールもあり、こちらのほうがよいとされています。ただし、Coro::LWPを使うと、LWPを使ったMechanizeやWeb::Scraperもそのまま動くという利点もありますので、ケースバイケースで選択するのがよいと思います。

実行の制御

Coroのスレッドは、scheduleを呼ぶと眠り、readyで起こされると再び動き始めます。この働きをそのまま利用して実行の順番を調整することもできますが、これは大変です。例えば、以下のコードでは、他のスレッドに勝手にメインスレッドが起こされてしまっており、Doneの出力処理が実行されていません。

async {
	# The evil thread
	$Coro::main->ready;
};

my $done = $Coro::current;

async {
    print "Please wait.\n";
    cede;
    print "Done\n";
    $done->ready;
};

print "main thread\n";
schedule;
print "main thread, again.\n";

__END__
[実行結果]
main thread
Please wait.
main thread, again.

このような場合、Coroが用意しているロックの機構を利用し、実行の順序を明示的に指示する必要があります。例えば、Signalを使うと、この処理は以下のように書けます。

async {
	# The evil thread
	$Coro::main->ready;
};

my $done = Coro::Signal->new;

async {
    print "Please wait.\n";
    cede;
    print "Done\n";
    $done->send;
};

print "main thread\n";
$done->wait;
print "main thread, again.\n";

__END__
[実行結果]
main thread
Please wait.
Done
main thread, again.

waitメソッドは、sendが呼ばれるまでブロックします。そして、asyncブロックからsendを送ったタイミングで、メインスレッドのロックが解除されて再び動き出します。

また、カウンター付きのロックが必要な場合は、Semaphoreを使います。Signalのwaitの代わりに、downを使ってブロックさせることができます。downは文字通りカウンタの値を1減らすメソッドですが、カウンタが0以下の時はブロックして正の値になるのを待ちます。そして、ブロックしているSemaphoreを起こすにはupを呼びます。

以下の例では負(正確には0以下)の値のセマフォを作り、3つのプロセスが全て終了するのを待っています。

my $num = 3;
my $semaphore = Coro::Semaphore->new( 1 - $num );

for my $i ( 1 .. $num ) {
    async {
        Coro::Timer::sleep $i;
        print "After $i sec.\n";
        $semaphore->up;
    } ;
}

$semaphore->down;
print "Finished\n";

ここでAnyEventのcondvarに慣れている方であれば、beginとendのようなことをしたいと考えるかもしれませんが、downメソッドは0以下値の時はブロックするので、beginの代わりには使えません。この場合、負の値でもブロックしないadjustメソッドを使うことができます。

my $semaphore = Coro::Semaphore->new;

for my $i ( 1 .. 3 ) {
    $semaphore->adjust( -1 );
    async {
        Coro::Timer::sleep $i;
        print "After $i sec.\n";
        $semaphore->up;
    } ;
}

$semaphore->down;
print "Finished\n";

これとは逆に正の値のセマフォは、同時に走る処理の数を制御するのに便利です。以下の例では、2個しかないリソースを10個のスレッドが奪い合っていますが、セマフォによってリソースに同時に触れるスレッドを制限しています。

my $lock = Coro::Semaphore->new( 2 );
my @resources = ( 'A', 'B' );

for (1 .. 10) {
    async {
        $lock->down;
        my $resource = shift @resources or die "Depleted energy source.";

        print "Got $resource.\n";
        Coro::Timer::sleep 1;  # Using the resource

        print "Finished using $resource.\n";
        push @resources, $resource;
        $lock->up;
    };
}

schedule;
# Never end.

なお、この処理はguardメソッドを使って以下のように書くこともできます。

    async {
        my $guard = $lock->guard;
        my $resource = shift @resources or die "Depleted energy source.";

        print "Got $resource.\n";
        Coro::Timer::sleep 1;  # Using the resource

        print "Finished using $resource.\n";
        push @resources, $resource;
    };

この書き方をすると、セマフォのupのし忘れを防ぐことができます。ただしこの場合でも、upは確実に実行されてもリソースの解放処理が必ず走るわけではないので、Guardクラスを使って適切なガードを書く方が無難かもしれません。

スレッド間でのデータのやりとり

スレッドが、処理した結果を返すにはterminateを使います。そして、その結果を受け取るにはjoinを使います。

my $coro = async {
	terminate "From the other thread";
};

print $coro->join, "\n";

__END__

[実行結果]
From the other thread

この例では明示的にterminateを呼んでいますが、asyncブロックは戻り値を渡してterminateを呼ぶように自動的にラップされるので、単にreturnで値を返してもjoinで値を受けることができます。

また、別のスレッドからcancelでスレッドをterminateさせることもできますが、この時に終了値を外のスレッドから渡すことが出来ます。

my $coro = async {
	print "Wait 10 sec.\n";
	Coro::Timer::sleep 10;
	terminate "From the other thread";
};

async {
	Coro::Timer::sleep 1;
	$coro->cancel( "Terminated" );
};

print $coro->join, "\n";

__END__

[実行結果]
Wait 10 sec.
Terminated

ただし、terminateとjoinでは、1つのスレッドから複数回値を返すことはできません。特にスレッドプールを使う場合には、スレッドが終了しないのでjoinが使えません。そこで、スレッド間でもっと自由にデータをやりとりする道具として、Coro::Channelがあります。Coro::Channelを使うと、簡単にスレッド間でデータのやり取りをすることができます。

my $ch = Coro::Channel->new;

async {
	$ch->put($_) for qw/one two three DAAAAA!/;
	$ch->shutdown;
};

while( my $got = $ch->get ){
	print $got, "\n";
}

__END__

[実行結果]
one
two
three
DAAAAA!

Coro::Channelの動作を、ブロッキングキューと見なすこともできます。デフォルトでは、大きさに制限のないキューとして振る舞います。つまり、putはブロックせず、getはキューが空の時だけブロックします。また、コンストラクタに値を与えることで、大きさ制限のあるキューとして利用することもできます。

スレッドへの割り込み

cancelで他のスレッドを終了させることができることを紹介しましたが、スレッドを終了せずに割り込みをかけることもできます。そのためには、throwを使います。

別のスレッドに対してthrowを呼ぶと、あたかもthrowを呼ばれたスレッドの中でdieが起こったように動作します。evalでブロックする処理を包んだ状態にしておけば、外からthrowで投げられた例外をキャッチすることができます。

use Try::Tiny;

my $done = Coro::Signal->new;

my $worker = async {
	# The worker
	try {
		while () {
			print "In the loop.\n";
			Coro::Timer::sleep 1;
		}
	}catch{
		if( /interrupted/ ){
			print "Interrupted by anyone.\n";
			$done->send;
		}else{
			die $_;
		}
	};
};

async {
	# An interrupter
	print "Wait 3 seconds.\n";
	Coro::Timer::sleep 3;
	print "Interrupt.\n";
	$worker->throw( 'interrupted' );
};

$done->wait;

__END__
[結果]
In the loop.
Wait 3 seconds.
In the loop.
In the loop.
Interrupt.
Interrupted by anyone.

その他の話題

このエントリでは、主にCoroの制御について説明しました。実際にCoroを使う場合は、主にIOの待ち時間を有効利用させることになります。Coroに対応したIOとしては、Coro::HandleCoro::SocketCoro::AIOなどがありますので、こちらを参照して下さい。

また、CoroはAnyEventに対応したモジュールにおいて、コールバックが戻ってくるまでの待ち時間を有効利用するためにも使えます。CoroとAnyEventを組み合わせて使う方法についてはこちらのエントリこちらのエントリも参照して下さい。

そして、たくさんのスレッドを扱う場合に有効な手段となるスレッドプールに関しては、こちらのエントリも参照して下さい。

まとめ

Coroでスレッドを作り、並行動作させることができます。I/O待ちなどによってブロックが多く発生するプログラムでは、Coroによって動作速度を改善できるでしょう。

明日はあのAcme大全の著者、makamaka_at_donzokoさんです。とても楽しみですね!

参考リンク