I'm trying to use Spark's "lead" function, but I get some strange behavior when using it.
Here is a sample of my input data (sorted by "row_id" and "dt_capt"):
row_id, dt_capt, dt_capt_time
_____________________________________________________________
1) 1-14ZBW-76, 2016-07-20 12:46:51.516005, 124651516005
2) 1-1BHPHNU, 2016-07-20 21:07:05.779006, 210705779006
3) 1-1BZ1F5B, 2016-07-20 21:07:05.779008, 210705779008
4) 1-1IE18-116, 2016-07-20 09:48:52.411000, 94852411000
5) 1-1JEVXD, 2016-07-20 09:05:16.502001, 90516502001
6) 1-1JGTHE, 2016-07-20 09:04:24.183001, 90424183001
7) 1-1KQA6M8, 2016-07-20 21:06:02.483002, 210602483002
8) 1-1WI4W1P, 2016-07-20 09:04:06.163001, 90406163001
9) 1-1XIZRHX, 2016-07-20 00:00:27.646000, 27646000
10) 1-1Y932X9, 2016-07-20 16:47:51.774001, 164751774001
11) 1-1Y932X9, 2016-07-20 21:39:29.662002, 213929662002
12) 1-1YYW7-3, 2016-07-20 13:32:18.110004, 133218110004
13) 1-1YYW7-3, 2016-07-20 13:32:18.114001, 133218114001
14) 1-21CY-79, 2016-07-20 18:12:16.663003, 181216663003
15) 1-21CY-79, 2016-07-20 18:12:16.663008, 181216663008
16) 1-22BT-399, 2016-07-20 16:13:12.259003, 161312259003
17) 1-22BT-399, 2016-07-20 21:39:29.662006, 213929662006
18) 1-22BV-801, 2016-07-20 18:07:24.710001, 180724710001
19) 1-22BV-801, 2016-07-20 18:09:52.584005, 180952584005
20) 1-22BV-801, 2016-07-20 18:12:19.676002, 181219676002
My code:
#All the imports
from pyspark import SparkConf, SparkContext
from pyspark.sql import HiveContext
from pyspark.sql.window import Window
from pyspark.sql.functions import lead, col, lit, udf
from pyspark.sql.types import *
import io
import sys
conf = SparkConf().set('spark.kryoserializer.buffer.max', '512m')
sc=SparkContext(conf=conf)
hc=HiveContext(sc)
#Show only errors
sc.setLogLevel("ERROR")
#Get input data
delta = hc.sql("SELECT ROW_ID, DT_CAPT, UNIX_TIMESTAMP(DT_CAPT) AS DT_CAPT_TS, "
               "CAST(regexp_replace(split(LKR_GRC_S_CONTACT.dt_capt, ' ')[1], ':|[.]', '') AS BIGINT) AS dt_capt_time "
               "FROM int_lkr.lkr_grc_s_contact "
               "WHERE regexp_replace(to_date(DT_CAPT), '-', '') = '20160720' "
               "ORDER BY ROW_ID, DT_CAPT")
delta.registerTempTable("delta")
#Compute "mvt_suiv" with the lead function
w = Window().partitionBy("ROW_ID").orderBy("dt_capt_time")
delta2 = delta.select("row_id","dt_capt","dt_capt_time",lead("dt_capt_time").over(w).alias("mvt_suiv"))
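For clarity, the dt_capt_time column computed above is just the time portion of dt_capt with ':' and '.' stripped out, cast to BIGINT; in plain Python terms:
#e.g. for dt_capt = "2016-07-20 12:46:51.516005"
int("12:46:51.516005".replace(":", "").replace(".", ""))  # -> 124651516005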
And here is the resulting output:
row_id, dt_capt, dt_capt_time, mvt_suiv
_____________________________________________________________________________
1) 1-14ZBW-76, 2016-07-20 12:46:51.516005, 124651516005, NULL
2) 1-1BHPHNU, 2016-07-20 21:07:05.779006, 210705779006, NULL
3) 1-1BZ1F5B, 2016-07-20 21:07:05.779008, 210705779008, NULL
4) 1-1IE18-116, 2016-07-20 09:48:52.411000, 94852411000, NULL
5) 1-1JEVXD, 2016-07-20 09:05:16.502001, 90516502001, 171798691866
6) 1-1JGTHE, 2016-07-20 09:04:24.183001, 90424183001, NULL
7) 1-1KQA6M8, 2016-07-20 21:06:02.483002, 210602483002, NULL
8) 1-1WI4W1P, 2016-07-20 09:04:06.163001, 90406163001, NULL
9) 1-1XIZRHX, 2016-07-20 00:00:27.646000, 27646000, NULL
10) 1-1Y932X9, 2016-07-20 16:47:51.774001, 164751774001, 213929662002
11) 1-1Y932X9, 2016-07-20 21:39:29.662002, 213929662002, NULL
12) 1-1YYW7-3, 2016-07-20 13:32:18.110004, 133218110004, 133218110004
13) 1-1YYW7-3, 2016-07-20 13:32:18.114001, 133218114001, 133218114001
14) 1-21CY-79, 2016-07-20 18:12:16.663003, 181216663003, 181216663008
15) 1-21CY-79, 2016-07-20 18:12:16.663008, 181216663008, NULL
16) 1-22BT-399, 2016-07-20 16:13:12.259003, 161312259003, 213929662006
17) 1-22BT-399, 2016-07-20 21:39:29.662006, 213929662006, NULL
18) 1-22BV-801, 2016-07-20 18:07:24.710001, 180724710001, 180952584005
19) 1-22BV-801, 2016-07-20 18:09:52.584005, 180952584005, 181219676002
20) 1-22BV-801, 2016-07-20 18:12:19.676002, 181219676002, NULL
As you can see, it is not working correctly (rows 5, 12 and 13):
Row 5: it should be "NULL", because this ROW_ID has no next row.
Row 12: it should be "133218114001" instead of "133218110004".
Row 13: it should be "NULL", because this ROW_ID has no next row.
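To make the expected semantics concrete, here is a minimal repro (the toy DataFrame and the names "toy" and "w2" are made up for illustration, reusing the values from rows 5, 12 and 13); on a correct implementation, the last row of each ROW_ID partition should get NULL:
toy = hc.createDataFrame(
    [("1-1JEVXD", 90516502001),
     ("1-1YYW7-3", 133218110004),
     ("1-1YYW7-3", 133218114001)],
    ["row_id", "dt_capt_time"])
w2 = Window().partitionBy("row_id").orderBy("dt_capt_time")
toy.select("row_id", "dt_capt_time",
           lead("dt_capt_time").over(w2).alias("mvt_suiv")).show()
#Expected output:
# 1-1JEVXD,  90516502001,  NULL
# 1-1YYW7-3, 133218110004, 133218114001
# 1-1YYW7-3, 133218114001, NULL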
Am I doing something wrong? I have tried with both string and integer values, but I still get this strange behavior from "lead" (and the same from "lag"). I get the feeling that the window functions in Spark are still quite buggy (at least in Spark 1.5). Can anyone confirm this?
Cloudera version: 5.5.1
Spark version: 1.5.0
I can't see anything wrong with your code (maybe I'm missing something), but the point is that 1.5.0 shipped with a buggy window function implementation. Check this bunch of issues; maybe you hit one of them (or a side effect):
https://issues.apache.org/jira/browse/SPARK-11009 (fixed in CDH 5.5.4)
https://www.cloudera.com/documentation/enterprise/release-notes/topics/cdh_rn_fixed_in_55.html
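If upgrading is not an option right away, one window-free workaround is to compute the "next" value on the RDD side: group the rows by ROW_ID, sort each group's timestamps, and pair every timestamp with its successor. A rough sketch, reusing delta and hc from your code (the helper with_next and the explicit schema are mine, and I have not tested this on 1.5.0):
from pyspark.sql.types import StructType, StructField, StringType, LongType

def with_next(rows):
    #Sort this key's timestamps and pair each one with its successor (None for the last one).
    ts = sorted(r.dt_capt_time for r in rows)
    return zip(ts, ts[1:] + [None])

pairs = (delta.rdd
         .map(lambda r: (r.row_id, r))
         .groupByKey()
         .flatMap(lambda kv: [(kv[0], t, n) for t, n in with_next(kv[1])]))
schema = StructType([StructField("row_id", StringType()),
                     StructField("dt_capt_time", LongType()),
                     StructField("mvt_suiv", LongType())])
delta2 = hc.createDataFrame(pairs, schema)
Note that groupByKey collects all rows of a key on a single executor, which is fine here because each ROW_ID only has a handful of rows, but it would not scale to very hot keys.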