Featured image of post 集約処理:行数カウント・統計情報 - Perl/Mooでテキスト処理パイプライン

集約処理:行数カウント・統計情報 - Perl/Mooでテキスト処理パイプライン

パイプラインにAggregator(集約処理)を追加し、行数カウントや統計情報を出力できるようにします。CountFilterやStatsFilterの実装を通して、データ集計のパターンを学びます。

@nqounetです。

前回は、PipelineBuilderを導入して直感的なパイプライン構築を実現しました。今回は、これまでとは少し異なる種類の処理、集約処理(Aggregator)をパイプラインに追加します。

このシリーズについて

集約処理とは

これまで作成したフィルターは、入力行を加工して出力行として返すものでした。

  • GrepFilter: 条件に合う行を抽出する
  • SortFilter: 行を並び替える
  • UniqFilter: 重複する行を除去する

これらは「行を処理して行を返す」という点で共通しています。

一方、Unixには wc -l(行数カウント)や awk の統計機能のように、複数の行を集約して別の情報を生成するコマンドもあります。

1
2
3
4
5
# 行数をカウント
cat access.log | grep ERROR | wc -l

# 各行の出現回数をカウント
cat access.log | sort | uniq -c

今回は、このような集約処理をパイプラインに追加します。

CountFilterの実装

データ集約の概念図:複数のストリームがファネルを通って統計に変換される

まずは行数をカウントするCountFilterを作成します。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
package CountFilter;
use Moo;
use experimental qw(signatures);
extends 'Filter';

sub apply ($self, $lines) {
    my $count = scalar @$lines;
    return ["$count lines"];
}

1;

このフィルターは入力行の数をカウントし、結果を1行の文字列として返します。

StatsFilterの実装

次に、各行の出現回数をカウントするStatsFilterを作成します。これはUnixの sort | uniq -c と同等の機能です。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
package StatsFilter;
use Moo;
use experimental qw(signatures);
extends 'Filter';

sub apply ($self, $lines) {
    my %count;
    $count{$_}++ for @$lines;
    
    my @result;
    for my $line (sort { $count{$b} <=> $count{$a} } keys %count) {
        push @result, sprintf("%4d %s", $count{$line}, $line);
    }
    
    return \@result;
}

1;

このフィルターはハッシュを使って各行の出現回数をカウントし、出現回数の多い順にソートして返します。

PipelineBuilderに追加

新しいフィルターをPipelineBuilderに追加します。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package PipelineBuilder;
use Moo;
use experimental qw(signatures);

has _filters => (
    is      => 'ro',
    default => sub { [] },
);

sub grep ($self, $pattern) {
    push $self->_filters->@*, GrepFilter->new(pattern => $pattern);
    return $self;
}

sub sort ($self) {
    push $self->_filters->@*, SortFilter->new();
    return $self;
}

sub uniq ($self) {
    push $self->_filters->@*, UniqFilter->new();
    return $self;
}

sub count ($self) {
    push $self->_filters->@*, CountFilter->new();
    return $self;
}

sub stats ($self) {
    push $self->_filters->@*, StatsFilter->new();
    return $self;
}

sub build ($self) {
    my @filters = $self->_filters->@*;
    return undef unless @filters;
    
    my $pipeline = pop @filters;
    while (my $filter = pop @filters) {
        $pipeline = $filter->with_next($pipeline);
    }
    
    return $pipeline;
}

1;

countstats メソッドを追加しました。

使ってみる

では、新しいフィルターを使ってみましょう。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
my @log_lines = (
    '2026-01-30 10:00:05 ERROR: Connection failed',
    '2026-01-30 10:00:10 INFO: Retrying connection',
    '2026-01-30 10:00:15 ERROR: Database timeout',
    '2026-01-30 10:00:20 ERROR: Connection failed',
    '2026-01-30 10:00:25 INFO: Connection restored',
    '2026-01-30 10:00:30 ERROR: Database timeout',
    '2026-01-30 10:00:35 ERROR: Database timeout',
);

# ERRORの行数をカウント
my $count_pipeline = PipelineBuilder->new()
    ->grep('ERROR')
    ->count()
    ->build();

say "=== ERROR行数 ===";
say $_ for $count_pipeline->process(\@log_lines)->@*;

# ERRORの種類別に集計
my $stats_pipeline = PipelineBuilder->new()
    ->grep('ERROR')
    ->stats()
    ->build();

say "";
say "=== ERROR種類別集計 ===";
say $_ for $stats_pipeline->process(\@log_lines)->@*;

実行結果は以下の通りです。

1
2
3
4
5
6
7
8
9
=== ERROR行数 ===
5 lines

=== ERROR種類別集計 ===
   3 2026-01-30 10:00:35 ERROR: Database timeout
   1 2026-01-30 10:00:05 ERROR: Connection failed
   1 2026-01-30 10:00:15 ERROR: Database timeout
   1 2026-01-30 10:00:20 ERROR: Connection failed
   1 2026-01-30 10:00:30 ERROR: Database timeout

あれ?結果がおかしいですね。同じ「Database timeout」や「Connection failed」が別の行としてカウントされています。

問題:タイムスタンプの影響

問題は、同じエラーでもタイムスタンプが違うため、別の行として扱われていることです。

エラーの種類だけで集計したい場合は、タイムスタンプを除去する必要があります。

ExtractFilterの追加

特定のパターンを抽出するExtractFilterを作成します。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package ExtractFilter;
use Moo;
use experimental qw(signatures);
extends 'Filter';

has pattern => (
    is       => 'ro',
    required => 1,
);

sub _clone_attributes ($self) {
    return (pattern => $self->pattern);
}

sub apply ($self, $lines) {
    my $pattern = $self->pattern;
    my @result;
    
    for my $line (@$lines) {
        if ($line =~ /$pattern/) {
            push @result, $1 // $&;
        }
    }
    
    return \@result;
}

1;

このフィルターは、正規表現にマッチした部分(キャプチャグループがあればその内容)を抽出します。

PipelineBuilderに追加

1
2
3
4
sub extract ($self, $pattern) {
    push $self->_filters->@*, ExtractFilter->new(pattern => $pattern);
    return $self;
}

改善版で試す

1
2
3
4
5
6
7
8
9
# エラーメッセージ部分だけを抽出して集計
my $extract_pipeline = PipelineBuilder->new()
    ->grep('ERROR')
    ->extract('ERROR: (.+)')
    ->stats()
    ->build();

say "=== エラー種類別集計(改善版) ===";
say $_ for $extract_pipeline->process(\@log_lines)->@*;

実行結果は以下の通りです。

1
2
3
=== エラー種類別集計(改善版) ===
   3 Database timeout
   2 Connection failed

期待通りの結果が得られました。

パイプラインの柔軟性

今回の例で、パイプラインの柔軟性が実感できたのではないでしょうか。

	flowchart LR
    Input["ログデータ"] --> Grep["GrepFilter\n(ERROR)"]
    Grep --> Extract["ExtractFilter\n(メッセージ抽出)"]
    Extract --> Stats["StatsFilter\n(集計)"]
    Stats --> Output["統計結果"]

各フィルターは単一の責任を持ち、それらを組み合わせることで複雑な処理を実現しています。これはSOLID原則の単一責任の原則(SRP)に従った設計です。

今回のポイント

  • 集約処理(Aggregator)の概念を学んだ
  • CountFilterで行数カウントを実装した
  • StatsFilterで出現頻度の集計を実装した
  • ExtractFilterでパターン抽出を実装した
  • フィルターを組み合わせて複雑な処理を実現した

今回の完成コード

以下が今回作成したコードの完成版です。

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
#!/usr/bin/env perl
# 言語: perl
# バージョン: 5.36以上
# 依存: Moo

use v5.36;

# === Filter(基底クラス) ===
package Filter {
    use Moo;
    use experimental qw(signatures);

    has next_filter => (
        is        => 'ro',
        predicate => 'has_next_filter',
    );

    sub with_next ($self, $next) {
        return ref($self)->new(
            $self->_clone_attributes(),
            next_filter => $next,
        );
    }

    sub _clone_attributes ($self) {
        return ();
    }

    sub process ($self, $lines) {
        my $result = $self->apply($lines);
        
        if ($self->has_next_filter) {
            return $self->next_filter->process($result);
        }
        return $result;
    }

    sub apply ($self, $lines) {
        return $lines;
    }
}

# === GrepFilter ===
package GrepFilter {
    use Moo;
    use experimental qw(signatures);
    extends 'Filter';

    has pattern => (
        is       => 'ro',
        required => 1,
    );

    sub _clone_attributes ($self) {
        return (pattern => $self->pattern);
    }

    sub apply ($self, $lines) {
        my $pattern = $self->pattern;
        return [grep { /$pattern/ } @$lines];
    }
}

# === SortFilter ===
package SortFilter {
    use Moo;
    use experimental qw(signatures);
    extends 'Filter';

    sub apply ($self, $lines) {
        return [sort @$lines];
    }
}

# === UniqFilter ===
package UniqFilter {
    use Moo;
    use experimental qw(signatures);
    extends 'Filter';

    sub apply ($self, $lines) {
        my %seen;
        return [grep { !$seen{$_}++ } @$lines];
    }
}

# === CountFilter ===
package CountFilter {
    use Moo;
    use experimental qw(signatures);
    extends 'Filter';

    sub apply ($self, $lines) {
        my $count = scalar @$lines;
        return ["$count lines"];
    }
}

# === StatsFilter ===
package StatsFilter {
    use Moo;
    use experimental qw(signatures);
    extends 'Filter';

    sub apply ($self, $lines) {
        my %count;
        $count{$_}++ for @$lines;
        
        my @result;
        for my $line (sort { $count{$b} <=> $count{$a} } keys %count) {
            push @result, sprintf("%4d %s", $count{$line}, $line);
        }
        
        return \@result;
    }
}

# === ExtractFilter ===
package ExtractFilter {
    use Moo;
    use experimental qw(signatures);
    extends 'Filter';

    has pattern => (
        is       => 'ro',
        required => 1,
    );

    sub _clone_attributes ($self) {
        return (pattern => $self->pattern);
    }

    sub apply ($self, $lines) {
        my $pattern = $self->pattern;
        my @result;
        
        for my $line (@$lines) {
            if ($line =~ /$pattern/) {
                push @result, $1 // $&;
            }
        }
        
        return \@result;
    }
}

# === PipelineBuilder ===
package PipelineBuilder {
    use Moo;
    use experimental qw(signatures);

    has _filters => (
        is      => 'ro',
        default => sub { [] },
    );

    sub grep ($self, $pattern) {
        push $self->_filters->@*, GrepFilter->new(pattern => $pattern);
        return $self;
    }

    sub sort ($self) {
        push $self->_filters->@*, SortFilter->new();
        return $self;
    }

    sub uniq ($self) {
        push $self->_filters->@*, UniqFilter->new();
        return $self;
    }

    sub count ($self) {
        push $self->_filters->@*, CountFilter->new();
        return $self;
    }

    sub stats ($self) {
        push $self->_filters->@*, StatsFilter->new();
        return $self;
    }

    sub extract ($self, $pattern) {
        push $self->_filters->@*, ExtractFilter->new(pattern => $pattern);
        return $self;
    }

    sub build ($self) {
        my @filters = $self->_filters->@*;
        return undef unless @filters;
        
        my $pipeline = pop @filters;
        while (my $filter = pop @filters) {
            $pipeline = $filter->with_next($pipeline);
        }
        
        return $pipeline;
    }
}

# === メイン処理 ===
package main {
    my @log_lines = (
        '2026-01-30 10:00:05 ERROR: Connection failed',
        '2026-01-30 10:00:10 INFO: Retrying connection',
        '2026-01-30 10:00:15 ERROR: Database timeout',
        '2026-01-30 10:00:20 ERROR: Connection failed',
        '2026-01-30 10:00:25 INFO: Connection restored',
        '2026-01-30 10:00:30 ERROR: Database timeout',
        '2026-01-30 10:00:35 ERROR: Database timeout',
    );

    # ERRORの行数をカウント
    my $count_pipeline = PipelineBuilder->new()
        ->grep('ERROR')
        ->count()
        ->build();

    say "=== ERROR行数 ===";
    say $_ for $count_pipeline->process(\@log_lines)->@*;

    say "";

    # エラーメッセージ部分だけを抽出して集計
    my $extract_pipeline = PipelineBuilder->new()
        ->grep('ERROR')
        ->extract('ERROR: (.+)')
        ->stats()
        ->build();

    say "=== エラー種類別集計 ===";
    say $_ for $extract_pipeline->process(\@log_lines)->@*;
}

次回予告

次回は、実践的なアクセスログ解析パイプラインを構築します。Apache/Nginxのアクセスログから、アクセス数の多いURLや時間帯別のアクセス数など、実務で役立つ解析を行いましょう。

お楽しみに!

comments powered by Disqus
Hugo で構築されています。
テーマ StackJimmy によって設計されています。