我正在尝试使用kafka-connect从两个表中获取行。我以这种方式配置了connect-file-source.properties
name=jdbc_source_postgres_foobar_01
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
key.converter=org.apache.kafka.connect.json.JsonConverter
#key.converter.schema.registry.url=http://localhost:8081
value.converter=org.apache.kafka.connect.json.JsonConverter
#value.converter.schema.registry.url=http://localhost:8081
connection.url=jdbc:postgresql://localhost:5432/store?user=postgres&password=root
table.whitelist=author,book
mode=incrementing
incrementing.column.name=id
validate.non.null=false
topics=author,book
topic.prefix=
涉及的表有author
和book
,后者有一个外键指向author
。
然后我注册了一个监听器来消费来自";作者"以及";书;主题,以便将它们插入到另一个数据库中。
@KafkaListener(
topics={"author","book"},
groupId = "foo",
containerFactory = "fooKafkaListenerContainerFactory"
)
public void listenGroupFoo(@Payload PostgresTableRow message) {
System.out.println("Received" + message);
String tableName = message.tableName();
HashMap<String, Object> params = message.params();
insert(tableName, params);
}
当涉及的表彼此之间没有约束时,这一点很好,但在这种情况下,当来自";书;主题在来自"的消息之前被消费;作者";。
例如,我在源数据库中插入作者";George Orwell";用CCD_ 5和书";1984";对于id=37
和authorId=23
,两个消息被推送到Kafka中,一个在";作者"主题和";书;话题如果消息首先从";书;主题,然后从";作者"主题,我得到了一个错误,无法在我的水槽数据库中插入id为37的书,因为不存在id为23的作者。
那么我该如何解决这个问题呢?有没有一种方法可以将多个表推送到一个主题中并授予订单?
在Kafka位于中间的CDC(变更数据捕获(世界中,您面临着一个需要解决的复杂问题。
您希望实现从数据库到Kafka以及从Kafka到另一个数据库的事务一致、有序、一次复制,这样就不会出现您面临的引用完整性问题,即:由于竞争条件。
我建议阅读Robin Moffatt关于CDC和Kafka Connect JDBC连接器的文章,以及Shawn Robertson在Kafka Summit 18上关于这个问题的演讲。
- 不再有思洛存储器:如何将数据库与Apache Kafka和CDC集成
- Kafka Connect Deep Dive–JDBC源连接器
- 事务一致、有序、一次复制从数据库到云中的Kafka并返回--一种利用Kafka提供端到端ACID事务的解决方案
不幸的是,如果没有现成的端到端CDC解决方案,我想你要么需要非常有创意,要么投入大量精力来克服这个问题。