如何禁用JGROUPS中的FIFO和重新传输协议



我是Jgroups的新手,但根据我对文档的理解,它的一个关键优势是可以禁用不需要/想要的协议元素(以获得更好的性能(。然而,当我试图禁用任何与"FIFO"交货顺序和"保证交货"有关的东西时,我得到了以下错误:

Exception in thread "main" java.lang.Exception: events [GET_DIGEST SET_DIGEST ] are required by GMS, but not provided by any of the protocols below it
at org.jgroups.stack.Configurator.sanityCheck(Configurator.java:320)
at org.jgroups.stack.Configurator.connectProtocols(Configurator.java:197)
at org.jgroups.stack.Configurator.setupProtocolStack(Configurator.java:115)
at org.jgroups.stack.Configurator.setupProtocolStack(Configurator.java:49)
at org.jgroups.stack.ProtocolStack.setup(ProtocolStack.java:475)
at org.jgroups.JChannel.init(JChannel.java:965)
at org.jgroups.JChannel.<init>(JChannel.java:148)
at org.jgroups.JChannel.<init>(JChannel.java:130)
at RpcDispatcherTest.start(RpcDispatcherTest.java:29)
at RpcDispatcherTest.main(RpcDispatcherTest.java:83)

我的xml配置文件如下:

<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="urn:org:jgroups"
xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/jgroups.xsd">
<TCP bind_addr="127.0.0.1"
bind_port="7800"
recv_buf_size="${tcp.recv_buf_size:130k}"
send_buf_size="${tcp.send_buf_size:130k}"
max_bundle_size="64K"
sock_conn_timeout="300"
enable_diagnostics="true"
thread_pool.min_threads="10"
thread_pool.max_threads="20"
thread_pool.keep_alive_time="30000"
stats = "false"
/>
<TCPPING initial_hosts="127.0.0.1[7800]"
port_range="0" stats = "false"/>
<MERGE3  min_interval="10000"
max_interval="30000" stats = "false"/>
<FD_SOCK stats = "false"/>
<FD timeout="3000" max_tries="3" stats = "false" />
<VERIFY_SUSPECT timeout="1500" stats = "false" />
<pbcast.GMS print_local_addr="true" join_timeout="2000"
view_bundling="true" stats = "false"/>
</config>

如果我注释掉最后一个协议(pgcast.GMS协议(,我不会收到错误,而且它"似乎"在一个windows虚拟机(在谷歌云上(上工作,但如果我启动第二个jvm(仍然在同一台windows机器上(,我会注意到每个jvm都在一个"单独"的集群中,看不到另一个。(在"普通tcp.xml"配置中(包括NACKA和XXXX协议(,例如

<pbcast.NAKACK2 use_mcast_xmit="false"
discard_delivered_msgs="true"
stats = "false"/>
<UNICAST3 stats = "false"/>
<!--<pbcast.STABLE desired_avg_gossip="50000"-->
<!--max_bytes="4M"/>-->

一切都"按预期"工作,即如果我在同一台windows机器上启动第二个JVM,第二个JVM似乎确实加入了第一个JVM的集群,因此在第二个JVM上发送的消息会出现在第一个JVM中,反之亦然。

那么,有没有一种方法可以禁用UNICAST3和NAKACK2(本质上,任何与FIFO排序或保证消息传递有关的东西(,但仍然包括确保"工作完整的集群"所需的逻辑,该集群还可以捕获哪些节点离开/加入集群(例如pbcast.GMS逻辑?(我不知道如何。。。。

(背景信息:我正在努力提高性能,我怀疑性能有些慢是因为"有保证的消息传递"one_answers"FIFO"协议,我认为我不需要这些协议,因为a(我使用TCP,b(消息可以按任何顺序发送。(也就是说,我假设TCP几乎是根据定义来保证消息传递的,因为这很关键。(我也在谷歌云上,我认为TCP逻辑的"保证"方面运行在高度优化的路由器上,无论如何都不允许多播,这支持了UDP多播的主要优势之一。(

最后(我确实NOT认为这是必要的(,但这是我的测试代码(这只是对JGroups 4.0附带的演示的一个轻微修改(:

import org.jgroups.Address;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.blocks.*;
import org.jgroups.util.RspList;
import org.jgroups.util.Util;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
public class RpcDispatcherTest {
JChannel channel;
RpcDispatcher disp;
RspList rsp_list;
String             props = "gs-tcp.xml"; // set by application
public static int print(int number) throws Exception {
return number;
}
public void start() throws Exception {
RequestOptions opts=new RequestOptions(ResponseMode.GET_FIRST, 1000);
channel=new JChannel(props);
disp=new RpcDispatcher(channel, this);
channel.connect("RpcDispatcherTestGroup");
final Address myCurAddress = channel.getAddress();
System.out.println("Currrent address is " + myCurAddress + " all members address are " + channel.getView().getMembers().toString());

final long t1 = System.currentTimeMillis();
final IntStream x = IntStream.range(0, 1_000_000);
final AtomicInteger cnt = new AtomicInteger();
x.asLongStream().parallel().forEach(l -> {
try {
final int i = (int) l;
if (i % (100) == 0) {
System.out.println("At " + i + " on thread  + " + Thread.currentThread().getId());
}

final MethodCall call=new MethodCall(getClass().getMethod("print", int.class));
call.setArgs(i);
final CompletableFuture<Integer> response = disp.<Integer>callRemoteMethodWithFuture(myCurAddress, call, opts);
response.thenAccept(integer -> {
if (integer % (1024*8) == 0) {
System.out.println("At " + cnt.incrementAndGet() + " Execution time for " + integer + " is " + (System.currentTimeMillis() - t1)/1000f);
}
});
} catch (Exception e) {
e.printStackTrace();
}
});
//   Util.close(disp, channel);
}
public static void main(String[] args) throws Exception {
new RpcDispatcherTest().start();
}
}

我没有找到禁用所有可靠消息传输协议的方法,因为至少GMS协议依赖于NAKACK2GMS要求上述协议提供GET_DIGEST事件,NAKACK2提供该事件。

但去掉UNICAST3协议确实帮了很大的忙,现在性能也好多了。

最新更新