我正在使用Flink 1.4.0
.
假设我有一个如下POJO
:
public class Rating {
public String name;
public String labelA;
public String labelB;
public String labelC;
...
}
和一个JOIN
函数:
public class SetLabelA implements JoinFunction<Tuple2<String, Rating>, Tuple2<String, String>, Tuple2<String, Rating>> {
@Override
public Tuple2<String, Rating> join(Tuple2<String, Rating> rating, Tuple2<String, String> labelA) {
rating.f1.setLabelA(labelA)
return rating;
}
}
假设我想应用一个JOIN
操作来设置DataSet<Tuple2<String, Rating>>
中每个字段的值,我可以执行以下操作:
DataSet<Tuple2<String, Rating>> ratings = // [...]
DataSet<Tuple2<String, Double>> aLabels = // [...]
DataSet<Tuple2<String, Double>> bLabels = // [...]
DataSet<Tuple2<String, Double>> cLabels = // [...]
...
DataSet<Tuple2<String, Rating>>
newRatings =
ratings.leftOuterJoin(aLabels, JoinOperatorBase.JoinHint.REPARTITION_SORT_MERGE)
// key of the first input
.where("f0")
// key of the second input
.equalTo("f0")
// applying the JoinFunction on joining pairs
.with(new SetLabelA());
不幸的是,这是必要的,因为评级和所有xLabels
都非常大DataSets
,我被迫查看每个xlabels
以找到我需要的字段值,而同时并非所有评级键都存在于每个xlabels
中。
这实际上意味着我必须执行每xlabel
leftOuterJoin
,为此我还需要创建相应的JoinFunction
实现,以利用Rating
POJO
中的正确setter。
有没有任何人都可以想到的更有效的方法来解决这个问题?
就分区策略而言,我确保使用以下方式对DataSet<Tuple2<String, Rating>> ratings
进行排序:
DataSet<Tuple2<String, Rating>> sorted_ratings = ratings.sortPartition(0, Order.ASCENDING).setParallelism(1);
通过将并行度设置为 1,我可以确定整个数据集将被排序。然后我使用.partitionByRange
:
DataSet<Tuple2<String, Rating>> partitioned_ratings = sorted_ratings.partitionByRange(0).setParallelism(N);
其中N
是我的 VM 上的核心数。我在这里遇到的另一个侧面问题是,设置为 1 的第一个.setParallelism
是否在管道其余部分的执行方式方面受到限制,即后续.setParallelism(N)
能否更改DataSet
的处理方式?
最后,我做了所有这些操作,以便当partitioned_ratings
与xlabels
DataSet
连接时,JOIN 操作将使用JoinOperatorBase.JoinHint.REPARTITION_SORT_MERGE
完成。根据v.1.4.0
Flink
文档:
REPARTITION_SORT_MERGE:系统对每个输入进行分区(随机播放)(除非输入已经分区)并对每个输入进行排序(除非它已经排序)。输入通过排序输入的流合并来连接。如果一个或两个输入已排序,则此策略很好。
所以就我而言,ratings
被排序(我认为),而每个xlabels
DataSets
都没有,因此这是最有效的策略是有道理的。这有什么问题吗?有什么替代方法吗?
到目前为止,我还没有能够完成这个策略。似乎依靠JOINs
太麻烦了,因为它们是昂贵的操作,除非真的有必要,否则应该避免它们。
例如,如果两个Datasets
的大小都非常大,则应使用JOINs
。如果不是,一个方便的替代方法是使用BroadCastVariables
,通过该,两个Datasets
中的一个(最小的)在工人之间广播,无论使用什么目的。下面显示了一个示例(为方便起见,从此链接复制)
DataSet<Point> points = env.readCsv(...);
DataSet<Centroid> centroids = ... ; // some computation
points.map(new RichMapFunction<Point, Integer>() {
private List<Centroid> centroids;
@Override
public void open(Configuration parameters) {
this.centroids = getRuntimeContext().getBroadcastVariable("centroids");
}
@Override
public Integer map(Point p) {
return selectCentroid(centroids, p);
}
}).withBroadcastSet("centroids", centroids);
此外,由于填充 POJO 的字段意味着将反复利用非常相似的代码,因此绝对应该使用jlens
来避免代码重复并编写更简洁且易于遵循的解决方案。