RubyでBigQueryのStorage Write APIを利用するまでの流れ

自分がググった限りではネット上に記事が皆無で無限の知識のAI様に聞いてもウソしか教えてくれなかったので、ここにまとめておく。 多分、fluent-plugin-bigqueryのメンテをやっている自分ぐらいにしか需要が無いのだろうと思う……。

とりあえず、1日かけて格闘した結果、とりあえず書き込みができるところまでは到達した。

必要なもの

rubygems.orgを見ると分かるのだが、BigQueryのライブラリが1000万件近くDLされているのに対して、storage APIを叩くためのgemの累計ダウンロード数がわずか3万件である。単純計算でRubyでBigQueryを触ろうとしている人の0.3%しか使っていない。それは解説も無いわという感じ。

実装例

require "google/cloud/bigquery"
require "google/cloud/bigquery/storage"
require "google/protobuf"
require "json"

project_id = "<your project id>"
dataset_id = "<your dataset id>"
table_name = "<your table name>"

bigquery = Google::Cloud::Bigquery.new(project_id: project_id)
dataset = bigquery.dataset(dataset_id)
table = dataset.table(table_name)

write_client = Google::Cloud::Bigquery::Storage.big_query_write

def convert_field_schema(parent_name, field, i, builder, struct_fields)
  method =
    case field.mode
    when "REQUIRED"
      :required
    when "NULLABLE"
      :optional
    when "REPEATED"
      :repeated
    else
      raise ArgumentError, "Unsupported mode: #{field.mode}"
    end

  case field.type.to_sym
  when :BOOLEAN
    builder.send(method, field.name, :bool, i)
  when :BYTES
    builder.send(method, field.name, :bytes, i)
  when :DATE
    builder.send(method, field.name, :int32, i)
  when :DATETIME
    builder.send(method, field.name, :int64, i)
  when :DOUBLE
    builder.send(method, field.name, :double, i)
  when :INTEGER
    builder.send(method, field.name, :int64, i)
  when :NUMERIC
    builder.send(method, field.name, :bytes, i)
  when :BIG_NUMERIC
    builder.send(method, field.name, :bytes, i)
  when :STRING
    builder.send(method, field.name, :string, i)
  when :JSON
    builder.send(method, field.name, :string, i)
  when :GEOGRAPHY
    builder.send(method, field.name, :string, i)
  when :TIME
    builder.send(method, field.name, :int64, i)
  when :TIMESTAMP
    builder.send(method, field.name, :int64, i)
  when :RECORD
    inner_type_name = parent_name + "." + camelize_name(field.name)
    builder.send(method, field.name, :message, i, "fluent.plugin.bigquery.table.#{inner_type_name}")
    struct_fields << field
  else
    raise ArgumentError, "Unsupported data type: #{field.type}"
  end
end

def camelize_name(name)
  name.split('_').map(&:capitalize).join
end

def build_protobuf_descriptor(name, fields)
  struct_fields = []
  type_name = camelize_name(name)
  msg_proto = nil
  file_proto = nil
  
  Google::Protobuf::DescriptorPool.generated_pool.build do
    add_file("fluent/plugin/bigquery/table/#{name}.proto", :syntax => :proto2) do
      msg_proto = build_message_descriptor(nil, type_name, fields, self, struct_fields)
      until struct_fields.empty?
        f = struct_fields.shift
        build_message_descriptor(type_name, camelize_name(f.name), f.fields, self, struct_fields)
      end
    end
  end

  {
    msgclass: Google::Protobuf::DescriptorPool.generated_pool.lookup("fluent.plugin.bigquery.table.#{type_name}").msgclass, 
    msg_proto: msg_proto,
  }
end

def build_message_descriptor(parent_name, name, fields, builder, struct_fields)
  type_name = [parent_name, name].compact.join(".")
  message_builder = nil
  builder.add_message "fluent.plugin.bigquery.table.#{type_name}" do
    message_builder = self
    fields.each.with_index(1) do |field, i|
      convert_field_schema(type_name, field, i, message_builder, struct_fields)
    end
  end
  message_builder.instance_variable_get("@msg_proto") # DescriptorProtoを取るために内部のインスタンス変数を直接参照している
end

# tableのスキーマ情報をAPI経由で取得し、Protocol Bufferのdescriptorに変換する
result = build_protobuf_descriptor(table_name, table.schema.fields)
msgclass = result[:msgclass]
msg_proto = result[:msg_proto]

# Nested TypeをBigQueryのAPI形式に合わせる
msg_proto.field[8].type_name = "Inner"

json = JSON.dump({id: "id", insight_id: 1, custom_event_id: 2, name: "hoge", properties: "{\"aa\": \"bb\"}", user_id: 3, tracked_at: Time.now.to_i * 1000000, idfv: "afsdaf", inner: {str_value: "str", int_value: 123}})
# 直接オブジェクトを生成しても良いが、今後の用途のためにJSONからメッセージオブジェクトを生成している
val = msgclass.decode_json(json)

# メッセージオブジェクトをProtocol Bufferのバイナリ形式にシリアライズする
serialized = msgclass.encode(val)

# Bigquery Storage APIのAppendRowsRequestの生成
data = [
  Google::Cloud::Bigquery::Storage::V1::AppendRowsRequest.new(
    write_stream: "projects/#{project_id}/datasets/#{dataset_id}/tables/#{table_name}/streams/_default",
    proto_rows: Google::Cloud::Bigquery::Storage::V1::AppendRowsRequest::ProtoData.new(
      rows: Google::Cloud::Bigquery::Storage::V1::ProtoRows.new(
        serialized_rows: [serialized]
      ),
      writer_schema: Google::Cloud::Bigquery::Storage::V1::ProtoSchema.new(
        proto_descriptor: msg_proto
      )
    )
  )
]

# リクエスト実行
output = write_client.append_rows(data)
output.each do |res|
  p res
end

とりあえず動くところまで持っていっただけのベタ書きのコードなので汚いが、ここまでいければ後は綺麗にするだけなので何とかなるだろう。

ざっくり解説していく。

RubyでStorage Write APIを使う上で非常に面倒な点が、自分でProtocol Buffer形式にデータをシリアライズしなければいけないことと、ProtocolBufferのDescriptorProto(スキーマ定義みたいなもの)を生成しなければならないことの二点である。

Protocol Bufferへのシリアライズ

Javaのライブラリなどでは、ライブラリ自体がテーブルからスキーマを取得し自動的にJSONをProtocol Bufferに変換してDescriptorProtoまで準備してくれるので、JSON形式のオブジェクトを投げるだけで良いのだが、Rubyでは全て自分でやる必要がある。 更にそこにいくつかハマりどころがあり、それを乗り越えなければならない。

上記のコードにおいてはbuild_protobuf_descriptorメソッドがその根幹になる。

基本的にはBigqueryのスキーマからフィールド定義を引っ張ってきて、それを適宜合う形のタイプのDescriptorに変換していく。 google-protobufにはDescriptorを生成するためのDSLが存在するので、それを利用することでRubyコードからDescriptorを定義できる。

しかし、これについてはドキュメントが全然無い。通常Protocol Bufferのスキーマはprotoファイルの書式で定義するもので各言語のコードでどう表現するかはprotocで自動生成したコードによって決まっている。通常利用することが余り無いのでちゃんとしたドキュメントが存在しない。

という訳で、protocで生成したコードを元にソースコードを確認し、使い方を調べる必要があった。

例えば、次の様なprotoファイルを元に生成したRubyコードは下記の形になる。

syntax = "proto2";

package test.pkg;

import "bar.proto";
import "google/protobuf/timestamp.proto";

message Foo {
  message Baz {
    optional int64 num = 1;
  }
  optional int64 id = 1;
  optional string name = 2;
  optional Bar bar = 3;
  repeated string values = 4;
  optional google.protobuf.Timestamp ts = 5;
  optional Baz baz = 6;
}
# Generated by the protocol buffer compiler.  DO NOT EDIT!
# source: foo.proto

require 'google/protobuf'

require 'bar_pb'
require 'google/protobuf/timestamp_pb'

Google::Protobuf::DescriptorPool.generated_pool.build do
  add_file("foo.proto", :syntax => :proto2) do
    add_message "test.pkg.Foo" do
      optional :id, :int64, 1
      optional :name, :string, 2
      optional :bar, :message, 3, "test.pkg.Bar"
      repeated :values, :string, 4
      optional :ts, :message, 5, "google.protobuf.Timestamp"
      optional :baz, :message, 6, "test.pkg.Foo.Baz"
    end
    add_message "test.pkg.Foo.Baz" do
      optional :num, :int64, 1
    end
  end
end

module Test
  module Pkg
    Foo = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("test.pkg.Foo").msgclass
    Foo::Baz = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("test.pkg.Foo.Baz").msgclass
  end
end

これによるとGoogle::Protobuf::DescriptorPool.generated_pool.buildを使ってDSLでDescriptorが定義できるらしい。実装はここにある。 https://github.com/protocolbuffers/protobuf/blob/main/ruby/lib/google/protobuf/descriptor_dsl.rb

BigQueryから取得したスキーマ情報を元に、これらのDSLメソッドを呼び出しているのがconvert_field_schemaになる。 少しややこしいのがRECORD型の扱いで、これに関してはnested typeとしてスキーマを参照する様にして、別のmessageタイプとしてadd_messageを呼び出す様にしている。

この生成したDescriptorからmsgclassを引っ張ってきて、オブジェクトを生成すればencodeメソッドでバイナリシリアライズができる。

ちなみに、BigQueryのどの型がProtocol Bufferのどの型に対応するかに関しても、ドキュメントが見つからなかった。そのため、この対応関係はJavaソースコードの関係がありそうな箇所を読んでパクってきた。

BigQueryへの書き込み

基本的にはWriteClient#append_rowsメソッドに配列に入っているAppendRowsRequestオブジェクトを渡せばいいのだが、ここにトラップが存在する。

このリクエストオブジェクトを作成するためには、Protocol BufferのDescriptorProtoオブジェクトが必要になる。Descriptorオブジェクトではない。 そして、RubyのライブラリではDescriptorからDescriptorProtoを得る手段が無い。Javaにはあるのに。

つまり、RubyでProtocolBufferにシリアライズするために必要なスキーマ情報のオブジェクトが、BigQueryのライブラリが要求しているデータと噛み合っていない。

DescriptorProtoを直接生成できなくは無いが、先に示したdescriptor_dsl.rbに定義されているBuilder DSLの実装を見るとかなりややこしいことをしている。正直、これを再実装したくはない。 なので、上記のソースコードではinstance_variable_getを使って、BuilderがDescriptorを生成した後にBuilderオブジェクト内に残されているDescriptorProtoのオブジェクトを無理矢理引っ張り出している。これはこれでかなりデンジャラスというか悪いことをしている気がするが、こうしない限りはDSLと同等の処理を自前で実装しなおすことになる。

ここまでしてDescriptorProtoを取得しても、まだこれだけでは書き込みは上手くいかない。

DescriptorProtoにnested typeが含まれている場合、DSL上ではpackageとnamespaceを含めたfull qualifiedな名前で型を参照する必要があるのだが、これをこのまま渡すとBigQueryのAPIが型を見つけられなくてエラーになる。 そのため、該当するnested typeからpackageとnamespaceを除外し、シンプルな型名に参照を変換しておく必要がある。 上記のコードでは、そのための変換コードを真面目に書いておらず、実験を成功させるために決め打ちで型名を調整している。

この挙動についても調べた限りでは、ドキュメントが見つからず、自分の場合はJavaのライブラリの実装を読んで何をしなければいけないのかを探り出した。

これで何とか書き込みに成功した。

まとめ

やってみたら使えない訳ではなかったので無いよりは全然マシなのだが、RubyでStorage Write APIを使うにはJavaに比べて実に手間がかかることが分かった。 余り需要が無い気がするが、もしRubyでBigQuery Storage Write APIを使いたい時に、この記事が参考になると良いなと思う。

とりあえず、これでfluent-plugin-bigqueryにStorage Write APIを使って書き込むモードが追加できそうだという取っ掛りを得た。Storage Write APIが使える様になればStream Insertより高いスループットで、しかもデータ量当たりの転送量を半額に抑えられるので、暇を見つけてやっていこうと思う。