我想在测试中模拟 websocket 的 ping 超时。
首先,我尝试悄悄关闭TCP套接字,但由于无法关闭无论如何都会发送 FIN。iptables 不是一种选择。
我使用龙卷风库来模拟 websocket 客户端。
我阅读了如何在已经实例化的对象中替换方法。它在一个简单的演示中工作,但龙卷风似乎忽略了我的操作。
可能是什么?
import types
ws.websocket_connect(url, callback=openCallback,
on_message_callback=messageCallback)
def openCallback(future):
ws = future.result()
inst = ws.protocol
def stub(self, x):
print "STUB"
inst.write_ping = types.MethodType(stub, inst, inst.__class__)
没有任何错误,但未调用存根,客户端仍回复服务器 ping 请求。
我意识到我想禁用错误的方法。
服务器使用 write_ping 发送 ping 请求,但是客户端写 pong,所以我覆盖了_write_frame方法(fin、opcode)如果操作码 != 0xA,则将控制权转发给原始实现。