我有一个熊猫数据框架my_df
, my_df.dtypes
给我们:
ts int64
fieldA object
fieldB object
fieldC object
fieldD object
fieldE object
dtype: object
然后我试图转换熊猫数据帧my_df
到一个火花数据帧做以下:
spark_my_df = sc.createDataFrame(my_df)
但是,我得到了以下错误:
ValueErrorTraceback (most recent call last)
<ipython-input-29-d4c9bb41bb1e> in <module>()
----> 1 spark_my_df = sc.createDataFrame(my_df)
2 spark_my_df.take(20)
/usr/local/spark-latest/python/pyspark/sql/session.py in createDataFrame(self, data, schema, samplingRatio)
520 rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
521 else:
--> 522 rdd, schema = self._createFromLocal(map(prepare, data), schema)
523 jrdd = self._jvm.SerDeUtil.toJavaArray(rdd._to_java_object_rdd())
524 jdf = self._jsparkSession.applySchemaToPythonRDD(jrdd.rdd(), schema.json())
/usr/local/spark-latest/python/pyspark/sql/session.py in _createFromLocal(self, data, schema)
384
385 if schema is None or isinstance(schema, (list, tuple)):
--> 386 struct = self._inferSchemaFromList(data)
387 if isinstance(schema, (list, tuple)):
388 for i, name in enumerate(schema):
/usr/local/spark-latest/python/pyspark/sql/session.py in _inferSchemaFromList(self, data)
318 schema = reduce(_merge_type, map(_infer_schema, data))
319 if _has_nulltype(schema):
--> 320 raise ValueError("Some of types cannot be determined after inferring")
321 return schema
322
ValueError: Some of types cannot be determined after inferring
有人知道上面的错误是什么意思吗?谢谢!
为了推断字段类型,PySpark查看每个字段中的非none记录。如果一个字段只有None记录,PySpark无法推断类型,并将引发该错误。
手动定义模式将解决这个问题
>>> from pyspark.sql.types import StructType, StructField, StringType
>>> schema = StructType([StructField("foo", StringType(), True)])
>>> df = spark.createDataFrame([[None]], schema=schema)
>>> df.show()
+----+
|foo |
+----+
|null|
+----+
为了解决这个问题,您可以提供您自己定义的模式。
例如:重现错误:
>>> df = spark.createDataFrame([[None, None]], ["name", "score"])
修复错误:
>>> from pyspark.sql.types import StructType, StructField, StringType, DoubleType
>>> schema = StructType([StructField("name", StringType(), True), StructField("score", DoubleType(), True)])
>>> df = spark.createDataFrame([[None, None]], schema=schema)
>>> df.show()
+----+-----+
|name|score|
+----+-----+
|null| null|
+----+-----+
如果您正在使用RDD[Row].toDF()
猴子补丁方法,您可以在推断类型时增加样本比率以检查100条以上的记录:
# Set sampleRatio smaller as the data size increases
my_df = my_rdd.toDF(sampleRatio=0.01)
my_df.show()
假设在您的RDD中的所有字段中都有非空行,当您将sampleRatio
增加到1.0时,将更有可能找到它们。
我遇到过同样的问题,如果您不需要为空的列,您可以在导入到spark之前将它们从pandas数据框中删除:
my_df = my_df.dropna(axis='columns', how='all') # Drops columns with all NA values
spark_my_df = sc.createDataFrame(my_df)
这可能是因为列的值都是空的。您应该在将这些列转换为spark数据框之前删除它们
这个错误的原因是Spark无法确定您的pandas数据框的数据类型,因此,解决这个问题的一种方法是将schema
单独传递给sparks的createDataFrame
函数。
例如你的pandas数据框架是这样的
d = {
'col1': [1, 2],
'col2': ['A', 'B]
}
df = pd.DataFrame(data = d)
print(df)
col1 col2
0 1 A
1 2 B
当您想将其转换为Spark数据框架时,首先定义模式并将其添加到createDataFrame
中,如下所示
from pyspark.sql.types import StructType, StructField, LongType, StringType
schema = StructType([
StructField("col1", LongType()),
StructField("col2", StringType()),
])
spark_df = spark.createDataFrame(df, schema = schema)