UserWarning: Enforce ordering consumer reached retry limit in production #141

Closed

sachinrekhi opened this issue May 1, 2016 · 7 comments

@sachinrekhi
Contributor

I'm unfortunately seeing the following error in production:

channels/sessions.py:104: UserWarning: Enforce ordering consumer reached retry limit, message being dropped. Did you decorate all protocol consumers correctly?

As I mentioned before, I believe I have decorated all protocol consumers correctly; here is my consumer code:

from channels.sessions import enforce_ordering
from channels.auth import channel_session_user_from_http, channel_session_user

from notes.api.socket.router import SocketRouter

@enforce_ordering
@channel_session_user_from_http
def connect(message):
    SocketRouter().connect(message.channel_session, message.reply_channel, message.user)

@enforce_ordering
@channel_session_user
def disconnect(message):
    SocketRouter().disconnect(message.channel_session, message.reply_channel, message.user)

@enforce_ordering
@channel_session_user
def message(message):
    SocketRouter().message(message.channel_session, message.reply_channel, message.user, message.content)

I'm sending socket messages to the server at a fairly slow rate now (one socket message sent every 500ms). I'm currently running 3 channel workers against Daphne. There are currently no other socket connections on the server, just the one user / socket connection. The server in general is pretty idle.

My best guess at what may be happening is that my server may in fact be too idle. When a message comes in, one channel worker picks it up; then the next message comes in, and the next channel worker tries to pick it up, but since the first one is not done it raises ConsumeLater. But since it has nothing else to pick up other than the same task (there are no other users), it simply spins very quickly through its retries. Is this possible? Are the channel workers running in a tight loop, constantly picking up tasks and in this case raising ConsumeLater? Or is there some delay before they will pick up a subsequent task? Or is there some mechanism by which they only pull subsequent tasks when another worker has finished its task? If it's just a tight loop, then it seems like this issue would happen whenever the rate of incoming messages exceeds the processing rate of the messages and there are more idle workers than socket connections to process.
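To make that concrete, here is a minimal, self-contained simulation of what I suspect is happening. This is illustrative only, not the actual Channels code; Message, RETRY_LIMIT, and the queue handling are placeholders:

import collections
import warnings

RETRY_LIMIT = 10  # hypothetical cap; Channels enforces its own internal limit


class ConsumeLater(Exception):
    """Signal that this message arrived before its predecessor finished."""


Message = collections.namedtuple("Message", ["order", "retries"])


def enforce_ordering(message, session):
    # Only the message whose order matches the session counter may run.
    if message.order != session["next_order"]:
        raise ConsumeLater()
    session["next_order"] += 1  # the consumer body would run before this


def worker_pass(queue, session):
    """One pass of an idle worker: pull, try, then requeue or drop."""
    message = queue.popleft()
    try:
        enforce_ordering(message, session)
    except ConsumeLater:
        if message.retries + 1 >= RETRY_LIMIT:
            warnings.warn("Enforce ordering consumer reached retry limit, "
                          "message being dropped.")
            return
        # Requeued onto an otherwise-empty queue, so it is picked straight
        # back up by the next idle worker with no delay in between.
        queue.append(Message(message.order, message.retries + 1))


if __name__ == "__main__":
    session = {"next_order": 0}
    # Message 1 arrives while message 0 is still being handled elsewhere, so
    # the queue only ever contains the out-of-order message.
    queue = collections.deque([Message(order=1, retries=0)])
    while queue:
        worker_pass(queue, session)  # burns through all retries almost instantly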

The dropped message is unfortunately breaking my collaborative editing scenario, since a message from the client is discarded and never processed. I'm planning on building re-send logic into the client, as well as message IDs so the server doesn't process the same message twice, but I was expecting that to be needed only in exceptional cases (server connection died, poor latency, etc.), not in a mainline scenario like this.

Thoughts on how to resolve?

@andrewgodwin
Member

Yes, they are indeed running in a tight loop as you suggest, so if they are "too idle" and the other messages have not yet made it through, they'd quickly spinlock into the retry limit.

It's possible I might have to drop enforce_ordering's strict mode (the one you're using here) and just have it provide the slight mode, where it merely makes sure connect runs before receive, or somehow provide more logic so that it knows when things before it in the queue are still being processed.

I'm not really sure how to proceed; it's definitely not doing what you want in this case, and increasing the retry limit will just lead to more spinlocks (though ironically higher load would probably solve the problem). Let me think about it a bit.

@sachinrekhi
Contributor Author

One of the supposed benefits of WebSockets over HTTP is strict message ordering, so I'd love to see us find a way to maintain a mainline path that supports this. Otherwise it feels like an implementation detail of Django Channels ended up breaking a typical WebSocket benefit.

Let me throw out a couple of approaches to begin the brainstorming...

  1. What if we take an approach like ReconnectingWebsocket, where we use exponential back-off on retries to increase the delay between a worker pulling tasks off the queue (see the sketch after this list). In normal operation it would continue pulling in its tight-loop fashion, but when ConsumeLater starts coming up, it introduces delays at increasing intervals before picking off more tasks. This may be just good enough to avoid the issue I'm running into, without sacrificing performance in the common case where there is significant load on the server and a tight loop is justified.
  2. Could we take an approach in the worker like RedLock, where it attempts to acquire the lock for a certain number of ms before failing and raising ConsumeLater? This way you create some room for the previously ordered task to finish. Or consider only taking this approach on the last retry loop before completely failing, to again avoid reducing performance when there are plenty of other tasks to take on.
  3. Is there an approach similar to iterators where a worker can peek at what else is on the queue to understand whether there is other stuff to process? If there is, fail immediately; otherwise take one of the above approaches and introduce some delay before failing.
  4. All of the above approaches try to introduce enough delay, in a performant way, to prevent reaching the retry limit, which could work. But ideally what we want is for the worker to get some signal that new tasks have shown up or previous tasks have completed as the impetus for it to look for additional tasks. Is there any way such a signal could be provided?
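Here is a rough sketch of what I mean by approach 1, assuming a hypothetical worker loop; the consume() callback and the backoff parameters are illustrative, not part of the Channels API:

import time


def run_worker(consume, base_delay=0.05, max_delay=2.0):
    """Hypothetical worker loop: consume() returns False when the consumer
    raised ConsumeLater and the message was sent back to the queue."""
    delay = 0.0
    while True:
        if consume():
            delay = 0.0  # normal operation: stay in the tight loop
        else:
            # Out-of-order hit: back off exponentially so the blocking
            # consumer gets a chance to finish before we retry.
            delay = base_delay if delay == 0.0 else min(max_delay, delay * 2)
            time.sleep(delay)

Under load the backoff never kicks in, so the common case keeps its current throughput.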

@andrewgodwin
Member

I agree, in-order processing is important for WebSockets and other similar protocols, and I want to keep it a thing.

  1. This will cause a huge performance hit: the workers always have to run at full speed, or an out-of-order socket will cause everything else to slow down.
  2. The problem is that there's no locking system currently in place beyond Django sessions, which don't support this concept.
  3. No, you can't peek at the queue for scalability reasons (every extra operation the queue has to support makes channel layers harder to implement).
  4. This might be the basis of a good idea, but the question is how do we provide that signal?

The basic problem here is that enforce_ordering does not use an actual locking backend, but instead uses Django Sessions, and combined with the fact that messages cannot be delayed, only sent to the back of the queue, it's a recipe for spinlocking.

The fundamental problem to solve is not re-injecting messages back onto an active channel while another worker is processing something from the same reply_channel. In the abstract, a good way of doing this would be to store up any out-of-order messages encountered while a message is processing, and then, when it finishes, flush them back out onto their original channel.

Thinking about it, it's perhaps possible that this could be done with a new channel per reply channel that unordered messages get shunted onto by the decorator if they're out of order, and then the decorator around the running consumer pulls things from that channel back onto the original one. It means a lot of shoving things to and fro, but it gets it done, I think.
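Very roughly, as a sketch only (the wait-channel naming, the session keys, and the channel-layer calls here are placeholders rather than a finished design):

import functools


def wait_channel_for(reply_channel):
    # One private channel per reply channel where out-of-order messages wait.
    return "wait.%s" % reply_channel


def flush_wait_channel(message):
    # Move everything parked on the wait channel back onto the original
    # channel, preserving arrival (FIFO) order.
    wait = wait_channel_for(message.reply_channel)
    while True:
        _, content = message.channel_layer.receive_many([wait])
        if content is None:
            break
        message.channel_layer.send(message.channel.name, content)


def enforce_ordering_sketch(consumer):
    @functools.wraps(consumer)
    def inner(message, *args, **kwargs):
        session = message.channel_session
        expected = session.get("expected_order", 0)
        if message.content.get("order", 0) != expected:
            # Out of order: park it on the wait channel instead of retrying.
            message.channel_layer.send(
                wait_channel_for(message.reply_channel), message.content)
            return
        try:
            return consumer(message, *args, **kwargs)
        finally:
            session["expected_order"] = expected + 1
            flush_wait_channel(message)
    return inner

The flush step is what puts parked messages back onto the original channel, in order, once the blocking consumer has finished.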

The other, more radical, option is to change the very way websocket.receive works: have the actual content of the websocket.receive message always be the same and generic (just containing reply_channel and path), and have the actual contents of the websocket provided on a single-listener channel that the consumer draws from, much like the way HTTP bodies work now. You'd still write a consumer against websocket.receive, but whenever you got one you'd then turn to the single-listener channel and fetch one message from there, ensuring they were still in order. This, however, still does not prevent things from running in parallel, which I presume you also want to avoid?
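Again only as a sketch, with a hypothetical body_channel key and placeholder channel-layer calls, a consumer under that scheme might look like:

def handle_frame(reply_channel, frame):
    # Placeholder for the application's actual frame handling.
    pass


def receive(message):
    # The websocket.receive message is now generic: just reply_channel, path,
    # and a pointer to the per-connection channel holding the actual frames.
    body_channel = message.content["body_channel"]

    # Only one consumer reads this channel and frames are popped one at a
    # time, so they come off in the order the client sent them, the same
    # pattern HTTP request bodies use now.
    _, frame = message.channel_layer.receive_many([body_channel], block=True)
    if frame is not None:
        handle_frame(message.reply_channel, frame)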

@sachinrekhi
Contributor Author

sachinrekhi commented May 2, 2016

Thinking about it, it's perhaps possible that this could be done with a new channel per reply channel that unordered messages get shunted onto by the decorator if they're out of order, and then the decorator around the running consumer pulls things from that channel back onto the original one. It means a lot of shoving things to and fro, but it gets it done, I think.

This actually seems like a great solution to me. You avoid unnecessary spin locks and the messages become available for pickup by channel workers on the main channel at the exact moment they can be serviced since you are guaranteed the previous blocking message has now finished.

You'd want to make sure this mechanism can handle an arbitrary number of out-of-order messages (e.g. while message 1 is being processed, messages 2 and 3 come in; both will be picked up by channel workers and inserted into the out-of-order channel for that reply channel, and then when message 1 completes processing, both of these messages are taken from the out-of-order channel and placed back on the main channel in FIFO order).

I assume you have a mechanism to also handle the potential race condition where worker 2 adds a message to the out-of-order channel at the exact same time that worker 1 checks for any out-of-order messages in the channel to insert back into the main channel. Otherwise, if worker 1 completes before worker 2 adds the message, you could have a message in the out-of-order channel but no way to pull it back onto the main channel, since there are no longer any workers processing messages associated with that reply channel.

Regarding the more radical approach: preventing messages from being processed in parallel is also important for my scenario, so I wouldn't want a solution to ordering to come at the cost of allowing parallel message processing.

@andrewgodwin
Member

I assume you have a mechanism to also handle the potential race condition where worker 2 adds a message to the out-of-order channel at the exact same time that worker 1 checks for any out-of-order messages in the channel to insert back into the main channel. Otherwise, if worker 1 completes before worker 2 adds the message, you could have a message in the out-of-order channel but no way to pull it back onto the main channel, since there are no longer any workers processing messages associated with that reply channel.

Well, I haven't written it yet, so I don't; this is certainly a risk, and the problem is that there's no event to hang a later check of that delayed task channel on either, so I suspect the naive implementation would lose the message in this scenario (though it would get it back once another message came in and the decorator got a chance to look at the channel again).

I do suspect there's no perfect solution here without either proper distributed locking or pinning all message processing for one WebSocket to exactly one thread (which is how most other systems achieve this right now).

I suspect the best we could do is to run the "mark next message as available for running" check strictly before the "empty the wait queue back onto the main channel" code, so that the failure case is that the next worker begins before the current one exits (but after the business logic has run), rather than the other way around where, as you say, you risk losing messages forever.
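In terms of the earlier sketch, that ordering would look roughly like this (same caveats: the session keys and the flush_wait_channel helper from that sketch are hypothetical):

def finish_consumer(message, session, expected):
    # Step 1: mark the next message as runnable *before* touching the wait
    # channel, so a worker that already has it can proceed immediately.
    session["expected_order"] = expected + 1
    session.save()
    # Step 2: only now flush parked messages back onto the main channel.
    # If another worker parks a message at this exact moment, the next
    # consumer's own flush will still recover it; the worst case is a brief
    # overlap between consumers, not a lost message.
    flush_wait_channel(message)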

@sachinrekhi
Contributor Author

Oh yeah, the strict ordering you suggest seems fine then, since the message wouldn't be lost.

@sachinrekhi
Contributor Author

The new approach addressing this issue is now merged, so it will be available in the next release.
