我正在使用spark-sql 2.4.x版本,datastax-spark-cassandra-connector用于Cassandra-3.x版本。与卡夫卡一起。
我有一个来自 kafka 主题的一些财务数据的场景。 如公司 ID、年份、季度、销售额prev_sales数据。
val kafkaDf = sc.parallelize(Seq((15,2016, 4, 100.5,"")).toDF("companyId", "year","quarter", "sales","prev_sales")
我需要与Cassandra表中的上一年同一季度数据prev_sales,如下所示
val cassandraTabledf = sc.parallelize(Seq(
(15,2016, 3, 120.6, 320.6),
(15,2016, 2, 450.2,650.2),
(15,2016, 1, 200.7,700.7),
(15,2015, 4, 221.4,400),
(15,2015, 3, 320.6,300),
(15,2015, 2, 650.2,200),
(15,2015, 1, 700.7,100))).toDF("companyId", "year","quarter", "sales","prev_sales")
即对于 Seq((15,2016, 4, 100.5,"( 数据,它应该是 2015 年第 4 季度数据,即 221.4
所以新数据是
(15,2016, 4, 100.5,221.4(
如何做/实现这个? 我们可以显式地进行查询,但是有没有办法使用"滞后"函数在 cassandra 表上使用连接?
我认为它不需要任何leg
和lead
函数。你也可以通过join
获得你想要的输出。查看以下代码以供参考:
注意:我在kafkaDF
中添加了更多数据,以便更好地理解。
scala> kafkaDf.show(false)
+---------+----+-------+-----+----------+
|companyId|year|quarter|sales|prev_sales|
+---------+----+-------+-----+----------+
|15 |2016|4 |100.5| |
|15 |2016|1 |115.8| |
|15 |2016|3 |150.1| |
+---------+----+-------+-----+----------+
scala> cassandraTabledf.show
+---------+----+-------+-----+----------+
|companyId|year|quarter|sales|prev_sales|
+---------+----+-------+-----+----------+
| 15|2016| 3|120.6| 320.6|
| 15|2016| 2|450.2| 650.2|
| 15|2016| 1|200.7| 700.7|
| 15|2015| 4|221.4| 400|
| 15|2015| 3|320.6| 300|
| 15|2015| 2|650.2| 200|
| 15|2015| 1|700.7| 100|
+---------+----+-------+-----+----------+
scala>kafkaDf.alias("k").join(
cassandraTabledf.alias("c"),
col("k.companyId") === col("c.companyId") &&
col("k.quarter") === col("c.quarter") &&
(col("k.year") - 1) === col("c.year"),
"left"
)
.drop("prev_sales")
.select(col("k.*"), col("c.sales").alias("prev_sales"))
.withColumn("prev_sales", when(col("prev_sales").isNull, col("sales")).otherwise(col("prev_sales")))
.show()
+---------+----+-------+-----+----------+
|companyId|year|quarter|sales|prev_sales|
+---------+----+-------+-----+----------+
| 15|2016| 1|115.8| 700.7|
| 15|2016| 3|150.1| 320.6|
| 15|2016| 4|100.5| 221.4|
+---------+----+-------+-----+----------+