>最近我正在尝试使用Apache Flink进行快速批处理。 我有一个带有列:值和不相关的索引列的表
基本上我想计算每 5 行值的平均值和范围。然后,我将根据我刚刚计算的平均值计算平均值和标准差。所以我想最好的方法是使用Tumble
窗口。
看起来像这样
DataSet<Tuple2<Double, Integer>> rawData = {get the source data};
Table table = tableEnvironment.fromDataSet(rawData);
Table groupedTable = table
.window(Tumble.over("5.rows").on({what should I write?}).as("w")
.groupBy("w")
.select("f0.avg, f0.max-f0.min");
{The next step is to use groupedTable to calculate overall mean and stdDev}
但我不知道该用.on()
写什么.我已经尝试过"proctime"
但它说没有这样的输入。我只是希望它在从源读取时按顺序分组。但它必须是一个时间属性,所以我不能使用"f2"
- 索引列也作为排序。
我必须添加时间戳才能执行此操作吗?批处理中是否有必要,是否会减慢计算速度?解决这个问题的最佳方法是什么?
更新:我尝试在表 API 中使用滑动窗口,它让我异常。
// Calculate mean value in each group
Table groupedTable = table
.groupBy("f0")
.select("f0.cast(LONG) as groupNum, f1.avg as avg")
.orderBy("groupNum");
//Calculate moving range of group Mean using sliding window
Table movingRangeTable = groupedTable
.window(Slide.over("2.rows").every("1.rows").on("groupNum").as("w"))
.groupBy("w")
.select("groupNum.max as groupNumB, (avg.max - avg.min) as MR");
例外情况是:
线程"main"java.lang 中的异常。不支持操作异常:当前不支持在事件时间对滑动组窗口进行计数。
at org.apache.flink.table.plan.nodes.dataset.DataSetWindowAggregate.createEventTimeSlidingWindowDataSet(DataSetWindowAggregate.scala:456(
at org.apache.flink.table.plan.nodes.dataset.DataSetWindowAggregate.translateToPlan(DataSetWindowAggregate.scala:139(
。
这是否意味着表 API 不支持滑动窗口?如果我没记错的话,数据集 API 中没有窗口函数。那么如何在批处理过程中计算移动范围呢?
window
子句用于定义基于窗口函数(如Tumble
或Session
(的分组。除非指定行的顺序,否则表 API(或 SQL(中未明确定义每 5 行分组。这是在Tumble
函数的on
子句中完成的。由于此功能源自流处理,因此on
子句需要一个时间戳属性。
您可以使用currentTimestamp()
函数获取当前时间的时间戳。但是,我应该指出 Flink 会对数据进行排序,因为它不知道函数的单调属性。此外,所有这些都将以并行度 1 进行,因为没有允许分区的子句。
或者,您也可以实现用户定义的标量函数,该函数将索引属性转换为时间戳(实际上是 Long 值(。但同样,Flink 将完成完整的数据排序。