我是Flink的新手,我需要从Kafka中读取数据,通过使用一些API有条件地丰富这些数据(如果记录属于X类(并写入S3。
我用上面的逻辑制作了一个helloworldFlink应用程序,它的工作原理很有魅力。
但是,我用来丰富的API没有100%的正常运行时间SLA,所以我需要设计一些具有重试逻辑的东西。
以下是我发现的选项,
选项1(进行指数重试,直到我从API得到响应,但这会阻塞队列,所以我不喜欢这个
选项2(如果API关闭,则再使用一个主题(称为主题失败(并将其发布到主题失败。这样它就不会阻塞实际的主队列。我还需要一个工作人员来处理队列主题失败的数据。同样,如果API长时间关闭,则必须将此队列用作循环队列。例如,从队列主题失败中读取消息,如果消息未能推送到称为主题失败的同一队列,则尝试丰富消息,并使用队列主题失败的下一条消息。
我更喜欢选项2,但完成这项任务似乎并不容易。是否有任何标准的Flink方法可用于实施选项2?
这是从微服务迁移时发生的一个相当常见的问题。正确的解决方案是将查找数据也保存在Kafka或一些数据库中,这些数据库可以作为附加源集成在同一个Flink应用程序中。
如果您无法做到这一点(例如,API是外部的,或者数据无法轻松映射到数据存储(,那么这两种方法都是可行的,并且它们具有不同的优势。
1( 将允许您保留输入事件的顺序。如果下游应用程序希望有序,则需要重试。
2( 常见的术语是死信队列(尽管更常用于无效记录(。在Flink中有两种简单的集成方法,要么有一个单独的源代码,要么使用一个源代码的主题模式/列表。
您的拓扑结构如下所示:
Kafka Source - Async IO /-> Filter good -> S3 sink
+-> Union -> with timeout -+
Kafka Source dead -/ (for API call!) -> Filter bad -> Kafka sink dead