Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve @enforce_ordering to leverage a wait channel to avoid spinlocks #144

Merged
merged 10 commits into from
May 12, 2016
40 changes: 29 additions & 11 deletions channels/sessions.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import functools
import hashlib
import warnings
from importlib import import_module

from django.conf import settings
Expand Down Expand Up @@ -75,7 +74,7 @@ def enforce_ordering(func=None, slight=False):
Enforces either slight (order=0 comes first, everything else isn't ordered)
or strict (all messages exactly ordered) ordering against a reply_channel.

Uses sessions to track ordering.
Uses sessions to track ordering and socket-specific wait channels for unordered messages.

You cannot mix slight ordering and strict ordering on a channel; slight
ordering does not write to the session after the first message to improve
Expand All @@ -95,19 +94,38 @@ def inner(message, *args, **kwargs):
# See what the current next order should be
next_order = message.channel_session.get("__channels_next_order", 0)
if order == next_order or (slight and next_order > 0):
# Message is in right order. Maybe persist next one?
# Run consumer
func(message, *args, **kwargs)
# Mark next message order as available for running
if order == 0 or not slight:
message.channel_session["__channels_next_order"] = order + 1
# Run consumer
return func(message, *args, **kwargs)
message.channel_session.save()
# Requeue any pending wait channel messages for this socket connection back onto it's original channel
while True:
wait_channel = "__wait__.%s" % message.reply_channel.name
channel, content = message.channel_layer.receive_many([wait_channel], block=False)
if channel:
original_channel = content.pop("original_channel")
try:
message.channel_layer.send(original_channel, content)
except message.channel_layer.ChannelFull:
raise message.channel_layer.ChannelFull(
"Cannot requeue pending __wait__ channel message " +
"back on to already full channel %s" % original_channel
)
else:
break
else:
# Bad ordering - warn if we're getting close to the limit
if getattr(message, "__doomed__", False):
warnings.warn(
"Enforce ordering consumer reached retry limit, message " +
"being dropped. Did you decorate all protocol consumers correctly?"
# Since out of order, enqueue message temporarily to wait channel for this socket connection
wait_channel = "__wait__.%s" % message.reply_channel.name
message.content["original_channel"] = message.channel.name
try:
message.channel_layer.send(wait_channel, message.content)
except message.channel_layer.ChannelFull:
raise message.channel_layer.ChannelFull(
"Cannot add unordered message to already " +
"full __wait__ channel for socket %s" % message.reply_channel.name
)
raise ConsumeLater()
return inner
if func is not None:
return decorator(func)
Expand Down
88 changes: 73 additions & 15 deletions channels/tests/test_sessions.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@

from django.conf import settings
from django.test import override_settings
from channels.exceptions import ConsumeLater
from channels.message import Message
from channels.sessions import channel_session, http_session, enforce_ordering, session_for_reply_channel
from channels.tests import ChannelTestCase
from channels import DEFAULT_CHANNEL_LAYER, channel_layers


@override_settings(SESSION_ENGINE="django.contrib.sessions.backends.cache")
Expand Down Expand Up @@ -110,9 +110,21 @@ def test_enforce_ordering_slight(self):
Tests that slight mode of enforce_ordering works
"""
# Construct messages to send
message0 = Message({"reply_channel": "test-reply-a", "order": 0}, None, None)
message1 = Message({"reply_channel": "test-reply-a", "order": 1}, None, None)
message2 = Message({"reply_channel": "test-reply-a", "order": 2}, None, None)
message0 = Message(
{"reply_channel": "test-reply-a", "order": 0},
"websocket.connect",
channel_layers[DEFAULT_CHANNEL_LAYER]
)
message1 = Message(
{"reply_channel": "test-reply-a", "order": 1},
"websocket.receive",
channel_layers[DEFAULT_CHANNEL_LAYER]
)
message2 = Message(
{"reply_channel": "test-reply-a", "order": 2},
"websocket.receive",
channel_layers[DEFAULT_CHANNEL_LAYER]
)

# Run them in an acceptable slight order
@enforce_ordering(slight=True)
Expand All @@ -123,29 +135,54 @@ def inner(message):
inner(message2)
inner(message1)

# Ensure wait channel is empty
wait_channel = "__wait__.%s" % "test-reply-a"
next_message = self.get_next_message(wait_channel)
self.assertEqual(next_message, None)

def test_enforce_ordering_slight_fail(self):
"""
Tests that slight mode of enforce_ordering fails on bad ordering
"""
# Construct messages to send
message2 = Message({"reply_channel": "test-reply-e", "order": 2}, None, None)
message2 = Message(
{"reply_channel": "test-reply-e", "order": 2},
"websocket.receive",
channel_layers[DEFAULT_CHANNEL_LAYER]
)

# Run them in an acceptable strict order
@enforce_ordering(slight=True)
def inner(message):
pass

with self.assertRaises(ConsumeLater):
inner(message2)
inner(message2)

# Ensure wait channel is not empty
wait_channel = "__wait__.%s" % "test-reply-e"
next_message = self.get_next_message(wait_channel)
self.assertNotEqual(next_message, None)

def test_enforce_ordering_strict(self):
"""
Tests that strict mode of enforce_ordering works
"""
# Construct messages to send
message0 = Message({"reply_channel": "test-reply-b", "order": 0}, None, None)
message1 = Message({"reply_channel": "test-reply-b", "order": 1}, None, None)
message2 = Message({"reply_channel": "test-reply-b", "order": 2}, None, None)
message0 = Message(
{"reply_channel": "test-reply-b", "order": 0},
"websocket.connect",
channel_layers[DEFAULT_CHANNEL_LAYER]
)
message1 = Message(
{"reply_channel": "test-reply-b", "order": 1},
"websocket.receive",
channel_layers[DEFAULT_CHANNEL_LAYER]
)
message2 = Message(
{"reply_channel": "test-reply-b", "order": 2},
"websocket.receive",
channel_layers[DEFAULT_CHANNEL_LAYER]
)

# Run them in an acceptable strict order
@enforce_ordering
Expand All @@ -156,28 +193,49 @@ def inner(message):
inner(message1)
inner(message2)

# Ensure wait channel is empty
wait_channel = "__wait__.%s" % "test-reply-b"
next_message = self.get_next_message(wait_channel)
self.assertEqual(next_message, None)

def test_enforce_ordering_strict_fail(self):
"""
Tests that strict mode of enforce_ordering fails on bad ordering
"""
# Construct messages to send
message0 = Message({"reply_channel": "test-reply-c", "order": 0}, None, None)
message2 = Message({"reply_channel": "test-reply-c", "order": 2}, None, None)
message0 = Message(
{"reply_channel": "test-reply-c", "order": 0},
"websocket.connect",
channel_layers[DEFAULT_CHANNEL_LAYER]
)
message2 = Message(
{"reply_channel": "test-reply-c", "order": 2},
"websocket.receive",
channel_layers[DEFAULT_CHANNEL_LAYER]
)

# Run them in an acceptable strict order
@enforce_ordering
def inner(message):
pass

inner(message0)
with self.assertRaises(ConsumeLater):
inner(message2)
inner(message2)

# Ensure wait channel is not empty
wait_channel = "__wait__.%s" % "test-reply-c"
next_message = self.get_next_message(wait_channel)
self.assertNotEqual(next_message, None)

def test_enforce_ordering_fail_no_order(self):
"""
Makes sure messages with no "order" key fail
"""
message0 = Message({"reply_channel": "test-reply-d"}, None, None)
message0 = Message(
{"reply_channel": "test-reply-d"},
None,
channel_layers[DEFAULT_CHANNEL_LAYER]
)

@enforce_ordering(slight=True)
def inner(message):
Expand Down