Pub/Sub ストリーミング ソースから読み取る

Cloud Data Fusion は、ストリーミング データ パイプライン内の Pub/Sub ソースをサポートしています。

準備

ロールと権限

Pub/Sub ストリーミング ソースから読み取るために必要な権限を取得するには、管理者に、Pub/Sub サブスクリプションにアクセスする際に必要なサービス アカウントに対する Pub/Sub 編集者roles/pubsub.editor)IAM ロールを付与するよう依頼してください。ロールの付与の詳細については、アクセスの管理をご覧ください。

この事前定義ロールには、Pub/Sub ストリーミング ソースからの読み取りに必要な権限が含まれています。必要な権限を正確に確認するには、[必要な権限] セクションを開いてください。

必要な権限

Pub/Sub ストリーミング ソースから読み取るには、次の権限が必要です。

  • pubsub.snapshots.create
  • pubsub.snapshots.delete
  • pubsub.snapshots.seek
  • pubsub.subscriptions.consume
  • pubsub.topics.attachSubscription

カスタムロールや他の事前定義ロールを使用して、これらの権限を取得することもできます。

Pub/Sub にアクセスするには、プラグイン プロパティで指定したサービス アカウントに対するロールを付与します。何も指定されていない場合は、Dataproc サービス アカウントに対するロールを付与します。

ロールの付与の詳細については、アクセスの管理をご覧ください。

Pub/Sub ソースをストリーミング データ パイプラインに追加する

  1. インスタンスに移動します:

    1. Google Cloud コンソールで、Cloud Data Fusion のページに移動します。

    2. Cloud Data Fusion ウェブ インターフェースでインスタンスを開くには、[Instances] をクリックしてから、[View instance] をクリックします。

      [インスタンス] に移動

  2. Cloud Data Fusion ウェブ インターフェースで、[Studio] をクリックします。

  3. [Data Pipeline - Realtime] を選択します。

  4. [ソース] メニューで、[Pub/Sub] を選択します。Pub/Sub ストリーミング ソースノードがパイプラインに表示されます。

  5. Pub/Sub ノードで [プロパティ] をクリックし、ソースを構成します。詳細については、Pub/Sub ストリーミング ソースをご覧ください。

Windower プラグインのない単一の Pub/Sub ソースのサポート

Cloud Data Fusion バージョン 6.9.1 は、単一の Pub/Sub ストリーミング ソースを持ち、Windower プラグインを持たないリアルタイム パイプラインをサポートしています。

  • Pub/Sub ストリーミング ソースには組み込みのサポートがあり、データは少なくとも 1 回処理されます。Spark チェックポインティングを有効にする必要はありません。
  • Pub/Sub ストリーミング ソースは、各バッチの先頭で Pub/Sub スナップショットを作成し、各バッチの最後で削除します。
  • Pub/Sub スナップショットの作成には費用がかかります。詳細については、Pub/Sub の料金をご覧ください。
  • Cloud Audit Logs でスナップショットの作成をモニタリングできます。

Pub/Sub ストリーミング ソースを使用したパイプラインをアップグレードする

Cloud Data Fusion では、6.9.1 以降で作成された Pub/Sub ストリーミング ソースを使用したストリーミング パイプラインの直接アプリケーション アップグレードをサポートしています。

Cloud Data Fusion は、バージョン 6.9.0 以前の Pub/Sub ストリーミング ソースを使用したデータ パイプラインのアップグレードをサポートしていません。代わりに、これらのパイプラインを 6.9.1 にアップグレードしてください。

  1. インスタンスのアップグレードが予定されている場合、トピックへのデータの公開を停止します。
  2. パイプラインが公開データの処理を終了するまで待ちます。
  3. データが完全に処理されたら、パイプラインを停止します。
  4. インスタンスをアップグレードします。
  5. 既存のパイプラインを複製し、最新のプラグインに更新します。
  6. パイプラインをデプロイします。
  7. 新しいパイプラインを実行してデータを読み込みます。

    新しいバージョンでは、Spark チェックポインティングの代わりに、スナップショットを自動的に使用します。

  8. 古いパイプラインを削除します。

次のステップ