写UPDATE_BEFORE消息来更新卡夫卡



我在https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/upsert-kafka/.

上面写着:

作为接收器,upstart kafka连接器可以使用变更日志流。它将写入INSERT/UPDATE_AFTER数据作为正常的Kafka消息值,并将DELETE数据写入具有空值的Kafka消息(指示键的tombstone(。

它没有提到,如果UPDATE_BEFORE消息被写入upstart kafka,那么会发生什么?

在同一链接中(https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/upsert-kafka/#full-示例(,文档提供了一个完整的示例:

INSERT INTO pageviews_per_region
SELECT
user_region,
COUNT(*),
COUNT(DISTINCT user_id)
FROM pageviews
GROUP BY user_region;

通过上面的INSERT/SELECT操作,将生成INSERT/UPDATE_BEFORE/UPDATE_AFTER消息,并将其发送到upsert-kafka接收器,我想问当upsert-kafka遇到UPDATE_BEFORE消息时会发生什么。

源代码上的注释

/ /   partial code
// During the Upsert mode during the serialization process, if the operation is executed is Rowkind.delete or Rowkind.Update_before                 
// set it to NULL (corresponding to Kafka tomb news)

https://cwiki.apache.org/confluence/plugins/servlet/mobile?contentId=165221669#content/view/165221669

Upsert-kafka接收器不需要planner发送UPDATE_BEFORE消息(在某些情况下,planner仍然可以发送UPDATE_BEFORE消息(,并且会将INSERT/UPDATE_AFTER消息作为具有关键部分的普通kafka记录写入,并将DELETE消息作为具有空值的kafka记录写入(指示密钥的逻辑删除(。Flink将通过主键列值上的分区数据来保证主键上的消息顺序。

Upsert-kafka源是一种变更日志源。变更日志源上的主键语义意味着物化变更日志(INSERT/UPDATE_BEFORE/UPDATE_AFTER/DELETE(在主键约束上是唯一的。Flink假定主键上的所有消息都是按顺序排列的。

实施详细信息由于upstart kafka连接器只生成不包含UPDATE_BEFORE消息的upstart流。但是,有些操作需要UPDATE_BEFORE消息才能正确处理,例如聚合。因此,我们需要有一个物理节点来实现upstart流,并生成带有完整更改消息的变更日志流。在物理操作符中,我们将使用state来知道密钥是否是第一次被看到。操作员将生成INSERT行,或者为上一个图像额外生成UPDATE_BEFORE行,或者生成DELETE行,其中所有列都填充了值。

相关内容

  • 没有找到相关文章

最新更新