我正在尝试编写单元测试案例,以添加回调,但我不确定该怎么做。在互联网上没有任何有用的东西。
@Test
public void can_publish_data_to_kafka() {
String topic = someString(10);
String key = someAlphanumericString(5);
String data = someString(50);
SendResult sendResult = mock(SendResult.class);
ListenableFuture<SendResult<String, Object>> future = mock(ListenableFuture.class);
given(kafkaTemplate.send(topic, key, data)).willReturn(future);
doAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
return invocationOnMock.getArguments()[1];
}
});
service.method(key, topic, data);
}
我想编写测试用例的代码
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topicName, key, data);
future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
@Override
public void onSuccess(SendResult<String, Object> stringKafkaBeanSendResult) {
RecordMetadata recordMetadata = stringKafkaBeanSendResult.getRecordMetadata();
LOGGER.info(String.format("sent message %s to topic %s partition %s with offset %s" + data.toString(), recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()));
}
@Override
public void onFailure(Throwable throwable) {
LOGGER.error(String.format("unable to send message = %s to topic %s because of error %s" + data.toString(), topicName, throwable.getMessage()));
}
});
我期望我应该使用Mockito撰写UT的方向。
您可以编写这样的测试用例。
@Test
public void can_publishDataToKafka() {
String key = someAlphanumericString();
String topic = someAlphaString(10);
long offset = somePositiveLong();
int partition = somePositiveInteger();
SiebelRecord siebelRecord = mock(SiebelRecord.class);
SendResult<String, Object> sendResult = mock(SendResult.class);
ListenableFuture<SendResult<String, Object>> responseFuture = mock(ListenableFuture.class);
RecordMetadata recordMetadata = new RecordMetadata(new TopicPartition(topic, partition), offset, 0L, 0L, 0L, 0, 0);
given(sendResult.getRecordMetadata()).willReturn(recordMetadata);
when(kafkaTemplate.send(topic, key, siebelRecord)).thenReturn(responseFuture);
doAnswer(invocationOnMock -> {
ListenableFutureCallback listenableFutureCallback = invocationOnMock.getArgument(0);
listenableFutureCallback.onSuccess(sendResult);
assertEquals(sendResult.getRecordMetadata().offset(), offset);
assertEquals(sendResult.getRecordMetadata().partition(), partition);
return null;
}).when(responseFuture).addCallback(any(ListenableFutureCallback.class));
service.publishDataToKafka(key, topic, siebelRecord);
verify(kafkaTemplate, times(1)).send(topic, key, siebelRecord);
}
@Test(expected = KafkaException.class)
public void can_capture_failure_publishDataToKafka() {
String key = someAlphanumericString();
String topic = someAlphaString(10);
String message = someString(20);
SiebelRecord siebelRecord = mock(SiebelRecord.class);
ListenableFuture<SendResult<String, Object>> responseFuture = mock(ListenableFuture.class);
Throwable throwable = mock(Throwable.class);
given(throwable.getMessage()).willReturn(message);
when(kafkaTemplate.send(topic, key, siebelRecord)).thenReturn(responseFuture);
doAnswer(invocationOnMock -> {
ListenableFutureCallback listenableFutureCallback = invocationOnMock.getArgument(0);
listenableFutureCallback.onFailure(throwable);
return null;
}).when(responseFuture).addCallback(any(ListenableFutureCallback.class));
service.publishDataToKafka(key, topic, siebelRecord);
}
给定以下主类:
public class KafkaTemplateUnitTestExample {
private final KafkaTemplate<String, Object> kafkaTemplate;
private final Logger logger;
public KafkaTemplateUnitTestExample(
final KafkaTemplate<String, Object> kafkaTemplate,
final Logger logger) {
this.kafkaTemplate = kafkaTemplate;
this.logger = logger;
}
public void method(String topicName, String key, Object data) {
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topicName, key, data);
future.addCallback(stringKafkaBeanSendResult -> {
RecordMetadata recordMetadata = stringKafkaBeanSendResult.getRecordMetadata();
logger.info(String.format("sent message %s to topic %s partition %s with offset %s", data.toString(), recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()));
}, ex -> {
logger.error(String.format("unable to send message = %s to topic %s because of error %s", data.toString(), topicName, ex.getMessage()));
});
}
}
然后,您可以使用这样的Mockito测试可听的回调:
@ExtendWith(MockitoExtension.class)
class KafkaTemplateUnitTestExampleTest {
@Mock
KafkaTemplate<String, Object> kafkaTemplate;
@Mock
Logger logger;
@InjectMocks
KafkaTemplateUnitTestExample service;
private CompletableFuture<SendResult<String, Object>> completableFuture;
@BeforeEach
void setUp() {
completableFuture = new CompletableFuture<>();
var futureAdapter = new CompletableToListenableFutureAdapter<>(completableFuture);
when(kafkaTemplate.send(any(), any(), any())).thenReturn(futureAdapter);
}
@Test
void testSuccessCallback() {
String topic = someString(10);
String key = someAlphanumericString(5);
String data = someString(50);
service.method(key, topic, data);
completableFuture.complete(new SendResult<>(
new ProducerRecord<>("myTopic", "result"),
new RecordMetadata(new TopicPartition("myTopic", 1), 42, 23, 123456, 4, 8)
));
verify(logger).info(anyString());
}
@Test
void testFailureCallback() {
String topic = someString(10);
String key = someAlphanumericString(5);
String data = someString(50);
service.method(key, topic, data);
completableFuture.completeExceptionally(new RuntimeException("testEx"));
verify(logger).error(anyString());
}
// missing String factories
}
我认为这是测试回调的一种更清洁的方式。您必须能够为此解决方案模拟您的记录器,但这希望不应该是问题。