registerInputOutput() error with Apache Flink on a remote cluster



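Running the following code on a local Apache Flink installation produces no errors, but on a remote cluster it fails with the error shown further down. Thanks for your help!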

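 // Read the first two CSV columns as (Integer, String) records.
 personRecords = env.readCsvFile("dataPath/dataset.csv")
               .lineDelimiter("\n").fieldDelimiter(",")
               .includeFields("11").types(Integer.class, String.class);
 // Apply the user-defined map function to each record.
 partitionedData = personRecords.map(new cleanerMap());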

The error is as follows:

java.lang.Exception: Call to registerInputOutput() of invokable failed
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:529)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: The initialization of the DataSource's outputs caused an error: Could not read the user code wrapper: org.apache.flink.quickstart.myProj$cleanerMap
        at org.apache.flink.runtime.operators.DataSourceTask.registerInputOutput(DataSourceTask.java:89)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:526)
        ... 1 more
Caused by: org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could not read the user code wrapper: org.apache.flink.quickstart.myProj$cleanerMap
        at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:284)
        at org.apache.flink.runtime.operators.BatchTask.instantiateUserCode(BatchTask.java:1465)
        at org.apache.flink.runtime.operators.chaining.ChainedMapDriver.setup(ChainedMapDriver.java:39)
        at org.apache.flink.runtime.operators.chaining.ChainedDriver.setup(ChainedDriver.java:78)
        at org.apache.flink.runtime.operators.BatchTask.initOutputs(BatchTask.java:1336)
        at org.apache.flink.runtime.operators.DataSourceTask.initOutputs(DataSourceTask.java:284)
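        at org.apache.flink.runtime.operators.DataSourceTask.registerInputOutput(DataSourceTask.java:86)

TaskConfig.getStubWrapper() throws a CorruptConfigurationException when the user code class cannot be found. Here the class named in the message is org.apache.flink.quickstart.myProj$cleanerMap, that is, the cleanerMap class nested inside myProj.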

I would check whether the cleanerMap class is included in your program's jar file.
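
One way to run that check is sketched below. This is a minimal example, not part of Flink: JarClassCheck and its methods are hypothetical helper names, and it only uses the JDK's java.util.jar.JarFile. It assumes the jar you pass in is the one actually submitted to the cluster.

```java
import java.io.IOException;
import java.util.jar.JarFile;

/** Hypothetical helper (not a Flink API): checks whether a class file is packaged in a jar. */
public class JarClassCheck {

    /** Converts a binary class name such as "pkg.Outer$Inner" to its jar entry path. */
    public static String toEntryName(String className) {
        return className.replace('.', '/') + ".class";
    }

    /** Returns true if the jar at jarPath contains the class file for className. */
    public static boolean containsClass(String jarPath, String className) throws IOException {
        try (JarFile jar = new JarFile(jarPath)) {
            return jar.getEntry(toEntryName(className)) != null;
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 2) {
            System.err.println("usage: JarClassCheck <jar path> <binary class name>");
            return;
        }
        // args[0]: path to the program jar; args[1]: class name as printed in the stack trace
        boolean found = containsClass(args[0], args[1]);
        System.out.println(args[1] + (found ? " is" : " is NOT") + " in " + args[0]);
    }
}
```

Pass it the path to your program jar and the class name from the stack trace, org.apache.flink.quickstart.myProj$cleanerMap. In most shells the name has to be put in single quotes so that the $ is not expanded. If the class is reported as missing, the jar was most likely built without it, or a different jar was submitted.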
