feat: Add OpenTelemetry tracing to the Publisher and Subscriber #2086

Open: wants to merge 25 commits into base: main

Commits
903895a
feat: Initial publish side Open Telemetry support
michaelpri10 Jun 17, 2024
469f220
feat: Publish-side trace context injection
michaelpri10 Jun 20, 2024
ad31b53
feat: Tests and improvements to publish side OTel tracing
michaelpri10 Jun 24, 2024
9246689
feat: More tests and refactoring for publish-side OpenTelemetry
michaelpri10 Jun 24, 2024
34594cb
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Jun 24, 2024
901e135
feat: Formatting files
michaelpri10 Jun 24, 2024
6339e52
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Jun 24, 2024
c954ae1
feat: Publisher test changes
michaelpri10 Jun 24, 2024
fc163ab
test: Fix OpenTelemetry test
michaelpri10 Jun 24, 2024
6ab4cfe
Feat: Use OpenTelemetry semconv
michaelpri10 Jun 24, 2024
bc7530a
test: Fix some dependency issues
michaelpri10 Jun 24, 2024
77d56df
feat: Test fix
michaelpri10 Jun 24, 2024
e275efa
feat: Add comment for setter in builder
michaelpri10 Jun 25, 2024
456ac83
Opentelemetry subscribe (#2100)
michaelpri10 Jul 2, 2024
8bbd688
Merge branch 'main' into opentelemetry
michaelpri10 Jul 2, 2024
620e6b5
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Jul 2, 2024
8668b92
Merge branch 'opentelemetry' of https://github.com/googleapis/java-pu…
gcf-owl-bot[bot] Jul 2, 2024
61257b8
Opentelemetry subscribe (#2101)
michaelpri10 Jul 2, 2024
b0e0424
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Jul 2, 2024
e21a9f0
fix: Fix build errors in Publisher
michaelpri10 Jul 2, 2024
150ab74
test: Ignore org.assertj:assertj-core which is required for OTel test…
michaelpri10 Jul 8, 2024
f558305
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Jul 8, 2024
e7b05de
test: Add tests for subscriber OTel functions
michaelpri10 Jul 9, 2024
2ab460c
Merge branch 'main' into opentelemetry
michaelpri10 Aug 28, 2024
6c5e03c
feat: Changes to OpenTelemetry implementation to add links earlier an…
michaelpri10 Aug 29, 2024
feat: More tests and refactoring for publish-side OpenTelemetry
michaelpri10 committed Jun 24, 2024
commit 9246689d0c64c2f30339bb2bf262b5d380c38ded
@@ -83,7 +83,6 @@ public Span getPublisherSpan() {
*/
public void startPublisherSpan(Tracer tracer) {
if (enableOpenTelemetryTracing && tracer != null) {

AttributesBuilder attributesBuilder =
OpenTelemetryUtil.createPublishSpanAttributesBuilder(
topicName, "Publisher.publish", "create");
@@ -17,6 +17,7 @@
package com.google.cloud.pubsub.v1;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
@@ -62,19 +63,21 @@ public class OpenTelemetryTest {
private static final String MESSAGE_SIZE_ATTR_KEY = "messaging.message.envelope.size";
private static final String ORDERING_KEY_ATTR_KEY = "messaging.gcp_pubsub.message.ordering_key";

private static final OpenTelemetryRule openTelemetryRule = OpenTelemetryRule.create();
private static final String TRACEPARENT_ATTRIBUTE = "googclient_traceparent";

private static final OpenTelemetryRule openTelemetryTesting = OpenTelemetryRule.create();

@Test
public void testPublishSpansSuccess() {
openTelemetryRule.clearSpans();
openTelemetryTesting.clearSpans();

PubsubMessageWrapper messageWrapper =
PubsubMessageWrapper.newBuilder(getPubsubMessage(), FULL_TOPIC_NAME.toString(), true)
.build();
List<PubsubMessageWrapper> messageWrappers = Arrays.asList(messageWrapper);

long messageSize = messageWrapper.getPubsubMessage().getSerializedSize();
Tracer tracer = openTelemetryRule.getOpenTelemetry().getTracer("test");
Tracer tracer = openTelemetryTesting.getOpenTelemetry().getTracer("test");

// Call all span start/end methods in the expected order
messageWrapper.startPublisherSpan(tracer);
@@ -88,8 +91,8 @@ public void testPublishSpansSuccess() {
messageWrapper.setMessageIdSpanAttribute(MESSAGE_ID);
messageWrapper.endPublisherSpan();

List<SpanData> allSpans = openTelemetryRule.getSpans();
assertEquals(allSpans.size(), 4);
List<SpanData> allSpans = openTelemetryTesting.getSpans();
assertEquals(4, allSpans.size());
SpanData flowControlSpanData = allSpans.get(0);
SpanData batchingSpanData = allSpans.get(1);
SpanData publishRpcSpanData = allSpans.get(2);
@@ -118,10 +121,10 @@ public void testPublishSpansSuccess() {
.hasEnded();

List<LinkData> publishRpcLinks = publishRpcSpanData.getLinks();
assertEquals(publishRpcLinks.size(), messageWrappers.size());
assertEquals(publishRpcLinks.get(0).getSpanContext(), publisherSpanData.getSpanContext());
assertEquals(messageWrappers.size(), publishRpcLinks.size());
assertEquals(publisherSpanData.getSpanContext(), publishRpcLinks.get(0).getSpanContext());

assertEquals(publishRpcSpanData.getAttributes().size(), 6);
assertEquals(6, publishRpcSpanData.getAttributes().size());
AttributesAssert publishRpcSpanAttributesAssert =
OpenTelemetryAssertions.assertThat(publishRpcSpanData.getAttributes());
publishRpcSpanAttributesAssert
@@ -140,7 +143,7 @@ public void testPublishSpansSuccess() {
.hasNoParent()
.hasEnded();

assertEquals(publisherSpanData.getEvents().size(), 2);
assertEquals(2, publisherSpanData.getEvents().size());
EventDataAssert startEventAssert =
OpenTelemetryAssertions.assertThat(publisherSpanData.getEvents().get(0));
startEventAssert.hasName(PUBLISH_START_EVENT);
@@ -149,10 +152,10 @@ public void testPublishSpansSuccess() {
endEventAssert.hasName(PUBLISH_END_EVENT);

List<LinkData> publisherLinks = publisherSpanData.getLinks();
assertEquals(publisherLinks.size(), 1);
assertEquals(publisherLinks.get(0).getSpanContext(), publishRpcSpanData.getSpanContext());
assertEquals(1, publisherLinks.size());
assertEquals(publishRpcSpanData.getSpanContext(), publisherLinks.get(0).getSpanContext());

assertEquals(publisherSpanData.getAttributes().size(), 8);
assertEquals(8, publisherSpanData.getAttributes().size());
AttributesAssert publisherSpanAttributesAssert =
OpenTelemetryAssertions.assertThat(publisherSpanData.getAttributes());
publisherSpanAttributesAssert
@@ -164,26 +167,33 @@ public void testPublishSpansSuccess() {
.containsEntry(ORDERING_KEY_ATTR_KEY, ORDERING_KEY)
.containsEntry(MESSAGE_SIZE_ATTR_KEY, messageSize)
.containsEntry(MESSAGE_ID_ATTR_KEY, MESSAGE_ID);

// Check that the message has the attribute containing the trace context.
PubsubMessage message = messageWrapper.getPubsubMessage();
assertEquals(1, message.getAttributesMap().size());
assertTrue(message.containsAttributes(TRACEPARENT_ATTRIBUTE));
assertTrue(message.getAttributesOrDefault(TRACEPARENT_ATTRIBUTE, "").contains(publisherSpanData.getTraceId()));
assertTrue(message.getAttributesOrDefault(TRACEPARENT_ATTRIBUTE, "").contains(publisherSpanData.getSpanId()));
}

@Test
public void testPublishFlowControlSpanFailure() {
openTelemetryRule.clearSpans();
openTelemetryTesting.clearSpans();

PubsubMessageWrapper messageWrapper =
PubsubMessageWrapper.newBuilder(getPubsubMessage(), FULL_TOPIC_NAME.toString(), true)
.build();

Tracer tracer = openTelemetryRule.getOpenTelemetry().getTracer("test");
Tracer tracer = openTelemetryTesting.getOpenTelemetry().getTracer("test");

messageWrapper.startPublisherSpan(tracer);
messageWrapper.startPublishFlowControlSpan(tracer);

Exception e = new Exception("test-exception");
messageWrapper.setPublishFlowControlSpanException(e);

List<SpanData> allSpans = openTelemetryRule.getSpans();
assertEquals(allSpans.size(), 2);
List<SpanData> allSpans = openTelemetryTesting.getSpans();
assertEquals(2, allSpans.size());
SpanData flowControlSpanData = allSpans.get(0);
SpanData publisherSpanData = allSpans.get(1);

@@ -208,22 +218,22 @@ public void testPublishFlowControlSpanFailure() {

@Test
public void testPublishBatchingSpanFailure() {
openTelemetryRule.clearSpans();
openTelemetryTesting.clearSpans();

PubsubMessageWrapper messageWrapper =
PubsubMessageWrapper.newBuilder(getPubsubMessage(), FULL_TOPIC_NAME.toString(), true)
.build();

Tracer tracer = openTelemetryRule.getOpenTelemetry().getTracer("test");
Tracer tracer = openTelemetryTesting.getOpenTelemetry().getTracer("test");

messageWrapper.startPublisherSpan(tracer);
messageWrapper.startPublishBatchingSpan(tracer);

Exception e = new Exception("test-exception");
messageWrapper.setPublishBatchingSpanException(e);

List<SpanData> allSpans = openTelemetryRule.getSpans();
assertEquals(allSpans.size(), 2);
List<SpanData> allSpans = openTelemetryTesting.getSpans();
assertEquals(2, allSpans.size());
SpanData batchingSpanData = allSpans.get(0);
SpanData publisherSpanData = allSpans.get(1);

@@ -247,14 +257,14 @@ public void testPublishBatchingSpanFailure() {

@Test
public void testPublishRpcSpanFailure() {
openTelemetryRule.clearSpans();
openTelemetryTesting.clearSpans();

PubsubMessageWrapper messageWrapper =
PubsubMessageWrapper.newBuilder(getPubsubMessage(), FULL_TOPIC_NAME.toString(), true)
.build();

List<PubsubMessageWrapper> messageWrappers = Arrays.asList(messageWrapper);
Tracer tracer = openTelemetryRule.getOpenTelemetry().getTracer("test");
Tracer tracer = openTelemetryTesting.getOpenTelemetry().getTracer("test");

messageWrapper.startPublisherSpan(tracer);
Span publishRpcSpan =
@@ -264,8 +274,8 @@ public void testPublishRpcSpanFailure() {
OpenTelemetryUtil.setPublishRpcSpanException(publishRpcSpan, e, true);
messageWrapper.endPublisherSpan();

List<SpanData> allSpans = openTelemetryRule.getSpans();
assertEquals(allSpans.size(), 2);
List<SpanData> allSpans = openTelemetryTesting.getSpans();
assertEquals(2, allSpans.size());
SpanData rpcSpanData = allSpans.get(0);
SpanData publisherSpanData = allSpans.get(1);

@@ -48,6 +48,13 @@
import io.grpc.StatusException;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions;
import io.opentelemetry.sdk.testing.assertj.SpanDataAssert;
import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
import io.opentelemetry.sdk.trace.data.SpanData;

import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
@@ -74,6 +81,11 @@ public class PublisherImplTest {
private static final TransportChannelProvider TEST_CHANNEL_PROVIDER =
LocalChannelProvider.create("test-server");

private static final String PUBLISHER_SPAN_NAME = TEST_TOPIC.getTopic() + " create";
private static final String PUBLISH_FLOW_CONTROL_SPAN_NAME = "publisher flow control";
private static final String PUBLISH_BATCHING_SPAN_NAME = "publisher batching";
private static final String PUBLISH_RPC_SPAN_NAME = TEST_TOPIC.getTopic() + " publish";

private FakeScheduledExecutorService fakeExecutor;

private FakePublisherServiceImpl testPublisherServiceImpl;
@@ -1304,6 +1316,69 @@ public void run() {
publish4Completed.await();
}

@Test
public void testPublishOpenTelemetryTracing() throws Exception {
OpenTelemetryRule openTelemetryTesting = OpenTelemetryRule.create();
OpenTelemetry openTelemetry = openTelemetryTesting.getOpenTelemetry();
final Publisher publisher =
getTestPublisherBuilder()
.setBatchingSettings(
Publisher.Builder.DEFAULT_BATCHING_SETTINGS
.toBuilder()
.setElementCountThreshold(1L)
.setDelayThreshold(Duration.ofSeconds(5))
.setFlowControlSettings(
FlowControlSettings.newBuilder()
.setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block)
.setMaxOutstandingElementCount(2L)
.setMaxOutstandingRequestBytes(100L)
.build())
.build())
.setOpenTelemetry(openTelemetry)
.setEnableOpenTelemetryTracing(true)
.build();

testPublisherServiceImpl.addPublishResponse(
PublishResponse.newBuilder().addMessageIds("1"));
ApiFuture<String> publishFuture = sendTestMessage(publisher, "A");
assertEquals("1", publishFuture.get());

List<SpanData> allSpans = openTelemetryTesting.getSpans();
assertEquals(4, allSpans.size());
SpanData flowControlSpanData = allSpans.get(0);
SpanData batchingSpanData = allSpans.get(1);
SpanData publishRpcSpanData = allSpans.get(2);
SpanData publisherSpanData = allSpans.get(3);
Comment on lines +1348 to +1351
Member

How is this order determined? It doesn't appear to be chronological. Could it change because of a race condition?

Contributor Author

The order is determined by when each span ends: the list holds the spans in the order in which they were ended. The spans always end in this sequence, so there shouldn't be a race condition that makes them end in a different order.
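
A minimal sketch of the ordering described in this reply, assuming (as the reply states) that the in-memory test exporter records spans in the order they end rather than the order they start. It deliberately does not use the OpenTelemetry SDK: `RecordingExporter`, `FakeSpan`, `simulatePublish`, and the `test-topic` span names are hypothetical stand-ins for illustration only.

```java
import java.util.ArrayList;
import java.util.List;

public class SpanEndOrderSketch {

  /** Hypothetical stand-in for an in-memory span exporter: it records a span when the span ends. */
  static final class RecordingExporter {
    private final List<String> finished = new ArrayList<>();

    synchronized void onEnd(String spanName) {
      finished.add(spanName);
    }

    synchronized List<String> getSpans() {
      return new ArrayList<>(finished);
    }
  }

  /** Hypothetical stand-in for a span; only the first end() call is recorded. */
  static final class FakeSpan {
    private final String name;
    private final RecordingExporter exporter;
    private boolean ended;

    FakeSpan(String name, RecordingExporter exporter) {
      this.name = name;
      this.exporter = exporter;
    }

    void end() {
      if (!ended) {
        ended = true;
        exporter.onEnd(name);
      }
    }
  }

  /** Starts and ends spans in the sequence the publish test expects. */
  static List<String> simulatePublish() {
    RecordingExporter exporter = new RecordingExporter();

    // The publisher span starts first but ends last.
    FakeSpan publisher = new FakeSpan("test-topic create", exporter);

    FakeSpan flowControl = new FakeSpan("publisher flow control", exporter);
    flowControl.end();

    FakeSpan batching = new FakeSpan("publisher batching", exporter);
    batching.end();

    FakeSpan publishRpc = new FakeSpan("test-topic publish", exporter);
    publishRpc.end();

    publisher.end();
    return exporter.getSpans();
  }

  public static void main(String[] args) {
    // Spans are listed by end time, so the first-started span is printed last.
    System.out.println(simulatePublish());
  }
}
```

In this model the publisher span is created first yet appears at index 3, which matches the index assignments in the test above. The ordering only stays deterministic as long as the spans are ended sequentially; spans ended from concurrent threads could interleave.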


SpanDataAssert flowControlSpanDataAssert =
OpenTelemetryAssertions.assertThat(flowControlSpanData);
flowControlSpanDataAssert
.hasName(PUBLISH_FLOW_CONTROL_SPAN_NAME)
.hasParent(publisherSpanData)
.hasEnded();

SpanDataAssert batchingSpanDataAssert = OpenTelemetryAssertions.assertThat(batchingSpanData);
batchingSpanDataAssert
.hasName(PUBLISH_BATCHING_SPAN_NAME)
.hasParent(publisherSpanData)
.hasEnded();

SpanDataAssert publishRpcSpanDataAssert =
OpenTelemetryAssertions.assertThat(publishRpcSpanData);
publishRpcSpanDataAssert
.hasName(PUBLISH_RPC_SPAN_NAME)
.hasKind(SpanKind.CLIENT)
.hasNoParent()
.hasEnded();

SpanDataAssert publishSpanDataAssert = OpenTelemetryAssertions.assertThat(publisherSpanData);
publishSpanDataAssert
.hasName(PUBLISHER_SPAN_NAME)
.hasKind(SpanKind.PRODUCER)
.hasNoParent()
.hasEnded();
}

private Builder getTestPublisherBuilder() {
return Publisher.newBuilder(TEST_TOPIC)
.setExecutorProvider(FixedExecutorProvider.create(fakeExecutor))