diff --git a/Makefile b/Makefile index ab4186019ea03..7bea127450b07 100644 --- a/Makefile +++ b/Makefile @@ -239,6 +239,7 @@ publish: dist lint: GO111MODULE=on GOGC=10 golangci-lint run -v $(GOLANGCI_ARG) + faillint -paths "sync/atomic=go.uber.org/atomic" ./... ######## # Test # diff --git a/go.mod b/go.mod index d4fe73ac0cd21..b8966751355dc 100644 --- a/go.mod +++ b/go.mod @@ -55,6 +55,7 @@ require ( github.com/ugorji/go v1.1.7 // indirect github.com/weaveworks/common v0.0.0-20200625145055-4b1847531bc9 go.etcd.io/bbolt v1.3.5-0.20200615073812-232d8fc87f50 + go.uber.org/atomic v1.6.0 golang.org/x/net v0.0.0-20200707034311-ab3426394381 google.golang.org/grpc v1.29.1 gopkg.in/alecthomas/kingpin.v2 v2.2.6 diff --git a/loki-build-image/Dockerfile b/loki-build-image/Dockerfile index abba84fe825d4..6113fc4ee6f28 100644 --- a/loki-build-image/Dockerfile +++ b/loki-build-image/Dockerfile @@ -20,6 +20,14 @@ RUN apk add --no-cache docker-cli FROM golang:1.14.2 as drone RUN GO111MODULE=on go get github.com/drone/drone-cli/drone@1fad337d74ca0ecf420993d9d2d7229a1c99f054 +# Install faillint used to lint go imports in CI. +# This collisions with the version of go tools used in the base image, thus we install it in its own image and copy it over. +# Error: +# github.com/fatih/faillint@v1.5.0 requires golang.org/x/tools@v0.0.0-20200207224406-61798d64f025 +# (not golang.org/x/tools@v0.0.0-20190918214920-58d531046acd from golang.org/x/tools/cmd/goyacc@58d531046acdc757f177387bc1725bfa79895d69) +FROM golang:1.14.2 as faillint +RUN GO111MODULE=on go get github.com/fatih/faillint@v1.5.0 + FROM golang:1.14.2-stretch RUN apt-get update && \ apt-get install -qy \ @@ -33,6 +41,7 @@ COPY --from=docker /usr/bin/docker /usr/bin/docker COPY --from=helm /usr/bin/helm /usr/bin/helm COPY --from=golangci /bin/golangci-lint /usr/local/bin COPY --from=drone /go/bin/drone /usr/bin/drone +COPY --from=faillint /go/bin/faillint /usr/bin/faillint # Install some necessary dependencies. # Forcing GO111MODULE=on is required to specify dependencies at specific versions using the go mod notation. diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 084e82b7d223d..2461745008512 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -4,7 +4,6 @@ import ( "context" "flag" "net/http" - "sync/atomic" "time" cortex_distributor "github.com/cortexproject/cortex/pkg/distributor" @@ -14,6 +13,7 @@ import ( "github.com/cortexproject/cortex/pkg/util/limiter" "github.com/cortexproject/cortex/pkg/util/services" "github.com/pkg/errors" + "go.uber.org/atomic" "github.com/opentracing/opentracing-go" "github.com/prometheus/client_golang/prometheus" @@ -164,14 +164,14 @@ type streamTracker struct { stream logproto.Stream minSuccess int maxFailures int - succeeded int32 - failed int32 + succeeded atomic.Int32 + failed atomic.Int32 } // TODO taken from Cortex, see if we can refactor out an usable interface. type pushTracker struct { - samplesPending int32 - samplesFailed int32 + samplesPending atomic.Int32 + samplesFailed atomic.Int32 done chan struct{} err chan error } @@ -263,10 +263,10 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log } tracker := pushTracker{ - samplesPending: int32(len(streams)), done: make(chan struct{}), err: make(chan error), } + tracker.samplesPending.Store(int32(len(streams))) for ingester, samples := range samplesByIngester { go func(ingester ring.IngesterDesc, samples []*streamTracker) { // Use a background context to make sure all ingesters get samples even if we return early @@ -304,17 +304,17 @@ func (d *Distributor) sendSamples(ctx context.Context, ingester ring.IngesterDes // goroutine will write to either channel. for i := range streamTrackers { if err != nil { - if atomic.AddInt32(&streamTrackers[i].failed, 1) <= int32(streamTrackers[i].maxFailures) { + if streamTrackers[i].failed.Inc() <= int32(streamTrackers[i].maxFailures) { continue } - if atomic.AddInt32(&pushTracker.samplesFailed, 1) == 1 { + if pushTracker.samplesFailed.Inc() == 1 { pushTracker.err <- err } } else { - if atomic.AddInt32(&streamTrackers[i].succeeded, 1) != int32(streamTrackers[i].minSuccess) { + if streamTrackers[i].succeeded.Inc() != int32(streamTrackers[i].minSuccess) { continue } - if atomic.AddInt32(&pushTracker.samplesPending, -1) == 0 { + if pushTracker.samplesPending.Dec() == 0 { pushTracker.done <- struct{}{} } } diff --git a/pkg/ingester/mapper.go b/pkg/ingester/mapper.go index 6817d04181d1b..0b57b06f3f558 100644 --- a/pkg/ingester/mapper.go +++ b/pkg/ingester/mapper.go @@ -5,7 +5,6 @@ import ( "sort" "strings" "sync" - "sync/atomic" "github.com/prometheus/prometheus/pkg/labels" @@ -14,6 +13,7 @@ import ( "github.com/cortexproject/cortex/pkg/util" "github.com/go-kit/kit/log/level" "github.com/prometheus/common/model" + "go.uber.org/atomic" ) const maxMappedFP = 1 << 20 // About 1M fingerprints reserved for mapping. @@ -24,7 +24,7 @@ var separatorString = string([]byte{model.SeparatorByte}) // collisions. type fpMapper struct { // highestMappedFP has to be aligned for atomic operations. - highestMappedFP model.Fingerprint + highestMappedFP atomic.Uint64 mtx sync.RWMutex // Protects mappings. // maps original fingerprints to a map of string representations of @@ -163,7 +163,7 @@ func (m *fpMapper) maybeAddMapping(fp model.Fingerprint, collidingMetric []clien } func (m *fpMapper) nextMappedFP() model.Fingerprint { - mappedFP := model.Fingerprint(atomic.AddUint64((*uint64)(&m.highestMappedFP), 1)) + mappedFP := model.Fingerprint(m.highestMappedFP.Inc()) if mappedFP > maxMappedFP { panic(fmt.Errorf("more than %v fingerprints mapped in collision detection", maxMappedFP)) } diff --git a/vendor/modules.txt b/vendor/modules.txt index bb91aca32abb9..f2bd6e99edfc1 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -984,6 +984,7 @@ go.opencensus.io/trace/internal go.opencensus.io/trace/propagation go.opencensus.io/trace/tracestate # go.uber.org/atomic v1.6.0 +## explicit go.uber.org/atomic # go.uber.org/goleak v1.0.0 go.uber.org/goleak