。
我从kafka获得的流数据是hdfs文件的路径,我需要获取该文件的数据。
batchInputDStream.map(new Function<Tuple2<String,String>, FreshBatchInput>() {
@Override
public String call(Tuple2<String, String> arg0)
throws Exception {
StringReader reader = new StringReader(arg0._2);
JAXBContext jaxbContext = JAXBContext.newInstance(FreshBatchInput.class);
Unmarshaller jaxbUnmarshaller = jaxbContext.createUnmarshaller();
FreshBatchInput input = (FreshBatchInput)jaxbUnmarshaller.unmarshal(reader);
return input.getPath();
}
});
这里 input.getPath() 是文件的 hdfs 路径。
没有收集JavaDstream对象的选项,否则我会通过首先收集数据而不是从文件中获取数据来使用它。
我无法在映射函数内创建新的RDD,它给出了错误任务不可序列化。
还有其他选择吗?
你可以使用 foreachRDD。它在驱动程序上执行,因此允许 rdd 操作
transformed.foreachRDD (rdd -> {
String inputPath = doSomethingWithRDD(rdd)
rdd.sparkContext.textFile(inputPath) ...
});
请记住,不能在转换或操作中创建 RDD - 只能在驱动程序上创建 RDD。与foreachRDD示例类似的问题在这里。这意味着,您不能在map,filter或foreachPartition中使用SparkContext