Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
Clean up watch tasks go routines to actually close go routine when wa…
Browse files Browse the repository at this point in the history
…tch is done.
  • Loading branch information
geauxvirtual committed Sep 30, 2015
1 parent 2977879 commit 2174fa0
Showing 1 changed file with 19 additions and 30 deletions.
49 changes: 19 additions & 30 deletions mgmt/rest/client/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ func (c *Client) WatchTask(id uint) *WatchTasksResult {
r := &WatchTasksResult{
EventChan: make(chan *rbody.StreamedTaskEvent),
DoneChan: make(chan struct{}),
killChan: make(chan struct{}),
}

url := fmt.Sprintf("%s/tasks/%v/watch", c.prefix, id)
Expand All @@ -77,37 +76,30 @@ func (c *Client) WatchTask(id uint) *WatchTasksResult {
r.Err = err
}

go func() {
select {
case <-r.DoneChan:
// We killed so just exit select
case <-r.killChan:
// We were killed so close resp to signal to server and exit
resp.Body.Close()
close(r.DoneChan)
}
}()
// Start watching
go func() {
reader := bufio.NewReader(resp.Body)
for {
line, _ := reader.ReadBytes('\n')
ste := &rbody.StreamedTaskEvent{}
err := json.Unmarshal(line, ste)
if err != nil {
r.Err = err
resp.Body.Close()
close(r.DoneChan)
return
}
switch ste.EventType {
case rbody.TaskWatchTaskDisabled:
r.EventChan <- ste
select {
case <-r.DoneChan:
resp.Body.Close()
close(r.DoneChan)
return
case rbody.TaskWatchTaskStopped, rbody.TaskWatchTaskStarted, rbody.TaskWatchMetricEvent:
r.EventChan <- ste
default:
line, _ := reader.ReadBytes('\n')
ste := &rbody.StreamedTaskEvent{}
err := json.Unmarshal(line, ste)
if err != nil {
r.Err = err
r.Close()
return
}
switch ste.EventType {
case rbody.TaskWatchTaskDisabled:
r.EventChan <- ste
r.Close()
case rbody.TaskWatchTaskStopped, rbody.TaskWatchTaskStarted, rbody.TaskWatchMetricEvent:
r.EventChan <- ste
}
}
}
}()
Expand Down Expand Up @@ -212,13 +204,10 @@ type WatchTasksResult struct {
Err error
EventChan chan *rbody.StreamedTaskEvent
DoneChan chan struct{}
killChan chan struct{}
}

func (w *WatchTasksResult) Close() {
close(w.killChan)
// We do this as a way to ensure the signal gets to the server
<-w.DoneChan
close(w.DoneChan)
}

type GetTasksResult struct {
Expand Down

0 comments on commit 2174fa0

Please sign in to comment.