自分がググった限りではネット上に記事が皆無で無限の知識のAI様に聞いてもウソしか教えてくれなかったので、ここにまとめておく。
多分、fluent-plugin-bigqueryのメンテをやっている自分ぐらいにしか需要が無いのだろうと思う……。
とりあえず、1日かけて格闘した結果、とりあえず書き込みができるところまでは到達した。
必要なもの
rubygems.orgを見ると分かるのだが、BigQueryのライブラリが1000万件近くDLされているのに対して、storage APIを叩くためのgemの累計ダウンロード数がわずか3万件である。単純計算でRubyでBigQueryを触ろうとしている人の0.3%しか使っていない。それは解説も無いわという感じ。
実装例
require "google/cloud/bigquery"
require "google/cloud/bigquery/storage"
require "google/protobuf"
require "json"
project_id = "<your project id>"
dataset_id = "<your dataset id>"
table_name = "<your table name>"
bigquery = Google::Cloud::Bigquery.new(project_id: project_id)
dataset = bigquery.dataset(dataset_id)
table = dataset.table(table_name)
write_client = Google::Cloud::Bigquery::Storage.big_query_write
def convert_field_schema(parent_name, field, i, builder, struct_fields)
method =
case field.mode
when "REQUIRED"
:required
when "NULLABLE"
:optional
when "REPEATED"
:repeated
else
raise ArgumentError, "Unsupported mode: #{field.mode}"
end
case field.type.to_sym
when :BOOLEAN
builder.send(method, field.name, :bool, i)
when :BYTES
builder.send(method, field.name, :bytes, i)
when :DATE
builder.send(method, field.name, :int32, i)
when :DATETIME
builder.send(method, field.name, :int64, i)
when :DOUBLE
builder.send(method, field.name, :double, i)
when :INTEGER
builder.send(method, field.name, :int64, i)
when :NUMERIC
builder.send(method, field.name, :bytes, i)
when :BIG_NUMERIC
builder.send(method, field.name, :bytes, i)
when :STRING
builder.send(method, field.name, :string, i)
when :JSON
builder.send(method, field.name, :string, i)
when :GEOGRAPHY
builder.send(method, field.name, :string, i)
when :TIME
builder.send(method, field.name, :int64, i)
when :TIMESTAMP
builder.send(method, field.name, :int64, i)
when :RECORD
inner_type_name = parent_name + "." + camelize_name(field.name)
builder.send(method, field.name, :message, i, "fluent.plugin.bigquery.table.#{inner_type_name}")
struct_fields << field
else
raise ArgumentError, "Unsupported data type: #{field.type}"
end
end
def camelize_name(name)
name.split('_').map(&:capitalize).join
end
def build_protobuf_descriptor(name, fields)
struct_fields = []
type_name = camelize_name(name)
msg_proto = nil
file_proto = nil
Google::Protobuf::DescriptorPool.generated_pool.build do
add_file("fluent/plugin/bigquery/table/#{name}.proto", :syntax => :proto2) do
msg_proto = build_message_descriptor(nil, type_name, fields, self, struct_fields)
until struct_fields.empty?
f = struct_fields.shift
build_message_descriptor(type_name, camelize_name(f.name), f.fields, self, struct_fields)
end
end
end
{
msgclass: Google::Protobuf::DescriptorPool.generated_pool.lookup("fluent.plugin.bigquery.table.#{type_name}").msgclass,
msg_proto: msg_proto,
}
end
def build_message_descriptor(parent_name, name, fields, builder, struct_fields)
type_name = [parent_name, name].compact.join(".")
message_builder = nil
builder.add_message "fluent.plugin.bigquery.table.#{type_name}" do
message_builder = self
fields.each.with_index(1) do |field, i|
convert_field_schema(type_name, field, i, message_builder, struct_fields)
end
end
message_builder.instance_variable_get("@msg_proto")
end
result = build_protobuf_descriptor(table_name, table.schema.fields)
msgclass = result[:msgclass]
msg_proto = result[:msg_proto]
msg_proto.field[8].type_name = "Inner"
json = JSON.dump({id: "id", insight_id: 1, custom_event_id: 2, name: "hoge", properties: "{\"aa\": \"bb\"}", user_id: 3, tracked_at: Time.now.to_i * 1000000, idfv: "afsdaf", inner: {str_value: "str", int_value: 123}})
val = msgclass.decode_json(json)
serialized = msgclass.encode(val)
data = [
Google::Cloud::Bigquery::Storage::V1::AppendRowsRequest.new(
write_stream: "projects/#{project_id}/datasets/#{dataset_id}/tables/#{table_name}/streams/_default",
proto_rows: Google::Cloud::Bigquery::Storage::V1::AppendRowsRequest::ProtoData.new(
rows: Google::Cloud::Bigquery::Storage::V1::ProtoRows.new(
serialized_rows: [serialized]
),
writer_schema: Google::Cloud::Bigquery::Storage::V1::ProtoSchema.new(
proto_descriptor: msg_proto
)
)
)
]
output = write_client.append_rows(data)
output.each do |res|
p res
end
とりあえず動くところまで持っていっただけのベタ書きのコードなので汚いが、ここまでいければ後は綺麗にするだけなので何とかなるだろう。
ざっくり解説していく。
RubyでStorage Write APIを使う上で非常に面倒な点が、自分でProtocol Buffer形式にデータをシリアライズしなければいけないことと、ProtocolBufferのDescriptorProto(スキーマ定義みたいなもの)を生成しなければならないことの二点である。
Protocol Bufferへのシリアライズ
Javaのライブラリなどでは、ライブラリ自体がテーブルからスキーマを取得し自動的にJSONをProtocol Bufferに変換してDescriptorProtoまで準備してくれるので、JSON形式のオブジェクトを投げるだけで良いのだが、Rubyでは全て自分でやる必要がある。
更にそこにいくつかハマりどころがあり、それを乗り越えなければならない。
上記のコードにおいてはbuild_protobuf_descriptor
メソッドがその根幹になる。
基本的にはBigqueryのスキーマからフィールド定義を引っ張ってきて、それを適宜合う形のタイプのDescriptorに変換していく。
google-protobufにはDescriptorを生成するためのDSLが存在するので、それを利用することでRubyコードからDescriptorを定義できる。
しかし、これについてはドキュメントが全然無い。通常Protocol Bufferのスキーマはprotoファイルの書式で定義するもので各言語のコードでどう表現するかはprotoc
で自動生成したコードによって決まっている。通常利用することが余り無いのでちゃんとしたドキュメントが存在しない。
という訳で、protoc
で生成したコードを元にソースコードを確認し、使い方を調べる必要があった。
例えば、次の様なprotoファイルを元に生成したRubyコードは下記の形になる。
syntax = "proto2";
package test.pkg;
import "bar.proto";
import "google/protobuf/timestamp.proto";
message Foo {
message Baz {
optional int64 num = 1;
}
optional int64 id = 1;
optional string name = 2;
optional Bar bar = 3;
repeated string values = 4;
optional google.protobuf.Timestamp ts = 5;
optional Baz baz = 6;
}
require 'google/protobuf'
require 'bar_pb'
require 'google/protobuf/timestamp_pb'
Google::Protobuf::DescriptorPool.generated_pool.build do
add_file("foo.proto", :syntax => :proto2) do
add_message "test.pkg.Foo" do
optional :id, :int64, 1
optional :name, :string, 2
optional :bar, :message, 3, "test.pkg.Bar"
repeated :values, :string, 4
optional :ts, :message, 5, "google.protobuf.Timestamp"
optional :baz, :message, 6, "test.pkg.Foo.Baz"
end
add_message "test.pkg.Foo.Baz" do
optional :num, :int64, 1
end
end
end
module Test
module Pkg
Foo = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("test.pkg.Foo").msgclass
Foo::Baz = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("test.pkg.Foo.Baz").msgclass
end
end
これによるとGoogle::Protobuf::DescriptorPool.generated_pool.build
を使ってDSLでDescriptorが定義できるらしい。実装はここにある。
https://github.com/protocolbuffers/protobuf/blob/main/ruby/lib/google/protobuf/descriptor_dsl.rb
BigQueryから取得したスキーマ情報を元に、これらのDSLメソッドを呼び出しているのがconvert_field_schema
になる。
少しややこしいのがRECORD型の扱いで、これに関してはnested typeとしてスキーマを参照する様にして、別のmessageタイプとしてadd_message
を呼び出す様にしている。
この生成したDescriptorからmsgclassを引っ張ってきて、オブジェクトを生成すればencode
メソッドでバイナリシリアライズができる。
ちなみに、BigQueryのどの型がProtocol Bufferのどの型に対応するかに関しても、ドキュメントが見つからなかった。そのため、この対応関係はJavaのソースコードの関係がありそうな箇所を読んでパクってきた。
BigQueryへの書き込み
基本的にはWriteClient#append_rows
メソッドに配列に入っているAppendRowsRequest
オブジェクトを渡せばいいのだが、ここにトラップが存在する。
このリクエストオブジェクトを作成するためには、Protocol BufferのDescriptorProtoオブジェクトが必要になる。Descriptorオブジェクトではない。
そして、RubyのライブラリではDescriptorからDescriptorProtoを得る手段が無い。Javaにはあるのに。
つまり、RubyでProtocolBufferにシリアライズするために必要なスキーマ情報のオブジェクトが、BigQueryのライブラリが要求しているデータと噛み合っていない。
DescriptorProtoを直接生成できなくは無いが、先に示したdescriptor_dsl.rbに定義されているBuilder DSLの実装を見るとかなりややこしいことをしている。正直、これを再実装したくはない。
なので、上記のソースコードではinstance_variable_get
を使って、BuilderがDescriptorを生成した後にBuilderオブジェクト内に残されているDescriptorProtoのオブジェクトを無理矢理引っ張り出している。これはこれでかなりデンジャラスというか悪いことをしている気がするが、こうしない限りはDSLと同等の処理を自前で実装しなおすことになる。
ここまでしてDescriptorProtoを取得しても、まだこれだけでは書き込みは上手くいかない。
DescriptorProtoにnested typeが含まれている場合、DSL上ではpackageとnamespaceを含めたfull qualifiedな名前で型を参照する必要があるのだが、これをこのまま渡すとBigQueryのAPIが型を見つけられなくてエラーになる。
そのため、該当するnested typeからpackageとnamespaceを除外し、シンプルな型名に参照を変換しておく必要がある。
上記のコードでは、そのための変換コードを真面目に書いておらず、実験を成功させるために決め打ちで型名を調整している。
この挙動についても調べた限りでは、ドキュメントが見つからず、自分の場合はJavaのライブラリの実装を読んで何をしなければいけないのかを探り出した。
これで何とか書き込みに成功した。
まとめ
やってみたら使えない訳ではなかったので無いよりは全然マシなのだが、RubyでStorage Write APIを使うにはJavaに比べて実に手間がかかることが分かった。
余り需要が無い気がするが、もしRubyでBigQuery Storage Write APIを使いたい時に、この記事が参考になると良いなと思う。
とりあえず、これでfluent-plugin-bigqueryにStorage Write APIを使って書き込むモードが追加できそうだという取っ掛りを得た。Storage Write APIが使える様になればStream Insertより高いスループットで、しかもデータ量当たりの転送量を半額に抑えられるので、暇を見つけてやっていこうと思う。