I'm new to Spark and Scala and would like some help with this situation. This is my current schema:
|-- _id: struct (nullable = true)
| |-- oid: string (nullable = true)
|-- date: timestamp (nullable = true)
|-- horizon: double (nullable = true)
|-- risk_table: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- index: string (nullable = true)
| | |-- risk_buy: double (nullable = true)
| | |-- reward_buy: double (nullable = true)
| | |-- risk_sell: double (nullable = true)
| | |-- reward_sell: double (nullable = true)
|-- symbol_id: string (nullable = true)
Here is a sample of what the data looks like:
+--------------------+
| risk_table|
+--------------------+
|[{count, 201.0, 2...|
|[{count, 219.0, 2...|
|[{count, 119.0, 1...|
|[{count, 217.0, 2...|
|[{count, 17.0, 17...|
|[{count, 189.0, 1...|
|[{count, 105.0, 1...|
|[{count, 188.0, 1...|
|[{count, 111.0, 1...|
|[{count, 276.0, 2...|
|[{count, 70.0, 70...|
|[{count, 121.0, 1...|
|[{count, 133.0, 1...|
|[{count, 116.0, 1...|
|[{count, 70.0, 70...|
|[{count, 193.0, 1...|
|[{count, 131.0, 1...|
|[{count, 93.0, 93...|
|[{count, 84.0, 84...|
|[{count, 114.0, 1...|
+--------------------+
I want to explode the risk_table column values into multiple columns. There are usually 4 nested documents/dicts, and their index names vary, so the expected output looks like this:
+-----------+------+---------+------------------+--------------------+-----+---------------------+
| symbol_id | date | index_0 | risk_buy_index_0 | reward_buy_index_0 | ... | reward_sell_index_3 |
+-----------+------+---------+------------------+--------------------+-----+---------------------+
| APPL | xxxx | 0 | 0 | 0 | ... | 0 |
+-----------+------+---------+------------------+--------------------+-----+---------------------+
| APPL | xxxx | 0 | 0 | 0 | ... | 0 |
+-----------+------+---------+------------------+--------------------+-----+---------------------+
| APPL | xxxx | 0 | 0 | 0 | ... | 0 |
+-----------+------+---------+------------------+--------------------+-----+---------------------+
I only found information on how to explode a single, non-nested document/dict. Any help would be appreciated.
Update #1:
Following @vilalabinot's answer, this is the dataframe it returns:
|index_0| risk_buy_index_0| reward_buy_index_0|
+-------+--------------------+--------------------+
| count| 201.0| 201.0|
| mean|-0.00842858807942...|0.034462956359400186|
| std|0.010321886923670486|0.024028309176849814|
| min|-0.04742597827704211| -0.0|
| 25%|-0.01445728455890...|0.018636627472515977|
| 50%|-0.00424808836023...|0.029910269192422685|
| 75%| 0.0| 0.04336544006150825|
| max| -0.0| 0.141510207428056|
| count| 219.0| 219.0|
| mean|-0.00825181843661...| 0.03181657870541232|
| std|0.009920846095541787| 0.02024399549501371|
| min|-0.04707151894023976|0.002521912798573491|
| 25%|-0.01330755207577...|0.015617730900798106|
| 50%|-0.00475774347023...|0.026639344262294966|
| 75%| -0.0| 0.04315554182360575|
| max| -0.0| 0.11667197234981128|
| count| 119.0| 119.0|
| mean|-0.01337031203096...|0.049558424443669605|
| std| 0.01942541767615014|0.036681645417330024|
| min| -0.0951268206361449|0.004135946772163351|
+-------+--------------------+--------------------+
Instead, I would like something like this, where the values of the records inside risk_table are appended as columns rather than rows:
+-----------+------+-------+-----+-----+-----+-----+------------+-------------+
| symbol_id | mean | count | min | max | 50% | 75% | reward_buy | reward_sell |
+-----------+------+-------+-----+-----+-----+-----+------------+-------------+
| | ... | ... | ... | ... | ... | ... | .... | ... |
+-----------+------+-------+-----+-----+-----+-----+------------+-------------+
| ... | ... | ... | ... | ... | ... | ... | ... | ... |
+-----------+------+-------+-----+-----+-----+-----+------------+-------------+
| ... | ... | ... | ... | ... | ... | ... | ... | ... |
+-----------+------+-------+-----+-----+-----+-----+------------+-------------+
Assume your dataset is df1. First, we have to explode the contents of risk_table, because otherwise we would end up with arrays as column values, which is not what we want:
from pyspark.sql.functions import col, explode

df1 = df1.withColumn("explode", explode(col("risk_table")))
Now the explode column holds one struct per row. There are many ways to turn a struct's fields into columns, but I like selectExpr:
df1 = df1.selectExpr(
    "id", "symbol_id",  # or whatever other fields you like
    "explode.index as index_0",  # target the nested key with the dot operator
    "explode.risk_buy as risk_buy_index_0",
    "explode.reward_buy as reward_buy_index_0",
    # add your other wanted values
)
Pseudo input:
+--------------------------+---+---------+
|risk_table |id |symbol_id|
+--------------------------+---+---------+
|[{1, 0.25, 0.3, 0.1, 0.3}]|1 |1 |
+--------------------------+---+---------+
Final output:
+---+---------+-------+----------------+------------------+
| id|symbol_id|index_0|risk_buy_index_0|reward_buy_index_0|
+---+---------+-------+----------------+------------------+
| 1| 1| 1| 0.25| 0.3|
+---+---------+-------+----------------+------------------+
Solution:
After applying the explode step from vilalabinot's answer, I just needed to group by date and then pivot the record fields of risk_table into columns, something like this:
from pyspark.sql.functions import col, first

# Note: in PySpark, pivot takes the column name as a string, not a Column.
df1 = (df1.groupBy(col("date"))
          .pivot("index_0")
          .agg(first(col("risk_buy_index_0")),
               first(col("reward_buy_index_0"))))