From 754539326db9c6a148910a58dc1184e5aa11f980 Mon Sep 17 00:00:00 2001 From: "justin.miron" Date: Thu, 9 Nov 2023 12:01:44 -0600 Subject: [PATCH 1/2] Only check latencies once every 10 seconds with `routeByLatency` `routeByLatency` currently checks latencies any time a server returns a MOVED or READONLY reply. When a shard is down, the ClusterClient chooses to issue the request to a random server, which returns a MOVED reply. This causes a state refresh and a latency update on all servers. This can put significant ping load on clusters with a large number of clients. This introduces logic to ping only once every 10 seconds: the `GC` function performs a latency update on a node only if that node's latency was last measured more than 10 seconds ago. Fixes https://github.com/redis/go-redis/issues/2782 --- osscluster.go | 23 ++++++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/osscluster.go b/osscluster.go index 93e0eef1e..2a7232778 100644 --- a/osscluster.go +++ b/osscluster.go @@ -21,6 +21,10 @@ import ( "github.com/redis/go-redis/v9/internal/rand" ) +const ( + minLatencyMeasurementInterval = 10 * time.Second +) + var errClusterNoNodes = fmt.Errorf("redis: cluster has no nodes") // ClusterOptions are used to configure a cluster client and should be @@ -309,6 +313,8 @@ type clusterNode struct { latency uint32 // atomic generation uint32 // atomic failing uint32 // atomic + + lastLatencyMeasurement int64 } func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode { @@ -359,6 +365,7 @@ func (n *clusterNode) updateLatency() { latency = float64(dur) / float64(successes) } atomic.StoreUint32(&n.latency, uint32(latency+0.5)) + n.SetLastLatencyMeasurement(time.Now()) } func (n *clusterNode) Latency() time.Duration { @@ -388,6 +395,10 @@ func (n *clusterNode) Generation() uint32 { return atomic.LoadUint32(&n.generation) } +func (n *clusterNode) LastLatencyMeasurement() int64 { + return atomic.LoadInt64(&n.lastLatencyMeasurement) +} + 
func (n *clusterNode) SetGeneration(gen uint32) { for { v := atomic.LoadUint32(&n.generation) @@ -397,6 +408,15 @@ func (n *clusterNode) SetGeneration(gen uint32) { } } +func (n *clusterNode) SetLastLatencyMeasurement(t time.Time) { + for { + v := atomic.LoadInt64(&n.lastLatencyMeasurement) + if t.Unix() < v || atomic.CompareAndSwapInt64(&n.lastLatencyMeasurement, v, t.Unix()) { + break + } + } +} + //------------------------------------------------------------------------------ type clusterNodes struct { @@ -484,10 +504,11 @@ func (c *clusterNodes) GC(generation uint32) { c.mu.Lock() c.activeAddrs = c.activeAddrs[:0] + now := time.Now() for addr, node := range c.nodes { if node.Generation() >= generation { c.activeAddrs = append(c.activeAddrs, addr) - if c.opt.RouteByLatency { + if c.opt.RouteByLatency && node.LastLatencyMeasurement() < now.Add(-minLatencyMeasurementInterval).Unix() { go node.updateLatency() } continue From 97cee82e597aa7e2c95b866e7c2e9317f467acb5 Mon Sep 17 00:00:00 2001 From: "justin.miron" Date: Thu, 29 Feb 2024 12:26:40 -0600 Subject: [PATCH 2/2] use UnixNano instead of Unix for better precision --- osscluster.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/osscluster.go b/osscluster.go index 2a7232778..cb0c90983 100644 --- a/osscluster.go +++ b/osscluster.go @@ -314,7 +314,9 @@ type clusterNode struct { generation uint32 // atomic failing uint32 // atomic - lastLatencyMeasurement int64 + // last time the latency measurement was performed for the node, stored in nanoseconds + // from epoch + lastLatencyMeasurement int64 // atomic } func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode { @@ -411,7 +413,7 @@ func (n *clusterNode) SetGeneration(gen uint32) { func (n *clusterNode) SetLastLatencyMeasurement(t time.Time) { for { v := atomic.LoadInt64(&n.lastLatencyMeasurement) - if t.Unix() < v || atomic.CompareAndSwapInt64(&n.lastLatencyMeasurement, v, t.Unix()) { + if t.UnixNano() < v || 
atomic.CompareAndSwapInt64(&n.lastLatencyMeasurement, v, t.UnixNano()) { break } } @@ -508,7 +510,7 @@ func (c *clusterNodes) GC(generation uint32) { for addr, node := range c.nodes { if node.Generation() >= generation { c.activeAddrs = append(c.activeAddrs, addr) - if c.opt.RouteByLatency && node.LastLatencyMeasurement() < now.Add(-minLatencyMeasurementInterval).Unix() { + if c.opt.RouteByLatency && node.LastLatencyMeasurement() < now.Add(-minLatencyMeasurementInterval).UnixNano() { go node.updateLatency() } continue