Skip to content

Commit

Permalink
Repalced with a PushProducer based implementation #9
Browse files Browse the repository at this point in the history
  • Loading branch information
dhananjaysathe committed May 15, 2013
1 parent 69b0f1d commit 2e1c21a
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 11 deletions.
49 changes: 40 additions & 9 deletions rce-comm/rce/comm/buffer.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# rce-comm/rce/comm/client.py
# rce-comm/rce/comm/buffer.py
#
# This file is part of the RoboEarth Cloud Engine framework.
#
Expand Down Expand Up @@ -31,26 +31,57 @@
#

from autobahn.websocket import WebSocketProtocol
from collections import defaultdict, deque


class BufferManager(object):
implements(IPullProducer)
implements(IPushProducer)

def __init__(self, consumer, protocol):
self.consumer = consumer
self.protocol = protocol
self._paused = False

def resumeProducing(self):
self._paused = False
try:
while True:
while not self._paused:
try:
data = self.protocol._binary_buff.popleft()
msg = data[0] + data[1].getvalue()
WebSocketProtocol.sendMessage(self.protocol, msg, binary=True)
if self.paused:
break
except IndexError:
pass
except IndexError:
pass

def stopProducing(self):
def pauseProducing(self):
self._paused = True

def stopProducing(self):
pass

# TODO Implement Buffer Queues
class BufferQueue(object):
"Priority Queues in ascending order"
def __init__(self):
self._queues = defaultdict(deque)
self._prios = set()

def append(self, item, priority=1):
""" Adds an item to queues.
Higher number indicates higher priority
0 indicates real time
"""
self._curr_prio
self._prios.add(prio)
self._queues[priority].append(item)

def get_item(self):
""" Return the right element
"""

curr_prio = 0 if 0 in self._prios else max(self._prios)
try:
return self._queues[curr_prio].popleft()
except IndexError:
self._prios.remove(curr_prio)
del self._queues[curr_prio]
2 changes: 1 addition & 1 deletion rce-comm/rce/comm/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def __init__(self, conn):
def connectionMade(self):
WebSocketClientProtocol.connectionMade(self)
self._buffermanager = BufferManager(self.transport, self)
self.transport.registerProducer(self._buffermanager, False)
self.transport.registerProducer(self._buffermanager, True)

def onOpen(self):
""" This method is called by twisted as soon as the WebSocket
Expand Down
2 changes: 1 addition & 1 deletion rce-comm/rce/comm/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ def __init__(self, realm):
def connectionMade(self):
WebSocketServerProtocol.connectionMade(self)
self._buffermanager = BufferManager(self.transport, self)
self.transport.registerProducer(self._buffermanager, False)
self.transport.registerProducer(self._buffermanager, True)

def onConnect(self, req):
""" Method is called by the Autobahn engine when a request to establish
Expand Down

1 comment on commit 2e1c21a

@dhananjaysathe
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

was targetted for #8 not #9

Please sign in to comment.