cline conture infailure持续consa.concurrent.future无法正常工作



我遇到了一个问题,其中我有两种方法,第一个呼叫第二个循环中的第二个方法,第二个将Future创建为:

public class WorkersCoordinator {
    private static Logger LOGGER = 
        LoggerFactory.getLogger(WorkersCoordinator.class);
    private final Timeout timeout;
    private final ActorSystem system;
    private final List<Class<? extends BaseWorker>> workers;
    private final Map<Class, ActorRef> refMap;
    private final WorkResultPackageQueue pkgQueue;
    private final ActorFactory actorFactory;
    @Autowired
    public WorkersCoordinator(final ApplicationConfiguration config,
                             final ActorSystem system,
                             final ActorFactory actorFactory, 
                             final WorkerFactory workerFactory,
                             final WorkResultPackageQueue pkgQueue) {
        timeout = new Timeout(config.getTimeoutInMilliseconds(), 
                              TimeUnit.MILLISECONDS);
        this.system = system;
        this.actorFactory = actorFactory;
        this.pkgQueue = pkgQueue;
        refMap = Map.newHashMap();
        workers = Lists.newArrayList(workerFactory.getAllAvailableWorkers());
    }
    public void delegateWorkers() {
        for (Class<? extends BaseWorker> worker : workers) {
            if (refMap.containsKey(worker) continue;
            sendWork(worker);
        }
    }
    private void sendWork(Class<? extends BaseWorker> worker) {
        // GetDataActor extends AbstractActor
        ActorRef actorRef = actorFactory.create(GetDataActor.class);
        Future<Object> responseRef = Patterns.ask(actorRef, worker, timeout);
        responseRef.onFailure(new OnFailure() {
            @Override
            public void onFailure(Throwable failure) throws Throwable {
                LOGGER.error("Worker {} encountered a problem - cancelling.", 
                             worker.getSimpleName());
                if (refMap.containsKey(worker)) {
                    refMap.remove(worker);
                }
            }
        }, system.dispatcher());
        responseRef.onSuccess(new OnSuccess<Object>() {
            @Override
            public void onSuccess(Object msg) throws Throwable {
                if (msg instanceof WorkResultPackage) {
                    final WorkResultPackage reportPackage = (WorkResultPackage) msg;
                    LOGGER.info(
                        "Received AssetDataPackage from {}, containing {} items",
                        reportPackage.getWorkReport().getFromWorker().getSimpleName(),
                        reportPackage.getPkg().getData().size());
                    pkgQueue.enqueue(reportPackage);
                } else {
                    LOGGER.eror(
                        "Expected to receive WorkResultPackage Object but received: {}",
                        msg.getClass());
                        throw new UnknownObjectReceived(msg);
                }
            }
        }, system.dispatcher());
        refMap.put(worker, actorRef);
    }
}

问题是,我相信responseRef.onFailure的封闭不会像我期望的那样行动。如果我将其称为3名工人,其中一名我将其配置为失败,则处理失败,但记录的记录是不确定的,即使该工人将被报告为失败,即使我始终是我标记为失败的人。我是这个技术堆栈的新手(Java,Scala Futures和Akka),以及建立的代码基础,在该代码基础上,我发现了这种已建立的模式,因此我不知道我是否忽略了某些东西或误解Java/Scala Futures的封闭。这里的关键点是,我需要报告哪个工人失败并将其从refMap中删除,以便不再考虑它。甚至陌生人的事实是,即使有错误的工人被报告为失败,所有工人似乎都可以完成并从refMap中删除。

更新:在没有运气的答案之后,我做了一些调查,我进行了一些调查,并找到了另一篇文章,回答了Java 8是否支持关闭:

Java 8支持关闭?

简短的答案,我相信确实如此。但是,它谈到了final或有效的final变量。因此,我如下更新了代码。希望这能使人们了解关闭的人,以帮助我理解为什么他们不习惯(在C#和JavaScript中)。我只是向sendWork(...)发布更新,以突出我尝试无济于事的努力。

private void sendWork(Class<? extends BaseWorker> worker) {
    // GetDataActor extends AbstractActor
    ActorRef actorRef = actorFactory.create(GetDataActor.class);
    Future<Object> responseRef = Patterns.ask(actorRef, worker, timeout);
    Consumer<Throwable> onFailureClosure = (ex) -> {
            LOGGER.error("Worker {} encountered a problem - cancelling.", 
                         worker.getSimpleName());
            if (refMap.containsKey(worker)) {
                refMap.remove(worker);
            }
    }
    responseRef.onFailure(new OnFailure() {
        @Override
        public void onFailure(Throwable failure) throws Throwable {
            onFailureClosure.accept(failure);
        }
    }, system.dispatcher());
    responseRef.onSuccess(new OnSuccess<Object>() {
        @Override
        public void onSuccess(Object msg) throws Throwable {
            if (msg instanceof WorkResultPackage) {
                final WorkResultPackage reportPackage = (WorkResultPackage) msg;
                LOGGER.info(
                    "Received AssetDataPackage from {}, containing {} items",
                    reportPackage.getWorkReport().getFromWorker().getSimpleName(),
                    reportPackage.getPkg().getData().size());
                pkgQueue.enqueue(reportPackage);
            } else {
                LOGGER.eror(
                    "Expected to receive WorkResultPackage Object but received: {}",
                    msg.getClass());
                    throw new UnknownObjectReceived(msg);
            }
        }
    }, system.dispatcher());
    refMap.put(worker, actorRef);
}

代码存在一个基本问题,可能会导致您看到的行为:代码正在并发环境中突变数据,而没有任何保障数据。将来可以随时执行未来的回调,并有可能并行运行。将来有多个回调检查和突变相同的数据可能会导致怪异的行为。

Java中典型的方法处理可变数据的并发访问是使用同步和锁。幸运的是,由于您使用了Akka,因此有一种更好的方法。基本上,重构WorkersCoordinator是演员,并使用顺序消息处理的演员行为安全处理可变状态。

为了进一步简化问题,您可以在这种情况下放弃使用ask,而是使用tell在参与者之间进行通信。我猜想在这里使用未来以捕获错误,但是更好地处理错误的方法是Akka的主管策略。也就是说,如果WorkersCoordinator是演员,并且每个GetDataActor实例是WorkersCoordinator的孩子,则后者可以决定如何处理前者中出现的错误。例如,如果在GetDataActor中抛出异常,则协调员可以决定记录错误,然后停止孩子。

以下是包含上述想法的替代WorkersCoordinator

public class WorkersCoordinator extends AbstractActor {
  private static Logger LOGGER = ...
  private final List<Class<? extends BaseWorker>> workers;
  private final Map<ActorRef, Class> refMap;
  private final WorkResultPackageQueue pkgQueue;
  public WorkersCoordinator(final WorkerFactory workerFactory,
                            final WorkResultPackageQueue pkgQueue) {
    this.pkgQueue = pkgQueue;
    this.refMap = Map.newHashMap();
    this.workers = Lists.newArrayList(workerFactory.getAllAvailableWorkers());
  }
  static Props props(WorkerFactory factory, WorkResultPackageQueue queue) {
    return Props.create(WorkersCoordinator.class, factory, queue);
  }
  static public class Delegate {}
  private static SupervisorStrategy strategy =
    new OneForOneStrategy(10, Duration.create(1, TimeUnit.MINUTES), DeciderBuilder.
      matchAny(t -> {
         ActorRef failedChild = getSender();
         LOGGER.error("This child failed: {}", failedChild);
         refMap.remove(failedChild);
         stop();
      })
      .build());
  @Override
  public SupervisorStrategy supervisorStrategy() {
    return strategy;
  }
  @Override
  public void preStart() {
    for (worker : workers) {
       ActorRef child = getContext().actorOf(Props.create(GetDataActor.class));
       refMap.put(child, worker);
    }
  }
  @Override
  public Receive createReceive() {
    return receiveBuilder()
      .match(Delegate.class, d -> {
        refMap.forEach((actor, msg) -> actor.tell(msg, getSelf()));
      })
      .match(WorkResultPackage.class, p -> {
        LOGGER.info("Received AssetDataPackage from {}, containing {} items",
                    reportPackage.getWorkReport().getFromWorker().getSimpleName(),
                    reportPackage.getPkg().getData().size());
        pkgQueue.enqueue(p);
        ActorRef dataActor = getSender();
        refMap.remove(dataActor);
      })
      .matchAny(
        msg -> LOGGER.eror("Expected to receive WorkResultPackage Object but received: {}", msg)
      )
      .build();
  }
}

有关上述代码的一些注释:

  • 而不是使用ActorFactory,它似乎是某些自定义类,而是使用Props
  • refMap被倒置,因此ActorRef现在是关键,并且工作类型是值。这使我们可以根据ActorRefrefMap中删除条目,这是在儿童演员的成功回复的情况下,并且如果孩子抛出例外。
  • 我删除了@Autowired注释,以简单起见。此处找到有关参与者依赖注入的更多信息。
  • 要创建并启动WorkersCoordinator,请调用其props方法。为了启动这项工作,演员期望一条自定义的Delegate消息:一旦演员收到此消息,它将通过refMap迭代,向地图中的每个孩子发送与该孩子相关的工作单位。
WorkerFactory Factory = ...WorkResultPackagequeue队列= ...ActorRef协调器= actorsystem.actorof(workersCoordinator.props(工厂,queue));委托Dowork = new差异();coordinator.tell(dowork,actorref.nosender());

最新更新