春季靴卡夫卡信息.如何简化处理程序的dto映射



我已经使用Kafka配置了我的春季启动项目。我可以接收和发布任何基于字符串的消息。

字符串消息不是最好的处理方式。拥有将消息从字符串默认转换为对象的功能会更有用。

为了实现这个功能,我需要将几乎所有的Kafka配置从yml移动到java(使用属性(。。。生产者示例

@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AccountSerializer.class);
return props;
}
@Bean
public ProducerFactory<String, Account> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, Account> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}

代码有效,但我接受了简化。在最好的情况下,我希望有优雅的配置yml,可能是一些java更改。但通过直接的方式,我将获得额外的每3个bean,用于配置每个kafkaTemplatelistenerFactory

它是否可能简化未来的配置(我需要更多额外的Serializer"反序列化程序"(?怎样

P.S

我想用类似的方式配置yml,例如:

spring:
kafka:
consumer:
group-id: foo
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
app:
topic:
foo: foo.t

但我不清楚如何在这里用不同的(De(Serializer配置消费者\生产者,映射指定主题上的。。。

我似乎没有任何机会为同一个侦听器配置不同的SERIALIZER|DESERIALIZERs。

但身份证并不意味着我的问题没有解决方案。

我对所有对象都使用了继承,并提供了一个抽象AbstractEventAbstractEvent通常没有用,但它在我的解决方案中使用,就像指定的SERIALIZER|DESERIALIZER的输入点一样。为了获取上下文中哪个对象的信息,我使用了自定义标头。org.apache.kafka.common.serialization.Deserializer没有头参数,但我已经在ExtendedDeserializer的基础上实现了我的DESERIALIZER。这种方式让我可以访问标题

via public T deserialize(String topic, Headers headers, byte[] data)

我的反序列化程序示例:

@Slf4j
public class AbstractEventDeserializer<T extends AbstractEvent> implements ExtendedDeserializer<T> {
private Map<String, Class<T>> mappers = new HashMap<>();
// default behavior
@Override
public T deserialize(String arg0, byte[] devBytes) {
ObjectMapper mapper = new ObjectMapper();
T bar = null;
try {
bar = (T) mapper.readValue(devBytes, Bar.class);
} catch (Exception e) {
e.printStackTrace();
}
return bar;
}
@Override
public void close() {
// TODO Auto-generated method stub
}
@Override
public T deserialize(String topic, Headers headers, byte[] data) {
log.info("handling...");
headers.forEach(header -> log.info("   {}: {}", header.key(), getHeaderValueAsString(header)));
Optional<String> classTypeFromHeader = getClassTypeFromHeader(headers);
if (classTypeFromHeader.isPresent()) {
return parseFromJson(data, mappers.get(classTypeFromHeader.get()));
}
return deserialize(topic, data);
}
private Optional<String> getClassTypeFromHeader(Headers headers) {
return StreamSupport.stream(headers.headers("X-CLASS-TYPE").spliterator(), false)
.map(Header::value)
.map(String::new)
.findFirst();
}
private String getHeaderValueAsString(Header header) {
return Optional.ofNullable(header.value())
.map(String::new)
.orElse(null);
}
@Override
public void configure(Map<String, ?> arg0, boolean arg1) {
log.info("configuring deserialiser");
if (arg0.containsKey("mappers")) {
this.mappers = (Map<String, Class<T>>) arg0.get("mappers");
}
arg0.keySet().forEach(key -> log.info("   {}:{}", key, arg0.get(key)));
}
}

如果你想尝试工作解决方案,请查看实验示例。

Spring云服务为消费者提供了更好的配置、并发性、反序列化和更少的锅炉板代码。

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>

水槽样品

@SpringBootApplication
@EnableBinding(Sink.class)
public class LoggingConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(LoggingConsumerApplication.class, args);
}
@StreamListener(Sink.INPUT)
public void handle(Person person) {
System.out.println("Received: " + person);
}
public static class Person {
private String name;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String toString() {
return this.name;
}
}
}

示例配置:

spring:
cloud:
stream:
bindings:
input:
destination: <your topic>
group: <your consumer group>
consumer:
headerMode: raw
partitioned: true
concurrency: 10
kafka:
binder:
brokers: <Comma seperated list of kafka brokers>

此处提供更多信息https://cloud.spring.io/spring-cloud-stream/

相关内容

最新更新