[go: up one dir, main page]

Skip to content

Commit

Permalink
fix(bigquery/storage/managedwriter): remove old header routing (#6960)
Browse files Browse the repository at this point in the history
Previously, we needed to inject a routing header when opening the bidi AppendRows connection so that traffic was routed to the correct region. Routing no longer needs this explicit hint; it now works by examining the requests on the stream.

This change removes the header injection and also simplifies the contract of the open function so that it no longer takes a stream ID, which was in place solely for the injection.

Related internal issue: 185842996
  • Loading branch information
shollyman committed Oct 28, 2022
1 parent bda33ab commit 434b407
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 18 deletions.
9 changes: 3 additions & 6 deletions bigquery/storage/managedwriter/client.go
Expand Up @@ -27,7 +27,6 @@ import (
"google.golang.org/api/option"
storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)

// DetectProjectID is a sentinel value that instructs NewClient to detect the
Expand Down Expand Up @@ -93,11 +92,9 @@ func (c *Client) NewManagedStream(ctx context.Context, opts ...WriterOption) (*M
}

// createOpenF builds the opener function we need to access the AppendRows bidi stream.
func createOpenF(ctx context.Context, streamFunc streamClientFunc) func(streamID string, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
return func(streamID string, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
arc, err := streamFunc(
// Bidi Streaming doesn't append stream ID as request metadata, so we must inject it manually.
metadata.AppendToOutgoingContext(ctx, "x-goog-request-params", fmt.Sprintf("write_stream=%s", streamID)), opts...)
func createOpenF(ctx context.Context, streamFunc streamClientFunc) func(opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
return func(opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
arc, err := streamFunc(ctx, opts...)
if err != nil {
return nil, err
}
Expand Down
10 changes: 3 additions & 7 deletions bigquery/storage/managedwriter/managed_stream.go
Expand Up @@ -82,8 +82,8 @@ type ManagedStream struct {
// aspects of the stream client
ctx context.Context // retained context for the stream
cancel context.CancelFunc
callOptions []gax.CallOption // options passed when opening an append client
open func(streamID string, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) // how we get a new connection
callOptions []gax.CallOption // options passed when opening an append client
open func(opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) // how we get a new connection

mu sync.Mutex
arc *storagepb.BigQueryWrite_AppendRowsClient // current stream connection
Expand Down Expand Up @@ -225,11 +225,7 @@ func (ms *ManagedStream) openWithRetry() (storagepb.BigQueryWrite_AppendRowsClie
r := &unaryRetryer{}
for {
recordStat(ms.ctx, AppendClientOpenCount, 1)
streamID := ""
if ms.streamSettings != nil {
streamID = ms.streamSettings.streamID
}
arc, err := ms.open(streamID, ms.callOptions...)
arc, err := ms.open(ms.callOptions...)
bo, shouldRetry := r.Retry(err)
if err != nil && shouldRetry {
recordStat(ms.ctx, AppendClientOpenRetryCount, 1)
Expand Down
10 changes: 5 additions & 5 deletions bigquery/storage/managedwriter/managed_stream_test.go
Expand Up @@ -64,7 +64,7 @@ func TestManagedStream_OpenWithRetry(t *testing.T) {
for _, tc := range testCases {
ms := &ManagedStream{
ctx: context.Background(),
open: func(s string, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
open: func(opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
if len(tc.errors) == 0 {
panic("out of errors")
}
Expand Down Expand Up @@ -122,7 +122,7 @@ func (tarc *testAppendRowsClient) CloseSend() error {
}

// openTestArc handles wiring in a test AppendRowsClient into a managedstream by providing the open function.
func openTestArc(testARC *testAppendRowsClient, sendF func(req *storagepb.AppendRowsRequest) error, recvF func() (*storagepb.AppendRowsResponse, error)) func(s string, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
func openTestArc(testARC *testAppendRowsClient, sendF func(req *storagepb.AppendRowsRequest) error, recvF func() (*storagepb.AppendRowsResponse, error)) func(opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
sF := func(req *storagepb.AppendRowsRequest) error {
testARC.requests = append(testARC.requests, req)
return nil
Expand All @@ -143,7 +143,7 @@ func openTestArc(testARC *testAppendRowsClient, sendF func(req *storagepb.Append
testARC.closeF = func() error {
return nil
}
return func(s string, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
return func(opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
testARC.openCount = testARC.openCount + 1
return testARC, nil
}
Expand Down Expand Up @@ -397,14 +397,14 @@ func TestManagedStream_AppendDeadlocks(t *testing.T) {
openF := openTestArc(&testAppendRowsClient{}, nil, nil)
ms := &ManagedStream{
ctx: context.Background(),
open: func(s string, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
open: func(opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
if len(tc.openErrors) == 0 {
panic("out of open errors")
}
curErr := tc.openErrors[0]
tc.openErrors = tc.openErrors[1:]
if curErr == nil {
return openF(s, opts...)
return openF(opts...)
}
return nil, curErr
},
Expand Down

0 comments on commit 434b407

Please sign in to comment.