我可以在 Flink 中的 FlatMapFunction 中在分布式缓存中注册文件吗?



我有一个列出S3中的项目的FlatMapFunction。我想在分布式文件缓存中注册每个项目。

这可能吗?

即,在我的工作中:

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
...
... = myDataSet.flatMap(new S3Lister(env));

并在S3Lister文件中:

...
String id = os.getKey().substring(os.getKey().lastIndexOf('/') + 1);
env.registerCachedFile("s3://" + bucket + os.getKey(), id);
...

然后从另一个自定义 coGroup 函数中的分布式缓存访问它。

这能行吗?你甚至可以像这样传递执行环境吗?

更新

如果没有,将整个 S3 存储桶放入分布式文件缓存以用于 flink 作业的最佳方法是什么?

本质上,registerCachedFiles 方法有助于在提交作业时上传文件。因此,无法在已部署的程序中调用它。

但是从您的描述来看,为什么不直接读取 S3 文件?

您可以使用 Reach 函数而不是普通函数,然后在其中加载分布式缓存。

首先加载文件:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// register a file from HDFS
env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile")
// register a local executable file (script, executable, ...)
env.registerCachedFile("file:///path/to/exec/file", "localExecFile", true)
// define your program and execute
...
DataSet<String> input = ...
DataSet<Integer> result = input.map(new MyMapper());
...
env.execute();

然后在 ReachFunction 类中使用它:

// extend a RichFunction to have access to the RuntimeContext
public final class MyMapper extends RichMapFunction<String, Integer> {
@Override
public void open(Configuration config) {
// access cached file via RuntimeContext and DistributedCache
File myFile = getRuntimeContext().getDistributedCache().getFile("hdfsFile");
// read the file (or navigate the directory)
...
}
@Override
public Integer map(String value) throws Exception {
// use content of cached file
...
}
}

你可以在这个 Flink 文档中看到这些。

相关内容

  • 没有找到相关文章