我正在尝试使用Apache Flink ML包的随机异常值选择模型。
我无法弄清楚如何使用它作为 Kafka 作为数据源,我知道它需要一个数据集而不是数据流,但我似乎无法将我的 Kafka 数据流窗口化为数据集。
有没有办法将我的流视为一系列小数据集。例如,有没有办法说流中与模式匹配的每 10 个元素(按元素唯一 ID 滑动窗口(将它们视为固定大小的数据集并检测此固定大小数据集中的任何异常值?
我要创建的场景是:
数据源 -> 卡夫卡主题 1 -> Flink 预处理 -> 卡夫卡主题 2 -按 ID> Flink 组 -> 组上的异常值检测
我已经有一个预处理的工作实现,我希望 Flink 能够满足我的要求?
我想你可以创建一个基于计数的全局窗口并使用执行环境来获取数据集。类似下面的方法可能会起作用(getResult 将返回一个数据集(:
stream.
keyBy(...).
window(GlobalWindows.create).
trigger(CountTrigger.of(10)).
aggregate(new MyAggregator()).
...
class MyAggregator extends AggregateFunction[..., ..., ...] {
var valueList: List[LabeledVector] = List[LabeledVector]()
override def createAccumulator(): MyAggregator = new MyAggregator()
override def add(value: .., accumulator: MyAggregator): ... = ...
override def merge(agg1: MyAggregator, agg2: MyAggregator): ... = ...
override def getResult(accumulator: MyAggregator): ... = {
ExecutionEnvironment.getExecutionEnvironment.fromCollection(valueList)
}
}