Skip to content

Commit

Permalink
filter (ticdc): enhance event filter (#10509)
Browse files Browse the repository at this point in the history
close #10511
  • Loading branch information
asddongmen authored Feb 7, 2024
1 parent ba8e2aa commit ed54e78
Show file tree
Hide file tree
Showing 12 changed files with 251 additions and 375 deletions.
2 changes: 1 addition & 1 deletion dm/syncer/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -1173,7 +1173,7 @@ func (ddl *DDLWorker) AstToDDLEvent(qec *queryEventContext, info *ddlInfo) (et b
}
}
case ast.AlterTableReorganizePartition:
return bf.ReorganizePartion
return bf.ReorganizePartition
case ast.AlterTableRebuildPartition:
return bf.RebuildPartition
case ast.AlterTableCoalescePartitions:
Expand Down
2 changes: 1 addition & 1 deletion dm/tests/incompatible_ddl_changes/conf/dm-task1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ filters:
"drop primary key",
"drop unique key",
"modify default value",
"modify constaints",
"modify constraint",
"modify columns order",
"modify charset",
"modify collation",
Expand Down
6 changes: 3 additions & 3 deletions dm/tests/incompatible_ddl_changes/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,11 @@ function incompatible_ddl() {
"binlog skip test" \
"\"result\": true" 2

# modify constaints
# modify constraint
run_sql_source1 "alter table incompatible_ddl_changes.t1 add constraint c_int_unique unique(c_int)"
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
"event modify constaint" 1
"event modify constraint" 1
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"binlog skip test" \
"\"result\": true" 2
Expand Down Expand Up @@ -238,7 +238,7 @@ function incompatible_ddl() {
run_sql_source1 "alter table incompatible_ddl_changes.tb3 add index idx(id);"
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
"event modify constaints" 1
"event modify constraint" 1
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"binlog skip test" \
"\"result\": true" 2
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ require (
github.com/pingcap/kvproto v0.0.0-20231222062942-c0c73f41d0b2
github.com/pingcap/log v1.1.1-0.20230317032135-a0d097d16e22
github.com/pingcap/tidb v1.1.0-beta.0.20240105042433-54d8a1416ab0
github.com/pingcap/tidb-tools v0.0.0-20231228035519-c4bdf178b3d6
github.com/pingcap/tidb-tools v0.0.0-20240131075425-5bb51786a1dc
github.com/pingcap/tidb/pkg/parser v0.0.0-20231229060758-e19e06e1bc19
github.com/prometheus/client_golang v1.18.0
github.com/prometheus/client_model v0.5.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -867,8 +867,8 @@ github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 h1:QV6jqlfOkh8hq
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21/go.mod h1:QYnjfA95ZaMefyl1NO8oPtKeb8pYUdnDVhQgf+qdpjM=
github.com/pingcap/tidb v1.1.0-beta.0.20240105042433-54d8a1416ab0 h1:EAeL37UVvIMvAG5JP0qDSBWZvV4tKn0gUbSP05vjtqU=
github.com/pingcap/tidb v1.1.0-beta.0.20240105042433-54d8a1416ab0/go.mod h1:1jrwjaNIwD9JmFR3NghQ/xu/PCuPQVLhK8sshyFadN0=
github.com/pingcap/tidb-tools v0.0.0-20231228035519-c4bdf178b3d6 h1:NJQd14hU0pLj8vS+vYtgFw4bXrFIXa/HfMdxALHtgVU=
github.com/pingcap/tidb-tools v0.0.0-20231228035519-c4bdf178b3d6/go.mod h1:7m72uoBoTrr8JNNA7iFBH824j3clIJx34zlUWJ45Q6Y=
github.com/pingcap/tidb-tools v0.0.0-20240131075425-5bb51786a1dc h1:z9gMlNH+AMmp62EniIRtyVVgOsrFcuxiUu6Ty60uEMU=
github.com/pingcap/tidb-tools v0.0.0-20240131075425-5bb51786a1dc/go.mod h1:7m72uoBoTrr8JNNA7iFBH824j3clIJx34zlUWJ45Q6Y=
github.com/pingcap/tidb/pkg/parser v0.0.0-20231229060758-e19e06e1bc19 h1:nmFYQ4q3m0v7AleVVyiijMtEzfxCh8zqMeuD1npsy+w=
github.com/pingcap/tidb/pkg/parser v0.0.0-20231229060758-e19e06e1bc19/go.mod h1:yRkiqLFwIqibYg2P7h4bclHjHcJiIFRLKhGRyBcKYus=
github.com/pingcap/tipb v0.0.0-20230919054518-dfd7d194838f h1:NCiI4Wyu4GkViLGTu6cYcxt79LZ1SenBBQX1OwEV6Jg=
Expand Down
3 changes: 3 additions & 0 deletions pkg/filter/expr_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,9 @@ func (f *dmlExprFilter) shouldSkipDML(
rawRow model.RowChangedDatums,
ti *model.TableInfo,
) (bool, error) {
if len(f.rules) == 0 {
return false, nil
}
// for defense purpose, normally the row and ti should not be nil.
if ti == nil || row == nil || rawRow.IsEmpty() {
return false, nil
Expand Down
107 changes: 61 additions & 46 deletions pkg/filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package filter

import (
bf "github.com/pingcap/tidb-tools/pkg/binlog-filter"
timodel "github.com/pingcap/tidb/pkg/parser/model"
tfilter "github.com/pingcap/tidb/pkg/util/table-filter"
"github.com/pingcap/tiflow/cdc/model"
Expand All @@ -27,45 +28,63 @@ const (
TiCDCSystemSchema = "tidb_cdc"
)

// allowDDLList is a list of DDL types that can be applied to cdc's schema storage.
// It's a white list.
var allowDDLList = []timodel.ActionType{
timodel.ActionCreateSchema,
timodel.ActionDropSchema,
timodel.ActionCreateTable,
timodel.ActionDropTable,
timodel.ActionAddColumn,
timodel.ActionDropColumn,
timodel.ActionAddIndex,
timodel.ActionDropIndex,
timodel.ActionTruncateTable,
timodel.ActionModifyColumn,
timodel.ActionRenameTable,
timodel.ActionRenameTables,
timodel.ActionSetDefaultValue,
timodel.ActionModifyTableComment,
timodel.ActionRenameIndex,
timodel.ActionAddTablePartition,
timodel.ActionDropTablePartition,
timodel.ActionCreateView,
timodel.ActionModifyTableCharsetAndCollate,
timodel.ActionTruncateTablePartition,
timodel.ActionDropView,
timodel.ActionRecoverTable,
timodel.ActionModifySchemaCharsetAndCollate,
timodel.ActionAddPrimaryKey,
timodel.ActionDropPrimaryKey,
timodel.ActionAddColumns, // Removed in TiDB v6.2.0, see https://github.com/pingcap/tidb/pull/35862.
timodel.ActionDropColumns, // Removed in TiDB v6.2.0
timodel.ActionRebaseAutoID,
timodel.ActionAlterIndexVisibility,
timodel.ActionMultiSchemaChange,
timodel.ActionExchangeTablePartition,
timodel.ActionReorganizePartition,
timodel.ActionAlterTTLInfo,
timodel.ActionAlterTTLRemove,
timodel.ActionAlterTablePartitioning,
timodel.ActionRemovePartitioning,
// ddlWhiteListMap is a map of all DDL types that can be applied to cdc's schema storage.
var ddlWhiteListMap = map[timodel.ActionType]bf.EventType{
// schema related DDLs
timodel.ActionCreateSchema: bf.CreateDatabase,
timodel.ActionDropSchema: bf.DropDatabase,
timodel.ActionModifySchemaCharsetAndCollate: bf.ModifySchemaCharsetAndCollate,

// table related DDLs
timodel.ActionCreateTable: bf.CreateTable,
timodel.ActionDropTable: bf.DropTable,
timodel.ActionTruncateTable: bf.TruncateTable,
timodel.ActionRenameTable: bf.RenameTable,
timodel.ActionRenameTables: bf.RenameTable,
timodel.ActionRecoverTable: bf.RecoverTable,
timodel.ActionModifyTableComment: bf.ModifyTableComment,
timodel.ActionModifyTableCharsetAndCollate: bf.ModifyTableCharsetAndCollate,

// view related DDLs
timodel.ActionCreateView: bf.CreateView,
timodel.ActionDropView: bf.DropView,

// partition related DDLs
timodel.ActionAddTablePartition: bf.AddTablePartition,
timodel.ActionDropTablePartition: bf.DropTablePartition,
timodel.ActionTruncateTablePartition: bf.TruncateTablePartition,
timodel.ActionExchangeTablePartition: bf.ExchangePartition,
timodel.ActionReorganizePartition: bf.ReorganizePartition,
timodel.ActionAlterTablePartitioning: bf.AlterTablePartitioning,
timodel.ActionRemovePartitioning: bf.RemovePartitioning,

// column related DDLs
timodel.ActionAddColumn: bf.AddColumn,
timodel.ActionDropColumn: bf.DropColumn,
timodel.ActionModifyColumn: bf.ModifyColumn,
timodel.ActionSetDefaultValue: bf.SetDefaultValue,

// index related DDLs
timodel.ActionRebaseAutoID: bf.RebaseAutoID,
timodel.ActionAddPrimaryKey: bf.AddPrimaryKey,
timodel.ActionDropPrimaryKey: bf.DropPrimaryKey,
timodel.ActionAddIndex: bf.CreateIndex,
timodel.ActionDropIndex: bf.DropIndex,
timodel.ActionRenameIndex: bf.RenameIndex,
timodel.ActionAlterIndexVisibility: bf.AlterIndexVisibility,

// TTL related DDLs
timodel.ActionAlterTTLInfo: bf.AlterTTLInfo,
timodel.ActionAlterTTLRemove: bf.AlterTTLRemove,

// difficult to classify DDLs
timodel.ActionMultiSchemaChange: bf.MultiSchemaChange,

// deprecated DDLs,see https://github.com/pingcap/tidb/pull/35862.
// DDL types below are deprecated in TiDB v6.2.0, but we still keep them here
// In case that some users will use TiCDC to replicate data from TiDB v6.1.x.
timodel.ActionAddColumns: bf.AddColumn,
timodel.ActionDropColumns: bf.DropColumn,
}

// Filter are safe for concurrent use.
Expand Down Expand Up @@ -117,7 +136,7 @@ func NewFilter(cfg *config.ReplicaConfig, tz string) (Filter, error) {
if err != nil {
return nil, err
}
sqlEventFilter, err := newSQLEventFilter(cfg.Filter, cfg.SQLMode)
sqlEventFilter, err := newSQLEventFilter(cfg.Filter)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -222,12 +241,8 @@ func (f *filter) shouldIgnoreStartTs(ts uint64) bool {
}

func isAllowedDDL(actionType timodel.ActionType) bool {
for _, action := range allowDDLList {
if actionType == action {
return true
}
}
return false
_, ok := ddlWhiteListMap[actionType]
return ok
}

// IsSchemaDDL returns true if the action type is a schema DDL.
Expand Down
7 changes: 4 additions & 3 deletions pkg/filter/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,13 +371,14 @@ func TestShouldDiscardDDL(t *testing.T) {
}

func TestIsAllowedDDL(t *testing.T) {
require.Len(t, ddlWhiteListMap, 36)
type testCase struct {
timodel.ActionType
allowed bool
}
testCases := make([]testCase, 0, len(allowDDLList))
for _, action := range allowDDLList {
testCases = append(testCases, testCase{action, true})
testCases := make([]testCase, 0, len(ddlWhiteListMap))
for ddlType := range ddlWhiteListMap {
testCases = append(testCases, testCase{ddlType, true})
}
testCases = append(testCases, testCase{timodel.ActionAddForeignKey, false})
testCases = append(testCases, testCase{timodel.ActionDropForeignKey, false})
Expand Down
90 changes: 28 additions & 62 deletions pkg/filter/sql_event_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,9 @@
package filter

import (
"fmt"
"sync"

"github.com/pingcap/errors"
"github.com/pingcap/log"
bf "github.com/pingcap/tidb-tools/pkg/binlog-filter"
"github.com/pingcap/tidb/pkg/parser"
"github.com/pingcap/tidb/pkg/parser/mysql"
tfilter "github.com/pingcap/tidb/pkg/util/table-filter"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
Expand Down Expand Up @@ -84,8 +79,8 @@ func newSQLEventFilterRule(cfg *config.EventFilterRule) (*sqlEventRule, error) {
}

func verifyIgnoreEvents(types []bf.EventType) error {
typesMap := make(map[bf.EventType]struct{}, len(supportedEventTypes))
for _, et := range supportedEventTypes {
typesMap := make(map[bf.EventType]struct{}, len(SupportedEventTypes()))
for _, et := range SupportedEventTypes() {
typesMap[et] = struct{}{}
}
for _, et := range types {
Expand All @@ -98,28 +93,11 @@ func verifyIgnoreEvents(types []bf.EventType) error {

// sqlEventFilter is a filter that filters DDL/DML event by its type or query.
type sqlEventFilter struct {
// Please be careful, parser.Parser is not thread safe.
pLock sync.Mutex
// Currently, parser is only used to parse ddl query.
// So we can use a lock to protect it.
// If we want to use it to parse dml query in the future,
// we should create a parser for each goroutine.
ddlParser *parser.Parser
rules []*sqlEventRule
rules []*sqlEventRule
}

func newSQLEventFilter(cfg *config.FilterConfig, sqlMode string) (*sqlEventFilter, error) {
p := parser.New()
mode, err := mysql.GetSQLMode(sqlMode)
if err != nil {
log.Error("failed to get sql mode", zap.Error(err))
return nil, cerror.ErrInvalidReplicaConfig.FastGenByArgs(fmt.Sprintf("invalid sqlMode %s", sqlMode))
}
p.SetSQLMode(mode)

res := &sqlEventFilter{
ddlParser: p,
}
func newSQLEventFilter(cfg *config.FilterConfig) (*sqlEventFilter, error) {
res := &sqlEventFilter{}
for _, rule := range cfg.EventFilters {
if err := res.addRule(rule); err != nil {
return nil, errors.Trace(err)
Expand Down Expand Up @@ -155,17 +133,12 @@ func (f *sqlEventFilter) getRules(schema, table string) []*sqlEventRule {

// skipDDLEvent skips ddl event by its type and query.
func (f *sqlEventFilter) shouldSkipDDL(ddl *model.DDLEvent) (bool, error) {
if len(f.rules) == 0 {
return false, nil
}
schema := ddl.TableInfo.TableName.Schema
table := ddl.TableInfo.TableName.Table
log.Info("sql event filter handle ddl event",
zap.Any("ddlType", ddl.Type), zap.String("schema", schema),
zap.String("table", table), zap.String("query", ddl.Query))
f.pLock.Lock()
evenType, err := ddlToEventType(f.ddlParser, ddl.Query, ddl.Type)
f.pLock.Unlock()
if err != nil {
return false, err
}
evenType := ddlToEventType(ddl.Type)
if evenType == bf.NullEvent {
log.Warn("sql event filter unsupported ddl type, do nothing",
zap.String("type", ddl.Type.String()),
Expand All @@ -185,12 +158,31 @@ func (f *sqlEventFilter) shouldSkipDDL(ddl *model.DDLEvent) (bool, error) {
if action == bf.Ignore {
return true, nil
}

// If the ddl is alter table's subtype,
// we need try to filter it by bf.AlterTable.
if isAlterTable(ddl.Type) {
action, err = rule.bf.Filter(
binlogFilterSchemaPlaceholder,
binlogFilterTablePlaceholder,
bf.AlterTable, ddl.Query)
if err != nil {
return false, errors.Trace(err)
}
if action == bf.Ignore {
return true, nil
}
}
}
return false, nil
}

// shouldSkipDML skips dml event by its type.
func (f *sqlEventFilter) shouldSkipDML(event *model.RowChangedEvent) (bool, error) {
if len(f.rules) == 0 {
return false, nil
}

var et bf.EventType
switch {
case event.IsInsert():
Expand All @@ -216,29 +208,3 @@ func (f *sqlEventFilter) shouldSkipDML(event *model.RowChangedEvent) (bool, erro
}
return false, nil
}

var supportedEventTypes = []bf.EventType{
bf.AllDML,
bf.AllDDL,

// dml events
bf.InsertEvent,
bf.UpdateEvent,
bf.DeleteEvent,

// ddl events
bf.CreateSchema,
bf.CreateDatabase,
bf.DropSchema,
bf.DropDatabase,
bf.CreateTable,
bf.DropTable,
bf.RenameTable,
bf.TruncateTable,
bf.AlterTable,
bf.CreateView,
bf.DropView,
bf.AddTablePartition,
bf.DropTablePartition,
bf.TruncateTablePartition,
}
Loading

0 comments on commit ed54e78

Please sign in to comment.