I have a function that returns a DataFrame and two accumulators. Running it manually from spark-shell (calling the function from the jar), it works as expected: executing .count on the DataFrame populates the accumulators. But if I call the function from spark-submit, the accumulators always stay empty. I have also tried returning two DataFrames instead, with the same strange behavior: it works in spark-shell but not from spark-submit.
Here is a skeleton of my code (it may not run as-is):
import org.apache.spark.{Accumulable, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SaveMode, DataFrame, SQLContext, Row}
import scala.collection._
...
def process(
    sc: SparkContext,
    sqlContext: SQLContext,
    filepaths: RDD[String]
): ( DataFrame,
     Accumulable[mutable.ArrayBuffer[(String, String, Long, String, Long, Long)], (String, String, Long, String, Long, Long)],
     Accumulable[mutable.ArrayBuffer[String], String] ) = {
val logMetadataAccumulator = sc.accumulableCollection(mutable.ArrayBuffer[( String, String, Long, String, Long, Long)]())
val logFailedRowAccumulator = sc.accumulableCollection(mutable.ArrayBuffer[String]())
...
...
val logRecordsPre = logRawFlow.map(
entry => {
val date = """(\d\d\d\d)-(\d\d)-(\d\d)""".r
if ( fields.length == 23 && date.findFirstMatchIn(fields(2)).nonEmpty && fields(22).forall(_.isDigit) && fields(21).forall(_.isDigit) ) {
...
Row( 1L, "blah", "blah", 0L )
}
else if ( fields(0) == logMetaDataPrefix ) {
...
logMetadataAccumulator += ((fileName, logType, numLines, logSource, startTime, endTime))
Row( 0L, "blah", "blah", 0L )
}
else {
try {
val fileName = fields(0)
logFailedRowAccumulator += (fileName)
Row( 0L, "blah", "blah", 0L )
}
catch {
case e: Exception => {
logFailedRowAccumulator += ("unknown")
Row( 0L, "blah", "blah", 0L )
}
}
}
}
)
val logRecords = logRecordsPre.filter( _.getLong(0) != 0L)
val logDF = sqlContext.createDataFrame(logRecords, logSchema)
( logDF, logMetadataAccumulator, logFailedRowAccumulator )
}
My bad. On closer inspection I found that when calling the function manually in the shell, I was doing a .count before turning the accumulators into DataFrames, whereas in the function that saves the DataFrames, called by the spark-submit runner, the DataFrames built from the accumulators were being created before any action had been run on the returned DataFrame.
i.e. (what I should have been doing):
val ( df, acc1, acc2 ) = process( sc, sqlContext, filePaths )
df.count
val acc1Array = acc1.value.toArray
val acc2Array = acc2.value.toArray
NOT (what I was actually doing in the runner):
val ( df, acc1, acc2 ) = process( sc, sqlContext, filePaths )
val acc1Array = acc1.value.toArray
val acc2Array = acc2.value.toArray
df.count
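For what it's worth, here is a minimal, self-contained sketch of the underlying behaviour, assuming the same Spark 1.x API as above (the app name, schema, and sample data are illustrative only, not from my actual code): an accumulator updated inside a transformation is only populated once an action runs, so any DataFrame built from accumulator values must be created after that action.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
import scala.collection._

object AccumulatorOrderingDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("acc-ordering-demo").setMaster("local[*]"))
    val sqlContext = new SQLContext(sc)

    val failedAcc = sc.accumulableCollection(mutable.ArrayBuffer[String]())

    // The += happens inside a transformation, so it is deferred until an action executes.
    val rows = sc.parallelize(Seq("ok-1", "bad-2", "ok-3")).map { line =>
      if (line.startsWith("bad")) failedAcc += line
      Row(line, line.length.toLong)
    }
    val df = sqlContext.createDataFrame(rows, StructType(Seq(
      StructField("line", StringType), StructField("length", LongType))))

    println(failedAcc.value.size)  // 0 -- no action has run yet, the accumulator is still empty
    df.count()                     // action: now the map (and the +=) actually executes
    println(failedAcc.value.size)  // 1 -- the accumulator is populated only after the action

    // Only at this point is it safe to turn the accumulator contents into a DataFrame.
    val failedDF = sqlContext.createDataFrame(
      sc.parallelize(failedAcc.value.map(Row(_))),
      StructType(Seq(StructField("failedLine", StringType))))
    failedDF.show()

    sc.stop()
  }
}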