-
-
Notifications
You must be signed in to change notification settings - Fork 810
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
improve @enforce_ordering to leverage a wait channel to avoid spinlocks #144
Conversation
while True: | ||
wait_channel = "wait.%s" % message.reply_channel.name | ||
channel, content = message.channel_layer.receive_many([wait_channel], block=False) | ||
if channel and content: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You only need "if channel" here (if channel is None content always will be)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated this
Yeah, this seems just like what I was thinking. Couple of things:
|
|
Yes, the only situation I can come up with where #3 ends up on the wait channel without #2 not being running is if #2 has died, at which point you arguably don't want any other packets processing anyway. You'll end up with a zombie socket that has no way of telling it's queueing everything into a void, but at least its messages will get cleaned up after the expiry delay. Also, given the new backpressure stuff I added this week, you'll need to handle the possibility that |
I added handling for the channel_layer.ChannelFull exception by re-raising the same exception with specific messaging on what in-fact happened. I attempted to address your suggestion on enabling closing the websocket by adding an optional close_on_error parameter to the decorator that is default True which will send a "close": True message to the reply channel whenever the ChannelFull exception is raised. I'm not sure why the CI integration is failing on these changes. The Travis CI error report seems to suggest channel_layer is None in message.channel_layer, but not sure how that could be? |
Ah, some of the unit tests set channel_layer to None if they're just testing message parsing (it's one of the arguments to the Message constructor) - you likely need to just patch the offending test to take |
Got it. I updated all the @enforce_ordering unit tests to also make more relevant assertions against the wait channel since they no longer raise ConsumeLater. We should be all set now. |
Urgh, sorry to be nitpicky again, but your close_on_error implementation is currently specific to the WebSocket endpoint - other protocols are likely to turn up with an 'order' field but without the same close mechanism; can we pull it out for now and I'll merge it like that? |
Oh good point, have only been using websockets myself. I pulled it. |
Merged! Thank you so much for your work on this. |
…ks (django#144) * improved @enforce_ordering to leverage a wait channel to avoid spinlocks * addressed pyflake issues * renamed wait channel to __wait__.<reply channel> * handled potential ChannelFull exception * updated sessions unit tests * updated enforce_ordering tests to reflect new approach of leveraging wait channels * addressed pyflake issues * more pyflake fixes * removed close_on_error handling on enforce_ordering since only worked on websockets
@andrewgodwin I tried my hand at implementing the approach we discussed for improving @enforce_ordering to avoid spinlocks and hitting the current retry limits
Discussion here: #141
Have a look and let me know if this is what you were generally thinking. This seems to be working well for me in my use-case in development thus far.
For the wait channel name, I'm currently using "wait.%s " % reply_channel.name
I had to force the channel session to save prior to finishing the method in order to enforce the strict ordering that we discussed of marking next message order as available for running prior to re-queueing wait channel message.