From da1d55261a4b2f460d744d7b9e8a93e8d67712e8 Mon Sep 17 00:00:00 2001 From: Dhananjay Sathe Date: Thu, 16 May 2013 17:07:30 +0200 Subject: [PATCH] Fix to working buffer #8 . --- rce-comm/rce/comm/buffer.py | 18 ++++++++++-------- rce-comm/rce/comm/client.py | 2 +- rce-comm/rce/comm/server.py | 2 +- 3 files changed, 12 insertions(+), 10 deletions(-) diff --git a/rce-comm/rce/comm/buffer.py b/rce-comm/rce/comm/buffer.py index f33f55d..783294c 100644 --- a/rce-comm/rce/comm/buffer.py +++ b/rce-comm/rce/comm/buffer.py @@ -30,19 +30,20 @@ # # from zope.interface import implements -from twisted.internet.interfaces import IPushProducer +from twisted.internet.interfaces import IPullProducer from autobahn.websocket import WebSocketProtocol from collections import defaultdict, deque class BufferManager(object): - implements(IPushProducer) + implements(IPullProducer) def __init__(self, consumer, protocol): self.consumer = consumer self.protocol = protocol - self._paused = False + self._paused = True + self._new = True self._binary_buff = deque(maxlen=10) def push_data(self, data): @@ -54,7 +55,10 @@ def push_data(self, data): self._binary_buff.append(data) def resumeProducing(self): - self._paused = False + if not self._new: + self._paused = False + else: + self._new = False while not self._paused: try: uriBinary, msgURI = self._binary_buff.popleft() @@ -63,13 +67,11 @@ def resumeProducing(self): msg = data[0] + data[1].getvalue() WebSocketProtocol.sendMessage(self.protocol, msg, binary=True) except IndexError: + self._paused = True pass - def pauseProducing(self): - self._paused = True - def stopProducing(self): - pass + self._paused = True # TODO Implement Buffer Queues class BufferQueue(object): diff --git a/rce-comm/rce/comm/client.py b/rce-comm/rce/comm/client.py index b91e3af..68a2fee 100644 --- a/rce-comm/rce/comm/client.py +++ b/rce-comm/rce/comm/client.py @@ -75,7 +75,7 @@ def __init__(self, conn): def connectionMade(self): WebSocketClientProtocol.connectionMade(self) self._buffermanager = BufferManager(self.transport, self) - self.transport.registerProducer(self._buffermanager, True) + self.transport.registerProducer(self._buffermanager, False) def onOpen(self): """ This method is called by twisted as soon as the WebSocket diff --git a/rce-comm/rce/comm/server.py b/rce-comm/rce/comm/server.py index aaf5d31..5b8a3e1 100644 --- a/rce-comm/rce/comm/server.py +++ b/rce-comm/rce/comm/server.py @@ -197,7 +197,7 @@ def __init__(self, realm): def connectionMade(self): WebSocketServerProtocol.connectionMade(self) self._buffermanager = BufferManager(self.transport, self) - self.transport.registerProducer(self._buffermanager, True) + self.transport.registerProducer(self._buffermanager, False) def onConnect(self, req): """ Method is called by the Autobahn engine when a request to establish