Skip to content

Commit

Permalink
Refine ingester locks: eliminate smell & allow concurrent ingester qu…
Browse files Browse the repository at this point in the history
…ery (#120)

Signed-off-by: alei <rayingecho@gmail.com>

Fix chunk retain period

Signed-off-by: alei <rayingecho@gmail.com>

Fix undefined symbol

Signed-off-by: alei <rayingecho@gmail.com>
  • Loading branch information
aylei authored and tomwilkie committed Dec 16, 2018
1 parent 2267992 commit 251017e
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 55 deletions.
77 changes: 41 additions & 36 deletions pkg/ingester/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,20 +63,20 @@ func (o *flushOp) Priority() int64 {

// sweepUsers periodically schedules series for flushing and garbage collects users with no series
func (i *Ingester) sweepUsers(immediate bool) {
i.instancesMtx.RLock()
instances := make([]*instance, 0, len(i.instances))
for _, instance := range i.instances {
instances = append(instances, instance)
}
i.instancesMtx.RUnlock()
instances := i.getInstances()

for _, instance := range instances {
instance.streamsMtx.Lock()
for _, stream := range instance.streams {
i.sweepStream(instance, stream, immediate)
i.removeFlushedChunks(instance, stream)
}
instance.streamsMtx.Unlock()
i.sweepInstance(instance, immediate)
}
}

func (i *Ingester) sweepInstance(instance *instance, immediate bool) {
instance.streamsMtx.Lock()
defer instance.streamsMtx.Unlock()

for _, stream := range instance.streams {
i.sweepStream(instance, stream, immediate)
i.removeFlushedChunks(instance, stream)
}
}

Expand Down Expand Up @@ -123,23 +123,43 @@ func (i *Ingester) flushLoop(j int) {
}

func (i *Ingester) flushUserSeries(userID string, fp model.Fingerprint, immediate bool) error {
i.instancesMtx.Lock()
instance, ok := i.instances[userID]
i.instancesMtx.Unlock()
instance, ok := i.getInstanceByID(userID)
if !ok {
return nil
}

chunks, labels := i.collectChunksToFlush(instance, fp, immediate)
if len(chunks) < 1 {
return nil
}

ctx := user.InjectOrgID(context.Background(), userID)
ctx, cancel := context.WithTimeout(ctx, i.cfg.FlushOpTimeout)
defer cancel()
err := i.flushChunks(ctx, fp, labels, chunks)
if err != nil {
return err
}

instance.streamsMtx.Lock()
for _, chunk := range chunks {
chunk.flushed = time.Now()
}
instance.streamsMtx.Unlock()
return nil
}

func (i *Ingester) collectChunksToFlush(instance *instance, fp model.Fingerprint, immediate bool) ([]*chunkDesc, []client.LabelPair) {
instance.streamsMtx.Lock()
defer instance.streamsMtx.Unlock()

stream, ok := instance.streams[fp]
if !ok {
instance.streamsMtx.Unlock()
return nil
return nil, nil
}

if len(stream.chunks) < 2 && !immediate {
instance.streamsMtx.Unlock()
return nil
return nil, nil
}

var chunks []*chunkDesc
Expand All @@ -152,35 +172,20 @@ func (i *Ingester) flushUserSeries(userID string, fp model.Fingerprint, immediat
if !stream.chunks[i].closed {
stream.chunks[i].closed = true
}

// Flush this chunk if it hasn't already been successfully flushed.
if stream.chunks[i].flushed.IsZero() {
chunks = append(chunks, &stream.chunks[i])
}
}
instance.streamsMtx.Unlock()

ctx := user.InjectOrgID(context.Background(), userID)
ctx, cancel := context.WithTimeout(ctx, i.cfg.FlushOpTimeout)
defer cancel()
err := i.flushChunks(ctx, fp, stream.labels, chunks)
if err != nil {
return err
}

instance.streamsMtx.Lock()
for _, chunk := range chunks {
chunk.flushed = time.Now()
}
instance.streamsMtx.Unlock()
return nil
return chunks, stream.labels
}

func (i *Ingester) removeFlushedChunks(instance *instance, stream *stream) {
now := time.Now()

for len(stream.chunks) > 0 {
if stream.chunks[0].flushed.IsZero() || now.Sub(stream.chunks[0].flushed) > i.cfg.RetainPeriod {
if stream.chunks[0].flushed.IsZero() || now.Sub(stream.chunks[0].flushed) < i.cfg.RetainPeriod {
break
}

Expand Down
23 changes: 20 additions & 3 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,7 @@ func (i *Ingester) Push(ctx context.Context, req *logproto.PushRequest) (*logpro
}

func (i *Ingester) getOrCreateInstance(instanceID string) *instance {
i.instancesMtx.RLock()
inst, ok := i.instances[instanceID]
i.instancesMtx.RUnlock()
inst, ok := i.getInstanceByID(instanceID)
if ok {
return inst
}
Expand Down Expand Up @@ -197,3 +195,22 @@ func (i *Ingester) ReadinessHandler(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusInternalServerError)
}
}

func (i *Ingester) getInstanceByID(id string) (*instance, bool) {
i.instancesMtx.RLock()
defer i.instancesMtx.RUnlock()

inst, ok := i.instances[id]
return inst, ok
}

func (i *Ingester) getInstances() []*instance {
i.instancesMtx.RLock()
defer i.instancesMtx.RUnlock()

instances := make([]*instance, 0, len(i.instances))
for _, instance := range i.instances {
instances = append(instances, instance)
}
return instances
}
41 changes: 25 additions & 16 deletions pkg/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"

"github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/ingester/index"
Expand Down Expand Up @@ -45,7 +46,7 @@ func init() {
}

type instance struct {
streamsMtx sync.Mutex
streamsMtx sync.RWMutex
streams map[model.Fingerprint]*stream
index *index.InvertedIndex

Expand Down Expand Up @@ -93,22 +94,10 @@ func (i *instance) Query(req *logproto.QueryRequest, queryServer logproto.Querie
return err
}

// TODO: lock smell
i.streamsMtx.Lock()
ids := i.index.Lookup(matchers)
iterators := make([]iter.EntryIterator, len(ids))
for j := range ids {
stream, ok := i.streams[ids[j]]
if !ok {
i.streamsMtx.Unlock()
return ErrStreamMissing
}
iterators[j], err = stream.Iterator(req.Start, req.End, req.Direction)
if err != nil {
return err
}
iterators, err := i.lookupStreams(req, matchers)
if err != nil {
return err
}
i.streamsMtx.Unlock()

iterator := iter.NewHeapIterator(iterators, req.Direction)
defer helpers.LogError("closing iterator", iterator.Close)
Expand Down Expand Up @@ -144,6 +133,26 @@ func (i *instance) Label(ctx context.Context, req *logproto.LabelRequest) (*logp
}, nil
}

func (i *instance) lookupStreams(req *logproto.QueryRequest, matchers []*labels.Matcher) ([]iter.EntryIterator, error) {
i.streamsMtx.RLock()
defer i.streamsMtx.RUnlock()

var err error
ids := i.index.Lookup(matchers)
iterators := make([]iter.EntryIterator, len(ids))
for j := range ids {
stream, ok := i.streams[ids[j]]
if !ok {
return nil, ErrStreamMissing
}
iterators[j], err = stream.Iterator(req.Start, req.End, req.Direction)
if err != nil {
return nil, err
}
}
return iterators, nil
}

func isDone(ctx context.Context) bool {
select {
case <-ctx.Done():
Expand Down

0 comments on commit 251017e

Please sign in to comment.