PySpark 尝试将上一个字段的架构应用于下一个字段



PySpark有这个奇怪的问题。它似乎正在尝试将上一个字段的架构应用于下一个字段,因为它正在处理。

我能想到的最简单的测试用例:

%pyspark
from pyspark.sql.types import (
    DateType,
    StructType,
    StructField,
    StringType,
)
from datetime import date
from pyspark.sql import Row

schema = StructType(
    [
        StructField("date", DateType(), True),
        StructField("country", StringType(), True),
    ]
)
test = spark.createDataFrame(
    [
        Row(
            date=date(2019, 1, 1),
            country="RU",
        ),
    ],
    schema
)

堆栈跟踪:

Fail to execute line 26:     schema
Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-8579306903394369208.py", line 380, in <module>
    exec(code, _zcUserQueryNameSpace)
  File "<stdin>", line 26, in <module>
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 691, in createDataFrame
    rdd, schema = self._createFromLocal(map(prepare, data), schema)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 423, in _createFromLocal
    data = [schema.toInternal(row) for row in data]
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 601, in toInternal
    for f, v, c in zip(self.fields, obj, self._needConversion))
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 601, in <genexpr>
    for f, v, c in zip(self.fields, obj, self._needConversion))
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 439, in toInternal
    return self.dataType.toInternal(obj)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 175, in toInternal
    return d.toordinal() - self.EPOCH_ORDINAL
AttributeError: 'str' object has no attribute 'toordinal'

在本地而不是在 Zepplin中运行它的奖励信息:

self = DateType, d = 'RU'
    def toInternal(self, d):
        if d is not None:
>           return d.toordinal() - self.EPOCH_ORDINAL
E           AttributeError: 'str' object has no attribute 'toordinal'

例如,它试图将DateType应用于country。如果我摆脱date,那就好了。如果我摆脱country,那就好了。两者放在一起,是不行的。

有什么想法吗?我错过了一些明显的东西吗?

如果要使用Row的列表,则无需指定架构。这是因为Row已经知道架构。

出现此问题的原因是pyspark.sql.Row对象不维护您为字段指定的顺序。

print(Row(date=date(2019, 1, 1), country="RU"))
#Row(country='RU', date=datetime.date(2019, 1, 1))

从文档中:

可用于通过使用命名参数创建行对象,字段将按名称排序。

如您所见,country字段被放在第一位。当 Spark 尝试使用指定的schema创建数据帧时,它期望第一项是DateType

解决此问题的一种方法是按字母顺序将字段放在schema中:

schema = StructType(
    [
        StructField("country", StringType(), True),
        StructField("date", DateType(), True)
    ]
)
test = spark.createDataFrame(
    [
        Row(date=date(2019, 1, 1), country="RU")
    ],
    schema
)
test.show()
#+-------+----------+
#|country|      date|
#+-------+----------+
#|     RU|2019-01-01|
#+-------+----------+

或者在这种情况下,甚至没有必要将schema传递给createDataFrame。从Row s可以推断:

test = spark.createDataFrame(
    [
        Row(date=date(2019, 1, 1), country="RU")
    ]
)

如果要对列重新排序,请使用select

test = test.select("date", "country")
test.show()
#+----------+-------+
#|      date|country|
#+----------+-------+
#|2019-01-01|     RU|
#+----------+-------+

相关内容

  • 没有找到相关文章

最新更新