Kafka Streamを使ったストリーム処理の概要と運用時の考慮点

最近、仕事で分散ストリーム処理に手を出していて、その基盤としてApache KafkaとKafka Streamsを使うことにしたので、動作概要とストリーム処理のイメージについてまとめておく。

kafkaそのものについては今更説明の必要は無いだろうと思う。

Kafka Streamsはkafkaを基盤にして分散ストリーム処理を簡単に書くためのDSLライブラリ。

https://kafka.apache.org/documentation/streams/

延々流れてくるデータを変換して別のtopicに流したり、時間のウインドウを区切ってカウントした結果を流したり、みたいなのがサクっと書ける。 Apache Flinkなんかと似た様なことができる。

Kafka Streamsが良いのは以下の点。

  • ただのConsumer/Producerのラッパーなのでfat-jarファイル一つで簡単に動かせる
  • 可用性やデータの分散をkafkaのconsumer groupとtopicの仕組みに載せているので別途ミドルウェアを必要としない

なので、kafkaが動作している環境ならJavaCLIツールを書く様な感じで分散ストリーム処理が実装できる。

簡単な例だとこんな感じになる。

public class EventCountStream {
    static Topology buildStream(Properties props) {
                StreamsBuilder builder = new StreamsBuilder();
                builder.stream("input-event-topic", Consumed.with(Serdes.String(), Serdes.Integer());
                builder.groupByKey().count();
                builder.to("counted");
                return builder.build(props);
    }

    public static void main(String[] args) {
        String bootstrapServers = envs.getOrDefault("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092");

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "repro-event-count-stream");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "1");
        props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, "3");
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "10000");

        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "50000");
        props.put(StreamsConfig.mainConsumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "500");
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "2500000");

        Topology topology = buildStream(props);

        System.out.println(topology.describe());

        final KafkaStreams streams = new KafkaStreams(topology, props);
        final CountDownLatch latch = new CountDownLatch(1);

        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.out.println("graceful stop");
            streams.close(Duration.ofSeconds(60));
            System.out.println("stream stopped");
            latch.countDown();
        }, "shutdown"));

        try {
            streams.start();
            latch.await();
        }
        catch (Throwable e) {
            e.printStackTrace();
            System.exit(1);
        }

        System.exit(0);
    }
}

Kafka Streamsのコンセプトモデルと実行モデル

ここでは、今、現時点で自分が理解しているKafka Streamsの動作について解説する。

大まかな概念としては、https://kafka.apache.org/23/documentation/streams/core-conceptsに書かれているが、StreamBuilderクラスに生えているDSLメソッドを使ってノードを作成し、それをグラフ上に繋げていくことでストリーム処理のパイプラインを構築することになる。

例えば、あるトピックからデータを取得するSourceノード、値を変換するProcessorノード、グルーピングするProcessorノード、aggregationを行うProcessorノード、別のトピックに書き出すSinkノード等を繋げて処理を構築することになる。

実際の実行モデルについてはhttps://kafka.apache.org/23/documentation/streams/architectureに記述がある。

Kafkaはトピックのパーティション毎にどのConsumerが処理を担当するかを固定で割り当てて、処理対象が重複するのを防ぐ機能があり、それをConsumer Groupと呼ぶ。 slideshareで見つけた資料から引用させてもらうとこんな感じ。

Kafka Streamsの基本的な実行モデルでは、Consumer Groupの各パーティション割り当て毎にタスクという実行単位が作られる。タスクは一連のストリーム処理を実行する。 (実際には全てのパーティションからデータを取得するソースノードや、処理ノードのグラフ構造が複数のトポロジに分割されるケースもあり、1:1で対応している訳ではない) また、Kafka Streamsは一ノードにつき任意のスレッド数で実行できて、そのスレッドをStream Threadと呼ぶ。 各タスクはStream Threadの中で起動し、一つのスレッドの中に複数のタスクを持つことができる。

f:id:joker1007:20190828183036p:plain

つまり、ソーストピックのパーティションの数だけ実行タスクが生成されるが、それぞれのタスクの実行主体はKafka Streamsワーカーのノード数 x スレッド数に分散して割り当てられる。 その割り当てをKafkaのConsumer Groupを使って管理している。 注意点として、一つのストリームスレッドで複数のタスク(つまり複数のパーティション)を処理できるが、スレッドが共有だと一つのパーティションのデータを処理している間は他のパーティションのデータ処理は待たされることになる。

コードレベルでの実行イメージ

Kafka Streamsはラムダで一連の処理が繋がっているかの様にコードを書くことができるが、実際に一つ一つのラムダ式として記述された内容は常に同一のスレッドや同一のノードで実行される訳ではない。 例えば、レコードのキーを変更する操作を行うとrepartitionが発生し、一旦kafkaの中間トピックを介して別タスクに処理が移ってから続きの処理が実行されることになる。その別タスクは完全に別ノードで実行されるかもしれないし、別のスレッドで実行されるかもしれない。 なので、各ラムダに書いたものは完全に独立した処理としてコードを書かなければならない。スコープ外から持ち込めるのはstaticで各ノードで共通であることが分かっているものぐらいしか使ってはいけない。

また、ストリーム処理を実装する上では、あるパーティションのロカリティを考慮した実装することが重要になる。 kafkaはデフォルトのパーティショナーにmurmur2 hashを利用しており、同一のキー, 同一のパーティション数であれば同じパーティション番号にデータが配置される。 そして、同じトポロジ内の同一パーティション番号は同じStream Threadに処理が割り当てられる。 それによって、あるキーでグルーピングした処理は同一のStream Threadで処理されることが分かるので、集計処理を行うことが可能になっている。 こういう仕組みになっているため、Stream DSLより低レイヤのAPIを利用してマニュアルで状態管理をしつつ集計する時には、その処理がどういうキーでパーティションされていてどういう単位でState Store(ワーカー内部に持っている状態管理用のKey Value Store)を持っているかを考えながら実装する必要がある。

KStreamとKTable

Stream DSLで構築したパイプラインは大きく分けてKStreamとKTableという二種類のモデルで表現される。 公式リファレンスによると、KStreamはレコードストリームの抽象、KTableは変更ログストリームの抽象と説明されている。もうちょっと詳しく説明するとそれぞれ以下の様な感じで動く。

  • KStreamは一つ一つのレコードがデータとして完結しており、全てのレコード一つ一つをそのままストリームデータとして扱う。
  • KTableは一つのレコードは変更履歴情報であり、ストリーム処理で流れるデータは変更履歴を集約した最新の状態を表すレコードになる。 テーブル上に存在する更新可能なレコードの様に扱うことができるのでKTableというのだと思う。

Kafkaはストリームバッファなので、KTableの様なデータを扱うためにはストリームの開始時点から現在までのデータを全て読み出して再生しないと最新の状態を復元できない。 しかし、毎回そんなことをやってられないので、Kafka Streamsでは起動時にレストア処理が走って、ソーストピックからデータを取得して各ワーカーのローカルにあるState Storeという領域に最新の情報を保存する形になっている。 State StoreはデフォルトでPersistent State StoreとIn Memory State Storeが存在する。Persistent State StoreはRocksDBという埋め込み型のデータベースをKVSとして利用してデータを保存している。

Kafka Streamsではグルーピングして集計するという様な状態を保持したストリーム処理はKTableとして表現される。そういった処理には状態復元のデータソースになる中間トピックと各ワーカーにState Storeが必要になる。 当然両方ともストレージを消費するので、KTableとして扱うデータが多ければ多い程各ワーカーのローカルストレージ領域が必要になる。

KTableデータの保持期間

ストリーム処理でデータを集計する際には、タイムウインドウを区切って集計するのが一般的。 もし、タイムウインドウを1時間で集計しているとして、そんなデータを何週間も保持しなくて良いことの方が多いだろう。 そういう処理で無駄なデータを保持し続けない様にするために、集計処理毎にデータ保持期間を設定できる様になっている。

運用時の考慮点

Kafka Streamsは状態を保持する必要がある機能はKafka本体にデータを保持する仕組みになっているので、基本的にワーカーを増やすだけで処理をスケールアウトさせることができる。 しかし、前述した様なState Storeのレストア処理やConsumer Groupへの参加申請に伴うrebalance処理にそれなりに時間を要する。

そのため、頻繁にワーカーが出入りする様なオートスケールを使ったコスト節約方法とはかなり相性が悪い。 ちなみにrebalanceにかかる時間に対する改善策は開発コミュニティでも議論されていて、今後は高速化されるかもしれない。

KIP-345: Introduce static membership protocol to reduce consumer rebalances - Apache Kafka - Apache Software Foundation

そういう訳なので、レストアスループットを向上させるためのチューニングや頻繁にノード数を増減させなくていい様なスケーリングが必要になる。 また、一定以上接続が無いとConsumer Groupから離脱してしまうため、いくつか安定稼動のために重要なconfigがあったりする。

  • max.poll.interval.ms この設定値を越えてpollingが行われないとConsumer Groupから離脱する。通信状況や負荷などによって処理が詰まった場合、復活したり離脱したりを繰り返して延々処理が進まなくなる可能性があるので、ある程度余裕を持った値に設定しておいた方が良さそう。
  • session.timeout.ms この設定値より長い期間heartbeatがbrokerに届かなかった場合、Consumer Groupから離脱する。これも上記と同様の理由である程度余裕を持たせた方が良いだろう。
  • max.poll.records 一回のpollingで取得するレコード数。一回pollingしてからそのレコードバッチに対して処理が行われるため、スループット効率を狙い過ぎると、max.poll.interval.msを越えてしまってConsumer Groupから変に離脱してしまう可能性がある。
  • max.partition.fetch.bytes あるパーティションから一度に取得できるデータ量。レストア処理のスループットに影響する。
  • receive.buffer.bytes TCPのRCVBUFの値を設定する。デフォルトが65535と現在のネットワーク構成からすると少ない値になっているので、設定しなおした方が良い。

また、当たり前の話だが、そもそもデータ量を減らすのは大事なので、バイナリシリアライズフォーマットを採用したりしてトラフィックを削減するのも重要になる。

終わりに

まだ本格的に運用しているという段階ではないので、他にも考慮点やチューニングポイントが出てくるだろうと思う。 特にConsumer Groupのrebalanceとレストア処理のリードタイムについては多少不安が残っている。

そうは言っても、必要な機能は十分に揃っている印象で、追加のミドルウェア無しで実用的な分散ストリーム処理が書けるのは非常にありがたい。 もうしばらく調査と実験を続行して本格運用に乗せていく予定。