我是Apache Flink 1.10中的sum one字段,像这样,我接收RabbitMQ消息并在内存中处理,最后将其保存到MySQL,sum操作代码像这样:
consumeRecord.keyBy("gameType")
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.sum("realPumpAmount")
.addSink(new SinkFunction<ReportPump>() {
@Override
public void invoke(ReportPump value, Context context) throws Exception {
// handle sink logic
}
});
现在我想对MQ实体中的多个字段求和,如下所示:
consumeRecord.keyBy("gameType")
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.sum("field1","field2")
.addSink(new SinkFunction<ReportPump>() {
@Override
public void invoke(ReportPump value, Context context) throws Exception {
// handle sink logic
}
});
有什么办法来实现这个目的吗?
sum
reducer只接受一个字段。你可以自己写这样的减速器:
consumeRecord.keyBy("gameType")
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.reduce((d1, d2) -> {
d1.field1 += d2.field1;
d1.field2 += d2.field2;
return d1;
})
.addSink(new SinkFunction<ReportPump>() {
@Override
public void invoke(ReportPump value, Context context) throws Exception {
// handle sink logic
}
});