Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(bigquery/storage/managedwriter): correct reconnection logic #8164

Merged
merged 2 commits into from
Jun 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion bigquery/storage/managedwriter/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,8 +368,12 @@ func (co *connection) lockingAppend(pw *pendingWrite) error {
forceReconnect := false
if pw.writer != nil && pw.descVersion != nil && pw.descVersion.isNewer(pw.writer.curDescVersion) {
pw.writer.curDescVersion = pw.descVersion
if !canMultiplex(pw.writeStreamID) {
if co.optimizer == nil {
forceReconnect = true
} else {
if !co.optimizer.isMultiplexing() {
forceReconnect = true
}
}
}

Expand Down
68 changes: 56 additions & 12 deletions bigquery/storage/managedwriter/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,10 +245,6 @@ func TestIntegration_ManagedWriter(t *testing.T) {
t.Parallel()
testPendingStream(ctx, t, mwClient, bqClient, dataset)
})
t.Run("SchemaEvolution", func(t *testing.T) {
t.Parallel()
testSchemaEvolution(ctx, t, mwClient, bqClient, dataset)
})
t.Run("SimpleCDC", func(t *testing.T) {
t.Parallel()
testSimpleCDC(ctx, t, mwClient, bqClient, dataset)
Expand All @@ -267,6 +263,56 @@ func TestIntegration_ManagedWriter(t *testing.T) {
})
}

// TestIntegration_SchemaEvolution exercises schema evolution against the
// backend under several writer configurations: simplex writers (committed
// and default streams) and a multiplexed default-stream writer.
func TestIntegration_SchemaEvolution(t *testing.T) {

	testcases := []struct {
		desc       string
		clientOpts []option.ClientOption
		writerOpts []WriterOption
	}{
		{
			desc: "Simplex_Committed",
			writerOpts: []WriterOption{
				WithType(CommittedStream),
			},
		},
		{
			desc: "Simplex_Default",
			writerOpts: []WriterOption{
				WithType(DefaultStream),
			},
		},
		{
			desc: "Multiplex_Default",
			clientOpts: []option.ClientOption{
				WithMultiplexing(),
				WithMultiplexPoolLimit(2),
			},
			writerOpts: []WriterOption{
				WithType(DefaultStream),
			},
		},
	}

	for _, tc := range testcases {
		tc := tc // pin loop variable for the subtest closure (pre-Go 1.22 semantics)
		t.Run(tc.desc, func(t *testing.T) {
			// Setup lives inside the subtest so the deferred cleanups fire at
			// the end of each case; deferring inside the surrounding loop
			// would accumulate clients/datasets until the whole test returns.
			mwClient, bqClient := getTestClients(context.Background(), t, tc.clientOpts...)
			defer mwClient.Close()
			defer bqClient.Close()

			dataset, cleanup, err := setupTestDataset(context.Background(), t, bqClient, "asia-east1")
			if err != nil {
				t.Fatalf("failed to init test dataset: %v", err)
			}
			defer cleanup()

			ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
			defer cancel()

			testSchemaEvolution(ctx, t, mwClient, bqClient, dataset, tc.writerOpts...)
		})
	}
}

func testDefaultStream(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
testTable := dataset.Table(tableIDs.New())
if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil {
Expand Down Expand Up @@ -1094,7 +1140,7 @@ func testInstrumentation(ctx context.Context, t *testing.T, mwClient *Client, bq
}
}

func testSchemaEvolution(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
func testSchemaEvolution(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset, opts ...WriterOption) {
testTable := dataset.Table(tableIDs.New())
if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil {
t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
Expand All @@ -1104,11 +1150,9 @@ func testSchemaEvolution(ctx context.Context, t *testing.T, mwClient *Client, bq
descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())

// setup a new stream.
ms, err := mwClient.NewManagedStream(ctx,
WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
WithSchemaDescriptor(descriptorProto),
WithType(CommittedStream),
)
opts = append(opts, WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)))
opts = append(opts, WithSchemaDescriptor(descriptorProto))
ms, err := mwClient.NewManagedStream(ctx, opts...)
if err != nil {
t.Fatalf("NewManagedStream: %v", err)
}
Expand Down Expand Up @@ -1154,7 +1198,7 @@ func testSchemaEvolution(ctx context.Context, t *testing.T, mwClient *Client, bq
// this subjects us to a possible race, as the backend that services GetWriteStream isn't necessarily the
// one in charge of the stream, and thus may report ready early.
for {
resp, err := ms.AppendRows(ctx, [][]byte{latestRow}, WithOffset(curOffset))
resp, err := ms.AppendRows(ctx, [][]byte{latestRow})
if err != nil {
t.Errorf("got error on dupe append: %v", err)
break
Expand All @@ -1181,7 +1225,7 @@ func testSchemaEvolution(ctx context.Context, t *testing.T, mwClient *Client, bq
t.Errorf("failed to marshal evolved message: %v", err)
}
// Send an append with an evolved schema
res, err := ms.AppendRows(ctx, [][]byte{b}, WithOffset(curOffset), UpdateSchemaDescriptor(descriptorProto))
res, err := ms.AppendRows(ctx, [][]byte{b}, UpdateSchemaDescriptor(descriptorProto))
if err != nil {
t.Errorf("failed evolved append: %v", err)
}
Expand Down
22 changes: 22 additions & 0 deletions bigquery/storage/managedwriter/send_optimizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ type sendOptimizer interface {

// optimizeSend handles possible manipulation of a request, and triggers the send.
optimizeSend(arc storagepb.BigQueryWrite_AppendRowsClient, pw *pendingWrite) error

// isMultiplexing tracks if we've actually sent writes to more than a single stream on this connection.
isMultiplexing() bool
}

// verboseOptimizer is a primarily a testing optimizer that always sends the full request.
Expand All @@ -50,6 +53,11 @@ func (vo *verboseOptimizer) optimizeSend(arc storagepb.BigQueryWrite_AppendRowsC
return arc.Send(pw.constructFullRequest(true))
}

// isMultiplexing reports whether this connection has carried traffic for
// multiple streams.
func (vo *verboseOptimizer) isMultiplexing() bool {
	// We declare this false to ensure we always reconnect on schema changes.
	return false
}

// simplexOptimizer is used for connections bearing AppendRowsRequest for only a single stream.
//
// The optimizations here are straightforward:
Expand Down Expand Up @@ -80,6 +88,11 @@ func (so *simplexOptimizer) optimizeSend(arc storagepb.BigQueryWrite_AppendRowsC
return err
}

// isMultiplexing reports whether this connection has carried traffic for
// multiple streams.
func (so *simplexOptimizer) isMultiplexing() bool {
	// A simplex optimizer is not designed for multiplexing, so this is
	// always false.
	return false
}

// multiplexOptimizer is used for connections where requests for multiple default streams are sent on a common
// connection. Only default streams can currently be multiplexed.
//
Expand All @@ -93,10 +106,12 @@ func (so *simplexOptimizer) optimizeSend(arc storagepb.BigQueryWrite_AppendRowsC
type multiplexOptimizer struct {
	prevStream            string             // stream ID of the most recent append sent on this connection
	prevDescriptorVersion *descriptorVersion // descriptor version of the most recent append
	multiplexStreams      bool               // set once appends for more than one stream have been sent; cleared by signalReset
}

// signalReset clears connection-scoped send state: the previously written
// stream, its descriptor version, and the flag recording that traffic for
// multiple streams was observed.
func (mo *multiplexOptimizer) signalReset() {
	mo.prevStream, mo.prevDescriptorVersion, mo.multiplexStreams = "", nil, false
}

Expand Down Expand Up @@ -139,11 +154,18 @@ func (mo *multiplexOptimizer) optimizeSend(arc storagepb.BigQueryWrite_AppendRow
mo.prevStream = pw.writeStreamID
mo.prevDescriptorVersion = pw.descVersion
}
// Also, note that we've sent traffic for multiple streams, which means the backend recognizes this
// is a multiplex stream as well.
mo.multiplexStreams = true
}
}
return err
}

// isMultiplexing reports whether this connection has actually carried
// appends for more than one stream (recorded in optimizeSend, cleared by
// signalReset).
func (mo *multiplexOptimizer) isMultiplexing() bool {
	return mo.multiplexStreams
}

// getDescriptorFromAppend is a utility method for extracting the deeply nested schema
// descriptor from a request. It returns a nil if the descriptor is not set.
func getDescriptorFromAppend(req *storagepb.AppendRowsRequest) *descriptorpb.DescriptorProto {
Expand Down