Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve @enforce_ordering to leverage a wait channel to avoid spinlocks #144

Merged
merged 10 commits into from
May 12, 2016
30 changes: 18 additions & 12 deletions channels/sessions.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import functools
import hashlib
import warnings
from importlib import import_module

from django.conf import settings
Expand Down Expand Up @@ -71,7 +70,7 @@ def enforce_ordering(func=None, slight=False):
Enforces either slight (order=0 comes first, everything else isn't ordered)
or strict (all messages exactly ordered) ordering against a reply_channel.

Uses sessions to track ordering.
Uses sessions to track ordering and socket-specific wait channels for unordered messages.

You cannot mix slight ordering and strict ordering on a channel; slight
ordering does not write to the session after the first message to improve
Expand All @@ -91,19 +90,26 @@ def inner(message, *args, **kwargs):
# See what the current next order should be
next_order = message.channel_session.get("__channels_next_order", 0)
if order == next_order or (slight and next_order > 0):
# Message is in right order. Maybe persist next one?
# Run consumer
func(message, *args, **kwargs)
# Mark next message order as available for running
if order == 0 or not slight:
message.channel_session["__channels_next_order"] = order + 1
# Run consumer
return func(message, *args, **kwargs)
message.channel_session.save()
# Requeue any pending wait channel messages for this socket connection back onto it's original channel
while True:
wait_channel = "wait.%s" % message.reply_channel.name
channel, content = message.channel_layer.receive_many([wait_channel], block=False)
if channel and content:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You only need "if channel" here (if channel is None content always will be)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated this

original_channel = content.pop("original_channel")
message.channel_layer.send(original_channel, content)
else:
break
else:
# Bad ordering - warn if we're getting close to the limit
if getattr(message, "__doomed__", False):
warnings.warn(
"Enforce ordering consumer reached retry limit, message " +
"being dropped. Did you decorate all protocol consumers correctly?"
)
raise ConsumeLater()
# Since out of order, enqueue message temporarily to wait channel for this socket connection
wait_channel = "wait.%s" % message.reply_channel.name
message.content["original_channel"] = message.channel.name
message.channel_layer.send(wait_channel, message.content)
return inner
if func is not None:
return decorator(func)
Expand Down