这真的很奇怪。 在我的数据库中,当我执行此 SQL 时:
select count(*) from mySchema.myTable where some_col = '2'
其结果是 :26000000
当连接器配置中的查询设置为此并运行连接器时:
QUERY="select * from mySchema.myTable where some_col = '2' order by primary_key, sec_key limit 26000000"
连接器工作没有问题,我能够使用所有消息。
但是,当连接器配置中的 QUERY 设置为此并运行连接器时:
QUERY="select * from mySchema.myTable where some_col = '2' order by primary_key, sec_key"
连接器给了我这个例外:
[2019-12-23 22:51:16,671] ERROR WorkerSourceTask{id=HIVE_JDBC_BATCH_SOURCE-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:293)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:229)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
at org.apache.kafka.connect.transforms.ValueToKey.applyWithSchema(ValueToKey.java:85)
at org.apache.kafka.connect.transforms.ValueToKey.apply(ValueToKey.java:65)
at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
... 11 more
以下是配置:
[2019-12-23 22:51:00,681] 信息连接器配置值:
config.action.reload = restart
connector.class = io.confluent.connect.jdbc.JdbcSourceConnector
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = class org.apache.kafka.connect.storage.StringConverter
name = HIVE_JDBC_BATCH_SOURCE
tasks.max = 8
transforms = [createKey, extractString]
value.converter = class org.apache.kafka.connect.json.JsonConverter
[2019-12-23 22:51:00,681] 信息扩充连接器配置值:
config.action.reload = restart
connector.class = io.confluent.connect.jdbc.JdbcSourceConnector
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = class org.apache.kafka.connect.storage.StringConverter
name = HIVE_JDBC_BATCH_SOURCE
tasks.max = 8
transforms = [createKey, extractString]
transforms.createKey.fields = [mySchema.primary_key]
transforms.createKey.type = class org.apache.kafka.connect.transforms.ValueToKey
transforms.extractString.field = mySchema.primary_key
transforms.extractString.type = class org.apache.kafka.connect.transforms.ExtractField$Key
value.converter = class org.apache.kafka.connect.json.JsonConverter
[2019-12-23 22:51:00,686] 信息字符串转换器配置值:
converter.encoding = UTF8
converter.type = key
[2019-12-23 22:51:00,686] 信息 Json转换器配置值:
converter.type = value
schemas.cache.size = 1000
schemas.enable = false
[2019-12-23 22:51:00,701] 信息生产者配置值:
acks = all
batch.size = 100000
bootstrap.servers = xxx
buffer.memory = 33554432
client.dns.lookup = default
client.id =
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 2147483647
enable.idempotence = false
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
linger.ms = 10
max.block.ms = 9223372036854775807
max.in.flight.requests.per.connection = 1
max.request.size = 10485760
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 310000
retries = 2147483647
retry.backoff.ms = 100
[2019-12-23 22:51:00,810] 信息 Jdbc源任务配置值:
batch.max.rows = 100
catalog.pattern = null
connection.attempts = 5
connection.backoff.ms = 60000
connection.password = null
connection.url = xxx
connection.user = null
db.timezone = UTC
dialect.name =
incrementing.column.name =
mode = bulk
numeric.mapping = null
numeric.precision.mapping = false
poll.interval.ms = 86400000
query = select * from mySchema.myTable where some_col = '2' order by primary_key, sec_key
quote.sql.identifiers = ALWAYS
schema.pattern = mySchema
table.blacklist = []
table.poll.interval.ms = 60000
table.types = [TABLE]
table.whitelist = []
tables = []
timestamp.column.name = []
timestamp.delay.interval.ms = 0
topic.prefix = my_topic
validate.non.null = false
数据库表中的示例数据:
primary_key 2C58131FF9680D5632CB1FDC27675490
sec_key 3EE
year_cd 1
year_month 201911
content_txt 2016-10-072016-10-12MEMOREX1234500172409430291.52
连接器生成的示例消息:
{"mySchema.primary_key": "2C58131FF9680D5632CB1FDC27675490", "mySchema.sec_key": "3EE", "mySchema.year_cd": "1", "mySchema.year_month": ">201911", "mySchema.content_txt": "2016-10-072016-10-12MEMOREX1234500172409430291.52"}
我能够找到错误(IMO,JDBC SourceConnector 中的错误(
当查询具有"LIMIT 子句"时,连接器生成的示例消息:
{"mySchema.primary_key": "2C58131FF9680D5632CB1FDC27675490", "mySchema.sec_key": "3EE", "mySchema.year_cd": "1", "mySchema.year_month": ">201911", "mySchema.content_txt": "2016-10-072016-10-12MEMOREX1234500172409430291.52"}
当查询没有"LIMIT 子句"时,连接器生成的示例消息:
{"primary_key": "2C58131FF9680D5632CB1FDC27675490", "sec_key": "3EE", "year_cd": "1", "year_month": ">201911", "content_txt": "2016-10-072016-10-12MEMOREX1234500172409430291.52"}
当设置是这样的: transforms.extractString.field=mySchema.primary_key 连接器将引发 NullPointerException,因此我将设置更改为: transforms.extractString.field=primary_key 它就像一个魅力。