Skip to content

Commit e47554b

Browse files
authored
fix fail to get table info (#339)
1 parent 8396c53 commit e47554b

File tree

4 files changed

+37
-42
lines changed

4 files changed

+37
-42
lines changed

logservice/schemastore/disk_format.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,6 @@ func loadTablesInKVSnap(snap *pebble.Snapshot, gcTs uint64, databaseMap map[int6
224224
tablesInKVSnap[tbNameInfo.ID] = &BasicTableInfo{
225225
SchemaID: table_info_entry.SchemaID,
226226
Name: tbNameInfo.Name.O,
227-
InKVSnap: true,
228227
}
229228
}
230229

@@ -294,6 +293,9 @@ func readTableInfoInKVSnap(snap *pebble.Snapshot, tableID int64, version uint64)
294293
log.Fatal("generate table info failed", zap.Error(err))
295294
}
296295
value, closer, err := snap.Get(targetKey)
296+
if err == pebble.ErrNotFound {
297+
return nil
298+
}
297299
if err != nil {
298300
log.Fatal("get table info failed", zap.Error(err))
299301
}
@@ -432,7 +434,6 @@ func writeSchemaSnapshotAndMeta(
432434
tablesInKVSnap[tableID] = &BasicTableInfo{
433435
SchemaID: dbInfo.ID,
434436
Name: tableName,
435-
InKVSnap: true,
436437
}
437438
tables[tableID] = true
438439
} else {

logservice/schemastore/persist_storage.go

+6-16
Original file line numberDiff line numberDiff line change
@@ -410,19 +410,12 @@ func (p *persistentStorage) buildVersionedTableInfoStore(
410410

411411
p.mu.RLock()
412412
kvSnapVersion := p.gcTs
413-
tableBasicInfo, ok := p.tableMap[tableID]
414-
if !ok {
415-
log.Panic("table not found", zap.Int64("tableID", tableID))
416-
}
417-
inKVSnap := tableBasicInfo.InKVSnap
418413
var allDDLFinishedTs []uint64
419414
allDDLFinishedTs = append(allDDLFinishedTs, p.tablesDDLHistory[tableID]...)
420415
p.mu.RUnlock()
421416

422-
if inKVSnap {
423-
if err := addTableInfoFromKVSnap(store, kvSnapVersion, storageSnap); err != nil {
424-
return err
425-
}
417+
if err := addTableInfoFromKVSnap(store, kvSnapVersion, storageSnap); err != nil {
418+
return err
426419
}
427420

428421
for _, version := range allDDLFinishedTs {
@@ -439,8 +432,10 @@ func addTableInfoFromKVSnap(
439432
snap *pebble.Snapshot,
440433
) error {
441434
tableInfo := readTableInfoInKVSnap(snap, store.getTableID(), kvSnapVersion)
442-
tableInfo.InitPreSQLs()
443-
store.addInitialTableInfo(tableInfo)
435+
if tableInfo != nil {
436+
tableInfo.InitPreSQLs()
437+
store.addInitialTableInfo(tableInfo)
438+
}
444439
return nil
445440
}
446441

@@ -506,10 +501,6 @@ func (p *persistentStorage) cleanObseleteDataInMemory(gcTs uint64, tablesInKVSna
506501
defer p.mu.Unlock()
507502
p.gcTs = gcTs
508503

509-
for tableID := range tablesInKVSnap {
510-
p.tableMap[tableID].InKVSnap = true
511-
}
512-
513504
// clean tablesDDLHistory
514505
for tableID := range p.tablesDDLHistory {
515506
if _, ok := tablesInKVSnap[tableID]; !ok {
@@ -830,7 +821,6 @@ func updateDatabaseInfoAndTableInfo(
830821
tableMap[tableID] = &BasicTableInfo{
831822
SchemaID: schemaID,
832823
Name: event.TableInfo.Name.O,
833-
InKVSnap: false,
834824
}
835825
}
836826

logservice/schemastore/persist_storage_test.go

+28-23
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,6 @@ func mockWriteKVSnapOnDisk(db *pebble.DB, snapTs uint64, dbInfos map[int64]*mode
8989
tablesInKVSnap[tableInfo.ID] = &BasicTableInfo{
9090
SchemaID: dbInfo.ID,
9191
Name: tableInfo.Name.O,
92-
InKVSnap: true,
9392
}
9493
tableInfoValue, err := json.Marshal(tableInfo)
9594
if err != nil {
@@ -211,23 +210,24 @@ func TestBuildVersionedTableInfoStore(t *testing.T) {
211210
require.Nil(t, err)
212211
}
213212

214-
// // create another table
215-
// tableID2 := tableID + 1
216-
// {
217-
// ddlEvent := PersistedDDLEvent{
218-
// Type: byte(model.ActionCreateTable),
219-
// SchemaID: schemaID,
220-
// TableID: tableID,
221-
// SchemaVersion: 3000,
222-
// TableInfo: &model.TableInfo{
223-
// ID: tableID,
224-
// Name: model.NewCIStr("t2"),
225-
// },
226-
// FinishedTs: renameVersion,
227-
// }
228-
// err = pStorage.handleSortedDDLEvents(ddlEvent)
229-
// require.Nil(t, err)
230-
// }
213+
// create another table
214+
tableID2 := tableID + 1
215+
createVersion := renameVersion + 200
216+
{
217+
ddlEvent := PersistedDDLEvent{
218+
Type: byte(model.ActionCreateTable),
219+
SchemaID: schemaID,
220+
TableID: tableID2,
221+
SchemaVersion: 3500,
222+
TableInfo: &model.TableInfo{
223+
ID: tableID2,
224+
Name: model.NewCIStr("t3"),
225+
},
226+
FinishedTs: createVersion,
227+
}
228+
err = pStorage.handleSortedDDLEvents(ddlEvent)
229+
require.Nil(t, err)
230+
}
231231

232232
upperBound := UpperBoundMeta{
233233
FinishedDDLTs: 3000,
@@ -263,6 +263,16 @@ func TestBuildVersionedTableInfoStore(t *testing.T) {
263263
require.Nil(t, err)
264264
require.Equal(t, "t3", tableInfo3.Name.O)
265265
}
266+
267+
{
268+
store := newEmptyVersionedTableInfoStore(tableID2)
269+
pStorage.buildVersionedTableInfoStore(store)
270+
require.Equal(t, 1, len(store.infos))
271+
tableInfo, err := store.getTableInfo(createVersion)
272+
require.Nil(t, err)
273+
require.Equal(t, "t3", tableInfo.Name.O)
274+
require.Equal(t, tableID2, tableInfo.ID)
275+
}
266276
}
267277

268278
func TestHandleCreateDropSchemaTableDDL(t *testing.T) {
@@ -453,7 +463,6 @@ func TestHandleRenameTable(t *testing.T) {
453463
require.Equal(t, 2, len(pStorage.databaseMap))
454464
require.Equal(t, 1, len(pStorage.databaseMap[schemaID1].Tables))
455465
require.Equal(t, 0, len(pStorage.databaseMap[schemaID2].Tables))
456-
require.Equal(t, false, pStorage.tableMap[tableID].InKVSnap)
457466
require.Equal(t, schemaID1, pStorage.tableMap[tableID].SchemaID)
458467
require.Equal(t, "t1", pStorage.tableMap[tableID].Name)
459468
}
@@ -475,7 +484,6 @@ func TestHandleRenameTable(t *testing.T) {
475484
require.Equal(t, 2, len(pStorage.databaseMap))
476485
require.Equal(t, 0, len(pStorage.databaseMap[schemaID1].Tables))
477486
require.Equal(t, 1, len(pStorage.databaseMap[schemaID2].Tables))
478-
require.Equal(t, false, pStorage.tableMap[tableID].InKVSnap)
479487
require.Equal(t, schemaID2, pStorage.tableMap[tableID].SchemaID)
480488
require.Equal(t, "t2", pStorage.tableMap[tableID].Name)
481489
}
@@ -883,13 +891,11 @@ func TestGC(t *testing.T) {
883891

884892
require.Equal(t, 3, len(pStorage.tableTriggerDDLHistory))
885893
require.Equal(t, 3, len(pStorage.tablesDDLHistory))
886-
require.Equal(t, false, pStorage.tableMap[tableID3].InKVSnap)
887894
pStorage.cleanObseleteDataInMemory(newGcTs, tablesInKVSnap)
888895
require.Equal(t, 1, len(pStorage.tableTriggerDDLHistory))
889896
require.Equal(t, uint64(605), pStorage.tableTriggerDDLHistory[0])
890897
require.Equal(t, 1, len(pStorage.tablesDDLHistory))
891898
require.Equal(t, 1, len(pStorage.tablesDDLHistory[tableID1]))
892-
require.Equal(t, true, pStorage.tableMap[tableID3].InKVSnap)
893899
tableInfoT1, err := pStorage.getTableInfo(tableID1, newGcTs)
894900
require.Nil(t, err)
895901
require.Equal(t, "t1", tableInfoT1.Name.O)
@@ -906,7 +912,6 @@ func TestGC(t *testing.T) {
906912
require.Equal(t, uint64(605), pStorage.tableTriggerDDLHistory[0])
907913
require.Equal(t, 1, len(pStorage.tablesDDLHistory))
908914
require.Equal(t, 1, len(pStorage.tablesDDLHistory[tableID1]))
909-
require.Equal(t, true, pStorage.tableMap[tableID3].InKVSnap)
910915
}
911916

912917
// TODO: test obsolete data can be removed

logservice/schemastore/types.go

-1
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,6 @@ type BasicDatabaseInfo struct {
7878
type BasicTableInfo struct {
7979
SchemaID int64
8080
Name string
81-
InKVSnap bool
8281
}
8382

8483
//msgp:ignore DDLJobWithCommitTs

0 commit comments

Comments
 (0)