Of all the questions posted on this topic, I could not find anything that works. I have tried several approaches, and in all of them I start from this DataFrame:
dataFrame = spark.read.format("com.mongodb.spark.sql").load()
dataFrame.printSchema()
The output of dataFrame.printSchema() is:
root
|-- SensorId: string (nullable = true)
|-- _id: struct (nullable = true)
| |-- oid: string (nullable = true)
|-- _type: string (nullable = true)
|-- device: string (nullable = true)
|-- deviceType: string (nullable = true)
|-- event_id: string (nullable = true)
|-- gen_val: string (nullable = true)
|-- lane_id: string (nullable = true)
|-- system_id: string (nullable = true)
|-- time: string (nullable = true)
After creating the DataFrame, I want to cast the column 'gen_val' (whose name is stored in the variable results.inputColumns) from String to Double. The different versions lead to different errors.
Version #1
Code:
dataFrame = dataFrame.withColumn(results.inputColumns, dataFrame[results.inputColumns].cast('double'))
Using cast(DoubleType()) instead produces the same error.
Error:
AttributeError: 'DataFrame' object has no attribute 'cast'
Version #2
Code:
dataFrame = dataFrame.withColumn(results.inputColumns, dataFrame['gen_val'].cast('double'))
This option is not really applicable anyway, since the column name cannot be hard-coded...
Error:
dataFrame = dataFrame.withColumn(results.inputColumns, dataFrame['gen_val'].cast('double'))
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 1502, in withColumn
File "/usr/local/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
File "/usr/local/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 323, in get_return_value
py4j.protocol.Py4JError: An error occurred while calling o31.withColumn. Trace:
py4j.Py4JException: Method withColumn([class java.util.ArrayList, class org.apache.spark.sql.Column]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
at py4j.Gateway.invoke(Gateway.java:272)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)
It is not clear what you are trying to do: the first argument of withColumn should be a DataFrame column name, either an existing one (to be modified) or a new one (to be created), while (at least in your Version #1) you use results.inputColumns as if it were already a column, which it is not.
In any case, casting a string to double is straightforward; here is a toy example:
spark.version
# u'2.2.0'
from pyspark.sql.types import DoubleType
df = spark.createDataFrame([("foo", '1'), ("bar", '2')], schema=['A', 'B'])
df
# DataFrame[A: string, B: string]
df.show()
# +---+---+
# | A| B|
# +---+---+
# |foo| 1|
# |bar| 2|
# +---+---+
df2 = df.withColumn('B', df['B'].cast('double'))
df2.show()
# +---+---+
# | A| B|
# +---+---+
# |foo|1.0|
# |bar|2.0|
# +---+---+
df2
# DataFrame[A: string, B: double]
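The same cast also works with the DoubleType imported above, since cast accepts either a type name string or a DataType instance:
df3 = df.withColumn('B', df['B'].cast(DoubleType()))
df3
# DataFrame[A: string, B: double]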
In your case, this should do the job:
from pyspark.sql.types import DoubleType
new_df = dataFrame.withColumn('gen_val', dataFrame['gen_val'].cast('double'))
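And if results.inputColumns is actually a Python list of column names, as the java.util.ArrayList in your Py4JError suggests, a minimal sketch (assuming every listed column should become double) would be to cast them one by one:
# Assumption: results.inputColumns is a list of column names, e.g. ['gen_val']
for c in results.inputColumns:
    dataFrame = dataFrame.withColumn(c, dataFrame[c].cast('double'))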
I tried something else and it worked: instead of changing the input column data, I created a casted column. I think it is less efficient, but that is what I have for now.
from pyspark.ml.feature import VectorAssembler

dataFrame = spark.read.format("com.mongodb.spark.sql").load()
# Cast the string column once into a new column instead of overwriting the original
doubled = dataFrame.gen_val.cast('double')
dataFrame = dataFrame.withColumn('doubled', doubled)
assembler = VectorAssembler(inputCols=["doubled"], outputCol="features")
output = assembler.transform(dataFrame)
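For reference, the same workaround can also be written more compactly with pyspark.sql.functions.col; this is just a sketch of the identical pipeline, chained into one expression (no names beyond those above are assumed):
from pyspark.sql.functions import col
from pyspark.ml.feature import VectorAssembler

# Identical pipeline, chained: add the casted column, then assemble it
dataFrame = spark.read.format("com.mongodb.spark.sql").load()
assembler = VectorAssembler(inputCols=["doubled"], outputCol="features")
output = assembler.transform(dataFrame.withColumn('doubled', col('gen_val').cast('double')))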
For 张彤: here is the output of dataFrame.printSchema():
root
|-- SensorId: string (nullable = true)
|-- _id: struct (nullable = true)
| |-- oid: string (nullable = true)
|-- _type: string (nullable = true)
|-- device: string (nullable = true)
|-- deviceType: string (nullable = true)
|-- event_id: string (nullable = true)
|-- gen_val: string (nullable = true)
|-- lane_id: string (nullable = true)
|-- system_id: string (nullable = true)
|-- time: string (nullable = true)
Anyway, this is a very basic conversion, and in the (near) future I will need to do more complex transformations. If any of you know of good examples, explanations, or documentation about DataFrame transformations with Spark and Python, I would be grateful.