根据文档,鉴于我已经实例化了一个连接:
>>> import stomp
>>> c = stomp.Connection([('127.0.0.1', 62615)])
>>> c.start()
>>> c.connect('admin', 'password', wait=True)
如何监视它,以使其在c.is_connected == False
上重新连接?
>>> reconnect_on_dead_connection(c)
...
>>> [1479749503] reconnected dead connection
您可以包装连接并检查它是否连接了每个呼叫。
import stomp
def reconnect(connection):
"""reconnect here"""
class ReconnectWrapper(object):
def __init__(self, connection):
self.__connection = connection
def __getattr__(self, item):
if not self.__connection.is_connected:
reconnect(self.__connection)
return getattr(self.__connection, item)
if __name__ == '__main__':
c = stomp.Connection([('127.0.0.1', 62615)])
c.start()
c.connect('admin', 'password', wait=True)
magic_connection = ReconnectWrapper(c)
测试:
from scratch_35 import ReconnectWrapper
import unittest
import mock
class TestReconnection(unittest.TestCase):
def setUp(self):
self.connection = mock.MagicMock()
self.reconnect_patcher = mock.patch("scratch_35.reconnect")
self.reconnect = self.reconnect_patcher.start()
def tearDown(self):
self.reconnect_patcher.stop()
def test_pass_call_to_warapped_connection(self):
connection = ReconnectWrapper(self.connection)
connection.send("abc")
self.reconnect.assert_not_called()
self.connection.send.assert_called_once_with("abc")
def test_reconnect_when_disconnected(self):
self.connection.is_connected = False
connection = ReconnectWrapper(self.connection)
connection.send("abc")
self.reconnect.assert_called_once_with(self.connection)
self.connection.send.assert_called_once_with("abc")
if __name__ == '__main__':
unittest.main()
结果:
..
----------------------------------------------------------------------
Ran 2 tests in 0.004s
OK
密钥是魔术方法__getatter__
,每当您尝试访问对象未提供的属性时,都会称呼它。有关方法__getattr__
的更多信息,您可以在Doucmentation中找到https://docs.python.org/2/reference/datamodel.html#object。 getAttr 。
如果您使用的是stomp作为发件人:
def reconnect_on_dead_connection(c):
if c.is_connected():
c.connect('admin', 'password', wait=True)
并在发送Stomp消息之前打电话给它。
如果您使用的是ConnectionListener类扩展名,请使用on_disconnected
重新连接。