Usa l'API Streaming legacy

Questo documento descrive come trasferire flussi di dati in BigQuery utilizzando versione precedente di tabledata.insertAll .

Per i nuovi progetti, consigliamo di utilizzare API BigQuery StorageWrite anziché tabledata.insertAll. L'API StorageWrite ha prezzi e funzionalità più solide, inclusa la semantica della consegna "exactly-once". Se stai eseguendo la migrazione di un progetto esistente dal metodo tabledata.insertAll all'API StorageWrite, ti consigliamo di selezionare il stream predefinito. La Il metodo tabledata.insertAll è ancora completamente supportato.

Prima di iniziare

  1. Assicurati di disporre dell'accesso in scrittura al set di dati che contiene tabella di destinazione. La tabella deve esistere prima di iniziare a scrivere dati a meno che non utilizzi tabelle modello. Per saperne di più sulle tabelle dei modelli, consulta la sezione Creare tabelle automaticamente utilizzando le tabelle modello.

  2. Controlla il criterio delle quote per i flussi di dati.

  3. Assicurati che la fatturazione sia attivata per il tuo progetto Google Cloud.

  4. Lo streaming non è disponibile per il livello gratuito. Se tenti di utilizzare lo streaming senza attivare la fatturazione, viene visualizzato il seguente errore: BigQuery: Streaming insert is not allowed in the free tier.

  5. Concedi ruoli IAM (Identity and Access Management) che concedono agli utenti le autorizzazioni necessarie per eseguire ogni attività in questo documento.

Autorizzazioni obbligatorie

Per inserire un flusso di dati in BigQuery, devi disporre delle seguenti autorizzazioni IAM:

  • bigquery.tables.updateData (consente di inserire dati nella tabella)
  • bigquery.tables.get (consente di ottenere i metadati della tabella)
  • bigquery.datasets.get (consente di ottenere i metadati del set di dati)
  • bigquery.tables.create (obbligatorio se utilizzi un tabella modello per creare la tabella automaticamente)

Ciascuno dei seguenti ruoli IAM predefiniti include le autorizzazioni necessarie per trasmettere i dati in modalità flusso a BigQuery:

  • roles/bigquery.dataEditor
  • roles/bigquery.dataOwner
  • roles/bigquery.admin

Per ulteriori informazioni su ruoli e autorizzazioni IAM in per BigQuery, consulta Ruoli e autorizzazioni predefiniti.

Trasmetti flussi di dati in BigQuery

C#

Prima di provare questo esempio, segui le istruzioni per la configurazione di C# nel Guida rapida di BigQuery con librerie client. Per ulteriori informazioni, consulta API C# BigQuery documentazione di riferimento.

Per eseguire l'autenticazione su BigQuery, configura Credenziali predefinite dell'applicazione. Per ulteriori informazioni, vedi Configura l'autenticazione per le librerie client.


using Google.Cloud.BigQuery.V2;

public class BigQueryTableInsertRows
{
    public void TableInsertRows(
        string projectId = "your-project-id",
        string datasetId = "your_dataset_id",
        string tableId = "your_table_id"
    )
    {
        BigQueryClient client = BigQueryClient.Create(projectId);
        BigQueryInsertRow[] rows = new BigQueryInsertRow[]
        {
            // The insert ID is optional, but can avoid duplicate data
            // when retrying inserts.
            new BigQueryInsertRow(insertId: "row1") {
                { "name", "Washington" },
                { "post_abbr", "WA" }
            },
            new BigQueryInsertRow(insertId: "row2") {
                { "name", "Colorado" },
                { "post_abbr", "CO" }
            }
        };
        client.InsertRows(datasetId, tableId, rows);
    }
}

Go

Prima di provare questo esempio, segui le istruzioni per la configurazione di Go nel Guida rapida di BigQuery con librerie client. Per ulteriori informazioni, consulta API Go BigQuery documentazione di riferimento.

Per eseguire l'autenticazione su BigQuery, configura Credenziali predefinite dell'applicazione. Per ulteriori informazioni, vedi Configura l'autenticazione per le librerie client.

import (
	"context"
	"fmt"

	"cloud.google.com/go/bigquery"
)

// Item represents a row item.
type Item struct {
	Name string
	Age  int
}

// Save implements the ValueSaver interface.
// This example disables best-effort de-duplication, which allows for higher throughput.
func (i *Item) Save() (map[string]bigquery.Value, string, error) {
	return map[string]bigquery.Value{
		"full_name": i.Name,
		"age":       i.Age,
	}, bigquery.NoDedupeID, nil
}

// insertRows demonstrates inserting data into a table using the streaming insert mechanism.
func insertRows(projectID, datasetID, tableID string) error {
	// projectID := "my-project-id"
	// datasetID := "mydataset"
	// tableID := "mytable"
	ctx := context.Background()
	client, err := bigquery.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("bigquery.NewClient: %w", err)
	}
	defer client.Close()

	inserter := client.Dataset(datasetID).Table(tableID).Inserter()
	items := []*Item{
		// Item implements the ValueSaver interface.
		{Name: "Phred Phlyntstone", Age: 32},
		{Name: "Wylma Phlyntstone", Age: 29},
	}
	if err := inserter.Put(ctx, items); err != nil {
		return err
	}
	return nil
}

Java

Prima di provare questo esempio, segui le istruzioni per la configurazione di Java nel Guida rapida di BigQuery con librerie client. Per ulteriori informazioni, consulta API Java BigQuery documentazione di riferimento.

Per eseguire l'autenticazione su BigQuery, configura Credenziali predefinite dell'applicazione. Per ulteriori informazioni, vedi Configura l'autenticazione per le librerie client.

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryError;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.InsertAllRequest;
import com.google.cloud.bigquery.InsertAllResponse;
import com.google.cloud.bigquery.TableId;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sample to inserting rows into a table without running a load job.
public class TableInsertRows {

  public static void main(String[] args) {
    // TODO(developer): Replace these variables before running the sample.
    String datasetName = "MY_DATASET_NAME";
    String tableName = "MY_TABLE_NAME";
    // Create a row to insert
    Map<String, Object> rowContent = new HashMap<>();
    rowContent.put("booleanField", true);
    rowContent.put("numericField", "3.14");
    // TODO(developer): Replace the row id with a unique value for each row.
    String rowId = "ROW_ID";
    tableInsertRows(datasetName, tableName, rowId, rowContent);
  }

  public static void tableInsertRows(
      String datasetName, String tableName, String rowId, Map<String, Object> rowContent) {
    try {
      // Initialize client that will be used to send requests. This client only needs to be created
      // once, and can be reused for multiple requests.
      BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

      // Get table
      TableId tableId = TableId.of(datasetName, tableName);

      // Inserts rowContent into datasetName:tableId.
      InsertAllResponse response =
          bigquery.insertAll(
              InsertAllRequest.newBuilder(tableId)
                  // More rows can be added in the same RPC by invoking .addRow() on the builder.
                  // You can omit the unique row ids to disable de-duplication.
                  .addRow(rowId, rowContent)
                  .build());

      if (response.hasErrors()) {
        // If any of the insertions failed, this lets you inspect the errors
        for (Map.Entry<Long, List<BigQueryError>> entry : response.getInsertErrors().entrySet()) {
          System.out.println("Response error: \n" + entry.getValue());
        }
      }
      System.out.println("Rows successfully inserted into table");
    } catch (BigQueryException e) {
      System.out.println("Insert operation not performed \n" + e.toString());
    }
  }
}

Node.js

Prima di provare questo esempio, segui le istruzioni per la configurazione di Node.js nel Guida rapida di BigQuery con librerie client. Per ulteriori informazioni, consulta API Node.js BigQuery documentazione di riferimento.

Per eseguire l'autenticazione su BigQuery, configura Credenziali predefinite dell'applicazione. Per ulteriori informazioni, vedi Configura l'autenticazione per le librerie client.

// Import the Google Cloud client library
const {BigQuery} = require('@google-cloud/bigquery');
const bigquery = new BigQuery();

async function insertRowsAsStream() {
  // Inserts the JSON objects into my_dataset:my_table.

  /**
   * TODO(developer): Uncomment the following lines before running the sample.
   */
  // const datasetId = 'my_dataset';
  // const tableId = 'my_table';
  const rows = [
    {name: 'Tom', age: 30},
    {name: 'Jane', age: 32},
  ];

  // Insert data into a table
  await bigquery.dataset(datasetId).table(tableId).insert(rows);
  console.log(`Inserted ${rows.length} rows`);
}

PHP

Prima di provare questo esempio, segui le istruzioni per la configurazione di PHP nel Guida rapida di BigQuery con librerie client. Per ulteriori informazioni, consulta API PHP BigQuery documentazione di riferimento.

Per eseguire l'autenticazione su BigQuery, configura Credenziali predefinite dell'applicazione. Per ulteriori informazioni, vedi Configura l'autenticazione per le librerie client.

use Google\Cloud\BigQuery\BigQueryClient;

/**
 * Stream data into bigquery
 *
 * @param string $projectId The project Id of your Google Cloud Project.
 * @param string $datasetId The BigQuery dataset ID.
 * @param string $tableId The BigQuery table ID.
 * @param string $data Json encoded data For eg,
 *    $data = json_encode([
 *       "field1" => "value1",
 *       "field2" => "value2",
 *    ]);
 */
function stream_row(
    string $projectId,
    string $datasetId,
    string $tableId,
    string $data
): void {
    // instantiate the bigquery table service
    $bigQuery = new BigQueryClient([
      'projectId' => $projectId,
    ]);
    $dataset = $bigQuery->dataset($datasetId);
    $table = $dataset->table($tableId);

    $data = json_decode($data, true);
    $insertResponse = $table->insertRows([
      ['data' => $data],
      // additional rows can go here
    ]);
    if ($insertResponse->isSuccessful()) {
        print('Data streamed into BigQuery successfully' . PHP_EOL);
    } else {
        foreach ($insertResponse->failedRows() as $row) {
            foreach ($row['errors'] as $error) {
                printf('%s: %s' . PHP_EOL, $error['reason'], $error['message']);
            }
        }
    }
}

Python

Prima di provare questo esempio, segui le istruzioni per la configurazione di Python nel Guida rapida di BigQuery con librerie client. Per ulteriori informazioni, consulta API Python BigQuery documentazione di riferimento.

Per eseguire l'autenticazione su BigQuery, configura Credenziali predefinite dell'applicazione. Per ulteriori informazioni, vedi Configura l'autenticazione per le librerie client.

from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()

# TODO(developer): Set table_id to the ID of table to append to.
# table_id = "your-project.your_dataset.your_table"

rows_to_insert = [
    {"full_name": "Phred Phlyntstone", "age": 32},
    {"full_name": "Wylma Phlyntstone", "age": 29},
]

errors = client.insert_rows_json(table_id, rows_to_insert)  # Make an API request.
if errors == []:
    print("New rows have been added.")
else:
    print("Encountered errors while inserting rows: {}".format(errors))

Ruby

Prima di provare questo esempio, segui le istruzioni per la configurazione di Ruby nel Guida rapida di BigQuery con librerie client. Per ulteriori informazioni, consulta API Ruby BigQuery documentazione di riferimento.

Per eseguire l'autenticazione su BigQuery, configura Credenziali predefinite dell'applicazione. Per ulteriori informazioni, vedi Configura l'autenticazione per le librerie client.

require "google/cloud/bigquery"

def table_insert_rows dataset_id = "your_dataset_id", table_id = "your_table_id"
  bigquery = Google::Cloud::Bigquery.new
  dataset  = bigquery.dataset dataset_id
  table    = dataset.table table_id

  row_data = [
    { name: "Alice", value: 5  },
    { name: "Bob",   value: 10 }
  ]
  response = table.insert row_data

  if response.success?
    puts "Inserted rows successfully"
  else
    puts "Failed to insert #{response.error_rows.count} rows"
  end
end

Non è necessario compilare il campo insertID quando inserisci le righe. L'esempio seguente mostra come evitare di inviare un insertID per ogni riga durante lo streaming.

Java

Prima di provare questo esempio, segui le istruzioni per la configurazione di Java nel Guida rapida di BigQuery con librerie client. Per ulteriori informazioni, consulta API Java BigQuery documentazione di riferimento.

Per eseguire l'autenticazione su BigQuery, configura Credenziali predefinite dell'applicazione. Per ulteriori informazioni, vedi Configura l'autenticazione per le librerie client.

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryError;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.InsertAllRequest;
import com.google.cloud.bigquery.InsertAllResponse;
import com.google.cloud.bigquery.TableId;
import com.google.common.collect.ImmutableList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sample to insert rows without row ids in a table
public class TableInsertRowsWithoutRowIds {

  public static void main(String[] args) {
    // TODO(developer): Replace these variables before running the sample.
    String datasetName = "MY_DATASET_NAME";
    String tableName = "MY_TABLE_NAME";
    tableInsertRowsWithoutRowIds(datasetName, tableName);
  }

  public static void tableInsertRowsWithoutRowIds(String datasetName, String tableName) {
    try {
      // Initialize client that will be used to send requests. This client only needs to be created
      // once, and can be reused for multiple requests.
      final BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
      // Create rows to insert
      Map<String, Object> rowContent1 = new HashMap<>();
      rowContent1.put("stringField", "Phred Phlyntstone");
      rowContent1.put("numericField", 32);
      Map<String, Object> rowContent2 = new HashMap<>();
      rowContent2.put("stringField", "Wylma Phlyntstone");
      rowContent2.put("numericField", 29);
      InsertAllResponse response =
          bigquery.insertAll(
              InsertAllRequest.newBuilder(TableId.of(datasetName, tableName))
                  // No row ids disable de-duplication, and also disable the retries in the Java
                  // library.
                  .setRows(
                      ImmutableList.of(
                          InsertAllRequest.RowToInsert.of(rowContent1),
                          InsertAllRequest.RowToInsert.of(rowContent2)))
                  .build());

      if (response.hasErrors()) {
        // If any of the insertions failed, this lets you inspect the errors
        for (Map.Entry<Long, List<BigQueryError>> entry : response.getInsertErrors().entrySet()) {
          System.out.println("Response error: \n" + entry.getValue());
        }
      }
      System.out.println("Rows successfully inserted into table without row ids");
    } catch (BigQueryException e) {
      System.out.println("Insert operation not performed \n" + e.toString());
    }
  }
}

Python

Prima di provare questo esempio, segui le istruzioni per la configurazione di Python nel Guida rapida di BigQuery con librerie client. Per ulteriori informazioni, consulta API Python BigQuery documentazione di riferimento.

Per eseguire l'autenticazione su BigQuery, configura Credenziali predefinite dell'applicazione. Per ulteriori informazioni, vedi Configura l'autenticazione per le librerie client.

from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()

# TODO(developer): Set table_id to the ID of table to append to.
# table_id = "your-project.your_dataset.your_table"

rows_to_insert = [
    {"full_name": "Phred Phlyntstone", "age": 32},
    {"full_name": "Wylma Phlyntstone", "age": 29},
]

errors = client.insert_rows_json(
    table_id, rows_to_insert, row_ids=[None] * len(rows_to_insert)
)  # Make an API request.
if errors == []:
    print("New rows have been added.")
else:
    print("Encountered errors while inserting rows: {}".format(errors))

Invia dati relativi a data e ora

Per i campi relativi a data e ora, formatta i dati nel metodo tabledata.insertAll come che segue:

Tipo Formato
DATE Una stringa nel formato "YYYY-MM-DD"
DATETIME Una stringa nel formato "YYYY-MM-DD [HH:MM:SS]"
TIME Una stringa nel formato "HH:MM:SS"
TIMESTAMP Il numero di secondi dal 1970-01-01 (epoca Unix) o una stringa nel formato "YYYY-MM-DD HH:MM[:SS]"

Invia dati intervallo

Per i campi di tipo RANGE<T>, formatta i dati in tabledata.insertAll come oggetto JSON con due campi, start e end. I valori mancanti o NULL per i campi start e end rappresentano confini illimitati. Questi campi devono avere lo stesso formato JSON supportato del tipo T, dove T può essere DATE, DATETIME e TIMESTAMP.

Nell'esempio seguente, il campo f_range_date rappresenta un RANGE<DATE> colonna in una tabella. In questa colonna viene inserita una riga utilizzando API tabledata.insertAll.

{
    "f_range_date": {
        "start": "1970-01-02",
        "end": null
    }
}

Disponibilità dei dati in modalità flusso

I dati sono disponibili per l'analisi in tempo reale utilizzando GoogleSQL query immediatamente dopo che BigQuery ha confermato Richiesta tabledata.insertAll.

Le righe trasmesse di recente in streaming in una tabella partizionata per data di importazione hanno temporaneamente un Valore NULL per _PARTITIONTIME pseudocolonna. Per queste righe, BigQuery assegna lo stato finale non NULL valore della colonna PARTITIONTIME in background, in genere entro pochi minuti. In rari casi, questa operazione può richiedere fino a 90 minuti.

Alcune righe trasmesse di recente potrebbero non essere disponibili per la copia della tabella in genere per un per alcuni minuti. In rari casi, questa operazione può richiedere fino a 90 minuti. Per vedere se i dati è disponibile per la copia della tabella, controlla la risposta tables.get per una sezione denominata streamingBuffer Se la sezione streamingBuffer non è presente, i tuoi dati possono essere copiati. Puoi anche utilizzare il campo streamingBuffer.oldestEntryTime per identificare il dei record nel buffer di flusso.

Deduplicazione Best effort

Quando fornisci insertId per una riga inserita, BigQuery utilizza questo valore ID per consentire la deduplicazione ottimale dei dati fino a un minuto. Ossia, se trasmetti in streaming la stessa riga con lo stesso insertId più di una volta all'interno quel periodo di tempo nella stessa tabella, BigQuery potrebbe deduplicare le occorrenze multiple della riga, conservando solo una di queste occorrenze.

Il sistema prevede che anche le righe fornite con insertId identici vengano identici. Se due righe hanno insertId identici, non è deterministico quali righe vengono conservate da BigQuery.

La deduplicazione è generalmente pensata per scenari di nuovo tentativo in un sistema distribuito in cui non è possibile determinare lo stato di un inserimento di flussi di dati in caso di determinati errori di Compute Engine, ad esempio errori di rete tra il sistema e BigQuery o errori interni di BigQuery. Se riprovi a eseguire l'inserimento, utilizza lo stesso insertId per lo stesso insieme di righe, in modo che BigQuery può tentare di deduplicare i dati. Per ulteriori informazioni per ulteriori informazioni, consulta la risoluzione dei problemi di inserimento di flussi di dati.

La deduplicazione offerta da BigQuery è il miglior tentativo e dovrebbe come meccanismo per garantire l'assenza di duplicati nei e i dati di Google Cloud. Inoltre, BigQuery potrebbe ridurre la qualità dei di eliminare i duplicati in qualsiasi momento per garantire l'affidabilità e la disponibilità dei dati.

Se hai requisiti di deduplicazione rigorosi per i tuoi dati, Google Cloud Datastore è un servizio alternativo che supporta transazioni.

Disattivazione della deduplicazione best effort

Puoi disattivare la deduplicazione best effort non compilando il insertId per ogni riga inserita. Questo è il metodo consigliato per inserire i dati.

Apache Beam e Dataflow

Per disattivare la deduplicazione ottimale quando utilizzi Apache Beam Connettore BigQuery I/O per Java, utilizza Metodo ignoreInsertIds().

Rimozione manuale dei duplicati

Per assicurarti che non esistano righe duplicate al termine della trasmissione del flusso, utilizza la seguente procedura manuale:

  1. Aggiungi insertId come colonna nello schema della tabella e includi il parametro insertId nei dati per ogni riga.
  2. Una volta interrotto il flusso di dati, esegui questa query per verificare duplicati:

    #standardSQL
    SELECT
      MAX(count) FROM(
      SELECT
        ID_COLUMN,
        count(*) as count
      FROM
        `TABLE_NAME`
      GROUP BY
        ID_COLUMN)

    Se il risultato è maggiore di 1, esistono duplicati.
  3. Per rimuovere i duplicati, esegui questa query: Specifica una tabella di destinazione, consentire risultati di grandi dimensioni e disabilitare la suddivisione dei risultati.

    #standardSQL
    SELECT
      * EXCEPT(row_number)
    FROM (
      SELECT
        *,
        ROW_NUMBER()
              OVER (PARTITION BY ID_COLUMN) row_number
      FROM
        `TABLE_NAME`)
    WHERE
      row_number = 1

Note sulla query di rimozione duplicata:

  • La strategia più sicura per la query di rimozione duplicata è quella di scegliere come target una nuova tabella. In alternativa, puoi scegliere come target la tabella di origine con disposizione di scrittura WRITE_TRUNCATE.
  • La query di rimozione duplicata aggiunge una colonna row_number con il valore 1 a alla fine dello schema della tabella. La query utilizza un parametro SELECT * EXCEPT di GoogleSQL a escludi la colonna row_number dalla tabella di destinazione. La Prefisso #standardSQL attiva GoogleSQL per questa query. In alternativa, puoi selezionare in base a i nomi delle colonne per omettere questa colonna.
  • Per eseguire query sui dati in tempo reale rimuovendo i duplicati, puoi anche creare una vista della tabella utilizzando la query di rimozione duplicata. Tieni presente che i costi delle query rispetto alla vista vengono calcolate in base alle colonne selezionate di grandi dimensioni, il che può comportare grandi dimensioni di scansione dei byte.

Inserimento di flussi in tabelle partizionate nel tempo

Quando trasmetti i flussi di dati in una tabella partizionata nel tempo, ogni partizione ha un flusso di dati buffer. Il buffer di flussi di dati viene conservato quando esegui un caricamento, una query o una copia un job che sovrascrive una partizione impostando la proprietà writeDisposition su WRITE_TRUNCATE. Se vuoi rimuovere il buffer di streaming, verifica che il buffer di streaming è vuoto richiamando tables.get sulla partizione.

Partizionamento in fase di importazione

Quando esegui il flusso di dati in una tabella partizionata per data di importazione, BigQuery deduce la partizione di destinazione dall'ora UTC attuale.

I nuovi dati in arrivo vengono temporaneamente inseriti nella partizione __UNPARTITIONED__ mentre sono nel buffer di flusso. Quando ci sono abbastanza dati non partizionati, BigQuery esegue il partizionamento dei dati nella partizione corretta. Tuttavia, non esiste uno SLA (accordo sul livello del servizio) per quanto tempo impiegano i dati a spostarsi __UNPARTITIONED__ partizione. Una query può escludere i dati presenti nel buffer di flusso da una query filtrando NULL valori dalla partizione __UNPARTITIONED__ utilizzando uno dei pseudocolonne (_PARTITIONTIME oppure _PARTITIONDATE a seconda del tipo di dati preferito).

Se disponi di flussi di dati in una tabella partizionata giornalmente, puoi eseguire l'override l'inferenza delle date fornendo un decoratore della partizione come parte dell'insertAll richiesta. Includi il decorator nel parametro tableId. Ad esempio, puoi il flusso di dati alla partizione corrispondente al 01/03/2021 per la tabella table1 utilizzando decoratore della partizione:

table1$20210301

Quando esegui lo streaming con un decorator per partizioni, puoi trasmettere in streaming alle partizioni all'interno ultimi 31 giorni nel passato e 16 giorni nel futuro rispetto alla data corrente in base all'ora UTC attuale. Per scrivere nelle partizioni per date esterne a queste limiti consentiti, utilizza un job di caricamento o di query, come descritto in Aggiunta e sovrascrittura di tabella partizionata partizionate.

Lo streaming con un decorator partizione è supportato solo per il partizionamento giornaliero tabelle. Non è supportato per le tabelle partizionate orarie, mensili o annuali.

Per il test, puoi utilizzare lo strumento a riga di comando bq Comando dell'interfaccia a riga di comando bq insert. Ad esempio, il comando seguente invia una singola riga in modalità flusso a una partizione 1° gennaio 2017 ($20170101) in una tabella partizionata denominata mydataset.mytable:

echo '{"a":1, "b":2}' | bq insert 'mydataset.mytable$20170101'

Partizionamento delle colonne in base all'unità di tempo

Puoi inserire flussi di dati in una tabella partizionata in base a DATE, DATETIME o TIMESTAMP con un periodo compreso tra 5 anni nel passato e 1 anno nel futuro. I dati che non rientrano in questo intervallo sono stati rifiutati.

Quando i dati vengono trasmessi in flusso, vengono inizialmente posizionati all'interno di __UNPARTITIONED__ della partizione di testo. Quando ci sono abbastanza dati non partizionati, BigQuery ripartiziona automaticamente i dati, inserendoli nella partizione appropriata. Tuttavia, non esiste uno SLA (accordo sul livello del servizio) sul tempo necessario affinché i dati vengano spostati al di fuori __UNPARTITIONED__ partizione.

  • Nota: le partizioni giornaliere vengono elaborate in modo diverso rispetto a quelle orarie, mensili e partizioni annuali. Solo i dati esterni dell'intervallo di date (dai 7 ai 3 giorni futuri) viene estratto nella partizione UNPARTITIONED, in attesa di ripartizionamento. D'altra parte, per una tabella partizionata oraria, i dati vengono sempre estratti la partizione UNPARTITIONED e successivamente ripartizionata.

Crea tabelle automaticamente utilizzando le tabelle modello

Le tabelle modello forniscono un meccanismo per suddividere una tabella logica in molte tabelle più piccole per creare set di dati più piccoli (ad esempio per ID utente). Tabelle modello presentino alcune limitazioni descritte di seguito. Invece, tabelle partizionate e le tabelle in cluster sono consigliate per raggiungere questo comportamento.

Per utilizzare una tabella modello tramite l'API BigQuery, aggiungi un parametro templateSuffix alla tua richiesta di insertAll. Per lo strumento a riga di comando bq, aggiungi il flag template_suffix al comando insert. Se BigQuery rileva un templateSuffix o il flag template_suffix, tratta la tabella target come base modello. Crea una nuova tabella che condivide lo stesso schema della tabella target e ha un nome che include il suffisso specificato:

<targeted_table_name> + <templateSuffix>

L'utilizzo di una tabella modello evita il sovraccarico della creazione di ogni tabella singolarmente e specificando lo schema per ogni tabella. Devi solo creare un singolo modello e fornire diversi suffissi in modo che BigQuery possa creare le nuove tabelle. BigQuery posiziona le tabelle nello stesso progetto e set di dati.

Le tabelle create utilizzando le tabelle modello sono in genere disponibili entro pochi secondi. In rare occasioni, potrebbe essere necessario più tempo prima che diventino disponibili.

Modifica lo schema della tabella del modello

Se modifichi lo schema di una tabella modello, tutte le tabelle generate successivamente utilizza lo schema aggiornato. Le tabelle generate in precedenza non sono interessate, a meno che la tabella esistente non abbia ancora un buffer di flusso.

Per le tabelle esistenti che hanno ancora un buffer di flusso, se modifichi modello di tabella in modo compatibile con le versioni precedenti, lo schema di questi vengono aggiornate anche le tabelle generate in streaming attivamente. Tuttavia, se modifichi lo schema della tabella del modello in modo non compatibile con le versioni precedenti, verranno persi tutti i dati presenti nel buffer che utilizzano il vecchio schema. Inoltre, non è in grado di trasmettere nuovi dati in modalità flusso a tabelle generate esistenti che utilizzano il vecchio schema, ora incompatibile.

Dopo aver modificato lo schema di una tabella modello, attendi che le modifiche vengano propagate prima di provare a inserire nuovi dati o a eseguire query sulle tabelle generate. Richieste di inserimento i nuovi campi dovrebbero riuscire entro pochi minuti. Tenta di eseguire una query sul nuovo potrebbero richiedere un'attesa più lunga fino a 90 minuti.

Se vuoi modificare lo schema di una tabella generata, non modificare il valore dello schema finché non viene interrotto il flusso di dati tramite la tabella del modello e la sezione delle statistiche sui flussi di dati della tabella non è presente nella risposta tables.get(), che indica che nessun dato è memorizzato nel buffer nella tabella.

Tabelle partizionate e tabelle in cluster non soffrono di limitazioni precedenti e rappresentano il meccanismo consigliato.

Dettagli tabella modello

Valore suffisso del modello
Il valore templateSuffix (o --template_suffix) deve contenere solo lettere (a-z, A-Z), numeri (0-9) o trattini bassi (_). La lunghezza combinata massima del nome della tabella e del suffisso della tabella è di 1024 caratteri.
Quota

Le tabelle dei modelli sono soggette alla quota di flussi di dati limitazioni. Il progetto può creare fino a 10 tabelle al secondo con le tabelle modello, simili all'API tables.insert. Questa quota si applica solo alle tabelle in fase di creazione, non alle tabelle in fase di modifica.

Se la tua applicazione deve creare più di 10 tabelle al secondo, ti consigliamo di usare tabelle in cluster. Ad esempio, puoi inserire l'ID tabella ad alta cardinalità nella colonna chiave di una singola tabella di clustering.

Durata

La tabella generata eredita la data e l'ora di scadenza dal set di dati. Come con i normali flussi di dati, le tabelle generate non possono essere copiate immediatamente.

Deduplicazione

La deduplicazione avviene solo tra riferimenti uniformi a una tabella di destinazione. Ad esempio, se trasmetti contemporaneamente il flusso a una tabella generata utilizzando sia tabelle modello e un normale comando insertAll, senza deduplicazione tra le righe inserite da tabelle modello e un normale comando insertAll.

Visualizzazioni

La tabella modello e le tabelle generate non devono essere viste.

Risoluzione dei problemi di inserimento di flussi di dati

Le sezioni seguenti illustrano come risolvere gli errori che si verificano. quando trasferisci flussi di dati in BigQuery utilizzando l'API di streaming legacy. Per ulteriori informazioni per informazioni su come risolvere gli errori di quota per gli inserimenti di flussi di dati, consulta Quota di inserimento di flussi di dati errori.

Codici di risposta HTTP di errore

Se ricevi un codice di risposta HTTP di errore, ad esempio un errore di rete, non è possibile capire se l'inserimento di flussi di dati è riuscito. Se provi a inviare di nuovo la richiesta, potresti ritrovarti con righe duplicate nella tua tabella. Per aiutare proteggi la tua tabella dalla duplicazione, imposta la proprietà insertId quando inviando la tua richiesta. BigQuery utilizza la proprietà insertId per la deduplicazione.

Se ricevi un errore di autorizzazione, un errore di nome tabella non valido o un di quota, nessuna riga viene inserita e l'intera richiesta non riesce.

Codici di risposta HTTP di operazione riuscita

Anche se ricevi un codice di risposta HTTP di operazione riuscita, dovrai controllare Proprietà insertErrors della risposta per determinare se gli inserimenti di righe hanno avuto esito positivo perché è possibile che BigQuery sia stato solo parzialmente l'inserimento delle righe è riuscito. Potresti riscontrare uno dei seguenti scenari:

  • Tutte le righe sono state inserite correttamente. Se la proprietà insertErrors è un elenco vuoto, tutte le righe sono state inserite correttamente.
  • Alcune righe sono state inserite correttamente. Tranne nei casi in cui è presente schema non corrispondente in nessuna delle righe, le righe indicate nella proprietà insertErrors sono non inserito e tutte le altre righe sono state inserite correttamente. errors contiene informazioni dettagliate sul motivo per cui ogni riga con esito negativo. La proprietà index indica l'indice di riga in base 0 della richiesta a cui si applica l'errore.
  • Nessuna delle righe inserite correttamente. Se BigQuery rileva mancata corrispondenza dello schema nelle singole righe della richiesta, nessuna riga viene inserita e Viene restituita una voce insertErrors per ogni riga, anche per le righe che non avevano uno schema non corrispondente. Le righe che non presentavano una mancata corrispondenza dello schema hanno un errore con reason impostata su stopped e può essere inviata nuovamente così com'è. Righe non andate a buon fine includi informazioni dettagliate sulla mancata corrispondenza dello schema. Scopri di più sul buffer di protocollo supportato tipi di dati per ogni tipo di dati BigQuery, consulta Conversioni dei tipi di dati.

Errori dei metadati per l'inserimento di flussi di dati

Poiché l'API per i flussi di dati di BigQuery è progettata per tassi di inserzione elevati, le modifiche all'esposizione dei metadati della tabella sottostante sono coerenti quando si interagisce con il sistema di streaming. Nella maggior parte dei casi le modifiche ai metadati vengono propagate entro pochi minuti, durante questo periodo le risposte dell'API potrebbero riflettere lo stato incoerente della tabella.

Ecco alcuni scenari:

  • Modifiche allo schema. Modifica dello schema di una tabella che di recente ha ricevuto un flusso di dati gli inserimenti possono causare risposte con errori di mancata corrispondenza dello schema perché il sistema di flussi di dati potrebbe non immediatamente la modifica dello schema.
  • Creazione/eliminazione di una tabella. Il flusso di dati a una tabella inesistente restituisce una variante di un Risposta notFound. Una tabella creata in risposta potrebbe non essere riconosciuta immediatamente per inserimento di flussi di dati successivi. Analogamente, se elimini o ricrei una tabella, puoi creare un periodo. di tempo in cui gli inserimenti di flussi di dati vengono effettivamente inviati alla tabella precedente. Il flusso di dati inserisce potrebbero non essere presenti nella nuova tabella.
  • Troncamento della tabella. Troncamento dei dati di una tabella (utilizzando un job di query che utilizza writeDisposition di WRITE_TRUNCATE) può causare allo stesso modo inserimenti successivi durante la coerenza in un periodo di tempo limitato.

Dati mancanti/non disponibili

Gli inserimenti di flussi di dati risiedono temporaneamente nello spazio di archiviazione ottimizzato per la scrittura, che ha disponibilità diverse caratteristiche rispetto all'archiviazione gestita. Alcune operazioni in BigQuery non interagiscono con lo spazio di archiviazione ottimizzato per la scrittura, come i job di copia delle tabelle e i metodi API come tabledata.list. I flussi di dati recenti non saranno presenti nella tabella o nell'output di destinazione.