Spark collect method takes too much time when processing records stored in RDD[String]



I have a requirement where I must fetch Parquet files from S3, process them, convert them into another object format, and store the result in both JSON and Parquet formats.

I have implemented the changes below for this problem statement, but the Spark job spends too much time in the collect statement. Please let me know how this can be optimized. The complete code, which reads from S3, processes the data, and stores it back to S3, is given below. I am new to Spark and big data technologies.

package com.expedia.www.lambda
import java.io._
import com.amazonaws.ClientConfiguration
import com.amazonaws.services.s3.AmazonS3Client
import com.amazonaws.services.s3.model.{ListObjectsRequest, ObjectListing}
import com.expedia.hendrix.lambda.HotelInfosite
import com.expedia.www.hendrix.signals.definition.local.HotelInfoSignal
import com.expedia.www.options.HendrixHistoricalOfflineProcessorOptions
import com.expedia.www.user.interaction.v1.UserInteraction
import com.expedia.www.util._
import com.fasterxml.jackson.core.JsonParser
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.commons.lang.exception.ExceptionUtils
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.slf4j.{Logger, LoggerFactory}
import scala.collection.JavaConverters._
import scala.io.Source
import scala.util.Random

object  GenericLambdaMapper{
  private def currentTimeMillis: Long = System.currentTimeMillis
  /** The below Generic mapper object is built for creating json similar to the Signal pushed by hendrix */
  def populateSignalRecord( genericRecord: GenericRecord, uisMessage: UserInteraction, signalType: String): HotelInfoSignal ={
    val objectMapper:ObjectMapper = new ObjectMapper
    objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
    objectMapper.configure(JsonParser.Feature.ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER, true)
    val hotelInfoObject = objectMapper.readValue( genericRecord.toString, classOf[com.expedia.www.hendrix.signals.definition.local.HotelInfosite])
    val userKey = UserKeyUtil.createUserKey(uisMessage)
    val hotelInfoSignal:HotelInfoSignal = new HotelInfoSignal
    hotelInfoSignal.setSignalType(signalType)
    hotelInfoSignal.setData(hotelInfoObject)
    hotelInfoSignal.setUserKey(userKey)
    hotelInfoSignal.setGeneratedAtTimestamp(currentTimeMillis)
    return hotelInfoSignal
  }
}
class GenericLambdaMapper extends Serializable{
  var LOGGER:Logger = LoggerFactory.getLogger("GenericLambdaMapper")
  var bw : BufferedWriter  = null
  var fw :FileWriter = null
  val random: Random = new Random
  var counter: Int = 0
  var fileName: String= null
  val s3Util = new S3Util

  /** Object Mapper function for serializing and deserializing objects**/
  def objectMapper : ObjectMapper= {
    val mapper = new ObjectMapper
    mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
    mapper.configure(JsonParser.Feature.ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER, true)
  }

  def process(sparkContext: SparkContext, options: HendrixHistoricalOfflineProcessorOptions ): Unit = { //ObjectListing
    try {
      LOGGER.info("Start Date : "+options.startDate)
      LOGGER.info("END Date : "+options.endDate)
      val listOfFilePath: List[String] = DateTimeUtil.getDateRangeStrFromInput(options.startDate, options.endDate)
      /**Looping through each folder based on start and end date **/
      listOfFilePath.map(
        path => applyLambdaForGivenPathAndPushToS3Signal( sparkContext, path, options )
      )
    }catch {
      case ex: Exception => {
        LOGGER.error( "Exception in downloading data :" + options.rawBucketName + options.rawS3UploadRootFolder  + options.startDate)
        LOGGER.error("Stack Trace :"+ExceptionUtils.getFullStackTrace(ex))
      }
    }
  }
  // TODO: Currently the Lambda is hardcoded only to HotelInfoSite to be made generic
  def prepareUisObjectAndApplyLambda(uisMessage: UserInteraction, options: HendrixHistoricalOfflineProcessorOptions): List[GenericRecord] = {
    try {
      val schemaDefinition = Source.fromInputStream(getClass.getResourceAsStream("/"+options.avroSchemaName)).getLines.mkString("\n")
      val schemaHotelInfo = new Schema.Parser().parse(schemaDefinition)
      HotelInfosite.apply(uisMessage, schemaHotelInfo).toList
    }catch {
      case ex: Exception =>  LOGGER.error("Exception while preparing UIS Object" + ex.toString)
        List.empty
    }
  }
  /** Below method is used to extract userInteraction Data from Raw file **/
  private def constructUisObject(uisMessageRaw: String): UserInteraction = objectMapper.readValue( uisMessageRaw, classOf[UserInteraction])

  /** Below function contains logic to apply the lambda for the given range of dates and push to signals folder in S3 **/
  def applyLambdaForGivenPathAndPushToS3Signal(sparkContext: SparkContext, dateFolderPath: String, options: HendrixHistoricalOfflineProcessorOptions ): Unit ={
    var awsS3Client: AmazonS3Client = null;
    try {
      if ("sandbox".equals(options.environment)) {
        val clientConfiguration = new ClientConfiguration()
          .withConnectionTimeout(options.awsConnectionTimeout)
          .withSocketTimeout(options.awsSocketTimeout)
          .withTcpKeepAlive(true)
        awsS3Client = S3Client.getAWSConnection(options.awsS3AccessKey, options.awsS3SecretKey, clientConfiguration)
      } else {
        awsS3Client = S3Client.getAWSConnection
      }
      /** Validate if destination path has any gzip file if so then just skip that date and process next record **/
      LOGGER.info("Validating if the destination folder path is empty: " + dateFolderPath)
      var objectListing: ObjectListing = null
      var listObjectsRequest: ListObjectsRequest = new ListObjectsRequest().withBucketName(options.destinationBucketName).withPrefix(options.s3SignalRootFolder + options.signalType + "/" + dateFolderPath.toString)
      objectListing = awsS3Client.listObjects(listObjectsRequest)
      if (objectListing.getObjectSummaries.size > 0) {
        LOGGER.warn("Record already present at the below location, so skipping the processing of record for the folder path :" + dateFolderPath.toString)
        LOGGER.warn("s3n://" + options.destinationBucketName + "/" + options.s3SignalRootFolder + options.signalType + "/" + dateFolderPath.toString)
        return
      }
      LOGGER.info("Validated the destination folder path :" + dateFolderPath + " and found it to be empty ")
      /** End of validation **/

      /*Selecting all the files under the source path and iterating*/
      counter = 0
      listObjectsRequest = new ListObjectsRequest().withBucketName(options.rawBucketName).withPrefix(options.rawS3UploadRootFolder + dateFolderPath.toString)
      objectListing = awsS3Client.listObjects(listObjectsRequest)
      val rddListOfParquetFileNames = objectListing.getObjectSummaries.asScala.map(_.getKey).toList
      rddListOfParquetFileNames.flatMap{key => { processIndividualParquetFileAndUploadToS3(sparkContext, awsS3Client, options, key, dateFolderPath)
                                                 "COMPLETED Processing=>"+key;
                                                }}
    }catch{
      case ex: Exception =>
        LOGGER.error("Exception occured while processing records for the path " + dateFolderPath)
        LOGGER.error("Exception in Apply Lambda method Message :" + ex.getMessage + "n Stack Trace :" + ex.getStackTrace)
    }finally {
      awsS3Client.shutdown
      LOGGER.info("JOB Complete ")
    }
  }
  def processIndividualParquetFileAndUploadToS3(sparkContext: SparkContext, awsS3Client: AmazonS3Client, options: HendrixHistoricalOfflineProcessorOptions, parquetFilePath:String, dateFolderPath:String ):Unit ={
    try{
      LOGGER.info("Currently Processing the Parquet file: "+parquetFilePath)
      LOGGER.info("Starting to reading Parquet File Start Time: "+System.currentTimeMillis)
      val dataSetString: RDD[String] = ParquetHelper.readParquetData(sparkContext, options, parquetFilePath)
      LOGGER.info("Data Set returned from Parquet file Successful Time: "+System.currentTimeMillis)
      val lambdaSignalRecords: Array[HotelInfoSignal] = dataSetString.map(x => constructUisObject(x))
        .filter(_ != null)
        .map(userInteraction => processIndividualRecords(userInteraction, options))
        .filter(_ != null)
        .collect
      LOGGER.info("Successfully Generated "+lambdaSignalRecords.length+" Signal Records")
      if(lambdaSignalRecords.length > 0) {
        //Write to Paraquet File :Start
        val parquetFileName: String = getFileNameForParquet(dateFolderPath, counter)
        val parquetWriter = ParquetHelper.newParquetWriter(HotelInfoSignal.getClassSchema, dateFolderPath, parquetFileName, options)
        LOGGER.info("Initialized Parquet Writer")
        lambdaSignalRecords.map(signalRecord => parquetWriter.write(signalRecord))
        LOGGER.info("Completed writing the data in Parquet format")
        parquetWriter.close
        //Parquet Write Complete
        /*val avroSignalString = lambdaSignalRecords.mkString("\n")
        val sparkSession = SparkSession.builder.getOrCreate
        uploadProceessedDataToS3(sparkSession, awsS3Client, dateFolderPath, avroSignalString, options)
*/      }
    }catch {case ex:Exception =>
      LOGGER.error("Skipping processing of record :"+parquetFilePath+" because of Exception: "+ExceptionUtils.getFullStackTrace(ex))
    }
    LOGGER.info("Completed data processing for file :" + options.rawBucketName + options.rawS3UploadRootFolder + parquetFilePath)
  }
  def uploadProceessedDataToS3(sparkSession:SparkSession, awsS3Client: AmazonS3Client, filePath: String, genericSignalRecords: String, options: HendrixHistoricalOfflineProcessorOptions):Unit ={
    var jsonFile: File = null
    var gzFile: File = null
    try {
      //Building the file name based on the folder accessed
      fileName = getFileName (filePath, counter)
      jsonFile = IOUtil.createS3JsonFile (genericSignalRecords, fileName)
      gzFile =  IOUtil.gzipIt (jsonFile)
      s3Util.uploadToS3(awsS3Client, options.destinationBucketName, options.s3SignalRootFolder + options.signalType + "/" + filePath, gzFile)
      counter += 1 //Increment counter
    } catch {
      case ex: RuntimeException => LOGGER.error ("Exception while uploading file to path :" + options.s3SignalRootFolder + options.signalType + "/" + filePath + "/" + fileName)
        LOGGER.error ("Stack Trace for S3 Upload :" + ExceptionUtils.getFullStackTrace(ex))
    } finally {
      //Cleaning the temp file created after upload to s3, we can create a temp dir if required.
      jsonFile.delete
      gzFile.delete
    }
  }

  def processIndividualRecords(userInteraction: UserInteraction, options: HendrixHistoricalOfflineProcessorOptions): HotelInfoSignal ={
    try {
      //Applying lambda for the individual UserInteraction
      val list: List[GenericRecord] = prepareUisObjectAndApplyLambda (userInteraction, options)
      if (list.nonEmpty) return GenericLambdaMapper.populateSignalRecord (list.head, userInteraction, options.signalType)
    } catch { case ex: Exception => LOGGER.error ("Error while creating signal record from UserInteraction for Singal Type :"+ options.signalType +" For Interaction "+userInteraction.toString)
      LOGGER.error ("Stack Trace while processIndividualRecords :" + ExceptionUtils.getFullStackTrace(ex))}
    null
  }

  /** This method is used to prepare the exact file name which has processed date and the no of files counter **/
  def getFileName(filePath : String, counter : Int): String = {
    filePath.replace("/","-")+"_"+counter+"_"+random.alphanumeric.take(5).mkString+".json"
  }
  /** This method is used to prepare the exact file name which has processed date and the no of files counter **/
  def getFileNameForParquet(filePath : String, counter : Int): String = {
    filePath.replace("/","-")+"_"+counter+"_"+random.alphanumeric.take(5).mkString+".parquet"
  }

}

package com.expedia.www.util
import com.expedia.www.options.HendrixHistoricalOfflineProcessorOptions
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.{AvroParquetWriter, AvroSchemaConverter}
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.parquet.hadoop.{ParquetFileWriter, ParquetWriter}
import org.apache.parquet.schema.MessageType
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.slf4j.{Logger, LoggerFactory}
/**
  * Created by prasubra on 2/17/17.
  */
object ParquetHelper {
  val LOGGER:Logger = LoggerFactory.getLogger("ParquetHelper")
  def newParquetWriter(signalSchema: Schema, folderPath:String, fileName:String, options:HendrixHistoricalOfflineProcessorOptions): ParquetWriter[GenericRecord] = {
    val blockSize: Int = 256 * 1024 * 1024
    val pageSize: Int = 64 * 1024
    val compressionCodec = if (options.parquetCompressionToGzip) CompressionCodecName.GZIP else CompressionCodecName.UNCOMPRESSED
    val path: Path = new Path("s3n://" + options.destinationBucketName + "/" + options.parquetSignalFolderName + options.signalType + "/" + folderPath + "/" + fileName);
    val parquetSchema: MessageType = new AvroSchemaConverter().convert(signalSchema);
    // var writeSupport:WriteSupport = new AvroWriteSupport(parquetSchema, signalSchema);
    //(path, writeSupport, compressionCodec, blockSize, pageSize)
    //var parquetWriter:ParquetWriter[GenericRecord] = new ParquetWriter(path, writeSupport, compressionCodec, blockSize, pageSize);
    if ("sandbox".equals(options.environment)) {
      val hadoopConf = new Configuration
      hadoopConf.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
      hadoopConf.set("fs.s3n.awsAccessKeyId", options.awsS3AccessKey)
      hadoopConf.set("fs.s3n.awsSecretAccessKey", options.awsS3SecretKey)
      hadoopConf.set("fs.s3n.maxRetries", options.awsFileReaderRetry)
      AvroParquetWriter.builder(path)
        .withSchema(signalSchema)
        .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
        .withCompressionCodec(compressionCodec)
        .withConf(hadoopConf)
        .build()
    } else {
      AvroParquetWriter.builder(path)
        .withSchema(signalSchema)
        .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
        .withCompressionCodec(compressionCodec)
        .withPageSize(pageSize)
        .build()
    }
  }
  def readParquetData(sc: SparkContext, options: HendrixHistoricalOfflineProcessorOptions, filePath: String): RDD[String] = {
    val filePathOfParquet = "s3n://"+options.rawBucketName+"/"+ filePath
    LOGGER.info("Reading Parquet file from path :"+filePathOfParquet)
    val sparkSession  = SparkSession.builder.getOrCreate
    val dataFrame = sparkSession.sqlContext.read.parquet(filePathOfParquet)
    //dataFrame.printSchema()
    dataFrame.toJSON.rdd
  }
}

First of all, you should really improve your question with a minimal code example. It is really hard to see what is going on in the code...

collect gathers all elements of the RDD into a single array on the driver. If your RDD is large, this will of course take a lot of time (and may cause an OutOfMemoryError if the contents do not fit into the driver's main memory).

You can write the contents of a DataFrame/Dataset directly to Parquet. That will definitely be faster and more scalable.
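
As a rough sketch of that idea (resultDF and outputPath are placeholders here, and your per-record lambda would need to be expressed as DataFrame/Dataset transformations), the collect-then-ParquetWriter step could be replaced with Spark's own distributed Parquet writer:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.getOrCreate()

// read the raw Parquet file straight into a DataFrame
val rawDF = spark.read.parquet("s3a://" + options.rawBucketName + "/" + parquetFilePath)

// apply the per-record transformation on the executors instead of the driver;
// resultDF stands in for whatever DataFrame/Dataset your lambda produces
val resultDF = rawDF // placeholder for the transformed data

// let Spark write the Parquet output in parallel -- no collect on the driver
resultDF.write
  .mode("overwrite")
  .parquet("s3a://" + options.destinationBucketName + "/" + outputPath)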

Use s3a:// URLs. s3n:// has a bug which really kills ORC/Parquet performance, and it has been superseded by s3a now.
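
For example (assuming the standard hadoop-aws properties are available on your cluster; the exact keys can vary with your Hadoop/EMR version), the switch could look roughly like this:

// configure the s3a filesystem once on the SparkContext's Hadoop configuration
val hadoopConf = sparkContext.hadoopConfiguration
hadoopConf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hadoopConf.set("fs.s3a.access.key", options.awsS3AccessKey)
hadoopConf.set("fs.s3a.secret.key", options.awsS3SecretKey)

// then read/write with s3a:// URLs instead of s3n://
val dataFrame = sparkSession.read.parquet("s3a://" + options.rawBucketName + "/" + filePath)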
