diff --git a/logservice/eventstore/event_store.go b/logservice/eventstore/event_store.go index e346cad18..de51732d3 100644 --- a/logservice/eventstore/event_store.go +++ b/logservice/eventstore/event_store.go @@ -185,7 +185,8 @@ func New( // TODO: close pebble db at exit for i := 0; i < dbCount; i++ { db, err := pebble.Open(fmt.Sprintf("%s/%d", dbPath, i), &pebble.Options{ - DisableWAL: true, + DisableWAL: true, + MemTableSize: 8 << 20, }) if err != nil { log.Fatal("open db failed", zap.Error(err)) @@ -378,7 +379,7 @@ func (e *eventStore) Close(ctx context.Context) error { } func (e *eventStore) updateMetrics(ctx context.Context) error { - ticker := time.NewTicker(3 * time.Second) + ticker := time.NewTicker(5 * time.Second) for { select { case <-ctx.Done(): diff --git a/logservice/schemastore/disk_format.go b/logservice/schemastore/disk_format.go index 07fad6320..04dcd753e 100644 --- a/logservice/schemastore/disk_format.go +++ b/logservice/schemastore/disk_format.go @@ -183,7 +183,7 @@ func loadDatabasesInKVSnap(snap *pebble.Snapshot, gcTs uint64) (map[int64]*Basic return databaseMap, nil } -func loadTablesInKVSnap(snap *pebble.Snapshot, gcTs uint64) (map[int64]*BasicTableInfo, error) { +func loadTablesInKVSnap(snap *pebble.Snapshot, gcTs uint64, databaseMap map[int64]*BasicDatabaseInfo) (map[int64]*BasicTableInfo, error) { tablesInKVSnap := make(map[int64]*BasicTableInfo) startKey, err := tableInfoKey(gcTs, 0) @@ -212,6 +212,15 @@ func loadTablesInKVSnap(snap *pebble.Snapshot, gcTs uint64) (map[int64]*BasicTab if err := json.Unmarshal(table_info_entry.TableInfoValue, &tbNameInfo); err != nil { log.Fatal("unmarshal table name info failed", zap.Error(err)) } + databaseInfo, ok := databaseMap[table_info_entry.SchemaID] + if !ok { + log.Panic("database not found", + zap.Int64("schemaID", table_info_entry.SchemaID), + zap.String("schemaName", table_info_entry.SchemaName), + zap.String("tableName", tbNameInfo.Name.O)) + } + // TODO: add a unit test for this case + databaseInfo.Tables[tbNameInfo.ID] = true tablesInKVSnap[tbNameInfo.ID] = &BasicTableInfo{ SchemaID: table_info_entry.SchemaID, Name: tbNameInfo.Name.O, @@ -502,7 +511,7 @@ func cleanObseleteData(db *pebble.DB, oldGcTs uint64, gcTs uint64) { } } -func loadAllPhysicalTablesInSnap( +func loadAllPhysicalTablesAtTs( storageSnap *pebble.Snapshot, gcTs uint64, snapVersion uint64, @@ -514,10 +523,12 @@ func loadAllPhysicalTablesInSnap( return nil, err } - tableMap, err := loadTablesInKVSnap(storageSnap, gcTs) + tableMap, err := loadTablesInKVSnap(storageSnap, gcTs, databaseMap) if err != nil { return nil, err } + log.Info("after load tables in kv snap", + zap.Int("tableMapLen", len(tableMap))) // apply ddl jobs in range (gcTs, snapVersion] startKey, err := ddlJobKey(gcTs + 1) @@ -549,8 +560,17 @@ func loadAllPhysicalTablesInSnap( log.Panic("updateDatabaseInfo error", zap.Error(err)) } } + log.Info("after load tables from ddl", + zap.Int("tableMapLen", len(tableMap))) tables := make([]common.Table, 0) for tableID, tableInfo := range tableMap { + if _, ok := databaseMap[tableInfo.SchemaID]; !ok { + log.Panic("database not found", + zap.Int64("schemaID", tableInfo.SchemaID), + zap.Int64("tableID", tableID), + zap.String("tableName", tableInfo.Name), + zap.Any("databaseMapLen", len(databaseMap))) + } if tableFilter != nil && tableFilter.ShouldIgnoreTable(databaseMap[tableInfo.SchemaID].Name, tableInfo.Name) { continue } diff --git a/logservice/schemastore/persist_storage.go b/logservice/schemastore/persist_storage.go index 190e01ee1..135f642b4 100644 --- a/logservice/schemastore/persist_storage.go +++ b/logservice/schemastore/persist_storage.go @@ -98,7 +98,9 @@ func newPersistentStorage( } // TODO: update pebble options - db, err := pebble.Open(dbPath, &pebble.Options{}) + db, err := pebble.Open(dbPath, &pebble.Options{ + DisableWAL: true, + }) if err != nil { log.Fatal("open db failed", zap.Error(err)) } @@ -162,7 +164,9 @@ func (p *persistentStorage) initializeFromKVStorage(dbPath string, storage kv.St var err error // TODO: update pebble options - if p.db, err = pebble.Open(dbPath, &pebble.Options{}); err != nil { + if p.db, err = pebble.Open(dbPath, &pebble.Options{ + DisableWAL: true, + }); err != nil { log.Fatal("open db failed", zap.Error(err)) } log.Info("schema store initialize from kv storage begin", @@ -195,7 +199,7 @@ func (p *persistentStorage) initializeFromDisk() { log.Fatal("load database info from disk failed") } - if p.tableMap, err = loadTablesInKVSnap(storageSnap, p.gcTs); err != nil { + if p.tableMap, err = loadTablesInKVSnap(storageSnap, p.gcTs, p.databaseMap); err != nil { log.Fatal("load tables in kv snapshot failed") } @@ -226,9 +230,9 @@ func (p *persistentStorage) getAllPhysicalTables(snapTs uint64, tableFilter filt defer func() { log.Info("getAllPhysicalTables finish", zap.Uint64("snapTs", snapTs), - zap.Any("duration", time.Since(start).Seconds())) + zap.Any("duration(s)", time.Since(start).Seconds())) }() - return loadAllPhysicalTablesInSnap(storageSnap, gcTs, snapTs, tableFilter) + return loadAllPhysicalTablesAtTs(storageSnap, gcTs, snapTs, tableFilter) } // only return when table info is initialized @@ -580,6 +584,11 @@ func (p *persistentStorage) handleSortedDDLEvents(ddlEvents ...PersistedDDLEvent for i := range ddlEvents { p.mu.Lock() + log.Info("handle resolved ddl event", + zap.Int64("schemaID", ddlEvents[i].SchemaID), + zap.Int64("tableID", ddlEvents[i].TableID), + zap.Uint64("finishedTs", ddlEvents[i].FinishedTs), + zap.String("query", ddlEvents[i].Query)) if shouldSkipDDL(&ddlEvents[i], p.databaseMap, p.tableMap) { p.mu.Unlock() continue @@ -648,6 +657,10 @@ func completePersistedDDLEvent( switch model.ActionType(event.Type) { case model.ActionCreateSchema, model.ActionDropSchema: + log.Info("completePersistedDDLEvent for create/drop schema", + zap.Any("type", event.Type), + zap.Int64("schemaID", event.SchemaID), + zap.String("schemaName", event.DBInfo.Name.O)) event.SchemaName = event.DBInfo.Name.O case model.ActionCreateTable: event.SchemaName = getSchemaName(event.SchemaID) @@ -831,6 +844,9 @@ func updateDatabaseInfoAndTableInfo( Tables: make(map[int64]bool), } case model.ActionDropSchema: + for tableID := range databaseMap[event.SchemaID].Tables { + delete(tableMap, tableID) + } delete(databaseMap, event.SchemaID) case model.ActionCreateTable: createTable(event.SchemaID, event.TableID) diff --git a/logservice/schemastore/persist_storage_test.go b/logservice/schemastore/persist_storage_test.go index 058e0fb5b..0691145c1 100644 --- a/logservice/schemastore/persist_storage_test.go +++ b/logservice/schemastore/persist_storage_test.go @@ -211,6 +211,24 @@ func TestBuildVersionedTableInfoStore(t *testing.T) { require.Nil(t, err) } + // // create another table + // tableID2 := tableID + 1 + // { + // ddlEvent := PersistedDDLEvent{ + // Type: byte(model.ActionCreateTable), + // SchemaID: schemaID, + // TableID: tableID, + // SchemaVersion: 3000, + // TableInfo: &model.TableInfo{ + // ID: tableID, + // Name: model.NewCIStr("t2"), + // }, + // FinishedTs: renameVersion, + // } + // err = pStorage.handleSortedDDLEvents(ddlEvent) + // require.Nil(t, err) + // } + upperBound := UpperBoundMeta{ FinishedDDLTs: 3000, SchemaVersion: 4000, @@ -386,6 +404,7 @@ func TestHandleCreateDropSchemaTableDDL(t *testing.T) { pStorage.handleSortedDDLEvents(ddlEvent) require.Equal(t, 0, len(pStorage.databaseMap)) + require.Equal(t, 0, len(pStorage.tableMap)) require.Equal(t, 5, len(pStorage.tableTriggerDDLHistory)) require.Equal(t, uint64(300), pStorage.tableTriggerDDLHistory[4]) require.Equal(t, 3, len(pStorage.tablesDDLHistory)) diff --git a/logservice/schemastore/schema_store.go b/logservice/schemastore/schema_store.go index 72b06f55b..f7f403133 100644 --- a/logservice/schemastore/schema_store.go +++ b/logservice/schemastore/schema_store.go @@ -189,6 +189,10 @@ func (s *schemaStore) GetAllPhysicalTables(snapTs uint64, filter filter.Filter) func (s *schemaStore) RegisterTable(tableID int64, startTs uint64) error { metrics.SchemaStoreResolvedRegisterTableGauge.Inc() s.waitResolvedTs(tableID, startTs, 5*time.Second) + log.Info("register table", + zap.Int64("tableID", tableID), + zap.Uint64("startTs", startTs), + zap.Uint64("resolvedTs", s.resolvedTs.Load())) return s.dataStorage.registerTable(tableID, startTs) }