实现 Spring 服务,根据配置向不同的 Kafka 主题发送消息



我想实现一个 Spring 服务,根据配置将数据发送到不同的 Kafka 主题:

// Reply obtained from the selected endpoint; remains null until one of the branches assigns it.
ResponseFactory processingPeply = null;
// Dispatch on the endpoint type: every case uses its own request topic and its own reply template.
switch(endpointType)
{
case "email":
// NOTE: all case labels share the single scope of the switch body, so the locals declared in this
// case (record, replyFuture, sendResult, consumerRecord) clash with the same-named declarations
// in the "sms" and "network" cases below.
ProducerRecord<String, Object> record = new ProducerRecord<>("tp-email.request", tf);
RequestReplyFuture<String, Object, Object> replyFuture = processingTransactionEmailReplyKafkaTemplate.sendAndReceive(record);
// Wait up to 10 seconds for the result of the send itself.
SendResult<String, Object> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
// Wait up to 10 seconds for the reply record.
ConsumerRecord<String, Object> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);
processingPeply = (ResponseFactory) consumerRecord.value();
break;
case "sms":
// Same sequence as the "email" case, using the SMS topic and template.
ProducerRecord<String, Object> record = new ProducerRecord<>("tp-sms.request", tf);
RequestReplyFuture<String, Object, Object> replyFuture = processingTransactionSmsReplyKafkaTemplate.sendAndReceive(record);
SendResult<String, Object> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
ConsumerRecord<String, Object> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);
processingPeply = (ResponseFactory) consumerRecord.value();
break;
case "network":
// Same sequence as the "email" case, using the network topic and template.
ProducerRecord<String, Object> record = new ProducerRecord<>("tp-network.request", tf);
RequestReplyFuture<String, Object, Object> replyFuture = processingTransactionNetworkReplyKafkaTemplate.sendAndReceive(record);
SendResult<String, Object> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
ConsumerRecord<String, Object> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);
processingPeply = (ResponseFactory) consumerRecord.value();
break;

default:
// Unknown endpoint type: answer with an "error" status instead of sending anything.
processingPeply = ResponseFactory.builder().status("error").build();
} 

我目前得到:

  • 变量"record"已在作用域中定义
  • 变量"sendResult"已在作用域中定义
  • 变量"consumerRecord"已在作用域中定义

您知道如何以更好的方式重新设计代码来解决这个问题吗? 我想在 Spring 服务中遵循 DRY 原则,以减少重复代码。

您可以自动注入所有 ReplyingKafkaTemplate,并查找与您的端点类型匹配的那一个。

// Every ReplyingKafkaTemplate bean of this generic type is injected into the list.
@Autowired
private List<ReplyingKafkaTemplate<String, Object, Object>> templates;

// Pick the first template whose default topic contains the endpoint type.
ReplyingKafkaTemplate<String, Object, Object> template = null;
for (ReplyingKafkaTemplate<String, Object, Object> candidate : templates) {
    String defaultTopic = candidate.getDefaultTopic();
    // A template without a default topic cannot be matched; skip it instead of failing with an NPE.
    if (defaultTopic != null && defaultTopic.contains(endpointType)) {
        template = candidate;
        break;
    }
}

ResponseFactory processingPeply;
if (template == null) {
    // No template serves this endpoint type: report an error rather than dereferencing null.
    processingPeply = ResponseFactory.builder().status("error").build();
} else {
    ProducerRecord<String, Object> record = new ProducerRecord<>(template.getDefaultTopic(), tf);
    RequestReplyFuture<String, Object, Object> replyFuture = template.sendAndReceive(record);
    // Wait up to 10 seconds for the send to complete; the result itself is not needed.
    replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
    // Wait up to 10 seconds for the reply record.
    ConsumerRecord<String, Object> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);
    processingPeply = (ResponseFactory) consumerRecord.value();
}

您还可以调整配置,为每种端点类型创建对应的 Bean,然后注入 Map<String, ReplyingKafkaTemplate> 以便于查找。由于我不了解您的设置,因此无法提供具体的配置。

// Templates keyed for lookup by endpoint type.
// NOTE(review): Spring fills an injected Map<String, T> with bean names as keys, so each
// template bean presumably has to be named after its endpoint type — confirm in the configuration.
@Autowired
private Map<String, ReplyingKafkaTemplate<String, Object, Object>> templates;

ReplyingKafkaTemplate<String, Object, Object> template = templates.get(endpointType);

ResponseFactory processingPeply;
if (template == null) {
    // No template registered for this endpoint type: report an error rather than dereferencing null.
    processingPeply = ResponseFactory.builder().status("error").build();
} else {
    ProducerRecord<String, Object> record = new ProducerRecord<>(template.getDefaultTopic(), tf);
    RequestReplyFuture<String, Object, Object> replyFuture = template.sendAndReceive(record);
    // Wait up to 10 seconds for the send to complete; the result itself is not needed.
    replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
    // Wait up to 10 seconds for the reply record.
    ConsumerRecord<String, Object> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);
    processingPeply = (ResponseFactory) consumerRecord.value();
}

我认为您可以使用接口来分离将数据发送到不同端点的逻辑。看看下面的代码:

发送数据并接收响应的主类。它对电子邮件、短信、网络这些具体的发送器一无所知。

package com.example.demo.service;
import com.example.demo.dto.Response;
import org.springframework.stereotype.Service;
import java.util.List;
@Service
public class KafkaSender {
private final List<EndpointSender> senders;
public KafkaSender(List<EndpointSender> senders) {
this.senders = senders;
}
public Response send(Object data, String endpoint) {
return senders
.stream()
.filter(it -> it.supports(endpoint))
.findAny()
.map(it -> it.send(data))
.orElseGet(() -> new Response("error"));
}
}

然后我们创建这样的接口:

package com.example.demo.service;
import com.example.demo.dto.Response;
/**
 * A sender responsible for one kind of endpoint.
 */
public interface EndpointSender {

    /**
     * Tells whether this sender handles the given endpoint.
     *
     * @param endpoint endpoint identifier
     * @return {@code true} if this sender should be used for {@code endpoint}
     */
    boolean supports(String endpoint);

    /**
     * Sends the given object and returns the response.
     *
     * @param obj payload to send
     * @return the response to the request
     */
    Response send(Object obj);
}

和实现:

用于减少样板代码的基类:

package com.example.demo.service.sender;
import com.example.demo.dto.Response;
import com.example.demo.service.EndpointSender;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
import org.springframework.kafka.requestreply.RequestReplyFuture;
import org.springframework.kafka.support.SendResult;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
 * Base class holding the request/reply sequence shared by all endpoint senders.
 *
 * <p>Subclasses supply the record to send and the template to send it with.
 */
public abstract class BaseSender implements EndpointSender {

    /** Seconds to wait for the send to complete, and again for the reply to arrive. */
    private static final long TIMEOUT_SECONDS = 10;

    /**
     * Builds the record to send for the given payload.
     *
     * @param obj payload to wrap
     * @return the record handed to the template
     */
    public abstract ProducerRecord<String, Object> getRecord(Object obj);

    /**
     * @return the template used to send the record and receive the reply
     */
    public abstract ReplyingKafkaTemplate<String, Object, Object> kafkaTemplate();

    /**
     * Sends the payload and waits for the reply.
     *
     * @param obj payload to send
     * @return the reply value, cast to {@link Response}
     * @throws IllegalStateException if the wait is interrupted, times out, or the send/reply fails
     */
    @Override
    public Response send(Object obj) {
        try {
            RequestReplyFuture<String, Object, Object> replyFuture = kafkaTemplate().sendAndReceive(getRecord(obj));
            // Wait for the send to complete first; the result itself is not needed.
            replyFuture.getSendFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
            ConsumerRecord<String, Object> consumerRecord = replyFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
            return (Response) consumerRecord.value();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the Kafka reply", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("Kafka request/reply failed", e);
        }
    }
}

以及发件人的实现: 电子邮件发件人:

package com.example.demo.service.sender;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
import org.springframework.stereotype.Service;
/**
 * {@link BaseSender} for the {@code "email"} endpoint; builds records for the
 * {@code tp-email.request} topic.
 */
@Service
public class EmailSender extends BaseSender {

    private static final String ENDPOINT = "email";
    private static final String REQUEST_TOPIC = "tp-email.request";

    private final ReplyingKafkaTemplate<String, Object, Object> template;

    // NOTE(review): the parameter name presumably doubles as the bean name used to pick among
    // several templates of the same type — keep it unchanged unless a qualifier is added.
    public EmailSender(ReplyingKafkaTemplate<String, Object, Object> processingTransactionEmailReplyKafkaTemplate) {
        this.template = processingTransactionEmailReplyKafkaTemplate;
    }

    /** Accepts only the {@code "email"} endpoint. */
    @Override
    public boolean supports(String endpoint) {
        return ENDPOINT.equals(endpoint);
    }

    /** Wraps the payload in a record addressed to the email request topic. */
    @Override
    public ProducerRecord<String, Object> getRecord(Object obj) {
        return new ProducerRecord<>(REQUEST_TOPIC, obj);
    }

    /** Returns the template injected through the constructor. */
    @Override
    public ReplyingKafkaTemplate<String, Object, Object> kafkaTemplate() {
        return template;
    }
}

短信发送者:

package com.example.demo.service.sender;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
import org.springframework.stereotype.Service;
/**
 * {@link BaseSender} for the {@code "sms"} endpoint; builds records for the
 * {@code tp-sms.request} topic.
 */
@Service
public class SmsSender extends BaseSender {

    private static final String ENDPOINT = "sms";
    private static final String REQUEST_TOPIC = "tp-sms.request";

    private final ReplyingKafkaTemplate<String, Object, Object> template;

    // NOTE(review): the parameter name presumably doubles as the bean name used to pick among
    // several templates of the same type — keep it unchanged unless a qualifier is added.
    public SmsSender(ReplyingKafkaTemplate<String, Object, Object> processingTransactionSmsReplyKafkaTemplate) {
        this.template = processingTransactionSmsReplyKafkaTemplate;
    }

    /** Accepts only the {@code "sms"} endpoint. */
    @Override
    public boolean supports(String endpoint) {
        return ENDPOINT.equals(endpoint);
    }

    /** Wraps the payload in a record addressed to the SMS request topic. */
    @Override
    public ProducerRecord<String, Object> getRecord(Object obj) {
        return new ProducerRecord<>(REQUEST_TOPIC, obj);
    }

    /** Returns the template injected through the constructor. */
    @Override
    public ReplyingKafkaTemplate<String, Object, Object> kafkaTemplate() {
        return template;
    }
}

网络发送方:

package com.example.demo.service.sender;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
import org.springframework.stereotype.Service;
/**
 * {@link BaseSender} for the {@code "network"} endpoint; builds records for the
 * {@code tp-network.request} topic.
 */
@Service
public class NetworkSender extends BaseSender {

    private static final String ENDPOINT = "network";
    private static final String REQUEST_TOPIC = "tp-network.request";

    private final ReplyingKafkaTemplate<String, Object, Object> template;

    // NOTE(review): the parameter name presumably doubles as the bean name used to pick among
    // several templates of the same type — keep it unchanged unless a qualifier is added.
    public NetworkSender(ReplyingKafkaTemplate<String, Object, Object> processingTransactionNetworkReplyKafkaTemplate) {
        this.template = processingTransactionNetworkReplyKafkaTemplate;
    }

    /** Accepts only the {@code "network"} endpoint. */
    @Override
    public boolean supports(String endpoint) {
        return ENDPOINT.equals(endpoint);
    }

    /** Wraps the payload in a record addressed to the network request topic. */
    @Override
    public ProducerRecord<String, Object> getRecord(Object obj) {
        return new ProducerRecord<>(REQUEST_TOPIC, obj);
    }

    /** Returns the template injected through the constructor. */
    @Override
    public ReplyingKafkaTemplate<String, Object, Object> kafkaTemplate() {
        return template;
    }
}

遵循 KISS 原则,但不那么 DRY... 将每个 case 的代码块放入花括号中

// Wrapping the case body in braces gives it its own block scope, so locals declared inside
// no longer collide with same-named locals declared in the other cases.
case "email": {
...
}
break;
...

这样可以缩小每个 case 的作用域,从而可以重用相同的变量名称。

最新更新