Skip to content

Commit

Permalink
Merge branch 'redis:master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
immersedin authored Jul 21, 2024
2 parents 2311396 + 3613df6 commit 8a6ca48
Show file tree
Hide file tree
Showing 12 changed files with 611 additions and 57 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/golangci-lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,4 @@ jobs:
steps:
- uses: actions/checkout@v4
- name: golangci-lint
uses: golangci/golangci-lint-action@v4
uses: golangci/golangci-lint-action@v6
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ build:

testdata/redis:
mkdir -p $@
wget -qO- https://download.redis.io/releases/redis-7.4-rc1.tar.gz | tar xvz --strip-components=1 -C $@
wget -qO- https://download.redis.io/releases/redis-7.4-rc2.tar.gz | tar xvz --strip-components=1 -C $@

testdata/redis/src/redis-server: testdata/redis
cd $< && make all
Expand Down
55 changes: 33 additions & 22 deletions commands_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2486,35 +2486,40 @@ var _ = Describe("Commands", func() {
})

It("should HExpire", Label("hash-expiration", "NonRedisEnterprise"), func() {
res, err := client.HExpire(ctx, "no_such_key", 10, "field1", "field2", "field3").Result()
res, err := client.HExpire(ctx, "no_such_key", 10*time.Second, "field1", "field2", "field3").Result()
Expect(err).To(BeNil())
Expect(res).To(BeEquivalentTo([]int64{-2, -2, -2}))

for i := 0; i < 100; i++ {
sadd := client.HSet(ctx, "myhash", fmt.Sprintf("key%d", i), "hello")
Expect(sadd.Err()).NotTo(HaveOccurred())
}

res, err = client.HExpire(ctx, "myhash", 10, "key1", "key2", "key200").Result()
res, err = client.HExpire(ctx, "myhash", 10*time.Second, "key1", "key2", "key200").Result()
Expect(err).NotTo(HaveOccurred())
Expect(res).To(Equal([]int64{1, 1, -2}))
})

It("should HPExpire", Label("hash-expiration", "NonRedisEnterprise"), func() {
_, err := client.HPExpire(ctx, "no_such_key", 10, "field1", "field2", "field3").Result()
res, err := client.HPExpire(ctx, "no_such_key", 10*time.Second, "field1", "field2", "field3").Result()
Expect(err).To(BeNil())
Expect(res).To(BeEquivalentTo([]int64{-2, -2, -2}))

for i := 0; i < 100; i++ {
sadd := client.HSet(ctx, "myhash", fmt.Sprintf("key%d", i), "hello")
Expect(sadd.Err()).NotTo(HaveOccurred())
}

res, err := client.HPExpire(ctx, "myhash", 10, "key1", "key2", "key200").Result()
res, err = client.HPExpire(ctx, "myhash", 10*time.Second, "key1", "key2", "key200").Result()
Expect(err).NotTo(HaveOccurred())
Expect(res).To(Equal([]int64{1, 1, -2}))
})

It("should HExpireAt", Label("hash-expiration", "NonRedisEnterprise"), func() {

_, err := client.HExpireAt(ctx, "no_such_key", time.Now().Add(10*time.Second), "field1", "field2", "field3").Result()
resEmpty, err := client.HExpireAt(ctx, "no_such_key", time.Now().Add(10*time.Second), "field1", "field2", "field3").Result()
Expect(err).To(BeNil())
Expect(resEmpty).To(BeEquivalentTo([]int64{-2, -2, -2}))

for i := 0; i < 100; i++ {
sadd := client.HSet(ctx, "myhash", fmt.Sprintf("key%d", i), "hello")
Expect(sadd.Err()).NotTo(HaveOccurred())
Expand All @@ -2526,9 +2531,10 @@ var _ = Describe("Commands", func() {
})

It("should HPExpireAt", Label("hash-expiration", "NonRedisEnterprise"), func() {

_, err := client.HPExpireAt(ctx, "no_such_key", time.Now().Add(10*time.Second), "field1", "field2", "field3").Result()
resEmpty, err := client.HPExpireAt(ctx, "no_such_key", time.Now().Add(10*time.Second), "field1", "field2", "field3").Result()
Expect(err).To(BeNil())
Expect(resEmpty).To(BeEquivalentTo([]int64{-2, -2, -2}))

for i := 0; i < 100; i++ {
sadd := client.HSet(ctx, "myhash", fmt.Sprintf("key%d", i), "hello")
Expect(sadd.Err()).NotTo(HaveOccurred())
Expand All @@ -2540,9 +2546,10 @@ var _ = Describe("Commands", func() {
})

It("should HPersist", Label("hash-expiration", "NonRedisEnterprise"), func() {

_, err := client.HPersist(ctx, "no_such_key", "field1", "field2", "field3").Result()
resEmpty, err := client.HPersist(ctx, "no_such_key", "field1", "field2", "field3").Result()
Expect(err).To(BeNil())
Expect(resEmpty).To(BeEquivalentTo([]int64{-2, -2, -2}))

for i := 0; i < 100; i++ {
sadd := client.HSet(ctx, "myhash", fmt.Sprintf("key%d", i), "hello")
Expect(sadd.Err()).NotTo(HaveOccurred())
Expand All @@ -2552,7 +2559,7 @@ var _ = Describe("Commands", func() {
Expect(err).NotTo(HaveOccurred())
Expect(res).To(Equal([]int64{-1, -1, -2}))

res, err = client.HExpire(ctx, "myhash", 10, "key1", "key200").Result()
res, err = client.HExpire(ctx, "myhash", 10*time.Second, "key1", "key200").Result()
Expect(err).NotTo(HaveOccurred())
Expect(res).To(Equal([]int64{1, -2}))

Expand All @@ -2562,15 +2569,16 @@ var _ = Describe("Commands", func() {
})

It("should HExpireTime", Label("hash-expiration", "NonRedisEnterprise"), func() {

_, err := client.HExpireTime(ctx, "no_such_key", "field1", "field2", "field3").Result()
resEmpty, err := client.HExpireTime(ctx, "no_such_key", "field1", "field2", "field3").Result()
Expect(err).To(BeNil())
Expect(resEmpty).To(BeEquivalentTo([]int64{-2, -2, -2}))

for i := 0; i < 100; i++ {
sadd := client.HSet(ctx, "myhash", fmt.Sprintf("key%d", i), "hello")
Expect(sadd.Err()).NotTo(HaveOccurred())
}

res, err := client.HExpire(ctx, "myhash", 10, "key1", "key200").Result()
res, err := client.HExpire(ctx, "myhash", 10*time.Second, "key1", "key200").Result()
Expect(err).NotTo(HaveOccurred())
Expect(res).To(Equal([]int64{1, -2}))

Expand All @@ -2580,9 +2588,10 @@ var _ = Describe("Commands", func() {
})

It("should HPExpireTime", Label("hash-expiration", "NonRedisEnterprise"), func() {

_, err := client.HPExpireTime(ctx, "no_such_key", "field1", "field2", "field3").Result()
resEmpty, err := client.HPExpireTime(ctx, "no_such_key", "field1", "field2", "field3").Result()
Expect(err).To(BeNil())
Expect(resEmpty).To(BeEquivalentTo([]int64{-2, -2, -2}))

for i := 0; i < 100; i++ {
sadd := client.HSet(ctx, "myhash", fmt.Sprintf("key%d", i), "hello")
Expect(sadd.Err()).NotTo(HaveOccurred())
Expand All @@ -2599,15 +2608,16 @@ var _ = Describe("Commands", func() {
})

It("should HTTL", Label("hash-expiration", "NonRedisEnterprise"), func() {

_, err := client.HTTL(ctx, "no_such_key", "field1", "field2", "field3").Result()
resEmpty, err := client.HTTL(ctx, "no_such_key", "field1", "field2", "field3").Result()
Expect(err).To(BeNil())
Expect(resEmpty).To(BeEquivalentTo([]int64{-2, -2, -2}))

for i := 0; i < 100; i++ {
sadd := client.HSet(ctx, "myhash", fmt.Sprintf("key%d", i), "hello")
Expect(sadd.Err()).NotTo(HaveOccurred())
}

res, err := client.HExpire(ctx, "myhash", 10, "key1", "key200").Result()
res, err := client.HExpire(ctx, "myhash", 10*time.Second, "key1", "key200").Result()
Expect(err).NotTo(HaveOccurred())
Expect(res).To(Equal([]int64{1, -2}))

Expand All @@ -2617,15 +2627,16 @@ var _ = Describe("Commands", func() {
})

It("should HPTTL", Label("hash-expiration", "NonRedisEnterprise"), func() {

_, err := client.HPTTL(ctx, "no_such_key", "field1", "field2", "field3").Result()
resEmpty, err := client.HPTTL(ctx, "no_such_key", "field1", "field2", "field3").Result()
Expect(err).To(BeNil())
Expect(resEmpty).To(BeEquivalentTo([]int64{-2, -2, -2}))

for i := 0; i < 100; i++ {
sadd := client.HSet(ctx, "myhash", fmt.Sprintf("key%d", i), "hello")
Expect(sadd.Err()).NotTo(HaveOccurred())
}

res, err := client.HExpire(ctx, "myhash", 10, "key1", "key200").Result()
res, err := client.HExpire(ctx, "myhash", 10*time.Second, "key1", "key200").Result()
Expect(err).NotTo(HaveOccurred())
Expect(res).To(Equal([]int64{1, -2}))

Expand Down
17 changes: 15 additions & 2 deletions hash_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,19 @@ type HashCmdable interface {
HVals(ctx context.Context, key string) *StringSliceCmd
HRandField(ctx context.Context, key string, count int) *StringSliceCmd
HRandFieldWithValues(ctx context.Context, key string, count int) *KeyValueSliceCmd
HExpire(ctx context.Context, key string, expiration time.Duration, fields ...string) *IntSliceCmd
HExpireWithArgs(ctx context.Context, key string, expiration time.Duration, expirationArgs HExpireArgs, fields ...string) *IntSliceCmd
HPExpire(ctx context.Context, key string, expiration time.Duration, fields ...string) *IntSliceCmd
HPExpireWithArgs(ctx context.Context, key string, expiration time.Duration, expirationArgs HExpireArgs, fields ...string) *IntSliceCmd
HExpireAt(ctx context.Context, key string, tm time.Time, fields ...string) *IntSliceCmd
HExpireAtWithArgs(ctx context.Context, key string, tm time.Time, expirationArgs HExpireArgs, fields ...string) *IntSliceCmd
HPExpireAt(ctx context.Context, key string, tm time.Time, fields ...string) *IntSliceCmd
HPExpireAtWithArgs(ctx context.Context, key string, tm time.Time, expirationArgs HExpireArgs, fields ...string) *IntSliceCmd
HPersist(ctx context.Context, key string, fields ...string) *IntSliceCmd
HExpireTime(ctx context.Context, key string, fields ...string) *IntSliceCmd
HPExpireTime(ctx context.Context, key string, fields ...string) *IntSliceCmd
HTTL(ctx context.Context, key string, fields ...string) *IntSliceCmd
HPTTL(ctx context.Context, key string, fields ...string) *IntSliceCmd
}

func (c cmdable) HDel(ctx context.Context, key string, fields ...string) *IntCmd {
Expand Down Expand Up @@ -202,7 +215,7 @@ type HExpireArgs struct {
// The command constructs an argument list starting with "HEXPIRE", followed by the key, duration, any conditional flags, and the specified fields.
// For more information - https://redis.io/commands/hexpire/
func (c cmdable) HExpire(ctx context.Context, key string, expiration time.Duration, fields ...string) *IntSliceCmd {
args := []interface{}{"HEXPIRE", key, expiration, "FIELDS", len(fields)}
args := []interface{}{"HEXPIRE", key, formatSec(ctx, expiration), "FIELDS", len(fields)}

for _, field := range fields {
args = append(args, field)
Expand All @@ -217,7 +230,7 @@ func (c cmdable) HExpire(ctx context.Context, key string, expiration time.Durati
// The command constructs an argument list starting with "HEXPIRE", followed by the key, duration, any conditional flags, and the specified fields.
// For more information - https://redis.io/commands/hexpire/
func (c cmdable) HExpireWithArgs(ctx context.Context, key string, expiration time.Duration, expirationArgs HExpireArgs, fields ...string) *IntSliceCmd {
args := []interface{}{"HEXPIRE", key, expiration}
args := []interface{}{"HEXPIRE", key, formatSec(ctx, expiration)}

// only if one argument is true, we can add it to the args
// if more than one argument is true, it will cause an error
Expand Down
5 changes: 5 additions & 0 deletions internal/pool/conn_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package pool

import (
"crypto/tls"
"errors"
"io"
"net"
Expand All @@ -16,6 +17,10 @@ func connCheck(conn net.Conn) error {
// Reset previous timeout.
_ = conn.SetDeadline(time.Time{})

// Check if tls.Conn.
if c, ok := conn.(*tls.Conn); ok {
conn = c.NetConn()
}
sysConn, ok := conn.(syscall.Conn)
if !ok {
return nil
Expand Down
18 changes: 18 additions & 0 deletions internal/pool/conn_check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package pool

import (
"crypto/tls"
"net"
"net/http/httptest"
"time"
Expand All @@ -14,12 +15,17 @@ import (
var _ = Describe("tests conn_check with real conns", func() {
var ts *httptest.Server
var conn net.Conn
var tlsConn *tls.Conn
var err error

BeforeEach(func() {
ts = httptest.NewServer(nil)
conn, err = net.DialTimeout(ts.Listener.Addr().Network(), ts.Listener.Addr().String(), time.Second)
Expect(err).NotTo(HaveOccurred())
tlsTestServer := httptest.NewUnstartedServer(nil)
tlsTestServer.StartTLS()
tlsConn, err = tls.DialWithDialer(&net.Dialer{Timeout: time.Second}, tlsTestServer.Listener.Addr().Network(), tlsTestServer.Listener.Addr().String(), &tls.Config{InsecureSkipVerify: true})
Expect(err).NotTo(HaveOccurred())
})

AfterEach(func() {
Expand All @@ -33,11 +39,23 @@ var _ = Describe("tests conn_check with real conns", func() {
Expect(connCheck(conn)).To(HaveOccurred())
})

It("good tls conn check", func() {
Expect(connCheck(tlsConn)).NotTo(HaveOccurred())

Expect(tlsConn.Close()).NotTo(HaveOccurred())
Expect(connCheck(tlsConn)).To(HaveOccurred())
})

It("bad conn check", func() {
Expect(conn.Close()).NotTo(HaveOccurred())
Expect(connCheck(conn)).To(HaveOccurred())
})

It("bad tls conn check", func() {
Expect(tlsConn.Close()).NotTo(HaveOccurred())
Expect(connCheck(tlsConn)).To(HaveOccurred())
})

It("check conn deadline", func() {
Expect(conn.SetDeadline(time.Now())).NotTo(HaveOccurred())
time.Sleep(time.Millisecond * 10)
Expand Down
48 changes: 36 additions & 12 deletions osscluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,8 @@ func (n *clusterNode) Close() error {
return n.Client.Close()
}

const maximumNodeLatency = 1 * time.Minute

func (n *clusterNode) updateLatency() {
const numProbe = 10
var dur uint64
Expand All @@ -361,7 +363,7 @@ func (n *clusterNode) updateLatency() {
if successes == 0 {
// If none of the pings worked, set latency to some arbitrarily high value so this node gets
// least priority.
latency = float64((1 * time.Minute) / time.Microsecond)
latency = float64((maximumNodeLatency) / time.Microsecond)
} else {
latency = float64(dur) / float64(successes)
}
Expand Down Expand Up @@ -735,20 +737,40 @@ func (c *clusterState) slotClosestNode(slot int) (*clusterNode, error) {
return c.nodes.Random()
}

var node *clusterNode
var allNodesFailing = true
var (
closestNonFailingNode *clusterNode
closestNode *clusterNode
minLatency time.Duration
)

// setting the max possible duration as zerovalue for minlatency
minLatency = time.Duration(math.MaxInt64)

for _, n := range nodes {
if n.Failing() {
continue
}
if node == nil || n.Latency() < node.Latency() {
node = n
if closestNode == nil || n.Latency() < minLatency {
closestNode = n
minLatency = n.Latency()
if !n.Failing() {
closestNonFailingNode = n
allNodesFailing = false
}
}
}
if node != nil {
return node, nil

// pick the healthly node with the lowest latency
if !allNodesFailing && closestNonFailingNode != nil {
return closestNonFailingNode, nil
}

// If all nodes are failing - return random node
// if all nodes are failing, we will pick the temporarily failing node with lowest latency
if minLatency < maximumNodeLatency && closestNode != nil {
internal.Logger.Printf(context.TODO(), "redis: all nodes are marked as failed, picking the temporarily failing node with lowest latency")
return closestNode, nil
}

// If all nodes are having the maximum latency(all pings are failing) - return a random node across the cluster
internal.Logger.Printf(context.TODO(), "redis: pings to all nodes are failing, picking a random node across the cluster")
return c.nodes.Random()
}

Expand Down Expand Up @@ -916,10 +938,13 @@ func (c *ClusterClient) Process(ctx context.Context, cmd Cmder) error {
func (c *ClusterClient) process(ctx context.Context, cmd Cmder) error {
slot := c.cmdSlot(ctx, cmd)
var node *clusterNode
var moved bool
var ask bool
var lastErr error
for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
if attempt > 0 {
// MOVED and ASK responses are not transient errors that require retry delay; they
// should be attempted immediately.
if attempt > 0 && !moved && !ask {
if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
return err
}
Expand Down Expand Up @@ -963,7 +988,6 @@ func (c *ClusterClient) process(ctx context.Context, cmd Cmder) error {
continue
}

var moved bool
var addr string
moved, ask, addr = isMovedError(lastErr)
if moved || ask {
Expand Down
Loading

0 comments on commit 8a6ca48

Please sign in to comment.