Skip to content

Commit

Permalink
Merge pull request #6525 from planetscale/rn-field-event-additional-a…
Browse files Browse the repository at this point in the history
…ttributes

 Field event: add additional attributes
  • Loading branch information
sougou authored Aug 2, 2020
2 parents 9fad4b0 + e325eb4 commit 6a63c22
Show file tree
Hide file tree
Showing 7 changed files with 201 additions and 173 deletions.
13 changes: 12 additions & 1 deletion go/vt/vtgate/endtoend/vstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,19 @@ func TestVStream(t *testing.T) {
Type: querypb.Type_INT64,
}},
}

gotFields := events[1].FieldEvent
if !proto.Equal(gotFields, wantFields) {
filteredFields := &binlogdatapb.FieldEvent{
TableName: gotFields.TableName,
Fields: []*querypb.Field{},
}
for _, field := range gotFields.Fields {
filteredFields.Fields = append(filteredFields.Fields, &querypb.Field{
Name: field.Name,
Type: field.Type,
})
}
if !proto.Equal(filteredFields, wantFields) {
t.Errorf("FieldEvent:\n%v, want\n%v", gotFields, wantFields)
}
wantRows := &binlogdatapb.RowEvent{
Expand Down
27 changes: 16 additions & 11 deletions go/vt/vttablet/endtoend/vstreamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func TestSchemaVersioning(t *testing.T) {
{
query: "insert into vitess_version values(1, 10)",
output: []string{
`type:FIELD field_event:<table_name:"vitess_version" fields:<name:"id1" type:INT32 > fields:<name:"id2" type:INT32 > > `,
`type:FIELD field_event:<table_name:"vitess_version" fields:<name:"id1" type:INT32 table:"vitess_version" org_table:"vitess_version" database:"vttest" org_name:"id1" column_length:11 charset:63 > fields:<name:"id2" type:INT32 table:"vitess_version" org_table:"vitess_version" database:"vttest" org_name:"id2" column_length:11 charset:63 > > `,
`type:ROW row_event:<table_name:"vitess_version" row_changes:<after:<lengths:1 lengths:2 values:"110" > > > `,
`gtid`,
},
Expand All @@ -109,7 +109,7 @@ func TestSchemaVersioning(t *testing.T) {
}, {
query: "insert into vitess_version values(2, 20, 200)",
output: []string{
`type:FIELD field_event:<table_name:"vitess_version" fields:<name:"id1" type:INT32 > fields:<name:"id2" type:INT32 > fields:<name:"id3" type:INT32 > > `,
`type:FIELD field_event:<table_name:"vitess_version" fields:<name:"id1" type:INT32 table:"vitess_version" org_table:"vitess_version" database:"vttest" org_name:"id1" column_length:11 charset:63 > fields:<name:"id2" type:INT32 table:"vitess_version" org_table:"vitess_version" database:"vttest" org_name:"id2" column_length:11 charset:63 > fields:<name:"id3" type:INT32 table:"vitess_version" org_table:"vitess_version" database:"vttest" org_name:"id3" column_length:11 charset:63 > > `,
`type:ROW row_event:<table_name:"vitess_version" row_changes:<after:<lengths:1 lengths:2 lengths:3 values:"220200" > > > `,
`gtid`,
},
Expand All @@ -124,7 +124,7 @@ func TestSchemaVersioning(t *testing.T) {
}, {
query: "insert into vitess_version values(3, 30, 'TTT')",
output: []string{
`type:FIELD field_event:<table_name:"vitess_version" fields:<name:"id1" type:INT32 > fields:<name:"id2" type:INT32 > fields:<name:"id3" type:VARBINARY > > `,
`type:FIELD field_event:<table_name:"vitess_version" fields:<name:"id1" type:INT32 table:"vitess_version" org_table:"vitess_version" database:"vttest" org_name:"id1" column_length:11 charset:63 > fields:<name:"id2" type:INT32 table:"vitess_version" org_table:"vitess_version" database:"vttest" org_name:"id2" column_length:11 charset:63 > fields:<name:"id3" type:VARBINARY table:"vitess_version" org_table:"vitess_version" database:"vttest" org_name:"id3" column_length:16 charset:63 > > `,
`type:ROW row_event:<table_name:"vitess_version" row_changes:<after:<lengths:1 lengths:2 lengths:3 values:"330TTT" > > > `,
`gtid`,
},
Expand Down Expand Up @@ -175,7 +175,7 @@ func TestSchemaVersioning(t *testing.T) {
}, {
query: "insert into vitess_version values(4, 40, 'FFF', 'GGGG' )",
output: []string{
`type:FIELD field_event:<table_name:"vitess_version" fields:<name:"id1" type:INT32 > fields:<name:"id2" type:INT32 > fields:<name:"id3" type:VARBINARY > fields:<name:"id4" type:VARBINARY > > `,
`type:FIELD field_event:<table_name:"vitess_version" fields:<name:"id1" type:INT32 table:"vitess_version" org_table:"vitess_version" database:"vttest" org_name:"id1" column_length:11 charset:63 > fields:<name:"id2" type:INT32 table:"vitess_version" org_table:"vitess_version" database:"vttest" org_name:"id2" column_length:11 charset:63 > fields:<name:"id3" type:VARBINARY table:"vitess_version" org_table:"vitess_version" database:"vttest" org_name:"id3" column_length:16 charset:63 > fields:<name:"id4" type:VARBINARY table:"vitess_version" org_table:"vitess_version" database:"vttest" org_name:"id4" column_length:16 charset:63 > > `,
`type:ROW row_event:<table_name:"vitess_version" row_changes:<after:<lengths:1 lengths:2 lengths:3 lengths:4 values:"440FFFGGGG" > > > `,
`gtid`,
},
Expand Down Expand Up @@ -219,26 +219,26 @@ func TestSchemaVersioning(t *testing.T) {
getSchemaVersionTableCreationEvents()...),
`version`,
`gtid`,
`type:FIELD field_event:<table_name:"vitess_version" fields:<name:"id1" type:INT32 > fields:<name:"id2" type:INT32 > > `,
`type:FIELD field_event:<table_name:"vitess_version" fields:<name:"id1" type:INT32 table:"vitess_version" org_table:"vitess_version" database:"vttest" org_name:"id1" column_length:11 charset:63 > fields:<name:"id2" type:INT32 table:"vitess_version" org_table:"vitess_version" database:"vttest" org_name:"id2" column_length:11 charset:63 > > `,
`type:ROW row_event:<table_name:"vitess_version" row_changes:<after:<lengths:1 lengths:2 values:"110" > > > `,
`gtid`,
`gtid`,
`type:DDL statement:"alter table vitess_version add column id3 int" `,
`version`,
`gtid`,
`type:FIELD field_event:<table_name:"vitess_version" fields:<name:"id1" type:INT32 > fields:<name:"id2" type:INT32 > fields:<name:"id3" type:INT32 > > `,
`type:FIELD field_event:<table_name:"vitess_version" fields:<name:"id1" type:INT32 table:"vitess_version" org_table:"vitess_version" database:"vttest" org_name:"id1" column_length:11 charset:63 > fields:<name:"id2" type:INT32 table:"vitess_version" org_table:"vitess_version" database:"vttest" org_name:"id2" column_length:11 charset:63 > fields:<name:"id3" type:INT32 table:"vitess_version" org_table:"vitess_version" database:"vttest" org_name:"id3" column_length:11 charset:63 > > `,
`type:ROW row_event:<table_name:"vitess_version" row_changes:<after:<lengths:1 lengths:2 lengths:3 values:"220200" > > > `,
`gtid`,
`gtid`,
`type:DDL statement:"alter table vitess_version modify column id3 varbinary(16)" `,
`version`,
`gtid`,
`type:FIELD field_event:<table_name:"vitess_version" fields:<name:"id1" type:INT32 > fields:<name:"id2" type:INT32 > fields:<name:"id3" type:VARBINARY > > `,
`type:FIELD field_event:<table_name:"vitess_version" fields:<name:"id1" type:INT32 table:"vitess_version" org_table:"vitess_version" database:"vttest" org_name:"id1" column_length:11 charset:63 > fields:<name:"id2" type:INT32 table:"vitess_version" org_table:"vitess_version" database:"vttest" org_name:"id2" column_length:11 charset:63 > fields:<name:"id3" type:VARBINARY table:"vitess_version" org_table:"vitess_version" database:"vttest" org_name:"id3" column_length:16 charset:63 > > `,
`type:ROW row_event:<table_name:"vitess_version" row_changes:<after:<lengths:1 lengths:2 lengths:3 values:"330TTT" > > > `,
`gtid`,
`gtid`,
`type:DDL statement:"alter table vitess_version add column id4 varbinary(16)" `,
`type:FIELD field_event:<table_name:"vitess_version" fields:<name:"id1" type:INT32 > fields:<name:"id2" type:INT32 > fields:<name:"id3" type:VARBINARY > fields:<name:"id4" type:VARBINARY > > `,
`type:FIELD field_event:<table_name:"vitess_version" fields:<name:"id1" type:INT32 table:"vitess_version" org_table:"vitess_version" database:"vttest" org_name:"id1" column_length:11 charset:63 > fields:<name:"id2" type:INT32 table:"vitess_version" org_table:"vitess_version" database:"vttest" org_name:"id2" column_length:11 charset:63 > fields:<name:"id3" type:VARBINARY table:"vitess_version" org_table:"vitess_version" database:"vttest" org_name:"id3" column_length:16 charset:63 > fields:<name:"id4" type:VARBINARY table:"vitess_version" org_table:"vitess_version" database:"vttest" org_name:"id4" column_length:16 charset:63 > > `,
`type:ROW row_event:<table_name:"vitess_version" row_changes:<after:<lengths:1 lengths:2 lengths:3 lengths:4 values:"440FFFGGGG" > > > `,
`gtid`,
)
Expand Down Expand Up @@ -283,7 +283,7 @@ func TestSchemaVersioning(t *testing.T) {
getSchemaVersionTableCreationEvents()...),
`version`,
`gtid`,
`type:FIELD field_event:<table_name:"vitess_version" fields:<name:"id1" type:INT32 > fields:<name:"id2" type:INT32 > > `,
`type:FIELD field_event:<table_name:"vitess_version" fields:<name:"id1" type:INT32 table:"vitess_version" org_table:"vitess_version" database:"vttest" org_name:"id1" column_length:11 charset:63 > fields:<name:"id2" type:INT32 table:"vitess_version" org_table:"vitess_version" database:"vttest" org_name:"id2" column_length:11 charset:63 > > `,
`type:ROW row_event:<table_name:"vitess_version" row_changes:<after:<lengths:1 lengths:2 values:"110" > > > `,
`gtid`,
`gtid`,
Expand All @@ -300,12 +300,12 @@ func TestSchemaVersioning(t *testing.T) {
`gtid`,
/*at this point we only have latest schema so we have types (int32, int32, varbinary, varbinary),
but the three fields below match the first three types in the latest, so the field names are correct*/
`type:FIELD field_event:<table_name:"vitess_version" fields:<name:"id1" type:INT32 > fields:<name:"id2" type:INT32 > fields:<name:"id3" type:VARBINARY > > `,
`type:FIELD field_event:<table_name:"vitess_version" fields:<name:"id1" type:INT32 table:"vitess_version" org_table:"vitess_version" database:"vttest" org_name:"id1" column_length:11 charset:63 > fields:<name:"id2" type:INT32 table:"vitess_version" org_table:"vitess_version" database:"vttest" org_name:"id2" column_length:11 charset:63 > fields:<name:"id3" type:VARBINARY table:"vitess_version" org_table:"vitess_version" database:"vttest" org_name:"id3" column_length:16 charset:63 > > `,
`type:ROW row_event:<table_name:"vitess_version" row_changes:<after:<lengths:1 lengths:2 lengths:3 values:"330TTT" > > > `,
`gtid`,
`gtid`,
`type:DDL statement:"alter table vitess_version add column id4 varbinary(16)" `,
`type:FIELD field_event:<table_name:"vitess_version" fields:<name:"id1" type:INT32 > fields:<name:"id2" type:INT32 > fields:<name:"id3" type:VARBINARY > fields:<name:"id4" type:VARBINARY > > `,
`type:FIELD field_event:<table_name:"vitess_version" fields:<name:"id1" type:INT32 table:"vitess_version" org_table:"vitess_version" database:"vttest" org_name:"id1" column_length:11 charset:63 > fields:<name:"id2" type:INT32 table:"vitess_version" org_table:"vitess_version" database:"vttest" org_name:"id2" column_length:11 charset:63 > fields:<name:"id3" type:VARBINARY table:"vitess_version" org_table:"vitess_version" database:"vttest" org_name:"id3" column_length:16 charset:63 > fields:<name:"id4" type:VARBINARY table:"vitess_version" org_table:"vitess_version" database:"vttest" org_name:"id4" column_length:16 charset:63 > > `,
`type:ROW row_event:<table_name:"vitess_version" row_changes:<after:<lengths:1 lengths:2 lengths:3 lengths:4 values:"440FFFGGGG" > > > `,
`gtid`,
)
Expand Down Expand Up @@ -407,6 +407,11 @@ func expectLogs(ctx context.Context, t *testing.T, query string, eventCh chan []
}
default:
evs[i].Timestamp = 0
if evs[i].Type == binlogdatapb.VEventType_FIELD {
for j := range evs[i].FieldEvent.Fields {
evs[i].FieldEvent.Fields[j].Flags = 0
}
}
if got := fmt.Sprintf("%v", evs[i]); got != want {
t.Fatalf("%v (%d): event:\n%q, want\n%q", query, i, got, want)
}
Expand Down
27 changes: 9 additions & 18 deletions go/vt/vttablet/tabletserver/vstreamer/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,7 @@ type ColExpr struct {
Vindex vindexes.Vindex
VindexColumns []int

// Alias is usually the column name, but it can be changed
// if the select expression aliases with an "AS" expression.
// Also, "keyspace_id()" will be aliased as "keyspace_id".
// This Alias is sent as field info for the returned stream.
Alias sqlparser.ColIdent
Type querypb.Type
Field *querypb.Field
}

// Table contains the metadata for a table.
Expand All @@ -103,10 +98,7 @@ type Table struct {
func (plan *Plan) fields() []*querypb.Field {
fields := make([]*querypb.Field, len(plan.ColExprs))
for i, ce := range plan.ColExprs {
fields[i] = &querypb.Field{
Name: ce.Alias.String(),
Type: ce.Type,
}
fields[i] = ce.Field
}
return fields
}
Expand Down Expand Up @@ -264,8 +256,7 @@ func buildREPlan(ti *Table, vschema *localVSchema, filter string) (*Plan, error)
plan.ColExprs = make([]ColExpr, len(ti.Fields))
for i, col := range ti.Fields {
plan.ColExprs[i].ColNum = i
plan.ColExprs[i].Alias = sqlparser.NewColIdent(col.Name)
plan.ColExprs[i].Type = col.Type
plan.ColExprs[i].Field = col
}
if filter == "" {
return plan, nil
Expand Down Expand Up @@ -435,8 +426,7 @@ func (plan *Plan) analyzeExprs(vschema *localVSchema, selExprs sqlparser.SelectE
plan.ColExprs = make([]ColExpr, len(plan.Table.Fields))
for i, col := range plan.Table.Fields {
plan.ColExprs[i].ColNum = i
plan.ColExprs[i].Alias = sqlparser.NewColIdent(col.Name)
plan.ColExprs[i].Type = col.Type
plan.ColExprs[i].Field = col
}
}
return nil
Expand All @@ -462,8 +452,7 @@ func (plan *Plan) analyzeExpr(vschema *localVSchema, selExpr sqlparser.SelectExp
}
return ColExpr{
ColNum: colnum,
Alias: as,
Type: plan.Table.Fields[colnum].Type,
Field: plan.Table.Fields[colnum],
}, nil
case *sqlparser.FuncExpr:
if inner.Name.Lowered() != "keyspace_id" {
Expand All @@ -481,10 +470,12 @@ func (plan *Plan) analyzeExpr(vschema *localVSchema, selExpr sqlparser.SelectExp
return ColExpr{}, err
}
return ColExpr{
Field: &querypb.Field{
Name: "keyspace_id",
Type: sqltypes.VarBinary,
},
Vindex: cv.Vindex,
VindexColumns: vindexColumns,
Alias: sqlparser.NewColIdent("keyspace_id"),
Type: sqltypes.VarBinary,
}, nil
default:
return ColExpr{}, fmt.Errorf("unsupported: %v", sqlparser.String(aliased.Expr))
Expand Down
Loading

0 comments on commit 6a63c22

Please sign in to comment.