Skip to content

Commit

Permalink
Use conc.PoolOption instead of ants.Option (#24585)
Browse files Browse the repository at this point in the history
- Add conc.PoolOption to setup conc.Pool
- Change panic default behavior
- Make future has error when job panicks

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
  • Loading branch information
congqixia authored Jun 1, 2023
1 parent 3022e37 commit 31880ab
Show file tree
Hide file tree
Showing 10 changed files with 193 additions and 18 deletions.
3 changes: 1 addition & 2 deletions internal/datanode/channel_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"time"

"github.com/cockroachdb/errors"
"github.com/panjf2000/ants/v2"
"github.com/samber/lo"
"go.uber.org/atomic"
"go.uber.org/zap"
Expand Down Expand Up @@ -144,7 +143,7 @@ var _ Channel = &ChannelMeta{}
func newChannel(channelName string, collID UniqueID, schema *schemapb.CollectionSchema, rc types.RootCoord, cm storage.ChunkManager) *ChannelMeta {
metaService := newMetaService(rc, collID)

pool := conc.NewPool[struct{}](runtime.GOMAXPROCS(0), ants.WithPreAlloc(false), ants.WithNonblocking(false))
pool := conc.NewPool[struct{}](runtime.GOMAXPROCS(0), conc.WithPreAlloc(false), conc.WithNonBlocking(false))

channel := ChannelMeta{
collectionID: collID,
Expand Down
5 changes: 2 additions & 3 deletions internal/querynodev2/segments/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (

"github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/paramtable"
ants "github.com/panjf2000/ants/v2"
"go.uber.org/atomic"
)

Expand All @@ -36,8 +35,8 @@ func InitPool() {
initOnce.Do(func() {
pool := conc.NewPool[any](
paramtable.Get().QueryNodeCfg.MaxReadConcurrency.GetAsInt(),
ants.WithPreAlloc(true),
ants.WithDisablePurge(true),
conc.WithPreAlloc(true),
conc.WithDisablePurge(true),
)
conc.WarmupPool(pool, runtime.LockOSThread)

Expand Down
3 changes: 1 addition & 2 deletions internal/querynodev2/segments/segment_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"time"

"github.com/cockroachdb/errors"
ants "github.com/panjf2000/ants/v2"
"github.com/samber/lo"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -91,7 +90,7 @@ func NewLoader(
ioPoolSize = configPoolSize
}

ioPool := conc.NewPool[*storage.Blob](ioPoolSize, ants.WithPreAlloc(true))
ioPool := conc.NewPool[*storage.Blob](ioPoolSize, conc.WithPreAlloc(true))

log.Info("SegmentLoader created", zap.Int("ioPoolSize", ioPoolSize))

Expand Down
3 changes: 1 addition & 2 deletions internal/querynodev2/tasks/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/panjf2000/ants/v2"
)

const (
Expand Down Expand Up @@ -38,7 +37,7 @@ func NewScheduler() *Scheduler {
mergedSearchTasks: make(chan *SearchTask),
// queryProcessQueue: make(chan),

pool: conc.NewPool[any](maxReadConcurrency, ants.WithPreAlloc(true)),
pool: conc.NewPool[any](maxReadConcurrency, conc.WithPreAlloc(true)),
}
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
github.com/klauspost/compress v1.14.4
github.com/lingdor/stackerror v0.0.0-20191119040541-976d8885ed76
github.com/milvus-io/milvus-proto/go-api v0.0.0-20230529034923-4579ee9d5723
github.com/panjf2000/ants/v2 v2.4.8
github.com/panjf2000/ants/v2 v2.7.2
github.com/prometheus/client_golang v1.11.1
github.com/samber/lo v1.27.0
github.com/shirou/gopsutil/v3 v3.22.9
Expand All @@ -29,7 +29,7 @@ require (
go.uber.org/zap v1.17.0
golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4
golang.org/x/sync v0.1.0
google.golang.org/grpc v1.52.3
google.golang.org/protobuf v1.28.1
gopkg.in/natefinch/lumberjack.v2 v2.0.0
Expand Down
8 changes: 4 additions & 4 deletions pkg/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -519,8 +519,8 @@ github.com/opencontainers/runtime-spec v1.0.2 h1:UfAcuLBJB9Coz72x1hgl8O5RVzTdNia
github.com/opencontainers/runtime-spec v1.0.2/go.mod h1:jwyrGlmzljRJv/Fgzds9SsS/C5hL+LL3ko9hs6T5lQ0=
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc=
github.com/panjf2000/ants/v2 v2.4.8 h1:JgTbolX6K6RreZ4+bfctI0Ifs+3mrE5BIHudQxUDQ9k=
github.com/panjf2000/ants/v2 v2.4.8/go.mod h1:f6F0NZVFsGCp5A7QW/Zj/m92atWwOkY0OIhFxRNFr4A=
github.com/panjf2000/ants/v2 v2.7.2 h1:2NUt9BaZFO5kQzrieOmK/wdb/tQ/K+QHaxN8sOgD63U=
github.com/panjf2000/ants/v2 v2.7.2/go.mod h1:KIBmYG9QQX5U2qzFP/yQJaq/nSb6rahS9iEHkrCMgM8=
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/pelletier/go-toml v1.9.3 h1:zeC5b1GviRUyKYd6OJPvBU/mcVDVoL1OhT17FCt5dSQ=
Expand Down Expand Up @@ -881,8 +881,8 @@ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 h1:uVc8UZUe6tr40fFVnUP5Oj+veunVezqYl9z7DYw9xzw=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down
106 changes: 106 additions & 0 deletions pkg/util/conc/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package conc

import (
"time"

"github.com/milvus-io/milvus/pkg/log"
"github.com/panjf2000/ants/v2"
"go.uber.org/zap"
)

type poolOption struct {
// pre-allocs workers
preAlloc bool
// block or not when pool is full
nonBlocking bool
// duration to cleanup worker goroutine
expiryDuration time.Duration
// disable purge worker
disablePurge bool
// whether conceal panic when job has panic
concealPanic bool
// panicHandler when task panics
panicHandler func(any)
}

func (opt *poolOption) antsOptions() []ants.Option {
var result []ants.Option
result = append(result, ants.WithPreAlloc(opt.preAlloc))
result = append(result, ants.WithNonblocking(opt.nonBlocking))
result = append(result, ants.WithDisablePurge(opt.disablePurge))
// ants recovers panic by default
// however the error is not returned
result = append(result, ants.WithPanicHandler(func(v any) {
log.Error("Conc pool panicked", zap.Any("panic", v))
if !opt.concealPanic {
panic(v)
}
}))
if opt.panicHandler != nil {
result = append(result, ants.WithPanicHandler(opt.panicHandler))
}
if opt.expiryDuration > 0 {
result = append(result, ants.WithExpiryDuration(opt.expiryDuration))
}

return result
}

// PoolOption options function to setup pool.
type PoolOption func(opt *poolOption)

func defaultPoolOption() *poolOption {
return &poolOption{
preAlloc: false,
nonBlocking: false,
expiryDuration: 0,
disablePurge: false,
concealPanic: false,
}
}

func WithPreAlloc(v bool) PoolOption {
return func(opt *poolOption) {
opt.preAlloc = v
}
}

func WithNonBlocking(v bool) PoolOption {
return func(opt *poolOption) {
opt.nonBlocking = v
}
}

func WithDisablePurge(v bool) PoolOption {
return func(opt *poolOption) {
opt.disablePurge = v
}
}

func WithExpiryDuration(d time.Duration) PoolOption {
return func(opt *poolOption) {
opt.expiryDuration = d
}
}

func WithConcealPanic(v bool) PoolOption {
return func(opt *poolOption) {
opt.concealPanic = v
}
}
48 changes: 48 additions & 0 deletions pkg/util/conc/options_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package conc

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestPoolOption(t *testing.T) {
opt := &poolOption{}

o := WithPreAlloc(true)
o(opt)
assert.True(t, opt.preAlloc)

o = WithNonBlocking(true)
o(opt)
assert.True(t, opt.nonBlocking)

o = WithDisablePurge(true)
o(opt)
assert.True(t, opt.disablePurge)

o = WithExpiryDuration(time.Second)
o(opt)
assert.Equal(t, time.Second, opt.expiryDuration)

o = WithConcealPanic(true)
o(opt)
assert.True(t, opt.concealPanic)
}
19 changes: 16 additions & 3 deletions pkg/util/conc/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package conc

import (
"fmt"
"runtime"
"sync"

Expand All @@ -27,13 +28,19 @@ import (
// A goroutine pool
type Pool[T any] struct {
inner *ants.Pool
opt *poolOption
}

// NewPool returns a goroutine pool.
// cap: the number of workers.
// This panic if provide any invalid option.
func NewPool[T any](cap int, opts ...ants.Option) *Pool[T] {
pool, err := ants.NewPool(cap, opts...)
func NewPool[T any](cap int, opts ...PoolOption) *Pool[T] {
opt := defaultPoolOption()
for _, o := range opts {
o(opt)
}

pool, err := ants.NewPool(cap, opt.antsOptions()...)
if err != nil {
panic(err)
}
Expand All @@ -46,7 +53,7 @@ func NewPool[T any](cap int, opts ...ants.Option) *Pool[T] {
// NewDefaultPool returns a pool with cap of the number of logical CPU,
// and pre-alloced goroutines.
func NewDefaultPool[T any]() *Pool[T] {
return NewPool[T](runtime.GOMAXPROCS(0), ants.WithPreAlloc(true))
return NewPool[T](runtime.GOMAXPROCS(0), WithPreAlloc(true))
}

// Submit a task into the pool,
Expand All @@ -57,6 +64,12 @@ func (pool *Pool[T]) Submit(method func() (T, error)) *Future[T] {
future := newFuture[T]()
err := pool.inner.Submit(func() {
defer close(future.ch)
defer func() {
if x := recover(); x != nil {
future.err = fmt.Errorf("panicked with error: %v", x)
panic(x) // throw panic out
}
}()
res, err := method()
if err != nil {
future.err = err
Expand Down
12 changes: 12 additions & 0 deletions pkg/util/conc/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,15 @@ func TestPool(t *testing.T) {
assert.Equal(t, err, errDup)
}
}

func TestPoolWithPanic(t *testing.T) {
pool := NewPool[any](1, WithConcealPanic(true))

future := pool.Submit(func() (any, error) {
panic("mocked panic")
})

// make sure error returned when conceal panic
_, err := future.Await()
assert.Error(t, err)
}

0 comments on commit 31880ab

Please sign in to comment.