我实际上正在处理一个流,接收一堆字符串,需要对所有字符串进行计数。 总和被加重,这意味着对于第二条记录,总和被添加到前一天输出必须是看起来像
{
"aggregationType" : "day",
"days before" : 2,
"aggregates" : [
{"date" : "2018-03-03",
"sum" : 120},
{"date" :"2018-03-04",
"sum" : 203}
]
}
我创建了一个看起来像这样的流:
val eventStream : DataStream [String] =
eventStream
.addSource(source)
.keyBy("")
.TimeWindow(Time.days(1), Time.days(1))
.trigger(new MyTriggerFunc)
.aggregation(new MyAggregationFunc)
.addSink(sink)
提前感谢您的帮助:)
关于在 Flink 中使用 JSON 的说明:
使用 JSONDeserializationSchema
反序列化事件,这将产生ObjectNode
。为方便起见,您可以将ObjectNode
映射到YourObject
,也可以继续使用ObjectNode
。
使用ObjectNode
教程:http://www.baeldung.com/jackson-json-node-tree-model
回到您的案例,您可以像下面这样操作:
val eventStream : DataStream [ObjectNode] =
oneMinuteAgg
.addSource(source)
.windowAll()
.TimeWindow(Time.minutes(1))
.trigger(new MyTriggerFunc)
.aggregation(new MyAggregationFunc)
将输出 1 分钟聚合的流
[
{
"date" :2018-03-03
"sum" : 120
},
{
"date" :2018-03-03
"sum" : 120
}
]
然后将另一个运算符链接到"oneMinuteAgg",该运算符会将 1 分钟聚合添加到 1 天聚合中:
[...]
oneMinuteAgg
.windowAll()
.TimeWindow(Time.days(1))
.trigger(new Whatever)
.aggregation(new YourDayAggF)
这将输出您需要的内容
{
"aggregationType" : "day"
"days before" : 4
"aggregates : [{
"date" :2018-03-03
"sum" : 120
},
{
"date" :2018-03-03
"sum" : 120
}]
}
我曾经windowAll()
假设您不需要对流进行键控。