如何在火花流应用程序中使用滞后/领先功能?



我正在使用spark-sql 2.4.x版本,datastax-spark-cassandra-connector用于Cassandra-3.x版本。与卡夫卡一起。

我有一个来自 kafka 主题的一些财务数据的场景。 如公司 ID、年份、季度、销售额prev_sales数据。

val kafkaDf = sc.parallelize(Seq((15,2016, 4, 100.5,"")).toDF("companyId", "year","quarter", "sales","prev_sales")

我需要与Cassandra表中的上一年同一季度数据prev_sales,如下所示

val cassandraTabledf = sc.parallelize(Seq(
(15,2016, 3, 120.6, 320.6),
(15,2016, 2, 450.2,650.2),
(15,2016, 1, 200.7,700.7),
(15,2015, 4, 221.4,400),
(15,2015, 3, 320.6,300),
(15,2015, 2, 650.2,200),
(15,2015, 1, 700.7,100))).toDF("companyId", "year","quarter", "sales","prev_sales")

即对于 Seq((15,2016, 4, 100.5,"( 数据,它应该是 2015 年第 4 季度数据,即 221.4

所以新数据是

(15,2016, 4, 100.5,221.4(

如何做/实现这个? 我们可以显式地进行查询,但是有没有办法使用"滞后"函数在 cassandra 表上使用连接?

我认为它不需要任何leglead函数。你也可以通过join获得你想要的输出。查看以下代码以供参考:

注意:我在kafkaDF中添加了更多数据,以便更好地理解。

scala> kafkaDf.show(false)
+---------+----+-------+-----+----------+
|companyId|year|quarter|sales|prev_sales|
+---------+----+-------+-----+----------+
|15       |2016|4      |100.5|          |
|15       |2016|1      |115.8|          |
|15       |2016|3      |150.1|          |
+---------+----+-------+-----+----------+

scala> cassandraTabledf.show
+---------+----+-------+-----+----------+
|companyId|year|quarter|sales|prev_sales|
+---------+----+-------+-----+----------+
|       15|2016|      3|120.6|     320.6|
|       15|2016|      2|450.2|     650.2|
|       15|2016|      1|200.7|     700.7|
|       15|2015|      4|221.4|       400|
|       15|2015|      3|320.6|       300|
|       15|2015|      2|650.2|       200|
|       15|2015|      1|700.7|       100|
+---------+----+-------+-----+----------+

scala>kafkaDf.alias("k").join(
cassandraTabledf.alias("c"), 
col("k.companyId") === col("c.companyId") && 
col("k.quarter") === col("c.quarter") && 
(col("k.year") - 1) === col("c.year"),
"left"
)
.drop("prev_sales")
.select(col("k.*"), col("c.sales").alias("prev_sales"))
.withColumn("prev_sales", when(col("prev_sales").isNull, col("sales")).otherwise(col("prev_sales")))
.show()
+---------+----+-------+-----+----------+
|companyId|year|quarter|sales|prev_sales|
+---------+----+-------+-----+----------+
|       15|2016|      1|115.8|     700.7|
|       15|2016|      3|150.1|     320.6|
|       15|2016|      4|100.5|     221.4|
+---------+----+-------+-----+----------+

最新更新