Perlでの並行・並列処理 - Parallel::ForkManager
大量のデータを処理する場合や、複数の独立したタスクを実行する場合、並列処理を活用することで処理時間を大幅に短縮できます。Parallel::ForkManager は、Perlで並列処理を実現する最も人気のあるモジュールの一つです。fork を使ったマルチプロセス処理を簡単に扱えるようにします。
並行処理と並列処理の違い
まず、用語の整理をしておきましょう:
- 並行処理(Concurrency): 複数のタスクを交互に実行し、同時に進行しているように見せる
- 並列処理(Parallelism): 複数のタスクを物理的に同時に実行する
Parallel::ForkManager は、fork を使って複数のプロセスを生成するため、真の並列処理を実現します。マルチコアCPUで複数のプロセスが本当に同時に実行されます。
Parallel::ForkManager の基本
インストール
1
| cpanm Parallel::ForkManager
|
最もシンプルな例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
| use Parallel::ForkManager;
use feature qw(say);
# 最大3つの子プロセスを並列実行
my $pm = Parallel::ForkManager->new(3);
for my $i (1..10) {
# 子プロセスを起動
$pm->start and next;
# ここから子プロセスの処理
say "プロセス $i 開始";
sleep 2; # 処理をシミュレート
say "プロセス $i 完了";
# 子プロセス終了
$pm->finish;
}
# すべての子プロセスが終了するまで待つ
$pm->wait_all_children;
say "全プロセス完了";
|
このコードは:
- 最大3つの子プロセスを同時実行
- 10個のタスクを順次処理
- 各タスクが2秒かかる
- 逐次処理なら20秒かかるが、並列処理で約8秒で完了
基本的な仕組み
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
| use Parallel::ForkManager;
use feature qw(say);
my $pm = Parallel::ForkManager->new(2); # 最大2プロセス
# データのリスト
my @tasks = (1..5);
for my $task (@tasks) {
# start() は親プロセスで子プロセスのPIDを返し、
# 子プロセスでは0を返す
my $pid = $pm->start;
if ($pid) {
# 親プロセス
say "親: タスク $task を子プロセス $pid で開始";
next; # 次のイテレーションへ
}
# 以下は子プロセスのみ実行
say "子: タスク $task を処理中(PID: $$)";
sleep 1;
# finish() を呼んで子プロセス終了
$pm->finish;
}
# すべての子プロセスの終了を待つ
$pm->wait_all_children;
say "親: すべてのタスク完了";
|
実用例
Webスクレイピング
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
| use Parallel::ForkManager;
use HTTP::Tiny;
use feature qw(say);
my $pm = Parallel::ForkManager->new(5); # 同時に5つまで
my @urls = (
'https://example.com/page1',
'https://example.com/page2',
'https://example.com/page3',
'https://example.com/page4',
'https://example.com/page5',
'https://example.com/page6',
);
for my $url (@urls) {
$pm->start and next;
# 各子プロセスでHTTPリクエスト
say "取得開始: $url (PID: $$)";
my $http = HTTP::Tiny->new(timeout => 10);
my $response = $http->get($url);
if ($response->{success}) {
my $length = length($response->{content});
say "完了: $url - ${length}バイト";
} else {
say "エラー: $url - $response->{status} $response->{reason}";
}
$pm->finish;
}
$pm->wait_all_children;
say "すべてのURLを取得しました";
|
バッチ処理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
| use Parallel::ForkManager;
use feature qw(say);
# 大量の画像ファイルをリサイズする例
my @images = glob('/path/to/images/*.jpg');
my $pm = Parallel::ForkManager->new(4); # CPUコア数に応じて調整
for my $image (@images) {
$pm->start and next;
say "処理中: $image";
# 画像処理をシミュレート
# 実際には ImageMagick などを使用
system("convert", $image, "-resize", "800x600", "resized_$image");
say "完了: $image";
$pm->finish;
}
$pm->wait_all_children;
say "すべての画像処理が完了しました";
|
ログファイルの並列解析
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
| use Parallel::ForkManager;
use feature qw(say);
my @log_files = glob('/var/log/app/*.log');
my $pm = Parallel::ForkManager->new(8);
my %results;
for my $log_file (@log_files) {
$pm->start and next;
say "解析中: $log_file";
open my $fh, '<', $log_file or die "ファイルが開けません: $!";
my $error_count = 0;
my $warning_count = 0;
while (my $line = <$fh>) {
$error_count++ if $line =~ /ERROR/;
$warning_count++ if $line =~ /WARN/;
}
close $fh;
say "$log_file: ERROR=$error_count, WARN=$warning_count";
$pm->finish;
}
$pm->wait_all_children;
say "ログ解析完了";
|
子プロセスからのデータ取得
子プロセスの処理結果を親プロセスに返すには、コールバックを使用します:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
| use Parallel::ForkManager;
use feature qw(say);
my $pm = Parallel::ForkManager->new(3);
# 子プロセスが終了したときのコールバック
$pm->run_on_finish(sub {
my ($pid, $exit_code, $ident, $exit_signal, $core_dump, $data) = @_;
if (defined $data) {
say "PID $pid の結果: $data->{result}";
}
});
for my $i (1..5) {
$pm->start and next;
# 処理をシミュレート
my $result = $i * $i;
sleep 1;
# データを親プロセスに渡す
$pm->finish(0, { result => $result });
}
$pm->wait_all_children;
say "すべての計算完了";
|
より複雑なデータの受け渡し
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
| use Parallel::ForkManager;
use feature qw(say);
my $pm = Parallel::ForkManager->new(4);
my %all_results;
$pm->run_on_finish(sub {
my ($pid, $exit_code, $ident, $exit_signal, $core_dump, $data) = @_;
if (defined $data) {
$all_results{$data->{file}} = {
lines => $data->{lines},
words => $data->{words},
chars => $data->{chars},
};
}
});
my @files = qw(file1.txt file2.txt file3.txt);
for my $file (@files) {
$pm->start and next;
# ファイルの統計情報を取得
open my $fh, '<', $file or die "Cannot open $file: $!";
my $lines = 0;
my $words = 0;
my $chars = 0;
while (my $line = <$fh>) {
$lines++;
$chars += length($line);
$words += scalar(split /\s+/, $line);
}
close $fh;
# 結果を返す
$pm->finish(0, {
file => $file,
lines => $lines,
words => $words,
chars => $chars,
});
}
$pm->wait_all_children;
# 結果を表示
say "=== ファイル統計 ===";
for my $file (keys %all_results) {
my $stats = $all_results{$file};
say "$file:";
say " 行数: $stats->{lines}";
say " 単語数: $stats->{words}";
say " 文字数: $stats->{chars}";
}
|
エラーハンドリング
基本的なエラー処理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
| use Parallel::ForkManager;
use Try::Tiny;
use feature qw(say);
my $pm = Parallel::ForkManager->new(3);
# エラー発生時のコールバック
$pm->run_on_finish(sub {
my ($pid, $exit_code) = @_;
if ($exit_code != 0) {
say "PID $pid がエラー終了しました(終了コード: $exit_code)";
}
});
for my $i (1..5) {
$pm->start and next;
try {
# エラーが起こる可能性のある処理
die "意図的なエラー" if $i == 3;
say "タスク $i 成功";
$pm->finish(0); # 成功
}
catch {
say "タスク $i でエラー: $_";
$pm->finish(1); # エラー終了
};
}
$pm->wait_all_children;
|
タイムアウト処理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
| use Parallel::ForkManager;
use feature qw(say);
my $pm = Parallel::ForkManager->new(3);
for my $i (1..5) {
$pm->start and next;
# タイムアウト設定(5秒)
local $SIG{ALRM} = sub { die "タイムアウト\n" };
alarm 5;
eval {
say "タスク $i 開始";
# 時間がかかる処理をシミュレート
sleep($i * 2);
say "タスク $i 完了";
alarm 0; # アラームをキャンセル
};
if ($@) {
if ($@ =~ /タイムアウト/) {
say "タスク $i がタイムアウトしました";
$pm->finish(1);
} else {
die $@;
}
}
$pm->finish(0);
}
$pm->wait_all_children;
|
パフォーマンス最適化
適切なプロセス数の設定
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
| use Parallel::ForkManager;
use Sys::Info;
use feature qw(say);
# CPU コア数を取得
my $info = Sys::Info->new;
my $cpu = $info->device('CPU');
my $cpu_count = $cpu->count || 4; # 取得できない場合は4
say "検出されたCPUコア数: $cpu_count";
# CPUバウンドな処理の場合はコア数と同じ
my $pm = Parallel::ForkManager->new($cpu_count);
# I/Oバウンドな処理の場合はコア数の2倍程度
# my $pm = Parallel::ForkManager->new($cpu_count * 2);
# 処理...
|
データの分割方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
| use Parallel::ForkManager;
use List::Util qw(sum);
use feature qw(say);
# 大量のデータを効率的に処理
my @data = (1..10000);
my $chunk_size = 100;
my $pm = Parallel::ForkManager->new(4);
my @results;
$pm->run_on_finish(sub {
my ($pid, $exit_code, $ident, $exit_signal, $core_dump, $data) = @_;
push @results, @{$data->{results}} if $data;
});
# データをチャンクに分割して処理
for (my $i = 0; $i < @data; $i += $chunk_size) {
$pm->start and next;
my $end = $i + $chunk_size - 1;
$end = $#data if $end > $#data;
my @chunk = @data[$i..$end];
# チャンク内の各要素を処理
my @chunk_results = map { $_ * 2 } @chunk;
$pm->finish(0, { results => \@chunk_results });
}
$pm->wait_all_children;
say "処理結果の総数: ", scalar(@results);
say "合計: ", sum(@results);
|
メモリ使用量の最適化
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
| use Parallel::ForkManager;
use feature qw(say);
my $pm = Parallel::ForkManager->new(4);
# 大量のデータを扱う場合は、親プロセスでデータを保持せず、
# 子プロセスで必要なデータのみを読み込む
my @file_list = glob('/large/dataset/*.dat');
for my $file (@file_list) {
$pm->start and next;
# 子プロセスでファイルを読み込む
open my $fh, '<', $file or die "Cannot open $file: $!";
# 処理(一度に全部メモリに読み込まない)
while (my $line = <$fh>) {
# 行ごとに処理
process_line($line);
}
close $fh;
$pm->finish;
}
$pm->wait_all_children;
sub process_line {
my ($line) = @_;
# 処理...
}
|
データベース処理での注意点
fork を使う際、データベース接続には注意が必要です:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
| use Parallel::ForkManager;
use DBI;
use feature qw(say);
my $pm = Parallel::ForkManager->new(4);
# 親プロセスでの接続は作らない
# my $dbh = DBI->connect(...); # これはNG!
my @tasks = (1..10);
for my $task (@tasks) {
$pm->start and next;
# 各子プロセスで接続を作成
my $dbh = DBI->connect(
'dbi:SQLite:dbname=test.db',
'', '',
{ RaiseError => 1, AutoCommit => 1 }
);
# データベース操作
my $sth = $dbh->prepare('SELECT * FROM users WHERE id = ?');
$sth->execute($task);
my $user = $sth->fetchrow_hashref;
say "タスク $task: ユーザー $user->{name}";
# 接続を閉じる
$dbh->disconnect;
$pm->finish;
}
$pm->wait_all_children;
|
実用的な完全例:URLリストのダウンロードと処理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
| use Parallel::ForkManager;
use HTTP::Tiny;
use Try::Tiny;
use feature qw(say);
use Time::HiRes qw(time);
my $start_time = time;
# 設定
my $max_processes = 10;
my $timeout = 30;
# 処理するURLリスト
my @urls = (
'https://example.com/api/data/1',
'https://example.com/api/data/2',
# ... 多数のURL
);
my $pm = Parallel::ForkManager->new($max_processes);
my %results;
my $success_count = 0;
my $error_count = 0;
# 結果収集のコールバック
$pm->run_on_finish(sub {
my ($pid, $exit_code, $ident, $exit_signal, $core_dump, $data) = @_;
return unless defined $data;
$results{$data->{url}} = $data;
if ($data->{success}) {
$success_count++;
say "成功: $data->{url} (${data->{size}}バイト)";
} else {
$error_count++;
say "失敗: $data->{url} - $data->{error}";
}
});
# 各URLを並列処理
for my $url (@urls) {
$pm->start and next;
my $result = {
url => $url,
success => 0,
};
try {
my $http = HTTP::Tiny->new(timeout => $timeout);
my $response = $http->get($url);
if ($response->{success}) {
$result->{success} = 1;
$result->{size} = length($response->{content});
$result->{status} = $response->{status};
# ここでダウンロードしたコンテンツを処理
# process_content($response->{content});
} else {
$result->{error} = "$response->{status} $response->{reason}";
}
}
catch {
$result->{error} = $_;
};
$pm->finish(0, $result);
}
# すべての子プロセスの終了を待つ
$pm->wait_all_children;
my $elapsed = time - $start_time;
# 結果のサマリー
say "\n=== 処理完了 ===";
say "総URL数: ", scalar(@urls);
say "成功: $success_count";
say "失敗: $error_count";
say sprintf("処理時間: %.2f秒", $elapsed);
say sprintf("平均: %.2f秒/URL", $elapsed / @urls);
|
まとめ
Parallel::ForkManager を使うことで、Perlで簡単に並列処理を実装できます。
重要なポイント:
- 適切なプロセス数 - CPUコア数を考慮(CPU処理)、I/O待ちならより多く
- データの受け渡し -
run_on_finish コールバックを活用 - エラー処理 - 各子プロセスで適切にエラーをハンドル
- リソース管理 - データベース接続などは子プロセスで作成
- メモリ効率 - 大量データは親プロセスで保持しない
よくあるユースケース:
- Webスクレイピング
- 画像/動画の一括変換
- ログファイルの解析
- バッチ処理
- APIリクエストの並列実行
並列処理を適切に使うことで、処理時間を劇的に短縮できます。ただし、オーバーヘッドもあるため、小規模なタスクでは逐次処理の方が速い場合もあります。ベンチマークを取って、実際の効果を確認することが重要です。