我正在通过Flink作业从websocket中处理数据,需要根据以下逻辑输出滚动加权平均值:
每个消息都有一个属性"0";"父"name""数量"值";通过"获取最新消息;name";并与其他最新消息相结合;"父";以获得基于";金额";以及";值";
- 父=";a";;name=";m〃;;数量=100;值=12.45
- parent=";a";;name=";n〃;;数量=40;值=14.55
- parent=";a";;name=";m〃;;数量=100;值=17.45
- parent=";a";;name=";o";;数量=24;值=13.25
- parent=";a";;name=";n〃;;数量=40;值=12.55
Msg3、4和5分别是parent:name的最新消息,因此这些消息可以用于获得"的当前加权平均值;a";。在任何时候,都不知道父母有多少孩子。加权平均值的逻辑很好。更多的是如何在Flink中键入、获取最新、聚合、平均、保持状态等。
我看过RichFlatMapFunction、AggregateFunction,但事实证明很难将它们拼凑在一起。
感谢任何帮助或想法。
使用低级构建块,您可以使用KeyedProcessFunction
构建解决方案。您可以通过parent
为事件流设置关键帧,然后使用MapState<String, Event>
来跟踪每个名称的最新事件。在处理事件时,可以发出更新的结果。有关使用MapState的KeyedProcessFunction的示例,请参阅Flink文档。
如果要使用事件时间处理,则必须决定如何处理无序事件。也许您可以忽略无序的事件,或者您需要首先按时间戳对流进行排序。
在更高级别上工作时,您可以使用Flink SQL。您可以使用按父级和名称组合划分的OVER窗口来跟踪每个父级/名称组合的最新事件,然后按父级分组并计算加权平均值(可能使用用户定义的聚合函数(。有关如何使用OVER窗口获取给定密钥的最新事件流的示例,请参阅Immerok Cookbook。
免责声明:我为Immerok工作(我写了Flink文档的那一部分(。