Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: small refactor ddl event #979

Merged
merged 16 commits into from
Feb 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 11 additions & 12 deletions .github/workflows/integration_test_mysql.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -422,17 +422,6 @@ jobs:
if: ${{ success() }}
run: |
export TICDC_NEWARCH=true && make integration_test CASE=partition_table

- name: Test multi_tables_ddl
if: ${{ success() }}
run: |
export TICDC_NEWARCH=true && make integration_test CASE=multi_tables_ddl

- name: Test multi_source
if: ${{ success() }}
run: |
export TICDC_NEWARCH=true && make integration_test CASE=multi_source


- name: Test ddl_attributes
if: ${{ success() }}
Expand Down Expand Up @@ -516,12 +505,22 @@ jobs:
run: |
export TICDC_NEWARCH=true && make integration_test CASE=cdc

# The 20th case in this group
- name: Test overwrite_resume_with_syncpoint
if: ${{ success() }}
run: |
export TICDC_NEWARCH=true && make integration_test CASE=overwrite_resume_with_syncpoint

- name: Test multi_source
if: ${{ success() }}
run: |
export TICDC_NEWARCH=true && make integration_test CASE=multi_source

# The 20th case in this group
- name: Test multi_tables_ddl
if: ${{ success() }}
run: |
export TICDC_NEWARCH=true && make integration_test CASE=multi_tables_ddl

- name: Upload test logs
if: always()
uses: ./.github/actions/upload-test-logs
Expand Down
14 changes: 7 additions & 7 deletions downstreamadapter/sink/helper/eventrouter/event_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,18 +84,18 @@ func (s *EventRouter) GetTopicForRowChange(tableInfo *common.TableInfo) string {
func (s *EventRouter) GetTopicForDDL(ddl *commonEvent.DDLEvent) string {
var schema, table string

if ddl.GetPrevSchemaName() != "" {
if ddl.GetPrevTableName() == "" {
if ddl.GetExtraSchemaName() != "" {
if ddl.GetExtraTableName() == "" {
return s.defaultTopic
}
schema = ddl.GetPrevSchemaName()
table = ddl.GetPrevTableName()
schema = ddl.GetExtraSchemaName()
table = ddl.GetExtraTableName()
} else {
if ddl.GetCurrentTableName() == "" {
if ddl.GetTableName() == "" {
return s.defaultTopic
}
schema = ddl.GetCurrentSchemaName()
table = ddl.GetCurrentTableName()
schema = ddl.GetSchemaName()
table = ddl.GetTableName()
}

topicGenerator := s.matchTopicGenerator(schema, table)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,10 +315,10 @@ func TestGetTopicForDDL(t *testing.T) {
},
{
ddl: &commonEvent.DDLEvent{
PrevSchemaName: "test1",
PrevTableName: "tb1",
SchemaName: "test1",
TableName: "tb2",
ExtraSchemaName: "test1",
ExtraTableName: "tb1",
SchemaName: "test1",
TableName: "tb2",
},
expectedTopic: "test1_tb1",
},
Expand Down
12 changes: 7 additions & 5 deletions logservice/schemastore/disk_format.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,13 +400,13 @@ func unmarshalPersistedDDLEvent(value []byte) PersistedDDLEvent {
}
ddlEvent.TableInfoValue = nil

if ddlEvent.PreTableInfoValue != nil {
if ddlEvent.ExtraTableInfoValue != nil {
var err error
ddlEvent.PreTableInfo, err = common.UnmarshalJSONToTableInfo(ddlEvent.PreTableInfoValue)
ddlEvent.ExtraTableInfo, err = common.UnmarshalJSONToTableInfo(ddlEvent.ExtraTableInfoValue)
if err != nil {
log.Fatal("unmarshal pre table info failed", zap.Error(err))
}
ddlEvent.PreTableInfoValue = nil
ddlEvent.ExtraTableInfoValue = nil
}

if len(ddlEvent.MultipleTableInfosValue) > 0 {
Expand Down Expand Up @@ -446,8 +446,8 @@ func writePersistedDDLEvent(db *pebble.DB, ddlEvent *PersistedDDLEvent) error {
if err != nil {
return err
}
if ddlEvent.PreTableInfo != nil {
ddlEvent.PreTableInfoValue, err = ddlEvent.PreTableInfo.Marshal()
if ddlEvent.ExtraTableInfo != nil {
ddlEvent.ExtraTableInfoValue, err = ddlEvent.ExtraTableInfo.Marshal()
if err != nil {
return err
}
Expand Down Expand Up @@ -646,6 +646,7 @@ func loadAllPhysicalTablesAtTs(
return nil, err
}
log.Info("after load tables in kv snap",
zap.Int("tableInfoMapLen", len(tableInfoMap)),
zap.Int("tableMapLen", len(tableMap)),
zap.Int("partitionMapLen", len(partitionMap)))

Expand Down Expand Up @@ -680,6 +681,7 @@ func loadAllPhysicalTablesAtTs(
})
}
log.Info("after load tables from ddl",
zap.Int("tableInfoMapLen", len(tableInfoMap)),
zap.Int("tableMapLen", len(tableMap)),
zap.Int("partitionMapLen", len(partitionMap)))
tables := make([]commonEvent.Table, 0)
Expand Down
51 changes: 30 additions & 21 deletions logservice/schemastore/multi_version_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ import (

func TestBuildVersionedTableInfoStore(t *testing.T) {
type QueryTableInfoTestCase struct {
snapTs uint64
name string
snapTs uint64
schemaName string
tableName string
}
testCases := []struct {
tableID int64
Expand All @@ -41,8 +42,9 @@ func TestBuildVersionedTableInfoStore(t *testing.T) {
}(),
queryCases: []QueryTableInfoTestCase{
{
snapTs: 1000,
name: "t",
snapTs: 1000,
schemaName: "test",
tableName: "t",
},
},
deleteVersion: 1010,
Expand All @@ -57,12 +59,14 @@ func TestBuildVersionedTableInfoStore(t *testing.T) {
}(),
queryCases: []QueryTableInfoTestCase{
{
snapTs: 1010,
name: "t",
snapTs: 1010,
schemaName: "test",
tableName: "t",
},
{
snapTs: 1020,
name: "t2",
snapTs: 1020,
schemaName: "test",
tableName: "t2",
},
},
},
Expand All @@ -71,18 +75,20 @@ func TestBuildVersionedTableInfoStore(t *testing.T) {
tableID: 101,
ddlEvents: func() []*PersistedDDLEvent {
return []*PersistedDDLEvent{
buildCreatePartitionTableEventForTest(10, 100, "test", "partition_table", []int64{101, 102, 103}, 1010), // create partition table 100 with partitions 101, 102, 103
buildExchangePartitionTableEventForTest(10, 200, 10, 100, "test", "normal_table", "test", "partition_table", []int64{101, 102, 103}, []int64{200, 102, 103}, 1020), // rename table 101
buildCreatePartitionTableEventForTest(10, 100, "test", "partition_table", []int64{101, 102, 103}, 1010), // create partition table 100 with partitions 101, 102, 103
buildExchangePartitionTableEventForTest(12, 200, 10, 100, "test2", "normal_table", "test", "partition_table", []int64{101, 102, 103}, []int64{200, 102, 103}, 1020), // rename table 101
}
}(),
queryCases: []QueryTableInfoTestCase{
{
snapTs: 1010,
name: "partition_table",
snapTs: 1010,
schemaName: "test",
tableName: "partition_table",
},
{
snapTs: 1020,
name: "normal_table",
snapTs: 1020,
schemaName: "test2",
tableName: "normal_table",
},
},
},
Expand All @@ -91,18 +97,20 @@ func TestBuildVersionedTableInfoStore(t *testing.T) {
tableID: 200,
ddlEvents: func() []*PersistedDDLEvent {
return []*PersistedDDLEvent{
buildCreateTableEventForTest(10, 200, "test", "normal_table", 1010), // create table 200
buildExchangePartitionTableEventForTest(10, 200, 10, 100, "test", "normal_table", "test", "partition_table", []int64{101, 102, 103}, []int64{200, 102, 103}, 1020), // rename table 101
buildCreateTableEventForTest(10, 200, "test", "normal_table", 1010), // create table 200
buildExchangePartitionTableEventForTest(10, 200, 12, 100, "test", "normal_table", "test2", "partition_table", []int64{101, 102, 103}, []int64{200, 102, 103}, 1020), // rename table 101
}
}(),
queryCases: []QueryTableInfoTestCase{
{
snapTs: 1010,
name: "normal_table",
snapTs: 1010,
schemaName: "test",
tableName: "normal_table",
},
{
snapTs: 1020,
name: "partition_table",
snapTs: 1020,
schemaName: "test2",
tableName: "partition_table",
},
},
},
Expand All @@ -116,7 +124,8 @@ func TestBuildVersionedTableInfoStore(t *testing.T) {
for _, c := range tt.queryCases {
tableInfo, err := store.getTableInfo(c.snapTs)
require.Nil(t, err)
require.Equal(t, c.name, tableInfo.TableName.Table)
require.Equal(t, c.schemaName, tableInfo.TableName.Schema)
require.Equal(t, c.tableName, tableInfo.TableName.Table)
if !tableInfo.TableName.IsPartition {
require.Equal(t, tt.tableID, tableInfo.TableName.TableID)
}
Expand Down
70 changes: 35 additions & 35 deletions logservice/schemastore/multi_version_test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ import (

func buildCreateTableEventForTest(schemaID, tableID int64, schemaName, tableName string, finishedTs uint64) *PersistedDDLEvent {
return &PersistedDDLEvent{
Type: byte(model.ActionCreateTable),
CurrentSchemaID: schemaID,
CurrentTableID: tableID,
CurrentSchemaName: schemaName,
CurrentTableName: tableName,
Type: byte(model.ActionCreateTable),
SchemaID: schemaID,
TableID: tableID,
SchemaName: schemaName,
TableName: tableName,
TableInfo: &model.TableInfo{
ID: tableID,
Name: pmodel.NewCIStr(tableName),
Expand All @@ -42,11 +42,11 @@ func buildCreatePartitionTableEventForTest(schemaID, tableID int64, schemaName,
})
}
return &PersistedDDLEvent{
Type: byte(model.ActionCreateTable),
CurrentSchemaID: schemaID,
CurrentTableID: tableID,
CurrentSchemaName: schemaName,
CurrentTableName: tableName,
Type: byte(model.ActionCreateTable),
SchemaID: schemaID,
TableID: tableID,
SchemaName: schemaName,
TableName: tableName,
TableInfo: &model.TableInfo{
ID: tableID,
Name: pmodel.NewCIStr(tableName),
Expand All @@ -61,12 +61,12 @@ func buildCreatePartitionTableEventForTest(schemaID, tableID int64, schemaName,

func buildTruncateTableEventForTest(schemaID, oldTableID, newTableID int64, schemaName, tableName string, finishedTs uint64) *PersistedDDLEvent {
return &PersistedDDLEvent{
Type: byte(model.ActionTruncateTable),
CurrentSchemaID: schemaID,
CurrentTableID: newTableID,
CurrentSchemaName: schemaName,
CurrentTableName: tableName,
PrevTableID: oldTableID,
Type: byte(model.ActionTruncateTable),
SchemaID: schemaID,
TableID: oldTableID,
SchemaName: schemaName,
TableName: tableName,
ExtraTableID: newTableID,
TableInfo: &model.TableInfo{
ID: newTableID,
Name: pmodel.NewCIStr(tableName),
Expand All @@ -75,16 +75,16 @@ func buildTruncateTableEventForTest(schemaID, oldTableID, newTableID int64, sche
}
}

func buildRenameTableEventForTest(prevSchemaID, schemaID, tableID int64, prevSchemaName, prevTableName, schemaName, tableName string, finishedTs uint64) *PersistedDDLEvent {
func buildRenameTableEventForTest(extraSchemaID, schemaID, tableID int64, extraSchemaName, extraTableName, schemaName, tableName string, finishedTs uint64) *PersistedDDLEvent {
return &PersistedDDLEvent{
Type: byte(model.ActionRenameTable),
CurrentSchemaID: schemaID,
CurrentTableID: tableID,
CurrentSchemaName: schemaName,
CurrentTableName: tableName,
PrevSchemaID: prevSchemaID,
PrevSchemaName: prevSchemaName,
PrevTableName: prevTableName,
Type: byte(model.ActionRenameTable),
SchemaID: schemaID,
TableID: tableID,
SchemaName: schemaName,
TableName: tableName,
ExtraSchemaID: extraSchemaID,
ExtraSchemaName: extraSchemaName,
ExtraTableName: extraTableName,
TableInfo: &model.TableInfo{
ID: tableID,
Name: pmodel.NewCIStr(tableName),
Expand All @@ -105,15 +105,15 @@ func buildExchangePartitionTableEventForTest(
})
}
return &PersistedDDLEvent{
Type: byte(model.ActionExchangeTablePartition),
CurrentSchemaID: partitionSchemaID,
CurrentTableID: partitionTableID,
CurrentSchemaName: partitionSchemaName,
CurrentTableName: partitionTableName,
PrevSchemaID: normalSchemaID,
PrevTableID: normalTableID,
PrevSchemaName: normalSchemaName,
PrevTableName: normalTableName,
Type: byte(model.ActionExchangeTablePartition),
SchemaID: normalSchemaID,
TableID: normalTableID,
SchemaName: normalSchemaName,
TableName: normalTableName,
ExtraSchemaID: partitionSchemaID,
ExtraTableID: partitionTableID,
ExtraSchemaName: partitionSchemaName,
ExtraTableName: partitionTableName,
TableInfo: &model.TableInfo{
ID: partitionTableID,
Name: pmodel.NewCIStr(partitionTableName),
Expand All @@ -122,7 +122,7 @@ func buildExchangePartitionTableEventForTest(
Enable: true,
},
},
PreTableInfo: common.WrapTableInfo(normalSchemaID, normalSchemaName, &model.TableInfo{
ExtraTableInfo: common.WrapTableInfo(normalSchemaID, normalSchemaName, &model.TableInfo{
ID: normalTableID,
Name: pmodel.NewCIStr(normalTableName),
}),
Expand Down
2 changes: 1 addition & 1 deletion logservice/schemastore/persist_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -662,7 +662,7 @@ func (p *persistentStorage) handleDDLJob(job *model.Job) error {

// TODO: do we have a better way to do this?
if ddlEvent.Type == byte(model.ActionExchangeTablePartition) {
ddlEvent.PreTableInfo, _ = p.forceGetTableInfo(ddlEvent.PrevTableID, ddlEvent.FinishedTs)
ddlEvent.ExtraTableInfo, _ = p.forceGetTableInfo(ddlEvent.TableID, ddlEvent.FinishedTs)
}

// Note: need write ddl event to disk before update ddl history,
Expand Down
Loading
Loading