我正在做一个小项目,从kafka中获取数据,并将每条记录发送到UDF。在UDF中,我们有while循环代码,我需要用尾部递归来替换它。
while (condition) {
fields
body
}
至
def whileReplacement(dummy: Int): Int = {
if(!condition) return 1
body
return parseExtTag(dummy)
}
但我得到了java.io.NotSerializableException
。我不知道是什么导致了错误以及如何解决它。如果你有任何更好的方法来解决这个问题,请提供它。谢谢
Before I Just声明并调用UDF函数中的递归函数it self。
这个问题是通过将递归函数放在UDF函数之外来解决的。我认为通过向spark执行器提供函数,可以解决这种情况下的序列化问题。这只是我的理解,我不确定到底发生了什么。如果有人知道,请解释一下。