Membaca dari BigQuery ke Dataflow

Dokumen ini menjelaskan cara membaca data dari BigQuery ke Dataflow menggunakan konektor I/O BigQuery Apache Beam.

Ringkasan

Konektor I/O BigQuery mendukung dua opsi untuk membaca dari BigQuery:

  • Pembacaan tabel langsung. Opsi ini adalah yang tercepat, karena menggunakan BigQuery Storage Read API.
  • Ekspor tugas. Dengan opsi ini, BigQuery menjalankan tugas ekspor yang menulis data tabel ke Cloud Storage. Konektor kemudian membaca data yang diekspor dari Cloud Storage. Opsi ini kurang efisien karena memerlukan langkah ekspor.

Tugas ekspor adalah opsi default. Untuk menentukan pembacaan langsung, panggil withMethod(Method.DIRECT_READ).

Konektor menserialisasi data tabel menjadi PCollection. Setiap elemen dalam PCollection mewakili satu baris tabel. Konektor mendukung metode serialisasi berikut:

Keparalelan

Paralelisme dalam konektor ini bergantung pada metode baca:

  • Pembacaan langsung: Konektor I/O menghasilkan jumlah stream yang dinamis, berdasarkan ukuran permintaan ekspor. Cloud SQL membaca aliran data ini langsung dari BigQuery secara paralel.

  • Tugas ekspor: BigQuery menentukan jumlah file yang akan ditulis ke Cloud Storage. Jumlah file bergantung pada kueri dan volume data. Konektor I/O membaca file yang diekspor secara paralel.

Performa

Tabel berikut menampilkan metrik performa untuk berbagai opsi baca BigQuery I/O. Workload dijalankan pada satu pekerja e2-standard2, menggunakan Apache Beam SDK 2.49.0 untuk Java. Mereka tidak menggunakan Runner v2.

100 M data | 1 kB | 1 kolom Throughput (byte) Throughput (elemen)
Pembacaan Penyimpanan 120 MBps 88.000 elemen per detik
Ekspor Avro 105 MBps 78.000 elemen per detik
Ekspor JSON 110 MBps 81.000 elemen per detik

Metrik ini didasarkan pada pipeline batch sederhana. Keduanya ditujukan untuk membandingkan performa antara konektor I/O, dan tidak selalu merepresentasikan pipeline di dunia nyata. Performa pipeline Dataflow bersifat kompleks, dan merupakan fungsi dari jenis VM, data yang sedang diproses, performa sumber dan sink eksternal, serta kode pengguna. Metrik didasarkan pada menjalankan Java SDK dan tidak mewakili karakteristik performa SDK bahasa lainnya. Untuk mengetahui informasi selengkapnya, lihat Performa Beam IO.

Praktik terbaik

  • Secara umum, sebaiknya gunakan pembacaan tabel langsung (Method.DIRECT_READ). Storage Read API lebih cocok untuk pipeline data daripada tugas ekspor, karena tidak memerlukan langkah menengah untuk mengekspor data.

  • Jika menggunakan pembacaan langsung, Anda akan dikenai biaya untuk penggunaan Storage Read API. Lihat Harga ekstraksi data di halaman harga BigQuery.

  • Tidak ada biaya tambahan untuk tugas ekspor. Namun, tugas ekspor memiliki batas. Untuk pemindahan data besar, dengan prioritas prioritas dan biaya yang dapat disesuaikan, baca langsung direkomendasikan.

  • Storage Read API memiliki batas kuota. Gunakan metrik Google Cloud untuk memantau penggunaan kuota.

  • Saat menggunakan Storage Read API, Anda mungkin melihat error habis masa berlaku lease dan waktu tunggu sesi di log, seperti:

    • DEADLINE_EXCEEDED
    • Server Unresponsive
    • StatusCode.FAILED_PRECONDITION details = "there was an error operating on 'projects/<projectID>/locations/<location>/sessions/<sessionID>/streams/<streamID>': session`

    Error ini dapat terjadi saat operasi memerlukan waktu lebih lama dari waktu tunggu, biasanya dalam pipeline yang berjalan lebih dari 6 jam. Untuk mengurangi masalah ini, beralihlah ke ekspor file.

  • Saat menggunakan Java SDK, pertimbangkan untuk membuat class yang mewakili skema tabel BigQuery. Lalu, panggil useBeamSchema di pipeline Anda untuk otomatis melakukan konversi antara jenis Row Apache Beam dan TableRow BigQuery. Untuk contoh class skema, lihat ExampleModel.java.

Contoh

Contoh kode di bagian ini menggunakan pembacaan tabel langsung.

Untuk menggunakan tugas ekspor, hapus panggilan ke withMethod atau tentukan Method.EXPORT. Kemudian, tetapkan opsi pipeline --tempLocation untuk menentukan bucket Cloud Storage untuk file yang diekspor.

Contoh kode ini mengasumsikan bahwa tabel sumber memiliki kolom berikut:

  • name (string)
  • age (bilangan bulat)

Ditentukan sebagai file skema JSON:

[
  {"name":"user_name","type":"STRING","mode":"REQUIRED"},
  {"name":"age","type":"INTEGER","mode":"REQUIRED"}
]

Membaca kumpulan data berformat Avro

Untuk membaca data BigQuery ke dalam data berformat Avro, gunakan metode read(SerializableFunction). Metode ini mengambil fungsi yang ditentukan aplikasi yang mengurai objek SchemaAndRecord dan menampilkan jenis data kustom. Output dari konektor adalah PCollection dari jenis data kustom Anda.

Kode berikut membaca PCollection<MyData> dari tabel BigQuery, dengan MyData adalah class yang ditentukan aplikasi.

Java

Untuk mengautentikasi ke Dataflow, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, baca Menyiapkan autentikasi untuk lingkungan pengembangan lokal.

import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.TypeDescriptor;

public class BigQueryReadAvro {

  // A custom datatype to hold a record from the source table.
  @DefaultCoder(AvroCoder.class)
  public static class MyData {
    public String name;
    public Long age;

    // Function to convert Avro records to MyData instances.
    public static class FromSchemaAndRecord
            implements SerializableFunction<SchemaAndRecord, MyData> {
      @Override public MyData apply(SchemaAndRecord elem) {
        MyData data = new MyData();
        GenericRecord record = elem.getRecord();
        data.name = ((Utf8) record.get("user_name")).toString();
        data.age = (Long) record.get("age");
        return data;
      }
    }
  }

  public static void main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Read table data into Avro records, using an application-defined parsing function.
        .apply(BigQueryIO.read(new MyData.FromSchemaAndRecord())
            .from(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withMethod(TypedRead.Method.DIRECT_READ))
        // The output from the previous step is a PCollection<MyData>.
        .apply(MapElements
            .into(TypeDescriptor.of(MyData.class))
            .via((MyData x) -> {
              System.out.printf("Name: %s, Age: %d%n", x.name, x.age);
              return x;
            }));
    pipeline.run().waitUntilFinish();
  }
}

Metode read menggunakan antarmuka SerializableFunction<SchemaAndRecord, T>, yang menentukan fungsi untuk mengonversi dari data Avro ke class data kustom. Pada contoh kode sebelumnya, metode MyData.apply menerapkan fungsi konversi ini. Fungsi contoh mengurai kolom name dan age dari data Avro dan menampilkan instance MyData.

Untuk menentukan tabel BigQuery yang akan dibaca, panggil metode from, seperti yang ditunjukkan pada contoh sebelumnya. Untuk mengetahui informasi lebih lanjut, lihat Nama tabel dalam dokumentasi konektor I/O BigQuery.

Membaca objek TableRow

Metode readTableRows membaca data BigQuery ke dalam PCollection dari objek TableRow. Setiap TableRow adalah peta key-value pair yang menyimpan satu baris data tabel. Tentukan tabel BigQuery yang akan dibaca dengan memanggil metode from.

Kode berikut membaca PCollection<TableRows> dari tabel BigQuery.

Java

Untuk mengautentikasi ke Dataflow, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, baca Menyiapkan autentikasi untuk lingkungan pengembangan lokal.

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

public class BiqQueryReadTableRows {
  public static void main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Read table data into TableRow objects.
        .apply(BigQueryIO.readTableRows()
            .from(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withMethod(Method.DIRECT_READ)
        )
        // The output from the previous step is a PCollection<TableRow>.
        .apply(MapElements
            .into(TypeDescriptor.of(TableRow.class))
            // Use TableRow to access individual fields in the row.
            .via((TableRow row) -> {
              var name = (String) row.get("user_name");
              var age = (String) row.get("age");
              System.out.printf("Name: %s, Age: %s%n", name, age);
              return row;
            }));
    pipeline.run().waitUntilFinish();
  }
}

Contoh ini juga menunjukkan cara mengakses nilai dari kamus TableRow. Nilai bilangan bulat dienkode sebagai string agar cocok dengan format JSON yang diekspor BigQuery.

Proyeksi dan pemfilteran kolom

Saat menggunakan pembacaan langsung (Method.DIRECT_READ), Anda dapat membuat operasi baca lebih efisien dengan mengurangi jumlah data yang dibaca dari BigQuery dan dikirim melalui jaringan.

  • Proyeksi kolom: Panggil withSelectedFields untuk membaca subset kolom dari tabel. Hal ini memungkinkan pembacaan yang efisien ketika tabel berisi banyak kolom.
  • Pemfilteran baris: Panggil withRowRestriction untuk menentukan predikat yang memfilter data di sisi server.

Predikat filter harus deterministik, dan agregasi tidak didukung.

Contoh berikut memproyeksikan kolom "user_name" dan "age", serta memfilter baris yang tidak cocok dengan predikat "age > 18".

Java

Untuk mengautentikasi ke Dataflow, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, baca Menyiapkan autentikasi untuk lingkungan pengembangan lokal.

import com.google.api.services.bigquery.model.TableRow;
import java.util.Arrays;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

public class BigQueryReadWithProjectionAndFiltering {
  public static void main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply(BigQueryIO.readTableRows()
            // Read rows from a specified table.
            .from(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withMethod(TypedRead.Method.DIRECT_READ)
            .withSelectedFields(Arrays.asList("user_name", "age"))
            .withRowRestriction("age > 18")
        )
        // The output from the previous step is a PCollection<TableRow>.
        .apply(MapElements
            .into(TypeDescriptor.of(TableRow.class))
            // Use TableRow to access individual fields in the row.
            .via((TableRow row) -> {
              var name = (String) row.get("user_name");
              var age = row.get("age");
              System.out.printf("Name: %s, Age: %s%n", name, age);
              return row;
            }));
    pipeline.run().waitUntilFinish();
  }
}

Membaca dari hasil kueri

Contoh sebelumnya menunjukkan cara membaca baris dari tabel. Anda juga dapat membaca dari hasil kueri SQL, dengan memanggil fromQuery. Pendekatan ini memindahkan sebagian pekerjaan komputasi ke BigQuery. Anda juga dapat menggunakan metode ini untuk membaca dari tampilan BigQuery atau tampilan terwujud, dengan menjalankan kueri terhadap tampilan.

Contoh berikut menjalankan kueri terhadap set data publik BigQuery dan membaca hasilnya. Setelah pipeline berjalan, Anda dapat melihat tugas kueri di histori tugas BigQuery.

Java

Untuk mengautentikasi ke Dataflow, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, baca Menyiapkan autentikasi untuk lingkungan pengembangan lokal.

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

public class BigQueryReadFromQuery {
  public static void main(String[] args) {
    // The SQL query to run inside BigQuery.
    final String queryString =
        "SELECT repo_name as repo, COUNT(*) as count "
            + "FROM `bigquery-public-data.github_repos.sample_commits` "
            + "GROUP BY repo_name";

    // Parse the pipeline options passed into the application.
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation().create();

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Read the query results into TableRow objects.
        .apply(BigQueryIO.readTableRows()
            .fromQuery(queryString)
            .usingStandardSql()
            .withMethod(TypedRead.Method.DIRECT_READ))
        // The output from the previous step is a PCollection<TableRow>.
        .apply(MapElements
            .into(TypeDescriptor.of(TableRow.class))
            .via((TableRow row) -> {
              System.out.printf("Repo: %s, commits: %s%n", row.get("repo"), row.get("count"));
              return row;
            }));
    pipeline.run().waitUntilFinish();
  }
}

Langkah selanjutnya