From 2e1c21af7a499c9ceaca59f7edd6c4fce772b4ff Mon Sep 17 00:00:00 2001 From: Dhananjay Sathe Date: Wed, 15 May 2013 14:37:10 +0200 Subject: [PATCH] Repalced with a PushProducer based implementation #9 --- rce-comm/rce/comm/buffer.py | 49 ++++++++++++++++++++++++++++++------- rce-comm/rce/comm/client.py | 2 +- rce-comm/rce/comm/server.py | 2 +- 3 files changed, 42 insertions(+), 11 deletions(-) diff --git a/rce-comm/rce/comm/buffer.py b/rce-comm/rce/comm/buffer.py index 4bbd509..e27811f 100644 --- a/rce-comm/rce/comm/buffer.py +++ b/rce-comm/rce/comm/buffer.py @@ -1,7 +1,7 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- # -# rce-comm/rce/comm/client.py +# rce-comm/rce/comm/buffer.py # # This file is part of the RoboEarth Cloud Engine framework. # @@ -31,26 +31,57 @@ # from autobahn.websocket import WebSocketProtocol +from collections import defaultdict, deque class BufferManager(object): - implements(IPullProducer) + implements(IPushProducer) def __init__(self, consumer, protocol): self.consumer = consumer self.protocol = protocol + self._paused = False def resumeProducing(self): self._paused = False - try: - while True: + while not self._paused: + try: data = self.protocol._binary_buff.popleft() msg = data[0] + data[1].getvalue() WebSocketProtocol.sendMessage(self.protocol, msg, binary=True) - if self.paused: - break - except IndexError: - pass + except IndexError: + pass - def stopProducing(self): + def pauseProducing(self): self._paused = True + + def stopProducing(self): + pass + +# TODO Implement Buffer Queues +class BufferQueue(object): + "Priority Queues in ascending order" + def __init__(self): + self._queues = defaultdict(deque) + self._prios = set() + + def append(self, item, priority=1): + """ Adds an item to queues. + Higher number indicates higher priority + 0 indicates real time + + """ + self._curr_prio + self._prios.add(prio) + self._queues[priority].append(item) + + def get_item(self): + """ Return the right element + """ + + curr_prio = 0 if 0 in self._prios else max(self._prios) + try: + return self._queues[curr_prio].popleft() + except IndexError: + self._prios.remove(curr_prio) + del self._queues[curr_prio] diff --git a/rce-comm/rce/comm/client.py b/rce-comm/rce/comm/client.py index eb3d083..434d79b 100644 --- a/rce-comm/rce/comm/client.py +++ b/rce-comm/rce/comm/client.py @@ -78,7 +78,7 @@ def __init__(self, conn): def connectionMade(self): WebSocketClientProtocol.connectionMade(self) self._buffermanager = BufferManager(self.transport, self) - self.transport.registerProducer(self._buffermanager, False) + self.transport.registerProducer(self._buffermanager, True) def onOpen(self): """ This method is called by twisted as soon as the WebSocket diff --git a/rce-comm/rce/comm/server.py b/rce-comm/rce/comm/server.py index 8734f98..a34b632 100644 --- a/rce-comm/rce/comm/server.py +++ b/rce-comm/rce/comm/server.py @@ -201,7 +201,7 @@ def __init__(self, realm): def connectionMade(self): WebSocketServerProtocol.connectionMade(self) self._buffermanager = BufferManager(self.transport, self) - self.transport.registerProducer(self._buffermanager, False) + self.transport.registerProducer(self._buffermanager, True) def onConnect(self, req): """ Method is called by the Autobahn engine when a request to establish