-
Notifications
You must be signed in to change notification settings - Fork 3.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix potential deadlock in the table manager #5472
Changes from 1 commit
1b7c0f9
ea2e94d
8069e39
72778af
13eea15
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -153,31 +153,27 @@ func (tm *TableManager) query(ctx context.Context, tableName string, queries []c | |
} | ||
|
||
func (tm *TableManager) getOrCreateTable(tableName string) (*Table, error) { | ||
// if table is already there, use it. | ||
tm.tablesMtx.RLock() | ||
defer tm.tablesMtx.RUnlock() | ||
|
||
// if table is already there, use it. | ||
table, ok := tm.tables[tableName] | ||
tm.tablesMtx.RUnlock() | ||
if ok { | ||
return table, nil | ||
} | ||
|
||
if !ok { | ||
tm.tablesMtx.Lock() | ||
// check if some other competing goroutine got the lock before us and created the table, use it if so. | ||
table, ok = tm.tables[tableName] | ||
if !ok { | ||
// table not found, creating one. | ||
level.Info(util_log.Logger).Log("msg", fmt.Sprintf("downloading all files for table %s", tableName)) | ||
|
||
tablePath := filepath.Join(tm.cfg.CacheDir, tableName) | ||
err := chunk_util.EnsureDirectory(tablePath) | ||
if err != nil { | ||
return nil, err | ||
} | ||
// table not found, creating one. | ||
level.Info(util_log.Logger).Log("msg", fmt.Sprintf("downloading all files for table %s", tableName)) | ||
|
||
table = NewTable(tableName, filepath.Join(tm.cfg.CacheDir, tableName), tm.indexStorageClient, tm.boltIndexClient, tm.metrics) | ||
tm.tables[tableName] = table | ||
} | ||
tm.tablesMtx.Unlock() | ||
tablePath := filepath.Join(tm.cfg.CacheDir, tableName) | ||
err := chunk_util.EnsureDirectory(tablePath) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
table = NewTable(tableName, filepath.Join(tm.cfg.CacheDir, tableName), tm.indexStorageClient, tm.boltIndexClient, tm.metrics) | ||
tm.tables[tableName] = table | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Don't you need to take the write lock here for this? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I missed that we're using `RLock` there. |
||
|
||
return table, nil | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,6 +4,7 @@ import ( | |
"context" | ||
"fmt" | ||
"math" | ||
"os" | ||
"path/filepath" | ||
"testing" | ||
"time" | ||
|
@@ -35,29 +36,50 @@ func buildTestTableManager(t *testing.T, path string) (*TableManager, stopFunc) | |
} | ||
|
||
func TestTableManager_QueryPages(t *testing.T) { | ||
tempDir := t.TempDir() | ||
|
||
objectStoragePath := filepath.Join(tempDir, objectsStorageDirName) | ||
t.Run("QueryPages", func(t *testing.T) { | ||
tempDir := t.TempDir() | ||
objectStoragePath := filepath.Join(tempDir, objectsStorageDirName) | ||
|
||
var queries []chunk.IndexQuery | ||
for i, name := range []string{"table1", "table2"} { | ||
testutil.SetupTable(t, filepath.Join(objectStoragePath, name), testutil.DBsConfig{ | ||
NumUnCompactedDBs: 5, | ||
DBRecordsStart: i * 1000, | ||
}, testutil.PerUserDBsConfig{ | ||
DBsConfig: testutil.DBsConfig{ | ||
var queries []chunk.IndexQuery | ||
for i, name := range []string{"table1", "table2"} { | ||
testutil.SetupTable(t, filepath.Join(objectStoragePath, name), testutil.DBsConfig{ | ||
NumUnCompactedDBs: 5, | ||
DBRecordsStart: i*1000 + 500, | ||
}, | ||
NumUsers: 1, | ||
}) | ||
queries = append(queries, chunk.IndexQuery{TableName: name}) | ||
} | ||
DBRecordsStart: i * 1000, | ||
}, testutil.PerUserDBsConfig{ | ||
DBsConfig: testutil.DBsConfig{ | ||
NumUnCompactedDBs: 5, | ||
DBRecordsStart: i*1000 + 500, | ||
}, | ||
NumUsers: 1, | ||
}) | ||
queries = append(queries, chunk.IndexQuery{TableName: name}) | ||
} | ||
|
||
tableManager, stopFunc := buildTestTableManager(t, tempDir) | ||
defer stopFunc() | ||
tableManager, stopFunc := buildTestTableManager(t, tempDir) | ||
defer stopFunc() | ||
|
||
testutil.TestMultiTableQuery(t, testutil.BuildUserID(0), queries, tableManager, 0, 2000) | ||
testutil.TestMultiTableQuery(t, testutil.BuildUserID(0), queries, tableManager, 0, 2000) | ||
}) | ||
|
||
t.Run("it doesn't deadlock when table create fails", func(t *testing.T) { | ||
tempDir := os.TempDir() | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Just a nit: let's use `t.TempDir()` here. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Sorry, I pushed this change and resolved the merge conflict because I wanted to cut a new release with this fix. |
||
|
||
// This file forces chunk_util.EnsureDirectory to fail. Any write error would cause this | ||
// deadlock | ||
f, err := os.CreateTemp(filepath.Join(tempDir, "cache"), "not-a-directory") | ||
require.NoError(t, err) | ||
badTable := filepath.Base(f.Name()) | ||
|
||
tableManager, stopFunc := buildTestTableManager(t, tempDir) | ||
defer stopFunc() | ||
|
||
err = tableManager.query(context.Background(), badTable, nil, nil) | ||
require.Error(t, err) | ||
|
||
// This one deadlocks without the fix | ||
err = tableManager.query(context.Background(), badTable, nil, nil) | ||
require.Error(t, err) | ||
}) | ||
} | ||
|
||
func TestTableManager_cleanupCache(t *testing.T) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we're fixing a deadlock and introducing a new one. We use
`RLock`
first as a lower-cost way to check if the table exists, but must `RUnlock`
it after (the previous PR did this). We cannot `defer`
it here because we need to `Lock`
it later if the table wasn't found. I think we can introduce a one-line fix: `defer tm.tablesMtx.Unlock()`
immediately after the write lock is acquired, which will ensure we release the write lock correctly in all cases where it's acquired.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I missed the
`RLock`
vs. `Lock`
distinction. The latest commit just defers
`tm.tablesMtx.Unlock()`
after it's created.