将窗口值除以引用行



我希望有一些指导或帮助解决以下问题:

我在Spark数据框架中有以下数据。我想创建一个参考记录之前n天的窗口,然后使用参考值与窗口中的值计算除法。然而,我还没有弄清楚如何做这种操作,我发现的一切只是意味着,计数或求和操作在窗口。

原始数据如下:

| symbol_id | date       | close    | is_reference |
|----------|------------|----------|--------------|
| XXXX     | 2000-01-19 | 809.9644 | FALSE        |
| XXXX     | 2000-01-20 | 784.274  | FALSE        |
| XXXX     | 2000-01-21 | 774.2831 | FALSE        |
| XXXX     | 2000-01-24 | 760.0106 | FALSE        |
| XXXX     | 2000-01-25 | 750.7335 | FALSE        |
| XXXX     | 2000-01-26 | 750.7335 | TRUE         |
| XXXX     | 2000-01-27 | 742.17   | FALSE        |
| XXXX     | 2000-01-28 | 749.3063 | FALSE        |
| XXXX     | 2000-01-31 | 750.02   | FALSE        |
| XXXX     | 2000-02-01 | 762.8653 | FALSE        |
| XXXX     | 2000-02-02 | 749.3063 | FALSE        |

预期输出如下:

| symbol_id | date       | close    | is_reference | reference_change  |
|----------|------------|----------|--------------|-------------------|
| XXXX     | 2000-01-19 | 809.9644 | FALSE        | 1.07889737170381  |
| XXXX     | 2000-01-20 | 784.274  | FALSE        | 1.04467697258748  |
| XXXX     | 2000-01-21 | 774.2831 | FALSE        | 1.03136878799201  |
| XXXX     | 2000-01-24 | 760.0106 | FALSE        | 1.0123573811479   |
| XXXX     | 2000-01-25 | 750.7335 | FALSE        | 1                 |
| XXXX     | 2000-01-26 | 750.7335 | TRUE         | 1                 |
| XXXX     | 2000-01-27 | 742.17   | FALSE        | 0.988593155893536 |
| XXXX     | 2000-01-28 | 749.3063 | FALSE        | 0.99809892591712  |
| XXXX     | 2000-01-31 | 750.02   | FALSE        | 0.999049596161621 |
| XXXX     | 2000-02-01 | 762.8653 | FALSE        | 1.01615992892285  |
| XXXX     | 2000-02-02 | 749.3063 | FALSE        | 0.99809892591712  |

我目前使用以下代码片段按symbol_id进行分区:

val window = Window.partitionBy(SYMBOL_ID)
.orderBy(col(DATE).desc)
.rowsBetween(5,0) // RangeBetween looks better but i just trying with rowsBetween for now

并尝试在reference_change列上做类似的操作。

df
.withColumn("close_movement", $"close"/lit(col("close")
.where(col("is_reference") === true)).over(window)) // This command is wrong but its the most similar to thoughts in my mind.

因此,最后我将使用closeWHEREis_reference = true除以窗口上的close,就像我们在预期输出上的reference_change列一样。

谢谢你的帮助!

我将使用一个简单的连接:

val ref = df.filter($"is_reference")
df.join(ref, df.col("symbol_id") === ref.col("symbol_id") &&
abs(date_diff(df.col("date"), ref.col("date"))) <= 5)
.select(df.col("symbol_id"), df.col("date"), df.col("close"), df.col("is_reference"),
(df.col("close") / ref.col("close")).as("reference_change"))

最新更新