我希望有一些指导或帮助解决以下问题:
我在Spark数据框架中有以下数据。我想创建一个参考记录之前n天的窗口,然后使用参考值与窗口中的值计算除法。然而,我还没有弄清楚如何做这种操作,我发现的一切只是意味着,计数或求和操作在窗口。
原始数据如下:
| symbol_id | date | close | is_reference |
|----------|------------|----------|--------------|
| XXXX | 2000-01-19 | 809.9644 | FALSE |
| XXXX | 2000-01-20 | 784.274 | FALSE |
| XXXX | 2000-01-21 | 774.2831 | FALSE |
| XXXX | 2000-01-24 | 760.0106 | FALSE |
| XXXX | 2000-01-25 | 750.7335 | FALSE |
| XXXX | 2000-01-26 | 750.7335 | TRUE |
| XXXX | 2000-01-27 | 742.17 | FALSE |
| XXXX | 2000-01-28 | 749.3063 | FALSE |
| XXXX | 2000-01-31 | 750.02 | FALSE |
| XXXX | 2000-02-01 | 762.8653 | FALSE |
| XXXX | 2000-02-02 | 749.3063 | FALSE |
预期输出如下:
| symbol_id | date | close | is_reference | reference_change |
|----------|------------|----------|--------------|-------------------|
| XXXX | 2000-01-19 | 809.9644 | FALSE | 1.07889737170381 |
| XXXX | 2000-01-20 | 784.274 | FALSE | 1.04467697258748 |
| XXXX | 2000-01-21 | 774.2831 | FALSE | 1.03136878799201 |
| XXXX | 2000-01-24 | 760.0106 | FALSE | 1.0123573811479 |
| XXXX | 2000-01-25 | 750.7335 | FALSE | 1 |
| XXXX | 2000-01-26 | 750.7335 | TRUE | 1 |
| XXXX | 2000-01-27 | 742.17 | FALSE | 0.988593155893536 |
| XXXX | 2000-01-28 | 749.3063 | FALSE | 0.99809892591712 |
| XXXX | 2000-01-31 | 750.02 | FALSE | 0.999049596161621 |
| XXXX | 2000-02-01 | 762.8653 | FALSE | 1.01615992892285 |
| XXXX | 2000-02-02 | 749.3063 | FALSE | 0.99809892591712 |
我目前使用以下代码片段按symbol_id进行分区:
val window = Window.partitionBy(SYMBOL_ID)
.orderBy(col(DATE).desc)
.rowsBetween(5,0) // RangeBetween looks better but i just trying with rowsBetween for now
并尝试在reference_change
列上做类似的操作。
df
.withColumn("close_movement", $"close"/lit(col("close")
.where(col("is_reference") === true)).over(window)) // This command is wrong but its the most similar to thoughts in my mind.
因此,最后我将使用close
WHERE
is_reference = true
除以窗口上的close
,就像我们在预期输出上的reference_change
列一样。
谢谢你的帮助!
我将使用一个简单的连接:
val ref = df.filter($"is_reference")
df.join(ref, df.col("symbol_id") === ref.col("symbol_id") &&
abs(date_diff(df.col("date"), ref.col("date"))) <= 5)
.select(df.col("symbol_id"), df.col("date"), df.col("close"), df.col("is_reference"),
(df.col("close") / ref.col("close")).as("reference_change"))