为什么 Flink 停止我的流应用程序



我的代码使用 readTextFile 来读取日志文件,当我在 Flink (/opt/flink-1.0.3/bin/flink run -m yarn-cluster -yn 2 /home/flink/flink-json-0.1.jar 中运行 jar 时,它成功地处理了里面的行并停止了我的应用程序,而不是等待新行。我需要一些参数来做到这一点吗?

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.readTextFile("hdfs:///test/ignicion.io")

提前谢谢你

您正在寻找

StreamExecutionEnvironment.readFileStream(String filePath, long intervalMillis, WatchType watchType) 

对于监视类型,您有以下选项

  • ONLY_NEW_FILES,
  • REPROCESS_WITH_APPENDED,
  • PROCESS_ONLY_APPENDED;

流来自

StreamExecutionEnvironment.readTextFile(String filePath, String charsetName)

将在读取所有文件后完成。我认为,它主要用于开发过程中的本地测试。

相关内容

  • 没有找到相关文章

最新更新