rework validator pipeline #176
Conversation
Test failures are known; I will fix them in a separate PR.
pubsub.go
Outdated
select {
case p.validateThrottle <- struct{}{}:
	go func() {
		p.doValidateTopic(vals, src, msg)
Doesn't this defeat the point of having a worker pool? Is there any reason to spawn this as a goroutine?
Yes, it kind of does. But at the moment all registered validators are asynchronous, which means they can block, and we need to execute them in a goroutine.
Having said that, this is an artifact of WIP; I want to separate the validators into synchronous and asynchronous, and execute the synchronous ones inline.
The asynchronous ones can run in a goroutine, or even better behind an asynchronous API that doesn't use throttles at all.
Actually, since we are going to spawn a goroutine for async validators, we can still use the throttles.
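A minimal sketch of that combination, gating the spawned goroutines behind the throttle channel (`doValidateTopic` and `validateThrottle` come from the diff above; the drop-on-full behavior and the `log.Warnf` call are assumptions):

```go
// Sketch: gate async validator goroutines behind the throttle channel.
// p.validateThrottle is assumed to be a buffered chan struct{} sized to
// the maximum number of concurrent async validations.
select {
case p.validateThrottle <- struct{}{}:
	go func() {
		defer func() { <-p.validateThrottle }() // release the slot when done
		p.doValidateTopic(vals, src, msg)
	}()
default:
	// throttle full: drop the message instead of blocking the pipeline
	log.Warnf("validation throttled; dropping message from %s", src)
}
```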
Hm. But why not just have 1024 synchronous validators (well, dynamically adjusting)?
The asynchronous validators can block at will, I don't think we should have pinned goroutines for them. The cost of actually spawning the goroutine shouldn't be too high.
Also, if we had that many synchronous goroutines we could overtax the system doing synchronous validation.
That is, we could end up validating signatures for 1024 messages concurrently, which is not the intention (it oversubscribes the CPU by a lot).
Perhaps we could have a second pool with prespawned (or dynamically adjustable) goroutines for the async validators but I don't think we gain much for the complexity.
I see. So we're assuming that some validators will be CPU-bound and others won't. Async validators are the non-CPU-bound ones.
This is really where async/await shines.
> Perhaps we could have a second pool with prespawned (or dynamically adjustable) goroutines for the async validators but I don't think we gain much for the complexity.
We can revisit this later.
We might want to allow multiple validators per topic: one could register an inline validator that fails fast with basic structural validation and an async validator that fetches stuff from the network for deep validation.
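If that direction were taken, registration might look something like the following. This is purely illustrative: the library registers a single validator per topic today, and `quickStructuralCheck`, `deepNetworkValidation`, and the exact option names shown are hypothetical.

```go
// Hypothetical API: a cheap inline validator plus a deep async validator
// registered on the same topic. This sketches the proposal, not the real API.
err := ps.RegisterTopicValidator("blocks",
	quickStructuralCheck, // runs inline in the worker, fails fast
	pubsub.WithValidatorInline(true))
if err != nil {
	return err
}
err = ps.RegisterTopicValidator("blocks",
	deepNetworkValidation, // fetches data from the network, runs async
	pubsub.WithValidatorTimeout(5*time.Second))
```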
Ultimately I'd like to see something like a claim-check pattern here, but we can iterate towards that later. In summary (a rough sketch follows the list):
- The validator pipeline sends the message to the validator via a channel. The message includes the message ID, a deadline until which a result will be accepted, and a response channel.
- The validator is free to manage its own goroutine pool, without the pipeline making assumptions whether the task is CPU or IO-bound.
- The validator performs validation.
- The validator returns the result to the pipeline via a channel, reporting the message ID and the result (OK/KO).
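A rough sketch of the shapes involved, under the assumptions above (all names are illustrative, and the `time` import is assumed):

```go
// Hypothetical result type for the channel-based validator protocol.
type ValidationResult struct {
	MsgID string
	OK    bool // true = accept, false = reject (KO)
}

// Hypothetical request type sent from the pipeline to the validator.
type ValidationRequest struct {
	MsgID    string
	Msg      *Message  // the message to validate
	Deadline time.Time // results arriving after this are ignored
	Result   chan<- ValidationResult
}

// The pipeline hands work to the validator and moves on; the validator
// owns its own concurrency model and replies on the Result channel.
func submit(work chan<- ValidationRequest, req ValidationRequest) bool {
	select {
	case work <- req:
		return true
	default:
		return false // validator is not accepting work right now
	}
}
```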
pubsub.go
Outdated
@@ -209,6 +217,11 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option

	go ps.processLoop(ctx)

	numcpu := runtime.NumCPU()
	for i := 0; i < numcpu; i++ {
Can this numcpu be a configurable number?
We could make it configurable, but is there any benefit in using more than NumCPU synchronous validation workers?
If the validation function is not CPU-bound, yes. That’s the case for Eth2.0 folks.
These validators will be asynchronous and will be executed in a goroutine.
This is the number of workers for the validation front end; it handles signature verification, synchronous validators (TBD), and spawning goroutines for async validators through the throttle.
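A sketch of how such a front-end worker might dispatch, per the description above. `verifySignature` and `runInlineValidators` are hypothetical helpers; `validateQ`, the throttle, and `doValidateTopic` appear in the diffs in this thread.

```go
// Sketch of a validation front-end worker: do the synchronous work
// inline, and hand async validators to a throttled goroutine.
func (p *PubSub) validateWorker() {
	for {
		select {
		case req := <-p.validateQ:
			// synchronous part: signature check and inline validators
			if !verifySignature(req.msg) || !runInlineValidators(req.vals, req.src, req.msg) {
				continue // drop invalid message
			}
			// asynchronous validators run in a throttled goroutine
			select {
			case p.validateThrottle <- struct{}{}:
				go func(req *validateReq) {
					defer func() { <-p.validateThrottle }()
					p.doValidateTopic(req.vals, req.src, req.msg)
				}(req)
			default:
				// overloaded: drop the message
			}
		case <-p.ctx.Done():
			return
		}
	}
}
```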
Added inline (synchronous) validation support.
If we give a number less than numcpu, would that help other components of the program utilize CPU resources better?
Hrm, maybe. We can add an option to make it less of a hog.
Added option to control the number of workers.
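For illustration, such an option could follow the package's functional-option style; the exact shape here (and the `validateWorkers` field) is an assumption, not the final API:

```go
// Sketch of a functional option controlling the validation worker count,
// in the style of the package's existing Option type.
func WithValidateWorkers(n int) Option {
	return func(ps *PubSub) error {
		if n <= 0 {
			return fmt.Errorf("number of validation workers must be > 0")
		}
		ps.validateWorkers = n
		return nil
	}
}

// usage (hypothetical):
//   ps, err := pubsub.NewGossipSub(ctx, h, pubsub.WithValidateWorkers(4))
```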
pubsub.go
Outdated
@@ -183,6 +190,7 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option
	topics:    make(map[string]map[peer.ID]struct{}),
	peers:     make(map[peer.ID]chan *RPC),
	topicVals: make(map[string]*topicVal),
	validateQ: make(chan *validateReq, 32),
Should the size of validateQ be the same as or greater than defaultValidateThrottle? It seems defaultValidateThrottle was previously the potential maximum number of concurrent goroutines.
I don't think so, or we will have the same overloading problems we had previously.
So to explain: the throttle controls the number of active asynchronous validation requests, which can take a while to complete.
The queue controls the front end to the validation pipeline, which can drop messages if we are overloaded - see #172.
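In other words, pushing into the front-end queue is non-blocking; something like the following sketch, per the drop-on-overload behavior described here and in #172 (the `log.Warnf` call is an assumption):

```go
// Sketch: the event loop enqueues without blocking; when the front-end
// queue is full we are overloaded, so the message is dropped (see #172).
select {
case p.validateQ <- &validateReq{vals, src, msg}:
default:
	log.Warnf("message validation throttled: queue full; dropping message from %s", src)
}
```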
Big thanks for the clear explanation. Are my following understandings correct?
- validateQ with the workers serves as the rate-limiting gate.
- Except for the removal of the timeout, the asynchronous verification part is roughly the same as before this PR (except that signature verification is now done synchronously by the workers).
yes, this is correct.
I've removed the WIP moniker; this is ready for review.
Had a quick look at this, shaping up well!
Now that validation is becoming more complex, could we please encapsulate it behind a ValidationPipeline type? The logic is a bit scattered all over the place now, and it's difficult to navigate how the different aspects interplay with one another, e.g. inline, timeout, throttles, concurrency, etc.
On a related note, do you feel strongly about async validator throttling being enforced by pubsub? I think this may be an artificial need arising from us spawning a single goroutine per validator call -- once you do that, you need a mechanism to prevent DoS, and hence you introduce a throttling mechanism. If, however, we made async validators truly async by using channels, each validator would be in charge of accepting or rejecting jobs, and could use a different (more efficient) concurrency model instead of one goroutine per task.
We can certainly do that. Or we can lump all validation-related logic into its own file.
I don't have particularly strong views, other than having a throttle somewhere.
I have split off the validation logic into its own component, per @raulk's request. Note that the diff has become rather dirty because of this, and it's hard to see exactly what changed.
Force-pushed from 018dfbb to 9f0263a.
Rebased on master for test fixes in #178.
…eline-update Upgrade to inflight pubsub

This upgrade reworks the validator pipeline in pubsub. Testing shows that this is far more reliable than the commit we're currently pinned to. This has the unfortunate side-effect of pinning us to a commit that's currently under PR (in libp2p). A subsequent PR will need to upgrade the commit we're pinned to, to the latest go-libp2p-pubsub master. See libp2p/go-libp2p-pubsub#176
Sorry for the long turnaround! This looks really good, @vyzo. The separation of the validation logic is clean, and I'm excited to roll this out. Just a few small items, but nothing blocking from me. Good job.
// validateWorker is an active goroutine performing inline validation
func (v *validation) validateWorker() {
	for {
Do we want to use a waitgroup here for an orderly shutdown of workers?
We could, but is it necessary?
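For reference, if an orderly shutdown were wanted, a WaitGroup could track the workers; a sketch under that assumption (the wg and ctx fields are hypothetical):

```go
// Sketch: track workers with a WaitGroup so shutdown can wait for them.
func (v *validation) start(n int) {
	for i := 0; i < n; i++ {
		v.wg.Add(1)
		go v.validateWorker()
	}
}

func (v *validation) validateWorker() {
	defer v.wg.Done()
	for {
		select {
		case req := <-v.validateQ:
			v.validate(req.vals, req.src, req.msg)
		case <-v.ctx.Done():
			return
		}
	}
}

// shutdown: cancel v.ctx, then call v.wg.Wait() before returning.
```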
for {
	select {
	case req := <-v.validateQ:
		v.validate(req.vals, req.src, req.msg)
I think validate should take a context. Otherwise we risk blocking a system shutdown if a lengthy validation is inflight.
Ah, nevermind. I see that validate does pass the pubsub context to the validator itself.
I am going to rebase for merge.
…lidation throttle, and some better documentation.
Force-pushed from 93590ff to f4d9eee.
and needs to be offloaded. See #172, #173, #169.
Closes #103