feat: improve async sharding #977

Merged · 6 commits · Jun 11, 2024
4 changes: 4 additions & 0 deletions google/cloud/bigtable/data/_async/client.py
@@ -755,6 +755,10 @@ async def read_rows_with_semaphore(query):
    async with concurrency_sem:
        # calculate new timeout based on time left in overall operation
        shard_timeout = next(rpc_timeout_generator)
        if shard_timeout <= 0:
            raise DeadlineExceeded(
                "Operation timeout exceeded before starting query"
            )
        return await self.read_rows(
            query,
            operation_timeout=shard_timeout,

Review thread on the new error message:

Contributor:
maybe "before starting subquery"?

Contributor Author:
This is the earliest we can check after getting through the semaphore.
There is a similar check at the very beginning as part of _get_timeouts, though (which raises an AttributeError).
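For context, the rpc_timeout_generator consumed in the new code yields the time remaining in the overall operation, so a shard that had to wait on the semaphore can receive a zero or negative budget. A minimal sketch of that pattern (the function name and signature here are illustrative assumptions, not the library's exact helper):

import time
from typing import Iterator


def remaining_timeout_generator(operation_timeout: float) -> Iterator[float]:
    """Yield the seconds left until the overall operation deadline.

    Hypothetical stand-in for the rpc_timeout_generator used in the diff;
    once the deadline passes it yields values <= 0, which the new
    `shard_timeout <= 0` check turns into a DeadlineExceeded error.
    """
    deadline = time.monotonic() + operation_timeout
    while True:
        yield deadline - time.monotonic()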
29 changes: 29 additions & 0 deletions tests/unit/data/_async/test_client.py
@@ -2015,6 +2015,35 @@ async def mock_call(*args, **kwargs):
                    # should keep successful queries
                    assert len(exc.value.successful_rows) == _CONCURRENCY_LIMIT

    @pytest.mark.asyncio
    async def test_read_rows_sharded_negative_batch_timeout(self):
        """
        Try to run with batches that start after the operation timeout has elapsed.

        They should raise DeadlineExceeded errors.
        """
        from google.cloud.bigtable.data.exceptions import ShardedReadRowsExceptionGroup
        from google.api_core.exceptions import DeadlineExceeded

        async def mock_call(*args, **kwargs):
            await asyncio.sleep(0.05)
            return [mock.Mock()]

        async with _make_client() as client:
            async with client.get_table("instance", "table") as table:
                with mock.patch.object(table, "read_rows") as read_rows:
                    read_rows.side_effect = mock_call
                    queries = [ReadRowsQuery() for _ in range(15)]
                    with pytest.raises(ShardedReadRowsExceptionGroup) as exc:
                        await table.read_rows_sharded(queries, operation_timeout=0.01)
                    assert isinstance(exc.value, ShardedReadRowsExceptionGroup)
                    # only the 5 queries that had to wait on the semaphore
                    # start after the deadline and fail
                    assert len(exc.value.exceptions) == 5
                    assert all(
                        isinstance(e.__cause__, DeadlineExceeded)
                        for e in exc.value.exceptions
                    )
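
The arithmetic behind the == 5 assertion: 15 queries against a concurrency limit of 10 (assuming _CONCURRENCY_LIMIT is 10, as the earlier assertion implies) means 10 shards start immediately while 5 wait on the semaphore; each mocked call sleeps 0.05s, so by the time the waiting shards acquire the semaphore the 0.01s operation budget is long gone. A standalone sketch of that timing, independent of the Bigtable client:

import asyncio
import time


async def demo() -> int:
    operation_timeout = 0.01  # overall budget, as in the test
    deadline = time.monotonic() + operation_timeout
    sem = asyncio.Semaphore(10)  # assumed concurrency limit
    late = 0

    async def shard() -> None:
        nonlocal late
        async with sem:
            if deadline - time.monotonic() <= 0:
                late += 1  # the client would raise DeadlineExceeded here
                return
            await asyncio.sleep(0.05)  # stands in for the mocked read_rows

    await asyncio.gather(*(shard() for _ in range(15)))
    return late


print(asyncio.run(demo()))  # 5: only the shards that waited miss the deadline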


class TestSampleRowKeys:
    async def _make_gapic_stream(self, sample_list: list[tuple[bytes, int]]):