From 728312f6b371d5e361df4c0e7476330d1f62a308 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Sat, 2 Feb 2019 10:58:10 +0100 Subject: [PATCH 1/6] Add options for client-side load balancing on all gRPC clients. Signed-off-by: Tom Wilkie --- gcp/bigtable_index_client.go | 10 ++++------ gcp/bigtable_object_client.go | 3 ++- gcp/gcs_object_client.go | 2 +- gcp/instrumentation.go | 33 +++++++++++++++++---------------- gcp/table_client.go | 3 ++- 5 files changed, 26 insertions(+), 25 deletions(-) diff --git a/gcp/bigtable_index_client.go b/gcp/bigtable_index_client.go index 15975b7594e26..3b79fc5e077e3 100644 --- a/gcp/bigtable_index_client.go +++ b/gcp/bigtable_index_client.go @@ -10,7 +10,6 @@ import ( "cloud.google.com/go/bigtable" ot "github.com/opentracing/opentracing-go" otlog "github.com/opentracing/opentracing-go/log" - "google.golang.org/api/option" "github.com/cortexproject/cortex/pkg/chunk" chunk_util "github.com/cortexproject/cortex/pkg/chunk/util" @@ -64,10 +63,8 @@ type storageClientV1 struct { // NewStorageClientV1 returns a new v1 StorageClient. func NewStorageClientV1(ctx context.Context, cfg Config, schemaCfg chunk.SchemaConfig) (chunk.IndexClient, error) { - opts := instrumentation() - opts = append(opts, option.WithGRPCDialOption(cfg.GRPCClientConfig.DialOption())) - - client, err := bigtable.NewClient(ctx, cfg.Project, cfg.Instance, opts...) + opts := cfg.GRPCClientConfig.DialOption(instrumentation()) + client, err := bigtable.NewClient(ctx, cfg.Project, cfg.Instance, toBigtableOpts(opts)...) if err != nil { return nil, err } @@ -90,7 +87,8 @@ func newStorageClientV1(cfg Config, schemaCfg chunk.SchemaConfig, client *bigtab // NewStorageClientColumnKey returns a new v2 StorageClient. func NewStorageClientColumnKey(ctx context.Context, cfg Config, schemaCfg chunk.SchemaConfig) (chunk.IndexClient, error) { - client, err := bigtable.NewClient(ctx, cfg.Project, cfg.Instance, instrumentation()...) + opts := toBigtableOpts(cfg.GRPCClientConfig.DialOption(instrumentation())) + client, err := bigtable.NewClient(ctx, cfg.Project, cfg.Instance, opts...) if err != nil { return nil, err } diff --git a/gcp/bigtable_object_client.go b/gcp/bigtable_object_client.go index 1dab5853fe704..383286a7ed42e 100644 --- a/gcp/bigtable_object_client.go +++ b/gcp/bigtable_object_client.go @@ -22,7 +22,8 @@ type bigtableObjectClient struct { // NewBigtableObjectClient makes a new chunk.ObjectClient that stores chunks in // Bigtable. func NewBigtableObjectClient(ctx context.Context, cfg Config, schemaCfg chunk.SchemaConfig) (chunk.ObjectClient, error) { - client, err := bigtable.NewClient(ctx, cfg.Project, cfg.Instance, instrumentation()...) + opts := toBigtableOpts(cfg.GRPCClientConfig.DialOption(instrumentation())) + client, err := bigtable.NewClient(ctx, cfg.Project, cfg.Instance, opts...) if err != nil { return nil, err } diff --git a/gcp/gcs_object_client.go b/gcp/gcs_object_client.go index 1f621cf6f7547..ca7d249dbf753 100644 --- a/gcp/gcs_object_client.go +++ b/gcp/gcs_object_client.go @@ -31,7 +31,7 @@ func (cfg *GCSConfig) RegisterFlags(f *flag.FlagSet) { // NewGCSObjectClient makes a new chunk.ObjectClient that writes chunks to GCS. func NewGCSObjectClient(ctx context.Context, cfg GCSConfig, schemaCfg chunk.SchemaConfig) (chunk.ObjectClient, error) { - client, err := storage.NewClient(ctx, instrumentation()...) + client, err := storage.NewClient(ctx) if err != nil { return nil, err } diff --git a/gcp/instrumentation.go b/gcp/instrumentation.go index 62b7b9e050ce9..40687a5be7a0c 100644 --- a/gcp/instrumentation.go +++ b/gcp/instrumentation.go @@ -1,9 +1,8 @@ package gcp import ( - "github.com/grpc-ecosystem/go-grpc-middleware" otgrpc "github.com/opentracing-contrib/go-grpc" - "github.com/opentracing/opentracing-go" + opentracing "github.com/opentracing/opentracing-go" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "google.golang.org/api/option" @@ -22,19 +21,21 @@ var bigtableRequestDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ Buckets: prometheus.ExponentialBuckets(0.001, 4, 6), }, []string{"operation", "status_code"}) -func instrumentation() []option.ClientOption { - return []option.ClientOption{ - option.WithGRPCDialOption( - grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( - otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()), - middleware.PrometheusGRPCUnaryInstrumentation(bigtableRequestDuration), - )), - ), - option.WithGRPCDialOption( - grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient( - otgrpc.OpenTracingStreamClientInterceptor(opentracing.GlobalTracer()), - middleware.PrometheusGRPCStreamInstrumentation(bigtableRequestDuration), - )), - ), +func instrumentation() ([]grpc.UnaryClientInterceptor, []grpc.StreamClientInterceptor) { + return []grpc.UnaryClientInterceptor{ + otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()), + middleware.PrometheusGRPCUnaryInstrumentation(bigtableRequestDuration), + }, + []grpc.StreamClientInterceptor{ + otgrpc.OpenTracingStreamClientInterceptor(opentracing.GlobalTracer()), + middleware.PrometheusGRPCStreamInstrumentation(bigtableRequestDuration), + } +} + +func toBigtableOpts(opts []grpc.DialOption) []option.ClientOption { + result := make([]option.ClientOption, 0, len(opts)) + for _, opt := range opts { + result = append(result, option.WithGRPCDialOption(opt)) } + return result } diff --git a/gcp/table_client.go b/gcp/table_client.go index 2a62c1b04216b..258e9c73fa2e3 100644 --- a/gcp/table_client.go +++ b/gcp/table_client.go @@ -17,7 +17,8 @@ type tableClient struct { // NewTableClient returns a new TableClient. func NewTableClient(ctx context.Context, cfg Config) (chunk.TableClient, error) { - client, err := bigtable.NewAdminClient(ctx, cfg.Project, cfg.Instance, instrumentation()...) + opts := toBigtableOpts(cfg.GRPCClientConfig.DialOption(instrumentation())) + client, err := bigtable.NewAdminClient(ctx, cfg.Project, cfg.Instance, opts...) if err != nil { return nil, err } From ecd173c223b91bddd45bd22ec0b3019a9090fb03 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Sat, 2 Feb 2019 11:29:06 +0100 Subject: [PATCH 2/6] Correctly instrument GCS client. Signed-off-by: Tom Wilkie --- gcp/bigtable_index_client.go | 6 ++-- gcp/bigtable_object_client.go | 2 +- gcp/gcs_object_client.go | 7 +++- gcp/instrumentation.go | 66 +++++++++++++++++++++++++++++------ gcp/table_client.go | 2 +- 5 files changed, 67 insertions(+), 16 deletions(-) diff --git a/gcp/bigtable_index_client.go b/gcp/bigtable_index_client.go index 3b79fc5e077e3..9297bb1e98ca3 100644 --- a/gcp/bigtable_index_client.go +++ b/gcp/bigtable_index_client.go @@ -63,8 +63,8 @@ type storageClientV1 struct { // NewStorageClientV1 returns a new v1 StorageClient. func NewStorageClientV1(ctx context.Context, cfg Config, schemaCfg chunk.SchemaConfig) (chunk.IndexClient, error) { - opts := cfg.GRPCClientConfig.DialOption(instrumentation()) - client, err := bigtable.NewClient(ctx, cfg.Project, cfg.Instance, toBigtableOpts(opts)...) + opts := toOptions(cfg.GRPCClientConfig.DialOption(bigtableInstrumentation())) + client, err := bigtable.NewClient(ctx, cfg.Project, cfg.Instance, opts...) if err != nil { return nil, err } @@ -87,7 +87,7 @@ func newStorageClientV1(cfg Config, schemaCfg chunk.SchemaConfig, client *bigtab // NewStorageClientColumnKey returns a new v2 StorageClient. func NewStorageClientColumnKey(ctx context.Context, cfg Config, schemaCfg chunk.SchemaConfig) (chunk.IndexClient, error) { - opts := toBigtableOpts(cfg.GRPCClientConfig.DialOption(instrumentation())) + opts := toOptions(cfg.GRPCClientConfig.DialOption(bigtableInstrumentation())) client, err := bigtable.NewClient(ctx, cfg.Project, cfg.Instance, opts...) if err != nil { return nil, err diff --git a/gcp/bigtable_object_client.go b/gcp/bigtable_object_client.go index 383286a7ed42e..aa927133aff3d 100644 --- a/gcp/bigtable_object_client.go +++ b/gcp/bigtable_object_client.go @@ -22,7 +22,7 @@ type bigtableObjectClient struct { // NewBigtableObjectClient makes a new chunk.ObjectClient that stores chunks in // Bigtable. func NewBigtableObjectClient(ctx context.Context, cfg Config, schemaCfg chunk.SchemaConfig) (chunk.ObjectClient, error) { - opts := toBigtableOpts(cfg.GRPCClientConfig.DialOption(instrumentation())) + opts := toOptions(cfg.GRPCClientConfig.DialOption(bigtableInstrumentation())) client, err := bigtable.NewClient(ctx, cfg.Project, cfg.Instance, opts...) if err != nil { return nil, err diff --git a/gcp/gcs_object_client.go b/gcp/gcs_object_client.go index ca7d249dbf753..34b1f256f385d 100644 --- a/gcp/gcs_object_client.go +++ b/gcp/gcs_object_client.go @@ -31,7 +31,12 @@ func (cfg *GCSConfig) RegisterFlags(f *flag.FlagSet) { // NewGCSObjectClient makes a new chunk.ObjectClient that writes chunks to GCS. func NewGCSObjectClient(ctx context.Context, cfg GCSConfig, schemaCfg chunk.SchemaConfig) (chunk.ObjectClient, error) { - client, err := storage.NewClient(ctx) + option, err := gcsInstrumentation(ctx) + if err != nil { + return nil, err + } + + client, err := storage.NewClient(ctx, option) if err != nil { return nil, err } diff --git a/gcp/instrumentation.go b/gcp/instrumentation.go index 40687a5be7a0c..69347dd9d3f60 100644 --- a/gcp/instrumentation.go +++ b/gcp/instrumentation.go @@ -1,27 +1,45 @@ package gcp import ( + "context" + "net/http" + "strconv" + "time" + otgrpc "github.com/opentracing-contrib/go-grpc" opentracing "github.com/opentracing/opentracing-go" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "google.golang.org/api/option" + google_http "google.golang.org/api/transport/http" "google.golang.org/grpc" "github.com/cortexproject/cortex/pkg/util/middleware" ) -var bigtableRequestDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ - Namespace: "cortex", - Name: "bigtable_request_duration_seconds", - Help: "Time spent doing Bigtable requests.", +var ( + bigtableRequestDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "cortex", + Name: "bigtable_request_duration_seconds", + Help: "Time spent doing Bigtable requests.", + + // Bigtable latency seems to range from a few ms to a few hundred ms and is + // important. So use 6 buckets from 1ms to 1s. + Buckets: prometheus.ExponentialBuckets(0.001, 4, 6), + }, []string{"operation", "status_code"}) - // Bigtable latency seems to range from a few ms to a few hundred ms and is - // important. So use 6 buckets from 1ms to 1s. - Buckets: prometheus.ExponentialBuckets(0.001, 4, 6), -}, []string{"operation", "status_code"}) + gcsRequestDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "cortex", + Name: "gcs_request_duration_seconds", + Help: "Time spent doing GCS requests.", + + // Bigtable latency seems to range from a few ms to a few hundred ms and is + // important. So use 6 buckets from 1ms to 1s. + Buckets: prometheus.ExponentialBuckets(0.001, 4, 6), + }, []string{"operation", "status_code"}) +) -func instrumentation() ([]grpc.UnaryClientInterceptor, []grpc.StreamClientInterceptor) { +func bigtableInstrumentation() ([]grpc.UnaryClientInterceptor, []grpc.StreamClientInterceptor) { return []grpc.UnaryClientInterceptor{ otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()), middleware.PrometheusGRPCUnaryInstrumentation(bigtableRequestDuration), @@ -32,10 +50,38 @@ func instrumentation() ([]grpc.UnaryClientInterceptor, []grpc.StreamClientInterc } } -func toBigtableOpts(opts []grpc.DialOption) []option.ClientOption { +func gcsInstrumentation(ctx context.Context) (option.ClientOption, error) { + transport, err := google_http.NewTransport(ctx, http.DefaultTransport) + if err != nil { + return nil, err + } + client := &http.Client{ + Transport: instrumentedTransport{ + observer: gcsRequestDuration, + next: transport, + }, + } + return option.WithHTTPClient(client), nil +} + +func toOptions(opts []grpc.DialOption) []option.ClientOption { result := make([]option.ClientOption, 0, len(opts)) for _, opt := range opts { result = append(result, option.WithGRPCDialOption(opt)) } return result } + +type instrumentedTransport struct { + observer prometheus.ObserverVec + next http.RoundTripper +} + +func (i instrumentedTransport) RoundTrip(req *http.Request) (*http.Response, error) { + start := time.Now() + resp, err := i.next.RoundTrip(req) + if err == nil { + i.observer.WithLabelValues(req.URL.Path, strconv.Itoa(resp.StatusCode)).Observe(time.Since(start).Seconds()) + } + return resp, err +} diff --git a/gcp/table_client.go b/gcp/table_client.go index 258e9c73fa2e3..9f705e7fea443 100644 --- a/gcp/table_client.go +++ b/gcp/table_client.go @@ -17,7 +17,7 @@ type tableClient struct { // NewTableClient returns a new TableClient. func NewTableClient(ctx context.Context, cfg Config) (chunk.TableClient, error) { - opts := toBigtableOpts(cfg.GRPCClientConfig.DialOption(instrumentation())) + opts := toOptions(cfg.GRPCClientConfig.DialOption(bigtableInstrumentation())) client, err := bigtable.NewAdminClient(ctx, cfg.Project, cfg.Instance, opts...) if err != nil { return nil, err From 23b69ca0c4ec4c4a277afb697bb8d387c724e726 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Sat, 2 Feb 2019 12:23:18 +0100 Subject: [PATCH 3/6] Also wrap errors in the bigtable table client so we can see where they fail. Signed-off-by: Tom Wilkie --- gcp/table_client.go | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/gcp/table_client.go b/gcp/table_client.go index 9f705e7fea443..ae6d16444ddd7 100644 --- a/gcp/table_client.go +++ b/gcp/table_client.go @@ -8,6 +8,7 @@ import ( "google.golang.org/grpc/status" "github.com/cortexproject/cortex/pkg/chunk" + "github.com/pkg/errors" ) type tableClient struct { @@ -31,7 +32,7 @@ func NewTableClient(ctx context.Context, cfg Config) (chunk.TableClient, error) func (c *tableClient) ListTables(ctx context.Context) ([]string, error) { tables, err := c.client.Tables(ctx) if err != nil { - return nil, err + return nil, errors.Wrap(err, "client.Tables") } // Check each table has the right column family. If not, omit it. @@ -39,7 +40,7 @@ func (c *tableClient) ListTables(ctx context.Context) ([]string, error) { for _, table := range tables { info, err := c.client.TableInfo(ctx, table) if err != nil { - return nil, err + return nil, errors.Wrap(err, "client.TableInfo") } if hasColumnFamily(info.FamilyInfos) { @@ -62,10 +63,15 @@ func hasColumnFamily(infos []bigtable.FamilyInfo) bool { func (c *tableClient) CreateTable(ctx context.Context, desc chunk.TableDesc) error { if err := c.client.CreateTable(ctx, desc.Name); err != nil { if !alreadyExistsError(err) { - return err + return errors.Wrap(err, "client.CreateTable") } } - return c.client.CreateColumnFamily(ctx, desc.Name, columnFamily) + + if err := c.client.CreateColumnFamily(ctx, desc.Name, columnFamily); err != nil { + return errors.Wrap(err, "client.CreateColumnFamily") + } + + return nil } func alreadyExistsError(err error) bool { @@ -77,7 +83,7 @@ func alreadyExistsError(err error) bool { func (c *tableClient) DeleteTable(ctx context.Context, name string) error { if err := c.client.DeleteTable(ctx, name); err != nil { - return err + return errors.Wrap(err, "client.DeleteTable") } return nil From c89c4dcb6955fbb503d5ede5e21cf8dac36f3d63 Mon Sep 17 00:00:00 2001 From: Goutham Veeramachaneni Date: Fri, 8 Feb 2019 21:26:51 +0100 Subject: [PATCH 4/6] Make sure GCS instrumentation doesn't have full paths Signed-off-by: Goutham Veeramachaneni --- gcp/instrumentation.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gcp/instrumentation.go b/gcp/instrumentation.go index 69347dd9d3f60..a87cd10b3dffb 100644 --- a/gcp/instrumentation.go +++ b/gcp/instrumentation.go @@ -81,7 +81,7 @@ func (i instrumentedTransport) RoundTrip(req *http.Request) (*http.Response, err start := time.Now() resp, err := i.next.RoundTrip(req) if err == nil { - i.observer.WithLabelValues(req.URL.Path, strconv.Itoa(resp.StatusCode)).Observe(time.Since(start).Seconds()) + i.observer.WithLabelValues(req.Method, strconv.Itoa(resp.StatusCode)).Observe(time.Since(start).Seconds()) } return resp, err } From 8ba9cca29b2489521c4876f159644ac604c940b9 Mon Sep 17 00:00:00 2001 From: Goutham Veeramachaneni Date: Tue, 12 Feb 2019 00:32:37 +0100 Subject: [PATCH 5/6] Set GCS.ChunkSize to 0 to reduce allocs. Signed-off-by: Goutham Veeramachaneni --- gcp/gcs_object_client.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/gcp/gcs_object_client.go b/gcp/gcs_object_client.go index 34b1f256f385d..89c8f22293e2b 100644 --- a/gcp/gcs_object_client.go +++ b/gcp/gcs_object_client.go @@ -64,6 +64,11 @@ func (s *gcsObjectClient) PutChunks(ctx context.Context, chunks []chunk.Chunk) e return err } writer := s.bucket.Object(chunk.ExternalKey()).NewWriter(ctx) + // Default GCSChunkSize is 8M and for each call, 8M is allocated xD + // By setting it to 0, we just upload the object in a single a request + // which should work for our chunk sizes. + writer.ChunkSize = 0 + if _, err := writer.Write(buf); err != nil { return err } From 0a29901ebf1d0079f48e4dee078d38e8c490a3fd Mon Sep 17 00:00:00 2001 From: Goutham Veeramachaneni Date: Tue, 12 Feb 2019 17:55:45 +0100 Subject: [PATCH 6/6] Add a flag for ChunkBufferSize Signed-off-by: Goutham Veeramachaneni --- gcp/gcs_object_client.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/gcp/gcs_object_client.go b/gcp/gcs_object_client.go index 89c8f22293e2b..0d92a5b49e5aa 100644 --- a/gcp/gcs_object_client.go +++ b/gcp/gcs_object_client.go @@ -21,12 +21,14 @@ type gcsObjectClient struct { // GCSConfig is config for the GCS Chunk Client. type GCSConfig struct { - BucketName string `yaml:"bucket_name"` + BucketName string `yaml:"bucket_name"` + ChunkBufferSize int `yaml:"chunk_buffer_size"` } // RegisterFlags registers flags. func (cfg *GCSConfig) RegisterFlags(f *flag.FlagSet) { f.StringVar(&cfg.BucketName, "gcs.bucketname", "", "Name of GCS bucket to put chunks in.") + f.IntVar(&cfg.ChunkBufferSize, "gcs.chunk-buffer-size", 0, "The size of the buffer that GCS client for each PUT request. 0 to disable buffering.") } // NewGCSObjectClient makes a new chunk.ObjectClient that writes chunks to GCS. @@ -67,7 +69,7 @@ func (s *gcsObjectClient) PutChunks(ctx context.Context, chunks []chunk.Chunk) e // Default GCSChunkSize is 8M and for each call, 8M is allocated xD // By setting it to 0, we just upload the object in a single a request // which should work for our chunk sizes. - writer.ChunkSize = 0 + writer.ChunkSize = s.cfg.ChunkBufferSize if _, err := writer.Write(buf); err != nil { return err