LinuxやSteamDeckでSteamゲームをプレイする時の必須ツール「steamtinkerlaunch」の紹介

自分はGentoo Linuxで大体何でもやる様にしているので、PCゲームも大体はそのままGentooでやってます。

LinuxでSteamを遊ぶ時にはsteamtinkerlaunchというツールがとても便利なので、これを入れておきましょう。

Linuxでゲームをする人間にとっては結構有名なツールなんですが、日本語の記事がほぼ存在しません。(まあ、皆あんまLinuxでゲームやらないよね……。)

この記事自体も需要がほぼ無さそうですが、せっかくノウハウを溜めたので書いておきます。

一応、この記事の内容はSteam Deckにも対応するので、Steam Deck買ったよって人は使えるかもしれません。

github.com

Arch Linuxを使っていればAURで簡単に入ります。他のディストリの場合はyadというgui toolkitだけインストールすれば、他の依存ツールはほとんど問題無くパッケージマネージャーで入ると思います。

依存ツールが入っていれば、後はDownloadしてきて、sudo make installでOKです。実体はでかいシェルスクリプトなので特にコンパイルとかも必要無い。

詳しいインストール方法はwikiに載っています。

steamtinkerlaunchは何をするのか

steamtinkerlaunchはゲーム本体のexeファイルを起動する前のコマンドラッパーとして起動し、関連ツールのインストールや起動を自動化してくれるツールです。

例えば、ReShadeというDirect3Dの出力にポストプロセッシングシェーダーを差し込んで、グラフィックを弄るツールがWindowsのPCゲームではしばしば使われていますが、それをLinuxのProtonで動作する様に自動的にインストールしてくれたりします。

バージョンの相性があったりするので、しばしば動かないReShadeのバージョンがあったりしますが。

その他にもvulkanのoverlay機能を使ってHUDを出してくれるMangoHudというツールがあるんですが、それを起動するための環境変数を自動でセットしてくれたり、gamescopeというmicro-compositorを使うためのコマンド引数を自動でセットしてくれたりします。

マニュアルでそういった起動コマンドやオーバーレイ表示のための環境変数設定をコントロールするのはかなり面倒なので、steamtinkerlaunchに一元管理してもらうと楽になります。

steamtinkerlaunchの使い方

インストールしたら、各ゲームを起動する時にsteamtinkerlaunchを噛ます様にsteamの設定を変更します。以下の様にゲームごとのプロパティ画面を開いて起動オプションにsteamtinkerlaunch %command%と入れておきます。

この状態で起動すると、こんな画面が出てきます。

真ん中下にあるGAME MENUという所をクリックすると、ゲーム毎の個別の設定画面に入れます。

めちゃくちゃ一杯出てきますが、使いたいものにチェックするだけでOKです。設定が終わったら右下のSAVESAVE AND PLAYを押して、設定を保存してからゲームを開始します。

その他の機能

ProtonやWineのカスタム版をDLしてSteamのcompatibility toolに登録してくれたり、Vortexというnexuxmodsが提供している様々なゲームのMODを管理するマネージャーをインストールしてくれる機能などがあります。ただVortexは何回か試したんですが、結構相性がシビアみたいで自分の環境では中途半端にしか動作しませんでした。MOD自体は普通に動くものが多いのでマニュアルでインストールすれば大体使えます。

個別のツールについて

ここからは自分がよく使っているツールをsteamtinkerlaunchで利用する方法について書いていきます。

ReShade

reshade.me

上の方で説明しましたが、Direct3DやVulkanをフックして、グラフィックにポストプロセッシングシェーダーをかけられる様になるツールです。

具体的には色調補正をしたりシャープネスフィルタを追加したり光源に対するブルーム効果を追加したり、Screen Space Global Illuminationという画面表示領域を基準にしたレイトレーシングを追加したりなどが可能になります。

どれぐらい画が違って見えるかは下に方に比較動画を置いてあるので、参考にしてみてください。

利用するにはsteamtinkerlaunchのGAME MENUにある以下のオプションにチェックを入れます。

このチェックが入っていると起動前にReShadeを自動的にDLし、DLしたInstallerからdllファイルを抽出しゲームディレクトリに自動で配置してくれます。

この時注意しなければいけないのは、ゲームの実行ファイル本体と同じディレクトリに配置する必要があることです。

ReShadeはDirectX関係のDLL読み込みに偽装してフック機構を注入するため、ディレクトリがズレてると読み込めません。

ゲームの中には起動前に独自のランチャーを噛ますものがあり、画面サンプルに利用しているCyberpunk 2077もそういったゲームになります。

ゲームの実体ファイルは、インストールされているルートの下にあるbin/x64以下にあります。そのため、steamtinkerlaunchの設定画面でArchitecture exeAlternative game exe pathを指定し、インストール先のディレクトリを調整する必要があります。

インストールするReShadeのバージョンは、~/.config/steamtinkerlaunch/global.confというファイルにRSVERSという設定項目があるので、そこを編集することでコントロールできます。

自分の環境では5.4.2で動作確認できています。

新しいバージョンに変更した時はReShade updateにチェックを入れておけば、新しいバージョンを再インストールしてくれるはずです。

インストールが正しく完了していれば、ゲームのexeファイルと同じディレクトリにdxgi.dlld3dcompiler_47.dllというファイルが追加されているはずです。またReShadeの設定ファイルであるReShade.iniというファイルも一緒に追加されます。

本来であれば、それに加えてwinetricksかprotontricksという設定ツールを使って、DLLの読み込み設定を弄っておく必要があるのですが、steamtinkerlaunchが自動的に設定を調整してくれます。

これらをマニュアルでやるのは結構大変なので、steamtinkerlaunchが無いと大分面倒臭いことになります。

各シェーダーのインストール

steamtinkerlaunchの起動画面にあるGame Shadersという項目をクリックすると、自動的にGitHubで公開されているいくつかのシェーダーをDownloadし、ゲーム毎に利用するシェーダーを設定できます。有効にしたシェーダーはシンボリックリンクで各ゲーム毎のシェーダーディレクトリに配置されます。

インストールした後にゲームを起動したらデフォルトでHOMEキーを押すことで設定画面が開きます。

そこでシェーダーの有効/無効やパラメーターを調整できます。

ReShadeの設定には、結構分かりにくいところがあるんですが、ReShadeに関しては結構日本語の解説記事があるので、そちらを参考にするのが良いでしょう。

MangoHud

github.com

画面上にGPU利用率やCPU利用率、現在のFPSなどを表示してくれるツール。

こんな感じ。

vulkanのoverlayを利用しているので、vulkanを利用するものなら何でも使える。簡単にテストしたいならMANGOHUD=1 vkcubeとか実行してみましょう。

インストール自体は自分でやる必要がありますが、インストールした後に、steamtinkerlaunchで以下の項目にチェックを付ければ自動で起動してくれます。

~/.config/MangoHud/MangoHud.confを編集することで表示項目を調整できます。

GameScope

github.com

Xwaylandを経由してDRM/KMSを利用し、ダイレクトにゲームフレームを画面に描画するためのmicro-compositorです。

何のこっちゃって感じですが、フレーム描画のオーバーヘッドを減らしてくれるツールという認識で良いと思います。同じfpsでも普通に起動するよりヌルっと動く様になります。

こちらもインストール自体は自分でやる必要があります。steamtinkerlaunchのGAME MENUで以下の項目にチェックを付けると起動時に自動的にgamescopeを間に噛まして起動してくれます。

gamescope自体の設定はsteamtinkerlaunchのトップ画面にあるgamescopeという項目をクリックすると設定画面が開きます。

また、gamescopeにはAMD FidelityFX CASやnvidiaのDLSSを利用したアップスケーリングを対応していない任意のゲームに適用できる機能もあるんですが、自分の環境でそれを有効にすると終了時にamdgpuドライバがハングして死ぬという厳しい問題があったので、アップスケーリングは常用できませんでした。

使える環境であれば、好きなゲームを簡単にupscalingできる様になります。

obs-vkcapture

github.com

Vulkanの画面描画をフックすることで、低オーバーヘッドで画面キャプチャしてobs studioに取り込むことができる様になります。

ArchだったらAURがあるし、Gentooもguru overlayでインストールできるので割と簡単に導入できます。

インストールしたら、steamtinkerlaunchでEnable obs-gamecaptureにチェックを入れます。

obsの入力ソースでゲームキャプチャを選択すると画面の内容がキャプチャできます。 普通にX Window経由でやるより少ないオーバーヘッドでキャプチャできるし、どのWindowを対象にするかも選択しなくていい。

諸々有効にして起動したらこんな感じ。

www.youtube.com

ちなみにサイパン2077はエッジランナーズ効果もあってか、MODがめちゃくちゃ一杯あって、ほぼ別ゲームぐらいまで弄れるので色々楽しい。 ReShadeの設定presetも沢山あるので、ブレードランナーっぽくしたりとかして雰囲気を変えて楽しめる。

レイトレーシングをシェーダーでやると流石に30fpsぐらい落ちますが、それ以外のシェーダーは複数有効にしても数fpsぐらいのオーバーヘッドで済みます。

という訳で、Linuxでsteamゲームをやる時の必須ツールとも言える「steamtinkerlaunch」と関連ツールの紹介でした。

やっぱ、あんまり需要無いとは思いますが……。

今日からneovimでRubyの型(RBS)を書き始める方法 + 実際に書いてみた感想

しばらくRubyをあんま触ってない日々が続いてたんですが、オフラインでRubyKaigiに参加したKaigiEffectということでやる気が甦ってきたので、型を真面目に書くための準備を整えようと色々とやってました。

RubyKaigiでモダンなRubyの開発体験のデモをいくつか見たんですが、大体VSCodeだったのが生粋のVimmerである自分としては残念だったので、neovimでも色々やれるぞという環境を整えておきたかったのも一つです。

という訳で色々環境が整ったのでまとめていきます。

ちなみに、今回の題材はrbsとSteepによる型検査です。sorbetとかもありますが、自分としてはrbsの書式の方が圧倒的に好きなのでこちらでやっていきたいと思います。 (sorbetはRubyコードに直接書けるという大きなメリットはあるんだけど……)

Steepを動かす

まず対象のプロジェクトにSteepをインストールします。最新の環境に合わせたかったので、自分はGemfileでgithubのmasterが入る様にしています。 合わせて依存関係でrbsもインストールされます。

Steepとrbsの関係は過去に色々なカンファレンスで語られているので改めて自分が解説する様なことは特にありませんが、非常に簡単に書いておくとrbsRubyの型を書くための書式であるrbsフォーマットの文法定義やパーサを提供するgemで、それを使って型検査を実際に行うのがSteepです。

init

Steepをインストールしたら bundle exec steep initを実行します。これでSteepfileが生成されます。

コメントアウトされたサンプルが既に記述されているので、それを参考に書きましょう。

target :app do
  check "lib"
  signature "sig"

  library "set", "pathname"
end

SteepのREADMEに書かれているシンプルな例はこんな感じ。lib以下をsig以下にあるrbs情報を使って型検査する、という意味です。

標準の組込みライブラリに関してはrbs gemの中で型情報が定義されているものがあり、それを利用できます。libraryを使ってそれを指定することで型情報を引っ張ってこれます。

これで bundle exec steep check を実行すればとりあえずSteepは動くはずです。もし既存のプロジェクトに追加したら恐らく死ぬ程型エラーが出ますw

利用しているrubygemsの型情報

Rubyistがコードを書く時は普通は多くのrubygemsを利用します。それらの型情報はどう扱えばいいかというと、 https://github.com/ruby/gem_rbs_collection に型情報がまとめられています。 (まだ発展途上のためこれからコミュニティの力で型情報を充実させていく必要があります)

rbs gemが提供するrbsコマンドにはこのgem_rbs_collectionを利用するための仕組みが用意されています。まず以下のコマンドを実行します。

bundle exec rbs collection init

そうすると、rbs_collection.yamlというファイルが生成されます。これはBundlerでいうGemfileの様なものです。少し違う点として基本的にこのファイルは余り編集する必要がありません。次にgem_rbs_collectionの型情報をインストールします。

bundle exec rbs collection install

このコマンドでBundlerがインストールしているrubygemsの情報を自動的に検出して対応するgemのrbsを自動的にインストールしてくれます。 (存在するなら)

gem_rbs_collectionからインストールして欲しくない場合は、先程生成されたrbs_collection.yamlを編集することでignoreを指定することができます。

インストールが終わったらBundlerの様にrbs_collection.lock.yamlが生成されます。Gemfile.lockみたいなものですね。

Steepでgem_rbs_collectionを利用する

Steepはgem_rbs_collectionに対応しており、collection_config の引数にrbs_collection.yamlのファイル名を渡すことでgem_rbs_collectionからインストールしたrbsを認識します。以下の様に記述します。

target :app do
  check "lib"
  signature "sig"

  collection_config "rbs_collection.yaml"
end

これで、大体書くまでの事前準備は完了です。後はsigディレクトリ以下にゴリゴリと型を書いていくだけです。

neovimでrbsを書くために

実際型を書いていくと、逐一ターミナルでsteep checkとか実行してエラーを確認するのは面倒臭くなります。またrbsrubyコードの外側にあるためファイルの切り替えも頻繁に行うことになります。そのため、それらを支援し更に型を書くことで得られる恩恵を享受できる様にエディタを設定しておかないと、いまいち旨味がありません。

rbsファイルとの切り替え

rbsとの切り替えはファイル名のパターンでファイルを切り替えられるvim pluginが昔から色々あるので、それを設定しておくと良いでしょう。vim-altrとかother.nvimなどが利用できます。

自分は最近other.nvimを利用しています。設定内容は以下の様な感じです。(最近neovimのプラグインluaで書かれていることが多く設定もluaで行います)

      local rails_controller_patterns = {
        { target = "/spec/controllers/%1_spec.rb", context = "spec" },
        { target = "/spec/requests/%1_spec.rb", context = "spec" },
        { target = "/spec/factories/%1.rb", context = "factories", transformer = "singularize" },
        { target = "/app/models/%1.rb", context = "models", transformer = "singularize" },
        { target = "/app/views/%1/**/*.html.*", context = "view" },
      }
      require("other-nvim").setup({
        mappings = {
          {
            pattern = "/app/models/(.*).rb",
            target = {
              { target = "/spec/models/%1_spec.rb", context = "spec" },
              { target = "/spec/factories/%1.rb", context = "factories", transformer = "pluralize" },
              { target = "/app/controllers/**/%1_controller.rb", context = "controller", transformer = "pluralize" },
              { target = "/app/views/%1/**/*.html.*", context = "view", transformer = "pluralize" },
            },
          },
          {
            pattern = "/spec/models/(.*)_spec.rb",
            target = {
              { target = "/app/models/%1.rb", context = "models" },
            },
          },
          {
            pattern = "/spec/factories/(.*).rb",
            target = {
              { target = "/app/models/%1.rb", context = "models", transformer = "singularize" },
              { target = "/spec/models/%1_spec.rb", context = "spec", transformer = "singularize" },
            },
          },
          {
            pattern = "/app/services/(.*).rb",
            target = {
              { target = "/spec/services/%1_spec.rb", context = "spec" },
            },
          },
          {
            pattern = "/spec/services/(.*)_spec.rb",
            target = {
              { target = "/app/services/%1.rb", context = "services" },
            },
          },
          {
            pattern = "/app/controllers/.*/(.*)_controller.rb",
            target = rails_controller_patterns,
          },
          {
            pattern = "/app/controllers/(.*)_controller.rb",
            target = rails_controller_patterns,
          },
          {
            pattern = "/app/views/(.*)/.*.html.*",
            target = {
              { target = "/spec/factories/%1.rb", context = "factories", transformer = "singularize" },
              { target = "/app/models/%1.rb", context = "models", transformer = "singularize" },
              { target = "/app/controllers/**/%1_controller.rb", context = "controller", transformer = "pluralize" },
            },
          },
          {
            pattern = "/lib/(.*).rb",
            target = {
              { target = "/spec/%1_spec.rb", context = "spec" },
              { target = "/sig/%1.rbs", context = "sig" },
            },
          },
          {
            pattern = "/sig/(.*).rbs",
            target = {
              { target = "/lib/%1.rb", context = "lib" },
              { target = "/%1.rb" },
            },
          },
          {
            pattern = "/spec/(.*)_spec.rb",
            target = {
              { target = "/lib/%1.rb", context = "lib" },
              { target = "/sig/%1.rbs", context = "sig" },
            },
          },
        },
      })

      local wk = require "which-key"
      wk.register({
        ["<leader>o"] = {
          name = "+Other",
        },
      })
      vim.keymap.set("n", "<F3>", "<cmd>OtherClear<CR><cmd>:Other<CR>")
      vim.keymap.set("n", "<leader>os", "<cmd>OtherClear<CR><cmd>:OtherSplit<CR>")
      vim.keymap.set("n", "<leader>ov", "<cmd>OtherClear<CR><cmd>:OtherVSplit<CR>")

F3を押すとこんな感じでポップアップが出ます。すぐにrbsに移動できて便利です。

SteepのLanguage Serverをnvimで使う

RubyKaigi 2022のデモでも紹介されていましたが、SteepにはLanguage Serverが実装されていてエディタ上に直接型エラーを表示したり、メソッドの型シグネチャを補完候補に表示したりできます。VSCodeでデモが行われていましたが、neovimでも十分に実現可能です。多少設定が必要ですが、そのための流れを紹介していきます。

まず、以下のプラグインをインストールしましょう。インストール方法は各自好きなパッケージマネージャーを利用してください。私は最近はpacker.nvimを利用しています。

大体この辺りがあればOKです。Rubyに限らずその他のLanguage Serverも利用したい場合は、mason.nvimmason-lspconfig.nvimもインストールしましょう。これらはLanguage Serverのインストーラーとそれらをnvim-lspconfigを使って動かす際に設定しやすくしてくれるフック機構を提供してくれるプラグインです。

一応、Steepもmasonでインストールできるんですが、プロジェクトの外部にインストールするよりBundlerを使ってインストールした方が扱い易いので、私はRubyに関してはmasonは活用していません。

実際にSteepのLanguage Serverを利用する設定は以下の様になります。 (自分の設定から関係する箇所だけ抜き出したので、微妙に間違ってるかもしれない……)

vim.keymap.set("n", "[d", vim.diagnostic.goto_prev, {silent = true})
vim.keymap.set("n", "]d", vim.diagnostic.goto_next, {silent = true})

-- The nvim-cmp almost supports LSP's capabilities so You should advertise it to LSP servers..
local capabilities = vim.lsp.protocol.make_client_capabilities()
capabilities = require("cmp_nvim_lsp").update_capabilities(capabilities)

-- Use an on_attach function to only map the following keys
-- after the language server attaches to the current buffer
---@diagnostic disable-next-line: unused-local
local on_attach = function(client, bufnr)
  local bufopts = { noremap = true, silent = true, buffer = bufnr }
  vim.keymap.set("n", "gD", vim.lsp.buf.declaration, bufopts)
  vim.keymap.set("n", "gd", "<cmd>Lspsaga peek_definition<CR>", bufopts)
  vim.keymap.set("n", "gh", "<cmd>Lspsaga hover_doc<CR>", bufopts)
  vim.keymap.set("n", "gs", "<cmd>Lspsaga lsp_finder<CR>", bufopts)
  vim.keymap.set("n", "gi", vim.lsp.buf.implementation, bufopts)
  vim.keymap.set("n", "gr", vim.lsp.buf.references, bufopts)
  vim.keymap.set("n", "gt", vim.lsp.buf.type_definition, bufopts)
  vim.keymap.set("n", "<C-k>", vim.lsp.buf.signature_help, bufopts)
  vim.keymap.set("n", "<space>wa", vim.lsp.buf.add_workspace_folder, bufopts)
  vim.keymap.set("n", "<space>wr", vim.lsp.buf.remove_workspace_folder, bufopts)
  vim.keymap.set("n", "<space>wl", function()
    print(vim.inspect(vim.lsp.buf.list_workspace_folders()))
  end, bufopts)
  vim.keymap.set("n", "<space>rn", "<cmd>Lspsaga rename<CR>", bufopts)
  vim.keymap.set("n", "<F6>", "<cmd>Lspsaga rename<CR>", bufopts)
  vim.keymap.set("n", "<space>ca", "<cmd>Lspsaga code_action<CR>", bufopts)
  vim.keymap.set("v", "<space>ca", "<cmd><C-U>Lspsaga range_code_action<CR>", bufopts)
  vim.keymap.set("n", "<space>cd", "<cmd>Lspsaga show_line_diagnostics<CR>", bufopts)
  vim.keymap.set("n", "[e", "<cmd>Lspsaga diagnostic_jump_next<CR>", bufopts)
  vim.keymap.set("n", "]e", "<cmd>Lspsaga diagnostic_jump_prev<CR>", bufopts)
  vim.keymap.set("n", "[E", function()
    require("lspsaga.diagnostic").goto_prev({ severity = vim.diagnostic.severity.ERROR })
  end, bufopts)
  vim.keymap.set("n", "]E", function()
    require("lspsaga.diagnostic").goto_next({ severity = vim.diagnostic.severity.ERROR })
  end, bufopts)
  vim.keymap.set("n", "<space>cf", function()
    vim.lsp.buf.format({ async = true })
  end, bufopts)
end

local lspconfig = require "lspconfig"

-- SteepのLanguage Serverを起動するための設定
-- デフォルトの設定をいくつか上書きしている
lspconfig.steep.setup({
  -- 補完に対応したcapabilitiesを渡す
  capabilities = capabilities,
  on_attach = function(client, bufnr)
    -- LSP関連のキーマップの基本定義
    on_attach(client, bufnr)
    -- Steepで型チェックを再実行するためのキーマップ定義
    vim.keymap.set("n", "<space>ct", function()
      client.request("$/typecheck", { guid = "typecheck-" .. os.time() }, function()
      end, bufnr)
    end, { silent = true, buffer = bufnr })
  end,
  on_new_config = function(config, root_dir)
    config.cmd = {"bundle", "exec", "steep", "langserver"}
    return config
  end,
})

-- ここからnvim-cmpの補完設定
local cmp = require "cmp"
cmp.setup({
  sources = cmp.config.sources({
    { name = "nvim_lsp" },
    { name = "nvim_lsp_signature_help" },
  }),

  mapping = cmp.mapping.preset.insert({
    ["<C-e>"] = cmp.mapping.abort(),
    ["<CR>"] = cmp.mapping.confirm({ select = true }), -- Accept currently selected item. Set `select` to `false` to only confirm explicitly selected items.
  }),
})

基本的には他のLanguage Serverでもほとんど設定内容は変わらないのですが、Steep特有の設定としてはbundle execを経由する様にコマンド定義を上書きしていることと、$/typecheckという独自のメッセージをリクエストすることで型検査を再実行するためのキーマップを定義している所です。

動作サンプル

実際に手元のプロジェクトに型を書いてみてneovimで型検査をしているデモです。今日紹介したもの以外にも色々プラグインが入ってます。

型エラーが行の横に表示されていて、修正して再実行すれば消えているのが分かります。

また補完候補にシグネチャやドキュメントを出したり、ホバーで型定義を表示したりrbsをプレビューしたりも出来ます。VSCodeに負けてないなって感じがしますね。

型を書くためのドキュメント

基本はrbsリポジトリの方を見ましょう。細かい書式は https://github.com/ruby/rbs/blob/master/docs/syntax.md を見るのが良さそうです。

またSteepには独自の拡張があり、アノテーションという仕組みでローカル変数の型を明示することが出来ます。ドキュメントは https://github.com/soutaro/steep/blob/master/manual/annotations.md にあります。

実際に書いてみて

今回、試しに型を書いてみたのは、自分が昔作ったcrono_triggerというgemです。コードの規模はそれ程では無いですが、ActiveRecordを利用していてRailsの機能をバリバリ使っているgemです。現実のプロジェクトに近そうなので対象にしてみました。

実際にどういう風にSteepfileを設定して型を書いたのかは https://github.com/joker1007/crono_trigger/tree/steep にあります。 (まだブランチなのでその内マージして消えてるかも)

書いてみて感じたのは、Railsに型を付けるのはやはり中々容易ではないということを実感しました。

特にこのgemはActiveSupport::Concernを利用していて、ActiveRecord::Baseのサブクラスにincludeされることが前提になっているコードです。更に内部にClassMethodsモジュールなどがあり、そこではActiveRecord::Baseのシングルトンクラスで使えるメソッドの存在が前提になっています。

当然これらを自動的に判別することは現時点では出来ないため、どうにかして型定義を引っ張ってきたいのですが、現状だと良い方法がよく分かりません。特にシングルトンクラスのメソッド定義をgem_rbs_collectionから引っ張ってくる方法は現時点では多分無いんじゃないだろうか。

そのため、これらの型エラーがノイズになって非常に沢山出力されます。今回、gem_rbs_collectionを参照して型定義をコピーしてきて自分で定義することで型エラーを可能な限り削除し、SteepのDiagnosticモードを緩くするlenient presetを使ったり、どうにもならないファイルを無視することでノイズにならない程度に型エラーを削減することが出来ました。

今の実行結果はこんな感じ。

Detected 15 problems from 6 files

これなら役に立たせることは出来そうです。実際いくつかおかしいなと思う箇所を発見することができました。

しかし、現状のやり方で書いた型定義を、他のコードから参照した時にActiveRecord自体の型定義と衝突しないかとか、scopeで定義したメソッドで返す型をActiveRecord::Relationにするべきか_ActiveRecord_Relation[Model, Key]にするべきかとか、色々とよく分かっていない点があります。 またメソッドのシグネチャ自体もまだまだ発展途上です。

今回書いていて、自分でもすぐに組込みライブラリ用の型シグネチャが足りていないケースなどが見つかったので、プルリクエストを出したりしていました。こうやってコツコツとプルリクエストを出していくことでちょっとづつ充実させていくことが出来るでしょう。

しばらくRubyから離れていてあんまり活動できていなかったんですが、Kaigi Effectの結果またやる気が戻ってきてちょっとしたOSS活動に繋げることが出来たので、やはりRubyKaigiという現場は良いものだなと改めて感じました。

まだ多少ハードルはあると思いますが、型の恩恵を得るのに十分な準備は整いつつあると思います。RubyKaigiで登壇者の方々が言っていた様に、コミュニティの力は非常に重要だと思うので、とりあえず触ってみて皆でプルリク出していきましょう。

RubyKaigi 2022に現地参加してきた

今回、3年ぶりにオフライン開催されたRubyKaigi 2022に参加するために津まで行ってきました。今回はスポンサーでもスピーカーでもなく完全な一般参加者です。

(自分のTwitterは飯テロの画像なども含むため、センシティブなものを含む可能性があるチェックを付けているので画像を見る場合は自分で開いてくださいw)

久しぶりにオフラインの大きなイベントに参加し、3年以上ぶりにRubyコミュニティの皆と再開しました。皆マスクしてるから最初は「えっと、この人は……?」って感じで探り探りなところもありましたが、3日目ぐらいには大分慣れてきて、多くの人の健在を確認することができて、本当に楽しく過ごすことができました。

特に、WASMやYJITのめちゃくちゃ高度な話を聞いて「なるほどわからん」ってなった後に、参加者と感想を言い合いながら飲み会をやるのが、こんなに楽しいってことを完全に忘れていた自分に気付いて、失っていたものの大きさを改めて実感したRubyKaigiでした。

会場で一緒にTRICKを見てザワザワしてくる空気感とか、その場に居ることで楽しさが増すということは一杯ありましたね。

他にも、最近gRPCを使うかどうかを検討している中で、クックパッドさんのスポンサーブースでukstudioさんとgRPCを使ったマイクロサービスについて色々と質問して話し込んだりできたのも、オフラインカンファレンスが生んだ「廊下」のおかげだったなあと思います。

最後にチーフオーガナイザーの松田さんが、この場を再び失いたくはないから走り続けるよ、と言ってくれたことに非常にグッときました。 まだどうなるか分かりませんが、来年もあるぞという希望を持って過ごすことができるのはとてもありがたいことです。 最近、自分が全然Rubyに触らない生活をしているので、来年のRubyKaigiにもCFPに応募できるのかどうか怪しいところがあり、我ながら情けないなと感じるところはありますが、コミュニティの一員としてせめて参加ぐらいは出来るといいなあと思っています。

Kaigi後

結局、最終日朝まで飲んで、何とか3時間弱寝て、ヘロヘロになりつつ鳥羽に向かいました。 子供の頃に何度か行ったことがあった鳥羽水族館に行きたかったので。

鳥羽駅近くのサザエストリートで伊勢海老のお造りとサザエの壺焼きを食べました。

横に座った団体の方と少し雑談をしていたら味噌汁を奢ってもらって、二日酔い(というか最早当日酔い)でヘロヘロの自分にめちゃくちゃ染みました。最高ですね。

本当は体力が普通なら、水族館の後に伊勢神宮の方まで行ったり、伊勢角屋ビールが飲めるところまで行きたかったのですが、完全に体力の限界と、晴天による暑さで気持ち悪くなってきたので、諦めて昼過ぎにそのまま帰る電車に乗りました。 それでも十分楽しめたと思います。また戻ってきたい観光地が増えた。

帰宅したら、もうマジで体力が0で風呂入って即寝るしか出来なかった。一応、今日ある程度仕事している自分は偉いと思う……。

という訳で、今回のRubyKaigiも最高に楽しかった。Kaigiオーガナイザー、スタッフの皆様、難しい状況の中オフラインのカンファレンスイベントを開催してくれて本当にありがとうございます!!

ラストキーノートについて

余りに内容が分からん自分自身に割とムカついてきたので、どっかで自分なりに勉強して分かる範囲をどっかにまとめようかなと思ってます。まあブログにするかどうかは分かりませんが、やる気ある内にやらんとそのままになりそう。

とりあえず、CPUの動き方と機械語について復習して、LBBVの概念についても理解できるかどうかはさておき論文を眺めるぐらいはしてみよう。

neovimの棚卸をして、LSP対応やらファインダーの変更やら色々やった

最近Javaばかり書いてるもんでInteliJしか触ってなくて、neovimを放置してたのだが、やっと重い腰を上げて今風なエディタにするべく、全プラグインを見直しつつ設定を刷新した。

init.lua

まず、普通のvimと共通の設定を作るのを諦めて、設定ファイルをinit.luaに全面的に書き直した。今となってはneovim以外を使うことは余り考えられないし、それが使えない状態だったら素のvimでええわという感じで割り切った。

設定をinit.luaに移行する時にやらなければいけないのは基本的に以下の3つ。

  • set number みたいな設定値をvim.opt.number = trueという形に置き換える。set nonumberみたいなのはvim.opt.number = falseになる。
  • keymapの設定はvim.keymap.set('n', 'L', '<cmd>tabnext<CR>', {silent = true})の様に書く。これはnnoremap <silent> L :tabnext<cr>を意味する。
    • 動作を設定する箇所にluaのfunctionを直接マッピングすることも出来る。
  • letを利用した変数代入は、vim.g.hoge = "fuga"という形で設定できる。これはlet g:hoge = "fuga"を意味する。

もし、元々functionとかautocmdなどを使っていて設定が結構ややこしい場合は移行がかなり面倒臭くなる。 そういった時は最悪vim.cmd(string)の形で括ってしまえば、vimscriptがそのまま動く。s:スコープを使っているとスコープが切れてしまうので分かりにくくなるが、それ以外はそのままコピーすればどうにかなる。

その他の書き方については、nvim-lua-guide-ja/README.ja.md at master · willelz/nvim-lua-guide-ja · GitHub を見ると参考になる。

vimの中でhelpを見たい時は、:help api もしくは :help luaってやると使える関数のリファレンスが見つかる。

プラグイン管理の移行

元々、dein.vimを利用していたのだが、luaベースのconfigに置き換えるにあたって、packer.nvimに置き換えた。

github.com

最近のneovimのプラグインは、luaで書かれているものが多く、luasetupメソッドを読んでconfigurationを行うタイプの物が多い。 なので、プラグイン読み込み後のhookなどを直接luaで書けるpackerが都合が良いだろうと判断した。

多分、今のneovim界隈では一番利用者が多いプラグイン管理システムだと思う。

Fuzzy Finder

ファイルのFuzzy Finderもずっとdeniteを使っていたが、telescope.nvimに移行した。

github.com

動作が十分高速で、preview表示してくれて、デフォルトでMRUのfinderがあるのが良い。

後、telescopeの拡張として、 telescope-file-browserを入れている。

telescopeの画面の中でファイル作成したり、削除したりできる様になる。

しかし、telescopeには一個大きな不満があり、複数ファイルを選択して同時に一気にbufferに展開するということが出来ない。まあ、custom action書いて頑張れば出来そうな気はするのだが、何故これがデフォルトで出来ないのか良く分からない。

LSP対応

元々、LanguageClient-neovimを入れていたのだが、builtinのLSPクライアント(nvim-lsp)に変更した。 neovimのLSP関連の設定は結構ややこしいというか、一回覚えてしまえば、そんなに難しくないのだが、VSCodeほど何も考えんでもオッケーみたいな感じでは無い。

現状、nvim-lspを使う上で必須のプラグインがいくつかある。

nvim-lspconfigは、各種Language Serverのconfig presetが用意されているプラグインで、代表的なLanguage Serverならsetupメソッドを呼ぶだけで使える様になる。

mason.nvimはLanguage Serverやformatter, linterのインストーラーで、これも大体メジャーなLanguage Serverは網羅されている。nvim-lsp-installerはdeprecatedになったらしい。

mason-lspconfig.nvimは、masonでインストールしたLanguage Serverを認識して、setupを実行しやすくするためのhookを提供してくれる。

補完に関しては、 nvim-cmp を使うことにした。

補完ソースとしては以下のものを使っている。

  • cmp-path (ファイルパス補完)
  • cmp-buffer (開いているbufferからキーワード補完)
  • cmp-nvim-lsp (LSPから補完)
  • cmp_luanip (luasnipのスニペット展開候補を補完)
  • cmp-git (GitHubのメンションやissue番号の補完)
  • cmp-cmdline (neovimのコマンド入力時に自動補完が効く)

最終的には、こんな感じの設定でLSPを使った補完が効く様になる。

local cmp = require'cmp'
cmp.setup {
  snippet = {
    expand = function(args)
      require'luasnip'.lsp_expand(args.body)
    end
  },

  sources = cmp.config.sources({
    {name = 'nvim_lsp'},
    {name = 'nvim_lsp_signature_help'},
    {name = 'path'},
    {name = 'buffer'},
    {name = 'nvim_lua'},
    {name = 'luasnip'},
    {name = 'cmdline'},
    {name = 'git'},
  }),
}

-- The nvim-cmp almost supports LSP's capabilities so You should advertise it to LSP servers..
local capabilities = vim.lsp.protocol.make_client_capabilities()
capabilities = require('cmp_nvim_lsp').update_capabilities(capabilities)

local lspconfig = require('lspconfig')
require('mason-lspconfig').setup_handlers {
  function(server_name)
    lspconfig[server_name].setup {
      capabilities = capabilities,
    }
  end,
}

diagnostics

元々ale.vimを利用していたのだが、diagnostics表示もLSPにまとめてしまおうと思って、これも移行することにした。

LSPに対応していないlinterとかformatterの通知をLSPに対応させるラッパーみたいなLanguage Serverが存在していて、代表的なのがnull-lsefm-langserverである。 これらを使って任意のコマンドを実行し結果のフォーマットを調整することで、何でもLSPの上に乗せてエディタに表示させることが出来る。

自分はとりあえずnull-lsを入れてみた。設定は雑にこんな感じ。

require('null-ls').setup({
  capabilities = capabilities,
  sources = {
    require('null-ls').builtins.formatting.stylua,
    require('null-ls').builtins.diagnostics.rubocop.with({
      prefer_local = "bundle_bin",
      condition = function(utils)
        return utils.root_has_file({".rubocop.yml"})
      end
    }),
    require('null-ls').builtins.diagnostics.eslint,
    require('null-ls').builtins.diagnostics.luacheck.with({
      extra_args = {"--globals", "vim", "--globals", "awesome"},
    }),
    require('null-ls').builtins.diagnostics.yamllint,
    require('null-ls').builtins.formatting.gofmt,
    require('null-ls').builtins.formatting.rustfmt,
    require('null-ls').builtins.formatting.rubocop.with({
      prefer_local = "bundle_bin",
      condition = function(utils)
        return utils.root_has_file({".rubocop.yml"})
      end
    }),
    require('null-ls').builtins.completion.spell,
  },
})

rubocopなんかはsolargraphでもリポートしてくれるのだが、プロジェクトルートに.rubocop.ymlが無い時は動かさないとか、細かい設定が出来ないのでnull-lsで実行することにしている。

後は、表示を見易くするために、trouble.nvimを入れた。 workspace全体の警告とかをサクっと一覧できる。

statusbar

元々はairlineを使っていたが、lualine.nvimに乗り換えた。実際、あんまり変わってないw

github.com

その他プラグイン

元々使っていたプラグインの代替になる様なneovim向けのプラグインが見つかったら、軒並置き換えていった。

実際によく使うものをいくつか紹介する。

nvim-treesitter

tree-sitterというrustで書かれた汎用言語パーサがあり、それを使ってSyntax Highlightを行うプラグイン。 デフォルトのvimscriptによるハイライトよりもかなり細かく調整できて高速らしい。

github.com

ただ、これに対応しているColorSchemeを使わないとハイライトが完全に出ない。なので、入れる時には合わせて新しめのColorSchemeに移行するのが良い。

個人的に気に入ってるのは、 zephyrnightfoxあたり。

nvim-ts-rainbow

treesitterを使ってrainbow bracketをやってくれるプラグイン

github.com

vim-matchup

treesitterを使って、do endとかbegin endみたいなキーワードペアもハイライトしてくれる。

github.com

nvim-treesitter-endwise

autopairのdo end版。RuibyとかLuaで自動的にendを埋めてくれる。

github.com

nvim-autopair

対応する)とか]を自動で埋めてくれる。これもtreesitterに対応していて、treesitterのノードに合わせて有効無効が設定できる。

github.com

which-key.nvim

vim-which-keyのlua版。キーマップの入力途中で、次に何のキーを入力すると何が動くかをメニュー上に表示してくれるプラグイン

github.com

色々mappingを設定すると忘れるので便利。 ちゃんと設定するのは面倒だが、ノー設定でもそれなりに使える。

Comment.nvim

コメントのトグルをしてくれるプラグインのnvim版。nerd-commenterから移行した。別に元々困ってなかったが、treesitterに対応していてtreesitterのパーサ情報からコメント記法を取得してトグルしてくれるのがモダンな感じ。

github.com

nvim-web-devicons

とりあえず入れておくと、色々なプラグインがいい感じにアイコンを出してくれる。

github.com

vim-illuminate

カーソルが当たってるワードと同じワードに下線を引いてくれる。ちゃんとLSPやtreesitterを参照して言語構造を認識した上で下線を引いてくれる。

github.com

nvim-surround

surround.vimのneovim版。()とか[]とかをサクっと置換したりする時によく使う。surround.vimより良くなった点は、変更操作中の対象になっている()などがハイライトされる様になったこと。操作してる時に見た目が分かり易くなった。

github.com

gitsigns.nvim

gitgutterのneovim版。gitで変更があったらsignを行番号の横に表示してくれるやつ。gitgutterより高速だと思う。後virtual textでその行のblameを表示してくれる機能は地味に便利。

github.com

diffview.nvim

gitのdiffをいい感じに一覧で出してくれるプラグイン。diffをまとめて見るなら、fugitiveより見易い。

github.com

nvim-tree.lua

neovim内ファイラー。

github.com

indent-blankline.nvim

インデント表示を見易くするやつ。vim-indent-guidesから移行した。

github.com

他にも色々入れ替えたり追加したりしたのだが、とりあえずこんなもんにしておこう。 そんな入れ替えんでいいやろ、みたいなものもせっかくなんでガンガン棚卸して入れ替えたので、一気にモダンになったし、プラグインの動作が割と早くなった。

というか、一回やり出すと止まらなくなって、1週間ぐらいずっとneovimの設定を弄って遊んでいたのが本当の所だったりするw

Kafka入門 第2回 「Kafka Streamsを使ったストリーム処理アプリケーション開発」

以前にAWS Summitで似た様な話をしているので、こちらのブログとかも参考にしてください。

https://dev.classmethod.jp/articles/aws-summit-online-2020-cus-47/

今回の内容も、公式のドキュメントを全部読むなら大体書いてあることなので、既にある程度知っているという方は、後半の「StateStoreの実態」以降の部分だけを読んでもらえれば特に注意すべきことが書いてあります。

参考 https://docs.confluent.io/ja-jp/platform/6.2.0/streams/index.html https://docs.confluent.io/platform/7.0.1/streams/index.html

ストリームプロセッシングとは

イベントストリームに対して、データが到着するたびに処理を行い、それが延々と続いていく様なアプリケーションを指します。 Kafkaのエコシステムにおいては、Kafka BrokerにConsumerとして接続して、無限ループでpollingを行い、データが到達する度に短いインターバルで処理を行うアプリケーションのことです。

複数のレコードを使って集計処理を行うケースもあるが、ある一定期間ごとに集計ウインドウを区切った上で、一件ごとに集計結果をアップデートするか、バッファリングしておいて一定のインターバルをおいて実行する処理で集計を行う、といった形を取る。

ステートレスかステートフルか

ストリームプロセッシングの処理内容は大きくステートレスかステートフルかに分けられます。

ステートレスな処理とは、あるイベントレコードが到着した時に、そのレコードだけで処理が完結する処理のことです。 レコードAをレコードBの形に変換して別のトピックや別のデータストアに転送する処理などがその典型です。

一方、ステートフルな処理とは、到達したイベントレコードやそれを基に生成したデータを一定期間保持しておいて、それと組み合わせて結果を生成する処理のことです。 典型的なものでは、イベントの数を集計して合計や平均やヒストグラムを算出したり、処理効率のためにバッファリングしてまとめて処理したり、別のストリームやデータストアとデータを結合してデータエンリッチメントを行うといったものが挙げられます。

ステートフルな処理においては、一定量のレコードを格納しておくデータストアが別途必要になります。それをStateStoreと呼びます。ストリームプロセッシングにおけるStateStoreには重要な考慮ポイントがいくつか存在しますが、それについては後述します。

ステートフルアプリケーションにおけるデータストア

ストリームプロセッシングは、あるデータが到達したらその都度処理が行われるため、何らかのデータストアを利用すると、データが来る度にデータの取得や保存を行うことになる。

そのため重要な要素として低レイテンシであることが求められます。データ量によってはRedisの様なシンプルで高速なKVSであってもネットワークレイテンシが馬鹿にならないため、十分とは言えない場合があります。

そこでストリームプロセッシングでは、基本的に各処理ノードのローカルにデータストアを保持してレイテンシを低く保つという手法を採用します。

後述するKafka StreamsやApache Flinkなどのストリームプロセッシングフレームワークでは、RocksDBが採用されています。 RocksDBはアプリケーションに組込むタイプのKVSで、RocksDBを各ノードのローカルで動かすことで低レイテンシな状態保持を実現します。

各ノードのローカルで動かすということは、ノードが不慮の事故でダウンし復帰不能になった場合データが失われる危険性があることを意味します。 これを避けるためノードに依存しないデータ永続化の仕組みも別途必要です。この仕組みにおいてはネットワーク通信のオーバヘッドは避けられないため、ある程度のバッファリングが必要になります。

また、Kafkaを利用したストリーミングアプリケーションでは、スケーラビリティを確保するためにアプリケーションを分散して複数ノードで動作させることもよくあります。 そういったケースでは、ノード数の増減に応じて処理するパーティションの割り当てが変わる可能性があります。 第1回で解説した様にKafka Consumerは同一のConsumer Groupでは一つのパーティションを処理できるのは一つのクライアントだけです。台数が増減するとその割り当てが再配置されることになり、処理対象が別ノードに移ることを考慮しなければいけません。 処理が別ノードに移ると、それまでに各ノードのローカルで保持していたデータをどうにかして新しく処理が割り当てられたノードに移し替えないと、今迄に保持していたStateStoreのデータが利用できなくなります。例えばノードが増えた途端に集計済みのカウントがいきなり0に戻ったりすることになる訳です。 これを回避するために処理ノードの変遷に合わせてStateStoreを再配置する仕組みが必要不可欠です。

Kafka Streamsとは

Kafka StreamsはApache Kafkaの開発プロジェクトで公式に提供されているストリームプロセッシングフレームワークです。Javaで実装されています。

前段で解説した様にステートフルなアプリケーションを実装しようと思うと、その状態管理に結構複雑な仕組みが要求されます。 Kafka Streamsを利用するとフレームワークがこういった複雑さの面倒を見てくれます。またConsumerクライアントのpollingループや別のトピックにレコードを転送するためのProducerクライアントの集約なども面倒を見てくれるため、ステートレスなアプリケーションであっても大幅にコードを削減できます。

基本的にはここで説明することは Confluent社の提供しているドキュメントに概ね書かれていることになります。 少し古いですが日本語のドキュメントもあるので、詳細を知りたい方はそちらを参照して隅々まで読み込むのが良いでしょう。

https://docs.confluent.io/ja-jp/platform/6.2.0/streams/concepts.html

主な特徴

Kafkaの開発で主要な役割を担っているConfluent社のドキュメントによると以下の様な特徴があります。

機能 - アプリケーションに高い拡張性、弾力性、分散性をもたらし、フォールトトレラントを実現 - 「厳密に 1 回」の処理セマンティクスをサポート - ステートフル処理とステートレス処理 - ウィンドウ、結合、集約を使用したイベント時処理 - ストリームとデータベースの世界を 1 つにする Kafka Streams の対話型クエリ をサポート - 宣言型で関数型の API と、下位レベルの 命令型の API を選択でき、高い制御性と柔軟性を実現

軽量である - 導入へのハードルが低い - 小規模、中規模、大規模、特大規模のいずれの事例にも対応 - ローカル開発環境から大規模な本稼働環境にスムーズに移行 - 処理クラスターが不要 - Kafka 以外の外部依存関係がない

cf. https://docs.confluent.io/ja-jp/platform/6.2.0/streams/introduction.html

この中でも私が最も重要だと思うポイントが、Kafka以外の外部依存関係がない、という点です。こういった分散処理フレームワークの場合、YARNの様なリソースマネージャーやその他のデータストアが必要になるケースもしばしばありますが、Kafka Streamsにはそれが必要ありません。 これは導入のハードルを大きく下げてくれます。

Tutorialコード

Kafka Streamsでアプリケーションを書くには以下の様なコードを書きます。 (公式ドキュメントからの引用を少し改変したものです)

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.kstream.StreamsBuilder;
import org.apache.kafka.streams.processor.Topology;

// Use the builders to define the actual processing topology, e.g. to specify
// from which input topics to read, which stream operations (filter, map, etc.)
// should be called, and so on.  We will cover this in detail in the subsequent
// sections of this Developer Guide.

StreamsBuilder builder = new StreamsBuilder();
builder
    .stream(inputTopic, Consumed.with(stringSerde, stringSerde))
    .peek((k,v) -> logger.info("Observed event: {}", v))
    .mapValues(s -> s.toUpperCase())
    .peek((k,v) -> logger.info("Transformed event: {}", v))
    .to(outputTopic, Produced.with(stringSerde, stringSerde));
Topology topology = builder.build();
//
// OR
//
topology.addProcessor(...); // when using the Processor API

// Use the configuration properties to tell your application where the Kafka cluster is,
// which Serializers/Deserializers to use by default, to specify security settings,
// and so on.
Properties props = ...;

KafkaStreams streams = new KafkaStreams(topology, props);
streams.start(); // non blocking

この様にKafka StreamsはDSLを繋げたりProcessor APIと呼ばれる特定のインターフェースに則ったJavaクラスを実装するだけで、Kafkaに接続するConumerやProducerの構築、処理スレッドの立ち上げなどの面倒を見てくれます。

Kafka Streamsのアプリケーションモデル

Kafka StreamsはSource Node, Processor Node, Sink Nodeを組み合わせてTopologyという単位を作ることでアプリケーションを構築します。 それぞれどういう意味を持つのか説明していきます。

Topology

Topologyという単語はベースとしては数学用語で位相幾何学を指します。私は数学の徒ではないので詳しくは分かりませんが、物の形状が持つ性質に焦点を当てた学問の様です。 そこから派生してネットワーク・トポロジーというネットワークグラフがどういった構成をしているかの形状パターンを指す用語があります。 Kafka StreamsにおけるTopologyはこのネットワーク・トポロジーと同じものと考えて良いと思います。

Kafka Streamsは特定の役割を持ったノードを接続してグラフ構造を構築しTopologyを形成して動作します。 このTopologyは2つ以上のサブトポロジーによって形成されている場合があります。

streams-architecture-topology.jpg (182.2 kB) (Confluent Documentationから引用)

この様なグラフ構造を1つ以上保持するオブジェクトがKafka StreamsにおけるTopologyです。

2つ以上のサブトポロジーがどうやって生成されるかというと、一つのアプリケーションが複数のソーストピックからデータを取得する様に定義されていて、またそのソーストピックに依存しているProcessor NodeやSink Nodeに直接依存関係が無い場合です。 つまりSource Nodeから辿って一つのグラフが構築される様に依存関係が繋がっている範囲が、一つのサブトポロジーです。 上記の例ではSource Nodeが複数ありますが、下のProcessorが両方のSource Nodeに依存しているためグラフが一つに繋がっているのが分かります。こういった場合はサブトポロジーは1つになります。 もし、左側のSource Nodeがその他の全体と独立してどことも繋がっていないProcessor Nodeにだけ依存している場合は、2つのサブトポロジーが出来ることになります。

Source Node

Kafkaトピックからデータを取得する処理を行うノードをSource Nodeと呼びます。 Kafka Streamsには3種類のデータの取得パターンが存在します詳細はDSLの項で解説します。 上記のサンプルコードのDSLではstreamがそれにあたります。

Processor Node

Source Nodeから受け取ったデータを実際に処理するノードを指します。 実際に開発者が実装するのは、ほぼこのノードになります。 Processor Nodeは複数繋げることが出来ます。処理行程を分割することで見通しの良い実装が可能になります。 上記のサンプルコードのDSLではpeekmapValuesがそれにあたります。

Sink Node

別のKafkaトピックにデータを送信する処理を行うノードを指します。 上記のサンプルコードのDSLではtoがそれにあたります。

この様にSource Nodeが何らかのトピックからデータを取得し、Processor Nodeで何らかの処理を行いデータを加工したり一時的に保存したり別のデータストアに書き出したりして、そして必要であれば、そのデータを更に別のトピックに書き出して他の処理に繋げていく。 Kafka Streamsのアプリケーションはこうやって構築されます。

Task

Kafka Streamsは当然複数のマシンで分散処理が出来る様にデザインされています。 Kafka Streamsではサブトポロジーがpolling対象としているソーストピックのパーティション数に対応したタスクという単位でクライアントに処理を割り当てます。

例えば、トポロジーAの中にサブトポロジーA-1とサブトポロジーA-2があり、サブトポロジーA-1がトピックB-1をサブトポロジーA-2がトピックB-2を参照しているとします。 この時、トピックB-1のパーティション数が8でトピックB-2のパーティション数が10であった場合、1_0 〜 1_7というタスクと、2_0 〜 2_10というタスクが作成されます。タスクの総数は18です。 この18タスクを現在Consumer Groupに所属しているクライアントに割り当てます。割り当てのストラテジーにはいくつかパターンがありますが、そういった動作の詳細は次回以降で解説します。

DSL

上記で解説したTopologyを簡単に構築するために、Kafka Streamsには便利なDSLが用意されています。 DSLを使うことで、Lambdaをメソッドチェーンで繋げる様なインターフェースでストリームアプリケーションを簡単に書くことができます。

これに関しては公式のドキュメントを見る方が明らかに早いので、全体を知りたい方はこちらを参照してください。 日本語版は翻訳が追い付いていないせいか結構古いので最新の機能を知りたい場合は英語版を参照してください。

https://docs.confluent.io/ja-jp/platform/6.2.0/streams/developer-guide/dsl-api.html (ja) https://docs.confluent.io/platform/7.0.1/streams/developer-guide/dsl-api.html (en)

以降ではこの後の解説に必要な特に重要なDSLについてのみ解説していきます。

KStream, KTable, GlobalKTable

Kafka StreamsのDSLではトピックからデータを取得する方法として3つのパターンを提供しています。 これら3つの入力を表すオブジェクトに対してメソッド呼び出しを繋げることでDSLを記述します。

KStream

単純なKafkaトピックからの入力ストリームを指します。

StreamsBuilder builder = new StreamsBuilder();

KStream<String, Long> wordCounts = builder.stream(
    "word-counts-input-topic", /* input topic */
    Consumed.with(
      Serdes.String(), /* key serde */
      Serdes.Long()   /* value serde */
    );

この様に入力を指示する時にSerde(Serializer/Deserializer)を指定します。(実際の入力ではDeserializer側しか利用されないが)

このSerdeを通して、Kafka StreamsはJavaオブジェクトとバイト配列を自動的に変換します。

KTable, GlobalKTable

テーブルの様に扱える入力ストリームを指します。 テーブルの様に扱えるというのは、読み込み始めから現在に至るまでのKafka Brokerに入力された全レコードをローカルに保持していて、他のストリームと結合したりKVSの様に利用できることを意味します。

これは入力レコードの状態を保持しているということでもあり、KTableを使うということはステートフルなアプリケーションである、ということでもあります。

KTableは各パーティションの内容は割り当てられたクライアントにしか保持されませんが、GlobalKTableはあるトピックの全パーティションのデータを全ノードが保持し続けます。そのため入力トピックのデータ量に気を付けて利用する必要があります。

GlobalKTable<String, Long> wordCounts = builder.globalTable(
    "word-counts-input-topic",
    Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as(
      "word-counts-global-store" /* table/store name */)
      .withKeySerde(Serdes.String()) /* key serde */
      .withValueSerde(Serdes.Long()) /* value serde */
    );

KTableにはストア名とストアするデータのSerdeが必要です。 このストア名で各ノードにはRocksDBのデータベースファイルが作成され、そこにデータが蓄積されます。 このデータストアをStateStoreと呼びます。 上記の例にあるMaterialized.asというメソッドを利用してストア名やSerdeを切り替えたり、その他引数の渡し方でインメモリのデータストアを使う様に変更することも可能です。

StateStoreに保持されたデータは(インメモリストアでない限りは)プロセスが再起動したとしても維持されますが、パーティションの割り当てが変わって担当するタスクが切り替わると不要になったり、新しく再作成が必要になる場合があります。 不要になったRocksDBのデータは一定期間の後に自動的に削除されます。

GroupBy, GroupByKey

KStreamやKTableを集計したい時にどのキーを基にして集計するかを指示するDSLです。countやaggregateの前提として呼び出しておく必要があります。

KGroupedStream<String, String> groupedStream = stream.groupBy(
    (key, value) -> value,
    Grouped.with(
      Serdes.String(), /* key (note: type was modified) */
      Serdes.String())  /* value */
  );

groupByを利用すると必ずデータの再パーティションが行われます。再パーティションは、再パーティション用のトピックが自動的に作成され、そのトピック宛にレコードを送信して、再度取得するという処理を行います。 ネットワークやストレージに冗長な負荷がかかるため、再パーティションは極力避けるべきものです。

パーティション化を避けるためにはgroupByKeyを利用します。

KGroupedStream<String, String> groupedStream = stream.groupByKey(
    Grouped.with(
      Serdes.String, /* key */
      Serdes.String())     /* value */
  );

groupByKeyは現在のキーをそのまま使ってグループ化を指示します。この場合は処理の過程でキーが変更されていない限りは再パーティションは行われません。mapなどの変換処理によってキーが変更されている場合はgroupByKeyを使っても再パーティションが実行されてしまいます。

aggregate

DSLの中では最も汎用的な集計処理を実装できるDSLです。 イニシャルの値を生成するイニシャライザ、新しくレコードが届いた時に実行されるアダー、変更前のレコードの情報を利用して実行されるサブトラクター(省略可能)の3つのラムダを渡して集計処理を実装します。

KTable<byte[], Long> aggregatedTable = groupedTable.aggregate(
    () -> 0L, /* initializer */
    (aggKey, newValue, aggValue) -> aggValue + newValue.length(), /* adder */
    (aggKey, oldValue, aggValue) -> aggValue - oldValue.length(), /* subtractor */
    Materialized.as("aggregated-table-store") /* state store name */
    .withValueSerde(Serdes.Long()) /* serde for aggregate value */
);

集計結果を処理ノードのローカルにずっと維持していなければ、次のレコードが届いた時に今の集計値が何なのかを取得できません。そのため、集計処理は必然的にKTableになります。集計処理した結果はDSLが自動的に構築したStateStoreに格納され保持されます。

StateStoreの実態については色々と解説しなければならない要素があるため、この後でまとめて解説します。とりあえず結果がStateStoreという場所に格納されている、ということを覚えておいてください。

windowedBy

一定の時間ごとに集約範囲を区切ることができるDSLです。この機能を利用することで、5分ごとの合計を算出し続けたり、一回のセッションの中に含まれるイベントの数を数えたりといったことが可能になります。 window処理は時間軸に依存した処理なので、レコードのタイムスタンプと保持期間が重要になります。 場合によっては古いタイムスタンプのレコードが遅れて到着することもあり得ますし、古い集計データをいつまでも保持していたらデータが無限に増えてストレージを圧迫します。 そういった状況に対応するため、window処理には保持期間を別途設定できる様になっています。

// Aggregating with time-based windowing (here: with 5-minute tumbling windows)
KTable<Windowed<String>, Long> timeWindowedAggregatedStream = groupedStream.windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
    .aggregate(
      () -> 0L, /* initializer */
      (aggKey, newValue, aggValue) -> aggValue + newValue, /* adder */
      Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("time-windowed-aggregated-stream-store") /* state store name */
        .withValueSerde(Serdes.Long())); /* serde for aggregate value */

ウインドウ処理にはその区切り方によって、Tumbling Window, Sliding Window, Hopping Window, Session Windowと現時点で4つの種類があります。 割とややこしいため、 https://docs.confluent.io/ja-jp/platform/6.2.0/streams/developer-guide/dsl-api.html#windowing を参照することをオススメします。

Processor API

Kafka Streamsのlow level APIにあたるもので、上記のDSLは全てこのProcessor APIで実装されています。

端的に言えば、Processor APIは単なるクラス定義として利用できます。 initメソッドにProcessorContextというオブジェクトが与えられるので、それを利用してKafka Streamsの情報を入手したり、StateStoreへの参照を取得したり、後続の処理にデータを送ったりすることができます。 詳細は、 https://docs.confluent.io/ja-jp/platform/6.2.0/streams/developer-guide/processor-api.html を読んでください。

処理の実態はprocessメソッドに実装する様になっており、レコードが到達する度にこの処理が呼ばれます。 また、一定時間ごとにスケジュール処理を行うこともできます。 ProcessorContextl#scheduleメソッドでPunctuatorと呼ばれる処理を登録することで一定期間ごとに自動的に実行できます。 実行間隔のタイムスタンプをどう扱うかについてKafka StreamsはSTREAM_TIMEとWALL_CLOCK_TIMEの2種類を提供しています。それぞれの違いについては上記リンクに詳細がありますので、そちらを参照してください。

以下は、上記のドキュメント(英語版の最新)から引用したコードサンプルです。 重要な箇所はStateStoreを取得している箇所と、スケジュール処理を登録している箇所、そしてcontext.forwardを呼び出して後続にレコードを渡している部分です。

public class WordCountProcessor implements Processor<String, String, String, String> {
    private KeyValueStore<String, Integer> kvStore;

    @Override
    public void init(final ProcessorContext<String, String> context) {
        context.schedule(Duration.ofSeconds(1), PunctuationType.STREAM_TIME, timestamp -> {
            try (final KeyValueIterator<String, Integer> iter = kvStore.all()) {
                while (iter.hasNext()) {
                    final KeyValue<String, Integer> entry = iter.next();
                    context.forward(new Record<>(entry.key, entry.value.toString(), timestamp));
                }
            }
        });
        kvStore = context.getStateStore("Counts");
    }

    @Override
    public void process(final Record<String, String> record) {
        final String[] words = record.value().toLowerCase(Locale.getDefault()).split("\\W+");

        for (final String word : words) {
            final Integer oldValue = kvStore.get(word);

            if (oldValue == null) {
                kvStore.put(word, 1);
            } else {
                kvStore.put(word, oldValue + 1);
            }
        }
    }

    @Override
    public void close() {
        // close any resources managed by this processor
        // Note: Do not close any StateStores as these are managed by the library
    }

これをKafka Streamsの中に組込むにはTopologyクラスのコンストラクタ、またはStreamBuilder#buildで生成されたtopologyに対してTopology#addProcessorを呼び出すことで、任意のProcessorクラスを差し込むことができます。 また、この時利用したいStateStoreがあればその名前を渡して関連付けておきます。そうでないとProcessor APIからStateStoreを見つけることが出来ません。その場合は例外が発生します。

Topology topology = new Topology();

// add the source processor node that takes Kafka topic "source-topic" as input
topology.addSource("Source", "source-topic")

    // add the WordCountProcessor node which takes the source processor as its upstream processor
    .addProcessor("Process", () -> new WordCountProcessor(), "Source")

    // add the count store associated with the WordCountProcessor processor
    .addStateStore(countStoreSupplier, "Process")

    // add the sink processor node that takes Kafka topic "sink-topic" as output
    // and the WordCountProcessor node as its upstream processor
    .addSink("Sink", "sink-topic", "Process");

StateStoreの実態

公式で提供されているStateStoreは大きく分けてRocksDBを利用したpersistentストアとプロセスが終了すると揮発するインメモリストアの二種類ありますが、一般的によく使われるのはpersistentの方だと思います。まず共通の要素について解説していきます。 ちなみに、APIの形式に則って自分で実装してしまえば、独自のStateStoreを定義することも可能です。

StateStoreをlow level APIで利用する場合は自分でStateStoreを構築することができます。

StoreBuilder countStoreBuilder =
  Stores.keyValueStoreBuilder(
    Stores.persistentKeyValueStore("persistent-counts"),
    Serdes.String(),
    Serdes.Long()
  );

これをProcessor APIで利用したい時には以下の様な形で利用します。

KeyValueStore<String, Long> counterStore = context.getStateStore("persistent-counts");
long count = counterStore.fetch("key");
count++;
counterStoreB.put("key", count);

この例では単純なキーの完全一致による取得を行っていますが、Kafka Streamsが提供しているStateStoreはキーのバイト列でソートされているため、rangeメソッドを使って一定の幅のキーを取得することもできます。

StateStoreにおいて重要な要素は以下の通りです。

  • KVSであり、キーの順番でソートされている
  • キーもバリューも実際に格納される値はバイト列であり、アプリケーションとやり取りする際にはSerialize/Deserializeを行う
  • changelogトピックを利用して耐久性を担保する
  • 読み書きに関するcacheの仕組みを持っている

changelogトピック

先程軽く触れましたが、StateStoreを利用すると自動的にchangelogトピックというものがKafka Broker上に作成されます。(インメモリストアの場合はデフォルト無効)

changelogトピックはStateStoreに書き込まれた変更内容を保持しているトピックです。StateStoreにデータが書き込まれたり削除されたり(nullを書き込む)する度に、changelogトピックにレコードが送信されます。

Kafka Streamsではこのchangelogトピックを使って処理ノードのダウンやタスクの再割り当てに対応します。

新しくKafka Streamsのプロセスが立ち上がった時、もしそのアプリケーションがStateStoreを利用する様に定義されていて、かつそのローカルストレージにStateStoreのデータが存在しない場合、Kafka Streamsはレストア処理を行いKafka Broker上に存在するchangelogトピックから全てのレコードを取得し、ローカルのStateStoreを復元します。

同様にノードの増減によってリバランスが実行され、タスクが再割り当てされた時にも、自身のローカルに割り当てられたタスクIDに対応するStateStoreが存在しなければ、まずレストア処理が走ってから処理が継続されます。

Kafka Brokerはproductionで運用する場合普通であれば3つ以上のレプリカを持つ様に設定されており、データ自体の耐障害性はKafka Brokerの機能によって担保する様になっています。

レストア処理の注意点

StateStoreを活用する上で最も注意しておかなければならないことはレストア処理です。

Kafka Streamsはレストアが必要になった時点で処理を停止します。処理が開始・再開されるのはレストア処理が完了した後です。 つまり、StateStoreに溜まっているデータ量が非常に多くなると、レストア処理にかかる時間も長期化していき、いざノードがダウンしたり増減したりする時に、長時間処理が停止する危険が生じます。

これを避けるのは現状かなり難しい問題です。一定以上古いデータや利用頻度の低いデータはネットワークのオーバーヘッドを許容して外部ストアに逃がすなど、データ量を削減する仕組みをコツコツ積み重ねるしかありません。

changelogの無効化

StateStoreを作成する時、StoreBuilderのwithLoggingDisabled()を呼び出しておくと、changelogとの対応関係を無効化することができます。インメモリストアの場合はデフォルトで無効になっています。

changelogトピックが無効になっているとレストア処理が実行されません。RocksDBストアを利用している場合、同じノードで同じIDのタスクが実行されている限りはそのデータが保持され続けます。 一方で、Consumerのリバランスによりタスクの再配置が行われた場合や、処理ノードがダウンして復帰不可能になった場合は、そのデータは消滅します。

主な用途はローカルキャッシュです。メモリに乗り切らない量でもファイルシステムを利用してノードローカルなキャッシュとして利用することが出来ます。

例えば、不変のデータであれば外部のデータストアに蓄積しておいて、最初の一回はネットワークアクセスのオーバーヘッドが発生するが、その後はローカルのファイルシステムにキャッシュしてネットワークを経由したアクセスを回避する、といった用途に利用できます。

StateStoreのキャッシュ

ストリーム処理はその性質上短期間に同じキーのデータを何度も読み書きする可能性が高いものです。例えば、あるuser_idのリンクを踏んだ数などをカウントしている場合、そのイベントレコードが集中して何件も届くというのは十分あり得ることです。 こういった時に、都度ストレージにデータを書き込んでchangelogトピックを更新するのは無駄なオーバーヘッドに繋がる場合があります。

Kafka Streamsではそれを避けるためにデフォルトでキャッシュが有効になっています。書き込んだ内容は一定期間メモリ上に保持され、flushタイミングで実際のStateStoreに書き込みchangelogを更新します。

DSLで構築されるKTableではこのキャッシュ機能を利用してforwardが遅延されます。 例えばaggregateで集約された結果を次のDSLで利用したり別のtopicに送信したりする際には、デフォルトの挙動では一定期間StateStore上にキャッシュされてそこでバッファリングされます。一定時間の経過かキャッシュ用のメモリ領域を使い切ったら、CacheFlushListenerがレコードを次のProcessorにforwardします。

WindowStoreの実態

先程DSLの説明の中でwindowByというDSLについて紹介しました。このDSLを利用することで一定の時間ウインドウごとにレコードを集計することが出来ます。

実はこれはWindowStoreというStateStoreのバリエーションによって実装されています。 WindowStoreを作成する時は以下の様にします。1分毎のウインドウ幅で、2時間分のデータを保持するWindowStoreは以下の様に作成します。

StoreBuilder<WindowStore<String, Long>> counterStoreByMinute =
      Stores.windowStoreBuilder(
              Stores.persistentWindowStore(
                  "counter",
                  Duration.ofHours(2),
                  Duration.ofMinutes(1),
                  false),
              Serdes.String(),
              Serdes.Long());

これをProcessor APIで利用したい時には以下の様な形で利用します。

WindowStore<String, Long> counterStoreByMinute = context.getStateStore("counter");
long durationMillis = Duration.ofMinutes(1).toMillis();
long currentWindowStart = (context.timestamp() / durationMillis) * durationMillis;
long count = counterStoreByMinute.fetch("key", windowStart);
count++;
counterStoreByMinute.put("key", count, currentWindowStart);

キーバリューストアにタイムスタンプを合わせて付与する形で取得や保存を行います。 DSLの裏側にある場合は自動でtimestampからTime Windowを設定してくれますが、WindowStoreを直接扱う場合は、TimeWindowの開始となるタイムスタンプがどこかは手動で算出する必要があります。 WindowStoreに事前に与えられたWindow Durationは、記録されたタイムスタンプを基準にしてどれぐらいの長さのウインドウなのかを示す際に使われます。

上記の例は非常にシンプルな形でしたが、timestampの開始と終了時刻を渡すことで、その範囲に含まれるレコードを順番に取得することもできます。 また、通常のKeyValueStoreと同様にキーのレンジ探索とtimestampの開始と終了時刻のレンジ探索を組み合わせることも出来ます。

内部実装としては、WindowStoreは複数のRocksDBのストアから成り立っていて、記録するタイムスタンプごとに大まかなsegmentに分割されています。保存する時にtimestampから対応するsegmentを特定しその領域にデータを書き込みます。 取得する時は対象範囲のsegmentからのみデータを取得することで探索範囲を小さくしています。 またtimestampの範囲でsegmentが決定できるため、現在処理しているタイムスタンプと比較して保持期限を過ぎたものは、まとめてファイルごと削除することで簡単に破棄できます。

書き込み時にはレコードに与えられたキー(上記の例では"key"文字列)の末尾にミリ秒単位のunix timestampが自動的に付与されて記録されます。

次回に続く

非常に長くなったので、一旦ここらで区切って続きはまた次回とさせていただきます。

次回はDSLでは実現できないがProcessor APIでなら実現可能な応用編と、実際にアプリケーションを開発する時の基本的な考え方について解説する予定です。

Kafka入門 第1回 「そもそもKafkaとはなにか」

これは社内向けに書いた、Kafkaってそもそも何やねん、ということをメンバーに解説するための記事を一部編集して公開できる様にしたものです。 第2回以降では、Kafkaを利用したアプリケーション開発のノウハウについて解説していく予定です。そちらも社内の事情を除いた形で公開していくつもりです。

そもそもKafkaとは

Kafkaはイベントストリーミングプラットフォームと呼ばれるミドルウェアです。 元々はストリームバッファと呼ばれてたと思います。

公式のドキュメントには以下の様に書かれています。

 Kafka combines three key capabilities so you can implement your use cases for event streaming end-to-end with a single battle-tested solution:

    To publish (write) and subscribe to (read) streams of events, including continuous import/export of your data from other systems.
    To store streams of events durably and reliably for as long as you want.
    To process streams of events as they occur or retrospectively.

And all this functionality is provided in a distributed, highly scalable, elastic, fault-tolerant, and secure manner.

つまり、イベントストリームを書きこんだり読み込んだり出来て、継続的に他のシステムからimportしたりexportしたりできて、それを一定期間(または無期限に)保持し続けることが出来て、発生した時に処理するだけでなく遡って過去のデータを処理することもできるプラットフォームであるということです。 加えて、分散処理可能で、スケーラブルであることも特徴です。

プラットフォームの構成要素は以下の通りです。

  • Kafka Broker ミドルウェアの本体
  • Kafka Client ミドルウェアと通信するためのクライアントライブラリ
  • 上記クライアントによって実装された管理コマンド群
  • Kafka Streams and Kafka Connect アプリケーション開発のためのフレームワーク

イベントストリームとは

終端が存在しない継続的なデータの流れのことです。

システム的にはデータベースやモバイルデバイス等によって発生したデータの動きをリアルタイムにキャプチャし続けたデータの流れと言えます。 一つ一つのデータの発生や変化をイベントと呼び、それらが終端なく延々連なっているものをイベントストリームと呼びます。 このイベントストリームに対して高速に処理を行うことができれば、我々のシステムは、ユーザーに対して即座に最新の結果を届けることが出来る様になります。

イベントストリームには終端が存在しない上にその目的上リアルタイムに近いインターバルで処理することが求められるため、基本的には一件単位で処理を行うことになります。 集計などの複数レコードに渡る処理については、開発者自身で必要なデータの範囲を定義して情報を蓄積しておくストレージを別途用意する必要があります。

Kafkaのエコシステムにはそういったストリーム処理で現実のユースケースを実現するために必要なライブラリも用意されています。

Kafkaでは複数のイベントストリームを扱うことができ、それをトピックと呼びます。トピックは名前を付けて管理できる様にしたイベントストリームということです。

トピックとパーティション

各トピックは1つ以上のパーティションによって分割されています。 また、トピックにはreplication factorが設定されており、各パーティションの複製がどれだけ存在するかを設定できます。replication factorが3であれば、あるパーティションのデータの複製がクラスタ内部に3つ存在するということです。

あるパーティションのあるレプリカは一つのノードによって保持されます。

例えば、TopicAのパーティション数が8でreplication factorが3であった場合、クラスタに所属している各ノードに合計24のパーティションのデータが分散して配置されます。

kafkacat (kcat) コマンドによる出力は以下の様になる。ノード数は5台。

  topic "TopicA" with 8 partitions:
    partition 0, leader 5, replicas: 5,3,1, isrs: 1,5,3
    partition 1, leader 3, replicas: 3,4,2, isrs: 4,2,3
    partition 2, leader 1, replicas: 1,4,2, isrs: 1,4,2
    partition 3, leader 4, replicas: 4,2,5, isrs: 4,2,5
    partition 4, leader 2, replicas: 2,5,3, isrs: 2,5,3
    partition 5, leader 5, replicas: 5,2,3, isrs: 2,5,3
    partition 6, leader 3, replicas: 3,5,1, isrs: 1,5,3
    partition 7, leader 1, replicas: 1,5,3, isrs: 1,5,3

トピックに保持するデータの一行一行をレコードと呼び、レコードはキーとバリューのペアで構成されます。 KafkaのBrokerにとってはキーとバリューはただのバイト列であって、それがどういう型のデータかはクライアントによって解釈されます。 例えば、JSONシリアライズされたデータである場合や、AvroやProtocolBufferによってバイナリシリアライズされたデータである場合、ただの文字列である場合、バイト列をそのまま数値として利用する場合、などシリアライズ方式に依存します。

何故Kafkaが必要なのか

Kafkaは非常に応用範囲が広いため一言で説明するのが難しいが、以下の様なユースケースがあります。

  • システム疎結合化やスケーラビリティ向上のためのメッセージブローカー
  • Webシステムの行動トラッキング
  • システム負荷などをモニタリングするためのメトリックの記録
  • ログ集約
  • イベントソーシング

弊社では、システム疎結合化のためのメッセージ基盤かつ、ユーザーの行動データのリアルタイム集計及び、その集計結果をサービスにリアルタイムに反映するための基盤として利用しています。

Kafkaはキューではない

Kafkaはキューの様なこともできるし、分散キューと呼ばれることもあるが正確にはキューではありません。

Kafkaはイベントストリームを一定期間に渡って保持し続けており、Consumerがそれを処理したとしてもキューと違って保持しているデータは変化しません。 キューの場合は、Consumerがデータを処理した場合そのデータはキューから削除され、他のワーカーから利用できなくなります。そして、基本的には先頭のデータが消えない限り次のレコードは処理されません。 例えばAWSのSQSはその様に動作します。(visibility timeoutなどである程度順番は制御できる) AWSではKinesisがかなりKafkaに近いサービスとして存在します。(現在はManaged Kafkaも提供されている)

キューでないことによる利点

トピックの共有

データがずっと保持されるということは、ある一つのトピックを複数のシステムで共有できるということを意味します。 つまりデータの発生源はKafkaにデータを送信しておくだけで、複数のマイクロサービスを動かすことができます。その先にどんなサービスがあるかを知る必要はありません。 各サービス側は自分がどのトピックを利用するかだけ分かっていれば良く、そこからデータを取り出して自分の処理をするだけで良くなるため、データの発生源について知っておく必要はなくなります。 仮にデータを利用したいサービスが増えたとしても、データの発生元に送信先を追加する必要はありません。 Kafkaの本体となるミドルウェアをメッセージブローカーと呼びますが、この様にメッセージを仲介してサービス間を疎結合に保つ役割を果たすことができます。

耐久性

Consumerがデータを処理してもデータが変化しないということは、Consumerが処理中に何らかのエラーが起こって処理ノードごとダウンしたという場合であっても、データが失われないことを意味します。 そのため、Consumerでエラーが起きたとしても再キューをする必要はなく、ただ別のノードで同じオフセットから処理を継続すればデータを失うことなく処理を復帰できます。 sidekiqなどの様にRedisを処理キューとして利用した場合には、処理中のワーカーがエラーハンドリングを適切にやらないとデータを失う危険性があります。もしOOMなどによってプロセスが突然死した場合には復帰は不可能です。 Kafkaにおいてはそういった心配はほぼ必要なくなります。

一方で、exactly onceモードを利用しない限りはリトライによって二重処理される可能性はあるため、羃等性に関しては別途注意が必要です。

過去のデータを利用できる

Kafkaは一定期間(デフォルトで7日間)のデータを保持し続けていることに加えて、キューと違って任意のオフセットからデータを読み出すことが出来ます。 そのため、過去のデータを利用して処理を開始することが出来ます。

例えば、新しくマイクロサービスを追加した時に、ある程度過去のデータから処理を開始することができますし、一定期間停止しているサービスがあったとしても、再開した時に停止したタイミングまでのデータが残っていれば、安全に処理を再開できます。

キューでないことによる複雑さ

オフセットの記録

キューと違ってデータが消えないということは、Consumer側で自分がどこまでレコードを処理したのかを覚えておく必要があることを意味します。 そのため、KafkaのConsumerクライアントはオフセットを記録しておく場所が必要になります。Kafkaのプロトコルでは、オフセットの記録場所としてもKafka Brokerを利用します。 Kafka Brokerは無期限にデータを保持することが出来るため、各クライアントのオフセット情報を記録するためのストレージとしても利用できます。 クライアントがどこまで処理をしたかのオフセット情報の更新もまたイベントの一つであり、それが延々更新され続けているイベントストリームでもあります。それをKafka Broker自身に格納しておいて、クライアントは起動時にそのオフセット情報をトピックから読み取ります。Brokerに残っている中で最新の値が今までに処理したレコードのオフセットになります。

ちなみにAWSKinesisでは、この様なオフセット情報の記録をDynamoDBを利用して行います。

Consumer Group

キューと異なることによって生じる複雑さの影響はオフセットの記録だけではありません。Consumerの処理によってブローカーのデータが変化しないので、もし複数のワーカーが同じトピックを処理しようとしたら、オフセットの記録だけではワーカーの数だけ重複して同じ処理が実行されてしまいます。

そのため、Kafka ConsumerにはConsumer Groupという概念が存在します。Consumer Groupはどういう目的でKafkaを利用しているかを指す集合であり、Consumerはそれに所属してBrokerからデータを取得します。

同一のConsumer Groupに所属している場合、Kafkaのあるトピックのあるパーティションはそのグループ内で一つのConsumerでしか処理できません。(一つのConsumerが複数のパーティションやトピックを処理することは出来る)

図で示すと以下の様になります。 consumer-groups.png (26.8 kB)

そのため、あるトピックのパーティション総数以上に、そのトピックを処理するConsumerを増やすことはできません。 パーティション数が30であれば、そのトピックを処理できるConsumerの数は30が上限になります。この数はインスタンスの総数で、同じノードに30あっても30ノードに1つづつあっても一緒です。

Kafka Brokerはパーティショニングをしない

Kafkaの大きな特徴の一つとして、パーティショニングを行う主体がBrokerではないという点が挙げられます。 Kafkaにおいてパーティショニングを行うのはProducerです。

データの発生源がKafka Brokerにレコードを送信する時に、クライアントライブラリの実装によってパーティションが決定されます。 そのため、クライアントライブラリが違っていればパーティショニングのロジックが異なる可能性があります。 例えば、JavaのProducerではデフォルトのパーティショニングロジックはキーに対してmurmur2によるハッシュ化を行いその結果をパーティション総数で割って剰余を利用します。 一方でRubyruby-kafkaという実装ではcrc32を利用してハッシュ化を行い同様に剰余を取って利用します。 また、Rubyには他にlibrdkafkaというCのライブラリのバインディングとして実装されたgemがありそちらはcrc32, murmur2, fnv1aなどが選択可能になっています。 この様に同じデータを送信したとしてもクライアントの実装によってパーティションIDが異なる場合があります。

また、厳密にはレコードのキーとパーティショニングは直接関係している訳ではありません。 あくまでデフォルトの挙動としてレコードのキーを利用するだけで、クライアントは任意のデータを任意のロジックでパーティショニングした結果を直接レコードに指定してBrokerに書き込むことができます。

例えばruby-kafkaを利用していたとしても、自力でmurmur2のハッシュ化を行って同一のハッシュ値を算出できれば、それを元に算出したパーティションを直接レコードに指定することで、Javaのライブラリと同じパーティションにレコードを送信できる様になります。

パーティショニングのロジックを揃えることは実際のシステムにおいてはかなり重要です。

例えば、あるユーザー(ID:100)の1日あたりのアクセス数を集計したい場合、ID:100のユーザーのレコードは一台のノードで処理できることが望ましい。 Consumer Groupによってあるパーティションのデータはある1台のノードでしか処理されないことが保証されるので、一貫してID:100のユーザーのレコードが特定のパーティションに送信されるのであれば、各ノードは単に受け取ったデータをローカルに集計結果として蓄積しておけばよくなります。 この時、もし新しいデータの発生源が追加され、クライアントライブラリの実装が異なったせいでパーティショニングの結果が揃わなくなったとしたら、集計結果の蓄積場所がバラバラになってしまい、正しく集計できなくなります。

この様に、どういうキーとどういうアルゴリズムによってそのトピックのデータがパーティショニングされているかを把握することはとても重要です。

基本的には公式のクライアントの実装であるmurmur2 hashingを利用したパーティショニングに揃えておくのが一番安全だと思います。

パーティション数の見積りの重要性

Kafka Brokerを実際に運用する際に非常に難しい要素となるのが、パーティション数を適切に選択することです。

先に述べた様に、同一のConsumer Groupがあるトピックのパーティションからデータ取得できるのは1つのクライアントだけです。つまりスケールできるクライアント数の上限がパーティション数によって決定されてしまいます。

それなら、パーティション数を後から増やせば良いのでは?と考えるかもしれませんが、実際のところそれはそう簡単にはいかないケースも多々あります。 前段で、パーティショニングロジックが異なって違うクライアントに処理が移ってしまうと集計処理が正しく実行できなくなる可能性があると説明しましたが、パーティション数を後から変えた場合も同様の事が発生する可能性があります。

パーティショニングは基本的にキーとなる値をハッシュ化してトピックのパーティション総数で割った剰余を利用します。 パーティションの総数が変化すると同一のデータが割り当てられるパーティションが変わってしまいます。そうなると最終的に処理しているノードが変化してしまい、集計処理が壊れるという事態が発生します。

そのため、パーティション数を後から変更することは、場合によっては非常に手間のかかる難しいオペレーションになります。 実際にこのトピックをどれぐらいの台数までスケールしたいのかをよく考えてパーティション数を見積るのが重要になります。 基本的な見積りの指針となるのは、ネットワークスループットと要件として求められている処理スループット、そして処理にかかるレイテンシです。

補足ですが、処理ノードが変わったとしても集計処理に関係が無い様にアプリケーションを構築すれば良いのではないかと思うかもしれませんが、それはストリームプロセッシングにおいては余りやりたくない設計になります。 何故ならストリームプロセッシングにおける集計では、バッチ処理よりも遥かにデータ取得や書き込みの回数が多くなります。そのためネットワークを介して別ノードにデータを保持する仕組みは無視できないレイテンシの増加を招くことがあり、かなり強い必要性が無い限りは採用したくない設計と言えます。 この点に関する詳細は、以降の解説記事で紹介していく予定です。

まとめ

Kafka はリアルタイムに大量のイベントデータを処理するためのプラットフォームであり、その中核を為すのがKafka Brokerというミドルウェアである。

イベントデータはユーザーの行動ログだったり、システム間のメッセージのやり取りであったり、データベースの変更履歴であったり、様々なものが含まれる。

イベントデータは複数のパーティションに分割されたトピックに格納される。格納されるデータはキーとバリューで構成されたレコードであり、中身は単なるバイト列である。

Kafka Brokerはキューとは異なり、Consumerの処理によってデータが削除されないため、複数のシステムで同じトピックを共有したり、過去に遡ってデータを処理したり、リトライすることが容易である。

一方で、自分がどの範囲のデータを処理する責務を負っているかはクライアント自身で認識しておく必要があり、そのためにオフセット記録やConsumer Groupなどのキューには無い概念が必要になる。

パーティショニングの責任はクライアントが持つため、パーティション結果に依存する集計処理などを行う際には、クライアントライブラリの実装を把握しておくことが重要になる。

(まあ、大体の事は 公式ドキュメントに書いてあるので、これを全部読めば分かるのだが。)

Kafka Streamsを本番運用する時に検討しておくべきconfigの設定値について

Kafka Streamsをそれなりのデータ規模で運用していくとデフォルトの設定値では動作に困るケースがしばしば出てきます。

その中でも気にしておいた方がいい設定値について紹介していきます。

acceptable.recovery.lag

これはあるタスクにおいてStateStoreとchangelogトピックのlagのどこまでを追い付いていると許容するかどうかを設定します。

デフォルト値は10000なんですが、KTableとかで外部からデータが放り込まれていてそれが結構なデータ量だと10000とか一瞬で越えてしまって、一瞬プロセスを再起動しただけでも追い付いていない、みたいな扱いになってrebalanceが即座に走ってしまう挙動になりました。 実際にデータが入ってくるペースを考慮した値に設定しておかないと、プロセスを再起動した際に余計なrebalanceやレストアが走って無駄な停止時間が発生する可能性がある様です。

probing.rebalance.interval.ms

warmup replicaという仕組みが入って、一定期間ごとにwarmup状態をチェックするためのprobing rebalanceという処理が走る様になったのですが、それの実行間隔を指定します。

このconfigは公式のドキュメントだと重要度がLow扱いになってるんですが、これも運用規模によっては結構危険な設定値でした。デフォルト値は10分間です。 ノード数とかpartition数が多いとrebalanceが完了するまでに結構時間がかかる可能性があって、もしrebalanceが安定するまでに10分ぐらいかかってしまう場合、probing rebalanceがずっと終わらなくて、動作こそ継続できるもののクラスタの状態が不安定になるケースがありました。そういった場合に少し長めの値にすると状態が安定するかもしれません。

fetch.max.bytes, max.partition.fetch.bytes

consumerが一度に取得するデータサイズの上限を設定する値です。

これは全体のスループットや、StateStoreのレストア速度に影響します。余り小刻みにデータを取得するとその分の通信オーバヘッドでスループットが低下するので、データ量やネットワーク速度に合わせた値に設定することで、レストアにかかる時間が改善する可能性があります。