From e1a325e8fa3bc6e3fa3ed92a95b042a9455f6810 Mon Sep 17 00:00:00 2001 From: Sachin Rekhi Date: Thu, 12 May 2016 10:38:06 -0700 Subject: [PATCH] improve @enforce_ordering to leverage a wait channel to avoid spinlocks (#144) * improved @enforce_ordering to leverage a wait channel to avoid spinlocks * addressed pyflake issues * renamed wait channel to __wait__. * handled potential ChannelFull exception * updated sessions unit tests * updated enforce_ordering tests to reflect new approach of leveraging wait channels * addressed pyflake issues * more pyflake fixes * removed close_on_error handling on enforce_ordering since only worked on websockets --- channels/sessions.py | 40 ++++++++++----- channels/tests/test_sessions.py | 88 +++++++++++++++++++++++++++------ 2 files changed, 102 insertions(+), 26 deletions(-) diff --git a/channels/sessions.py b/channels/sessions.py index a749fcddc..ea2e69658 100644 --- a/channels/sessions.py +++ b/channels/sessions.py @@ -1,6 +1,5 @@ import functools import hashlib -import warnings from importlib import import_module from django.conf import settings @@ -75,7 +74,7 @@ def enforce_ordering(func=None, slight=False): Enforces either slight (order=0 comes first, everything else isn't ordered) or strict (all messages exactly ordered) ordering against a reply_channel. - Uses sessions to track ordering. + Uses sessions to track ordering and socket-specific wait channels for unordered messages. You cannot mix slight ordering and strict ordering on a channel; slight ordering does not write to the session after the first message to improve @@ -95,19 +94,38 @@ def inner(message, *args, **kwargs): # See what the current next order should be next_order = message.channel_session.get("__channels_next_order", 0) if order == next_order or (slight and next_order > 0): - # Message is in right order. Maybe persist next one? + # Run consumer + func(message, *args, **kwargs) + # Mark next message order as available for running if order == 0 or not slight: message.channel_session["__channels_next_order"] = order + 1 - # Run consumer - return func(message, *args, **kwargs) + message.channel_session.save() + # Requeue any pending wait channel messages for this socket connection back onto it's original channel + while True: + wait_channel = "__wait__.%s" % message.reply_channel.name + channel, content = message.channel_layer.receive_many([wait_channel], block=False) + if channel: + original_channel = content.pop("original_channel") + try: + message.channel_layer.send(original_channel, content) + except message.channel_layer.ChannelFull: + raise message.channel_layer.ChannelFull( + "Cannot requeue pending __wait__ channel message " + + "back on to already full channel %s" % original_channel + ) + else: + break else: - # Bad ordering - warn if we're getting close to the limit - if getattr(message, "__doomed__", False): - warnings.warn( - "Enforce ordering consumer reached retry limit, message " + - "being dropped. Did you decorate all protocol consumers correctly?" + # Since out of order, enqueue message temporarily to wait channel for this socket connection + wait_channel = "__wait__.%s" % message.reply_channel.name + message.content["original_channel"] = message.channel.name + try: + message.channel_layer.send(wait_channel, message.content) + except message.channel_layer.ChannelFull: + raise message.channel_layer.ChannelFull( + "Cannot add unordered message to already " + + "full __wait__ channel for socket %s" % message.reply_channel.name ) - raise ConsumeLater() return inner if func is not None: return decorator(func) diff --git a/channels/tests/test_sessions.py b/channels/tests/test_sessions.py index e973f9d45..da65f7a8e 100644 --- a/channels/tests/test_sessions.py +++ b/channels/tests/test_sessions.py @@ -2,10 +2,10 @@ from django.conf import settings from django.test import override_settings -from channels.exceptions import ConsumeLater from channels.message import Message from channels.sessions import channel_session, http_session, enforce_ordering, session_for_reply_channel from channels.tests import ChannelTestCase +from channels import DEFAULT_CHANNEL_LAYER, channel_layers @override_settings(SESSION_ENGINE="django.contrib.sessions.backends.cache") @@ -110,9 +110,21 @@ def test_enforce_ordering_slight(self): Tests that slight mode of enforce_ordering works """ # Construct messages to send - message0 = Message({"reply_channel": "test-reply-a", "order": 0}, None, None) - message1 = Message({"reply_channel": "test-reply-a", "order": 1}, None, None) - message2 = Message({"reply_channel": "test-reply-a", "order": 2}, None, None) + message0 = Message( + {"reply_channel": "test-reply-a", "order": 0}, + "websocket.connect", + channel_layers[DEFAULT_CHANNEL_LAYER] + ) + message1 = Message( + {"reply_channel": "test-reply-a", "order": 1}, + "websocket.receive", + channel_layers[DEFAULT_CHANNEL_LAYER] + ) + message2 = Message( + {"reply_channel": "test-reply-a", "order": 2}, + "websocket.receive", + channel_layers[DEFAULT_CHANNEL_LAYER] + ) # Run them in an acceptable slight order @enforce_ordering(slight=True) @@ -123,29 +135,54 @@ def inner(message): inner(message2) inner(message1) + # Ensure wait channel is empty + wait_channel = "__wait__.%s" % "test-reply-a" + next_message = self.get_next_message(wait_channel) + self.assertEqual(next_message, None) + def test_enforce_ordering_slight_fail(self): """ Tests that slight mode of enforce_ordering fails on bad ordering """ # Construct messages to send - message2 = Message({"reply_channel": "test-reply-e", "order": 2}, None, None) + message2 = Message( + {"reply_channel": "test-reply-e", "order": 2}, + "websocket.receive", + channel_layers[DEFAULT_CHANNEL_LAYER] + ) # Run them in an acceptable strict order @enforce_ordering(slight=True) def inner(message): pass - with self.assertRaises(ConsumeLater): - inner(message2) + inner(message2) + + # Ensure wait channel is not empty + wait_channel = "__wait__.%s" % "test-reply-e" + next_message = self.get_next_message(wait_channel) + self.assertNotEqual(next_message, None) def test_enforce_ordering_strict(self): """ Tests that strict mode of enforce_ordering works """ # Construct messages to send - message0 = Message({"reply_channel": "test-reply-b", "order": 0}, None, None) - message1 = Message({"reply_channel": "test-reply-b", "order": 1}, None, None) - message2 = Message({"reply_channel": "test-reply-b", "order": 2}, None, None) + message0 = Message( + {"reply_channel": "test-reply-b", "order": 0}, + "websocket.connect", + channel_layers[DEFAULT_CHANNEL_LAYER] + ) + message1 = Message( + {"reply_channel": "test-reply-b", "order": 1}, + "websocket.receive", + channel_layers[DEFAULT_CHANNEL_LAYER] + ) + message2 = Message( + {"reply_channel": "test-reply-b", "order": 2}, + "websocket.receive", + channel_layers[DEFAULT_CHANNEL_LAYER] + ) # Run them in an acceptable strict order @enforce_ordering @@ -156,13 +193,26 @@ def inner(message): inner(message1) inner(message2) + # Ensure wait channel is empty + wait_channel = "__wait__.%s" % "test-reply-b" + next_message = self.get_next_message(wait_channel) + self.assertEqual(next_message, None) + def test_enforce_ordering_strict_fail(self): """ Tests that strict mode of enforce_ordering fails on bad ordering """ # Construct messages to send - message0 = Message({"reply_channel": "test-reply-c", "order": 0}, None, None) - message2 = Message({"reply_channel": "test-reply-c", "order": 2}, None, None) + message0 = Message( + {"reply_channel": "test-reply-c", "order": 0}, + "websocket.connect", + channel_layers[DEFAULT_CHANNEL_LAYER] + ) + message2 = Message( + {"reply_channel": "test-reply-c", "order": 2}, + "websocket.receive", + channel_layers[DEFAULT_CHANNEL_LAYER] + ) # Run them in an acceptable strict order @enforce_ordering @@ -170,14 +220,22 @@ def inner(message): pass inner(message0) - with self.assertRaises(ConsumeLater): - inner(message2) + inner(message2) + + # Ensure wait channel is not empty + wait_channel = "__wait__.%s" % "test-reply-c" + next_message = self.get_next_message(wait_channel) + self.assertNotEqual(next_message, None) def test_enforce_ordering_fail_no_order(self): """ Makes sure messages with no "order" key fail """ - message0 = Message({"reply_channel": "test-reply-d"}, None, None) + message0 = Message( + {"reply_channel": "test-reply-d"}, + None, + channel_layers[DEFAULT_CHANNEL_LAYER] + ) @enforce_ordering(slight=True) def inner(message):