本番環境でcassandraの運用を開始してみた

職場でcassandraの運用を開始したので、選定理由とか運用してみて得た知見や所感等を書いてみようと思う。 (まだ十分に知見を得たとは良い難いので、間違った印象を得ていたり、より良いオペレーションに気付いていない可能性があります。)

cassandraの採用理由

そもそもの目的はホットデータの書き込み先としての利用です。要件としては以下の様なものがあります。

  • エンドユーザー端末の数に比例するので、書き込みのスケーラビリティが必要
  • 書き込みデータは更新される
  • 一件単位、もしくは数秒単位の短かいサイクルでバッファチャンクを書き込む必要がある
  • prestoで読み込める (最悪connectorは書いてもいいが、できれば避けたい)
  • 一度に数百万件ぐらいまで読み込む可能性があるので、それなりに読み込みもスケールして欲しいし、レンジスキャンにもある程度効率が求められる。

元々、データ加工で潰しが効く様にkafkaの利用も検討してたんですが、データが頻繁に更新される可能性があるので一意のキーでデータが特定できてupsertが可能であり、書き込みレイテンシを小さく保つことができそうということでcassandraに倒しました。

その他、Apache Kuduとかも調査はしましたが、ちょっと新し過ぎるのとprestoとの接続性に不安があったので、見送りました。

cassandraの構築方法と現在のクラスタ規模

ディスクとネットワークにかなりのI/Oがかかるので、コンテナは使わずにpackerでAMIを生成し、EC2で普通に立ててます。

cassandraにはseedという概念があり、クラスタの情報を提供するための初期ノードが必要になります。 なので、seedは固定でローカルIPを降って3台立ててあり、他のノードはAutoscalingで起動できる様にしてあります。 クラスタ全体では10台で運用しています。テーブルは現時点で3種類あり、一番大きいテーブルで50GB程のデータが入ってます。 今後利用範囲を広げていく予定です。

現時点での利用スタイル

利用開始当初は、アプリケーション本体のコードを弄らずに実験するために、fluentdを経由してデータを投入していました。 元々fluentdにデータを流していたので、copyして軽く加工するだけで出力先をコントロールできる様になっています。 で、データを投入するためにfluentdのプラグインを書きました。(いくつか既存のものがあったんですが古かった) ; github.com

cassandraは挿入は一件単位で行うことになるので、fluentdの様にbufferチャンクを溜めて処理するのは余り相性が良くないとは思いますが。 というのもfluentdはバッファチャンク単位でリトライすることになるので、チャンクの中の一行がエラーになった場合にどうリトライするかを考えるのが面倒くさい。 とりあえず無視するか全体リトライするかの二択を選べる様にはしましたが……。

現在は利用スタイルが固まったので、rubyのcassandra-driverを使ってアプリケーションから直接データ投入する様に変更中です。

メトリック収集

会社ではdatadogを利用しており、datadogにcassandraのインテグレーションがあるのでそれを利用しています。 実態はdatatdog-agentがJMXを介してmbeanから値を取得する形になります。

とりあえず以下の値を収集してます。

  • CPU利用率
  • 読み書きのレイテンシ
  • ディスク利用量 (デバイス全体と、テーブル毎のディスク利用量)
  • 書き込み件数
  • sstableの数
  • keycache hit ratio
  • drop message rate
  • pending task count
  • JVM系のメトリック (heap, nonheap, gc等)

加えて、プロセス監視もdatadogでやってます。

cassandraの特性についての理解

アーキテクチャの細かい解説は、datastaxのドキュメントを読むのが分かり易いと思いますが、cassandraは大まかには以下の様に動きます。

書き込み時

  • レコードのパーティションキーからcoodinatorを決定する
  • coordinatorに書き込みをしにいく
  • まずmemtableというオンメモリのテーブルにデータを書き込む
  • 一定時間、一定データ量毎にflushされてsstableと呼ばれるファイルに書き出される
  • sstable内のデータは不変
  • データが肥大化しない様に定期的にコンパクションが実行されてsstableがマージされる
  • 更新なり削除なりが発生した時はtombstoneマーカーが付与されて、コンパクションタイミングで削除される
  • 整合性レベルに応じた応答が返ってきたら、クライアントにレスポンスを返す

読み込み時

  • パーティションキーを元にcoordinatorにデータを要求する
  • memtableを確認し、あれば取得する
  • 行キャッシュを確認し、あれば取得する
  • ブルームフィルタとkey cacheを使ってsstableの読み出し先を特定する
  • それでも見つからない場合は、sstableに含まれるパーティションインデックスからデータの場所を特定し読み込む
  • 整合性レベルに応じた結果が集まったら、レスポンスを返す

書き込みは基本的にメモリに書かれるし追記しか起きないのでスケールさせやすい。現在のうちの利用規模だと一件辺りの書き込みレイテンシは100μsぐらいで完了してます。(ネットワークレイテンシは別)

しかし、読み込みの時はパーティションを特定してクエリするのが必須になるため、クエリのワークロードに合わせてパーティションキーを設計する必要がある。

実際、パーティションキーを指定しない形でクエリすることは基本できない様になっている。 この点で、読み込みに関してはデータ構造がリッチなKVSに近い。

オリジナルのデータが同じでも参照方法に合わせて、テーブルを作る方が良いとされている様です。 その分書き込みの数が増えるが、書き込みはスケールさせやすいので読み込み形式に合わせた構造を優先した方が良いということだと思います。

prestoからの読み込み

パーティションキーを指定しない場合は全パーティションからデータを読むことになる。 パーティションキーを指定すれば必要なデータだけ取ってくれる。うまくパーティションが分散できれば、大量に読み込んでもそれなりのスピードがでる。クエリパターンに合わせたテーブル設計が大事。 パーティションキー設計や必要なカラムの絞り込みが上手くできないと10倍から100倍ぐらい負荷が変わる。

prestoノード30台、cassandraノード10台で300万件のuser_idを取得してもクエリ自体は3秒ぐらいで終わるし、負荷もかなり軽い。 ただ、1行辺りのデータ量が増えるとsstableの読み出しに結構な負荷がかかるので、読み込みが詰まる傾向があった。

また、キーキャッシュやchunk cache (sstableのキャッシュ)にちゃんとデータを載せておいて、実際のディスクI/Oが発生しない様にしておかないと読み込みが安定しない。

セカンダリインデックスについて

ノード毎にインデックスが作成される、パーティションキーを指定しなくてもクエリ可能になるが、大量のパーティションをスキャンすると異様に遅くなるので、少数のパーティションの中でデータを更に絞り込むのには役に立つが、パーティションキーの代わりに使える様な便利なものではない。

余りデータのカーディナリティが低くても高くても駄目で、程々に共通して存在するデータが有効らしい。また更新が頻発するカラムだとindex更新に負荷がかかるので、利用を避けるべきです。 別途パーティションキーの異なるテーブルを作って、用途に合わせてデータを投入していく方が良さそうです。

cassandraの運用

ノード追加

設定ファイルにseed情報とSnitchの設定を書いて起動すると自動でクラスタに所属します。 起動時にbootstrapと呼ばれる処理が走り、自動でデータの再分散が行われます。

同時に複数台追加しようとすると、bootstrapが競合するので、追加は1台づつでないと事故ったり止まったりします。 急激にクラスタを増強できないので、多少の余裕を見てハンドリングする必要がある。

ノードの削除

正常起動しているノードをクラスターから削除する場合は、nodetool decomissionコマンドを削除したいノードで実行する。 データの再分散が行われて完了したらクラスタから情報が削除されます。

ノード故障

ノードが故障した場合は色々考慮する自体が増えます。

書き込みに関してはhinted handoffという機能により、故障ノードへの書き込み要求をcoordinatorが一時的に受け取ることができます。 ノードが復帰したら、hintファイルに蓄積された書き込みデータが同期される。hintが溜まり過ぎるとノードの復帰に時間がかかる。

ノードが復帰不能の場合は、nodetool removenodeコマンドにnode idを渡してノード情報をクラスタから削除する。この時点でhintファイルが溜まっていれば削除されるはずです。 ノードを同数に戻す場合は代わりに新しく1台追加します。

起動時にJVMオプションを渡して、あるIPのノードを置き換えるという指定をすることで、そのノードの役目を引き継ぐことができる様ですが、オペレーションが複雑化するので諦めて、故障した奴をクラスタから削除し、新規にノードを追加しなおして再分散した方が楽に思える。 ただし、これはクラスタが持ってるデータ規模等に左右されると思います。

seedノード故障

seedノードが故障するとめちゃくちゃ面倒なことになる。

seedノードは起動時に特別扱いされており、bootstrapが行われない。そのため、一回通常ノードとして構成してbootsrapを完了してからseedノードとして起動しなおすことになる。 同一IPで新しくノードを構成しなおそうとすると、非常にややこしいので諦めて新しくノードを追加しbootstrapが終わった時点で、seedにしたいノードのIPを各ノードの設定ファイルに追加し、順番にcassandraを再起動するというのが一番マシな手順ではないかと思います。

同じIPで構成しなおしたい場合は、一回seedsリストから除外してcassandraクラスタ全体を再起動した後、ノードを追加してbootstrapが終わった後に、再度seedsリストにIPを追加して、またクラスタ全体を再起動する必要があるようです。

クラスタを構成するためにseedノードにかなり依存しているので、ここが本当にややこしい。同時にseedノードが全部死んだ時にどうやって復帰すればいいのかは良く分かっていない。生き残っているノードを新しいseedとして指定した上で、クラスタ全体を再起動し、新しくノードを復活させて設定ファイルをこまごま調整すれば元に戻ると思いますが。

今は10数台ぐらいの規模なのでどうってことは無いのですが、これが100台規模になってくるとseedzeノード故障は死ぬ程面倒なのでは、と思うので大規模クラスタ運用している人の知見が欲しい所です。

全体的に見て、ノード自体が無事でプロセスが短時間死んだ程度ならどうとでもなりそうですが、ノードごと死ぬとなるとかなりオペレーションが面倒だったり再分散の負荷がかかるので、作業タイミングに注意した方が良さそうです。

全体感

10台ぐらいで構築するなら、それ程苦労はしなかった。(ある程度インフラの自動構成やイメージ作成に知見があるからですが) モニタリングもdatadogと組み合わせれば割と簡単にdashboardが構築できました。監視さえしっかりしていれば、落ち着いて運用できそうです。

問題はやはりハードウェア故障でノードごと突然死する場合で、そういう場合にデータロストしない様にかつ過剰な負荷がかからない様にオペレーションするのはかなり面倒な気配がビンビンします。

書き込みは本当に早いし、スケールするので安心感がある。読み込みはとにかくクエリパターンを事前にちゃんと考えて、それに合わせたパーティション設計をすることが必須です。まずクエリありきでテーブル構造考えないと、読み込みにめっちゃ負荷がかかることになります。パーティションキーとクエリパターンを中心にしたデータ設計は慣れるまで苦労しそうです。

マテビューが入ったことで、クエリパターンに合わせた柔軟なテーブル構成が実現できそうな気配がありますが、実験してみた所書き込みプロセスが不安定になってOOMで落ちたりしたので、もうちょい知見を積むなりリソース調整をマスターする必要がある。後、現時点ではprestoから読み込む対象として利用できないのが、うちの環境だと辛い点でした。

とりあえず、ホットデータを利用しやすくしつつスケーラビリティを確保する、という当初の目的は達成できそうなので、しばらく運用を継続して様子を見ていこうと思います。