如何用Redisson锁定和解锁包裹反应堆通量流



我有一个从数据库中读取对象的Flux流
对于这些对象中的每一个,我都有一个要运行的处理函数
我希望处理函数在获取给定对象ID上的Redis锁后执行,并在处理后释放锁(如果处理函数抛出错误(。

Flux中创建这样一个流的最简单方法是什么
以下是我尝试使用转换时失败的一些代码
我可能会让withLock接受一个附加为afterLock.flatMap(func)的函数,但我正在寻找一个可以避免这种情况的解决方案。

我希望这对流的其余部分尽可能透明,并且不需要单独的锁定和解锁功能附件,只需要一个附件就可以做到";锁定过程解锁";。


private <T> Function<Flux<T>, Publisher<T>> withLock(Function<T, String> keyExtractor) {

return flux -> {
Flux<T> afterLock = flux.flatMap(ev -> redis.getLock(keyExtractor.apply(ev)).lock(1000L, TimeUnit.MILLISECONDS).map(ret -> ev));

// processing logic should be attached somewhere here

afterLock
.flatMap(ret -> redis.getLock(keyExtractor.apply(ret)).unlock()
.thenReturn(ret)
.onErrorResume(e -> redis.getLock(keyExtractor.apply(ret)).unlock().thenReturn(ret)));

return afterLock;

};
}

Flux.just(someObjectFromDatabase)
.transform(withLock(t -> t.id()))
.flatMap(this::objectProcessor)

其中一个解决方案是使用Mono.usingWhen,它允许对资源供应商、资源关闭和清理使用异步操作。

Mono.usingWhen(
lockService.acquire(key),
lock -> process(),
lock -> lockService.release(lock)
);

在我们的例子中,我们将Redis锁封装到LockService中,看起来像下面的

public interface ReactiveLockService {
Mono<LockEntry> acquire(String key, Duration expireAfter);
Mono<Void> release(LockEntry lock);
interface LockEntry {
String getKey();
String getValue();
}
}

感谢您的回答@Alex,与此同时,我能够提供这样的东西,它在组织流和故障恢复方面非常灵活(我花了一段时间来处理边缘情况…(
它可以用作对stream.flatMap(withLock(..., processor)的调用

public static <T> Function<T, Flux<T>> withLock(
long tid, String lockPrefix, int lockTimeMs, Function<T, String> keyExtractor, Function<Mono<T>, Flux<T>> processor, RedissonReactiveClient redis) {
// If Redis lock or unlock operations fail, that will on purpose propagate the error.
// If processor throws an error, lock will be unlocked first before error propagation.
// tid has to be unique for each local task, it's a virtual "thread id" so if it's used concurrently locks will not protect the code
return flux -> {
Function<T, RLockReactive> getLock = ev -> redis.getLock(lockPrefix + keyExtractor.apply(ev));
RLockReactive lock = getLock.apply(flux);
Supplier<Mono<T>> unlock = () -> lock.unlock(tid).then(Mono.<T>empty());
Supplier<Mono<T>> doLock = () -> lock.lock(lockTimeMs, TimeUnit.MILLISECONDS, tid).then(Mono.<T>empty());
// Careful not to call map/flatMap on redis.lock/redis.unlock which returns Void and so it won't trigger on empty stream...lol!
return Flux.concat(
Mono.defer(doLock),
Flux.defer(() -> processor.apply(Mono.just(flux))
.onErrorResume(err -> unlock.get()
.onErrorResume(unlockError -> {
err.addSuppressed(unlockError);
// Propagate original processor error, but note the unlock error as suppressed
return Mono.error(err);
})
.then(Mono.error(err)))),
Mono.defer(unlock)
);
};

最新更新