Skip to content

Commit 4c3751b

Browse files
authored
schemastore: fix panic and disable wal (#336)
* ajust some config and add log * add more log * improve log * fix log * add more log * add more log * add more log * fix * try fix * remove log * try fix * add more log * fix * remove log * fix * add more log * add some log * fix log
1 parent a8fea74 commit 4c3751b

File tree

5 files changed

+70
-10
lines changed

5 files changed

+70
-10
lines changed

logservice/eventstore/event_store.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,8 @@ func New(
187187
// TODO: close pebble db at exit
188188
for i := 0; i < dbCount; i++ {
189189
db, err := pebble.Open(fmt.Sprintf("%s/%d", dbPath, i), &pebble.Options{
190-
DisableWAL: true,
190+
DisableWAL: true,
191+
MemTableSize: 8 << 20,
191192
})
192193
if err != nil {
193194
log.Fatal("open db failed", zap.Error(err))
@@ -380,7 +381,7 @@ func (e *eventStore) Close(ctx context.Context) error {
380381
}
381382

382383
func (e *eventStore) updateMetrics(ctx context.Context) error {
383-
ticker := time.NewTicker(3 * time.Second)
384+
ticker := time.NewTicker(5 * time.Second)
384385
for {
385386
select {
386387
case <-ctx.Done():

logservice/schemastore/disk_format.go

+23-3
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ func loadDatabasesInKVSnap(snap *pebble.Snapshot, gcTs uint64) (map[int64]*Basic
183183
return databaseMap, nil
184184
}
185185

186-
func loadTablesInKVSnap(snap *pebble.Snapshot, gcTs uint64) (map[int64]*BasicTableInfo, error) {
186+
func loadTablesInKVSnap(snap *pebble.Snapshot, gcTs uint64, databaseMap map[int64]*BasicDatabaseInfo) (map[int64]*BasicTableInfo, error) {
187187
tablesInKVSnap := make(map[int64]*BasicTableInfo)
188188

189189
startKey, err := tableInfoKey(gcTs, 0)
@@ -212,6 +212,15 @@ func loadTablesInKVSnap(snap *pebble.Snapshot, gcTs uint64) (map[int64]*BasicTab
212212
if err := json.Unmarshal(table_info_entry.TableInfoValue, &tbNameInfo); err != nil {
213213
log.Fatal("unmarshal table name info failed", zap.Error(err))
214214
}
215+
databaseInfo, ok := databaseMap[table_info_entry.SchemaID]
216+
if !ok {
217+
log.Panic("database not found",
218+
zap.Int64("schemaID", table_info_entry.SchemaID),
219+
zap.String("schemaName", table_info_entry.SchemaName),
220+
zap.String("tableName", tbNameInfo.Name.O))
221+
}
222+
// TODO: add a unit test for this case
223+
databaseInfo.Tables[tbNameInfo.ID] = true
215224
tablesInKVSnap[tbNameInfo.ID] = &BasicTableInfo{
216225
SchemaID: table_info_entry.SchemaID,
217226
Name: tbNameInfo.Name.O,
@@ -502,7 +511,7 @@ func cleanObseleteData(db *pebble.DB, oldGcTs uint64, gcTs uint64) {
502511
}
503512
}
504513

505-
func loadAllPhysicalTablesInSnap(
514+
func loadAllPhysicalTablesAtTs(
506515
storageSnap *pebble.Snapshot,
507516
gcTs uint64,
508517
snapVersion uint64,
@@ -514,10 +523,12 @@ func loadAllPhysicalTablesInSnap(
514523
return nil, err
515524
}
516525

517-
tableMap, err := loadTablesInKVSnap(storageSnap, gcTs)
526+
tableMap, err := loadTablesInKVSnap(storageSnap, gcTs, databaseMap)
518527
if err != nil {
519528
return nil, err
520529
}
530+
log.Info("after load tables in kv snap",
531+
zap.Int("tableMapLen", len(tableMap)))
521532

522533
// apply ddl jobs in range (gcTs, snapVersion]
523534
startKey, err := ddlJobKey(gcTs + 1)
@@ -549,8 +560,17 @@ func loadAllPhysicalTablesInSnap(
549560
log.Panic("updateDatabaseInfo error", zap.Error(err))
550561
}
551562
}
563+
log.Info("after load tables from ddl",
564+
zap.Int("tableMapLen", len(tableMap)))
552565
tables := make([]common.Table, 0)
553566
for tableID, tableInfo := range tableMap {
567+
if _, ok := databaseMap[tableInfo.SchemaID]; !ok {
568+
log.Panic("database not found",
569+
zap.Int64("schemaID", tableInfo.SchemaID),
570+
zap.Int64("tableID", tableID),
571+
zap.String("tableName", tableInfo.Name),
572+
zap.Any("databaseMapLen", len(databaseMap)))
573+
}
554574
if tableFilter != nil && tableFilter.ShouldIgnoreTable(databaseMap[tableInfo.SchemaID].Name, tableInfo.Name) {
555575
continue
556576
}

logservice/schemastore/persist_storage.go

+21-5
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,9 @@ func newPersistentStorage(
9898
}
9999

100100
// TODO: update pebble options
101-
db, err := pebble.Open(dbPath, &pebble.Options{})
101+
db, err := pebble.Open(dbPath, &pebble.Options{
102+
DisableWAL: true,
103+
})
102104
if err != nil {
103105
log.Fatal("open db failed", zap.Error(err))
104106
}
@@ -162,7 +164,9 @@ func (p *persistentStorage) initializeFromKVStorage(dbPath string, storage kv.St
162164

163165
var err error
164166
// TODO: update pebble options
165-
if p.db, err = pebble.Open(dbPath, &pebble.Options{}); err != nil {
167+
if p.db, err = pebble.Open(dbPath, &pebble.Options{
168+
DisableWAL: true,
169+
}); err != nil {
166170
log.Fatal("open db failed", zap.Error(err))
167171
}
168172
log.Info("schema store initialize from kv storage begin",
@@ -195,7 +199,7 @@ func (p *persistentStorage) initializeFromDisk() {
195199
log.Fatal("load database info from disk failed")
196200
}
197201

198-
if p.tableMap, err = loadTablesInKVSnap(storageSnap, p.gcTs); err != nil {
202+
if p.tableMap, err = loadTablesInKVSnap(storageSnap, p.gcTs, p.databaseMap); err != nil {
199203
log.Fatal("load tables in kv snapshot failed")
200204
}
201205

@@ -226,9 +230,9 @@ func (p *persistentStorage) getAllPhysicalTables(snapTs uint64, tableFilter filt
226230
defer func() {
227231
log.Info("getAllPhysicalTables finish",
228232
zap.Uint64("snapTs", snapTs),
229-
zap.Any("duration", time.Since(start).Seconds()))
233+
zap.Any("duration(s)", time.Since(start).Seconds()))
230234
}()
231-
return loadAllPhysicalTablesInSnap(storageSnap, gcTs, snapTs, tableFilter)
235+
return loadAllPhysicalTablesAtTs(storageSnap, gcTs, snapTs, tableFilter)
232236
}
233237

234238
// only return when table info is initialized
@@ -580,6 +584,11 @@ func (p *persistentStorage) handleSortedDDLEvents(ddlEvents ...PersistedDDLEvent
580584

581585
for i := range ddlEvents {
582586
p.mu.Lock()
587+
log.Info("handle resolved ddl event",
588+
zap.Int64("schemaID", ddlEvents[i].SchemaID),
589+
zap.Int64("tableID", ddlEvents[i].TableID),
590+
zap.Uint64("finishedTs", ddlEvents[i].FinishedTs),
591+
zap.String("query", ddlEvents[i].Query))
583592
if shouldSkipDDL(&ddlEvents[i], p.databaseMap, p.tableMap) {
584593
p.mu.Unlock()
585594
continue
@@ -648,6 +657,10 @@ func completePersistedDDLEvent(
648657
switch model.ActionType(event.Type) {
649658
case model.ActionCreateSchema,
650659
model.ActionDropSchema:
660+
log.Info("completePersistedDDLEvent for create/drop schema",
661+
zap.Any("type", event.Type),
662+
zap.Int64("schemaID", event.SchemaID),
663+
zap.String("schemaName", event.DBInfo.Name.O))
651664
event.SchemaName = event.DBInfo.Name.O
652665
case model.ActionCreateTable:
653666
event.SchemaName = getSchemaName(event.SchemaID)
@@ -831,6 +844,9 @@ func updateDatabaseInfoAndTableInfo(
831844
Tables: make(map[int64]bool),
832845
}
833846
case model.ActionDropSchema:
847+
for tableID := range databaseMap[event.SchemaID].Tables {
848+
delete(tableMap, tableID)
849+
}
834850
delete(databaseMap, event.SchemaID)
835851
case model.ActionCreateTable:
836852
createTable(event.SchemaID, event.TableID)

logservice/schemastore/persist_storage_test.go

+19
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,24 @@ func TestBuildVersionedTableInfoStore(t *testing.T) {
211211
require.Nil(t, err)
212212
}
213213

214+
// // create another table
215+
// tableID2 := tableID + 1
216+
// {
217+
// ddlEvent := PersistedDDLEvent{
218+
// Type: byte(model.ActionCreateTable),
219+
// SchemaID: schemaID,
220+
// TableID: tableID,
221+
// SchemaVersion: 3000,
222+
// TableInfo: &model.TableInfo{
223+
// ID: tableID,
224+
// Name: model.NewCIStr("t2"),
225+
// },
226+
// FinishedTs: renameVersion,
227+
// }
228+
// err = pStorage.handleSortedDDLEvents(ddlEvent)
229+
// require.Nil(t, err)
230+
// }
231+
214232
upperBound := UpperBoundMeta{
215233
FinishedDDLTs: 3000,
216234
SchemaVersion: 4000,
@@ -386,6 +404,7 @@ func TestHandleCreateDropSchemaTableDDL(t *testing.T) {
386404
pStorage.handleSortedDDLEvents(ddlEvent)
387405

388406
require.Equal(t, 0, len(pStorage.databaseMap))
407+
require.Equal(t, 0, len(pStorage.tableMap))
389408
require.Equal(t, 5, len(pStorage.tableTriggerDDLHistory))
390409
require.Equal(t, uint64(300), pStorage.tableTriggerDDLHistory[4])
391410
require.Equal(t, 3, len(pStorage.tablesDDLHistory))

logservice/schemastore/schema_store.go

+4
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,10 @@ func (s *schemaStore) GetAllPhysicalTables(snapTs uint64, filter filter.Filter)
189189
func (s *schemaStore) RegisterTable(tableID int64, startTs uint64) error {
190190
metrics.SchemaStoreResolvedRegisterTableGauge.Inc()
191191
s.waitResolvedTs(tableID, startTs, 5*time.Second)
192+
log.Info("register table",
193+
zap.Int64("tableID", tableID),
194+
zap.Uint64("startTs", startTs),
195+
zap.Uint64("resolvedTs", s.resolvedTs.Load()))
192196
return s.dataStorage.registerTable(tableID, startTs)
193197
}
194198

0 commit comments

Comments
 (0)