FileNotFoundException: File file:/path/to/file/in.txt does not exist or



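I'm trying to test the classic WordCount example with Flink's Python batch API. The problem appears when I change the data source from env.from_elements() to env.read_text() (to get a larger test case): an error is thrown. The code below shows my implementation.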

[...]
if __name__ == "__main__":
    env = get_environment()
    input_file = 'file:///workfile.txt/'
    if len(sys.argv) != 1 and len(sys.argv) != 3:
        sys.exit("Usage: ./bin/pyflink.sh WordCount[ - <text path> <result path>]")
    if len(sys.argv) == 3:
        # Read the input path given on the command line.
        data = env.read_text(sys.argv[1])
    else:
        #data = env.from_elements("hello","world","hello","car","tree","data","hello","car","tree","data","hello","car","tree","data","hello","car","tree","data","hello","car","tree","data","hello","car","tree","data","hello","car","tree","data","hello","car","tree","data","hello","car","tree","data","hello","car","tree","data","hello","car","tree","data","hello","car","tree","data","hello","car","tree","data","hello","car","tree","data","hello","car","tree","data","hello","car","tree","data","hello","car","tree","data","hello","car","tree","data","hello","car","tree","data","hello","car","tree","data","hello","car","tree","data","hello","car","tree","data","hello","car","tree","data","hello","car","tree","data","hello","car","tree","data","hello","car","tree","data","hello","car","tree","data","hello","car","tree","data","hello","car","tree","data","hello","car","tree","data","hello")
        data = env.read_text(input_file)
    # Tokenize each line, group by tuple field 1 and combine each group with Adder.
    result = data \
        .flat_map(Tokenizer()) \
        .group_by(1) \
        .reduce_group(Adder(), combinable=True)
    if len(sys.argv) == 3:
        result.write_csv(sys.argv[2])
    else:
        result.output()
[...]

Running the code above throws a file permission error, specifically the following message:

Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: File file:/workfile.txt does not exist or the user running Flink ('user') has insufficient permissions to access it.
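The message lumps two different causes together (missing file vs. missing read permission). A small pre-flight check in plain Python can tell them apart before the job is submitted. This is a sketch, not a Flink API: check_input_file is a hypothetical helper, and it only inspects the filesystem of the machine it runs on, so it assumes a local, single-machine Flink setup.

```python
import os
from urllib.parse import urlparse


def check_input_file(path_or_uri):
    """Return a short diagnosis for a local path or file:// URI.

    Hypothetical helper: it only looks at the local filesystem, which is
    an assumption that holds for a single-machine Flink setup.
    """
    parsed = urlparse(path_or_uri)
    # For "file:///workfile.txt" the scheme is "file" and the path is "/workfile.txt".
    local_path = parsed.path if parsed.scheme == "file" else path_or_uri
    if not os.path.exists(local_path):
        return "missing: " + local_path
    if not os.access(local_path, os.R_OK):
        return "not readable: " + local_path
    return "ok: " + local_path
```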

PS: I've searched for a solution but couldn't find anything. If this has already been solved somewhere, I'd be grateful for a pointer.

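I assume "workfile.txt" is meant to be a relative path. However, a path that carries a scheme ("file:///") cannot be relative: everything after file:// is read from the filesystem root, which is why the error reports file:/workfile.txt.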
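Python's standard urllib.parse.urlparse illustrates the point about the scheme: the path component of file:///workfile.txt/ starts at the root "/", not at the script's directory. This only demonstrates generic URI parsing; Flink parses the URI on the Java side with its own Path class, which is assumed here to treat this case the same way. The second URI uses a hypothetical location.

```python
from urllib.parse import urlparse

# The URI from the question, trailing slash included.
original = urlparse("file:///workfile.txt/")
print(repr(original.scheme))  # 'file'
print(repr(original.netloc))  # '' (no host between "//" and the path)
print(repr(original.path))    # '/workfile.txt/' (starts at the root "/")

# A full absolute path (hypothetical location) keeps every directory.
absolute = urlparse("file:///home/user/data/workfile.txt")
print(repr(absolute.path))    # '/home/user/data/workfile.txt'
```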

Provide the full absolute path instead and it should work.
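One way to obtain such a path is to let Python resolve it once, in the directory where the file actually lives. This is a standard-library sketch; to_file_uri is a hypothetical helper name, and the resulting URI is meant to be passed as the first command-line argument (the script already accepts one) or pasted into input_file.

```python
from pathlib import Path


def to_file_uri(path):
    """Return an absolute file:// URI for a local path (hypothetical helper).

    Relative paths are resolved against the current working directory of
    the process calling this function, so run it where the file really lives.
    """
    # resolve() makes the path absolute (and follows symlinks);
    # as_uri() requires an absolute path and adds the "file://" prefix.
    return Path(path).resolve().as_uri()
```

For example, running print(to_file_uri("workfile.txt")) next to the file prints a URI of the form file:///.../workfile.txt that no longer depends on where the Flink script is executed.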

Note that relative paths generally don't work with the Python API anyway, because the script is executed from a temporary location.
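Since a relative path depends on the directory the script ends up running in, the script could reject relative inputs up front instead of failing later during input split creation. A minimal sketch, assuming POSIX-style paths; is_absolute_input is a hypothetical helper, not part of Flink's Python API.

```python
import posixpath
from urllib.parse import urlparse


def is_absolute_input(path_or_uri):
    """Return True for "/abs/path" or "scheme:///abs/path" style inputs.

    Hypothetical guard for the WordCount script; assumes POSIX-style paths.
    A relative path would be resolved against whatever directory the script
    runs in, which can be a temporary location under pyflink.sh.
    """
    parsed = urlparse(path_or_uri)
    # With a scheme (e.g. "file:"), check the URI's path component;
    # without one, check the plain path itself.
    path = parsed.path if parsed.scheme else path_or_uri
    return posixpath.isabs(path)


# Possible use inside the script's main block:
#     if len(sys.argv) == 3 and not is_absolute_input(sys.argv[1]):
#         sys.exit("Please pass an absolute input path")
```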
