Spring Kafka - 遇到"Magic v0 不支持记录标头"错误



我正在运行一个 Spring Boot 应用程序,并且已经屈服于compile('org.springframework.kafka:spring-kafka:2.1.5.RELEASE')

我试图反对这个版本的Cloudera安装:

Cloudera Distribution of Apache Kafka Version 3.0.0-1.3.0.0.p0.40 Version 0.11.0+kafka3.0.0+50

我的 KafkaProducerConfig 类非常简单:

@Configuration
public class KafkaProducerConfig {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducerConfig.class);
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.template.default-topic}")
private String defaultTopicName;
@Value("${spring.kafka.producer.compression-type}")
private String producerCompressionType;
@Value("${spring.kafka.producer.client-id}")
private String producerClientId;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, this.producerCompressionType);
props.put(ProducerConfig.CLIENT_ID_CONFIG, this.producerClientId);
props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
return props;
}
@Bean
public ProducerFactory<String, Pdid> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, Pdid> kafkaTemplate() {
KafkaTemplate<String, Pdid> kafkaTemplate = new KafkaTemplate<>(producerFactory());
kafkaTemplate.setDefaultTopic(this.defaultTopicName);
return kafkaTemplate;
}
@PostConstruct
public void postConstruct() {
LOGGER.info("Kafka producer configuration: " + this.producerConfigs().toString());
LOGGER.info("Kafka topic name: " + this.defaultTopicName);
}

}

当我启动应用程序时,我收到:

2018-05-01 17:15:41.355 INFO 54674 --- [nio-9000-exec-2] o.a.kafka.common.utils.AppInfoParser : Kafka version : 1.0.12018-05-01 17:15:41.356 INFO 54674 --- [nio-9000-exec-2] o.a.kafka.common.utils.AppInfoParser : Kafka commitId : c0518aa65f25317e

然后,我发送有效载荷。它出现在卡夫卡工具中,反对该主题。但是,在 Kafka 端的日志中,当数据尝试摄取时,我收到:

[KafkaApi-131] Error when handling request {replica_id=-1,max_wait_time=500,min_bytes=1,topics=[{topic=profiles-pdid,partitions=[{partition=0,fetch_offset=7,max_bytes=1048576}]}]}java.lang.IllegalArgumentException: Magic v0 does not support record headers
at org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:385)
at org.apache.kafka.common.record.MemoryRecordsBuilder.append(MemoryRecordsBuilder.java:568)
at org.apache.kafka.common.record.AbstractRecords.convertRecordBatch(AbstractRecords.java:117)
at org.apache.kafka.common.record.AbstractRecords.downConvert(AbstractRecords.java:98)
at org.apache.kafka.common.record.FileRecords.downConvert(FileRecords.java:245)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$5.apply(KafkaApis.scala:523)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$5.apply(KafkaApis.scala:521)
at scala.Option.map(Option.scala:146)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:521)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:511)
at scala.Option.flatMap(Option.scala:171)
at kafka.server.KafkaApis.kafka$server$KafkaApis$$convertedPartitionData$1(KafkaApis.scala:511)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:559)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:558)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at kafka.server.KafkaApis.kafka$server$KafkaApis$$createResponse$2(KafkaApis.scala:558)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback$1$1.apply$mcVI$sp(KafkaApis.scala:579)
at kafka.server.ClientQuotaManager.recordAndThrottleOnQuotaViolation(ClientQuotaManager.scala:196)
at kafka.server.KafkaApis.sendResponseMaybeThrottle(KafkaApis.scala:2014)
at kafka.server.KafkaApis.kafka$server$KafkaApis$$fetchResponseCallback$1(KafkaApis.scala:578)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$processResponseCallback$1$1.apply$mcVI$sp(KafkaApis.scala:598)
at kafka.server.ClientQuotaManager.recordAndThrottleOnQuotaViolation(ClientQuotaManager.scala:196)
at kafka.server.ClientQuotaManager.recordAndMaybeThrottle(ClientQuotaManager.scala:188)
at kafka.server.KafkaApis.kafka$server$KafkaApis$$processResponseCallback$1(KafkaApis.scala:597)
at kafka.server.KafkaApis$$anonfun$handleFetchRequest$1.apply(KafkaApis.scala:614)
at kafka.server.KafkaApis$$anonfun$handleFetchRequest$1.apply(KafkaApis.scala:614)
at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:639)
at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:606)
at kafka.server.KafkaApis.handle(KafkaApis.scala:98)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:66)
at java.lang.Thread.run(Thread.java:748)

我已经从生产者应用程序方面尝试了以下事情:

  1. 降级到春季卡夫卡 2.0.4。我希望降到卡夫卡版本的 0.11.0 将有助于摆脱这个问题,但它没有效果。
  2. 已验证节点是否均为同一版本。根据我的管理员的说法,他们是。
  3. 已向我的管理员验证我们没有混合安装。我再次被告知我们没有。
  4. 基于类似的堆栈溢出问题,我回到了 2.1.5 并尝试将JsonSerializer.ADD_TYPE_INFO_HEADERS放入 false。我想也许它会删除日志所引用的标题。同样,不行,并记录了相同的错误。

我希望我错过了一些明显的东西。我是否需要打开/关闭任何其他标头来帮助解决任何人都知道的 Magic v0 问题?

我们还有其他应用程序可以在同一环境中成功写入其他主题,但它们是手工制作必要的 Spring bean 的旧应用程序。此外,这些应用程序还使用更旧的客户端 (0.8.2.2(,并且它们对生产者值使用 StringSerializer 而不是 JSON。我需要我的数据在 JSON 中,当我们在应该支持 0.11.x 的系统上时,我真的不想降级到 0.8.2.2。

但它们是手工制作必要的春豆的旧应用程序。

在org.apache.kafka.common.record.FileRecords。向下转换(文件记录.java:245(

我不熟悉 kafka 代理内部,但它"听起来"像是使用旧代理创建的主题,它们的格式不支持标头,而不是代理版本本身(提示:downConvert(。

您是否与干净的经纪人尝试过?

1.0.x 客户端可以与较旧的代理通信(返回到 0.10.2.x IIRC(,只要您不尝试使用代理不支持的功能。您的代理是 0.11(确实支持标头(这一事实进一步表明,主题记录格式是问题所在。

我已经成功地测试了上/下经纪商/客户端/主题版本,没有问题,只要您使用通用功能子集。

JsonSerializer.ADD_TYPE_INFO_HEADERS为假。

这应该可以防止框架设置任何标头;你需要显示你的生产者代码(以及所有配置(。

您还可以向生产者配置添加ProducerInterceptor,并检查onSend()方法中的ProducerRecordheaders属性,以确定输出消息中设置标头的内容。

如果您使用的是 Spring 消息消息(template.setn(Message<?> m),默认情况下将映射标头(。使用原始template.send()方法不会设置任何标头(除非您发送填充了标头的ProducerRecord

这个问题的解决方案是混合两件事:

  1. 我需要添加JsonSerializer.ADD_TYPE_INFO_HEADERS to false,就像加里·罗素建议的那样。
  2. 我需要刷新在将配置放入我的应用程序之前放入主题的所有记录。以前的记录有标题,这破坏了Flume消费者。

我已经将我的应用程序升级到 Spring boot 2x,并且我在 kafka 客户端依赖项方面遇到了一些兼容性问题(参见 Spring-boot 和 Spring-Kafka 兼容性矩阵(,所以我也必须升级它。另一方面,我在服务器上运行了一个较旧的代理(kafka 0.10(,然后我无法向其发送消息。我还意识到,即使将JsonSerializer.ADD_TYPE_INFO_HEADERS设置为 false,kafka 制作者也在内部设置标头,并且由于魔法是根据 kafka 的版本(在RecordBatch中(固定的,在这种情况下,没有办法不陷入MemoryRecordsBuilder.appendWithOffsetif (magic < RecordBatch.MAGIC_VALUE_V2 && headers != null && headers.length > 0) throw new IllegalArgumentException("Magic v" + magic + " does not support record headers");的条件。 最后,我解决这个问题的唯一方法是升级我的 kafka 服务器。

最新更新