将消息从一个Kafka群集到另一个Kafka集群



我当前正在尝试轻松地将消息从一个kafka群集上的一个主题流到另一个主题到另一个主题(远程 -> local cluster)。
这个想法是立即使用kafka-streams,以便我们不需要在本地群集上复制实际消息,而只需要将Kafka-streams处理的"结果"加入我们的Kafka-Topics。

因此,假设WordCount演示是在我自己的一台PC上的一台Kafka-Insance上。我在本地机器上也有一个kafka-nastance。
现在,我想让WordCount演示在主题("远程")上运行,其中包含句子应计算的句子。
但是,计数应写入我本地系统上的主题,而不是"远程"主题。

kafka-streams api可以使用类似的东西吗?
例如。

val builder: KStreamBuilder = new KStreamBuilder(remote-streamConfig, local-streamconfig)
val textLines: KStream[String, String] = builder.stream("remote-input-topic", 
remote-streamConfig)
val wordCounts: KTable[String, Long] = textLines
    .flatMapValues(textLine => textLine.toLowerCase.split("\W+").toIterable.asJava)
    .groupBy((_, word) => word)
    .count("word-counts")
wordCounts.to(stringSerde, longSerde, "local-output-topic", local-streamconfig)
val streams: KafkaStreams = new KafkaStreams(builder)
streams.start()

非常感谢
-Tim

kafka流仅用于单个群集。

解决方法是使用foreach()或类似的方法,并实例化您自己的KafkaProducer写入目标群集。请注意,您自己的生产者必须使用Sync写入!否则,如果失败,您可能会丢失数据。因此,这不是一个非常表现的解决方案。

最好只写入源群集并将数据复制到目标群集。请注意,您很可能会在源群集中使用输出主题的保留时间更短,因为实际数据还是在目标群集中存储的保留时间较长。这允许,您限制了源群集上所需的存储。

edit(@quickinsights从下面回复以下评论)

如果您的kafka流服务比保留更长的时间

怎么办

这似乎是一个正交的问题,可以为任何设计提出。保留时间应根据您的最大停机时间设置,以避免一般数据丢失。请注意,由于应用程序从/从/到源群集读取/写入,并且源群集输出主题可能是较小的保留时间进行配置,因此如果应用程序降低,则不会发生任何不好的情况。输入主题不会处理,也不会产生新的输出数据。您可能只担心将复制管道降低到目标群集的情况下 - 您应该相应地将输出主题的保留时间设置为确保您不丢失任何数据。

它也使您的写作回到kafka。

是。它还增加了磁盘上的存储足迹。在应用程序弹性和运行时性能与群集负载之间,(一如既往)的权衡(一如既往)。你的选择。我个人建议您采用上面指出的更具弹性的选项。比处理应用程序代码中的所有弹性边缘案例,比例扩大kafka群集要容易。

似乎超级效率低

这是个人判断。这是一个权衡,没有客观的对与错。

相关内容

  • 没有找到相关文章

最新更新