spark aggregateByKey 添加额外的空行



我有一个用 Scala 编写的 Spark 流应用程序,运行在 CDH 中。应用程序从 Kafka 读取数据并将数据写入 HDFS。在将数据写入 HDFS 之前,我执行 partitionBy,因此数据被分区写入。这是代码:

//Some code
val ssc = new StreamingContext(sc, Seconds(1))
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String,String](topics, kafkaParams))
val sparkExecutorsCount = sc.getConf.getInt("spark.executor.instances", 1)
//Some code
stream.foreachRDD { rdd =>
if(!rdd.isEmpty()) {
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
val data = rdd.map(kafkaData => (getKey(kafkaData.value()), kafkaData.value()))
val columns = Array("key", "value")
val addOp = (record1: String, record2:String) => record1 + "n" + record2
val mergeOp = (record1: String, record2:String) => record1 + record2
val initialValue = ""
val out = data.aggregateByKey(initialValue)(addOp, mergeOp)
out.toDF(columns: _*).coalesce(sparkExecutorsCount)
.write.mode(SaveMode.Append)
.partitionBy("key").text(MY_PATH)
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
} else {
//handle empty RDD
}
}

我的期望是此代码生成以下输出(ls -l命令的示例(:

> MY_PATH/key=1
> MY_PATH/key=1/file1.txt
> MY_PATH/key=1/file2.txt
> MY_PATH/key=1/file3.txt
> MY_PATH/key=2
> MY_PATH/key=2/file1.txt
> MY_PATH/key=2/file2.txt
> MY_PATH/key=2/file3.txt

在每个文本文件中,将逐行包含来自数据帧的条目。

事实上,这实际上正在发生。唯一的问题是,即使我initalValue=""initialValue也总是出现在每个文件中的第一行,因此我总是在每个文件中得到额外的空行。

这个额外的空行对我来说是一个巨大的问题,我必须避免它。其中一个选项是使用groupByKey而不是aggregateByKey,但这groupByKey会导致集群中出现更多洗牌,我想避免这种情况。

请告知如何防止每个写入文件中出现额外的空行。

TL;DR只需使用groupByKey后跟mapValues(_.mkString("n"))

两件事:

  • initialValue可以添加任意次数(实际上 #partitions(。这意味着每个分区都将以空字符串开头,后跟换行符。您可以检查record1record2是否为空,addOpmergeOp,如果是,则跳过n

  • 此外,您的声明:

    但是 groupByKey 会导致集群中出现更多洗牌,我想避免这种情况。

    并不真正准确。您拥有的代码不会显着(如果有的话(减少数据量。根据密钥的不同,它实际上可以增加它。

    例如,请参阅:

    • groupByKey 是否比 reduceByKey 更受欢迎。
    • Scala vs Python 的 Spark 性能(解释了为什么像这里使用的代码效率非常低(。

最新更新