Kafka入門 第2回 「Kafka Streamsを使ったストリーム処理アプリケーション開発」

以前にAWS Summitで似た様な話をしているので、こちらのブログとかも参考にしてください。

https://dev.classmethod.jp/articles/aws-summit-online-2020-cus-47/

今回の内容も、公式のドキュメントを全部読むなら大体書いてあることなので、既にある程度知っているという方は、後半の「StateStoreの実態」以降の部分だけを読んでもらえれば特に注意すべきことが書いてあります。

参考 https://docs.confluent.io/ja-jp/platform/6.2.0/streams/index.html https://docs.confluent.io/platform/7.0.1/streams/index.html

ストリームプロセッシングとは

イベントストリームに対して、データが到着するたびに処理を行い、それが延々と続いていく様なアプリケーションを指します。 Kafkaのエコシステムにおいては、Kafka BrokerにConsumerとして接続して、無限ループでpollingを行い、データが到達する度に短いインターバルで処理を行うアプリケーションのことです。

複数のレコードを使って集計処理を行うケースもあるが、ある一定期間ごとに集計ウインドウを区切った上で、一件ごとに集計結果をアップデートするか、バッファリングしておいて一定のインターバルをおいて実行する処理で集計を行う、といった形を取る。

ステートレスかステートフルか

ストリームプロセッシングの処理内容は大きくステートレスかステートフルかに分けられます。

ステートレスな処理とは、あるイベントレコードが到着した時に、そのレコードだけで処理が完結する処理のことです。 レコードAをレコードBの形に変換して別のトピックや別のデータストアに転送する処理などがその典型です。

一方、ステートフルな処理とは、到達したイベントレコードやそれを基に生成したデータを一定期間保持しておいて、それと組み合わせて結果を生成する処理のことです。 典型的なものでは、イベントの数を集計して合計や平均やヒストグラムを算出したり、処理効率のためにバッファリングしてまとめて処理したり、別のストリームやデータストアとデータを結合してデータエンリッチメントを行うといったものが挙げられます。

ステートフルな処理においては、一定量のレコードを格納しておくデータストアが別途必要になります。それをStateStoreと呼びます。ストリームプロセッシングにおけるStateStoreには重要な考慮ポイントがいくつか存在しますが、それについては後述します。

ステートフルアプリケーションにおけるデータストア

ストリームプロセッシングは、あるデータが到達したらその都度処理が行われるため、何らかのデータストアを利用すると、データが来る度にデータの取得や保存を行うことになる。

そのため重要な要素として低レイテンシであることが求められます。データ量によってはRedisの様なシンプルで高速なKVSであってもネットワークレイテンシが馬鹿にならないため、十分とは言えない場合があります。

そこでストリームプロセッシングでは、基本的に各処理ノードのローカルにデータストアを保持してレイテンシを低く保つという手法を採用します。

後述するKafka StreamsやApache Flinkなどのストリームプロセッシングフレームワークでは、RocksDBが採用されています。 RocksDBはアプリケーションに組込むタイプのKVSで、RocksDBを各ノードのローカルで動かすことで低レイテンシな状態保持を実現します。

各ノードのローカルで動かすということは、ノードが不慮の事故でダウンし復帰不能になった場合データが失われる危険性があることを意味します。 これを避けるためノードに依存しないデータ永続化の仕組みも別途必要です。この仕組みにおいてはネットワーク通信のオーバヘッドは避けられないため、ある程度のバッファリングが必要になります。

また、Kafkaを利用したストリーミングアプリケーションでは、スケーラビリティを確保するためにアプリケーションを分散して複数ノードで動作させることもよくあります。 そういったケースでは、ノード数の増減に応じて処理するパーティションの割り当てが変わる可能性があります。 第1回で解説した様にKafka Consumerは同一のConsumer Groupでは一つのパーティションを処理できるのは一つのクライアントだけです。台数が増減するとその割り当てが再配置されることになり、処理対象が別ノードに移ることを考慮しなければいけません。 処理が別ノードに移ると、それまでに各ノードのローカルで保持していたデータをどうにかして新しく処理が割り当てられたノードに移し替えないと、今迄に保持していたStateStoreのデータが利用できなくなります。例えばノードが増えた途端に集計済みのカウントがいきなり0に戻ったりすることになる訳です。 これを回避するために処理ノードの変遷に合わせてStateStoreを再配置する仕組みが必要不可欠です。

Kafka Streamsとは

Kafka StreamsはApache Kafkaの開発プロジェクトで公式に提供されているストリームプロセッシングフレームワークです。Javaで実装されています。

前段で解説した様にステートフルなアプリケーションを実装しようと思うと、その状態管理に結構複雑な仕組みが要求されます。 Kafka Streamsを利用するとフレームワークがこういった複雑さの面倒を見てくれます。またConsumerクライアントのpollingループや別のトピックにレコードを転送するためのProducerクライアントの集約なども面倒を見てくれるため、ステートレスなアプリケーションであっても大幅にコードを削減できます。

基本的にはここで説明することは Confluent社の提供しているドキュメントに概ね書かれていることになります。 少し古いですが日本語のドキュメントもあるので、詳細を知りたい方はそちらを参照して隅々まで読み込むのが良いでしょう。

https://docs.confluent.io/ja-jp/platform/6.2.0/streams/concepts.html

主な特徴

Kafkaの開発で主要な役割を担っているConfluent社のドキュメントによると以下の様な特徴があります。

機能 - アプリケーションに高い拡張性、弾力性、分散性をもたらし、フォールトトレラントを実現 - 「厳密に 1 回」の処理セマンティクスをサポート - ステートフル処理とステートレス処理 - ウィンドウ、結合、集約を使用したイベント時処理 - ストリームとデータベースの世界を 1 つにする Kafka Streams の対話型クエリ をサポート - 宣言型で関数型の API と、下位レベルの 命令型の API を選択でき、高い制御性と柔軟性を実現

軽量である - 導入へのハードルが低い - 小規模、中規模、大規模、特大規模のいずれの事例にも対応 - ローカル開発環境から大規模な本稼働環境にスムーズに移行 - 処理クラスターが不要 - Kafka 以外の外部依存関係がない

cf. https://docs.confluent.io/ja-jp/platform/6.2.0/streams/introduction.html

この中でも私が最も重要だと思うポイントが、Kafka以外の外部依存関係がない、という点です。こういった分散処理フレームワークの場合、YARNの様なリソースマネージャーやその他のデータストアが必要になるケースもしばしばありますが、Kafka Streamsにはそれが必要ありません。 これは導入のハードルを大きく下げてくれます。

Tutorialコード

Kafka Streamsでアプリケーションを書くには以下の様なコードを書きます。 (公式ドキュメントからの引用を少し改変したものです)

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.kstream.StreamsBuilder;
import org.apache.kafka.streams.processor.Topology;

// Use the builders to define the actual processing topology, e.g. to specify
// from which input topics to read, which stream operations (filter, map, etc.)
// should be called, and so on.  We will cover this in detail in the subsequent
// sections of this Developer Guide.

StreamsBuilder builder = new StreamsBuilder();
builder
    .stream(inputTopic, Consumed.with(stringSerde, stringSerde))
    .peek((k,v) -> logger.info("Observed event: {}", v))
    .mapValues(s -> s.toUpperCase())
    .peek((k,v) -> logger.info("Transformed event: {}", v))
    .to(outputTopic, Produced.with(stringSerde, stringSerde));
Topology topology = builder.build();
//
// OR
//
topology.addProcessor(...); // when using the Processor API

// Use the configuration properties to tell your application where the Kafka cluster is,
// which Serializers/Deserializers to use by default, to specify security settings,
// and so on.
Properties props = ...;

KafkaStreams streams = new KafkaStreams(topology, props);
streams.start(); // non blocking

この様にKafka StreamsはDSLを繋げたりProcessor APIと呼ばれる特定のインターフェースに則ったJavaクラスを実装するだけで、Kafkaに接続するConumerやProducerの構築、処理スレッドの立ち上げなどの面倒を見てくれます。

Kafka Streamsのアプリケーションモデル

Kafka StreamsはSource Node, Processor Node, Sink Nodeを組み合わせてTopologyという単位を作ることでアプリケーションを構築します。 それぞれどういう意味を持つのか説明していきます。

Topology

Topologyという単語はベースとしては数学用語で位相幾何学を指します。私は数学の徒ではないので詳しくは分かりませんが、物の形状が持つ性質に焦点を当てた学問の様です。 そこから派生してネットワーク・トポロジーというネットワークグラフがどういった構成をしているかの形状パターンを指す用語があります。 Kafka StreamsにおけるTopologyはこのネットワーク・トポロジーと同じものと考えて良いと思います。

Kafka Streamsは特定の役割を持ったノードを接続してグラフ構造を構築しTopologyを形成して動作します。 このTopologyは2つ以上のサブトポロジーによって形成されている場合があります。

streams-architecture-topology.jpg (182.2 kB) (Confluent Documentationから引用)

この様なグラフ構造を1つ以上保持するオブジェクトがKafka StreamsにおけるTopologyです。

2つ以上のサブトポロジーがどうやって生成されるかというと、一つのアプリケーションが複数のソーストピックからデータを取得する様に定義されていて、またそのソーストピックに依存しているProcessor NodeやSink Nodeに直接依存関係が無い場合です。 つまりSource Nodeから辿って一つのグラフが構築される様に依存関係が繋がっている範囲が、一つのサブトポロジーです。 上記の例ではSource Nodeが複数ありますが、下のProcessorが両方のSource Nodeに依存しているためグラフが一つに繋がっているのが分かります。こういった場合はサブトポロジーは1つになります。 もし、左側のSource Nodeがその他の全体と独立してどことも繋がっていないProcessor Nodeにだけ依存している場合は、2つのサブトポロジーが出来ることになります。

Source Node

Kafkaトピックからデータを取得する処理を行うノードをSource Nodeと呼びます。 Kafka Streamsには3種類のデータの取得パターンが存在します詳細はDSLの項で解説します。 上記のサンプルコードのDSLではstreamがそれにあたります。

Processor Node

Source Nodeから受け取ったデータを実際に処理するノードを指します。 実際に開発者が実装するのは、ほぼこのノードになります。 Processor Nodeは複数繋げることが出来ます。処理行程を分割することで見通しの良い実装が可能になります。 上記のサンプルコードのDSLではpeekmapValuesがそれにあたります。

Sink Node

別のKafkaトピックにデータを送信する処理を行うノードを指します。 上記のサンプルコードのDSLではtoがそれにあたります。

この様にSource Nodeが何らかのトピックからデータを取得し、Processor Nodeで何らかの処理を行いデータを加工したり一時的に保存したり別のデータストアに書き出したりして、そして必要であれば、そのデータを更に別のトピックに書き出して他の処理に繋げていく。 Kafka Streamsのアプリケーションはこうやって構築されます。

Task

Kafka Streamsは当然複数のマシンで分散処理が出来る様にデザインされています。 Kafka Streamsではサブトポロジーがpolling対象としているソーストピックのパーティション数に対応したタスクという単位でクライアントに処理を割り当てます。

例えば、トポロジーAの中にサブトポロジーA-1とサブトポロジーA-2があり、サブトポロジーA-1がトピックB-1をサブトポロジーA-2がトピックB-2を参照しているとします。 この時、トピックB-1のパーティション数が8でトピックB-2のパーティション数が10であった場合、1_0 〜 1_7というタスクと、2_0 〜 2_10というタスクが作成されます。タスクの総数は18です。 この18タスクを現在Consumer Groupに所属しているクライアントに割り当てます。割り当てのストラテジーにはいくつかパターンがありますが、そういった動作の詳細は次回以降で解説します。

DSL

上記で解説したTopologyを簡単に構築するために、Kafka Streamsには便利なDSLが用意されています。 DSLを使うことで、Lambdaをメソッドチェーンで繋げる様なインターフェースでストリームアプリケーションを簡単に書くことができます。

これに関しては公式のドキュメントを見る方が明らかに早いので、全体を知りたい方はこちらを参照してください。 日本語版は翻訳が追い付いていないせいか結構古いので最新の機能を知りたい場合は英語版を参照してください。

https://docs.confluent.io/ja-jp/platform/6.2.0/streams/developer-guide/dsl-api.html (ja) https://docs.confluent.io/platform/7.0.1/streams/developer-guide/dsl-api.html (en)

以降ではこの後の解説に必要な特に重要なDSLについてのみ解説していきます。

KStream, KTable, GlobalKTable

Kafka StreamsのDSLではトピックからデータを取得する方法として3つのパターンを提供しています。 これら3つの入力を表すオブジェクトに対してメソッド呼び出しを繋げることでDSLを記述します。

KStream

単純なKafkaトピックからの入力ストリームを指します。

StreamsBuilder builder = new StreamsBuilder();

KStream<String, Long> wordCounts = builder.stream(
    "word-counts-input-topic", /* input topic */
    Consumed.with(
      Serdes.String(), /* key serde */
      Serdes.Long()   /* value serde */
    );

この様に入力を指示する時にSerde(Serializer/Deserializer)を指定します。(実際の入力ではDeserializer側しか利用されないが)

このSerdeを通して、Kafka StreamsはJavaオブジェクトとバイト配列を自動的に変換します。

KTable, GlobalKTable

テーブルの様に扱える入力ストリームを指します。 テーブルの様に扱えるというのは、読み込み始めから現在に至るまでのKafka Brokerに入力された全レコードをローカルに保持していて、他のストリームと結合したりKVSの様に利用できることを意味します。

これは入力レコードの状態を保持しているということでもあり、KTableを使うということはステートフルなアプリケーションである、ということでもあります。

KTableは各パーティションの内容は割り当てられたクライアントにしか保持されませんが、GlobalKTableはあるトピックの全パーティションのデータを全ノードが保持し続けます。そのため入力トピックのデータ量に気を付けて利用する必要があります。

GlobalKTable<String, Long> wordCounts = builder.globalTable(
    "word-counts-input-topic",
    Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as(
      "word-counts-global-store" /* table/store name */)
      .withKeySerde(Serdes.String()) /* key serde */
      .withValueSerde(Serdes.Long()) /* value serde */
    );

KTableにはストア名とストアするデータのSerdeが必要です。 このストア名で各ノードにはRocksDBのデータベースファイルが作成され、そこにデータが蓄積されます。 このデータストアをStateStoreと呼びます。 上記の例にあるMaterialized.asというメソッドを利用してストア名やSerdeを切り替えたり、その他引数の渡し方でインメモリのデータストアを使う様に変更することも可能です。

StateStoreに保持されたデータは(インメモリストアでない限りは)プロセスが再起動したとしても維持されますが、パーティションの割り当てが変わって担当するタスクが切り替わると不要になったり、新しく再作成が必要になる場合があります。 不要になったRocksDBのデータは一定期間の後に自動的に削除されます。

GroupBy, GroupByKey

KStreamやKTableを集計したい時にどのキーを基にして集計するかを指示するDSLです。countやaggregateの前提として呼び出しておく必要があります。

KGroupedStream<String, String> groupedStream = stream.groupBy(
    (key, value) -> value,
    Grouped.with(
      Serdes.String(), /* key (note: type was modified) */
      Serdes.String())  /* value */
  );

groupByを利用すると必ずデータの再パーティションが行われます。再パーティションは、再パーティション用のトピックが自動的に作成され、そのトピック宛にレコードを送信して、再度取得するという処理を行います。 ネットワークやストレージに冗長な負荷がかかるため、再パーティションは極力避けるべきものです。

パーティション化を避けるためにはgroupByKeyを利用します。

KGroupedStream<String, String> groupedStream = stream.groupByKey(
    Grouped.with(
      Serdes.String, /* key */
      Serdes.String())     /* value */
  );

groupByKeyは現在のキーをそのまま使ってグループ化を指示します。この場合は処理の過程でキーが変更されていない限りは再パーティションは行われません。mapなどの変換処理によってキーが変更されている場合はgroupByKeyを使っても再パーティションが実行されてしまいます。

aggregate

DSLの中では最も汎用的な集計処理を実装できるDSLです。 イニシャルの値を生成するイニシャライザ、新しくレコードが届いた時に実行されるアダー、変更前のレコードの情報を利用して実行されるサブトラクター(省略可能)の3つのラムダを渡して集計処理を実装します。

KTable<byte[], Long> aggregatedTable = groupedTable.aggregate(
    () -> 0L, /* initializer */
    (aggKey, newValue, aggValue) -> aggValue + newValue.length(), /* adder */
    (aggKey, oldValue, aggValue) -> aggValue - oldValue.length(), /* subtractor */
    Materialized.as("aggregated-table-store") /* state store name */
    .withValueSerde(Serdes.Long()) /* serde for aggregate value */
);

集計結果を処理ノードのローカルにずっと維持していなければ、次のレコードが届いた時に今の集計値が何なのかを取得できません。そのため、集計処理は必然的にKTableになります。集計処理した結果はDSLが自動的に構築したStateStoreに格納され保持されます。

StateStoreの実態については色々と解説しなければならない要素があるため、この後でまとめて解説します。とりあえず結果がStateStoreという場所に格納されている、ということを覚えておいてください。

windowedBy

一定の時間ごとに集約範囲を区切ることができるDSLです。この機能を利用することで、5分ごとの合計を算出し続けたり、一回のセッションの中に含まれるイベントの数を数えたりといったことが可能になります。 window処理は時間軸に依存した処理なので、レコードのタイムスタンプと保持期間が重要になります。 場合によっては古いタイムスタンプのレコードが遅れて到着することもあり得ますし、古い集計データをいつまでも保持していたらデータが無限に増えてストレージを圧迫します。 そういった状況に対応するため、window処理には保持期間を別途設定できる様になっています。

// Aggregating with time-based windowing (here: with 5-minute tumbling windows)
KTable<Windowed<String>, Long> timeWindowedAggregatedStream = groupedStream.windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
    .aggregate(
      () -> 0L, /* initializer */
      (aggKey, newValue, aggValue) -> aggValue + newValue, /* adder */
      Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("time-windowed-aggregated-stream-store") /* state store name */
        .withValueSerde(Serdes.Long())); /* serde for aggregate value */

ウインドウ処理にはその区切り方によって、Tumbling Window, Sliding Window, Hopping Window, Session Windowと現時点で4つの種類があります。 割とややこしいため、 https://docs.confluent.io/ja-jp/platform/6.2.0/streams/developer-guide/dsl-api.html#windowing を参照することをオススメします。

Processor API

Kafka Streamsのlow level APIにあたるもので、上記のDSLは全てこのProcessor APIで実装されています。

端的に言えば、Processor APIは単なるクラス定義として利用できます。 initメソッドにProcessorContextというオブジェクトが与えられるので、それを利用してKafka Streamsの情報を入手したり、StateStoreへの参照を取得したり、後続の処理にデータを送ったりすることができます。 詳細は、 https://docs.confluent.io/ja-jp/platform/6.2.0/streams/developer-guide/processor-api.html を読んでください。

処理の実態はprocessメソッドに実装する様になっており、レコードが到達する度にこの処理が呼ばれます。 また、一定時間ごとにスケジュール処理を行うこともできます。 ProcessorContextl#scheduleメソッドでPunctuatorと呼ばれる処理を登録することで一定期間ごとに自動的に実行できます。 実行間隔のタイムスタンプをどう扱うかについてKafka StreamsはSTREAM_TIMEとWALL_CLOCK_TIMEの2種類を提供しています。それぞれの違いについては上記リンクに詳細がありますので、そちらを参照してください。

以下は、上記のドキュメント(英語版の最新)から引用したコードサンプルです。 重要な箇所はStateStoreを取得している箇所と、スケジュール処理を登録している箇所、そしてcontext.forwardを呼び出して後続にレコードを渡している部分です。

public class WordCountProcessor implements Processor<String, String, String, String> {
    private KeyValueStore<String, Integer> kvStore;

    @Override
    public void init(final ProcessorContext<String, String> context) {
        context.schedule(Duration.ofSeconds(1), PunctuationType.STREAM_TIME, timestamp -> {
            try (final KeyValueIterator<String, Integer> iter = kvStore.all()) {
                while (iter.hasNext()) {
                    final KeyValue<String, Integer> entry = iter.next();
                    context.forward(new Record<>(entry.key, entry.value.toString(), timestamp));
                }
            }
        });
        kvStore = context.getStateStore("Counts");
    }

    @Override
    public void process(final Record<String, String> record) {
        final String[] words = record.value().toLowerCase(Locale.getDefault()).split("\\W+");

        for (final String word : words) {
            final Integer oldValue = kvStore.get(word);

            if (oldValue == null) {
                kvStore.put(word, 1);
            } else {
                kvStore.put(word, oldValue + 1);
            }
        }
    }

    @Override
    public void close() {
        // close any resources managed by this processor
        // Note: Do not close any StateStores as these are managed by the library
    }

これをKafka Streamsの中に組込むにはTopologyクラスのコンストラクタ、またはStreamBuilder#buildで生成されたtopologyに対してTopology#addProcessorを呼び出すことで、任意のProcessorクラスを差し込むことができます。 また、この時利用したいStateStoreがあればその名前を渡して関連付けておきます。そうでないとProcessor APIからStateStoreを見つけることが出来ません。その場合は例外が発生します。

Topology topology = new Topology();

// add the source processor node that takes Kafka topic "source-topic" as input
topology.addSource("Source", "source-topic")

    // add the WordCountProcessor node which takes the source processor as its upstream processor
    .addProcessor("Process", () -> new WordCountProcessor(), "Source")

    // add the count store associated with the WordCountProcessor processor
    .addStateStore(countStoreSupplier, "Process")

    // add the sink processor node that takes Kafka topic "sink-topic" as output
    // and the WordCountProcessor node as its upstream processor
    .addSink("Sink", "sink-topic", "Process");

StateStoreの実態

公式で提供されているStateStoreは大きく分けてRocksDBを利用したpersistentストアとプロセスが終了すると揮発するインメモリストアの二種類ありますが、一般的によく使われるのはpersistentの方だと思います。まず共通の要素について解説していきます。 ちなみに、APIの形式に則って自分で実装してしまえば、独自のStateStoreを定義することも可能です。

StateStoreをlow level APIで利用する場合は自分でStateStoreを構築することができます。

StoreBuilder countStoreBuilder =
  Stores.keyValueStoreBuilder(
    Stores.persistentKeyValueStore("persistent-counts"),
    Serdes.String(),
    Serdes.Long()
  );

これをProcessor APIで利用したい時には以下の様な形で利用します。

KeyValueStore<String, Long> counterStore = context.getStateStore("persistent-counts");
long count = counterStore.fetch("key");
count++;
counterStoreB.put("key", count);

この例では単純なキーの完全一致による取得を行っていますが、Kafka Streamsが提供しているStateStoreはキーのバイト列でソートされているため、rangeメソッドを使って一定の幅のキーを取得することもできます。

StateStoreにおいて重要な要素は以下の通りです。

  • KVSであり、キーの順番でソートされている
  • キーもバリューも実際に格納される値はバイト列であり、アプリケーションとやり取りする際にはSerialize/Deserializeを行う
  • changelogトピックを利用して耐久性を担保する
  • 読み書きに関するcacheの仕組みを持っている

changelogトピック

先程軽く触れましたが、StateStoreを利用すると自動的にchangelogトピックというものがKafka Broker上に作成されます。(インメモリストアの場合はデフォルト無効)

changelogトピックはStateStoreに書き込まれた変更内容を保持しているトピックです。StateStoreにデータが書き込まれたり削除されたり(nullを書き込む)する度に、changelogトピックにレコードが送信されます。

Kafka Streamsではこのchangelogトピックを使って処理ノードのダウンやタスクの再割り当てに対応します。

新しくKafka Streamsのプロセスが立ち上がった時、もしそのアプリケーションがStateStoreを利用する様に定義されていて、かつそのローカルストレージにStateStoreのデータが存在しない場合、Kafka Streamsはレストア処理を行いKafka Broker上に存在するchangelogトピックから全てのレコードを取得し、ローカルのStateStoreを復元します。

同様にノードの増減によってリバランスが実行され、タスクが再割り当てされた時にも、自身のローカルに割り当てられたタスクIDに対応するStateStoreが存在しなければ、まずレストア処理が走ってから処理が継続されます。

Kafka Brokerはproductionで運用する場合普通であれば3つ以上のレプリカを持つ様に設定されており、データ自体の耐障害性はKafka Brokerの機能によって担保する様になっています。

レストア処理の注意点

StateStoreを活用する上で最も注意しておかなければならないことはレストア処理です。

Kafka Streamsはレストアが必要になった時点で処理を停止します。処理が開始・再開されるのはレストア処理が完了した後です。 つまり、StateStoreに溜まっているデータ量が非常に多くなると、レストア処理にかかる時間も長期化していき、いざノードがダウンしたり増減したりする時に、長時間処理が停止する危険が生じます。

これを避けるのは現状かなり難しい問題です。一定以上古いデータや利用頻度の低いデータはネットワークのオーバーヘッドを許容して外部ストアに逃がすなど、データ量を削減する仕組みをコツコツ積み重ねるしかありません。

changelogの無効化

StateStoreを作成する時、StoreBuilderのwithLoggingDisabled()を呼び出しておくと、changelogとの対応関係を無効化することができます。インメモリストアの場合はデフォルトで無効になっています。

changelogトピックが無効になっているとレストア処理が実行されません。RocksDBストアを利用している場合、同じノードで同じIDのタスクが実行されている限りはそのデータが保持され続けます。 一方で、Consumerのリバランスによりタスクの再配置が行われた場合や、処理ノードがダウンして復帰不可能になった場合は、そのデータは消滅します。

主な用途はローカルキャッシュです。メモリに乗り切らない量でもファイルシステムを利用してノードローカルなキャッシュとして利用することが出来ます。

例えば、不変のデータであれば外部のデータストアに蓄積しておいて、最初の一回はネットワークアクセスのオーバーヘッドが発生するが、その後はローカルのファイルシステムにキャッシュしてネットワークを経由したアクセスを回避する、といった用途に利用できます。

StateStoreのキャッシュ

ストリーム処理はその性質上短期間に同じキーのデータを何度も読み書きする可能性が高いものです。例えば、あるuser_idのリンクを踏んだ数などをカウントしている場合、そのイベントレコードが集中して何件も届くというのは十分あり得ることです。 こういった時に、都度ストレージにデータを書き込んでchangelogトピックを更新するのは無駄なオーバーヘッドに繋がる場合があります。

Kafka Streamsではそれを避けるためにデフォルトでキャッシュが有効になっています。書き込んだ内容は一定期間メモリ上に保持され、flushタイミングで実際のStateStoreに書き込みchangelogを更新します。

DSLで構築されるKTableではこのキャッシュ機能を利用してforwardが遅延されます。 例えばaggregateで集約された結果を次のDSLで利用したり別のtopicに送信したりする際には、デフォルトの挙動では一定期間StateStore上にキャッシュされてそこでバッファリングされます。一定時間の経過かキャッシュ用のメモリ領域を使い切ったら、CacheFlushListenerがレコードを次のProcessorにforwardします。

WindowStoreの実態

先程DSLの説明の中でwindowByというDSLについて紹介しました。このDSLを利用することで一定の時間ウインドウごとにレコードを集計することが出来ます。

実はこれはWindowStoreというStateStoreのバリエーションによって実装されています。 WindowStoreを作成する時は以下の様にします。1分毎のウインドウ幅で、2時間分のデータを保持するWindowStoreは以下の様に作成します。

StoreBuilder<WindowStore<String, Long>> counterStoreByMinute =
      Stores.windowStoreBuilder(
              Stores.persistentWindowStore(
                  "counter",
                  Duration.ofHours(2),
                  Duration.ofMinutes(1),
                  false),
              Serdes.String(),
              Serdes.Long());

これをProcessor APIで利用したい時には以下の様な形で利用します。

WindowStore<String, Long> counterStoreByMinute = context.getStateStore("counter");
long durationMillis = Duration.ofMinutes(1).toMillis();
long currentWindowStart = (context.timestamp() / durationMillis) * durationMillis;
long count = counterStoreByMinute.fetch("key", windowStart);
count++;
counterStoreByMinute.put("key", count, currentWindowStart);

キーバリューストアにタイムスタンプを合わせて付与する形で取得や保存を行います。 DSLの裏側にある場合は自動でtimestampからTime Windowを設定してくれますが、WindowStoreを直接扱う場合は、TimeWindowの開始となるタイムスタンプがどこかは手動で算出する必要があります。 WindowStoreに事前に与えられたWindow Durationは、記録されたタイムスタンプを基準にしてどれぐらいの長さのウインドウなのかを示す際に使われます。

上記の例は非常にシンプルな形でしたが、timestampの開始と終了時刻を渡すことで、その範囲に含まれるレコードを順番に取得することもできます。 また、通常のKeyValueStoreと同様にキーのレンジ探索とtimestampの開始と終了時刻のレンジ探索を組み合わせることも出来ます。

内部実装としては、WindowStoreは複数のRocksDBのストアから成り立っていて、記録するタイムスタンプごとに大まかなsegmentに分割されています。保存する時にtimestampから対応するsegmentを特定しその領域にデータを書き込みます。 取得する時は対象範囲のsegmentからのみデータを取得することで探索範囲を小さくしています。 またtimestampの範囲でsegmentが決定できるため、現在処理しているタイムスタンプと比較して保持期限を過ぎたものは、まとめてファイルごと削除することで簡単に破棄できます。

書き込み時にはレコードに与えられたキー(上記の例では"key"文字列)の末尾にミリ秒単位のunix timestampが自動的に付与されて記録されます。

次回に続く

非常に長くなったので、一旦ここらで区切って続きはまた次回とさせていただきます。

次回はDSLでは実現できないがProcessor APIでなら実現可能な応用編と、実際にアプリケーションを開発する時の基本的な考え方について解説する予定です。

Kafka入門 第1回 「そもそもKafkaとはなにか」

これは社内向けに書いた、Kafkaってそもそも何やねん、ということをメンバーに解説するための記事を一部編集して公開できる様にしたものです。 第2回以降では、Kafkaを利用したアプリケーション開発のノウハウについて解説していく予定です。そちらも社内の事情を除いた形で公開していくつもりです。

そもそもKafkaとは

Kafkaはイベントストリーミングプラットフォームと呼ばれるミドルウェアです。 元々はストリームバッファと呼ばれてたと思います。

公式のドキュメントには以下の様に書かれています。

 Kafka combines three key capabilities so you can implement your use cases for event streaming end-to-end with a single battle-tested solution:

    To publish (write) and subscribe to (read) streams of events, including continuous import/export of your data from other systems.
    To store streams of events durably and reliably for as long as you want.
    To process streams of events as they occur or retrospectively.

And all this functionality is provided in a distributed, highly scalable, elastic, fault-tolerant, and secure manner.

つまり、イベントストリームを書きこんだり読み込んだり出来て、継続的に他のシステムからimportしたりexportしたりできて、それを一定期間(または無期限に)保持し続けることが出来て、発生した時に処理するだけでなく遡って過去のデータを処理することもできるプラットフォームであるということです。 加えて、分散処理可能で、スケーラブルであることも特徴です。

プラットフォームの構成要素は以下の通りです。

  • Kafka Broker ミドルウェアの本体
  • Kafka Client ミドルウェアと通信するためのクライアントライブラリ
  • 上記クライアントによって実装された管理コマンド群
  • Kafka Streams and Kafka Connect アプリケーション開発のためのフレームワーク

イベントストリームとは

終端が存在しない継続的なデータの流れのことです。

システム的にはデータベースやモバイルデバイス等によって発生したデータの動きをリアルタイムにキャプチャし続けたデータの流れと言えます。 一つ一つのデータの発生や変化をイベントと呼び、それらが終端なく延々連なっているものをイベントストリームと呼びます。 このイベントストリームに対して高速に処理を行うことができれば、我々のシステムは、ユーザーに対して即座に最新の結果を届けることが出来る様になります。

イベントストリームには終端が存在しない上にその目的上リアルタイムに近いインターバルで処理することが求められるため、基本的には一件単位で処理を行うことになります。 集計などの複数レコードに渡る処理については、開発者自身で必要なデータの範囲を定義して情報を蓄積しておくストレージを別途用意する必要があります。

Kafkaのエコシステムにはそういったストリーム処理で現実のユースケースを実現するために必要なライブラリも用意されています。

Kafkaでは複数のイベントストリームを扱うことができ、それをトピックと呼びます。トピックは名前を付けて管理できる様にしたイベントストリームということです。

トピックとパーティション

各トピックは1つ以上のパーティションによって分割されています。 また、トピックにはreplication factorが設定されており、各パーティションの複製がどれだけ存在するかを設定できます。replication factorが3であれば、あるパーティションのデータの複製がクラスタ内部に3つ存在するということです。

あるパーティションのあるレプリカは一つのノードによって保持されます。

例えば、TopicAのパーティション数が8でreplication factorが3であった場合、クラスタに所属している各ノードに合計24のパーティションのデータが分散して配置されます。

kafkacat (kcat) コマンドによる出力は以下の様になる。ノード数は5台。

  topic "TopicA" with 8 partitions:
    partition 0, leader 5, replicas: 5,3,1, isrs: 1,5,3
    partition 1, leader 3, replicas: 3,4,2, isrs: 4,2,3
    partition 2, leader 1, replicas: 1,4,2, isrs: 1,4,2
    partition 3, leader 4, replicas: 4,2,5, isrs: 4,2,5
    partition 4, leader 2, replicas: 2,5,3, isrs: 2,5,3
    partition 5, leader 5, replicas: 5,2,3, isrs: 2,5,3
    partition 6, leader 3, replicas: 3,5,1, isrs: 1,5,3
    partition 7, leader 1, replicas: 1,5,3, isrs: 1,5,3

トピックに保持するデータの一行一行をレコードと呼び、レコードはキーとバリューのペアで構成されます。 KafkaのBrokerにとってはキーとバリューはただのバイト列であって、それがどういう型のデータかはクライアントによって解釈されます。 例えば、JSONシリアライズされたデータである場合や、AvroやProtocolBufferによってバイナリシリアライズされたデータである場合、ただの文字列である場合、バイト列をそのまま数値として利用する場合、などシリアライズ方式に依存します。

何故Kafkaが必要なのか

Kafkaは非常に応用範囲が広いため一言で説明するのが難しいが、以下の様なユースケースがあります。

  • システム疎結合化やスケーラビリティ向上のためのメッセージブローカー
  • Webシステムの行動トラッキング
  • システム負荷などをモニタリングするためのメトリックの記録
  • ログ集約
  • イベントソーシング

弊社では、システム疎結合化のためのメッセージ基盤かつ、ユーザーの行動データのリアルタイム集計及び、その集計結果をサービスにリアルタイムに反映するための基盤として利用しています。

Kafkaはキューではない

Kafkaはキューの様なこともできるし、分散キューと呼ばれることもあるが正確にはキューではありません。

Kafkaはイベントストリームを一定期間に渡って保持し続けており、Consumerがそれを処理したとしてもキューと違って保持しているデータは変化しません。 キューの場合は、Consumerがデータを処理した場合そのデータはキューから削除され、他のワーカーから利用できなくなります。そして、基本的には先頭のデータが消えない限り次のレコードは処理されません。 例えばAWSのSQSはその様に動作します。(visibility timeoutなどである程度順番は制御できる) AWSではKinesisがかなりKafkaに近いサービスとして存在します。(現在はManaged Kafkaも提供されている)

キューでないことによる利点

トピックの共有

データがずっと保持されるということは、ある一つのトピックを複数のシステムで共有できるということを意味します。 つまりデータの発生源はKafkaにデータを送信しておくだけで、複数のマイクロサービスを動かすことができます。その先にどんなサービスがあるかを知る必要はありません。 各サービス側は自分がどのトピックを利用するかだけ分かっていれば良く、そこからデータを取り出して自分の処理をするだけで良くなるため、データの発生源について知っておく必要はなくなります。 仮にデータを利用したいサービスが増えたとしても、データの発生元に送信先を追加する必要はありません。 Kafkaの本体となるミドルウェアをメッセージブローカーと呼びますが、この様にメッセージを仲介してサービス間を疎結合に保つ役割を果たすことができます。

耐久性

Consumerがデータを処理してもデータが変化しないということは、Consumerが処理中に何らかのエラーが起こって処理ノードごとダウンしたという場合であっても、データが失われないことを意味します。 そのため、Consumerでエラーが起きたとしても再キューをする必要はなく、ただ別のノードで同じオフセットから処理を継続すればデータを失うことなく処理を復帰できます。 sidekiqなどの様にRedisを処理キューとして利用した場合には、処理中のワーカーがエラーハンドリングを適切にやらないとデータを失う危険性があります。もしOOMなどによってプロセスが突然死した場合には復帰は不可能です。 Kafkaにおいてはそういった心配はほぼ必要なくなります。

一方で、exactly onceモードを利用しない限りはリトライによって二重処理される可能性はあるため、羃等性に関しては別途注意が必要です。

過去のデータを利用できる

Kafkaは一定期間(デフォルトで7日間)のデータを保持し続けていることに加えて、キューと違って任意のオフセットからデータを読み出すことが出来ます。 そのため、過去のデータを利用して処理を開始することが出来ます。

例えば、新しくマイクロサービスを追加した時に、ある程度過去のデータから処理を開始することができますし、一定期間停止しているサービスがあったとしても、再開した時に停止したタイミングまでのデータが残っていれば、安全に処理を再開できます。

キューでないことによる複雑さ

オフセットの記録

キューと違ってデータが消えないということは、Consumer側で自分がどこまでレコードを処理したのかを覚えておく必要があることを意味します。 そのため、KafkaのConsumerクライアントはオフセットを記録しておく場所が必要になります。Kafkaのプロトコルでは、オフセットの記録場所としてもKafka Brokerを利用します。 Kafka Brokerは無期限にデータを保持することが出来るため、各クライアントのオフセット情報を記録するためのストレージとしても利用できます。 クライアントがどこまで処理をしたかのオフセット情報の更新もまたイベントの一つであり、それが延々更新され続けているイベントストリームでもあります。それをKafka Broker自身に格納しておいて、クライアントは起動時にそのオフセット情報をトピックから読み取ります。Brokerに残っている中で最新の値が今までに処理したレコードのオフセットになります。

ちなみにAWSKinesisでは、この様なオフセット情報の記録をDynamoDBを利用して行います。

Consumer Group

キューと異なることによって生じる複雑さの影響はオフセットの記録だけではありません。Consumerの処理によってブローカーのデータが変化しないので、もし複数のワーカーが同じトピックを処理しようとしたら、オフセットの記録だけではワーカーの数だけ重複して同じ処理が実行されてしまいます。

そのため、Kafka ConsumerにはConsumer Groupという概念が存在します。Consumer Groupはどういう目的でKafkaを利用しているかを指す集合であり、Consumerはそれに所属してBrokerからデータを取得します。

同一のConsumer Groupに所属している場合、Kafkaのあるトピックのあるパーティションはそのグループ内で一つのConsumerでしか処理できません。(一つのConsumerが複数のパーティションやトピックを処理することは出来る)

図で示すと以下の様になります。 consumer-groups.png (26.8 kB)

そのため、あるトピックのパーティション総数以上に、そのトピックを処理するConsumerを増やすことはできません。 パーティション数が30であれば、そのトピックを処理できるConsumerの数は30が上限になります。この数はインスタンスの総数で、同じノードに30あっても30ノードに1つづつあっても一緒です。

Kafka Brokerはパーティショニングをしない

Kafkaの大きな特徴の一つとして、パーティショニングを行う主体がBrokerではないという点が挙げられます。 Kafkaにおいてパーティショニングを行うのはProducerです。

データの発生源がKafka Brokerにレコードを送信する時に、クライアントライブラリの実装によってパーティションが決定されます。 そのため、クライアントライブラリが違っていればパーティショニングのロジックが異なる可能性があります。 例えば、JavaのProducerではデフォルトのパーティショニングロジックはキーに対してmurmur2によるハッシュ化を行いその結果をパーティション総数で割って剰余を利用します。 一方でRubyruby-kafkaという実装ではcrc32を利用してハッシュ化を行い同様に剰余を取って利用します。 また、Rubyには他にlibrdkafkaというCのライブラリのバインディングとして実装されたgemがありそちらはcrc32, murmur2, fnv1aなどが選択可能になっています。 この様に同じデータを送信したとしてもクライアントの実装によってパーティションIDが異なる場合があります。

また、厳密にはレコードのキーとパーティショニングは直接関係している訳ではありません。 あくまでデフォルトの挙動としてレコードのキーを利用するだけで、クライアントは任意のデータを任意のロジックでパーティショニングした結果を直接レコードに指定してBrokerに書き込むことができます。

例えばruby-kafkaを利用していたとしても、自力でmurmur2のハッシュ化を行って同一のハッシュ値を算出できれば、それを元に算出したパーティションを直接レコードに指定することで、Javaのライブラリと同じパーティションにレコードを送信できる様になります。

パーティショニングのロジックを揃えることは実際のシステムにおいてはかなり重要です。

例えば、あるユーザー(ID:100)の1日あたりのアクセス数を集計したい場合、ID:100のユーザーのレコードは一台のノードで処理できることが望ましい。 Consumer Groupによってあるパーティションのデータはある1台のノードでしか処理されないことが保証されるので、一貫してID:100のユーザーのレコードが特定のパーティションに送信されるのであれば、各ノードは単に受け取ったデータをローカルに集計結果として蓄積しておけばよくなります。 この時、もし新しいデータの発生源が追加され、クライアントライブラリの実装が異なったせいでパーティショニングの結果が揃わなくなったとしたら、集計結果の蓄積場所がバラバラになってしまい、正しく集計できなくなります。

この様に、どういうキーとどういうアルゴリズムによってそのトピックのデータがパーティショニングされているかを把握することはとても重要です。

基本的には公式のクライアントの実装であるmurmur2 hashingを利用したパーティショニングに揃えておくのが一番安全だと思います。

パーティション数の見積りの重要性

Kafka Brokerを実際に運用する際に非常に難しい要素となるのが、パーティション数を適切に選択することです。

先に述べた様に、同一のConsumer Groupがあるトピックのパーティションからデータ取得できるのは1つのクライアントだけです。つまりスケールできるクライアント数の上限がパーティション数によって決定されてしまいます。

それなら、パーティション数を後から増やせば良いのでは?と考えるかもしれませんが、実際のところそれはそう簡単にはいかないケースも多々あります。 前段で、パーティショニングロジックが異なって違うクライアントに処理が移ってしまうと集計処理が正しく実行できなくなる可能性があると説明しましたが、パーティション数を後から変えた場合も同様の事が発生する可能性があります。

パーティショニングは基本的にキーとなる値をハッシュ化してトピックのパーティション総数で割った剰余を利用します。 パーティションの総数が変化すると同一のデータが割り当てられるパーティションが変わってしまいます。そうなると最終的に処理しているノードが変化してしまい、集計処理が壊れるという事態が発生します。

そのため、パーティション数を後から変更することは、場合によっては非常に手間のかかる難しいオペレーションになります。 実際にこのトピックをどれぐらいの台数までスケールしたいのかをよく考えてパーティション数を見積るのが重要になります。 基本的な見積りの指針となるのは、ネットワークスループットと要件として求められている処理スループット、そして処理にかかるレイテンシです。

補足ですが、処理ノードが変わったとしても集計処理に関係が無い様にアプリケーションを構築すれば良いのではないかと思うかもしれませんが、それはストリームプロセッシングにおいては余りやりたくない設計になります。 何故ならストリームプロセッシングにおける集計では、バッチ処理よりも遥かにデータ取得や書き込みの回数が多くなります。そのためネットワークを介して別ノードにデータを保持する仕組みは無視できないレイテンシの増加を招くことがあり、かなり強い必要性が無い限りは採用したくない設計と言えます。 この点に関する詳細は、以降の解説記事で紹介していく予定です。

まとめ

Kafka はリアルタイムに大量のイベントデータを処理するためのプラットフォームであり、その中核を為すのがKafka Brokerというミドルウェアである。

イベントデータはユーザーの行動ログだったり、システム間のメッセージのやり取りであったり、データベースの変更履歴であったり、様々なものが含まれる。

イベントデータは複数のパーティションに分割されたトピックに格納される。格納されるデータはキーとバリューで構成されたレコードであり、中身は単なるバイト列である。

Kafka Brokerはキューとは異なり、Consumerの処理によってデータが削除されないため、複数のシステムで同じトピックを共有したり、過去に遡ってデータを処理したり、リトライすることが容易である。

一方で、自分がどの範囲のデータを処理する責務を負っているかはクライアント自身で認識しておく必要があり、そのためにオフセット記録やConsumer Groupなどのキューには無い概念が必要になる。

パーティショニングの責任はクライアントが持つため、パーティション結果に依存する集計処理などを行う際には、クライアントライブラリの実装を把握しておくことが重要になる。

(まあ、大体の事は 公式ドキュメントに書いてあるので、これを全部読めば分かるのだが。)

Kafka Streamsを本番運用する時に検討しておくべきconfigの設定値について

Kafka Streamsをそれなりのデータ規模で運用していくとデフォルトの設定値では動作に困るケースがしばしば出てきます。

その中でも気にしておいた方がいい設定値について紹介していきます。

acceptable.recovery.lag

これはあるタスクにおいてStateStoreとchangelogトピックのlagのどこまでを追い付いていると許容するかどうかを設定します。

デフォルト値は10000なんですが、KTableとかで外部からデータが放り込まれていてそれが結構なデータ量だと10000とか一瞬で越えてしまって、一瞬プロセスを再起動しただけでも追い付いていない、みたいな扱いになってrebalanceが即座に走ってしまう挙動になりました。 実際にデータが入ってくるペースを考慮した値に設定しておかないと、プロセスを再起動した際に余計なrebalanceやレストアが走って無駄な停止時間が発生する可能性がある様です。

probing.rebalance.interval.ms

warmup replicaという仕組みが入って、一定期間ごとにwarmup状態をチェックするためのprobing rebalanceという処理が走る様になったのですが、それの実行間隔を指定します。

このconfigは公式のドキュメントだと重要度がLow扱いになってるんですが、これも運用規模によっては結構危険な設定値でした。デフォルト値は10分間です。 ノード数とかpartition数が多いとrebalanceが完了するまでに結構時間がかかる可能性があって、もしrebalanceが安定するまでに10分ぐらいかかってしまう場合、probing rebalanceがずっと終わらなくて、動作こそ継続できるもののクラスタの状態が不安定になるケースがありました。そういった場合に少し長めの値にすると状態が安定するかもしれません。

fetch.max.bytes, max.partition.fetch.bytes

consumerが一度に取得するデータサイズの上限を設定する値です。

これは全体のスループットや、StateStoreのレストア速度に影響します。余り小刻みにデータを取得するとその分の通信オーバヘッドでスループットが低下するので、データ量やネットワーク速度に合わせた値に設定することで、レストアにかかる時間が改善する可能性があります。

最近のjoker1007

Rubyist近況[1] Advent Calendar 17日目です。

仕事的な近況

最近、仕事で全然Rubyを書かなくなってきた。たまにRailsアプリの改修作業をやる程度で、今年書いた量で言うなら間違いなくJavaが一番多いだろう。

直近で書いたブログ記事を参照してもらえると分かり易いが、ここ1年ぐらいの自分の主戦場はKafkaとストリームプロセッサである。

Kafkaは非常に便利なミドルウェアだがメッセージキューの様でメッセージキューでなく、分散ストレージとして動作するがブローカー自体は余り分散の仕組みをコントロールする訳でもなく、クライアントライブラリ側で色々制御する様な仕組みになっているので、どういう活用の仕方をするものなのかイマイチ理解しづらい難しいミドルウェアでもある。

自分は完全に0から考えて必要だと思って調査してほとんどの仕組みを勝手に作ってしまったので性質も活用方法も分かっているが、そんなにすぐに各メンバーにそういった知識が伝えられる訳では無いので、Kafkaをゴリゴリ使う機能は必然的に自分が主導で書くことになる。

段階的に知識を移譲している最中だが、当分の間は作って説明して資料を作ってというのを繰り返すことになるだろう。

こんな感じで、ひたすらKafka周りの整備や作ったストリーム処理アプリケーションのパフォーマンスチューニングや新機能の実装をやっているとJavaばかり書くことになってしまった。今は秒間10万件のレコードを速攻で処理するストリーム処理基盤にガシガシ機能を追加している。

実際、Javaを結構書く様になって思ったが、最近のJavaには余りストレスを感じない。まあinterfaceの扱いとか中途半端なOptionalとかコンストラクタの制約とかlombok頼りな現状とか色々どうにかならんのかと思うことはあるが、長年の積み重ねで磨き抜かれたInteliJが余りに便利なので、開発体験自体はかなり良い。

(正直、そびえ立つまでに大きくなったRailsアプリケーションを改修するより遥かに楽とさえ言える……。やっぱ型検査とそれを利用した高度なLanguage Serverとか欲しくなるよね。)

そして、Rubyを全然書いていないのでgemのメンテやら何やらも割と放置気味で、3.1が出ようとしているこの時期になっても、全然新機能で遊んでいる余裕もない。Rubyistとしては恥ずかしいばかりである。仕事でMPを使い切っているのも余り良い状態ではない。もっとゆとりがあった方が良いのは間違いないだろう。

一方で、Kafkaについては多分かなり詳しくなったので、無茶苦茶高度な質問でなければ答えられるぐらいには理解したと思う。もしKafkaに興味がある人が居たらどこかで情報交換できると嬉しい。

ちなみに、何故Kafkaを使ってるのか、何がそんなに大事なのかという話については、後日改めてブログに書こうかなと思っているので、興味がある方は是非読んでやってください。

個人的な近況

今年は年の始めにzen3アーキのRyzenRadeon RX 6900 XTを使って新しいPCを組んだので、PC環境は今のところかなり快適な状態。

joker1007.hatenablog.com

OSは昔から愛用しているGentoo Linuxを使っている。Linuxにおけるノイキャンの方法とかビデオ会議やらなんやらの環境構築については、以下の様な記事を書いたので、もしLinuxに乗り換えようか考えてる人が居たら、参考になるかもしれません。

joker1007.hatenablog.com

別にやんちゃクラブに触発されたという程ではないが、SteamのおかげでLinuxでも結構PCゲームは出来る様になっており、Gentoo LinuxでもPCゲームはちゃんと遊べるぞ、ということを主張するためにゲームプレイを配信していこうかと思って、最近配信試験をしていたりとかする。とりあえず今はFF4のピクセルリマスターのプレイをYouTubeに記録している。今はちょうどラストダンジョン直前まで来てます。

www.youtube.com

他にHorizon Zero Dawnにシェーダー噛ませてRay Tracingを使える様にした比較動画とかを作ったりしてます。

www.youtube.com

Linuxでこの辺りのことをやる方法についても、暇を見つけて解説記事書いてみようかと思っているので、こちらも興味がある方はお楽しみに。

今年買ったものとか

今年買った大きい買い物は基本的にデスク周りのものが多い。まあ基本引き篭りやしね……。

  • FlexiSpotの電動昇降机
  • 3万円ぐらいのデスク用のサウンドバー (暫定的に買ったものなので、もうちょい良いものが欲しい。HDMI 2.1に対応していて4k 120fps出るやつとか無いだろうか)
  • 10Gbps対応のtp-linkのスイッチングハブ (しかし10GbpsのNICは無い……)
  • Sony WF-1000XM4 (無くしそうと思ってたが、家の鍵と同じ固定のポケットに入れられるので割と大丈夫かもしれない)
  • iPhone13 (5年ぶりの新スマホ)

WF-1000XM4は思ってたより買って良かった。うちのGentoo Linuxと96khzで接続できたし、ちょうど緊急事態宣言も明けて神田ぐらいまで飲みに行く機会も増えた時期に買ったので、利用頻度も結構高い。最初ちょっと音が籠ってるかなと思ったが慣れてきたら十分に良い音だと思う様になってきたし、これはオススメの品だと思う。

生命的な近況

いよいよ30代も大分後半になってきて、ぶっちゃけ体のあちこちにガタが来ている実感がある。そもそも不摂生の塊みたいな人間なので生活を改めろよ、という話なのだが、面倒なので何も対処しないまま何となく生きている。

正直、後10年ぐらいしたら死ぬのでは……ぐらいの感じではあるのだが、万が一半端に長生きしてしまった時のために、そろそろ金をまともに溜める手段を考えないと、このままでは老人になった時に詰んでしまうので、その辺りも気にする様になってきた。

とりあえず、来年の目標としていい加減にIDECOとNISAぐらいちゃんとやるかと考えている。

しかし、自分は家庭の都合という奴でどうにもならない出費が物凄く多いため、どうやって安定して運用資金を捻出するか非常に難しい。実際、毎月の収支は現状トントンか弱赤字である。収入が途絶えたら即座に死ぬ。(皆どうやって金溜めてんのかなと不思議である)

まあ、今のところは働けなくなる程のダメージは負ってないので、しばらくは死なないと思う。もし死にそうになったらTwitterで死ぬ手前ぐらいまでは実況していきたいと思っている。

まとめ

という訳で、何とか生きてます。2022年もよろしくお願いします。来年はもう少し友人に会って「うぇーい」ってビールジョッキを酌み交わせる日が戻ってくると良いなあ。

Kafka StreamsのStateStoreのちょっと変わった使い方 (ローカルキャッシュ、 コンフィグストア)

この記事は Kafka Advent Calendarの14日目の記事です。(1日遅れてるけど)

今回は割とライトな記事です。

ローカルキャッシュとしてのStateStore

Kafka StreamsにおいてStateStoreは、トピックから入力されて何らかの状態を保持した処理をしたい場合にその状態を永続化するために利用するのが基本的な形です。

一方でProcessor APIを活用しだすと、別にトピックから入力されてKafka Streamsで処理したものだけが保持対象じゃないことに気付きます。

StateStoreは基本的にはchangelogと紐付いているので、大規模な集計処理等を行ってデータ量が膨れ上がると、もしノードのストレージが無くなった時やrebalanceが走った時に、レストアに非常に長い時間が必要になります。これは現時点でちょっとやそっとでは解消しようがありません。

それを避けるために、通信という大きなオーバーヘッドを抱えても外部のデータソース(RedisやCassandraなど)からデータを取得したくなることがあります。

RedisやCassandraはかなり読み込みが高速なデータストアですが、Kafka Streamsの様なストリームプロセッサで頻繁に通信を行うと流石にローカルのStateStoreと比べて劇的に性能が落ちます。

そこで、withLoggingDisabled()をセットしたWindowStoreをキャッシュに使います。

changelogと紐付いていなければレストア処理が発生しないので、Processor APIの中でWindowStoreにデータが見つからなかった時だけデータを取りにいけば、処理内容によってはかなりの割合で外部との通信処理を削減することができます。

ここでWindowStoreを利用するのは現在KeyValueStoreでは自動的にデータをexpireする方法が無いからです。WindowStoreならretention timeを調整することで一定期間過ぎたStateStoreをローカルストレージから自動で削除できます。(RocksDB自体にはTTLを設定する方法があるがKafka Streamsからはそれを変更できない実装になっている)

実際MemoryStoreには、lruMapというストアが用意されており、オンメモリに乗る範囲であればこれを利用するとローカルなキャッシュがシンプルに実現できます。ただこれはメモリに乗る範囲でしか使えないことと、プロセスを再起動したら消滅してしまうことが難点です。

KeyValueStoreでTTLが使える様になると嬉しいんですが、今のところissueがあるのみ、といった感じですね。(https://issues.apache.org/jira/browse/KAFKA-4212)

コンフィグストアとしてのGlobalKTable

GlobalKTable(GlobalStateStore)は、全処理ノードに分配されるキーバリューストアとして利用できます。つまりデータフォーマットさえ決めておけばKafkaのトピックを経由して、任意の設定値を起動中の全ノードに配布したり削除したり出来るということです。

例えば、Kafka Streamsに限らずストリームプロセッサは処理が複雑になってくると結構デバッグするのが大変です。そういった時にユーザーの処理の流れを事細かに追いたいケースがあります。

デバッグモードを作り込むことは出来ますが、全ユーザーに対してそれを動かすと負荷がとんでもないことになって処理が詰まってしまったり、ログが膨大になったりする可能性が高いでしょう。

なので、対象のユーザーを絞ったりサンプリングして取得する様にしておきたくなります。

こういったケースでGlobalKTableに対象のユーザーの識別子やサンプリングの割合などの設定値を保持可能にしておきます。開発者はCLIのKafka Producerを使ってGlobalKTableになっているtopicにJSONを送り込めば、全ノードに対して起動したまま対象ユーザーの設定を変更することができます。必要無くなればnull値を送って設定を消せば元通りに出来ます。

こういった仕組みを管理機構として組み込んでおけば、いざという時に情報収集がしやすくなります。

この様にGlobalKTableはStreamからのJoin対象としてだけ利用するのではなく、外部から全ノードに設定値を即座に転送しストリームプロセッサ内で利用する仕組みにも応用できる、という話でした。

Kafka StreamsのWindowStoreのchangelogを再利用するためのダーティハック

この記事で紹介する内容はある意味で非常にリスキーであり、Kafka Streamsの内部実装に強く依存しています。 現状代替できる策が無いため、やむなく編み出した方法であることに注意してください。

Kafka StreamsのStateStoreとchangelogについて

Kafka Streamsではイベントの数をカウントするなどのStatefulな処理を実現するためにStateStoreというキーバリューストアを利用します。 もし処理ノードが壊れた時にも、キーバリューストアのデータが失われない様にするため、Kafka StreamsはStateStoreにデータを書き込むと同時にKafka Brokerに変更内容を書き込みデータの永続性を担保します。これをchangelogトピックと言います。

WindowStoreのchangelogの再利用

24時間のTimeWindowでユーザーのイベント毎の実行回数をカウントし、1ヶ月間(30日)保持しているとします。この場合、保持しておかなけれはいけないレコードの総数は最大で ユーザーの総数 * イベントの種類 * 30日分 になります。もしユーザーが1000万人、イベントが100種類あったとすると300億件のレコードを保持する必要があります。

そして、このイベントカウンターを他のKafka Streamsアプリケーションで利用したくなった時にどうするかを考えます。 (ワークロードが大きく異なりクラスタのキャパシティコントロールを考慮して別クラスタとして管理したい等の理由で)

この場合、現時点でこのWindowStoreを真っ当な手段で再利用する方法はありません。

安全な方法はイベントカウンターの情報を別のトピックに書き出して、それをKTableとして別アプリケーションから読み出す方法です。

しかし、この方法では単なるKeyValueStoreとしてしか読み出せなくなる上に、changelogトピックと別のトピックにデータを書き出す必要があってブローカーの負荷やストレージ消費が2倍になります。数千億件ぐらいの総数になってくると、気楽に倍のデータを保持する訳にはいかなくなってきます。

なので、どうにかしてWindowStoreのバックにあるchangelogを読み出してWindowStoreを復元できないものかと方法を探しました。

そして、Topology Optimizationで使われている内部APIを無理矢理引っ張り出すことで実現可能であることを発見しました。

InternalTopologyBuilder#connectSourceStoreAndTopicとWindowStore書き込みプロセッサの実装

Topology Optimizationを有効にすると、KTableとしてトピックのデータを読み込んだ時にchangelogトピックを新規に作成せずに、直接StateStoreとソーストピックを接続し、StateStoreにcheckpointが存在しない場合は、そのソーストピックからデータを取ってリカバリする様に動作します。

この処理をコントロールしているのが、InternalTopologyBuilder#connectSourceStoreAndTopicです。

    public void connectSourceStoreAndTopic(final String sourceStoreName,
                                           final String topic) {
        if (storeToChangelogTopic.containsKey(sourceStoreName)) {
            throw new TopologyException("Source store " + sourceStoreName + " is already added.");
        }
        storeToChangelogTopic.put(sourceStoreName, topic);
        changelogTopicToStore.put(topic, sourceStoreName);
    }

ちなみにKTableのOptimizationでどう呼ばれているかというと、こんな風なコードになっています。

// TableSourceNode.java

    public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
        final String topicName = topicNames().iterator().next();

        // TODO: we assume source KTables can only be timestamped-key-value stores for now.
        // should be expanded for other types of stores as well.
        final StoreBuilder<TimestampedKeyValueStore<K, V>> storeBuilder =
            new TimestampedKeyValueStoreMaterializer<>((MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>>) materializedInternal).materialize();

        if (isGlobalKTable) {
            topologyBuilder.addGlobalStore(
                storeBuilder,
                sourceName,
                consumedInternal().timestampExtractor(),
                consumedInternal().keyDeserializer(),
                consumedInternal().valueDeserializer(),
                topicName,
                processorParameters.processorName(),
                (ProcessorSupplier<K, V, Void, Void>) processorParameters.processorSupplier()
            );
        } else {
            topologyBuilder.addSource(consumedInternal().offsetResetPolicy(),
                                      sourceName,
                                      consumedInternal().timestampExtractor(),
                                      consumedInternal().keyDeserializer(),
                                      consumedInternal().valueDeserializer(),
                                      topicName);

            topologyBuilder.addProcessor(processorParameters.processorName(), processorParameters.processorSupplier(), sourceName);

            // only add state store if the source KTable should be materialized
            final KTableSource<K, V> ktableSource = processorParameters.kTableSourceSupplier();
            if (ktableSource.materialized()) {
                topologyBuilder.addStateStore(storeBuilder, nodeName());

                if (shouldReuseSourceTopicForChangelog) {
                    storeBuilder.withLoggingDisabled();
                    topologyBuilder.connectSourceStoreAndTopic(storeBuilder.name(), topicName);
                }
            }
        }

    }

このメソッドによってStateStoreとソーストピックの関係が登録されることで、このソーストピックから直接StateStoreをレストアする動作が動く様になります。

このAPIは非公開になっていて通常はアクセスできないので、これをReflectionを使って無理やり引っ張り出します。

  @SneakyThrows
  private InternalTopologyBuilder getInternalTopologyBuilder(Topology topology) {
    Field internalTopologyBuilderField = Topology.class.getDeclaredField("internalTopologyBuilder");
    internalTopologyBuilderField.setAccessible(true);
    return (InternalTopologyBuilder) internalTopologyBuilderField.get(topology);
  }

  private void connectSourceStoreAndTopic(
      Topology topology, String storeName, String changelogTopicName) {
    InternalTopologyBuilder internalTopologyBuilder = getInternalTopologyBuilder(topology);

    internalTopologyBuilder.connectSourceStoreAndTopic(storeName, changelogTopicName);
  }

この様にTopologyクラスから非公開フィールドのinternalTopologyBuilderというフィールドを引っ張り出してきて無理矢理アクセスして、その中にあるInternalTopologyBuilderインスタンスを引っ張り出します。

InternalToplogyBuilderインスタンスさえ手に入れば、後は普通にpublicメソッドを呼ぶだけなので、これで対応可能です。

  connectSourceStoreAndTopic(
      topology,
      eventCounterStore.name(),
      "other-app-event-counter-store-changelog");

ただし、これだけではまだ再利用できません。これで動作するのはレストアだけなので、新しくレコードが到達した時にそのレコードをWindowStoreに書き込むことが出来ません。

そのため、上記に加えてProcessor APIを使って新しくレコードが来た時に正しくWindowStoreに書き込む処理を実装します。

import java.util.Objects;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.WindowStore;

@Slf4j
public class WindowStoreSource<K, V> implements ProcessorSupplier<Windowed<K>, V, Windowed<K>, V> {

  private final String queryableName;

  public WindowStoreSource(final String queryableName) {
    Objects.requireNonNull(queryableName, "storeName can't be null");

    this.queryableName = queryableName;
  }

  @Override
  public Processor<Windowed<K>, V, Windowed<K>, V> get() {
    return new WindowStoreSourceProcessor();
  }

  private class WindowStoreSourceProcessor implements Processor<Windowed<K>, V, Windowed<K>, V> {
    private WindowStore<K, V> store;

    @Override
    public void init(ProcessorContext<Windowed<K>, V> context) {
      store = context.getStateStore(queryableName);
    }

    @Override
    public void process(Record<Windowed<K>, V> record) {
      store.put(record.key().key(), record.value(), record.key().window().start());
    }
  }
}

基本的に来た側からデータをStateStoreに書いているだけです、レコードのTimeWindowの開始時点をちゃんとWindowStoreに書き込む時に引っ張ってきていることに注意してください。

最後にchangelogトピックのSerdeとWindowStoreBuilderを適切に設定して完了です。

  // changelog topicに書き込まれているTimeWindow情報付きのキーのデシリアライザ
  // このデシリアライザはドキュメントは無いものの、普通にアクセスできる。
  TimeWindowedDeserializer<String> windowedStringDeserializer =
      new TimeWindowedDeserializer<>(
          Serdes.String().deserializer(), Duration.ofHours(24).toMillis());
  windowedStringDeserializer.setIsChangelogTopic(true);

  final var EVENT_COUNTER_SOURCE = "EVENT_COUNTER_SOURCE";
  topology.addSource(
      EVENT_COUNTER_SOURCE,
      windowedStringDeserializer,
      new EventCounterDeserializer(),
      "other-app-event-counter-store-changelog");

  final var EVENT_COUNTER_PROCESSOR = "EVENT_COUNTER_SOURCE";
  topology.addProcessor(
      EVENT_COUNTER_PROCESSOR,
      new WindowStoreSource<String, EventCounter>(eventCounterStore.name()),
      EVENT_COUNTER_SOURCE);
  topology.connectProcessorAndStateStores(
      EVENT_COUNTER_PROCESSOR, eventCounterStore.name());

最後に

実際のところ、これは内部の実装が変わると容易にぶっ壊れる上に、完全にundocumentedなので、全くその実装が維持されることは保証されていません。 とはいえ、フレームワークがメジャーバージョンアップしたら色々直さなければいけないのは普通のことなので、Kafka Brokerを実際に立ててWindowStoreのレストアと書き込みに関するIntegration Testを実装し、CIを回しておけばまず気付けるのではないかと思います。

私の環境では、このハックによってかなりの量のデータやネットワーク負荷を削減することが出来たのでリスクと天秤にかけて利用する価値はあると判断していますが、正直万人にオススメはしないハックと言えます。

とはいえ、Kafka Streamsの内部実装を知る勉強になるかもしれないので、今回紹介させていただきました。

Kafka Streamsのlow level APIであるProcessor APIの活用例 (範囲JOINの実現)

この記事はKafka Advent Calendarの10日目の記事になります。

Kafka StreamsにはProcessor APIというlow levelなAPIがあります。

https://kafka.apache.org/10/documentation/streams/developer-guide/processor-api.html

https://docs.confluent.io/ja-jp/platform/6.0.1/streams/developer-guide/processor-api.html (confluentには日本語のドキュメントもあるのでサクっと読みたい人はこちら)

私がしばらく開発してきたKafka Streamsアプリケーションは開発の最初こそStream DSLを使って開発してましたが、今やほとんどProcessor APIしか使ってません。 実際のところ、これを使えば何でも出来るんですが、何でも出来るとなると逆に取っ掛かりが捕めないと思うのでユースケースを紹介していきたいと思います。

一番重要な点はStreams DSLの裏に隠れているStateStoreを直接使えるという点です。これにより通常のStream DSLでは実現できないがユースケースとして十分に必要性がありそうな事が実現可能になります。

1. 2つ以上の関連レコードを取得するJOIN

Stream DSLによって提供されるJOINはキーによる完全一致のみです。INNER JOIN, LEFT JOIN, OUTER JOINとありますが、キーによって一致する最新のレコードを取ってくることしかできません。

一方で、Processor APIを利用すると部分的にキーが一致するレコードを複数取得するJOINや、キー以外の値でJOINする事が可能になります。

これには、実際のところStateStoreが実際のところどういう物なのかということを理解しておく必要があります。(注: ここで解説する挙動は組込みで実装されているStateStoreに関する挙動であり、自分でカスタムなStateStoreを実装している場合はこの限りではありません)

StateStoreはKafka Streamのタスクごとに作成され、キーのバイト列でソートされたキーバリューストアとして表現されます。これはOn MemoryでもPersistent(backendはRocksDB)でも同じです。

そして、実はStateStoreは本当にただのキーバリューストアなので、保存されているキーとKafkaのパーティションとは全く関係がありません。

どういうことかというと、Kafkaがuser_idでパーティションされていて、それによってKafkaのパーティションIDが決まっていたとしても、StateStoreに書き込むキーはそれと関係なく、例えばgroup_idとかuserの行動ログのevent_id等を設定できるということです。

例えばuserがある行動を取る度に、userが複数登録しているWeb Hookを叩きたいとします。

この場合user_idをパーティションキーとして利用して、以下の様なイベントログを受けるevent_logsトピックがあると考えてみましょう。

{user_id: 1, event_id: 123, payload: {"key1": "value1", "key2": "value2"}}

そして、event_idが123に一致する時にあるURLにevent_logsのpayloadを渡してアクセスして欲しい、というWebHookを受け取るweb_hooksというtopicを考えてみます。データ構造は以下の通り。

{webhook_id: 1, user_id: 1, event_id: 123, url: "http://foo.example.com/api", http_method: "POST"}
{webhook_id: 1, user_id: 1, event_id: 123, url: "http://bar.example.com/api", http_method: "POST"}
{webhook_id: 1, user_id: 1, event_id: 123, url: "http://hoge.example.com/api", http_method: "POST"}

この例で分かる様に一人のユーザーは複数のWebHook設定を持っている可能性があります。一つのイベントが来た時にキーが完全一致する一件だけを取得するのでは必要な情報が足りません。

という訳でキーとパーティショニングを工夫します。Kafkaはキーとバリューでレコードを管理しますが、実際のところキーとパーティション割り当ては直接関係がありません。Producerのクライアントライブラリのデフォルトの挙動がそうなっているだけで、送信時にレコードのキーと異なる任意のキーでパーティションIDを割り当てることができます。

この例では、user_idでパーティショニングした上でレコードのキーを#{sprintf( '%020d', user_id)}:${webhook_id}の様な形にしてbrokerに送信します。

こうしておいた上でKafka StreamsアプリでKTableとして読み込みます。

StreamsBuilder builder = new StreamsBuilder();

KTable<String, WebHook> webHooks =
    builder.table(
        "web_hooks",
        Consumed.with(Serdes.String(), new WebHookSerde())
            .withName("Source-WebHook"),
        Materialized.<String, WebHook, KeyValueStore<Bytes, byte[]>>as(
                "web_hooks_store"));

// SerdeはJSONをオブジェクトとして読み込んでくれるものが定義されているものとします

この様にKTableとして読み込む様にすると、user_idでパーティショニングされたIDと関連するタスク上でStateStore上には#{sprintf(user_id, '%020d')}:${webhook_id}というキーでデータが書き込まれて保持されます。

そして、イベントログを受け取る側でProcessorAPIを利用して、KTableとして書き込んだStateStoreを参照します。

KStream<String, EventLog> eventLogs =
    builder.stream(
        "event_logs",
        Consumed.with(Serdes.String(), new EventLogSerde())
            .withName("Source-EventLog"));

eventLogs.process(() -> new EventLogProcessor(), Named.as("EventLogProcessor"), "web_hooks_store");

この様に引数で指定することでProcessorは名前が一致する定義済みのStateStoreを参照することができます。

Processorはこの様な実装になります。

public class EventLogProcessor implements Processor<String, EventLog, String, WebHookRequest> {
  private ProcessorContext<String, EventLog> context;
  private TimestampedKeyValueStore<String, EventLog> eventLogStore;

  @Override
  public void init(ProcessorContext<String, WebHookRequest> context) {
    context = context;
    eventLogStore = context.getStateStore("web_hooks_store");
  }

  @Override
  public void process(Record<String, EventLog> record) {
    var userId = record.value().getUserId();
    var keyPrefixStart = String.format("$020d", userId)
    var keyPrefixEnd = String.format("$020d", userId + 1);
    try (var it = eventLogStore.range(keyPrefixStart, keyPrefixEnd)) {
      while (it.hasNext()) {
        var keyValue = it.next();
        var valueAndTimestamp = keyValue.value;
        var value = valueAndTimestamp.value();
        if (record.value.getEventId() == value.getEventId()) {
          var webhookRequest = new WebHookRequest(userId, value.getWebHookId(), record.value().getPayload());
          context.forward(record.withValue(webhookRequest));
        }
      }
    }
  }
}

注目して欲しいのはcontext.getStateStore("web_hooks_store")var it = eventLogStore.range(keyPrefixStart, keyPrefixEnd)です。

initの中でprocessメソッドに引き渡したStateStoreを取得していますが、Processorの中で取得できるのは引数で渡された名前のStateStoreだけです。この様にしてinit内で取得したものをプライベート変数の中に保持しておきます。またレコードのフォワードに利用するのでProcessorContextも一緒に保持しておきます。

実際にレコードが到達した時に処理されるのがprocessメソッドです。

レコードが到達したらレコード情報からuser_idを取得しそれを元にキーのprefixを作ります。そしてそのキーのprefixを利用してStateStoreのrangeメソッドを使って範囲取得を実行します。StateStoreはキーのバイト列でソートされているので、キーの構成が適切なら特定のuser_idに一致する全てのレコードがStateStoreから取得できます。

後はイテレーターを回して、一つづつレコードを結合してforwardメソッドに渡すだけで、後続で任意の処理が実行できます。この例だと次のProcessorでWebHookの実際のリクエスト処理を実装することになるでしょう。

この様にすることで、KTableを利用して範囲JOINが実現可能です。

注意点と補足

Processor APIを使ってcontext.forwardしてしまうと、現時点では移行の処理はlow levelのProcessor APIでしか処理できません。後続の処理をDSLでやる様に戻すことは出来ないということです。

Processor APIを使って後続の処理を実装する場合は以下の様になります。

topology.addProcessor(
    "NextProcessor",
    () -> new NextProcessor(),
    "EventLogProcessor",
    "other_state_store_name");

こんな感じで一度StreamBuilderからbuildした後のTopologyに対してaddProcessorを読んで、親のノードの名前を指定することで任意のProcessorを後続に繋げることが出来ます。 この後続のProcessorはいくつも追加できる様になっていて、親になるProcessorcontext.forwardすると全ての子となるプロセッサで処理が実行されます。もし、特定のProcessorだけに処理を渡したい場合はProcessorに渡した名前(この例ではNextProcessor)を利用して特定の子Processorにだけ処理をルーティングする様にcontext.forwardの引数で調整することもできます。

また、補足ですが、context.fowardした後の後続の処理は全て同期的に連なった単なるメソッド呼び出しになっており、あるProcessor内で複数回context.fowardを呼ぶとその都度末端まで処理が同期的に実行されてから次のレコードに処理が移ります。

そして、あるレコードに起因する全ての処理が末端まで呼び出された後に、次のレコードが処理され、そしてバッファに存在している全てのレコードの処理が終わると、次のランループに入ってレコードをtopicからpollingする処理が実行されブローカーからレコードを取得します。

実際の所、Kafka StreamsはStream Threadごとに並列で動作しますが、そのStream Threadの中身は単純なwhile loopによる同期処理の連なりで実現されている、ということです。

まとめ

今回はKafka Streamsのlow level APIで実現する範囲JOINについて解説しました。 他にもProcessor APIを使った割と利用頻度の高そうなユースケースがいくつかあるのですが、今日は力尽きたのでここまでにさせていただきます。 また時間があれば、別のユースケースも紹介するかもしれません。その時はまた読んでやってください。