Featured image of post Perlでの並行・並列処理 — Parallel::ForkManager

Perlでの並行・並列処理 — Parallel::ForkManager

Parallel::ForkManager を使った Perl の並行・並列処理の基本と実用例を解説します。

Perlでの並行・並列処理 - Parallel::ForkManager

大量のデータを処理する場合や、複数の独立したタスクを実行する場合、並列処理を活用することで処理時間を大幅に短縮できます。Parallel::ForkManager は、Perlで並列処理を実現する最も人気のあるモジュールの一つです。fork を使ったマルチプロセス処理を簡単に扱えるようにします。

並行処理と並列処理の違い

まず、用語の整理をしておきましょう:

  • 並行処理(Concurrency): 複数のタスクを交互に実行し、同時に進行しているように見せる
  • 並列処理(Parallelism): 複数のタスクを物理的に同時に実行する

Parallel::ForkManager は、fork を使って複数のプロセスを生成するため、真の並列処理を実現します。マルチコアCPUで複数のプロセスが本当に同時に実行されます。

Parallel::ForkManager の基本

インストール

1
cpanm Parallel::ForkManager

最もシンプルな例

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
use Parallel::ForkManager;
use feature qw(say);

# 最大3つの子プロセスを並列実行
my $pm = Parallel::ForkManager->new(3);

for my $i (1..10) {
    # 子プロセスを起動
    $pm->start and next;
    
    # ここから子プロセスの処理
    say "プロセス $i 開始";
    sleep 2;  # 処理をシミュレート
    say "プロセス $i 完了";
    
    # 子プロセス終了
    $pm->finish;
}

# すべての子プロセスが終了するまで待つ
$pm->wait_all_children;

say "全プロセス完了";

このコードは:

  1. 最大3つの子プロセスを同時実行
  2. 10個のタスクを順次処理
  3. 各タスクが2秒かかる
  4. 逐次処理なら20秒かかるが、並列処理で約8秒で完了

基本的な仕組み

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
use Parallel::ForkManager;
use feature qw(say);

my $pm = Parallel::ForkManager->new(2);  # 最大2プロセス

# データのリスト
my @tasks = (1..5);

for my $task (@tasks) {
    # start() は親プロセスで子プロセスのPIDを返し、
    # 子プロセスでは0を返す
    my $pid = $pm->start;
    
    if ($pid) {
        # 親プロセス
        say "親: タスク $task を子プロセス $pid で開始";
        next;  # 次のイテレーションへ
    }
    
    # 以下は子プロセスのみ実行
    say "子: タスク $task を処理中(PID: $$)";
    sleep 1;
    
    # finish() を呼んで子プロセス終了
    $pm->finish;
}

# すべての子プロセスの終了を待つ
$pm->wait_all_children;
say "親: すべてのタスク完了";

実用例

Webスクレイピング

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
use Parallel::ForkManager;
use HTTP::Tiny;
use feature qw(say);

my $pm = Parallel::ForkManager->new(5);  # 同時に5つまで

my @urls = (
    'https://example.com/page1',
    'https://example.com/page2',
    'https://example.com/page3',
    'https://example.com/page4',
    'https://example.com/page5',
    'https://example.com/page6',
);

for my $url (@urls) {
    $pm->start and next;
    
    # 各子プロセスでHTTPリクエスト
    say "取得開始: $url (PID: $$)";
    
    my $http = HTTP::Tiny->new(timeout => 10);
    my $response = $http->get($url);
    
    if ($response->{success}) {
        my $length = length($response->{content});
        say "完了: $url - ${length}バイト";
    } else {
        say "エラー: $url - $response->{status} $response->{reason}";
    }
    
    $pm->finish;
}

$pm->wait_all_children;
say "すべてのURLを取得しました";

バッチ処理

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
use Parallel::ForkManager;
use feature qw(say);

# 大量の画像ファイルをリサイズする例
my @images = glob('/path/to/images/*.jpg');
my $pm = Parallel::ForkManager->new(4);  # CPUコア数に応じて調整

for my $image (@images) {
    $pm->start and next;
    
    say "処理中: $image";
    
    # 画像処理をシミュレート
    # 実際には ImageMagick などを使用
    system("convert", $image, "-resize", "800x600", "resized_$image");
    
    say "完了: $image";
    
    $pm->finish;
}

$pm->wait_all_children;
say "すべての画像処理が完了しました";

ログファイルの並列解析

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
use Parallel::ForkManager;
use feature qw(say);

my @log_files = glob('/var/log/app/*.log');
my $pm = Parallel::ForkManager->new(8);

my %results;

for my $log_file (@log_files) {
    $pm->start and next;
    
    say "解析中: $log_file";
    
    open my $fh, '<', $log_file or die "ファイルが開けません: $!";
    
    my $error_count = 0;
    my $warning_count = 0;
    
    while (my $line = <$fh>) {
        $error_count++   if $line =~ /ERROR/;
        $warning_count++ if $line =~ /WARN/;
    }
    
    close $fh;
    
    say "$log_file: ERROR=$error_count, WARN=$warning_count";
    
    $pm->finish;
}

$pm->wait_all_children;
say "ログ解析完了";

子プロセスからのデータ取得

子プロセスの処理結果を親プロセスに返すには、コールバックを使用します:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
use Parallel::ForkManager;
use feature qw(say);

my $pm = Parallel::ForkManager->new(3);

# 子プロセスが終了したときのコールバック
$pm->run_on_finish(sub {
    my ($pid, $exit_code, $ident, $exit_signal, $core_dump, $data) = @_;
    
    if (defined $data) {
        say "PID $pid の結果: $data->{result}";
    }
});

for my $i (1..5) {
    $pm->start and next;
    
    # 処理をシミュレート
    my $result = $i * $i;
    sleep 1;
    
    # データを親プロセスに渡す
    $pm->finish(0, { result => $result });
}

$pm->wait_all_children;
say "すべての計算完了";

より複雑なデータの受け渡し

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
use Parallel::ForkManager;
use feature qw(say);

my $pm = Parallel::ForkManager->new(4);

my %all_results;

$pm->run_on_finish(sub {
    my ($pid, $exit_code, $ident, $exit_signal, $core_dump, $data) = @_;
    
    if (defined $data) {
        $all_results{$data->{file}} = {
            lines => $data->{lines},
            words => $data->{words},
            chars => $data->{chars},
        };
    }
});

my @files = qw(file1.txt file2.txt file3.txt);

for my $file (@files) {
    $pm->start and next;
    
    # ファイルの統計情報を取得
    open my $fh, '<', $file or die "Cannot open $file: $!";
    
    my $lines = 0;
    my $words = 0;
    my $chars = 0;
    
    while (my $line = <$fh>) {
        $lines++;
        $chars += length($line);
        $words += scalar(split /\s+/, $line);
    }
    
    close $fh;
    
    # 結果を返す
    $pm->finish(0, {
        file  => $file,
        lines => $lines,
        words => $words,
        chars => $chars,
    });
}

$pm->wait_all_children;

# 結果を表示
say "=== ファイル統計 ===";
for my $file (keys %all_results) {
    my $stats = $all_results{$file};
    say "$file:";
    say "  行数: $stats->{lines}";
    say "  単語数: $stats->{words}";
    say "  文字数: $stats->{chars}";
}

エラーハンドリング

基本的なエラー処理

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
use Parallel::ForkManager;
use Try::Tiny;
use feature qw(say);

my $pm = Parallel::ForkManager->new(3);

# エラー発生時のコールバック
$pm->run_on_finish(sub {
    my ($pid, $exit_code) = @_;
    
    if ($exit_code != 0) {
        say "PID $pid がエラー終了しました(終了コード: $exit_code)";
    }
});

for my $i (1..5) {
    $pm->start and next;
    
    try {
        # エラーが起こる可能性のある処理
        die "意図的なエラー" if $i == 3;
        
        say "タスク $i 成功";
        $pm->finish(0);  # 成功
    }
    catch {
        say "タスク $i でエラー: $_";
        $pm->finish(1);  # エラー終了
    };
}

$pm->wait_all_children;

タイムアウト処理

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
use Parallel::ForkManager;
use feature qw(say);

my $pm = Parallel::ForkManager->new(3);

for my $i (1..5) {
    $pm->start and next;
    
    # タイムアウト設定(5秒)
    local $SIG{ALRM} = sub { die "タイムアウト\n" };
    alarm 5;
    
    eval {
        say "タスク $i 開始";
        
        # 時間がかかる処理をシミュレート
        sleep($i * 2);
        
        say "タスク $i 完了";
        alarm 0;  # アラームをキャンセル
    };
    
    if ($@) {
        if ($@ =~ /タイムアウト/) {
            say "タスク $i がタイムアウトしました";
            $pm->finish(1);
        } else {
            die $@;
        }
    }
    
    $pm->finish(0);
}

$pm->wait_all_children;

パフォーマンス最適化

適切なプロセス数の設定

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
use Parallel::ForkManager;
use Sys::Info;
use feature qw(say);

# CPU コア数を取得
my $info = Sys::Info->new;
my $cpu = $info->device('CPU');
my $cpu_count = $cpu->count || 4;  # 取得できない場合は4

say "検出されたCPUコア数: $cpu_count";

# CPUバウンドな処理の場合はコア数と同じ
my $pm = Parallel::ForkManager->new($cpu_count);

# I/Oバウンドな処理の場合はコア数の2倍程度
# my $pm = Parallel::ForkManager->new($cpu_count * 2);

# 処理...

データの分割方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
use Parallel::ForkManager;
use List::Util qw(sum);
use feature qw(say);

# 大量のデータを効率的に処理
my @data = (1..10000);
my $chunk_size = 100;
my $pm = Parallel::ForkManager->new(4);

my @results;

$pm->run_on_finish(sub {
    my ($pid, $exit_code, $ident, $exit_signal, $core_dump, $data) = @_;
    push @results, @{$data->{results}} if $data;
});

# データをチャンクに分割して処理
for (my $i = 0; $i < @data; $i += $chunk_size) {
    $pm->start and next;
    
    my $end = $i + $chunk_size - 1;
    $end = $#data if $end > $#data;
    
    my @chunk = @data[$i..$end];
    
    # チャンク内の各要素を処理
    my @chunk_results = map { $_ * 2 } @chunk;
    
    $pm->finish(0, { results => \@chunk_results });
}

$pm->wait_all_children;

say "処理結果の総数: ", scalar(@results);
say "合計: ", sum(@results);

メモリ使用量の最適化

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
use Parallel::ForkManager;
use feature qw(say);

my $pm = Parallel::ForkManager->new(4);

# 大量のデータを扱う場合は、親プロセスでデータを保持せず、
# 子プロセスで必要なデータのみを読み込む

my @file_list = glob('/large/dataset/*.dat');

for my $file (@file_list) {
    $pm->start and next;
    
    # 子プロセスでファイルを読み込む
    open my $fh, '<', $file or die "Cannot open $file: $!";
    
    # 処理(一度に全部メモリに読み込まない)
    while (my $line = <$fh>) {
        # 行ごとに処理
        process_line($line);
    }
    
    close $fh;
    
    $pm->finish;
}

$pm->wait_all_children;

sub process_line {
    my ($line) = @_;
    # 処理...
}

データベース処理での注意点

fork を使う際、データベース接続には注意が必要です:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
use Parallel::ForkManager;
use DBI;
use feature qw(say);

my $pm = Parallel::ForkManager->new(4);

# 親プロセスでの接続は作らない
# my $dbh = DBI->connect(...);  # これはNG!

my @tasks = (1..10);

for my $task (@tasks) {
    $pm->start and next;
    
    # 各子プロセスで接続を作成
    my $dbh = DBI->connect(
        'dbi:SQLite:dbname=test.db',
        '', '',
        { RaiseError => 1, AutoCommit => 1 }
    );
    
    # データベース操作
    my $sth = $dbh->prepare('SELECT * FROM users WHERE id = ?');
    $sth->execute($task);
    my $user = $sth->fetchrow_hashref;
    
    say "タスク $task: ユーザー $user->{name}";
    
    # 接続を閉じる
    $dbh->disconnect;
    
    $pm->finish;
}

$pm->wait_all_children;

実用的な完全例:URLリストのダウンロードと処理

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
use Parallel::ForkManager;
use HTTP::Tiny;
use Try::Tiny;
use feature qw(say);
use Time::HiRes qw(time);

my $start_time = time;

# 設定
my $max_processes = 10;
my $timeout = 30;

# 処理するURLリスト
my @urls = (
    'https://example.com/api/data/1',
    'https://example.com/api/data/2',
    # ... 多数のURL
);

my $pm = Parallel::ForkManager->new($max_processes);

my %results;
my $success_count = 0;
my $error_count = 0;

# 結果収集のコールバック
$pm->run_on_finish(sub {
    my ($pid, $exit_code, $ident, $exit_signal, $core_dump, $data) = @_;
    
    return unless defined $data;
    
    $results{$data->{url}} = $data;
    
    if ($data->{success}) {
        $success_count++;
        say "成功: $data->{url} (${data->{size}}バイト)";
    } else {
        $error_count++;
        say "失敗: $data->{url} - $data->{error}";
    }
});

# 各URLを並列処理
for my $url (@urls) {
    $pm->start and next;
    
    my $result = {
        url     => $url,
        success => 0,
    };
    
    try {
        my $http = HTTP::Tiny->new(timeout => $timeout);
        my $response = $http->get($url);
        
        if ($response->{success}) {
            $result->{success} = 1;
            $result->{size} = length($response->{content});
            $result->{status} = $response->{status};
            
            # ここでダウンロードしたコンテンツを処理
            # process_content($response->{content});
        } else {
            $result->{error} = "$response->{status} $response->{reason}";
        }
    }
    catch {
        $result->{error} = $_;
    };
    
    $pm->finish(0, $result);
}

# すべての子プロセスの終了を待つ
$pm->wait_all_children;

my $elapsed = time - $start_time;

# 結果のサマリー
say "\n=== 処理完了 ===";
say "総URL数: ", scalar(@urls);
say "成功: $success_count";
say "失敗: $error_count";
say sprintf("処理時間: %.2f秒", $elapsed);
say sprintf("平均: %.2f秒/URL", $elapsed / @urls);

まとめ

Parallel::ForkManager を使うことで、Perlで簡単に並列処理を実装できます。

重要なポイント

  1. 適切なプロセス数 - CPUコア数を考慮(CPU処理)、I/O待ちならより多く
  2. データの受け渡し - run_on_finish コールバックを活用
  3. エラー処理 - 各子プロセスで適切にエラーをハンドル
  4. リソース管理 - データベース接続などは子プロセスで作成
  5. メモリ効率 - 大量データは親プロセスで保持しない

よくあるユースケース

  • Webスクレイピング
  • 画像/動画の一括変換
  • ログファイルの解析
  • バッチ処理
  • APIリクエストの並列実行

並列処理を適切に使うことで、処理時間を劇的に短縮できます。ただし、オーバーヘッドもあるため、小規模なタスクでは逐次処理の方が速い場合もあります。ベンチマークを取って、実際の効果を確認することが重要です。

comments powered by Disqus
Hugo で構築されています。
テーマ StackJimmy によって設計されています。