What exactly is the difference in Apache Spark's NA handling for covariance computation?



I recently noticed a significant discrepancy between covariance results computed with Pandas and the equivalent MLlib computation. For fully specified inputs (i.e. without any NAs) the results are reasonably close, but with missing values the results diverge noticeably. The Pandas source explains how NAs are handled, but I could not reproduce the results with Spark. I could not find any documentation on what exactly RowMatrix().computeCovariance() does with respect to NAs - my Scala is fair at best and I am not familiar with BLAS, so maybe I am missing something. There is also a BLAS warning whose cause I could not track down, since I am using a prebuilt macOS Spark setup:

WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS

Given how important covariance is for many applications, I was wondering whether someone could shed light on the exact treatment of missing values in Apache Spark MLlib's covariance computation?

Edit: Also, this is not addressed in the current Spark 3.2 release either, because "The method `pd.DataFrame.cov()` is not implemented yet".
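For reference, this is roughly how one would run into that limitation with the pandas-on-Spark API in 3.2 (the column values are just dummy data for illustration):

import pyspark.pandas as ps

psdf = ps.DataFrame({"a": [1.0, None, 3.0], "b": [2.0, 4.0, None]})
# In Spark 3.2 this should raise PandasNotImplementedError with the message quoted above.
psdf.cov()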

Assume the following setup:

from pyspark.sql import SparkSession
from pyspark.mllib.linalg.distributed import RowMatrix
spark = SparkSession.builder.appName("MyApp") 
.config("spark.sql.execution.arrow.pyspark.enabled", "true") 
.getOrCreate()
sc = spark.sparkContext
good_rows = sc.parallelize([[11, 12, 13, 14, 16, 17, 18], 
[21, 22, 23, 42, 26, 27, 28],
[31, 32, 33, 34, 36, 37, 38],
[41, 42, 43, 44, 46, 47, 48],
[51, 52, 53, 54, 56, 57, 58],
[ 1,  2,  3,  4,  6,  7,  8]])
bad_rows = sc.parallelize([[11, 12, None, 14, 16, None, 18], 
[21, 22, None, 42, 26, None, 28],
[31, 32, None, 34, 36, None, 38],
[41, 42, 43, 44, 46, 47, 48],
[51, 52, 53, 54, 56, 57, 58],
[ 1,  2,  3,  4,  6,  7,  8]])

The covariance computed from good_rows is identical for Pandas and Spark:

good_rows.toDF().toPandas().cov()
# Results in:
_1     _2     _3     _4     _5     _6     _7
_1  350.0  350.0  350.0  332.0  350.0  350.0  350.0
_2  350.0  350.0  350.0  332.0  350.0  350.0  350.0
_3  350.0  350.0  350.0  332.0  350.0  350.0  350.0
_4  332.0  332.0  332.0  368.0  332.0  332.0  332.0
_5  350.0  350.0  350.0  332.0  350.0  350.0  350.0
_6  350.0  350.0  350.0  332.0  350.0  350.0  350.0
_7  350.0  350.0  350.0  332.0  350.0  350.0  350.0
spark.createDataFrame(RowMatrix(good_rows).computeCovariance().toArray().tolist()).toPandas()
# Results in:
_1     _2     _3     _4     _5     _6     _7
0  350.0  350.0  350.0  332.0  350.0  350.0  350.0
1  350.0  350.0  350.0  332.0  350.0  350.0  350.0
2  350.0  350.0  350.0  332.0  350.0  350.0  350.0
3  332.0  332.0  332.0  368.0  332.0  332.0  332.0
4  350.0  350.0  350.0  332.0  350.0  350.0  350.0
5  350.0  350.0  350.0  332.0  350.0  350.0  350.0
6  350.0  350.0  350.0  332.0  350.0  350.0  350.0

Running the same on bad_rows yields a very different covariance matrix, unless Pandas' cov() is run with min_periods=(bad_rows.count()/2)+1:

bad_rows.toDF().toPandas().cov()
#Results in: 
_1     _2     _3     _4     _5     _6     _7
_1  350.0  350.0  700.0  332.0  350.0  700.0  350.0
_2  350.0  350.0  700.0  332.0  350.0  700.0  350.0
_3  700.0  700.0  700.0  700.0  700.0  700.0  700.0
_4  332.0  332.0  700.0  368.0  332.0  700.0  332.0
_5  350.0  350.0  700.0  332.0  350.0  700.0  350.0
_6  700.0  700.0  700.0  700.0  700.0  700.0  700.0
_7  350.0  350.0  700.0  332.0  350.0  700.0  350.0
spark.createDataFrame(RowMatrix(bad_rows).computeCovariance().toArray().tolist()).toPandas()
# Results in:
_1     _2  _3     _4     _5  _6     _7
0  350.0  350.0 NaN  332.0  350.0 NaN  350.0
1  350.0  350.0 NaN  332.0  350.0 NaN  350.0
2    NaN    NaN NaN    NaN    NaN NaN    NaN
3  332.0  332.0 NaN  368.0  332.0 NaN  332.0
4  350.0  350.0 NaN  332.0  350.0 NaN  350.0
5    NaN    NaN NaN    NaN    NaN NaN    NaN
6  350.0  350.0 NaN  332.0  350.0 NaN  350.0
bad_rows.toDF().toPandas().cov(min_periods=(bad_rows.count()/2)+1)
# With 50% of dataframe rows +1 Pandas equals the Spark result:
_1     _2  _3     _4     _5  _6     _7
_1  350.0  350.0 NaN  332.0  350.0 NaN  350.0
_2  350.0  350.0 NaN  332.0  350.0 NaN  350.0
_3    NaN    NaN NaN    NaN    NaN NaN    NaN
_4  332.0  332.0 NaN  368.0  332.0 NaN  332.0
_5  350.0  350.0 NaN  332.0  350.0 NaN  350.0
_6    NaN    NaN NaN    NaN    NaN NaN    NaN
_7  350.0  350.0 NaN  332.0  350.0 NaN  350.0

I did try setting the Nones to 0 and to the column mean, but could not reproduce the MLlib covariance results with these standard imputations, see below.

# Zero NA fill:
zeroed_na_rows = sc.parallelize([[11, 12, 0, 14, 16, 0, 18], 
[21, 22, 0, 42, 26, 0, 28],
[31, 32, 0, 34, 36, 0, 38],
[41, 42, 43, 44, 46, 47, 48],
[51, 52, 53, 54, 56, 57, 58],
[1, 2, 3, 4, 6, 7, 8]])
spark.createDataFrame(RowMatrix(zeroed_na_rows).computeCovariance().toArray().tolist()).toPandas()
# Results in:
_1     _2     _3     _4     _5     _6     _7
0  350.0  350.0  379.0  332.0  350.0  391.0  350.0
1  350.0  350.0  379.0  332.0  350.0  391.0  350.0
2  379.0  379.0  606.7  319.6  379.0  646.3  379.0
3  332.0  332.0  319.6  368.0  332.0  324.4  332.0
4  350.0  350.0  379.0  332.0  350.0  391.0  350.0
5  391.0  391.0  646.3  324.4  391.0  690.7  391.0
6  350.0  350.0  379.0  332.0  350.0  391.0  350.0
# Mean NA fill:
mean_rows = sc.parallelize([[11, 12, 27, 14, 16, 37, 18], 
[21, 22, 27, 42, 26, 37, 28],
[31, 32, 27, 34, 36, 37, 38],
[41, 42, 43, 44, 46, 47, 48],
[51, 52, 53, 54, 56, 57, 58],
[ 1,  2,  3,  4,  6,  7,  8]])
spark.createDataFrame(RowMatrix(mean_rows).computeCovariance().toArray().tolist()).toPandas()
#Results in (still different from Pandas.cov()):
_1     _2     _3     _4     _5     _6     _7
0  350.0  350.0  298.0  332.0  350.0  280.0  350.0
1  350.0  350.0  298.0  332.0  350.0  280.0  350.0
2  298.0  298.0  290.8  287.2  298.0  280.0  298.0
3  332.0  332.0  287.2  368.0  332.0  280.0  332.0
4  350.0  350.0  298.0  332.0  350.0  280.0  350.0
5  280.0  280.0  280.0  280.0  280.0  280.0  280.0
6  350.0  350.0  298.0  332.0  350.0  280.0  350.0

If that is not it, what is going on here, and how can I get Spark MLlib to produce results reasonably similar to Pandas?

Without reimplementing your own cov method, I don't think there is an easy way to replicate Pandas' NaN handling in Spark.

The reason is that Pandas simply ignores each NaN - it does not replace it with any value - which is why replacing the NaNs with 0 or the mean does not give the same result. Instead, Pandas appears to drop the pairs of observations that contain a missing value and computes the covariance from the remaining pairs.
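A minimal sketch of that pairwise-complete behaviour (the helper pairwise_cov is just illustrative, not Pandas' actual implementation):

import pandas as pd

def pairwise_cov(df: pd.DataFrame) -> pd.DataFrame:
    """For each column pair, use only the rows where both values are present."""
    cols = df.columns
    out = pd.DataFrame(index=cols, columns=cols, dtype=float)
    for i in cols:
        for j in cols:
            mask = df[i].notna() & df[j].notna()        # rows complete for this pair only
            out.loc[i, j] = df.loc[mask, i].cov(df.loc[mask, j])  # sample covariance of the rest
    return out

# pairwise_cov(bad_rows.toDF().toPandas()) should reproduce the .cov() table above,
# including the 700.0 entries for the columns that contain None.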

The Spark implementation, on the other hand, returns NaN when asked to compute the covariance of a set of pairs that contains NaNs. I don't know what exactly happens in the code/computation, but as far as I can tell you cannot easily change this by tweaking default parameters; you would probably have to write your own version of the cov function, or find a way to pre- and post-process the columns containing NaNs - for example, drop their NaNs, compute the covariance, and then replace the NaNs in your covariance matrix with those values, as sketched below.
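A rough sketch of that idea, assuming Spark SQL's covar_samp aggregate skips rows where either value is NULL (which, to my knowledge, it does) and therefore behaves like Pandas' pairwise-complete deletion:

import pandas as pd
from pyspark.sql import functions as F

df = bad_rows.toDF()                     # the DataFrame with None for the missing values
cols = df.columns
# One covar_samp aggregate per column pair; pairs with a NULL should be ignored by the aggregate.
exprs = [F.covar_samp(i, j).alias(f"{i}__{j}") for i in cols for j in cols]
row = df.agg(*exprs).first()
pairwise = pd.DataFrame(
    [[row[f"{i}__{j}"] for j in cols] for i in cols],
    index=cols, columns=cols,
)
# pairwise should match bad_rows.toDF().toPandas().cov(), unlike computeCovariance().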
