当标准化大型Pyspark数据框架时,CodeGen会生长超过64 KB错误



我有一个带有1300万行和800列的pyspark数据框架。我需要将这些数据归一化,因此一直使用此代码,该代码可与较小的开发数据集一起使用。

def z_score_w(col, w):
    avg_ = avg(col).over(w)
    stddev_ = stddev_pop(col).over(w)
    return (col - avg_) / stddev_
w = Window().partitionBy().rowsBetween(-sys.maxsize, sys.maxsize)    
norm_exprs = [z_score_w(signalsDF[x], w).alias(x) for x in signalsDF.columns]
normDF = signalsDF.select(norm_exprs)

但是,当使用完整数据集时,我会遇到使用Codegen的例外:

        at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:893
)
        at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:950)
        at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:947)
        at org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
        at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
        ... 44 more
Caused by: org.codehaus.janino.JaninoRuntimeException: Code of method "(Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass;[Ljava/lang/Object;)V" of class "org.apache.
spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection" grows beyond 64 KB
        at org.codehaus.janino.CodeContext.makeSpace(CodeContext.java:941)
        at org.codehaus.janino.CodeContext.write(CodeContext.java:836)
        at org.codehaus.janino.UnitCompiler.writeOpcode(UnitCompiler.java:10251)
        at org.codehaus.janino.UnitCompiler.pushConstant(UnitCompiler.java:8933)
        at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:4346)
        at org.codehaus.janino.UnitCompiler.access$7100(UnitCompiler.java:185)
        at org.codehaus.janino.UnitCompiler$10.visitBooleanLiteral(UnitCompiler.java:3267)

周围有一些Spark Jira问题,这些问题似乎相似,但是这些都标记了。还有一个相关的问题,但答案是一种替代技术。

我有自己的解决方法,可以将数据框架的列批准化。这有效,但我最终得到了多个数据范围,然后我必须加入,这很慢。

那么,我的问题是 - 是否有一种替代技术来使我缺少的大型数据框架进行标准化?

我正在使用Spark-2.0.1。

一个明显的问题是您使用窗口函数的方式。以下帧:

Window().partitionBy().rowsBetween(-sys.maxsize, sys.maxsize)    

在实践中有点没用。没有分区列,它将所有数据首先重新装修为单个分区。这种缩放方法仅对分组进行缩放很有用。

Spark提供了两个可以用于扩展功能的类:

  • pyspark.ml.feature.StandardScaler
  • pyspark.mllib.feature.StandardScaler

不幸的是,两者都需要Vector数据作为输入。使用ML

from pyspark.ml.feature import StandardScaler as MLScaler, VectorAssembler
from pyspark.ml import Pipeline
scaled = Pipeline(stages=[
    VectorAssembler(inputCols=df.columns, outputCol="features"), 
    MLScaler(withMean=True, inputCol="features", outputCol="scaled")
]).fit(df).transform(df).select("scaled")

如果您需要原始形状,则需要进一步扩展scaled列。

使用mllib:

from pyspark.mllib.feature import StandardScaler as MLLibScaler
from pyspark.mllib.linalg import DenseVector
rdd = df.rdd.map(DenseVector)
scaler = MLLibScaler(withMean=True, withStd=True)
scaler.fit(rdd).transform(rdd).map(lambda v: v.array.tolist()).toDF(df.columns)

如果存在与列数相关的代码根问题,则后一种方法可能更有用。

另一种方法可以解决此问题以计算全局统计

from pyspark.sql.functions import avg, col, stddev_pop, struct
stats = df.agg(*[struct(avg(c), stddev_pop(c)) for c in df.columns]).first()

和选择:

df.select(*[
    ((col(c) - mean) / std).alias(c)
    for (c, (mean, std)) in zip(df.columns, stats)
])

按照您的评论,您可以认为可以使用numpy和一些基本转换来表达的最简单解决方案:

rdd = df.rdd.map(np.array)  # Convert to RDD of NumPy vectors
stats = rdd.stats()  # Compute mean and std
scaled = rdd.map(lambda v: (v - stats.mean()) / stats.stdev())  # Normalize

并转换回DataFrame

scaled.map(lambda x: x.tolist()).toDF(df.columns)

请参阅此链接,我们通过在代码中添加检查点解决了此错误。

检查点只需将数据或数据框架写回磁盘并将其读回。

https://stackoverflow.com/a/555208567/7241837

检查点上的详细信息

https://github.com/jerrylead/sparkinternals/blob/master/markdown/english/6-cacheandcheckpoint.md

问:哪种RDD需要检查点?

the computation takes a long time
the computing chain is too long
depends too many RDDs

实际上,在本地磁盘上保存ShuffMapTask的输出也是检查点,但仅用于分区的数据输出。

问:什么时候到检查点?

如上所述,每次需要缓存计算的分区时,它都会被缓存到内存中。但是,检查点不遵循相同的原则。取而代之的是,它一直等到作业结束,并启动另一个工作以完成检查点。需要检查的RDD将进行两次计算;因此建议在rdd.checkpoint()之前进行rdd.cache()。在这种情况下,第二项工作将不会重新计算RDD。相反,它只会读取缓存。实际上,Spark提供rdd.persist(Storagelevel.disk_only)方法,例如磁盘上的缓存。因此,它在第一次计算过程中将rdd缓存在磁盘上,但是这种持久性和检查点是不同的,我们将稍后讨论差异。

问:如何实现检查点?

这是过程:

rdd将是:[初始化 ->标记用于检查点 -> 检查点正在进行中 ->检查点]。最后,它将是 检查点。

similalry for DataFrame:将数据框架写入磁盘或S3,然后在新的DataFrame中读取数据。

初始化

在驱动程序端,在调用rdd.checkpoint()之后,RDD将由rddcheckpointdata管理。用户应设置检查点的存储路径(在HDFS上)。

标记用于检查点

初始化后,rddcheckpointdata将标记RDD标记ForCheckpoint。

检查点正在进行中

工作完成后,将调用finalrdd.docheckpoint()。FinalRDD向后扫描计算链。当遇到需要检查点的RDD时,RDD将被标记为CheckPointingInprogress,然后将配置文件(用于HDFS写作),例如Core-Site.xml,将被广播给其他工作节点的BlockManager。之后,将启动一项工作以完成检查点:

  rdd.context.runJob(rdd, CheckpointRDD.writeToFile(path.toString,  broadcastedConf))

检查点

作业完成检查点后,它将清洁RDD的所有依赖关系,并将RDD设置为检查点。然后,添加一个补充依赖性,并将父rdd设置为checkpointrdd。将来将使用CheckPoIntrDD从文件系统读取检查点文件,然后生成RDD分区

有趣的是:

在驱动程序程序中检查了两个RDD,但仅将结果(请参见下面的代码)成功检查点。不确定是否是错误的,还是下游RDD会被故意检查。

val data1 = Array[(Int, Char)]((1, 'a'), (2, 'b'), (3, 'c'),
    (4, 'd'), (5, 'e'), (3, 'f'), (2, 'g'), (1, 'h'))
   val pairs1 = sc.parallelize(data1, 3)
   val data2 = Array[(Int, Char)]((1, 'A'), (2, 'B'), (3, 'C'), (4, 'D'))
   val pairs2 = sc.parallelize(data2, 2)
   pairs2.checkpoint
   val result = pairs1.join(pairs2)
   result.checkpoint