也许我在 Twisted 的异步设计中缺少一些东西,但我似乎找不到一种方法来调用 sendMessage() 方法"外部"。我的意思是,发送消息时不必仅使用Twisted/AutobahnWebsockets的回调方法(例如在onOpen或从onMessage()的服务器接收数据时)
当然,我可以启动一个线程并调用my_protocol_instance.sendMessage("hello"),但这会破坏异步设计的所有目的,对吗?
在一个具体的例子中,我需要有一个顶部包装类来打开连接并管理它,每当我需要时,我都会调用my_class.send_my_toplevel_message(msg)。我该如何实现?
希望我已经清楚了我的解释。
谢谢
为什么需要一个线程来启动protocolInstance.sendMessage() ?这可以在正常的反应堆回路中完成。
扭曲的核心是反应器,当你认为扭曲本身是反应性的时,它更容易看到事物 - 这意味着它做某事作为对其他事物的反应(响应)。
现在我假设您正在谈论的线程也会由于某些事件或活动或状态而在调用 sendMessage 时创建和创建。我很难想象您只需要突然发送一条消息而没有任何理由做出反应的情况。
但是,如果有一个事件应该触发 sendMessage,则无需在线程中调用它:只需使用扭曲的机制来捕获该事件,然后从该特定事件的回调调用 sendMessage。
现在来看你的具体例子:你能具体说明"每当我需要"在这个问题的上下文中到底是什么意思吗?来自另一个连接的输入?来自用户的输入?循环活动?
我设法通过在另一个线程中运行 Twisted 来实现我需要的东西,让我的程序自由运行并允许它使用 reactor.callFromThread() 触发 Twisted 中的发送数据。
你觉得怎么样?
# ----- twisted ----------
class _WebSocketClientProtocol(WebSocketClientProtocol):
def __init__(self, factory):
self.factory = factory
def onOpen(self):
log.debug("Client connected")
self.factory.protocol_instance = self
self.factory.base_client._connected_event.set()
class _WebSocketClientFactory(WebSocketClientFactory):
def __init__(self, *args, **kwargs):
WebSocketClientFactory.__init__(self, *args, **kwargs)
self.protocol_instance = None
self.base_client = None
def buildProtocol(self, addr):
return _WebSocketClientProtocol(self)
# ------ end twisted -------
class BaseWBClient(object):
def __init__(self, websocket_settings):
self.settings = websocket_settings
# instance to be set by the own factory
self.factory = None
# this event will be triggered on onOpen()
self._connected_event = threading.Event()
# queue to hold not yet dispatched messages
self._send_queue = Queue.Queue()
self._reactor_thread = None
def connect(self):
log.debug("Connecting to %(host)s:%(port)d" % self.settings)
self.factory = _WebSocketClientFactory(
"ws://%(host)s:%(port)d" % self.settings,
debug=True)
self.factory.base_client = self
c = connectWS(self.factory)
self._reactor_thread = threading.Thread(target=reactor.run,
args=(False,))
self._reactor_thread.daemon = True
self._reactor_thread.start()
def send_message(self, body):
if not self._check_connection():
return
log.debug("Queing send")
self._send_queue.put(body)
reactor.callFromThread(self._dispatch)
def _check_connection(self):
if not self._connected_event.wait(timeout=10):
log.error("Unable to connect to server")
self.close()
return False
return True
def _dispatch(self):
log.debug("Dispatching")
while True:
try:
body = self._send_queue.get(block=False)
except Queue.Empty:
break
self.factory.protocol_instance.sendMessage(body)
def close(self):
reactor.callFromThread(reactor.stop)