Add a column to a DataFrame based on an operation (min, max, sum) between other columns



I have this DataFrame:

val df = Seq(
("thin", "Cell phone", 6000, 150,  "01/01/2018"),
("Normal", "Tablet", 1500, 200, "01/01/2018"),
("Mini", "Tablet", 2000, 250, "02/01/2018"),
("Ultra thin", "Cell phone", 5000, 300, "02/01/2018"),
("Very thin", "Cell phone", 6000, 400, "03/01/2018"),
("Big", "Tablet", 4500, 250, "03/01/2018"),
("Bendable", "Cell phone", 3000, 200, "04/01/2018"),
("Fordable", "Cell phone", 3000, 150, "05/01/2018"),
("Pro", "Cell phone", 4500, 300, "06/01/2018"),
("Pro2", "Tablet", 6500, 350, "04/01/2018")).toDF("product", "category", 
"revenue", "extra", "date")

I am trying to add a Column to this DataFrame that contains the result of an operation on the columns revenue and extra. Say a min operation, so that I would get a Column like this:

df.withColumn("output", min("revenue", "extra"))

The problem I find with the Spark functions is that these min and max aggregations are applied vertically, within a Column. What I am aiming for here, however, is to apply these concepts horizontally, across columns.

Thanks

For this you need to use a UDF. Check this out.

scala> val df = Seq(
|  ("thin", "Cell phone", 6000, 150,  "01/01/2018"),
|  ("Normal", "Tablet", 1500, 200, "01/01/2018"),
|  ("Mini", "Tablet", 2000, 250, "02/01/2018"),
|  ("Ultra thin", "Cell phone", 5000, 300, "02/01/2018"),
|  ("Very thin", "Cell phone", 6000, 400, "03/01/2018"),
|  ("Big", "Tablet", 4500, 250, "03/01/2018"),
|  ("Bendable", "Cell phone", 3000, 200, "04/01/2018"),
|  ("Fordable", "Cell phone", 3000, 150, "05/01/2018"),
|  ("Pro", "Cell phone", 4500, 300, "06/01/2018"),
|  ("Pro2", "Tablet", 6500, 350, "04/01/2018")).toDF("product", "category",
|  "revenue", "extra", "date")
df: org.apache.spark.sql.DataFrame = [product: string, category: string ... 3 more fields]
scala> df.printSchema
root
|-- product: string (nullable = true)
|-- category: string (nullable = true)
|-- revenue: integer (nullable = false)
|-- extra: integer (nullable = false)
|-- date: string (nullable = true)

scala> def min2col(x:Int,y:Int):Int =
| return if(x<y) x else y
min2col: (x: Int, y: Int)Int
scala>  val myudfmin2col = udf( min2col(_:Int,_:Int):Int )
myudfmin2col: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function2>,IntegerType,Some(List(IntegerType, IntegerType)))
scala> df.withColumn("output",myudfmin2col('extra,'revenue)).show(false)
+----------+----------+-------+-----+----------+------+
|product   |category  |revenue|extra|date      |output|
+----------+----------+-------+-----+----------+------+
|thin      |Cell phone|6000   |150  |01/01/2018|150   |
|Normal    |Tablet    |1500   |200  |01/01/2018|200   |
|Mini      |Tablet    |2000   |250  |02/01/2018|250   |
|Ultra thin|Cell phone|5000   |300  |02/01/2018|300   |
|Very thin |Cell phone|6000   |400  |03/01/2018|400   |
|Big       |Tablet    |4500   |250  |03/01/2018|250   |
|Bendable  |Cell phone|3000   |200  |04/01/2018|200   |
|Fordable  |Cell phone|3000   |150  |05/01/2018|150   |
|Pro       |Cell phone|4500   |300  |06/01/2018|300   |
|Pro2      |Tablet    |6500   |350  |04/01/2018|350   |
+----------+----------+-------+-----+----------+------+

scala>
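
As a side note, the same UDF can be written more concisely by passing a lambda directly to udf. This is a minimal sketch, assuming the same spark-shell session (so the implicits needed for the $ column syntax are already in scope); minUdf is just an illustrative name:

import org.apache.spark.sql.functions.udf

// same logic as min2col, defined inline as a lambda
val minUdf = udf((x: Int, y: Int) => if (x < y) x else y)

// apply it horizontally, row by row, to the two columns
df.withColumn("output", minUdf($"revenue", $"extra")).show(false)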

Edit 1:

scala> df.createOrReplaceTempView("product")
scala> spark.sql("select product,category,revenue,extra,date, case when revenue<extra then revenue else extra end as minextra  from product ").show(false)
+----------+----------+-------+-----+----------+--------+
|product   |category  |revenue|extra|date      |minextra|
+----------+----------+-------+-----+----------+--------+
|thin      |Cell phone|6000   |150  |01/01/2018|150     |
|Normal    |Tablet    |1500   |200  |01/01/2018|200     |
|Mini      |Tablet    |2000   |250  |02/01/2018|250     |
|Ultra thin|Cell phone|5000   |300  |02/01/2018|300     |
|Very thin |Cell phone|6000   |400  |03/01/2018|400     |
|Big       |Tablet    |4500   |250  |03/01/2018|250     |
|Bendable  |Cell phone|3000   |200  |04/01/2018|200     |
|Fordable  |Cell phone|3000   |150  |05/01/2018|150     |
|Pro       |Cell phone|4500   |300  |06/01/2018|300     |
|Pro2      |Tablet    |6500   |350  |04/01/2018|350     |
+----------+----------+-------+-----+----------+--------+

scala>
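
The same CASE logic can also be expressed with the DataFrame API instead of SQL, either with when/otherwise or with the built-in least function, which returns the row-wise minimum of its column arguments. A minimal sketch, again assuming the spark-shell implicits are in scope:

import org.apache.spark.sql.functions.{when, least}

// CASE WHEN revenue < extra THEN revenue ELSE extra END
df.withColumn("minextra", when($"revenue" < $"extra", $"revenue").otherwise($"extra")).show(false)

// equivalent: built-in row-wise minimum across the two columns
df.withColumn("minextra", least($"revenue", $"extra")).show(false)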
