Skip to content

Commit

Permalink
add option to control number of synchronous validation workers
Browse files Browse the repository at this point in the history
  • Loading branch information
vyzo committed Apr 26, 2019
1 parent ae804d1 commit 21d9b9b
Showing 1 changed file with 17 additions and 2 deletions.
19 changes: 17 additions & 2 deletions pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ type PubSub struct {
// validateThrottle limits the number of active validation goroutines
validateThrottle chan struct{}

// this is the number of synchronous validation workers
validateWorkers int

// eval thunk in event loop
eval chan func()

Expand Down Expand Up @@ -194,6 +197,7 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option
blacklistPeer: make(chan peer.ID),
seenMessages: timecache.NewTimeCache(TimeCacheDuration),
counter: uint64(time.Now().UnixNano()),
validateWorkers: runtime.NumCPU(),
}

for _, opt := range opts {
Expand All @@ -216,8 +220,7 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option

go ps.processLoop(ctx)

numcpu := runtime.NumCPU()
for i := 0; i < numcpu; i++ {
for i := 0; i < ps.validateWorkers; i++ {
go ps.validateWorker()
}

Expand All @@ -233,6 +236,18 @@ func WithValidateThrottle(n int) Option {
}
}

// WithValidateWorkers sets the number of synchronous validation worker goroutines.
// Defaults to NumCPU.
func WithValidateWorkers(n int) Option {
return func(ps *PubSub) error {
if n > 0 {
ps.validateWorkers = n
return nil
}
return fmt.Errorf("number of validation workers must be > 0")
}
}

// WithMessageSigning enables or disables message signing (enabled by default).
func WithMessageSigning(enabled bool) Option {
return func(p *PubSub) error {
Expand Down

0 comments on commit 21d9b9b

Please sign in to comment.