将 mapType 列与原始数据框连接/展开



我在(py(Spark中有一个数据帧,其中1列来自"map"类型。我想将那列展平或拆分为应添加到原始数据帧的多列。我能够使用 flatMap 展开列,但是我丢失了将新数据帧(来自展开的列(与原始数据帧连接起来的键。

我的架构是这样的:

    rroot
 |-- key: string (nullable = true)
 |-- metric: map (nullable = false)
 |    |-- key: string
 |    |-- value: float (valueContainsNull = true)

如您所见,"度量"列是一个地图字段。这是我想要展平的列。在展平之前,它看起来像:

+----+---------------------------------------------------+
|key |metric                                             |
+----+---------------------------------------------------+
|123k|Map(metric1 -> 1.3, metric2 -> 6.3, metric3 -> 7.6)|
|d23d|Map(metric1 -> 1.5, metric2 -> 2.0, metric3 -> 2.2)|
|as3d|Map(metric1 -> 2.2, metric2 -> 4.3, metric3 -> 9.0)|
+----+---------------------------------------------------+

将该字段转换为我所做的列

df2.select('metric').rdd.flatMap(lambda x: x).toDF().show()

这给了

   +------------------+-----------------+-----------------+
|           metric1|          metric2|          metric3|
+------------------+-----------------+-----------------+
|1.2999999523162842|6.300000190734863|7.599999904632568|
|               1.5|              2.0|2.200000047683716|
| 2.200000047683716|4.300000190734863|              9.0|
+------------------+-----------------+-----------------+

但是我没有看到键,因此我不知道如何将此数据添加到原始数据帧。

我想要的是:

+----+-------+-------+-------+
| key|metric1|metric2|metric3|
+----+-------+-------+-------+
|123k|    1.3|    6.3|    7.6|
|d23d|    1.5|    2.0|    2.2|
|as3d|    2.2|    4.3|    9.0|
+----+-------+-------+-------+

因此,我的问题是:我怎样才能让 df2 回到 df(鉴于我最初不知道 df 并且只有 df2(

要制作 df2:

rdd = sc.parallelize([('123k', 1.3, 6.3, 7.6),
                      ('d23d', 1.5, 2.0, 2.2), 
                      ('as3d', 2.2, 4.3, 9.0)
                          ])
schema = StructType([StructField('key', StringType(), True),
                     StructField('metric1', FloatType(), True),
                     StructField('metric2', FloatType(), True),
                     StructField('metric3', FloatType(), True)])
df = sqlContext.createDataFrame(rdd, schema)

from pyspark.sql.functions import lit, col, create_map
from itertools import chain
metric = create_map(list(chain(*(
    (lit(name), col(name)) for name in df.columns if "metric" in name
)))).alias("metric")

df2 = df.select("key", metric)
from pyspark.sql.functions import explode
# fetch column names of the original dataframe from keys of MapType 'metric' column
col_names = df2.select(explode("metric")).select("key").distinct().sort("key").rdd.flatMap(lambda x: x).collect()
exprs = [col("key")] + [col("metric").getItem(k).alias(k) for k in col_names]    
df2_to_original_df = df2.select(*exprs)
df2_to_original_df.show()

输出为:

+----+-------+-------+-------+
| key|metric1|metric2|metric3|
+----+-------+-------+-------+
|123k|    1.3|    6.3|    7.6|
|d23d|    1.5|    2.0|    2.2|
|as3d|    2.2|    4.3|    9.0|
+----+-------+-------+-------+

我可以通过执行以下操作从映射类型中选择某个键:

df.select('maptypecolumn'.'key')

在我的示例中,我按如下方式操作:

columns= df2.select('metric').rdd.flatMap(lambda x: x).toDF().columns
for i in columns:
  df2= df2.withColumn(i,lit(df2.metric[i]))
您可以

像这样访问keyvalue

from pyspark.sql.functions import explode
df.select(explode("custom_dimensions")).select("key")

相关内容

  • 没有找到相关文章

最新更新