Skip to content

Commit

Permalink
Update github.com/thanos-io/objstore (#5604)
Browse files Browse the repository at this point in the history
* Update github.com/thanos-io/objstore

* Fix span finish when error happens

As highlighted by the review.
  • Loading branch information
simonswine authored Jul 28, 2023
1 parent 19a35c6 commit 12792f6
Show file tree
Hide file tree
Showing 18 changed files with 295 additions and 281 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ require (
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheusremotewrite v0.0.0-20230717235037-3f2821e2c1b1
github.com/prometheus/procfs v0.11.0
github.com/thanos-io/objstore v0.0.0-20230710163637-47c0118da0ca
github.com/thanos-io/objstore v0.0.0-20230727115635-d0c43443ecda
github.com/xlab/treeprint v1.2.0
go.opentelemetry.io/collector/pdata v1.0.0-rcv0013
go.opentelemetry.io/otel v1.16.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1253,8 +1253,8 @@ github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcU
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/subosito/gotenv v1.4.1/go.mod h1:ayKnFf/c6rvx/2iiLrJUk1e6plDbT3edrFNGqEflhK0=
github.com/tencentyun/cos-go-sdk-v5 v0.7.40 h1:W6vDGKCHe4wBACI1d2UgE6+50sJFhRWU4O8IB2ozzxM=
github.com/thanos-io/objstore v0.0.0-20230710163637-47c0118da0ca h1:JRF7i58HovirZQVJGwCClQsMK6CCmK2fvialXjeoSpI=
github.com/thanos-io/objstore v0.0.0-20230710163637-47c0118da0ca/go.mod h1:5V7lzXuaxwt6XFQoA/zJrhdnQrxq1+r0bwQ1iYOq3gM=
github.com/thanos-io/objstore v0.0.0-20230727115635-d0c43443ecda h1:DtxaU/a7QRPiUhwtPrZFlS81y+9Mgny4KoLq65cu04U=
github.com/thanos-io/objstore v0.0.0-20230727115635-d0c43443ecda/go.mod h1:IS7Z25+0KaknyU2P5PTP/5hwY6Yr/FzbInF88Yd5auU=
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM=
github.com/uber/jaeger-client-go v2.28.0+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk=
Expand Down
6 changes: 3 additions & 3 deletions pkg/mimir/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (

"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/log"
"github.com/thanos-io/objstore/tracing"
objstoretracing "github.com/thanos-io/objstore/tracing/opentracing"
"github.com/uber/jaeger-client-go"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
Expand All @@ -22,14 +22,14 @@ import (
// ThanosTracerUnaryInterceptor injects the opentracing global tracer into the context
// in order to get it picked up by Thanos components.
func ThanosTracerUnaryInterceptor(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
return handler(tracing.ContextWithTracer(ctx, opentracing.GlobalTracer()), req)
return handler(objstoretracing.ContextWithTracer(ctx, opentracing.GlobalTracer()), req)
}

// ThanosTracerStreamInterceptor injects the opentracing global tracer into the context
// in order to get it picked up by Thanos components.
func ThanosTracerStreamInterceptor(srv interface{}, ss grpc.ServerStream, _ *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
return handler(srv, wrappedServerStream{
ctx: tracing.ContextWithTracer(ss.Context(), opentracing.GlobalTracer()),
ctx: objstoretracing.ContextWithTracer(ss.Context(), opentracing.GlobalTracer()),
ServerStream: ss,
})
}
Expand Down
10 changes: 6 additions & 4 deletions pkg/storage/bucket/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/grafana/regexp"
"github.com/prometheus/client_golang/prometheus"
"github.com/thanos-io/objstore"
objstoretracing "github.com/thanos-io/objstore/tracing/opentracing"

"github.com/grafana/mimir/pkg/storage/bucket/azure"
"github.com/grafana/mimir/pkg/storage/bucket/filesystem"
Expand Down Expand Up @@ -180,7 +181,7 @@ func NewClient(ctx context.Context, cfg Config, name string, logger log.Logger,
backendClient = NewPrefixedBucketClient(backendClient, cfg.StoragePrefix)
}

instrumentedClient := objstore.NewTracingBucket(bucketWithMetrics(backendClient, name, reg))
instrumentedClient := objstoretracing.WrapWithTraces(bucketWithMetrics(backendClient, name, reg))

// Wrap the client with any provided middleware
for _, wrap := range cfg.Middlewares {
Expand All @@ -203,8 +204,9 @@ func bucketWithMetrics(bucketClient objstore.Bucket, name string, reg prometheus
reg = prometheus.WrapRegistererWithPrefix("thanos_", reg)
reg = prometheus.WrapRegistererWith(prometheus.Labels{"component": name}, reg)

return objstore.BucketWithMetrics(
"", // bucket label value
return objstore.WrapWithMetrics(
bucketClient,
reg)
reg,
"", // bucket label value
)
}
8 changes: 4 additions & 4 deletions pkg/storage/tsdb/block/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestMetaFetcher_Fetch_ShouldReturnDiscoveredBlocksIncludingMarkedForDeletio
require.NoError(t, err)
bkt = BucketWithGlobalMarkers(bkt)

f, err := NewMetaFetcher(logger, 10, objstore.BucketWithMetrics("test", bkt, reg), t.TempDir(), reg, nil)
f, err := NewMetaFetcher(logger, 10, objstore.WrapWithMetrics(bkt, reg, "test"), t.TempDir(), reg, nil)
require.NoError(t, err)

t.Run("should return no metas and no partials on no block in the storage", func(t *testing.T) {
Expand Down Expand Up @@ -156,7 +156,7 @@ func TestMetaFetcher_FetchWithoutMarkedForDeletion_ShouldReturnDiscoveredBlocksE
require.NoError(t, err)
bkt = BucketWithGlobalMarkers(bkt)

f, err := NewMetaFetcher(logger, 10, objstore.BucketWithMetrics("test", bkt, reg), t.TempDir(), reg, nil)
f, err := NewMetaFetcher(logger, 10, objstore.WrapWithMetrics(bkt, reg, "test"), t.TempDir(), reg, nil)
require.NoError(t, err)

t.Run("should return no metas and no partials on no block in the storage", func(t *testing.T) {
Expand Down Expand Up @@ -284,7 +284,7 @@ func TestMetaFetcher_ShouldNotIssueAnyAPICallToObjectStorageIfAllBlockMetasAreCa

// Create a fetcher and fetch block metas to populate the cache on disk.
reg1 := prometheus.NewPedanticRegistry()
fetcher1, err := NewMetaFetcher(logger, 10, objstore.BucketWithMetrics("test", bkt, prometheus.WrapRegistererWithPrefix("thanos_", reg1)), fetcherDir, nil, nil)
fetcher1, err := NewMetaFetcher(logger, 10, objstore.WrapWithMetrics(bkt, prometheus.WrapRegistererWithPrefix("thanos_", reg1), "test"), fetcherDir, nil, nil)
require.NoError(t, err)
actualMetas, _, actualErr := fetcher1.Fetch(ctx)
require.NoError(t, actualErr)
Expand All @@ -306,7 +306,7 @@ func TestMetaFetcher_ShouldNotIssueAnyAPICallToObjectStorageIfAllBlockMetasAreCa

// Create a new fetcher and fetch blocks again. This time we expect all meta.json to be loaded from cache.
reg2 := prometheus.NewPedanticRegistry()
fetcher2, err := NewMetaFetcher(logger, 10, objstore.BucketWithMetrics("test", bkt, prometheus.WrapRegistererWithPrefix("thanos_", reg2)), fetcherDir, nil, nil)
fetcher2, err := NewMetaFetcher(logger, 10, objstore.WrapWithMetrics(bkt, prometheus.WrapRegistererWithPrefix("thanos_", reg2), "test"), fetcherDir, nil, nil)
require.NoError(t, err)
actualMetas, _, actualErr = fetcher2.Fetch(ctx)
require.NoError(t, actualErr)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ func TestBucketWithGlobalMarkers_ShouldWorkCorrectlyWithBucketMetrics(t *testing
// global markers (intentionally in the middle of the chain) and
// user prefix.
bkt, _ := mimir_testutil.PrepareFilesystemBucket(t)
bkt = objstore.BucketWithMetrics("", bkt, prometheus.WrapRegistererWithPrefix("thanos_", reg))
bkt = objstore.WrapWithMetrics(bkt, prometheus.WrapRegistererWithPrefix("thanos_", reg), "")
bkt = BucketWithGlobalMarkers(bkt)
userBkt := bucket.NewUserBucketClient("user-1", bkt, nil)

Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/tsdb/testutil/objstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,5 @@ func PrepareFilesystemBucket(t testing.TB) (objstore.Bucket, string) {
bkt, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir})
require.NoError(t, err)

return objstore.BucketWithMetrics("test", bkt, nil), storageDir
return objstore.WrapWithMetrics(bkt, nil, "test"), storageDir
}
10 changes: 5 additions & 5 deletions pkg/storegateway/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/grafana/dskit/multierror"
"github.com/grafana/dskit/runutil"
"github.com/oklog/ulid"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
Expand All @@ -33,7 +34,6 @@ import (
"github.com/prometheus/prometheus/tsdb/hashcache"
"github.com/prometheus/prometheus/tsdb/index"
"github.com/thanos-io/objstore"
"github.com/thanos-io/objstore/tracing"
"golang.org/x/exp/slices"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
Expand Down Expand Up @@ -611,9 +611,9 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie

// Wait for the query gate only after opening blocks. Opening blocks is usually fast (~1ms),
// but sometimes it can take minutes if the block isn't loaded and there is a surge in queries for unloaded blocks.
tracing.DoWithSpan(ctx, "store_query_gate_ismyturn", func(ctx context.Context, _ tracing.Span) {
err = s.queryGate.Start(ctx)
})
span, spanCtx := opentracing.StartSpanFromContext(ctx, "store_query_gate_ismyturn")
err = s.queryGate.Start(spanCtx)
span.Finish()
if err != nil {
return errors.Wrapf(err, "failed to wait for turn")
}
Expand Down Expand Up @@ -1252,7 +1252,7 @@ func (s *BucketStore) recordSeriesHashCacheStats(stats *queryStats) {

func (s *BucketStore) openBlocksForReading(ctx context.Context, skipChunks bool, minT, maxT int64, blockMatchers []*labels.Matcher, stats *safeQueryStats) ([]*bucketBlock, map[ulid.ULID]*bucketIndexReader, map[ulid.ULID]chunkReader) {
// ignore the span context so that we can use the context for cancellation
span, _ := tracing.StartSpan(ctx, "bucket_store_open_blocks_for_reading")
span, _ := opentracing.StartSpanFromContext(ctx, "bucket_store_open_blocks_for_reading")
defer span.Finish()

s.blocksMx.RLock()
Expand Down
4 changes: 2 additions & 2 deletions pkg/storegateway/bucket_index_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ import (
"github.com/go-kit/log/level"
"github.com/grafana/dskit/runutil"
"github.com/oklog/ulid"
"github.com/opentracing/opentracing-go"
otlog "github.com/opentracing/opentracing-go/log"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/index"
"github.com/thanos-io/objstore/tracing"
"golang.org/x/sync/errgroup"

"github.com/grafana/mimir/pkg/storage/tsdb"
Expand Down Expand Up @@ -83,7 +83,7 @@ func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms []*labels.M
cached bool
promise expandedPostingsPromise
)
span, ctx := tracing.StartSpan(ctx, "ExpandedPostings()")
span, ctx := opentracing.StartSpanFromContext(ctx, "ExpandedPostings()")
defer func() {
span.LogKV("returned postings", len(returnRefs), "cached", cached, "promise_loaded", loaded, "block_id", r.block.meta.ULID.String())
if returnErr != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/usagestats/reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,5 +375,5 @@ func prepareLocalBucketClient(t *testing.T) objstore.InstrumentedBucket {
bucketClient, err := filesystem.NewBucketClient(filesystem.Config{Directory: t.TempDir()})
require.NoError(t, err)

return objstore.BucketWithMetrics("", bucketClient, nil)
return objstore.WrapWithMetrics(bucketClient, nil, "")
}
8 changes: 4 additions & 4 deletions pkg/usagestats/seed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func TestReadSeedFile(t *testing.T) {
bucketClient := &bucket.ClientMock{}
testData.setup(bucketClient)

seed, err := readSeedFile(context.Background(), objstore.BucketWithMetrics("", bucketClient, nil), log.NewNopLogger())
seed, err := readSeedFile(context.Background(), objstore.WrapWithMetrics(bucketClient, nil, ""), log.NewNopLogger())
if testData.expectedErr != nil {
require.Error(t, err)
require.Contains(t, err.Error(), testData.expectedErr.Error())
Expand Down Expand Up @@ -106,7 +106,7 @@ func TestWriteSeedFile(t *testing.T) {
bucketClient := &bucket.ClientMock{}
testData.setup(bucketClient)

err := writeSeedFile(context.Background(), objstore.BucketWithMetrics("", bucketClient, nil), seed)
err := writeSeedFile(context.Background(), objstore.WrapWithMetrics(bucketClient, nil, ""), seed)
if testData.expectedErr != nil {
require.Error(t, err)
require.Contains(t, err.Error(), testData.expectedErr.Error())
Expand Down Expand Up @@ -178,7 +178,7 @@ func TestWaitSeedFileStability(t *testing.T) {
bucketClient := &bucket.ClientMock{}
testData := testSetup(t, bucketClient)

actualSeed, err := waitSeedFileStability(context.Background(), objstore.BucketWithMetrics("", bucketClient, nil), minStability, log.NewNopLogger())
actualSeed, err := waitSeedFileStability(context.Background(), objstore.WrapWithMetrics(bucketClient, nil, ""), minStability, log.NewNopLogger())
if testData.expectedErr != nil {
require.Error(t, err)
require.Contains(t, err.Error(), testData.expectedErr.Error())
Expand Down Expand Up @@ -295,7 +295,7 @@ func TestInitSeedFile_CreatingConcurrency(t *testing.T) {
// Wait for the start.
<-start

seed, err := initSeedFile(context.Background(), objstore.BucketWithMetrics("", bucketClient, nil), minStability, log.NewNopLogger())
seed, err := initSeedFile(context.Background(), objstore.WrapWithMetrics(bucketClient, nil, ""), minStability, log.NewNopLogger())
if err != nil {
return err
}
Expand Down
8 changes: 6 additions & 2 deletions vendor/github.com/thanos-io/objstore/CHANGELOG.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 12 additions & 12 deletions vendor/github.com/thanos-io/objstore/README.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 12792f6

Please sign in to comment.