我有一个用例,其中有关传感器的事件信息连续插入MySQL。我们需要每1或2分钟在Kafka主题中进行一些处理。
我正在使用Spark将此信息发送到Kafka主题并在Phoenix表中维护CDC。
我目前面临的问题是消息订购,我需要在上升时间戳中发送这些消息,以结束系统KAFKA主题(其中有1个分区)。但是,由于超过1个SPARK DATAFRAME分区,大多数消息顺序丢失了,同时将信息发送到Kafka主题。
目前作为解决方法,我正在1点重新分配我的数据框架,以维护消息订购,但这不是一个长期解决方案,因为我正在丢失Spark分布式计算。
如果你们围绕此问题有更好的解决方案设计,请建议。
我能够按照升级时间戳实现消息顺序,以通过赔偿我的数据来扩展我的数据,并在分区中应用排序。
val pairJdbcDF = jdbcTable.map(row => ((row.getInt(0), row.getString(4)), s"${row.getInt(0)},${row.getString(1)},${row.getLong(2)},${row. /*getDecimal*/ getString(3)},${row.getString(4)}"))
.toDF("Asset", "Message")
val repartitionedDF = pairJdbcDF.repartition(getPartitionCount, $"Asset")
.select($"Message")
.select(expr("(split(Message, ','))[0]").cast("Int").as("Col1"),
expr("(split(Message, ','))[1]").cast("String").as("TS"),
expr("(split(Message, ','))[2]").cast("Long").as("Col3"),
expr("(split(Message, ','))[3]").cast("String").as("Col4"),
expr("(split(Message, ','))[4]").cast("String").as("Value"))
.sortWithinPartitions($"TS", $"Value")