我正在尝试在 PySpark 中编写一个用户定义函数(UDF),用来判断 DataFrame 中的某个条目是否为坏值(Null 或 NaN)。但我一直没弄清楚这个函数到底哪里写错了:
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import *
# Question's original attempt, kept verbatim: it fails on null entries.
# A null cell reaches the function as Python None, so value.isNull() raises
# the AttributeError reported at the end of the traceback further down.
# NOTE(review): `|` binds tighter than `!=`, so the condition parses as
# value != (value | value.isNull()) -- presumably not what was intended.
def is_bad(value):
if (value != value | (value.isNull())):
return True
else:
return False
# Wrap is_bad as a UDF whose declared return type is BooleanType.
isBadEntry = UserDefinedFunction(lambda x: is_bad(x),BooleanType())
# Six sample rows; the "id" column is None in four of them.
df_test = sql.createDataFrame([(1,1,None ), (1,2, 5), (1,3, None), (1,4, None), (1,5, 10), (1,6,None )], ('session',"timestamp", "id"))
# NOTE(review): df_test is rebound to whatever .show() returns -- presumably
# None rather than a DataFrame; confirm before reusing df_test afterwards.
df_test =df_test.withColumn("testing", isBadEntry(df_test.id)).show()
这段代码运行时会崩溃,并抛出一个令人费解的错误:
Py4JJavaErrorTraceback (most recent call last)
<ipython-input-379-b4109047ba40> in <module>()
1 df_test = sql.createDataFrame([(1,1,None ), (1,2, 5), (1,3, None), (1,4, None), (1,5, 10), (1,6,None )], ('session',"timestamp", "id"))
2 #df_test.show()
----> 3 df_test =df_test.withColumn("testing", isBadEntry(df_test.id)).show()
/usr/local/spark/python/pyspark/sql/dataframe.py in show(self, n, truncate)
285 +---+-----+
286 """
--> 287 print(self._jdf.showString(n, truncate))
288
289 def __repr__(self):
/usr/local/spark/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py in __call__(self, *args)
1131 answer = self.gateway_client.send_command(command)
1132 return_value = get_return_value(
-> 1133 answer, self.gateway_client, self.target_id, self.name)
1134
1135 for temp_arg in temp_args:
/usr/local/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
/usr/local/spark/python/lib/py4j-0.10.3-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
317 raise Py4JJavaError(
318 "An error occurred while calling {0}{1}{2}.\n".
--> 319 format(target_id, ".", name), value)
320 else:
321 raise Py4JError(
Py4JJavaError: An error occurred while calling o29884.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 43.0 failed 4 times, most recent failure: Lost task 0.3 in stage 43.0 (TID 167, 172.16.193.79): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/spark/python/lib/pyspark.zip/pyspark/worker.py", line 172, in main
process()
File "/spark/python/lib/pyspark.zip/pyspark/worker.py", line 167, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in <lambda>
func = lambda _, it: map(mapper, it)
File "/spark/python/lib/pyspark.zip/pyspark/worker.py", line 92, in <lambda>
mapper = lambda a: udf(*a)
File "/spark/python/lib/pyspark.zip/pyspark/worker.py", line 70, in <lambda>
return lambda *a: f(*a)
File "<ipython-input-378-2aac40340a6c>", line 14, in <lambda>
File "<ipython-input-378-2aac40340a6c>", line 9, in is_bad
AttributeError: 'NoneType' object has no attribute 'isNull'
有人可以帮忙吗?
正如 Psidom 在评论中提示的那样,在 Python 中,NULL 对象就是单例 None(来源);传入 UDF 的空值是普通的 Python None,它没有 isNull() 方法,因此应改用 value is None 来判断。按如下方式修改函数后即可正常工作:
def is_bad(value):
    """Return True when *value* is a bad entry, i.e. None (null) or NaN.

    A null cell arrives here as Python ``None``. NaN is detected through
    self-comparison, since NaN is not equal to itself.

    Args:
        value: A single cell value, or None.

    Returns:
        bool: True for None or NaN, False for any other value.
    """
    # Test for None first and short-circuit with `or`, so the
    # self-comparison is only evaluated for real values.
    return value is None or bool(value != value)
# Wrap the plain-Python predicate as a Spark UDF returning BooleanType.
isBadEntry = UserDefinedFunction(lambda x: is_bad(x), BooleanType())
# Apply the UDF, not the raw Python function: is_bad() expects a single cell
# value, whereas df_test.id is a Column expression.
df_test.withColumn("testing", isBadEntry(df_test.id)).show()
# +-------+---------+----+-------+
# |session|timestamp| id|testing|
# +-------+---------+----+-------+
# | 1| 1|null| true|
# | 1| 2| 5| false|
# | 1| 3|null| true|
# | 1| 4|null| true|
# | 1| 5| 10| false|
# | 1| 6|null| true|
# +-------+---------+----+-------+
并且同样适用于 NaN 值:
from pyspark.sql import Row
# Toy data: col_3 holds one null, one NaN and three ordinary floats.
toy_rows = [
    Row(1.0, 7.0, None),
    Row(2.0, 4.0, float('nan')),
    Row(3.0, 3.0, 5.0),
    Row(4.0, 1.0, 4.0),
    Row(5.0, 1.0, 1.0),
]
df = spark.createDataFrame(toy_rows, ["col_1", "col_2", "col_3"])
# Flag each col_3 entry with the UDF and print the resulting table.
flagged = df.withColumn("testing", isBadEntry(df.col_3))
flagged.show()
# +-----+-----+-----+-------+
# |col_1|col_2|col_3|testing|
# +-----+-----+-----+-------+
# | 1.0| 7.0| null| true|
# | 2.0| 4.0| NaN| true|
# | 3.0| 3.0| 5.0| false|
# | 4.0| 1.0| 4.0| false|
# | 5.0| 1.0| 1.0| false|
# +-----+-----+-----+-------+