From 476fca41f8c067a87b9934c0b10232ef494be0df Mon Sep 17 00:00:00 2001 From: leoppro Date: Wed, 22 Jul 2020 18:05:24 +0800 Subject: [PATCH 01/15] chose one index to output delete events --- cdc/entry/mounter.go | 88 ++++++++--------- cdc/entry/schema_storage.go | 43 ++++++++- cdc/model/sink.go | 59 +++++++++++- cdc/sink/dispatcher/default.go | 31 +----- cdc/sink/dispatcher/default_test.go | 24 +++-- .../dispatcher/{rowid.go => index_value.go} | 19 +++- .../{rowid_test.go => index_value_test.go} | 94 ++++++++++++++++--- cdc/sink/dispatcher/interface.go | 7 +- cdc/sink/dispatcher/switcher_test.go | 2 +- cdc/sink/dispatcher/table.go | 19 +--- cdc/sink/dispatcher/table_test.go | 19 ++-- pkg/hash/position_inertia.go | 23 +++++ pkg/hash/position_inertia_test.go | 62 ++++++++++++ 13 files changed, 364 insertions(+), 126 deletions(-) rename cdc/sink/dispatcher/{rowid.go => index_value.go} (53%) rename cdc/sink/dispatcher/{rowid_test.go => index_value_test.go} (53%) create mode 100644 pkg/hash/position_inertia.go create mode 100644 pkg/hash/position_inertia_test.go diff --git a/cdc/entry/mounter.go b/cdc/entry/mounter.go index 454ccfa388a..c668df48b2a 100644 --- a/cdc/entry/mounter.go +++ b/cdc/entry/mounter.go @@ -18,9 +18,7 @@ import ( "context" "encoding/binary" "encoding/json" - "fmt" "math/rand" - "strconv" "strings" "time" @@ -420,17 +418,53 @@ func (m *mounterImpl) mountRowKVEntry(tableInfo *TableInfo, row *rowKVEntry) (*m } } } + err := setHandleKeyFlag(tableInfo, values) + if err != nil { + return nil, errors.Trace(err) + } event.Delete = row.Delete event.Columns = values event.Keys = genMultipleKeys(tableInfo.TableInfo, values, quotes.QuoteSchema(event.Table.Schema, event.Table.Table)) return event, nil } +func setHandleKeyFlag(tableInfo *TableInfo, colValues map[string]*model.Column) error { + switch tableInfo.HandleIndexID { + case -2: + log.Fatal("this table is not a eligible", zap.Int64("tableID", tableInfo.ID)) + case -1: + // pk is handle + if !tableInfo.PKIsHandle { + log.Fatal("the pk of this table is not handle", zap.Int64("tableID", tableInfo.ID)) + } + for _, colInfo := range tableInfo.Columns { + if mysql.HasPriKeyFlag(colInfo.Flag) { + colValues[colInfo.Name.O].Flag.SetIsHandleKey() + break + } + } + default: + handleIndexInfo, exist := tableInfo.GetIndexInfo(tableInfo.HandleIndexID) + if !exist { + return errors.NotFoundf("handle index info(%d) in table(%d)", tableInfo.HandleIndexID, tableInfo.ID) + } + for _, colInfo := range handleIndexInfo.Columns { + colName := tableInfo.Columns[colInfo.Offset].Name.O + colValues[colName].Flag.SetIsHandleKey() + } + } + return nil +} + func (m *mounterImpl) mountIndexKVEntry(tableInfo *TableInfo, idx *indexKVEntry) (*model.RowChangedEvent, error) { // skip set index KV if !idx.Delete { return nil, nil } + // skip the index which not handle + if idx.IndexID != tableInfo.HandleIndexID { + return nil, nil + } indexInfo, exist := tableInfo.GetIndexInfo(idx.IndexID) if !exist { @@ -454,12 +488,14 @@ func (m *mounterImpl) mountIndexKVEntry(tableInfo *TableInfo, idx *indexKVEntry) return nil, errors.Trace(err) } whereHandle := true - values[idxCol.Name.O] = &model.Column{ + col := &model.Column{ Type: tableInfo.Columns[idxCol.Offset].Tp, WhereHandle: &whereHandle, Value: value, Flag: transColumnFlag(tableInfo.Columns[idxCol.Offset]), } + col.Flag.SetIsHandleKey() + values[idxCol.Name.O] = col } return &model.RowChangedEvent{ StartTs: idx.StartTs, @@ -599,50 +635,6 @@ func genMultipleKeys(ti *timodel.TableInfo, values map[string]*model.Column, tab return multipleKeys } -func columnValue(value interface{}) string { - var data string - switch v := value.(type) { - case nil: - data = "null" - case bool: - if v { - data = "1" - } else { - data = "0" - } - case int: - data = strconv.FormatInt(int64(v), 10) - case int8: - data = strconv.FormatInt(int64(v), 10) - case int16: - data = strconv.FormatInt(int64(v), 10) - case int32: - data = strconv.FormatInt(int64(v), 10) - case int64: - data = strconv.FormatInt(int64(v), 10) - case uint8: - data = strconv.FormatUint(uint64(v), 10) - case uint16: - data = strconv.FormatUint(uint64(v), 10) - case uint32: - data = strconv.FormatUint(uint64(v), 10) - case uint64: - data = strconv.FormatUint(uint64(v), 10) - case float32: - data = strconv.FormatFloat(float64(v), 'f', -1, 32) - case float64: - data = strconv.FormatFloat(float64(v), 'f', -1, 64) - case string: - data = v - case []byte: - data = string(v) - default: - data = fmt.Sprintf("%v", v) - } - - return data -} - func transColumnFlag(col *timodel.ColumnInfo) model.ColumnFlagType { var flag model.ColumnFlagType if col.Charset == "binary" { @@ -659,7 +651,7 @@ func genKeyList(table string, columns []*timodel.ColumnInfo, values map[string]* log.L().Debug("ignore null value", zap.String("column", col.Name.O), zap.String("table", table)) continue // ignore `null` value. } - buf.WriteString(columnValue(val.Value)) + buf.WriteString(model.ColumnValueString(val.Value)) } if buf.Len() == 0 { log.L().Debug("all value are nil, no key generated", zap.String("table", table)) diff --git a/cdc/entry/schema_storage.go b/cdc/entry/schema_storage.go index a5533cac59a..c46174f55d4 100644 --- a/cdc/entry/schema_storage.go +++ b/cdc/entry/schema_storage.go @@ -206,7 +206,15 @@ type TableInfo struct { columnsOffset map[int64]int indicesOffset map[int64]int uniqueColumns map[int64]struct{} - handleColID int64 + + // only for new row format decoder + handleColID int64 + + // the mounter will chose this index to output delete events + // special value: + // -1 : pk is handle + // -2 : the table is not eligible + HandleIndexID int64 // if the table of this row only has one unique index(includes primary key), // IndieMarkCol will be set to the name of the unique index @@ -224,6 +232,7 @@ func WrapTableInfo(schemaID int64, schemaName string, info *timodel.TableInfo) * indicesOffset: make(map[int64]int, len(info.Indices)), uniqueColumns: make(map[int64]struct{}), handleColID: -1, + HandleIndexID: -2, rowColInfos: make([]rowcodec.ColInfo, len(info.Columns)), } @@ -234,6 +243,7 @@ func WrapTableInfo(schemaID int64, schemaName string, info *timodel.TableInfo) * isPK := (ti.PKIsHandle && mysql.HasPriKeyFlag(col.Flag)) || col.ID == timodel.ExtraHandleID if isPK { ti.handleColID = col.ID + ti.HandleIndexID = -1 ti.uniqueColumns[col.ID] = struct{}{} uniqueIndexNum++ } @@ -265,10 +275,39 @@ func WrapTableInfo(schemaID int64, schemaName string, info *timodel.TableInfo) * } } } - + ti.findHandleIndex() return ti } +func (ti *TableInfo) findHandleIndex() { + if ti.HandleIndexID == -1 { + // pk is handle + return + } + handleIndexOffset := -1 + for i, idx := range ti.Indices { + if !ti.IsIndexUnique(idx) { + continue + } + if idx.Primary { + handleIndexOffset = i + break + } + if handleIndexOffset < 0 { + handleIndexOffset = i + } else { + if len(ti.Indices[handleIndexOffset].Columns) > len(ti.Indices[i].Columns) || + (len(ti.Indices[handleIndexOffset].Columns) == len(ti.Indices[i].Columns) && + ti.Indices[handleIndexOffset].ID > ti.Indices[i].ID) { + handleIndexOffset = i + } + } + } + if handleIndexOffset >= 0 { + ti.HandleIndexID = ti.Indices[handleIndexOffset].ID + } +} + // GetColumnInfo returns the column info by ID func (ti *TableInfo) GetColumnInfo(colID int64) (info *timodel.ColumnInfo, exist bool) { colOffset, exist := ti.columnsOffset[colID] diff --git a/cdc/model/sink.go b/cdc/model/sink.go index fc77fce19a6..22d34951c2b 100644 --- a/cdc/model/sink.go +++ b/cdc/model/sink.go @@ -15,6 +15,7 @@ package model import ( "fmt" + "strconv" "github.com/pingcap/log" "github.com/pingcap/parser/model" @@ -42,7 +43,8 @@ type ColumnFlagType util.Flag const ( // BinaryFlag means col charset is binary - BinaryFlag ColumnFlagType = 1 << ColumnFlagType(iota) + BinaryFlag ColumnFlagType = 1 << ColumnFlagType(iota) + HandleKeyFlag ColumnFlagType = 1 << ColumnFlagType(iota) ) //SetIsBinary set BinaryFlag @@ -60,6 +62,18 @@ func (b *ColumnFlagType) IsBinary() bool { return (*util.Flag)(b).HasAll(util.Flag(BinaryFlag)) } +func (b *ColumnFlagType) SetIsHandleKey() { + (*util.Flag)(b).Add(util.Flag(HandleKeyFlag)) +} + +func (b *ColumnFlagType) UnsetIsHandleKey() { + (*util.Flag)(b).Remove(util.Flag(HandleKeyFlag)) +} + +func (b *ColumnFlagType) IsHandleKey() bool { + return (*util.Flag)(b).HasAll(util.Flag(HandleKeyFlag)) +} + // TableName represents name of a table, includes table name and schema name. type TableName struct { Schema string `toml:"db-name" json:"db-name"` @@ -117,6 +131,49 @@ type Column struct { Value interface{} `json:"v"` } +func ColumnValueString(c interface{}) string { + var data string + switch v := c.(type) { + case nil: + data = "null" + case bool: + if v { + data = "1" + } else { + data = "0" + } + case int: + data = strconv.FormatInt(int64(v), 10) + case int8: + data = strconv.FormatInt(int64(v), 10) + case int16: + data = strconv.FormatInt(int64(v), 10) + case int32: + data = strconv.FormatInt(int64(v), 10) + case int64: + data = strconv.FormatInt(int64(v), 10) + case uint8: + data = strconv.FormatUint(uint64(v), 10) + case uint16: + data = strconv.FormatUint(uint64(v), 10) + case uint32: + data = strconv.FormatUint(uint64(v), 10) + case uint64: + data = strconv.FormatUint(uint64(v), 10) + case float32: + data = strconv.FormatFloat(float64(v), 'f', -1, 32) + case float64: + data = strconv.FormatFloat(float64(v), 'f', -1, 64) + case string: + data = v + case []byte: + data = string(v) + default: + data = fmt.Sprintf("%v", v) + } + return data +} + // ColumnInfo represents the name and type information passed to the sink type ColumnInfo struct { Name string diff --git a/cdc/sink/dispatcher/default.go b/cdc/sink/dispatcher/default.go index 0c49ec24ec7..76c1906bc79 100644 --- a/cdc/sink/dispatcher/default.go +++ b/cdc/sink/dispatcher/default.go @@ -14,12 +14,7 @@ package dispatcher import ( - "encoding/json" - "hash/crc32" - - "github.com/pingcap/log" "github.com/pingcap/ticdc/cdc/model" - "go.uber.org/zap" ) type defaultDispatcher struct { @@ -27,28 +22,10 @@ type defaultDispatcher struct { } func (d *defaultDispatcher) Dispatch(row *model.RowChangedEvent) int32 { - hash := crc32.NewIEEE() if len(row.IndieMarkCol) == 0 { - // distribute partition by table - _, err := hash.Write([]byte(row.Table.Schema)) - if err != nil { - log.Fatal("calculate hash of message key failed, please report a bug", zap.Error(err)) - } - _, err = hash.Write([]byte(row.Table.Table)) - if err != nil { - log.Fatal("calculate hash of message key failed, please report a bug", zap.Error(err)) - } - return int32(hash.Sum32() % uint32(d.partitionNum)) - } - // distribute partition by rowid or unique column value - value := row.Columns[row.IndieMarkCol].Value - b, err := json.Marshal(value) - if err != nil { - log.Fatal("calculate hash of message key failed, please report a bug", zap.Error(err)) - } - _, err = hash.Write(b) - if err != nil { - log.Fatal("calculate hash of message key failed, please report a bug", zap.Error(err)) + tbd := tableDispatcher{partitionNum: d.partitionNum} + return tbd.Dispatch(row) } - return int32(hash.Sum32() % uint32(d.partitionNum)) + ivd := indexValueDispatcher{partitionNum: d.partitionNum} + return ivd.Dispatch(row) } diff --git a/cdc/sink/dispatcher/default_test.go b/cdc/sink/dispatcher/default_test.go index e7158b319e9..107f5aa6177 100644 --- a/cdc/sink/dispatcher/default_test.go +++ b/cdc/sink/dispatcher/default_test.go @@ -36,9 +36,10 @@ func (s DefaultDispatcherSuite) TestDefaultDispatcher(c *check.C) { Columns: map[string]*model.Column{ "id": { Value: 1, + Flag: model.HandleKeyFlag, }, }, - }, exceptPartition: 7}, + }, exceptPartition: 9}, {row: &model.RowChangedEvent{ Table: &model.TableName{ Schema: "test", @@ -48,9 +49,10 @@ func (s DefaultDispatcherSuite) TestDefaultDispatcher(c *check.C) { Columns: map[string]*model.Column{ "id": { Value: 2, + Flag: model.HandleKeyFlag, }, }, - }, exceptPartition: 13}, + }, exceptPartition: 5}, {row: &model.RowChangedEvent{ Table: &model.TableName{ Schema: "test", @@ -60,9 +62,10 @@ func (s DefaultDispatcherSuite) TestDefaultDispatcher(c *check.C) { Columns: map[string]*model.Column{ "id": { Value: 3, + Flag: model.HandleKeyFlag, }, }, - }, exceptPartition: 11}, + }, exceptPartition: 1}, {row: &model.RowChangedEvent{ Table: &model.TableName{ Schema: "test", @@ -72,9 +75,10 @@ func (s DefaultDispatcherSuite) TestDefaultDispatcher(c *check.C) { Columns: map[string]*model.Column{ "id": { Value: 1, + Flag: model.HandleKeyFlag, }, }, - }, exceptPartition: 7}, + }, exceptPartition: 15}, {row: &model.RowChangedEvent{ Table: &model.TableName{ Schema: "test", @@ -84,9 +88,10 @@ func (s DefaultDispatcherSuite) TestDefaultDispatcher(c *check.C) { Columns: map[string]*model.Column{ "id": { Value: 2, + Flag: model.HandleKeyFlag, }, }, - }, exceptPartition: 13}, + }, exceptPartition: 3}, {row: &model.RowChangedEvent{ Table: &model.TableName{ Schema: "test", @@ -96,9 +101,10 @@ func (s DefaultDispatcherSuite) TestDefaultDispatcher(c *check.C) { Columns: map[string]*model.Column{ "id": { Value: 3, + Flag: model.HandleKeyFlag, }, }, - }, exceptPartition: 11}, + }, exceptPartition: 7}, {row: &model.RowChangedEvent{ Table: &model.TableName{ Schema: "test", @@ -109,7 +115,7 @@ func (s DefaultDispatcherSuite) TestDefaultDispatcher(c *check.C) { Value: 1, }, }, - }, exceptPartition: 3}, + }, exceptPartition: 14}, {row: &model.RowChangedEvent{ Table: &model.TableName{ Schema: "test", @@ -120,7 +126,7 @@ func (s DefaultDispatcherSuite) TestDefaultDispatcher(c *check.C) { Value: 2, }, }, - }, exceptPartition: 3}, + }, exceptPartition: 14}, {row: &model.RowChangedEvent{ Table: &model.TableName{ Schema: "test", @@ -131,7 +137,7 @@ func (s DefaultDispatcherSuite) TestDefaultDispatcher(c *check.C) { Value: 3, }, }, - }, exceptPartition: 3}, + }, exceptPartition: 14}, } p := &defaultDispatcher{partitionNum: 16} for _, tc := range testCases { diff --git a/cdc/sink/dispatcher/rowid.go b/cdc/sink/dispatcher/index_value.go similarity index 53% rename from cdc/sink/dispatcher/rowid.go rename to cdc/sink/dispatcher/index_value.go index 206ef5357cc..c8c758452ea 100644 --- a/cdc/sink/dispatcher/rowid.go +++ b/cdc/sink/dispatcher/index_value.go @@ -13,12 +13,23 @@ package dispatcher -import "github.com/pingcap/ticdc/cdc/model" +import ( + "github.com/pingcap/ticdc/cdc/model" + "github.com/pingcap/ticdc/pkg/hash" +) -type rowIDDispatcher struct { +type indexValueDispatcher struct { partitionNum int32 } -func (r *rowIDDispatcher) Dispatch(row *model.RowChangedEvent) int32 { - return int32(uint64(row.RowID) % uint64(r.partitionNum)) +func (r *indexValueDispatcher) Dispatch(row *model.RowChangedEvent) int32 { + var h hash.PositionInertia + h.Write([]byte(row.Table.Schema), []byte(row.Table.Table)) + for name, col := range row.Columns { + if col.Flag.IsHandleKey() { + h.Write([]byte(name), []byte(model.ColumnValueString(col.Value))) + } + } + h ^= h<<4 | h>>4 + return int32(byte(h) % byte(r.partitionNum)) } diff --git a/cdc/sink/dispatcher/rowid_test.go b/cdc/sink/dispatcher/index_value_test.go similarity index 53% rename from cdc/sink/dispatcher/rowid_test.go rename to cdc/sink/dispatcher/index_value_test.go index 473224b0b87..6087257d042 100644 --- a/cdc/sink/dispatcher/rowid_test.go +++ b/cdc/sink/dispatcher/index_value_test.go @@ -32,45 +32,115 @@ func (s RowIDDispatcherSuite) TestRowIDDispatcher(c *check.C) { Schema: "test", Table: "t1", }, - RowID: 1, + Columns: map[string]*model.Column{ + "a": { + Value: 11, + Flag: model.HandleKeyFlag, + }, + "b": { + Value: 22, + Flag: 0, + }, + }, }, exceptPartition: 1}, {row: &model.RowChangedEvent{ Table: &model.TableName{ Schema: "test", Table: "t1", }, - RowID: 2, - }, exceptPartition: 2}, + Columns: map[string]*model.Column{ + "a": { + Value: 22, + Flag: model.HandleKeyFlag, + }, + "b": { + Value: 22, + Flag: 0, + }, + }, + }, exceptPartition: 11}, {row: &model.RowChangedEvent{ Table: &model.TableName{ Schema: "test", Table: "t1", }, - RowID: 3, - }, exceptPartition: 3}, + Columns: map[string]*model.Column{ + "a": { + Value: 11, + Flag: model.HandleKeyFlag, + }, + "b": { + Value: 33, + Flag: 0, + }, + }, + }, exceptPartition: 1}, {row: &model.RowChangedEvent{ Table: &model.TableName{ Schema: "test", Table: "t2", }, - RowID: 1, - }, exceptPartition: 1}, + Columns: map[string]*model.Column{ + "a": { + Value: 11, + Flag: model.HandleKeyFlag, + }, + "b": { + Value: 22, + Flag: model.HandleKeyFlag, + }, + }, + }, exceptPartition: 5}, + {row: &model.RowChangedEvent{ + Table: &model.TableName{ + Schema: "test", + Table: "t2", + }, + Columns: map[string]*model.Column{ + "b": { + Value: 22, + Flag: model.HandleKeyFlag, + }, + "a": { + Value: 11, + Flag: model.HandleKeyFlag, + }, + }, + }, exceptPartition: 5}, {row: &model.RowChangedEvent{ Table: &model.TableName{ Schema: "test", Table: "t2", }, - RowID: 2, - }, exceptPartition: 2}, + Columns: map[string]*model.Column{ + "a": { + Value: 11, + Flag: model.HandleKeyFlag, + }, + "b": { + Value: 0, + Flag: model.HandleKeyFlag, + }, + }, + }, exceptPartition: 5}, {row: &model.RowChangedEvent{ Table: &model.TableName{ Schema: "test", Table: "t2", }, - RowID: 88, - }, exceptPartition: 8}, + Columns: map[string]*model.Column{ + "a": { + Value: 11, + Flag: model.HandleKeyFlag, + }, + "b": { + Value: 33, + Flag: model.HandleKeyFlag, + }, + }, + }, exceptPartition: 3}, } - p := &rowIDDispatcher{partitionNum: 16} + p := &indexValueDispatcher{partitionNum: 16} for _, tc := range testCases { c.Assert(p.Dispatch(tc.row), check.Equals, tc.exceptPartition) } diff --git a/cdc/sink/dispatcher/interface.go b/cdc/sink/dispatcher/interface.go index a7e37ab3c0f..f9eb708dd39 100644 --- a/cdc/sink/dispatcher/interface.go +++ b/cdc/sink/dispatcher/interface.go @@ -36,6 +36,7 @@ const ( dispatchRuleRowID dispatchRuleTS dispatchRuleTable + dispatchRuleIndexValue ) func (r *dispatchRule) fromString(rule string) { @@ -48,6 +49,8 @@ func (r *dispatchRule) fromString(rule string) { *r = dispatchRuleTS case "table": *r = dispatchRuleTable + case "index-value": + *r = dispatchRuleIndexValue default: *r = dispatchRuleDefault log.Warn("can't support dispatch rule, using default rule", zap.String("rule", rule)) @@ -99,8 +102,8 @@ func NewDispatcher(cfg *config.ReplicaConfig, partitionNum int32) (Dispatcher, e var rule dispatchRule rule.fromString(ruleConfig.Dispatcher) switch rule { - case dispatchRuleRowID: - d = &rowIDDispatcher{partitionNum: partitionNum} + case dispatchRuleRowID, dispatchRuleIndexValue: + d = &indexValueDispatcher{partitionNum: partitionNum} case dispatchRuleTS: d = &tsDispatcher{partitionNum: partitionNum} case dispatchRuleTable: diff --git a/cdc/sink/dispatcher/switcher_test.go b/cdc/sink/dispatcher/switcher_test.go index 532e9149578..b7f18f55e9a 100644 --- a/cdc/sink/dispatcher/switcher_test.go +++ b/cdc/sink/dispatcher/switcher_test.go @@ -45,7 +45,7 @@ func (s SwitcherSuite) TestSwitcher(c *check.C) { Table: &model.TableName{ Schema: "test", Table: "table1", }, - }), check.FitsTypeOf, &rowIDDispatcher{}) + }), check.FitsTypeOf, &indexValueDispatcher{}) c.Assert(d.(*dispatcherSwitcher).matchDispatcher(&model.RowChangedEvent{ Table: &model.TableName{ Schema: "sbs", Table: "table2", diff --git a/cdc/sink/dispatcher/table.go b/cdc/sink/dispatcher/table.go index 9ecdbf6e0e7..30b6762fefa 100644 --- a/cdc/sink/dispatcher/table.go +++ b/cdc/sink/dispatcher/table.go @@ -14,11 +14,8 @@ package dispatcher import ( - "hash/crc32" - - "github.com/pingcap/log" "github.com/pingcap/ticdc/cdc/model" - "go.uber.org/zap" + "github.com/pingcap/ticdc/pkg/hash" ) type tableDispatcher struct { @@ -26,15 +23,9 @@ type tableDispatcher struct { } func (t *tableDispatcher) Dispatch(row *model.RowChangedEvent) int32 { - hash := crc32.NewIEEE() + var h hash.PositionInertia // distribute partition by table - _, err := hash.Write([]byte(row.Table.Schema)) - if err != nil { - log.Fatal("calculate hash of message key failed, please report a bug", zap.Error(err)) - } - _, err = hash.Write([]byte(row.Table.Table)) - if err != nil { - log.Fatal("calculate hash of message key failed, please report a bug", zap.Error(err)) - } - return int32(hash.Sum32() % uint32(t.partitionNum)) + h.Write([]byte(row.Table.Schema), []byte(row.Table.Table)) + h ^= h<<4 | h>>4 + return int32(byte(h) % byte(t.partitionNum)) } diff --git a/cdc/sink/dispatcher/table_test.go b/cdc/sink/dispatcher/table_test.go index acf0bc0e566..22eae70a333 100644 --- a/cdc/sink/dispatcher/table_test.go +++ b/cdc/sink/dispatcher/table_test.go @@ -33,42 +33,49 @@ func (s TableDispatcherSuite) TestTableDispatcher(c *check.C) { Table: "t1", }, CommitTs: 1, - }, exceptPartition: 15}, + }, exceptPartition: 10}, {row: &model.RowChangedEvent{ Table: &model.TableName{ Schema: "test", Table: "t1", }, CommitTs: 2, - }, exceptPartition: 15}, + }, exceptPartition: 10}, {row: &model.RowChangedEvent{ Table: &model.TableName{ Schema: "test", Table: "t1", }, CommitTs: 3, - }, exceptPartition: 15}, + }, exceptPartition: 10}, {row: &model.RowChangedEvent{ Table: &model.TableName{ Schema: "test", Table: "t2", }, CommitTs: 1, - }, exceptPartition: 5}, + }, exceptPartition: 12}, {row: &model.RowChangedEvent{ Table: &model.TableName{ Schema: "test", Table: "t2", }, CommitTs: 2, - }, exceptPartition: 5}, + }, exceptPartition: 12}, {row: &model.RowChangedEvent{ Table: &model.TableName{ Schema: "test", Table: "t2", }, CommitTs: 3, - }, exceptPartition: 5}, + }, exceptPartition: 12}, + {row: &model.RowChangedEvent{ + Table: &model.TableName{ + Schema: "test", + Table: "t3", + }, + CommitTs: 3, + }, exceptPartition: 14}, } p := &tableDispatcher{partitionNum: 16} for _, tc := range testCases { diff --git a/pkg/hash/position_inertia.go b/pkg/hash/position_inertia.go new file mode 100644 index 00000000000..2a023251c77 --- /dev/null +++ b/pkg/hash/position_inertia.go @@ -0,0 +1,23 @@ +package hash + +type PositionInertia byte + +func (p *PositionInertia) Write(bss ...[]byte) { + var blockHash byte + var i int + for _, bs := range bss { + for _, b := range bs { + blockHash ^= loopLeftMove(b, i) + i += 1 + } + } + *p ^= PositionInertia(blockHash) +} + +func loopLeftMove(source byte, step int) byte { + step %= 8 + if step < 0 { + step += 8 + } + return source>>(8-step) | (source << step) +} diff --git a/pkg/hash/position_inertia_test.go b/pkg/hash/position_inertia_test.go new file mode 100644 index 00000000000..2c93ed858f4 --- /dev/null +++ b/pkg/hash/position_inertia_test.go @@ -0,0 +1,62 @@ +package hash + +import ( + "testing" + + . "github.com/pingcap/check" +) + +func Test(t *testing.T) { + TestingT(t) +} + +var _ = Suite(&testPositionInertia{}) + +type testPositionInertia struct{} + +func (s *testPositionInertia) TestLoopLeftMove(c *C) { + c.Assert(loopLeftMove(0b11001100, 0), Equals, byte(0b11001100)) + c.Assert(loopLeftMove(0b11001100, 1), Equals, byte(0b10011001)) + c.Assert(loopLeftMove(0b11001100, 2), Equals, byte(0b00110011)) + c.Assert(loopLeftMove(0b11001100, -1), Equals, byte(0b01100110)) + c.Assert(loopLeftMove(0b11001100, 8), Equals, byte(0b11001100)) + c.Assert(loopLeftMove(0b11001100, 13), Equals, byte(0b10011001)) + c.Assert(loopLeftMove(0b11001100, -13), Equals, byte(0b01100110)) +} + +func (s *testPositionInertia) TestPositionInertia(c *C) { + var hash PositionInertia + hash.Write([]byte("hello"), []byte("hash")) + hash.Write([]byte("hello"), []byte("pingcap")) + hash.Write([]byte("hello"), []byte("ticdc")) + hash.Write([]byte("hello"), []byte("tools")) + c.Assert(hash, Equals, PositionInertia(0xef)) + + hash = 0 + hash.Write([]byte("hello"), []byte("pingcap")) + hash.Write([]byte("hello"), []byte("hash")) + hash.Write([]byte("hello"), []byte("tools")) + hash.Write([]byte("hello"), []byte("ticdc")) + c.Assert(hash, Equals, PositionInertia(0xef)) + + hash = 0 + hash.Write([]byte("hello"), []byte("ticdc")) + hash.Write([]byte("hello"), []byte("hash")) + hash.Write([]byte("hello"), []byte("tools")) + hash.Write([]byte("hello"), []byte("pingcap")) + c.Assert(hash, Equals, PositionInertia(0xef)) + + hash = 0 + hash.Write([]byte("ticdc"), []byte("hello")) + hash.Write([]byte("hello"), []byte("hash")) + hash.Write([]byte("hello"), []byte("tools")) + hash.Write([]byte("hello"), []byte("pingcap")) + c.Assert(hash, Equals, PositionInertia(0x40)) + + hash = 0 + hash.Write([]byte("ticdc"), []byte("hello")) + hash.Write([]byte("hello"), []byte("hash")) + hash.Write([]byte("tools"), []byte("hello")) + hash.Write([]byte("hello"), []byte("pingcap")) + c.Assert(hash, Equals, PositionInertia(0x3d)) +} From 483b1b38899a0b4bbdee57366686fb53c5eda69c Mon Sep 17 00:00:00 2001 From: leoppro Date: Wed, 22 Jul 2020 19:00:22 +0800 Subject: [PATCH 02/15] using handleKey --- cdc/sink/mysql.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cdc/sink/mysql.go b/cdc/sink/mysql.go index 67fef01edd4..31daacf747b 100644 --- a/cdc/sink/mysql.go +++ b/cdc/sink/mysql.go @@ -755,7 +755,7 @@ func prepareDelete(schema, table string, cols map[string]*model.Column) (string, func whereSlice(cols map[string]*model.Column) (colNames []string, args []interface{}) { // Try to use unique key values when available for colName, col := range cols { - if col.WhereHandle == nil || !*col.WhereHandle { + if !col.Flag.IsHandleKey() { continue } colNames = append(colNames, colName) From 6be08456267c3f28198656ed9fa4e978c61941cb Mon Sep 17 00:00:00 2001 From: leoppro Date: Fri, 24 Jul 2020 14:48:57 +0800 Subject: [PATCH 03/15] fix the bug about generate column --- cdc/entry/mounter.go | 7 +++++-- cdc/entry/schema_storage.go | 17 +++++++++++++---- cdc/model/sink.go | 17 +++++++++++++++-- cdc/model/sink_test.go | 4 +++- cdc/sink/black_hole.go | 7 ++++--- cdc/sink/mysql.go | 3 +++ tests/generate_column/data/prepare.sql | 4 ++-- 7 files changed, 45 insertions(+), 14 deletions(-) diff --git a/cdc/entry/mounter.go b/cdc/entry/mounter.go index 692778a723e..da17b0b686f 100644 --- a/cdc/entry/mounter.go +++ b/cdc/entry/mounter.go @@ -367,7 +367,7 @@ func (m *mounterImpl) mountRowKVEntry(tableInfo *TableInfo, row *rowKVEntry) (*m if !exist { return nil, errors.NotFoundf("column info, colID: %d", index) } - if !row.Delete && !tableInfo.IsColWritable(colInfo) { + if !tableInfo.IsColCDCVisible(colInfo) { continue } colName := colInfo.Name.O @@ -408,7 +408,7 @@ func (m *mounterImpl) mountRowKVEntry(tableInfo *TableInfo, row *rowKVEntry) (*m if !row.Delete { for _, col := range tableInfo.Columns { _, ok := values[col.Name.O] - if !ok && tableInfo.IsColWritable(col) { + if !ok && tableInfo.IsColCDCVisible(col) { column := &model.Column{ Type: col.Tp, Value: getDefaultOrZeroValue(col), @@ -644,6 +644,9 @@ func transColumnFlag(col *timodel.ColumnInfo) model.ColumnFlagType { if col.Charset == "binary" { flag.SetIsBinary() } + if col.IsGenerated() { + flag.SetIsGeneratedColumn() + } return flag } diff --git a/cdc/entry/schema_storage.go b/cdc/entry/schema_storage.go index c46174f55d4..c3644880996 100644 --- a/cdc/entry/schema_storage.go +++ b/cdc/entry/schema_storage.go @@ -335,9 +335,13 @@ func (ti *TableInfo) GetRowColInfos() (int64, []rowcodec.ColInfo) { return ti.handleColID, ti.rowColInfos } -// IsColWritable returns is the col is writeable -func (ti *TableInfo) IsColWritable(col *timodel.ColumnInfo) bool { - return col.State == timodel.StatePublic && !col.IsGenerated() +// IsColCDCVisible returns is the col is visible for CDC +func (ti *TableInfo) IsColCDCVisible(col *timodel.ColumnInfo) bool { + // this column is a virtual generated column + if col.IsGenerated() && !col.GeneratedStored { + return false + } + return col.State == timodel.StatePublic } // GetUniqueKeys returns all unique keys of the table as a slice of column names @@ -394,7 +398,12 @@ func (ti *TableInfo) IsIndexUnique(indexInfo *timodel.IndexInfo) bool { } if indexInfo.Unique { for _, col := range indexInfo.Columns { - if !mysql.HasNotNullFlag(ti.Columns[col.Offset].Flag) { + colInfo := ti.Columns[col.Offset] + if !mysql.HasNotNullFlag(colInfo.Flag) { + return false + } + // this column is a virtual generated column + if colInfo.IsGenerated() && !colInfo.GeneratedStored { return false } } diff --git a/cdc/model/sink.go b/cdc/model/sink.go index 22d34951c2b..13873b949b1 100644 --- a/cdc/model/sink.go +++ b/cdc/model/sink.go @@ -43,8 +43,9 @@ type ColumnFlagType util.Flag const ( // BinaryFlag means col charset is binary - BinaryFlag ColumnFlagType = 1 << ColumnFlagType(iota) - HandleKeyFlag ColumnFlagType = 1 << ColumnFlagType(iota) + BinaryFlag ColumnFlagType = 1 << ColumnFlagType(iota) + HandleKeyFlag ColumnFlagType = 1 << ColumnFlagType(iota) + GeneratedColumnFlag ColumnFlagType = 1 << ColumnFlagType(iota) ) //SetIsBinary set BinaryFlag @@ -74,6 +75,18 @@ func (b *ColumnFlagType) IsHandleKey() bool { return (*util.Flag)(b).HasAll(util.Flag(HandleKeyFlag)) } +func (b *ColumnFlagType) SetIsGeneratedColumn() { + (*util.Flag)(b).Add(util.Flag(GeneratedColumnFlag)) +} + +func (b *ColumnFlagType) UnsetIsGeneratedColumn() { + (*util.Flag)(b).Remove(util.Flag(GeneratedColumnFlag)) +} + +func (b *ColumnFlagType) IsGeneratedColumn() bool { + return (*util.Flag)(b).HasAll(util.Flag(GeneratedColumnFlag)) +} + // TableName represents name of a table, includes table name and schema name. type TableName struct { Schema string `toml:"db-name" json:"db-name"` diff --git a/cdc/model/sink_test.go b/cdc/model/sink_test.go index bee71123df4..03f6909f967 100644 --- a/cdc/model/sink_test.go +++ b/cdc/model/sink_test.go @@ -21,9 +21,11 @@ var _ = check.Suite(&columnFlagTypeSuite{}) func (s *configSuite) TestBinaryFlag(c *check.C) { var flag ColumnFlagType - c.Assert(flag.IsBinary(), check.IsFalse) flag.SetIsBinary() + flag.SetIsGeneratedColumn() c.Assert(flag.IsBinary(), check.IsTrue) + c.Assert(flag.IsHandleKey(), check.IsFalse) + c.Assert(flag.IsGeneratedColumn(), check.IsTrue) flag.UnsetIsBinary() c.Assert(flag.IsBinary(), check.IsFalse) } diff --git a/cdc/sink/black_hole.go b/cdc/sink/black_hole.go index 70ac9155444..0c2ae2bc17f 100644 --- a/cdc/sink/black_hole.go +++ b/cdc/sink/black_hole.go @@ -44,13 +44,14 @@ func (b *blackHoleSink) EmitRowChangedEvents(ctx context.Context, rows ...*model zap.Uint64("CommitTs", row.CommitTs), zap.Uint64("checkpointTs", checkpointTs)) } + log.Debug("BlockHoleSink: EmitRowChangedEvents", zap.Any("row", row)) } atomic.AddUint64(&b.accumulated, uint64(len(rows))) return nil } func (b *blackHoleSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) error { - log.Info("BlockHoleSink: FlushRowChangedEvents", zap.Uint64("resolvedTs", resolvedTs)) + log.Debug("BlockHoleSink: FlushRowChangedEvents", zap.Uint64("resolvedTs", resolvedTs)) err := b.statistics.RecordBatchExecution(func() (int, error) { // TODO: add some random replication latency accumulated := atomic.LoadUint64(&b.accumulated) @@ -64,12 +65,12 @@ func (b *blackHoleSink) FlushRowChangedEvents(ctx context.Context, resolvedTs ui } func (b *blackHoleSink) EmitCheckpointTs(ctx context.Context, ts uint64) error { - log.Info("BlockHoleSink: Checkpoint Event", zap.Uint64("ts", ts)) + log.Debug("BlockHoleSink: Checkpoint Event", zap.Uint64("ts", ts)) return nil } func (b *blackHoleSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error { - log.Info("BlockHoleSink: DDL Event", zap.Any("ddl", ddl)) + log.Debug("BlockHoleSink: DDL Event", zap.Any("ddl", ddl)) return nil } diff --git a/cdc/sink/mysql.go b/cdc/sink/mysql.go index 31daacf747b..7f3ccaba328 100644 --- a/cdc/sink/mysql.go +++ b/cdc/sink/mysql.go @@ -718,6 +718,9 @@ func prepareReplace(schema, table string, cols map[string]*model.Column) (string columnNames := make([]string, 0, len(cols)) args := make([]interface{}, 0, len(cols)) for k, v := range cols { + if v.Flag.IsGeneratedColumn() { + continue + } columnNames = append(columnNames, k) args = append(args, v.Value) } diff --git a/tests/generate_column/data/prepare.sql b/tests/generate_column/data/prepare.sql index 84636a52ac4..52d5c1d8a68 100644 --- a/tests/generate_column/data/prepare.sql +++ b/tests/generate_column/data/prepare.sql @@ -9,8 +9,8 @@ update t set a = 11 where b = 3; delete from t where b=4; delete from t where a=4; -create table t1 (a int, b int as (a + 1) virtual not null, unique index idx(b)); -insert into t1 (a) values (1),(2), (3),(4),(5),(6),(7); +create table t1 (a int, b int as (a + 1) virtual not null, c int not null, unique index idx(b), unique index idx(c)); +insert into t1 (a, c) values (1, 2),(2, 3), (3, 4),(4, 5),(5, 6),(6, 7),(7, 8); update t1 set a = 10 where a = 1; update t1 set a = 11 where b = 3; delete from t1 where b=4; From b6d95a2a2d9ce031cc6e401774a0af03a754c334 Mon Sep 17 00:00:00 2001 From: leoppro Date: Fri, 24 Jul 2020 14:54:53 +0800 Subject: [PATCH 04/15] fix check --- cdc/model/sink.go | 15 ++++++++++++--- pkg/hash/position_inertia.go | 14 ++++++++++++++ pkg/hash/position_inertia_test.go | 13 +++++++++++++ 3 files changed, 39 insertions(+), 3 deletions(-) diff --git a/cdc/model/sink.go b/cdc/model/sink.go index 13873b949b1..3eca782fe07 100644 --- a/cdc/model/sink.go +++ b/cdc/model/sink.go @@ -42,9 +42,11 @@ const ( type ColumnFlagType util.Flag const ( - // BinaryFlag means col charset is binary - BinaryFlag ColumnFlagType = 1 << ColumnFlagType(iota) - HandleKeyFlag ColumnFlagType = 1 << ColumnFlagType(iota) + // BinaryFlag means the column charset is binary + BinaryFlag ColumnFlagType = 1 << ColumnFlagType(iota) + // HandleKeyFlag means the column selected as the handle key + HandleKeyFlag ColumnFlagType = 1 << ColumnFlagType(iota) + // GeneratedColumnFlag means the column is a generated column GeneratedColumnFlag ColumnFlagType = 1 << ColumnFlagType(iota) ) @@ -63,26 +65,32 @@ func (b *ColumnFlagType) IsBinary() bool { return (*util.Flag)(b).HasAll(util.Flag(BinaryFlag)) } +//SetIsHandleKey set HandleKey func (b *ColumnFlagType) SetIsHandleKey() { (*util.Flag)(b).Add(util.Flag(HandleKeyFlag)) } +//UnsetIsHandleKey unset HandleKey func (b *ColumnFlagType) UnsetIsHandleKey() { (*util.Flag)(b).Remove(util.Flag(HandleKeyFlag)) } +//IsHandleKey show whether HandleKey is set func (b *ColumnFlagType) IsHandleKey() bool { return (*util.Flag)(b).HasAll(util.Flag(HandleKeyFlag)) } +//SetIsGeneratedColumn set GeneratedColumn func (b *ColumnFlagType) SetIsGeneratedColumn() { (*util.Flag)(b).Add(util.Flag(GeneratedColumnFlag)) } +//UnsetIsGeneratedColumn unset GeneratedColumn func (b *ColumnFlagType) UnsetIsGeneratedColumn() { (*util.Flag)(b).Remove(util.Flag(GeneratedColumnFlag)) } +//IsGeneratedColumn show whether GeneratedColumn is set func (b *ColumnFlagType) IsGeneratedColumn() bool { return (*util.Flag)(b).HasAll(util.Flag(GeneratedColumnFlag)) } @@ -144,6 +152,7 @@ type Column struct { Value interface{} `json:"v"` } +// ColumnValueString returns a string representation of the column value func ColumnValueString(c interface{}) string { var data string switch v := c.(type) { diff --git a/pkg/hash/position_inertia.go b/pkg/hash/position_inertia.go index 2a023251c77..5f200320abf 100644 --- a/pkg/hash/position_inertia.go +++ b/pkg/hash/position_inertia.go @@ -1,5 +1,19 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + package hash +// PositionInertia is a 8-bits hash type PositionInertia byte func (p *PositionInertia) Write(bss ...[]byte) { diff --git a/pkg/hash/position_inertia_test.go b/pkg/hash/position_inertia_test.go index 2c93ed858f4..4a60ce84fe4 100644 --- a/pkg/hash/position_inertia_test.go +++ b/pkg/hash/position_inertia_test.go @@ -1,3 +1,16 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + package hash import ( From 67975bbd69cb508911d54bde4cd8a22cbd541273 Mon Sep 17 00:00:00 2001 From: leoppro Date: Fri, 24 Jul 2020 14:59:15 +0800 Subject: [PATCH 05/15] fix test --- tests/generate_column/data/prepare.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/generate_column/data/prepare.sql b/tests/generate_column/data/prepare.sql index 52d5c1d8a68..789bc4d7446 100644 --- a/tests/generate_column/data/prepare.sql +++ b/tests/generate_column/data/prepare.sql @@ -9,7 +9,7 @@ update t set a = 11 where b = 3; delete from t where b=4; delete from t where a=4; -create table t1 (a int, b int as (a + 1) virtual not null, c int not null, unique index idx(b), unique index idx(c)); +create table t1 (a int, b int as (a + 1) virtual not null, c int not null, unique index idx1(b), unique index idx2(c)); insert into t1 (a, c) values (1, 2),(2, 3), (3, 4),(4, 5),(5, 6),(6, 7),(7, 8); update t1 set a = 10 where a = 1; update t1 set a = 11 where b = 3; From 47cbf5c01682121596ede6e9fe13205ad1834b32 Mon Sep 17 00:00:00 2001 From: leoppro Date: Fri, 24 Jul 2020 15:55:05 +0800 Subject: [PATCH 06/15] update comment --- cdc/model/sink.go | 4 +++- pkg/hash/position_inertia.go | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/cdc/model/sink.go b/cdc/model/sink.go index 3eca782fe07..c7ce11b2531 100644 --- a/cdc/model/sink.go +++ b/cdc/model/sink.go @@ -146,7 +146,9 @@ type RowChangedEvent struct { // Column represents a column value in row changed event type Column struct { - Type byte `json:"t"` + Type byte `json:"t"` + // WhereHandle is deprecation + // WhereHandle is instead by HandleKey in Flag WhereHandle *bool `json:"h,omitempty"` Flag ColumnFlagType `json:"f"` Value interface{} `json:"v"` diff --git a/pkg/hash/position_inertia.go b/pkg/hash/position_inertia.go index 5f200320abf..0486469b254 100644 --- a/pkg/hash/position_inertia.go +++ b/pkg/hash/position_inertia.go @@ -13,7 +13,7 @@ package hash -// PositionInertia is a 8-bits hash +// PositionInertia is a 8-bits hash which is bytes partitions inertia type PositionInertia byte func (p *PositionInertia) Write(bss ...[]byte) { From 63abc43f6916f555c0db4696063e200ec89209a5 Mon Sep 17 00:00:00 2001 From: leoppro Date: Mon, 27 Jul 2020 11:05:54 +0800 Subject: [PATCH 07/15] fmt --- cdc/entry/schema_storage.go | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/cdc/entry/schema_storage.go b/cdc/entry/schema_storage.go index b0dfc76bc67..904ac3e0ec8 100644 --- a/cdc/entry/schema_storage.go +++ b/cdc/entry/schema_storage.go @@ -201,12 +201,12 @@ func (s *schemaSnapshot) Clone() *schemaSnapshot { // TableInfo provides meta data describing a DB table. type TableInfo struct { *timodel.TableInfo - SchemaID int64 - TableName model.TableName + SchemaID int64 + TableName model.TableName TableInfoVersion uint64 - columnsOffset map[int64]int - indicesOffset map[int64]int - uniqueColumns map[int64]struct{} + columnsOffset map[int64]int + indicesOffset map[int64]int + uniqueColumns map[int64]struct{} // only for new row format decoder handleColID int64 @@ -226,16 +226,16 @@ type TableInfo struct { // WrapTableInfo creates a TableInfo from a timodel.TableInfo func WrapTableInfo(schemaID int64, schemaName string, version uint64, info *timodel.TableInfo) *TableInfo { ti := &TableInfo{ - TableInfo: info, - SchemaID: schemaID, - TableName: model.TableName{Schema: schemaName, Table: info.Name.O}, + TableInfo: info, + SchemaID: schemaID, + TableName: model.TableName{Schema: schemaName, Table: info.Name.O}, TableInfoVersion: version, - columnsOffset: make(map[int64]int, len(info.Columns)), - indicesOffset: make(map[int64]int, len(info.Indices)), - uniqueColumns: make(map[int64]struct{}), - handleColID: -1, - HandleIndexID: -2, - rowColInfos: make([]rowcodec.ColInfo, len(info.Columns)), + columnsOffset: make(map[int64]int, len(info.Columns)), + indicesOffset: make(map[int64]int, len(info.Indices)), + uniqueColumns: make(map[int64]struct{}), + handleColID: -1, + HandleIndexID: -2, + rowColInfos: make([]rowcodec.ColInfo, len(info.Columns)), } uniqueIndexNum := 0 From 46ad65593c3652d09e2926dab0b70d452d2a447a Mon Sep 17 00:00:00 2001 From: leoppro Date: Tue, 28 Jul 2020 19:17:18 +0800 Subject: [PATCH 08/15] update --- cdc/entry/mounter.go | 6 +++--- cdc/entry/schema_storage.go | 15 ++++++++++----- cdc/model/sink.go | 4 ++-- 3 files changed, 15 insertions(+), 10 deletions(-) diff --git a/cdc/entry/mounter.go b/cdc/entry/mounter.go index 3d11d97ffb5..9fa60e0af2e 100644 --- a/cdc/entry/mounter.go +++ b/cdc/entry/mounter.go @@ -352,7 +352,7 @@ func UnmarshalDDL(raw *model.RawKVEntry) (*timodel.Job, error) { } func (m *mounterImpl) mountRowKVEntry(tableInfo *TableInfo, row *rowKVEntry) (*model.RowChangedEvent, error) { - if row.Delete && !tableInfo.PKIsHandle { + if row.Delete && tableInfo.HandleIndexID != HandleIndexPKIsHandle { return nil, nil } @@ -433,9 +433,9 @@ func (m *mounterImpl) mountRowKVEntry(tableInfo *TableInfo, row *rowKVEntry) (*m func setHandleKeyFlag(tableInfo *TableInfo, colValues map[string]*model.Column) error { switch tableInfo.HandleIndexID { - case -2: + case HandleIndexTableIneligible: log.Fatal("this table is not a eligible", zap.Int64("tableID", tableInfo.ID)) - case -1: + case HandleIndexPKIsHandle: // pk is handle if !tableInfo.PKIsHandle { log.Fatal("the pk of this table is not handle", zap.Int64("tableID", tableInfo.ID)) diff --git a/cdc/entry/schema_storage.go b/cdc/entry/schema_storage.go index 904ac3e0ec8..b02a5f4d8ea 100644 --- a/cdc/entry/schema_storage.go +++ b/cdc/entry/schema_storage.go @@ -198,6 +198,11 @@ func (s *schemaSnapshot) Clone() *schemaSnapshot { return n } +const ( + HandleIndexPKIsHandle = -1 + HandleIndexTableIneligible = -2 +) + // TableInfo provides meta data describing a DB table. type TableInfo struct { *timodel.TableInfo @@ -213,8 +218,8 @@ type TableInfo struct { // the mounter will chose this index to output delete events // special value: - // -1 : pk is handle - // -2 : the table is not eligible + // HandleIndexPKIsHandle(-1) : pk is handle + // HandleIndexTableIneligible(-2) : the table is not eligible HandleIndexID int64 // if the table of this row only has one unique index(includes primary key), @@ -234,7 +239,7 @@ func WrapTableInfo(schemaID int64, schemaName string, version uint64, info *timo indicesOffset: make(map[int64]int, len(info.Indices)), uniqueColumns: make(map[int64]struct{}), handleColID: -1, - HandleIndexID: -2, + HandleIndexID: HandleIndexTableIneligible, rowColInfos: make([]rowcodec.ColInfo, len(info.Columns)), } @@ -245,7 +250,7 @@ func WrapTableInfo(schemaID int64, schemaName string, version uint64, info *timo isPK := (ti.PKIsHandle && mysql.HasPriKeyFlag(col.Flag)) || col.ID == timodel.ExtraHandleID if isPK { ti.handleColID = col.ID - ti.HandleIndexID = -1 + ti.HandleIndexID = HandleIndexPKIsHandle ti.uniqueColumns[col.ID] = struct{}{} uniqueIndexNum++ } @@ -282,7 +287,7 @@ func WrapTableInfo(schemaID int64, schemaName string, version uint64, info *timo } func (ti *TableInfo) findHandleIndex() { - if ti.HandleIndexID == -1 { + if ti.HandleIndexID == HandleIndexPKIsHandle { // pk is handle return } diff --git a/cdc/model/sink.go b/cdc/model/sink.go index ee69786e54c..f45db0e93dd 100644 --- a/cdc/model/sink.go +++ b/cdc/model/sink.go @@ -45,9 +45,9 @@ const ( // BinaryFlag means the column charset is binary BinaryFlag ColumnFlagType = 1 << ColumnFlagType(iota) // HandleKeyFlag means the column selected as the handle key - HandleKeyFlag ColumnFlagType = 1 << ColumnFlagType(iota) + HandleKeyFlag // GeneratedColumnFlag means the column is a generated column - GeneratedColumnFlag ColumnFlagType = 1 << ColumnFlagType(iota) + GeneratedColumnFlag ) //SetIsBinary set BinaryFlag From 5ea7591aa0904d6c0324aae00dbdf424a104a89a Mon Sep 17 00:00:00 2001 From: leoppro Date: Tue, 28 Jul 2020 19:21:42 +0800 Subject: [PATCH 09/15] fix check --- cdc/entry/schema_storage.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/cdc/entry/schema_storage.go b/cdc/entry/schema_storage.go index b02a5f4d8ea..8b506822764 100644 --- a/cdc/entry/schema_storage.go +++ b/cdc/entry/schema_storage.go @@ -199,7 +199,9 @@ func (s *schemaSnapshot) Clone() *schemaSnapshot { } const ( - HandleIndexPKIsHandle = -1 + // HandleIndexPKIsHandle represents the handle index is pk and the pk is handle + HandleIndexPKIsHandle = -1 + // HandleIndexTableIneligible represents the table is ineligible HandleIndexTableIneligible = -2 ) From 8c0cb5494d54ebbdbf2df65b9530b8594ee7d396 Mon Sep 17 00:00:00 2001 From: leoppro Date: Wed, 29 Jul 2020 12:13:52 +0800 Subject: [PATCH 10/15] new hash algorithm --- cdc/sink/dispatcher/default.go | 16 ++++++--- cdc/sink/dispatcher/default_test.go | 45 +++++++++++++++++------ cdc/sink/dispatcher/index_value.go | 17 ++++++--- cdc/sink/dispatcher/index_value_test.go | 20 +++++------ cdc/sink/dispatcher/interface.go | 8 ++--- cdc/sink/dispatcher/table.go | 15 +++++--- cdc/sink/dispatcher/table_test.go | 16 ++++----- cdc/sink/dispatcher/ts.go | 6 ++++ pkg/hash/position_inertia.go | 48 ++++++++++++++++++------- pkg/hash/position_inertia_test.go | 30 ++++++---------- 10 files changed, 143 insertions(+), 78 deletions(-) diff --git a/cdc/sink/dispatcher/default.go b/cdc/sink/dispatcher/default.go index 76c1906bc79..8a04196b587 100644 --- a/cdc/sink/dispatcher/default.go +++ b/cdc/sink/dispatcher/default.go @@ -19,13 +19,21 @@ import ( type defaultDispatcher struct { partitionNum int32 + tbd *tableDispatcher + ivd *indexValueDispatcher +} + +func newDefaultDispatcher(partitionNum int32) *defaultDispatcher { + return &defaultDispatcher{ + partitionNum: partitionNum, + tbd: newTableDispatcher(partitionNum), + ivd: newIndexValueDispatcher(partitionNum), + } } func (d *defaultDispatcher) Dispatch(row *model.RowChangedEvent) int32 { if len(row.IndieMarkCol) == 0 { - tbd := tableDispatcher{partitionNum: d.partitionNum} - return tbd.Dispatch(row) + return d.tbd.Dispatch(row) } - ivd := indexValueDispatcher{partitionNum: d.partitionNum} - return ivd.Dispatch(row) + return d.ivd.Dispatch(row) } diff --git a/cdc/sink/dispatcher/default_test.go b/cdc/sink/dispatcher/default_test.go index 107f5aa6177..7c1a5a5707b 100644 --- a/cdc/sink/dispatcher/default_test.go +++ b/cdc/sink/dispatcher/default_test.go @@ -39,7 +39,7 @@ func (s DefaultDispatcherSuite) TestDefaultDispatcher(c *check.C) { Flag: model.HandleKeyFlag, }, }, - }, exceptPartition: 9}, + }, exceptPartition: 15}, {row: &model.RowChangedEvent{ Table: &model.TableName{ Schema: "test", @@ -52,7 +52,7 @@ func (s DefaultDispatcherSuite) TestDefaultDispatcher(c *check.C) { Flag: model.HandleKeyFlag, }, }, - }, exceptPartition: 5}, + }, exceptPartition: 4}, {row: &model.RowChangedEvent{ Table: &model.TableName{ Schema: "test", @@ -65,7 +65,7 @@ func (s DefaultDispatcherSuite) TestDefaultDispatcher(c *check.C) { Flag: model.HandleKeyFlag, }, }, - }, exceptPartition: 1}, + }, exceptPartition: 2}, {row: &model.RowChangedEvent{ Table: &model.TableName{ Schema: "test", @@ -77,8 +77,11 @@ func (s DefaultDispatcherSuite) TestDefaultDispatcher(c *check.C) { Value: 1, Flag: model.HandleKeyFlag, }, + "a": { + Value: 1, + }, }, - }, exceptPartition: 15}, + }, exceptPartition: 4}, {row: &model.RowChangedEvent{ Table: &model.TableName{ Schema: "test", @@ -90,8 +93,11 @@ func (s DefaultDispatcherSuite) TestDefaultDispatcher(c *check.C) { Value: 2, Flag: model.HandleKeyFlag, }, + "a": { + Value: 2, + }, }, - }, exceptPartition: 3}, + }, exceptPartition: 15}, {row: &model.RowChangedEvent{ Table: &model.TableName{ Schema: "test", @@ -103,8 +109,27 @@ func (s DefaultDispatcherSuite) TestDefaultDispatcher(c *check.C) { Value: 3, Flag: model.HandleKeyFlag, }, + "a": { + Value: 3, + }, }, - }, exceptPartition: 7}, + }, exceptPartition: 9}, + {row: &model.RowChangedEvent{ + Table: &model.TableName{ + Schema: "test", + Table: "t2", + }, + IndieMarkCol: "id", + Columns: map[string]*model.Column{ + "id": { + Value: 3, + Flag: model.HandleKeyFlag, + }, + "a": { + Value: 4, + }, + }, + }, exceptPartition: 9}, {row: &model.RowChangedEvent{ Table: &model.TableName{ Schema: "test", @@ -115,7 +140,7 @@ func (s DefaultDispatcherSuite) TestDefaultDispatcher(c *check.C) { Value: 1, }, }, - }, exceptPartition: 14}, + }, exceptPartition: 1}, {row: &model.RowChangedEvent{ Table: &model.TableName{ Schema: "test", @@ -126,7 +151,7 @@ func (s DefaultDispatcherSuite) TestDefaultDispatcher(c *check.C) { Value: 2, }, }, - }, exceptPartition: 14}, + }, exceptPartition: 1}, {row: &model.RowChangedEvent{ Table: &model.TableName{ Schema: "test", @@ -137,9 +162,9 @@ func (s DefaultDispatcherSuite) TestDefaultDispatcher(c *check.C) { Value: 3, }, }, - }, exceptPartition: 14}, + }, exceptPartition: 1}, } - p := &defaultDispatcher{partitionNum: 16} + p := newDefaultDispatcher(16) for _, tc := range testCases { c.Assert(p.Dispatch(tc.row), check.Equals, tc.exceptPartition) } diff --git a/cdc/sink/dispatcher/index_value.go b/cdc/sink/dispatcher/index_value.go index c8c758452ea..05c8e12bee6 100644 --- a/cdc/sink/dispatcher/index_value.go +++ b/cdc/sink/dispatcher/index_value.go @@ -20,16 +20,23 @@ import ( type indexValueDispatcher struct { partitionNum int32 + hasher *hash.PositionInertia +} + +func newIndexValueDispatcher(partitionNum int32) *indexValueDispatcher { + return &indexValueDispatcher{ + partitionNum: partitionNum, + hasher: hash.NewPositionInertia(), + } } func (r *indexValueDispatcher) Dispatch(row *model.RowChangedEvent) int32 { - var h hash.PositionInertia - h.Write([]byte(row.Table.Schema), []byte(row.Table.Table)) + r.hasher.Reset() + r.hasher.Write([]byte(row.Table.Schema), []byte(row.Table.Table)) for name, col := range row.Columns { if col.Flag.IsHandleKey() { - h.Write([]byte(name), []byte(model.ColumnValueString(col.Value))) + r.hasher.Write([]byte(name), []byte(model.ColumnValueString(col.Value))) } } - h ^= h<<4 | h>>4 - return int32(byte(h) % byte(r.partitionNum)) + return int32(r.hasher.Sum8() % byte(r.partitionNum)) } diff --git a/cdc/sink/dispatcher/index_value_test.go b/cdc/sink/dispatcher/index_value_test.go index 6087257d042..2d7178d94eb 100644 --- a/cdc/sink/dispatcher/index_value_test.go +++ b/cdc/sink/dispatcher/index_value_test.go @@ -18,11 +18,11 @@ import ( "github.com/pingcap/ticdc/cdc/model" ) -type RowIDDispatcherSuite struct{} +type IndexValueDispatcherSuite struct{} -var _ = check.Suite(&RowIDDispatcherSuite{}) +var _ = check.Suite(&IndexValueDispatcherSuite{}) -func (s RowIDDispatcherSuite) TestRowIDDispatcher(c *check.C) { +func (s IndexValueDispatcherSuite) TestIndexValueDispatcher(c *check.C) { testCases := []struct { row *model.RowChangedEvent exceptPartition int32 @@ -42,7 +42,7 @@ func (s RowIDDispatcherSuite) TestRowIDDispatcher(c *check.C) { Flag: 0, }, }, - }, exceptPartition: 1}, + }, exceptPartition: 8}, {row: &model.RowChangedEvent{ Table: &model.TableName{ Schema: "test", @@ -58,7 +58,7 @@ func (s RowIDDispatcherSuite) TestRowIDDispatcher(c *check.C) { Flag: 0, }, }, - }, exceptPartition: 11}, + }, exceptPartition: 5}, {row: &model.RowChangedEvent{ Table: &model.TableName{ Schema: "test", @@ -74,7 +74,7 @@ func (s RowIDDispatcherSuite) TestRowIDDispatcher(c *check.C) { Flag: 0, }, }, - }, exceptPartition: 1}, + }, exceptPartition: 8}, {row: &model.RowChangedEvent{ Table: &model.TableName{ Schema: "test", @@ -90,7 +90,7 @@ func (s RowIDDispatcherSuite) TestRowIDDispatcher(c *check.C) { Flag: model.HandleKeyFlag, }, }, - }, exceptPartition: 5}, + }, exceptPartition: 9}, {row: &model.RowChangedEvent{ Table: &model.TableName{ Schema: "test", @@ -106,7 +106,7 @@ func (s RowIDDispatcherSuite) TestRowIDDispatcher(c *check.C) { Flag: model.HandleKeyFlag, }, }, - }, exceptPartition: 5}, + }, exceptPartition: 9}, {row: &model.RowChangedEvent{ Table: &model.TableName{ Schema: "test", @@ -138,9 +138,9 @@ func (s RowIDDispatcherSuite) TestRowIDDispatcher(c *check.C) { Flag: model.HandleKeyFlag, }, }, - }, exceptPartition: 3}, + }, exceptPartition: 13}, } - p := &indexValueDispatcher{partitionNum: 16} + p := newIndexValueDispatcher(16) for _, tc := range testCases { c.Assert(p.Dispatch(tc.row), check.Equals, tc.exceptPartition) } diff --git a/cdc/sink/dispatcher/interface.go b/cdc/sink/dispatcher/interface.go index f9eb708dd39..d021281ca20 100644 --- a/cdc/sink/dispatcher/interface.go +++ b/cdc/sink/dispatcher/interface.go @@ -103,13 +103,13 @@ func NewDispatcher(cfg *config.ReplicaConfig, partitionNum int32) (Dispatcher, e rule.fromString(ruleConfig.Dispatcher) switch rule { case dispatchRuleRowID, dispatchRuleIndexValue: - d = &indexValueDispatcher{partitionNum: partitionNum} + d = newIndexValueDispatcher(partitionNum) case dispatchRuleTS: - d = &tsDispatcher{partitionNum: partitionNum} + d = newTsDispatcher(partitionNum) case dispatchRuleTable: - d = &tableDispatcher{partitionNum: partitionNum} + d = newTsDispatcher(partitionNum) case dispatchRuleDefault: - d = &defaultDispatcher{partitionNum: partitionNum} + d = newDefaultDispatcher(partitionNum) } rules = append(rules, struct { Dispatcher diff --git a/cdc/sink/dispatcher/table.go b/cdc/sink/dispatcher/table.go index 30b6762fefa..2922a2bec66 100644 --- a/cdc/sink/dispatcher/table.go +++ b/cdc/sink/dispatcher/table.go @@ -20,12 +20,19 @@ import ( type tableDispatcher struct { partitionNum int32 + hasher *hash.PositionInertia +} + +func newTableDispatcher(partitionNum int32) *tableDispatcher { + return &tableDispatcher{ + partitionNum: partitionNum, + hasher: hash.NewPositionInertia(), + } } func (t *tableDispatcher) Dispatch(row *model.RowChangedEvent) int32 { - var h hash.PositionInertia + t.hasher.Reset() // distribute partition by table - h.Write([]byte(row.Table.Schema), []byte(row.Table.Table)) - h ^= h<<4 | h>>4 - return int32(byte(h) % byte(t.partitionNum)) + t.hasher.Write([]byte(row.Table.Schema), []byte(row.Table.Table)) + return int32(t.hasher.Sum8() % byte(t.partitionNum)) } diff --git a/cdc/sink/dispatcher/table_test.go b/cdc/sink/dispatcher/table_test.go index 22eae70a333..9be85c7962a 100644 --- a/cdc/sink/dispatcher/table_test.go +++ b/cdc/sink/dispatcher/table_test.go @@ -33,51 +33,51 @@ func (s TableDispatcherSuite) TestTableDispatcher(c *check.C) { Table: "t1", }, CommitTs: 1, - }, exceptPartition: 10}, + }, exceptPartition: 12}, {row: &model.RowChangedEvent{ Table: &model.TableName{ Schema: "test", Table: "t1", }, CommitTs: 2, - }, exceptPartition: 10}, + }, exceptPartition: 12}, {row: &model.RowChangedEvent{ Table: &model.TableName{ Schema: "test", Table: "t1", }, CommitTs: 3, - }, exceptPartition: 10}, + }, exceptPartition: 12}, {row: &model.RowChangedEvent{ Table: &model.TableName{ Schema: "test", Table: "t2", }, CommitTs: 1, - }, exceptPartition: 12}, + }, exceptPartition: 7}, {row: &model.RowChangedEvent{ Table: &model.TableName{ Schema: "test", Table: "t2", }, CommitTs: 2, - }, exceptPartition: 12}, + }, exceptPartition: 7}, {row: &model.RowChangedEvent{ Table: &model.TableName{ Schema: "test", Table: "t2", }, CommitTs: 3, - }, exceptPartition: 12}, + }, exceptPartition: 7}, {row: &model.RowChangedEvent{ Table: &model.TableName{ Schema: "test", Table: "t3", }, CommitTs: 3, - }, exceptPartition: 14}, + }, exceptPartition: 1}, } - p := &tableDispatcher{partitionNum: 16} + p := newTableDispatcher(16) for _, tc := range testCases { c.Assert(p.Dispatch(tc.row), check.Equals, tc.exceptPartition) } diff --git a/cdc/sink/dispatcher/ts.go b/cdc/sink/dispatcher/ts.go index 97bc23b452a..06d1fb1634a 100644 --- a/cdc/sink/dispatcher/ts.go +++ b/cdc/sink/dispatcher/ts.go @@ -19,6 +19,12 @@ type tsDispatcher struct { partitionNum int32 } +func newTsDispatcher(partitionNum int32) *tsDispatcher { + return &tsDispatcher{ + partitionNum: partitionNum, + } +} + func (t *tsDispatcher) Dispatch(row *model.RowChangedEvent) int32 { return int32(row.CommitTs % uint64(t.partitionNum)) } diff --git a/pkg/hash/position_inertia.go b/pkg/hash/position_inertia.go index 0486469b254..62dfac5e4ed 100644 --- a/pkg/hash/position_inertia.go +++ b/pkg/hash/position_inertia.go @@ -13,25 +13,47 @@ package hash +import ( + "hash" + "hash/crc32" + + "github.com/pingcap/log" + "go.uber.org/zap" +) + +const hashMagicNumber = 0x6A + // PositionInertia is a 8-bits hash which is bytes partitions inertia -type PositionInertia byte +type PositionInertia struct { + hashValue byte + hasher hash.Hash32 +} + +func NewPositionInertia() *PositionInertia { + return &PositionInertia{ + hashValue: hashMagicNumber, + hasher: crc32.NewIEEE(), + } +} func (p *PositionInertia) Write(bss ...[]byte) { - var blockHash byte - var i int + p.hasher.Reset() for _, bs := range bss { - for _, b := range bs { - blockHash ^= loopLeftMove(b, i) - i += 1 + _, err := p.hasher.Write(bs) + if err != nil { + log.Fatal("failed to write hash", zap.Error(err)) } } - *p ^= PositionInertia(blockHash) + rawHash := p.hasher.Sum32() + rawHash ^= rawHash >> 16 + rawHash ^= rawHash >> 8 + p.hashValue ^= byte(rawHash) } -func loopLeftMove(source byte, step int) byte { - step %= 8 - if step < 0 { - step += 8 - } - return source>>(8-step) | (source << step) +func (p *PositionInertia) Sum8() byte { + return p.hashValue +} + +func (p *PositionInertia) Reset() { + p.hashValue = hashMagicNumber } diff --git a/pkg/hash/position_inertia_test.go b/pkg/hash/position_inertia_test.go index 4a60ce84fe4..83fb6c603c5 100644 --- a/pkg/hash/position_inertia_test.go +++ b/pkg/hash/position_inertia_test.go @@ -27,49 +27,39 @@ var _ = Suite(&testPositionInertia{}) type testPositionInertia struct{} -func (s *testPositionInertia) TestLoopLeftMove(c *C) { - c.Assert(loopLeftMove(0b11001100, 0), Equals, byte(0b11001100)) - c.Assert(loopLeftMove(0b11001100, 1), Equals, byte(0b10011001)) - c.Assert(loopLeftMove(0b11001100, 2), Equals, byte(0b00110011)) - c.Assert(loopLeftMove(0b11001100, -1), Equals, byte(0b01100110)) - c.Assert(loopLeftMove(0b11001100, 8), Equals, byte(0b11001100)) - c.Assert(loopLeftMove(0b11001100, 13), Equals, byte(0b10011001)) - c.Assert(loopLeftMove(0b11001100, -13), Equals, byte(0b01100110)) -} - func (s *testPositionInertia) TestPositionInertia(c *C) { - var hash PositionInertia + hash := NewPositionInertia() hash.Write([]byte("hello"), []byte("hash")) hash.Write([]byte("hello"), []byte("pingcap")) hash.Write([]byte("hello"), []byte("ticdc")) hash.Write([]byte("hello"), []byte("tools")) - c.Assert(hash, Equals, PositionInertia(0xef)) + c.Assert(hash.Sum8(), Equals, byte(0x1c)) - hash = 0 + hash.Reset() hash.Write([]byte("hello"), []byte("pingcap")) hash.Write([]byte("hello"), []byte("hash")) hash.Write([]byte("hello"), []byte("tools")) hash.Write([]byte("hello"), []byte("ticdc")) - c.Assert(hash, Equals, PositionInertia(0xef)) + c.Assert(hash.Sum8(), Equals, byte(0x1c)) - hash = 0 + hash.Reset() hash.Write([]byte("hello"), []byte("ticdc")) hash.Write([]byte("hello"), []byte("hash")) hash.Write([]byte("hello"), []byte("tools")) hash.Write([]byte("hello"), []byte("pingcap")) - c.Assert(hash, Equals, PositionInertia(0xef)) + c.Assert(hash.Sum8(), Equals, byte(0x1c)) - hash = 0 + hash.Reset() hash.Write([]byte("ticdc"), []byte("hello")) hash.Write([]byte("hello"), []byte("hash")) hash.Write([]byte("hello"), []byte("tools")) hash.Write([]byte("hello"), []byte("pingcap")) - c.Assert(hash, Equals, PositionInertia(0x40)) + c.Assert(hash.Sum8(), Equals, byte(0x8c)) - hash = 0 + hash.Reset() hash.Write([]byte("ticdc"), []byte("hello")) hash.Write([]byte("hello"), []byte("hash")) hash.Write([]byte("tools"), []byte("hello")) hash.Write([]byte("hello"), []byte("pingcap")) - c.Assert(hash, Equals, PositionInertia(0x3d)) + c.Assert(hash.Sum8(), Equals, byte(0x93)) } From 082a7ab0309c9a84cea3699a8057668900da8526 Mon Sep 17 00:00:00 2001 From: leoppro Date: Wed, 29 Jul 2020 14:35:20 +0800 Subject: [PATCH 11/15] new hash algorithm --- cdc/sink/dispatcher/default_test.go | 20 ++++++++++---------- cdc/sink/dispatcher/index_value.go | 2 +- cdc/sink/dispatcher/index_value_test.go | 14 +++++++------- cdc/sink/dispatcher/table.go | 2 +- cdc/sink/dispatcher/table_test.go | 14 +++++++------- pkg/hash/position_inertia.go | 10 ++++------ pkg/hash/position_inertia_test.go | 10 +++++----- 7 files changed, 35 insertions(+), 37 deletions(-) diff --git a/cdc/sink/dispatcher/default_test.go b/cdc/sink/dispatcher/default_test.go index 7c1a5a5707b..7c6c0b241cd 100644 --- a/cdc/sink/dispatcher/default_test.go +++ b/cdc/sink/dispatcher/default_test.go @@ -39,7 +39,7 @@ func (s DefaultDispatcherSuite) TestDefaultDispatcher(c *check.C) { Flag: model.HandleKeyFlag, }, }, - }, exceptPartition: 15}, + }, exceptPartition: 11}, {row: &model.RowChangedEvent{ Table: &model.TableName{ Schema: "test", @@ -52,7 +52,7 @@ func (s DefaultDispatcherSuite) TestDefaultDispatcher(c *check.C) { Flag: model.HandleKeyFlag, }, }, - }, exceptPartition: 4}, + }, exceptPartition: 1}, {row: &model.RowChangedEvent{ Table: &model.TableName{ Schema: "test", @@ -65,7 +65,7 @@ func (s DefaultDispatcherSuite) TestDefaultDispatcher(c *check.C) { Flag: model.HandleKeyFlag, }, }, - }, exceptPartition: 2}, + }, exceptPartition: 7}, {row: &model.RowChangedEvent{ Table: &model.TableName{ Schema: "test", @@ -81,7 +81,7 @@ func (s DefaultDispatcherSuite) TestDefaultDispatcher(c *check.C) { Value: 1, }, }, - }, exceptPartition: 4}, + }, exceptPartition: 1}, {row: &model.RowChangedEvent{ Table: &model.TableName{ Schema: "test", @@ -97,7 +97,7 @@ func (s DefaultDispatcherSuite) TestDefaultDispatcher(c *check.C) { Value: 2, }, }, - }, exceptPartition: 15}, + }, exceptPartition: 11}, {row: &model.RowChangedEvent{ Table: &model.TableName{ Schema: "test", @@ -113,7 +113,7 @@ func (s DefaultDispatcherSuite) TestDefaultDispatcher(c *check.C) { Value: 3, }, }, - }, exceptPartition: 9}, + }, exceptPartition: 13}, {row: &model.RowChangedEvent{ Table: &model.TableName{ Schema: "test", @@ -129,7 +129,7 @@ func (s DefaultDispatcherSuite) TestDefaultDispatcher(c *check.C) { Value: 4, }, }, - }, exceptPartition: 9}, + }, exceptPartition: 13}, {row: &model.RowChangedEvent{ Table: &model.TableName{ Schema: "test", @@ -140,7 +140,7 @@ func (s DefaultDispatcherSuite) TestDefaultDispatcher(c *check.C) { Value: 1, }, }, - }, exceptPartition: 1}, + }, exceptPartition: 3}, {row: &model.RowChangedEvent{ Table: &model.TableName{ Schema: "test", @@ -151,7 +151,7 @@ func (s DefaultDispatcherSuite) TestDefaultDispatcher(c *check.C) { Value: 2, }, }, - }, exceptPartition: 1}, + }, exceptPartition: 3}, {row: &model.RowChangedEvent{ Table: &model.TableName{ Schema: "test", @@ -162,7 +162,7 @@ func (s DefaultDispatcherSuite) TestDefaultDispatcher(c *check.C) { Value: 3, }, }, - }, exceptPartition: 1}, + }, exceptPartition: 3}, } p := newDefaultDispatcher(16) for _, tc := range testCases { diff --git a/cdc/sink/dispatcher/index_value.go b/cdc/sink/dispatcher/index_value.go index 05c8e12bee6..15fc639b4f4 100644 --- a/cdc/sink/dispatcher/index_value.go +++ b/cdc/sink/dispatcher/index_value.go @@ -38,5 +38,5 @@ func (r *indexValueDispatcher) Dispatch(row *model.RowChangedEvent) int32 { r.hasher.Write([]byte(name), []byte(model.ColumnValueString(col.Value))) } } - return int32(r.hasher.Sum8() % byte(r.partitionNum)) + return int32(r.hasher.Sum32() % uint32(r.partitionNum)) } diff --git a/cdc/sink/dispatcher/index_value_test.go b/cdc/sink/dispatcher/index_value_test.go index 2d7178d94eb..da2f56d80fa 100644 --- a/cdc/sink/dispatcher/index_value_test.go +++ b/cdc/sink/dispatcher/index_value_test.go @@ -42,7 +42,7 @@ func (s IndexValueDispatcherSuite) TestIndexValueDispatcher(c *check.C) { Flag: 0, }, }, - }, exceptPartition: 8}, + }, exceptPartition: 2}, {row: &model.RowChangedEvent{ Table: &model.TableName{ Schema: "test", @@ -58,7 +58,7 @@ func (s IndexValueDispatcherSuite) TestIndexValueDispatcher(c *check.C) { Flag: 0, }, }, - }, exceptPartition: 5}, + }, exceptPartition: 11}, {row: &model.RowChangedEvent{ Table: &model.TableName{ Schema: "test", @@ -74,7 +74,7 @@ func (s IndexValueDispatcherSuite) TestIndexValueDispatcher(c *check.C) { Flag: 0, }, }, - }, exceptPartition: 8}, + }, exceptPartition: 2}, {row: &model.RowChangedEvent{ Table: &model.TableName{ Schema: "test", @@ -90,7 +90,7 @@ func (s IndexValueDispatcherSuite) TestIndexValueDispatcher(c *check.C) { Flag: model.HandleKeyFlag, }, }, - }, exceptPartition: 9}, + }, exceptPartition: 5}, {row: &model.RowChangedEvent{ Table: &model.TableName{ Schema: "test", @@ -106,7 +106,7 @@ func (s IndexValueDispatcherSuite) TestIndexValueDispatcher(c *check.C) { Flag: model.HandleKeyFlag, }, }, - }, exceptPartition: 9}, + }, exceptPartition: 5}, {row: &model.RowChangedEvent{ Table: &model.TableName{ Schema: "test", @@ -122,7 +122,7 @@ func (s IndexValueDispatcherSuite) TestIndexValueDispatcher(c *check.C) { Flag: model.HandleKeyFlag, }, }, - }, exceptPartition: 5}, + }, exceptPartition: 14}, {row: &model.RowChangedEvent{ Table: &model.TableName{ Schema: "test", @@ -138,7 +138,7 @@ func (s IndexValueDispatcherSuite) TestIndexValueDispatcher(c *check.C) { Flag: model.HandleKeyFlag, }, }, - }, exceptPartition: 13}, + }, exceptPartition: 2}, } p := newIndexValueDispatcher(16) for _, tc := range testCases { diff --git a/cdc/sink/dispatcher/table.go b/cdc/sink/dispatcher/table.go index 2922a2bec66..28d4c04d464 100644 --- a/cdc/sink/dispatcher/table.go +++ b/cdc/sink/dispatcher/table.go @@ -34,5 +34,5 @@ func (t *tableDispatcher) Dispatch(row *model.RowChangedEvent) int32 { t.hasher.Reset() // distribute partition by table t.hasher.Write([]byte(row.Table.Schema), []byte(row.Table.Table)) - return int32(t.hasher.Sum8() % byte(t.partitionNum)) + return int32(t.hasher.Sum32() % uint32(t.partitionNum)) } diff --git a/cdc/sink/dispatcher/table_test.go b/cdc/sink/dispatcher/table_test.go index 9be85c7962a..0b818974957 100644 --- a/cdc/sink/dispatcher/table_test.go +++ b/cdc/sink/dispatcher/table_test.go @@ -33,49 +33,49 @@ func (s TableDispatcherSuite) TestTableDispatcher(c *check.C) { Table: "t1", }, CommitTs: 1, - }, exceptPartition: 12}, + }, exceptPartition: 15}, {row: &model.RowChangedEvent{ Table: &model.TableName{ Schema: "test", Table: "t1", }, CommitTs: 2, - }, exceptPartition: 12}, + }, exceptPartition: 15}, {row: &model.RowChangedEvent{ Table: &model.TableName{ Schema: "test", Table: "t1", }, CommitTs: 3, - }, exceptPartition: 12}, + }, exceptPartition: 15}, {row: &model.RowChangedEvent{ Table: &model.TableName{ Schema: "test", Table: "t2", }, CommitTs: 1, - }, exceptPartition: 7}, + }, exceptPartition: 5}, {row: &model.RowChangedEvent{ Table: &model.TableName{ Schema: "test", Table: "t2", }, CommitTs: 2, - }, exceptPartition: 7}, + }, exceptPartition: 5}, {row: &model.RowChangedEvent{ Table: &model.TableName{ Schema: "test", Table: "t2", }, CommitTs: 3, - }, exceptPartition: 7}, + }, exceptPartition: 5}, {row: &model.RowChangedEvent{ Table: &model.TableName{ Schema: "test", Table: "t3", }, CommitTs: 3, - }, exceptPartition: 1}, + }, exceptPartition: 3}, } p := newTableDispatcher(16) for _, tc := range testCases { diff --git a/pkg/hash/position_inertia.go b/pkg/hash/position_inertia.go index 62dfac5e4ed..d31aa60372e 100644 --- a/pkg/hash/position_inertia.go +++ b/pkg/hash/position_inertia.go @@ -21,11 +21,11 @@ import ( "go.uber.org/zap" ) -const hashMagicNumber = 0x6A +const hashMagicNumber = 0 // PositionInertia is a 8-bits hash which is bytes partitions inertia type PositionInertia struct { - hashValue byte + hashValue uint32 hasher hash.Hash32 } @@ -45,12 +45,10 @@ func (p *PositionInertia) Write(bss ...[]byte) { } } rawHash := p.hasher.Sum32() - rawHash ^= rawHash >> 16 - rawHash ^= rawHash >> 8 - p.hashValue ^= byte(rawHash) + p.hashValue ^= rawHash } -func (p *PositionInertia) Sum8() byte { +func (p *PositionInertia) Sum32() uint32 { return p.hashValue } diff --git a/pkg/hash/position_inertia_test.go b/pkg/hash/position_inertia_test.go index 83fb6c603c5..99de839754f 100644 --- a/pkg/hash/position_inertia_test.go +++ b/pkg/hash/position_inertia_test.go @@ -33,33 +33,33 @@ func (s *testPositionInertia) TestPositionInertia(c *C) { hash.Write([]byte("hello"), []byte("pingcap")) hash.Write([]byte("hello"), []byte("ticdc")) hash.Write([]byte("hello"), []byte("tools")) - c.Assert(hash.Sum8(), Equals, byte(0x1c)) + c.Assert(hash.Sum32(), Equals, uint32(0x914505a7)) hash.Reset() hash.Write([]byte("hello"), []byte("pingcap")) hash.Write([]byte("hello"), []byte("hash")) hash.Write([]byte("hello"), []byte("tools")) hash.Write([]byte("hello"), []byte("ticdc")) - c.Assert(hash.Sum8(), Equals, byte(0x1c)) + c.Assert(hash.Sum32(), Equals, uint32(0x914505a7)) hash.Reset() hash.Write([]byte("hello"), []byte("ticdc")) hash.Write([]byte("hello"), []byte("hash")) hash.Write([]byte("hello"), []byte("tools")) hash.Write([]byte("hello"), []byte("pingcap")) - c.Assert(hash.Sum8(), Equals, byte(0x1c)) + c.Assert(hash.Sum32(), Equals, uint32(0x914505a7)) hash.Reset() hash.Write([]byte("ticdc"), []byte("hello")) hash.Write([]byte("hello"), []byte("hash")) hash.Write([]byte("hello"), []byte("tools")) hash.Write([]byte("hello"), []byte("pingcap")) - c.Assert(hash.Sum8(), Equals, byte(0x8c)) + c.Assert(hash.Sum32(), Equals, uint32(0xf21b4f40)) hash.Reset() hash.Write([]byte("ticdc"), []byte("hello")) hash.Write([]byte("hello"), []byte("hash")) hash.Write([]byte("tools"), []byte("hello")) hash.Write([]byte("hello"), []byte("pingcap")) - c.Assert(hash.Sum8(), Equals, byte(0x93)) + c.Assert(hash.Sum32(), Equals, uint32(0x7cd6194a)) } From 3104e9fcb9370c7bbd10e237423d38d328f9d1e4 Mon Sep 17 00:00:00 2001 From: leoppro Date: Wed, 29 Jul 2020 14:40:07 +0800 Subject: [PATCH 12/15] fix check --- pkg/hash/position_inertia.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pkg/hash/position_inertia.go b/pkg/hash/position_inertia.go index d31aa60372e..db97202a5d4 100644 --- a/pkg/hash/position_inertia.go +++ b/pkg/hash/position_inertia.go @@ -29,6 +29,7 @@ type PositionInertia struct { hasher hash.Hash32 } +// NewPositionInertia creates a new position inertia algorithm hash builder func NewPositionInertia() *PositionInertia { return &PositionInertia{ hashValue: hashMagicNumber, @@ -36,6 +37,7 @@ func NewPositionInertia() *PositionInertia { } } +// Write writes the bytes into the PositionInertia func (p *PositionInertia) Write(bss ...[]byte) { p.hasher.Reset() for _, bs := range bss { @@ -48,10 +50,12 @@ func (p *PositionInertia) Write(bss ...[]byte) { p.hashValue ^= rawHash } +// Sum32 returns the 32-bits hash func (p *PositionInertia) Sum32() uint32 { return p.hashValue } +// Reset resets the PositionInertia func (p *PositionInertia) Reset() { p.hashValue = hashMagicNumber } From 9147906e2c9ceaffe343217e9871d311002dd1b3 Mon Sep 17 00:00:00 2001 From: leoppro Date: Mon, 3 Aug 2020 15:53:20 +0800 Subject: [PATCH 13/15] add debug code --- cdc/sink/mysql.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cdc/sink/mysql.go b/cdc/sink/mysql.go index 22a44d043e0..280df315375 100644 --- a/cdc/sink/mysql.go +++ b/cdc/sink/mysql.go @@ -613,10 +613,10 @@ func (s *mysqlSink) execDMLWithMaxRetries( } for i, query := range sqls { args := values[i] + log.Debug("exec row", zap.String("sql", query), zap.Any("args", args)) if _, err := tx.ExecContext(ctx, query, args...); err != nil { return 0, checkTxnErr(errors.Trace(err)) } - log.Debug("exec row", zap.String("sql", query), zap.Any("args", args)) } if err = tx.Commit(); err != nil { return 0, checkTxnErr(errors.Trace(err)) @@ -677,7 +677,7 @@ func (s *mysqlSink) execDMLs(ctx context.Context, rows []*model.RowChangedEvent, if err != nil { return errors.Trace(err) } - log.Debug("show prepareDMLs", zap.Any("rows", rows), zap.Strings("sqls", sqls), zap.Any("values", values)) + log.Debug("prepare DMLs", zap.Any("rows", rows), zap.Strings("sqls", sqls), zap.Any("values", values)) if err := s.execDMLWithMaxRetries(ctx, sqls, values, defaultDMLMaxRetryTime, bucket); err != nil { ts := make([]uint64, 0, len(rows)) for _, row := range rows { From 9627608fc7f7530dfbc2f68bcb002346f665f2cd Mon Sep 17 00:00:00 2001 From: leoppro Date: Mon, 3 Aug 2020 16:03:04 +0800 Subject: [PATCH 14/15] fix bug --- cdc/entry/mounter.go | 29 ++++++++++++++--------------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/cdc/entry/mounter.go b/cdc/entry/mounter.go index e80287995b1..dae55119f29 100644 --- a/cdc/entry/mounter.go +++ b/cdc/entry/mounter.go @@ -395,22 +395,21 @@ func datum2Column(tableInfo *model.TableInfo, datums map[int64]types.Datum, fill } cols[colName] = col } - if !fillWithDefaultValue { - return cols, nil - } - for _, col := range tableInfo.Columns { - _, ok := cols[col.Name.O] - if !ok && tableInfo.IsColCDCVisible(col) { - column := &model.Column{ - Type: col.Tp, - Value: getDefaultOrZeroValue(col), - Flag: transColumnFlag(col), - } - if tableInfo.IsColumnUnique(col.ID) { - whereHandle := true - column.WhereHandle = &whereHandle + if fillWithDefaultValue { + for _, col := range tableInfo.Columns { + _, ok := cols[col.Name.O] + if !ok && tableInfo.IsColCDCVisible(col) { + column := &model.Column{ + Type: col.Tp, + Value: getDefaultOrZeroValue(col), + Flag: transColumnFlag(col), + } + if tableInfo.IsColumnUnique(col.ID) { + whereHandle := true + column.WhereHandle = &whereHandle + } + cols[col.Name.O] = column } - cols[col.Name.O] = column } } From 0f8623f24f165991e4d15dd8785298572e8c4354 Mon Sep 17 00:00:00 2001 From: leoppro Date: Mon, 3 Aug 2020 17:32:13 +0800 Subject: [PATCH 15/15] Apply suggestions from code review Co-authored-by: Zixiong Liu --- cdc/entry/mounter.go | 4 ++-- cdc/model/schema_storage.go | 8 ++++---- cdc/model/sink.go | 6 +++--- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/cdc/entry/mounter.go b/cdc/entry/mounter.go index dae55119f29..76067491c9b 100644 --- a/cdc/entry/mounter.go +++ b/cdc/entry/mounter.go @@ -481,7 +481,7 @@ func (m *mounterImpl) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntr func setHandleKeyFlag(tableInfo *model.TableInfo, colValues map[string]*model.Column) error { switch tableInfo.HandleIndexID { case model.HandleIndexTableIneligible: - log.Fatal("this table is not a eligible", zap.Int64("tableID", tableInfo.ID)) + log.Fatal("this table is not eligible", zap.Int64("tableID", tableInfo.ID)) case model.HandleIndexPKIsHandle: // pk is handle if !tableInfo.PKIsHandle { @@ -511,7 +511,7 @@ func (m *mounterImpl) mountIndexKVEntry(tableInfo *model.TableInfo, idx *indexKV if !idx.Delete || m.enableOldValue { return nil, nil } - // skip the index which not handle + // skip any index that is not the handle if idx.IndexID != tableInfo.HandleIndexID { return nil, nil } diff --git a/cdc/model/schema_storage.go b/cdc/model/schema_storage.go index 366c329c646..8b0912a39ee 100644 --- a/cdc/model/schema_storage.go +++ b/cdc/model/schema_storage.go @@ -22,9 +22,9 @@ import ( ) const ( - // HandleIndexPKIsHandle represents the handle index is pk and the pk is handle + // HandleIndexPKIsHandle represents that the handle index is the pk and the pk is the handle HandleIndexPKIsHandle = -1 - // HandleIndexTableIneligible represents the table is ineligible + // HandleIndexTableIneligible represents that the table is ineligible HandleIndexTableIneligible = -2 ) @@ -41,7 +41,7 @@ type TableInfo struct { // only for new row format decoder handleColID int64 - // the mounter will chose this index to output delete events + // the mounter will choose this index to output delete events // special value: // HandleIndexPKIsHandle(-1) : pk is handle // HandleIndexTableIneligible(-2) : the table is not eligible @@ -167,7 +167,7 @@ func (ti *TableInfo) GetRowColInfos() (int64, []rowcodec.ColInfo) { return ti.handleColID, ti.rowColInfos } -// IsColCDCVisible returns is the col is visible for CDC +// IsColCDCVisible returns whether the col is visible for CDC func (ti *TableInfo) IsColCDCVisible(col *model.ColumnInfo) bool { // this column is a virtual generated column if col.IsGenerated() && !col.GeneratedStored { diff --git a/cdc/model/sink.go b/cdc/model/sink.go index c4250db47a8..0c5270aae1f 100644 --- a/cdc/model/sink.go +++ b/cdc/model/sink.go @@ -43,7 +43,7 @@ type ColumnFlagType util.Flag const ( // BinaryFlag means the column charset is binary BinaryFlag ColumnFlagType = 1 << ColumnFlagType(iota) - // HandleKeyFlag means the column selected as the handle key + // HandleKeyFlag means the column is selected as the handle key HandleKeyFlag // GeneratedColumnFlag means the column is a generated column GeneratedColumnFlag @@ -146,13 +146,13 @@ type RowChangedEvent struct { type Column struct { Type byte `json:"t"` // WhereHandle is deprecation - // WhereHandle is instead by HandleKey in Flag + // WhereHandle is replaced by HandleKey in Flag WhereHandle *bool `json:"h,omitempty"` Flag ColumnFlagType `json:"f"` Value interface{} `json:"v"` } -// ColumnValueString returns a string representation of the column value +// ColumnValueString returns the string representation of the column value func ColumnValueString(c interface{}) string { var data string switch v := c.(type) {