Skip to content

Commit

Permalink
add support for inline (synchronous) validators
Browse files Browse the repository at this point in the history
  • Loading branch information
vyzo committed Apr 26, 2019
1 parent 00d83c1 commit ae804d1
Showing 1 changed file with 34 additions and 3 deletions.
37 changes: 34 additions & 3 deletions pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -696,11 +696,29 @@ func (p *PubSub) validate(vals []*topicVal, src peer.ID, msg *Message) {
return
}

if len(vals) > 0 {
var inline, async []*topicVal
for _, val := range vals {
if val.validateInline {
inline = append(inline, val)
} else {
async = append(async, val)
}
}

// apply inline (synchronous) validators
for _, val := range inline {
if !val.validateMsg(p.ctx, src, msg) {
log.Debugf("message validation failed; dropping message from %s", src)
return
}
}

// apply async validators
if len(async) > 0 {
select {
case p.validateThrottle <- struct{}{}:
go func() {
p.doValidateTopic(vals, src, msg)
p.doValidateTopic(async, src, msg)
<-p.validateThrottle
}()
default:
Expand All @@ -709,7 +727,7 @@ func (p *PubSub) validate(vals []*topicVal, src peer.ID, msg *Message) {
return
}

// no user validators and the signature was valid, send the message
// no async validators, send the message
p.sendMsg <- &sendReq{
from: src,
msg: msg,
Expand Down Expand Up @@ -945,6 +963,7 @@ type addValReq struct {
validate Validator
timeout time.Duration
throttle int
inline bool
resp chan error
}

Expand All @@ -958,6 +977,7 @@ type topicVal struct {
validate Validator
validateTimeout time.Duration
validateThrottle chan struct{}
validateInline bool
}

// Validator is a function that validates a message.
Expand All @@ -984,6 +1004,16 @@ func WithValidatorConcurrency(n int) ValidatorOpt {
}
}

// WithValidatorInline is an option that sets the validation disposition to synchronous:
// it will be executed inline in validation front-end, without spawning a new goroutine.
// This is suitable for simple or cpu-bound validators that do not block.
func WithValidatorInline(inline bool) ValidatorOpt {
return func(addVal *addValReq) error {
addVal.inline = inline
return nil
}
}

// RegisterTopicValidator registers a validator for topic.
// By default validators are asynchronous, which means they will run in a separate goroutine.
// The number of active goroutines is controlled by global and per topic validator
Expand Down Expand Up @@ -1020,6 +1050,7 @@ func (ps *PubSub) addValidator(req *addValReq) {
validate: req.validate,
validateTimeout: 0,
validateThrottle: make(chan struct{}, defaultValidateConcurrency),
validateInline: req.inline,
}

if req.timeout > 0 {
Expand Down

0 comments on commit ae804d1

Please sign in to comment.