如何计算Spark结构化流中的滞后差



我正在编写一个Spark结构化流媒体程序。我需要创建一个具有滞后差异的附加列。

为了重现我的问题,我提供了代码片段。此代码使用存储在data文件夹中的data.json文件:

[
{"id": 77,"type": "person","timestamp": 1532609003},
{"id": 77,"type": "person","timestamp": 1532609005},
{"id": 78,"type": "crane","timestamp": 1532609005}
]

代码:

from pyspark.sql import SparkSession
import pyspark.sql.functions as func
from pyspark.sql.window import Window
from pyspark.sql.types import *
spark = SparkSession 
.builder 
.appName("Test") 
.master("local[2]") 
.getOrCreate()
schema = StructType([
StructField("id", IntegerType()),
StructField("type", StringType()),
StructField("timestamp", LongType())
])
ds = spark 
.readStream 
.format("json") 
.schema(schema) 
.load("data/")
diff_window = Window.partitionBy("id").orderBy("timestamp")
ds = ds.withColumn("prev_timestamp", func.lag(ds.timestamp).over(diff_window))
query = ds 
.writeStream 
.format('console') 
.start()
query.awaitTermination()

我得到这个错误:

pyspark.sql.utils.AnalysisException:u'基于非时间的窗口不是流式数据帧/数据集支持;;\WinDow[lag(时间戳#71L,1,null(windowspecdefinition(host_id#68,时间戳#71L ASC第一个NULLS,排在1个前导和1之间PREEDING(AS prev_timestamp#129L]

pyspark.sql.utils.AnalysisException:流式数据帧/数据集不支持u'基于非时间的窗口

这意味着您的窗口应该基于timestamp列。因此,如果每秒钟都有一个数据点,并创建一个30s窗口,其中stride10s,则生成的窗口将创建一个新的window列,其中包含startend列,它们将包含差为30s的时间戳。

您应该以这种方式使用窗口:

words = words.withColumn('date_time', F.col('date_time').cast('timestamp'))
w = F.window('date_time', '30 seconds', '10 seconds')
words = words 
.withWatermark('date_format', '1 minutes') 
.groupBy(w).agg(F.mean('value'))

最新更新