How do I get rid of the "Missing transform attribute error"?



I am writing code in Palantir using PySpark, but I have run into an error I cannot resolve.

The error is:

A TransformInput object does not have an attribute withColumn. 
Please check the spelling and/or the datatype of the object.

My code, for reference:

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.functions import when
from transforms.api import configure, transform, Input, Output

@transform(
    result=Output('Output_data_file_location'),
    first_input=Input('Input_file1'),
    second_input=Input('Input_file2'),
)
def function_temp(first_input, second_input, result):
    from pyspark.sql.functions import monotonically_increasing_id
    res = first_input.withColumn("id", monotonically_increasing_id())

    # Recode type
    res = res.withColumn("old_col_type", F.when(
        (F.col("col_type") == 'left') | (F.col("col_type") == 'right'), 'turn'
    ).when(
        (F.col("col_type") == 'up') | (F.col("col_type") == 'down'), 'straight'
    ))

    res = (res.withColumnRenamed("old_col_type", "t_old_col_type")
              .withColumnRenamed("old_col2_type", "t_old_col2_type"))

    res = res.filter(res.col_type == 'straight')

    # eqNullSafe is like an equality test, but it also matches nulls in the join
    res = (res.join(second_input,
                    (res.col1.eqNullSafe(second_input.pre_col1)) &
                    (res.col2.eqNullSafe(second_input.pre_col2)),
                    how='left')
              .drop(*["pre_col1", "pre_col2"])
              .withColumnRenamed("temp_result", "final_answer"))

    result.write_dataframe(res)

Can someone help me fix this error? Thanks in advance.

The error message you are getting explains this well: you are calling .withColumn() on an object that is not a regular Spark DataFrame but a TransformInput object. You need to call the .dataframe() method on it to access the DataFrame.

The relevant documentation, for reference.

Additionally, you should move the monotonically_increasing_id import to the top of the file, because according to the documentation, Foundry's transforms logic level versioning only works when imports happen at the module level.
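Putting both fixes together, a minimal sketch of the corrected transform might look like this (the dataset paths and column logic are yours; the key change is converting each TransformInput to a DataFrame via .dataframe() before calling any DataFrame methods on it):

```python
import pyspark.sql.functions as F
from pyspark.sql.functions import monotonically_increasing_id
from transforms.api import transform, Input, Output

@transform(
    result=Output('Output_data_file_location'),
    first_input=Input('Input_file1'),
    second_input=Input('Input_file2'),
)
def function_temp(first_input, second_input, result):
    # TransformInput objects are not DataFrames; convert them first
    first_df = first_input.dataframe()
    second_df = second_input.dataframe()

    res = first_df.withColumn("id", monotonically_increasing_id())
    # ... the rest of your transformations, using second_df in the join ...
    result.write_dataframe(res)
```

Note that the same applies to the join: join against second_input.dataframe(), not the TransformInput itself, or you will hit the same class of error there.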
