[go: up one dir, main page]

Skip to content

Commit

Permalink
feat(bigquery/storage/managedwriter): introduce location routing head…
Browse files Browse the repository at this point in the history
…er (#7663)

This allows for the backend to more efficient route traffic.  Normally we'd extract this from the request, but location is not part of the write identifier.
  • Loading branch information
shollyman committed Mar 31, 2023
1 parent cd36965 commit cf06802
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 4 deletions.
10 changes: 8 additions & 2 deletions bigquery/storage/managedwriter/client.go
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/google/uuid"
"github.com/googleapis/gax-go/v2"
"google.golang.org/api/option"
"google.golang.org/grpc/metadata"
)

// DetectProjectID is a sentinel value that instructs NewClient to detect the
Expand Down Expand Up @@ -217,7 +218,7 @@ func (c *Client) resolvePool(ctx context.Context, settings *streamSettings, stre
}

// No existing pool available, create one for the location and add to shared pools.
pool, err := c.createPool(ctx, nil, streamFunc)
pool, err := c.createPool(ctx, loc, nil, streamFunc)
if err != nil {
return nil, err
}
Expand All @@ -226,13 +227,17 @@ func (c *Client) resolvePool(ctx context.Context, settings *streamSettings, stre
}

// createPool builds a connectionPool.
func (c *Client) createPool(ctx context.Context, settings *streamSettings, streamFunc streamClientFunc) (*connectionPool, error) {
func (c *Client) createPool(ctx context.Context, location string, settings *streamSettings, streamFunc streamClientFunc) (*connectionPool, error) {
cCtx, cancel := context.WithCancel(ctx)

if c.cfg == nil {
cancel()
return nil, fmt.Errorf("missing client config")
}
if location != "" {
// add location header to the retained pool context.
cCtx = metadata.AppendToOutgoingContext(ctx, "x-goog-request-params", fmt.Sprintf("write_location=%s", location))
}
fcRequests := c.cfg.defaultInflightRequests
fcBytes := c.cfg.defaultInflightBytes
arOpts := c.cfg.defaultAppendRowsCallOptions
Expand All @@ -250,6 +255,7 @@ func (c *Client) createPool(ctx context.Context, settings *streamSettings, strea

pool := &connectionPool{
id: newUUID(poolIDPrefix),
location: location,
ctx: cCtx,
cancel: cancel,
open: createOpenF(ctx, streamFunc),
Expand Down
31 changes: 30 additions & 1 deletion bigquery/storage/managedwriter/client_test.go
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/googleapis/gax-go/v2"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)

func TestTableParentFromStreamName(t *testing.T) {
Expand Down Expand Up @@ -53,6 +54,34 @@ func TestTableParentFromStreamName(t *testing.T) {
}
}

func TestCreatePool_Location(t *testing.T) {
c := &Client{
cfg: &writerClientConfig{},
}
pool, err := c.createPool(context.Background(), "foo", nil, nil)
if err != nil {
t.Fatalf("createPool: %v", err)
}
meta, ok := metadata.FromOutgoingContext(pool.ctx)
if !ok {
t.Fatalf("no metadata in outgoing context")
}
vals, ok := meta["x-goog-request-params"]
if !ok {
t.Fatalf("metadata key not present")
}
found := false
for _, v := range vals {
if v == "write_location=foo" {
found = true
break
}
}
if !found {
t.Fatal("expected location header not found")
}
}

// TestCreatePool tests the result of calling createPool with different combinations
// of global configuration and per-writer configuration.
func TestCreatePool(t *testing.T) {
Expand Down Expand Up @@ -126,7 +155,7 @@ func TestCreatePool(t *testing.T) {
c := &Client{
cfg: tc.cfg,
}
got, err := c.createPool(context.Background(), tc.settings, nil)
got, err := c.createPool(context.Background(), "", tc.settings, nil)
if err != nil {
if !tc.wantErr {
t.Errorf("case %q: createPool errored unexpectedly: %v", tc.desc, err)
Expand Down
3 changes: 2 additions & 1 deletion bigquery/storage/managedwriter/connection.go
Expand Up @@ -43,7 +43,8 @@ var (
// The pool retains references to connections, and maintains the mapping between writers
// and connections.
type connectionPool struct {
id string
id string
location string // BQ region associated with this pool.

// the pool retains the long-lived context responsible for opening/maintaining bidi connections.
ctx context.Context
Expand Down

0 comments on commit cf06802

Please sign in to comment.