Von BigQuery in Dataflow lesen

In diesem Dokument wird gezeigt, wie Sie mit dem BigQuery-E/A-Connector von Apache Beam Daten aus BigQuery lesen.

Überblick

Der BigQuery-E/A-Connector unterstützt zwei Optionen zum Lesen aus BigQuery:

  • Direkte Tabellenlesevorgänge. Diese Option ist die schnellste, da sie die BigQuery Storage Read API verwendet.
  • Exportjob Mit dieser Option führt BigQuery einen Exportjob aus, der die Tabellendaten in Cloud Storage schreibt. Der Connector liest dann die exportierten Daten aus Cloud Storage. Diese Option ist weniger effizient, da sie den Exportschritt erfordert.

Exportjobs sind die Standardoption. Rufen Sie withMethod(Method.DIRECT_READ) auf, um direkte Lesevorgänge anzugeben.

Der Connector serialisiert die Tabellendaten in einer PCollection. Jedes Element in PCollection stellt eine einzelne Tabellenzeile dar. Der Connector unterstützt die folgenden Serialisierungsmethoden:

Parallelität

Parallelität in diesem Connector hängt von der Lesemethode ab:

  • Direkte Lesevorgänge: Der E/A-Connector erzeugt eine dynamische Anzahl von Streams basierend auf der Größe der Exportanfrage. Diese Streams werden direkt von BigQuery gelesen.

  • Exportjobs: BigQuery bestimmt, wie viele Dateien in Cloud Storage geschrieben werden. Die Anzahl der Dateien hängt von der Abfrage und der Datenmenge ab. Der E/A-Connector liest die exportierten Dateien parallel.

Leistung

Die folgende Tabelle enthält Leistungsmesswerte verschiedener BigQuery E/A-Leseoptionen. Die Arbeitslasten wurden auf einem e2-standard2-Worker mit dem Apache Beam SDK 2.49.0 für Java ausgeführt. Sie haben nicht Runner v2 verwendet.

100 Mio. Datensätze | 1 KB | 1 Spalte Durchsatz (Byte) Durchsatz (Elemente)
Speicherlesevorgänge 120 Mbit/s 88.000 Elemente pro Sekunde
Avro-Export 105 Mbit/s 78.000 Elemente pro Sekunde
Json-Export 110 Mbit/s 81.000 Elemente pro Sekunde

Diese Messwerte basieren auf einfachen Batchpipelines. Sie sollen die Leistung von E/A-Connectors vergleichen und sind nicht unbedingt repräsentativ für reale Pipelines. Die Leistung der Dataflow-Pipeline ist komplex und eine Funktion des VM-Typs, der verarbeiteten Daten, der Leistung externer Quellen und Senken sowie des Nutzercodes. Messwerte basieren auf der Ausführung des Java SDK und sind nicht repräsentativ für die Leistungsmerkmale anderer Sprach-SDKs. Weitere Informationen finden Sie unter Leistung von Beam IO.

Best Practices

  • Im Allgemeinen empfehlen wir die Verwendung direkter Tabellenlesevorgänge (Method.DIRECT_READ). Die Storage Read API eignet sich besser für Datenpipelines als Exportjobs, da der Zwischenschritt zum Exportieren von Daten nicht benötigt wird.

  • Wenn Sie direkte Lesevorgänge verwenden, wird Ihnen die Nutzung der Storage Read API in Rechnung gestellt. Weitere Informationen finden Sie auf der Seite BigQuery-Preise unter Preise für die Datenextraktion.

  • Für Exportjobs fallen keine zusätzlichen Kosten an. Für Exportjobs gelten jedoch Limits. Bei großen Datenbewegungen, bei denen die Aktualität die Priorität ist und die Kosten angepasst werden können, werden direkte Lesevorgänge empfohlen.

  • Für die Storage Read API gelten Kontingentlimits. Verwenden Sie Google Cloud-Messwerte, um Ihre Kontingentnutzung zu überwachen.

  • Bei Verwendung der Storage Read API können in den Logs möglicherweise Fehler beim Ablaufen und bei einer Zeitüberschreitung angezeigt werden. Dazu gehören:

    • DEADLINE_EXCEEDED
    • Server Unresponsive
    • StatusCode.FAILED_PRECONDITION details = "there was an error operating on 'projects/<projectID>/locations/<location>/sessions/<sessionID>/streams/<streamID>': session`

    Diese Fehler können auftreten, wenn ein Vorgang länger als das Zeitlimit dauert, in der Regel in Pipelines, die länger als 6 Stunden ausgeführt werden. Wechseln Sie zu Dateiexporten, um dieses Problem zu beheben.

  • Wenn Sie das Java SDK verwenden, sollten Sie eine Klasse erstellen, die das Schema der BigQuery-Tabelle darstellt. Rufen Sie dann useBeamSchema in der Pipeline auf, um automatisch zwischen Apache Beam-Row- und BigQuery-TableRow-Typen zu konvertieren. Ein Beispiel für eine Schemaklasse finden Sie unter ExampleModel.java.

Beispiele

In den Codebeispielen in diesem Abschnitt werden direkte Tabellenlesevorgänge verwendet.

Wenn Sie stattdessen einen Exportjob verwenden möchten, lassen Sie den Aufruf von withMethod weg oder geben Sie Method.EXPORT an. Legen Sie dann die Pipelineoption --tempLocation fest, um einen Cloud Storage-Bucket für die exportierten Dateien anzugeben.

In diesen Codebeispielen wird davon ausgegangen, dass die Quelltabelle folgende Spalten hat:

  • name (string)
  • age (integer)

Wird als JSON-Schemadatei angegeben:

[
  {"name":"user_name","type":"STRING","mode":"REQUIRED"},
  {"name":"age","type":"INTEGER","mode":"REQUIRED"}
]

Avro-formatierte Datensätze lesen

Verwenden Sie die Methode read(SerializableFunction), um BigQuery-Daten in Avro-formatierte Datensätze zu lesen. Diese Methode verwendet eine anwendungsdefinierte Funktion, die SchemaAndRecord-Objekte parst und einen benutzerdefinierten Datentyp zurückgibt. Die Ausgabe des Connectors ist ein PCollection Ihres benutzerdefinierten Datentyps.

Der folgende Code liest ein PCollection<MyData> aus einer BigQuery-Tabelle, wobei MyData eine anwendungsdefinierte Klasse ist.

Java

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Dataflow zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.TypeDescriptor;

public class BigQueryReadAvro {

  // A custom datatype to hold a record from the source table.
  @DefaultCoder(AvroCoder.class)
  public static class MyData {
    public String name;
    public Long age;

    // Function to convert Avro records to MyData instances.
    public static class FromSchemaAndRecord
            implements SerializableFunction<SchemaAndRecord, MyData> {
      @Override public MyData apply(SchemaAndRecord elem) {
        MyData data = new MyData();
        GenericRecord record = elem.getRecord();
        data.name = ((Utf8) record.get("user_name")).toString();
        data.age = (Long) record.get("age");
        return data;
      }
    }
  }

  public static void main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Read table data into Avro records, using an application-defined parsing function.
        .apply(BigQueryIO.read(new MyData.FromSchemaAndRecord())
            .from(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withMethod(TypedRead.Method.DIRECT_READ))
        // The output from the previous step is a PCollection<MyData>.
        .apply(MapElements
            .into(TypeDescriptor.of(MyData.class))
            .via((MyData x) -> {
              System.out.printf("Name: %s, Age: %d%n", x.name, x.age);
              return x;
            }));
    pipeline.run().waitUntilFinish();
  }
}

Die Methode read verwendet eine SerializableFunction<SchemaAndRecord, T>-Schnittstelle, die eine Funktion zum Konvertieren von Avro-Datensätzen in eine benutzerdefinierte Datenklasse definiert. Im vorherigen Codebeispiel wird diese Konvertierungsfunktion durch die Methode MyData.apply implementiert. Die Beispielfunktion parst die Felder name und age aus dem Avro-Datensatz und gibt eine MyData-Instanz zurück.

Rufen Sie wie im vorherigen Beispiel die Methode from auf, um anzugeben, welche BigQuery-Tabelle gelesen werden soll. Weitere Informationen finden Sie in der BigQuery-E/A-Connector-Dokumentation unter Tabellennamen.

Lesen von TableRow-Objekten

Die Methode readTableRows liest BigQuery-Daten in ein PCollection von TableRow-Objekten. Jede TableRow ist eine Zuordnung von Schlüssel/Wert-Paaren, die eine einzelne Zeile mit Tabellendaten enthält. Geben Sie die zu lesende BigQuery-Tabelle an. Rufen Sie dazu die Methode from auf.

Mit dem folgenden Code wird PCollection<TableRows> aus einer BigQuery-Tabelle gelesen.

Java

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Dataflow zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

public class BiqQueryReadTableRows {
  public static void main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Read table data into TableRow objects.
        .apply(BigQueryIO.readTableRows()
            .from(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withMethod(Method.DIRECT_READ)
        )
        // The output from the previous step is a PCollection<TableRow>.
        .apply(MapElements
            .into(TypeDescriptor.of(TableRow.class))
            // Use TableRow to access individual fields in the row.
            .via((TableRow row) -> {
              var name = (String) row.get("user_name");
              var age = (String) row.get("age");
              System.out.printf("Name: %s, Age: %s%n", name, age);
              return row;
            }));
    pipeline.run().waitUntilFinish();
  }
}

In diesem Beispiel wird auch gezeigt, wie auf die Werte über das Wörterbuch TableRow zugegriffen wird. Ganzzahlwerte werden als Strings codiert, die dem exportierten BigQuery-JSON-Format entsprechen.

Spaltenprojektion und -filterung

Wenn Sie direkte Lesevorgänge (Method.DIRECT_READ) verwenden, können Sie die Lesevorgänge effizienter gestalten, indem Sie reduzieren, wie viele Daten aus BigQuery gelesen und über das Netzwerk gesendet werden.

  • Spaltenprojektion: Rufen Sie withSelectedFields auf, um eine Teilmenge von Spalten aus der Tabelle zu lesen. Dies ermöglicht effiziente Lesevorgänge, wenn Tabellen viele Spalten enthalten.
  • Zeilenfilterung: Rufen Sie withRowRestriction auf, um ein Prädikat anzugeben, das Daten auf der Serverseite filtert.

Filterprädikate müssen deterministisch sein und die Aggregation wird nicht unterstützt.

Im folgenden Beispiel werden die Spalten "user_name" und "age" projiziert und Zeilen herausgefiltert, die nicht mit dem Prädikat "age > 18" übereinstimmen.

Java

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Dataflow zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

import com.google.api.services.bigquery.model.TableRow;
import java.util.Arrays;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

public class BigQueryReadWithProjectionAndFiltering {
  public static void main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply(BigQueryIO.readTableRows()
            // Read rows from a specified table.
            .from(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withMethod(TypedRead.Method.DIRECT_READ)
            .withSelectedFields(Arrays.asList("user_name", "age"))
            .withRowRestriction("age > 18")
        )
        // The output from the previous step is a PCollection<TableRow>.
        .apply(MapElements
            .into(TypeDescriptor.of(TableRow.class))
            // Use TableRow to access individual fields in the row.
            .via((TableRow row) -> {
              var name = (String) row.get("user_name");
              var age = row.get("age");
              System.out.printf("Name: %s, Age: %s%n", name, age);
              return row;
            }));
    pipeline.run().waitUntilFinish();
  }
}

Aus einem Abfrageergebnis lesen

Die vorherigen Beispiele zeigen, wie Zeilen aus einer Tabelle gelesen werden. Sie können auch aus dem Ergebnis einer SQL-Abfrage lesen, indem Sie fromQuery aufrufen. Bei diesem Ansatz wird ein Teil der Rechenarbeit in BigQuery verschoben. Sie können diese Methode auch zum Lesen aus einer BigQuery-Ansicht oder einer materialisierten Ansicht verwenden. Führen Sie dazu eine Abfrage der Ansicht aus.

Im folgenden Beispiel wird eine Abfrage für ein öffentliches BigQuery-Dataset ausgeführt und die Ergebnisse werden gelesen. Nachdem die Pipeline ausgeführt wurde, können Sie den Abfragejob im BigQuery-Jobverlauf sehen.

Java

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Dataflow zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

public class BigQueryReadFromQuery {
  public static void main(String[] args) {
    // The SQL query to run inside BigQuery.
    final String queryString =
        "SELECT repo_name as repo, COUNT(*) as count "
            + "FROM `bigquery-public-data.github_repos.sample_commits` "
            + "GROUP BY repo_name";

    // Parse the pipeline options passed into the application.
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation().create();

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Read the query results into TableRow objects.
        .apply(BigQueryIO.readTableRows()
            .fromQuery(queryString)
            .usingStandardSql()
            .withMethod(TypedRead.Method.DIRECT_READ))
        // The output from the previous step is a PCollection<TableRow>.
        .apply(MapElements
            .into(TypeDescriptor.of(TableRow.class))
            .via((TableRow row) -> {
              System.out.printf("Repo: %s, commits: %s%n", row.get("repo"), row.get("count"));
              return row;
            }));
    pipeline.run().waitUntilFinish();
  }
}

Nächste Schritte