Spark ML Pipeline causes java.lang.Exception: failed to compile ..



Using Spark 2.0, I am trying to run a simple VectorAssembler in a PySpark ML pipeline, as shown below:

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler

# Combine the two numeric columns into a single vector column
feature_assembler = VectorAssembler(inputCols=['category_count', 'name_count'],
                                    outputCol="features")
pipeline = Pipeline(stages=[feature_assembler])
model = pipeline.fit(df_train)
model_output = model.transform(df_train)

When I try to view the output with

model_output.select("features").show(1)

I get the error:

Py4JJavaError                             Traceback (most recent call last)
<ipython-input-95-7a3e3d4f281c> in <module>()
      2 
      3 
----> 4 model_output.select("features").show(1)
/usr/local/spark20/python/pyspark/sql/dataframe.pyc in show(self, n, truncate)
    285         +---+-----+
    286         """
--> 287         print(self._jdf.showString(n, truncate))
    288 
    289     def __repr__(self):
/usr/local/spark20/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
    931         answer = self.gateway_client.send_command(command)
    932         return_value = get_return_value(
--> 933             answer, self.gateway_client, self.target_id, self.name)
    934 
    935         for temp_arg in temp_args:
/usr/local/spark20/python/pyspark/sql/utils.pyc in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()
/usr/local/spark20/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    310                 raise Py4JJavaError(
    311                     "An error occurred while calling {0}{1}{2}.\n".
--> 312                     format(target_id, ".", name), value)
    313             else:
    314                 raise Py4JError(
Py4JJavaError: An error occurred while calling o2875.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure:   
Task 0 in stage 1084.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1084.0 (TID 42910, 169.45.92.174): 
java.util.concurrent.ExecutionException: java.lang.Exception: failed to 
compile: org.codehaus.janino.JaninoRuntimeException: Code of method "
  (Lorg/apache/spark/sql/catalyst/InternalRow;Lorg/apache/spark/sql/catalyst/InternalRow;)I" of class    
"org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificOrdering" 
grows beyond 64 KB

This is followed by a listing of the generated code:

/* 001 */ public SpecificOrdering generate(Object[] references) {
/* 002 */   return new SpecificOrdering(references);
/* 003 */ }
/* 004 */
/* 005 */ class SpecificOrdering extends org.apache.spark.sql.catalyst.expressions.codegen.BaseOrdering {
/* 006 */
/* 007 */   private Object[] references;
/* 008 */
/* 009 */
/* 010 */   public int compareStruct1(InternalRow a, InternalRow b) {
/* 011 */     InternalRow i = null;
/* 012 */
/* 013 */     i = a;
/* 014 */     boolean isNullA836;
/* 015 */     byte primitiveA834;
/* 016 */     {
/* 017 */
/* 018 */       boolean isNull834 = i.isNullAt(0);
/* 019 */       byte value834 = isNull834 ? (byte)-1 : (i.getByte(0));
/* 020 */       isNullA836 = isNull834;
/* 021 */       primitiveA834 = value834;
/* 022 */     }
/* 023 */     i = b;
…
/* 28649 */     return 0;
/* 28650 */   }
/* 28651 */ }

Why does this simple VectorAssembler generate 28,651 lines of code and exceed the 64 KB limit?

The 64 KB figure is a JVM constraint: the bytecode of a single Java method cannot exceed 64 KB, and Spark's code generation emits one large method for the accumulated query plan. In other words, lazy evaluation defers everything until the generated code for the entire lineage grows past that limit; in this case, Spark is a bit too lazy for its own good.
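One commonly suggested mitigation is to disable whole-stage code generation via the `spark.sql.codegen.wholeStage` setting. This is a sketch, not a guaranteed fix: the failing class in the traceback above is a `SpecificOrdering`, and not every codegen path is covered by this flag, so whether it helps depends on which path hits the limit.

```python
from pyspark.sql import SparkSession

# Build a session with whole-stage code generation disabled.
# Spark then falls back to iterator-based execution for the
# affected operators, which avoids some (not all) of the
# "grows beyond 64 KB" compilation failures.
spark = (SparkSession.builder
         .appName("codegen-workaround")
         .config("spark.sql.codegen.wholeStage", "false")
         .getOrCreate())

# Or toggle it on an existing session:
spark.conf.set("spark.sql.codegen.wholeStage", "false")
```

This trades some execution speed for robustness, so it is best scoped to the offending job rather than set cluster-wide.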

I was able to work around this same exception (in my case triggered by a join rather than a VectorAssembler) by calling cache on one of my Datasets roughly halfway through my pipeline. However, I don't yet know exactly why that fixes it.
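A sketch of that workaround, assuming a long chain of transformations (the helper names `heavy_first_half` and `heavy_second_half` are purely illustrative): caching materializes the intermediate result, so the code Spark generates downstream covers only the stages after the cache point instead of the whole accumulated lineage.

```python
# Illustrative: df_mid is an intermediate DataFrame produced by the
# first half of a long pipeline (joins, withColumn steps, etc.).
df_mid = heavy_first_half(df_train)      # hypothetical helper

# Materialize here: downstream query planning and code generation
# now start from the cached data rather than the full lineage.
df_mid = df_mid.cache()
df_mid.count()  # force an action so the cache is actually populated

model = pipeline.fit(heavy_second_half(df_mid))  # hypothetical helper
```

Calling an action such as `count()` right after `cache()` matters; `cache()` alone is lazy and nothing is materialized until the first action runs.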
