df.columns gives a ValueError in pyspark



I am new to pyspark. I get an error when executing the following commands:

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("basics").getOrCreate()
df = spark.read.csv("data.csv",inferSchema=True,header=True)  
df.columns

My data has 1,000,000 rows and 50 columns. I get the following error:

ValueError                                Traceback (most recent call last)
<ipython-input-71-b666bf274d0a> in <module>
----> 1 df.columns
~/anaconda3/lib/python3.7/site-packages/pyspark/sql/dataframe.py in columns(self)
935         ['age', 'name']
936         """
--> 937         return [f.name for f in self.schema.fields]
938 
939     @since(2.3)
~/anaconda3/lib/python3.7/site-packages/pyspark/sql/dataframe.py in schema(self)
253         if self._schema is None:
254             try:
--> 255                 self._schema = _parse_datatype_json_string(self._jdf.schema().json())
256             except AttributeError as e:
257                 raise Exception(
~/anaconda3/lib/python3.7/site-packages/pyspark/sql/types.py in _parse_datatype_json_string(json_string)
867     >>> check_datatype(complex_maptype)
868     """
--> 869     return _parse_datatype_json_value(json.loads(json_string))
870 
871 
~/anaconda3/lib/python3.7/site-packages/pyspark/sql/types.py in _parse_datatype_json_value(json_value)
884         tpe = json_value["type"]
885         if tpe in _all_complex_types:
--> 886             return _all_complex_types[tpe].fromJson(json_value)
887         elif tpe == 'udt':
888             return UserDefinedType.fromJson(json_value)
~/anaconda3/lib/python3.7/site-packages/pyspark/sql/types.py in fromJson(cls, json)
575     @classmethod
576     def fromJson(cls, json):
--> 577         return StructType([StructField.fromJson(f) for f in json["fields"]])
578 
579     def fieldNames(self):
~/anaconda3/lib/python3.7/site-packages/pyspark/sql/types.py in <listcomp>(.0)
575     @classmethod
576     def fromJson(cls, json):
--> 577         return StructType([StructField.fromJson(f) for f in json["fields"]])
578 
579     def fieldNames(self):
~/anaconda3/lib/python3.7/site-packages/pyspark/sql/types.py in fromJson(cls, json)
432     def fromJson(cls, json):
433         return StructField(json["name"],
--> 434                            _parse_datatype_json_value(json["type"]),
435                            json["nullable"],
436                            json["metadata"])
~/anaconda3/lib/python3.7/site-packages/pyspark/sql/types.py in _parse_datatype_json_value(json_value)
880             return DecimalType(int(m.group(1)), int(m.group(2)))
881         else:
--> 882             raise ValueError("Could not parse datatype: %s" % json_value)
883     else:
884         tpe = json_value["type"]
ValueError: Could not parse datatype: decimal(6,-8)

Can anyone help me understand why I am getting this error and how to overcome it? If the error comes from a wrongly inferred schema, how do I define a schema for 50 columns?

Based on your comment, since you are using inferSchema=True, this untested code should help you:

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, DoubleType, IntegerType, BooleanType

spark = SparkSession.builder.appName("basics").getOrCreate()
df = spark.read.csv("data.csv", inferSchema=True, header=True)

# Re-cast every column to a plain type based on the inferred dtype
for column_name, column_type in df.dtypes:
    if 'string' in column_type:
        df = df.withColumn(column_name, df[column_name].cast(StringType()))
    elif 'double' in column_type:
        df = df.withColumn(column_name, df[column_name].cast(DoubleType()))
    elif 'int' in column_type:
        df = df.withColumn(column_name, df[column_name].cast(IntegerType()))
    elif 'bool' in column_type:
        df = df.withColumn(column_name, df[column_name].cast(BooleanType()))
    elif 'decimal' in column_type:
        df = df.withColumn(column_name, df[column_name].cast(DoubleType()))
    # add as many conditions as you need for other types

df.schema

Let me know if it works for you; if not, I will test and update it.
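
For the second part of the question (defining the schema yourself instead of relying on inference), a minimal sketch could look like the following. The column names col1/col2/col3 and their types are placeholders, not your actual columns; with an explicit schema the reader skips inference entirely, so the decimal(6,-8) type never appears:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType

spark = SparkSession.builder.appName("basics").getOrCreate()

# Placeholder schema: replace the names and types with your actual 50 columns.
# Columns that inference turned into decimal(6,-8) can be declared as DoubleType
# (or StringType) so the schema stays parseable on the Python side.
schema = StructType([
    StructField("col1", IntegerType(), True),
    StructField("col2", DoubleType(), True),
    StructField("col3", StringType(), True),
    # ... remaining columns ...
])

df = spark.read.csv("data.csv", schema=schema, header=True)
df.columns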
