diff --git a/common/task/task.go b/common/task/task.go index d37d9e72..6b266d28 100644 --- a/common/task/task.go +++ b/common/task/task.go @@ -23,6 +23,7 @@ type Group struct { tasks []taskItem cleanup func() fastFail bool + queue chan struct{} } func (g *Group) Append(name string, f func(ctx context.Context) error) { @@ -46,6 +47,13 @@ func (g *Group) FastFail() { g.fastFail = true } +func (g *Group) Concurrency(n int) { + g.queue = make(chan struct{}, n) + for i := 0; i < n; i++ { + g.queue <- struct{}{} + } +} + func (g *Group) Run(contextList ...context.Context) error { return g.RunContextList(contextList) } @@ -65,6 +73,14 @@ func (g *Group) RunContextList(contextList []context.Context) error { for _, task := range g.tasks { currentTask := task go func() { + if g.queue != nil { + <-g.queue + select { + case <-taskCancelContext.Done(): + return + default: + } + } err := currentTask.Run(taskCancelContext) errorAccess.Lock() if err != nil { @@ -83,6 +99,9 @@ func (g *Group) RunContextList(contextList []context.Context) error { taskCancel(errTaskSucceed{}) taskFinish(errTaskSucceed{}) } + if g.queue != nil { + g.queue <- struct{}{} + } }() }