Kafka入門 第1回 「そもそもKafkaとはなにか」

これは社内向けに書いた、Kafkaってそもそも何やねん、ということをメンバーに解説するための記事を一部編集して公開できる様にしたものです。 第2回以降では、Kafkaを利用したアプリケーション開発のノウハウについて解説していく予定です。そちらも社内の事情を除いた形で公開していくつもりです。

そもそもKafkaとは

Kafkaはイベントストリーミングプラットフォームと呼ばれるミドルウェアです。 元々はストリームバッファと呼ばれてたと思います。

公式のドキュメントには以下の様に書かれています。

 Kafka combines three key capabilities so you can implement your use cases for event streaming end-to-end with a single battle-tested solution:

    To publish (write) and subscribe to (read) streams of events, including continuous import/export of your data from other systems.
    To store streams of events durably and reliably for as long as you want.
    To process streams of events as they occur or retrospectively.

And all this functionality is provided in a distributed, highly scalable, elastic, fault-tolerant, and secure manner.

つまり、イベントストリームを書きこんだり読み込んだり出来て、継続的に他のシステムからimportしたりexportしたりできて、それを一定期間(または無期限に)保持し続けることが出来て、発生した時に処理するだけでなく遡って過去のデータを処理することもできるプラットフォームであるということです。 加えて、分散処理可能で、スケーラブルであることも特徴です。

プラットフォームの構成要素は以下の通りです。

  • Kafka Broker ミドルウェアの本体
  • Kafka Client ミドルウェアと通信するためのクライアントライブラリ
  • 上記クライアントによって実装された管理コマンド群
  • Kafka Streams and Kafka Connect アプリケーション開発のためのフレームワーク

イベントストリームとは

終端が存在しない継続的なデータの流れのことです。

システム的にはデータベースやモバイルデバイス等によって発生したデータの動きをリアルタイムにキャプチャし続けたデータの流れと言えます。 一つ一つのデータの発生や変化をイベントと呼び、それらが終端なく延々連なっているものをイベントストリームと呼びます。 このイベントストリームに対して高速に処理を行うことができれば、我々のシステムは、ユーザーに対して即座に最新の結果を届けることが出来る様になります。

イベントストリームには終端が存在しない上にその目的上リアルタイムに近いインターバルで処理することが求められるため、基本的には一件単位で処理を行うことになります。 集計などの複数レコードに渡る処理については、開発者自身で必要なデータの範囲を定義して情報を蓄積しておくストレージを別途用意する必要があります。

Kafkaのエコシステムにはそういったストリーム処理で現実のユースケースを実現するために必要なライブラリも用意されています。

Kafkaでは複数のイベントストリームを扱うことができ、それをトピックと呼びます。トピックは名前を付けて管理できる様にしたイベントストリームということです。

トピックとパーティション

各トピックは1つ以上のパーティションによって分割されています。 また、トピックにはreplication factorが設定されており、各パーティションの複製がどれだけ存在するかを設定できます。replication factorが3であれば、あるパーティションのデータの複製がクラスタ内部に3つ存在するということです。

あるパーティションのあるレプリカは一つのノードによって保持されます。

例えば、TopicAのパーティション数が8でreplication factorが3であった場合、クラスタに所属している各ノードに合計24のパーティションのデータが分散して配置されます。

kafkacat (kcat) コマンドによる出力は以下の様になる。ノード数は5台。

  topic "TopicA" with 8 partitions:
    partition 0, leader 5, replicas: 5,3,1, isrs: 1,5,3
    partition 1, leader 3, replicas: 3,4,2, isrs: 4,2,3
    partition 2, leader 1, replicas: 1,4,2, isrs: 1,4,2
    partition 3, leader 4, replicas: 4,2,5, isrs: 4,2,5
    partition 4, leader 2, replicas: 2,5,3, isrs: 2,5,3
    partition 5, leader 5, replicas: 5,2,3, isrs: 2,5,3
    partition 6, leader 3, replicas: 3,5,1, isrs: 1,5,3
    partition 7, leader 1, replicas: 1,5,3, isrs: 1,5,3

トピックに保持するデータの一行一行をレコードと呼び、レコードはキーとバリューのペアで構成されます。 KafkaのBrokerにとってはキーとバリューはただのバイト列であって、それがどういう型のデータかはクライアントによって解釈されます。 例えば、JSONシリアライズされたデータである場合や、AvroやProtocolBufferによってバイナリシリアライズされたデータである場合、ただの文字列である場合、バイト列をそのまま数値として利用する場合、などシリアライズ方式に依存します。

何故Kafkaが必要なのか

Kafkaは非常に応用範囲が広いため一言で説明するのが難しいが、以下の様なユースケースがあります。

  • システム疎結合化やスケーラビリティ向上のためのメッセージブローカー
  • Webシステムの行動トラッキング
  • システム負荷などをモニタリングするためのメトリックの記録
  • ログ集約
  • イベントソーシング

弊社では、システム疎結合化のためのメッセージ基盤かつ、ユーザーの行動データのリアルタイム集計及び、その集計結果をサービスにリアルタイムに反映するための基盤として利用しています。

Kafkaはキューではない

Kafkaはキューの様なこともできるし、分散キューと呼ばれることもあるが正確にはキューではありません。

Kafkaはイベントストリームを一定期間に渡って保持し続けており、Consumerがそれを処理したとしてもキューと違って保持しているデータは変化しません。 キューの場合は、Consumerがデータを処理した場合そのデータはキューから削除され、他のワーカーから利用できなくなります。そして、基本的には先頭のデータが消えない限り次のレコードは処理されません。 例えばAWSのSQSはその様に動作します。(visibility timeoutなどである程度順番は制御できる) AWSではKinesisがかなりKafkaに近いサービスとして存在します。(現在はManaged Kafkaも提供されている)

キューでないことによる利点

トピックの共有

データがずっと保持されるということは、ある一つのトピックを複数のシステムで共有できるということを意味します。 つまりデータの発生源はKafkaにデータを送信しておくだけで、複数のマイクロサービスを動かすことができます。その先にどんなサービスがあるかを知る必要はありません。 各サービス側は自分がどのトピックを利用するかだけ分かっていれば良く、そこからデータを取り出して自分の処理をするだけで良くなるため、データの発生源について知っておく必要はなくなります。 仮にデータを利用したいサービスが増えたとしても、データの発生元に送信先を追加する必要はありません。 Kafkaの本体となるミドルウェアをメッセージブローカーと呼びますが、この様にメッセージを仲介してサービス間を疎結合に保つ役割を果たすことができます。

耐久性

Consumerがデータを処理してもデータが変化しないということは、Consumerが処理中に何らかのエラーが起こって処理ノードごとダウンしたという場合であっても、データが失われないことを意味します。 そのため、Consumerでエラーが起きたとしても再キューをする必要はなく、ただ別のノードで同じオフセットから処理を継続すればデータを失うことなく処理を復帰できます。 sidekiqなどの様にRedisを処理キューとして利用した場合には、処理中のワーカーがエラーハンドリングを適切にやらないとデータを失う危険性があります。もしOOMなどによってプロセスが突然死した場合には復帰は不可能です。 Kafkaにおいてはそういった心配はほぼ必要なくなります。

一方で、exactly onceモードを利用しない限りはリトライによって二重処理される可能性はあるため、羃等性に関しては別途注意が必要です。

過去のデータを利用できる

Kafkaは一定期間(デフォルトで7日間)のデータを保持し続けていることに加えて、キューと違って任意のオフセットからデータを読み出すことが出来ます。 そのため、過去のデータを利用して処理を開始することが出来ます。

例えば、新しくマイクロサービスを追加した時に、ある程度過去のデータから処理を開始することができますし、一定期間停止しているサービスがあったとしても、再開した時に停止したタイミングまでのデータが残っていれば、安全に処理を再開できます。

キューでないことによる複雑さ

オフセットの記録

キューと違ってデータが消えないということは、Consumer側で自分がどこまでレコードを処理したのかを覚えておく必要があることを意味します。 そのため、KafkaのConsumerクライアントはオフセットを記録しておく場所が必要になります。Kafkaのプロトコルでは、オフセットの記録場所としてもKafka Brokerを利用します。 Kafka Brokerは無期限にデータを保持することが出来るため、各クライアントのオフセット情報を記録するためのストレージとしても利用できます。 クライアントがどこまで処理をしたかのオフセット情報の更新もまたイベントの一つであり、それが延々更新され続けているイベントストリームでもあります。それをKafka Broker自身に格納しておいて、クライアントは起動時にそのオフセット情報をトピックから読み取ります。Brokerに残っている中で最新の値が今までに処理したレコードのオフセットになります。

ちなみにAWSKinesisでは、この様なオフセット情報の記録をDynamoDBを利用して行います。

Consumer Group

キューと異なることによって生じる複雑さの影響はオフセットの記録だけではありません。Consumerの処理によってブローカーのデータが変化しないので、もし複数のワーカーが同じトピックを処理しようとしたら、オフセットの記録だけではワーカーの数だけ重複して同じ処理が実行されてしまいます。

そのため、Kafka ConsumerにはConsumer Groupという概念が存在します。Consumer Groupはどういう目的でKafkaを利用しているかを指す集合であり、Consumerはそれに所属してBrokerからデータを取得します。

同一のConsumer Groupに所属している場合、Kafkaのあるトピックのあるパーティションはそのグループ内で一つのConsumerでしか処理できません。(一つのConsumerが複数のパーティションやトピックを処理することは出来る)

図で示すと以下の様になります。 consumer-groups.png (26.8 kB)

そのため、あるトピックのパーティション総数以上に、そのトピックを処理するConsumerを増やすことはできません。 パーティション数が30であれば、そのトピックを処理できるConsumerの数は30が上限になります。この数はインスタンスの総数で、同じノードに30あっても30ノードに1つづつあっても一緒です。

Kafka Brokerはパーティショニングをしない

Kafkaの大きな特徴の一つとして、パーティショニングを行う主体がBrokerではないという点が挙げられます。 Kafkaにおいてパーティショニングを行うのはProducerです。

データの発生源がKafka Brokerにレコードを送信する時に、クライアントライブラリの実装によってパーティションが決定されます。 そのため、クライアントライブラリが違っていればパーティショニングのロジックが異なる可能性があります。 例えば、JavaのProducerではデフォルトのパーティショニングロジックはキーに対してmurmur2によるハッシュ化を行いその結果をパーティション総数で割って剰余を利用します。 一方でRubyruby-kafkaという実装ではcrc32を利用してハッシュ化を行い同様に剰余を取って利用します。 また、Rubyには他にlibrdkafkaというCのライブラリのバインディングとして実装されたgemがありそちらはcrc32, murmur2, fnv1aなどが選択可能になっています。 この様に同じデータを送信したとしてもクライアントの実装によってパーティションIDが異なる場合があります。

また、厳密にはレコードのキーとパーティショニングは直接関係している訳ではありません。 あくまでデフォルトの挙動としてレコードのキーを利用するだけで、クライアントは任意のデータを任意のロジックでパーティショニングした結果を直接レコードに指定してBrokerに書き込むことができます。

例えばruby-kafkaを利用していたとしても、自力でmurmur2のハッシュ化を行って同一のハッシュ値を算出できれば、それを元に算出したパーティションを直接レコードに指定することで、Javaのライブラリと同じパーティションにレコードを送信できる様になります。

パーティショニングのロジックを揃えることは実際のシステムにおいてはかなり重要です。

例えば、あるユーザー(ID:100)の1日あたりのアクセス数を集計したい場合、ID:100のユーザーのレコードは一台のノードで処理できることが望ましい。 Consumer Groupによってあるパーティションのデータはある1台のノードでしか処理されないことが保証されるので、一貫してID:100のユーザーのレコードが特定のパーティションに送信されるのであれば、各ノードは単に受け取ったデータをローカルに集計結果として蓄積しておけばよくなります。 この時、もし新しいデータの発生源が追加され、クライアントライブラリの実装が異なったせいでパーティショニングの結果が揃わなくなったとしたら、集計結果の蓄積場所がバラバラになってしまい、正しく集計できなくなります。

この様に、どういうキーとどういうアルゴリズムによってそのトピックのデータがパーティショニングされているかを把握することはとても重要です。

基本的には公式のクライアントの実装であるmurmur2 hashingを利用したパーティショニングに揃えておくのが一番安全だと思います。

パーティション数の見積りの重要性

Kafka Brokerを実際に運用する際に非常に難しい要素となるのが、パーティション数を適切に選択することです。

先に述べた様に、同一のConsumer Groupがあるトピックのパーティションからデータ取得できるのは1つのクライアントだけです。つまりスケールできるクライアント数の上限がパーティション数によって決定されてしまいます。

それなら、パーティション数を後から増やせば良いのでは?と考えるかもしれませんが、実際のところそれはそう簡単にはいかないケースも多々あります。 前段で、パーティショニングロジックが異なって違うクライアントに処理が移ってしまうと集計処理が正しく実行できなくなる可能性があると説明しましたが、パーティション数を後から変えた場合も同様の事が発生する可能性があります。

パーティショニングは基本的にキーとなる値をハッシュ化してトピックのパーティション総数で割った剰余を利用します。 パーティションの総数が変化すると同一のデータが割り当てられるパーティションが変わってしまいます。そうなると最終的に処理しているノードが変化してしまい、集計処理が壊れるという事態が発生します。

そのため、パーティション数を後から変更することは、場合によっては非常に手間のかかる難しいオペレーションになります。 実際にこのトピックをどれぐらいの台数までスケールしたいのかをよく考えてパーティション数を見積るのが重要になります。 基本的な見積りの指針となるのは、ネットワークスループットと要件として求められている処理スループット、そして処理にかかるレイテンシです。

補足ですが、処理ノードが変わったとしても集計処理に関係が無い様にアプリケーションを構築すれば良いのではないかと思うかもしれませんが、それはストリームプロセッシングにおいては余りやりたくない設計になります。 何故ならストリームプロセッシングにおける集計では、バッチ処理よりも遥かにデータ取得や書き込みの回数が多くなります。そのためネットワークを介して別ノードにデータを保持する仕組みは無視できないレイテンシの増加を招くことがあり、かなり強い必要性が無い限りは採用したくない設計と言えます。 この点に関する詳細は、以降の解説記事で紹介していく予定です。

まとめ

Kafka はリアルタイムに大量のイベントデータを処理するためのプラットフォームであり、その中核を為すのがKafka Brokerというミドルウェアである。

イベントデータはユーザーの行動ログだったり、システム間のメッセージのやり取りであったり、データベースの変更履歴であったり、様々なものが含まれる。

イベントデータは複数のパーティションに分割されたトピックに格納される。格納されるデータはキーとバリューで構成されたレコードであり、中身は単なるバイト列である。

Kafka Brokerはキューとは異なり、Consumerの処理によってデータが削除されないため、複数のシステムで同じトピックを共有したり、過去に遡ってデータを処理したり、リトライすることが容易である。

一方で、自分がどの範囲のデータを処理する責務を負っているかはクライアント自身で認識しておく必要があり、そのためにオフセット記録やConsumer Groupなどのキューには無い概念が必要になる。

パーティショニングの責任はクライアントが持つため、パーティション結果に依存する集計処理などを行う際には、クライアントライブラリの実装を把握しておくことが重要になる。

(まあ、大体の事は 公式ドキュメントに書いてあるので、これを全部読めば分かるのだが。)