Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pubsub): add otel tracing with links #9594

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
76 commits
Select commit Hold shift + click to select a range
90401ff
feat(pubsub): add otel tracing for subscribing
hongalex Sep 18, 2023
a24c893
sync with main otel trunk
hongalex Sep 18, 2023
d307201
fix trailing merge conflict lines
hongalex Sep 18, 2023
c0dfd35
run go mod tidy
hongalex Sep 18, 2023
603835c
add subscribe span test, refactor to map to use SpanContext
hongalex Oct 3, 2023
2b2fc38
switch to using sync.map for optimization
hongalex Oct 3, 2023
da15e9b
fix parent spans
hongalex Oct 4, 2023
13ad8ea
switch subscribe spans to use events
hongalex Oct 14, 2023
2980177
make subscribe span end after ack
hongalex Oct 26, 2023
9386954
use updated custom attribute names
hongalex Oct 31, 2023
29b7888
standardize attribute names and span names
hongalex Jan 11, 2024
9f95507
upgrade dependencies, add disable trace flag
hongalex Jan 19, 2024
718c28a
clean up create span
hongalex Jan 19, 2024
de24f2f
wrap publish side spans in helpers
hongalex Jan 23, 2024
3724293
add test for when telemetry is disabled
hongalex Jan 23, 2024
28cad4d
fix issue with nil clientconfig
hongalex Jan 23, 2024
b4d5173
add code.Function attribute
hongalex Jan 23, 2024
0164a15
make tracing disabled by default, wrap spans in helper
hongalex Feb 2, 2024
3c35fb0
span events cleanup
hongalex Feb 3, 2024
7c045e7
merge in changes from main branch
hongalex Feb 3, 2024
8c93af3
Merge branch 'main' of github.com:googleapis/google-cloud-go into pub…
hongalex Feb 5, 2024
f80cc12
support context passing for GAPIC spans
hongalex Feb 6, 2024
ba6b1e0
merge with feature trunk
hongalex Feb 6, 2024
97e1cef
undo changes to semconv in storage client
hongalex Feb 6, 2024
ef00726
fix publish tests and parent ordering
hongalex Feb 12, 2024
cadb30c
Merge branch 'pubsub-otel-trace-receive-clean' of github.com:hongalex…
hongalex Feb 12, 2024
45641de
fix disable tracing and error tests
hongalex Feb 13, 2024
8a2cd61
update subscribe side spans and tests
hongalex Feb 16, 2024
2800def
fix race condition, rename concurrency control span
hongalex Feb 29, 2024
3d6b2f2
add basic benchmark
hongalex Mar 5, 2024
2fedef2
Merge branch 'pubsub-otel-trace-receive-clean' of github.com:hongalex…
hongalex Mar 5, 2024
04d5233
make process span end after ack/nack called
hongalex Mar 5, 2024
af71630
merge main
hongalex Mar 13, 2024
219d93b
Merge branch 'pubsub-otel-trace-receive-clean' of github.com:hongalex…
hongalex Mar 13, 2024
b553856
sync with trunk branch
hongalex Mar 15, 2024
06adcf7
run go mod tidy
hongalex Mar 15, 2024
763a44e
switch to using exp/slices since go1.21 not supported yet
hongalex Mar 15, 2024
592b1f6
update activeSpans sync.map comment and name
hongalex Mar 15, 2024
78b2dc4
add helper function for extracting resource ID from fqn
hongalex Mar 15, 2024
1d81a00
guard otel code behind flag
hongalex Mar 18, 2024
47be12a
switch code.function naming
hongalex Mar 19, 2024
dfd4dfa
Merge branch 'main' of github.com:googleapis/google-cloud-go into pub…
hongalex Mar 19, 2024
2165f3a
Merge branch 'pubsub-otel-trace' of github.com:googleapis/google-clou…
hongalex Mar 19, 2024
306043e
remove extra argument from toMessage
hongalex Mar 19, 2024
339babb
revert changes to ctx passed to fc.acquire, fix span hierarchy
hongalex Mar 19, 2024
fdf81aa
rename span map and fix context clobbering bug
hongalex Mar 26, 2024
210edcb
removed unused args in message conversion helpers
hongalex Mar 26, 2024
c3d5c9b
fix leak in activeSpans map, shift context passing, rename attributes
hongalex Apr 9, 2024
5e17348
fix failing tests because of attributes changes
hongalex Apr 10, 2024
0b40755
cleanup unused function and rename FQN helper
hongalex Apr 10, 2024
5589d42
simplify payload size attribute calc
hongalex Apr 12, 2024
bacda49
fix issue with trying to use uninitialized span
hongalex Apr 12, 2024
110fd53
fix merge conflict with main
hongalex Jun 19, 2024
4b1aa55
bring in changes from diverged otel branch
hongalex Jun 20, 2024
e5c1f6b
Merge branch 'pubsub-otel-trace-receive-clean' of github.com:hongalex…
hongalex Jun 20, 2024
8db922a
run go mod tidy
hongalex Jun 20, 2024
97bb12d
upgrade to latest semconv version
hongalex Jun 21, 2024
f8291a0
Merge branch 'main' of github.com:googleapis/google-cloud-go into pub…
hongalex Jun 21, 2024
e62f21e
migrate more attributes to semconv 1.26.0
hongalex Jun 21, 2024
f160c6a
fix trace tests
hongalex Jun 22, 2024
a6b724a
add ink to batch spans for message spans
hongalex Jun 22, 2024
249e8ce
add name of span we are linking to
hongalex Jun 22, 2024
b77a0f5
add project attribute, check span recording
hongalex Jun 25, 2024
454e69e
fix issue with not pulling messages
hongalex Jun 25, 2024
3179843
resolve merge
hongalex Jul 2, 2024
2962ed2
run go.mod tidy
hongalex Jul 3, 2024
b432d07
temporarily remove use of go1.21 package
hongalex Jul 3, 2024
41e3750
remove use of attribute.Set which requires go1.21
hongalex Jul 3, 2024
0fca05b
downgrade back to go1.20
hongalex Jul 3, 2024
e43571c
downgrade back to go1.20
hongalex Jul 3, 2024
b32bde9
bump go version to use latest otel version
hongalex Jul 3, 2024
13ee471
update enable otel client comment
hongalex Jul 3, 2024
980603f
improve documentation, fix ack/modack logic flow
hongalex Jul 22, 2024
be8f7fb
clarified context assignment comment for otelctx
hongalex Jul 22, 2024
84aefe9
fix span context propagation in subscribe spans
hongalex Jul 30, 2024
6475a66
undo bad merge, fix empty project FQN parsing
hongalex Aug 1, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
standardize attribute names and span names
  • Loading branch information
hongalex committed Jan 11, 2024
commit 29b78884d0cf6be8df058d9596fae04b89f5f1f6
27 changes: 15 additions & 12 deletions pubsub/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ import (
"cloud.google.com/go/pubsub/internal/distribution"
gax "github.com/googleapis/gax-go/v2"
"github.com/googleapis/gax-go/v2/apierror"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/propagation"
semconv "go.opentelemetry.io/otel/semconv/v1.21.0"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
Expand Down Expand Up @@ -72,6 +72,7 @@ type messageIterator struct {
po *pullOptions
ps *pullStream
subc *vkit.SubscriberClient
subID string
subName string
kaTick <-chan time.Time // keep-alive (deadline extensions)
ackTicker *time.Ticker // message acks
Expand Down Expand Up @@ -132,12 +133,15 @@ func newMessageIterator(subc *vkit.SubscriberClient, subName string, po *pullOpt
cctx, cancel := context.WithCancel(context.Background())
cctx = withSubscriptionKey(cctx, subName)

subID := strings.Split(subName, "/")[3]
hongalex marked this conversation as resolved.
Show resolved Hide resolved

it := &messageIterator{
ctx: cctx,
cancel: cancel,
ps: ps,
po: po,
subc: subc,
subID: subID,
subName: subName,
kaTick: time.After(keepAlivePeriod),
ackTicker: ackTicker,
Expand Down Expand Up @@ -305,15 +309,13 @@ func (it *messageIterator) receive(maxToPull int32) ([]*Message, error) {

opts := getSubSpanAttributes(it.subName, m, semconv.MessagingOperationReceive)
if m.Attributes != nil {
ctx = otel.GetTextMapPropagator().Extract(ctx, newMessageCarrier(m))

log.Printf("context extracted: %+v\n", ctx)
ctx = propagation.TraceContext{}.Extract(ctx, newMessageCarrier(m))
}
_, span := tracer().Start(ctx, fmt.Sprintf("%s %s", it.subName, subscribeReceiveSpanName), opts...)
_, span := tracer().Start(ctx, fmt.Sprintf("%s %s", it.subID, subscribeSpanName), opts...)
span.SetAttributes(
attribute.Bool(eosAttribute, it.enableExactlyOnceDelivery),
attribute.String(ackIDAttribute, ackID),
attribute.Int(numBatchedMessagesAttribute, len(pendingMessages)),
semconv.MessagingBatchMessageCount(len(pendingMessages)),
)
it.activeSpan.Store(ackID, span)
}
Expand Down Expand Up @@ -539,7 +541,7 @@ func (it *messageIterator) sendAck(m map[string]*AckResult) {
links = append(links, trace.Link{SpanContext: parentSpan.SpanContext()})
}
}
_, ackSpan := tracer().Start(context.Background(), ackSpanName, trace.WithLinks(links...))
_, ackSpan := tracer().Start(context.Background(), fmt.Sprintf("%s %s", it.subID, ackSpanName), trace.WithLinks(links...))
defer ackSpan.End()
ackSpan.SetAttributes(semconv.MessagingBatchMessageCount(numBatch))

Expand Down Expand Up @@ -594,14 +596,15 @@ func (it *messageIterator) sendModAck(m map[string]*AckResult, deadline time.Dur
for len(ackIDs) > 0 {
toSend, ackIDs = splitRequestIDs(ackIDs, ackIDBatchSize)

var spanName, eventName string
spanName := it.subID + " "
var eventName string
if isNack {
recordStat(it.ctx, NackCount, int64(len(toSend)))
spanName = nackSpanName
spanName += spanName + nackSpanName
eventName = "nack"
} else {
recordStat(it.ctx, ModAckCount, int64(len(toSend)))
spanName = modAckSpanName
spanName = spanName + modAckSpanName
eventName = "modack"
}

Expand All @@ -627,8 +630,8 @@ func (it *messageIterator) sendModAck(m map[string]*AckResult, deadline time.Dur
defer mSpan.End()
if !isNack {
mSpan.SetAttributes(
attribute.Int(modackDeadlineSecondsAttribute, int(deadlineSec)),
attribute.Bool(initialModackAttribute, isReceipt))
attribute.Int(ackDeadlineSecAttribute, int(deadlineSec)),
attribute.Bool(receiptModackAttribute, isReceipt))
}
mSpan.SetAttributes(semconv.MessagingBatchMessageCount(numBatch))
}
Expand Down
4 changes: 2 additions & 2 deletions pubsub/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
pb "cloud.google.com/go/pubsub/apiv1/pubsubpb"
"cloud.google.com/go/pubsub/internal/scheduler"
gax "github.com/googleapis/gax-go/v2"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
semconv "go.opentelemetry.io/otel/semconv/v1.21.0"
"go.opentelemetry.io/otel/trace"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -1426,7 +1426,7 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes
m := msg.(*Message)
schedulerSpan.End()
defer wg.Done()
_, cSpan := tracer().Start(ctx3, fmt.Sprintf("%s %s", s.String(), subscribeProcessSpanName))
_, cSpan := tracer().Start(ctx3, fmt.Sprintf("%s %s", s.ID(), subscribeProcessSpanName))
defer cSpan.End()
old2 := ackh.doneFunc
ackh.doneFunc = func(ackID string, ack bool, r *ipubsub.AckResult, receiveTime time.Time) {
Expand Down
24 changes: 16 additions & 8 deletions pubsub/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,9 @@ import (
gax "github.com/googleapis/gax-go/v2"
"go.opencensus.io/stats"
"go.opencensus.io/tag"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
otelcodes "go.opentelemetry.io/otel/codes"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
"go.opentelemetry.io/otel/propagation"
semconv "go.opentelemetry.io/otel/semconv/v1.21.0"
"go.opentelemetry.io/otel/trace"
"google.golang.org/api/support/bundler"
"google.golang.org/grpc"
Expand Down Expand Up @@ -572,7 +571,7 @@ var errTopicOrderingNotEnabled = errors.New("Topic.EnableMessageOrdering=false,
// need to be stopped by calling t.Stop(). Once stopped, future calls to Publish
// will immediately return a PublishResult with an error.
func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult {
ctx, publishSpan := startPublishSpan(ctx, msg, t.String())
ctx, publishSpan := startPublishSpan(ctx, msg, t.ID())
ctx, err := tag.New(ctx, tag.Insert(keyStatus, "OK"), tag.Upsert(keyTopic, t.name))
if err != nil {
log.Printf("pubsub: cannot create context with tag in Publish: %v", err)
Expand Down Expand Up @@ -765,19 +764,28 @@ func (t *Topic) publishMessageBundle(ctx context.Context, bms []*bundledMessage)
// key, it doesn't matter which we read from.
orderingKey = bms[0].msg.OrderingKey
}

links := make([]trace.Link, 0, numMsgs)
for _, bm := range bms {
links = append(links, trace.Link{SpanContext: bm.span.SpanContext()})
}

topicID := strings.Split(t.name, "/")[3]
_, pSpan := tracer().Start(ctx, fmt.Sprintf("%s %s", topicID, publishRPCSpanName), trace.WithLinks(links...))
pSpan.SetAttributes(semconv.MessagingBatchMessageCount(numMsgs))
defer pSpan.End()

for i, bm := range bms {
pbMsgs[i] = &pb.PubsubMessage{
Data: bm.msg.Data,
Attributes: bm.msg.Attributes,
OrderingKey: bm.msg.OrderingKey,
}
if bm.msg.Attributes != nil {
ctx = otel.GetTextMapPropagator().Extract(ctx, newMessageCarrier(bm.msg))
ctx = propagation.TraceContext{}.Extract(ctx, newMessageCarrier(bm.msg))
}
_, pSpan := tracer().Start(ctx, publishRPCSpanName)
pSpan.SetAttributes(attribute.Int(numBatchedMessagesAttribute, numMsgs))

defer bm.span.End()
defer pSpan.End()
bm.msg = nil // release bm.msg for GC
}

Expand Down
55 changes: 26 additions & 29 deletions pubsub/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
"go.opentelemetry.io/otel/attribute"
otelcodes "go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/propagation"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
semconv "go.opentelemetry.io/otel/semconv/v1.21.0"
"go.opentelemetry.io/otel/trace"
pb "google.golang.org/genproto/googleapis/pubsub/v1"
"google.golang.org/protobuf/proto"
Expand Down Expand Up @@ -310,36 +310,34 @@ func (c messageCarrier) Keys() []string {

const (
// publish span names
publisherSpanName = "send"
publisherSpanName = "create"
publishFlowControlSpanName = "publisher flow control"
publishBatcherSpanName = "publish scheduler"
publishBatcherSpanName = "publisher batching"
publishRPCSpanName = "publish"

// subscribe span names
subscribeReceiveSpanName = "receive"
subscribeSpanName = "subscribe"
subscriberFlowControlSpanName = "subscriber flow control"
subscribeProcessSpanName = "process"
subscribeSchedulerSpanName = "subscribe scheduler"
receiptModAckSpanName = "send initial ModifyAckDeadline"
modAckSpanName = "send ModifyAckDeadline"
ackSpanName = "send Acknowledge"
nackSpanName = "send Negative Acknowledge"
modAckSpanName = "modify ack deadline"
ackSpanName = "ack"
nackSpanName = "nack"

// custom pubsub specific attributes
numBatchedMessagesAttribute = "messaging.gcp.pubsub.num_messages_in_batch"
subscriptionAttribute = "messaging.gcp.pubsub.subscription"
orderingAttribute = "messaging.gcp.pubsub.ordering_key"
deliveryAttemptAttribute = "messaging.gcp.pubsub.delivery_attempt"
eosAttribute = "messaging.gcp.pppubsub.exactly_once_delivery"
ackIDAttribute = "messaging.gcp.pubsub.ack_id"
resultAttribute = "messaging.gcp.pubsub.result"
modackDeadlineSecondsAttribute = "messaging.gcp.pubsub.modack_deadline_seconds"
initialModackAttribute = "messaging.gcp.pubsub.is_initial_modack"
pubsubPrefix = "messaging.gcp_pubsub."
orderingAttribute = pubsubPrefix + "message.ordering_key"
deliveryAttemptAttribute = pubsubPrefix + "message.delivery_attempt"
eosAttribute = pubsubPrefix + "exactly_once_delivery"
ackIDAttribute = pubsubPrefix + "message.ack_id"
resultAttribute = pubsubPrefix + "result"
ackDeadlineSecAttribute = pubsubPrefix + "ack_deadline_seconds"
receiptModackAttribute = pubsubPrefix + "is_receipt_modack"
hongalex marked this conversation as resolved.
Show resolved Hide resolved
)

func startPublishSpan(ctx context.Context, m *Message, topicName string) (context.Context, trace.Span) {
opts := getPublishSpanAttributes(topicName, m)
return tracer().Start(ctx, fmt.Sprintf("%s %s", topicName, publisherSpanName), opts...)
func startPublishSpan(ctx context.Context, m *Message, topicID string) (context.Context, trace.Span) {
opts := getPublishSpanAttributes(topicID, m)
return tracer().Start(ctx, fmt.Sprintf("%s %s", topicID, publisherSpanName), opts...)
}

func startPublishFlowControlSpan(ctx context.Context) (context.Context, trace.Span) {
Expand All @@ -362,10 +360,10 @@ func getPublishSpanAttributes(topic string, msg *Message, opts ...attribute.KeyV
ss := []trace.SpanStartOption{
trace.WithAttributes(
semconv.MessagingSystemKey.String("pubsub"),
semconv.MessagingDestinationKey.String(topic),
semconv.MessagingDestinationKindTopic,
semconv.MessagingMessageIDKey.String(msg.ID),
semconv.MessagingMessagePayloadSizeBytesKey.Int(msgSize),
semconv.MessagingDestinationName(topic),
semconv.MessagingDestinationTemplate("projects/{projectID}/subscriptions/{subscriptionID}"),
semconv.MessagingMessageID(msg.ID),
semconv.MessagingMessagePayloadSizeBytes(msgSize),
attribute.String(orderingAttribute, msg.OrderingKey),
),
trace.WithAttributes(opts...),
Expand All @@ -381,7 +379,7 @@ func injectPropagation(ctx context.Context, msg *Message) {
if msg.Attributes == nil {
msg.Attributes = make(map[string]string)
}
otel.GetTextMapPropagator().Inject(ctx, newMessageCarrier(msg))
propagation.TraceContext{}.Inject(ctx, newMessageCarrier(msg))
}
}

Expand All @@ -394,10 +392,9 @@ func getSubSpanAttributes(sub string, msg *Message, opts ...attribute.KeyValue)
ss := []trace.SpanStartOption{
trace.WithAttributes(
semconv.MessagingSystemKey.String("pubsub"),
semconv.MessagingDestinationKindTopic,
semconv.MessagingMessageIDKey.String(msg.ID),
semconv.MessagingMessagePayloadSizeBytesKey.Int(msgSize),
attribute.String(subscriptionAttribute, sub),
semconv.MessagingDestinationName(sub),
semconv.MessagingMessageID(msg.ID),
semconv.MessagingMessagePayloadSizeBytes(msgSize),
attribute.String(orderingAttribute, msg.OrderingKey),
),
trace.WithAttributes(opts...),
Expand Down
25 changes: 11 additions & 14 deletions pubsub/trace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
"go.opentelemetry.io/otel/sdk/instrumentation"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
semconv "go.opentelemetry.io/otel/semconv/v1.21.0"
"go.opentelemetry.io/otel/trace"
"google.golang.org/protobuf/proto"
)
Expand All @@ -58,7 +58,7 @@ func TestTrace_MessageCarrier(t *testing.T) {
}

newCtx := context.Background()
otel.GetTextMapPropagator().Extract(newCtx, newMessageCarrier(msg))
propagation.TraceContext{}.Extract(newCtx, newMessageCarrier(msg))
if _, ok := msg.Attributes[googclientPrefix+"traceparent"]; !ok {
t.Fatalf("expected traceparent in message attributes, found empty string")
}
Expand Down Expand Up @@ -95,8 +95,7 @@ func TestTrace_PublishSpan(t *testing.T) {
Name: fmt.Sprintf("%s %s", topicName, publisherSpanName),
SpanKind: trace.SpanKindProducer,
Attributes: []attribute.KeyValue{
semconv.MessagingDestinationKindTopic,
semconv.MessagingDestinationKey.String(topicName),
semconv.MessagingDestinationName(topicName),
// Hardcoded since the fake server always returns m0 first.
semconv.MessagingMessageIDKey.String("m0"),
semconv.MessagingMessagePayloadSizeBytesKey.Int(msgSize),
Expand Down Expand Up @@ -126,7 +125,7 @@ func TestTrace_PublishSpan(t *testing.T) {
tracetest.SpanStub{
Name: publishRPCSpanName,
Attributes: []attribute.KeyValue{
attribute.Int(numBatchedMessagesAttribute, 1),
semconv.MessagingBatchMessageCount(1),
},
InstrumentationLibrary: instrumentation.Scope{
Name: "cloud.google.com/go/pubsub",
Expand Down Expand Up @@ -349,10 +348,10 @@ func TestTrace_SubscribeSpans(t *testing.T) {
},
},
tracetest.SpanStub{
Name: fmt.Sprintf("%s %s", subName, subscribeReceiveSpanName),
Name: fmt.Sprintf("%s %s", subName, subscribeSpanName),
SpanKind: trace.SpanKindConsumer,
Attributes: []attribute.KeyValue{
semconv.MessagingDestinationKindTopic,
semconv.MessagingDestinationName(topicName),
// Hardcoded since the fake server always returns m0 first.
semconv.MessagingMessageIDKey.String("m0"),
semconv.MessagingMessagePayloadSizeBytesKey.Int(msgSize),
Expand All @@ -363,8 +362,7 @@ func TestTrace_SubscribeSpans(t *testing.T) {
attribute.String(resultAttribute, "ack"),
attribute.String(orderingAttribute, m.OrderingKey),
semconv.MessagingSystemKey.String("pubsub"),
attribute.Int(numBatchedMessagesAttribute, 1),
attribute.String(subscriptionAttribute, subName),
semconv.MessagingBatchMessageCount(1),
},
InstrumentationLibrary: instrumentation.Scope{
Name: "cloud.google.com/go/pubsub",
Expand Down Expand Up @@ -399,9 +397,9 @@ func TestTrace_SubscribeSpans(t *testing.T) {
Version: internal.Version,
},
Attributes: []attribute.KeyValue{
attribute.Bool(initialModackAttribute, true),
attribute.Int(modackDeadlineSecondsAttribute, 10),
attribute.Int(numBatchedMessagesAttribute, 1),
attribute.Bool(receiptModackAttribute, true),
attribute.Int(ackDeadlineSecAttribute, 10),
semconv.MessagingBatchMessageCount(1),
},
},
}
Expand Down Expand Up @@ -457,8 +455,7 @@ func getPublishSpanStubsWithError(topicName string, m *Message, msgSize int, err
Name: fmt.Sprintf("%s %s", topicName, publisherSpanName),
SpanKind: trace.SpanKindProducer,
Attributes: []attribute.KeyValue{
semconv.MessagingDestinationKindTopic,
semconv.MessagingDestinationKey.String(topicName),
semconv.MessagingDestinationName(topicName),
semconv.MessagingMessageIDKey.String(""),
semconv.MessagingMessagePayloadSizeBytesKey.Int(msgSize),
attribute.String(orderingAttribute, m.OrderingKey),
Expand Down
Loading