Skip to content

Commit

Permalink
Fix the worker concurrency model (#126)
Browse files Browse the repository at this point in the history
Limit the number of concurrent workers by starting a fixed number of goroutines and providing work by an input channel.

Fixes #123
  • Loading branch information
rakyll authored Sep 7, 2020
1 parent 6fb0d86 commit c915060
Showing 1 changed file with 34 additions and 27 deletions.
61 changes: 34 additions & 27 deletions goprocess/gp.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,47 +27,54 @@ type P struct {

// FindAll returns all the Go processes currently running on this host.
func FindAll() []P {
const concurrencyProcesses = 10 // limit the maximum number of concurrent reading process tasks
const concurrencyLimit = 10 // max number of concurrent workers
pss, err := ps.Processes()
if err != nil {
return nil
}

input := make(chan ps.Process, len(pss))
output := make(chan P, len(pss))

for _, ps := range pss {
input <- ps
}
close(input)

var wg sync.WaitGroup
wg.Add(len(pss))
found := make(chan P)
limitCh := make(chan struct{}, concurrencyProcesses)
wg.Add(concurrencyLimit) // used to wait for workers to be finished

for _, pr := range pss {
limitCh <- struct{}{}
pr := pr
// Run concurrencyLimit of workers until there
// is no more processes to be checked in the input channel.
for i := 0; i < concurrencyLimit; i++ {
go func() {
defer func() { <-limitCh }()
defer wg.Done()

path, version, agent, ok, err := isGo(pr)
if err != nil {
// TODO(jbd): Return a list of errors.
}
if !ok {
return
}
found <- P{
PID: pr.Pid(),
PPID: pr.PPid(),
Exec: pr.Executable(),
Path: path,
BuildVersion: version,
Agent: agent,
for pr := range input {
path, version, agent, ok, err := isGo(pr)
if err != nil {
// TODO(jbd): Return a list of errors.
continue
}
if !ok {
continue
}
output <- P{
PID: pr.Pid(),
PPID: pr.PPid(),
Exec: pr.Executable(),
Path: path,
BuildVersion: version,
Agent: agent,
}
}
}()
}
go func() {
wg.Wait()
close(found)
}()
wg.Wait() // wait until all workers are finished
close(output) // no more results to be waited for

var results []P
for p := range found {
for p := range output {
results = append(results, p)
}
return results
Expand Down

0 comments on commit c915060

Please sign in to comment.