我们有一个Flink作业,它具有以下拓扑结构:
source -> filter -> map -> sink
我们在接收器操作员open
-覆盖功能处设置了活动(就绪(状态。在我们获得该状态后,我们发送事件。有时它无法消耗提前发送的事件。
我们想知道我们可以发送不会丢失的数据的确切时间/步骤。
看起来您希望确保不会遗漏任何消息进行处理。Kafka会保留您的消息,因此没有必要只在Flink消费者准备好时才发送消息。您可以通过避免状态消息来简化设计。
任何Kafka使用者(不仅仅是Flink连接器(都会在Kafka服务器中有一个与之相关的偏移量,以跟踪最后一条被消费的消息的id。
来自卡夫卡文档:
Kafka为分区中的每个记录保留一个数字偏移量。这offset充当该分区内的记录的唯一标识符,并且还表示消费者在分区中的位置。对于例如,位于位置5的消费者消费了具有偏移0到4,并且接下来将接收具有偏移5 的记录
在Flink Kafka连接器中,将偏移量指定为提交的偏移量。
OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST)
这将确保如果重新启动Flink连接器,它将从重新启动前停止的最后一个位置消耗。
如果由于某种原因,偏移量丢失,这将从卡夫卡主题的开头(最早的消息(开始读取。请注意,这种方法将导致您重新处理消息。
你可以探索更多的抵消策略来选择适合你的。
请参阅-https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#starting-偏移