diff --git a/pkg/ingester/flush.go b/pkg/ingester/flush.go index e942a53aec066..7cc3d0452b15f 100644 --- a/pkg/ingester/flush.go +++ b/pkg/ingester/flush.go @@ -248,15 +248,18 @@ func (i *Ingester) flushUserSeries(userID string, fp model.Fingerprint, immediat } func (i *Ingester) collectChunksToFlush(instance *instance, fp model.Fingerprint, immediate bool) ([]*chunkDesc, labels.Labels, *sync.RWMutex) { - stream, ok := instance.streams.LoadByFP(fp) + var stream *stream + var ok bool + stream, ok = instance.streams.LoadByFP(fp) if !ok { return nil, nil, nil } - var result []*chunkDesc stream.chunkMtx.Lock() defer stream.chunkMtx.Unlock() + + var result []*chunkDesc for j := range stream.chunks { shouldFlush, reason := i.shouldFlushChunk(&stream.chunks[j]) if immediate || shouldFlush { @@ -319,7 +322,16 @@ func (i *Ingester) removeFlushedChunks(instance *instance, stream *stream, mayRe i.replayController.Sub(int64(subtracted)) if mayRemoveStream && len(stream.chunks) == 0 { - instance.removeStream(stream) + // Unlock first, then lock inside streams' lock to prevent deadlock + stream.chunkMtx.Unlock() + // Only lock streamsMap when it's needed to remove a stream + instance.streams.WithLock(func() { + stream.chunkMtx.Lock() + // Double check length + if len(stream.chunks) == 0 { + instance.removeStream(stream) + } + }) } } diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index 40dc5f25ec187..fbfb2c37db5f7 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -116,15 +116,18 @@ func newInstance(cfg *Config, instanceID string, limiter *Limiter, configs *runt func (i *instance) consumeChunk(ctx context.Context, ls labels.Labels, chunk *logproto.Chunk) error { fp := i.getHashForLabels(ls) - s, loaded, _ := i.streams.LoadOrStoreNewByFP(fp, func() (*stream, error) { - sortedLabels := i.index.Add(logproto.FromLabelsToLabelAdapters(ls), fp) - return newStream(i.cfg, i.limiter, i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID), i.metrics), nil - }) - if !loaded { - i.streamsCreatedTotal.Inc() - memoryStreams.WithLabelValues(i.instanceID).Inc() - i.addTailersToNewStream(s) - } + s, _, _ := i.streams.LoadOrStoreNewByFP(fp, + func() (*stream, error) { + s := i.createStreamByFP(ls, fp) + s.chunkMtx.Lock() + return s, nil + }, + func(s *stream) error { + s.chunkMtx.Lock() + return nil + }, + ) + defer s.chunkMtx.Unlock() err := s.consumeChunk(ctx, chunk) if err == nil { @@ -140,16 +143,32 @@ func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error { defer recordPool.PutRecord(record) var appendErr error - for _, s := range req.Streams { - - err := i.executeWithGetOrCreateStream(s, record, func(stream *stream) error { - _, err := stream.Push(ctx, s.Entries, record, 0, false) - return err - }) + for _, reqStream := range req.Streams { + + s, _, err := i.streams.LoadOrStoreNew(reqStream.Labels, + func() (*stream, error) { + s, err := i.createStream(reqStream, record) + // Lock before adding to maps + if err == nil { + s.chunkMtx.Lock() + } + return s, err + }, + func(s *stream) error { + s.chunkMtx.Lock() + return nil + }, + ) if err != nil { appendErr = err continue } + + _, err = s.Push(ctx, reqStream.Entries, record, 0, false) + if err != nil { + appendErr = err + } + s.chunkMtx.Unlock() } if !record.IsEmpty() { @@ -241,62 +260,28 @@ func (i *instance) createStream(pushReqStream logproto.Stream, record *WALRecord return s, nil } +func (i *instance) createStreamByFP(ls labels.Labels, fp model.Fingerprint) *stream { + sortedLabels := i.index.Add(logproto.FromLabelsToLabelAdapters(ls), fp) + s := newStream(i.cfg, i.limiter, i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID), i.metrics) + + i.streamsCreatedTotal.Inc() + memoryStreams.WithLabelValues(i.instanceID).Inc() + i.addTailersToNewStream(s) + + return s +} + // getOrCreateStream returns the stream or creates it. +// It's safe to use this function if returned stream is not consistency sensitive to streamsMap(e.g. ingesterRecoverer), +// otherwise use streamsMap.LoadOrStoreNew with locking stream's chunkMtx inside. func (i *instance) getOrCreateStream(pushReqStream logproto.Stream, record *WALRecord) (*stream, error) { s, _, err := i.streams.LoadOrStoreNew(pushReqStream.Labels, func() (*stream, error) { return i.createStream(pushReqStream, record) - }) + }, nil) return s, err } -// executeWithGetOrCreateStream executes fn after find or create stream, with chunkMtx locked. -// Use this to keep Load and Delete consistency for streams while executing fn. -// If fn doesn't care about stream deletion from streams, use getOrCreateStream directly(e.g. ingesterRecoverer). -func (i *instance) executeWithGetOrCreateStream(pushReqStream logproto.Stream, record *WALRecord, fn func(*stream) error) error { - var s *stream - prev, loaded, err := i.streams.LoadOrStoreNew(pushReqStream.Labels, func() (*stream, error) { - prev, err := i.createStream(pushReqStream, record) - if err != nil { - return nil, err - } - // Lock before adding to maps - prev.chunkMtx.Lock() - return prev, nil - }) - if err != nil { - return err - } - if !loaded { - defer prev.chunkMtx.Unlock() - s = prev - } else { - prev.chunkMtx.Lock() - defer prev.chunkMtx.Unlock() - // Double check with prev locked - s, loaded, err = i.streams.LoadOrStoreNew(pushReqStream.Labels, func() (*stream, error) { - s, err := i.createStream(pushReqStream, record) - if err != nil { - return nil, err - } - s.chunkMtx.Lock() - return s, nil - }) - if err != nil { - return err - } - if prev != s { - if loaded { - // s is not created in this push request, lock it - s.chunkMtx.Lock() - } - defer s.chunkMtx.Unlock() - } - } - - return fn(s) -} - // removeStream removes a stream from the instance. func (i *instance) removeStream(s *stream) { if i.streams.Delete(s) { diff --git a/pkg/ingester/stream.go b/pkg/ingester/stream.go index 287331d216592..55a881f5685d2 100644 --- a/pkg/ingester/stream.go +++ b/pkg/ingester/stream.go @@ -129,6 +129,7 @@ func newStream(cfg *Config, limits RateLimiterStrategy, tenant string, fp model. // consumeChunk manually adds a chunk to the stream that was received during // ingester chunk transfer. +// Must hold chunkMtx // DEPRECATED: chunk transfers are no longer suggested and remain for compatibility. func (s *stream) consumeChunk(_ context.Context, chunk *logproto.Chunk) error { c, err := chunkenc.NewByteChunk(chunk.Data, s.cfg.BlockSize, s.cfg.TargetChunkSize) @@ -136,8 +137,6 @@ func (s *stream) consumeChunk(_ context.Context, chunk *logproto.Chunk) error { return err } - s.chunkMtx.Lock() - defer s.chunkMtx.Unlock() s.chunks = append(s.chunks, chunkDesc{ chunk: c, }) diff --git a/pkg/ingester/streams_map.go b/pkg/ingester/streams_map.go index 270d6f523e2fa..6a19dc9281bd9 100644 --- a/pkg/ingester/streams_map.go +++ b/pkg/ingester/streams_map.go @@ -8,40 +8,43 @@ import ( ) type streamsMap struct { - consistencyMtx sync.Mutex - streams *sync.Map // map[string]*stream - streamsByFP *sync.Map // map[model.Fingerprint]*stream + consistencyMtx sync.RWMutex // Keep read/write consistency between other fields + streams *sync.Map // map[string]*stream + streamsByFP *sync.Map // map[model.Fingerprint]*stream streamsCounter *atomic.Int64 } func newStreamsMap() *streamsMap { return &streamsMap{ - consistencyMtx: sync.Mutex{}, + consistencyMtx: sync.RWMutex{}, streams: &sync.Map{}, streamsByFP: &sync.Map{}, streamsCounter: atomic.NewInt64(0), } } +// Load is lock-free. If usage of the stream is consistency sensitive, must be called inside WithRLock at least func (m *streamsMap) Load(key string) (*stream, bool) { return m.load(m.streams, key) } +// LoadByFP is lock-free. If usage of the stream is consistency sensitive, must be called inside WithRLock at least func (m *streamsMap) LoadByFP(fp model.Fingerprint) (*stream, bool) { return m.load(m.streamsByFP, fp) } -func (m *streamsMap) LoadOrStoreNew(key string, newStreamFn func() (*stream, error)) (*stream, bool, error) { - return m.loadOrStoreNew(m.streams, key, newStreamFn) +// Store must be called inside WithLock +func (m *streamsMap) Store(key string, s *stream) { + m.store(key, s) } -func (m *streamsMap) LoadOrStoreNewByFP(fp model.Fingerprint, newStreamFn func() (*stream, error)) (*stream, bool, error) { - return m.loadOrStoreNew(m.streamsByFP, fp, newStreamFn) +// StoreByFP must be called inside WithLock +func (m *streamsMap) StoreByFP(fp model.Fingerprint, s *stream) { + m.store(fp, s) } +// Delete must be called inside WithLock func (m *streamsMap) Delete(s *stream) bool { - m.consistencyMtx.Lock() - defer m.consistencyMtx.Unlock() _, loaded := m.streams.LoadAndDelete(s.labelsString) if loaded { m.streamsByFP.Delete(s.fp) @@ -51,6 +54,32 @@ func (m *streamsMap) Delete(s *stream) bool { return false } +// LoadOrStoreNew already has lock inside, do NOT call inside WithLock or WithRLock +func (m *streamsMap) LoadOrStoreNew(key string, newStreamFn func() (*stream, error), postLoadFn func(*stream) error) (*stream, bool, error) { + return m.loadOrStoreNew(m.streams, key, newStreamFn, postLoadFn) +} + +// LoadOrStoreNewByFP already has lock inside, do NOT call inside WithLock or WithRLock +func (m *streamsMap) LoadOrStoreNewByFP(fp model.Fingerprint, newStreamFn func() (*stream, error), postLoadFn func(*stream) error) (*stream, bool, error) { + return m.loadOrStoreNew(m.streamsByFP, fp, newStreamFn, postLoadFn) +} + +// WithLock is a helper function to execute write operations +func (m *streamsMap) WithLock(fn func()) { + m.consistencyMtx.Lock() + defer m.consistencyMtx.Unlock() + fn() +} + +// WithRLock is a helper function to execute consistency sensitive read operations. +// Generally, if a stream loaded from streamsMap will have its chunkMtx locked, chunkMtx.Lock is supposed to be called +// within this function. +func (m *streamsMap) WithRLock(fn func()) { + m.consistencyMtx.RLock() + defer m.consistencyMtx.RUnlock() + fn() +} + func (m *streamsMap) ForEach(fn func(s *stream) (bool, error)) error { var c bool var err error @@ -72,26 +101,7 @@ func (m *streamsMap) load(mp *sync.Map, key interface{}) (*stream, bool) { return nil, false } -func (m *streamsMap) loadOrStoreNew(mp *sync.Map, key interface{}, newStreamFn func() (*stream, error)) (*stream, bool, error) { - s, ok := m.load(mp, key) - - if ok { - return s, true, nil - } - - m.consistencyMtx.Lock() - defer m.consistencyMtx.Unlock() - // Double check - s, ok = m.load(mp, key) - - if ok { - return s, true, nil - } - - s, err := newStreamFn() - if err != nil { - return nil, false, err - } +func (m *streamsMap) store(key interface{}, s *stream) { if labelsString, ok := key.(string); ok { m.streams.Store(labelsString, s) } else { @@ -99,5 +109,41 @@ func (m *streamsMap) loadOrStoreNew(mp *sync.Map, key interface{}, newStreamFn f } m.streamsByFP.Store(s.fp, s) m.streamsCounter.Inc() - return s, false, nil +} + +// newStreamFn: Called if not loaded, with consistencyMtx locked. Must not be nil +// postLoadFn: Called if loaded, with consistencyMtx read-locked at least. Can be nil +func (m *streamsMap) loadOrStoreNew(mp *sync.Map, key interface{}, newStreamFn func() (*stream, error), postLoadFn func(*stream) error) (*stream, bool, error) { + var s *stream + var loaded bool + var err error + m.WithRLock(func() { + if s, loaded = m.load(mp, key); loaded { + if postLoadFn != nil { + err = postLoadFn(s) + } + } + }) + + if loaded { + return s, true, err + } + + m.WithLock(func() { + // Double check + if s, loaded = m.load(mp, key); loaded { + if postLoadFn != nil { + err = postLoadFn(s) + } + return + } + + s, err = newStreamFn() + if err != nil { + return + } + m.store(key, s) + }) + + return s, loaded, err } diff --git a/pkg/ingester/streams_map_test.go b/pkg/ingester/streams_map_test.go index fddefc050d400..d6bdcdda79e79 100644 --- a/pkg/ingester/streams_map_test.go +++ b/pkg/ingester/streams_map_test.go @@ -52,16 +52,18 @@ func TestStreamsMap(t *testing.T) { require.Nil(t, s) require.False(t, loaded) + // Test LoadOrStoreNew s, loaded, err = streams.LoadOrStoreNew(ss[0].labelsString, func() (*stream, error) { return ss[0], nil - }) + }, nil) require.Equal(t, s, ss[0]) require.False(t, loaded) require.Nil(t, err) + // Test LoadOrStoreNewByFP s, loaded, err = streams.LoadOrStoreNewByFP(ss[1].fp, func() (*stream, error) { return ss[1], nil - }) + }, nil) require.Equal(t, s, ss[1]) require.False(t, loaded) require.Nil(t, err) @@ -78,6 +80,7 @@ func TestStreamsMap(t *testing.T) { require.True(t, loaded) } + // Test Delete for _, st := range ss { deleted := streams.Delete(st) require.True(t, deleted) @@ -90,4 +93,30 @@ func TestStreamsMap(t *testing.T) { require.Nil(t, s) require.False(t, loaded) } + + require.Equal(t, 0, streams.Len()) + + // Test Store + streams.Store(ss[0].labelsString, ss[0]) + + s, loaded = streams.Load(ss[0].labelsString) + require.Equal(t, ss[0], s) + require.True(t, loaded) + + s, loaded = streams.LoadByFP(ss[0].fp) + require.Equal(t, ss[0], s) + require.True(t, loaded) + + // Test StoreByFP + streams.StoreByFP(ss[1].fp, ss[1]) + + s, loaded = streams.Load(ss[1].labelsString) + require.Equal(t, ss[1], s) + require.True(t, loaded) + + s, loaded = streams.LoadByFP(ss[1].fp) + require.Equal(t, ss[1], s) + require.True(t, loaded) + + require.Equal(t, len(ss), streams.Len()) }