我正在根据键对输入数据进行分组,然后在聚合器中执行 1 分钟的窗口,跳跃 30 秒。
数据由应用程序发送和使用,并且对该应用程序的需求将来可能会发生变化,因此,我认为需要未来的灵活性和快速更改。
当前逻辑描述如下:
@StreamListener("input")
public void process(KStream<String, Data> DataKStream) {
JsonSerde<DataAggregator> DataJsonSerde =
new JsonSerde<>(DataAggregator.class);
DataKStream
.groupByKey()
.windowedBy(TimeWindows.of(60000).advanceBy(30000))
.aggregate(
DataAggregator::new,
(key, Data, aggregator) -> aggregator.add(Data),
Materialized.with(Serdes.String(), DataJsonSerde)
);
}
DataAggregator.java
public class DataAggregator {
private List<String> dataList = new ArrayList<>();
public DataAggregator add(Data data) {
dataList.add(data.getId());
System.out.println(dataList);
return this;
}
public List<String> getDataList() {
return dataList;
}
}
但是,鉴于不断变化的需求,我想为用户提供通过菜单更改逻辑的可能性。
例如,用户可以根据需要更改窗口或更改数据隔离的方式。
我可能正在考虑编写几个 java 类,当用户选择特定选项时,可以打开和关闭这些类。
但我想知道是否可以做一些更好、更有活力的事情。
使用 Flink,有些东西在作业运行时无法更改——值得注意的是,作业图的拓扑和运算符的并行性。
另一方面,控制流可以在整个集群中广播,以影响对业务逻辑的动态更改。在简单的情况下,这已用于修改过滤器参数;在更复杂的情况下,它已被用于触发转换中使用的代码或机器学习模型的动态加载(例如,通过广播PMML(。
示例用例:RBEA:King的可扩展实时分析,StreamING模型,ING如何添加模型......
不太明显的是如何动态地重新配置聚合。开源欺诈检测演示(第 1 部分,第 2 部分,github(说明了如何实现这一点。
有关另一个示例,请参阅 Cogynt:没有代码的 Flink。