BigQueryのStorage Write APIにおけるexactly onceの仕組みとエラーハンドリングについてまとめる

2021年の年末ぐらいから、BigQueryにはStorage Write APIというものが使える様になっている。 これは旧来のStreaming Insert APIに代わるもので、gRPCを利用してより高いスループットでデータ投入が出来る様になっている。Streaming Insertは全体で秒間1GBまでしか書き込めなかったが、Storage Write APIなら3GBまで書き込める。

また、バッチ処理的なワークロードにも対応しており、PENDINGモードを使えば複数のワーカーから書き込んだものを溜めておいて、アトミックにチャンクを書き込むといったことが可能になる。

しかも、このAPIはStreaming Insertより安い。同じデータを書き込むのに半額で済む。(Storage Write: $0.025/1GB, Streaming Insert: $0.010 / 200MBなので、1GBに直すと$0.025:$0.05になる)

できれば使っていきたい。

cloud.google.com

このStorage Write APIではexactly-onceのセマンティクスがサポートされている。しかし結構扱いに癖があるというか最初良く分からなかったので、その扱いについてここにまとめておく。 特に公式のリファレンスにあるサンプルコードはワンタイムの書き込みのサンプルコードだけで、書き込みに失敗したら単にエラーを返して終了している。 自分が知りたかったのは、動き続けるアプリケーションでどうやってリトライしたらいいのかということなのだが、そういうサンプルが見つからない。外部の記事にも余り無い。 (実は皆こういうAPIを生で使ってないのでは……?)

Storage Write APIでexactly-onceに対応しようとした場合には、書き込み時にoffsetを指定することになる。

Javaのサンプルコードを公式から転記すると以下の様になる。

https://cloud.google.com/bigquery/docs/write-api-streaming

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1.Exceptions;
import com.google.cloud.bigquery.storage.v1.Exceptions.StorageException;
import com.google.cloud.bigquery.storage.v1.FinalizeWriteStreamResponse;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import com.google.cloud.bigquery.storage.v1.TableName;
import com.google.cloud.bigquery.storage.v1.WriteStream;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Phaser;
import javax.annotation.concurrent.GuardedBy;
import org.json.JSONArray;
import org.json.JSONObject;

public class WriteCommittedStream {

  public static void runWriteCommittedStream()
      throws DescriptorValidationException, InterruptedException, IOException {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "MY_PROJECT_ID";
    String datasetName = "MY_DATASET_NAME";
    String tableName = "MY_TABLE_NAME";

    writeCommittedStream(projectId, datasetName, tableName);
  }

  public static void writeCommittedStream(String projectId, String datasetName, String tableName)
      throws DescriptorValidationException, InterruptedException, IOException {
    BigQueryWriteClient client = BigQueryWriteClient.create();
    TableName parentTable = TableName.of(projectId, datasetName, tableName);

    DataWriter writer = new DataWriter();
    // One time initialization.
    writer.initialize(parentTable, client);

    try {
      // Write two batches of fake data to the stream, each with 10 JSON records.  Data may be
      // batched up to the maximum request size:
      // https://cloud.google.com/bigquery/quotas#write-api-limits
      long offset = 0;
      for (int i = 0; i < 2; i++) {
        // Create a JSON object that is compatible with the table schema.
        JSONArray jsonArr = new JSONArray();
        for (int j = 0; j < 10; j++) {
          JSONObject record = new JSONObject();
          record.put("col1", String.format("batch-record %03d-%03d", i, j));
          jsonArr.put(record);
        }
        writer.append(jsonArr, offset);
        offset += jsonArr.length();
      }
    } catch (ExecutionException e) {
      // If the wrapped exception is a StatusRuntimeException, check the state of the operation.
      // If the state is INTERNAL, CANCELLED, or ABORTED, you can retry. For more information, see:
      // https://grpc.github.io/grpc-java/javadoc/io/grpc/StatusRuntimeException.html
      System.out.println("Failed to append records. \n" + e);
    }

    // Final cleanup for the stream.
    writer.cleanup(client);
    System.out.println("Appended records successfully.");
  }

  // A simple wrapper object showing how the stateful stream writer should be used.
  private static class DataWriter {

    private JsonStreamWriter streamWriter;
    // Track the number of in-flight requests to wait for all responses before shutting down.
    private final Phaser inflightRequestCount = new Phaser(1);

    private final Object lock = new Object();

    @GuardedBy("lock")
    private RuntimeException error = null;

    void initialize(TableName parentTable, BigQueryWriteClient client)
        throws IOException, DescriptorValidationException, InterruptedException {
      // Initialize a write stream for the specified table.
      // For more information on WriteStream.Type, see:
      // https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1/WriteStream.Type.html
      WriteStream stream = WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build();

      CreateWriteStreamRequest createWriteStreamRequest =
          CreateWriteStreamRequest.newBuilder()
              .setParent(parentTable.toString())
              .setWriteStream(stream)
              .build();
      WriteStream writeStream = client.createWriteStream(createWriteStreamRequest);

      // Use the JSON stream writer to send records in JSON format.
      // For more information about JsonStreamWriter, see:
      // https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.html
      streamWriter =
          JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema()).build();
    }

    public void append(JSONArray data, long offset)
        throws DescriptorValidationException, IOException, ExecutionException {
      synchronized (this.lock) {
        // If earlier appends have failed, we need to reset before continuing.
        if (this.error != null) {
          throw this.error;
        }
      }
      // Append asynchronously for increased throughput.
      ApiFuture<AppendRowsResponse> future = streamWriter.append(data, offset);
      ApiFutures.addCallback(
          future, new DataWriter.AppendCompleteCallback(this), MoreExecutors.directExecutor());
      // Increase the count of in-flight requests.
      inflightRequestCount.register();
    }

    public void cleanup(BigQueryWriteClient client) {
      // Wait for all in-flight requests to complete.
      inflightRequestCount.arriveAndAwaitAdvance();

      // Close the connection to the server.
      streamWriter.close();

      // Verify that no error occurred in the stream.
      synchronized (this.lock) {
        if (this.error != null) {
          throw this.error;
        }
      }

      // Finalize the stream.
      FinalizeWriteStreamResponse finalizeResponse =
          client.finalizeWriteStream(streamWriter.getStreamName());
      System.out.println("Rows written: " + finalizeResponse.getRowCount());
    }

    public String getStreamName() {
      return streamWriter.getStreamName();
    }

    static class AppendCompleteCallback implements ApiFutureCallback<AppendRowsResponse> {

      private final DataWriter parent;

      public AppendCompleteCallback(DataWriter parent) {
        this.parent = parent;
      }

      public void onSuccess(AppendRowsResponse response) {
        System.out.format("Append %d success\n", response.getAppendResult().getOffset().getValue());
        done();
      }

      public void onFailure(Throwable throwable) {
        synchronized (this.parent.lock) {
          if (this.parent.error == null) {
            StorageException storageException = Exceptions.toStorageException(throwable);
            this.parent.error =
                (storageException != null) ? storageException : new RuntimeException(throwable);
          }
        }
        System.out.format("Error: %s\n", throwable.toString());
        done();
      }

      private void done() {
        // Reduce the count of in-flight requests.
        this.parent.inflightRequestCount.arriveAndDeregister();
      }
    }
  }
}

ポイントになるのは ApiFuture<AppendRowsResponse> future = streamWriter.append(data, offset) の部分。 このoffsetとは何かというと、CreateWriteStreamを呼んで新しい書き込みのためのストリームを作ってから、書き込んだレコードの数を表している。 作成してすぐの時は0であり、10行書き込んだら10になっている。

もし、ストリームが保持しているoffsetが10で、書き込み時に指定されたoffsetが10でなかったら、BigQueryはエラーを返す様になっている。 10より小さい場合はALREADY_EXISTS、10より大きい場合はOUT_OF_RANGEとなる。

公式のリファレンスでは以下の様に説明されている。

ALREADY_EXISTS(StorageErrorCode.OFFSET_ALREADY_EXISTS): 行がすでに書き込まれています。このエラーは無視してかまいません。 OUT_OF_RANGE(StorageErrorCode.OFFSET_OUT_OF_RANGE): 前の書き込みオペレーションが失敗しました。最後に成功した書き込みから再試行します。

なるほど、という感じだがよくよく考えるとそう単純ではない。

offset管理について

まず、この書き込みoffsetの値は書き込む側で管理しておく必要がある。つまり自分が何件書いたかをちゃんとカウントして次のoffsetはこれだと指定する責務は書き込みクライアントにあるということ。

何故こうなるかというと、Storage Write APIで実際に書き込みを行う処理は非同期で行われるので、書き込みを行う時点でBigQuery側からの情報を持っていないためだ。 1回レコードバッファから何件か書き込んで、結果が帰ってくる前に2回目の書き込みが行える様になっている。

なので、非同期でガンガン書き込みつつ自分でoffsetをカウントアップして調整しなければならない。

エラーハンドリングとリトライ

じゃあ、エラーハンドリングはどうするのかというと、非同期処理の結果を一定間隔でメインスレッドで確認するか別スレッドでモニタするかしていると思うが、そこで結果を見てリトライ方法を調節することになる。

実際にエラーが起きて、あるチャンクの書き込みが失敗した場合、そのチャンクについては失敗した原因がエラーとなって帰ってくる。一方で非同期でもっと先のレコードも書き込みをやっているはずだが、それらは全部OUT_OF_RANGEになって書き込みが失敗する。 そうなったら、OUT_OF_RANGEではない通常のエラーが出たレコードチャンクの戦闘に戻ってリトライする必要がある。なので、レコードを書き込む時にはレコードチャンクを書き込んだ時のoffset値がそれぞれ何であったのかを記録しておく。もしくは確実に書けたというoffset値を別途記録しておく。

エラーが起きた時に書き込めていないことが確実だったら、後は適当にそこからリスタートすれば良いのだが、書き込めたのか書き込めていないのか良く分からないケースで実際には書き込めていた、というケースがあった場合は結構面倒臭いことになる。

もし、リトライした時に実際は書き込めていたということなら、ALREADY_EXISTSが帰ってくるはずだ。これは無視して次に進めということになっている。しかし、この時にリトライで書き込もうとしたレコードの数が、最初の試行で書き込んだレコードと一致していなかった場合、単純に無視するとリトライ時に本来スキップしてはいけないレコードをスキップしてしまう可能性があるのではと思っている。

そのため、リトライする時には前の試行と全く同じ範囲のレコードでリトライしないと安全ではない可能性がある。 id:1からid:100のレコードを書き込んで失敗、リトライする時にもしid:105までのレコードを一緒にリトライしALREADY_EXISTSで無視してしまったら、101から105までのレコードをロストする。

これに対処するには、前回の書き込みはid:1からid:100のidシーケンスのレコードで行われたことを記録しておく必要がある。この時、一部のレコードがスキーマ不正などによって書き込めなかった場合は、範囲を同じにしつつ不正なレコードを排除してリトライするといったことも必要になるかもしれない。

という訳で、リトライ安全かつexactly-onceを実現しようとすると、自分で管理しておかなければいけないものが結構多い。適当に書いてエラーになったらリトライすれば良いというものではない。

実際、どういう風に書くのかというとここでサンプルコードを出すには複雑過ぎて面倒臭いので、自分が書いたStorage Write APIを使ったKafka ConnectのSinkコネクターリポジトリへのリンクを貼っておく。もし実装に興味があるなら、こちらを参照して欲しい。

github.com

まだよく分かっていない点や不満な点

実際、書き込めたか書き込めていないかよく分からん、みたいな状態はたまにしか起きないケースで意図して遭遇できるものではない。なのでエラーになって帰ってくるケースが本当に想定しているパターンでカバーできているのかがはっきりしない。

例えば、タイムアウトみたいなもので結果が帰ってこなかった場合で、後続のoffsetで書いた非同期処理の結果が書き込めているなら実際には書き込みOKと判断できるだろう。その時にはリトライ自体が不要になると思うが、本当にそうなるのかよく分からない。この場合、前述したレコード件数の管理の問題でとてもリトライしづらい。

また、部分的にinvalidなレコードを書き込んだ時に、どういうエラーが帰ってくるのかもイマイチはっきりしない。書き込み時にBigQueryのテーブル定義からgRPCのスキーマが生成されるので、まずそこで引っかかったら書き込みまで到達しないので、それは簡単にエラーハンドリングできる。一方で、gRPCのスキーマではvalidだがBigQueryのテーブルではinvalidになるレコードの場合にどういうエラーになるのか、部分的に書き込んだという結果が帰ってくるのか、辺りがはっきりしていない。 SDKのインターフェースから推測できる範囲でこれなら安全だろうという感じで書いたのだが、本当にそうなのかというのがよく分からない。

この辺、公式のリファレンスを見ても細かいケースの差異でどうなるのかが全然書いてないので、正直困る。

とりあえず、ある程度動くものは書けたので一旦良しとしているが、もうちょっと良い資料が無いものか。