我有一个包含数值的csv文件。
val row = withoutHeader.map{
line => {
val arr = line.split(',')
for (h <- 0 until arr.length){
if(arr(h).trim == ""){
val abc = avgrdd.filter {case ((x,y),z) => x == h && y == arr(dependent_col_index).toDouble} //crashing here
arr(h) = //imputing with the value above
}
}
arr.mkString(",")
}
}
这是我尝试用类标签的平均值插补缺失值的代码片段。
avgrdd 包含键值对的平均值,其中键是列索引和类标签值。这个 avgrdd 是使用合并器计算的,我看到它正在正确计算结果。
dependent_col_index是包含类标签的列。
带有筛选器的行崩溃并出现空指针异常。删除此行时,原始数组是输出(逗号分隔)。
我很困惑为什么过滤器操作会导致崩溃。
请就如何解决此问题提出建议。
例
col1,dependent_col_index
4,1
8,0
,1
21,1
21,0
,1
25,1
,0
34,1
mean for class 1 is 84/4 = 21 and for class 0 is 29/2 = 14.5
Required Output
4,1
8,0
21,1
21,1
21,0
21,1
25,1
14.5,0
34,1
谢谢!!
您正在尝试在另一个 RDD 转换中执行 RDD 转换。请记住,您不能在另一个RDD转换中使用RDD,这会导致错误。
继续操作的方式如下:
- 将源 RDD
withoutHeader
转换为相应类型的<Class, Value>
对的 RDD(在您的情况下为 Long)。缓存它 - 在
withoutHeader
之上计算avgrdd
。这应该是对的RDD<Class, AvgValue>
withoutHeader
RDD连接并avgrdd
在一起 - 这样对于每一行,您将拥有一个结构<Class, <Value, AvgValue>>
- 在结果之上执行
map
,以将缺失的Value
替换为AvgValue
另一种选择可能是在步骤 3 中将 RDD 分成两部分(一部分 - 具有缺失值的 RDD,第二部分 - 具有非缺失值的 RDD),仅将avgrdd
与仅包含缺失值的 RDD 连接起来,然后在这两部分之间建立联合。如果你有一小部分缺失值会更快