闪烁:打印和写入文件不起作用



我有这个管道:KafkaProducer->主题1->FlinkConsumer->主题2->KafkaConsumer

我试图提取管道每个阶段的记录时间:

在Flink java应用程序中,我做了这样的事情:

inputstream.
// To calculate flink input time
map(new MapFunction<String, String>() {
@Override
public String map(String s) throws Exception {
System.out.printf("source time : %dn",System.nanoTime());
writeDataLineByLine("flinkinput_data.csv",-1,System.nanoTime());
return s;
}
}).
// Process
map(new MapFunction<String, String>() {
@Override
public String map(String record) throws InterruptedException {
for(int i=0;i<2;i++)
Thread.sleep(1);
return record + " mapped";
}
}).
// To calculate flink output time
map(new MapFunction<String, String>() {
@Override
public String map(String s) throws Exception {
System.out.printf("sink time : %dn",System.nanoTime());
writeDataLineByLine("flinkoutput_data.csv",-1,System.nanoTime());
return s;
}
}).
addSink(producer);

虽然这在Intellij的迷你集群中工作,但在独立集群中不工作。有人能向我解释为什么打印和写入csv行被忽略吗?

任务管理器向stdout写入的内容都会进入每个任务管理器节点上Flink日志目录中的文件中。

相关内容

  • 没有找到相关文章

最新更新