Kafka StreamsのStateStoreのちょっと変わった使い方 (ローカルキャッシュ、 コンフィグストア)

この記事は Kafka Advent Calendarの14日目の記事です。(1日遅れてるけど)

今回は割とライトな記事です。

ローカルキャッシュとしてのStateStore

Kafka StreamsにおいてStateStoreは、トピックから入力されて何らかの状態を保持した処理をしたい場合にその状態を永続化するために利用するのが基本的な形です。

一方でProcessor APIを活用しだすと、別にトピックから入力されてKafka Streamsで処理したものだけが保持対象じゃないことに気付きます。

StateStoreは基本的にはchangelogと紐付いているので、大規模な集計処理等を行ってデータ量が膨れ上がると、もしノードのストレージが無くなった時やrebalanceが走った時に、レストアに非常に長い時間が必要になります。これは現時点でちょっとやそっとでは解消しようがありません。

それを避けるために、通信という大きなオーバーヘッドを抱えても外部のデータソース(RedisやCassandraなど)からデータを取得したくなることがあります。

RedisやCassandraはかなり読み込みが高速なデータストアですが、Kafka Streamsの様なストリームプロセッサで頻繁に通信を行うと流石にローカルのStateStoreと比べて劇的に性能が落ちます。

そこで、withLoggingDisabled()をセットしたWindowStoreをキャッシュに使います。

changelogと紐付いていなければレストア処理が発生しないので、Processor APIの中でWindowStoreにデータが見つからなかった時だけデータを取りにいけば、処理内容によってはかなりの割合で外部との通信処理を削減することができます。

ここでWindowStoreを利用するのは現在KeyValueStoreでは自動的にデータをexpireする方法が無いからです。WindowStoreならretention timeを調整することで一定期間過ぎたStateStoreをローカルストレージから自動で削除できます。(RocksDB自体にはTTLを設定する方法があるがKafka Streamsからはそれを変更できない実装になっている)

実際MemoryStoreには、lruMapというストアが用意されており、オンメモリに乗る範囲であればこれを利用するとローカルなキャッシュがシンプルに実現できます。ただこれはメモリに乗る範囲でしか使えないことと、プロセスを再起動したら消滅してしまうことが難点です。

KeyValueStoreでTTLが使える様になると嬉しいんですが、今のところissueがあるのみ、といった感じですね。(https://issues.apache.org/jira/browse/KAFKA-4212)

コンフィグストアとしてのGlobalKTable

GlobalKTable(GlobalStateStore)は、全処理ノードに分配されるキーバリューストアとして利用できます。つまりデータフォーマットさえ決めておけばKafkaのトピックを経由して、任意の設定値を起動中の全ノードに配布したり削除したり出来るということです。

例えば、Kafka Streamsに限らずストリームプロセッサは処理が複雑になってくると結構デバッグするのが大変です。そういった時にユーザーの処理の流れを事細かに追いたいケースがあります。

デバッグモードを作り込むことは出来ますが、全ユーザーに対してそれを動かすと負荷がとんでもないことになって処理が詰まってしまったり、ログが膨大になったりする可能性が高いでしょう。

なので、対象のユーザーを絞ったりサンプリングして取得する様にしておきたくなります。

こういったケースでGlobalKTableに対象のユーザーの識別子やサンプリングの割合などの設定値を保持可能にしておきます。開発者はCLIのKafka Producerを使ってGlobalKTableになっているtopicにJSONを送り込めば、全ノードに対して起動したまま対象ユーザーの設定を変更することができます。必要無くなればnull値を送って設定を消せば元通りに出来ます。

こういった仕組みを管理機構として組み込んでおけば、いざという時に情報収集がしやすくなります。

この様にGlobalKTableはStreamからのJoin対象としてだけ利用するのではなく、外部から全ノードに設定値を即座に転送しストリームプロセッサ内で利用する仕組みにも応用できる、という話でした。