Spring cloud kafka流模式注册表



我正试图通过函数编程(和spring cloud stream(从输入主题转换输入AVRO消息,并在输出主题上发布新消息。这是我的转换函数:

@Bean
public Function<KStream<String, Data>, KStream<String, Double>> evenNumberSquareProcessor() {
return kStream -> kStream.transform(() -> new CustomProcessor(STORE_NAME), STORE_NAME);
}

CustomProcessor是一个实现";变压器";界面

我尝试过使用非AVRO输入的转换,它运行良好。

我的困难在于如何在application.yaml文件或spring应用程序中声明模式注册表。

我尝试了很多不同的配置(似乎很难找到正确的文档(,每次应用程序都找不到schema.registry.url的设置。我有以下错误:

创建名为"kafkaStreamsFunctionProcessorInvoker"的bean时出错:init方法调用失败;嵌套异常为java.lang.IollegalStateException:org.apache.kafka.commun.config.ConfigException:缺少必需项配置";schema.registry.url";其没有默认值。

这是我的应用程序.yml文件:

spring:
cloud:
stream:
function:
definition: evenNumberSquareProcessor
bindings:
evenNumberSquareProcessor-in-0:
destination: input
content-type: application/*+avro
group: group-1
evenNumberSquareProcessor-out-0:
destination: output
kafka:
binder:
brokers: my-cluster-kafka-bootstrap.kafka:9092
consumer-properties:
value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
schema.registry.url: http://localhost:8081

我也尝试过这种配置:

spring:
cloud:
stream:
kafka:
streams:
binder:
brokers: my-cluster-kafka-bootstrap.kafka:9092
configuration:
schema.registry.url: http://localhost:8081
default.value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
bindings:
evenNumberSquareProcessor-in-0:
consumer:
destination: input
valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
evenNumberSquareProcessor-out-0:
destination: output

我的spring-boot应用程序是这样声明的,激活了模式注册表客户端:

@EnableSchemaRegistryClient
@SpringBootApplication
public class TransformApplication {
public static void main(String[] args) {
SpringApplication.run(TransformApplication.class, args);
}
}

谢谢你能给我带来的任何帮助。

问候CG-

configuration下配置架构注册表,然后它将对所有绑定器可用。顺便说一句avro串行器位于bindings和特定通道下。如果要使用默认属性default.value.serde:。你的Serde可能也错了。

spring:
cloud:
stream:
kafka:
streams:
binder:
brokers: localhost:9092
configuration:
schema.registry.url: http://localhost:8081
default.value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
bindings:
process-in-0:
consumer:
valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde

不要使用@EnableSchemaRegistryClient。在Avro Serde上启用架构注册表。在本例中,我使用您定义的beanData。试着按照这个例子来做。

@Service
public class CustomSerdes extends Serdes {
private final static Map<String, String> serdeConfig = Stream.of(
new AbstractMap.SimpleEntry<>(SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081"))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
public static Serde<Data> DataAvro() {
final Serde<Data> dataAvroSerde = new SpecificAvroSerde<>();
dataAvroSerde.configure(serdeConfig, false);
return dataAvroSerde;
}
}

相关内容

  • 没有找到相关文章

最新更新