Unable to read a local file in Flink 1.10.0



I am trying to read a local file in Flink, and I get the following error:

java.lang.NoClassDefFoundError: org/apache/hadoop/yarn/exceptions/YarnException
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:264)
at org.apache.flink.client.cli.CliFrontend.loadCustomCommandLine(CliFrontend.java:1076)
at org.apache.flink.client.cli.CliFrontend.loadCustomCommandLines(CliFrontend.java:1030)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:957)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.yarn.exceptions.YarnException
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 5 more
2020-04-26 20:47:56,003 INFO  org.apache.flink.core.fs.FileSystem                           - Hadoop is not in the classpath/dependencies. The extended set of supported File Systems via Hadoop is not available.

I am using the following API:

dataStream = streamExecutionEnvironment.readTextFile(params.get("input"));
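Before the path reaches readTextFile, it can help to confirm that the file is readable on the local filesystem and to turn it into an explicit file: URI, which Flink accepts as a path. The following is a minimal sketch in plain Java with no Flink dependency. The class name LocalInputPath and the method toFileUri are hypothetical, and the sketch does not claim to address the YarnException message above. The returned string could be passed to readTextFile in place of params.get("input").

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class LocalInputPath {

    /**
     * Resolves rawPath to an absolute, normalized path, checks that it is a
     * readable regular file, and returns it as a file: URI string.
     * (Hypothetical helper, not part of Flink.)
     */
    public static String toFileUri(String rawPath) {
        Path path = Paths.get(rawPath).toAbsolutePath().normalize();
        if (!Files.isRegularFile(path) || !Files.isReadable(path)) {
            throw new IllegalArgumentException("Not a readable local file: " + path);
        }
        return path.toUri().toString();
    }

    public static void main(String[] args) {
        if (args.length != 1) {
            System.err.println("usage: LocalInputPath <path-to-local-file>");
            return;
        }
        // The printed URI is what would be handed to readTextFile(...) in the job.
        System.out.println(toFileUri(args[0]));
    }
}
```

Failing early with a clear message makes it easier to tell a wrong path apart from a classpath problem.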

Are you reading the file to load configuration parameters, or using it as a source?

For configuration parameters:

parameters = ParameterTool.fromPropertiesFile(parameters.get("configFile"));
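ParameterTool.fromPropertiesFile expects a standard Java .properties file. To check that such a file parses as expected, independently of Flink, something like the following plain-Java sketch could be used. The class name ConfigFileCheck and the method load are hypothetical, and this is not Flink's own implementation.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

public class ConfigFileCheck {

    /** Loads a .properties file into a key/value map sorted by key. */
    public static Map<String, String> load(String configFile) {
        Properties props = new Properties();
        try (InputStream in = Files.newInputStream(Paths.get(configFile))) {
            props.load(in);
        } catch (IOException e) {
            throw new UncheckedIOException("Cannot read config file: " + configFile, e);
        }
        Map<String, String> result = new TreeMap<>();
        for (String key : props.stringPropertyNames()) {
            result.put(key, props.getProperty(key));
        }
        return result;
    }

    public static void main(String[] args) {
        if (args.length != 1) {
            System.err.println("usage: ConfigFileCheck <path-to-properties-file>");
            return;
        }
        // Print every key/value pair so typos in the config file are easy to spot.
        load(args[0]).forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```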

As a source:

env.readFile(new TextInputFormat(new Path(inputFile)), inputFile, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000)
    .uid("importFile").name("Import from file").setParallelism(1).rebalance();

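We have also developed a library that reads typed objects directly from a JSON file; the file is parsed as a whole rather than line by line. The library is available on Maven: Streaming Flink File Source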

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final JsonFileConsumerConfig<BusinessConfiguration> config = new JsonFileConsumerConfig<>(BusinessConfiguration.class)
.setPath("s3://<YOUR_AWESOME_FILE>.json");
final JsonFileConsumer<BusinessConfiguration> consumer = new JsonFileConsumer<>(config);
env.setParallelism(parallelism);
consumer.getJsonFileStream(env)
.addSink(...);
env.execute();
