如何将JavaDStream转换为RDD?或者有没有办法在JavaDStream的映射函数中创建新的RDD



我从kafka获得的流数据是hdfs文件的路径,我需要获取该文件的数据。

batchInputDStream.map(new Function<Tuple2<String,String>, FreshBatchInput>() {
            @Override
            public String call(Tuple2<String, String> arg0)
                    throws Exception {
                StringReader reader = new StringReader(arg0._2);
                 JAXBContext jaxbContext = JAXBContext.newInstance(FreshBatchInput.class);  
                    Unmarshaller jaxbUnmarshaller = jaxbContext.createUnmarshaller();  
                    FreshBatchInput input = (FreshBatchInput)jaxbUnmarshaller.unmarshal(reader);

                return input.getPath();    
            }
        });

这里 input.getPath() 是文件的 hdfs 路径。

没有收集JavaDstream对象的选项,否则我会通过首先收集数据而不是从文件中获取数据来使用它。

我无法在映射函数内创建新的RDD,它给出了错误任务不可序列化。

还有其他选择吗?

你可以使用 foreachRDD。它在驱动程序上执行,因此允许 rdd 操作

transformed.foreachRDD (rdd -> {
    String inputPath = doSomethingWithRDD(rdd)
    rdd.sparkContext.textFile(inputPath) ...
});

请记住,不能在转换或操作中创建 RDD - 只能在驱动程序上创建 RDD。与foreachRDD示例类似的问题在这里。这意味着,您不能在map,filter或foreachPartition中使用SparkContext

最新更新