请参阅以下代码:
//Create Spark Context
SparkConf sparkConf = new SparkConf().setAppName("TestWithObjects").setMaster("local");
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
//Creating RDD
JavaRDD<Person> personsRDD = javaSparkContext.parallelize(persons);
//Creating SQL context
SQLContext sQLContext = new SQLContext(javaSparkContext);
DataFrame personDataFrame = sQLContext.createDataFrame(personsRDD, Person.class);
personDataFrame.show();
personDataFrame.printSchema();
personDataFrame.select("name").show();
personDataFrame.registerTempTable("peoples");
DataFrame result = sQLContext.sql("SELECT * FROM peoples WHERE name='test'");
result.show();
在此之后,我需要将DataFrame-"result"转换为Person Object或List。提前谢谢。
DataFrame只是Dataset[Row]的一个类型别名。这些操作也被称为"非类型转换",与强类型Scala/Java数据集附带的"类型转换"形成对比。
在spark 中,从数据集[行]到数据集[人]的转换非常简单
DataFrame result = sQLContext.sql("SELECT * FROM peoples WHERE name='test'");
此时,Spark将数据转换为DataFrame=Dataset[Row],这是一个通用Row对象的集合,因为它不知道确切的类型。
// Create an Encoders for Java beans
Encoder<Person> personEncoder = Encoders.bean(Person.class);
Dataset<Person> personDF = result.as(personEncoder);
personDF.show();
现在,Spark根据类Person的指示,转换Dataset[Row]->Dataset[Person]类型特定的Scala/Java JVM对象。
有关的更多详细信息,请参阅databricks提供的以下链接
https://databricks.com/blog/2016/07/14/a-tale-of-three-apache-spark-apis-rdds-dataframes-and-datasets.html
DataFrame
存储为Row
s,因此您可以使用那里的方法从非类型化转换为类型化。看看get
方法。
如果有人在寻找将Dataset<Row>
中的json字符串列转换为Dataset<PojoClass>
样本pojo:测试
@Data
public class Testing implements Serializable {
private String name;
private String dept;
}
在上面的代码中,@Data
是Lombok的一个注释,用于为这个Testing
类生成getter和setter。
Spark中的实际转换逻辑
@Test
void shouldConvertJsonStringToPojo() {
var sparkSession = SparkSession.builder().getOrCreate();
var structType = new StructType(new StructField[] {
new StructField("employee", DataTypes.StringType, false, Metadata.empty()),
});
var ds = sparkSession.createDataFrame(new ArrayList<>(
Arrays.asList(RowFactory.create(new Object[]{"{ "name": "test", "dept": "IT"}"}))), structType);
var objectMapper = new ObjectMapper();
var bean = Encoders.bean(Testing.class);
var testingDataset = ds.map((MapFunction<Row, Testing>) row -> {
var dept = row.<String>getAs("employee");
return objectMapper.readValue(dept, Testing.class);
}, bean);
assertEquals("test", testingDataset.head().getName());
}