Skip to content

Commit

Permalink
feat: allow disabling global distributed locker per job (#811)
Browse files Browse the repository at this point in the history
* chore: fix distributed locker tests

* feat: allow disabling global dist locker per job
  • Loading branch information
seinshah authored Jan 3, 2025
1 parent bf75107 commit f5a5a2d
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 19 deletions.
4 changes: 2 additions & 2 deletions executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ func (e *executor) runJob(j internalJob, jIn jobIn) {
e.incrementJobCounter(j, Skip)
return
}
} else if j.locker != nil {
} else if !j.disabledLocker && j.locker != nil {
lock, err := j.locker.Lock(j.ctx, j.name)
if err != nil {
_ = callJobFuncWithParams(j.afterLockError, j.id, j.name, err)
Expand All @@ -379,7 +379,7 @@ func (e *executor) runJob(j internalJob, jIn jobIn) {
return
}
defer func() { _ = lock.Unlock(j.ctx) }()
} else if e.locker != nil {
} else if !j.disabledLocker && e.locker != nil {
lock, err := e.locker.Lock(j.ctx, j.name)
if err != nil {
_ = callJobFuncWithParams(j.afterLockError, j.id, j.name, err)
Expand Down
11 changes: 11 additions & 0 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type internalJob struct {
afterJobRunsWithError func(jobID uuid.UUID, jobName string, err error)
afterJobRunsWithPanic func(jobID uuid.UUID, jobName string, recoverData any)
afterLockError func(jobID uuid.UUID, jobName string, err error)
disabledLocker bool

locker Locker
}
Expand Down Expand Up @@ -556,6 +557,16 @@ func WithDistributedJobLocker(locker Locker) JobOption {
}
}

// WithDisabledDistributedJobLocker disables the distributed job locker.
// This is useful when a global distributed locker has been set on the scheduler
// level using WithDistributedLocker and need to be disabled for specific jobs.
func WithDisabledDistributedJobLocker(disabled bool) JobOption {
return func(j *internalJob, _ time.Time) error {
j.disabledLocker = disabled
return nil
}
}

// WithEventListeners sets the event listeners that should be
// run for the job.
func WithEventListeners(eventListeners ...EventListener) JobOption {
Expand Down
2 changes: 2 additions & 0 deletions scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -818,6 +818,8 @@ func WithDistributedElector(elector Elector) SchedulerOption {
// WithDistributedLocker sets the locker to be used by multiple
// Scheduler instances to ensure that only one instance of each
// job is run.
// To disable this global locker for specific jobs, see
// WithDisabledDistributedJobLocker.
func WithDistributedLocker(locker Locker) SchedulerOption {
return func(s *scheduler) error {
if locker == nil {
Expand Down
83 changes: 66 additions & 17 deletions scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1452,13 +1452,15 @@ func TestScheduler_WithDistributed(t *testing.T) {
tests := []struct {
name string
count int
runCount int
schedulerOpts []SchedulerOption
jobOpts []JobOption
assertions func(*testing.T)
}{
{
"3 schedulers with elector",
3,
1,
[]SchedulerOption{
WithDistributedElector(&testElector{notLeader: notLeader}),
},
Expand All @@ -1482,6 +1484,7 @@ func TestScheduler_WithDistributed(t *testing.T) {
{
"3 schedulers with locker",
3,
1,
[]SchedulerOption{
WithDistributedLocker(&testLocker{notLocked: notLocked}),
},
Expand All @@ -1499,11 +1502,14 @@ func TestScheduler_WithDistributed(t *testing.T) {
default:
}
}

assert.Equal(t, 2, notLockedCount)
},
},
{
"3 schedulers and job with Distributed locker",
3,
1,
nil,
[]JobOption{
WithDistributedJobLocker(&testLocker{notLocked: notLocked}),
Expand All @@ -1521,6 +1527,35 @@ func TestScheduler_WithDistributed(t *testing.T) {
default:
}
}

assert.Equal(t, 2, notLockedCount)
},
},
{
"3 schedulers and job with disabled Distributed locker",
3,
3,
[]SchedulerOption{
WithDistributedLocker(&testLocker{notLocked: notLocked}),
},
[]JobOption{
WithDisabledDistributedJobLocker(true),
},
func(_ *testing.T) {
timeout := time.Now().Add(1 * time.Second)
var notLockedCount int
for {
if time.Now().After(timeout) {
break
}
select {
case <-notLocked:
notLockedCount++
default:
}
}

assert.Equal(t, 0, notLockedCount)
},
},
}
Expand All @@ -1531,6 +1566,11 @@ func TestScheduler_WithDistributed(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
schedulersDone := make(chan struct{}, tt.count)

var (
runCount int
doneCount int
)

for i := tt.count; i > 0; i-- {
s := newTestScheduler(t,
tt.schedulerOpts...,
Expand All @@ -1539,6 +1579,7 @@ func TestScheduler_WithDistributed(t *testing.T) {
WithStartAt(
WithStartImmediately(),
),
WithLimitedRuns(1),
}
jobOpts = append(jobOpts, tt.jobOpts...)

Expand All @@ -1565,31 +1606,39 @@ func TestScheduler_WithDistributed(t *testing.T) {
}()
}

var runCount int
select {
case <-jobsRan:
cancel()
runCount++
case <-time.After(time.Second):
cancel()
t.Error("timed out waiting for job to run")
RunCountLoop:
for {
select {
case <-jobsRan:
runCount++
if runCount >= tt.runCount {
break RunCountLoop
}
case <-time.After(time.Second):
t.Error("timed out waiting for job to run")
break RunCountLoop
}
}

var doneCount int
timeout := time.Now().Add(3 * time.Second)
for doneCount < tt.count && time.Now().After(timeout) {
cancel()
assert.Equal(t, tt.runCount, runCount)

DoneCountLoop:
for {
select {
case <-schedulersDone:
doneCount++
default:
if doneCount >= tt.count {
break DoneCountLoop
}
case <-time.After(3 * time.Second):
t.Error("timed out waiting for schedulers to shutdown")
break DoneCountLoop
}
}
close(jobsRan)
for range jobsRan {
runCount++
}

assert.Equal(t, 1, runCount)
assert.Equal(t, tt.count, doneCount)

time.Sleep(time.Second)
tt.assertions(t)
})
Expand Down

0 comments on commit f5a5a2d

Please sign in to comment.