Unable to subscribe multiple consumers to the Spring Kafka Test embedded Kafka broker



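I am trying to subscribe two consumers to one EmbeddedKafkaBroker. The first one succeeds, but the second one fails. This happens with both the @EmbeddedKafka broker and the @ClassRule broker.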

@RunWith(SpringRunner.class)
@SpringBootTest
@EmbeddedKafka(topics = { "topic" })
public class AnnotationEmbeddedKafkaTest {

    @Autowired
    private EmbeddedKafkaBroker broker;

    @Test
    public void annotationEmbeddedKafkaTest() {
        Map<String, Object> consumerProps1 = KafkaTestUtils.consumerProps("testEmbedded", "false", broker);
        Consumer<String, String> consumer1 = new KafkaConsumer<>(consumerProps1);
        broker.consumeFromAnEmbeddedTopic(consumer1, "topic");
        System.out.println("consumer1 assignments=" + consumer1.assignment());

        Map<String, Object> consumerProps2 = KafkaTestUtils.consumerProps("testEmbedded", "false", broker);
        Consumer<String, String> consumer2 = new KafkaConsumer<>(consumerProps2);
        broker.consumeFromAnEmbeddedTopic(consumer2, "topic");
        System.out.println("consumer2 assignments=" + consumer2.assignment());
    }
}

@RunWith(SpringRunner.class)
@SpringBootTest
public class ClassRuleEmbeddedKafkaTest {

    @ClassRule
    public static EmbeddedKafkaRule embeddedKafkaRule = new EmbeddedKafkaRule(1, false, "topic");

    private EmbeddedKafkaBroker broker = embeddedKafkaRule.getEmbeddedKafka();

    @Test
    public void classRuleEmbeddedKafkaTest() {
        Map<String, Object> consumerProps1 = KafkaTestUtils.consumerProps("testEmbedded", "false", broker);
        Consumer<String, String> consumer1 = new KafkaConsumer<>(consumerProps1);
        broker.consumeFromAnEmbeddedTopic(consumer1, "topic");
        System.out.println("consumer1 assignments=" + consumer1.assignment());

        Map<String, Object> consumerProps2 = KafkaTestUtils.consumerProps("testEmbedded", "false", broker);
        Consumer<String, String> consumer2 = new KafkaConsumer<>(consumerProps2);
        broker.consumeFromAnEmbeddedTopic(consumer2, "topic");
        System.out.println("consumer2 assignments=" + consumer2.assignment());
    }
}

I would like both consumers to be able to subscribe to one EmbeddedKafkaBroker. Is this possible with Spring Kafka Test?

I have reproduced the problem here: https://github.com/yraydhitya/spring-kafka-test-multiple-consumers

If you want both consumers to receive all messages from the topic, they need to belong to different consumer groups, for example:

Map<String, Object> consumerProps1 = KafkaTestUtils.consumerProps("testEmbedded1", "false", broker);

Map<String, Object> consumerProps2 = KafkaTestUtils.consumerProps("testEmbedded2", "false", broker);

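Otherwise, each consumer is assigned a different partition of the topic, and since the embedded Kafka topic has only one partition (by default), only one consumer will be assigned to it.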
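To make the reasoning above concrete, here is a minimal sketch in plain Java that models the rule "within one group, each partition goes to exactly one consumer". It is a toy model only and does not use Kafka or reproduce its real partition assignor; the class name GroupAssignmentSketch and the assign method are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model only: this is NOT Kafka's real partition assignor.
class GroupAssignmentSketch {

    // consumerToGroup maps consumer name -> group id (insertion order = join order).
    // Returns consumer name -> list of partitions that consumer would own.
    static Map<String, List<Integer>> assign(Map<String, String> consumerToGroup, int partitions) {
        Map<String, List<String>> members = new LinkedHashMap<>();
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        // Collect the members of each group and start everyone with no partitions.
        for (Map.Entry<String, String> e : consumerToGroup.entrySet()) {
            members.computeIfAbsent(e.getValue(), g -> new ArrayList<>()).add(e.getKey());
            result.put(e.getKey(), new ArrayList<>());
        }
        // Within each group, hand every partition to exactly one member (round-robin).
        for (List<String> group : members.values()) {
            for (int p = 0; p < partitions; p++) {
                result.get(group.get(p % group.size())).add(p);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Same group, one partition: only one consumer gets an assignment.
        Map<String, String> sameGroup = new LinkedHashMap<>();
        sameGroup.put("consumer1", "testEmbedded");
        sameGroup.put("consumer2", "testEmbedded");
        System.out.println(assign(sameGroup, 1)); // {consumer1=[0], consumer2=[]}

        // Different groups, one partition: each consumer gets the partition.
        Map<String, String> differentGroups = new LinkedHashMap<>();
        differentGroups.put("consumer1", "testEmbedded1");
        differentGroups.put("consumer2", "testEmbedded2");
        System.out.println(assign(differentGroups, 1)); // {consumer1=[0], consumer2=[0]}
    }
}
```

In this model the second consumer in the shared group ends up with an empty assignment, which mirrors the failure in the question, while giving each consumer its own group id lets both own the single partition.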
