我正在实现一个程序,该程序将整个数据框架作为参数。我知道这可能不支持Spark,但我想知道有一个很好的方法来解决我的问题。
我有这样的火花数据框架:
Item_sale_table
item_id date Sale Amount
aaa 3-11 20
aaa 3-12 21
aaa 3-13 28
... ... ...
bbb 3-11 17
bbb 3-12 12
... ... ...
ccc 3-11 9
... ... ...
item_list
item_id description
aaa xxxx
bbb xxxyx
ccc zxsa
...
我想做的是从item_list
表中获取每个项目,并从item_sale
表中收集历史数据,并在其上应用一个函数(这是一个简单的计数函数)。
因此,项目过程功能看起来像
def ItemProcess (item_id: String, Dataset: DataFrame) = {
val item_count = Dataset.filter(Dataset("item_id").contains(item_id)).count()
println(item_id,item_count)
}
和调用此功能的主要功能是
val item_count_collection = item_list.select("item_id").foreach(x => ItemProcess(x(0).toString, item_sale_table))
然后我得到
ERROR Executor: Exception in task 4.0 in stage 11.0 (TID 504)
java.lang.NullPointerException
at org.apache.spark.sql.DataFrame.resolve(DataFrame.scala:151)
at org.apache.spark.sql.DataFrame.col(DataFrame.scala:664)
at org.apache.spark.sql.DataFrame.apply(DataFrame.scala:652)
因此,我将整个数据帧传递给了foreach函数。我认为这是问题。但是如何纠正它?
==================
我发现我什至会得到NullPointerException
,即使我像这样嵌入了项目过程
val item_count_collection = item_list.select("item_id").foreach(x => Item_sale_table.filter(Item_sale_table("item_id").contains(x(0).toString)).count())
汇总和(选择)加入:
val item_counts = item_sale_table.groupBy("item_id").count()
可选加入:
item_list.join(item_counts, Seq("item_id"))
或使用contains
(效率降低):
item_list.join(
item_counts,
item_counts("item_id").contains(item_list("item_id"))),
"left"
)