当我运行下面的代码时,我得到错误java.lang.AssertionError:断言失败:发现重复的重写属性。在更新我们的databricks运行时之前,它运行得很顺利。
-
top10_df是列表
groups
中具有唯一键的数据的数据帧。 -
res_df是top10_df中具有最小和最大日期的唯一密钥的集合。
-
一旦创建并持久化了resdf,它就会重新加入到组中唯一键上的top10df中。
groups = ['col1','col2','col3','col4']
min_date_created = fn.min('date_created').alias('min_date_created')
max_date_created = fn.max('date_created').alias('max_date_created')
res_df = (top10_df
.groupBy(groups)
.agg(min_date_created
,max_date_created
)
)
res_df.persist()
print(res_df.count())
score_rank = fn.row_number().over(w.partitionBy(groups).orderBy(fn.desc('score')))
unique_issue_id = fn.row_number().over(w.orderBy(groups))
out_df = (top10_df.alias('t10')
.join(res_df.alias('res'),groups,'left')
.where(fn.col('t10.date_created')==fn.col('res.max_date_created'))
.drop(fn.col('t10.date_created'))
.drop(fn.col('t10.date_updated'))
.withColumn('score_rank',score_rank)
.where(fn.col('score_rank')==1)
.drop('score_rank'
,'latest_revision_complete_hash'
,'latest_revision_durable_hash'
)
.withColumn('unique_issue_id',unique_issue_id)
.withColumnRenamed('res.id','resource_id')
)
out_df.persist()
print(out_df.count())
代替:
out_df = (top10_df.alias('t10')
.join(res_df.alias('res'),groups,'left')
在联接之后,选择右侧df中的所有列并对其进行别名,以消除重复属性的歧义:
out_df = (
top10_df.alias('t10')
.join(
res_df.alias('res').select(
fn.col('groups').alias('groups'),
fn.col('min_date_created').alias('min_date_created'),
fn.col('max_date_created').alias('max_date_created')
),
groups,
'left'
)