perf: if page_size or max_results is set on QueryJob.result(), use to download first page of results (#1942)

* perf: if `page_size` or `max_results` is set on `QueryJob.result()`, use to download first page of results

* add unit tests for query_and_wait

* populate maxResults on page 2

* fix maxResults

* fix coverage

---------

Co-authored-by: Lingqing Gan <[email protected]>
tswast and Linchin committed Jun 3, 2024
1 parent fc3edd5 commit 3e7a48d
Showing 6 changed files with 300 additions and 125 deletions.
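
For orientation, a minimal usage sketch of the code paths this commit speeds up; the project, query text, and page sizes below are illustrative assumptions, not part of the change itself:

```python
from google.cloud import bigquery

client = bigquery.Client()  # assumes default credentials and project

# QueryJob.result(): the page_size (or max_results) hint is now forwarded to
# jobs.getQueryResults, so the same call that waits for the query to finish
# also downloads the first page of rows.
job = client.query(
    "SELECT name, number FROM `bigquery-public-data.usa_names.usa_1910_2013` LIMIT 1000"
)
for row in job.result(page_size=100):
    print(row["name"], row["number"])

# query_and_wait() accepts the same hint and benefits in the same way.
rows = client.query_and_wait("SELECT 1 AS x", page_size=10)
print(list(rows))
```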
23 changes: 14 additions & 9 deletions google/cloud/bigquery/client.py
@@ -109,6 +109,7 @@
DEFAULT_RETRY,
DEFAULT_TIMEOUT,
DEFAULT_GET_JOB_TIMEOUT,
POLLING_DEFAULT_VALUE,
)
from google.cloud.bigquery.routine import Routine
from google.cloud.bigquery.routine import RoutineReference
@@ -1963,6 +1964,7 @@ def _get_query_results(
timeout_ms: Optional[int] = None,
location: Optional[str] = None,
timeout: TimeoutType = DEFAULT_TIMEOUT,
page_size: int = 0,
) -> _QueryResults:
"""Get the query results object for a query job.
@@ -1981,20 +1983,26 @@ def _get_query_results(
before using ``retry``. If set, this connection timeout may be
increased to a minimum value. This prevents retries on what
would otherwise be a successful response.
page_size (int):
Maximum number of rows in a single response. See maxResults in
the jobs.getQueryResults REST API.
Returns:
google.cloud.bigquery.query._QueryResults:
A new ``_QueryResults`` instance.
"""

extra_params: Dict[str, Any] = {"maxResults": 0}
extra_params: Dict[str, Any] = {"maxResults": page_size}

if timeout is not None:
if not isinstance(timeout, (int, float)):
timeout = _MIN_GET_QUERY_RESULTS_TIMEOUT
else:
timeout = max(timeout, _MIN_GET_QUERY_RESULTS_TIMEOUT)

if page_size > 0:
extra_params["formatOptions.useInt64Timestamp"] = True

if project is None:
project = self.project

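
As a rough paraphrase (not the library's actual code) of the hunk above: `maxResults` was previously hard-coded to 0 so the call only waited for completion, while it now carries the caller's page-size hint, and `formatOptions.useInt64Timestamp` is requested whenever rows are actually downloaded so the first page parses timestamps the same way as pages fetched later by the row iterator:

```python
from typing import Any, Dict

def first_page_params(page_size: int = 0) -> Dict[str, Any]:
    # maxResults=0 means "wait for the query, return no rows"; a positive
    # value asks jobs.getQueryResults for the first page as well.
    params: Dict[str, Any] = {"maxResults": page_size}

    # Only request int64 timestamps when rows will be returned, matching the
    # option used for subsequent pages.
    if page_size > 0:
        params["formatOptions.useInt64Timestamp"] = True
    return params

print(first_page_params())     # {'maxResults': 0}
print(first_page_params(100))  # {'maxResults': 100, 'formatOptions.useInt64Timestamp': True}
```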
@@ -3504,7 +3512,7 @@ def query_and_wait(
location: Optional[str] = None,
project: Optional[str] = None,
api_timeout: TimeoutType = DEFAULT_TIMEOUT,
wait_timeout: TimeoutType = None,
wait_timeout: Union[Optional[float], object] = POLLING_DEFAULT_VALUE,
retry: retries.Retry = DEFAULT_RETRY,
job_retry: retries.Retry = DEFAULT_JOB_RETRY,
page_size: Optional[int] = None,
@@ -3538,10 +3546,12 @@
api_timeout (Optional[float]):
The number of seconds to wait for the underlying HTTP transport
before using ``retry``.
wait_timeout (Optional[float]):
wait_timeout (Optional[Union[float, object]]):
The number of seconds to wait for the query to finish. If the
query doesn't finish before this timeout, the client attempts
to cancel the query.
to cancel the query. If unset, the underlying REST API calls
have timeouts, but we still wait indefinitely for the job to
finish.
retry (Optional[google.api_core.retry.Retry]):
How to retry the RPC. This only applies to making RPC
calls. It isn't used to retry failed jobs. This has
@@ -4128,11 +4138,6 @@ def _list_rows_from_query_results(
if start_index is not None:
params["startIndex"] = start_index

# We don't call jobs.query with a page size, so if the user explicitly
# requests a certain size, invalidate the cache.
if page_size is not None:
first_page_response = None

params["formatOptions.useInt64Timestamp"] = True
row_iterator = RowIterator(
client=self,
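
The block deleted at the end of this file used to discard the cached first page whenever the user asked for an explicit page size, because that cache had been filled with `maxResults=0`. With the change above, the cache is filled using the requested page size, so it can be reused whenever it actually contains rows. A hedged sketch of that reuse decision, with simplified names that are not the library's:

```python
from typing import Any, Dict, Optional

def reusable_first_page(
    cached_response: Optional[Dict[str, Any]],
) -> Optional[Dict[str, Any]]:
    # Keep the cached jobs.query / jobs.getQueryResults response only if it
    # really holds rows; a wait-only response (maxResults=0) has none and
    # would otherwise end iteration early with no next page token.
    if cached_response is not None and "rows" in cached_response:
        return cached_response
    return None

print(reusable_first_page({"rows": [{"f": [{"v": "1"}]}], "pageToken": "abc"}))
print(reusable_first_page({"totalRows": "10"}))  # None: wait-only response
```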
51 changes: 41 additions & 10 deletions google/cloud/bigquery/job/query.py
@@ -1385,7 +1385,10 @@ def _begin(self, client=None, retry=DEFAULT_RETRY, timeout=None):
raise

def _reload_query_results(
self, retry: "retries.Retry" = DEFAULT_RETRY, timeout: Optional[float] = None
self,
retry: "retries.Retry" = DEFAULT_RETRY,
timeout: Optional[float] = None,
page_size: int = 0,
):
"""Refresh the cached query results unless already cached and complete.
@@ -1395,6 +1398,9 @@ def _reload_query_results(
timeout (Optional[float]):
The number of seconds to wait for the underlying HTTP transport
before using ``retry``.
page_size (int):
Maximum number of rows in a single response. See maxResults in
the jobs.getQueryResults REST API.
"""
# Optimization: avoid a call to jobs.getQueryResults if it's already
# been fetched, e.g. from jobs.query first page of results.
@@ -1425,7 +1431,14 @@ def _reload_query_results(

# If an explicit timeout is not given, fall back to the transport timeout
# stored in _blocking_poll() in the process of polling for job completion.
transport_timeout = timeout if timeout is not None else self._transport_timeout
if timeout is not None:
transport_timeout = timeout
else:
transport_timeout = self._transport_timeout

# Handle PollingJob._DEFAULT_VALUE.
if not isinstance(transport_timeout, (float, int)):
transport_timeout = None

self._query_results = self._client._get_query_results(
self.job_id,
@@ -1434,6 +1447,7 @@ def _reload_query_results(
timeout_ms=timeout_ms,
location=self.location,
timeout=transport_timeout,
page_size=page_size,
)

def result( # type: ignore # (incompatible with supertype)
@@ -1515,11 +1529,25 @@ def result(  # type: ignore  # (incompatible with supertype)
# actually correspond to a finished query job.
)

# Setting max_results should be equivalent to setting page_size with
# regards to allowing the user to tune how many results to download
# while we wait for the query to finish. See internal issue:
# 344008814.
if page_size is None and max_results is not None:
page_size = max_results

# When timeout has default sentinel value ``object()``, do not pass
# anything to invoke default timeouts in subsequent calls.
kwargs: Dict[str, Union[_helpers.TimeoutType, object]] = {}
done_kwargs: Dict[str, Union[_helpers.TimeoutType, object]] = {}
reload_query_results_kwargs: Dict[str, Union[_helpers.TimeoutType, object]] = {}
list_rows_kwargs: Dict[str, Union[_helpers.TimeoutType, object]] = {}
if type(timeout) is not object:
kwargs["timeout"] = timeout
done_kwargs["timeout"] = timeout
list_rows_kwargs["timeout"] = timeout
reload_query_results_kwargs["timeout"] = timeout

if page_size is not None:
reload_query_results_kwargs["page_size"] = page_size

try:
retry_do_query = getattr(self, "_retry_do_query", None)
@@ -1562,7 +1590,7 @@ def is_job_done():
# rateLimitExceeded errors are ambiguous. We want to know if
# the query job failed and not just the call to
# jobs.getQueryResults.
if self.done(retry=retry, **kwargs):
if self.done(retry=retry, **done_kwargs):
# If it's already failed, we might as well stop.
job_failed_exception = self.exception()
if job_failed_exception is not None:
@@ -1599,14 +1627,16 @@ def is_job_done():
# response from the REST API. This ensures we aren't
# making any extra API calls if the previous loop
# iteration fetched the finished job.
self._reload_query_results(retry=retry, **kwargs)
self._reload_query_results(
retry=retry, **reload_query_results_kwargs
)
return True

# Call jobs.getQueryResults with max results set to 0 just to
# wait for the query to finish. Unlike most methods,
# jobs.getQueryResults hangs as long as it can to ensure we
# know when the query has finished as soon as possible.
self._reload_query_results(retry=retry, **kwargs)
self._reload_query_results(retry=retry, **reload_query_results_kwargs)

# Even if the query is finished now according to
# jobs.getQueryResults, we'll want to reload the job status if
@@ -1679,8 +1709,9 @@ def is_job_done():
# We know that there's at least 1 row, so only treat the response from
# jobs.getQueryResults / jobs.query as the first page of the
# RowIterator response if there are any rows in it. This prevents us
# from stopping the iteration early because we're missing rows and
# there's no next page token.
# from stopping the iteration early in the cases where we set
# maxResults=0. In that case, we're missing rows and there's no next
# page token.
first_page_response = self._query_results._properties
if "rows" not in first_page_response:
first_page_response = None
@@ -1699,7 +1730,7 @@ def is_job_done():
query_id=self.query_id,
first_page_response=first_page_response,
num_dml_affected_rows=self._query_results.num_dml_affected_rows,
**kwargs,
**list_rows_kwargs,
)
rows._preserve_order = _contains_order_by(self.query)
return rows
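
The separate kwargs dictionaries introduced in `result()` exist because `timeout` defaults to a sentinel object rather than `None`: only an explicitly supplied timeout is forwarded, and the page-size hint is forwarded only to the query-results reload. A self-contained sketch of that pattern, simplified to an identity check and using illustrative names:

```python
from typing import Any, Dict, Optional, Union

_UNSET = object()  # stands in for the library's default-timeout sentinel

def build_kwargs(
    timeout: Union[float, None, object] = _UNSET,
    page_size: Optional[int] = None,
    max_results: Optional[int] = None,
) -> Dict[str, Dict[str, Any]]:
    # max_results acts as a page-size hint when page_size was not given.
    if page_size is None and max_results is not None:
        page_size = max_results

    done_kwargs: Dict[str, Any] = {}
    reload_kwargs: Dict[str, Any] = {}
    list_rows_kwargs: Dict[str, Any] = {}

    # Forward timeout only when the caller supplied one, so each downstream
    # call keeps its own default behavior.
    if timeout is not _UNSET:
        done_kwargs["timeout"] = timeout
        reload_kwargs["timeout"] = timeout
        list_rows_kwargs["timeout"] = timeout

    # Only the jobs.getQueryResults refresh needs the page-size hint.
    if page_size is not None:
        reload_kwargs["page_size"] = page_size

    return {"done": done_kwargs, "reload": reload_kwargs, "list_rows": list_rows_kwargs}

print(build_kwargs())                               # all empty: defaults apply downstream
print(build_kwargs(timeout=30.0, max_results=500))  # timeout everywhere, page_size to reload
```

In the diffed code the check is written as `type(timeout) is not object`, which serves the same purpose for the bare `object()` sentinel.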
2 changes: 1 addition & 1 deletion google/cloud/bigquery/table.py
@@ -1759,7 +1759,7 @@ def _get_next_page_response(self):
if self._page_size is not None:
if self.page_number and "startIndex" in params:
del params["startIndex"]
params["maxResults"] = self._page_size

return self.api_request(
method=self._HTTP_METHOD, path=self.path, query_params=params
)
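
Finally, a short usage sketch for the iterator change above: when the caller gives an explicit page size, pages after the first are also requested with that `maxResults` value (the table name here is hypothetical):

```python
from google.cloud import bigquery

client = bigquery.Client()  # assumes default credentials and project

job = client.query("SELECT * FROM `my-project.my_dataset.events`")
rows = job.result(page_size=1000)

# Each subsequent page is fetched with maxResults=1000 as well.
for page in rows.pages:
    for row in page:
        ...  # process the row
```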
(Diffs for the remaining three changed files are not shown here.)