我是一只新蜜蜂,面临着解决以下用例的一些挑战
用例描述:
我将在某个文件夹中收到一个带有时间戳的csv文件,例如输入。文件格式为 file_name_dd-mm-yy-hh-mm-ss.csv。
现在我的 flink 管道将以逐行的方式读取这个 csv 文件,它将被写入我的 Kafka 主题。
数据读取完成后,需要立即将此文件移动到另一个文件夹历史文件夹。
为什么我需要这个是因为:假设您的ververica服务器突然或手动停止,如果您将所有已处理的文件都放在同一位置,那么在ververica重新启动后flink将重新读取它之前处理的所有文件。因此,为了防止这种情况,这些文件需要立即将已读取的文件移动到另一个位置。
我用谷歌搜索了很多,但没有找到任何东西,所以你能指导我实现这一目标吗?
如果还需要什么,请告诉我。
开箱即用 Flink 提供了监控目录和读取它们的工具 - 通过StreamExecutionEnvironment.getExecutionEnvironment.readFile
(参见类似的堆栈溢出线程示例 - 如何在 Flink 的目录中读取新添加的文件/使用 Flink 监控新文件的数据流目录等)
查看readFile
函数的源代码,它调用 createFileInput() 方法,该方法简单地实例化ContinuousFileMonitoringFunction
,ContinuousFileReaderOperatorFactory
并配置源 -
addSource(monitoringFunction, sourceName, null, boundedness)
.transform("Split Reader: " + sourceName, typeInfo, factory);
ContinuFileMonitoringFunction实际上是大多数逻辑发生的地方。
因此,如果我要实现您的要求,我将使用自己的逻辑扩展ContinuousFileMonitoringFunction
的功能,将处理后的文件移动到历史记录文件夹中,并从此函数构造源代码。
假设run
方法在checkpointLock
内执行读取和转发 -
synchronized (checkpointLock) {
monitorDirAndForwardSplits(fileSystem, context);
}
我会说移动到检查点完成文件的历史文件夹是安全的,这些文件的修改日早于globalModificationTime
,在拆分收集时monitorDirAndForwardSplits
更新。
也就是说,我将扩展ContinuousFileMonitoringFunction
类并实现CheckpointListener
接口,并在notifyCheckpointComplete
将已处理的文件移动到历史文件夹:
public class ArchivingContinuousFileMonitoringFunction<OUT> extends ContinuousFileMonitoringFunction<OUT> implements CheckpointListener {
...
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
Map<Path, FileStatus> eligibleFiles = listEligibleForArchiveFiles(fs, new Path(path));
// do move logic
}
/**
* Returns the paths of the files already processed.
*
* @param fileSystem The filesystem where the monitored directory resides.
*/
private Map<Path, FileStatus> listEligibleForArchiveFiles(FileSystem fileSystem, Path path) {
final FileStatus[] statuses;
try {
statuses = fileSystem.listStatus(path);
} catch (IOException e) {
// we may run into an IOException if files are moved while listing their status
// delay the check for eligible files in this case
return Collections.emptyMap();
}
if (statuses == null) {
LOG.warn("Path does not exist: {}", path);
return Collections.emptyMap();
} else {
Map<Path, FileStatus> files = new HashMap<>();
// handle the new files
for (FileStatus status : statuses) {
if (!status.isDir()) {
Path filePath = status.getPath();
long modificationTime = status.getModificationTime();
if (shouldIgnore(filePath, modificationTime)) {
files.put(filePath, status);
}
} else if (format.getNestedFileEnumeration() && format.acceptFile(status)) {
files.putAll(listEligibleForArchiveFiles(fileSystem, status.getPath()));
}
}
return files;
}
}
}
然后使用自定义函数手动定义数据流:
ContinuousFileMonitoringFunction<OUT> monitoringFunction =
new ArchivingContinuousFileMonitoringFunction <>(
inputFormat, monitoringMode, getParallelism(), interval);
ContinuousFileReaderOperatorFactory<OUT, TimestampedFileInputSplit> factory = new ContinuousFileReaderOperatorFactory<>(inputFormat);
final Boundedness boundedness = Boundedness.CONTINUOUS_UNBOUNDED;
env.addSource(monitoringFunction, sourceName, null, boundedness)
.transform("Split Reader: " + sourceName, typeInfo, factory);
Flink 本身并没有提供解决方案来做到这一点。您可能需要自己构建一些东西,或者找到可以配置为处理此问题的工作流工具。
你可以在 flink 用户邮件列表中询问这个问题。我知道其他人已经写了脚本来做到这一点;也许有人可以分享一个解决方案。