用于在 Flink 数据集上应用多个 JOIN 的分区策略



我正在使用Flink 1.4.0.

假设我有一个如下POJO

public class Rating {
public String name;
public String labelA;
public String labelB;
public String labelC;
...        
}

和一个JOIN函数:

public class SetLabelA implements JoinFunction<Tuple2<String, Rating>, Tuple2<String, String>, Tuple2<String, Rating>> {
@Override
public Tuple2<String, Rating> join(Tuple2<String, Rating> rating, Tuple2<String, String> labelA) {
rating.f1.setLabelA(labelA)
return rating;
}
}

假设我想应用一个JOIN操作来设置DataSet<Tuple2<String, Rating>>中每个字段的值,我可以执行以下操作:

DataSet<Tuple2<String, Rating>> ratings = // [...]
DataSet<Tuple2<String, Double>> aLabels = // [...]
DataSet<Tuple2<String, Double>> bLabels = // [...]
DataSet<Tuple2<String, Double>> cLabels = // [...]
...
DataSet<Tuple2<String, Rating>>
newRatings =
ratings.leftOuterJoin(aLabels, JoinOperatorBase.JoinHint.REPARTITION_SORT_MERGE)
// key of the first input
.where("f0")
// key of the second input
.equalTo("f0")
// applying the JoinFunction on joining pairs
.with(new SetLabelA());

不幸的是,这是必要的,因为评级和所有xLabels都非常大DataSets,我被迫查看每个xlabels以找到我需要的字段值,而同时并非所有评级键都存在于每个xlabels中。

这实际上意味着我必须执行每xlabelleftOuterJoin,为此我还需要创建相应的JoinFunction实现,以利用RatingPOJO中的正确setter。

有没有任何人都可以想到的更有效的方法来解决这个问题?

就分区策略而言,我确保使用以下方式对DataSet<Tuple2<String, Rating>> ratings进行排序:

DataSet<Tuple2<String, Rating>> sorted_ratings = ratings.sortPartition(0, Order.ASCENDING).setParallelism(1);

通过将并行度设置为 1,我可以确定整个数据集将被排序。然后我使用.partitionByRange

DataSet<Tuple2<String, Rating>> partitioned_ratings = sorted_ratings.partitionByRange(0).setParallelism(N);

其中N是我的 VM 上的核心数。我在这里遇到的另一个侧面问题是,设置为 1 的第一个.setParallelism是否在管道其余部分的执行方式方面受到限制,即后续.setParallelism(N)能否更改DataSet的处理方式?

最后,我做了所有这些操作,以便当partitioned_ratingsxlabelsDataSet连接时,JOIN 操作将使用JoinOperatorBase.JoinHint.REPARTITION_SORT_MERGE完成。根据v.1.4.0Flink文档:

REPARTITION_SORT_MERGE:系统对每个输入进行分区(随机播放)(除非输入已经分区)并对每个输入进行排序(除非它已经排序)。输入通过排序输入的流合并来连接。如果一个或两个输入已排序,则此策略很好。

所以就我而言,ratings被排序(我认为),而每个xlabelsDataSets都没有,因此这是最有效的策略是有道理的。这有什么问题吗?有什么替代方法吗?

到目前为止,我还没有能够完成这个策略。似乎依靠JOINs太麻烦了,因为它们是昂贵的操作,除非真的有必要,否则应该避免它们。

例如,如果两个Datasets的大小都非常大,则应使用JOINs。如果不是,一个方便的替代方法是使用BroadCastVariables,通过该,两个Datasets中的一个(最小的)在工人之间广播,无论使用什么目的。下面显示了一个示例(为方便起见,从此链接复制)

DataSet<Point> points = env.readCsv(...);
DataSet<Centroid> centroids = ... ; // some computation
points.map(new RichMapFunction<Point, Integer>() {
private List<Centroid> centroids;
@Override
public void open(Configuration parameters) {
this.centroids = getRuntimeContext().getBroadcastVariable("centroids");
}
@Override
public Integer map(Point p) {
return selectCentroid(centroids, p);
}
}).withBroadcastSet("centroids", centroids);

此外,由于填充 POJO 的字段意味着将反复利用非常相似的代码,因此绝对应该使用jlens来避免代码重复并编写更简洁且易于遵循的解决方案。

最新更新