我正在使用pyspark在palantir中编写代码,但我遇到了无法解决的错误。
错误为:
A TransformInput object does not have an attribute withColumn.
Please check the spelling and/or the datatype of the object.
我的代码供您参考
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.functions import when
from transforms.api import configure, transform, Input, Output
@transform(
result = Output('Output_data_file_location'),
first_input=Input('Input_file1'),
second_input= Input('Input_file2'),
)
def function_temp(first_input, second_input, result):
from pyspark.sql.functions import monotonically_increasing_id
res = ncbs.withColumn("id", monotonically_increasing_id())
# Recode type
res = res.withColumn("old_col_type", F.when(
(F.col("col_type") == 'left') | (F.col("col_type") == 'right'), 'turn'
).when(
(F.col("col_type") == 'up') | (F.col("col_type") == 'down'), 'straight'
))
res = res.withColumnRenamed("old_col_type","t_old_col_type")
.withColumnRenamed("old_col2_type","t_old_col2_type")
res = res.filter((res.col_type== 'straight')
res = res.join(second_input, #eqNullSafe is like an equal sign but includes null in join
(res.col1.eqNullSafe(second_input.pre_col1)) &
(res.col2.eqNullSafe(second_input.pre_col2)),
how='left')
.drop(*["pre_col1", "pre_col2"]).withColumnRenamed("temp_result", "final_answer")
result.write_dataframe(res)
有人能帮我纠正这个错误吗。提前感谢
您收到的错误代码很好地解释了这一点,您在一个不是常规Spark Dataframe而是TransformInput
对象的对象上调用.withColumn()
。您需要调用.dataframe()
方法来访问Dataframe。
供参考的文件。
此外,您应该将monotonically_increasing_id
移到文件的顶部,因为根据文档,Foundrys的转换逻辑级别版本控制仅在导入发生在模块级别时有效。