我已经设置了我的第一个玩具flink,我想做一个非常简单的事情:不断读取本地文件并打印内容。
问题是,每次我更新本地文件时,都会打印所有行,我希望它仅打印新添加的行。
代码段:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
String path = "/home/foobar/input";
TextInputFormat inputFormat = new TextInputFormat(new Path(path));
inputFormat.setCharsetName("UTF-8");
DataStreamSource<String> ds = env.readFile(inputFormat, path,
FileProcessingMode.PROCESS_CONTINUOUSLY, 60000l, BasicTypeInfo.STRING_TYPE_INFO);
ds.print();
env.execute("jobname02");
有人知道我在这里做错了吗?感谢您的帮助。
您没有做错任何事情,这是process_conniunly模式的记录行为:
如果watchtype设置为fileProcessingMode.process_continely, 修改文件时,其内容将完全重新处理。这 可以打破"完全符合"语义的语义,作为末尾附加数据 文件的所有内容都将被重新处理。
应用于目录时,此模式将更有用,一旦文件被完全编写,您就会将其移动。