Skip to content

Commit

Permalink
improved error handling in On listener + tests
Browse files Browse the repository at this point in the history
  • Loading branch information
naueramant committed Dec 22, 2022
1 parent 7424cde commit 1bce146
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 15 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.19

require (
github.com/go-redis/redis/v8 v8.11.5
github.com/mitchellh/mapstructure v1.5.0
github.com/stretchr/testify v1.8.1
)

Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cu
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI=
github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo=
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE=
Expand Down
54 changes: 40 additions & 14 deletions schedule.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,14 @@ import (
)

const redisNamespace = "boomerang"

var (
ErrUnexpectedReturnCodeFromRedis = errors.New("unexpected return code from redis")
ErrTaskAlreadyExists = errors.New("task already exists")
ErrTaskDoesNotExist = errors.New("task does not exist")
ErrUnexpectedReturnCode = errors.New("unexpected return code from redis")
ErrUnexpectedReturnCodeType = errors.New("unexpected return code type from redis, expect integer")
ErrTaskAlreadyExists = errors.New("task already exists")
ErrTaskDoesNotExist = errors.New("task does not exist")
ErrTaskDataDoesNotExist = errors.New("task data does not exist")
ErrTaskDataInvalidFormat = errors.New("task data has invalid format, expected JSON")
)

type Schedule interface {
Expand Down Expand Up @@ -96,7 +100,7 @@ func (s *ScheduleImpl) Add(ctx context.Context, task *Task) error {
case -1:
return ErrTaskAlreadyExists
default:
return ErrUnexpectedReturnCodeFromRedis
return ErrUnexpectedReturnCode
}
}

Expand Down Expand Up @@ -160,7 +164,7 @@ func (s *ScheduleImpl) Update(ctx context.Context, task *Task) error {
case -1:
return ErrTaskDoesNotExist
default:
return ErrUnexpectedReturnCodeFromRedis
return ErrUnexpectedReturnCode
}
}

Expand Down Expand Up @@ -203,7 +207,7 @@ func (s *ScheduleImpl) Remove(ctx context.Context, kind string, id string) error
case -1:
return ErrTaskDoesNotExist
default:
return ErrUnexpectedReturnCodeFromRedis
return ErrUnexpectedReturnCode
}
}

Expand Down Expand Up @@ -250,7 +254,7 @@ func (s *ScheduleImpl) RunNow(ctx context.Context, kind string, id string) error
case -1:
return ErrTaskDoesNotExist
default:
return ErrUnexpectedReturnCodeFromRedis
return ErrUnexpectedReturnCode
}
}

Expand All @@ -267,6 +271,7 @@ func (s *ScheduleImpl) On(ctx context.Context, kind string, handler func(ctx con
local res = redis.call("ZPOPMIN", queueKey)
if #res == 0 then
-- Error: No tasks scheduled
return { -1 }
end
Expand All @@ -277,19 +282,23 @@ func (s *ScheduleImpl) On(ctx context.Context, kind string, handler func(ctx con
if score > (now + 1000) then
redis.call("ZADD", queueKey, score, id)
-- Error: Next task is scheduled for more than 1 second in the future
return { -1 }
end
-- Get the task data
local taskDataRaw = redis.call("HGET", taskDataKey, id)
if taskDataRaw == nil then
return { -1 }
-- Error: task data does not exist
return { -2 }
end
local taskData = cjson.decode(taskDataRaw)
if taskData == nil then
return { -1 }
-- Error: task data has invalid format
return { -3 }
end
-- Schedule the next execution
Expand Down Expand Up @@ -335,21 +344,38 @@ func (s *ScheduleImpl) On(ctx context.Context, kind string, handler func(ctx con
return err
}

if len(resSlice) != 3 {
code, ok := resSlice[0].(int64)
if !ok {
return ErrUnexpectedReturnCodeType
}

if code == -1 {
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(1 * time.Second):
case <-time.After(time.Second):
continue
}
}

id, ok := resSlice[0].(string)
if code == -2 {
return ErrTaskDataDoesNotExist
}

if code == -3 {
return ErrTaskDataInvalidFormat
}

if code != 0 {
return ErrUnexpectedReturnCode
}

id, ok := resSlice[1].(string)
if !ok {
return errors.New("unexpected type for id")
}

score, ok := resSlice[1].(int64)
score, ok := resSlice[2].(int64)
if !ok {
return errors.New("unexpected type for score")
}
Expand All @@ -359,7 +385,7 @@ func (s *ScheduleImpl) On(ctx context.Context, kind string, handler func(ctx con
time.Sleep(time.Duration(delta) * time.Millisecond)
}

taskDataRaw, ok := resSlice[2].(string)
taskDataRaw, ok := resSlice[3].(string)
if !ok {
return errors.New("unexpected type for taskDataRaw")
}
Expand Down
50 changes: 49 additions & 1 deletion schedule_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
var testTask1 = NewTask(
"test",
"id",
1*time.Second,
10*time.Millisecond,
map[string]any{
"foo": "bar",
},
Expand Down Expand Up @@ -78,3 +78,51 @@ func TestScheduleImpl_Remove(t *testing.T) {
err = schedule.Remove(ctx, testTask1.Kind, testTask1.ID)
assert.NoError(t, err)
}

func TestScheduleImpl_RunNow(t *testing.T) {
t.Parallel()

ctx := context.Background()

schedule := newSchedule(t, ctx, 4)

err := schedule.RunNow(ctx, testTask1.Kind, testTask1.ID)
assert.ErrorIs(t, err, ErrTaskDoesNotExist)

err = schedule.Add(ctx, testTask1)
assert.NoError(t, err)

err = schedule.RunNow(ctx, testTask1.Kind, testTask1.ID)
assert.NoError(t, err)
}

func TestScheduleImpl_On(t *testing.T) {
t.Parallel()

ctx := context.Background()

schedule := newSchedule(t, ctx, 5)

// Test receiving a task.

ctxA, cancelA := context.WithTimeout(ctx, 1*time.Second)

err := schedule.Add(ctx, testTask1)
assert.NoError(t, err)

err = schedule.On(ctxA, testTask1.Kind, func(ctx context.Context, task *Task) {
cancelA()
})

assert.ErrorIs(t, err, context.Canceled)

// Test never receiving a task because it is of the wrong kind.

ctxB, cancelB := context.WithTimeout(ctx, 100*time.Millisecond)

err = schedule.On(ctxB, "unknown", func(ctx context.Context, task *Task) {
cancelB()
})

assert.ErrorIs(t, err, context.DeadlineExceeded)
}

0 comments on commit 1bce146

Please sign in to comment.