Skip to content

Commit

Permalink
feat: add support for Directed Read options (#2766)
Browse files Browse the repository at this point in the history
* fix: prevent illegal negative timeout values into thread sleep() method while retrying exceptions in unit tests.

* For details on issue see - #2206

* Fixing lint issues.

* feat: add support for Directed Read options.

* chore: fix lint issues.

* test: add unit tests for options class.

* test: add tests using mock spanner.

* test: add unit test for partitioned read.

* test: add unit test for partitioned read.

* chore: adding option in spanner options.

* chore: fix NPE.

* chore: disabling test on emulator.

* chore: adding test for query in RW transaction.

* chore: adding IT for transaction manager interface.

* chore: disable IT for emulator.

* chore: PR comments.

* chore: address PR comments.
  • Loading branch information
arpan14 committed Jan 3, 2024
1 parent 74a586f commit 26c6c63
Show file tree
Hide file tree
Showing 12 changed files with 740 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import com.google.spanner.v1.BeginTransactionRequest;
import com.google.spanner.v1.DirectedReadOptions;
import com.google.spanner.v1.ExecuteBatchDmlRequest;
import com.google.spanner.v1.ExecuteSqlRequest;
import com.google.spanner.v1.ExecuteSqlRequest.QueryMode;
Expand Down Expand Up @@ -72,6 +73,7 @@ abstract static class Builder<B extends Builder<?, T>, T extends AbstractReadCon
private Span span = Tracing.getTracer().getCurrentSpan();
private int defaultPrefetchChunks = SpannerOptions.Builder.DEFAULT_PREFETCH_CHUNKS;
private QueryOptions defaultQueryOptions = SpannerOptions.Builder.DEFAULT_QUERY_OPTIONS;
private DirectedReadOptions defaultDirectedReadOption;
private ExecutorProvider executorProvider;
private Clock clock = new Clock();

Expand Down Expand Up @@ -117,6 +119,11 @@ B setClock(Clock clock) {
return self();
}

B setDefaultDirectedReadOptions(DirectedReadOptions directedReadOptions) {
this.defaultDirectedReadOption = directedReadOptions;
return self();
}

abstract T build();
}

Expand Down Expand Up @@ -399,6 +406,7 @@ void initTransaction() {
private final int defaultPrefetchChunks;
private final QueryOptions defaultQueryOptions;

private final DirectedReadOptions defaultDirectedReadOptions;
private final Clock clock;

@GuardedBy("lock")
Expand All @@ -423,6 +431,7 @@ void initTransaction() {
this.rpc = builder.rpc;
this.defaultPrefetchChunks = builder.defaultPrefetchChunks;
this.defaultQueryOptions = builder.defaultQueryOptions;
this.defaultDirectedReadOptions = builder.defaultDirectedReadOption;
this.span = builder.span;
this.executorProvider = builder.executorProvider;
this.clock = builder.clock;
Expand Down Expand Up @@ -623,6 +632,11 @@ ExecuteSqlRequest.Builder getExecuteSqlRequestBuilder(
if (options.hasDataBoostEnabled()) {
builder.setDataBoostEnabled(options.dataBoostEnabled());
}
if (options.hasDirectedReadOptions()) {
builder.setDirectedReadOptions(options.directedReadOptions());
} else if (defaultDirectedReadOptions != null) {
builder.setDirectedReadOptions(defaultDirectedReadOptions);
}
builder.setSeqno(getSeqNo());
builder.setQueryOptions(buildQueryOptions(statement.getQueryOptions()));
builder.setRequestOptions(buildRequestOptions(options));
Expand Down Expand Up @@ -811,6 +825,11 @@ ResultSet readInternalWithOptions(
if (readOptions.hasDataBoostEnabled()) {
builder.setDataBoostEnabled(readOptions.dataBoostEnabled());
}
if (readOptions.hasDirectedReadOptions()) {
builder.setDirectedReadOptions(readOptions.directedReadOptions());
} else if (defaultDirectedReadOptions != null) {
builder.setDirectedReadOptions(defaultDirectedReadOptions);
}
final int prefetchChunks =
readOptions.hasPrefetchChunks() ? readOptions.prefetchChunks() : defaultPrefetchChunks;
ResumableStreamIterator stream =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ public BatchReadOnlyTransaction batchReadOnlyTransaction(TimestampBound bound) {
.setDefaultQueryOptions(
sessionClient.getSpanner().getDefaultQueryOptions(sessionClient.getDatabaseId()))
.setExecutorProvider(sessionClient.getSpanner().getAsyncExecutorProvider())
.setDefaultPrefetchChunks(sessionClient.getSpanner().getDefaultPrefetchChunks()),
.setDefaultPrefetchChunks(sessionClient.getSpanner().getDefaultPrefetchChunks())
.setDefaultDirectedReadOptions(
sessionClient.getSpanner().getOptions().getDirectedReadOptions()),
checkNotNull(bound));
}

Expand All @@ -77,7 +79,9 @@ public BatchReadOnlyTransaction batchReadOnlyTransaction(BatchTransactionId batc
.setDefaultQueryOptions(
sessionClient.getSpanner().getDefaultQueryOptions(sessionClient.getDatabaseId()))
.setExecutorProvider(sessionClient.getSpanner().getAsyncExecutorProvider())
.setDefaultPrefetchChunks(sessionClient.getSpanner().getDefaultPrefetchChunks()),
.setDefaultPrefetchChunks(sessionClient.getSpanner().getDefaultPrefetchChunks())
.setDefaultDirectedReadOptions(
sessionClient.getSpanner().getOptions().getDirectedReadOptions()),
batchTransactionId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.google.cloud.spanner;

import com.google.common.base.Preconditions;
import com.google.spanner.v1.DirectedReadOptions;
import com.google.spanner.v1.RequestOptions.Priority;
import java.io.Serializable;
import java.util.Objects;
Expand Down Expand Up @@ -224,6 +225,18 @@ public static CreateUpdateDeleteAdminApiOption validateOnly(Boolean validateOnly
return new ValidateOnlyOption(validateOnly);
}

/**
* Option to request DirectedRead for ReadOnlyTransaction and SingleUseTransaction.
*
* <p>The DirectedReadOptions can be used to indicate which replicas or regions should be used for
* non-transactional reads or queries. Not all requests can be sent to non-leader replicas. In
* particular, some requests such as reads within read-write transactions must be sent to a
* designated leader replica. These requests ignore DirectedReadOptions.
*/
public static ReadAndQueryOption directedRead(DirectedReadOptions directedReadOptions) {
return new DirectedReadOption(directedReadOptions);
}

/** Option to request {@link CommitStats} for read/write transactions. */
static final class CommitStatsOption extends InternalOption implements TransactionOption {
@Override
Expand Down Expand Up @@ -325,6 +338,21 @@ void appendToOptions(Options options) {
}
}

static final class DirectedReadOption extends InternalOption implements ReadAndQueryOption {
private final DirectedReadOptions directedReadOptions;

DirectedReadOption(DirectedReadOptions directedReadOptions) {
this.directedReadOptions =
Preconditions.checkNotNull(directedReadOptions, "DirectedReadOptions cannot be null");
;
}

@Override
void appendToOptions(Options options) {
options.directedReadOptions = directedReadOptions;
}
}

private boolean withCommitStats;
private Long limit;
private Integer prefetchChunks;
Expand All @@ -338,6 +366,7 @@ void appendToOptions(Options options) {
private Boolean validateOnly;
private Boolean withOptimisticLock;
private Boolean dataBoostEnabled;
private DirectedReadOptions directedReadOptions;

// Construction is via factory methods below.
private Options() {}
Expand Down Expand Up @@ -438,6 +467,14 @@ Boolean dataBoostEnabled() {
return dataBoostEnabled;
}

boolean hasDirectedReadOptions() {
return directedReadOptions != null;
}

DirectedReadOptions directedReadOptions() {
return directedReadOptions;
}

@Override
public String toString() {
StringBuilder b = new StringBuilder();
Expand Down Expand Up @@ -477,6 +514,9 @@ public String toString() {
if (dataBoostEnabled != null) {
b.append("dataBoostEnabled: ").append(dataBoostEnabled).append(' ');
}
if (directedReadOptions != null) {
b.append("directedReadOptions: ").append(directedReadOptions).append(' ');
}
return b.toString();
}

Expand Down Expand Up @@ -512,7 +552,8 @@ public boolean equals(Object o) {
&& Objects.equals(etag(), that.etag())
&& Objects.equals(validateOnly(), that.validateOnly())
&& Objects.equals(withOptimisticLock(), that.withOptimisticLock())
&& Objects.equals(dataBoostEnabled(), that.dataBoostEnabled());
&& Objects.equals(dataBoostEnabled(), that.dataBoostEnabled())
&& Objects.equals(directedReadOptions(), that.directedReadOptions());
}

@Override
Expand Down Expand Up @@ -557,6 +598,9 @@ public int hashCode() {
if (dataBoostEnabled != null) {
result = 31 * result + dataBoostEnabled.hashCode();
}
if (directedReadOptions != null) {
result = 31 * result + directedReadOptions.hashCode();
}
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ public ReadContext singleUse(TimestampBound bound) {
.setRpc(spanner.getRpc())
.setDefaultQueryOptions(spanner.getDefaultQueryOptions(databaseId))
.setDefaultPrefetchChunks(spanner.getDefaultPrefetchChunks())
.setDefaultDirectedReadOptions(spanner.getOptions().getDirectedReadOptions())
.setSpan(currentSpan)
.setExecutorProvider(spanner.getAsyncExecutorProvider())
.build());
Expand All @@ -274,6 +275,7 @@ public ReadOnlyTransaction singleUseReadOnlyTransaction(TimestampBound bound) {
.setRpc(spanner.getRpc())
.setDefaultQueryOptions(spanner.getDefaultQueryOptions(databaseId))
.setDefaultPrefetchChunks(spanner.getDefaultPrefetchChunks())
.setDefaultDirectedReadOptions(spanner.getOptions().getDirectedReadOptions())
.setSpan(currentSpan)
.setExecutorProvider(spanner.getAsyncExecutorProvider())
.buildSingleUseReadOnlyTransaction());
Expand All @@ -293,6 +295,7 @@ public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) {
.setRpc(spanner.getRpc())
.setDefaultQueryOptions(spanner.getDefaultQueryOptions(databaseId))
.setDefaultPrefetchChunks(spanner.getDefaultPrefetchChunks())
.setDefaultDirectedReadOptions(spanner.getOptions().getDirectedReadOptions())
.setSpan(currentSpan)
.setExecutorProvider(spanner.getAsyncExecutorProvider())
.build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.google.cloud.TransportOptions;
import com.google.cloud.grpc.GcpManagedChannelOptions;
import com.google.cloud.grpc.GrpcTransportOptions;
import com.google.cloud.spanner.Options.DirectedReadOption;
import com.google.cloud.spanner.Options.QueryOption;
import com.google.cloud.spanner.Options.UpdateOption;
import com.google.cloud.spanner.admin.database.v1.DatabaseAdminSettings;
Expand All @@ -50,6 +51,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.spanner.v1.DirectedReadOptions;
import com.google.spanner.v1.ExecuteSqlRequest;
import com.google.spanner.v1.ExecuteSqlRequest.QueryOptions;
import com.google.spanner.v1.SpannerGrpc;
Expand Down Expand Up @@ -137,6 +139,7 @@ public class SpannerOptions extends ServiceOptions<Spanner, SpannerOptions> {
private final String compressorName;
private final boolean leaderAwareRoutingEnabled;
private final boolean attemptDirectPath;
private final DirectedReadOptions directedReadOptions;

/** Interface that can be used to provide {@link CallCredentials} to {@link SpannerOptions}. */
public interface CallCredentialsProvider {
Expand Down Expand Up @@ -627,6 +630,7 @@ private SpannerOptions(Builder builder) {
compressorName = builder.compressorName;
leaderAwareRoutingEnabled = builder.leaderAwareRoutingEnabled;
attemptDirectPath = builder.attemptDirectPath;
directedReadOptions = builder.directedReadOptions;
}

/**
Expand Down Expand Up @@ -729,6 +733,7 @@ public static class Builder
private String emulatorHost = System.getenv("SPANNER_EMULATOR_HOST");
private boolean leaderAwareRoutingEnabled = true;
private boolean attemptDirectPath = true;
private DirectedReadOptions directedReadOptions;

private static String createCustomClientLibToken(String token) {
return token + " " + ServiceOptions.getGoogApiClientLibName();
Expand Down Expand Up @@ -789,6 +794,7 @@ private Builder() {
this.channelConfigurator = options.channelConfigurator;
this.interceptorProvider = options.interceptorProvider;
this.attemptDirectPath = options.attemptDirectPath;
this.directedReadOptions = options.directedReadOptions;
}

@Override
Expand Down Expand Up @@ -1153,6 +1159,32 @@ public Builder setAsyncExecutorProvider(CloseableExecutorProvider provider) {
return this;
}

/**
* Sets the {@link DirectedReadOption} that specify which replicas or regions should be used for
* non-transactional reads or queries.
*
* <p>DirectedReadOptions set at the request level will take precedence over the options set
* using this method.
*
* <p>An example below of how {@link DirectedReadOptions} can be constructed by including a
* replica.
*
* <pre><code>
* DirectedReadOptions.newBuilder()
* .setIncludeReplicas(
* IncludeReplicas.newBuilder()
* .addReplicaSelections(
* ReplicaSelection.newBuilder().setLocation("us-east1").build()))
* .build();
* }
* </code></pre>
*/
public Builder setDirectedReadOptions(DirectedReadOptions directedReadOptions) {
this.directedReadOptions =
Preconditions.checkNotNull(directedReadOptions, "DirectedReadOptions cannot be null");
return this;
}

/**
* Specifying this will allow the client to prefetch up to {@code prefetchChunks} {@code
* PartialResultSet} chunks for each read and query. The data size of each chunk depends on the
Expand Down Expand Up @@ -1371,6 +1403,10 @@ public boolean isLeaderAwareRoutingEnabled() {
return leaderAwareRoutingEnabled;
}

public DirectedReadOptions getDirectedReadOptions() {
return directedReadOptions;
}

@BetaApi
public boolean isAttemptDirectPath() {
return attemptDirectPath;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
import com.google.api.gax.core.ExecutorProvider;
import com.google.cloud.spanner.Options.RpcPriority;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import com.google.spanner.v1.DirectedReadOptions;
import com.google.spanner.v1.DirectedReadOptions.IncludeReplicas;
import com.google.spanner.v1.DirectedReadOptions.ReplicaSelection;
import com.google.spanner.v1.ExecuteBatchDmlRequest;
import com.google.spanner.v1.ExecuteSqlRequest;
import com.google.spanner.v1.ExecuteSqlRequest.QueryMode;
Expand All @@ -45,6 +48,14 @@

@RunWith(Parameterized.class)
public class AbstractReadContextTest {
private static final DirectedReadOptions DIRECTED_READ_OPTIONS =
DirectedReadOptions.newBuilder()
.setIncludeReplicas(
IncludeReplicas.newBuilder()
.addReplicaSelections(
ReplicaSelection.newBuilder().setLocation("us-west1").build()))
.build();

@Parameter(0)
public QueryOptions defaultQueryOptions;

Expand Down Expand Up @@ -250,4 +261,15 @@ public void executeSqlRequestBuilderWithRequestOptionsWithTxnTag() {
.isEqualTo("app=spanner,env=test,action=query");
assertThat(request.getRequestOptions().getTransactionTag()).isEqualTo("app=spanner,env=test");
}

@Test
public void testGetExecuteSqlRequestBuilderWithDirectedReadOptions() {
ExecuteSqlRequest.Builder request =
context.getExecuteSqlRequestBuilder(
Statement.of("SELECT * FROM FOO"),
QueryMode.NORMAL,
Options.fromQueryOptions(Options.directedRead(DIRECTED_READ_OPTIONS)),
false);
assertEquals(DIRECTED_READ_OPTIONS, request.getDirectedReadOptions());
}
}
Loading

0 comments on commit 26c6c63

Please sign in to comment.