Spark sql Dataframe - import sqlContext.implicits._



我有一个创建spark上下文的main:

    val sc = new SparkContext(sparkConf)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._

然后创建dataframe,并对dataframe进行筛选和验证。

    val convertToHourly = udf((time: String) => time.substring(0, time.indexOf(':')) + ":00:00")
    val df = sqlContext.read.schema(struct).format("com.databricks.spark.csv").load(args(0))
    // record length cannot be < 2 
    .na.drop(3)
    // round to hours
    .withColumn("time",convertToHourly($"time"))

但是当我试图通过发送数据帧到

将验证移动到另一个文件时
function ValidateAndTransform(df: DataFrame) : DataFrame = {...}

获取数据框&执行验证和转换:似乎我需要

 import sqlContext.implicits._

避免错误:" value $不是StringContext的成员"这发生在网上:.withColumn("时间",convertToHourly ( $ "时间"))

但是要使用import sqlContext.implicits._我还需要在新文件中定义sqlContext,像这样:

val sc = new SparkContext(sparkConf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

或者发送到

function ValidateAndTransform(df: DataFrame) : DataFrame = {...}
function

我觉得我正在尝试对2个文件(main &验证)没有正确完成…

你知道怎么设计这个吗?还是简单地将sqlContext发送给函数?

谢谢!

您可以使用SQLContext的单例实例。您可以在spark存储库

中查看这个示例。
/** Lazily instantiated singleton instance of SQLContext */
object SQLContextSingleton {
  @transient  private var instance: SQLContext = _
  def getInstance(sparkContext: SparkContext): SQLContext = {
    if (instance == null) {
      instance = new SQLContext(sparkContext)
    }
    instance
  }
}
...
//And wherever you want you can do
val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
import sqlContext.implicits._  

最新更新