借助 Cloud Storage 批处理来源插件,您可以从 Cloud Storage 存储分区中读取数据并将其导入 Cloud Data Fusion,以便进一步处理和转换。它可让您从多种文件格式加载数据,包括:
- 结构化:CSV、Avro、Parquet、ORC
- 半结构化:JSON、XML
- 其他:文字、二进制
准备工作
Cloud Data Fusion 通常有两个服务帐号:
- 设计时服务帐号:Cloud Data Fusion API Service Agent
- 执行时服务帐号:Compute Engine 服务帐号
在使用 Cloud Storage 批处理来源插件之前,请向每个服务帐号授予以下角色或权限。
Cloud Data Fusion API Service Agent
此服务帐号已具有所有必需的权限,您无需添加其他权限。
Compute Engine 服务账号
在 Google Cloud 项目中,向 Compute Engine 服务帐号授予以下 IAM 角色或权限:
- Storage Legacy Bucket Reader (
roles/storage.legacyBucketReader
)。此预定义角色包含所需的storage.buckets.get
权限。 Storage Object Viewer (
roles/storage.legacyBucketReader
)。此预定义角色包含以下所需权限:storage.objects.get
storage.objects.list
配置插件
- 转到 Cloud Data Fusion 网页界面,然后点击 Studio。
- 检查是否已选择 Data Pipeline - Batch(而非实时)。
- 在来源菜单中,点击 GCS。Cloud Storage 节点会显示在您的流水线中。
- 如需配置来源,请转到 Cloud Storage 节点,然后点击属性。
输入以下属性。如需查看完整列表,请参阅属性。
- 为 Cloud Storage 节点输入标签,例如
Cloud Storage tables
。 输入连接详情。您可以设置新的一次性连接,也可以设置可重复使用的现有连接。
新增关联项
如需添加与 Cloud Storage 的一次性连接,请按以下步骤操作:
- 使使用连接保持关闭状态。
- 在项目 ID 字段中,将值保留为“自动检测”。
在服务帐号类型字段中,将值保留为文件路径,将服务帐号文件路径保留为自动检测。
可重复使用的连接
要重复使用现有连接,请按以下步骤操作:
- 开启使用网络连接。
- 点击浏览连接。
点击连接名称,例如 Cloud Storage Default。
可选:如果连接不存在,并且您想要创建新的可重复使用的连接,请点击添加连接,然后参考本页面上新建连接标签页中的步骤。
在引用名称字段中,输入要用于沿袭的名称,例如
data-fusion-gcs-campaign
。在路径字段中,输入读取路径,例如
gs://BUCKET_PATH
。在格式字段中,为要读取的数据选择以下文件格式之一:
- avro
- blob(blob 格式需要架构,包含名为字节体类型的字段)
- csv
- 分隔
- json
- parquet
- text(文本格式所需的架构需包含名为“string”类型的“body”字段)
- tsv
- 您在环境中部署的任何格式插件的名称
可选:如需测试连接,请点击获取架构。
可选:在样本大小字段中,输入要检查所选数据类型的最大行数,例如
1000
。可选:在替换字段中,输入要跳过的列名称及其各自的数据类型。
可选:输入高级属性,例如最小拆分大小或正则表达式路径过滤条件(请参阅属性)。
可选:在临时存储桶名称字段中,输入 Cloud Storage 存储桶的名称。
- 为 Cloud Storage 节点输入标签,例如
可选:点击验证并解决发现的所有错误。
点击关闭。属性已保存,您可以继续在 Cloud Data Fusion Studio 中构建数据流水线。
属性
属性 | 已启用宏 | 必需属性 | 说明 |
---|---|---|---|
标签 | 否 | 是 | 数据流水线中节点的名称。 |
使用连接 | 否 | 否 | 浏览以查找可重复使用的来源连接。如需详细了解如何添加、导入和修改连接时显示的连接,请参阅管理连接。 |
连接 | 是 | 是 | 如果 Use connection 处于开启状态,您选择的可重复使用的连接的名称将显示在此字段中。 |
项目 ID | 是 | 否 | 仅在使用连接处于关闭状态时使用。项目的全局唯一标识符。 默认值为 auto-detect 。 |
服务帐号类型 | 是 | 否 | 从下列选项中选择一项:
|
服务账号文件路径 | 是 | 否 | 仅在服务账号类型值为文件路径时使用。用于授权的服务帐号密钥的本地文件系统上的路径。如果作业在 Dataproc 集群上运行,请将该值设置为自动检测。如果作业在其他类型的集群上运行,则该文件必须存在于集群中的每个节点上。 默认值为 auto-detect 。 |
服务帐号 JSON | 是 | 否 | 仅在服务帐号类型值为 JSON 时使用。 服务帐号的 JSON 文件内容。 |
参考编号 | 否 | 是 | 名称,用于为其他服务(例如沿袭和注释元数据)唯一标识此来源。 |
路径 | 是 | 是 | 要读取的文件的路径。如果指定了目录,请使用反斜杠 (/ ) 终止路径,例如 gs://bucket/path/to/directory/ 。如需匹配文件名模式,您可以使用星号 (* ) 作为通配符。如果找不到或找不到文件,流水线将失败。 |
格式 | 否 | 是 | 要读取的数据的格式。格式必须为以下之一:
|
样本大小 | 是 | 否 | 为自动检测数据类型而调查的行数上限。默认值为 1000。 |
替换 | 是 | 否 | 包含相应数据(跳过了自动数据类型检测)的列的列表。 |
分隔符 | 是 | 否 | 采用分隔格式时使用的分隔符。对于其他格式,系统会忽略此属性。 |
启用带引号的值 | 是 | 否 | 是否将引号之间的内容视为值。此属性仅用于 csv、tsv 或分隔格式。例如,如果此属性设置为 true,则以下转换会输出两个字段:1, "a, b, c" 。
第一个字段值为 1 。第二个代码具有 a, b, c 。引号字符会被删除。换行符不能用引号括起来。插件假定引号已正确括起来,例如 "a, b, c" 。不加引号 ("a,b,c, ) 会导致错误。默认值为 False。 |
将第一行用作标题 | 是 | 否 | 是否使用每个文件的第一行作为列标题。支持的格式有文本、csv、tsv 和分隔。 默认值为 False。 |
最小拆分大小 | 是 | 否 | 每个输入分区的最小大小(以字节为单位)。较小的分区可以提高并行级别,但需要更多的资源和开销。 如果格式值为 blob ,则无法拆分数据。 |
最大拆分大小 | 是 | 否 | 每个输入分区的大小上限(以字节为单位)。较小的分区可以提高并行级别,但需要更多的资源和开销。 如果格式值为 blob ,则无法拆分数据。默认值为 128 MB。 |
正则表达式路径过滤条件 | 是 | 否 | 文件路径必须匹配才能包含在输入中的正则表达式。系统会比较完整路径,而不只是文件名。如果未指定任何文件,则不会执行文件过滤。如需详细了解正则表达式语法,请参阅模式。 |
路径字段 | 是 | 否 | 输出字段,用于放置从中读取记录的文件的路径。如果未指定,则输出记录中不包含该路径。如果已指定,则该字段必须以字符串形式存在于输出架构中。 |
仅路径文件名 | 是 | 否 | 如果设置了路径字段属性,请仅使用文件名,不要使用路径的 URI。 默认值为 False。 |
以递归方式读取文件 | 是 | 否 | 是否从路径以递归方式读取文件。 默认值为 False。 |
允许空输入内容 | 是 | 否 | 是否允许输入路径中不含任何数据。如果设置为 False,则插件会在没有要读取的数据时发生错误。如果设置为 True,则不会抛出任何错误,并且不会读取任何记录。 默认值为 False。 |
数据文件已加密 | 是 | 否 | 文件是否经过加密。如需了解详情,请参阅数据文件加密。 默认值为 False。 |
加密元数据文件后缀 | 是 | 否 | 加密元数据文件的文件名后缀。 默认值为 metadata。 |
文件系统属性 | 是 | 否 | 读取数据时要与 InputFormat 一起使用的其他属性。 |
文件编码 | 是 | 否 | 要读取的文件的字符编码。 默认值为 UTF-8。 |
输出架构 | 是 | 否 | 如果设置了路径字段属性,则该属性必须以字符串形式存在于架构中。 |
数据文件加密
本部分介绍了数据文件加密属性。如果将此属性设置为 true,系统会使用 Tink 库提供的流式 AEAD 解密文件。每个数据文件必须附带一个包含加密信息的元数据文件。例如,位于 gs://BUCKET/PATH_TO_DIRECTORY/file1.csv.enc
的加密数据文件必须在位于 gs://BUCKET/
PATH_TO_DIRECTORY/file1.csv.enc.metadata
的位置具有元数据文件。该元数据文件包含一个具有以下属性的 JSON 对象:
属性 | 说明 |
---|---|
kms |
用于加密数据加密密钥的 Cloud Key Management Service URI。 |
aad |
加密中使用的 Base64 编码的额外身份验证数据。 |
key set |
一个 JSON 对象,表示来自 Tink 库的序列化密钥集信息。 |
示例
/* Counting example */ { "kms": "gcp-kms://projects/my-key-project/locations/us-west1/keyRings/my-key-ring/cryptoKeys/mykey", "aad": "73iT4SUJBM24umXecCCf3A==", "keyset": { "keysetInfo": { "primaryKeyId": 602257784, "keyInfo": [{ "typeUrl": "type.googleapis.com/google.crypto.tink.AesGcmHkdfStreamingKey", "outputPrefixType": "RAW", "keyId": 602257784, "status": "ENABLED" }] }, "encryptedKeyset": "CiQAz5HH+nUA0Zuqnz4LCnBEVTHS72s/zwjpcnAMIPGpW6kxLggSrAEAcJKHmXeg8kfJ3GD4GuFeWDZzgGn3tfolk6Yf5d7rxKxDEChIMWJWGhWlDHbBW5B9HqWfKx2nQWSC+zjM8FLefVtPYrdJ8n6Eg8ksAnSyXmhN5LoIj6az3XBugtXvCCotQHrBuyoDY+j5ZH9J4tm/bzrLEjCdWAc+oAlhsUAV77jZhowJr6EBiyVuRVfcwLwiscWkQ9J7jjHc7ih9HKfnqAZmQ6iWP36OMrEn" } }
版本说明
后续步骤
- 详细了解 Cloud Data Fusion 中的插件。