SparkSQL DataFrame:使用缓存时,sql查询不工作



我开始使用spark来学习。我根据这个文档编写了一个简单的程序。

我的程序从文件(在HDFS集群上)读取支付日志,将其传输到一个数据框架,并在一些sql查询中使用这个数据框架。我在两种情况下运行我的程序:带和不带cache()方法。我遇到了一个奇怪的问题,如下所述:

  1. 未使用cache():

我试着运行一些查询,一切都很好。(log_zw是我的表名)

val num_records =  sqlContext.sql("select * from log_zw").count
val num_acc1 =  sqlContext.sql("select * from log_zw where ACN = 'acc1' ").count
使用缓存()
  • 我还使用了上面的两个查询。第一个查询返回正确的值,但第二个查询是而不是,它返回0

    然而,当我用另一种方法查询时:

    val num_acc1 = log_zw.filter(log_zw("ACN").contains("acc1")).count
    

    它返回了正确的结果。

    我对Spark和集群计算系统非常陌生,我不知道为什么它是这样工作的。谁能给我解释一下这个问题,特别是使用sql查询和spark方法时的不同。

    编辑:这是schema,它很简单。

    root 
     |-- PRODUCT_ID: string (nullable = true) 
     |-- CHANNEL: string (nullable = true) 
     |-- ACN: string (nullable = true) 
     |-- AMOUNT_VND: double (nullable = false) 
     |-- TRANS_ID: string (nullable = true)
    

    Edit2:这是我使用cache()时的代码:(我运行了一些查询,结果显示在代码中的注释中)

    // read tsv files
    case class LogZW(
      PRODUCT_ID: String,
      PLATFORM: String,
      CHANNEL: String,
      ACN: String,
      AMOUNT_VND: Double,
      TRANS_ID: String)
    def loadLog(filename: String): DataFrame = {
      sc.textFile(filename).map(line => line.split("t")).map(p =>
      LogZW(p(1), p(3), p(4), p(5), p(9).toDouble, p(10).substring(0,8))).toDF()
    }
    // generate schema
    val schemaString = "PRODUCT_ID PLATFORM CHANNEL ACN AMOUNT_VND TRANS_ID"
    val schema = StructType(schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
    // read all files
    val HDFSFolder = "hdfs://master:54310/user/lqthang/data/*"
    val log = loadLog(HDFSFolder)
    // register table
    log.registerTempTable("log")
    log.show()
    // select a subset of log table
    val log_zw =  sqlContext.sql("select PRODUCT_ID, CHANNEL, ACN, AMOUNT_VND, TRANS_ID from log where PLATFORM = 'zingwallet' and CHANNEL not in ('CBZINGDEAL', 'VNPT') and PRODUCT_ID not in ('ZingCredit', 'zingcreditdbg') ")
    // register new table
    log_zw.show()
    log_zw.registerTempTable("log_zw")
    // cache table
    log_zw.cache()
    // this query returns incorrect value!!
    val num_acc1 =  sqlContext.sql("select * from log_zw where ACN = 'acc1' ").count
    // this query returns correct value!
    val num_acc2 =  sqlContext.sql("select * from log_zw where trim(ACN) = 'acc1' ").count
    // uncache data and try another query
    log_zw.unpersist()
    // this query also returns the correct value!!!
    val num_acc2 =  sqlContext.sql("select * from log_zw where ACN = 'acc1' ").count
    

    Edit3:我试图添加另一个cache()方法到log dataframe:

    // register table
    log.registerTempTable("log")
    log.show()
    log.cache()
    

    下面的代码与上面的相同(带log_zw.cache())。所以重要的结果是:

    // this query returns the CORRECT value!!
    val num_acc1 =  sqlContext.sql("select * from log_zw where ACN = 'acc1' ").count
    

    我们没有很多关于数据是什么的细节,但我注意到你的两个代码部分做不同的事情。

    在第一个,你做ACN = 'acc1',但在第二个你检查ACN 是否包含 'acc1'。

    所以如果ACN是'acc1',或'acc1',或'acc1',第二个位(带过滤器)将匹配

    换句话说,我打赌如果你在SQL查询中添加一个trim,你会得到不同的结果。

    那么试试这个:
    val num_records = sqlContext.sql("select * from log_zw").count val num_acc1 = sqlContext.sql("select * from log_zw where trim(ACN) = 'acc1' ").count

    相关内容

    • 没有找到相关文章