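I'm trying to compare rows in a table and record the values that changed, using PySpark in a Databricks environment. I create an empty list and append items to it whenever a change is found. The problem is that as soon as I find a change and append it to the list, I get this error: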
AttributeError: 'numpy.int32' object has no attribute '_get_object_id'
Below is a rough sketch of the code. It is not the exact code I'm using.
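def compare_rows(df):
    df = df.toPandas()  # Spark DataFrame -> pandas DataFrame
    changes = []
    # df2, col_x and col_y stand in for the real names; this is not the exact code
    if df.col_x != df2.col_y:
        changes.append(col_x)
    return changes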
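I convert df to a pandas DataFrame for convenience. The error occurs when I try to append an element to the changes list. After some research, I found that it might be caused by the Py4J conversion between Python and Java collections in PySpark, but I'm not entirely sure about that. Also, I don't get this error when the same logic is not wrapped in a function. Can anyone help me figure this out?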
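A likely cause (an assumption, since the exact code isn't shown) is that values read out of the pandas DataFrame are numpy scalars such as numpy.int32, and passing one of them back into a PySpark call fails because Py4J can convert plain Python values but not numpy types. Separately, df.col_x != df2.col_y compares two whole columns element-wise, so using it directly in an if raises a ValueError in pandas. The sketch below uses a hypothetical find_changes helper that assumes both frames have the same columns and a unique id key; it uses the comparison as a mask and converts the results to plain Python values with Series.tolist():

```python
import pandas as pd


def find_changes(left: pd.DataFrame, right: pd.DataFrame, key: str = "id") -> list:
    """Return (key, column, old_value, new_value) for every cell that differs.

    Hypothetical helper: assumes both frames have the same columns and a
    unique `key` column.
    """
    # Pair each left row with the right row that has the same key
    merged = left.merge(right, on=key, how="inner", suffixes=("_old", "_new"))
    changes = []
    for col in left.columns:
        if col == key:
            continue
        old, new = merged[f"{col}_old"], merged[f"{col}_new"]
        # `!=` on two Series is element-wise: use it as a mask, not inside an `if`
        mask = old != new
        # .tolist() turns numpy scalars (e.g. numpy.int32) into plain Python values
        keys = merged.loc[mask, key].tolist()
        for k, o, n in zip(keys, old[mask].tolist(), new[mask].tolist()):
            changes.append((k, col, o, n))
    return changes
```

Because the collected values are native Python ints and strings, handing them back to Spark afterwards should not run into numpy types.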
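Here is an approach slightly different from yours: a straightforward way to compare rows in Spark is to join the two DataFrames on the primary_key and on every other column, then inspect the result. The snippets below walk through the process.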
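Stripped of Spark, the idea reduces to a membership test: a left row whose full tuple of values (key plus every column) has no identical row on the right is a row that changed. A minimal plain-Python sketch, with a hypothetical unmatched_rows helper and rows represented as tuples:

```python
from typing import Hashable, Iterable, List, Tuple

Row = Tuple[Hashable, ...]


def unmatched_rows(left: Iterable[Row], right: Iterable[Row]) -> List[Row]:
    """Return rows of `left` that have no identical row in `right`.

    Mirrors a left join on the primary key plus every other column,
    followed by keeping only rows whose right-hand side is null.
    """
    # A set of complete rows plays the role of the join on *all* columns.
    # Note: unlike a Spark join condition, None == None is True here.
    right_rows = set(right)
    return [row for row in left if row not in right_rows]
```

One difference worth keeping in mind: in a Spark join condition null == null is not true, so rows containing nulls would always come back as unmatched unless the condition uses Column.eqNullSafe.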
Create the DataFrames:
from pyspark.sql import SparkSession, functions as F
spark = SparkSession.builder.getOrCreate()  # returns the existing session on Databricks
df_left = spark.createDataFrame([(1,"chicago","USA", "US", "US"),(2,"houston","USA","US" , "US"),(3,"Sydney","Australia","None","AU")],[ "id","city","country","region","continent"])
df_right = spark.createDataFrame([(1,"chicago","USA", "USA", "USA"),(2,"houston","USA","US","US"),(3,"Sydney","Australia","None","AU")],[ "id","city","country","region","continent"])
df_left.show()
+---+-------+---------+------+---------+
| id|   city|  country|region|continent|
+---+-------+---------+------+---------+
|  1|chicago|      USA|    US|       US|
|  2|houston|      USA|    US|       US|
|  3| Sydney|Australia|  None|       AU|
+---+-------+---------+------+---------+
df_right.show()
+---+-------+---------+------+---------+
| id|   city|  country|region|continent|
+---+-------+---------+------+---------+
|  1|chicago|      USA|   USA|      USA|
|  2|houston|      USA|    US|       US|
|  3| Sydney|Australia|  None|       AU|
+---+-------+---------+------+---------+
First, rename the columns of the right DataFrame so that the joined result keeps the columns of both DataFrames side by side:
df_right = df_right.withColumnRenamed("id", "right_id").withColumnRenamed("city", "right_city").withColumnRenamed("country", "right_country").withColumnRenamed("region", "right_region").withColumnRenamed("continent", "right_continent")
df_result = df_left.join(df_right, (df_left.id == df_right.right_id) & (df_left.city == df_right.right_city) & (df_left.country == df_right.right_country) & (df_left.region == df_right.right_region) & (df_left.continent == df_right.right_continent), "left")
df_result.show()
+---+-------+---------+------+---------+--------+----------+-------------+------------+---------------+
| id|   city|  country|region|continent|right_id|right_city|right_country|right_region|right_continent|
+---+-------+---------+------+---------+--------+----------+-------------+------------+---------------+
|  1|chicago|      USA|    US|       US|    null|      null|         null|        null|           null|
|  2|houston|      USA|    US|       US|       2|   houston|          USA|          US|             US|
|  3| Sydney|Australia|  None|       AU|       3|    Sydney|    Australia|        None|             AU|
+---+-------+---------+------+---------+--------+----------+-------------+------------+---------------+
Now, to see the rows of the left DataFrame that have no match in the right DataFrame, keep only the rows where right_id is null:
df_result = df_result.filter(F.col("right_id").isNull())
df_result.show()
+---+-------+-------+------+---------+--------+----------+-------------+------------+---------------+
| id|   city|country|region|continent|right_id|right_city|right_country|right_region|right_continent|
+---+-------+-------+------+---------+--------+----------+-------------+------------+---------------+
|  1|chicago|    USA|    US|       US|    null|      null|         null|        null|           null|
+---+-------+-------+------+---------+--------+----------+-------------+------------+---------------+
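Since the question already converts to pandas, the same join-then-filter step can be expressed with pandas merge and its indicator column. This is a swapped-in pandas technique rather than part of the Spark answer, and rows_without_match is a hypothetical name:

```python
import pandas as pd


def rows_without_match(left: pd.DataFrame, right: pd.DataFrame) -> pd.DataFrame:
    """Rows of `left` with no identical row in `right`.

    pandas analogue of a Spark left join on every column followed by
    a null filter on the right-hand key.
    """
    cols = list(left.columns)
    # drop_duplicates keeps the left join from fanning out on repeated right rows
    merged = left.merge(right[cols].drop_duplicates(), on=cols, how="left", indicator=True)
    # _merge is "left_only" exactly when no right-hand row matched
    return merged.loc[merged["_merge"] == "left_only", cols].reset_index(drop=True)
```

In Spark itself, a join with how="left_anti" on the same columns returns those left rows directly, without the column renaming and the null filter.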