Skip to content

Commit

Permalink
improve @enforce_ordering to leverage a wait channel to avoid spinloc…
Browse files Browse the repository at this point in the history
…ks (django#144)

* improved @enforce_ordering to leverage a wait channel to avoid spinlocks

* addressed pyflake issues

* renamed wait channel to __wait__.<reply channel>

* handled potential ChannelFull exception

* updated sessions unit tests

* updated enforce_ordering tests to reflect new approach of leveraging wait channels

* addressed pyflake issues

* more pyflake fixes

* removed close_on_error handling on enforce_ordering since only worked on websockets
  • Loading branch information
sachinrekhi authored and Krukov committed May 15, 2016
1 parent 4d8bcc6 commit e1a325e
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 26 deletions.
40 changes: 29 additions & 11 deletions channels/sessions.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import functools
import hashlib
import warnings
from importlib import import_module

from django.conf import settings
Expand Down Expand Up @@ -75,7 +74,7 @@ def enforce_ordering(func=None, slight=False):
Enforces either slight (order=0 comes first, everything else isn't ordered)
or strict (all messages exactly ordered) ordering against a reply_channel.
Uses sessions to track ordering.
Uses sessions to track ordering and socket-specific wait channels for unordered messages.
You cannot mix slight ordering and strict ordering on a channel; slight
ordering does not write to the session after the first message to improve
Expand All @@ -95,19 +94,38 @@ def inner(message, *args, **kwargs):
# See what the current next order should be
next_order = message.channel_session.get("__channels_next_order", 0)
if order == next_order or (slight and next_order > 0):
# Message is in right order. Maybe persist next one?
# Run consumer
func(message, *args, **kwargs)
# Mark next message order as available for running
if order == 0 or not slight:
message.channel_session["__channels_next_order"] = order + 1
# Run consumer
return func(message, *args, **kwargs)
message.channel_session.save()
# Requeue any pending wait channel messages for this socket connection back onto it's original channel
while True:
wait_channel = "__wait__.%s" % message.reply_channel.name
channel, content = message.channel_layer.receive_many([wait_channel], block=False)
if channel:
original_channel = content.pop("original_channel")
try:
message.channel_layer.send(original_channel, content)
except message.channel_layer.ChannelFull:
raise message.channel_layer.ChannelFull(
"Cannot requeue pending __wait__ channel message " +
"back on to already full channel %s" % original_channel
)
else:
break
else:
# Bad ordering - warn if we're getting close to the limit
if getattr(message, "__doomed__", False):
warnings.warn(
"Enforce ordering consumer reached retry limit, message " +
"being dropped. Did you decorate all protocol consumers correctly?"
# Since out of order, enqueue message temporarily to wait channel for this socket connection
wait_channel = "__wait__.%s" % message.reply_channel.name
message.content["original_channel"] = message.channel.name
try:
message.channel_layer.send(wait_channel, message.content)
except message.channel_layer.ChannelFull:
raise message.channel_layer.ChannelFull(
"Cannot add unordered message to already " +
"full __wait__ channel for socket %s" % message.reply_channel.name
)
raise ConsumeLater()
return inner
if func is not None:
return decorator(func)
Expand Down
88 changes: 73 additions & 15 deletions channels/tests/test_sessions.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@

from django.conf import settings
from django.test import override_settings
from channels.exceptions import ConsumeLater
from channels.message import Message
from channels.sessions import channel_session, http_session, enforce_ordering, session_for_reply_channel
from channels.tests import ChannelTestCase
from channels import DEFAULT_CHANNEL_LAYER, channel_layers


@override_settings(SESSION_ENGINE="django.contrib.sessions.backends.cache")
Expand Down Expand Up @@ -110,9 +110,21 @@ def test_enforce_ordering_slight(self):
Tests that slight mode of enforce_ordering works
"""
# Construct messages to send
message0 = Message({"reply_channel": "test-reply-a", "order": 0}, None, None)
message1 = Message({"reply_channel": "test-reply-a", "order": 1}, None, None)
message2 = Message({"reply_channel": "test-reply-a", "order": 2}, None, None)
message0 = Message(
{"reply_channel": "test-reply-a", "order": 0},
"websocket.connect",
channel_layers[DEFAULT_CHANNEL_LAYER]
)
message1 = Message(
{"reply_channel": "test-reply-a", "order": 1},
"websocket.receive",
channel_layers[DEFAULT_CHANNEL_LAYER]
)
message2 = Message(
{"reply_channel": "test-reply-a", "order": 2},
"websocket.receive",
channel_layers[DEFAULT_CHANNEL_LAYER]
)

# Run them in an acceptable slight order
@enforce_ordering(slight=True)
Expand All @@ -123,29 +135,54 @@ def inner(message):
inner(message2)
inner(message1)

# Ensure wait channel is empty
wait_channel = "__wait__.%s" % "test-reply-a"
next_message = self.get_next_message(wait_channel)
self.assertEqual(next_message, None)

def test_enforce_ordering_slight_fail(self):
"""
Tests that slight mode of enforce_ordering fails on bad ordering
"""
# Construct messages to send
message2 = Message({"reply_channel": "test-reply-e", "order": 2}, None, None)
message2 = Message(
{"reply_channel": "test-reply-e", "order": 2},
"websocket.receive",
channel_layers[DEFAULT_CHANNEL_LAYER]
)

# Run them in an acceptable strict order
@enforce_ordering(slight=True)
def inner(message):
pass

with self.assertRaises(ConsumeLater):
inner(message2)
inner(message2)

# Ensure wait channel is not empty
wait_channel = "__wait__.%s" % "test-reply-e"
next_message = self.get_next_message(wait_channel)
self.assertNotEqual(next_message, None)

def test_enforce_ordering_strict(self):
"""
Tests that strict mode of enforce_ordering works
"""
# Construct messages to send
message0 = Message({"reply_channel": "test-reply-b", "order": 0}, None, None)
message1 = Message({"reply_channel": "test-reply-b", "order": 1}, None, None)
message2 = Message({"reply_channel": "test-reply-b", "order": 2}, None, None)
message0 = Message(
{"reply_channel": "test-reply-b", "order": 0},
"websocket.connect",
channel_layers[DEFAULT_CHANNEL_LAYER]
)
message1 = Message(
{"reply_channel": "test-reply-b", "order": 1},
"websocket.receive",
channel_layers[DEFAULT_CHANNEL_LAYER]
)
message2 = Message(
{"reply_channel": "test-reply-b", "order": 2},
"websocket.receive",
channel_layers[DEFAULT_CHANNEL_LAYER]
)

# Run them in an acceptable strict order
@enforce_ordering
Expand All @@ -156,28 +193,49 @@ def inner(message):
inner(message1)
inner(message2)

# Ensure wait channel is empty
wait_channel = "__wait__.%s" % "test-reply-b"
next_message = self.get_next_message(wait_channel)
self.assertEqual(next_message, None)

def test_enforce_ordering_strict_fail(self):
"""
Tests that strict mode of enforce_ordering fails on bad ordering
"""
# Construct messages to send
message0 = Message({"reply_channel": "test-reply-c", "order": 0}, None, None)
message2 = Message({"reply_channel": "test-reply-c", "order": 2}, None, None)
message0 = Message(
{"reply_channel": "test-reply-c", "order": 0},
"websocket.connect",
channel_layers[DEFAULT_CHANNEL_LAYER]
)
message2 = Message(
{"reply_channel": "test-reply-c", "order": 2},
"websocket.receive",
channel_layers[DEFAULT_CHANNEL_LAYER]
)

# Run them in an acceptable strict order
@enforce_ordering
def inner(message):
pass

inner(message0)
with self.assertRaises(ConsumeLater):
inner(message2)
inner(message2)

# Ensure wait channel is not empty
wait_channel = "__wait__.%s" % "test-reply-c"
next_message = self.get_next_message(wait_channel)
self.assertNotEqual(next_message, None)

def test_enforce_ordering_fail_no_order(self):
"""
Makes sure messages with no "order" key fail
"""
message0 = Message({"reply_channel": "test-reply-d"}, None, None)
message0 = Message(
{"reply_channel": "test-reply-d"},
None,
channel_layers[DEFAULT_CHANNEL_LAYER]
)

@enforce_ordering(slight=True)
def inner(message):
Expand Down

0 comments on commit e1a325e

Please sign in to comment.