RxJava2有一个doAfterNext
操作符,它向下游发送项目,然后调用使用者。Project Reactor似乎没有这样的操作员,所以我想找到一些关于创建自己的最佳方法的建议,以实现同样的目标。
用例是在订阅者收到项目后释放内存
不确定离开doOnEach
是否是有效的解决方案:
public class ByteBufferSafeReleaseConsumer implements Consumer<Signal<ByteBuffer<?>>> {
private final List<ByteBuffer<?>> elements = new ArrayList<>();
@Override
public void accept(Signal<ByteBuffer<?>> signal) {
if (signal.isOnNext()) {
ByteBuffer<?> next = signal.get();
if (next != null) {
elements.add(next);
}
}
if (signal.isOnComplete() || signal.isOnError()) {
for (ByteBuffer<?> buffer : elements) {
ByteBufferUtils.safeRelease(buffer);
}
}
}
}
ByteBufferSafeReleaseConsumer consumer = new ByteBufferSafeReleaseConsumer()
Flux.from(byteBufferPublisher).doOnEach(consumer)