由于性能测量,我想逐步执行为Flink编写的Scala程序,即
execute first operator; materialize result;
execute second operator; materialize result;
...
等等。原始代码:
var filename = new String("<filename>")
var text = env.readTextFile(filename)
var counts = text.flatMap { _.toLowerCase.split("\W+") }.map { (_, 1) }.groupBy(0).sum(1)
counts.writeAsText("file://result.txt", WriteMode.OVERWRITE)
env.execute()
所以我希望var counts = text.flatMap { _.toLowerCase.split("\W+") }.map { (_, 1) }.groupBy(0).sum(1)
的执行是逐步的。
在每个操作员之后调用env.execute()
是正确的方法吗?
还是在每次操作后写入/dev/null
,即调用counts.writeAsText("file:///home/username/dev/null", WriteMode.OVERWRITE)
,然后调用env.execute()
是更好的选择?Flink真的有NullSink
这样的东西吗?
edit:我在集群上使用Flink Scala Shell,并将应用程序的并行度设置为1,以执行上述代码。
Flink默认使用流水线数据传输来提高作业执行的性能。但是,您也可以通过调用来强制批量数据传输
ExecutionEnvironment env = ...
env.getConfig().setExecutionMode(ExecutionMode.BATCH_FORCED);
这将分离两个运算符的执行(除非它们是链接的)。您可以从日志文件中获取每个任务的执行时间,也可以查看web面板。请注意,这不适用于链式运营商,即具有相同并行性且不需要网络洗牌的运营商。此外,您应该注意,使用批处理传输会增加程序的总体执行时间。我不认为在流水线数据处理器中真正分离运算符的执行时间是可能的。
在每个运算符之后调用execute()
将不起作用,因为Flink还不支持在内存中缓存结果。因此,如果执行运算符2,则需要将运算符1的结果写入某个持久存储并再次读取,或者再次执行运算符1。