Skip to content

Commit

Permalink
Lock consistencyMtx outside locking chunkMtx (#5210)
Browse files Browse the repository at this point in the history
* Lock consistencyMtx outside locking chunkMtx

* Remove unnecessary RLock
  • Loading branch information
RangerCD authored Jan 26, 2022
1 parent d0c6e3d commit 87cbe50
Show file tree
Hide file tree
Showing 5 changed files with 172 additions and 101 deletions.
18 changes: 15 additions & 3 deletions pkg/ingester/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,15 +248,18 @@ func (i *Ingester) flushUserSeries(userID string, fp model.Fingerprint, immediat
}

func (i *Ingester) collectChunksToFlush(instance *instance, fp model.Fingerprint, immediate bool) ([]*chunkDesc, labels.Labels, *sync.RWMutex) {
stream, ok := instance.streams.LoadByFP(fp)
var stream *stream
var ok bool
stream, ok = instance.streams.LoadByFP(fp)

if !ok {
return nil, nil, nil
}

var result []*chunkDesc
stream.chunkMtx.Lock()
defer stream.chunkMtx.Unlock()

var result []*chunkDesc
for j := range stream.chunks {
shouldFlush, reason := i.shouldFlushChunk(&stream.chunks[j])
if immediate || shouldFlush {
Expand Down Expand Up @@ -319,7 +322,16 @@ func (i *Ingester) removeFlushedChunks(instance *instance, stream *stream, mayRe
i.replayController.Sub(int64(subtracted))

if mayRemoveStream && len(stream.chunks) == 0 {
instance.removeStream(stream)
// Unlock first, then lock inside streams' lock to prevent deadlock
stream.chunkMtx.Unlock()
// Only lock streamsMap when it's needed to remove a stream
instance.streams.WithLock(func() {
stream.chunkMtx.Lock()
// Double check length
if len(stream.chunks) == 0 {
instance.removeStream(stream)
}
})
}
}

Expand Down
111 changes: 48 additions & 63 deletions pkg/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,15 +116,18 @@ func newInstance(cfg *Config, instanceID string, limiter *Limiter, configs *runt
func (i *instance) consumeChunk(ctx context.Context, ls labels.Labels, chunk *logproto.Chunk) error {
fp := i.getHashForLabels(ls)

s, loaded, _ := i.streams.LoadOrStoreNewByFP(fp, func() (*stream, error) {
sortedLabels := i.index.Add(logproto.FromLabelsToLabelAdapters(ls), fp)
return newStream(i.cfg, i.limiter, i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID), i.metrics), nil
})
if !loaded {
i.streamsCreatedTotal.Inc()
memoryStreams.WithLabelValues(i.instanceID).Inc()
i.addTailersToNewStream(s)
}
s, _, _ := i.streams.LoadOrStoreNewByFP(fp,
func() (*stream, error) {
s := i.createStreamByFP(ls, fp)
s.chunkMtx.Lock()
return s, nil
},
func(s *stream) error {
s.chunkMtx.Lock()
return nil
},
)
defer s.chunkMtx.Unlock()

err := s.consumeChunk(ctx, chunk)
if err == nil {
Expand All @@ -140,16 +143,32 @@ func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error {
defer recordPool.PutRecord(record)

var appendErr error
for _, s := range req.Streams {

err := i.executeWithGetOrCreateStream(s, record, func(stream *stream) error {
_, err := stream.Push(ctx, s.Entries, record, 0, false)
return err
})
for _, reqStream := range req.Streams {

s, _, err := i.streams.LoadOrStoreNew(reqStream.Labels,
func() (*stream, error) {
s, err := i.createStream(reqStream, record)
// Lock before adding to maps
if err == nil {
s.chunkMtx.Lock()
}
return s, err
},
func(s *stream) error {
s.chunkMtx.Lock()
return nil
},
)
if err != nil {
appendErr = err
continue
}

_, err = s.Push(ctx, reqStream.Entries, record, 0, false)
if err != nil {
appendErr = err
}
s.chunkMtx.Unlock()
}

if !record.IsEmpty() {
Expand Down Expand Up @@ -241,62 +260,28 @@ func (i *instance) createStream(pushReqStream logproto.Stream, record *WALRecord
return s, nil
}

func (i *instance) createStreamByFP(ls labels.Labels, fp model.Fingerprint) *stream {
sortedLabels := i.index.Add(logproto.FromLabelsToLabelAdapters(ls), fp)
s := newStream(i.cfg, i.limiter, i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID), i.metrics)

i.streamsCreatedTotal.Inc()
memoryStreams.WithLabelValues(i.instanceID).Inc()
i.addTailersToNewStream(s)

return s
}

// getOrCreateStream returns the stream or creates it.
// It's safe to use this function if returned stream is not consistency sensitive to streamsMap(e.g. ingesterRecoverer),
// otherwise use streamsMap.LoadOrStoreNew with locking stream's chunkMtx inside.
func (i *instance) getOrCreateStream(pushReqStream logproto.Stream, record *WALRecord) (*stream, error) {
s, _, err := i.streams.LoadOrStoreNew(pushReqStream.Labels, func() (*stream, error) {
return i.createStream(pushReqStream, record)
})
}, nil)

return s, err
}

// executeWithGetOrCreateStream executes fn after find or create stream, with chunkMtx locked.
// Use this to keep Load and Delete consistency for streams while executing fn.
// If fn doesn't care about stream deletion from streams, use getOrCreateStream directly(e.g. ingesterRecoverer).
func (i *instance) executeWithGetOrCreateStream(pushReqStream logproto.Stream, record *WALRecord, fn func(*stream) error) error {
var s *stream
prev, loaded, err := i.streams.LoadOrStoreNew(pushReqStream.Labels, func() (*stream, error) {
prev, err := i.createStream(pushReqStream, record)
if err != nil {
return nil, err
}
// Lock before adding to maps
prev.chunkMtx.Lock()
return prev, nil
})
if err != nil {
return err
}
if !loaded {
defer prev.chunkMtx.Unlock()
s = prev
} else {
prev.chunkMtx.Lock()
defer prev.chunkMtx.Unlock()
// Double check with prev locked
s, loaded, err = i.streams.LoadOrStoreNew(pushReqStream.Labels, func() (*stream, error) {
s, err := i.createStream(pushReqStream, record)
if err != nil {
return nil, err
}
s.chunkMtx.Lock()
return s, nil
})
if err != nil {
return err
}
if prev != s {
if loaded {
// s is not created in this push request, lock it
s.chunkMtx.Lock()
}
defer s.chunkMtx.Unlock()
}
}

return fn(s)
}

// removeStream removes a stream from the instance.
func (i *instance) removeStream(s *stream) {
if i.streams.Delete(s) {
Expand Down
3 changes: 1 addition & 2 deletions pkg/ingester/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,15 +129,14 @@ func newStream(cfg *Config, limits RateLimiterStrategy, tenant string, fp model.

// consumeChunk manually adds a chunk to the stream that was received during
// ingester chunk transfer.
// Must hold chunkMtx
// DEPRECATED: chunk transfers are no longer suggested and remain for compatibility.
func (s *stream) consumeChunk(_ context.Context, chunk *logproto.Chunk) error {
c, err := chunkenc.NewByteChunk(chunk.Data, s.cfg.BlockSize, s.cfg.TargetChunkSize)
if err != nil {
return err
}

s.chunkMtx.Lock()
defer s.chunkMtx.Unlock()
s.chunks = append(s.chunks, chunkDesc{
chunk: c,
})
Expand Down
108 changes: 77 additions & 31 deletions pkg/ingester/streams_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,40 +8,43 @@ import (
)

type streamsMap struct {
consistencyMtx sync.Mutex
streams *sync.Map // map[string]*stream
streamsByFP *sync.Map // map[model.Fingerprint]*stream
consistencyMtx sync.RWMutex // Keep read/write consistency between other fields
streams *sync.Map // map[string]*stream
streamsByFP *sync.Map // map[model.Fingerprint]*stream
streamsCounter *atomic.Int64
}

func newStreamsMap() *streamsMap {
return &streamsMap{
consistencyMtx: sync.Mutex{},
consistencyMtx: sync.RWMutex{},
streams: &sync.Map{},
streamsByFP: &sync.Map{},
streamsCounter: atomic.NewInt64(0),
}
}

// Load is lock-free. If usage of the stream is consistency sensitive, must be called inside WithRLock at least
func (m *streamsMap) Load(key string) (*stream, bool) {
return m.load(m.streams, key)
}

// LoadByFP is lock-free. If usage of the stream is consistency sensitive, must be called inside WithRLock at least
func (m *streamsMap) LoadByFP(fp model.Fingerprint) (*stream, bool) {
return m.load(m.streamsByFP, fp)
}

func (m *streamsMap) LoadOrStoreNew(key string, newStreamFn func() (*stream, error)) (*stream, bool, error) {
return m.loadOrStoreNew(m.streams, key, newStreamFn)
// Store must be called inside WithLock
func (m *streamsMap) Store(key string, s *stream) {
m.store(key, s)
}

func (m *streamsMap) LoadOrStoreNewByFP(fp model.Fingerprint, newStreamFn func() (*stream, error)) (*stream, bool, error) {
return m.loadOrStoreNew(m.streamsByFP, fp, newStreamFn)
// StoreByFP must be called inside WithLock
func (m *streamsMap) StoreByFP(fp model.Fingerprint, s *stream) {
m.store(fp, s)
}

// Delete must be called inside WithLock
func (m *streamsMap) Delete(s *stream) bool {
m.consistencyMtx.Lock()
defer m.consistencyMtx.Unlock()
_, loaded := m.streams.LoadAndDelete(s.labelsString)
if loaded {
m.streamsByFP.Delete(s.fp)
Expand All @@ -51,6 +54,32 @@ func (m *streamsMap) Delete(s *stream) bool {
return false
}

// LoadOrStoreNew already has lock inside, do NOT call inside WithLock or WithRLock
func (m *streamsMap) LoadOrStoreNew(key string, newStreamFn func() (*stream, error), postLoadFn func(*stream) error) (*stream, bool, error) {
return m.loadOrStoreNew(m.streams, key, newStreamFn, postLoadFn)
}

// LoadOrStoreNewByFP already has lock inside, do NOT call inside WithLock or WithRLock
func (m *streamsMap) LoadOrStoreNewByFP(fp model.Fingerprint, newStreamFn func() (*stream, error), postLoadFn func(*stream) error) (*stream, bool, error) {
return m.loadOrStoreNew(m.streamsByFP, fp, newStreamFn, postLoadFn)
}

// WithLock is a helper function to execute write operations
func (m *streamsMap) WithLock(fn func()) {
m.consistencyMtx.Lock()
defer m.consistencyMtx.Unlock()
fn()
}

// WithRLock is a helper function to execute consistency sensitive read operations.
// Generally, if a stream loaded from streamsMap will have its chunkMtx locked, chunkMtx.Lock is supposed to be called
// within this function.
func (m *streamsMap) WithRLock(fn func()) {
m.consistencyMtx.RLock()
defer m.consistencyMtx.RUnlock()
fn()
}

func (m *streamsMap) ForEach(fn func(s *stream) (bool, error)) error {
var c bool
var err error
Expand All @@ -72,32 +101,49 @@ func (m *streamsMap) load(mp *sync.Map, key interface{}) (*stream, bool) {
return nil, false
}

func (m *streamsMap) loadOrStoreNew(mp *sync.Map, key interface{}, newStreamFn func() (*stream, error)) (*stream, bool, error) {
s, ok := m.load(mp, key)

if ok {
return s, true, nil
}

m.consistencyMtx.Lock()
defer m.consistencyMtx.Unlock()
// Double check
s, ok = m.load(mp, key)

if ok {
return s, true, nil
}

s, err := newStreamFn()
if err != nil {
return nil, false, err
}
func (m *streamsMap) store(key interface{}, s *stream) {
if labelsString, ok := key.(string); ok {
m.streams.Store(labelsString, s)
} else {
m.streams.Store(s.labelsString, s)
}
m.streamsByFP.Store(s.fp, s)
m.streamsCounter.Inc()
return s, false, nil
}

// newStreamFn: Called if not loaded, with consistencyMtx locked. Must not be nil
// postLoadFn: Called if loaded, with consistencyMtx read-locked at least. Can be nil
func (m *streamsMap) loadOrStoreNew(mp *sync.Map, key interface{}, newStreamFn func() (*stream, error), postLoadFn func(*stream) error) (*stream, bool, error) {
var s *stream
var loaded bool
var err error
m.WithRLock(func() {
if s, loaded = m.load(mp, key); loaded {
if postLoadFn != nil {
err = postLoadFn(s)
}
}
})

if loaded {
return s, true, err
}

m.WithLock(func() {
// Double check
if s, loaded = m.load(mp, key); loaded {
if postLoadFn != nil {
err = postLoadFn(s)
}
return
}

s, err = newStreamFn()
if err != nil {
return
}
m.store(key, s)
})

return s, loaded, err
}
Loading

0 comments on commit 87cbe50

Please sign in to comment.