Using Spark 2.0, I am trying to run a simple VectorAssembler in a pyspark ML pipeline, like so:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler

feature_assembler = VectorAssembler(inputCols=['category_count', 'name_count'],
                                    outputCol="features")
pipeline = Pipeline(stages=[feature_assembler])
model = pipeline.fit(df_train)
model_output = model.transform(df_train)
When I try to look at the output using
model_output.select("features").show(1)
I get the error
Py4JJavaError Traceback (most recent call last)
<ipython-input-95-7a3e3d4f281c> in <module>()
2
3
----> 4 model_output.select("features").show(1)
/usr/local/spark20/python/pyspark/sql/dataframe.pyc in show(self, n, truncate)
285 +---+-----+
286 """
--> 287 print(self._jdf.showString(n, truncate))
288
289 def __repr__(self):
/usr/local/spark20/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
931 answer = self.gateway_client.send_command(command)
932 return_value = get_return_value(
--> 933 answer, self.gateway_client, self.target_id, self.name)
934
935 for temp_arg in temp_args:
/usr/local/spark20/python/pyspark/sql/utils.pyc in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
/usr/local/spark20/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
310 raise Py4JJavaError(
311 "An error occurred while calling {0}{1}{2}.\n".
--> 312 format(target_id, ".", name), value)
313 else:
314 raise Py4JError(
Py4JJavaError: An error occurred while calling o2875.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure:
Task 0 in stage 1084.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1084.0 (TID 42910, 169.45.92.174):
java.util.concurrent.ExecutionException: java.lang.Exception: failed to
compile: org.codehaus.janino.JaninoRuntimeException: Code of method "
(Lorg/apache/spark/sql/catalyst/InternalRow;Lorg/apache/spark/sql/catalyst/InternalRow;)I" of class
"org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificOrdering"
grows beyond 64 KB
It then lists the generated code, which looks like:
/* 001 */ public SpecificOrdering generate(Object[] references) {
/* 002 */ return new SpecificOrdering(references);
/* 003 */ }
/* 004 */
/* 005 */ class SpecificOrdering extends org.apache.spark.sql.catalyst.expressions.codegen.BaseOrdering {
/* 006 */
/* 007 */ private Object[] references;
/* 008 */
/* 009 */
/* 010 */ public int compareStruct1(InternalRow a, InternalRow b) {
/* 011 */ InternalRow i = null;
/* 012 */
/* 013 */ i = a;
/* 014 */ boolean isNullA836;
/* 015 */ byte primitiveA834;
/* 016 */ {
/* 017 */
/* 018 */ boolean isNull834 = i.isNullAt(0);
/* 019 */ byte value834 = isNull834 ? (byte)-1 : (i.getByte(0));
/* 020 */ isNullA836 = isNull834;
/* 021 */ primitiveA834 = value834;
/* 022 */ }
/* 023 */ i = b;
…
/* 28649 */ return 0;
/* 28650 */ }
/* 28651 */ }
Why is this simple VectorAssembler generating 28,651 lines of code and exceeding 64 KB?
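It seems that Spark's lazy evaluation runs into a limit here, namely 64 KB, which is the JVM's cap on the bytecode size of a single method. In other words, Spark is being a little too lazy in this case: it defers so much work that the code it generates for it hits that limit.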
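I was able to work around this same exception, which I triggered with a join rather than a VectorAssembler, by calling cache on one of my Datasets about halfway through my pipeline. However, I don't (yet) know exactly why this solved the issue.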
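The cache workaround might look roughly like the sketch below. The helper name materialize is hypothetical, and the count() call is an assumption added here: cache() on its own is lazy, so an action is used to force the intermediate result to be computed and stored. Whether this also avoids the error in the VectorAssembler case is not confirmed.

```python
def materialize(df):
    """Cache an intermediate DataFrame/Dataset and force it to be computed.

    Hypothetical helper: it works on any object exposing cache() and
    count(), such as a pyspark.sql.DataFrame.
    """
    cached = df.cache()  # only marks the data for caching; nothing runs yet
    cached.count()       # an action, so the cached data is actually computed
    return cached
```

With the pipeline from the question, this would be called as df_train = materialize(df_train) just before pipeline.fit(df_train), or on whichever intermediate result sits about halfway through a longer pipeline.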