Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

redis client: default to use explicit pipelining #163

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
- [Debug Port](#debug-port)
- [Local Cache](#local-cache)
- [Redis](#redis)
- [Pipelining](#pipelining)
- [One Redis Instance](#one-redis-instance)
- [Two Redis Instances](#two-redis-instances)
- [Contact](#contact)
Expand Down Expand Up @@ -447,7 +448,11 @@ As well Ratelimit supports TLS connections and authentication. These can be conf
1. `REDIS_TLS` & `REDIS_PERSECOND_TLS`: set to `"true"` to enable a TLS connection for the specific connection type.
1. `REDIS_AUTH` & `REDIS_PERSECOND_AUTH`: set to `"password"` to enable authentication to the redis host.

Ratelimit use [implicit pipelining](https://github.com/mediocregopher/radix/blob/v3.5.1/pool.go#L238) to send requests to redis. Pipelining can be configured using the following environment variables:
## Pipelining

By default, for each request, ratelimit will pick up a connection from pool, wirte multiple redis commands in a single write then reads their responses in a single read. This reduces network delay.

For high throughput scenarios, ratelimit also support [implicit pipelining](https://github.com/mediocregopher/radix/blob/v3.5.1/pool.go#L238) . It can be configured using the following environment variables:

1. `REDIS_PIPELINE_WINDOW` & `REDIS_PERSECOND_PIPELINE_WINDOW`: sets the duration after which internal pipelines will be flushed.
If window is zero then implicit pipelining will be disabled.
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190425150028-36563e24a262 h1:qsl9y/CJx34tuA7QCPNp86JNJe4spst6Ff8MjvPUdPg=
golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135 h1:5Beo0mZN8dRzgrMMkDp0jc8YXQKx9DiJ2k1dkvGsn5A=
golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7 h1:9zdDQZ7Thm29KFXgAX/+yaf3eVbP7djjWp/dXAppNCc=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down
31 changes: 17 additions & 14 deletions src/redis/cache_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,9 @@ func max(a uint32, b uint32) uint32 {
return b
}

func pipelineAppend(client Client, key string, hitsAddend uint32, result *uint32, expirationSeconds int64) (err error) {
if err = client.DoCmd(result, "INCRBY", key, hitsAddend); err != nil {
return
}
if err = client.DoCmd(nil, "EXPIRE", key, expirationSeconds); err != nil {
return
}
return
func pipelineAppend(client Client, pipeline *Pipeline, key string, hitsAddend uint32, result *uint32, expirationSeconds int64) {
*pipeline = client.PipeAppend(*pipeline, result, "INCRBY", key, hitsAddend)
*pipeline = client.PipeAppend(*pipeline, nil, "EXPIRE", key, expirationSeconds)
}

func (this *rateLimitCacheImpl) DoLimit(
Expand Down Expand Up @@ -74,7 +69,7 @@ func (this *rateLimitCacheImpl) DoLimit(

isOverLimitWithLocalCache := make([]bool, len(request.Descriptors))
results := make([]uint32, len(request.Descriptors))
var err error
var pipeline, perSecondPipeline Pipeline

// Now, actually setup the pipeline, skipping empty cache keys.
for i, cacheKey := range cacheKeys {
Expand All @@ -101,16 +96,24 @@ func (this *rateLimitCacheImpl) DoLimit(

// Use the perSecondConn if it is not nil and the cacheKey represents a per second Limit.
if this.perSecondClient != nil && cacheKey.PerSecond {
if err = pipelineAppend(this.perSecondClient, cacheKey.Key, hitsAddend, &results[i], expirationSeconds); err != nil {
break
if perSecondPipeline == nil {
perSecondPipeline = Pipeline{}
}
pipelineAppend(this.perSecondClient, &perSecondPipeline, cacheKey.Key, hitsAddend, &results[i], expirationSeconds)
} else {
if err = pipelineAppend(this.client, cacheKey.Key, hitsAddend, &results[i], expirationSeconds); err != nil {
break
if pipeline == nil {
pipeline = Pipeline{}
}
pipelineAppend(this.client, &pipeline, cacheKey.Key, hitsAddend, &results[i], expirationSeconds)
}
}
checkError(err)

if pipeline != nil {
checkError(this.client.PipeDo(pipeline))
}
if perSecondPipeline != nil {
checkError(this.perSecondClient.PipeDo(perSecondPipeline))
}

// Now fetch the pipeline.
responseDescriptorStatuses := make([]*pb.RateLimitResponse_DescriptorStatus,
Expand Down
23 changes: 23 additions & 0 deletions src/redis/driver.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package redis

import "github.com/mediocregopher/radix/v3"

// Errors that may be raised during config parsing.
type RedisError string

Expand All @@ -17,10 +19,31 @@ type Client interface {
// @param args supplies the additional arguments.
DoCmd(rcv interface{}, cmd, key string, args ...interface{}) error

// PipeAppend append a command onto the pipeline queue.
//
// @param pipeline supplies the queue for pending commands.
// @param rcv supplies receiver for the result.
// @param cmd supplies the command to append.
// @param key supplies the key to append.
// @param args supplies the additional arguments.
PipeAppend(pipeline Pipeline, rcv interface{}, cmd, key string, args ...interface{}) Pipeline
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PipeAppend seems no need to be method of Client interface. Add it just for unit testing. Need more thinking.


// PipeDo writes multiple commands to a Conn in
// a single write, then reads their responses in a single read. This reduces
// network delay into a single round-trip.
//
// @param pipeline supplies the queue for pending commands.
PipeDo(pipeline Pipeline) error

// Once Close() is called all future method calls on the Client will return
// an error
Close() error

// NumActiveConns return number of active connections, used in testing.
NumActiveConns() int

// ImplicitPipeliningEnabled return true if implicit pipelining is enabled.
ImplicitPipeliningEnabled() bool
}

type Pipeline []radix.CmdAction
45 changes: 37 additions & 8 deletions src/redis/driver_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@ func poolTrace(ps *poolStats) trace.PoolTrace {
}

type clientImpl struct {
client radix.Client
stats poolStats
client radix.Client
stats poolStats
implicitPipelining bool
}

func checkError(err error) {
Expand Down Expand Up @@ -76,11 +77,17 @@ func NewClientImpl(scope stats.Scope, useTls bool, auth string, url string, pool

stats := newPoolStats(scope)

opts := []radix.PoolOpt{radix.PoolConnFunc(df), radix.PoolWithTrace(poolTrace(&stats))}

implicitPipelining := true
if pipelineWindow == 0 && pipelineLimit == 0 {
implicitPipelining = false
} else {
opts = append(opts, radix.PoolPipelineWindow(pipelineWindow, pipelineLimit))
}

// TODO: support sentinel and redis cluster
pool, err := radix.NewPool("tcp", url, poolSize, radix.PoolConnFunc(df),
radix.PoolPipelineWindow(pipelineWindow, pipelineLimit),
radix.PoolWithTrace(poolTrace(&stats)),
)
pool, err := radix.NewPool("tcp", url, poolSize, opts...)
checkError(err)

// Check if connection is good
Expand All @@ -91,8 +98,9 @@ func NewClientImpl(scope stats.Scope, useTls bool, auth string, url string, pool
}

return &clientImpl{
client: pool,
stats: stats,
client: pool,
stats: stats,
implicitPipelining: implicitPipelining,
}
}

Expand All @@ -107,3 +115,24 @@ func (c *clientImpl) Close() error {
func (c *clientImpl) NumActiveConns() int {
return int(c.stats.connectionActive.Value())
}

func (c *clientImpl) PipeAppend(pipeline Pipeline, rcv interface{}, cmd, key string, args ...interface{}) Pipeline {
return append(pipeline, radix.FlatCmd(rcv, cmd, key, args...))
}

func (c *clientImpl) PipeDo(pipeline Pipeline) error {
if c.implicitPipelining {
for _, action := range pipeline {
if err := c.client.Do(action); err != nil {
return err
}
}
return nil
}

return c.client.Do(radix.Pipeline(pipeline...))
}

func (c *clientImpl) ImplicitPipeliningEnabled() bool {
return c.implicitPipelining
}
8 changes: 4 additions & 4 deletions src/settings/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,16 @@ type Settings struct {
RedisPoolSize int `envconfig:"REDIS_POOL_SIZE" default:"10"`
RedisAuth string `envconfig:"REDIS_AUTH" default:""`
RedisTls bool `envconfig:"REDIS_TLS" default:"false"`
RedisPipelineWindow time.Duration `envconfig:"REDIS_PIPELINE_WINDOW" default:"75µs"`
RedisPipelineLimit int `envconfig:"REDIS_PIPELINE_LIMIT" default:"8"`
RedisPipelineWindow time.Duration `envconfig:"REDIS_PIPELINE_WINDOW" default:"0"`
RedisPipelineLimit int `envconfig:"REDIS_PIPELINE_LIMIT" default:"0"`
RedisPerSecond bool `envconfig:"REDIS_PERSECOND" default:"false"`
RedisPerSecondSocketType string `envconfig:"REDIS_PERSECOND_SOCKET_TYPE" default:"unix"`
RedisPerSecondUrl string `envconfig:"REDIS_PERSECOND_URL" default:"/var/run/nutcracker/ratelimitpersecond.sock"`
RedisPerSecondPoolSize int `envconfig:"REDIS_PERSECOND_POOL_SIZE" default:"10"`
RedisPerSecondAuth string `envconfig:"REDIS_PERSECOND_AUTH" default:""`
RedisPerSecondTls bool `envconfig:"REDIS_PERSECOND_TLS" default:"false"`
RedisPerSecondPipelineWindow time.Duration `envconfig:"REDIS_PERSECOND_PIPELINE_WINDOW" default:"75µs"`
RedisPerSecondPipelineLimit int `envconfig:"REDIS_PERSECOND_PIPELINE_LIMIT" default:"8"`
RedisPerSecondPipelineWindow time.Duration `envconfig:"REDIS_PERSECOND_PIPELINE_WINDOW" default:"0"`
RedisPerSecondPipelineLimit int `envconfig:"REDIS_PERSECOND_PIPELINE_LIMIT" default:"0"`
ExpirationJitterMaxSeconds int64 `envconfig:"EXPIRATION_JITTER_MAX_SECONDS" default:"300"`
LocalCacheSizeInBytes int `envconfig:"LOCAL_CACHE_SIZE_IN_BYTES" default:"0"`
}
Expand Down
48 changes: 48 additions & 0 deletions test/mocks/redis/redis.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading