刷新Fluxes映射中存储的Web客户端配置



我有一个很大的困难,也许有人不知何故知道答案,或者至少有一条线索需要抓住:D说到点子上,我在spring boot上有一个环境,在那里我将定义的web客户端(连接到不同主机(的流量存储在的映射中,以连接并获取数据。如果前端有人要求相同的数据源,我会从缓存中返回数据,而不是创建一个新的Web客户端实例:(

它听起来很有魅力,但当实例上的ip/端口发生更改时,一切都会崩溃-然后用户无法从存储的网络客户端获取数据,因为它指向一个无效的连接:(.我已经考虑了几个解决问题的方法:

  • 未检测到活动订阅者并将其从地图中删除
  • 安排清理操作以移除旧的未使用/故障焊剂
  • 定义反向传播机制中交换连接的操作详细信息

在下面的代码中,我给出了缓存管理器的定义。

我将非常感谢任何帮助或建议如何处理这个硬饼干:D

public abstract class CachedFluxReactiveManager<T, M extends FiltreableCommand> extends CachedReactiveManager<T, M, Flux<T>> {
protected final Map<ReactiveCallsKey, Flux<T>> connections = new ConcurrentHashMap<>();
protected final FluxClientCreator<T> clientCreator;
protected final InstancesQueryCreator queryCreator;
private final boolean useCache;
protected CachedFluxReactiveManager(InstancesCache instancesCache, FluxClientCreator<T> clientCreator,
InstancesQueryCreator queryCreator, boolean useCache) {
super(instancesCache);
this.clientCreator = clientCreator;
this.queryCreator = queryCreator;
this.useCache = useCache;
}
@Override
public Flux<T> getData(M command) {
if (useCache) {
if (command.isFiltered()) {
return createAndReturnConnection(command).next().flux();
}
return ofNullable(getConnectionIfExists(command))
.orElseGet(() -> createAndReturnConnection(command));
} else {
return createAndReturnConnection(command);
}
}
@Override
protected Flux<T> getConnectionIfExists(M command) {
return connections.get(
getReactiveCallsKey(command.getApplicationName(), command.getTicker(), command.getReferenceEx()));
}}```

是的,需要一些类似于健康检查的操作。

HttpClient httpClient = HttpClient.create()
.tcpConfiguration(client ->
client.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
.doOnDisconnected(conn -> {
//todo Remove the cached connection.
}));
WebClient webClient = WebClient.builder()
.clientConnector(new ReactorClientHttpConnector(httpClient))
.build();

首先感谢@echooymxq和@Martin Tarjányi在某种程度上,两者都帮助我拉皮条获取密码。以下是我所做的:

  • 我已经将缓存的实现更改为咖啡因,这对我来说简化了很多:D
// the cache implementation
protected Cache<ReactiveCallsKey, Flux<T>> connections;

// cache configuration (cleanup) 
protected final Cache<ReactiveCallsKey, Flux<T>> connections = Caffeine.newBuilder()
.maximumSize(10_000)
.expireAfterAccess(Duration.ofMinutes(10))
.build();
  • 发现一个问题,我没有清理缓存,因为有些事情很糟糕

protected Flux<T> manageConnectionConfiguration(ReactiveCall reactiveCall,
Flux<T> out) {
return out.doOnError(throwable -> {
removeInstanceCallFromCache(reactiveCall);
log.error("connection error for {} reactive source",
reactiveCall.getApplicationName(),
new OperationException(systemExceptionMessage, HttpStatus.INTERNAL_SERVER_ERROR,
throwable));
})
.doOnSubscribe(
subscription -> log.debug("subscription established for {} ", reactiveCall.getApplicationName()))
.doOnComplete(() -> log.debug("subscription completed for {} ", reactiveCall.getApplicationName()));
}
protected void removeInstanceCallFromCache(ReactiveCall reactiveCall) {
Try.run(() -> connections.invalidate(
new ReactiveCallsKey(reactiveCall.getApplicationName(), reactiveCall.getTicker(),
reactiveCall.getReference())));
}
  • 我还调整了超时的等待时间,但最终我恢复了这一更改,因为默认值很好:(

最新更新