Unable to use a lambda expression when filtering or mapping a Dataset



I have the code below. The problem is that the lambda function inside filter(( cannot resolve the dataset as [TransactionReportData]. Has anyone run into this before and can help?


val transactions = spark.read
  .format("com.mongodb.spark.sql.DefaultSource")
  .option(uri, uriIn)
  .option("pipeline", s"[{ $$match: { duplicate: { $$ne: true }, time: { $$gte: $start, $$lt: $end } } }]")
  .schema(schema)
  .load
  .as[TransactionReportData]
  .persist(StorageLevel.MEMORY_AND_DISK_SER)

val attributedTransactions: Dataset[TransactionReportData] =
  transactions.filter((transaction: TransactionReportData) => transaction.`type` == 2 || transaction.`type` == 1)

It throws:

java.util.concurrent.ExecutionException: java.lang.Exception: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 2267, Column 82: No applicable constructor/method found for actual parameters "java.lang.String, java.lang.String, java.lang.String, java.lang.String, java.lang.String, java.lang.String, java.lang.String, java.lang.String, double, double, double, int, java.lang.String, java.lang.String, long, long, java.lang.String, scala.Option, scala.Option, long, java.lang.String, scala.collection.Seq, scala.collection.Map, scala.collection.Seq, scala.collection.Seq, com.something.models.package$OsBrowserInfo, boolean, java.lang.String, java.lang.String"; candidates are: "com.something.models.package$TransactionReportData(java.lang.String, java.lang.String, java.lang.String, java.lang.String, java.lang.String, java.lang.String, java.lang.String, java.lang.String, double, double, double, int, java.lang.String, java.lang.String, long, long, java.lang.String, scala.Option, scala.Option, long, java.lang.String, scala.collection.immutable.List, scala.collection.immutable.Map, scala.collection.immutable.List, scala.collection.immutable.List, com.something.models.package$OsBrowserInfo, boolean, java.lang.String, java.lang.String)"

Caused by: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 2262, Column 81: No applicable constructor/method found for actual parameters "java.lang.String, java.lang.String, java.lang.String, java.lang.String, java.lang.String, java.lang.String, java.lang.String, java.lang.String, double, double, double, int, java.lang.String, java.lang.String, long, long, java.lang.String, scala.Option, scala.Option, long, java.lang.String, scala.collection.Seq, scala.collection.Map, scala.collection.Seq, scala.collection.Seq, com.something.models.package$OsBrowserInfo, boolean, java.lang.String, java.lang.String"; candidates are: "com.something.models.package$TransactionReportData(java.lang.String, java.lang.String, java.lang.String, java.lang.String, java.lang.String, java.lang.String, java.lang.String, java.lang.String, double, double, double, int, java.lang.String, java.lang.String, long, long, java.lang.String, scala.Option, scala.Option, long, java.lang.String, scala.collection.immutable.List, scala.collection.immutable.Map, scala.collection.immutable.List, scala.collection.immutable.List, com.something.models.package$OsBrowserInfo, boolean, java.lang.String, java.lang.String)"

followed by roughly 2,000 more lines of generated Java code.
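Note what the message actually complains about: the generated deserializer calls the constructor with `scala.collection.Seq` and `scala.collection.Map` arguments, while the case-class constructor expects `scala.collection.immutable.List` and `scala.collection.immutable.Map`. A minimal sketch of one possible workaround, assuming the model declares its collection fields as `List` (the field names below are hypothetical, not the real model):

```scala
// Hypothetical sketch: widen collection fields from List to Seq / Map so the
// constructor accepts whatever concrete collection Spark's codegen passes in.
case class TransactionReportDataSketch(
  id: String,
  tags: Seq[String],                              // was List[String]
  attrs: scala.collection.Map[String, String]     // was immutable.Map
)
```

This is only a sketch of the direction suggested by the stack trace, not a confirmed fix for the original model.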

But when I tried:

import spark.implicits._
val attributedTransactions : Dataset[TransactionReportData] = transactions.filter($"`type`" === 2 || $"`type`" === 1)

it worked.
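A plausible reading of why the untyped version works (my interpretation of the error, not confirmed in the post): a `Column` predicate is evaluated against Spark's internal row representation, so the failing `TransactionReportData` constructor is never invoked, whereas the typed lambda forces a full deserialization of every row. An equivalent untyped filter can also be written with the standard `functions.col` / `isin` API:

```scala
import org.apache.spark.sql.functions.col

// Untyped predicate: evaluated on internal rows, so no deserialization
// into TransactionReportData objects is required.
val attributedTransactions: Dataset[TransactionReportData] =
  transactions.filter(col("type").isin(1, 2))
```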

It looks like the error occurs when TransactionReportData is instantiated, not in the filter itself. I tried a case class and used it in a Dataset's filter function as shown below, and it worked fine for me. Posting it here for your reference. I tried all of this in spark-shell.

case class emp(id: String, emptype: String)

val list = List(emp("1", "fulltime"), emp("2", "contractor"))
val rdd  = sc.parallelize(list)
val ds   = rdd.toDS

ds.filter((x: emp) => x.emptype == "contractor").show // One approach
ds.filter(_.emptype == "contractor").show             // Another approach

I tried running this job in Docker and it worked fine, lol. Same jar, env, config and spark_version. At this point I think the only difference is the JVM, but I have no evidence to prove it.
