用Java API创建一个简单的一行Spark DataFrame



在Scala中,我可以从内存中的字符串创建单行DataFrame,如下所示:

val stringAsList = List("buzz")
val df = sqlContext.sparkContext.parallelize(jsonValues).toDF("fizz")
df.show()

df.show()运行时输出:

+-----+
| fizz|
+-----+
| buzz|
+-----+

现在我正试图从Java类内部做到这一点。显然JavaRDD s没有toDF(String)方法。我试过:

List<String> stringAsList = new ArrayList<String>();
stringAsList.add("buzz");
SQLContext sqlContext = new SQLContext(sparkContext);
DataFrame df = sqlContext.createDataFrame(sparkContext
    .parallelize(stringAsList), StringType);
df.show();

…但似乎还是不够。现在当df.show();执行时,我得到:

++
||
++
||
++

(空DF)所以我问:使用Java API,我如何将内存中的字符串读取到只有1行和1列的DataFrame中,并指定该列的名称?(以便df.show()与上面的Scala相同)?

如果你需要升级,我已经为Spark 2创建了2个例子:

Simple Fizz/Buzz(或foe/bar -老一代:)):

    SparkSession spark = SparkSession.builder().appName("Build a DataFrame from Scratch").master("local[*]")
            .getOrCreate();
    List<String> stringAsList = new ArrayList<>();
    stringAsList.add("bar");
    JavaSparkContext sparkContext = new JavaSparkContext(spark.sparkContext());
    JavaRDD<Row> rowRDD = sparkContext.parallelize(stringAsList).map((String row) -> RowFactory.create(row));
    // Creates schema
    StructType schema = DataTypes.createStructType(
            new StructField[] { DataTypes.createStructField("foe", DataTypes.StringType, false) });
    Dataset<Row> df = spark.sqlContext().createDataFrame(rowRDD, schema).toDF();

2 x2数据:

    SparkSession spark = SparkSession.builder().appName("Build a DataFrame from Scratch").master("local[*]")
            .getOrCreate();
    List<String[]> stringAsList = new ArrayList<>();
    stringAsList.add(new String[] { "bar1.1", "bar2.1" });
    stringAsList.add(new String[] { "bar1.2", "bar2.2" });
    JavaSparkContext sparkContext = new JavaSparkContext(spark.sparkContext());
    JavaRDD<Row> rowRDD = sparkContext.parallelize(stringAsList).map((String[] row) -> RowFactory.create(row));
    // Creates schema
    StructType schema = DataTypes
            .createStructType(new StructField[] { DataTypes.createStructField("foe1", DataTypes.StringType, false),
                    DataTypes.createStructField("foe2", DataTypes.StringType, false) });
    Dataset<Row> df = spark.sqlContext().createDataFrame(rowRDD, schema).toDF();

代码可从https://github.com/jgperrin/net.jgp.labs.spark下载。

您可以通过将List创建为Rdd,然后创建包含列名的Schema来实现这一点。

可能还有其他方法,这只是其中之一。

List<String> stringAsList = new ArrayList<String>();
        stringAsList.add("buzz");
JavaRDD<Row> rowRDD = sparkContext.parallelize(stringAsList).map((String row) -> {
                return RowFactory.create(row);
            });
StructType schema = DataTypes.createStructType(new StructField[] { DataTypes.createStructField("fizz", DataTypes.StringType, false) });
DataFrame df = sqlContext.createDataFrame(rowRDD, schema).toDF();
df.show();
//+----+
|fizz|
+----+
|buzz|

根据@jgp的建议构建。如果您想为混合类型执行此操作,您可以执行:

List<Tuple2<Integer, Boolean>> mixedTypes = Arrays.asList(
                new Tuple2<>(1, false),
                new Tuple2<>(1, false),
                new Tuple2<>(1, false));
JavaRDD<Row> rowRDD = sparkContext.parallelize(mixedTypes).map(row -> RowFactory.create(row._1, row._2));
StructType mySchema = new StructType()
                .add("id", DataTypes.IntegerType, false)
                .add("flag", DataTypes.BooleanType, false);
Dataset<Row> df = spark.sqlContext().createDataFrame(rowRDD, mySchema).toDF();

这可能有助于解决@jdk2588的问题。

这篇文章提供了一个不经过sparkContext.parallelize(...)的解决方案:https://timepasstechies.com/create-spark-dataframe-java-list/

相关内容

  • 没有找到相关文章

最新更新