Skip to content

Commit

Permalink
Merge pull request grafana#1219 from grafana/grpc-client-ratelimit
Browse files Browse the repository at this point in the history
Add options for client-side rate limiting and backoff on all gRPC clients.
  • Loading branch information
tomwilkie authored Feb 12, 2019
2 parents 6fa9b0f + 0a29901 commit 2da94f0
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 40 deletions.
8 changes: 3 additions & 5 deletions gcp/bigtable_index_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"cloud.google.com/go/bigtable"
ot "github.com/opentracing/opentracing-go"
otlog "github.com/opentracing/opentracing-go/log"
"google.golang.org/api/option"

"github.com/cortexproject/cortex/pkg/chunk"
chunk_util "github.com/cortexproject/cortex/pkg/chunk/util"
Expand Down Expand Up @@ -64,9 +63,7 @@ type storageClientV1 struct {

// NewStorageClientV1 returns a new v1 StorageClient.
func NewStorageClientV1(ctx context.Context, cfg Config, schemaCfg chunk.SchemaConfig) (chunk.IndexClient, error) {
opts := instrumentation()
opts = append(opts, option.WithGRPCDialOption(cfg.GRPCClientConfig.DialOption()))

opts := toOptions(cfg.GRPCClientConfig.DialOption(bigtableInstrumentation()))
client, err := bigtable.NewClient(ctx, cfg.Project, cfg.Instance, opts...)
if err != nil {
return nil, err
Expand All @@ -90,7 +87,8 @@ func newStorageClientV1(cfg Config, schemaCfg chunk.SchemaConfig, client *bigtab

// NewStorageClientColumnKey returns a new v2 StorageClient.
func NewStorageClientColumnKey(ctx context.Context, cfg Config, schemaCfg chunk.SchemaConfig) (chunk.IndexClient, error) {
client, err := bigtable.NewClient(ctx, cfg.Project, cfg.Instance, instrumentation()...)
opts := toOptions(cfg.GRPCClientConfig.DialOption(bigtableInstrumentation()))
client, err := bigtable.NewClient(ctx, cfg.Project, cfg.Instance, opts...)
if err != nil {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion gcp/bigtable_object_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ type bigtableObjectClient struct {
// NewBigtableObjectClient makes a new chunk.ObjectClient that stores chunks in
// Bigtable.
func NewBigtableObjectClient(ctx context.Context, cfg Config, schemaCfg chunk.SchemaConfig) (chunk.ObjectClient, error) {
client, err := bigtable.NewClient(ctx, cfg.Project, cfg.Instance, instrumentation()...)
opts := toOptions(cfg.GRPCClientConfig.DialOption(bigtableInstrumentation()))
client, err := bigtable.NewClient(ctx, cfg.Project, cfg.Instance, opts...)
if err != nil {
return nil, err
}
Expand Down
16 changes: 14 additions & 2 deletions gcp/gcs_object_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,24 @@ type gcsObjectClient struct {

// GCSConfig is config for the GCS Chunk Client.
type GCSConfig struct {
BucketName string `yaml:"bucket_name"`
BucketName string `yaml:"bucket_name"`
ChunkBufferSize int `yaml:"chunk_buffer_size"`
}

// RegisterFlags registers flags.
func (cfg *GCSConfig) RegisterFlags(f *flag.FlagSet) {
f.StringVar(&cfg.BucketName, "gcs.bucketname", "", "Name of GCS bucket to put chunks in.")
f.IntVar(&cfg.ChunkBufferSize, "gcs.chunk-buffer-size", 0, "The size of the buffer that GCS client for each PUT request. 0 to disable buffering.")
}

// NewGCSObjectClient makes a new chunk.ObjectClient that writes chunks to GCS.
func NewGCSObjectClient(ctx context.Context, cfg GCSConfig, schemaCfg chunk.SchemaConfig) (chunk.ObjectClient, error) {
client, err := storage.NewClient(ctx, instrumentation()...)
option, err := gcsInstrumentation(ctx)
if err != nil {
return nil, err
}

client, err := storage.NewClient(ctx, option)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -59,6 +66,11 @@ func (s *gcsObjectClient) PutChunks(ctx context.Context, chunks []chunk.Chunk) e
return err
}
writer := s.bucket.Object(chunk.ExternalKey()).NewWriter(ctx)
// Default GCSChunkSize is 8M and for each call, 8M is allocated xD
// By setting it to 0, we just upload the object in a single a request
// which should work for our chunk sizes.
writer.ChunkSize = s.cfg.ChunkBufferSize

if _, err := writer.Write(buf); err != nil {
return err
}
Expand Down
99 changes: 73 additions & 26 deletions gcp/instrumentation.go
Original file line number Diff line number Diff line change
@@ -1,40 +1,87 @@
package gcp

import (
"github.com/grpc-ecosystem/go-grpc-middleware"
"context"
"net/http"
"strconv"
"time"

otgrpc "github.com/opentracing-contrib/go-grpc"
"github.com/opentracing/opentracing-go"
opentracing "github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"google.golang.org/api/option"
google_http "google.golang.org/api/transport/http"
"google.golang.org/grpc"

"github.com/cortexproject/cortex/pkg/util/middleware"
)

var bigtableRequestDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "cortex",
Name: "bigtable_request_duration_seconds",
Help: "Time spent doing Bigtable requests.",

// Bigtable latency seems to range from a few ms to a few hundred ms and is
// important. So use 6 buckets from 1ms to 1s.
Buckets: prometheus.ExponentialBuckets(0.001, 4, 6),
}, []string{"operation", "status_code"})

func instrumentation() []option.ClientOption {
return []option.ClientOption{
option.WithGRPCDialOption(
grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()),
middleware.PrometheusGRPCUnaryInstrumentation(bigtableRequestDuration),
)),
),
option.WithGRPCDialOption(
grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(
otgrpc.OpenTracingStreamClientInterceptor(opentracing.GlobalTracer()),
middleware.PrometheusGRPCStreamInstrumentation(bigtableRequestDuration),
)),
),
var (
bigtableRequestDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "cortex",
Name: "bigtable_request_duration_seconds",
Help: "Time spent doing Bigtable requests.",

// Bigtable latency seems to range from a few ms to a few hundred ms and is
// important. So use 6 buckets from 1ms to 1s.
Buckets: prometheus.ExponentialBuckets(0.001, 4, 6),
}, []string{"operation", "status_code"})

gcsRequestDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "cortex",
Name: "gcs_request_duration_seconds",
Help: "Time spent doing GCS requests.",

// Bigtable latency seems to range from a few ms to a few hundred ms and is
// important. So use 6 buckets from 1ms to 1s.
Buckets: prometheus.ExponentialBuckets(0.001, 4, 6),
}, []string{"operation", "status_code"})
)

func bigtableInstrumentation() ([]grpc.UnaryClientInterceptor, []grpc.StreamClientInterceptor) {
return []grpc.UnaryClientInterceptor{
otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()),
middleware.PrometheusGRPCUnaryInstrumentation(bigtableRequestDuration),
},
[]grpc.StreamClientInterceptor{
otgrpc.OpenTracingStreamClientInterceptor(opentracing.GlobalTracer()),
middleware.PrometheusGRPCStreamInstrumentation(bigtableRequestDuration),
}
}

func gcsInstrumentation(ctx context.Context) (option.ClientOption, error) {
transport, err := google_http.NewTransport(ctx, http.DefaultTransport)
if err != nil {
return nil, err
}
client := &http.Client{
Transport: instrumentedTransport{
observer: gcsRequestDuration,
next: transport,
},
}
return option.WithHTTPClient(client), nil
}

func toOptions(opts []grpc.DialOption) []option.ClientOption {
result := make([]option.ClientOption, 0, len(opts))
for _, opt := range opts {
result = append(result, option.WithGRPCDialOption(opt))
}
return result
}

type instrumentedTransport struct {
observer prometheus.ObserverVec
next http.RoundTripper
}

func (i instrumentedTransport) RoundTrip(req *http.Request) (*http.Response, error) {
start := time.Now()
resp, err := i.next.RoundTrip(req)
if err == nil {
i.observer.WithLabelValues(req.Method, strconv.Itoa(resp.StatusCode)).Observe(time.Since(start).Seconds())
}
return resp, err
}
19 changes: 13 additions & 6 deletions gcp/table_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"google.golang.org/grpc/status"

"github.com/cortexproject/cortex/pkg/chunk"
"github.com/pkg/errors"
)

type tableClient struct {
Expand All @@ -17,7 +18,8 @@ type tableClient struct {

// NewTableClient returns a new TableClient.
func NewTableClient(ctx context.Context, cfg Config) (chunk.TableClient, error) {
client, err := bigtable.NewAdminClient(ctx, cfg.Project, cfg.Instance, instrumentation()...)
opts := toOptions(cfg.GRPCClientConfig.DialOption(bigtableInstrumentation()))
client, err := bigtable.NewAdminClient(ctx, cfg.Project, cfg.Instance, opts...)
if err != nil {
return nil, err
}
Expand All @@ -30,15 +32,15 @@ func NewTableClient(ctx context.Context, cfg Config) (chunk.TableClient, error)
func (c *tableClient) ListTables(ctx context.Context) ([]string, error) {
tables, err := c.client.Tables(ctx)
if err != nil {
return nil, err
return nil, errors.Wrap(err, "client.Tables")
}

// Check each table has the right column family. If not, omit it.
output := make([]string, 0, len(tables))
for _, table := range tables {
info, err := c.client.TableInfo(ctx, table)
if err != nil {
return nil, err
return nil, errors.Wrap(err, "client.TableInfo")
}

if hasColumnFamily(info.FamilyInfos) {
Expand All @@ -61,10 +63,15 @@ func hasColumnFamily(infos []bigtable.FamilyInfo) bool {
func (c *tableClient) CreateTable(ctx context.Context, desc chunk.TableDesc) error {
if err := c.client.CreateTable(ctx, desc.Name); err != nil {
if !alreadyExistsError(err) {
return err
return errors.Wrap(err, "client.CreateTable")
}
}
return c.client.CreateColumnFamily(ctx, desc.Name, columnFamily)

if err := c.client.CreateColumnFamily(ctx, desc.Name, columnFamily); err != nil {
return errors.Wrap(err, "client.CreateColumnFamily")
}

return nil
}

func alreadyExistsError(err error) bool {
Expand All @@ -76,7 +83,7 @@ func alreadyExistsError(err error) bool {

func (c *tableClient) DeleteTable(ctx context.Context, name string) error {
if err := c.client.DeleteTable(ctx, name); err != nil {
return err
return errors.Wrap(err, "client.DeleteTable")
}

return nil
Expand Down

0 comments on commit 2da94f0

Please sign in to comment.