Kafka Connect JDBC接收器连接器给出WorkerLinkTask错误



我使用的Jdbc接收器连接器具有以下配置:

curl -s -X POST http://********:8083/connectors -H "Content-Type: application/json" --data '{
"name":"mysql-sensor-sink-connector-02",
"config": {
"tasks.max":"2",
"batch.size":"1000",
"batch.max.rows":"1000",
"poll.interval.ms":"500",
"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url":"jdbc:mysql://**********:3306/device_details",    
"table.name.format":"tb_sensordata",
"topics":"sensor_data_topic",
"connection.user":"*******",
"connection.password":"********",
"insert.mode":"insert",
"auto.create":"true",
"auto.evolve":"true",
"pk.mode":"record_value",
"pk.fields":"packetid",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",  
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable":"false"
}
}' | jq '.'

我得到的错误是:

[2020-04-18 09:42:16,444] INFO ProducerConfig values:
acks = all
batch.size = 16384
bootstrap.servers = [cpnode.local.lan:9092]
buffer.memory = 33554432
client.dns.lookup = default
client.id = confluent.monitoring.interceptor.connector-consumer-mysql-sensor-sink-connector-03-1
compression.type = lz4
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = false
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
linger.ms = 500
max.block.ms = 60000
max.in.flight.requests.per.connection = 1
max.request.size = 10485760
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 10
retry.backoff.ms = 500
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
(org.apache.kafka.clients.producer.ProducerConfig:347)
[2020-04-18 09:42:16,457] INFO Kafka version: 5.4.0-ce (org.apache.kafka.common.utils.AppInfoParser:117)
[2020-04-18 09:42:16,457] INFO Kafka commitId: ca78a82127cbef3a (org.apache.kafka.common.utils.AppInfoParser:118)
[2020-04-18 09:42:16,458] INFO Kafka startTimeMs: 1587183136457 (org.apache.kafka.common.utils.AppInfoParser:119)
[2020-04-18 09:42:16,458] INFO interceptor=confluent.monitoring.interceptor.connector-consumer-mysql-sensor-sink-connector-03-1 created for client_id=connector-consumer-mysql-sensor-sink-connector-03-1 client_type=CONSUMER session= cluster=dy7maqKlQBysl4HTbJuIEQ group=connect-mysql-sensor-sink-connector-03 (io.confluent.monitoring.clients.interceptor.MonitoringInterceptor:153)
[2020-04-18 09:42:16,470] INFO Attempting to open connection #1 to MySql (io.confluent.connect.jdbc.util.CachedConnectionProvider:87)
[2020-04-18 09:42:16,602] INFO JdbcDbWriter Connected (io.confluent.connect.jdbc.sink.JdbcDbWriter:49)
[2020-04-18 09:42:16,765] ERROR WorkerSinkTask{id=mysql-sensor-sink-connector-03-1} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask:559)
java.lang.NullPointerException
at io.confluent.connect.jdbc.sink.BufferedRecords.flush(BufferedRecords.java:174)
at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:72)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:74)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[2020-04-18 09:42:16,766] ERROR WorkerSinkTask{id=mysql-sensor-sink-connector-03-1} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:561)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
at io.confluent.connect.jdbc.sink.BufferedRecords.flush(BufferedRecords.java:174)
at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:72)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:74)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)
... 10 more
[2020-04-18 09:42:16,766] ERROR WorkerSinkTask{id=mysql-sensor-sink-connector-03-1} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:180)
[2020-04-18 09:42:16,766] INFO Stopping task (io.confluent.connect.jdbc.sink.JdbcSinkTask:105)
[2020-04-18 09:42:16,767] INFO Closing connection #1 to MySql (io.confluent.connect.jdbc.util.CachedConnectionProvider:113)

主题中的数据采用JSON格式,没有架构。

当运行控制台消费者时,我能够看到如下数据:

kafka-console-consumer --bootstrap-server *********:9092 --topic sensor_data_topic --from-beginning | jq '.'
{
"packetid": 1501,
"macaddress": "F8-27-B9-C7-AB-99",
"readingtypeid": "temperature",
"readingvalue": 70.1,
"setpoint": 77.2,
"threshold": 7,
"readingtime": "2019-05-20 02:46:30",
"logtime": "2019-09-01 19:32:06"
}
{
"packetid": 1502,
"macaddress": "2B-6A-C1-45-86-ED",
"readingtypeid": "temperature",
"readingvalue": 65.21,
"setpoint": 77.06,
"threshold": 7,
"readingtime": "2019-05-17 03:39:18",
"logtime": "2020-04-05 06:37:45"
}

mysql表如下:

+-----------------+-------------+------+-----+-------------------+-------------------+
| Field           | Type        | Null | Key | Default           | Extra             |
+-----------------+-------------+------+-----+-------------------+-------------------+
| packetid        | bigint      | NO   | PRI | NULL              |                   |
| macaddress      | varchar(20) | NO   | MUL | NULL              |                   |
| readingtypeid   | bigint      | NO   | MUL | NULL              |                   |
| readingvalue    | float       | YES  |     | NULL              |                   |
| setpoint        | float       | YES  |     | NULL              |                   |
| threshold       | float       | YES  |     | NULL              |                   |
| lastupdatedtime | timestamp   | NO   |     | NULL              |                   |
| logtime         | timestamp   | YES  |     | CURRENT_TIMESTAMP | DEFAULT_GENERATED |
+-----------------+-------------+------+-----+-------------------+-------------------+

有人能帮我把数据放到MySql中吗。提前谢谢。

PS:我使用的版本是:汇流平台5.4.0

如果要使用接收器连接器,则必须定义模式。这可以使用Avro和Schema Registry或JSON和模式来实现。


如果您想坚持使用JsonConverter,只需确保启用了模式:

"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true",  
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "true"

相关内容

  • 没有找到相关文章

最新更新