Merge pull request #8129 from planetscale/rn-vr-gen-columns
VReplication: ignore generated columns in workflows
rohit-nayak-ps authored Jun 10, 2021
2 parents 36657df + 6b52e98 commit ccdb446
Showing 15 changed files with 416 additions and 106 deletions.
5 changes: 5 additions & 0 deletions go/test/endtoend/vreplication/cluster.go
@@ -127,6 +127,11 @@ func getClusterConfig(idx int, dataRootDir string) *ClusterConfig {
}

func init() {
+// for local debugging, set this variable so that each run uses VTDATAROOT instead of a random dir
+// and also does not tear down the cluster, so that logs and the databases can be inspected
+if os.Getenv("VREPLICATION_E2E_DEBUG") != "" {
+debug = true
+}
rand.Seed(time.Now().UTC().UnixNano())
originalVtdataroot = os.Getenv("VTDATAROOT")
var mainVtDataRoot string
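Editor's note: the `VREPLICATION_E2E_DEBUG` toggle above is a debugging aid — when the variable is set, a run reuses VTDATAROOT instead of a random directory and skips teardown so logs and databases survive for inspection. A minimal sketch of how such a flag can gate teardown; `maybeTeardown` is a hypothetical helper, not part of the actual suite:

```go
package vreplication

import "testing"

// debug mirrors the package-level flag set in init() when
// os.Getenv("VREPLICATION_E2E_DEBUG") != "".
var debug bool

// maybeTeardown is a hypothetical helper: in debug mode it skips cluster
// teardown so the logs and databases remain available after the run.
func maybeTeardown(t *testing.T, teardown func()) {
	if debug {
		t.Log("VREPLICATION_E2E_DEBUG set: leaving the cluster up for inspection")
		return
	}
	teardown()
}
```

A run would opt in via the environment, e.g. `VREPLICATION_E2E_DEBUG=1 go test ./go/test/endtoend/vreplication/...`.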
6 changes: 3 additions & 3 deletions go/test/endtoend/vreplication/config.go
@@ -6,7 +6,7 @@ create table product(pid int, description varbinary(128), primary key(pid));
create table customer(cid int, name varbinary(128), meta json default null, typ enum('individual','soho','enterprise'), sport set('football','cricket','baseball'),ts timestamp not null default current_timestamp, primary key(cid)) CHARSET=utf8mb4;
create table customer_seq(id int, next_id bigint, cache bigint, primary key(id)) comment 'vitess_sequence';
create table merchant(mname varchar(128), category varchar(128), primary key(mname)) DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
-create table orders(oid int, cid int, pid int, mname varchar(128), price int, primary key(oid));
+create table orders(oid int, cid int, pid int, mname varchar(128), price int, qty int, total int as (qty * price), total2 int as (qty * price) stored, primary key(oid));
create table order_seq(id int, next_id bigint, cache bigint, primary key(id)) comment 'vitess_sequence';
create table customer2(cid int, name varbinary(128), typ enum('individual','soho','enterprise'), sport set('football','cricket','baseball'),ts timestamp not null default current_timestamp, primary key(cid));
create table customer_seq2(id int, next_id bigint, cache bigint, primary key(id)) comment 'vitess_sequence';
@@ -243,8 +243,8 @@ create table tenant(tenant_id binary(16), name varbinary(16), primary key (tenan
"targetKeyspace": "merchant",
"tableSettings": [{
"targetTable": "morders",
"sourceExpression": "select * from orders",
"create_ddl": "create table morders(oid int, cid int, mname varchar(128), pid int, price int, primary key(oid))"
"sourceExpression": "select oid, cid, mname, pid, price, qty, total from orders",
"create_ddl": "create table morders(oid int, cid int, mname varchar(128), pid int, price int, qty int, total int, total2 int as (10 * total), primary key(oid))"
}]
}
`
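Editor's note: the new `orders` schema exercises both flavors of MySQL generated columns — `total` is virtual (computed on read) and `total2` is `stored` (materialized on write). Neither can be assigned directly, which is why the materialization spec above now enumerates real columns instead of `select *` and recreates `total2` as a generated column on the target. A minimal sketch of the MySQL behavior, assuming a reachable instance with this schema loaded (the DSN is a placeholder):

```go
package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/go-sql-driver/mysql" // MySQL driver
)

func main() {
	// Placeholder DSN; point it at any MySQL with the orders schema above.
	db, err := sql.Open("mysql", "user:pass@tcp(127.0.0.1:3306)/product")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Generated columns (total, total2) must be omitted from the insert;
	// MySQL derives both from qty * price.
	_, err = db.Exec(
		"insert into orders(oid, cid, pid, mname, price, qty) values(?, ?, ?, ?, ?, ?)",
		4, 2, 1, "newegg", 10, 5)
	if err != nil {
		log.Fatal(err)
	}

	var total, total2 int
	if err := db.QueryRow(
		"select total, total2 from orders where oid = ?", 4).Scan(&total, &total2); err != nil {
		log.Fatal(err)
	}
	fmt.Println(total, total2) // 50 50: one computed on read, one stored
}
```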
4 changes: 2 additions & 2 deletions go/test/endtoend/vreplication/helper.go
@@ -189,11 +189,11 @@ func validateDryRunResults(t *testing.T, output string, want []string) {
}
if !match {
fail = true
-t.Logf("want %s, got %s\n", w, gotDryRun[i])
+t.Fatalf("want %s, got %s\n", w, gotDryRun[i])
}
}
if fail {
-t.Fatal("Dry run results don't match")
+t.Fatalf("Dry run results don't match, want %s, got %s", want, gotDryRun)
}
}

6 changes: 3 additions & 3 deletions go/test/endtoend/vreplication/unsharded_init_data.sql
@@ -5,9 +5,9 @@ insert into merchant(mname, category) values('monoprice', 'electronics');
insert into merchant(mname, category) values('newegg', 'electronics');
insert into product(pid, description) values(1, 'keyboard');
insert into product(pid, description) values(2, 'monitor');
-insert into orders(oid, cid, mname, pid, price) values(1, 1, 'monoprice', 1, 10);
-insert into orders(oid, cid, mname, pid, price) values(2, 1, 'newegg', 2, 15);
-insert into orders(oid, cid, mname, pid, price) values(3, 2, 'monoprice', 2, 20);
+insert into orders(oid, cid, mname, pid, price, qty) values(1, 1, 'monoprice', 1, 10, 1);
+insert into orders(oid, cid, mname, pid, price, qty) values(2, 1, 'newegg', 2, 15, 2);
+insert into orders(oid, cid, mname, pid, price, qty) values(3, 2, 'monoprice', 2, 20, 3);
insert into customer2(cid, name, typ, sport) values(1, 'john',1,'football,baseball');
insert into customer2(cid, name, typ, sport) values(2, 'paul','soho','cricket');
insert into customer2(cid, name, typ, sport) values(3, 'ringo','enterprise','');
8 changes: 4 additions & 4 deletions go/test/endtoend/vreplication/vreplication_test_env.go
@@ -49,10 +49,10 @@ var dryRunResultsReadCustomerShard = []string{
var dryRunResultsSwitchWritesM2m3 = []string{
"Lock keyspace merchant",
"Stop streams on keyspace merchant",
"/ Id 2 Keyspace customer Shard -80 Rules rules:{match:\"morders\" filter:\"select * from orders where in_keyrange(mname, 'merchant.md5', '-80')\"} at Position ",
"/ Id 2 Keyspace customer Shard -80 Rules rules:{match:\"morders\" filter:\"select * from orders where in_keyrange(mname, 'merchant.md5', '80-')\"} at Position ",
"/ Id 3 Keyspace customer Shard 80- Rules rules:{match:\"morders\" filter:\"select * from orders where in_keyrange(mname, 'merchant.md5', '-80')\"} at Position ",
"/ Id 3 Keyspace customer Shard 80- Rules rules:{match:\"morders\" filter:\"select * from orders where in_keyrange(mname, 'merchant.md5', '80-')\"} at Position ",
"/ Id 2 Keyspace customer Shard -80 Rules rules:{match:\"morders\" filter:\"select oid, cid, mname, pid, price, qty, total from orders where in_keyrange(mname, 'merchant.md5', '-80')\"} at Position ",
"/ Id 2 Keyspace customer Shard -80 Rules rules:{match:\"morders\" filter:\"select oid, cid, mname, pid, price, qty, total from orders where in_keyrange(mname, 'merchant.md5', '80-')\"} at Position ",
"/ Id 3 Keyspace customer Shard 80- Rules rules:{match:\"morders\" filter:\"select oid, cid, mname, pid, price, qty, total from orders where in_keyrange(mname, 'merchant.md5', '-80')\"} at Position ",
"/ Id 3 Keyspace customer Shard 80- Rules rules:{match:\"morders\" filter:\"select oid, cid, mname, pid, price, qty, total from orders where in_keyrange(mname, 'merchant.md5', '80-')\"} at Position ",
"/ Id 4 Keyspace customer Shard -80 Rules rules:{match:\"msales\" filter:\"select mname as merchant_name, count(*) as kount, sum(price) as amount from orders where in_keyrange(mname, 'merchant.md5', '-80') group by merchant_name\"} at Position ",
"/ Id 4 Keyspace customer Shard -80 Rules rules:{match:\"msales\" filter:\"select mname as merchant_name, count(*) as kount, sum(price) as amount from orders where in_keyrange(mname, 'merchant.md5', '80-') group by merchant_name\"} at Position ",
"/ Id 5 Keyspace customer Shard 80- Rules rules:{match:\"msales\" filter:\"select mname as merchant_name, count(*) as kount, sum(price) as amount from orders where in_keyrange(mname, 'merchant.md5', '-80') group by merchant_name\"} at Position ",
46 changes: 37 additions & 9 deletions go/vt/sqlparser/parsed_query.go
@@ -32,7 +32,7 @@ import (
)

// ParsedQuery represents a parsed query where
-// bind locations are precompued for fast substitutions.
+// bind locations are precomputed for fast substitutions.
type ParsedQuery struct {
Query string
bindLocations []bindLocation
@@ -86,29 +86,57 @@ func (pq *ParsedQuery) Append(buf *strings.Builder, bindVariables map[string]*qu
}

// AppendFromRow behaves like Append but takes a querypb.Row directly, assuming that
-// the fields in the row are in the same order as the placeholders in this query.
-func (pq *ParsedQuery) AppendFromRow(buf *bytes2.Buffer, fields []*querypb.Field, row *querypb.Row) error {
+// the fields in the row are in the same order as the placeholders in this query. The fields might include generated
+// columns, which are dropped by checking against skipFields before binding the variables.
+// Note: there can be more fields than bind locations, since extra columns might be requested from the source (for
+// example, if not all primary key columns are present in the target table). Also, some values in the row may not
+// correspond to values from the database on the source: sum/count for aggregation queries, for example.
+func (pq *ParsedQuery) AppendFromRow(buf *bytes2.Buffer, fields []*querypb.Field, row *querypb.Row, skipFields map[string]bool) error {
if len(fields) < len(pq.bindLocations) {
-return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "wrong number of fields: got %d fields for %d bind locations ", len(fields), len(pq.bindLocations))
+return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "wrong number of fields: got %d fields for %d bind locations ",
+len(fields), len(pq.bindLocations))
}

+type colInfo struct {
+typ querypb.Type
+length int64
+offset int64
+}
+rowInfo := make([]*colInfo, 0)
+
+offset := int64(0)
+for i, field := range fields { // collect info required for fields to be bound
+length := row.Lengths[i]
+if !skipFields[strings.ToLower(field.Name)] {
+rowInfo = append(rowInfo, &colInfo{
+typ: field.Type,
+length: length,
+offset: offset,
+})
+}
+if length > 0 {
+offset += row.Lengths[i]
+}
+}

+// bind field values to locations
var offsetQuery int
-var offsetRow int64
for i, loc := range pq.bindLocations {
+col := rowInfo[i]
buf.WriteString(pq.Query[offsetQuery:loc.offset])

-typ := fields[i].Type
+typ := col.typ
if typ == querypb.Type_TUPLE {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unexpected Type_TUPLE for value %d", i)
}

-length := row.Lengths[i]
+length := col.length
if length < 0 {
// -1 means a null variable; serialize it directly
buf.WriteString("null")
} else {
-vv := sqltypes.MakeTrusted(typ, row.Values[offsetRow:offsetRow+length])
+vv := sqltypes.MakeTrusted(typ, row.Values[col.offset:col.offset+col.length])
vv.EncodeSQLBytes2(buf)
-offsetRow += length
}

offsetQuery = loc.offset + loc.length
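Editor's note on the bookkeeping above: `querypb.Row` packs all column values end to end in `Values`, with `Lengths[i]` giving each column's byte length and -1 marking SQL NULL. A skipped (generated) field still occupies bytes in the row, so the first loop records an offset for every field and only filters afterwards — otherwise later columns would be misaligned. A self-contained sketch of that walk, using stand-in types rather than the real querypb protos:

```go
package main

import "fmt"

// row mimics the querypb.Row layout: all values concatenated in values,
// with lengths[i] holding each column's byte length and -1 meaning NULL.
type row struct {
	lengths []int64
	values  []byte
}

// columnBytes extracts the bytes for every non-skipped column, stepping
// over skipped ones the way AppendFromRow's first loop does.
func columnBytes(r row, skip map[int]bool) [][]byte {
	var out [][]byte
	var offset int64
	for i, length := range r.lengths {
		if !skip[i] {
			if length < 0 {
				out = append(out, nil) // NULL column
			} else {
				out = append(out, r.values[offset:offset+length])
			}
		}
		// Every non-NULL value advances the offset, even for skipped
		// columns; that keeps later columns aligned.
		if length > 0 {
			offset += length
		}
	}
	return out
}

func main() {
	r := row{
		lengths: []int64{1, -1, 2, 2}, // "4", NULL, "10", "40"
		values:  []byte("41040"),
	}
	// Skip the last column, as vreplication does for a generated column.
	for _, col := range columnBytes(r, map[int]bool{3: true}) {
		fmt.Printf("%q\n", col) // "4", "", "10"
	}
}
```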
28 changes: 21 additions & 7 deletions go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go
@@ -53,7 +53,7 @@ type ReplicatorPlan struct {
VStreamFilter *binlogdatapb.Filter
TargetTables map[string]*TablePlan
TablePlans map[string]*TablePlan
-PKInfoMap map[string][]*PrimaryKeyInfo
+ColInfoMap map[string][]*ColumnInfo
stats *binlogplayer.Stats
}

@@ -94,13 +94,26 @@ func (rp *ReplicatorPlan) buildExecutionPlan(fieldEvent *binlogdatapb.FieldEvent
// requires us to wait for the field info sent by the source.
func (rp *ReplicatorPlan) buildFromFields(tableName string, lastpk *sqltypes.Result, fields []*querypb.Field) (*TablePlan, error) {
tpb := &tablePlanBuilder{
-name: sqlparser.NewTableIdent(tableName),
-lastpk: lastpk,
-pkInfos: rp.PKInfoMap[tableName],
-stats: rp.stats,
+name: sqlparser.NewTableIdent(tableName),
+lastpk: lastpk,
+colInfos: rp.ColInfoMap[tableName],
+stats: rp.stats,
}
for _, field := range fields {
colName := sqlparser.NewColIdent(field.Name)
+isGenerated := false
+for _, colInfo := range tpb.colInfos {
+if !strings.EqualFold(colInfo.Name, field.Name) {
+continue
+}
+if colInfo.IsGenerated {
+isGenerated = true
+}
+break
+}
+if isGenerated {
+continue
+}
cexpr := &colExpr{
colName: colName,
colType: field.Type,
@@ -114,7 +127,7 @@
tpb.colExprs = append(tpb.colExprs, cexpr)
}
// The following actions are a subset of buildTablePlan.
-if err := tpb.analyzePK(rp.PKInfoMap); err != nil {
+if err := tpb.analyzePK(rp.ColInfoMap); err != nil {
return nil, err
}
return tpb.generate(), nil
@@ -183,6 +196,7 @@ type TablePlan struct {
// a primary key column (row move).
PKReferences []string
Stats *binlogplayer.Stats
+FieldsToSkip map[string]bool
}

// MarshalJSON performs a custom JSON Marshalling.
@@ -220,7 +234,7 @@ func (tp *TablePlan) applyBulkInsert(sqlbuffer *bytes2.Buffer, rows *binlogdatap
if i > 0 {
sqlbuffer.WriteString(", ")
}
-if err := tp.BulkInsertValues.AppendFromRow(sqlbuffer, tp.Fields, row); err != nil {
+if err := tp.BulkInsertValues.AppendFromRow(sqlbuffer, tp.Fields, row, tp.FieldsToSkip); err != nil {
return nil, err
}
}
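Editor's note: `FieldsToSkip` is consulted by `applyBulkInsert` via `AppendFromRow`, and the lookups normalize case (`strings.ToLower` there, `strings.EqualFold` in `buildFromFields`), so the map is presumably keyed by lowercased column names. A hedged sketch of building such a skip map from column metadata; this `ColumnInfo` is a local stand-in rather than the Vitess type:

```go
package main

import (
	"fmt"
	"strings"
)

// ColumnInfo is a stand-in mirroring the fields the diff shows the Vitess
// type gaining: a name plus IsPK / IsGenerated flags.
type ColumnInfo struct {
	Name        string
	IsPK        bool
	IsGenerated bool
}

// skipMap collects the generated columns, keyed by lowercased name so a
// lookup can normalize with strings.ToLower regardless of schema casing.
func skipMap(cols []*ColumnInfo) map[string]bool {
	skip := make(map[string]bool)
	for _, c := range cols {
		if c.IsGenerated {
			skip[strings.ToLower(c.Name)] = true
		}
	}
	return skip
}

func main() {
	cols := []*ColumnInfo{
		{Name: "oid", IsPK: true},
		{Name: "price"},
		{Name: "qty"},
		{Name: "Total", IsGenerated: true},
		{Name: "total2", IsGenerated: true},
	}
	fmt.Println(skipMap(cols)) // map[total:true total2:true]
}
```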
@@ -669,8 +669,8 @@ func TestBuildPlayerPlan(t *testing.T) {
err: "group by expression is not allowed to reference an aggregate expression: a",
}}

-PrimaryKeyInfos := map[string][]*PrimaryKeyInfo{
-"t1": {&PrimaryKeyInfo{Name: "c1"}},
+PrimaryKeyInfos := map[string][]*ColumnInfo{
+"t1": {&ColumnInfo{Name: "c1", IsPK: true}},
}

copyState := map[string]*sqltypes.Result{
@@ -711,9 +711,9 @@
}

func TestBuildPlayerPlanNoDup(t *testing.T) {
-PrimaryKeyInfos := map[string][]*PrimaryKeyInfo{
-"t1": {&PrimaryKeyInfo{Name: "c1"}},
-"t2": {&PrimaryKeyInfo{Name: "c2"}},
+PrimaryKeyInfos := map[string][]*ColumnInfo{
+"t1": {&ColumnInfo{Name: "c1"}},
+"t2": {&ColumnInfo{Name: "c2"}},
}
input := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
@@ -732,9 +732,9 @@
}

func TestBuildPlayerPlanExclude(t *testing.T) {
-PrimaryKeyInfos := map[string][]*PrimaryKeyInfo{
-"t1": {&PrimaryKeyInfo{Name: "c1"}},
-"t2": {&PrimaryKeyInfo{Name: "c2"}},
+PrimaryKeyInfos := map[string][]*ColumnInfo{
+"t1": {&ColumnInfo{Name: "c1"}},
+"t2": {&ColumnInfo{Name: "c2"}},
}
input := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{