为什么不能从Flow API订阅服务器中的onComplete访问类变量



使用Java流API。我有一个Processors的链和一个终端Subscriber。我已经对Subscriber进行了编码,这些CCD_3保持了接收到的与SubmissionPublisher配合良好的项目的计数。

但是,当我将值从Processor(扩展SubmissionPublisher(传递到下面的Subscriber时,不会向控制台打印任何输出。

如果我删除minValue.get(),文本将打印到控制台。为什么调用类变量会导致onComplete无法按预期执行?

public class MinValue implements Subscriber<Integer> {
private Flow.Subscription subscription;
private AtomicInteger minValue=new AtomicInteger(Integer.MAX_VALUE);
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(Integer item) {
if(minValue.get()>item){
System.out.println("Min Value : "+item);
minValue.set(item);
}
subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
System.out.println("Error  + throwable.getMessage() nMin Value is : "+minValue.get());
}
@Override
public void onComplete() {
System.out.println("Successful Completion  - Min Value is : "+minValue.get());
}
}

编辑-添加最小可复制示例

public class NewClass {
public static void main(String args[]) {
SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
Subscriber<Integer> subscriber = new MinValue();
publisher.subscribe(subscriber);

publisher.submit(10);
publisher.submit(11);
publisher.submit(9);
publisher.submit(12);
publisher.submit(8);
publisher.close();
}
}

minValue.get((返回一个int,而item是一个CCD11对象。也许这就是问题所在。

问题是主线程在调用subscribers onComplete((之前退出。

如果在调用publisher.close((后添加睡眠,subscriber.onComplete((将有时间在自己的线程上执行。

最新更新