fix: retry is_job_done on ConnectionError (#1930)
tswast committed May 23, 2024
1 parent c3f7b23 commit 4f72723
Showing 2 changed files with 125 additions and 0 deletions.
8 changes: 8 additions & 0 deletions google/cloud/bigquery/retry.py
@@ -119,6 +119,14 @@ def _job_should_retry(exc):
if isinstance(exc, exceptions.RetryError):
exc = exc.cause

# Per https://github.com/googleapis/python-bigquery/issues/1929, sometimes
# retriable errors make their way here. Because of the separate
# `restart_query_job` logic to make sure we aren't restarting non-failed
# jobs, it should be safe to continue and not totally fail our attempt at
# waiting for the query to complete.
if _should_retry(exc):
return True

if not hasattr(exc, "errors") or len(exc.errors) == 0:
return False

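The new short-circuit above is easy to read past: `_job_should_retry` now defers to the API-level `_should_retry` predicate before falling back to the structured `exc.errors` check. A minimal sketch of the resulting behavior, assuming the private helper names shown in the diff and that `requests.exceptions.ConnectionError` remains among the unstructured retryable exception types:

```python
# Sketch only: exercises the private predicates shown in the diff above.
# `_should_retry` and `_job_should_retry` are internal helpers of
# google.cloud.bigquery.retry and may change without notice.
import requests.exceptions

from google.cloud.bigquery import retry as bq_retry

# A transport-level failure carries no structured `errors` payload.
exc = requests.exceptions.ConnectionError("connection reset by peer")

# The API-level predicate already treats this as retryable.
assert bq_retry._should_retry(exc)

# Before this commit the job-level predicate fell through to the
# `exc.errors` check and returned False; with the added short-circuit it
# now agrees, so the `is_job_done` polling loop retries instead of failing.
assert bq_retry._job_should_retry(exc)
```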
117 changes: 117 additions & 0 deletions tests/unit/test_job_retry.py
@@ -21,6 +21,7 @@
import google.api_core.exceptions
import google.api_core.retry
import freezegun
import requests.exceptions

from google.cloud.bigquery.client import Client
from google.cloud.bigquery import _job_helpers
@@ -126,6 +127,122 @@ def api_request(method, path, query_params=None, data=None, **kw):
assert job.job_id == orig_job_id


def test_retry_connection_error_with_default_retries_and_successful_first_job(
monkeypatch, client
):
"""
Make sure ConnectionError can be retried at `is_job_done` level, even if
retries are exhausted by the API-level retry.
Note: Because restart_query_job is set to True only in the case of a
confirmed job failure, this should be safe to do even when a job is not
idempotent.
Regression test for issue
https://github.com/googleapis/python-bigquery/issues/1929
"""
job_counter = 0

def make_job_id(*args, **kwargs):
nonlocal job_counter
job_counter += 1
return f"{job_counter}"

monkeypatch.setattr(_job_helpers, "make_job_id", make_job_id)
conn = client._connection = make_connection()
project = client.project
job_reference_1 = {"projectId": project, "jobId": "1", "location": "test-loc"}
NUM_API_RETRIES = 2

with freezegun.freeze_time(
"2024-01-01 00:00:00",
# Note: because of exponential backoff and a bit of jitter,
# NUM_API_RETRIES will get less accurate the greater the value.
# We add 1 because we know there will be at least some additional
# calls to fetch the time / sleep before the retry deadline is hit.
auto_tick_seconds=(
google.cloud.bigquery.retry._DEFAULT_RETRY_DEADLINE / NUM_API_RETRIES
)
+ 1,
):
conn.api_request.side_effect = [
# jobs.insert
{"jobReference": job_reference_1, "status": {"state": "PENDING"}},
# jobs.get
{"jobReference": job_reference_1, "status": {"state": "RUNNING"}},
# jobs.getQueryResults x2
requests.exceptions.ConnectionError(),
requests.exceptions.ConnectionError(),
# jobs.get
# Job actually succeeded, so we shouldn't be restarting the job,
# even though we are retrying at the `is_job_done` level.
{"jobReference": job_reference_1, "status": {"state": "DONE"}},
# jobs.getQueryResults
{"jobReference": job_reference_1, "jobComplete": True},
]

job = client.query("select 1")
rows_iter = job.result()

assert job.done() # Shouldn't make any additional API calls.
assert rows_iter is not None

# Should only have created one job, even though we did call job_retry.
assert job_counter == 1

# Double-check that we made the API calls we expected to make.
conn.api_request.assert_has_calls(
[
# jobs.insert
mock.call(
method="POST",
path="/projects/PROJECT/jobs",
data={
"jobReference": {"jobId": "1", "projectId": "PROJECT"},
"configuration": {
"query": {"useLegacySql": False, "query": "select 1"}
},
},
timeout=None,
),
# jobs.get
mock.call(
method="GET",
path="/projects/PROJECT/jobs/1",
query_params={"location": "test-loc"},
timeout=None,
),
# jobs.getQueryResults x2
mock.call(
method="GET",
path="/projects/PROJECT/queries/1",
query_params={"maxResults": 0, "location": "test-loc"},
timeout=None,
),
mock.call(
method="GET",
path="/projects/PROJECT/queries/1",
query_params={"maxResults": 0, "location": "test-loc"},
timeout=None,
),
# jobs.get -- is_job_done checking again
mock.call(
method="GET",
path="/projects/PROJECT/jobs/1",
query_params={"location": "test-loc"},
timeout=None,
),
# jobs.getQueryResults
mock.call(
method="GET",
path="/projects/PROJECT/queries/1",
query_params={"maxResults": 0, "location": "test-loc"},
timeout=120,
),
],
)


def test_query_retry_with_default_retry_and_ambiguous_errors_only_retries_with_failed_job(
client, monkeypatch
):
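As an aside on the testing technique: `test_retry_connection_error_with_default_retries_and_successful_first_job` leans on freezegun's `auto_tick_seconds` so that the API-level retry deadline is crossed after roughly `NUM_API_RETRIES` attempts without minutes of real sleeping. Below is a standalone sketch of that trick; the `Retry` object, its 600-second timeout, and the helper names are invented for illustration and stand in for the library's `DEFAULT_RETRY` and `_DEFAULT_RETRY_DEADLINE`:

```python
# Sketch only: demonstrates the freezegun `auto_tick_seconds` technique used
# in the test above. The Retry below is an illustrative stand-in, not the
# library's DEFAULT_RETRY.
import freezegun
import google.api_core.exceptions
import google.api_core.retry

attempts = 0


@google.api_core.retry.Retry(
    predicate=google.api_core.retry.if_exception_type(
        google.api_core.exceptions.InternalServerError
    ),
    timeout=600.0,  # mimic a 10-minute retry deadline
)
def always_failing_call():
    global attempts
    attempts += 1
    raise google.api_core.exceptions.InternalServerError("transient backend error")


# Every read of the frozen clock advances it by 301 seconds, so the
# 600-second deadline is exceeded after a couple of attempts instead of
# after ten real minutes of exponential backoff.
with freezegun.freeze_time("2024-01-01 00:00:00", auto_tick_seconds=301.0):
    try:
        always_failing_call()
    except google.api_core.exceptions.RetryError:
        pass

print(attempts)  # typically a small number (~2), not dozens
```

The exact attempt count depends on backoff jitter and on how often other code reads the clock inside the frozen block, which is why the test's own comment warns that `NUM_API_RETRIES` becomes less accurate as the value grows.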
