我想使用Kafka Connect JDBC源连接器(Postgres)向Kafka发布事件
我有一个发件箱表,我将有效载荷id和有效载荷存储为字节后,使用KafkaAvroSerializer
序列化它们。
被序列化的对象是一个avro生成的SpecificRecord类,例如EmployeeCreatedEvent
发件箱表的数据类型:
payload bytea,
payload_id bytea
我已经为Kafka Connect转换器编写了一个自定义SMT。代码将数据、有效负载和payload_id反序列化为"GenericData"。记录">但是我得到下面的错误:
Caused by: org.apache.kafka.connect.errors.DataException: Invalid type for STRUCT: class org.apache.avro.generic.GenericData$Record
我的环境:支流6.0.1中
配置:
key.converter=io.confluent.connect.avro.AvroConverter value.converter=io.confluent.connect.avro.AvroConverter
ConnectRecord值有两个元素:subject_id &;Subject和它们是byte[]。我想使用Key=payload_id value=payload
final byte[] subjectId = (byte[]) values.get("subject_id");
final byte[] retrievedPayload = (byte[]) values.get("subject");
I get the Exception: DataException: Invalid type for STRUCT: class [B
我正在从模式注册表中获取模式,并在创建新的ConnectRecord之前转换为connectSchema。
record.newRecord("mytopic", record.kafkaPartition(), derivedKeySchema, values.get("subject_id"), derivedValueSchema, values.get("subject"), record.timestamp());
我在开始时从模式注册表中检索模式,并在创建新的连接记录时使用它。
全栈跟踪:
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:311)
at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:340)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:264)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Invalid type for STRUCT: class [B
at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:597)
at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:344)
at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:87)
at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:63)
at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$1(WorkerSourceTask.java:311)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
谁能提供解决方案?也有可能将一个SpecificRecord对象转换为JSON?如果是这样,我可以将它们存储为json而不是字节在发件箱表中。
谢谢。
postgres中发件箱表的数据类型:
我假设这些列仅用于记录值?如果不是,则需要ID+bytea作为记录键。也可以是记录时间戳,还可以添加主题名称
老实说,我认为在拉出ID到一个数据库列没有价值,因为它已经是有效载荷的一部分,你可能可以提取它使用数据库查询,如果需要(我不确定字节函数在Postgres,但似乎是可以做的事情)
在任何情况下,您都需要使用ByteArrayConverter来访问二进制数据,然后使用Connect转换来获取JDBC连接器期望的Struct值中的数据
我已经为Kafka Connect转换器编写了一个自定义SMT。代码将数据、有效负载和payload_id反序列化为"GenericData"。记录">
这定义了connect记录的值,但connect会将其解释为字节数组除非你在transform
中也调用了AvroData.toConnectSchema
或者,这里有一些伪代码,用于使用原始字节
// class MyTransform<R extends ConnectRecord<R>> implements Transformation<R> {
@Override
public R apply(R r) {
final Object value = r.value();
byte[] valueAsBytes = (byte[]) value;
ByteBuffer b = ByteBuffer.wrap(valueAsBytes);
b.get();
int id = b.getInt();
byte[] payload = b.slice().toArray();
// TODO: define the payload to forward.
// note: these are Connect API imports, not Avro types
Schema valueSchema; // {int id; bytes payload}
Struct updatedValue;
return r.newRecord(topic, r.kafkaPartition(),
r.keySchema(), r.key(),
valueSchema, updatedValue,
r.timestamp());
}
如果您仍然得到一个错误,如" STRUCT: class的无效类型[B"那是因为ByteArrayConverter也在转换后以某种方式应用,你最好只是使用Kafka流作业,使用Bytes serde消费,操纵字节,生成实际的Avro(或您首选的JSON)有效负载,然后使用连接,因为你通常会没有任何转换