Skip to content

Commit

Permalink
remove empty streams after wal replay (#4265)
Browse files Browse the repository at this point in the history
  • Loading branch information
owen-d authored Sep 7, 2021
1 parent 58219ff commit 1c90b9c
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 13 deletions.
6 changes: 1 addition & 5 deletions pkg/ingester/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,11 +327,7 @@ func (i *Ingester) removeFlushedChunks(instance *instance, stream *stream, mayRe
i.replayController.Sub(int64(subtracted))

if mayRemoveStream && len(stream.chunks) == 0 {
delete(instance.streamsByFP, stream.fp)
delete(instance.streams, stream.labelsString)
instance.index.Delete(stream.labels, stream.fp)
instance.streamsRemovedTotal.Inc()
memoryStreams.WithLabelValues(instance.instanceID).Dec()
instance.removeStream(stream)
}
}

Expand Down
9 changes: 9 additions & 0 deletions pkg/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,15 @@ func (i *instance) getOrCreateStream(pushReqStream logproto.Stream, lock bool, r
return stream, nil
}

// removeStream removes a stream from the instance. The streamsMtx must be held.
func (i *instance) removeStream(s *stream) {
delete(i.streamsByFP, s.fp)
delete(i.streams, s.labelsString)
i.index.Delete(s.labels, s.fp)
i.streamsRemovedTotal.Inc()
memoryStreams.WithLabelValues(i.instanceID).Dec()
}

func (i *instance) getHashForLabels(ls labels.Labels) model.Fingerprint {
var fp uint64
fp, i.buf = ls.HashWithoutLabels(i.buf, []string(nil)...)
Expand Down
19 changes: 11 additions & 8 deletions pkg/ingester/recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,9 +220,17 @@ func (r *ingesterRecoverer) Close() {

for _, inst := range r.ing.getInstances() {
inst.forAllStreams(context.Background(), func(s *stream) error {
s.chunkMtx.Lock()
defer s.chunkMtx.Unlock()

// reset all the incrementing stream counters after a successful WAL replay.
s.resetCounter()

if len(s.chunks) == 0 {
inst.removeStream(s)
return nil
}

// If we've replayed a WAL with unordered writes, but the new
// configuration disables them, convert all streams/head blocks
// to ensure unordered writes are disabled after the replay,
Expand All @@ -232,14 +240,9 @@ func (r *ingesterRecoverer) Close() {
s.unorderedWrites = isAllowed

if !isAllowed && old {

s.chunkMtx.Lock()
defer s.chunkMtx.Unlock()
if len(s.chunks) > 0 {
err := s.chunks[len(s.chunks)-1].chunk.ConvertHead(headBlockType(isAllowed))
if err != nil {
return err
}
err := s.chunks[len(s.chunks)-1].chunk.ConvertHead(headBlockType(isAllowed))
if err != nil {
return err
}
}

Expand Down

0 comments on commit 1c90b9c

Please sign in to comment.