我使用Quarkus与quarkus-resteasy-reactive
和quarkus-keycloak-admin-client-reactive
扩展。我正在构建一个ServerRequestFilter
,查询keycloak的用户属性,但即使我使用响应式客户端,我得到BlockingNotAllowedException
时调用它的方法。
下面是过滤器:
public class Filters {
@Inject
Keycloak keycloak;
@ServerRequestFilter
public Uni<Response> filter(ContainerRequestContext requestContext) {
return Uni.createFrom().item(() ->
keycloak.realm("my-realm")
.users()
.search("user-that-i-get-from-context")
.stream()
.findFirst()
.orElseThrow()
.firstAttribute("the-attribute")).map(attr -> {
if (attr.equals("some-value")){
return null;
}
return Response.status(403).build();
});
}
}
我已经尝试将runSubscriptionOn(Infrastructure.getDefaultWorkerPool())
添加到Uni
,但错误仍然存在。
阻塞请求不是一个选项,因为我希望我的端点是响应的。
我怀疑问题是在过滤器和Keycloak
客户端注入的上下文中的某个地方,但我一直无法查明问题
不幸的是,KeycloakClient使用经典的ResteasyClient阻塞,而不是ResteasyReactiveClient。
我用这样的Vertx解决了这个问题:
import io.vertx.mutiny.core.Vertx;
public class Filters {
@Inject Keycloak keycloak;
@Inject Vertx vertx;
@ServerRequestFilter
public Uni<Response> filter(ContainerRequestContext requestContext) {
return vertx.getOrCreateContext().executeBlocking(
Uni.createFrom().emitter(emitter -> {
var attr = keycloak.realm("my-realm")
.users()
.search("user-that-i-get-from-context")
.stream()
.findFirst()
.orElseThrow()
.firstAttribute("the-attribute");
if ("some-value".equals(attr)) {
emitter.complete(null);
} else {
emitter.complete(Response.status(403).build());
}
}));
}
}
问题是,尽管quarkus-keycloak-admin-client-reactive
在底层使用RESTEasy Reactive并且做非阻塞API,但Keycloak API并没有暴露Mutiny API的变体。你想对穆特尼做的事不会成功的。
当前唯一的解决方案是通过返回Response
而不是Uni<Response>
使您的过滤器成为阻塞过滤器
@Jean Merelis的回答起作用了,他让我走上了正确的道路。我想补充一下,我也发现了这个runSubscriptionOn(Infrastructure.getDefaultWorkerPool())
也可以工作,但只有当我做Uni.createFrom().item()
的供应商变体时,否则mutiny将认为里面的项目是一个完整的单元,并且不会将上下文从事件循环切换到执行线程。当我测试它时,我使用了非供应商的变体:
Uni.createFrom().item(keycloak.realm("my-realm")...)
.runSubscriptionOn(Infrastructure.getDefaultWorkerPool())
所以我在问题中发布的代码将与.runSubscriptionOn(Infrastructure.getDefaultWorkerPool())
我有一个类似的问题,我解决如下使用Uni.createFrom().completionStage(future)
:
我的用例是获得一个Keycloak用户资源和表示作为Uni管道的一部分,然后使用该用户数据更新管道下的Keycloak用户。
下面返回的自定义KeycloakUser
类只是存储资源和表示以供以后使用。
获取Keycloak用户数据的简化命令式函数:
private KeycloakUser doGetKeycloakUser(String username){
// Get our realm resource
realmResource = keycloak.realm("quarkus");
// Get the Users
UsersResource usersResource = realmResource.users();
// Get the specific user by username
List<UserRepresentation> user = usersResource.search(username);
// Get that User's representation
UserRepresentation userRepresentation = user.get(0);
// Get the user resource via the user id
UserResource userResource = usersResource.get(userRepresentation.getId());
// To update a Keycloak user we need both the representation and the resource.
// My KeycloakUser class just stores them for later use downstream in the Uni pipeline.
KeycloakUser kcUser = new KeycloakUser(userRepresentation, userResource);
return kcUser;
}
然后我调用这个函数作为Uni链的一部分,像这样:
public Uni<KeycloakUser> getKeycloakUser(String username){
// FYI: about this block:
// While it is true you are using the quarkus-keycloak-admin-client-reactive
// ie a *reactive* client,
// the org.keycloak.admin.client.Keycloak itself is not reactive.
// So, when you call it as part of a Uni pipeline,
// it blocks the IO thread and throws a BlockingNotAllowedException.
// See: https://github.com/quarkusio/quarkus/issues/29786
// So, here we wrap the blocking function doGetKeycloakUser() in a CompletableFuture
// and run it as an async operation, then when it completes we
// turn that completionStage into a Uni.
CompletableFuture<KeycloakUser> future = CompletableFuture.supplyAsync(() -> {
return doGetKeycloakUser(username);
});
return Uni.createFrom().completionStage(future);
}
也许不是最好的解决方案,但它有效。
在您的测试中,您需要使用awaitItem()
而不是assertCompleted()
:
Uni<String> uni = adminClient.getKeycloakUser(username)
.chain(kcUser -> adminClient.setAttributeToUser(kcUser, "mykey", "myvalue"))
.onItem()
. ...
UniAssertSubscriber<String> subscriber = uni.subscribe().withSubscriber(UniAssertSubscriber.create());
subscriber.awaitItem().assertItem("Was set");