最近のjoker1007

Rubyist近況[1] Advent Calendar 17日目です。

仕事的な近況

最近、仕事で全然Rubyを書かなくなってきた。たまにRailsアプリの改修作業をやる程度で、今年書いた量で言うなら間違いなくJavaが一番多いだろう。

直近で書いたブログ記事を参照してもらえると分かり易いが、ここ1年ぐらいの自分の主戦場はKafkaとストリームプロセッサである。

Kafkaは非常に便利なミドルウェアだがメッセージキューの様でメッセージキューでなく、分散ストレージとして動作するがブローカー自体は余り分散の仕組みをコントロールする訳でもなく、クライアントライブラリ側で色々制御する様な仕組みになっているので、どういう活用の仕方をするものなのかイマイチ理解しづらい難しいミドルウェアでもある。

自分は完全に0から考えて必要だと思って調査してほとんどの仕組みを勝手に作ってしまったので性質も活用方法も分かっているが、そんなにすぐに各メンバーにそういった知識が伝えられる訳では無いので、Kafkaをゴリゴリ使う機能は必然的に自分が主導で書くことになる。

段階的に知識を移譲している最中だが、当分の間は作って説明して資料を作ってというのを繰り返すことになるだろう。

こんな感じで、ひたすらKafka周りの整備や作ったストリーム処理アプリケーションのパフォーマンスチューニングや新機能の実装をやっているとJavaばかり書くことになってしまった。今は秒間10万件のレコードを速攻で処理するストリーム処理基盤にガシガシ機能を追加している。

実際、Javaを結構書く様になって思ったが、最近のJavaには余りストレスを感じない。まあinterfaceの扱いとか中途半端なOptionalとかコンストラクタの制約とかlombok頼りな現状とか色々どうにかならんのかと思うことはあるが、長年の積み重ねで磨き抜かれたInteliJが余りに便利なので、開発体験自体はかなり良い。

(正直、そびえ立つまでに大きくなったRailsアプリケーションを改修するより遥かに楽とさえ言える……。やっぱ型検査とそれを利用した高度なLanguage Serverとか欲しくなるよね。)

そして、Rubyを全然書いていないのでgemのメンテやら何やらも割と放置気味で、3.1が出ようとしているこの時期になっても、全然新機能で遊んでいる余裕もない。Rubyistとしては恥ずかしいばかりである。仕事でMPを使い切っているのも余り良い状態ではない。もっとゆとりがあった方が良いのは間違いないだろう。

一方で、Kafkaについては多分かなり詳しくなったので、無茶苦茶高度な質問でなければ答えられるぐらいには理解したと思う。もしKafkaに興味がある人が居たらどこかで情報交換できると嬉しい。

ちなみに、何故Kafkaを使ってるのか、何がそんなに大事なのかという話については、後日改めてブログに書こうかなと思っているので、興味がある方は是非読んでやってください。

個人的な近況

今年は年の始めにzen3アーキのRyzenRadeon RX 6900 XTを使って新しいPCを組んだので、PC環境は今のところかなり快適な状態。

joker1007.hatenablog.com

OSは昔から愛用しているGentoo Linuxを使っている。Linuxにおけるノイキャンの方法とかビデオ会議やらなんやらの環境構築については、以下の様な記事を書いたので、もしLinuxに乗り換えようか考えてる人が居たら、参考になるかもしれません。

joker1007.hatenablog.com

別にやんちゃクラブに触発されたという程ではないが、SteamのおかげでLinuxでも結構PCゲームは出来る様になっており、Gentoo LinuxでもPCゲームはちゃんと遊べるぞ、ということを主張するためにゲームプレイを配信していこうかと思って、最近配信試験をしていたりとかする。とりあえず今はFF4のピクセルリマスターのプレイをYouTubeに記録している。今はちょうどラストダンジョン直前まで来てます。

www.youtube.com

他にHorizon Zero Dawnにシェーダー噛ませてRay Tracingを使える様にした比較動画とかを作ったりしてます。

www.youtube.com

Linuxでこの辺りのことをやる方法についても、暇を見つけて解説記事書いてみようかと思っているので、こちらも興味がある方はお楽しみに。

今年買ったものとか

今年買った大きい買い物は基本的にデスク周りのものが多い。まあ基本引き篭りやしね……。

  • FlexiSpotの電動昇降机
  • 3万円ぐらいのデスク用のサウンドバー (暫定的に買ったものなので、もうちょい良いものが欲しい。HDMI 2.1に対応していて4k 120fps出るやつとか無いだろうか)
  • 10Gbps対応のtp-linkのスイッチングハブ (しかし10GbpsのNICは無い……)
  • Sony WF-1000XM4 (無くしそうと思ってたが、家の鍵と同じ固定のポケットに入れられるので割と大丈夫かもしれない)
  • iPhone13 (5年ぶりの新スマホ)

WF-1000XM4は思ってたより買って良かった。うちのGentoo Linuxと96khzで接続できたし、ちょうど緊急事態宣言も明けて神田ぐらいまで飲みに行く機会も増えた時期に買ったので、利用頻度も結構高い。最初ちょっと音が籠ってるかなと思ったが慣れてきたら十分に良い音だと思う様になってきたし、これはオススメの品だと思う。

生命的な近況

いよいよ30代も大分後半になってきて、ぶっちゃけ体のあちこちにガタが来ている実感がある。そもそも不摂生の塊みたいな人間なので生活を改めろよ、という話なのだが、面倒なので何も対処しないまま何となく生きている。

正直、後10年ぐらいしたら死ぬのでは……ぐらいの感じではあるのだが、万が一半端に長生きしてしまった時のために、そろそろ金をまともに溜める手段を考えないと、このままでは老人になった時に詰んでしまうので、その辺りも気にする様になってきた。

とりあえず、来年の目標としていい加減にIDECOとNISAぐらいちゃんとやるかと考えている。

しかし、自分は家庭の都合という奴でどうにもならない出費が物凄く多いため、どうやって安定して運用資金を捻出するか非常に難しい。実際、毎月の収支は現状トントンか弱赤字である。収入が途絶えたら即座に死ぬ。(皆どうやって金溜めてんのかなと不思議である)

まあ、今のところは働けなくなる程のダメージは負ってないので、しばらくは死なないと思う。もし死にそうになったらTwitterで死ぬ手前ぐらいまでは実況していきたいと思っている。

まとめ

という訳で、何とか生きてます。2022年もよろしくお願いします。来年はもう少し友人に会って「うぇーい」ってビールジョッキを酌み交わせる日が戻ってくると良いなあ。

Kafka StreamsのStateStoreのちょっと変わった使い方 (ローカルキャッシュ、 コンフィグストア)

この記事は Kafka Advent Calendarの14日目の記事です。(1日遅れてるけど)

今回は割とライトな記事です。

ローカルキャッシュとしてのStateStore

Kafka StreamsにおいてStateStoreは、トピックから入力されて何らかの状態を保持した処理をしたい場合にその状態を永続化するために利用するのが基本的な形です。

一方でProcessor APIを活用しだすと、別にトピックから入力されてKafka Streamsで処理したものだけが保持対象じゃないことに気付きます。

StateStoreは基本的にはchangelogと紐付いているので、大規模な集計処理等を行ってデータ量が膨れ上がると、もしノードのストレージが無くなった時やrebalanceが走った時に、レストアに非常に長い時間が必要になります。これは現時点でちょっとやそっとでは解消しようがありません。

それを避けるために、通信という大きなオーバーヘッドを抱えても外部のデータソース(RedisやCassandraなど)からデータを取得したくなることがあります。

RedisやCassandraはかなり読み込みが高速なデータストアですが、Kafka Streamsの様なストリームプロセッサで頻繁に通信を行うと流石にローカルのStateStoreと比べて劇的に性能が落ちます。

そこで、withLoggingDisabled()をセットしたWindowStoreをキャッシュに使います。

changelogと紐付いていなければレストア処理が発生しないので、Processor APIの中でWindowStoreにデータが見つからなかった時だけデータを取りにいけば、処理内容によってはかなりの割合で外部との通信処理を削減することができます。

ここでWindowStoreを利用するのは現在KeyValueStoreでは自動的にデータをexpireする方法が無いからです。WindowStoreならretention timeを調整することで一定期間過ぎたStateStoreをローカルストレージから自動で削除できます。(RocksDB自体にはTTLを設定する方法があるがKafka Streamsからはそれを変更できない実装になっている)

実際MemoryStoreには、lruMapというストアが用意されており、オンメモリに乗る範囲であればこれを利用するとローカルなキャッシュがシンプルに実現できます。ただこれはメモリに乗る範囲でしか使えないことと、プロセスを再起動したら消滅してしまうことが難点です。

KeyValueStoreでTTLが使える様になると嬉しいんですが、今のところissueがあるのみ、といった感じですね。(https://issues.apache.org/jira/browse/KAFKA-4212)

コンフィグストアとしてのGlobalKTable

GlobalKTable(GlobalStateStore)は、全処理ノードに分配されるキーバリューストアとして利用できます。つまりデータフォーマットさえ決めておけばKafkaのトピックを経由して、任意の設定値を起動中の全ノードに配布したり削除したり出来るということです。

例えば、Kafka Streamsに限らずストリームプロセッサは処理が複雑になってくると結構デバッグするのが大変です。そういった時にユーザーの処理の流れを事細かに追いたいケースがあります。

デバッグモードを作り込むことは出来ますが、全ユーザーに対してそれを動かすと負荷がとんでもないことになって処理が詰まってしまったり、ログが膨大になったりする可能性が高いでしょう。

なので、対象のユーザーを絞ったりサンプリングして取得する様にしておきたくなります。

こういったケースでGlobalKTableに対象のユーザーの識別子やサンプリングの割合などの設定値を保持可能にしておきます。開発者はCLIのKafka Producerを使ってGlobalKTableになっているtopicにJSONを送り込めば、全ノードに対して起動したまま対象ユーザーの設定を変更することができます。必要無くなればnull値を送って設定を消せば元通りに出来ます。

こういった仕組みを管理機構として組み込んでおけば、いざという時に情報収集がしやすくなります。

この様にGlobalKTableはStreamからのJoin対象としてだけ利用するのではなく、外部から全ノードに設定値を即座に転送しストリームプロセッサ内で利用する仕組みにも応用できる、という話でした。

Kafka StreamsのWindowStoreのchangelogを再利用するためのダーティハック

この記事で紹介する内容はある意味で非常にリスキーであり、Kafka Streamsの内部実装に強く依存しています。 現状代替できる策が無いため、やむなく編み出した方法であることに注意してください。

Kafka StreamsのStateStoreとchangelogについて

Kafka Streamsではイベントの数をカウントするなどのStatefulな処理を実現するためにStateStoreというキーバリューストアを利用します。 もし処理ノードが壊れた時にも、キーバリューストアのデータが失われない様にするため、Kafka StreamsはStateStoreにデータを書き込むと同時にKafka Brokerに変更内容を書き込みデータの永続性を担保します。これをchangelogトピックと言います。

WindowStoreのchangelogの再利用

24時間のTimeWindowでユーザーのイベント毎の実行回数をカウントし、1ヶ月間(30日)保持しているとします。この場合、保持しておかなけれはいけないレコードの総数は最大で ユーザーの総数 * イベントの種類 * 30日分 になります。もしユーザーが1000万人、イベントが100種類あったとすると300億件のレコードを保持する必要があります。

そして、このイベントカウンターを他のKafka Streamsアプリケーションで利用したくなった時にどうするかを考えます。 (ワークロードが大きく異なりクラスタのキャパシティコントロールを考慮して別クラスタとして管理したい等の理由で)

この場合、現時点でこのWindowStoreを真っ当な手段で再利用する方法はありません。

安全な方法はイベントカウンターの情報を別のトピックに書き出して、それをKTableとして別アプリケーションから読み出す方法です。

しかし、この方法では単なるKeyValueStoreとしてしか読み出せなくなる上に、changelogトピックと別のトピックにデータを書き出す必要があってブローカーの負荷やストレージ消費が2倍になります。数千億件ぐらいの総数になってくると、気楽に倍のデータを保持する訳にはいかなくなってきます。

なので、どうにかしてWindowStoreのバックにあるchangelogを読み出してWindowStoreを復元できないものかと方法を探しました。

そして、Topology Optimizationで使われている内部APIを無理矢理引っ張り出すことで実現可能であることを発見しました。

InternalTopologyBuilder#connectSourceStoreAndTopicとWindowStore書き込みプロセッサの実装

Topology Optimizationを有効にすると、KTableとしてトピックのデータを読み込んだ時にchangelogトピックを新規に作成せずに、直接StateStoreとソーストピックを接続し、StateStoreにcheckpointが存在しない場合は、そのソーストピックからデータを取ってリカバリする様に動作します。

この処理をコントロールしているのが、InternalTopologyBuilder#connectSourceStoreAndTopicです。

    public void connectSourceStoreAndTopic(final String sourceStoreName,
                                           final String topic) {
        if (storeToChangelogTopic.containsKey(sourceStoreName)) {
            throw new TopologyException("Source store " + sourceStoreName + " is already added.");
        }
        storeToChangelogTopic.put(sourceStoreName, topic);
        changelogTopicToStore.put(topic, sourceStoreName);
    }

ちなみにKTableのOptimizationでどう呼ばれているかというと、こんな風なコードになっています。

// TableSourceNode.java

    public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
        final String topicName = topicNames().iterator().next();

        // TODO: we assume source KTables can only be timestamped-key-value stores for now.
        // should be expanded for other types of stores as well.
        final StoreBuilder<TimestampedKeyValueStore<K, V>> storeBuilder =
            new TimestampedKeyValueStoreMaterializer<>((MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>>) materializedInternal).materialize();

        if (isGlobalKTable) {
            topologyBuilder.addGlobalStore(
                storeBuilder,
                sourceName,
                consumedInternal().timestampExtractor(),
                consumedInternal().keyDeserializer(),
                consumedInternal().valueDeserializer(),
                topicName,
                processorParameters.processorName(),
                (ProcessorSupplier<K, V, Void, Void>) processorParameters.processorSupplier()
            );
        } else {
            topologyBuilder.addSource(consumedInternal().offsetResetPolicy(),
                                      sourceName,
                                      consumedInternal().timestampExtractor(),
                                      consumedInternal().keyDeserializer(),
                                      consumedInternal().valueDeserializer(),
                                      topicName);

            topologyBuilder.addProcessor(processorParameters.processorName(), processorParameters.processorSupplier(), sourceName);

            // only add state store if the source KTable should be materialized
            final KTableSource<K, V> ktableSource = processorParameters.kTableSourceSupplier();
            if (ktableSource.materialized()) {
                topologyBuilder.addStateStore(storeBuilder, nodeName());

                if (shouldReuseSourceTopicForChangelog) {
                    storeBuilder.withLoggingDisabled();
                    topologyBuilder.connectSourceStoreAndTopic(storeBuilder.name(), topicName);
                }
            }
        }

    }

このメソッドによってStateStoreとソーストピックの関係が登録されることで、このソーストピックから直接StateStoreをレストアする動作が動く様になります。

このAPIは非公開になっていて通常はアクセスできないので、これをReflectionを使って無理やり引っ張り出します。

  @SneakyThrows
  private InternalTopologyBuilder getInternalTopologyBuilder(Topology topology) {
    Field internalTopologyBuilderField = Topology.class.getDeclaredField("internalTopologyBuilder");
    internalTopologyBuilderField.setAccessible(true);
    return (InternalTopologyBuilder) internalTopologyBuilderField.get(topology);
  }

  private void connectSourceStoreAndTopic(
      Topology topology, String storeName, String changelogTopicName) {
    InternalTopologyBuilder internalTopologyBuilder = getInternalTopologyBuilder(topology);

    internalTopologyBuilder.connectSourceStoreAndTopic(storeName, changelogTopicName);
  }

この様にTopologyクラスから非公開フィールドのinternalTopologyBuilderというフィールドを引っ張り出してきて無理矢理アクセスして、その中にあるInternalTopologyBuilderインスタンスを引っ張り出します。

InternalToplogyBuilderインスタンスさえ手に入れば、後は普通にpublicメソッドを呼ぶだけなので、これで対応可能です。

  connectSourceStoreAndTopic(
      topology,
      eventCounterStore.name(),
      "other-app-event-counter-store-changelog");

ただし、これだけではまだ再利用できません。これで動作するのはレストアだけなので、新しくレコードが到達した時にそのレコードをWindowStoreに書き込むことが出来ません。

そのため、上記に加えてProcessor APIを使って新しくレコードが来た時に正しくWindowStoreに書き込む処理を実装します。

import java.util.Objects;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.WindowStore;

@Slf4j
public class WindowStoreSource<K, V> implements ProcessorSupplier<Windowed<K>, V, Windowed<K>, V> {

  private final String queryableName;

  public WindowStoreSource(final String queryableName) {
    Objects.requireNonNull(queryableName, "storeName can't be null");

    this.queryableName = queryableName;
  }

  @Override
  public Processor<Windowed<K>, V, Windowed<K>, V> get() {
    return new WindowStoreSourceProcessor();
  }

  private class WindowStoreSourceProcessor implements Processor<Windowed<K>, V, Windowed<K>, V> {
    private WindowStore<K, V> store;

    @Override
    public void init(ProcessorContext<Windowed<K>, V> context) {
      store = context.getStateStore(queryableName);
    }

    @Override
    public void process(Record<Windowed<K>, V> record) {
      store.put(record.key().key(), record.value(), record.key().window().start());
    }
  }
}

基本的に来た側からデータをStateStoreに書いているだけです、レコードのTimeWindowの開始時点をちゃんとWindowStoreに書き込む時に引っ張ってきていることに注意してください。

最後にchangelogトピックのSerdeとWindowStoreBuilderを適切に設定して完了です。

  // changelog topicに書き込まれているTimeWindow情報付きのキーのデシリアライザ
  // このデシリアライザはドキュメントは無いものの、普通にアクセスできる。
  TimeWindowedDeserializer<String> windowedStringDeserializer =
      new TimeWindowedDeserializer<>(
          Serdes.String().deserializer(), Duration.ofHours(24).toMillis());
  windowedStringDeserializer.setIsChangelogTopic(true);

  final var EVENT_COUNTER_SOURCE = "EVENT_COUNTER_SOURCE";
  topology.addSource(
      EVENT_COUNTER_SOURCE,
      windowedStringDeserializer,
      new EventCounterDeserializer(),
      "other-app-event-counter-store-changelog");

  final var EVENT_COUNTER_PROCESSOR = "EVENT_COUNTER_SOURCE";
  topology.addProcessor(
      EVENT_COUNTER_PROCESSOR,
      new WindowStoreSource<String, EventCounter>(eventCounterStore.name()),
      EVENT_COUNTER_SOURCE);
  topology.connectProcessorAndStateStores(
      EVENT_COUNTER_PROCESSOR, eventCounterStore.name());

最後に

実際のところ、これは内部の実装が変わると容易にぶっ壊れる上に、完全にundocumentedなので、全くその実装が維持されることは保証されていません。 とはいえ、フレームワークがメジャーバージョンアップしたら色々直さなければいけないのは普通のことなので、Kafka Brokerを実際に立ててWindowStoreのレストアと書き込みに関するIntegration Testを実装し、CIを回しておけばまず気付けるのではないかと思います。

私の環境では、このハックによってかなりの量のデータやネットワーク負荷を削減することが出来たのでリスクと天秤にかけて利用する価値はあると判断していますが、正直万人にオススメはしないハックと言えます。

とはいえ、Kafka Streamsの内部実装を知る勉強になるかもしれないので、今回紹介させていただきました。

Kafka Streamsのlow level APIであるProcessor APIの活用例 (範囲JOINの実現)

この記事はKafka Advent Calendarの10日目の記事になります。

Kafka StreamsにはProcessor APIというlow levelなAPIがあります。

https://kafka.apache.org/10/documentation/streams/developer-guide/processor-api.html

https://docs.confluent.io/ja-jp/platform/6.0.1/streams/developer-guide/processor-api.html (confluentには日本語のドキュメントもあるのでサクっと読みたい人はこちら)

私がしばらく開発してきたKafka Streamsアプリケーションは開発の最初こそStream DSLを使って開発してましたが、今やほとんどProcessor APIしか使ってません。 実際のところ、これを使えば何でも出来るんですが、何でも出来るとなると逆に取っ掛かりが捕めないと思うのでユースケースを紹介していきたいと思います。

一番重要な点はStreams DSLの裏に隠れているStateStoreを直接使えるという点です。これにより通常のStream DSLでは実現できないがユースケースとして十分に必要性がありそうな事が実現可能になります。

1. 2つ以上の関連レコードを取得するJOIN

Stream DSLによって提供されるJOINはキーによる完全一致のみです。INNER JOIN, LEFT JOIN, OUTER JOINとありますが、キーによって一致する最新のレコードを取ってくることしかできません。

一方で、Processor APIを利用すると部分的にキーが一致するレコードを複数取得するJOINや、キー以外の値でJOINする事が可能になります。

これには、実際のところStateStoreが実際のところどういう物なのかということを理解しておく必要があります。(注: ここで解説する挙動は組込みで実装されているStateStoreに関する挙動であり、自分でカスタムなStateStoreを実装している場合はこの限りではありません)

StateStoreはKafka Streamのタスクごとに作成され、キーのバイト列でソートされたキーバリューストアとして表現されます。これはOn MemoryでもPersistent(backendはRocksDB)でも同じです。

そして、実はStateStoreは本当にただのキーバリューストアなので、保存されているキーとKafkaのパーティションとは全く関係がありません。

どういうことかというと、Kafkaがuser_idでパーティションされていて、それによってKafkaのパーティションIDが決まっていたとしても、StateStoreに書き込むキーはそれと関係なく、例えばgroup_idとかuserの行動ログのevent_id等を設定できるということです。

例えばuserがある行動を取る度に、userが複数登録しているWeb Hookを叩きたいとします。

この場合user_idをパーティションキーとして利用して、以下の様なイベントログを受けるevent_logsトピックがあると考えてみましょう。

{user_id: 1, event_id: 123, payload: {"key1": "value1", "key2": "value2"}}

そして、event_idが123に一致する時にあるURLにevent_logsのpayloadを渡してアクセスして欲しい、というWebHookを受け取るweb_hooksというtopicを考えてみます。データ構造は以下の通り。

{webhook_id: 1, user_id: 1, event_id: 123, url: "http://foo.example.com/api", http_method: "POST"}
{webhook_id: 1, user_id: 1, event_id: 123, url: "http://bar.example.com/api", http_method: "POST"}
{webhook_id: 1, user_id: 1, event_id: 123, url: "http://hoge.example.com/api", http_method: "POST"}

この例で分かる様に一人のユーザーは複数のWebHook設定を持っている可能性があります。一つのイベントが来た時にキーが完全一致する一件だけを取得するのでは必要な情報が足りません。

という訳でキーとパーティショニングを工夫します。Kafkaはキーとバリューでレコードを管理しますが、実際のところキーとパーティション割り当ては直接関係がありません。Producerのクライアントライブラリのデフォルトの挙動がそうなっているだけで、送信時にレコードのキーと異なる任意のキーでパーティションIDを割り当てることができます。

この例では、user_idでパーティショニングした上でレコードのキーを#{sprintf( '%020d', user_id)}:${webhook_id}の様な形にしてbrokerに送信します。

こうしておいた上でKafka StreamsアプリでKTableとして読み込みます。

StreamsBuilder builder = new StreamsBuilder();

KTable<String, WebHook> webHooks =
    builder.table(
        "web_hooks",
        Consumed.with(Serdes.String(), new WebHookSerde())
            .withName("Source-WebHook"),
        Materialized.<String, WebHook, KeyValueStore<Bytes, byte[]>>as(
                "web_hooks_store"));

// SerdeはJSONをオブジェクトとして読み込んでくれるものが定義されているものとします

この様にKTableとして読み込む様にすると、user_idでパーティショニングされたIDと関連するタスク上でStateStore上には#{sprintf(user_id, '%020d')}:${webhook_id}というキーでデータが書き込まれて保持されます。

そして、イベントログを受け取る側でProcessorAPIを利用して、KTableとして書き込んだStateStoreを参照します。

KStream<String, EventLog> eventLogs =
    builder.stream(
        "event_logs",
        Consumed.with(Serdes.String(), new EventLogSerde())
            .withName("Source-EventLog"));

eventLogs.process(() -> new EventLogProcessor(), Named.as("EventLogProcessor"), "web_hooks_store");

この様に引数で指定することでProcessorは名前が一致する定義済みのStateStoreを参照することができます。

Processorはこの様な実装になります。

public class EventLogProcessor implements Processor<String, EventLog, String, WebHookRequest> {
  private ProcessorContext<String, EventLog> context;
  private TimestampedKeyValueStore<String, EventLog> eventLogStore;

  @Override
  public void init(ProcessorContext<String, WebHookRequest> context) {
    context = context;
    eventLogStore = context.getStateStore("web_hooks_store");
  }

  @Override
  public void process(Record<String, EventLog> record) {
    var userId = record.value().getUserId();
    var keyPrefixStart = String.format("$020d", userId)
    var keyPrefixEnd = String.format("$020d", userId + 1);
    try (var it = eventLogStore.range(keyPrefixStart, keyPrefixEnd)) {
      while (it.hasNext()) {
        var keyValue = it.next();
        var valueAndTimestamp = keyValue.value;
        var value = valueAndTimestamp.value();
        if (record.value.getEventId() == value.getEventId()) {
          var webhookRequest = new WebHookRequest(userId, value.getWebHookId(), record.value().getPayload());
          context.forward(record.withValue(webhookRequest));
        }
      }
    }
  }
}

注目して欲しいのはcontext.getStateStore("web_hooks_store")var it = eventLogStore.range(keyPrefixStart, keyPrefixEnd)です。

initの中でprocessメソッドに引き渡したStateStoreを取得していますが、Processorの中で取得できるのは引数で渡された名前のStateStoreだけです。この様にしてinit内で取得したものをプライベート変数の中に保持しておきます。またレコードのフォワードに利用するのでProcessorContextも一緒に保持しておきます。

実際にレコードが到達した時に処理されるのがprocessメソッドです。

レコードが到達したらレコード情報からuser_idを取得しそれを元にキーのprefixを作ります。そしてそのキーのprefixを利用してStateStoreのrangeメソッドを使って範囲取得を実行します。StateStoreはキーのバイト列でソートされているので、キーの構成が適切なら特定のuser_idに一致する全てのレコードがStateStoreから取得できます。

後はイテレーターを回して、一つづつレコードを結合してforwardメソッドに渡すだけで、後続で任意の処理が実行できます。この例だと次のProcessorでWebHookの実際のリクエスト処理を実装することになるでしょう。

この様にすることで、KTableを利用して範囲JOINが実現可能です。

注意点と補足

Processor APIを使ってcontext.forwardしてしまうと、現時点では移行の処理はlow levelのProcessor APIでしか処理できません。後続の処理をDSLでやる様に戻すことは出来ないということです。

Processor APIを使って後続の処理を実装する場合は以下の様になります。

topology.addProcessor(
    "NextProcessor",
    () -> new NextProcessor(),
    "EventLogProcessor",
    "other_state_store_name");

こんな感じで一度StreamBuilderからbuildした後のTopologyに対してaddProcessorを読んで、親のノードの名前を指定することで任意のProcessorを後続に繋げることが出来ます。 この後続のProcessorはいくつも追加できる様になっていて、親になるProcessorcontext.forwardすると全ての子となるプロセッサで処理が実行されます。もし、特定のProcessorだけに処理を渡したい場合はProcessorに渡した名前(この例ではNextProcessor)を利用して特定の子Processorにだけ処理をルーティングする様にcontext.forwardの引数で調整することもできます。

また、補足ですが、context.fowardした後の後続の処理は全て同期的に連なった単なるメソッド呼び出しになっており、あるProcessor内で複数回context.fowardを呼ぶとその都度末端まで処理が同期的に実行されてから次のレコードに処理が移ります。

そして、あるレコードに起因する全ての処理が末端まで呼び出された後に、次のレコードが処理され、そしてバッファに存在している全てのレコードの処理が終わると、次のランループに入ってレコードをtopicからpollingする処理が実行されブローカーからレコードを取得します。

実際の所、Kafka StreamsはStream Threadごとに並列で動作しますが、そのStream Threadの中身は単純なwhile loopによる同期処理の連なりで実現されている、ということです。

まとめ

今回はKafka Streamsのlow level APIで実現する範囲JOINについて解説しました。 他にもProcessor APIを使った割と利用頻度の高そうなユースケースがいくつかあるのですが、今日は力尽きたのでここまでにさせていただきます。 また時間があれば、別のユースケースも紹介するかもしれません。その時はまた読んでやってください。

LinuxでWF-1000XM4のハイレゾ再生を利用する方法

SonyのフルワイヤレスイヤフォンであるWF-1000XM4は最大で96khzに対応しており、ハイレゾ対応であることを売りの一つにしているが、それを利用するためにはBluetoothのデータ転送codecとしてLDACというcodecを使う必要がある。

まず前提としてデフォルトのWF-1000XM4は接続性を優先する設定になっていてLDAC接続が利用できなくなっているので、スマホと接続する等してSonyのHeadphonesアプリから音質優先モードに設定を変更しておく必要がある。(ちなみにiPhoneはそもそもLDACに対応していないので音質優先モードにしてもLDACは使えない)

LinuxでLDAC接続する方法

pulseaudio-modules-bt

Linuxであれば、pulseaudio-modules-btがインストールされていれば、pulseaudioの接続プロファイルの選択でLDAC接続を選択することができる。

github.com

メジャーなディストリビューションなら概ねパッケージが提供されているので、それでインストールできる。

利用する前に、最初にbluezを使ってイヤフォンとペアリングしておく必要があるが、以降は普通にpulseaudioのsink interfaceとして利用できる。

ただpulseaudioはデフォルトの設定だと、一旦ソフトウェアレベルで再サンプリングされて44100hzか48000hzに落とされるはずなので、avoid-resamplingを有効にしてsample-rateの設定を変更しておく必要がある。

この辺りをちゃんと意識しておかないと、結局44100hzで再生されているという様な状態になるのでpulseaudioを利用する時には再サンプリングの存在を意識しないとハイレゾ再生できなくなってしまう。

結構設定がややこしいので、正直音楽を聞くならpulseaudioを使うのは余りオススメしないが、普通にシステムの音声出力として利用するならこちらの方が手軽だろう。リサンプリングの有無に関わらず、LDAC接続の方が明らかに音が良い。

bluez-alsa

pulseaudioを使わずにALSAから直接デバイスを触りたい場合はbluez-alsaを利用できる。ALSAから直接触れる様になると、再サンプリング無しで直接LDACでエンコードしてBluetoothに流すことが容易なので、mpdやその他のプレイヤーを利用して音楽を聞くのがメインならこちらの方が良い。

github.com

これもメジャーなディストリビューションなら大体パッケージがあるはず。

パッケージからインストールしたらsystemdのunitファイルも一緒にインストールされるはずなので、それを使ってbluealsaのデーモンを起動する。

後はREADMEに従って、ALSAのconfigファイルである.asoundrcに以下の内容になる様に設定を記述する。

$ cat ~/.asoundrc

# 以下が設定ファイルの内容
defaults.bluealsa.service "org.bluealsa"
defaults.bluealsa.device "XX:XX:XX:XX:XX:XX"
defaults.bluealsa.profile "a2dp"
defaults.bluealsa.delay 10000

後はbluetoothctlやその他のフロントエンドを使ってWF-1000XM4とペアリングし接続するとbluealsaがALSAから利用できるオーディオデバイスを追加してくれる。

接続が上手くいけば、以下のコマンドでデバイスが確認できる。(XX_XX_XX_XX_XXはデバイスアドレスなので各環境で読み替える)

$ bluealsa-cli list-pcms
/org/bluealsa/hci0/dev_XX_XX_XX_XX_XX/hspag/sink
/org/bluealsa/hci0/dev_XX_XX_XX_XX_XX/hspag/source
/org/bluealsa/hci0/dev_XX_XX_XX_XX_XX/a2dpsrc/sink

もしくは

$ bluealsa-aplay -L
bluealsa:SRV=org.bluealsa,DEV=XX:XX:XX:XX:XXPROFILE=sco
    WF-1000XM4, trusted audio-card, playback
    SCO (CVSD): S16_LE 1 channel 8000 Hz
bluealsa:SRV=org.bluealsa,DEV=XX:XX:XX:XX:XX,PROFILE=sco
    WF-1000XM4, trusted audio-card, capture
    SCO (CVSD): S16_LE 1 channel 8000 Hz
bluealsa:SRV=org.bluealsa,DEV=XX:XX:XX:XX:XX,PROFILE=a2dp
    WF-1000XM4, trusted audio-card, playback
    A2DP (LDAC): S32_LE 2 channels 96000 Hz

しかしデフォルトでの接続設定だとAACが伝送codecになっているので、設定の変更が必要になる。上記のbluealsa-aplay -La2dpのcodec情報がLDACではなくAACになっているはず。

設定の変更はbluealsa-cliで行う。まず利用可能なcodecを確認する。

$ bluealsa-cli info /org/bluealsa/hci0/dev_XX_XX_XX_XX_XX/a2dpsrc/sink
Device: /org/bluez/hci0/dev_XX_XX_XX_XX_XX
Sequence: 0
Transport: A2DP-source
Mode: sink
Format: S32_LE
Channels: 2
Sampling: 96000 Hz
Available codecs: SBC AAC LDAC
Selected codec: LDAC
Delay: 200.0 ms
SoftVolume: Y
Volume: L: 127 R: 127
Muted: L: N R: N
$ bluealsa-cli codec /org/bluealsa/hci0/dev_XX_XX_XX_XX_XX/a2dpsrc/sink LDAC

これでselected codecがLDACになっていればOK。接続しなおしてサンプリングレートが96khzになっていることを確認する。

後は通常のalsaを利用した再生設定を各プレイヤーに追加すればハイレゾ再生が可能になる。

mpdの設定なら以下の様になる。

audio_output {
    type        "alsa"
    name        "WF-1000XM4"
    device      "bluealsa" # alsaのbluealsaに関するデフォルト設定をそのまま利用するという意味
    mixer_type      "none"
    auto_resample   "no"
    auto_format "no"
    auto_channels   "no"
}

Gentooでamf-amdgpu-proを利用してGPUで動画エンコードを行えるffmpegを作る方法のメモ

AMDGPUのハードウェアエンコード支援機能としてAMFというものがあり、それを利用できるffmpegをビルドする方法。

AMDのサイトからubuntu用のRadeon Software for Linuxを落とす。

ex. Radeon™ Software for Linux® 21.30 Release Notes | AMD

展開して、amf-amdgpu-pro-.debファイルを更に展開する。

$ ar vx amf-amdgpu-pro_21.20-1271047_amd64.deb
$ tar xvf data.tar.xz

展開された libamfrt64.so libamfrt64.so.0 libamfrt64.so.0.0.0 を /usr/libにコピー。

ldconfigを実行。

以下のリポジトリからAMF SDKをダウンロード。

github.com

展開されたファイルの中からamf/public/includeディレクトリを/usr/include/AMFにコピー。

後はffmpegをemergeでビルドしなおせば、勝手に判別してh264_amfとhevc_amfが使える様になる。

Kafkaに接続するJavaアプリケーションをGravitonインスタンスへ移行してパフォーマンスを改善する

社内向けのドキュメントと兼用したので、前提とかメンバー向けの解説が含まれているので、前後のパフォーマンスの変化だけを見たい人は、下の方のグラフ画像までスクロールしてください。

gravitonインスタンスを活用するモチベーション

ワークロードによる相性はあるが、特にマルチスレッド性能で既存のインスタンスより性能向上が見られる上にコストが安いため、うまくフィットすれば性能改善とコスト削減の双方でメリットがある。

また、周辺ハードウェアもアップデートされているため、エフェメラルストレージ付きのインスタンスのストレージサイズが増えているなどのメリットもある。

特に現時点ではr6gdインスタンスが利用したかった。

gravitonインスタンスを利用するためarm64アーキテクチャへの対応

gravitonインスタンスはarm64 (aarch64) アーキテクチャのCPUのため、既存のx86_64アーキテクチャでビルドされたアプリケーションは動作しない。

現状、弊社ではほとんどのアプリケーションはコンテナとして動作しているのでコンテナイメージをarm64アーキテクチャで構築しなおす必要がある。

arm64でコンテナイメージを作成する主な方法として以下の様なものがある。

arm64アーキテクチャで動作するビルドサーバーを構築する

弊社ではこの方針を採用している。元々docker imageのキャッシュ管理やインフラリソースの自由度のためにDockerイメージビルドサーバーがあったので、arm64対応のために新規でビルドサーバーを増築した。

arm64アーキテクチャで動作するビルドサーバー上でイメージを構築すれば自動的にarm64アーキテクチャのコンテナイメージが構築できる。

構築済みのイメージのアーキテクチャは以下のコマンドで確認できる。

docker inspect <image-id> | jq '.[].Architecture'
"arm64"

docker buildx

Dockerの実験的機能として実装されているbuildxを利用することでマルチアーキテクチャイメージを構築することができる。

docker buildx create --name arm64builder --platform linux/arm64
docker buildx use arm64builder
docker buildx inspect --bootstrap

これでビルド準備が出来る。 ビルドする時は以下のコマンドを利用する。

docker buildx build --platform linux/arm64 -t imagetag --load .

しかし、手元で実験した所、何故かgradleによるビルドでqemuが死んでビルド出来ないというエラーが発生した。

dockerやqemuのバージョンで結構動作状況が違う可能性があり、まだ安定して何にでも使えるという感じではない。

arm64で動作するCIサービスの利用

例えば、circleciではmachineタイプのexecutorを指定することでarm64インスタンスを利用してCIを実行できる。 このmachine executor上でdockerを直接操作してイメージ構築とプッシュを実行できる。

https://circleci.com/docs/2.0/arm-resources/

arm64アーキテクチャのAMI構築

弊社ではpackerを利用してAMIを構築しているが、AMI構築時にもCPUアーキテクチャの変更を考慮する必要がある。 base AMIにarm64アーキテクチャ対応のものを指定し、gravitonインスタンスを設定してビルドする。

ビルド上の注意

graviton上でより良いパフォーマンスを出すためには、いくつかの最適化が必要になる場合がある。具体的には以下のドキュメントを一通り読んでおくことを推奨する。

https://github.com/aws/aws-graviton-getting-started

特にコンパイルが必要な言語に関しては、推奨されるコンパイルフラグがあるため、より意識しておく必要がある。

Javaアプリケーションに関しては、Java11以降のOpenJDKなら概ねarm64アーキテクチャを十分にサポートしているが、Java8以前のアプリケーションでパフォーマンスを十分に発揮させるためにはAmazon CorrettoというJDKの利用を検討してみると良いと書かれています。

また、graviton2の分岐予測に影響があるため、 tiered compilationとコードキャッシュの制限を変更するオプションが有効である場合があることが記載されています。 ワークロードによって有利・不利があるので適宜検証しつつ有効化するのが良いらしい。

-XX:-TieredCompilation -XX:ReservedCodeCacheSize=64M -XX:InitialCodeCacheSize=64M

弊社では、USE_GRAVITON_OPTIMIZATION環境変数に値を入れてコンテナを起動すると、アプリケーション起動時に上記オプションを有効にする様にエントリポイントスクリプトを更新した。

移行の流れ

以下の流れでインスタンスを移行していく。

build時にターゲットのCPUアーキテクチャを選択可能にする

環境変数USE_ARM64に値を入れてcapistranoを実行すると、arm64インスタンスでdocker buildを行う様に設定する。

こんな感じで実行する。

USE_ARM64=1 bundle exec cap dev_staging docker:push

ECRはdocker manifestを設定することでマルチアーキテクチャイメージを登録できるが、現状その辺りに対応するのが面倒なため、タグ名に-arm64 suffixを付与することでイメージを分ける。上記の環境変数で判別してsuffixが付与される様に分岐してある。

今回、amazon corretto JVMを利用できる様にarm64向けの新しいDockerfile-arm64を追加した。 落ち着いたらDockerfile側に統合する予定。

arm64対応のautoscaling groupとECS clusterを追加

terraformで新しいクラスタとそのためのautoscaling groupを追加する。 AMIも新しくarm64に構築したものに変更する。 クラスタごと分けるのはインスタンスに対する配置戦略等を設定するのが面倒で、混在していると移行期に分かりにくいので、箱ごと分けることにする。

デプロイ対象のイメージとクラスターを変更し新しい環境にECSサービスを作る

arm64用のクラスタでアプリケーションを起動できる準備をする

arm64クラスタでアプリケーションを起動しつつ徐々に入れ替える

動作自体は、staging環境で事前に確認している。

datadogでモニタしているCPU利用率やメモリ消費のメトリックを中心に入れ替え前後で負荷にどういった変化があったか確認する。

性能比較

r5.xlarge -> r6g.xlargeに変更し数日稼動させた後、2日分のメトリックを1週間前のCPU負荷と比較。 メトリックはdatadog-agentで取得できるdocker.cpu.usageを使う。 (メトリック比較はproduction環境にて実施)

コスト参考: 0.304 USD /hour -> 0.2432 USD

appA (JVM, KafkaStreams, メインスレッド数: 4)

不規則にスパイクが発生するタイプのワークロード。 ベースラインのCPU利用率の減少とスパイク時の利用率のの減少が確認できた。 ただし、日によって負荷傾向に差があるため、安定した性能変化は次のアプリで見た方が良い。

graviton_compare2.png (294.4 kB)

app2 (JVM, KafkaStreams, メインスレッド数: 4)

定常的にデータ受信数に比例した負荷がかかるが概ね一定の負荷傾向で、昼や夕方に高負荷になり夜間は低負荷になる。 負荷が低い時間帯ではほぼ差が無いが、高負荷になる時間帯では20ポイント以上利用率が減少している。中間ぐらいの負荷になる時間帯でも10ポイント前後利用率が減少。 高負荷時にかなりの余裕が出来た。

graviton_compare.png (188.6 kB)

ちなみに、その他のメトリックについては顕著な差分は発生しなかった。

申し訳ありません、適当なこと言ってました

当初、色々作業してすぐだったのでごちゃごちゃしていて、前後比較の対象を間違って見ていたらしく、下記の様なツイートをしていました。

週明けに、ダッシュボードをちゃんと整理してたら、あれ?ちゃんとイメージ通りの結果になったなということが分かり、どうやら私が盛大に勘違いしていた可能性が濃厚です。

AWSさん、間違ったデータを元にネガティブな発言をしてしまい申し訳ありません。ちゃんとパフォーマンス出ました。最高です。ありがとうございます。これからもお世話になります。

皆もgravitonインスタンスをガンガン試していきましょう。

まとめ

インスタンスの変更だけで得られるパフォーマンス改善としてはかなりの効果があった上に1台当たりの基本コストは80%程度にまで抑えられるので、かなりコストパフォーマンスが良くなったと言える。

イメージビルドラインを複数用意したり移行にあたって別クラスタを用意する等の手間をかける必要はあるが、やる価値は十分になると思う。特にマルチスレッドに強いらしいので、GoやJava等のCPUバウンドなアプリケーションでメリットが大きいだろう。

注意点は、インスタンスのワークロード次第ではgravitonを採用できないケースもまだまだ多いので、当分の間arm64とx86_64が混在することは避けられないため、どちらでビルドしているのかをちゃんと意識しておかないとハマる可能性があること。