From 517eccdfa5157f44c225b5539d65003de0e53a45 Mon Sep 17 00:00:00 2001 From: Tong Cai Date: Fri, 7 Aug 2020 13:18:23 +0800 Subject: [PATCH 1/4] redis cache impl: use explicit pipelining when pipeline(window=0, limit=0) Signed-off-by: Tong Cai --- go.sum | 1 + src/redis/cache_impl.go | 31 +++++++++++---------- src/redis/driver.go | 20 ++++++++++++++ src/redis/driver_impl.go | 41 +++++++++++++++++++++------ test/mocks/redis/redis.go | 34 +++++++++++++++++++++++ test/redis/bench_test.go | 52 ++++++++++++++++++----------------- test/redis/cache_impl_test.go | 30 ++++++++++++-------- 7 files changed, 151 insertions(+), 58 deletions(-) diff --git a/go.sum b/go.sum index 2951b438..d04c7445 100644 --- a/go.sum +++ b/go.sum @@ -152,6 +152,7 @@ golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3 golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190425150028-36563e24a262 h1:qsl9y/CJx34tuA7QCPNp86JNJe4spst6Ff8MjvPUdPg= golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135 h1:5Beo0mZN8dRzgrMMkDp0jc8YXQKx9DiJ2k1dkvGsn5A= golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7 h1:9zdDQZ7Thm29KFXgAX/+yaf3eVbP7djjWp/dXAppNCc= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/src/redis/cache_impl.go b/src/redis/cache_impl.go index ea4c52ee..50248cd8 100644 --- a/src/redis/cache_impl.go +++ b/src/redis/cache_impl.go @@ -36,14 +36,9 @@ func max(a uint32, b uint32) uint32 { return b } -func pipelineAppend(client Client, key string, hitsAddend uint32, result *uint32, expirationSeconds int64) (err error) { - if err = client.DoCmd(result, "INCRBY", key, hitsAddend); err != nil { - return - } - if err = client.DoCmd(nil, "EXPIRE", key, expirationSeconds); err != nil { - return - } - return +func pipelineAppend(client Client, pipeline *Pipeline, key string, hitsAddend uint32, result *uint32, expirationSeconds int64) { + *pipeline = client.PipeAppend(*pipeline, result, "INCRBY", key, hitsAddend) + *pipeline = client.PipeAppend(*pipeline, nil, "EXPIRE", key, expirationSeconds) } func (this *rateLimitCacheImpl) DoLimit( @@ -74,7 +69,7 @@ func (this *rateLimitCacheImpl) DoLimit( isOverLimitWithLocalCache := make([]bool, len(request.Descriptors)) results := make([]uint32, len(request.Descriptors)) - var err error + var pipeline, perSecondPipeline Pipeline // Now, actually setup the pipeline, skipping empty cache keys. for i, cacheKey := range cacheKeys { @@ -101,16 +96,24 @@ func (this *rateLimitCacheImpl) DoLimit( // Use the perSecondConn if it is not nil and the cacheKey represents a per second Limit. 
if this.perSecondClient != nil && cacheKey.PerSecond { - if err = pipelineAppend(this.perSecondClient, cacheKey.Key, hitsAddend, &results[i], expirationSeconds); err != nil { - break + if perSecondPipeline == nil { + perSecondPipeline = Pipeline{} } + pipelineAppend(this.perSecondClient, &perSecondPipeline, cacheKey.Key, hitsAddend, &results[i], expirationSeconds) } else { - if err = pipelineAppend(this.client, cacheKey.Key, hitsAddend, &results[i], expirationSeconds); err != nil { - break + if pipeline == nil { + pipeline = Pipeline{} } + pipelineAppend(this.client, &pipeline, cacheKey.Key, hitsAddend, &results[i], expirationSeconds) } } - checkError(err) + + if pipeline != nil { + checkError(this.client.PipeDo(pipeline)) + } + if perSecondPipeline != nil { + checkError(this.perSecondClient.PipeDo(pipeline)) + } // Now fetch the pipeline. responseDescriptorStatuses := make([]*pb.RateLimitResponse_DescriptorStatus, diff --git a/src/redis/driver.go b/src/redis/driver.go index 0f672df7..f007104b 100644 --- a/src/redis/driver.go +++ b/src/redis/driver.go @@ -1,5 +1,7 @@ package redis +import "github.com/mediocregopher/radix/v3" + // Errors that may be raised during config parsing. type RedisError string @@ -17,6 +19,22 @@ type Client interface { // @param args supplies the additional arguments. DoCmd(rcv interface{}, cmd, key string, args ...interface{}) error + // Pipe append a command onto the pipeline queue. + // + // @param pipeline supplies the queue for pending commands. + // @param rcv supplies receiver for the result. + // @param cmd supplies the command to append. + // @param key supplies the key to append. + // @param args supplies the additional arguments. + PipeAppend(pipeline Pipeline, rcv interface{}, cmd, key string, args ...interface{}) Pipeline + + // PipeDo writes multiple commands to a Conn in + // a single write, then reads their responses in a single read. This reduces + // network delay into a single round-trip. + // + // @param pipeline supplies the queue for pending commands. + PipeDo(pipeline Pipeline) error + // Once Close() is called all future method calls on the Client will return // an error Close() error @@ -24,3 +42,5 @@ type Client interface { // NumActiveConns return number of active connections, used in testing. NumActiveConns() int } + +type Pipeline []radix.CmdAction diff --git a/src/redis/driver_impl.go b/src/redis/driver_impl.go index 47d0d585..7d1d2b10 100644 --- a/src/redis/driver_impl.go +++ b/src/redis/driver_impl.go @@ -40,8 +40,9 @@ func poolTrace(ps *poolStats) trace.PoolTrace { } type clientImpl struct { - client radix.Client - stats poolStats + client radix.Client + stats poolStats + implicitPipelining bool } func checkError(err error) { @@ -76,11 +77,17 @@ func NewClientImpl(scope stats.Scope, useTls bool, auth string, url string, pool stats := newPoolStats(scope) + opts := []radix.PoolOpt{radix.PoolConnFunc(df), radix.PoolWithTrace(poolTrace(&stats))} + + implicitPipelining := true + if pipelineWindow == 0 && pipelineLimit == 0 { + implicitPipelining = false + } else { + opts = append(opts, radix.PoolPipelineWindow(pipelineWindow, pipelineLimit)) + } + // TODO: support sentinel and redis cluster - pool, err := radix.NewPool("tcp", url, poolSize, radix.PoolConnFunc(df), - radix.PoolPipelineWindow(pipelineWindow, pipelineLimit), - radix.PoolWithTrace(poolTrace(&stats)), - ) + pool, err := radix.NewPool("tcp", url, poolSize, opts...) 
checkError(err) // Check if connection is good @@ -91,8 +98,9 @@ func NewClientImpl(scope stats.Scope, useTls bool, auth string, url string, pool } return &clientImpl{ - client: pool, - stats: stats, + client: pool, + stats: stats, + implicitPipelining: implicitPipelining, } } @@ -107,3 +115,20 @@ func (c *clientImpl) Close() error { func (c *clientImpl) NumActiveConns() int { return int(c.stats.connectionActive.Value()) } + +func (c *clientImpl) PipeAppend(pipeline Pipeline, rcv interface{}, cmd, key string, args ...interface{}) Pipeline { + return append(pipeline, radix.FlatCmd(rcv, cmd, key, args...)) +} + +func (c *clientImpl) PipeDo(pipeline Pipeline) error { + if c.implicitPipelining { + for _, action := range pipeline { + if err := c.client.Do(action); err != nil { + return err + } + } + return nil + } + + return c.client.Do(radix.Pipeline(pipeline...)) +} diff --git a/test/mocks/redis/redis.go b/test/mocks/redis/redis.go index 4d300146..938daa1e 100644 --- a/test/mocks/redis/redis.go +++ b/test/mocks/redis/redis.go @@ -5,6 +5,7 @@ package mock_redis import ( + redis "github.com/envoyproxy/ratelimit/src/redis" gomock "github.com/golang/mock/gomock" reflect "reflect" ) @@ -78,3 +79,36 @@ func (mr *MockClientMockRecorder) NumActiveConns() *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NumActiveConns", reflect.TypeOf((*MockClient)(nil).NumActiveConns)) } + +// PipeAppend mocks base method +func (m *MockClient) PipeAppend(arg0 redis.Pipeline, arg1 interface{}, arg2, arg3 string, arg4 ...interface{}) redis.Pipeline { + m.ctrl.T.Helper() + varargs := []interface{}{arg0, arg1, arg2, arg3} + for _, a := range arg4 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "PipeAppend", varargs...) + ret0, _ := ret[0].(redis.Pipeline) + return ret0 +} + +// PipeAppend indicates an expected call of PipeAppend +func (mr *MockClientMockRecorder) PipeAppend(arg0, arg1, arg2, arg3 interface{}, arg4 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0, arg1, arg2, arg3}, arg4...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PipeAppend", reflect.TypeOf((*MockClient)(nil).PipeAppend), varargs...) 
+} + +// PipeDo mocks base method +func (m *MockClient) PipeDo(arg0 redis.Pipeline) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "PipeDo", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// PipeDo indicates an expected call of PipeDo +func (mr *MockClientMockRecorder) PipeDo(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PipeDo", reflect.TypeOf((*MockClient)(nil).PipeDo), arg0) +} diff --git a/test/redis/bench_test.go b/test/redis/bench_test.go index 8945f870..37d1468d 100644 --- a/test/redis/bench_test.go +++ b/test/redis/bench_test.go @@ -18,7 +18,7 @@ import ( ) func BenchmarkParallelDoLimit(b *testing.B) { - b.Skip("Skip benchmark") + //b.Skip("Skip benchmark") b.ReportAllocs() @@ -67,28 +67,30 @@ func BenchmarkParallelDoLimit(b *testing.B) { b.Run("no pipeline", mkDoLimitBench(0, 0)) - b.Run("pipeline 35us 1", mkDoLimitBench(35*time.Microsecond, 1)) - b.Run("pipeline 75us 1", mkDoLimitBench(75*time.Microsecond, 1)) - b.Run("pipeline 150us 1", mkDoLimitBench(150*time.Microsecond, 1)) - b.Run("pipeline 300us 1", mkDoLimitBench(300*time.Microsecond, 1)) - - b.Run("pipeline 35us 2", mkDoLimitBench(35*time.Microsecond, 2)) - b.Run("pipeline 75us 2", mkDoLimitBench(75*time.Microsecond, 2)) - b.Run("pipeline 150us 2", mkDoLimitBench(150*time.Microsecond, 2)) - b.Run("pipeline 300us 2", mkDoLimitBench(300*time.Microsecond, 2)) - - b.Run("pipeline 35us 4", mkDoLimitBench(35*time.Microsecond, 4)) - b.Run("pipeline 75us 4", mkDoLimitBench(75*time.Microsecond, 4)) - b.Run("pipeline 150us 4", mkDoLimitBench(150*time.Microsecond, 4)) - b.Run("pipeline 300us 4", mkDoLimitBench(300*time.Microsecond, 4)) - - b.Run("pipeline 35us 8", mkDoLimitBench(35*time.Microsecond, 8)) - b.Run("pipeline 75us 8", mkDoLimitBench(75*time.Microsecond, 8)) - b.Run("pipeline 150us 8", mkDoLimitBench(150*time.Microsecond, 8)) - b.Run("pipeline 300us 8", mkDoLimitBench(300*time.Microsecond, 8)) - - b.Run("pipeline 35us 16", mkDoLimitBench(35*time.Microsecond, 16)) - b.Run("pipeline 75us 16", mkDoLimitBench(75*time.Microsecond, 16)) - b.Run("pipeline 150us 16", mkDoLimitBench(150*time.Microsecond, 16)) - b.Run("pipeline 300us 16", mkDoLimitBench(300*time.Microsecond, 16)) + /* + b.Run("pipeline 35us 1", mkDoLimitBench(35*time.Microsecond, 1)) + b.Run("pipeline 75us 1", mkDoLimitBench(75*time.Microsecond, 1)) + b.Run("pipeline 150us 1", mkDoLimitBench(150*time.Microsecond, 1)) + b.Run("pipeline 300us 1", mkDoLimitBench(300*time.Microsecond, 1)) + + b.Run("pipeline 35us 2", mkDoLimitBench(35*time.Microsecond, 2)) + b.Run("pipeline 75us 2", mkDoLimitBench(75*time.Microsecond, 2)) + b.Run("pipeline 150us 2", mkDoLimitBench(150*time.Microsecond, 2)) + b.Run("pipeline 300us 2", mkDoLimitBench(300*time.Microsecond, 2)) + + b.Run("pipeline 35us 4", mkDoLimitBench(35*time.Microsecond, 4)) + b.Run("pipeline 75us 4", mkDoLimitBench(75*time.Microsecond, 4)) + b.Run("pipeline 150us 4", mkDoLimitBench(150*time.Microsecond, 4)) + b.Run("pipeline 300us 4", mkDoLimitBench(300*time.Microsecond, 4)) + + b.Run("pipeline 35us 8", mkDoLimitBench(35*time.Microsecond, 8)) + b.Run("pipeline 75us 8", mkDoLimitBench(75*time.Microsecond, 8)) + b.Run("pipeline 150us 8", mkDoLimitBench(150*time.Microsecond, 8)) + b.Run("pipeline 300us 8", mkDoLimitBench(300*time.Microsecond, 8)) + + b.Run("pipeline 35us 16", mkDoLimitBench(35*time.Microsecond, 16)) + b.Run("pipeline 75us 16", mkDoLimitBench(75*time.Microsecond, 16)) + b.Run("pipeline 150us 16", 
mkDoLimitBench(150*time.Microsecond, 16)) + b.Run("pipeline 300us 16", mkDoLimitBench(300*time.Microsecond, 16)) + */ } diff --git a/test/redis/cache_impl_test.go b/test/redis/cache_impl_test.go index 7c7951e1..e76f1d7d 100644 --- a/test/redis/cache_impl_test.go +++ b/test/redis/cache_impl_test.go @@ -5,6 +5,7 @@ import ( "time" "github.com/coocood/freecache" + "github.com/mediocregopher/radix/v3" pb "github.com/envoyproxy/go-control-plane/envoy/service/ratelimit/v3" "github.com/envoyproxy/ratelimit/src/config" @@ -27,6 +28,10 @@ func TestRedis(t *testing.T) { t.Run("WithPerSecondRedis", testRedis(true)) } +func pipeAppend(pipeline redis.Pipeline, rcv interface{}, cmd, key string, args ...interface{}) redis.Pipeline { + return append(pipeline, radix.FlatCmd(rcv, cmd, key, args...)) +} + func testRedis(usePerSecondRedis bool) func(*testing.T) { return func(t *testing.T) { assert := assert.New(t) @@ -52,8 +57,9 @@ func testRedis(usePerSecondRedis bool) func(*testing.T) { clientUsed = client } - clientUsed.EXPECT().DoCmd(gomock.Any(), "INCRBY", "domain_key_value_1234", uint32(1)).SetArg(0, uint32(5)) - clientUsed.EXPECT().DoCmd(gomock.Any(), "EXPIRE", "domain_key_value_1234", int64(1)) + clientUsed.EXPECT().PipeAppend(gomock.Any(), gomock.Any(), "INCRBY", "domain_key_value_1234", uint32(1)).SetArg(1, uint32(5)).DoAndReturn(pipeAppend) + clientUsed.EXPECT().PipeAppend(gomock.Any(), gomock.Any(), "EXPIRE", "domain_key_value_1234", int64(1)).DoAndReturn(pipeAppend) + clientUsed.EXPECT().PipeDo(gomock.Any()).Return(nil) request := common.NewRateLimitRequest("domain", [][][2]string{{{"key", "value"}}}, 1) limits := []*config.RateLimit{config.NewRateLimit(10, pb.RateLimitResponse_RateLimit_SECOND, "key_value", statsStore)} @@ -67,9 +73,10 @@ func testRedis(usePerSecondRedis bool) func(*testing.T) { clientUsed = client timeSource.EXPECT().UnixNow().Return(int64(1234)) - clientUsed.EXPECT().DoCmd(gomock.Any(), "INCRBY", "domain_key2_value2_subkey2_subvalue2_1200", uint32(1)).SetArg(0, uint32(11)) - clientUsed.EXPECT().DoCmd(gomock.Any(), - "EXPIRE", "domain_key2_value2_subkey2_subvalue2_1200", int64(60)) + clientUsed.EXPECT().PipeAppend(gomock.Any(), gomock.Any(), "INCRBY", "domain_key2_value2_subkey2_subvalue2_1200", uint32(1)).SetArg(1, uint32(11)).DoAndReturn(pipeAppend) + clientUsed.EXPECT().PipeAppend(gomock.Any(), gomock.Any(), + "EXPIRE", "domain_key2_value2_subkey2_subvalue2_1200", int64(60)).DoAndReturn(pipeAppend) + clientUsed.EXPECT().PipeDo(gomock.Any()).Return(nil) request = common.NewRateLimitRequest( "domain", @@ -90,12 +97,13 @@ func testRedis(usePerSecondRedis bool) func(*testing.T) { clientUsed = client timeSource.EXPECT().UnixNow().Return(int64(1000000)) - clientUsed.EXPECT().DoCmd(gomock.Any(), "INCRBY", "domain_key3_value3_997200", uint32(1)).SetArg(0, uint32(11)) - clientUsed.EXPECT().DoCmd(gomock.Any(), - "EXPIRE", "domain_key3_value3_997200", int64(3600)) - clientUsed.EXPECT().DoCmd(gomock.Any(), "INCRBY", "domain_key3_value3_subkey3_subvalue3_950400", uint32(1)).SetArg(0, uint32(13)) - clientUsed.EXPECT().DoCmd(gomock.Any(), - "EXPIRE", "domain_key3_value3_subkey3_subvalue3_950400", int64(86400)) + clientUsed.EXPECT().PipeAppend(gomock.Any(), gomock.Any(), "INCRBY", "domain_key3_value3_997200", uint32(1)).SetArg(1, uint32(11)).DoAndReturn(pipeAppend) + clientUsed.EXPECT().PipeAppend(gomock.Any(), gomock.Any(), + "EXPIRE", "domain_key3_value3_997200", int64(3600)).DoAndReturn(pipeAppend) + clientUsed.EXPECT().PipeAppend(gomock.Any(), gomock.Any(), "INCRBY", 
"domain_key3_value3_subkey3_subvalue3_950400", uint32(1)).SetArg(1, uint32(13)).DoAndReturn(pipeAppend) + clientUsed.EXPECT().PipeAppend(gomock.Any(), gomock.Any(), + "EXPIRE", "domain_key3_value3_subkey3_subvalue3_950400", int64(86400)).DoAndReturn(pipeAppend) + clientUsed.EXPECT().PipeDo(gomock.Any()).Return(nil) request = common.NewRateLimitRequest( "domain", From 109d8fd6de28496842033df5e180fba6174d240d Mon Sep 17 00:00:00 2001 From: Tong Cai Date: Fri, 7 Aug 2020 13:25:39 +0800 Subject: [PATCH 2/4] bootstrap settings: redis client pipeline settings default to Signed-off-by: Tong Cai --- src/settings/settings.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/settings/settings.go b/src/settings/settings.go index 971ff60c..78e90242 100644 --- a/src/settings/settings.go +++ b/src/settings/settings.go @@ -27,16 +27,16 @@ type Settings struct { RedisPoolSize int `envconfig:"REDIS_POOL_SIZE" default:"10"` RedisAuth string `envconfig:"REDIS_AUTH" default:""` RedisTls bool `envconfig:"REDIS_TLS" default:"false"` - RedisPipelineWindow time.Duration `envconfig:"REDIS_PIPELINE_WINDOW" default:"75µs"` - RedisPipelineLimit int `envconfig:"REDIS_PIPELINE_LIMIT" default:"8"` + RedisPipelineWindow time.Duration `envconfig:"REDIS_PIPELINE_WINDOW" default:"0"` + RedisPipelineLimit int `envconfig:"REDIS_PIPELINE_LIMIT" default:"0"` RedisPerSecond bool `envconfig:"REDIS_PERSECOND" default:"false"` RedisPerSecondSocketType string `envconfig:"REDIS_PERSECOND_SOCKET_TYPE" default:"unix"` RedisPerSecondUrl string `envconfig:"REDIS_PERSECOND_URL" default:"/var/run/nutcracker/ratelimitpersecond.sock"` RedisPerSecondPoolSize int `envconfig:"REDIS_PERSECOND_POOL_SIZE" default:"10"` RedisPerSecondAuth string `envconfig:"REDIS_PERSECOND_AUTH" default:""` RedisPerSecondTls bool `envconfig:"REDIS_PERSECOND_TLS" default:"false"` - RedisPerSecondPipelineWindow time.Duration `envconfig:"REDIS_PERSECOND_PIPELINE_WINDOW" default:"75µs"` - RedisPerSecondPipelineLimit int `envconfig:"REDIS_PERSECOND_PIPELINE_LIMIT" default:"8"` + RedisPerSecondPipelineWindow time.Duration `envconfig:"REDIS_PERSECOND_PIPELINE_WINDOW" default:"0"` + RedisPerSecondPipelineLimit int `envconfig:"REDIS_PERSECOND_PIPELINE_LIMIT" default:"0"` ExpirationJitterMaxSeconds int64 `envconfig:"EXPIRATION_JITTER_MAX_SECONDS" default:"300"` LocalCacheSizeInBytes int `envconfig:"LOCAL_CACHE_SIZE_IN_BYTES" default:"0"` } From 47eb979d44fedaf4c24ef7edcd8a451088d33f4d Mon Sep 17 00:00:00 2001 From: Tong Cai Date: Wed, 12 Aug 2020 02:18:48 +0800 Subject: [PATCH 3/4] redis: fix unit tests Signed-off-by: Tong Cai --- src/redis/cache_impl.go | 2 +- src/redis/driver.go | 5 +- src/redis/driver_impl.go | 4 + test/mocks/redis/redis.go | 14 +++ test/redis/bench_test.go | 52 ++++---- test/redis/cache_impl_test.go | 230 ++++++++++++++++++++++++---------- 6 files changed, 210 insertions(+), 97 deletions(-) diff --git a/src/redis/cache_impl.go b/src/redis/cache_impl.go index 50248cd8..22528e49 100644 --- a/src/redis/cache_impl.go +++ b/src/redis/cache_impl.go @@ -112,7 +112,7 @@ func (this *rateLimitCacheImpl) DoLimit( checkError(this.client.PipeDo(pipeline)) } if perSecondPipeline != nil { - checkError(this.perSecondClient.PipeDo(pipeline)) + checkError(this.perSecondClient.PipeDo(perSecondPipeline)) } // Now fetch the pipeline. 
diff --git a/src/redis/driver.go b/src/redis/driver.go index f007104b..7ffc0c7b 100644 --- a/src/redis/driver.go +++ b/src/redis/driver.go @@ -19,7 +19,7 @@ type Client interface { // @param args supplies the additional arguments. DoCmd(rcv interface{}, cmd, key string, args ...interface{}) error - // Pipe append a command onto the pipeline queue. + // PipeAppend append a command onto the pipeline queue. // // @param pipeline supplies the queue for pending commands. // @param rcv supplies receiver for the result. @@ -41,6 +41,9 @@ type Client interface { // NumActiveConns return number of active connections, used in testing. NumActiveConns() int + + // ImplicitPipeliningEnabled return true if implicit pipelining is enabled. + ImplicitPipeliningEnabled() bool } type Pipeline []radix.CmdAction diff --git a/src/redis/driver_impl.go b/src/redis/driver_impl.go index 7d1d2b10..07706895 100644 --- a/src/redis/driver_impl.go +++ b/src/redis/driver_impl.go @@ -132,3 +132,7 @@ func (c *clientImpl) PipeDo(pipeline Pipeline) error { return c.client.Do(radix.Pipeline(pipeline...)) } + +func (c *clientImpl) ImplicitPipeliningEnabled() bool { + return c.implicitPipelining +} diff --git a/test/mocks/redis/redis.go b/test/mocks/redis/redis.go index 938daa1e..032b500d 100644 --- a/test/mocks/redis/redis.go +++ b/test/mocks/redis/redis.go @@ -66,6 +66,20 @@ func (mr *MockClientMockRecorder) DoCmd(arg0, arg1, arg2 interface{}, arg3 ...in return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DoCmd", reflect.TypeOf((*MockClient)(nil).DoCmd), varargs...) } +// ImplicitPipeliningEnabled mocks base method +func (m *MockClient) ImplicitPipeliningEnabled() bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ImplicitPipeliningEnabled") + ret0, _ := ret[0].(bool) + return ret0 +} + +// ImplicitPipeliningEnabled indicates an expected call of ImplicitPipeliningEnabled +func (mr *MockClientMockRecorder) ImplicitPipeliningEnabled() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ImplicitPipeliningEnabled", reflect.TypeOf((*MockClient)(nil).ImplicitPipeliningEnabled)) +} + // NumActiveConns mocks base method func (m *MockClient) NumActiveConns() int { m.ctrl.T.Helper() diff --git a/test/redis/bench_test.go b/test/redis/bench_test.go index 37d1468d..8945f870 100644 --- a/test/redis/bench_test.go +++ b/test/redis/bench_test.go @@ -18,7 +18,7 @@ import ( ) func BenchmarkParallelDoLimit(b *testing.B) { - //b.Skip("Skip benchmark") + b.Skip("Skip benchmark") b.ReportAllocs() @@ -67,30 +67,28 @@ func BenchmarkParallelDoLimit(b *testing.B) { b.Run("no pipeline", mkDoLimitBench(0, 0)) - /* - b.Run("pipeline 35us 1", mkDoLimitBench(35*time.Microsecond, 1)) - b.Run("pipeline 75us 1", mkDoLimitBench(75*time.Microsecond, 1)) - b.Run("pipeline 150us 1", mkDoLimitBench(150*time.Microsecond, 1)) - b.Run("pipeline 300us 1", mkDoLimitBench(300*time.Microsecond, 1)) - - b.Run("pipeline 35us 2", mkDoLimitBench(35*time.Microsecond, 2)) - b.Run("pipeline 75us 2", mkDoLimitBench(75*time.Microsecond, 2)) - b.Run("pipeline 150us 2", mkDoLimitBench(150*time.Microsecond, 2)) - b.Run("pipeline 300us 2", mkDoLimitBench(300*time.Microsecond, 2)) - - b.Run("pipeline 35us 4", mkDoLimitBench(35*time.Microsecond, 4)) - b.Run("pipeline 75us 4", mkDoLimitBench(75*time.Microsecond, 4)) - b.Run("pipeline 150us 4", mkDoLimitBench(150*time.Microsecond, 4)) - b.Run("pipeline 300us 4", mkDoLimitBench(300*time.Microsecond, 4)) - - b.Run("pipeline 35us 8", mkDoLimitBench(35*time.Microsecond, 8)) - b.Run("pipeline 
75us 8", mkDoLimitBench(75*time.Microsecond, 8)) - b.Run("pipeline 150us 8", mkDoLimitBench(150*time.Microsecond, 8)) - b.Run("pipeline 300us 8", mkDoLimitBench(300*time.Microsecond, 8)) - - b.Run("pipeline 35us 16", mkDoLimitBench(35*time.Microsecond, 16)) - b.Run("pipeline 75us 16", mkDoLimitBench(75*time.Microsecond, 16)) - b.Run("pipeline 150us 16", mkDoLimitBench(150*time.Microsecond, 16)) - b.Run("pipeline 300us 16", mkDoLimitBench(300*time.Microsecond, 16)) - */ + b.Run("pipeline 35us 1", mkDoLimitBench(35*time.Microsecond, 1)) + b.Run("pipeline 75us 1", mkDoLimitBench(75*time.Microsecond, 1)) + b.Run("pipeline 150us 1", mkDoLimitBench(150*time.Microsecond, 1)) + b.Run("pipeline 300us 1", mkDoLimitBench(300*time.Microsecond, 1)) + + b.Run("pipeline 35us 2", mkDoLimitBench(35*time.Microsecond, 2)) + b.Run("pipeline 75us 2", mkDoLimitBench(75*time.Microsecond, 2)) + b.Run("pipeline 150us 2", mkDoLimitBench(150*time.Microsecond, 2)) + b.Run("pipeline 300us 2", mkDoLimitBench(300*time.Microsecond, 2)) + + b.Run("pipeline 35us 4", mkDoLimitBench(35*time.Microsecond, 4)) + b.Run("pipeline 75us 4", mkDoLimitBench(75*time.Microsecond, 4)) + b.Run("pipeline 150us 4", mkDoLimitBench(150*time.Microsecond, 4)) + b.Run("pipeline 300us 4", mkDoLimitBench(300*time.Microsecond, 4)) + + b.Run("pipeline 35us 8", mkDoLimitBench(35*time.Microsecond, 8)) + b.Run("pipeline 75us 8", mkDoLimitBench(75*time.Microsecond, 8)) + b.Run("pipeline 150us 8", mkDoLimitBench(150*time.Microsecond, 8)) + b.Run("pipeline 300us 8", mkDoLimitBench(300*time.Microsecond, 8)) + + b.Run("pipeline 35us 16", mkDoLimitBench(35*time.Microsecond, 16)) + b.Run("pipeline 75us 16", mkDoLimitBench(75*time.Microsecond, 16)) + b.Run("pipeline 150us 16", mkDoLimitBench(150*time.Microsecond, 16)) + b.Run("pipeline 300us 16", mkDoLimitBench(300*time.Microsecond, 16)) } diff --git a/test/redis/cache_impl_test.go b/test/redis/cache_impl_test.go index e76f1d7d..65a9d204 100644 --- a/test/redis/cache_impl_test.go +++ b/test/redis/cache_impl_test.go @@ -179,9 +179,10 @@ func TestOverLimitWithLocalCache(t *testing.T) { // Test Near Limit Stats. Under Near Limit Ratio timeSource.EXPECT().UnixNow().Return(int64(1000000)) - client.EXPECT().DoCmd(gomock.Any(), "INCRBY", "domain_key4_value4_997200", uint32(1)).SetArg(0, uint32(11)) - client.EXPECT().DoCmd(gomock.Any(), - "EXPIRE", "domain_key4_value4_997200", int64(3600)) + client.EXPECT().PipeAppend(gomock.Any(), gomock.Any(), "INCRBY", "domain_key4_value4_997200", uint32(1)).SetArg(1, uint32(11)).DoAndReturn(pipeAppend) + client.EXPECT().PipeAppend(gomock.Any(), gomock.Any(), + "EXPIRE", "domain_key4_value4_997200", int64(3600)).DoAndReturn(pipeAppend) + client.EXPECT().PipeDo(gomock.Any()).Return(nil) request := common.NewRateLimitRequest("domain", [][][2]string{{{"key4", "value4"}}}, 1) @@ -202,9 +203,10 @@ func TestOverLimitWithLocalCache(t *testing.T) { // Test Near Limit Stats. 
At Near Limit Ratio, still OK timeSource.EXPECT().UnixNow().Return(int64(1000000)) - client.EXPECT().DoCmd(gomock.Any(), "INCRBY", "domain_key4_value4_997200", uint32(1)).SetArg(0, uint32(13)) - client.EXPECT().DoCmd(gomock.Any(), - "EXPIRE", "domain_key4_value4_997200", int64(3600)) + client.EXPECT().PipeAppend(gomock.Any(), gomock.Any(), "INCRBY", "domain_key4_value4_997200", uint32(1)).SetArg(1, uint32(13)).DoAndReturn(pipeAppend) + client.EXPECT().PipeAppend(gomock.Any(), gomock.Any(), + "EXPIRE", "domain_key4_value4_997200", int64(3600)).DoAndReturn(pipeAppend) + client.EXPECT().PipeDo(gomock.Any()).Return(nil) assert.Equal( []*pb.RateLimitResponse_DescriptorStatus{ @@ -220,9 +222,10 @@ func TestOverLimitWithLocalCache(t *testing.T) { // Test Over limit stats timeSource.EXPECT().UnixNow().Return(int64(1000000)) - client.EXPECT().DoCmd(gomock.Any(), "INCRBY", "domain_key4_value4_997200", uint32(1)).SetArg(0, uint32(16)) - client.EXPECT().DoCmd(gomock.Any(), - "EXPIRE", "domain_key4_value4_997200", int64(3600)) + client.EXPECT().PipeAppend(gomock.Any(), gomock.Any(), "INCRBY", "domain_key4_value4_997200", uint32(1)).SetArg(1, uint32(16)).DoAndReturn(pipeAppend) + client.EXPECT().PipeAppend(gomock.Any(), gomock.Any(), + "EXPIRE", "domain_key4_value4_997200", int64(3600)).DoAndReturn(pipeAppend) + client.EXPECT().PipeDo(gomock.Any()).Return(nil) assert.Equal( []*pb.RateLimitResponse_DescriptorStatus{ @@ -238,8 +241,8 @@ func TestOverLimitWithLocalCache(t *testing.T) { // Test Over limit stats with local cache timeSource.EXPECT().UnixNow().Return(int64(1000000)) - client.EXPECT().DoCmd(gomock.Any(), "INCRBY", "domain_key4_value4_997200", uint32(1)).Times(0) - client.EXPECT().DoCmd(gomock.Any(), + client.EXPECT().PipeAppend(gomock.Any(), gomock.Any(), "INCRBY", "domain_key4_value4_997200", uint32(1)).Times(0) + client.EXPECT().PipeAppend(gomock.Any(), gomock.Any(), "EXPIRE", "domain_key4_value4_997200", int64(3600)).Times(0) assert.Equal( []*pb.RateLimitResponse_DescriptorStatus{ @@ -266,9 +269,10 @@ func TestNearLimit(t *testing.T) { // Test Near Limit Stats. Under Near Limit Ratio timeSource.EXPECT().UnixNow().Return(int64(1000000)) - client.EXPECT().DoCmd(gomock.Any(), "INCRBY", "domain_key4_value4_997200", uint32(1)).SetArg(0, uint32(11)) - client.EXPECT().DoCmd(gomock.Any(), - "EXPIRE", "domain_key4_value4_997200", int64(3600)) + client.EXPECT().PipeAppend(gomock.Any(), gomock.Any(), "INCRBY", "domain_key4_value4_997200", uint32(1)).SetArg(1, uint32(11)).DoAndReturn(pipeAppend) + client.EXPECT().PipeAppend(gomock.Any(), gomock.Any(), + "EXPIRE", "domain_key4_value4_997200", int64(3600)).DoAndReturn(pipeAppend) + client.EXPECT().PipeDo(gomock.Any()).Return(nil) request := common.NewRateLimitRequest("domain", [][][2]string{{{"key4", "value4"}}}, 1) @@ -285,9 +289,10 @@ func TestNearLimit(t *testing.T) { // Test Near Limit Stats. 
At Near Limit Ratio, still OK timeSource.EXPECT().UnixNow().Return(int64(1000000)) - client.EXPECT().DoCmd(gomock.Any(), "INCRBY", "domain_key4_value4_997200", uint32(1)).SetArg(0, uint32(13)) - client.EXPECT().DoCmd(gomock.Any(), - "EXPIRE", "domain_key4_value4_997200", int64(3600)) + client.EXPECT().PipeAppend(gomock.Any(), gomock.Any(), "INCRBY", "domain_key4_value4_997200", uint32(1)).SetArg(1, uint32(13)).DoAndReturn(pipeAppend) + client.EXPECT().PipeAppend(gomock.Any(), gomock.Any(), + "EXPIRE", "domain_key4_value4_997200", int64(3600)).DoAndReturn(pipeAppend) + client.EXPECT().PipeDo(gomock.Any()).Return(nil) assert.Equal( []*pb.RateLimitResponse_DescriptorStatus{ @@ -300,9 +305,10 @@ func TestNearLimit(t *testing.T) { // Test Near Limit Stats. We went OVER_LIMIT, but the near_limit counter only increases // when we are near limit, not after we have passed the limit. timeSource.EXPECT().UnixNow().Return(int64(1000000)) - client.EXPECT().DoCmd(gomock.Any(), "INCRBY", "domain_key4_value4_997200", uint32(1)).SetArg(0, uint32(16)) - client.EXPECT().DoCmd(gomock.Any(), - "EXPIRE", "domain_key4_value4_997200", int64(3600)) + client.EXPECT().PipeAppend(gomock.Any(), gomock.Any(), "INCRBY", "domain_key4_value4_997200", uint32(1)).SetArg(1, uint32(16)).DoAndReturn(pipeAppend) + client.EXPECT().PipeAppend(gomock.Any(), gomock.Any(), + "EXPIRE", "domain_key4_value4_997200", int64(3600)).DoAndReturn(pipeAppend) + client.EXPECT().PipeDo(gomock.Any()).Return(nil) assert.Equal( []*pb.RateLimitResponse_DescriptorStatus{ @@ -315,8 +321,9 @@ func TestNearLimit(t *testing.T) { // Now test hitsAddend that is greater than 1 // All of it under limit, under near limit timeSource.EXPECT().UnixNow().Return(int64(1234)) - client.EXPECT().DoCmd(gomock.Any(), "INCRBY", "domain_key5_value5_1234", uint32(3)).SetArg(0, uint32(5)) - client.EXPECT().DoCmd(gomock.Any(), "EXPIRE", "domain_key5_value5_1234", int64(1)) + client.EXPECT().PipeAppend(gomock.Any(), gomock.Any(), "INCRBY", "domain_key5_value5_1234", uint32(3)).SetArg(1, uint32(5)).DoAndReturn(pipeAppend) + client.EXPECT().PipeAppend(gomock.Any(), gomock.Any(), "EXPIRE", "domain_key5_value5_1234", int64(1)).DoAndReturn(pipeAppend) + client.EXPECT().PipeDo(gomock.Any()).Return(nil) request = common.NewRateLimitRequest("domain", [][][2]string{{{"key5", "value5"}}}, 3) limits = []*config.RateLimit{config.NewRateLimit(20, pb.RateLimitResponse_RateLimit_SECOND, "key5_value5", statsStore)} @@ -330,8 +337,9 @@ func TestNearLimit(t *testing.T) { // All of it under limit, some over near limit timeSource.EXPECT().UnixNow().Return(int64(1234)) - client.EXPECT().DoCmd(gomock.Any(), "INCRBY", "domain_key6_value6_1234", uint32(2)).SetArg(0, uint32(7)) - client.EXPECT().DoCmd(gomock.Any(), "EXPIRE", "domain_key6_value6_1234", int64(1)) + client.EXPECT().PipeAppend(gomock.Any(), gomock.Any(), "INCRBY", "domain_key6_value6_1234", uint32(2)).SetArg(1, uint32(7)).DoAndReturn(pipeAppend) + client.EXPECT().PipeAppend(gomock.Any(), gomock.Any(), "EXPIRE", "domain_key6_value6_1234", int64(1)).DoAndReturn(pipeAppend) + client.EXPECT().PipeDo(gomock.Any()).Return(nil) request = common.NewRateLimitRequest("domain", [][][2]string{{{"key6", "value6"}}}, 2) limits = []*config.RateLimit{config.NewRateLimit(8, pb.RateLimitResponse_RateLimit_SECOND, "key6_value6", statsStore)} @@ -345,8 +353,9 @@ func TestNearLimit(t *testing.T) { // All of it under limit, all of it over near limit timeSource.EXPECT().UnixNow().Return(int64(1234)) - client.EXPECT().DoCmd(gomock.Any(), "INCRBY", 
"domain_key7_value7_1234", uint32(3)).SetArg(0, uint32(19)) - client.EXPECT().DoCmd(gomock.Any(), "EXPIRE", "domain_key7_value7_1234", int64(1)) + client.EXPECT().PipeAppend(gomock.Any(), gomock.Any(), "INCRBY", "domain_key7_value7_1234", uint32(3)).SetArg(1, uint32(19)).DoAndReturn(pipeAppend) + client.EXPECT().PipeAppend(gomock.Any(), gomock.Any(), "EXPIRE", "domain_key7_value7_1234", int64(1)).DoAndReturn(pipeAppend) + client.EXPECT().PipeDo(gomock.Any()).Return(nil) request = common.NewRateLimitRequest("domain", [][][2]string{{{"key7", "value7"}}}, 3) limits = []*config.RateLimit{config.NewRateLimit(20, pb.RateLimitResponse_RateLimit_SECOND, "key7_value7", statsStore)} @@ -360,8 +369,9 @@ func TestNearLimit(t *testing.T) { // Some of it over limit, all of it over near limit timeSource.EXPECT().UnixNow().Return(int64(1234)) - client.EXPECT().DoCmd(gomock.Any(), "INCRBY", "domain_key8_value8_1234", uint32(3)).SetArg(0, uint32(22)) - client.EXPECT().DoCmd(gomock.Any(), "EXPIRE", "domain_key8_value8_1234", int64(1)) + client.EXPECT().PipeAppend(gomock.Any(), gomock.Any(), "INCRBY", "domain_key8_value8_1234", uint32(3)).SetArg(1, uint32(22)).DoAndReturn(pipeAppend) + client.EXPECT().PipeAppend(gomock.Any(), gomock.Any(), "EXPIRE", "domain_key8_value8_1234", int64(1)).DoAndReturn(pipeAppend) + client.EXPECT().PipeDo(gomock.Any()).Return(nil) request = common.NewRateLimitRequest("domain", [][][2]string{{{"key8", "value8"}}}, 3) limits = []*config.RateLimit{config.NewRateLimit(20, pb.RateLimitResponse_RateLimit_SECOND, "key8_value8", statsStore)} @@ -375,8 +385,9 @@ func TestNearLimit(t *testing.T) { // Some of it in all three places timeSource.EXPECT().UnixNow().Return(int64(1234)) - client.EXPECT().DoCmd(gomock.Any(), "INCRBY", "domain_key9_value9_1234", uint32(7)).SetArg(0, uint32(22)) - client.EXPECT().DoCmd(gomock.Any(), "EXPIRE", "domain_key9_value9_1234", int64(1)) + client.EXPECT().PipeAppend(gomock.Any(), gomock.Any(), "INCRBY", "domain_key9_value9_1234", uint32(7)).SetArg(1, uint32(22)).DoAndReturn(pipeAppend) + client.EXPECT().PipeAppend(gomock.Any(), gomock.Any(), "EXPIRE", "domain_key9_value9_1234", int64(1)).DoAndReturn(pipeAppend) + client.EXPECT().PipeDo(gomock.Any()).Return(nil) request = common.NewRateLimitRequest("domain", [][][2]string{{{"key9", "value9"}}}, 7) limits = []*config.RateLimit{config.NewRateLimit(20, pb.RateLimitResponse_RateLimit_SECOND, "key9_value9", statsStore)} @@ -390,8 +401,9 @@ func TestNearLimit(t *testing.T) { // all of it over limit timeSource.EXPECT().UnixNow().Return(int64(1234)) - client.EXPECT().DoCmd(gomock.Any(), "INCRBY", "domain_key10_value10_1234", uint32(3)).SetArg(0, uint32(30)) - client.EXPECT().DoCmd(gomock.Any(), "EXPIRE", "domain_key10_value10_1234", int64(1)) + client.EXPECT().PipeAppend(gomock.Any(), gomock.Any(), "INCRBY", "domain_key10_value10_1234", uint32(3)).SetArg(1, uint32(30)).DoAndReturn(pipeAppend) + client.EXPECT().PipeAppend(gomock.Any(), gomock.Any(), "EXPIRE", "domain_key10_value10_1234", int64(1)).DoAndReturn(pipeAppend) + client.EXPECT().PipeDo(gomock.Any()).Return(nil) request = common.NewRateLimitRequest("domain", [][][2]string{{{"key10", "value10"}}}, 3) limits = []*config.RateLimit{config.NewRateLimit(10, pb.RateLimitResponse_RateLimit_SECOND, "key10_value10", statsStore)} @@ -417,8 +429,9 @@ func TestRedisWithJitter(t *testing.T) { timeSource.EXPECT().UnixNow().Return(int64(1234)) jitterSource.EXPECT().Int63().Return(int64(100)) - client.EXPECT().DoCmd(gomock.Any(), "INCRBY", "domain_key_value_1234", 
uint32(1)).SetArg(0, uint32(5)) - client.EXPECT().DoCmd(gomock.Any(), "EXPIRE", "domain_key_value_1234", int64(101)) + client.EXPECT().PipeAppend(gomock.Any(), gomock.Any(), "INCRBY", "domain_key_value_1234", uint32(1)).SetArg(1, uint32(5)).DoAndReturn(pipeAppend) + client.EXPECT().PipeAppend(gomock.Any(), gomock.Any(), "EXPIRE", "domain_key_value_1234", int64(101)).DoAndReturn(pipeAppend) + client.EXPECT().PipeDo(gomock.Any()).Return(nil) request := common.NewRateLimitRequest("domain", [][][2]string{{{"key", "value"}}}, 1) limits := []*config.RateLimit{config.NewRateLimit(10, pb.RateLimitResponse_RateLimit_SECOND, "key_value", statsStore)} @@ -451,53 +464,73 @@ func expectPanicError(t *testing.T, f assert.PanicTestFunc) (result error) { return } -func TestNewClientImpl(t *testing.T) { - redisAuth := "123" - statsStore := stats.NewStore(stats.NewNullSink(), false) +func testNewClientImpl(t *testing.T, pipelineWindow time.Duration, pipelineLimit int) func(t *testing.T) { + return func(t *testing.T) { + redisAuth := "123" + statsStore := stats.NewStore(stats.NewNullSink(), false) - mkRedisClient := func(auth, addr string) redis.Client { - return redis.NewClientImpl(statsStore, false, auth, addr, 1, 1*time.Millisecond, 1) - } + mkRedisClient := func(auth, addr string) redis.Client { + return redis.NewClientImpl(statsStore, false, auth, addr, 1, pipelineWindow, pipelineLimit) + } - t.Run("connection refused", func(t *testing.T) { - // It's possible there is a redis server listening on 6379 in ci environment, so - // use a random port. - panicErr := expectPanicError(t, func() { mkRedisClient("", "localhost:12345") }) - assert.Contains(t, panicErr.Error(), "connection refused") - }) + t.Run("connection refused", func(t *testing.T) { + // It's possible there is a redis server listening on 6379 in ci environment, so + // use a random port. 
+ panicErr := expectPanicError(t, func() { mkRedisClient("", "localhost:12345") }) + assert.Contains(t, panicErr.Error(), "connection refused") + }) - t.Run("ok", func(t *testing.T) { - redisSrv := mustNewRedisServer() - defer redisSrv.Close() + t.Run("ok", func(t *testing.T) { + redisSrv := mustNewRedisServer() + defer redisSrv.Close() - var client redis.Client - assert.NotPanics(t, func() { - client = mkRedisClient("", redisSrv.Addr()) + var client redis.Client + assert.NotPanics(t, func() { + client = mkRedisClient("", redisSrv.Addr()) + }) + assert.NotNil(t, client) }) - assert.NotNil(t, client) - }) - t.Run("auth fail", func(t *testing.T) { - redisSrv := mustNewRedisServer() - defer redisSrv.Close() + t.Run("auth fail", func(t *testing.T) { + redisSrv := mustNewRedisServer() + defer redisSrv.Close() - redisSrv.RequireAuth(redisAuth) + redisSrv.RequireAuth(redisAuth) - assert.PanicsWithError(t, "NOAUTH Authentication required.", func() { - mkRedisClient("", redisSrv.Addr()) + assert.PanicsWithError(t, "NOAUTH Authentication required.", func() { + mkRedisClient("", redisSrv.Addr()) + }) }) - }) - t.Run("auth pass", func(t *testing.T) { - redisSrv := mustNewRedisServer() - defer redisSrv.Close() + t.Run("auth pass", func(t *testing.T) { + redisSrv := mustNewRedisServer() + defer redisSrv.Close() - redisSrv.RequireAuth(redisAuth) + redisSrv.RequireAuth(redisAuth) - assert.NotPanics(t, func() { - mkRedisClient(redisAuth, redisSrv.Addr()) + assert.NotPanics(t, func() { + mkRedisClient(redisAuth, redisSrv.Addr()) + }) }) - }) + + t.Run("ImplicitPipeliningEnabled() return expected value", func(t *testing.T) { + redisSrv := mustNewRedisServer() + defer redisSrv.Close() + + client := mkRedisClient("", redisSrv.Addr()) + + if pipelineWindow == 0 && pipelineLimit == 0 { + assert.False(t, client.ImplicitPipeliningEnabled()) + } else { + assert.True(t, client.ImplicitPipeliningEnabled()) + } + }) + } +} + +func TestNewClientImpl(t *testing.T) { + t.Run("ImplicitPipeliningEnabled", testNewClientImpl(t, 2*time.Millisecond, 2)) + t.Run("ImplicitPipeliningDisabled", testNewClientImpl(t, 0, 0)) } func TestDoCmd(t *testing.T) { @@ -543,3 +576,64 @@ func TestDoCmd(t *testing.T) { assert.EqualError(t, client.DoCmd(nil, "GET", "foo"), "EOF") }) } + +func testPipeDo(t *testing.T, pipelineWindow time.Duration, pipelineLimit int) func(t *testing.T) { + return func(t *testing.T) { + statsStore := stats.NewStore(stats.NewNullSink(), false) + + mkRedisClient := func(addr string) redis.Client { + return redis.NewClientImpl(statsStore, false, "", addr, 1, pipelineWindow, pipelineLimit) + } + + t.Run("SETGET ok", func(t *testing.T) { + redisSrv := mustNewRedisServer() + defer redisSrv.Close() + + client := mkRedisClient(redisSrv.Addr()) + var res string + + pipeline := redis.Pipeline{} + pipeline = client.PipeAppend(pipeline, nil, "SET", "foo", "bar") + pipeline = client.PipeAppend(pipeline, &res, "GET", "foo") + + assert.Nil(t, client.PipeDo(pipeline)) + assert.Equal(t, "bar", res) + }) + + t.Run("INCRBY ok", func(t *testing.T) { + redisSrv := mustNewRedisServer() + defer redisSrv.Close() + + client := mkRedisClient(redisSrv.Addr()) + var res uint32 + hits := uint32(1) + + assert.Nil(t, client.PipeDo(client.PipeAppend(redis.Pipeline{}, &res, "INCRBY", "a", hits))) + assert.Equal(t, hits, res) + + assert.Nil(t, client.PipeDo(client.PipeAppend(redis.Pipeline{}, &res, "INCRBY", "a", hits))) + assert.Equal(t, uint32(2), res) + }) + + t.Run("connection broken", func(t *testing.T) { + redisSrv := mustNewRedisServer() + 
client := mkRedisClient(redisSrv.Addr())
+
+			assert.Nil(t, client.PipeDo(client.PipeAppend(redis.Pipeline{}, nil, "SET", "foo", "bar")))
+
+			redisSrv.Close()
+
+			expectErrContainEOF := func(t *testing.T, err error) {
+				assert.NotNil(t, err)
+				assert.Contains(t, err.Error(), "EOF")
+			}
+
+			expectErrContainEOF(t, client.PipeDo(client.PipeAppend(redis.Pipeline{}, nil, "GET", "foo")))
+		})
+	}
+}
+
+func TestPipeDo(t *testing.T) {
+	t.Run("ImplicitPipeliningEnabled", testPipeDo(t, 10*time.Millisecond, 2))
+	t.Run("ImplicitPipeliningDisabled", testPipeDo(t, 0, 0))
+}

From edb278b6adbe972393bb79feb677e498d6ae3710 Mon Sep 17 00:00:00 2001
From: Tong Cai
Date: Wed, 12 Aug 2020 02:32:00 +0800
Subject: [PATCH 4/4] update doc

Signed-off-by: Tong Cai
---
 README.md | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/README.md b/README.md
index 944e66b2..e2f45e36 100644
--- a/README.md
+++ b/README.md
@@ -25,6 +25,7 @@
 - [Debug Port](#debug-port)
 - [Local Cache](#local-cache)
 - [Redis](#redis)
+  - [Pipelining](#pipelining)
   - [One Redis Instance](#one-redis-instance)
   - [Two Redis Instances](#two-redis-instances)
 - [Contact](#contact)
@@ -447,7 +448,11 @@ As well Ratelimit supports TLS connections and authentication. These can be conf
 1. `REDIS_TLS` & `REDIS_PERSECOND_TLS`: set to `"true"` to enable a TLS connection for the specific connection type.
 1. `REDIS_AUTH` & `REDIS_PERSECOND_AUTH`: set to `"password"` to enable authentication to the redis host.
 
-Ratelimit use [implicit pipelining](https://github.com/mediocregopher/radix/blob/v3.5.1/pool.go#L238) to send requests to redis. Pipelining can be configured using the following environment variables:
+## Pipelining
+
+By default, for each request, ratelimit will pick up a connection from the pool, write multiple redis commands in a single write, then read their responses in a single read. This reduces the number of network round trips.
+
+For high-throughput scenarios, ratelimit also supports [implicit pipelining](https://github.com/mediocregopher/radix/blob/v3.5.1/pool.go#L238). It can be configured using the following environment variables:
 
 1. `REDIS_PIPELINE_WINDOW` & `REDIS_PERSECOND_PIPELINE_WINDOW`: sets the duration after which internal pipelines will be flushed. If window is zero then implicit pipelining will be disabled.
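+
+For example, to re-enable implicit pipelining with the values that were previously the defaults (a 75µs flush window and a limit of 8 commands per batch):
+
+```shell
+# re-enable implicit pipelining with the former default settings
+export REDIS_PIPELINE_WINDOW=75us
+export REDIS_PIPELINE_LIMIT=8
+```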