我正在使用 Flink 来读取和写入来自不同 Kafka 主题的数据。 具体来说,我使用的是FlinkKafkaConsumer和FlinkKafkaProducer。
我想知道是否有可能根据程序中的逻辑或记录本身的内容将我正在读取和写入的 Kafka 主题更改为"动态"。
例如,如果读取了具有新字段的记录,我想创建一个新主题并开始将该字段的记录转移到新主题。
谢谢。
如果你的主题遵循通用命名模式,例如,"topic-n*",你的 Flink Kafka 使用者可以自动读取 "topic-n1"、"topic-n2", ...等等,因为它们被添加到卡夫卡中。
Flink 1.5 (FlinkKafkaConsumer09( 添加了对基于正则表达式的动态分区发现和主题发现的支持。这意味着 Flink-Kafka 使用者可以获取新的 Kafka 分区,而无需重新启动作业,同时保持恰好一次的保证。
接受订阅模式的使用者构造函数:链接。
更多地考虑需求,
第一步是 - 您将从一个主题开始(为简单起见(,并根据提供的数据在运行时生成更多主题,并将相应的消息定向到这些主题。这是完全可能的,不会是一个复杂的代码。使用 ZkClient API 检查主题名称是否存在,如果不存在,则创建一个具有新名称的模型主题,并开始通过与此新主题绑定的新生产者将消息推送到其中。您无需重新启动作业即可生成指向特定主题的消息。
您的初始消费者成为生产者(针对新主题(+消费者(旧主题(
第二步是 - 您想为新主题使用消息。一种方法可能是完全催生一份新工作。您可以通过最初创建线程池并为其提供参数来执行此操作。
再次要更加小心,在出现循环错误的情况下,更多的自动化可能会导致集群过载。考虑一下,如果输入数据不受控制或只是脏污,一段时间后创建过多主题的可能性。如上文评论中所述,可能会有更好的架构方法。