RubyKaigi 2022に現地参加してきた

今回、3年ぶりにオフライン開催されたRubyKaigi 2022に参加するために津まで行ってきました。今回はスポンサーでもスピーカーでもなく完全な一般参加者です。

(自分のTwitterは飯テロの画像なども含むため、センシティブなものを含む可能性があるチェックを付けているので画像を見る場合は自分で開いてくださいw)

久しぶりにオフラインの大きなイベントに参加し、3年以上ぶりにRubyコミュニティの皆と再開しました。皆マスクしてるから最初は「えっと、この人は……?」って感じで探り探りなところもありましたが、3日目ぐらいには大分慣れてきて、多くの人の健在を確認することができて、本当に楽しく過ごすことができました。

特に、WASMやYJITのめちゃくちゃ高度な話を聞いて「なるほどわからん」ってなった後に、参加者と感想を言い合いながら飲み会をやるのが、こんなに楽しいってことを完全に忘れていた自分に気付いて、失っていたものの大きさを改めて実感したRubyKaigiでした。

会場で一緒にTRICKを見てザワザワしてくる空気感とか、その場に居ることで楽しさが増すということは一杯ありましたね。

他にも、最近gRPCを使うかどうかを検討している中で、クックパッドさんのスポンサーブースでukstudioさんとgRPCを使ったマイクロサービスについて色々と質問して話し込んだりできたのも、オフラインカンファレンスが生んだ「廊下」のおかげだったなあと思います。

最後にチーフオーガナイザーの松田さんが、この場を再び失いたくはないから走り続けるよ、と言ってくれたことに非常にグッときました。 まだどうなるか分かりませんが、来年もあるぞという希望を持って過ごすことができるのはとてもありがたいことです。 最近、自分が全然Rubyに触らない生活をしているので、来年のRubyKaigiにもCFPに応募できるのかどうか怪しいところがあり、我ながら情けないなと感じるところはありますが、コミュニティの一員としてせめて参加ぐらいは出来るといいなあと思っています。

Kaigi後

結局、最終日朝まで飲んで、何とか3時間弱寝て、ヘロヘロになりつつ鳥羽に向かいました。 子供の頃に何度か行ったことがあった鳥羽水族館に行きたかったので。

鳥羽駅近くのサザエストリートで伊勢海老のお造りとサザエの壺焼きを食べました。

横に座った団体の方と少し雑談をしていたら味噌汁を奢ってもらって、二日酔い(というか最早当日酔い)でヘロヘロの自分にめちゃくちゃ染みました。最高ですね。

本当は体力が普通なら、水族館の後に伊勢神宮の方まで行ったり、伊勢角屋ビールが飲めるところまで行きたかったのですが、完全に体力の限界と、晴天による暑さで気持ち悪くなってきたので、諦めて昼過ぎにそのまま帰る電車に乗りました。 それでも十分楽しめたと思います。また戻ってきたい観光地が増えた。

帰宅したら、もうマジで体力が0で風呂入って即寝るしか出来なかった。一応、今日ある程度仕事している自分は偉いと思う……。

という訳で、今回のRubyKaigiも最高に楽しかった。Kaigiオーガナイザー、スタッフの皆様、難しい状況の中オフラインのカンファレンスイベントを開催してくれて本当にありがとうございます!!

ラストキーノートについて

余りに内容が分からん自分自身に割とムカついてきたので、どっかで自分なりに勉強して分かる範囲をどっかにまとめようかなと思ってます。まあブログにするかどうかは分かりませんが、やる気ある内にやらんとそのままになりそう。

とりあえず、CPUの動き方と機械語について復習して、LBBVの概念についても理解できるかどうかはさておき論文を眺めるぐらいはしてみよう。

neovimの棚卸をして、LSP対応やらファインダーの変更やら色々やった

最近Javaばかり書いてるもんでInteliJしか触ってなくて、neovimを放置してたのだが、やっと重い腰を上げて今風なエディタにするべく、全プラグインを見直しつつ設定を刷新した。

init.lua

まず、普通のvimと共通の設定を作るのを諦めて、設定ファイルをinit.luaに全面的に書き直した。今となってはneovim以外を使うことは余り考えられないし、それが使えない状態だったら素のvimでええわという感じで割り切った。

設定をinit.luaに移行する時にやらなければいけないのは基本的に以下の3つ。

  • set number みたいな設定値をvim.opt.number = trueという形に置き換える。set nonumberみたいなのはvim.opt.number = falseになる。
  • keymapの設定はvim.keymap.set('n', 'L', '<cmd>tabnext<CR>', {silent = true})の様に書く。これはnnoremap <silent> L :tabnext<cr>を意味する。
    • 動作を設定する箇所にluaのfunctionを直接マッピングすることも出来る。
  • letを利用した変数代入は、vim.g.hoge = "fuga"という形で設定できる。これはlet g:hoge = "fuga"を意味する。

もし、元々functionとかautocmdなどを使っていて設定が結構ややこしい場合は移行がかなり面倒臭くなる。 そういった時は最悪vim.cmd(string)の形で括ってしまえば、vimscriptがそのまま動く。s:スコープを使っているとスコープが切れてしまうので分かりにくくなるが、それ以外はそのままコピーすればどうにかなる。

その他の書き方については、nvim-lua-guide-ja/README.ja.md at master · willelz/nvim-lua-guide-ja · GitHub を見ると参考になる。

vimの中でhelpを見たい時は、:help api もしくは :help luaってやると使える関数のリファレンスが見つかる。

プラグイン管理の移行

元々、dein.vimを利用していたのだが、luaベースのconfigに置き換えるにあたって、packer.nvimに置き換えた。

github.com

最近のneovimのプラグインは、luaで書かれているものが多く、luasetupメソッドを読んでconfigurationを行うタイプの物が多い。 なので、プラグイン読み込み後のhookなどを直接luaで書けるpackerが都合が良いだろうと判断した。

多分、今のneovim界隈では一番利用者が多いプラグイン管理システムだと思う。

Fuzzy Finder

ファイルのFuzzy Finderもずっとdeniteを使っていたが、telescope.nvimに移行した。

github.com

動作が十分高速で、preview表示してくれて、デフォルトでMRUのfinderがあるのが良い。

後、telescopeの拡張として、 telescope-file-browserを入れている。

telescopeの画面の中でファイル作成したり、削除したりできる様になる。

しかし、telescopeには一個大きな不満があり、複数ファイルを選択して同時に一気にbufferに展開するということが出来ない。まあ、custom action書いて頑張れば出来そうな気はするのだが、何故これがデフォルトで出来ないのか良く分からない。

LSP対応

元々、LanguageClient-neovimを入れていたのだが、builtinのLSPクライアント(nvim-lsp)に変更した。 neovimのLSP関連の設定は結構ややこしいというか、一回覚えてしまえば、そんなに難しくないのだが、VSCodeほど何も考えんでもオッケーみたいな感じでは無い。

現状、nvim-lspを使う上で必須のプラグインがいくつかある。

nvim-lspconfigは、各種Language Serverのconfig presetが用意されているプラグインで、代表的なLanguage Serverならsetupメソッドを呼ぶだけで使える様になる。

mason.nvimはLanguage Serverやformatter, linterのインストーラーで、これも大体メジャーなLanguage Serverは網羅されている。nvim-lsp-installerはdeprecatedになったらしい。

mason-lspconfig.nvimは、masonでインストールしたLanguage Serverを認識して、setupを実行しやすくするためのhookを提供してくれる。

補完に関しては、 nvim-cmp を使うことにした。

補完ソースとしては以下のものを使っている。

  • cmp-path (ファイルパス補完)
  • cmp-buffer (開いているbufferからキーワード補完)
  • cmp-nvim-lsp (LSPから補完)
  • cmp_luanip (luasnipのスニペット展開候補を補完)
  • cmp-git (GitHubのメンションやissue番号の補完)
  • cmp-cmdline (neovimのコマンド入力時に自動補完が効く)

最終的には、こんな感じの設定でLSPを使った補完が効く様になる。

local cmp = require'cmp'
cmp.setup {
  snippet = {
    expand = function(args)
      require'luasnip'.lsp_expand(args.body)
    end
  },

  sources = cmp.config.sources({
    {name = 'nvim_lsp'},
    {name = 'nvim_lsp_signature_help'},
    {name = 'path'},
    {name = 'buffer'},
    {name = 'nvim_lua'},
    {name = 'luasnip'},
    {name = 'cmdline'},
    {name = 'git'},
  }),
}

-- The nvim-cmp almost supports LSP's capabilities so You should advertise it to LSP servers..
local capabilities = vim.lsp.protocol.make_client_capabilities()
capabilities = require('cmp_nvim_lsp').update_capabilities(capabilities)

local lspconfig = require('lspconfig')
require('mason-lspconfig').setup_handlers {
  function(server_name)
    lspconfig[server_name].setup {
      capabilities = capabilities,
    }
  end,
}

diagnostics

元々ale.vimを利用していたのだが、diagnostics表示もLSPにまとめてしまおうと思って、これも移行することにした。

LSPに対応していないlinterとかformatterの通知をLSPに対応させるラッパーみたいなLanguage Serverが存在していて、代表的なのがnull-lsefm-langserverである。 これらを使って任意のコマンドを実行し結果のフォーマットを調整することで、何でもLSPの上に乗せてエディタに表示させることが出来る。

自分はとりあえずnull-lsを入れてみた。設定は雑にこんな感じ。

require('null-ls').setup({
  capabilities = capabilities,
  sources = {
    require('null-ls').builtins.formatting.stylua,
    require('null-ls').builtins.diagnostics.rubocop.with({
      prefer_local = "bundle_bin",
      condition = function(utils)
        return utils.root_has_file({".rubocop.yml"})
      end
    }),
    require('null-ls').builtins.diagnostics.eslint,
    require('null-ls').builtins.diagnostics.luacheck.with({
      extra_args = {"--globals", "vim", "--globals", "awesome"},
    }),
    require('null-ls').builtins.diagnostics.yamllint,
    require('null-ls').builtins.formatting.gofmt,
    require('null-ls').builtins.formatting.rustfmt,
    require('null-ls').builtins.formatting.rubocop.with({
      prefer_local = "bundle_bin",
      condition = function(utils)
        return utils.root_has_file({".rubocop.yml"})
      end
    }),
    require('null-ls').builtins.completion.spell,
  },
})

rubocopなんかはsolargraphでもリポートしてくれるのだが、プロジェクトルートに.rubocop.ymlが無い時は動かさないとか、細かい設定が出来ないのでnull-lsで実行することにしている。

後は、表示を見易くするために、trouble.nvimを入れた。 workspace全体の警告とかをサクっと一覧できる。

statusbar

元々はairlineを使っていたが、lualine.nvimに乗り換えた。実際、あんまり変わってないw

github.com

その他プラグイン

元々使っていたプラグインの代替になる様なneovim向けのプラグインが見つかったら、軒並置き換えていった。

実際によく使うものをいくつか紹介する。

nvim-treesitter

tree-sitterというrustで書かれた汎用言語パーサがあり、それを使ってSyntax Highlightを行うプラグイン。 デフォルトのvimscriptによるハイライトよりもかなり細かく調整できて高速らしい。

github.com

ただ、これに対応しているColorSchemeを使わないとハイライトが完全に出ない。なので、入れる時には合わせて新しめのColorSchemeに移行するのが良い。

個人的に気に入ってるのは、 zephyrnightfoxあたり。

nvim-ts-rainbow

treesitterを使ってrainbow bracketをやってくれるプラグイン

github.com

vim-matchup

treesitterを使って、do endとかbegin endみたいなキーワードペアもハイライトしてくれる。

github.com

nvim-treesitter-endwise

autopairのdo end版。RuibyとかLuaで自動的にendを埋めてくれる。

github.com

nvim-autopair

対応する)とか]を自動で埋めてくれる。これもtreesitterに対応していて、treesitterのノードに合わせて有効無効が設定できる。

github.com

which-key.nvim

vim-which-keyのlua版。キーマップの入力途中で、次に何のキーを入力すると何が動くかをメニュー上に表示してくれるプラグイン

github.com

色々mappingを設定すると忘れるので便利。 ちゃんと設定するのは面倒だが、ノー設定でもそれなりに使える。

Comment.nvim

コメントのトグルをしてくれるプラグインのnvim版。nerd-commenterから移行した。別に元々困ってなかったが、treesitterに対応していてtreesitterのパーサ情報からコメント記法を取得してトグルしてくれるのがモダンな感じ。

github.com

nvim-web-devicons

とりあえず入れておくと、色々なプラグインがいい感じにアイコンを出してくれる。

github.com

vim-illuminate

カーソルが当たってるワードと同じワードに下線を引いてくれる。ちゃんとLSPやtreesitterを参照して言語構造を認識した上で下線を引いてくれる。

github.com

nvim-surround

surround.vimのneovim版。()とか[]とかをサクっと置換したりする時によく使う。surround.vimより良くなった点は、変更操作中の対象になっている()などがハイライトされる様になったこと。操作してる時に見た目が分かり易くなった。

github.com

gitsigns.nvim

gitgutterのneovim版。gitで変更があったらsignを行番号の横に表示してくれるやつ。gitgutterより高速だと思う。後virtual textでその行のblameを表示してくれる機能は地味に便利。

github.com

diffview.nvim

gitのdiffをいい感じに一覧で出してくれるプラグイン。diffをまとめて見るなら、fugitiveより見易い。

github.com

nvim-tree.lua

neovim内ファイラー。

github.com

indent-blankline.nvim

インデント表示を見易くするやつ。vim-indent-guidesから移行した。

github.com

他にも色々入れ替えたり追加したりしたのだが、とりあえずこんなもんにしておこう。 そんな入れ替えんでいいやろ、みたいなものもせっかくなんでガンガン棚卸して入れ替えたので、一気にモダンになったし、プラグインの動作が割と早くなった。

というか、一回やり出すと止まらなくなって、1週間ぐらいずっとneovimの設定を弄って遊んでいたのが本当の所だったりするw

Kafka入門 第2回 「Kafka Streamsを使ったストリーム処理アプリケーション開発」

以前にAWS Summitで似た様な話をしているので、こちらのブログとかも参考にしてください。

https://dev.classmethod.jp/articles/aws-summit-online-2020-cus-47/

今回の内容も、公式のドキュメントを全部読むなら大体書いてあることなので、既にある程度知っているという方は、後半の「StateStoreの実態」以降の部分だけを読んでもらえれば特に注意すべきことが書いてあります。

参考 https://docs.confluent.io/ja-jp/platform/6.2.0/streams/index.html https://docs.confluent.io/platform/7.0.1/streams/index.html

ストリームプロセッシングとは

イベントストリームに対して、データが到着するたびに処理を行い、それが延々と続いていく様なアプリケーションを指します。 Kafkaのエコシステムにおいては、Kafka BrokerにConsumerとして接続して、無限ループでpollingを行い、データが到達する度に短いインターバルで処理を行うアプリケーションのことです。

複数のレコードを使って集計処理を行うケースもあるが、ある一定期間ごとに集計ウインドウを区切った上で、一件ごとに集計結果をアップデートするか、バッファリングしておいて一定のインターバルをおいて実行する処理で集計を行う、といった形を取る。

ステートレスかステートフルか

ストリームプロセッシングの処理内容は大きくステートレスかステートフルかに分けられます。

ステートレスな処理とは、あるイベントレコードが到着した時に、そのレコードだけで処理が完結する処理のことです。 レコードAをレコードBの形に変換して別のトピックや別のデータストアに転送する処理などがその典型です。

一方、ステートフルな処理とは、到達したイベントレコードやそれを基に生成したデータを一定期間保持しておいて、それと組み合わせて結果を生成する処理のことです。 典型的なものでは、イベントの数を集計して合計や平均やヒストグラムを算出したり、処理効率のためにバッファリングしてまとめて処理したり、別のストリームやデータストアとデータを結合してデータエンリッチメントを行うといったものが挙げられます。

ステートフルな処理においては、一定量のレコードを格納しておくデータストアが別途必要になります。それをStateStoreと呼びます。ストリームプロセッシングにおけるStateStoreには重要な考慮ポイントがいくつか存在しますが、それについては後述します。

ステートフルアプリケーションにおけるデータストア

ストリームプロセッシングは、あるデータが到達したらその都度処理が行われるため、何らかのデータストアを利用すると、データが来る度にデータの取得や保存を行うことになる。

そのため重要な要素として低レイテンシであることが求められます。データ量によってはRedisの様なシンプルで高速なKVSであってもネットワークレイテンシが馬鹿にならないため、十分とは言えない場合があります。

そこでストリームプロセッシングでは、基本的に各処理ノードのローカルにデータストアを保持してレイテンシを低く保つという手法を採用します。

後述するKafka StreamsやApache Flinkなどのストリームプロセッシングフレームワークでは、RocksDBが採用されています。 RocksDBはアプリケーションに組込むタイプのKVSで、RocksDBを各ノードのローカルで動かすことで低レイテンシな状態保持を実現します。

各ノードのローカルで動かすということは、ノードが不慮の事故でダウンし復帰不能になった場合データが失われる危険性があることを意味します。 これを避けるためノードに依存しないデータ永続化の仕組みも別途必要です。この仕組みにおいてはネットワーク通信のオーバヘッドは避けられないため、ある程度のバッファリングが必要になります。

また、Kafkaを利用したストリーミングアプリケーションでは、スケーラビリティを確保するためにアプリケーションを分散して複数ノードで動作させることもよくあります。 そういったケースでは、ノード数の増減に応じて処理するパーティションの割り当てが変わる可能性があります。 第1回で解説した様にKafka Consumerは同一のConsumer Groupでは一つのパーティションを処理できるのは一つのクライアントだけです。台数が増減するとその割り当てが再配置されることになり、処理対象が別ノードに移ることを考慮しなければいけません。 処理が別ノードに移ると、それまでに各ノードのローカルで保持していたデータをどうにかして新しく処理が割り当てられたノードに移し替えないと、今迄に保持していたStateStoreのデータが利用できなくなります。例えばノードが増えた途端に集計済みのカウントがいきなり0に戻ったりすることになる訳です。 これを回避するために処理ノードの変遷に合わせてStateStoreを再配置する仕組みが必要不可欠です。

Kafka Streamsとは

Kafka StreamsはApache Kafkaの開発プロジェクトで公式に提供されているストリームプロセッシングフレームワークです。Javaで実装されています。

前段で解説した様にステートフルなアプリケーションを実装しようと思うと、その状態管理に結構複雑な仕組みが要求されます。 Kafka Streamsを利用するとフレームワークがこういった複雑さの面倒を見てくれます。またConsumerクライアントのpollingループや別のトピックにレコードを転送するためのProducerクライアントの集約なども面倒を見てくれるため、ステートレスなアプリケーションであっても大幅にコードを削減できます。

基本的にはここで説明することは Confluent社の提供しているドキュメントに概ね書かれていることになります。 少し古いですが日本語のドキュメントもあるので、詳細を知りたい方はそちらを参照して隅々まで読み込むのが良いでしょう。

https://docs.confluent.io/ja-jp/platform/6.2.0/streams/concepts.html

主な特徴

Kafkaの開発で主要な役割を担っているConfluent社のドキュメントによると以下の様な特徴があります。

機能 - アプリケーションに高い拡張性、弾力性、分散性をもたらし、フォールトトレラントを実現 - 「厳密に 1 回」の処理セマンティクスをサポート - ステートフル処理とステートレス処理 - ウィンドウ、結合、集約を使用したイベント時処理 - ストリームとデータベースの世界を 1 つにする Kafka Streams の対話型クエリ をサポート - 宣言型で関数型の API と、下位レベルの 命令型の API を選択でき、高い制御性と柔軟性を実現

軽量である - 導入へのハードルが低い - 小規模、中規模、大規模、特大規模のいずれの事例にも対応 - ローカル開発環境から大規模な本稼働環境にスムーズに移行 - 処理クラスターが不要 - Kafka 以外の外部依存関係がない

cf. https://docs.confluent.io/ja-jp/platform/6.2.0/streams/introduction.html

この中でも私が最も重要だと思うポイントが、Kafka以外の外部依存関係がない、という点です。こういった分散処理フレームワークの場合、YARNの様なリソースマネージャーやその他のデータストアが必要になるケースもしばしばありますが、Kafka Streamsにはそれが必要ありません。 これは導入のハードルを大きく下げてくれます。

Tutorialコード

Kafka Streamsでアプリケーションを書くには以下の様なコードを書きます。 (公式ドキュメントからの引用を少し改変したものです)

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.kstream.StreamsBuilder;
import org.apache.kafka.streams.processor.Topology;

// Use the builders to define the actual processing topology, e.g. to specify
// from which input topics to read, which stream operations (filter, map, etc.)
// should be called, and so on.  We will cover this in detail in the subsequent
// sections of this Developer Guide.

StreamsBuilder builder = new StreamsBuilder();
builder
    .stream(inputTopic, Consumed.with(stringSerde, stringSerde))
    .peek((k,v) -> logger.info("Observed event: {}", v))
    .mapValues(s -> s.toUpperCase())
    .peek((k,v) -> logger.info("Transformed event: {}", v))
    .to(outputTopic, Produced.with(stringSerde, stringSerde));
Topology topology = builder.build();
//
// OR
//
topology.addProcessor(...); // when using the Processor API

// Use the configuration properties to tell your application where the Kafka cluster is,
// which Serializers/Deserializers to use by default, to specify security settings,
// and so on.
Properties props = ...;

KafkaStreams streams = new KafkaStreams(topology, props);
streams.start(); // non blocking

この様にKafka StreamsはDSLを繋げたりProcessor APIと呼ばれる特定のインターフェースに則ったJavaクラスを実装するだけで、Kafkaに接続するConumerやProducerの構築、処理スレッドの立ち上げなどの面倒を見てくれます。

Kafka Streamsのアプリケーションモデル

Kafka StreamsはSource Node, Processor Node, Sink Nodeを組み合わせてTopologyという単位を作ることでアプリケーションを構築します。 それぞれどういう意味を持つのか説明していきます。

Topology

Topologyという単語はベースとしては数学用語で位相幾何学を指します。私は数学の徒ではないので詳しくは分かりませんが、物の形状が持つ性質に焦点を当てた学問の様です。 そこから派生してネットワーク・トポロジーというネットワークグラフがどういった構成をしているかの形状パターンを指す用語があります。 Kafka StreamsにおけるTopologyはこのネットワーク・トポロジーと同じものと考えて良いと思います。

Kafka Streamsは特定の役割を持ったノードを接続してグラフ構造を構築しTopologyを形成して動作します。 このTopologyは2つ以上のサブトポロジーによって形成されている場合があります。

streams-architecture-topology.jpg (182.2 kB) (Confluent Documentationから引用)

この様なグラフ構造を1つ以上保持するオブジェクトがKafka StreamsにおけるTopologyです。

2つ以上のサブトポロジーがどうやって生成されるかというと、一つのアプリケーションが複数のソーストピックからデータを取得する様に定義されていて、またそのソーストピックに依存しているProcessor NodeやSink Nodeに直接依存関係が無い場合です。 つまりSource Nodeから辿って一つのグラフが構築される様に依存関係が繋がっている範囲が、一つのサブトポロジーです。 上記の例ではSource Nodeが複数ありますが、下のProcessorが両方のSource Nodeに依存しているためグラフが一つに繋がっているのが分かります。こういった場合はサブトポロジーは1つになります。 もし、左側のSource Nodeがその他の全体と独立してどことも繋がっていないProcessor Nodeにだけ依存している場合は、2つのサブトポロジーが出来ることになります。

Source Node

Kafkaトピックからデータを取得する処理を行うノードをSource Nodeと呼びます。 Kafka Streamsには3種類のデータの取得パターンが存在します詳細はDSLの項で解説します。 上記のサンプルコードのDSLではstreamがそれにあたります。

Processor Node

Source Nodeから受け取ったデータを実際に処理するノードを指します。 実際に開発者が実装するのは、ほぼこのノードになります。 Processor Nodeは複数繋げることが出来ます。処理行程を分割することで見通しの良い実装が可能になります。 上記のサンプルコードのDSLではpeekmapValuesがそれにあたります。

Sink Node

別のKafkaトピックにデータを送信する処理を行うノードを指します。 上記のサンプルコードのDSLではtoがそれにあたります。

この様にSource Nodeが何らかのトピックからデータを取得し、Processor Nodeで何らかの処理を行いデータを加工したり一時的に保存したり別のデータストアに書き出したりして、そして必要であれば、そのデータを更に別のトピックに書き出して他の処理に繋げていく。 Kafka Streamsのアプリケーションはこうやって構築されます。

Task

Kafka Streamsは当然複数のマシンで分散処理が出来る様にデザインされています。 Kafka Streamsではサブトポロジーがpolling対象としているソーストピックのパーティション数に対応したタスクという単位でクライアントに処理を割り当てます。

例えば、トポロジーAの中にサブトポロジーA-1とサブトポロジーA-2があり、サブトポロジーA-1がトピックB-1をサブトポロジーA-2がトピックB-2を参照しているとします。 この時、トピックB-1のパーティション数が8でトピックB-2のパーティション数が10であった場合、1_0 〜 1_7というタスクと、2_0 〜 2_10というタスクが作成されます。タスクの総数は18です。 この18タスクを現在Consumer Groupに所属しているクライアントに割り当てます。割り当てのストラテジーにはいくつかパターンがありますが、そういった動作の詳細は次回以降で解説します。

DSL

上記で解説したTopologyを簡単に構築するために、Kafka Streamsには便利なDSLが用意されています。 DSLを使うことで、Lambdaをメソッドチェーンで繋げる様なインターフェースでストリームアプリケーションを簡単に書くことができます。

これに関しては公式のドキュメントを見る方が明らかに早いので、全体を知りたい方はこちらを参照してください。 日本語版は翻訳が追い付いていないせいか結構古いので最新の機能を知りたい場合は英語版を参照してください。

https://docs.confluent.io/ja-jp/platform/6.2.0/streams/developer-guide/dsl-api.html (ja) https://docs.confluent.io/platform/7.0.1/streams/developer-guide/dsl-api.html (en)

以降ではこの後の解説に必要な特に重要なDSLについてのみ解説していきます。

KStream, KTable, GlobalKTable

Kafka StreamsのDSLではトピックからデータを取得する方法として3つのパターンを提供しています。 これら3つの入力を表すオブジェクトに対してメソッド呼び出しを繋げることでDSLを記述します。

KStream

単純なKafkaトピックからの入力ストリームを指します。

StreamsBuilder builder = new StreamsBuilder();

KStream<String, Long> wordCounts = builder.stream(
    "word-counts-input-topic", /* input topic */
    Consumed.with(
      Serdes.String(), /* key serde */
      Serdes.Long()   /* value serde */
    );

この様に入力を指示する時にSerde(Serializer/Deserializer)を指定します。(実際の入力ではDeserializer側しか利用されないが)

このSerdeを通して、Kafka StreamsはJavaオブジェクトとバイト配列を自動的に変換します。

KTable, GlobalKTable

テーブルの様に扱える入力ストリームを指します。 テーブルの様に扱えるというのは、読み込み始めから現在に至るまでのKafka Brokerに入力された全レコードをローカルに保持していて、他のストリームと結合したりKVSの様に利用できることを意味します。

これは入力レコードの状態を保持しているということでもあり、KTableを使うということはステートフルなアプリケーションである、ということでもあります。

KTableは各パーティションの内容は割り当てられたクライアントにしか保持されませんが、GlobalKTableはあるトピックの全パーティションのデータを全ノードが保持し続けます。そのため入力トピックのデータ量に気を付けて利用する必要があります。

GlobalKTable<String, Long> wordCounts = builder.globalTable(
    "word-counts-input-topic",
    Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as(
      "word-counts-global-store" /* table/store name */)
      .withKeySerde(Serdes.String()) /* key serde */
      .withValueSerde(Serdes.Long()) /* value serde */
    );

KTableにはストア名とストアするデータのSerdeが必要です。 このストア名で各ノードにはRocksDBのデータベースファイルが作成され、そこにデータが蓄積されます。 このデータストアをStateStoreと呼びます。 上記の例にあるMaterialized.asというメソッドを利用してストア名やSerdeを切り替えたり、その他引数の渡し方でインメモリのデータストアを使う様に変更することも可能です。

StateStoreに保持されたデータは(インメモリストアでない限りは)プロセスが再起動したとしても維持されますが、パーティションの割り当てが変わって担当するタスクが切り替わると不要になったり、新しく再作成が必要になる場合があります。 不要になったRocksDBのデータは一定期間の後に自動的に削除されます。

GroupBy, GroupByKey

KStreamやKTableを集計したい時にどのキーを基にして集計するかを指示するDSLです。countやaggregateの前提として呼び出しておく必要があります。

KGroupedStream<String, String> groupedStream = stream.groupBy(
    (key, value) -> value,
    Grouped.with(
      Serdes.String(), /* key (note: type was modified) */
      Serdes.String())  /* value */
  );

groupByを利用すると必ずデータの再パーティションが行われます。再パーティションは、再パーティション用のトピックが自動的に作成され、そのトピック宛にレコードを送信して、再度取得するという処理を行います。 ネットワークやストレージに冗長な負荷がかかるため、再パーティションは極力避けるべきものです。

パーティション化を避けるためにはgroupByKeyを利用します。

KGroupedStream<String, String> groupedStream = stream.groupByKey(
    Grouped.with(
      Serdes.String, /* key */
      Serdes.String())     /* value */
  );

groupByKeyは現在のキーをそのまま使ってグループ化を指示します。この場合は処理の過程でキーが変更されていない限りは再パーティションは行われません。mapなどの変換処理によってキーが変更されている場合はgroupByKeyを使っても再パーティションが実行されてしまいます。

aggregate

DSLの中では最も汎用的な集計処理を実装できるDSLです。 イニシャルの値を生成するイニシャライザ、新しくレコードが届いた時に実行されるアダー、変更前のレコードの情報を利用して実行されるサブトラクター(省略可能)の3つのラムダを渡して集計処理を実装します。

KTable<byte[], Long> aggregatedTable = groupedTable.aggregate(
    () -> 0L, /* initializer */
    (aggKey, newValue, aggValue) -> aggValue + newValue.length(), /* adder */
    (aggKey, oldValue, aggValue) -> aggValue - oldValue.length(), /* subtractor */
    Materialized.as("aggregated-table-store") /* state store name */
    .withValueSerde(Serdes.Long()) /* serde for aggregate value */
);

集計結果を処理ノードのローカルにずっと維持していなければ、次のレコードが届いた時に今の集計値が何なのかを取得できません。そのため、集計処理は必然的にKTableになります。集計処理した結果はDSLが自動的に構築したStateStoreに格納され保持されます。

StateStoreの実態については色々と解説しなければならない要素があるため、この後でまとめて解説します。とりあえず結果がStateStoreという場所に格納されている、ということを覚えておいてください。

windowedBy

一定の時間ごとに集約範囲を区切ることができるDSLです。この機能を利用することで、5分ごとの合計を算出し続けたり、一回のセッションの中に含まれるイベントの数を数えたりといったことが可能になります。 window処理は時間軸に依存した処理なので、レコードのタイムスタンプと保持期間が重要になります。 場合によっては古いタイムスタンプのレコードが遅れて到着することもあり得ますし、古い集計データをいつまでも保持していたらデータが無限に増えてストレージを圧迫します。 そういった状況に対応するため、window処理には保持期間を別途設定できる様になっています。

// Aggregating with time-based windowing (here: with 5-minute tumbling windows)
KTable<Windowed<String>, Long> timeWindowedAggregatedStream = groupedStream.windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
    .aggregate(
      () -> 0L, /* initializer */
      (aggKey, newValue, aggValue) -> aggValue + newValue, /* adder */
      Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("time-windowed-aggregated-stream-store") /* state store name */
        .withValueSerde(Serdes.Long())); /* serde for aggregate value */

ウインドウ処理にはその区切り方によって、Tumbling Window, Sliding Window, Hopping Window, Session Windowと現時点で4つの種類があります。 割とややこしいため、 https://docs.confluent.io/ja-jp/platform/6.2.0/streams/developer-guide/dsl-api.html#windowing を参照することをオススメします。

Processor API

Kafka Streamsのlow level APIにあたるもので、上記のDSLは全てこのProcessor APIで実装されています。

端的に言えば、Processor APIは単なるクラス定義として利用できます。 initメソッドにProcessorContextというオブジェクトが与えられるので、それを利用してKafka Streamsの情報を入手したり、StateStoreへの参照を取得したり、後続の処理にデータを送ったりすることができます。 詳細は、 https://docs.confluent.io/ja-jp/platform/6.2.0/streams/developer-guide/processor-api.html を読んでください。

処理の実態はprocessメソッドに実装する様になっており、レコードが到達する度にこの処理が呼ばれます。 また、一定時間ごとにスケジュール処理を行うこともできます。 ProcessorContextl#scheduleメソッドでPunctuatorと呼ばれる処理を登録することで一定期間ごとに自動的に実行できます。 実行間隔のタイムスタンプをどう扱うかについてKafka StreamsはSTREAM_TIMEとWALL_CLOCK_TIMEの2種類を提供しています。それぞれの違いについては上記リンクに詳細がありますので、そちらを参照してください。

以下は、上記のドキュメント(英語版の最新)から引用したコードサンプルです。 重要な箇所はStateStoreを取得している箇所と、スケジュール処理を登録している箇所、そしてcontext.forwardを呼び出して後続にレコードを渡している部分です。

public class WordCountProcessor implements Processor<String, String, String, String> {
    private KeyValueStore<String, Integer> kvStore;

    @Override
    public void init(final ProcessorContext<String, String> context) {
        context.schedule(Duration.ofSeconds(1), PunctuationType.STREAM_TIME, timestamp -> {
            try (final KeyValueIterator<String, Integer> iter = kvStore.all()) {
                while (iter.hasNext()) {
                    final KeyValue<String, Integer> entry = iter.next();
                    context.forward(new Record<>(entry.key, entry.value.toString(), timestamp));
                }
            }
        });
        kvStore = context.getStateStore("Counts");
    }

    @Override
    public void process(final Record<String, String> record) {
        final String[] words = record.value().toLowerCase(Locale.getDefault()).split("\\W+");

        for (final String word : words) {
            final Integer oldValue = kvStore.get(word);

            if (oldValue == null) {
                kvStore.put(word, 1);
            } else {
                kvStore.put(word, oldValue + 1);
            }
        }
    }

    @Override
    public void close() {
        // close any resources managed by this processor
        // Note: Do not close any StateStores as these are managed by the library
    }

これをKafka Streamsの中に組込むにはTopologyクラスのコンストラクタ、またはStreamBuilder#buildで生成されたtopologyに対してTopology#addProcessorを呼び出すことで、任意のProcessorクラスを差し込むことができます。 また、この時利用したいStateStoreがあればその名前を渡して関連付けておきます。そうでないとProcessor APIからStateStoreを見つけることが出来ません。その場合は例外が発生します。

Topology topology = new Topology();

// add the source processor node that takes Kafka topic "source-topic" as input
topology.addSource("Source", "source-topic")

    // add the WordCountProcessor node which takes the source processor as its upstream processor
    .addProcessor("Process", () -> new WordCountProcessor(), "Source")

    // add the count store associated with the WordCountProcessor processor
    .addStateStore(countStoreSupplier, "Process")

    // add the sink processor node that takes Kafka topic "sink-topic" as output
    // and the WordCountProcessor node as its upstream processor
    .addSink("Sink", "sink-topic", "Process");

StateStoreの実態

公式で提供されているStateStoreは大きく分けてRocksDBを利用したpersistentストアとプロセスが終了すると揮発するインメモリストアの二種類ありますが、一般的によく使われるのはpersistentの方だと思います。まず共通の要素について解説していきます。 ちなみに、APIの形式に則って自分で実装してしまえば、独自のStateStoreを定義することも可能です。

StateStoreをlow level APIで利用する場合は自分でStateStoreを構築することができます。

StoreBuilder countStoreBuilder =
  Stores.keyValueStoreBuilder(
    Stores.persistentKeyValueStore("persistent-counts"),
    Serdes.String(),
    Serdes.Long()
  );

これをProcessor APIで利用したい時には以下の様な形で利用します。

KeyValueStore<String, Long> counterStore = context.getStateStore("persistent-counts");
long count = counterStore.fetch("key");
count++;
counterStoreB.put("key", count);

この例では単純なキーの完全一致による取得を行っていますが、Kafka Streamsが提供しているStateStoreはキーのバイト列でソートされているため、rangeメソッドを使って一定の幅のキーを取得することもできます。

StateStoreにおいて重要な要素は以下の通りです。

  • KVSであり、キーの順番でソートされている
  • キーもバリューも実際に格納される値はバイト列であり、アプリケーションとやり取りする際にはSerialize/Deserializeを行う
  • changelogトピックを利用して耐久性を担保する
  • 読み書きに関するcacheの仕組みを持っている

changelogトピック

先程軽く触れましたが、StateStoreを利用すると自動的にchangelogトピックというものがKafka Broker上に作成されます。(インメモリストアの場合はデフォルト無効)

changelogトピックはStateStoreに書き込まれた変更内容を保持しているトピックです。StateStoreにデータが書き込まれたり削除されたり(nullを書き込む)する度に、changelogトピックにレコードが送信されます。

Kafka Streamsではこのchangelogトピックを使って処理ノードのダウンやタスクの再割り当てに対応します。

新しくKafka Streamsのプロセスが立ち上がった時、もしそのアプリケーションがStateStoreを利用する様に定義されていて、かつそのローカルストレージにStateStoreのデータが存在しない場合、Kafka Streamsはレストア処理を行いKafka Broker上に存在するchangelogトピックから全てのレコードを取得し、ローカルのStateStoreを復元します。

同様にノードの増減によってリバランスが実行され、タスクが再割り当てされた時にも、自身のローカルに割り当てられたタスクIDに対応するStateStoreが存在しなければ、まずレストア処理が走ってから処理が継続されます。

Kafka Brokerはproductionで運用する場合普通であれば3つ以上のレプリカを持つ様に設定されており、データ自体の耐障害性はKafka Brokerの機能によって担保する様になっています。

レストア処理の注意点

StateStoreを活用する上で最も注意しておかなければならないことはレストア処理です。

Kafka Streamsはレストアが必要になった時点で処理を停止します。処理が開始・再開されるのはレストア処理が完了した後です。 つまり、StateStoreに溜まっているデータ量が非常に多くなると、レストア処理にかかる時間も長期化していき、いざノードがダウンしたり増減したりする時に、長時間処理が停止する危険が生じます。

これを避けるのは現状かなり難しい問題です。一定以上古いデータや利用頻度の低いデータはネットワークのオーバーヘッドを許容して外部ストアに逃がすなど、データ量を削減する仕組みをコツコツ積み重ねるしかありません。

changelogの無効化

StateStoreを作成する時、StoreBuilderのwithLoggingDisabled()を呼び出しておくと、changelogとの対応関係を無効化することができます。インメモリストアの場合はデフォルトで無効になっています。

changelogトピックが無効になっているとレストア処理が実行されません。RocksDBストアを利用している場合、同じノードで同じIDのタスクが実行されている限りはそのデータが保持され続けます。 一方で、Consumerのリバランスによりタスクの再配置が行われた場合や、処理ノードがダウンして復帰不可能になった場合は、そのデータは消滅します。

主な用途はローカルキャッシュです。メモリに乗り切らない量でもファイルシステムを利用してノードローカルなキャッシュとして利用することが出来ます。

例えば、不変のデータであれば外部のデータストアに蓄積しておいて、最初の一回はネットワークアクセスのオーバーヘッドが発生するが、その後はローカルのファイルシステムにキャッシュしてネットワークを経由したアクセスを回避する、といった用途に利用できます。

StateStoreのキャッシュ

ストリーム処理はその性質上短期間に同じキーのデータを何度も読み書きする可能性が高いものです。例えば、あるuser_idのリンクを踏んだ数などをカウントしている場合、そのイベントレコードが集中して何件も届くというのは十分あり得ることです。 こういった時に、都度ストレージにデータを書き込んでchangelogトピックを更新するのは無駄なオーバーヘッドに繋がる場合があります。

Kafka Streamsではそれを避けるためにデフォルトでキャッシュが有効になっています。書き込んだ内容は一定期間メモリ上に保持され、flushタイミングで実際のStateStoreに書き込みchangelogを更新します。

DSLで構築されるKTableではこのキャッシュ機能を利用してforwardが遅延されます。 例えばaggregateで集約された結果を次のDSLで利用したり別のtopicに送信したりする際には、デフォルトの挙動では一定期間StateStore上にキャッシュされてそこでバッファリングされます。一定時間の経過かキャッシュ用のメモリ領域を使い切ったら、CacheFlushListenerがレコードを次のProcessorにforwardします。

WindowStoreの実態

先程DSLの説明の中でwindowByというDSLについて紹介しました。このDSLを利用することで一定の時間ウインドウごとにレコードを集計することが出来ます。

実はこれはWindowStoreというStateStoreのバリエーションによって実装されています。 WindowStoreを作成する時は以下の様にします。1分毎のウインドウ幅で、2時間分のデータを保持するWindowStoreは以下の様に作成します。

StoreBuilder<WindowStore<String, Long>> counterStoreByMinute =
      Stores.windowStoreBuilder(
              Stores.persistentWindowStore(
                  "counter",
                  Duration.ofHours(2),
                  Duration.ofMinutes(1),
                  false),
              Serdes.String(),
              Serdes.Long());

これをProcessor APIで利用したい時には以下の様な形で利用します。

WindowStore<String, Long> counterStoreByMinute = context.getStateStore("counter");
long durationMillis = Duration.ofMinutes(1).toMillis();
long currentWindowStart = (context.timestamp() / durationMillis) * durationMillis;
long count = counterStoreByMinute.fetch("key", windowStart);
count++;
counterStoreByMinute.put("key", count, currentWindowStart);

キーバリューストアにタイムスタンプを合わせて付与する形で取得や保存を行います。 DSLの裏側にある場合は自動でtimestampからTime Windowを設定してくれますが、WindowStoreを直接扱う場合は、TimeWindowの開始となるタイムスタンプがどこかは手動で算出する必要があります。 WindowStoreに事前に与えられたWindow Durationは、記録されたタイムスタンプを基準にしてどれぐらいの長さのウインドウなのかを示す際に使われます。

上記の例は非常にシンプルな形でしたが、timestampの開始と終了時刻を渡すことで、その範囲に含まれるレコードを順番に取得することもできます。 また、通常のKeyValueStoreと同様にキーのレンジ探索とtimestampの開始と終了時刻のレンジ探索を組み合わせることも出来ます。

内部実装としては、WindowStoreは複数のRocksDBのストアから成り立っていて、記録するタイムスタンプごとに大まかなsegmentに分割されています。保存する時にtimestampから対応するsegmentを特定しその領域にデータを書き込みます。 取得する時は対象範囲のsegmentからのみデータを取得することで探索範囲を小さくしています。 またtimestampの範囲でsegmentが決定できるため、現在処理しているタイムスタンプと比較して保持期限を過ぎたものは、まとめてファイルごと削除することで簡単に破棄できます。

書き込み時にはレコードに与えられたキー(上記の例では"key"文字列)の末尾にミリ秒単位のunix timestampが自動的に付与されて記録されます。

次回に続く

非常に長くなったので、一旦ここらで区切って続きはまた次回とさせていただきます。

次回はDSLでは実現できないがProcessor APIでなら実現可能な応用編と、実際にアプリケーションを開発する時の基本的な考え方について解説する予定です。

Kafka入門 第1回 「そもそもKafkaとはなにか」

これは社内向けに書いた、Kafkaってそもそも何やねん、ということをメンバーに解説するための記事を一部編集して公開できる様にしたものです。 第2回以降では、Kafkaを利用したアプリケーション開発のノウハウについて解説していく予定です。そちらも社内の事情を除いた形で公開していくつもりです。

そもそもKafkaとは

Kafkaはイベントストリーミングプラットフォームと呼ばれるミドルウェアです。 元々はストリームバッファと呼ばれてたと思います。

公式のドキュメントには以下の様に書かれています。

 Kafka combines three key capabilities so you can implement your use cases for event streaming end-to-end with a single battle-tested solution:

    To publish (write) and subscribe to (read) streams of events, including continuous import/export of your data from other systems.
    To store streams of events durably and reliably for as long as you want.
    To process streams of events as they occur or retrospectively.

And all this functionality is provided in a distributed, highly scalable, elastic, fault-tolerant, and secure manner.

つまり、イベントストリームを書きこんだり読み込んだり出来て、継続的に他のシステムからimportしたりexportしたりできて、それを一定期間(または無期限に)保持し続けることが出来て、発生した時に処理するだけでなく遡って過去のデータを処理することもできるプラットフォームであるということです。 加えて、分散処理可能で、スケーラブルであることも特徴です。

プラットフォームの構成要素は以下の通りです。

  • Kafka Broker ミドルウェアの本体
  • Kafka Client ミドルウェアと通信するためのクライアントライブラリ
  • 上記クライアントによって実装された管理コマンド群
  • Kafka Streams and Kafka Connect アプリケーション開発のためのフレームワーク

イベントストリームとは

終端が存在しない継続的なデータの流れのことです。

システム的にはデータベースやモバイルデバイス等によって発生したデータの動きをリアルタイムにキャプチャし続けたデータの流れと言えます。 一つ一つのデータの発生や変化をイベントと呼び、それらが終端なく延々連なっているものをイベントストリームと呼びます。 このイベントストリームに対して高速に処理を行うことができれば、我々のシステムは、ユーザーに対して即座に最新の結果を届けることが出来る様になります。

イベントストリームには終端が存在しない上にその目的上リアルタイムに近いインターバルで処理することが求められるため、基本的には一件単位で処理を行うことになります。 集計などの複数レコードに渡る処理については、開発者自身で必要なデータの範囲を定義して情報を蓄積しておくストレージを別途用意する必要があります。

Kafkaのエコシステムにはそういったストリーム処理で現実のユースケースを実現するために必要なライブラリも用意されています。

Kafkaでは複数のイベントストリームを扱うことができ、それをトピックと呼びます。トピックは名前を付けて管理できる様にしたイベントストリームということです。

トピックとパーティション

各トピックは1つ以上のパーティションによって分割されています。 また、トピックにはreplication factorが設定されており、各パーティションの複製がどれだけ存在するかを設定できます。replication factorが3であれば、あるパーティションのデータの複製がクラスタ内部に3つ存在するということです。

あるパーティションのあるレプリカは一つのノードによって保持されます。

例えば、TopicAのパーティション数が8でreplication factorが3であった場合、クラスタに所属している各ノードに合計24のパーティションのデータが分散して配置されます。

kafkacat (kcat) コマンドによる出力は以下の様になる。ノード数は5台。

  topic "TopicA" with 8 partitions:
    partition 0, leader 5, replicas: 5,3,1, isrs: 1,5,3
    partition 1, leader 3, replicas: 3,4,2, isrs: 4,2,3
    partition 2, leader 1, replicas: 1,4,2, isrs: 1,4,2
    partition 3, leader 4, replicas: 4,2,5, isrs: 4,2,5
    partition 4, leader 2, replicas: 2,5,3, isrs: 2,5,3
    partition 5, leader 5, replicas: 5,2,3, isrs: 2,5,3
    partition 6, leader 3, replicas: 3,5,1, isrs: 1,5,3
    partition 7, leader 1, replicas: 1,5,3, isrs: 1,5,3

トピックに保持するデータの一行一行をレコードと呼び、レコードはキーとバリューのペアで構成されます。 KafkaのBrokerにとってはキーとバリューはただのバイト列であって、それがどういう型のデータかはクライアントによって解釈されます。 例えば、JSONシリアライズされたデータである場合や、AvroやProtocolBufferによってバイナリシリアライズされたデータである場合、ただの文字列である場合、バイト列をそのまま数値として利用する場合、などシリアライズ方式に依存します。

何故Kafkaが必要なのか

Kafkaは非常に応用範囲が広いため一言で説明するのが難しいが、以下の様なユースケースがあります。

  • システム疎結合化やスケーラビリティ向上のためのメッセージブローカー
  • Webシステムの行動トラッキング
  • システム負荷などをモニタリングするためのメトリックの記録
  • ログ集約
  • イベントソーシング

弊社では、システム疎結合化のためのメッセージ基盤かつ、ユーザーの行動データのリアルタイム集計及び、その集計結果をサービスにリアルタイムに反映するための基盤として利用しています。

Kafkaはキューではない

Kafkaはキューの様なこともできるし、分散キューと呼ばれることもあるが正確にはキューではありません。

Kafkaはイベントストリームを一定期間に渡って保持し続けており、Consumerがそれを処理したとしてもキューと違って保持しているデータは変化しません。 キューの場合は、Consumerがデータを処理した場合そのデータはキューから削除され、他のワーカーから利用できなくなります。そして、基本的には先頭のデータが消えない限り次のレコードは処理されません。 例えばAWSのSQSはその様に動作します。(visibility timeoutなどである程度順番は制御できる) AWSではKinesisがかなりKafkaに近いサービスとして存在します。(現在はManaged Kafkaも提供されている)

キューでないことによる利点

トピックの共有

データがずっと保持されるということは、ある一つのトピックを複数のシステムで共有できるということを意味します。 つまりデータの発生源はKafkaにデータを送信しておくだけで、複数のマイクロサービスを動かすことができます。その先にどんなサービスがあるかを知る必要はありません。 各サービス側は自分がどのトピックを利用するかだけ分かっていれば良く、そこからデータを取り出して自分の処理をするだけで良くなるため、データの発生源について知っておく必要はなくなります。 仮にデータを利用したいサービスが増えたとしても、データの発生元に送信先を追加する必要はありません。 Kafkaの本体となるミドルウェアをメッセージブローカーと呼びますが、この様にメッセージを仲介してサービス間を疎結合に保つ役割を果たすことができます。

耐久性

Consumerがデータを処理してもデータが変化しないということは、Consumerが処理中に何らかのエラーが起こって処理ノードごとダウンしたという場合であっても、データが失われないことを意味します。 そのため、Consumerでエラーが起きたとしても再キューをする必要はなく、ただ別のノードで同じオフセットから処理を継続すればデータを失うことなく処理を復帰できます。 sidekiqなどの様にRedisを処理キューとして利用した場合には、処理中のワーカーがエラーハンドリングを適切にやらないとデータを失う危険性があります。もしOOMなどによってプロセスが突然死した場合には復帰は不可能です。 Kafkaにおいてはそういった心配はほぼ必要なくなります。

一方で、exactly onceモードを利用しない限りはリトライによって二重処理される可能性はあるため、羃等性に関しては別途注意が必要です。

過去のデータを利用できる

Kafkaは一定期間(デフォルトで7日間)のデータを保持し続けていることに加えて、キューと違って任意のオフセットからデータを読み出すことが出来ます。 そのため、過去のデータを利用して処理を開始することが出来ます。

例えば、新しくマイクロサービスを追加した時に、ある程度過去のデータから処理を開始することができますし、一定期間停止しているサービスがあったとしても、再開した時に停止したタイミングまでのデータが残っていれば、安全に処理を再開できます。

キューでないことによる複雑さ

オフセットの記録

キューと違ってデータが消えないということは、Consumer側で自分がどこまでレコードを処理したのかを覚えておく必要があることを意味します。 そのため、KafkaのConsumerクライアントはオフセットを記録しておく場所が必要になります。Kafkaのプロトコルでは、オフセットの記録場所としてもKafka Brokerを利用します。 Kafka Brokerは無期限にデータを保持することが出来るため、各クライアントのオフセット情報を記録するためのストレージとしても利用できます。 クライアントがどこまで処理をしたかのオフセット情報の更新もまたイベントの一つであり、それが延々更新され続けているイベントストリームでもあります。それをKafka Broker自身に格納しておいて、クライアントは起動時にそのオフセット情報をトピックから読み取ります。Brokerに残っている中で最新の値が今までに処理したレコードのオフセットになります。

ちなみにAWSKinesisでは、この様なオフセット情報の記録をDynamoDBを利用して行います。

Consumer Group

キューと異なることによって生じる複雑さの影響はオフセットの記録だけではありません。Consumerの処理によってブローカーのデータが変化しないので、もし複数のワーカーが同じトピックを処理しようとしたら、オフセットの記録だけではワーカーの数だけ重複して同じ処理が実行されてしまいます。

そのため、Kafka ConsumerにはConsumer Groupという概念が存在します。Consumer Groupはどういう目的でKafkaを利用しているかを指す集合であり、Consumerはそれに所属してBrokerからデータを取得します。

同一のConsumer Groupに所属している場合、Kafkaのあるトピックのあるパーティションはそのグループ内で一つのConsumerでしか処理できません。(一つのConsumerが複数のパーティションやトピックを処理することは出来る)

図で示すと以下の様になります。 consumer-groups.png (26.8 kB)

そのため、あるトピックのパーティション総数以上に、そのトピックを処理するConsumerを増やすことはできません。 パーティション数が30であれば、そのトピックを処理できるConsumerの数は30が上限になります。この数はインスタンスの総数で、同じノードに30あっても30ノードに1つづつあっても一緒です。

Kafka Brokerはパーティショニングをしない

Kafkaの大きな特徴の一つとして、パーティショニングを行う主体がBrokerではないという点が挙げられます。 Kafkaにおいてパーティショニングを行うのはProducerです。

データの発生源がKafka Brokerにレコードを送信する時に、クライアントライブラリの実装によってパーティションが決定されます。 そのため、クライアントライブラリが違っていればパーティショニングのロジックが異なる可能性があります。 例えば、JavaのProducerではデフォルトのパーティショニングロジックはキーに対してmurmur2によるハッシュ化を行いその結果をパーティション総数で割って剰余を利用します。 一方でRubyruby-kafkaという実装ではcrc32を利用してハッシュ化を行い同様に剰余を取って利用します。 また、Rubyには他にlibrdkafkaというCのライブラリのバインディングとして実装されたgemがありそちらはcrc32, murmur2, fnv1aなどが選択可能になっています。 この様に同じデータを送信したとしてもクライアントの実装によってパーティションIDが異なる場合があります。

また、厳密にはレコードのキーとパーティショニングは直接関係している訳ではありません。 あくまでデフォルトの挙動としてレコードのキーを利用するだけで、クライアントは任意のデータを任意のロジックでパーティショニングした結果を直接レコードに指定してBrokerに書き込むことができます。

例えばruby-kafkaを利用していたとしても、自力でmurmur2のハッシュ化を行って同一のハッシュ値を算出できれば、それを元に算出したパーティションを直接レコードに指定することで、Javaのライブラリと同じパーティションにレコードを送信できる様になります。

パーティショニングのロジックを揃えることは実際のシステムにおいてはかなり重要です。

例えば、あるユーザー(ID:100)の1日あたりのアクセス数を集計したい場合、ID:100のユーザーのレコードは一台のノードで処理できることが望ましい。 Consumer Groupによってあるパーティションのデータはある1台のノードでしか処理されないことが保証されるので、一貫してID:100のユーザーのレコードが特定のパーティションに送信されるのであれば、各ノードは単に受け取ったデータをローカルに集計結果として蓄積しておけばよくなります。 この時、もし新しいデータの発生源が追加され、クライアントライブラリの実装が異なったせいでパーティショニングの結果が揃わなくなったとしたら、集計結果の蓄積場所がバラバラになってしまい、正しく集計できなくなります。

この様に、どういうキーとどういうアルゴリズムによってそのトピックのデータがパーティショニングされているかを把握することはとても重要です。

基本的には公式のクライアントの実装であるmurmur2 hashingを利用したパーティショニングに揃えておくのが一番安全だと思います。

パーティション数の見積りの重要性

Kafka Brokerを実際に運用する際に非常に難しい要素となるのが、パーティション数を適切に選択することです。

先に述べた様に、同一のConsumer Groupがあるトピックのパーティションからデータ取得できるのは1つのクライアントだけです。つまりスケールできるクライアント数の上限がパーティション数によって決定されてしまいます。

それなら、パーティション数を後から増やせば良いのでは?と考えるかもしれませんが、実際のところそれはそう簡単にはいかないケースも多々あります。 前段で、パーティショニングロジックが異なって違うクライアントに処理が移ってしまうと集計処理が正しく実行できなくなる可能性があると説明しましたが、パーティション数を後から変えた場合も同様の事が発生する可能性があります。

パーティショニングは基本的にキーとなる値をハッシュ化してトピックのパーティション総数で割った剰余を利用します。 パーティションの総数が変化すると同一のデータが割り当てられるパーティションが変わってしまいます。そうなると最終的に処理しているノードが変化してしまい、集計処理が壊れるという事態が発生します。

そのため、パーティション数を後から変更することは、場合によっては非常に手間のかかる難しいオペレーションになります。 実際にこのトピックをどれぐらいの台数までスケールしたいのかをよく考えてパーティション数を見積るのが重要になります。 基本的な見積りの指針となるのは、ネットワークスループットと要件として求められている処理スループット、そして処理にかかるレイテンシです。

補足ですが、処理ノードが変わったとしても集計処理に関係が無い様にアプリケーションを構築すれば良いのではないかと思うかもしれませんが、それはストリームプロセッシングにおいては余りやりたくない設計になります。 何故ならストリームプロセッシングにおける集計では、バッチ処理よりも遥かにデータ取得や書き込みの回数が多くなります。そのためネットワークを介して別ノードにデータを保持する仕組みは無視できないレイテンシの増加を招くことがあり、かなり強い必要性が無い限りは採用したくない設計と言えます。 この点に関する詳細は、以降の解説記事で紹介していく予定です。

まとめ

Kafka はリアルタイムに大量のイベントデータを処理するためのプラットフォームであり、その中核を為すのがKafka Brokerというミドルウェアである。

イベントデータはユーザーの行動ログだったり、システム間のメッセージのやり取りであったり、データベースの変更履歴であったり、様々なものが含まれる。

イベントデータは複数のパーティションに分割されたトピックに格納される。格納されるデータはキーとバリューで構成されたレコードであり、中身は単なるバイト列である。

Kafka Brokerはキューとは異なり、Consumerの処理によってデータが削除されないため、複数のシステムで同じトピックを共有したり、過去に遡ってデータを処理したり、リトライすることが容易である。

一方で、自分がどの範囲のデータを処理する責務を負っているかはクライアント自身で認識しておく必要があり、そのためにオフセット記録やConsumer Groupなどのキューには無い概念が必要になる。

パーティショニングの責任はクライアントが持つため、パーティション結果に依存する集計処理などを行う際には、クライアントライブラリの実装を把握しておくことが重要になる。

(まあ、大体の事は 公式ドキュメントに書いてあるので、これを全部読めば分かるのだが。)

Kafka Streamsを本番運用する時に検討しておくべきconfigの設定値について

Kafka Streamsをそれなりのデータ規模で運用していくとデフォルトの設定値では動作に困るケースがしばしば出てきます。

その中でも気にしておいた方がいい設定値について紹介していきます。

acceptable.recovery.lag

これはあるタスクにおいてStateStoreとchangelogトピックのlagのどこまでを追い付いていると許容するかどうかを設定します。

デフォルト値は10000なんですが、KTableとかで外部からデータが放り込まれていてそれが結構なデータ量だと10000とか一瞬で越えてしまって、一瞬プロセスを再起動しただけでも追い付いていない、みたいな扱いになってrebalanceが即座に走ってしまう挙動になりました。 実際にデータが入ってくるペースを考慮した値に設定しておかないと、プロセスを再起動した際に余計なrebalanceやレストアが走って無駄な停止時間が発生する可能性がある様です。

probing.rebalance.interval.ms

warmup replicaという仕組みが入って、一定期間ごとにwarmup状態をチェックするためのprobing rebalanceという処理が走る様になったのですが、それの実行間隔を指定します。

このconfigは公式のドキュメントだと重要度がLow扱いになってるんですが、これも運用規模によっては結構危険な設定値でした。デフォルト値は10分間です。 ノード数とかpartition数が多いとrebalanceが完了するまでに結構時間がかかる可能性があって、もしrebalanceが安定するまでに10分ぐらいかかってしまう場合、probing rebalanceがずっと終わらなくて、動作こそ継続できるもののクラスタの状態が不安定になるケースがありました。そういった場合に少し長めの値にすると状態が安定するかもしれません。

fetch.max.bytes, max.partition.fetch.bytes

consumerが一度に取得するデータサイズの上限を設定する値です。

これは全体のスループットや、StateStoreのレストア速度に影響します。余り小刻みにデータを取得するとその分の通信オーバヘッドでスループットが低下するので、データ量やネットワーク速度に合わせた値に設定することで、レストアにかかる時間が改善する可能性があります。

最近のjoker1007

Rubyist近況[1] Advent Calendar 17日目です。

仕事的な近況

最近、仕事で全然Rubyを書かなくなってきた。たまにRailsアプリの改修作業をやる程度で、今年書いた量で言うなら間違いなくJavaが一番多いだろう。

直近で書いたブログ記事を参照してもらえると分かり易いが、ここ1年ぐらいの自分の主戦場はKafkaとストリームプロセッサである。

Kafkaは非常に便利なミドルウェアだがメッセージキューの様でメッセージキューでなく、分散ストレージとして動作するがブローカー自体は余り分散の仕組みをコントロールする訳でもなく、クライアントライブラリ側で色々制御する様な仕組みになっているので、どういう活用の仕方をするものなのかイマイチ理解しづらい難しいミドルウェアでもある。

自分は完全に0から考えて必要だと思って調査してほとんどの仕組みを勝手に作ってしまったので性質も活用方法も分かっているが、そんなにすぐに各メンバーにそういった知識が伝えられる訳では無いので、Kafkaをゴリゴリ使う機能は必然的に自分が主導で書くことになる。

段階的に知識を移譲している最中だが、当分の間は作って説明して資料を作ってというのを繰り返すことになるだろう。

こんな感じで、ひたすらKafka周りの整備や作ったストリーム処理アプリケーションのパフォーマンスチューニングや新機能の実装をやっているとJavaばかり書くことになってしまった。今は秒間10万件のレコードを速攻で処理するストリーム処理基盤にガシガシ機能を追加している。

実際、Javaを結構書く様になって思ったが、最近のJavaには余りストレスを感じない。まあinterfaceの扱いとか中途半端なOptionalとかコンストラクタの制約とかlombok頼りな現状とか色々どうにかならんのかと思うことはあるが、長年の積み重ねで磨き抜かれたInteliJが余りに便利なので、開発体験自体はかなり良い。

(正直、そびえ立つまでに大きくなったRailsアプリケーションを改修するより遥かに楽とさえ言える……。やっぱ型検査とそれを利用した高度なLanguage Serverとか欲しくなるよね。)

そして、Rubyを全然書いていないのでgemのメンテやら何やらも割と放置気味で、3.1が出ようとしているこの時期になっても、全然新機能で遊んでいる余裕もない。Rubyistとしては恥ずかしいばかりである。仕事でMPを使い切っているのも余り良い状態ではない。もっとゆとりがあった方が良いのは間違いないだろう。

一方で、Kafkaについては多分かなり詳しくなったので、無茶苦茶高度な質問でなければ答えられるぐらいには理解したと思う。もしKafkaに興味がある人が居たらどこかで情報交換できると嬉しい。

ちなみに、何故Kafkaを使ってるのか、何がそんなに大事なのかという話については、後日改めてブログに書こうかなと思っているので、興味がある方は是非読んでやってください。

個人的な近況

今年は年の始めにzen3アーキのRyzenRadeon RX 6900 XTを使って新しいPCを組んだので、PC環境は今のところかなり快適な状態。

joker1007.hatenablog.com

OSは昔から愛用しているGentoo Linuxを使っている。Linuxにおけるノイキャンの方法とかビデオ会議やらなんやらの環境構築については、以下の様な記事を書いたので、もしLinuxに乗り換えようか考えてる人が居たら、参考になるかもしれません。

joker1007.hatenablog.com

別にやんちゃクラブに触発されたという程ではないが、SteamのおかげでLinuxでも結構PCゲームは出来る様になっており、Gentoo LinuxでもPCゲームはちゃんと遊べるぞ、ということを主張するためにゲームプレイを配信していこうかと思って、最近配信試験をしていたりとかする。とりあえず今はFF4のピクセルリマスターのプレイをYouTubeに記録している。今はちょうどラストダンジョン直前まで来てます。

www.youtube.com

他にHorizon Zero Dawnにシェーダー噛ませてRay Tracingを使える様にした比較動画とかを作ったりしてます。

www.youtube.com

Linuxでこの辺りのことをやる方法についても、暇を見つけて解説記事書いてみようかと思っているので、こちらも興味がある方はお楽しみに。

今年買ったものとか

今年買った大きい買い物は基本的にデスク周りのものが多い。まあ基本引き篭りやしね……。

  • FlexiSpotの電動昇降机
  • 3万円ぐらいのデスク用のサウンドバー (暫定的に買ったものなので、もうちょい良いものが欲しい。HDMI 2.1に対応していて4k 120fps出るやつとか無いだろうか)
  • 10Gbps対応のtp-linkのスイッチングハブ (しかし10GbpsのNICは無い……)
  • Sony WF-1000XM4 (無くしそうと思ってたが、家の鍵と同じ固定のポケットに入れられるので割と大丈夫かもしれない)
  • iPhone13 (5年ぶりの新スマホ)

WF-1000XM4は思ってたより買って良かった。うちのGentoo Linuxと96khzで接続できたし、ちょうど緊急事態宣言も明けて神田ぐらいまで飲みに行く機会も増えた時期に買ったので、利用頻度も結構高い。最初ちょっと音が籠ってるかなと思ったが慣れてきたら十分に良い音だと思う様になってきたし、これはオススメの品だと思う。

生命的な近況

いよいよ30代も大分後半になってきて、ぶっちゃけ体のあちこちにガタが来ている実感がある。そもそも不摂生の塊みたいな人間なので生活を改めろよ、という話なのだが、面倒なので何も対処しないまま何となく生きている。

正直、後10年ぐらいしたら死ぬのでは……ぐらいの感じではあるのだが、万が一半端に長生きしてしまった時のために、そろそろ金をまともに溜める手段を考えないと、このままでは老人になった時に詰んでしまうので、その辺りも気にする様になってきた。

とりあえず、来年の目標としていい加減にIDECOとNISAぐらいちゃんとやるかと考えている。

しかし、自分は家庭の都合という奴でどうにもならない出費が物凄く多いため、どうやって安定して運用資金を捻出するか非常に難しい。実際、毎月の収支は現状トントンか弱赤字である。収入が途絶えたら即座に死ぬ。(皆どうやって金溜めてんのかなと不思議である)

まあ、今のところは働けなくなる程のダメージは負ってないので、しばらくは死なないと思う。もし死にそうになったらTwitterで死ぬ手前ぐらいまでは実況していきたいと思っている。

まとめ

という訳で、何とか生きてます。2022年もよろしくお願いします。来年はもう少し友人に会って「うぇーい」ってビールジョッキを酌み交わせる日が戻ってくると良いなあ。

Kafka StreamsのStateStoreのちょっと変わった使い方 (ローカルキャッシュ、 コンフィグストア)

この記事は Kafka Advent Calendarの14日目の記事です。(1日遅れてるけど)

今回は割とライトな記事です。

ローカルキャッシュとしてのStateStore

Kafka StreamsにおいてStateStoreは、トピックから入力されて何らかの状態を保持した処理をしたい場合にその状態を永続化するために利用するのが基本的な形です。

一方でProcessor APIを活用しだすと、別にトピックから入力されてKafka Streamsで処理したものだけが保持対象じゃないことに気付きます。

StateStoreは基本的にはchangelogと紐付いているので、大規模な集計処理等を行ってデータ量が膨れ上がると、もしノードのストレージが無くなった時やrebalanceが走った時に、レストアに非常に長い時間が必要になります。これは現時点でちょっとやそっとでは解消しようがありません。

それを避けるために、通信という大きなオーバーヘッドを抱えても外部のデータソース(RedisやCassandraなど)からデータを取得したくなることがあります。

RedisやCassandraはかなり読み込みが高速なデータストアですが、Kafka Streamsの様なストリームプロセッサで頻繁に通信を行うと流石にローカルのStateStoreと比べて劇的に性能が落ちます。

そこで、withLoggingDisabled()をセットしたWindowStoreをキャッシュに使います。

changelogと紐付いていなければレストア処理が発生しないので、Processor APIの中でWindowStoreにデータが見つからなかった時だけデータを取りにいけば、処理内容によってはかなりの割合で外部との通信処理を削減することができます。

ここでWindowStoreを利用するのは現在KeyValueStoreでは自動的にデータをexpireする方法が無いからです。WindowStoreならretention timeを調整することで一定期間過ぎたStateStoreをローカルストレージから自動で削除できます。(RocksDB自体にはTTLを設定する方法があるがKafka Streamsからはそれを変更できない実装になっている)

実際MemoryStoreには、lruMapというストアが用意されており、オンメモリに乗る範囲であればこれを利用するとローカルなキャッシュがシンプルに実現できます。ただこれはメモリに乗る範囲でしか使えないことと、プロセスを再起動したら消滅してしまうことが難点です。

KeyValueStoreでTTLが使える様になると嬉しいんですが、今のところissueがあるのみ、といった感じですね。(https://issues.apache.org/jira/browse/KAFKA-4212)

コンフィグストアとしてのGlobalKTable

GlobalKTable(GlobalStateStore)は、全処理ノードに分配されるキーバリューストアとして利用できます。つまりデータフォーマットさえ決めておけばKafkaのトピックを経由して、任意の設定値を起動中の全ノードに配布したり削除したり出来るということです。

例えば、Kafka Streamsに限らずストリームプロセッサは処理が複雑になってくると結構デバッグするのが大変です。そういった時にユーザーの処理の流れを事細かに追いたいケースがあります。

デバッグモードを作り込むことは出来ますが、全ユーザーに対してそれを動かすと負荷がとんでもないことになって処理が詰まってしまったり、ログが膨大になったりする可能性が高いでしょう。

なので、対象のユーザーを絞ったりサンプリングして取得する様にしておきたくなります。

こういったケースでGlobalKTableに対象のユーザーの識別子やサンプリングの割合などの設定値を保持可能にしておきます。開発者はCLIのKafka Producerを使ってGlobalKTableになっているtopicにJSONを送り込めば、全ノードに対して起動したまま対象ユーザーの設定を変更することができます。必要無くなればnull値を送って設定を消せば元通りに出来ます。

こういった仕組みを管理機構として組み込んでおけば、いざという時に情報収集がしやすくなります。

この様にGlobalKTableはStreamからのJoin対象としてだけ利用するのではなく、外部から全ノードに設定値を即座に転送しストリームプロセッサ内で利用する仕組みにも応用できる、という話でした。