通过Flink动态地在Kafka中写入多个主题异常处理



我目前正在从单个kafka主题读取数据,并根据数据本身写入动态主题。我已经实现了以下代码(这是基于数据的accountId动态选择主题),它工作得很好:

class KeyedEnrichableEventSerializationSchema(schemaRegistryUrl: String)
extends KafkaSerializationSchema[KeyedEnrichableEvent]
with KafkaContextAware[KeyedEnrichableEvent] {
private val enrichableEventClass = classOf[EnrichableEvent]
private val enrichableEventSerialization: AvroSerializationSchema[EnrichableEvent] =
ConfluentRegistryAvroSerializationSchema.forSpecific(enrichableEventClass, enrichableEventClass.getCanonicalName, schemaRegistryUrl)
override def serialize(element: KeyedEnrichableEvent, timestamp: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] =
new ProducerRecord("trackingevents."+element.value.getEventMetadata.getAccountId, element.key, enrichableEventSerialization.serialize(element.value))
override def getTargetTopic(element: KeyedEnrichableEvent): String = "trackingevents."+element.value.getEventMetadata.getAccountId

问题/关注的是,如果主题不存在,我在JobManager UI中有一个关于主题不存在的异常,整个处理被停止,直到我创建主题。也许这是推荐的行为,但有没有其他选择,比如我们可以把数据放在不同的主题中,然后继续处理,而不是停止整个处理。或者至少在flink中有任何可用的通知机制,可以立即通知处理停止。

我认为在这种情况下,最简单的解决方案是在Kafka中启用自动创建主题,这样问题就彻底解决了。

然而,如果出于某种原因,这在你的情况下是不可能做到的,我认为最简单的解决方案是创建一个ProcessFunction,它将使用KafkaConsumerAdminClient保持与Kafka的连接,并定期检查将用于给定消息的主题是否存在。然后,您可以将数据推送到sideOutput或尝试创建缺少的主题。

相关内容

  • 没有找到相关文章

最新更新