1000万件オーバーのレコードのデータをカジュアルに扱うための心構え

自分が所属している会社のメンバーの教育用資料として、それなりの規模のデータを扱う時に前提として意識しておかなければいけないことをざっくりまとめたので、弊社特有の話は除外して公開用に整理してみました。

大規模データ処理、分散処理に慣れている人にとっては今更改めて言うことじゃないだろ、みたいな話ばかりだと思いますが、急激にデータスケールが増大してしまったりすると環境に開発者の意識が追い付かないこともあるかと思います。

そういったケースで参考にできるかもしれません。

弊社は基本的にAWSによって運用されているので、AWSを前提にした様なキーワードやサービス名が出てきます。後、句読点があったり無かったりしますが、ご容赦ください。

追記: 社内用の資料の編集なのでかなりハイコンテキストな内容だから誤解するかもしれませんが、これらはそもそもRDBの話ではありません。(関係無くは無いけど) 1000万オーバーの件数から数件取るとかそういう話ではなく数億件の中から1000万件を取得して数分以内に処理を終わらせるとかそういう処理を頻繁に(カジュアルに)実装しなければならない弊社の話でした。

1ノード, 1コアで処理してはいけない

  • 1件が200byteとしても1000万件を処理するなら2GB。これだけならメモリに乗り切らない程ではないが、オブジェクトにマッピングすると数GBを余裕で越えたりするし、1億件なら数十GBを越えるかもしれない。
  • 1件処理するのにかかる時間が1msecでも1000万件を処理すると1万秒 (3時間弱) かかる。
  • RubyはGILに引っかかるのでそもそも余り向いてない。可能ならマルチスレッドで処理を行い易いGoかJava辺りの利用を検討すること。(別にRustとかC++でも良いけど)
  • 上記以上の規模や負荷がかかる場合はノードごと分散することを検討する

通信回数を減らせ

  • 1度の通信が1msecで完了しても、1000万件を処理すると1万秒 (3時間弱) かかる。
  • 通信レイテンシやラウンドトリップタイムによってスループットの限界が決まる。通信回数が多いとワイヤースピードよりも遥かに低速で律速する。
  • redisキャッシュに接続してデータ取るのにもコストがかかる。

通信は常に失敗する可能性がある

  • 通信の信頼性はそんなに高くない。AWSの中でも度々不可解なネットワークの断絶が起きる。
  • 突然通信が終了すると、片側で終わったということすら検知できない場合があるため、状態遷移が中途半端な状態で処理が止まる可能性を考慮しなければならない

エラー対象を特定可能にしておく

  • エラーが起きた時に、どのデータに異常があったか判別できないと1000万オーバーのデータから大体の検討で探すことになり、非常に時間がかかる
  • 実装に不具合があった場合、特定パターンのデータが到達したら即座にシステムごと死ぬケースもある。
  • エラートラッキングサービスに必要なメタデータを付与してエラーを送る様にする
  • 一方で、ignoreして処理継続可能なエラーを都度トラッカーに送ると、一瞬で数十万のエラーが積み重なって課金額がえらいことになる危険があるので、注意すること。

リトライ可能ポイントを考慮しろ

  • リトライ機構は常に必要である
  • 1000万件処理している時の990万件目で処理が死んだ時、残りの10万件でリトライ可能にできるかを考慮する
  • 処理が複数ステップに跨る時は、ステップごとにコンポーネントを分けワークフローエンジンによってコントロールすること
    • 個別のステップを独立して実行可能にしてリトライ可能ポイントやテストデータ投入ポイントを挟む
  • 仕組みの単純化トレードオフになるケースもあるので、フルリトライが最善であるケースもある

羃等性の維持

  • リトライ時に状態の回復を要求すると自動リトライが困難になる
  • 処理の最初から流せば常に同じ状態になる様に設計することで、リトライに関わる処理が単純化できる
  • at least onceはexactly onceより圧倒的に処理負荷が軽い、羃等であるならat least onceで安全に処理できる
  • 羃等にすることが難しい処理に注意
    • カウントアップ等の今のデータからの差分を書き込む処理

分散処理ではロカリティ(データの所在)を意識する

  • 複数データのJOINが必要な場合、同一のキーであらかじめデータを同じ箇所に集めておいてから結合しないと、データの再配置が必要になる。再配置 = 大容量の通信処理であり高負荷。
  • 出力の全件Orderingは基本的に全てのデータを集約しないとソートできない

モニタリング必須

  • 関連するコンポーネントが増えるとボトルネックが分かり辛くなる。全ての箇所でモニタリング機構を整備すること。
  • コンポーネントに跨る様な処理の場合はdatadogに全体を俯瞰できる専用のdashboardを作る。
  • 最低限下記のリソースをモニタリングすること

    • CPU利用率とiowait
    • Disk I/O (iopsとスループット)
    • ストレージ消費
    • メモリ消費
    • ネットワークスループット
    • GC統計
    • 通信レイテンシ (中央値・最大値、90〜95パーセンタイル辺り)
  • 追加でモニタしておきたいもの

    • 無視できるエラーの回数、異常データの数や出現頻度
    • 処理遅延 (キューイングから処理されるまでの待ち時間)
    • DBへのqps
    • キャッシュヒット率 (キャッシュ機構がある場合)

余裕を持ったリソース確保 (staticかオートスケーリング かを問わない)

  • 唐突に大きな顧客からのアクセスが増えることがあるため、常に最低でも2倍ぐらいの負荷に耐えられる余地を残しておきたい
  • staticなノードの場合は、上記の様にモニタリングをしっかり整備し、早い段階でノード追加、スケールアップを行える様にアラートを整備する
  • それ以外のノードはオートスケーリングもしくは処理のキューイングに応じて自動的に必要なリソースを確保する機構が必須である
    • FargateやLambdaも活用できるし、リソース確保が簡単になる。それらの制約に対応出来る場合は検討の余地がある。
    • オートスケーリングの基本は増やす時は大胆に増やし、減らす時は少しづつ減らす

工程毎のログを吐け

  • バルク処理は長時間動作する上、複数ステップを伴うことがあるので、適宜ログを吐かないと、どこまで進んでいるか分からない。
    • 特定処理でスタックしてエラーにもならなかった場合に、進捗が確認できなくなる状態を避けたい
  • 一方で一件単位で詳細なログを吐くとログのデータサイズやログ出力の負荷が馬鹿にならないので、レコード個別のログについては必要性を考慮すること

汎用的なETL基盤を構築すること

  • importやexport等を個別の機能ごとに作るべきではない
  • 個別に作るとスケーラビリティの問題に個別で対処しなければならなくなる
  • 現状だとembulkをラップして実行できる基盤を準備し、必要があれば適宜プラグインを書くと概ね対応できる。

既存のコードを信用するな

  • 誰が書いたコードであっても1年以上前のコードがベースになっている箇所を信用してはならない
    • 実装を理解し現時点でも問題無いかどうかを確認すること
  • 1年あれば提供規模や顧客規模も変わるし、会社として注力している箇所も変わる。

    手癖で書くな

  • 量が変われば解決方法が変わる
  • 1年前と同じ方法では機能提供できないケースが多い
  • 特にWebシステム開発の手癖をバルク処理に持ち込んでは駄目。

クラウドサービスを使え

  • クラウドサービスの機能を理解し、不要な開発工程を減らすこと
  • ちょっとでも関係しそうなサービスを見かけたらとりあえずドキュメントを読む癖を付けておかないと、いざという時に思い付かなくなる。
  • 以下のサービスはデータ処理基盤において特に重要で、日常的に検討対象に入るので定期的に復習し新機能をキャッチアップすること
    • S3
    • SQS
    • SNS
    • Elasticsearch
    • ElastiCache
    • ECS/Fargate
    • StepFunctions
    • CloudWatchEvents

追記: 社内用の資料から対象を絞り込んだ時に消えてたもの

  • EMR
  • Athena

Kaigi on Rails 2020に登壇して、Kafkaを使ったサービス分割の話をしてきました。

2020/10/3 に開催されたKaigi on Rails 2020に登壇してきました。

オンラインのイベントやカンファレンスに登壇したことはあるんですが、今迄割とカンファレンスの現場感みたいなのが薄いなと感じる中で、Kaigi on Railsはかなり以前の祭り感みたいなのを感じることが出来て、とても良いカンファレンスだったと思います。

特にSpatial Chatでカンファレンス会場の廊下っぽい空気を完璧とは言えないまでも再現出来てたのはとても良い試みだと思いました。

さて、私はというとキーノートを除いた通常発表のトリとして発表させていただきました。

(午前だったら寝坊してたのでやばかった……、運営チームの気遣いに感謝しておりますw)

内容は以下の通りです。

speakerdeck.com

背景とか反省とか

RailsというWebアプリケーション開発のフレームワークを中心にしたカンファレンスということで、割と業務ドメイン特有の要素が前提になっていても良いかなあと思い、無理に一般的な話として頑張らないことにしました。

最近、増え続けるデータ量と複雑化するデータパイプラインの活用に頭を悩ませ続けているので、そういった高負荷な非同期処理が連鎖する環境でKafkaを活用してサービスをバラしつつ高トラフィックかつ複雑な処理を実装する、ということをやってまして、Railsでやってたころとどの辺りが違うのか、という話を中心に資料を作りました。

発表後にTwitterで何人かに「何故こんなことをRailsでやるのか分からん」みたいな感想をもらいましたが、自分も「ほんまそれ」と思いますねw

私がこの会社に正式に入ったのは4年ぐらい前で、当時は扱うデータは数百万件ぐらいだったんですよ。トラフィックも総データ量も当時から100倍以上は増えていて、しかもビジネス上重要だった機能を廃止して注力する方向を変えたり等もあって、根本的に構造が噛み合ってない箇所がいくつか残ったりしている状況でした。

なので、最初のころはRailsでも何とかなったっちゃなった訳です。

で、増え続けるトラフィックに対応しつつ機能開発をやり、その中で段階的にRailsと噛み合わない処理を色々と引き剥がしてきたので、こういう感じになっているという背景があります。

そういった機能開発要求の中で特にハードだったのが、データパイプラインの準リアルタイム化で、バッチベースの処理から大きくアーキテクチャを転換する必要に迫られました。

そこでアーキテクチャの中心にKafkaを置いて、業務のコアとなるデータパイプラインとWebアプリケーション側の非同期処理の複雑な非同期処理を上手く連携できる仕組みに寄せていこうと判断しました。

今回の発表は、そういった開発の流れの中で現在進行形で戦っている最中の話でした。

今も試行錯誤している最中の話だし、私自身が全ての範囲を見て実装できている訳ではないので、きっちりまとめ切って確信を持って話しが出来なかったのが多少心残りではあります。

特に、一般化してコードに落としたり知見をgemにまとめたりといったことが全然出来ていないので、そこは大分残念でした。

そして、いざ資料を書いてしまったら補足説明とかで色々と書き過ぎた結果、20分に全然収まらなくなってしまって、すげー発表時間をオーバーしてしまいました……。申し訳ありません。(この辺りも順番が最後の方でまだマシだった)

やっぱ、登壇経験がそこそこあっても、ちゃんと時間測って喋るリハーサルやらないと駄目だなーと反省しております。

結構資料が難産だったので、サボってしまったのが良くなかったですね。

という訳で、今回の発表は自己評価としては微妙な感じになってしまいましたが、何かしら伝わるものがあったなら幸いです。

来年もKaigi on Railsをやるぞ、という感じらしいので、またコンテンツ提供出来る様に頑張りたいと思います。次回もよろしくお願いします。

(来年はオフラインで集まれるんだろうか……)

パーフェクトRails著者が解説するdeviseの現代的なユーザー認証のモデル構成について

最近、パーフェクトRuby on Railsの増補改訂版をリリースさせていただいた身なので、久しぶりにRailsについて書いてみようと思う。 まあ、書籍の宣伝みたいなものです。

数日前に、noteというサービスでWebフロント側に投稿者のIPアドレスが露出するという漏洩事故が起きました。これがどれぐらい問題かは一旦置いておいて、何故こういうことになるのか、そしてRailsでよく使われるdeviseという認証機構作成ライブラリのより良い使い方について話をしていきます。 (noteがRailsを使っているか、ここで話をするdeviseを採用しているかは定かではないので、ここから先の話はその事故とは直接関係ありません。Railsだったとしても恐らく使ってないか変な使い方してると思うんですが、理由は後述)

何故こんなことが起きるのか

そもそも、フロント側に何故IPアドレスを送ってんだ、という話ですが、いくつかのブログ記事に書かれている様に、ActiveRecordがUserレコードに詰め込まれている色々な情報をSELECT * FROM usersで引っ張ってきてしまって、json変換する時に参照してしまってフロントに漏れたんでは、というのは想像できる話ですね。

Railsというフレームワークデファクトとして使われている認証機構構築ライブラリのdeviseは、とにかく色々な機能が組込まれており設定と簡単なDSLでパスワード認証、メールアドレス認証、パスワードリセット、ログイントラック等ができる様になります。

しかし、これを何も考えずにざっと作ったUserモデルでポコポコ有効化していくと、上記の様な問題に繋がる可能性はあります。

ただ私としては、こういう事情があったからといって別にDeviseを使うべきではない、とまでは思いません。というか改めて確認してみましたが、Devise自体はこの問題についてはちゃんと対処済みです。具体的にはdeviseのモジュールをincludeしたら時点でserializable_hashが上書きされ、to_jsonとas_jsonからdevise関連のカラムを除外する様になってます。(ソース)

むしろ、Deviseの構成元にして半端にパクったものを自作する方が危ない例の一つと言えますね。まあ、どっちにしてもto_jsonをそのまま渡すのは止めた方が良いやり方であることは間違いないです。Deviseの防御機構もこれで安心か、と言われると色々と説明が無さ過ぎて何とも言えないところです。READMEに書いてるRailsの知識が全然無いなら使わない方が良いよ、というのはこういう所に現れているのかなという感じです。

結局のところ、これはRailsがどういうフレームワークとして成り立ってきたかということと現代的なWebアプリケーションの実装におけるモデル構造とが乖離してきており、現代的なモデル構成が取れていないという点に問題の根幹があるからで、別にDeviseを使おうが使わなかろうが、同じ様なモデル構成を取って雑にフロントに渡せば同じ事故が起き易くなります。

私見ですが、Deviseが採用している基本形であるUserモデルに認証に付随する様々な機能をぶち込むというのは、現代において余り良い方法ではないし、それを参考にして半端にパクった認証機構を作ると尚のこと危険である、と思っています。(とにかくdeviseは初心者向けのサンプルが悪い)

Railsというフレームワークはv1.0から数えても既に15年近い歴史があり、充分な老舗フレームワークです。そして、Railsは現代においても充分に便利で強力なフレームワークですが、JSの扱いにおいては現代の潮流とは割と異なった流れに乗っています。

SPAというものを余り重視しておらず、現代においても基本的にはRailsはHTMLを出力するフレームワークという立ち位置が基本だと思っています。そういうフレームワークなので先進的なJSコミュニティからの評判は余り良くないようです。

HTMLを出力する場合、HTMLテンプレートのそれぞれの箇所に「この情報を出すぞ」とバックエンドも触っている人が明確に決めて記述するので、あるモデルに必要ない情報が混じっててもサーバーの向こう側に届くことはありません。

一方でSPAの様にフロントサイドでレンダリングしてDOMを弄くりまわす様な構成のアプリケーションの場合、フロントエンドとバックエンドで開発の専門性が異なるケースがしばしばあるため、バックエンドからまとめて情報を渡して後はフロントエンドでよしなに扱ってくれ、という方向に意識が向く場合があります。

そうすると、昔ながらのRailsの様に何でもかんでもUserに突っ込んでActiveRecordでよしなにやる、みたいな考え方でモデルを構築していくと、事故る訳です。

Deviseも10年近い歴史があって、そういう所からスタートしているので、昔からの情報やREADMEにある情報だけでは、現代的なWebアプリケーションと合致しない箇所がしばしば出てきていると思います。

そもそもSPAは難しいものです。サーバーサイドという自分達の環境に情報を閉じ込めておけないので、サーバーサイドでデータを取り扱うのに比べて、何を持つべきで何を持つべきでないのかを遥かにセンシティブに考える必要があります。

防衛策

じゃあ、どうやってこういったことを防ぐと良いのか。サーバーサイド側で責任を持つなら、必ず明示的にattributeを列挙するシリアライザかjbuilderの様なjson builderを通して、暗黙的なデータ渡しを行わない様にすることが基本になります。render json: @hogeは止めましょう。まあ、固定でエラーメッセージを返すエラーハンドラとかなら問題無いと思いますが、余計なお節介としてエラーになったオブジェクトの情報も付けとくか、とか思ってしまうと死ぬので気は使いましょう。

フロントサイドでも明示的に必要な情報だけをくれ、とサーバーサイドに要求することは出来ます。それにはGraphQLが使えます。GraphQLではフロント側から明示的に必要な属性値を列挙してリクエストを行うため余計な情報を取得してしまう可能性を下げることができます。フロントとバックエンドで開発体制を分業しつつ、安全性と柔軟性のバランスを取った仕組みと言えます。

そして、そもそも余計な情報をUserに持たせない様にモデルを作っておくという方法も取れます。人間書いてればうっかりしたりサボったりするもので、何事も見過ごす可能性があるもので、最初に防御的に作っておけばそういった時に事故を防げる可能性が上がります。Railsは良くも悪くもRDBと密結合したフレームワークなので、まずテーブル構成からしっかりやっておけば、それなりに安全性が上がります。

Deviseでこういうデータベース構造を取ろうとすると多少の工夫が必要になるのが残念なところですが、そもそもテンプレのまま作れるユーザー認証基盤というのはお試しアプリぐらいなものなので、割り切って多少コントローラーとviewを弄るぐらい普通だと考えた方が良いんじゃないかと思います。ログインを要求するモデルはUserに限らずAdministratorとかOperatorとか色々あったりするので、コントローラーとviewをdeviseから分離して弄るのは、実務の場合ほぼ必須なので。

じゃあ、どういう風に弄るのかという例を紹介していこうと思います。

ユーザー認証関連のテーブル設計

昔はただユーザーIDとパスワードで認証できればそれで良かったのですが、昨今のWebアプリケーションでは遥かに多くのことが必須になっています。現代のユーザー認証において求められる機能や前提は以下の様なものになります。

  • 古典的なパスワード認証
  • Twitter/Google等の各種サービスアカウントを利用した認証
  • APIのためのトークン認証
  • メールアドレスの存在確認
  • パスワードリセット
  • 2FA
  • アカウントロック

これらを都度自作するのは、非常に大変です。認証だけなら割と何とかなりますが、メールアドレスの存在確認やパスワードリセット等は、ユーザー側からの要求を受け付けるビューやエントリポイントがあって、更にそれらの有効期限管理等が絡んでくるため、サーバサイドだけで話が完結しないし状態遷移が長時間に渡って継続します。Deviseが使われる理由は主にこの辺りにあります。

という訳で、Deviseを使って工数を削減したいが出来るだけ安全に使いたい場合にどうすれば良いのか、というと話は単純で機能毎にテーブルを分けた方が良いだろう、と私は考えています。

認証部分のテーブル分割の例

Userというその他のリソースと関連するモデルに持たせるのは、そのアカウントとしてログインしているならほぼ常に必要とするデータだけを持たせる様にしましょう。ID、ログインネーム、メールアドレスのみぐらいが良いですね。 そして、認証機構毎にテーブルを分けます。例えば以下の様なモデルを作ります。

  • User::DatabaseAuthentication
  • User::TwitterAuthentication
  • User::GoogleAuthentication
  • User::ApiTokenAuthentication

(User名前空間に作ってますが、定数の扱いで事故る可能性もあるのでUser prefixとして作っても良い。管理のしやすい方で)

こうすることで、特定の認証を必要としないユーザーはレコード自体を作らないで済みますし、必要なユーザーのカラムはほぼ全てNOT NULLにできます。

もし、後から2FAに対応してくれ、という要求があったとしても、新しくテーブルを作れば対応できます。既存のモデルに与える影響は小さくできるでしょう。

登録前確認やリセットやロックについての分割の例

登録前確認やアカウントロックの場合は以下の様なモデルを作ることになるでしょう。

  • User::Registration
  • User::AccountLocking
  • User::PasswordResetRequest

この様にテーブルを分けるのは、RESTfulなエントリポイントを作る上でも親和性があり、デフォルトでエントリポイント毎に必要な情報のみ受け渡す様に制限されます。 また、RDBのレコードを頻繁にアップデートしたり、NULLABLEなカラムが多数存在するのは状態管理や異常データの検知を複雑化させるので、出来る限り避けたいところですが、こういったテーブル構成を取ることでレコードがあるか無いかと、一つか二つ程度のカラムの整合性を確認するだけで正常かどうかを判断できる様になります。 レコードのvalidationを行う時には、境界を明確に分けて組み合わせが爆発しない様にコントロールすることが重要になります。

例えば、登録プロセスの途中でメールアドレスの存在確認を行い確認出来た場合にのみ、アカウントを利用可能にしたい場合、ユーザーの登録途中、という状態をUserモデルに持たせない様にし、ユーザー登録という行為自体を事実として記録します。こうすることで、中途半端な状態のUserレコードや関連リソースを作らずに済みます。

この様に機能毎に単一目的のモデルにしておくと、これらのレコードは用事が終わった時点で完全に削除することができます。そもそも一時的な情報でしかないので、最悪何らかの事故や実装上の問題によってテーブルごと作り直しても致命的な問題になり辛くなります。

もし機能の必要性が無くなれば、モデルと関連しているコントローラーごと消せば済むし、後から必要になってもUser本体へのmigrationが不要です。

Userというとにかく肥大化しがちで絶対に失ってはいけないレコードに、一時的な要求でしか使わない情報を持たせるのはほとんどの場合で悪手でしょう。

サンプルコード

一応、User, User::DatabaseAuthentication, User::Registrationとモデルを分割しUserモデルを極限までコンパクトにした動くサンプルを用意してみました。3時間ぐらいで作ったので登録とログイン回り以外は張りぼてだし、もっと調整する所色々あると思いますが、まあどうやって弄るのかの参考ぐらいにはなると思います。

github.com

各テーブルのカラムはdeviseが必要とするものと名前だけという最小構成です。

class DeviseCreateUsers < ActiveRecord::Migration[6.0]
  def change
    create_table :users do |t|
      t.string :nickname, null: false

      t.timestamps null: false
    end

    add_index :users, :nickname, unique: true
  end
end
class DeviseCreateUserDatabaseAuthentications < ActiveRecord::Migration[6.0]
  def change
    create_table :user_database_authentications do |t|
      t.references :user, foreign_key: true, dependent: :destroy,  index: {unique: true}
      ## Database authenticatable
      t.string :email,              null: false
      t.string :encrypted_password, null: false

      t.timestamps null: false
    end

    add_index :user_database_authentications, :email, unique: true
  end
end
class DeviseCreateUserRegistrations < ActiveRecord::Migration[6.0]
  def change
    create_table :user_registrations do |t|
      ## Confirmable
      t.string   :confirmation_token,  null: false
      t.datetime :confirmed_at
      t.datetime :confirmation_sent_at
      t.string   :unconfirmed_email
      t.string   :email

      t.timestamps null: false
    end

    add_index :user_registrations, :confirmation_token, unique: true
    add_index :user_registrations, :unconfirmed_email, unique: true
  end
end

モデルはこんな感じ。

class User < ApplicationRecord
  devise :authenticatable

  has_one :database_authentication
end
class User::DatabaseAuthentication < ApplicationRecord
  belongs_to :user
  devise :database_authenticatable, :validatable
end
class User::Registration < ApplicationRecord
  devise :confirmable
end

何故Userを:authenticatableにしているかというと、もし複数の認証基盤に対応することになった場合、database_authenticationのsign_inが行われていなくてもuserという大本の方でsession管理する方向に修正できる様にしてあります。

で、次にコントローラーを多少カスタマイズします。

デフォルトの挙動ではUserが先に作られていることを前提にしているので、registrations#createのsuperを呼ぶ前にregistrationが無ければ作成する様にしておきます。後は、renderするテンプレートやredirect先を適宜調整します。

この時点でaction_mailerからメールが飛び、トークン付きのURLが送信されます。(development環境ならデフォルトで本文がコンソールに表示されます)

デフォルトのconfirmations_controllerは確認が取れたら、ログイン画面にredirectしますが、controllerを少しだけ上書きし普通にshowテンプレートをレンダリングする様にします。

コードは以下の様になる。

class User::RegistrationsController < Devise::ConfirmationsController
  # GET /resource/confirmation/new
  # def new
  #   super
  # end

  # POST /resource/confirmation
  def create
    user_registration = User::Registration.find_or_initialize_by(unconfirmed_email: params[:registration][:email])
    if user_registration.save
      super do
        flash[:notice] = "Sending an email confirmation instruction"
        return render :create
      end
    else
      respond_with(user_registration)
    end
  end

  # GET /resource/confirmation?confirmation_token=abcdef
  def show
    super do
      @user = User.new
      @user_database_authentication = User::DatabaseAuthentication.new
      return render :show
    end
  end
end

そして、showのテンプレートに本登録に必要なパスワード登録等のformを記述します。 (面倒なので省略)

このフォーム画面は、有効期限内のメールアドレス認証トークンを持っている場合のみアクセス可能です。

後はフォームの受け取り先で、transactionを使ってUserとUser::DatabaseAuthenticationを登録します。UserはただのIDの箱みたいなものなので何も気にせず作ることができます。User::DatabaseAuthenticationもモデルがsaveされる時にdeviseが良しなにやるので、普通にモデルをnewしてsaveすれば問題ありません。(自分でソースコードは読んでおきましょう)

コードで書くとこんな感じ。

class User::RegistrationsController < Devise::ConfirmationsController
  def finish
    self.resource = resource_class.confirm_by_token(params[:confirmation_token])
    ActiveRecord::Base.transaction do
      @user = User.new(nickname: params[:nickname])
      @user_database_authentication = User::DatabaseAuthentication.new(user: @user, email: params[:email], password: params[:password], password_confirmation: params[:password_confirmation])
      @user.save!
      @user_database_authentication.save!
      self.resource.destroy!
    end

    sign_in(:user, @user)
    sign_in(:database_authentication, @user_database_authentication)

    redirect_to posts_path
  rescue
    render :show
  end
end

丁寧にやるなら、showで表示するformのためにFormクラスを作っても良いでしょう。

後、Deviseはroutingの設定が無いとDevise.mappingsに登録されず組込みのコントローラーを使えなくなってしまうので、routes.rbも設定しておかないと駄目です。

Rails.application.routes.draw do
  devise_for :user, skip: :all
  devise_for :database_authentications, class_name: "User::DatabaseAuthentication", controllers: {
    sessions: 'user/database_authentication/sessions'
  }
  devise_for :registrations, class_name: "User::Registration", controllers: {
    confirmations: 'user/registrations'
  }
  devise_scope :registration do
    post "/registration/finish", to: "user/registrations#finish",  as: "finish_user_registration"
  end
  devise_for :users
  resources :posts
  # For details on the DSL available within this file, see https://guides.rubyonrails.org/routing.html
end

とまあ、こんな感じで調整していけば、そんなに魔術的じゃない感じでdeviseの機能を利用できます。モデルのモジュールとcontrollerのソースコードは追えないと駄目ですが、利用に際した前提条件と言えます。(wardenは流石に深く突っ込まなくても大丈夫だと思うけど)

コントローラー名とかURLとかもうちょっとどうにかしようがありますが、とりあえずこの様にして不要な情報をUserから引っ剥がしていくことは出来ます。

認証機構って実はあんまり作る機会が無いんですよね。最初から必要なのでプロジェクトの立ち上げ期に参画してないと余り触らない。しかも後からモデル構造弄るのがめちゃくちゃ大変になるので、微妙な作りになっててもほとんど修正されないことが多いし。

という訳で、この機会にユーザー認証のモデル構造がどうなってると良いのか考え直して、素振りしてみても良いんじゃないでしょうか。 (正直、deviseどころか久しぶりにRails触ったんで、routingの設定とかマジ分からんってなってた……)

パーフェクトRails 増補改訂版の自身の担当分について

長らく改訂版をお待たせしていたパーフェクトRailsがついに新しくなります。 私は、やはり人間は締切が近くならないと働かない、という極めて重要な事実を改めて学ぶことができたのが良かったと思っています。

そろそろ献本させていただいた本は届き始めている様で、ブログやTwitter等で紹介していただけて嬉しい限りです。 全体の解説や紹介はそちらに任せるとして、私は今回担当していた箇所が大きく変わったので、それについての感想や裏話を書こうかと思います。

前回担当していた箇所

前回は、基本的に終盤のRailsの基本機能を越えたアプリケーションを作る時に助けになる章を担当していました。

元々は、そういう仕事としてRailsアプリケーションを書く上で気にしておきたいこと、というのが書かれた本が余り無く、何とかそういう本を作りたいという思いがあったので、あの辺りの章を担当させてもらいました。

正解の無い部分にオピニオンを打ち込むという意志の元に書いたので、今となっては自分から見ても多少考え方が変わっていたりする箇所もあります。

Railsを余り触っていない2年間

一方で、私自身はというと、ここ2年ぐらい余りRailsを触っていませんでした。 Railsコンポーネントを使うことはあってもWebシステムとして使っておらず、ActiveRecordRDBをどうやって引き剥がすか、みたいなことばかり考えています。 利用している永続化層もCassandraとかKafkaとか分散ミドルウェアが中心になっており、Rubyから触るにしてもRailsで全くカバーされない範囲になっています。

直近数ヶ月で一番書いてる言語は間違いなくJavaです。

今回担当した箇所

という訳で、現代的なRailsの実践編を執筆しなおして纏めるには、どうにも説得力あるものを書ける自信がありませんでした。 このままでは本が出せないという感じなので、自分から誰かに依頼できないかと考えていました。 そこで今回お願いさせていただいたのが、やさいちさん(twitter: @_yasaichi)です。

RailsDM 2019という大きめのイベントがあったのですが、そこでRuby on Railsの正体と向き合い方という内容で登壇されていました。 私の過去の発表や私も参考にさせていただいたスライドも参照した上で、現代的なWebアプリケーションの考え方に沿っている内容で上手く纏められた素晴らしいトークでした。 私がそれを聞いていたのもあって、今回の改訂版にあたって私が過去に書いていた箇所を引き継いでもらえないかと依頼したら、やりますと言っていただけました。(本当にありがとうございます。めちゃくちゃ助かりました。)

私が以前に担当していた箇所はしっかり引き継いでいただけたと思いますし、改訂版でもしっかり紙面が確保されています、ご期待ください!

じゃあ私は何を書いたかというと、コンテナでRailsを動かすという章です。 昨今、先進的なRailsの現場では大なり小なりコンテナが使われていると認識しています。一方で本番環境では使われていないという話もそれなりに耳にします。 Railsをコンテナ環境で動作させる知見が書籍の中に纏まっていれば、それなりに価値のある情報になるだろうと思いました。

弊社では2016年辺りからコンテナを本格的に導入しており、私自身そこそこ知見を持っているということと、複数のミドルウェアを利用したアプリケーションのデプロイやテストのために、コンテナのセットアップは頻繁に業務として実施しているので、この章を担当するのが良いだろうということになりました。

内容は概ね以下の通りとなっています。

  • Infrastracture as Codeの歴史とDockerの意義
  • 効率的なコンテナイメージの作り方 (レイヤーキャッシュとかマウントキャッシュとか)
  • 開発環境での利用方法 (docker-composeとか)
  • 本番環境に向けて注意しておく点
  • オーケストレーションサービスの紹介

実際書いてて思いましたが、マジのproductionレベルで実際にコンテナを活用するなら、オーケストレーションサービスの活用は必須なんですが、それについて説明をしだすと少なくともECSかKubeのどちらかについてまともに解説を書かなければならず、1章どころでは済まなくなります。 流石にRailsの本でそこまでボリューム取るのも難しいし、私自身の余力もそんなに無かったのもあって、本番環境でコンテナ運用が出来る様になるまでの前段階という形に留めることになりました。 (弊社だとECSを採用していてkubeの知見がそんなに無いとかもある)

一応、本書で纏めた内容はそのために必ず考慮しておかなければならない要素になっていて、ここをカバーしていれば自分達が選択したオーケストレーションについて学習するだけで一気に本番環境での活用が近くなる内容に出来たとは思っています。 特に設定値と秘匿情報については読み込む方法のイメージを事前に考慮しておかないと、後々で移行しようとすると結構面倒なことになります。

私の担当箇所はこんな感じになってますが、その他の内容も一線で活躍しているメンバーが最新のRailsを対象に知見をまとめた物になっています。 紙の本としては相当なぶ厚さになっているらしいですが、その分情報量はパーフェクトと言える内容になっているでしょう。 電子版も割とすぐに出るらしいので、重そう……って感じの人は是非電子版で購入を検討していただけると幸いです。

Linuxオーディオ環境の刷新

コロナが蔓延してから美味しいものを食べるために外食するという行為がほぼ無くなってしまい、お金を稼ぐモチベーションが薄れていたので、いっそ散財してモチベーションを取り戻そうと思い、自宅の環境改善としてオーディオにお金をぶち込んでみた。

ついでにMacを使わなくなってしばらく経つのに、iPhoneとの連携のためにiTunesfoobar2000を無理やりLinuxで使ってたのを止めようと思いソフトウェア周りも一新することに決めた。

ハードウェア刷新

新しいヘッドホン

基本的に音楽はヘッドホンで聴くのだが、自分はゼンハイザーというメーカーのヘッドホンが昔から好きなので、まずは、そこのフラッグシップモデルであるHD800Sを買う。

実はHD820という後継のモデルもあったのだが、レビューを見る限りでは低音が多少HD800Sより強く出るが、HD800Sを持ってる人が買い替える程じゃないというコメントが多く、値段に10万以上差があったので、HD800Sにしました。

www.sennheiser.co.jp

新しいヘッドホンアンプ

そして、HD800Sのために本家が作ったヘッドホンアンプのHDV820も合わせてポチった。ヨドバシで溜まってたポイントを全部費しても27万円もした。

www.sennheiser.co.jp

https://www.yodobashi.com/product/100000001003662474/

今迄はゼンハイザーのHD700とDr. DAC2という大分昔に買った4万円ぐらいのヘッドホンアンプだったが、明らかに音の解像度が上がったのでお金出して良かったと思う。

ちゃんとバランス接続するとすげーなこれは。

新しいオーディオI/F

ついでに遥か昔に歌の練習のために持ってたオーディオI/Fがいつの間にかLinuxで動かなくなってたので、そっちも刷新した。

www.amazon.co.jp

これはLinuxで動くのが確認取れてて1万円ちょっとの代物をサクっと買った。 これにShureSM58というメジャーで安価なマイクを挿して使っている。

ソフトウェア刷新

Linuxで聴くための環境作り

もう何年間foobar2000を使ってたか分からんし、今もあれが一番使い易くて便利だと思うのだが、LinuxがメインPCになってから流石に32bitのwineをずっと使い続けるのに疲れてきた。 という訳で、最低限以下の要件を満たすプレイヤーを探すことにした。

  • Linuxで大量の曲が管理できる (20000曲以上)
  • タグを使った検索が簡単で高速である
  • DACにリサンプリング無しでデータが転送出来る
  • レーティングの移行がし易い
  • タグに埋め込み済みの歌詞表示が出来る

1日かけて探して回って、UIに若干の不満があるが一つだけ必要な機能を全て満たすプレイヤーを見つけた。

それがCantataである。

github.com

f:id:joker1007:20200518032935p:plain

Cantataについて

CantataはMPD (Music Player Daemon)というLinuxを音楽サーバにするためのソフトのクライアントだ。 実際に音楽データのデータベースを管理し再生するのはMPDの方になる。

MPD自体は非常に多機能で歴史も長い。 大体のファイルフォーマットはdecodeできて、ALSAのhwを直接参照してbit perfect (データ変換無し) でDACにデータ転送も出来るし、HTTPサーバーを立ててストリーミングさせたりicecastサーバにデータを流したりできる。

また、DSDというハイレゾ音源でしばしば使われるPCMと異なったサンプリング方式のデータも、PCM変換無しでDACに送ることができる。(完全なDSDネイティブ再生とDSD Over PCMがあるらしい)

今回購入したHDV820はDSDの再生に対応しており、DSD Over PCMを使ったネイティブ再生がLinuxでも動作確認できた。

CantataのUIの不満なところは一覧表示した時に画面に出せる情報が少ないという点と、表示の自由度が全然無いという点。

UIのフォントサイズを調整する設定すら存在せず、マジかよこれだからLinuxは……という気持ちになる。4K解像度でそのまま使うと視力が1.5無いと辛いと思う。QT_SCALE_FACTORで雑に拡大するしかない。

もう一つ気になる点は、既に新機能の開発は終了しており、現在はバグ修正のみがメインとなっているところ。まあ、foobar2000も大差無い様な気がするし必要な完成度はあるのでこの点は気にしないことにした。

ちなみに上述の要件の中で満たすのが難しかったのが、レーティングの移行と歌詞表示である。

変に独自のデータベースを持ってるとレーティングをimportするのが大変だったのだがCantataのレーティングはMPDのstickerという機能を単純に利用しているだけで、MPDのstickerは実際はただのSQLiteデータベースなので元のデータの一覧さえ出せれば簡単に移行が可能だった。

歌詞表示はiTunes & foobar2000時代はLYRICSというタグに歌詞を埋め込んでいたのだが、これを表示してくれるプレーヤーがほぼ存在しない。探した中ではCantataしか無かった。 非常に手間をかけて全部テキストファイルとして抽出しなおせば何とかなったっちゃ何とかなったのだが、面倒は少ない方が良い。

ちなみに調べた中で他に良さそうだったプレイヤーソフトはMPDクライアントだとgmpcで、MPDを使わないプレイヤーだとStrawberryとQuod Libetが良さそうな感じだった。 普通DSDの再生とか気にしなくて良いと思うので、完全に新しく音楽ライブラリを作るならこの辺りでも良さそう。

iPhoneで聴くための環境作り

iTunesに依存しないためには、Appleクラウドサービスを使わずにiPhoneで音楽を聴く方法が必要だ。

これも色々と探してみたが、現状と同等の環境を得るために使えそうなのはYoutube Music (Google Play Musicは年内終了らしい)とSubsonicの二つが有力だったので試してみた。

Youtube Musicは、無料で10万曲も楽曲を送れるし、flacもそのまま送れてむしろiTunesより良いじゃんという感じではあるのだが、メインがYoutube側なので、自分のアップロードしたライブラリを視聴するのに1タップ要るのがめんどい。 後、アップロード機能が出来たのがごく最近であり、全くツールが無くてめちゃめちゃ不便だ。ブラウザで20000曲もデータ送るの大変過ぎるだろ。 一応、フォルダをD&Dしたら再帰的に読んでくれるのだが、おもむろに2000曲ぐらいまとめてアップロードしてみたら4時間ぐらいかかったし、Chromeのメモリ消費量が16GBぐらいになった。ハングしなかったのは幸運と言える。 アップロードがこなれてきたら、割と使えるとは思う。

Subsonicは自分のPCにサーバを立てて音楽ライブラリに外部からアクセス可能にするソフト。 ライブラリ構築が早いし、プレイヤー側のアプリ次第ではFLACがそのまま再生できるのが良い。

実際のところ自分でサーバー立てるのに何も苦労が無いならSubsonicの方が大分楽だと思う。 問題点は自宅サーバーが落ちたらアウトであるという点と、バックアップ代わりにはならない点か。

現状、Subsonic側が有力な選択肢になっている。

しかし、Youtube MusicもSubsoniciTunesで埋め込んだ歌詞は見れねえ……。

まあ、実際のところ、そもそも外出とか全然しない世の中になった訳で、外で移動中に音楽聴くとか出来なくても大して困らんっちゃ困らん気もする。今となっては程々で良い

録音環境

インターネット越しにカラオケ会をやるという目標のためには、音楽をPC上で再生しオーディオI/Fで音声を取り込んでリバーブを軽くかけた上でミックスするという工程をリアルタイムで実行したい。

しかし、実際に歌ってみて違和感が無い様にするには結構な低レイテンシが必要であり、Linuxで普通使われるpulseaudioでは大分厳しいものがあった。

pulseaudioでマイク入力取ってエフェクトをかけるとどうやってもローカルで40msecぐらいのレイテンシがある。 更にpulseaudioで音をミックスしてスピーカーに出すところでも数十msecぐらいはレイテンシがある。 自分でヘッドホンで音楽を聴きながらモニタバックした時に数十msecもズレてると歌えたものではない。

なので、低レイテンシなオーディオ環境構築に特化して作られたJACK Audio Connection Kitを使うことにした。

JACKの利用法

pulseaudioが動いてサウンドバイスを掴んでると使えないので、JACK Audio Connection Kitを使う時はpulseaudio -kでpulseaudioを一旦殺す必要がある。

pulseaudioを殺したら、cadenceというJACKサーバーを管理してくれるフロントエンドがあるので、そいつを起動する。

https://kx.studio/screenshots/cadence1.png

alsaのユーティリティツールであるaplayやarecordを使ってサウンドバイスのIDを調べてdeviceに指定してJACKを起動したら、使える様になる。 JACKに対応していないアプリケーション (Chrome)とかで音を出すためにはpulseaudioとJACKをブリッジしてくれるプラグインを使う。

PulseAudio/サンプル - ArchWiki が参考になる。

サンプリングレート, buffer size, periodsで大体レイテンシが決まるのだが、自分の環境だとブロックレイテンシを3msecまで下げることが出来た。 いくつかエフェクトを追加しても余裕で10msecぐらいで収まる。

リアルタイムでミックスする時

patch bayというジャンルのソフトを使う。cadenceにはcatiaというpatch bayが付属しているのでこれを使う。

https://kx.studio/screenshots/catia.png

こんな感じで、入力やフィルターや出力をグラフィカルに配線して繋ぐことができる。

これでマイク入力にエフェクトをかけてスピーカーに繋いだりできる。

エフェクトをかけるには色々と手段があるのだがJack-Rackというのが手頃だったので、これを使って軽くリバーブをかけてmixできる様になった。

インターネットカラオケに向けて

JACKに対応したオープンソースのリモートでジャム・セッションをやるためのアプリケーションとして Jamulusというのがある。WindowsMacにも対応している。

github.com

先に紹介したCatiaで色々繋げれば、ニンテンドーSwitchのJOY SOUNDアプリから音を流しつつ自分の声を重ねてJamulusに繋げば、インターネット越しに一緒にカラオケできるんじゃねえのかという可能性を考えている。

ただ問題なのは、まともなスペックのPCと低レイテンシな音声入出力環境 (WindowsだとASIO必須)、そして40msec以下の遅延で通信できる環境が必要になる点だ。

俺の自宅は全て揃っているが、そんなの揃ってる人がカラオケ友達にほぼ居ないので実験が出来ない。DTMやってて、かつ良いインターネット回線持ってる人とか結構少ない……。

という訳で、現状はただの妄想である。

まあ、俺がカラオケで歌っているところを一方的に配信することは出来る様になったw


というわけで、Linuxで結構真面目にオーディオ環境を整えてみた。 あんまりこういう事やる人は居ないと思うが、思い立ってラズパイを音楽サーバーにしたいとか考える人には参考になるかもしれない。

ちなみにお金が劇的に減ったので、金を稼ぐモチベーションは戻ってきたw

ISUCON9への出場と敗北

ISUCON9に出場してきた。そして敗北してきた。

fujiwara, tagomoris, 自分という大人気ない面子で出場して負けてしまったので、非常に悔しい。 大した活躍が出来なくて申し訳なさで一杯である。 最終スコアは5000ちょいぐらいでしたが、3台分のリソースを全然回せなかったので、アプリサーバのCPUまでネックを移せれば予選突破レベルだったかなあ……。

分かっていた明らかな問題点

  • DB負荷が高い
  • DBのロックが長い
  • APIのシーケンシャル呼び出しにより待ち時間
  • loginの負荷
  • 静的ファイルの配信

やれたこと

  • N+1自体の解消
  • マルチスレッド化してAPI呼び出しの待ち時間を最小化
  • 静的ファイルをnginxで配信
  • categoryをオンメモリに乗せる
  • 複数台構成化

失敗

N+1解消手法の選択ミス

N+1を解消するのにJOINを使ってしまったことが一番問題だった。

フロントからのリクエストを変えられないので、ページネーションをするためにcreated_atのorder byの範囲を絞らないと何をやってもDBネックが消えなかった。

JOINのパターンが複数パターンあったので、ORDER BY狙いのindexが全然効かせられなくて、時間使い切ってしまった。

もう少し早めに方針転換してクエリをバラす方向に触れれば良かったかもしれない。

単純にitemsだけORDER BYが効く様にした上で、IN句を使って他のエンティティを取得しアプリ側で結合するのが妥当だったと今なら思う。

ソースコード確認するまでの時間

最初にAlibaba Cloudのインスタンス起動に失敗し起動がロックされてしまった。

結果、インスタンスを起動してソースコードを得るまでに1時間ぐらい時間がかかってしまった。 また、ソースコードの分量が多くて読んで状況を把握し、実際に作業に入った時点で昼前ぐらいだったと思う。

完全な負け惜しみだが、そこで無駄にした時間が無ければもう1手打てたかもしれない……。

エラーハンドリングの適正化

エラーハンドリングがかなり厳しいコードで、地道にコード直してAPI呼び出しやスタックトレースがまともにロギング出来る様に最初に改修しておくべきだった。

エラー発生時の情報取得が上手くできてなくて、かなりの時間を無駄にした。 結果、出来ることがかなり減ってしまった。

上手くいったこと

モリスさんがマルチスレッド化をサクっと実装してくれたため、そこで最低限のスコア上昇は得られた。 そこはやれば上がるのは分かり易かったし、我々はこういうコードを書くのは最近得意であるw

というわけで、ここは数少ない上手くいった箇所だったと思う。

反省と負け推しみ

ある程度やる事自体は見えていたのだが、完全に手数が足りなかったり、DBネックを潰し切れなかったのが致命傷になってしまった。 bcryptを別アルゴリズムにしていいというのは完全に思い付いていなかった。そこに負荷がかかっているのは分かっていたが、ルールに対する思考の柔軟性が無かった。

総合的に柔軟性を失っているなあということを感じた。後、面倒臭いことに我武者羅に突っ込むのを躊躇してしまったかもしれない……。 おっさんになるとはこういうことなのかと思った次第である。

そして、言い訳と負け惜しみを書いておくと、私自身クライアントから直接アクセスが来る様なWebアプリケーションを最近全く書いてないので、完全に勘とか手の早さを失っていると思った。

インデックスという概念が無い全スキャン前提のDWHとかパーティションキーでデータを取得するCassandraとかに対して1000万件オーダーでデータを取得する様なクエリばっかり書いてるし、データパイプラインのスケーラビリティとかストリーム処理に関するコードばかり考えていて、Webのページネーションとかをちゃんと実装する、みたいなことから遠ざかって久しい。

昔取った杵柄みたいなものでいつまでも戦える程簡単なものではない様だ。

これはISUCONという大会が良く出来ているという話でもあるので、悪いことではないだろう。 しかし、やはり負けるのは辛い。特にチームメンバーが強力だっただけに辛さが3倍である。

次回はどうだろうなあ。仕事がまたコンシューマアクセスが来るWeb領域に戻るまでは諦めるかもしれないw

Kafka Streamを使ったストリーム処理の概要と運用時の考慮点

最近、仕事で分散ストリーム処理に手を出していて、その基盤としてApache KafkaとKafka Streamsを使うことにしたので、動作概要とストリーム処理のイメージについてまとめておく。

kafkaそのものについては今更説明の必要は無いだろうと思う。

Kafka Streamsはkafkaを基盤にして分散ストリーム処理を簡単に書くためのDSLライブラリ。

https://kafka.apache.org/documentation/streams/

延々流れてくるデータを変換して別のtopicに流したり、時間のウインドウを区切ってカウントした結果を流したり、みたいなのがサクっと書ける。 Apache Flinkなんかと似た様なことができる。

Kafka Streamsが良いのは以下の点。

  • ただのConsumer/Producerのラッパーなのでfat-jarファイル一つで簡単に動かせる
  • 可用性やデータの分散をkafkaのconsumer groupとtopicの仕組みに載せているので別途ミドルウェアを必要としない

なので、kafkaが動作している環境ならJavaCLIツールを書く様な感じで分散ストリーム処理が実装できる。

簡単な例だとこんな感じになる。

public class EventCountStream {
    static Topology buildStream(Properties props) {
                StreamsBuilder builder = new StreamsBuilder();
                builder.stream("input-event-topic", Consumed.with(Serdes.String(), Serdes.Integer());
                builder.groupByKey().count();
                builder.to("counted");
                return builder.build(props);
    }

    public static void main(String[] args) {
        String bootstrapServers = envs.getOrDefault("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092");

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "repro-event-count-stream");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "1");
        props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, "3");
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "10000");

        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "50000");
        props.put(StreamsConfig.mainConsumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "500");
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "2500000");

        Topology topology = buildStream(props);

        System.out.println(topology.describe());

        final KafkaStreams streams = new KafkaStreams(topology, props);
        final CountDownLatch latch = new CountDownLatch(1);

        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.out.println("graceful stop");
            streams.close(Duration.ofSeconds(60));
            System.out.println("stream stopped");
            latch.countDown();
        }, "shutdown"));

        try {
            streams.start();
            latch.await();
        }
        catch (Throwable e) {
            e.printStackTrace();
            System.exit(1);
        }

        System.exit(0);
    }
}

Kafka Streamsのコンセプトモデルと実行モデル

ここでは、今、現時点で自分が理解しているKafka Streamsの動作について解説する。

大まかな概念としては、https://kafka.apache.org/23/documentation/streams/core-conceptsに書かれているが、StreamBuilderクラスに生えているDSLメソッドを使ってノードを作成し、それをグラフ上に繋げていくことでストリーム処理のパイプラインを構築することになる。

例えば、あるトピックからデータを取得するSourceノード、値を変換するProcessorノード、グルーピングするProcessorノード、aggregationを行うProcessorノード、別のトピックに書き出すSinkノード等を繋げて処理を構築することになる。

実際の実行モデルについてはhttps://kafka.apache.org/23/documentation/streams/architectureに記述がある。

Kafkaはトピックのパーティション毎にどのConsumerが処理を担当するかを固定で割り当てて、処理対象が重複するのを防ぐ機能があり、それをConsumer Groupと呼ぶ。 slideshareで見つけた資料から引用させてもらうとこんな感じ。

Kafka Streamsの基本的な実行モデルでは、Consumer Groupの各パーティション割り当て毎にタスクという実行単位が作られる。タスクは一連のストリーム処理を実行する。 (実際には全てのパーティションからデータを取得するソースノードや、処理ノードのグラフ構造が複数のトポロジに分割されるケースもあり、1:1で対応している訳ではない) また、Kafka Streamsは一ノードにつき任意のスレッド数で実行できて、そのスレッドをStream Threadと呼ぶ。 各タスクはStream Threadの中で起動し、一つのスレッドの中に複数のタスクを持つことができる。

f:id:joker1007:20190828183036p:plain

つまり、ソーストピックのパーティションの数だけ実行タスクが生成されるが、それぞれのタスクの実行主体はKafka Streamsワーカーのノード数 x スレッド数に分散して割り当てられる。 その割り当てをKafkaのConsumer Groupを使って管理している。 注意点として、一つのストリームスレッドで複数のタスク(つまり複数のパーティション)を処理できるが、スレッドが共有だと一つのパーティションのデータを処理している間は他のパーティションのデータ処理は待たされることになる。

コードレベルでの実行イメージ

Kafka Streamsはラムダで一連の処理が繋がっているかの様にコードを書くことができるが、実際に一つ一つのラムダ式として記述された内容は常に同一のスレッドや同一のノードで実行される訳ではない。 例えば、レコードのキーを変更する操作を行うとrepartitionが発生し、一旦kafkaの中間トピックを介して別タスクに処理が移ってから続きの処理が実行されることになる。その別タスクは完全に別ノードで実行されるかもしれないし、別のスレッドで実行されるかもしれない。 なので、各ラムダに書いたものは完全に独立した処理としてコードを書かなければならない。スコープ外から持ち込めるのはstaticで各ノードで共通であることが分かっているものぐらいしか使ってはいけない。

また、ストリーム処理を実装する上では、あるパーティションのロカリティを考慮した実装することが重要になる。 kafkaはデフォルトのパーティショナーにmurmur2 hashを利用しており、同一のキー, 同一のパーティション数であれば同じパーティション番号にデータが配置される。 そして、同じトポロジ内の同一パーティション番号は同じStream Threadに処理が割り当てられる。 それによって、あるキーでグルーピングした処理は同一のStream Threadで処理されることが分かるので、集計処理を行うことが可能になっている。 こういう仕組みになっているため、Stream DSLより低レイヤのAPIを利用してマニュアルで状態管理をしつつ集計する時には、その処理がどういうキーでパーティションされていてどういう単位でState Store(ワーカー内部に持っている状態管理用のKey Value Store)を持っているかを考えながら実装する必要がある。

KStreamとKTable

Stream DSLで構築したパイプラインは大きく分けてKStreamとKTableという二種類のモデルで表現される。 公式リファレンスによると、KStreamはレコードストリームの抽象、KTableは変更ログストリームの抽象と説明されている。もうちょっと詳しく説明するとそれぞれ以下の様な感じで動く。

  • KStreamは一つ一つのレコードがデータとして完結しており、全てのレコード一つ一つをそのままストリームデータとして扱う。
  • KTableは一つのレコードは変更履歴情報であり、ストリーム処理で流れるデータは変更履歴を集約した最新の状態を表すレコードになる。 テーブル上に存在する更新可能なレコードの様に扱うことができるのでKTableというのだと思う。

Kafkaはストリームバッファなので、KTableの様なデータを扱うためにはストリームの開始時点から現在までのデータを全て読み出して再生しないと最新の状態を復元できない。 しかし、毎回そんなことをやってられないので、Kafka Streamsでは起動時にレストア処理が走って、ソーストピックからデータを取得して各ワーカーのローカルにあるState Storeという領域に最新の情報を保存する形になっている。 State StoreはデフォルトでPersistent State StoreとIn Memory State Storeが存在する。Persistent State StoreはRocksDBという埋め込み型のデータベースをKVSとして利用してデータを保存している。

Kafka Streamsではグルーピングして集計するという様な状態を保持したストリーム処理はKTableとして表現される。そういった処理には状態復元のデータソースになる中間トピックと各ワーカーにState Storeが必要になる。 当然両方ともストレージを消費するので、KTableとして扱うデータが多ければ多い程各ワーカーのローカルストレージ領域が必要になる。

KTableデータの保持期間

ストリーム処理でデータを集計する際には、タイムウインドウを区切って集計するのが一般的。 もし、タイムウインドウを1時間で集計しているとして、そんなデータを何週間も保持しなくて良いことの方が多いだろう。 そういう処理で無駄なデータを保持し続けない様にするために、集計処理毎にデータ保持期間を設定できる様になっている。

運用時の考慮点

Kafka Streamsは状態を保持する必要がある機能はKafka本体にデータを保持する仕組みになっているので、基本的にワーカーを増やすだけで処理をスケールアウトさせることができる。 しかし、前述した様なState Storeのレストア処理やConsumer Groupへの参加申請に伴うrebalance処理にそれなりに時間を要する。

そのため、頻繁にワーカーが出入りする様なオートスケールを使ったコスト節約方法とはかなり相性が悪い。 ちなみにrebalanceにかかる時間に対する改善策は開発コミュニティでも議論されていて、今後は高速化されるかもしれない。

KIP-345: Introduce static membership protocol to reduce consumer rebalances - Apache Kafka - Apache Software Foundation

そういう訳なので、レストアスループットを向上させるためのチューニングや頻繁にノード数を増減させなくていい様なスケーリングが必要になる。 また、一定以上接続が無いとConsumer Groupから離脱してしまうため、いくつか安定稼動のために重要なconfigがあったりする。

  • max.poll.interval.ms この設定値を越えてpollingが行われないとConsumer Groupから離脱する。通信状況や負荷などによって処理が詰まった場合、復活したり離脱したりを繰り返して延々処理が進まなくなる可能性があるので、ある程度余裕を持った値に設定しておいた方が良さそう。
  • session.timeout.ms この設定値より長い期間heartbeatがbrokerに届かなかった場合、Consumer Groupから離脱する。これも上記と同様の理由である程度余裕を持たせた方が良いだろう。
  • max.poll.records 一回のpollingで取得するレコード数。一回pollingしてからそのレコードバッチに対して処理が行われるため、スループット効率を狙い過ぎると、max.poll.interval.msを越えてしまってConsumer Groupから変に離脱してしまう可能性がある。
  • max.partition.fetch.bytes あるパーティションから一度に取得できるデータ量。レストア処理のスループットに影響する。
  • receive.buffer.bytes TCPのRCVBUFの値を設定する。デフォルトが65535と現在のネットワーク構成からすると少ない値になっているので、設定しなおした方が良い。

また、当たり前の話だが、そもそもデータ量を減らすのは大事なので、バイナリシリアライズフォーマットを採用したりしてトラフィックを削減するのも重要になる。

終わりに

まだ本格的に運用しているという段階ではないので、他にも考慮点やチューニングポイントが出てくるだろうと思う。 特にConsumer Groupのrebalanceとレストア処理のリードタイムについては多少不安が残っている。

そうは言っても、必要な機能は十分に揃っている印象で、追加のミドルウェア無しで実用的な分散ストリーム処理が書けるのは非常にありがたい。 もうしばらく調査と実験を続行して本格運用に乗せていく予定。