计算每个数据框的每一列的模式,将其存储在列表中,然后使用IT进行数据框架



我在数据框架中找到了所有列的'模式'并将其存储在列表中。计算每列模式的代码:

from pyspark.sql.functions import *
 #calculating mode value
mode_val = []
for i in df_num.columns :
       cnts = df_num.groupBy(i).count()
       mode = cnts.join(
       cnts.agg(max("count").alias("max_")), col("count") == col("max_")
        ).limit(1)
        mode2 = mode.withColumn(i,col(i).cast("double"))
        mode_val.append(mode2.first()[0])

输出

[6500.0, 0.0, 没有任何, 1300.0, 3.0, 3.0, 0.0, 没有任何, 38000.0]

当我尝试将列表转换为数据框时,我似乎会遇到错误。

这是我用于将我的模式列表转换为dataFrame的代码:

univar_df4 = spark.createDataFrame(mode_val,["Mode"])

错误

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-171-d5ca3ecf8d79> in <module>()
      1 #not able to apply to dataframe.
----> 2 univar_df4 = spark.createDataFrame(mode_val,["Mode"])
/usr/lib/spark/python/pyspark/sql/session.py in createDataFrame(self, data, schema, samplingRatio, verifySchema)
    535             rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
    536         else:
--> 537             rdd, schema = self._createFromLocal(map(prepare, data), schema)
    538         jrdd = self._jvm.SerDeUtil.toJavaArray(rdd._to_java_object_rdd())
    539         jdf = self._jsparkSession.applySchemaToPythonRDD(jrdd.rdd(), schema.json())
/usr/lib/spark/python/pyspark/sql/session.py in _createFromLocal(self, data, schema)
    399 
    400         if schema is None or isinstance(schema, (list, tuple)):
--> 401             struct = self._inferSchemaFromList(data)
    402             converter = _create_converter(struct)
    403             data = map(converter, data)
/usr/lib/spark/python/pyspark/sql/session.py in _inferSchemaFromList(self, data)
    331             warnings.warn("inferring schema from dict is deprecated,"
    332                           "please use pyspark.sql.Row instead")
--> 333         schema = reduce(_merge_type, map(_infer_schema, data))
    334         if _has_nulltype(schema):
    335             raise ValueError("Some of types cannot be determined after inferring")
/usr/lib/spark/python/pyspark/sql/types.py in _infer_schema(row)
    990 
    991     else:
--> 992         raise TypeError("Can not infer schema for type: %s" % type(row))
    993 
    994     fields = [StructField(k, _infer_type(v), True) for k, v in items]
TypeError: Can not infer schema for type: <class 'float'>

要解释该错误,我将从另一个问题中引用自己:

我发现将createDataFrame()的论点视为一个很有用 列表中每个条目对应于一行的元组列表 元组的数据框和每个元素对应于列。

您可以通过使列表中的每个元素成为元组来获得所需的输出:

mode_val = [6500.0, 0.0, None, 1300.0, 3.0, 3.0, 0.0, None, 38000.0]
mode_val = [(x,) for x in mode_val]
print(mode_val)
#[(6500.0,), (0.0,), (None,), (1300.0,), (3.0,), (3.0,), (0.0,), (None,), (38000.0,)]

现在创建数据框:

univar_df4 = spark.createDataFrame(mode_val,["Mode"])
univar_df4.show()
#+-------+
#|   Mode|
#+-------+
#| 6500.0|
#|    0.0|
#|   null|
#| 1300.0|
#|    3.0|
#|    3.0|
#|    0.0|
#|   null|
#|38000.0|
#+-------+

但是,似乎您的目标是将每列的模式纳入一个新的数据帧中。这是不依赖于列表中的值存储的另一种方法:

创建一个示例数据框:

import pyspark.sql.functions as f
data = [
    (1, 2, 3),
    (1, 3, 3),
    (2, 3, 2)
]
df_num = sqlCtx.createDataFrame(data, ["a", "b", "c"])
df_num.show()
#+---+---+---+
#|  a|  b|  c|
#+---+---+---+
#|  1|  2|  3|
#|  1|  3|  3|
#|  2|  3|  2|
#+---+---+---+

使用 union使用列表理解 reduce,以获取每列的模式:

mode = reduce(
    lambda a, b: a.union(b),
    [
        df_num.groupBy(i)
            .count()
             .sort(f.col("count").desc())
             .limit(1)
             .select(
                f.lit(i).alias("col"),
                f.col(i).alias("mode")
            ) 
        for i in df_num.columns
    ]
)
mode.show()
#+---+----+
#|col|mode|
#+---+----+
#|  a|   1|
#|  b|   3|
#|  c|   3|
#+---+----+

在列表理解中,我们正在迭代数据框中的所有列,并执行groupBy()count()(就像您这样做一样(。但是,我没有寻找计数等于最大的行,而是对列降和使用 limit(1)排序以获取最大值。

在此步骤之后,数据框将有两个列和一行。我们操纵值将其转换为两个列的数据框架:(column_name, mode)

最后,我们通过调用Union将所有行连接在一起来减少列表。

最新更新