将Spark DataFrame转换为Pojo对象



请参阅以下代码:

    //Create Spark Context
    SparkConf sparkConf = new SparkConf().setAppName("TestWithObjects").setMaster("local");
    JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
    //Creating RDD
    JavaRDD<Person> personsRDD = javaSparkContext.parallelize(persons);
    //Creating SQL context
    SQLContext sQLContext = new SQLContext(javaSparkContext);
    DataFrame personDataFrame = sQLContext.createDataFrame(personsRDD, Person.class);
    personDataFrame.show();
    personDataFrame.printSchema();
    personDataFrame.select("name").show();
    personDataFrame.registerTempTable("peoples");
    DataFrame result = sQLContext.sql("SELECT * FROM peoples WHERE name='test'");
    result.show();

在此之后,我需要将DataFrame-"result"转换为Person Object或List。提前谢谢。

DataFrame只是Dataset[Row]的一个类型别名。这些操作也被称为"非类型转换",与强类型Scala/Java数据集附带的"类型转换"形成对比。

在spark 中,从数据集[行]到数据集[人]的转换非常简单

DataFrame result = sQLContext.sql("SELECT * FROM peoples WHERE name='test'");

此时,Spark将数据转换为DataFrame=Dataset[Row],这是一个通用Row对象的集合,因为它不知道确切的类型。

// Create an Encoders for Java beans
Encoder<Person> personEncoder = Encoders.bean(Person.class); 
Dataset<Person> personDF = result.as(personEncoder);
personDF.show();

现在,Spark根据类Person的指示,转换Dataset[Row]->Dataset[Person]类型特定的Scala/Java JVM对象。

有关的更多详细信息,请参阅databricks提供的以下链接

https://databricks.com/blog/2016/07/14/a-tale-of-three-apache-spark-apis-rdds-dataframes-and-datasets.html

DataFrame存储为Row s,因此您可以使用那里的方法从非类型化转换为类型化。看看get方法。

如果有人在寻找Dataset<Row>中的json字符串列转换为Dataset<PojoClass>

样本pojo:测试

@Data
public class Testing implements Serializable {
    private String name;
    private String dept;
}

在上面的代码中,@Data是Lombok的一个注释,用于为这个Testing类生成getter和setter。

Spark中的实际转换逻辑

@Test
void shouldConvertJsonStringToPojo() {
   var sparkSession  = SparkSession.builder().getOrCreate(); 
   var structType =  new StructType(new StructField[] {
        new StructField("employee", DataTypes.StringType, false, Metadata.empty()),
    });
    var ds = sparkSession.createDataFrame(new ArrayList<>(
        Arrays.asList(RowFactory.create(new Object[]{"{ "name": "test", "dept": "IT"}"}))), structType);
    var objectMapper = new ObjectMapper();
    var bean = Encoders.bean(Testing.class);
    var testingDataset = ds.map((MapFunction<Row, Testing>) row -> {
        var dept = row.<String>getAs("employee");
        return objectMapper.readValue(dept, Testing.class);
    }, bean);
    assertEquals("test", testingDataset.head().getName());
}

相关内容

  • 没有找到相关文章

最新更新