我有一个DataStream,它由具有表示一批生成元素的属性的事件组成。这个属性,我们称之为"batchNumber",在我从同一个生产批次摄取的每个事件中都是常量。我每批收到多个事件。
当"batchNumber"更改时,我想分析批次中的机器性能。我的方法是使用全局流,并使用"batchNumber"作为键对其进行分区。我希望这会将全局流划分到窗口中,在这些窗口中,每个事件都具有"batchNumber"。然后我定义了一个触发器,它应该在"batchNumber"更改时触发。然后我可以在ProcessWindowFunction中分析聚合的数据。
我的问题是:
- 当prodnr发生变化时,触发器并不总是启动
- 即使它真的着火了,也只有一个元素被聚集在一起。我预计会接近200
这是我正在使用的代码。
public class batchnrTrigger extends Trigger<ImaginePaperData, GlobalWindow> {
private static final long serialVersionUID = 1L;
public batchnrTrigger() {}
private final ValueStateDescriptor<Integer> prevbatchnr = new ValueStateDescriptor<>("batchnr", Integer.class);
@Override
public TriggerResult onElement(ImaginePaperData element, long timestamp, GlobalWindow window, TriggerContext ctx) throws Exception {
ValueState<Integer> batchnrState = ctx.getPartitionedState(prevbatchnr);
if (batchnrState == null || batchnrState.value() == null || !(element.batchnr == batchnrState.value())) {
System.out.println("batchnr BEFORE: " + batchnrState.value() + " NEW batchnr: " + element.batchnr + " ==> should fire and process elements from window!");
batchnrState.update(element.batchnr);
return TriggerResult.FIRE;
}
System.out.println("batchnr BEFORE: " + batchnrState.value() + " NEW batchnr: " + element.batchnr + " ==> should not fire and continue ingesting elements!");
batchnrState.update(element.batchnr);
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {
}
}
这就是我所说的触发:
DataStream<String> imaginePaperDataStream = nifiStreamSource
.map(new ImaginePaperDataConverter())
.keyBy((ImaginePaperData event) -> event.lunum)
.window(GlobalWindows.create())
.trigger(new LunumTrigger())
.process(new ImaginePaperWindowReportFunction());
我知道这个问题和这个问题类似。但我使用的是ValueState,我认为我的解雇条件一点也不相似。
我该怎么做?
您确定要通过event.lunum为流设置密钥吗?如果你预计每个不同的lunum值大约有200个事件,那么这是有道理的。但是,如果每个lunum值每个批次只有一个事件,这将解释您所看到的行为。
此外,您确定您的事件正在按顺序处理吗?如果批处理在处理管道中的某个地方被并行进程之间的竞争条件交错,这也可能有助于解释您所看到的情况。
此外,您应该在触发器的clear方法中清除状态。您还需要实现一个Evictor到,在处理完窗口后从窗口中删除元素。
窗口API的这一部分相当复杂。我认为这个特定的应用程序可以更直接地实现为RichFlatMap,它可以收集ListState中的项,直到批号发生变化(您可以将其保存在ValueState中(。