我正试图在mysql中使用模式时间戳,但当我这样做时,它不会在我的kafka队列中创建任何主题,也没有错误日志。
以下是我正在使用的连接器属性,
{
"name": "jdbc_source_mysql_reqistrations_local",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://localhost:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081",
"tasks.max": "5",
"connection.url": "jdbc:mysql://localhost:3306/prokafka?zeroDateTimeBehavior=ROUND&user=kotesh&password=kotesh",
"poll.interval.ms":"100000000",
"query": "SELECT Language, matriid, DateUpdated from usersdata.user",
"mode": "timestamp",
"timestamp.column.name": "DateUpdated",
"validate.non.null": "false",
"batch.max.rows":"10",
"topic.prefix": "mysql-local-"
}
}
启动:
./bin/confluent load jdbc_source_mysql_registration_local -d /home/prokafka/config-json/kafka-connect-jdbc-local-mysql.json
{
"name": "jdbc_source_mysql_reqistrations_local",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://localhost:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081",
"tasks.max": "5",
"connection.url": "jdbc:mysql://localhost:3306/prokafka?zeroDateTimeBehavior=ROUND&user=kotesh&password=kotesh",
"poll.interval.ms": "100000000",
"query": "SELECT Language, matriid, DateUpdated from usersdata.users",
"mode": "timestamp",
"timestamp.column.name": "DateUpdated",
"validate.non.null": "false",
"batch.max.rows": "10",
"topic.prefix": "mysql-local-",
"name": "jdbc_source_mysql_reqistrations_local"
},
"tasks": [
{
"connector": "jdbc_source_mysql_reqistrations_local",
"task": 0
}
],
"type": null
}
SQLException:Java堆空间
似乎您加载的数据太多,Connect无法处理,必须增加堆大小
例如,将其增加到6GB(或更多(
我还没有尝试过使用Confluent CLI来实现这一点,但根据代码,这可能适用于
confluent stop connect
export CONNECT_KAFKA_HEAP_OPTS="-Xmx6g"
confluent start connect
如果你在这台机器上的内存有限,那么从Mysql数据库、Kafka代理、Zookeeper、Schema Registry等中单独运行Connect。