Featured image of post コード探偵ロックの事件簿【Outbox】届かなかった手紙〜二重書き込みが生んだ沈黙の障害〜

コード探偵ロックの事件簿【Outbox】届かなかった手紙〜二重書き込みが生んだ沈黙の障害〜

DB更新とイベント送信の非原子的な二重書き込みを、Outboxパターンで解消する方法をPerl/Mooで解説。Sagaの各ステップに信頼性の土台を敷く続編回。

事件の発端 — 赤い疑問符

2週間ぶりに、あの会議室にいる。

前回ここに来たときは、徹夜明けだった。冷めたコーヒーと、300件の幽霊注文と、ロックさんが描いたホワイトボードの図。在庫・決済・配送の3つの箱、それぞれの下に伸びる逆向きの矢印——Sagaの補償トランザクション。

あのホワイトボードの図は、まだ残っている。消さなかった。

自分が描き足した部分もある。各箱の間の矢印に、赤いマーカーで「→ event?」と書き込んだ。余白には Role::SagaStep の実装メモ。あの日ロックさんが帰った後、すぐにエディタを開いて書き始めたコードの断片だ。

ロックさんに言われた通り、Sagaを組んだ。SagaStep のRoleを定義し、在庫確保・決済・配送の各ステップに executecompensate のペアを実装した。SagaOrchestrator も動く。テストも通る。

——結合テスト環境に投入するまでは、すべてがうまくいっていた。

在庫確保の execute がDBコミットに成功した直後、決済サービスへの通知イベントの送信が失敗した。メッセージキューが一時的に落ちていた。DBには「在庫確保済み」と書いてある。しかしその事実を決済サービスは知らない。Orchestratorも次のステップに進めない。

300件の幽霊注文の小規模版。またか、と思った。

今回は自分からロックさんにメールした。前回CTOが請求書送付のために交換したアドレスを、CTOから教えてもらった。返信は短かった。

例のホワイトボードの前で待っていたまえ。火曜15時。——LCI

ドアがノックされる。

ロックさんが入ってきた。前回と同じツイードのジャケットにジーンズ。ただし今回は鞄が小さい——前回持ってきた1987年の論文の束は入っていない。

ロックさんの視線がまっすぐホワイトボードに向かう。足が止まる。

「……消さなかったのか」

「自分が描き足した部分もあります」

ロックさんが近づき、赤字のメモを読む。「→ event?」を指でなぞった。

「ここが新しい事件現場か」

「はい」

自分はノートPCの画面をロックさんに向けた。テスト結果のログが並んでいる。ReserveInventory#execute: DB commit OK の直後に、赤い文字。EventPublisher: connection refused

「在庫の確保は成功しています。DBにはコミット済みです。でもその直後に、決済サービスへ通知するイベントの送信が失敗しました。メッセージキューが一時的に落ちていて」

「イベントは?」

「消えました。DBは更新済み。イベントは送られていない」

ロックさんが椅子に座った。前回は立ったまま話していた。今回は腰を据える構えだ。

「前回は鎖を切った。2PCをやめて、各ステップを独立させた。補償も定義した。だが——」

「ステップの出口が信頼できない。DBに書いたことと、外に伝えたことがズレます」

ロックさんの目が少し細まった。前回はこの速さで問題を言語化できなかった男が、今日は自分で答えを持ってきている。

「ワトソン君、君は2つの場所に同時に手紙を出そうとしている」

ロックさんがホワイトボードに立ち、赤字メモの隣に新しい図を描き始めた。

左に「DB」の箱。右に「MQ」の箱。真ん中にアプリケーション。アプリケーションからDBへの矢印に「✓」、MQへの矢印に「✗」。

「データベースとメッセージキュー。2つの宛先に同じ処理で書こうとしている。1つ目の手紙——DB更新——は届いた。2つ目の手紙——イベント送信——は届かなかった。しかし差出人は1つ目が届いた時点で満足して立ち去った」

「……二重書き込み」

Dual Write問題だ。2つの異なるシステムへの書き込みは、どちらか一方が失敗する可能性を常に孕んでいる。そして失敗した側は——沈黙する。エラーを返す相手がいない」

現場検証 — 二通の手紙、二つの宛先

ロックさんがホワイトボードの前に立ったまま、自分のノートPCに視線を移した。

「前回のコードを見せてくれ。ReserveInventoryexecute だ」

画面を向ける。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
sub execute ($self, $context) {
    my $result = $self->inventory_service->reserve(
        $context->{order_id},
        $context->{items},
    );
    $context->{reservation_id} = $result->{reservation_id};

    # イベント送信
    $self->event_publisher->publish('inventory.reserved', {
        order_id       => $context->{order_id},
        reservation_id => $result->{reservation_id},
    });

    return $context;
}

「2行ある」ロックさんが指を立てた。「reserve でDBを更新し、publish でイベントを送る。この2つは同じトランザクションかね?」

「……違います。reserve がコミットされた後に publish を呼んでいます」

コミットの後。つまり、DBの書き込みは確定済みだ。そこから別のシステムに手を伸ばす。このとき——」

「メッセージキューが落ちていたら、イベントは消えます」

「消える。DBには『在庫確保済み』と書いてある。しかしその事実を知っているのはDBだけだ。決済サービスは知らない。Orchestratorも知らない」

ロックさんがホワイトボードに戻り、先ほどの「DB ✓ / MQ ✗」の図の下に時系列を描いた。

1
2
3
t1: DB commit (inventory reserved) ✓
t2: publish event → MQ down ✗
t3: execute returns → Orchestrator thinks "step succeeded"

「t3が致命的だ。Orchestratorは execute が return した時点で成功と見なす。しかしイベントは届いていない。次のステップはトリガーされない」

ロックさんがマーカーのキャップを外さずに、図を指した。

成功したはずの処理が、外からは見えない。これが二重書き込みの本質だ。エラーが起きたのではない。起きるべきことが起きなかった

自分は腕を組んだ。前回の徹夜明けの猫背ではなく、考え込む姿勢だ。

「……前回、ロックさんは言いました。『問題の性質を変えろ』と。2PCは全体がハングする。Sagaはそれを局所故障に変える。じゃあ今回は——書き込み先が2つあることが問題なんだから——」

ロックさんが黙っている。前回は自分から答えを出していた。今回は待っている。

「……1つにすればいい? イベントの送信も……DBに書けばいいんですか。メッセージキューに送る代わりに、同じDBの同じトランザクションでイベントを記録する

一拍、間があった。

「その直感は正しい」

ロックさんの声に、前回にはなかった色があった。

「——ワトソン君、君は前回より速い」

小さく息をついた。自嘲ではなかった。「やっぱりそうか」という確認だ。

「DB更新もイベント記録も、同じデータベースの同じトランザクションに入れる。1つのコミットで両方が確定する。1つのロールバックで両方が消える。これがOutboxパターンだ」

ロックさんがホワイトボードの「DB ✓ / MQ ✗」の図の隣に、新しい図を描いた。DBの箱の中に小さなテーブルを描き、「OUTBOX」とラベルをつける。

「手紙を直接ポストに投函するのではなく、自分の机の引き出しにまず入れる。引き出し——Outbox——はDBの中にある。ビジネスデータと同じ場所だ。だからDBトランザクションの中で一緒に書ける」

推理披露 — 同じ机に書け

Outboxメッセージの形を決める

「まず、引き出しに入れる手紙の形を決める」

ロックさんが自分のノートPCの画面に手を伸ばした。キーボードを借りてもいいか、とは聞かない。前回もそうだった。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
package OutboxMessage;
use Moo;
use Types::Standard qw(Str HashRef Bool);

has id             => (is => 'ro', isa => Str, required => 1);
has aggregate_type => (is => 'ro', isa => Str, required => 1);
has aggregate_id   => (is => 'ro', isa => Str, required => 1);
has event_type     => (is => 'ro', isa => Str, required => 1);
has payload        => (is => 'ro', isa => HashRef, required => 1);
has published      => (is => 'rw', isa => Bool, default => 0);

aggregate_typeaggregate_id は何ですか」

「この手紙が誰についての手紙かを示す。注文ID 42 の在庫確保イベントなら、aggregate_typeOrderaggregate_id42 だ。配達人がどのトピックに配達すべきかを決める情報でもある」

published はリレーが送信済みかどうかのフラグですね」

「その通り」

OutboxWriter — 手紙を書く契約

「次に、手紙を引き出しに入れる役割を定義する」

1
2
3
package Role::OutboxWriter;
use Moo::Role;
requires 'save_outbox_message';

requires 'save_outbox_message'。このRoleを取り込むクラスは、Outboxにメッセージを保存するメソッドを実装しなければならない」

「前回の Role::SagaSteprequires qw(execute compensate) と同じ構造ですね」

「同じ原則だ。契約を先に定義し、実装を後から縛る

UnitOfWork — トランザクションの核

「ここが核心だ。ビジネス処理とOutbox書き込みを1つのトランザクションで包む

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package UnitOfWork;
use Moo;
use Types::Standard qw(ArrayRef Object);

has repository => (is => 'ro', isa => Object, required => 1);
has _pending   => (is => 'ro', isa => ArrayRef, default => sub { [] });

sub add_message ($self, $message) {
    push $self->_pending->@*, $message;
}

sub commit ($self, $business_work) {
    $self->repository->begin_work;
    eval {
        $business_work->($self->repository);

        for my $msg ($self->_pending->@*) {
            $self->repository->save_outbox_message($msg);
        }

        $self->repository->commit;
        @{$self->_pending} = ();
        1;
    } or do {
        my $err = $@;
        $self->repository->rollback;
        die $err;
    };
}

画面を見つめた。しばらく黙った。

「……begin_work の中に、ビジネスロジックとOutboxのINSERTが両方入っている」

「そうだ。$business_work->($self->repository) でビジネスデータを更新し、続けてOutboxテーブルにメッセージをINSERTする。どちらかが失敗すれば rollback。両方成功すれば commit。——1つのコミットで、2つのことが確定する

「DBの外には何も書いていない」

「書いていない。メッセージキューには触れていない。ネットワークを跨いでいない。閉じた世界の中で完結している。だからRDBのACIDがそのまま使える」

腕を組んでいた手をほどいた。前に乗り出す。

「これなら……在庫確保のDBコミットが成功した時点で、イベントも必ずOutboxに入っている。逆に、在庫確保が失敗すれば、イベントもロールバックされる」

手紙は必ず引き出しに入る。引き出しは机の一部で、机はトランザクションで守られている。手紙だけが消えることはない

ReserveInventory の書き直し

「では、前回の ReserveInventory を書き直そう」

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
sub execute ($self, $context) {
    my $uow = $context->{unit_of_work};

    $uow->add_message(OutboxMessage->new(
        id             => create_uuid(),
        aggregate_type => 'Order',
        aggregate_id   => $context->{order_id},
        event_type     => 'inventory.reserved',
        payload        => {
            order_id => $context->{order_id},
            items    => $context->{items},
        },
    ));

    $uow->commit(sub ($repo) {
        $repo->insert_reservation(
            $context->{order_id},
            $context->{items},
            'confirming',
        );
    });

    return $context;
}

event_publisher->publish がなくなった」

「代わりに $uow->add_message$uow->commit だ」

publish はDBの外に手を伸ばしていた。add_message はDBの中に手紙を置くだけ。実際の送信は——」

「——別の誰かがやる」

「その通り。書く人と届ける人を分ける。原子性の問題と配信の問題を分離する」

PollingPublisher — 郵便配達人

「Outboxに書いたメッセージは、いつ、誰が送るんですか」

郵便配達人だ」

ロックさんがホワイトボードのOutboxテーブルの隣に、時計のマークと棒人間を描いた。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package PollingPublisher;
use Moo;
use Types::Standard qw(Object Int);

has repository       => (is => 'ro', isa => Object, required => 1);
has event_publisher  => (is => 'ro', isa => Object, required => 1);
has batch_size       => (is => 'ro', isa => Int, default => 100);

sub publish_pending ($self) {
    my @messages = $self->repository->find_unpublished(
        $self->batch_size,
    );

    for my $msg (@messages) {
        $self->event_publisher->publish(
            $msg->event_type,
            $msg->payload,
        );
        $msg->published(1);
        $self->repository->mark_published($msg->id);
    }

    return scalar @messages;
}

「別プロセスとして定期的に publish_pending を実行する。Outboxテーブルから未送信のメッセージをSELECTし、メッセージブローカーにpublishした後、published = TRUE に更新する」

「定期的に……ポーリングですか。遅延が出ますよね」

「出る。ポーリング間隔が5秒なら、最悪5秒の遅延がある」

ロックさんがこちらを見た。

「だが考えてみろ。前回の2PCでは決済サービスのタイムアウトで全体が止まった。5秒の遅延と全体停止、どちらを取る?」

「……5秒のほうがましです」

「ましどころか、桁が違う」

事件解決 — 消失から重複へ

「ポーリングがメッセージを送った直後にクラッシュしたら——published フラグを更新する前にプロセスが落ちたら——同じメッセージがもう1回送られますよね」

ロックさんが頷いた。

「送られる。2回以上送られる可能性がある」

眉を寄せた。せっかく原子性を手に入れたのに、今度は重複か。

「Outboxはat-least-once deliveryだ。最低1回は届く。しかし、ちょうど1回は保証しない」

「それは——」

「問題の性質を見ろ、ワトソン君」

ロックさんがホワイトボードに2行書いた。

1
2
Dual Write: メッセージが消える → 検知困難 → 復旧不可能
Outbox:     メッセージが重複する → 検知容易 → 冪等性で対処可能

「消えた手紙は探せない。しかし同じ手紙が2通届いたら、1通を捨てればいい

しばらく黙った。前回の会話が頭に浮かんだ。「補償が失敗したらどうなるんですか」と聞いたとき、ロックさんは「問題の性質が変わる」と答えた。全壊が局所故障に変わると。

「……前回も同じことを言っていましたね。問題がゼロになるのではなく、性質が変わる」

「覚えていたか」

ロックさんの声にわずかな温度があった。

「Sagaは『全壊を局所故障に変える』。Outboxは『消失を重複に変える』。完璧なアーキテクチャは存在しない。しかし、対処可能な問題に変換することはできる

「冪等性は——消費者側で、メッセージIDで重複を弾けばいい。OutboxMessageid がそのためにある」

「その通りだ」

ホワイトボードを見ていた。前回ロックさんが描いたSagaの図、自分が足した赤字メモ、そして今日ロックさんが描いたOutboxの図。3つのレイヤーが重なっている。

「CDC——Change Data Captureを使えば、ポーリングの遅延はなくなりますよね。DBのトランザクションログを監視して」

「知っているのか」

「SREなので。PostgreSQLのWAL、Debeziumは運用したことがあります」

ロックさんが少し感心した様子で目を上げた。ホワイトボードにポーリングとCDCの比較を簡潔に描いた。

1
2
Polling:  SELECT → publish → UPDATE  (単純、遅延あり)
CDC:      WAL → Debezium → publish   (リアルタイム、運用複雑)

「ポーリングは単純だが遅延がある。CDCはリアルタイムに近いがDB固有の仕組みに依存する。どちらを選ぶかは、君のシステムが許容する遅延と運用の複雑さ次第だ。正解はない。トレードオフがあるだけだ」

「今のシステムなら……ポーリングで十分です。秒単位の遅延は許容範囲だし、運用対象は少ないほうがいい」

「賢明な判断だ」

ロックさんが立ち上がり、ジャケットの襟を整えた。

「前回の事件は『鎖を切る方法』だった。今回は『手紙を確実に届ける方法』だ。——Sagaの各ステップに、Outboxという土台を入れたまえ。各ステップが独立して実行・補償でき、かつイベントの発行が原子的に保証される。それが全体の信頼性になる」

「ありがとうございます」

ドアに向かいながら、ロックさんが振り返った。

「次の壁はいつ来るかね、ワトソン君」

「……来ないに越したことはないんですが」

「壁があるうちは設計が進んでいる証拠だ。壁がなくなったときが——怖いんだよ。本当は」

ロックさんが出ていった。

会議室に一人残った。

ホワイトボードに向かう。前回ロックさんが描いた3つの箱——在庫・決済・配送。各箱の下に逆向き矢印。自分が足した赤字の「→ event?」。

赤字を消した。

代わりに、各箱の横に小さな四角を描く。「OUTBOX」とラベルをつけた。Outboxから伸びる矢印が次の箱に向かう。

これで全部のステップに土台ができた。在庫を確保し、Outboxに手紙を入れる。配達人が届ける。決済が始まる。決済を処理し、Outboxに手紙を入れる。配達人が届ける。配送が始まる。——全部つながった。

エディタに向かった。前回は package Role::SagaStep; と打った。

今日は:

1
package OutboxWriter;

まず、手紙を書く仕組みから。


探偵の調査報告書

容疑(アンチパターン)真実(パターン)証拠(効果)
二重書き込み(Dual Write) — DB更新とイベント送信が非原子的で、送信失敗時にイベントが消失するOutbox Pattern — イベントをDBの同一トランザクション内でOutboxテーブルに記録し、別プロセスが非同期で転送するDB更新とイベント記録の原子性が保証される。メッセージ消失が構造的に不可能になる

推理のステップ

  1. 二重書き込みを特定する — DB更新とメッセージブローカーへの送信が別々のシステムに行われていないか確認する
  2. Outboxテーブルを作る — ビジネスDBに outbox テーブルを追加する。カラムは id, aggregate_type, aggregate_id, event_type, payload, published
  3. UnitOfWorkで包む — ビジネスロジックとOutbox INSERTを同一トランザクションで実行する仕組みを作る
  4. PollingPublisherを実装する — 定期的にOutboxテーブルをポーリングし、未送信メッセージをブローカーに転送するプロセスを作る
  5. 消費者を冪等にする — at-least-once deliveryに対応するため、メッセージIDによる重複排除を消費者側で実装する

ロックより

前回は鎖を切った。今回は手紙の配達を確実にした。——Sagaが骨格なら、Outboxは血管だ。骨があっても血が通わなければ体は動かない。設計とは、こういう層を一つずつ積み上げていくことだ。

次に壁にぶつかったら、また来るがいい。壁がある間は、まだ前に進んでいる。

comments powered by Disqus
Hugo で構築されています。
テーマ StackJimmy によって設計されています。