使用KafkaConnect JDBC发布记录错误来源:STRUCT类型无效:类org.apache.avro.gene



我想使用Kafka Connect JDBC源连接器(Postgres)向Kafka发布事件

我有一个发件箱表,我将有效载荷id和有效载荷存储为字节后,使用KafkaAvroSerializer序列化它们。

被序列化的对象是一个avro生成的SpecificRecord类,例如EmployeeCreatedEvent

发件箱表的数据类型:

payload bytea,
payload_id bytea
我已经为Kafka Connect转换器编写了一个自定义SMT。代码将数据、有效负载和payload_id反序列化为"GenericData"。记录">

但是我得到下面的错误:

Caused by: org.apache.kafka.connect.errors.DataException: Invalid type for STRUCT: class org.apache.avro.generic.GenericData$Record

我的环境:支流6.0.1中

配置:

key.converter=io.confluent.connect.avro.AvroConverter value.converter=io.confluent.connect.avro.AvroConverter

ConnectRecord值有两个元素:subject_id &;Subject和它们是byte[]。我想使用Key=payload_id value=payload

final byte[] subjectId = (byte[]) values.get("subject_id"); 
final byte[] retrievedPayload = (byte[]) values.get("subject"); 
I get the Exception: DataException: Invalid type for STRUCT: class [B

我正在从模式注册表中获取模式,并在创建新的ConnectRecord之前转换为connectSchema。

record.newRecord("mytopic", record.kafkaPartition(), derivedKeySchema, values.get("subject_id"), derivedValueSchema, values.get("subject"), record.timestamp());

我在开始时从模式注册表中检索模式,并在创建新的连接记录时使用它。

全栈跟踪:

org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:311)
at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:340)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:264)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Invalid type for STRUCT: class [B
at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:597)
at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:344)
at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:87)
at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:63)
at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$1(WorkerSourceTask.java:311)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)

谁能提供解决方案?也有可能将一个SpecificRecord对象转换为JSON?如果是这样,我可以将它们存储为json而不是字节在发件箱表中。

谢谢。

postgres中发件箱表的数据类型:

我假设这些列仅用于记录值?如果不是,则需要ID+bytea作为记录键。也可以是记录时间戳,还可以添加主题名称

老实说,我认为在拉出ID到一个数据库列没有价值,因为它已经是有效载荷的一部分,你可能可以提取它使用数据库查询,如果需要(我不确定字节函数在Postgres,但似乎是可以做的事情)

在任何情况下,您都需要使用ByteArrayConverter来访问二进制数据,然后使用Connect转换来获取JDBC连接器期望的Struct值中的数据

我已经为Kafka Connect转换器编写了一个自定义SMT。代码将数据、有效负载和payload_id反序列化为"GenericData"。记录">

这定义了connect记录的值,但connect会将其解释为字节数组除非你在transform

中也调用了AvroData.toConnectSchema

或者,这里有一些伪代码,用于使用原始字节

// class MyTransform<R extends ConnectRecord<R>> implements Transformation<R> {

@Override
public R apply(R r) {
final Object value = r.value();

byte[] valueAsBytes = (byte[]) value;
ByteBuffer b = ByteBuffer.wrap(valueAsBytes);
b.get();
int id = b.getInt();
byte[] payload = b.slice().toArray();
// TODO: define the payload to forward. 
// note: these are Connect API imports, not Avro types 
Schema valueSchema; // {int id; bytes payload} 
Struct updatedValue; 
return r.newRecord(topic, r.kafkaPartition(),
r.keySchema(), r.key(),
valueSchema, updatedValue,
r.timestamp());
}

如果您仍然得到一个错误,如" STRUCT: class的无效类型[B"那是因为ByteArrayConverter也在转换后以某种方式应用,你最好只是使用Kafka流作业,使用Bytes serde消费,操纵字节,生成实际的Avro(或您首选的JSON)有效负载,然后使用连接,因为你通常会没有任何转换

最新更新