From 2c14af5b93a1547856b6650e66998e3e27b90c8b Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Thu, 20 Aug 2020 21:47:38 +0200 Subject: [PATCH 1/2] Materialize: Only get schema from source tablets if target is missing tables Signed-off-by: Rohit Nayak --- go/vt/wrangler/materializer.go | 82 +++++++++++++++---------- go/vt/wrangler/materializer_env_test.go | 33 ++++++++-- go/vt/wrangler/materializer_test.go | 8 +++ 3 files changed, 87 insertions(+), 36 deletions(-) diff --git a/go/vt/wrangler/materializer.go b/go/vt/wrangler/materializer.go index 57d12feaf3d..e881c6ee2db 100644 --- a/go/vt/wrangler/materializer.go +++ b/go/vt/wrangler/materializer.go @@ -50,6 +50,7 @@ type materializer struct { targetVSchema *vindexes.KeyspaceSchema sourceShards []*topo.ShardInfo targetShards []*topo.ShardInfo + mu sync.Mutex } const ( @@ -589,43 +590,45 @@ func (wr *Wrangler) buildMaterializer(ctx context.Context, ms *vtctldatapb.Mater }, nil } +func (mz *materializer) getTableDDLs(ctx context.Context) (map[string]string, error) { + tableDDLs := make(map[string]string) + allTables := []string{"/.*/"} + + sourceMaster := mz.sourceShards[0].MasterAlias + if sourceMaster == nil { + return nil, fmt.Errorf("source shard must have a master for copying schema: %v", mz.sourceShards[0].ShardName()) + } + + log.Infof("getting table schemas from source master %v...", sourceMaster) + var err error + sourceSchema, err := mz.wr.GetSchema(ctx, sourceMaster, allTables, nil, false) + if err != nil { + return nil, err + } + log.Infof("got table schemas from source master %v.", sourceMaster) + + for _, td := range sourceSchema.TableDefinitions { + tableDDLs[td.Name] = td.Schema + } + return tableDDLs, nil +} + func (mz *materializer) deploySchema(ctx context.Context) error { + var tableDDLs map[string]string return mz.forAllTargets(func(target *topo.ShardInfo) error { allTables := []string{"/.*/"} hasTargetTable := map[string]bool{} - { - log.Infof("getting table schemas from target master %v...", target.MasterAlias) - targetSchema, err := mz.wr.GetSchema(ctx, target.MasterAlias, allTables, nil, false) - if err != nil { - return err - } - log.Infof("got table schemas from target master %v.", target.MasterAlias) - - for _, td := range targetSchema.TableDefinitions { - hasTargetTable[td.Name] = true - } + log.Infof("getting table schemas from target master %v...", target.MasterAlias) + targetSchema, err := mz.wr.GetSchema(ctx, target.MasterAlias, allTables, nil, false) + if err != nil { + return err } + log.Infof("got table schemas from target master %v.", target.MasterAlias) - sourceDDL := map[string]string{} - { - sourceMaster := mz.sourceShards[0].MasterAlias - if sourceMaster == nil { - return fmt.Errorf("source shard must have a master for copying schema: %v", mz.sourceShards[0].ShardName()) - } - - log.Infof("getting table schemas from source master %v...", sourceMaster) - var err error - sourceSchema, err := mz.wr.GetSchema(ctx, sourceMaster, allTables, nil, false) - if err != nil { - return err - } - log.Infof("got table schemas from source master %v.", sourceMaster) - - for _, td := range sourceSchema.TableDefinitions { - sourceDDL[td.Name] = td.Schema - } + for _, td := range targetSchema.TableDefinitions { + hasTargetTable[td.Name] = true } targetTablet, err := mz.wr.ts.GetTablet(ctx, target.MasterAlias) @@ -633,7 +636,9 @@ func (mz *materializer) deploySchema(ctx context.Context) error { return err } - applyDDLs := []string{} + var applyDDLs []string + { + } for _, ts := range mz.ms.TableSettings { if hasTargetTable[ts.TargetTable] { // Table already exists. @@ -642,6 +647,21 @@ func (mz *materializer) deploySchema(ctx context.Context) error { if ts.CreateDdl == "" { return fmt.Errorf("target table %v does not exist and there is no create ddl defined", ts.TargetTable) } + + var err error + mz.mu.Lock() + if len(tableDDLs) == 0 { + //only get ddls for tables, once and lazily: if we need to copy the schema from source to target + //we copy schemas from masters on the source keyspace + //and we have found use cases where user just has a replica (no master) in the source keyspace + tableDDLs, err = mz.getTableDDLs(ctx) + } + mz.mu.Unlock() + if err != nil { + log.Errorf("Error getting DDLs of source tables: %s", err.Error()) + return err + } + createDDL := ts.CreateDdl if createDDL == createDDLAsCopy || createDDL == createDDLAsCopyDropConstraint { if ts.SourceExpression != "" { @@ -656,7 +676,7 @@ func (mz *materializer) deploySchema(ctx context.Context) error { } } - ddl, ok := sourceDDL[ts.TargetTable] + ddl, ok := tableDDLs[ts.TargetTable] if !ok { return fmt.Errorf("source table %v does not exist", ts.TargetTable) } diff --git a/go/vt/wrangler/materializer_env_test.go b/go/vt/wrangler/materializer_env_test.go index 6c8df72d50a..3142b0d0326 100644 --- a/go/vt/wrangler/materializer_env_test.go +++ b/go/vt/wrangler/materializer_env_test.go @@ -19,6 +19,7 @@ package wrangler import ( "fmt" "regexp" + "strconv" "strings" "sync" "testing" @@ -63,7 +64,6 @@ func newTestMaterializerEnv(t *testing.T, ms *vtctldatapb.MaterializeSettings, s tmc: newTestMaterializerTMClient(), } env.wr = New(logutil.NewConsoleLogger(), env.topoServ, env.tmc) - tabletID := 100 for _, shard := range sources { _ = env.addTablet(tabletID, env.ms.SourceKeyspace, shard, topodatapb.TabletType_MASTER) @@ -161,18 +161,41 @@ type testMaterializerTMClient struct { tmclient.TabletManagerClient schema map[string]*tabletmanagerdatapb.SchemaDefinition - mu sync.Mutex - vrQueries map[int][]*queryResult + mu sync.Mutex + vrQueries map[int][]*queryResult + getSchemaCounts map[string]int + muSchemaCount sync.Mutex } func newTestMaterializerTMClient() *testMaterializerTMClient { return &testMaterializerTMClient{ - schema: make(map[string]*tabletmanagerdatapb.SchemaDefinition), - vrQueries: make(map[int][]*queryResult), + schema: make(map[string]*tabletmanagerdatapb.SchemaDefinition), + vrQueries: make(map[int][]*queryResult), + getSchemaCounts: make(map[string]int), } } +func (tmc *testMaterializerTMClient) schemaRequested(uid uint32) { + tmc.muSchemaCount.Lock() + defer tmc.muSchemaCount.Unlock() + key := strconv.Itoa(int(uid)) + n, ok := tmc.getSchemaCounts[key] + if !ok { + tmc.getSchemaCounts[key] = 1 + } else { + tmc.getSchemaCounts[key] = n + 1 + } +} + +func (tmc *testMaterializerTMClient) getSchemaRequestCount(uid uint32) int { + tmc.muSchemaCount.Lock() + defer tmc.muSchemaCount.Unlock() + key := strconv.Itoa(int(uid)) + return tmc.getSchemaCounts[key] +} + func (tmc *testMaterializerTMClient) GetSchema(ctx context.Context, tablet *topodatapb.Tablet, tables, excludeTables []string, includeViews bool) (*tabletmanagerdatapb.SchemaDefinition, error) { + tmc.schemaRequested(tablet.Alias.Uid) schemaDefn := &tabletmanagerdatapb.SchemaDefinition{} for _, table := range tables { // TODO: Add generalized regexps if needed for test purposes. diff --git a/go/vt/wrangler/materializer_test.go b/go/vt/wrangler/materializer_test.go index 6adb25b3e54..12882b0d8b0 100644 --- a/go/vt/wrangler/materializer_test.go +++ b/go/vt/wrangler/materializer_test.go @@ -1699,6 +1699,8 @@ func TestMaterializerDeploySchema(t *testing.T) { err := env.wr.Materialize(context.Background(), ms) assert.NoError(t, err) env.tmc.verifyQueries(t) + require.Equal(t, env.tmc.getSchemaRequestCount(100), 1) + require.Equal(t, env.tmc.getSchemaRequestCount(200), 1) } func TestMaterializerCopySchema(t *testing.T) { @@ -1734,6 +1736,9 @@ func TestMaterializerCopySchema(t *testing.T) { err := env.wr.Materialize(context.Background(), ms) assert.NoError(t, err) env.tmc.verifyQueries(t) + require.Equal(t, env.tmc.getSchemaRequestCount(100), 1) + require.Equal(t, env.tmc.getSchemaRequestCount(200), 1) + } func TestMaterializerExplicitColumns(t *testing.T) { @@ -1922,6 +1927,9 @@ func TestMaterializerNoDDL(t *testing.T) { err := env.wr.Materialize(context.Background(), ms) assert.EqualError(t, err, "target table t1 does not exist and there is no create ddl defined") + require.Equal(t, env.tmc.getSchemaRequestCount(100), 0) + require.Equal(t, env.tmc.getSchemaRequestCount(200), 1) + } func TestMaterializerNoSourceMaster(t *testing.T) { From 27a3c57e80b4780ec839b5a11630483a69d972b9 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Sat, 22 Aug 2020 16:28:39 +0200 Subject: [PATCH 2/2] Address review comments Signed-off-by: Rohit Nayak --- go/vt/wrangler/materializer.go | 24 +++++++++++------------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/go/vt/wrangler/materializer.go b/go/vt/wrangler/materializer.go index e881c6ee2db..369d992ee45 100644 --- a/go/vt/wrangler/materializer.go +++ b/go/vt/wrangler/materializer.go @@ -50,7 +50,6 @@ type materializer struct { targetVSchema *vindexes.KeyspaceSchema sourceShards []*topo.ShardInfo targetShards []*topo.ShardInfo - mu sync.Mutex } const ( @@ -590,8 +589,8 @@ func (wr *Wrangler) buildMaterializer(ctx context.Context, ms *vtctldatapb.Mater }, nil } -func (mz *materializer) getTableDDLs(ctx context.Context) (map[string]string, error) { - tableDDLs := make(map[string]string) +func (mz *materializer) getSourceTableDDLs(ctx context.Context) (map[string]string, error) { + sourceDDLs := make(map[string]string) allTables := []string{"/.*/"} sourceMaster := mz.sourceShards[0].MasterAlias @@ -608,13 +607,14 @@ func (mz *materializer) getTableDDLs(ctx context.Context) (map[string]string, er log.Infof("got table schemas from source master %v.", sourceMaster) for _, td := range sourceSchema.TableDefinitions { - tableDDLs[td.Name] = td.Schema + sourceDDLs[td.Name] = td.Schema } - return tableDDLs, nil + return sourceDDLs, nil } func (mz *materializer) deploySchema(ctx context.Context) error { - var tableDDLs map[string]string + var sourceDDLs map[string]string + var mu sync.Mutex return mz.forAllTargets(func(target *topo.ShardInfo) error { allTables := []string{"/.*/"} @@ -637,8 +637,6 @@ func (mz *materializer) deploySchema(ctx context.Context) error { } var applyDDLs []string - { - } for _, ts := range mz.ms.TableSettings { if hasTargetTable[ts.TargetTable] { // Table already exists. @@ -649,14 +647,14 @@ func (mz *materializer) deploySchema(ctx context.Context) error { } var err error - mz.mu.Lock() - if len(tableDDLs) == 0 { + mu.Lock() + if len(sourceDDLs) == 0 { //only get ddls for tables, once and lazily: if we need to copy the schema from source to target //we copy schemas from masters on the source keyspace //and we have found use cases where user just has a replica (no master) in the source keyspace - tableDDLs, err = mz.getTableDDLs(ctx) + sourceDDLs, err = mz.getSourceTableDDLs(ctx) } - mz.mu.Unlock() + mu.Unlock() if err != nil { log.Errorf("Error getting DDLs of source tables: %s", err.Error()) return err @@ -676,7 +674,7 @@ func (mz *materializer) deploySchema(ctx context.Context) error { } } - ddl, ok := tableDDLs[ts.TargetTable] + ddl, ok := sourceDDLs[ts.TargetTable] if !ok { return fmt.Errorf("source table %v does not exist", ts.TargetTable) }