自分がググった限りではネット上に記事が皆無で無限の知識のAI様に聞いてもウソしか教えてくれなかったので、ここにまとめておく。 多分、fluent-plugin-bigqueryのメンテをやっている自分ぐらいにしか需要が無いのだろうと思う……。
とりあえず、1日かけて格闘した結果、とりあえず書き込みができるところまでは到達した。
必要なもの
rubygems.orgを見ると分かるのだが、BigQueryのライブラリが1000万件近くDLされているのに対して、storage APIを叩くためのgemの累計ダウンロード数がわずか3万件である。単純計算でRubyでBigQueryを触ろうとしている人の0.3%しか使っていない。それは解説も無いわという感じ。
実装例
require "google/cloud/bigquery" require "google/cloud/bigquery/storage" require "google/protobuf" require "json" project_id = "<your project id>" dataset_id = "<your dataset id>" table_name = "<your table name>" bigquery = Google::Cloud::Bigquery.new(project_id: project_id) dataset = bigquery.dataset(dataset_id) table = dataset.table(table_name) write_client = Google::Cloud::Bigquery::Storage.big_query_write def convert_field_schema(parent_name, field, i, builder, struct_fields) method = case field.mode when "REQUIRED" :required when "NULLABLE" :optional when "REPEATED" :repeated else raise ArgumentError, "Unsupported mode: #{field.mode}" end case field.type.to_sym when :BOOLEAN builder.send(method, field.name, :bool, i) when :BYTES builder.send(method, field.name, :bytes, i) when :DATE builder.send(method, field.name, :int32, i) when :DATETIME builder.send(method, field.name, :int64, i) when :DOUBLE builder.send(method, field.name, :double, i) when :INTEGER builder.send(method, field.name, :int64, i) when :NUMERIC builder.send(method, field.name, :bytes, i) when :BIG_NUMERIC builder.send(method, field.name, :bytes, i) when :STRING builder.send(method, field.name, :string, i) when :JSON builder.send(method, field.name, :string, i) when :GEOGRAPHY builder.send(method, field.name, :string, i) when :TIME builder.send(method, field.name, :int64, i) when :TIMESTAMP builder.send(method, field.name, :int64, i) when :RECORD inner_type_name = parent_name + "." + camelize_name(field.name) builder.send(method, field.name, :message, i, "fluent.plugin.bigquery.table.#{inner_type_name}") struct_fields << field else raise ArgumentError, "Unsupported data type: #{field.type}" end end def camelize_name(name) name.split('_').map(&:capitalize).join end def build_protobuf_descriptor(name, fields) struct_fields = [] type_name = camelize_name(name) msg_proto = nil file_proto = nil Google::Protobuf::DescriptorPool.generated_pool.build do add_file("fluent/plugin/bigquery/table/#{name}.proto", :syntax => :proto2) do msg_proto = build_message_descriptor(nil, type_name, fields, self, struct_fields) until struct_fields.empty? f = struct_fields.shift build_message_descriptor(type_name, camelize_name(f.name), f.fields, self, struct_fields) end end end { msgclass: Google::Protobuf::DescriptorPool.generated_pool.lookup("fluent.plugin.bigquery.table.#{type_name}").msgclass, msg_proto: msg_proto, } end def build_message_descriptor(parent_name, name, fields, builder, struct_fields) type_name = [parent_name, name].compact.join(".") message_builder = nil builder.add_message "fluent.plugin.bigquery.table.#{type_name}" do message_builder = self fields.each.with_index(1) do |field, i| convert_field_schema(type_name, field, i, message_builder, struct_fields) end end message_builder.instance_variable_get("@msg_proto") # DescriptorProtoを取るために内部のインスタンス変数を直接参照している end # tableのスキーマ情報をAPI経由で取得し、Protocol Bufferのdescriptorに変換する result = build_protobuf_descriptor(table_name, table.schema.fields) msgclass = result[:msgclass] msg_proto = result[:msg_proto] # Nested TypeをBigQueryのAPI形式に合わせる msg_proto.field[8].type_name = "Inner" json = JSON.dump({id: "id", insight_id: 1, custom_event_id: 2, name: "hoge", properties: "{\"aa\": \"bb\"}", user_id: 3, tracked_at: Time.now.to_i * 1000000, idfv: "afsdaf", inner: {str_value: "str", int_value: 123}}) # 直接オブジェクトを生成しても良いが、今後の用途のためにJSONからメッセージオブジェクトを生成している val = msgclass.decode_json(json) # メッセージオブジェクトをProtocol Bufferのバイナリ形式にシリアライズする serialized = msgclass.encode(val) # Bigquery Storage APIのAppendRowsRequestの生成 data = [ Google::Cloud::Bigquery::Storage::V1::AppendRowsRequest.new( write_stream: "projects/#{project_id}/datasets/#{dataset_id}/tables/#{table_name}/streams/_default", proto_rows: Google::Cloud::Bigquery::Storage::V1::AppendRowsRequest::ProtoData.new( rows: Google::Cloud::Bigquery::Storage::V1::ProtoRows.new( serialized_rows: [serialized] ), writer_schema: Google::Cloud::Bigquery::Storage::V1::ProtoSchema.new( proto_descriptor: msg_proto ) ) ) ] # リクエスト実行 output = write_client.append_rows(data) output.each do |res| p res end
とりあえず動くところまで持っていっただけのベタ書きのコードなので汚いが、ここまでいければ後は綺麗にするだけなので何とかなるだろう。
ざっくり解説していく。
RubyでStorage Write APIを使う上で非常に面倒な点が、自分でProtocol Buffer形式にデータをシリアライズしなければいけないことと、ProtocolBufferのDescriptorProto(スキーマ定義みたいなもの)を生成しなければならないことの二点である。
Protocol Bufferへのシリアライズ
Javaのライブラリなどでは、ライブラリ自体がテーブルからスキーマを取得し自動的にJSONをProtocol Bufferに変換してDescriptorProtoまで準備してくれるので、JSON形式のオブジェクトを投げるだけで良いのだが、Rubyでは全て自分でやる必要がある。 更にそこにいくつかハマりどころがあり、それを乗り越えなければならない。
上記のコードにおいてはbuild_protobuf_descriptor
メソッドがその根幹になる。
基本的にはBigqueryのスキーマからフィールド定義を引っ張ってきて、それを適宜合う形のタイプのDescriptorに変換していく。 google-protobufにはDescriptorを生成するためのDSLが存在するので、それを利用することでRubyコードからDescriptorを定義できる。
しかし、これについてはドキュメントが全然無い。通常Protocol Bufferのスキーマはprotoファイルの書式で定義するもので各言語のコードでどう表現するかはprotoc
で自動生成したコードによって決まっている。通常利用することが余り無いのでちゃんとしたドキュメントが存在しない。
という訳で、protoc
で生成したコードを元にソースコードを確認し、使い方を調べる必要があった。
例えば、次の様なprotoファイルを元に生成したRubyコードは下記の形になる。
syntax = "proto2"; package test.pkg; import "bar.proto"; import "google/protobuf/timestamp.proto"; message Foo { message Baz { optional int64 num = 1; } optional int64 id = 1; optional string name = 2; optional Bar bar = 3; repeated string values = 4; optional google.protobuf.Timestamp ts = 5; optional Baz baz = 6; }
# Generated by the protocol buffer compiler. DO NOT EDIT! # source: foo.proto require 'google/protobuf' require 'bar_pb' require 'google/protobuf/timestamp_pb' Google::Protobuf::DescriptorPool.generated_pool.build do add_file("foo.proto", :syntax => :proto2) do add_message "test.pkg.Foo" do optional :id, :int64, 1 optional :name, :string, 2 optional :bar, :message, 3, "test.pkg.Bar" repeated :values, :string, 4 optional :ts, :message, 5, "google.protobuf.Timestamp" optional :baz, :message, 6, "test.pkg.Foo.Baz" end add_message "test.pkg.Foo.Baz" do optional :num, :int64, 1 end end end module Test module Pkg Foo = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("test.pkg.Foo").msgclass Foo::Baz = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("test.pkg.Foo.Baz").msgclass end end
これによるとGoogle::Protobuf::DescriptorPool.generated_pool.build
を使ってDSLでDescriptorが定義できるらしい。実装はここにある。
https://github.com/protocolbuffers/protobuf/blob/main/ruby/lib/google/protobuf/descriptor_dsl.rb
BigQueryから取得したスキーマ情報を元に、これらのDSLメソッドを呼び出しているのがconvert_field_schema
になる。
少しややこしいのがRECORD型の扱いで、これに関してはnested typeとしてスキーマを参照する様にして、別のmessageタイプとしてadd_message
を呼び出す様にしている。
この生成したDescriptorからmsgclassを引っ張ってきて、オブジェクトを生成すればencode
メソッドでバイナリシリアライズができる。
ちなみに、BigQueryのどの型がProtocol Bufferのどの型に対応するかに関しても、ドキュメントが見つからなかった。そのため、この対応関係はJavaのソースコードの関係がありそうな箇所を読んでパクってきた。
BigQueryへの書き込み
基本的にはWriteClient#append_rows
メソッドに配列に入っているAppendRowsRequest
オブジェクトを渡せばいいのだが、ここにトラップが存在する。
このリクエストオブジェクトを作成するためには、Protocol BufferのDescriptorProtoオブジェクトが必要になる。Descriptorオブジェクトではない。 そして、RubyのライブラリではDescriptorからDescriptorProtoを得る手段が無い。Javaにはあるのに。
つまり、RubyでProtocolBufferにシリアライズするために必要なスキーマ情報のオブジェクトが、BigQueryのライブラリが要求しているデータと噛み合っていない。
DescriptorProtoを直接生成できなくは無いが、先に示したdescriptor_dsl.rbに定義されているBuilder DSLの実装を見るとかなりややこしいことをしている。正直、これを再実装したくはない。
なので、上記のソースコードではinstance_variable_get
を使って、BuilderがDescriptorを生成した後にBuilderオブジェクト内に残されているDescriptorProtoのオブジェクトを無理矢理引っ張り出している。これはこれでかなりデンジャラスというか悪いことをしている気がするが、こうしない限りはDSLと同等の処理を自前で実装しなおすことになる。
ここまでしてDescriptorProtoを取得しても、まだこれだけでは書き込みは上手くいかない。
DescriptorProtoにnested typeが含まれている場合、DSL上ではpackageとnamespaceを含めたfull qualifiedな名前で型を参照する必要があるのだが、これをこのまま渡すとBigQueryのAPIが型を見つけられなくてエラーになる。 そのため、該当するnested typeからpackageとnamespaceを除外し、シンプルな型名に参照を変換しておく必要がある。 上記のコードでは、そのための変換コードを真面目に書いておらず、実験を成功させるために決め打ちで型名を調整している。
この挙動についても調べた限りでは、ドキュメントが見つからず、自分の場合はJavaのライブラリの実装を読んで何をしなければいけないのかを探り出した。
これで何とか書き込みに成功した。
まとめ
やってみたら使えない訳ではなかったので無いよりは全然マシなのだが、RubyでStorage Write APIを使うにはJavaに比べて実に手間がかかることが分かった。 余り需要が無い気がするが、もしRubyでBigQuery Storage Write APIを使いたい時に、この記事が参考になると良いなと思う。
とりあえず、これでfluent-plugin-bigqueryにStorage Write APIを使って書き込むモードが追加できそうだという取っ掛りを得た。Storage Write APIが使える様になればStream Insertより高いスループットで、しかもデータ量当たりの転送量を半額に抑えられるので、暇を見つけてやっていこうと思う。