KafkaConnect:JDBC源连接器:创建具有多个分区的Topic



我已经创建了一个从MySQL轮询数据的示例管道,并写入HDFS(以及配置单元表)。

由于我的要求,我需要为每个数据库表创建Source+Connector对。以下是我发布的源和接收器连接器的配置设置。

我可以看到一个主题是用一个分区创建的,复制因子为1。

主题创建应该是自动的,这意味着我不能在创建源+接收器对之前手动创建主题。

我的问题:

1) 在创建源连接器时,是否有配置分区数量和复制因子的方法?

2) 如果可以创建多个分区,那么源连接器使用什么样的分区策略?

3) 应为源和接收器连接器创建正确数量的工作人员?

源连接器:

{
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"mode": "timestamp+incrementing",
"timestamp.column.name": "modified",
"incrementing.column.name": "id",
"topic.prefix": "jdbc_var_cols-",
"tasks.max": "1",
"poll.interval.ms": "1000",
"query": "SELECT id,name,email,department,modified FROM test",
"connection.url": "jdbc:mariadb://127.0.0.1:3306/connect_test?user=root&password=confluent"
}

接收器连接器:

{
"connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
"topics.dir": "/user/datalake/topics-hive-var_cols3",
"hadoop.conf.dir": "/tmp/quickstart/hadoop/conf",
"flush.size": "5",
"schema.compatibility": "BACKWARD",
"connect.hdfs.principal": "datalake@MYREALM.LOCAL",
"connect.hdfs.keytab": "/tmp/quickstart/datalake.keytab",
"tasks.max": "3",
"topics": "jdbc_var_cols-",
"hdfs.url": "hdfs://mycluster:8020",
"hive.database": "kafka_connect_db_var_cols3",
"hdfs.authentication.kerberos": "true",
"rotate.interval.ms": "1000",
"hive.metastore.uris": "thrift://hive_server:9083",
"hadoop.home": "/tmp/quickstart/hadoop",
"logs.dir": "/logs",
"format.class": "io.confluent.connect.hdfs.avro.AvroFormat",
"hive.integration": "true",
"hdfs.namenode.principal": "nn/_HOST@MYREALM.LOCAL",
"hive.conf.dir": "/tmp/quickstart/hadoop/conf"
}

1)在创建源连接器时,是否有方法配置分区数量和复制因子?

不是来自Connect,不是

听起来您在broker上启用了自动主题创建,所以它使用默认设置。理想情况下,这应该在生产环境中禁用,因此您必须提前创建主题。

源连接器使用什么样的分区策略?

取决于哪个连接器以及代码的编写方式(即是否/如何生成Record的密钥)。比方说,对于JDBC连接器,密钥可能是数据库表的主键。它将使用DefaultPartitioner进行散列。我认为Connect不允许您在每个连接器级别指定自定义分区器。如果键为null,那么消息将分布在所有分区上。

3)应为源和接收器连接器创建正确数量的工作程序?

同样,取决于来源。对于JDBC,每个表将有一个任务。

不过,对于接收器,任务最多只能达到要下沉的主题的分区数(与所有使用者组一样)。


此外,您通常会将Connect集群与数据库(和Hadoop集群)分开运行

最新更新