Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Open telemetry implementation #2770

Merged
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ nosetests.xml
.settings
.DS_Store
.classpath
.tool-versions

# Built documentation
docs/
Expand Down
7 changes: 6 additions & 1 deletion google-cloud-spanner/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,6 @@
<className>com/google/cloud/spanner/connection/Connection</className>
<method>void rollbackToSavepoint(java.lang.String)</method>
</difference>

<!-- Delay start transaction -->
<difference>
<differenceType>7012</differenceType>
Expand Down Expand Up @@ -540,6 +539,12 @@
<className>com/google/cloud/spanner/Dialect</className>
<method>java.lang.String getDefaultSchema()</method>
</difference>
<difference>
<differenceType>7005</differenceType>
<className>com/google/cloud/spanner/PartitionedDmlTransaction</className>
<method>void setSpan(io.opencensus.trace.Span)</method>
<to>void setSpan(com.google.cloud.spanner.ISpan)</to>
</difference>

<!-- Added DirectedReadOptions -->
<difference>
Expand Down
32 changes: 29 additions & 3 deletions google-cloud-spanner/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,14 @@
<artifactId>opencensus-impl</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-context</artifactId>
</dependency>
<dependency>
<groupId>com.google.auth</groupId>
<artifactId>google-auth-library-oauth2-http</artifactId>
Expand Down Expand Up @@ -393,7 +401,6 @@
<version>2.2</version>
<scope>test</scope>
</dependency>

<!-- Benchmarking dependencies -->
<dependency>
<groupId>org.openjdk.jmh</groupId>
Expand All @@ -407,9 +414,28 @@
<version>1.37</version>
<scope>test</scope>
</dependency>

<!-- OpenTelemetry test dependencies -->
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-metrics</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-trace</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-testing</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<profiles>
<profile>
<id>java9</id>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import com.google.spanner.v1.BeginTransactionRequest;
Expand All @@ -53,8 +54,6 @@
import com.google.spanner.v1.Transaction;
import com.google.spanner.v1.TransactionOptions;
import com.google.spanner.v1.TransactionSelector;
import io.opencensus.trace.Span;
import io.opencensus.trace.Tracing;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
Expand All @@ -70,7 +69,8 @@ abstract class AbstractReadContext
abstract static class Builder<B extends Builder<?, T>, T extends AbstractReadContext> {
private SessionImpl session;
private SpannerRpc rpc;
private Span span = Tracing.getTracer().getCurrentSpan();
private ISpan span;
private TraceWrapper tracer;
private int defaultPrefetchChunks = SpannerOptions.Builder.DEFAULT_PREFETCH_CHUNKS;
private QueryOptions defaultQueryOptions = SpannerOptions.Builder.DEFAULT_QUERY_OPTIONS;
private DirectedReadOptions defaultDirectedReadOption;
Expand All @@ -94,11 +94,16 @@ B setRpc(SpannerRpc rpc) {
return self();
}

B setSpan(Span span) {
B setSpan(ISpan span) {
this.span = span;
return self();
}

B setTracer(TraceWrapper tracer) {
this.tracer = tracer;
return self();
}

B setDefaultPrefetchChunks(int defaultPrefetchChunks) {
this.defaultPrefetchChunks = defaultPrefetchChunks;
return self();
Expand Down Expand Up @@ -389,9 +394,12 @@ void initTransaction() {
}
transactionId = transaction.getId();
span.addAnnotation(
"Transaction Creation Done", TraceUtil.getTransactionAnnotations(transaction));
"Transaction Creation Done",
ImmutableMap.of(
"Id", transaction.getId().toStringUtf8(), "Timestamp", timestamp.toString()));

} catch (SpannerException e) {
span.addAnnotation("Transaction Creation Failed", TraceUtil.getExceptionAnnotations(e));
span.addAnnotation("Transaction Creation Failed", e);
throw e;
}
}
Expand All @@ -402,7 +410,8 @@ void initTransaction() {
final SessionImpl session;
final SpannerRpc rpc;
final ExecutorProvider executorProvider;
Span span;
ISpan span;
TraceWrapper tracer;
private final int defaultPrefetchChunks;
private final QueryOptions defaultQueryOptions;

Expand Down Expand Up @@ -435,10 +444,11 @@ void initTransaction() {
this.span = builder.span;
this.executorProvider = builder.executorProvider;
this.clock = builder.clock;
this.tracer = builder.tracer;
}

@Override
public void setSpan(Span span) {
public void setSpan(ISpan span) {
this.span = span;
}

Expand Down Expand Up @@ -692,6 +702,7 @@ ResultSet executeQueryInternalWithOptions(
MAX_BUFFERED_CHUNKS,
SpannerImpl.QUERY,
span,
tracer,
rpc.getExecuteQueryRetrySettings(),
rpc.getExecuteQueryRetryableCodes()) {
@Override
Expand Down Expand Up @@ -752,7 +763,7 @@ public final void invalidate() {

@Override
public void close() {
span.end(TraceUtil.END_SPAN_OPTIONS);
span.end();
synchronized (lock) {
isClosed = true;
}
Expand Down Expand Up @@ -837,6 +848,7 @@ ResultSet readInternalWithOptions(
MAX_BUFFERED_CHUNKS,
SpannerImpl.READ,
span,
tracer,
rpc.getReadRetrySettings(),
rpc.getReadRetryableCodes()) {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.io.CharSource;
import com.google.common.util.concurrent.Uninterruptibles;
Expand All @@ -53,11 +52,6 @@
import com.google.spanner.v1.Transaction;
import com.google.spanner.v1.TypeCode;
import io.grpc.Context;
import io.opencensus.common.Scope;
import io.opencensus.trace.AttributeValue;
import io.opencensus.trace.Span;
import io.opencensus.trace.Tracer;
import io.opencensus.trace.Tracing;
import java.io.IOException;
import java.io.Serializable;
import java.math.BigDecimal;
Expand Down Expand Up @@ -87,7 +81,6 @@

/** Implementation of {@link ResultSet}. */
abstract class AbstractResultSet<R> extends AbstractStructReader implements ResultSet {
private static final Tracer tracer = Tracing.getTracer();
private static final com.google.protobuf.Value NULL_VALUE =
com.google.protobuf.Value.newBuilder().setNullValue(NullValue.NULL_VALUE).build();

Expand Down Expand Up @@ -1206,7 +1199,8 @@ abstract static class ResumableStreamIterator extends AbstractIterator<PartialRe
private final BackOff backOff;
private final LinkedList<PartialResultSet> buffer = new LinkedList<>();
private final int maxBufferSize;
private final Span span;
private final ISpan span;
private final TraceWrapper tracer;
private CloseableIterator<PartialResultSet> stream;
private ByteString resumeToken;
private boolean finished;
Expand All @@ -1220,12 +1214,14 @@ abstract static class ResumableStreamIterator extends AbstractIterator<PartialRe
protected ResumableStreamIterator(
int maxBufferSize,
String streamName,
Span parent,
ISpan parent,
TraceWrapper tracer,
RetrySettings streamingRetrySettings,
Set<Code> retryableCodes) {
checkArgument(maxBufferSize >= 0);
this.maxBufferSize = maxBufferSize;
this.span = tracer.spanBuilderWithExplicitParent(streamName, parent).startSpan();
this.tracer = tracer;
this.span = tracer.spanBuilderWithExplicitParent(streamName, parent);
this.streamingRetrySettings = Preconditions.checkNotNull(streamingRetrySettings);
this.retryableCodes = Preconditions.checkNotNull(retryableCodes);
this.backOff = newBackOff();
Expand Down Expand Up @@ -1281,11 +1277,7 @@ private static long nextBackOffMillis(BackOff backoff) throws SpannerException {
}

private void backoffSleep(Context context, long backoffMillis) throws SpannerException {
tracer
.getCurrentSpan()
.addAnnotation(
"Backing off",
ImmutableMap.of("Delay", AttributeValue.longAttributeValue(backoffMillis)));
tracer.getCurrentSpan().addAnnotation("Backing off", "Delay", backoffMillis);
final CountDownLatch latch = new CountDownLatch(1);
final Context.CancellationListener listener =
ignored -> {
Expand Down Expand Up @@ -1325,7 +1317,7 @@ public void execute(Runnable command) {
public void close(@Nullable String message) {
if (stream != null) {
stream.close(message);
span.end(TraceUtil.END_SPAN_OPTIONS);
span.end();
stream = null;
}
}
Expand All @@ -1343,11 +1335,9 @@ protected PartialResultSet computeNext() {
if (stream == null) {
span.addAnnotation(
"Starting/Resuming stream",
ImmutableMap.of(
"ResumeToken",
AttributeValue.stringAttributeValue(
resumeToken == null ? "null" : resumeToken.toStringUtf8())));
try (Scope s = tracer.withSpan(span)) {
"ResumeToken",
resumeToken == null ? "null" : resumeToken.toStringUtf8());
try (IScope scope = tracer.withSpan(span)) {
// When start a new stream set the Span as current to make the gRPC Span a child of
// this Span.
stream = checkNotNull(startStream(resumeToken));
Expand Down Expand Up @@ -1387,17 +1377,15 @@ protected PartialResultSet computeNext() {
}
} catch (SpannerException spannerException) {
if (safeToRetry && isRetryable(spannerException)) {
span.addAnnotation(
"Stream broken. Safe to retry",
TraceUtil.getExceptionAnnotations(spannerException));
span.addAnnotation("Stream broken. Safe to retry", spannerException);
logger.log(Level.FINE, "Retryable exception, will sleep and retry", spannerException);
// Truncate any items in the buffer before the last retry token.
while (!buffer.isEmpty() && buffer.getLast().getResumeToken().isEmpty()) {
buffer.removeLast();
}
assert buffer.isEmpty() || buffer.getLast().getResumeToken().equals(resumeToken);
stream = null;
try (Scope s = tracer.withSpan(span)) {
try (IScope s = tracer.withSpan(span)) {
long delay = spannerException.getRetryDelayInMillis();
if (delay != -1) {
backoffSleep(context, delay);
Expand All @@ -1408,12 +1396,12 @@ protected PartialResultSet computeNext() {

continue;
}
span.addAnnotation("Stream broken. Not safe to retry");
TraceUtil.setWithFailure(span, spannerException);
span.addAnnotation("Stream broken. Not safe to retry", spannerException);
span.setStatus(spannerException);
throw spannerException;
} catch (RuntimeException e) {
span.addAnnotation("Stream broken. Not safe to retry");
TraceUtil.setWithFailure(span, e);
span.addAnnotation("Stream broken. Not safe to retry", e);
span.setStatus(e);
throw e;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,31 +28,27 @@
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.MoreExecutors;
import io.opencensus.trace.Span;
import io.opencensus.trace.Tracer;
import io.opencensus.trace.Tracing;

/** Implementation of {@link AsyncTransactionManager}. */
final class AsyncTransactionManagerImpl
implements CommittableAsyncTransactionManager, SessionTransaction {
private static final Tracer tracer = Tracing.getTracer();

private final SessionImpl session;
private Span span;
private ISpan span;
private final Options options;

private TransactionRunnerImpl.TransactionContextImpl txn;
private TransactionState txnState;
private final SettableApiFuture<CommitResponse> commitResponse = SettableApiFuture.create();

AsyncTransactionManagerImpl(SessionImpl session, Span span, TransactionOption... options) {
AsyncTransactionManagerImpl(SessionImpl session, ISpan span, TransactionOption... options) {
this.session = session;
this.span = span;
this.options = Options.fromTransactionOptions(options);
}

@Override
public void setSpan(Span span) {
public void setSpan(ISpan span) {
this.span = span;
}

Expand Down
Loading
Loading