Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support redis lock automatic renewal #55

Merged
merged 5 commits into from
Dec 18, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func main() {
Addr: "localhost:6379",
}
redisClient := redis.NewClient(redisOptions)
locker, err := redislock.NewRedisLocker(redisClient, redislock.WithTries(1))
locker, err := redislock.NewRedisLocker(redisClient,time.Second * 2, redislock.WithTries(1))
if err != nil {
// handle the error
}
Expand Down Expand Up @@ -71,7 +71,7 @@ func main() {
Addrs: []string{"localhost:6379"},
}
redisClient := redis.NewUniversalClient(redisOptions)
locker, err := redislock.NewRedisLocker(redisClient, redislock.WithTries(1))
locker, err := redislock.NewRedisLocker(redisClient,time.Second * 2, redislock.WithTries(1))
if err != nil {
// handle the error
}
Expand Down
46 changes: 36 additions & 10 deletions redislock.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package redislock
import (
"context"
"fmt"
"time"

"github.com/go-co-op/gocron"
"github.com/go-redsync/redsync/v4"
Expand All @@ -24,30 +25,31 @@ var (

// NewRedisLocker provides an implementation of the Locker interface using
// redis for storage.
func NewRedisLocker(r redis.UniversalClient, options ...redsync.Option) (gocron.Locker, error) {
func NewRedisLocker(r redis.UniversalClient, autoExtendDuration time.Duration, options ...redsync.Option) (gocron.Locker, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a breaking change (so let's not do it this way). An oversight on my part to have the options be ...resync.Option instead of a gocron.RedisLockerOption, to which we could just add WithAutoExtendDuration.

Instead, let's do it the way it should have been:

  • Add a new method func NewRedisLockerWithOptions(r redis.UniversalClient, options ...LockerOption) (gocron.Locker, error)
  • Define the LockerOption as type LockerOption func(*redisLocker) error
  • Then, we could define all the options, with Option appended to the name so they are unique, as LockerOption type
  • Then, also add WithAutoExtendDuration as an Option and implement it through there.

Let me know if you would like to complete this work. If not, I will take it on when I have time.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made some changes:

  1. Add LockOption, support WithAutoExtendDuration,WithRedsyncOptions to be compatible with redsync options
  2. Added NewRedisLockerWithOptions
  3. Modified TestAutoExtend

Please review it again.

if err := r.Ping(context.Background()).Err(); err != nil {
return nil, fmt.Errorf("%s: %w", gocron.ErrFailedToConnectToRedis, err)
}
return newLocker(r, options...), nil
return newLocker(r, autoExtendDuration, options...), nil
}

// NewRedisLockerAlways provides an implementation of the Locker interface using
// redis for storage, even if the connection fails.
func NewRedisLockerAlways(r redis.UniversalClient, options ...redsync.Option) (gocron.Locker, error) {
return newLocker(r, options...), r.Ping(context.Background()).Err()
func NewRedisLockerAlways(r redis.UniversalClient, autoExtendDuration time.Duration, options ...redsync.Option) (gocron.Locker, error) {
return newLocker(r, autoExtendDuration, options...), r.Ping(context.Background()).Err()
}

func newLocker(r redis.UniversalClient, options ...redsync.Option) gocron.Locker {
func newLocker(r redis.UniversalClient, autoExtendDuration time.Duration, options ...redsync.Option) gocron.Locker {
pool := goredis.NewPool(r)
rs := redsync.New(pool)
return &redisLocker{rs: rs, options: options}
return &redisLocker{rs: rs, autoExtendDuration: autoExtendDuration, options: options}
}

var _ gocron.Locker = (*redisLocker)(nil)

type redisLocker struct {
rs *redsync.Redsync
options []redsync.Option
rs *redsync.Redsync
options []redsync.Option
autoExtendDuration time.Duration
}

func (r *redisLocker) Lock(ctx context.Context, key string) (gocron.Lock, error) {
Expand All @@ -57,18 +59,27 @@ func (r *redisLocker) Lock(ctx context.Context, key string) (gocron.Lock, error)
return nil, gocron.ErrFailedToObtainLock
}
rl := &redisLock{
mu: mu,
mu: mu,
autoExtendDuration: r.autoExtendDuration,
done: make(chan struct{}),
}

if r.autoExtendDuration > 0 {
go func() { rl.doExtend() }()
}
return rl, nil
}

var _ gocron.Lock = (*redisLock)(nil)

type redisLock struct {
mu *redsync.Mutex
mu *redsync.Mutex
done chan struct{}
autoExtendDuration time.Duration
}

func (r *redisLock) Unlock(ctx context.Context) error {
close(r.done)
unlocked, err := r.mu.UnlockContext(ctx)
if err != nil {
return gocron.ErrFailedToReleaseLock
Expand All @@ -79,3 +90,18 @@ func (r *redisLock) Unlock(ctx context.Context) error {

return nil
}

func (r *redisLock) doExtend() {
ticker := time.NewTicker(r.autoExtendDuration)
for {
select {
case <-r.done:
return
case <-ticker.C:
_, err := r.mu.Extend()
if err != nil {
return
}
}
}
}
43 changes: 42 additions & 1 deletion redislock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestEnableDistributedLocking(t *testing.T) {
}

redisClient := redis.NewClient(&redis.Options{Addr: strings.TrimPrefix(uri, "redis://")})
l, err := NewRedisLocker(redisClient, WithTries(1))
l, err := NewRedisLocker(redisClient, 0, WithTries(1))
require.NoError(t, err)

s1 := gocron.NewScheduler(time.UTC)
Expand Down Expand Up @@ -67,3 +67,44 @@ func TestEnableDistributedLocking(t *testing.T) {
}
assert.Len(t, results, 4)
}

func TestAutoExtend(t *testing.T) {
ctx := context.Background()
redisContainer, err := testcontainersredis.RunContainer(ctx)
require.NoError(t, err)
t.Cleanup(func() {
if err := redisContainer.Terminate(ctx); err != nil {
t.Fatalf("failed to terminate container: %s", err)
}
})

uri, err := redisContainer.ConnectionString(ctx)
require.NoError(t, err)

redisClient := redis.NewClient(&redis.Options{Addr: strings.TrimPrefix(uri, "redis://")})
l1, err := NewRedisLocker(redisClient, 0, WithTries(1))
_, err = l1.Lock(ctx, "test1")
require.NoError(t, err)

t.Logf("waiting 9 seconds for lock to expire")
// wait for the lock to expire
time.Sleep(9 * time.Second)

_, err = l1.Lock(ctx, "test1")
require.NoError(t, err)

// create auto extend lock
l2, err := NewRedisLocker(redisClient, time.Second*2, WithTries(1))
unlocker, err := l2.Lock(ctx, "test2")
require.NoError(t, err)

t.Log("waiting 9 seconds for lock to expire")
// wait for the lock to expire
time.Sleep(9 * time.Second)

_, err = l2.Lock(ctx, "test2")
require.Equal(t, gocron.ErrFailedToObtainLock, err)

err = unlocker.Unlock(ctx)
require.NoError(t, err)
}