我有以下 scala 代码从 Spark 中提取数据:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.types.{StringType, StructType, TimestampType}
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.functions._
val emailDF = loadTable("email")
.where(s"topic = '${Topics.Email}'")
.cache()
val df = emailDF.withColumn("rank",row_number()
.over(Window.partitionBy("email_address")
.orderBy(desc("created_at"))))
val resultDf = df.filter(s"rank == 1").drop("rank")
运行代码时出现此错误:
org.apache.spark.sql.AnalysisException: Could not resolve window function 'row_number'. Note that, using window functions currently requires a HiveContext;
四处搜索以发现我需要添加 Hive 依赖项,这是我更新的依赖项:
build.sbt
val sparkVersion = "1.6.3"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
"org.apache.spark" %% "spark-hive" % sparkVersion % "provided"
)
但是我仍然遇到同样的错误。
尝试了HiveContext方法:
val emailDF = Email.load()
.filter(col(Email.TopicId).isin(Topics.Email))
.filter(col(Email.OptIn).isin(optInFlag))
.cache()
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
logger.info(s"sc: ${sc.appName}, ${sc.sparkUser}")
emailDF.registerTempTable("emailDFTable")
val df = hiveContext.sql("""SELECT *,
row_number() over(partition by email_address order by event_at desc) AS rank
FROM emailDFTable""")
val resultDf = df.filter(s"rank == 1").drop("rank")
现在我得到了错误:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Table not found: emailDFTable; line 3 pos 30
at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.getTable(Analyzer.scala:305)
我尝试的另一种方法:
val windowSpec = Window.partitionBy(col(EmailChannel.EmailAddress)).orderBy(col(EmailChannel.EventAt).desc)
val resultDf = emailDF.withColumn("maxEventAt", first("event_at").over(windowSpec))
.select("*").where(col("maxEventAt") === col(EmailChannel.EventAt))
.drop("maxEventAt")
然后再次出现类似的错误:
org.apache.spark.sql.AnalysisException: Could not resolve window function 'first_value'. Note that, using window functions currently requires a HiveContext;
我真的不明白我有导入 hiveContext 并添加了 spark-hive 依赖项,为什么它不起作用。 我能想到的一件事是我们使用 datastax spark,所以我们在 build.sbt 中有以下缺陷
"com.datastax.spark" %% "spark-cassandra-connector" % "1.6.11",
我也需要一个datastax.spark.hive吗? 但是没有看到这样的库存在。
我也显示我的电子邮件DF:电子邮件DF.show(假) 它里面有很多数据,而不是空的。
==== 更新 ====
是的,切换到HiveContext工作,我没有注意到在代码开头初始化了SparkContext和SQLContext,而不是使用HiveContext切换SQLContext,我尝试从SparkContext创建一个新的HiveContext:
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
这就是为什么它不起作用的原因。 在我将SQLContext更改为HiveContext后,它工作正常。
从
implicit val sc: SparkContext = new SparkContext(sparkConfig)
implicit val sqlContext: SQLContext = new SQLContext(sc)
自
implicit val sc: SparkContext = new SparkContext(sparkConfig)
implicit val sqlContext: HiveContext = new HiveContext(sc)
Spark 1.6 Windowing 函数中的
仅适用于HiveContext。使用 sparkContext(sc) 创建 HiveContext。
import org.apache.spark.sql.hive.HiveContext
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
将数据帧注册为临时表,并使用 hiveContext 对临时表运行查询。
emailDF.registerTempTable("emailDFTable")
将数据帧注册为临时表后,请检查您的临时表。
hiveContext.sql("SHOW tables").show()
+--------+------------+-----------+
|database| tableName|isTemporary|
+--------+------------+-----------+
| |emaildftable| true|
+--------+------------+-----------+
现在,您可以查询临时表。
val df = hiveContext.sql("""SELECT *,
row_number() over(partition by email_address order by created_at desc) AS rank
FROM emailDFTable""")
让我知道它是怎么回事。