diff --git a/mgmt/tribe/tribe.go b/mgmt/tribe/tribe.go index 28274d417..1fabc166f 100644 --- a/mgmt/tribe/tribe.go +++ b/mgmt/tribe/tribe.go @@ -61,7 +61,7 @@ type tribe struct { pluginWorkQueue chan worker.PluginRequest taskWorkQueue chan worker.TaskRequest - workerQuitChan chan interface{} + workerQuitChan chan struct{} workerWaitGroup *sync.WaitGroup } @@ -99,6 +99,8 @@ func New(c *config) (*tribe, error) { tags: map[string]string{agreement.RestAPIPort: strconv.Itoa(c.restAPIPort)}, pluginWorkQueue: make(chan worker.PluginRequest, 999), taskWorkQueue: make(chan worker.TaskRequest, 999), + workerQuitChan: make(chan struct{}), + workerWaitGroup: &sync.WaitGroup{}, } tribe.broadcasts = &memberlist.TransmitLimitedQueue{ diff --git a/mgmt/tribe/worker/worker.go b/mgmt/tribe/worker/worker.go index 604075b14..c19bdbe66 100644 --- a/mgmt/tribe/worker/worker.go +++ b/mgmt/tribe/worker/worker.go @@ -71,88 +71,55 @@ type Member interface { // newPluginWorker func newWorker(id int, - pluginWorkerQueue chan chan PluginRequest, - taskWorkerQueue chan chan TaskRequest, - quitChan chan interface{}, + pluginQueue chan PluginRequest, + taskQueue chan TaskRequest, + quitChan chan struct{}, wg *sync.WaitGroup, pm ManagesPlugins, tm ManagesTasks, mm getsMembers) worker { - // Create, and return the worker. worker := worker{ - pluginManager: pm, - taskManager: tm, - memberManager: mm, - id: id, - pluginWork: make(chan PluginRequest), - pluginWorkerQueue: pluginWorkerQueue, - taskWork: make(chan TaskRequest), - taskWorkerQueue: taskWorkerQueue, - quitChan: make(chan bool), + pluginManager: pm, + taskManager: tm, + memberManager: mm, + id: id, + pluginWork: pluginQueue, + taskWork: taskQueue, + waitGroup: wg, + quitChan: quitChan, } return worker } type worker struct { - pluginManager ManagesPlugins - memberManager getsMembers - taskManager ManagesTasks - id int - pluginWork chan PluginRequest - pluginWorkerQueue chan chan PluginRequest - taskWork chan TaskRequest - taskWorkerQueue chan chan TaskRequest - quitChan chan bool - waitGroup *sync.WaitGroup + pluginManager ManagesPlugins + memberManager getsMembers + taskManager ManagesTasks + id int + pluginWork chan PluginRequest + taskWork chan TaskRequest + quitChan chan struct{} + waitGroup *sync.WaitGroup } -func DispatchWorkers(nworkers int, pluginWorkQueue chan PluginRequest, taskWorkQueue chan TaskRequest, quitChan chan interface{}, workerWaitGroup *sync.WaitGroup, cp ManagesPlugins, tm ManagesTasks, mm getsMembers) { - pluginWorkerQueue := make(chan chan PluginRequest, nworkers) - taskWorkerQueue := make(chan chan TaskRequest, nworkers) +func DispatchWorkers(nworkers int, pluginQueue chan PluginRequest, taskQueue chan TaskRequest, quitChan chan struct{}, workerWaitGroup *sync.WaitGroup, cp ManagesPlugins, tm ManagesTasks, mm getsMembers) { for i := 0; i < nworkers; i++ { workerLogger.Infof("Starting tribe worker-%d", i+1) - worker := newWorker(i+1, pluginWorkerQueue, taskWorkerQueue, quitChan, workerWaitGroup, cp, tm, mm) + worker := newWorker(i+1, pluginQueue, taskQueue, quitChan, workerWaitGroup, cp, tm, mm) worker.start() } - - go func() { - for { - select { - case pluginWork := <-pluginWorkQueue: - workerLogger.Infof("Received plugin work request") - go func() { - pluginWorker := <-pluginWorkerQueue - - workerLogger.Infof("Dispatching plugin work request") - pluginWorker <- pluginWork - }() - case taskWork := <-taskWorkQueue: - workerLogger.Infof("Received task work request") - go func() { - workerLogger.Infof("Waiting for free worker") - taskWorker := <-taskWorkerQueue - - workerLogger.Infof("Dispatching task work request") - taskWorker <- taskWork - }() - case <-quitChan: - workerLogger.Infof("Stopping plugin work dispatcher") - return - } - } - }() } // Start "starts" the workers func (w worker) start() { // task worker + w.waitGroup.Add(1) go func() { + defer w.waitGroup.Done() + workerLogger.Debugf("Starting task worker-%d", w.id) for { - defer w.waitGroup.Done() - w.taskWorkerQueue <- w.taskWork - select { case work := <-w.taskWork: done := false @@ -162,6 +129,7 @@ func (w worker) start() { "worker": w.id, "_block": "start", }) + wLogger.Error("received task work") if work.RequestType == TaskCreatedType { task, _ := w.taskManager.GetTask(work.Task.ID) if task != nil { @@ -222,12 +190,11 @@ func (w worker) start() { }() // plugin worker + w.waitGroup.Add(1) go func() { defer w.waitGroup.Done() + workerLogger.Debugf("Starting plugin worker-%d", w.id) for { - // Add ourselves into the worker queue. - w.pluginWorkerQueue <- w.pluginWork - select { case work := <-w.pluginWork: // Receive a work request. @@ -238,6 +205,7 @@ func (w worker) start() { "worker": w.id, "_block": "start", }) + wLogger.Debug("received plugin work") done := false for { if w.isPluginLoaded(work.Plugin.Name(), work.Plugin.TypeName(), work.Plugin.Version()) { @@ -249,7 +217,7 @@ func (w worker) start() { continue } for _, member := range shuffle(members) { - url := fmt.Sprintf("http://%s:%s/v1/plugins/%s/%s/%d", member.GetAddr(), member.GetRESTAPIPort(), work.Plugin.TypeName(), work.Plugin.Name(), work.Plugin.Version()) + url := fmt.Sprintf("http://%s:%s/v1/plugins/%s/%s/%d?download=true", member.GetAddr(), member.GetRESTAPIPort(), work.Plugin.TypeName(), work.Plugin.Name(), work.Plugin.Version()) resp, err := http.Get(url) if err != nil { wLogger.Error(err) @@ -296,13 +264,6 @@ func (w worker) start() { }() } -// Stop tells the worker to stop listening -func (w worker) Stop() { - go func() { - w.quitChan <- true - }() -} - func shuffle(m []Member) []Member { result := make([]Member, len(m)) perm := rand.Perm(len(m))