Aggregation after an interval join



I have two streams that I interval join. Stream A is the left stream and stream B is the right stream. The code is as follows:

streamA
  .keyBy(a -> a.common_key)
  .intervalJoin(streamB.keyBy(b -> b.common_key))
  .between(Time.seconds(0), Time.minutes(5))
  .process(new ProcessJoinFunction<PojoA, PojoB, Result>() {
      @Override
      public void processElement(PojoA left, PojoB right, Context ctx, Collector<Result> out) throws Exception {
          // Emit one joined record per matching (left, right) pair.
          out.collect(Result.build(left, right));
      }
  });

After the interval join of PojoA and PojoB I get a POJO, result. It contains some dimension and metric fields from pojoA and pojoB, for example:

class result {
   long userId; // it's common key
   String name; //from pojoA
   long number; // from pojoA
   String shop; // from pojoB
   long orders; // from pojoA
   double price; // from pojoA
}

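One record in stream A may match multiple records in stream B, so after the join I need to aggregate the joined stream, summing orders and price, and set the totals on the POJO result. For example, given two joined records: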

joined record 1: (123, "nameA", 455, "shop", 3, 4.2)
joined record 2: (123, "nameA", 455, "shop", 6, 4.8)
after processing, should be: (123, "nameA", 455, "shop", 9 (= 3+6), 1 (= (4.2+4.8)/(3+6)))

How do I write an aggregate function to achieve this?

You can apply a simple reduce function after a .keyBy, like:

  .keyBy(r -> r.getUserId())
  .reduce(new YourReduceFunction())

where YourReduceFunction looks like:

public class YourReduceFunction implements ReduceFunction<result> {
    @Override
    public result reduce(result v1, result v2) throws Exception {
        // Calculate the sum of orders, the sum of prices, and the average price,
        // then return the merged result.
        // Note: you need a new sumOfPrices field to calculate the average
        // price correctly.
    }
}
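
To make the skeleton above concrete, here is a minimal sketch of the reduce logic in plain Java, without Flink. JoinedResult and ReduceSketch are hypothetical names, and sumOfPrices is the extra field the note above asks for. The sketch assumes every joined record starts with sumOfPrices equal to its own price, and that the average is the sum of prices divided by the sum of orders, as in the question's example. The static reduce method has the same shape as ReduceFunction.reduce(v1, v2), so its body could be moved into YourReduceFunction.

```java
import java.util.Arrays;
import java.util.List;

class ReduceSketch {

    // Hypothetical stand-in for the joined POJO, with the extra sumOfPrices field.
    static class JoinedResult {
        long userId;        // common key
        String name;        // from pojoA
        long number;        // from pojoA
        String shop;        // from pojoB
        long orders;
        double price;       // after reducing: sumOfPrices / orders
        double sumOfPrices; // assumption: running sum needed for the average

        JoinedResult(long userId, String name, long number, String shop,
                     long orders, double price) {
            this.userId = userId;
            this.name = name;
            this.number = number;
            this.shop = shop;
            this.orders = orders;
            this.price = price;
            this.sumOfPrices = price; // a single joined record contributes its own price
        }
    }

    // Merges two partial results into a new object, leaving the inputs untouched.
    static JoinedResult reduce(JoinedResult v1, JoinedResult v2) {
        JoinedResult out = new JoinedResult(
                v1.userId, v1.name, v1.number, v1.shop,
                v1.orders + v2.orders, 0.0);
        out.sumOfPrices = v1.sumOfPrices + v2.sumOfPrices;
        // Guard against division by zero when no orders have been seen.
        out.price = out.orders == 0 ? 0.0 : out.sumOfPrices / out.orders;
        return out;
    }

    public static void main(String[] args) {
        // The two joined records from the question.
        List<JoinedResult> joined = Arrays.asList(
                new JoinedResult(123, "nameA", 455, "shop", 3, 4.2),
                new JoinedResult(123, "nameA", 455, "shop", 6, 4.8));

        // Fold the records pairwise, as a reduce on a keyed stream would.
        JoinedResult total = joined.stream().reduce(ReduceSketch::reduce).get();
        System.out.println(total.orders + " orders, average price " + total.price);
    }
}
```

Keep in mind that reduce on a keyed stream without a window is a rolling reduce: it emits an updated result for every incoming joined record rather than one final value per key. If you only want one result per group of joined records, you would likely need to apply a window before the reduce.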
