From e27dbe5f58229dab208eeeed44d53e741700c814 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Knut=20Olav=20L=C3=B8ite?= Date: Wed, 12 Apr 2023 09:06:05 +0200 Subject: [PATCH] perf: remove custom transport executor (#2366) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * perf: remove custom transport executor Remove the custom executor provider that is set on the underlying generated clients and instead use the internal core gRPC executor provider. The latter is a shared executor provider for all gRPC channels that creates threads on demand. This prevents the creation of unnecessary threads at startup, and can reduce overall thread usage for applications that create multiple Spanner instances during their lifetime. * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --------- Co-authored-by: Owl Bot --- .../cloud/spanner/spi/v1/GapicSpannerRpc.java | 68 ------- .../cloud/spanner/BaseSessionPoolTest.java | 5 +- .../cloud/spanner/SpannerOptionsTest.java | 26 ++- .../spanner/SpannerOptionsThreadTest.java | 179 ------------------ .../cloud/spanner/SpannerThreadsTest.java | 5 +- 5 files changed, 30 insertions(+), 253 deletions(-) delete mode 100644 google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerOptionsThreadTest.java diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java index 7f325665542..eb8633d0449 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java @@ -23,7 +23,6 @@ import com.google.api.core.InternalApi; import com.google.api.core.NanoClock; import com.google.api.gax.core.CredentialsProvider; -import com.google.api.gax.core.ExecutorProvider; import com.google.api.gax.core.GaxProperties; import com.google.api.gax.grpc.GaxGrpcProperties; import com.google.api.gax.grpc.GrpcCallContext; @@ -185,7 +184,6 @@ import java.nio.charset.StandardCharsets; import java.util.Comparator; import java.util.HashMap; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -200,8 +198,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -211,53 +207,6 @@ /** Implementation of Cloud Spanner remote calls using Gapic libraries. */ @InternalApi public class GapicSpannerRpc implements SpannerRpc { - - /** - * {@link ExecutorProvider} that keeps track of the executors that are created and shuts these - * down when the {@link SpannerRpc} is closed. - */ - private static final class ManagedInstantiatingExecutorProvider implements ExecutorProvider { - - // 4 Gapic clients * 4 channels per client. - private static final int DEFAULT_MIN_THREAD_COUNT = 16; - private final List executors = new LinkedList<>(); - private final ThreadFactory threadFactory; - - private ManagedInstantiatingExecutorProvider(ThreadFactory threadFactory) { - this.threadFactory = threadFactory; - } - - @Override - public boolean shouldAutoClose() { - return false; - } - - @Override - public ScheduledExecutorService getExecutor() { - int numCpus = Runtime.getRuntime().availableProcessors(); - int numThreads = Math.max(DEFAULT_MIN_THREAD_COUNT, numCpus); - ScheduledExecutorService executor = - new ScheduledThreadPoolExecutor(numThreads, threadFactory); - synchronized (this) { - executors.add(executor); - } - return executor; - } - - /** Shuts down all executors that have been created by this {@link ExecutorProvider}. */ - private synchronized void shutdown() { - for (ScheduledExecutorService executor : executors) { - executor.shutdown(); - } - } - - private void awaitTermination() throws InterruptedException { - for (ScheduledExecutorService executor : executors) { - executor.awaitTermination(10L, TimeUnit.SECONDS); - } - } - } - private static final PathTemplate PROJECT_NAME_TEMPLATE = PathTemplate.create("projects/{project}"); private static final PathTemplate OPERATION_NAME_TEMPLATE = @@ -277,7 +226,6 @@ private void awaitTermination() throws InterruptedException { CLIENT_LIBRARY_LANGUAGE + "/" + GaxProperties.getLibraryVersion(GapicSpannerRpc.class); private static final String API_FILE = "grpc-gcp-apiconfig.json"; - private final ManagedInstantiatingExecutorProvider executorProvider; private boolean rpcIsClosed; private final SpannerStub spannerStub; private final SpannerStub partitionedDmlStub; @@ -356,13 +304,6 @@ public GapicSpannerRpc(final SpannerOptions options) { this.compressorName = options.getCompressorName(); if (initializeStubs) { - // Create a managed executor provider. - this.executorProvider = - new ManagedInstantiatingExecutorProvider( - new ThreadFactoryBuilder() - .setDaemon(true) - .setNameFormat(options.getTransportChannelExecutorThreadNameFormat()) - .build()); // First check if SpannerOptions provides a TransportChannelProvider. Create one // with information gathered from SpannerOptions if none is provided InstantiatingGrpcChannelProvider.Builder defaultChannelProviderBuilder = @@ -373,11 +314,6 @@ public GapicSpannerRpc(final SpannerOptions options) { .setMaxInboundMetadataSize(MAX_METADATA_SIZE) .setPoolSize(options.getNumChannels()) - // Before updating this method to setExecutor, please verify with a code owner on - // the lowest version of gax-grpc that needs to be supported. Currently v1.47.17, - // which doesn't support the setExecutor variant. - .setExecutorProvider(executorProvider) - // Set a keepalive time of 120 seconds to help long running // commit GRPC calls succeed .setKeepAliveTime(Duration.ofSeconds(GRPC_KEEPALIVE_SECONDS)) @@ -536,7 +472,6 @@ public UnaryCallable createUnaryCalla this.databaseAdminStubSettings = null; this.spannerWatchdog = null; this.partitionedDmlRetrySettings = null; - this.executorProvider = null; } } @@ -1932,7 +1867,6 @@ public void shutdown() { this.instanceAdminStub.close(); this.databaseAdminStub.close(); this.spannerWatchdog.shutdown(); - this.executorProvider.shutdown(); try { this.spannerStub.awaitTermination(10L, TimeUnit.SECONDS); @@ -1940,7 +1874,6 @@ public void shutdown() { this.instanceAdminStub.awaitTermination(10L, TimeUnit.SECONDS); this.databaseAdminStub.awaitTermination(10L, TimeUnit.SECONDS); this.spannerWatchdog.awaitTermination(10L, TimeUnit.SECONDS); - this.executorProvider.awaitTermination(); } catch (InterruptedException e) { throw SpannerExceptionFactory.propagateInterrupt(e); } @@ -1954,7 +1887,6 @@ public void shutdownNow() { this.instanceAdminStub.close(); this.databaseAdminStub.close(); this.spannerWatchdog.shutdown(); - this.executorProvider.shutdown(); this.spannerStub.shutdownNow(); this.partitionedDmlStub.shutdownNow(); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BaseSessionPoolTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BaseSessionPoolTest.java index 6aaf28a5fc4..3a595358fea 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BaseSessionPoolTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BaseSessionPoolTest.java @@ -54,7 +54,10 @@ public ScheduledExecutorService get() { @Override public void release(ScheduledExecutorService executor) { - executor.shutdown(); + try { + executor.shutdown(); + } catch (Throwable ignore) { + } } } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerOptionsTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerOptionsTest.java index 55a15809a48..8819dab462b 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerOptionsTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerOptionsTest.java @@ -923,14 +923,22 @@ public void testCustomAsyncExecutorProvider() { @Test public void testDefaultNumChannelsWithGrpcGcpExtensionEnabled() { SpannerOptions options = - SpannerOptions.newBuilder().setProjectId("test-project").enableGrpcGcpExtension().build(); + SpannerOptions.newBuilder() + .setProjectId("test-project") + .setCredentials(NoCredentials.getInstance()) + .enableGrpcGcpExtension() + .build(); assertEquals(SpannerOptions.GRPC_GCP_ENABLED_DEFAULT_CHANNELS, options.getNumChannels()); } @Test public void testDefaultNumChannelsWithGrpcGcpExtensionDisabled() { - SpannerOptions options = SpannerOptions.newBuilder().setProjectId("test-project").build(); + SpannerOptions options = + SpannerOptions.newBuilder() + .setProjectId("test-project") + .setCredentials(NoCredentials.getInstance()) + .build(); assertEquals(SpannerOptions.DEFAULT_CHANNELS, options.getNumChannels()); } @@ -943,6 +951,7 @@ public void testNumChannelsWithGrpcGcpExtensionEnabled() { SpannerOptions options1 = SpannerOptions.newBuilder() .setProjectId("test-project") + .setCredentials(NoCredentials.getInstance()) .setNumChannels(numChannels) .enableGrpcGcpExtension() .build(); @@ -954,6 +963,7 @@ public void testNumChannelsWithGrpcGcpExtensionEnabled() { SpannerOptions options2 = SpannerOptions.newBuilder() .setProjectId("test-project") + .setCredentials(NoCredentials.getInstance()) .enableGrpcGcpExtension() .setNumChannels(numChannels) .build(); @@ -972,12 +982,19 @@ public void checkCreatedInstanceWhenGrpcGcpExtensionDisabled() { Spanner spanner2 = options1.getService(); assertNotSame(spanner1, spanner2); + + spanner1.close(); + spanner2.close(); } @Test public void checkCreatedInstanceWhenGrpcGcpExtensionEnabled() { SpannerOptions options = - SpannerOptions.newBuilder().setProjectId("test-project").enableGrpcGcpExtension().build(); + SpannerOptions.newBuilder() + .setProjectId("test-project") + .setCredentials(NoCredentials.getInstance()) + .enableGrpcGcpExtension() + .build(); SpannerOptions options1 = options.toBuilder().build(); assertEquals(true, options.isGrpcGcpExtensionEnabled()); assertEquals(options.isGrpcGcpExtensionEnabled(), options1.isGrpcGcpExtensionEnabled()); @@ -986,5 +1003,8 @@ public void checkCreatedInstanceWhenGrpcGcpExtensionEnabled() { Spanner spanner2 = options1.getService(); assertNotSame(spanner1, spanner2); + + spanner1.close(); + spanner2.close(); } } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerOptionsThreadTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerOptionsThreadTest.java deleted file mode 100644 index 91919b15234..00000000000 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerOptionsThreadTest.java +++ /dev/null @@ -1,179 +0,0 @@ -/* - * Copyright 2020 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.google.cloud.spanner; - -import static com.google.common.truth.Truth.assertThat; - -import com.google.cloud.NoCredentials; -import com.google.cloud.spanner.connection.AbstractMockServerTest; -import com.google.common.base.Stopwatch; -import com.google.spanner.admin.database.v1.ListDatabasesResponse; -import com.google.spanner.admin.instance.v1.ListInstancesResponse; -import io.grpc.ManagedChannelBuilder; -import java.util.ArrayList; -import java.util.List; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.regex.Pattern; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -@RunWith(JUnit4.class) -public class SpannerOptionsThreadTest extends AbstractMockServerTest { - private static final int NUMBER_OF_TEST_RUNS = 2; - private static final int DEFAULT_NUM_CHANNELS_PER_GAPIC_CLIENT = 4; - private static final int NUM_GAPIC_CLIENTS = 4; - private static final int NUM_THREADS = - Math.max( - DEFAULT_NUM_CHANNELS_PER_GAPIC_CLIENT * NUM_GAPIC_CLIENTS, - Runtime.getRuntime().availableProcessors()); - private static final String SPANNER_THREAD_NAME = "Cloud-Spanner-TransportChannel"; - private static final String THREAD_PATTERN = "%s-[0-9]+"; - - private final DatabaseId dbId = DatabaseId.of("p", "i", "d"); - - private SpannerOptions createOptions() { - return SpannerOptions.newBuilder() - .setProjectId("p") - // Set a custom channel configurator to allow http instead of https. - .setChannelConfigurator(ManagedChannelBuilder::usePlaintext) - .setHost("http://localhost:" + getPort()) - .setCredentials(NoCredentials.getInstance()) - .build(); - } - - @Test - public void testCloseAllThreadsWhenClosingSpanner() throws InterruptedException { - int baseThreadCount = getNumberOfThreadsWithName(SPANNER_THREAD_NAME); - for (int i = 0; i < NUMBER_OF_TEST_RUNS; i++) { - waitForStartup(); - assertThat(getNumberOfThreadsWithName(SPANNER_THREAD_NAME)).isAtMost(baseThreadCount); - // Create Spanner instance. - // We make a copy of the options instance, as SpannerOptions caches any service object - // that has been handed out. - SpannerOptions options = createOptions(); - Spanner spanner = options.getService(); - // Get a database client and do a query. This should initiate threads for the Spanner service. - DatabaseClient client = spanner.getDatabaseClient(dbId); - List resultSets = new ArrayList<>(); - // SpannerStub affiliates a channel with a session, so we need to use multiple sessions - // to ensure we also hit multiple channels. - for (int i2 = 0; i2 < options.getSessionPoolOptions().getMaxSessions(); i2++) { - ResultSet rs = client.singleUse().executeQuery(SELECT_COUNT_STATEMENT); - // Execute ResultSet#next() to send the query to Spanner. - rs.next(); - // Delay closing the result set in order to force the use of multiple sessions. - // As each session is linked to one transport channel, using multiple different - // sessions should initialize multiple transport channels. - resultSets.add(rs); - // Check whether the number of expected threads has been reached. - if (getNumberOfThreadsWithName(SPANNER_THREAD_NAME) == NUM_THREADS + baseThreadCount) { - break; - } - } - for (ResultSet rs : resultSets) { - rs.close(); - } - // Check the number of threads after the query. Doing a request should initialize a thread - // pool for the underlying SpannerClient. - assertThat(getNumberOfThreadsWithName(SPANNER_THREAD_NAME)) - .isEqualTo(NUM_THREADS + baseThreadCount); - - // Then do a request to the InstanceAdmin service and check the number of threads. - // Doing a request should initialize a thread pool for the underlying InstanceAdminClient. - for (int i2 = 0; i2 < DEFAULT_NUM_CHANNELS_PER_GAPIC_CLIENT * 2; i2++) { - InstanceAdminClient instanceAdminClient = spanner.getInstanceAdminClient(); - mockInstanceAdmin.addResponse(ListInstancesResponse.getDefaultInstance()); - instanceAdminClient.listInstances(); - } - assertThat(getNumberOfThreadsWithName(SPANNER_THREAD_NAME)) - .isEqualTo(NUM_THREADS + baseThreadCount); - - // Then do a request to the DatabaseAdmin service and check the number of threads. - // Doing a request should initialize a thread pool for the underlying DatabaseAdminClient. - for (int i2 = 0; i2 < DEFAULT_NUM_CHANNELS_PER_GAPIC_CLIENT * 2; i2++) { - DatabaseAdminClient databaseAdminClient = spanner.getDatabaseAdminClient(); - mockDatabaseAdmin.addResponse(ListDatabasesResponse.getDefaultInstance()); - databaseAdminClient.listDatabases(dbId.getInstanceId().getInstance()); - } - assertThat(getNumberOfThreadsWithName(SPANNER_THREAD_NAME)) - .isEqualTo(NUM_THREADS + baseThreadCount); - - // Now close the Spanner instance and check whether the threads are shutdown or not. - spanner.close(); - // Wait a little to allow the threads to actually shutdown. - Stopwatch watch = Stopwatch.createStarted(); - while (getNumberOfThreadsWithName(SPANNER_THREAD_NAME) > baseThreadCount - && watch.elapsed(TimeUnit.SECONDS) < 2) { - Thread.sleep(50L); - } - assertThat(getNumberOfThreadsWithName(SPANNER_THREAD_NAME)).isAtMost(baseThreadCount); - } - } - - @Test - public void testMultipleSpannersFromSameSpannerOptions() throws InterruptedException { - waitForStartup(); - int baseThreadCount = getNumberOfThreadsWithName(SPANNER_THREAD_NAME); - SpannerOptions options = createOptions(); - try (Spanner spanner1 = options.getService()) { - // Having both in the try-with-resources block is not possible, as it is the same instance. - // One will be closed before the other, and the closing of the second instance would fail. - Spanner spanner2 = options.getService(); - assertThat(spanner1).isSameInstanceAs(spanner2); - DatabaseClient client1 = spanner1.getDatabaseClient(dbId); - DatabaseClient client2 = spanner2.getDatabaseClient(dbId); - assertThat(client1).isSameInstanceAs(client2); - try (ResultSet rs1 = client1.singleUse().executeQuery(SELECT_COUNT_STATEMENT); - ResultSet rs2 = client2.singleUse().executeQuery(SELECT_COUNT_STATEMENT)) { - while (rs1.next() && rs2.next()) { - // Do nothing, just consume the result sets. - } - } - } - Stopwatch watch = Stopwatch.createStarted(); - while (getNumberOfThreadsWithName(SPANNER_THREAD_NAME) > baseThreadCount - && watch.elapsed(TimeUnit.SECONDS) < 2) { - Thread.sleep(50L); - } - assertThat(getNumberOfThreadsWithName(SPANNER_THREAD_NAME)).isAtMost(baseThreadCount); - } - - private void waitForStartup() throws InterruptedException { - // Wait until the IT environment has already started all base worker threads. - int threadCount; - Stopwatch watch = Stopwatch.createStarted(); - do { - threadCount = getNumberOfThreadsWithName(SPANNER_THREAD_NAME); - Thread.sleep(100L); - } while (getNumberOfThreadsWithName(SPANNER_THREAD_NAME) > threadCount - && watch.elapsed(TimeUnit.SECONDS) < 5); - } - - private int getNumberOfThreadsWithName(String serviceName) { - Pattern pattern = Pattern.compile(String.format(THREAD_PATTERN, serviceName)); - Set threadSet = Thread.getAllStackTraces().keySet(); - int res = 0; - for (Thread thread : threadSet) { - if (pattern.matcher(thread.getName()).matches()) { - res++; - } - } - return res; - } -} diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerThreadsTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerThreadsTest.java index 9789168776f..919eedf6071 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerThreadsTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerThreadsTest.java @@ -16,7 +16,6 @@ package com.google.cloud.spanner; -import static com.google.common.truth.Truth.assertThat; import static org.hamcrest.CoreMatchers.*; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; @@ -185,6 +184,7 @@ public void testCloseAllThreadsWhenClosingSpanner() throws InterruptedException while (getNumberOfThreadsWithName(threadName, false, initialNumberOfThreads) > initialNumberOfThreads && watch.elapsed(TimeUnit.SECONDS) < 2) { + //noinspection BusyWait Thread.sleep(10L); } assertThat( @@ -241,6 +241,7 @@ && getNumberOfThreadsWithName(threadName, false, initialNumberOfThreads) while (getNumberOfThreadsWithName(threadName, false, initialNumberOfThreads) > initialNumberOfThreads && watch.elapsed(TimeUnit.SECONDS) < 5) { + //noinspection BusyWait Thread.sleep(10L); } assertEquals( @@ -283,7 +284,7 @@ private int getNumberOfThreadsWithName(String serviceName, boolean dumpStack, in } } if (dumpStack && res > expected) { - found.stream().forEach(t -> dumpThread(t)); + found.forEach(this::dumpThread); } return res; }