I am interval-joining two streams: stream A is the left stream and stream B is the right stream. The code is as follows:
streamA
    .keyBy((PojoA a) -> a.common_key)
    .intervalJoin(
        streamB
            .keyBy((PojoB b) -> b.common_key)
    )
    // match each A with every B whose timestamp is in [A.ts, A.ts + 5 min]
    .between(Time.seconds(0), Time.minutes(5))
    .process(new ProcessJoinFunction<PojoA, PojoB, Result>() {
        @Override
        public void processElement(PojoA left, PojoB right, Context ctx, Collector<Result> out) throws Exception {
            out.collect(Result.build(left, right));
        }
    })
After the interval join, I get a POJO result built from PojoA and PojoB. result contains some dimension and measure fields from pojoA and pojoB, for example:
class result {
    long userId;   // the common key
    String name;   // from pojoA
    long number;   // from pojoA
    String shop;   // from pojoB
    long orders;   // from pojoA
    double price;  // from pojoA
}
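One stream A record may match multiple stream B records, so after the join I need to aggregate the joined stream: sum orders and price, and set the totals on the POJO result. For example, given two joined records: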
joined record 1: (123, "nameA", 455, "shop", 3, 4.2)
joined record 2: (123, "nameA", 455, "shop", 6, 4.8)
after processing, should be: (123, "nameA", 455, "shop", 9 (3+6), 1 ((4.2+4.8)/(3+6)))
How do I write an aggregate function to achieve this?
You can apply a simple reduce function after .keyBy, for example:
.keyBy(r -> r.getUserId())
.reduce(new YourReduceFunction())
where YourReduceFunction looks like:
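public class YourReduceFunction implements ReduceFunction<result> {
    @Override
    public result reduce(result v1, result v2) throws Exception {
        // Calculate the sum of orders, the sum of prices, and the average price.
        // Note: result needs a new sumOfPrices field to calculate the average
        // price correctly. This assumes each joined record sets sumOfPrices = price.
        v1.orders += v2.orders;
        v1.sumOfPrices += v2.sumOfPrices;
        v1.price = v1.sumOfPrices / v1.orders;
        return v1;
    }
}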
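To check the arithmetic against the two joined records from the question without running a Flink job, here is a minimal plain-Java sketch of the same reduce logic. The Result class, its constructor, and the class name ReduceSketch are assumptions made for illustration. sumOfPrices is the extra field mentioned above, and the sketch assumes it starts out equal to each record's own price.

```java
public class ReduceSketch {

    /** Hypothetical stand-in for the joined POJO, with the extra sumOfPrices field. */
    static class Result {
        long userId;
        String name;
        long number;
        String shop;
        long orders;
        double price;        // after reducing: sumOfPrices / orders
        double sumOfPrices;  // running sum of the original prices

        Result(long userId, String name, long number, String shop, long orders, double price) {
            this.userId = userId;
            this.name = name;
            this.number = number;
            this.shop = shop;
            this.orders = orders;
            this.price = price;
            this.sumOfPrices = price; // assumption: a fresh record carries only its own price
        }
    }

    /** Same shape as ReduceFunction#reduce: combines two partial results into one. */
    static Result reduce(Result v1, Result v2) {
        // Build a new object instead of mutating the inputs.
        Result out = new Result(v1.userId, v1.name, v1.number, v1.shop,
                v1.orders + v2.orders, 0.0);
        out.sumOfPrices = v1.sumOfPrices + v2.sumOfPrices;
        // Average as defined in the question: sum of prices / sum of orders.
        out.price = out.orders == 0 ? 0.0 : out.sumOfPrices / out.orders;
        return out;
    }

    public static void main(String[] args) {
        Result r1 = new Result(123, "nameA", 455, "shop", 3, 4.2);
        Result r2 = new Result(123, "nameA", 455, "shop", 6, 4.8);
        Result total = reduce(r1, r2);
        System.out.println(total.orders + " " + total.price);
    }
}
```

Because the sums are carried in orders and sumOfPrices, the function gives the same totals no matter how many records are folded in or in what grouping. Note that reduce on a keyed stream without a window emits an updated running result for every incoming record, so downstream operators see intermediate values as well as the latest one.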