diff --git a/go.mod b/go.mod index ce96c94..5fd7a38 100644 --- a/go.mod +++ b/go.mod @@ -15,7 +15,7 @@ require ( github.com/inconshreveable/mousetrap v1.0.0 // indirect github.com/mattn/go-isatty v0.0.7 github.com/oklog/run v1.0.0 - github.com/panjf2000/ants v0.0.0-20190820151255-b60dfa8c16b0 + github.com/panjf2000/ants/v2 v2.1.1 github.com/pkg/errors v0.8.1 github.com/rs/zerolog v1.14.3 github.com/ryanuber/columnize v2.1.0+incompatible diff --git a/go.sum b/go.sum index b175ecd..c9c38e6 100644 --- a/go.sum +++ b/go.sum @@ -100,8 +100,8 @@ github.com/mitchellh/mapstructure v1.1.2 h1:fmNYVwqnSfB9mZU6OS2O6GsXM+wcskZDuKQz github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/oklog/run v1.0.0 h1:Ru7dDtJNOyC66gQ5dQmaCa0qIsAUFY3sFpK1Xk8igrw= github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA= -github.com/panjf2000/ants v0.0.0-20190820151255-b60dfa8c16b0 h1:otVy5M/CL7sQJRx9MxpFq3feQp7AEgx1Hp4/RPBRqkE= -github.com/panjf2000/ants v0.0.0-20190820151255-b60dfa8c16b0/go.mod h1:AaACblRPzq35m1g3enqYcxspbbiOJJYaxU2wMpm1cXY= +github.com/panjf2000/ants/v2 v2.1.1 h1:Or1KjIoVSdiWAh9553KI6vmBoMqEo0lqRcwS9v86+ts= +github.com/panjf2000/ants/v2 v2.1.1/go.mod h1:1GFm8bV8nyCQvU5K4WvBCTG1/YBFOD2VzjffD8fV55A= github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY= github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= diff --git a/pkg/autoscale/handler.go b/pkg/autoscale/handler.go index 5b3e5b1..ba14cc3 100644 --- a/pkg/autoscale/handler.go +++ b/pkg/autoscale/handler.go @@ -7,7 +7,7 @@ import ( "github.com/jrasell/sherpa/pkg/policy" policyBackend "github.com/jrasell/sherpa/pkg/policy/backend" "github.com/jrasell/sherpa/pkg/scale" - "github.com/panjf2000/ants" + ants "github.com/panjf2000/ants/v2" "github.com/rs/zerolog" ) @@ -156,13 +156,7 @@ func (a *AutoScale) setScalingInProgressFalse() { // createWorkerPool is responsible for building the ants goroutine worker pool with the number of // threads controlled by the operator configured value. func (a *AutoScale) createWorkerPool() (*ants.PoolWithFunc, error) { - return ants.NewPoolWithFunc( - ants.Options{ - Capacity: a.cfg.ScalingThreads, - ExpiryDuration: 60 * time.Second, - PoolFunc: a.workerPoolFunc(), - }, - ) + return ants.NewPoolWithFunc(a.cfg.ScalingThreads, a.workerPoolFunc(), ants.WithExpiryDuration(60*time.Second)) } func (a *AutoScale) workerPoolFunc() func(payload interface{}) { diff --git a/vendor/github.com/panjf2000/ants/go.mod b/vendor/github.com/panjf2000/ants/go.mod deleted file mode 100644 index 53277c0..0000000 --- a/vendor/github.com/panjf2000/ants/go.mod +++ /dev/null @@ -1 +0,0 @@ -module github.com/panjf2000/ants diff --git a/vendor/github.com/panjf2000/ants/.gitignore b/vendor/github.com/panjf2000/ants/v2/.gitignore similarity index 100% rename from vendor/github.com/panjf2000/ants/.gitignore rename to vendor/github.com/panjf2000/ants/v2/.gitignore diff --git a/vendor/github.com/panjf2000/ants/.travis.yml b/vendor/github.com/panjf2000/ants/v2/.travis.yml similarity index 95% rename from vendor/github.com/panjf2000/ants/.travis.yml rename to vendor/github.com/panjf2000/ants/v2/.travis.yml index 469e02e..494077a 100644 --- a/vendor/github.com/panjf2000/ants/.travis.yml +++ b/vendor/github.com/panjf2000/ants/v2/.travis.yml @@ -1,11 +1,11 @@ language: go go: - - "1.8.x" - "1.9.x" - "1.10.x" - "1.11.x" - "1.12.x" + - "1.13.x" before_install: - go get -t -v ./... diff --git a/vendor/github.com/panjf2000/ants/v2/CODE_OF_CONDUCT.md b/vendor/github.com/panjf2000/ants/v2/CODE_OF_CONDUCT.md new file mode 100644 index 0000000..2fccd5c --- /dev/null +++ b/vendor/github.com/panjf2000/ants/v2/CODE_OF_CONDUCT.md @@ -0,0 +1,76 @@ +# Contributor Covenant Code of Conduct + +## Our Pledge + +In the interest of fostering an open and welcoming environment, we as +contributors and maintainers pledge to making participation in our project and +our community a harassment-free experience for everyone, regardless of age, body +size, disability, ethnicity, sex characteristics, gender identity and expression, +level of experience, education, socio-economic status, nationality, personal +appearance, race, religion, or sexual identity and orientation. + +## Our Standards + +Examples of behavior that contributes to creating a positive environment +include: + +* Using welcoming and inclusive language +* Being respectful of differing viewpoints and experiences +* Gracefully accepting constructive criticism +* Focusing on what is best for the community +* Showing empathy towards other community members + +Examples of unacceptable behavior by participants include: + +* The use of sexualized language or imagery and unwelcome sexual attention or + advances +* Trolling, insulting/derogatory comments, and personal or political attacks +* Public or private harassment +* Publishing others' private information, such as a physical or electronic + address, without explicit permission +* Other conduct which could reasonably be considered inappropriate in a + professional setting + +## Our Responsibilities + +Project maintainers are responsible for clarifying the standards of acceptable +behavior and are expected to take appropriate and fair corrective action in +response to any instances of unacceptable behavior. + +Project maintainers have the right and responsibility to remove, edit, or +reject comments, commits, code, wiki edits, issues, and other contributions +that are not aligned to this Code of Conduct, or to ban temporarily or +permanently any contributor for other behaviors that they deem inappropriate, +threatening, offensive, or harmful. + +## Scope + +This Code of Conduct applies both within project spaces and in public spaces +when an individual is representing the project or its community. Examples of +representing a project or community include using an official project e-mail +address, posting via an official social media account, or acting as an appointed +representative at an online or offline event. Representation of a project may be +further defined and clarified by project maintainers. + +## Enforcement + +Instances of abusive, harassing, or otherwise unacceptable behavior may be +reported by contacting the project team at panjf2000@gmail.com. All +complaints will be reviewed and investigated and will result in a response that +is deemed necessary and appropriate to the circumstances. The project team is +obligated to maintain confidentiality with regard to the reporter of an incident. +Further details of specific enforcement policies may be posted separately. + +Project maintainers who do not follow or enforce the Code of Conduct in good +faith may face temporary or permanent repercussions as determined by other +members of the project's leadership. + +## Attribution + +This Code of Conduct is adapted from the [Contributor Covenant][homepage], version 1.4, +available at https://www.contributor-covenant.org/version/1/4/code-of-conduct.html + +[homepage]: https://www.contributor-covenant.org + +For answers to common questions about this code of conduct, see +https://www.contributor-covenant.org/faq diff --git a/vendor/github.com/panjf2000/ants/v2/CONTRIBUTING.md b/vendor/github.com/panjf2000/ants/v2/CONTRIBUTING.md new file mode 100644 index 0000000..6911574 --- /dev/null +++ b/vendor/github.com/panjf2000/ants/v2/CONTRIBUTING.md @@ -0,0 +1,14 @@ +# Contributing + +## With issues: + - Use the search tool before opening a new issue. + - Please provide source code and commit sha if you found a bug. + - Review existing issues and provide feedback or react to them. + +## With pull requests: + - Open your pull request against `master`. + - Open one pull request for only one feature/proposal, if you have several those, please put them into different PRs, whereas you are allowed to open one pull request with several bug-fixs. + - Your pull request should have no more than two commits, if not, you should squash them. + - It should pass all tests in the available continuous integrations systems such as TravisCI. + - You should add/modify tests to cover your proposed code changes. + - If your pull request contains a new feature, please document it on the README. diff --git a/vendor/github.com/panjf2000/ants/LICENSE b/vendor/github.com/panjf2000/ants/v2/LICENSE similarity index 100% rename from vendor/github.com/panjf2000/ants/LICENSE rename to vendor/github.com/panjf2000/ants/v2/LICENSE diff --git a/vendor/github.com/panjf2000/ants/README.md b/vendor/github.com/panjf2000/ants/v2/README.md similarity index 71% rename from vendor/github.com/panjf2000/ants/README.md rename to vendor/github.com/panjf2000/ants/v2/README.md index 466827e..7c153cb 100644 --- a/vendor/github.com/panjf2000/ants/README.md +++ b/vendor/github.com/panjf2000/ants/v2/README.md @@ -9,12 +9,13 @@ A goroutine pool for Go
- + +

-[中文](README_ZH.md) | [Project Blog](https://taohuawu.club/high-performance-implementation-of-goroutine-pool) +# [[中文](README_ZH.md)] Library `ants` implements a goroutine pool with fixed capacity, managing and recycling a massive number of goroutines, allowing developers to limit the number of goroutines in your concurrent programs. @@ -25,6 +26,7 @@ Library `ants` implements a goroutine pool with fixed capacity, managing and rec - Friendly interfaces: submitting tasks, getting the number of running goroutines, tuning capacity of pool dynamically, closing pool. - Handle panic gracefully to prevent programs from crash. - Efficient in memory usage and it even achieves higher performance than unlimited goroutines in golang. +- Nonblocking mechanism. ## Tested in the following Golang versions: @@ -33,6 +35,7 @@ Library `ants` implements a goroutine pool with fixed capacity, managing and rec - 1.10.x - 1.11.x - 1.12.x +- 1.13.x ## How to install @@ -42,7 +45,7 @@ go get -u github.com/panjf2000/ants ``` ## How to use -Just take a imagination that your program starts a massive number of goroutines, from which a vast amount of memory will be consumed. To mitigate that kind of situation, all you need to do is to import `ants` package and submit all your tasks to a default pool with fixed capacity, activated when package `ants` is imported: +Just take a imagination that your program starts a massive number of goroutines, resulting in a huge consumption of memory. To mitigate that kind of situation, all you need to do is to import `ants` package and submit all your tasks to a default pool with fixed capacity, activated when package `ants` is imported: ``` go package main @@ -53,7 +56,7 @@ import ( "sync/atomic" "time" - "github.com/panjf2000/ants" + "github.com/panjf2000/ants/v2" ) var sum int32 @@ -88,12 +91,12 @@ func main() { fmt.Printf("running goroutines: %d\n", ants.Running()) fmt.Printf("finish all tasks.\n") - // Use the pool with a method, + // Use the pool with a function, // set 10 to the capacity of goroutine pool and 1 second for expired duration. - p, _ := ants.NewPoolWithFunc(ants.Options{Capacity: 10, PoolFunc: func(i interface{}) { + p, _ := ants.NewPoolWithFunc(10, func(i interface{}) { myFunc(i) wg.Done() - }}) + }) defer p.Release() // Submit tasks one by one. for i := 0; i < runTimes; i++ { @@ -114,7 +117,7 @@ import ( "io/ioutil" "net/http" - "github.com/panjf2000/ants" + "github.com/panjf2000/ants/v2" ) type Request struct { @@ -123,7 +126,7 @@ type Request struct { } func main() { - pool, _ := ants.NewPoolWithFunc(ants.Options{Capacity:100, PoolFunc:func(payload interface{}) { + pool, _ := ants.NewPoolWithFunc(100000, func(payload interface{}) { request, ok := payload.(*Request) if !ok { return @@ -136,7 +139,7 @@ func main() { }(request.Param) request.Result <- reverseParam - }}) + }) defer pool.Release() http.HandleFunc("/reverse", func(w http.ResponseWriter, r *http.Request) { @@ -161,13 +164,10 @@ func main() { } ``` -## Options for ants pool +## Functional options for ants pool ```go type Options struct { - // Capacity of the pool. - Capacity int - // ExpiryDuration set the expired time (second) of every worker. ExpiryDuration time.Duration @@ -186,13 +186,46 @@ type Options struct { // PanicHandler is used to handle panics from each worker goroutine. // if nil, panics will be thrown out again from worker goroutines. PanicHandler func(interface{}) +} + +func WithOptions(options Options) Option { + return func(opts *Options) { + *opts = options + } +} + +func WithExpiryDuration(expiryDuration time.Duration) Option { + return func(opts *Options) { + opts.ExpiryDuration = expiryDuration + } +} + +func WithPreAlloc(preAlloc bool) Option { + return func(opts *Options) { + opts.PreAlloc = preAlloc + } +} + +func WithMaxBlockingTasks(maxBlockingTasks int) Option { + return func(opts *Options) { + opts.MaxBlockingTasks = maxBlockingTasks + } +} - // poolFunc is the function for processing tasks. - PoolFunc func(interface{}) +func WithNonblocking(nonblocking bool) Option { + return func(opts *Options) { + opts.Nonblocking = nonblocking + } +} + +func WithPanicHandler(panicHandler func(interface{})) Option { + return func(opts *Options) { + opts.PanicHandler = panicHandler + } } ``` -`ants.Options`contains all configurations of ants pool, which allow you to customize the goroutine pool by setting up each field in it, then passing it to `NewPool`method. +`ants.Options`contains all optional configurations of ants pool, which allows you to customize the goroutine pool by invoking option functions to set up each configuration in `NewPool`/`NewPoolWithFunc`method. ## Customize limited pool @@ -200,7 +233,7 @@ type Options struct { ``` go // Set 10000 the size of goroutine pool -p, _ := ants.NewPool(ants.Options{Capacity: 10000}) +p, _ := ants.NewPool(10000) ``` ## Submit tasks @@ -221,11 +254,11 @@ Don't worry about the synchronous problems in this case, the method here is thre ## Pre-malloc goroutine queue in pool -`ants` allows you to pre-allocate memory of goroutine queue in pool, which may get a performance enhancement under some special certain circumstances such as the scenario that requires an pool with ultra-large capacity, meanwhile each task in goroutine lasts for a long time, in this case, pre-mallocing will reduce a lot of costs when re-slicing goroutine queue. +`ants` allows you to pre-allocate memory of goroutine queue in pool, which may get a performance enhancement under some special certain circumstances such as the scenario that requires a pool with ultra-large capacity, meanwhile each task in goroutine lasts for a long time, in this case, pre-mallocing will reduce a lot of costs when re-slicing goroutine queue. ```go // ants will pre-malloc the whole capacity of pool when you invoke this method -p, _ := ants.NewPool(ants.Options{Capacity: AntsSize, PreAlloc: true}) +p, _ := ants.NewPool(100000, ants.WithPreAlloc(true)) ``` ## Release Pool @@ -239,36 +272,26 @@ All tasks submitted to `ants` pool will not be guaranteed to be addressed in ord ## Benchmarks -``` -OS: macOS High Sierra -Processor: 2.7 GHz Intel Core i5 -Memory: 8 GB 1867 MHz DDR3 - -Go Version: 1.9 -``` -
- In that benchmark-picture, the first and second benchmarks performed test cases with 1M tasks and the rest of benchmarks performed test cases with 10M tasks, both in unlimited goroutines and `ants` pool, and the capacity of this `ants` goroutine-pool was limited to 50K. + In this benchmark-picture, the first and second benchmarks performed test cases with 1M tasks and the rest of benchmarks performed test cases with 10M tasks, both in unlimited goroutines and `ants` pool, and the capacity of this `ants` goroutine-pool was limited to 50K. - BenchmarkGoroutine-4 represents the benchmarks with unlimited goroutines in golang. - BenchmarkPoolGroutine-4 represents the benchmarks with a `ants` pool. -The test data above is a basic benchmark and more detailed benchmarks are about to be uploaded later. - ### Benchmarks with Pool ![](https://user-images.githubusercontent.com/7496278/51515499-f187c500-1e4e-11e9-80e5-3df8f94fa70f.png) In above benchmark picture, the first and second benchmarks performed test cases with 1M tasks and the rest of benchmarks performed test cases with 10M tasks, both in unlimited goroutines and `ants` pool, and the capacity of this `ants` goroutine-pool was limited to 50K. -**As you can see, `ants` can up to 2x faster than goroutines without pool (10M tasks) and it only consumes half the memory comparing with goroutines without pool. (both 1M and 10M tasks)** +**As you can see, `ants` performs 2 times faster than goroutines without pool (10M tasks) and it only consumes half the memory comparing with goroutines without pool. (both in 1M and 10M tasks)** ### Benchmarks with PoolWithFunc ![](https://user-images.githubusercontent.com/7496278/51515565-1e3bdc80-1e4f-11e9-8a08-452ab91d117e.png) -### Throughput (it is suitable for scenarios where asynchronous tasks are submitted despite of the final results) +### Throughput (it is suitable for scenarios where tasks are submitted asynchronously without waiting for the final results) #### 100K tasks @@ -284,6 +307,18 @@ In above benchmark picture, the first and second benchmarks performed test cases ### Performance Summary -![](https://user-images.githubusercontent.com/7496278/52989641-51b65a80-343f-11e9-86c0-e855d97343ea.gif) +![](https://user-images.githubusercontent.com/7496278/63449727-3ae6d400-c473-11e9-81e3-8b3280d8288a.gif) + +**In conclusion, `ants` performs 2~6 times faster than goroutines without a pool and the memory consumption is reduced by 10 to 20 times.** + +# License + +Source code in `gnet` is available under the MIT [License](/LICENSE). + +# Relevant Articles + +- [Goroutine 并发调度模型深度解析之手撸一个高性能协程池](https://taohuawu.club/high-performance-implementation-of-goroutine-pool) + +# Users of ants (please feel free to add your projects here ~~) -**In conclusion, `ants` can up to 2x~6x faster than goroutines without a pool and the memory consumption is reduced by 10 to 20 times.** +[![](https://raw.githubusercontent.com/panjf2000/gnet/master/logo.png)](https://github.com/panjf2000/gnet) \ No newline at end of file diff --git a/vendor/github.com/panjf2000/ants/README_ZH.md b/vendor/github.com/panjf2000/ants/v2/README_ZH.md similarity index 81% rename from vendor/github.com/panjf2000/ants/README_ZH.md rename to vendor/github.com/panjf2000/ants/v2/README_ZH.md index 53b03e9..3cb47b7 100644 --- a/vendor/github.com/panjf2000/ants/README_ZH.md +++ b/vendor/github.com/panjf2000/ants/v2/README_ZH.md @@ -9,12 +9,13 @@ A goroutine pool for Go
- + +

-[英文](README.md) | [项目博客](https://taohuawu.club/high-performance-implementation-of-goroutine-pool) +# [[英文](README.md)] `ants`是一个高性能的协程池,实现了对大规模 goroutine 的调度管理、goroutine 复用,允许使用者在开发并发程序的时候限制协程数量,复用资源,达到更高效执行任务的效果。 @@ -25,6 +26,7 @@ A goroutine pool for Go - 提供了友好的接口:任务提交、获取运行中的协程数量、动态调整协程池大小 - 优雅处理 panic,防止程序崩溃 - 资源复用,极大节省内存使用量;在大规模批量并发任务场景下比原生 goroutine 并发具有更高的性能 +- 非阻塞机制 ## 目前测试通过的Golang版本: @@ -33,6 +35,7 @@ A goroutine pool for Go - 1.10.x - 1.11.x - 1.12.x +- 1.13.x ## 安装 @@ -53,7 +56,7 @@ import ( "sync/atomic" "time" - "github.com/panjf2000/ants" + "github.com/panjf2000/ants/v2" ) var sum int32 @@ -88,12 +91,12 @@ func main() { fmt.Printf("running goroutines: %d\n", ants.Running()) fmt.Printf("finish all tasks.\n") - // Use the pool with a method, + // Use the pool with a function, // set 10 to the capacity of goroutine pool and 1 second for expired duration. - p, _ := ants.NewPoolWithFunc(ants.Options{Capacity: 10, PoolFunc: func(i interface{}) { + p, _ := ants.NewPoolWithFunc(10, func(i interface{}) { myFunc(i) wg.Done() - }}) + }) defer p.Release() // Submit tasks one by one. for i := 0; i < runTimes; i++ { @@ -114,7 +117,7 @@ import ( "io/ioutil" "net/http" - "github.com/panjf2000/ants" + "github.com/panjf2000/ants/v2" ) type Request struct { @@ -123,7 +126,7 @@ type Request struct { } func main() { - pool, _ := ants.NewPoolWithFunc(ants.Options{Capacity:100, PoolFunc:func(payload interface{}) { + pool, _ := ants.NewPoolWithFunc(100000, func(payload interface{}) { request, ok := payload.(*Request) if !ok { return @@ -136,7 +139,7 @@ func main() { }(request.Param) request.Result <- reverseParam - }}) + }) defer pool.Release() http.HandleFunc("/reverse", func(w http.ResponseWriter, r *http.Request) { @@ -165,9 +168,6 @@ func main() { ```go type Options struct { - // Capacity of the pool. - Capacity int - // ExpiryDuration set the expired time (second) of every worker. ExpiryDuration time.Duration @@ -186,13 +186,46 @@ type Options struct { // PanicHandler is used to handle panics from each worker goroutine. // if nil, panics will be thrown out again from worker goroutines. PanicHandler func(interface{}) +} + +func WithOptions(options Options) Option { + return func(opts *Options) { + *opts = options + } +} + +func WithExpiryDuration(expiryDuration time.Duration) Option { + return func(opts *Options) { + opts.ExpiryDuration = expiryDuration + } +} + +func WithPreAlloc(preAlloc bool) Option { + return func(opts *Options) { + opts.PreAlloc = preAlloc + } +} + +func WithMaxBlockingTasks(maxBlockingTasks int) Option { + return func(opts *Options) { + opts.MaxBlockingTasks = maxBlockingTasks + } +} + +func WithNonblocking(nonblocking bool) Option { + return func(opts *Options) { + opts.Nonblocking = nonblocking + } +} - // poolFunc is the function for processing tasks. - PoolFunc func(interface{}) +func WithPanicHandler(panicHandler func(interface{})) Option { + return func(opts *Options) { + opts.PanicHandler = panicHandler + } } ``` -你可以根据自己的需求在`ants.Options`中设置各个配置项的值,然后用它来初始化 goroutine pool. +通过在调用`NewPool`/`NewPoolWithFunc`之时使用各种 optional function,可以设置`ants.Options`中各个配置项的值,然后用它来定制化 goroutine pool. ## 自定义池 @@ -200,7 +233,7 @@ type Options struct { ``` go // Set 10000 the size of goroutine pool -p, _ := ants.NewPool(ants.Options{Capacity: 10000}) +p, _ := ants.NewPool(10000) ``` ## 任务提交 @@ -226,7 +259,7 @@ pool.Tune(100000) // Tune its capacity to 100000 ```go // ants will pre-malloc the whole capacity of pool when you invoke this function -p, _ := ants.NewPool(ants.Options{Capacity: AntsSize, PreAlloc: true}) +p, _ := ants.NewPool(100000, ants.WithPreAlloc(true)) ``` @@ -239,24 +272,12 @@ pool.Release() ## Benchmarks -系统参数: - -``` -OS: macOS High Sierra -Processor: 2.7 GHz Intel Core i5 -Memory: 8 GB 1867 MHz DDR3 - -Go Version: 1.9 -``` - - -
-上图中的前两个 benchmark 测试结果是基于100w 任务量的条件,剩下的几个是基于 1000w 任务量的测试结果,`ants`的默认池容量是 5w。 +上图中的前两个 benchmark 测试结果是基于100w 任务量的条件,剩下的几个是基于 1000w 任务量的测试结果,`ants` 的默认池容量是 5w。 - BenchmarkGoroutine-4 代表原生 goroutine -- BenchmarkPoolGroutine-4 代表使用协程池`ants` +- BenchmarkPoolGroutine-4 代表使用协程池 `ants` ### Benchmarks with Pool @@ -286,6 +307,18 @@ Go Version: 1.9 ### 性能小结 -![](https://user-images.githubusercontent.com/7496278/52989641-51b65a80-343f-11e9-86c0-e855d97343ea.gif) +![](https://user-images.githubusercontent.com/7496278/63449727-3ae6d400-c473-11e9-81e3-8b3280d8288a.gif) + +**从该 demo 测试吞吐性能对比可以看出,使用`ants`的吞吐性能相较于原生 goroutine 可以保持在 2-6 倍的性能压制,而内存消耗则可以达到 10-20 倍的节省优势。** + +# 证书 + +`gnet` 的源码允许用户在遵循 MIT [开源证书](/LICENSE) 规则的前提下使用。 + +# 相关文章 + +- [Goroutine 并发调度模型深度解析之手撸一个高性能协程池](https://taohuawu.club/high-performance-implementation-of-goroutine-pool) + +# 谁在使用 ants(欢迎补充 ~~) -**从该 demo 测试吞吐性能对比可以看出,使用`ants`的吞吐性能相较于原生 goroutine 可以保持在 2-6 倍的性能压制,而内存消耗则可以达到 10-20 倍的节省优势。** \ No newline at end of file +[![](https://raw.githubusercontent.com/panjf2000/gnet/master/logo.png)](https://github.com/panjf2000/gnet) \ No newline at end of file diff --git a/vendor/github.com/panjf2000/ants/ants.go b/vendor/github.com/panjf2000/ants/v2/ants.go similarity index 72% rename from vendor/github.com/panjf2000/ants/ants.go rename to vendor/github.com/panjf2000/ants/v2/ants.go index 5031c86..38a0dca 100644 --- a/vendor/github.com/panjf2000/ants/ants.go +++ b/vendor/github.com/panjf2000/ants/v2/ants.go @@ -30,11 +30,11 @@ import ( ) const ( - // DEFAULT_ANTS_POOL_SIZE is the default capacity for a default goroutine pool. - DEFAULT_ANTS_POOL_SIZE = math.MaxInt32 + // DefaultAntsPoolSize is the default capacity for a default goroutine pool. + DefaultAntsPoolSize = math.MaxInt32 - // DEFAULT_CLEAN_INTERVAL_TIME is the interval time to clean up goroutines. - DEFAULT_CLEAN_INTERVAL_TIME = 1 + // DefaultCleanIntervalTime is the interval time to clean up goroutines. + DefaultCleanIntervalTime = time.Second // CLOSED represents that the pool is closed. CLOSED = 1 @@ -77,13 +77,14 @@ var ( }() // Init a instance pool when importing ants. - defaultAntsPool, _ = NewPool(Options{Capacity: DEFAULT_ANTS_POOL_SIZE}) + defaultAntsPool, _ = NewPool(DefaultAntsPoolSize) ) -type Options struct { - // Capacity of the pool. - Capacity int +// Option represents the optional function. +type Option func(opts *Options) +// Options contains all options which will be applied when instantiating a ants pool. +type Options struct { // ExpiryDuration set the expired time (second) of every worker. ExpiryDuration time.Duration @@ -102,9 +103,48 @@ type Options struct { // PanicHandler is used to handle panics from each worker goroutine. // if nil, panics will be thrown out again from worker goroutines. PanicHandler func(interface{}) +} + +// WithOptions accepts the whole options config. +func WithOptions(options Options) Option { + return func(opts *Options) { + *opts = options + } +} + +// WithExpiryDuration sets up the interval time of cleaning up goroutines. +func WithExpiryDuration(expiryDuration time.Duration) Option { + return func(opts *Options) { + opts.ExpiryDuration = expiryDuration + } +} + +// WithPreAlloc indicates whether it should malloc for workers. +func WithPreAlloc(preAlloc bool) Option { + return func(opts *Options) { + opts.PreAlloc = preAlloc + } +} + +// WithMaxBlockingTasks sets up the maximum number of goroutines that are blocked when it reaches the capacity of pool. +func WithMaxBlockingTasks(maxBlockingTasks int) Option { + return func(opts *Options) { + opts.MaxBlockingTasks = maxBlockingTasks + } +} + +// WithNonblocking indicates that pool will return nil when there is no available workers. +func WithNonblocking(nonblocking bool) Option { + return func(opts *Options) { + opts.Nonblocking = nonblocking + } +} - // PoolFunc is the function for processing tasks. - PoolFunc func(interface{}) +// WithPanicHandler sets up panic handler. +func WithPanicHandler(panicHandler func(interface{})) Option { + return func(opts *Options) { + opts.PanicHandler = panicHandler + } } // Submit submits a task to pool. diff --git a/vendor/github.com/panjf2000/ants/v2/go.mod b/vendor/github.com/panjf2000/ants/v2/go.mod new file mode 100644 index 0000000..201c358 --- /dev/null +++ b/vendor/github.com/panjf2000/ants/v2/go.mod @@ -0,0 +1,3 @@ +module github.com/panjf2000/ants/v2 + +go 1.12 diff --git a/vendor/github.com/panjf2000/ants/v2/internal/spinlock.go b/vendor/github.com/panjf2000/ants/v2/internal/spinlock.go new file mode 100644 index 0000000..16607ef --- /dev/null +++ b/vendor/github.com/panjf2000/ants/v2/internal/spinlock.go @@ -0,0 +1,28 @@ +// Copyright 2019 Andy Pan. All rights reserved. +// Use of this source code is governed by an MIT-style +// license that can be found in the LICENSE file. + +package internal + +import ( + "runtime" + "sync" + "sync/atomic" +) + +type spinLock uint32 + +func (sl *spinLock) Lock() { + for !atomic.CompareAndSwapUint32((*uint32)(sl), 0, 1) { + runtime.Gosched() + } +} + +func (sl *spinLock) Unlock() { + atomic.StoreUint32((*uint32)(sl), 0) +} + +// NewSpinLock instantiates a spin-lock. +func NewSpinLock() sync.Locker { + return new(spinLock) +} diff --git a/vendor/github.com/panjf2000/ants/pool.go b/vendor/github.com/panjf2000/ants/v2/pool.go similarity index 84% rename from vendor/github.com/panjf2000/ants/pool.go rename to vendor/github.com/panjf2000/ants/v2/pool.go index 91b7d7d..3f52a86 100644 --- a/vendor/github.com/panjf2000/ants/pool.go +++ b/vendor/github.com/panjf2000/ants/v2/pool.go @@ -26,6 +26,8 @@ import ( "sync" "sync/atomic" "time" + + "github.com/panjf2000/ants/v2/internal" ) // Pool accept the tasks from client, it limits the total of goroutines to a given number by recycling goroutines. @@ -46,7 +48,7 @@ type Pool struct { release int32 // lock for synchronous operation. - lock sync.Mutex + lock sync.Locker // cond for waiting to get a idle worker. cond *sync.Cond @@ -89,9 +91,8 @@ func (p *Pool) periodicallyPurge() { p.lock.Lock() idleWorkers := p.workers n := len(idleWorkers) - i := 0 - for i < n && currentTime.Sub(idleWorkers[i].recycleTime) > p.expiryDuration { - i++ + var i int + for i = 0; i < n && currentTime.Sub(idleWorkers[i].recycleTime) > p.expiryDuration; i++ { } expiredWorkers = append(expiredWorkers[:0], idleWorkers[:i]...) if i > 0 { @@ -122,37 +123,42 @@ func (p *Pool) periodicallyPurge() { } // NewPool generates an instance of ants pool. -func NewPool(opts Options) (*Pool, error) { - if opts.Capacity <= 0 { +func NewPool(size int, options ...Option) (*Pool, error) { + if size <= 0 { return nil, ErrInvalidPoolSize } + opts := new(Options) + for _, option := range options { + option(opts) + } + if expiry := opts.ExpiryDuration; expiry < 0 { return nil, ErrInvalidPoolExpiry } else if expiry == 0 { - opts.ExpiryDuration = time.Duration(DEFAULT_CLEAN_INTERVAL_TIME) * time.Second + opts.ExpiryDuration = DefaultCleanIntervalTime } - var p *Pool + p := &Pool{ + capacity: int32(size), + expiryDuration: opts.ExpiryDuration, + nonblocking: opts.Nonblocking, + maxBlockingTasks: int32(opts.MaxBlockingTasks), + panicHandler: opts.PanicHandler, + lock: internal.NewSpinLock(), + } + p.workerCache = sync.Pool{ + New: func() interface{} { + return &goWorker{ + pool: p, + task: make(chan func(), workerChanCap), + } + }, + } if opts.PreAlloc { - p = &Pool{ - capacity: int32(opts.Capacity), - expiryDuration: opts.ExpiryDuration, - workers: make([]*goWorker, 0, opts.Capacity), - nonblocking: opts.Nonblocking, - maxBlockingTasks: int32(opts.MaxBlockingTasks), - panicHandler: opts.PanicHandler, - } - } else { - p = &Pool{ - capacity: int32(opts.Capacity), - expiryDuration: opts.ExpiryDuration, - nonblocking: opts.Nonblocking, - maxBlockingTasks: int32(opts.MaxBlockingTasks), - panicHandler: opts.PanicHandler, - } + p.workers = make([]*goWorker, 0, size) } - p.cond = sync.NewCond(&p.lock) + p.cond = sync.NewCond(p.lock) // Start a goroutine to clean up expired workers periodically. go p.periodicallyPurge() @@ -167,11 +173,11 @@ func (p *Pool) Submit(task func()) error { if atomic.LoadInt32(&p.release) == CLOSED { return ErrPoolClosed } - if w := p.retrieveWorker(); w == nil { + var w *goWorker + if w = p.retrieveWorker(); w == nil { return ErrPoolOverload - } else { - w.task <- task } + w.task <- task return nil } @@ -182,7 +188,7 @@ func (p *Pool) Running() int { // Free returns the available goroutines to work. func (p *Pool) Free() int { - return int(atomic.LoadInt32(&p.capacity) - atomic.LoadInt32(&p.running)) + return p.Cap() - p.Running() } // Cap returns the capacity of this pool. @@ -196,10 +202,6 @@ func (p *Pool) Tune(size int) { return } atomic.StoreInt32(&p.capacity, int32(size)) - diff := p.Running() - size - for i := 0; i < diff; i++ { - p.retrieveWorker().task <- nil - } } // Release Closes this pool. @@ -233,14 +235,7 @@ func (p *Pool) decRunning() { func (p *Pool) retrieveWorker() *goWorker { var w *goWorker spawnWorker := func() { - if cacheWorker := p.workerCache.Get(); cacheWorker != nil { - w = cacheWorker.(*goWorker) - } else { - w = &goWorker{ - pool: p, - task: make(chan func(), workerChanCap), - } - } + w = p.workerCache.Get().(*goWorker) w.run() } @@ -287,12 +282,13 @@ func (p *Pool) retrieveWorker() *goWorker { // revertWorker puts a worker back into free pool, recycling the goroutines. func (p *Pool) revertWorker(worker *goWorker) bool { - if atomic.LoadInt32(&p.release) == CLOSED { + if atomic.LoadInt32(&p.release) == CLOSED || p.Running() > p.Cap() { return false } worker.recycleTime = time.Now() p.lock.Lock() p.workers = append(p.workers, worker) + // Notify the invoker stuck in 'retrieveWorker()' of there is an available worker in the worker queue. p.cond.Signal() p.lock.Unlock() diff --git a/vendor/github.com/panjf2000/ants/pool_func.go b/vendor/github.com/panjf2000/ants/v2/pool_func.go similarity index 83% rename from vendor/github.com/panjf2000/ants/pool_func.go rename to vendor/github.com/panjf2000/ants/v2/pool_func.go index 3360c62..b174d21 100644 --- a/vendor/github.com/panjf2000/ants/pool_func.go +++ b/vendor/github.com/panjf2000/ants/v2/pool_func.go @@ -26,6 +26,8 @@ import ( "sync" "sync/atomic" "time" + + "github.com/panjf2000/ants/v2/internal" ) // PoolWithFunc accept the tasks from client, it limits the total of goroutines to a given number by recycling goroutines. @@ -46,7 +48,7 @@ type PoolWithFunc struct { release int32 // lock for synchronous operation. - lock sync.Mutex + lock sync.Locker // cond for waiting to get a idle worker. cond *sync.Cond @@ -92,9 +94,8 @@ func (p *PoolWithFunc) periodicallyPurge() { p.lock.Lock() idleWorkers := p.workers n := len(idleWorkers) - i := 0 - for i < n && currentTime.Sub(idleWorkers[i].recycleTime) > p.expiryDuration { - i++ + var i int + for i = 0; i < n && currentTime.Sub(idleWorkers[i].recycleTime) > p.expiryDuration; i++ { } expiredWorkers = append(expiredWorkers[:0], idleWorkers[:i]...) if i > 0 { @@ -125,44 +126,51 @@ func (p *PoolWithFunc) periodicallyPurge() { } // NewPoolWithFunc generates an instance of ants pool with a specific function. -func NewPoolWithFunc(opts Options) (*PoolWithFunc, error) { - if opts.Capacity <= 0 { +func NewPoolWithFunc(size int, pf func(interface{}), options ...Option) (*PoolWithFunc, error) { + if size <= 0 { return nil, ErrInvalidPoolSize } - if opts.PoolFunc == nil { + if pf == nil { return nil, ErrLackPoolFunc } + opts := new(Options) + for _, option := range options { + option(opts) + } + if expiry := opts.ExpiryDuration; expiry < 0 { return nil, ErrInvalidPoolExpiry } else if expiry == 0 { - opts.ExpiryDuration = time.Duration(DEFAULT_CLEAN_INTERVAL_TIME) * time.Second + opts.ExpiryDuration = DefaultCleanIntervalTime } - var p *PoolWithFunc + p := &PoolWithFunc{ + capacity: int32(size), + expiryDuration: opts.ExpiryDuration, + poolFunc: pf, + nonblocking: opts.Nonblocking, + maxBlockingTasks: int32(opts.MaxBlockingTasks), + panicHandler: opts.PanicHandler, + lock: internal.NewSpinLock(), + } + p.workerCache = sync.Pool{ + New: func() interface{} { + return &goWorkerWithFunc{ + pool: p, + args: make(chan interface{}, workerChanCap), + } + }, + } if opts.PreAlloc { - p = &PoolWithFunc{ - capacity: int32(opts.Capacity), - expiryDuration: opts.ExpiryDuration, - poolFunc: opts.PoolFunc, - workers: make([]*goWorkerWithFunc, 0, opts.Capacity), - nonblocking: opts.Nonblocking, - maxBlockingTasks: int32(opts.MaxBlockingTasks), - panicHandler: opts.PanicHandler, - } - } else { - p = &PoolWithFunc{ - capacity: int32(opts.Capacity), - expiryDuration: opts.ExpiryDuration, - poolFunc: opts.PoolFunc, - nonblocking: opts.Nonblocking, - maxBlockingTasks: int32(opts.MaxBlockingTasks), - panicHandler: opts.PanicHandler, - } + p.workers = make([]*goWorkerWithFunc, 0, size) } - p.cond = sync.NewCond(&p.lock) + p.cond = sync.NewCond(p.lock) + + // Start a goroutine to clean up expired workers periodically. go p.periodicallyPurge() + return p, nil } @@ -173,11 +181,11 @@ func (p *PoolWithFunc) Invoke(args interface{}) error { if atomic.LoadInt32(&p.release) == CLOSED { return ErrPoolClosed } - if w := p.retrieveWorker(); w == nil { + var w *goWorkerWithFunc + if w = p.retrieveWorker(); w == nil { return ErrPoolOverload - } else { - w.args <- args } + w.args <- args return nil } @@ -188,7 +196,7 @@ func (p *PoolWithFunc) Running() int { // Free returns a available goroutines to work. func (p *PoolWithFunc) Free() int { - return int(atomic.LoadInt32(&p.capacity) - atomic.LoadInt32(&p.running)) + return p.Cap() - p.Running() } // Cap returns the capacity of this pool. @@ -202,10 +210,6 @@ func (p *PoolWithFunc) Tune(size int) { return } atomic.StoreInt32(&p.capacity, int32(size)) - diff := p.Running() - size - for i := 0; i < diff; i++ { - p.retrieveWorker().args <- nil - } } // Release Closed this pool. @@ -239,14 +243,7 @@ func (p *PoolWithFunc) decRunning() { func (p *PoolWithFunc) retrieveWorker() *goWorkerWithFunc { var w *goWorkerWithFunc spawnWorker := func() { - if cacheWorker := p.workerCache.Get(); cacheWorker != nil { - w = cacheWorker.(*goWorkerWithFunc) - } else { - w = &goWorkerWithFunc{ - pool: p, - args: make(chan interface{}, workerChanCap), - } - } + w = p.workerCache.Get().(*goWorkerWithFunc) w.run() } @@ -293,12 +290,13 @@ func (p *PoolWithFunc) retrieveWorker() *goWorkerWithFunc { // revertWorker puts a worker back into free pool, recycling the goroutines. func (p *PoolWithFunc) revertWorker(worker *goWorkerWithFunc) bool { - if atomic.LoadInt32(&p.release) == CLOSED { + if atomic.LoadInt32(&p.release) == CLOSED || p.Running() > p.Cap() { return false } worker.recycleTime = time.Now() p.lock.Lock() p.workers = append(p.workers, worker) + // Notify the invoker stuck in 'retrieveWorker()' of there is an available worker in the worker queue. p.cond.Signal() p.lock.Unlock() diff --git a/vendor/github.com/panjf2000/ants/worker.go b/vendor/github.com/panjf2000/ants/v2/worker.go similarity index 94% rename from vendor/github.com/panjf2000/ants/worker.go rename to vendor/github.com/panjf2000/ants/v2/worker.go index b79d9b8..f38e9d2 100644 --- a/vendor/github.com/panjf2000/ants/worker.go +++ b/vendor/github.com/panjf2000/ants/v2/worker.go @@ -48,9 +48,8 @@ func (w *goWorker) run() { w.pool.incRunning() go func() { defer func() { + w.pool.decRunning() if p := recover(); p != nil { - w.pool.decRunning() - w.pool.workerCache.Put(w) if w.pool.panicHandler != nil { w.pool.panicHandler(p) } else { @@ -60,17 +59,16 @@ func (w *goWorker) run() { log.Printf("worker exits from panic: %s\n", string(buf[:n])) } } + w.pool.workerCache.Put(w) }() for f := range w.task { if f == nil { - w.pool.decRunning() - w.pool.workerCache.Put(w) return } f() if ok := w.pool.revertWorker(w); !ok { - break + return } } }() diff --git a/vendor/github.com/panjf2000/ants/worker_func.go b/vendor/github.com/panjf2000/ants/v2/worker_func.go similarity index 94% rename from vendor/github.com/panjf2000/ants/worker_func.go rename to vendor/github.com/panjf2000/ants/v2/worker_func.go index befd966..4c56ae8 100644 --- a/vendor/github.com/panjf2000/ants/worker_func.go +++ b/vendor/github.com/panjf2000/ants/v2/worker_func.go @@ -48,9 +48,8 @@ func (w *goWorkerWithFunc) run() { w.pool.incRunning() go func() { defer func() { + w.pool.decRunning() if p := recover(); p != nil { - w.pool.decRunning() - w.pool.workerCache.Put(w) if w.pool.panicHandler != nil { w.pool.panicHandler(p) } else { @@ -60,17 +59,16 @@ func (w *goWorkerWithFunc) run() { log.Printf("worker with func exits from panic: %s\n", string(buf[:n])) } } + w.pool.workerCache.Put(w) }() for args := range w.args { if args == nil { - w.pool.decRunning() - w.pool.workerCache.Put(w) return } w.pool.poolFunc(args) if ok := w.pool.revertWorker(w); !ok { - break + return } } }() diff --git a/vendor/modules.txt b/vendor/modules.txt index 2a588fc..28af56c 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -48,8 +48,9 @@ github.com/mitchellh/go-homedir github.com/mitchellh/mapstructure # github.com/oklog/run v1.0.0 github.com/oklog/run -# github.com/panjf2000/ants v0.0.0-20190820151255-b60dfa8c16b0 -github.com/panjf2000/ants +# github.com/panjf2000/ants/v2 v2.1.1 +github.com/panjf2000/ants/v2 +github.com/panjf2000/ants/v2/internal # github.com/pelletier/go-toml v1.2.0 github.com/pelletier/go-toml # github.com/pkg/errors v0.8.1