From eea6a1ed918273eef6c9b057a507bd54c2340fcc Mon Sep 17 00:00:00 2001 From: lidezhu Date: Wed, 25 Sep 2024 18:01:16 +0800 Subject: [PATCH 01/18] ajust some config and add log --- logservice/eventstore/event_store.go | 5 +++-- logservice/schemastore/disk_format.go | 5 +++++ logservice/schemastore/persist_storage.go | 8 ++++++-- 3 files changed, 14 insertions(+), 4 deletions(-) diff --git a/logservice/eventstore/event_store.go b/logservice/eventstore/event_store.go index e346cad18..de51732d3 100644 --- a/logservice/eventstore/event_store.go +++ b/logservice/eventstore/event_store.go @@ -185,7 +185,8 @@ func New( // TODO: close pebble db at exit for i := 0; i < dbCount; i++ { db, err := pebble.Open(fmt.Sprintf("%s/%d", dbPath, i), &pebble.Options{ - DisableWAL: true, + DisableWAL: true, + MemTableSize: 8 << 20, }) if err != nil { log.Fatal("open db failed", zap.Error(err)) @@ -378,7 +379,7 @@ func (e *eventStore) Close(ctx context.Context) error { } func (e *eventStore) updateMetrics(ctx context.Context) error { - ticker := time.NewTicker(3 * time.Second) + ticker := time.NewTicker(5 * time.Second) for { select { case <-ctx.Done(): diff --git a/logservice/schemastore/disk_format.go b/logservice/schemastore/disk_format.go index 07fad6320..d5064a770 100644 --- a/logservice/schemastore/disk_format.go +++ b/logservice/schemastore/disk_format.go @@ -551,6 +551,11 @@ func loadAllPhysicalTablesInSnap( } tables := make([]common.Table, 0) for tableID, tableInfo := range tableMap { + if _, ok := databaseMap[tableInfo.SchemaID]; !ok { + log.Panic("database not found", + zap.Int64("schemaID", tableInfo.SchemaID), + zap.String("tableName", tableInfo.Name)) + } if tableFilter != nil && tableFilter.ShouldIgnoreTable(databaseMap[tableInfo.SchemaID].Name, tableInfo.Name) { continue } diff --git a/logservice/schemastore/persist_storage.go b/logservice/schemastore/persist_storage.go index 190e01ee1..57de8ece9 100644 --- a/logservice/schemastore/persist_storage.go +++ b/logservice/schemastore/persist_storage.go @@ -98,7 +98,9 @@ func newPersistentStorage( } // TODO: update pebble options - db, err := pebble.Open(dbPath, &pebble.Options{}) + db, err := pebble.Open(dbPath, &pebble.Options{ + DisableWAL: true, + }) if err != nil { log.Fatal("open db failed", zap.Error(err)) } @@ -162,7 +164,9 @@ func (p *persistentStorage) initializeFromKVStorage(dbPath string, storage kv.St var err error // TODO: update pebble options - if p.db, err = pebble.Open(dbPath, &pebble.Options{}); err != nil { + if p.db, err = pebble.Open(dbPath, &pebble.Options{ + DisableWAL: true, + }); err != nil { log.Fatal("open db failed", zap.Error(err)) } log.Info("schema store initialize from kv storage begin", From 9c08b60c02b08813c91ed7d4af9ca9728553316d Mon Sep 17 00:00:00 2001 From: lidezhu Date: Wed, 25 Sep 2024 18:05:16 +0800 Subject: [PATCH 02/18] add more log --- logservice/schemastore/schema_store.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/logservice/schemastore/schema_store.go b/logservice/schemastore/schema_store.go index 72b06f55b..07de1bc3c 100644 --- a/logservice/schemastore/schema_store.go +++ b/logservice/schemastore/schema_store.go @@ -257,7 +257,7 @@ func (s *schemaStore) FetchTableTriggerDDLEvents(tableFilter filter.Filter, star } func (s *schemaStore) writeDDLEvent(ddlEvent DDLJobWithCommitTs) { - log.Debug("write ddl event", + log.Info("write ddl event", zap.Int64("schemaID", ddlEvent.Job.SchemaID), zap.Int64("tableID", ddlEvent.Job.TableID), zap.Uint64("finishedTs", ddlEvent.Job.BinlogInfo.FinishedTS), From bab79688466fb43f8d6f0445ac0759962ec1a07d Mon Sep 17 00:00:00 2001 From: lidezhu Date: Wed, 25 Sep 2024 20:33:44 +0800 Subject: [PATCH 03/18] improve log --- logservice/schemastore/disk_format.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/logservice/schemastore/disk_format.go b/logservice/schemastore/disk_format.go index d5064a770..e04f46e36 100644 --- a/logservice/schemastore/disk_format.go +++ b/logservice/schemastore/disk_format.go @@ -554,7 +554,8 @@ func loadAllPhysicalTablesInSnap( if _, ok := databaseMap[tableInfo.SchemaID]; !ok { log.Panic("database not found", zap.Int64("schemaID", tableInfo.SchemaID), - zap.String("tableName", tableInfo.Name)) + zap.String("tableName", tableInfo.Name), + zap.Any("")) } if tableFilter != nil && tableFilter.ShouldIgnoreTable(databaseMap[tableInfo.SchemaID].Name, tableInfo.Name) { continue From 39d61580213f8f6d899ad725f2ef50c008951997 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Wed, 25 Sep 2024 20:34:58 +0800 Subject: [PATCH 04/18] fix log --- logservice/schemastore/disk_format.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/logservice/schemastore/disk_format.go b/logservice/schemastore/disk_format.go index e04f46e36..6c6a77f8a 100644 --- a/logservice/schemastore/disk_format.go +++ b/logservice/schemastore/disk_format.go @@ -555,7 +555,7 @@ func loadAllPhysicalTablesInSnap( log.Panic("database not found", zap.Int64("schemaID", tableInfo.SchemaID), zap.String("tableName", tableInfo.Name), - zap.Any("")) + zap.Any("databaseMap", databaseMap)) } if tableFilter != nil && tableFilter.ShouldIgnoreTable(databaseMap[tableInfo.SchemaID].Name, tableInfo.Name) { continue From fd7566a1beae73e20b581ccb257d2ee9a6a95170 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Wed, 25 Sep 2024 20:43:12 +0800 Subject: [PATCH 05/18] add more log --- logservice/schemastore/disk_format.go | 2 +- logservice/schemastore/persist_storage.go | 3 +++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/logservice/schemastore/disk_format.go b/logservice/schemastore/disk_format.go index 6c6a77f8a..24a131b6a 100644 --- a/logservice/schemastore/disk_format.go +++ b/logservice/schemastore/disk_format.go @@ -555,7 +555,7 @@ func loadAllPhysicalTablesInSnap( log.Panic("database not found", zap.Int64("schemaID", tableInfo.SchemaID), zap.String("tableName", tableInfo.Name), - zap.Any("databaseMap", databaseMap)) + zap.Any("databaseMapLen", len(databaseMap))) } if tableFilter != nil && tableFilter.ShouldIgnoreTable(databaseMap[tableInfo.SchemaID].Name, tableInfo.Name) { continue diff --git a/logservice/schemastore/persist_storage.go b/logservice/schemastore/persist_storage.go index 57de8ece9..cdc9f9f31 100644 --- a/logservice/schemastore/persist_storage.go +++ b/logservice/schemastore/persist_storage.go @@ -815,6 +815,9 @@ func updateDatabaseInfoAndTableInfo( } createTable := func(schemaID int64, tableID int64) { + log.Info("create table", + zap.Int64("schemaID", schemaID), + zap.Int64("tableID", tableID)) addTableToDB(schemaID, tableID) tableMap[tableID] = &BasicTableInfo{ SchemaID: schemaID, From b55512641003e4a6b62d8ad40e053175502e2212 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Wed, 25 Sep 2024 20:43:44 +0800 Subject: [PATCH 06/18] add more log --- logservice/schemastore/persist_storage.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/logservice/schemastore/persist_storage.go b/logservice/schemastore/persist_storage.go index cdc9f9f31..e1428b4bb 100644 --- a/logservice/schemastore/persist_storage.go +++ b/logservice/schemastore/persist_storage.go @@ -815,7 +815,7 @@ func updateDatabaseInfoAndTableInfo( } createTable := func(schemaID int64, tableID int64) { - log.Info("create table", + log.Info("create table in updateDatabaseInfoAndTableInfo", zap.Int64("schemaID", schemaID), zap.Int64("tableID", tableID)) addTableToDB(schemaID, tableID) From 7815ee660d86340b06ae12d9252ed3364370f00c Mon Sep 17 00:00:00 2001 From: lidezhu Date: Wed, 25 Sep 2024 21:05:04 +0800 Subject: [PATCH 07/18] add more log --- logservice/schemastore/disk_format.go | 14 ++++++++++++-- logservice/schemastore/persist_storage.go | 2 +- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/logservice/schemastore/disk_format.go b/logservice/schemastore/disk_format.go index 24a131b6a..a3101023a 100644 --- a/logservice/schemastore/disk_format.go +++ b/logservice/schemastore/disk_format.go @@ -183,7 +183,7 @@ func loadDatabasesInKVSnap(snap *pebble.Snapshot, gcTs uint64) (map[int64]*Basic return databaseMap, nil } -func loadTablesInKVSnap(snap *pebble.Snapshot, gcTs uint64) (map[int64]*BasicTableInfo, error) { +func loadTablesInKVSnap(snap *pebble.Snapshot, gcTs uint64, databaseMap map[int64]*BasicDatabaseInfo) (map[int64]*BasicTableInfo, error) { tablesInKVSnap := make(map[int64]*BasicTableInfo) startKey, err := tableInfoKey(gcTs, 0) @@ -212,6 +212,11 @@ func loadTablesInKVSnap(snap *pebble.Snapshot, gcTs uint64) (map[int64]*BasicTab if err := json.Unmarshal(table_info_entry.TableInfoValue, &tbNameInfo); err != nil { log.Fatal("unmarshal table name info failed", zap.Error(err)) } + if _, ok := databaseMap[table_info_entry.SchemaID]; !ok { + log.Panic("database not found", + zap.Int64("schemaID", table_info_entry.SchemaID), + zap.String("schemaName", table_info_entry.SchemaName), + zap.String("tableName", tbNameInfo.Name.O)) tablesInKVSnap[tbNameInfo.ID] = &BasicTableInfo{ SchemaID: table_info_entry.SchemaID, Name: tbNameInfo.Name.O, @@ -502,7 +507,7 @@ func cleanObseleteData(db *pebble.DB, oldGcTs uint64, gcTs uint64) { } } -func loadAllPhysicalTablesInSnap( +func loadAllPhysicalTablesAtTs( storageSnap *pebble.Snapshot, gcTs uint64, snapVersion uint64, @@ -518,6 +523,8 @@ func loadAllPhysicalTablesInSnap( if err != nil { return nil, err } + log.Info("load tables in kv snap", + zap.Int("tableMapLen", len(tableMap))) // apply ddl jobs in range (gcTs, snapVersion] startKey, err := ddlJobKey(gcTs + 1) @@ -549,11 +556,14 @@ func loadAllPhysicalTablesInSnap( log.Panic("updateDatabaseInfo error", zap.Error(err)) } } + log.Info("load tables in kv snap from ddl", + zap.Int("tableMapLen", len(tableMap))) tables := make([]common.Table, 0) for tableID, tableInfo := range tableMap { if _, ok := databaseMap[tableInfo.SchemaID]; !ok { log.Panic("database not found", zap.Int64("schemaID", tableInfo.SchemaID), + zap.Int64("tableID", tableID), zap.String("tableName", tableInfo.Name), zap.Any("databaseMapLen", len(databaseMap))) } diff --git a/logservice/schemastore/persist_storage.go b/logservice/schemastore/persist_storage.go index e1428b4bb..792ee673d 100644 --- a/logservice/schemastore/persist_storage.go +++ b/logservice/schemastore/persist_storage.go @@ -232,7 +232,7 @@ func (p *persistentStorage) getAllPhysicalTables(snapTs uint64, tableFilter filt zap.Uint64("snapTs", snapTs), zap.Any("duration", time.Since(start).Seconds())) }() - return loadAllPhysicalTablesInSnap(storageSnap, gcTs, snapTs, tableFilter) + return loadAllPhysicalTablesAtTs(storageSnap, gcTs, snapTs, tableFilter) } // only return when table info is initialized From 4094217f4f86a84e1f78069b7f9e1b6ae8086ef4 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Wed, 25 Sep 2024 21:06:35 +0800 Subject: [PATCH 08/18] fix --- logservice/schemastore/disk_format.go | 3 ++- logservice/schemastore/persist_storage.go | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/logservice/schemastore/disk_format.go b/logservice/schemastore/disk_format.go index a3101023a..a1e33eaf3 100644 --- a/logservice/schemastore/disk_format.go +++ b/logservice/schemastore/disk_format.go @@ -217,6 +217,7 @@ func loadTablesInKVSnap(snap *pebble.Snapshot, gcTs uint64, databaseMap map[int6 zap.Int64("schemaID", table_info_entry.SchemaID), zap.String("schemaName", table_info_entry.SchemaName), zap.String("tableName", tbNameInfo.Name.O)) + } tablesInKVSnap[tbNameInfo.ID] = &BasicTableInfo{ SchemaID: table_info_entry.SchemaID, Name: tbNameInfo.Name.O, @@ -519,7 +520,7 @@ func loadAllPhysicalTablesAtTs( return nil, err } - tableMap, err := loadTablesInKVSnap(storageSnap, gcTs) + tableMap, err := loadTablesInKVSnap(storageSnap, gcTs, databaseMap) if err != nil { return nil, err } diff --git a/logservice/schemastore/persist_storage.go b/logservice/schemastore/persist_storage.go index 792ee673d..ff210473f 100644 --- a/logservice/schemastore/persist_storage.go +++ b/logservice/schemastore/persist_storage.go @@ -199,7 +199,7 @@ func (p *persistentStorage) initializeFromDisk() { log.Fatal("load database info from disk failed") } - if p.tableMap, err = loadTablesInKVSnap(storageSnap, p.gcTs); err != nil { + if p.tableMap, err = loadTablesInKVSnap(storageSnap, p.gcTs, p.databaseMap); err != nil { log.Fatal("load tables in kv snapshot failed") } From 1e6c20b1c8ba4eda81584ccca7f3e9613df2683a Mon Sep 17 00:00:00 2001 From: lidezhu Date: Wed, 25 Sep 2024 21:21:07 +0800 Subject: [PATCH 09/18] try fix --- logservice/schemastore/persist_storage.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/logservice/schemastore/persist_storage.go b/logservice/schemastore/persist_storage.go index ff210473f..c416b7654 100644 --- a/logservice/schemastore/persist_storage.go +++ b/logservice/schemastore/persist_storage.go @@ -838,6 +838,9 @@ func updateDatabaseInfoAndTableInfo( Tables: make(map[int64]bool), } case model.ActionDropSchema: + for tableID := range databaseMap[event.SchemaID].Tables { + dropTable(event.SchemaID, tableID) + } delete(databaseMap, event.SchemaID) case model.ActionCreateTable: createTable(event.SchemaID, event.TableID) From 1eaec91c01b0d20b3d29f150deb3b06285443708 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Wed, 25 Sep 2024 21:34:31 +0800 Subject: [PATCH 10/18] remove log --- logservice/schemastore/persist_storage.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/logservice/schemastore/persist_storage.go b/logservice/schemastore/persist_storage.go index c416b7654..3fb59899f 100644 --- a/logservice/schemastore/persist_storage.go +++ b/logservice/schemastore/persist_storage.go @@ -815,9 +815,6 @@ func updateDatabaseInfoAndTableInfo( } createTable := func(schemaID int64, tableID int64) { - log.Info("create table in updateDatabaseInfoAndTableInfo", - zap.Int64("schemaID", schemaID), - zap.Int64("tableID", tableID)) addTableToDB(schemaID, tableID) tableMap[tableID] = &BasicTableInfo{ SchemaID: schemaID, From e301ef030f7aa15418be7e7e50088301929f268c Mon Sep 17 00:00:00 2001 From: lidezhu Date: Wed, 25 Sep 2024 21:51:08 +0800 Subject: [PATCH 11/18] try fix --- logservice/schemastore/persist_storage.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/logservice/schemastore/persist_storage.go b/logservice/schemastore/persist_storage.go index 3fb59899f..ecbfab30f 100644 --- a/logservice/schemastore/persist_storage.go +++ b/logservice/schemastore/persist_storage.go @@ -836,7 +836,7 @@ func updateDatabaseInfoAndTableInfo( } case model.ActionDropSchema: for tableID := range databaseMap[event.SchemaID].Tables { - dropTable(event.SchemaID, tableID) + delete(tableMap, tableID) } delete(databaseMap, event.SchemaID) case model.ActionCreateTable: From ee72c7871ff374f8d9ddb8526cce284db10b05da Mon Sep 17 00:00:00 2001 From: lidezhu Date: Wed, 25 Sep 2024 21:58:05 +0800 Subject: [PATCH 12/18] add more log --- logservice/schemastore/persist_storage.go | 6 ++++++ logservice/schemastore/persist_storage_test.go | 1 + 2 files changed, 7 insertions(+) diff --git a/logservice/schemastore/persist_storage.go b/logservice/schemastore/persist_storage.go index ecbfab30f..8d75dc1de 100644 --- a/logservice/schemastore/persist_storage.go +++ b/logservice/schemastore/persist_storage.go @@ -835,9 +835,15 @@ func updateDatabaseInfoAndTableInfo( Tables: make(map[int64]bool), } case model.ActionDropSchema: + log.Info("drop schema in updateDatabaseInfo", + zap.Int64("schemaID", event.SchemaID), + zap.Int("tableMapLen", len(tableMap))) for tableID := range databaseMap[event.SchemaID].Tables { delete(tableMap, tableID) } + log.Info("after drop schema in updateDatabaseInfo", + zap.Int64("schemaID", event.SchemaID), + zap.Int("tableMapLen", len(tableMap))) delete(databaseMap, event.SchemaID) case model.ActionCreateTable: createTable(event.SchemaID, event.TableID) diff --git a/logservice/schemastore/persist_storage_test.go b/logservice/schemastore/persist_storage_test.go index 058e0fb5b..446533454 100644 --- a/logservice/schemastore/persist_storage_test.go +++ b/logservice/schemastore/persist_storage_test.go @@ -386,6 +386,7 @@ func TestHandleCreateDropSchemaTableDDL(t *testing.T) { pStorage.handleSortedDDLEvents(ddlEvent) require.Equal(t, 0, len(pStorage.databaseMap)) + require.Equal(t, 0, len(pStorage.tableMap)) require.Equal(t, 5, len(pStorage.tableTriggerDDLHistory)) require.Equal(t, uint64(300), pStorage.tableTriggerDDLHistory[4]) require.Equal(t, 3, len(pStorage.tablesDDLHistory)) From 5168137cb1c13028dcb25d0c0c6581289994242c Mon Sep 17 00:00:00 2001 From: lidezhu Date: Wed, 25 Sep 2024 22:08:12 +0800 Subject: [PATCH 13/18] fix --- logservice/schemastore/disk_format.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/logservice/schemastore/disk_format.go b/logservice/schemastore/disk_format.go index a1e33eaf3..bb6b9d835 100644 --- a/logservice/schemastore/disk_format.go +++ b/logservice/schemastore/disk_format.go @@ -212,7 +212,8 @@ func loadTablesInKVSnap(snap *pebble.Snapshot, gcTs uint64, databaseMap map[int6 if err := json.Unmarshal(table_info_entry.TableInfoValue, &tbNameInfo); err != nil { log.Fatal("unmarshal table name info failed", zap.Error(err)) } - if _, ok := databaseMap[table_info_entry.SchemaID]; !ok { + databaseInfo, ok := databaseMap[table_info_entry.SchemaID] + if !ok { log.Panic("database not found", zap.Int64("schemaID", table_info_entry.SchemaID), zap.String("schemaName", table_info_entry.SchemaName), @@ -223,6 +224,8 @@ func loadTablesInKVSnap(snap *pebble.Snapshot, gcTs uint64, databaseMap map[int6 Name: tbNameInfo.Name.O, InKVSnap: true, } + // TODO: add a unit test for this case + databaseInfo.Tables[tbNameInfo.ID] = true } return tablesInKVSnap, nil From 7811ccaac0b0c790e86bf7590a16be8235f21fae Mon Sep 17 00:00:00 2001 From: lidezhu Date: Wed, 25 Sep 2024 22:12:36 +0800 Subject: [PATCH 14/18] remove log --- logservice/schemastore/disk_format.go | 4 ++-- logservice/schemastore/persist_storage.go | 6 ------ 2 files changed, 2 insertions(+), 8 deletions(-) diff --git a/logservice/schemastore/disk_format.go b/logservice/schemastore/disk_format.go index bb6b9d835..30756c5f9 100644 --- a/logservice/schemastore/disk_format.go +++ b/logservice/schemastore/disk_format.go @@ -219,13 +219,13 @@ func loadTablesInKVSnap(snap *pebble.Snapshot, gcTs uint64, databaseMap map[int6 zap.String("schemaName", table_info_entry.SchemaName), zap.String("tableName", tbNameInfo.Name.O)) } + // TODO: add a unit test for this case + databaseInfo.Tables[tbNameInfo.ID] = true tablesInKVSnap[tbNameInfo.ID] = &BasicTableInfo{ SchemaID: table_info_entry.SchemaID, Name: tbNameInfo.Name.O, InKVSnap: true, } - // TODO: add a unit test for this case - databaseInfo.Tables[tbNameInfo.ID] = true } return tablesInKVSnap, nil diff --git a/logservice/schemastore/persist_storage.go b/logservice/schemastore/persist_storage.go index 8d75dc1de..ecbfab30f 100644 --- a/logservice/schemastore/persist_storage.go +++ b/logservice/schemastore/persist_storage.go @@ -835,15 +835,9 @@ func updateDatabaseInfoAndTableInfo( Tables: make(map[int64]bool), } case model.ActionDropSchema: - log.Info("drop schema in updateDatabaseInfo", - zap.Int64("schemaID", event.SchemaID), - zap.Int("tableMapLen", len(tableMap))) for tableID := range databaseMap[event.SchemaID].Tables { delete(tableMap, tableID) } - log.Info("after drop schema in updateDatabaseInfo", - zap.Int64("schemaID", event.SchemaID), - zap.Int("tableMapLen", len(tableMap))) delete(databaseMap, event.SchemaID) case model.ActionCreateTable: createTable(event.SchemaID, event.TableID) From d4c89cafe24518389632350e5fe8d1d25e07568e Mon Sep 17 00:00:00 2001 From: lidezhu Date: Wed, 25 Sep 2024 22:15:02 +0800 Subject: [PATCH 15/18] fix --- logservice/schemastore/disk_format.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/logservice/schemastore/disk_format.go b/logservice/schemastore/disk_format.go index 30756c5f9..04dcd753e 100644 --- a/logservice/schemastore/disk_format.go +++ b/logservice/schemastore/disk_format.go @@ -527,7 +527,7 @@ func loadAllPhysicalTablesAtTs( if err != nil { return nil, err } - log.Info("load tables in kv snap", + log.Info("after load tables in kv snap", zap.Int("tableMapLen", len(tableMap))) // apply ddl jobs in range (gcTs, snapVersion] @@ -560,7 +560,7 @@ func loadAllPhysicalTablesAtTs( log.Panic("updateDatabaseInfo error", zap.Error(err)) } } - log.Info("load tables in kv snap from ddl", + log.Info("after load tables from ddl", zap.Int("tableMapLen", len(tableMap))) tables := make([]common.Table, 0) for tableID, tableInfo := range tableMap { From def568b6739a9c2b9bd6f30b1497c784eb4b9e50 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Wed, 25 Sep 2024 23:51:50 +0800 Subject: [PATCH 16/18] add more log --- logservice/schemastore/schema_store.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/logservice/schemastore/schema_store.go b/logservice/schemastore/schema_store.go index 07de1bc3c..6478701f6 100644 --- a/logservice/schemastore/schema_store.go +++ b/logservice/schemastore/schema_store.go @@ -188,6 +188,9 @@ func (s *schemaStore) GetAllPhysicalTables(snapTs uint64, filter filter.Filter) func (s *schemaStore) RegisterTable(tableID int64, startTs uint64) error { metrics.SchemaStoreResolvedRegisterTableGauge.Inc() + log.Info("register table", + zap.Int64("tableID", tableID), + zap.Uint64("startTs", startTs)) s.waitResolvedTs(tableID, startTs, 5*time.Second) return s.dataStorage.registerTable(tableID, startTs) } From a020c95b9a49871d1eb4c6f8d9b14bcdbd19ba80 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Thu, 26 Sep 2024 08:45:53 +0800 Subject: [PATCH 17/18] add some log --- logservice/schemastore/persist_storage.go | 8 ++++++++ logservice/schemastore/schema_store.go | 5 +++-- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/logservice/schemastore/persist_storage.go b/logservice/schemastore/persist_storage.go index ecbfab30f..44ebd3932 100644 --- a/logservice/schemastore/persist_storage.go +++ b/logservice/schemastore/persist_storage.go @@ -584,6 +584,11 @@ func (p *persistentStorage) handleSortedDDLEvents(ddlEvents ...PersistedDDLEvent for i := range ddlEvents { p.mu.Lock() + log.Info("handle resolved ddl event", + zap.Int64("schemaID", ddlEvents[i].SchemaID), + zap.Int64("tableID", ddlEvents[i].TableID), + zap.Uint64("finishedTs", ddlEvents[i].FinishedTs), + zap.String("query", ddlEvents[i].Query)) if shouldSkipDDL(&ddlEvents[i], p.databaseMap, p.tableMap) { p.mu.Unlock() continue @@ -815,6 +820,9 @@ func updateDatabaseInfoAndTableInfo( } createTable := func(schemaID int64, tableID int64) { + log.Info("updateDatabaseInfoAndTableInfo create table", + zap.Int64("schemaID", schemaID), + zap.Int64("tableID", tableID)) addTableToDB(schemaID, tableID) tableMap[tableID] = &BasicTableInfo{ SchemaID: schemaID, diff --git a/logservice/schemastore/schema_store.go b/logservice/schemastore/schema_store.go index 6478701f6..f65e56f69 100644 --- a/logservice/schemastore/schema_store.go +++ b/logservice/schemastore/schema_store.go @@ -188,10 +188,11 @@ func (s *schemaStore) GetAllPhysicalTables(snapTs uint64, filter filter.Filter) func (s *schemaStore) RegisterTable(tableID int64, startTs uint64) error { metrics.SchemaStoreResolvedRegisterTableGauge.Inc() + s.waitResolvedTs(tableID, startTs, 5*time.Second) log.Info("register table", zap.Int64("tableID", tableID), - zap.Uint64("startTs", startTs)) - s.waitResolvedTs(tableID, startTs, 5*time.Second) + zap.Uint64("startTs", startTs), + zap.Uint64("resolvedTs", s.resolvedTs.Load())) return s.dataStorage.registerTable(tableID, startTs) } From fe57dc826a424abb109757798023055815c2851c Mon Sep 17 00:00:00 2001 From: lidezhu Date: Thu, 26 Sep 2024 10:39:17 +0800 Subject: [PATCH 18/18] fix log --- logservice/schemastore/persist_storage.go | 9 +++++---- logservice/schemastore/persist_storage_test.go | 18 ++++++++++++++++++ logservice/schemastore/schema_store.go | 2 +- 3 files changed, 24 insertions(+), 5 deletions(-) diff --git a/logservice/schemastore/persist_storage.go b/logservice/schemastore/persist_storage.go index 44ebd3932..135f642b4 100644 --- a/logservice/schemastore/persist_storage.go +++ b/logservice/schemastore/persist_storage.go @@ -230,7 +230,7 @@ func (p *persistentStorage) getAllPhysicalTables(snapTs uint64, tableFilter filt defer func() { log.Info("getAllPhysicalTables finish", zap.Uint64("snapTs", snapTs), - zap.Any("duration", time.Since(start).Seconds())) + zap.Any("duration(s)", time.Since(start).Seconds())) }() return loadAllPhysicalTablesAtTs(storageSnap, gcTs, snapTs, tableFilter) } @@ -657,6 +657,10 @@ func completePersistedDDLEvent( switch model.ActionType(event.Type) { case model.ActionCreateSchema, model.ActionDropSchema: + log.Info("completePersistedDDLEvent for create/drop schema", + zap.Any("type", event.Type), + zap.Int64("schemaID", event.SchemaID), + zap.String("schemaName", event.DBInfo.Name.O)) event.SchemaName = event.DBInfo.Name.O case model.ActionCreateTable: event.SchemaName = getSchemaName(event.SchemaID) @@ -820,9 +824,6 @@ func updateDatabaseInfoAndTableInfo( } createTable := func(schemaID int64, tableID int64) { - log.Info("updateDatabaseInfoAndTableInfo create table", - zap.Int64("schemaID", schemaID), - zap.Int64("tableID", tableID)) addTableToDB(schemaID, tableID) tableMap[tableID] = &BasicTableInfo{ SchemaID: schemaID, diff --git a/logservice/schemastore/persist_storage_test.go b/logservice/schemastore/persist_storage_test.go index 446533454..0691145c1 100644 --- a/logservice/schemastore/persist_storage_test.go +++ b/logservice/schemastore/persist_storage_test.go @@ -211,6 +211,24 @@ func TestBuildVersionedTableInfoStore(t *testing.T) { require.Nil(t, err) } + // // create another table + // tableID2 := tableID + 1 + // { + // ddlEvent := PersistedDDLEvent{ + // Type: byte(model.ActionCreateTable), + // SchemaID: schemaID, + // TableID: tableID, + // SchemaVersion: 3000, + // TableInfo: &model.TableInfo{ + // ID: tableID, + // Name: model.NewCIStr("t2"), + // }, + // FinishedTs: renameVersion, + // } + // err = pStorage.handleSortedDDLEvents(ddlEvent) + // require.Nil(t, err) + // } + upperBound := UpperBoundMeta{ FinishedDDLTs: 3000, SchemaVersion: 4000, diff --git a/logservice/schemastore/schema_store.go b/logservice/schemastore/schema_store.go index f65e56f69..f7f403133 100644 --- a/logservice/schemastore/schema_store.go +++ b/logservice/schemastore/schema_store.go @@ -261,7 +261,7 @@ func (s *schemaStore) FetchTableTriggerDDLEvents(tableFilter filter.Filter, star } func (s *schemaStore) writeDDLEvent(ddlEvent DDLJobWithCommitTs) { - log.Info("write ddl event", + log.Debug("write ddl event", zap.Int64("schemaID", ddlEvent.Job.SchemaID), zap.Int64("tableID", ddlEvent.Job.TableID), zap.Uint64("finishedTs", ddlEvent.Job.BinlogInfo.FinishedTS),