在从有效负载的一部分中的一个字段中创建键时低于异常"value"



smt不使用以下配置,任何想法

"transforms": "createKey",
"transforms.createKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields":"column_id",
getting below exception
Exception:    org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
    at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:292)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:228)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
    Caused by: java.lang.NullPointerException
    at org.apache.kafka.connect.transforms.ValueToKey.applyWithSchema(ValueToKey.java:85)
    at org.apache.kafka.connect.transforms.ValueToKey.apply(ValueToKey.java:65)
    at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)

在kafka-connect中还添加了配置: key.converter.schemas.enable=true value.converter.schemas.enable=true,以启用类ValueTokey所需的模式

这可能迟到了,但对我来说,问题很敏感。我的列名是ID,在配置中是ID。因此,下面是对我有用的配置

transforms=createKey
transforms.createKey.fields=ID
transforms.createKey.type=org.apache.kafka.connect.transforms.ValueToKey

相关内容

  • 没有找到相关文章

最新更新