diff --git a/go/vt/mysqlctl/schema.go b/go/vt/mysqlctl/schema.go
index 9b2146cf13e..4097fdc6ac8 100644
--- a/go/vt/mysqlctl/schema.go
+++ b/go/vt/mysqlctl/schema.go
@@ -22,21 +22,29 @@ import (
 	"regexp"
 	"sort"
 	"strings"
-	"sync"
+
+	"golang.org/x/sync/errgroup"
 
 	"vitess.io/vitess/go/mysql"
+	"vitess.io/vitess/go/sqlescape"
 	"vitess.io/vitess/go/sqltypes"
 	"vitess.io/vitess/go/vt/concurrency"
-	"vitess.io/vitess/go/vt/vterrors"
-	"vitess.io/vitess/go/vt/vtgate/evalengine"
-
-	"vitess.io/vitess/go/sqlescape"
 	"vitess.io/vitess/go/vt/log"
 	"vitess.io/vitess/go/vt/mysqlctl/tmutils"
-
 	querypb "vitess.io/vitess/go/vt/proto/query"
 	tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
 	"vitess.io/vitess/go/vt/proto/vtrpc"
+	"vitess.io/vitess/go/vt/vterrors"
+	"vitess.io/vitess/go/vt/vtgate/evalengine"
+)
+
+const (
+	// In a local environment and without latency, we have seen that an unbounded concurrency still translates to less than
+	// 20 concurrent MySQL connections. Which is why placing a limit of 20 concurrent goroutines (each mapped to a MySQL connection)
+	// is unlikely to affect optimal environments.
+	// In high latency environments, unbounded concurrency can translate to a very high number of concurrent MySQL connections. This
+	// is an undesirable behavior. We prefer to push back on GetSchema and make it run over longer time, instead.
+	getSchemaConcurrency = 20
 )
 
 var autoIncr = regexp.MustCompile(` AUTO_INCREMENT=\d+`)
@@ -102,18 +110,18 @@ func (mysqld *Mysqld) GetSchema(ctx context.Context, dbName string, request *tab
 	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()
 
-	var wg sync.WaitGroup
 	allErrors := &concurrency.AllErrorRecorder{}
 
+	eg, ctx := errgroup.WithContext(ctx)
+	eg.SetLimit(getSchemaConcurrency)
+
 	// Get per-table schema concurrently.
 	tableNames := make([]string, 0, len(tds))
 	for _, td := range tds {
 		tableNames = append(tableNames, td.Name)
+		td := td
 
-		wg.Add(1)
-		go func(td *tabletmanagerdatapb.TableDefinition) {
-			defer wg.Done()
-
+		eg.Go(func() error {
 			fields, columns, schema, err := mysqld.collectSchema(ctx, dbName, td.Name, td.Type, request.TableSchemaOnly)
 			if err != nil {
 				// There's a possible race condition: it could happen that a table was dropped in between reading
@@ -122,40 +130,39 @@ func (mysqld *Mysqld) GetSchema(ctx context.Context, dbName string, request *tab
 				// This is fine. We identify the situation and keep the table without any fields/columns/key information
 				sqlErr, isSQLErr := mysql.NewSQLErrorFromError(err).(*mysql.SQLError)
 				if isSQLErr && sqlErr != nil && sqlErr.Number() == mysql.ERNoSuchTable {
-					return
+					return nil
 				}
 
 				allErrors.RecordError(err)
 				cancel()
-				return
+				return err
 			}
 
 			td.Fields = fields
 			td.Columns = columns
 			td.Schema = schema
-		}(td)
+			return nil
+		})
 	}
 
+	colMap := map[string][]string{}
 	// Get primary columns concurrently.
 	// The below runs a single query on `INFORMATION_SCHEMA` and does not interact with the actual tables.
 	// It is therefore safe to run even if some tables are dropped in the interim.
-	colMap := map[string][]string{}
-	if len(tableNames) > 0 {
-		wg.Add(1)
-		go func() {
-			defer wg.Done()
-
+	if len(tableNames) > 0 && !request.TableSchemaOnly {
+		eg.Go(func() error {
 			var err error
 			colMap, err = mysqld.getPrimaryKeyColumns(ctx, dbName, tableNames...)
 			if err != nil {
 				allErrors.RecordError(err)
 				cancel()
-				return
+				return err
 			}
-		}()
+			return nil
+		})
 	}
 
-	wg.Wait()
+	eg.Wait()
 	if err := allErrors.AggrError(vterrors.Aggregate); err != nil {
 		return nil, err
 	}
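
For readers who want to see the bounded-concurrency pattern from this patch in isolation, here is a minimal, self-contained Go sketch (not part of the patch, and not Vitess code): an errgroup with SetLimit caps how many goroutines run at once, any task error cancels the shared context, and Wait collects the first failure. The table count, sleep duration, and print statements below are invented purely for illustration.

package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

func main() {
	eg, ctx := errgroup.WithContext(context.Background())
	// At most 20 tasks run concurrently; further eg.Go calls block until a slot frees,
	// mirroring getSchemaConcurrency in the patch.
	eg.SetLimit(20)

	for i := 0; i < 100; i++ {
		i := i // capture the loop variable (needed before Go 1.22), same idiom as `td := td` in the patch
		eg.Go(func() error {
			select {
			case <-ctx.Done():
				// A failure in any task cancels ctx, letting queued work bail out early.
				return ctx.Err()
			case <-time.After(10 * time.Millisecond): // stand-in for a per-table schema query
				fmt.Println("fetched schema for table", i)
				return nil
			}
		})
	}

	// Wait blocks until all tasks finish and returns the first non-nil error, if any.
	if err := eg.Wait(); err != nil {
		fmt.Println("schema collection failed:", err)
	}
}

The trade-off stated in the patch comment follows directly from this structure: on a low-latency host the limit is rarely reached, while on a high-latency link the limit converts what would be an unbounded burst of MySQL connections into a longer-running but bounded GetSchema call.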