From 2399ee19c69490dd0dd6b8830f4e0712e378e981 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Thu, 13 May 2021 15:11:40 +0200 Subject: [PATCH 1/9] Refactor PKInfoMap to get info for all columns in the table as ColInfoMap as a precursor to adding Extra info for detecting generated columns Signed-off-by: Rohit Nayak --- .../vreplication/replicator_plan.go | 12 ++-- .../vreplication/replicator_plan_test.go | 16 ++--- .../vreplication/table_plan_builder.go | 67 ++++++++--------- .../tabletmanager/vreplication/vcopier.go | 4 +- .../tabletmanager/vreplication/vplayer.go | 2 +- .../tabletmanager/vreplication/vreplicator.go | 72 ++++++++++--------- 6 files changed, 90 insertions(+), 83 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go b/go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go index 672c40c56bc..385e90c0f23 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go +++ b/go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go @@ -53,7 +53,7 @@ type ReplicatorPlan struct { VStreamFilter *binlogdatapb.Filter TargetTables map[string]*TablePlan TablePlans map[string]*TablePlan - PKInfoMap map[string][]*PrimaryKeyInfo + ColInfoMap map[string][]*ColumnInfo stats *binlogplayer.Stats } @@ -94,10 +94,10 @@ func (rp *ReplicatorPlan) buildExecutionPlan(fieldEvent *binlogdatapb.FieldEvent // requires us to wait for the field info sent by the source. func (rp *ReplicatorPlan) buildFromFields(tableName string, lastpk *sqltypes.Result, fields []*querypb.Field) (*TablePlan, error) { tpb := &tablePlanBuilder{ - name: sqlparser.NewTableIdent(tableName), - lastpk: lastpk, - pkInfos: rp.PKInfoMap[tableName], - stats: rp.stats, + name: sqlparser.NewTableIdent(tableName), + lastpk: lastpk, + columnInfos: rp.ColInfoMap[tableName], + stats: rp.stats, } for _, field := range fields { colName := sqlparser.NewColIdent(field.Name) @@ -114,7 +114,7 @@ func (rp *ReplicatorPlan) buildFromFields(tableName string, lastpk *sqltypes.Res tpb.colExprs = append(tpb.colExprs, cexpr) } // The following actions are a subset of buildTablePlan. - if err := tpb.analyzePK(rp.PKInfoMap); err != nil { + if err := tpb.analyzePK(rp.ColInfoMap); err != nil { return nil, err } return tpb.generate(), nil diff --git a/go/vt/vttablet/tabletmanager/vreplication/replicator_plan_test.go b/go/vt/vttablet/tabletmanager/vreplication/replicator_plan_test.go index 6297ac0652b..fc1736dc5bc 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/replicator_plan_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/replicator_plan_test.go @@ -669,8 +669,8 @@ func TestBuildPlayerPlan(t *testing.T) { err: "group by expression is not allowed to reference an aggregate expression: a", }} - PrimaryKeyInfos := map[string][]*PrimaryKeyInfo{ - "t1": {&PrimaryKeyInfo{Name: "c1"}}, + PrimaryKeyInfos := map[string][]*ColumnInfo{ + "t1": {&ColumnInfo{Name: "c1", IsPK: true}}, } copyState := map[string]*sqltypes.Result{ @@ -711,9 +711,9 @@ func TestBuildPlayerPlan(t *testing.T) { } func TestBuildPlayerPlanNoDup(t *testing.T) { - PrimaryKeyInfos := map[string][]*PrimaryKeyInfo{ - "t1": {&PrimaryKeyInfo{Name: "c1"}}, - "t2": {&PrimaryKeyInfo{Name: "c2"}}, + PrimaryKeyInfos := map[string][]*ColumnInfo{ + "t1": {&ColumnInfo{Name: "c1"}}, + "t2": {&ColumnInfo{Name: "c2"}}, } input := &binlogdatapb.Filter{ Rules: []*binlogdatapb.Rule{{ @@ -732,9 +732,9 @@ func TestBuildPlayerPlanNoDup(t *testing.T) { } func TestBuildPlayerPlanExclude(t *testing.T) { - PrimaryKeyInfos := map[string][]*PrimaryKeyInfo{ - "t1": {&PrimaryKeyInfo{Name: "c1"}}, - "t2": {&PrimaryKeyInfo{Name: "c2"}}, + PrimaryKeyInfos := map[string][]*ColumnInfo{ + "t1": {&ColumnInfo{Name: "c1"}}, + "t2": {&ColumnInfo{Name: "c2"}}, } input := &binlogdatapb.Filter{ Rules: []*binlogdatapb.Rule{{ diff --git a/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go b/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go index 66c57ca09d0..8ec80bc7a25 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go +++ b/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go @@ -47,13 +47,13 @@ type tablePlanBuilder struct { // selColumns keeps track of the columns we want to pull from source. // If Lastpk is set, we compare this list against the table's pk and // add missing references. - selColumns map[string]bool - colExprs []*colExpr - onInsert insertType - pkCols []*colExpr - lastpk *sqltypes.Result - pkInfos []*PrimaryKeyInfo - stats *binlogplayer.Stats + selColumns map[string]bool + colExprs []*colExpr + onInsert insertType + pkCols []*colExpr + lastpk *sqltypes.Result + columnInfos []*ColumnInfo + stats *binlogplayer.Stats } // colExpr describes the processing to be performed to @@ -112,7 +112,7 @@ const ( // a table-specific rule is built to be sent to the source. We don't send the // original rule to the source because it may not match the same tables as the // target. -// pkInfoMap specifies the list of primary key columns for each table. +// colInfoMap specifies the list of primary key columns for each table. // copyState is a map of tables that have not been fully copied yet. // If a table is not present in copyState, then it has been fully copied. If so, // all replication events are applied. The table still has to match a Filter.Rule. @@ -123,15 +123,15 @@ const ( // The TablePlan built is a partial plan. The full plan for a table is built // when we receive field information from events or rows sent by the source. // buildExecutionPlan is the function that builds the full plan. -func buildReplicatorPlan(filter *binlogdatapb.Filter, pkInfoMap map[string][]*PrimaryKeyInfo, copyState map[string]*sqltypes.Result, stats *binlogplayer.Stats) (*ReplicatorPlan, error) { +func buildReplicatorPlan(filter *binlogdatapb.Filter, colInfoMap map[string][]*ColumnInfo, copyState map[string]*sqltypes.Result, stats *binlogplayer.Stats) (*ReplicatorPlan, error) { plan := &ReplicatorPlan{ VStreamFilter: &binlogdatapb.Filter{FieldEventMode: filter.FieldEventMode}, TargetTables: make(map[string]*TablePlan), TablePlans: make(map[string]*TablePlan), - PKInfoMap: pkInfoMap, + ColInfoMap: colInfoMap, stats: stats, } - for tableName := range pkInfoMap { + for tableName := range colInfoMap { lastpk, ok := copyState[tableName] if ok && lastpk == nil { // Don't replicate uncopied tables. @@ -144,7 +144,7 @@ func buildReplicatorPlan(filter *binlogdatapb.Filter, pkInfoMap map[string][]*Pr if rule == nil { continue } - tablePlan, err := buildTablePlan(tableName, rule.Filter, pkInfoMap, lastpk, stats) + tablePlan, err := buildTablePlan(tableName, rule.Filter, colInfoMap, lastpk, stats) if err != nil { return nil, err } @@ -183,7 +183,7 @@ func MatchTable(tableName string, filter *binlogdatapb.Filter) (*binlogdatapb.Ru return nil, nil } -func buildTablePlan(tableName, filter string, pkInfoMap map[string][]*PrimaryKeyInfo, lastpk *sqltypes.Result, stats *binlogplayer.Stats) (*TablePlan, error) { +func buildTablePlan(tableName, filter string, colInfoMap map[string][]*ColumnInfo, lastpk *sqltypes.Result, stats *binlogplayer.Stats) (*TablePlan, error) { query := filter // generate equivalent select statement if filter is empty or a keyrange. switch { @@ -231,10 +231,10 @@ func buildTablePlan(tableName, filter string, pkInfoMap map[string][]*PrimaryKey From: sel.From, Where: sel.Where, }, - selColumns: make(map[string]bool), - lastpk: lastpk, - pkInfos: pkInfoMap[tableName], - stats: stats, + selColumns: make(map[string]bool), + lastpk: lastpk, + columnInfos: colInfoMap[tableName], + stats: stats, } if err := tpb.analyzeExprs(sel.SelectExprs); err != nil { @@ -255,7 +255,7 @@ func buildTablePlan(tableName, filter string, pkInfoMap map[string][]*PrimaryKey if err := tpb.analyzeGroupBy(sel.GroupBy); err != nil { return nil, err } - if err := tpb.analyzePK(pkInfoMap); err != nil { + if err := tpb.analyzePK(colInfoMap); err != nil { return nil, err } @@ -475,22 +475,25 @@ func (tpb *tablePlanBuilder) analyzeGroupBy(groupBy sqlparser.GroupBy) error { } // analyzePK builds tpb.pkCols. -func (tpb *tablePlanBuilder) analyzePK(pkInfoMap map[string][]*PrimaryKeyInfo) error { - pkcols, ok := pkInfoMap[tpb.name.String()] +func (tpb *tablePlanBuilder) analyzePK(colInfoMap map[string][]*ColumnInfo) error { + cols, ok := colInfoMap[tpb.name.String()] if !ok { return fmt.Errorf("table %s not found in schema", tpb.name) } - for _, pkcol := range pkcols { - cexpr := tpb.findCol(sqlparser.NewColIdent(pkcol.Name)) + for _, col := range cols { + if !col.IsPK { + continue + } + cexpr := tpb.findCol(sqlparser.NewColIdent(col.Name)) if cexpr == nil { - return fmt.Errorf("primary key column %v not found in select list", pkcol) + return fmt.Errorf("primary key column %v not found in select list", col) } if cexpr.operation != opExpr { - return fmt.Errorf("primary key column %v is not allowed to reference an aggregate expression", pkcol) + return fmt.Errorf("primary key column %v is not allowed to reference an aggregate expression", col) } cexpr.isPK = true - cexpr.dataType = pkcol.DataType - cexpr.columnType = pkcol.ColumnType + cexpr.dataType = col.DataType + cexpr.columnType = col.ColumnType tpb.pkCols = append(tpb.pkCols, cexpr) } return nil @@ -708,13 +711,13 @@ func (tpb *tablePlanBuilder) generateWhere(buf *sqlparser.TrackedBuffer, bvf *bi } func (tpb *tablePlanBuilder) getCharsetAndCollation(pkname string) (charSet string, collation string) { - for _, pkInfo := range tpb.pkInfos { - if strings.EqualFold(pkInfo.Name, pkname) { - if pkInfo.CharSet != "" { - charSet = fmt.Sprintf(" _%s ", pkInfo.CharSet) + for _, colInfo := range tpb.columnInfos { + if colInfo.IsPK && strings.EqualFold(colInfo.Name, pkname) { + if colInfo.CharSet != "" { + charSet = fmt.Sprintf(" _%s ", colInfo.CharSet) } - if pkInfo.Collation != "" { - collation = fmt.Sprintf(" COLLATE %s ", pkInfo.Collation) + if colInfo.Collation != "" { + collation = fmt.Sprintf(" COLLATE %s ", colInfo.Collation) } } } diff --git a/go/vt/vttablet/tabletmanager/vreplication/vcopier.go b/go/vt/vttablet/tabletmanager/vreplication/vcopier.go index a16cb1ba528..7975dc3c05c 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vcopier.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vcopier.go @@ -57,7 +57,7 @@ func newVCopier(vr *vreplicator) *vcopier { func (vc *vcopier) initTablesForCopy(ctx context.Context) error { defer vc.vr.dbClient.Rollback() - plan, err := buildReplicatorPlan(vc.vr.source.Filter, vc.vr.pkInfoMap, nil, vc.vr.stats) + plan, err := buildReplicatorPlan(vc.vr.source.Filter, vc.vr.colInfoMap, nil, vc.vr.stats) if err != nil { return err } @@ -200,7 +200,7 @@ func (vc *vcopier) copyTable(ctx context.Context, tableName string, copyState ma log.Infof("Copying table %s, lastpk: %v", tableName, copyState[tableName]) - plan, err := buildReplicatorPlan(vc.vr.source.Filter, vc.vr.pkInfoMap, nil, vc.vr.stats) + plan, err := buildReplicatorPlan(vc.vr.source.Filter, vc.vr.colInfoMap, nil, vc.vr.stats) if err != nil { return err } diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go index 9933d0db7d5..c62217429c9 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go @@ -105,7 +105,7 @@ func (vp *vplayer) play(ctx context.Context) error { return nil } - plan, err := buildReplicatorPlan(vp.vr.source.Filter, vp.vr.pkInfoMap, vp.copyState, vp.vr.stats) + plan, err := buildReplicatorPlan(vp.vr.source.Filter, vp.vr.colInfoMap, vp.copyState, vp.vr.stats) if err != nil { vp.vr.stats.ErrorCounts.Add([]string{"Plan"}, 1) return err diff --git a/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go b/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go index 9a9e8de1ff6..28cfadb672b 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go @@ -78,8 +78,8 @@ type vreplicator struct { state string stats *binlogplayer.Stats // mysqld is used to fetch the local schema. - mysqld mysqlctl.MysqlDaemon - pkInfoMap map[string][]*PrimaryKeyInfo + mysqld mysqlctl.MysqlDaemon + colInfoMap map[string][]*ColumnInfo originalFKCheckSetting int64 } @@ -154,11 +154,11 @@ func (vr *vreplicator) Replicate(ctx context.Context) error { } func (vr *vreplicator) replicate(ctx context.Context) error { - pkInfo, err := vr.buildPkInfoMap(ctx) + colInfo, err := vr.buildColInfoMap(ctx) if err != nil { return err } - vr.pkInfoMap = pkInfo + vr.colInfoMap = colInfo if err := vr.getSettingFKCheck(); err != nil { return err } @@ -224,22 +224,23 @@ func (vr *vreplicator) replicate(ctx context.Context) error { } } -// PrimaryKeyInfo is used to store charset and collation for primary keys where applicable -type PrimaryKeyInfo struct { +// ColumnInfo is used to store charset and collation for primary keys where applicable +type ColumnInfo struct { Name string CharSet string Collation string DataType string ColumnType string + IsPK bool } -func (vr *vreplicator) buildPkInfoMap(ctx context.Context) (map[string][]*PrimaryKeyInfo, error) { +func (vr *vreplicator) buildColInfoMap(ctx context.Context) (map[string][]*ColumnInfo, error) { schema, err := vr.mysqld.GetSchema(ctx, vr.dbClient.DBName(), []string{"/.*/"}, nil, false) if err != nil { return nil, err } queryTemplate := "select character_set_name, collation_name, column_name, data_type, column_type from information_schema.columns where table_schema=%s and table_name=%s;" - pkInfoMap := make(map[string][]*PrimaryKeyInfo) + colInfoMap := make(map[string][]*ColumnInfo) for _, td := range schema.TableDefinitions { query := fmt.Sprintf(queryTemplate, encodeString(vr.dbClient.DBName()), encodeString(td.Name)) @@ -257,47 +258,50 @@ func (vr *vreplicator) buildPkInfoMap(ctx context.Context) (map[string][]*Primar } else { pks = td.Columns } - var pkInfos []*PrimaryKeyInfo - for _, pk := range pks { + var colInfo []*ColumnInfo + for _, row := range qr.Rows { charSet := "" collation := "" + columnName := "" + isPK := false var dataType, columnType string - for _, row := range qr.Rows { - columnName := row[2].ToString() - if strings.EqualFold(columnName, pk) { - var currentField *querypb.Field - for _, field := range td.Fields { - if field.Name == pk { - currentField = field - break - } - } - if currentField == nil { - continue - } - dataType = row[3].ToString() - columnType = row[4].ToString() - if sqltypes.IsText(currentField.Type) { - charSet = row[0].ToString() - collation = row[1].ToString() - } + columnName = row[2].ToString() + var currentField *querypb.Field + for _, field := range td.Fields { + if field.Name == columnName { + currentField = field break } } + if currentField == nil { + continue + } + dataType = row[3].ToString() + columnType = row[4].ToString() + if sqltypes.IsText(currentField.Type) { + charSet = row[0].ToString() + collation = row[1].ToString() + } if dataType == "" || columnType == "" { - return nil, fmt.Errorf("no dataType/columnType found in information_schema.columns for table %s, column %s", td.Name, pk) + return nil, fmt.Errorf("no dataType/columnType found in information_schema.columns for table %s, column %s", td.Name, columnName) + } + for _, pk := range pks { + if columnName == pk { + isPK = true + } } - pkInfos = append(pkInfos, &PrimaryKeyInfo{ - Name: pk, + colInfo = append(colInfo, &ColumnInfo{ + Name: columnName, CharSet: charSet, Collation: collation, DataType: dataType, ColumnType: columnType, + IsPK: isPK, }) } - pkInfoMap[td.Name] = pkInfos + colInfoMap[td.Name] = colInfo } - return pkInfoMap, nil + return colInfoMap, nil } func (vr *vreplicator) readSettings(ctx context.Context) (settings binlogplayer.VRSettings, numTablesToCopy int64, err error) { From 32d438eec7572be6c00c5aa7a5fdd5b5a3986973 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Thu, 13 May 2021 21:26:05 +0200 Subject: [PATCH 2/9] Ignore inserting/updating generated columns on the target. Add e2e tests to test generated columns in source as well as target for reshard and materialize Signed-off-by: Rohit Nayak --- go/test/endtoend/vreplication/config.go | 6 ++-- .../vreplication/unsharded_init_data.sql | 6 ++-- .../vreplication/replicator_plan.go | 13 ++++++++ .../vreplication/table_plan_builder.go | 26 +++++++++++++++ .../tabletmanager/vreplication/vreplicator.go | 33 +++++++++++-------- 5 files changed, 65 insertions(+), 19 deletions(-) diff --git a/go/test/endtoend/vreplication/config.go b/go/test/endtoend/vreplication/config.go index 214876a2050..937d27bf904 100644 --- a/go/test/endtoend/vreplication/config.go +++ b/go/test/endtoend/vreplication/config.go @@ -6,7 +6,7 @@ create table product(pid int, description varbinary(128), primary key(pid)); create table customer(cid int, name varbinary(128), meta json default null, typ enum('individual','soho','enterprise'), sport set('football','cricket','baseball'),ts timestamp not null default current_timestamp, primary key(cid)) CHARSET=utf8mb4; create table customer_seq(id int, next_id bigint, cache bigint, primary key(id)) comment 'vitess_sequence'; create table merchant(mname varchar(128), category varchar(128), primary key(mname)) DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci; -create table orders(oid int, cid int, pid int, mname varchar(128), price int, primary key(oid)); +create table orders(oid int, cid int, pid int, mname varchar(128), price int, qty int, total int as (qty * price), total2 int as (qty * price) stored, primary key(oid)); create table order_seq(id int, next_id bigint, cache bigint, primary key(id)) comment 'vitess_sequence'; create table customer2(cid int, name varbinary(128), typ enum('individual','soho','enterprise'), sport set('football','cricket','baseball'),ts timestamp not null default current_timestamp, primary key(cid)); create table customer_seq2(id int, next_id bigint, cache bigint, primary key(id)) comment 'vitess_sequence'; @@ -243,8 +243,8 @@ create table tenant(tenant_id binary(16), name varbinary(16), primary key (tenan "targetKeyspace": "merchant", "tableSettings": [{ "targetTable": "morders", - "sourceExpression": "select * from orders", - "create_ddl": "create table morders(oid int, cid int, mname varchar(128), pid int, price int, primary key(oid))" + "sourceExpression": "select oid, cid, mname, pid, price, qty, total from orders", + "create_ddl": "create table morders(oid int, cid int, mname varchar(128), pid int, price int, qty int, total int, total2 int as (10 * total), primary key(oid))" }] } ` diff --git a/go/test/endtoend/vreplication/unsharded_init_data.sql b/go/test/endtoend/vreplication/unsharded_init_data.sql index 1b58404cfb7..4dd20072436 100644 --- a/go/test/endtoend/vreplication/unsharded_init_data.sql +++ b/go/test/endtoend/vreplication/unsharded_init_data.sql @@ -5,9 +5,9 @@ insert into merchant(mname, category) values('monoprice', 'electronics'); insert into merchant(mname, category) values('newegg', 'electronics'); insert into product(pid, description) values(1, 'keyboard'); insert into product(pid, description) values(2, 'monitor'); -insert into orders(oid, cid, mname, pid, price) values(1, 1, 'monoprice', 1, 10); -insert into orders(oid, cid, mname, pid, price) values(2, 1, 'newegg', 2, 15); -insert into orders(oid, cid, mname, pid, price) values(3, 2, 'monoprice', 2, 20); +insert into orders(oid, cid, mname, pid, price, qty) values(1, 1, 'monoprice', 1, 10, 1); +insert into orders(oid, cid, mname, pid, price, qty) values(2, 1, 'newegg', 2, 15, 2); +insert into orders(oid, cid, mname, pid, price, qty) values(3, 2, 'monoprice', 2, 20, 3); insert into customer2(cid, name, typ, sport) values(1, 'john',1,'football,baseball'); insert into customer2(cid, name, typ, sport) values(2, 'paul','soho','cricket'); insert into customer2(cid, name, typ, sport) values(3, 'ringo','enterprise',''); diff --git a/go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go b/go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go index 385e90c0f23..1eacdf2ad60 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go +++ b/go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go @@ -101,6 +101,19 @@ func (rp *ReplicatorPlan) buildFromFields(tableName string, lastpk *sqltypes.Res } for _, field := range fields { colName := sqlparser.NewColIdent(field.Name) + isGenerated := false + for _, colInfo := range tpb.columnInfos { + if !strings.EqualFold(colInfo.Name, field.Name) { + continue + } + if colInfo.IsGenerated { + isGenerated = true + } + break + } + if isGenerated { + continue + } cexpr := &colExpr{ colName: colName, colType: field.Type, diff --git a/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go b/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go index 8ec80bc7a25..cce2d933526 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go +++ b/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go @@ -535,6 +535,9 @@ func (tpb *tablePlanBuilder) generateInsertPart(buf *sqlparser.TrackedBuffer) *s } separator := "" for _, cexpr := range tpb.colExprs { + if tpb.isColumnGenerated(cexpr.colName) { + continue + } buf.Myprintf("%s%v", separator, cexpr.colName) separator = "," } @@ -546,6 +549,9 @@ func (tpb *tablePlanBuilder) generateValuesPart(buf *sqlparser.TrackedBuffer, bv bvf.mode = bvAfter separator := "(" for _, cexpr := range tpb.colExprs { + if tpb.isColumnGenerated(cexpr.colName) { + continue + } buf.Myprintf("%s", separator) separator = "," switch cexpr.operation { @@ -571,6 +577,9 @@ func (tpb *tablePlanBuilder) generateSelectPart(buf *sqlparser.TrackedBuffer, bv buf.WriteString(" select ") separator := "" for _, cexpr := range tpb.colExprs { + if tpb.isColumnGenerated(cexpr.colName) { + continue + } buf.Myprintf("%s", separator) separator = ", " switch cexpr.operation { @@ -604,6 +613,9 @@ func (tpb *tablePlanBuilder) generateOnDupPart(buf *sqlparser.TrackedBuffer) *sq if cexpr.isGrouped || cexpr.isPK { continue } + if tpb.isColumnGenerated(cexpr.colName) { + continue + } buf.Myprintf("%s%v=", separator, cexpr.colName) separator = ", " switch cexpr.operation { @@ -631,6 +643,9 @@ func (tpb *tablePlanBuilder) generateUpdateStatement() *sqlparser.ParsedQuery { if cexpr.isGrouped || cexpr.isPK { continue } + if tpb.isColumnGenerated(cexpr.colName) { + continue + } buf.Myprintf("%s%v=", separator, cexpr.colName) separator = ", " switch cexpr.operation { @@ -748,6 +763,17 @@ func (tpb *tablePlanBuilder) generatePKConstraint(buf *sqlparser.TrackedBuffer, buf.WriteString(")") } +func (tpb *tablePlanBuilder) isColumnGenerated(col sqlparser.ColIdent) bool { + isGenerated := false + for _, colInfo := range tpb.columnInfos { + if col.EqualString(colInfo.Name) && colInfo.IsGenerated { + isGenerated = true + break + } + } + return isGenerated +} + // bindvarFormatter is a dual mode formatter. Its behavior // can be changed dynamically changed to generate bind vars // for the 'before' row or 'after' row by setting its mode diff --git a/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go b/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go index 28cfadb672b..cd827ba26e1 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go @@ -226,12 +226,13 @@ func (vr *vreplicator) replicate(ctx context.Context) error { // ColumnInfo is used to store charset and collation for primary keys where applicable type ColumnInfo struct { - Name string - CharSet string - Collation string - DataType string - ColumnType string - IsPK bool + Name string + CharSet string + Collation string + DataType string + ColumnType string + IsPK bool + IsGenerated bool } func (vr *vreplicator) buildColInfoMap(ctx context.Context) (map[string][]*ColumnInfo, error) { @@ -239,7 +240,7 @@ func (vr *vreplicator) buildColInfoMap(ctx context.Context) (map[string][]*Colum if err != nil { return nil, err } - queryTemplate := "select character_set_name, collation_name, column_name, data_type, column_type from information_schema.columns where table_schema=%s and table_name=%s;" + queryTemplate := "select character_set_name, collation_name, column_name, data_type, column_type, extra from information_schema.columns where table_schema=%s and table_name=%s;" colInfoMap := make(map[string][]*ColumnInfo) for _, td := range schema.TableDefinitions { @@ -264,6 +265,7 @@ func (vr *vreplicator) buildColInfoMap(ctx context.Context) (map[string][]*Colum collation := "" columnName := "" isPK := false + isGenerated := false var dataType, columnType string columnName = row[2].ToString() var currentField *querypb.Field @@ -290,13 +292,18 @@ func (vr *vreplicator) buildColInfoMap(ctx context.Context) (map[string][]*Colum isPK = true } } + extra := row[5].ToString() + if strings.Contains(strings.ToLower(extra), "generated") { + isGenerated = true + } colInfo = append(colInfo, &ColumnInfo{ - Name: columnName, - CharSet: charSet, - Collation: collation, - DataType: dataType, - ColumnType: columnType, - IsPK: isPK, + Name: columnName, + CharSet: charSet, + Collation: collation, + DataType: dataType, + ColumnType: columnType, + IsPK: isPK, + IsGenerated: isGenerated, }) } colInfoMap[td.Name] = colInfo From 8e4c944f32de3d685bc66631185b7f01aae71488 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Fri, 14 May 2021 22:14:19 +0200 Subject: [PATCH 3/9] Update logic to ignore generated columns for additional use cases Signed-off-by: Rohit Nayak --- go/test/endtoend/vreplication/cluster.go | 5 ++ go/vt/sqlparser/parsed_query.go | 46 ++++++++--- .../vreplication/replicator_plan.go | 13 +-- .../vreplication/table_plan_builder.go | 34 +++++--- .../vreplication/vcopier_test.go | 81 +++++++++++++++++++ 5 files changed, 151 insertions(+), 28 deletions(-) diff --git a/go/test/endtoend/vreplication/cluster.go b/go/test/endtoend/vreplication/cluster.go index e9041cca287..8c90f3f9b41 100644 --- a/go/test/endtoend/vreplication/cluster.go +++ b/go/test/endtoend/vreplication/cluster.go @@ -127,6 +127,11 @@ func getClusterConfig(idx int, dataRootDir string) *ClusterConfig { } func init() { + // for local debugging set this variable so that each run uses VTDATAROOT instead of a random dir + // and also does not teardown the cluster for inspecting logs and the databases + if os.Getenv("VREPLICATION_E2E_DEBUG") != "" { + debug = true + } rand.Seed(time.Now().UTC().UnixNano()) originalVtdataroot = os.Getenv("VTDATAROOT") var mainVtDataRoot string diff --git a/go/vt/sqlparser/parsed_query.go b/go/vt/sqlparser/parsed_query.go index 5ce0581e5a2..7651c4bc481 100644 --- a/go/vt/sqlparser/parsed_query.go +++ b/go/vt/sqlparser/parsed_query.go @@ -32,7 +32,7 @@ import ( ) // ParsedQuery represents a parsed query where -// bind locations are precompued for fast substitutions. +// bind locations are precomputed for fast substitutions. type ParsedQuery struct { Query string bindLocations []bindLocation @@ -86,29 +86,57 @@ func (pq *ParsedQuery) Append(buf *strings.Builder, bindVariables map[string]*qu } // AppendFromRow behaves like Append but takes a querypb.Row directly, assuming that -// the fields in the row are in the same order as the placeholders in this query. -func (pq *ParsedQuery) AppendFromRow(buf *bytes2.Buffer, fields []*querypb.Field, row *querypb.Row) error { +// the fields in the row are in the same order as the placeholders in this query. The fields might include generated +// columns which are dropped, by checking against skipFields, before binding the variables +// note: there can be more fields than bind locations since extra columns might be requested from the source if not all +// primary keys columns are not in the select list, for example. Also some values in the row may not correspond for +// values from the database on the source: sum/count for aggregation queries, for example +func (pq *ParsedQuery) AppendFromRow(buf *bytes2.Buffer, fields []*querypb.Field, row *querypb.Row, skipFields map[string]bool) error { if len(fields) < len(pq.bindLocations) { - return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "wrong number of fields: got %d fields for %d bind locations ", len(fields), len(pq.bindLocations)) + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "wrong number of fields: got %d fields for %d bind locations ", + len(fields), len(pq.bindLocations)) } + + type colInfo struct { + typ querypb.Type + length int64 + offset int64 + } + rowInfo := make([]*colInfo, 0) + + offset := int64(0) + for i, field := range fields { // collect info required for fields to be bound + length := row.Lengths[i] + if !skipFields[strings.ToLower(field.Name)] { + rowInfo = append(rowInfo, &colInfo{ + typ: field.Type, + length: length, + offset: offset, + }) + } + if length > 0 { + offset += row.Lengths[i] + } + } + + // bind field values to locations var offsetQuery int - var offsetRow int64 for i, loc := range pq.bindLocations { + col := rowInfo[i] buf.WriteString(pq.Query[offsetQuery:loc.offset]) - typ := fields[i].Type + typ := col.typ if typ == querypb.Type_TUPLE { return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unexpected Type_TUPLE for value %d", i) } - length := row.Lengths[i] + length := col.length if length < 0 { // -1 means a null variable; serialize it directly buf.WriteString("null") } else { - vv := sqltypes.MakeTrusted(typ, row.Values[offsetRow:offsetRow+length]) + vv := sqltypes.MakeTrusted(typ, row.Values[col.offset:col.offset+col.length]) vv.EncodeSQLBytes2(buf) - offsetRow += length } offsetQuery = loc.offset + loc.length diff --git a/go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go b/go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go index 1eacdf2ad60..9558533decd 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go +++ b/go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go @@ -94,15 +94,15 @@ func (rp *ReplicatorPlan) buildExecutionPlan(fieldEvent *binlogdatapb.FieldEvent // requires us to wait for the field info sent by the source. func (rp *ReplicatorPlan) buildFromFields(tableName string, lastpk *sqltypes.Result, fields []*querypb.Field) (*TablePlan, error) { tpb := &tablePlanBuilder{ - name: sqlparser.NewTableIdent(tableName), - lastpk: lastpk, - columnInfos: rp.ColInfoMap[tableName], - stats: rp.stats, + name: sqlparser.NewTableIdent(tableName), + lastpk: lastpk, + colInfos: rp.ColInfoMap[tableName], + stats: rp.stats, } for _, field := range fields { colName := sqlparser.NewColIdent(field.Name) isGenerated := false - for _, colInfo := range tpb.columnInfos { + for _, colInfo := range tpb.colInfos { if !strings.EqualFold(colInfo.Name, field.Name) { continue } @@ -196,6 +196,7 @@ type TablePlan struct { // a primary key column (row move). PKReferences []string Stats *binlogplayer.Stats + FieldsToSkip map[string]bool } // MarshalJSON performs a custom JSON Marshalling. @@ -233,7 +234,7 @@ func (tp *TablePlan) applyBulkInsert(sqlbuffer *bytes2.Buffer, rows *binlogdatap if i > 0 { sqlbuffer.WriteString(", ") } - if err := tp.BulkInsertValues.AppendFromRow(sqlbuffer, tp.Fields, row); err != nil { + if err := tp.BulkInsertValues.AppendFromRow(sqlbuffer, tp.Fields, row, tp.FieldsToSkip); err != nil { return nil, err } } diff --git a/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go b/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go index cce2d933526..38b69f1060e 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go +++ b/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go @@ -47,13 +47,13 @@ type tablePlanBuilder struct { // selColumns keeps track of the columns we want to pull from source. // If Lastpk is set, we compare this list against the table's pk and // add missing references. - selColumns map[string]bool - colExprs []*colExpr - onInsert insertType - pkCols []*colExpr - lastpk *sqltypes.Result - columnInfos []*ColumnInfo - stats *binlogplayer.Stats + selColumns map[string]bool + colExprs []*colExpr + onInsert insertType + pkCols []*colExpr + lastpk *sqltypes.Result + colInfos []*ColumnInfo + stats *binlogplayer.Stats } // colExpr describes the processing to be performed to @@ -231,10 +231,10 @@ func buildTablePlan(tableName, filter string, colInfoMap map[string][]*ColumnInf From: sel.From, Where: sel.Where, }, - selColumns: make(map[string]bool), - lastpk: lastpk, - columnInfos: colInfoMap[tableName], - stats: stats, + selColumns: make(map[string]bool), + lastpk: lastpk, + colInfos: colInfoMap[tableName], + stats: stats, } if err := tpb.analyzeExprs(sel.SelectExprs); err != nil { @@ -295,6 +295,13 @@ func (tpb *tablePlanBuilder) generate() *TablePlan { bvf := &bindvarFormatter{} + fieldsToSkip := make(map[string]bool) + for _, colInfo := range tpb.colInfos { + if colInfo.IsGenerated { + fieldsToSkip[colInfo.Name] = true + } + } + return &TablePlan{ TargetName: tpb.name.String(), Lastpk: tpb.lastpk, @@ -306,6 +313,7 @@ func (tpb *tablePlanBuilder) generate() *TablePlan { Delete: tpb.generateDeleteStatement(), PKReferences: pkrefs, Stats: tpb.stats, + FieldsToSkip: fieldsToSkip, } } @@ -726,7 +734,7 @@ func (tpb *tablePlanBuilder) generateWhere(buf *sqlparser.TrackedBuffer, bvf *bi } func (tpb *tablePlanBuilder) getCharsetAndCollation(pkname string) (charSet string, collation string) { - for _, colInfo := range tpb.columnInfos { + for _, colInfo := range tpb.colInfos { if colInfo.IsPK && strings.EqualFold(colInfo.Name, pkname) { if colInfo.CharSet != "" { charSet = fmt.Sprintf(" _%s ", colInfo.CharSet) @@ -765,7 +773,7 @@ func (tpb *tablePlanBuilder) generatePKConstraint(buf *sqlparser.TrackedBuffer, func (tpb *tablePlanBuilder) isColumnGenerated(col sqlparser.ColIdent) bool { isGenerated := false - for _, colInfo := range tpb.columnInfos { + for _, colInfo := range tpb.colInfos { if col.EqualString(colInfo.Name) && colInfo.IsGenerated { isGenerated = true break diff --git a/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go b/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go index 326f9386bb0..d2f61982222 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go @@ -1280,3 +1280,84 @@ func TestPlayerCopyTableCancel(t *testing.T) { {"2", "bbb"}, }) } + +func TestPlayerCopyTablesWithGeneratedColumn(t *testing.T) { + flavor := strings.ToLower(env.Flavor) + // Disable tests on percona (which identifies as mysql56) and mariadb platforms in CI since they + // generated columns support was added in 5.7 and mariadb added mysql compatible generated columns in 10.2 + if strings.Contains(flavor, "56") || strings.Contains(flavor, "maria") { + return + } + defer deleteTablet(addTablet(100)) + + execStatements(t, []string{ + "create table src1(id int, val varbinary(128), val2 varbinary(128) as (concat(id, val)), val3 varbinary(128) as (concat(val, id)), id2 int, primary key(id))", + "insert into src1(id, val, id2) values(2, 'bbb', 20), (1, 'aaa', 10)", + fmt.Sprintf("create table %s.dst1(id int, val varbinary(128), val2 varbinary(128) as (concat(id, val)), val3 varbinary(128), id2 int, primary key(id))", vrepldb), + "create table src2(id int, val varbinary(128), val2 varbinary(128) as (concat(id, val)), val3 varbinary(128) as (concat(val, id)), id2 int, primary key(id))", + "insert into src2(id, val, id2) values(2, 'bbb', 20), (1, 'aaa', 10)", + fmt.Sprintf("create table %s.dst2(val3 varbinary(128), val varbinary(128), id2 int)", vrepldb), + }) + defer execStatements(t, []string{ + "drop table src1", + fmt.Sprintf("drop table %s.dst1", vrepldb), + "drop table src2", + fmt.Sprintf("drop table %s.dst2", vrepldb), + }) + env.SchemaEngine.Reload(context.Background()) + + filter := &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "dst1", + Filter: "select * from src1", + }, { + Match: "dst2", + Filter: "select val3, val, id2 from src2", + }}, + } + + bls := &binlogdatapb.BinlogSource{ + Keyspace: env.KeyspaceName, + Shard: env.ShardName, + Filter: filter, + OnDdl: binlogdatapb.OnDDLAction_IGNORE, + } + query := binlogplayer.CreateVReplicationState("test", bls, "", binlogplayer.VReplicationInit, playerEngine.dbName) + qr, err := playerEngine.Exec(query) + if err != nil { + t.Fatal(err) + } + defer func() { + query := fmt.Sprintf("delete from _vt.vreplication where id = %d", qr.InsertID) + if _, err := playerEngine.Exec(query); err != nil { + t.Fatal(err) + } + expectDeleteQueries(t) + }() + + expectNontxQueries(t, []string{ + // Create the list of tables to copy and transition to Copying state. + "/insert into _vt.vreplication", + "/update _vt.vreplication set message=", + "/insert into _vt.copy_state", + "/update _vt.vreplication set state", + // The first fast-forward has no starting point. So, it just saves the current position. + "insert into dst1(id,val,val3,id2) values (1,'aaa','aaa1',10), (2,'bbb','bbb2',20)", + `/update _vt.copy_state set lastpk='fields: rows: ' where vrepl_id=.*`, + // copy of dst1 is done: delete from copy_state. + "/delete from _vt.copy_state.*dst1", + "insert into dst2(val3,val,id2) values ('aaa1','aaa',10), ('bbb2','bbb',20)", + `/update _vt.copy_state set lastpk='fields: rows: ' where vrepl_id=.*`, + // copy of dst2 is done: delete from copy_state. + "/delete from _vt.copy_state.*dst2", + "/update _vt.vreplication set state", + }) + expectData(t, "dst1", [][]string{ + {"1", "aaa", "1aaa", "aaa1", "10"}, + {"2", "bbb", "2bbb", "bbb2", "20"}, + }) + expectData(t, "dst2", [][]string{ + {"aaa1", "aaa", "10"}, + {"bbb2", "bbb", "20"}, + }) +} From 46f534726d7e39efd31c07dab2df673aca8db465 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Sat, 15 May 2021 11:33:47 +0200 Subject: [PATCH 4/9] Add vstreamer test to confirm generated columns are sent in events Signed-off-by: Rohit Nayak Signed-off-by: Rohit Nayak --- .../vreplication/vcopier_test.go | 2 +- .../tabletserver/vstreamer/vstreamer_test.go | 48 +++++++++++++++++++ 2 files changed, 49 insertions(+), 1 deletion(-) diff --git a/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go b/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go index d2f61982222..c6aed04523e 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go @@ -1285,7 +1285,7 @@ func TestPlayerCopyTablesWithGeneratedColumn(t *testing.T) { flavor := strings.ToLower(env.Flavor) // Disable tests on percona (which identifies as mysql56) and mariadb platforms in CI since they // generated columns support was added in 5.7 and mariadb added mysql compatible generated columns in 10.2 - if strings.Contains(flavor, "56") || strings.Contains(flavor, "maria") { + if !strings.Contains(flavor, "mysql57") && !strings.Contains(flavor, "mysql80") { return } defer deleteTablet(addTablet(100)) diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go index e30f4b82719..0fbff975571 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go @@ -1922,6 +1922,54 @@ func TestFilteredMultipleWhere(t *testing.T) { runCases(t, filter, testcases, "", nil) } +// TestGeneratedColumns just confirms that generated columns are sent in a vstream as expected +func TestGeneratedColumns(t *testing.T) { + flavor := strings.ToLower(env.Flavor) + // Disable tests on percona (which identifies as mysql56) and mariadb platforms in CI since they + // generated columns support was added in 5.7 and mariadb added mysql compatible generated columns in 10.2 + if !strings.Contains(flavor, "mysql57") && !strings.Contains(flavor, "mysql80") { + return + } + execStatements(t, []string{ + "create table t1(id int, val varbinary(6), val2 varbinary(6) as (concat(id, val)), val3 varbinary(6) as (concat(val, id)), id2 int, primary key(id))", + }) + defer execStatements(t, []string{ + "drop table t1", + }) + engine.se.Reload(context.Background()) + queries := []string{ + "begin", + "insert into t1(id, val, id2) values (1, 'aaa', 10)", + "insert into t1(id, val, id2) values (2, 'bbb', 20)", + "commit", + } + + fe := &TestFieldEvent{ + table: "t1", + db: "vttest", + cols: []*TestColumn{ + {name: "id", dataType: "INT32", colType: "", len: 11, charset: 63}, + {name: "val", dataType: "VARBINARY", colType: "", len: 6, charset: 63}, + {name: "val2", dataType: "VARBINARY", colType: "", len: 6, charset: 63}, + {name: "val3", dataType: "VARBINARY", colType: "", len: 6, charset: 63}, + {name: "id2", dataType: "INT32", colType: "", len: 11, charset: 63}, + }, + } + + testcases := []testcase{{ + input: queries, + output: [][]string{{ + `begin`, + fe.String(), + `type:ROW row_event: > > `, + `type:ROW row_event: > > `, + `gtid`, + `commit`, + }}, + }} + runCases(t, nil, testcases, "current", nil) +} + func runCases(t *testing.T, filter *binlogdatapb.Filter, testcases []testcase, position string, tablePK []*binlogdatapb.TableLastPK) { ctx, cancel := context.WithCancel(context.Background()) From 38de519945251360be5d818278e520277cd89411 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Sun, 16 May 2021 17:09:05 +0200 Subject: [PATCH 5/9] Add vplayer test for generated columns Signed-off-by: Rohit Nayak --- .../vreplication/vplayer_flaky_test.go | 83 +++++++++++++++++++ 1 file changed, 83 insertions(+) diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go index f5aaaea0228..630310a3a89 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go @@ -2618,6 +2618,89 @@ func TestVReplicationLogs(t *testing.T) { } } +func TestGeneratedColumns(t *testing.T) { + defer deleteTablet(addTablet(100)) + + execStatements(t, []string{ + "create table t1(id int, val varbinary(6), val2 varbinary(6) as (concat(id, val)), val3 varbinary(6) as (concat(val, id)), id2 int, primary key(id))", + fmt.Sprintf("create table %s.t1(id int, val varbinary(6), val2 varbinary(6) as (concat(id, val)), val3 varbinary(6), id2 int, primary key(id))", vrepldb), + "create table t2(id int, val varbinary(128), val2 varbinary(128) as (concat(id, val)) stored, val3 varbinary(128) as (concat(val, id)), id2 int, primary key(id))", + fmt.Sprintf("create table %s.t2(id int, val3 varbinary(128), val varbinary(128), id2 int, primary key(id))", vrepldb), + }) + defer execStatements(t, []string{ + "drop table t1", + fmt.Sprintf("drop table %s.t1", vrepldb), + "drop table t2", + fmt.Sprintf("drop table %s.t2", vrepldb), + }) + env.SchemaEngine.Reload(context.Background()) + + filter := &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "t1", + Filter: "select * from t1", + }, { + Match: "t2", + Filter: "select id, val3, val, id2 from t2", + }}, + } + bls := &binlogdatapb.BinlogSource{ + Keyspace: env.KeyspaceName, + Shard: env.ShardName, + Filter: filter, + OnDdl: binlogdatapb.OnDDLAction_IGNORE, + } + cancel, _ := startVReplication(t, bls, "") + defer cancel() + + testcases := []struct { + input string + output string + table string + data [][]string + }{{ + input: "insert into t1(id, val, id2) values (1, 'aaa', 10)", + output: "insert into t1(id,val,val3,id2) values (1,'aaa','aaa1',10)", + table: "t1", + data: [][]string{ + {"1", "aaa", "1aaa", "aaa1", "10"}, + }, + }, { + input: "update t1 set val = 'bbb', id2 = 11 where id = 1", + output: "update t1 set val='bbb', val3='bbb1', id2=11 where id=1", + table: "t1", + data: [][]string{ + {"1", "bbb", "1bbb", "bbb1", "11"}, + }, + }, { + input: "insert into t2(id, val, id2) values (1, 'aaa', 10)", + output: "insert into t2(id,val3,val,id2) values (1,'aaa1','aaa',10)", + table: "t2", + data: [][]string{ + {"1", "aaa1", "aaa", "10"}, + }, + }, { + input: "update t2 set val = 'bbb', id2 = 11 where id = 1", + output: "update t2 set val3='bbb1', val='bbb', id2=11 where id=1", + table: "t2", + data: [][]string{ + {"1", "bbb1", "bbb", "11"}, + }, + }} + + for _, tcases := range testcases { + execStatements(t, []string{tcases.input}) + output := []string{ + tcases.output, + } + expectNontxQueries(t, output) + time.Sleep(1 * time.Second) + if tcases.table != "" { + expectData(t, tcases.table, tcases.data) + } + } +} + func expectJSON(t *testing.T, table string, values [][]string, id int, exec func(ctx context.Context, query string) (*sqltypes.Result, error)) { t.Helper() From 7789c30528deac4371a8a393d16d03c2d4f51c3a Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Sun, 16 May 2021 17:30:33 +0200 Subject: [PATCH 6/9] Add vplayer test for generated columns Signed-off-by: Rohit Nayak --- .../tabletmanager/vreplication/vplayer_flaky_test.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go index 630310a3a89..bf4d20a98f3 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go @@ -2619,6 +2619,12 @@ func TestVReplicationLogs(t *testing.T) { } func TestGeneratedColumns(t *testing.T) { + flavor := strings.ToLower(env.Flavor) + // Disable tests on percona (which identifies as mysql56) and mariadb platforms in CI since they + // generated columns support was added in 5.7 and mariadb added mysql compatible generated columns in 10.2 + if !strings.Contains(flavor, "mysql57") && !strings.Contains(flavor, "mysql80") { + return + } defer deleteTablet(addTablet(100)) execStatements(t, []string{ @@ -2694,7 +2700,6 @@ func TestGeneratedColumns(t *testing.T) { tcases.output, } expectNontxQueries(t, output) - time.Sleep(1 * time.Second) if tcases.table != "" { expectData(t, tcases.table, tcases.data) } From 6d9e3cbe5eea10bb4f361f8a746d1de3bbdfb6e8 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Mon, 7 Jun 2021 11:16:07 +0200 Subject: [PATCH 7/9] Improve comment Signed-off-by: Rohit Nayak --- go/vt/sqlparser/parsed_query.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/vt/sqlparser/parsed_query.go b/go/vt/sqlparser/parsed_query.go index 7651c4bc481..1fff8b37c3d 100644 --- a/go/vt/sqlparser/parsed_query.go +++ b/go/vt/sqlparser/parsed_query.go @@ -89,7 +89,7 @@ func (pq *ParsedQuery) Append(buf *strings.Builder, bindVariables map[string]*qu // the fields in the row are in the same order as the placeholders in this query. The fields might include generated // columns which are dropped, by checking against skipFields, before binding the variables // note: there can be more fields than bind locations since extra columns might be requested from the source if not all -// primary keys columns are not in the select list, for example. Also some values in the row may not correspond for +// primary keys columns are present in the target table, for example. Also some values in the row may not correspond for // values from the database on the source: sum/count for aggregation queries, for example func (pq *ParsedQuery) AppendFromRow(buf *bytes2.Buffer, fields []*querypb.Field, row *querypb.Row, skipFields map[string]bool) error { if len(fields) < len(pq.bindLocations) { From d0714c7721e76cf021cf23a9b45d480079b8875e Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Mon, 7 Jun 2021 14:40:37 +0200 Subject: [PATCH 8/9] Fix merge Signed-off-by: Rohit Nayak --- go/test/endtoend/vreplication/helper.go | 4 ++-- go/test/endtoend/vreplication/vreplication_test_env.go | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/go/test/endtoend/vreplication/helper.go b/go/test/endtoend/vreplication/helper.go index 15726e2dd36..01a80b27982 100644 --- a/go/test/endtoend/vreplication/helper.go +++ b/go/test/endtoend/vreplication/helper.go @@ -189,11 +189,11 @@ func validateDryRunResults(t *testing.T, output string, want []string) { } if !match { fail = true - t.Logf("want %s, got %s\n", w, gotDryRun[i]) + t.Fatalf("want %s, got %s\n", w, gotDryRun[i]) } } if fail { - t.Fatal("Dry run results don't match") + t.Fatalf("Dry run results don't match, want %s, got %s", want, gotDryRun) } } diff --git a/go/test/endtoend/vreplication/vreplication_test_env.go b/go/test/endtoend/vreplication/vreplication_test_env.go index 240649ce7ba..244025b83de 100644 --- a/go/test/endtoend/vreplication/vreplication_test_env.go +++ b/go/test/endtoend/vreplication/vreplication_test_env.go @@ -49,10 +49,10 @@ var dryRunResultsReadCustomerShard = []string{ var dryRunResultsSwitchWritesM2m3 = []string{ "Lock keyspace merchant", "Stop streams on keyspace merchant", - "/ Id 2 Keyspace customer Shard -80 Rules rules:{match:\"morders\" filter:\"select * from orders where in_keyrange(mname, 'merchant.md5', '-80')\"} at Position ", - "/ Id 2 Keyspace customer Shard -80 Rules rules:{match:\"morders\" filter:\"select * from orders where in_keyrange(mname, 'merchant.md5', '80-')\"} at Position ", - "/ Id 3 Keyspace customer Shard 80- Rules rules:{match:\"morders\" filter:\"select * from orders where in_keyrange(mname, 'merchant.md5', '-80')\"} at Position ", - "/ Id 3 Keyspace customer Shard 80- Rules rules:{match:\"morders\" filter:\"select * from orders where in_keyrange(mname, 'merchant.md5', '80-')\"} at Position ", + "/ Id 2 Keyspace customer Shard -80 Rules rules:{match:\"morders\" filter:\"select oid, cid, mname, pid, price, qty, total from orders where in_keyrange(mname, 'merchant.md5', '-80')\"} at Position ", + "/ Id 2 Keyspace customer Shard -80 Rules rules:{match:\"morders\" filter:\"select oid, cid, mname, pid, price, qty, total from orders where in_keyrange(mname, 'merchant.md5', '80-')\"} at Position ", + "/ Id 3 Keyspace customer Shard 80- Rules rules:{match:\"morders\" filter:\"select oid, cid, mname, pid, price, qty, total from orders where in_keyrange(mname, 'merchant.md5', '-80')\"} at Position ", + "/ Id 3 Keyspace customer Shard 80- Rules rules:{match:\"morders\" filter:\"select oid, cid, mname, pid, price, qty, total from orders where in_keyrange(mname, 'merchant.md5', '80-')\"} at Position ", "/ Id 4 Keyspace customer Shard -80 Rules rules:{match:\"msales\" filter:\"select mname as merchant_name, count(*) as kount, sum(price) as amount from orders where in_keyrange(mname, 'merchant.md5', '-80') group by merchant_name\"} at Position ", "/ Id 4 Keyspace customer Shard -80 Rules rules:{match:\"msales\" filter:\"select mname as merchant_name, count(*) as kount, sum(price) as amount from orders where in_keyrange(mname, 'merchant.md5', '80-') group by merchant_name\"} at Position ", "/ Id 5 Keyspace customer Shard 80- Rules rules:{match:\"msales\" filter:\"select mname as merchant_name, count(*) as kount, sum(price) as amount from orders where in_keyrange(mname, 'merchant.md5', '-80') group by merchant_name\"} at Position ", From 6b52e982ccdf1c20b1b11d84693e5d57da6b46b6 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Tue, 8 Jun 2021 17:20:51 +0200 Subject: [PATCH 9/9] Address review comments Signed-off-by: Rohit Nayak --- .../tabletmanager/vreplication/table_plan_builder.go | 6 ++---- go/vt/vttablet/tabletmanager/vreplication/vreplicator.go | 6 +++--- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go b/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go index 38b69f1060e..9d094b7d462 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go +++ b/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go @@ -772,14 +772,12 @@ func (tpb *tablePlanBuilder) generatePKConstraint(buf *sqlparser.TrackedBuffer, } func (tpb *tablePlanBuilder) isColumnGenerated(col sqlparser.ColIdent) bool { - isGenerated := false for _, colInfo := range tpb.colInfos { if col.EqualString(colInfo.Name) && colInfo.IsGenerated { - isGenerated = true - break + return true } } - return isGenerated + return false } // bindvarFormatter is a dual mode formatter. Its behavior diff --git a/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go b/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go index cd827ba26e1..2c12568f527 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go @@ -224,7 +224,7 @@ func (vr *vreplicator) replicate(ctx context.Context) error { } } -// ColumnInfo is used to store charset and collation for primary keys where applicable +// ColumnInfo is used to store charset and collation type ColumnInfo struct { Name string CharSet string @@ -292,8 +292,8 @@ func (vr *vreplicator) buildColInfoMap(ctx context.Context) (map[string][]*Colum isPK = true } } - extra := row[5].ToString() - if strings.Contains(strings.ToLower(extra), "generated") { + extra := strings.ToLower(row[5].ToString()) + if strings.Contains(extra, "generated") || strings.Contains(extra, "virtual") { isGenerated = true } colInfo = append(colInfo, &ColumnInfo{