Unable to append elements to a list in PySpark



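I am trying to compare rows in a table and record the values that changed, using PySpark in a Databricks environment. I created an empty list and try to append an item to it whenever a change occurs. The problem is that when I find a change and append it to the list, I get this error: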

AttributeError: 'numpy.int32' object has no attribute '_get_object_id'

Below is a rough sketch of the code. It is not the exact code I am using.

def compare_rows(df):
    df = df.toPandas()
    changes = []
    if df.col_x != df2.col_y:
        changes.append(col_x)
    return changes

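I convert df to a Pandas DataFrame for convenience. The error occurs when I try to append an element to the list changes. After some research, I found that this may be caused by the Py4J conversion between Python and Java collections in PySpark, though I am not entirely sure about that. Also, when I do not implement this as a function, I do not get the error. Can anyone help me figure this out?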

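Here is a slightly different approach from the one you are taking. The best way to compare every row in Spark is to join the two DataFrames on the primary key together with every other column, and then inspect the result. The snippets below walk through the process.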

Create the DataFrames here

df_left = spark.createDataFrame([(1,"chicago","USA", "US", "US"),(2,"houston","USA","US" , "US"),(3,"Sydney","Australia","None","AU")],[ "id","city","country","region","continent"])
df_right = spark.createDataFrame([(1,"chicago","USA", "USA", "USA"),(2,"houston","USA","US","US"),(3,"Sydney","Australia","None","AU")],[ "id","city","country","region","continent"])
df_left.show()
df_right.show()
+---+-------+---------+------+---------+
| id|   city|  country|region|continent|
+---+-------+---------+------+---------+
|  1|chicago|      USA|    US|       US|
|  2|houston|      USA|    US|       US|
|  3| Sydney|Australia|  None|       AU|
+---+-------+---------+------+---------+
+---+-------+---------+------+---------+
| id|   city|  country|region|continent|
+---+-------+---------+------+---------+
|  1|chicago|      USA|   USA|      USA|
|  2|houston|      USA|    US|       US|
|  3| Sydney|Australia|  None|       AU|
+---+-------+---------+------+---------+

First rename the columns of the right-hand DataFrame so that the joined result can hold all the columns from both DataFrames

df_right = df_right.withColumnRenamed("id", "right_id").withColumnRenamed("city", "right_city").withColumnRenamed("country", "right_country").withColumnRenamed("region", "right_region").withColumnRenamed("continent", "right_continent")
# Match on the key and on every other column, so a difference in any column leaves the right-hand side null
df_result = df_left.join(df_right, (df_left.id == df_right.right_id) & (df_left.city == df_right.right_city) & (df_left.country == df_right.right_country) & (df_left.region == df_right.right_region) & (df_left.continent == df_right.right_continent), "left")
df_result.show()
+---+-------+---------+------+---------+--------+----------+-------------+------------+---------------+
| id|   city|  country|region|continent|right_id|right_city|right_country|right_region|right_continent|
+---+-------+---------+------+---------+--------+----------+-------------+------------+---------------+
|  1|chicago|      USA|    US|       US|    null|      null|         null|        null|           null|
|  2|houston|      USA|    US|       US|       2|   houston|          USA|          US|             US|
|  3| Sydney|Australia|  None|       AU|       3|    Sydney|    Australia|        None|             AU|
+---+-------+---------+------+---------+--------+----------+-------------+------------+---------------+
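The rename-and-join step above can be written once for any pair of DataFrames that share a schema. The following is only a minimal sketch: the helper names `prefixed` and `left_join_on_all_columns` are made up for illustration, and it assumes both inputs are Spark DataFrames with identical column names.

```python
def prefixed(columns, prefix="right_"):
    """Return the column names with a prefix added, e.g. 'id' -> 'right_id'."""
    return [prefix + name for name in columns]


def left_join_on_all_columns(df_left, df_right, prefix="right_"):
    """Left-join two Spark DataFrames with the same schema, matching on every column.

    Hypothetical helper: rows of df_left without an identical row in df_right
    end up with nulls in all of the prefixed columns.
    """
    columns = df_left.columns
    # Put df_right's columns in the same order, then rename them all at once.
    renamed = df_right.select(*columns).toDF(*prefixed(columns, prefix))
    # A list of Column conditions passed to join() is combined with AND.
    conditions = [df_left[c] == renamed[prefix + c] for c in columns]
    return df_left.join(renamed, conditions, "left")
```

Called as `left_join_on_all_columns(df_left, df_right)` with the two original (not yet renamed) DataFrames, this should produce the same kind of result as the explicit join. Note that `==` never matches when either side is null; if the data can contain nulls, `Column.eqNullSafe` may be a better fit for the conditions.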

Now, to see the rows that have no match in the right-hand DataFrame, keep only the rows where the right_id column is null

from pyspark.sql import functions as F

df_result = df_result.filter(F.col("right_id").isNull())
df_result.show()
+---+-------+-------+------+---------+--------+----------+-------------+------------+---------------+
| id|   city|country|region|continent|right_id|right_city|right_country|right_region|right_continent|
+---+-------+-------+------+---------+--------+----------+-------------+------------+---------------+
|  1|chicago|    USA|    US|       US|    null|      null|         null|        null|           null|
+---+-------+-------+------+---------+--------+----------+-------------+------------+---------------+
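The filtered result shows which rows differ, but not which columns changed, which is what the question was after. One possible way to get that, sketched here under assumptions, is to join on the primary key only and let Spark SQL name the differing columns. The helper `changed_columns_expr` is hypothetical; it only builds an expression string and assumes the right-hand columns carry the `right_` prefix used above.

```python
def changed_columns_expr(columns, prefix="right_"):
    """Build a Spark SQL expression that lists the columns whose values differ.

    Hypothetical helper: `<=>` is Spark SQL's null-safe equality, and
    concat_ws skips the NULLs produced by CASE branches that do not fire.
    """
    cases = [
        f"CASE WHEN NOT (`{c}` <=> `{prefix}{c}`) THEN '{c}' END"
        for c in columns
    ]
    return "concat_ws(',', " + ", ".join(cases) + ")"


# Usage sketch (assumes df_left, the renamed df_right, and F from above):
# joined = df_left.join(df_right, df_left.id == df_right.right_id, "inner")
# expr = changed_columns_expr(["city", "country", "region", "continent"])
# joined.select("id", F.expr(expr).alias("changed")).show()
```

For the sample data this should report `region,continent` for id 1 and an empty string for the other two rows. Because the comparison stays inside Spark, no values pass through Pandas or NumPy on the way.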
