I have a DataSet<Tuple3<String,String,Double>> values containing the following data:
<Vijaya,Chocolate,5>
<Vijaya,Chips,10>
<Rahul,Chocolate,2>
<Rahul,Chips,8>
I want to get a DataSet<Tuple5<String,String,Double,String,Double>> values1 like this:
<Vijaya,Chocolate,5,Chips,10>
<Rahul,Chocolate,2,Chips,8>
My code looks like this:
DataSet<Tuple5<String, String, Double, String, Double>> values1 = values.fullOuterJoin(values)
    .where(0)
    .equalTo(0)
    .with(
        new JoinFunction<Tuple3<String, String, Double>, Tuple3<String, String, Double>, Tuple5<String, String, Double, String, Double>>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Tuple5<String, String, Double, String, Double> join(Tuple3<String, String, Double> first, Tuple3<String, String, Double> second) {
                return new Tuple5<String, String, Double, String, Double>(first.f0, first.f1, first.f2, second.f1, second.f2);
            }
        })
    .distinct(1, 3)
    .distinct(1);
In the code above I'm trying to do a self-join. I want the output in exactly that format, but I can't get it. How can I do this? Please help.
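Since you don't want the same item repeated in the output, you can use a flat join (a FlatJoinFunction), in which you emit only the records whose second field (f1) differs from their fourth field (f3). The inequality check alone still leaves both orderings for each name, e.g. <Vijaya,Chocolate,5,Chips,10> and <Vijaya,Chips,10,Chocolate,5>, so if you only want "Chocolate" in the second field, check for that in the FlatJoinFunction as well. Flink's documentation on joins with a flat-join function is linked below.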
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/batch/dataset_transformations.html#join-with-flat-join-function
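The flat-join filtering logic can be checked without a Flink cluster. What follows is a plain-Java sketch, not Flink code: the nested loops stand in for values.join(values).where(0).equalTo(0), and the continue checks play the role of a FlatJoinFunction that calls out.collect(...) only for pairs that pass. The names SelfJoinSketch and Row are hypothetical stand-ins for the Flink job and for Tuple3.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch (no Flink runtime): emulates a self-join on the name field
// and applies the same filters a FlatJoinFunction would apply before collecting.
public class SelfJoinSketch {

    // Hypothetical stand-in for Tuple3<String, String, Double>.
    static final class Row {
        final String name;
        final String item;
        final double qty;

        Row(String name, String item, double qty) {
            this.name = name;
            this.item = item;
            this.qty = qty;
        }
    }

    // Returns the kept joined pairs, each formatted like a Tuple5.
    static List<String> selfJoin(List<Row> values) {
        List<String> out = new ArrayList<>();
        for (Row first : values) {
            for (Row second : values) {
                if (!first.name.equals(second.name)) continue; // join condition: where(0).equalTo(0)
                if (first.item.equals(second.item)) continue;  // drop pairs whose f1 equals f3
                if (!"Chocolate".equals(first.item)) continue; // keep only "Chocolate" in f1
                out.add("<" + first.name + "," + first.item + "," + first.qty + ","
                        + second.item + "," + second.qty + ">");
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Row> values = Arrays.asList(
                new Row("Vijaya", "Chocolate", 5),
                new Row("Vijaya", "Chips", 10),
                new Row("Rahul", "Chocolate", 2),
                new Row("Rahul", "Chips", 8));
        // Prints <Vijaya,Chocolate,5.0,Chips,10.0> and then <Rahul,Chocolate,2.0,Chips,8.0>
        selfJoin(values).forEach(System.out::println);
    }
}
```

Hard-coding "Chocolate" assumes every name has a Chocolate row; a name without one produces no output at all.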
An approach using a GroupReduceFunction:
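import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.util.Collector;

DataSet<Tuple2<String, String>> grouped = values
    .groupBy(0)
    .reduceGroup(new GroupReduceFunction<Tuple3<String, String, Double>, Tuple2<String, String>>() {
        @Override
        public void reduce(Iterable<Tuple3<String, String, Double>> in, Collector<Tuple2<String, String>> out) {
            // Concatenate the "item,quantity" pairs of one name, e.g. "Chocolate,5.0,Chips,10.0"
            StringBuilder output = new StringBuilder();
            String name = null;
            for (Tuple3<String, String, Double> item : in) {
                name = item.f0;
                if (output.length() > 0) {
                    output.append(","); // separator only between pairs, so no trailing comma
                }
                output.append(item.f1).append(",").append(item.f2);
            }
            out.collect(new Tuple2<String, String>(name, output.toString()));
        }
    });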
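The GroupReduceFunction above emits a single concatenated string per name rather than the requested five-field record. Below is a plain-Java sketch (no Flink runtime) of the same group-then-reduce idea that builds the five-field row directly. It assumes, hypothetically, that each name has exactly one "Chocolate" row and one other row; GroupReduceSketch and Row are illustrative names, not Flink classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch (no Flink runtime) of groupBy(0) followed by a group-reduce
// that emits one five-field row per name instead of a concatenated string.
public class GroupReduceSketch {

    // Hypothetical stand-in for Tuple3<String, String, Double>.
    static final class Row {
        final String name;
        final String item;
        final double qty;

        Row(String name, String item, double qty) {
            this.name = name;
            this.item = item;
            this.qty = qty;
        }
    }

    // Groups rows by name (the role of groupBy(0)), keeping first-seen order.
    static Map<String, List<Row>> groupByName(List<Row> values) {
        Map<String, List<Row>> groups = new LinkedHashMap<>();
        for (Row r : values) {
            groups.computeIfAbsent(r.name, k -> new ArrayList<>()).add(r);
        }
        return groups;
    }

    // Reduces one group to "<name,Chocolate,qty,otherItem,otherQty>".
    // Assumption: exactly one "Chocolate" row and one other row per name.
    static String reduce(String name, List<Row> group) {
        Row chocolate = null;
        Row other = null;
        for (Row r : group) {
            if ("Chocolate".equals(r.item)) chocolate = r; else other = r;
        }
        if (chocolate == null || other == null) {
            throw new IllegalArgumentException("expected a Chocolate row and one other row for " + name);
        }
        return "<" + name + "," + chocolate.item + "," + chocolate.qty + ","
                + other.item + "," + other.qty + ">";
    }

    static List<String> run(List<Row> values) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, List<Row>> e : groupByName(values).entrySet()) {
            out.add(reduce(e.getKey(), e.getValue()));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Row> values = Arrays.asList(
                new Row("Vijaya", "Chocolate", 5),
                new Row("Vijaya", "Chips", 10),
                new Row("Rahul", "Chocolate", 2),
                new Row("Rahul", "Chips", 8));
        run(values).forEach(System.out::println);
    }
}
```

Picking the Chocolate row by its value rather than by its position in the group means the result does not depend on the order in which a group's elements arrive.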