我开始使用spark来学习。我根据这个文档编写了一个简单的程序。
我的程序从文件(在HDFS集群上)读取支付日志,将其传输到一个数据框架,并在一些sql查询中使用这个数据框架。我在两种情况下运行我的程序:带和不带cache()方法。我遇到了一个奇怪的问题,如下所述:
- 未使用cache():
我试着运行一些查询,一切都很好。(log_zw是我的表名)
val num_records = sqlContext.sql("select * from log_zw").count
val num_acc1 = sqlContext.sql("select * from log_zw where ACN = 'acc1' ").count
- 使用缓存()
我还使用了上面的两个查询。第一个查询返回正确的值,但第二个查询是而不是,它返回0。
然而,当我用另一种方法查询时:
val num_acc1 = log_zw.filter(log_zw("ACN").contains("acc1")).count
它返回了正确的结果。
我对Spark和集群计算系统非常陌生,我不知道为什么它是这样工作的。谁能给我解释一下这个问题,特别是使用sql查询和spark方法时的不同。
编辑:这是schema,它很简单。
root
|-- PRODUCT_ID: string (nullable = true)
|-- CHANNEL: string (nullable = true)
|-- ACN: string (nullable = true)
|-- AMOUNT_VND: double (nullable = false)
|-- TRANS_ID: string (nullable = true)
Edit2:这是我使用cache()时的代码:(我运行了一些查询,结果显示在代码中的注释中)
// read tsv files
case class LogZW(
PRODUCT_ID: String,
PLATFORM: String,
CHANNEL: String,
ACN: String,
AMOUNT_VND: Double,
TRANS_ID: String)
def loadLog(filename: String): DataFrame = {
sc.textFile(filename).map(line => line.split("t")).map(p =>
LogZW(p(1), p(3), p(4), p(5), p(9).toDouble, p(10).substring(0,8))).toDF()
}
// generate schema
val schemaString = "PRODUCT_ID PLATFORM CHANNEL ACN AMOUNT_VND TRANS_ID"
val schema = StructType(schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
// read all files
val HDFSFolder = "hdfs://master:54310/user/lqthang/data/*"
val log = loadLog(HDFSFolder)
// register table
log.registerTempTable("log")
log.show()
// select a subset of log table
val log_zw = sqlContext.sql("select PRODUCT_ID, CHANNEL, ACN, AMOUNT_VND, TRANS_ID from log where PLATFORM = 'zingwallet' and CHANNEL not in ('CBZINGDEAL', 'VNPT') and PRODUCT_ID not in ('ZingCredit', 'zingcreditdbg') ")
// register new table
log_zw.show()
log_zw.registerTempTable("log_zw")
// cache table
log_zw.cache()
// this query returns incorrect value!!
val num_acc1 = sqlContext.sql("select * from log_zw where ACN = 'acc1' ").count
// this query returns correct value!
val num_acc2 = sqlContext.sql("select * from log_zw where trim(ACN) = 'acc1' ").count
// uncache data and try another query
log_zw.unpersist()
// this query also returns the correct value!!!
val num_acc2 = sqlContext.sql("select * from log_zw where ACN = 'acc1' ").count
Edit3:我试图添加另一个cache()方法到log
dataframe:
// register table
log.registerTempTable("log")
log.show()
log.cache()
下面的代码与上面的相同(带log_zw.cache()
)。所以重要的结果是:
// this query returns the CORRECT value!!
val num_acc1 = sqlContext.sql("select * from log_zw where ACN = 'acc1' ").count
我们没有很多关于数据是什么的细节,但我注意到你的两个代码部分做不同的事情。
在第一个,你做ACN = 'acc1'
,但在第二个你检查ACN 是否包含 'acc1'。
所以如果ACN是'acc1',或'acc1',或'acc1',第二个位(带过滤器)将匹配
换句话说,我打赌如果你在SQL查询中添加一个trim,你会得到不同的结果。
那么试试这个:
val num_records = sqlContext.sql("select * from log_zw").count
val num_acc1 = sqlContext.sql("select * from log_zw where trim(ACN) = 'acc1' ").count