diff --git a/cdc/entry/mounter.go b/cdc/entry/mounter.go index 171ef3098b5..76067491c9b 100644 --- a/cdc/entry/mounter.go +++ b/cdc/entry/mounter.go @@ -18,9 +18,7 @@ import ( "context" "encoding/binary" "encoding/json" - "fmt" "math/rand" - "strconv" "strings" "time" @@ -378,11 +376,7 @@ func datum2Column(tableInfo *model.TableInfo, datums map[int64]types.Datum, fill if !exist { return nil, errors.NotFoundf("column info, colID: %d", index) } - // the judge about `fillWithDefaultValue` is tricky - // if the `fillWithDefaultValue` is true, the event must be deletion - // we should output the generated column in deletion event - // this tricky code will be improve after pingcap/ticdc#787 merged - if !tableInfo.IsColWritable(colInfo) && fillWithDefaultValue { + if !tableInfo.IsColCDCVisible(colInfo) { continue } colName := colInfo.Name.O @@ -401,24 +395,28 @@ func datum2Column(tableInfo *model.TableInfo, datums map[int64]types.Datum, fill } cols[colName] = col } - if !fillWithDefaultValue { - return cols, nil - } - for _, col := range tableInfo.Columns { - _, ok := cols[col.Name.O] - if !ok && tableInfo.IsColWritable(col) { - column := &model.Column{ - Type: col.Tp, - Value: getDefaultOrZeroValue(col), - Flag: transColumnFlag(col), - } - if tableInfo.IsColumnUnique(col.ID) { - whereHandle := true - column.WhereHandle = &whereHandle + if fillWithDefaultValue { + for _, col := range tableInfo.Columns { + _, ok := cols[col.Name.O] + if !ok && tableInfo.IsColCDCVisible(col) { + column := &model.Column{ + Type: col.Tp, + Value: getDefaultOrZeroValue(col), + Flag: transColumnFlag(col), + } + if tableInfo.IsColumnUnique(col.ID) { + whereHandle := true + column.WhereHandle = &whereHandle + } + cols[col.Name.O] = column } - cols[col.Name.O] = column } } + + err := setHandleKeyFlag(tableInfo, cols) + if err != nil { + return nil, errors.Trace(err) + } return cols, nil } @@ -480,11 +478,43 @@ func (m *mounterImpl) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntr }, nil } +func setHandleKeyFlag(tableInfo *model.TableInfo, colValues map[string]*model.Column) error { + switch tableInfo.HandleIndexID { + case model.HandleIndexTableIneligible: + log.Fatal("this table is not eligible", zap.Int64("tableID", tableInfo.ID)) + case model.HandleIndexPKIsHandle: + // pk is handle + if !tableInfo.PKIsHandle { + log.Fatal("the pk of this table is not handle", zap.Int64("tableID", tableInfo.ID)) + } + for _, colInfo := range tableInfo.Columns { + if mysql.HasPriKeyFlag(colInfo.Flag) { + colValues[colInfo.Name.O].Flag.SetIsHandleKey() + break + } + } + default: + handleIndexInfo, exist := tableInfo.GetIndexInfo(tableInfo.HandleIndexID) + if !exist { + return errors.NotFoundf("handle index info(%d) in table(%d)", tableInfo.HandleIndexID, tableInfo.ID) + } + for _, colInfo := range handleIndexInfo.Columns { + colName := tableInfo.Columns[colInfo.Offset].Name.O + colValues[colName].Flag.SetIsHandleKey() + } + } + return nil +} + func (m *mounterImpl) mountIndexKVEntry(tableInfo *model.TableInfo, idx *indexKVEntry) (*model.RowChangedEvent, error) { // skip set index KV if !idx.Delete || m.enableOldValue { return nil, nil } + // skip any index that is not the handle + if idx.IndexID != tableInfo.HandleIndexID { + return nil, nil + } indexInfo, exist := tableInfo.GetIndexInfo(idx.IndexID) if !exist { @@ -508,12 +538,14 @@ func (m *mounterImpl) mountIndexKVEntry(tableInfo *model.TableInfo, idx *indexKV return nil, errors.Trace(err) } whereHandle := true - preCols[idxCol.Name.O] = &model.Column{ + col := &model.Column{ Type: tableInfo.Columns[idxCol.Offset].Tp, WhereHandle: &whereHandle, Value: value, Flag: transColumnFlag(tableInfo.Columns[idxCol.Offset]), } + col.Flag.SetIsHandleKey() + preCols[idxCol.Name.O] = col } return &model.RowChangedEvent{ StartTs: idx.StartTs, @@ -664,55 +696,14 @@ func genMultipleKeys(ti *timodel.TableInfo, preCols, cols map[string]*model.Colu return multipleKeys } -func columnValue(value interface{}) string { - var data string - switch v := value.(type) { - case nil: - data = "null" - case bool: - if v { - data = "1" - } else { - data = "0" - } - case int: - data = strconv.FormatInt(int64(v), 10) - case int8: - data = strconv.FormatInt(int64(v), 10) - case int16: - data = strconv.FormatInt(int64(v), 10) - case int32: - data = strconv.FormatInt(int64(v), 10) - case int64: - data = strconv.FormatInt(int64(v), 10) - case uint8: - data = strconv.FormatUint(uint64(v), 10) - case uint16: - data = strconv.FormatUint(uint64(v), 10) - case uint32: - data = strconv.FormatUint(uint64(v), 10) - case uint64: - data = strconv.FormatUint(uint64(v), 10) - case float32: - data = strconv.FormatFloat(float64(v), 'f', -1, 32) - case float64: - data = strconv.FormatFloat(float64(v), 'f', -1, 64) - case string: - data = v - case []byte: - data = string(v) - default: - data = fmt.Sprintf("%v", v) - } - - return data -} - func transColumnFlag(col *timodel.ColumnInfo) model.ColumnFlagType { var flag model.ColumnFlagType if col.Charset == "binary" { flag.SetIsBinary() } + if col.IsGenerated() { + flag.SetIsGeneratedColumn() + } return flag } @@ -724,7 +715,7 @@ func genKeyList(table string, columns []*timodel.ColumnInfo, values map[string]* log.L().Debug("ignore null value", zap.String("column", col.Name.O), zap.String("table", table)) continue // ignore `null` value. } - buf.WriteString(columnValue(val.Value)) + buf.WriteString(model.ColumnValueString(val.Value)) } if buf.Len() == 0 { log.L().Debug("all value are nil, no key generated", zap.String("table", table)) diff --git a/cdc/model/schema_storage.go b/cdc/model/schema_storage.go index 893119f84d4..8b0912a39ee 100644 --- a/cdc/model/schema_storage.go +++ b/cdc/model/schema_storage.go @@ -21,6 +21,13 @@ import ( "github.com/pingcap/tidb/util/rowcodec" ) +const ( + // HandleIndexPKIsHandle represents that the handle index is the pk and the pk is the handle + HandleIndexPKIsHandle = -1 + // HandleIndexTableIneligible represents that the table is ineligible + HandleIndexTableIneligible = -2 +) + // TableInfo provides meta data describing a DB table. type TableInfo struct { *model.TableInfo @@ -30,7 +37,15 @@ type TableInfo struct { columnsOffset map[int64]int indicesOffset map[int64]int uniqueColumns map[int64]struct{} - handleColID int64 + + // only for new row format decoder + handleColID int64 + + // the mounter will choose this index to output delete events + // special value: + // HandleIndexPKIsHandle(-1) : pk is handle + // HandleIndexTableIneligible(-2) : the table is not eligible + HandleIndexID int64 // if the table of this row only has one unique index(includes primary key), // IndieMarkCol will be set to the name of the unique index @@ -49,6 +64,7 @@ func WrapTableInfo(schemaID int64, schemaName string, version uint64, info *mode indicesOffset: make(map[int64]int, len(info.Indices)), uniqueColumns: make(map[int64]struct{}), handleColID: -1, + HandleIndexID: HandleIndexTableIneligible, rowColInfos: make([]rowcodec.ColInfo, len(info.Columns)), } @@ -59,6 +75,7 @@ func WrapTableInfo(schemaID int64, schemaName string, version uint64, info *mode isPK := (ti.PKIsHandle && mysql.HasPriKeyFlag(col.Flag)) || col.ID == model.ExtraHandleID if isPK { ti.handleColID = col.ID + ti.HandleIndexID = HandleIndexPKIsHandle ti.uniqueColumns[col.ID] = struct{}{} uniqueIndexNum++ } @@ -90,10 +107,39 @@ func WrapTableInfo(schemaID int64, schemaName string, version uint64, info *mode } } } - + ti.findHandleIndex() return ti } +func (ti *TableInfo) findHandleIndex() { + if ti.HandleIndexID == HandleIndexPKIsHandle { + // pk is handle + return + } + handleIndexOffset := -1 + for i, idx := range ti.Indices { + if !ti.IsIndexUnique(idx) { + continue + } + if idx.Primary { + handleIndexOffset = i + break + } + if handleIndexOffset < 0 { + handleIndexOffset = i + } else { + if len(ti.Indices[handleIndexOffset].Columns) > len(ti.Indices[i].Columns) || + (len(ti.Indices[handleIndexOffset].Columns) == len(ti.Indices[i].Columns) && + ti.Indices[handleIndexOffset].ID > ti.Indices[i].ID) { + handleIndexOffset = i + } + } + } + if handleIndexOffset >= 0 { + ti.HandleIndexID = ti.Indices[handleIndexOffset].ID + } +} + // GetColumnInfo returns the column info by ID func (ti *TableInfo) GetColumnInfo(colID int64) (info *model.ColumnInfo, exist bool) { colOffset, exist := ti.columnsOffset[colID] @@ -121,9 +167,13 @@ func (ti *TableInfo) GetRowColInfos() (int64, []rowcodec.ColInfo) { return ti.handleColID, ti.rowColInfos } -// IsColWritable returns is the col is writeable -func (ti *TableInfo) IsColWritable(col *model.ColumnInfo) bool { - return col.State == model.StatePublic && !col.IsGenerated() +// IsColCDCVisible returns whether the col is visible for CDC +func (ti *TableInfo) IsColCDCVisible(col *model.ColumnInfo) bool { + // this column is a virtual generated column + if col.IsGenerated() && !col.GeneratedStored { + return false + } + return col.State == model.StatePublic } // GetUniqueKeys returns all unique keys of the table as a slice of column names @@ -180,7 +230,12 @@ func (ti *TableInfo) IsIndexUnique(indexInfo *model.IndexInfo) bool { } if indexInfo.Unique { for _, col := range indexInfo.Columns { - if !mysql.HasNotNullFlag(ti.Columns[col.Offset].Flag) { + colInfo := ti.Columns[col.Offset] + if !mysql.HasNotNullFlag(colInfo.Flag) { + return false + } + // this column is a virtual generated column + if colInfo.IsGenerated() && !colInfo.GeneratedStored { return false } } diff --git a/cdc/model/sink.go b/cdc/model/sink.go index 3e18fe7fc32..0c5270aae1f 100644 --- a/cdc/model/sink.go +++ b/cdc/model/sink.go @@ -15,6 +15,7 @@ package model import ( "fmt" + "strconv" "github.com/pingcap/log" "github.com/pingcap/parser/model" @@ -40,8 +41,12 @@ const ( type ColumnFlagType util.Flag const ( - // BinaryFlag means col charset is binary + // BinaryFlag means the column charset is binary BinaryFlag ColumnFlagType = 1 << ColumnFlagType(iota) + // HandleKeyFlag means the column is selected as the handle key + HandleKeyFlag + // GeneratedColumnFlag means the column is a generated column + GeneratedColumnFlag ) //SetIsBinary set BinaryFlag @@ -59,6 +64,36 @@ func (b *ColumnFlagType) IsBinary() bool { return (*util.Flag)(b).HasAll(util.Flag(BinaryFlag)) } +//SetIsHandleKey set HandleKey +func (b *ColumnFlagType) SetIsHandleKey() { + (*util.Flag)(b).Add(util.Flag(HandleKeyFlag)) +} + +//UnsetIsHandleKey unset HandleKey +func (b *ColumnFlagType) UnsetIsHandleKey() { + (*util.Flag)(b).Remove(util.Flag(HandleKeyFlag)) +} + +//IsHandleKey show whether HandleKey is set +func (b *ColumnFlagType) IsHandleKey() bool { + return (*util.Flag)(b).HasAll(util.Flag(HandleKeyFlag)) +} + +//SetIsGeneratedColumn set GeneratedColumn +func (b *ColumnFlagType) SetIsGeneratedColumn() { + (*util.Flag)(b).Add(util.Flag(GeneratedColumnFlag)) +} + +//UnsetIsGeneratedColumn unset GeneratedColumn +func (b *ColumnFlagType) UnsetIsGeneratedColumn() { + (*util.Flag)(b).Remove(util.Flag(GeneratedColumnFlag)) +} + +//IsGeneratedColumn show whether GeneratedColumn is set +func (b *ColumnFlagType) IsGeneratedColumn() bool { + return (*util.Flag)(b).HasAll(util.Flag(GeneratedColumnFlag)) +} + // TableName represents name of a table, includes table name and schema name. type TableName struct { Schema string `toml:"db-name" json:"db-name"` @@ -109,12 +144,58 @@ type RowChangedEvent struct { // Column represents a column value in row changed event type Column struct { - Type byte `json:"t"` + Type byte `json:"t"` + // WhereHandle is deprecation + // WhereHandle is replaced by HandleKey in Flag WhereHandle *bool `json:"h,omitempty"` Flag ColumnFlagType `json:"f"` Value interface{} `json:"v"` } +// ColumnValueString returns the string representation of the column value +func ColumnValueString(c interface{}) string { + var data string + switch v := c.(type) { + case nil: + data = "null" + case bool: + if v { + data = "1" + } else { + data = "0" + } + case int: + data = strconv.FormatInt(int64(v), 10) + case int8: + data = strconv.FormatInt(int64(v), 10) + case int16: + data = strconv.FormatInt(int64(v), 10) + case int32: + data = strconv.FormatInt(int64(v), 10) + case int64: + data = strconv.FormatInt(int64(v), 10) + case uint8: + data = strconv.FormatUint(uint64(v), 10) + case uint16: + data = strconv.FormatUint(uint64(v), 10) + case uint32: + data = strconv.FormatUint(uint64(v), 10) + case uint64: + data = strconv.FormatUint(uint64(v), 10) + case float32: + data = strconv.FormatFloat(float64(v), 'f', -1, 32) + case float64: + data = strconv.FormatFloat(float64(v), 'f', -1, 64) + case string: + data = v + case []byte: + data = string(v) + default: + data = fmt.Sprintf("%v", v) + } + return data +} + // ColumnInfo represents the name and type information passed to the sink type ColumnInfo struct { Name string diff --git a/cdc/model/sink_test.go b/cdc/model/sink_test.go index bee71123df4..03f6909f967 100644 --- a/cdc/model/sink_test.go +++ b/cdc/model/sink_test.go @@ -21,9 +21,11 @@ var _ = check.Suite(&columnFlagTypeSuite{}) func (s *configSuite) TestBinaryFlag(c *check.C) { var flag ColumnFlagType - c.Assert(flag.IsBinary(), check.IsFalse) flag.SetIsBinary() + flag.SetIsGeneratedColumn() c.Assert(flag.IsBinary(), check.IsTrue) + c.Assert(flag.IsHandleKey(), check.IsFalse) + c.Assert(flag.IsGeneratedColumn(), check.IsTrue) flag.UnsetIsBinary() c.Assert(flag.IsBinary(), check.IsFalse) } diff --git a/cdc/sink/black_hole.go b/cdc/sink/black_hole.go index 27f955910d9..3395272dee4 100644 --- a/cdc/sink/black_hole.go +++ b/cdc/sink/black_hole.go @@ -44,13 +44,14 @@ func (b *blackHoleSink) EmitRowChangedEvents(ctx context.Context, rows ...*model zap.Uint64("CommitTs", row.CommitTs), zap.Uint64("checkpointTs", checkpointTs)) } + log.Debug("BlockHoleSink: EmitRowChangedEvents", zap.Any("row", row)) } atomic.AddUint64(&b.accumulated, uint64(len(rows))) return nil } func (b *blackHoleSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) error { - log.Info("BlockHoleSink: FlushRowChangedEvents", zap.Uint64("resolvedTs", resolvedTs)) + log.Debug("BlockHoleSink: FlushRowChangedEvents", zap.Uint64("resolvedTs", resolvedTs)) err := b.statistics.RecordBatchExecution(func() (int, error) { // TODO: add some random replication latency accumulated := atomic.LoadUint64(&b.accumulated) @@ -64,12 +65,12 @@ func (b *blackHoleSink) FlushRowChangedEvents(ctx context.Context, resolvedTs ui } func (b *blackHoleSink) EmitCheckpointTs(ctx context.Context, ts uint64) error { - log.Info("BlockHoleSink: Checkpoint Event", zap.Uint64("ts", ts)) + log.Debug("BlockHoleSink: Checkpoint Event", zap.Uint64("ts", ts)) return nil } func (b *blackHoleSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error { - log.Info("BlockHoleSink: DDL Event", zap.Any("ddl", ddl)) + log.Debug("BlockHoleSink: DDL Event", zap.Any("ddl", ddl)) return nil } diff --git a/cdc/sink/dispatcher/default.go b/cdc/sink/dispatcher/default.go index 229c4460b34..8a04196b587 100644 --- a/cdc/sink/dispatcher/default.go +++ b/cdc/sink/dispatcher/default.go @@ -14,48 +14,26 @@ package dispatcher import ( - "encoding/json" - "hash/crc32" - - "github.com/pingcap/log" "github.com/pingcap/ticdc/cdc/model" - "go.uber.org/zap" ) type defaultDispatcher struct { partitionNum int32 + tbd *tableDispatcher + ivd *indexValueDispatcher } -func (d *defaultDispatcher) Dispatch(row *model.RowChangedEvent) int32 { - hash := crc32.NewIEEE() - if len(row.IndieMarkCol) == 0 { - // distribute partition by table - _, err := hash.Write([]byte(row.Table.Schema)) - if err != nil { - log.Fatal("calculate hash of message key failed, please report a bug", zap.Error(err)) - } - _, err = hash.Write([]byte(row.Table.Table)) - if err != nil { - log.Fatal("calculate hash of message key failed, please report a bug", zap.Error(err)) - } - return int32(hash.Sum32() % uint32(d.partitionNum)) +func newDefaultDispatcher(partitionNum int32) *defaultDispatcher { + return &defaultDispatcher{ + partitionNum: partitionNum, + tbd: newTableDispatcher(partitionNum), + ivd: newIndexValueDispatcher(partitionNum), } - // FIXME(leoppro): if the row events includes both pre-cols and cols - // the dispatch logic here is wrong +} - // distribute partition by rowid or unique column value - dispatchCols := row.Columns - if len(row.Columns) == 0 { - dispatchCols = row.PreColumns - } - value := dispatchCols[row.IndieMarkCol].Value - b, err := json.Marshal(value) - if err != nil { - log.Fatal("calculate hash of message key failed, please report a bug", zap.Error(err)) - } - _, err = hash.Write(b) - if err != nil { - log.Fatal("calculate hash of message key failed, please report a bug", zap.Error(err)) +func (d *defaultDispatcher) Dispatch(row *model.RowChangedEvent) int32 { + if len(row.IndieMarkCol) == 0 { + return d.tbd.Dispatch(row) } - return int32(hash.Sum32() % uint32(d.partitionNum)) + return d.ivd.Dispatch(row) } diff --git a/cdc/sink/dispatcher/default_test.go b/cdc/sink/dispatcher/default_test.go index e7158b319e9..7c6c0b241cd 100644 --- a/cdc/sink/dispatcher/default_test.go +++ b/cdc/sink/dispatcher/default_test.go @@ -36,9 +36,10 @@ func (s DefaultDispatcherSuite) TestDefaultDispatcher(c *check.C) { Columns: map[string]*model.Column{ "id": { Value: 1, + Flag: model.HandleKeyFlag, }, }, - }, exceptPartition: 7}, + }, exceptPartition: 11}, {row: &model.RowChangedEvent{ Table: &model.TableName{ Schema: "test", @@ -48,9 +49,10 @@ func (s DefaultDispatcherSuite) TestDefaultDispatcher(c *check.C) { Columns: map[string]*model.Column{ "id": { Value: 2, + Flag: model.HandleKeyFlag, }, }, - }, exceptPartition: 13}, + }, exceptPartition: 1}, {row: &model.RowChangedEvent{ Table: &model.TableName{ Schema: "test", @@ -60,9 +62,10 @@ func (s DefaultDispatcherSuite) TestDefaultDispatcher(c *check.C) { Columns: map[string]*model.Column{ "id": { Value: 3, + Flag: model.HandleKeyFlag, }, }, - }, exceptPartition: 11}, + }, exceptPartition: 7}, {row: &model.RowChangedEvent{ Table: &model.TableName{ Schema: "test", @@ -72,9 +75,13 @@ func (s DefaultDispatcherSuite) TestDefaultDispatcher(c *check.C) { Columns: map[string]*model.Column{ "id": { Value: 1, + Flag: model.HandleKeyFlag, + }, + "a": { + Value: 1, }, }, - }, exceptPartition: 7}, + }, exceptPartition: 1}, {row: &model.RowChangedEvent{ Table: &model.TableName{ Schema: "test", @@ -84,6 +91,26 @@ func (s DefaultDispatcherSuite) TestDefaultDispatcher(c *check.C) { Columns: map[string]*model.Column{ "id": { Value: 2, + Flag: model.HandleKeyFlag, + }, + "a": { + Value: 2, + }, + }, + }, exceptPartition: 11}, + {row: &model.RowChangedEvent{ + Table: &model.TableName{ + Schema: "test", + Table: "t2", + }, + IndieMarkCol: "id", + Columns: map[string]*model.Column{ + "id": { + Value: 3, + Flag: model.HandleKeyFlag, + }, + "a": { + Value: 3, }, }, }, exceptPartition: 13}, @@ -96,9 +123,13 @@ func (s DefaultDispatcherSuite) TestDefaultDispatcher(c *check.C) { Columns: map[string]*model.Column{ "id": { Value: 3, + Flag: model.HandleKeyFlag, + }, + "a": { + Value: 4, }, }, - }, exceptPartition: 11}, + }, exceptPartition: 13}, {row: &model.RowChangedEvent{ Table: &model.TableName{ Schema: "test", @@ -133,7 +164,7 @@ func (s DefaultDispatcherSuite) TestDefaultDispatcher(c *check.C) { }, }, exceptPartition: 3}, } - p := &defaultDispatcher{partitionNum: 16} + p := newDefaultDispatcher(16) for _, tc := range testCases { c.Assert(p.Dispatch(tc.row), check.Equals, tc.exceptPartition) } diff --git a/cdc/sink/dispatcher/index_value.go b/cdc/sink/dispatcher/index_value.go new file mode 100644 index 00000000000..bdc463021df --- /dev/null +++ b/cdc/sink/dispatcher/index_value.go @@ -0,0 +1,50 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package dispatcher + +import ( + "github.com/pingcap/ticdc/cdc/model" + "github.com/pingcap/ticdc/pkg/hash" +) + +type indexValueDispatcher struct { + partitionNum int32 + hasher *hash.PositionInertia +} + +func newIndexValueDispatcher(partitionNum int32) *indexValueDispatcher { + return &indexValueDispatcher{ + partitionNum: partitionNum, + hasher: hash.NewPositionInertia(), + } +} + +func (r *indexValueDispatcher) Dispatch(row *model.RowChangedEvent) int32 { + r.hasher.Reset() + r.hasher.Write([]byte(row.Table.Schema), []byte(row.Table.Table)) + // FIXME(leoppro): if the row events includes both pre-cols and cols + // the dispatch logic here is wrong + + // distribute partition by rowid or unique column value + dispatchCols := row.Columns + if len(row.Columns) == 0 { + dispatchCols = row.PreColumns + } + for name, col := range dispatchCols { + if col.Flag.IsHandleKey() { + r.hasher.Write([]byte(name), []byte(model.ColumnValueString(col.Value))) + } + } + return int32(r.hasher.Sum32() % uint32(r.partitionNum)) +} diff --git a/cdc/sink/dispatcher/index_value_test.go b/cdc/sink/dispatcher/index_value_test.go new file mode 100644 index 00000000000..da2f56d80fa --- /dev/null +++ b/cdc/sink/dispatcher/index_value_test.go @@ -0,0 +1,147 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package dispatcher + +import ( + "github.com/pingcap/check" + "github.com/pingcap/ticdc/cdc/model" +) + +type IndexValueDispatcherSuite struct{} + +var _ = check.Suite(&IndexValueDispatcherSuite{}) + +func (s IndexValueDispatcherSuite) TestIndexValueDispatcher(c *check.C) { + testCases := []struct { + row *model.RowChangedEvent + exceptPartition int32 + }{ + {row: &model.RowChangedEvent{ + Table: &model.TableName{ + Schema: "test", + Table: "t1", + }, + Columns: map[string]*model.Column{ + "a": { + Value: 11, + Flag: model.HandleKeyFlag, + }, + "b": { + Value: 22, + Flag: 0, + }, + }, + }, exceptPartition: 2}, + {row: &model.RowChangedEvent{ + Table: &model.TableName{ + Schema: "test", + Table: "t1", + }, + Columns: map[string]*model.Column{ + "a": { + Value: 22, + Flag: model.HandleKeyFlag, + }, + "b": { + Value: 22, + Flag: 0, + }, + }, + }, exceptPartition: 11}, + {row: &model.RowChangedEvent{ + Table: &model.TableName{ + Schema: "test", + Table: "t1", + }, + Columns: map[string]*model.Column{ + "a": { + Value: 11, + Flag: model.HandleKeyFlag, + }, + "b": { + Value: 33, + Flag: 0, + }, + }, + }, exceptPartition: 2}, + {row: &model.RowChangedEvent{ + Table: &model.TableName{ + Schema: "test", + Table: "t2", + }, + Columns: map[string]*model.Column{ + "a": { + Value: 11, + Flag: model.HandleKeyFlag, + }, + "b": { + Value: 22, + Flag: model.HandleKeyFlag, + }, + }, + }, exceptPartition: 5}, + {row: &model.RowChangedEvent{ + Table: &model.TableName{ + Schema: "test", + Table: "t2", + }, + Columns: map[string]*model.Column{ + "b": { + Value: 22, + Flag: model.HandleKeyFlag, + }, + "a": { + Value: 11, + Flag: model.HandleKeyFlag, + }, + }, + }, exceptPartition: 5}, + {row: &model.RowChangedEvent{ + Table: &model.TableName{ + Schema: "test", + Table: "t2", + }, + Columns: map[string]*model.Column{ + "a": { + Value: 11, + Flag: model.HandleKeyFlag, + }, + "b": { + Value: 0, + Flag: model.HandleKeyFlag, + }, + }, + }, exceptPartition: 14}, + {row: &model.RowChangedEvent{ + Table: &model.TableName{ + Schema: "test", + Table: "t2", + }, + Columns: map[string]*model.Column{ + "a": { + Value: 11, + Flag: model.HandleKeyFlag, + }, + "b": { + Value: 33, + Flag: model.HandleKeyFlag, + }, + }, + }, exceptPartition: 2}, + } + p := newIndexValueDispatcher(16) + for _, tc := range testCases { + c.Assert(p.Dispatch(tc.row), check.Equals, tc.exceptPartition) + } +} diff --git a/cdc/sink/dispatcher/interface.go b/cdc/sink/dispatcher/interface.go index a7e37ab3c0f..d021281ca20 100644 --- a/cdc/sink/dispatcher/interface.go +++ b/cdc/sink/dispatcher/interface.go @@ -36,6 +36,7 @@ const ( dispatchRuleRowID dispatchRuleTS dispatchRuleTable + dispatchRuleIndexValue ) func (r *dispatchRule) fromString(rule string) { @@ -48,6 +49,8 @@ func (r *dispatchRule) fromString(rule string) { *r = dispatchRuleTS case "table": *r = dispatchRuleTable + case "index-value": + *r = dispatchRuleIndexValue default: *r = dispatchRuleDefault log.Warn("can't support dispatch rule, using default rule", zap.String("rule", rule)) @@ -99,14 +102,14 @@ func NewDispatcher(cfg *config.ReplicaConfig, partitionNum int32) (Dispatcher, e var rule dispatchRule rule.fromString(ruleConfig.Dispatcher) switch rule { - case dispatchRuleRowID: - d = &rowIDDispatcher{partitionNum: partitionNum} + case dispatchRuleRowID, dispatchRuleIndexValue: + d = newIndexValueDispatcher(partitionNum) case dispatchRuleTS: - d = &tsDispatcher{partitionNum: partitionNum} + d = newTsDispatcher(partitionNum) case dispatchRuleTable: - d = &tableDispatcher{partitionNum: partitionNum} + d = newTsDispatcher(partitionNum) case dispatchRuleDefault: - d = &defaultDispatcher{partitionNum: partitionNum} + d = newDefaultDispatcher(partitionNum) } rules = append(rules, struct { Dispatcher diff --git a/cdc/sink/dispatcher/rowid.go b/cdc/sink/dispatcher/rowid.go deleted file mode 100644 index 206ef5357cc..00000000000 --- a/cdc/sink/dispatcher/rowid.go +++ /dev/null @@ -1,24 +0,0 @@ -// Copyright 2020 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package dispatcher - -import "github.com/pingcap/ticdc/cdc/model" - -type rowIDDispatcher struct { - partitionNum int32 -} - -func (r *rowIDDispatcher) Dispatch(row *model.RowChangedEvent) int32 { - return int32(uint64(row.RowID) % uint64(r.partitionNum)) -} diff --git a/cdc/sink/dispatcher/rowid_test.go b/cdc/sink/dispatcher/rowid_test.go deleted file mode 100644 index 473224b0b87..00000000000 --- a/cdc/sink/dispatcher/rowid_test.go +++ /dev/null @@ -1,77 +0,0 @@ -// Copyright 2020 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package dispatcher - -import ( - "github.com/pingcap/check" - "github.com/pingcap/ticdc/cdc/model" -) - -type RowIDDispatcherSuite struct{} - -var _ = check.Suite(&RowIDDispatcherSuite{}) - -func (s RowIDDispatcherSuite) TestRowIDDispatcher(c *check.C) { - testCases := []struct { - row *model.RowChangedEvent - exceptPartition int32 - }{ - {row: &model.RowChangedEvent{ - Table: &model.TableName{ - Schema: "test", - Table: "t1", - }, - RowID: 1, - }, exceptPartition: 1}, - {row: &model.RowChangedEvent{ - Table: &model.TableName{ - Schema: "test", - Table: "t1", - }, - RowID: 2, - }, exceptPartition: 2}, - {row: &model.RowChangedEvent{ - Table: &model.TableName{ - Schema: "test", - Table: "t1", - }, - RowID: 3, - }, exceptPartition: 3}, - {row: &model.RowChangedEvent{ - Table: &model.TableName{ - Schema: "test", - Table: "t2", - }, - RowID: 1, - }, exceptPartition: 1}, - {row: &model.RowChangedEvent{ - Table: &model.TableName{ - Schema: "test", - Table: "t2", - }, - RowID: 2, - }, exceptPartition: 2}, - {row: &model.RowChangedEvent{ - Table: &model.TableName{ - Schema: "test", - Table: "t2", - }, - RowID: 88, - }, exceptPartition: 8}, - } - p := &rowIDDispatcher{partitionNum: 16} - for _, tc := range testCases { - c.Assert(p.Dispatch(tc.row), check.Equals, tc.exceptPartition) - } -} diff --git a/cdc/sink/dispatcher/switcher_test.go b/cdc/sink/dispatcher/switcher_test.go index 532e9149578..b7f18f55e9a 100644 --- a/cdc/sink/dispatcher/switcher_test.go +++ b/cdc/sink/dispatcher/switcher_test.go @@ -45,7 +45,7 @@ func (s SwitcherSuite) TestSwitcher(c *check.C) { Table: &model.TableName{ Schema: "test", Table: "table1", }, - }), check.FitsTypeOf, &rowIDDispatcher{}) + }), check.FitsTypeOf, &indexValueDispatcher{}) c.Assert(d.(*dispatcherSwitcher).matchDispatcher(&model.RowChangedEvent{ Table: &model.TableName{ Schema: "sbs", Table: "table2", diff --git a/cdc/sink/dispatcher/table.go b/cdc/sink/dispatcher/table.go index 9ecdbf6e0e7..28d4c04d464 100644 --- a/cdc/sink/dispatcher/table.go +++ b/cdc/sink/dispatcher/table.go @@ -14,27 +14,25 @@ package dispatcher import ( - "hash/crc32" - - "github.com/pingcap/log" "github.com/pingcap/ticdc/cdc/model" - "go.uber.org/zap" + "github.com/pingcap/ticdc/pkg/hash" ) type tableDispatcher struct { partitionNum int32 + hasher *hash.PositionInertia +} + +func newTableDispatcher(partitionNum int32) *tableDispatcher { + return &tableDispatcher{ + partitionNum: partitionNum, + hasher: hash.NewPositionInertia(), + } } func (t *tableDispatcher) Dispatch(row *model.RowChangedEvent) int32 { - hash := crc32.NewIEEE() + t.hasher.Reset() // distribute partition by table - _, err := hash.Write([]byte(row.Table.Schema)) - if err != nil { - log.Fatal("calculate hash of message key failed, please report a bug", zap.Error(err)) - } - _, err = hash.Write([]byte(row.Table.Table)) - if err != nil { - log.Fatal("calculate hash of message key failed, please report a bug", zap.Error(err)) - } - return int32(hash.Sum32() % uint32(t.partitionNum)) + t.hasher.Write([]byte(row.Table.Schema), []byte(row.Table.Table)) + return int32(t.hasher.Sum32() % uint32(t.partitionNum)) } diff --git a/cdc/sink/dispatcher/table_test.go b/cdc/sink/dispatcher/table_test.go index acf0bc0e566..0b818974957 100644 --- a/cdc/sink/dispatcher/table_test.go +++ b/cdc/sink/dispatcher/table_test.go @@ -69,8 +69,15 @@ func (s TableDispatcherSuite) TestTableDispatcher(c *check.C) { }, CommitTs: 3, }, exceptPartition: 5}, + {row: &model.RowChangedEvent{ + Table: &model.TableName{ + Schema: "test", + Table: "t3", + }, + CommitTs: 3, + }, exceptPartition: 3}, } - p := &tableDispatcher{partitionNum: 16} + p := newTableDispatcher(16) for _, tc := range testCases { c.Assert(p.Dispatch(tc.row), check.Equals, tc.exceptPartition) } diff --git a/cdc/sink/dispatcher/ts.go b/cdc/sink/dispatcher/ts.go index 97bc23b452a..06d1fb1634a 100644 --- a/cdc/sink/dispatcher/ts.go +++ b/cdc/sink/dispatcher/ts.go @@ -19,6 +19,12 @@ type tsDispatcher struct { partitionNum int32 } +func newTsDispatcher(partitionNum int32) *tsDispatcher { + return &tsDispatcher{ + partitionNum: partitionNum, + } +} + func (t *tsDispatcher) Dispatch(row *model.RowChangedEvent) int32 { return int32(row.CommitTs % uint64(t.partitionNum)) } diff --git a/cdc/sink/mysql.go b/cdc/sink/mysql.go index cbb6284727a..280df315375 100644 --- a/cdc/sink/mysql.go +++ b/cdc/sink/mysql.go @@ -613,10 +613,10 @@ func (s *mysqlSink) execDMLWithMaxRetries( } for i, query := range sqls { args := values[i] + log.Debug("exec row", zap.String("sql", query), zap.Any("args", args)) if _, err := tx.ExecContext(ctx, query, args...); err != nil { return 0, checkTxnErr(errors.Trace(err)) } - log.Debug("exec row", zap.String("sql", query), zap.Any("args", args)) } if err = tx.Commit(); err != nil { return 0, checkTxnErr(errors.Trace(err)) @@ -677,7 +677,7 @@ func (s *mysqlSink) execDMLs(ctx context.Context, rows []*model.RowChangedEvent, if err != nil { return errors.Trace(err) } - log.Debug("show prepareDMLs", zap.Any("rows", rows), zap.Strings("sqls", sqls), zap.Any("values", values)) + log.Debug("prepare DMLs", zap.Any("rows", rows), zap.Strings("sqls", sqls), zap.Any("values", values)) if err := s.execDMLWithMaxRetries(ctx, sqls, values, defaultDMLMaxRetryTime, bucket); err != nil { ts := make([]uint64, 0, len(rows)) for _, row := range rows { @@ -696,6 +696,9 @@ func prepareReplace(schema, table string, cols map[string]*model.Column) (string columnNames := make([]string, 0, len(cols)) args := make([]interface{}, 0, len(cols)) for k, v := range cols { + if v.Flag.IsGeneratedColumn() { + continue + } columnNames = append(columnNames, k) args = append(args, v.Value) } @@ -733,7 +736,7 @@ func prepareDelete(schema, table string, cols map[string]*model.Column) (string, func whereSlice(cols map[string]*model.Column) (colNames []string, args []interface{}) { // Try to use unique key values when available for colName, col := range cols { - if col.WhereHandle == nil || !*col.WhereHandle { + if !col.Flag.IsHandleKey() { continue } colNames = append(colNames, colName) diff --git a/pkg/hash/position_inertia.go b/pkg/hash/position_inertia.go new file mode 100644 index 00000000000..db97202a5d4 --- /dev/null +++ b/pkg/hash/position_inertia.go @@ -0,0 +1,61 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package hash + +import ( + "hash" + "hash/crc32" + + "github.com/pingcap/log" + "go.uber.org/zap" +) + +const hashMagicNumber = 0 + +// PositionInertia is a 8-bits hash which is bytes partitions inertia +type PositionInertia struct { + hashValue uint32 + hasher hash.Hash32 +} + +// NewPositionInertia creates a new position inertia algorithm hash builder +func NewPositionInertia() *PositionInertia { + return &PositionInertia{ + hashValue: hashMagicNumber, + hasher: crc32.NewIEEE(), + } +} + +// Write writes the bytes into the PositionInertia +func (p *PositionInertia) Write(bss ...[]byte) { + p.hasher.Reset() + for _, bs := range bss { + _, err := p.hasher.Write(bs) + if err != nil { + log.Fatal("failed to write hash", zap.Error(err)) + } + } + rawHash := p.hasher.Sum32() + p.hashValue ^= rawHash +} + +// Sum32 returns the 32-bits hash +func (p *PositionInertia) Sum32() uint32 { + return p.hashValue +} + +// Reset resets the PositionInertia +func (p *PositionInertia) Reset() { + p.hashValue = hashMagicNumber +} diff --git a/pkg/hash/position_inertia_test.go b/pkg/hash/position_inertia_test.go new file mode 100644 index 00000000000..99de839754f --- /dev/null +++ b/pkg/hash/position_inertia_test.go @@ -0,0 +1,65 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package hash + +import ( + "testing" + + . "github.com/pingcap/check" +) + +func Test(t *testing.T) { + TestingT(t) +} + +var _ = Suite(&testPositionInertia{}) + +type testPositionInertia struct{} + +func (s *testPositionInertia) TestPositionInertia(c *C) { + hash := NewPositionInertia() + hash.Write([]byte("hello"), []byte("hash")) + hash.Write([]byte("hello"), []byte("pingcap")) + hash.Write([]byte("hello"), []byte("ticdc")) + hash.Write([]byte("hello"), []byte("tools")) + c.Assert(hash.Sum32(), Equals, uint32(0x914505a7)) + + hash.Reset() + hash.Write([]byte("hello"), []byte("pingcap")) + hash.Write([]byte("hello"), []byte("hash")) + hash.Write([]byte("hello"), []byte("tools")) + hash.Write([]byte("hello"), []byte("ticdc")) + c.Assert(hash.Sum32(), Equals, uint32(0x914505a7)) + + hash.Reset() + hash.Write([]byte("hello"), []byte("ticdc")) + hash.Write([]byte("hello"), []byte("hash")) + hash.Write([]byte("hello"), []byte("tools")) + hash.Write([]byte("hello"), []byte("pingcap")) + c.Assert(hash.Sum32(), Equals, uint32(0x914505a7)) + + hash.Reset() + hash.Write([]byte("ticdc"), []byte("hello")) + hash.Write([]byte("hello"), []byte("hash")) + hash.Write([]byte("hello"), []byte("tools")) + hash.Write([]byte("hello"), []byte("pingcap")) + c.Assert(hash.Sum32(), Equals, uint32(0xf21b4f40)) + + hash.Reset() + hash.Write([]byte("ticdc"), []byte("hello")) + hash.Write([]byte("hello"), []byte("hash")) + hash.Write([]byte("tools"), []byte("hello")) + hash.Write([]byte("hello"), []byte("pingcap")) + c.Assert(hash.Sum32(), Equals, uint32(0x7cd6194a)) +} diff --git a/tests/generate_column/data/prepare.sql b/tests/generate_column/data/prepare.sql index 84636a52ac4..789bc4d7446 100644 --- a/tests/generate_column/data/prepare.sql +++ b/tests/generate_column/data/prepare.sql @@ -9,8 +9,8 @@ update t set a = 11 where b = 3; delete from t where b=4; delete from t where a=4; -create table t1 (a int, b int as (a + 1) virtual not null, unique index idx(b)); -insert into t1 (a) values (1),(2), (3),(4),(5),(6),(7); +create table t1 (a int, b int as (a + 1) virtual not null, c int not null, unique index idx1(b), unique index idx2(c)); +insert into t1 (a, c) values (1, 2),(2, 3), (3, 4),(4, 5),(5, 6),(6, 7),(7, 8); update t1 set a = 10 where a = 1; update t1 set a = 11 where b = 3; delete from t1 where b=4;