Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Field event: add additional attributes #6525

Merged
13 changes: 12 additions & 1 deletion go/vt/vtgate/endtoend/vstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,19 @@ func TestVStream(t *testing.T) {
Type: querypb.Type_INT64,
}},
}

gotFields := events[1].FieldEvent
if !proto.Equal(gotFields, wantFields) {
filteredFields := &binlogdatapb.FieldEvent{
TableName: gotFields.TableName,
Fields: []*querypb.Field{},
}
for _, field := range gotFields.Fields {
filteredFields.Fields = append(filteredFields.Fields, &querypb.Field{
Name: field.Name,
Type: field.Type,
})
}
if !proto.Equal(filteredFields, wantFields) {
t.Errorf("FieldEvent:\n%v, want\n%v", gotFields, wantFields)
}
wantRows := &binlogdatapb.RowEvent{
Expand Down
27 changes: 16 additions & 11 deletions go/vt/vttablet/endtoend/vstreamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func TestSchemaVersioning(t *testing.T) {
{
query: "insert into vitess_version values(1, 10)",
output: []string{
`type:FIELD field_event:<table_name:"vitess_version" fields:<name:"id1" type:INT32 > fields:<name:"id2" type:INT32 > > `,
`type:FIELD field_event:<table_name:"vitess_version" fields:<name:"id1" type:INT32 table:"vitess_version" org_table:"vitess_version" database:"vttest" org_name:"id1" column_length:11 charset:63 > fields:<name:"id2" type:INT32 table:"vitess_version" org_table:"vitess_version" database:"vttest" org_name:"id2" column_length:11 charset:63 > > `,
`type:ROW row_event:<table_name:"vitess_version" row_changes:<after:<lengths:1 lengths:2 values:"110" > > > `,
`gtid`,
},
Expand All @@ -109,7 +109,7 @@ func TestSchemaVersioning(t *testing.T) {
}, {
query: "insert into vitess_version values(2, 20, 200)",
output: []string{
`type:FIELD field_event:<table_name:"vitess_version" fields:<name:"id1" type:INT32 > fields:<name:"id2" type:INT32 > fields:<name:"id3" type:INT32 > > `,
`type:FIELD field_event:<table_name:"vitess_version" fields:<name:"id1" type:INT32 table:"vitess_version" org_table:"vitess_version" database:"vttest" org_name:"id1" column_length:11 charset:63 > fields:<name:"id2" type:INT32 table:"vitess_version" org_table:"vitess_version" database:"vttest" org_name:"id2" column_length:11 charset:63 > fields:<name:"id3" type:INT32 table:"vitess_version" org_table:"vitess_version" database:"vttest" org_name:"id3" column_length:11 charset:63 > > `,
`type:ROW row_event:<table_name:"vitess_version" row_changes:<after:<lengths:1 lengths:2 lengths:3 values:"220200" > > > `,
`gtid`,
},
Expand All @@ -124,7 +124,7 @@ func TestSchemaVersioning(t *testing.T) {
}, {
query: "insert into vitess_version values(3, 30, 'TTT')",
output: []string{
`type:FIELD field_event:<table_name:"vitess_version" fields:<name:"id1" type:INT32 > fields:<name:"id2" type:INT32 > fields:<name:"id3" type:VARBINARY > > `,
`type:FIELD field_event:<table_name:"vitess_version" fields:<name:"id1" type:INT32 table:"vitess_version" org_table:"vitess_version" database:"vttest" org_name:"id1" column_length:11 charset:63 > fields:<name:"id2" type:INT32 table:"vitess_version" org_table:"vitess_version" database:"vttest" org_name:"id2" column_length:11 charset:63 > fields:<name:"id3" type:VARBINARY table:"vitess_version" org_table:"vitess_version" database:"vttest" org_name:"id3" column_length:16 charset:63 > > `,
`type:ROW row_event:<table_name:"vitess_version" row_changes:<after:<lengths:1 lengths:2 lengths:3 values:"330TTT" > > > `,
`gtid`,
},
Expand Down Expand Up @@ -175,7 +175,7 @@ func TestSchemaVersioning(t *testing.T) {
}, {
query: "insert into vitess_version values(4, 40, 'FFF', 'GGGG' )",
output: []string{
`type:FIELD field_event:<table_name:"vitess_version" fields:<name:"id1" type:INT32 > fields:<name:"id2" type:INT32 > fields:<name:"id3" type:VARBINARY > fields:<name:"id4" type:VARBINARY > > `,
`type:FIELD field_event:<table_name:"vitess_version" fields:<name:"id1" type:INT32 table:"vitess_version" org_table:"vitess_version" database:"vttest" org_name:"id1" column_length:11 charset:63 > fields:<name:"id2" type:INT32 table:"vitess_version" org_table:"vitess_version" database:"vttest" org_name:"id2" column_length:11 charset:63 > fields:<name:"id3" type:VARBINARY table:"vitess_version" org_table:"vitess_version" database:"vttest" org_name:"id3" column_length:16 charset:63 > fields:<name:"id4" type:VARBINARY table:"vitess_version" org_table:"vitess_version" database:"vttest" org_name:"id4" column_length:16 charset:63 > > `,
`type:ROW row_event:<table_name:"vitess_version" row_changes:<after:<lengths:1 lengths:2 lengths:3 lengths:4 values:"440FFFGGGG" > > > `,
`gtid`,
},
Expand Down Expand Up @@ -219,26 +219,26 @@ func TestSchemaVersioning(t *testing.T) {
getSchemaVersionTableCreationEvents()...),
`version`,
`gtid`,
`type:FIELD field_event:<table_name:"vitess_version" fields:<name:"id1" type:INT32 > fields:<name:"id2" type:INT32 > > `,
`type:FIELD field_event:<table_name:"vitess_version" fields:<name:"id1" type:INT32 table:"vitess_version" org_table:"vitess_version" database:"vttest" org_name:"id1" column_length:11 charset:63 > fields:<name:"id2" type:INT32 table:"vitess_version" org_table:"vitess_version" database:"vttest" org_name:"id2" column_length:11 charset:63 > > `,
`type:ROW row_event:<table_name:"vitess_version" row_changes:<after:<lengths:1 lengths:2 values:"110" > > > `,
`gtid`,
`gtid`,
`type:DDL statement:"alter table vitess_version add column id3 int" `,
`version`,
`gtid`,
`type:FIELD field_event:<table_name:"vitess_version" fields:<name:"id1" type:INT32 > fields:<name:"id2" type:INT32 > fields:<name:"id3" type:INT32 > > `,
`type:FIELD field_event:<table_name:"vitess_version" fields:<name:"id1" type:INT32 table:"vitess_version" org_table:"vitess_version" database:"vttest" org_name:"id1" column_length:11 charset:63 > fields:<name:"id2" type:INT32 table:"vitess_version" org_table:"vitess_version" database:"vttest" org_name:"id2" column_length:11 charset:63 > fields:<name:"id3" type:INT32 table:"vitess_version" org_table:"vitess_version" database:"vttest" org_name:"id3" column_length:11 charset:63 > > `,
`type:ROW row_event:<table_name:"vitess_version" row_changes:<after:<lengths:1 lengths:2 lengths:3 values:"220200" > > > `,
`gtid`,
`gtid`,
`type:DDL statement:"alter table vitess_version modify column id3 varbinary(16)" `,
`version`,
`gtid`,
`type:FIELD field_event:<table_name:"vitess_version" fields:<name:"id1" type:INT32 > fields:<name:"id2" type:INT32 > fields:<name:"id3" type:VARBINARY > > `,
`type:FIELD field_event:<table_name:"vitess_version" fields:<name:"id1" type:INT32 table:"vitess_version" org_table:"vitess_version" database:"vttest" org_name:"id1" column_length:11 charset:63 > fields:<name:"id2" type:INT32 table:"vitess_version" org_table:"vitess_version" database:"vttest" org_name:"id2" column_length:11 charset:63 > fields:<name:"id3" type:VARBINARY table:"vitess_version" org_table:"vitess_version" database:"vttest" org_name:"id3" column_length:16 charset:63 > > `,
`type:ROW row_event:<table_name:"vitess_version" row_changes:<after:<lengths:1 lengths:2 lengths:3 values:"330TTT" > > > `,
`gtid`,
`gtid`,
`type:DDL statement:"alter table vitess_version add column id4 varbinary(16)" `,
`type:FIELD field_event:<table_name:"vitess_version" fields:<name:"id1" type:INT32 > fields:<name:"id2" type:INT32 > fields:<name:"id3" type:VARBINARY > fields:<name:"id4" type:VARBINARY > > `,
`type:FIELD field_event:<table_name:"vitess_version" fields:<name:"id1" type:INT32 table:"vitess_version" org_table:"vitess_version" database:"vttest" org_name:"id1" column_length:11 charset:63 > fields:<name:"id2" type:INT32 table:"vitess_version" org_table:"vitess_version" database:"vttest" org_name:"id2" column_length:11 charset:63 > fields:<name:"id3" type:VARBINARY table:"vitess_version" org_table:"vitess_version" database:"vttest" org_name:"id3" column_length:16 charset:63 > fields:<name:"id4" type:VARBINARY table:"vitess_version" org_table:"vitess_version" database:"vttest" org_name:"id4" column_length:16 charset:63 > > `,
`type:ROW row_event:<table_name:"vitess_version" row_changes:<after:<lengths:1 lengths:2 lengths:3 lengths:4 values:"440FFFGGGG" > > > `,
`gtid`,
)
Expand Down Expand Up @@ -283,7 +283,7 @@ func TestSchemaVersioning(t *testing.T) {
getSchemaVersionTableCreationEvents()...),
`version`,
`gtid`,
`type:FIELD field_event:<table_name:"vitess_version" fields:<name:"id1" type:INT32 > fields:<name:"id2" type:INT32 > > `,
`type:FIELD field_event:<table_name:"vitess_version" fields:<name:"id1" type:INT32 table:"vitess_version" org_table:"vitess_version" database:"vttest" org_name:"id1" column_length:11 charset:63 > fields:<name:"id2" type:INT32 table:"vitess_version" org_table:"vitess_version" database:"vttest" org_name:"id2" column_length:11 charset:63 > > `,
`type:ROW row_event:<table_name:"vitess_version" row_changes:<after:<lengths:1 lengths:2 values:"110" > > > `,
`gtid`,
`gtid`,
Expand All @@ -300,12 +300,12 @@ func TestSchemaVersioning(t *testing.T) {
`gtid`,
/*at this point we only have latest schema so we have types (int32, int32, varbinary, varbinary),
but the three fields below match the first three types in the latest, so the field names are correct*/
`type:FIELD field_event:<table_name:"vitess_version" fields:<name:"id1" type:INT32 > fields:<name:"id2" type:INT32 > fields:<name:"id3" type:VARBINARY > > `,
`type:FIELD field_event:<table_name:"vitess_version" fields:<name:"id1" type:INT32 table:"vitess_version" org_table:"vitess_version" database:"vttest" org_name:"id1" column_length:11 charset:63 > fields:<name:"id2" type:INT32 table:"vitess_version" org_table:"vitess_version" database:"vttest" org_name:"id2" column_length:11 charset:63 > fields:<name:"id3" type:VARBINARY table:"vitess_version" org_table:"vitess_version" database:"vttest" org_name:"id3" column_length:16 charset:63 > > `,
`type:ROW row_event:<table_name:"vitess_version" row_changes:<after:<lengths:1 lengths:2 lengths:3 values:"330TTT" > > > `,
`gtid`,
`gtid`,
`type:DDL statement:"alter table vitess_version add column id4 varbinary(16)" `,
`type:FIELD field_event:<table_name:"vitess_version" fields:<name:"id1" type:INT32 > fields:<name:"id2" type:INT32 > fields:<name:"id3" type:VARBINARY > fields:<name:"id4" type:VARBINARY > > `,
`type:FIELD field_event:<table_name:"vitess_version" fields:<name:"id1" type:INT32 table:"vitess_version" org_table:"vitess_version" database:"vttest" org_name:"id1" column_length:11 charset:63 > fields:<name:"id2" type:INT32 table:"vitess_version" org_table:"vitess_version" database:"vttest" org_name:"id2" column_length:11 charset:63 > fields:<name:"id3" type:VARBINARY table:"vitess_version" org_table:"vitess_version" database:"vttest" org_name:"id3" column_length:16 charset:63 > fields:<name:"id4" type:VARBINARY table:"vitess_version" org_table:"vitess_version" database:"vttest" org_name:"id4" column_length:16 charset:63 > > `,
`type:ROW row_event:<table_name:"vitess_version" row_changes:<after:<lengths:1 lengths:2 lengths:3 lengths:4 values:"440FFFGGGG" > > > `,
`gtid`,
)
Expand Down Expand Up @@ -407,6 +407,11 @@ func expectLogs(ctx context.Context, t *testing.T, query string, eventCh chan []
}
default:
evs[i].Timestamp = 0
if evs[i].Type == binlogdatapb.VEventType_FIELD {
for j := range evs[i].FieldEvent.Fields {
evs[i].FieldEvent.Fields[j].Flags = 0
}
}
if got := fmt.Sprintf("%v", evs[i]); got != want {
t.Fatalf("%v (%d): event:\n%q, want\n%q", query, i, got, want)
}
Expand Down
27 changes: 9 additions & 18 deletions go/vt/vttablet/tabletserver/vstreamer/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,7 @@ type ColExpr struct {
Vindex vindexes.Vindex
VindexColumns []int

// Alias is usually the column name, but it can be changed
// if the select expression aliases with an "AS" expression.
// Also, "keyspace_id()" will be aliased as "keyspace_id".
// This Alias is sent as field info for the returned stream.
Alias sqlparser.ColIdent
Type querypb.Type
Field *querypb.Field
}

// Table contains the metadata for a table.
Expand All @@ -103,10 +98,7 @@ type Table struct {
func (plan *Plan) fields() []*querypb.Field {
fields := make([]*querypb.Field, len(plan.ColExprs))
for i, ce := range plan.ColExprs {
fields[i] = &querypb.Field{
Name: ce.Alias.String(),
Type: ce.Type,
}
fields[i] = ce.Field
}
return fields
}
Expand Down Expand Up @@ -264,8 +256,7 @@ func buildREPlan(ti *Table, vschema *localVSchema, filter string) (*Plan, error)
plan.ColExprs = make([]ColExpr, len(ti.Fields))
for i, col := range ti.Fields {
plan.ColExprs[i].ColNum = i
plan.ColExprs[i].Alias = sqlparser.NewColIdent(col.Name)
plan.ColExprs[i].Type = col.Type
plan.ColExprs[i].Field = col
}
if filter == "" {
return plan, nil
Expand Down Expand Up @@ -435,8 +426,7 @@ func (plan *Plan) analyzeExprs(vschema *localVSchema, selExprs sqlparser.SelectE
plan.ColExprs = make([]ColExpr, len(plan.Table.Fields))
for i, col := range plan.Table.Fields {
plan.ColExprs[i].ColNum = i
plan.ColExprs[i].Alias = sqlparser.NewColIdent(col.Name)
plan.ColExprs[i].Type = col.Type
plan.ColExprs[i].Field = col
}
}
return nil
Expand All @@ -462,8 +452,7 @@ func (plan *Plan) analyzeExpr(vschema *localVSchema, selExpr sqlparser.SelectExp
}
return ColExpr{
ColNum: colnum,
Alias: as,
Type: plan.Table.Fields[colnum].Type,
Field: plan.Table.Fields[colnum],
}, nil
case *sqlparser.FuncExpr:
if inner.Name.Lowered() != "keyspace_id" {
Expand All @@ -481,10 +470,12 @@ func (plan *Plan) analyzeExpr(vschema *localVSchema, selExpr sqlparser.SelectExp
return ColExpr{}, err
}
return ColExpr{
Field: &querypb.Field{
Name: "keyspace_id",
Type: sqltypes.VarBinary,
},
Vindex: cv.Vindex,
VindexColumns: vindexColumns,
Alias: sqlparser.NewColIdent("keyspace_id"),
Type: sqltypes.VarBinary,
}, nil
default:
return ColExpr{}, fmt.Errorf("unsupported: %v", sqlparser.String(aliased.Expr))
Expand Down
Loading