diff --git a/.github/workflows/integration_test_mysql.yaml b/.github/workflows/integration_test_mysql.yaml index f91628e16..bd4ed899d 100644 --- a/.github/workflows/integration_test_mysql.yaml +++ b/.github/workflows/integration_test_mysql.yaml @@ -422,17 +422,6 @@ jobs: if: ${{ success() }} run: | export TICDC_NEWARCH=true && make integration_test CASE=partition_table - - - name: Test multi_tables_ddl - if: ${{ success() }} - run: | - export TICDC_NEWARCH=true && make integration_test CASE=multi_tables_ddl - - - name: Test multi_source - if: ${{ success() }} - run: | - export TICDC_NEWARCH=true && make integration_test CASE=multi_source - - name: Test ddl_attributes if: ${{ success() }} @@ -516,12 +505,22 @@ jobs: run: | export TICDC_NEWARCH=true && make integration_test CASE=cdc - # The 20th case in this group - name: Test overwrite_resume_with_syncpoint if: ${{ success() }} run: | export TICDC_NEWARCH=true && make integration_test CASE=overwrite_resume_with_syncpoint + - name: Test multi_source + if: ${{ success() }} + run: | + export TICDC_NEWARCH=true && make integration_test CASE=multi_source + + # The 20th case in this group + - name: Test multi_tables_ddl + if: ${{ success() }} + run: | + export TICDC_NEWARCH=true && make integration_test CASE=multi_tables_ddl + - name: Upload test logs if: always() uses: ./.github/actions/upload-test-logs diff --git a/downstreamadapter/sink/helper/eventrouter/event_router.go b/downstreamadapter/sink/helper/eventrouter/event_router.go index e8aa0b217..7cc943e7e 100644 --- a/downstreamadapter/sink/helper/eventrouter/event_router.go +++ b/downstreamadapter/sink/helper/eventrouter/event_router.go @@ -84,18 +84,18 @@ func (s *EventRouter) GetTopicForRowChange(tableInfo *common.TableInfo) string { func (s *EventRouter) GetTopicForDDL(ddl *commonEvent.DDLEvent) string { var schema, table string - if ddl.GetPrevSchemaName() != "" { - if ddl.GetPrevTableName() == "" { + if ddl.GetExtraSchemaName() != "" { + if ddl.GetExtraTableName() == "" { return s.defaultTopic } - schema = ddl.GetPrevSchemaName() - table = ddl.GetPrevTableName() + schema = ddl.GetExtraSchemaName() + table = ddl.GetExtraTableName() } else { - if ddl.GetCurrentTableName() == "" { + if ddl.GetTableName() == "" { return s.defaultTopic } - schema = ddl.GetCurrentSchemaName() - table = ddl.GetCurrentTableName() + schema = ddl.GetSchemaName() + table = ddl.GetTableName() } topicGenerator := s.matchTopicGenerator(schema, table) diff --git a/downstreamadapter/sink/helper/eventrouter/event_router_test.go b/downstreamadapter/sink/helper/eventrouter/event_router_test.go index d4386d0fa..dc2a89de5 100644 --- a/downstreamadapter/sink/helper/eventrouter/event_router_test.go +++ b/downstreamadapter/sink/helper/eventrouter/event_router_test.go @@ -315,10 +315,10 @@ func TestGetTopicForDDL(t *testing.T) { }, { ddl: &commonEvent.DDLEvent{ - PrevSchemaName: "test1", - PrevTableName: "tb1", - SchemaName: "test1", - TableName: "tb2", + ExtraSchemaName: "test1", + ExtraTableName: "tb1", + SchemaName: "test1", + TableName: "tb2", }, expectedTopic: "test1_tb1", }, diff --git a/logservice/schemastore/disk_format.go b/logservice/schemastore/disk_format.go index f6f1a3fd2..debec9865 100644 --- a/logservice/schemastore/disk_format.go +++ b/logservice/schemastore/disk_format.go @@ -400,13 +400,13 @@ func unmarshalPersistedDDLEvent(value []byte) PersistedDDLEvent { } ddlEvent.TableInfoValue = nil - if ddlEvent.PreTableInfoValue != nil { + if ddlEvent.ExtraTableInfoValue != nil { var err error - ddlEvent.PreTableInfo, 
err = common.UnmarshalJSONToTableInfo(ddlEvent.PreTableInfoValue) + ddlEvent.ExtraTableInfo, err = common.UnmarshalJSONToTableInfo(ddlEvent.ExtraTableInfoValue) if err != nil { log.Fatal("unmarshal pre table info failed", zap.Error(err)) } - ddlEvent.PreTableInfoValue = nil + ddlEvent.ExtraTableInfoValue = nil } if len(ddlEvent.MultipleTableInfosValue) > 0 { @@ -446,8 +446,8 @@ func writePersistedDDLEvent(db *pebble.DB, ddlEvent *PersistedDDLEvent) error { if err != nil { return err } - if ddlEvent.PreTableInfo != nil { - ddlEvent.PreTableInfoValue, err = ddlEvent.PreTableInfo.Marshal() + if ddlEvent.ExtraTableInfo != nil { + ddlEvent.ExtraTableInfoValue, err = ddlEvent.ExtraTableInfo.Marshal() if err != nil { return err } @@ -646,6 +646,7 @@ func loadAllPhysicalTablesAtTs( return nil, err } log.Info("after load tables in kv snap", + zap.Int("tableInfoMapLen", len(tableInfoMap)), zap.Int("tableMapLen", len(tableMap)), zap.Int("partitionMapLen", len(partitionMap))) @@ -680,6 +681,7 @@ func loadAllPhysicalTablesAtTs( }) } log.Info("after load tables from ddl", + zap.Int("tableInfoMapLen", len(tableInfoMap)), zap.Int("tableMapLen", len(tableMap)), zap.Int("partitionMapLen", len(partitionMap))) tables := make([]commonEvent.Table, 0) diff --git a/logservice/schemastore/multi_version_test.go b/logservice/schemastore/multi_version_test.go index 5425b7761..1560c8934 100644 --- a/logservice/schemastore/multi_version_test.go +++ b/logservice/schemastore/multi_version_test.go @@ -22,8 +22,9 @@ import ( func TestBuildVersionedTableInfoStore(t *testing.T) { type QueryTableInfoTestCase struct { - snapTs uint64 - name string + snapTs uint64 + schemaName string + tableName string } testCases := []struct { tableID int64 @@ -41,8 +42,9 @@ func TestBuildVersionedTableInfoStore(t *testing.T) { }(), queryCases: []QueryTableInfoTestCase{ { - snapTs: 1000, - name: "t", + snapTs: 1000, + schemaName: "test", + tableName: "t", }, }, deleteVersion: 1010, @@ -57,12 +59,14 @@ func TestBuildVersionedTableInfoStore(t *testing.T) { }(), queryCases: []QueryTableInfoTestCase{ { - snapTs: 1010, - name: "t", + snapTs: 1010, + schemaName: "test", + tableName: "t", }, { - snapTs: 1020, - name: "t2", + snapTs: 1020, + schemaName: "test", + tableName: "t2", }, }, }, @@ -71,18 +75,20 @@ func TestBuildVersionedTableInfoStore(t *testing.T) { tableID: 101, ddlEvents: func() []*PersistedDDLEvent { return []*PersistedDDLEvent{ - buildCreatePartitionTableEventForTest(10, 100, "test", "partition_table", []int64{101, 102, 103}, 1010), // create partition table 100 with partitions 101, 102, 103 - buildExchangePartitionTableEventForTest(10, 200, 10, 100, "test", "normal_table", "test", "partition_table", []int64{101, 102, 103}, []int64{200, 102, 103}, 1020), // rename table 101 + buildCreatePartitionTableEventForTest(10, 100, "test", "partition_table", []int64{101, 102, 103}, 1010), // create partition table 100 with partitions 101, 102, 103 + buildExchangePartitionTableEventForTest(12, 200, 10, 100, "test2", "normal_table", "test", "partition_table", []int64{101, 102, 103}, []int64{200, 102, 103}, 1020), // rename table 101 } }(), queryCases: []QueryTableInfoTestCase{ { - snapTs: 1010, - name: "partition_table", + snapTs: 1010, + schemaName: "test", + tableName: "partition_table", }, { - snapTs: 1020, - name: "normal_table", + snapTs: 1020, + schemaName: "test2", + tableName: "normal_table", }, }, }, @@ -91,18 +97,20 @@ func TestBuildVersionedTableInfoStore(t *testing.T) { tableID: 200, ddlEvents: func() []*PersistedDDLEvent { return 
[]*PersistedDDLEvent{ - buildCreateTableEventForTest(10, 200, "test", "normal_table", 1010), // create table 200 - buildExchangePartitionTableEventForTest(10, 200, 10, 100, "test", "normal_table", "test", "partition_table", []int64{101, 102, 103}, []int64{200, 102, 103}, 1020), // rename table 101 + buildCreateTableEventForTest(10, 200, "test", "normal_table", 1010), // create table 200 + buildExchangePartitionTableEventForTest(10, 200, 12, 100, "test", "normal_table", "test2", "partition_table", []int64{101, 102, 103}, []int64{200, 102, 103}, 1020), // rename table 101 } }(), queryCases: []QueryTableInfoTestCase{ { - snapTs: 1010, - name: "normal_table", + snapTs: 1010, + schemaName: "test", + tableName: "normal_table", }, { - snapTs: 1020, - name: "partition_table", + snapTs: 1020, + schemaName: "test2", + tableName: "partition_table", }, }, }, @@ -116,7 +124,8 @@ func TestBuildVersionedTableInfoStore(t *testing.T) { for _, c := range tt.queryCases { tableInfo, err := store.getTableInfo(c.snapTs) require.Nil(t, err) - require.Equal(t, c.name, tableInfo.TableName.Table) + require.Equal(t, c.schemaName, tableInfo.TableName.Schema) + require.Equal(t, c.tableName, tableInfo.TableName.Table) if !tableInfo.TableName.IsPartition { require.Equal(t, tt.tableID, tableInfo.TableName.TableID) } diff --git a/logservice/schemastore/multi_version_test_utils.go b/logservice/schemastore/multi_version_test_utils.go index 246722c2c..780f69cfe 100644 --- a/logservice/schemastore/multi_version_test_utils.go +++ b/logservice/schemastore/multi_version_test_utils.go @@ -21,11 +21,11 @@ import ( func buildCreateTableEventForTest(schemaID, tableID int64, schemaName, tableName string, finishedTs uint64) *PersistedDDLEvent { return &PersistedDDLEvent{ - Type: byte(model.ActionCreateTable), - CurrentSchemaID: schemaID, - CurrentTableID: tableID, - CurrentSchemaName: schemaName, - CurrentTableName: tableName, + Type: byte(model.ActionCreateTable), + SchemaID: schemaID, + TableID: tableID, + SchemaName: schemaName, + TableName: tableName, TableInfo: &model.TableInfo{ ID: tableID, Name: pmodel.NewCIStr(tableName), @@ -42,11 +42,11 @@ func buildCreatePartitionTableEventForTest(schemaID, tableID int64, schemaName, }) } return &PersistedDDLEvent{ - Type: byte(model.ActionCreateTable), - CurrentSchemaID: schemaID, - CurrentTableID: tableID, - CurrentSchemaName: schemaName, - CurrentTableName: tableName, + Type: byte(model.ActionCreateTable), + SchemaID: schemaID, + TableID: tableID, + SchemaName: schemaName, + TableName: tableName, TableInfo: &model.TableInfo{ ID: tableID, Name: pmodel.NewCIStr(tableName), @@ -61,12 +61,12 @@ func buildCreatePartitionTableEventForTest(schemaID, tableID int64, schemaName, func buildTruncateTableEventForTest(schemaID, oldTableID, newTableID int64, schemaName, tableName string, finishedTs uint64) *PersistedDDLEvent { return &PersistedDDLEvent{ - Type: byte(model.ActionTruncateTable), - CurrentSchemaID: schemaID, - CurrentTableID: newTableID, - CurrentSchemaName: schemaName, - CurrentTableName: tableName, - PrevTableID: oldTableID, + Type: byte(model.ActionTruncateTable), + SchemaID: schemaID, + TableID: oldTableID, + SchemaName: schemaName, + TableName: tableName, + ExtraTableID: newTableID, TableInfo: &model.TableInfo{ ID: newTableID, Name: pmodel.NewCIStr(tableName), @@ -75,16 +75,16 @@ func buildTruncateTableEventForTest(schemaID, oldTableID, newTableID int64, sche } } -func buildRenameTableEventForTest(prevSchemaID, schemaID, tableID int64, prevSchemaName, prevTableName, schemaName, 
tableName string, finishedTs uint64) *PersistedDDLEvent { +func buildRenameTableEventForTest(extraSchemaID, schemaID, tableID int64, extraSchemaName, extraTableName, schemaName, tableName string, finishedTs uint64) *PersistedDDLEvent { return &PersistedDDLEvent{ - Type: byte(model.ActionRenameTable), - CurrentSchemaID: schemaID, - CurrentTableID: tableID, - CurrentSchemaName: schemaName, - CurrentTableName: tableName, - PrevSchemaID: prevSchemaID, - PrevSchemaName: prevSchemaName, - PrevTableName: prevTableName, + Type: byte(model.ActionRenameTable), + SchemaID: schemaID, + TableID: tableID, + SchemaName: schemaName, + TableName: tableName, + ExtraSchemaID: extraSchemaID, + ExtraSchemaName: extraSchemaName, + ExtraTableName: extraTableName, TableInfo: &model.TableInfo{ ID: tableID, Name: pmodel.NewCIStr(tableName), @@ -105,15 +105,15 @@ func buildExchangePartitionTableEventForTest( }) } return &PersistedDDLEvent{ - Type: byte(model.ActionExchangeTablePartition), - CurrentSchemaID: partitionSchemaID, - CurrentTableID: partitionTableID, - CurrentSchemaName: partitionSchemaName, - CurrentTableName: partitionTableName, - PrevSchemaID: normalSchemaID, - PrevTableID: normalTableID, - PrevSchemaName: normalSchemaName, - PrevTableName: normalTableName, + Type: byte(model.ActionExchangeTablePartition), + SchemaID: normalSchemaID, + TableID: normalTableID, + SchemaName: normalSchemaName, + TableName: normalTableName, + ExtraSchemaID: partitionSchemaID, + ExtraTableID: partitionTableID, + ExtraSchemaName: partitionSchemaName, + ExtraTableName: partitionTableName, TableInfo: &model.TableInfo{ ID: partitionTableID, Name: pmodel.NewCIStr(partitionTableName), @@ -122,7 +122,7 @@ func buildExchangePartitionTableEventForTest( Enable: true, }, }, - PreTableInfo: common.WrapTableInfo(normalSchemaID, normalSchemaName, &model.TableInfo{ + ExtraTableInfo: common.WrapTableInfo(normalSchemaID, normalSchemaName, &model.TableInfo{ ID: normalTableID, Name: pmodel.NewCIStr(normalTableName), }), diff --git a/logservice/schemastore/persist_storage.go b/logservice/schemastore/persist_storage.go index 0e1ebe110..b87813d9a 100644 --- a/logservice/schemastore/persist_storage.go +++ b/logservice/schemastore/persist_storage.go @@ -662,7 +662,7 @@ func (p *persistentStorage) handleDDLJob(job *model.Job) error { // TODO: do we have a better way to do this? 
if ddlEvent.Type == byte(model.ActionExchangeTablePartition) { - ddlEvent.PreTableInfo, _ = p.forceGetTableInfo(ddlEvent.PrevTableID, ddlEvent.FinishedTs) + ddlEvent.ExtraTableInfo, _ = p.forceGetTableInfo(ddlEvent.TableID, ddlEvent.FinishedTs) } // Note: need write ddl event to disk before update ddl history, diff --git a/logservice/schemastore/persist_storage_ddl_handlers.go b/logservice/schemastore/persist_storage_ddl_handlers.go index 91b00afc4..eee6bdd94 100644 --- a/logservice/schemastore/persist_storage_ddl_handlers.go +++ b/logservice/schemastore/persist_storage_ddl_handlers.go @@ -324,7 +324,6 @@ var allDDLHandlers = map[model.ActionType]*persistStorageDDLHandler{ extractTableInfoFunc: extractTableInfoFuncForSingleTableDDL, buildDDLEventFunc: buildDDLEventForNormalDDLOnSingleTable, }, - model.ActionAlterIndexVisibility: { buildPersistedDDLEventFunc: buildPersistedDDLEventForNormalDDLOnSingleTable, updateDDLHistoryFunc: updateDDLHistoryForNormalDDLOnSingleTable, @@ -333,7 +332,6 @@ var allDDLHandlers = map[model.ActionType]*persistStorageDDLHandler{ extractTableInfoFunc: extractTableInfoFuncForSingleTableDDL, buildDDLEventFunc: buildDDLEventForNormalDDLOnSingleTable, }, - model.ActionExchangeTablePartition: { buildPersistedDDLEventFunc: buildPersistedDDLEventForExchangePartition, updateDDLHistoryFunc: updateDDLHistoryForExchangeTablePartition, @@ -350,7 +348,6 @@ var allDDLHandlers = map[model.ActionType]*persistStorageDDLHandler{ extractTableInfoFunc: extractTableInfoFuncForRenameTables, buildDDLEventFunc: buildDDLEventForRenameTables, }, - model.ActionCreateTables: { buildPersistedDDLEventFunc: buildPersistedDDLEventForCreateTables, updateDDLHistoryFunc: updateDDLHistoryForCreateTables, @@ -367,7 +364,6 @@ var allDDLHandlers = map[model.ActionType]*persistStorageDDLHandler{ extractTableInfoFunc: extractTableInfoFuncForSingleTableDDL, buildDDLEventFunc: buildDDLEventForNormalDDLOnSingleTable, }, - model.ActionReorganizePartition: { buildPersistedDDLEventFunc: buildPersistedDDLEventForNormalPartitionDDL, updateDDLHistoryFunc: updateDDLHistoryForReorganizePartition, @@ -376,7 +372,6 @@ var allDDLHandlers = map[model.ActionType]*persistStorageDDLHandler{ extractTableInfoFunc: extractTableInfoFuncForTruncateAndReorganizePartition, buildDDLEventFunc: buildDDLEventForTruncateAndReorganizePartition, }, - model.ActionAlterTTLInfo: { buildPersistedDDLEventFunc: buildPersistedDDLEventForNormalDDLOnSingleTable, updateDDLHistoryFunc: updateDDLHistoryForAlterTableTTL, @@ -465,19 +460,18 @@ func buildPersistedDDLEventCommon(args buildPersistedDDLEventFuncArgs) Persisted // Note: if a ddl involve multiple tables, job.TableID is different with job.BinlogInfo.TableInfo.ID // and usually job.BinlogInfo.TableInfo.ID will be the newly created IDs. 
-    // Here we just use job.TableID as CurrentTableID and let ddl specific logic to adjust it.
     event := PersistedDDLEvent{
-        ID:                job.ID,
-        Type:              byte(job.Type),
-        CurrentSchemaID:   job.SchemaID,
-        CurrentTableID:    job.TableID,
-        Query:             query,
-        SchemaVersion:     job.BinlogInfo.SchemaVersion,
-        DBInfo:            job.BinlogInfo.DBInfo,
-        TableInfo:         job.BinlogInfo.TableInfo,
-        FinishedTs:        job.BinlogInfo.FinishedTS,
-        BDRRole:           job.BDRRole,
-        CDCWriteSource:    job.CDCWriteSource,
+        ID:             job.ID,
+        Type:           byte(job.Type),
+        SchemaID:       job.SchemaID,
+        TableID:        job.TableID,
+        Query:          query,
+        SchemaVersion:  job.BinlogInfo.SchemaVersion,
+        DBInfo:         job.BinlogInfo.DBInfo,
+        TableInfo:      job.BinlogInfo.TableInfo,
+        FinishedTs:     job.BinlogInfo.FinishedTS,
+        BDRRole:        job.BDRRole,
+        CDCWriteSource: job.CDCWriteSource,
     }
     return event
 }
@@ -486,61 +480,61 @@ func buildPersistedDDLEventForSchemaDDL(args buildPersistedDDLEventFuncArgs) Per
     event := buildPersistedDDLEventCommon(args)
     log.Info("buildPersistedDDLEvent for create/drop schema",
         zap.Any("type", event.Type),
-        zap.Int64("schemaID", event.CurrentSchemaID),
+        zap.Int64("schemaID", event.SchemaID),
         zap.String("schemaName", event.DBInfo.Name.O))
-    event.CurrentSchemaName = event.DBInfo.Name.O
+    event.SchemaName = event.DBInfo.Name.O
     return event
 }
 
 func buildPersistedDDLEventForCreateView(args buildPersistedDDLEventFuncArgs) PersistedDDLEvent {
     event := buildPersistedDDLEventCommon(args)
-    event.CurrentSchemaName = getSchemaName(args.databaseMap, event.CurrentSchemaID)
-    event.CurrentTableName = args.job.TableName
+    event.SchemaName = getSchemaName(args.databaseMap, event.SchemaID)
+    event.TableName = args.job.TableName
     return event
 }
 
 func buildPersistedDDLEventForDropView(args buildPersistedDDLEventFuncArgs) PersistedDDLEvent {
     event := buildPersistedDDLEventCommon(args)
-    event.CurrentSchemaName = getSchemaName(args.databaseMap, event.CurrentSchemaID)
+    event.SchemaName = getSchemaName(args.databaseMap, event.SchemaID)
     // We don't store the relationship: view_id -> table_name, get table name from args.job
-    event.CurrentTableName = args.job.TableName
+    event.TableName = args.job.TableName
     // The query in job maybe "DROP VIEW test1.view1, test2.view2", we need rebuild it here.
-    event.Query = fmt.Sprintf("DROP VIEW `%s`.`%s`", event.CurrentSchemaName, event.CurrentTableName)
+    event.Query = fmt.Sprintf("DROP VIEW `%s`.`%s`", event.SchemaName, event.TableName)
     return event
 }
 
 func buildPersistedDDLEventForCreateTable(args buildPersistedDDLEventFuncArgs) PersistedDDLEvent {
     event := buildPersistedDDLEventCommon(args)
-    event.CurrentSchemaName = getSchemaName(args.databaseMap, event.CurrentSchemaID)
-    event.CurrentTableName = event.TableInfo.Name.O
+    event.SchemaName = getSchemaName(args.databaseMap, event.SchemaID)
+    event.TableName = event.TableInfo.Name.O
     return event
 }
 
 func buildPersistedDDLEventForDropTable(args buildPersistedDDLEventFuncArgs) PersistedDDLEvent {
     event := buildPersistedDDLEventCommon(args)
-    event.CurrentSchemaName = getSchemaName(args.databaseMap, event.CurrentSchemaID)
-    event.CurrentTableName = getTableName(args.tableMap, event.CurrentTableID)
+    event.SchemaName = getSchemaName(args.databaseMap, event.SchemaID)
+    event.TableName = getTableName(args.tableMap, event.TableID)
     // The query in job maybe "DROP TABLE test1.table1, test2.table2", we need rebuild it here.
-    event.Query = fmt.Sprintf("DROP TABLE `%s`.`%s`", event.CurrentSchemaName, event.CurrentTableName)
+    event.Query = fmt.Sprintf("DROP TABLE `%s`.`%s`", event.SchemaName, event.TableName)
     return event
 }
 
 func buildPersistedDDLEventForNormalDDLOnSingleTable(args buildPersistedDDLEventFuncArgs) PersistedDDLEvent {
     event := buildPersistedDDLEventCommon(args)
-    event.CurrentSchemaName = getSchemaName(args.databaseMap, event.CurrentSchemaID)
-    event.CurrentTableName = getTableName(args.tableMap, event.CurrentTableID)
+    event.SchemaName = getSchemaName(args.databaseMap, event.SchemaID)
+    event.TableName = getTableName(args.tableMap, event.TableID)
     return event
 }
 
 func buildPersistedDDLEventForTruncateTable(args buildPersistedDDLEventFuncArgs) PersistedDDLEvent {
     event := buildPersistedDDLEventCommon(args)
     // only table id change after truncate
-    event.PrevTableID = event.CurrentTableID
-    event.CurrentTableID = event.TableInfo.ID
-    event.CurrentSchemaName = getSchemaName(args.databaseMap, event.CurrentSchemaID)
-    event.CurrentTableName = getTableName(args.tableMap, event.PrevTableID)
+    event.ExtraTableID = event.TableInfo.ID
+    // schema/table name remains the same, so it is ok to get them using old table/schema id
+    event.SchemaName = getSchemaName(args.databaseMap, event.SchemaID)
+    event.TableName = getTableName(args.tableMap, event.TableID)
     if isPartitionTable(event.TableInfo) {
-        for id := range args.partitionMap[event.PrevTableID] {
+        for id := range args.partitionMap[event.TableID] {
             event.PrevPartitions = append(event.PrevPartitions, id)
         }
     }
@@ -551,37 +545,37 @@ func buildPersistedDDLEventForRenameTable(args buildPersistedDDLEventFuncArgs) P
     event := buildPersistedDDLEventCommon(args)
     // Note: schema id/schema name/table name may be changed or not
     // table id does not change, we use it to get the table's prev schema id/name and table name
-    event.PrevSchemaID = getSchemaID(args.tableMap, event.CurrentTableID)
-    // TODO: check how PrevTableName will be used later
-    event.PrevTableName = getTableName(args.tableMap, event.CurrentTableID)
-    event.PrevSchemaName = getSchemaName(args.databaseMap, event.PrevSchemaID)
-    event.CurrentSchemaName = getSchemaName(args.databaseMap, event.CurrentSchemaID)
+    event.ExtraSchemaID = getSchemaID(args.tableMap, event.TableID)
+    // TODO: check how ExtraTableName will be used later
+    event.ExtraTableName = getTableName(args.tableMap, event.TableID)
+    event.ExtraSchemaName = getSchemaName(args.databaseMap, event.ExtraSchemaID)
+    event.SchemaName = getSchemaName(args.databaseMap, event.SchemaID)
     // get the table's current table name from the ddl job
-    event.CurrentTableName = event.TableInfo.Name.O
+    event.TableName = event.TableInfo.Name.O
     if len(args.job.InvolvingSchemaInfo) > 0 {
         log.Info("buildPersistedDDLEvent for rename table",
             zap.String("query", event.Query),
-            zap.Int64("schemaID", event.CurrentSchemaID),
-            zap.String("schemaName", event.CurrentSchemaName),
-            zap.String("tableName", event.CurrentTableName),
-            zap.Int64("prevSchemaID", event.PrevSchemaID),
-            zap.String("prevSchemaName", event.PrevSchemaName),
-            zap.String("prevTableName", event.PrevTableName),
+            zap.Int64("schemaID", event.SchemaID),
+            zap.String("SchemaName", event.SchemaName),
+            zap.String("tableName", event.TableName),
+            zap.Int64("ExtraSchemaID", event.ExtraSchemaID),
+            zap.String("ExtraSchemaName", event.ExtraSchemaName),
+            zap.String("ExtraTableName", event.ExtraTableName),
            zap.Any("involvingSchemaInfo", args.job.InvolvingSchemaInfo))
         // The query in job maybe "RENAME TABLE table1 to test2.table2", we need rebuild it here.
         //
         // Note: Why use args.job.InvolvingSchemaInfo to build query?
-        // because event.PrevSchemaID may not be accurate for rename table in some case.
+        // because event.ExtraSchemaID may not be accurate for rename table in some case.
         // after pr: https://github.com/pingcap/tidb/pull/43341,
         // assume there is a table `test.t` and a ddl: `rename table t to test2.t;`, and its commit ts is `100`.
         // if you get a ddl snapshot at ts `99`, table `t` is already in `test2`.
-        // so event.PrevSchemaName will also be `test2`.
+        // so event.ExtraSchemaName will also be `test2`.
         // And because SchemaStore is the source of truth inside cdc,
-        // we can use event.PrevSchemaID(even it is wrong) to update the internal state of the cdc.
+        // we can use event.ExtraSchemaID(even it is wrong) to update the internal state of the cdc.
         // But event.Query will be emit to downstream(out of cdc), we must make it correct.
         event.Query = fmt.Sprintf("RENAME TABLE `%s`.`%s` TO `%s`.`%s`",
             args.job.InvolvingSchemaInfo[0].Database, args.job.InvolvingSchemaInfo[0].Table,
-            event.CurrentSchemaName, event.CurrentTableName)
+            event.SchemaName, event.TableName)
     }
 
     return event
@@ -589,7 +583,7 @@ func buildPersistedDDLEventForRenameTable(args buildPersistedDDLEventFuncArgs) P
 
 func buildPersistedDDLEventForNormalPartitionDDL(args buildPersistedDDLEventFuncArgs) PersistedDDLEvent {
     event := buildPersistedDDLEventForNormalDDLOnSingleTable(args)
-    for id := range args.partitionMap[event.CurrentTableID] {
+    for id := range args.partitionMap[event.TableID] {
         event.PrevPartitions = append(event.PrevPartitions, id)
     }
     return event
@@ -597,17 +591,35 @@
 
 func buildPersistedDDLEventForExchangePartition(args buildPersistedDDLEventFuncArgs) PersistedDDLEvent {
     event := buildPersistedDDLEventCommon(args)
-    event.PrevSchemaID = event.CurrentSchemaID
-    event.PrevTableID = event.CurrentTableID
-    event.PrevSchemaName = getSchemaName(args.databaseMap, event.PrevSchemaID)
-    event.PrevTableName = getTableName(args.tableMap, event.PrevTableID)
-    event.CurrentTableID = event.TableInfo.ID
-    event.CurrentSchemaID = getSchemaID(args.tableMap, event.TableInfo.ID)
-    event.CurrentTableName = getTableName(args.tableMap, event.TableInfo.ID)
-    event.CurrentSchemaName = getSchemaName(args.databaseMap, event.CurrentSchemaID)
-    for id := range args.partitionMap[event.CurrentTableID] {
+    event.TableName = getTableName(args.tableMap, event.TableID)
+    event.SchemaID = getSchemaID(args.tableMap, event.TableID)
+    event.SchemaName = getSchemaName(args.databaseMap, event.SchemaID)
+
+    event.ExtraTableID = event.TableInfo.ID
+    event.ExtraTableName = getTableName(args.tableMap, event.ExtraTableID)
+    event.ExtraSchemaID = getSchemaID(args.tableMap, event.ExtraTableID)
+    event.ExtraSchemaName = getSchemaName(args.databaseMap, event.ExtraSchemaID)
+    for id := range args.partitionMap[event.ExtraTableID] {
         event.PrevPartitions = append(event.PrevPartitions, id)
     }
+    if event.Query != "" {
+        upperQuery := strings.ToUpper(event.Query)
+        idx1 := strings.Index(upperQuery, "EXCHANGE PARTITION") + len("EXCHANGE PARTITION")
+        idx2 := strings.Index(upperQuery, "WITH TABLE")
+
+        // Note that partition name should be parsed from original query, not the upperQuery.
+        partName := strings.TrimSpace(event.Query[idx1:idx2])
+        partName = strings.Replace(partName, "`", "", -1)
+        event.Query = fmt.Sprintf("ALTER TABLE `%s`.`%s` EXCHANGE PARTITION `%s` WITH TABLE `%s`.`%s`",
+            event.ExtraSchemaName, event.ExtraTableName, partName, event.SchemaName, event.TableName)
+
+        if strings.HasSuffix(upperQuery, "WITHOUT VALIDATION") {
+            event.Query += " WITHOUT VALIDATION"
+        }
+    } else {
+        log.Warn("exchange partition query is empty, should only happen in unit tests",
+            zap.Int64("jobID", event.ID))
+    }
     return event
 }
 
@@ -629,15 +641,15 @@ func buildPersistedDDLEventForRenameTables(args buildPersistedDDLEventFuncArgs)
     var querys []string
     for i, tableInfo := range args.job.BinlogInfo.MultipleTableInfos {
         info := renameArgs.RenameTableInfos[i]
-        prevSchemaID := getSchemaID(args.tableMap, tableInfo.ID)
-        event.PrevSchemaIDs = append(event.PrevSchemaIDs, prevSchemaID)
-        event.PrevSchemaNames = append(event.PrevSchemaNames, getSchemaName(args.databaseMap, prevSchemaID))
-        prevTableName := getTableName(args.tableMap, tableInfo.ID)
-        event.PrevTableNames = append(event.PrevTableNames, prevTableName)
-        event.CurrentSchemaIDs = append(event.CurrentSchemaIDs, info.NewSchemaID)
-        currentSchemaName := getSchemaName(args.databaseMap, info.NewSchemaID)
-        event.CurrentSchemaNames = append(event.CurrentSchemaNames, currentSchemaName)
-        querys = append(querys, fmt.Sprintf("RENAME TABLE `%s`.`%s` TO `%s`.`%s`;", info.OldSchemaName.O, prevTableName, currentSchemaName, tableInfo.Name.L))
+        extraSchemaID := getSchemaID(args.tableMap, tableInfo.ID)
+        event.ExtraSchemaIDs = append(event.ExtraSchemaIDs, extraSchemaID)
+        event.ExtraSchemaNames = append(event.ExtraSchemaNames, getSchemaName(args.databaseMap, extraSchemaID))
+        extraTableName := getTableName(args.tableMap, tableInfo.ID)
+        event.ExtraTableNames = append(event.ExtraTableNames, extraTableName)
+        event.SchemaIDs = append(event.SchemaIDs, info.NewSchemaID)
+        SchemaName := getSchemaName(args.databaseMap, info.NewSchemaID)
+        event.SchemaNames = append(event.SchemaNames, SchemaName)
+        querys = append(querys, fmt.Sprintf("RENAME TABLE `%s`.`%s` TO `%s`.`%s`;", info.OldSchemaName.O, extraTableName, SchemaName, tableInfo.Name.L))
     }
 
     event.Query = strings.Join(querys, "")
@@ -647,24 +659,24 @@
 
 func buildPersistedDDLEventForCreateTables(args buildPersistedDDLEventFuncArgs) PersistedDDLEvent {
     event := buildPersistedDDLEventCommon(args)
-    event.CurrentSchemaName = getSchemaName(args.databaseMap, event.CurrentSchemaID)
+    event.SchemaName = getSchemaName(args.databaseMap, event.SchemaID)
     event.MultipleTableInfos = args.job.BinlogInfo.MultipleTableInfos
     return event
 }
 
 func buildPersistedDDLEventForAlterTablePartitioning(args buildPersistedDDLEventFuncArgs) PersistedDDLEvent {
     event := buildPersistedDDLEventCommon(args)
-    event.PrevTableID = event.CurrentTableID
-    event.CurrentTableID = event.TableInfo.ID
-    event.CurrentSchemaName = getSchemaName(args.databaseMap, event.CurrentSchemaID)
-    event.CurrentTableName = getTableName(args.tableMap, event.PrevTableID)
-    if event.CurrentTableName != event.TableInfo.Name.O {
+    event.ExtraTableID = event.TableID
+    event.TableID = event.TableInfo.ID
+    event.SchemaName = getSchemaName(args.databaseMap, event.SchemaID)
+    event.TableName = getTableName(args.tableMap, event.ExtraTableID)
+    if event.TableName != event.TableInfo.Name.O {
         log.Panic("table name should not change",
-            zap.String("prevTableName", event.CurrentTableName),
+            zap.String("ExtraTableName", event.TableName),
             zap.String("tableName", event.TableInfo.Name.O))
     }
     // prev table may be a normal table or a partition table
-    if partitions, ok := args.partitionMap[event.PrevTableID]; ok {
+    if partitions, ok := args.partitionMap[event.ExtraTableID]; ok {
         for id := range partitions {
             event.PrevPartitions = append(event.PrevPartitions, id)
         }
@@ -674,18 +686,18 @@
 
 func buildPersistedDDLEventForRemovePartitioning(args buildPersistedDDLEventFuncArgs) PersistedDDLEvent {
     event := buildPersistedDDLEventCommon(args)
-    event.PrevTableID = event.CurrentTableID
-    event.CurrentTableID = event.TableInfo.ID
-    event.CurrentSchemaName = getSchemaName(args.databaseMap, event.CurrentSchemaID)
-    event.CurrentTableName = getTableName(args.tableMap, event.PrevTableID)
-    if event.CurrentTableName != event.TableInfo.Name.O {
+    event.ExtraTableID = event.TableID
+    event.TableID = event.TableInfo.ID
+    event.SchemaName = getSchemaName(args.databaseMap, event.SchemaID)
+    event.TableName = getTableName(args.tableMap, event.ExtraTableID)
+    if event.TableName != event.TableInfo.Name.O {
        log.Panic("table name should not change",
-            zap.String("prevTableName", event.CurrentTableName),
+            zap.String("ExtraTableName", event.TableName),
            zap.String("tableName", event.TableInfo.Name.O))
    }
-    partitions, ok := args.partitionMap[event.PrevTableID]
+    partitions, ok := args.partitionMap[event.ExtraTableID]
     if !ok {
-        log.Panic("table is not a partition table", zap.Int64("tableID", event.PrevTableID))
+        log.Panic("table is not a partition table", zap.Int64("tableID", event.ExtraTableID))
     }
     for id := range partitions {
         event.PrevPartitions = append(event.PrevPartitions, id)
@@ -721,7 +733,7 @@ func updateDDLHistoryForTableTriggerOnlyDDL(args updateDDLHistoryFuncArgs) []uin
 
 func updateDDLHistoryForSchemaDDL(args updateDDLHistoryFuncArgs) []uint64 {
     args.appendTableTriggerDDLHistory(args.ddlEvent.FinishedTs)
-    for tableID := range args.databaseMap[args.ddlEvent.CurrentSchemaID].Tables {
+    for tableID := range args.databaseMap[args.ddlEvent.SchemaID].Tables {
         if partitionInfo, ok := args.partitionMap[tableID]; ok {
             for id := range partitionInfo {
                 args.appendTablesDDLHistory(args.ddlEvent.FinishedTs, id)
@@ -743,7 +755,7 @@ func updateDDLHistoryForAddDropTable(args updateDDLHistoryFuncArgs) []uint64 {
             args.appendTablesDDLHistory(args.ddlEvent.FinishedTs, partitionID)
         }
     } else {
-        args.appendTablesDDLHistory(args.ddlEvent.FinishedTs, args.ddlEvent.CurrentTableID)
+        args.appendTablesDDLHistory(args.ddlEvent.FinishedTs, args.ddlEvent.TableID)
     }
     return args.tableTriggerDDLHistory
 }
@@ -754,7 +766,7 @@ func updateDDLHistoryForNormalDDLOnSingleTable(args updateDDLHistoryFuncArgs) []
            args.appendTablesDDLHistory(args.ddlEvent.FinishedTs, partitionID)
        }
    } else {
-        args.appendTablesDDLHistory(args.ddlEvent.FinishedTs, args.ddlEvent.CurrentTableID)
+        args.appendTablesDDLHistory(args.ddlEvent.FinishedTs, args.ddlEvent.TableID)
    }
    return args.tableTriggerDDLHistory
 }
@@ -765,7 +777,7 @@ func updateDDLHistoryForTruncateTable(args updateDDLHistoryFuncArgs) []uint64 {
        args.appendTablesDDLHistory(args.ddlEvent.FinishedTs, getAllPartitionIDs(args.ddlEvent.TableInfo)...)
        args.appendTablesDDLHistory(args.ddlEvent.FinishedTs, args.ddlEvent.PrevPartitions...)
} else { - args.appendTablesDDLHistory(args.ddlEvent.FinishedTs, args.ddlEvent.CurrentTableID, args.ddlEvent.PrevTableID) + args.appendTablesDDLHistory(args.ddlEvent.FinishedTs, args.ddlEvent.TableID, args.ddlEvent.ExtraTableID) } return args.tableTriggerDDLHistory } @@ -804,7 +816,7 @@ func updateDDLHistoryForExchangeTablePartition(args updateDDLHistoryFuncArgs) [] if len(droppedIDs) != 1 { log.Panic("exchange table partition should only drop one partition", zap.Int64s("droppedIDs", droppedIDs)) } - args.appendTablesDDLHistory(args.ddlEvent.FinishedTs, droppedIDs[0], args.ddlEvent.PrevTableID) + args.appendTablesDDLHistory(args.ddlEvent.FinishedTs, droppedIDs[0], args.ddlEvent.TableID) return args.tableTriggerDDLHistory } @@ -848,7 +860,7 @@ func updateDDLHistoryForAlterTablePartitioning(args updateDDLHistoryFuncArgs) [] if len(args.ddlEvent.PrevPartitions) > 0 { args.appendTablesDDLHistory(args.ddlEvent.FinishedTs, args.ddlEvent.PrevPartitions...) } else { - args.appendTablesDDLHistory(args.ddlEvent.FinishedTs, args.ddlEvent.PrevTableID) + args.appendTablesDDLHistory(args.ddlEvent.FinishedTs, args.ddlEvent.ExtraTableID) } args.appendTablesDDLHistory(args.ddlEvent.FinishedTs, getAllPartitionIDs(args.ddlEvent.TableInfo)...) return args.tableTriggerDDLHistory @@ -857,7 +869,7 @@ func updateDDLHistoryForAlterTablePartitioning(args updateDDLHistoryFuncArgs) [] func updateDDLHistoryForRemovePartitioning(args updateDDLHistoryFuncArgs) []uint64 { args.appendTableTriggerDDLHistory(args.ddlEvent.FinishedTs) args.appendTablesDDLHistory(args.ddlEvent.FinishedTs, args.ddlEvent.PrevPartitions...) - args.appendTablesDDLHistory(args.ddlEvent.FinishedTs, args.ddlEvent.CurrentTableID) + args.appendTablesDDLHistory(args.ddlEvent.FinishedTs, args.ddlEvent.TableID) return args.tableTriggerDDLHistory } @@ -867,7 +879,7 @@ func updateDDLHistoryForAlterTableTTL(args updateDDLHistoryFuncArgs) []uint64 { args.appendTablesDDLHistory(args.ddlEvent.FinishedTs, partitionID) } } else { - args.appendTablesDDLHistory(args.ddlEvent.FinishedTs, args.ddlEvent.CurrentTableID) + args.appendTablesDDLHistory(args.ddlEvent.FinishedTs, args.ddlEvent.TableID) } return args.tableTriggerDDLHistory } @@ -876,14 +888,14 @@ func updateDDLHistoryForAlterTableTTL(args updateDDLHistoryFuncArgs) []uint64 { // updateDatabaseInfoAndTableInfoFunc begin // ======= func updateSchemaMetadataForCreateSchema(args updateSchemaMetadataFuncArgs) { - args.databaseMap[args.event.CurrentSchemaID] = &BasicDatabaseInfo{ - Name: args.event.CurrentSchemaName, + args.databaseMap[args.event.SchemaID] = &BasicDatabaseInfo{ + Name: args.event.SchemaName, Tables: make(map[int64]bool), } } func updateSchemaMetadataForDropSchema(args updateSchemaMetadataFuncArgs) { - schemaID := args.event.CurrentSchemaID + schemaID := args.event.SchemaID for tableID := range args.databaseMap[schemaID].Tables { delete(args.tableMap, tableID) delete(args.partitionMap, tableID) @@ -892,8 +904,8 @@ func updateSchemaMetadataForDropSchema(args updateSchemaMetadataFuncArgs) { } func updateSchemaMetadataForNewTableDDL(args updateSchemaMetadataFuncArgs) { - tableID := args.event.CurrentTableID - schemaID := args.event.CurrentSchemaID + tableID := args.event.TableID + schemaID := args.event.SchemaID args.addTableToDB(tableID, schemaID) args.tableMap[tableID] = &BasicTableInfo{ SchemaID: schemaID, @@ -909,8 +921,8 @@ func updateSchemaMetadataForNewTableDDL(args updateSchemaMetadataFuncArgs) { } func updateSchemaMetadataForDropTable(args updateSchemaMetadataFuncArgs) { - tableID := 
args.event.CurrentTableID - schemaID := args.event.CurrentSchemaID + tableID := args.event.TableID + schemaID := args.event.SchemaID args.removeTableFromDB(tableID, schemaID) delete(args.tableMap, tableID) if isPartitionTable(args.event.TableInfo) { @@ -921,9 +933,9 @@ func updateSchemaMetadataForDropTable(args updateSchemaMetadataFuncArgs) { func updateSchemaMetadataIgnore(args updateSchemaMetadataFuncArgs) {} func updateSchemaMetadataForTruncateTable(args updateSchemaMetadataFuncArgs) { - oldTableID := args.event.PrevTableID - newTableID := args.event.CurrentTableID - schemaID := args.event.CurrentSchemaID + oldTableID := args.event.TableID + newTableID := args.event.ExtraTableID + schemaID := args.event.SchemaID args.removeTableFromDB(oldTableID, schemaID) delete(args.tableMap, oldTableID) args.addTableToDB(newTableID, schemaID) @@ -942,31 +954,31 @@ func updateSchemaMetadataForTruncateTable(args updateSchemaMetadataFuncArgs) { } func updateSchemaMetadataForRenameTable(args updateSchemaMetadataFuncArgs) { - tableID := args.event.CurrentTableID - if args.event.PrevSchemaID != args.event.CurrentSchemaID { - args.tableMap[tableID].SchemaID = args.event.CurrentSchemaID - args.removeTableFromDB(tableID, args.event.PrevSchemaID) - args.addTableToDB(tableID, args.event.CurrentSchemaID) + tableID := args.event.TableID + if args.event.ExtraSchemaID != args.event.SchemaID { + args.tableMap[tableID].SchemaID = args.event.SchemaID + args.removeTableFromDB(tableID, args.event.ExtraSchemaID) + args.addTableToDB(tableID, args.event.SchemaID) } - args.tableMap[tableID].Name = args.event.CurrentTableName + args.tableMap[tableID].Name = args.event.TableName } func updateSchemaMetadataForAddPartition(args updateSchemaMetadataFuncArgs) { newCreatedIDs := getCreatedIDs(args.event.PrevPartitions, getAllPartitionIDs(args.event.TableInfo)) for _, id := range newCreatedIDs { - args.partitionMap[args.event.CurrentTableID][id] = nil + args.partitionMap[args.event.TableID][id] = nil } } func updateSchemaMetadataForDropPartition(args updateSchemaMetadataFuncArgs) { droppedIDs := getDroppedIDs(args.event.PrevPartitions, getAllPartitionIDs(args.event.TableInfo)) for _, id := range droppedIDs { - delete(args.partitionMap[args.event.CurrentTableID], id) + delete(args.partitionMap[args.event.TableID], id) } } func updateSchemaMetadataForTruncateTablePartition(args updateSchemaMetadataFuncArgs) { - tableID := args.event.CurrentTableID + tableID := args.event.TableID physicalIDs := getAllPartitionIDs(args.event.TableInfo) droppedIDs := getDroppedIDs(args.event.PrevPartitions, physicalIDs) for _, id := range droppedIDs { @@ -985,19 +997,19 @@ func updateSchemaMetadataForExchangeTablePartition(args updateSchemaMetadataFunc log.Panic("exchange table partition should only drop one partition", zap.Int64s("droppedIDs", droppedIDs)) } - normalTableID := args.event.PrevTableID - normalSchemaID := args.event.PrevSchemaID - partitionID := droppedIDs[0] + normalTableID := args.event.TableID + normalSchemaID := args.event.SchemaID normalTableName := getTableName(args.tableMap, normalTableID) + partitionTableID := args.event.ExtraTableID + targetPartitionID := droppedIDs[0] args.removeTableFromDB(normalTableID, normalSchemaID) delete(args.tableMap, normalTableID) - args.addTableToDB(partitionID, normalSchemaID) - args.tableMap[partitionID] = &BasicTableInfo{ + args.addTableToDB(targetPartitionID, normalSchemaID) + args.tableMap[targetPartitionID] = &BasicTableInfo{ SchemaID: normalSchemaID, Name: normalTableName, } - partitionTableID 
:= args.event.CurrentTableID - delete(args.partitionMap[partitionTableID], partitionID) + delete(args.partitionMap[partitionTableID], targetPartitionID) args.partitionMap[partitionTableID][normalTableID] = nil } @@ -1006,10 +1018,10 @@ func updateSchemaMetadataForRenameTables(args updateSchemaMetadataFuncArgs) { log.Panic("multiple table infos should not be nil") } for i, info := range args.event.MultipleTableInfos { - if args.event.PrevSchemaIDs[i] != args.event.CurrentSchemaIDs[i] { - args.tableMap[info.ID].SchemaID = args.event.CurrentSchemaIDs[i] - args.removeTableFromDB(info.ID, args.event.PrevSchemaIDs[i]) - args.addTableToDB(info.ID, args.event.CurrentSchemaIDs[i]) + if args.event.ExtraSchemaIDs[i] != args.event.SchemaIDs[i] { + args.tableMap[info.ID].SchemaID = args.event.SchemaIDs[i] + args.removeTableFromDB(info.ID, args.event.ExtraSchemaIDs[i]) + args.addTableToDB(info.ID, args.event.SchemaIDs[i]) } args.tableMap[info.ID].Name = info.Name.O } @@ -1020,9 +1032,9 @@ func updateSchemaMetadataForCreateTables(args updateSchemaMetadataFuncArgs) { log.Panic("multiple table infos should not be nil") } for _, info := range args.event.MultipleTableInfos { - args.addTableToDB(info.ID, args.event.CurrentSchemaID) + args.addTableToDB(info.ID, args.event.SchemaID) args.tableMap[info.ID] = &BasicTableInfo{ - SchemaID: args.event.CurrentSchemaID, + SchemaID: args.event.SchemaID, Name: info.Name.O, } if isPartitionTable(info) { @@ -1036,7 +1048,7 @@ func updateSchemaMetadataForCreateTables(args updateSchemaMetadataFuncArgs) { } func updateSchemaMetadataForReorganizePartition(args updateSchemaMetadataFuncArgs) { - tableID := args.event.CurrentTableID + tableID := args.event.TableID physicalIDs := getAllPartitionIDs(args.event.TableInfo) droppedIDs := getDroppedIDs(args.event.PrevPartitions, physicalIDs) for _, id := range droppedIDs { @@ -1050,17 +1062,17 @@ func updateSchemaMetadataForReorganizePartition(args updateSchemaMetadataFuncArg func updateSchemaMetadataForAlterTablePartitioning(args updateSchemaMetadataFuncArgs) { // drop old table and its partitions(if old table is a partition table) - oldTableID := args.event.PrevTableID - schemaID := args.event.CurrentSchemaID + oldTableID := args.event.ExtraTableID + schemaID := args.event.SchemaID args.removeTableFromDB(oldTableID, schemaID) delete(args.tableMap, oldTableID) delete(args.partitionMap, oldTableID) // add new normal table - newTableID := args.event.CurrentTableID + newTableID := args.event.TableID args.addTableToDB(newTableID, schemaID) args.tableMap[newTableID] = &BasicTableInfo{ - SchemaID: args.event.CurrentSchemaID, - Name: args.event.CurrentTableName, + SchemaID: args.event.SchemaID, + Name: args.event.TableName, } args.partitionMap[newTableID] = make(BasicPartitionInfo) for _, id := range getAllPartitionIDs(args.event.TableInfo) { @@ -1070,17 +1082,17 @@ func updateSchemaMetadataForAlterTablePartitioning(args updateSchemaMetadataFunc func updateSchemaMetadataForRemovePartitioning(args updateSchemaMetadataFuncArgs) { // drop old partition table and its partitions - oldTableID := args.event.PrevTableID - schemaID := args.event.CurrentSchemaID + oldTableID := args.event.ExtraTableID + schemaID := args.event.SchemaID args.removeTableFromDB(oldTableID, schemaID) delete(args.tableMap, oldTableID) delete(args.partitionMap, oldTableID) // add new normal table - newTableID := args.event.CurrentTableID + newTableID := args.event.TableID args.addTableToDB(newTableID, schemaID) args.tableMap[newTableID] = &BasicTableInfo{ - SchemaID: 
args.event.CurrentSchemaID, - Name: args.event.CurrentTableName, + SchemaID: args.event.SchemaID, + Name: args.event.TableName, } } @@ -1094,7 +1106,7 @@ func iterateEventTablesForSingleTableDDL(event *PersistedDDLEvent, apply func(ta if isPartitionTable(event.TableInfo) { apply(getAllPartitionIDs(event.TableInfo)...) } else { - apply(event.CurrentTableID) + apply(event.TableID) } } @@ -1103,7 +1115,7 @@ func iterateEventTablesForTruncateTable(event *PersistedDDLEvent, apply func(tab apply(event.PrevPartitions...) apply(getAllPartitionIDs(event.TableInfo)...) } else { - apply(event.PrevTableID, event.CurrentTableID) + apply(event.ExtraTableID, event.TableID) } } @@ -1133,7 +1145,7 @@ func iterateEventTablesForExchangeTablePartition(event *PersistedDDLEvent, apply zap.Int64s("droppedIDs", droppedIDs)) } targetPartitionID := droppedIDs[0] - apply(targetPartitionID, event.PrevTableID) + apply(targetPartitionID, event.TableID) } func iterateEventTablesForRenameTables(event *PersistedDDLEvent, apply func(tableId ...int64)) { @@ -1168,14 +1180,14 @@ func iterateEventTablesForAlterTablePartitioning(event *PersistedDDLEvent, apply if len(event.PrevPartitions) > 0 { apply(event.PrevPartitions...) } else { - apply(event.PrevTableID) + apply(event.ExtraTableID) } apply(getAllPartitionIDs(event.TableInfo)...) } func iterateEventTablesForRemovePartitioning(event *PersistedDDLEvent, apply func(tableId ...int64)) { apply(event.PrevPartitions...) - apply(event.CurrentTableID) + apply(event.TableID) } // ======= @@ -1186,12 +1198,12 @@ func extractTableInfoFuncForSingleTableDDL(event *PersistedDDLEvent, tableID int if isPartitionTable(event.TableInfo) { for _, partitionID := range getAllPartitionIDs(event.TableInfo) { if tableID == partitionID { - return common.WrapTableInfo(event.CurrentSchemaID, event.CurrentSchemaName, event.TableInfo), false + return common.WrapTableInfo(event.SchemaID, event.SchemaName, event.TableInfo), false } } } else { - if tableID == event.CurrentTableID { - return common.WrapTableInfo(event.CurrentSchemaID, event.CurrentSchemaName, event.TableInfo), false + if tableID == event.TableID { + return common.WrapTableInfo(event.SchemaID, event.SchemaName, event.TableInfo), false } } log.Panic("should not reach here", zap.Any("event", event), zap.Int64("tableID", tableID)) @@ -1199,9 +1211,9 @@ func extractTableInfoFuncForSingleTableDDL(event *PersistedDDLEvent, tableID int } func extractTableInfoFuncForExchangeTablePartition(event *PersistedDDLEvent, tableID int64) (*common.TableInfo, bool) { - if tableID == event.PrevTableID { + if tableID == event.TableID { // old normal table id, return the table info of the partition table - return common.WrapTableInfo(event.CurrentSchemaID, event.CurrentSchemaName, event.TableInfo), false + return common.WrapTableInfo(event.ExtraSchemaID, event.ExtraSchemaName, event.TableInfo), false } else { physicalIDs := getAllPartitionIDs(event.TableInfo) droppedIDs := getDroppedIDs(event.PrevPartitions, physicalIDs) @@ -1212,15 +1224,15 @@ func extractTableInfoFuncForExchangeTablePartition(event *PersistedDDLEvent, tab if tableID != droppedIDs[0] { log.Panic("should not reach here", zap.Int64("tableID", tableID), zap.Int64("expectedPartitionID", droppedIDs[0])) } - if event.PreTableInfo == nil { - log.Panic("cannot find pre table info", zap.Int64("tableID", tableID)) + if event.ExtraTableInfo == nil { + log.Panic("cannot find extra table info", zap.Int64("tableID", tableID)) } // old partition id, return the table info of the normal table - columnSchema := 
event.PreTableInfo.ShadowCopyColumnSchema() + columnSchema := event.ExtraTableInfo.ShadowCopyColumnSchema() tableInfo := common.NewTableInfo( - event.PrevSchemaID, - event.PrevSchemaName, - pmodel.NewCIStr(event.PrevTableName).O, + event.SchemaID, + event.SchemaName, + pmodel.NewCIStr(event.TableName).O, tableID, false, columnSchema) @@ -1233,7 +1245,7 @@ func extractTableInfoFuncIgnore(event *PersistedDDLEvent, tableID int64) (*commo } func extractTableInfoFuncForDropTable(event *PersistedDDLEvent, tableID int64) (*common.TableInfo, bool) { - if event.CurrentTableID == tableID { + if event.TableID == tableID { return nil, true } log.Panic("should not reach here", zap.Int64("tableID", tableID)) @@ -1244,14 +1256,14 @@ func extractTableInfoFuncForTruncateTable(event *PersistedDDLEvent, tableID int6 if isPartitionTable(event.TableInfo) { for _, partitionID := range getAllPartitionIDs(event.TableInfo) { if tableID == partitionID { - return common.WrapTableInfo(event.CurrentSchemaID, event.CurrentSchemaName, event.TableInfo), false + return common.WrapTableInfo(event.SchemaID, event.SchemaName, event.TableInfo), false } } return nil, true } else { - if tableID == event.CurrentTableID { - return common.WrapTableInfo(event.CurrentSchemaID, event.CurrentSchemaName, event.TableInfo), false - } else if tableID == event.PrevTableID { + if tableID == event.ExtraTableID { + return common.WrapTableInfo(event.SchemaID, event.SchemaName, event.TableInfo), false + } else if tableID == event.TableID { return nil, true } } @@ -1263,7 +1275,7 @@ func extractTableInfoFuncForAddPartition(event *PersistedDDLEvent, tableID int64 newCreatedIDs := getCreatedIDs(event.PrevPartitions, getAllPartitionIDs(event.TableInfo)) for _, partition := range newCreatedIDs { if tableID == partition { - return common.WrapTableInfo(event.CurrentSchemaID, event.CurrentSchemaName, event.TableInfo), false + return common.WrapTableInfo(event.SchemaID, event.SchemaName, event.TableInfo), false } } return nil, false @@ -1290,7 +1302,7 @@ func extractTableInfoFuncForTruncateAndReorganizePartition(event *PersistedDDLEv newCreatedIDs := getCreatedIDs(event.PrevPartitions, physicalIDs) for _, partition := range newCreatedIDs { if tableID == partition { - return common.WrapTableInfo(event.CurrentSchemaID, event.CurrentSchemaName, event.TableInfo), false + return common.WrapTableInfo(event.SchemaID, event.SchemaName, event.TableInfo), false } } return nil, false @@ -1301,12 +1313,12 @@ func extractTableInfoFuncForRenameTables(event *PersistedDDLEvent, tableID int64 if isPartitionTable(tableInfo) { for _, partitionID := range getAllPartitionIDs(tableInfo) { if tableID == partitionID { - return common.WrapTableInfo(event.CurrentSchemaIDs[i], event.CurrentSchemaNames[i], tableInfo), false + return common.WrapTableInfo(event.SchemaIDs[i], event.SchemaNames[i], tableInfo), false } } } else { if tableID == tableInfo.ID { - return common.WrapTableInfo(event.CurrentSchemaIDs[i], event.CurrentSchemaNames[i], tableInfo), false + return common.WrapTableInfo(event.SchemaIDs[i], event.SchemaNames[i], tableInfo), false } } } @@ -1319,12 +1331,12 @@ func extractTableInfoFuncForCreateTables(event *PersistedDDLEvent, tableID int64 if isPartitionTable(tableInfo) { for _, partitionID := range getAllPartitionIDs(tableInfo) { if tableID == partitionID { - return common.WrapTableInfo(event.CurrentSchemaID, event.CurrentSchemaName, tableInfo), false + return common.WrapTableInfo(event.SchemaID, event.SchemaName, tableInfo), false } } } else { if tableID == 
tableInfo.ID { - return common.WrapTableInfo(event.CurrentSchemaID, event.CurrentSchemaName, tableInfo), false + return common.WrapTableInfo(event.SchemaID, event.SchemaName, tableInfo), false } } } @@ -1340,13 +1352,13 @@ func extractTableInfoFuncForAlterTablePartitioning(event *PersistedDDLEvent, tab } } } else { - if tableID == event.PrevTableID { + if tableID == event.ExtraTableID { return nil, true } } for _, partitionID := range getAllPartitionIDs(event.TableInfo) { if tableID == partitionID { - return common.WrapTableInfo(event.CurrentSchemaID, event.CurrentSchemaName, event.TableInfo), false + return common.WrapTableInfo(event.SchemaID, event.SchemaName, event.TableInfo), false } } log.Panic("should not reach here", zap.Int64("tableID", tableID)) @@ -1354,8 +1366,8 @@ func extractTableInfoFuncForAlterTablePartitioning(event *PersistedDDLEvent, tab } func extractTableInfoFuncForRemovePartitioning(event *PersistedDDLEvent, tableID int64) (*common.TableInfo, bool) { - if event.CurrentTableID == tableID { - return common.WrapTableInfo(event.CurrentSchemaID, event.CurrentSchemaName, event.TableInfo), false + if event.TableID == tableID { + return common.WrapTableInfo(event.SchemaID, event.SchemaName, event.TableInfo), false } else { for _, partitionID := range event.PrevPartitions { if tableID == partitionID { @@ -1376,27 +1388,27 @@ func buildDDLEventCommon(rawEvent *PersistedDDLEvent, tableFilter filter.Filter, // Note: not all ddl types will respect the `filtered` result, example: create tables, rename tables filtered := false // TODO: ShouldDiscardDDL is used for old architecture, should be removed later - if tableFilter != nil && rawEvent.CurrentSchemaName != "" && rawEvent.CurrentTableName != "" { - filtered = tableFilter.ShouldDiscardDDL(model.ActionType(rawEvent.Type), rawEvent.CurrentSchemaName, rawEvent.CurrentTableName, rawEvent.TableInfo) + if tableFilter != nil && rawEvent.SchemaName != "" && rawEvent.TableName != "" { + filtered = tableFilter.ShouldDiscardDDL(model.ActionType(rawEvent.Type), rawEvent.SchemaName, rawEvent.TableName, rawEvent.TableInfo) // if the ddl invovles another table name, only set filtered to true when all of them should be filtered - if rawEvent.PrevSchemaName != "" && rawEvent.PrevTableName != "" { - filtered = filtered && tableFilter.ShouldDiscardDDL(model.ActionType(rawEvent.Type), rawEvent.PrevSchemaName, rawEvent.PrevTableName, rawEvent.TableInfo) + if rawEvent.ExtraSchemaName != "" && rawEvent.ExtraTableName != "" { + filtered = filtered && tableFilter.ShouldDiscardDDL(model.ActionType(rawEvent.Type), rawEvent.ExtraSchemaName, rawEvent.ExtraTableName, rawEvent.TableInfo) } } if rawEvent.TableInfo != nil { wrapTableInfo = common.WrapTableInfo( - rawEvent.CurrentSchemaID, - rawEvent.CurrentSchemaName, + rawEvent.SchemaID, + rawEvent.SchemaName, rawEvent.TableInfo) } return commonEvent.DDLEvent{ Type: rawEvent.Type, // TODO: whether the following four fields are needed - SchemaID: rawEvent.CurrentSchemaID, - TableID: rawEvent.CurrentTableID, - SchemaName: rawEvent.CurrentSchemaName, - TableName: rawEvent.CurrentTableName, + SchemaID: rawEvent.SchemaID, + TableID: rawEvent.TableID, + SchemaName: rawEvent.SchemaName, + TableName: rawEvent.TableName, Query: rawEvent.Query, TableInfo: wrapTableInfo, @@ -1424,14 +1436,14 @@ func buildDDLEventForDropSchema(rawEvent *PersistedDDLEvent, tableFilter filter. 
} ddlEvent.BlockedTables = &commonEvent.InfluencedTables{ InfluenceType: commonEvent.InfluenceTypeDB, - SchemaID: rawEvent.CurrentSchemaID, + SchemaID: rawEvent.SchemaID, } ddlEvent.NeedDroppedTables = &commonEvent.InfluencedTables{ InfluenceType: commonEvent.InfluenceTypeDB, - SchemaID: rawEvent.CurrentSchemaID, + SchemaID: rawEvent.SchemaID, } ddlEvent.TableNameChange = &commonEvent.TableNameChange{ - DropDatabaseName: rawEvent.CurrentSchemaName, + DropDatabaseName: rawEvent.SchemaName, } return ddlEvent, true } @@ -1445,7 +1457,7 @@ func buildDDLEventForModifySchemaCharsetAndCollate( } ddlEvent.BlockedTables = &commonEvent.InfluencedTables{ InfluenceType: commonEvent.InfluenceTypeDB, - SchemaID: rawEvent.CurrentSchemaID, + SchemaID: rawEvent.SchemaID, } return ddlEvent, true } @@ -1464,23 +1476,23 @@ func buildDDLEventForNewTableDDL(rawEvent *PersistedDDLEvent, tableFilter filter ddlEvent.NeedAddedTables = make([]commonEvent.Table, 0, len(physicalIDs)) for _, id := range physicalIDs { ddlEvent.NeedAddedTables = append(ddlEvent.NeedAddedTables, commonEvent.Table{ - SchemaID: rawEvent.CurrentSchemaID, + SchemaID: rawEvent.SchemaID, TableID: id, }) } } else { ddlEvent.NeedAddedTables = []commonEvent.Table{ { - SchemaID: rawEvent.CurrentSchemaID, - TableID: rawEvent.CurrentTableID, + SchemaID: rawEvent.SchemaID, + TableID: rawEvent.TableID, }, } } ddlEvent.TableNameChange = &commonEvent.TableNameChange{ AddName: []commonEvent.SchemaTableName{ { - SchemaName: rawEvent.CurrentSchemaName, - TableName: rawEvent.CurrentTableName, + SchemaName: rawEvent.SchemaName, + TableName: rawEvent.TableName, }, }, } @@ -1508,18 +1520,18 @@ func buildDDLEventForDropTable(rawEvent *PersistedDDLEvent, tableFilter filter.F } else { ddlEvent.BlockedTables = &commonEvent.InfluencedTables{ InfluenceType: commonEvent.InfluenceTypeNormal, - TableIDs: []int64{rawEvent.CurrentTableID, heartbeatpb.DDLSpan.TableID}, + TableIDs: []int64{rawEvent.TableID, heartbeatpb.DDLSpan.TableID}, } ddlEvent.NeedDroppedTables = &commonEvent.InfluencedTables{ InfluenceType: commonEvent.InfluenceTypeNormal, - TableIDs: []int64{rawEvent.CurrentTableID}, + TableIDs: []int64{rawEvent.TableID}, } } ddlEvent.TableNameChange = &commonEvent.TableNameChange{ DropName: []commonEvent.SchemaTableName{ { - SchemaName: rawEvent.CurrentSchemaName, - TableName: rawEvent.CurrentTableName, + SchemaName: rawEvent.SchemaName, + TableName: rawEvent.TableName, }, }, } @@ -1533,7 +1545,7 @@ func buildDDLEventForNormalDDLOnSingleTable(rawEvent *PersistedDDLEvent, tableFi } ddlEvent.BlockedTables = &commonEvent.InfluencedTables{ InfluenceType: commonEvent.InfluenceTypeNormal, - TableIDs: []int64{rawEvent.CurrentTableID}, + TableIDs: []int64{rawEvent.TableID}, } return ddlEvent, true } @@ -1545,7 +1557,7 @@ func buildDDLEventForNormalDDLOnSingleTableForTiDB(rawEvent *PersistedDDLEvent, } ddlEvent.BlockedTables = &commonEvent.InfluencedTables{ InfluenceType: commonEvent.InfluenceTypeNormal, - TableIDs: []int64{rawEvent.CurrentTableID}, + TableIDs: []int64{rawEvent.TableID}, } return ddlEvent, true } @@ -1572,24 +1584,24 @@ func buildDDLEventForTruncateTable(rawEvent *PersistedDDLEvent, tableFilter filt ddlEvent.NeedAddedTables = make([]commonEvent.Table, 0, len(physicalIDs)) for _, id := range physicalIDs { ddlEvent.NeedAddedTables = append(ddlEvent.NeedAddedTables, commonEvent.Table{ - SchemaID: rawEvent.CurrentSchemaID, + SchemaID: rawEvent.SchemaID, TableID: id, }) } } else { ddlEvent.NeedDroppedTables = &commonEvent.InfluencedTables{ InfluenceType: 
commonEvent.InfluenceTypeNormal, - TableIDs: []int64{rawEvent.PrevTableID}, + TableIDs: []int64{rawEvent.TableID}, } ddlEvent.NeedAddedTables = []commonEvent.Table{ { - SchemaID: rawEvent.CurrentSchemaID, - TableID: rawEvent.CurrentTableID, + SchemaID: rawEvent.SchemaID, + TableID: rawEvent.ExtraTableID, }, } ddlEvent.BlockedTables = &commonEvent.InfluencedTables{ InfluenceType: commonEvent.InfluenceTypeNormal, - TableIDs: []int64{rawEvent.PrevTableID, heartbeatpb.DDLSpan.TableID}, + TableIDs: []int64{rawEvent.TableID, heartbeatpb.DDLSpan.TableID}, } } return ddlEvent, true @@ -1600,10 +1612,10 @@ func buildDDLEventForRenameTable(rawEvent *PersistedDDLEvent, tableFilter filter if !ok { return ddlEvent, false } - ddlEvent.PrevSchemaName = rawEvent.PrevSchemaName - ddlEvent.PrevTableName = rawEvent.PrevTableName - ignorePrevTable := tableFilter != nil && tableFilter.ShouldIgnoreTable(rawEvent.PrevSchemaName, rawEvent.PrevTableName, rawEvent.TableInfo) - ignoreCurrentTable := tableFilter != nil && tableFilter.ShouldIgnoreTable(rawEvent.CurrentSchemaName, rawEvent.CurrentTableName, rawEvent.TableInfo) + ddlEvent.ExtraSchemaName = rawEvent.ExtraSchemaName + ddlEvent.ExtraTableName = rawEvent.ExtraTableName + ignorePrevTable := tableFilter != nil && tableFilter.ShouldIgnoreTable(rawEvent.ExtraSchemaName, rawEvent.ExtraTableName, rawEvent.TableInfo) + ignoreCurrentTable := tableFilter != nil && tableFilter.ShouldIgnoreTable(rawEvent.SchemaName, rawEvent.TableName, rawEvent.TableInfo) if isPartitionTable(rawEvent.TableInfo) { allPhysicalIDs := getAllPartitionIDs(rawEvent.TableInfo) if !ignorePrevTable { @@ -1616,27 +1628,27 @@ func buildDDLEventForRenameTable(rawEvent *PersistedDDLEvent, tableFilter filter } if !ignoreCurrentTable { // check whether schema change - if rawEvent.PrevSchemaID != rawEvent.CurrentSchemaID { + if rawEvent.ExtraSchemaID != rawEvent.SchemaID { ddlEvent.UpdatedSchemas = make([]commonEvent.SchemaIDChange, 0, len(allPhysicalIDs)) for _, id := range allPhysicalIDs { ddlEvent.UpdatedSchemas = append(ddlEvent.UpdatedSchemas, commonEvent.SchemaIDChange{ TableID: id, - OldSchemaID: rawEvent.PrevSchemaID, - NewSchemaID: rawEvent.CurrentSchemaID, + OldSchemaID: rawEvent.ExtraSchemaID, + NewSchemaID: rawEvent.SchemaID, }) } } ddlEvent.TableNameChange = &commonEvent.TableNameChange{ AddName: []commonEvent.SchemaTableName{ { - SchemaName: rawEvent.CurrentSchemaName, - TableName: rawEvent.CurrentTableName, + SchemaName: rawEvent.SchemaName, + TableName: rawEvent.TableName, }, }, DropName: []commonEvent.SchemaTableName{ { - SchemaName: rawEvent.PrevSchemaName, - TableName: rawEvent.PrevTableName, + SchemaName: rawEvent.ExtraSchemaName, + TableName: rawEvent.ExtraTableName, }, }, } @@ -1649,50 +1661,50 @@ func buildDDLEventForRenameTable(rawEvent *PersistedDDLEvent, tableFilter filter ddlEvent.TableNameChange = &commonEvent.TableNameChange{ DropName: []commonEvent.SchemaTableName{ { - SchemaName: rawEvent.PrevSchemaName, - TableName: rawEvent.PrevTableName, + SchemaName: rawEvent.ExtraSchemaName, + TableName: rawEvent.ExtraTableName, }, }, } } } else if !ignoreCurrentTable { // ignorePrevTable & !ignoreCurrentTable is not allowed as in: https://docs.pingcap.com/tidb/dev/ticdc-ddl - ddlEvent.Err = cerror.ErrSyncRenameTableFailed.GenWithStackByArgs(rawEvent.CurrentTableID, rawEvent.Query) + ddlEvent.Err = cerror.ErrSyncRenameTableFailed.GenWithStackByArgs(rawEvent.TableID, rawEvent.Query) } else { // if the table is both filtered out before and after rename table, the ddl should not be 
fetched log.Panic("should not build a ignored rename table ddl", zap.String("DDL", rawEvent.Query), zap.Int64("jobID", rawEvent.ID), - zap.Int64("schemaID", rawEvent.CurrentSchemaID), - zap.Int64("tableID", rawEvent.CurrentTableID)) + zap.Int64("schemaID", rawEvent.SchemaID), + zap.Int64("tableID", rawEvent.TableID)) } } else { if !ignorePrevTable { ddlEvent.BlockedTables = &commonEvent.InfluencedTables{ InfluenceType: commonEvent.InfluenceTypeNormal, - TableIDs: []int64{rawEvent.CurrentTableID, heartbeatpb.DDLSpan.TableID}, + TableIDs: []int64{rawEvent.TableID, heartbeatpb.DDLSpan.TableID}, } if !ignoreCurrentTable { - if rawEvent.PrevSchemaID != rawEvent.CurrentSchemaID { + if rawEvent.ExtraSchemaID != rawEvent.SchemaID { ddlEvent.UpdatedSchemas = []commonEvent.SchemaIDChange{ { - TableID: rawEvent.CurrentTableID, - OldSchemaID: rawEvent.PrevSchemaID, - NewSchemaID: rawEvent.CurrentSchemaID, + TableID: rawEvent.TableID, + OldSchemaID: rawEvent.ExtraSchemaID, + NewSchemaID: rawEvent.SchemaID, }, } } ddlEvent.TableNameChange = &commonEvent.TableNameChange{ AddName: []commonEvent.SchemaTableName{ { - SchemaName: rawEvent.CurrentSchemaName, - TableName: rawEvent.CurrentTableName, + SchemaName: rawEvent.SchemaName, + TableName: rawEvent.TableName, }, }, DropName: []commonEvent.SchemaTableName{ { - SchemaName: rawEvent.PrevSchemaName, - TableName: rawEvent.PrevTableName, + SchemaName: rawEvent.ExtraSchemaName, + TableName: rawEvent.ExtraTableName, }, }, } @@ -1700,27 +1712,27 @@ func buildDDLEventForRenameTable(rawEvent *PersistedDDLEvent, tableFilter filter // the table is filtered out after rename table, we need drop the table ddlEvent.NeedDroppedTables = &commonEvent.InfluencedTables{ InfluenceType: commonEvent.InfluenceTypeNormal, - TableIDs: []int64{rawEvent.CurrentTableID}, + TableIDs: []int64{rawEvent.TableID}, } ddlEvent.TableNameChange = &commonEvent.TableNameChange{ DropName: []commonEvent.SchemaTableName{ { - SchemaName: rawEvent.PrevSchemaName, - TableName: rawEvent.PrevTableName, + SchemaName: rawEvent.ExtraSchemaName, + TableName: rawEvent.ExtraTableName, }, }, } } } else if !ignoreCurrentTable { // ignorePrevTable & !ignoreCurrentTable is not allowed as in: https://docs.pingcap.com/tidb/dev/ticdc-ddl - ddlEvent.Err = cerror.ErrSyncRenameTableFailed.GenWithStackByArgs(rawEvent.CurrentTableID, rawEvent.Query) + ddlEvent.Err = cerror.ErrSyncRenameTableFailed.GenWithStackByArgs(rawEvent.TableID, rawEvent.Query) } else { // if the table is both filtered out before and after rename table, the ddl should not be fetched log.Panic("should not build a ignored rename table ddl", zap.String("DDL", rawEvent.Query), zap.Int64("jobID", rawEvent.ID), - zap.Int64("schemaID", rawEvent.CurrentSchemaID), - zap.Int64("tableID", rawEvent.CurrentTableID)) + zap.Int64("schemaID", rawEvent.SchemaID), + zap.Int64("tableID", rawEvent.TableID)) } } return ddlEvent, true @@ -1743,7 +1755,7 @@ func buildDDLEventForAddPartition(rawEvent *PersistedDDLEvent, tableFilter filte ddlEvent.NeedAddedTables = make([]commonEvent.Table, 0, len(newCreatedIDs)) for _, id := range newCreatedIDs { ddlEvent.NeedAddedTables = append(ddlEvent.NeedAddedTables, commonEvent.Table{ - SchemaID: rawEvent.CurrentSchemaID, + SchemaID: rawEvent.SchemaID, TableID: id, }) } @@ -1810,7 +1822,7 @@ func buildDDLEventForTruncateAndReorganizePartition(rawEvent *PersistedDDLEvent, newCreatedIDs := getCreatedIDs(rawEvent.PrevPartitions, physicalIDs) for _, id := range newCreatedIDs { ddlEvent.NeedAddedTables = append(ddlEvent.NeedAddedTables, 
commonEvent.Table{ - SchemaID: rawEvent.CurrentSchemaID, + SchemaID: rawEvent.SchemaID, TableID: id, }) } @@ -1827,8 +1839,9 @@ func buildDDLEventForExchangeTablePartition(rawEvent *PersistedDDLEvent, tableFi if !ok { return ddlEvent, false } - ignoreNormalTable := tableFilter != nil && tableFilter.ShouldIgnoreTable(rawEvent.PrevSchemaName, rawEvent.PrevTableName, rawEvent.TableInfo) - ignorePartitionTable := tableFilter != nil && tableFilter.ShouldIgnoreTable(rawEvent.CurrentSchemaName, rawEvent.CurrentTableName, rawEvent.TableInfo) + // TODO: rawEvent.TableInfo is not correct for ignoreNormalTable + ignoreNormalTable := tableFilter != nil && tableFilter.ShouldIgnoreTable(rawEvent.SchemaName, rawEvent.TableName, rawEvent.TableInfo) + ignorePartitionTable := tableFilter != nil && tableFilter.ShouldIgnoreTable(rawEvent.ExtraSchemaName, rawEvent.ExtraTableName, rawEvent.TableInfo) physicalIDs := getAllPartitionIDs(rawEvent.TableInfo) droppedIDs := getDroppedIDs(rawEvent.PrevPartitions, physicalIDs) if len(droppedIDs) != 1 { @@ -1839,34 +1852,34 @@ func buildDDLEventForExchangeTablePartition(rawEvent *PersistedDDLEvent, tableFi if !ignoreNormalTable && !ignorePartitionTable { ddlEvent.BlockedTables = &commonEvent.InfluencedTables{ InfluenceType: commonEvent.InfluenceTypeNormal, - TableIDs: []int64{rawEvent.PrevTableID, targetPartitionID, heartbeatpb.DDLSpan.TableID}, + TableIDs: []int64{rawEvent.TableID, targetPartitionID, heartbeatpb.DDLSpan.TableID}, } - if rawEvent.CurrentSchemaID != rawEvent.PrevSchemaID { + if rawEvent.SchemaID != rawEvent.ExtraSchemaID { ddlEvent.UpdatedSchemas = []commonEvent.SchemaIDChange{ { TableID: targetPartitionID, - OldSchemaID: rawEvent.CurrentSchemaID, - NewSchemaID: rawEvent.PrevSchemaID, + OldSchemaID: rawEvent.ExtraSchemaID, + NewSchemaID: rawEvent.SchemaID, }, { - TableID: rawEvent.PrevTableID, - OldSchemaID: rawEvent.PrevSchemaID, - NewSchemaID: rawEvent.CurrentSchemaID, + TableID: rawEvent.TableID, + OldSchemaID: rawEvent.SchemaID, + NewSchemaID: rawEvent.ExtraSchemaID, }, } } } else if !ignoreNormalTable { ddlEvent.BlockedTables = &commonEvent.InfluencedTables{ InfluenceType: commonEvent.InfluenceTypeNormal, - TableIDs: []int64{rawEvent.PrevTableID, heartbeatpb.DDLSpan.TableID}, + TableIDs: []int64{rawEvent.TableID, heartbeatpb.DDLSpan.TableID}, } ddlEvent.NeedDroppedTables = &commonEvent.InfluencedTables{ InfluenceType: commonEvent.InfluenceTypeNormal, - TableIDs: []int64{rawEvent.PrevTableID}, + TableIDs: []int64{rawEvent.TableID}, } ddlEvent.NeedAddedTables = []commonEvent.Table{ { - SchemaID: rawEvent.PrevSchemaID, + SchemaID: rawEvent.SchemaID, TableID: targetPartitionID, }, } @@ -1881,8 +1894,8 @@ func buildDDLEventForExchangeTablePartition(rawEvent *PersistedDDLEvent, tableFi } ddlEvent.NeedAddedTables = []commonEvent.Table{ { - SchemaID: rawEvent.CurrentSchemaID, - TableID: rawEvent.PrevTableID, + SchemaID: rawEvent.ExtraSchemaID, + TableID: rawEvent.TableID, }, } } else { @@ -1909,8 +1922,8 @@ func buildDDLEventForRenameTables(rawEvent *PersistedDDLEvent, tableFilter filte log.Panic("rename tables length is not equal table infos", zap.Any("querys", querys), zap.Any("tableInfos", rawEvent.MultipleTableInfos)) } for i, tableInfo := range rawEvent.MultipleTableInfos { - ignorePrevTable := tableFilter != nil && tableFilter.ShouldIgnoreTable(rawEvent.PrevSchemaNames[i], rawEvent.PrevTableNames[i], tableInfo) - ignoreCurrentTable := tableFilter != nil && tableFilter.ShouldIgnoreTable(rawEvent.CurrentSchemaNames[i], tableInfo.Name.O, tableInfo) + 
ignorePrevTable := tableFilter != nil && tableFilter.ShouldIgnoreTable(rawEvent.ExtraSchemaNames[i], rawEvent.ExtraTableNames[i], tableInfo) + ignoreCurrentTable := tableFilter != nil && tableFilter.ShouldIgnoreTable(rawEvent.SchemaNames[i], tableInfo.Name.O, tableInfo) if ignorePrevTable && ignoreCurrentTable { continue } @@ -1919,26 +1932,26 @@ func buildDDLEventForRenameTables(rawEvent *PersistedDDLEvent, tableFilter filte allPhysicalIDs := getAllPartitionIDs(rawEvent.TableInfo) if !ignorePrevTable { resultQuerys = append(resultQuerys, querys[i]) - tableInfos = append(tableInfos, common.WrapTableInfo(rawEvent.CurrentSchemaID, rawEvent.CurrentSchemaName, tableInfo)) + tableInfos = append(tableInfos, common.WrapTableInfo(rawEvent.SchemaID, rawEvent.SchemaName, tableInfo)) ddlEvent.BlockedTables.TableIDs = append(ddlEvent.BlockedTables.TableIDs, allPhysicalIDs...) if !ignoreCurrentTable { // check whether schema change - if rawEvent.PrevSchemaIDs[i] != rawEvent.CurrentSchemaIDs[i] { + if rawEvent.ExtraSchemaIDs[i] != rawEvent.SchemaIDs[i] { for _, id := range allPhysicalIDs { ddlEvent.UpdatedSchemas = append(ddlEvent.UpdatedSchemas, commonEvent.SchemaIDChange{ TableID: id, - OldSchemaID: rawEvent.PrevSchemaIDs[i], - NewSchemaID: rawEvent.CurrentSchemaIDs[i], + OldSchemaID: rawEvent.ExtraSchemaIDs[i], + NewSchemaID: rawEvent.SchemaIDs[i], }) } } addNames = append(addNames, commonEvent.SchemaTableName{ - SchemaName: rawEvent.CurrentSchemaNames[i], + SchemaName: rawEvent.SchemaNames[i], TableName: tableInfo.Name.O, }) dropNames = append(dropNames, commonEvent.SchemaTableName{ - SchemaName: rawEvent.PrevSchemaNames[i], - TableName: rawEvent.PrevTableNames[i], + SchemaName: rawEvent.ExtraSchemaNames[i], + TableName: rawEvent.ExtraTableNames[i], }) } else { // the table is filtered out after rename table, we need drop the table @@ -1949,36 +1962,36 @@ func buildDDLEventForRenameTables(rawEvent *PersistedDDLEvent, tableFilter filte } ddlEvent.NeedDroppedTables.TableIDs = append(ddlEvent.NeedDroppedTables.TableIDs, allPhysicalIDs...) 
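The rename branches above all reduce to one small decision matrix over whether the pre-rename name (ExtraSchemaName/ExtraTableName) and the post-rename name (SchemaName/TableName) are filtered out. The following self-contained Go sketch restates that matrix for illustration only; renameOutcome and decideRename are invented names and the snippet does not use any types from this repository.

package main

import "fmt"

// renameOutcome summarizes what the rename-table handling does for one table.
type renameOutcome string

const (
	outcomeReplicate renameOutcome = "replicate: update schema ids if the table moved, add the new name, drop the old name"
	outcomeDropOnly  renameOutcome = "new name is filtered out: drop the table and record only the old name as dropped"
	outcomeError     renameOutcome = "old name filtered but new name kept: report ErrSyncRenameTableFailed"
	outcomeSkip      renameOutcome = "both names filtered: the ddl should never be fetched (panic / skip)"
)

// decideRename mirrors the branch structure of the rename handling above.
func decideRename(ignoreOldName, ignoreNewName bool) renameOutcome {
	switch {
	case !ignoreOldName && !ignoreNewName:
		return outcomeReplicate
	case !ignoreOldName && ignoreNewName:
		return outcomeDropOnly
	case ignoreOldName && !ignoreNewName:
		return outcomeError
	default:
		return outcomeSkip
	}
}

func main() {
	for _, c := range []struct{ ignoreOld, ignoreNew bool }{
		{false, false}, {false, true}, {true, false}, {true, true},
	} {
		fmt.Printf("ignoreOld=%v ignoreNew=%v -> %s\n", c.ignoreOld, c.ignoreNew, decideRename(c.ignoreOld, c.ignoreNew))
	}
}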
dropNames = append(dropNames, commonEvent.SchemaTableName{ - SchemaName: rawEvent.PrevSchemaNames[i], - TableName: rawEvent.PrevTableNames[i], + SchemaName: rawEvent.ExtraSchemaNames[i], + TableName: rawEvent.ExtraTableNames[i], }) } } else if !ignoreCurrentTable { // ignorePrevTable & !ignoreCurrentTable is not allowed as in: https://docs.pingcap.com/tidb/dev/ticdc-ddl - ddlEvent.Err = cerror.ErrSyncRenameTableFailed.GenWithStackByArgs(rawEvent.CurrentTableID, rawEvent.Query) + ddlEvent.Err = cerror.ErrSyncRenameTableFailed.GenWithStackByArgs(rawEvent.TableID, rawEvent.Query) } else { // if the table is both filtered out before and after rename table, ignore } } else { if !ignorePrevTable { resultQuerys = append(resultQuerys, querys[i]) - tableInfos = append(tableInfos, common.WrapTableInfo(rawEvent.CurrentSchemaID, rawEvent.CurrentSchemaName, tableInfo)) + tableInfos = append(tableInfos, common.WrapTableInfo(rawEvent.SchemaID, rawEvent.SchemaName, tableInfo)) ddlEvent.BlockedTables.TableIDs = append(ddlEvent.BlockedTables.TableIDs, tableInfo.ID) if !ignoreCurrentTable { - if rawEvent.PrevSchemaIDs[i] != rawEvent.CurrentSchemaIDs[i] { + if rawEvent.ExtraSchemaIDs[i] != rawEvent.SchemaIDs[i] { ddlEvent.UpdatedSchemas = append(ddlEvent.UpdatedSchemas, commonEvent.SchemaIDChange{ TableID: tableInfo.ID, - OldSchemaID: rawEvent.PrevSchemaIDs[i], - NewSchemaID: rawEvent.CurrentSchemaIDs[i], + OldSchemaID: rawEvent.ExtraSchemaIDs[i], + NewSchemaID: rawEvent.SchemaIDs[i], }) } addNames = append(addNames, commonEvent.SchemaTableName{ - SchemaName: rawEvent.CurrentSchemaNames[i], + SchemaName: rawEvent.SchemaNames[i], TableName: tableInfo.Name.O, }) dropNames = append(dropNames, commonEvent.SchemaTableName{ - SchemaName: rawEvent.PrevSchemaNames[i], - TableName: rawEvent.PrevTableNames[i], + SchemaName: rawEvent.ExtraSchemaNames[i], + TableName: rawEvent.ExtraTableNames[i], }) } else { // the table is filtered out after rename table, we need drop the table @@ -1989,13 +2002,13 @@ func buildDDLEventForRenameTables(rawEvent *PersistedDDLEvent, tableFilter filte } ddlEvent.NeedDroppedTables.TableIDs = append(ddlEvent.NeedDroppedTables.TableIDs, tableInfo.ID) dropNames = append(dropNames, commonEvent.SchemaTableName{ - SchemaName: rawEvent.PrevSchemaNames[i], - TableName: rawEvent.PrevTableNames[i], + SchemaName: rawEvent.ExtraSchemaNames[i], + TableName: rawEvent.ExtraTableNames[i], }) } } else if !ignoreCurrentTable { // ignorePrevTable & !ignoreCurrentTable is not allowed as in: https://docs.pingcap.com/tidb/dev/ticdc-ddl - ddlEvent.Err = cerror.ErrSyncRenameTableFailed.GenWithStackByArgs(rawEvent.CurrentTableID, rawEvent.Query) + ddlEvent.Err = cerror.ErrSyncRenameTableFailed.GenWithStackByArgs(rawEvent.TableID, rawEvent.Query) } else { // ignore } @@ -2025,7 +2038,7 @@ func buildDDLEventForCreateTables(rawEvent *PersistedDDLEvent, tableFilter filte logicalTableCount := 0 allFiltered := true for _, info := range rawEvent.MultipleTableInfos { - if tableFilter != nil && tableFilter.ShouldIgnoreTable(rawEvent.CurrentSchemaName, info.Name.O, info) { + if tableFilter != nil && tableFilter.ShouldIgnoreTable(rawEvent.SchemaName, info.Name.O, info) { continue } allFiltered = false @@ -2054,31 +2067,31 @@ func buildDDLEventForCreateTables(rawEvent *PersistedDDLEvent, tableFilter filte resultQuerys := make([]string, 0, logicalTableCount) tableInfos := make([]*common.TableInfo, 0, logicalTableCount) for i, info := range rawEvent.MultipleTableInfos { - if tableFilter != nil && 
tableFilter.ShouldIgnoreTable(rawEvent.CurrentSchemaName, info.Name.O, info) { + if tableFilter != nil && tableFilter.ShouldIgnoreTable(rawEvent.SchemaName, info.Name.O, info) { log.Info("build ddl event for create tables filter table", - zap.String("schemaName", rawEvent.CurrentSchemaName), + zap.String("schemaName", rawEvent.SchemaName), zap.String("tableName", info.Name.O)) continue } if isPartitionTable(info) { for _, partitionID := range getAllPartitionIDs(info) { ddlEvent.NeedAddedTables = append(ddlEvent.NeedAddedTables, commonEvent.Table{ - SchemaID: rawEvent.CurrentSchemaID, + SchemaID: rawEvent.SchemaID, TableID: partitionID, }) } } else { ddlEvent.NeedAddedTables = append(ddlEvent.NeedAddedTables, commonEvent.Table{ - SchemaID: rawEvent.CurrentSchemaID, + SchemaID: rawEvent.SchemaID, TableID: info.ID, }) } addName = append(addName, commonEvent.SchemaTableName{ - SchemaName: rawEvent.CurrentSchemaName, + SchemaName: rawEvent.SchemaName, TableName: info.Name.O, }) resultQuerys = append(resultQuerys, querys[i]) - tableInfos = append(tableInfos, common.WrapTableInfo(rawEvent.CurrentSchemaID, rawEvent.CurrentSchemaName, info)) + tableInfos = append(tableInfos, common.WrapTableInfo(rawEvent.SchemaID, rawEvent.SchemaName, info)) } ddlEvent.TableNameChange = &commonEvent.TableNameChange{ AddName: addName, @@ -2108,15 +2121,15 @@ func buildDDLEventForAlterTablePartitioning(rawEvent *PersistedDDLEvent, tableFi TableIDs: rawEvent.PrevPartitions, } } else { - ddlEvent.BlockedTables.TableIDs = append(ddlEvent.BlockedTables.TableIDs, rawEvent.PrevTableID) + ddlEvent.BlockedTables.TableIDs = append(ddlEvent.BlockedTables.TableIDs, rawEvent.ExtraTableID) ddlEvent.NeedDroppedTables = &commonEvent.InfluencedTables{ InfluenceType: commonEvent.InfluenceTypeNormal, - TableIDs: []int64{rawEvent.PrevTableID}, + TableIDs: []int64{rawEvent.ExtraTableID}, } } for _, id := range getAllPartitionIDs(rawEvent.TableInfo) { ddlEvent.NeedAddedTables = append(ddlEvent.NeedAddedTables, commonEvent.Table{ - SchemaID: rawEvent.CurrentSchemaID, + SchemaID: rawEvent.SchemaID, TableID: id, }) } @@ -2140,8 +2153,8 @@ func buildDDLEventForRemovePartitioning(rawEvent *PersistedDDLEvent, tableFilter } ddlEvent.NeedAddedTables = []commonEvent.Table{ { - SchemaID: rawEvent.CurrentSchemaID, - TableID: rawEvent.CurrentTableID, + SchemaID: rawEvent.SchemaID, + TableID: rawEvent.TableID, }, } return ddlEvent, true diff --git a/logservice/schemastore/persist_storage_test.go b/logservice/schemastore/persist_storage_test.go index 783659634..bccf4a850 100644 --- a/logservice/schemastore/persist_storage_test.go +++ b/logservice/schemastore/persist_storage_test.go @@ -1681,7 +1681,6 @@ func TestApplyDDLJobs(t *testing.T) { // test add/drop primary key and alter index visibility for table // test modify table charset // test alter table ttl/remove ttl - // test set TiFlash replica // test multi schema change // test add/drop column { diff --git a/logservice/schemastore/types.go b/logservice/schemastore/types.go index 42f5920a5..b69dcb78b 100644 --- a/logservice/schemastore/types.go +++ b/logservice/schemastore/types.go @@ -25,42 +25,57 @@ type PersistedDDLEvent struct { ID int64 `msg:"id"` Type byte `msg:"type"` - // for exchange partition, it is the info of the partition table - CurrentSchemaID int64 `msg:"current_schema_id"` - CurrentTableID int64 `msg:"current_table_id"` - CurrentSchemaName string `msg:"current_schema_name"` - CurrentTableName string `msg:"current_table_name"` - - // The following fields are only set when the ddl 
job involves a prev table - // for exchange partition, it is the info of the normal table before exchange - PrevSchemaID int64 `msg:"prev_schema_id"` - PrevTableID int64 `msg:"prev_table_id"` - PrevSchemaName string `msg:"prev_schema_name"` - PrevTableName string `msg:"prev_table_name"` - - // only used for rename tables - PrevSchemaIDs []int64 `msg:"prev_schema_ids"` - PrevSchemaNames []string `msg:"prev_schema_names"` - PrevTableNames []string `msg:"prev_table_names"` - CurrentSchemaIDs []int64 `msg:"current_schema_ids"` - CurrentSchemaNames []string `msg:"s"` - - // The following fields are only set when the ddl job involves a partition table + // SchemaID is from upstream Job.SchemaID; it corresponds to TableID + // it is the DB id of the table after the ddl + SchemaID int64 `msg:"schema_id"` + // TableID is from upstream Job.TableID + // - for most ddl types which just involve a single table id, it is the table id of the table + // - for ExchangeTablePartition, it is the table id of the normal table before exchange + // and it is one of the partition ids after exchange + // - for TruncateTable, it is the table ID of the old table + TableID int64 `msg:"table_id"` + // SchemaName corresponds to SchemaID + SchemaName string `msg:"schema_name"` + // TableName corresponds to TableID + TableName string `msg:"table_name"` + + // ExtraSchemaID corresponds to ExtraTableID + ExtraSchemaID int64 `msg:"extra_schema_id"` + // - for ExchangeTablePartition, it is the table id of the partition table + // - for TruncateTable, it is the table ID of the new table + ExtraTableID int64 `msg:"extra_table_id"` + // ExtraSchemaName corresponds to ExtraSchemaID + ExtraSchemaName string `msg:"extra_schema_name"` + // ExtraTableName corresponds to ExtraTableID + ExtraTableName string `msg:"extra_table_name"` + + // the following fields are only used for RenameTables + SchemaIDs []int64 `msg:"schema_ids"` + SchemaNames []string `msg:"schema_names"` + ExtraSchemaIDs []int64 `msg:"extra_schema_ids"` + ExtraSchemaNames []string `msg:"extra_schema_names"` + ExtraTableNames []string `msg:"extra_table_names"` + + // the following fields are only set when the ddl job involves a partition table + // it is the partition info of the table before this ddl PrevPartitions []int64 `msg:"prev_partitions"` - Query string `msg:"query"` - SchemaVersion int64 `msg:"schema_version"` - DBInfo *model.DBInfo `msg:"-"` - // for exchange partition, it is the info of the partition table - TableInfo *model.TableInfo `msg:"-"` - // TODO: use a custom struct to store the table info? - TableInfoValue []byte `msg:"table_info_value"` - // for exchange partition, it is the info of the normal table - PreTableInfo *common.TableInfo `msg:"-"` - // TODO: is there a better way to store PreTableInfo? - PreTableInfoValue []byte `msg:"pre_table_info_value"` - FinishedTs uint64 `msg:"finished_ts"` - + Query string `msg:"query"` + SchemaVersion int64 `msg:"schema_version"` + FinishedTs uint64 `msg:"finished_ts"` + + DBInfo *model.DBInfo `msg:"-"` + // it is from upstream job.TableInfo + // - for most ddl types which just involve a single table id, it is the table info of the table after the ddl + // - for ExchangeTablePartition, it is the table info of the partition table after exchange + // note: ExtraTableID is the partition table id.
(it is a little tricky) + TableInfo *model.TableInfo `msg:"-"` + TableInfoValue []byte `msg:"table_info_value"` + // - for ExchangeTablePartition, it is the the info of the normal table before exchange + // and we derive the normal table info after exchange from this field(by clone it with a different table id) + ExtraTableInfo *common.TableInfo `msg:"-"` + ExtraTableInfoValue []byte `msg:"extra_table_info_value"` + // the following fields are just used for CreateTables and RenameTables MultipleTableInfos []*model.TableInfo `msg:"-"` MultipleTableInfosValue [][]byte `msg:"multi_table_info_value"` diff --git a/logservice/schemastore/types_gen.go b/logservice/schemastore/types_gen.go index 88d607847..2018d8b4b 100644 --- a/logservice/schemastore/types_gen.go +++ b/logservice/schemastore/types_gen.go @@ -36,146 +36,146 @@ func (z *PersistedDDLEvent) DecodeMsg(dc *msgp.Reader) (err error) { err = msgp.WrapError(err, "Type") return } - case "current_schema_id": - z.CurrentSchemaID, err = dc.ReadInt64() + case "schema_id": + z.SchemaID, err = dc.ReadInt64() if err != nil { - err = msgp.WrapError(err, "CurrentSchemaID") + err = msgp.WrapError(err, "SchemaID") return } - case "current_table_id": - z.CurrentTableID, err = dc.ReadInt64() + case "table_id": + z.TableID, err = dc.ReadInt64() if err != nil { - err = msgp.WrapError(err, "CurrentTableID") + err = msgp.WrapError(err, "TableID") return } - case "current_schema_name": - z.CurrentSchemaName, err = dc.ReadString() + case "schema_name": + z.SchemaName, err = dc.ReadString() if err != nil { - err = msgp.WrapError(err, "CurrentSchemaName") + err = msgp.WrapError(err, "SchemaName") return } - case "current_table_name": - z.CurrentTableName, err = dc.ReadString() + case "table_name": + z.TableName, err = dc.ReadString() if err != nil { - err = msgp.WrapError(err, "CurrentTableName") + err = msgp.WrapError(err, "TableName") return } - case "prev_schema_id": - z.PrevSchemaID, err = dc.ReadInt64() + case "extra_schema_id": + z.ExtraSchemaID, err = dc.ReadInt64() if err != nil { - err = msgp.WrapError(err, "PrevSchemaID") + err = msgp.WrapError(err, "ExtraSchemaID") return } - case "prev_table_id": - z.PrevTableID, err = dc.ReadInt64() + case "extra_table_id": + z.ExtraTableID, err = dc.ReadInt64() if err != nil { - err = msgp.WrapError(err, "PrevTableID") + err = msgp.WrapError(err, "ExtraTableID") return } - case "prev_schema_name": - z.PrevSchemaName, err = dc.ReadString() + case "extra_schema_name": + z.ExtraSchemaName, err = dc.ReadString() if err != nil { - err = msgp.WrapError(err, "PrevSchemaName") + err = msgp.WrapError(err, "ExtraSchemaName") return } - case "prev_table_name": - z.PrevTableName, err = dc.ReadString() + case "extra_table_name": + z.ExtraTableName, err = dc.ReadString() if err != nil { - err = msgp.WrapError(err, "PrevTableName") + err = msgp.WrapError(err, "ExtraTableName") return } - case "prev_schema_ids": + case "schema_ids": var zb0002 uint32 zb0002, err = dc.ReadArrayHeader() if err != nil { - err = msgp.WrapError(err, "PrevSchemaIDs") + err = msgp.WrapError(err, "SchemaIDs") return } - if cap(z.PrevSchemaIDs) >= int(zb0002) { - z.PrevSchemaIDs = (z.PrevSchemaIDs)[:zb0002] + if cap(z.SchemaIDs) >= int(zb0002) { + z.SchemaIDs = (z.SchemaIDs)[:zb0002] } else { - z.PrevSchemaIDs = make([]int64, zb0002) + z.SchemaIDs = make([]int64, zb0002) } - for za0001 := range z.PrevSchemaIDs { - z.PrevSchemaIDs[za0001], err = dc.ReadInt64() + for za0001 := range z.SchemaIDs { + z.SchemaIDs[za0001], err = dc.ReadInt64() if err != nil { - 
err = msgp.WrapError(err, "PrevSchemaIDs", za0001) + err = msgp.WrapError(err, "SchemaIDs", za0001) return } } - case "prev_schema_names": + case "schema_names": var zb0003 uint32 zb0003, err = dc.ReadArrayHeader() if err != nil { - err = msgp.WrapError(err, "PrevSchemaNames") + err = msgp.WrapError(err, "SchemaNames") return } - if cap(z.PrevSchemaNames) >= int(zb0003) { - z.PrevSchemaNames = (z.PrevSchemaNames)[:zb0003] + if cap(z.SchemaNames) >= int(zb0003) { + z.SchemaNames = (z.SchemaNames)[:zb0003] } else { - z.PrevSchemaNames = make([]string, zb0003) + z.SchemaNames = make([]string, zb0003) } - for za0002 := range z.PrevSchemaNames { - z.PrevSchemaNames[za0002], err = dc.ReadString() + for za0002 := range z.SchemaNames { + z.SchemaNames[za0002], err = dc.ReadString() if err != nil { - err = msgp.WrapError(err, "PrevSchemaNames", za0002) + err = msgp.WrapError(err, "SchemaNames", za0002) return } } - case "prev_table_names": + case "extra_schema_ids": var zb0004 uint32 zb0004, err = dc.ReadArrayHeader() if err != nil { - err = msgp.WrapError(err, "PrevTableNames") + err = msgp.WrapError(err, "ExtraSchemaIDs") return } - if cap(z.PrevTableNames) >= int(zb0004) { - z.PrevTableNames = (z.PrevTableNames)[:zb0004] + if cap(z.ExtraSchemaIDs) >= int(zb0004) { + z.ExtraSchemaIDs = (z.ExtraSchemaIDs)[:zb0004] } else { - z.PrevTableNames = make([]string, zb0004) + z.ExtraSchemaIDs = make([]int64, zb0004) } - for za0003 := range z.PrevTableNames { - z.PrevTableNames[za0003], err = dc.ReadString() + for za0003 := range z.ExtraSchemaIDs { + z.ExtraSchemaIDs[za0003], err = dc.ReadInt64() if err != nil { - err = msgp.WrapError(err, "PrevTableNames", za0003) + err = msgp.WrapError(err, "ExtraSchemaIDs", za0003) return } } - case "current_schema_ids": + case "extra_schema_names": var zb0005 uint32 zb0005, err = dc.ReadArrayHeader() if err != nil { - err = msgp.WrapError(err, "CurrentSchemaIDs") + err = msgp.WrapError(err, "ExtraSchemaNames") return } - if cap(z.CurrentSchemaIDs) >= int(zb0005) { - z.CurrentSchemaIDs = (z.CurrentSchemaIDs)[:zb0005] + if cap(z.ExtraSchemaNames) >= int(zb0005) { + z.ExtraSchemaNames = (z.ExtraSchemaNames)[:zb0005] } else { - z.CurrentSchemaIDs = make([]int64, zb0005) + z.ExtraSchemaNames = make([]string, zb0005) } - for za0004 := range z.CurrentSchemaIDs { - z.CurrentSchemaIDs[za0004], err = dc.ReadInt64() + for za0004 := range z.ExtraSchemaNames { + z.ExtraSchemaNames[za0004], err = dc.ReadString() if err != nil { - err = msgp.WrapError(err, "CurrentSchemaIDs", za0004) + err = msgp.WrapError(err, "ExtraSchemaNames", za0004) return } } - case "s": + case "extra_table_names": var zb0006 uint32 zb0006, err = dc.ReadArrayHeader() if err != nil { - err = msgp.WrapError(err, "CurrentSchemaNames") + err = msgp.WrapError(err, "ExtraTableNames") return } - if cap(z.CurrentSchemaNames) >= int(zb0006) { - z.CurrentSchemaNames = (z.CurrentSchemaNames)[:zb0006] + if cap(z.ExtraTableNames) >= int(zb0006) { + z.ExtraTableNames = (z.ExtraTableNames)[:zb0006] } else { - z.CurrentSchemaNames = make([]string, zb0006) + z.ExtraTableNames = make([]string, zb0006) } - for za0005 := range z.CurrentSchemaNames { - z.CurrentSchemaNames[za0005], err = dc.ReadString() + for za0005 := range z.ExtraTableNames { + z.ExtraTableNames[za0005], err = dc.ReadString() if err != nil { - err = msgp.WrapError(err, "CurrentSchemaNames", za0005) + err = msgp.WrapError(err, "ExtraTableNames", za0005) return } } @@ -210,22 +210,22 @@ func (z *PersistedDDLEvent) DecodeMsg(dc *msgp.Reader) (err error) { err = 
msgp.WrapError(err, "SchemaVersion") return } - case "table_info_value": - z.TableInfoValue, err = dc.ReadBytes(z.TableInfoValue) + case "finished_ts": + z.FinishedTs, err = dc.ReadUint64() if err != nil { - err = msgp.WrapError(err, "TableInfoValue") + err = msgp.WrapError(err, "FinishedTs") return } - case "pre_table_info_value": - z.PreTableInfoValue, err = dc.ReadBytes(z.PreTableInfoValue) + case "table_info_value": + z.TableInfoValue, err = dc.ReadBytes(z.TableInfoValue) if err != nil { - err = msgp.WrapError(err, "PreTableInfoValue") + err = msgp.WrapError(err, "TableInfoValue") return } - case "finished_ts": - z.FinishedTs, err = dc.ReadUint64() + case "extra_table_info_value": + z.ExtraTableInfoValue, err = dc.ReadBytes(z.ExtraTableInfoValue) if err != nil { - err = msgp.WrapError(err, "FinishedTs") + err = msgp.WrapError(err, "ExtraTableInfoValue") return } case "multi_table_info_value": @@ -293,168 +293,168 @@ func (z *PersistedDDLEvent) EncodeMsg(en *msgp.Writer) (err error) { err = msgp.WrapError(err, "Type") return } - // write "current_schema_id" - err = en.Append(0xb1, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74, 0x5f, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x5f, 0x69, 0x64) + // write "schema_id" + err = en.Append(0xa9, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x5f, 0x69, 0x64) if err != nil { return } - err = en.WriteInt64(z.CurrentSchemaID) + err = en.WriteInt64(z.SchemaID) if err != nil { - err = msgp.WrapError(err, "CurrentSchemaID") + err = msgp.WrapError(err, "SchemaID") return } - // write "current_table_id" - err = en.Append(0xb0, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74, 0x5f, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x69, 0x64) + // write "table_id" + err = en.Append(0xa8, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x69, 0x64) if err != nil { return } - err = en.WriteInt64(z.CurrentTableID) + err = en.WriteInt64(z.TableID) if err != nil { - err = msgp.WrapError(err, "CurrentTableID") + err = msgp.WrapError(err, "TableID") return } - // write "current_schema_name" - err = en.Append(0xb3, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74, 0x5f, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x5f, 0x6e, 0x61, 0x6d, 0x65) + // write "schema_name" + err = en.Append(0xab, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x5f, 0x6e, 0x61, 0x6d, 0x65) if err != nil { return } - err = en.WriteString(z.CurrentSchemaName) + err = en.WriteString(z.SchemaName) if err != nil { - err = msgp.WrapError(err, "CurrentSchemaName") + err = msgp.WrapError(err, "SchemaName") return } - // write "current_table_name" - err = en.Append(0xb2, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74, 0x5f, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65) + // write "table_name" + err = en.Append(0xaa, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65) if err != nil { return } - err = en.WriteString(z.CurrentTableName) + err = en.WriteString(z.TableName) if err != nil { - err = msgp.WrapError(err, "CurrentTableName") + err = msgp.WrapError(err, "TableName") return } - // write "prev_schema_id" - err = en.Append(0xae, 0x70, 0x72, 0x65, 0x76, 0x5f, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x5f, 0x69, 0x64) + // write "extra_schema_id" + err = en.Append(0xaf, 0x65, 0x78, 0x74, 0x72, 0x61, 0x5f, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x5f, 0x69, 0x64) if err != nil { return } - err = en.WriteInt64(z.PrevSchemaID) + err = en.WriteInt64(z.ExtraSchemaID) if err != nil { - err = msgp.WrapError(err, "PrevSchemaID") + err = msgp.WrapError(err, "ExtraSchemaID") return } - // write "prev_table_id" - err = en.Append(0xad, 0x70, 0x72, 0x65, 0x76, 0x5f, 0x74, 0x61, 
0x62, 0x6c, 0x65, 0x5f, 0x69, 0x64) + // write "extra_table_id" + err = en.Append(0xae, 0x65, 0x78, 0x74, 0x72, 0x61, 0x5f, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x69, 0x64) if err != nil { return } - err = en.WriteInt64(z.PrevTableID) + err = en.WriteInt64(z.ExtraTableID) if err != nil { - err = msgp.WrapError(err, "PrevTableID") + err = msgp.WrapError(err, "ExtraTableID") return } - // write "prev_schema_name" - err = en.Append(0xb0, 0x70, 0x72, 0x65, 0x76, 0x5f, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x5f, 0x6e, 0x61, 0x6d, 0x65) + // write "extra_schema_name" + err = en.Append(0xb1, 0x65, 0x78, 0x74, 0x72, 0x61, 0x5f, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x5f, 0x6e, 0x61, 0x6d, 0x65) if err != nil { return } - err = en.WriteString(z.PrevSchemaName) + err = en.WriteString(z.ExtraSchemaName) if err != nil { - err = msgp.WrapError(err, "PrevSchemaName") + err = msgp.WrapError(err, "ExtraSchemaName") return } - // write "prev_table_name" - err = en.Append(0xaf, 0x70, 0x72, 0x65, 0x76, 0x5f, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65) + // write "extra_table_name" + err = en.Append(0xb0, 0x65, 0x78, 0x74, 0x72, 0x61, 0x5f, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65) if err != nil { return } - err = en.WriteString(z.PrevTableName) + err = en.WriteString(z.ExtraTableName) if err != nil { - err = msgp.WrapError(err, "PrevTableName") + err = msgp.WrapError(err, "ExtraTableName") return } - // write "prev_schema_ids" - err = en.Append(0xaf, 0x70, 0x72, 0x65, 0x76, 0x5f, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x5f, 0x69, 0x64, 0x73) + // write "schema_ids" + err = en.Append(0xaa, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x5f, 0x69, 0x64, 0x73) if err != nil { return } - err = en.WriteArrayHeader(uint32(len(z.PrevSchemaIDs))) + err = en.WriteArrayHeader(uint32(len(z.SchemaIDs))) if err != nil { - err = msgp.WrapError(err, "PrevSchemaIDs") + err = msgp.WrapError(err, "SchemaIDs") return } - for za0001 := range z.PrevSchemaIDs { - err = en.WriteInt64(z.PrevSchemaIDs[za0001]) + for za0001 := range z.SchemaIDs { + err = en.WriteInt64(z.SchemaIDs[za0001]) if err != nil { - err = msgp.WrapError(err, "PrevSchemaIDs", za0001) + err = msgp.WrapError(err, "SchemaIDs", za0001) return } } - // write "prev_schema_names" - err = en.Append(0xb1, 0x70, 0x72, 0x65, 0x76, 0x5f, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x73) + // write "schema_names" + err = en.Append(0xac, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x73) if err != nil { return } - err = en.WriteArrayHeader(uint32(len(z.PrevSchemaNames))) + err = en.WriteArrayHeader(uint32(len(z.SchemaNames))) if err != nil { - err = msgp.WrapError(err, "PrevSchemaNames") + err = msgp.WrapError(err, "SchemaNames") return } - for za0002 := range z.PrevSchemaNames { - err = en.WriteString(z.PrevSchemaNames[za0002]) + for za0002 := range z.SchemaNames { + err = en.WriteString(z.SchemaNames[za0002]) if err != nil { - err = msgp.WrapError(err, "PrevSchemaNames", za0002) + err = msgp.WrapError(err, "SchemaNames", za0002) return } } - // write "prev_table_names" - err = en.Append(0xb0, 0x70, 0x72, 0x65, 0x76, 0x5f, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x73) + // write "extra_schema_ids" + err = en.Append(0xb0, 0x65, 0x78, 0x74, 0x72, 0x61, 0x5f, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x5f, 0x69, 0x64, 0x73) if err != nil { return } - err = en.WriteArrayHeader(uint32(len(z.PrevTableNames))) + err = en.WriteArrayHeader(uint32(len(z.ExtraSchemaIDs))) if err != nil { - err = 
msgp.WrapError(err, "PrevTableNames") + err = msgp.WrapError(err, "ExtraSchemaIDs") return } - for za0003 := range z.PrevTableNames { - err = en.WriteString(z.PrevTableNames[za0003]) + for za0003 := range z.ExtraSchemaIDs { + err = en.WriteInt64(z.ExtraSchemaIDs[za0003]) if err != nil { - err = msgp.WrapError(err, "PrevTableNames", za0003) + err = msgp.WrapError(err, "ExtraSchemaIDs", za0003) return } } - // write "current_schema_ids" - err = en.Append(0xb2, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74, 0x5f, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x5f, 0x69, 0x64, 0x73) + // write "extra_schema_names" + err = en.Append(0xb2, 0x65, 0x78, 0x74, 0x72, 0x61, 0x5f, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x73) if err != nil { return } - err = en.WriteArrayHeader(uint32(len(z.CurrentSchemaIDs))) + err = en.WriteArrayHeader(uint32(len(z.ExtraSchemaNames))) if err != nil { - err = msgp.WrapError(err, "CurrentSchemaIDs") + err = msgp.WrapError(err, "ExtraSchemaNames") return } - for za0004 := range z.CurrentSchemaIDs { - err = en.WriteInt64(z.CurrentSchemaIDs[za0004]) + for za0004 := range z.ExtraSchemaNames { + err = en.WriteString(z.ExtraSchemaNames[za0004]) if err != nil { - err = msgp.WrapError(err, "CurrentSchemaIDs", za0004) + err = msgp.WrapError(err, "ExtraSchemaNames", za0004) return } } - // write "s" - err = en.Append(0xa1, 0x73) + // write "extra_table_names" + err = en.Append(0xb1, 0x65, 0x78, 0x74, 0x72, 0x61, 0x5f, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x73) if err != nil { return } - err = en.WriteArrayHeader(uint32(len(z.CurrentSchemaNames))) + err = en.WriteArrayHeader(uint32(len(z.ExtraTableNames))) if err != nil { - err = msgp.WrapError(err, "CurrentSchemaNames") + err = msgp.WrapError(err, "ExtraTableNames") return } - for za0005 := range z.CurrentSchemaNames { - err = en.WriteString(z.CurrentSchemaNames[za0005]) + for za0005 := range z.ExtraTableNames { + err = en.WriteString(z.ExtraTableNames[za0005]) if err != nil { - err = msgp.WrapError(err, "CurrentSchemaNames", za0005) + err = msgp.WrapError(err, "ExtraTableNames", za0005) return } } @@ -495,34 +495,34 @@ func (z *PersistedDDLEvent) EncodeMsg(en *msgp.Writer) (err error) { err = msgp.WrapError(err, "SchemaVersion") return } - // write "table_info_value" - err = en.Append(0xb0, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x69, 0x6e, 0x66, 0x6f, 0x5f, 0x76, 0x61, 0x6c, 0x75, 0x65) + // write "finished_ts" + err = en.Append(0xab, 0x66, 0x69, 0x6e, 0x69, 0x73, 0x68, 0x65, 0x64, 0x5f, 0x74, 0x73) if err != nil { return } - err = en.WriteBytes(z.TableInfoValue) + err = en.WriteUint64(z.FinishedTs) if err != nil { - err = msgp.WrapError(err, "TableInfoValue") + err = msgp.WrapError(err, "FinishedTs") return } - // write "pre_table_info_value" - err = en.Append(0xb4, 0x70, 0x72, 0x65, 0x5f, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x69, 0x6e, 0x66, 0x6f, 0x5f, 0x76, 0x61, 0x6c, 0x75, 0x65) + // write "table_info_value" + err = en.Append(0xb0, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x69, 0x6e, 0x66, 0x6f, 0x5f, 0x76, 0x61, 0x6c, 0x75, 0x65) if err != nil { return } - err = en.WriteBytes(z.PreTableInfoValue) + err = en.WriteBytes(z.TableInfoValue) if err != nil { - err = msgp.WrapError(err, "PreTableInfoValue") + err = msgp.WrapError(err, "TableInfoValue") return } - // write "finished_ts" - err = en.Append(0xab, 0x66, 0x69, 0x6e, 0x69, 0x73, 0x68, 0x65, 0x64, 0x5f, 0x74, 0x73) + // write "extra_table_info_value" + err = en.Append(0xb6, 0x65, 0x78, 0x74, 0x72, 0x61, 0x5f, 0x74, 0x61, 0x62, 0x6c, 0x65, 
0x5f, 0x69, 0x6e, 0x66, 0x6f, 0x5f, 0x76, 0x61, 0x6c, 0x75, 0x65) if err != nil { return } - err = en.WriteUint64(z.FinishedTs) + err = en.WriteBytes(z.ExtraTableInfoValue) if err != nil { - err = msgp.WrapError(err, "FinishedTs") + err = msgp.WrapError(err, "ExtraTableInfoValue") return } // write "multi_table_info_value" @@ -575,59 +575,59 @@ func (z *PersistedDDLEvent) MarshalMsg(b []byte) (o []byte, err error) { // string "type" o = append(o, 0xa4, 0x74, 0x79, 0x70, 0x65) o = msgp.AppendByte(o, z.Type) - // string "current_schema_id" - o = append(o, 0xb1, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74, 0x5f, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x5f, 0x69, 0x64) - o = msgp.AppendInt64(o, z.CurrentSchemaID) - // string "current_table_id" - o = append(o, 0xb0, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74, 0x5f, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x69, 0x64) - o = msgp.AppendInt64(o, z.CurrentTableID) - // string "current_schema_name" - o = append(o, 0xb3, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74, 0x5f, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x5f, 0x6e, 0x61, 0x6d, 0x65) - o = msgp.AppendString(o, z.CurrentSchemaName) - // string "current_table_name" - o = append(o, 0xb2, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74, 0x5f, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65) - o = msgp.AppendString(o, z.CurrentTableName) - // string "prev_schema_id" - o = append(o, 0xae, 0x70, 0x72, 0x65, 0x76, 0x5f, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x5f, 0x69, 0x64) - o = msgp.AppendInt64(o, z.PrevSchemaID) - // string "prev_table_id" - o = append(o, 0xad, 0x70, 0x72, 0x65, 0x76, 0x5f, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x69, 0x64) - o = msgp.AppendInt64(o, z.PrevTableID) - // string "prev_schema_name" - o = append(o, 0xb0, 0x70, 0x72, 0x65, 0x76, 0x5f, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x5f, 0x6e, 0x61, 0x6d, 0x65) - o = msgp.AppendString(o, z.PrevSchemaName) - // string "prev_table_name" - o = append(o, 0xaf, 0x70, 0x72, 0x65, 0x76, 0x5f, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65) - o = msgp.AppendString(o, z.PrevTableName) - // string "prev_schema_ids" - o = append(o, 0xaf, 0x70, 0x72, 0x65, 0x76, 0x5f, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x5f, 0x69, 0x64, 0x73) - o = msgp.AppendArrayHeader(o, uint32(len(z.PrevSchemaIDs))) - for za0001 := range z.PrevSchemaIDs { - o = msgp.AppendInt64(o, z.PrevSchemaIDs[za0001]) - } - // string "prev_schema_names" - o = append(o, 0xb1, 0x70, 0x72, 0x65, 0x76, 0x5f, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x73) - o = msgp.AppendArrayHeader(o, uint32(len(z.PrevSchemaNames))) - for za0002 := range z.PrevSchemaNames { - o = msgp.AppendString(o, z.PrevSchemaNames[za0002]) - } - // string "prev_table_names" - o = append(o, 0xb0, 0x70, 0x72, 0x65, 0x76, 0x5f, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x73) - o = msgp.AppendArrayHeader(o, uint32(len(z.PrevTableNames))) - for za0003 := range z.PrevTableNames { - o = msgp.AppendString(o, z.PrevTableNames[za0003]) - } - // string "current_schema_ids" - o = append(o, 0xb2, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74, 0x5f, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x5f, 0x69, 0x64, 0x73) - o = msgp.AppendArrayHeader(o, uint32(len(z.CurrentSchemaIDs))) - for za0004 := range z.CurrentSchemaIDs { - o = msgp.AppendInt64(o, z.CurrentSchemaIDs[za0004]) - } - // string "s" - o = append(o, 0xa1, 0x73) - o = msgp.AppendArrayHeader(o, uint32(len(z.CurrentSchemaNames))) - for za0005 := range z.CurrentSchemaNames { - o = msgp.AppendString(o, z.CurrentSchemaNames[za0005]) + // string "schema_id" 
+ o = append(o, 0xa9, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x5f, 0x69, 0x64) + o = msgp.AppendInt64(o, z.SchemaID) + // string "table_id" + o = append(o, 0xa8, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x69, 0x64) + o = msgp.AppendInt64(o, z.TableID) + // string "schema_name" + o = append(o, 0xab, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x5f, 0x6e, 0x61, 0x6d, 0x65) + o = msgp.AppendString(o, z.SchemaName) + // string "table_name" + o = append(o, 0xaa, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65) + o = msgp.AppendString(o, z.TableName) + // string "extra_schema_id" + o = append(o, 0xaf, 0x65, 0x78, 0x74, 0x72, 0x61, 0x5f, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x5f, 0x69, 0x64) + o = msgp.AppendInt64(o, z.ExtraSchemaID) + // string "extra_table_id" + o = append(o, 0xae, 0x65, 0x78, 0x74, 0x72, 0x61, 0x5f, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x69, 0x64) + o = msgp.AppendInt64(o, z.ExtraTableID) + // string "extra_schema_name" + o = append(o, 0xb1, 0x65, 0x78, 0x74, 0x72, 0x61, 0x5f, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x5f, 0x6e, 0x61, 0x6d, 0x65) + o = msgp.AppendString(o, z.ExtraSchemaName) + // string "extra_table_name" + o = append(o, 0xb0, 0x65, 0x78, 0x74, 0x72, 0x61, 0x5f, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65) + o = msgp.AppendString(o, z.ExtraTableName) + // string "schema_ids" + o = append(o, 0xaa, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x5f, 0x69, 0x64, 0x73) + o = msgp.AppendArrayHeader(o, uint32(len(z.SchemaIDs))) + for za0001 := range z.SchemaIDs { + o = msgp.AppendInt64(o, z.SchemaIDs[za0001]) + } + // string "schema_names" + o = append(o, 0xac, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x73) + o = msgp.AppendArrayHeader(o, uint32(len(z.SchemaNames))) + for za0002 := range z.SchemaNames { + o = msgp.AppendString(o, z.SchemaNames[za0002]) + } + // string "extra_schema_ids" + o = append(o, 0xb0, 0x65, 0x78, 0x74, 0x72, 0x61, 0x5f, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x5f, 0x69, 0x64, 0x73) + o = msgp.AppendArrayHeader(o, uint32(len(z.ExtraSchemaIDs))) + for za0003 := range z.ExtraSchemaIDs { + o = msgp.AppendInt64(o, z.ExtraSchemaIDs[za0003]) + } + // string "extra_schema_names" + o = append(o, 0xb2, 0x65, 0x78, 0x74, 0x72, 0x61, 0x5f, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x73) + o = msgp.AppendArrayHeader(o, uint32(len(z.ExtraSchemaNames))) + for za0004 := range z.ExtraSchemaNames { + o = msgp.AppendString(o, z.ExtraSchemaNames[za0004]) + } + // string "extra_table_names" + o = append(o, 0xb1, 0x65, 0x78, 0x74, 0x72, 0x61, 0x5f, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x73) + o = msgp.AppendArrayHeader(o, uint32(len(z.ExtraTableNames))) + for za0005 := range z.ExtraTableNames { + o = msgp.AppendString(o, z.ExtraTableNames[za0005]) } // string "prev_partitions" o = append(o, 0xaf, 0x70, 0x72, 0x65, 0x76, 0x5f, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73) @@ -641,15 +641,15 @@ func (z *PersistedDDLEvent) MarshalMsg(b []byte) (o []byte, err error) { // string "schema_version" o = append(o, 0xae, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x5f, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e) o = msgp.AppendInt64(o, z.SchemaVersion) - // string "table_info_value" - o = append(o, 0xb0, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x69, 0x6e, 0x66, 0x6f, 0x5f, 0x76, 0x61, 0x6c, 0x75, 0x65) - o = msgp.AppendBytes(o, z.TableInfoValue) - // string "pre_table_info_value" - o = append(o, 0xb4, 0x70, 0x72, 0x65, 0x5f, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x69, 0x6e, 0x66, 0x6f, 0x5f, 0x76, 0x61, 0x6c, 0x75, 
0x65) - o = msgp.AppendBytes(o, z.PreTableInfoValue) // string "finished_ts" o = append(o, 0xab, 0x66, 0x69, 0x6e, 0x69, 0x73, 0x68, 0x65, 0x64, 0x5f, 0x74, 0x73) o = msgp.AppendUint64(o, z.FinishedTs) + // string "table_info_value" + o = append(o, 0xb0, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x69, 0x6e, 0x66, 0x6f, 0x5f, 0x76, 0x61, 0x6c, 0x75, 0x65) + o = msgp.AppendBytes(o, z.TableInfoValue) + // string "extra_table_info_value" + o = append(o, 0xb6, 0x65, 0x78, 0x74, 0x72, 0x61, 0x5f, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x69, 0x6e, 0x66, 0x6f, 0x5f, 0x76, 0x61, 0x6c, 0x75, 0x65) + o = msgp.AppendBytes(o, z.ExtraTableInfoValue) // string "multi_table_info_value" o = append(o, 0xb6, 0x6d, 0x75, 0x6c, 0x74, 0x69, 0x5f, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x69, 0x6e, 0x66, 0x6f, 0x5f, 0x76, 0x61, 0x6c, 0x75, 0x65) o = msgp.AppendArrayHeader(o, uint32(len(z.MultipleTableInfosValue))) @@ -695,146 +695,146 @@ func (z *PersistedDDLEvent) UnmarshalMsg(bts []byte) (o []byte, err error) { err = msgp.WrapError(err, "Type") return } - case "current_schema_id": - z.CurrentSchemaID, bts, err = msgp.ReadInt64Bytes(bts) + case "schema_id": + z.SchemaID, bts, err = msgp.ReadInt64Bytes(bts) if err != nil { - err = msgp.WrapError(err, "CurrentSchemaID") + err = msgp.WrapError(err, "SchemaID") return } - case "current_table_id": - z.CurrentTableID, bts, err = msgp.ReadInt64Bytes(bts) + case "table_id": + z.TableID, bts, err = msgp.ReadInt64Bytes(bts) if err != nil { - err = msgp.WrapError(err, "CurrentTableID") + err = msgp.WrapError(err, "TableID") return } - case "current_schema_name": - z.CurrentSchemaName, bts, err = msgp.ReadStringBytes(bts) + case "schema_name": + z.SchemaName, bts, err = msgp.ReadStringBytes(bts) if err != nil { - err = msgp.WrapError(err, "CurrentSchemaName") + err = msgp.WrapError(err, "SchemaName") return } - case "current_table_name": - z.CurrentTableName, bts, err = msgp.ReadStringBytes(bts) + case "table_name": + z.TableName, bts, err = msgp.ReadStringBytes(bts) if err != nil { - err = msgp.WrapError(err, "CurrentTableName") + err = msgp.WrapError(err, "TableName") return } - case "prev_schema_id": - z.PrevSchemaID, bts, err = msgp.ReadInt64Bytes(bts) + case "extra_schema_id": + z.ExtraSchemaID, bts, err = msgp.ReadInt64Bytes(bts) if err != nil { - err = msgp.WrapError(err, "PrevSchemaID") + err = msgp.WrapError(err, "ExtraSchemaID") return } - case "prev_table_id": - z.PrevTableID, bts, err = msgp.ReadInt64Bytes(bts) + case "extra_table_id": + z.ExtraTableID, bts, err = msgp.ReadInt64Bytes(bts) if err != nil { - err = msgp.WrapError(err, "PrevTableID") + err = msgp.WrapError(err, "ExtraTableID") return } - case "prev_schema_name": - z.PrevSchemaName, bts, err = msgp.ReadStringBytes(bts) + case "extra_schema_name": + z.ExtraSchemaName, bts, err = msgp.ReadStringBytes(bts) if err != nil { - err = msgp.WrapError(err, "PrevSchemaName") + err = msgp.WrapError(err, "ExtraSchemaName") return } - case "prev_table_name": - z.PrevTableName, bts, err = msgp.ReadStringBytes(bts) + case "extra_table_name": + z.ExtraTableName, bts, err = msgp.ReadStringBytes(bts) if err != nil { - err = msgp.WrapError(err, "PrevTableName") + err = msgp.WrapError(err, "ExtraTableName") return } - case "prev_schema_ids": + case "schema_ids": var zb0002 uint32 zb0002, bts, err = msgp.ReadArrayHeaderBytes(bts) if err != nil { - err = msgp.WrapError(err, "PrevSchemaIDs") + err = msgp.WrapError(err, "SchemaIDs") return } - if cap(z.PrevSchemaIDs) >= int(zb0002) { - z.PrevSchemaIDs = (z.PrevSchemaIDs)[:zb0002] + if 
cap(z.SchemaIDs) >= int(zb0002) { + z.SchemaIDs = (z.SchemaIDs)[:zb0002] } else { - z.PrevSchemaIDs = make([]int64, zb0002) + z.SchemaIDs = make([]int64, zb0002) } - for za0001 := range z.PrevSchemaIDs { - z.PrevSchemaIDs[za0001], bts, err = msgp.ReadInt64Bytes(bts) + for za0001 := range z.SchemaIDs { + z.SchemaIDs[za0001], bts, err = msgp.ReadInt64Bytes(bts) if err != nil { - err = msgp.WrapError(err, "PrevSchemaIDs", za0001) + err = msgp.WrapError(err, "SchemaIDs", za0001) return } } - case "prev_schema_names": + case "schema_names": var zb0003 uint32 zb0003, bts, err = msgp.ReadArrayHeaderBytes(bts) if err != nil { - err = msgp.WrapError(err, "PrevSchemaNames") + err = msgp.WrapError(err, "SchemaNames") return } - if cap(z.PrevSchemaNames) >= int(zb0003) { - z.PrevSchemaNames = (z.PrevSchemaNames)[:zb0003] + if cap(z.SchemaNames) >= int(zb0003) { + z.SchemaNames = (z.SchemaNames)[:zb0003] } else { - z.PrevSchemaNames = make([]string, zb0003) + z.SchemaNames = make([]string, zb0003) } - for za0002 := range z.PrevSchemaNames { - z.PrevSchemaNames[za0002], bts, err = msgp.ReadStringBytes(bts) + for za0002 := range z.SchemaNames { + z.SchemaNames[za0002], bts, err = msgp.ReadStringBytes(bts) if err != nil { - err = msgp.WrapError(err, "PrevSchemaNames", za0002) + err = msgp.WrapError(err, "SchemaNames", za0002) return } } - case "prev_table_names": + case "extra_schema_ids": var zb0004 uint32 zb0004, bts, err = msgp.ReadArrayHeaderBytes(bts) if err != nil { - err = msgp.WrapError(err, "PrevTableNames") + err = msgp.WrapError(err, "ExtraSchemaIDs") return } - if cap(z.PrevTableNames) >= int(zb0004) { - z.PrevTableNames = (z.PrevTableNames)[:zb0004] + if cap(z.ExtraSchemaIDs) >= int(zb0004) { + z.ExtraSchemaIDs = (z.ExtraSchemaIDs)[:zb0004] } else { - z.PrevTableNames = make([]string, zb0004) + z.ExtraSchemaIDs = make([]int64, zb0004) } - for za0003 := range z.PrevTableNames { - z.PrevTableNames[za0003], bts, err = msgp.ReadStringBytes(bts) + for za0003 := range z.ExtraSchemaIDs { + z.ExtraSchemaIDs[za0003], bts, err = msgp.ReadInt64Bytes(bts) if err != nil { - err = msgp.WrapError(err, "PrevTableNames", za0003) + err = msgp.WrapError(err, "ExtraSchemaIDs", za0003) return } } - case "current_schema_ids": + case "extra_schema_names": var zb0005 uint32 zb0005, bts, err = msgp.ReadArrayHeaderBytes(bts) if err != nil { - err = msgp.WrapError(err, "CurrentSchemaIDs") + err = msgp.WrapError(err, "ExtraSchemaNames") return } - if cap(z.CurrentSchemaIDs) >= int(zb0005) { - z.CurrentSchemaIDs = (z.CurrentSchemaIDs)[:zb0005] + if cap(z.ExtraSchemaNames) >= int(zb0005) { + z.ExtraSchemaNames = (z.ExtraSchemaNames)[:zb0005] } else { - z.CurrentSchemaIDs = make([]int64, zb0005) + z.ExtraSchemaNames = make([]string, zb0005) } - for za0004 := range z.CurrentSchemaIDs { - z.CurrentSchemaIDs[za0004], bts, err = msgp.ReadInt64Bytes(bts) + for za0004 := range z.ExtraSchemaNames { + z.ExtraSchemaNames[za0004], bts, err = msgp.ReadStringBytes(bts) if err != nil { - err = msgp.WrapError(err, "CurrentSchemaIDs", za0004) + err = msgp.WrapError(err, "ExtraSchemaNames", za0004) return } } - case "s": + case "extra_table_names": var zb0006 uint32 zb0006, bts, err = msgp.ReadArrayHeaderBytes(bts) if err != nil { - err = msgp.WrapError(err, "CurrentSchemaNames") + err = msgp.WrapError(err, "ExtraTableNames") return } - if cap(z.CurrentSchemaNames) >= int(zb0006) { - z.CurrentSchemaNames = (z.CurrentSchemaNames)[:zb0006] + if cap(z.ExtraTableNames) >= int(zb0006) { + z.ExtraTableNames = (z.ExtraTableNames)[:zb0006] } else 
{ - z.CurrentSchemaNames = make([]string, zb0006) + z.ExtraTableNames = make([]string, zb0006) } - for za0005 := range z.CurrentSchemaNames { - z.CurrentSchemaNames[za0005], bts, err = msgp.ReadStringBytes(bts) + for za0005 := range z.ExtraTableNames { + z.ExtraTableNames[za0005], bts, err = msgp.ReadStringBytes(bts) if err != nil { - err = msgp.WrapError(err, "CurrentSchemaNames", za0005) + err = msgp.WrapError(err, "ExtraTableNames", za0005) return } } @@ -869,22 +869,22 @@ func (z *PersistedDDLEvent) UnmarshalMsg(bts []byte) (o []byte, err error) { err = msgp.WrapError(err, "SchemaVersion") return } - case "table_info_value": - z.TableInfoValue, bts, err = msgp.ReadBytesBytes(bts, z.TableInfoValue) + case "finished_ts": + z.FinishedTs, bts, err = msgp.ReadUint64Bytes(bts) if err != nil { - err = msgp.WrapError(err, "TableInfoValue") + err = msgp.WrapError(err, "FinishedTs") return } - case "pre_table_info_value": - z.PreTableInfoValue, bts, err = msgp.ReadBytesBytes(bts, z.PreTableInfoValue) + case "table_info_value": + z.TableInfoValue, bts, err = msgp.ReadBytesBytes(bts, z.TableInfoValue) if err != nil { - err = msgp.WrapError(err, "PreTableInfoValue") + err = msgp.WrapError(err, "TableInfoValue") return } - case "finished_ts": - z.FinishedTs, bts, err = msgp.ReadUint64Bytes(bts) + case "extra_table_info_value": + z.ExtraTableInfoValue, bts, err = msgp.ReadBytesBytes(bts, z.ExtraTableInfoValue) if err != nil { - err = msgp.WrapError(err, "FinishedTs") + err = msgp.WrapError(err, "ExtraTableInfoValue") return } case "multi_table_info_value": @@ -932,19 +932,19 @@ func (z *PersistedDDLEvent) UnmarshalMsg(bts []byte) (o []byte, err error) { // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message func (z *PersistedDDLEvent) Msgsize() (s int) { - s = 3 + 3 + msgp.Int64Size + 5 + msgp.ByteSize + 18 + msgp.Int64Size + 17 + msgp.Int64Size + 20 + msgp.StringPrefixSize + len(z.CurrentSchemaName) + 19 + msgp.StringPrefixSize + len(z.CurrentTableName) + 15 + msgp.Int64Size + 14 + msgp.Int64Size + 17 + msgp.StringPrefixSize + len(z.PrevSchemaName) + 16 + msgp.StringPrefixSize + len(z.PrevTableName) + 16 + msgp.ArrayHeaderSize + (len(z.PrevSchemaIDs) * (msgp.Int64Size)) + 18 + msgp.ArrayHeaderSize - for za0002 := range z.PrevSchemaNames { - s += msgp.StringPrefixSize + len(z.PrevSchemaNames[za0002]) + s = 3 + 3 + msgp.Int64Size + 5 + msgp.ByteSize + 10 + msgp.Int64Size + 9 + msgp.Int64Size + 12 + msgp.StringPrefixSize + len(z.SchemaName) + 11 + msgp.StringPrefixSize + len(z.TableName) + 16 + msgp.Int64Size + 15 + msgp.Int64Size + 18 + msgp.StringPrefixSize + len(z.ExtraSchemaName) + 17 + msgp.StringPrefixSize + len(z.ExtraTableName) + 11 + msgp.ArrayHeaderSize + (len(z.SchemaIDs) * (msgp.Int64Size)) + 13 + msgp.ArrayHeaderSize + for za0002 := range z.SchemaNames { + s += msgp.StringPrefixSize + len(z.SchemaNames[za0002]) } - s += 17 + msgp.ArrayHeaderSize - for za0003 := range z.PrevTableNames { - s += msgp.StringPrefixSize + len(z.PrevTableNames[za0003]) + s += 17 + msgp.ArrayHeaderSize + (len(z.ExtraSchemaIDs) * (msgp.Int64Size)) + 19 + msgp.ArrayHeaderSize + for za0004 := range z.ExtraSchemaNames { + s += msgp.StringPrefixSize + len(z.ExtraSchemaNames[za0004]) } - s += 19 + msgp.ArrayHeaderSize + (len(z.CurrentSchemaIDs) * (msgp.Int64Size)) + 2 + msgp.ArrayHeaderSize - for za0005 := range z.CurrentSchemaNames { - s += msgp.StringPrefixSize + len(z.CurrentSchemaNames[za0005]) + s += 18 + msgp.ArrayHeaderSize + for za0005 := range z.ExtraTableNames { + 
s += msgp.StringPrefixSize + len(z.ExtraTableNames[za0005])
 	}
-	s += 16 + msgp.ArrayHeaderSize + (len(z.PrevPartitions) * (msgp.Int64Size)) + 6 + msgp.StringPrefixSize + len(z.Query) + 15 + msgp.Int64Size + 17 + msgp.BytesPrefixSize + len(z.TableInfoValue) + 21 + msgp.BytesPrefixSize + len(z.PreTableInfoValue) + 12 + msgp.Uint64Size + 23 + msgp.ArrayHeaderSize
+	s += 16 + msgp.ArrayHeaderSize + (len(z.PrevPartitions) * (msgp.Int64Size)) + 6 + msgp.StringPrefixSize + len(z.Query) + 15 + msgp.Int64Size + 12 + msgp.Uint64Size + 17 + msgp.BytesPrefixSize + len(z.TableInfoValue) + 23 + msgp.BytesPrefixSize + len(z.ExtraTableInfoValue) + 23 + msgp.ArrayHeaderSize
 	for za0007 := range z.MultipleTableInfosValue {
 		s += msgp.BytesPrefixSize + len(z.MultipleTableInfosValue[za0007])
 	}
diff --git a/pkg/common/event/ddl_event.go b/pkg/common/event/ddl_event.go
index 9bf856836..bd0ad4506 100644
--- a/pkg/common/event/ddl_event.go
+++ b/pkg/common/event/ddl_event.go
@@ -33,24 +33,28 @@ type DDLEvent struct {
 	Version byte `json:"version"`
 	DispatcherID common.DispatcherID `json:"-"`
 	Type byte `json:"type"`
-	// SchemaID means different for different job types:
-	//   - ExchangeTablePartition: db id of non-partitioned table
+	// SchemaID is from upstream job.SchemaID
 	SchemaID int64 `json:"schema_id"`
+	// TableID is from upstream job.TableID
 	// TableID means different for different job types:
-	//   - ExchangeTablePartition: non-partitioned table id
-	TableID int64 `json:"table_id"`
-	SchemaName string `json:"schema_name"`
-	TableName string `json:"table_name"`
-	PrevSchemaName string `json:"prev_schema_name"`
-	PrevTableName string `json:"prev_table_name"`
-	Query string `json:"query"`
-	TableInfo *common.TableInfo `json:"-"`
-	FinishedTs uint64 `json:"finished_ts"`
+	//   - for most DDL types that involve just a single table, it is the id of that table
+	//   - for ExchangeTablePartition, it is the table id of the normal table before the exchange
+	//     and one of the partition ids after the exchange
+	//   - for TruncateTable, it is the table id of the old table
+	TableID int64 `json:"table_id"`
+	SchemaName string `json:"schema_name"`
+	TableName string `json:"table_name"`
+	// the following two fields are only used for RenameTable;
+	// they hold the old schema/table name of the table
+	ExtraSchemaName string `json:"extra_schema_name"`
+	ExtraTableName string `json:"extra_table_name"`
+	Query string `json:"query"`
+	TableInfo *common.TableInfo `json:"-"`
+	FinishedTs uint64 `json:"finished_ts"`
 	// The seq of the event. It is set by event service.
 	Seq uint64 `json:"seq"`
 	// State is the state of sender when sending this event.
- State EventSenderState `json:"state"` - // TODO: just here for compile, may be changed later + State EventSenderState `json:"state"` MultipleTableInfos []*common.TableInfo `json:"-"` BlockedTables *InfluencedTables `json:"blocked_tables"` @@ -105,20 +109,20 @@ func (d *DDLEvent) PostFlush() { } } -func (d *DDLEvent) GetCurrentSchemaName() string { +func (d *DDLEvent) GetSchemaName() string { return d.SchemaName } -func (d *DDLEvent) GetCurrentTableName() string { +func (d *DDLEvent) GetTableName() string { return d.TableName } -func (d *DDLEvent) GetPrevSchemaName() string { - return d.PrevSchemaName +func (d *DDLEvent) GetExtraSchemaName() string { + return d.ExtraSchemaName } -func (d *DDLEvent) GetPrevTableName() string { - return d.PrevTableName +func (d *DDLEvent) GetExtraTableName() string { + return d.ExtraTableName } func (d *DDLEvent) GetEvents() []*DDLEvent { @@ -144,8 +148,8 @@ func (d *DDLEvent) GetEvents() []*DDLEvent { Type: d.Type, // SchemaID: d.TableInfo.SchemaID, // TableID: d.TableInfo.TableName.TableID, - SchemaName: d.PrevSchemaName, - TableName: d.PrevTableName, + SchemaName: d.ExtraSchemaName, + TableName: d.ExtraTableName, Query: d.Query, FinishedTs: d.FinishedTs, }, diff --git a/pkg/sink/codec/avro/glue_schema_registry.go b/pkg/sink/codec/avro/glue_schema_registry.go index 9ff83056d..07acd4ba6 100644 --- a/pkg/sink/codec/avro/glue_schema_registry.go +++ b/pkg/sink/codec/avro/glue_schema_registry.go @@ -290,11 +290,11 @@ func (m *glueSchemaManager) updateSchema(ctx context.Context, schemaName, schema return *resp.SchemaVersionId, nil } -func (m *glueSchemaManager) getSchemaByName(ctx context.Context, schemaNAme string) (bool, string, error) { +func (m *glueSchemaManager) getSchemaByName(ctx context.Context, schemaName string) (bool, string, error) { input := &glue.GetSchemaVersionInput{ SchemaId: &types.SchemaId{ RegistryName: aws.String(m.registryName), - SchemaName: aws.String(schemaNAme), + SchemaName: aws.String(schemaName), }, SchemaVersionNumber: &types.SchemaVersionNumber{LatestVersion: true}, } diff --git a/pkg/sink/codec/canal/encoder.go b/pkg/sink/codec/canal/encoder.go index d1b16db14..52b2eaabb 100644 --- a/pkg/sink/codec/canal/encoder.go +++ b/pkg/sink/codec/canal/encoder.go @@ -385,8 +385,8 @@ func NewJSONRowEventEncoder(ctx context.Context, config *common.Config) (common. func (c *JSONRowEventEncoder) newJSONMessageForDDL(e *commonEvent.DDLEvent) canalJSONMessageInterface { msg := &JSONMessage{ ID: 0, // ignored by both Canal Adapter and Flink - Schema: e.GetCurrentSchemaName(), - Table: e.GetCurrentTableName(), + Schema: e.GetSchemaName(), + Table: e.GetTableName(), IsDDL: true, EventType: convertDdlEventType(e.Type).String(), ExecutionTime: convertToCanalTs(e.GetCommitTs()),
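Reviewer note (illustrative, not part of the patch): the rename from the Prev* to the Extra* getters above implies a consumer-side pattern of preferring the old (Extra*) schema/table name when it is set, which only happens for rename-style DDLs, and falling back to the current names otherwise. The minimal, self-contained Go sketch below shows that pattern; the ddlEvent mirror struct and the resolveDDLTarget helper are hypothetical stand-ins for illustration and do not exist in this repository.

// Illustrative sketch only: mirrors just the fields needed to show the
// Extra*-first resolution pattern. Not the real commonEvent.DDLEvent.
package main

import "fmt"

type ddlEvent struct {
	SchemaName      string // current schema name
	TableName       string // current table name
	ExtraSchemaName string // old schema name, set only for RenameTable
	ExtraTableName  string // old table name, set only for RenameTable
}

// resolveDDLTarget prefers the old (Extra*) names when present, which is the
// rename case; otherwise it falls back to the current schema/table names.
func resolveDDLTarget(e ddlEvent) (schema, table string) {
	if e.ExtraSchemaName != "" && e.ExtraTableName != "" {
		return e.ExtraSchemaName, e.ExtraTableName
	}
	return e.SchemaName, e.TableName
}

func main() {
	rename := ddlEvent{SchemaName: "test1", TableName: "tb2", ExtraSchemaName: "test1", ExtraTableName: "tb1"}
	create := ddlEvent{SchemaName: "test1", TableName: "tb3"}

	s, t := resolveDDLTarget(rename)
	fmt.Println(s, t) // test1 tb1: route by the pre-rename name
	s, t = resolveDDLTarget(create)
	fmt.Println(s, t) // test1 tb3: no Extra* names, use the current ones
}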