From 6fe985b9cc9773142adf4c005839f1693a98a5c5 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Wed, 26 Jul 2023 08:51:07 +0300 Subject: [PATCH 1/2] GetSchema: limit concurrent operations Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/vt/mysqlctl/schema.go | 48 +++++++++++++++++++++------------------- 1 file changed, 25 insertions(+), 23 deletions(-) diff --git a/go/vt/mysqlctl/schema.go b/go/vt/mysqlctl/schema.go index 9b2146cf13e..4bf7f22a430 100644 --- a/go/vt/mysqlctl/schema.go +++ b/go/vt/mysqlctl/schema.go @@ -22,21 +22,24 @@ import ( "regexp" "sort" "strings" - "sync" + + "golang.org/x/sync/errgroup" "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/sqlescape" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/concurrency" - "vitess.io/vitess/go/vt/vterrors" - "vitess.io/vitess/go/vt/vtgate/evalengine" - - "vitess.io/vitess/go/sqlescape" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/mysqlctl/tmutils" - querypb "vitess.io/vitess/go/vt/proto/query" tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" "vitess.io/vitess/go/vt/proto/vtrpc" + "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vtgate/evalengine" +) + +const ( + getSchemaConcurrency = 20 ) var autoIncr = regexp.MustCompile(` AUTO_INCREMENT=\d+`) @@ -102,18 +105,18 @@ func (mysqld *Mysqld) GetSchema(ctx context.Context, dbName string, request *tab ctx, cancel := context.WithCancel(ctx) defer cancel() - var wg sync.WaitGroup allErrors := &concurrency.AllErrorRecorder{} + eg, ctx := errgroup.WithContext(ctx) + eg.SetLimit(getSchemaConcurrency) + // Get per-table schema concurrently. tableNames := make([]string, 0, len(tds)) for _, td := range tds { tableNames = append(tableNames, td.Name) + td := td - wg.Add(1) - go func(td *tabletmanagerdatapb.TableDefinition) { - defer wg.Done() - + eg.Go(func() error { fields, columns, schema, err := mysqld.collectSchema(ctx, dbName, td.Name, td.Type, request.TableSchemaOnly) if err != nil { // There's a possible race condition: it could happen that a table was dropped in between reading @@ -122,40 +125,39 @@ func (mysqld *Mysqld) GetSchema(ctx context.Context, dbName string, request *tab // This is fine. We identify the situation and keep the table without any fields/columns/key information sqlErr, isSQLErr := mysql.NewSQLErrorFromError(err).(*mysql.SQLError) if isSQLErr && sqlErr != nil && sqlErr.Number() == mysql.ERNoSuchTable { - return + return nil } allErrors.RecordError(err) cancel() - return + return err } td.Fields = fields td.Columns = columns td.Schema = schema - }(td) + return nil + }) } + colMap := map[string][]string{} // Get primary columns concurrently. // The below runs a single query on `INFORMATION_SCHEMA` and does not interact with the actual tables. // It is therefore safe to run even if some tables are dropped in the interim. - colMap := map[string][]string{} - if len(tableNames) > 0 { - wg.Add(1) - go func() { - defer wg.Done() - + if len(tableNames) > 0 && !request.TableSchemaOnly { + eg.Go(func() error { var err error colMap, err = mysqld.getPrimaryKeyColumns(ctx, dbName, tableNames...) if err != nil { allErrors.RecordError(err) cancel() - return + return err } - }() + return nil + }) } - wg.Wait() + eg.Wait() if err := allErrors.AggrError(vterrors.Aggregate); err != nil { return nil, err } From 1910e77d4ba09e85d3e850a1d774febe8d0662d8 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Wed, 26 Jul 2023 09:50:38 +0300 Subject: [PATCH 2/2] clarifying the use of '20' as a concurrency limit Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/vt/mysqlctl/schema.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/go/vt/mysqlctl/schema.go b/go/vt/mysqlctl/schema.go index 4bf7f22a430..4097fdc6ac8 100644 --- a/go/vt/mysqlctl/schema.go +++ b/go/vt/mysqlctl/schema.go @@ -39,6 +39,11 @@ import ( ) const ( + // In a local environment and without latency, we have seen that an unbounded concurrency still translates to less than + // 20 concurrent MySQL connections. Which is why placing a limit of 20 concurrent goroutines (each mapped to a MySQL connection) + // is unlikely to affect optimal environments. + // In high latency environments, unbounded concurrency can translate to a very high number of concurrent MySQL connections. This + // is an undesirable behavior. We prefer to push back on GetSchema and make it run over longer time, instead. getSchemaConcurrency = 20 )