How to explode a column with multiple records into multiple columns in Spark



I am new to Spark and Scala and would like some help with this case. This is my current schema:

|-- _id: struct (nullable = true)
|    |-- oid: string (nullable = true)
|-- date: timestamp (nullable = true)
|-- horizon: double (nullable = true)
|-- risk_table: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- index: string (nullable = true)
|    |    |-- risk_buy: double (nullable = true)
|    |    |-- reward_buy: double (nullable = true)
|    |    |-- risk_sell: double (nullable = true)
|    |    |-- reward_sell: double (nullable = true)
|-- symbol_id: string (nullable = true)

Here is a sample of what the data looks like:

+--------------------+
|          risk_table|
+--------------------+
|[{count, 201.0, 2...|
|[{count, 219.0, 2...|
|[{count, 119.0, 1...|
|[{count, 217.0, 2...|
|[{count, 17.0, 17...|
|[{count, 189.0, 1...|
|[{count, 105.0, 1...|
|[{count, 188.0, 1...|
|[{count, 111.0, 1...|
|[{count, 276.0, 2...|
|[{count, 70.0, 70...|
|[{count, 121.0, 1...|
|[{count, 133.0, 1...|
|[{count, 116.0, 1...|
|[{count, 70.0, 70...|
|[{count, 193.0, 1...|
|[{count, 131.0, 1...|
|[{count, 93.0, 93...|
|[{count, 84.0, 84...|
|[{count, 114.0, 1...|
+--------------------+

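I want to explode the values of the risk_table column into multiple columns. There are usually 4 nested documents/dictionaries, and the index name changes between them, so the expected output would look like this: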

+-----------+------+---------+------------------+--------------------+-----+---------------------+
| symbol_id | date | index_0 | risk_buy_index_0 | reward_buy_index_0 | ... | reward_sell_index_3 |
+-----------+------+---------+------------------+--------------------+-----+---------------------+
| APPL      | xxxx | 0       | 0                | 0                  | ... | 0                   |
+-----------+------+---------+------------------+--------------------+-----+---------------------+
| APPL      | xxxx | 0       | 0                | 0                  | ... | 0                   |
+-----------+------+---------+------------------+--------------------+-----+---------------------+
| APPL      | xxxx | 0       | 0                | 0                  | ... | 0                   |
+-----------+------+---------+------------------+--------------------+-----+---------------------+

I have only found information on how to explode a single document/dictionary without nesting. I would appreciate it if anyone could help.

Update #1

After @vilalabinot's answer, this is the DataFrame that is returned:

|index_0|    risk_buy_index_0|  reward_buy_index_0|
+-------+--------------------+--------------------+
|  count|               201.0|               201.0|
|   mean|-0.00842858807942...|0.034462956359400186|
|    std|0.010321886923670486|0.024028309176849814|
|    min|-0.04742597827704211|                -0.0|
|    25%|-0.01445728455890...|0.018636627472515977|
|    50%|-0.00424808836023...|0.029910269192422685|
|    75%|                 0.0| 0.04336544006150825|
|    max|                -0.0|   0.141510207428056|
|  count|               219.0|               219.0|
|   mean|-0.00825181843661...| 0.03181657870541232|
|    std|0.009920846095541787| 0.02024399549501371|
|    min|-0.04707151894023976|0.002521912798573491|
|    25%|-0.01330755207577...|0.015617730900798106|
|    50%|-0.00475774347023...|0.026639344262294966|
|    75%|                -0.0| 0.04315554182360575|
|    max|                -0.0| 0.11667197234981128|
|  count|               119.0|               119.0|
|   mean|-0.01337031203096...|0.049558424443669605|
|    std| 0.01942541767615014|0.036681645417330024|
|    min| -0.0951268206361449|0.004135946772163351|
+-------+--------------------+--------------------+

Instead, I would like to get something like this, where the values of the records in risk_table are appended as columns rather than rows:

+-----------+------+-------+-----+-----+-----+-----+------------+-------------+
| symbol_id | mean | count | min | max | 50% | 75% | reward_buy | reward_sell |
+-----------+------+-------+-----+-----+-----+-----+------------+-------------+
|           | ...  | ...   | ... | ... | ... | ... | ....       | ...         |
+-----------+------+-------+-----+-----+-----+-----+------------+-------------+
| ...       | ...  | ...   | ... | ... | ... | ... | ...        | ...         |
+-----------+------+-------+-----+-----+-----+-----+------------+-------------+
| ...       | ...  | ...   | ... | ... | ... | ... | ...        | ...         |
+-----------+------+-------+-----+-----+-----+-----+------------+-------------+

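Assume your dataset is called main. First, we have to explode the contents of risk_table; otherwise each row would still hold a whole array as its column value, which is not what we want:

import org.apache.spark.sql.functions.{col, explode}

val exploded = main.withColumn("explode", explode(col("risk_table")))

Now the explode column holds one object per row. There are many ways to create columns from an object, but I like to use selectExpr:

val df1 = exploded.selectExpr("id", "symbol_id", // or whatever other fields you like
  "explode.index as index_0",  // then target the key with the dot operator
  "explode.risk_buy as risk_buy_index_0",
  "explode.reward_buy as reward_buy_index_0"
  // add your other wanted values
)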

Dummy input:

+--------------------------+---+---------+
|risk_table                |id |symbol_id|
+--------------------------+---+---------+
|[{1, 0.25, 0.3, 0.1, 0.3}]|1  |1        |
+--------------------------+---+---------+

Final output:

+---+---------+-------+----------------+------------------+
| id|symbol_id|index_0|risk_buy_index_0|reward_buy_index_0|
+---+---------+-------+----------------+------------------+
|  1|        1|      1|            0.25|               0.3|
+---+---------+-------+----------------+------------------+
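
To try the two steps above end to end, here is a minimal self-contained sketch that rebuilds the dummy input and applies the same explode and selectExpr calls. The object name ExplodeSketch, the helpers flatten and dummyInput, and the local SparkSession settings are assumptions made for illustration; they are not part of the original answer.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, explode}

object ExplodeSketch {
  // Hypothetical helper: produces one output row per element of risk_table,
  // then lifts selected struct fields into top-level columns.
  def flatten(main: DataFrame): DataFrame =
    main
      .withColumn("explode", explode(col("risk_table")))
      .selectExpr(
        "id",
        "symbol_id",
        "explode.index as index_0",
        "explode.risk_buy as risk_buy_index_0",
        "explode.reward_buy as reward_buy_index_0"
      )

  // Dummy one-row input mirroring the table above (values are made up).
  def dummyInput(spark: SparkSession): DataFrame =
    spark.sql(
      """SELECT 1 AS id, '1' AS symbol_id,
        |       array(named_struct(
        |         'index', '1',
        |         'risk_buy', 0.25D,
        |         'reward_buy', 0.3D,
        |         'risk_sell', 0.1D,
        |         'reward_sell', 0.3D)) AS risk_table""".stripMargin)

  def main(args: Array[String]): Unit = {
    // Local session for experimentation only (assumed settings).
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("explode-sketch")
      .getOrCreate()
    flatten(dummyInput(spark)).show()
    spark.stop()
  }
}
```

Note that explode emits one row per array element, so a risk_table with several records yields several rows per input row. That is what leads to the row-wise result shown in Update #1.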

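Solution:

After applying the explode function from vilalabinot's answer, I just needed to group by date and pivot on the index field of the risk_table records, something like this: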

import org.apache.spark.sql.functions.{col, first}

df1.groupBy(col("date")).pivot(col("index_0"))
  .agg(first(col("risk_buy_index_0")), first(col("reward_buy_index_0")))
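
As a rough, runnable illustration of the pivot step, the sketch below applies the same groupBy/pivot/agg pattern to a few made-up rows in the shape produced by the explode step. The object name PivotSketch, the helpers widen and dummyFlat, the sample dates and values, and the aliases risk_buy and reward_buy are all assumptions for illustration. Grouping by symbol_id in addition to date is also an assumption, added so that the column survives in the output as in the desired layout.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, first}

object PivotSketch {
  // Hypothetical helper: each distinct index_0 value (count, min, ...)
  // becomes a group of columns, one per aggregated field.
  def widen(flat: DataFrame): DataFrame =
    flat
      .groupBy(col("symbol_id"), col("date"))
      .pivot("index_0")
      .agg(
        first(col("risk_buy_index_0")).as("risk_buy"),
        first(col("reward_buy_index_0")).as("reward_buy")
      )

  // Made-up rows shaped like the exploded DataFrame from Update #1.
  def dummyFlat(spark: SparkSession): DataFrame =
    spark.sql(
      """SELECT * FROM VALUES
        |  ('APPL', '2021-01-04', 'count', 201.0D, 201.0D),
        |  ('APPL', '2021-01-04', 'min',   -0.05D, 0.0D),
        |  ('APPL', '2021-01-05', 'count', 219.0D, 219.0D),
        |  ('APPL', '2021-01-05', 'min',   -0.04D, 0.01D)
        |AS t(symbol_id, date, index_0, risk_buy_index_0, reward_buy_index_0)""".stripMargin)

  def main(args: Array[String]): Unit = {
    // Local session for experimentation only (assumed settings).
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("pivot-sketch")
      .getOrCreate()
    widen(dummyFlat(spark)).show()
    spark.stop()
  }
}
```

With several aliased aggregations, Spark names the pivoted columns by joining the pivot value and the alias, for example count_risk_buy and min_reward_buy. If the set of index values is known up front, passing it as the second argument to pivot saves Spark the extra pass it otherwise needs to discover the distinct values.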
