Spring Boot Kafka with Schema Registry: payload does not match on the consumer side



I have a producer with this configuration:

kafka:
bootstrap-servers: localhost:9092
cloud:
stream:
binder:
consumer-properties:
key.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
schema.registry.url: http://localhost:8081
properties:
specific.avro.reader: true
schemaRegistryClient:
endpoint: http://localhost:8081
bindings:
event-in-0:
destination: event-details
contentType: application/*+avro
group: group1
function:
definition: event

This is my schema:

{
  "type": "record",
  "name": "Event",
  "namespace": "com.example.schema.avro",
  "fields": [
    {
      "name": "eventId",
      "type": "int"
    }
  ]
}

And this is the code that publishes the message:

public void sendEvent(final EventDto eventDto) {
    final Event apply = event().apply(eventDto);
    final Message<Event> build = MessageBuilder.withPayload(apply)
            .setHeader("partitionKey", eventDto.eventId())
            .setHeader("customHeader", "test").build();
    final boolean send = streamBridge.send("event-out-0", build);
    log.info(String.valueOf(build));
}

This produces the correct event, as the log shows:

2022-08-15 09:12:23.988  INFO 25112 --- [nio-9090-exec-5] c.e.eventproducer.service.EventService   : GenericMessage [payload={"eventId": 200}, headers={customHeader=test, id=70c3e52c-f419-cf0c-8fae-0ba07d8876da, partitionKey=200, timestamp=1660547543987}]

On the consumer side I expect the same eventId, 200, but I always get 0 regardless of which eventId was sent. This is my consumer configuration:

kafka:
bootstrap-servers: localhost:9092
cloud:
stream:
binder:
consumer-properties:
key.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
schema.registry.url: http://localhost:8081
properties:
specific.avro.reader: true
schemaRegistryClient:
endpoint: http://localhost:8081
bindings:
event-in-0:
destination: event-details
contentType: application/*+avro
group: group1
function:
definition: event

This is the code that subscribes to the messages:

@Bean
public Consumer<Message<Event>> event() {
    return e -> {
        log.info(e.toString());
        eventRepository.save(new com.example.eventconsumer.doamin.Event(e.getPayload().getEventId()));
    };
}

The consumer log shows:

2022-08-15 09:12:23.992  INFO [,b19cd949219432acf232403bdcea45c2,1464ff27cfed8011] 24596 --- [container-0-C-1] c.e.eventconsumer.service.EventService   : GenericMessage [payload={"eventId": 0}, headers={customHeader=test, deliveryAttempt=1, kafka_timestampType=CREATE_TIME, scst_partition=0, kafka_receivedTopic=event-details, kafka_offset=97, partitionKey=200, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@41d9d95, source-type=kafka, id=0762521f-069b-a91d-acc5-69e6bb2eb4eb, kafka_receivedPartitionId=0, contentType=application/vnd.event.v1+avro, kafka_receivedTimestamp=1660547543987, kafka_groupId=group1, timestamp=1660547543992}]

This is the Event.java class that is auto-generated from the .avsc file:

/**
* Autogenerated by Avro
*
* DO NOT EDIT DIRECTLY
*/
package com.example.schema.avro;
import org.apache.avro.specific.SpecificData;
import org.apache.avro.message.BinaryMessageEncoder;
import org.apache.avro.message.BinaryMessageDecoder;
import org.apache.avro.message.SchemaStore;
@SuppressWarnings("all")
@org.apache.avro.specific.AvroGenerated
public class Event extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
private static final long serialVersionUID = -4520271777387293905L;
public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Event\",\"namespace\":\"com.example.schema.avro\",\"fields\":[{\"name\":\"eventId\",\"type\":\"int\"}]}");
public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
private static SpecificData MODEL$ = new SpecificData();
private static final BinaryMessageEncoder<Event> ENCODER =
new BinaryMessageEncoder<Event>(MODEL$, SCHEMA$);
private static final BinaryMessageDecoder<Event> DECODER =
new BinaryMessageDecoder<Event>(MODEL$, SCHEMA$);
/**
* Return the BinaryMessageDecoder instance used by this class.
*/
public static BinaryMessageDecoder<Event> getDecoder() {
return DECODER;
}
/**
* Create a new BinaryMessageDecoder instance for this class that uses the specified {@link SchemaStore}.
* @param resolver a {@link SchemaStore} used to find schemas by fingerprint
*/
public static BinaryMessageDecoder<Event> createDecoder(SchemaStore resolver) {
return new BinaryMessageDecoder<Event>(MODEL$, SCHEMA$, resolver);
}
/** Serializes this Event to a ByteBuffer. */
public java.nio.ByteBuffer toByteBuffer() throws java.io.IOException {
return ENCODER.encode(this);
}
/** Deserializes a Event from a ByteBuffer. */
public static Event fromByteBuffer(
java.nio.ByteBuffer b) throws java.io.IOException {
return DECODER.decode(b);
}
@Deprecated public int eventId;
/**
* Default constructor.  Note that this does not initialize fields
* to their default values from the schema.  If that is desired then
* one should use <code>newBuilder()</code>.
*/
public Event() {}
/**
* All-args constructor.
* @param eventId The new value for eventId
*/
public Event(java.lang.Integer eventId) {
this.eventId = eventId;
}
public org.apache.avro.Schema getSchema() { return SCHEMA$; }
// Used by DatumWriter.  Applications should not call.
public java.lang.Object get(int field$) {
switch (field$) {
case 0: return eventId;
default: throw new org.apache.avro.AvroRuntimeException("Bad index");
}
}
// Used by DatumReader.  Applications should not call.
@SuppressWarnings(value="unchecked")
public void put(int field$, java.lang.Object value$) {
switch (field$) {
case 0: eventId = (java.lang.Integer)value$; break;
default: throw new org.apache.avro.AvroRuntimeException("Bad index");
}
}
/**
* Gets the value of the 'eventId' field.
* @return The value of the 'eventId' field.
*/
public java.lang.Integer getEventId() {
return eventId;
}
/**
* Sets the value of the 'eventId' field.
* @param value the value to set.
*/
public void setEventId(java.lang.Integer value) {
this.eventId = value;
}
/**
* Creates a new Event RecordBuilder.
* @return A new Event RecordBuilder
*/
public static com.example.schema.avro.Event.Builder newBuilder() {
return new com.example.schema.avro.Event.Builder();
}
/**
* Creates a new Event RecordBuilder by copying an existing Builder.
* @param other The existing builder to copy.
* @return A new Event RecordBuilder
*/
public static com.example.schema.avro.Event.Builder newBuilder(com.example.schema.avro.Event.Builder other) {
return new com.example.schema.avro.Event.Builder(other);
}
/**
* Creates a new Event RecordBuilder by copying an existing Event instance.
* @param other The existing instance to copy.
* @return A new Event RecordBuilder
*/
public static com.example.schema.avro.Event.Builder newBuilder(com.example.schema.avro.Event other) {
return new com.example.schema.avro.Event.Builder(other);
}
/**
* RecordBuilder for Event instances.
*/
public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<Event>
implements org.apache.avro.data.RecordBuilder<Event> {
private int eventId;
/** Creates a new Builder */
private Builder() {
super(SCHEMA$);
}
/**
* Creates a Builder by copying an existing Builder.
* @param other The existing Builder to copy.
*/
private Builder(com.example.schema.avro.Event.Builder other) {
super(other);
if (isValidValue(fields()[0], other.eventId)) {
this.eventId = data().deepCopy(fields()[0].schema(), other.eventId);
fieldSetFlags()[0] = true;
}
}
/**
* Creates a Builder by copying an existing Event instance
* @param other The existing instance to copy.
*/
private Builder(com.example.schema.avro.Event other) {
super(SCHEMA$);
if (isValidValue(fields()[0], other.eventId)) {
this.eventId = data().deepCopy(fields()[0].schema(), other.eventId);
fieldSetFlags()[0] = true;
}
}
/**
* Gets the value of the 'eventId' field.
* @return The value.
*/
public java.lang.Integer getEventId() {
return eventId;
}
/**
* Sets the value of the 'eventId' field.
* @param value The value of 'eventId'.
* @return This builder.
*/
public com.example.schema.avro.Event.Builder setEventId(int value) {
validate(fields()[0], value);
this.eventId = value;
fieldSetFlags()[0] = true;
return this;
}
/**
* Checks whether the 'eventId' field has been set.
* @return True if the 'eventId' field has been set, false otherwise.
*/
public boolean hasEventId() {
return fieldSetFlags()[0];
}

/**
* Clears the value of the 'eventId' field.
* @return This builder.
*/
public com.example.schema.avro.Event.Builder clearEventId() {
fieldSetFlags()[0] = false;
return this;
}
@Override
@SuppressWarnings("unchecked")
public Event build() {
try {
Event record = new Event();
record.eventId = fieldSetFlags()[0] ? this.eventId : (java.lang.Integer) defaultValue(fields()[0]);
return record;
} catch (java.lang.Exception e) {
throw new org.apache.avro.AvroRuntimeException(e);
}
}
}
@SuppressWarnings("unchecked")
private static final org.apache.avro.io.DatumWriter<Event>
WRITER$ = (org.apache.avro.io.DatumWriter<Event>)MODEL$.createDatumWriter(SCHEMA$);
@Override public void writeExternal(java.io.ObjectOutput out)
throws java.io.IOException {
WRITER$.write(this, SpecificData.getEncoder(out));
}
@SuppressWarnings("unchecked")
private static final org.apache.avro.io.DatumReader<Event>
READER$ = (org.apache.avro.io.DatumReader<Event>)MODEL$.createDatumReader(SCHEMA$);
@Override public void readExternal(java.io.ObjectInput in)
throws java.io.IOException {
READER$.read(this, SpecificData.getDecoder(in));
}
}

Interestingly, if I pass the event ID as the partition key, I receive it correctly in the header, but not in the payload itself.
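One way to narrow this down is to look at the raw record bytes on the topic, for example with a console consumer piped through a hex dump. The sketch below is not part of the original code. It only reproduces Avro's documented binary encoding of an `int` (zigzag mapping followed by variable-length bytes) so that you know what to look for. The class and method names (`AvroIntSketch`, `encodeInt`, `decodeInt`, `toHex`) are made up for illustration. A record with a single `int` field is encoded as just that field, so, assuming the payload is Avro binary, an `Event` with `eventId` 200 should end in the bytes `90 03`. If the Confluent serializer wrote the record, those bytes follow a 5-byte prefix (magic byte `0` plus a 4-byte schema ID).

```java
import java.io.ByteArrayOutputStream;

/** Illustrative sketch of Avro's binary int encoding; names are hypothetical. */
final class AvroIntSketch {

    /** Zigzag-maps the value, then writes 7 bits per byte, lowest group first. */
    static byte[] encodeInt(int value) {
        int zigzag = (value << 1) ^ (value >> 31);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        while ((zigzag & ~0x7F) != 0) {
            out.write((zigzag & 0x7F) | 0x80); // high bit set: more bytes follow
            zigzag >>>= 7;
        }
        out.write(zigzag);
        return out.toByteArray();
    }

    /** Reads one Avro int starting at the given offset. */
    static int decodeInt(byte[] bytes, int offset) {
        int zigzag = 0;
        int shift = 0;
        int i = offset;
        while (true) {
            int b = bytes[i++] & 0xFF;
            zigzag |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                break;
            }
            shift += 7;
        }
        return (zigzag >>> 1) ^ -(zigzag & 1); // undo the zigzag mapping
    }

    /** Formats bytes as space-separated lowercase hex pairs. */
    static String toHex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < bytes.length; i++) {
            if (i > 0) {
                sb.append(' ');
            }
            sb.append(String.format("%02x", bytes[i] & 0xFF));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(toHex(encodeInt(200))); // prints "90 03"
        System.out.println(toHex(encodeInt(0)));   // prints "00"
    }
}
```

If the bytes on the topic end in `90 03`, the producer wrote 200 and the value is being lost while the consumer deserializes it. If they end in `00`, the payload was already 0 when it was serialized.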

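I believe the problem is in the DTO: the value of eventId is not being set or read correctly, so it always comes back as the default value of an int, which is 0.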

This is what the class generated from the .avsc looks like, so eventId needs to be set and read properly on the Event class.

/** Gets the value of the 'eventId' field */
public java.lang.Integer getEventId() {
    return eventId;
}
/** Sets the value of the 'eventId' field */
public com.example.schema.avro.Event.Builder setEventId(int value) {
    validate(fields()[0], value);
    this.eventId = value;
    fieldSetFlags()[0] = true;
    return this;
}
public Event build() {
    try {
        Event record = new Event();
        record.eventId = fieldSetFlags()[0] ? this.eventId : (java.lang.Integer) defaultValue(fields()[0]);
        return record;
    } catch (Exception e) {
        throw new org.apache.avro.AvroRuntimeException(e);
    }
}
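To make the "default int" point concrete, here is a minimal stand-alone sketch. `EventSketch` is a hypothetical stand-in written for this illustration, not the Avro-generated class, and it has no Avro dependency. It shows that an instance created with the no-arg constructor reports 0 until something assigns the field, while an instance built through the builder carries the value that was set. Rejecting an unset field in `build()` is an assumption that loosely mirrors a schema field without a default.

```java
/** Hypothetical stand-in for the generated Event class; not the Avro-generated code. */
final class EventSketch {
    // A primitive int field is 0 until something assigns it.
    private int eventId;

    EventSketch() {}

    int getEventId() {
        return eventId;
    }

    static Builder newBuilder() {
        return new Builder();
    }

    static final class Builder {
        private int eventId;
        private boolean eventIdSet;

        Builder setEventId(int value) {
            this.eventId = value;
            this.eventIdSet = true;
            return this;
        }

        EventSketch build() {
            // Assumption: treat an unset field as an error, since the schema defines no default.
            if (!eventIdSet) {
                throw new IllegalStateException("eventId was not set and has no default");
            }
            EventSketch record = new EventSketch();
            record.eventId = this.eventId;
            return record;
        }
    }

    public static void main(String[] args) {
        System.out.println(new EventSketch().getEventId()); // prints 0
        System.out.println(EventSketch.newBuilder().setEventId(200).build().getEventId()); // prints 200
    }
}
```

In the producer from the question, the equivalent check would be to log `apply.getEventId()` right after `event().apply(eventDto)` to confirm the mapping from `EventDto` populates the field before the message is sent.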
