Aeron ReplayMerge从不合并



我的ReplayMerge卡在状态ATTEMPT_LIVE_JOIN,然后由于没有进展而超时。它添加了没有问题的实时目的地(我看到相应的订阅出现在aeron-stat中,并且调用了onImageAvailable回调)。最终它会完全赶上,但不会过渡到下一个状态。

经过调查,我发现有问题的检查是在函数shouldStopAndRemoveReplay中,image.activeTransportCount() >= 2是假的,因为image.activeTransportCount()停留在1。如果没有这个检查,ReplayMerge就会成功。

以下是我的ReplayMerge参数:
replayChannel = "aeron:udp"
replayDestination = "aeron:udp?endpoint=localhost:0"
liveDestination = "aeron:udp?endpoint=localhost:0|control=localhost:12345"

我试过Java客户端和c++客户端。我错过了什么?

编辑:aeron-stat在客户端给出如下图:

42:                    1 - rcv-local-sockaddr: 41 <some IP address>:54709
43:          452,985,472 - sub-pos: 24 -106708072 3000 aeron:udp?control-mode=manual @0
44:          452,985,472 - rcv-hwm: 28 -106708072 3000 aeron:udp?control-mode=manual
45:          452,985,472 - rcv-pos: 28 -106708072 3000 aeron:udp?control-mode=manual
46:                    1 - rcv-local-sockaddr: 41 0.0.0.0:39238
47:          452,971,520 - sub-pos: 24 -106708098 3000 aeron:udp?control-mode=manual @452971520
48:          452,985,472 - rcv-hwm: 89 -106708098 3000 aeron:udp?control-mode=manual
49:          452,971,520 - rcv-pos: 89 -106708098 3000 aeron:udp?control-mode=manual

第一个驱动程序订阅来自replayDestination。如你所料,所有的数字都上升了,就像正常的重播一样。

第二个来自添加的liveDestination。一旦创建,它就完全跟不上了,这与我上面的初步评估相反。sub-posrcv-pos停留在452971520的初始位置,但rcv-hwm随着重放订阅的位置一起上升。这是否表明正在接收数据,但没有在实时目的地订阅上读取数据?

我注意到ReplayMerge#image被简单地定义为

image = subscription.imageBySessionId((int)replaySessionId);

所以我试着去轮询我传递给ReplayMerge构造函数的Subscription,这样两个图像都会在内部得到轮询。

通过确保传递给ReplayMergereplayChannel是特定于会话id的,我修复了我的问题(遇到此代码)。

File ReplayMergeTest.java在aeron代码库中使用

private final String publicationChannel = new ChannelUriStringBuilder()
// ...
.tags("1," + PUBLICATION_TAG)
// ...
;
private final String replayChannel = new ChannelUriStringBuilder()
.media(CommonContext.UDP_MEDIA)
.isSessionIdTagged(true)
.sessionId(PUBLICATION_TAG)
.build();

使重放通道的会话ID设置为与标签PUBLICATION_TAG相关联的Publication的会话ID。在发布媒体驱动程序和订阅媒体驱动程序不同的情况下,这也可以工作,但您仍然需要以某种方式将Publication标记通信给订阅者,这可能不方便。

所以我要去的解决方案是从录音的录音描述符中获取会话ID要重播,在我发现AeronArchive#listRecordingsForUri(或类似)录音的早期点。

此要点显示了跨两个媒体驱动程序的工作ReplayMerge

最新更新