Perlでの非同期処理 - IO::Async と Mojo::IOLoop
非同期プログラミングは、I/O待ちの時間を有効活用して、複数の処理を並行して扱う技術です。Perlには優れた非同期処理のライブラリが存在し、その中でもIO::AsyncとMojo::IOLoopが代表的です。
イベント駆動プログラミングの基本
イベント駆動プログラミングでは、処理をブロックせずにイベントループでイベントを待ち受けます。これにより、I/O待ちの間に他の処理を実行できます。
1
2
3
4
5
6
7
8
|
# 同期的な処理(ブロッキング)
my $result1 = fetch_data($url1); # 完了まで待つ
my $result2 = fetch_data($url2); # result1完了後に実行
# 非同期処理(ノンブロッキング)
fetch_data_async($url1, sub { my $result1 = shift; });
fetch_data_async($url2, sub { my $result2 = shift; });
# 両方同時に実行される
|
IO::Async の使い方
IO::Asyncは、Future/Promiseベースの非同期フレームワークです。
基本的な使い方
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
use IO::Async::Loop;
use IO::Async::Timer::Countdown;
my $loop = IO::Async::Loop->new;
# タイマーの作成
my $timer = IO::Async::Timer::Countdown->new(
delay => 3,
on_expire => sub {
print "3秒経過しました\n";
$loop->stop;
},
);
$loop->add($timer);
$timer->start;
print "タイマー開始\n";
$loop->run;
|
複数のタイマーを同時実行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
use IO::Async::Loop;
use IO::Async::Timer::Countdown;
my $loop = IO::Async::Loop->new;
my @timers;
for my $delay (1, 2, 3) {
my $timer = IO::Async::Timer::Countdown->new(
delay => $delay,
on_expire => sub {
print "${delay}秒のタイマーが完了\n";
},
);
$loop->add($timer);
$timer->start;
push @timers, $timer;
}
# 3秒後にループを停止
my $stop_timer = IO::Async::Timer::Countdown->new(
delay => 3.5,
on_expire => sub { $loop->stop },
);
$loop->add($stop_timer);
$stop_timer->start;
$loop->run;
|
IO::Async での非同期HTTP
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
use IO::Async::Loop;
use Net::Async::HTTP;
use Future::Utils qw(fmap_void);
my $loop = IO::Async::Loop->new;
my $http = Net::Async::HTTP->new;
$loop->add($http);
my @urls = (
'https://www.perl.org/',
'https://metacpan.org/',
'https://perldoc.perl.org/',
);
# 並行して複数URLを取得
my @futures = map {
my $url = $_;
$http->GET($url)->then(sub {
my $response = shift;
printf "%s: %d bytes\n", $url, length($response->content);
Future->done;
})->catch(sub {
my $error = shift;
warn "Failed to fetch $url: $error\n";
Future->done;
});
} @urls;
Future->wait_all(@futures)->get;
|
IO::Async::Process でプロセス実行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
use IO::Async::Loop;
use IO::Async::Process;
my $loop = IO::Async::Loop->new;
my $process = IO::Async::Process->new(
command => ['perl', '-e', 'print "Hello from child\n"; sleep 2; print "Done\n"'],
stdout => {
on_read => sub {
my ($stream, $buffref) = @_;
print "Child: $$buffref";
$$buffref = '';
return 0;
},
},
on_finish => sub {
my ($self, $exitcode) = @_;
print "Child process finished with exit code $exitcode\n";
$loop->stop;
},
);
$loop->add($process);
print "Starting child process...\n";
$loop->run;
|
Mojo::IOLoop の活用
Mojo::IOLoopは、Mojoliciousフレームワークの一部として提供される非同期イベントループです。
基本的なタイマー
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
use Mojo::IOLoop;
# 2秒後に実行
Mojo::IOLoop->timer(2 => sub {
my $loop = shift;
say '2秒経過しました';
});
# 1秒ごとに実行(5回まで)
my $count = 0;
my $id = Mojo::IOLoop->recurring(1 => sub {
my $loop = shift;
$count++;
say "Tick $count";
$loop->remove($id) if $count >= 5;
});
# ループ開始
Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
|
Mojo::IOLoop での非同期HTTP
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
use Mojo::UserAgent;
use Mojo::IOLoop;
my $ua = Mojo::UserAgent->new;
my @urls = qw(
https://www.perl.org/
https://metacpan.org/
https://perldoc.perl.org/
);
my $count = 0;
for my $url (@urls) {
$ua->get($url => sub {
my ($ua, $tx) = @_;
if (my $res = $tx->success) {
printf "%s: %d bytes\n", $url, length($res->body);
} else {
my $err = $tx->error;
warn "Failed to fetch $url: $err->{message}\n";
}
$count++;
Mojo::IOLoop->stop if $count == @urls;
});
}
Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
|
WebSocketの非同期処理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
use Mojo::UserAgent;
use Mojo::IOLoop;
my $ua = Mojo::UserAgent->new;
# WebSocket接続(エコーサーバー)
$ua->websocket('wss://echo.websocket.org/' => sub {
my ($ua, $tx) = @_;
unless ($tx->is_websocket) {
say 'WebSocket handshake failed!';
return;
}
say 'WebSocket connected';
# メッセージ受信時
$tx->on(message => sub {
my ($tx, $msg) = @_;
say "Received: $msg";
$tx->finish;
});
# メッセージ送信
$tx->send('Hello WebSocket!');
});
Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
|
実用例: 並行ファイルダウンロード
IO::AsyncとNet::Async::HTTPを使った並行ダウンロード:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
use IO::Async::Loop;
use Net::Async::HTTP;
use Path::Tiny;
use Future::Utils qw(fmap_void);
my $loop = IO::Async::Loop->new;
my $http = Net::Async::HTTP->new(max_connections_per_host => 4);
$loop->add($http);
sub download_file {
my ($url, $output_path) = @_;
$http->GET($url)->then(sub {
my $response = shift;
path($output_path)->spew_raw($response->content);
printf "Downloaded: %s (%d bytes)\n", $url, length($response->content);
Future->done;
})->catch(sub {
my $error = shift;
warn "Failed to download $url: $error\n";
Future->done;
});
}
my %downloads = (
'https://www.perl.org/favicon.ico' => '/tmp/perl.ico',
'https://metacpan.org/favicon.ico' => '/tmp/metacpan.ico',
);
my @futures = map {
download_file($_, $downloads{$_})
} keys %downloads;
Future->wait_all(@futures)->get;
|
パフォーマンス比較
同期処理と非同期処理の実行時間の違い:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
use Time::HiRes qw(time);
use Mojo::UserAgent;
my $ua = Mojo::UserAgent->new;
my @urls = ('https://www.perl.org/') x 5;
# 同期処理
my $start = time;
for my $url (@urls) {
$ua->get($url)->result;
}
printf "Synchronous: %.2f seconds\n", time - $start;
# 非同期処理
$start = time;
my $count = 0;
for my $url (@urls) {
$ua->get($url => sub {
$count++;
Mojo::IOLoop->stop if $count == @urls;
});
}
Mojo::IOLoop->start;
printf "Asynchronous: %.2f seconds\n", time - $start;
|
まとめ
- IO::Async: Future/Promiseベースで、複雑な非同期フローを扱いやすい
- Mojo::IOLoop: シンプルで直感的、Mojoliciousと統合されている
- 非同期処理により、I/O待ちを有効活用して並行処理が可能
- 適切に使えば、パフォーマンスが大幅に向上する
どちらのライブラリも優れていますが、既にMojoliciousを使っている場合はMojo::IOLoop、より汎用的な非同期処理が必要な場合はIO::Asyncが適しています。