From f9fefad6ee2e16804d109d8bfbb613062f57ea65 Mon Sep 17 00:00:00 2001 From: Astha Mohta <35952883+asthamohta@users.noreply.github.com> Date: Thu, 27 Apr 2023 21:26:57 +0530 Subject: [PATCH 1/2] feat: Leader Aware Routing (#899) * changes * tests * Update client.py * Update test_client.py * Update connection.py * setting feature false * changes --- google/cloud/spanner_dbapi/connection.py | 19 ++- google/cloud/spanner_v1/_helpers.py | 12 ++ google/cloud/spanner_v1/batch.py | 9 +- google/cloud/spanner_v1/client.py | 19 +++ google/cloud/spanner_v1/database.py | 10 +- google/cloud/spanner_v1/pool.py | 13 +- google/cloud/spanner_v1/session.py | 17 ++- google/cloud/spanner_v1/snapshot.py | 29 +++- google/cloud/spanner_v1/transaction.py | 24 ++++ tests/unit/spanner_dbapi/test_connect.py | 8 +- tests/unit/spanner_dbapi/test_connection.py | 3 +- tests/unit/test__helpers.py | 14 ++ tests/unit/test_batch.py | 25 +++- tests/unit/test_client.py | 17 +++ tests/unit/test_database.py | 66 +++++++-- tests/unit/test_instance.py | 1 + tests/unit/test_pool.py | 1 + tests/unit/test_session.py | 143 ++++++++++++++---- tests/unit/test_snapshot.py | 11 +- tests/unit/test_spanner.py | 152 ++++++++++++++++---- tests/unit/test_transaction.py | 43 +++++- 21 files changed, 543 insertions(+), 93 deletions(-) diff --git a/google/cloud/spanner_dbapi/connection.py b/google/cloud/spanner_dbapi/connection.py index a50e48804b..e6a0610baf 100644 --- a/google/cloud/spanner_dbapi/connection.py +++ b/google/cloud/spanner_dbapi/connection.py @@ -508,6 +508,7 @@ def connect( pool=None, user_agent=None, client=None, + route_to_leader_enabled=False, ): """Creates a connection to a Google Cloud Spanner database. @@ -544,6 +545,14 @@ def connect( :class:`~google.cloud.spanner_v1.Client`. :param client: (Optional) Custom user provided Client Object + :type route_to_leader_enabled: boolean + :param route_to_leader_enabled: + (Optional) Default False. Set route_to_leader_enabled as True to + Enable leader aware routing. Enabling leader aware routing + would route all requests in RW/PDML transactions to the + leader region. + + :rtype: :class:`google.cloud.spanner_dbapi.connection.Connection` :returns: Connection object associated with the given Google Cloud Spanner resource. @@ -556,11 +565,17 @@ def connect( ) if isinstance(credentials, str): client = spanner.Client.from_service_account_json( - credentials, project=project, client_info=client_info + credentials, + project=project, + client_info=client_info, + route_to_leader_enabled=False, ) else: client = spanner.Client( - project=project, credentials=credentials, client_info=client_info + project=project, + credentials=credentials, + client_info=client_info, + route_to_leader_enabled=False, ) else: if project is not None and client.project != project: diff --git a/google/cloud/spanner_v1/_helpers.py b/google/cloud/spanner_v1/_helpers.py index b364514d09..1e647db339 100644 --- a/google/cloud/spanner_v1/_helpers.py +++ b/google/cloud/spanner_v1/_helpers.py @@ -292,3 +292,15 @@ def _metadata_with_prefix(prefix, **kw): List[Tuple[str, str]]: RPC metadata with supplied prefix """ return [("google-cloud-resource-prefix", prefix)] + + +def _metadata_with_leader_aware_routing(value, **kw): + """Create RPC metadata containing a leader aware routing header + + Args: + value (bool): header value + + Returns: + List[Tuple[str, str]]: RPC metadata with leader aware routing header + """ + return ("x-goog-spanner-route-to-leader", str(value).lower()) diff --git a/google/cloud/spanner_v1/batch.py b/google/cloud/spanner_v1/batch.py index 48c533d2cd..7ee0392aa4 100644 --- a/google/cloud/spanner_v1/batch.py +++ b/google/cloud/spanner_v1/batch.py @@ -20,7 +20,10 @@ from google.cloud.spanner_v1._helpers import _SessionWrapper from google.cloud.spanner_v1._helpers import _make_list_value_pbs -from google.cloud.spanner_v1._helpers import _metadata_with_prefix +from google.cloud.spanner_v1._helpers import ( + _metadata_with_prefix, + _metadata_with_leader_aware_routing, +) from google.cloud.spanner_v1._opentelemetry_tracing import trace_call from google.cloud.spanner_v1 import RequestOptions @@ -159,6 +162,10 @@ def commit(self, return_commit_stats=False, request_options=None): database = self._session._database api = database.spanner_api metadata = _metadata_with_prefix(database.name) + if database._route_to_leader_enabled: + metadata.append( + _metadata_with_leader_aware_routing(database._route_to_leader_enabled) + ) txn_options = TransactionOptions(read_write=TransactionOptions.ReadWrite()) trace_attributes = {"num_mutations": len(self._mutations)} diff --git a/google/cloud/spanner_v1/client.py b/google/cloud/spanner_v1/client.py index f943573b66..c37c5e8411 100644 --- a/google/cloud/spanner_v1/client.py +++ b/google/cloud/spanner_v1/client.py @@ -114,6 +114,13 @@ class Client(ClientWithProject): If a dict is provided, it must be of the same form as the protobuf message :class:`~google.cloud.spanner_v1.types.QueryOptions` + :type route_to_leader_enabled: boolean + :param route_to_leader_enabled: + (Optional) Default False. Set route_to_leader_enabled as True to + Enable leader aware routing. Enabling leader aware routing + would route all requests in RW/PDML transactions to the + leader region. + :raises: :class:`ValueError ` if both ``read_only`` and ``admin`` are :data:`True` """ @@ -132,6 +139,7 @@ def __init__( client_info=_CLIENT_INFO, client_options=None, query_options=None, + route_to_leader_enabled=False, ): self._emulator_host = _get_spanner_emulator_host() @@ -171,6 +179,8 @@ def __init__( ): warnings.warn(_EMULATOR_HOST_HTTP_SCHEME) + self._route_to_leader_enabled = route_to_leader_enabled + @property def credentials(self): """Getter for client's credentials. @@ -242,6 +252,15 @@ def database_admin_api(self): ) return self._database_admin_api + @property + def route_to_leader_enabled(self): + """Getter for if read-write or pdml requests will be routed to leader. + + :rtype: boolean + :returns: If read-write requests will be routed to leader. + """ + return self._route_to_leader_enabled + def copy(self): """Make a copy of this client. diff --git a/google/cloud/spanner_v1/database.py b/google/cloud/spanner_v1/database.py index 8e72d6cf8f..f78fff7816 100644 --- a/google/cloud/spanner_v1/database.py +++ b/google/cloud/spanner_v1/database.py @@ -44,7 +44,10 @@ from google.cloud.spanner_v1 import RequestOptions from google.cloud.spanner_v1 import SpannerClient from google.cloud.spanner_v1._helpers import _merge_query_options -from google.cloud.spanner_v1._helpers import _metadata_with_prefix +from google.cloud.spanner_v1._helpers import ( + _metadata_with_prefix, + _metadata_with_leader_aware_routing, +) from google.cloud.spanner_v1.batch import Batch from google.cloud.spanner_v1.keyset import KeySet from google.cloud.spanner_v1.pool import BurstyPool @@ -155,6 +158,7 @@ def __init__( self._encryption_config = encryption_config self._database_dialect = database_dialect self._database_role = database_role + self._route_to_leader_enabled = self._instance._client.route_to_leader_enabled if pool is None: pool = BurstyPool(database_role=database_role) @@ -565,6 +569,10 @@ def execute_partitioned_dml( ) metadata = _metadata_with_prefix(self.name) + if self._route_to_leader_enabled: + metadata.append( + _metadata_with_leader_aware_routing(self._route_to_leader_enabled) + ) def execute_pdml(): with SessionCheckout(self._pool) as session: diff --git a/google/cloud/spanner_v1/pool.py b/google/cloud/spanner_v1/pool.py index 7455d0cd20..56837bfc0b 100644 --- a/google/cloud/spanner_v1/pool.py +++ b/google/cloud/spanner_v1/pool.py @@ -20,7 +20,10 @@ from google.cloud.exceptions import NotFound from google.cloud.spanner_v1 import BatchCreateSessionsRequest from google.cloud.spanner_v1 import Session -from google.cloud.spanner_v1._helpers import _metadata_with_prefix +from google.cloud.spanner_v1._helpers import ( + _metadata_with_prefix, + _metadata_with_leader_aware_routing, +) from warnings import warn _NOW = datetime.datetime.utcnow # unit tests may replace @@ -191,6 +194,10 @@ def bind(self, database): self._database = database api = database.spanner_api metadata = _metadata_with_prefix(database.name) + if database._route_to_leader_enabled: + metadata.append( + _metadata_with_leader_aware_routing(database._route_to_leader_enabled) + ) self._database_role = self._database_role or self._database.database_role request = BatchCreateSessionsRequest( database=database.name, @@ -402,6 +409,10 @@ def bind(self, database): self._database = database api = database.spanner_api metadata = _metadata_with_prefix(database.name) + if database._route_to_leader_enabled: + metadata.append( + _metadata_with_leader_aware_routing(database._route_to_leader_enabled) + ) created_session_count = 0 self._database_role = self._database_role or self._database.database_role diff --git a/google/cloud/spanner_v1/session.py b/google/cloud/spanner_v1/session.py index 5b1ca6fbb8..256e72511b 100644 --- a/google/cloud/spanner_v1/session.py +++ b/google/cloud/spanner_v1/session.py @@ -26,7 +26,10 @@ from google.cloud.spanner_v1 import ExecuteSqlRequest from google.cloud.spanner_v1 import CreateSessionRequest -from google.cloud.spanner_v1._helpers import _metadata_with_prefix +from google.cloud.spanner_v1._helpers import ( + _metadata_with_prefix, + _metadata_with_leader_aware_routing, +) from google.cloud.spanner_v1._opentelemetry_tracing import trace_call from google.cloud.spanner_v1.batch import Batch from google.cloud.spanner_v1.snapshot import Snapshot @@ -125,6 +128,12 @@ def create(self): raise ValueError("Session ID already set by back-end") api = self._database.spanner_api metadata = _metadata_with_prefix(self._database.name) + if self._database._route_to_leader_enabled: + metadata.append( + _metadata_with_leader_aware_routing( + self._database._route_to_leader_enabled + ) + ) request = CreateSessionRequest(database=self._database.name) if self._database.database_role is not None: @@ -153,6 +162,12 @@ def exists(self): return False api = self._database.spanner_api metadata = _metadata_with_prefix(self._database.name) + if self._database._route_to_leader_enabled: + metadata.append( + _metadata_with_leader_aware_routing( + self._database._route_to_leader_enabled + ) + ) with trace_call("CloudSpanner.GetSession", self) as span: try: diff --git a/google/cloud/spanner_v1/snapshot.py b/google/cloud/spanner_v1/snapshot.py index 362e5dd1bc..dc526c9504 100644 --- a/google/cloud/spanner_v1/snapshot.py +++ b/google/cloud/spanner_v1/snapshot.py @@ -31,7 +31,10 @@ from google.api_core import gapic_v1 from google.cloud.spanner_v1._helpers import _make_value_pb from google.cloud.spanner_v1._helpers import _merge_query_options -from google.cloud.spanner_v1._helpers import _metadata_with_prefix +from google.cloud.spanner_v1._helpers import ( + _metadata_with_prefix, + _metadata_with_leader_aware_routing, +) from google.cloud.spanner_v1._helpers import _SessionWrapper from google.cloud.spanner_v1._opentelemetry_tracing import trace_call from google.cloud.spanner_v1.streamed import StreamedResultSet @@ -235,6 +238,10 @@ def read( database = self._session._database api = database.spanner_api metadata = _metadata_with_prefix(database.name) + if not self._read_only and database._route_to_leader_enabled: + metadata.append( + _metadata_with_leader_aware_routing(database._route_to_leader_enabled) + ) if request_options is None: request_options = RequestOptions() @@ -244,7 +251,7 @@ def read( if self._read_only: # Transaction tags are not supported for read only transactions. request_options.transaction_tag = None - else: + elif self.transaction_tag is not None: request_options.transaction_tag = self.transaction_tag request = ReadRequest( @@ -391,6 +398,10 @@ def execute_sql( database = self._session._database metadata = _metadata_with_prefix(database.name) + if not self._read_only and database._route_to_leader_enabled: + metadata.append( + _metadata_with_leader_aware_routing(database._route_to_leader_enabled) + ) api = database.spanner_api @@ -406,7 +417,7 @@ def execute_sql( if self._read_only: # Transaction tags are not supported for read only transactions. request_options.transaction_tag = None - else: + elif self.transaction_tag is not None: request_options.transaction_tag = self.transaction_tag request = ExecuteSqlRequest( @@ -527,6 +538,10 @@ def partition_read( database = self._session._database api = database.spanner_api metadata = _metadata_with_prefix(database.name) + if database._route_to_leader_enabled: + metadata.append( + _metadata_with_leader_aware_routing(database._route_to_leader_enabled) + ) transaction = self._make_txn_selector() partition_options = PartitionOptions( partition_size_bytes=partition_size_bytes, max_partitions=max_partitions @@ -621,6 +636,10 @@ def partition_query( database = self._session._database api = database.spanner_api metadata = _metadata_with_prefix(database.name) + if database._route_to_leader_enabled: + metadata.append( + _metadata_with_leader_aware_routing(database._route_to_leader_enabled) + ) transaction = self._make_txn_selector() partition_options = PartitionOptions( partition_size_bytes=partition_size_bytes, max_partitions=max_partitions @@ -766,6 +785,10 @@ def begin(self): database = self._session._database api = database.spanner_api metadata = _metadata_with_prefix(database.name) + if not self._read_only and database._route_to_leader_enabled: + metadata.append( + (_metadata_with_leader_aware_routing(database._route_to_leader_enabled)) + ) txn_selector = self._make_txn_selector() with trace_call("CloudSpanner.BeginTransaction", self._session): response = api.begin_transaction( diff --git a/google/cloud/spanner_v1/transaction.py b/google/cloud/spanner_v1/transaction.py index ce34054ab9..31ce4b24f8 100644 --- a/google/cloud/spanner_v1/transaction.py +++ b/google/cloud/spanner_v1/transaction.py @@ -21,6 +21,7 @@ _make_value_pb, _merge_query_options, _metadata_with_prefix, + _metadata_with_leader_aware_routing, ) from google.cloud.spanner_v1 import CommitRequest from google.cloud.spanner_v1 import ExecuteBatchDmlRequest @@ -50,6 +51,7 @@ class Transaction(_SnapshotBase, _BatchBase): _multi_use = True _execute_sql_count = 0 _lock = threading.Lock() + _read_only = False def __init__(self, session): if session._transaction is not None: @@ -124,6 +126,10 @@ def begin(self): database = self._session._database api = database.spanner_api metadata = _metadata_with_prefix(database.name) + if database._route_to_leader_enabled: + metadata.append( + _metadata_with_leader_aware_routing(database._route_to_leader_enabled) + ) txn_options = TransactionOptions(read_write=TransactionOptions.ReadWrite()) with trace_call("CloudSpanner.BeginTransaction", self._session): response = api.begin_transaction( @@ -140,6 +146,12 @@ def rollback(self): database = self._session._database api = database.spanner_api metadata = _metadata_with_prefix(database.name) + if database._route_to_leader_enabled: + metadata.append( + _metadata_with_leader_aware_routing( + database._route_to_leader_enabled + ) + ) with trace_call("CloudSpanner.Rollback", self._session): api.rollback( session=self._session.name, @@ -176,6 +188,10 @@ def commit(self, return_commit_stats=False, request_options=None): database = self._session._database api = database.spanner_api metadata = _metadata_with_prefix(database.name) + if database._route_to_leader_enabled: + metadata.append( + _metadata_with_leader_aware_routing(database._route_to_leader_enabled) + ) trace_attributes = {"num_mutations": len(self._mutations)} if request_options is None: @@ -294,6 +310,10 @@ def execute_update( params_pb = self._make_params_pb(params, param_types) database = self._session._database metadata = _metadata_with_prefix(database.name) + if database._route_to_leader_enabled: + metadata.append( + _metadata_with_leader_aware_routing(database._route_to_leader_enabled) + ) api = database.spanner_api seqno, self._execute_sql_count = ( @@ -406,6 +426,10 @@ def batch_update(self, statements, request_options=None): database = self._session._database metadata = _metadata_with_prefix(database.name) + if database._route_to_leader_enabled: + metadata.append( + _metadata_with_leader_aware_routing(database._route_to_leader_enabled) + ) api = database.spanner_api seqno, self._execute_sql_count = ( diff --git a/tests/unit/spanner_dbapi/test_connect.py b/tests/unit/spanner_dbapi/test_connect.py index 948659d595..a5b520bcbf 100644 --- a/tests/unit/spanner_dbapi/test_connect.py +++ b/tests/unit/spanner_dbapi/test_connect.py @@ -56,6 +56,7 @@ def test_w_implicit(self, mock_client): instance.database.assert_called_once_with(DATABASE, pool=None) # Datbase constructs its own pool self.assertIsNotNone(connection.database._pool) + self.assertTrue(connection.instance._client.route_to_leader_enabled) def test_w_explicit(self, mock_client): from google.cloud.spanner_v1.pool import AbstractSessionPool @@ -76,12 +77,16 @@ def test_w_explicit(self, mock_client): credentials, pool=pool, user_agent=USER_AGENT, + route_to_leader_enabled=False, ) self.assertIsInstance(connection, Connection) mock_client.assert_called_once_with( - project=PROJECT, credentials=credentials, client_info=mock.ANY + project=PROJECT, + credentials=credentials, + client_info=mock.ANY, + route_to_leader_enabled=False, ) client_info = mock_client.call_args_list[0][1]["client_info"] self.assertEqual(client_info.user_agent, USER_AGENT) @@ -115,6 +120,7 @@ def test_w_credential_file_path(self, mock_client): credentials_path, project=PROJECT, client_info=mock.ANY, + route_to_leader_enabled=False, ) client_info = factory.call_args_list[0][1]["client_info"] self.assertEqual(client_info.user_agent, USER_AGENT) diff --git a/tests/unit/spanner_dbapi/test_connection.py b/tests/unit/spanner_dbapi/test_connection.py index 7a0ac9e687..6867c20d36 100644 --- a/tests/unit/spanner_dbapi/test_connection.py +++ b/tests/unit/spanner_dbapi/test_connection.py @@ -44,9 +44,10 @@ def _get_client_info(self): def _make_connection(self, **kwargs): from google.cloud.spanner_dbapi import Connection from google.cloud.spanner_v1.instance import Instance + from google.cloud.spanner_v1.client import Client # We don't need a real Client object to test the constructor - instance = Instance(INSTANCE, client=None) + instance = Instance(INSTANCE, client=Client) database = instance.database(DATABASE) return Connection(instance, database, **kwargs) diff --git a/tests/unit/test__helpers.py b/tests/unit/test__helpers.py index 21434da191..e90d2dec82 100644 --- a/tests/unit/test__helpers.py +++ b/tests/unit/test__helpers.py @@ -669,3 +669,17 @@ def test(self): prefix = "prefix" metadata = self._call_fut(prefix) self.assertEqual(metadata, [("google-cloud-resource-prefix", prefix)]) + + +class Test_metadata_with_leader_aware_routing(unittest.TestCase): + def _call_fut(self, *args, **kw): + from google.cloud.spanner_v1._helpers import _metadata_with_leader_aware_routing + + return _metadata_with_leader_aware_routing(*args, **kw) + + def test(self): + value = True + metadata = self._call_fut(True) + self.assertEqual( + metadata, ("x-goog-spanner-route-to-leader", str(value).lower()) + ) diff --git a/tests/unit/test_batch.py b/tests/unit/test_batch.py index 2d685acfbf..a7f4451379 100644 --- a/tests/unit/test_batch.py +++ b/tests/unit/test_batch.py @@ -239,7 +239,13 @@ def test_commit_ok(self): self.assertEqual(mutations, batch._mutations) self.assertIsInstance(single_use_txn, TransactionOptions) self.assertTrue(type(single_use_txn).pb(single_use_txn).HasField("read_write")) - self.assertEqual(metadata, [("google-cloud-resource-prefix", database.name)]) + self.assertEqual( + metadata, + [ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ], + ) self.assertEqual(request_options, RequestOptions()) self.assertSpanAttributes( @@ -285,7 +291,13 @@ def _test_commit_with_request_options(self, request_options=None): self.assertEqual(mutations, batch._mutations) self.assertIsInstance(single_use_txn, TransactionOptions) self.assertTrue(type(single_use_txn).pb(single_use_txn).HasField("read_write")) - self.assertEqual(metadata, [("google-cloud-resource-prefix", database.name)]) + self.assertEqual( + metadata, + [ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ], + ) self.assertEqual(actual_request_options, expected_request_options) self.assertSpanAttributes( @@ -362,7 +374,13 @@ def test_context_mgr_success(self): self.assertEqual(mutations, batch._mutations) self.assertIsInstance(single_use_txn, TransactionOptions) self.assertTrue(type(single_use_txn).pb(single_use_txn).HasField("read_write")) - self.assertEqual(metadata, [("google-cloud-resource-prefix", database.name)]) + self.assertEqual( + metadata, + [ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ], + ) self.assertEqual(request_options, RequestOptions()) self.assertSpanAttributes( @@ -404,6 +422,7 @@ def __init__(self, database=None, name=TestBatch.SESSION_NAME): class _Database(object): name = "testing" + _route_to_leader_enabled = True class _FauxSpannerAPI: diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index 68d8ea6857..e1532ca470 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -59,6 +59,7 @@ def _constructor_test_helper( client_options=None, query_options=None, expected_query_options=None, + route_to_leader_enabled=None, ): import google.api_core.client_options from google.cloud.spanner_v1 import client as MUT @@ -78,6 +79,9 @@ def _constructor_test_helper( else: expected_client_options = client_options + if route_to_leader_enabled is not None: + kwargs["route_to_leader_enabled"] = route_to_leader_enabled + client = self._make_one( project=self.PROJECT, credentials=creds, @@ -106,6 +110,10 @@ def _constructor_test_helper( ) if expected_query_options is not None: self.assertEqual(client._query_options, expected_query_options) + if route_to_leader_enabled is not None: + self.assertEqual(client.route_to_leader_enabled, route_to_leader_enabled) + else: + self.assertFalse(client.route_to_leader_enabled) @mock.patch("google.cloud.spanner_v1.client._get_spanner_emulator_host") @mock.patch("warnings.warn") @@ -219,6 +227,15 @@ def test_constructor_custom_query_options_env_config(self, mock_ver, mock_stats) expected_query_options=expected_query_options, ) + def test_constructor_route_to_leader_disbled(self): + from google.cloud.spanner_v1 import client as MUT + + expected_scopes = (MUT.SPANNER_ADMIN_SCOPE,) + creds = _make_credentials() + self._constructor_test_helper( + expected_scopes, creds, route_to_leader_enabled=False + ) + @mock.patch("google.cloud.spanner_v1.client._get_spanner_emulator_host") def test_instance_admin_api(self, mock_em): from google.cloud.spanner_v1.client import SPANNER_ADMIN_SCOPE diff --git a/tests/unit/test_database.py b/tests/unit/test_database.py index 030cf5512b..d070628aac 100644 --- a/tests/unit/test_database.py +++ b/tests/unit/test_database.py @@ -114,6 +114,7 @@ def test_ctor_defaults(self): # BurstyPool does not create sessions during 'bind()'. self.assertTrue(database._pool._sessions.empty()) self.assertIsNone(database.database_role) + self.assertTrue(database._route_to_leader_enabled, True) def test_ctor_w_explicit_pool(self): instance = _Instance(self.INSTANCE_NAME) @@ -134,6 +135,16 @@ def test_ctor_w_database_role(self): self.assertIs(database._instance, instance) self.assertIs(database.database_role, self.DATABASE_ROLE) + def test_ctor_w_route_to_leader_disbled(self): + client = _Client(route_to_leader_enabled=False) + instance = _Instance(self.INSTANCE_NAME, client=client) + database = self._make_one( + self.DATABASE_ID, instance, database_role=self.DATABASE_ROLE + ) + self.assertEqual(database.database_id, self.DATABASE_ID) + self.assertIs(database._instance, instance) + self.assertFalse(database._route_to_leader_enabled) + def test_ctor_w_ddl_statements_non_string(self): with self.assertRaises(ValueError): @@ -449,8 +460,9 @@ def test___eq__(self): self.assertEqual(database1, database2) def test___eq__type_differ(self): + instance = _Instance(self.INSTANCE_NAME) pool = _Pool() - database1 = self._make_one(self.DATABASE_ID, None, pool=pool) + database1 = self._make_one(self.DATABASE_ID, instance, pool=pool) database2 = object() self.assertNotEqual(database1, database2) @@ -463,9 +475,12 @@ def test___ne__same_value(self): self.assertFalse(comparison_val) def test___ne__(self): + instance1, instance2 = _Instance(self.INSTANCE_NAME + "1"), _Instance( + self.INSTANCE_NAME + "2" + ) pool1, pool2 = _Pool(), _Pool() - database1 = self._make_one("database_id1", "instance1", pool=pool1) - database2 = self._make_one("database_id2", "instance2", pool=pool2) + database1 = self._make_one("database_id1", instance1, pool=pool1) + database2 = self._make_one("database_id2", instance2, pool=pool2) self.assertNotEqual(database1, database2) def test_create_grpc_error(self): @@ -996,7 +1011,10 @@ def _execute_partitioned_dml_helper( api.begin_transaction.assert_called_with( session=session.name, options=txn_options, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ], ) if retried: self.assertEqual(api.begin_transaction.call_count, 2) @@ -1034,7 +1052,10 @@ def _execute_partitioned_dml_helper( api.execute_streaming_sql.assert_any_call( request=expected_request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ], ) if retried: expected_retry_transaction = TransactionSelector( @@ -1051,7 +1072,10 @@ def _execute_partitioned_dml_helper( ) api.execute_streaming_sql.assert_called_with( request=expected_request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ], ) self.assertEqual(api.execute_streaming_sql.call_count, 2) else: @@ -1182,7 +1206,8 @@ def test_batch(self): def test_batch_snapshot(self): from google.cloud.spanner_v1.database import BatchSnapshot - database = self._make_one(self.DATABASE_ID, instance=object(), pool=_Pool()) + instance = _Instance(self.INSTANCE_NAME) + database = self._make_one(self.DATABASE_ID, instance=instance, pool=_Pool()) batch_txn = database.batch_snapshot() self.assertIsInstance(batch_txn, BatchSnapshot) @@ -1193,7 +1218,8 @@ def test_batch_snapshot(self): def test_batch_snapshot_w_read_timestamp(self): from google.cloud.spanner_v1.database import BatchSnapshot - database = self._make_one(self.DATABASE_ID, instance=object(), pool=_Pool()) + instance = _Instance(self.INSTANCE_NAME) + database = self._make_one(self.DATABASE_ID, instance=instance, pool=_Pool()) timestamp = self._make_timestamp() batch_txn = database.batch_snapshot(read_timestamp=timestamp) @@ -1205,7 +1231,8 @@ def test_batch_snapshot_w_read_timestamp(self): def test_batch_snapshot_w_exact_staleness(self): from google.cloud.spanner_v1.database import BatchSnapshot - database = self._make_one(self.DATABASE_ID, instance=object(), pool=_Pool()) + instance = _Instance(self.INSTANCE_NAME) + database = self._make_one(self.DATABASE_ID, instance=instance, pool=_Pool()) duration = self._make_duration() batch_txn = database.batch_snapshot(exact_staleness=duration) @@ -1662,7 +1689,10 @@ def test_context_mgr_success(self): ) api.commit.assert_called_once_with( request=request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ], ) def test_context_mgr_w_commit_stats_success(self): @@ -1706,7 +1736,10 @@ def test_context_mgr_w_commit_stats_success(self): ) api.commit.assert_called_once_with( request=request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ], ) database.logger.info.assert_called_once_with( @@ -1747,7 +1780,10 @@ def test_context_mgr_w_commit_stats_error(self): ) api.commit.assert_called_once_with( request=request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ], ) database.logger.info.assert_not_called() @@ -2622,7 +2658,7 @@ def _make_instance_api(): class _Client(object): - def __init__(self, project=TestDatabase.PROJECT_ID): + def __init__(self, project=TestDatabase.PROJECT_ID, route_to_leader_enabled=True): from google.cloud.spanner_v1 import ExecuteSqlRequest self.project = project @@ -2632,10 +2668,11 @@ def __init__(self, project=TestDatabase.PROJECT_ID): self._client_info = mock.Mock() self._client_options = mock.Mock() self._query_options = ExecuteSqlRequest.QueryOptions(optimizer_version="1") + self.route_to_leader_enabled = route_to_leader_enabled class _Instance(object): - def __init__(self, name, client=None, emulator_host=None): + def __init__(self, name, client=_Client(), emulator_host=None): self.name = name self.instance_id = name.rsplit("/", 1)[1] self._client = client @@ -2649,6 +2686,7 @@ def __init__(self, name): class _Database(object): log_commit_stats = False + _route_to_leader_enabled = True def __init__(self, name, instance=None): self.name = name diff --git a/tests/unit/test_instance.py b/tests/unit/test_instance.py index e0a0f663cf..f9d1fec6b8 100644 --- a/tests/unit/test_instance.py +++ b/tests/unit/test_instance.py @@ -1015,6 +1015,7 @@ def __init__(self, project, timeout_seconds=None): self.project = project self.project_name = "projects/" + self.project self.timeout_seconds = timeout_seconds + self.route_to_leader_enabled = True def copy(self): from copy import deepcopy diff --git a/tests/unit/test_pool.py b/tests/unit/test_pool.py index 3a9d35bc92..58665634de 100644 --- a/tests/unit/test_pool.py +++ b/tests/unit/test_pool.py @@ -957,6 +957,7 @@ def __init__(self, name): self._sessions = [] self._database_role = None self.database_id = name + self._route_to_leader_enabled = True def mock_batch_create_sessions( request=None, diff --git a/tests/unit/test_session.py b/tests/unit/test_session.py index edad4ce777..3125e33f21 100644 --- a/tests/unit/test_session.py +++ b/tests/unit/test_session.py @@ -69,6 +69,7 @@ def _make_database(name=DATABASE_NAME, database_role=None): database.name = name database.log_commit_stats = False database.database_role = database_role + database._route_to_leader_enabled = True return database @staticmethod @@ -168,7 +169,10 @@ def test_create_w_database_role(self): gax_api.create_session.assert_called_once_with( request=request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ], ) self.assertSpanAttributes( @@ -194,7 +198,11 @@ def test_create_wo_database_role(self): ) gax_api.create_session.assert_called_once_with( - request=request, metadata=[("google-cloud-resource-prefix", database.name)] + request=request, + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ], ) self.assertSpanAttributes( @@ -220,7 +228,11 @@ def test_create_ok(self): ) gax_api.create_session.assert_called_once_with( - request=request, metadata=[("google-cloud-resource-prefix", database.name)] + request=request, + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ], ) self.assertSpanAttributes( @@ -250,7 +262,10 @@ def test_create_w_labels(self): gax_api.create_session.assert_called_once_with( request=request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ], ) self.assertSpanAttributes( @@ -296,7 +311,10 @@ def test_exists_hit(self): gax_api.get_session.assert_called_once_with( name=self.SESSION_NAME, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ], ) self.assertSpanAttributes( @@ -321,7 +339,10 @@ def test_exists_hit_wo_span(self): gax_api.get_session.assert_called_once_with( name=self.SESSION_NAME, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ], ) self.assertNoSpans() @@ -340,7 +361,10 @@ def test_exists_miss(self): gax_api.get_session.assert_called_once_with( name=self.SESSION_NAME, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ], ) self.assertSpanAttributes( @@ -366,7 +390,10 @@ def test_exists_miss_wo_span(self): gax_api.get_session.assert_called_once_with( name=self.SESSION_NAME, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ], ) self.assertNoSpans() @@ -386,7 +413,10 @@ def test_exists_error(self): gax_api.get_session.assert_called_once_with( name=self.SESSION_NAME, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ], ) self.assertSpanAttributes( @@ -900,7 +930,10 @@ def unit_of_work(txn, *args, **kw): gax_api.begin_transaction.assert_called_once_with( session=self.SESSION_NAME, options=expected_options, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ], ) request = CommitRequest( session=self.SESSION_NAME, @@ -910,7 +943,10 @@ def unit_of_work(txn, *args, **kw): ) gax_api.commit.assert_called_once_with( request=request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ], ) def test_run_in_transaction_w_commit_error(self): @@ -962,7 +998,10 @@ def unit_of_work(txn, *args, **kw): ) gax_api.commit.assert_called_once_with( request=request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ], ) def test_run_in_transaction_w_abort_no_retry_metadata(self): @@ -1021,7 +1060,10 @@ def unit_of_work(txn, *args, **kw): mock.call( session=self.SESSION_NAME, options=expected_options, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ], ) ] * 2, @@ -1037,7 +1079,10 @@ def unit_of_work(txn, *args, **kw): [ mock.call( request=request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ], ) ] * 2, @@ -1114,7 +1159,10 @@ def unit_of_work(txn, *args, **kw): mock.call( session=self.SESSION_NAME, options=expected_options, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ], ) ] * 2, @@ -1130,7 +1178,10 @@ def unit_of_work(txn, *args, **kw): [ mock.call( request=request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ], ) ] * 2, @@ -1206,7 +1257,10 @@ def unit_of_work(txn, *args, **kw): gax_api.begin_transaction.assert_called_once_with( session=self.SESSION_NAME, options=expected_options, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ], ) request = CommitRequest( session=self.SESSION_NAME, @@ -1216,7 +1270,10 @@ def unit_of_work(txn, *args, **kw): ) gax_api.commit.assert_called_once_with( request=request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ], ) def test_run_in_transaction_w_abort_w_retry_metadata_deadline(self): @@ -1298,7 +1355,10 @@ def _time(_results=[1, 1.5]): gax_api.begin_transaction.assert_called_once_with( session=self.SESSION_NAME, options=expected_options, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ], ) request = CommitRequest( session=self.SESSION_NAME, @@ -1308,7 +1368,10 @@ def _time(_results=[1, 1.5]): ) gax_api.commit.assert_called_once_with( request=request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ], ) def test_run_in_transaction_w_timeout(self): @@ -1378,7 +1441,10 @@ def _time(_results=[1, 2, 4, 8]): mock.call( session=self.SESSION_NAME, options=expected_options, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ], ) ] * 3, @@ -1394,7 +1460,10 @@ def _time(_results=[1, 2, 4, 8]): [ mock.call( request=request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ], ) ] * 3, @@ -1454,7 +1523,10 @@ def unit_of_work(txn, *args, **kw): gax_api.begin_transaction.assert_called_once_with( session=self.SESSION_NAME, options=expected_options, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ], ) request = CommitRequest( session=self.SESSION_NAME, @@ -1465,7 +1537,10 @@ def unit_of_work(txn, *args, **kw): ) gax_api.commit.assert_called_once_with( request=request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ], ) database.logger.info.assert_called_once_with( "CommitStats: mutation_count: 4\n", extra={"commit_stats": commit_stats} @@ -1518,7 +1593,10 @@ def unit_of_work(txn, *args, **kw): gax_api.begin_transaction.assert_called_once_with( session=self.SESSION_NAME, options=expected_options, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ], ) request = CommitRequest( session=self.SESSION_NAME, @@ -1529,7 +1607,10 @@ def unit_of_work(txn, *args, **kw): ) gax_api.commit.assert_called_once_with( request=request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ], ) database.logger.info.assert_not_called() @@ -1589,7 +1670,10 @@ def unit_of_work(txn, *args, **kw): gax_api.begin_transaction.assert_called_once_with( session=self.SESSION_NAME, options=expected_options, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ], ) request = CommitRequest( session=self.SESSION_NAME, @@ -1599,7 +1683,10 @@ def unit_of_work(txn, *args, **kw): ) gax_api.commit.assert_called_once_with( request=request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ], ) def test_delay_helper_w_no_delay(self): diff --git a/tests/unit/test_snapshot.py b/tests/unit/test_snapshot.py index c3ea162f11..2731e4f258 100644 --- a/tests/unit/test_snapshot.py +++ b/tests/unit/test_snapshot.py @@ -1108,7 +1108,10 @@ def _partition_read_helper( ) api.partition_read.assert_called_once_with( request=expected_request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ], retry=retry, timeout=timeout, ) @@ -1245,7 +1248,10 @@ def _partition_query_helper( ) api.partition_query.assert_called_once_with( request=expected_request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ], retry=retry, timeout=timeout, ) @@ -1691,6 +1697,7 @@ class _Database(object): def __init__(self): self.name = "testing" self._instance = _Instance() + self._route_to_leader_enabled = True class _Session(object): diff --git a/tests/unit/test_spanner.py b/tests/unit/test_spanner.py index a7c41c5f4f..e4cd1e84cd 100644 --- a/tests/unit/test_spanner.py +++ b/tests/unit/test_spanner.py @@ -261,7 +261,8 @@ def _execute_sql_expected_request( ) expected_request_options = REQUEST_OPTIONS - expected_request_options.transaction_tag = None + expected_request_options.transaction_tag = self.TRANSACTION_TAG + expected_request = ExecuteSqlRequest( session=self.SESSION_NAME, sql=SQL_QUERY_WITH_PARAM, @@ -358,7 +359,7 @@ def _read_helper_expected_request(self, partition=None, begin=True, count=0): # Transaction tag is ignored for read request. expected_request_options = REQUEST_OPTIONS - expected_request_options.transaction_tag = None + expected_request_options.transaction_tag = self.TRANSACTION_TAG expected_request = ReadRequest( session=self.SESSION_NAME, @@ -465,7 +466,10 @@ def test_transaction_should_include_begin_with_first_update(self): request=self._execute_update_expected_request(database=database), retry=RETRY, timeout=TIMEOUT, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ], ) def test_transaction_should_include_begin_with_first_query(self): @@ -477,7 +481,10 @@ def test_transaction_should_include_begin_with_first_query(self): api.execute_streaming_sql.assert_called_once_with( request=self._execute_sql_expected_request(database=database), - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ], timeout=TIMEOUT, retry=RETRY, ) @@ -491,7 +498,10 @@ def test_transaction_should_include_begin_with_first_read(self): api.streaming_read.assert_called_once_with( request=self._read_helper_expected_request(), - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ], retry=RETRY, timeout=TIMEOUT, ) @@ -504,7 +514,10 @@ def test_transaction_should_include_begin_with_first_batch_update(self): self._batch_update_helper(transaction=transaction, database=database, api=api) api.execute_batch_dml.assert_called_once_with( request=self._batch_update_expected_request(), - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ], ) def test_transaction_should_use_transaction_id_if_error_with_first_batch_update( @@ -519,7 +532,10 @@ def test_transaction_should_use_transaction_id_if_error_with_first_batch_update( ) api.execute_batch_dml.assert_called_once_with( request=self._batch_update_expected_request(begin=True), - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ], ) self._execute_update_helper(transaction=transaction, api=api) api.execute_sql.assert_called_once_with( @@ -528,7 +544,10 @@ def test_transaction_should_use_transaction_id_if_error_with_first_batch_update( ), retry=gapic_v1.method.DEFAULT, timeout=gapic_v1.method.DEFAULT, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ], ) def test_transaction_should_use_transaction_id_returned_by_first_query(self): @@ -541,7 +560,10 @@ def test_transaction_should_use_transaction_id_returned_by_first_query(self): request=self._execute_sql_expected_request(database=database), retry=gapic_v1.method.DEFAULT, timeout=gapic_v1.method.DEFAULT, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ], ) self._execute_update_helper(transaction=transaction, api=api) @@ -551,7 +573,10 @@ def test_transaction_should_use_transaction_id_returned_by_first_query(self): ), retry=gapic_v1.method.DEFAULT, timeout=gapic_v1.method.DEFAULT, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ], ) def test_transaction_should_use_transaction_id_returned_by_first_update(self): @@ -564,7 +589,10 @@ def test_transaction_should_use_transaction_id_returned_by_first_update(self): request=self._execute_update_expected_request(database=database), retry=gapic_v1.method.DEFAULT, timeout=gapic_v1.method.DEFAULT, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ], ) self._execute_sql_helper(transaction=transaction, api=api) @@ -572,7 +600,10 @@ def test_transaction_should_use_transaction_id_returned_by_first_update(self): request=self._execute_sql_expected_request(database=database, begin=False), retry=gapic_v1.method.DEFAULT, timeout=gapic_v1.method.DEFAULT, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ], ) def test_transaction_should_use_transaction_id_returned_by_first_read(self): @@ -583,7 +614,10 @@ def test_transaction_should_use_transaction_id_returned_by_first_read(self): self._read_helper(transaction=transaction, api=api) api.streaming_read.assert_called_once_with( request=self._read_helper_expected_request(), - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ], retry=RETRY, timeout=TIMEOUT, ) @@ -591,7 +625,10 @@ def test_transaction_should_use_transaction_id_returned_by_first_read(self): self._batch_update_helper(transaction=transaction, database=database, api=api) api.execute_batch_dml.assert_called_once_with( request=self._batch_update_expected_request(begin=False), - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ], ) def test_transaction_should_use_transaction_id_returned_by_first_batch_update(self): @@ -602,12 +639,18 @@ def test_transaction_should_use_transaction_id_returned_by_first_batch_update(se self._batch_update_helper(transaction=transaction, database=database, api=api) api.execute_batch_dml.assert_called_once_with( request=self._batch_update_expected_request(), - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ], ) self._read_helper(transaction=transaction, api=api) api.streaming_read.assert_called_once_with( request=self._read_helper_expected_request(begin=False), - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ], retry=RETRY, timeout=TIMEOUT, ) @@ -644,19 +687,28 @@ def test_transaction_for_concurrent_statement_should_begin_one_transaction_with_ request=self._execute_update_expected_request(database), retry=RETRY, timeout=TIMEOUT, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ], ) api.execute_sql.assert_any_call( request=self._execute_update_expected_request(database, begin=False), retry=RETRY, timeout=TIMEOUT, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ], ) api.execute_batch_dml.assert_any_call( request=self._batch_update_expected_request(begin=False), - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ], ) self.assertEqual(api.execute_sql.call_count, 2) @@ -694,17 +746,26 @@ def test_transaction_for_concurrent_statement_should_begin_one_transaction_with_ request=self._execute_update_expected_request(database, begin=False), retry=RETRY, timeout=TIMEOUT, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ], ) api.execute_batch_dml.assert_any_call( request=self._batch_update_expected_request(), - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ], ) api.execute_batch_dml.assert_any_call( request=self._batch_update_expected_request(begin=False), - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ], ) self.assertEqual(api.execute_sql.call_count, 1) @@ -747,19 +808,28 @@ def test_transaction_for_concurrent_statement_should_begin_one_transaction_with_ request=self._execute_update_expected_request(database, begin=False), retry=RETRY, timeout=TIMEOUT, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ], ) api.streaming_read.assert_any_call( request=self._read_helper_expected_request(), - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ], retry=RETRY, timeout=TIMEOUT, ) api.streaming_read.assert_any_call( request=self._read_helper_expected_request(begin=False), - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ], retry=RETRY, timeout=TIMEOUT, ) @@ -804,20 +874,28 @@ def test_transaction_for_concurrent_statement_should_begin_one_transaction_with_ request=self._execute_update_expected_request(database, begin=False), retry=RETRY, timeout=TIMEOUT, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ], ) - req = self._execute_sql_expected_request(database) api.execute_streaming_sql.assert_any_call( request=req, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ], retry=RETRY, timeout=TIMEOUT, ) api.execute_streaming_sql.assert_any_call( request=self._execute_sql_expected_request(database, begin=False), - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ], retry=RETRY, timeout=TIMEOUT, ) @@ -825,6 +903,21 @@ def test_transaction_for_concurrent_statement_should_begin_one_transaction_with_ self.assertEqual(api.execute_sql.call_count, 1) self.assertEqual(api.execute_streaming_sql.call_count, 2) + def test_transaction_should_execute_sql_with_route_to_leader_disabled(self): + database = _Database() + database._route_to_leader_enabled = False + session = _Session(database) + api = database.spanner_api = self._make_spanner_api() + transaction = self._make_one(session) + self._execute_sql_helper(transaction=transaction, api=api) + + api.execute_streaming_sql.assert_called_once_with( + request=self._execute_sql_expected_request(database=database), + metadata=[("google-cloud-resource-prefix", database.name)], + timeout=TIMEOUT, + retry=RETRY, + ) + class _Client(object): def __init__(self): @@ -842,6 +935,7 @@ class _Database(object): def __init__(self): self.name = "testing" self._instance = _Instance() + self._route_to_leader_enabled = True class _Session(object): diff --git a/tests/unit/test_transaction.py b/tests/unit/test_transaction.py index 5fb69b4979..ccf52f6a9f 100644 --- a/tests/unit/test_transaction.py +++ b/tests/unit/test_transaction.py @@ -182,7 +182,13 @@ def test_begin_ok(self): session_id, txn_options, metadata = api._begun self.assertEqual(session_id, session.name) self.assertTrue(type(txn_options).pb(txn_options).HasField("read_write")) - self.assertEqual(metadata, [("google-cloud-resource-prefix", database.name)]) + self.assertEqual( + metadata, + [ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ], + ) self.assertSpanAttributes( "CloudSpanner.BeginTransaction", attributes=TestTransaction.BASE_ATTRIBUTES @@ -261,7 +267,13 @@ def test_rollback_ok(self): session_id, txn_id, metadata = api._rolled_back self.assertEqual(session_id, session.name) self.assertEqual(txn_id, self.TRANSACTION_ID) - self.assertEqual(metadata, [("google-cloud-resource-prefix", database.name)]) + self.assertEqual( + metadata, + [ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ], + ) self.assertSpanAttributes( "CloudSpanner.Rollback", attributes=TestTransaction.BASE_ATTRIBUTES @@ -364,7 +376,13 @@ def _commit_helper( self.assertEqual(session_id, session.name) self.assertEqual(txn_id, self.TRANSACTION_ID) self.assertEqual(mutations, transaction._mutations) - self.assertEqual(metadata, [("google-cloud-resource-prefix", database.name)]) + self.assertEqual( + metadata, + [ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ], + ) self.assertEqual(actual_request_options, expected_request_options) if return_commit_stats: @@ -541,7 +559,10 @@ def _execute_update_helper( request=expected_request, retry=retry, timeout=timeout, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ], ) self.assertEqual(transaction._execute_sql_count, count + 1) @@ -714,7 +735,10 @@ def _batch_update_helper(self, error_after=None, count=0, request_options=None): ) api.execute_batch_dml.assert_called_once_with( request=expected_request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ], ) self.assertEqual(transaction._execute_sql_count, count + 1) @@ -813,7 +837,13 @@ def test_context_mgr_success(self): self.assertEqual(session_id, self.SESSION_NAME) self.assertEqual(txn_id, self.TRANSACTION_ID) self.assertEqual(mutations, transaction._mutations) - self.assertEqual(metadata, [("google-cloud-resource-prefix", database.name)]) + self.assertEqual( + metadata, + [ + ("google-cloud-resource-prefix", database.name), + ("x-goog-spanner-route-to-leader", "true"), + ], + ) def test_context_mgr_failure(self): from google.protobuf.empty_pb2 import Empty @@ -857,6 +887,7 @@ class _Database(object): def __init__(self): self.name = "testing" self._instance = _Instance() + self._route_to_leader_enabled = True class _Session(object): From 998b87ac1d64867ac069750cba7563b9a36fb07f Mon Sep 17 00:00:00 2001 From: "release-please[bot]" <55107282+release-please[bot]@users.noreply.github.com> Date: Wed, 3 May 2023 14:54:37 +0530 Subject: [PATCH 2/2] chore(main): release 3.33.0 (#935) Co-authored-by: release-please[bot] <55107282+release-please[bot]@users.noreply.github.com> --- .release-please-manifest.json | 2 +- CHANGELOG.md | 7 +++++++ google/cloud/spanner_admin_database_v1/gapic_version.py | 2 +- google/cloud/spanner_admin_instance_v1/gapic_version.py | 2 +- google/cloud/spanner_v1/gapic_version.py | 2 +- .../snippet_metadata_google.spanner.admin.database.v1.json | 2 +- .../snippet_metadata_google.spanner.admin.instance.v1.json | 2 +- .../snippet_metadata_google.spanner.v1.json | 2 +- 8 files changed, 14 insertions(+), 7 deletions(-) diff --git a/.release-please-manifest.json b/.release-please-manifest.json index fdaa154ba6..c773879722 100644 --- a/.release-please-manifest.json +++ b/.release-please-manifest.json @@ -1,3 +1,3 @@ { - ".": "3.32.0" + ".": "3.33.0" } diff --git a/CHANGELOG.md b/CHANGELOG.md index d3ac8844a7..6d001715b1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,13 @@ [1]: https://pypi.org/project/google-cloud-spanner/#history +## [3.33.0](https://github.com/googleapis/python-spanner/compare/v3.32.0...v3.33.0) (2023-04-27) + + +### Features + +* Leader Aware Routing ([#899](https://github.com/googleapis/python-spanner/issues/899)) ([f9fefad](https://github.com/googleapis/python-spanner/commit/f9fefad6ee2e16804d109d8bfbb613062f57ea65)) + ## [3.32.0](https://github.com/googleapis/python-spanner/compare/v3.31.0...v3.32.0) (2023-04-25) diff --git a/google/cloud/spanner_admin_database_v1/gapic_version.py b/google/cloud/spanner_admin_database_v1/gapic_version.py index c25973c215..d28f9e263b 100644 --- a/google/cloud/spanner_admin_database_v1/gapic_version.py +++ b/google/cloud/spanner_admin_database_v1/gapic_version.py @@ -13,4 +13,4 @@ # See the License for the specific language governing permissions and # limitations under the License. # -__version__ = "3.32.0" # {x-release-please-version} +__version__ = "3.33.0" # {x-release-please-version} diff --git a/google/cloud/spanner_admin_instance_v1/gapic_version.py b/google/cloud/spanner_admin_instance_v1/gapic_version.py index c25973c215..d28f9e263b 100644 --- a/google/cloud/spanner_admin_instance_v1/gapic_version.py +++ b/google/cloud/spanner_admin_instance_v1/gapic_version.py @@ -13,4 +13,4 @@ # See the License for the specific language governing permissions and # limitations under the License. # -__version__ = "3.32.0" # {x-release-please-version} +__version__ = "3.33.0" # {x-release-please-version} diff --git a/google/cloud/spanner_v1/gapic_version.py b/google/cloud/spanner_v1/gapic_version.py index c25973c215..d28f9e263b 100644 --- a/google/cloud/spanner_v1/gapic_version.py +++ b/google/cloud/spanner_v1/gapic_version.py @@ -13,4 +13,4 @@ # See the License for the specific language governing permissions and # limitations under the License. # -__version__ = "3.32.0" # {x-release-please-version} +__version__ = "3.33.0" # {x-release-please-version} diff --git a/samples/generated_samples/snippet_metadata_google.spanner.admin.database.v1.json b/samples/generated_samples/snippet_metadata_google.spanner.admin.database.v1.json index 84392b855c..5562aea1b3 100644 --- a/samples/generated_samples/snippet_metadata_google.spanner.admin.database.v1.json +++ b/samples/generated_samples/snippet_metadata_google.spanner.admin.database.v1.json @@ -8,7 +8,7 @@ ], "language": "PYTHON", "name": "google-cloud-spanner-admin-database", - "version": "3.32.0" + "version": "3.33.0" }, "snippets": [ { diff --git a/samples/generated_samples/snippet_metadata_google.spanner.admin.instance.v1.json b/samples/generated_samples/snippet_metadata_google.spanner.admin.instance.v1.json index a55d81e55b..ef55b568ac 100644 --- a/samples/generated_samples/snippet_metadata_google.spanner.admin.instance.v1.json +++ b/samples/generated_samples/snippet_metadata_google.spanner.admin.instance.v1.json @@ -8,7 +8,7 @@ ], "language": "PYTHON", "name": "google-cloud-spanner-admin-instance", - "version": "3.32.0" + "version": "3.33.0" }, "snippets": [ { diff --git a/samples/generated_samples/snippet_metadata_google.spanner.v1.json b/samples/generated_samples/snippet_metadata_google.spanner.v1.json index 37e501b123..afcec7443d 100644 --- a/samples/generated_samples/snippet_metadata_google.spanner.v1.json +++ b/samples/generated_samples/snippet_metadata_google.spanner.v1.json @@ -8,7 +8,7 @@ ], "language": "PYTHON", "name": "google-cloud-spanner", - "version": "3.32.0" + "version": "3.33.0" }, "snippets": [ {