Retrieve the array stored in each row of a dataframe column using Scala Spark



I have the following dataframe:

+-------------------+--------------------------+
|value              |feeling                   |
+-------------------+--------------------------+
|Sam got these marks|[sad, sad, dissappointed] |
|I got good marks   |[happy, excited, happy]   |
+-------------------+--------------------------+

I want to iterate through this dataframe, get the array in the `feeling` column of each row, and pass that array to a calculation method.

def calculationMethod(arrayValue: Array[String]): String = {
  // get the "average" of the words
  ???
}

The output dataframe:

+-------------------+--------------------------+-------+
|value              |feeling                   |average|
+-------------------+--------------------------+-------+
|Sam got these marks|[sad, sad, dissappointed] |sad    |
|I got good marks   |[happy, excited, happy]   |happy  |
+-------------------+--------------------------+-------+

I am not sure how to iterate through each row and get the array in the second column so that it can be passed into the method I wrote. Also note that the dataframe shown in the question is a streaming dataframe.
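One reading of the expected output is that the "average" of a word array is its most frequent element (mode). The helper below is a plain-Scala sketch under that assumption (`mostFrequent` is a hypothetical name, not from the post); wrapped with `udf`, it could then be applied row-wise via `withColumn`:

```scala
// Hypothetical helper: interprets the "average" of a word array as its
// most frequent element (mode), matching the expected output above.
def mostFrequent(words: Seq[String]): String =
  words.groupBy(identity).maxBy(_._2.size)._1

// Example: mostFrequent(Seq("sad", "sad", "dissappointed")) returns "sad"
```

Applied as `udf(mostFrequent _)` inside `withColumn("average", ...)`, this is a stateless row-wise transformation, which also works on a streaming dataframe.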

Edit 1

val calculateUDF = udf(calculationMethod _)
val editedDataFrame = filteredDataFrame.withColumn("average", calculateUDF(col("feeling")))

def calculationMethod(emojiArray: Seq[String]): DataFrame = {
  val existingSparkSession = SparkSession.builder().getOrCreate()
  import existingSparkSession.implicits._
  val df = emojiArray.toDF("feeling")
  val result = df.selectExpr(
    "feeling",
    "'U+' || trim('0' , string(hex(encode(feeling, 'utf-32')))) as unicode"
  )
  result
}

I get the following error:

Schema for type org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] is not supported

Note that the initial dataframe mentioned in the question is a streaming dataframe.

Edit 2

This is the final dataframe I expect:

+-------------------+------------+---------------------------+
|value              |feeling     |unicode                    |
+-------------------+------------+---------------------------+
|Sam got these marks|[😀😆😁]    |[U+1F600 U+1F606 U+1F601]  |
|I got good marks   |[😄🙃]      |[U+1F604 U+1F643]          |
+-------------------+------------+---------------------------+

Instead of using a UDF, you can `transform` the array:

val df2 = df.withColumn(
  "unicode",
  expr("transform(feeling, x -> 'U+' || ltrim('0' , string(hex(encode(x, 'utf-32')))))")
)
df2.show(false)
+-------------------+------------+---------------------------+
|value              |feeling     |unicode                    |
+-------------------+------------+---------------------------+
|Sam got these marks|[😀, 😆, 😁]|[U+1F600, U+1F606, U+1F601]|
|I got good marks   |[😄, 🙃]    |[U+1F604, U+1F643]         |
+-------------------+------------+---------------------------+
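The `transform` expression above derives each `U+...` string from the emoji's UTF-32 encoding. As a sanity check, the same mapping can be reproduced in plain Scala with `String.codePoints` (Java 9+); `toUnicodeHex` is a hypothetical helper, not part of the answer:

```scala
// Hypothetical helper: format each Unicode code point of a string
// as "U+XXXX", mirroring what the transform() expression produces.
def toUnicodeHex(s: String): Seq[String] =
  s.codePoints().toArray.map(cp => f"U+$cp%04X").toSeq

// toUnicodeHex("😀😆😁") == Seq("U+1F600", "U+1F606", "U+1F601")
```

Working on code points (rather than `char`s) matters here because emoji live outside the Basic Multilingual Plane and occupy two `char`s each.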
