Kafka Connect SMT ApplyWithSchema需要结构错误



我已经部署了来自汇流的样本https://github.com/confluentinc/kafka-connect-insert-uuid用于添加简单的UUID字段,但我得到一个错误,它需要结构。我正在Debezium MySQLConnector 中应用此功能

Only Struct objects supported for [adding UUID to record], found: 
java.lang.Stringntat org.apache.kafka.connect.transforms.util.Requirements.requireStruct(Requirements.java:52)

什么是只按原样返回记录的极简主义applyWithSchema方法?我正在尝试调试,需要一个HelloWorld SMT,没有任何错误,必须应用包括applyWithSchema 在内的方法

我认为这可能是最简单的应用程序,但需要applyWithSchema

Override
public R apply(R record) {

return record.newRecord(
record.topic(), record.kafkaPartition(),
record.keySchema(), record.key(),
record.valueSchema(), record.value(),
record.timestamp()
);
}
Override
public R applyWithSchema(R record) {
// what is minimal transform here??
}

我现在只需要这些函数在没有错误的情况下运行,因为我只对record.headers((.add((进行了更改。

下面是给出错误的applyWithSchema方法:

private R applyWithSchema(R record) {
// FAILS HERE!
final Struct value = requireStruct(operatingValue(record), PURPOSE);
Schema updatedSchema = schemaUpdateCache.get(value.schema());
if(updatedSchema == null) {
updatedSchema = makeUpdatedSchema(value.schema()); 
final Struct updatedValue = new Struct(updatedSchema);
for (Field field : value.schema().fields()) {
// updatedValue.put(field.name(), value.get(field));
}
//updatedValue.put(fieldName, getRandomUuid());
return newRecord(record, updatedSchema, updatedValue);
}

大多数情况下,问题在于连接器转换配置顺序,连接器在定义时将按同步顺序应用这些SMT操作。如果您希望自定义SMT具有结构或本机信息,请确保以前的SMT不会改变自然状态。

"transforms":"SMT1, SMT2, CustomSMT3",

相关内容

  • 没有找到相关文章

最新更新