この記事はKafka Advent Calendarの10日目の記事になります。
Kafka StreamsにはProcessor APIというlow levelなAPIがあります。
https://kafka.apache.org/10/documentation/streams/developer-guide/processor-api.html
https://docs.confluent.io/ja-jp/platform/6.0.1/streams/developer-guide/processor-api.html (confluentには日本語のドキュメントもあるのでサクっと読みたい人はこちら)
私がしばらく開発してきたKafka Streamsアプリケーションは開発の最初こそStream DSLを使って開発してましたが、今やほとんどProcessor APIしか使ってません。 実際のところ、これを使えば何でも出来るんですが、何でも出来るとなると逆に取っ掛かりが捕めないと思うのでユースケースを紹介していきたいと思います。
一番重要な点はStreams DSLの裏に隠れているStateStoreを直接使えるという点です。これにより通常のStream DSLでは実現できないがユースケースとして十分に必要性がありそうな事が実現可能になります。
1. 2つ以上の関連レコードを取得するJOIN
Stream DSLによって提供されるJOINはキーによる完全一致のみです。INNER JOIN, LEFT JOIN, OUTER JOINとありますが、キーによって一致する最新のレコードを取ってくることしかできません。
一方で、Processor APIを利用すると部分的にキーが一致するレコードを複数取得するJOINや、キー以外の値でJOINする事が可能になります。
これには、実際のところStateStoreが実際のところどういう物なのかということを理解しておく必要があります。(注: ここで解説する挙動は組込みで実装されているStateStoreに関する挙動であり、自分でカスタムなStateStoreを実装している場合はこの限りではありません)
StateStoreはKafka Streamのタスクごとに作成され、キーのバイト列でソートされたキーバリューストアとして表現されます。これはOn MemoryでもPersistent(backendはRocksDB)でも同じです。
そして、実はStateStoreは本当にただのキーバリューストアなので、保存されているキーとKafkaのパーティションとは全く関係がありません。
どういうことかというと、Kafkaがuser_idでパーティションされていて、それによってKafkaのパーティションIDが決まっていたとしても、StateStoreに書き込むキーはそれと関係なく、例えばgroup_idとかuserの行動ログのevent_id等を設定できるということです。
例えばuserがある行動を取る度に、userが複数登録しているWeb Hookを叩きたいとします。
この場合user_idをパーティションキーとして利用して、以下の様なイベントログを受けるevent_logsトピックがあると考えてみましょう。
{user_id: 1, event_id: 123, payload: {"key1": "value1", "key2": "value2"}}
そして、event_idが123に一致する時にあるURLにevent_logsのpayloadを渡してアクセスして欲しい、というWebHookを受け取るweb_hooksというtopicを考えてみます。データ構造は以下の通り。
{webhook_id: 1, user_id: 1, event_id: 123, url: "http://foo.example.com/api", http_method: "POST"} {webhook_id: 1, user_id: 1, event_id: 123, url: "http://bar.example.com/api", http_method: "POST"} {webhook_id: 1, user_id: 1, event_id: 123, url: "http://hoge.example.com/api", http_method: "POST"}
この例で分かる様に一人のユーザーは複数のWebHook設定を持っている可能性があります。一つのイベントが来た時にキーが完全一致する一件だけを取得するのでは必要な情報が足りません。
という訳でキーとパーティショニングを工夫します。Kafkaはキーとバリューでレコードを管理しますが、実際のところキーとパーティション割り当ては直接関係がありません。Producerのクライアントライブラリのデフォルトの挙動がそうなっているだけで、送信時にレコードのキーと異なる任意のキーでパーティションIDを割り当てることができます。
この例では、user_idでパーティショニングした上でレコードのキーを#{sprintf( '%020d', user_id)}:${webhook_id}
の様な形にしてbrokerに送信します。
こうしておいた上でKafka StreamsアプリでKTableとして読み込みます。
StreamsBuilder builder = new StreamsBuilder(); KTable<String, WebHook> webHooks = builder.table( "web_hooks", Consumed.with(Serdes.String(), new WebHookSerde()) .withName("Source-WebHook"), Materialized.<String, WebHook, KeyValueStore<Bytes, byte[]>>as( "web_hooks_store")); // SerdeはJSONをオブジェクトとして読み込んでくれるものが定義されているものとします
この様にKTableとして読み込む様にすると、user_idでパーティショニングされたIDと関連するタスク上でStateStore上には#{sprintf(user_id, '%020d')}:${webhook_id}
というキーでデータが書き込まれて保持されます。
そして、イベントログを受け取る側でProcessorAPIを利用して、KTableとして書き込んだStateStoreを参照します。
KStream<String, EventLog> eventLogs = builder.stream( "event_logs", Consumed.with(Serdes.String(), new EventLogSerde()) .withName("Source-EventLog")); eventLogs.process(() -> new EventLogProcessor(), Named.as("EventLogProcessor"), "web_hooks_store");
この様に引数で指定することでProcessorは名前が一致する定義済みのStateStoreを参照することができます。
Processorはこの様な実装になります。
public class EventLogProcessor implements Processor<String, EventLog, String, WebHookRequest> { private ProcessorContext<String, EventLog> context; private TimestampedKeyValueStore<String, EventLog> eventLogStore; @Override public void init(ProcessorContext<String, WebHookRequest> context) { context = context; eventLogStore = context.getStateStore("web_hooks_store"); } @Override public void process(Record<String, EventLog> record) { var userId = record.value().getUserId(); var keyPrefixStart = String.format("$020d", userId) var keyPrefixEnd = String.format("$020d", userId + 1); try (var it = eventLogStore.range(keyPrefixStart, keyPrefixEnd)) { while (it.hasNext()) { var keyValue = it.next(); var valueAndTimestamp = keyValue.value; var value = valueAndTimestamp.value(); if (record.value.getEventId() == value.getEventId()) { var webhookRequest = new WebHookRequest(userId, value.getWebHookId(), record.value().getPayload()); context.forward(record.withValue(webhookRequest)); } } } } }
注目して欲しいのはcontext.getStateStore("web_hooks_store")
とvar it = eventLogStore.range(keyPrefixStart, keyPrefixEnd)
です。
init
の中でprocess
メソッドに引き渡したStateStoreを取得していますが、Processorの中で取得できるのは引数で渡された名前のStateStoreだけです。この様にしてinit
内で取得したものをプライベート変数の中に保持しておきます。またレコードのフォワードに利用するのでProcessorContext
も一緒に保持しておきます。
実際にレコードが到達した時に処理されるのがprocess
メソッドです。
レコードが到達したらレコード情報からuser_id
を取得しそれを元にキーのprefixを作ります。そしてそのキーのprefixを利用してStateStoreのrange
メソッドを使って範囲取得を実行します。StateStoreはキーのバイト列でソートされているので、キーの構成が適切なら特定のuser_id
に一致する全てのレコードがStateStoreから取得できます。
後はイテレーターを回して、一つづつレコードを結合してforward
メソッドに渡すだけで、後続で任意の処理が実行できます。この例だと次のProcessorでWebHookの実際のリクエスト処理を実装することになるでしょう。
この様にすることで、KTableを利用して範囲JOINが実現可能です。
注意点と補足
Processor APIを使ってcontext.forward
してしまうと、現時点では移行の処理はlow levelのProcessor APIでしか処理できません。後続の処理をDSLでやる様に戻すことは出来ないということです。
Processor APIを使って後続の処理を実装する場合は以下の様になります。
topology.addProcessor( "NextProcessor", () -> new NextProcessor(), "EventLogProcessor", "other_state_store_name");
こんな感じで一度StreamBuilder
からbuildした後のTopology
に対してaddProcessor
を読んで、親のノードの名前を指定することで任意のProcessorを後続に繋げることが出来ます。
この後続のProcessorはいくつも追加できる様になっていて、親になるProcessorでcontext.forward
すると全ての子となるプロセッサで処理が実行されます。もし、特定のProcessorだけに処理を渡したい場合はProcessorに渡した名前(この例ではNextProcessor
)を利用して特定の子Processorにだけ処理をルーティングする様にcontext.forward
の引数で調整することもできます。
また、補足ですが、context.foward
した後の後続の処理は全て同期的に連なった単なるメソッド呼び出しになっており、あるProcessor内で複数回context.foward
を呼ぶとその都度末端まで処理が同期的に実行されてから次のレコードに処理が移ります。
そして、あるレコードに起因する全ての処理が末端まで呼び出された後に、次のレコードが処理され、そしてバッファに存在している全てのレコードの処理が終わると、次のランループに入ってレコードをtopicからpollingする処理が実行されブローカーからレコードを取得します。
実際の所、Kafka StreamsはStream Threadごとに並列で動作しますが、そのStream Threadの中身は単純なwhile loopによる同期処理の連なりで実現されている、ということです。
まとめ
今回はKafka Streamsのlow level APIで実現する範囲JOINについて解説しました。 他にもProcessor APIを使った割と利用頻度の高そうなユースケースがいくつかあるのですが、今日は力尽きたのでここまでにさせていただきます。 また時間があれば、別のユースケースも紹介するかもしれません。その時はまた読んでやってください。