Kafka入門 第1回 「そもそもKafkaとはなにか」

これは社内向けに書いた、Kafkaってそもそも何やねん、ということをメンバーに解説するための記事を一部編集して公開できる様にしたものです。 第2回以降では、Kafkaを利用したアプリケーション開発のノウハウについて解説していく予定です。そちらも社内の事情を除いた形で公開していくつもりです。

そもそもKafkaとは

Kafkaはイベントストリーミングプラットフォームと呼ばれるミドルウェアです。 元々はストリームバッファと呼ばれてたと思います。

公式のドキュメントには以下の様に書かれています。

 Kafka combines three key capabilities so you can implement your use cases for event streaming end-to-end with a single battle-tested solution:

    To publish (write) and subscribe to (read) streams of events, including continuous import/export of your data from other systems.
    To store streams of events durably and reliably for as long as you want.
    To process streams of events as they occur or retrospectively.

And all this functionality is provided in a distributed, highly scalable, elastic, fault-tolerant, and secure manner.

つまり、イベントストリームを書きこんだり読み込んだり出来て、継続的に他のシステムからimportしたりexportしたりできて、それを一定期間(または無期限に)保持し続けることが出来て、発生した時に処理するだけでなく遡って過去のデータを処理することもできるプラットフォームであるということです。 加えて、分散処理可能で、スケーラブルであることも特徴です。

プラットフォームの構成要素は以下の通りです。

  • Kafka Broker ミドルウェアの本体
  • Kafka Client ミドルウェアと通信するためのクライアントライブラリ
  • 上記クライアントによって実装された管理コマンド群
  • Kafka Streams and Kafka Connect アプリケーション開発のためのフレームワーク

イベントストリームとは

終端が存在しない継続的なデータの流れのことです。

システム的にはデータベースやモバイルデバイス等によって発生したデータの動きをリアルタイムにキャプチャし続けたデータの流れと言えます。 一つ一つのデータの発生や変化をイベントと呼び、それらが終端なく延々連なっているものをイベントストリームと呼びます。 このイベントストリームに対して高速に処理を行うことができれば、我々のシステムは、ユーザーに対して即座に最新の結果を届けることが出来る様になります。

イベントストリームには終端が存在しない上にその目的上リアルタイムに近いインターバルで処理することが求められるため、基本的には一件単位で処理を行うことになります。 集計などの複数レコードに渡る処理については、開発者自身で必要なデータの範囲を定義して情報を蓄積しておくストレージを別途用意する必要があります。

Kafkaのエコシステムにはそういったストリーム処理で現実のユースケースを実現するために必要なライブラリも用意されています。

Kafkaでは複数のイベントストリームを扱うことができ、それをトピックと呼びます。トピックは名前を付けて管理できる様にしたイベントストリームということです。

トピックとパーティション

各トピックは1つ以上のパーティションによって分割されています。 また、トピックにはreplication factorが設定されており、各パーティションの複製がどれだけ存在するかを設定できます。replication factorが3であれば、あるパーティションのデータの複製がクラスタ内部に3つ存在するということです。

あるパーティションのあるレプリカは一つのノードによって保持されます。

例えば、TopicAのパーティション数が8でreplication factorが3であった場合、クラスタに所属している各ノードに合計24のパーティションのデータが分散して配置されます。

kafkacat (kcat) コマンドによる出力は以下の様になる。ノード数は5台。

  topic "TopicA" with 8 partitions:
    partition 0, leader 5, replicas: 5,3,1, isrs: 1,5,3
    partition 1, leader 3, replicas: 3,4,2, isrs: 4,2,3
    partition 2, leader 1, replicas: 1,4,2, isrs: 1,4,2
    partition 3, leader 4, replicas: 4,2,5, isrs: 4,2,5
    partition 4, leader 2, replicas: 2,5,3, isrs: 2,5,3
    partition 5, leader 5, replicas: 5,2,3, isrs: 2,5,3
    partition 6, leader 3, replicas: 3,5,1, isrs: 1,5,3
    partition 7, leader 1, replicas: 1,5,3, isrs: 1,5,3

トピックに保持するデータの一行一行をレコードと呼び、レコードはキーとバリューのペアで構成されます。 KafkaのBrokerにとってはキーとバリューはただのバイト列であって、それがどういう型のデータかはクライアントによって解釈されます。 例えば、JSONシリアライズされたデータである場合や、AvroやProtocolBufferによってバイナリシリアライズされたデータである場合、ただの文字列である場合、バイト列をそのまま数値として利用する場合、などシリアライズ方式に依存します。

何故Kafkaが必要なのか

Kafkaは非常に応用範囲が広いため一言で説明するのが難しいが、以下の様なユースケースがあります。

  • システム疎結合化やスケーラビリティ向上のためのメッセージブローカー
  • Webシステムの行動トラッキング
  • システム負荷などをモニタリングするためのメトリックの記録
  • ログ集約
  • イベントソーシング

弊社では、システム疎結合化のためのメッセージ基盤かつ、ユーザーの行動データのリアルタイム集計及び、その集計結果をサービスにリアルタイムに反映するための基盤として利用しています。

Kafkaはキューではない

Kafkaはキューの様なこともできるし、分散キューと呼ばれることもあるが正確にはキューではありません。

Kafkaはイベントストリームを一定期間に渡って保持し続けており、Consumerがそれを処理したとしてもキューと違って保持しているデータは変化しません。 キューの場合は、Consumerがデータを処理した場合そのデータはキューから削除され、他のワーカーから利用できなくなります。そして、基本的には先頭のデータが消えない限り次のレコードは処理されません。 例えばAWSのSQSはその様に動作します。(visibility timeoutなどである程度順番は制御できる) AWSではKinesisがかなりKafkaに近いサービスとして存在します。(現在はManaged Kafkaも提供されている)

キューでないことによる利点

トピックの共有

データがずっと保持されるということは、ある一つのトピックを複数のシステムで共有できるということを意味します。 つまりデータの発生源はKafkaにデータを送信しておくだけで、複数のマイクロサービスを動かすことができます。その先にどんなサービスがあるかを知る必要はありません。 各サービス側は自分がどのトピックを利用するかだけ分かっていれば良く、そこからデータを取り出して自分の処理をするだけで良くなるため、データの発生源について知っておく必要はなくなります。 仮にデータを利用したいサービスが増えたとしても、データの発生元に送信先を追加する必要はありません。 Kafkaの本体となるミドルウェアをメッセージブローカーと呼びますが、この様にメッセージを仲介してサービス間を疎結合に保つ役割を果たすことができます。

耐久性

Consumerがデータを処理してもデータが変化しないということは、Consumerが処理中に何らかのエラーが起こって処理ノードごとダウンしたという場合であっても、データが失われないことを意味します。 そのため、Consumerでエラーが起きたとしても再キューをする必要はなく、ただ別のノードで同じオフセットから処理を継続すればデータを失うことなく処理を復帰できます。 sidekiqなどの様にRedisを処理キューとして利用した場合には、処理中のワーカーがエラーハンドリングを適切にやらないとデータを失う危険性があります。もしOOMなどによってプロセスが突然死した場合には復帰は不可能です。 Kafkaにおいてはそういった心配はほぼ必要なくなります。

一方で、exactly onceモードを利用しない限りはリトライによって二重処理される可能性はあるため、羃等性に関しては別途注意が必要です。

過去のデータを利用できる

Kafkaは一定期間(デフォルトで7日間)のデータを保持し続けていることに加えて、キューと違って任意のオフセットからデータを読み出すことが出来ます。 そのため、過去のデータを利用して処理を開始することが出来ます。

例えば、新しくマイクロサービスを追加した時に、ある程度過去のデータから処理を開始することができますし、一定期間停止しているサービスがあったとしても、再開した時に停止したタイミングまでのデータが残っていれば、安全に処理を再開できます。

キューでないことによる複雑さ

オフセットの記録

キューと違ってデータが消えないということは、Consumer側で自分がどこまでレコードを処理したのかを覚えておく必要があることを意味します。 そのため、KafkaのConsumerクライアントはオフセットを記録しておく場所が必要になります。Kafkaのプロトコルでは、オフセットの記録場所としてもKafka Brokerを利用します。 Kafka Brokerは無期限にデータを保持することが出来るため、各クライアントのオフセット情報を記録するためのストレージとしても利用できます。 クライアントがどこまで処理をしたかのオフセット情報の更新もまたイベントの一つであり、それが延々更新され続けているイベントストリームでもあります。それをKafka Broker自身に格納しておいて、クライアントは起動時にそのオフセット情報をトピックから読み取ります。Brokerに残っている中で最新の値が今までに処理したレコードのオフセットになります。

ちなみにAWSKinesisでは、この様なオフセット情報の記録をDynamoDBを利用して行います。

Consumer Group

キューと異なることによって生じる複雑さの影響はオフセットの記録だけではありません。Consumerの処理によってブローカーのデータが変化しないので、もし複数のワーカーが同じトピックを処理しようとしたら、オフセットの記録だけではワーカーの数だけ重複して同じ処理が実行されてしまいます。

そのため、Kafka ConsumerにはConsumer Groupという概念が存在します。Consumer Groupはどういう目的でKafkaを利用しているかを指す集合であり、Consumerはそれに所属してBrokerからデータを取得します。

同一のConsumer Groupに所属している場合、Kafkaのあるトピックのあるパーティションはそのグループ内で一つのConsumerでしか処理できません。(一つのConsumerが複数のパーティションやトピックを処理することは出来る)

図で示すと以下の様になります。 consumer-groups.png (26.8 kB)

そのため、あるトピックのパーティション総数以上に、そのトピックを処理するConsumerを増やすことはできません。 パーティション数が30であれば、そのトピックを処理できるConsumerの数は30が上限になります。この数はインスタンスの総数で、同じノードに30あっても30ノードに1つづつあっても一緒です。

Kafka Brokerはパーティショニングをしない

Kafkaの大きな特徴の一つとして、パーティショニングを行う主体がBrokerではないという点が挙げられます。 Kafkaにおいてパーティショニングを行うのはProducerです。

データの発生源がKafka Brokerにレコードを送信する時に、クライアントライブラリの実装によってパーティションが決定されます。 そのため、クライアントライブラリが違っていればパーティショニングのロジックが異なる可能性があります。 例えば、JavaのProducerではデフォルトのパーティショニングロジックはキーに対してmurmur2によるハッシュ化を行いその結果をパーティション総数で割って剰余を利用します。 一方でRubyruby-kafkaという実装ではcrc32を利用してハッシュ化を行い同様に剰余を取って利用します。 また、Rubyには他にlibrdkafkaというCのライブラリのバインディングとして実装されたgemがありそちらはcrc32, murmur2, fnv1aなどが選択可能になっています。 この様に同じデータを送信したとしてもクライアントの実装によってパーティションIDが異なる場合があります。

また、厳密にはレコードのキーとパーティショニングは直接関係している訳ではありません。 あくまでデフォルトの挙動としてレコードのキーを利用するだけで、クライアントは任意のデータを任意のロジックでパーティショニングした結果を直接レコードに指定してBrokerに書き込むことができます。

例えばruby-kafkaを利用していたとしても、自力でmurmur2のハッシュ化を行って同一のハッシュ値を算出できれば、それを元に算出したパーティションを直接レコードに指定することで、Javaのライブラリと同じパーティションにレコードを送信できる様になります。

パーティショニングのロジックを揃えることは実際のシステムにおいてはかなり重要です。

例えば、あるユーザー(ID:100)の1日あたりのアクセス数を集計したい場合、ID:100のユーザーのレコードは一台のノードで処理できることが望ましい。 Consumer Groupによってあるパーティションのデータはある1台のノードでしか処理されないことが保証されるので、一貫してID:100のユーザーのレコードが特定のパーティションに送信されるのであれば、各ノードは単に受け取ったデータをローカルに集計結果として蓄積しておけばよくなります。 この時、もし新しいデータの発生源が追加され、クライアントライブラリの実装が異なったせいでパーティショニングの結果が揃わなくなったとしたら、集計結果の蓄積場所がバラバラになってしまい、正しく集計できなくなります。

この様に、どういうキーとどういうアルゴリズムによってそのトピックのデータがパーティショニングされているかを把握することはとても重要です。

基本的には公式のクライアントの実装であるmurmur2 hashingを利用したパーティショニングに揃えておくのが一番安全だと思います。

パーティション数の見積りの重要性

Kafka Brokerを実際に運用する際に非常に難しい要素となるのが、パーティション数を適切に選択することです。

先に述べた様に、同一のConsumer Groupがあるトピックのパーティションからデータ取得できるのは1つのクライアントだけです。つまりスケールできるクライアント数の上限がパーティション数によって決定されてしまいます。

それなら、パーティション数を後から増やせば良いのでは?と考えるかもしれませんが、実際のところそれはそう簡単にはいかないケースも多々あります。 前段で、パーティショニングロジックが異なって違うクライアントに処理が移ってしまうと集計処理が正しく実行できなくなる可能性があると説明しましたが、パーティション数を後から変えた場合も同様の事が発生する可能性があります。

パーティショニングは基本的にキーとなる値をハッシュ化してトピックのパーティション総数で割った剰余を利用します。 パーティションの総数が変化すると同一のデータが割り当てられるパーティションが変わってしまいます。そうなると最終的に処理しているノードが変化してしまい、集計処理が壊れるという事態が発生します。

そのため、パーティション数を後から変更することは、場合によっては非常に手間のかかる難しいオペレーションになります。 実際にこのトピックをどれぐらいの台数までスケールしたいのかをよく考えてパーティション数を見積るのが重要になります。 基本的な見積りの指針となるのは、ネットワークスループットと要件として求められている処理スループット、そして処理にかかるレイテンシです。

補足ですが、処理ノードが変わったとしても集計処理に関係が無い様にアプリケーションを構築すれば良いのではないかと思うかもしれませんが、それはストリームプロセッシングにおいては余りやりたくない設計になります。 何故ならストリームプロセッシングにおける集計では、バッチ処理よりも遥かにデータ取得や書き込みの回数が多くなります。そのためネットワークを介して別ノードにデータを保持する仕組みは無視できないレイテンシの増加を招くことがあり、かなり強い必要性が無い限りは採用したくない設計と言えます。 この点に関する詳細は、以降の解説記事で紹介していく予定です。

まとめ

Kafka はリアルタイムに大量のイベントデータを処理するためのプラットフォームであり、その中核を為すのがKafka Brokerというミドルウェアである。

イベントデータはユーザーの行動ログだったり、システム間のメッセージのやり取りであったり、データベースの変更履歴であったり、様々なものが含まれる。

イベントデータは複数のパーティションに分割されたトピックに格納される。格納されるデータはキーとバリューで構成されたレコードであり、中身は単なるバイト列である。

Kafka Brokerはキューとは異なり、Consumerの処理によってデータが削除されないため、複数のシステムで同じトピックを共有したり、過去に遡ってデータを処理したり、リトライすることが容易である。

一方で、自分がどの範囲のデータを処理する責務を負っているかはクライアント自身で認識しておく必要があり、そのためにオフセット記録やConsumer Groupなどのキューには無い概念が必要になる。

パーティショニングの責任はクライアントが持つため、パーティション結果に依存する集計処理などを行う際には、クライアントライブラリの実装を把握しておくことが重要になる。

(まあ、大体の事は 公式ドキュメントに書いてあるので、これを全部読めば分かるのだが。)

Kafka Streamsを本番運用する時に検討しておくべきconfigの設定値について

Kafka Streamsをそれなりのデータ規模で運用していくとデフォルトの設定値では動作に困るケースがしばしば出てきます。

その中でも気にしておいた方がいい設定値について紹介していきます。

acceptable.recovery.lag

これはあるタスクにおいてStateStoreとchangelogトピックのlagのどこまでを追い付いていると許容するかどうかを設定します。

デフォルト値は10000なんですが、KTableとかで外部からデータが放り込まれていてそれが結構なデータ量だと10000とか一瞬で越えてしまって、一瞬プロセスを再起動しただけでも追い付いていない、みたいな扱いになってrebalanceが即座に走ってしまう挙動になりました。 実際にデータが入ってくるペースを考慮した値に設定しておかないと、プロセスを再起動した際に余計なrebalanceやレストアが走って無駄な停止時間が発生する可能性がある様です。

probing.rebalance.interval.ms

warmup replicaという仕組みが入って、一定期間ごとにwarmup状態をチェックするためのprobing rebalanceという処理が走る様になったのですが、それの実行間隔を指定します。

このconfigは公式のドキュメントだと重要度がLow扱いになってるんですが、これも運用規模によっては結構危険な設定値でした。デフォルト値は10分間です。 ノード数とかpartition数が多いとrebalanceが完了するまでに結構時間がかかる可能性があって、もしrebalanceが安定するまでに10分ぐらいかかってしまう場合、probing rebalanceがずっと終わらなくて、動作こそ継続できるもののクラスタの状態が不安定になるケースがありました。そういった場合に少し長めの値にすると状態が安定するかもしれません。

fetch.max.bytes, max.partition.fetch.bytes

consumerが一度に取得するデータサイズの上限を設定する値です。

これは全体のスループットや、StateStoreのレストア速度に影響します。余り小刻みにデータを取得するとその分の通信オーバヘッドでスループットが低下するので、データ量やネットワーク速度に合わせた値に設定することで、レストアにかかる時間が改善する可能性があります。

最近のjoker1007

Rubyist近況[1] Advent Calendar 17日目です。

仕事的な近況

最近、仕事で全然Rubyを書かなくなってきた。たまにRailsアプリの改修作業をやる程度で、今年書いた量で言うなら間違いなくJavaが一番多いだろう。

直近で書いたブログ記事を参照してもらえると分かり易いが、ここ1年ぐらいの自分の主戦場はKafkaとストリームプロセッサである。

Kafkaは非常に便利なミドルウェアだがメッセージキューの様でメッセージキューでなく、分散ストレージとして動作するがブローカー自体は余り分散の仕組みをコントロールする訳でもなく、クライアントライブラリ側で色々制御する様な仕組みになっているので、どういう活用の仕方をするものなのかイマイチ理解しづらい難しいミドルウェアでもある。

自分は完全に0から考えて必要だと思って調査してほとんどの仕組みを勝手に作ってしまったので性質も活用方法も分かっているが、そんなにすぐに各メンバーにそういった知識が伝えられる訳では無いので、Kafkaをゴリゴリ使う機能は必然的に自分が主導で書くことになる。

段階的に知識を移譲している最中だが、当分の間は作って説明して資料を作ってというのを繰り返すことになるだろう。

こんな感じで、ひたすらKafka周りの整備や作ったストリーム処理アプリケーションのパフォーマンスチューニングや新機能の実装をやっているとJavaばかり書くことになってしまった。今は秒間10万件のレコードを速攻で処理するストリーム処理基盤にガシガシ機能を追加している。

実際、Javaを結構書く様になって思ったが、最近のJavaには余りストレスを感じない。まあinterfaceの扱いとか中途半端なOptionalとかコンストラクタの制約とかlombok頼りな現状とか色々どうにかならんのかと思うことはあるが、長年の積み重ねで磨き抜かれたInteliJが余りに便利なので、開発体験自体はかなり良い。

(正直、そびえ立つまでに大きくなったRailsアプリケーションを改修するより遥かに楽とさえ言える……。やっぱ型検査とそれを利用した高度なLanguage Serverとか欲しくなるよね。)

そして、Rubyを全然書いていないのでgemのメンテやら何やらも割と放置気味で、3.1が出ようとしているこの時期になっても、全然新機能で遊んでいる余裕もない。Rubyistとしては恥ずかしいばかりである。仕事でMPを使い切っているのも余り良い状態ではない。もっとゆとりがあった方が良いのは間違いないだろう。

一方で、Kafkaについては多分かなり詳しくなったので、無茶苦茶高度な質問でなければ答えられるぐらいには理解したと思う。もしKafkaに興味がある人が居たらどこかで情報交換できると嬉しい。

ちなみに、何故Kafkaを使ってるのか、何がそんなに大事なのかという話については、後日改めてブログに書こうかなと思っているので、興味がある方は是非読んでやってください。

個人的な近況

今年は年の始めにzen3アーキのRyzenRadeon RX 6900 XTを使って新しいPCを組んだので、PC環境は今のところかなり快適な状態。

joker1007.hatenablog.com

OSは昔から愛用しているGentoo Linuxを使っている。Linuxにおけるノイキャンの方法とかビデオ会議やらなんやらの環境構築については、以下の様な記事を書いたので、もしLinuxに乗り換えようか考えてる人が居たら、参考になるかもしれません。

joker1007.hatenablog.com

別にやんちゃクラブに触発されたという程ではないが、SteamのおかげでLinuxでも結構PCゲームは出来る様になっており、Gentoo LinuxでもPCゲームはちゃんと遊べるぞ、ということを主張するためにゲームプレイを配信していこうかと思って、最近配信試験をしていたりとかする。とりあえず今はFF4のピクセルリマスターのプレイをYouTubeに記録している。今はちょうどラストダンジョン直前まで来てます。

www.youtube.com

他にHorizon Zero Dawnにシェーダー噛ませてRay Tracingを使える様にした比較動画とかを作ったりしてます。

www.youtube.com

Linuxでこの辺りのことをやる方法についても、暇を見つけて解説記事書いてみようかと思っているので、こちらも興味がある方はお楽しみに。

今年買ったものとか

今年買った大きい買い物は基本的にデスク周りのものが多い。まあ基本引き篭りやしね……。

  • FlexiSpotの電動昇降机
  • 3万円ぐらいのデスク用のサウンドバー (暫定的に買ったものなので、もうちょい良いものが欲しい。HDMI 2.1に対応していて4k 120fps出るやつとか無いだろうか)
  • 10Gbps対応のtp-linkのスイッチングハブ (しかし10GbpsのNICは無い……)
  • Sony WF-1000XM4 (無くしそうと思ってたが、家の鍵と同じ固定のポケットに入れられるので割と大丈夫かもしれない)
  • iPhone13 (5年ぶりの新スマホ)

WF-1000XM4は思ってたより買って良かった。うちのGentoo Linuxと96khzで接続できたし、ちょうど緊急事態宣言も明けて神田ぐらいまで飲みに行く機会も増えた時期に買ったので、利用頻度も結構高い。最初ちょっと音が籠ってるかなと思ったが慣れてきたら十分に良い音だと思う様になってきたし、これはオススメの品だと思う。

生命的な近況

いよいよ30代も大分後半になってきて、ぶっちゃけ体のあちこちにガタが来ている実感がある。そもそも不摂生の塊みたいな人間なので生活を改めろよ、という話なのだが、面倒なので何も対処しないまま何となく生きている。

正直、後10年ぐらいしたら死ぬのでは……ぐらいの感じではあるのだが、万が一半端に長生きしてしまった時のために、そろそろ金をまともに溜める手段を考えないと、このままでは老人になった時に詰んでしまうので、その辺りも気にする様になってきた。

とりあえず、来年の目標としていい加減にIDECOとNISAぐらいちゃんとやるかと考えている。

しかし、自分は家庭の都合という奴でどうにもならない出費が物凄く多いため、どうやって安定して運用資金を捻出するか非常に難しい。実際、毎月の収支は現状トントンか弱赤字である。収入が途絶えたら即座に死ぬ。(皆どうやって金溜めてんのかなと不思議である)

まあ、今のところは働けなくなる程のダメージは負ってないので、しばらくは死なないと思う。もし死にそうになったらTwitterで死ぬ手前ぐらいまでは実況していきたいと思っている。

まとめ

という訳で、何とか生きてます。2022年もよろしくお願いします。来年はもう少し友人に会って「うぇーい」ってビールジョッキを酌み交わせる日が戻ってくると良いなあ。

Kafka StreamsのStateStoreのちょっと変わった使い方 (ローカルキャッシュ、 コンフィグストア)

この記事は Kafka Advent Calendarの14日目の記事です。(1日遅れてるけど)

今回は割とライトな記事です。

ローカルキャッシュとしてのStateStore

Kafka StreamsにおいてStateStoreは、トピックから入力されて何らかの状態を保持した処理をしたい場合にその状態を永続化するために利用するのが基本的な形です。

一方でProcessor APIを活用しだすと、別にトピックから入力されてKafka Streamsで処理したものだけが保持対象じゃないことに気付きます。

StateStoreは基本的にはchangelogと紐付いているので、大規模な集計処理等を行ってデータ量が膨れ上がると、もしノードのストレージが無くなった時やrebalanceが走った時に、レストアに非常に長い時間が必要になります。これは現時点でちょっとやそっとでは解消しようがありません。

それを避けるために、通信という大きなオーバーヘッドを抱えても外部のデータソース(RedisやCassandraなど)からデータを取得したくなることがあります。

RedisやCassandraはかなり読み込みが高速なデータストアですが、Kafka Streamsの様なストリームプロセッサで頻繁に通信を行うと流石にローカルのStateStoreと比べて劇的に性能が落ちます。

そこで、withLoggingDisabled()をセットしたWindowStoreをキャッシュに使います。

changelogと紐付いていなければレストア処理が発生しないので、Processor APIの中でWindowStoreにデータが見つからなかった時だけデータを取りにいけば、処理内容によってはかなりの割合で外部との通信処理を削減することができます。

ここでWindowStoreを利用するのは現在KeyValueStoreでは自動的にデータをexpireする方法が無いからです。WindowStoreならretention timeを調整することで一定期間過ぎたStateStoreをローカルストレージから自動で削除できます。(RocksDB自体にはTTLを設定する方法があるがKafka Streamsからはそれを変更できない実装になっている)

実際MemoryStoreには、lruMapというストアが用意されており、オンメモリに乗る範囲であればこれを利用するとローカルなキャッシュがシンプルに実現できます。ただこれはメモリに乗る範囲でしか使えないことと、プロセスを再起動したら消滅してしまうことが難点です。

KeyValueStoreでTTLが使える様になると嬉しいんですが、今のところissueがあるのみ、といった感じですね。(https://issues.apache.org/jira/browse/KAFKA-4212)

コンフィグストアとしてのGlobalKTable

GlobalKTable(GlobalStateStore)は、全処理ノードに分配されるキーバリューストアとして利用できます。つまりデータフォーマットさえ決めておけばKafkaのトピックを経由して、任意の設定値を起動中の全ノードに配布したり削除したり出来るということです。

例えば、Kafka Streamsに限らずストリームプロセッサは処理が複雑になってくると結構デバッグするのが大変です。そういった時にユーザーの処理の流れを事細かに追いたいケースがあります。

デバッグモードを作り込むことは出来ますが、全ユーザーに対してそれを動かすと負荷がとんでもないことになって処理が詰まってしまったり、ログが膨大になったりする可能性が高いでしょう。

なので、対象のユーザーを絞ったりサンプリングして取得する様にしておきたくなります。

こういったケースでGlobalKTableに対象のユーザーの識別子やサンプリングの割合などの設定値を保持可能にしておきます。開発者はCLIのKafka Producerを使ってGlobalKTableになっているtopicにJSONを送り込めば、全ノードに対して起動したまま対象ユーザーの設定を変更することができます。必要無くなればnull値を送って設定を消せば元通りに出来ます。

こういった仕組みを管理機構として組み込んでおけば、いざという時に情報収集がしやすくなります。

この様にGlobalKTableはStreamからのJoin対象としてだけ利用するのではなく、外部から全ノードに設定値を即座に転送しストリームプロセッサ内で利用する仕組みにも応用できる、という話でした。

Kafka StreamsのWindowStoreのchangelogを再利用するためのダーティハック

この記事で紹介する内容はある意味で非常にリスキーであり、Kafka Streamsの内部実装に強く依存しています。 現状代替できる策が無いため、やむなく編み出した方法であることに注意してください。

Kafka StreamsのStateStoreとchangelogについて

Kafka Streamsではイベントの数をカウントするなどのStatefulな処理を実現するためにStateStoreというキーバリューストアを利用します。 もし処理ノードが壊れた時にも、キーバリューストアのデータが失われない様にするため、Kafka StreamsはStateStoreにデータを書き込むと同時にKafka Brokerに変更内容を書き込みデータの永続性を担保します。これをchangelogトピックと言います。

WindowStoreのchangelogの再利用

24時間のTimeWindowでユーザーのイベント毎の実行回数をカウントし、1ヶ月間(30日)保持しているとします。この場合、保持しておかなけれはいけないレコードの総数は最大で ユーザーの総数 * イベントの種類 * 30日分 になります。もしユーザーが1000万人、イベントが100種類あったとすると300億件のレコードを保持する必要があります。

そして、このイベントカウンターを他のKafka Streamsアプリケーションで利用したくなった時にどうするかを考えます。 (ワークロードが大きく異なりクラスタのキャパシティコントロールを考慮して別クラスタとして管理したい等の理由で)

この場合、現時点でこのWindowStoreを真っ当な手段で再利用する方法はありません。

安全な方法はイベントカウンターの情報を別のトピックに書き出して、それをKTableとして別アプリケーションから読み出す方法です。

しかし、この方法では単なるKeyValueStoreとしてしか読み出せなくなる上に、changelogトピックと別のトピックにデータを書き出す必要があってブローカーの負荷やストレージ消費が2倍になります。数千億件ぐらいの総数になってくると、気楽に倍のデータを保持する訳にはいかなくなってきます。

なので、どうにかしてWindowStoreのバックにあるchangelogを読み出してWindowStoreを復元できないものかと方法を探しました。

そして、Topology Optimizationで使われている内部APIを無理矢理引っ張り出すことで実現可能であることを発見しました。

InternalTopologyBuilder#connectSourceStoreAndTopicとWindowStore書き込みプロセッサの実装

Topology Optimizationを有効にすると、KTableとしてトピックのデータを読み込んだ時にchangelogトピックを新規に作成せずに、直接StateStoreとソーストピックを接続し、StateStoreにcheckpointが存在しない場合は、そのソーストピックからデータを取ってリカバリする様に動作します。

この処理をコントロールしているのが、InternalTopologyBuilder#connectSourceStoreAndTopicです。

    public void connectSourceStoreAndTopic(final String sourceStoreName,
                                           final String topic) {
        if (storeToChangelogTopic.containsKey(sourceStoreName)) {
            throw new TopologyException("Source store " + sourceStoreName + " is already added.");
        }
        storeToChangelogTopic.put(sourceStoreName, topic);
        changelogTopicToStore.put(topic, sourceStoreName);
    }

ちなみにKTableのOptimizationでどう呼ばれているかというと、こんな風なコードになっています。

// TableSourceNode.java

    public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
        final String topicName = topicNames().iterator().next();

        // TODO: we assume source KTables can only be timestamped-key-value stores for now.
        // should be expanded for other types of stores as well.
        final StoreBuilder<TimestampedKeyValueStore<K, V>> storeBuilder =
            new TimestampedKeyValueStoreMaterializer<>((MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>>) materializedInternal).materialize();

        if (isGlobalKTable) {
            topologyBuilder.addGlobalStore(
                storeBuilder,
                sourceName,
                consumedInternal().timestampExtractor(),
                consumedInternal().keyDeserializer(),
                consumedInternal().valueDeserializer(),
                topicName,
                processorParameters.processorName(),
                (ProcessorSupplier<K, V, Void, Void>) processorParameters.processorSupplier()
            );
        } else {
            topologyBuilder.addSource(consumedInternal().offsetResetPolicy(),
                                      sourceName,
                                      consumedInternal().timestampExtractor(),
                                      consumedInternal().keyDeserializer(),
                                      consumedInternal().valueDeserializer(),
                                      topicName);

            topologyBuilder.addProcessor(processorParameters.processorName(), processorParameters.processorSupplier(), sourceName);

            // only add state store if the source KTable should be materialized
            final KTableSource<K, V> ktableSource = processorParameters.kTableSourceSupplier();
            if (ktableSource.materialized()) {
                topologyBuilder.addStateStore(storeBuilder, nodeName());

                if (shouldReuseSourceTopicForChangelog) {
                    storeBuilder.withLoggingDisabled();
                    topologyBuilder.connectSourceStoreAndTopic(storeBuilder.name(), topicName);
                }
            }
        }

    }

このメソッドによってStateStoreとソーストピックの関係が登録されることで、このソーストピックから直接StateStoreをレストアする動作が動く様になります。

このAPIは非公開になっていて通常はアクセスできないので、これをReflectionを使って無理やり引っ張り出します。

  @SneakyThrows
  private InternalTopologyBuilder getInternalTopologyBuilder(Topology topology) {
    Field internalTopologyBuilderField = Topology.class.getDeclaredField("internalTopologyBuilder");
    internalTopologyBuilderField.setAccessible(true);
    return (InternalTopologyBuilder) internalTopologyBuilderField.get(topology);
  }

  private void connectSourceStoreAndTopic(
      Topology topology, String storeName, String changelogTopicName) {
    InternalTopologyBuilder internalTopologyBuilder = getInternalTopologyBuilder(topology);

    internalTopologyBuilder.connectSourceStoreAndTopic(storeName, changelogTopicName);
  }

この様にTopologyクラスから非公開フィールドのinternalTopologyBuilderというフィールドを引っ張り出してきて無理矢理アクセスして、その中にあるInternalTopologyBuilderインスタンスを引っ張り出します。

InternalToplogyBuilderインスタンスさえ手に入れば、後は普通にpublicメソッドを呼ぶだけなので、これで対応可能です。

  connectSourceStoreAndTopic(
      topology,
      eventCounterStore.name(),
      "other-app-event-counter-store-changelog");

ただし、これだけではまだ再利用できません。これで動作するのはレストアだけなので、新しくレコードが到達した時にそのレコードをWindowStoreに書き込むことが出来ません。

そのため、上記に加えてProcessor APIを使って新しくレコードが来た時に正しくWindowStoreに書き込む処理を実装します。

import java.util.Objects;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.WindowStore;

@Slf4j
public class WindowStoreSource<K, V> implements ProcessorSupplier<Windowed<K>, V, Windowed<K>, V> {

  private final String queryableName;

  public WindowStoreSource(final String queryableName) {
    Objects.requireNonNull(queryableName, "storeName can't be null");

    this.queryableName = queryableName;
  }

  @Override
  public Processor<Windowed<K>, V, Windowed<K>, V> get() {
    return new WindowStoreSourceProcessor();
  }

  private class WindowStoreSourceProcessor implements Processor<Windowed<K>, V, Windowed<K>, V> {
    private WindowStore<K, V> store;

    @Override
    public void init(ProcessorContext<Windowed<K>, V> context) {
      store = context.getStateStore(queryableName);
    }

    @Override
    public void process(Record<Windowed<K>, V> record) {
      store.put(record.key().key(), record.value(), record.key().window().start());
    }
  }
}

基本的に来た側からデータをStateStoreに書いているだけです、レコードのTimeWindowの開始時点をちゃんとWindowStoreに書き込む時に引っ張ってきていることに注意してください。

最後にchangelogトピックのSerdeとWindowStoreBuilderを適切に設定して完了です。

  // changelog topicに書き込まれているTimeWindow情報付きのキーのデシリアライザ
  // このデシリアライザはドキュメントは無いものの、普通にアクセスできる。
  TimeWindowedDeserializer<String> windowedStringDeserializer =
      new TimeWindowedDeserializer<>(
          Serdes.String().deserializer(), Duration.ofHours(24).toMillis());
  windowedStringDeserializer.setIsChangelogTopic(true);

  final var EVENT_COUNTER_SOURCE = "EVENT_COUNTER_SOURCE";
  topology.addSource(
      EVENT_COUNTER_SOURCE,
      windowedStringDeserializer,
      new EventCounterDeserializer(),
      "other-app-event-counter-store-changelog");

  final var EVENT_COUNTER_PROCESSOR = "EVENT_COUNTER_SOURCE";
  topology.addProcessor(
      EVENT_COUNTER_PROCESSOR,
      new WindowStoreSource<String, EventCounter>(eventCounterStore.name()),
      EVENT_COUNTER_SOURCE);
  topology.connectProcessorAndStateStores(
      EVENT_COUNTER_PROCESSOR, eventCounterStore.name());

最後に

実際のところ、これは内部の実装が変わると容易にぶっ壊れる上に、完全にundocumentedなので、全くその実装が維持されることは保証されていません。 とはいえ、フレームワークがメジャーバージョンアップしたら色々直さなければいけないのは普通のことなので、Kafka Brokerを実際に立ててWindowStoreのレストアと書き込みに関するIntegration Testを実装し、CIを回しておけばまず気付けるのではないかと思います。

私の環境では、このハックによってかなりの量のデータやネットワーク負荷を削減することが出来たのでリスクと天秤にかけて利用する価値はあると判断していますが、正直万人にオススメはしないハックと言えます。

とはいえ、Kafka Streamsの内部実装を知る勉強になるかもしれないので、今回紹介させていただきました。

Kafka Streamsのlow level APIであるProcessor APIの活用例 (範囲JOINの実現)

この記事はKafka Advent Calendarの10日目の記事になります。

Kafka StreamsにはProcessor APIというlow levelなAPIがあります。

https://kafka.apache.org/10/documentation/streams/developer-guide/processor-api.html

https://docs.confluent.io/ja-jp/platform/6.0.1/streams/developer-guide/processor-api.html (confluentには日本語のドキュメントもあるのでサクっと読みたい人はこちら)

私がしばらく開発してきたKafka Streamsアプリケーションは開発の最初こそStream DSLを使って開発してましたが、今やほとんどProcessor APIしか使ってません。 実際のところ、これを使えば何でも出来るんですが、何でも出来るとなると逆に取っ掛かりが捕めないと思うのでユースケースを紹介していきたいと思います。

一番重要な点はStreams DSLの裏に隠れているStateStoreを直接使えるという点です。これにより通常のStream DSLでは実現できないがユースケースとして十分に必要性がありそうな事が実現可能になります。

1. 2つ以上の関連レコードを取得するJOIN

Stream DSLによって提供されるJOINはキーによる完全一致のみです。INNER JOIN, LEFT JOIN, OUTER JOINとありますが、キーによって一致する最新のレコードを取ってくることしかできません。

一方で、Processor APIを利用すると部分的にキーが一致するレコードを複数取得するJOINや、キー以外の値でJOINする事が可能になります。

これには、実際のところStateStoreが実際のところどういう物なのかということを理解しておく必要があります。(注: ここで解説する挙動は組込みで実装されているStateStoreに関する挙動であり、自分でカスタムなStateStoreを実装している場合はこの限りではありません)

StateStoreはKafka Streamのタスクごとに作成され、キーのバイト列でソートされたキーバリューストアとして表現されます。これはOn MemoryでもPersistent(backendはRocksDB)でも同じです。

そして、実はStateStoreは本当にただのキーバリューストアなので、保存されているキーとKafkaのパーティションとは全く関係がありません。

どういうことかというと、Kafkaがuser_idでパーティションされていて、それによってKafkaのパーティションIDが決まっていたとしても、StateStoreに書き込むキーはそれと関係なく、例えばgroup_idとかuserの行動ログのevent_id等を設定できるということです。

例えばuserがある行動を取る度に、userが複数登録しているWeb Hookを叩きたいとします。

この場合user_idをパーティションキーとして利用して、以下の様なイベントログを受けるevent_logsトピックがあると考えてみましょう。

{user_id: 1, event_id: 123, payload: {"key1": "value1", "key2": "value2"}}

そして、event_idが123に一致する時にあるURLにevent_logsのpayloadを渡してアクセスして欲しい、というWebHookを受け取るweb_hooksというtopicを考えてみます。データ構造は以下の通り。

{webhook_id: 1, user_id: 1, event_id: 123, url: "http://foo.example.com/api", http_method: "POST"}
{webhook_id: 1, user_id: 1, event_id: 123, url: "http://bar.example.com/api", http_method: "POST"}
{webhook_id: 1, user_id: 1, event_id: 123, url: "http://hoge.example.com/api", http_method: "POST"}

この例で分かる様に一人のユーザーは複数のWebHook設定を持っている可能性があります。一つのイベントが来た時にキーが完全一致する一件だけを取得するのでは必要な情報が足りません。

という訳でキーとパーティショニングを工夫します。Kafkaはキーとバリューでレコードを管理しますが、実際のところキーとパーティション割り当ては直接関係がありません。Producerのクライアントライブラリのデフォルトの挙動がそうなっているだけで、送信時にレコードのキーと異なる任意のキーでパーティションIDを割り当てることができます。

この例では、user_idでパーティショニングした上でレコードのキーを#{sprintf( '%020d', user_id)}:${webhook_id}の様な形にしてbrokerに送信します。

こうしておいた上でKafka StreamsアプリでKTableとして読み込みます。

StreamsBuilder builder = new StreamsBuilder();

KTable<String, WebHook> webHooks =
    builder.table(
        "web_hooks",
        Consumed.with(Serdes.String(), new WebHookSerde())
            .withName("Source-WebHook"),
        Materialized.<String, WebHook, KeyValueStore<Bytes, byte[]>>as(
                "web_hooks_store"));

// SerdeはJSONをオブジェクトとして読み込んでくれるものが定義されているものとします

この様にKTableとして読み込む様にすると、user_idでパーティショニングされたIDと関連するタスク上でStateStore上には#{sprintf(user_id, '%020d')}:${webhook_id}というキーでデータが書き込まれて保持されます。

そして、イベントログを受け取る側でProcessorAPIを利用して、KTableとして書き込んだStateStoreを参照します。

KStream<String, EventLog> eventLogs =
    builder.stream(
        "event_logs",
        Consumed.with(Serdes.String(), new EventLogSerde())
            .withName("Source-EventLog"));

eventLogs.process(() -> new EventLogProcessor(), Named.as("EventLogProcessor"), "web_hooks_store");

この様に引数で指定することでProcessorは名前が一致する定義済みのStateStoreを参照することができます。

Processorはこの様な実装になります。

public class EventLogProcessor implements Processor<String, EventLog, String, WebHookRequest> {
  private ProcessorContext<String, EventLog> context;
  private TimestampedKeyValueStore<String, EventLog> eventLogStore;

  @Override
  public void init(ProcessorContext<String, WebHookRequest> context) {
    context = context;
    eventLogStore = context.getStateStore("web_hooks_store");
  }

  @Override
  public void process(Record<String, EventLog> record) {
    var userId = record.value().getUserId();
    var keyPrefixStart = String.format("$020d", userId)
    var keyPrefixEnd = String.format("$020d", userId + 1);
    try (var it = eventLogStore.range(keyPrefixStart, keyPrefixEnd)) {
      while (it.hasNext()) {
        var keyValue = it.next();
        var valueAndTimestamp = keyValue.value;
        var value = valueAndTimestamp.value();
        if (record.value.getEventId() == value.getEventId()) {
          var webhookRequest = new WebHookRequest(userId, value.getWebHookId(), record.value().getPayload());
          context.forward(record.withValue(webhookRequest));
        }
      }
    }
  }
}

注目して欲しいのはcontext.getStateStore("web_hooks_store")var it = eventLogStore.range(keyPrefixStart, keyPrefixEnd)です。

initの中でprocessメソッドに引き渡したStateStoreを取得していますが、Processorの中で取得できるのは引数で渡された名前のStateStoreだけです。この様にしてinit内で取得したものをプライベート変数の中に保持しておきます。またレコードのフォワードに利用するのでProcessorContextも一緒に保持しておきます。

実際にレコードが到達した時に処理されるのがprocessメソッドです。

レコードが到達したらレコード情報からuser_idを取得しそれを元にキーのprefixを作ります。そしてそのキーのprefixを利用してStateStoreのrangeメソッドを使って範囲取得を実行します。StateStoreはキーのバイト列でソートされているので、キーの構成が適切なら特定のuser_idに一致する全てのレコードがStateStoreから取得できます。

後はイテレーターを回して、一つづつレコードを結合してforwardメソッドに渡すだけで、後続で任意の処理が実行できます。この例だと次のProcessorでWebHookの実際のリクエスト処理を実装することになるでしょう。

この様にすることで、KTableを利用して範囲JOINが実現可能です。

注意点と補足

Processor APIを使ってcontext.forwardしてしまうと、現時点では移行の処理はlow levelのProcessor APIでしか処理できません。後続の処理をDSLでやる様に戻すことは出来ないということです。

Processor APIを使って後続の処理を実装する場合は以下の様になります。

topology.addProcessor(
    "NextProcessor",
    () -> new NextProcessor(),
    "EventLogProcessor",
    "other_state_store_name");

こんな感じで一度StreamBuilderからbuildした後のTopologyに対してaddProcessorを読んで、親のノードの名前を指定することで任意のProcessorを後続に繋げることが出来ます。 この後続のProcessorはいくつも追加できる様になっていて、親になるProcessorcontext.forwardすると全ての子となるプロセッサで処理が実行されます。もし、特定のProcessorだけに処理を渡したい場合はProcessorに渡した名前(この例ではNextProcessor)を利用して特定の子Processorにだけ処理をルーティングする様にcontext.forwardの引数で調整することもできます。

また、補足ですが、context.fowardした後の後続の処理は全て同期的に連なった単なるメソッド呼び出しになっており、あるProcessor内で複数回context.fowardを呼ぶとその都度末端まで処理が同期的に実行されてから次のレコードに処理が移ります。

そして、あるレコードに起因する全ての処理が末端まで呼び出された後に、次のレコードが処理され、そしてバッファに存在している全てのレコードの処理が終わると、次のランループに入ってレコードをtopicからpollingする処理が実行されブローカーからレコードを取得します。

実際の所、Kafka StreamsはStream Threadごとに並列で動作しますが、そのStream Threadの中身は単純なwhile loopによる同期処理の連なりで実現されている、ということです。

まとめ

今回はKafka Streamsのlow level APIで実現する範囲JOINについて解説しました。 他にもProcessor APIを使った割と利用頻度の高そうなユースケースがいくつかあるのですが、今日は力尽きたのでここまでにさせていただきます。 また時間があれば、別のユースケースも紹介するかもしれません。その時はまた読んでやってください。

LinuxでWF-1000XM4のハイレゾ再生を利用する方法

SonyのフルワイヤレスイヤフォンであるWF-1000XM4は最大で96khzに対応しており、ハイレゾ対応であることを売りの一つにしているが、それを利用するためにはBluetoothのデータ転送codecとしてLDACというcodecを使う必要がある。

まず前提としてデフォルトのWF-1000XM4は接続性を優先する設定になっていてLDAC接続が利用できなくなっているので、スマホと接続する等してSonyのHeadphonesアプリから音質優先モードに設定を変更しておく必要がある。(ちなみにiPhoneはそもそもLDACに対応していないので音質優先モードにしてもLDACは使えない)

LinuxでLDAC接続する方法

pulseaudio-modules-bt

Linuxであれば、pulseaudio-modules-btがインストールされていれば、pulseaudioの接続プロファイルの選択でLDAC接続を選択することができる。

github.com

メジャーなディストリビューションなら概ねパッケージが提供されているので、それでインストールできる。

利用する前に、最初にbluezを使ってイヤフォンとペアリングしておく必要があるが、以降は普通にpulseaudioのsink interfaceとして利用できる。

ただpulseaudioはデフォルトの設定だと、一旦ソフトウェアレベルで再サンプリングされて44100hzか48000hzに落とされるはずなので、avoid-resamplingを有効にしてsample-rateの設定を変更しておく必要がある。

この辺りをちゃんと意識しておかないと、結局44100hzで再生されているという様な状態になるのでpulseaudioを利用する時には再サンプリングの存在を意識しないとハイレゾ再生できなくなってしまう。

結構設定がややこしいので、正直音楽を聞くならpulseaudioを使うのは余りオススメしないが、普通にシステムの音声出力として利用するならこちらの方が手軽だろう。リサンプリングの有無に関わらず、LDAC接続の方が明らかに音が良い。

bluez-alsa

pulseaudioを使わずにALSAから直接デバイスを触りたい場合はbluez-alsaを利用できる。ALSAから直接触れる様になると、再サンプリング無しで直接LDACでエンコードしてBluetoothに流すことが容易なので、mpdやその他のプレイヤーを利用して音楽を聞くのがメインならこちらの方が良い。

github.com

これもメジャーなディストリビューションなら大体パッケージがあるはず。

パッケージからインストールしたらsystemdのunitファイルも一緒にインストールされるはずなので、それを使ってbluealsaのデーモンを起動する。

後はREADMEに従って、ALSAのconfigファイルである.asoundrcに以下の内容になる様に設定を記述する。

$ cat ~/.asoundrc

# 以下が設定ファイルの内容
defaults.bluealsa.service "org.bluealsa"
defaults.bluealsa.device "XX:XX:XX:XX:XX:XX"
defaults.bluealsa.profile "a2dp"
defaults.bluealsa.delay 10000

後はbluetoothctlやその他のフロントエンドを使ってWF-1000XM4とペアリングし接続するとbluealsaがALSAから利用できるオーディオデバイスを追加してくれる。

接続が上手くいけば、以下のコマンドでデバイスが確認できる。(XX_XX_XX_XX_XXはデバイスアドレスなので各環境で読み替える)

$ bluealsa-cli list-pcms
/org/bluealsa/hci0/dev_XX_XX_XX_XX_XX/hspag/sink
/org/bluealsa/hci0/dev_XX_XX_XX_XX_XX/hspag/source
/org/bluealsa/hci0/dev_XX_XX_XX_XX_XX/a2dpsrc/sink

もしくは

$ bluealsa-aplay -L
bluealsa:SRV=org.bluealsa,DEV=XX:XX:XX:XX:XXPROFILE=sco
    WF-1000XM4, trusted audio-card, playback
    SCO (CVSD): S16_LE 1 channel 8000 Hz
bluealsa:SRV=org.bluealsa,DEV=XX:XX:XX:XX:XX,PROFILE=sco
    WF-1000XM4, trusted audio-card, capture
    SCO (CVSD): S16_LE 1 channel 8000 Hz
bluealsa:SRV=org.bluealsa,DEV=XX:XX:XX:XX:XX,PROFILE=a2dp
    WF-1000XM4, trusted audio-card, playback
    A2DP (LDAC): S32_LE 2 channels 96000 Hz

しかしデフォルトでの接続設定だとAACが伝送codecになっているので、設定の変更が必要になる。上記のbluealsa-aplay -La2dpのcodec情報がLDACではなくAACになっているはず。

設定の変更はbluealsa-cliで行う。まず利用可能なcodecを確認する。

$ bluealsa-cli info /org/bluealsa/hci0/dev_XX_XX_XX_XX_XX/a2dpsrc/sink
Device: /org/bluez/hci0/dev_XX_XX_XX_XX_XX
Sequence: 0
Transport: A2DP-source
Mode: sink
Format: S32_LE
Channels: 2
Sampling: 96000 Hz
Available codecs: SBC AAC LDAC
Selected codec: LDAC
Delay: 200.0 ms
SoftVolume: Y
Volume: L: 127 R: 127
Muted: L: N R: N
$ bluealsa-cli codec /org/bluealsa/hci0/dev_XX_XX_XX_XX_XX/a2dpsrc/sink LDAC

これでselected codecがLDACになっていればOK。接続しなおしてサンプリングレートが96khzになっていることを確認する。

後は通常のalsaを利用した再生設定を各プレイヤーに追加すればハイレゾ再生が可能になる。

mpdの設定なら以下の様になる。

audio_output {
    type        "alsa"
    name        "WF-1000XM4"
    device      "bluealsa" # alsaのbluealsaに関するデフォルト設定をそのまま利用するという意味
    mixer_type      "none"
    auto_resample   "no"
    auto_format "no"
    auto_channels   "no"
}