Kafka Connect 'ExtractField$Key' SMT 导致'Unknown Field'错误



我有一个Debezium连接器的设置(在ksqlDB-server上运行),它是从SQL Server CDC表流到Kafka主题的值。我试图转换我的消息从JSON到整数值的关键。我收到的示例键是这样的:{"InternalID":11117},我想把它表示成一个数字11117。根据Kafka Connect文档,使用ExtractField SMT应该相当容易。然而,当我配置我的连接器使用此转换时,我收到一个错误Caused by: java.lang.IllegalArgumentException: Unknown field: InternalID

连接器配置:

CREATE SOURCE CONNECTOR properties_sql_connector WITH (
'connector.class'= 'io.debezium.connector.sqlserver.SqlServerConnector', 
'database.hostname'= 'propertiessql', 
'database.port'= '1433', 
'database.user'= 'XXX', 
'database.password'= 'XXX', 
'database.dbname'= 'Properties', 
'database.server.name'= 'properties', 
'table.exclude.list'= 'dbo.__EFMigrationsHistory', 
'database.history.kafka.bootstrap.servers'= 'kafka:9091', 
'database.history.kafka.topic'= 'dbhistory.properties',
'key.converter.schemas.enable'= 'false',
'transforms'= 'unwrap,extractField',
'transforms.unwrap.type'= 'io.debezium.transforms.ExtractNewRecordState',
'transforms.unwrap.delete.handling.mode'= 'none',
'transforms.extractField.type'= 'org.apache.kafka.connect.transforms.ExtractField$Key',
'transforms.extractField.field'= 'InternalID',
'key.converter'= 'org.apache.kafka.connect.json.JsonConverter');

错误详细信息:

--------------------------------------------------------------------------------------------------------------------------------------
0       | FAILED | org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:223) 
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:149)
at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:355)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:258)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.IllegalArgumentException: Unknown field: InternalID
at org.apache.kafka.connect.transforms.ExtractField.apply(ExtractField.java:65)
at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:173)       
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:207) 
... 11 more

对于这个转换失败的原因有什么想法吗?我是不是漏掉了什么构型?当extractField变换被删除时,我的消息的键看起来像上面那样:{"InternalID":11117}

默认情况下,当您为任何连接器(包括Debezium)配置smt时,转换将应用于连接器发出的每个记录。这包括更改事件消息,这些消息可能没有检索到的数据,也可能没有必要的字段。要解决这个问题,您需要有选择地将您的SMT应用于Debezium使用SMT谓词生成的更改事件消息的特定子集。

官方文档在这里。

在您的特定情况下,您可以将SMT仅应用于特定数据库表的输出主题,它看起来像这样:

# Create a predicate that matches your output 
predicates: topicNameMatch
predicates.topicNameMatch.type: org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.topicNameMatch.pattern: *output topic name goes here*
# Your logic to extract the field from the key
transforms.extractField.type: org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extractField.field: InternalID
# This references the predicate above
transforms.extractField.predicate: topicNameMatch

如果主题名称匹配不适合您,那么在上面列出的文档中还有其他谓词。

为了从JSON中提取命名字段,您需要该转换器的schemas.enable = 'true'

对于任何不是来自Debezium的数据,这将需要JSON有一个模式作为事件的一部分。

或者,如果你正在使用Schema Registry,切换到使用它的其他转换器,它应该可以工作。

相关内容

最新更新