我正试图通过函数编程(和spring cloud stream(从输入主题转换输入AVRO消息,并在输出主题上发布新消息。这是我的转换函数:
@Bean
public Function<KStream<String, Data>, KStream<String, Double>> evenNumberSquareProcessor() {
return kStream -> kStream.transform(() -> new CustomProcessor(STORE_NAME), STORE_NAME);
}
CustomProcessor是一个实现";变压器";界面
我尝试过使用非AVRO输入的转换,它运行良好。
我的困难在于如何在application.yaml文件或spring应用程序中声明模式注册表。
我尝试了很多不同的配置(似乎很难找到正确的文档(,每次应用程序都找不到schema.registry.url的设置。我有以下错误:
创建名为"kafkaStreamsFunctionProcessorInvoker"的bean时出错:init方法调用失败;嵌套异常为java.lang.IollegalStateException:org.apache.kafka.commun.config.ConfigException:缺少必需项配置";schema.registry.url";其没有默认值。
这是我的应用程序.yml文件:
spring:
cloud:
stream:
function:
definition: evenNumberSquareProcessor
bindings:
evenNumberSquareProcessor-in-0:
destination: input
content-type: application/*+avro
group: group-1
evenNumberSquareProcessor-out-0:
destination: output
kafka:
binder:
brokers: my-cluster-kafka-bootstrap.kafka:9092
consumer-properties:
value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
schema.registry.url: http://localhost:8081
我也尝试过这种配置:
spring:
cloud:
stream:
kafka:
streams:
binder:
brokers: my-cluster-kafka-bootstrap.kafka:9092
configuration:
schema.registry.url: http://localhost:8081
default.value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
bindings:
evenNumberSquareProcessor-in-0:
consumer:
destination: input
valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
evenNumberSquareProcessor-out-0:
destination: output
我的spring-boot应用程序是这样声明的,激活了模式注册表客户端:
@EnableSchemaRegistryClient
@SpringBootApplication
public class TransformApplication {
public static void main(String[] args) {
SpringApplication.run(TransformApplication.class, args);
}
}
谢谢你能给我带来的任何帮助。
问候CG-
在configuration
下配置架构注册表,然后它将对所有绑定器可用。顺便说一句avro串行器位于bindings
和特定通道下。如果要使用默认属性default.value.serde:
。你的Serde可能也错了。
spring:
cloud:
stream:
kafka:
streams:
binder:
brokers: localhost:9092
configuration:
schema.registry.url: http://localhost:8081
default.value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
bindings:
process-in-0:
consumer:
valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
不要使用@EnableSchemaRegistryClient
。在Avro Serde上启用架构注册表。在本例中,我使用您定义的beanData
。试着按照这个例子来做。
@Service
public class CustomSerdes extends Serdes {
private final static Map<String, String> serdeConfig = Stream.of(
new AbstractMap.SimpleEntry<>(SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081"))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
public static Serde<Data> DataAvro() {
final Serde<Data> dataAvroSerde = new SpecificAvroSerde<>();
dataAvroSerde.configure(serdeConfig, false);
return dataAvroSerde;
}
}