How to chain two Uni<?> with Panache.withTransaction() in a unit test without getting a java.util.concurrent.TimeoutException



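I am using Panache.withTransaction() in a unit test and, whatever I do, I get a java.util.concurrent.TimeoutException.

Note: the test works without a transaction, but then I have to delete the inserted rows manually.

I would like to chain insertKline and getOhlcList inside a transaction so that I can benefit from the rollback: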

@QuarkusTest
@Slf4j
class KlineServiceTest {

    @Inject
    KlineRepository klineRepository;

    @Inject
    CurrencyPairRepository currencyPairRepository;

    @Inject
    KlineService service;

    @Test
    @DisplayName("ohlc matches inserted kline")
    void ohlcMatchesInsertedKline() {
        // GIVEN
        val volume       = BigDecimal.valueOf(1d);
        val closeTime    = LocalDateTime.now().withSecond(0).withNano(0);
        val currencyPair = new CurrencyPair("BTC", "USDT");

        val currencyPairEntity = currencyPairRepository
                .findOrCreate(currencyPair)
                .await().indefinitely();

        val kline = KlineEntity.builder()
                .id(new KlineId(currencyPairEntity, closeTime))
                .volume(volume)
                .build();

        val insertKline = Uni.createFrom().item(kline)
                .call(klineRepository::persistAndFlush);

        val getOhlcList = service.listOhlcByCurrencyPairAndTimeWindow(currencyPair, ofMinutes(5));

        // WHEN
        val ohlcList = Panache.withTransaction(
                        () -> Panache.currentTransaction()
                                .invoke(Transaction::markForRollback)
                                .replaceWith(insertKline)
                                .chain(() -> getOhlcList))
                .await().indefinitely();

        // THEN
        assertThat(ohlcList).hasSize(1);

        val ohlc = ohlcList.get(0);
        assertThat(ohlc).extracting(Ohlc::getCloseTime, Ohlc::getVolume)
                .containsExactly(closeTime, volume);
    }
}

I get this exception:

java.lang.RuntimeException: java.util.concurrent.TimeoutException
at io.quarkus.hibernate.reactive.panache.common.runtime.AbstractJpaOperations.executeInVertxEventLoop(AbstractJpaOperations.java:52)
at io.smallrye.mutiny.operators.uni.UniRunSubscribeOn.subscribe(UniRunSubscribeOn.java:25)
at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)

In AbstractJpaOperations, I can see:

public abstract class AbstractJpaOperations<PanacheQueryType> {

    // FIXME: make it configurable?
    static final long TIMEOUT_MS = 5000;
    ...
}
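To make the shape of the stack trace easier to read, here is a minimal plain-Java sketch of a bounded wait. It is not the Quarkus implementation: `BoundedWaitSketch`, `awaitWithTimeout` and the 50 ms value are hypothetical names and numbers chosen for illustration. It only shows how waiting a fixed time for a result that never arrives ends up as a `RuntimeException` wrapping a `TimeoutException`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class BoundedWaitSketch {

    // Hypothetical short timeout so the demo finishes quickly
    // (the constant quoted above is 5000 ms).
    static final long TIMEOUT_MS = 50;

    // Waits at most TIMEOUT_MS for the future and wraps any failure
    // in a RuntimeException, mirroring the shape of the trace above.
    static <T> T awaitWithTimeout(CompletableFuture<T> future) {
        try {
            return future.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
        } catch (TimeoutException | ExecutionException | InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    // Waits on a future that is never completed and reports the cause type.
    static String describeFailure() {
        CompletableFuture<String> neverCompleted = new CompletableFuture<>();
        try {
            awaitWithTimeout(neverCompleted);
            return "completed";
        } catch (RuntimeException e) {
            return e.getCause().getClass().getName();
        }
    }

    public static void main(String[] args) {
        System.out.println(describeFailure());
    }
}
```

If the sketch is a fair analogy, the timeout itself is only a symptom: the interesting question is why the work never completes within the allowed time.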

I also get the same problem when I try to use runOnContext():

@Test
@DisplayName("ohlc matches inserted kline")
void ohlcMatchesInsertedKline() throws ExecutionException, InterruptedException {
    // GIVEN
    val volume       = BigDecimal.valueOf(1d);
    val closeTime    = LocalDateTime.now().withSecond(0).withNano(0);
    val currencyPair = new CurrencyPair("BTC", "USDT");

    val currencyPairEntity = currencyPairRepository
            .findOrCreate(currencyPair)
            .await().indefinitely();

    val kline = KlineEntity.builder()
            .id(new KlineId(currencyPairEntity, closeTime))
            .volume(volume)
            .build();

    val insertKline = Uni.createFrom().item(kline)
            .call(klineRepository::persist);

    val getOhlcList  = service.listOhlcByCurrencyPairAndTimeWindow(currencyPair, ofMinutes(5));
    val insertAndGet = insertKline.chain(() -> getOhlcList);

    // WHEN
    val ohlcList = runAndRollback(insertAndGet)
            .runSubscriptionOn(action -> vertx.getOrCreateContext()
                    .runOnContext(action))
            .await().indefinitely();

    // THEN
    assertThat(ohlcList).hasSize(1);

    val ohlc = ohlcList.get(0);
    assertThat(ohlc).extracting(Ohlc::getCloseTime, Ohlc::getVolume)
            .containsExactly(closeTime, volume);
}

private static Uni<List<Ohlc>> runAndRollback(Uni<List<Ohlc>> getOhlcList) {
    return Panache.withTransaction(
            () -> Panache.currentTransaction()
                    .invoke(Transaction::markForRollback)
                    .replaceWith(getOhlcList));
}

The @TestReactiveTransaction annotation

Quarkus provides the @TestReactiveTransaction annotation: it wraps the test method in a transaction and rolls that transaction back at the end.

I would use quarkus-test-vertx to test reactive code:

<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-test-vertx</artifactId>
    <scope>test</scope>
</dependency>

Here is an example test class that works with the Hibernate Reactive with Panache quickstart (after adding the quarkus-test-vertx dependency):

The entity:

@Entity
public class Fruit extends PanacheEntity {

    @Column(length = 40, unique = true)
    public String name;
    ...
}

The test class:

package org.acme.hibernate.orm.panache;

import java.util.List;

import org.junit.jupiter.api.Test;

import io.quarkus.test.TestReactiveTransaction;
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.vertx.UniAsserter;
import io.smallrye.mutiny.Uni;
import org.assertj.core.api.Assertions;

@QuarkusTest
public class ExampleReactiveTest {

    @Test
    @TestReactiveTransaction
    public void test(UniAsserter asserter) {
        printThread( "Start" );
        Uni<List<Fruit>> listAllUni = Fruit.<Fruit>listAll();
        Fruit mandarino = new Fruit( "Mandarino" );
        asserter.assertThat(
                () -> Fruit
                        .persist( mandarino )
                        .replaceWith( listAllUni ),
                result -> {
                    Assertions.assertThat( result ).hasSize( 4 );
                    Assertions.assertThat( result ).contains( mandarino );
                    printThread( "End" );
                }
        );
    }

    private void printThread(String step) {
        System.out.println( step + " - " + Thread.currentThread().getId() + ":" + Thread.currentThread().getName() );
    }
}

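@TestReactiveTransaction runs the method in a transaction that is rolled back at the end of the test. UniAsserter makes it possible to test reactive code without blocking anything.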
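The effect of a transaction that is always rolled back can be pictured with a small plain-Java analogy. It uses no Quarkus or Hibernate Reactive API: `RollbackSketch`, `TABLE` and `runAndRollback` are hypothetical names, the three fruit names are made-up placeholders, and the "transaction" is just a private copy of an in-memory list that is never written back.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

class RollbackSketch {

    // Hypothetical in-memory "table" standing in for the database.
    static final List<String> TABLE =
            new ArrayList<>(List.of("Cherry", "Apple", "Banana"));

    // Runs the work against a private copy of the table and never
    // writes the copy back, which is roughly what a transaction
    // marked for rollback achieves.
    static <T> T runAndRollback(Function<List<String>, T> work) {
        List<String> transactionView = new ArrayList<>(TABLE);
        return work.apply(transactionView); // the copy is discarded afterwards
    }

    // Inserts a row inside the "transaction" and returns the row count seen there.
    static int sizeSeenInsideTransaction() {
        return runAndRollback(view -> {
            view.add("Mandarino");
            return view.size();
        });
    }

    public static void main(String[] args) {
        System.out.println(sizeSeenInsideTransaction()); // 4: the insert is visible inside
        System.out.println(TABLE.size());                // 3: nothing was committed
    }
}
```

This is why the assertion can expect one more row than the table starts with while the data stays clean for the next test.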

The @RunOnVertxContext annotation

You can also run a test on the Vert.x event loop by using the @RunOnVertxContext annotation from the quarkus-test-vertx library.

This way you don't need to wrap the whole test in a transaction:

import io.quarkus.hibernate.reactive.panache.Panache;
import io.quarkus.test.vertx.RunOnVertxContext;
import org.hibernate.reactive.mutiny.Mutiny;

@QuarkusTest
public class ExampleReactiveTest {

    @Test
    @RunOnVertxContext
    public void test(UniAsserter asserter) {
        printThread( "Start" );
        Uni<List<Fruit>> listAllUni = Fruit.<Fruit>listAll();
        Fruit mandarino = new Fruit( "Mandarino" );
        asserter.assertThat(
                () -> Panache.withTransaction( () -> Panache
                        // This test doesn't have @TestReactiveTransaction,
                        // so we need to roll back the transaction manually
                        .currentTransaction().invoke( Mutiny.Transaction::markForRollback )
                        .call( () -> Fruit.persist( mandarino ) )
                        .replaceWith( listAllUni )
                ),
                result -> {
                    Assertions.assertThat( result ).hasSize( 4 );
                    Assertions.assertThat( result ).contains( mandarino );
                    printThread( "End" );
                }
        );
    }

    // printThread(String) is the same helper as in the previous example
}

I finally managed to get it working. The trick is to defer the creation of the Uni, like this:

@QuarkusTest
public class ExamplePanacheTest {

    @Test
    public void test() {
        final var mandarino = new Fruit("Mandarino");

        final var insertAndGet = Uni.createFrom()
                .deferred(() -> Fruit.persist(mandarino)
                        .replaceWith(Fruit.<Fruit>listAll()));

        final var fruits = runAndRollback(insertAndGet)
                .await().indefinitely();

        assertThat(fruits).hasSize(4)
                .contains(mandarino);
    }

    private static Uni<List<Fruit>> runAndRollback(Uni<List<Fruit>> insertAndGet) {
        return Panache.withTransaction(
                () -> Panache.currentTransaction()
                        .invoke(Transaction::markForRollback)
                        .replaceWith(insertAndGet));
    }
}
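Why deferring helps can be sketched in plain Java, without Mutiny or Quarkus. The sketch assumes the problem is one of timing: a method that builds the operation is called on the test thread before the transaction exists, rather than inside it. All names here (`DeferredCreationSketch`, `IN_TX`, `listAll`, `withTransaction`) are hypothetical stand-ins, and a `ThreadLocal` flag plays the role of the transaction context.

```java
import java.util.function.Supplier;

class DeferredCreationSketch {

    // Stand-in for "a transaction is open on the current thread".
    static final ThreadLocal<Boolean> IN_TX = ThreadLocal.withInitial(() -> false);

    // Hypothetical operation factory: it looks at the transaction state
    // at the moment it is CALLED, not when its result is consumed.
    static Supplier<String> listAll() {
        boolean createdInTx = IN_TX.get();
        return () -> createdInTx ? "inside transaction" : "outside transaction";
    }

    // Opens the fake transaction, runs the work, then closes it again.
    static <T> T withTransaction(Supplier<T> work) {
        IN_TX.set(true);
        try {
            return work.get();
        } finally {
            IN_TX.set(false);
        }
    }

    // Eager: listAll() is called before the transaction is opened.
    static String eager() {
        Supplier<String> operation = listAll();
        return withTransaction(operation);
    }

    // Deferred: listAll() is only called once the transaction is open.
    static String deferred() {
        return withTransaction(() -> listAll().get());
    }

    public static void main(String[] args) {
        System.out.println(eager());    // outside transaction
        System.out.println(deferred()); // inside transaction
    }
}
```

In the test above, `Uni.createFrom().deferred(...)` appears to play the role of the lambda in `deferred()`: `Fruit.persist(...)` and `Fruit.listAll()` are not invoked until the Uni is subscribed to, which happens inside `Panache.withTransaction(...)`.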
