使用 poll() 的消费者单元测试永远不会收到任何内容



请考虑以下代码:

@Test(singleThreaded = true)
public class KafkaConsumerTest
{
private KafkaTemplate<String, byte[]> template;
private DefaultKafkaConsumerFactory<String, byte[]> consumerFactory;
private static final KafkaEmbedded EMBEDDED_KAFKA;
static {
EMBEDDED_KAFKA = new KafkaEmbedded(1, true, "topic");
try { EMBEDDED_KAFKA.before(); } catch (final Exception e) { e.printStackTrace(); }
}
@BeforeMethod
public void setUp() throws Exception {
final Map<String, Object> senderProps = KafkaTestUtils.senderProps(EMBEDDED_KAFKA.getBrokersAsString());
senderProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
final ProducerFactory<String, byte[]> pf = new DefaultKafkaProducerFactory<>(senderProps);
this.template = new KafkaTemplate<>(pf);
this.template.setDefaultTopic("topic");
final Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("sender", "false", EMBEDDED_KAFKA);
this.consumerFactory = new DefaultKafkaConsumerFactory<>(consumerProps);
this.consumerFactory.setValueDeserializer(new ByteArrayDeserializer());
this.consumerFactory.setKeyDeserializer(new StringDeserializer());
}
@Test
public void testSendToKafka() throws InterruptedException, ExecutionException, TimeoutException {
final String message = "42";
final Message<byte[]> msg = MessageBuilder.withPayload(message.getBytes(StandardCharsets.UTF_8)).setHeader(KafkaHeaders.TOPIC, "topic").build();
this.template.send(msg).get(10, TimeUnit.SECONDS);
final Consumer<String, byte[]> consumer = this.consumerFactory.createConsumer();
consumer.subscribe(Collections.singleton("topic"));
final ConsumerRecords<String, byte[]> records = consumer.poll(10000);
Assert.assertTrue(records.count() > 0);
Assert.assertEquals(new String(records.iterator().next().value(), StandardCharsets.UTF_8), message);
consumer.commitSync();
}
}

我正在尝试向KafkaTemplate发送消息并使用Consumer.poll()再次阅读。我正在使用的测试框架是TestNG。

发送作品,我已经使用我在网络中找到的"通常"代码(在KafkaMessageListenerContainer上注册消息侦听器(验证了这一点。

只是,我从来没有在消费者那里收到任何东西。我已经针对"真正的"Kafka 安装尝试了相同的顺序(创建Consumerpoll()(,并且它有效。

因此,看起来我设置ConsumerFactory的方式有问题?任何帮助将不胜感激!

你需要使用

EMBEDDED_KAFKA.consumeFromAnEmbeddedTopic(consumer, "topic");

在通过KafkaTemplate发布记录之前。

然后在验证测试结束时,您需要使用这样的东西:

ConsumerRecord<String, String> record = KafkaTestUtils.getSingleRecord(consumer,  "topic");

您也可以按照自己的方式使用它,只是缺少的是作为earliestConsumerConfig.AUTO_OFFSET_RESET_CONFIG,因为默认的latest。这样,稍后添加到主题的使用者将不会看到之前发布的任何记录。

相关内容

最新更新