Esegui i passaggi del flusso di lavoro in parallelo

Passi paralleli possono ridurre il tempo totale di esecuzione per un flusso di lavoro l'esecuzione di più chiamate di blocco contemporaneamente.

Bloccare le chiamate, ad esempio sonno, chiamate HTTP e i callback possono richiedere del tempo, da millisecondi a giorni. I passaggi paralleli hanno lo scopo di aiutare nello svolgimento per le operazioni a lunga esecuzione. Se un flusso di lavoro deve eseguire più chiamate di blocco sono indipendenti l'uno dall'altro. L'uso di rami paralleli può ridurre il numero totale di esecuzione avviando contemporaneamente le chiamate e attendendo completarli.

Ad esempio, se il flusso di lavoro deve recuperare i dati dei clienti indipendenti prima di continuare, i rami paralleli consentono richieste API. Se ci sono cinque sistemi e ognuno richiede due secondi per rispondere, l'esecuzione dei passaggi in sequenza in un flusso di lavoro potrebbe richiedere almeno 10 secondi; per eseguirle in parallelo potrebbero richiedere solo due.

Crea un passaggio parallelo

Crea un passaggio parallel per definire una parte del flusso di lavoro in cui due o più passaggi possono essere eseguiti contemporaneamente.

YAML

  - PARALLEL_STEP_NAME:
      parallel:
        exception_policy: POLICY
        shared: [VARIABLE_A, VARIABLE_B, ...]
        concurrency_limit: CONCURRENCY_LIMIT
        BRANCHES_OR_FOR:
          ...

JSON

  [
    {
      "PARALLEL_STEP_NAME": {
        "parallel": {
          "exception_policy": "POLICY",
          "shared": [
            "VARIABLE_A",
            "VARIABLE_B",
            ...
          ],
          "concurrency_limit": "CONCURRENCY_LIMIT",
          "BRANCHES_OR_FOR":
          ...
        }
      }
    }
  ]

Sostituisci quanto segue:

  • PARALLEL_STEP_NAME: il nome del passaggio parallelo.
  • POLICY (facoltativo): determina l'azione in modo diverso i rami assumono quando si verifica un'eccezione non gestita. Il criterio predefinito, continueAll, non comporta ulteriori azioni e tutti gli altri rami di eseguire un tentativo di esecuzione. Tieni presente che continueAll è l'unico criterio attualmente supportato.
  • VARIABLE_A, VARIABLE_B e così via on: un elenco di variabili scrivibili con ambito padre che consentono le assegnazioni nel passaggio parallelo. Per ulteriori informazioni, vedi Variabili condivise.
  • CONCURRENCY_LIMIT (facoltativo): il numero massimo di rami e iterazioni che possono essere eseguiti contemporaneamente all'interno di un singolo flusso di lavoro prima che ulteriori rami e iterazioni vengano messi in coda in attesa. Questo si applica solo a un singolo passaggio parallel e non è a cascata. Deve essere un numero intero positivo e può essere un valore letterale o un'espressione. Per i dettagli, vedi Limiti di contemporaneità.
  • BRANCHES_OR_FOR: usa branches o for per indicare una delle seguenti opzioni:
    • Rami che possono essere eseguiti contemporaneamente.
    • Un loop in cui possono essere eseguite contemporaneamente.

Tieni presente quanto segue:

  • I rami e le iterazioni paralleli possono essere eseguiti in qualsiasi ordine e possono essere in un ordine diverso a ogni esecuzione.
  • I passaggi paralleli possono includere altri passaggi paralleli nidificati fino al limite di profondità. Vedi Quote e limiti.
  • Per ulteriori dettagli, consulta la pagina di riferimento sulla sintassi per passaggi paralleli.
di Gemini Advanced.

Sostituisci la funzione sperimentale con un passaggio parallelo

Se utilizzi experimental.executions.map per supportare il lavoro parallelo, puoi: eseguire la migrazione del flusso di lavoro per usare passaggi paralleli, eseguendo for loop in parallelo. Per alcuni esempi, vedi Sostituisci la funzione sperimentale con un passaggio parallelo.

Esempi

Questi esempi dimostrano la sintassi.

Eseguire operazioni in parallelo (utilizzando i rami)

Se il flusso di lavoro prevede più insiemi di passaggi diversi che possono essere eseguiti contemporaneamente, posizionarli in rami paralleli può ridurre il tempo totale necessari per completare questi passaggi.

Nell'esempio seguente, un ID utente viene passato come argomento al flusso di lavoro e i dati vengono recuperati in parallelo da due servizi diversi. Variabili condivise consentire la scrittura dei valori nei rami e leggere dopo i rami completato:

YAML

main:
  params: [input]
  steps:
    - init:
        assign:
          - userProfile: {}
          - recentItems: []
    - enrichUserData:
        parallel:
          shared: [userProfile, recentItems]  # userProfile and recentItems are shared to make them writable in the branches
          branches:
            - getUserProfileBranch:
                steps:
                  - getUserProfile:
                      call: http.get
                      args:
                        url: '${"https://example.com/users/" + input.userId}'
                      result: userProfile
            - getRecentItemsBranch:
                steps:
                  - getRecentItems:
                      try:
                        call: http.get
                        args:
                          url: '${"https://example.com/items?userId=" + input.userId}'
                        result: recentItems
                      except:
                        as: e
                        steps:
                          - ignoreError:
                              assign:  # continue with an empty list if this call fails
                                - recentItems: []

JSON

{
  "main": {
    "params": [
      "input"
    ],
    "steps": [
      {
        "init": {
          "assign": [
            {
              "userProfile": {}
            },
            {
              "recentItems": []
            }
          ]
        }
      },
      {
        "enrichUserData": {
          "parallel": {
            "shared": [
              "userProfile",
              "recentItems"
            ],
            "branches": [
              {
                "getUserProfileBranch": {
                  "steps": [
                    {
                      "getUserProfile": {
                        "call": "http.get",
                        "args": {
                          "url": "${\"https://example.com/users/\" + input.userId}"
                        },
                        "result": "userProfile"
                      }
                    }
                  ]
                }
              },
              {
                "getRecentItemsBranch": {
                  "steps": [
                    {
                      "getRecentItems": {
                        "try": {
                          "call": "http.get",
                          "args": {
                            "url": "${\"https://example.com/items?userId=\" + input.userId}"
                          },
                          "result": "recentItems"
                        },
                        "except": {
                          "as": "e",
                          "steps": [
                            {
                              "ignoreError": {
                                "assign": [
                                  {
                                    "recentItems": []
                                  }
                                ]
                              }
                            }
                          ]
                        }
                      }
                    }
                  ]
                }
              }
            ]
          }
        }
      }
    ]
  }
}

Elaborare gli elementi in parallelo (utilizzando un ciclo parallelo)

Se devi eseguire la stessa azione per ogni elemento di un elenco, puoi completare l'esecuzione più rapidamente utilizzando un loop parallelo. Un loop parallelo consente eseguire in parallelo più iterazioni di loop. Tieni presente che, a differenza di i cicli for regolari, le iterazioni possono possono essere eseguite in qualsiasi ordine.

Nell'esempio seguente, un insieme di notifiche utente viene elaborato in un ciclo for parallelo:

YAML

main:
  params: [input]
  steps:
    - sendNotifications:
        parallel:
          for:
            value: notification
            in: ${input.notifications}
            steps:
              - notify:
                  call: http.post
                  args:
                    url: https://example.com/sendNotification
                    body:
                      notification: ${notification}

JSON

{
  "main": {
    "params": [
      "input"
    ],
    "steps": [
      {
        "sendNotifications": {
          "parallel": {
            "for": {
              "value": "notification",
              "in": "${input.notifications}",
              "steps": [
                {
                  "notify": {
                    "call": "http.post",
                    "args": {
                      "url": "https://example.com/sendNotification",
                      "body": {
                        "notification": "${notification}"
                      }
                    }
                  }
                }
              ]
            }
          }
        }
      }
    ]
  }
}

Aggregare dati (utilizzando un ciclo parallelo)

Puoi elaborare un insieme di elementi mentre raccogli i dati dalle operazioni eseguita su ciascun elemento. Ad esempio, potresti voler monitorare gli ID dei articoli o gestisci un elenco di elementi con errori.

Nell'esempio seguente, 10 query separate in una finestra BigQuery pubblica Ciascun set di dati restituisce il numero di parole di un documento o di un insieme di documenti. R variabile condivisa permette che il conteggio delle parole si accumuli e venga letto dopo tutte le iterazioni completato. Dopo aver calcolato il numero di parole in tutti i documenti, che restituisce il totale.

YAML

# Use a parallel loop to make ten queries to a public BigQuery dataset and
# use a shared variable to accumulate a count of words; after all iterations
# complete, return the total number of words across all documents
main:
  params: [input]
  steps:
    - init:
        assign:
          - numWords: 0
          - corpuses:
              - sonnets
              - various
              - 1kinghenryvi
              - 2kinghenryvi
              - 3kinghenryvi
              - comedyoferrors
              - kingrichardiii
              - titusandronicus
              - tamingoftheshrew
              - loveslabourslost
    - runQueries:
        parallel:  # 'numWords' is shared so it can be written within the parallel loop
          shared: [numWords]
          for:
            value: corpus
            in: ${corpuses}
            steps:
              - runQuery:
                  call: googleapis.bigquery.v2.jobs.query
                  args:
                    projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                    body:
                      useLegacySql: false
                      query: ${"SELECT COUNT(DISTINCT word) FROM `bigquery-public-data.samples.shakespeare` " + " WHERE corpus='" + corpus + "' "}
                  result: query
              - add:
                  assign:
                    - numWords: ${numWords + int(query.rows[0].f[0].v)}  # first result is the count
    - done:
        return: ${numWords}

JSON

{
  "main": {
    "params": [
      "input"
    ],
    "steps": [
      {
        "init": {
          "assign": [
            {
              "numWords": 0
            },
            {
              "corpuses": [
                "sonnets",
                "various",
                "1kinghenryvi",
                "2kinghenryvi",
                "3kinghenryvi",
                "comedyoferrors",
                "kingrichardiii",
                "titusandronicus",
                "tamingoftheshrew",
                "loveslabourslost"
              ]
            }
          ]
        }
      },
      {
        "runQueries": {
          "parallel": {
            "shared": [
              "numWords"
            ],
            "for": {
              "value": "corpus",
              "in": "${corpuses}",
              "steps": [
                {
                  "runQuery": {
                    "call": "googleapis.bigquery.v2.jobs.query",
                    "args": {
                      "projectId": "${sys.get_env(\"GOOGLE_CLOUD_PROJECT_ID\")}",
                      "body": {
                        "useLegacySql": false,
                        "query": "${\"SELECT COUNT(DISTINCT word) FROM `bigquery-public-data.samples.shakespeare` \" + \" WHERE corpus='\" + corpus + \"' \"}"
                      }
                    },
                    "result": "query"
                  }
                },
                {
                  "add": {
                    "assign": [
                      {
                        "numWords": "${numWords + int(query.rows[0].f[0].v)}"
                      }
                    ]
                  }
                }
              ]
            }
          }
        }
      },
      {
        "done": {
          "return": "${numWords}"
        }
      }
    ]
  }
}

Passaggi successivi