From c9150606f90ab6cbea6b9967b3fc85fede299824 Mon Sep 17 00:00:00 2001 From: JBD Date: Mon, 7 Sep 2020 14:23:34 -0700 Subject: [PATCH] Fix the worker concurrency model (#126) Limit the number of concurrent workers by starting a fixed number of goroutines and providing work by an input channel. Fixes #123 --- goprocess/gp.go | 61 +++++++++++++++++++++++++++---------------------- 1 file changed, 34 insertions(+), 27 deletions(-) diff --git a/goprocess/gp.go b/goprocess/gp.go index aa88e150..87d9959c 100644 --- a/goprocess/gp.go +++ b/goprocess/gp.go @@ -27,47 +27,54 @@ type P struct { // FindAll returns all the Go processes currently running on this host. func FindAll() []P { - const concurrencyProcesses = 10 // limit the maximum number of concurrent reading process tasks + const concurrencyLimit = 10 // max number of concurrent workers pss, err := ps.Processes() if err != nil { return nil } + input := make(chan ps.Process, len(pss)) + output := make(chan P, len(pss)) + + for _, ps := range pss { + input <- ps + } + close(input) + var wg sync.WaitGroup - wg.Add(len(pss)) - found := make(chan P) - limitCh := make(chan struct{}, concurrencyProcesses) + wg.Add(concurrencyLimit) // used to wait for workers to be finished - for _, pr := range pss { - limitCh <- struct{}{} - pr := pr + // Run concurrencyLimit of workers until there + // is no more processes to be checked in the input channel. + for i := 0; i < concurrencyLimit; i++ { go func() { - defer func() { <-limitCh }() defer wg.Done() - path, version, agent, ok, err := isGo(pr) - if err != nil { - // TODO(jbd): Return a list of errors. - } - if !ok { - return - } - found <- P{ - PID: pr.Pid(), - PPID: pr.PPid(), - Exec: pr.Executable(), - Path: path, - BuildVersion: version, - Agent: agent, + for pr := range input { + path, version, agent, ok, err := isGo(pr) + if err != nil { + // TODO(jbd): Return a list of errors. + continue + } + if !ok { + continue + } + output <- P{ + PID: pr.Pid(), + PPID: pr.PPid(), + Exec: pr.Executable(), + Path: path, + BuildVersion: version, + Agent: agent, + } } }() } - go func() { - wg.Wait() - close(found) - }() + wg.Wait() // wait until all workers are finished + close(output) // no more results to be waited for + var results []P - for p := range found { + for p := range output { results = append(results, p) } return results