Accessing elements of a WrappedArray DataFrame column and applying a UDF

I am looking for a way to apply a function to my DataFrame using a UDF. My DataFrame looks like this:

+--------------------+-----+
|               TOPIC|COUNT|
+--------------------+-----+
|           [outlook]|   71|
|      [AppsOnDemand]|   12|
|  [OUTLOOK, OUTLOOK]|    1|
|             [SkyPe]|    3|
|       [Citrix, VPN]|    1|
|            [Citrix]|   31|
|               [VPN]|   51|
|      [PANDA, panda]|    1|
|      [SKYPE, SKYPE]|    2|
|             [panda]|    5|
|             [Cisco]|   75|
|       [télétravail]|   14|
|               [vpn]|    4|
|           [OUTLOOK]|  212|
|[SKYPE, télétravail]|    2|
|      [appsondemand]|    1|
|              [WIFI]|    5|
|      [CISCO, CISCO]|    4|
|              [MOOC]|    2|
|      [PANDA, Panda]|    1|
+--------------------+-----+

My goal is to loop over the list in the TOPIC column and convert each string from lowercase to uppercase. So I need a simple Scala function that takes a sequence of strings as input and returns the uppercase versions of those strings. Handling a single string is easy; I simply did this:

import org.apache.spark.sql.functions.{array, col, count, lit, udf, upper}
DF.select($"COUNT", upper($"TOPIC")).show()

I am trying this, but it does not work:

def myFunc(context: Array[Seq[String]]) = udf {
  // Problem: this ignores the topic argument and uppercases context.toString
  (topic: Seq[String]) => context.toString().toUpperCase
}
val Df = (df
  .where('TOPIC.isNotNull)
  .select($"TOPIC", $"COUNT",
    myFunc(context)($"TOPIC").alias("NEW_TOPIC"))
  )
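
For reference, the plain-Scala version of the transformation I am after would look something like this sketch (upperAll is just an illustrative name):

// Plain Scala version of the desired element-wise transformation:
def upperAll(topics: Seq[String]): Seq[String] = topics.map(_.toUpperCase)

upperAll(Seq("outlook", "Citrix"))  // returns Seq("OUTLOOK", "CITRIX")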

Define your function as follows:

import org.apache.spark.sql.functions._
val arrayUpperCase = udf[Seq[String], Seq[String]](_.map(_.toUpperCase))

and then

df.select($"TOPIC", $"COUNT", arrayUpperCase($"TOPIC").alias("NEW_TOPIC")).show(false)

which returns

+--------------------+-----+--------------------+
|TOPIC               |COUNT|NEW_TOPIC           |
+--------------------+-----+--------------------+
|[outlook]           |71   |[OUTLOOK]           |
|[AppsOnDemand]      |12   |[APPSONDEMAND]      |
|[OUTLOOK, OUTLOOK]  |1    |[OUTLOOK, OUTLOOK]  |
|[SkyPe]             |3    |[SKYPE]             |
|[Citrix, VPN]       |1    |[CITRIX, VPN]       |
|[Citrix]            |31   |[CITRIX]            |
|[VPN]               |51   |[VPN]               |
|[PANDA, panda]      |1    |[PANDA, PANDA]      |
|[SKYPE, SKYPE]      |2    |[SKYPE, SKYPE]      |
|[panda]             |5    |[PANDA]             |
|[Cisco]             |75   |[CISCO]             |
|[télétravail]       |14   |[TÉLÉTRAVAIL]       |
|[vpn]               |4    |[VPN]               |
|[OUTLOOK]           |212  |[OUTLOOK]           |
|[SKYPE, télétravail]|2    |[SKYPE, TÉLÉTRAVAIL]|
|[appsondemand]      |1    |[APPSONDEMAND]      |
|[WIFI]              |5    |[WIFI]              |
|[CISCO, CISCO]      |4    |[CISCO, CISCO]      |
|[MOOC]              |2    |[MOOC]              |
|[PANDA, Panda]      |1    |[PANDA, PANDA]      |
+--------------------+-----+--------------------+
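
As a side note, on Spark 2.4+ the built-in transform higher-order function can do the same without a UDF; a minimal sketch, using the SQL expression API:

import org.apache.spark.sql.functions.expr

// transform applies the lambda to every element of the array column
df.select($"TOPIC", $"COUNT",
  expr("transform(TOPIC, x -> upper(x))").alias("NEW_TOPIC")).show(false)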

You should write the udf function as follows:

import org.apache.spark.sql.functions._
def upperUdf = udf((array: collection.mutable.WrappedArray[String]) => array.map(_.toUpperCase))

and call it using withColumn:

df.withColumn("TOPIC", upperUdf($"TOPIC"))

You should get the output as:

+--------------------+-----+
|TOPIC               |COUNT|
+--------------------+-----+
|[OUTLOOK]           |71   |
|[APPSONDEMAND]      |12   |
|[OUTLOOK, OUTLOOK]  |1    |
|[SKYPE]             |3    |
|[CITRIX, VPN]       |1    |
|[CITRIX]            |31   |
|[VPN]               |51   |
|[PANDA, PANDA]      |1    |
|[SKYPE, SKYPE]      |2    |
|[PANDA]             |5    |
|[CISCO]             |75   |
|[TÉLÉTRAVAIL]       |14   |
|[VPN]               |4    |
|[OUTLOOK]           |212  |
|[SKYPE, TÉLÉTRAVAIL]|2    |
|[APPSONDEMAND]      |1    |
|[WIFI]              |5    |
|[CISCO, CISCO]      |4    |
|[MOOC]              |2    |
|[PANDA, PANDA]      |1    |
+--------------------+-----+
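
Since WrappedArray implements Seq, declaring the parameter as Seq[String] should work just as well; a sketch of the equivalent UDF:

// Equivalent sketch: Spark passes ArrayType columns to the UDF as a Seq
def upperUdfSeq = udf((topics: Seq[String]) => topics.map(_.toUpperCase))

df.withColumn("TOPIC", upperUdfSeq($"TOPIC"))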
