Linux(SteamDeck)でWin用のゲームメモリ改造系ツールを動かす方法

Steam版のFinal Fantasy Pixel Remasterにはブーストモードが無い

FF5ピクリマのゲーム自体は結構前に買ってたんだけど、プレイし始めたのは最近で、Steam版にはブーストモードを含めたPS4/Switch版のアップデートが入ってないことを先週知ったところ。

まあ、別にそんな困らないんだけど、FF5は色々組み合わせて遊ぼうとするとアビリティ習得に結構レベリングが必要なので、ブーストできないのは結構面倒臭い。

で、流石にメジャーなPCゲームなんで探せば何かツールあるんじゃないかなーと思ったら案の定あった。

FLiNG Trainer - PC Game Cheats and Mods ってところにPCゲームのチートツールが色々集まってる。 FF5Final Fantasy V (Pixel Remaster) Trainer - FLiNG Trainer - PC Game Cheats and Mods にあった。 これを使えば獲得ABPのN倍が実現できそうである。

しかし、ここは割とマシな方なんだけどそうは言ってもこの手のチートツールは結構怪しいところあるので、実行は自己責任で。

メモリ改造ツールをLinuxで動かす

ここから本題。この種のツールは実行中のプロセスにアタッチして特定のメモリ領域を書き換えることによって成り立っている。

なので、Steamで起動しているFF5のプロセスがツール側から認識できる必要がある。

LinuxでSteamのゲームを起動する場合、Linuxネイティブで起動するものを除くと大抵はProtonというWineからforkしたWin APIの互換実装を利用することになる。

要はWine上で動いている訳だが、何も考えずにwineコマンドでこの手のツールを起動してもプロセスは認識できない。

WineはWINEPREFIXで指定されるディレクトリに擬似的にCドライブやレジストリの値を保存する。デフォルトは~/.wineだ。このWINEPREFIXごとに独立した環境として起動するのでWINEPREFIXを揃えてやる必要がある。

じゃあSteamで起動した時にはどうなっているのか。Linux上のSteamでWin向けのゲームをインストールするとゲームごとに独立したWINEPREFIXが自動的に作成される。例えば/mnt/steamがSteamのゲームインストール先として登録されていた場合、WINEPREFIXの場所は/mnt/steam/steamapps/compatdata/<game id>/pfxになる。今回のFF5ピクリマのケースだと/mnt/steam/steamapps/compatdata/1173810/pfxだ。ちなみにゲーム本体は/mnt/steam/common/<game title>にインストールされる。

SteamのGame IDの調べ方は色々あるが、手っ取り早いのはSteamライブラリからスクリーンショットの管理画面を開いて、そこから保存ディレクトリを開くこと。screenshotsの親ディレクトリがGame IDごとに分類されているはず。ググってもすぐに出てくるしCLIで取れる様にするツールもある。

という訳で、WINEPREFIXの指定場所が分かった。

後は、実行しているWineというかProtonを揃えておく必要がある。

Steamで利用しているProtonがどこにインストールされているかというとこれは結構マチマチなので、起動時にpsコマンドでも打ってそこかだそれっぽいのを探す方が良いかもしれない。

自分はProtonを更にforkしたProton-GEというやつを使っていて、その場合は~/.local/share/Steam/compatibilitytools.d/GE-Proton8-19/に入っていた。

この中の~/.local/share/Steam/compatibilitytools.d/GE-Proton8-19/files/bin/wineを使ってツールを起動する。最終的なコマンドとしては以下の様になる。

WINEPREFIX=/mnt/steam/steamapps/compatdata/<game id>/pfx ~/.local/share/Steam/compatibilitytools.d/GE-Proton8-19/files/bin/wine <trainer tool exe>

上手く環境が噛み合っていれば、ツール側からゲーム本体のプロセスが認識できる様になる。

他にもsteamtinkerlaunchのcustom commandを使って起動時に同時に別のexeを動かすなどの方法も使えそうだったのだが、どうも自分の環境ではツール側の起動に失敗する問題があった。流石にこの辺りは良く分からん。

後は、ツールのUIでポチポチやれば良い。自分はこれで一応上手くいった。

という訳で、今回も非常に需要の少なそうなGentoo LinuxでゲームをやるためのTips記事だった。

Wayland環境に移行してHyprlandを使ってみて3日程度経ったので感想を書く

ふと思い立って(テスト前に掃除したくなるやつ)、一度試して挫折したwayland環境移行を試してみようと再度やってみたら、割と使えそうな感じだったので常用目指して真面目に設定してみた。

今回はi3ベースのswayじゃなくてHyprlandというどっちかというとAwesomeに近いcompositorを使ってみた。ちなみにディストリはGentoo Linuxだ。

Hyprlandは割とビジュアルにこだわっていて、ウインドウを開いたり移動したりするとウニョウニョ動いて楽しい。 Hyprlandの大きな利点は、独自のxdg-desktop-portalが準備されていて、wayland環境でもウインドウ単位のスクリーンシェアがちゃんと動作すること。 swayだとxdg-desktop-portal-wlrを使うことになるんだが、これはモニタ単位のスクリーンシェアしか出来ない。これが不便だったので常用するのが辛かった。 一方でHyprlandは、xdg-desktop-portal-hyprlandが利用できて、このおかげでウインドウシェアが動く。(まあ、ハマりどころは多々あるが)

youtu.be

スクリーンシェアについて

現状、Hyprlandでスクリーンシェアをやるには以下のものが要る。

どうも起動の順番で上手く動かない時があるが、基本的にはpipewireのサービスとwireplumberのサービスを動かして、xdg-desktop-portal-hyprlandとxdg-desktop-portalの両方を動かす必要がある。xdg-desktop-portal-gtkはなんか勝手に起動した。

加えて、Hyprland起動時に環境変数を設定しておく。

env = QT_QPA_PLATFORM,wayland
env = QT_WAYLAND_DISABLE_WINDOWDECORATION,"1"
env = GTK_THEME,Adwaita:dark
env = MOZ_ENABLE_WAYLAND,1
env = XDG_SESSION_TYPE,wayland
env = XDG_SESSION_DESKTOP,Hyprland
env = XDG_CURRENT_DESKTOP,Hyprland
env = _JAVA_AWT_WM_NONREPARENTING,1

exec-once = dbus-update-activation-environment --systemd WAYLAND_DISPLAY XDG_CURRENT_DESKTOP=hyprland XDG_SESSION_TYPE=wayland
exec-once = systemctl --user import-environment WAYLAND_DISPLAY XDG_CURRENT_DESKTOP

XDG_ほげほげの部分がスクリーンシェアに必要らしい。多分xdg-desktop-portalが詳細実装を探す時に使ってる。 後、_JAVA_AWT_WM_NONREPARENTINGが無いとIDEAの画面に何も出なくなる。これは辛い。 後はQT系アプリの調整とか、GTKのテーマ設定とか、Firefoxでwayland nativeの動作を有効にするとか、その辺のための設定。

最後のexec-onceはdbusとsystemdのuser設定に環境変数の内容を通知しておく。これでsystemdとかdbus回りで起動しているプロセスに環境変数が適用されるはず。

また注意点として、ディスプレイのcolor depthで10bit colorを利用しているとキャプチャが上手くいかないらしい点と、Xwayland経由のアプリケーションからだとウインドウが見えない点があるが、まあよく使う範囲では許容できる。

参考:

日本語入力(IME)

元々ibusibus-skkを使っていたのだが、これはwayland nativeなウインドウで日本語入力が効かないことがよくあった。 wayland compositor側のprotocol実装の不足かibus側のprotocol実装の不足のどちらかのせいだったと思う。

で、今回はfcitx5に乗り換えてみたところ、割とちゃんと日本語入力できる様になった。しばしば複数回変換した時の候補ウインドウがちゃんと出ないアプリがあるが、そこまで困らなくなった。一応、そういう時のためにホットキーでneovimを起動して書いた文字列をclipboardに即コピーできる様にしてあるが。 (Gentooは基本のリポジトリにfcitx5が無いので、サードパーティのoverlayを使う必要があるのが面倒だった)

Steam

自分はSteamのゲームも基本的にLinuxを使ってるが、昔Swayで試そうとした時はエラーになって駄目だった。 最近はSwayでもHyprlandでもちゃんと動作する様だ。 Steam自体はwayland nativeではなくXwayland経由で動く。

少なくともONIとADOFAIとサイバーパンク2077は動作確認できた。

動作感と現状の問題点

三大懸念だったスクリーンシェア、日本語入力、Steamが割と何とかなったので、数日使ってみて、動作感や安定性などを確認してみた。

全体的な動きとしてはwaylandの方が軽いというか、X上でpicom使うよりスムーズに動いている様に思う。 mpvでの動画再生とかもwaylandの方が安定していて、高画質動画に更にshaderかけたりした時でもdropが発生しにくくなった。

Firefoxはwayland対応が割とちゃんとしているのかXより動作が軽いしメモリ消費が少なくなった様に思う。

一方で、やはり全体的に微妙な不安定さがある。 しばしば特定のウインドウに対してキーボード入力が効かなくなることがある。ウインドウを閉じるホットキーも効かなくなるので、そうなると外からkillするしかない。

また、スクリーンロック中によく落ちたり固まったりする。特に長時間放置してdpms offが走ってディスプレイが切れた時に上手く復帰できないことが多い。変な固まり方をするとsshで入って再起動するしかなくなることがあった。swayidleとswaylockを使ってるが、これは別にswayに依存している訳ではないはずなので、多分Hyprland側がこなれていないんだろうと思う。

後は、スクリーンシェアの画質が良くないところも問題と言える。もうちょっと綺麗に取る方法無いのだろうか。pipewireとかwireplumber側の設定で調整できんのかな。この辺りが良く分からん。

総じてまだハマるところは色々あるが使えなくはないな、という感じ。

見た目自体は良いしキビキビ動くのでもうちょっと使ってみようと思う。何とかなりそうならメインPCはwaylandのまま頑張ってみたい。

RubyKaigi 2023 参加報告とちょっとエモい話

RubyKaigi 2023に参加してきました。

今回は長野県の松本での開催でした。

全体的な感想

今回は、会場のスポンサーブースの数や来場者が去年より格段に多く、かつてのRubyKaigiが戻ってきたことを強く感じました。

4, 5年ぶりぐらいに会う人も沢山居て、会う人会う人に「うおー、久しぶりです!」って言って回ってた気がします。 久しぶりに会う人と直接近況をやり取りできるのは、とても嬉しいことですね。

自分はあんまり写真撮らないタイプなのですが(食べ物と酒は除く)、今回は割と多くの #rubyfriends 写真を撮った気がする。 それぐらいはしゃいでいたと言えるのかもしれない。

(撮った写真を了解無く上げるのは、ちょっと気になったので写真は割愛)

とにかく、色々な人にまた会えたのが嬉しかった。そういうRubyKaigiでした。

セッションについて

今回は、パーサー周りのトークが妙に多く、世は正に大パーサー時代という感じでしたね。(RubyKaigiのトークって結構テーマが集中する傾向にあると思う)

mameさんやsoutaroさんと廊下で話していて、開発者体験を向上させて新しい言語に置いていかれない様にするためには、昨今LSPを避けては通れないというのは感じますし、そうなってくると絶対に必要なのが記述途中の不完全なソースコードを上手く扱う方法です。そりゃパーサーについて考える機会も増えるわな、という感じですね。

もちろん、その延長で別のRuby実装や別言語で書かれたRubyのためのツール(rubyfmtなど)のメンテナンス性の向上にも繋がるし、ホットなトピックになるのも自然という感じでした。

という訳で、今回聞いたのは、この辺りです。

Matz Keynote

今回は歴史の話がメインって感じでしたが、Ruby30thの時にも結構聞いた感じがするので、個人的にはもうちょっと未来の話が聞きたかったですね。 終盤の、ISO規格どこいった?!辺りの話は爆笑しましたがww エンジンかかってきたMatzの方が面白いのは間違いない。

The future vision of Ruby Parser

今回の主役の一人と言える、kanekoさんによるparser実装の話。 bisonというparser generatorをRubyで実装しなおしたlramaというプロダクトで置き換えるという非常にカッコいいことをやってるのが印象的です。 LALRパーサーのgenerator記法を少し拡張するだけで、やれることが大分増えるというのはスマートで良いなと思いました。 今回YARPの方の話は聞いていないのですが、YARPはRubyで手作りしたparserということで、結構スタイルの違いを感じますね。

なんと今回のRubyKaigi中にbisonへの置き換えを実際に達成して、kanekoさんは「牛殺し」の称号を獲得していましたw

"Ractor" reconsidered

キーノートでMatzが話していた、benefitをいかに提供するかということに繋がる話でした。 Ractorを実際に使ってみる上で、現実的に性能が上がる見込みが強くならないと、利用者が付いてきてくれずフィードバックが得にくくなって、改善も進みにくくなる、という問題にいかに対処するかという話が中心だったと思います。 特にRactorが複数あるとそれぞれのGCが競合してしまう問題は非常に辛い問題だと思いました。 改善すればかなり効果がありそうに思います。

後、M:NスレッドプロジェクトのMaNyにも触れていましたが、これも期待感があります。 一方で、やらなきゃいけないことのリストが半端なかったので、うおー大丈夫なのか……って感じもありました。

Power up your REPL life with types

ぺんさんによるkatakata_irbの紹介。 これについては、とにかく入れるだけで便利になるので、まず試してみようの一言ですね。

利用者側にとって簡単なのがとても素晴らしいなと思いました。

Lightning Talks

今回、CFPに応募したんですが、落ちてしまったので悔しかったというのが一番の感想ですね。 vim関係の話は大倉さんが話していたので枠が無かったというのもあったらしいですが、自分でもパンチが弱いなという感じだったので仕方ない。

Learn Ractor

実は寝坊してしまって、後半ちょっとしか聞けてない。 enumerationがあればRactorで並列化できる可能性を考えてもみても良い、という話だけ記憶に留めておきました。

Implementing "++" operator, stepping into parse.y

kanekoさんの発表に並ぶ、今大会ベストトークの一つ。 トークの流れがよく練られていてめちゃくちゃ面白かったですね。 それぞれのプロセスが、別々のジャンルの人間に刺さる様になっていて、私とモリスさんなんかはlocal_variable_setで爆笑していました。 parse.yのアクションからbinding呼んでlocal_variable_setを使うというのは、全然考えたことがなくて、やられたーと思いました。

アフターパーティでnobuさんが楽しそうにしおいさんと話していたのも、良い光景でした。

RubyGems on the watch

Maciejさんによる、RubyGemsセキュリティインシデントに間する発表。 リリース前のgem情報をGitHubから収集してbrandjackingを行う例とか、アップロード失敗を利用してCDNに悪意あるコードをキャッシュさせるとか、改竄コードが入る余地というのは現実としてあるということが理解できる話でした。 身につまされる内容というか、OSSの世界においては利用するコードが正しいかどうかをちゃんとチェックする責任は利用者にあるし、不用意な信用を前提にして行動をしてはいけないということを肝に銘じておきたい。

Revisiting TypeProf - IDE support as a primary feature

typeprofで開発者体験を上げるために、本当に必要だったものは当初の想定と結構違っていたという話でした。 LSPでの利用を重視してv2を作っているということでしたが、最近自分もLSP周りの整備を行ってtypeprofも触ってみましたが、もっと良い体験が得られるなら期待が高まります。

Multiverse Ruby

この話は個人的にかなり興味深いトークというか、Rubyのマニアックな挙動を利用して現実的に役に立ちそうな可能性に繋げるというのは、とても好きなジャンルの話ですね。 loadの第二引数に匿名モジュールを渡すことで、load先のクラス・モジュール定義が匿名モジュールの名前空間の中に定義されるという挙動を利用して、namespace分離を行えないかという内容の話でした。

実際問題として、他から参照を制限できるnamespaceやgem間での名前の衝突や同一gemの複数バージョン利用などを考えると、現状のRubyで対応できないけど、役に立ちそうなことはいくつかあるので、Rubyのこれからのnamespaceを考える上で一石を投じる発表だったと思います。

モリスさんがこんなブログを書いていたので、そちらも参考になると思います。 https://tagomoris.hatenablog.com/entry/2023/05/15/174652

Optimizing YJIT’s Performance, from Inception to Production

今回、gihyoさんのレポートとして私がこのキーノートを担当することになったので、詳しくはそちらで書くとして、どうでもいい小話が一つ。 Shopifyで頭文字Dが流行ってるんだろうか?w

Ruby Committers and The World

今回はShopifyさん仕切りで、アジェンダがしっかりしていたのと英語メインで進行していたのが今迄との違いです。 別に今回のが悪いという話ではないのですが、進行を無視してコミッタがプロレスを始めて、おもむろにnobuさんがパッチ袋引っ張り出してくる、みたいな展開が無かったのが、例年のThe Worldファンとしてはちょっと物足りなさを感じました。 まあ、どっちが良いという話ではないのですが。

Build Your Own SQLite3

picoruby上でSQLite3を動かすために、VFSレイヤーを自力で実装して必要な関数を全部マッピングしていくという発表でした。 この実装力は本当に凄い。そして、SQLite3が動くキーボードが完成していました。 hasumikinさんにキーボードを借りる時は注意した方がいいかもしれませんw 裏でSQLite3が動いているかもしれないw

Ruby JIT Hacking Guide

今回とても楽しみにしていたRJITに関する発表です。 RJITってrubyのiseq情報とcfpを受け取って、rubyアセンブラを書いて実装を置き換えることで高速化する、みたいな機構だと認識しているんですが、iseq情報を利用して悪い方向に書き換えて、見た目と違う挙動を実現できるのでは、と考えていました。 見込み自体は正しかったので、これを使って来年のRubyKaigiまでに何か作りたいと思ってはいるのですが、まだ実現可能性が何も見えていないので、本当にやれるかどうかは完全に不明です……。

Parsing RBS

soutaroさんによるキーノートで、error tolerant parserをどうやって作るかという話が順序立って説明されていて勉強になりました。 LSPのedit notificationを使って変更チャンクを認識して、そこで制御可能な形でparserの処理を打ち切ってエラー処理に持っていくというのは、とてもスマートだなと思いました。 steepのLSPも最近ある程度触っていて、rbsを書く機会もちょっとづつ作ろうとしているので、開発体験の向上に期待しています。

まとめ

今年は、自分の中でも多くのセッションを聞いたRubyKaigiになりました。 体調が比較的安定してたというか、睡眠破綻が余り起きなかったおかげだろうか。

Leaner Drink upについて

今回は、久しぶりに日本酒の選定をさせてもらいました。Leanerさんとは直接関わりがあった訳ではないのですが、コミュニティの友人の中の人から声をかけていただいて、それなら協力させてもらいますという感じでやらせてもらうことになりました。

まあ、要するにコミュニティ繋がりのただの酒好きのオッサンということです。

とは言え、それなりに長くRubyコミュニティで活動していたこともあってか、多くの友人や日本酒好きが参加してくれて、好評をいただくことができました。

いやー、趣味を曝け出してる感じはちょっと怖いところもあるし、全てちゃんと味を確認してから選べている訳ではないので、不安も少しありましたが、上手く終わって良かったかなと思います。

Leaner様からも感謝の言葉をいただきました。こちらこそスポンサー業の一部とはいえお手伝いできたことを嬉しく思います。

エモい話

今回、去年のRubyKaigiより久しぶりに会ったRubyistが遥かに多く、そのおかげで熱量の高い話も結構やれたかなと思っています。

特にTwitterでも話をしたんですが、kaneko.yさんから「Asakusa.rbでお世話になった頃から憧れのRubyistの一人でした、コミュニティで近くに居てくれたおかげでこれだけの成果が出せる様になったんです」という感じの事を言われて、本当にメチャクチャ嬉しかったんですよ。こんなに嬉しいことは無いんじゃないかと思うぐらい。ぶっちゃけ泣きそうになったし、これ書いてても涙出そうなのを堪えている。 自分から見たら、kanekoさんはとっくにRuby界のHeroの一人でこちらこそ憧れてるRubyistの一人だった訳で、そのことを伝え返すこともできて嬉しかった。 とは言え、そういう過去の自分に乗っかっているだけでは駄目で、何かしらの良いアウトプットを出していける自分でありたいなという思いも強くなりました。 コロナという非常に大きなコミュニティの分断があった後でそういうことがあったので、直接思いを伝える機会があるなら逃してはいけないなと本当に実感した訳ですね。

そういうことがあったので、その後の夜中にemori.houseの面々と飲んだ後、今回スタッフではなく一般参加していたぷぽさんに対して、今迄の感謝と我々が如何にRubyKaigiというイベントを楽しんでいるかということ、そしてその楽しみを今迄スタッフとして支えてくれていたぷぽさんが今年めちゃくちゃ満喫しているのを本当に嬉しく思っている、ということを語るのに繋がった訳ですね。これも気恥ずかしい話ではありますがw

その他のちょっとした話としては、自分のライブラリを使ってくれてるという人が何人か声をかけてくれたことも嬉しかったことの一つです。あんまり代表作と言えるほど大きなものが作れていないので、こういうちょっとしたことでも嬉しく思います。

コロナ過を経て、もしかしたらもう会わない人も一杯居るのかもしれないなと思ったし、今回のRubyKaigiで会えても次に会えるとは限らないということが、以前より遥かに現実的な問題になっていて、そのせいかちゃんと写真という記録を残したり、言える時に感謝の思いは伝えた方が良いなと思うことしきりなRubyKaigiでした。

ちょっと話は変わるのですが、Kaigi後に書かれたいくつかのブログの話を見たりKaigi中に自分もそういう話をしていたりということもあって、憧れの捉え方や自分が何者だと考えているかについて、人それぞれの色々な考え方があるのだなと実感しました。

今回のRubyKaigiでは、そういう自分の立ち位置の変遷や、周りの人達の変化の話や、考え方の違いというものに触れる機会がとても多かった様に思います。 CTOという立場からの変化、人生ステージの変化など、自分がRubyコミュニティに居場所を見つけて10年以上も経った訳で、自分にも周りにもそういった変化を感じている中で、久しぶりに会った人が一杯いたことで自然とそういう話が増えた気がします。

私は、昔から所謂「強い」エンジニアに憧れがあり、自分もそうなりたいと思っていたし、それが生きていく上で重要なポイントの一つでもありました。結果的にそれなりの適性があったので、ある程度のアウトプットは出せたと思うし、そういった友人も沢山できました。

一方で、今もずっと自分はそんな大した人間ではなく普通のプログラマだと思ってるのですが、Kaigi終了後にばったり遭遇したSTORESの藤村さんと話した時に「自分もそう思うけど、状況証拠的に何かしら逸脱したところが自分にあると認めざるを得ない」って感じのことを言っていて、その話に「めっちゃ分かるわー」と強く共感しました。普通それなりにエンジニアを抱えている会社のCTOなんかにはならないんですよねw

つまり、自分にはある程度は「強い」エンジニアとしてやっていく資質があったとは言えるのでは、と最近思える様になりました。ただ、そこに満足してしまうと自分の成長が止まってしまう可能性が高いので、より高い技術力に対する飢えはずっと持っておきたいところですが。

という感じで、自分はそういうコードを書いて「強く」なることが好きで何とかやれていると思うし、これまでの人生の中でRubyKaigiやRubyコミュニティに居る「ギャングスター」にストレートに憧れることが出来たのですが、RubyKaigiって世界でも稀に見る程にヤバイ人達が登壇しているスーパーなイベントでもある訳で、大体登壇している人達ってのはある種の変態ばっかりなんですよ。そう簡単に真似できるもんじゃない。(お前が言うなという話かもしれませんが)

なので、RubyKaigiで受けた刺激はあくまで刺激として受け取って、気にせず自分の人生のための活動をすること、自分なりのやり方でコミュニティに恩を返すのも大事なことなのだと思います。目立つヒーローばっかりが人間じゃない。

自分も、最近Rubyという言語と直接的に強く関わっていないこともあって、自分は最近ちゃんとやれてんのか?と思うこともたまに……という感じだったしw

そんなこんなで、この辺りのことを考えることに繋がったんですが、まあそれなりに経験も積んだおっさんになったので、自分としては、落ち着いて今回のRubyKaigiで受け取ったクソデカ感情というやつを次のRubyKaigiにぶつけられる様に活動していこうと思っています。

一言で言うなら、やる気は出てるぞ!ってことです。(頭が着いていかない可能性があるが……)

まずは、RJITで遊ぶぞ!

最後に

自分の観測範囲でも数名コロナ陽性反応と共に発熱している人がチラホラと出てきています。 幸い、自分は今のところ喉が痛いだけです。これは連日酒を飲んでは色々な人と話し続けてたので当然と言えるのですが、とりあえず時間を見つけて検査は受けに行こうと思っています。 皆様も体調にお気をつけください。

RubyでBigQueryのStorage Write APIを利用するまでの流れ

自分がググった限りではネット上に記事が皆無で無限の知識のAI様に聞いてもウソしか教えてくれなかったので、ここにまとめておく。 多分、fluent-plugin-bigqueryのメンテをやっている自分ぐらいにしか需要が無いのだろうと思う……。

とりあえず、1日かけて格闘した結果、とりあえず書き込みができるところまでは到達した。

必要なもの

rubygems.orgを見ると分かるのだが、BigQueryのライブラリが1000万件近くDLされているのに対して、storage APIを叩くためのgemの累計ダウンロード数がわずか3万件である。単純計算でRubyでBigQueryを触ろうとしている人の0.3%しか使っていない。それは解説も無いわという感じ。

実装例

require "google/cloud/bigquery"
require "google/cloud/bigquery/storage"
require "google/protobuf"
require "json"

project_id = "<your project id>"
dataset_id = "<your dataset id>"
table_name = "<your table name>"

bigquery = Google::Cloud::Bigquery.new(project_id: project_id)
dataset = bigquery.dataset(dataset_id)
table = dataset.table(table_name)

write_client = Google::Cloud::Bigquery::Storage.big_query_write

def convert_field_schema(parent_name, field, i, builder, struct_fields)
  method =
    case field.mode
    when "REQUIRED"
      :required
    when "NULLABLE"
      :optional
    when "REPEATED"
      :repeated
    else
      raise ArgumentError, "Unsupported mode: #{field.mode}"
    end

  case field.type.to_sym
  when :BOOLEAN
    builder.send(method, field.name, :bool, i)
  when :BYTES
    builder.send(method, field.name, :bytes, i)
  when :DATE
    builder.send(method, field.name, :int32, i)
  when :DATETIME
    builder.send(method, field.name, :int64, i)
  when :DOUBLE
    builder.send(method, field.name, :double, i)
  when :INTEGER
    builder.send(method, field.name, :int64, i)
  when :NUMERIC
    builder.send(method, field.name, :bytes, i)
  when :BIG_NUMERIC
    builder.send(method, field.name, :bytes, i)
  when :STRING
    builder.send(method, field.name, :string, i)
  when :JSON
    builder.send(method, field.name, :string, i)
  when :GEOGRAPHY
    builder.send(method, field.name, :string, i)
  when :TIME
    builder.send(method, field.name, :int64, i)
  when :TIMESTAMP
    builder.send(method, field.name, :int64, i)
  when :RECORD
    inner_type_name = parent_name + "." + camelize_name(field.name)
    builder.send(method, field.name, :message, i, "fluent.plugin.bigquery.table.#{inner_type_name}")
    struct_fields << field
  else
    raise ArgumentError, "Unsupported data type: #{field.type}"
  end
end

def camelize_name(name)
  name.split('_').map(&:capitalize).join
end

def build_protobuf_descriptor(name, fields)
  struct_fields = []
  type_name = camelize_name(name)
  msg_proto = nil
  file_proto = nil
  
  Google::Protobuf::DescriptorPool.generated_pool.build do
    add_file("fluent/plugin/bigquery/table/#{name}.proto", :syntax => :proto2) do
      msg_proto = build_message_descriptor(nil, type_name, fields, self, struct_fields)
      until struct_fields.empty?
        f = struct_fields.shift
        build_message_descriptor(type_name, camelize_name(f.name), f.fields, self, struct_fields)
      end
    end
  end

  {
    msgclass: Google::Protobuf::DescriptorPool.generated_pool.lookup("fluent.plugin.bigquery.table.#{type_name}").msgclass, 
    msg_proto: msg_proto,
  }
end

def build_message_descriptor(parent_name, name, fields, builder, struct_fields)
  type_name = [parent_name, name].compact.join(".")
  message_builder = nil
  builder.add_message "fluent.plugin.bigquery.table.#{type_name}" do
    message_builder = self
    fields.each.with_index(1) do |field, i|
      convert_field_schema(type_name, field, i, message_builder, struct_fields)
    end
  end
  message_builder.instance_variable_get("@msg_proto") # DescriptorProtoを取るために内部のインスタンス変数を直接参照している
end

# tableのスキーマ情報をAPI経由で取得し、Protocol Bufferのdescriptorに変換する
result = build_protobuf_descriptor(table_name, table.schema.fields)
msgclass = result[:msgclass]
msg_proto = result[:msg_proto]

# Nested TypeをBigQueryのAPI形式に合わせる
msg_proto.field[8].type_name = "Inner"

json = JSON.dump({id: "id", insight_id: 1, custom_event_id: 2, name: "hoge", properties: "{\"aa\": \"bb\"}", user_id: 3, tracked_at: Time.now.to_i * 1000000, idfv: "afsdaf", inner: {str_value: "str", int_value: 123}})
# 直接オブジェクトを生成しても良いが、今後の用途のためにJSONからメッセージオブジェクトを生成している
val = msgclass.decode_json(json)

# メッセージオブジェクトをProtocol Bufferのバイナリ形式にシリアライズする
serialized = msgclass.encode(val)

# Bigquery Storage APIのAppendRowsRequestの生成
data = [
  Google::Cloud::Bigquery::Storage::V1::AppendRowsRequest.new(
    write_stream: "projects/#{project_id}/datasets/#{dataset_id}/tables/#{table_name}/streams/_default",
    proto_rows: Google::Cloud::Bigquery::Storage::V1::AppendRowsRequest::ProtoData.new(
      rows: Google::Cloud::Bigquery::Storage::V1::ProtoRows.new(
        serialized_rows: [serialized]
      ),
      writer_schema: Google::Cloud::Bigquery::Storage::V1::ProtoSchema.new(
        proto_descriptor: msg_proto
      )
    )
  )
]

# リクエスト実行
output = write_client.append_rows(data)
output.each do |res|
  p res
end

とりあえず動くところまで持っていっただけのベタ書きのコードなので汚いが、ここまでいければ後は綺麗にするだけなので何とかなるだろう。

ざっくり解説していく。

RubyでStorage Write APIを使う上で非常に面倒な点が、自分でProtocol Buffer形式にデータをシリアライズしなければいけないことと、ProtocolBufferのDescriptorProto(スキーマ定義みたいなもの)を生成しなければならないことの二点である。

Protocol Bufferへのシリアライズ

Javaのライブラリなどでは、ライブラリ自体がテーブルからスキーマを取得し自動的にJSONをProtocol Bufferに変換してDescriptorProtoまで準備してくれるので、JSON形式のオブジェクトを投げるだけで良いのだが、Rubyでは全て自分でやる必要がある。 更にそこにいくつかハマりどころがあり、それを乗り越えなければならない。

上記のコードにおいてはbuild_protobuf_descriptorメソッドがその根幹になる。

基本的にはBigqueryのスキーマからフィールド定義を引っ張ってきて、それを適宜合う形のタイプのDescriptorに変換していく。 google-protobufにはDescriptorを生成するためのDSLが存在するので、それを利用することでRubyコードからDescriptorを定義できる。

しかし、これについてはドキュメントが全然無い。通常Protocol Bufferのスキーマはprotoファイルの書式で定義するもので各言語のコードでどう表現するかはprotocで自動生成したコードによって決まっている。通常利用することが余り無いのでちゃんとしたドキュメントが存在しない。

という訳で、protocで生成したコードを元にソースコードを確認し、使い方を調べる必要があった。

例えば、次の様なprotoファイルを元に生成したRubyコードは下記の形になる。

syntax = "proto2";

package test.pkg;

import "bar.proto";
import "google/protobuf/timestamp.proto";

message Foo {
  message Baz {
    optional int64 num = 1;
  }
  optional int64 id = 1;
  optional string name = 2;
  optional Bar bar = 3;
  repeated string values = 4;
  optional google.protobuf.Timestamp ts = 5;
  optional Baz baz = 6;
}
# Generated by the protocol buffer compiler.  DO NOT EDIT!
# source: foo.proto

require 'google/protobuf'

require 'bar_pb'
require 'google/protobuf/timestamp_pb'

Google::Protobuf::DescriptorPool.generated_pool.build do
  add_file("foo.proto", :syntax => :proto2) do
    add_message "test.pkg.Foo" do
      optional :id, :int64, 1
      optional :name, :string, 2
      optional :bar, :message, 3, "test.pkg.Bar"
      repeated :values, :string, 4
      optional :ts, :message, 5, "google.protobuf.Timestamp"
      optional :baz, :message, 6, "test.pkg.Foo.Baz"
    end
    add_message "test.pkg.Foo.Baz" do
      optional :num, :int64, 1
    end
  end
end

module Test
  module Pkg
    Foo = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("test.pkg.Foo").msgclass
    Foo::Baz = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("test.pkg.Foo.Baz").msgclass
  end
end

これによるとGoogle::Protobuf::DescriptorPool.generated_pool.buildを使ってDSLでDescriptorが定義できるらしい。実装はここにある。 https://github.com/protocolbuffers/protobuf/blob/main/ruby/lib/google/protobuf/descriptor_dsl.rb

BigQueryから取得したスキーマ情報を元に、これらのDSLメソッドを呼び出しているのがconvert_field_schemaになる。 少しややこしいのがRECORD型の扱いで、これに関してはnested typeとしてスキーマを参照する様にして、別のmessageタイプとしてadd_messageを呼び出す様にしている。

この生成したDescriptorからmsgclassを引っ張ってきて、オブジェクトを生成すればencodeメソッドでバイナリシリアライズができる。

ちなみに、BigQueryのどの型がProtocol Bufferのどの型に対応するかに関しても、ドキュメントが見つからなかった。そのため、この対応関係はJavaソースコードの関係がありそうな箇所を読んでパクってきた。

BigQueryへの書き込み

基本的にはWriteClient#append_rowsメソッドに配列に入っているAppendRowsRequestオブジェクトを渡せばいいのだが、ここにトラップが存在する。

このリクエストオブジェクトを作成するためには、Protocol BufferのDescriptorProtoオブジェクトが必要になる。Descriptorオブジェクトではない。 そして、RubyのライブラリではDescriptorからDescriptorProtoを得る手段が無い。Javaにはあるのに。

つまり、RubyでProtocolBufferにシリアライズするために必要なスキーマ情報のオブジェクトが、BigQueryのライブラリが要求しているデータと噛み合っていない。

DescriptorProtoを直接生成できなくは無いが、先に示したdescriptor_dsl.rbに定義されているBuilder DSLの実装を見るとかなりややこしいことをしている。正直、これを再実装したくはない。 なので、上記のソースコードではinstance_variable_getを使って、BuilderがDescriptorを生成した後にBuilderオブジェクト内に残されているDescriptorProtoのオブジェクトを無理矢理引っ張り出している。これはこれでかなりデンジャラスというか悪いことをしている気がするが、こうしない限りはDSLと同等の処理を自前で実装しなおすことになる。

ここまでしてDescriptorProtoを取得しても、まだこれだけでは書き込みは上手くいかない。

DescriptorProtoにnested typeが含まれている場合、DSL上ではpackageとnamespaceを含めたfull qualifiedな名前で型を参照する必要があるのだが、これをこのまま渡すとBigQueryのAPIが型を見つけられなくてエラーになる。 そのため、該当するnested typeからpackageとnamespaceを除外し、シンプルな型名に参照を変換しておく必要がある。 上記のコードでは、そのための変換コードを真面目に書いておらず、実験を成功させるために決め打ちで型名を調整している。

この挙動についても調べた限りでは、ドキュメントが見つからず、自分の場合はJavaのライブラリの実装を読んで何をしなければいけないのかを探り出した。

これで何とか書き込みに成功した。

まとめ

やってみたら使えない訳ではなかったので無いよりは全然マシなのだが、RubyでStorage Write APIを使うにはJavaに比べて実に手間がかかることが分かった。 余り需要が無い気がするが、もしRubyでBigQuery Storage Write APIを使いたい時に、この記事が参考になると良いなと思う。

とりあえず、これでfluent-plugin-bigqueryにStorage Write APIを使って書き込むモードが追加できそうだという取っ掛りを得た。Storage Write APIが使える様になればStream Insertより高いスループットで、しかもデータ量当たりの転送量を半額に抑えられるので、暇を見つけてやっていこうと思う。

rubygem開発でSteepを使って型を書く時の現状のオススメ設定 (2023年3月版)

Rails(というかActiveRecord)に型を付けるのは大変だが、Railsが絡まないrubygemにはそんなに苦労なく型が書けるので、これからgemを書く時には型を書きたいという人向けに今のところオススメの設定を紹介します。 というか自分が忘れるのでまとめておきます。

現状とはsteep-1.3.1, rbs-2.8.4を指します。

rbsは既に3系が出ていますが、一般利用者が型検査に利用する場合はsteepを使うはずで、steepはまだrbsの3系に対応していません。また、rbs-3.0で多少変わっているところもあるので、割と寿命が短い話かもしれません。

設定例

とりあえず結論から。Steepfileとrbs_collection.yamlを修正します。

Steepfile:

D = Steep::Diagnostic

target :lib do
  signature "sig"

  check "lib"                       # Directory name
  
  # configure_code_diagnostics(D::Ruby.strict)       # `strict` diagnostics setting
  # configure_code_diagnostics(D::Ruby.lenient)      # `lenient` diagnostics setting
  configure_code_diagnostics do |hash|
    hash[D::Ruby::MethodDefinitionMissing] = :warning
    hash[D::Ruby::UnknownConstant] = :information
  end
end

rbs_collection.yaml:

# Download sources
sources:
  - name: ruby/gem_rbs_collection
    remote: https://github.com/ruby/gem_rbs_collection.git
    revision: main
    repo_dir: gems

# A directory to install the downloaded RBSs
path: .gem_rbs_collection

gems:
  - name: steep
    ignore: true
  - name: rbs
    ignore: true
  - name: <developing gem name>
    ignore: true

それぞれsteep init, rbs collection initで雛形を生成できます。

この設定をした上で、自分が開発中のgemの型定義をsig以下に書いていきます。lib以下とファイルパスを揃えてエディタで切り替え易い様に設定しておくと良いでしょう。

解説

Steepfile

Steepfileで型検査ツールであるsteepの挙動を制御できます。signatureとcheckは何を指定しているのか簡単なのですが、configure_code_diagnosticsがちょっと分かりにくいのと、快適な結果を得るために地味に重要なので、ここを設定しておきます。

この設定は、steepが検出した各通知対象をどのエラーレベルに設定するかを指定します。nilにセットすると報告しなくなります。

steepはsteep checkコマンドで型を検査できますが、この時デフォルトのエラー通知レベルはwarningになっています。なので、configureで指定した通知対象のレベルが:error:warningでないと通知しませんしFailedになりません。エラー通知レベルは--severity-levelオプションで実行時に指定できます。

では、各通知対象のデフォルト設定がどうなっているかというと、現時点で明確なドキュメントは無いのでsteepのソースコードを見る必要があります。lib/steep/diagnostic/ruby.rb辺りにあります。

ソースコードを参照すると分かるんですが、デフォルトが一番厳しくて、strict, lenientの準に緩くなります。自分の感覚ではデフォルトは厳し過ぎて、strictでは緩過ぎる所はあって、現状では個別に調整した方が良いと感じています。

その中でも特に設定しておいた方が良い値が、上記の設定例で記載したMethodDefinitionMissingUnknownConstantです。MethodDefinitionMissingは型定義に書いてあるのに実装が存在しない場合の警告で、UnknownConstantは型定義の無い定数・クラス・モジュールに対する参照がコード上に存在する時の警告です。

MethodDefinitionMissingはデフォルトが:informationで、自分はこれは単純に実行した時に警告として出て欲しいのでwarningに変更しました。

UnknownConstantは、現状型定義の無いライブラリを参照するのが普通なので、ノーオプションで実行してこのwarningが出ると非常に結果が煩雑になるので、:informationレベルに設定して、オプション指定して意図的に見たい時にだけ引っかかる様にしました。完全に無効化すると、gem_rbs_collectionにライブラリの型定義を追加したいというモチベーションを奪うので、無効化はしていません。

その他にもコードの状況に応じて、現時点では必要無いなと思う通知内容があれば、:informationか:hintレベルにしておく方が結果を見易くする上では有用だと思います。

rbs_collection.yaml

これはrbs collectionコマンドの設定ファイルです。rbs collectionを使うと、Gemfileやgemspecで依存している各ライブラリの型定義をgem_rbs_collectionリポジトリから取得してきてsteepに認識させることができます。また、gemの中にsigがある場合はそちらを参照する様になります。

しかし、現時点で何も考えずに設定すると結構ハマるというか、rbs collection initやった上で設定を変更せずにsteepを実行すると、めちゃくちゃ大量にエラーが出ると思います。 これは、steepとrbsのgem自体に含まれているsigを検査しようとするので、そこに必要な型定義が足りていないとめちゃくちゃエラーが出るんですね。そしてその型検査は通常自分のgem開発ではほぼ必要が無いものになります。(steepやrbs自体を活用するツールは別)

なので、普通のgem開発でrbs collectionを利用する場合は、steepとrbsを除外する設定を追加しておく必要があります。この設定例についてはsteepのguidesディレクトリに解説があります。 (see. https://github.com/soutaro/steep/blob/master/guides/src/gem-rbs-collection/gem-rbs-collection.md)

もう一つの問題がDuplication declarationエラーが出てしまうことです。

github.com

issueに書かれてるんですが、デフォルトだとrbs collectionが開発中のgem自身を認識してしまうのでSteepfileのsigとrbs collectionを読みにいく挙動で重複してしまい、上記のエラーに繋がります。

この挙動は仕様なのかsteepの管轄なのかrbsの管轄なのか微妙なラインだと思いますが、現状何もガイド無しに二つを同時に使ってgem開発しようとすると結構ハマる問題だと思います。

この問題に対する対策は、Steepfileからsignature指定を削除してrbs collectionに任せるか、rbs_collection.yamlで開発中のgemをignore対象に追加してSteepfileに任せるかのどちらかになります。結果は同じなので好みのやり方を選択してください。上記の設定例では後者を採用しています。

この辺りは、デファクトとなるドキュメントへの導線があれば解決すると思いますが、デフォルトの設定値をどうするかは結構難しい問題で開発体験にも結構影響する所です。今後のアップデートによってこういったことを余り気にする必要がなくなると良いですね。

最近rbs-3.0が出たばかりだし、今後の変化に注目しておきましょう。

ソフトウェアエンジニアとしての能力を高める方法について考えてみた

早朝の寝る前ぐらいの時間にぼやっと下記の様なツイートしたらちょっと反応を貰ったので、取り留めは無いが自分なりに考えていることを書いてみる。

タイトルは雑に書いたけど、能力を高めるというと範囲が広過ぎるので、技術的な意味でできる事が増える、ということをテーマとして話をしていこうと思う。基本的に自分の考え方の話なのでそこは御留意ください。

ツイートした通りで、状況や対象に依って割合は変わるかもしれないが基本的にそのためにやることは3つしかないと思っている。

  • 出来る限り公式に近いドキュメント、もしくは信頼できる著者による書籍を読む。場合によっては論文を参照する。
  • それを使ってみる。とりあえず動く小さなアプリなりツールなりが書けるのが一番良い。
  • それを利用しているOSSのコードを読む。フレームワークやライブラリ自体だったら一旦動くものを書いてからコードを追う。

実際には、とにかく公式ドキュメントを読んで書いてを繰り返すことが多い。つまり、ただ読んで書いてをやり続けるだけということになる。

公式のドキュメントが分かり辛いという例は無くは無いと思うが、基本的に技術要素として重要なことは公式のドキュメントかソースコードを見れば大体書いてあるので、改めて他の人が説明することはほとんど無い。自分の理解のために自分の言葉で言い換えてみるというのは意味があると思うが、他人向けの説明資料を作っても余り現実的な効果は無い。というかそんなので本当に理解できるんだったら、既に俺よりよっぽど分かっている。(じゃあ何も書かなくていいかというとそんなことも無いと思うので、それはまた機会があればブログにしようかと思う)

とは言え、↑みたいなことをやるのは時間も気力も必要で、学ぶのに全く苦労しないということは無い。基礎知識が足りないので遠回りしないと分からんことも一杯ある。

なので、やるためのエネルギーを捻り出さなければいけない。自分なりに考えられる動機は4つある。

一つ目は興味(楽しさ)だ。単純に気になるし面白いと思うからやってみたい知りたいと思えば、多少の苦労も乗り越えられる。

二つ目は責任。言い換えると仕事で必要だからやる、ということ。それを使わないと仕事が進まないんだったら苦労してでも学ぶしかない。この時、最悪他の人が何とかしてくれるだろうとか思ってしまうともう駄目で動機が消滅してしまう。

三つ目は危機感。今のまま自分が出来ることが増えていかなければ、いずれプログラマとしての信用を失うかもしれない、それは嫌なので不安から逃れるために新しいことを学びたいと思う。

四つ目は利益(金)。これが出来れば利益に直結するということであれば、利益のために多少の苦労を乗り越えることが出来る。ただ、その利益が信じられるものでないと大した動機にはならない。

他の人がどう考えているかは分からんけど、自分として割と想像が付くのはこんなところだ。もちろん何かを学ぶ時に、どれか一つにドライブされているという様なことはなくてこれらの複合で成り立っている。

興味と危機感は内発的動機で、方向性がポジティブかネガティブかという点で異なるが最終的には自分の意志(MP)に依存している。これで動けるなら他人に頼らなくて良くなるし理想的だと思うが、実際のところこのリソースは有限だと思う。好奇心が尽きたらと考えるとめちゃくちゃ怖い。

一方で責任と利益は外発的動機で、環境・状況によって与えられるものだ。自分の曖昧なリソースに頼らなくていいので分かり易い。

これを自分の目線から見ると、内発的な動機は自分でコントロールするもので生き方や物の考え方をどうコントロールして生きていくかという問題で、外発的な動機は自分で考えても仕方が無いので状況に合わせて行動するということになる。

一方で人を育てる・人を動かすという視点からすると、当人の興味や危機感なんてものは全くコントロールできないが、その人に与える責任や利益は会社側の立場からコントロール可能な物事と言える。

という訳で、自分としては自分じゃない誰かの能力を底上げするということになると、もう実際にやらせて責任を負わせるかこれが出来たら給与を上げるよと確約するしか無いと考えている。

「お前がやるんだよ」レベルまで責任を与えられると良いのだが、実際の所はギリギリになったらケツを持つことは上司の仕事になるだろうが。

言い換えると、「出来るかどうか分からんがとにかく責任を負って仕事をやってみる」しか成長する方法は無いんじゃないかという話だ。この責任から逃げるのも自由だが、それで成長できるかというと相当の才能が要ると思う。少なくとも自分程度の頭では無理だ。

出来ることだから仕事でやるんじゃなくて、仕事でやるから出来る様になる。

取り留めが無い感じだが、最終的な結論としては「もし出来なかったら辛いし、良く分かってないがとにかくやってみる」みたいな精神で仕事できるかどうかが職業エンジニアとしての能力向上に重要で、そういうエンジニアを増やしたい企業としてはとにかく耐えられる人間に責任を移譲してやる事をやらせるということになる。この時スケジュール上の遅延や失敗のリスクを覚悟しなければならないし、失敗した時に過剰に責めてはならない。何もペナルティが無い状態を責任を負っているとは言い辛いので、最低限何らかの評価は必要だと思うけど。

結局のところ、自分がよく例えで持ち出しているが、HxHの天空闘技場で強めに念能力でぶん殴られて生き残るみたいな方法しか普通の人間が壁を越える方法は無いのではと思う。非常にモヒカン的で生存バイアスに支配された考え方であることに自覚はあるが、どうしても他の方法がよく分からないし実感が湧かない。

BigQueryのStorage Write APIにおけるexactly onceの仕組みとエラーハンドリングについてまとめる

2021年の年末ぐらいから、BigQueryにはStorage Write APIというものが使える様になっている。 これは旧来のStreaming Insert APIに代わるもので、gRPCを利用してより高いスループットでデータ投入が出来る様になっている。Streaming Insertは全体で秒間1GBまでしか書き込めなかったが、Storage Write APIなら3GBまで書き込める。

また、バッチ処理的なワークロードにも対応しており、PENDINGモードを使えば複数のワーカーから書き込んだものを溜めておいて、アトミックにチャンクを書き込むといったことが可能になる。

しかも、このAPIはStreaming Insertより安い。同じデータを書き込むのに半額で済む。(Storage Write: $0.025/1GB, Streaming Insert: $0.010 / 200MBなので、1GBに直すと$0.025:$0.05になる)

できれば使っていきたい。

cloud.google.com

このStorage Write APIではexactly-onceのセマンティクスがサポートされている。しかし結構扱いに癖があるというか最初良く分からなかったので、その扱いについてここにまとめておく。 特に公式のリファレンスにあるサンプルコードはワンタイムの書き込みのサンプルコードだけで、書き込みに失敗したら単にエラーを返して終了している。 自分が知りたかったのは、動き続けるアプリケーションでどうやってリトライしたらいいのかということなのだが、そういうサンプルが見つからない。外部の記事にも余り無い。 (実は皆こういうAPIを生で使ってないのでは……?)

Storage Write APIでexactly-onceに対応しようとした場合には、書き込み時にoffsetを指定することになる。

Javaのサンプルコードを公式から転記すると以下の様になる。

https://cloud.google.com/bigquery/docs/write-api-streaming

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1.Exceptions;
import com.google.cloud.bigquery.storage.v1.Exceptions.StorageException;
import com.google.cloud.bigquery.storage.v1.FinalizeWriteStreamResponse;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import com.google.cloud.bigquery.storage.v1.TableName;
import com.google.cloud.bigquery.storage.v1.WriteStream;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Phaser;
import javax.annotation.concurrent.GuardedBy;
import org.json.JSONArray;
import org.json.JSONObject;

public class WriteCommittedStream {

  public static void runWriteCommittedStream()
      throws DescriptorValidationException, InterruptedException, IOException {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "MY_PROJECT_ID";
    String datasetName = "MY_DATASET_NAME";
    String tableName = "MY_TABLE_NAME";

    writeCommittedStream(projectId, datasetName, tableName);
  }

  public static void writeCommittedStream(String projectId, String datasetName, String tableName)
      throws DescriptorValidationException, InterruptedException, IOException {
    BigQueryWriteClient client = BigQueryWriteClient.create();
    TableName parentTable = TableName.of(projectId, datasetName, tableName);

    DataWriter writer = new DataWriter();
    // One time initialization.
    writer.initialize(parentTable, client);

    try {
      // Write two batches of fake data to the stream, each with 10 JSON records.  Data may be
      // batched up to the maximum request size:
      // https://cloud.google.com/bigquery/quotas#write-api-limits
      long offset = 0;
      for (int i = 0; i < 2; i++) {
        // Create a JSON object that is compatible with the table schema.
        JSONArray jsonArr = new JSONArray();
        for (int j = 0; j < 10; j++) {
          JSONObject record = new JSONObject();
          record.put("col1", String.format("batch-record %03d-%03d", i, j));
          jsonArr.put(record);
        }
        writer.append(jsonArr, offset);
        offset += jsonArr.length();
      }
    } catch (ExecutionException e) {
      // If the wrapped exception is a StatusRuntimeException, check the state of the operation.
      // If the state is INTERNAL, CANCELLED, or ABORTED, you can retry. For more information, see:
      // https://grpc.github.io/grpc-java/javadoc/io/grpc/StatusRuntimeException.html
      System.out.println("Failed to append records. \n" + e);
    }

    // Final cleanup for the stream.
    writer.cleanup(client);
    System.out.println("Appended records successfully.");
  }

  // A simple wrapper object showing how the stateful stream writer should be used.
  private static class DataWriter {

    private JsonStreamWriter streamWriter;
    // Track the number of in-flight requests to wait for all responses before shutting down.
    private final Phaser inflightRequestCount = new Phaser(1);

    private final Object lock = new Object();

    @GuardedBy("lock")
    private RuntimeException error = null;

    void initialize(TableName parentTable, BigQueryWriteClient client)
        throws IOException, DescriptorValidationException, InterruptedException {
      // Initialize a write stream for the specified table.
      // For more information on WriteStream.Type, see:
      // https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1/WriteStream.Type.html
      WriteStream stream = WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build();

      CreateWriteStreamRequest createWriteStreamRequest =
          CreateWriteStreamRequest.newBuilder()
              .setParent(parentTable.toString())
              .setWriteStream(stream)
              .build();
      WriteStream writeStream = client.createWriteStream(createWriteStreamRequest);

      // Use the JSON stream writer to send records in JSON format.
      // For more information about JsonStreamWriter, see:
      // https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.html
      streamWriter =
          JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema()).build();
    }

    public void append(JSONArray data, long offset)
        throws DescriptorValidationException, IOException, ExecutionException {
      synchronized (this.lock) {
        // If earlier appends have failed, we need to reset before continuing.
        if (this.error != null) {
          throw this.error;
        }
      }
      // Append asynchronously for increased throughput.
      ApiFuture<AppendRowsResponse> future = streamWriter.append(data, offset);
      ApiFutures.addCallback(
          future, new DataWriter.AppendCompleteCallback(this), MoreExecutors.directExecutor());
      // Increase the count of in-flight requests.
      inflightRequestCount.register();
    }

    public void cleanup(BigQueryWriteClient client) {
      // Wait for all in-flight requests to complete.
      inflightRequestCount.arriveAndAwaitAdvance();

      // Close the connection to the server.
      streamWriter.close();

      // Verify that no error occurred in the stream.
      synchronized (this.lock) {
        if (this.error != null) {
          throw this.error;
        }
      }

      // Finalize the stream.
      FinalizeWriteStreamResponse finalizeResponse =
          client.finalizeWriteStream(streamWriter.getStreamName());
      System.out.println("Rows written: " + finalizeResponse.getRowCount());
    }

    public String getStreamName() {
      return streamWriter.getStreamName();
    }

    static class AppendCompleteCallback implements ApiFutureCallback<AppendRowsResponse> {

      private final DataWriter parent;

      public AppendCompleteCallback(DataWriter parent) {
        this.parent = parent;
      }

      public void onSuccess(AppendRowsResponse response) {
        System.out.format("Append %d success\n", response.getAppendResult().getOffset().getValue());
        done();
      }

      public void onFailure(Throwable throwable) {
        synchronized (this.parent.lock) {
          if (this.parent.error == null) {
            StorageException storageException = Exceptions.toStorageException(throwable);
            this.parent.error =
                (storageException != null) ? storageException : new RuntimeException(throwable);
          }
        }
        System.out.format("Error: %s\n", throwable.toString());
        done();
      }

      private void done() {
        // Reduce the count of in-flight requests.
        this.parent.inflightRequestCount.arriveAndDeregister();
      }
    }
  }
}

ポイントになるのは ApiFuture<AppendRowsResponse> future = streamWriter.append(data, offset) の部分。 このoffsetとは何かというと、CreateWriteStreamを呼んで新しい書き込みのためのストリームを作ってから、書き込んだレコードの数を表している。 作成してすぐの時は0であり、10行書き込んだら10になっている。

もし、ストリームが保持しているoffsetが10で、書き込み時に指定されたoffsetが10でなかったら、BigQueryはエラーを返す様になっている。 10より小さい場合はALREADY_EXISTS、10より大きい場合はOUT_OF_RANGEとなる。

公式のリファレンスでは以下の様に説明されている。

ALREADY_EXISTS(StorageErrorCode.OFFSET_ALREADY_EXISTS): 行がすでに書き込まれています。このエラーは無視してかまいません。 OUT_OF_RANGE(StorageErrorCode.OFFSET_OUT_OF_RANGE): 前の書き込みオペレーションが失敗しました。最後に成功した書き込みから再試行します。

なるほど、という感じだがよくよく考えるとそう単純ではない。

offset管理について

まず、この書き込みoffsetの値は書き込む側で管理しておく必要がある。つまり自分が何件書いたかをちゃんとカウントして次のoffsetはこれだと指定する責務は書き込みクライアントにあるということ。

何故こうなるかというと、Storage Write APIで実際に書き込みを行う処理は非同期で行われるので、書き込みを行う時点でBigQuery側からの情報を持っていないためだ。 1回レコードバッファから何件か書き込んで、結果が帰ってくる前に2回目の書き込みが行える様になっている。

なので、非同期でガンガン書き込みつつ自分でoffsetをカウントアップして調整しなければならない。

エラーハンドリングとリトライ

じゃあ、エラーハンドリングはどうするのかというと、非同期処理の結果を一定間隔でメインスレッドで確認するか別スレッドでモニタするかしていると思うが、そこで結果を見てリトライ方法を調節することになる。

実際にエラーが起きて、あるチャンクの書き込みが失敗した場合、そのチャンクについては失敗した原因がエラーとなって帰ってくる。一方で非同期でもっと先のレコードも書き込みをやっているはずだが、それらは全部OUT_OF_RANGEになって書き込みが失敗する。 そうなったら、OUT_OF_RANGEではない通常のエラーが出たレコードチャンクの戦闘に戻ってリトライする必要がある。なので、レコードを書き込む時にはレコードチャンクを書き込んだ時のoffset値がそれぞれ何であったのかを記録しておく。もしくは確実に書けたというoffset値を別途記録しておく。

エラーが起きた時に書き込めていないことが確実だったら、後は適当にそこからリスタートすれば良いのだが、書き込めたのか書き込めていないのか良く分からないケースで実際には書き込めていた、というケースがあった場合は結構面倒臭いことになる。

もし、リトライした時に実際は書き込めていたということなら、ALREADY_EXISTSが帰ってくるはずだ。これは無視して次に進めということになっている。しかし、この時にリトライで書き込もうとしたレコードの数が、最初の試行で書き込んだレコードと一致していなかった場合、単純に無視するとリトライ時に本来スキップしてはいけないレコードをスキップしてしまう可能性があるのではと思っている。

そのため、リトライする時には前の試行と全く同じ範囲のレコードでリトライしないと安全ではない可能性がある。 id:1からid:100のレコードを書き込んで失敗、リトライする時にもしid:105までのレコードを一緒にリトライしALREADY_EXISTSで無視してしまったら、101から105までのレコードをロストする。

これに対処するには、前回の書き込みはid:1からid:100のidシーケンスのレコードで行われたことを記録しておく必要がある。この時、一部のレコードがスキーマ不正などによって書き込めなかった場合は、範囲を同じにしつつ不正なレコードを排除してリトライするといったことも必要になるかもしれない。

という訳で、リトライ安全かつexactly-onceを実現しようとすると、自分で管理しておかなければいけないものが結構多い。適当に書いてエラーになったらリトライすれば良いというものではない。

実際、どういう風に書くのかというとここでサンプルコードを出すには複雑過ぎて面倒臭いので、自分が書いたStorage Write APIを使ったKafka ConnectのSinkコネクターリポジトリへのリンクを貼っておく。もし実装に興味があるなら、こちらを参照して欲しい。

github.com

まだよく分かっていない点や不満な点

実際、書き込めたか書き込めていないかよく分からん、みたいな状態はたまにしか起きないケースで意図して遭遇できるものではない。なのでエラーになって帰ってくるケースが本当に想定しているパターンでカバーできているのかがはっきりしない。

例えば、タイムアウトみたいなもので結果が帰ってこなかった場合で、後続のoffsetで書いた非同期処理の結果が書き込めているなら実際には書き込みOKと判断できるだろう。その時にはリトライ自体が不要になると思うが、本当にそうなるのかよく分からない。この場合、前述したレコード件数の管理の問題でとてもリトライしづらい。

また、部分的にinvalidなレコードを書き込んだ時に、どういうエラーが帰ってくるのかもイマイチはっきりしない。書き込み時にBigQueryのテーブル定義からgRPCのスキーマが生成されるので、まずそこで引っかかったら書き込みまで到達しないので、それは簡単にエラーハンドリングできる。一方で、gRPCのスキーマではvalidだがBigQueryのテーブルではinvalidになるレコードの場合にどういうエラーになるのか、部分的に書き込んだという結果が帰ってくるのか、辺りがはっきりしていない。 SDKのインターフェースから推測できる範囲でこれなら安全だろうという感じで書いたのだが、本当にそうなのかというのがよく分からない。

この辺、公式のリファレンスを見ても細かいケースの差異でどうなるのかが全然書いてないので、正直困る。

とりあえず、ある程度動くものは書けたので一旦良しとしているが、もうちょっと良い資料が無いものか。