feat: improve async sharding #977

Merged · 6 commits · Jun 11, 2024
Changes from 1 commit
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
added test
daniel-sanche committed Jun 7, 2024
commit f6b187dd99fc7585b2a4bf7736cec0ec293563d0
40 changes: 40 additions & 0 deletions tests/unit/data/_async/test_client.py
@@ -1975,6 +1975,46 @@ async def mock_call(*args, **kwargs):
idx = i + _CONCURRENCY_LIMIT
assert rpc_start_list[idx] - (i * increment_time) < eps

    @pytest.mark.asyncio
    async def test_read_rows_sharded_expirary(self):
        """
        If the operation times out before all shards complete, should raise
        a ShardedReadRowsExceptionGroup
        """
        from google.cloud.bigtable.data._async.client import _CONCURRENCY_LIMIT
        from google.cloud.bigtable.data.exceptions import ShardedReadRowsExceptionGroup
        from google.api_core.exceptions import DeadlineExceeded

        operation_timeout = 0.1

        # let the first batch complete, but the next batch times out
        num_queries = 15
        sleeps = [0] * _CONCURRENCY_LIMIT + [DeadlineExceeded("times up")] * (
            num_queries - _CONCURRENCY_LIMIT
        )

        async def mock_call(*args, **kwargs):
            next_item = sleeps.pop(0)
            if isinstance(next_item, Exception):
                raise next_item
            else:
                await asyncio.sleep(next_item)
            return [mock.Mock()]

        async with _make_client() as client:
            async with client.get_table("instance", "table") as table:
                with mock.patch.object(table, "read_rows") as read_rows:
                    read_rows.side_effect = mock_call
                    queries = [ReadRowsQuery() for _ in range(num_queries)]
                    with pytest.raises(ShardedReadRowsExceptionGroup) as exc:
                        await table.read_rows_sharded(
                            queries, operation_timeout=operation_timeout
                        )
                    assert isinstance(exc.value, ShardedReadRowsExceptionGroup)
                    assert len(exc.value.exceptions) == num_queries - _CONCURRENCY_LIMIT
                    # should keep successful queries
                    assert len(exc.value.successful_rows) == _CONCURRENCY_LIMIT


class TestSampleRowKeys:
    async def _make_gapic_stream(self, sample_list: list[tuple[bytes, int]]):
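
The behavior the new test asserts is what calling code relies on: if some shards miss the operation deadline, read_rows_sharded raises a ShardedReadRowsExceptionGroup that lists the per-shard failures but still carries the rows from shards that finished in successful_rows. Below is a minimal, hedged sketch of how a caller might consume that partial result. The instance and table names and the queries are placeholders, and it assumes BigtableDataClientAsync and ReadRowsQuery are importable from google.cloud.bigtable.data; only read_rows_sharded, operation_timeout, the exceptions attribute, and successful_rows are taken directly from the test above.

# Caller-side sketch (not part of the PR). "my-instance" / "my-table" and the
# example queries are placeholders; import paths are assumptions based on the
# public async data API, not taken from this diff.
import asyncio

from google.cloud.bigtable.data import BigtableDataClientAsync, ReadRowsQuery
from google.cloud.bigtable.data.exceptions import ShardedReadRowsExceptionGroup


async def read_shards_keeping_partial_results(queries, timeout=10.0):
    async with BigtableDataClientAsync() as client:
        async with client.get_table("my-instance", "my-table") as table:
            try:
                # operation_timeout bounds the whole sharded read, the same
                # parameter exercised by the test above.
                return await table.read_rows_sharded(
                    queries, operation_timeout=timeout
                )
            except ShardedReadRowsExceptionGroup as exc_group:
                # Each failed shard is reported individually; rows from shards
                # that finished before the deadline are not discarded.
                for shard_error in exc_group.exceptions:
                    print(f"shard failed: {shard_error!r}")
                return exc_group.successful_rows


if __name__ == "__main__":
    # Example invocation with a handful of placeholder shard queries.
    asyncio.run(
        read_shards_keeping_partial_results([ReadRowsQuery() for _ in range(5)])
    )

The test drives exactly this path: the first _CONCURRENCY_LIMIT shard calls return immediately, every later call raises DeadlineExceeded, so one batch of results ends up in successful_rows and the remaining shards are reported in the exception group.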