Skip to content

Commit

Permalink
always spawn a worker with the task it was supposed to complete (#112)
Browse files Browse the repository at this point in the history
For "unlimited" pools, in the original code:

```go
p.handle.Go(p.worker)
p.tasks <- f
```

If in between the call to `handle.Go` and sending the task to the
worker, another goroutine would call `pool.Go`, that task would "hijack"
the newly created worker, causing the original send to block. This is
undesirable behavior if the pool is unlimited.

The solution was to add an `initialFunc` to the worker, which will be
executed before the worker starts waiting for new tasks. This ensures
that a worker will first complete the task it was supposed to, then
complete others.
  • Loading branch information
Link512 authored May 3, 2023
1 parent 06d3061 commit 8e5ba59
Showing 1 changed file with 11 additions and 9 deletions.
20 changes: 11 additions & 9 deletions pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,20 +47,18 @@ func (p *Pool) Go(f func()) {
default:
// No goroutine was available to handle the task.
// Spawn a new one and send it the task.
p.handle.Go(p.worker)
p.tasks <- f
p.handle.Go(func() {
p.worker(f)
})
}
} else {
select {
case p.limiter <- struct{}{}:
// If we are below our limit, spawn a new worker rather
// than waiting for one to become available.
p.handle.Go(p.worker)

// We know there is at least one worker running, so wait
// for it to become available. This ensures we never spawn
// more workers than the number of tasks.
p.tasks <- f
p.handle.Go(func() {
p.worker(f)
})
case p.tasks <- f:
// A worker is available and has accepted the task.
return
Expand Down Expand Up @@ -149,11 +147,15 @@ func (p *Pool) WithContext(ctx context.Context) *ContextPool {
}
}

func (p *Pool) worker() {
func (p *Pool) worker(initialFunc func()) {
// The only time this matters is if the task panics.
// This makes it possible to spin up new workers in that case.
defer p.limiter.release()

if initialFunc != nil {
initialFunc()
}

for f := range p.tasks {
f()
}
Expand Down

0 comments on commit 8e5ba59

Please sign in to comment.