当从数据库(postgresql)中获取4000或更多的值时,不可能过滤元素.(使用响应式流) &g



有必要对数据库中的值流进行过滤(过滤必须以响应式、非阻塞式的方式执行)。您需要获取传入元素,从中获取email字段并验证该值。在第一个正确的结果之后。停止处理。

然而。为了得到一个有效值,在得到第一个合适的值之前,可能需要处理几千个元素。

public class User {
@Id
Long id;
String email;

String phoneHome;
}

有一个按电话号码搜索

private Mono<Response> findUserByHomePhone(Response response) {
return Mono.just(response)
.flatMap(this::retrieveUserList);
}
private Mono<Response> retrieveUserList (Response response) {
String phone = retrievePhoneFromResponse (response);

return Mono.from(userService.getByPhone (phone)
.groupBy(Customer::getId)
.flatMap(this::processGroupedObjects)
.switchOnFirst((signal, flux) -> getFirsFoundElement(signal, response)))
.as(Log.of(log,
"Search by phone {}",
phone (response))::info);
}
  • userService。getByPhone(电话)-通过服务类(@Transactional)ReactiveCrudRepository中定义的方法这个方法向数据库发起一个请求。. 请求类型:
select * from users by phone = $1
  • .groupBy(客户:getId)-我们迭代元素流,流中的每个元素都是一个User. 我们执行分组通过id.

  • .flatMap (:: processGroupedObjects)

    • 处理分组对象. 这是冻结的地方是有可能的。在调试模式下,我无法检查所有的元素。因为前几千个元素都有email→零。
  • 。switchOnFirst((信号,磁通)->getFirsFoundElement(信号,响应)))-只要有一个用户的email字段被填写,我们就返回结果。

private Mono< Response > getFirsFoundElement(Signal<? extends User> signal, Response response) {
boolean isFoundElement = signal.hasValue();
if (isFoundElement) {
return Mono.just(response);
}
return Mono.error(new Exception());
}

private Flux<User> processGroupedObjects(GroupedFlux<Long, User> group) {
return group
.mapNotNull(this::checkEmailOnNull);
}
private User checkEmailOnNull (User user) {
String email = user.getEmail();
if (Strings.isEmpty(email)){
return null;
}
return user;
}

然而,此时(进程组对象(GroupedFlux<Lang,>组a冻结发生时,我没有在控制台中看到错误。

数据库中源元素的个数来自4 000to250003000) .

我发现了这个:GroupedFlux

分组最适合于具有中低数量的组。还必须命令式地使用组(例如由flatMap),以便groupBy继续从上游和喂养更多的群体。有时,这两个约束相乘导致挂起,例如当您具有高基数时使用组的flatMap的并发性太低。

然而,我不清楚文档中的意思,虽然我认为我的情况在这里描述,但我不确定。

Мaybe有人知道为什么会冻结吗?发生了,怎样才能修复?

正如您所发现的,分组不适用于包含许多组的流。所以我建议完全避免分组。

因为你的目标只是找到任何有电子邮件的用户,所以很简单:

userService.getByPhone (phone)
.filter((user) -> user.getEmail() != null)
.next()

如果你想接收所有用户的邮件,你不需要groupBy,只需要distinct:

userService.getByPhone (phone)
.filter((user) -> user.getEmail() != null)
.distinct((user) -> user.getId()) 

我这样组织滤镜:


private Mono<Response> filterUsersByEmail(Response response) {
String phone = retrieveMobilePhoneFromResponse(response);
return userService
.findByMobilePhone(phone)
.filter(user -> isaBoolean(user))
.next()
.switchIfEmpty(
Mono.defer(() -> Mono.error(
new Exception())
)
)
.flatMap(user -> Mono.just(response));
}
private static boolean isaBoolean(User user) {
String email = user.getEmail();
boolean b = !Strings.isEmpty(email);
return b;
}

相关内容

  • 没有找到相关文章

最新更新