Kafka StreamsのWindowStoreのchangelogを再利用するためのダーティハック

この記事で紹介する内容はある意味で非常にリスキーであり、Kafka Streamsの内部実装に強く依存しています。 現状代替できる策が無いため、やむなく編み出した方法であることに注意してください。

Kafka StreamsのStateStoreとchangelogについて

Kafka Streamsではイベントの数をカウントするなどのStatefulな処理を実現するためにStateStoreというキーバリューストアを利用します。 もし処理ノードが壊れた時にも、キーバリューストアのデータが失われない様にするため、Kafka StreamsはStateStoreにデータを書き込むと同時にKafka Brokerに変更内容を書き込みデータの永続性を担保します。これをchangelogトピックと言います。

WindowStoreのchangelogの再利用

24時間のTimeWindowでユーザーのイベント毎の実行回数をカウントし、1ヶ月間(30日)保持しているとします。この場合、保持しておかなけれはいけないレコードの総数は最大で ユーザーの総数 * イベントの種類 * 30日分 になります。もしユーザーが1000万人、イベントが100種類あったとすると300億件のレコードを保持する必要があります。

そして、このイベントカウンターを他のKafka Streamsアプリケーションで利用したくなった時にどうするかを考えます。 (ワークロードが大きく異なりクラスタのキャパシティコントロールを考慮して別クラスタとして管理したい等の理由で)

この場合、現時点でこのWindowStoreを真っ当な手段で再利用する方法はありません。

安全な方法はイベントカウンターの情報を別のトピックに書き出して、それをKTableとして別アプリケーションから読み出す方法です。

しかし、この方法では単なるKeyValueStoreとしてしか読み出せなくなる上に、changelogトピックと別のトピックにデータを書き出す必要があってブローカーの負荷やストレージ消費が2倍になります。数千億件ぐらいの総数になってくると、気楽に倍のデータを保持する訳にはいかなくなってきます。

なので、どうにかしてWindowStoreのバックにあるchangelogを読み出してWindowStoreを復元できないものかと方法を探しました。

そして、Topology Optimizationで使われている内部APIを無理矢理引っ張り出すことで実現可能であることを発見しました。

InternalTopologyBuilder#connectSourceStoreAndTopicとWindowStore書き込みプロセッサの実装

Topology Optimizationを有効にすると、KTableとしてトピックのデータを読み込んだ時にchangelogトピックを新規に作成せずに、直接StateStoreとソーストピックを接続し、StateStoreにcheckpointが存在しない場合は、そのソーストピックからデータを取ってリカバリする様に動作します。

この処理をコントロールしているのが、InternalTopologyBuilder#connectSourceStoreAndTopicです。

    public void connectSourceStoreAndTopic(final String sourceStoreName,
                                           final String topic) {
        if (storeToChangelogTopic.containsKey(sourceStoreName)) {
            throw new TopologyException("Source store " + sourceStoreName + " is already added.");
        }
        storeToChangelogTopic.put(sourceStoreName, topic);
        changelogTopicToStore.put(topic, sourceStoreName);
    }

ちなみにKTableのOptimizationでどう呼ばれているかというと、こんな風なコードになっています。

// TableSourceNode.java

    public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
        final String topicName = topicNames().iterator().next();

        // TODO: we assume source KTables can only be timestamped-key-value stores for now.
        // should be expanded for other types of stores as well.
        final StoreBuilder<TimestampedKeyValueStore<K, V>> storeBuilder =
            new TimestampedKeyValueStoreMaterializer<>((MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>>) materializedInternal).materialize();

        if (isGlobalKTable) {
            topologyBuilder.addGlobalStore(
                storeBuilder,
                sourceName,
                consumedInternal().timestampExtractor(),
                consumedInternal().keyDeserializer(),
                consumedInternal().valueDeserializer(),
                topicName,
                processorParameters.processorName(),
                (ProcessorSupplier<K, V, Void, Void>) processorParameters.processorSupplier()
            );
        } else {
            topologyBuilder.addSource(consumedInternal().offsetResetPolicy(),
                                      sourceName,
                                      consumedInternal().timestampExtractor(),
                                      consumedInternal().keyDeserializer(),
                                      consumedInternal().valueDeserializer(),
                                      topicName);

            topologyBuilder.addProcessor(processorParameters.processorName(), processorParameters.processorSupplier(), sourceName);

            // only add state store if the source KTable should be materialized
            final KTableSource<K, V> ktableSource = processorParameters.kTableSourceSupplier();
            if (ktableSource.materialized()) {
                topologyBuilder.addStateStore(storeBuilder, nodeName());

                if (shouldReuseSourceTopicForChangelog) {
                    storeBuilder.withLoggingDisabled();
                    topologyBuilder.connectSourceStoreAndTopic(storeBuilder.name(), topicName);
                }
            }
        }

    }

このメソッドによってStateStoreとソーストピックの関係が登録されることで、このソーストピックから直接StateStoreをレストアする動作が動く様になります。

このAPIは非公開になっていて通常はアクセスできないので、これをReflectionを使って無理やり引っ張り出します。

  @SneakyThrows
  private InternalTopologyBuilder getInternalTopologyBuilder(Topology topology) {
    Field internalTopologyBuilderField = Topology.class.getDeclaredField("internalTopologyBuilder");
    internalTopologyBuilderField.setAccessible(true);
    return (InternalTopologyBuilder) internalTopologyBuilderField.get(topology);
  }

  private void connectSourceStoreAndTopic(
      Topology topology, String storeName, String changelogTopicName) {
    InternalTopologyBuilder internalTopologyBuilder = getInternalTopologyBuilder(topology);

    internalTopologyBuilder.connectSourceStoreAndTopic(storeName, changelogTopicName);
  }

この様にTopologyクラスから非公開フィールドのinternalTopologyBuilderというフィールドを引っ張り出してきて無理矢理アクセスして、その中にあるInternalTopologyBuilderインスタンスを引っ張り出します。

InternalToplogyBuilderインスタンスさえ手に入れば、後は普通にpublicメソッドを呼ぶだけなので、これで対応可能です。

  connectSourceStoreAndTopic(
      topology,
      eventCounterStore.name(),
      "other-app-event-counter-store-changelog");

ただし、これだけではまだ再利用できません。これで動作するのはレストアだけなので、新しくレコードが到達した時にそのレコードをWindowStoreに書き込むことが出来ません。

そのため、上記に加えてProcessor APIを使って新しくレコードが来た時に正しくWindowStoreに書き込む処理を実装します。

import java.util.Objects;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.WindowStore;

@Slf4j
public class WindowStoreSource<K, V> implements ProcessorSupplier<Windowed<K>, V, Windowed<K>, V> {

  private final String queryableName;

  public WindowStoreSource(final String queryableName) {
    Objects.requireNonNull(queryableName, "storeName can't be null");

    this.queryableName = queryableName;
  }

  @Override
  public Processor<Windowed<K>, V, Windowed<K>, V> get() {
    return new WindowStoreSourceProcessor();
  }

  private class WindowStoreSourceProcessor implements Processor<Windowed<K>, V, Windowed<K>, V> {
    private WindowStore<K, V> store;

    @Override
    public void init(ProcessorContext<Windowed<K>, V> context) {
      store = context.getStateStore(queryableName);
    }

    @Override
    public void process(Record<Windowed<K>, V> record) {
      store.put(record.key().key(), record.value(), record.key().window().start());
    }
  }
}

基本的に来た側からデータをStateStoreに書いているだけです、レコードのTimeWindowの開始時点をちゃんとWindowStoreに書き込む時に引っ張ってきていることに注意してください。

最後にchangelogトピックのSerdeとWindowStoreBuilderを適切に設定して完了です。

  // changelog topicに書き込まれているTimeWindow情報付きのキーのデシリアライザ
  // このデシリアライザはドキュメントは無いものの、普通にアクセスできる。
  TimeWindowedDeserializer<String> windowedStringDeserializer =
      new TimeWindowedDeserializer<>(
          Serdes.String().deserializer(), Duration.ofHours(24).toMillis());
  windowedStringDeserializer.setIsChangelogTopic(true);

  final var EVENT_COUNTER_SOURCE = "EVENT_COUNTER_SOURCE";
  topology.addSource(
      EVENT_COUNTER_SOURCE,
      windowedStringDeserializer,
      new EventCounterDeserializer(),
      "other-app-event-counter-store-changelog");

  final var EVENT_COUNTER_PROCESSOR = "EVENT_COUNTER_SOURCE";
  topology.addProcessor(
      EVENT_COUNTER_PROCESSOR,
      new WindowStoreSource<String, EventCounter>(eventCounterStore.name()),
      EVENT_COUNTER_SOURCE);
  topology.connectProcessorAndStateStores(
      EVENT_COUNTER_PROCESSOR, eventCounterStore.name());

最後に

実際のところ、これは内部の実装が変わると容易にぶっ壊れる上に、完全にundocumentedなので、全くその実装が維持されることは保証されていません。 とはいえ、フレームワークがメジャーバージョンアップしたら色々直さなければいけないのは普通のことなので、Kafka Brokerを実際に立ててWindowStoreのレストアと書き込みに関するIntegration Testを実装し、CIを回しておけばまず気付けるのではないかと思います。

私の環境では、このハックによってかなりの量のデータやネットワーク負荷を削減することが出来たのでリスクと天秤にかけて利用する価値はあると判断していますが、正直万人にオススメはしないハックと言えます。

とはいえ、Kafka Streamsの内部実装を知る勉強になるかもしれないので、今回紹介させていただきました。