我正在尝试在Java 中实现ZMQREQ/REP
模型
我有一个服务器角色,运行在5564号邮件上,充当Replier
ZMQ.Socket repSock = context.socket(ZMQ.REP);
我有一个客户端角色,在5563号上运行
ZMQ.Socket syncclient = context.socket(ZMQ.REQ);
我有一个代理-服务器在中间,它传递请求和响应
ZMQ.proxy(reqSocket, repSocket, null);
有一个代理的好处是我可以添加多个服务器
repSocket.connect("tcp://" + addr.getHostAddress() + ":" + port);
它运行良好。
现在,当我从代理中删除服务器节点时
repSocket.disconnect("tcp://" + addr.getHostAddress() + ":" + port);
客户端被卡住,因为已经发出请求,REQ
-套接字等待响应。
因此该过程停留在syncclient.recvStr()
for (int request_nbr = 0; request_nbr < (request_nbr + 1); request_nbr++) {
syncclient.send(str.getBytes(),0);
System.out.println("Send Dataaaa....... " );
String data = syncclient.recvStr(Charset.defaultCharset());
System.out.println(" here.. " +data);
request_nbr++;
}
我搜索了一下,找不到跟踪REQ
-套接字的方法
我需要两件事中的任何一件:
一种跟踪
Socket
-实例的方法,我即将断开该实例,等待所有消息都得到处理,这样syncclient.recvStr()
就不会被阻止一种重置
syncclient
-套接字的方法,这样我就可以在不中断的情况下继续获得REQ/REP
响应
在实际场景中,请避免使用ZeroMQ.send()
/.recv()
方法的阻塞模式,而更好地使用.poll()
。
虽然这可能需要更多的SLOC代码,但结果会让你处于控制之中,而阻塞SLOC会从你的代码中获得所有的控制权,在下一条消息传递之前(如果有的话),你无法对此做太多。这是一种非常错误的设计实践,除了最简单的教科书示例外,这些示例实际上是对现实世界的某种反模式。
因此,不要期望问题2以某种方式神奇地得到解决,这不是ZeroMQ API的一部分(出于许多相当响亮的原因)。如果API版本和使用上下文允许,最好在.setsockopt( ZMQ.REQ_RELAXED, 1 )
之间做出决定,或者根本不使用琐碎的REQ/REP
模式(因为已知有陷入无法解决的相互死锁的风险(请参阅我关于这个主题的其他帖子,其中对这种现象进行了说明和无数次解释))。
以类似的方式,如果您从未阅读过ZeroMQ规范和/或文档以及ZeroMQ"最佳实践",那么问问题1似乎是合理的。在这方面花了一些时间后,你的选择会非常清楚。没有内置这样的工具来实现这一点。如果需要为自己的需要添加任何类似的非核心逻辑,可以添加一些附加组件。唯一间接影响aSocket.close()
行为的设置在.setsockopt( ZMQ.LINGER, 0 )
中可用,这可能有助于防止系统在aSocket
无限等待消息队列仍然为非空(消息仍在等待传递)的情况下进入有效挂断状态。
进入分布式系统设计就像进入了一个新世界。没有保证序列(发生非串行代码执行路径)。无法对远程实体、它们的状态、它们的故障、它们的存在、它们的实际ZeroMQ API版本进行任何本地控制。
的确是一个充满挑战的世界。
注意:
您可能已经知道,可以在不使用代理的情况下将.connect()
aSocket
实例(最好是aSocket
实例的接入点)连接到多个远程端。通过一些额外的.setsockopt()
将ZMQ.IMMEDIATE
调整为值1,将有助于更好地管理循环分发策略,而不考虑用于实际消息传递的传输类({ tcp:// | ipc:// | vmci:// | pgm:// | epgm:// | inproc:// }
)。一切触手可及。