Quarkus - Keycloak client in ServerRequestFilter in reactive



我使用Quarkus与quarkus-resteasy-reactivequarkus-keycloak-admin-client-reactive扩展。我正在构建一个ServerRequestFilter,查询keycloak的用户属性,但即使我使用响应式客户端,我得到BlockingNotAllowedException时调用它的方法。

下面是过滤器:

public class Filters {
@Inject
Keycloak keycloak;
@ServerRequestFilter
public Uni<Response> filter(ContainerRequestContext requestContext) {
return Uni.createFrom().item(() -> 
keycloak.realm("my-realm")
.users()
.search("user-that-i-get-from-context")
.stream()
.findFirst()
.orElseThrow()
.firstAttribute("the-attribute")).map(attr -> {
if (attr.equals("some-value")){
return null;
}
return Response.status(403).build();
});
}
}

我已经尝试将runSubscriptionOn(Infrastructure.getDefaultWorkerPool())添加到Uni,但错误仍然存在。

阻塞请求不是一个选项,因为我希望我的端点是响应的。

我怀疑问题是在过滤器和Keycloak客户端注入的上下文中的某个地方,但我一直无法查明问题

不幸的是,KeycloakClient使用经典的ResteasyClient阻塞,而不是ResteasyReactiveClient。

我用这样的Vertx解决了这个问题:

import io.vertx.mutiny.core.Vertx;
public class Filters {
@Inject Keycloak keycloak;
@Inject Vertx vertx;
@ServerRequestFilter
public Uni<Response> filter(ContainerRequestContext requestContext) {
return vertx.getOrCreateContext().executeBlocking(
Uni.createFrom().emitter(emitter -> {
var attr = keycloak.realm("my-realm")
.users()
.search("user-that-i-get-from-context")
.stream()
.findFirst()
.orElseThrow()
.firstAttribute("the-attribute");
if ("some-value".equals(attr)) {
emitter.complete(null);
} else {
emitter.complete(Response.status(403).build());
}
}));
}
}

问题是,尽管quarkus-keycloak-admin-client-reactive在底层使用RESTEasy Reactive并且做非阻塞API,但Keycloak API并没有暴露Mutiny API的变体。你想对穆特尼做的事不会成功的。

当前唯一的解决方案是通过返回Response而不是Uni<Response>使您的过滤器成为阻塞过滤器

@Jean Merelis的回答起作用了,他让我走上了正确的道路。我想补充一下,我也发现了这个runSubscriptionOn(Infrastructure.getDefaultWorkerPool())也可以工作,但只有当我做Uni.createFrom().item()的供应商变体时,否则mutiny将认为里面的项目是一个完整的单元,并且不会将上下文从事件循环切换到执行线程。当我测试它时,我使用了非供应商的变体:

Uni.createFrom().item(keycloak.realm("my-realm")...)
.runSubscriptionOn(Infrastructure.getDefaultWorkerPool())

所以我在问题中发布的代码将与.runSubscriptionOn(Infrastructure.getDefaultWorkerPool())

一起工作

我有一个类似的问题,我解决如下使用Uni.createFrom().completionStage(future):

我的用例是获得一个Keycloak用户资源和表示作为Uni管道的一部分,然后使用该用户数据更新管道下的Keycloak用户。

下面返回的自定义KeycloakUser类只是存储资源和表示以供以后使用。

获取Keycloak用户数据的简化命令式函数:

private KeycloakUser doGetKeycloakUser(String username){
// Get our realm resource
realmResource = keycloak.realm("quarkus");
// Get the Users
UsersResource usersResource = realmResource.users();
// Get the specific user by username
List<UserRepresentation> user = usersResource.search(username);
// Get that User's representation
UserRepresentation userRepresentation = user.get(0);
// Get the user resource via the user id
UserResource userResource = usersResource.get(userRepresentation.getId());
// To update a Keycloak user we need both the representation and the resource.
// My KeycloakUser class just stores them for later use downstream in the Uni pipeline.
KeycloakUser kcUser = new KeycloakUser(userRepresentation, userResource);
return kcUser;
}

然后我调用这个函数作为Uni链的一部分,像这样:

public Uni<KeycloakUser> getKeycloakUser(String username){
// FYI: about this block:
// While it is true you are using the quarkus-keycloak-admin-client-reactive
// ie a *reactive* client,
// the org.keycloak.admin.client.Keycloak itself is not reactive.
// So, when you call it as part of a Uni pipeline,
// it blocks the IO thread and throws a BlockingNotAllowedException.
// See: https://github.com/quarkusio/quarkus/issues/29786
// So, here we wrap the blocking function doGetKeycloakUser() in a CompletableFuture
// and run it as an async operation, then when it completes we
// turn that completionStage into a Uni.
CompletableFuture<KeycloakUser> future = CompletableFuture.supplyAsync(() -> {
return doGetKeycloakUser(username);
});
return Uni.createFrom().completionStage(future);
}

也许不是最好的解决方案,但它有效。

在您的测试中,您需要使用awaitItem()而不是assertCompleted():

Uni<String> uni = adminClient.getKeycloakUser(username)
.chain(kcUser -> adminClient.setAttributeToUser(kcUser, "mykey", "myvalue"))
.onItem()
. ...
UniAssertSubscriber<String> subscriber = uni.subscribe().withSubscriber(UniAssertSubscriber.create());
subscriber.awaitItem().assertItem("Was set");

最新更新