如何根据PCollection的大小编写一个Beam条件



我有一个包含许多MyResult对象的PCollection。

PCollection<MyResult> myResultCollection = ....

我想检查这个PCollection,这样如果它是空的,然后插入一个虚拟的MyResult对象。

我知道count . global()可以用来计算PCollection的大小。它返回一个包含单个LONG值的PCollection。

然而,我不知道如何从PCollection中提取长值(可能不允许),这样我就可以做这样的事情:

// Psudo-Code

PCollection<MyResult> myResultCollection = ....
PCollection<Long> sizeCollection = myResultCollection.apply(Count.globally());

Long size = sizeCollection.getValue() // I know this method does not exist
if(size == 0) {
myResultCollection.add(new MyResult());
}
return myResultCollection;

编辑:

我试着实现@Louis建议的想法如下:

public class MyDummyGeneration extends SimpleFunction<Long, MyClass> { 
public MyClass apply(final Long resultCount) {
if(resultCount == 0) {
return MyUtils.createDummyMyClass();
} else {
return null;    // This caused exception
}
}
}

public class MyClassPostProcessingTransform extends PTransform<PCollection<MyClass>, PCollection<MyClass>> {
public PCollection<MyClass> expand(final PCollection<MyClass> input) {
var count = input.apply(Count.globally());
var dummyPCollection = count.apply(MapElements.via(new MyDummyGeneration()));
var collections = PCollectionList.of(diffResult).and(dummyPCollection);
return collections.apply(Flattern.pCollections());
}    
}

return null;引起异常,因为它不允许。我不知道如何表示这样的逻辑:如果长度不为零,我不希望PCollection包含任何元素。

我想澄清的一件大事是:当您编写Beam管道时,所有的计算都被延迟。这就是为什么sizeCollection.getValue()不存在的原因,因为这意味着启动管道的主程序和运行管道之间的同步。

第二件事是,我们应该从你的端到端需求开始,以便了解如何做到最好。PCollection中可能为空也可能为空的数据来自哪里?你打算在下游做什么?

几个例子:

  • 如果你要做一个下游聚合,你可以无条件地插入一个虚拟元素,在任何非空的聚合
  • 中被忽略。
  • 如果你已经在上游有一个聚合,你可以将结果作为一个默认值
  • 的侧输入。

相关内容

  • 没有找到相关文章