我编写了一个spring云流应用程序,生产者将消息发布到指定的kafka主题。我的问题是如何添加生产者回调来接收消息已成功发布在主题上的回复/确认?就像我们在spring kafkaproducer.send(record, new callback { ... })
中做的那样(维护异步生产者)。下面是我的代码:
private final Sinks.Many<Message<?>> responseProcessor = Sinks.many().multicast().onBackpressureBuffer();
@Bean
public Supplier<Flux<Message<?>>> event() {
return responseProcessor::asFlux;
}
public Message<?> publishEvent(String status) {
try {
String key = ...;
response = MessageBuilder.withPayload(payload)
.setHeader(KafkaHeaders.MESSAGE_KEY, key)
.build();
responseProcessor.tryEmitNext(response);
}
如何确保tryEmitNext是否已经成功地写到了主题?
正在实现ProducerListener解决方案和可能性?在Spring Cloud Stream中找不到具体的解决方案/文档
我已经实现下面现在,似乎工作如预期的
@Component
public class MyProducerListener<K, V> implements ProducerListener<K, V> {
@Override
public void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata) {
// Do nothing on onSuccess
}
@Override
public void onError(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata, Exception exception) {
log.error("Producer exception occurred while publishing message : {}, exception : {}", producerRecord, exception);
}
}
@Bean
ProducerMessageHandlerCustomizer<KafkaProducerMessageHandler<?, ?>> customizer(MyProducerListener pl) {
return (handler, destinationName) -> handler.getKafkaTemplate().setProducerListener(pl);
}
查看Kafka Producer属性。
recordMetadataChannel
发送成功结果的
MessageChannel
的bean名称;bean必须存在于应用程序上下文中。发送到通道的消息是发送的消息(转换后,如果有的话)与一个额外的标题KafkaHeaders.RECORD_METADATA
。头包含一个由Kafka客户端提供的RecordMetadata
对象;它包括在主题中写入记录的分区和偏移量。
ResultMetadata meta = sendResultMsg.getHeaders().get(KafkaHeaders. conf)RECORD_METADATA RecordMetadata.class)
失败发送到生产者错误通道(如果配置);请参阅错误通道。默认值:空
你可以添加一个@ServiceActivator
从这个通道异步消费。