正在读取Flink中附加的文件



我们有一个遗留应用程序,它将结果作为记录写入一些本地文件。我们希望实时处理这些记录,因此我们计划使用Flink作为引擎。我知道我可以使用StreamingExecutionEnvironment#readFile读取文本文件。我们似乎需要类似于PROCESS_CONTINUOUSLY的东西,但这个标志会在每次更改时重新处理整个文件,这不是我们想要的。

当然,我可以编写自定义源代码,在其状态下保存每个文件的记录数。但我想这种检查点之类的方法可能会有一些问题——我的理由是,如果这很容易可靠地实现,那么它就已经在Flink中实现了。

如何处理这个问题有什么提示/建议吗?

只要您满足于从单个文件(每个源实例(中读取,就可以使用自定义源非常容易地做到这一点。您将需要使用运算符状态并实现检查点。状态处理和检查点看起来像这样:

public class CheckpointedFileSource implements SourceFunction<Event>, ListCheckpointed<Long> {
private long eventCnt = 0;
public void run(SourceContext<Event> sourceContext) throws Exception {
final Object lock = sourceContext.getCheckpointLock();
// skip over previously emitted events
...
while (not cancelled) {
read event from file;
synchronized (lock) {
eventCnt++;
sourceContext.collectWithTimestamp(event, timestamp);
}
}
}
@Override
public List<Long> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
return Collections.singletonList(eventCnt);
}
@Override
public void restoreState(List<Long> state) throws Exception {
for (Long s : state)
this.eventCnt = s;
}
}

有关完整的示例,请参阅Flink训练练习中使用的检查点出租车乘坐数据源。你必须对它进行一些调整,因为它是为读取静态文件而设计的,而不是附加到文件中的

相关内容

  • 没有找到相关文章

最新更新