Cloud Storage バッチソース プラグインを使用すると、Cloud Storage バケットからデータを読み取り、Cloud Data Fusion に取り込んでさらに処理と変換を行うことができます。これにより、次のような複数のファイル形式からデータを読み込むことが可能です。
- 構造化: CSV、Avro、Parquet、ORC
- 半構造化: JSON、XML
- その他: テキスト、バイナリ
始める前に
Cloud Data Fusion には通常、次の 2 つのサービス アカウントがあります。
- 設計時のサービス アカウント: Cloud Data Fusion API サービス エージェント
- 実行時のサービス アカウント: Compute Engine サービス アカウント
Cloud Storage バッチソース プラグインを使用する前に、各サービス アカウントに次のロールを付与します。
Cloud Data Fusion API サービス エージェント
このサービス アカウントには、すでに必要な権限がすべて付与されているため、権限を追加する必要はありません。
Compute Engine サービス アカウント
Google Cloud プロジェクトで、次の IAM のロールまたは権限を Compute Engine サービス アカウントに付与します。
- Storage レガシー バケット読み取り(
roles/storage.legacyBucketReader
)。この事前定義ロールには、必要なstorage.buckets.get
権限が含まれています。 Storage オブジェクト閲覧者(
roles/storage.legacyBucketReader
)。この事前定義ロールには、次の必要な権限が含まれています。storage.objects.get
storage.objects.list
プラグインを構成する
- Cloud Data Fusion ウェブ インターフェースに移動し、[Studio] をクリックします。
- [Realtime] ではなく、[Data Pipeline - Batch] が選択されていることを確認します。
- [Source] メニューで [GCS] をクリックします。Cloud Storage ノードがパイプラインに表示されます。
- ソースを構成するには、Cloud Storage ノードに移動して、[Properties] をクリックします。
次のプロパティを入力します。完全版リストについては、プロパティをご覧ください。
- Cloud Storage ノードのラベル(例:
Cloud Storage tables
)を入力します。 接続の詳細を入力します。新しい 1 回限りの接続または再利用可能な既存接続を設定できます。
新しい接続
Cloud Storage に 1 回限りの接続を追加する手順は次のとおりです。
- [接続を使用] をオフのままにします。
- [Project ID] フィールドの値は、自動検出された値のままにします。
[Service account type] フィールドで、値を [File path] のままにし、[Service account file path] を自動検出のままにします。
再利用可能な接続
既存の接続を再利用する手順は次のとおりです。
- [接続を使用] をオンにします。
- [接続を参照] をクリックします。
接続名([Cloud Storage Default] など)をクリックします。
省略可: 接続が存在せず、再利用可能な新しい接続を作成する場合は [接続を追加] をクリックし、本ページにある [新しい接続] タブの手順をご参照ください。
[Reference name] フィールドに、リネージに使用する名前を入力します(例:
data-fusion-gcs-campaign
)。[Path] フィールドに、読み取り元のパスを入力します(例:
gs://BUCKET_PATH
)。[Format] フィールドで、読み取るデータに次のいずれかのファイル形式を選択します。
- avro
- blob(blob 形式には、bytes 型の body というフィールドを含むスキーマが必要です)
- csv
- delimited
- json
- parquet
- text(テキスト形式には、string 型の body というフィールドを含むスキーマが必要です)
- tsv
- 環境にデプロイしたフォーマット プラグインの名前
(省略可)接続をテストするには、[Get schema] をクリックします。
省略可: [Sample size] フィールドに、選択したデータ型で確認する最大行数(たとえば、
1000
)を入力します。省略可: [Override] フィールドに、列名とスキップする各データ型を入力します。
省略可: 最小分割サイズや正規表現のパスフィルタなどの [Advanced properties] を入力します(プロパティをご覧ください)。
省略可: [Temporary bucket name] フィールドに、Cloud Storage バケットの名前を入力します。
- Cloud Storage ノードのラベル(例:
省略可: [Validate] をクリックして、見つかったエラーに対処します。
[閉じる] をクリックします。プロパティは保存され、Cloud Data Fusion Studio で引き続きデータ パイプラインを構築できます。
プロパティ
プロパティ | マクロ対応 | 必要なプロパティ | 説明 |
---|---|---|---|
ラベル | × | ○ | データ パイプラインのノードの名前。 |
接続を使用 | × | × | ソースへの再利用可能な接続を参照します。接続を参照するときに表示される接続の追加、インポート、編集の詳細については、接続を管理するをご覧ください。 |
接続 | ○ | ○ | [接続を使用] がオンになっている場合は、選択した再利用可能な接続の名前がこのフィールドに表示されます。 |
プロジェクト ID | ○ | × | [接続を使用] がオフになっている場合にのみ使用されます。プロジェクトのグローバルに一意の識別子。 デフォルトは auto-detect です。 |
サービス アカウントの種類 | ○ | × | 次のいずれかのオプションを選択します。
|
サービス アカウント ファイルのパス | ○ | × | サービス アカウント タイプの値が [ファイルパス] の場合にのみ使用されます。認証に使用するサービス アカウント キーのローカル ファイル システム上のパス。ジョブが Dataproc クラスタで実行される場合は、値を自動検出に設定します。他のタイプのクラスタでジョブが実行される場合、クラスタ内の各ノードにファイルが存在する必要があります。 デフォルトは auto-detect です。 |
サービス アカウント JSON | ○ | × | サービス アカウント タイプの値が JSON の場合にのみ使用されます。サービス アカウントの JSON ファイル コンテンツ。 |
参照名 | × | ○ | リネージやメタデータのアノテーションなど、他のサービスに対してこのソースを一意に識別する名前。 |
[Path] | ○ | ○ | 読み取るファイルへのパス。ディレクトリが指定されている場合、パスはバックスラッシュ(/ )で終了します(例: gs://bucket/path/to/directory/ )。ファイル名パターンと照合するために、アスタリスク(* )をワイルドカードとして使用できます。ファイルが見つからなかった場合や一致しなかった場合は、パイプラインは失敗します。 |
形式 | × | ○ | 読み取るデータの形式。形式は次のいずれかにする必要があります。
|
サンプルサイズ | ○ | × | 自動データ型検出で調査される行の最大数。デフォルトは 1,000 です。 |
オーバーライド | ○ | × | 自動データ型検出がスキップされる、対応するデータを含む列のリスト。 |
区切り文字 | ○ | × | 形式が delimited である場合に使用する区切り文字。このプロパティは、他の形式では無視されます。 |
引用符で囲まれた値を有効にする | ○ | × | 引用符で囲まれたコンテンツを値として扱うかどうか。このプロパティは、csv、tsv、delimited 形式でのみ使用されます。たとえば、このプロパティが true に設定されている場合、次の 2 つのフィールドが 1, "a, b, c" として出力されます。最初のフィールドの値は 1 です。2 つ目は a, b, c です。引用符自体は削除されます。改行区切り文字を引用符で囲むことはできません。このプラグインでは、引用符が正しく囲まれていることを前提とします(例: "a, b, c" )。引用を閉じていない("a,b,c, )とエラーが発生します。デフォルト値は false です。 |
最初の行をヘッダーとして使用する | ○ | × | 各ファイルの最初の行を列ヘッダーとして使用するかどうか。サポートされている形式は、text、csv、tsv、delimited です。 デフォルトは False です。 |
最小スプリット サイズ | ○ | × | 入力パーティションごとの最小サイズ(バイト単位)。パーティションが減少すると並列処理のレベルは高くなりますが、より多くのリソースを必要とし、オーバーヘッドが大きくなります。 Format の値が blob の場合、データを分割することはできません。 |
最大スプリット サイズ | ○ | × | 入力パーティションごとの最大サイズ(バイト単位)。パーティションが減少すると並列処理のレベルは高くなりますが、より多くのリソースを必要とし、オーバーヘッドが大きくなります。 Format の値が blob の場合、データを分割することはできません。デフォルトは 128 MB です。 |
正規表現のパスフィルタ | ○ | × | 入力に含めるためにファイルパスが一致する必要がある正規表現。ファイル名だけでなく、フルパスが比較されます。ファイルを指定しないと、ファイルのフィルタリングは行われません。正規表現の構文の詳細については、パターンをご覧ください。 |
パスフィールド | ○ | × | レコードの読み取り元のファイルのパスを配置する出力フィールド。指定しない場合、パスは出力レコードに含まれません。指定する場合、フィールドは出力スキーマに文字列として存在する必要があります。 |
パスファイル名のみ | ○ | × | [パスフィールド] プロパティが設定されている場合、パスの URI ではなくファイル名のみを使用します。 デフォルトは False です。 |
ファイルを再帰的に読み取る | ○ | × | ファイルをパスから再帰的に読み取るかどうか。 デフォルトは False です。 |
空の入力を許可する | ○ | × | データが含まれていない入力パスを許可するかどうか。False に設定すると、読み取るデータがないときにプラグインでエラーが発生します。True に設定すると、エラーはスローされず、レコードは読み取られません。 デフォルトは False です。 |
データファイルの暗号化 | ○ | × | ファイルが暗号化されているかどうか。詳細については、データファイルの暗号化をご覧ください。 デフォルトは False です。 |
暗号化メタデータ ファイルの接尾辞 | ○ | × | 暗号化メタデータ ファイルのファイル名接尾辞。 デフォルトは metadata です。 |
ファイル システムのプロパティ | ○ | × | データを読み取るときに InputFormat で使用する追加のプロパティ。 |
ファイル エンコード | ○ | × | 読み取るファイルの文字エンコード。 デフォルトは UTF-8 です。 |
出力スキーマ | ○ | × | パスフィールド プロパティが設定されている場合、スキーマに文字列として存在する必要があります。 |
データファイルの暗号化
このセクションでは、データファイルの暗号化プロパティについて説明します。true に設定すると、ファイルは Tink ライブラリによって提供されるストリーミング AEAD を使用して復号されます。各データファイルには、暗号情報を含むメタデータ ファイルが付属している必要があります。たとえば、gs://BUCKET/PATH_TO_DIRECTORY/file1.csv.enc
にある暗号化されたデータファイルには、gs://BUCKET/
PATH_TO_DIRECTORY/file1.csv.enc.metadata
にあるメタデータ ファイルが必要です。メタデータ ファイルには、次のプロパティを持つ JSON オブジェクトが含まれています。
プロパティ | 説明 |
---|---|
kms |
データ暗号鍵の暗号化に使用された Cloud Key Management Service URI。 |
aad |
暗号化で使用される Base64 でエンコードされた追加認証データ。 |
key set |
Tink ライブラリからのシリアル化された鍵セット情報を表す JSON オブジェクト。 |
例
/* Counting example */ { "kms": "gcp-kms://projects/my-key-project/locations/us-west1/keyRings/my-key-ring/cryptoKeys/mykey", "aad": "73iT4SUJBM24umXecCCf3A==", "keyset": { "keysetInfo": { "primaryKeyId": 602257784, "keyInfo": [{ "typeUrl": "type.googleapis.com/google.crypto.tink.AesGcmHkdfStreamingKey", "outputPrefixType": "RAW", "keyId": 602257784, "status": "ENABLED" }] }, "encryptedKeyset": "CiQAz5HH+nUA0Zuqnz4LCnBEVTHS72s/zwjpcnAMIPGpW6kxLggSrAEAcJKHmXeg8kfJ3GD4GuFeWDZzgGn3tfolk6Yf5d7rxKxDEChIMWJWGhWlDHbBW5B9HqWfKx2nQWSC+zjM8FLefVtPYrdJ8n6Eg8ksAnSyXmhN5LoIj6az3XBugtXvCCotQHrBuyoDY+j5ZH9J4tm/bzrLEjCdWAc+oAlhsUAV77jZhowJr6EBiyVuRVfcwLwiscWkQ9J7jjHc7ih9HKfnqAZmQ6iWP36OMrEn" } }
リリースノート
次のステップ
- Cloud Data Fusion のプラグインについて学習する。