I have a DataFrame named 'new_emp_final_1'. When I try to derive a "difficulty" column from cookTime and prepTime by calling the difficulty function through a udf, it gives me an error.
new_emp_final_1.dtypes is as follows -
[('name', 'string'), ('ingredients', 'string'), ('url', 'string'), ('image', 'string'), ('cookTime', 'string'), ('recipeYield', 'string'), ('datePublished', 'string'), ('prepTime', 'string'), ('description', 'string')]
The result of new_emp_final_1.schema is -
StructType(List(StructField(name,StringType,true),StructField(ingredients,StringType,true),StructField(url,StringType,true),StructField(image,StringType,true),StructField(cookTime,StringType,true),StructField(recipeYield,StringType,true),StructField(datePublished,StringType,true),StructField(prepTime,StringType,true),StructField(description,StringType,true)))
Code:
def difficulty(cookTime, prepTime):
    if not cookTime or not prepTime:
        return "Unknown"
    total_duration = cookTime + prepTime
    if total_duration > 3600:
        return "Hard"
    elif total_duration > 1800 and total_duration < 3600:
        return "Medium"
    elif total_duration < 1800:
        return "Easy"
    else:
        return "Unknown"

func_udf = udf(difficulty, IntegerType())
new_emp_final_1 = new_emp_final_1.withColumn("difficulty", func_udf(new_emp_final_1.cookTime, new_emp_final_1.prepTime))
new_emp_final_1.show(20, False)
The error is -
  File "/home/raghavcomp32915/mypycode.py", line 56, in <module>
    func_udf = udf(difficulty, IntegerType())
  File "/usr/hdp/current/spark2-client/python/lib/pyspark.zip/pyspark/sql/udf.py", line 186, in wrapper
  File "/usr/hdp/current/spark2-client/python/lib/pyspark.zip/pyspark/sql/udf.py", line 166, in __call__
  File "/usr/hdp/current/spark2-client/python/lib/pyspark.zip/pyspark/sql/column.py", line 66, in _to_seq
  File "/usr/hdp/current/spark2-client/python/lib/pyspark.zip/pyspark/sql/column.py", line 54, in _to_java_column
TypeError: Invalid argument, not a string or column: <function difficulty at 0x7f707e9750c8> of type <type 'function'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function.
I expect a column named difficulty in the existing DataFrame new_emp_final_1, with the values Hard, Medium, Easy or Unknown.
I ran into this problem when using Python's sum, because it clashed with Spark's SQL sum. It is a real illustration of why

from pyspark.sql.functions import *

is bad. Needless to say, the solution is to restrict the import to the functions you actually need, or to import pyspark.sql.functions and prefix the functions you need with it.
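The shadowing mechanism can be reproduced without Spark at all. A minimal pure-Python sketch (the lambda below merely stands in for pyspark's Column-returning sum; the exact string it returns is an assumption for illustration):

```python
import builtins

# A wildcard import like `from pyspark.sql.functions import *` rebinds
# names such as sum, min and max in the importing namespace. Here a
# local assignment plays the role of that rebinding.
def show_shadowing():
    sum = lambda col: "Column<sum(%s)>" % col  # shadows the builtin sum
    return sum("duration"), builtins.sum([1, 2, 3])

sql_like, builtin_result = show_shadowing()
print(sql_like)         # the shadowed name now builds a column-like value
print(builtin_result)   # builtins.sum still adds numbers: 6
```

The fully-qualified name (builtins.sum, or pyspark.sql.functions.sum after a namespaced import) is never ambiguous, which is the point of the recommendation above.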
Looking at udf(difficulty), I see two things:
- you are trying to sum 2 strings inside the udf (cookTime and prepTime)
- the udf should return StringType()
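The first point is easy to reproduce in plain Python: since both columns are strings, + concatenates instead of adding, so the values must be cast to int first:

```python
cookTime, prepTime = '60', '0'          # Spark delivers these columns as strings

concatenated = cookTime + prepTime      # string +: gives '600', not 60
total = int(cookTime) + int(prepTime)   # cast first, then add: gives 60

print(concatenated)  # 600
print(total)         # 60
```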
This example works for me:
from pyspark.sql.types import StringType, StructType, StructField, IntegerType
from pyspark.sql.functions import udf
import pandas as pd

schema = StructType([StructField('name', StringType(), True),
                     StructField('ingredients', StringType(), True),
                     StructField('url', StringType(), True),
                     StructField('image', StringType(), True),
                     StructField('cookTime', StringType(), True),
                     StructField('recipeYield', StringType(), True),
                     StructField('datePublished', StringType(), True),
                     StructField('prepTime', StringType(), True),
                     StructField('description', StringType(), True)])

data = {
    "name": ['meal1', 'meal2'],
    "ingredients": ['ingredient11, ingredient12', 'ingredient21, ingredient22'],
    "url": ['URL1', 'URL2'],
    "image": ['Image1', 'Image2'],
    "cookTime": ['60', '3601'],
    "recipeYield": ['recipeYield1', 'recipeYield2'],
    "prepTime": ['0', '3000'],
    "description": ['desc1', 'desc2']
}

new_emp_final_1_pd = pd.DataFrame(data=data)
new_emp_final_1 = spark.createDataFrame(new_emp_final_1_pd)

def difficulty(cookTime, prepTime):
    if not cookTime or not prepTime:
        return "Unknown"
    total_duration = int(cookTime) + int(prepTime)  # cast: the columns are strings
    if total_duration > 3600:
        return "Hard"
    elif total_duration > 1800 and total_duration < 3600:
        return "Medium"
    elif total_duration < 1800:
        return "Easy"
    else:
        return "Unknown"

func_udf = udf(difficulty, StringType())
new_emp_final_1 = new_emp_final_1.withColumn("difficulty",
                                             func_udf(new_emp_final_1.cookTime, new_emp_final_1.prepTime))
new_emp_final_1.show(20, False)
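One detail worth flagging in the conditions above: with strict inequalities, a total of exactly 1800 or 3600 seconds matches no branch and falls through to "Unknown". A sketch that closes those gaps (treating the boundary values as Medium is my assumption, not something the original code specifies):

```python
def difficulty(cookTime, prepTime):
    # Empty or missing values cannot be classified.
    if not cookTime or not prepTime:
        return "Unknown"
    total = int(cookTime) + int(prepTime)
    if total > 3600:
        return "Hard"
    elif total >= 1800:   # inclusive: exactly 1800 and 3600 no longer fall through
        return "Medium"
    return "Easy"

print(difficulty('3600', '0'))   # Medium (was Unknown with strict bounds)
print(difficulty('900', '500'))  # Easy
print(difficulty('3600', '1'))   # Hard
```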
Have you tried passing literal values for cookTime and prepTime, like this (lit comes from pyspark.sql.functions; it is a module-level function, not a DataFrame method):

new_emp_final_1 = new_emp_final_1.withColumn("difficulty", func_udf(lit(cookTime), lit(prepTime)))