如何在Spark中传递DataFrame作为函数参数



我正在实现一个程序,该程序将整个数据框架作为参数。我知道这可能不支持Spark,但我想知道有一个很好的方法来解决我的问题。

我有这样的火花数据框架:

Item_sale_table 
    item_id   date   Sale Amount 
    aaa       3-11      20
    aaa       3-12      21
    aaa       3-13      28
    ...       ...       ...
    bbb       3-11      17
    bbb       3-12      12
    ...       ...       ...
    ccc       3-11      9
    ...       ...       ...

item_list

item_id   description
aaa          xxxx
bbb          xxxyx
ccc          zxsa
...

我想做的是从item_list表中获取每个项目,并从item_sale表中收集历史数据,并在其上应用一个函数(这是一个简单的计数函数)。

因此,项目过程功能看起来像

def ItemProcess (item_id: String, Dataset: DataFrame)  = {
      val item_count = Dataset.filter(Dataset("item_id").contains(item_id)).count()
      println(item_id,item_count)
  }

和调用此功能的主要功能是

val item_count_collection = item_list.select("item_id").foreach(x => ItemProcess(x(0).toString, item_sale_table))

然后我得到

ERROR Executor: Exception in task 4.0 in stage 11.0 (TID 504)
java.lang.NullPointerException
    at org.apache.spark.sql.DataFrame.resolve(DataFrame.scala:151)
    at org.apache.spark.sql.DataFrame.col(DataFrame.scala:664)
    at org.apache.spark.sql.DataFrame.apply(DataFrame.scala:652)

因此,我将整个数据帧传递给了foreach函数。我认为这是问题。但是如何纠正它?

==================

我发现我什至会得到NullPointerException,即使我像这样嵌入了项目过程

val item_count_collection = item_list.select("item_id").foreach(x => Item_sale_table.filter(Item_sale_table("item_id").contains(x(0).toString)).count())

汇总和(选择)加入:

val item_counts = item_sale_table.groupBy("item_id").count()

可选加入:

item_list.join(item_counts, Seq("item_id"))

或使用contains(效率降低):

item_list.join(
   item_counts,
   item_counts("item_id").contains(item_list("item_id"))),
  "left"
)

相关内容

  • 没有找到相关文章

最新更新