diff --git a/pkg/storage/stores/shipper/util/queries.go b/pkg/storage/stores/shipper/util/queries.go index 3085497f76e60..1150e2086f14c 100644 --- a/pkg/storage/stores/shipper/util/queries.go +++ b/pkg/storage/stores/shipper/util/queries.go @@ -58,7 +58,7 @@ func DoParallelQueries(ctx context.Context, tableQuerier TableQuerier, queries [ type IndexDeduper struct { callback chunk_util.Callback seenRangeValues map[string]map[string]struct{} - mtx sync.Mutex + mtx sync.RWMutex } func NewIndexDeduper(callback chunk_util.Callback) *IndexDeduper { @@ -77,19 +77,32 @@ func (i *IndexDeduper) Callback(query chunk.IndexQuery, batch chunk.ReadBatch) b } func (i *IndexDeduper) isSeen(hashValue string, rangeValue []byte) bool { - i.mtx.Lock() - defer i.mtx.Unlock() + i.mtx.RLock() // index entries are never modified during query processing so it should be safe to reference a byte slice as a string. rangeValueStr := yoloString(rangeValue) - if _, ok := i.seenRangeValues[hashValue]; !ok { - i.seenRangeValues[hashValue] = map[string]struct{}{} + + if _, ok := i.seenRangeValues[hashValue][rangeValueStr]; ok { + i.mtx.RUnlock() + return true } + i.mtx.RUnlock() + + i.mtx.Lock() + defer i.mtx.Unlock() + + // re-check if another concurrent call added the values already, if so do not add it again and return true if _, ok := i.seenRangeValues[hashValue][rangeValueStr]; ok { return true } + // add the hashValue first if missing + if _, ok := i.seenRangeValues[hashValue]; !ok { + i.seenRangeValues[hashValue] = map[string]struct{}{} + } + + // add the rangeValue i.seenRangeValues[hashValue][rangeValueStr] = struct{}{} return false }