Spark Structured Streaming: joining a CSV file stream with a rate stream takes excessive time per batch



I have a rate stream and a CSV file stream, and I join the rate value with the CSV file's id:

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.{DoubleType, LongType, StructField, StructType}

def readFromCSVFile(path: String)(implicit spark: SparkSession): DataFrame = {
  val schema = StructType(
    StructField("id", LongType, nullable = false) ::
    StructField("value1", LongType, nullable = false) ::
    StructField("another", DoubleType, nullable = false) :: Nil)
  // Note: this local val shadows the implicit `spark` parameter.
  val spark: SparkSession = SparkSession
    .builder
    .master("local[1]")
    .config(new SparkConf().setIfMissing("spark.master", "local[1]")
      .set("spark.eventLog.dir", "file:///tmp/spark-events"))
    .getOrCreate()
  spark
    .readStream
    .format("csv")
    .option("header", value = true)
    .schema(schema)
    .option("delimiter", ",")
    .option("maxFilesPerTrigger", 1)
    //.option("inferSchema", value = true)
    .load(path)
}
val rate = spark.readStream
  .format("rate")
  .option("rowsPerSecond", 1)
  .option("numPartitions", 10)
  .load()
  .withWatermark("timestamp", "1 seconds")

val cvsStream = readFromCSVFile(tmpPath.toString)

val cvsStream2 = cvsStream.as("csv")
  .join(rate.as("counter"))
  .where("csv.id == counter.value")
  .withWatermark("timestamp", "1 seconds")

cvsStream2
  .writeStream
  .trigger(Trigger.ProcessingTime(10))
  .format("console")
  .option("truncate", "false")
  .queryName("kafkaDataGenerator")
  .start()
  .awaitTermination(300000)

The CSV file is only 6 rows long, yet processing a single batch takes about 100 seconds:

2021-10-15 23:21:29 WARN  ProcessingTimeExecutor:69 - Current batch is falling behind. The trigger interval is 10 milliseconds, but spent 92217 milliseconds
-------------------------------------------
Batch: 1
-------------------------------------------
+---+------+-------+-----------------------+-----+
|id |value1|another|timestamp              |value|
+---+------+-------+-----------------------+-----+
|6  |2     |3.0    |2021-10-15 20:20:02.507|6    |
|5  |2     |2.0    |2021-10-15 20:20:01.507|5    |
|1  |1     |1.0    |2021-10-15 20:19:57.507|1    |
|3  |1     |3.0    |2021-10-15 20:19:59.507|3    |
|2  |1     |2.0    |2021-10-15 20:19:58.507|2    |
|4  |2     |1.0    |2021-10-15 20:20:00.507|4    |
+---+------+-------+-----------------------+-----+

How can the join be optimized so the batch is processed faster? It shouldn't need that much computation, so it looks as if there is some hidden watermark or something else making the batch wait for about 100 seconds. Which options/properties can be applied?
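One property worth checking in a setup like this (my own assumption, not something confirmed in this thread): a stream-stream join shuffles both sides into spark.sql.shuffle.partitions partitions, which defaults to 200, so every micro-batch schedules 200 tiny tasks on the single core provided by local[1]. A minimal sketch of lowering it when building the session:

// Hypothetical tuning sketch: shrink shuffle parallelism for a tiny local job.
// With the default of 200, each micro-batch of the join runs 200 near-empty
// tasks, which is pure scheduling overhead for 6 rows on local[1].
val spark: SparkSession = SparkSession
  .builder
  .master("local[1]")
  .config("spark.sql.shuffle.partitions", "1") // down from the default 200
  .getOrCreate()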

I would suggest that you don't have enough data yet to study performance. Why not scale the data up to 500,000 rows and see whether you have a problem? Right now my concern is that you aren't running enough data to exercise the system meaningfully, and the startup costs aren't being amortized over the data volume.
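A minimal sketch of generating such a test file with the schema used in the question (the output path and the filler formulas for value1 and another are illustrative assumptions):

import org.apache.spark.sql.SparkSession

// Hypothetical generator: write ~500,000 rows matching the (id, value1, another) schema.
val spark = SparkSession.builder.master("local[*]").getOrCreate()
import spark.implicits._

spark.range(1, 500001)
  .select(
    $"id",
    ($"id" % 3 + 1).as("value1"),            // arbitrary filler values
    ($"id" % 7).cast("double").as("another")
  )
  .write
  .option("header", "true")
  .csv("/tmp/generator-test")                // assumed output path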

What improved performance significantly? Using spark.read instead of spark.readStream, and keeping the DataFrame persisted in memory:

import org.apache.spark.storage.StorageLevel

val dataFrameToBeReturned = spark.read
  .format("csv")
  .schema(schema)
  .option("delimiter", ";")
  .option("maxFilesPerTrigger", 1) // a streaming option; ignored by a batch read
  .csv("hdfs://" + hdfsLocation + homeZeppelinPrefix + "/generator/" + shortPath)
  .persist(StorageLevel.MEMORY_ONLY_SER)
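With the CSV side read as a static DataFrame, the join against the rate stream becomes a stream-static join, which keeps no join state for the static side and needs no watermark on it. A sketch of how the query could be rewired under that assumption, reusing the identifiers from the question:

import org.apache.spark.sql.functions.expr

// Hypothetical rewiring: stream-static join between the rate stream and the cached CSV data.
rate.as("counter")
  .join(dataFrameToBeReturned.as("csv"), expr("csv.id == counter.value"))
  .writeStream
  .trigger(Trigger.ProcessingTime(10))
  .format("console")
  .option("truncate", "false")
  .start()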
