Spark for Python - cannot convert a string column to decimal/double



Of all the questions posted on this subject, I couldn't find anything that works.

I am trying several versions, and in all of them I have this DataFrame:

dataFrame = spark.read.format("com.mongodb.spark.sql").load()

The output of dataFrame.printSchema() is:

root
 |-- SensorId: string (nullable = true)
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- _type: string (nullable = true)
 |-- device: string (nullable = true)
 |-- deviceType: string (nullable = true)
 |-- event_id: string (nullable = true)
 |-- gen_val: string (nullable = true)
 |-- lane_id: string (nullable = true)
 |-- system_id: string (nullable = true)
 |-- time: string (nullable = true)

After creating the DataFrame, I want to cast the column 'gen_val' (stored in the variable results.inputColumns) from String type to Double type. Different versions lead to different errors.

Version #1

Code:

dataFrame = dataFrame.withColumn(results.inputColumns, dataFrame[results.inputColumns].cast('double'))

Using cast(DoubleType()) instead produces the same error.

Error:

AttributeError: 'DataFrame' object has no attribute 'cast'
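
A plausible cause, assuming results.inputColumns is a list of column names rather than a single string: indexing a DataFrame with a list performs a select and returns a DataFrame, and only Column objects have a cast method. A minimal sketch of the failure mode:

# illustration only: results.inputColumns is assumed to be a list like ['gen_val']
inputColumns = ['gen_val']

sub = dataFrame[inputColumns]          # list index -> select -> returns a DataFrame
# sub.cast('double')                   # AttributeError: 'DataFrame' object has no attribute 'cast'

column = dataFrame[inputColumns[0]]    # string index -> returns a Column
casted = column.cast('double')         # Column.cast works as expected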

Version #2

Code:

dataFrame = dataFrame.withColumn(results.inputColumns, dataFrame['gen_val'].cast('double'))

Even though this option is not really relevant, since the argument cannot be hard-coded...

Error:

dataFrame = dataFrame.withColumn(results.inputColumns, dataFrame['gen_val'].cast('double'))
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 1502, in withColumn
  File "/usr/local/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/usr/local/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 323, in get_return_value
py4j.protocol.Py4JError: An error occurred while calling o31.withColumn. Trace:
py4j.Py4JException: Method withColumn([class java.util.ArrayList, class org.apache.spark.sql.Column]) does not exist
        at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
        at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
        at py4j.Gateway.invoke(Gateway.java:272)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:214)
        at java.lang.Thread.run(Thread.java:748)
It is not clear what you are trying to do; the first argument of withColumn should be a DataFrame column name, either an existing one (to be modified) or a new one (to be created), while (at least in your Version #1) you use results.inputColumns as if it were already a column, which it is not.

In any case, converting a string to double type is straightforward; here is a toy example:

spark.version
# u'2.2.0'
from pyspark.sql.types import DoubleType
df = spark.createDataFrame([("foo", '1'), ("bar", '2')], schema=['A', 'B'])
df
# DataFrame[A: string, B: string]
df.show()
# +---+---+ 
# |  A|  B|
# +---+---+
# |foo|  1| 
# |bar|  2|
# +---+---+
df2 = df.withColumn('B', df['B'].cast('double'))
df2.show()
# +---+---+ 
# |  A|  B|
# +---+---+
# |foo|1.0| 
# |bar|2.0|
# +---+---+
df2
# DataFrame[A: string, B: double]
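
For reference, the same cast can also be written with the col helper from pyspark.sql.functions or as a SQL expression; a small sketch against the same toy DataFrame:

from pyspark.sql.functions import col
# same result via the col() helper
df2 = df.withColumn('B', col('B').cast('double'))
# or via a SQL CAST expression
df2 = df.selectExpr('A', 'CAST(B AS double) AS B')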

In your case, this should do the job:

from pyspark.sql.types import DoubleType
new_df = dataFrame.withColumn('gen_val', dataFrame['gen_val'].cast('double'))
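
Since in the question the column name must not be hard-coded, here is a sketch of the generic case, assuming results.inputColumns is a list of column names (which the java.util.ArrayList in the Py4J trace above suggests):

# assumption: results.inputColumns is a list such as ['gen_val']
for c in results.inputColumns:
    dataFrame = dataFrame.withColumn(c, dataFrame[c].cast('double'))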

I tried something else and it worked - instead of changing the input column data, I created a converted/casted column. I think it is less efficient, but that's what I have for now.

from pyspark.ml.feature import VectorAssembler

dataFrame = spark.read.format("com.mongodb.spark.sql").load()
col = dataFrame.gen_val.cast('double')  # Column expression, already cast to double
dataFrame = dataFrame.withColumn('doubled', col.cast('double'))  # the second cast is redundant but harmless
assembler = VectorAssembler(inputCols=["doubled"], outputCol="features")
output = assembler.transform(dataFrame)
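
A quick sanity check of the pipeline above (illustrative only):

# the assembled vector column should now appear next to the casted value
output.printSchema()
output.select('doubled', 'features').show(5)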

For Zhang Tong: this is the output of dataFrame.printSchema():

root
 |-- SensorId: string (nullable = true)
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- _type: string (nullable = true)
 |-- device: string (nullable = true)
 |-- deviceType: string (nullable = true)
 |-- event_id: string (nullable = true)
 |-- gen_val: string (nullable = true)
 |-- lane_id: string (nullable = true)
 |-- system_id: string (nullable = true)
 |-- time: string (nullable = true)

Anyway, this is a very basic conversion, and in the (near) future I will need to do more complex transformations. If any of you knows good examples, explanations, or documentation about DataFrame transformations with Spark and Python, I would greatly appreciate it.
