diff --git a/pubsub.go b/pubsub.go index 3ff2dd54..8dc9219f 100644 --- a/pubsub.go +++ b/pubsub.go @@ -99,6 +99,9 @@ type PubSub struct { // validateThrottle limits the number of active validation goroutines validateThrottle chan struct{} + // this is the number of synchronous validation workers + validateWorkers int + // eval thunk in event loop eval chan func() @@ -194,6 +197,7 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option blacklistPeer: make(chan peer.ID), seenMessages: timecache.NewTimeCache(TimeCacheDuration), counter: uint64(time.Now().UnixNano()), + validateWorkers: runtime.NumCPU(), } for _, opt := range opts { @@ -216,8 +220,7 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option go ps.processLoop(ctx) - numcpu := runtime.NumCPU() - for i := 0; i < numcpu; i++ { + for i := 0; i < ps.validateWorkers; i++ { go ps.validateWorker() } @@ -233,6 +236,18 @@ func WithValidateThrottle(n int) Option { } } +// WithValidateWorkers sets the number of synchronous validation worker goroutines. +// Defaults to NumCPU. +func WithValidateWorkers(n int) Option { + return func(ps *PubSub) error { + if n > 0 { + ps.validateWorkers = n + return nil + } + return fmt.Errorf("number of validation workers must be > 0") + } +} + // WithMessageSigning enables or disables message signing (enabled by default). func WithMessageSigning(enabled bool) Option { return func(p *PubSub) error {