具有2个传入边缘的处理器-当在一个边缘返回false时,请继续从同一边缘重新处理,而不要在另一个边缘处理新项目



我要求确认我对tryProcess((逻辑的假设。

详细说明返回值(真/假(如何影响处理器上的DAG工作流,该处理器有2条传入边,但没有指定优先级。

我的假设是,如果处理器的两个边缘都有传入项,并且其中一个tryProcess((返回false,则将处理另一个边缘(如果在这样的边缘上有更多传入项可用(。根据哪个边停止处理和哪个边接受传入项,交替传入项。

问题

有时会发生这样的情况:一个处理器实例阻塞tryProcess(#0(,它总是返回false(因为我们希望从另一边处理新项(。tryProcess(#0(被重复调用,而tryProcess(#1(从未被调用。我确信completeEdge((从来没有在处理器上被调用过,无论是对于#0还是#1边缘,所以我希望从边缘#1有更多的项目要处理。这种情况通常发生在多次运行同一作业之后。

为了更好地解释这个问题,这是我的用例:

用例

我的数据模型由以下对象组成

  • A:由"ida"属性标识的对象
  • B: 由"idb"属性标识的对象。它引用了使用"ida"值的a
  • AB:将B对象与其引用的A对象耦合的对象

我需要将B对象与正确引用的A对象匹配,并发射其中的几个对象。

我有一个DAG与这个设置:

顶点

  • S-A:类型为"A"的源项(localParallelism(1(,发出按"ida"属性排序的A对象(
  • S-B:类型为"B"的源项(localParallelism(1(,发出按引用的"ida"属性排序的B对象(
  • C-AB:将B对象与引用的A对象匹配的处理器(发射AB对象(

连接

  • S-A->C-AB:传入边缘#0(未指定优先级,由"ida"属性分区(
  • S-B->C-AB:传入边缘#1(未指定优先级,通过引用"ida"属性进行分区(

环境由一个有2个节点的榛色喷流集群组成。

逻辑

C-AB处理器获取第一个"A"对象(来自边#0(,并将其保留,直到与该"A"物体相关的所有"B"物体都得到处理。如果它接收到另一个"A"对象,则在tryProcess(#0(中返回false。

当它接收与当前"A"匹配的"B"对象(来自边#1(时,它会发出"AB"。

当处理器接收到一个"B"对象并引用下一个"a"对象时,它将丢弃当前的"a"并等待下一个。

如果它在拥有引用的"A"对象之前接收到"B"对象,请等待正确的"A"匹配,如果接收到新的"B",则在tryProcess(#1(中返回false。

这应该是有效的,因为S-A和S-B发出正确排序的对象,并且边缘被正确分区,以便将具有相同"ida"值的对象发送到同一处理器。

我的假设是,如果处理器的两条边都有传入项,并且其中一条tryProcess((返回false,则将处理另一条边(如果在这条边上有更多传入项可用(。

这个假设是错误的。处理器的行为相当于

for (Object item : inbox) process(item);

但是使用协作多线程来实现,这就是为什么这个循环必须能够"挂起"自己。我们通过让tryProcess()返回false来实现暂停。

执行引擎总是从它停止的地方恢复处理器,并且在收件箱清空之前不会尝试处理任何其他项目。收件箱本身包含从输入队列中提取的一批项目,而不是边缘在整个作业过程中传输的所有项目。

Jet为解决边之间的共同依赖性提供的唯一机制是边优先级。如果您需要更细粒度的东西,那么您的处理器应该接受所有项目,并在内部缓冲它们,直到您的进度条件得到满足。

最新更新