Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for Directed Read options #2766

Merged
merged 27 commits into from
Jan 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
edc5bbf
fix: prevent illegal negative timeout values into thread sleep() meth…
arpan14 Feb 6, 2023
49a85df
Merge pull request #1 from arpan14/retryerror
arpan14 Feb 8, 2023
4cd497b
Fixing lint issues.
arpan14 Feb 8, 2023
4a6aa8e
Merge branch 'googleapis:main' into main
arpan14 Mar 13, 2023
b2aa09d
Merge branch 'googleapis:main' into main
arpan14 Mar 15, 2023
8d6d71e
Merge branch 'googleapis:main' into main
arpan14 May 9, 2023
77e6e7d
Merge branch 'googleapis:main' into main
arpan14 Jul 17, 2023
e8b7fad
Merge branch 'googleapis:main' into main
arpan14 Jul 25, 2023
8aa84e1
Merge branch 'googleapis:main' into main
arpan14 Oct 10, 2023
57fd405
Merge branch 'googleapis:main' into main
arpan14 Oct 27, 2023
1253563
Merge branch 'googleapis:main' into main
arpan14 Nov 20, 2023
d4f6a60
Merge branch 'googleapis:main' into main
arpan14 Dec 15, 2023
3efaf7c
Merge branch 'googleapis:main' into main
arpan14 Dec 26, 2023
3269c6b
feat: add support for Directed Read options.
arpan14 Dec 27, 2023
8725334
chore: fix lint issues.
arpan14 Dec 27, 2023
2311384
test: add unit tests for options class.
arpan14 Dec 27, 2023
bf5e6e0
test: add tests using mock spanner.
arpan14 Dec 27, 2023
5a3427e
test: add unit test for partitioned read.
arpan14 Dec 27, 2023
5c8028e
test: add unit test for partitioned read.
arpan14 Dec 28, 2023
12e93a7
chore: adding option in spanner options.
arpan14 Dec 29, 2023
9122ece
chore: fix NPE.
arpan14 Dec 29, 2023
c8df52c
chore: disabling test on emulator.
arpan14 Dec 30, 2023
ead5ab6
chore: adding test for query in RW transaction.
arpan14 Jan 2, 2024
641cb0b
chore: adding IT for transaction manager interface.
arpan14 Jan 2, 2024
54b8ee0
chore: disable IT for emulator.
arpan14 Jan 3, 2024
87aa78d
chore: PR comments.
arpan14 Jan 3, 2024
db8b52c
chore: address PR comments.
arpan14 Jan 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import com.google.spanner.v1.BeginTransactionRequest;
import com.google.spanner.v1.DirectedReadOptions;
import com.google.spanner.v1.ExecuteBatchDmlRequest;
import com.google.spanner.v1.ExecuteSqlRequest;
import com.google.spanner.v1.ExecuteSqlRequest.QueryMode;
Expand Down Expand Up @@ -72,6 +73,7 @@ abstract static class Builder<B extends Builder<?, T>, T extends AbstractReadCon
private Span span = Tracing.getTracer().getCurrentSpan();
private int defaultPrefetchChunks = SpannerOptions.Builder.DEFAULT_PREFETCH_CHUNKS;
private QueryOptions defaultQueryOptions = SpannerOptions.Builder.DEFAULT_QUERY_OPTIONS;
private DirectedReadOptions defaultDirectedReadOption;
private ExecutorProvider executorProvider;
private Clock clock = new Clock();

Expand Down Expand Up @@ -117,6 +119,11 @@ B setClock(Clock clock) {
return self();
}

B setDefaultDirectedReadOptions(DirectedReadOptions directedReadOptions) {
this.defaultDirectedReadOption = directedReadOptions;
return self();
}

abstract T build();
}

Expand Down Expand Up @@ -399,6 +406,7 @@ void initTransaction() {
private final int defaultPrefetchChunks;
private final QueryOptions defaultQueryOptions;

private final DirectedReadOptions defaultDirectedReadOptions;
private final Clock clock;

@GuardedBy("lock")
Expand All @@ -423,6 +431,7 @@ void initTransaction() {
this.rpc = builder.rpc;
this.defaultPrefetchChunks = builder.defaultPrefetchChunks;
this.defaultQueryOptions = builder.defaultQueryOptions;
this.defaultDirectedReadOptions = builder.defaultDirectedReadOption;
this.span = builder.span;
this.executorProvider = builder.executorProvider;
this.clock = builder.clock;
Expand Down Expand Up @@ -623,6 +632,11 @@ ExecuteSqlRequest.Builder getExecuteSqlRequestBuilder(
if (options.hasDataBoostEnabled()) {
builder.setDataBoostEnabled(options.dataBoostEnabled());
}
if (options.hasDirectedReadOptions()) {
builder.setDirectedReadOptions(options.directedReadOptions());
} else if (defaultDirectedReadOptions != null) {
harshachinta marked this conversation as resolved.
Show resolved Hide resolved
builder.setDirectedReadOptions(defaultDirectedReadOptions);
}
builder.setSeqno(getSeqNo());
builder.setQueryOptions(buildQueryOptions(statement.getQueryOptions()));
builder.setRequestOptions(buildRequestOptions(options));
Expand Down Expand Up @@ -811,6 +825,11 @@ ResultSet readInternalWithOptions(
if (readOptions.hasDataBoostEnabled()) {
builder.setDataBoostEnabled(readOptions.dataBoostEnabled());
}
if (readOptions.hasDirectedReadOptions()) {
builder.setDirectedReadOptions(readOptions.directedReadOptions());
} else if (defaultDirectedReadOptions != null) {
builder.setDirectedReadOptions(defaultDirectedReadOptions);
}
final int prefetchChunks =
readOptions.hasPrefetchChunks() ? readOptions.prefetchChunks() : defaultPrefetchChunks;
ResumableStreamIterator stream =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ public BatchReadOnlyTransaction batchReadOnlyTransaction(TimestampBound bound) {
.setDefaultQueryOptions(
sessionClient.getSpanner().getDefaultQueryOptions(sessionClient.getDatabaseId()))
.setExecutorProvider(sessionClient.getSpanner().getAsyncExecutorProvider())
.setDefaultPrefetchChunks(sessionClient.getSpanner().getDefaultPrefetchChunks()),
.setDefaultPrefetchChunks(sessionClient.getSpanner().getDefaultPrefetchChunks())
.setDefaultDirectedReadOptions(
sessionClient.getSpanner().getOptions().getDirectedReadOptions()),
checkNotNull(bound));
}

Expand All @@ -77,7 +79,9 @@ public BatchReadOnlyTransaction batchReadOnlyTransaction(BatchTransactionId batc
.setDefaultQueryOptions(
sessionClient.getSpanner().getDefaultQueryOptions(sessionClient.getDatabaseId()))
.setExecutorProvider(sessionClient.getSpanner().getAsyncExecutorProvider())
.setDefaultPrefetchChunks(sessionClient.getSpanner().getDefaultPrefetchChunks()),
.setDefaultPrefetchChunks(sessionClient.getSpanner().getDefaultPrefetchChunks())
.setDefaultDirectedReadOptions(
sessionClient.getSpanner().getOptions().getDirectedReadOptions()),
batchTransactionId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.google.cloud.spanner;

import com.google.common.base.Preconditions;
import com.google.spanner.v1.DirectedReadOptions;
import com.google.spanner.v1.RequestOptions.Priority;
import java.io.Serializable;
import java.util.Objects;
Expand Down Expand Up @@ -224,6 +225,18 @@ public static CreateUpdateDeleteAdminApiOption validateOnly(Boolean validateOnly
return new ValidateOnlyOption(validateOnly);
}

/**
* Option to request DirectedRead for ReadOnlyTransaction and SingleUseTransaction.
*
* <p>The DirectedReadOptions can be used to indicate which replicas or regions should be used for
* non-transactional reads or queries. Not all requests can be sent to non-leader replicas. In
* particular, some requests such as reads within read-write transactions must be sent to a
* designated leader replica. These requests ignore DirectedReadOptions.
*/
public static ReadAndQueryOption directedRead(DirectedReadOptions directedReadOptions) {
return new DirectedReadOption(directedReadOptions);
}

/** Option to request {@link CommitStats} for read/write transactions. */
static final class CommitStatsOption extends InternalOption implements TransactionOption {
@Override
Expand Down Expand Up @@ -325,6 +338,21 @@ void appendToOptions(Options options) {
}
}

static final class DirectedReadOption extends InternalOption implements ReadAndQueryOption {
private final DirectedReadOptions directedReadOptions;

DirectedReadOption(DirectedReadOptions directedReadOptions) {
this.directedReadOptions =
Preconditions.checkNotNull(directedReadOptions, "DirectedReadOptions cannot be null");
;
}

@Override
void appendToOptions(Options options) {
options.directedReadOptions = directedReadOptions;
}
}

private boolean withCommitStats;
private Long limit;
private Integer prefetchChunks;
Expand All @@ -338,6 +366,7 @@ void appendToOptions(Options options) {
private Boolean validateOnly;
private Boolean withOptimisticLock;
private Boolean dataBoostEnabled;
private DirectedReadOptions directedReadOptions;

// Construction is via factory methods below.
private Options() {}
Expand Down Expand Up @@ -438,6 +467,14 @@ Boolean dataBoostEnabled() {
return dataBoostEnabled;
}

boolean hasDirectedReadOptions() {
return directedReadOptions != null;
}

DirectedReadOptions directedReadOptions() {
return directedReadOptions;
}

@Override
public String toString() {
StringBuilder b = new StringBuilder();
Expand Down Expand Up @@ -477,6 +514,9 @@ public String toString() {
if (dataBoostEnabled != null) {
b.append("dataBoostEnabled: ").append(dataBoostEnabled).append(' ');
}
if (directedReadOptions != null) {
b.append("directedReadOptions: ").append(directedReadOptions).append(' ');
}
return b.toString();
}

Expand Down Expand Up @@ -512,7 +552,8 @@ public boolean equals(Object o) {
&& Objects.equals(etag(), that.etag())
&& Objects.equals(validateOnly(), that.validateOnly())
&& Objects.equals(withOptimisticLock(), that.withOptimisticLock())
&& Objects.equals(dataBoostEnabled(), that.dataBoostEnabled());
&& Objects.equals(dataBoostEnabled(), that.dataBoostEnabled())
&& Objects.equals(directedReadOptions(), that.directedReadOptions());
}

@Override
Expand Down Expand Up @@ -557,6 +598,9 @@ public int hashCode() {
if (dataBoostEnabled != null) {
result = 31 * result + dataBoostEnabled.hashCode();
}
if (directedReadOptions != null) {
result = 31 * result + directedReadOptions.hashCode();
}
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ public ReadContext singleUse(TimestampBound bound) {
.setRpc(spanner.getRpc())
.setDefaultQueryOptions(spanner.getDefaultQueryOptions(databaseId))
.setDefaultPrefetchChunks(spanner.getDefaultPrefetchChunks())
.setDefaultDirectedReadOptions(spanner.getOptions().getDirectedReadOptions())
.setSpan(currentSpan)
.setExecutorProvider(spanner.getAsyncExecutorProvider())
.build());
Expand All @@ -274,6 +275,7 @@ public ReadOnlyTransaction singleUseReadOnlyTransaction(TimestampBound bound) {
.setRpc(spanner.getRpc())
.setDefaultQueryOptions(spanner.getDefaultQueryOptions(databaseId))
.setDefaultPrefetchChunks(spanner.getDefaultPrefetchChunks())
.setDefaultDirectedReadOptions(spanner.getOptions().getDirectedReadOptions())
.setSpan(currentSpan)
.setExecutorProvider(spanner.getAsyncExecutorProvider())
.buildSingleUseReadOnlyTransaction());
Expand All @@ -293,6 +295,7 @@ public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) {
.setRpc(spanner.getRpc())
.setDefaultQueryOptions(spanner.getDefaultQueryOptions(databaseId))
.setDefaultPrefetchChunks(spanner.getDefaultPrefetchChunks())
.setDefaultDirectedReadOptions(spanner.getOptions().getDirectedReadOptions())
.setSpan(currentSpan)
.setExecutorProvider(spanner.getAsyncExecutorProvider())
.build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.google.cloud.TransportOptions;
import com.google.cloud.grpc.GcpManagedChannelOptions;
import com.google.cloud.grpc.GrpcTransportOptions;
import com.google.cloud.spanner.Options.DirectedReadOption;
import com.google.cloud.spanner.Options.QueryOption;
import com.google.cloud.spanner.Options.UpdateOption;
import com.google.cloud.spanner.admin.database.v1.DatabaseAdminSettings;
Expand All @@ -50,6 +51,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.spanner.v1.DirectedReadOptions;
import com.google.spanner.v1.ExecuteSqlRequest;
import com.google.spanner.v1.ExecuteSqlRequest.QueryOptions;
import com.google.spanner.v1.SpannerGrpc;
Expand Down Expand Up @@ -137,6 +139,7 @@ public class SpannerOptions extends ServiceOptions<Spanner, SpannerOptions> {
private final String compressorName;
private final boolean leaderAwareRoutingEnabled;
private final boolean attemptDirectPath;
private final DirectedReadOptions directedReadOptions;

/** Interface that can be used to provide {@link CallCredentials} to {@link SpannerOptions}. */
public interface CallCredentialsProvider {
Expand Down Expand Up @@ -627,6 +630,7 @@ private SpannerOptions(Builder builder) {
compressorName = builder.compressorName;
leaderAwareRoutingEnabled = builder.leaderAwareRoutingEnabled;
attemptDirectPath = builder.attemptDirectPath;
directedReadOptions = builder.directedReadOptions;
}

/**
Expand Down Expand Up @@ -729,6 +733,7 @@ public static class Builder
private String emulatorHost = System.getenv("SPANNER_EMULATOR_HOST");
private boolean leaderAwareRoutingEnabled = true;
private boolean attemptDirectPath = true;
private DirectedReadOptions directedReadOptions;

private static String createCustomClientLibToken(String token) {
return token + " " + ServiceOptions.getGoogApiClientLibName();
Expand Down Expand Up @@ -789,6 +794,7 @@ private Builder() {
this.channelConfigurator = options.channelConfigurator;
this.interceptorProvider = options.interceptorProvider;
this.attemptDirectPath = options.attemptDirectPath;
this.directedReadOptions = options.directedReadOptions;
}

@Override
Expand Down Expand Up @@ -1153,6 +1159,32 @@ public Builder setAsyncExecutorProvider(CloseableExecutorProvider provider) {
return this;
}

/**
* Sets the {@link DirectedReadOption} that specify which replicas or regions should be used for
* non-transactional reads or queries.
*
* <p>DirectedReadOptions set at the request level will take precedence over the options set
* using this method.
*
* <p>An example below of how {@link DirectedReadOptions} can be constructed by including a
* replica.
*
* <pre><code>
* DirectedReadOptions.newBuilder()
* .setIncludeReplicas(
* IncludeReplicas.newBuilder()
* .addReplicaSelections(
* ReplicaSelection.newBuilder().setLocation("us-east1").build()))
* .build();
* }
* </code></pre>
*/
public Builder setDirectedReadOptions(DirectedReadOptions directedReadOptions) {
this.directedReadOptions =
Preconditions.checkNotNull(directedReadOptions, "DirectedReadOptions cannot be null");
return this;
}

/**
* Specifying this will allow the client to prefetch up to {@code prefetchChunks} {@code
* PartialResultSet} chunks for each read and query. The data size of each chunk depends on the
Expand Down Expand Up @@ -1371,6 +1403,10 @@ public boolean isLeaderAwareRoutingEnabled() {
return leaderAwareRoutingEnabled;
}

public DirectedReadOptions getDirectedReadOptions() {
return directedReadOptions;
}

@BetaApi
public boolean isAttemptDirectPath() {
return attemptDirectPath;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
import com.google.api.gax.core.ExecutorProvider;
import com.google.cloud.spanner.Options.RpcPriority;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import com.google.spanner.v1.DirectedReadOptions;
import com.google.spanner.v1.DirectedReadOptions.IncludeReplicas;
import com.google.spanner.v1.DirectedReadOptions.ReplicaSelection;
import com.google.spanner.v1.ExecuteBatchDmlRequest;
import com.google.spanner.v1.ExecuteSqlRequest;
import com.google.spanner.v1.ExecuteSqlRequest.QueryMode;
Expand All @@ -45,6 +48,14 @@

@RunWith(Parameterized.class)
public class AbstractReadContextTest {
private static final DirectedReadOptions DIRECTED_READ_OPTIONS =
DirectedReadOptions.newBuilder()
.setIncludeReplicas(
IncludeReplicas.newBuilder()
.addReplicaSelections(
ReplicaSelection.newBuilder().setLocation("us-west1").build()))
.build();

@Parameter(0)
public QueryOptions defaultQueryOptions;

Expand Down Expand Up @@ -250,4 +261,15 @@ public void executeSqlRequestBuilderWithRequestOptionsWithTxnTag() {
.isEqualTo("app=spanner,env=test,action=query");
assertThat(request.getRequestOptions().getTransactionTag()).isEqualTo("app=spanner,env=test");
}

@Test
public void testGetExecuteSqlRequestBuilderWithDirectedReadOptions() {
ExecuteSqlRequest.Builder request =
context.getExecuteSqlRequestBuilder(
Statement.of("SELECT * FROM FOO"),
QueryMode.NORMAL,
Options.fromQueryOptions(Options.directedRead(DIRECTED_READ_OPTIONS)),
false);
assertEquals(DIRECTED_READ_OPTIONS, request.getDirectedReadOptions());
}
}
Loading
Loading