pubsub: Messages for a key are not received up to an indefinite time when an old stream within the iterator closes and a new stream is opened #10440
This isn't recommended for exactly-once delivery, since you need to check the `AckResult` to make sure the acks actually go through. Is there a reason why you're using the former?
To my knowledge, once a message gets into the library, shutdown is always graceful (messages aren't discarded), even when the streams are recreated. I think your stream count is too high. It's something we just started warning users about, but we strongly recommend using 1 stream.
So we have made the change to use `AckWithResult` instead of `Ack`.
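For the record, a minimal sketch of what that change looks like, assuming placeholder project and subscription IDs (`my-project`, `my-sub`); with exactly-once delivery enabled, `AckResult.Get` blocks until the server confirms the ack:

```go
package main

import (
	"context"
	"log"

	"cloud.google.com/go/pubsub"
)

func main() {
	ctx := context.Background()
	// "my-project" and "my-sub" are placeholders for illustration.
	client, err := pubsub.NewClient(ctx, "my-project")
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	sub := client.Subscription("my-sub")
	err = sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
		// AckWithResult returns an AckResult instead of acking fire-and-forget.
		r := m.AckWithResult()
		status, err := r.Get(ctx)
		if err != nil || status != pubsub.AcknowledgeStatusSuccess {
			// The ack did not go through; the message will be redelivered.
			log.Printf("ack failed (status=%v): %v", status, err)
		}
	})
	if err != nil {
		log.Fatal(err)
	}
}
```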
That's what I was expecting, but it seems that some messages associated with an ordering key are sometimes delayed by up to 10 minutes. This typically occurs after a stream is recreated.
We tried setting NumGoroutines to 1, but this led to a reduction in the number of outstanding messages and caused delays in processing them. I am currently gathering metrics to determine the exact amount of data we're processing before adjusting the NumGoroutines value. Is there a metric in the library that can provide this information? Additionally, why does a higher goroutine count pose a problem? We have also created a Google support ticket that you can look into: https://console.cloud.google.com/support/cases/detail/v2/51980112 Thanks!
Thanks, currently following up on the support ticket but thought I'd update you here as well:
The problem is that there's an unintended behavior here. As mentioned before, each stream can handle 10 MB/s, and looking at your processing speed, it doesn't look like you are hitting that.
Thanks for your response, @hongalex. Your explanation provides helpful context. However, I'm still unclear about how this relates to the issue we're facing. Our workers' outstanding messages are significantly below the MaxOutstandingMessages threshold. Are you observing any server-side metrics indicating that the server is pausing sending messages to a client due to backpressure? Additionally, could you share which metrics you are using to determine that we are processing less than 10 MB/s per client? I am trying to add a metric in our application to measure the payload size of a pubsub.Message. Another detail here is that within the Receive function, when we process a Pub/Sub message, we publish a new message for the same ordering key before calling AckWithResult for the current message. Could that be an issue?
Yes, I can see from internal metrics (which unfortunately aren't public) that your streams are being flow controlled. Strangely, the expirations aren't tied to when the flow control is happening. It might be more correlated to when streams are reopened (as you originally suggested), but it's hard for me to confirm since I'm basing this on those internal metrics.
These are server-side metrics, but I'm seeing them per subscription, rather than per client. You can also estimate this if you know the average size of a message.
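As a back-of-the-envelope check against the 10 MB/s per-stream limit, the estimate above can be computed from the subscription's message rate and average payload size. The figures below (2,000 msgs/s, 4 KiB average, 10 streams) are purely illustrative placeholders:

```go
package main

import "fmt"

func main() {
	// Hypothetical figures for illustration; substitute your subscription's
	// actual message rate, average payload size, and NumGoroutines value.
	const msgsPerSec = 2000.0
	const avgMsgBytes = 4096.0
	const streams = 10.0 // NumGoroutines

	// Total bytes/s spread across streams, converted to MiB/s.
	perStreamMBps := msgsPerSec * avgMsgBytes / streams / (1024 * 1024)
	fmt.Printf("estimated throughput per stream: %.2f MB/s\n", perStreamMBps)
	// → estimated throughput per stream: 0.78 MB/s
}
```

With these numbers each stream carries well under 10 MB/s, which is the kind of comparison being made above.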
I don't think that's a problem, since the publish and subscribe sides are fairly decoupled.
We tried the branch you shared with us on another project/subscription/topic, but it hasn't resolved the issue. I've added the details to the Google support ticket. We are still seeing a significant dip in the number of outstanding messages as perceived by the client at random intervals, and messages for some ordering keys are delayed by 5 to 10 minutes or more. Here's the process we follow:
At this point, we expect to see the next message (m2) within a few seconds, but sometimes it is delayed by 5 minutes or more. This issue affects not just one ordering key but most of them, which impacts our application as we need to process the messages in under 5 minutes. It seems like either the server stops sending messages for certain ordering keys, or the client has them but isn't processing them. Any ideas on how to debug this or what to try next?
Sorry that the branch didn't fix your issue. I had suggested that fix since there is a known limitation with stream reconnection and excess messages being delivered.
So in this case, it seems like your issue is more closely tied to exactly-once delivery and ordering, and potentially the way you nack messages. Someone else is actively looking at that ticket and you should expect further updates/guidance there rather than here. I'll be following the ticket closely too.
I'm working with @pawanrawal on this issue. I've deployed the suggestion from the support case that we create multiple subscribers.
I answered the previous question on the ticket but posting here for posterity: you should create a custom tag key and attach it to the context passed into `sub.Receive`:

```go
applicationKey, _ := tag.NewKey("applicationKey")

// Use this ctx to pass into sub.Receive. Change "application1" to be a unique
// value to identify the various subscriber clients.
ctx, _ := tag.New(context.Background(),
	tag.Insert(applicationKey, "application1"))
...

// Append the new key to the OpenCensus view's TagKeys before registering.
msgView := pubsub.OutstandingMessagesView
msgView.TagKeys = append(msgView.TagKeys, applicationKey)
if err := view.Register(msgView); err != nil {
	log.Fatalf("failed to register views: %v", err)
}
```

It is important to note that OpenCensus has been deprecated and we will eventually be moving to OpenTelemetry metrics. We will give sufficient notice before making any breaking changes, and may continue to support OC metrics even after they're marked deprecated.

Closing the original issue since the investigation and fix are not currently tied to the client library, but rather to the behavior of using exactly-once and ordering together.
Client
PubSub v1.38.0
Environment
GKE
Go Environment
NA
Code
We are using Google Pub/Sub with exactly-once and ordered delivery enabled on a topic. For acknowledging messages, we use `Ack` instead of `AckWithResult`. I've observed that whenever a new stream is opened because the Pub/Sub server closed an old stream (part of the iterator), messages that have been Ack'ed stop being received, usually until the ack deadline is reached, and sometimes even longer (say 8 or 12 minutes) when the deadline is 2 minutes.
It seems that after a stream is closed because the Pub/Sub server sent a GOAWAY, the library fails to acknowledge the messages that the application Ack'ed, so they are redelivered once the server's ack deadline expires. Could it be that when streams are closed, the library discards the messages that were accepted for acking, or doesn't retry on certain gRPC error codes when it should?
NumGoroutines: 10
MaxOutstandingMessages: 4000
MinExtensionPeriod: 2m
MaxExtensionPeriod: 2m
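For reference, those settings correspond to the following `pubsub.ReceiveSettings` fields in the Go client; `sub` is assumed to be a `*pubsub.Subscription` obtained from the client (configuration fragment, not a complete program):

```go
// Assumes: import "time" and "cloud.google.com/go/pubsub",
// and sub := client.Subscription("my-sub").
sub.ReceiveSettings = pubsub.ReceiveSettings{
	NumGoroutines:          10,   // number of streaming pull streams
	MaxOutstandingMessages: 4000, // flow-control cap on unacked messages
	MinExtensionPeriod:     2 * time.Minute,
	MaxExtensionPeriod:     2 * time.Minute,
}
```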
Expected behavior
I would have expected the messages to show up immediately as they were acked by the application.
Actual behavior
The messages for a given key do not show up for a couple of minutes (`MaxExtensionPeriod` is 2 minutes) and then they show up.
Screenshots