如何从Apache Spark到Kafka主题发送消息



我有一个用例,其中有关传感器的事件信息连续插入MySQL。我们需要每1或2分钟在Kafka主题中进行一些处理。

我正在使用Spark将此信息发送到Kafka主题并在Phoenix表中维护CDC。

我目前面临的问题是消息订购,我需要在上升时间戳中发送这些消息,以结束系统KAFKA主题(其中有1个分区)。但是,由于超过1个SPARK DATAFRAME分区,大多数消息顺序丢失了,同时将信息发送到Kafka主题。

目前作为解决方法,我正在1点重新分配我的数据框架,以维护消息订购,但这不是一个长期解决方案,因为我正在丢失Spark分布式计算。

如果你们围绕此问题有更好的解决方案设计,请建议。

我能够按照升级时间戳实现消息顺序,以通过赔偿我的数据来扩展我的数据,并在分区中应用排序。

val pairJdbcDF = jdbcTable.map(row => ((row.getInt(0), row.getString(4)), s"${row.getInt(0)},${row.getString(1)},${row.getLong(2)},${row. /*getDecimal*/ getString(3)},${row.getString(4)}"))
        .toDF("Asset", "Message")
val repartitionedDF = pairJdbcDF.repartition(getPartitionCount, $"Asset")
        .select($"Message")
        .select(expr("(split(Message, ','))[0]").cast("Int").as("Col1"),
          expr("(split(Message, ','))[1]").cast("String").as("TS"),
          expr("(split(Message, ','))[2]").cast("Long").as("Col3"),
          expr("(split(Message, ','))[3]").cast("String").as("Col4"),
          expr("(split(Message, ','))[4]").cast("String").as("Value"))
        .sortWithinPartitions($"TS", $"Value")

相关内容

  • 没有找到相关文章

最新更新