Kotlin与spark从POJO中创建数据框架,其中包含POJO类



我有一个kotlin数据类,如下所示

data class Persona_Items(
val key1:Int = 0,
val key2:String = "Hello")
data class Persona(
val persona_type: String,
val created_using_algo: String,
val version_algo: String,
val createdAt:Long,
val listPersonaItems:List<Persona_Items>)

data class PersonaMetaData
(val user_id: Int,
val persona_created: Boolean,
val persona_createdAt: Long,
val listPersona:List<Persona>)
fun main() {
val personalItemList1 = listOf(Persona_Items(1), Persona_Items(key2="abc"), Persona_Items(10,"rrr"))
val personalItemList2 = listOf(Persona_Items(10), Persona_Items(key2="abcffffff"),Persona_Items(20,"rrr"))
val persona1 = Persona("HelloWorld","tttAlgo","1.0",10L,personalItemList1)
val persona2 = Persona("HelloWorld","qqqqAlgo","1.0",10L,personalItemList2)
val personMetaData = PersonaMetaData(884,true,1L, listOf(persona1,persona2))
val spark = SparkSession
.builder()
.master("local[2]")
.config("spark.driver.host","127.0.0.1")
.appName("Simple Application").orCreate

val rdd1: RDD<PersonaMetaData> = spark.toDS(listOf(personMetaData)).rdd()
val df = spark.createDataFrame(rdd1, PersonaMetaData::class.java)
df.show(false)
}

当我尝试创建一个数据帧时,我得到下面的错误。主线程异常java.lang.UnsupportedOperationException: Schema for type src。不支持角色

这是否意味着对于数据类列表,不支持创建数据框架?请帮助我理解上面的代码缺少什么。

为Apache Spark使用Kotlin API可能会容易得多(完全披露:我是该API的作者)。有了它,你的代码可能看起来像这样:

withSpark {
val ds = dsOf(Persona_Items(1), Persona_Items(key2="abc"), Persona_Items(10,"rrr")))
// rest of logics here
}

问题是Spark不支持开箱的数据类,我们不得不做——Kotlin中没有像import spark.implicits._这样的东西,所以我们不得不做额外的步骤来使它自动工作。

在Scala中,import spark.implicits._需要自动编码你的序列化和反序列化实体,在Kotlin API中,我们几乎在编译时就完成了。

Error表示Spark不知道如何序列化Person类。

嗯,它对我来说是开箱即用的。我为你创建了一个简单的应用程序来演示它看看这里,https://github.com/szymonprz/kotlin-spark-simple-app/blob/master/src/main/kotlin/CreateDataframeFromRDD.kt

你可以运行这个main,你会看到正确的内容显示出来。也许你需要修复你的构建工具配置,如果你在kotlin项目中看到一些特定于scala的东西,那么你可以检查我的构建。或者你可以在这里阅读更多内容https://github.com/JetBrains/kotlin-spark-api/blob/main/docs/quick-start-guide.md