Returning a DataFrame and accumulators from a function



I have a function that returns a DataFrame and two accumulators. Run manually from spark-shell (calling the function from a jar), it works as expected: executing .count on the DataFrame populates the accumulators.

But if I call the function from spark-submit, the accumulators always stay empty. I have also tried returning two DataFrames, with the same strange behavior: it works in spark-shell, but not from spark-submit.

Here is a skeleton of my code (it may not run as-is):

import org.apache.spark.{Accumulable, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SaveMode, DataFrame, SQLContext, Row}
import scala.collection._
...
def process(
    sc: SparkContext,
    sqlContext: SQLContext,
    filepaths: RDD[String]
    ): ( DataFrame,
         Accumulable[mutable.ArrayBuffer[(String, String, Long, String, Long, Long)], (String, String, Long, String, Long, Long)],
         Accumulable[mutable.ArrayBuffer[String], String] ) = {
    val logMetadataAccumulator = sc.accumulableCollection(mutable.ArrayBuffer[(String, String, Long, String, Long, Long)]())
    val logFailedRowAccumulator = sc.accumulableCollection(mutable.ArrayBuffer[String]())
    ...
    ...
    val logRecordsPre = logRawFlow.map(
        entry => {
            // fields is derived from entry in the elided code
            val date = """(\d\d\d\d)-(\d\d)-(\d\d)""".r
            if ( fields.length == 23 && date.findFirstMatchIn(fields(2)).nonEmpty && fields(22).forall(_.isDigit) && fields(21).forall(_.isDigit) ) {
                ...
                Row( 1L, "blah", "blah", 0L )
            }
            else if ( fields(0) == logMetaDataPrefix ) {
                ...
                // += adds a single element, so the tuple needs its own parentheses
                logMetadataAccumulator += ((fileName, logType, numLines, logSource, startTime, endTime))
                Row( 0L, "blah", "blah", 0L )
            }
            else {
                try {
                    val fileName = fields(0)
                    logFailedRowAccumulator += fileName
                    Row( 0L, "blah", "blah", 0L )
                }
                catch {
                    case e: Exception => {
                        logFailedRowAccumulator += "unknown"
                        Row( 0L, "blah", "blah", 0L )
                    }
                }
            }
        }
    )
    val logRecords = logRecordsPre.filter( _.getLong(0) != 0L )
    val logDF = sqlContext.createDataFrame(logRecords, logSchema)
    ( logDF, logMetadataAccumulator, logFailedRowAccumulator )
}

My bad. On closer inspection I found that when calling the function manually in the shell, I was doing .count before converting the accumulators to DataFrames; but in the function that saves the DataFrame (the one called by the spark-submit runner), the DataFrames were created from the accumulators before any action had run on the returned DataFrame.

The correct order is:

val ( df, acc1, acc2 ) = process( sc, sqlContext, filePaths )
df.count
val acc1Array = acc1.value.toArray
val acc2Array = acc2.value.toArray

NOT what I was doing in the runner:

val ( df, acc1, acc2 ) = process( sc, sqlContext, filePaths )
val acc1Array = acc1.value.toArray
val acc2Array = acc2.value.toArray
df.count
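
To make the cause concrete, here is a minimal, self-contained sketch of the same pitfall (Spark 1.x API, local mode; the object and variable names are illustrative, not from my actual code). An accumulator updated inside a transformation stays empty until an action actually executes that transformation:

import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable

object AccumulatorLazinessDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("acc-demo").setMaster("local[2]"))
    val acc = sc.accumulableCollection(mutable.ArrayBuffer[String]())

    // The += below runs on the executors, but only once an action executes the map
    val upper = sc.parallelize(Seq("a", "b", "c")).map { s =>
      acc += s
      s.toUpperCase
    }

    println(acc.value.size)  // 0: no action has run yet, like in my runner
    upper.count()            // the action runs the map and applies the updates
    println(acc.value.size)  // 3

    sc.stop()
  }
}

One more caveat: if the returned DataFrame is used again after the accumulators are read, cache it before the count; otherwise a later action re-runs the map stage and the accumulator updates inside it are applied a second time.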
