如何在Flink中传递"execution.buffer timeout"配置?我尝试在flink-conf.yaml
中添加此配置,但没有成功。我们需要在哪里设置此配置?
我试着用下面的代码编程设置它,但不知怎么的,它并没有减少我的延迟。我只是想知道我是否可以通过配置文件设置它
StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(hots, port, null));
env.setBufferTimeout(2);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, streamSettings);
tableEnv.executeSql(query);
不确定我是否遗漏了什么。
更新
为了测试该属性是否被传递给Flink,我尝试在flink-conf.yaml
中将BufferTimeout
属性设置为-2,但它没有引发任何异常。当我在代码中将其设置为-2时,出现了异常。这让我觉得这处房产并没有被移交给Flink。此外,我的延迟数有一个明确的模式,从45ms到100ms连续变化。这表明Flink在交付之前发生了一些缓冲。
你能给我指一下我们在Flink github repo中设置这个的文件吗?
env.setBufferTimeout(2)
,或在flink-conf.yaml中设置execution.buffer-timeout: 2
(或通过-Dexecution.buffer-timeout=2
在命令行上设置(应该可以工作。
请注意,如果缓冲区没有超时,这不会有任何明显的影响。也许你的缓冲区通常是满的?您可以根据度量(numBytesInRemote / numBuffersInRemote
(计算平均缓冲区大小,并将其与缓冲区大小(32KiB(进行比较,以估计缓冲区超时的频率。
请注意,如果您将flink作为一个小型集群运行(换句话说,像在IDE中一样,在一个JVM中运行客户端应用程序、作业管理器和任务管理器(,则不会读取flink-conf.yaml。