Featured image of post 動的な組み立て:パイプラインビルダー - Perl/Mooでテキスト処理パイプライン

動的な組み立て:パイプラインビルダー - Perl/Mooでテキスト処理パイプライン

Fluent Interface(流暢なインターフェース)を使ったパイプラインビルダーを作成します。メソッドチェーンで直感的にフィルターを組み立てる、モダンなPerlコーディングスタイルを習得します。

@nqounetです。

前回、フィルターを連鎖させるには逆順に作成する必要がある、という問題点を発見しました。今回は、パイプラインビルダーを導入してこの問題を解決します。

このシリーズについて

前回の問題点

前回のコードでは、パイプラインを以下のように構築していました。

1
2
3
4
5
6
7
# 最後に実行したい処理から先に作成する必要がある
my $uniq = UniqFilter->new();
my $sort = SortFilter->new(next_filter => $uniq);
my $grep = GrepFilter->new(
    pattern     => 'ERROR',
    next_filter => $sort,
);

grep -> sort -> uniq の順に処理したいのに、コードでは uniq -> sort -> grep の順に書く必要があります。これは直感に反しており、大きなパイプラインになるほど混乱を招きます。

パイプラインビルダーの設計

解決策として、パイプラインビルダーを導入します。目指すインターフェースは以下の通りです。

1
2
3
4
5
my $pipeline = PipelineBuilder->new()
    ->grep('ERROR')
    ->sort()
    ->uniq()
    ->build();

これなら処理順序と記述順序が一致し、直感的に理解できます。このような「メソッドを連鎖させて呼び出す」書き方を、Fluent Interface(流れるようなインターフェース)と呼びます。

PipelineBuilderの実装

ロボットアームがパイプラインを組み立てる様子

では、PipelineBuilderを実装していきましょう。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package PipelineBuilder;
use Moo;
use experimental qw(signatures);

has _filters => (
    is      => 'ro',
    default => sub { [] },
);

sub grep ($self, $pattern) {
    push $self->_filters->@*, GrepFilter->new(pattern => $pattern);
    return $self;
}

sub sort ($self) {
    push $self->_filters->@*, SortFilter->new();
    return $self;
}

sub uniq ($self) {
    push $self->_filters->@*, UniqFilter->new();
    return $self;
}

sub build ($self) {
    my @filters = $self->_filters->@*;
    return undef unless @filters;
    
    # 最後のフィルターから逆順に連結
    my $pipeline = pop @filters;
    while (my $filter = pop @filters) {
        $pipeline = $filter->with_next($pipeline);
    }
    
    return $pipeline;
}

1;

ポイントを解説します。

  • _filters 属性でフィルターを順番に蓄積する
  • 各メソッド(grep, sort, uniq)は $self を返すことでメソッドチェーンを可能にする
  • build メソッドで蓄積したフィルターを連結してパイプラインを構築する

with_nextメソッドの追加

ビルダーの build メソッドで使う with_next メソッドをFilterクラスに追加します。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package Filter;
use Moo;
use experimental qw(signatures);

has next_filter => (
    is        => 'ro',
    predicate => 'has_next_filter',
);

sub with_next ($self, $next) {
    return ref($self)->new(
        $self->_clone_attributes(),
        next_filter => $next,
    );
}

sub _clone_attributes ($self) {
    # サブクラスでオーバーライドする
    return ();
}

sub process ($self, $lines) {
    my $result = $self->apply($lines);
    
    if ($self->has_next_filter) {
        return $self->next_filter->process($result);
    }
    return $result;
}

sub apply ($self, $lines) {
    return $lines;
}

1;

with_next メソッドは、現在のフィルターと同じ設定で、next_filter だけを変更した新しいインスタンスを作成します。

GrepFilterの_clone_attributesを実装

GrepFilterは pattern 属性を持つので、クローン時にこれを引き継ぐ必要があります。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
package GrepFilter;
use Moo;
use experimental qw(signatures);
extends 'Filter';

has pattern => (
    is       => 'ro',
    required => 1,
);

sub _clone_attributes ($self) {
    return (pattern => $self->pattern);
}

sub apply ($self, $lines) {
    my $pattern = $self->pattern;
    return [grep { /$pattern/ } @$lines];
}

1;

新しいパイプラインを使ってみる

では、パイプラインビルダーを使ってみましょう。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
my @log_lines = (
    '2026-01-30 10:00:15 ERROR: Database timeout',
    '2026-01-30 10:00:05 ERROR: Connection failed',
    '2026-01-30 10:00:15 ERROR: Database timeout',
    '2026-01-30 10:00:20 INFO: Connection restored',
    '2026-01-30 10:00:05 ERROR: Connection failed',
);

my $pipeline = PipelineBuilder->new()
    ->grep('ERROR')
    ->sort()
    ->uniq()
    ->build();

my $result = $pipeline->process(\@log_lines);

say "=== ERROR行をソートして重複除去 ===";
say $_ for @$result;

実行結果は以下の通りです。

1
2
3
=== ERROR行をソートして重複除去 ===
2026-01-30 10:00:05 ERROR: Connection failed
2026-01-30 10:00:15 ERROR: Database timeout

処理順序とコードの記述順序が一致し、とても読みやすくなりました。

パイプラインの動的な変更

ビルダーを使うと、条件に応じてパイプラインを動的に構築することも簡単です。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
sub create_pipeline ($require_sort, $require_uniq) {
    my $builder = PipelineBuilder->new()->grep('ERROR');
    
    $builder->sort() if $require_sort;
    $builder->uniq() if $require_uniq;
    
    return $builder->build();
}

# ソートだけ
my $p1 = create_pipeline(1, 0);

# ソートと重複除去
my $p2 = create_pipeline(1, 1);

# 何もしない(grepだけ)
my $p3 = create_pipeline(0, 0);

実行時の条件に応じて異なるパイプラインを構築できます。これはDecoratorパターンの真価で、機能の追加・削除が動的に行えるということを意味します。

Fluent Interfaceのメリット

Fluent Interfaceには以下のメリットがあります。

  • 可読性が高い(英語の文章のように読める)
  • メソッドの順序が処理順序と一致する
  • IDEの補完が効きやすい
  • 設定の変更が容易

ただし、デバッグが難しくなる場合があるというデメリットもあります。メソッドチェーンの途中でエラーが発生すると、どこで問題が起きたかわかりにくくなることがあります。

今回のポイント

  • Fluent Interface(流れるようなインターフェース)の考え方を学んだ
  • PipelineBuilderクラスを作成し、直感的なパイプライン構築を実現した
  • with_next メソッドでフィルターのクローンと連結を実装した
  • 動的なパイプライン構築の柔軟性を確認した

今回の完成コード

以下が今回作成したコードの完成版です。

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
#!/usr/bin/env perl
# 言語: perl
# バージョン: 5.36以上
# 依存: Moo

use v5.36;

# === Filter(基底クラス) ===
package Filter {
    use Moo;
    use experimental qw(signatures);

    has next_filter => (
        is        => 'ro',
        predicate => 'has_next_filter',
    );

    sub with_next ($self, $next) {
        return ref($self)->new(
            $self->_clone_attributes(),
            next_filter => $next,
        );
    }

    sub _clone_attributes ($self) {
        return ();
    }

    sub process ($self, $lines) {
        my $result = $self->apply($lines);
        
        if ($self->has_next_filter) {
            return $self->next_filter->process($result);
        }
        return $result;
    }

    sub apply ($self, $lines) {
        return $lines;
    }
}

# === GrepFilter ===
package GrepFilter {
    use Moo;
    use experimental qw(signatures);
    extends 'Filter';

    has pattern => (
        is       => 'ro',
        required => 1,
    );

    sub _clone_attributes ($self) {
        return (pattern => $self->pattern);
    }

    sub apply ($self, $lines) {
        my $pattern = $self->pattern;
        return [grep { /$pattern/ } @$lines];
    }
}

# === SortFilter ===
package SortFilter {
    use Moo;
    use experimental qw(signatures);
    extends 'Filter';

    sub apply ($self, $lines) {
        return [sort @$lines];
    }
}

# === UniqFilter ===
package UniqFilter {
    use Moo;
    use experimental qw(signatures);
    extends 'Filter';

    sub apply ($self, $lines) {
        my %seen;
        return [grep { !$seen{$_}++ } @$lines];
    }
}

# === PipelineBuilder ===
package PipelineBuilder {
    use Moo;
    use experimental qw(signatures);

    has _filters => (
        is      => 'ro',
        default => sub { [] },
    );

    sub grep ($self, $pattern) {
        push $self->_filters->@*, GrepFilter->new(pattern => $pattern);
        return $self;
    }

    sub sort ($self) {
        push $self->_filters->@*, SortFilter->new();
        return $self;
    }

    sub uniq ($self) {
        push $self->_filters->@*, UniqFilter->new();
        return $self;
    }

    sub build ($self) {
        my @filters = $self->_filters->@*;
        return undef unless @filters;
        
        my $pipeline = pop @filters;
        while (my $filter = pop @filters) {
            $pipeline = $filter->with_next($pipeline);
        }
        
        return $pipeline;
    }
}

# === メイン処理 ===
package main {
    my @log_lines = (
        '2026-01-30 10:00:15 ERROR: Database timeout',
        '2026-01-30 10:00:05 ERROR: Connection failed',
        '2026-01-30 10:00:15 ERROR: Database timeout',
        '2026-01-30 10:00:20 INFO: Connection restored',
        '2026-01-30 10:00:05 ERROR: Connection failed',
    );

    my $pipeline = PipelineBuilder->new()
        ->grep('ERROR')
        ->sort()
        ->uniq()
        ->build();

    my $result = $pipeline->process(\@log_lines);

    say "=== ERROR行をソートして重複除去 ===";
    say $_ for @$result;
}

次回予告

次回は、集約処理(Aggregator)の概念を導入します。行数カウントや統計情報の出力など、これまでとは異なる種類の処理をパイプラインに追加していきます。

お楽しみに!

comments powered by Disqus
Hugo で構築されています。
テーマ StackJimmy によって設計されています。