[go: up one dir, main page]

Skip to content

Commit

Permalink
fix(bigquery/storage/managedwriter): retry improvements (#9642)
Browse files Browse the repository at this point in the history
This PR makes two changes to retry behaviors in managedwriter.

In the first, this PR expands the set of conditions that trigger reconnect when sending the initial request to the backend.

In the second, this PR adds some additional handling for context cancellations when reading responses back from the service. In cases like reconnection, we establish a new Connection, each of which has its own associated context. When draining remaining writes from a connection that's being shut down, we now pass the write into a retryer with a status-based error rather than raw context.Canceled, so we can recover more cleanly if the user is leveraging write retries.

Related internal issue:
b/326242484
  • Loading branch information
shollyman committed Mar 25, 2024
1 parent a7abf56 commit 48a9258
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 14 deletions.
27 changes: 21 additions & 6 deletions bigquery/storage/managedwriter/connection.go
Expand Up @@ -498,24 +498,39 @@ func (co *connection) getStream(arc *storagepb.BigQueryWrite_AppendRowsClient, f
// enables testing
type streamClientFunc func(context.Context, ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error)

var errConnectionCanceled = grpcstatus.Error(codes.Canceled, "client connection context was canceled")

// connRecvProcessor is used to propagate append responses back up with the originating write requests.
// It runs as a goroutine. A connection object allows for reconnection, and each reconnection establishes a new
// processing gorouting and backing channel.
// context, processing goroutine and backing channel.
func connRecvProcessor(ctx context.Context, co *connection, arc storagepb.BigQueryWrite_AppendRowsClient, ch <-chan *pendingWrite) {
for {
select {
case <-ctx.Done():
// Context is done, so we're not going to get further updates. Mark all work left in the channel
// with the context error. We don't attempt to re-enqueue in this case.
// Channel context is done, which means we're not getting further updates on in flight appends and should
// process everything left in the existing channel/connection.
doneErr := ctx.Err()
if doneErr == context.Canceled {
// This is a special case. Connection recovery ends up cancelling a context as part of a reconnection, and with
// request retrying enabled we can possibly re-enqueue writes. To allow graceful retry for this behavior,
// we translate this to an rpc status error to avoid doing things like introducing context errors as part of the retry predicate.
//
// The tradeoff here is that write retries may roundtrip multiple times for something like a pool shutdown, even though the final
// outcome would result in an error.
doneErr = errConnectionCanceled
}
for {
pw, ok := <-ch
if !ok {
return
}
// It's unlikely this connection will recover here, but for correctness keep the flow controller
// state correct by releasing.
// This connection will not recover, but still attempt to keep flow controller state consistent.
co.release(pw)
pw.markDone(nil, ctx.Err())

// TODO: Determine if/how we should report this case, as we have no viable context for propagating.

// Because we can't tell locally if this write is done, we pass it back to the retrier for possible re-enqueue.
pw.writer.processRetry(pw, co, nil, doneErr)
}
case nextWrite, ok := <-ch:
if !ok {
Expand Down
22 changes: 16 additions & 6 deletions bigquery/storage/managedwriter/retry.go
Expand Up @@ -130,13 +130,23 @@ func (sr *statelessRetryer) Retry(err error, attemptCount int) (time.Duration, b
// our bidi stream to close/reopen based on the responses error. Errors here signal that no
// further appends will succeed.
func shouldReconnect(err error) bool {
var knownErrors = []error{
io.EOF,
status.Error(codes.Unavailable, "the connection is draining"), // errStreamDrain in gRPC transport

// io.EOF is the typical not connected signal.
if errors.Is(err, io.EOF) {
return true
}
// Backend responses that trigger reconnection on send.
reconnectCodes := []codes.Code{
codes.Aborted,
codes.Canceled,
codes.Unavailable,
codes.DeadlineExceeded,
}
for _, ke := range knownErrors {
if errors.Is(err, ke) {
return true
if s, ok := status.FromError(err); ok {
for _, c := range reconnectCodes {
if s.Code() == c {
return true
}
}
}
return false
Expand Down
21 changes: 19 additions & 2 deletions bigquery/storage/managedwriter/retry_test.go
Expand Up @@ -15,6 +15,7 @@
package managedwriter

import (
"context"
"fmt"
"io"
"testing"
Expand Down Expand Up @@ -60,6 +61,10 @@ func TestManagedStream_AppendErrorRetries(t *testing.T) {
err: status.Error(codes.ResourceExhausted, "Exceeds 'AppendRows throughput' quota for some reason"),
want: true,
},
{
err: context.Canceled,
want: false,
},
}

retry := newStatelessRetryer()
Expand All @@ -86,11 +91,23 @@ func TestManagedStream_ShouldReconnect(t *testing.T) {
want: true,
},
{
err: status.Error(codes.Unavailable, "nope"),
err: status.Error(codes.Unavailable, "the connection is draining"),
want: true,
},
{
err: status.Error(codes.ResourceExhausted, "oof"), // may just be pushback
want: false,
},
{
err: status.Error(codes.Unavailable, "the connection is draining"),
err: status.Error(codes.Canceled, "blah"),
want: true,
},
{
err: status.Error(codes.Aborted, "connection has been idle too long"),
want: true,
},
{
err: status.Error(codes.DeadlineExceeded, "blah"), // possibly bad backend, reconnect to speed recovery.
want: true,
},
{
Expand Down

0 comments on commit 48a9258

Please sign in to comment.