我有一个奇怪的问题:当我在其他处理(BulkIteration)之前在数据集上使用count()时,apache flink只会执行count()的计划并跳过我的其他操作。我在日志中找不到任何有关此内容的内容。
此外,这在我的 IDE 中不会发生。那里的所有操作都有效。只有当我通过WebUI上传它时,才会出现这种问题。
那么:这是一个普遍的问题吗?如何在不必自己计算值计数的情况下解决这个问题?
谢谢!
更新:
代码做类似的事情(嗯,我知道,这个例子不是为生产性代码设计的,但它显示了我的问题)。
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.tuple.Tuple1;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
public class CountProblemExample {
public static void main(String[] args) throws Exception {
Random rnd = new Random();
int randomNumber = 100000 + rnd.nextInt(100000);
List<Double> doubles = new LinkedList<>();
for (int i = 0; i < randomNumber; i++) {
doubles.add(rnd.nextDouble());
}
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Double> doubleDataSet = env.fromCollection(doubles);
final int count = (int)doubleDataSet.count(); // In the UI there the code stops further execution
DataSet<Double> avgSet = doubleDataSet
.map(new MapFunction<Double, Tuple1<Double>>() {
@Override
public Tuple1<Double> map(Double value) throws Exception {
return new Tuple1<>(value);
}
})
.aggregate(Aggregations.SUM, 0)
.map(new MapFunction<Tuple1<Double>, Double>() {
@Override
public Double map(Tuple1<Double> t) throws Exception {
double avg = 0;
if (count > 0) {
avg = t.f0 / count;
}
return avg;
}
});
double avg = avgSet
.collect()
.get(0);
System.out.println(avg);
}
}
你可能忘了打电话给ExecutionEnvironment.execute()
.在调用该方法之前,不会执行数据集作业。
DataSet.count()
和DataSet.collect()
内部也会触发执行。