世界を動かす技術を、日本語で。

イベント駆動システムのためのデッドレターキューとしてのPostgreSQLの利用

概要

Wayfairのプロジェクトで、Kafkaイベントストリームを用いた日次ビジネスレポート生成システムの失敗時対応を担当。 KafkaのDLQでは可観測性が低く、CloudSQL PostgreSQLをDead Letter Queueとして運用。 失敗イベント管理、再試行、監査性向上のためのDLQテーブル設計とインデックス戦略。 ShedLockによる安全なDLQ再試行スケジューリング実装。 運用上の利点と、KafkaとPostgreSQLの役割分担による堅牢なパイプライン構築。

Wayfairのイベント集約システムと失敗時の課題

  • Wayfair のプロジェクトにて、 Kafkaイベントストリーム を活用した日次レポート生成システムを担当
  • Kafkaコンシューマ がイベントを受信し、外部サービスでデータ補完、 CloudSQL PostgreSQL へ永続化
  • 正常時は イベントの流れ がスムーズだが、分散システム特有の 障害発生 が不可避
  • 外部APIのダウン・遅延、コンシューマのクラッシュ、不正なイベント構造など多様な 障害シナリオ
  • これらの障害を 優雅に処理 するために Dead Letter Queue(DLQ) を導入

Kafka DLQの限界とPostgreSQL DLQへの移行

  • 一般的な手法として Kafkaトピック をDLQに利用するが、 可観測性に課題
    • 失敗理由別の検索や特定イベントの再試行が 困難
    • 「昨日何が失敗したか」等の 簡単な質問にも追加ツールが必要
  • CloudSQL PostgreSQL をDLQとして利用することで、 運用複雑性を増やさず に障害イベントを 第一級市民 として管理
  • 失敗イベントごとに ペイロード・失敗理由・ステータス を記録
    • ステータスは PENDING (未再処理)と SUCCEEDED (再処理成功)の2種類でシンプルに管理

DLQテーブル設計とインデックス戦略

  • payload カラムは JSONB 型で生イベントを柔軟に格納
  • status でライフサイクルを明確化
  • retry_after で過剰な再試行を防止
  • retry_count で再試行回数を管理
  • タイムスタンプ で監査や運用分析が容易
  • 主なインデックス
    • status
    • status, retry_after
    • event_type
    • created_at
  • これにより再試行対象イベントの効率的抽出や、デバッグ・時系列分析が 高速化

ShedLockによるDLQ再試行メカニズム

  • ShedLock を用いた 再試行スケジューラー でPENDINGイベントを定期的に再処理
  • サービスが 複数インスタンス で動作しても、ShedLockにより 重複実行防止
  • 再試行設定例
    • 最大再試行回数:240
    • 1回あたりバッチサイズ:50
    • 実行間隔:6時間
  • 再試行時の流れ
    • スケジューラーが FOR UPDATE SKIP LOCKED 付きSQLで安全に対象行を選択
    • 成功時は SUCCEEDED へ、失敗時は PENDING で次回以降再試行
    • SKIP LOCKED により複数インスタンスが同時に異なる行を処理、 重複再試行防止と高スループット両立

運用上のメリット

  • 失敗イベントの可視化監査性 向上
  • SQLでシンプルに失敗分析・再処理が可能
  • 長時間の外部依存障害でも イベント損失や再試行嵐を回避
  • 本質的に不正なイベントも 可視化 され、 サイレントドロップ防止
  • 障害対応の 心理的負担軽減明確なリカバリパス の確立

KafkaとPostgreSQLの役割分担と総括

  • Kafkaは 高スループットなイベント配送 を担当
  • PostgreSQLは 障害イベントの永続化・検索性・再試行管理 を担当
  • 各システムの 強みを活かす設計 で、 堅牢かつ運用容易なパイプライン を実現
  • PostgreSQL DLQにより障害対応が「退屈で予測可能」になり、 本番運用に最適

Hackerたちの意見

このアプローチで気をつけるべき一番のことは、必ず何らかの失敗やバグが発生して、デッドメッセージのレートが10倍、100倍、1000倍になることがあるってことだね。それがDLQデータベースをオーバーロードさせちゃう。サーキットブレーカーかレートリミットが必要だよ。

これだよ!メインキューがバックオフするより最悪なのは、DLQにアイテムが入らないことだね。システムが落ちちゃうから。

DLQに配信できないなら、どうするの?結局、メッセージが欠けちゃうことになるじゃん。どっちにしても関係ないよね。

あるアプリで、未処理の例外が発生したときにスタックトレース付きの内部メールを送るようにしてたんだ。アジアのボックスでOOMが発生して、数百通のメールを毎秒送信して会社のWANバックボーンとチーム全体のメールボックスを飽和させるまでは、うまくいってたんだけどね。いい思い出だよ。

どんなシステムでも、いずれはそうなるよ。PGを見下す必要はないよ。PGはより取り組みやすく、もはや専門的なスキルではなくなってるから。

これはどんなDLQでも同じリスクがあるね。DLQのアイデアは、最終的にリトライ(バックオフ付き)することなんだけど、失敗が続くとそこに留まることになる。DLQから抜け出せないメッセージを監視する必要があるよ。理想的には、DLQに何も留まるべきじゃなくて、もしそうなったら修正すべきことなんだ。

DLQメッセージをキューに入れて、コンシューマーがpgに取り込むことってできるのかな?(メッセージを引き出したばかりなら、キューは多分ダウンしてないよね)

FOR UPDATE SKIP LOCKED 今日は新しいことを学んだ。FOR UPDATEの意味は知ってたけど、SKIP LOCKEDの指令についてはちゃんと調べたことがなかったんだ。これは結構クールだね。

SKIP LOCKEDについて知ったのは、ChatGPTが並行処理の問題を解決するために提案してくれたからなんだ。こういうことを学ぶのに素晴らしいツールだね。

そうそう、SKIP LOCKEDは素晴らしい。実際にはほとんどの場合LIMITが必要なんだけど、この記事では触れてなかったね。選択が複数のテーブルにまたがる場合は注意が必要だよ。明示的にロックしたリレーションだけが保護されるから(SELECT … FOR UPDATE OF t1, t2を参照)。ORDER BYも重要で、公平性やリトライの挙動を制御するからね。ANALYZEにも気をつけて。自動分析は、デッドからライブのタプルしきい値を越えたときにしか実行されないから、大きなテーブルや古い行が多い場合は遅れることがあって、悪いプランやSKIP LOCKEDのパフォーマンス低下につながることがあるよ。最後に、削除やライフサイクルについても考えてみて。成功時の削除や、スケジュールされたクリーンアップ(pg_cronを考慮して)や古いデータのパーティショニングは、効率を保つのに役立つよ。

SegmentはMySQLをキューとして使ってるけど、DLQとしては使ってないんだ。それが彼らのスケールでうまくいってる。だから、これをキューとして耐えられるシステムは結構あるよ(全部じゃないけど)。私はシンプルなフローで、タスクは毎時数千件。PostgreSQLを使ってる。可視性が高く、再キューが簡単で、耐久性のあるストレージ。適切なインデックスを使えば、全然問題ないよ。LLMは最初からスキップロックのコードを書いてくれるし、ローカル開発も楽だよ。低ボリュームのシステムでは、イベントバスにPostgresをいつも選んでる。

もちろん、超大規模なイベント処理には使わないけど、90%のビジネスアプリにはメッセージ/タスクキューのデフォルトとしては素晴らしいよ。1日に数億件のイベント/タスクを処理してて、同時に10,000件未満のプロセスがデキューしてるなら、これがデフォルトだね。私はPGベースのキューシステムを使ったアプリで働いていて、通常のキューシステムでは簡単に達成できないような、タスクの優先順位や順序を動的に調整できたり、キューの内容を簡単にクエリしたりレポートしたりできる不可欠な機能を提供してくれてる。私たちのニーズに特化した他の興味深い機能もたくさん組み込まれていて、ここで詳しく説明するのはちょっと躊躇しちゃうけどね。

DNAの処理を極めて高いスケールで始めることはほとんどないよね。イベントベースの作業をする時は、さらに桁違いに多くのイベントが発生するし。これは、必要な時に物事を整理できる、合理的なスタート地点だと思う。ほとんどのものは、そんなに大きくスケールしないからね。

ShedlockとSELECT FOR UPDATE SKIP LOCKEDを使う理由は?Shedlockは並行処理を止める(ちょっとだけど)、でももう一つの方は並行処理を可能にするんだよね。

SKIP LOCKEDについて、Postgres 9.5で導入されたんだけど、これがその素晴らしい2016年の2ndQuadrantの投稿のアーカイブコピーね [†] https://web.archive.org/web/20240309030618/https://www.2ndqu... それに対応する2016年のHNディスカッションスレッドもあるよ https://news.ycombinator.com/item?id=14676859 [†] どうやら、古い2ndquadrant.comのブログ投稿のリンクは、enterprisedbに買収された後に壊れちゃったみたい。

このパターンについて具体例や失敗モードを含む詳細なウォークスルーを公開したよ:PostgreSQL FOR UPDATE SKIP LOCKED: The One-Liner Job Queue https://www.dbpro.app/blog/postgresql-skip-locked 競合条件や原子的なクレームの挙動、ワーカーのクラッシュ、優先順位やリトライが通常どのように重ねられるかについてもカバーしてる。古い2ndQuadrantの投稿で説明されているのと非常に似たアプローチだけど、現代的なエンドツーエンドの例になってるよ。

基本原則の素晴らしい応用だね。生産負荷が高い場合でも、全然合理的だと思うよ。(例:前の職場では、常に毎秒30,000イベントを処理するサービスがあって、DLQにはせいぜい数百メッセージしか入ってなかった)。キューに1時間以上古いメッセージがあったら、ページングされる仕組みだったよ。DLQのボリュームが常に高いなら、上流のデータやデータ処理ロジックに問題があるってことだよ、アーキテクチャの問題じゃない。

ステータスに文字列を使う理由って何?ブール値の方が無駄なスペースを使わずに済むし、特にステータスがインデックスされてるから意味ないよね。それに、可能ならevent_typeも整数にした方がいいと思う。あと、同じ先頭フィールド(ステータス)で二つのインデックスを持つ理由は何なの?

ちょっと関係ないけど、エリクサーエコシステムでPostgresのネイティブメッセージキューとしてObanを使ってて、めっちゃ気に入ってる。私のユースケースでは、KafkaやRabbitMQみたいな別のインフラを立ち上げるよりずっとシンプルなんだよね。

Chargifyでこれやったことあるけど、MySQLでね。Redisが使えないときは、ジョブをJSONの塊としてMySQLテーブルにダンプしてた。クロンジョブが定期的にそれを掃除して、ジョブを再キューイングしてたんだけど、うまくいってたよ。