Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add OpenTelemetry tracing to the Publisher and Subscriber #2086

Open
wants to merge 25 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
903895a
feat: Initial publish side Open Telemetry support
michaelpri10 Jun 17, 2024
469f220
feat: Publish-side trace context injection
michaelpri10 Jun 20, 2024
ad31b53
feat: Tests and improvements to publish side OTel tracing
michaelpri10 Jun 24, 2024
9246689
feat: More tests and refactoring for publish-side OpenTelemetry
michaelpri10 Jun 24, 2024
34594cb
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Jun 24, 2024
901e135
feat: Formatting files
michaelpri10 Jun 24, 2024
6339e52
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Jun 24, 2024
c954ae1
feat: Publisher test changes
michaelpri10 Jun 24, 2024
fc163ab
test: Fix OpenTelemetry test
michaelpri10 Jun 24, 2024
6ab4cfe
Feat: Use OpenTelemetry semconv
michaelpri10 Jun 24, 2024
bc7530a
test: Fix some dependency issues
michaelpri10 Jun 24, 2024
77d56df
feat: Test fix
michaelpri10 Jun 24, 2024
e275efa
feat: Add comment for setter in builder
michaelpri10 Jun 25, 2024
456ac83
Opentelemetry subscribe (#2100)
michaelpri10 Jul 2, 2024
8bbd688
Merge branch 'main' into opentelemetry
michaelpri10 Jul 2, 2024
620e6b5
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Jul 2, 2024
8668b92
Merge branch 'opentelemetry' of https://github.com/googleapis/java-pu…
gcf-owl-bot[bot] Jul 2, 2024
61257b8
Opentelemetry subscribe (#2101)
michaelpri10 Jul 2, 2024
b0e0424
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Jul 2, 2024
e21a9f0
fix: Fix build errors in Publisher
michaelpri10 Jul 2, 2024
150ab74
test: Ignore org.assertj:assertj-core which is required for OTel test…
michaelpri10 Jul 8, 2024
f558305
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Jul 8, 2024
e7b05de
test: Add tests for subscriber OTel functions
michaelpri10 Jul 9, 2024
2ab460c
Merge branch 'main' into opentelemetry
michaelpri10 Aug 28, 2024
6c5e03c
feat: Changes to OpenTelemetry implementation to add links earlier an…
michaelpri10 Aug 29, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions google-cloud-pubsub/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,18 @@
<artifactId>google-http-client</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-context</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-semconv</artifactId>
</dependency>

<!-- Test dependencies -->
<dependency>
Expand Down Expand Up @@ -142,6 +154,21 @@
<artifactId>opencensus-impl</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-trace</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-testing</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious that are there any cases Truth and junit can not do?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is package is needed for the OpenTelemetry assertions. The tests fail to compile without it.

<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
<!-- Need testing utility classes for generated gRPC clients tests -->
<dependency>
<groupId>com.google.api</groupId>
Expand Down Expand Up @@ -174,6 +201,7 @@
<ignoredUnusedDeclaredDependency>com.google.auth:google-auth-library-oauth2-http:jar</ignoredUnusedDeclaredDependency>
<ignoredUnusedDeclaredDependency>io.opencensus:opencensus-impl</ignoredUnusedDeclaredDependency>
<ignoredUnusedDeclaredDependency>javax.annotation:javax.annotation-api</ignoredUnusedDeclaredDependency>
<ignoredUnusedDeclaredDependency>org.assertj:assertj-core</ignoredUnusedDeclaredDependency>
</ignoredUnusedDeclaredDependencies>
</configuration>
</plugin>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@
public class AckRequestData {
private final String ackId;
private final Optional<SettableApiFuture<AckResponse>> messageFuture;
private PubsubMessageWrapper messageWrapper;

protected AckRequestData(Builder builder) {
this.ackId = builder.ackId;
this.messageFuture = builder.messageFuture;
this.messageWrapper = builder.messageWrapper;
}

public String getAckId() {
Expand All @@ -36,6 +38,17 @@ public SettableApiFuture<AckResponse> getMessageFutureIfExists() {
return this.messageFuture.orElse(null);
}

/**
* Returns an empty PubsubMessageWrapper with OpenTelemetry tracing disabled. This allows methods
* that use this method to be unit tested.
*/
public PubsubMessageWrapper getMessageWrapper() {
if (this.messageWrapper == null) {
return PubsubMessageWrapper.newBuilder(null, null, false).build();
}
return messageWrapper;
}

public AckRequestData setResponse(AckResponse ackResponse, boolean setResponseOnSuccess) {
if (this.messageFuture.isPresent() && !this.messageFuture.get().isDone()) {
switch (ackResponse) {
Expand Down Expand Up @@ -68,6 +81,7 @@ public static Builder newBuilder(String ackId) {
protected static final class Builder {
private final String ackId;
private Optional<SettableApiFuture<AckResponse>> messageFuture = Optional.empty();
private PubsubMessageWrapper messageWrapper;

protected Builder(String ackId) {
this.ackId = ackId;
Expand All @@ -78,6 +92,11 @@ public Builder setMessageFuture(SettableApiFuture<AckResponse> messageFuture) {
return this;
}

public Builder setMessageWrapper(PubsubMessageWrapper messageWrapper) {
this.messageWrapper = messageWrapper;
return this;
}

public AckRequestData build() {
return new AckRequestData(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.google.common.util.concurrent.MoreExecutors;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.ReceivedMessage;
import io.opentelemetry.api.trace.Tracer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
Expand Down Expand Up @@ -104,6 +105,10 @@ class MessageDispatcher {
// To keep track of number of seconds the receiver takes to process messages.
private final Distribution ackLatencyDistribution;

private final String subscriptionName;
private final boolean enableOpenTelemetryTracing;
private final Tracer tracer;

/** Internal representation of a reply to a Pubsub message, to be sent back to the service. */
public enum AckReply {
ACK,
Expand Down Expand Up @@ -157,6 +162,7 @@ public void onFailure(Throwable t) {
t);
this.ackRequestData.setResponse(AckResponse.OTHER, false);
pendingNacks.add(this.ackRequestData);
this.ackRequestData.getMessageWrapper().endSubscribeProcessSpan("nack");
forget();
}

Expand All @@ -169,9 +175,11 @@ public void onSuccess(AckReply reply) {
ackLatencyDistribution.record(
Ints.saturatedCast(
(long) Math.ceil((clock.millisTime() - receivedTimeMillis) / 1000D)));
this.ackRequestData.getMessageWrapper().endSubscribeProcessSpan("ack");
break;
case NACK:
pendingNacks.add(this.ackRequestData);
this.ackRequestData.getMessageWrapper().endSubscribeProcessSpan("nack");
break;
default:
throw new IllegalArgumentException(String.format("AckReply: %s not supported", reply));
Expand Down Expand Up @@ -217,6 +225,10 @@ private MessageDispatcher(Builder builder) {
jobLock = new ReentrantLock();
messagesWaiter = new Waiter();
sequentialExecutor = new SequentialExecutorService.AutoExecutor(builder.executor);

subscriptionName = builder.subscriptionName;
enableOpenTelemetryTracing = builder.enableOpenTelemetryTracing;
tracer = builder.tracer;
}

private boolean shouldSetMessageFuture() {
Expand Down Expand Up @@ -351,13 +363,15 @@ void setMessageOrderingEnabled(boolean messageOrderingEnabled) {
}

private static class OutstandingMessage {
private final ReceivedMessage receivedMessage;
private final AckHandler ackHandler;

private OutstandingMessage(ReceivedMessage receivedMessage, AckHandler ackHandler) {
this.receivedMessage = receivedMessage;
private OutstandingMessage(AckHandler ackHandler) {
this.ackHandler = ackHandler;
}

public PubsubMessageWrapper messageWrapper() {
return this.ackHandler.ackRequestData.getMessageWrapper();
}
}

private static class ReceiptCompleteData {
Expand Down Expand Up @@ -390,10 +404,21 @@ void processReceivedMessages(List<ReceivedMessage> messages) {
if (shouldSetMessageFuture()) {
builder.setMessageFuture(SettableApiFuture.create());
}
PubsubMessageWrapper messageWrapper =
PubsubMessageWrapper.newBuilder(
message.getMessage(),
subscriptionName,
message.getAckId(),
message.getDeliveryAttempt(),
enableOpenTelemetryTracing)
.build();
builder.setMessageWrapper(messageWrapper);
messageWrapper.startSubscriberSpan(tracer, this.exactlyOnceDeliveryEnabled.get());

AckRequestData ackRequestData = builder.build();
AckHandler ackHandler =
new AckHandler(ackRequestData, message.getMessage().getSerializedSize(), totalExpiration);
OutstandingMessage outstandingMessage = new OutstandingMessage(message, ackHandler);
OutstandingMessage outstandingMessage = new OutstandingMessage(ackHandler);

if (this.exactlyOnceDeliveryEnabled.get()) {
// For exactly once deliveries we don't add to outstanding batch because we first
Expand Down Expand Up @@ -457,30 +482,39 @@ private void processBatch(List<OutstandingMessage> batch) {
for (OutstandingMessage message : batch) {
// This is a blocking flow controller. We have already incremented messagesWaiter, so
// shutdown will block on processing of all these messages anyway.
message.messageWrapper().startSubscribeConcurrencyControlSpan(tracer);
try {
flowController.reserve(1, message.receivedMessage.getMessage().getSerializedSize());
flowController.reserve(1, message.messageWrapper().getPubsubMessage().getSerializedSize());
message.messageWrapper().endSubscribeConcurrencyControlSpan();
} catch (FlowControlException unexpectedException) {
// This should be a blocking flow controller and never throw an exception.
message.messageWrapper().setSubscribeConcurrencyControlSpanException(unexpectedException);
throw new IllegalStateException("Flow control unexpected exception", unexpectedException);
}
processOutstandingMessage(addDeliveryInfoCount(message.receivedMessage), message.ackHandler);
addDeliveryInfoCount(message.messageWrapper());
processOutstandingMessage(message.ackHandler);
}
}

private PubsubMessage addDeliveryInfoCount(ReceivedMessage receivedMessage) {
PubsubMessage originalMessage = receivedMessage.getMessage();
int deliveryAttempt = receivedMessage.getDeliveryAttempt();
private void addDeliveryInfoCount(PubsubMessageWrapper messageWrapper) {
PubsubMessage originalMessage = messageWrapper.getPubsubMessage();
int deliveryAttempt = messageWrapper.getDeliveryAttempt();
// Delivery Attempt will be set to 0 if DeadLetterPolicy is not set on the subscription. In
// this case, do not populate the PubsubMessage with the delivery attempt attribute.
if (deliveryAttempt > 0) {
return PubsubMessage.newBuilder(originalMessage)
.putAttributes("googclient_deliveryattempt", Integer.toString(deliveryAttempt))
.build();
messageWrapper.setPubsubMessage(
PubsubMessage.newBuilder(originalMessage)
.putAttributes("googclient_deliveryattempt", Integer.toString(deliveryAttempt))
.build());
}
return originalMessage;
}

private void processOutstandingMessage(final PubsubMessage message, final AckHandler ackHandler) {
private void processOutstandingMessage(final AckHandler ackHandler) {
// Get the PubsubMessageWrapper and the PubsubMessage it wraps that are stored withing the
// AckHandler object.
PubsubMessageWrapper messageWrapper = ackHandler.ackRequestData.getMessageWrapper();
PubsubMessage message = messageWrapper.getPubsubMessage();

// This future is for internal bookkeeping to be sent to the StreamingSubscriberConnection
// use below in the consumers
SettableApiFuture<AckReply> ackReplySettableApiFuture = SettableApiFuture.create();
Expand All @@ -499,8 +533,10 @@ public void run() {
// so it was probably sent to someone else. Don't work on it.
// Don't nack it either, because we'd be nacking someone else's message.
ackHandler.forget();
messageWrapper.setSubscriberSpanExpirationResult();
return;
}
messageWrapper.startSubscribeProcessSpan(tracer);
if (shouldSetMessageFuture()) {
// This is the message future that is propagated to the user
SettableApiFuture<AckResponse> messageFuture =
Expand All @@ -521,7 +557,9 @@ public void run() {
if (!messageOrderingEnabled.get() || message.getOrderingKey().isEmpty()) {
executor.execute(deliverMessageTask);
} else {
messageWrapper.startSubscribeSchedulerSpan(tracer);
sequentialExecutor.submit(message.getOrderingKey(), deliverMessageTask);
messageWrapper.endSubscribeSchedulerSpan();
}
}

Expand Down Expand Up @@ -607,8 +645,10 @@ void processOutstandingOperations() {
List<AckRequestData> ackRequestDataReceipts = new ArrayList<AckRequestData>();
pendingReceipts.drainTo(ackRequestDataReceipts);
if (!ackRequestDataReceipts.isEmpty()) {
modackRequestData.add(
new ModackRequestData(this.getMessageDeadlineSeconds(), ackRequestDataReceipts));
ModackRequestData receiptModack =
new ModackRequestData(this.getMessageDeadlineSeconds(), ackRequestDataReceipts);
receiptModack.setIsReceiptModack(true);
modackRequestData.add(receiptModack);
}
logger.log(Level.FINER, "Sending {0} receipts", ackRequestDataReceipts.size());

Expand Down Expand Up @@ -645,6 +685,10 @@ public static final class Builder {
private ScheduledExecutorService systemExecutor;
private ApiClock clock;

private String subscriptionName;
private boolean enableOpenTelemetryTracing;
private Tracer tracer;

protected Builder(MessageReceiver receiver) {
this.receiver = receiver;
}
Expand Down Expand Up @@ -715,6 +759,21 @@ public Builder setApiClock(ApiClock clock) {
return this;
}

public Builder setSubscriptionName(String subscriptionName) {
this.subscriptionName = subscriptionName;
return this;
}

public Builder setEnableOpenTelemetryTracing(boolean enableOpenTelemetryTracing) {
this.enableOpenTelemetryTracing = enableOpenTelemetryTracing;
return this;
}

public Builder setTracer(Tracer tracer) {
this.tracer = tracer;
return this;
}

public MessageDispatcher build() {
return new MessageDispatcher(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
class ModackRequestData {
private final int deadlineExtensionSeconds;
private List<AckRequestData> ackRequestData;
private boolean isReceiptModack;

ModackRequestData(int deadlineExtensionSeconds) {
this.deadlineExtensionSeconds = deadlineExtensionSeconds;
Expand All @@ -45,8 +46,17 @@ public List<AckRequestData> getAckRequestData() {
return ackRequestData;
}

public boolean getIsReceiptModack() {
return isReceiptModack;
}

public ModackRequestData addAckRequestData(AckRequestData ackRequestData) {
this.ackRequestData.add(ackRequestData);
return this;
}

public ModackRequestData setIsReceiptModack(boolean isReceiptModack) {
this.isReceiptModack = isReceiptModack;
return this;
}
}
Loading
Loading