Featured image of post コード探偵ロックの事件簿【Competing Consumers】重なる受領印〜増員だけでは消えない二重処理〜

コード探偵ロックの事件簿【Competing Consumers】重なる受領印〜増員だけでは消えない二重処理〜

単一コンシューマで詰まる出荷指示キューを、Competing Consumers と idempotency で安全に並列化します

正午前の出荷管制室

正午の十五分前、出荷管制室の空気は、空調ではどうにもならない種類の熱を帯びていた。

壁のモニタには、配送締切までの残り時間と、出荷指示キューの監視画面が並んでいた。ラベルプリンタは止まっていない。むしろ動き続けている。なのに、一覧の数字だけが悪い方向へ揃っていた。

depth 18,400 / oldest age 38m / duplicate reservation 37

前日の incident review で、別チームが DLQ の件を妙な探偵に片づけてもらった、という話を聞いた。議事メモには「雑居ビルの三階の変人」としか書かれていなかったけれど、今日の私はその曖昧さを笑っていられる状況ではなかった。事務所まで行く時間はない。出荷管制室に来てもらった。

その人は、私が説明を始める前に、机の隅に積まれた未処理伝票を三列に分けていた。受領スタンプ台まで引き寄せている。

「ワトソン君。列を増やしたのに、受領印まで重なったのだね」

「私はその役ではありません」

反射的に訂正してから、私はすぐにモニタを指した。

「depth は下がり始めました。でも duplicate reservation が三十七件です。単一ワーカーでは締切に間に合わなくて、四台に増やしたら今度は倉庫 API への予約が二重になりました」

ロックさんは私のノートPCではなく、伝票の束を見ていた。

「一人では捌ききれず、人数を増やしたら同じ札が二度数えられた。結構。今日は速さそのものではなく、受け渡しの契約を調べよう」

その言い方で、今回の問題がただのスケール不足ではないことが分かった。

Competing Consumers の肝は、ワーカーを増やすことではありません。各メッセージを複数のコンシューマが安全に奪い合えるようにしながら、同じ仕事が再配信されても副作用を二重に起こさないことです。

現場検証 - 一人では詰まり、増やすと重なる

いまの実装は、単一ワーカーが一件ずつ受け取り、外部倉庫 API に予約を送り、その後で ack するだけの構造でした。

Beforeコード: 単一ワーカーが全件を抱える

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package SerialShipmentWorker;
use v5.36;
use Moo;
use Types::Standard qw(InstanceOf);

has queue => (
    is       => 'ro',
    isa      => InstanceOf['DeliveryQueue'],
    required => 1,
);

has gateway => (
    is       => 'ro',
    isa      => InstanceOf['WarehouseGateway'],
    required => 1,
);

sub process_next ($self) {
    my $message = $self->queue->receive or return 'idle';

    my $ok = eval {
        $self->gateway->reserve( $message->{shipment_id} );
        1;
    };

    if ($ok) {
        $self->queue->ack( $message->{message_id} );
        return 'acked';
    }

    $self->queue->release( $message->{message_id} );
    return 'retry';
}

私はこのコードを見せながら、先に弁解を口にしていた。

「単一ワーカーなのは、もともと安全側に倒したかったからです。倉庫 API は遅いし、重複予約の防御も薄い。だから最初は一人で回していました」

「それ自体は悪くない」

ロックさんはあっさり言った。

「一人で持てる量しか来ないならね。だが今日は違う。列は君の慎重さを待ってくれない」

そこまでは、私も分かっていた。問題はその先だった。四台に増やした瞬間、処理速度は上がった。なのに、同じ shipment_id に対して外部倉庫側の受領番号が二つ返るケースが出た。

「でも queue は一件を一人にしか渡さないはずです」

「受け取った瞬間に、事件が終わるのかね」

私は言葉を止めた。

ロックさんはホワイトボードに三つだけ書いた。

claim / reserve / ack

「君たちが一回だと思い込んでいるのは、どれだ」

「……claim です」

「そこだ。claim は一時的な専有権にすぎない。reserve のあと、ack の前に係が倒れれば、その伝票は未受領として戻ってくる。次の係がもう一度押す」

それで、ようやく duplicate reservation の正体が腹に落ちた。

Queue が保証していたのは、その瞬間、そのメッセージを誰が処理中かという排他だけだった。副作用が一回しか起きないことまでは保証していない。そこを埋めないままワーカーだけ増やしたから、問題の性質が「遅い」から「重なる」に変わった。

推理披露 - claim は一回性ではない

私はもう一度、ホワイトボードの三つの単語を見た。

「つまり、増員そのものが危険なのではなくて、reserveack の間が無防備なんですね」

「結構。Competing Consumers は並列化の技法だが、同時に再配信の現実を引き受ける設計でもある」

「再配信は避けられない」

「少なくとも、避けられるものとして振る舞うべきではない」

この切り分けは大事です。Competing Consumers を使うと、各メッセージはその時点で空いているコンシューマに分配されます。だから throughput は上がります。しかし broker 側の claim や lock は、せいぜい「今はこの係が持っている」という一時的な専有権です。

外部 API 呼び出しや DB 更新のような副作用は、その外側にあります。副作用の成功と broker ack の間に障害が挟まれば、メッセージは再配信されます。ここで冪等性がなければ、同じ仕事が二回実行されます。

言い換えると、Competing Consumers は「メッセージを複数人で捌く」パターンですが、「副作用を一回しか起こさない」ことは別途設計しなければなりません。

推理披露 - 競わせるのは worker、重ねないのは idempotency key

ロックさんは、三列に分けた伝票の先頭に、それぞれ小さな付箋を置いた。

「札を付けたまえ、ワトソン君。同じ仕事なら、誰が拾っても同じ札だ」

「idempotency key」

「そう。message の中だけに閉じ込めず、倉庫 API にもその札を見せる」

After では、メッセージ自体に stable な idempotency_key を持たせます。Queue は visible と in-flight を分けて claim_next で分配し、worker は共有の IdempotencyLedger を見ながら処理します。

Afterコード: stable な MessageEnvelope

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package MessageEnvelope;
use v5.36;
use Moo;
use Types::Standard qw(HashRef Int Str);

has message_id => (
    is       => 'ro',
    isa      => Str,
    required => 1,
);

has shipment_id => (
    is       => 'ro',
    isa      => Str,
    required => 1,
);

has idempotency_key => (
    is       => 'ro',
    isa      => Str,
    required => 1,
);

has payload => (
    is       => 'ro',
    isa      => HashRef,
    required => 1,
);

has delivery_attempt => (
    is      => 'rw',
    isa     => Int,
    default => sub { 0 },
);

Afterコード: worker は ledger と queue の契約を守る

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package Role::Worker;
use v5.36;
use Moo::Role;

requires qw(consumer_id queue gateway ledger);

sub process_once ($self) {
    my $message = $self->queue->claim_next( $self->consumer_id ) or return 'idle';

    my $claim = $self->ledger->claim(
        $message->idempotency_key,
        $self->consumer_id,
    );

    if ( $claim eq 'completed' ) {
        $self->queue->ack( $self->consumer_id, $message->message_id );
        return 'duplicate';
    }

    my $ok = eval {
        $self->gateway->reserve(
            $message->shipment_id,
            $message->idempotency_key,
        );
        1;
    };

    if (!$ok) {
        $self->ledger->release( $message->idempotency_key, $self->consumer_id );
        $self->queue->release( $self->consumer_id, $message->message_id );
        return 'retry';
    }

    $self->ledger->complete( $message->idempotency_key );
    $self->queue->ack( $self->consumer_id, $message->message_id );
    return 'acked';
}

ここで効いているのは、単なるワーカー増員ではありません。

  1. Queue は claim_next によって、その瞬間に空いている worker へメッセージを配る
  2. Worker は stable な idempotency_key を共有 ledger に照会する
  3. 外部倉庫 API にも同じ key を渡す
  4. 成功後だけ ack し、失敗時は release する

この設計にすると、問題の性質が変わります。ack 前に障害が起きても、再配信されたメッセージは同じ idempotency_key を持ちます。だから、外部 API 側がその key を見て二回目の副作用を無効化できます。さらに、アプリケーション側の ledger が「この仕事はもう完了した」と覚えていれば、あとから来た duplicate copy は gateway に触れずに ack して終えられます。

「つまり、queue の中だけで重複を防ごうとしない」

「その通り。副作用の相手にも、同じ札を見せるのだよ」

ここで私は、ようやく納得できた。parallelism と idempotency は別々の機能ではない。前者が速さを作り、後者がその速さを壊れないものにする。片方だけでは足りない。

順序が必要なメッセージは別に扱う

ただし、全部を無条件に競わせてよいわけではありません。

私が気にしたのは、同じ注文に対する「出荷確定」と「取消」が同時に並ぶケースでした。これを別 worker が好きな順で処理したら、別の事故になります。

ロックさんはすぐに首を振った。

「何でも競わせるのではない。順序を守るべき仕事は group を切るか、単独係に戻す。Competing Consumers は万能薬ではなく、並列化してよい単位を選ぶための道具だ」

ここも重要です。Competing Consumers は待ち行列の詰まりを解消しますが、ordering を必要とする仕事まで無差別に parallel にしてよい、という話ではありません。独立したメッセージだけを競わせ、順序が必要なものは message group や単一 consumer で守る必要があります。

解決 - テストで throughput と duplicate 抑制を確認する

今回のコード例は、before と after の両方をテストで検証しています。

Beforeテスト: 単一ワーカーは一件ずつしか進まない

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
subtest 'single worker drains one message per poll and backlog remains' => sub {
    my $queue = DeliveryQueue->new;
    enqueue_message($queue, 'msg-001', 'shipment-001');
    enqueue_message($queue, 'msg-002', 'shipment-002');
    enqueue_message($queue, 'msg-003', 'shipment-003');

    my $gateway = WarehouseGateway->new;
    my $worker  = SerialShipmentWorker->new(
        queue   => $queue,
        gateway => $gateway,
    );

    is $worker->process_next, 'acked', 'first poll acks one message';
    is $gateway->total_reservations, 1, 'exactly one shipment reserved';
    is $queue->depth, 2, 'two messages remain in backlog';
};

一回の poll で進むのは一件だけです。これは小さなキューなら十分でも、流量が跳ねた瞬間にそのままボトルネックになります。

Beforeテスト: ack 前障害で同じ仕事が二度走る

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
subtest 'message can be reserved twice when crash happens before ack' => sub {
    my $queue = DeliveryQueue->new;
    enqueue_message($queue, 'msg-101', 'shipment-101');

    my $gateway = WarehouseGateway->new(
        fail_after_first_reserve_for => {
            'shipment-101' => 1,
        },
    );

    my $worker = SerialShipmentWorker->new(
        queue   => $queue,
        gateway => $gateway,
    );

    is $worker->process_next, 'retry', 'first poll releases message for retry';
    is $worker->process_next, 'acked', 'second poll finally acks message';
    is $gateway->reservation_count_for('shipment-101'), 2,
        'shipment was reserved twice';
};

このテストが示しているのは、queue が一件を一人に渡しても、障害が reserveack の間で起きれば同じ仕事は再実行される、という事実です。

Afterテスト: 三人で三件進めつつ、重複を抑える

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
subtest 'three workers drain three messages in one round' => sub {
    my $queue   = DeliveryQueue->new;
    my $gateway = WarehouseGateway->new;
    my $ledger  = IdempotencyLedger->new;

    $queue->enqueue( make_message( 'msg-001', 'shipment-001', 'key-001' ) );
    $queue->enqueue( make_message( 'msg-002', 'shipment-002', 'key-002' ) );
    $queue->enqueue( make_message( 'msg-003', 'shipment-003', 'key-003' ) );
    $queue->enqueue( make_message( 'msg-004', 'shipment-004', 'key-004' ) );

    my @workers = (
        make_consumer( 'worker-A', $queue, $gateway, $ledger ),
        make_consumer( 'worker-B', $queue, $gateway, $ledger ),
        make_consumer( 'worker-C', $queue, $gateway, $ledger ),
    );

    is $_->process_once, 'acked', 'worker acked one message' for @workers;
    is $gateway->total_reservations, 3, 'three reservations completed';
    is $queue->depth, 1, 'one message remains in queue';
};

Competing Consumers にすると、一回のラウンドで複数件を前に進められます。重要なのは、これが「同じコードを三回並べただけ」ではなく、queue 側の契約を claim 単位に切り直していることです。

Afterテスト: 同じ idempotency key の再配信でも副作用は一回

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
subtest 'redelivery with same idempotency key does not repeat side effect' => sub {
    my $queue = DeliveryQueue->new;
    $queue->enqueue( make_message( 'msg-101', 'shipment-101', 'key-101' ) );

    my $gateway = WarehouseGateway->new(
        fail_after_first_reserve_for_key => {
            'key-101' => 1,
        },
    );
    my $ledger = IdempotencyLedger->new;

    my $worker_a = make_consumer( 'worker-A', $queue, $gateway, $ledger );
    my $worker_b = make_consumer( 'worker-B', $queue, $gateway, $ledger );

    is $worker_a->process_once, 'retry', 'first attempt releases the message';
    is $worker_b->process_once, 'acked', 'second attempt acks the same logical work';
    is $gateway->reservation_count_for('shipment-101'), 1,
        'side effect ran only once';
};

この一行が、今回の答えです。

reservation_count_for('shipment-101') == 1

再配信は起きています。worker も変わっています。それでも副作用が一回で止まっているのは、メッセージそのものではなく、仕事の識別子を end-to-end で持ち回っているからです。

実際の broker では acknackrelease の呼び方が違います。ですが、考え方は同じです。

  • claim や lock は一時的な専有権である
  • ack は成功後に返す
  • 失敗時は retry を許す
  • retry されても壊れないよう、stable な idempotency key を副作用の相手まで渡す

この四つが揃って、はじめて安全な Competing Consumers になります。

監視画面の数字が変わるとき

実装を切り替えてから、私はもう一度、出荷管制室の壁を見上げた。

queue depth はすぐには消えなかった。でも、増え続ける列ではなくなっていた。18,40017,920 になり、また少し下がった。duplicate reservation の件数は、それ以上増えなかった。

「速くなった、では足りませんね」

自分でそう口にしたとき、ようやく今回の理解が形になった気がした。

「そうだ。速さは結果にすぎない。先に決めるべきは、同じ仕事が戻ってきても世界が二度変わらぬことだ」

ロックさんは、最初に三列へ分けた伝票を指先で揃えた。

「列を増やすのは、そのあとでよい」

私は監視画面へ視線を戻した。今度は、数字の意味が前よりはっきり読めた。並列化で解決したのではない。並列化しても壊れない契約に直したから、ようやく速くできたのだ。


探偵の調査報告書

容疑(アンチパターン)真実(パターン)証拠(効果)
単一コンシューマ固定Competing Consumers1 ラウンドで複数件を前へ進められる
claim を一回性保証と誤認idempotency key + shared ledger再配信されても副作用は一回に抑えられる
成功前の安易な ack / 失敗時の無秩序な再実行success 後 ack / failure 時 releaseretry を許しつつ queue の整合性を保てる

推理のステップ

  • 単一ワーカーがボトルネックになっているかを、queue depth と oldest age で確認する
  • claim -> side effect -> ack の順に書き出し、障害が入る隙間を特定する
  • message に stable な idempotency_key を持たせる
  • queue を claim_next/ack/release の契約で扱い、worker を複数化する
  • shared ledger と外部 API の両方で、同じ key の重複実行を無害化する
  • 順序が必要な仕事は group を切るか、単一 consumer に残す

ロックより

Competing Consumers は、列を短くする魔術ではない。戻ってきた伝票を、別の係が拾っても世界が二度変わらないようにする作法である。速さはその結果として現れるにすぎない。

外へ向かうメッセージを扱うなら、broker の都合だけで設計してはならない。君のコードの外で起きる副作用まで含めて、同じ仕事をどう見分けるかを決めたまえ。受領印は、一度で十分なのだから。

comments powered by Disqus
Hugo で構築されています。
テーマ StackJimmy によって設計されています。