SparkSQL -延迟功能



我在这篇DataBricks的帖子中看到,在SparkSql中有对窗口函数的支持,特别是我试图使用lag()窗口函数。

我有信用卡交易的行,我已经对它们进行了排序,现在我想遍历这些行,并为每一行显示交易的金额,以及当前行的金额与前一行的金额之差。

在DataBricks的帖子之后,我提出了这个查询,但是它抛出了一个异常,我不能完全理解为什么…

这是在PySpark..

是我已经创建并注册的临时表。
test =sqlContext.sql("SELECT tx.cc_num,tx.trans_date,tx.trans_time,tx.amt, (lag(tx.amt) OVER (PARTITION BY tx.cc_num ORDER BY  tx.trans_date,tx.trans_time ROW BETWEEN PRECEDING AND CURRENT ROW)) as prev_amt from tx")

和异常(截断).

py4j.protocol.Py4JJavaError: An error occurred while calling o76.sql.
: java.lang.RuntimeException: [1.67] failure: ``)'' expected but identifier OVER found

我真的很欣赏任何见解,这个功能是相当新的,没有太多的继续到现有的例子或其他相关的帖子。

编辑

我也尝试在没有SQL语句的情况下这样做,但仍然得到一个错误。我已经将此与Hive和SQLContext一起使用,并收到相同的错误。

windowSpec = 
Window 
    .partitionBy(h_tx_df_ordered['cc_num']) 
    .orderBy(h_tx_df_ordered['cc_num'],h_tx_df_ordered['trans_date'],h_tx_df_ordered['trans_time'])
windowSpec.rowsBetween(-1, 0)
lag_amt = 
   (lag(h_tx_df_ordered['amt']).over(windowSpec) - h_tx_df_ordered['amt'])
    tx_df_ordered.select(
    h_tx_df_ordered['cc_num'],
    h_tx_df_ordered['trans_date'],
    h_tx_df_ordered['trans_time'],
    h_tx_df_ordered['amt'],
    lag_amt.alias("prev_amt")).show()

Traceback (most recent call last):
  File "rdd_raw_data.py", line 116, in <module>
    lag_amt.alias("prev_amt")).show()
  File "/opt/spark/python/pyspark/sql/dataframe.py", line 721, in select
    jdf = self._jdf.select(self._jcols(*cols))
  File "/home/brandon/anaconda/lib/python2.7/site-packages/py4j/java_gateway.py", line 813, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/home/brandon/anaconda/lib/python2.7/site-packages/py4j/protocol.py", line 308, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o152.select.
: org.apache.spark.sql.AnalysisException: Could not resolve window function 'lag'. Note that, using window functions currently requires a HiveContext;
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38)
  1. 框架规范应该以关键字ROWS而不是ROW开头
  2. 框架规范需要一个下限值

    ROWS BETWEEN 1 PRECEDING AND CURRENT ROW
    

    UNBOUNDED关键字

    ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    
  3. LAG函数根本不接受帧,所以一个正确的SQL查询与延迟可以看起来像这样

    SELECT tx.cc_num,tx.trans_date,tx.trans_time,tx.amt, LAG(tx.amt) OVER (
         PARTITION BY tx.cc_num ORDER BY  tx.trans_date,tx.trans_time
    ) as prev_amt from tx
    

编辑:

关于SQL DSL的使用:

  1. 可以在错误信息

    中读取

    注意,使用窗口函数目前需要一个HiveContex

    确保用HiveContext而不是SQLContext初始化sqlContext

  2. windowSpec.rowsBetween(-1, 0)没有执行任何操作,但是lag函数再次不支持帧规格。

相关内容

  • 没有找到相关文章

最新更新