Kafka Streamsのlow level APIであるProcessor APIの活用例 (範囲JOINの実現)

この記事はKafka Advent Calendarの10日目の記事になります。

Kafka StreamsにはProcessor APIというlow levelなAPIがあります。

https://kafka.apache.org/10/documentation/streams/developer-guide/processor-api.html

https://docs.confluent.io/ja-jp/platform/6.0.1/streams/developer-guide/processor-api.html (confluentには日本語のドキュメントもあるのでサクっと読みたい人はこちら)

私がしばらく開発してきたKafka Streamsアプリケーションは開発の最初こそStream DSLを使って開発してましたが、今やほとんどProcessor APIしか使ってません。 実際のところ、これを使えば何でも出来るんですが、何でも出来るとなると逆に取っ掛かりが捕めないと思うのでユースケースを紹介していきたいと思います。

一番重要な点はStreams DSLの裏に隠れているStateStoreを直接使えるという点です。これにより通常のStream DSLでは実現できないがユースケースとして十分に必要性がありそうな事が実現可能になります。

1. 2つ以上の関連レコードを取得するJOIN

Stream DSLによって提供されるJOINはキーによる完全一致のみです。INNER JOIN, LEFT JOIN, OUTER JOINとありますが、キーによって一致する最新のレコードを取ってくることしかできません。

一方で、Processor APIを利用すると部分的にキーが一致するレコードを複数取得するJOINや、キー以外の値でJOINする事が可能になります。

これには、実際のところStateStoreが実際のところどういう物なのかということを理解しておく必要があります。(注: ここで解説する挙動は組込みで実装されているStateStoreに関する挙動であり、自分でカスタムなStateStoreを実装している場合はこの限りではありません)

StateStoreはKafka Streamのタスクごとに作成され、キーのバイト列でソートされたキーバリューストアとして表現されます。これはOn MemoryでもPersistent(backendはRocksDB)でも同じです。

そして、実はStateStoreは本当にただのキーバリューストアなので、保存されているキーとKafkaのパーティションとは全く関係がありません。

どういうことかというと、Kafkaがuser_idでパーティションされていて、それによってKafkaのパーティションIDが決まっていたとしても、StateStoreに書き込むキーはそれと関係なく、例えばgroup_idとかuserの行動ログのevent_id等を設定できるということです。

例えばuserがある行動を取る度に、userが複数登録しているWeb Hookを叩きたいとします。

この場合user_idをパーティションキーとして利用して、以下の様なイベントログを受けるevent_logsトピックがあると考えてみましょう。

{user_id: 1, event_id: 123, payload: {"key1": "value1", "key2": "value2"}}

そして、event_idが123に一致する時にあるURLにevent_logsのpayloadを渡してアクセスして欲しい、というWebHookを受け取るweb_hooksというtopicを考えてみます。データ構造は以下の通り。

{webhook_id: 1, user_id: 1, event_id: 123, url: "http://foo.example.com/api", http_method: "POST"}
{webhook_id: 1, user_id: 1, event_id: 123, url: "http://bar.example.com/api", http_method: "POST"}
{webhook_id: 1, user_id: 1, event_id: 123, url: "http://hoge.example.com/api", http_method: "POST"}

この例で分かる様に一人のユーザーは複数のWebHook設定を持っている可能性があります。一つのイベントが来た時にキーが完全一致する一件だけを取得するのでは必要な情報が足りません。

という訳でキーとパーティショニングを工夫します。Kafkaはキーとバリューでレコードを管理しますが、実際のところキーとパーティション割り当ては直接関係がありません。Producerのクライアントライブラリのデフォルトの挙動がそうなっているだけで、送信時にレコードのキーと異なる任意のキーでパーティションIDを割り当てることができます。

この例では、user_idでパーティショニングした上でレコードのキーを#{sprintf( '%020d', user_id)}:${webhook_id}の様な形にしてbrokerに送信します。

こうしておいた上でKafka StreamsアプリでKTableとして読み込みます。

StreamsBuilder builder = new StreamsBuilder();

KTable<String, WebHook> webHooks =
    builder.table(
        "web_hooks",
        Consumed.with(Serdes.String(), new WebHookSerde())
            .withName("Source-WebHook"),
        Materialized.<String, WebHook, KeyValueStore<Bytes, byte[]>>as(
                "web_hooks_store"));

// SerdeはJSONをオブジェクトとして読み込んでくれるものが定義されているものとします

この様にKTableとして読み込む様にすると、user_idでパーティショニングされたIDと関連するタスク上でStateStore上には#{sprintf(user_id, '%020d')}:${webhook_id}というキーでデータが書き込まれて保持されます。

そして、イベントログを受け取る側でProcessorAPIを利用して、KTableとして書き込んだStateStoreを参照します。

KStream<String, EventLog> eventLogs =
    builder.stream(
        "event_logs",
        Consumed.with(Serdes.String(), new EventLogSerde())
            .withName("Source-EventLog"));

eventLogs.process(() -> new EventLogProcessor(), Named.as("EventLogProcessor"), "web_hooks_store");

この様に引数で指定することでProcessorは名前が一致する定義済みのStateStoreを参照することができます。

Processorはこの様な実装になります。

public class EventLogProcessor implements Processor<String, EventLog, String, WebHookRequest> {
  private ProcessorContext<String, EventLog> context;
  private TimestampedKeyValueStore<String, EventLog> eventLogStore;

  @Override
  public void init(ProcessorContext<String, WebHookRequest> context) {
    context = context;
    eventLogStore = context.getStateStore("web_hooks_store");
  }

  @Override
  public void process(Record<String, EventLog> record) {
    var userId = record.value().getUserId();
    var keyPrefixStart = String.format("$020d", userId)
    var keyPrefixEnd = String.format("$020d", userId + 1);
    try (var it = eventLogStore.range(keyPrefixStart, keyPrefixEnd)) {
      while (it.hasNext()) {
        var keyValue = it.next();
        var valueAndTimestamp = keyValue.value;
        var value = valueAndTimestamp.value();
        if (record.value.getEventId() == value.getEventId()) {
          var webhookRequest = new WebHookRequest(userId, value.getWebHookId(), record.value().getPayload());
          context.forward(record.withValue(webhookRequest));
        }
      }
    }
  }
}

注目して欲しいのはcontext.getStateStore("web_hooks_store")var it = eventLogStore.range(keyPrefixStart, keyPrefixEnd)です。

initの中でprocessメソッドに引き渡したStateStoreを取得していますが、Processorの中で取得できるのは引数で渡された名前のStateStoreだけです。この様にしてinit内で取得したものをプライベート変数の中に保持しておきます。またレコードのフォワードに利用するのでProcessorContextも一緒に保持しておきます。

実際にレコードが到達した時に処理されるのがprocessメソッドです。

レコードが到達したらレコード情報からuser_idを取得しそれを元にキーのprefixを作ります。そしてそのキーのprefixを利用してStateStoreのrangeメソッドを使って範囲取得を実行します。StateStoreはキーのバイト列でソートされているので、キーの構成が適切なら特定のuser_idに一致する全てのレコードがStateStoreから取得できます。

後はイテレーターを回して、一つづつレコードを結合してforwardメソッドに渡すだけで、後続で任意の処理が実行できます。この例だと次のProcessorでWebHookの実際のリクエスト処理を実装することになるでしょう。

この様にすることで、KTableを利用して範囲JOINが実現可能です。

注意点と補足

Processor APIを使ってcontext.forwardしてしまうと、現時点では移行の処理はlow levelのProcessor APIでしか処理できません。後続の処理をDSLでやる様に戻すことは出来ないということです。

Processor APIを使って後続の処理を実装する場合は以下の様になります。

topology.addProcessor(
    "NextProcessor",
    () -> new NextProcessor(),
    "EventLogProcessor",
    "other_state_store_name");

こんな感じで一度StreamBuilderからbuildした後のTopologyに対してaddProcessorを読んで、親のノードの名前を指定することで任意のProcessorを後続に繋げることが出来ます。 この後続のProcessorはいくつも追加できる様になっていて、親になるProcessorcontext.forwardすると全ての子となるプロセッサで処理が実行されます。もし、特定のProcessorだけに処理を渡したい場合はProcessorに渡した名前(この例ではNextProcessor)を利用して特定の子Processorにだけ処理をルーティングする様にcontext.forwardの引数で調整することもできます。

また、補足ですが、context.fowardした後の後続の処理は全て同期的に連なった単なるメソッド呼び出しになっており、あるProcessor内で複数回context.fowardを呼ぶとその都度末端まで処理が同期的に実行されてから次のレコードに処理が移ります。

そして、あるレコードに起因する全ての処理が末端まで呼び出された後に、次のレコードが処理され、そしてバッファに存在している全てのレコードの処理が終わると、次のランループに入ってレコードをtopicからpollingする処理が実行されブローカーからレコードを取得します。

実際の所、Kafka StreamsはStream Threadごとに並列で動作しますが、そのStream Threadの中身は単純なwhile loopによる同期処理の連なりで実現されている、ということです。

まとめ

今回はKafka Streamsのlow level APIで実現する範囲JOINについて解説しました。 他にもProcessor APIを使った割と利用頻度の高そうなユースケースがいくつかあるのですが、今日は力尽きたのでここまでにさせていただきます。 また時間があれば、別のユースケースも紹介するかもしれません。その時はまた読んでやってください。