我正在尝试使用TopologyTestDriver测试kafka流。我正在分享代码片段和我所面临的错误。
public class ToplogyTest extends AvroSourceJsonTopologyTestSupport {
private static final String MOCK_SCHEMA_REGISTRY_URL = "mock://test:8081";
private TopologyTestDriver testDriver;
private TestInputTopic<String, GenericRecord> inputTopic;
private MockSchemaRegistryClient schemaRegistryClient;
}
private final Properties props;
public ToplogyTest () {
super();
props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streamsTest");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, MOCK_SCHEMA_REGISTRY_URL);
schemaRegistryClient = new MockSchemaRegistryClient();
}
@BeforeEach
public void setup() throws Exception {
// Created the topology
// Create test driver
testDriver = new TopologyTestDriver(topology, props);
// Create Serdes used for test record keys and values
Serde<String> stringSerde = Serdes.String();
Serde<GenericRecord> avroSerde = new GenericAvroSerde();
final Map<String,String> avroSerdeConfig = new HashMap<>();
avroSerdeConfig.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, MOCK_SCHEMA_REGISTRY_URL);
avroSerde.configure(avroSerdeConfig, false);
usersTopic = testDriver.createInputTopic(
"input-topic",
stringSerde.serializer(),
avroSerde.serializer());
}
@Test
public void Test(){
usersTopic.pipeInput("null",record);
}
错误org.apache.kafka.commun.errors.SerializationException:错误序列化Avro消息已禁止:java.lang.IollegalArgumentException:请始终:初始化每次测试前,在设置结束时使用提供的测试驱动程序方法使用提供的方法在每次测试后关闭测试驱动程序。引起原因:java.net.MorformedURLException:未知协议:mock atjava.net.URL。(URL.java:618(在java.net.UURL。(URL.java:508(在java.net.URL。(URL.java:457(在io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:152(
我在这里所理解的是,我无法注册mock模式注册表。有人面临类似的问题吗?
我还没有尝试过GenericAvroSerde,但使用SpecificAvroSerd,它有一个构造函数,它采用SchemaRegistryClient的参数。
当您创建GenericAvroSerde时,传入mockSchemaRegistryClient的实例,以便它使用您的mock,以强制它不使用它将自己创建的SchemaRegistryClient(不使用arg构造函数(。
此外,删除default.value.serde=GenericAvroSerde的属性-如果你在测试中像通常在测试中的代码中那样配置它,你可以再次配置一个没有mock的GenericAvroy实例,并且在运行时它仍然尝试连接到schema.registry.url
我不知道MockSchemaRegistryClient是否打算这样使用,但当与SpecificAvroSerde 一起使用时,这种方法对我有效
根据错误,mock://
不是有效的URI方案。对于您的测试,即使没有使用,它也需要是http://