如何在Project Reactor中实现doAfterNext操作符



RxJava2有一个doAfterNext操作符,它向下游发送项目,然后调用使用者。Project Reactor似乎没有这样的操作员,所以我想找到一些关于创建自己的最佳方法的建议,以实现同样的目标。

用例是在订阅者收到项目后释放内存

不确定离开doOnEach是否是有效的解决方案:

public class ByteBufferSafeReleaseConsumer implements Consumer<Signal<ByteBuffer<?>>> {
private final List<ByteBuffer<?>> elements = new ArrayList<>();
@Override
public void accept(Signal<ByteBuffer<?>> signal) {
if (signal.isOnNext()) {
ByteBuffer<?> next = signal.get();
if (next != null) {
elements.add(next);
}
}
if (signal.isOnComplete() || signal.isOnError()) {
for (ByteBuffer<?> buffer : elements) {
ByteBufferUtils.safeRelease(buffer);
}
}
}
}
ByteBufferSafeReleaseConsumer consumer = new ByteBufferSafeReleaseConsumer()
Flux.from(byteBufferPublisher).doOnEach(consumer)

最新更新