TimeWindow所有函数未在 apache flink 中调用



我在 apache flink 中有一个非常简单的流水线设置,流水线工作,我能够像这样将 processFunction 应用于输入数据流:

DataStream<MeasurementData> data = env.addSource(consumer);
DataStream<MeasurementData> dataProcessed =data.process(new FFT());
dataProcessed.print();
dataProcessed.addSink(new FlinkKafkaProducer011<>(
"localhost:9092",      // Kafka     broker host:port
OUTPUT_TOPIC,       // Topic to write to
new MeasurementDataSchema())  // Serializer
);  

现在,我想应用在特定时间的窗口上运行的ProcessWindowFunction,而不是为每个传入的数据点应用该函数。我是这样试过的:

DataStream<MeasurementData> dataProcessed = data.timeWindowAll(Time.minutes(5))
.process(new MyProcessWindowFunction());

以及 MyProcessWindowFunction(( 的定义:

public static class MyProcessWindowFunction extends ProcessAllWindowFunction<MeasurementData, MeasurementData, TimeWindow> {
public void process(Context context, Iterable<MeasurementData> input, Collector<MeasurementData> out) {
long count = 0;
for (MeasurementData data : input) {
for (int frequencyCounter = 0; frequencyCounter < data.data.size(); frequencyCounter++) {
matrices[frequencyCounter].addElement(data.u, data.v, data.data.get(frequencyCounter).get(0));
}
count++;
out.collect(data);
}
}
}

但是这个函数似乎永远不会被调用。我尝试将 print 语句放在那里,并使用调试器遍历整个程序。我错过了什么吗?任何提示都值得赞赏。

发现问题:环境设置为使用 EventTime 而不是 processingTime,而我的数据不包含任何事件时间戳。

相关内容

  • 没有找到相关文章

最新更新