替换
我有以下代码。
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
val baseDF = sqlContext.read.json(fileFullPath)
我的JSON有2个感兴趣的领域:造成生产力和数量。我正在寻找的
{
"sales": {
"saledate": "17Mar2008",
"sale": [{
"productid": 1,
"quantity": 10
}, {
"productid": 2,
"quantity": 1
}, {
"productid": 3,
"quantity": 3
}, {
"productid": 4,
"quantity": 5
}]
}
}
我想将其更改为Spark RDD或DF,该火花RDD或DF具有2列,造型和数量,但基于数量的多行。我想要每个数量的1。
在上面的示例产品1中有10行,产品2具有1,产品3具有3,产品4有5行,总计19行,即#行= sum(数量)。
任何帮助。我正在使用Spark 1.6.2和Scala。
这应该做的事情:
import org.apache.spark.sql.functions._
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
import sqlContext.implicits._
val baseDF = sqlContext.read.json(fileFullPath)
val listFromQuantity = udf { quantity: Int => List.fill(quantity)(quantity) }
baseDF.select(explode($"sales.sale")).select($"col.productId", explode(listFromQuantity($"col.quantity"))).show()
返回:
+---------+--------+
|productId|quantity|
+---------+--------+
| 1| 10|
| 1| 10|
| 1| 10|
| 1| 10|
| 1| 10|
| 1| 10|
| 1| 10|
| 1| 10|
| 1| 10|
| 1| 10|
| 2| 1|
| 3| 3|
| 3| 3|
| 3| 3|
| 4| 5|
| 4| 5|
| 4| 5|
| 4| 5|
| 4| 5|
+---------+--------+
如果您想在第二列中拥有一个数量(例如,具有值1
而不是5
),则应用List.fill(quantity)(1)
List.fill(quantity)(quantity)