替换为pyspark中的with Column



你能帮我理解以下错误消息及其背后的原因吗:

创建一个伪数据集:

df_=spark.createDataFrame([(1, np.nan,'x'), (None, 2.0,'y'),(3,4.0,None)], ("a", "b","c"))
df_.show()
+----+---+----+
|   a|  b|   c|
+----+---+----+
|   1|NaN|   x|
|null|2.0|   y|
|   3|4.0|null|
+----+---+----+

现在,我尝试以以下方式替换列"b"中的NaN:

df_.withColumn("b", df_.select("b").replace({float("nan"):5}).b)

df_.select("b").replace({float("nan"):5}).b运行得很好,并提供了一个具有期望值的适当列。然而上面的代码不起作用,我无法理解错误

我得到的错误是:

AnalysisException                         Traceback (most recent call last)
Cell In[170], line 1
----> 1 df_.withColumn("b", df_.select("b").replace({float("nan"):5}).b)
File /usr/lib/spark/python/pyspark/sql/dataframe.py:2455, in DataFrame.withColumn(self, colName, col)
2425 """
2426 Returns a new :class:`DataFrame` by adding a column or replacing the
2427 existing column that has the same name.
(...)
2452 
2453 """
2454 assert isinstance(col, Column), "col should be Column"
-> 2455 return DataFrame(self._jdf.withColumn(colName, col._jc), self.sql_ctx)
File /opt/conda/miniconda3/lib/python3.8/site-packages/py4j/java_gateway.py:1304, in JavaMember.__call__(self, *args)
1298 command = proto.CALL_COMMAND_NAME +
1299     self.command_header +
1300     args_command +
1301     proto.END_COMMAND_PART
1303 answer = self.gateway_client.send_command(command)
-> 1304 return_value = get_return_value(
1305     answer, self.gateway_client, self.target_id, self.name)
1307 for temp_arg in temp_args:
1308     temp_arg._detach()
File /usr/lib/spark/python/pyspark/sql/utils.py:117, in capture_sql_exception.<locals>.deco(*a, **kw)
113 converted = convert_exception(e.java_exception)
114 if not isinstance(converted, UnknownException):
115     # Hide where the exception came from that shows a non-Pythonic
116     # JVM exception message.
--> 117     raise converted from None
118 else:
119     raise
AnalysisException: Resolved attribute(s) b#1083 missing from a#930L,b#931,c#932 in operator !Project [a#930L, b#1083 AS b#1085, c#932]. Attribute(s) with the same name appear in the operation: b. Please check if the right attribute(s) are used.;
!Project [a#930L, b#1083 AS b#1085, c#932]
+- LogicalRDD [a#930L, b#931, c#932], false

我可以通过使用替换API中的子集参数来实现所需的目标。即df_.replace({float("nan"):5},subset = ['b'])然而,我正在努力更好地理解我所看到的错误及其背后的原因

基于df.withColumn:的文档

通过添加列或替换现有列来返回新的DataFrame具有相同名称的列。

列表达式必须是此DataFrame上的表达式;尝试从其他DataFrame添加列将引发错误

所以当你这样做时df_.select("b").replace({float("nan"):5}).b这将创建具有列b的不同属性id的不同数据帧(因为df_.select返回新的数据帧)。原始数据帧中不存在此属性id。

您应该使用replace with subset,它引用来自同一数据帧的同一指针

new_df = df_.replace({float("nan"):5},subset='b')
new_df.explain()
== Physical Plan ==
*(1) Project [a#2131L, CASE WHEN (b#2132 = NaN) THEN 5.0 ELSE b#2132 END AS b#2351, c#2133]
+- *(1) Scan ExistingRDD[a#2131L,b#2132,c#2133]

注意以下属性指针的变化:

df1 = df_
df1.replace({float("nan"):5},subset='b').explain()
== Physical Plan ==
*(1) Project [a#2131L, CASE WHEN (b#2132 = NaN) THEN 5.0 ELSE b#2132 END AS b#2378, c#2133]
+- *(1) Scan ExistingRDD[a#2131L,b#2132,c#2133]

最新更新