如何使用debezium更改数据捕获捕获在mysql中捕获数据,并在kafka connect中使用jdbc sink?



我遇到使用debezium更改数据捕获在mysql中捕获数据并使用kafka connect jdbc sink将其消耗到另一个mysql的问题。

因为 debezium 为 kafka 主题生成的模式和有效负载与 kafka connect jdbc sink 期望的模式不兼容。

当 jdbc sink 想要在另一个 mysql 中使用数据并创建记录时,我遇到异常。

我应该如何解决这个问题?

Debezium 生成的消息结构确实与 JDBC 接收器预期的不同。JDBC 接收器期望消息中的每个字段对应于行中的一个字段,因此消息对应于行的"之后"状态。OTOH,Debezium MySQL连接器执行变更数据捕获,这意味着它不仅仅是包括行的最新状态。具体而言,连接器输出的邮件中,该键包含行的主键或唯一键列,以及包含信封结构的邮件值,其中:

  • 操作,例如是插入、更新还是删除
  • 发生更改之前行的状态(插入时为 null)
  • 发生更改行的状态(删除时为 null)
  • 特定于源的信息,包括服务器元数据、事务 ID、数据库和表名称、事件发生时的服务器时间戳以及有关发现事件的位置的详细信息等。
  • 连接器生成事件的时间戳

解决这种差异的最简单方法是使用 Kafka 0.10.2.x(当前最新版本是 0.10.2.1)和 Kafka Connect 的新单消息转换 (SMT)。每个 Kafka Connect 连接器都可以配置零个或多个 SMT 链,这些 SMT 可以在将消息写入 Kafka 之前转换源连接器的输出,或者在将消息作为输入传递到接收器连接器之前转换从 Kafka 读取的消息。SMT 故意非常简单,处理单个消息,绝对不应该访问外部资源或维护任何状态,因此不能替代 Kafka Streams 或其他流处理系统,这些系统功能更强大,可以加入多个输入流,并且可以执行非常复杂的操作并跨多个消息维护状态。

如果您使用 Kafka Streams进行任何类型的处理,那么您应该考虑在 Kafka Streams 应用程序中操作消息结构。如果没有,那么 SMT 是解决问题的好方法。实际上,有两种方法可以使用SMT来调整消息结构。

第一个选项是使用带有 Debezium 连接器的 SMT 来提取/保留行的"之后"状态,并在将行写入 Kafka 之前丢弃所有其他信息。当然,你会在 Kafka 主题中存储更少的信息,并丢弃一些将来可能有价值的 CDC 信息。

第二个也是 IMO 首选选项是将源连接器保留原样,并将所有 CDC 消息保留在 Kafka 主题中,但随后使用带有接收器连接器的 SMT 来提取/保留行的"之后"状态,并在将消息传递到 JDBC 接收器连接器之前放弃所有其他信息。您可以使用 Kafka Connect 中包含的现有 SMT 之一,但您可以考虑编写自己的 SMT 来执行您想要的操作。

最新更新