将JavaDStream<String>转换为JavaRDD<String>



我有一个JavaDStream,它从外部源获取数据。我正在尝试集成Spark Streaming和SparkSQL。众所周知,JavaDStream是由JavaRDD组成的。只有当我有一个JavaRDD时,我才能应用函数applySchema()。请帮我把它转换成JavaRDD。我知道scala中有一些函数,而且它要简单得多。但是用Java帮我吧。

不能将数据流转换为RDD。正如您提到的,数据流包含RDD。访问RDD的方法是使用foreachRDD将函数应用于数据流的每个RDD。请参阅文档:https://spark.apache.org/docs/1.1.0/api/java/org/apache/spark/streaming/api/java/JavaDStreamLike.html#foreachRDD(org.apache.spark.api.java.function.Function2)

您必须首先使用forEachRDD访问数据流中的所有RDD,如下所示:

javaDStream.foreachRDD( rdd => {
    rdd.collect.foreach({
        ...
    })
})

我希望这有助于将JavaDstream转换为JavaRDD!

    JavaDStream<String> lines = stream.map(ConsumerRecord::value);
    //Create JavaRDD<Row>
    lines.foreachRDD(new VoidFunction<JavaRDD<String>>() {
        @Override
        public void call(JavaRDD<String> rdd) {
            JavaRDD<Row> rowRDD = rdd.map(new Function<String, Row>() {
                @Override
                public Row call(String msg) {
                    Row row = RowFactory.create(msg);
                    return row;
                }
            });
            //Create Schema
            StructType schema = DataTypes.createStructType(new StructField[] {
                    DataTypes.createStructField("value", DataTypes.StringType, true)});
            //Get Spark 2.0 session
            SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf());
            Dataset msgDataFrame = spark.createDataFrame(rowRDD, schema);
            msgDataFrame.show();

相关内容

  • 没有找到相关文章

最新更新