
feat(spanner): Add RESOURCE_EXHAUSTED to retryable transaction codes #10412

Merged
merged 12 commits into from Jul 18, 2024
feat: Add RESOURCE_EXHAUSTED to retryable transaction codes
Update formatting
Vizerai committed Jun 21, 2024
commit e697a1ea59bd41a091faab13b83ff7639689a3b3
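Reviewer note: in effect, this change adds codes.ResourceExhausted to the set of gRPC codes the transaction runner retries with backoff. A minimal sketch of that set, using a hypothetical helper (isRetryableTransactionCode is illustrative and not part of the diff):

package main

import (
	"fmt"

	"google.golang.org/grpc/codes"
)

// isRetryableTransactionCode mirrors the code set this PR passes to
// onCodes in spanner/retry.go; the helper itself is not part of the diff.
func isRetryableTransactionCode(c codes.Code) bool {
	switch c {
	case codes.Aborted, codes.ResourceExhausted, codes.Internal:
		return true
	default:
		return false
	}
}

func main() {
	// Before this PR, ResourceExhausted was not in the retryable set.
	fmt.Println(isRetryableTransactionCode(codes.ResourceExhausted)) // true
}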
163 changes: 83 additions & 80 deletions spanner/retry.go
@@ -16,75 +16,77 @@ limitations under the License.

package spanner

import (
	"context"
	"strings"
	"time"

	"cloud.google.com/go/internal/trace"
	"github.com/golang/protobuf/ptypes"
	"github.com/googleapis/gax-go/v2"
	"google.golang.org/genproto/googleapis/rpc/errdetails"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

const (
	retryInfoKey = "google.rpc.retryinfo-bin"
)

// DefaultRetryBackoff is used for retryers as a fallback value when the server
// did not return any retry information.
var DefaultRetryBackoff = gax.Backoff{
	Initial:    20 * time.Millisecond,
	Max:        32 * time.Second,
	Multiplier: 1.3,
}
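Reviewer note: the gax.Backoff above yields randomized, exponentially growing delays. A runnable sketch of the fallback schedule, using the same values as DefaultRetryBackoff:

package main

import (
	"fmt"
	"time"

	"github.com/googleapis/gax-go/v2"
)

func main() {
	bo := gax.Backoff{
		Initial:    20 * time.Millisecond,
		Max:        32 * time.Second,
		Multiplier: 1.3,
	}
	// Each Pause draws a randomized delay bounded by the current ceiling,
	// then multiplies the ceiling by 1.3, capped at 32s.
	for i := 0; i < 5; i++ {
		fmt.Println(bo.Pause())
	}
}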

// spannerRetryer extends the generic gax Retryer, but also checks for any
// retry info returned by Cloud Spanner and uses that if present.
type spannerRetryer struct {
	gax.Retryer
}

// onCodes returns a spannerRetryer that will retry on the specified error
// codes. For Internal errors, only errors that have one of a list of known
// descriptions should be retried.
func onCodes(bo gax.Backoff, cc ...codes.Code) gax.Retryer {
	return &spannerRetryer{
		Retryer: gax.OnCodes(cc, bo),
	}
}
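Reviewer note: since onCodes and spannerRetryer are unexported, here is a sketch of the equivalent behavior using only the public gax API (the error messages are made up):

package main

import (
	"fmt"
	"time"

	"github.com/googleapis/gax-go/v2"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

func main() {
	r := gax.OnCodes([]codes.Code{codes.Aborted, codes.ResourceExhausted, codes.Internal}, gax.Backoff{
		Initial:    20 * time.Millisecond,
		Max:        32 * time.Second,
		Multiplier: 1.3,
	})
	// Retryable code: returns a backoff delay and true.
	d, ok := r.Retry(status.Error(codes.ResourceExhausted, "transaction throttled"))
	fmt.Println(d, ok)
	// Non-retryable code: returns 0 and false.
	d, ok = r.Retry(status.Error(codes.NotFound, "database not found"))
	fmt.Println(d, ok)
}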

// Retry returns the retry delay returned by Cloud Spanner if that is present.
// Otherwise it returns the retry delay calculated by the generic gax Retryer.
func (r *spannerRetryer) Retry(err error) (time.Duration, bool) {
	if status.Code(err) == codes.Internal &&
		!strings.Contains(err.Error(), "stream terminated by RST_STREAM") &&
		// See b/25451313.
		!strings.Contains(err.Error(), "HTTP/2 error code: INTERNAL_ERROR") &&
		// See b/27794742.
		!strings.Contains(err.Error(), "Connection closed with unknown cause") &&
		!strings.Contains(err.Error(), "Received unexpected EOS on DATA frame from server") {
		return 0, false
	}

	delay, shouldRetry := r.Retryer.Retry(err)
	if !shouldRetry {
		return 0, false
	}
	if serverDelay, hasServerDelay := ExtractRetryDelay(err); hasServerDelay {
		delay = serverDelay
	}
	return delay, true
}
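Reviewer note: the Internal-error allowlist in Retry can be restated as a standalone predicate, which may be easier to eyeball. This re-statement is illustrative only and is not exported by the spanner package:

package main

import (
	"fmt"
	"strings"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// retryableInternal reports whether an Internal error carries one of the
// known transient descriptions checked in spannerRetryer.Retry above.
func retryableInternal(err error) bool {
	if status.Code(err) != codes.Internal {
		return false
	}
	for _, msg := range []string{
		"stream terminated by RST_STREAM",
		"HTTP/2 error code: INTERNAL_ERROR",
		"Connection closed with unknown cause",
		"Received unexpected EOS on DATA frame from server",
	} {
		if strings.Contains(err.Error(), msg) {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(retryableInternal(status.Error(codes.Internal, "stream terminated by RST_STREAM"))) // true
	fmt.Println(retryableInternal(status.Error(codes.Internal, "unknown internal failure")))        // false
}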

// runWithRetryOnAbortedOrFailedInlineBeginOrSessionNotFound executes the given
// function and retries it if it returns an Aborted error, a 'Session not
// found' error, or certain Internal errors. The retry is delayed if the error
// was Aborted or Internal. The delay between retries is the delay returned by
// Cloud Spanner, or if none is returned, the calculated delay with a minimum
// of 10ms and maximum of 32s. There is no delay before the retry if the error
// was 'Session not found' or a failed inline begin transaction.
func runWithRetryOnAbortedOrFailedInlineBeginOrSessionNotFound(ctx context.Context, f func(context.Context) error) error {
	retryer := onCodes(DefaultRetryBackoff, codes.Aborted, codes.ResourceExhausted, codes.Internal)
	funcWithRetry := func(ctx context.Context) error {
@@ -124,35 +126,36 @@ func runWithRetryOnAbortedOrFailedInlineBeginOrSessionNotFound(ctx context.Conte
				return err
			}
			trace.TracePrintf(ctx, nil, "Backing off after ABORTED for %s, then retrying", delay)
			if err := gax.Sleep(ctx, delay); err != nil {
				return err
			}
		}
	}
	return funcWithRetry(ctx)
}
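Reviewer note: stripped of the session-not-found and inline-begin special cases (and of the Internal-error filtering), the loop above follows a common retry pattern. A self-contained sketch using only public APIs; runWithRetry is a hypothetical name, not the library's:

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/googleapis/gax-go/v2"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// runWithRetry is a simplified analogue of the helper above: retry f on the
// same code set, sleeping for the retryer's suggested delay in between.
func runWithRetry(ctx context.Context, f func(context.Context) error) error {
	retryer := gax.OnCodes([]codes.Code{codes.Aborted, codes.ResourceExhausted, codes.Internal}, gax.Backoff{
		Initial:    20 * time.Millisecond,
		Max:        32 * time.Second,
		Multiplier: 1.3,
	})
	for {
		err := f(ctx)
		if err == nil {
			return nil
		}
		delay, shouldRetry := retryer.Retry(err)
		if !shouldRetry {
			return err
		}
		// gax.Sleep returns early with ctx.Err() if the context is cancelled.
		if err := gax.Sleep(ctx, delay); err != nil {
			return err
		}
	}
}

func main() {
	attempts := 0
	err := runWithRetry(context.Background(), func(context.Context) error {
		attempts++
		if attempts < 3 {
			return status.Error(codes.Aborted, "transaction aborted")
		}
		return nil
	})
	fmt.Println(attempts, err) // 3 <nil>
}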

// ExtractRetryDelay extracts retry backoff from a *spanner.Error if present.
func ExtractRetryDelay(err error) (time.Duration, bool) {
	var se *Error
	var s *status.Status
	// Unwrap status error.
	if errorAs(err, &se) {
		s = status.Convert(se.Unwrap())
	} else {
		s = status.Convert(err)
	}
	if s == nil {
		return 0, false
	}
	for _, detail := range s.Details() {
		if retryInfo, ok := detail.(*errdetails.RetryInfo); ok {
			delay, err := ptypes.Duration(retryInfo.RetryDelay)
			if err != nil {
				return 0, false
			}
			return delay, true
		}
	}
	return 0, false
}
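Reviewer note: to see ExtractRetryDelay's input format end to end, one can build a gRPC status carrying errdetails.RetryInfo, the way the backend attaches its suggested delay, and read it back with the same ptypes call (the status message here is made up):

package main

import (
	"fmt"
	"time"

	"github.com/golang/protobuf/ptypes"
	"google.golang.org/genproto/googleapis/rpc/errdetails"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

func main() {
	// Attach RetryInfo to a status, as a server would.
	st, err := status.New(codes.ResourceExhausted, "transaction throttled").
		WithDetails(&errdetails.RetryInfo{
			RetryDelay: ptypes.DurationProto(250 * time.Millisecond),
		})
	if err != nil {
		panic(err)
	}
	// Read it back the same way ExtractRetryDelay does.
	for _, detail := range st.Details() {
		if retryInfo, ok := detail.(*errdetails.RetryInfo); ok {
			delay, _ := ptypes.Duration(retryInfo.RetryDelay)
			fmt.Println(delay) // 250ms
		}
	}
}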