Appending transformed columns to a Spark DataFrame with Scala

I am trying to read a Hive table, extract and transform certain columns from the resulting DataFrame, and then put those new columns into a new DataFrame. I am attempting it like this:

import org.apache.spark.sql.functions.lit  // needed for lit(); missing from the snippet as posted

val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
val hiveDF = sqlContext.sql("select * from table_x")
val system_generated_id = hiveDF("unique_key")
val application_assigned_event_id = hiveDF("event_event_id")
val trnEventDf = sqlContext.emptyDataFrame
trnEventDf.withColumn("system_generated_id", lit(system_generated_id))  // this line fails at runtime

The sbt build succeeds without any errors, but when I try to run the job I get the following error:

Exception in thread "main" java.lang.IllegalArgumentException: requirement failed
    at scala.Predef$.require(Predef.scala:221)
    at org.apache.spark.sql.catalyst.analysis.UnresolvedStar.expand(unresolved.scala:199)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$10$$anonfun$applyOrElse$14.apply(Analyzer.scala:354)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$10$$anonfun$applyOrElse$14.apply(Analyzer.scala:353)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$10.applyOrElse(Analyzer.scala:353)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$10.applyOrElse(Analyzer.scala:347)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:57)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:57)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:56)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.apply(Analyzer.scala:347)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.apply(Analyzer.scala:328)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:83)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:80)
    at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111)
    at scala.collection.immutable.List.foldLeft(List.scala:84)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:80)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:72)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:72)
    at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:36)
    at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:36)
    at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:34)
    at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:133)
    at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$withPlan(DataFrame.scala:2126)
    at org.apache.spark.sql.DataFrame.select(DataFrame.scala:707)
    at org.apache.spark.sql.DataFrame.withColumn(DataFrame.scala:1188)
    at bacon$.main(bacon.scala:31)
    at bacon.main(bacon.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)

I would like to know what is causing this error, and whether there is another way to accomplish what I am trying to do.

The error comes from the last line: system_generated_id is a Column bound to hiveDF's logical plan, so the analyzer cannot resolve it against an unrelated DataFrame, least of all sqlContext.emptyDataFrame, which has no schema at all (the stack trace shows the failure inside the analyzer, triggered by DataFrame.withColumn). Note also that withColumn returns a new DataFrame rather than modifying trnEventDf in place, so the result of that call is discarded anyway. There is usually no need to create a new DataFrame for this: transform the DataFrame you already have by adding the unique-id column to it, and you get the DataFrame you want. If you want to keep the result, just save it as a new Hive table.
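
A minimal sketch of that approach, assuming Spark 1.x with a HiveContext as in the question (the table and column names come from the question; the output table name table_x_transformed is made up for illustration):

val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
val hiveDF = sqlContext.sql("select * from table_x")

// withColumn returns a new DataFrame, so derive the new columns from hiveDF
// itself instead of attaching hiveDF's columns to an empty DataFrame.
val trnEventDf = hiveDF
  .withColumn("system_generated_id", hiveDF("unique_key"))
  .withColumn("application_assigned_event_id", hiveDF("event_event_id"))
  .select("system_generated_id", "application_assigned_event_id")

// Persist the result as a new Hive table if you want to keep it.
trnEventDf.write.saveAsTable("table_x_transformed")

If the transformation is really just a rename, a single select with aliases does the same thing in one step: hiveDF.select(hiveDF("unique_key").as("system_generated_id"), hiveDF("event_event_id").as("application_assigned_event_id")).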
