PySpark variable has datatype decimal(6,-12); df.dtypes and df.columns raise ValueError

I have a Spark DataFrame, and I get ValueError: Could not parse datatype: decimal whenever I run df.dtypes or df.columns, because one particular variable has the datatype decimal(6,-12).


df = spark.read.csv("data.csv", inferSchema=True, header=True)
df.columns

Running df.columns or df.dtypes produces the following error:


---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-26-0581cf80a9b2> in <module>
----> 1 df.columns

/opt/cloudera/parcels/CDH-6.3.4-1.cdh6.3.4.p4623.11628701/lib/spark/python/pyspark/sql/dataframe.py in columns(self)
934         ['age', 'name']
935         """
--> 936         return [f.name for f in self.schema.fields]
937 
938     @since(2.3)

/opt/cloudera/parcels/CDH-6.3.4-1.cdh6.3.4.p4623.11628701/lib/spark/python/pyspark/sql/dataframe.py in schema(self)
251         if self._schema is None:
252             try:
--> 253                 self._schema = _parse_datatype_json_string(self._jdf.schema().json())
254             except AttributeError as e:
255                 raise Exception(

/opt/cloudera/parcels/CDH-6.3.4-1.cdh6.3.4.p4623.11628701/lib/spark/python/pyspark/sql/types.py in _parse_datatype_json_string(json_string)
867     >>> check_datatype(complex_maptype)
868     """
--> 869     return _parse_datatype_json_value(json.loads(json_string))
870 
871 

/opt/cloudera/parcels/CDH-6.3.4-1.cdh6.3.4.p4623.11628701/lib/spark/python/pyspark/sql/types.py in _parse_datatype_json_value(json_value)
884         tpe = json_value["type"]
885         if tpe in _all_complex_types:
--> 886             return _all_complex_types[tpe].fromJson(json_value)
887         elif tpe == 'udt':
888             return UserDefinedType.fromJson(json_value)

/opt/cloudera/parcels/CDH-6.3.4-1.cdh6.3.4.p4623.11628701/lib/spark/python/pyspark/sql/types.py in fromJson(cls, json)
575     @classmethod
576     def fromJson(cls, json):
--> 577         return StructType([StructField.fromJson(f) for f in json["fields"]])
578 
579     def fieldNames(self):

/opt/cloudera/parcels/CDH-6.3.4-1.cdh6.3.4.p4623.11628701/lib/spark/python/pyspark/sql/types.py in <listcomp>(.0)
575     @classmethod
576     def fromJson(cls, json):
--> 577         return StructType([StructField.fromJson(f) for f in json["fields"]])
578 
579     def fieldNames(self):

/opt/cloudera/parcels/CDH-6.3.4-1.cdh6.3.4.p4623.11628701/lib/spark/python/pyspark/sql/types.py in fromJson(cls, json)
432     def fromJson(cls, json):
433         return StructField(json["name"],
--> 434                            _parse_datatype_json_value(json["type"]),
435                            json["nullable"],
436                            json["metadata"])

/opt/cloudera/parcels/CDH-6.3.4-1.cdh6.3.4.p4623.11628701/lib/spark/python/pyspark/sql/types.py in _parse_datatype_json_value(json_value)
880             return DecimalType(int(m.group(1)), int(m.group(2)))
881         else:
--> 882             raise ValueError("Could not parse datatype: %s" % json_value)
883     else:
884         tpe = json_value["type"]

ValueError: Could not parse datatype: decimal(6,-12)

If I change the column's type to double or string, I can proceed. But I am building an automated tool and need a solution that works on all datasets.
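
For reference, the one-off fix that lets me proceed looks like this (a sketch; variable_name is the offending column from the printSchema() output further down; withColumn and cast run on the JVM side, so they never hit the Python-side parser):

# Cast the known offending column before touching df.dtypes
df = df.withColumn('variable_name', df['variable_name'].cast('double'))
df.dtypes  # works once no decimal(6,-12) column remains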

I tried the solution given in df.columns gives ValueError in pyspark, reproduced below.


from pyspark.sql import SparkSession
from pyspark.sql.types import *

spark = SparkSession.builder.appName("basics").getOrCreate()
df = spark.read.csv("data.csv", inferSchema=True, header=True)

# Re-cast every column according to its inferred dtype,
# mapping any decimal column to double
for column_type in df.dtypes:
    if 'string' in column_type[1]:
        df = df.withColumn(column_type[0], df[column_type[0]].cast(StringType()))
    elif 'double' in column_type[1]:
        df = df.withColumn(column_type[0], df[column_type[0]].cast(DoubleType()))
    elif 'int' in column_type[1]:
        df = df.withColumn(column_type[0], df[column_type[0]].cast(IntegerType()))
    elif 'bool' in column_type[1]:
        df = df.withColumn(column_type[0], df[column_type[0]].cast(BooleanType()))
    elif 'decimal' in column_type[1]:
        df = df.withColumn(column_type[0], df[column_type[0]].cast(DoubleType()))
    # add as many conditions as you need for types

df.schema

Unfortunately, the df.dtypes call in this code raises the same error: df.dtypes itself goes through df.schema, so it hits the same Python-side parser.

The only call that lets me inspect the datatypes at all is df.printSchema(), since it just prints the schema tree from the JVM side. Is there a way to read the output of df.printSchema() and change the datatype of any decimal variable to double?


df.select('variable_name').printSchema()

root
|-- variable_name: decimal(6,-12) (nullable = true)
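
In other words, I am hoping for something like the sketch below (untested; it assumes the usual py4j accessors on the JVM StructType, i.e. length(), apply(i), name(), and dataType().simpleString()). It walks the JVM-side schema the same way printSchema() does and casts every decimal column to double before anything on the Python side tries to parse the schema:

# Walk the JVM schema directly (df._jdf is the underlying Java/Scala Dataset);
# this path never goes through the broken Python-side datatype parser.
jschema = df._jdf.schema()
exprs = []
for i in range(jschema.length()):
    field = jschema.apply(i)                 # JVM StructField
    name = field.name()
    dtype = field.dataType().simpleString()  # e.g. 'decimal(6,-12)'
    if dtype.startswith('decimal'):
        exprs.append("CAST(`{0}` AS DOUBLE) AS `{0}`".format(name))
    else:
        exprs.append("`{0}`".format(name))
df = df.selectExpr(*exprs)
df.dtypes  # now parseable: the decimal column is a double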

There is a bug in PySpark versions < 2.4.8 in parsing decimal types with a negative scale. See this JIRA page.

I think you need to disable inferSchema, create a custom schema, and apply it when reading the CSV.
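
A minimal sketch of that suggestion (only variable_name comes from the printSchema() output above; the remaining fields are placeholders to fill in for your data.csv):

from pyspark.sql.types import StructType, StructField, DoubleType

# Explicit schema: with inferSchema off, Spark never produces a negative-scale decimal
schema = StructType([
    StructField("variable_name", DoubleType(), True),  # was decimal(6,-12)
    # ... one StructField per remaining column in data.csv ...
])

df = spark.read.csv("data.csv", schema=schema, header=True)
df.dtypes  # parses cleanly on the Python side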
