Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pubsub: Messages for a key are not received up to an indefinite time when an old stream within the iterator closes and a new stream is opened #10440

Closed
pawanrawal opened this issue Jun 26, 2024 · 9 comments
Assignees
Labels
api: pubsub Issues related to the Pub/Sub API. status: investigating The issue is under investigation, which is determined to be non-trivial.

Comments

@pawanrawal
Copy link

pawanrawal commented Jun 26, 2024

Client

PubSub v1.38.0

Environment

GKE

Go Environment

NA

Code

We are using Google Pub/Sub with exactly-once and ordered delivery enabled on a topic. For acknowledging messages, we use "Ack" instead of "AckWithResult". I've observed that whenever a new stream is opened due to the Pub/Sub server closing an old stream (part of the iterator), messages that have been Ack'ed stop being received mostly until the ack deadline is reached or sometimes even longer (say 8 or 12 mins) when the deadline is 2m.

It seems that after a stream is closed because the Pub/Sub server sent a GOAWAY, the library fails to acknowledge the messages that were Ack'ed using ACK until the server's ack deadline is reached, and then they are resent for processing. Could it be that when streams are closed, the library discards the messages that were accepted for ACKing or doesn't retry for certain gRPC error codes when it should?

NumGoRoutines: 10
MaxOutstandingMessages: 4000
MinExtensionTimePeriod: 2m
MaxExtensionTimePeriod: 2m

Expected behavior

I would have expected the messages to show up immediately as they were acked by the application.

Actual behavior

The messages for a given key do not show up for a couple of couple of mins ("MaxExtensionPeriod" is 2 mins) and then they show up.

Screenshots

  • logs showing 2 min gap between a message being acked and then being received again
Screenshot 2024-06-26 at 20 57 01
  • Expired ack deadlines graph from the pubsub server
Screenshot 2024-06-26 at 20 58 52
  • Graph showing increase in open_stream_count around 20:25 when we saw that the message was delayed by a couple of mins
Screenshot 2024-06-26 at 21 09 07
@pawanrawal pawanrawal added the triage me I really want to be triaged. label Jun 26, 2024
@product-auto-label product-auto-label bot added the api: pubsub Issues related to the Pub/Sub API. label Jun 26, 2024
@pawanrawal pawanrawal changed the title pubsub: Messages are not acked when an old stream within the iterator closes and a new stream is opened pubsub: Messages for a key are not received up to MaxExtensionPeriod when an old stream within the iterator closes and a new stream is opened Jun 26, 2024
@pawanrawal pawanrawal changed the title pubsub: Messages for a key are not received up to MaxExtensionPeriod when an old stream within the iterator closes and a new stream is opened pubsub: Messages for a key are not received up to an indefinite team when an old stream within the iterator closes and a new stream is opened Jun 26, 2024
@hongalex hongalex added status: investigating The issue is under investigation, which is determined to be non-trivial. and removed triage me I really want to be triaged. labels Jun 26, 2024
@hongalex
Copy link
Member

hongalex commented Jun 26, 2024

For acknowledging messages, we use "Ack" instead of "AckWithResult".

This isn't recommended for exactly once delivery, since you need to check the AckResult to make sure the acks actually go through. Is there a reason why you're using the former?

Could it be that when streams are closed, the library discards the messages that were accepted for ACKing or doesn't retry for certain gRPC error codes when it should?

To my knowledge, once a message gets into the library sections, there is always a graceful shutdown (it doesn't discard messages), even when the streams are recreated.

I think your stream count is too high. It's something we just started warning users about, but we strongly recommend using 1 stream (sub.ReceiveSettings.NumGoroutines=1) if your processing is under 10 MB/s. If higher, I recommend horizontal scaling (creating more subscriber applications). Could you try lowering that to see if this helps?

@pawanrawal
Copy link
Author

So we have made the change to use "AckResult"instead of "Ack"

To my knowledge, once a message gets into the library sections, there is always a graceful shutdown (it doesn't discard messages), even when the streams are recreated.

That's what I was expecting, but it seems that some messages associated with an ordering key are sometimes delayed by up to 10 minutes. This typically occurs after a stream is recreated.

I think your stream count is too high. It's something we just started warning users about, but we strongly recommend using 1 stream (sub.ReceiveSettings.NumGoroutines=1) if your processing is under 10 MB/s. If higher, I recommend horizontal scaling (creating more subscriber applications). Could you try lowering that to see if this helps?

We tried setting NumGoroutines to 1, but this led to a reduction in the number of outstanding messages and caused delays in processing them. I am currently gathering metrics to determine the exact amount of data we're processing before adjusting the NumGoroutines value. Is there a metric in the library that can provide this information? Additionally, why does a higher goroutine count pose a problem?

We have also created a Google support ticket that you can look into: https://console.cloud.google.com/support/cases/detail/v2/51980112

Thanks!

@pawanrawal pawanrawal changed the title pubsub: Messages for a key are not received up to an indefinite team when an old stream within the iterator closes and a new stream is opened pubsub: Messages for a key are not received up to an indefinite time when an old stream within the iterator closes and a new stream is opened Jun 27, 2024
@hongalex
Copy link
Member

Thanks, currently following up on the support ticket but thought I'd update you here as well:

We tried setting NumGoroutines to 1, but this led to a reduction in the number of outstanding messages and caused delays in processing them. I am currently gathering metrics to determine the exact amount of data we're processing before adjusting the NumGoroutines value. Is there a metric in the library that can provide this information? Additionally, why does a higher goroutine count pose a problem?

The problem is that there's an unintended behavior where MaxOutstandingMessages isn't being respected when NumGoroutines is > 1. This setting configures how many streaming pull connections we open, where each stream currently tells the server "I want you to give me 4,000 messages" at a time, using your numbers. The client is still limited by 4,000 messages total, but if you have 10 streams, the server will try to deliver 40,000 messages at once. The client will start processing 4,000, and pause on the remaining 36,000 while the others are being processed. This is likely what is contributing to message expirations.

As mentioned before, each stream can handle 10 MB/s, and looking at your processing speed, it doesn't look like you are hitting that.

@pawanrawal
Copy link
Author

Thanks for your response, @hongalex. Your explanation provides helpful context. However, I'm still unclear about how this relates to the issue we're facing. Our workers' outstanding messages are significantly below the MaxOutstandingMessages threshold. Are you observing any server-side metrics indicating that the server is pausing to send metrics to a client due to backpressure? Additionally, could you share which metrics you are using to determine that we are processing less than 10MB/s per client? I am trying to add a metric in our application to measure the payload size of a pubsub.Message.

Another detail here is that within the Receive function, when we process a pubsub message, we publish a new message for the same ordering key before calling AckWithResult for the current message. Could that be an issue?

@hongalex
Copy link
Member

hongalex commented Jun 28, 2024

Are you observing any server-side metrics indicating that the server is pausing to send metrics to a client due to backpressure?

Yes, I can see that your streams are being flow controlled, which unfortunately aren't public. Strangely, the expirations aren't tied to when the flow control is happening. It might be more correlated to when streams are reopened (as you originally suggested) but it's hard for me to see since I'm basing this on subscription/open_streaming_pulls which fluctuates as subscriber clients come up and spin down.

Additionally, could you share which metrics you are using to determine that we are processing less than 10MB/s per client? I am trying to add a metric in our application to measure the payload size of a pubsub.Message.

These are server side metrics, but I'm seeing it per subscription, rather than per client. You can also estimate this if you know the average size of the message.

Another detail here is that within the Receive function, when we process a pubsub message, we publish a new message for the same ordering key before calling AckWithResult for the current message. Could that be an issue?

I don't think that's a problem, since the publish and subscribe side is fairly decoupled.

@pawanrawal
Copy link
Author

pawanrawal commented Jul 2, 2024

We tried the branch you shared with us on another project/subscription/topic, but it hasn't resolved the issue. I've added the details to the Google support ticket: Google Support Ticket.

We are still seeing a significant dip in the number of outstanding messages as perceived by the client at random intervals. Messages for some ordering keys are delayed by 5 to 10 minutes or more. Here's the process we follow:

  1. Receive a Pub/Sub message for an ordering key (m1).
  2. Process the message.
  3. Write the next message for the same ordering key (m2).
  4. Acknowledge the old message (m1).

At this point, we expect to see the next message (m2) within a few seconds, but sometimes it is delayed by 5 minutes or more. This issue affects not just one ordering key but most of them, which impacts our application as we need to process the messages in under 5 minutes. It seems like either the server stops sending messages for certain ordering keys or the client has them but isn't processing them. Any ideas on how to debug this or what to try next?

@hongalex
Copy link
Member

hongalex commented Jul 2, 2024

Sorry that the branch didn't fix your issue. I had suggested that fix since there is a known limitation with stream reconnecting and excess messages delivered.

It seems like either the server stops sending messages for certain ordering keys or the client has them but isn't processing them

So in this case, it seems like your issue is more closely tied to exactly once delivery and ordering, and potentially the way you nack messages. Someone else is actively looking at that ticket and you should expect further updates/guidance on the ticket rather than here for that. I'll be following the ticket closely too.

@maybelle
Copy link

I'm working with @pawanrawal on this issue. I've deployed the suggestion from the support case that we create multiple subscribers with NumGoroutines=1 within the same instance to mitigate the stream reset issues somewhat and it does seem to have a small positive effect. However, one unfortunate side effect is that the tracing metrics now are incorrect because there's a single shared metric for the client across multiple subscriber objects. I've been specifically tracking opencensus_cloud_google_com_go_pubsub_outstanding_messages but I'm sure the other metrics have similar issues. Would you be able to support a fix for this in the client?

@hongalex
Copy link
Member

I answered the previous question on the ticket but posting here for posterity:

You should create a custom TagKey and tag your application, which is recommended for OpenCensus. This roughly requires doing something like the following

applicationKey, _ := tag.NewKey("applicationKey")
// Use this ctx to pass into sub.Receive. Change "application1" to be a unique value to identify
// the various subscriber clients.
ctx, _ := tag.New(context.Background(),
	tag.Insert(applicationKey, "application1"))
...
// Append the new key to the OpenCensus view's TagKeys before registering
msgView := pubsub.OutstandingMessagesView
msgView.TagKeys = append(msgView.TagKeys, applicationKey)

if err := view.Register(msgView); err != nil {
	log.Fatalf("failed to register views: %v", err)
}

It is important to note that OpenCensus has been deprecated and we will eventually be moving to OpenTelemetry metrics. We will give sufficient notice beforehand before making any breaking changes and may continue to support OC metrics even after they're marked deprecated.

Closing the original issue since the investigation and fix is not currently tied to the client library and behavior of using exactly+ordering together.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
api: pubsub Issues related to the Pub/Sub API. status: investigating The issue is under investigation, which is determined to be non-trivial.
Projects
None yet
Development

No branches or pull requests

3 participants