Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
fixes #361
Browse files Browse the repository at this point in the history
When the `task.spin()` goroutine returns, it leaves the one running
`task.waitForSchedule()` blocked.  When the task is restarted, its
response chan is available again, causing the hung `waitForSchedule`
goroutine to begin execution again.

At this time, the _new_ call to `spin()` spawns a _new_ goroutine for
`waitForSchedule`, causing the spin loop to be executed twice.

This pattern continued for each restart of a task.
  • Loading branch information
pittma committed Sep 25, 2015
1 parent bfec09b commit 847c010
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 16 deletions.
2 changes: 2 additions & 0 deletions mgmt/rest/rest_func_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -938,6 +938,8 @@ func TestPluginRestCalls(t *testing.T) {
plr4 := r4.Body.(*rbody.ScheduledTaskStopped)
So(plr4.ID, ShouldEqual, id)

time.Sleep(1 * time.Second)

r5 := getTasks(port)
So(r5.Body, ShouldHaveSameTypeAs, new(rbody.ScheduledTaskListReturned))
plr5 := r5.Body.(*rbody.ScheduledTaskListReturned)
Expand Down
13 changes: 9 additions & 4 deletions scheduler/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ func newTask(s schedule.Schedule, wf *schedulerWorkflow, m *workManager, mm mana
id: taskId,
name: name,
schResponseChan: make(chan schedule.Response),
killChan: make(chan struct{}),
schedule: s,
state: core.TaskStopped,
creationTime: time.Now(),
Expand Down Expand Up @@ -174,6 +173,7 @@ func (t *task) Spin() {
defer t.Unlock()
if t.state == core.TaskStopped {
t.state = core.TaskSpinning
t.killChan = make(chan struct{})
// spin in a goroutine
go t.spin()
}
Expand All @@ -183,15 +183,15 @@ func (t *task) Stop() {
t.Lock()
defer t.Unlock()
if t.state == core.TaskFiring || t.state == core.TaskSpinning {
t.killChan <- struct{}{}
close(t.killChan)
}
}

func (t *task) Kill() {
t.Lock()
defer t.Unlock()
if t.state == core.TaskFiring || t.state == core.TaskSpinning {
t.killChan <- struct{}{}
close(t.killChan)
t.state = core.TaskDisabled
}
}
Expand Down Expand Up @@ -274,6 +274,7 @@ func (t *task) spin() {
case <-t.killChan:
// Only here can it truly be stopped
t.state = core.TaskStopped
t.lastFireTime = time.Time{}
return
}
}
Expand All @@ -289,7 +290,11 @@ func (t *task) fire() {
}

func (t *task) waitForSchedule() {
t.schResponseChan <- t.schedule.Wait(t.lastFireTime)
select {
case <-t.killChan:
return
case t.schResponseChan <- t.schedule.Wait(t.lastFireTime):
}
}

type taskCollection struct {
Expand Down
12 changes: 0 additions & 12 deletions scheduler/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,18 +89,6 @@ func TestTask(t *testing.T) {
task.Stop()
time.Sleep(time.Millisecond * 10) // it is a race so we slow down the test
So(task.state, ShouldEqual, core.TaskStopped)
Convey("Stopping a stopped tasks should not send to kill channel", func() {
task.Stop()
b := false
select {
case <-task.killChan:
b = true
default:
b = false
}
So(task.state, ShouldEqual, core.TaskStopped)
So(b, ShouldBeFalse)
})
})
})

Expand Down

0 comments on commit 847c010

Please sign in to comment.